ray.data.Dataset.window#
Dataset.window(*, blocks_per_window: Optional[int] = None, bytes_per_window: Optional[int] = None) → DatasetPipeline[T] [source]#
Convert this into a DatasetPipeline by windowing over data blocks.
Transformations made prior to the call to window() are evaluated in bulk on the entire dataset. Transformations done on the returned pipeline are evaluated incrementally per window of blocks as data is read from the output of the pipeline.

Windowed execution allows output to be read sooner, without waiting for all transformations to fully execute, and can also improve efficiency if transforms use different resources (e.g., GPUs); see the sketch after the diagrams below.
Without windowing:
[preprocessing......]
                      [inference.......]
                                         [write........]
Time ----------------------------------------------------------->
With windowing:
[prep1] [prep2] [prep3]
        [infer1] [infer2] [infer3]
                 [write1] [write2] [write3]
Time ----------------------------------------------------------->
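For instance, here is a minimal sketch of the three-stage pipeline in the diagrams above. The preprocess and infer stand-ins, the GPU allocation, and the output path are hypothetical placeholders, and the GPU stage assumes a GPU is available in the cluster:

>>> import ray
>>> ds = ray.data.range(1000)
>>> # Placeholder per-row transforms; real code would do actual work.
>>> preprocess = lambda row: row
>>> infer = lambda row: row
>>> pipe = ds.window(blocks_per_window=10)
>>> # Each window flows through preprocessing, inference, and the write
>>> # in turn, so a later window can be preprocessed while an earlier
>>> # window is still being written. num_gpus reserves GPU resources
>>> # for the inference tasks only (hypothetical allocation).
>>> pipe = pipe.map(preprocess).map(infer, num_gpus=1)
>>> pipe.write_json("/tmp/output")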
Examples
>>> import ray
>>> # Create an inference pipeline.
>>> ds = ray.data.read_binary_files(dir)
>>> infer = ...
>>> pipe = ds.window(blocks_per_window=10).map(infer)
DatasetPipeline(num_windows=40, num_stages=2)
>>> # The higher the stage parallelism, the shorter the pipeline.
>>> pipe = ds.window(blocks_per_window=20).map(infer)
DatasetPipeline(num_windows=20, num_stages=2)
>>> # Outputs can be incrementally read from the pipeline.
>>> for item in pipe.iter_rows():
...     print(item)
Parameters
blocks_per_window – The window size (parallelism) in blocks. Increasing window size increases pipeline throughput, but also increases the latency to initial output, since it decreases the length of the pipeline. Setting this to infinity effectively disables pipelining.
bytes_per_window – Specify the window size in bytes instead of blocks. This is treated as an upper bound for the window size, but each window will still include at least one block. This is mutually exclusive with blocks_per_window.
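As a rough sketch of byte-based windowing (the dataset and the 100 MiB budget are arbitrary illustrations, not recommended values):

>>> import ray
>>> ds = ray.data.range(100000)
>>> # Each window holds at most ~100 MiB of data, but never fewer
>>> # than one block, regardless of block size.
>>> pipe = ds.window(bytes_per_window=100 * 1024 * 1024)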