DatasetPipeline API

class ray.data.dataset_pipeline.DatasetPipeline(base_iterable: Iterable[Callable[[], ray.data.dataset.Dataset[ray.data.block.T]]], stages: Optional[List[Callable[[ray.data.dataset.Dataset[Any]], ray.data.dataset.Dataset[Any]]]] = None, length: Optional[int] = None, progress_bars: bool = True, _executed: Optional[List[bool]] = None)[source]

Implements a pipeline of Datasets.

DatasetPipelines implement pipelined execution. This allows for the overlapped execution of data input (e.g., reading files), computation (e.g. feature preprocessing), and output (e.g., distributed ML training).

A DatasetPipeline can be created by repeating a Dataset (ds.repeat(times=None)), by windowing a single Dataset into a pipeline (ds.window(blocks_per_window=10)), or explicitly from Dataset-producing functions using DatasetPipeline.from_iterable().

DatasetPipeline supports all the per-record transforms of Datasets (e.g., map, flat_map, filter), holistic transforms (e.g., repartition), and output methods (e.g., iter_rows, to_tf, to_torch, write_datasource).
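
For example, a minimal sketch (using only methods documented on this page) of creating a pipeline from a Dataset, transforming it per window, and consuming it as a stream of batches:

>>> import ray
>>> # Create a pipeline that loops over a 10-row dataset 5 times.
>>> pipe = ray.data.range(10).repeat(5)
>>> # Per-record transforms apply to every dataset/window in the pipeline.
>>> pipe = pipe.map(lambda x: x * 2)
>>> # Consume the pipeline as a stream of batches.
>>> for batch in pipe.iter_batches(batch_size=4):
...     pass  # e.g., feed each batch to a trainer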

PublicAPI: This API is stable across Ray releases.

Basic Transformations

ray.data.DatasetPipeline.map

Apply Dataset.map to each dataset/window in this pipeline.

ray.data.DatasetPipeline.map_batches

Apply Dataset.map_batches to each dataset/window in this pipeline.

ray.data.DatasetPipeline.flat_map

Apply Dataset.flat_map to each dataset/window in this pipeline.

ray.data.DatasetPipeline.foreach_window

Apply a transform to each dataset/window in this pipeline.

ray.data.DatasetPipeline.filter

Apply Dataset.filter to each dataset/window in this pipeline.

ray.data.DatasetPipeline.add_column

Apply Dataset.add_column to each dataset/window in this pipeline.

ray.data.DatasetPipeline.drop_columns

Apply Dataset.drop_columns to each dataset/window in this pipeline.

Sorting, Shuffling, Repartitioning

ray.data.DatasetPipeline.sort_each_window

Apply Dataset.sort to each dataset/window in this pipeline.

ray.data.DatasetPipeline.random_shuffle_each_window

Apply Dataset.random_shuffle to each dataset/window in this pipeline.

ray.data.DatasetPipeline.randomize_block_order_each_window

Apply Dataset.randomize_block_order to each dataset/window in this pipeline.

ray.data.DatasetPipeline.repartition_each_window

Apply Dataset.repartition to each dataset/window in this pipeline.

Splitting DatasetPipelines

ray.data.DatasetPipeline.split

Split the pipeline into n disjoint pipeline shards.

ray.data.DatasetPipeline.split_at_indices

Split the datasets within the pipeline at the given indices (like np.split).

Creating DatasetPipelines

ray.data.DatasetPipeline.repeat

Repeat this pipeline a given number of times, or indefinitely.

ray.data.DatasetPipeline.rewindow

Change the windowing (blocks per dataset) of this pipeline.

ray.data.DatasetPipeline.from_iterable

Create a pipeline from a sequence of Dataset-producing functions.

Consuming DatasetPipelines

ray.data.DatasetPipeline.show

Call Dataset.show over the stream of output batches from the pipeline.

ray.data.DatasetPipeline.show_windows

Print up to the given number of records from each window/dataset.

ray.data.DatasetPipeline.take

Call Dataset.take over the stream of output batches from the pipeline.

ray.data.DatasetPipeline.take_all

Call Dataset.take_all over the stream of output batches from the pipeline.

ray.data.DatasetPipeline.iter_rows

Return a local row iterator over the data in the pipeline.

ray.data.DatasetPipeline.iter_batches

Return a local batched iterator over the data in the pipeline.

ray.data.DatasetPipeline.iter_torch_batches

Call Dataset.iter_torch_batches over the stream of output batches from the pipeline.

ray.data.DatasetPipeline.iter_tf_batches

Call Dataset.iter_tf_batches over the stream of output batches from the pipeline.

I/O and Conversion

ray.data.DatasetPipeline.write_json

Call Dataset.write_json on each output dataset of this pipeline.

ray.data.DatasetPipeline.write_csv

Call Dataset.write_csv on each output dataset of this pipeline.

ray.data.DatasetPipeline.write_parquet

Call Dataset.write_parquet on each output dataset of this pipeline.

ray.data.DatasetPipeline.write_datasource

Call Dataset.write_datasource on each output dataset of this pipeline.

ray.data.DatasetPipeline.to_tf

Call Dataset.to_tf over the stream of output batches from the pipeline.

ray.data.DatasetPipeline.to_torch

Call Dataset.to_torch over the stream of output batches from the pipeline.

Inspecting Metadata

ray.data.DatasetPipeline.schema

Return the schema of the dataset pipeline.

ray.data.DatasetPipeline.count

Count the number of records in the dataset pipeline.

ray.data.DatasetPipeline.stats

Returns a string containing execution timing information.

ray.data.DatasetPipeline.sum

Sum the records in the dataset pipeline.

Basic transformations

DatasetPipeline.map(fn: Union[Callable[[ray.data.block.T], ray.data.block.U], ray.data.block._CallableClassProtocol[ray.data.block.T, ray.data.block.U]], *, compute: Optional[Union[str, ray.data._internal.compute.ComputeStrategy]] = None, **ray_remote_args) ray.data.dataset_pipeline.DatasetPipeline[ray.data.block.U][source]

Apply Dataset.map to each dataset/window in this pipeline.

DatasetPipeline.map_batches(fn: Union[Callable[[Union[List[ray.data.block.T], pyarrow.Table, pandas.DataFrame, bytes, numpy.ndarray, Dict[str, numpy.ndarray]]], Union[List[ray.data.block.T], pyarrow.Table, pandas.DataFrame, bytes, numpy.ndarray, Dict[str, numpy.ndarray]]], ray.data.block._CallableClassProtocol], *, batch_size: Optional[int] = 4096, compute: Union[str, ray.data._internal.compute.ComputeStrategy] = None, batch_format: str = 'native', fn_args: Optional[Iterable[Any]] = None, fn_kwargs: Optional[Dict[str, Any]] = None, fn_constructor_args: Optional[Iterable[Any]] = None, fn_constructor_kwargs: Optional[Dict[str, Any]] = None, **ray_remote_args) DatasetPipeline[U][source]

Apply Dataset.map_batches to each dataset/window in this pipeline.
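
For example, a minimal sketch of a vectorized per-window transform, assuming a tabular pipeline built from a pandas DataFrame:

>>> import pandas as pd
>>> import ray
>>> df = pd.DataFrame({"value": range(100)})
>>> pipe = ray.data.from_pandas(df).repeat(2)
>>> # Transform each window in batches of pandas DataFrames.
>>> pipe = pipe.map_batches(
...     lambda batch: batch * 2, batch_size=32, batch_format="pandas")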

DatasetPipeline.flat_map(fn: Union[Callable[[ray.data.block.T], ray.data.block.U], ray.data.block._CallableClassProtocol[ray.data.block.T, ray.data.block.U]], *, compute: Optional[Union[str, ray.data._internal.compute.ComputeStrategy]] = None, **ray_remote_args) ray.data.dataset_pipeline.DatasetPipeline[ray.data.block.U][source]

Apply Dataset.flat_map to each dataset/window in this pipeline.

DatasetPipeline.foreach_window(fn: Callable[[ray.data.dataset.Dataset[ray.data.block.T]], ray.data.dataset.Dataset[ray.data.block.U]]) ray.data.dataset_pipeline.DatasetPipeline[ray.data.block.U][source]

Apply a transform to each dataset/window in this pipeline.

Args:

fn: The function to transform each dataset with.

Returns:

The transformed DatasetPipeline.

DeveloperAPI: This API may change across minor Ray releases.

DatasetPipeline.filter(fn: Union[Callable[[ray.data.block.T], ray.data.block.U], ray.data.block._CallableClassProtocol[ray.data.block.T, ray.data.block.U]], *, compute: Optional[Union[str, ray.data._internal.compute.ComputeStrategy]] = None, **ray_remote_args) ray.data.dataset_pipeline.DatasetPipeline[ray.data.block.T][source]

Apply Dataset.filter to each dataset/window in this pipeline.

DatasetPipeline.add_column(col: str, fn: Callable[[pandas.DataFrame], pandas.Series], *, compute: Optional[str] = None, **ray_remote_args) DatasetPipeline[U][source]

Apply Dataset.add_column to each dataset/window in this pipeline.

DatasetPipeline.drop_columns(cols: List[str], *, compute: Optional[str] = None, **ray_remote_args) ray.data.dataset_pipeline.DatasetPipeline[ray.data.block.U][source]

Apply Dataset.drop_columns to each dataset/window in this pipeline.

Sorting, Shuffling, Repartitioning

DatasetPipeline.sort_each_window(key: Union[None, str, Callable[[ray.data.block.T], Any]] = None, descending: bool = False) ray.data.dataset_pipeline.DatasetPipeline[ray.data.block.U][source]

Apply Dataset.sort to each dataset/window in this pipeline.

DatasetPipeline.random_shuffle_each_window(*, seed: Optional[int] = None, num_blocks: Optional[int] = None) ray.data.dataset_pipeline.DatasetPipeline[ray.data.block.U][source]

Apply Dataset.random_shuffle to each dataset/window in this pipeline.

DatasetPipeline.randomize_block_order_each_window(*, seed: Optional[int] = None) ray.data.dataset_pipeline.DatasetPipeline[ray.data.block.U][source]

Apply Dataset.randomize_block_order to each dataset/window in this pipeline.

DatasetPipeline.repartition_each_window(num_blocks: int, *, shuffle: bool = False) ray.data.dataset_pipeline.DatasetPipeline[ray.data.block.U][source]

Apply Dataset.repartition to each dataset/window in this pipeline.

Splitting DatasetPipelines

DatasetPipeline.split(n: int, *, equal: bool = False, locality_hints: Optional[List[Any]] = None) List[ray.data.dataset_pipeline.DatasetPipeline[ray.data.block.T]][source]

Split the pipeline into n disjoint pipeline shards.

This returns a list of sub-pipelines that can be passed to Ray tasks and actors and used to read the pipeline records in parallel.

Examples

>>> import ray
>>> pipe = ray.data.range(10).repeat(50) 
>>> workers = ... 
>>> # Split up a pipeline to process over `n` worker actors.
>>> shards = pipe.split( 
...     len(workers), locality_hints=workers)
>>> for shard, worker in zip(shards, workers): 
...     worker.consume.remote(shard) 

Time complexity: O(1)

Implementation detail: this launches a coordinator actor that is used to execute the pipeline and push data blocks to each pipeline shard. Reading from an individual shard will be blocked if other shards are falling behind. A warning will be printed if a shard has been blocked on read for more than 10 seconds.

Parameters
  • n – Number of child pipelines to return.

  • equal – Whether to guarantee each split has an equal number of records. This may drop records if they cannot be divided equally among the splits.

  • locality_hints – [Experimental] A list of Ray actor handles of size n. The system will try to co-locate the blocks of the ith pipeline shard with the ith actor to maximize data locality.

Returns

A list of n disjoint pipeline splits.
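
As a concrete sketch of the pattern in the example above, the elided workers could be Ray actors that each consume one shard. The Worker class and its consume method here are hypothetical, not part of the Ray API:

>>> import ray
>>> @ray.remote
... class Worker:  # hypothetical consumer actor, not a Ray API
...     def consume(self, shard):
...         # Count the rows routed to this shard.
...         return sum(1 for _ in shard.iter_rows())
>>> workers = [Worker.remote() for _ in range(4)]
>>> shards = ray.data.range(100).repeat(10).split(
...     len(workers), locality_hints=workers)
>>> ray.get([w.consume.remote(s) for w, s in zip(workers, shards)])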

DatasetPipeline.split_at_indices(indices: List[int]) List[ray.data.dataset_pipeline.DatasetPipeline[ray.data.block.T]][source]

Split the datasets within the pipeline at the given indices (like np.split).

This will split each dataset contained within this pipeline, thereby producing len(indices) + 1 pipelines: the first pipeline contains the [0, indices[0]) slice from each dataset, the second pipeline contains the [indices[0], indices[1]) slice from each dataset, and so on, with the final pipeline containing the [indices[-1], self.count()) slice from each dataset.

Examples

>>> import ray
>>> p1, p2, p3 = ray.data.range( 
...     8).repeat(2).split_at_indices([2, 5]) 
>>> p1.take() 
[0, 1, 0, 1]
>>> p2.take() 
[2, 3, 4, 2, 3, 4]
>>> p3.take() 
[5, 6, 7, 5, 6, 7]

Time complexity: O(num splits)

See also: DatasetPipeline.split

Parameters

indices – List of sorted integers which indicate where the pipeline will be split. If an index exceeds the length of the pipeline, an empty pipeline will be returned.

Returns

The pipeline splits.

Creating DatasetPipelines

DatasetPipeline.repeat(times: Optional[int] = None) ray.data.dataset_pipeline.DatasetPipeline[ray.data.block.T][source]

Repeat this pipeline a given number of times, or indefinitely.

This operation is only allowed for pipelines of a finite length. An error will be raised for pipelines of infinite length.

Note that every repeat of the pipeline is considered an “epoch” for the purposes of iter_epochs(). If there are multiple repeat calls, the latest repeat takes precedence for the purpose of defining epochs.

Parameters

times – The number of times to loop over this pipeline, or None to repeat indefinitely.
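
For example, a minimal sketch of repeating a windowed pipeline and then reading it back one epoch at a time:

>>> import ray
>>> # Two blocks per window, repeated for 3 epochs.
>>> pipe = ray.data.range(8, parallelism=8).window(blocks_per_window=2)
>>> pipe = pipe.repeat(3)
>>> for i, epoch in enumerate(pipe.iter_epochs()):
...     rows = [row for row in epoch.iter_rows()]  # 8 rows per repeat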

DatasetPipeline.rewindow(*, blocks_per_window: int, preserve_epoch: bool = True) ray.data.dataset_pipeline.DatasetPipeline[ray.data.block.T][source]

Change the windowing (blocks per dataset) of this pipeline.

Changes the windowing of this pipeline to the specified size. For example, if the current pipeline has two blocks per dataset and rewindow(blocks_per_window=4) is requested, adjacent datasets will be merged until each dataset is 4 blocks. Conversely, if a smaller number of blocks per window is requested, the datasets will instead be split into smaller windows.

Parameters
  • blocks_per_window – The new target blocks per window.

  • preserve_epoch – Whether to preserve epoch boundaries. If set to False, then windows can contain data from two adjacent epochs.
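
A minimal sketch of coarsening the window size of an existing pipeline:

>>> import ray
>>> # Start with one block per window, then merge into 4-block windows.
>>> pipe = ray.data.range(16, parallelism=16).window(blocks_per_window=1)
>>> pipe = pipe.rewindow(blocks_per_window=4)
>>> pipe.show_windows(limit_per_dataset=2)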

static DatasetPipeline.from_iterable(iterable: Iterable[Callable[[], ray.data.dataset.Dataset[ray.data.block.T]]]) ray.data.dataset_pipeline.DatasetPipeline[ray.data.block.T][source]

Create a pipeline from a sequence of Dataset-producing functions.

Parameters

iterable – A finite or infinite-length sequence of functions that each produce a Dataset when called.
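
For example, a minimal sketch that builds a two-window pipeline from zero-argument Dataset factories:

>>> import ray
>>> from ray.data import DatasetPipeline
>>> # Each function is called lazily when its window is needed.
>>> pipe = DatasetPipeline.from_iterable(
...     [lambda: ray.data.range(5), lambda: ray.data.range(5)])
>>> pipe.count()  # 5 + 5 rows across the two windows
10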

Consuming DatasetPipelines

DatasetPipeline.show(limit: int = 20) None[source]

Call Dataset.show over the stream of output batches from the pipeline.

DatasetPipeline.show_windows(limit_per_dataset: int = 10) None[source]

Print up to the given number of records from each window/dataset.

This is helpful as a debugging tool for understanding the structure of dataset pipelines.

Parameters

limit_per_dataset – Rows to print per window/dataset.

DatasetPipeline.take(limit: int = 20) List[ray.data.block.T][source]

Call Dataset.take over the stream of output batches from the pipeline.

DatasetPipeline.take_all(limit: int = 100000) List[ray.data.block.T][source]

Call Dataset.take_all over the stream of output batches from the pipeline.

DatasetPipeline.iter_rows(*, prefetch_blocks: int = 0) Iterator[Union[ray.data.block.T, ray.data.row.TableRow]][source]

Return a local row iterator over the data in the pipeline.

If the dataset is a tabular dataset (Arrow/Pandas blocks), dict-like TableRow mappings are yielded for each row by the iterator. If the dataset is not tabular, the raw row is yielded.

Examples

>>> import ray
>>> for i in ray.data.range(1000000).repeat(5).iter_rows(): 
...     print(i) 

Time complexity: O(1)

Parameters

prefetch_blocks – The number of blocks to prefetch ahead of the current block during the scan.

Returns

A local iterator over the records in the pipeline.

DatasetPipeline.iter_batches(*, prefetch_blocks: int = 0, batch_size: Optional[int] = 256, batch_format: str = 'native', drop_last: bool = False, local_shuffle_buffer_size: Optional[int] = None, local_shuffle_seed: Optional[int] = None) Iterator[Union[pandas.DataFrame, pyarrow.Table, numpy.ndarray, Dict[str, numpy.ndarray], list]][source]

Return a local batched iterator over the data in the pipeline.

Examples

>>> import ray
>>> ds = ray.data.range(1000000).repeat(5) 
>>> for pandas_df in ds.iter_batches(): 
...     print(pandas_df) 

Time complexity: O(1)

Parameters
  • prefetch_blocks – The number of blocks to prefetch ahead of the current block during the scan.

  • batch_size – The number of rows in each batch, or None to use entire blocks as batches (blocks may contain different numbers of rows). The final batch may include fewer than batch_size rows if drop_last is False. Defaults to 256.

  • batch_format – The format in which to return each batch. Specify “native” to use the current block format (promoting Arrow to pandas automatically), “pandas” to select pandas.DataFrame, or “pyarrow” to select pyarrow.Table. Default is “native”.

  • drop_last – Whether to drop the last batch if it’s incomplete.

  • local_shuffle_buffer_size – If non-None, the data will be randomly shuffled using a local in-memory shuffle buffer, and this value will serve as the minimum number of rows that must be in the local in-memory shuffle buffer in order to yield a batch. When there are no more rows to add to the buffer, the remaining rows in the buffer will be drained. This buffer size must be greater than or equal to batch_size, and therefore batch_size must also be specified when using local shuffling.

  • local_shuffle_seed – The seed to use for the local random shuffle.

Returns

An iterator over record batches.
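
For example, a minimal sketch of iterating with a local shuffle buffer; note that batch_size must be set when local_shuffle_buffer_size is used:

>>> import ray
>>> pipe = ray.data.range(1000).repeat(2)
>>> for batch in pipe.iter_batches(
...         batch_size=100,
...         local_shuffle_buffer_size=250,
...         local_shuffle_seed=42):
...     pass  # each batch is drawn from a shuffled in-memory buffer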

DatasetPipeline.iter_epochs(max_epoch: int = -1) Iterator[ray.data.dataset_pipeline.DatasetPipeline[ray.data.block.T]][source]

Split this pipeline up by epoch.

This allows reading of data per-epoch for repeated Datasets, which is useful for ML training. For example, ray.data.range(10).repeat(50) generates a pipeline with 500 rows total split across 50 epochs. This method allows iterating over the data individually per epoch (repetition) of the original data.

Parameters

max_epoch – If greater than zero, stop after the given number of epochs.

Examples

>>> import ray
>>> epochs = ray.data.range(10).repeat(50).iter_epochs()  
>>> for i, epoch in enumerate(epochs): 
...     print("Epoch", i) 
...     for row in epoch.iter_rows(): 
...         print(row) 

Returns

Iterator over epoch objects, where each epoch is a DatasetPipeline containing data from that epoch only.

DatasetPipeline.iter_tf_batches(*, prefetch_blocks: int = 0, batch_size: Optional[int] = 256, batch_format: str = 'native', drop_last: bool = False, local_shuffle_buffer_size: Optional[int] = None, local_shuffle_seed: Optional[int] = None) Iterator[Union[tf.Tensor, Dict[str, tf.Tensor]]][source]

Call Dataset.iter_tf_batches over the stream of output batches from the pipeline.

DatasetPipeline.iter_torch_batches(*, prefetch_blocks: int = 0, batch_size: Optional[int] = 256, batch_format: str = 'native', drop_last: bool = False, local_shuffle_buffer_size: Optional[int] = None, local_shuffle_seed: Optional[int] = None) Iterator[Union[torch.Tensor, Dict[str, torch.Tensor]]][source]

Call Dataset.iter_torch_batches over the stream of output batches from the pipeline.

DatasetPipeline.iter_datasets() Iterator[ray.data.dataset.Dataset[ray.data.block.T]][source]

Iterate over the output datasets of this pipeline.

Returns:

Iterator over the datasets outputted from this pipeline.

DeveloperAPI: This API may change across minor Ray releases.
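
A minimal sketch of inspecting each output window as a regular Dataset:

>>> import ray
>>> pipe = ray.data.range(9, parallelism=9).window(blocks_per_window=3)
>>> for ds in pipe.iter_datasets():
...     print(ds.count())  # each window is an ordinary Dataset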

I/O and Conversion

DatasetPipeline.write_json(path: str, *, filesystem: Optional[pyarrow.fs.FileSystem] = None, try_create_dir: bool = True, arrow_open_stream_args: Optional[Dict[str, Any]] = None, block_path_provider: ray.data.datasource.file_based_datasource.BlockWritePathProvider = <ray.data.datasource.file_based_datasource.DefaultBlockWritePathProvider object>, pandas_json_args_fn: Callable[[], Dict[str, Any]] = <function DatasetPipeline.<lambda>>, ray_remote_args: Dict[str, Any] = None, **pandas_json_args) None[source]

Call Dataset.write_json on each output dataset of this pipeline.
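
A minimal sketch (the output directory is arbitrary); each output dataset/window is written out as the pipeline executes:

>>> import ray
>>> pipe = ray.data.range(100).window(blocks_per_window=10)
>>> # Writes files for each window under the given directory.
>>> pipe.write_json("/tmp/pipeline_out")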

DatasetPipeline.write_csv(path: str, *, filesystem: Optional[pyarrow.fs.FileSystem] = None, try_create_dir: bool = True, arrow_open_stream_args: Optional[Dict[str, Any]] = None, block_path_provider: ray.data.datasource.file_based_datasource.BlockWritePathProvider = <ray.data.datasource.file_based_datasource.DefaultBlockWritePathProvider object>, arrow_csv_args_fn: Callable[[], Dict[str, Any]] = <function DatasetPipeline.<lambda>>, ray_remote_args: Dict[str, Any] = None, **arrow_csv_args) None[source]

Call Dataset.write_csv on each output dataset of this pipeline.

DatasetPipeline.write_parquet(path: str, *, filesystem: Optional[pyarrow.fs.FileSystem] = None, try_create_dir: bool = True, arrow_open_stream_args: Optional[Dict[str, Any]] = None, block_path_provider: ray.data.datasource.file_based_datasource.BlockWritePathProvider = <ray.data.datasource.file_based_datasource.DefaultBlockWritePathProvider object>, arrow_parquet_args_fn: Callable[[], Dict[str, Any]] = <function DatasetPipeline.<lambda>>, ray_remote_args: Dict[str, Any] = None, **arrow_parquet_args) None[source]

Call Dataset.write_parquet on each output dataset of this pipeline.

DatasetPipeline.write_datasource(datasource: ray.data.datasource.datasource.Datasource[ray.data.block.T], *, ray_remote_args: Optional[Dict[str, Any]] = None, **write_args) None[source]

Call Dataset.write_datasource on each output dataset of this pipeline.

DatasetPipeline.to_tf(*, output_signature: Union[tf.TypeSpec, List[tf.TypeSpec], Dict[str, tf.TypeSpec], Tuple[Union[tf.TypeSpec, List[tf.TypeSpec], Dict[str, tf.TypeSpec]], tf.TypeSpec]], label_column: Optional[str] = None, feature_columns: Optional[Union[List[str], List[List[str]], Dict[str, List[str]]]] = None, prefetch_blocks: int = 0, batch_size: int = 1, drop_last: bool = False) tf.data.Dataset[source]

Call Dataset.to_tf over the stream of output batches from the pipeline.

DatasetPipeline.to_torch(*, label_column: Optional[str] = None, feature_columns: Optional[Union[List[str], List[List[str]], Dict[str, List[str]]]] = None, label_column_dtype: Optional[torch.dtype] = None, feature_column_dtypes: Optional[Union[torch.dtype, List[torch.dtype], Dict[str, torch.dtype]]] = None, batch_size: int = 1, prefetch_blocks: int = 0, drop_last: bool = False, unsqueeze_label_tensor: bool = True, unsqueeze_feature_tensors: bool = True) torch.utils.data.IterableDataset[source]

Call Dataset.to_torch over the stream of output batches from the pipeline.
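
For example, a minimal sketch of streaming a tabular pipeline into a Torch-style training loop; the column names here are purely illustrative:

>>> import pandas as pd
>>> import ray
>>> df = pd.DataFrame({"x": [float(i) for i in range(32)],
...                    "y": [i % 2 for i in range(32)]})
>>> pipe = ray.data.from_pandas(df).repeat(2)
>>> torch_ds = pipe.to_torch(
...     label_column="y", batch_size=8, unsqueeze_label_tensor=False)
>>> for features, labels in torch_ds:
...     pass  # feed each (features, labels) batch into a training step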

Inspecting Metadata

DatasetPipeline.schema(fetch_if_missing: bool = False) Union[type, pyarrow.lib.Schema][source]

Return the schema of the dataset pipeline.

For datasets of Arrow records, this will return the Arrow schema. For datasets of Python objects, this returns their Python type.

Note: This is intended as a method for peeking at the schema before executing the DatasetPipeline. If execution has already started, it will simply return the cached schema from the previous call.

Time complexity: O(1)

Parameters

fetch_if_missing – If True, synchronously fetch the schema if it’s not known. Default is False, where None is returned if the schema is not known.

Returns

The Python type or Arrow schema of the records, or None if the schema is not known.
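
For example, a minimal sketch of peeking at the schema before consuming the pipeline:

>>> import ray
>>> pipe = ray.data.range(10).repeat(2)
>>> # Peek at the schema; returns None if it is not yet known and
>>> # fetch_if_missing is False.
>>> print(pipe.schema())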

DatasetPipeline.count() int[source]

Count the number of records in the dataset pipeline.

This blocks until the entire pipeline is fully executed.

Time complexity: O(dataset size / parallelism)

Returns

The number of records in the dataset pipeline.

DatasetPipeline.stats(exclude_first_window: bool = True) str[source]

Returns a string containing execution timing information.

Parameters

exclude_first_window – Whether to exclude the first window from the pipeline time breakdown. This is generally a good idea since there is always a stall waiting for the first window to be initially computed, which can be misleading in the stats.

DatasetPipeline.sum() int[source]

Sum the records in the dataset pipeline.

This blocks until the entire pipeline is fully executed.

Time complexity: O(dataset size / parallelism)

Returns

The sum of the records in the dataset pipeline.
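
For example, a minimal sketch of count and sum; since each aggregation fully consumes the pipeline, it is rebuilt between the two calls:

>>> import ray
>>> ray.data.range(4).repeat(2).count()  # 4 records per epoch, 2 epochs
8
>>> ray.data.range(4).repeat(2).sum()    # (0 + 1 + 2 + 3) * 2
12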