ray.data.DatasetPipeline
class ray.data.DatasetPipeline(base_iterable: Iterable[Callable[[], ray.data.dataset.Dataset[ray.data.block.T]]], stages: Optional[List[Callable[[ray.data.dataset.Dataset[Any]], ray.data.dataset.Dataset[Any]]]] = None, length: Optional[int] = None, progress_bars: bool = True, _executed: Optional[List[bool]] = None)[source]

Bases: Generic[ray.data.block.T]

Implements a pipeline of Datasets.
DatasetPipelines implement pipelined execution. This allows for the overlapped execution of data input (e.g., reading files), computation (e.g., feature preprocessing), and output (e.g., distributed ML training).

A DatasetPipeline can be created by repeating a Dataset (ds.repeat(times=None)), by turning a single Dataset into a pipeline (ds.window(blocks_per_window=10)), or by defining it explicitly using DatasetPipeline.from_iterable(). DatasetPipeline supports all the per-record transforms of Datasets (e.g., map, flat_map, filter), holistic transforms (e.g., repartition), and output methods (e.g., iter_rows, to_tf, to_torch, write_datasource).
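For orientation, here is a minimal sketch of the three creation paths (a hedged example, assuming a local Ray runtime; ray.data.range is used only as a convenient data source):

    import ray
    from ray.data import DatasetPipeline

    # Source Dataset of 10,000 integer records.
    ds = ray.data.range(10000)

    # 1) Repeat a Dataset: each pass over the data becomes one epoch.
    pipe = ds.repeat(times=5)

    # 2) Window a Dataset: execute in windows of 10 blocks each, so input,
    #    computation, and output can overlap.
    pipe = ds.window(blocks_per_window=10)

    # 3) Build the pipeline explicitly from Dataset-producing functions.
    pipe = DatasetPipeline.from_iterable(
        [lambda: ray.data.range(1000), lambda: ray.data.range(1000)]
    )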
PublicAPI: This API is stable across Ray releases.
Methods
__init__(base_iterable[, stages, length, ...])
    Construct a DatasetPipeline (internal API).
add_column(col, fn, *[, compute])
    Apply Dataset.add_column to each dataset/window in this pipeline.
count()
    Count the number of records in the dataset pipeline.
dataset_format()
    The format of the dataset pipeline's underlying data blocks.
drop_columns(cols, *[, compute])
    Apply Dataset.drop_columns to each dataset/window in this pipeline.
filter(fn, *[, compute])
    Apply Dataset.filter to each dataset/window in this pipeline.
flat_map(fn, *[, compute])
    Apply Dataset.flat_map to each dataset/window in this pipeline.
foreach_window(fn)
    Apply a transform to each dataset/window in this pipeline.
from_iterable(iterable)
    Create a pipeline from a sequence of Dataset-producing functions.
iter_batches(*[, prefetch_blocks, ...])
    Return a local batched iterator over the data in the pipeline.
iter_datasets()
    Iterate over the output datasets of this pipeline.
iter_epochs([max_epoch])
    Split this pipeline up by epoch.
iter_rows(*[, prefetch_blocks])
    Return a local row iterator over the data in the pipeline.
iter_tf_batches(*[, prefetch_blocks, ...])
    Call Dataset.iter_tf_batches over the stream of output batches from the pipeline.
iter_torch_batches(*[, prefetch_blocks, ...])
    Call Dataset.iter_torch_batches over the stream of output batches from the pipeline.
iterator()
    Return a DatasetIterator that can be used to repeatedly iterate over the dataset.
map(fn, *[, compute])
    Apply Dataset.map to each dataset/window in this pipeline.
map_batches(fn, *[, batch_size, compute, ...])
    Apply Dataset.map_batches to each dataset/window in this pipeline.
random_shuffle_each_window(*[, seed, num_blocks])
    Apply Dataset.random_shuffle to each dataset/window in this pipeline.
randomize_block_order_each_window(*[, seed])
    Apply Dataset.randomize_block_order to each dataset/window in this pipeline.
repartition_each_window(num_blocks, *[, shuffle])
    Apply Dataset.repartition to each dataset/window in this pipeline.
repeat([times])
    Repeat this pipeline a given number of times, or indefinitely.
rewindow(*, blocks_per_window[, preserve_epoch])
    Change the windowing (blocks per dataset) of this pipeline.
schema([fetch_if_missing])
    Return the schema of the dataset pipeline.
select_columns(cols, *[, compute])
    Apply Dataset.select_columns to each dataset/window in this pipeline.
show([limit])
    Call Dataset.show over the stream of output batches from the pipeline.
show_windows([limit_per_dataset])
    Print up to the given number of records from each window/dataset.
sort_each_window([key, descending])
    Apply Dataset.sort to each dataset/window in this pipeline.
split(n, *[, equal, locality_hints])
    Split the pipeline into n disjoint pipeline shards.
split_at_indices(indices)
    Split the datasets within the pipeline at the given indices (like np.split).
stats([exclude_first_window])
    Returns a string containing execution timing information.
sum()
    Sum the records in the dataset pipeline.
take([limit])
    Call Dataset.take over the stream of output batches from the pipeline.
take_all([limit])
    Call Dataset.take_all over the stream of output batches from the pipeline.
to_tf(*, output_signature[, label_column, ...])
    Call Dataset.to_tf over the stream of output batches from the pipeline.
to_torch(*[, label_column, feature_columns, ...])
    Call Dataset.to_torch over the stream of output batches from the pipeline.
write_csv(path, *[, filesystem, ...])
    Call Dataset.write_csv on each output dataset of this pipeline.
write_datasource(datasource, *[, ...])
    Call Dataset.write_datasource on each output dataset of this pipeline.
write_json(path, *[, filesystem, ...])
    Call Dataset.write_json on each output dataset of this pipeline.
write_parquet(path, *[, filesystem, ...])
    Call Dataset.write_parquet on each output dataset of this pipeline.
write_tfrecords(path, *[, filesystem, ...])
    Call Dataset.write_tfrecords on each output dataset of this pipeline.