Getting Started#

A Ray Dataset is a distributed data collection. It holds references to distributed data blocks and exposes APIs for loading and processing data.

Install Ray Data#

To install Ray Data, run:

$ pip install 'ray[data]'

To learn more about installing Ray and its libraries, read Installing Ray.

Create a dataset#

Create datasets from on-disk files, Python objects, and cloud storage services like S3. Ray reads from any filesystem supported by Arrow.

import ray

dataset = ray.data.read_csv("s3://anonymous@air-example-data/iris.csv")

dataset.show(limit=1)
{'sepal length (cm)': 5.1, 'sepal width (cm)': 3.5, 'petal length (cm)': 1.4, 'petal width (cm)': 0.2, 'target': 0}
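
The example above reads a CSV file from cloud storage. Datasets can also be built from in-memory Python objects; here is a minimal sketch using ray.data.from_items() with illustrative records that are not part of the Iris example:

# Build a small dataset from in-memory Python objects (illustrative records).
items_dataset = ray.data.from_items([{"value": i} for i in range(3)])
items_dataset.show()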

To learn more about creating datasets, read Creating datasets.

Transform the dataset#

Apply user-defined functions (UDFs) to transform datasets. Ray executes transformations in parallel for performance at scale.

import pandas as pd

# Find rows with sepal length < 5.5 and petal length > 3.5.
def transform_batch(df: pd.DataFrame) -> pd.DataFrame:
    return df[(df["sepal length (cm)"] < 5.5) & (df["petal length (cm)"] > 3.5)]

transformed_dataset = dataset.map_batches(transform_batch)
print(transformed_dataset)
MapBatches(transform_batch)
+- Dataset(
      num_blocks=...,
      num_rows=150,
      schema={
         sepal length (cm): double,
         sepal width (cm): double,
         petal length (cm): double,
         petal width (cm): double,
         target: int64
      }
   )
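
The UDF above receives pandas DataFrame batches. As a hedged alternative sketch, assuming a Ray version that accepts the batch_format argument of map_batches(), the same filter can be written over dict-of-NumPy batches:

from typing import Dict

import numpy as np

# The same filter expressed over NumPy batches; batch_format="numpy" requests
# each batch as a dict mapping column names to NumPy arrays.
def transform_numpy_batch(batch: Dict[str, np.ndarray]) -> Dict[str, np.ndarray]:
    mask = (batch["sepal length (cm)"] < 5.5) & (batch["petal length (cm)"] > 3.5)
    return {name: column[mask] for name, column in batch.items()}

numpy_transformed = dataset.map_batches(transform_numpy_batch, batch_format="numpy")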

To learn more about transforming datasets, read Transforming datasets.

Consume the dataset#

Pass datasets to Ray tasks or actors, and access records with methods like iter_batches().

batches = transformed_dataset.iter_batches(batch_size=8)
print(next(iter(batches)))
   sepal length (cm)  ...  target
0                5.2  ...       1
1                5.4  ...       1
2                4.9  ...       2

[3 rows x 5 columns]
@ray.remote
def consume(dataset: ray.data.Dataset) -> int:
    num_batches = 0
    for batch in dataset.iter_batches(batch_size=8):
        num_batches += 1
    return num_batches

ray.get(consume.remote(transformed_dataset))
@ray.remote
class Worker:

    def train(self, shard: ray.data.Dataset) -> int:
        # Iterate over the assigned shard; a real trainer would fit a model here.
        for batch in shard.iter_batches(batch_size=8):
            pass
        return shard.count()

workers = [Worker.remote() for _ in range(4)]
shards = transformed_dataset.split(n=4, locality_hints=workers)
ray.get([w.train.remote(s) for w, s in zip(workers, shards)])
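
Besides iterating over batches in parallel, datasets expose simple inspection methods; a brief sketch using take() and count(), which trigger execution of the underlying pipeline:

# Peek at a couple of records without iterating over the whole dataset.
print(transformed_dataset.take(2))

# Count the rows that passed the filter.
print(transformed_dataset.count())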

To learn more about consuming datasets, read Consuming datasets.

Save the dataset#

Call methods like write_parquet() to save datasets to local or remote filesystems.

import os

transformed_dataset.write_parquet("iris")

print(os.listdir("iris"))
['..._000000.parquet']
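
Other formats and remote filesystems follow the same pattern; a hedged sketch, where the S3 path is a placeholder for a bucket you control:

# Write the same dataset as CSV files.
transformed_dataset.write_csv("iris_csv")

# Writing to cloud storage uses the same API with a remote URI (placeholder path).
# transformed_dataset.write_csv("s3://your-bucket/iris_csv")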

To learn more about saving datasets, read Saving datasets.

Next Steps#

  • To monitor how your application is running, use the Ray dashboard.