Ray Data Internals

This guide describes the implementation of Ray Data. The intended audience is advanced users and Ray Data developers.

For a gentler introduction to Ray Data, see Quickstart.

Datasets and blocks

A Dataset operates over a sequence of Ray object references to blocks. Each block contains a disjoint subset of rows, and Ray Data loads and transforms these blocks in parallel.

The following figure visualizes a dataset with three blocks, each holding 1000 rows. The Dataset is the user-facing Python object (usually held in the driver), while materialized blocks are stored as objects in Ray’s shared-memory object store.

Generally speaking, the number of concurrently executing tasks determines CPU utilization (or GPU utilization if using GPU transforms). Total heap memory usage is a function of the number of concurrently executing tasks multiplied by the block size; the exact function depends on your UDF. The total number of materialized blocks in scope determines Ray’s object store usage; if this exceeds Ray’s object store capacity, Ray automatically spills blocks to disk. Ray Data uses streaming execution to minimize the total number of materialized blocks in scope and therefore avoid spilling.

[Figure: Dataset architecture (dataset-arch.svg)]
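To make these relationships concrete, here is a back-of-the-envelope calculation. It is a sketch, not a Ray Data API: the function names are hypothetical, the 128 MiB block size is only an illustrative number, and the calculation assumes heap usage scales linearly with a made-up `udf_heap_factor` (2.0 here, as if the UDF kept one extra copy of each block). Real usage depends on your UDF and data format.

```python
# Back-of-the-envelope resource estimates (hypothetical helpers, not Ray APIs).
MiB = 1024 * 1024


def estimate_heap_bytes(concurrent_tasks: int, block_bytes: int,
                        udf_heap_factor: float = 2.0) -> float:
    """Heap usage ~ concurrent tasks x block size, scaled by an assumed UDF factor."""
    return concurrent_tasks * block_bytes * udf_heap_factor


def estimate_object_store_bytes(blocks_in_scope: int, block_bytes: int) -> int:
    """Object store usage grows with the number of materialized blocks in scope."""
    return blocks_in_scope * block_bytes


def will_spill(blocks_in_scope: int, block_bytes: int, store_capacity_bytes: int) -> bool:
    """Ray spills to disk once materialized blocks exceed object store capacity."""
    return estimate_object_store_bytes(blocks_in_scope, block_bytes) > store_capacity_bytes


# 8 concurrent tasks over 128 MiB blocks, with a UDF that doubles its input:
print(estimate_heap_bytes(8, 128 * MiB) / MiB)  # 2048.0
# Keeping 100 such blocks in scope overflows a 2 GiB object store:
print(will_spill(100, 128 * MiB, 2048 * MiB))  # True
# Keeping only 10 blocks in scope (what streaming aims for) fits:
print(will_spill(10, 128 * MiB, 2048 * MiB))  # False
```

The point of the last two lines is the one the paragraph makes: spilling depends on how many blocks are materialized at once, which is exactly what streaming execution keeps small.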

Operations

Reading files

Ray Data uses Ray tasks to read files in parallel. Each read task reads one or more files and produces a stream of one or more output blocks. These output blocks are either stored in Ray’s object store or fed directly to the downstream transform.

[Figure: Parallel read tasks (dataset-read.svg)]
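The read model above (each task handles one or more files and streams out one or more blocks) can be illustrated with a toy in plain Python. This is a sketch under stated assumptions, not Ray Data's reader: `plan_read_tasks` and `read_task` are hypothetical names, files are grouped greedily by row count, and blocks are capped by a row limit, whereas Ray Data's real planning and block sizing work differently.

```python
from typing import Dict, Iterator, List


def plan_read_tasks(file_rows: Dict[str, int], target_task_rows: int) -> List[List[str]]:
    """Greedily pack files into read tasks of roughly target_task_rows rows each."""
    tasks: List[List[str]] = []
    current: List[str] = []
    current_rows = 0
    for path, rows in file_rows.items():
        # Start a new task when adding this file would overshoot the target.
        if current and current_rows + rows > target_task_rows:
            tasks.append(current)
            current, current_rows = [], 0
        current.append(path)
        current_rows += rows
    if current:
        tasks.append(current)
    return tasks


def read_task(file_rows: Dict[str, int], paths: List[str],
              max_rows_per_block: int) -> Iterator[List[str]]:
    """Yield output blocks one at a time; a large file produces several blocks."""
    for path in paths:
        rows = [f"{path}:{i}" for i in range(file_rows[path])]  # stand-in for parsing
        for start in range(0, len(rows), max_rows_per_block):
            yield rows[start:start + max_rows_per_block]


files = {"a.csv": 3, "b.csv": 3, "c.csv": 10}  # path -> row count
tasks = plan_read_tasks(files, target_task_rows=6)
print(tasks)  # [['a.csv', 'b.csv'], ['c.csv']]
for paths in tasks:
    print(paths, [len(block) for block in read_task(files, paths, max_rows_per_block=4)])
# ['a.csv', 'b.csv'] [3, 3]
# ['c.csv'] [4, 4, 2]
```

Because `read_task` is a generator, each block can be handed to a downstream transform as soon as it is produced, which mirrors the "stream of output blocks" described above.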

To handle transient errors from remote datasources, Ray Data retries application-level exceptions.
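Retrying transient failures typically looks like the following sketch. It is a generic retry-with-exponential-backoff helper, not Ray Data's retry implementation; the name `call_with_retries`, the attempt count, the delays, and the choice to retry only `OSError` subclasses are all assumptions for illustration.

```python
import random
import time
from typing import Callable, Tuple, Type, TypeVar

T = TypeVar("T")


def call_with_retries(
    fn: Callable[[], T],
    max_attempts: int = 5,
    base_delay_s: float = 0.1,
    retry_on: Tuple[Type[BaseException], ...] = (OSError,),
) -> T:
    """Call fn, retrying on retry_on exceptions with exponential backoff and jitter."""
    if max_attempts < 1:
        raise ValueError("max_attempts must be at least 1")
    for attempt in range(1, max_attempts + 1):
        try:
            return fn()
        except retry_on:
            if attempt == max_attempts:
                raise  # Out of attempts: surface the last error to the caller.
            delay = base_delay_s * 2 ** (attempt - 1)
            time.sleep(delay + random.uniform(0, delay))
    raise RuntimeError("unreachable")


# A hypothetical remote read that fails transiently twice, then succeeds.
attempts = {"count": 0}


def flaky_read() -> str:
    attempts["count"] += 1
    if attempts["count"] < 3:
        raise ConnectionError("transient network error")  # subclass of OSError
    return "block-0"


print(call_with_retries(flaky_read, base_delay_s=0.01))  # block-0
print(attempts["count"])  # 3
```

Exceptions outside `retry_on` (for example, a `ValueError` from malformed data) propagate immediately, since retrying a deterministic failure only wastes time.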

For more information on loading data, see Loading data.

Transforming data

Ray Data uses either Ray tasks or Ray actors to transform blocks. By default, it uses tasks. Transforms are usually fused with each other and with the upstream read task.

[Figure: Transforming blocks (dataset-map.svg)]

In the preceding example, each read task generates one output block. If an input file is large, a single read task may output multiple blocks. For more information on transforming data, see Transforming data.

Shuffling data

When you call random_shuffle(), sort(), or groupby(), Ray Data shuffles blocks in a map-reduce style: map tasks partition blocks by value and then reduce tasks merge co-partitioned blocks.
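The map-reduce pattern can be sketched in a few lines of plain Python. This toy is only an illustration under stated assumptions: it uses hash partitioning on a key (as a groupby-style shuffle might), whereas Ray Data's actual partitioning scheme may differ by operation (a sort, for example, needs range partitioning). The function names are hypothetical.

```python
from typing import Any, Callable, List

Block = List[Any]


def shuffle_map(block: Block, num_reducers: int, key: Callable[[Any], Any]) -> List[Block]:
    """Map phase: partition one input block into num_reducers pieces by key."""
    parts: List[Block] = [[] for _ in range(num_reducers)]
    for row in block:
        parts[hash(key(row)) % num_reducers].append(row)
    return parts


def shuffle_reduce(co_partitioned: List[Block]) -> Block:
    """Reduce phase: merge the pieces that every map task produced for this reducer."""
    merged: Block = []
    for part in co_partitioned:
        merged.extend(part)
    return merged


def map_reduce_shuffle(blocks: List[Block], num_reducers: int,
                       key: Callable[[Any], Any]) -> List[Block]:
    map_outputs = [shuffle_map(b, num_reducers, key) for b in blocks]
    # Reducer r receives partition r from every map output (the co-partitioned blocks).
    return [shuffle_reduce([parts[r] for parts in map_outputs]) for r in range(num_reducers)]


blocks = [[(0, "a"), (1, "b"), (2, "c")], [(0, "d"), (1, "e"), (3, "f")]]
print(map_reduce_shuffle(blocks, num_reducers=2, key=lambda row: row[0]))
# [[(0, 'a'), (2, 'c'), (0, 'd')], [(1, 'b'), (1, 'e'), (3, 'f')]]
```

After the shuffle, every row with a given key lives in exactly one output block, so a per-key aggregation can run independently on each reducer's output. Note that every map output must exist before any reducer can finish, which is why shuffles are not streamed.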

Note

Shuffles materialize Datasets in memory. In other words, shuffle execution isn’t streamed through memory.

For an in-depth guide on shuffle performance, see Performance Tips and Tuning.

Scheduling

Ray Data uses Ray Core for execution. Below is a summary of the scheduling strategy for Ray Data:

  • The SPREAD scheduling strategy ensures that data blocks and map tasks are evenly balanced across the cluster.

  • Dataset tasks ignore placement groups by default; see Ray Data and placement groups.

  • Map operations use the SPREAD scheduling strategy if the total argument size is less than 50 MB; otherwise, they use the DEFAULT scheduling strategy.

  • Read operations use the SPREAD scheduling strategy.

  • All other operations, such as split, sort, and shuffle, use the DEFAULT scheduling strategy.
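The rules above can be encoded as a small lookup for reference. This is an illustrative sketch, not code you call: Ray Data applies these choices internally, the function name is hypothetical, placement-group handling is not modeled, and reading the documented "50 MB" threshold as 50 MiB is an assumption.

```python
MiB = 1024 * 1024
# Assumption: the documented "50 MB" threshold is modeled here as 50 MiB.
SPREAD_ARG_SIZE_LIMIT = 50 * MiB


def scheduling_strategy_for(op_type: str, total_arg_bytes: int = 0) -> str:
    """Return the strategy the rules above assign to an operation (illustrative only)."""
    if op_type == "read":
        return "SPREAD"
    if op_type == "map":
        # Small total argument size: SPREAD; otherwise DEFAULT.
        return "SPREAD" if total_arg_bytes < SPREAD_ARG_SIZE_LIMIT else "DEFAULT"
    # split, sort, shuffle, and every other operation.
    return "DEFAULT"


print(scheduling_strategy_for("read"))            # SPREAD
print(scheduling_strategy_for("map", 10 * MiB))   # SPREAD
print(scheduling_strategy_for("map", 200 * MiB))  # DEFAULT
print(scheduling_strategy_for("sort"))            # DEFAULT
```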

Ray Data and placement groups

By default, Ray Data configures its tasks and actors to use the cluster-default scheduling strategy ("DEFAULT"). You can inspect this setting through ray.data.DataContext.get_current().scheduling_strategy. This scheduling strategy schedules these tasks and actors outside any present placement group. To use the current placement group's resources for Ray Data instead, set ray.data.DataContext.get_current().scheduling_strategy = None.

Consider this override only for advanced use cases to improve performance predictability. The general recommendation is to let Ray Data run outside placement groups.

Ray Data and Tune

When using Ray Data in conjunction with Ray Tune, it’s important to ensure there are enough free CPUs for Ray Data to run on. By default, Tune tries to fully utilize cluster CPUs. This can prevent Ray Data from scheduling tasks, reducing performance or causing workloads to hang.

To ensure CPU resources are always available for Ray Data execution, limit the number of concurrent Tune trials with the max_concurrent_trials Tune option.

import ray
from ray import tune

# This workload will use spare cluster resources for execution.
def objective(*args):
    ray.data.range(10).show()

# Create a cluster with 4 CPU slots available.
ray.init(num_cpus=4)

# Setting `max_concurrent_trials=3` ensures the cluster always has a spare
# CPU for Ray Data. Try setting `max_concurrent_trials=4` here, and notice
# that the experiment appears to hang.
tuner = tune.Tuner(
    tune.with_resources(objective, {"cpu": 1}),
    tune_config=tune.TuneConfig(
        # Run at least as many trials as CPUs so the concurrency limit matters.
        num_samples=4,
        max_concurrent_trials=3
    )
)
tuner.fit()

Execution

Ray Data execution by default is:

  • Lazy: Transformations on a Dataset aren’t executed until you call a consumption operation such as ds.iter_batches() or Dataset.materialize(). This creates opportunities to optimize the execution plan, for example through operator fusion.

  • Streaming: Dataset transformations run incrementally on the base data rather than on all of the data at once, and the execution of different operations overlaps. You can use this to stream data into ML training so that preprocessing overlaps with model training, or to run batch transformations on large datasets without loading the entire dataset into cluster memory.
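The ordering property of streaming execution can be seen with plain Python generators. This is a single-process analogy, not Ray Data: in Ray Data the stages run concurrently across the cluster, while here they simply interleave. The stage names are hypothetical. What the toy does show is that training on block 0 starts before block 1 is even read, so only a few blocks are in flight at any moment.

```python
from typing import Iterator, List

events: List[str] = []  # records the order in which work happens


def read(num_blocks: int) -> Iterator[int]:
    for i in range(num_blocks):
        events.append(f"read {i}")
        yield i


def preprocess(blocks: Iterator[int]) -> Iterator[int]:
    for block in blocks:
        events.append(f"preprocess {block}")
        yield block * 10


def train(batches: Iterator[int]) -> None:
    for batch in batches:
        events.append(f"train {batch}")


# Nothing runs until train() pulls from the pipeline; then each block flows
# through every stage before the next block is read.
train(preprocess(read(3)))
print(events)
# ['read 0', 'preprocess 0', 'train 0', 'read 1', 'preprocess 1', 'train 10',
#  'read 2', 'preprocess 2', 'train 20']
```

A bulk (non-streaming) executor would instead record all three reads before any preprocessing, holding every block in memory at once.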

Lazy Execution

Lazy execution offers opportunities for improved performance and memory stability, thanks to optimizations such as operator fusion and aggressive garbage collection of intermediate results.

Dataset creation and transformation APIs are lazy, with execution only triggered by “sink” APIs, such as consuming (ds.iter_batches()), writing (ds.write_parquet()), or manually triggering with ds.materialize(). There are a few exceptions to this rule, where transformations such as ds.union() and ds.limit() trigger execution.

Check the API docs for Ray Data methods to see if they trigger execution. Those that do trigger execution have a Note indicating as much.
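The difference between building a plan and running it can be illustrated with a toy class. `LazyDataset` here is hypothetical and much simpler than `ray.data.Dataset`: it only records `map` calls and runs them when the "sink" `materialize()` is called, applying all recorded maps back to back to each row, which is a simple form of operator fusion.

```python
from typing import Any, Callable, List


class LazyDataset:
    """Toy lazy dataset: map() records a transform; materialize() runs them all."""

    def __init__(self, rows: List[Any]) -> None:
        self._rows = rows
        self._ops: List[Callable[[Any], Any]] = []

    def map(self, fn: Callable[[Any], Any]) -> "LazyDataset":
        # Return a new plan with one more step; no data is touched here.
        new = LazyDataset(self._rows)
        new._ops = self._ops + [fn]
        return new

    def materialize(self) -> List[Any]:
        # Sink: execution happens only now. All recorded maps are applied to
        # each row in a single pass, so no intermediate list is built per step.
        out = []
        for row in self._rows:
            for fn in self._ops:
                row = fn(row)
            out.append(row)
        return out


calls: List[int] = []


def add_one(x: int) -> int:
    calls.append(x)
    return x + 1


ds = LazyDataset([1, 2, 3]).map(add_one).map(lambda x: x * 2)
print(calls)             # []  (building the plan ran nothing)
print(ds.materialize())  # [4, 6, 8]
print(calls)             # [1, 2, 3]
```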

Streaming Execution

The following code is a "hello world" example that triggers execution by consuming the Dataset with ds.iter_batches(). The example also enables verbose progress reporting, which shows per-operator progress in addition to overall progress.

import ray
import time

# Enable verbose reporting. This can also be toggled on by setting
# the environment variable RAY_DATA_VERBOSE_PROGRESS=1.
ctx = ray.data.DataContext.get_current()
ctx.execution_options.verbose_progress = True

def sleep(x):
    time.sleep(0.1)
    return x

class SleepClass():
    def __call__(self, x):
        time.sleep(0.1)
        return x

for _ in (
    ray.data.range_tensor(5000, shape=(80, 80, 3), override_num_blocks=200)
    .map_batches(sleep, num_cpus=2)
    .map_batches(SleepClass, concurrency=(2, 4))
    .map_batches(sleep, num_cpus=1)
    .iter_batches()
):
    pass

This launches a simple Dataset with four operators. Each operator uses different compute arguments, which prevents Ray Data from fusing them into a single operator. You should see a log message indicating that streaming execution is in use:

2023-03-30 16:40:10,076      INFO streaming_executor.py:83 -- Executing DAG InputDataBuffer[Input] -> TaskPoolMapOperator[ReadRange] -> TaskPoolMapOperator[MapBatches(sleep)] -> ActorPoolMapOperator[MapBatches(sleep)] -> TaskPoolMapOperator[MapBatches(sleep)]

The next few lines show execution progress. Here is how to interpret the output:

Running: 7.0/16.0 CPU, 0.0/0.0 GPU, 76.91 MiB/2.25 GiB object_store_memory 65%|██▊ | 130/200 [00:08<00:02, 22.52it/s]

This line shows how many resources the streaming executor is currently using relative to its limits, as well as the number of completed output blocks. The streaming executor attempts to keep resource usage under the printed limits by throttling task executions.

ReadRange: 2 active, 37 queued, 7.32 MiB objects 1:  80%|████████▊  | 161/200 [00:08<00:02, 17.81it/s]
MapBatches(sleep): 5 active, 5 queued, 18.31 MiB objects 2:  76%|██▎| 151/200 [00:08<00:02, 19.93it/s]
MapBatches(sleep): 7 active, 2 queued, 25.64 MiB objects, 2 actors [all objects local] 3:  71%|▋| 142/
MapBatches(sleep): 2 active, 0 queued, 7.32 MiB objects 4:  70%|██▊ | 139/200 [00:08<00:02, 23.16it/s]

These lines appear only when verbose progress reporting is enabled. The active count is the number of running tasks for the operator. The queued count is the number of input blocks for the operator that have been computed but not yet submitted for execution. For operators that use actor-pool execution, the number of running actors appears as actors.
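The throttling behavior described for the "Running:" line can be sketched as an admission check. This is a deliberately simplified, hypothetical model: it tracks only CPUs and launches tasks in FIFO order, whereas the real streaming executor also accounts for GPUs and object store memory and uses its own scheduling policy. Tasks that don't fit stay queued, which is what the queued count reports.

```python
from collections import deque
from typing import Deque, List, Tuple


def admit_tasks(queued: Deque[Tuple[str, float]], running_cpus: float,
                cpu_limit: float) -> List[str]:
    """Launch queued tasks in FIFO order while their CPU requests fit under the limit."""
    launched: List[str] = []
    while queued:
        name, cpus = queued[0]
        if running_cpus + cpus > cpu_limit:
            break  # Throttle: the rest stay queued until running tasks finish.
        queued.popleft()
        running_cpus += cpus
        launched.append(name)
    return launched


queue = deque([("map-0", 2.0), ("map-1", 2.0), ("map-2", 1.0)])
print(admit_tasks(queue, running_cpus=12.0, cpu_limit=16.0))  # ['map-0', 'map-1']
print(list(queue))  # [('map-2', 1.0)]
```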

Tip

Avoid returning large outputs from the final operation of a Dataset you are iterating over, since the consumer process is a serial bottleneck.

Fault tolerance

Ray Data performs lineage reconstruction to recover data. If an application error or system failure occurs, Ray Data recreates blocks by re-executing tasks.

Note

Fault tolerance isn’t supported if the process that created the Dataset dies.
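Lineage reconstruction can be illustrated with a toy store that remembers how each block was produced. `LineageStore` is hypothetical and runs in a single process; in Ray the lineage and reconstruction are handled by Ray Core. The toy also mirrors the note above: the lineage lives in the object that created the blocks, so if that object (the driver, in Ray's case) is gone, nothing can be rebuilt.

```python
from typing import Any, Callable, Dict, List, Tuple


class LineageStore:
    """Toy object store that can rebuild lost blocks from recorded lineage."""

    def __init__(self) -> None:
        self._values: Dict[str, Any] = {}
        self._lineage: Dict[str, Tuple[Callable[..., Any], List[str]]] = {}
        self.executions = 0

    def put_task(self, block_id: str, fn: Callable[..., Any], input_ids: List[str]) -> None:
        """Record how a block is produced, then run the task to produce it."""
        self._lineage[block_id] = (fn, input_ids)
        self._values[block_id] = self._execute(block_id)

    def _execute(self, block_id: str) -> Any:
        fn, input_ids = self._lineage[block_id]
        self.executions += 1
        return fn(*[self.get(i) for i in input_ids])

    def lose(self, block_id: str) -> None:
        """Simulate a failure that drops a block from memory."""
        self._values.pop(block_id, None)

    def get(self, block_id: str) -> Any:
        if block_id not in self._values:
            # Lineage reconstruction: re-execute the producing task (and,
            # recursively, any lost inputs it depends on).
            self._values[block_id] = self._execute(block_id)
        return self._values[block_id]


store = LineageStore()
store.put_task("read", lambda: [1, 2, 3], [])
store.put_task("doubled", lambda rows: [2 * r for r in rows], ["read"])
print(store.executions)  # 2

store.lose("read")  # simulate losing both blocks in a failure
store.lose("doubled")
print(store.get("doubled"))  # [2, 4, 6]
print(store.executions)      # 4 (both tasks re-executed from lineage)
```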

Operator Fusion Optimization

To reduce memory usage and task overhead, Ray Data automatically fuses compatible operators that share the same compute pattern, compute strategy, and resource specification:

  • Same compute pattern: embarrassingly parallel map vs. all-to-all shuffle

  • Same compute strategy: Ray tasks vs. Ray actors

  • Same resource specification, for example, num_cpus or num_gpus requests

Read operators and subsequent map-like transformations are usually fused together. All-to-all transformations such as ds.random_shuffle() can be fused with earlier map-like operators, but not later operators.
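The fusion rules can be approximated by a greedy pass over a linear chain of operators. This is a hypothetical sketch, not Ray Data's optimizer: `Op`, `can_fuse`, and `fuse` are illustrative names, and the rule that an all-to-all operator can absorb *any* preceding map-like operator (regardless of strategy or resources) is an assumption made to keep the toy short.

```python
from dataclasses import dataclass
from typing import List, Tuple


@dataclass(frozen=True)
class Op:
    name: str
    pattern: str                                   # "map" or "all_to_all"
    strategy: str = "tasks"                        # "tasks" or "actors"
    resources: Tuple[Tuple[str, float], ...] = ()  # e.g. (("num_cpus", 2.0),)


def can_fuse(upstream: Op, downstream: Op) -> bool:
    if upstream.pattern == "all_to_all":
        return False  # All-to-all operators don't fuse with later operators.
    if downstream.pattern == "all_to_all":
        return True   # Assumption: any earlier map-like operator can fuse into it.
    # Two map-like operators: require the same compute strategy and resources.
    return (upstream.strategy == downstream.strategy
            and upstream.resources == downstream.resources)


def fuse(ops: List[Op]) -> List[str]:
    """Greedily merge adjacent fusible operators into named stages."""
    stages: List[List[Op]] = []
    for op in ops:
        if stages and can_fuse(stages[-1][-1], op):
            stages[-1].append(op)
        else:
            stages.append([op])
    return ["->".join(o.name for o in stage) for stage in stages]


print(fuse([
    Op("Read", "map"),
    Op("MapBatches", "map"),
    Op("RandomShuffle", "all_to_all"),
    Op("MapBatches", "map"),
]))
# ['Read->MapBatches->RandomShuffle', 'MapBatches']

print(fuse([
    Op("ReadRange", "map"),
    Op("MapBatches(sleep)", "map", resources=(("num_cpus", 2.0),)),
    Op("MapBatches(SleepClass)", "map", strategy="actors"),
]))
# ['ReadRange', 'MapBatches(sleep)', 'MapBatches(SleepClass)']
```

The second chain loosely mirrors the streaming example earlier on this page: differing resource requests and compute strategies keep every operator separate.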

To check whether operators were fused, inspect the Dataset stats (for example, the output of ds.stats()) and look for fused operator names such as Read->MapBatches:

Operator N Read->MapBatches->RandomShuffleMap: N tasks executed, N blocks produced in T
* Remote wall time: T min, T max, T mean, T total
* Remote cpu time: T min, T max, T mean, T total
* Output num rows: N min, N max, N mean, N total

Memory Management

This section describes how Ray Data manages execution and object store memory.

Execution Memory

During execution, a task can read multiple input blocks, and write multiple output blocks. Input and output blocks consume both worker heap memory and shared memory through Ray’s object store. Ray caps object store memory usage by spilling to disk, but excessive worker heap memory usage can cause out-of-memory errors.

For more information on tuning memory usage and preventing out-of-memory errors, see the performance guide.

Object Store Memory

Ray Data uses the Ray object store to store data blocks, which means it inherits the memory management features of the Ray object store. This section discusses the relevant features:

  • Object Spilling: Because Ray Data uses the Ray object store to store data blocks, any blocks that can’t fit into object store memory are automatically spilled to disk. Ray automatically reloads these objects when downstream compute tasks need them.

  • Locality Scheduling: Ray preferentially schedules compute tasks on nodes that already have a local copy of the object, reducing the need to transfer objects between nodes in the cluster.

  • Reference Counting: Dataset blocks are kept alive by object store reference counting as long as there is any Dataset that references them. To free memory, delete any Python references to the Dataset object.
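Spilling and reference counting can be illustrated together with a toy store. `ToyObjectStore` is hypothetical and is not how Ray's object store is implemented. Evicting the least recently used block first is an assumption for illustration. Explicit `add_ref`/`remove_ref` calls stand in for the reference tracking Ray does automatically as Python references are created and dropped. Locality scheduling is not modeled.

```python
from collections import OrderedDict
from typing import Dict


class ToyObjectStore:
    """Toy store: spills LRU blocks to 'disk' and frees blocks at refcount zero."""

    def __init__(self, capacity_bytes: int) -> None:
        self.capacity = capacity_bytes
        self.memory: "OrderedDict[str, int]" = OrderedDict()  # block id -> size, LRU first
        self.disk: Dict[str, int] = {}
        self.refcounts: Dict[str, int] = {}

    def _used(self) -> int:
        return sum(self.memory.values())

    def _make_room(self, size: int) -> None:
        # Object spilling: move LRU blocks to disk until the new block fits.
        while self.memory and self._used() + size > self.capacity:
            block_id, block_size = self.memory.popitem(last=False)
            self.disk[block_id] = block_size

    def put(self, block_id: str, size: int) -> None:
        self._make_room(size)
        self.memory[block_id] = size
        self.refcounts[block_id] = 1

    def get(self, block_id: str) -> int:
        if block_id in self.disk:
            # Restore a spilled block when a downstream task needs it.
            size = self.disk.pop(block_id)
            self._make_room(size)
            self.memory[block_id] = size
        self.memory.move_to_end(block_id)  # mark as most recently used
        return self.memory[block_id]

    def add_ref(self, block_id: str) -> None:
        self.refcounts[block_id] += 1

    def remove_ref(self, block_id: str) -> None:
        # Reference counting: free the block once nothing references it.
        self.refcounts[block_id] -= 1
        if self.refcounts[block_id] == 0:
            del self.refcounts[block_id]
            self.memory.pop(block_id, None)
            self.disk.pop(block_id, None)


store = ToyObjectStore(capacity_bytes=200)
store.put("b0", 100)
store.put("b1", 100)
store.put("b2", 100)  # store full: b0 (least recently used) spills
print(list(store.memory), list(store.disk))  # ['b1', 'b2'] ['b0']
store.get("b0")       # a downstream task needs b0: restore it, spill b1
print(list(store.memory), list(store.disk))  # ['b2', 'b0'] ['b1']
store.remove_ref("b1")  # last reference dropped: b1 is freed entirely
print(list(store.disk))  # []
```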