Dynamic generators#

Python generators are functions that behave like iterators, yielding one value per iteration. Ray supports remote generators for two use cases:

  1. To reduce max heap memory usage when returning multiple values from a remote function. See the design pattern guide for an example.

  2. When the number of return values is set dynamically by the remote function instead of by the caller.

Remote generators can be used in both actor and non-actor tasks.
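For example, an actor method can also be a generator, with its number of return values declared through ray.method. Here is a minimal sketch (the ChunkProducer actor and its produce method are illustrative, not part of Ray's API):

import ray


@ray.remote
class ChunkProducer:
    @ray.method(num_returns=2)
    def produce(self):
        # Yield exactly the two values declared by num_returns.
        yield "chunk-0"
        yield "chunk-1"


producer = ChunkProducer.remote()
ref1, ref2 = producer.produce.remote()
assert ray.get([ref1, ref2]) == ["chunk-0", "chunk-1"]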

num_returns set by the task caller#

Where possible, the caller should set the remote function’s number of return values using @ray.remote(num_returns=x) or foo.options(num_returns=x).remote(). Ray will return this many ObjectRefs to the caller. The remote task should then return the same number of values, usually as a tuple or list. Compared to setting the number of return values dynamically, this adds less complexity to user code and less performance overhead, as Ray will know exactly how many ObjectRefs to return to the caller ahead of time.
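For example, a task that statically declares three return values might look like the following sketch (the function name is illustrative):

import ray


@ray.remote(num_returns=3)
def return_three():
    # Return exactly as many values as declared in num_returns.
    return 1, 2, 3


ref_a, ref_b, ref_c = return_three.remote()
assert ray.get([ref_a, ref_b, ref_c]) == [1, 2, 3]

# The caller can also set the number of return values at call time.
ref_a, ref_b, ref_c = return_three.options(num_returns=3).remote()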

Without changing the caller’s syntax, we can also use a remote generator function to yield the values iteratively. The generator should yield the same number of return values specified by the caller, and these will be stored one at a time in Ray’s object store. An error will be raised if the generator yields a different number of values than the number specified by the caller.

For example, we can swap the following code that returns a list of return values:

import numpy as np
import ray


@ray.remote
def large_values(num_returns):
    return [
        np.random.randint(np.iinfo(np.int8).max, size=(100_000_000, 1), dtype=np.int8)
        for _ in range(num_returns)
    ]

for this code, which uses a generator function:

@ray.remote
def large_values_generator(num_returns):
    for i in range(num_returns):
        yield np.random.randint(
            np.iinfo(np.int8).max, size=(100_000_000, 1), dtype=np.int8
        )
        print(f"yielded return value {i}")

The advantage of doing so is that the generator function does not need to hold all of its return values in memory at once. It can yield the arrays one at a time to reduce memory pressure.
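On the caller side, the syntax is identical to the list-returning version: the caller sets num_returns through .options() and receives that many ObjectRefs. A usage sketch for the generator above (the choice of 3 return values is arbitrary):

refs = large_values_generator.options(num_returns=3).remote(3)
assert len(refs) == 3
# Each ObjectRef resolves to one of the yielded arrays.
assert ray.get(refs[0]).shape == (100_000_000, 1)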

num_returns set by the task executor#

In some cases, the caller may not know the number of return values to expect from a remote function. For example, suppose we want to write a task that breaks up its argument into equal-size chunks and returns these. We may not know the size of the argument until we execute the task, so we don’t know the number of return values to expect.

In these cases, we can use a remote generator function that returns a dynamic number of values. To use this feature, set num_returns="dynamic" in the @ray.remote decorator or the remote function’s .options(). Then, when invoking the remote function, Ray will return a single ObjectRef that will be populated with a DynamicObjectRefGenerator when the task completes. The DynamicObjectRefGenerator can be used to iterate over a list of ObjectRefs containing the actual values returned by the task.

import numpy as np
import ray


@ray.remote(num_returns="dynamic")
def split(array, chunk_size):
    while len(array) > 0:
        yield array[:chunk_size]
        array = array[chunk_size:]


array_ref = ray.put(np.zeros(np.random.randint(1_000_000)))
block_size = 1000

# Returns an ObjectRef[DynamicObjectRefGenerator].
dynamic_ref = split.remote(array_ref, block_size)
print(dynamic_ref)
# ObjectRef(c8ef45ccd0112571ffffffffffffffffffffffff0100000001000000)

# Start i at -1 so that num_blocks_generated is 0 if the generator yields no blocks.
i = -1
ref_generator = ray.get(dynamic_ref)
print(ref_generator)
# <ray._raylet.DynamicObjectRefGenerator object at 0x7f7e2116b290>
for i, ref in enumerate(ref_generator):
    # Each DynamicObjectRefGenerator iteration returns an ObjectRef.
    assert len(ray.get(ref)) <= block_size
num_blocks_generated = i + 1
array_size = len(ray.get(array_ref))
assert array_size <= num_blocks_generated * block_size
print(f"Split array of size {array_size} into {num_blocks_generated} blocks of "
      f"size {block_size} each.")
# Split array of size 63153 into 64 blocks of size 1000 each.

# NOTE: The dynamic_ref points to the generated ObjectRefs. Make sure that this
# ObjectRef goes out of scope so that Ray can garbage-collect the internal
# ObjectRefs.
del dynamic_ref

We can also pass the ObjectRef returned by a task with num_returns="dynamic" to another task. The task will receive the DynamicObjectRefGenerator, which it can use to iterate over the task’s return values. Similarly, you can also pass an ObjectRefGenerator as a task argument.

@ray.remote
def get_size(ref_generator: DynamicObjectRefGenerator):
    print(ref_generator)
    num_elements = 0
    for ref in ref_generator:
        array = ray.get(ref)
        assert len(array) <= block_size
        num_elements += len(array)
    return num_elements


# Returns an ObjectRef[DynamicObjectRefGenerator].
dynamic_ref = split.remote(array_ref, block_size)
assert array_size == ray.get(get_size.remote(dynamic_ref))
# (get_size pid=1504184)
# <ray._raylet.DynamicObjectRefGenerator object at 0x7f81c4250ad0>

# This also works, but should be avoided because you have to call an additional
# `ray.get`, which blocks the driver.
ref_generator = ray.get(dynamic_ref)
assert array_size == ray.get(get_size.remote(ref_generator))
# (get_size pid=1504184)
# <ray._raylet.DynamicObjectRefGenerator object at 0x7f81c4251b50>

Exception handling#

If a generator function raises an exception before yielding all its values, the values that it already stored will still be accessible through their ObjectRefs. The remaining ObjectRefs will contain the raised exception. This is true for both static and dynamic num_returns. If the task was called with num_returns="dynamic", the exception will be stored as an additional final ObjectRef in the DynamicObjectRefGenerator.

@ray.remote
def generator():
    for i in range(2):
        yield i
    raise Exception("error")


ref1, ref2, ref3, ref4 = generator.options(num_returns=4).remote()
assert ray.get([ref1, ref2]) == [0, 1]
# All remaining ObjectRefs will contain the error.
try:
    ray.get([ref3, ref4])
except Exception as error:
    print(error)

dynamic_ref = generator.options(num_returns="dynamic").remote()
ref_generator = ray.get(dynamic_ref)
ref1, ref2, ref3 = ref_generator
assert ray.get([ref1, ref2]) == [0, 1]
# Generators with num_returns="dynamic" will store the exception in the final
# ObjectRef.
try:
    ray.get(ref3)
except Exception as error:
    print(error)

Note that there is currently a known bug where exceptions will not be propagated for generators that yield more values than expected. This can occur in two cases:

  1. When num_returns is set by the caller, but the generator task returns more than this value.

  2. When a generator task with num_returns="dynamic" is re-executed, and the re-executed task yields more values than the original execution. Note that in general, Ray does not guarantee correctness for task re-execution if the task is nondeterministic, and it is recommended to set @ray.remote(max_retries=0) for such tasks (see the sketch after the example below).

# Generators that yield more values than expected currently do not throw an
# exception (the error is only logged).
# See https://github.com/ray-project/ray/issues/28689.
ref1, ref2 = generator.options(num_returns=2).remote()
assert ray.get([ref1, ref2]) == [0, 1]
"""
(generator pid=2375938) 2022-09-28 11:08:51,386 ERROR worker.py:755 --
    Unhandled error: Task threw exception, but all return values already
    created.  This should only occur when using generator tasks.
...
"""

Limitations#

Although a generator function creates ObjectRefs one at a time, currently Ray will not schedule dependent tasks until the entire task is complete and all values have been created. This is similar to the semantics used by tasks that return multiple values as a list.
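For example, in the following sketch (reusing the split task and array_ref from above), the downstream task is not scheduled until split has finished yielding all of its chunks, even though each chunk's ObjectRef is created as soon as it is yielded:

@ray.remote
def total_size(ref_generator):
    return sum(len(ray.get(ref)) for ref in ref_generator)


dynamic_ref = split.remote(array_ref, block_size)
# total_size does not start until the split task has completed and all of its
# chunks have been stored in the object store.
assert ray.get(total_size.remote(dynamic_ref)) == len(ray.get(array_ref))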