DatasetPipeline.split(n: int, *, equal: bool = False, locality_hints: Optional[List[Any]] = None) → List[ray.data.dataset_pipeline.DatasetPipeline[ray.data.block.T]]

Split the pipeline into n disjoint pipeline shards.

This returns a list of sub-pipelines that can be passed to Ray tasks and actors and used to read the pipeline records in parallel.

Examples

>>> import ray
>>> pipe = ray.data.range(10).repeat(50)
>>> workers = ...  # A list of Ray actor handles, one per shard.
>>> # Split up a pipeline to process over `n` worker actors.
>>> shards = pipe.split(
...     len(workers), locality_hints=workers)
>>> for shard, worker in zip(shards, workers):
...     worker.consume.remote(shard)  # `consume` is a user-defined actor method.

Time complexity: O(1)

Implementation detail: this launches a coordinator actor that executes the pipeline and pushes data blocks to each pipeline shard. Reading from an individual shard blocks if other shards are falling behind, and a warning is printed if a shard has been blocked on read for more than 10 seconds.
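
The blocking behavior described above can be modeled in a single process. The sketch below is a toy, thread-based stand-in for the coordinator actor, not Ray's implementation: it assumes the coordinator serves the pipeline one window at a time and divides each window round-robin among the shards. A shard that has already read its part of the current window waits until every other shard has read its part, and records a warning after `warn_after_s` seconds (defaulting to the documented 10 seconds). The names `ToyCoordinator`, `read_window`, `run_shards`, and `warn_after_s` are hypothetical.

```python
import threading
import time
from typing import Dict, List, Optional


class ToyCoordinator:
    """Hypothetical in-process model of a split coordinator.

    Each window is divided round-robin among n shards. A shard that already
    read its part of the current window blocks until all shards have read
    theirs, so one slow shard stalls the others.
    """

    def __init__(self, windows: List[List[int]], n: int, warn_after_s: float = 10.0):
        self._windows = windows
        self._n = n
        self._warn_after_s = warn_after_s
        self._cond = threading.Condition()
        self._window_idx = 0  # Index of the window currently being served.
        self._done = set()  # Shards that already read the current window.
        self.warnings_emitted: List[str] = []

    def read_window(self, shard: int) -> Optional[List[int]]:
        """Return this shard's part of the next window, or None when exhausted."""
        with self._cond:
            start = time.monotonic()
            warned = False
            # Block while this shard is ahead of the slowest shard.
            while shard in self._done:
                waited = time.monotonic() - start
                if warned:
                    self._cond.wait()
                elif waited >= self._warn_after_s:
                    msg = f"shard {shard} blocked on read for more than {self._warn_after_s}s"
                    print(msg)
                    self.warnings_emitted.append(msg)
                    warned = True
                else:
                    self._cond.wait(timeout=self._warn_after_s - waited)
            if self._window_idx >= len(self._windows):
                return None
            part = self._windows[self._window_idx][shard::self._n]
            self._done.add(shard)
            # Once every shard has read this window, advance and wake waiters.
            if len(self._done) == self._n:
                self._window_idx += 1
                self._done.clear()
                self._cond.notify_all()
            return part


def run_shards(coord: ToyCoordinator, n: int,
               delays: Optional[Dict[int, float]] = None) -> List[List[int]]:
    """Consume all n shards in parallel threads; `delays` slows chosen shards."""
    results: List[List[int]] = [[] for _ in range(n)]

    def consume(shard: int) -> None:
        time.sleep((delays or {}).get(shard, 0.0))  # Simulate a slow consumer.
        while (part := coord.read_window(shard)) is not None:
            results[shard].extend(part)

    threads = [threading.Thread(target=consume, args=(i,)) for i in range(n)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return results
```

For example, `run_shards(ToyCoordinator([[0, 1], [2, 3]], n=2, warn_after_s=0.05), 2, delays={1: 0.3})` returns `[[0, 2], [1, 3]]` and should print a warning for shard 0, which sits blocked while shard 1 is delayed.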

Parameters

  • n – Number of child pipelines to return.

  • equal – Whether to guarantee each split has an equal number of records. This may drop records if they cannot be divided equally among the splits.

  • locality_hints – [Experimental] A list of Ray actor handles of length n. The system tries to co-locate the blocks of the i-th pipeline shard with the i-th actor to maximize data locality.
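
To make the locality_hints idea concrete, the sketch below shows one simple way such co-location could work: a greedy assignment in which shard i prefers blocks stored on the node of the i-th actor, up to a balanced per-shard target, and remaining blocks go to the least-loaded shard. This is an illustrative heuristic, not Ray's algorithm; the function name `assign_blocks` and the string node IDs are hypothetical.

```python
from typing import List


def assign_blocks(block_nodes: List[str], actor_nodes: List[str]) -> List[List[int]]:
    """Toy greedy, locality-aware assignment of block indices to shards.

    block_nodes[b] is the (hypothetical) node holding block b, and
    actor_nodes[i] is the node of the i-th locality hint. Shard i prefers
    blocks stored on actor i's node, up to a balanced per-shard target.
    """
    n = len(actor_nodes)
    if n == 0:
        raise ValueError("need at least one actor")
    target = -(-len(block_nodes) // n)  # Ceiling division: max blocks per shard.
    shards: List[List[int]] = [[] for _ in range(n)]
    leftovers: List[int] = []
    for b, node in enumerate(block_nodes):
        # Local shards that still have room; pick the least loaded one.
        local = [i for i, a in enumerate(actor_nodes) if a == node and len(shards[i]) < target]
        if local:
            shards[min(local, key=lambda i: len(shards[i]))].append(b)
        else:
            leftovers.append(b)
    # Blocks with no local shard that has room go to the least loaded shard.
    for b in leftovers:
        shards[min(range(n), key=lambda i: len(shards[i]))].append(b)
    return shards
```

With blocks on nodes `["A", "A", "A", "B"]` and actors on `["A", "B"]`, shard 0 keeps two local blocks and the third "A" block spills over to shard 1, giving `[[0, 1], [3, 2]]`: balance wins over locality once the local shard is full.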

Returns

A list of n disjoint pipeline splits.
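
Because the returned shards are disjoint, every record lands in at most one shard; with equal=True, some records may land in none. The toy function below illustrates both points on a plain list of records. It is a minimal sketch, not Ray's split: which records are dropped (here, the trailing remainder) and how unequal sizes are chosen are assumptions of the sketch, and the name `split_records` is hypothetical.

```python
from typing import List, Sequence, TypeVar

T = TypeVar("T")


def split_records(records: Sequence[T], n: int, equal: bool = False) -> List[List[T]]:
    """Toy split of records into n disjoint, contiguous shards."""
    if n <= 0:
        raise ValueError("n must be positive")
    base, extra = divmod(len(records), n)
    if equal:
        # Drop the `extra` trailing records so every shard has `base` records.
        extra = 0
    shards: List[List[T]] = []
    start = 0
    for i in range(n):
        # Without `equal`, the first `extra` shards take one record more.
        size = base + (1 if i < extra else 0)
        shards.append(list(records[start:start + size]))
        start += size
    return shards
```

Splitting 10 records three ways keeps all of them (`[[0, 1, 2, 3], [4, 5, 6], [7, 8, 9]]`), while `equal=True` yields `[[0, 1, 2], [3, 4, 5], [6, 7, 8]]` and drops record 9.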