ray.serve.handle.DeploymentResponseGenerator

class ray.serve.handle.DeploymentResponseGenerator

A future-like object wrapping the result of a streaming deployment handle call.

This is returned when using handle.options(stream=True) and calling a generator deployment method.

DeploymentResponseGenerator is both a synchronous and asynchronous iterator.

When iterating over results from inside a deployment, use async for to avoid blocking the asyncio event loop.

When iterating over results from outside a deployment, use a standard for loop.
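As a rough illustration of the two iteration modes described above, the following sketch uses a hypothetical stand-in class, DualIterator, in place of Ray itself. It is not Ray's implementation; the class, the helper functions count_ticks and consume, and the per-item delay are all assumptions made for the example. It shows one object implementing both iterator protocols, and why a plain for loop inside a coroutine starves other tasks on the event loop while async for does not.

```python
import asyncio
import time
from typing import List, Tuple


class DualIterator:
    """Hypothetical stand-in (not Ray's implementation) that supports both
    `for` and `async for`, waiting `delay_s` seconds before each item."""

    def __init__(self, limit: int, delay_s: float = 0.01):
        self._limit = limit
        self._delay_s = delay_s
        self._next = 0

    def _advance(self) -> int:
        value = self._next
        self._next += 1
        return value

    # Synchronous iterator protocol: blocks the calling thread while waiting.
    def __iter__(self) -> "DualIterator":
        return self

    def __next__(self) -> int:
        if self._next >= self._limit:
            raise StopIteration
        time.sleep(self._delay_s)
        return self._advance()

    # Asynchronous iterator protocol: yields to the event loop while waiting.
    def __aiter__(self) -> "DualIterator":
        return self

    async def __anext__(self) -> int:
        if self._next >= self._limit:
            raise StopAsyncIteration
        await asyncio.sleep(self._delay_s)
        return self._advance()


async def count_ticks(stop: asyncio.Event) -> int:
    """Background task that counts how often the event loop lets it run."""
    ticks = 0
    while not stop.is_set():
        ticks += 1
        await asyncio.sleep(0)
    return ticks


async def consume(use_async_for: bool) -> Tuple[List[int], int]:
    """Drain a DualIterator and report how often the background task ran."""
    stop = asyncio.Event()
    ticker = asyncio.create_task(count_ticks(stop))
    items: List[int] = []
    gen = DualIterator(5)
    if use_async_for:
        # Each wait is awaited, so the background task keeps running.
        async for i in gen:
            items.append(i)
    else:
        # Each wait blocks the thread, so the background task never runs.
        for i in gen:
            items.append(i)
    stop.set()
    return items, await ticker


if __name__ == "__main__":
    print(asyncio.run(consume(use_async_for=False)))
    print(asyncio.run(consume(use_async_for=True)))
```

Both calls collect the same items. The blocking variant reports zero ticks for the background task, whereas the async for variant reports a positive count that varies from run to run.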

Example:

from typing import AsyncIterator, Generator

from ray import serve
from ray.serve.handle import DeploymentHandle, DeploymentResponseGenerator

@serve.deployment
class Streamer:
    def generate_numbers(self, limit: int) -> Generator[int, None, None]:
        for i in range(limit):
            yield i

@serve.deployment
class Caller:
    def __init__(self, handle: DeploymentHandle):
        # Set `stream=True` on the handle to enable streaming calls.
        self._streaming_handle = handle.options(stream=True)

    async def __call__(self, limit: int) -> AsyncIterator[int]:
        gen: DeploymentResponseGenerator = (
            self._streaming_handle.generate_numbers.remote(limit)
        )

        # Inside a deployment: use `async for` to enable concurrency.
        async for i in gen:
            yield i

app = Caller.bind(Streamer.bind())
handle: DeploymentHandle = serve.run(app)

# Outside a deployment: use a standard `for` loop.
gen: DeploymentResponseGenerator = handle.options(stream=True).remote(10)
assert [i for i in gen] == list(range(10))

A DeploymentResponseGenerator cannot currently be passed to another DeploymentHandle call.