# Source code for ray.serve.config

import inspect
from dataclasses import dataclass
from enum import Enum
from typing import Any, Dict, List, Optional

import pydantic
from pydantic import BaseModel, confloat, PositiveFloat, PositiveInt, validator
from ray.serve.constants import DEFAULT_HTTP_HOST, DEFAULT_HTTP_PORT


def _callable_accepts_batch(backend_def):
    if inspect.isfunction(backend_def):
        return hasattr(backend_def, "_serve_accept_batch")
    elif inspect.isclass(backend_def):
        return hasattr(backend_def.__call__, "_serve_accept_batch")
    elif isinstance(backend_def, str):
        return True
    else:
        raise TypeError("backend_def must be function, class, or str.")


def _callable_is_blocking(backend_def):
    if inspect.isfunction(backend_def):
        return not inspect.iscoroutinefunction(backend_def)
    elif inspect.isclass(backend_def):
        return not inspect.iscoroutinefunction(backend_def.__call__)
    elif isinstance(backend_def, str):
        return False
    else:
        raise TypeError("backend_def must be function, class, or str.")


@dataclass(init=True, repr=True, eq=True)
class BackendMetadata:
    """Internal, non-user-facing facts about a backend.

    BackendConfig reads ``accepts_batches`` when validating
    ``max_batch_size``. It reads ``is_blocking`` when deriving a default for
    ``max_concurrent_queries``.
    """

    # Whether the backend callable was marked as taking a list of requests.
    accepts_batches: bool = False
    # Whether the backend callable is synchronous (not a coroutine).
    is_blocking: bool = True
    # Optional autoscaling settings; the expected keys are not defined in
    # this file (NOTE(review): confirm schema with the autoscaling code).
    autoscaling_config: Optional[Dict[str, Any]] = None


class BackendConfig(BaseModel):
    """Configuration options for a backend, to be set by the user.

    Args:
        num_replicas (Optional[int]): The number of processes to start up that
            will handle requests to this backend. Defaults to 1.
        max_batch_size (Optional[int]): The maximum number of requests that
            will be processed in one batch by this backend. Defaults to None
            (no maximum).
        batch_wait_timeout (Optional[float]): The time in seconds that backend
            replicas will wait for a full batch of requests before processing
            a partial batch. Defaults to 0.
        max_concurrent_queries (Optional[int]): The maximum number of queries
            that will be sent to a replica of this backend without receiving a
            response. If left unset, a default is derived from the backend's
            blocking mode, max_batch_size and batch_wait_timeout (see
            ``set_max_queries_by_mode``). In some combinations it stays None
            (no maximum), e.g. a blocking backend with a non-zero
            batch_wait_timeout and no max_batch_size.
        user_config (Optional[Any]): Arguments to pass to the reconfigure
            method of the backend. The reconfigure method is called if
            user_config is not None.
        experimental_graceful_shutdown_wait_loop_s (Optional[float]): Duration
            that backend workers will wait until there is no more work to be
            done before shutting down. Defaults to 2s.
        experimental_graceful_shutdown_timeout_s (Optional[float]):
            Controller waits for this duration to forcefully kill the replica
            for shutdown. Defaults to 20s.
    """

    # Field order matters: pydantic validators only see fields declared
    # earlier, and set_max_queries_by_mode reads the three fields above
    # max_concurrent_queries.
    internal_metadata: BackendMetadata = BackendMetadata()
    num_replicas: PositiveInt = 1
    max_batch_size: Optional[PositiveInt] = None
    batch_wait_timeout: float = 0
    max_concurrent_queries: Optional[int] = None
    user_config: Any = None

    experimental_graceful_shutdown_wait_loop_s: PositiveFloat = 2.0
    experimental_graceful_shutdown_timeout_s: confloat(ge=0) = 20.0

    class Config:
        validate_assignment = True
        extra = "forbid"
        arbitrary_types_allowed = True

    def _validate_batch_size(self):
        """Reject max_batch_size > 1 for backends not marked as batching.

        Raises:
            ValueError: if max_batch_size exceeds 1 but
                ``internal_metadata.accepts_batches`` is False.
        """
        if (self.max_batch_size is not None
                and not self.internal_metadata.accepts_batches
                and self.max_batch_size > 1):
            raise ValueError(
                "max_batch_size is set in config but the function or "
                "method does not accept batching. Please use "
                "@serve.accept_batch to explicitly mark that the function or "
                "method accepts a list of requests as an argument.")

    # This is not a pydantic validator, so that we may skip this method when
    # creating partially filled BackendConfig objects to pass as updates--for
    # example, BackendConfig(max_batch_size=5).
    def _validate_complete(self):
        """Run checks that need a fully populated config."""
        self._validate_batch_size()

    # Dynamic default for max_concurrent_queries
    @validator("max_concurrent_queries", always=True)
    def set_max_queries_by_mode(cls, v, values):  # noqa 805
        """Fill in max_concurrent_queries when the user left it unset."""
        if v is not None:
            return v

        # A field that failed its own validation is absent from ``values``.
        # pydantic will report that field's error; indexing it here would
        # instead surface a confusing KeyError. (max_batch_size was already
        # guarded this way; apply the same treatment to the other inputs.)
        if ("internal_metadata" not in values
                or "batch_wait_timeout" not in values):
            return v

        is_blocking = values["internal_metadata"].is_blocking
        batch_wait_timeout = values["batch_wait_timeout"]
        max_batch_size = values.get("max_batch_size")

        # Model serving mode: if the servable is blocking and the wait
        # timeout is default zero seconds, then we keep the existing
        # behavior to allow at most max batch size queries.
        if is_blocking and batch_wait_timeout == 0:
            if max_batch_size is not None:
                v = 2 * max_batch_size
            else:
                v = 8

        # Pipeline/async mode: if the servable is not blocking,
        # router should just keep pushing queries to the replicas
        # until a high limit.
        # TODO(edoakes): setting this to a relatively low constant because
        # we can't determine if imported backends are sync or async, but we
        # may consider tweaking it in the future.
        if not is_blocking:
            v = 100

        # Batch inference mode: user specifies non zero timeout to wait for
        # full batch. We will use 2*max_batch_size to perform double
        # buffering to keep the replica busy.
        if max_batch_size is not None and batch_wait_timeout > 0:
            v = 2 * max_batch_size

        return v
class ReplicaConfig:
    """Backend definition plus the Ray actor options for its replicas.

    Args:
        backend_def: Import-path string, function, or class implementing the
            backend.
        *init_args: Positional init arguments (presumably forwarded to the
            class constructor); rejected for function backends.
        ray_actor_options (Optional[dict]): Actor options for the replicas.
            A dict argument is shallow-copied, so defaults filled in during
            validation (``num_cpus``) never modify the caller's object.

    Raises:
        TypeError: for an unsupported backend_def, a non-dict
            ray_actor_options, or non-numeric resource values.
        ValueError: for init_args on a function backend, disallowed actor
            options, or negative resource values.
    """

    def __init__(self, backend_def, *init_args, ray_actor_options=None):
        self.backend_def = backend_def
        self.accepts_batches = _callable_accepts_batch(backend_def)
        self.is_blocking = _callable_is_blocking(backend_def)
        self.init_args = list(init_args)
        if ray_actor_options is None:
            self.ray_actor_options = {}
        elif isinstance(ray_actor_options, dict):
            # Copy so that _validate's defaulting does not mutate the
            # dictionary the caller passed in.
            self.ray_actor_options = dict(ray_actor_options)
        else:
            # Kept as-is so that _validate rejects it with a TypeError.
            self.ray_actor_options = ray_actor_options

        self.resource_dict = {}

        self._validate()

    def _get_nonnegative_number(self, key):
        """Return ray_actor_options[key] (default 0) if it is an int/float >= 0.

        Raises:
            TypeError: if the value is not an int or a float.
            ValueError: if the value is negative.
        """
        value = self.ray_actor_options.get(key, 0)
        if not isinstance(value, (int, float)):
            raise TypeError(
                "{} in ray_actor_options must be an int or a float.".format(
                    key))
        if value < 0:
            raise ValueError(
                "{} in ray_actor_options must be >= 0.".format(key))
        return value

    def _validate(self):
        """Check backend_def and ray_actor_options; fill resource_dict."""
        # Validate that backend_def is an import path, function, or class.
        if isinstance(self.backend_def, str):
            pass
        elif inspect.isfunction(self.backend_def):
            if len(self.init_args) != 0:
                raise ValueError(
                    "init_args not supported for function backend.")
        elif not inspect.isclass(self.backend_def):
            raise TypeError(
                "Backend must be a function or class, it is {}.".format(
                    type(self.backend_def)))

        if not isinstance(self.ray_actor_options, dict):
            raise TypeError("ray_actor_options must be a dictionary.")
        # Checked in this order so the first offending key is reported.
        for forbidden_key in ("lifetime", "name", "max_restarts"):
            if forbidden_key in self.ray_actor_options:
                raise ValueError(
                    "Specifying {} in ray_actor_options is not allowed.".format(
                        forbidden_key))

        # Ray defaults to zero CPUs for placement, we default to one here.
        self.ray_actor_options.setdefault("num_cpus", 1)

        self.resource_dict["CPU"] = self._get_nonnegative_number("num_cpus")
        self.resource_dict["GPU"] = self._get_nonnegative_number("num_gpus")
        self.resource_dict["memory"] = self._get_nonnegative_number("memory")
        self.resource_dict["object_store_memory"] = (
            self._get_nonnegative_number("object_store_memory"))

        custom_resources = self.ray_actor_options.get("resources", {})
        if not isinstance(custom_resources, dict):
            raise TypeError(
                "resources in ray_actor_options must be a dictionary.")
        self.resource_dict.update(custom_resources)


class DeploymentMode(str, Enum):
    """Value type of ``HTTPOptions.location``.

    ``NoServer`` is what HTTPOptions falls back to when no host or location
    is given. The other members presumably select where HTTP servers run
    (head node only vs. every node); confirm against serve.start docs.
    """

    NoServer = "NoServer"
    HeadOnly = "HeadOnly"
    EveryNode = "EveryNode"


class HTTPOptions(pydantic.BaseModel):
    # Documentation inside serve.start for user's convenience.
    host: Optional[str] = DEFAULT_HTTP_HOST
    port: int = DEFAULT_HTTP_PORT
    middlewares: List[Any] = []
    location: Optional[DeploymentMode] = DeploymentMode.HeadOnly

    @validator("location", always=True)
    def location_backfill_no_server(cls, v, values):
        """Force ``location`` to NoServer when host or location is None."""
        # ``host`` is absent from ``values`` if it failed its own validation;
        # .get lets pydantic report that error rather than a KeyError here.
        if values.get("host") is None or v is None:
            return DeploymentMode.NoServer

        return v

    class Config:
        validate_assignment = True
        extra = "forbid"
        arbitrary_types_allowed = True