import asyncio
import concurrent.futures
import logging
import time
import warnings
from typing import (
Any,
AsyncIterator,
Coroutine,
Dict,
Generator,
Generic,
Iterator,
List,
Optional,
Tuple,
TypeVar,
Union,
cast,
)
import ray
from ray import serve
from ray._raylet import ObjectRefGenerator # type: ignore[attr-defined]
from ray.serve._private.common import (
OBJ_REF_NOT_SUPPORTED_ERROR,
DeploymentHandleSource,
DeploymentID,
RequestMetadata,
)
from ray.serve._private.constants import SERVE_LOGGER_NAME
from ray.serve._private.default_impl import (
CreateRouterCallable,
create_dynamic_handle_options,
create_init_handle_options,
create_router,
)
from ray.serve._private.handle_options import (
DynamicHandleOptionsBase,
InitHandleOptionsBase,
)
from ray.serve._private.replica_result import ReplicaResult
from ray.serve._private.router import Router
from ray.serve._private.usage import ServeUsageTag
from ray.serve._private.utils import (
DEFAULT,
calculate_remaining_timeout,
get_random_string,
inside_ray_client_context,
is_running_in_asyncio_loop,
)
from ray.serve.exceptions import RayServeException, RequestCancelledError
from ray.util import metrics
from ray.util.annotations import DeveloperAPI, PublicAPI
# Module-level logger shared by all handle classes; routed through Serve's
# logging configuration via SERVE_LOGGER_NAME.
logger = logging.getLogger(SERVE_LOGGER_NAME)

# TypeVar for the deployment class type in DeploymentHandle[T]
T = TypeVar("T")

# TypeVar for the response/result type in DeploymentResponse[R]
R = TypeVar("R")
class _DeploymentHandleBase(Generic[T]):
    """Base class for deployment handles.

    Tracks the target deployment, the dynamic handle options, and the
    `Router` used to assign requests to replicas. Subclasses implement the
    public `options()` API.
    """

    def __init__(
        self,
        deployment_name: str,
        app_name: str,
        *,
        init_options: Optional[InitHandleOptionsBase] = None,
        handle_options: Optional[DynamicHandleOptionsBase] = None,
        _router: Optional[Router] = None,
        _create_router: Optional[CreateRouterCallable] = None,
        _request_counter: Optional[metrics.Counter] = None,
        _handle_id: Optional[str] = None,
    ):
        self.deployment_id = DeploymentID(name=deployment_name, app_name=app_name)
        self.init_options: Optional[InitHandleOptionsBase] = init_options
        self.handle_options: DynamicHandleOptionsBase = (
            handle_options or create_dynamic_handle_options()
        )

        # Handle ID is shared among handles that are returned by
        # `handle.options` or `handle.method`.
        self.handle_id = _handle_id or get_random_string()
        self.request_counter = _request_counter or self._create_request_counter(
            app_name, deployment_name, self.handle_id
        )

        # The router is created lazily in `_init()`; `None` means
        # "not yet initialized".
        self._router: Optional[Router] = _router
        if _create_router is None:
            self._create_router = create_router
        else:
            self._create_router = _create_router

    @staticmethod
    def _gen_handle_tag(app_name: str, deployment_name: str, handle_id: str):
        """Build the metrics tag identifying this handle instance."""
        if app_name:
            return f"{app_name}#{deployment_name}#{handle_id}"
        else:
            return f"{deployment_name}#{handle_id}"

    @classmethod
    def _create_request_counter(
        cls, app_name: str, deployment_name: str, handle_id: str
    ):
        """Create the `serve_handle_request_counter` metric with default tags."""
        return metrics.Counter(
            "serve_handle_request_counter",
            description=(
                "The number of handle.remote() calls that have been "
                "made on this handle."
            ),
            tag_keys=("handle", "deployment", "route", "application"),
        ).set_default_tags(
            {
                "handle": cls._gen_handle_tag(
                    app_name, deployment_name, handle_id=handle_id
                ),
                "deployment": deployment_name,
                "application": app_name,
            }
        )

    def running_replicas_populated(self) -> bool:
        """Whether the router has received its first set of running replicas."""
        if self._router is None:
            return False

        return self._router.running_replicas_populated()

    @property
    def deployment_name(self) -> str:
        return self.deployment_id.name

    @property
    def app_name(self) -> str:
        return self.deployment_id.app_name

    @property
    def is_initialized(self) -> bool:
        # The handle is considered initialized once a router exists.
        return self._router is not None

    def _init(self, **kwargs):
        """Initialize this handle with arguments.

        A handle can only be initialized once. A handle is implicitly
        initialized when `.options()` or `.remote()` is called. Therefore
        to initialize a handle with custom init options, you must do it
        before calling `.options()` or `.remote()`.
        """
        if self._router is not None:
            raise RuntimeError(
                "Handle has already been initialized. Note that a handle is implicitly "
                "initialized when you call `.options()` or `.remote()`. You either "
                "tried to call `._init()` twice or called `._init()` after calling "
                "`.options()` or `.remote()`. If you want to modify the init options, "
                "please do so before calling `.options()` or `.remote()`. This handle "
                f"was initialized with {self.init_options}."
            )

        init_options = create_init_handle_options(**kwargs)
        self._router = self._create_router(
            handle_id=self.handle_id,
            deployment_id=self.deployment_id,
            handle_options=init_options,
        )

        self.init_options = init_options

        logger.info(
            f"Initialized DeploymentHandle {self.handle_id} for {self.deployment_id}.",
            extra={"log_to_stderr": False},
        )

        # Record handle api telemetry when not in the proxy.
        if (
            self.init_options._source != DeploymentHandleSource.PROXY
            and self.__class__ == DeploymentHandle
        ):
            ServeUsageTag.DEPLOYMENT_HANDLE_API_USED.record("1")

    def _is_router_running_in_separate_loop(self) -> bool:
        """Whether the router runs on its own event loop (separate thread)."""
        if self.init_options is None:
            return False
        return self.init_options._run_router_in_separate_loop

    def _options(
        self, _prefer_local_routing=DEFAULT.VALUE, **kwargs
    ) -> "DeploymentHandle[T]":
        """Return a copy of this handle with updated dynamic options."""
        if kwargs.get("stream") is True and inside_ray_client_context():
            raise RuntimeError(
                "Streaming DeploymentHandles are not currently supported when "
                "connected to a remote Ray cluster using Ray Client."
            )

        new_handle_options = self.handle_options.copy_and_update(**kwargs)

        # TODO(zcin): remove when _prefer_local_routing is removed from options() path
        if _prefer_local_routing != DEFAULT.VALUE:
            self._init(_prefer_local_routing=_prefer_local_routing)

        if not self.is_initialized:
            self._init()

        return DeploymentHandle(
            self.deployment_name,
            self.app_name,
            init_options=self.init_options,
            handle_options=new_handle_options,
            _router=self._router,
            _create_router=self._create_router,
            _request_counter=self.request_counter,
            _handle_id=self.handle_id,
        )

    def _remote(
        self,
        args: Tuple[Any],
        kwargs: Dict[str, Any],
    ) -> Tuple[concurrent.futures.Future, RequestMetadata]:
        """Assign a request to a replica and return (future, metadata)."""
        if not self.is_initialized:
            self._init()

        metadata = serve._private.default_impl.get_request_metadata(
            self.init_options, self.handle_options
        )

        self.request_counter.inc(
            tags={
                "route": metadata.route,
                "application": metadata.app_name,
            }
        )

        if self._router is None:
            raise RuntimeError("Router is not initialized")

        return self._router.assign_request(metadata, *args, **kwargs), metadata

    def options(
        self,
        *,
        method_name: Union[str, DEFAULT] = DEFAULT.VALUE,
        multiplexed_model_id: Union[str, DEFAULT] = DEFAULT.VALUE,
        stream: Union[bool, DEFAULT] = DEFAULT.VALUE,
        use_new_handle_api: Union[bool, DEFAULT] = DEFAULT.VALUE,
        _prefer_local_routing: Union[bool, DEFAULT] = DEFAULT.VALUE,
    ) -> "DeploymentHandle[T]":
        raise NotImplementedError

    def __getattr__(self, name: str) -> "DeploymentHandle[T]":
        # Attribute access is sugar for `.options(method_name=...)`, so
        # `handle.my_method.remote(...)` calls `my_method` on the replica.
        return self.options(method_name=name)

    def shutdown(self):
        """Synchronously shut down the router (best effort on same-loop routers)."""
        if self._router:
            shutdown_future = self._router.shutdown()
            if self._is_router_running_in_separate_loop():
                shutdown_future.result()
            else:
                logger.warning(
                    "Synchronously shutting down a router that's running in the same "
                    "event loop can only be done best effort. Please use "
                    "`shutdown_async` instead."
                )

    async def shutdown_async(self):
        """Asynchronously shut down the router without blocking the event loop."""
        if self._router:
            shutdown_future: Union[
                asyncio.Future, concurrent.futures.Future
            ] = self._router.shutdown()
            # BUGFIX: `_is_router_running_in_separate_loop` is a method and must
            # be *called*. Previously the bound method object itself was used as
            # the condition, which is always truthy, so a same-loop router's
            # asyncio future was incorrectly passed to `asyncio.wrap_future`.
            if self._is_router_running_in_separate_loop():
                # Separate loop: `shutdown()` returned a concurrent.futures.Future.
                await asyncio.wrap_future(shutdown_future)
            else:
                # Same loop: `shutdown()` returned an awaitable asyncio future.
                await shutdown_future

    def __repr__(self) -> str:
        return f"{self.__class__.__name__}" f"(deployment='{self.deployment_name}')"

    @classmethod
    def _deserialize(cls, kwargs: Dict[str, Any]) -> "_DeploymentHandleBase[T]":
        """Required for this class's __reduce__ method to be picklable."""
        return cls(**kwargs)

    def __reduce__(self):
        # Only the identifying fields and dynamic options are serialized; the
        # router and metrics objects are recreated lazily on the other side.
        serialized_constructor_args = {
            "deployment_name": self.deployment_name,
            "app_name": self.app_name,
            "handle_options": self.handle_options,
        }
        return self.__class__._deserialize, (serialized_constructor_args,)
class _DeploymentResponseBase(Generic[R]):
    """Base class for response objects returned by `DeploymentHandle.remote()`.

    Wraps the future for the replica assignment. The wrapped future is a
    `concurrent.futures.Future` when the router runs on a separate event loop
    and an `asyncio.Future` when it runs on the caller's loop; most of the
    branching below exists to handle those two cases.
    """

    def __init__(
        self,
        replica_result_future: Union[
            concurrent.futures.Future[ReplicaResult], asyncio.Future[ReplicaResult]
        ],
        request_metadata: RequestMetadata,
        _is_router_running_in_separate_loop: bool = True,
    ):
        # Set to True once `.cancel()` has been called (best effort only).
        self._cancelled = False
        # Resolves to the ReplicaResult once the request is assigned.
        self._replica_result_future = replica_result_future
        # Cache of the resolved result; fetched at most once.
        self._replica_result: Optional[ReplicaResult] = None
        self._request_metadata: RequestMetadata = request_metadata
        self._is_router_running_in_separate_loop = _is_router_running_in_separate_loop

    @property
    def request_id(self) -> str:
        return self._request_metadata.request_id

    @property
    def by_reference(self) -> bool:
        return self._request_metadata._by_reference

    def _fetch_future_result_sync(
        self, _timeout_s: Optional[float] = None
    ) -> ReplicaResult:
        """Synchronously fetch the replica result.

        The result is cached in `self._replica_result`.

        Raises:
            RuntimeError: If the router shares the caller's event loop (blocking
                here would deadlock the loop).
            TimeoutError: If the result is not available within `_timeout_s`.
            RequestCancelledError: If the underlying future was cancelled.
        """
        if self._replica_result is None:
            if not self._is_router_running_in_separate_loop:
                raise RuntimeError(
                    "Sync methods should not be called from within an `asyncio` event "
                    "loop. Use `await response` instead."
                )
            try:
                # When _is_router_running_in_separate_loop is True, the future
                # is a concurrent.futures.Future (not asyncio.Future).
                sync_future = cast(
                    concurrent.futures.Future[ReplicaResult],
                    self._replica_result_future,
                )
                self._replica_result = sync_future.result(timeout=_timeout_s)
            except concurrent.futures.TimeoutError:
                raise TimeoutError("Timed out resolving to ObjectRef.") from None
            except concurrent.futures.CancelledError:
                raise RequestCancelledError(self.request_id) from None

        return self._replica_result

    async def _fetch_future_result_async(self) -> ReplicaResult:
        """Asynchronously fetch replica result.

        The result is cached in `self._replica_result`.
        """
        if self._replica_result is None:
            if self._is_router_running_in_separate_loop:
                # Use `asyncio.wrap_future` so `self._replica_result_future` can be awaited
                # safely from any asyncio loop.
                # self._replica_result_future is a object of type concurrent.futures.Future
                self._replica_result = await asyncio.wrap_future(
                    self._replica_result_future
                )
            else:
                # self._replica_result_future is a object of type asyncio.Future
                async_future = cast(
                    asyncio.Future[ReplicaResult], self._replica_result_future
                )
                self._replica_result = await async_future

        return self._replica_result

    def cancel(self):
        """Attempt to cancel the `DeploymentHandle` call.

        This is best effort.

        - If the request hasn't been assigned to a replica, the assignment will be
          cancelled.
        - If the request has been assigned to a replica, `ray.cancel` will be
          called on the object ref, attempting to cancel the request and any downstream
          requests it makes.

        If the request is successfully cancelled, subsequent operations on the ref will
        raise an exception:

        - If the request was cancelled before assignment, they'll raise
          `asyncio.CancelledError` (or a `concurrent.futures.CancelledError` for
          synchronous methods like `.result()`.).
        - If the request was cancelled after assignment, they'll raise
          `ray.exceptions.TaskCancelledError`.
        """
        # Idempotent: only the first call does any work.
        if self._cancelled:
            return

        self._cancelled = True
        self._replica_result_future.cancel()

        if not self._is_router_running_in_separate_loop:
            # Given that there is a event loop running, we can't call sync methods.
            # Hence optimistically cancel the replica result future and replica result.
            if self._replica_result:
                self._replica_result.cancel()
            return

        try:
            # try to fetch the results synchronously. if it succeeds,
            # we will explicitly cancel the replica result. if it fails,
            # the request is already cancelled and we can return early.
            self._fetch_future_result_sync()
        except RequestCancelledError:
            # request is already cancelled nothing to do here
            return

        self._replica_result.cancel()

    @DeveloperAPI
    def cancelled(self) -> bool:
        """Whether or not the request has been cancelled.

        This is `True` if `.cancel()` is called, but the request may actually have run
        to completion.
        """
        return self._cancelled
@PublicAPI(stability="stable")
class DeploymentResponse(_DeploymentResponseBase[R]):
    """A future-like object wrapping the result of a unary deployment handle call.

    From inside a deployment, a `DeploymentResponse` can be awaited to retrieve the
    output of the call without blocking the asyncio event loop.

    From outside a deployment, `.result()` can be used to retrieve the output in a
    blocking manner.

    Example:

    .. code-block:: python

        from ray import serve
        from ray.serve.handle import DeploymentHandle

        @serve.deployment
        class Downstream:
            def say_hi(self, message: str) -> str:
                return f"Hello {message}!"

        @serve.deployment
        class Caller:
            def __init__(self, handle: DeploymentHandle):
                self._downstream_handle = handle

            async def __call__(self, message: str) -> str:
                # Inside a deployment: `await` the result to enable concurrency.
                response = self._downstream_handle.say_hi.remote(message)
                return await response

        app = Caller.bind(Downstream.bind())
        handle: DeploymentHandle = serve.run(app)

        # Outside a deployment: call `.result()` to get output.
        response = handle.remote("world")
        assert response.result() == "Hello world!"

    A `DeploymentResponse` can be passed directly to another `DeploymentHandle` call
    without fetching the result to enable composing multiple deployments together.

    Example:

    .. code-block:: python

        from ray import serve
        from ray.serve.handle import DeploymentHandle

        @serve.deployment
        class Adder:
            def add(self, val: int) -> int:
                return val + 1

        @serve.deployment
        class Caller:
            def __init__(self, handle: DeploymentHandle):
                self._adder_handle = handle

            async def __call__(self, start: int) -> int:
                return await self._adder_handle.add.remote(
                    # Pass the response directly to another handle call without awaiting.
                    self._adder_handle.add.remote(start)
                )

        app = Caller.bind(Adder.bind())
        handle: DeploymentHandle = serve.run(app)
        assert handle.remote(0).result() == 2
    """

    def __await__(self) -> Generator[Any, None, R]:
        """Yields the final result of the deployment handle call."""
        try:
            replica_result = yield from self._fetch_future_result_async().__await__()
            result = yield from replica_result.get_async().__await__()
            return result
        except asyncio.CancelledError:
            # Distinguish an explicit `.cancel()` on this response from the
            # surrounding task being cancelled.
            if self._cancelled:
                raise RequestCancelledError(self.request_id) from None
            else:
                raise asyncio.CancelledError from None

    def __reduce__(self):
        raise RayServeException(
            "`DeploymentResponse` is not serializable. If you are passing the "
            "`DeploymentResponse` in a nested object (e.g. a list or dictionary) to a "
            "downstream deployment handle call, that is no longer supported. Please "
            "only pass `DeploymentResponse` objects as top level arguments."
        )

    def result(
        self,
        *,
        timeout_s: Optional[float] = None,
        _skip_asyncio_check: bool = False,
    ) -> R:
        """Fetch the result of the handle call synchronously.

        This should *not* be used from within a deployment as it runs in an asyncio
        event loop. For model composition, `await` the response instead.

        If `timeout_s` is provided and the result is not available before the timeout,
        a `TimeoutError` is raised.
        """
        if not _skip_asyncio_check and is_running_in_asyncio_loop():
            raise RuntimeError(
                "Sync methods should not be called from within an `asyncio` event "
                "loop. Use `await response` instead."
            )

        start_time_s = time.time()
        replica_result = self._fetch_future_result_sync(timeout_s)

        # The timeout budget is shared between assignment and result fetching,
        # so subtract the time already spent waiting for assignment.
        remaining_timeout_s = calculate_remaining_timeout(
            timeout_s=timeout_s, start_time_s=start_time_s, curr_time_s=time.time()
        )
        return replica_result.get(remaining_timeout_s)

    @DeveloperAPI
    async def _to_object_ref(self) -> ray.ObjectRef:
        """Advanced API to convert the response to a Ray `ObjectRef`.

        This is used to pass the output of a `DeploymentHandle` call to a Ray task or
        actor method call.

        This method is `async def` because it will block until the handle call has been
        assigned to a replica. If there are many requests in flight and all
        replicas' queues are full, this may be a slow operation.
        """
        ServeUsageTag.DEPLOYMENT_HANDLE_TO_OBJECT_REF_API_USED.record("1")

        if not self._request_metadata._by_reference:
            raise OBJ_REF_NOT_SUPPORTED_ERROR

        replica_result = await self._fetch_future_result_async()
        return await replica_result.to_object_ref_async()

    @DeveloperAPI
    def _to_object_ref_sync(
        self,
        _timeout_s: Optional[float] = None,
        _allow_running_in_asyncio_loop: bool = False,
    ) -> ray.ObjectRef:
        """Advanced API to convert the response to a Ray `ObjectRef`.

        This is used to pass the output of a `DeploymentHandle` call to a Ray task or
        actor method call.

        This method is a *blocking* call because it will block until the handle call has
        been assigned to a replica. If there are many requests in flight and all
        replicas' queues are full, this may be a slow operation.

        From inside a deployment, `_to_object_ref` should be used instead to avoid
        blocking the asyncio event loop.
        """
        ServeUsageTag.DEPLOYMENT_HANDLE_TO_OBJECT_REF_API_USED.record("1")

        if not self._request_metadata._by_reference:
            raise OBJ_REF_NOT_SUPPORTED_ERROR

        if not _allow_running_in_asyncio_loop and is_running_in_asyncio_loop():
            raise RuntimeError(
                "Sync methods should not be called from within an `asyncio` event "
                "loop. Use `await response._to_object_ref()` instead."
            )

        # First, fetch the result of the future
        start_time_s = time.time()
        replica_result = self._fetch_future_result_sync(_timeout_s)

        # Then, if necessary, resolve generator to ref
        remaining_timeout_s = calculate_remaining_timeout(
            timeout_s=_timeout_s,
            start_time_s=start_time_s,
            curr_time_s=time.time(),
        )
        return replica_result.to_object_ref(timeout_s=remaining_timeout_s)
@PublicAPI(stability="stable")
class DeploymentResponseGenerator(_DeploymentResponseBase[R]):
    """A future-like object wrapping the result of a streaming deployment handle call.

    This is returned when using `handle.options(stream=True)` and calling a generator
    deployment method.

    `DeploymentResponseGenerator` is both a synchronous and asynchronous iterator.

    When iterating over results from inside a deployment, `async for` should be used to
    avoid blocking the asyncio event loop.

    When iterating over results from outside a deployment, use a standard `for` loop.

    Example:

    .. code-block:: python

        from typing import AsyncGenerator, Generator

        from ray import serve
        from ray.serve.handle import DeploymentHandle

        @serve.deployment
        class Streamer:
            def generate_numbers(self, limit: int) -> Generator[int, None, None]:
                for i in range(limit):
                    yield i

        @serve.deployment
        class Caller:
            def __init__(self, handle: DeploymentHandle):
                # Set `stream=True` on the handle to enable streaming calls.
                self._streaming_handle = handle.options(stream=True)

            async def __call__(self, limit: int) -> AsyncIterator[int]:
                gen: DeploymentResponseGenerator = (
                    self._streaming_handle.generate_numbers.remote(limit)
                )

                # Inside a deployment: use `async for` to enable concurrency.
                async for i in gen:
                    yield i

        app = Caller.bind(Streamer.bind())
        handle: DeploymentHandle = serve.run(app)

        # Outside a deployment: use a standard `for` loop.
        gen: DeploymentResponseGenerator = handle.options(stream=True).remote(10)
        assert [i for i in gen] == list(range(10))

    A `DeploymentResponseGenerator` *cannot* currently be passed to another
    `DeploymentHandle` call.
    """

    def __await__(self):
        # BUGFIX (message only): the closing backtick previously landed after
        # the word "instead", rendering the method name incorrectly.
        raise TypeError(
            "`DeploymentResponseGenerator` cannot be awaited directly. Use `async for` "
            "or `await response.__anext__()` instead."
        )

    def __aiter__(self) -> AsyncIterator[R]:
        return self

    async def __anext__(self) -> R:
        try:
            replica_result = await self._fetch_future_result_async()
            return await replica_result.__anext__()
        except asyncio.CancelledError:
            # Distinguish an explicit `.cancel()` from external task cancellation.
            if self._cancelled:
                raise RequestCancelledError(self.request_id) from None
            else:
                raise asyncio.CancelledError from None

    def __iter__(self) -> Iterator[R]:
        return self

    def __next__(self) -> R:
        if is_running_in_asyncio_loop():
            raise RuntimeError(
                "Sync methods should not be called from within an `asyncio` event "
                "loop. Use `async for` or `await response.__anext__()` instead."
            )

        replica_result = self._fetch_future_result_sync()
        return replica_result.__next__()

    def result(
        self,
        *,
        timeout_s: Optional[float] = None,
        _skip_asyncio_check: bool = False,
    ) -> Any:
        """Not supported on `DeploymentResponseGenerator`.

        This method exists only for API parity with `DeploymentResponse.result()` to
        aid static typing. A `DeploymentResponseGenerator` is returned when using
        streaming handles (e.g., `handle.options(stream=True)`).

        To consume results, iterate over the generator instead of calling `.result()`:

        - Outside a deployment: use a standard `for` loop
        - Inside a deployment: use `async for`

        Always raises `TypeError`.
        """
        raise TypeError(
            "`DeploymentResponseGenerator` doesn't support `.result()`. "
            "Use iteration instead: `for item in response` (outside deployments) "
            "or `async for item in response` (inside deployments)."
        )

    @DeveloperAPI
    async def _to_object_ref_gen(self) -> ObjectRefGenerator:
        """Advanced API to convert the generator to a Ray `ObjectRefGenerator`.

        This method is `async def` because it will block until the handle call has been
        assigned to a replica. If there are many requests in flight and all
        replicas' queues are full, this may be a slow operation.
        """
        ServeUsageTag.DEPLOYMENT_HANDLE_TO_OBJECT_REF_API_USED.record("1")

        if not self._request_metadata._by_reference:
            raise OBJ_REF_NOT_SUPPORTED_ERROR

        replica_result = await self._fetch_future_result_async()
        return replica_result.to_object_ref_gen()

    @DeveloperAPI
    def _to_object_ref_gen_sync(
        self,
        _timeout_s: Optional[float] = None,
        _allow_running_in_asyncio_loop: bool = False,
    ) -> ObjectRefGenerator:
        """Advanced API to convert the generator to a Ray `ObjectRefGenerator`.

        This method is a *blocking* call because it will block until the handle call has
        been assigned to a replica. If there are many requests in flight and all
        replicas' queues are full, this may be a slow operation.

        From inside a deployment, `_to_object_ref_gen` should be used instead to avoid
        blocking the asyncio event loop.
        """
        ServeUsageTag.DEPLOYMENT_HANDLE_TO_OBJECT_REF_API_USED.record("1")

        if not self._request_metadata._by_reference:
            raise OBJ_REF_NOT_SUPPORTED_ERROR

        if not _allow_running_in_asyncio_loop and is_running_in_asyncio_loop():
            # BUGFIX (message only): previously suggested `_to_object_ref()`,
            # which is the non-generator API; the async counterpart of this
            # method is `_to_object_ref_gen()`.
            raise RuntimeError(
                "Sync methods should not be called from within an `asyncio` event "
                "loop. Use `await response._to_object_ref_gen()` instead."
            )

        replica_result = self._fetch_future_result_sync(_timeout_s)
        return replica_result.to_object_ref_gen()
@PublicAPI(stability="alpha")
class DeploymentBroadcastResponse:
    """Wraps the results of a broadcast call to all replicas of a deployment.

    Collects results from all replicas in parallel. Results are collected
    from every replica that was live at the time ``broadcast()`` was called;
    replicas added after the call are not included.

    Example:

    .. code-block:: python

        handle = serve.get_deployment_handle("MyDeployment", "app")
        response = handle.broadcast("reset_cache")
        results = response.results()
    """

    def __init__(
        self,
        coro: Coroutine[Any, Any, List[ReplicaResult]],
        loop: Optional[asyncio.AbstractEventLoop],
    ):
        """Initialize a DeploymentBroadcastResponse.

        Args:
            coro: Coroutine that resolves to the list of replica results
                (i.e. ``Router.broadcast(...)``).
            loop: The router's event loop. ``None`` in local testing mode.
        """
        self._coro = coro
        self._loop = loop
        # Cached list of per-replica results after the coroutine completes.
        self._replica_results: Optional[List[ReplicaResult]] = None
        # Cached future so the single-use coroutine is only scheduled once.
        # A retry after timeout reuses this future instead of re-scheduling.
        self._scheduled_future: Optional[concurrent.futures.Future] = None

    def _ensure_scheduled(self) -> concurrent.futures.Future:
        """Schedule the coroutine on the router's loop exactly once."""
        if self._scheduled_future is None:
            assert self._loop is not None, "Cannot schedule without an event loop."
            self._scheduled_future = asyncio.run_coroutine_threadsafe(
                self._coro, self._loop
            )
        return self._scheduled_future

    def _fetch_replica_results_sync(
        self, timeout_s: Optional[float] = None
    ) -> List[ReplicaResult]:
        """Block until the broadcast coroutine resolves; cache the results."""
        if self._replica_results is None:
            if self._loop is None or self._loop.is_closed():
                # Local testing mode: no running event loop.
                # Cache the outcome in _scheduled_future so the single-use
                # coroutine is never re-entered on a subsequent call.
                if self._scheduled_future is None:
                    cached_future: concurrent.futures.Future = (
                        concurrent.futures.Future()
                    )
                    tmp_loop = asyncio.new_event_loop()
                    try:
                        cached_future.set_result(
                            tmp_loop.run_until_complete(self._coro)
                        )
                    except Exception as e:
                        cached_future.set_exception(e)
                    finally:
                        tmp_loop.close()
                    self._scheduled_future = cached_future
                self._replica_results = self._scheduled_future.result(timeout=timeout_s)
            else:
                self._replica_results = self._ensure_scheduled().result(
                    timeout=timeout_s
                )
        return self._replica_results

    async def _fetch_replica_results_async(self) -> List[ReplicaResult]:
        """Await the broadcast coroutine from whatever loop we're running on."""
        if self._replica_results is None:
            running_loop = None
            try:
                running_loop = asyncio.get_running_loop()
            except RuntimeError:
                pass

            if self._loop is not None and running_loop is self._loop:
                # Already running inside the router's event loop — just await.
                self._replica_results = await self._coro
            elif self._loop is not None:
                # Different loop (router runs in a separate thread).
                self._replica_results = await asyncio.wrap_future(
                    self._ensure_scheduled()
                )
            else:
                # No event loop (local testing mode).
                self._replica_results = await self._coro

        assert self._replica_results is not None
        return self._replica_results

    def results(
        self,
        *,
        timeout_s: Optional[float] = None,
        return_exceptions: bool = False,
    ) -> List[Any]:
        """Fetch results from all replicas synchronously.

        Returns a list of results, one per replica. The order of results is
        not guaranteed.

        Args:
            timeout_s: Timeout in seconds. If ``None``, blocks indefinitely.
            return_exceptions: If ``False`` (default), raise immediately on the
                first replica exception. If ``True``, collect exceptions in the
                returned list instead of raising.

        Returns:
            A list of results, one per replica.

        Raises:
            TimeoutError: If the timeout is exceeded.
        """
        if is_running_in_asyncio_loop():
            raise RuntimeError(
                "Sync methods should not be called from within an `asyncio` event "
                "loop. Use `await response.results_async()` instead."
            )

        if timeout_s is not None and timeout_s < 0:
            raise ValueError("timeout_s must be None or non-negative.")

        start_time_s = time.time()
        try:
            replica_results = self._fetch_replica_results_sync(timeout_s)
        except concurrent.futures.TimeoutError:
            raise TimeoutError("Timed out waiting for broadcast results.") from None

        def _fetch_one(rr: ReplicaResult):
            # Each fetch recomputes the remaining budget so the overall call
            # respects `timeout_s` regardless of per-replica latency.
            remaining = calculate_remaining_timeout(
                timeout_s=timeout_s,
                start_time_s=start_time_s,
                curr_time_s=time.time(),
            )
            return rr.get(remaining)

        executor = concurrent.futures.ThreadPoolExecutor(
            max_workers=max(1, len(replica_results))
        )
        futures = [executor.submit(_fetch_one, rr) for rr in replica_results]

        collected = []
        first_error = None
        for fut in futures:
            try:
                collected.append(fut.result())
            except Exception as e:
                if return_exceptions:
                    collected.append(e)
                else:
                    first_error = e
                    break

        # Shut down without waiting so we don't block on slow replicas
        # after an error has already been observed.
        executor.shutdown(wait=first_error is None)

        if first_error is not None:
            raise first_error
        return collected

    async def results_async(
        self,
        *,
        timeout_s: Optional[float] = None,
        return_exceptions: bool = False,
    ) -> List[Any]:
        """Fetch results from all replicas asynchronously.

        Args:
            timeout_s: Timeout in seconds for the full broadcast collection.
                If ``None``, waits indefinitely.
            return_exceptions: If ``False`` (default), raise on replica
                exception. If ``True``, collect exceptions in the returned list
                instead of raising.

        Returns:
            A list of results, one per replica.

        Raises:
            TimeoutError: If the timeout is exceeded.
        """
        if timeout_s is not None and timeout_s < 0:
            raise ValueError("timeout_s must be None or non-negative.")

        start_time_s = time.time()
        if timeout_s is not None:
            try:
                replica_results = await asyncio.wait_for(
                    self._fetch_replica_results_async(),
                    timeout=timeout_s,
                )
            except asyncio.TimeoutError:
                raise TimeoutError("Timed out waiting for broadcast results.") from None
        else:
            replica_results = await self._fetch_replica_results_async()

        gather_coro = asyncio.gather(
            *[rr.get_async() for rr in replica_results],
            return_exceptions=return_exceptions,
        )

        # The timeout budget is shared between assignment and result fetching.
        remaining_timeout_s = calculate_remaining_timeout(
            timeout_s=timeout_s,
            start_time_s=start_time_s,
            curr_time_s=time.time(),
        )
        if remaining_timeout_s is not None:
            try:
                return list(
                    await asyncio.wait_for(gather_coro, timeout=remaining_timeout_s)
                )
            except asyncio.TimeoutError:
                raise TimeoutError("Timed out waiting for broadcast results.") from None
        return list(await gather_coro)

    def __repr__(self) -> str:
        return "DeploymentBroadcastResponse()"
@PublicAPI(stability="stable")
class DeploymentHandle(_DeploymentHandleBase[T]):
    """A handle used to make requests to a deployment at runtime.

    This is primarily used to compose multiple deployments within a single application.
    It can also be used to make calls to the ingress deployment of an application (e.g.,
    for programmatic testing).

    Example:

    .. code-block:: python

        import ray
        from ray import serve
        from ray.serve.handle import DeploymentHandle, DeploymentResponse

        @serve.deployment
        class Downstream:
            def say_hi(self, message: str):
                return f"Hello {message}!"

        @serve.deployment
        class Ingress:
            def __init__(self, handle: DeploymentHandle):
                self._downstream_handle = handle

            async def __call__(self, name: str) -> str:
                response = self._downstream_handle.say_hi.remote(name)
                return await response

        app = Ingress.bind(Downstream.bind())
        handle: DeploymentHandle = serve.run(app)
        response = handle.remote("world")
        assert response.result() == "Hello world!"
    """

    def options(
        self,
        *,
        method_name: Union[str, DEFAULT] = DEFAULT.VALUE,
        multiplexed_model_id: Union[str, DEFAULT] = DEFAULT.VALUE,
        stream: Union[bool, DEFAULT] = DEFAULT.VALUE,
        use_new_handle_api: Union[bool, DEFAULT] = DEFAULT.VALUE,
        _prefer_local_routing: Union[bool, DEFAULT] = DEFAULT.VALUE,
        _by_reference: Union[bool, DEFAULT] = DEFAULT.VALUE,
        request_serialization: Union[str, DEFAULT] = DEFAULT.VALUE,
        response_serialization: Union[str, DEFAULT] = DEFAULT.VALUE,
    ) -> "DeploymentHandle[T]":
        """Set options for this handle and return an updated copy of it.

        Args:
            method_name: The method name to call on the deployment.
            multiplexed_model_id: The model ID to use for multiplexed model requests.
            stream: Whether to use streaming for the request.
            use_new_handle_api: Whether to use the new handle API.
            _prefer_local_routing: Whether to prefer local routing.
            _by_reference: Whether to use by reference.
            request_serialization: Serialization method for RPC requests.
                Available options: "cloudpickle", "pickle", "msgpack", "orjson".
                Defaults to "cloudpickle".
            response_serialization: Serialization method for RPC responses.
                Available options: "cloudpickle", "pickle", "msgpack", "orjson".
                Defaults to "cloudpickle".

        Example:

        .. code-block:: python

            response: DeploymentResponse = handle.options(
                method_name="other_method",
                multiplexed_model_id="model:v1",
            ).remote()
        """
        if use_new_handle_api is not DEFAULT.VALUE:
            warnings.warn(
                "Setting `use_new_handle_api` no longer has any effect. "
                "This argument will be removed in a future version."
            )
        if _prefer_local_routing is not DEFAULT.VALUE:
            warnings.warn(
                "Modifying `_prefer_local_routing` with `options()` is "
                "deprecated. Please use `init()` instead."
            )

        return self._options(
            method_name=method_name,
            multiplexed_model_id=multiplexed_model_id,
            stream=stream,
            _prefer_local_routing=_prefer_local_routing,
            _by_reference=_by_reference,
            request_serialization=request_serialization,
            response_serialization=response_serialization,
        )

    def remote(
        self, *args, **kwargs
    ) -> Union[DeploymentResponse[Any], DeploymentResponseGenerator[Any]]:
        """Issue a remote call to a method of the deployment.

        By default, the result is a `DeploymentResponse` that can be awaited to fetch
        the result of the call or passed to another `.remote()` call to compose multiple
        deployments.

        If `handle.options(stream=True)` is set and a generator method is called, this
        returns a `DeploymentResponseGenerator` instead.

        Example:

        .. code-block:: python

            # Fetch the result directly.
            response = handle.remote()
            result = await response

            # Pass the result to another handle call.
            composed_response = handle2.remote(handle1.remote())
            composed_result = await composed_response

        Args:
            *args: Positional arguments to be serialized and passed to the
                remote method call.
            **kwargs: Keyword arguments to be serialized and passed to the
                remote method call.
        """
        future, request_metadata = self._remote(args, kwargs)
        if self.handle_options.stream:
            return DeploymentResponseGenerator(
                future,
                request_metadata,
                _is_router_running_in_separate_loop=self._is_router_running_in_separate_loop(),
            )
        else:
            return DeploymentResponse(
                future,
                request_metadata,
                _is_router_running_in_separate_loop=self._is_router_running_in_separate_loop(),
            )

    def broadcast(
        self,
        method_name: str,
        *args,
        **kwargs,
    ) -> DeploymentBroadcastResponse:
        """Call a method on all replicas of this deployment in parallel.

        Unlike ``remote()``, which routes the request to a single replica
        via load balancing, ``broadcast()`` fans the call out to **every**
        running replica concurrently.

        This is useful for coordinated operations such as cache resets,
        configuration updates, or state synchronization across replicas.

        .. warning::

            ``broadcast()`` bypasses per-replica backpressure
            (``max_queued_requests`` is not enforced). It is intended for
            **infrequent control-plane operations** such as cache
            invalidation, configuration reload, or state synchronisation
            across replicas. Do not call it on the hot request path — doing
            so will send one request per replica on every call, with no
            rate limiting.

        Example:

        .. code-block:: python

            handle = serve.get_deployment_handle("MyDeployment", "app")

            # Call reset_cache on every replica and collect results.
            response = handle.broadcast("reset_cache")
            results = response.results()

            # Pass arguments to the broadcast call.
            response = handle.broadcast("update_config", new_value=42)
            results = response.results()

        Args:
            method_name: The name of the method to call on each replica.
            *args: Positional arguments passed to the method.
            **kwargs: Keyword arguments passed to the method.

        Returns:
            A :class:`DeploymentBroadcastResponse` that can be used to
            collect results from all replicas.
        """
        if not self.is_initialized:
            self._init()

        # Broadcasts are always unary per replica, so force `stream=False` and
        # route metadata at the requested method.
        metadata = serve._private.default_impl.get_request_metadata(
            self.init_options,
            self.handle_options.copy_and_update(
                method_name=method_name,
                stream=False,
            ),
        )

        if self._router is None:
            raise RuntimeError("Router is not initialized")

        self.request_counter.inc(
            tags={
                "route": metadata.route,
                "application": metadata.app_name,
            }
        )

        coro = self._router.broadcast(metadata, *args, **kwargs)
        return DeploymentBroadcastResponse(coro, self._router.event_loop)