ray.serve.batch(_sync_func: Callable[[List[T]], List[R]], /) -> Callable[[T], R]
ray.serve.batch(_async_func: Callable[[List[T]], Coroutine[Any, Any, List[R]]], /) -> Callable[[T], Coroutine[Any, Any, R]]
ray.serve.batch(_sync_meth: _SyncBatchingMethod[SelfType, T, R], /) -> Callable[[SelfType, T], R]
ray.serve.batch(_async_meth: _AsyncBatchingMethod[SelfType, T, R], /) -> Callable[[SelfType, T], Coroutine[Any, Any, R]]
ray.serve.batch(_: Literal[None] = None, /, max_batch_size: int = 10, batch_wait_timeout_s: float = 0.0) -> _BatchDecorator

Converts a function to asynchronously handle batches.

The function can be a standalone function or a class method and, as the overloads above show, either synchronous or asynchronous. In all cases, it must take a list of objects as its sole argument and return a list of the same length as a result.

When invoked, the caller passes a single object. Calls are buffered and executed asynchronously as a batch once max_batch_size objects have accumulated or batch_wait_timeout_s has elapsed, whichever occurs first.

max_batch_size and batch_wait_timeout_s can be updated using setter methods from the batch_handler (set_max_batch_size and set_batch_wait_timeout_s).


from typing import List

from ray import serve
from starlette.requests import Request

@serve.deployment
class BatchedDeployment:
    @serve.batch(max_batch_size=10, batch_wait_timeout_s=0.1)
    async def batch_handler(self, requests: List[Request]) -> List[str]:
        response_batch = []
        for r in requests:
            name = (await r.json())["name"]
            response_batch.append(f"Hello {name}!")

        return response_batch

    def update_batch_params(self, max_batch_size, batch_wait_timeout_s):
        self.batch_handler.set_max_batch_size(max_batch_size)
        self.batch_handler.set_batch_wait_timeout_s(batch_wait_timeout_s)

    async def __call__(self, request: Request):
        return await self.batch_handler(request)

app = BatchedDeployment.bind()
Parameters:

  • max_batch_size – the maximum batch size that will be executed in one call to the underlying function.

  • batch_wait_timeout_s – the maximum duration to wait for max_batch_size elements before running the current batch.
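The flush-on-size-or-timeout rule described above can be modeled with plain asyncio. The following is a simplified, single-process sketch of the semantics, not Ray's implementation; the Batcher class and its internals are illustrative names:

```python
import asyncio
from typing import Callable, List


class Batcher:
    """Toy model of the serve.batch trigger rule: flush a pending batch
    once it reaches max_batch_size, or once batch_wait_timeout_s elapses
    after the first item arrives, whichever comes first."""

    def __init__(self, handler: Callable[[List[str]], List[str]],
                 max_batch_size: int = 10, batch_wait_timeout_s: float = 0.0):
        self.handler = handler
        self.max_batch_size = max_batch_size
        self.batch_wait_timeout_s = batch_wait_timeout_s
        self._queue = []  # pending (item, future) pairs
        self._flush_timer = None

    async def __call__(self, item):
        # Each caller passes a single object and awaits its own result.
        fut = asyncio.get_running_loop().create_future()
        self._queue.append((item, fut))
        if len(self._queue) >= self.max_batch_size:
            self._flush()  # size trigger
        elif self._flush_timer is None:
            # Timeout trigger: arm a timer when the first item arrives.
            self._flush_timer = asyncio.get_running_loop().call_later(
                self.batch_wait_timeout_s, self._flush)
        return await fut

    def _flush(self):
        if self._flush_timer is not None:
            self._flush_timer.cancel()
            self._flush_timer = None
        batch, self._queue = self._queue, []
        if not batch:
            return
        # Run the handler on the whole batch and fan results back out.
        results = self.handler([item for item, _ in batch])
        for (_, fut), result in zip(batch, results):
            fut.set_result(result)


async def main():
    batcher = Batcher(lambda names: [f"Hello {n}!" for n in names],
                      max_batch_size=3, batch_wait_timeout_s=0.05)
    # Three concurrent calls fill a batch immediately; a lone fourth call
    # waits out the timeout and is flushed as a batch of one.
    triple = await asyncio.gather(batcher("a"), batcher("b"), batcher("c"))
    single = await batcher("d")
    return triple, single


triple, single = asyncio.run(main())
print(triple, single)
```

Each caller still sees a plain single-object call signature; the batching is invisible at the call site, which is the same property the decorator provides in the `__call__` method of the deployment example above.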