Parallel Iterators

ray.util.iter provides a parallel iterator API for simple data ingest and processing. It can be thought of as syntactic sugar around Ray actors and ray.wait loops.

Parallel iterators are lazy and can operate over infinite sequences of items. Iterator transformations are only executed when the user calls next() to fetch the next output item from the iterator.
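This laziness mirrors plain Python generator pipelines. A minimal sketch (pure Python, no Ray) of the same behavior:

```python
# A plain-Python sketch of lazy evaluation: nothing runs until next() is called.
calls = []

def source():
    for i in range(3):
        calls.append(("produce", i))
        yield i

# Building the pipeline executes nothing yet.
pipeline = (x * 2 for x in source())
assert calls == []

# Each next() pulls exactly one item through the whole pipeline.
first = next(pipeline)
assert first == 0
assert calls == [("produce", 0)]
```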

Note

This API is new and may be revised in future Ray releases. If you encounter any bugs, please file an issue on GitHub.

Concepts

Parallel Iterators: You can create a ParallelIterator object from an existing set of items, range of numbers, set of iterators, or set of worker actors. Ray will create a worker actor that produces the data for each shard of the iterator:

# Create an iterator with 2 worker actors over the list [1, 2, 3, 4].
>>> it = ray.util.iter.from_items([1, 2, 3, 4], num_shards=2)
ParallelIterator[from_items[int, 4, shards=2]]

# Create an iterator with 32 worker actors over range(1000000).
>>> it = ray.util.iter.from_range(1000000, num_shards=32)
ParallelIterator[from_range[1000000, shards=32]]

# Create an iterator over two range(10) generators.
>>> it = ray.util.iter.from_iterators([range(10), range(10)])
ParallelIterator[from_iterators[shards=2]]

# Create an iterator from existing worker actors. These actors must
# implement the ParallelIteratorWorker interface.
>>> it = ray.util.iter.from_actors([a1, a2, a3, a4])
ParallelIterator[from_actors[shards=4]]

Simple transformations can be chained on the iterator, such as mapping, filtering, and batching. These will be executed in parallel on the workers:

# Apply a transformation to each element of the iterator.
>>> it = it.for_each(lambda x: x ** 2)
ParallelIterator[...].for_each()

# Batch together items into lists of 32 elements.
>>> it = it.batch(32)
ParallelIterator[...].for_each().batch(32)

# Filter out items with odd values.
>>> it = it.filter(lambda x: x % 2 == 0)
ParallelIterator[...].for_each().batch(32).filter()

Local Iterators: To read elements from a parallel iterator, first convert it to a LocalIterator by calling gather_sync() or gather_async(). These correspond to ray.get and ray.wait loops over the actors, respectively:

# Gather items synchronously (deterministic round robin across shards):
>>> it = ray.util.iter.from_range(1000000, 1)
>>> it = it.gather_sync()
LocalIterator[ParallelIterator[from_range[1000000, shards=1]].gather_sync()]

# Local iterators can be used as any other Python iterator.
>>> it.take(5)
[0, 1, 2, 3, 4]

# They also support chaining of transformations. Unlike transformations
# applied on a ParallelIterator, they will be executed in the current process.
>>> it.filter(lambda x: x % 2 == 0).take(5)
[0, 2, 4, 6, 8]

# Async gather can be used for better performance, but it is non-deterministic.
>>> it = ray.util.iter.from_range(1000, 4).gather_async()
>>> it.take(5)
[0, 250, 500, 750, 1]

Passing iterators to remote functions: Both ParallelIterator and LocalIterator are serializable. They can be passed to any Ray remote function. However, note that each shard should only be read by one process at a time:

# Get local iterators representing the shards of this ParallelIterator:
>>> it = ray.util.iter.from_range(10000, 3)
>>> [s0, s1, s2] = it.shards()
[LocalIterator[from_range[10000, shards=3].shard[0]],
 LocalIterator[from_range[10000, shards=3].shard[1]],
 LocalIterator[from_range[10000, shards=3].shard[2]]]

# Iterator shards can be passed to remote functions.
>>> @ray.remote
... def do_sum(it):
...     return sum(it)
...
>>> ray.get([do_sum.remote(s) for s in it.shards()])
[5552778, 16661667, 27780555]

Semantic Guarantees

The parallel iterator API guarantees the following semantics:

Fetch ordering: When using it.gather_sync().for_each(fn) or it.gather_async().for_each(fn) (or any other transformation after a gather), fn(x_i) will be called on the element x_i before the next element x_{i+1} is fetched from the source actor. This is useful if you need to update the source actor between iterator steps. Note that for async gather, this ordering only applies per shard.
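This ordering guarantee can be sketched in plain Python (no Ray): with a lazy pipeline, fn(x_i) runs before x_{i+1} is fetched.

```python
# Plain-Python sketch of the fetch-ordering guarantee (not Ray code).
events = []

def fetch():
    for i in range(3):
        events.append(("fetch", i))
        yield i

def fn(x):
    events.append(("apply", x))
    return x

# map() is lazy: each item is applied before the next one is pulled.
for _ in map(fn, fetch()):
    pass

# apply(0) happens before fetch(1), and so on.
assert events == [
    ("fetch", 0), ("apply", 0),
    ("fetch", 1), ("apply", 1),
    ("fetch", 2), ("apply", 2),
]
```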

Operator state: Operator state is preserved for each shard. This means that you can pass a stateful callable to .for_each():

class CumulativeSum:
    def __init__(self):
        self.total = 0

    def __call__(self, x):
        self.total += x
        return (self.total, x)

it = ray.util.iter.from_range(5, 1)
for x in it.for_each(CumulativeSum()).gather_sync():
    print(x)

# This prints:
# (0, 0)
# (1, 1)
# (3, 2)
# (6, 3)
# (10, 4)

Example: Streaming word frequency count

Parallel iterators can be used for simple data processing use cases such as streaming grep:

import ray
import glob
import gzip
import numpy as np

ray.init()

file_list = glob.glob("/var/log/syslog*.gz")
it = (
    ray.util.iter.from_items(file_list, num_shards=4)
       .for_each(lambda f: gzip.open(f).readlines())
       .flatten()
       .for_each(lambda line: line.decode("utf-8"))
       .for_each(lambda line: 1 if "cron" in line else 0)
       .batch(1024)
       .for_each(np.mean)
)

# Show the probability of a log line containing "cron", computed
# over consecutive batches of 1024 lines.
for freq in it.gather_async():
    print(freq)

Example: Passing iterator shards to remote functions

Both parallel iterators and local iterators are fully serializable, so once created you can pass them to Ray tasks and actors. This can be useful for distributed training:

import ray
import numpy as np

ray.init()

@ray.remote
def train(data_shard):
    for batch in data_shard:
        print("train on", batch)  # perform model update with batch

it = (
    ray.util.iter.from_range(1000000, num_shards=4, repeat=True)
        .batch(1024)
        .for_each(np.array)
)

work = [train.remote(shard) for shard in it.shards()]
ray.get(work)

Tip

Using the ParallelIterator built-in functions is typically the most efficient approach. For example, if you find yourself using list comprehensions like [foo(x) for x in iter.gather_async()], consider using iter.for_each(foo) instead!

API Reference

ray.util.iter.from_items(items: List[T], num_shards: int = 2, repeat: bool = False) → ray.util.iter.ParallelIterator[T][source]

Create a parallel iterator from an existing set of objects.

The objects will be divided round-robin among the number of shards.

Parameters
  • items (list) – The list of items to iterate over.

  • num_shards (int) – The number of worker actors to create.

  • repeat (bool) – Whether to cycle over the items forever.
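The round-robin division can be sketched in plain Python (an illustration of the layout only; round_robin_split is a hypothetical helper, not part of the API):

```python
def round_robin_split(items, num_shards):
    # Shard i receives items[i], items[i + num_shards], items[i + 2*num_shards], ...
    return [items[i::num_shards] for i in range(num_shards)]

# e.g. 4 items over 2 shards:
assert round_robin_split([1, 2, 3, 4], 2) == [[1, 3], [2, 4]]
```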

ray.util.iter.from_range(n: int, num_shards: int = 2, repeat: bool = False) → ray.util.iter.ParallelIterator[int][source]

Create a parallel iterator over the range 0..n.

The range will be partitioned sequentially among the number of shards.

Parameters
  • n (int) – The exclusive upper bound of the range of numbers.

  • num_shards (int) – The number of worker actors to create.

  • repeat (bool) – Whether to cycle over the range forever.
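The sequential partitioning can be sketched in plain Python (an illustration only; sequential_partition is a hypothetical helper, not part of the API):

```python
def sequential_partition(n, num_shards):
    # Split range(0, n) into num_shards contiguous chunks.
    chunk = (n + num_shards - 1) // num_shards  # ceiling division
    return [list(range(i * chunk, min((i + 1) * chunk, n)))
            for i in range(num_shards)]

# Matches the shard layout shown by get_shard() for from_range(10, 2):
assert sequential_partition(10, 2) == [[0, 1, 2, 3, 4], [5, 6, 7, 8, 9]]
```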

ray.util.iter.from_iterators(generators: List[Iterable[T]], repeat: bool = False, name=None) → ray.util.iter.ParallelIterator[T][source]

Create a parallel iterator from a list of iterables. An iterable can be a container (list, str, tuple, set, etc.), a generator, or a custom class that implements __iter__ or __getitem__.

An actor will be created for each iterable.

Examples

>>> # Create using a list of generators.
>>> from_iterators([range(100), range(100)])
>>> # Certain generators are not serializable.
>>> from_iterators([(x for x in range(100))])
... TypeError: can't pickle generator objects
>>> # So use lambda functions instead.
>>> # Lambda functions are serializable.
>>> from_iterators([lambda: (x for x in range(100))])

Parameters
  • generators (list) – A list of Python iterables or lambda functions that produce an iterable when called. We allow lambda functions since certain generators might not be serializable, but a lambda that returns it can be.

  • repeat (bool) – Whether to cycle over the iterators forever.

  • name (str) – Optional name to give the iterator.

ray.util.iter.from_actors(actors: List[ray.actor.ActorHandle], name=None) → ray.util.iter.ParallelIterator[T][source]

Create a parallel iterator from an existing set of actors.

Each actor must subclass the ParallelIteratorWorker interface.

Parameters
  • actors (list) – List of actors that each implement ParallelIteratorWorker.

  • name (str) – Optional name to give the iterator.

class ray.util.iter.ParallelIterator(actor_sets: List[_ActorSet], name: str, parent_iterators: List[ParallelIterator[Any]])[source]

Bases: typing.Generic

A parallel iterator over a set of remote actors.

This can be used to iterate over a fixed set of task results (like an actor pool), or a stream of data (e.g., a fixed range of numbers, an infinite stream of RLlib rollout results).

This class is serializable and can be passed to other remote tasks and actors. However, each shard should be read from at most one process at a time.

Examples

>>> # Applying a function over items in parallel.
>>> it = ray.util.iter.from_items([1, 2, 3], num_shards=2)
... <__main__.ParallelIterator object>
>>> it = it.for_each(lambda x: x * 2).gather_sync()
... <__main__.LocalIterator object>
>>> print(list(it))
... [2, 4, 6]
>>> # Creating from generators.
>>> it = ray.util.iter.from_iterators([range(3), range(3)])
... <__main__.ParallelIterator object>
>>> print(list(it.gather_sync()))
... [0, 0, 1, 1, 2, 2]
>>> # Accessing the individual shards of an iterator.
>>> it = ray.util.iter.from_range(10, num_shards=2)
... <__main__.ParallelIterator object>
>>> it0 = it.get_shard(0)
... <__main__.LocalIterator object>
>>> print(list(it0))
... [0, 1, 2, 3, 4]
>>> it1 = it.get_shard(1)
... <__main__.LocalIterator object>
>>> print(list(it1))
... [5, 6, 7, 8, 9]
>>> # Gathering results from actors synchronously in parallel.
>>> it = ray.util.iter.from_actors(workers)
... <__main__.ParallelIterator object>
>>> it = it.batch_across_shards()
... <__main__.LocalIterator object>
>>> print(next(it))
... [worker_1_result_1, worker_2_result_1]
>>> print(next(it))
... [worker_1_result_2, worker_2_result_2]
for_each(fn: Callable[[T], U], max_concurrency=1, resources=None) → ray.util.iter.ParallelIterator[U][source]

Remotely apply fn to each item in this iterator, at most max_concurrency at a time per shard.

If max_concurrency == 1, then fn will be executed serially by each shard.

max_concurrency should be used to achieve a high degree of parallelism without the overhead of increasing the number of shards (which are actor-based). This provides the semantic guarantee that fn(x_i) will begin executing before fn(x_{i+1}) (but it will not necessarily finish first).

A performance note: when executing concurrently, this function maintains its own internal buffer. If num_async is n and max_concurrency is k, then the total number of buffered objects can be up to n + k - 1.

Parameters
  • fn (func) – function to apply to each item.

  • max_concurrency (int) – max number of concurrent calls to fn per shard. If 0, then apply all operations concurrently.

  • resources (dict) – resources that the function requires to execute. This has the same default as ray.remote and is only used when max_concurrency > 1.

Returns

a parallel iterator whose elements have fn applied.

Return type

ParallelIterator[U]

Examples

>>> list(from_range(4, 1).for_each(
            lambda x: x * 2,
            max_concurrency=2,
            resources={"num_cpus": 0.1}).gather_sync())
... [0, 2, 4, 6]
filter(fn: Callable[[T], bool]) → ray.util.iter.ParallelIterator[T][source]

Remotely filter items from this iterator.

Parameters

fn (func) – returns False for items to drop from the iterator.

Examples

>>> it = from_items([0, 1, 2]).filter(lambda x: x > 0)
>>> list(it.gather_sync())
... [1, 2]
batch(n: int) → ray.util.iter.ParallelIterator[List[T]][source]

Remotely batch together items in this iterator.

Parameters

n (int) – Number of items to batch together.

Examples

>>> next(from_range(10, 1).batch(4).gather_sync())
... [0, 1, 2, 3]
flatten() → ParallelIterator[T[0]][source]

Flatten batches of items into individual items.

Examples

>>> next(from_range(10, 1).batch(4).flatten().gather_sync())
... 0
combine(fn: Callable[[T], List[U]]) → ray.util.iter.ParallelIterator[U][source]

Transform and then combine items horizontally.

This is the equivalent of for_each(fn).flatten() (flat map).
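The flat-map behavior can be sketched in plain Python (an illustration only, not the Ray implementation):

```python
def flat_map(fn, items):
    # Apply fn to each item, then chain the resulting lists together.
    for x in items:
        yield from fn(x)

# Duplicating each element:
assert list(flat_map(lambda x: [x, x], [1, 2])) == [1, 1, 2, 2]
```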

local_shuffle(shuffle_buffer_size: int, seed: int = None) → ray.util.iter.ParallelIterator[T][source]

Remotely shuffle the items of each shard independently.

Parameters
  • shuffle_buffer_size (int) – The algorithm fills a buffer with shuffle_buffer_size elements and randomly samples elements from this buffer, replacing the selected elements with new elements. For perfect shuffling, this argument should be greater than or equal to the largest iterator size.

  • seed (int) – Seed to use for randomness. Default value is None.

Returns

A ParallelIterator with a local shuffle applied on the base iterator

Examples

>>> it = from_range(10, 1).local_shuffle(shuffle_buffer_size=2)
>>> it = it.gather_sync()
>>> next(it)
0
>>> next(it)
2
>>> next(it)
3
>>> next(it)
1
repartition(num_partitions: int, batch_ms: int = 0) → ray.util.iter.ParallelIterator[T][source]

Returns a new ParallelIterator instance with num_partitions shards.

The new iterator contains the same data as this instance, but with num_partitions shards. The data is split round-robin among the new shards.

Parameters
  • num_partitions (int) – The number of shards to use for the new ParallelIterator.

  • batch_ms (int) – Batches items for batch_ms milliseconds on each shard before retrieving it. Increasing batch_ms increases latency but improves throughput.

Returns

A ParallelIterator with num_partitions number of shards and the data of this ParallelIterator split round-robin among the new number of shards.

Examples

>>> it = from_range(8, 2)
>>> it = it.repartition(3)
>>> list(it.get_shard(0))
[0, 4, 3, 7]
>>> list(it.get_shard(1))
[1, 5]
>>> list(it.get_shard(2))
[2, 6]
gather_sync() → ray.util.iter.LocalIterator[T][source]

Returns a local iterable for synchronous iteration.

New items will be fetched from the shards on-demand as the iterator is stepped through.

This is the equivalent of batch_across_shards().flatten().

Examples

>>> it = from_range(100, 1).gather_sync()
>>> next(it)
... 0
>>> next(it)
... 1
>>> next(it)
... 2
batch_across_shards() → ray.util.iter.LocalIterator[List[T]][source]

Iterate over the results of multiple shards in parallel.

Examples

>>> it = from_iterators([range(3), range(3)])
>>> next(it.batch_across_shards())
... [0, 0]
gather_async(batch_ms=0, num_async=1) → ray.util.iter.LocalIterator[T][source]

Returns a local iterable for asynchronous iteration.

New items will be fetched from the shards asynchronously as soon as the previous one is computed. Items arrive in non-deterministic order.

Parameters
  • batch_ms (int) – Batches items for batch_ms milliseconds on each shard before retrieving it. Increasing batch_ms increases latency but improves throughput. If this value is 0, then items are returned immediately.

  • num_async (int) – The max number of async requests in flight per actor. Increasing this improves the amount of pipeline parallelism in the iterator.

Examples

>>> it = from_range(100, 1).gather_async()
>>> next(it)
... 3
>>> next(it)
... 0
>>> next(it)
... 1
take(n: int) → List[T][source]

Return up to the first n items from this iterator.

show(n: int = 20)[source]

Print up to the first n items from this iterator.

union(other: ray.util.iter.ParallelIterator[T]) → ray.util.iter.ParallelIterator[T][source]

Return an iterator that is the union of this and the other.

select_shards(shards_to_keep: List[int]) → ray.util.iter.ParallelIterator[T][source]

Return a child iterator that only iterates over given shards.

It is the user’s responsibility to ensure child iterators are operating over disjoint sub-sets of this iterator’s shards.

num_shards() → int[source]

Return the number of worker actors backing this iterator.

shards() → List[ray.util.iter.LocalIterator[T]][source]

Return the list of all shards.

get_shard(shard_index: int, batch_ms: int = 0, num_async: int = 1) → ray.util.iter.LocalIterator[T][source]

Return a local iterator for the given shard.

The iterator is guaranteed to be serializable and can be passed to remote tasks or actors.

Parameters
  • shard_index (int) – Index of the shard to gather.

  • batch_ms (int) – Batches items for batch_ms milliseconds before retrieving it. Increasing batch_ms increases latency but improves throughput. If this value is 0, then items are returned immediately.

  • num_async (int) – The max number of requests in flight. Increasing this improves the amount of pipeline parallelism in the iterator.

class ray.util.iter.LocalIterator(base_iterator: Callable[[], Iterable[T]], shared_metrics: ray.util.iter_metrics.SharedMetrics, local_transforms: List[Callable[[Iterable], Any]] = None, timeout: int = None, name=None)[source]

Bases: typing.Generic

An iterator over a single shard of data.

It implements similar transformations as ParallelIterator[T], but the transforms will be applied locally and not remotely in parallel.

This class is serializable and can be passed to other remote tasks and actors. However, it should be read from at most one process at a time.

static get_metrics() → ray.util.iter_metrics.MetricsContext[source]

Return the current metrics context.

This can only be called within an iterator function.

shuffle(shuffle_buffer_size: int, seed: int = None) → ray.util.iter.LocalIterator[T][source]

Shuffle the items of this iterator.

Parameters
  • shuffle_buffer_size (int) – The algorithm fills a buffer with shuffle_buffer_size elements and randomly samples elements from this buffer, replacing the selected elements with new elements. For perfect shuffling, this argument should be greater than or equal to the largest iterator size.

  • seed (int) – Seed to use for randomness. Default value is None.

Returns

A new LocalIterator with shuffling applied
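The buffer-based algorithm described under shuffle_buffer_size can be sketched in plain Python (an illustration, not the actual implementation):

```python
import random

def buffered_shuffle(items, shuffle_buffer_size, seed=None):
    rng = random.Random(seed)
    buf = []
    for x in items:
        buf.append(x)
        if len(buf) >= shuffle_buffer_size:
            # Emit a randomly sampled element; the incoming items refill the buffer.
            yield buf.pop(rng.randrange(len(buf)))
    # Drain whatever remains once the source is exhausted.
    rng.shuffle(buf)
    yield from buf

out = list(buffered_shuffle(range(10), shuffle_buffer_size=4, seed=0))
# Every item comes through exactly once, in a shuffled order.
assert sorted(out) == list(range(10))
```

Note how a small buffer only shuffles locally: an item can never move more than roughly shuffle_buffer_size positions earlier, which is why a buffer at least as large as the iterator is needed for a perfect shuffle.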

take(n: int) → List[T][source]

Return up to the first n items from this iterator.

show(n: int = 20)[source]

Print up to the first n items from this iterator.

duplicate(n) → List[ray.util.iter.LocalIterator[T]][source]

Copy this iterator n times, duplicating the data.

The child iterators will be prioritized by how much of the parent stream they have consumed. That is, we will not allow children to fall behind, since that can cause infinite memory buildup in this operator.

Returns

child iterators that each have a copy of the data of this iterator.

Return type

List[LocalIterator[T]]
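In plain Python, the closest analogy is itertools.tee, which also copies one stream into several readers (unlike tee, Ray's version prioritizes lagging children to bound memory use); an illustration:

```python
import itertools

# itertools.tee copies a single stream into n independent readers,
# analogous in spirit to LocalIterator.duplicate().
a, b = itertools.tee(iter(range(3)), 2)
copies = [list(a), list(b)]
assert copies == [[0, 1, 2], [0, 1, 2]]
```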

union(*others: ray.util.iter.LocalIterator[T], deterministic: bool = False, round_robin_weights: List[float] = None) → ray.util.iter.LocalIterator[T][source]

Return an iterator that is the union of this and the others.

Parameters
  • deterministic (bool) – If deterministic=True, we alternate between reading from one iterator and the others. Otherwise we return items from iterators as they become ready.

  • round_robin_weights (list) – List of weights to use for round robin mode. For example, [2, 1] will cause the iterator to pull twice as many items from the first iterator as from the second. [2, 1, "*"] will cause as many items as possible to be pulled from the third iterator without blocking. This overrides the deterministic flag.
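The deterministic weighted mode can be sketched in plain Python (an illustration only; readiness-based pulling and the "*" mode are omitted, and weighted_round_robin is a hypothetical helper):

```python
def weighted_round_robin(iterators, weights):
    # Pull weights[i] items from iterator i, then move to the next,
    # cycling until every iterator is exhausted.
    iterators = [iter(it) for it in iterators]
    done = [False] * len(iterators)
    while not all(done):
        for i, (it, w) in enumerate(zip(iterators, weights)):
            for _ in range(w):
                try:
                    yield next(it)
                except StopIteration:
                    done[i] = True
                    break

# With weights [2, 1], two items come from the first stream per round.
out = list(weighted_round_robin([[1, 2, 3, 4], [10, 20]], [2, 1]))
assert out == [1, 2, 10, 3, 4, 20]
```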

class ray.util.iter.ParallelIteratorWorker(item_generator: Any, repeat: bool)[source]

Bases: object

Worker actor for a ParallelIterator.

Actors that are passed to iter.from_actors() must subclass this interface.

par_iter_init(transforms)[source]

Implements ParallelIterator worker init.

par_iter_next()[source]

Implements ParallelIterator worker item fetch.

par_iter_next_batch(batch_ms: int)[source]

Batches par_iter_next.

par_iter_slice(step: int, start: int)[source]

Iterates in increments of step starting from start.

par_iter_slice_batch(step: int, start: int, batch_ms: int)[source]

Batches par_iter_slice.