ray.data.DatasetPipeline#

class ray.data.DatasetPipeline(base_iterable: Iterable[Callable[[], ray.data.dataset.Dataset[ray.data.block.T]]], stages: Optional[List[Callable[[ray.data.dataset.Dataset[Any]], ray.data.dataset.Dataset[Any]]]] = None, length: Optional[int] = None, progress_bars: bool = True, _executed: Optional[List[bool]] = None)[source]#

Bases: Generic[ray.data.block.T]

Implements a pipeline of Datasets.

DatasetPipelines implement pipelined execution. This allows for overlapped execution of data input (e.g., reading files), computation (e.g., feature preprocessing), and output (e.g., distributed ML training).

A DatasetPipeline can be created by repeating a Dataset (ds.repeat(times=None)), by splitting a single Dataset into windows (ds.window(blocks_per_window=10)), or by defining it explicitly with DatasetPipeline.from_iterable().
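For example (a minimal sketch; ray.data.range is used here only as a convenient source dataset):

>>> import ray
>>> ds = ray.data.range(1000)
>>> # Repeat the dataset to create a pipeline with one window per epoch.
>>> pipe = ds.repeat(times=5)
>>> # Or split the dataset into windows of 10 blocks each.
>>> pipe = ds.window(blocks_per_window=10)
>>> # Or build a pipeline explicitly from Dataset-producing functions.
>>> pipe = ray.data.DatasetPipeline.from_iterable(
...     [lambda: ray.data.range(100), lambda: ray.data.range(100)])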

DatasetPipeline supports all the per-record transforms of Datasets (e.g., map, flat_map, filter), holistic transforms (e.g., repartition), and output methods (e.g., iter_rows, to_tf, to_torch, write_datasource).
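For instance, a pipeline can be transformed per window and then consumed as a stream of batches (a sketch; the identity map_batches call stands in for real preprocessing):

>>> import ray
>>> pipe = ray.data.range(1000).window(blocks_per_window=10)
>>> # Transforms are applied to each window as it is produced.
>>> pipe = pipe.map(lambda x: x * 2)
>>> pipe = pipe.map_batches(lambda batch: batch, batch_size=256)
>>> # Output methods consume the pipeline in a streaming fashion.
>>> for batch in pipe.iter_batches(batch_size=256):
...     pass  # e.g., feed the batch to a training step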

PublicAPI: This API is stable across Ray releases.

Methods

__init__(base_iterable[, stages, length, ...])

Construct a DatasetPipeline (internal API).

add_column(col, fn, *[, compute])

Apply Dataset.add_column to each dataset/window in this pipeline.

count()

Count the number of records in the dataset pipeline.

dataset_format()

The format of the dataset pipeline's underlying data blocks.

drop_columns(cols, *[, compute])

Apply Dataset.drop_columns to each dataset/window in this pipeline.

filter(fn, *[, compute])

Apply Dataset.filter to each dataset/window in this pipeline.

flat_map(fn, *[, compute])

Apply Dataset.flat_map to each dataset/window in this pipeline.

foreach_window(fn)

Apply a transform to each dataset/window in this pipeline.

from_iterable(iterable)

Create a pipeline from a sequence of Dataset-producing functions.

iter_batches(*[, prefetch_blocks, ...])

Return a local batched iterator over the data in the pipeline.

iter_datasets()

Iterate over the output datasets of this pipeline.

iter_epochs([max_epoch])

Split this pipeline up by epoch.

iter_rows(*[, prefetch_blocks])

Return a local row iterator over the data in the pipeline.

iter_tf_batches(*[, prefetch_blocks, ...])

Call Dataset.iter_tf_batches over the stream of output batches from the pipeline.

iter_torch_batches(*[, prefetch_blocks, ...])

Call Dataset.iter_torch_batches over the stream of output batches from the pipeline.

iterator()

Return a DatasetIterator that can be used to repeatedly iterate over the dataset.

map(fn, *[, compute])

Apply Dataset.map to each dataset/window in this pipeline.

map_batches(fn, *[, batch_size, compute, ...])

Apply Dataset.map_batches to each dataset/window in this pipeline.

random_shuffle_each_window(*[, seed, num_blocks])

Apply Dataset.random_shuffle to each dataset/window in this pipeline.

randomize_block_order_each_window(*[, seed])

Apply Dataset.randomize_block_order to each dataset/window in this pipeline.

repartition_each_window(num_blocks, *[, shuffle])

Apply Dataset.repartition to each dataset/window in this pipeline.

repeat([times])

Repeat this pipeline a given number of times, or indefinitely.

rewindow(*, blocks_per_window[, preserve_epoch])

Change the windowing (blocks per dataset) of this pipeline.

schema([fetch_if_missing])

Return the schema of the dataset pipeline.

select_columns(cols, *[, compute])

Apply Dataset.select_columns to each dataset/window in this pipeline.

show([limit])

Call Dataset.show over the stream of output batches from the pipeline.

show_windows([limit_per_dataset])

Print up to the given number of records from each window/dataset.

sort_each_window([key, descending])

Apply Dataset.sort to each dataset/window in this pipeline.

split(n, *[, equal, locality_hints])

Split the pipeline into n disjoint pipeline shards.

split_at_indices(indices)

Split the datasets within the pipeline at the given indices (like np.split).

stats([exclude_first_window])

Returns a string containing execution timing information.

sum()

Sum the records in the dataset pipeline.

take([limit])

Call Dataset.take over the stream of output batches from the pipeline.

take_all([limit])

Call Dataset.take_all over the stream of output batches from the pipeline.

to_tf(*, output_signature[, label_column, ...])

Call Dataset.to_tf over the stream of output batches from the pipeline.

to_torch(*[, label_column, feature_columns, ...])

Call Dataset.to_torch over the stream of output batches from the pipeline.

write_csv(path, *[, filesystem, ...])

Call Dataset.write_csv on each output dataset of this pipeline.

write_datasource(datasource, *[, ...])

Call Dataset.write_datasource on each output dataset of this pipeline.

write_json(path, *[, filesystem, ...])

Call Dataset.write_json on each output dataset of this pipeline.

write_parquet(path, *[, filesystem, ...])

Call Dataset.write_parquet on each output dataset of this pipeline.

write_tfrecords(path, *[, filesystem, ...])

Call Dataset.write_tfrecords on each output dataset of this pipeline.
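Several of the methods above (repeat, random_shuffle_each_window, iter_epochs, iter_batches) are commonly combined into a per-epoch training loop, for example (a sketch; train_step stands in for user-defined training code):

>>> import ray
>>> pipe = ray.data.range(10000).repeat(times=3)
>>> # Reshuffle the data in each window, i.e., once per epoch.
>>> pipe = pipe.random_shuffle_each_window()
>>> for epoch_pipe in pipe.iter_epochs():
...     for batch in epoch_pipe.iter_batches(batch_size=512):
...         ...  # train_step(batch)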