Batching Tutorial

In this guide, we will deploy a simple vectorized adder that takes a batch of queries and adds 1 to each of them in a single vectorized operation. In particular, we show:

  • How to implement and deploy a Ray Serve deployment that accepts batches.

  • How to configure the batch size.

  • How to query the model in Python.

This tutorial should help the following use cases:

  • You want to perform offline batch inference on a cluster of machines.

  • You want to serve online queries and your model can take advantage of batching. For example, linear regressions and neural networks use the vectorized instructions of CPUs and GPUs to perform computation in parallel. Performing inference with batching can increase both the throughput of the model and the utilization of the hardware.

Let’s import Ray Serve and some other helpers.

from typing import List
import time

import numpy as np
import requests
from starlette.requests import Request

import ray
from ray import serve

You can use the @serve.batch decorator to annotate a function or a method. This annotation automatically causes calls to the function to be batched together. The function must accept a list of objects and return a list of results of the same length, but each caller invokes it with a single object. The function must also be async def so that multiple queries can be handled concurrently:

@serve.batch
async def my_batch_handler(self, requests: List): ...

This batch handler can then be called from another async def method in your deployment. These calls will be batched and executed together, but return an individual result as if they were a normal function call:

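@serve.deployment
class MyBackend:
    @serve.batch
    async def my_batch_handler(self, requests: List):
        # Receives the whole batch; returns one result per request, in order.
        results = []
        for request in requests:
            # Placeholder: echo the request. Real per-request work goes here.
            results.append(request)
        return results

    async def __call__(self, request):
        # Called with a single request; concurrent calls are batched together.
        return await self.my_batch_handler(request)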


By default, Ray Serve performs opportunistic batching. This means that as soon as the batch handler is called, the method will be executed without waiting for a full batch. If there are more queries available after this call finishes, a larger batch may be executed. This behavior can be tuned using the batch_wait_timeout_s option to @serve.batch (defaults to 0). Increasing this timeout may improve throughput at the cost of latency under low load.
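The batching behavior described above can be imitated with plain asyncio, which may make the trade-off easier to see without a running cluster. The following is only a sketch: ToyBatcher, submit, add_one, and demo are hypothetical names invented for illustration, and the code is not how Ray Serve implements @serve.batch. It assumes a single worker that takes whatever queries are already queued (up to max_batch_size) and, when batch_wait_timeout_s is positive, waits that long for more to arrive.

```python
import asyncio
from typing import Any, Awaitable, Callable, List, Optional


class ToyBatcher:
    """Toy stand-in for @serve.batch (hypothetical; not Ray Serve's code)."""

    def __init__(
        self,
        handler: Callable[[List[Any]], Awaitable[List[Any]]],
        max_batch_size: int = 4,
        batch_wait_timeout_s: float = 0.0,
    ) -> None:
        self._handler = handler
        self._max_batch_size = max_batch_size
        self._timeout_s = batch_wait_timeout_s
        self._queue: Optional[asyncio.Queue] = None
        self._worker: Optional[asyncio.Task] = None
        self.batch_sizes: List[int] = []  # recorded only for inspection

    async def submit(self, item: Any) -> Any:
        """Called with ONE item; returns that item's own result."""
        if self._queue is None:
            # Created lazily so the queue belongs to the running event loop.
            self._queue = asyncio.Queue()
            self._worker = asyncio.ensure_future(self._run())
        future = asyncio.get_running_loop().create_future()
        self._queue.put_nowait((item, future))
        return await future

    async def _next_batch(self) -> list:
        loop = asyncio.get_running_loop()
        batch = [await self._queue.get()]  # block until one query exists
        deadline = loop.time() + self._timeout_s
        while len(batch) < self._max_batch_size:
            remaining = deadline - loop.time()
            try:
                if remaining > 0:
                    # Trade latency for a fuller batch: wait for stragglers.
                    batch.append(
                        await asyncio.wait_for(self._queue.get(), remaining)
                    )
                else:
                    # Opportunistic: take only what is already queued.
                    batch.append(self._queue.get_nowait())
            except (asyncio.TimeoutError, asyncio.QueueEmpty):
                break
        return batch

    async def _run(self) -> None:
        while True:
            batch = await self._next_batch()
            self.batch_sizes.append(len(batch))
            try:
                results = await self._handler([item for item, _ in batch])
                for (_, future), result in zip(batch, results):
                    future.set_result(result)
            except Exception as exc:
                # Propagate a handler failure to every caller in the batch.
                for _, future in batch:
                    if not future.done():
                        future.set_exception(exc)


async def add_one(numbers: List[int]) -> List[int]:
    # The handler sees the whole batch as a list.
    return [n + 1 for n in numbers]


async def demo(num_queries: int = 9, max_batch_size: int = 4):
    batcher = ToyBatcher(add_one, max_batch_size=max_batch_size)
    results = await asyncio.gather(
        *(batcher.submit(i) for i in range(num_queries))
    )
    return list(results), batcher.batch_sizes


results, batch_sizes = asyncio.run(demo())
print("Results:", results)
print("Batch sizes:", batch_sizes)
```

Each caller passes one number and gets one number back, while add_one only ever sees lists. In this toy model, raising batch_wait_timeout_s makes the worker hold a partial batch open for late arrivals, which is the same throughput-versus-latency trade-off the option controls in Serve.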

Let’s define a deployment that extracts the input value from each request, collects the batched values into a NumPy array, and adds 1 to each element.

@serve.deployment(route_prefix="/adder")
class BatchAdder:
    @serve.batch(max_batch_size=4)
    async def handle_batch(self, numbers: List[int]):
        input_array = np.array(numbers)
        print("Our input array has shape:", input_array.shape)
        # Sleep for 200ms to stand in for the CPU-intensive computation
        # a real model would perform.
        time.sleep(0.2)
        output_array = input_array + 1
        return output_array.astype(int).tolist()

    async def __call__(self, request: Request):
        # Pass a single number; Serve gathers concurrent calls into a list.
        return await self.handle_batch(int(request.query_params["number"]))
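To check the vectorized step on its own, the NumPy part of handle_batch can be run as a plain function outside Serve. This is a small sketch; add_one_to_batch is a hypothetical helper name used only for illustration. It shows the contract a batch handler has to keep: one output per input, in the same order, returned as ordinary Python integers.

```python
from typing import List

import numpy as np


def add_one_to_batch(numbers: List[int]) -> List[int]:
    """Add 1 to every element of the batch in one vectorized operation."""
    input_array = np.array(numbers)
    output_array = input_array + 1
    # tolist() converts NumPy integers back to plain Python ints.
    return output_array.astype(int).tolist()


batch = [0, 1, 2, 3]
print("Input batch:", batch)
print("Output batch:", add_one_to_batch(batch))
```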

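Let’s deploy it. In the API version used throughout this tutorial (the one that provides Deployment.get_handle), that means calling serve.start() followed by BatchAdder.deploy(). Note that in the @serve.batch decorator, we are specifying the maximum batch size via max_batch_size=4. This option limits the maximum possible batch size that will be executed at once.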


Let’s define a Ray remote task to send queries in parallel. As the output below shows, the first batch has a size of 1 and the subsequent batches have a size of 4. Even though each query is issued independently, Ray Serve evaluates them in batches.

@ray.remote
def send_query(number):
    resp = requests.get("http://localhost:8000/adder?number={}".format(number))
    return int(resp.text)

# Let's use Ray to send all queries in parallel
results = ray.get([send_query.remote(i) for i in range(9)])
print("Result returned:", results)
# Output
# (pid=...) Our input array has shape: (1,)
# (pid=...) Our input array has shape: (4,)
# (pid=...) Our input array has shape: (4,)
# Result returned: [1, 2, 3, 4, 5, 6, 7, 8, 9]

What if you want to evaluate a whole batch in Python? Ray Serve allows you to send queries via the Python API. A batch of queries can come from either the web server or the Python API.

To query the deployment via the Python API, we can use Deployment.get_handle to receive a handle to the corresponding deployment. To enqueue a query, you can call handle.method.remote(data). This call returns immediately with a Ray ObjectRef. You can call ray.get to retrieve the result.

handle = BatchAdder.get_handle()
input_batch = list(range(9))
print("Input batch is", input_batch)
# Input batch is [0, 1, 2, 3, 4, 5, 6, 7, 8]

result_batch = ray.get([handle.handle_batch.remote(i) for i in input_batch])
# Output
# (pid=...) Our input array has shape: (1,)
# (pid=...) Our input array has shape: (4,)
# (pid=...) Our input array has shape: (4,)

print("Result batch is", result_batch)
# Result batch is [1, 2, 3, 4, 5, 6, 7, 8, 9]