Scheduling, Execution, and Memory Management#

Scheduling#

Datasets uses Ray core for execution, and hence is subject to the same scheduling considerations as normal Ray tasks and actors. Datasets uses the following custom scheduling settings by default for improved performance:

  • The SPREAD scheduling strategy is used to ensure data blocks are evenly balanced across the cluster.

  • Retries of application-level exceptions are enabled to handle transient errors from remote datasources.

  • Dataset tasks ignore placement groups by default; see Datasets and Placement Groups.

Datasets and Tune#

When using Datasets in conjunction with Ray Tune, it is important to ensure there are enough free CPUs for Datasets to run on. By default, Tune will try to fully utilize cluster CPUs. This can prevent Datasets from scheduling tasks, reducing performance or causing workloads to hang.

As an example, the following shows two ways to use Datasets together with Tune:

By limiting the number of concurrent Tune trials, we ensure CPU resources are always available for Datasets execution. This can be done using the max_concurrent_trials Tune option.

import ray
from ray import tune

# This Dataset workload will use spare cluster resources for execution.
def objective(*args):
    ray.data.range(10).show()

# Create a cluster with 4 CPU slots available.
ray.init(num_cpus=4)

# By setting `max_concurrent_trials=3`, this ensures the cluster will always
# have a spare CPU for Datasets. Try setting `max_concurrent_trials=4` here,
# and notice that the experiment will appear to hang.
tuner = tune.Tuner(
    tune.with_resources(objective, {"cpu": 1}),
    tune_config=tune.TuneConfig(
        num_samples=1,
        max_concurrent_trials=3
    )
)
tuner.fit()

Alternatively, we can tell Tune to set aside CPU resources for other libraries. This can be done by setting _max_cpu_fraction_per_node=0.8, which reserves 20% of node CPUs for Dataset execution.

import ray
from ray import tune

# This Dataset workload will use reserved cluster resources for execution.
def objective(*args):
    ray.data.range(10).show()

# Create a cluster with 4 CPU slots available.
ray.init(num_cpus=4)

# This runs smoothly since _max_cpu_fraction_per_node is set to 0.8, effectively
# reserving 1 CPU for Datasets task execution.
tuner = tune.Tuner(
    tune.with_resources(objective, tune.PlacementGroupFactory(
        [{"CPU": 1}],
        _max_cpu_fraction_per_node=0.8,
    )),
    tune_config=tune.TuneConfig(num_samples=1)
)
tuner.fit()

Warning

This option is experimental and not currently recommended for use with autoscaling clusters (scale-up will not trigger properly).

Datasets and Placement Groups#

By default, Datasets configures its tasks and actors to use the cluster-default scheduling strategy (“DEFAULT”). You can inspect this configuration variable here: ray.data.DatasetContext.get_current().scheduling_strategy. This scheduling strategy will schedule these tasks and actors outside any present placement group. If you want to force Datasets to schedule tasks within the current placement group (i.e., to use current placement group resources specifically for Datasets), you can set ray.data.DatasetContext.get_current().scheduling_strategy = None.
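
For example, a minimal sketch (assuming it runs inside a trial or actor that holds a placement group):

import ray

# Per the above, clearing the custom scheduling strategy makes Dataset tasks
# and actors schedule within the current placement group.
ctx = ray.data.DatasetContext.get_current()
ctx.scheduling_strategy = None

# This Dataset execution now uses the placement group's resources.
ray.data.range(10).show()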

Overriding the scheduling strategy this way should be considered only for advanced use cases where performance predictability matters. We generally recommend letting Datasets run outside placement groups as documented in the Datasets and Other Libraries section.

Execution#

By default, Datasets execution is:

  • Lazy: This means that transformations on Dataset are not executed until a consumption operation (e.g. ds.iter_batches()) or Dataset.materialize() is called. This creates opportunities for optimizing the execution plan (e.g. stage fusion).

  • Pipelined: This means that Dataset transformations will be executed in a streaming way, incrementally on the base data, instead of on all of the data at once, and overlapping the execution of operations. This can be used for streaming data loading into ML training to overlap the data preprocessing and model training, or to execute batch transformations on large datasets without needing to load the entire dataset into cluster memory.

Lazy Execution#

Lazy execution offers opportunities for improved performance and memory stability due to stage fusion optimizations and aggressive garbage collection of intermediate results.

Dataset creation and transformation APIs are lazy, with execution only triggered via “sink” APIs, such as consuming (ds.iter_batches()), writing (ds.write_parquet()), or manually triggering via ds.materialize(). There are a few exceptions to this rule, where transformations such as ds.union() and ds.limit() trigger execution; we plan to make these operations lazy in the future.
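
For example, a minimal sketch of this behavior:

import ray

# No execution happens here: both the read and the transformation are lazy.
ds = ray.data.range(1000).map_batches(lambda batch: batch)

# Execution is triggered only by "sink" APIs: consuming, writing, or
# explicitly materializing.
for _ in ds.iter_batches():
    pass

ds.materialize()  # Executes the plan and materializes all blocks.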

Check the API docs for Datasets methods to see if they trigger execution. Those that do trigger execution will have a Note indicating as much.

Streaming Execution#

The following code is a hello world example that triggers execution by consuming the Dataset with ds.iter_batches(). We will also enable verbose progress reporting, which shows per-operator progress in addition to overall progress.

import ray
import time

# Enable verbose reporting. This can also be toggled on by setting
# the environment variable RAY_DATA_VERBOSE_PROGRESS=1.
ctx = ray.data.DatasetContext.get_current()
ctx.execution_options.verbose_progress = True

def sleep(x):
    time.sleep(0.1)
    return x

for _ in (
    ray.data.range_tensor(5000, shape=(80, 80, 3), parallelism=200)
    .map_batches(sleep, num_cpus=2)
    .map_batches(sleep, compute=ray.data.ActorPoolStrategy(2, 4))
    .map_batches(sleep, num_cpus=1)
    .iter_batches()
):
    pass

This launches a simple 4-stage pipeline. We use different compute args for each stage, which forces them to be run as separate operators instead of getting fused together. You should see a log message indicating streaming execution is being used:

2023-03-30 16:40:10,076      INFO streaming_executor.py:83 -- Executing DAG InputDataBuffer[Input] -> TaskPoolMapOperator[ReadRange] -> TaskPoolMapOperator[MapBatches(sleep)] -> ActorPoolMapOperator[MapBatches(sleep)] -> TaskPoolMapOperator[MapBatches(sleep)]

The next few lines will show execution progress. Here is how to interpret the output:

Running: 7.0/16.0 CPU, 0.0/0.0 GPU, 76.91 MiB/2.25 GiB object_store_memory 65%|██▊ | 130/200 [00:08<00:02, 22.52it/s]

This line tells you how many resources are currently being used by the streaming executor out of the limits, as well as the number of completed output blocks. The streaming executor will attempt to keep resource usage under the printed limits by throttling task executions.

ReadRange: 2 active, 37 queued, 7.32 MiB objects 1:  80%|████████▊  | 161/200 [00:08<00:02, 17.81it/s]
MapBatches(sleep): 5 active, 5 queued, 18.31 MiB objects 2:  76%|██▎| 151/200 [00:08<00:02, 19.93it/s]
MapBatches(sleep): 7 active, 2 queued, 25.64 MiB objects, 2 actors [all objects local] 3:  71%|▋| 142/
MapBatches(sleep): 2 active, 0 queued, 7.32 MiB objects 4:  70%|██▊ | 139/200 [00:08<00:02, 23.16it/s]

These lines are only shown when verbose progress reporting is enabled. The active count indicates the number of running tasks for the operator. The queued count is the number of input blocks for the operator that are computed but are not yet submitted for execution. For operators that use actor-pool execution, the number of running actors is shown as actors.

Tip

Avoid returning large outputs from the final operation of a pipeline you are iterating over, since the consumer process will be a serial bottleneck.

Configuring Resources and Locality#

By default, the CPU and GPU limits are set to the cluster size, and the object store memory limit conservatively to 1/4 of the total object store size to avoid the possibility of disk spilling.

You may want to customize these limits in the following scenarios:

  • If running multiple concurrent jobs on the cluster, setting lower limits can avoid resource contention between the jobs.

  • If you want to fine-tune the memory limit to maximize performance.

  • For data loading into training jobs, you may want to set the object store memory to a low value (e.g., 2GB) to limit resource usage.

Execution options can be configured via the global DatasetContext. The options will be applied to future jobs launched in the process:

ctx = ray.data.DatasetContext.get_current()
ctx.execution_options.resource_limits.cpu = 10
ctx.execution_options.resource_limits.gpu = 5
ctx.execution_options.resource_limits.object_store_memory = 10e9

Deterministic Execution#

# By default, this is set to False.
ctx.execution_options.preserve_order = True

To enable deterministic execution, set preserve_order to True as shown above. This may decrease performance, but ensures block ordering is preserved through execution. This flag defaults to False.

Actor Locality Optimization (ML inference use case)#

# By default, this is set to True already.
ctx.execution_options.actor_locality_enabled = True

The actor locality optimization (when using actor pools) tries to route each input block to an actor on the node where that block is already local. This reduces network traffic across nodes. When actor locality is enabled, you’ll see the locality hit rate reported in the progress output:

MapBatches(Model): 0 active, 0 queued, 0 actors [992 locality hits, 8 misses]: 100%|██████████| 1000/1000 [00:59<00:00, 16.84it/s]
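
Below is a sketch of the actor-pool inference pattern this optimization applies to; Model here is a hypothetical stand-in for your model wrapper:

import ray

ctx = ray.data.DatasetContext.get_current()
ctx.execution_options.actor_locality_enabled = True  # Already the default.

# Hypothetical stateful model wrapper; any callable class works here.
class Model:
    def __call__(self, batch):
        return batch  # Replace with real inference.

# map_batches on an actor pool lets the locality optimization route blocks
# to actors on the nodes that already hold them.
ds = ray.data.range_tensor(1000, shape=(80, 80, 3))
ds.map_batches(Model, compute=ray.data.ActorPoolStrategy(2, 4)).materialize()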

Locality with Output (ML ingest use case)#

ctx.execution_options.locality_with_output = True

Setting this to True tells Datasets to prefer placing operator tasks onto the consumer node in the cluster, rather than spreading them evenly across the cluster. This can be useful if you know you’ll be consuming the output data directly on the consumer node (i.e., for ML training ingest). However, this may incur a performance penalty for other use cases.

Scalability#

We expect the data streaming backend to scale to tens of thousands of files / blocks and up to hundreds of terabytes of data. Please report any performance degradation you experience at these scales; we would be very interested to investigate!

Stage Fusion Optimization#

In order to reduce memory usage and task overheads, Datasets will automatically fuse together lazy operations that are compatible:

  • Same compute pattern: embarrassingly parallel map vs. all-to-all shuffle

  • Same compute strategy: Ray tasks vs Ray actors

  • Same resource specification, e.g. num_cpus or num_gpus requests

Read stages and subsequent map-like transformations will usually be fused together. All-to-all transformations such as ds.random_shuffle() can be fused with earlier map-like stages, but not later stages.

You can tell if stage fusion is enabled by checking the Dataset stats and looking for fused stages (e.g., read->map_batches).

Stage N read->map_batches->shuffle_map: N/N blocks executed in T
* Remote wall time: T min, T max, T mean, T total
* Remote cpu time: T min, T max, T mean, T total
* Output num rows: N min, N max, N mean, N total
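
As a minimal sketch, stats output like the above can be produced as follows (the read and map_batches stages are expected to fuse since they share the same compute strategy and resources):

import ray

ds = (
    ray.data.range(1000)
    .map_batches(lambda batch: batch)
    .random_shuffle()
    .materialize()
)

# Fused stages (e.g., "read->map_batches") show up in the stats output.
print(ds.stats())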

Memory Management#

This section describes how Datasets manages execution and object store memory.

Execution Memory#

During execution, a task can read multiple input blocks, and write multiple output blocks. Input and output blocks consume both worker heap memory and shared memory via Ray’s object store.

Datasets attempts to bound its heap memory usage to num_execution_slots * max_block_size. The number of execution slots is by default equal to the number of CPUs, unless custom resources are specified. The maximum block size is controlled by the configuration parameter ray.data.DatasetContext.target_max_block_size and defaults to 512MiB. When a task’s output is larger than this value, the worker will automatically split the output into multiple smaller blocks to avoid running out of heap memory.

A large block size can lead to out-of-memory situations. To avoid these issues, make sure no single item in your Dataset is too large, and always call ds.map_batches() with a batch size small enough that the output batch can comfortably fit into memory.
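
As a sketch, both knobs mentioned above can be adjusted; the values below are illustrative only:

import ray

# Lower the target block size (default 512MiB) if blocks are too large for
# worker heap memory; 128MiB here is purely illustrative.
ctx = ray.data.DatasetContext.get_current()
ctx.target_max_block_size = 128 * 1024 * 1024

# Keep output batches small enough to fit comfortably in memory.
ds = ray.data.range_tensor(1000, shape=(80, 80, 3))
ds.map_batches(lambda batch: batch, batch_size=32).materialize()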

Object Store Memory#

Datasets uses the Ray object store to store data blocks, which means it inherits the memory management features of the Ray object store. This section discusses the relevant features:

  • Object Spilling: Since Datasets uses the Ray object store to store data blocks, any blocks that can’t fit into object store memory are automatically spilled to disk. The objects are automatically reloaded when needed by downstream compute tasks.

  • Locality Scheduling: Ray will preferentially schedule compute tasks on nodes that already have a local copy of the object, reducing the need to transfer objects between nodes in the cluster.

  • Reference Counting: Dataset blocks are kept alive by object store reference counting as long as there is any Dataset that references them. To free memory, delete any Python references to the Dataset object.
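
For example, a minimal sketch of freeing block memory by dropping the last reference:

import ray

# Materializing keeps the blocks alive in the object store.
ds = ray.data.range(1000).materialize()

# Deleting the last Python reference lets Ray's reference counting reclaim
# the underlying object store memory.
del ds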

Block Data Formats#

In order to optimize conversion costs, Datasets can hold tabular data in-memory as either Arrow Tables or Pandas DataFrames.

Different ways of creating Datasets lead to different starting internal formats:

  • Reading tabular files (Parquet, CSV, JSON) creates Arrow blocks initially.

  • Converting from Pandas, Dask, Modin, and Mars creates Pandas blocks initially.

  • Reading NumPy files or converting from NumPy ndarrays creates Arrow blocks.

  • Reading TFRecord files creates Arrow blocks.

  • Reading from MongoDB creates Arrow blocks.

However, this internal format is not exposed to the user. Datasets converts between formats as needed internally depending on the specified batch_format of transformations.
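
For example, a minimal sketch of requesting a specific batch_format in a transformation, which makes Datasets convert blocks internally as needed:

import ray
import pandas as pd

# Converting from pandas creates Pandas blocks internally (see above).
ds = ray.data.from_pandas(pd.DataFrame({"value": list(range(100))}))

# Requesting "pandas" batches; the transformation adds a derived column.
def add_double(batch: pd.DataFrame) -> pd.DataFrame:
    batch["double"] = batch["value"] * 2
    return batch

ds.map_batches(add_double, batch_format="pandas").materialize()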