Datasets: Distributed Data Loading and Compute


Datasets is available as beta in Ray 1.8+. Please file feature requests and bug reports on GitHub Issues or join the discussion on the Ray Slack.

Ray Datasets are the standard way to load and exchange data in Ray libraries and applications. Datasets provide basic distributed data transformations such as map, filter, and repartition, and are compatible with a variety of file formats, datasources, and distributed frameworks.


Data Loading for ML Training

Ray Datasets are designed to load and preprocess data for distributed ML training pipelines. Compared to other loading solutions, Datasets is more flexible (e.g., can express higher-quality per-epoch global shuffles) and provides higher overall performance.

Datasets is not intended as a replacement for more general data processing systems. Its utility is as the last-mile bridge from ETL pipeline outputs to distributed applications and libraries in Ray:


Ray-integrated DataFrame libraries can also be used seamlessly with Datasets, enabling a full data-to-ML pipeline to run entirely within Ray without materializing data to external storage:


See the Talks section for more Dataset ML use cases and benchmarks.

General Parallel Compute

Beyond data loading, Datasets simplifies general-purpose parallel GPU/CPU compute in Ray (e.g., for GPU batch inference). Datasets provides a higher-level API over Ray tasks and actors for such embarrassingly parallel compute, internally handling operations like batching, pipelining, and memory management.
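To illustrate the pattern that Datasets automates here, the following is a minimal pure-Python sketch (not the Ray API) of embarrassingly parallel batch compute: partition the input into batches, apply a function to each batch independently, and concatenate the results. In Ray, each per-batch call could run as a separate task or actor method.

```python
from typing import Callable, List, TypeVar

T = TypeVar("T")
U = TypeVar("U")

def map_in_batches(items: List[T],
                   fn: Callable[[List[T]], List[U]],
                   batch_size: int) -> List[U]:
    """Apply fn to fixed-size batches; each call is independent of the
    others, so the batches could be dispatched to parallel workers."""
    out: List[U] = []
    for start in range(0, len(items), batch_size):
        batch = items[start:start + batch_size]
        out.extend(fn(batch))
    return out

# Double every element, 3 items per batch.
result = map_in_batches(list(range(10)), lambda b: [x * 2 for x in b], 3)
print(result)  # -> [0, 2, 4, 6, 8, 10, 12, 14, 16, 18]
```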


Since it is built on Ray, Datasets can leverage the full functionality of Ray’s distributed scheduler, e.g., using actors for optimizing setup time and GPU scheduling via the num_gpus argument.


Ray Datasets implement Distributed Arrow. A Dataset consists of a list of Ray object references to blocks. Each block holds a set of items in either an Arrow table or a Python list (for Arrow incompatible objects). Having multiple blocks in a dataset allows for parallel transformation and ingest of the data (e.g., into Ray Train for ML training).

The following figure visualizes a Dataset that has three Arrow table blocks, with each block holding 1000 rows:
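Conceptually, the structure in the figure can be modeled as follows. This is a simplified pure-Python sketch of the idea, not Ray's actual internals: in a real Dataset the blocks are Ray ObjectRefs to Arrow tables, so per-block work like counting rows can run in parallel across the cluster.

```python
from dataclasses import dataclass
from typing import List

@dataclass
class Block:
    rows: List[int]  # stand-in for an Arrow table or Python list

@dataclass
class SketchDataset:
    blocks: List[Block]  # in Ray, these would be ObjectRefs, not inline data

    def count(self) -> int:
        # Each block's row count is independent, so this sum
        # could be computed by parallel tasks in Ray.
        return sum(len(b.rows) for b in self.blocks)

# Three blocks of 1000 rows each, as in the figure.
ds = SketchDataset([Block(list(range(1000))) for _ in range(3)])
print(ds.count())  # -> 3000
```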


Since a Ray Dataset is just a list of Ray object references, it can be freely passed between Ray tasks, actors, and libraries like any other object reference. This flexibility is a unique characteristic of Ray Datasets.

Compared to Spark RDDs and Dask Bags, Datasets offers a more basic set of features, and executes operations eagerly for simplicity. It is intended that users cast Datasets into more featureful dataframe types (e.g., ds.to_dask()) for advanced operations.

Datasource Compatibility Matrices

Input compatibility matrix

Input Type                  Read API
CSV File Format             ray.data.read_csv()
JSON File Format            ray.data.read_json()
Parquet File Format         ray.data.read_parquet()
Numpy File Format           ray.data.read_numpy()
Text Files                  ray.data.read_text()
Binary Files                ray.data.read_binary_files()
Python Objects              ray.data.from_items()
Spark Dataframe             ray.data.from_spark()
Dask Dataframe              ray.data.from_dask()
Modin Dataframe             ray.data.from_modin()
MARS Dataframe              ray.data.from_mars()
Pandas Dataframe Objects    ray.data.from_pandas()
NumPy ndarray Objects       ray.data.from_numpy()
Arrow Table Objects         ray.data.from_arrow()
Custom Datasource           ray.data.read_datasource()

Output compatibility matrix

Output Type                  Dataset API
CSV File Format              ds.write_csv()
JSON File Format             ds.write_json()
Parquet File Format          ds.write_parquet()
Numpy File Format            ds.write_numpy()
Spark Dataframe              ds.to_spark()
Dask Dataframe               ds.to_dask()
Modin Dataframe              ds.to_modin()
MARS Dataframe               ds.to_mars()
Arrow Table Objects          ds.to_arrow_refs()
Arrow Table Iterator         ds.iter_batches(batch_format="pyarrow")
Single Pandas Dataframe      ds.to_pandas()
Pandas Dataframe Objects     ds.to_pandas_refs()
NumPy ndarray Objects        ds.to_numpy_refs()
Pandas Dataframe Iterator    ds.iter_batches(batch_format="pandas")
PyTorch Iterable Dataset     ds.to_torch()
TensorFlow Iterable Dataset  ds.to_tf()
Custom Datasource            ds.write_datasource()


Creating Datasets


Run pip install "ray[data]" to get started!

Get started by creating Datasets from synthetic data using ray.data.range() and ray.data.from_items(). Datasets can hold either plain Python objects (schema is a Python type) or Arrow records (schema is Arrow).

import ray

# Create a Dataset of Python objects.
ds = ray.data.range(10000)
# -> Dataset(num_blocks=200, num_rows=10000, schema=<class 'int'>)

ds.take(5)
# -> [0, 1, 2, 3, 4]

ds.count()
# -> 10000

# Create a Dataset of Arrow records.
ds = ray.data.from_items([{"col1": i, "col2": str(i)} for i in range(10000)])
# -> Dataset(num_blocks=200, num_rows=10000, schema={col1: int64, col2: string})

ds.show(5)
# -> {'col1': 0, 'col2': '0'}
# -> {'col1': 1, 'col2': '1'}
# -> {'col1': 2, 'col2': '2'}
# -> {'col1': 3, 'col2': '3'}
# -> {'col1': 4, 'col2': '4'}

ds.schema()
# -> col1: int64
# -> col2: string

Datasets can be created from files on local disk or remote datasources such as S3. Any filesystem supported by pyarrow can be used to specify file locations:

# Read a directory of files in remote storage.
ds = ray.data.read_csv("s3://bucket/path")

# Read multiple local files.
ds = ray.data.read_csv(["/path/to/file1", "/path/to/file2"])

# Read multiple directories.
ds = ray.data.read_csv(["s3://bucket/path1", "s3://bucket/path2"])

Finally, you can create a Dataset from existing data in the Ray object store or Ray-compatible distributed DataFrames:

import pandas as pd
import dask.dataframe as dd

# Create a Dataset from a list of Pandas DataFrame objects.
pdf = pd.DataFrame({"one": [1, 2, 3], "two": ["a", "b", "c"]})
ds = ray.data.from_pandas([pdf])

# Create a Dataset from a Dask-on-Ray DataFrame.
dask_df = dd.from_pandas(pdf, npartitions=10)
ds = ray.data.from_dask(dask_df)

Saving Datasets

Datasets can be written to local or remote storage using .write_csv(), .write_json(), and .write_parquet().

# Write to csv files in /tmp/output.
ds.write_csv("/tmp/output")
# -> /tmp/output/data0.csv, /tmp/output/data1.csv, ...

# Use repartition to control the number of output files:
ds.repartition(1).write_csv("/tmp/output2")
# -> /tmp/output2/data0.csv

You can also convert a Dataset to Ray-compatible distributed DataFrames:

# Convert a Ray Dataset into a Dask-on-Ray DataFrame.
dask_df = ds.to_dask()

Transforming Datasets

Datasets can be transformed in parallel using .map(). Transformations are executed eagerly and block until the operation is finished. Datasets also supports .filter() and .flat_map().

ds = ray.data.range(10000)
ds = x: x * 2)
# -> Map Progress: 100%|████████████████████| 200/200 [00:00<00:00, 1123.54it/s]
# -> Dataset(num_blocks=200, num_rows=10000, schema=<class 'int'>)

ds.take(5)
# -> [0, 2, 4, 6, 8]

ds.filter(lambda x: x > 5).take(5)
# -> Map Progress: 100%|████████████████████| 200/200 [00:00<00:00, 1859.63it/s]
# -> [6, 8, 10, 12, 14]

ds.flat_map(lambda x: [x, -x]).take(5)
# -> Map Progress: 100%|████████████████████| 200/200 [00:00<00:00, 1568.10it/s]
# -> [0, 0, 2, -2, 4]

To take advantage of vectorized functions, use .map_batches(). Note that you can also implement filter and flat_map using .map_batches(), since your map function can return an output batch of any size.

ds = ray.data.range_arrow(10000)
ds = ds.map_batches(
    lambda df: df.applymap(lambda x: x * 2), batch_format="pandas")
# -> Map Progress: 100%|████████████████████| 200/200 [00:00<00:00, 1927.62it/s]

ds.take(5)
# -> [{'value': 0}, {'value': 2}, ...]
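Because a batch function may return an output batch of any size, filter semantics fall out of the batched map naturally. Here is a minimal pure-Python sketch of that idea using plain lists rather than the Ray API:

```python
def filter_batch(batch):
    # Returning fewer items than we received implements filter.
    return [x for x in batch if x > 5]

# Three input batches; mapping filter_batch over each batch and
# flattening yields the same result as a row-wise filter.
batches = [[0, 1, 2, 3], [4, 5, 6, 7], [8, 9, 10, 11]]
result = [x for b in batches for x in filter_batch(b)]
print(result)  # -> [6, 7, 8, 9, 10, 11]
```

Flat-map works the same way in reverse: the batch function returns more items than it received.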

By default, transformations are executed using Ray tasks. For transformations that require setup, specify compute="actors" and Ray will use an autoscaling actor pool to execute your transforms instead. The following is an end-to-end example of reading, transforming, and saving batch inference results using Datasets:

# Example of GPU batch inference on an ImageNet model.
def preprocess(image: bytes) -> bytes:
    return image

class BatchInferModel:
    def __init__(self):
        self.model = ImageNetModel()
    def __call__(self, batch: pd.DataFrame) -> pd.DataFrame:
        return self.model(batch)

ds = ray.data.read_binary_files("s3://bucket/image-dir")

# Preprocess the data.
ds =
# -> Map Progress: 100%|████████████████████| 200/200 [00:00<00:00, 1123.54it/s]

# Apply GPU batch inference with actors, and assign each actor a GPU using
# ``num_gpus=1`` (any Ray remote decorator argument can be used here).
ds = ds.map_batches(BatchInferModel, compute="actors", batch_size=256, num_gpus=1)
# -> Map Progress (16 actors 4 pending): 100%|██████| 200/200 [00:07, 27.60it/s]

# Save the results.
ds.write_parquet("s3://bucket/output")

Exchanging datasets

Datasets can be passed to Ray tasks or actors and read with .iter_batches() or .iter_rows(). This does not incur a copy, since the blocks of the Dataset are passed by reference as Ray objects:

from import Dataset

@ray.remote
def consume(data: Dataset[int]) -> int:
    num_batches = 0
    for batch in data.iter_batches():
        num_batches += 1
    return num_batches

ds = ray.data.range(10000)
ray.get(consume.remote(ds))
# -> 200

Datasets can be split up into disjoint sub-datasets. Locality-aware splitting is supported if you pass in a list of actor handles to the split() function along with the number of desired splits. This is a common pattern useful for loading and splitting data between distributed training actors:

@ray.remote
class Worker:
    def __init__(self, rank: int):
        self.rank = rank

    def train(self, shard:[int]) -> int:
        for batch in shard.iter_batches(batch_size=256):
            pass  # process the batch here
        return shard.count()

workers = [Worker.remote(i) for i in range(16)]
# -> [Actor(Worker, ...), Actor(Worker, ...), ...]

ds = ray.data.range(10000)
# -> Dataset(num_blocks=200, num_rows=10000, schema=<class 'int'>)

shards = ds.split(n=16, locality_hints=workers)
# -> [Dataset(num_blocks=13, num_rows=650, schema=<class 'int'>),
#     Dataset(num_blocks=13, num_rows=650, schema=<class 'int'>), ...]

ray.get([w.train.remote(s) for s in shards])
# -> [650, 650, ...]
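Ignoring locality hints and block balancing, the core of split(n) can be sketched in pure Python (this is an illustration of the idea, not Ray's implementation): distribute the Dataset's blocks round-robin into n disjoint shards.

```python
from typing import List

def split_blocks(blocks: List, n: int) -> List[List]:
    """Round-robin a list of blocks into n disjoint shards."""
    shards: List[List] = [[] for _ in range(n)]
    for i, block in enumerate(blocks):
        shards[i % n].append(block)
    return shards

# 200 blocks split across 16 workers -> shards of 13 or 12 blocks,
# matching the block counts shown in the example above.
shards = split_blocks(list(range(200)), 16)
print([len(s) for s in shards][:3])  # -> [13, 13, 13]
```

Ray's actual split() additionally uses the locality_hints actor handles to co-locate each shard's blocks with the worker that will consume them.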

Custom datasources

Datasets can read and write in parallel to custom datasources defined in Python.

# Read from a custom datasource.
ds = ray.data.read_datasource(YourCustomDatasource(), **read_args)

# Write to a custom datasource.
ds.write_datasource(YourCustomDatasource(), **write_args)

Talks and Materials


Contributing

Contributions to Datasets are welcome! There are many potential improvements, including:

  • Supporting more datasources and transforms.

  • Integration with more ecosystem libraries.

  • Adding features that require partitioning, such as groupby() and join().

  • Performance optimizations.