Ray Data Internals#
This guide describes the implementation of Ray Data. The intended audience is advanced users and Ray Data developers.
For a gentler introduction to Ray Data, see Key concepts.
Datasets and blocks#
A Dataset operates over a sequence of Ray object references to blocks. Each block contains a disjoint subset of rows, and Ray Data loads and transforms these blocks in parallel.
The following figure visualizes a dataset with three blocks, each holding 1000 rows.
Operations#
Reading files#
Ray Data uses Ray tasks to read files in parallel. Each read task reads one or more files and produces an output block.
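For example, a minimal sketch of a parallel read (the S3 path here is a hypothetical placeholder; any supported datasource works):
import ray

# Ray Data splits the files among multiple read tasks, and each task
# produces one or more output blocks.
ds = ray.data.read_parquet("s3://bucket/path")

# Reading only happens once the dataset is consumed or materialized.
ds.show(5)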
To handle transient errors from remote datasources, Ray Data retries application-level exceptions.
For more information on loading data, see Loading data.
Transforming data#
Ray Data uses either Ray tasks or Ray actors to transform blocks. By default, it uses tasks.
For more information on transforming data, see Transforming data.
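For example, a minimal sketch of both modes (the identity UDFs are placeholders; batch formats vary by Ray version):
import ray

class Identity:
    # A callable class can hold state. With an actor pool, __init__ runs
    # once per actor rather than once per batch, which is useful for
    # expensive setup such as loading a model.
    def __call__(self, batch):
        return batch

ds = ray.data.range(10_000)

# Function UDFs run as Ray tasks by default.
ds = ds.map_batches(lambda batch: batch)

# Class UDFs run on a pool of Ray actors.
ds = ds.map_batches(Identity, compute=ray.data.ActorPoolStrategy(min_size=1, max_size=2))
ds.materialize()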
Shuffling data#
When you call random_shuffle(), sort(), or groupby(), Ray Data shuffles blocks in a map-reduce style: map tasks partition blocks by value, and then reduce tasks merge co-partitioned blocks.
Note
Shuffles materialize Datasets in memory. In other words, shuffle execution isn’t streamed through memory.
For an in-depth guide on shuffle performance, see Performance Tips and Tuning.
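For example, a minimal sketch of a shuffle:
import ray

ds = ray.data.range(1000)

# random_shuffle() runs a map-reduce style shuffle and materializes the
# shuffled blocks in memory rather than streaming them.
shuffled = ds.random_shuffle()
shuffled.materialize()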
Scheduling#
Ray Data uses Ray Core for execution, and is subject to the same scheduling considerations as normal Ray Tasks and Actors. Ray Data uses the following custom scheduling settings by default for improved performance:
The SPREAD scheduling strategy ensures that data blocks and map tasks are evenly balanced across the cluster.
Dataset tasks ignore placement groups by default; see Ray Data and placement groups.
Ray Data and placement groups#
By default, Ray Data configures its tasks and actors to use the cluster-default scheduling strategy ("DEFAULT"). You can inspect this configuration variable with ray.data.DataContext.get_current().scheduling_strategy. This scheduling strategy schedules these tasks and actors outside any present placement group. To use current placement group resources specifically for Ray Data, set ray.data.DataContext.get_current().scheduling_strategy = None.
Consider this override only for advanced use cases to improve performance predictability. The general recommendation is to let Ray Data run outside placement groups.
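For example, a minimal sketch of inspecting and overriding this setting:
import ray

ctx = ray.data.DataContext.get_current()

# The default strategy schedules Ray Data tasks and actors outside any
# present placement group.
print(ctx.scheduling_strategy)  # "DEFAULT"

# Advanced: opt Ray Data into the resources of the current placement group.
ctx.scheduling_strategy = None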
Ray Data and Tune#
When using Ray Data in conjunction with Ray Tune, it’s important to ensure there are enough free CPUs for Ray Data to run on. By default, Tune tries to fully utilize cluster CPUs. This can prevent Ray Data from scheduling tasks, reducing performance or causing workloads to hang.
To ensure CPU resources are always available for Ray Data execution, limit the number of concurrent Tune trials with the max_concurrent_trials Tune option.
import ray
from ray import tune

# This workload uses spare cluster resources for execution.
def objective(*args):
    ray.data.range(10).show()

# Create a cluster with 4 CPU slots available.
ray.init(num_cpus=4)

# Setting `max_concurrent_trials=3` ensures that the cluster always has a
# spare CPU for Dataset execution. Try setting `max_concurrent_trials=4`
# here, and notice that the experiment will appear to hang.
tuner = tune.Tuner(
    tune.with_resources(objective, {"cpu": 1}),
    tune_config=tune.TuneConfig(
        num_samples=1,
        max_concurrent_trials=3,
    ),
)
tuner.fit()
Execution#
Ray Data execution by default is:
Lazy: Transformations on a Dataset aren’t executed until you call a consumption operation like ds.iter_batches() or Dataset.materialize(). This creates opportunities for optimizing the execution plan, like stage fusion.
Streaming: Dataset transformations execute in a streaming way, incrementally on the base data instead of on all of the data at once, and overlap the execution of operations. You can use this to stream data loading into ML training, overlapping data preprocessing with model training, or to execute batch transformations on large datasets without loading the entire dataset into cluster memory.
Lazy Execution#
Lazy execution offers opportunities for improved performance and memory stability due to stage fusion optimizations and aggressive garbage collection of intermediate results.
Dataset creation and transformation APIs are lazy, with execution only triggered via “sink” APIs, such as consuming (ds.iter_batches()), writing (ds.write_parquet()), or manually triggering via ds.materialize(). There are a few exceptions to this rule, where transformations such as ds.union() and ds.limit() trigger execution.
Check the API docs for Ray Data methods to see if they trigger execution. Those that do trigger execution have a Note indicating as much.
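For example, a minimal sketch of laziness in action:
import ray

# No execution happens here; Ray Data only builds up a logical plan.
ds = ray.data.range(1000).map_batches(lambda batch: batch)

# A "sink" API such as materialize() triggers execution of the whole plan.
ds = ds.materialize()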
Streaming Execution#
The following code is a hello world example that invokes execution with ds.iter_batches() consumption. The example also enables verbose progress reporting, which shows per-operator progress in addition to overall progress.
import ray
import time

# Enable verbose reporting. This can also be toggled on by setting
# the environment variable RAY_DATA_VERBOSE_PROGRESS=1.
ctx = ray.data.DataContext.get_current()
ctx.execution_options.verbose_progress = True

def sleep(x):
    time.sleep(0.1)
    return x

for _ in (
    ray.data.range_tensor(5000, shape=(80, 80, 3), parallelism=200)
    .map_batches(sleep, num_cpus=2)
    .map_batches(sleep, compute=ray.data.ActorPoolStrategy(min_size=2, max_size=4))
    .map_batches(sleep, num_cpus=1)
    .iter_batches()
):
    pass
This launches a simple 4-stage pipeline. The example uses different compute arguments for each stage, which forces them to be run as separate operators instead of getting fused together. You should see a log message indicating streaming execution is being used:
2023-03-30 16:40:10,076 INFO streaming_executor.py:83 -- Executing DAG InputDataBuffer[Input] -> TaskPoolMapOperator[ReadRange] -> TaskPoolMapOperator[MapBatches(sleep)] -> ActorPoolMapOperator[MapBatches(sleep)] -> TaskPoolMapOperator[MapBatches(sleep)]
The next few lines show execution progress. Here is how to interpret the output:
Running: 7.0/16.0 CPU, 0.0/0.0 GPU, 76.91 MiB/2.25 GiB object_store_memory 65%|██▊ | 130/200 [00:08<00:02, 22.52it/s]
This line tells you how many resources are currently being used by the streaming executor out of the limits, as well as the number of completed output blocks. The streaming executor attempts to keep resource usage under the printed limits by throttling task executions.
ReadRange: 2 active, 37 queued, 7.32 MiB objects 1: 80%|████████▊ | 161/200 [00:08<00:02, 17.81it/s]
MapBatches(sleep): 5 active, 5 queued, 18.31 MiB objects 2: 76%|██▎| 151/200 [00:08<00:02, 19.93it/s]
MapBatches(sleep): 7 active, 2 queued, 25.64 MiB objects, 2 actors [all objects local] 3: 71%|▋| 142/
MapBatches(sleep): 2 active, 0 queued, 7.32 MiB objects 4: 70%|██▊ | 139/200 [00:08<00:02, 23.16it/s]
These lines are only shown when verbose progress reporting is enabled. The active count indicates the number of running tasks for the operator. The queued count is the number of input blocks for the operator that are computed but not yet submitted for execution. For operators that use actor-pool execution, the number of running actors is shown as actors.
Tip
Avoid returning large outputs from the final operation of a pipeline you are iterating over, since the consumer process is a serial bottleneck.
Fault tolerance#
Ray Data performs lineage reconstruction to recover data. If an application error or system failure occurs, Ray Data recreates blocks by re-executing tasks.
Note
Fault tolerance isn’t supported if the process that created the Dataset dies.
Stage Fusion Optimization#
In order to reduce memory usage and task overheads, Ray Data automatically fuses together lazy operations that are compatible:
Same compute pattern: embarrassingly parallel map vs. all-to-all shuffle
Same compute strategy: Ray tasks vs. Ray actors
Same resource specification, for example, num_cpus or num_gpus requests
Read stages and subsequent map-like transformations are usually fused together. All-to-all transformations such as ds.random_shuffle() can be fused with earlier map-like stages, but not later stages.
You can tell if stage fusion is enabled by checking the Dataset stats and looking for fused stages (for example, read->map_batches).
Stage N read->map_batches->shuffle_map: N/N blocks executed in T
* Remote wall time: T min, T max, T mean, T total
* Remote cpu time: T min, T max, T mean, T total
* Output num rows: N min, N max, N mean, N total
Memory Management#
This section describes how Ray Data manages execution and object store memory.
Execution Memory#
During execution, a task can read multiple input blocks, and write multiple output blocks. Input and output blocks consume both worker heap memory and shared memory via Ray’s object store.
Ray Data attempts to bound its heap memory usage to num_execution_slots * max_block_size. The number of execution slots is by default equal to the number of CPUs, unless custom resources are specified. The maximum block size is set by the configuration parameter ray.data.DataContext.target_max_block_size and defaults to 512 MiB. When a task’s output is larger than this value, the worker automatically splits the output into multiple smaller blocks to avoid running out of heap memory.
Large block sizes can lead to out-of-memory situations. To avoid these issues, make sure no single item in your Dataset is too large, and always call ds.map_batches() with a batch size small enough that the output batch comfortably fits into memory.
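For example, a minimal sketch of inspecting the block size limit and bounding batch sizes:
import ray

ctx = ray.data.DataContext.get_current()

# Task outputs larger than this limit are split into multiple smaller blocks.
print(ctx.target_max_block_size)  # 512 MiB by default

# An explicit, modest batch_size keeps each output batch comfortably in memory.
ds = ray.data.range(100_000).map_batches(lambda batch: batch, batch_size=1024)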
Object Store Memory#
Ray Data uses the Ray object store to store data blocks, which means it inherits the memory management features of the Ray object store. This section discusses the relevant features:
Object Spilling: Since Ray Data uses the Ray object store to store data blocks, any blocks that can’t fit into object store memory are automatically spilled to disk. The objects are automatically reloaded when needed by downstream compute tasks.
Locality Scheduling: Ray preferentially schedules compute tasks on nodes that already have a local copy of the object, reducing the need to transfer objects between nodes in the cluster.
Reference Counting: Dataset blocks are kept alive by object store reference counting as long as there is any Dataset that references them. To free memory, delete any Python references to the Dataset object.
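For example, a minimal sketch of freeing block memory by dropping references:
import ray

ds = ray.data.range(1000).materialize()

# The materialized blocks stay pinned in the object store while `ds`
# references them; deleting the reference lets Ray reclaim that memory.
del ds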