ray.serve.batch#

ray.serve.batch(func: ray.serve.batching.F) ray.serve.batching.G[source]#
ray.serve.batch(max_batch_size: int = 10, batch_wait_timeout_s: float = 0.0) Callable[[ray.serve.batching.F], ray.serve.batching.G]

Converts a function to asynchronously handle batches.

The function can be a standalone function or a class method. In both cases, the function must be async def and take a list of objects as its sole argument and return a list of the same length as a result.

When invoked, the caller passes a single object. These will be batched and executed asynchronously once there is a batch of max_batch_size or batch_wait_timeout_s has elapsed, whichever occurs first.

max_batch_size and batch_wait_timeout_s can be updated using setter methods from the batch_handler (set_max_batch_size and set_batch_wait_timeout_s).

Example:

from ray import serve
from starlette.requests import Request

@serve.deployment
class BatchedDeployment:
    @serve.batch(max_batch_size=10, batch_wait_timeout_s=0.1)
    async def batch_handler(self, requests: List[Request]) -> List[str]:
        response_batch = []
        for r in requests:
            name = (await requests.json())["name"]
            response_batch.append(f"Hello {name}!")

        return response_batch

    def update_batch_params(self, max_batch_size, batch_wait_timeout_s):
        self.batch_handler.set_max_batch_size(max_batch_size)
        self.batch_handler.set_batch_wait_timeout_s(batch_wait_timeout_s)

    async def __call__(self, request: Request):
        return await self.batch_handler(request)

app = BatchedDeployment.bind()
Parameters
  • max_batch_size – the maximum batch size that will be executed in one call to the underlying function.

  • batch_wait_timeout_s – the maximum duration to wait for max_batch_size elements before running the current batch.

  • batch_queue_cls – the class to use for the underlying batch queue.

PublicAPI: This API is stable across Ray releases.