Getting Started

A Ray Dataset is a distributed data collection. It holds a list of Ray object references pointing to distributed data blocks, and provides APIs for distributed data loading and processing. Each block holds an ordered collection of items, stored either as an Arrow table (when the Dataset is created from or transformed into tabular or tensor data) or as a Python list (for non-tabular Python objects).

In this tutorial you will learn how to:

  • Create and save a Ray Dataset.

  • Transform a Dataset.

  • Pass a Dataset to Ray tasks/actors and access the data inside.

Tip

Run pip install "ray[data]" to get started!

Creating and Saving Datasets

You can create a Dataset from Python objects. These objects can be held inside the Dataset as plain Python objects (in which case the schema is a Python type), or as Arrow records (in which case the schema is Arrow).

import ray

# Create a Dataset of Python objects.
ds = ray.data.range(10000)
# -> Dataset(num_blocks=200, num_rows=10000, schema=<class 'int'>)

ds.take(5)
# -> [0, 1, 2, 3, 4]

ds.schema()
# <class 'int'>

# Create a Dataset from Python objects, which are held as Arrow records.
ds = ray.data.from_items([
        {"sepal.length": 5.1, "sepal.width": 3.5,
         "petal.length": 1.4, "petal.width": 0.2, "variety": "Setosa"},
        {"sepal.length": 4.9, "sepal.width": 3.0,
         "petal.length": 1.4, "petal.width": 0.2, "variety": "Setosa"},
        {"sepal.length": 4.7, "sepal.width": 3.2,
         "petal.length": 1.3, "petal.width": 0.2, "variety": "Setosa"},
     ])
# Dataset(num_blocks=3, num_rows=3,
#         schema={sepal.length: float64, sepal.width: float64,
#                 petal.length: float64, petal.width: float64, variety: object})

ds.show()
# -> {'sepal.length': 5.1, 'sepal.width': 3.5,
#     'petal.length': 1.4, 'petal.width': 0.2, 'variety': 'Setosa'}
# -> {'sepal.length': 4.9, 'sepal.width': 3.0,
#     'petal.length': 1.4, 'petal.width': 0.2, 'variety': 'Setosa'}
# -> {'sepal.length': 4.7, 'sepal.width': 3.2,
#     'petal.length': 1.3, 'petal.width': 0.2, 'variety': 'Setosa'}

ds.schema()
# -> sepal.length: double
# -> sepal.width: double
# -> petal.length: double
# -> petal.width: double
# -> variety: string

Datasets can also be created from files on local disk or from remote data sources such as S3. Any filesystem supported by pyarrow can be used to specify file locations. See Creating Datasets for more details.

# Create from CSV.
# Tip: "example://" is a convenient protocol to access the
# python/ray/data/examples/data directory.
ds = ray.data.read_csv("example://iris.csv")
# Dataset(num_blocks=1, num_rows=150,
#         schema={sepal.length: float64, sepal.width: float64,
#                 petal.length: float64, petal.width: float64, variety: object})

# Create from Parquet.
ds = ray.data.read_parquet("example://iris.parquet")
# Dataset(num_blocks=1, num_rows=150,
#         schema={sepal.length: float64, sepal.width: float64,
#                 petal.length: float64, petal.width: float64, variety: object})
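
The same readers also accept remote paths. A minimal sketch, assuming a hypothetical S3 bucket that holds the same Parquet file (bucket and key are placeholders):

# Create from Parquet files in S3 (bucket and key are placeholders).
# Credentials are resolved from the environment; alternatively an explicit
# pyarrow filesystem object can be passed via the filesystem argument.
remote_ds = ray.data.read_parquet("s3://my-bucket/iris.parquet")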

Once you have a Dataset (potentially after transformation), you can save it to local or remote storage in the desired format, using .write_csv(), .write_json(), or .write_parquet(). See Saving Datasets for more details.

# Write to Parquet files in /tmp/iris.
ds.write_parquet("/tmp/iris")
# -> /tmp/iris/data_000000.parquet

# Use repartition to control the number of output files:
ds.repartition(2).write_parquet("/tmp/iris2")
# -> /tmp/iris2/data_000000.parquet
# -> /tmp/iris2/data_000001.parquet
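
The other writers follow the same pattern; a short sketch writing the same Dataset as CSV and JSON (output directories are arbitrary):

# Write to CSV files in /tmp/iris_csv.
ds.write_csv("/tmp/iris_csv")

# Write to JSON files in /tmp/iris_json.
ds.write_json("/tmp/iris_json")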

See the Creating Datasets and Saving Datasets guides for more details on how to create and save datasets.

Transforming Datasets

Once you have a Dataset, you can transform it by applying a user-defined function, which produces another Dataset. Under the hood, the transformation is executed in parallel for performance at scale.

import pandas

# Create 10 blocks for parallelism.
ds = ds.repartition(10)
# Dataset(num_blocks=10, num_rows=150,
#         schema={sepal.length: float64, sepal.width: float64,
#                 petal.length: float64, petal.width: float64, variety: object})

# Find rows with sepal.length < 5.5 and petal.length > 3.5.
def transform_batch(df: pandas.DataFrame) -> pandas.DataFrame:
    return df[(df["sepal.length"] < 5.5) & (df["petal.length"] > 3.5)]

transformed_ds = ds.map_batches(transform_batch)
# Dataset(num_blocks=10, num_rows=3,
#         schema={sepal.length: float64, sepal.width: float64,
#                 petal.length: float64, petal.width: float64, variety: object})

transformed_ds.show()
# -> {'sepal.length': 5.2, 'sepal.width': 2.7,
#     'petal.length': 3.9, 'petal.width': 1.4, 'variety': 'Versicolor'}
# -> {'sepal.length': 5.4, 'sepal.width': 3.0,
#     'petal.length': 4.5, 'petal.width': 1.5, 'variety': 'Versicolor'}
# -> {'sepal.length': 4.9, 'sepal.width': 2.5,
#     'petal.length': 4.5, 'petal.width': 1.7, 'variety': 'Virginica'}

Tip

Datasets also provides the convenience transformation methods ds.map(), ds.flat_map(), and ds.filter(), which are not vectorized (slower than ds.map_batches()), but may be useful for development.
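
For example, a minimal per-row sketch using ds.filter() on the iris Dataset from above (the 6.0 cm threshold is arbitrary):

# Keep only rows with a long sepal. Rows of an Arrow-backed Dataset
# can be indexed like dicts.
long_sepal_ds = ds.filter(lambda row: row["sepal.length"] > 6.0)
long_sepal_ds.count()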

These transformations are composable. You can further apply transformations on the output Dataset, forming a chain of transformations to express more complex logic.

By default, transformations are executed using Ray tasks. For transformations that require setup, you may want to use Ray actors instead by specifying compute=ray.data.ActorPoolStrategy(min, max); Ray will then use an autoscaling actor pool of min to max actors to execute your transforms. This caches the stateful setup at actor creation time, which is particularly useful if the setup is expensive.
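
A minimal sketch of the actor-based pattern, assuming a toy stateful class in place of a real expensive setup (for example, loading a model): the class is constructed once per actor, and __call__ is applied to each batch.

class StatefulBatchTransform:
    def __init__(self):
        # Expensive setup would run here once per actor
        # (e.g. loading a model); a trivial threshold stands in.
        self.threshold = 5.5

    def __call__(self, df: pandas.DataFrame) -> pandas.DataFrame:
        # The cached state is reused for every batch.
        return df[df["sepal.length"] < self.threshold]

# Execute with an autoscaling pool of 2 to 8 actors.
result_ds = ds.map_batches(
    StatefulBatchTransform, compute=ray.data.ActorPoolStrategy(2, 8))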

See the Transforming Datasets guide for an in-depth guide on transforming datasets.

Accessing and Exchanging Datasets

Datasets can be passed to Ray tasks or actors and accessed with .iter_batches() or .iter_rows(). This does not incur a copy, since the blocks of the Dataset are passed by reference as Ray objects:

@ray.remote
def consume(data: ray.data.Dataset) -> int:
    num_batches = 0
    for batch in data.iter_batches(batch_size=10):
        num_batches += 1
    return num_batches

ray.get(consume.remote(ds))
# -> 15

Datasets can be split into disjoint sub-datasets. Locality-aware splitting is supported if you pass a list of actor handles to the split() function (via its locality_hints argument) along with the number of desired splits. This is a common pattern for loading and splitting data between distributed training actors:

@ray.remote
class Worker:
    def __init__(self, rank: int):
        pass

    def train(self, shard: ray.data.Dataset) -> int:
        for batch in shard.iter_batches(batch_size=256):
            pass
        return shard.count()

workers = [Worker.remote(i) for i in range(4)]
# -> [Actor(Worker, ...), Actor(Worker, ...), ...]

shards = ds.split(n=4, locality_hints=workers)
# -> [
#       Dataset(num_blocks=3, num_rows=45,
#               schema={sepal.length: double, sepal.width: double,
#                       petal.length: double, petal.width: double, variety: string}),
#       Dataset(num_blocks=3, num_rows=45,
#               schema={sepal.length: double, sepal.width: double,
#                       petal.length: double, petal.width: double, variety: string}),
#       Dataset(num_blocks=2, num_rows=30,
#               schema={sepal.length: double, sepal.width: double,
#                       petal.length: double, petal.width: double, variety: string}),
#       Dataset(num_blocks=2, num_rows=30,
#               schema={sepal.length: double, sepal.width: double,
#                       petal.length: double, petal.width: double, variety: string}),
#    ]

ray.get([w.train.remote(s) for w, s in zip(workers, shards)])
# -> [45, 45, 30, 30]

See the Consuming Datasets guide for an in-depth guide on accessing and exchanging datasets.