ray.data.Dataset.streaming_split#

Dataset.streaming_split(n: int, *, equal: bool = False, locality_hints: Optional[List[NodeIdStr]] = None) List[ray.data.dataset_iterator.DatasetIterator][source]#

Returns n DatasetIterators that can be used to read disjoint subsets of the dataset in parallel.

This method is the recommended way to consume Datasets from multiple processes (e.g., for distributed training), and requires streaming execution mode.

Streaming split works by delegating the execution of this Dataset to a coordinator actor. The coordinator pulls block references from the executed stream, and divides those blocks among n output iterators. Iterators pull blocks from the coordinator actor to return to their caller on next.

The returned iterators are also repeatable; each iteration will trigger a new execution of the Dataset. There is an implicit barrier at the start of each iteration, which means that next must be called on all iterators before the iteration starts.

Warning: because iterators are pulling blocks from the same Dataset execution, if one iterator falls behind other iterators may be stalled.

Note

This operation will trigger execution of the lazy transformations performed on this dataset.

Examples

>>> import ray
>>> ds = ray.data.range(1000000)
>>> it1, it2 = ds.streaming_split(2, equal=True)
>>> # Can consume from both iterators in parallel.
>>> @ray.remote
... def consume(it):
...    for batch in it.iter_batches():
...        print(batch)
>>> ray.get([consume.remote(it1), consume.remote(it2)])  
>>> # Can loop over the iterators multiple times (multiple epochs).
>>> @ray.remote
... def train(it):
...    NUM_EPOCHS = 100
...    for _ in range(NUM_EPOCHS):
...        for batch in it.iter_batches():
...            print(batch)
>>> ray.get([train.remote(it1), train.remote(it2)])  
>>> # ERROR: this will block waiting for a read on `it2` to start.
>>> ray.get(train.remote(it1))  
Parameters
  • n – Number of output iterators to return.

  • equal – If True, each output iterator will see an exactly equal number of rows, dropping data if necessary. If False, some iterators may see slightly more or less rows than other, but no data will be dropped.

  • locality_hints – Specify the node ids corresponding to each iterator location. Datasets will try to minimize data movement based on the iterator output locations. This list must have length n. You can get the current node id of a task or actor by calling ray.get_runtime_context().get_node_id().

Returns

The output iterator splits. These iterators are Ray-serializable and can be freely passed to any Ray task or actor.