Getting Started¶
In this tutorial you will learn how to:
- Create and save a Ray Dataset.
- Transform a Dataset and pass it into other Ray tasks.
- Create a Ray DatasetPipeline and run transformations on it.
Dataset Quick Start¶
Ray Datasets implements Distributed Arrow. A Dataset consists of a list of Ray object references to blocks. Each block holds a set of items in either an Arrow table or a Python list (for Arrow incompatible objects). Let’s start by creating a Dataset.
Creating Datasets¶
Tip: Run pip install "ray[data]" to get started!
You can get started by creating Datasets from synthetic data using ray.data.range() and ray.data.from_items().
Datasets can hold either plain Python objects (i.e., their schema is a Python type) or Arrow records (in which case their schema is Arrow).
import ray
# Create a Dataset of Python objects.
ds = ray.data.range(10000)
# -> Dataset(num_blocks=200, num_rows=10000, schema=<class 'int'>)
ds.take(5)
# -> [0, 1, 2, 3, 4]
ds.count()
# -> 10000
# Create a Dataset of Arrow records.
ds = ray.data.from_items([{"col1": i, "col2": str(i)} for i in range(10000)])
# -> Dataset(num_blocks=200, num_rows=10000, schema={col1: int64, col2: string})
ds.show(5)
# -> {'col1': 0, 'col2': '0'}
# -> {'col1': 1, 'col2': '1'}
# -> {'col1': 2, 'col2': '2'}
# -> {'col1': 3, 'col2': '3'}
# -> {'col1': 4, 'col2': '4'}
ds.schema()
# -> col1: int64
# -> col2: string
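Because a Dataset is just a list of Ray object references to blocks, you can also inspect its block layout directly (a brief sketch; the count shown assumes the default parallelism of 200 seen above):
# Inspect how many blocks back the Dataset.
ds = ray.data.range(10000)
ds.num_blocks()
# -> 200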
Datasets can be created from files on local disk or remote datasources such as S3. Any filesystem supported by pyarrow can be used to specify file locations:
# Read a directory of files in remote storage.
ds = ray.data.read_csv("s3://bucket/path")
# Read multiple local files.
ds = ray.data.read_csv(["/path/to/file1", "/path/to/file2"])
# Read multiple directories.
ds = ray.data.read_csv(["s3://bucket/path1", "s3://bucket/path2"])
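Since any pyarrow-supported filesystem works, you can also construct a filesystem object explicitly and pass it in, for example to supply a region or credentials (a sketch; the bucket, path, and region below are placeholders):
import pyarrow.fs
# Read CSV files through an explicitly constructed pyarrow S3 filesystem.
fs = pyarrow.fs.S3FileSystem(region="us-west-2")
ds = ray.data.read_csv("bucket/path", filesystem=fs)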
Finally, you can create a Dataset from existing data in the Ray object store or Ray-compatible distributed DataFrames:
import pandas as pd
import dask.dataframe as dd
# Create a Dataset from a list of Pandas DataFrame objects.
pdf = pd.DataFrame({"one": [1, 2, 3], "two": ["a", "b", "c"]})
ds = ray.data.from_pandas([pdf])
# Create a Dataset from a Dask-on-Ray DataFrame.
dask_df = dd.from_pandas(pdf, npartitions=10)
ds = ray.data.from_dask(dask_df)
Saving Datasets¶
Datasets can be written to local or remote storage using .write_csv()
, .write_json()
, and .write_parquet()
.
# Write to csv files in /tmp/output.
ray.data.range(10000).write_csv("/tmp/output")
# -> /tmp/output/data0.csv, /tmp/output/data1.csv, ...
# Use repartition to control the number of output files:
ray.data.range(10000).repartition(1).write_csv("/tmp/output2")
# -> /tmp/output2/data0.csv
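The JSON and Parquet writers follow the same pattern (a brief sketch; the output directories are placeholders):
# Write the same data as JSON and Parquet files.
ray.data.range(10000).write_json("/tmp/output_json")
ray.data.range(10000).write_parquet("/tmp/output_parquet")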
You can also convert a Dataset to Ray-compatible distributed DataFrames:
# Convert a Ray Dataset into a Dask-on-Ray DataFrame.
dask_df = ds.to_dask()
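For small Datasets, you can instead collect the data into a single local pandas DataFrame rather than a distributed one (a sketch; this assumes the data fits in driver memory and that to_pandas() is available in your Ray version):
# Collect a small Dataset into one in-memory pandas DataFrame.
local_df = ds.to_pandas()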
Transforming Datasets¶
Datasets can be transformed in parallel using .map(). Transformations are executed eagerly and block until the operation is finished. Datasets also support .filter() and .flat_map().
ds = ray.data.range(10000)
ds = ds.map(lambda x: x * 2)
# -> Map Progress: 100%|████████████████████| 200/200 [00:00<00:00, 1123.54it/s]
# -> Dataset(num_blocks=200, num_rows=10000, schema=<class 'int'>)
ds.take(5)
# -> [0, 2, 4, 6, 8]
ds.filter(lambda x: x > 5).take(5)
# -> Map Progress: 100%|████████████████████| 200/200 [00:00<00:00, 1859.63it/s]
# -> [6, 8, 10, 12, 14]
ds.flat_map(lambda x: [x, -x]).take(5)
# -> Map Progress: 100%|████████████████████| 200/200 [00:00<00:00, 1568.10it/s]
# -> [0, 0, 2, -2, 4]
To take advantage of vectorized functions, use .map_batches(). Note that you can also implement filter and flat_map using .map_batches(), since your map function can return an output batch of any size.
ds = ray.data.range_arrow(10000)
ds = ds.map_batches(
    lambda df: df.applymap(lambda x: x * 2), batch_format="pandas")
# -> Map Progress: 100%|████████████████████| 200/200 [00:00<00:00, 1927.62it/s]
ds.take(5)
# -> [{'value': 0}, {'value': 2}, ...]
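For example, here is a minimal sketch of implementing a filter with .map_batches() by returning a smaller pandas batch:
# Keep only rows with value > 5 by returning a filtered batch.
ray.data.range_arrow(10000).map_batches(
    lambda df: df[df["value"] > 5], batch_format="pandas").take(5)
# -> [{'value': 6}, {'value': 7}, {'value': 8}, {'value': 9}, {'value': 10}]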
By default, transformations are executed using Ray tasks. For transformations that require setup, specify compute=ray.data.ActorPoolStrategy(min, max) and Ray will use an autoscaling actor pool of min to max actors to execute your transforms. For a fixed-size actor pool, specify ActorPoolStrategy(n, n).
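For instance, here is a minimal sketch of a fixed-size pool of four actors (CachedModel is a hypothetical stand-in for any transform with expensive setup):
class CachedModel:
    def __init__(self):
        # Expensive setup (e.g., loading a model) runs once per actor.
        self.ready = True
    def __call__(self, batch):
        return batch

# Run the stateful transform on a fixed pool of 4 actors.
ds = ray.data.range(10000)
ds = ds.map_batches(CachedModel, compute=ray.data.ActorPoolStrategy(4, 4))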
The following is an end-to-end example of reading, transforming, and saving batch inference results using Ray Data:
from ray.data import ActorPoolStrategy
# Example of GPU batch inference on an ImageNet model.
def preprocess(image: bytes) -> bytes:
    return image

class BatchInferModel:
    def __init__(self):
        self.model = ImageNetModel()
    def __call__(self, batch: pd.DataFrame) -> pd.DataFrame:
        return self.model(batch)
ds = ray.data.read_binary_files("s3://bucket/image-dir")
# Preprocess the data.
ds = ds.map(preprocess)
# -> Map Progress: 100%|████████████████████| 200/200 [00:00<00:00, 1123.54it/s]
# Apply GPU batch inference with actors, and assign each actor a GPU using
# ``num_gpus=1`` (any Ray remote decorator argument can be used here).
ds = ds.map_batches(
    BatchInferModel, compute=ActorPoolStrategy(10, 20),
    batch_size=256, num_gpus=1)
# -> Map Progress (16 actors 4 pending): 100%|██████| 200/200 [00:07, 27.60it/s]
# Save the results.
ds.repartition(1).write_json("s3://bucket/inference-results")
Exchanging Datasets¶
Datasets can be passed to Ray tasks or actors and read with .iter_batches() or .iter_rows(). This does not incur a copy, since the blocks of the Dataset are passed by reference as Ray objects:
from ray.data import Dataset

@ray.remote
def consume(data: Dataset[int]) -> int:
    num_batches = 0
    for batch in data.iter_batches():
        num_batches += 1
    return num_batches
ds = ray.data.range(10000)
ray.get(consume.remote(ds))
# -> 200
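Rows can be consumed one at a time with .iter_rows() in the same way (a brief sketch reusing the Dataset above):
@ray.remote
def consume_rows(data: Dataset[int]) -> int:
    num_rows = 0
    for row in data.iter_rows():
        num_rows += 1
    return num_rows

ray.get(consume_rows.remote(ds))
# -> 10000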
Datasets can be split up into disjoint sub-datasets.
Locality-aware splitting is supported if you pass in a list of actor handles to the split() function along with the number of desired splits. This is a common pattern useful for loading and splitting data between distributed training actors:
@ray.remote(num_gpus=1)
class Worker:
    def __init__(self, rank: int):
        pass

    def train(self, shard: ray.data.Dataset[int]) -> int:
        for batch in shard.iter_batches(batch_size=256):
            pass
        return shard.count()
workers = [Worker.remote(i) for i in range(16)]
# -> [Actor(Worker, ...), Actor(Worker, ...), ...]
ds = ray.data.range(10000)
# -> Dataset(num_blocks=200, num_rows=10000, schema=<class 'int'>)
shards = ds.split(n=16, locality_hints=workers)
# -> [Dataset(num_blocks=13, num_rows=650, schema=<class 'int'>),
# Dataset(num_blocks=13, num_rows=650, schema=<class 'int'>), ...]
ray.get([w.train.remote(s) for w, s in zip(workers, shards)])
# -> [650, 650, ...]
Dataset Pipelines Quick Start¶
Creating a DatasetPipeline¶
A DatasetPipeline can be constructed in two ways: either by pipelining the execution of an existing Dataset (via Dataset.window), or generating repeats of an existing Dataset (via Dataset.repeat). Similar to Datasets, you can freely pass DatasetPipelines between Ray tasks, actors, and libraries. Get started with this synthetic data example:
import ray
def func1(i: int) -> int:
    return i + 1

def func2(i: int) -> int:
    return i * 2

def func3(i: int) -> int:
    return i % 3
# Create a dataset and then create a pipeline from it.
base = ray.data.range(1000000)
print(base)
# -> Dataset(num_blocks=200, num_rows=1000000, schema=<class 'int'>)
pipe = base.window(blocks_per_window=10)
print(pipe)
# -> DatasetPipeline(num_windows=20, num_stages=1)
# Applying transforms to pipelines adds more pipeline stages.
pipe = pipe.map(func1)
pipe = pipe.map(func2)
pipe = pipe.map(func3)
print(pipe)
# -> DatasetPipeline(num_windows=20, num_stages=4)
# Output can be pulled from the pipeline concurrently with its execution.
num_rows = 0
for row in pipe.iter_rows():
    num_rows += 1
# ->
# Stage 0: 55%|█████████████████████████ |11/20 [00:02<00:00, 9.86it/s]
# Stage 1: 50%|██████████████████████ |10/20 [00:02<00:01, 9.45it/s]
# Stage 2: 45%|███████████████████ | 9/20 [00:02<00:01, 8.27it/s]
# Stage 3: 35%|████████████████ | 8/20 [00:02<00:02, 5.33it/s]
print("Total num rows", num_rows)
# -> Total num rows 1000000
You can also create a DatasetPipeline from a custom iterator over dataset creators using DatasetPipeline.from_iterable. For example, this is how you would implement Dataset.repeat and Dataset.window using from_iterable:
import ray
from ray.data.dataset_pipeline import DatasetPipeline
# Equivalent to ray.data.range(1000).repeat(times=4)
source = ray.data.range(1000)
pipe = DatasetPipeline.from_iterable(
    [lambda: source, lambda: source, lambda: source, lambda: source])
# Equivalent to ray.data.range(1000).window(blocks_per_window=10)
splits = ray.data.range(1000, parallelism=200).split(20)
pipe = DatasetPipeline.from_iterable([lambda s=s: s for s in splits])
Per-Window Transformations¶
While most Dataset operations are per-row (e.g., map, filter), some operations apply to the Dataset as a whole (e.g., sort, shuffle). When applied to a pipeline, holistic transforms like shuffle are applied separately to each window in the pipeline:
# Example of randomly shuffling each window of a pipeline.
ray.data.from_items([0, 1, 2, 3, 4]) \
    .repeat(2) \
    .random_shuffle_each_window() \
    .show_windows()
# ->
# ----- Epoch 0 ------
# === Window 0 ===
# 4
# 3
# 1
# 0
# 2
# ----- Epoch 1 ------
# === Window 1 ===
# 2
# 1
# 4
# 0
# 3
You can also apply arbitrary transformations to each window using DatasetPipeline.foreach_window():
# Equivalent transformation using .foreach_window()
ray.data.from_items([0, 1, 2, 3, 4]) \
    .repeat(2) \
    .foreach_window(lambda w: w.random_shuffle()) \
    .show_windows()
# ->
# ----- Epoch 0 ------
# === Window 0 ===
# 1
# 0
# 4
# 2
# 3
# ----- Epoch 1 ------
# === Window 1 ===
# 4
# 2
# 0
# 3
# 1
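Because foreach_window() receives each window as a regular Dataset, any Dataset transform can be used inside it; for example, a sketch that doubles each value per window with .map_batches():
# Apply a per-window batch transform via foreach_window().
ray.data.from_items([0, 1, 2, 3, 4]) \
    .repeat(2) \
    .foreach_window(lambda w: w.map_batches(lambda batch: [x * 2 for x in batch])) \
    .show_windows()
# -> prints the doubled values for each of the two windows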