Datasets: Flexible Distributed Data Loading

Tip

Datasets is available as alpha in Ray 1.6+. Please file feature requests and bug reports on GitHub Issues or join the discussion on the Ray Slack.

Ray Datasets are the standard way to load and exchange data in Ray libraries and applications. Datasets provide basic distributed data transformations such as map, filter, and repartition, and are compatible with a variety of file formats, datasources, and distributed frameworks.

[Figure: dataset.svg — Ray Datasets overview]

Concepts

Ray Datasets implement Distributed Arrow. A Dataset consists of a list of Ray object references to blocks. Each block holds a set of items in either an Arrow table, an Arrow tensor, or a Python list (for Arrow-incompatible objects). Having multiple blocks in a dataset allows for parallel transformation and ingest of the data.
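
For example, the Dataset repr reports the block count, row count, and schema. A minimal sketch, assuming num_blocks() is available in your Ray version:

import ray

# Each Dataset is backed by a list of blocks stored as Ray objects. The
# block count is also the degree of parallelism for transformations.
ds = ray.data.range(10000)
print(ds)
# -> Dataset(num_blocks=200, num_rows=10000, schema=<class 'int'>)
print(ds.num_blocks())  # assumed available; returns the number of blocks
# -> 200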

The following figure visualizes a Dataset with three Arrow table blocks, each block holding 1000 rows:

[Figure: dataset-arch.svg — a Dataset backed by three Arrow table blocks]

Since a Ray Dataset is just a list of Ray object references, it can be freely passed between Ray tasks, actors, and libraries like any other object reference. This flexibility is a unique characteristic of Ray Datasets.

Compared to Spark RDDs and Dask Bags, Datasets offer a more basic set of features and execute operations eagerly for simplicity. Users are expected to cast Datasets into more featureful dataframe types (e.g., ds.to_dask()) for advanced operations.
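
For example, a groupby (not yet provided by Datasets) can be run by handing the data off to Dask-on-Ray. A minimal sketch, assuming Dask and the Dask-on-Ray scheduler are installed and configured:

import ray
import dask
from ray.util.dask import ray_dask_get

# Route Dask's execution through Ray, then cast a Dataset to a Dask
# DataFrame for an operation Datasets does not provide (groupby).
dask.config.set(scheduler=ray_dask_get)

ds = ray.data.from_items([{"key": i % 3, "value": i} for i in range(100)])
df = ds.to_dask()
print(df.groupby("key")["value"].sum().compute())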

Datasource Compatibility Matrices

Input compatibility matrix

Input Type                 Read API                         Status
CSV File Format            ray.data.read_csv()
JSON File Format           ray.data.read_json()
Parquet File Format        ray.data.read_parquet()
NumPy File Format          ray.data.read_numpy()
Text Files                 ray.data.read_text()
Binary Files               ray.data.read_binary_files()
Python Objects             ray.data.from_items()
Spark Dataframe            ray.data.from_spark()
Dask Dataframe             ray.data.from_dask()
Modin Dataframe            ray.data.from_modin()
MARS Dataframe             ray.data.from_mars()             (todo)
Pandas Dataframe Objects   ray.data.from_pandas()
NumPy ndarray Objects      ray.data.from_numpy()
Arrow Table Objects        ray.data.from_arrow()
Custom Datasource          ray.data.read_datasource()

Output compatibility matrix

Output Type                  Dataset API                                Status
CSV File Format              ds.write_csv()
JSON File Format             ds.write_json()
Parquet File Format          ds.write_parquet()
NumPy File Format            ds.write_numpy()
Spark Dataframe              ds.to_spark()
Dask Dataframe               ds.to_dask()
Modin Dataframe              ds.to_modin()
MARS Dataframe               ds.to_mars()                               (todo)
Arrow Table Objects          ds.to_arrow()
Arrow Table Iterator         ds.iter_batches(batch_format="pyarrow")
Pandas Dataframe Objects     ds.to_pandas()
NumPy ndarray Objects        ds.to_numpy()
Pandas Dataframe Iterator    ds.iter_batches(batch_format="pandas")
PyTorch Iterable Dataset     ds.to_torch()
TensorFlow Iterable Dataset  ds.to_tf()
Custom Datasource            ds.write_datasource()

Creating Datasets

Get started by creating Datasets from synthetic data using ray.data.range() and ray.data.from_items(). Datasets can hold either plain Python objects (the schema is a Python type) or Arrow records (the schema is Arrow).

import ray

# Create a Dataset of Python objects.
ds = ray.data.range(10000)
# -> Dataset(num_blocks=200, num_rows=10000, schema=<class 'int'>)

ds.take(5)
# -> [0, 1, 2, 3, 4]

ds.count()
# -> 10000

# Create a Dataset of Arrow records.
ds = ray.data.from_items([{"col1": i, "col2": str(i)} for i in range(10000)])
# -> Dataset(num_blocks=200, num_rows=10000, schema={col1: int64, col2: string})

ds.show(5)
# -> ArrowRow({'col1': 0, 'col2': '0'})
# -> ArrowRow({'col1': 1, 'col2': '1'})
# -> ArrowRow({'col1': 2, 'col2': '2'})
# -> ArrowRow({'col1': 3, 'col2': '3'})
# -> ArrowRow({'col1': 4, 'col2': '4'})

ds.schema()
# -> col1: int64
# -> col2: string

Datasets can be created from files on local disk or remote datasources such as S3. Any filesystem supported by pyarrow can be used to specify file locations:

# Read a directory of files in remote storage.
ds = ray.data.read_csv("s3://bucket/path")

# Read multiple local files.
ds = ray.data.read_csv(["/path/to/file1", "/path/to/file2"])

# Read multiple directories.
ds = ray.data.read_csv(["s3://bucket/path1", "s3://bucket/path2"])

Finally, you can create a Dataset from existing data in the Ray object store or Ray-compatible distributed DataFrames:

import pandas as pd
import dask.dataframe as dd

# Create a Dataset from a list of Pandas DataFrame objects.
pdf = pd.DataFrame({"one": [1, 2, 3], "two": ["a", "b", "c"]})
ds = ray.data.from_pandas([ray.put(pdf)])

# Create a Dataset from a Dask-on-Ray DataFrame.
dask_df = dd.from_pandas(pdf, npartitions=10)
ds = ray.data.from_dask(dask_df)

Saving Datasets

Datasets can be written to local or remote storage using .write_csv(), .write_json(), and .write_parquet().

# Write to csv files in /tmp/output.
ray.data.range(10000).write_csv("/tmp/output")
# -> /tmp/output/data0.csv, /tmp/output/data1.csv, ...

# Use repartition to control the number of output files:
ray.data.range(10000).repartition(1).write_csv("/tmp/output2")
# -> /tmp/output2/data0.csv

You can also convert a Dataset to Ray-compatible distributed DataFrames:

# Convert a Ray Dataset into a Dask-on-Ray DataFrame.
dask_df = ds.to_dask()

Transforming Datasets

Datasets can be transformed in parallel using .map(). Transformations are executed eagerly and block until the operation is finished. Datasets also support .filter() and .flat_map().

ds = ray.data.range(10000)
ds = ds.map(lambda x: x * 2)
# -> Map Progress: 100%|████████████████████| 200/200 [00:00<00:00, 1123.54it/s]
# -> Dataset(num_blocks=200, num_rows=10000, schema=<class 'int'>)
ds.take(5)
# -> [0, 2, 4, 6, 8]

ds.filter(lambda x: x > 5).take(5)
# -> Map Progress: 100%|████████████████████| 200/200 [00:00<00:00, 1859.63it/s]
# -> [6, 8, 10, 12, 14]

ds.flat_map(lambda x: [x, -x]).take(5)
# -> Map Progress: 100%|████████████████████| 200/200 [00:00<00:00, 1568.10it/s]
# -> [0, 0, 2, -2, 4]

To take advantage of vectorized functions, use .map_batches(). Note that you can also implement filter and flat_map using .map_batches(), since your map function can return an output batch of any size.

ds = ray.data.range_arrow(10000)
ds = ds.map_batches(
    lambda df: df.applymap(lambda x: x * 2), batch_format="pandas")
# -> Map Progress: 100%|████████████████████| 200/200 [00:00<00:00, 1927.62it/s]
ds.take(5)
# -> [ArrowRow({'value': 0}), ArrowRow({'value': 2}), ...]
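
As noted above, a filter can also be expressed with .map_batches(), since the returned batch may contain any number of rows. A minimal sketch:

ds = ray.data.range_arrow(10000)
# Keep only even values; returning a smaller pandas batch than the input is allowed.
evens = ds.map_batches(
    lambda df: df[df["value"] % 2 == 0], batch_format="pandas")
evens.take(3)
# -> [ArrowRow({'value': 0}), ArrowRow({'value': 2}), ArrowRow({'value': 4})]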

By default, transformations are executed using Ray tasks. For transformations that require setup, specify compute="actors" and Ray will use an autoscaling actor pool to execute your transforms instead. The following is an end-to-end example of reading, transforming, and saving batch inference results using Datasets:

# Example of GPU batch inference on an ImageNet model.
def preprocess(image: bytes) -> bytes:
    # Placeholder preprocessing step: decode / resize the raw image bytes here.
    return image

class BatchInferModel:
    def __init__(self):
        self.model = ImageNetModel()
    def __call__(self, batch: pd.DataFrame) -> pd.DataFrame:
        return self.model(batch)

ds = ray.data.read_binary_files("s3://bucket/image-dir")

# Preprocess the data.
ds = ds.map(preprocess)
# -> Map Progress: 100%|████████████████████| 200/200 [00:00<00:00, 1123.54it/s]

# Apply GPU batch inference with actors, and assign each actor a GPU using
# ``num_gpus=1`` (any Ray remote decorator argument can be used here).
ds = ds.map_batches(BatchInferModel, compute="actors", batch_size=256, num_gpus=1)
# -> Map Progress (16 actors 4 pending): 100%|██████| 200/200 [00:07, 27.60it/s]

# Save the results.
ds.repartition(1).write_json("s3://bucket/inference-results")

Exchanging Datasets

Datasets can be passed to Ray tasks or actors and read with .iter_batches() or .iter_rows(). This does not incur a copy, since the blocks of the Dataset are passed by reference as Ray objects:

@ray.remote
def consume(data: ray.data.Dataset[int]) -> int:
    num_batches = 0
    for batch in data.iter_batches():
        num_batches += 1
    return num_batches

ds = ray.data.range(10000)
ray.get(consume.remote(ds))
# -> 200
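
Rows can also be consumed one at a time with .iter_rows():

@ray.remote
def consume_rows(data: ray.data.Dataset[int]) -> int:
    # Iterate over individual rows instead of batches.
    count = 0
    for _ in data.iter_rows():
        count += 1
    return count

ray.get(consume_rows.remote(ray.data.range(10000)))
# -> 10000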

Datasets can be split into disjoint sub-datasets. Locality-aware splitting is supported if you pass in a list of actor handles to the split() function along with the number of desired splits. This is a common pattern for loading and splitting data across distributed training actors:

@ray.remote(num_gpus=1)
class Worker:
    def __init__(self, rank: int):
        pass

    def train(self, shard: ray.data.Dataset[int]) -> int:
        for batch in shard.iter_batches(batch_size=256):
            pass
        return shard.count()

workers = [Worker.remote(i) for i in range(16)]
# -> [Actor(Worker, ...), Actor(Worker, ...), ...]

ds = ray.data.range(10000)
# -> Dataset(num_blocks=200, num_rows=10000, schema=<class 'int'>)

shards = ds.split(n=16, locality_hints=workers)
# -> [Dataset(num_blocks=13, num_rows=650, schema=<class 'int'>),
#     Dataset(num_blocks=13, num_rows=650, schema=<class 'int'>), ...]

ray.get([w.train.remote(s) for s in shards])
# -> [650, 650, ...]

Custom Datasources

Datasets can read and write in parallel to custom datasources defined in Python.

# Read from a custom datasource.
ds = ray.data.read_datasource(YourCustomDatasource(), **read_args)

# Write to a custom datasource.
ds.write_datasource(YourCustomDatasource(), **write_args)

Contributing

Contributions to Datasets are welcome! There are many potential improvements, including:

  • Supporting more datasources and transforms.

  • Integration with more ecosystem libraries.

  • Adding features that require partitioning, such as groupby() and join().

  • Performance optimizations.