Source code for ray.serve._private.common

import json
from dataclasses import asdict, dataclass, field
from enum import Enum
from typing import Any, Awaitable, Callable, Dict, List, Optional

from starlette.types import Scope

import ray
from ray.actor import ActorHandle
from ray.serve._private.constants import SERVE_DEFAULT_APP_NAME, SERVE_NAMESPACE
from ray.serve.generated.serve_pb2 import (
    DeploymentStatus as DeploymentStatusProto,
    DeploymentStatusInfo as DeploymentStatusInfoProto,
    DeploymentStatusTrigger as DeploymentStatusTriggerProto,
)
from ray.serve.grpc_util import RayServegRPCContext
from ray.util.annotations import PublicAPI

REPLICA_ID_FULL_ID_STR_PREFIX = "SERVE_REPLICA::"


@dataclass(frozen=True)
class DeploymentID:
    name: str
    app_name: str = SERVE_DEFAULT_APP_NAME

    def to_replica_actor_class_name(self):
        return f"ServeReplica:{self.app_name}:{self.name}"

    def __str__(self):
        return f"Deployment(name='{self.name}', app='{self.app_name}')"

    def __repr__(self):
        return str(self)


[docs] @PublicAPI(stability="alpha") @dataclass(frozen=True) class ReplicaID: """A unique identifier for a replica.""" unique_id: str """A unique identifier for the replica within the deployment.""" deployment_id: DeploymentID """The deployment this replica belongs to.""" def to_full_id_str(self) -> str: s = f"{self.deployment_id.name}#{self.unique_id}" if self.deployment_id.app_name: s = f"{self.deployment_id.app_name}#{s}" return f"{REPLICA_ID_FULL_ID_STR_PREFIX}{s}" @staticmethod def is_full_id_str(s: str) -> bool: return s.startswith(REPLICA_ID_FULL_ID_STR_PREFIX) @classmethod def from_full_id_str(cls, s: str): assert cls.is_full_id_str(s) parsed = s[len(REPLICA_ID_FULL_ID_STR_PREFIX) :].split("#") if len(parsed) == 3: app_name, deployment_name, unique_id = parsed elif len(parsed) == 2: app_name = "" deployment_name, unique_id = parsed else: raise ValueError( f"Given replica ID string {s} didn't match expected pattern, " "ensure it has either two or three fields with delimiter '#'." ) return cls( unique_id, deployment_id=DeploymentID(name=deployment_name, app_name=app_name), ) def __repr__(self) -> str: return str(self) def __str__(self) -> str: """Returns a human-readable string. This is used in user-facing log messages, so take care when updating it. """ return ( f"Replica(" f"id='{self.unique_id}', " f"deployment='{self.deployment_id.name}', " f"app='{self.deployment_id.app_name}'" ")" )
NodeId = str Duration = float ApplicationName = str @dataclass class EndpointInfo: route: str app_is_cross_language: bool = False # Keep in sync with ServeReplicaState in dashboard/client/src/type/serve.ts class ReplicaState(str, Enum): STARTING = "STARTING" UPDATING = "UPDATING" RECOVERING = "RECOVERING" RUNNING = "RUNNING" STOPPING = "STOPPING" PENDING_MIGRATION = "PENDING_MIGRATION" class DeploymentStatus(str, Enum): UPDATING = "UPDATING" HEALTHY = "HEALTHY" UNHEALTHY = "UNHEALTHY" DEPLOY_FAILED = "DEPLOY_FAILED" UPSCALING = "UPSCALING" DOWNSCALING = "DOWNSCALING" class DeploymentStatusTrigger(str, Enum): UNSPECIFIED = "UNSPECIFIED" CONFIG_UPDATE_STARTED = "CONFIG_UPDATE_STARTED" CONFIG_UPDATE_COMPLETED = "CONFIG_UPDATE_COMPLETED" UPSCALE_COMPLETED = "UPSCALE_COMPLETED" DOWNSCALE_COMPLETED = "DOWNSCALE_COMPLETED" AUTOSCALING = "AUTOSCALING" REPLICA_STARTUP_FAILED = "REPLICA_STARTUP_FAILED" HEALTH_CHECK_FAILED = "HEALTH_CHECK_FAILED" INTERNAL_ERROR = "INTERNAL_ERROR" DELETING = "DELETING" # Internal Enum used to manage deployment state machine class DeploymentStatusInternalTrigger(str, Enum): HEALTHY = "HEALTHY" CONFIG_UPDATE = "CONFIG_UPDATE" AUTOSCALE_UP = "AUTOSCALE_UP" AUTOSCALE_DOWN = "AUTOSCALE_DOWN" MANUALLY_INCREASE_NUM_REPLICAS = "MANUALLY_INCREASE_NUM_REPLICAS" MANUALLY_DECREASE_NUM_REPLICAS = "MANUALLY_DECREASE_NUM_REPLICAS" REPLICA_STARTUP_FAILED = "REPLICA_STARTUP_FAILED" HEALTH_CHECK_FAILED = "HEALTH_CHECK_FAILED" INTERNAL_ERROR = "INTERNAL_ERROR" DELETE = "DELETE" # List of states in ranked order. # # Each ranked state has the format of a tuple with either 1 or 2 items. # If 1 item: contains a single DeploymentStatus, representing states with # that DeploymentStatus and any DeploymentStatusTrigger. # If 2 items: tuple contains a DeploymentStatus and a DeploymentStatusTrigger, # representing a state with that status and status trigger. DEPLOYMENT_STATUS_RANKING_ORDER = { # Status ranking order is defined in a following fashion: # 0. (Highest) State signaling a deploy failure. (DeploymentStatus.DEPLOY_FAILED,): 0, # 1. State signaling any non-deploy failures in the system. (DeploymentStatus.UNHEALTHY,): 1, # 2. States signaling the user updated the configuration. (DeploymentStatus.UPDATING,): 2, (DeploymentStatus.UPSCALING, DeploymentStatusTrigger.CONFIG_UPDATE_STARTED): 2, ( DeploymentStatus.DOWNSCALING, DeploymentStatusTrigger.CONFIG_UPDATE_STARTED, ): 2, # 3. Steady state or autoscaling. (DeploymentStatus.UPSCALING, DeploymentStatusTrigger.AUTOSCALING): 3, (DeploymentStatus.DOWNSCALING, DeploymentStatusTrigger.AUTOSCALING): 3, (DeploymentStatus.HEALTHY,): 3, } @dataclass(eq=True) class DeploymentStatusInfo: name: str status: DeploymentStatus status_trigger: DeploymentStatusTrigger message: str = "" @property def rank(self) -> int: """Get priority of state based on ranking_order(). The ranked order indicates what the status should be of a hierarchically "higher" resource when derived from a group of `DeploymentStatusInfo` sub-resources. """ if (self.status,) in DEPLOYMENT_STATUS_RANKING_ORDER: return DEPLOYMENT_STATUS_RANKING_ORDER[(self.status,)] elif (self.status, self.status_trigger) in DEPLOYMENT_STATUS_RANKING_ORDER: return DEPLOYMENT_STATUS_RANKING_ORDER[(self.status, self.status_trigger)] def debug_string(self): return json.dumps(asdict(self), indent=4) def _updated_copy( self, status: DeploymentStatus = None, status_trigger: DeploymentStatusTrigger = None, message: str = "", ): """Returns a copy of the current object with the passed in kwargs updated.""" return DeploymentStatusInfo( name=self.name, status=status if status else self.status, status_trigger=status_trigger if status_trigger else self.status_trigger, message=message, ) def update_message(self, message: str): return self._updated_copy(message=message) def handle_transition( self, trigger: DeploymentStatusInternalTrigger, message: str = "", ) -> "DeploymentStatusInfo": """Handles a transition from the current state to the next state. Args: trigger: An internal trigger that determines the state transition. message: The message to set in status info. Returns: New instance of DeploymentStatusInfo representing the next state to transition to. """ # If there was an unexpected internal error during reconciliation, set # status to unhealthy immediately and return if trigger == DeploymentStatusInternalTrigger.INTERNAL_ERROR: return self._updated_copy( status=DeploymentStatus.UNHEALTHY, status_trigger=DeploymentStatusTrigger.INTERNAL_ERROR, message=message, ) # If deployment is being deleted, set status immediately and return elif trigger == DeploymentStatusInternalTrigger.DELETE: return self._updated_copy( status=DeploymentStatus.UPDATING, status_trigger=DeploymentStatusTrigger.DELETING, message=message, ) # Otherwise, go through normal state machine transitions elif self.status == DeploymentStatus.UPDATING: # Finished updating configuration and transition to healthy if trigger == DeploymentStatusInternalTrigger.HEALTHY: return self._updated_copy( status=DeploymentStatus.HEALTHY, status_trigger=DeploymentStatusTrigger.CONFIG_UPDATE_COMPLETED, message=message, ) # A new configuration has been deployed before deployment # has finished updating elif trigger == DeploymentStatusInternalTrigger.CONFIG_UPDATE: return self._updated_copy( status=DeploymentStatus.UPDATING, status_trigger=DeploymentStatusTrigger.CONFIG_UPDATE_STARTED, message=message, ) # Autoscaling. elif trigger == DeploymentStatusInternalTrigger.AUTOSCALE_UP: return self._updated_copy( status=DeploymentStatus.UPSCALING, status_trigger=DeploymentStatusTrigger.AUTOSCALING, message=message, ) elif trigger == DeploymentStatusInternalTrigger.AUTOSCALE_DOWN: return self._updated_copy( status=DeploymentStatus.DOWNSCALING, status_trigger=DeploymentStatusTrigger.AUTOSCALING, message=message, ) # Manually increasing or decreasing num replicas does not # change the status while deployment is still updating. elif trigger in { DeploymentStatusInternalTrigger.MANUALLY_INCREASE_NUM_REPLICAS, DeploymentStatusInternalTrigger.MANUALLY_DECREASE_NUM_REPLICAS, }: return self # Failures occurred while a deployment was being updated elif trigger == DeploymentStatusInternalTrigger.HEALTH_CHECK_FAILED: return self._updated_copy( status=DeploymentStatus.DEPLOY_FAILED, status_trigger=DeploymentStatusTrigger.HEALTH_CHECK_FAILED, message=message, ) elif trigger == DeploymentStatusInternalTrigger.REPLICA_STARTUP_FAILED: return self._updated_copy( status=DeploymentStatus.DEPLOY_FAILED, status_trigger=DeploymentStatusTrigger.REPLICA_STARTUP_FAILED, message=message, ) elif self.status in {DeploymentStatus.UPSCALING, DeploymentStatus.DOWNSCALING}: # Deployment transitions to healthy if trigger == DeploymentStatusInternalTrigger.HEALTHY: return self._updated_copy( status=DeploymentStatus.HEALTHY, status_trigger=DeploymentStatusTrigger.UPSCALE_COMPLETED if self.status == DeploymentStatus.UPSCALING else DeploymentStatusTrigger.DOWNSCALE_COMPLETED, message=message, ) # Configuration is updated before scaling is finished elif trigger == DeploymentStatusInternalTrigger.CONFIG_UPDATE: return self._updated_copy( status=DeploymentStatus.UPDATING, status_trigger=DeploymentStatusTrigger.CONFIG_UPDATE_STARTED, message=message, ) # Upscale replicas before previous upscaling/downscaling has finished elif ( self.status_trigger == DeploymentStatusTrigger.AUTOSCALING and trigger == DeploymentStatusInternalTrigger.AUTOSCALE_UP ) or ( self.status_trigger == DeploymentStatusTrigger.CONFIG_UPDATE_STARTED and trigger == DeploymentStatusInternalTrigger.MANUALLY_INCREASE_NUM_REPLICAS ): return self._updated_copy( status=DeploymentStatus.UPSCALING, message=message ) # Downscale replicas before previous upscaling/downscaling has finished elif ( self.status_trigger == DeploymentStatusTrigger.AUTOSCALING and trigger == DeploymentStatusInternalTrigger.AUTOSCALE_DOWN ) or ( self.status_trigger == DeploymentStatusTrigger.CONFIG_UPDATE_STARTED and trigger == DeploymentStatusInternalTrigger.MANUALLY_DECREASE_NUM_REPLICAS ): return self._updated_copy( status=DeploymentStatus.DOWNSCALING, message=message ) # Failures occurred while upscaling/downscaling elif trigger == DeploymentStatusInternalTrigger.HEALTH_CHECK_FAILED: return self._updated_copy( status=DeploymentStatus.UNHEALTHY, status_trigger=DeploymentStatusTrigger.HEALTH_CHECK_FAILED, message=message, ) elif trigger == DeploymentStatusInternalTrigger.REPLICA_STARTUP_FAILED: return self._updated_copy( status=DeploymentStatus.UNHEALTHY, status_trigger=DeploymentStatusTrigger.REPLICA_STARTUP_FAILED, message=message, ) elif self.status == DeploymentStatus.HEALTHY: # Deployment remains healthy if trigger == DeploymentStatusInternalTrigger.HEALTHY: return self # New configuration is deployed elif trigger == DeploymentStatusInternalTrigger.CONFIG_UPDATE: return self._updated_copy( status=DeploymentStatus.UPDATING, status_trigger=DeploymentStatusTrigger.CONFIG_UPDATE_STARTED, message=message, ) # Manually scaling / autoscaling num replicas elif ( trigger == DeploymentStatusInternalTrigger.MANUALLY_INCREASE_NUM_REPLICAS ): return self._updated_copy( status=DeploymentStatus.UPSCALING, status_trigger=DeploymentStatusTrigger.CONFIG_UPDATE_STARTED, message=message, ) elif ( trigger == DeploymentStatusInternalTrigger.MANUALLY_DECREASE_NUM_REPLICAS ): return self._updated_copy( status=DeploymentStatus.DOWNSCALING, status_trigger=DeploymentStatusTrigger.CONFIG_UPDATE_STARTED, message=message, ) elif trigger == DeploymentStatusInternalTrigger.AUTOSCALE_UP: return self._updated_copy( status=DeploymentStatus.UPSCALING, status_trigger=DeploymentStatusTrigger.AUTOSCALING, message=message, ) elif trigger == DeploymentStatusInternalTrigger.AUTOSCALE_DOWN: return self._updated_copy( status=DeploymentStatus.DOWNSCALING, status_trigger=DeploymentStatusTrigger.AUTOSCALING, message=message, ) # Health check for one or more replicas has failed elif trigger == DeploymentStatusInternalTrigger.HEALTH_CHECK_FAILED: return self._updated_copy( status=DeploymentStatus.UNHEALTHY, status_trigger=DeploymentStatusTrigger.HEALTH_CHECK_FAILED, message=message, ) elif self.status == DeploymentStatus.UNHEALTHY: # The deployment recovered if trigger == DeploymentStatusInternalTrigger.HEALTHY: return self._updated_copy( status=DeploymentStatus.HEALTHY, status_trigger=DeploymentStatusTrigger.UNSPECIFIED, message=message, ) # A new configuration is being deployed. elif trigger == DeploymentStatusInternalTrigger.CONFIG_UPDATE: return self._updated_copy( status=DeploymentStatus.UPDATING, status_trigger=DeploymentStatusTrigger.CONFIG_UPDATE_STARTED, message=message, ) # Old failures keep getting triggered, or new failures occurred. elif trigger == DeploymentStatusInternalTrigger.HEALTH_CHECK_FAILED: return self._updated_copy( status=DeploymentStatus.UNHEALTHY, status_trigger=DeploymentStatusTrigger.HEALTH_CHECK_FAILED, message=message, ) elif trigger == DeploymentStatusInternalTrigger.REPLICA_STARTUP_FAILED: return self._updated_copy( status=DeploymentStatus.UNHEALTHY, status_trigger=DeploymentStatusTrigger.REPLICA_STARTUP_FAILED, message=message, ) elif self.status == DeploymentStatus.DEPLOY_FAILED: # The deployment recovered if trigger == DeploymentStatusInternalTrigger.HEALTHY: return self._updated_copy( status=DeploymentStatus.HEALTHY, status_trigger=DeploymentStatusTrigger.UNSPECIFIED, message=message, ) # A new configuration is being deployed. elif trigger == DeploymentStatusInternalTrigger.CONFIG_UPDATE: return self._updated_copy( status=DeploymentStatus.UPDATING, status_trigger=DeploymentStatusTrigger.CONFIG_UPDATE_STARTED, message=message, ) # Old failures keep getting triggered, or new failures occurred. elif trigger == DeploymentStatusInternalTrigger.HEALTH_CHECK_FAILED: return self._updated_copy( status=DeploymentStatus.DEPLOY_FAILED, status_trigger=DeploymentStatusTrigger.HEALTH_CHECK_FAILED, message=message, ) elif trigger == DeploymentStatusInternalTrigger.REPLICA_STARTUP_FAILED: return self._updated_copy( status=DeploymentStatus.DEPLOY_FAILED, status_trigger=DeploymentStatusTrigger.REPLICA_STARTUP_FAILED, message=message, ) # If it's any other transition, ignore it. return self def to_proto(self): return DeploymentStatusInfoProto( name=self.name, status=f"DEPLOYMENT_STATUS_{self.status.name}", status_trigger=f"DEPLOYMENT_STATUS_TRIGGER_{self.status_trigger.name}", message=self.message, ) @classmethod def from_proto(cls, proto: DeploymentStatusInfoProto): status = DeploymentStatusProto.Name(proto.status)[len("DEPLOYMENT_STATUS_") :] status_trigger = DeploymentStatusTriggerProto.Name(proto.status_trigger)[ len("DEPLOYMENT_STATUS_TRIGGER_") : ] return cls( name=proto.name, status=DeploymentStatus(status), status_trigger=DeploymentStatusTrigger(status_trigger), message=proto.message, ) @dataclass(frozen=True) class RunningReplicaInfo: replica_id: ReplicaID node_id: Optional[str] node_ip: Optional[str] availability_zone: Optional[str] actor_handle: ActorHandle max_ongoing_requests: int is_cross_language: bool = False multiplexed_model_ids: List[str] = field(default_factory=list) routing_stats: Dict[str, Any] = field(default_factory=dict) port: Optional[int] = None def __post_init__(self): # Set hash value when object is constructed. # We use _actor_id to hash the ActorHandle object # instead of actor_handle itself to make sure # it is consistently same actor handle between different # object ids. hash_val = hash( " ".join( [ self.replica_id.to_full_id_str(), self.node_id if self.node_id else "", str(self.actor_handle._actor_id), str(self.max_ongoing_requests), str(self.is_cross_language), str(self.multiplexed_model_ids), str(self.routing_stats), ] ) ) # RunningReplicaInfo class set frozen=True, this is the hacky way to set # new attribute for the class. object.__setattr__(self, "_hash", hash_val) def __hash__(self): return self._hash def __eq__(self, other): return all( [ isinstance(other, RunningReplicaInfo), self._hash == other._hash, ] ) @dataclass(frozen=True) class DeploymentTargetInfo: is_available: bool running_replicas: List[RunningReplicaInfo] class ServeDeployMode(str, Enum): MULTI_APP = "MULTI_APP" class ServeComponentType(str, Enum): REPLICA = "replica" @dataclass class RequestRoutingInfo: """Information about the request routing. It includes deployment name (from ReplicaID), replica tag (from ReplicaID), multiplex model ids, and routing stats. """ replica_id: ReplicaID multiplexed_model_ids: Optional[List[str]] = None routing_stats: Optional[Dict[str, Any]] = None @dataclass class gRPCRequest: """Sent from the GRPC proxy to replicas on both unary and streaming codepaths.""" user_request_proto: Any class RequestProtocol(str, Enum): UNDEFINED = "UNDEFINED" HTTP = "HTTP" GRPC = "gRPC" class DeploymentHandleSource(str, Enum): UNKNOWN = "UNKNOWN" PROXY = "PROXY" REPLICA = "REPLICA" @dataclass class RequestMetadata: # request_id can be passed by the client and is only generated by the proxy if the # client did not pass it in the headers. It is used for logging across different # system. We can not guarantee the uniqueness of its value. request_id: str # internal_request_id is always generated by the proxy and is used for tracking # request objects. We can assume this is always unique between requests. internal_request_id: str # Method of the user callable to execute. call_method: str = "__call__" # HTTP route path of the request. route: str = "" # Application name. app_name: str = "" # Multiplexed model ID. multiplexed_model_id: str = "" # If this request expects a streaming response. is_streaming: bool = False # The protocol to serve this request _request_protocol: RequestProtocol = RequestProtocol.UNDEFINED # Serve's gRPC context associated with this request for getting and setting metadata grpc_context: Optional[RayServegRPCContext] = None _by_reference: bool = True @property def is_http_request(self) -> bool: return self._request_protocol == RequestProtocol.HTTP @property def is_grpc_request(self) -> bool: return self._request_protocol == RequestProtocol.GRPC class StreamingHTTPRequest: """Sent from the HTTP proxy to replicas on the streaming codepath.""" def __init__( self, asgi_scope: Scope, *, proxy_actor_name: Optional[str] = None, receive_asgi_messages: Optional[ Callable[[RequestMetadata], Awaitable[bytes]] ] = None, ): self._asgi_scope: Scope = asgi_scope if proxy_actor_name is None and receive_asgi_messages is None: raise ValueError( "Either proxy_actor_name or receive_asgi_messages must be provided." ) # If receive_asgi_messages is passed, it'll be called directly. # If proxy_actor_name is passed, the actor will be fetched and its # `receive_asgi_messages` method will be called. self._proxy_actor_name: Optional[str] = proxy_actor_name # Need to keep the actor handle cached to avoid "lost reference to actor" error. self._cached_proxy_actor: Optional[ActorHandle] = None self._receive_asgi_messages: Optional[ Callable[[RequestMetadata], Awaitable[bytes]] ] = receive_asgi_messages @property def asgi_scope(self) -> Scope: return self._asgi_scope @property def receive_asgi_messages(self) -> Callable[[RequestMetadata], Awaitable[bytes]]: if self._receive_asgi_messages is None: self._cached_proxy_actor = ray.get_actor( self._proxy_actor_name, namespace=SERVE_NAMESPACE ) self._receive_asgi_messages = ( self._cached_proxy_actor.receive_asgi_messages.remote ) return self._receive_asgi_messages class TargetCapacityDirection(str, Enum): """Determines what direction the target capacity is scaling.""" UP = "UP" DOWN = "DOWN" @dataclass(frozen=True) class ReplicaQueueLengthInfo: accepted: bool num_ongoing_requests: int @dataclass(frozen=True) class CreatePlacementGroupRequest: bundles: List[Dict[str, float]] strategy: str target_node_id: str name: str runtime_env: Optional[str] = None