ray.data.Dataset.streaming_split
- Dataset.streaming_split(n: int, *, equal: bool = False, locality_hints: List[NodeIdStr] | None = None) -> List[DataIterator]
- Returns n DataIterators that can be used to read disjoint subsets of the dataset in parallel.

  This method is the recommended way to consume Datasets for distributed training.

  Streaming split works by delegating the execution of this Dataset to a coordinator actor. The coordinator pulls block references from the executed stream and divides those blocks among the n output iterators. Iterators pull blocks from the coordinator actor to return to their caller on next.

  The returned iterators are also repeatable; each iteration triggers a new execution of the Dataset. There is an implicit barrier at the start of each iteration, which means that next must be called on all iterators before the iteration starts.

  Warning

  Because the iterators pull blocks from the same Dataset execution, if one iterator falls behind, other iterators may be stalled.

  Note

  This operation triggers execution of the lazy transformations performed on this dataset.

  Examples

      import ray

      ds = ray.data.range(100)
      it1, it2 = ds.streaming_split(2, equal=True)

  Consume data from the iterators in parallel.

      @ray.remote
      def consume(it):
          for batch in it.iter_batches():
              pass

      ray.get([consume.remote(it1), consume.remote(it2)])

  You can loop over the iterators multiple times (multiple epochs).

      @ray.remote
      def train(it):
          NUM_EPOCHS = 2
          for _ in range(NUM_EPOCHS):
              for batch in it.iter_batches():
                  pass

      ray.get([train.remote(it1), train.remote(it2)])

  The following remote function call blocks, waiting for a read on it2 to start.

      ray.get(train.remote(it1))

- Parameters:
- n – Number of output iterators to return. 
- equal – If True, each output iterator sees an exactly equal number of rows, dropping data if necessary. If False, some iterators may see slightly more or fewer rows than others, but no data is dropped.
- locality_hints – Specify the node IDs corresponding to each iterator location. Dataset will try to minimize data movement based on the iterator output locations. This list must have length n. You can get the current node ID of a task or actor by calling ray.get_runtime_context().get_node_id(); see the sketch after this list.
 
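  A minimal sketch (not from the original reference) of constructing locality_hints: it assumes two training workers implemented as Ray actors, collects each worker's node ID with ray.get_runtime_context().get_node_id(), and passes those IDs so each iterator is placed near the worker that consumes it.

      import ray

      @ray.remote
      class Worker:
          def node_id(self):
              # ID of the node this actor is running on.
              return ray.get_runtime_context().get_node_id()

          def consume(self, it):
              # Read the split assigned to this worker.
              for batch in it.iter_batches():
                  pass

      workers = [Worker.remote() for _ in range(2)]
      node_ids = ray.get([w.node_id.remote() for w in workers])

      ds = ray.data.range(100)
      it1, it2 = ds.streaming_split(2, equal=True, locality_hints=node_ids)
      ray.get([w.consume.remote(it) for w, it in zip(workers, (it1, it2))])
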
- Returns:
- The output iterator splits. These iterators are Ray-serializable and can be freely passed to any Ray task or actor. 
- See also

  Dataset.split()
    Unlike streaming_split(), split() materializes the dataset in memory.
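  A short sketch of the difference, assuming a small toy dataset:

      import ray

      ds = ray.data.range(1000)

      # split() executes the dataset and returns fully materialized shards.
      shards = ds.split(2)

      # streaming_split() returns DataIterators that pull blocks as they are
      # produced, so the dataset is not held in memory all at once.
      it1, it2 = ds.streaming_split(2)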