import asyncio
import atexit
import collections
from copy import copy
import inspect
import logging
import random
import re
import time
import yaml
import json
from dataclasses import dataclass
from functools import wraps
from typing import (
Any,
Callable,
Dict,
Optional,
TextIO,
Tuple,
Type,
Union,
List,
Iterable,
overload,
)
from fastapi import APIRouter, FastAPI
from ray.exceptions import RayActorError
from ray.experimental.dag.class_node import ClassNode
from ray.experimental.dag.function_node import FunctionNode
from starlette.requests import Request
from uvicorn.config import Config
from uvicorn.lifespan.on import LifespanOn
from ray.actor import ActorHandle
from ray.serve.common import (
DeploymentInfo,
DeploymentStatus,
DeploymentStatusInfo,
ReplicaTag,
)
from ray.serve.config import (
AutoscalingConfig,
DeploymentConfig,
HTTPOptions,
ReplicaConfig,
)
from ray.serve.constants import (
DEFAULT_CHECKPOINT_PATH,
HTTP_PROXY_TIMEOUT,
SERVE_CONTROLLER_NAME,
MAX_CACHED_HANDLES,
CONTROLLER_MAX_CONCURRENCY,
DEFAULT_HTTP_HOST,
DEFAULT_HTTP_PORT,
)
from ray.serve.controller import ServeController
from ray.serve.exceptions import RayServeException
from ray.experimental.dag import DAGNode
from ray.serve.handle import RayServeHandle, RayServeSyncHandle
from ray.serve.http_util import ASGIHTTPSender, make_fastapi_class_based_view
from ray.serve.utils import (
LoggingContext,
ensure_serialization_context,
format_actor_name,
get_current_node_resource_key,
get_random_letters,
get_deployment_import_path,
logger,
DEFAULT,
)
from ray.util.annotations import PublicAPI
import ray
from ray import cloudpickle
from ray.serve.schema import (
RayActorOptionsSchema,
DeploymentSchema,
DeploymentStatusSchema,
ServeApplicationSchema,
ServeApplicationStatusSchema,
)
_INTERNAL_REPLICA_CONTEXT = None
_global_client: "Client" = None
_UUID_RE = re.compile(
"[a-f0-9]{8}-[a-f0-9]{4}-4[a-f0-9]{3}-[89aAbB][a-f0-9]{3}-[a-f0-9]{12}"
)
# The interval at which the Serve client polls the controller for deployment state.
_CLIENT_POLLING_INTERVAL_S: float = 1
def _get_controller_namespace(
detached: bool, _override_controller_namespace: Optional[str] = None
):
"""Gets the controller's namespace.
Args:
detached (bool): Whether serve.start() was called with detached=True
_override_controller_namespace (Optional[str]): When set, this is the
controller's namespace
"""
if _override_controller_namespace is not None:
return _override_controller_namespace
controller_namespace = ray.get_runtime_context().namespace
if not detached:
return controller_namespace
# Start controller in "serve" namespace if detached and currently
# in anonymous namespace.
if _UUID_RE.fullmatch(controller_namespace) is not None:
controller_namespace = "serve"
return controller_namespace
def internal_get_global_client(
    _override_controller_namespace: Optional[str] = None,
    _health_check_controller: bool = False,
) -> "Client":
    """Gets the global client, which stores the controller's handle.

    Args:
        _override_controller_namespace (Optional[str]): If None and there's no
            cached client, searches for the controller in this namespace.
        _health_check_controller (bool): If True, run a health check on the
            cached controller if it exists. If the check fails, try reconnecting
            to the controller.
    """
    try:
        cached = _global_client
        if cached is not None:
            # Optionally verify the cached controller is still alive before
            # handing it back.
            if _health_check_controller:
                ray.get(cached._controller.check_alive.remote())
            return cached
    except RayActorError:
        logger.info("The cached controller has died. Reconnecting.")
        _set_global_client(None)

    # No usable cached client: (re)connect to the running instance.
    return _connect(_override_controller_namespace=_override_controller_namespace)
def _set_global_client(client):
    """Cache (or clear, when `client` is None) the process-global Serve client."""
    global _global_client
    _global_client = client
@dataclass
class ReplicaContext:
    """Stores data for Serve API calls from within deployments."""

    # Name of the deployment this replica belongs to.
    deployment: str
    # Tag identifying this particular replica.
    replica_tag: ReplicaTag
    # Name and namespace of the controller actor managing this replica.
    _internal_controller_name: str
    _internal_controller_namespace: str
    # The user's instantiated callable that this replica serves.
    servable_object: Callable
def _set_internal_replica_context(
    deployment: str,
    replica_tag: ReplicaTag,
    controller_name: str,
    controller_namespace: str,
    servable_object: Callable,
):
    """Populate the process-global ReplicaContext.

    Called from within a replica process so that later API calls (e.g.
    get_replica_context, _connect) can find the owning controller.
    """
    global _INTERNAL_REPLICA_CONTEXT
    _INTERNAL_REPLICA_CONTEXT = ReplicaContext(
        deployment, replica_tag, controller_name, controller_namespace, servable_object
    )
def _ensure_connected(f: Callable) -> Callable:
@wraps(f)
def check(self, *args, **kwargs):
if self._shutdown:
raise RayServeException("Client has already been shut down.")
return f(self, *args, **kwargs)
return check
class Client:
def __init__(
    self,
    controller: ActorHandle,
    controller_name: str,
    detached: bool = False,
    _override_controller_namespace: Optional[str] = None,
):
    """Wrap a handle to the Serve controller actor in a client API.

    Args:
        controller (ActorHandle): Handle to the running ServeController actor.
        controller_name (str): Name the controller actor is registered under.
        detached (bool): Whether the Serve instance outlives this client
            (i.e. was started with detached=True).
        _override_controller_namespace (Optional[str]): Namespace the
            controller lives in, when explicitly overridden by the caller.
    """
    self._controller: ServeController = controller
    self._controller_name = controller_name
    self._detached = detached
    self._override_controller_namespace = _override_controller_namespace
    self._shutdown = False
    # Eagerly fetch static controller state via blocking ray.get() calls.
    self._http_config: HTTPOptions = ray.get(controller.get_http_config.remote())
    self._root_url = ray.get(controller.get_root_url.remote())
    self._checkpoint_path = ray.get(controller.get_checkpoint_path.remote())

    # Each handle has the overhead of long poll client, therefore cached.
    self.handle_cache = dict()
    self._evicted_handle_keys = set()

    # NOTE(edoakes): Need this because the shutdown order isn't guaranteed
    # when the interpreter is exiting so we can't rely on __del__ (it
    # throws a nasty stacktrace).
    if not self._detached:

        def shutdown_serve_client():
            self.shutdown()

        atexit.register(shutdown_serve_client)
@property
def root_url(self):
    """Root HTTP URL of this Serve instance (as reported by the controller)."""
    return self._root_url

@property
def http_config(self):
    """The HTTPOptions the controller was started with."""
    return self._http_config

@property
def checkpoint_path(self):
    """Path where the controller persists its checkpoint state."""
    return self._checkpoint_path
def __del__(self):
    """Tear down non-detached Serve instances when the client is GC'd."""
    # Detached instances intentionally outlive the client, so do nothing.
    if not self._detached:
        logger.debug(
            "Shutting down Ray Serve because client went out of "
            "scope. To prevent this, either keep a reference to "
            "the client or use serve.start(detached=True)."
        )
        self.shutdown()
def __reduce__(self):
    """Refuse pickling: the client wraps live actor handles and local state."""
    raise RayServeException("Ray Serve client cannot be serialized.")
def shutdown(self) -> None:
    """Completely shut down the connected Serve instance.

    Shuts down all processes and deletes all state associated with the
    instance.
    """
    # No-op if Ray is already down or we've shut down before.
    if ray.is_initialized() and not self._shutdown:
        ray.get(self._controller.shutdown.remote())
        self._wait_for_deployments_shutdown()

        ray.kill(self._controller, no_restart=True)

        # Wait for the named actor entry gets removed as well.
        started = time.time()
        while True:
            try:
                controller_namespace = _get_controller_namespace(
                    self._detached,
                    self._override_controller_namespace,
                )
                # Raises ValueError once the actor name is deregistered.
                ray.get_actor(self._controller_name, namespace=controller_namespace)
                if time.time() - started > 5:
                    logger.warning(
                        "Waited 5s for Serve to shutdown gracefully but "
                        "the controller is still not cleaned up. "
                        "You can ignore this warning if you are shutting "
                        "down the Ray cluster."
                    )
                    break
            except ValueError:  # actor name is removed
                break

        self._shutdown = True
def _wait_for_deployments_shutdown(self, timeout_s: int = 60):
    """Waits for all deployments to be shut down and deleted.

    Polls the controller every _CLIENT_POLLING_INTERVAL_S seconds.

    Raises TimeoutError if this doesn't happen before timeout_s.
    """
    start = time.time()
    while time.time() - start < timeout_s:
        statuses = ray.get(self._controller.get_deployment_statuses.remote())
        if len(statuses) == 0:
            break
        else:
            logger.debug(
                f"Waiting for shutdown, {len(statuses)} deployments still alive."
            )
        time.sleep(_CLIENT_POLLING_INTERVAL_S)
    else:
        # while/else: only reached when the deadline expires without break.
        live_names = list(statuses.keys())
        raise TimeoutError(
            f"Shutdown didn't complete after {timeout_s}s. "
            f"Deployments still alive: {live_names}."
        )
def _wait_for_deployment_healthy(self, name: str, timeout_s: int = -1):
    """Waits for the named deployment to enter "HEALTHY" status.

    A negative timeout_s means wait indefinitely.

    Raises RuntimeError if the deployment enters the "UNHEALTHY" status
    instead.

    Raises TimeoutError if this doesn't happen before timeout_s.
    """
    start = time.time()
    while time.time() - start < timeout_s or timeout_s < 0:
        statuses = ray.get(self._controller.get_deployment_statuses.remote())
        try:
            status = statuses[name]
        except KeyError:
            raise RuntimeError(
                f"Waiting for deployment {name} to be HEALTHY, "
                "but deployment doesn't exist."
            ) from None

        if status.status == DeploymentStatus.HEALTHY:
            break
        elif status.status == DeploymentStatus.UNHEALTHY:
            raise RuntimeError(f"Deployment {name} is UNHEALTHY: {status.message}")
        else:
            # Guard against new unhandled statuses being added.
            assert status.status == DeploymentStatus.UPDATING

        logger.debug(
            f"Waiting for {name} to be healthy, current status: {status.status}."
        )
        time.sleep(_CLIENT_POLLING_INTERVAL_S)
    else:
        raise TimeoutError(
            f"Deployment {name} did not become HEALTHY after {timeout_s}s."
        )
def _wait_for_deployment_deleted(self, name: str, timeout_s: int = 60):
    """Waits for the named deployment to be shut down and deleted.

    Polls the controller until the name disappears from the status map.

    Raises TimeoutError if this doesn't happen before timeout_s.
    """
    start = time.time()
    while time.time() - start < timeout_s:
        statuses = ray.get(self._controller.get_deployment_statuses.remote())
        if name not in statuses:
            break
        else:
            curr_status = statuses[name].status
            logger.debug(
                f"Waiting for {name} to be deleted, current status: {curr_status}."
            )
        time.sleep(_CLIENT_POLLING_INTERVAL_S)
    else:
        raise TimeoutError(f"Deployment {name} wasn't deleted after {timeout_s}s.")
@_ensure_connected
def deploy(
    self,
    name: str,
    deployment_def: Union[Callable, Type[Callable], str],
    init_args: Tuple[Any],
    init_kwargs: Dict[Any, Any],
    ray_actor_options: Optional[Dict] = None,
    config: Optional[Union[DeploymentConfig, Dict[str, Any]]] = None,
    version: Optional[str] = None,
    prev_version: Optional[str] = None,
    route_prefix: Optional[str] = None,
    url: Optional[str] = None,
    _blocking: Optional[bool] = True,
):
    """Deploy a single deployment through the controller.

    Most arguments are forwarded to get_deploy_args(); `url` is only used
    for logging, and `_blocking` controls whether this call waits until
    the deployment reports HEALTHY.
    """
    controller_deploy_args = self.get_deploy_args(
        name=name,
        deployment_def=deployment_def,
        init_args=init_args,
        init_kwargs=init_kwargs,
        ray_actor_options=ray_actor_options,
        config=config,
        version=version,
        prev_version=prev_version,
        route_prefix=route_prefix,
    )

    # `updating` is whether the controller actually started an update.
    updating = ray.get(self._controller.deploy.remote(**controller_deploy_args))

    tag = self.log_deployment_update_status(name, version, updating)

    if _blocking:
        self._wait_for_deployment_healthy(name)
        self.log_deployment_ready(name, version, url, tag)
@_ensure_connected
def deploy_group(self, deployments: List[Dict], _blocking: bool = True):
    """Deploy a group of deployments through the controller in one call.

    Args:
        deployments: List of dicts, each describing one deployment with the
            keys consumed below ("name", "func_or_class", "init_args",
            "init_kwargs", "ray_actor_options", "config", "version",
            "prev_version", "route_prefix", "url").
        _blocking: If True, wait for each deployment to become HEALTHY and
            log readiness.
    """
    deployment_args_list = []
    for deployment in deployments:
        deployment_args_list.append(
            self.get_deploy_args(
                deployment["name"],
                deployment["func_or_class"],
                deployment["init_args"],
                deployment["init_kwargs"],
                ray_actor_options=deployment["ray_actor_options"],
                config=deployment["config"],
                version=deployment["version"],
                prev_version=deployment["prev_version"],
                route_prefix=deployment["route_prefix"],
            )
        )

    updating_list = ray.get(
        self._controller.deploy_group.remote(deployment_args_list)
    )

    tags = []
    for i, updating in enumerate(updating_list):
        deployment = deployments[i]
        name, version = deployment["name"], deployment["version"]
        tags.append(self.log_deployment_update_status(name, version, updating))

    for i, deployment in enumerate(deployments):
        name = deployment["name"]
        url = deployment["url"]
        # BUG FIX: previously `version` leaked from the loop above, so every
        # deployment was logged as ready with the LAST deployment's version.
        version = deployment["version"]

        if _blocking:
            self._wait_for_deployment_healthy(name)
            self.log_deployment_ready(name, version, url, tags[i])
@_ensure_connected
def delete_deployments(self, names: Iterable[str], blocking: bool = True) -> None:
    """Delete the named deployments.

    Args:
        names: Names of the deployments to delete.
        blocking: If True, wait until each deployment is fully deleted.
    """
    ray.get(self._controller.delete_deployments.remote(names))
    if blocking:
        for name in names:
            self._wait_for_deployment_deleted(name)
@_ensure_connected
def get_deployment_info(self, name: str) -> Tuple[DeploymentInfo, str]:
    """Fetch info for the named deployment from the controller."""
    return ray.get(self._controller.get_deployment_info.remote(name))

@_ensure_connected
def list_deployments(self) -> Dict[str, Tuple[DeploymentInfo, str]]:
    """Fetch the controller's map of deployment name -> deployment info."""
    return ray.get(self._controller.list_deployments.remote())

@_ensure_connected
def get_deployment_statuses(self) -> Dict[str, DeploymentStatusInfo]:
    """Fetch the controller's map of deployment name -> status info."""
    return ray.get(self._controller.get_deployment_statuses.remote())
@_ensure_connected
def get_handle(
    self,
    deployment_name: str,
    missing_ok: Optional[bool] = False,
    sync: bool = True,
    _internal_pickled_http_request: bool = False,
) -> Union[RayServeHandle, RayServeSyncHandle]:
    """Retrieve RayServeHandle for service deployment to invoke it from Python.

    Args:
        deployment_name (str): A registered service deployment.
        missing_ok (bool): If true, then Serve won't check the deployment
            is registered. False by default.
        sync (bool): If true, then Serve will return a ServeHandle that
            works everywhere. Otherwise, Serve will return a ServeHandle
            that's only usable in asyncio loop.

    Returns:
        RayServeHandle
    """
    cache_key = (deployment_name, missing_ok, sync)
    if cache_key in self.handle_cache:
        cached_handle = self.handle_cache[cache_key]
        # Only reuse a cached handle that is still long-polling and was
        # created on the current event loop.
        if cached_handle.is_polling and cached_handle.is_same_loop:
            return cached_handle

    all_endpoints = ray.get(self._controller.get_all_endpoints.remote())
    if not missing_ok and deployment_name not in all_endpoints:
        raise KeyError(f"Deployment '{deployment_name}' does not exist.")

    try:
        asyncio_loop_running = asyncio.get_event_loop().is_running()
    except RuntimeError as ex:
        # No event loop in this thread means we're not inside asyncio.
        if "There is no current event loop in thread" in str(ex):
            asyncio_loop_running = False
        else:
            raise ex

    # Warn on the two handle-kind/context mismatches; both still work, but
    # with degraded ergonomics or performance.
    if asyncio_loop_running and sync:
        logger.warning(
            "You are retrieving a sync handle inside an asyncio loop. "
            "Try getting client.get_handle(.., sync=False) to get better "
            "performance. Learn more at https://docs.ray.io/en/master/"
            "serve/http-servehandle.html#sync-and-async-handles"
        )

    if not asyncio_loop_running and not sync:
        logger.warning(
            "You are retrieving an async handle outside an asyncio loop. "
            "You should make sure client.get_handle is called inside a "
            "running event loop. Or call client.get_handle(.., sync=True) "
            "to create sync handle. Learn more at https://docs.ray.io/en/"
            "master/serve/http-servehandle.html#sync-and-async-handles"
        )

    if sync:
        handle = RayServeSyncHandle(
            self._controller,
            deployment_name,
            _internal_pickled_http_request=_internal_pickled_http_request,
        )
    else:
        handle = RayServeHandle(
            self._controller,
            deployment_name,
            _internal_pickled_http_request=_internal_pickled_http_request,
        )

    self.handle_cache[cache_key] = handle
    if cache_key in self._evicted_handle_keys:
        logger.warning(
            "You just got a ServeHandle that was evicted from internal "
            "cache. This means you are getting too many ServeHandles in "
            "the same process, this will bring down Serve's performance. "
            "Please post a github issue at "
            "https://github.com/ray-project/ray/issues to let the Serve "
            "team to find workaround for your use case."
        )

    if len(self.handle_cache) > MAX_CACHED_HANDLES:
        # Perform random eviction to keep the handle cache from growing
        # infinitely. We used use WeakValueDictionary but hit
        # https://github.com/ray-project/ray/issues/18980.
        evict_key = random.choice(list(self.handle_cache.keys()))
        self._evicted_handle_keys.add(evict_key)
        self.handle_cache.pop(evict_key)

    return handle
@_ensure_connected
def get_deploy_args(
    self,
    name: str,
    deployment_def: Union[Callable, Type[Callable], str],
    init_args: Tuple[Any],
    init_kwargs: Dict[Any, Any],
    ray_actor_options: Optional[Dict] = None,
    config: Optional[Union[DeploymentConfig, Dict[str, Any]]] = None,
    version: Optional[str] = None,
    prev_version: Optional[str] = None,
    route_prefix: Optional[str] = None,
) -> Dict:
    """
    Takes a deployment's configuration, and returns the arguments needed
    for the controller to deploy it.
    """
    if config is None:
        config = {}
    if ray_actor_options is None:
        ray_actor_options = {}

    curr_job_env = ray.get_runtime_context().runtime_env
    if "runtime_env" in ray_actor_options:
        # It is illegal to set field working_dir to None.
        if curr_job_env.get("working_dir") is not None:
            # Inherit the job's working_dir unless the caller set their own.
            ray_actor_options["runtime_env"].setdefault(
                "working_dir", curr_job_env.get("working_dir")
            )
    else:
        ray_actor_options["runtime_env"] = curr_job_env

    replica_config = ReplicaConfig(
        deployment_def,
        init_args=init_args,
        init_kwargs=init_kwargs,
        ray_actor_options=ray_actor_options,
    )

    # Accept either a ready DeploymentConfig or a plain dict to parse.
    if isinstance(config, dict):
        deployment_config = DeploymentConfig.parse_obj(config)
    elif isinstance(config, DeploymentConfig):
        deployment_config = config
    else:
        raise TypeError("config must be a DeploymentConfig or a dictionary.")

    if (
        deployment_config.autoscaling_config is not None
        and deployment_config.max_concurrent_queries
        < deployment_config.autoscaling_config.target_num_ongoing_requests_per_replica  # noqa: E501
    ):
        logger.warning(
            "Autoscaling will never happen, "
            "because 'max_concurrent_queries' is less than "
            "'target_num_ongoing_requests_per_replica' now."
        )

    controller_deploy_args = {
        "name": name,
        "deployment_config_proto_bytes": deployment_config.to_proto_bytes(),
        "replica_config": replica_config,
        "version": version,
        "prev_version": prev_version,
        "route_prefix": route_prefix,
        "deployer_job_id": ray.get_runtime_context().job_id,
    }

    return controller_deploy_args
@_ensure_connected
def log_deployment_update_status(
    self, name: str, version: str, updating: bool
) -> str:
    """Log whether a deploy call started an update; return the log tag."""
    tag = f"component=serve deployment={name}"

    if updating:
        msg = f"Updating deployment '{name}'"
        if version is not None:
            msg += f" to version '{version}'"
        logger.info(f"{msg}. {tag}")
    else:
        logger.info(
            f"Deployment '{name}' is already at version "
            f"'{version}', not updating. {tag}"
        )

    return tag
@_ensure_connected
def log_deployment_ready(self, name: str, version: str, url: str, tag: str) -> None:
    """Log that a deployment finished deploying, including its URL if any."""
    url_part = f" at `{url}`" if url is not None else ""
    logger.info(
        f"Deployment '{name}{':'+version if version else ''}' is ready"
        f"{url_part}. {tag}"
    )
def _check_http_and_checkpoint_options(
    client: Client,
    http_options: Union[dict, HTTPOptions],
    checkpoint_path: str,
) -> None:
    """Warn when options passed to serve.start() differ from the running
    instance's settings; the existing instance's settings always win."""
    if checkpoint_path and checkpoint_path != client.checkpoint_path:
        logger.warning(
            f"The new client checkpoint path '{checkpoint_path}' "
            f"is different from the existing one '{client.checkpoint_path}'. "
            "The new checkpoint path is ignored."
        )

    if http_options:
        if isinstance(http_options, HTTPOptions):
            new_http_options = http_options
        else:
            new_http_options = HTTPOptions.parse_obj(http_options)
        client_http_options = client.http_config

        # Collect every field whose requested value differs from the
        # currently-running configuration.
        different_fields = [
            field
            for field in new_http_options.__dict__
            if getattr(new_http_options, field) != getattr(client_http_options, field)
        ]

        if different_fields:
            logger.warning(
                "The new client HTTP config differs from the existing one "
                f"in the following fields: {different_fields}. "
                "The new HTTP config is ignored."
            )
@PublicAPI(stability="beta")
def start(
    detached: bool = False,
    http_options: Optional[Union[dict, HTTPOptions]] = None,
    dedicated_cpu: bool = False,
    _checkpoint_path: str = DEFAULT_CHECKPOINT_PATH,
    _override_controller_namespace: Optional[str] = None,
    **kwargs,
) -> Client:
    """Initialize a serve instance.

    By default, the instance will be scoped to the lifetime of the returned
    Client object (or when the script exits). If detached is set to True, the
    instance will instead persist until serve.shutdown() is called. This is
    only relevant if connecting to a long-running Ray cluster (e.g., with
    ray.init(address="auto") or ray.init("ray://<remote_addr>")).

    Args:
        detached (bool): Whether not the instance should be detached from this
          script. If set, the instance will live on the Ray cluster until it is
          explicitly stopped with serve.shutdown().
        http_options (Optional[Dict, serve.HTTPOptions]): Configuration options
          for HTTP proxy. You can pass in a dictionary or HTTPOptions object
          with fields:

            - host(str, None): Host for HTTP servers to listen on. Defaults to
              "127.0.0.1". To expose Serve publicly, you probably want to set
              this to "0.0.0.0".
            - port(int): Port for HTTP server. Defaults to 8000.
            - root_path(str): Root path to mount the serve application
              (for example, "/serve"). All deployment routes will be prefixed
              with this path. Defaults to "".
            - middlewares(list): A list of Starlette middlewares that will be
              applied to the HTTP servers in the cluster. Defaults to [].
            - location(str, serve.config.DeploymentMode): The deployment
              location of HTTP servers:

                - "HeadOnly": start one HTTP server on the head node. Serve
                  assumes the head node is the node you executed serve.start
                  on. This is the default.
                - "EveryNode": start one HTTP server per node.
                - "NoServer" or None: disable HTTP server.
            - num_cpus (int): The number of CPU cores to reserve for each
              internal Serve HTTP proxy actor.  Defaults to 0.
        dedicated_cpu (bool): Whether to reserve a CPU core for the internal
          Serve controller actor.  Defaults to False.
    """
    # These were replaced by the http_options dict/object.
    http_deprecated_args = ["http_host", "http_port", "http_middlewares"]
    for key in http_deprecated_args:
        if key in kwargs:
            raise ValueError(
                f"{key} is deprecated, please use serve.start(http_options="
                f'{{"{key}": {kwargs[key]}}}) instead.'
            )
    # Initialize ray if needed.
    ray.worker.global_worker.filter_logs_by_job = False
    if not ray.is_initialized():
        ray.init(namespace="serve")

    controller_namespace = _get_controller_namespace(
        detached, _override_controller_namespace=_override_controller_namespace
    )

    # If an instance is already running, connect to it instead of starting a
    # new one (its existing HTTP/checkpoint settings win; we only warn).
    try:
        client = internal_get_global_client(
            _override_controller_namespace=_override_controller_namespace,
            _health_check_controller=True,
        )
        logger.info(
            "Connecting to existing Serve instance in namespace "
            f"'{controller_namespace}'."
        )

        _check_http_and_checkpoint_options(client, http_options, _checkpoint_path)
        return client
    except RayServeException:
        pass

    if detached:
        controller_name = SERVE_CONTROLLER_NAME
    else:
        # Non-detached controllers get a randomized name so multiple
        # instances can coexist.
        controller_name = format_actor_name(get_random_letters(), SERVE_CONTROLLER_NAME)

    if isinstance(http_options, dict):
        http_options = HTTPOptions.parse_obj(http_options)
    if http_options is None:
        http_options = HTTPOptions()

    controller = ServeController.options(
        num_cpus=1 if dedicated_cpu else 0,
        name=controller_name,
        lifetime="detached" if detached else None,
        max_restarts=-1,
        max_task_retries=-1,
        # Pin Serve controller on the head node.
        resources={get_current_node_resource_key(): 0.01},
        namespace=controller_namespace,
        max_concurrency=CONTROLLER_MAX_CONCURRENCY,
    ).remote(
        controller_name,
        http_options,
        _checkpoint_path,
        detached=detached,
        _override_controller_namespace=_override_controller_namespace,
    )

    proxy_handles = ray.get(controller.get_http_proxies.remote())
    if len(proxy_handles) > 0:
        try:
            ray.get(
                [handle.ready.remote() for handle in proxy_handles.values()],
                timeout=HTTP_PROXY_TIMEOUT,
            )
        except ray.exceptions.GetTimeoutError:
            # BUG FIX: this message was missing its f-prefix, so users saw
            # the literal text "{HTTP_PROXY_TIMEOUT}".
            raise TimeoutError(
                f"HTTP proxies not available after {HTTP_PROXY_TIMEOUT}s."
            )

    client = Client(
        controller,
        controller_name,
        detached=detached,
        _override_controller_namespace=_override_controller_namespace,
    )
    _set_global_client(client)
    logger.info(
        f"Started{' detached ' if detached else ' '}Serve instance in "
        f"namespace '{controller_namespace}'."
    )
    return client
def _connect(_override_controller_namespace: Optional[str] = None) -> Client:
    """Connect to an existing Serve instance on this Ray cluster.

    If calling from the driver program, the Serve instance on this Ray cluster
    must first have been initialized using `serve.start(detached=True)`.

    If called from within a replica, this will connect to the same Serve
    instance that the replica is running in.

    Args:
        _override_controller_namespace (Optional[str]): The namespace to use
            when looking for the controller. If None, Serve recalculates the
            controller's namespace using _get_controller_namespace().

    Raises:
        RayServeException: if no Serve controller actor can be found.
    """
    # Initialize ray if needed.
    ray.worker.global_worker.filter_logs_by_job = False
    if not ray.is_initialized():
        ray.init(namespace="serve")

    # When running inside of a replica, _INTERNAL_REPLICA_CONTEXT is set to
    # ensure that the correct instance is connected to.
    if _INTERNAL_REPLICA_CONTEXT is None:
        controller_name = SERVE_CONTROLLER_NAME
        controller_namespace = _get_controller_namespace(
            detached=True, _override_controller_namespace=_override_controller_namespace
        )
    else:
        controller_name = _INTERNAL_REPLICA_CONTEXT._internal_controller_name
        controller_namespace = _INTERNAL_REPLICA_CONTEXT._internal_controller_namespace

    # Try to get serve controller if it exists
    try:
        controller = ray.get_actor(controller_name, namespace=controller_namespace)
    except ValueError:
        # BUG FIX: closed the unbalanced backtick around the suggested call
        # in this user-facing error message.
        raise RayServeException(
            "There is no "
            "instance running on this Ray cluster. Please "
            "call `serve.start(detached=True)` to start "
            "one."
        )

    client = Client(
        controller,
        controller_name,
        detached=True,
        _override_controller_namespace=_override_controller_namespace,
    )
    _set_global_client(client)
    return client
@PublicAPI
def shutdown() -> None:
    """Completely shut down the connected Serve instance.

    Shuts down all processes and deletes all state associated with the
    instance.
    """
    # Nothing to do if no client was ever connected in this process.
    if _global_client is None:
        return

    internal_get_global_client().shutdown()
    _set_global_client(None)
@PublicAPI
def get_replica_context() -> ReplicaContext:
    """If called from a deployment, returns the deployment and replica tag.

    A replica tag uniquely identifies a single replica for a Ray Serve
    deployment at runtime.  Replica tags are of the form
    `<deployment_name>#<random letters>`.

    Raises:
        RayServeException: if not called from within a Ray Serve deployment.

    Example:
        >>> serve.get_replica_context().deployment # deployment_name
        >>> serve.get_replica_context().replica_tag # deployment_name#krcwoa
    """
    # _INTERNAL_REPLICA_CONTEXT is only populated inside replica processes.
    if _INTERNAL_REPLICA_CONTEXT is None:
        raise RayServeException(
            "`serve.get_replica_context()` "
            "may only be called from within a "
            "Ray Serve deployment."
        )
    return _INTERNAL_REPLICA_CONTEXT
@PublicAPI(stability="beta")
def ingress(app: Union["FastAPI", "APIRouter", Callable]):
    """Mark an ASGI application ingress for Serve.

    Args:
        app (FastAPI,APIRouter,Starlette,etc): the app or router object serve
            as ingress for this deployment. It can be any ASGI compatible
            object.

    Example:
        >>> app = FastAPI()
        >>> @serve.deployment
            @serve.ingress(app)
            class App:
                pass
        >>> App.deploy()
    """

    def decorator(cls):
        if not inspect.isclass(cls):
            raise ValueError("@serve.ingress must be used with a class.")

        # The wrapper below supplies __call__, so the user class must not
        # already define one.
        if issubclass(cls, collections.abc.Callable):
            raise ValueError(
                "Class passed to @serve.ingress may not have __call__ method."
            )

        # Sometimes there are decorators on the methods. We want to fix
        # the fast api routes here.
        if isinstance(app, (FastAPI, APIRouter)):
            make_fastapi_class_based_view(app, cls)

        # Free the state of the app so subsequent modification won't affect
        # this ingress deployment. We don't use copy.copy here to avoid
        # recursion issue.
        ensure_serialization_context()
        frozen_app = cloudpickle.loads(cloudpickle.dumps(app))

        class ASGIAppWrapper(cls):
            # NOTE(review): `async def __init__` is not standard Python;
            # presumably Serve's replica startup awaits the constructor —
            # confirm against the replica implementation.
            async def __init__(self, *args, **kwargs):
                super().__init__(*args, **kwargs)

                self._serve_app = frozen_app

                # Use uvicorn's lifespan handling code to properly deal with
                # startup and shutdown event.
                self._serve_asgi_lifespan = LifespanOn(
                    Config(self._serve_app, lifespan="on")
                )
                # Replace uvicorn logger with our own.
                self._serve_asgi_lifespan.logger = logger
                # LifespanOn's logger logs in INFO level thus becomes spammy
                # Within this block we temporarily uplevel for cleaner logging
                with LoggingContext(
                    self._serve_asgi_lifespan.logger, level=logging.WARNING
                ):
                    await self._serve_asgi_lifespan.startup()

            async def __call__(self, request: Request):
                # Forward the incoming request into the wrapped ASGI app and
                # collect its response via the sender.
                sender = ASGIHTTPSender()
                await self._serve_app(
                    request.scope,
                    request.receive,
                    sender,
                )
                return sender.build_asgi_response()

            # NOTE: __del__ must be async so that we can run asgi shutdown
            # in the same event loop.
            async def __del__(self):
                # LifespanOn's logger logs in INFO level thus becomes spammy
                # Within this block we temporarily uplevel for cleaner logging
                with LoggingContext(
                    self._serve_asgi_lifespan.logger, level=logging.WARNING
                ):
                    await self._serve_asgi_lifespan.shutdown()

                # Make sure to call user's del method as well.
                super_cls = super()
                if hasattr(super_cls, "__del__"):
                    super_cls.__del__()

        ASGIAppWrapper.__name__ = cls.__name__
        return ASGIAppWrapper

    return decorator
@PublicAPI(stability="alpha")
class RayServeDAGHandle:
    """Resolved from a DeploymentNode at runtime.

    This can be used to call the DAG from a driver deployment to efficiently
    orchestrate a deployment graph.
    """

    def __init__(self, dag_node_json: str) -> None:
        # JSON-serialized DAG; deserialized lazily on first remote() call.
        self.dag_node_json = dag_node_json

        # NOTE(simon): Making this lazy to avoid deserialization in controller for now
        # This would otherwise hang because it's trying to get handles from within
        # the controller.
        self.dag_node = None

    @classmethod
    def _deserialize(cls, *args):
        """Required for this class's __reduce__ method to be picklable."""
        return cls(*args)

    def __reduce__(self):
        # Pickle only the JSON payload; the DAG node is rebuilt lazily.
        return RayServeDAGHandle._deserialize, (self.dag_node_json,)

    def remote(self, *args, **kwargs):
        """Execute the DAG with the given arguments, deserializing it first
        if this is the first call."""
        # Deferred import — presumably avoids an import cycle with the
        # pipeline package; confirm before moving to module level.
        from ray.serve.pipeline.json_serde import dagnode_from_json

        if self.dag_node is None:
            self.dag_node = json.loads(
                self.dag_node_json, object_hook=dagnode_from_json
            )
        return self.dag_node.execute(*args, **kwargs)
@PublicAPI(stability="alpha")
class DeploymentMethodNode(DAGNode):
    """Represents a method call on a bound deployment node.

    These method calls can be composed into an optimized call DAG and passed
    to a "driver" deployment that will orchestrate the calls at runtime.

    This class cannot be called directly. Instead, when it is bound to a
    deployment node, it will be resolved to a DeployedCallGraph at runtime.
    """

    # TODO (jiaodong): Later unify and refactor this with pipeline node class
    pass
@PublicAPI(stability="alpha")
class DeploymentNode(ClassNode):
    """Represents a deployment with its bound config options and arguments.

    The bound deployment can be run using serve.run().

    A bound deployment can be passed as an argument to other bound deployments
    to build a deployment graph. When the graph is deployed, the
    bound deployments passed into a constructor will be converted to
    RayServeHandles that can be used to send requests.

    Calling deployment.method.bind() will return a DeploymentMethodNode
    that can be used to compose an optimized call graph.
    """

    # TODO (jiaodong): Later unify and refactor this with pipeline node class
    def bind(self, *args, **kwargs):
        """Bind the default __call__ method and return a DeploymentMethodNode"""
        return self.__call__.bind(*args, **kwargs)
@PublicAPI(stability="alpha")
class DeploymentFunctionNode(FunctionNode):
    """Represents a serve.deployment decorated function from user.

    It's the counterpart of DeploymentNode that represents function as body
    instead of class.
    """

    pass
[docs]@PublicAPI
class Deployment:
def __init__(
    self,
    func_or_class: Union[Callable, str],
    name: str,
    config: DeploymentConfig,
    version: Optional[str] = None,
    prev_version: Optional[str] = None,
    init_args: Optional[Tuple[Any]] = None,
    # Annotation fixed: validated below as a dict, not a tuple.
    init_kwargs: Optional[Dict[Any, Any]] = None,
    route_prefix: Union[str, None, DEFAULT] = DEFAULT.VALUE,
    ray_actor_options: Optional[Dict] = None,
    _internal=False,
) -> None:
    """Construct a Deployment. CONSTRUCTOR SHOULDN'T BE USED DIRECTLY.

    Deployments should be created, retrieved, and updated using
    `@serve.deployment`, `serve.get_deployment`, and `Deployment.options`,
    respectively.
    """
    if not _internal:
        raise RuntimeError(
            "The Deployment constructor should not be called "
            "directly. Use `@serve.deployment` instead."
        )
    # Validate every argument up front so misconfiguration fails fast.
    if not callable(func_or_class) and not isinstance(func_or_class, str):
        raise TypeError("@serve.deployment must be called on a class or function.")
    if not isinstance(name, str):
        raise TypeError("name must be a string.")
    if not (version is None or isinstance(version, str)):
        raise TypeError("version must be a string.")
    if not (prev_version is None or isinstance(prev_version, str)):
        raise TypeError("prev_version must be a string.")
    if not (init_args is None or isinstance(init_args, (tuple, list))):
        raise TypeError("init_args must be a tuple.")
    if not (init_kwargs is None or isinstance(init_kwargs, dict)):
        raise TypeError("init_kwargs must be a dict.")
    if route_prefix is not DEFAULT.VALUE and route_prefix is not None:
        # An explicit prefix must look like "/something" without a trailing
        # slash (except the root "/") and without path wildcards.
        if not isinstance(route_prefix, str):
            raise TypeError("route_prefix must be a string.")
        if not route_prefix.startswith("/"):
            raise ValueError("route_prefix must start with '/'.")
        if route_prefix != "/" and route_prefix.endswith("/"):
            raise ValueError(
                "route_prefix must not end with '/' unless it's the root."
            )
        if "{" in route_prefix or "}" in route_prefix:
            raise ValueError("route_prefix may not contain wildcards.")
    if not (ray_actor_options is None or isinstance(ray_actor_options, dict)):
        raise TypeError("ray_actor_options must be a dict.")

    if init_args is None:
        init_args = ()
    if init_kwargs is None:
        init_kwargs = {}

    # TODO(architkulkarni): Enforce that autoscaling_config and
    # user-provided num_replicas should be mutually exclusive.
    if version is None and config.autoscaling_config is not None:
        # TODO(architkulkarni): Remove this restriction.
        raise ValueError(
            "Currently autoscaling is only supported for "
            "versioned deployments. Try @serve.deployment(version=...)."
        )

    self._func_or_class = func_or_class
    self._name = name
    self._version = version
    self._prev_version = prev_version
    self._config = config
    self._init_args = init_args
    self._init_kwargs = init_kwargs
    self._route_prefix = route_prefix
    self._ray_actor_options = ray_actor_options
@property
def name(self) -> str:
    """Unique name of this deployment (set at construction time)."""
    return self._name
@property
def version(self) -> Optional[str]:
    """Version of this deployment.

    If None, will be redeployed every time `.deploy()` is called.
    """
    return self._version
@property
def prev_version(self) -> Optional[str]:
    """Existing version of deployment to target.

    If prev_version does not match the existing deployment's version,
    the deployment will fail to be deployed.
    """
    return self._prev_version
@property
def func_or_class(self) -> Union[Callable, str]:
    """Underlying class/function this deployment wraps (or its import path as a string)."""
    return self._func_or_class
@property
def num_replicas(self) -> int:
    """Current target number of replicas (from the deployment config)."""
    return self._config.num_replicas
@property
def user_config(self) -> Any:
    """Current dynamic user-provided config options (from the deployment config)."""
    return self._config.user_config
@property
def max_concurrent_queries(self) -> int:
    """Current max outstanding queries from each handle (from the deployment config)."""
    return self._config.max_concurrent_queries
@property
def route_prefix(self) -> Optional[str]:
    """HTTP route prefix that this deployment is exposed under.

    Resolves the sentinel default to "/{name}". Returns None when the
    user explicitly disabled HTTP exposure.
    """
    if self._route_prefix is DEFAULT.VALUE:
        # User never set a prefix: fall back to the deployment name.
        return f"/{self._name}"
    return self._route_prefix
@property
def ray_actor_options(self) -> Optional[Dict]:
    """Actor options such as resources required for each replica."""
    return self._ray_actor_options
@property
def init_args(self) -> Tuple[Any, ...]:
    # NOTE: annotation corrected from Tuple[Any] (a 1-tuple) to a
    # variable-length tuple, matching the values stored by __init__.
    """Positional args passed to the underlying class's constructor."""
    return self._init_args
@property
def init_kwargs(self) -> Dict[Any, Any]:
    # NOTE: annotation corrected — this returns the kwargs dict stored by
    # __init__, not a tuple as previously annotated.
    """Keyword args passed to the underlying class's constructor."""
    return self._init_kwargs
@property
def url(self) -> Optional[str]:
    """Full HTTP url for this deployment, or None if not exposed over HTTP."""
    if self._route_prefix is None:
        # this deployment is not exposed over HTTP
        return None
    # Root URL comes from the running Serve client (controller).
    return internal_get_global_client().root_url + self.route_prefix
def __call__(self):
    """Calling a Deployment directly is disallowed.

    Raises:
        RuntimeError: always. Replicas must be created via `.deploy()`.
    """
    # Fix: backticks in the original message wrapped the trailing word
    # ("`deployment.deploy() instead.`") instead of just the call.
    raise RuntimeError(
        "Deployments cannot be constructed directly. "
        "Use `deployment.deploy()` instead."
    )
@PublicAPI(stability="alpha")
def bind(self, *args, **kwargs) -> Union[DeploymentNode, DeploymentFunctionNode]:
    """Bind the provided arguments and return a DeploymentNode.

    The returned bound deployment can be deployed or bound to other
    deployments to create a deployment graph.
    """
    # Build a throwaway copy with placeholder args/class: the schema derived
    # from it only carries the deployment's options into the DAG node.
    placeholder = copy(self)
    placeholder._init_args = []
    placeholder._init_kwargs = {}
    placeholder._func_or_class = "dummpy.module"
    schema = deployment_to_schema(placeholder)

    resolve_context = {
        "deployment_schema": schema,
        "is_from_serve_deployment": True,
    }

    if inspect.isfunction(self._func_or_class):
        # Function deployments become function nodes in the graph.
        return DeploymentFunctionNode(
            self._func_or_class,
            args,  # Used to bind and resolve DAG only, can take user input
            kwargs,  # Used to bind and resolve DAG only, can take user input
            self._ray_actor_options or dict(),
            other_args_to_resolve=resolve_context,
        )

    return DeploymentNode(
        self._func_or_class,
        args,
        kwargs,
        cls_options=self._ray_actor_options or dict(),
        other_args_to_resolve=resolve_context,
    )
@PublicAPI
def deploy(self, *init_args, _blocking=True, **init_kwargs):
    """Deploy or update this deployment.

    Args:
        init_args (optional): args to pass to the class __init__
            method. Not valid if this deployment wraps a function.
        init_kwargs (optional): kwargs to pass to the class __init__
            method. Not valid if this deployment wraps a function.
    """
    # Note: removed stray "[docs]" sphinx-scrape residue that made the
    # decorator line invalid Python.
    # Fall back to the args/kwargs bound at decoration time when the caller
    # supplies none here.
    if len(init_args) == 0 and self._init_args is not None:
        init_args = self._init_args
    if len(init_kwargs) == 0 and self._init_kwargs is not None:
        init_kwargs = self._init_kwargs

    return internal_get_global_client().deploy(
        self._name,
        self._func_or_class,
        init_args,
        init_kwargs,
        ray_actor_options=self._ray_actor_options,
        config=self._config,
        version=self._version,
        prev_version=self._prev_version,
        route_prefix=self.route_prefix,
        url=self.url,
        _blocking=_blocking,
    )
@PublicAPI
def delete(self):
    """Delete this deployment, removing it from the running Serve application."""
    # Note: removed stray "[docs]" sphinx-scrape residue that made the
    # decorator line invalid Python.
    return internal_get_global_client().delete_deployments([self._name])
@PublicAPI
def get_handle(
    self, sync: Optional[bool] = True
) -> Union[RayServeHandle, RayServeSyncHandle]:
    """Get a ServeHandle to this deployment to invoke it from Python.

    Args:
        sync (bool): If true, then Serve will return a ServeHandle that
            works everywhere. Otherwise, Serve will return an
            asyncio-optimized ServeHandle that's only usable in an asyncio
            loop.

    Returns:
        ServeHandle
    """
    # Note: removed stray "[docs]" sphinx-scrape residue that made the
    # decorator line invalid Python.
    # missing_ok=True: a handle may be fetched before the deployment is live.
    return internal_get_global_client().get_handle(
        self._name, missing_ok=True, sync=sync
    )
@PublicAPI
def options(
    self,
    func_or_class: Optional[Callable] = None,
    name: Optional[str] = None,
    version: Optional[str] = None,
    prev_version: Optional[str] = None,
    init_args: Optional[Tuple[Any]] = None,
    init_kwargs: Optional[Dict[Any, Any]] = None,
    route_prefix: Union[str, None, DEFAULT] = DEFAULT.VALUE,
    num_replicas: Optional[int] = None,
    ray_actor_options: Optional[Dict] = None,
    user_config: Optional[Any] = None,
    max_concurrent_queries: Optional[int] = None,
    _autoscaling_config: Optional[Union[Dict, AutoscalingConfig]] = None,
    _graceful_shutdown_wait_loop_s: Optional[float] = None,
    _graceful_shutdown_timeout_s: Optional[float] = None,
    _health_check_period_s: Optional[float] = None,
    _health_check_timeout_s: Optional[float] = None,
) -> "Deployment":
    """Return a copy of this deployment with updated options.

    Only those options passed in will be updated; all others will remain
    unchanged from the existing deployment.

    Returns:
        Deployment: a new Deployment object (self is not mutated).
    """
    # Note: removed stray "[docs]" sphinx-scrape residue that made the
    # decorator line invalid Python.
    new_config = self._config.copy()
    if num_replicas is not None:
        new_config.num_replicas = num_replicas
    if user_config is not None:
        new_config.user_config = user_config
    if max_concurrent_queries is not None:
        new_config.max_concurrent_queries = max_concurrent_queries

    # For each remaining option, keep the current value unless overridden.
    if func_or_class is None:
        func_or_class = self._func_or_class
    if name is None:
        name = self._name
    if version is None:
        version = self._version
    if prev_version is None:
        prev_version = self._prev_version
    if init_args is None:
        init_args = self._init_args
    if init_kwargs is None:
        init_kwargs = self._init_kwargs
    if route_prefix is DEFAULT.VALUE:
        # Default is to keep the previous value (DEFAULT.VALUE is a sentinel,
        # since None is a meaningful value meaning "no HTTP route").
        route_prefix = self._route_prefix
    if ray_actor_options is None:
        ray_actor_options = self._ray_actor_options
    if _autoscaling_config is not None:
        new_config.autoscaling_config = _autoscaling_config
    if _graceful_shutdown_wait_loop_s is not None:
        new_config.graceful_shutdown_wait_loop_s = _graceful_shutdown_wait_loop_s
    if _graceful_shutdown_timeout_s is not None:
        new_config.graceful_shutdown_timeout_s = _graceful_shutdown_timeout_s
    if _health_check_period_s is not None:
        new_config.health_check_period_s = _health_check_period_s
    if _health_check_timeout_s is not None:
        new_config.health_check_timeout_s = _health_check_timeout_s

    # Re-run the Deployment constructor so its validation applies.
    return Deployment(
        func_or_class,
        name,
        new_config,
        version=version,
        prev_version=prev_version,
        init_args=init_args,
        init_kwargs=init_kwargs,
        route_prefix=route_prefix,
        ray_actor_options=ray_actor_options,
        _internal=True,
    )
@PublicAPI(stability="alpha")
def set_options(
    self,
    func_or_class: Optional[Callable] = None,
    name: Optional[str] = None,
    version: Optional[str] = None,
    prev_version: Optional[str] = None,
    init_args: Optional[Tuple[Any]] = None,
    init_kwargs: Optional[Dict[Any, Any]] = None,
    route_prefix: Union[str, None, DEFAULT] = DEFAULT.VALUE,
    num_replicas: Optional[int] = None,
    ray_actor_options: Optional[Dict] = None,
    user_config: Optional[Any] = None,
    max_concurrent_queries: Optional[int] = None,
    _autoscaling_config: Optional[Union[Dict, AutoscalingConfig]] = None,
    _graceful_shutdown_wait_loop_s: Optional[float] = None,
    _graceful_shutdown_timeout_s: Optional[float] = None,
    _health_check_period_s: Optional[float] = None,
    _health_check_timeout_s: Optional[float] = None,
) -> None:
    """Overwrite this deployment's options. Mutates the deployment.

    Only those options passed in will be updated; all others will remain
    unchanged.
    """
    # Delegate validation to options(), which builds a fresh Deployment.
    validated = self.options(
        func_or_class=func_or_class,
        name=name,
        version=version,
        prev_version=prev_version,
        init_args=init_args,
        init_kwargs=init_kwargs,
        route_prefix=route_prefix,
        num_replicas=num_replicas,
        ray_actor_options=ray_actor_options,
        user_config=user_config,
        max_concurrent_queries=max_concurrent_queries,
        _autoscaling_config=_autoscaling_config,
        _graceful_shutdown_wait_loop_s=_graceful_shutdown_wait_loop_s,
        _graceful_shutdown_timeout_s=_graceful_shutdown_timeout_s,
        _health_check_period_s=_health_check_period_s,
        _health_check_timeout_s=_health_check_timeout_s,
    )

    # Everything validated — copy the private state over wholesale.
    for attr in (
        "_func_or_class",
        "_name",
        "_version",
        "_prev_version",
        "_init_args",
        "_init_kwargs",
        "_route_prefix",
        "_ray_actor_options",
        "_config",
    ):
        setattr(self, attr, getattr(validated, attr))
def __eq__(self, other):
    """Structural equality: name, version, config, init args/kwargs,
    resolved route prefix, and ray actor options must all match.

    Note: the wrapped function/class itself is deliberately not compared.
    """
    return all(
        [
            self._name == other._name,
            self._version == other._version,
            self._config == other._config,
            self._init_args == other._init_args,
            self._init_kwargs == other._init_kwargs,
            # compare route prefix with default value resolved
            self.route_prefix == other.route_prefix,
            # Bug fix: this previously compared self._ray_actor_options
            # against itself, so the term was always True.
            self._ray_actor_options == other._ray_actor_options,
        ]
    )
def __str__(self):
    """Short human-readable summary: name, version, resolved route prefix."""
    body = (
        f"name={self._name},"
        f"version={self._version},"
        f"route_prefix={self.route_prefix}"
    )
    return f"Deployment({body})"
def __repr__(self):
    # repr intentionally mirrors str: the summary is already unambiguous.
    return str(self)
# Overload for the bare-decorator form: `@serve.deployment` applied directly
# to a class or function returns a Deployment.
@overload
def deployment(func_or_class: Callable) -> Deployment:
    pass
# Overload for the parametrized form: `@serve.deployment(...)` returns a
# decorator that, applied to a class or function, yields a Deployment.
@overload
def deployment(
    name: Optional[str] = None,
    version: Optional[str] = None,
    prev_version: Optional[str] = None,
    num_replicas: Optional[int] = None,
    init_args: Optional[Tuple[Any]] = None,
    init_kwargs: Optional[Dict[Any, Any]] = None,
    route_prefix: Union[str, None, DEFAULT] = DEFAULT.VALUE,
    ray_actor_options: Optional[Dict] = None,
    user_config: Optional[Any] = None,
    max_concurrent_queries: Optional[int] = None,
    _autoscaling_config: Optional[Union[Dict, AutoscalingConfig]] = None,
    _graceful_shutdown_wait_loop_s: Optional[float] = None,
    _graceful_shutdown_timeout_s: Optional[float] = None,
    _health_check_period_s: Optional[float] = None,
    _health_check_timeout_s: Optional[float] = None,
) -> Callable[[Callable], Deployment]:
    pass
@PublicAPI
def deployment(
    _func_or_class: Optional[Callable] = None,
    name: Optional[str] = None,
    version: Optional[str] = None,
    prev_version: Optional[str] = None,
    num_replicas: Optional[int] = None,
    init_args: Optional[Tuple[Any]] = None,
    init_kwargs: Optional[Dict[Any, Any]] = None,
    route_prefix: Union[str, None, DEFAULT] = DEFAULT.VALUE,
    ray_actor_options: Optional[Dict] = None,
    user_config: Optional[Any] = None,
    max_concurrent_queries: Optional[int] = None,
    _autoscaling_config: Optional[Union[Dict, AutoscalingConfig]] = None,
    _graceful_shutdown_wait_loop_s: Optional[float] = None,
    _graceful_shutdown_timeout_s: Optional[float] = None,
    _health_check_period_s: Optional[float] = None,
    _health_check_timeout_s: Optional[float] = None,
) -> Callable[[Callable], Deployment]:
    """Define a Serve deployment.

    Args:
        name (Optional[str]): Globally-unique name identifying this deployment.
            If not provided, the name of the class or function will be used.
        version (Optional[str]): Version of the deployment. This is used to
            indicate a code change for the deployment; when it is re-deployed
            with a version change, a rolling update of the replicas will be
            performed. If not provided, every deployment will be treated as a
            new version.
        prev_version (Optional[str]): Version of the existing deployment which
            is used as a precondition for the next deployment. If prev_version
            does not match with the existing deployment's version, the
            deployment will fail. If not provided, deployment procedure will
            not check the existing deployment's version.
        num_replicas (Optional[int]): The number of processes to start up that
            will handle requests to this deployment. Defaults to 1.
        init_args (Optional[Tuple]): Positional args to be passed to the class
            constructor when starting up deployment replicas. These can also be
            passed when you call `.deploy()` on the returned Deployment.
        init_kwargs (Optional[Dict]): Keyword args to be passed to the class
            constructor when starting up deployment replicas. These can also be
            passed when you call `.deploy()` on the returned Deployment.
        route_prefix (Optional[str]): Requests to paths under this HTTP path
            prefix will be routed to this deployment. Defaults to '/{name}'.
            When set to 'None', no HTTP endpoint will be created.
            Routing is done based on longest-prefix match, so if you have
            deployment A with a prefix of '/a' and deployment B with a prefix
            of '/a/b', requests to '/a', '/a/', and '/a/c' go to A and requests
            to '/a/b', '/a/b/', and '/a/b/c' go to B. Routes must not end with
            a '/' unless they're the root (just '/'), which acts as a
            catch-all.
        ray_actor_options (dict): Options to be passed to the Ray actor
            constructor such as resource requirements.
        user_config (Optional[Any]): [experimental] Config to pass to the
            reconfigure method of the deployment. This can be updated
            dynamically without changing the version of the deployment and
            restarting its replicas. The user_config needs to be hashable to
            keep track of updates, so it must only contain hashable types, or
            hashable types nested in lists and dictionaries.
        max_concurrent_queries (Optional[int]): The maximum number of queries
            that will be sent to a replica of this deployment without receiving
            a response. Defaults to 100.

    Example:
        >>> @serve.deployment(name="deployment1", version="v1")
        ... class MyDeployment:
        ...     pass

        >>> MyDeployment.deploy(*init_args)
        >>> MyDeployment.options(num_replicas=2, init_args=init_args).deploy()

    Returns:
        Deployment
    """
    # Note: removed stray "[docs]" sphinx-scrape residue that made the
    # decorator line invalid Python, and fixed the garbled docstring example.
    if num_replicas is not None and _autoscaling_config is not None:
        raise ValueError(
            "Manually setting num_replicas is not allowed when "
            "_autoscaling_config is provided."
        )

    # Only overwrite config fields the caller actually supplied so the
    # DeploymentConfig defaults stay in effect otherwise.
    config = DeploymentConfig()
    if num_replicas is not None:
        config.num_replicas = num_replicas
    if user_config is not None:
        config.user_config = user_config
    if max_concurrent_queries is not None:
        config.max_concurrent_queries = max_concurrent_queries
    if _autoscaling_config is not None:
        config.autoscaling_config = _autoscaling_config
    if _graceful_shutdown_wait_loop_s is not None:
        config.graceful_shutdown_wait_loop_s = _graceful_shutdown_wait_loop_s
    if _graceful_shutdown_timeout_s is not None:
        config.graceful_shutdown_timeout_s = _graceful_shutdown_timeout_s
    if _health_check_period_s is not None:
        config.health_check_period_s = _health_check_period_s
    if _health_check_timeout_s is not None:
        config.health_check_timeout_s = _health_check_timeout_s

    def decorator(_func_or_class):
        return Deployment(
            _func_or_class,
            name if name is not None else _func_or_class.__name__,
            config,
            version=version,
            prev_version=prev_version,
            init_args=init_args,
            init_kwargs=init_kwargs,
            route_prefix=route_prefix,
            ray_actor_options=ray_actor_options,
            _internal=True,
        )

    # This handles both parametrized and non-parametrized usage of the
    # decorator. See the @serve.batch code for more details.
    return decorator(_func_or_class) if callable(_func_or_class) else decorator
@PublicAPI
def get_deployment(name: str) -> Deployment:
    """Dynamically fetch a handle to a Deployment object.

    This can be used to update and redeploy a deployment without access to
    the original definition.

    Example:
        >>> MyDeployment = serve.get_deployment("name")
        >>> MyDeployment.options(num_replicas=10).deploy()

    Args:
        name(str): name of the deployment. This must have already been
            deployed.

    Returns:
        Deployment
    """
    # Note: removed stray "[docs]" sphinx-scrape residue that made the
    # decorator line invalid Python.
    try:
        (
            deployment_info,
            route_prefix,
        ) = internal_get_global_client().get_deployment_info(name)
    except KeyError:
        raise KeyError(
            f"Deployment {name} was not found. Did you call Deployment.deploy()?"
        )
    return Deployment(
        # The deployment definition is stored serialized; rehydrate it here.
        cloudpickle.loads(deployment_info.replica_config.serialized_deployment_def),
        name,
        deployment_info.deployment_config,
        version=deployment_info.version,
        init_args=deployment_info.replica_config.init_args,
        init_kwargs=deployment_info.replica_config.init_kwargs,
        route_prefix=route_prefix,
        ray_actor_options=deployment_info.replica_config.ray_actor_options,
        _internal=True,
    )
@PublicAPI
def list_deployments() -> Dict[str, Deployment]:
    """Returns a dictionary of all active deployments.

    Dictionary maps deployment name to Deployment objects.
    """
    # Note: removed stray "[docs]" sphinx-scrape residue that made the
    # decorator line invalid Python.
    infos = internal_get_global_client().list_deployments()

    deployments = {}
    for name, (deployment_info, route_prefix) in infos.items():
        deployments[name] = Deployment(
            # Rehydrate the serialized deployment definition.
            cloudpickle.loads(deployment_info.replica_config.serialized_deployment_def),
            name,
            deployment_info.deployment_config,
            version=deployment_info.version,
            init_args=deployment_info.replica_config.init_args,
            init_kwargs=deployment_info.replica_config.init_kwargs,
            route_prefix=route_prefix,
            ray_actor_options=deployment_info.replica_config.ray_actor_options,
            _internal=True,
        )
    return deployments
def get_deployment_statuses() -> Dict[str, DeploymentStatusInfo]:
    """Return the status of every running deployment.

    A deployment's status is one of {UPDATING, UNHEALTHY, and HEALTHY}.

    Example:
        >>> statuses = get_deployment_statuses()
        >>> status_info = statuses["deployment_name"]
        >>> status = status_info.status
        >>> message = status_info.message

    Returns:
        Dict[str, DeploymentStatus]: maps each running deployment's name to a
            DeploymentStatus object carrying its status and an explanatory
            message.
    """
    client = internal_get_global_client()
    return client.get_deployment_statuses()
class ImmutableDeploymentDict(dict):
    """Mapping of deployment name -> Deployment that rejects item assignment."""

    def __init__(self, deployments: Dict[str, Deployment]):
        # Populate through dict's own constructor; this bypasses our
        # __setitem__ override, so construction still works.
        super().__init__(deployments)

    def __setitem__(self, *args):
        """Not allowed. Modify deployment options using set_options instead."""
        raise RuntimeError(
            "Setting deployments in a built app is not allowed. Modify the "
            'options using app.deployments["deployment"].set_options instead.'
        )
class Application:
    """A static, pre-built Serve application.

    An application consists of a number of Serve deployments that can send
    requests to each other. One of the deployments acts as the "ingress,"
    meaning that it receives external traffic and is the entrypoint to the
    application.

    The ingress deployment can be accessed via app.ingress and a dictionary of
    all deployments can be accessed via app.deployments.

    The config options of each deployment can be modified using set_options:
    app.deployments["name"].set_options(...).

    This application object can be written to a config file and later deployed
    to production using the Serve CLI or REST API.
    """

    def __init__(self, deployments: List[Deployment]):
        # Validate and index the deployments by name; duplicates are an error.
        deployment_dict = {}
        for d in deployments:
            if not isinstance(d, Deployment):
                raise TypeError(f"Got {type(d)}. Expected deployment.")
            elif d.name in deployment_dict:
                raise ValueError(f"App got multiple deployments named '{d.name}'.")
            deployment_dict[d.name] = d
        self._deployments = ImmutableDeploymentDict(deployment_dict)

    @property
    def deployments(self) -> ImmutableDeploymentDict:
        """Read-only mapping of deployment name -> Deployment."""
        return self._deployments

    @property
    def ingress(self) -> Optional[Deployment]:
        """Gets the app's ingress, if one exists.

        The ingress is the single deployment with a non-None route prefix. If
        more or less than one deployment has a route prefix, no single ingress
        exists, so returns None.
        """
        ingress = None
        for deployment in self._deployments.values():
            if deployment.route_prefix is not None:
                if ingress is None:
                    ingress = deployment
                else:
                    # A second HTTP-exposed deployment: no unique ingress.
                    return None
        return ingress

    def to_dict(self) -> Dict:
        """Returns this Application's deployments as a dictionary.

        This dictionary adheres to the Serve REST API schema. It can be
        deployed via the Serve REST API.

        Returns:
            Dict: The Application's deployments formatted in a dictionary.
        """
        return ServeApplicationSchema(
            deployments=[deployment_to_schema(d) for d in self._deployments.values()]
        ).dict()

    @classmethod
    def from_dict(cls, d: Dict) -> "Application":
        """Converts a dictionary of deployment data to an application.

        Takes in a dictionary matching the Serve REST API schema and converts
        it to an application containing those deployments.

        Args:
            d (Dict): A dictionary containing the deployments' data that
                matches the Serve REST API schema.

        Returns:
            Application: a new application object containing the deployments.
        """
        schema = ServeApplicationSchema.parse_obj(d)
        return cls([schema_to_deployment(s) for s in schema.deployments])

    def to_yaml(self, f: Optional[TextIO] = None) -> Optional[str]:
        """Returns this application's deployments as a YAML string.

        Optionally writes the YAML string to a file as well. To write to a
        file, use this pattern:

            with open("file_name.txt", "w") as f:
                app.to_yaml(f=f)

        This file is formatted as a Serve YAML config file. It can be deployed
        via the Serve CLI.

        Args:
            f (Optional[TextIO]): A pointer to the file where the YAML should
                be written.

        Returns:
            Optional[String]: The deployments' YAML string. The output is from
                yaml.safe_dump(). Returned only if no file pointer is passed
                in (when a stream is given, safe_dump writes to it instead).
        """
        return yaml.safe_dump(
            self.to_dict(), stream=f, default_flow_style=False, sort_keys=False
        )

    @classmethod
    def from_yaml(cls, str_or_file: Union[str, TextIO]) -> "Application":
        """Converts YAML data to deployments for an application.

        Takes in a string or a file pointer to a file containing deployment
        definitions in YAML. These definitions are converted to a new
        application object containing the deployments.

        To read from a file, use the following pattern (note: the original
        docstring opened the file in "w" mode, which would truncate it):

            with open("file_name.txt", "r") as f:
                app = Application.from_yaml(f)

        Args:
            str_or_file (Union[String, TextIO]): Either a string containing
                YAML deployment definitions or a pointer to a file containing
                YAML deployment definitions. The YAML format must adhere to
                the ServeApplicationSchema JSON Schema defined in
                ray.serve.schema. This function works with Serve YAML config
                files.

        Returns:
            Application: a new Application object containing the deployments.
        """
        return cls.from_dict(yaml.safe_load(str_or_file))
@PublicAPI(stability="alpha")
def run(
    target: Union[DeploymentNode, DeploymentFunctionNode, Application],
    _blocking: bool = True,
    *,
    host: str = DEFAULT_HTTP_HOST,
    port: int = DEFAULT_HTTP_PORT,
) -> Optional[RayServeHandle]:
    """Run a Serve application and return a ServeHandle to the ingress.

    Either a DeploymentNode, DeploymentFunctionNode, or a pre-built application
    can be passed in. If a node is passed in, all of the deployments it depends
    on will be deployed. If there is an ingress, its handle will be returned.

    Args:
        target (Union[DeploymentNode, DeploymentFunctionNode, Application]):
            A user-built Serve Application or a DeploymentNode that acts as the
            root node of DAG. By default DeploymentNode is the Driver
            deployment unless user provides a customized one.
        host (str): The host passed into serve.start().
        port (int): The port passed into serve.start().

    Returns:
        Optional[RayServeHandle]: A regular ray serve handle that can be
            called by user to execute the serve DAG, or None when the
            application has no single ingress deployment.
            (Annotation fixed: the original claimed a non-optional handle but
            implicitly returned None when no ingress exists.)
    """
    # TODO (jiaodong): Resolve circular reference in pipeline codebase and serve
    from ray.serve.pipeline.api import build as pipeline_build
    from ray.serve.pipeline.api import get_and_validate_ingress_deployment

    client = start(detached=True, http_options={"host": host, "port": port})

    if isinstance(target, Application):
        deployments = list(target.deployments.values())
        ingress = target.ingress
    # Each DAG should always provide a valid Driver DeploymentNode
    elif isinstance(target, DeploymentNode):
        deployments = pipeline_build(target)
        ingress = get_and_validate_ingress_deployment(deployments)
    # Special case where user is doing single function serve.run(func.bind())
    elif isinstance(target, DeploymentFunctionNode):
        deployments = pipeline_build(target)
        ingress = get_and_validate_ingress_deployment(deployments)
        if len(deployments) != 1:
            raise ValueError(
                "We only support single function node in serve.run, ex: "
                "serve.run(func.bind()). For more than one nodes in your DAG, "
                "Please provide a driver class and bind it as entrypoint to "
                "your Serve DAG."
            )
    elif isinstance(target, DAGNode):
        raise ValueError(
            "Invalid DAGNode type as entry to serve.run(), "
            f"type: {type(target)}, accepted: DeploymentNode, "
            "DeploymentFunctionNode please provide a driver class and bind it "
            "as entrypoint to your Serve DAG."
        )
    else:
        raise TypeError(
            "Expected a DeploymentNode, DeploymentFunctionNode, or "
            "Application as target. Got unexpected type "
            f'"{type(target)}" instead.'
        )

    # Flatten each deployment's options into the controller's deploy_group
    # parameter format.
    parameter_group = []
    for deployment in deployments:
        deployment_parameters = {
            "name": deployment._name,
            "func_or_class": deployment._func_or_class,
            "init_args": deployment.init_args,
            "init_kwargs": deployment.init_kwargs,
            "ray_actor_options": deployment._ray_actor_options,
            "config": deployment._config,
            "version": deployment._version,
            "prev_version": deployment._prev_version,
            "route_prefix": deployment.route_prefix,
            "url": deployment.url,
        }
        parameter_group.append(deployment_parameters)
    client.deploy_group(parameter_group, _blocking=_blocking)

    if ingress is not None:
        return ingress.get_handle()
    # No single ingress: nothing to return a handle for.
    return None
def build(target: Union[DeploymentNode, DeploymentFunctionNode]) -> Application:
    """Builds a Serve application into a static application.

    Takes in a DeploymentNode and converts it to a Serve application
    consisting of one or more deployments. This is intended to be used for
    production scenarios and deployed via the Serve REST API or CLI, so there
    are some restrictions placed on the deployments:

    1) All of the deployments must be importable. That is, they cannot be
       defined in __main__ or inline defined. The deployments will be
       imported in production using the same import path they were here.
    2) All arguments bound to the deployment must be JSON-serializable.

    The returned Application object can be exported to a dictionary or YAML
    config.

    Raises:
        NotImplementedError: always — not yet implemented.
    """
    # TODO(edoakes): this should accept host and port, but we don't
    # currently support them in the REST API.
    raise NotImplementedError()
def deployment_to_schema(d: Deployment) -> DeploymentSchema:
    """Convert a live deployment object into its structured schema.

    The deployment's class or function is converted to a corresponding
    import path where possible. init_args and init_kwargs must be
    JSON-serializable or this call will fail.
    """
    ray_actor_options_schema = (
        RayActorOptionsSchema.parse_obj(d.ray_actor_options)
        if d.ray_actor_options is not None
        else None
    )

    return DeploymentSchema(
        name=d.name,
        import_path=get_deployment_import_path(d),
        init_args=d.init_args,
        init_kwargs=d.init_kwargs,
        num_replicas=d.num_replicas,
        route_prefix=d.route_prefix,
        max_concurrent_queries=d.max_concurrent_queries,
        user_config=d.user_config,
        autoscaling_config=d._config.autoscaling_config,
        graceful_shutdown_wait_loop_s=d._config.graceful_shutdown_wait_loop_s,
        graceful_shutdown_timeout_s=d._config.graceful_shutdown_timeout_s,
        health_check_period_s=d._config.health_check_period_s,
        health_check_timeout_s=d._config.health_check_timeout_s,
        ray_actor_options=ray_actor_options_schema,
    )
def schema_to_deployment(s: DeploymentSchema) -> Deployment:
    """Instantiate a Deployment from its structured schema (inverse of
    deployment_to_schema); the callable is resolved from s.import_path."""
    # Imported here to avoid a circular import with the pipeline package.
    from ray.serve.pipeline.json_serde import convert_from_json_safe_obj

    ray_actor_options = (
        None
        if s.ray_actor_options is None
        else s.ray_actor_options.dict(exclude_unset=True)
    )

    return deployment(
        name=s.name,
        init_args=convert_from_json_safe_obj(s.init_args),
        init_kwargs=convert_from_json_safe_obj(s.init_kwargs),
        num_replicas=s.num_replicas,
        route_prefix=s.route_prefix,
        max_concurrent_queries=s.max_concurrent_queries,
        user_config=convert_from_json_safe_obj(s.user_config),
        _autoscaling_config=s.autoscaling_config,
        _graceful_shutdown_wait_loop_s=s.graceful_shutdown_wait_loop_s,
        _graceful_shutdown_timeout_s=s.graceful_shutdown_timeout_s,
        _health_check_period_s=s.health_check_period_s,
        _health_check_timeout_s=s.health_check_timeout_s,
        ray_actor_options=ray_actor_options,
    )(s.import_path)
def serve_application_to_schema(
    deployments: List[Deployment],
) -> ServeApplicationSchema:
    """Convert a list of live deployments into an application schema."""
    return ServeApplicationSchema(
        deployments=[deployment_to_schema(d) for d in deployments]
    )
def schema_to_serve_application(schema: ServeApplicationSchema) -> List[Deployment]:
    # Inverse of serve_application_to_schema: materialize each deployment.
    return [schema_to_deployment(s) for s in schema.deployments]
def status_info_to_schema(
    deployment_name: str, status_info: Union[DeploymentStatusInfo, Dict]
) -> DeploymentStatusSchema:
    """Build a DeploymentStatusSchema from either a DeploymentStatusInfo
    object or its dict form.

    Raises:
        TypeError: if status_info is neither of the accepted types.
    """
    # Normalize both accepted input shapes to (status, message), then build
    # the schema once.
    if isinstance(status_info, DeploymentStatusInfo):
        status, message = status_info.status, status_info.message
    elif isinstance(status_info, dict):
        status, message = status_info["status"], status_info["message"]
    else:
        raise TypeError(
            f"Got {type(status_info)} as status_info's "
            "type. Expected status_info to be either a "
            "DeploymentStatusInfo or a dictionary."
        )

    return DeploymentStatusSchema(
        name=deployment_name, status=status, message=message
    )
def serve_application_status_to_schema(
    status_infos: Dict[str, Union[DeploymentStatusInfo, Dict]]
) -> ServeApplicationStatusSchema:
    """Aggregate per-deployment status infos into an application-wide schema."""
    return ServeApplicationStatusSchema(
        statuses=[
            status_info_to_schema(deployment_name, status_info)
            for deployment_name, status_info in status_infos.items()
        ]
    )