Source code for ray.serve.exceptions
from typing import Optional
import grpc
from ray.exceptions import TaskCancelledError
from ray.serve._private.common import DeploymentID
from ray.util.annotations import PublicAPI
[docs]
@PublicAPI(stability="stable")
class RayServeException(Exception):
pass
[docs]
@PublicAPI(stability="stable")
class gRPCStatusError(RayServeException):
"""Internal exception that wraps an exception with user-set gRPC status code.
This is used to preserve user-set gRPC status codes when exceptions are raised
in deployments. When a user sets a status code on the gRPC context before raising
an exception, this wrapper carries that status code through the error handling
path so the proxy can return the user's intended status code instead of INTERNAL.
"""
def __init__(
self,
original_exception: BaseException,
code: Optional[grpc.StatusCode] = None,
details: Optional[str] = None,
):
# Store attributes with underscore prefix to avoid conflicts with
# Ray's exception handling (Ray uses 'cause' internally).
self._original_exception = original_exception
self._grpc_code = code
self._grpc_details = details
super().__init__(str(original_exception))
@property
def original_exception(self) -> BaseException:
"""The original exception that was raised."""
return self._original_exception
@property
def grpc_code(self) -> Optional[grpc.StatusCode]:
"""The user-set gRPC status code, if any."""
return self._grpc_code
@property
def grpc_details(self) -> Optional[str]:
"""The user-set gRPC status details, if any."""
return self._grpc_details
def __str__(self) -> str:
return str(self._original_exception)
[docs]
@PublicAPI(stability="alpha")
class BackPressureError(RayServeException):
"""Raised when max_queued_requests is exceeded on a DeploymentHandle."""
def __init__(self, num_queued_requests: int, max_queued_requests: int):
super().__init__(num_queued_requests, max_queued_requests)
self._message = (
f"Request dropped due to backpressure "
f"(num_queued_requests={num_queued_requests}, "
f"max_queued_requests={max_queued_requests})."
)
def __str__(self) -> str:
return self._message
@property
def message(self) -> str:
return self._message
[docs]
@PublicAPI(stability="alpha")
class RequestCancelledError(RayServeException, TaskCancelledError):
"""Raise when a Serve request is cancelled."""
def __init__(self, request_id: Optional[str] = None):
self._request_id: Optional[str] = request_id
def __str__(self):
if self._request_id:
return f"Request {self._request_id} was cancelled."
else:
return "Request was cancelled."
[docs]
@PublicAPI(stability="alpha")
class DeploymentUnavailableError(RayServeException):
"""Raised when a Serve deployment is unavailable to receive requests.
Currently this happens because the deployment failed to deploy.
"""
def __init__(self, deployment_id: DeploymentID):
self._deployment_id = deployment_id
@property
def message(self) -> str:
return f"{self._deployment_id} is unavailable because it failed to deploy."