Source code for ray.serve.api

import collections
import inspect
import logging
from typing import Any, Callable, Dict, Optional, Tuple, Union, overload

from fastapi import APIRouter, FastAPI
from ray._private.usage.usage_lib import TagKey, record_extra_usage_tag
from starlette.requests import Request
from uvicorn.config import Config
from uvicorn.lifespan.on import LifespanOn

from ray import cloudpickle
from ray.dag import DAGNode
from ray.util.annotations import Deprecated, PublicAPI

from ray.serve.application import Application
from ray.serve._private.client import ServeControllerClient
from ray.serve.config import AutoscalingConfig, DeploymentConfig, HTTPOptions
from ray.serve._private.constants import (
    DEFAULT_HTTP_HOST,
    DEFAULT_HTTP_PORT,
    SERVE_DEFAULT_APP_NAME,
    MIGRATION_MESSAGE,
)
from ray.serve.context import (
    ReplicaContext,
    get_global_client,
    get_internal_replica_context,
    _set_global_client,
)
from ray.serve.deployment import Deployment
from ray.serve.deployment_graph import ClassNode, FunctionNode
from ray.serve._private.deployment_graph_build import build as pipeline_build
from ray.serve._private.deployment_graph_build import (
    get_and_validate_ingress_deployment,
)
from ray.serve.exceptions import RayServeException
from ray.serve.handle import RayServeHandle
from ray.serve._private.http_util import ASGIHTTPSender, make_fastapi_class_based_view
from ray.serve._private.logging_utils import LoggingContext
from ray.serve._private.utils import (
    DEFAULT,
    Default,
    ensure_serialization_context,
    in_interactive_shell,
    install_serve_encoders_to_fastapi,
    guarded_deprecation_warning,
    record_serve_tag,
)

from ray.serve._private import api as _private_api

logger = logging.getLogger(__file__)


@guarded_deprecation_warning(instructions=MIGRATION_MESSAGE)
@Deprecated(message=MIGRATION_MESSAGE)
def start(
    detached: bool = False,
    http_options: Optional[Union[dict, HTTPOptions]] = None,
    dedicated_cpu: bool = False,
    **kwargs,
) -> ServeControllerClient:
    """Initialize a serve instance.

    By default, the instance will be scoped to the lifetime of the returned
    Client object (or when the script exits). If detached is set to True, the
    instance will instead persist until serve.shutdown() is called. This is
    only relevant if connecting to a long-running Ray cluster (e.g., with
    ray.init(address="auto") or ray.init("ray://<remote_addr>")).

    Args:
        detached: Whether or not the instance should be detached from this
            script. If set, the instance will live on the Ray cluster until it
            is explicitly stopped with serve.shutdown().
        http_options (Optional[Dict, serve.HTTPOptions]): Configuration options
            for the HTTP proxy. You can pass in a dictionary or HTTPOptions
            object with fields:

            - host: Host for HTTP servers to listen on. Defaults to
              "127.0.0.1". To expose Serve publicly, you probably want to set
              this to "0.0.0.0".
            - port: Port for HTTP server. Defaults to 8000.
            - root_path: Root path to mount the serve application
              (for example, "/serve"). All deployment routes will be prefixed
              with this path. Defaults to "".
            - middlewares: A list of Starlette middlewares that will be
              applied to the HTTP servers in the cluster. Defaults to [].
            - location (str, serve.config.DeploymentMode): The deployment
              location of HTTP servers:

                - "HeadOnly": start one HTTP server on the head node. Serve
                  assumes the head node is the node you executed serve.start
                  on. This is the default.
                - "EveryNode": start one HTTP server per node.
                - "NoServer" or None: disable HTTP server.

            - num_cpus: The number of CPU cores to reserve for each internal
              Serve HTTP proxy actor. Defaults to 0.
        dedicated_cpu: Whether to reserve a CPU core for the internal
            Serve controller actor. Defaults to False.
    """
    client = _private_api.serve_start(detached, http_options, dedicated_cpu, **kwargs)

    # Record after Ray has been started.
    record_extra_usage_tag(TagKey.SERVE_API_VERSION, "v1")
    return client
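

# Usage sketch (not part of the module; illustrative only): calling the
# deprecated v1 serve.start() with explicit HTTP options. The host/port
# values are example choices, not required defaults.
#
#     import ray
#     from ray import serve
#
#     ray.init(address="auto")  # connect to an existing Ray cluster
#     serve.start(
#         detached=True,  # keep Serve running after this script exits
#         http_options={"host": "0.0.0.0", "port": 8000},
#     )
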

@PublicAPI(stability="stable")
def shutdown() -> None:
    """Completely shut down the connected Serve instance.

    Shuts down all processes and deletes all state associated with the
    instance.
    """
    try:
        client = get_global_client()
    except RayServeException:
        logger.info(
            "Nothing to shut down. There's no Serve application "
            "running on this Ray cluster."
        )
        return

    client.shutdown()
    _set_global_client(None)
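

# Usage sketch (illustrative only): tearing down a running Serve instance.
# If nothing is running, shutdown() logs a message and returns quietly.
#
#     from ray import serve
#
#     serve.shutdown()  # removes all deployments and controller state
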

@PublicAPI(stability="beta")
def get_replica_context() -> ReplicaContext:
    """If called from a deployment, returns the deployment and replica tag.

    A replica tag uniquely identifies a single replica for a Ray Serve
    deployment at runtime. Replica tags are of the form
    `<deployment_name>#<random letters>`.

    Raises:
        RayServeException: if not called from within a Ray Serve deployment.

    Example:
        >>> from ray import serve
        >>> # deployment_name
        >>> serve.get_replica_context().deployment  # doctest: +SKIP
        >>> # deployment_name#krcwoa
        >>> serve.get_replica_context().replica_tag  # doctest: +SKIP
    """
    internal_replica_context = get_internal_replica_context()
    if internal_replica_context is None:
        raise RayServeException(
            "`serve.get_replica_context()` "
            "may only be called from within a "
            "Ray Serve deployment."
        )
    return internal_replica_context


@PublicAPI(stability="beta")
def ingress(app: Union["FastAPI", "APIRouter", Callable]):
    """Mark an ASGI application as the ingress for Serve.

    Args:
        app (FastAPI, APIRouter, Starlette, etc.): the app or router object
            to serve as the ingress for this deployment. It can be any
            ASGI-compatible object.

    Example:
        >>> from fastapi import FastAPI
        >>> from ray import serve
        >>> app = FastAPI()  # doctest: +SKIP
        >>> @serve.deployment  # doctest: +SKIP
        ... @serve.ingress(app)  # doctest: +SKIP
        ... class App:  # doctest: +SKIP
        ...     pass  # doctest: +SKIP
        >>> App.deploy()  # doctest: +SKIP
    """

    def decorator(cls):
        if not inspect.isclass(cls):
            raise ValueError("@serve.ingress must be used with a class.")

        if issubclass(cls, collections.abc.Callable):
            raise ValueError(
                "Class passed to @serve.ingress may not have __call__ method."
            )

        # Sometimes there are decorators on the methods. We want to fix
        # the FastAPI routes here.
        if isinstance(app, (FastAPI, APIRouter)):
            make_fastapi_class_based_view(app, cls)

        # Freeze the state of the app so subsequent modifications won't affect
        # this ingress deployment. We don't use copy.copy here to avoid
        # recursion issues.
        ensure_serialization_context()
        frozen_app = cloudpickle.loads(cloudpickle.dumps(app))

        class ASGIAppWrapper(cls):
            async def __init__(self, *args, **kwargs):
                super().__init__(*args, **kwargs)
                record_serve_tag("SERVE_FASTAPI_USED", "1")
                install_serve_encoders_to_fastapi()

                self._serve_app = frozen_app

                # Use uvicorn's lifespan handling code to properly deal with
                # startup and shutdown events.
                self._serve_asgi_lifespan = LifespanOn(
                    Config(self._serve_app, lifespan="on")
                )
                # Replace uvicorn logger with our own.
                self._serve_asgi_lifespan.logger = logger
                # LifespanOn's logger logs at INFO level and becomes spammy.
                # Within this block we temporarily uplevel for cleaner logging.
                with LoggingContext(
                    self._serve_asgi_lifespan.logger, level=logging.WARNING
                ):
                    await self._serve_asgi_lifespan.startup()

            async def __call__(self, request: Request):
                sender = ASGIHTTPSender()
                await self._serve_app(
                    request.scope,
                    request.receive,
                    sender,
                )
                return sender.build_asgi_response()

            # NOTE: __del__ must be async so that we can run asgi shutdown
            # in the same event loop.
            async def __del__(self):
                # LifespanOn's logger logs at INFO level and becomes spammy.
                # Within this block we temporarily uplevel for cleaner logging.
                with LoggingContext(
                    self._serve_asgi_lifespan.logger, level=logging.WARNING
                ):
                    await self._serve_asgi_lifespan.shutdown()

                # Make sure to call the user's __del__ method as well.
                super_cls = super()
                if hasattr(super_cls, "__del__"):
                    super_cls.__del__()

        ASGIAppWrapper.__name__ = cls.__name__
        if hasattr(frozen_app, "docs_url"):
            ASGIAppWrapper.__fastapi_docs_path__ = frozen_app.docs_url
        return ASGIAppWrapper

    return decorator


@overload
def deployment(func_or_class: Callable) -> Deployment:
    pass


@overload
def deployment(
    name: Default[str] = DEFAULT.VALUE,
    version: Default[str] = DEFAULT.VALUE,
    num_replicas: Default[int] = DEFAULT.VALUE,
    init_args: Default[Tuple[Any]] = DEFAULT.VALUE,
    init_kwargs: Default[Dict[Any, Any]] = DEFAULT.VALUE,
    route_prefix: Default[Union[str, None]] = DEFAULT.VALUE,
    ray_actor_options: Default[Dict] = DEFAULT.VALUE,
    user_config: Default[Any] = DEFAULT.VALUE,
    max_concurrent_queries: Default[int] = DEFAULT.VALUE,
    autoscaling_config: Default[Union[Dict, AutoscalingConfig]] = DEFAULT.VALUE,
    graceful_shutdown_wait_loop_s: Default[float] = DEFAULT.VALUE,
    graceful_shutdown_timeout_s: Default[float] = DEFAULT.VALUE,
    health_check_period_s: Default[float] = DEFAULT.VALUE,
    health_check_timeout_s: Default[float] = DEFAULT.VALUE,
    is_driver_deployment: Optional[bool] = DEFAULT.VALUE,
) -> Callable[[Callable], Deployment]:
    pass


@PublicAPI(stability="beta")
def deployment(
    _func_or_class: Optional[Callable] = None,
    name: Default[str] = DEFAULT.VALUE,
    version: Default[str] = DEFAULT.VALUE,
    num_replicas: Default[Optional[int]] = DEFAULT.VALUE,
    init_args: Default[Tuple[Any]] = DEFAULT.VALUE,
    init_kwargs: Default[Dict[Any, Any]] = DEFAULT.VALUE,
    route_prefix: Default[Union[str, None]] = DEFAULT.VALUE,
    ray_actor_options: Default[Dict] = DEFAULT.VALUE,
    user_config: Default[Optional[Any]] = DEFAULT.VALUE,
    max_concurrent_queries: Default[int] = DEFAULT.VALUE,
    autoscaling_config: Default[Union[Dict, AutoscalingConfig, None]] = DEFAULT.VALUE,
    graceful_shutdown_wait_loop_s: Default[float] = DEFAULT.VALUE,
    graceful_shutdown_timeout_s: Default[float] = DEFAULT.VALUE,
    health_check_period_s: Default[float] = DEFAULT.VALUE,
    health_check_timeout_s: Default[float] = DEFAULT.VALUE,
    is_driver_deployment: Optional[bool] = DEFAULT.VALUE,
) -> Callable[[Callable], Deployment]:
    """Define a Serve deployment.

    Args:
        name (Default[str]): Globally-unique name identifying this
            deployment. If not provided, the name of the class or function
            will be used.
        version [DEPRECATED] (Default[str]): Version of the deployment.
            This is used to indicate a code change for the deployment; when
            it is re-deployed with a version change, a rolling update of the
            replicas will be performed. If not provided, every deployment will
            be treated as a new version.
        num_replicas (Default[Optional[int]]): The number of processes to
            start up that will handle requests to this deployment. Defaults
            to 1.
        init_args (Default[Tuple[Any]]): Positional args to be passed to the
            class constructor when starting up deployment replicas. These can
            also be passed when you call `.deploy()` on the returned
            Deployment.
        init_kwargs (Default[Dict[Any, Any]]): Keyword args to be passed to
            the class constructor when starting up deployment replicas. These
            can also be passed when you call `.deploy()` on the returned
            Deployment.
        route_prefix (Default[Union[str, None]]): Requests to paths under this
            HTTP path prefix will be routed to this deployment. Defaults to
            '/{name}'. When set to 'None', no HTTP endpoint will be created.
            Routing is done based on longest-prefix match, so if you have
            deployment A with a prefix of '/a' and deployment B with a prefix
            of '/a/b', requests to '/a', '/a/', and '/a/c' go to A and
            requests to '/a/b', '/a/b/', and '/a/b/c' go to B. Routes must not
            end with a '/' unless they're the root (just '/'), which acts as a
            catch-all.
        ray_actor_options (Default[Dict]): Options to be passed to the Ray
            actor constructor such as resource requirements. Valid options are
            `accelerator_type`, `memory`, `num_cpus`, `num_gpus`,
            `object_store_memory`, `resources`, and `runtime_env`.
        user_config (Default[Optional[Any]]): Config to pass to the
            reconfigure method of the deployment. This can be updated
            dynamically without changing the version of the deployment and
            restarting its replicas. The user_config must be json-serializable
            to keep track of updates, so it must only contain json-serializable
            types, or json-serializable types nested in lists and dictionaries.
        max_concurrent_queries (Default[int]): The maximum number of queries
            that will be sent to a replica of this deployment without receiving
            a response. Defaults to 100.
        is_driver_deployment (Optional[bool]): [Experimental] when set to True,
            Serve will deploy exactly one replica of this deployment on every
            node.

    Example:
        >>> from ray import serve
        >>> @serve.deployment(name="deployment1")  # doctest: +SKIP
        ... class MyDeployment:  # doctest: +SKIP
        ...     pass  # doctest: +SKIP

        >>> MyDeployment.bind(*init_args)  # doctest: +SKIP
        >>> MyDeployment.options(  # doctest: +SKIP
        ...     num_replicas=2, init_args=init_args).bind()

    Returns:
        Deployment
    """

    # NOTE: The user_configured_option_names should be the first thing that's
    # defined in this function. It depends on the locals() dictionary storing
    # only the function args/kwargs.
    # Create a list of all user-configured options from keyword args.
    user_configured_option_names = [
        option
        for option, value in locals().items()
        if option != "_func_or_class" and value is not DEFAULT.VALUE
    ]

    # Num of replicas should not be 0.
    # TODO(Sihan) separate num_replicas attribute from internal and api
    if num_replicas == 0:
        raise ValueError("num_replicas is expected to be larger than 0")

    if num_replicas not in [DEFAULT.VALUE, None] and autoscaling_config not in [
        DEFAULT.VALUE,
        None,
    ]:
        raise ValueError(
            "Manually setting num_replicas is not allowed when "
            "autoscaling_config is provided."
        )

    if version is not DEFAULT.VALUE:
        logger.warning(
            "DeprecationWarning: `version` in `@serve.deployment` has been deprecated. "
            "Explicitly specifying version will raise an error in the future!"
        )

    if is_driver_deployment is DEFAULT.VALUE:
        is_driver_deployment = False

    config = DeploymentConfig.from_default(
        num_replicas=num_replicas if num_replicas is not None else 1,
        user_config=user_config,
        max_concurrent_queries=max_concurrent_queries,
        autoscaling_config=autoscaling_config,
        graceful_shutdown_wait_loop_s=graceful_shutdown_wait_loop_s,
        graceful_shutdown_timeout_s=graceful_shutdown_timeout_s,
        health_check_period_s=health_check_period_s,
        health_check_timeout_s=health_check_timeout_s,
    )
    config.user_configured_option_names = set(user_configured_option_names)

    def decorator(_func_or_class):
        return Deployment(
            _func_or_class,
            name if name is not DEFAULT.VALUE else _func_or_class.__name__,
            config,
            version=(version if version is not DEFAULT.VALUE else None),
            init_args=(init_args if init_args is not DEFAULT.VALUE else None),
            init_kwargs=(init_kwargs if init_kwargs is not DEFAULT.VALUE else None),
            route_prefix=route_prefix,
            ray_actor_options=(
                ray_actor_options if ray_actor_options is not DEFAULT.VALUE else None
            ),
            is_driver_deployment=is_driver_deployment,
            _internal=True,
        )

    # This handles both parametrized and non-parametrized usage of the
    # decorator. See the @serve.batch code for more details.
    return decorator(_func_or_class) if callable(_func_or_class) else decorator


@guarded_deprecation_warning(instructions=MIGRATION_MESSAGE)
@Deprecated(message=MIGRATION_MESSAGE)
def get_deployment(name: str) -> Deployment:
    """Dynamically fetch a handle to a Deployment object.

    This can be used to update and redeploy a deployment without access to
    the original definition.

    Example:
        >>> from ray import serve
        >>> MyDeployment = serve.get_deployment("name")  # doctest: +SKIP
        >>> MyDeployment.options(num_replicas=10).deploy()  # doctest: +SKIP

    Args:
        name: name of the deployment. This must have already been deployed.

    Returns:
        Deployment
    """
    record_extra_usage_tag(TagKey.SERVE_API_VERSION, "v1")
    return _private_api.get_deployment(name)


@guarded_deprecation_warning(instructions=MIGRATION_MESSAGE)
@Deprecated(message=MIGRATION_MESSAGE)
def list_deployments() -> Dict[str, Deployment]:
    """Returns a dictionary of all active deployments.

    Dictionary maps deployment name to Deployment objects.
    """
    record_extra_usage_tag(TagKey.SERVE_API_VERSION, "v1")
    return _private_api.list_deployments()
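

# Usage sketch (illustrative only): combining @serve.deployment with
# @serve.ingress(app). The class name, route_prefix, and replica count are
# arbitrary example values, not anything this module requires.
#
#     from fastapi import FastAPI
#     from ray import serve
#
#     app = FastAPI()
#
#     @serve.deployment(name="hello", num_replicas=2, route_prefix="/hello")
#     @serve.ingress(app)
#     class Hello:
#         @app.get("/")
#         def root(self) -> str:
#             return "Hello, world!"
#
#     # Deploy via the v2 API below, e.g. serve.run(Hello.bind()).
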

@PublicAPI(stability="beta")
def run(
    target: Union[ClassNode, FunctionNode],
    _blocking: bool = True,
    host: str = DEFAULT_HTTP_HOST,
    port: int = DEFAULT_HTTP_PORT,
    name: str = SERVE_DEFAULT_APP_NAME,
    route_prefix: str = "/",
) -> Optional[RayServeHandle]:
    """Run a Serve application and return a ServeHandle to the ingress.

    Either a ClassNode, FunctionNode, or a pre-built application can be passed
    in. If a node is passed in, all of the deployments it depends on will be
    deployed. If there is an ingress, its handle will be returned.

    Args:
        target (Union[ClassNode, FunctionNode, Application]):
            A user-built Serve Application or a ClassNode that acts as the
            root node of the DAG. By default, the ClassNode is the Driver
            deployment unless the user provides a customized one.
        host: Host for HTTP servers to listen on. Defaults to
            "127.0.0.1". To expose Serve publicly, you probably want to set
            this to "0.0.0.0".
        port: Port for HTTP server. Defaults to 8000.
        name: Application name. If not provided, this will be the only
            application running on the cluster (it will delete all others).
        route_prefix: Route prefix for HTTP requests. If not provided, it
            will use the route_prefix of the ingress deployment. By default,
            the ingress route prefix is '/'.

    Returns:
        RayServeHandle: A regular Ray Serve handle that can be called by the
            user to execute the Serve DAG.
    """
    client = _private_api.serve_start(
        detached=True,
        http_options={"host": host, "port": port, "location": "EveryNode"},
    )

    # Record after Ray has been started.
    record_extra_usage_tag(TagKey.SERVE_API_VERSION, "v2")

    if isinstance(target, Application):
        deployments = list(target.deployments.values())
        ingress = target.ingress
    # Each DAG should always provide a valid Driver ClassNode.
    elif isinstance(target, ClassNode):
        deployments = pipeline_build(target, name)
        ingress = get_and_validate_ingress_deployment(deployments)
    # Special case where the user is doing a single-function serve.run(func.bind()).
    elif isinstance(target, FunctionNode):
        deployments = pipeline_build(target, name)
        ingress = get_and_validate_ingress_deployment(deployments)
        if len(deployments) != 1:
            raise ValueError(
                "We only support a single function node in serve.run, e.g. "
                "serve.run(func.bind()). For more than one node in your DAG, "
                "please provide a driver class and bind it as the entrypoint "
                "to your Serve DAG."
            )
    elif isinstance(target, DAGNode):
        raise ValueError(
            "Invalid DAGNode type as entry to serve.run(), "
            f"type: {type(target)}, accepted: ClassNode, "
            "FunctionNode. Please provide a driver class and bind it "
            "as the entrypoint to your Serve DAG."
        )
    else:
        raise TypeError(
            "Expected a ClassNode, FunctionNode, or Application as target. "
            f"Got unexpected type {type(target)} instead."
        )

    # When a name is provided, keep all existing applications;
    # otherwise, delete all of them.
    remove_past_deployments = True
    if name:
        remove_past_deployments = False

    parameter_group = []

    for deployment in deployments:
        # Overwrite the route prefix.
        if route_prefix != "/" and deployment._route_prefix:
            deployment._route_prefix = route_prefix
        deployment_parameters = {
            "name": deployment._name,
            "func_or_class": deployment._func_or_class,
            "init_args": deployment.init_args,
            "init_kwargs": deployment.init_kwargs,
            "ray_actor_options": deployment._ray_actor_options,
            "config": deployment._config,
            "version": deployment._version,
            "route_prefix": deployment.route_prefix,
            "url": deployment.url,
            "is_driver_deployment": deployment._is_driver_deployment,
            "docs_path": deployment._docs_path,
        }
        parameter_group.append(deployment_parameters)

    client.deploy_group(
        name,
        parameter_group,
        _blocking=_blocking,
        remove_past_deployments=remove_past_deployments,
    )

    if ingress is not None:
        return ingress._get_handle()
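

# Usage sketch (illustrative only): the single-function special case handled
# above, serve.run(func.bind()). The function name, app name, and route prefix
# are made-up example values.
#
#     import ray
#     from ray import serve
#
#     @serve.deployment
#     def echo(message: str) -> str:
#         return f"echo: {message}"
#
#     handle = serve.run(echo.bind(), name="echo_app", route_prefix="/echo")
#     # `handle` is a RayServeHandle to the ingress deployment:
#     # ray.get(handle.remote("hi")) -> "echo: hi"
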

@PublicAPI(stability="alpha")
def build(
    target: Union[ClassNode, FunctionNode], name: str = SERVE_DEFAULT_APP_NAME
) -> Application:
    """Builds a Serve application into a static application.

    Takes in a ClassNode or FunctionNode and converts it to a Serve
    application consisting of one or more deployments. This is intended to be
    used for production scenarios and deployed via the Serve REST API or CLI,
    so there are some restrictions placed on the deployments:
        1) All of the deployments must be importable. That is, they cannot be
           defined in __main__ or inline defined. The deployments will be
           imported in production using the same import path they had here.
        2) All arguments bound to the deployment must be JSON-serializable.

    The returned Application object can be exported to a dictionary or YAML
    config.

    Args:
        target (Union[ClassNode, FunctionNode]): A ClassNode or FunctionNode
            that acts as the top level node of the DAG.
        name: The name of the Serve application.

    Returns:
        The statically built Serve application.
    """
    if in_interactive_shell():
        raise RuntimeError(
            "build cannot be called from an interactive shell like "
            "IPython or Jupyter because it requires all deployments to be "
            "importable to run the app after building."
        )

    # TODO(edoakes): this should accept host and port, but we don't
    # currently support them in the REST API.
    return Application(pipeline_build(target, name))
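

# Usage sketch (illustrative only): building a bound node into an Application
# for the REST API / CLI. `MyDeployment` stands in for any importable
# deployment; the `to_dict()` call is assumed from the docstring's note that
# the Application "can be exported to a dictionary or YAML config".
#
#     from ray import serve
#     # from my_module import MyDeployment  # must be importable, not __main__
#
#     app = serve.build(MyDeployment.bind(), name="prod_app")
#     config = app.to_dict()  # or generate YAML with the `serve build` CLI
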

@PublicAPI(stability="alpha")
def delete(name: str, _blocking: bool = True):
    """Delete an app by its name.

    Deletes the app and all of its corresponding deployments.

    Args:
        name: the name of the app to delete.
    """
    client = get_global_client()
    client.delete_apps([name], blocking=_blocking)
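

# Usage sketch (illustrative only): deleting the application deployed by a
# prior serve.run(..., name="echo_app") call. The app name is an example value.
#
#     from ray import serve
#
#     serve.delete("echo_app")  # blocks until the app's deployments are removed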