# Source code for ray.serve.deployment

import inspect
import logging
from copy import deepcopy
from typing import Any, Callable, Dict, List, Optional, Tuple, Union

from ray.serve._private.config import (
    DeploymentConfig,
    ReplicaConfig,
    handle_num_replicas_auto,
)
from ray.serve._private.constants import SERVE_LOGGER_NAME
from ray.serve._private.usage import ServeUsageTag
from ray.serve._private.utils import DEFAULT, Default
from ray.serve.config import AutoscalingConfig
from ray.serve.schema import DeploymentSchema, LoggingConfig, RayActorOptionsSchema
from ray.util.annotations import PublicAPI

logger = logging.getLogger(SERVE_LOGGER_NAME)


@PublicAPI(stability="stable")
class Application:
    """One or more deployments bound with arguments that can be deployed together.

    An Application is what `Deployment.bind()` returns. It can be:

    * passed into another `Deployment.bind()` call to compose several
      deployments into a single application,
    * passed to `serve.run`, or
    * deployed via a Serve config file.

    For example, to define an Application and run it in Python:

        .. code-block:: python

            from ray import serve
            from ray.serve import Application

            @serve.deployment
            class OtherDeployment:
                pass

            @serve.deployment
            class MyDeployment:
                def __init__(self, other):
                    self._other = other

            app: Application = MyDeployment.bind(OtherDeployment.bind())
            serve.run(app)

    To run the same app using the command line interface (CLI):

        .. code-block:: bash

            serve run python_file:app

    To deploy the same app via a config file:

        .. code-block:: yaml

            applications:
                my_app:
                    import_path: python_file:app

    """

    def __init__(self, bound_deployment: "Deployment"):
        # Consumed by `build_app`; kept private so that users do not rely on it.
        self._bound_deployment: "Deployment" = bound_deployment
@PublicAPI(stability="stable")
class Deployment:
    """Class (or function) decorated with the `@serve.deployment` decorator.

    This is run on a number of replica actors. Requests to those replicas call
    this class.

    One or more deployments can be composed together into an `Application` which is
    then run via `serve.run` or a config file.

    Example:

    .. code-block:: python

        @serve.deployment
        class MyDeployment:
            def __init__(self, name: str):
                self._name = name

            def __call__(self, request):
                return "Hello world!"

        app = MyDeployment.bind("world")
        # Run via `serve.run` or the `serve run` CLI command.
        serve.run(app)

    """

    def __init__(
        self,
        name: str,
        deployment_config: DeploymentConfig,
        replica_config: ReplicaConfig,
        version: Optional[str] = None,
        _internal=False,
    ) -> None:
        """Internal constructor; users should use `@serve.deployment` instead.

        Raises:
            RuntimeError: if called without `_internal=True`.
            TypeError: if `name` is not a string, or `version` is neither None
                nor a string.
        """
        if not _internal:
            raise RuntimeError(
                "The Deployment constructor should not be called "
                "directly. Use `@serve.deployment` instead."
            )
        if not isinstance(name, str):
            raise TypeError("name must be a string.")
        if not (version is None or isinstance(version, str)):
            raise TypeError("version must be a string.")

        # Only classes whose `__module__` is "ray.serve.api" and that expose a
        # `__fastapi_docs_path__` attribute contribute a docs path.
        docs_path = None
        if (
            inspect.isclass(replica_config.deployment_def)
            and hasattr(replica_config.deployment_def, "__module__")
            and replica_config.deployment_def.__module__ == "ray.serve.api"
            and hasattr(replica_config.deployment_def, "__fastapi_docs_path__")
        ):
            docs_path = replica_config.deployment_def.__fastapi_docs_path__

        self._name = name
        self._version = version
        self._deployment_config = deployment_config
        self._replica_config = replica_config
        self._docs_path = docs_path

    @property
    def name(self) -> str:
        """Unique name of this deployment."""
        return self._name

    @property
    def version(self) -> Optional[str]:
        """Version string of this deployment, or None if none was set."""
        return self._version

    @property
    def func_or_class(self) -> Union[Callable, str]:
        """Underlying class or function that this deployment wraps."""
        return self._replica_config.deployment_def

    @property
    def num_replicas(self) -> int:
        """Target number of replicas."""
        return self._deployment_config.num_replicas

    @property
    def user_config(self) -> Any:
        """Dynamic user-provided config options."""
        return self._deployment_config.user_config

    @property
    def max_ongoing_requests(self) -> int:
        """Max number of requests a replica can handle at once."""
        return self._deployment_config.max_ongoing_requests

    @property
    def max_queued_requests(self) -> int:
        """Max number of requests that can be queued in each deployment handle."""
        return self._deployment_config.max_queued_requests

    @property
    def route_prefix(self):
        """Always raises: route prefixes are no longer set per deployment."""
        raise ValueError(
            "`route_prefix` can no longer be specified at the deployment level. "
            "Pass it to `serve.run` or in the application config instead."
        )

    @property
    def ray_actor_options(self) -> Optional[Dict]:
        """Actor options such as resources required for each replica."""
        return self._replica_config.ray_actor_options

    @property
    def init_args(self) -> Tuple[Any]:
        """Positional arguments passed to the wrapped class/function on init."""
        return self._replica_config.init_args

    @property
    def init_kwargs(self) -> Dict[Any, Any]:
        """Keyword arguments passed to the wrapped class/function on init."""
        return self._replica_config.init_kwargs

    @property
    def url(self) -> Optional[str]:
        """Deprecated; logs a deprecation warning and always returns None."""
        logger.warning(
            "DeprecationWarning: `Deployment.url` is deprecated "
            "and will be removed in the future."
        )
        return None

    @property
    def logging_config(self) -> Dict:
        """Logging configuration stored on the deployment config."""
        return self._deployment_config.logging_config

    def set_logging_config(self, logging_config: Dict):
        """Overwrite the logging configuration on the deployment config."""
        self._deployment_config.logging_config = logging_config

    def __call__(self):
        raise RuntimeError(
            "Deployments cannot be constructed directly. "
            "Use `deployment.deploy() instead.`"
        )

    def bind(self, *args, **kwargs) -> Application:
        """Bind the arguments to the deployment and return an Application.

        The returned Application can be deployed using `serve.run` (or via
        config file) or bound to another deployment for composition.
        """
        return Application(self.options(_init_args=args, _init_kwargs=kwargs))

    def options(
        self,
        func_or_class: Optional[Callable] = None,
        name: Default[str] = DEFAULT.VALUE,
        version: Default[str] = DEFAULT.VALUE,
        num_replicas: Default[Optional[Union[int, str]]] = DEFAULT.VALUE,
        route_prefix: Default[Union[str, None]] = DEFAULT.VALUE,
        ray_actor_options: Default[Optional[Dict]] = DEFAULT.VALUE,
        placement_group_bundles: Default[List[Dict[str, float]]] = DEFAULT.VALUE,
        placement_group_strategy: Default[str] = DEFAULT.VALUE,
        max_replicas_per_node: Default[int] = DEFAULT.VALUE,
        user_config: Default[Optional[Any]] = DEFAULT.VALUE,
        max_ongoing_requests: Default[int] = DEFAULT.VALUE,
        max_queued_requests: Default[int] = DEFAULT.VALUE,
        autoscaling_config: Default[
            Union[Dict, AutoscalingConfig, None]
        ] = DEFAULT.VALUE,
        graceful_shutdown_wait_loop_s: Default[float] = DEFAULT.VALUE,
        graceful_shutdown_timeout_s: Default[float] = DEFAULT.VALUE,
        health_check_period_s: Default[float] = DEFAULT.VALUE,
        health_check_timeout_s: Default[float] = DEFAULT.VALUE,
        logging_config: Default[Union[Dict, LoggingConfig, None]] = DEFAULT.VALUE,
        _init_args: Default[Tuple[Any]] = DEFAULT.VALUE,
        _init_kwargs: Default[Dict[Any, Any]] = DEFAULT.VALUE,
        _internal: bool = False,
    ) -> "Deployment":
        """Return a copy of this deployment with updated options.

        Only those options passed in will be updated, all others will remain
        unchanged from the existing deployment.

        Refer to the `@serve.deployment` decorator docs for available arguments.

        Raises:
            ValueError: if `route_prefix` is passed; if `max_ongoing_requests`
                is None; if a numeric `num_replicas` is combined with a
                non-null `autoscaling_config`; or if `num_replicas` is 0.
        """
        if route_prefix is not DEFAULT.VALUE:
            raise ValueError(
                "`route_prefix` can no longer be specified at the deployment level. "
                "Pass it to `serve.run` or in the application config instead."
            )

        # Modify max_ongoing_requests and autoscaling_config if
        # `num_replicas="auto"`
        if max_ongoing_requests is None:
            raise ValueError("`max_ongoing_requests` must be non-null, got None.")
        if num_replicas == "auto":
            num_replicas = None
            max_ongoing_requests, autoscaling_config = handle_num_replicas_auto(
                max_ongoing_requests, autoscaling_config
            )

            ServeUsageTag.AUTO_NUM_REPLICAS_USED.record("1")

        # NOTE: The user_configured_option_names should be the first thing that's
        # defined in this method. It depends on the locals() dictionary storing
        # only the function args/kwargs.
        # Create list of all user-configured options from keyword args
        user_configured_option_names = [
            option
            for option, value in locals().items()
            if option not in {"self", "func_or_class", "_internal"}
            and value is not DEFAULT.VALUE
        ]

        new_deployment_config = deepcopy(self._deployment_config)
        if not _internal:
            new_deployment_config.user_configured_option_names.update(
                user_configured_option_names
            )

        if num_replicas not in [
            DEFAULT.VALUE,
            None,
            "auto",
        ] and autoscaling_config not in [
            DEFAULT.VALUE,
            None,
        ]:
            raise ValueError(
                "Manually setting num_replicas is not allowed when "
                "autoscaling_config is provided."
            )

        if num_replicas == 0:
            raise ValueError("num_replicas is expected to larger than 0")

        if not _internal and version is not DEFAULT.VALUE:
            logger.warning(
                "DeprecationWarning: `version` in `Deployment.options()` has been "
                "deprecated. Explicitly specifying version will raise an error in the "
                "future!"
            )

        # Independent of the version warning above: previously this was chained
        # as an `elif`, which silently dropped `num_replicas` whenever a user
        # also passed `version`.
        if num_replicas not in [DEFAULT.VALUE, None]:
            new_deployment_config.num_replicas = num_replicas

        if user_config is not DEFAULT.VALUE:
            new_deployment_config.user_config = user_config

        if max_ongoing_requests is not DEFAULT.VALUE:
            new_deployment_config.max_ongoing_requests = max_ongoing_requests

        if max_queued_requests is not DEFAULT.VALUE:
            new_deployment_config.max_queued_requests = max_queued_requests

        # Options stored on the ReplicaConfig fall back to this deployment's
        # current values when not overridden.
        if func_or_class is None:
            func_or_class = self._replica_config.deployment_def

        if name is DEFAULT.VALUE:
            name = self._name

        if version is DEFAULT.VALUE:
            version = self._version

        if _init_args is DEFAULT.VALUE:
            _init_args = self._replica_config.init_args

        if _init_kwargs is DEFAULT.VALUE:
            _init_kwargs = self._replica_config.init_kwargs

        if ray_actor_options is DEFAULT.VALUE:
            ray_actor_options = self._replica_config.ray_actor_options

        if placement_group_bundles is DEFAULT.VALUE:
            placement_group_bundles = self._replica_config.placement_group_bundles

        if placement_group_strategy is DEFAULT.VALUE:
            placement_group_strategy = self._replica_config.placement_group_strategy

        if max_replicas_per_node is DEFAULT.VALUE:
            max_replicas_per_node = self._replica_config.max_replicas_per_node

        if autoscaling_config is not DEFAULT.VALUE:
            new_deployment_config.autoscaling_config = autoscaling_config

        if graceful_shutdown_wait_loop_s is not DEFAULT.VALUE:
            new_deployment_config.graceful_shutdown_wait_loop_s = (
                graceful_shutdown_wait_loop_s
            )

        if graceful_shutdown_timeout_s is not DEFAULT.VALUE:
            new_deployment_config.graceful_shutdown_timeout_s = (
                graceful_shutdown_timeout_s
            )

        if health_check_period_s is not DEFAULT.VALUE:
            new_deployment_config.health_check_period_s = health_check_period_s

        if health_check_timeout_s is not DEFAULT.VALUE:
            new_deployment_config.health_check_timeout_s = health_check_timeout_s

        if logging_config is not DEFAULT.VALUE:
            # The deployment config stores the logging config as a plain dict.
            if isinstance(logging_config, LoggingConfig):
                logging_config = logging_config.dict()
            new_deployment_config.logging_config = logging_config

        new_replica_config = ReplicaConfig.create(
            func_or_class,
            init_args=_init_args,
            init_kwargs=_init_kwargs,
            ray_actor_options=ray_actor_options,
            placement_group_bundles=placement_group_bundles,
            placement_group_strategy=placement_group_strategy,
            max_replicas_per_node=max_replicas_per_node,
        )

        return Deployment(
            name,
            new_deployment_config,
            new_replica_config,
            version=version,
            _internal=True,
        )

    def __eq__(self, other):
        """Compare name, version, deployment config, init args and actor options.

        Note that the wrapped class/function itself is not compared.
        """
        if not isinstance(other, Deployment):
            return NotImplemented
        return all(
            [
                self._name == other._name,
                self._version == other._version,
                self._deployment_config == other._deployment_config,
                self._replica_config.init_args == other._replica_config.init_args,
                self._replica_config.init_kwargs == other._replica_config.init_kwargs,
                # Previously compared `self` against itself (always True).
                self._replica_config.ray_actor_options
                == other._replica_config.ray_actor_options,
            ]
        )

    def __str__(self):
        return f"Deployment(name={self._name})"

    def __repr__(self):
        return str(self)
def deployment_to_schema(d: Deployment) -> DeploymentSchema:
    """Convert a live `Deployment` object into its structured `DeploymentSchema`.

    Only options the user explicitly configured (plus the required `name`) are
    forwarded to the schema; everything else is left at the schema defaults.

    Args:
        d: Deployment object to convert
    """
    if d.ray_actor_options is None:
        actor_options_schema = None
    else:
        actor_options_schema = RayActorOptionsSchema.parse_obj(d.ray_actor_options)

    dep_cfg = d._deployment_config
    rep_cfg = d._replica_config

    all_options = {
        "name": d.name,
        "num_replicas": None if dep_cfg.autoscaling_config else d.num_replicas,
        "max_ongoing_requests": d.max_ongoing_requests,
        "max_queued_requests": d.max_queued_requests,
        "user_config": d.user_config,
        "autoscaling_config": dep_cfg.autoscaling_config,
        "graceful_shutdown_wait_loop_s": dep_cfg.graceful_shutdown_wait_loop_s,
        "graceful_shutdown_timeout_s": dep_cfg.graceful_shutdown_timeout_s,
        "health_check_period_s": dep_cfg.health_check_period_s,
        "health_check_timeout_s": dep_cfg.health_check_timeout_s,
        "ray_actor_options": actor_options_schema,
        "placement_group_strategy": rep_cfg.placement_group_strategy,
        "placement_group_bundles": rep_cfg.placement_group_bundles,
        "max_replicas_per_node": rep_cfg.max_replicas_per_node,
        "logging_config": dep_cfg.logging_config,
    }

    # Keep only user-configured options so the schema leaves the rest at their
    # defaults; converting the schema back into a deployment then preserves
    # which options the user set. `name` is required by the schema, so it is
    # always kept.
    user_set = dep_cfg.user_configured_option_names
    kept_options = {
        key: value
        for key, value in all_options.items()
        if key == "name" or key in user_set
    }

    # TODO(Sihan) DeploymentConfig num_replicas and auto_config can be set together
    # because internally we use these two field for autoscale and deploy.
    # We can improve the code after we separate the user faced deployment config and
    # internal deployment config.
    return DeploymentSchema(**kept_options)


def schema_to_deployment(s: DeploymentSchema) -> Deployment:
    """Build a `Deployment` from the parameters recorded in a schema.

    The returned deployment CANNOT be deployed immediately. Its func_or_class
    value is an empty string (""), which is not a valid import path. The
    func_or_class value must be overwritten with a valid function or class
    before the deployment can be deployed.
    """

    def _unset_to_none(value):
        # Schema fields left unset hold the DEFAULT.VALUE sentinel.
        return None if value is DEFAULT.VALUE else value

    if s.ray_actor_options is DEFAULT.VALUE:
        actor_options = None
    else:
        actor_options = s.ray_actor_options.dict(exclude_unset=True)
    pg_bundles = _unset_to_none(s.placement_group_bundles)
    pg_strategy = _unset_to_none(s.placement_group_strategy)
    replicas_per_node = _unset_to_none(s.max_replicas_per_node)

    config = DeploymentConfig.from_default(
        num_replicas=s.num_replicas,
        user_config=s.user_config,
        max_ongoing_requests=s.max_ongoing_requests,
        max_queued_requests=s.max_queued_requests,
        autoscaling_config=s.autoscaling_config,
        graceful_shutdown_wait_loop_s=s.graceful_shutdown_wait_loop_s,
        graceful_shutdown_timeout_s=s.graceful_shutdown_timeout_s,
        health_check_period_s=s.health_check_period_s,
        health_check_timeout_s=s.health_check_timeout_s,
        logging_config=s.logging_config,
    )
    config.user_configured_option_names = s._get_user_configured_option_names()

    replica = ReplicaConfig.create(
        deployment_def="",
        init_args=(),
        init_kwargs={},
        ray_actor_options=actor_options,
        placement_group_bundles=pg_bundles,
        placement_group_strategy=pg_strategy,
        max_replicas_per_node=replicas_per_node,
    )

    return Deployment(
        name=s.name,
        deployment_config=config,
        replica_config=replica,
        _internal=True,
    )