Key Concepts#

Dataset#

A Dataset operates over a sequence of Ray object references to blocks. Each block holds a set of records in an Arrow table or pandas DataFrame. Having multiple blocks in a dataset allows for parallel transformation and ingest.

For ML use cases, Dataset natively supports mixing tensors with tabular data. To learn more, read Working with tensor data.

The following figure visualizes a dataset with three blocks, each holding 1000 rows. Note that some blocks may not be computed yet. Normally, callers iterate over dataset blocks in a streaming fashion, so not all blocks need to be materialized in cluster memory at once.

../_images/dataset-arch.svg
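
As a minimal sketch (assuming Ray Data 2.x; the exact block count depends on data size and cluster configuration), you can build a small dataset and inspect its blocks like this:

    import ray

    # Create a Dataset; Ray Data splits the rows across multiple blocks.
    ds = ray.data.range(3000)

    print(ds.schema())  # one column, "id" (int64)
    print(ds.take(3))   # [{'id': 0}, {'id': 1}, {'id': 2}]

    # Materializing computes all blocks and lets you inspect the block count.
    print(ds.materialize().num_blocks())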

Reading Data#

Dataset uses Ray tasks to read data from remote storage in parallel. Each read task reads one or more files and produces an output block:

../_images/dataset-read.svg

You can manually specify the number of read tasks, but the final parallelism is always capped by the number of files in the underlying dataset.
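
As a rough sketch (the file path and task count are placeholders; depending on your Ray version the argument is named parallelism or override_num_blocks):

    import ray

    # Placeholder path; any supported filesystem (local, S3, GCS, HDFS) works.
    ds = ray.data.read_parquet("/tmp/input_data/")

    # Request roughly 64 read tasks. The effective parallelism is still capped
    # by the number of input files. In newer Ray releases this argument is
    # called override_num_blocks instead of parallelism.
    ds = ray.data.read_parquet("/tmp/input_data/", parallelism=64)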

For an in-depth guide on creating datasets, read Loading Data.

Transforming Data#

Dataset uses either Ray tasks or Ray actors to transform data blocks. By default, it uses tasks.

To use actors, pass an ActorPoolStrategy to the compute argument of methods like map_batches(). ActorPoolStrategy creates an autoscaling pool of Ray actors, which lets you cache expensive state initialization (for example, model loading for GPU-based tasks).

../_images/dataset-map.svg
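
For example, a minimal sketch of both compute strategies (the Doubler class and the pool sizes are illustrative, and it assumes the compute argument described above):

    import ray
    from ray.data import ActorPoolStrategy

    ds = ray.data.range(10_000)

    # Stateless transform: runs as Ray tasks (the default compute strategy).
    def add_one(batch):
        batch["id"] = batch["id"] + 1
        return batch

    ds = ds.map_batches(add_one)

    # Stateful transform: a callable class lets each actor cache expensive
    # setup once (for example, loading a model in __init__).
    class Doubler:
        def __init__(self):
            self.factor = 2  # stand-in for expensive state such as a loaded model

        def __call__(self, batch):
            batch["id"] = batch["id"] * self.factor
            return batch

    # An autoscaling pool of 2 to 8 actors; the sizes here are illustrative.
    ds = ds.map_batches(Doubler, compute=ActorPoolStrategy(min_size=2, max_size=8))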

For an in-depth guide on transforming datasets, read Transforming Data.

Shuffling Data#

Operations like sort() and groupby() require blocks to be partitioned by value or shuffled. Dataset uses tasks to shuffle blocks in a map-reduce style: map tasks partition blocks by value and then reduce tasks merge co-partitioned blocks.
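
For example, a small sketch of value-based operations that trigger this shuffle (the column names are illustrative):

    import ray

    ds = ray.data.from_items(
        [{"group": i % 3, "value": i} for i in range(1000)]
    )

    # Sorting partitions blocks by value under the hood (map-reduce style shuffle).
    sorted_ds = ds.sort("value", descending=True)

    # Grouping also shuffles so rows with the same key land in the same partition.
    grouped = ds.groupby("group").sum("value")
    print(grouped.take_all())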

Call repartition() to change the number of blocks in a Dataset. Repartition has two modes:

  • shuffle=False - performs the minimal data movement needed to equalize block sizes

  • shuffle=True - performs a full distributed shuffle

../_images/dataset-shuffle.svg
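
A minimal sketch of the two repartition modes (the target block count of 32 is arbitrary):

    import ray

    ds = ray.data.range(100_000)

    # Minimal data movement: splits or merges neighboring blocks to reach 32 blocks.
    ds_even = ds.repartition(32, shuffle=False)

    # Full distributed shuffle: every output block can contain rows from any input block.
    ds_shuffled = ds.repartition(32, shuffle=True)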

Dataset can shuffle multi-terabyte datasets, leveraging the Ray object store for disk spilling. For an in-depth guide on shuffle performance, read Performance Tips and Tuning. Note that shuffle operations materialize the entire Dataset before executing (shuffle execution is not streamed through memory).

Iteration and materialization#

Most transformations on a dataset are lazy. They don’t execute until you iterate over the dataset or call Dataset.materialize(). When a Dataset is materialized, its type becomes a MaterializedDataset, which indicates that all its blocks are materialized in Ray object store memory.
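
For example, a minimal sketch of lazy execution versus explicit materialization:

    import ray

    # Nothing executes yet: the map_batches call only builds the plan.
    ds = ray.data.range(10_000).map_batches(lambda batch: batch)

    # Iteration triggers streaming execution, one batch at a time.
    for batch in ds.iter_batches(batch_size=1024):
        pass

    # materialize() executes the whole plan and pins all blocks in object store memory.
    materialized = ds.materialize()
    print(type(materialized))  # MaterializedDataset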

Dataset transformations execute in a streaming fashion: they run incrementally over the data, with operators processed in parallel. See Streaming Execution for details.

Datasets and MaterializedDatasets can be freely passed between Ray tasks, actors, and libraries without copying the underlying block data (pass-by-reference semantics).
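
For example, a small sketch of passing a Dataset into a Ray task (the consume function is hypothetical):

    import ray

    @ray.remote
    def consume(ds: ray.data.Dataset) -> int:
        # Only block references are shipped to this task, not the block data itself.
        return ds.count()

    ds = ray.data.range(1_000)
    print(ray.get(consume.remote(ds)))  # 1000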

Fault tolerance#

Dataset performs lineage reconstruction to recover data: if an application error or system failure occurs, it recreates lost blocks by re-executing the tasks that produced them. If compute=ActorPoolStrategy(size=n) is used, Ray restarts the failed actor before re-executing the task.

Fault tolerance is not supported if the original worker process that created the Dataset dies. This is because the creator stores the metadata for the objects that comprise the Dataset.