Pipelining Compute

Creating a DatasetPipeline

A DatasetPipeline can be constructed in two ways: either by pipelining the execution of an existing Dataset (via Dataset.window), or by generating repeats of an existing Dataset (via Dataset.repeat). As with Datasets, you can freely pass DatasetPipelines between Ray tasks, actors, and libraries. Get started with this synthetic data example:

import ray

def func1(i: int) -> int:
    return i + 1

def func2(i: int) -> int:
    return i * 2

def func3(i: int) -> int:
    return i % 3

# Create a dataset and then create a pipeline from it.
base = ray.data.range(1000000)
print(base)
# -> Dataset(num_blocks=200, num_rows=1000000, schema=<class 'int'>)
pipe = base.window(blocks_per_window=10)
print(pipe)
# -> DatasetPipeline(num_windows=20, num_stages=1)

# Applying transforms to pipelines adds more pipeline stages.
pipe = pipe.map(func1)
pipe = pipe.map(func2)
pipe = pipe.map(func3)
print(pipe)
# -> DatasetPipeline(num_windows=20, num_stages=4)

# Output can be pulled from the pipeline concurrently with its execution.
num_rows = 0
for _ in pipe.iter_rows():
    num_rows += 1
# ->
# Stage 0:  55%|█████████████████████████                |11/20 [00:02<00:00,  9.86it/s]
# Stage 1:  50%|██████████████████████                   |10/20 [00:02<00:01,  9.45it/s]
# Stage 2:  45%|███████████████████                      | 9/20 [00:02<00:01,  8.27it/s]
# Stage 3:  40%|████████████████                         | 8/20 [00:02<00:02,  5.33it/s]
print("Total num rows", num_rows)
# -> Total num rows 1000000
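
Dataset.repeat is the other construction path mentioned above: it produces a pipeline that loops over the same dataset for a fixed number of epochs. A minimal sketch reusing the base dataset from above (the printed repr is an assumption, based on each epoch forming one window):

# Create a pipeline that reads the same dataset twice.
repeat_pipe = base.repeat(times=2)
print(repeat_pipe)
# -> DatasetPipeline(num_windows=2, num_stages=1)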

You can also create a DatasetPipeline from a custom iterable of dataset creators using DatasetPipeline.from_iterable. For example, here is how you could implement Dataset.repeat and Dataset.window using from_iterable:

import ray
from ray.data.dataset_pipeline import DatasetPipeline

# Equivalent to ray.data.range(1000).repeat(times=4)
source = ray.data.range(1000)
pipe = DatasetPipeline.from_iterable(
    [lambda: source, lambda: source, lambda: source, lambda: source])

# Equivalent to ray.data.range(1000, parallelism=200).window(blocks_per_window=10)
splits = ray.data.range(1000, parallelism=200).split(20)
# Bind each split as a lambda default argument; otherwise Python's
# late-binding closures would make every lambda return the last split.
pipe = DatasetPipeline.from_iterable([lambda s=s: s for s in splits])
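
Since from_iterable takes an iterable of zero-argument dataset creators, a generator works too. A sketch of this pattern, with the caveat that iterating such a pipeline never terminates on its own, so consume it with an explicit stopping condition:

def infinite_source():
    while True:
        yield lambda: ray.data.range(1000)

# A pipeline with no fixed number of windows.
inf_pipe = DatasetPipeline.from_iterable(infinite_source())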

Per-Window Transformations

While most Dataset operations are per-row (e.g., map, filter), some operations apply to the Dataset as a whole (e.g., sort, shuffle). When applied to a pipeline, holistic transforms like shuffle are applied separately to each window in the pipeline:

import ray

# Example of randomly shuffling each window of a pipeline.
ray.data.from_items([0, 1, 2, 3, 4]) \
    .repeat(2) \
    .random_shuffle_each_window() \
    .show_windows()
# ->
# ----- Epoch 0 ------
# === Window 0 ===
# 4
# 3
# 1
# 0
# 2
# ----- Epoch 1 ------
# === Window 1 ===
# 2
# 1
# 4
# 0
# 3

You can also apply arbitrary transformations to each window using DatasetPipeline.foreach_window():

# Equivalent transformation using .foreach_window()
ray.data.from_items([0, 1, 2, 3, 4]) \
    .repeat(2) \
    .foreach_window(lambda w: w.random_shuffle()) \
    .show_windows()
# ->
# ----- Epoch 0 ------
# === Window 0 ===
# 1
# 0
# 4
# 2
# 3
# ----- Epoch 1 ------
# === Window 1 ===
# 4
# 2
# 0
# 3
# 1