Use Custom Algorithm for Request Routing#

Warning

This API is in alpha and may change before becoming stable.

Different Ray Serve applications demand different load-balancing logic. For example, when serving LLMs you might want a policy other than balancing the number of requests across replicas: for example, balancing ongoing input tokens or balancing KV-cache utilization. RequestRouter is an abstraction in Ray Serve that lets you extend and customize the load-balancing logic for each deployment.

This guide shows how to use the RequestRouter API to implement custom load balancing across the replicas of a deployment. It covers the following:

  • Define a simple uniform request router for load balancing

  • Deploy an app with the uniform request router

  • Utility mixins for request routing

  • Define a complex throughput-aware request router

  • Deploy an app with the throughput-aware request router

Define a simple uniform request router#

Create a file custom_request_router.py with the following code:

import random
from ray.serve.request_router import (
    PendingRequest,
    RequestRouter,
    ReplicaID,
    ReplicaResult,
    RunningReplica,
)
from typing import (
    List,
    Optional,
)


class UniformRequestRouter(RequestRouter):
    async def choose_replicas(
        self,
        candidate_replicas: List[RunningReplica],
        pending_request: Optional[PendingRequest] = None,
    ) -> List[List[RunningReplica]]:
        print("UniformRequestRouter routing request")
        # Pick one replica uniformly at random; returning a single inner
        # list means one rank containing one replica.
        index = random.randint(0, len(candidate_replicas) - 1)
        return [[candidate_replicas[index]]]

    def on_request_routed(
        self,
        pending_request: PendingRequest,
        replica_id: ReplicaID,
        result: ReplicaResult,
    ):
        print("on_request_routed callback is called!!")


This code defines a simple uniform request router that routes each request to a random replica, distributing load evenly regardless of each replica's queue length or the body of the request. The router is a class that inherits from RequestRouter and implements the choose_replicas method, which returns a random replica for every incoming request. The return type is a list of lists of replicas, where each inner list represents a rank of replicas: the first rank is the most preferred and the last rank is the least preferred. Ray Serve attempts to route the request to the replica with the shortest request queue within each rank, in order, until a replica can process the request. If none of the replicas can process the request, Ray Serve calls choose_replicas again with a backoff delay until a replica becomes available.
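
For example, a hypothetical router (not part of this guide's files) can express a fallback preference by returning more than one rank; a minimal sketch:

from typing import List, Optional

from ray.serve.request_router import (
    PendingRequest,
    RequestRouter,
    RunningReplica,
)


class TwoRankRouter(RequestRouter):
    """Hypothetical router: prefer the first replica, fall back to the rest."""

    async def choose_replicas(
        self,
        candidate_replicas: List[RunningReplica],
        pending_request: Optional[PendingRequest] = None,
    ) -> List[List[RunningReplica]]:
        preferred = candidate_replicas[:1]  # first rank: most preferred
        fallback = candidate_replicas[1:]  # second rank: used if rank one can't accept
        return [preferred, fallback]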

Note

This request router also implements on_request_routed, which lets you update the state of the request router after a request is routed.
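
For instance, a minimal sketch that extends the UniformRequestRouter above to count how many requests each replica receives (CountingRouter and routed_counts are hypothetical names):

from collections import defaultdict


class CountingRouter(UniformRequestRouter):
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        # Router-local state, updated in on_request_routed.
        self.routed_counts = defaultdict(int)

    def on_request_routed(
        self,
        pending_request: PendingRequest,
        replica_id: ReplicaID,
        result: ReplicaResult,
    ):
        # Record the routing decision after it's finalized.
        self.routed_counts[replica_id] += 1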

Deploy an app with the uniform request router#

To use a custom request router, pass the request_router_class argument to the deployment decorator. You can pass request_router_class either as the imported class itself or as a string containing the import path to the class (a sketch of the class-object form appears at the end of this section). Deploy a simple app that uses the uniform request router like this:

from ray import serve
from ray.serve.context import _get_internal_replica_context
from ray.serve.request_router import ReplicaID


@serve.deployment(
    request_router_class="custom_request_router:UniformRequestRouter",
    num_replicas=10,
    ray_actor_options={"num_cpus": 0},
)
class UniformRequestRouterApp:
    def __init__(self):
        context = _get_internal_replica_context()
        self.replica_id: ReplicaID = context.replica_id

    async def __call__(self):
        return self.replica_id


handle = serve.run(UniformRequestRouterApp.bind())
response = handle.remote().result()
print(f"Response from UniformRequestRouterApp: {response}")
# Example output:
#   Response from UniformRequestRouterApp:
#   Replica(id='67vc4ts5', deployment='UniformRequestRouterApp', app='default')

As the request is routed, both the “UniformRequestRouter routing request” and “on_request_routed callback is called!!” messages print to the console. Each request lands on a randomly chosen replica. You can verify this by sending more requests and checking that the distribution across replicas is roughly uniform, as the following sketch shows.
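
For example, a quick check reusing the handle from the example above (a sketch; exact counts vary from run to run):

from collections import Counter

# Send many requests and count how often each replica serves them.
responses = [handle.remote() for _ in range(200)]
counts = Counter(str(r.result()) for r in responses)
for replica, count in counts.most_common():
    print(replica, count)
# With 10 replicas and uniform routing, each count should be roughly 20.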

Note

Currently, the only way to configure the request router is to pass it as an argument to the deployment decorator. This means you can't change the request router for an existing deployment handle with a running router. If you have a use case that requires reconfiguring the request router on a deployment handle, open a feature request on the Ray GitHub repository.
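
As mentioned earlier, you can pass request_router_class as the imported class object instead of an import-path string; a minimal sketch (MyApp is a hypothetical deployment):

from ray import serve

# Import the router class from the file you created earlier.
from custom_request_router import UniformRequestRouter


@serve.deployment(request_router_class=UniformRequestRouter)
class MyApp:
    async def __call__(self):
        return "hello"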

Utility mixins for request routing#

Ray Serve provides utility mixins that can be used to extend the functionality of the request router. These mixins can be used to implement common routing policies such as locality-aware routing, multiplexed model support, and FIFO request routing.

  • FIFOMixin: This mixin implements first-in, first-out (FIFO) request routing. The default behavior of the request router is out-of-order (OOO) routing, which routes each request to the replica that choose_replicas selected for that exact pending request. This mixin is useful when your routing algorithm works independently of the request content, so requests can be routed as soon as possible in the order they were received. Including this mixin in your custom request router updates the request-matching algorithm to route requests FIFO. It requires no additional configuration and provides no additional helper methods.

  • LocalityMixin: This mixin implements locality-aware request routing. It updates internal state during replica updates to track which replicas run on the same node, in the same availability zone, and elsewhere. It offers the helpers apply_locality_routing and rank_replicas_via_locality to route and rank replicas based on their locality relative to the request, which can be useful for reducing latency and improving performance (see the sketch after this list).

  • MultiplexMixin: When you use model multiplexing, you need to route requests based on which replicas already have a hot copy of the model loaded. This mixin updates internal state during replica updates to track the models loaded on each replica and the size of each replica's model cache. It offers the helpers apply_multiplex_routing and rank_replicas_via_multiplex to route and rank replicas based on the multiplexed model ID of the request.
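
As a sketch of how the mixin helpers compose, the following hypothetical router simply prefers closer replicas by passing through the ranks that rank_replicas_via_locality produces (the exact ranking depends on your cluster topology):

from typing import List, Optional

from ray.serve.request_router import (
    FIFOMixin,
    LocalityMixin,
    PendingRequest,
    RequestRouter,
    RunningReplica,
)


class LocalityFirstRouter(FIFOMixin, LocalityMixin, RequestRouter):
    async def choose_replicas(
        self,
        candidate_replicas: List[RunningReplica],
        pending_request: Optional[PendingRequest] = None,
    ) -> List[List[RunningReplica]]:
        # Ranks are ordered from closest (same node) to farthest;
        # return them unchanged so closer replicas are tried first.
        return self.rank_replicas_via_locality(replicas=candidate_replicas)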

Define a complex throughput-aware request router#

A fully featured request router can be more complex, taking into account the multiplexed model, locality, the request queue length on each replica, and custom statistics such as throughput when deciding which replica should receive a request. The following class defines a throughput-aware request router that routes requests with these factors in mind. Add the following code to the custom_request_router.py file:

from ray.serve.request_router import (
    FIFOMixin,
    LocalityMixin,
    MultiplexMixin,
    PendingRequest,
    RequestRouter,
    ReplicaID,
    ReplicaResult,
    RunningReplica,
)
from typing import (
    Dict,
    List,
    Optional,
)


class ThroughputAwareRequestRouter(
    FIFOMixin, MultiplexMixin, LocalityMixin, RequestRouter
):
    async def choose_replicas(
        self,
        candidate_replicas: List[RunningReplica],
        pending_request: Optional[PendingRequest] = None,
    ) -> List[List[RunningReplica]]:
        """
        This method chooses the best replica for the request based on
        multiplexed, locality, and custom throughput stats. The algorithm
        works as follows:

        1. Populate top_ranked_replicas based on available replicas based on
          multiplex_id
        2. Populate and override top_ranked_replicas info based on locality
         information of replicas (we want to prefer replicas that are in the
          same vicinity to this deployment)
        3. Select the replica with minimum throughput.
        """

        # Dictionary to hold the top-ranked replicas
        top_ranked_replicas: Dict[ReplicaID, RunningReplica] = {}
        # Take the best set of replicas for the multiplexed model
        if (
            pending_request is not None
            and pending_request.metadata.multiplexed_model_id
        ):
            ranked_replicas_multiplex: List[RunningReplica] = (
                self.rank_replicas_via_multiplex(
                    replicas=candidate_replicas,
                    multiplexed_model_id=pending_request.metadata.multiplexed_model_id,
                )
            )[0]

            # Filter out replicas that aren't available (queue length exceeds max ongoing requests)
            ranked_replicas_multiplex = self.select_available_replicas(
                candidates=ranked_replicas_multiplex
            )

            for replica in ranked_replicas_multiplex:
                top_ranked_replicas[replica.replica_id] = replica

        # Take the best set of replicas in terms of locality
        ranked_replicas_locality: List[
            RunningReplica
        ] = self.rank_replicas_via_locality(replicas=candidate_replicas)[0]

        # Filter out replicas that aren't available (queue length exceeds max ongoing requests)
        ranked_replicas_locality = self.select_available_replicas(
            candidates=ranked_replicas_locality
        )

        for replica in ranked_replicas_locality:
            top_ranked_replicas[replica.replica_id] = replica

        print("ThroughputAwareRequestRouter routing request")

        # Take the replica with the minimum throughput.
        min_throughput_replica = min(
            top_ranked_replicas.values(),
            key=lambda r: r.routing_stats.get("throughput", 0),
        )
        return [[min_throughput_replica]]


This request router inherits from RequestRouter as well as FIFOMixin for FIFO request routing, LocalityMixin for locality-aware request routing, and MultiplexMixin for multiplexed model support. Its choose_replicas implementation takes the highest-ranked replicas from rank_replicas_via_multiplex and rank_replicas_via_locality and uses the select_available_replicas helper to filter out replicas that have reached their maximum request queue length. Finally, it returns the replica with the minimum throughput.

Deploy an app with the throughput-aware request router#

To use the throughput-aware request router, you can deploy an app like this:

import time
from collections import defaultdict
from typing import Any, Dict

from ray import serve
from ray.serve.context import _get_internal_replica_context
from ray.serve.request_router import ReplicaID


def _time_ms() -> int:
    return int(time.time() * 1000)


@serve.deployment(
    request_router_class="custom_request_router:ThroughputAwareRequestRouter",
    num_replicas=3,
    request_routing_stats_period_s=1,
    request_routing_stats_timeout_s=1,
    ray_actor_options={"num_cpus": 0},
)
class ThroughputAwareRequestRouterApp:
    def __init__(self):
        self.throughput_buckets: Dict[int, int] = defaultdict(int)
        self.last_throughput_buckets = _time_ms()
        context = _get_internal_replica_context()
        self.replica_id: ReplicaID = context.replica_id

    def __call__(self):
        self.update_throughput()
        return self.replica_id

    def update_throughput(self):
        current_timestamp_ms = _time_ms()

        # Under high concurrency, requests can arrive with out-of-order
        # timestamps. Skip recording if current_timestamp_ms is more than
        # a second older than the last recorded throughput bucket.
        if current_timestamp_ms < self.last_throughput_buckets - 1000:
            return

        # Record the request to the bucket
        self.throughput_buckets[current_timestamp_ms] += 1
        self.last_throughput_buckets = current_timestamp_ms

    def record_routing_stats(self) -> Dict[str, Any]:
        current_timestamp_ms = _time_ms()
        throughput = 0

        for t, c in list(self.throughput_buckets.items()):
            if t < current_timestamp_ms - 1000:
                # Remove the bucket if it is older than 1 second
                self.throughput_buckets.pop(t)
            else:
                throughput += c

        return {
            "throughput": throughput,
        }


handle = serve.run(ThroughputAwareRequestRouterApp.bind())
response = handle.remote().result()
print(f"Response from ThroughputAwareRequestRouterApp: {response}")
# Example output:
#   Response from ThroughputAwareRequestRouterApp:
#   Replica(id='tkywafya', deployment='ThroughputAwareRequestRouterApp', app='default')

As with the uniform request router, you define the custom request router in the request_router_class argument of the deployment decorator. The Serve controller pulls statistics from each replica of the deployment by calling record_routing_stats. The request_routing_stats_period_s and request_routing_stats_timeout_s arguments control the frequency and timeout of the Serve controller pulling information from each replica in its background thread. You can customize the emission of these statistics by overriding record_routing_stats in the deployment class definition. The custom request router can then access the updated routing stats through the routing_stats attribute of the running replicas and use them in its routing policy.
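
To watch the router react to these stats, you can fire a burst of requests and observe the console logs and per-replica counts; a usage sketch (counts vary with timing, because the controller refreshes stats roughly every request_routing_stats_period_s seconds):

from collections import Counter

# As throughput accumulates on busy replicas, the router steers new
# requests toward replicas reporting lower throughput.
responses = [handle.remote() for _ in range(100)]
counts = Counter(str(r.result()) for r in responses)
print(counts)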