Ray Generators#
Python generators are functions that behave like iterators, yielding one value per iteration. Ray also supports the generators API.
Any generator function decorated with ray.remote becomes a Ray generator task.
Generator tasks stream outputs back to the caller before the task finishes.
+import ray
 import time

 # Takes 25 seconds to finish.
+@ray.remote
 def f():
     for i in range(5):
         time.sleep(5)
         yield i

-for obj in f():
+for obj_ref in f.remote():
     # Prints every 5 seconds and stops after 25 seconds.
-    print(obj)
+    print(ray.get(obj_ref))
The Ray generator above yields an output every 5 seconds, five times in total.
With a normal Ray task, you have to wait 25 seconds to access the output.
With a Ray generator, the caller can access each object reference before the task f finishes.
The Ray generator is useful when:
- You want to reduce heap memory or object store memory usage by yielding and garbage collecting (GC) the output before the task finishes.
- You are familiar with the Python generator and want an equivalent programming model.

Ray libraries use the Ray generator to support streaming use cases:
- Ray Serve uses Ray generators to support streaming responses.
- Ray Data is a streaming data processing library, which uses Ray generators to control and reduce concurrent memory usage.

Ray generators work seamlessly with existing Ray APIs:
- You can use Ray generators in both actor and non-actor tasks.
- Ray generators work with all actor execution models, including threaded actors and async actors.
- Ray generators work with built-in fault tolerance features such as retry or lineage reconstruction.
- Ray generators work with Ray APIs such as ray.wait and ray.cancel.
Getting started#
Define a Python generator function and decorate it with ray.remote to create a Ray generator.
import ray
import time

@ray.remote
def task():
    for i in range(5):
        time.sleep(5)
        yield i
The Ray generator task returns an ObjectRefGenerator object, which is compatible with the generator and async generator APIs. You can access the next, __iter__, __anext__, and __aiter__ APIs from the class.

Whenever a task invokes yield, a corresponding output is ready and available from the generator as a Ray object reference. You can call next(gen) to obtain an object reference. If next has no more items to generate, it raises StopIteration. If __anext__ has no more items to generate, it raises StopAsyncIteration.

The next API blocks the thread until the task generates the next object reference with yield. Since the ObjectRefGenerator supports the Python iteration protocol, you can also use a for loop to iterate over object references. If you want to avoid blocking a thread, you can use either asyncio or the ray.wait API.
gen = task.remote()

# Blocks for 5 seconds.
ref = next(gen)
# Returns 0.
ray.get(ref)

# Blocks for 5 seconds.
ref = next(gen)
# Returns 1.
ray.get(ref)

# Prints 2, 3, and 4, one every 5 seconds.
for ref in gen:
    print(ray.get(ref))
Note
For a normal Python generator, the generator function is paused and resumed each time next is called on the generator. Ray eagerly executes a generator task to completion regardless of whether the caller is polling the partial results or not.
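To make the contrast in this note concrete, here is a minimal sketch of the pause-and-resume behavior of a normal Python generator. It uses plain Python only, not Ray, and lazy_counter and log are hypothetical names chosen for illustration.

```python
def lazy_counter(log):
    """Plain Python generator that records when each step of its body runs."""
    for i in range(3):
        log.append(i)  # Record that the body actually ran for this item.
        yield i


log = []
gen = lazy_counter(log)

# Creating the generator object runs none of the function body.
ran_before_first_next = list(log)

# next() resumes the body up to the following yield, then pauses it again.
first = next(gen)
ran_after_first_next = list(log)

print(ran_before_first_next, first, ran_after_first_next)
```

Here the body only advances when the caller asks for an item. A Ray generator task, as described above, keeps running through all of its yields even if the caller never calls next.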
Error handling#
If a generator task fails (because of an application exception or a system error such as an unexpected node failure), next(gen) returns an object reference that contains an exception. When you call ray.get on that reference, Ray raises the exception.
@ray.remote
def task():
    for i in range(5):
        time.sleep(1)
        if i == 1:
            raise ValueError
        yield i

gen = task.remote()
# The first yield succeeds.
ray.get(next(gen))

# The second reference contains the exception, which ray.get raises.
try:
    ray.get(next(gen))
except ValueError as e:
    print(f"Exception is raised when i == 1 as expected {e}")
In the above example, if the application fails the task, Ray returns the object reference with the exception in the correct order. For example, if the task raises the exception after the second yield, the third next(gen) always returns an object reference with the exception. If a system error fails the task (for example, a node failure or worker process failure), next(gen) returns the object reference that contains the system-level exception at any time without an ordering guarantee.
This means that when you have N yields, the generator can create from 1 to N + 1 object references (N outputs plus a reference with a system-level exception) when failures occur.
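For comparison, the sketch below shows where the same application error surfaces in a plain Python generator (not a Ray task): the exception comes out of next() itself, whereas with a Ray generator next(gen) returns a reference and ray.get raises. The name flaky is hypothetical.

```python
def flaky():
    """Plain Python generator (not a Ray task) that fails on its second item."""
    for i in range(5):
        if i == 1:
            raise ValueError("failed at i == 1")
        yield i


gen = flaky()
first = next(gen)  # The first item is produced normally.

try:
    next(gen)  # The error is raised by next() itself.
    error = None
except ValueError as exc:
    error = exc

# Once a plain generator has raised, it is finished and yields nothing more.
remaining = list(gen)

print(first, repr(error), remaining)
```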
Generator from Actor Tasks#
The Ray generator is compatible with all actor execution models. It seamlessly works with regular actors, async actors, and threaded actors.
@ray.remote
class Actor:
    def f(self):
        for i in range(5):
            yield i

@ray.remote
class AsyncActor:
    async def f(self):
        for i in range(5):
            yield i

@ray.remote(max_concurrency=5)
class ThreadedActor:
    def f(self):
        for i in range(5):
            yield i

actor = Actor.remote()
for ref in actor.f.remote():
    print(ray.get(ref))

actor = AsyncActor.remote()
for ref in actor.f.remote():
    print(ray.get(ref))

actor = ThreadedActor.remote()
for ref in actor.f.remote():
    print(ray.get(ref))
Using the Ray generator with asyncio#
The returned ObjectRefGenerator is also compatible with asyncio. You can use __anext__ or async for loops.
import asyncio

@ray.remote
def task():
    for i in range(5):
        time.sleep(1)
        yield i

async def main():
    async for ref in task.remote():
        print(await ref)

asyncio.run(main())
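An async for loop is shorthand for repeated __anext__ calls that stop on StopAsyncIteration. The sketch below spells that protocol out with a plain Python async generator standing in for a Ray generator, so it runs without Ray; numbers and drain_manually are hypothetical names.

```python
import asyncio


async def numbers():
    """Plain Python async generator used as a stand-in for a Ray generator."""
    for i in range(3):
        await asyncio.sleep(0)  # Give control back to the event loop.
        yield i


async def drain_manually():
    agen = numbers().__aiter__()
    collected = []
    while True:
        try:
            # Awaiting __anext__ suspends this coroutine, not the thread.
            collected.append(await agen.__anext__())
        except StopAsyncIteration:
            # Raised once there are no more items to generate.
            break
    return collected


items = asyncio.run(drain_manually())
print(items)
```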
Garbage collection of object references#
The ref returned from next(generator) is just a regular Ray object reference, and Ray applies distributed reference counting to it in the same way.
If references are not consumed from a generator by the next API, they are garbage collected (GC’ed) when the generator is GC’ed.
@ray.remote
def task():
    for i in range(5):
        time.sleep(1)
        yield i

gen = task.remote()
ref1 = next(gen)
del gen
In the preceding example, Ray counts ref1 as a normal Ray object reference after Ray returns it. Other references that aren’t consumed with next(gen) are removed when the generator is GC’ed. In this example, garbage collection happens when you call del gen.
Fault tolerance#
Fault tolerance features work with Ray generator tasks and actor tasks. For example:
- Task fault tolerance features: max_retries, retry_exceptions
- Actor fault tolerance features: max_restarts, max_task_retries
- Object fault tolerance features: object reconstruction
Cancellation#
The ray.cancel() function works with both Ray generator tasks and actor tasks.
Semantically, cancelling a generator task isn’t different from cancelling a regular task.
When you cancel a task, next(gen) can return a reference that contains TaskCancelledError without any special ordering guarantee.
How to wait for a generator without blocking a thread (compatibility with ray.wait and ray.get)#
When using a generator, the next API blocks its thread until the next object reference is available.
However, you may not want this behavior all the time. You may want to wait for a generator without blocking a thread.
Non-blocking waits are possible with the Ray generator in the following ways:
Wait until a generator task completes
ObjectRefGenerator has a completed API. It returns an object reference that becomes available when the generator task finishes or errors.
For example, you can call ray.get(<generator_instance>.completed()) to wait until a task completes. Note that calling ray.get on the ObjectRefGenerator itself isn’t allowed.
Use asyncio and await
ObjectRefGenerator is compatible with asyncio. You can create multiple asyncio tasks that each create a generator task and wait for it, which avoids blocking a thread.
import asyncio

@ray.remote
def task():
    for i in range(5):
        time.sleep(1)
        yield i

async def async_task():
    async for ref in task.remote():
        print(await ref)

async def main():
    t1 = async_task()
    t2 = async_task()
    await asyncio.gather(t1, t2)

asyncio.run(main())
Use ray.wait
You can pass an ObjectRefGenerator as an input to ray.wait. The generator is “ready” if a next item is available. Once a generator appears in the ready list, next(gen) returns the next object reference immediately without blocking. See the example below for more details.
@ray.remote
def task():
    for i in range(5):
        time.sleep(5)
        yield i

gen = task.remote()

# Because it takes 5 seconds to make the first yield,
# with 0 timeout, the generator is unready.
ready, unready = ray.wait([gen], timeout=0)
print("timeout 0, nothing is ready.")
print(ready)
assert len(ready) == 0
assert len(unready) == 1

# Without a timeout argument, ray.wait waits until the given argument
# is ready. When a next item is ready, it returns.
ready, unready = ray.wait([gen])
print("Wait for 5 seconds. The next item is ready.")
assert len(ready) == 1
assert len(unready) == 0
next(gen)

# Because the second yield hasn't happened yet,
# the generator is unready again.
ready, unready = ray.wait([gen], timeout=0)
print("Wait for 0 seconds. The next item is not ready.")
print(ready, unready)
assert len(ready) == 0
assert len(unready) == 1
All the input arguments of ray.wait (such as timeout, num_returns, and fetch_local) work with generators.
ray.wait can mix regular Ray object references with generators as inputs. In this case, the application has to tell the two kinds of entries apart in the returned lists, and all input arguments still apply to the generators.
from ray._raylet import ObjectRefGenerator

@ray.remote
def generator_task():
    for i in range(5):
        time.sleep(5)
        yield i

@ray.remote
def regular_task():
    for i in range(5):
        time.sleep(5)
    return

gen = [generator_task.remote()]
ref = [regular_task.remote()]
ready, unready = [], [*gen, *ref]
result = []

while unready:
    ready, unready = ray.wait(unready)
    for r in ready:
        if isinstance(r, ObjectRefGenerator):
            try:
                ref = next(r)
                result.append(ray.get(ref))
            except StopIteration:
                # The generator is exhausted, so drop it.
                pass
            else:
                # The generator may have more items, so keep waiting on it.
                unready.append(r)
        else:
            result.append(ray.get(r))
Thread safety#
The ObjectRefGenerator object is not thread-safe.
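A common general-purpose pattern for sharing a non-thread-safe iterator between threads is to serialize every next call behind a lock. The sketch below shows that pattern on a plain Python iterator. LockedIterator is a hypothetical helper and not part of Ray, and this document doesn't state whether a lock alone is sufficient for ObjectRefGenerator, so treat it as an assumption to verify rather than a recommendation.

```python
import threading


class LockedIterator:
    """Hypothetical helper: lets only one thread call next() at a time."""

    def __init__(self, iterable):
        self._it = iter(iterable)
        self._lock = threading.Lock()

    def __iter__(self):
        return self

    def __next__(self):
        # The lock is held for the whole next() call, so if the wrapped
        # iterator blocks, every other consumer thread waits as well.
        with self._lock:
            return next(self._it)


shared = LockedIterator(range(1000))
seen = []
seen_lock = threading.Lock()


def worker():
    for item in shared:
        with seen_lock:
            seen.append(item)


threads = [threading.Thread(target=worker) for _ in range(4)]
for t in threads:
    t.start()
for t in threads:
    t.join()

print(len(seen))
```

A simpler alternative that avoids the question entirely is to consume each generator only from the thread that created it.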
Limitation#
Ray generators don’t support these features:
- throw, send, and close APIs.
- return statements from generators.
- Passing ObjectRefGenerator to another task or actor.
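For readers less familiar with the unsupported generator features, the sketch below shows what send, throw, close, and a return value do in a plain Python generator. It runs without Ray, and echo is a hypothetical name; the point is only to illustrate the behavior that Ray generators don’t provide.

```python
def echo():
    """Plain Python generator that uses send, throw, and a return value."""
    received = yield "ready"
    try:
        yield f"got {received}"
    except KeyError:
        yield "recovered"
    return "done"


gen = echo()
first = next(gen)            # Runs the body up to the first yield.
second = gen.send("hello")   # Resumes the body with a value for `received`.
third = gen.throw(KeyError)  # Raises KeyError inside the body at the paused yield.

try:
    next(gen)
    return_value = None
except StopIteration as stop:
    # A generator's return value travels on the StopIteration exception.
    return_value = stop.value

# close() finishes a paused generator early; nothing more is produced.
other = echo()
next(other)
other.close()
after_close = list(other)

print(first, second, third, return_value, after_close)
```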