Use Custom Algorithm for Request Routing
Warning
This API is in alpha and may change before becoming stable.
Different Ray Serve applications demand different load-balancing logic. For
example, when serving LLMs you might want a policy other than balancing the
number of requests across replicas: for instance, balancing ongoing input tokens or
balancing KV-cache utilization. RequestRouter
is an abstraction in Ray Serve that lets you extend and customize the
load-balancing logic for each deployment.
This guide shows how to use the RequestRouter
API to achieve custom load balancing across the replicas of a deployment. It
covers the following:
- Define a simple uniform request router for load balancing
- Deploy an app with the uniform request router
- Utility mixins for request routing
- Define a complex throughput-aware request router
- Deploy an app with the throughput-aware request router
Define a simple uniform request router
Create a file custom_request_router.py
with the following code:
import random
from ray.serve.request_router import (
    PendingRequest,
    RequestRouter,
    ReplicaID,
    ReplicaResult,
    RunningReplica,
)
from typing import (
    List,
    Optional,
)


class UniformRequestRouter(RequestRouter):
    async def choose_replicas(
        self,
        candidate_replicas: List[RunningReplica],
        pending_request: Optional[PendingRequest] = None,
    ) -> List[List[RunningReplica]]:
        print("UniformRequestRouter routing request")
        index = random.randint(0, len(candidate_replicas) - 1)
        return [[candidate_replicas[index]]]

    def on_request_routed(
        self,
        pending_request: PendingRequest,
        replica_id: ReplicaID,
        result: ReplicaResult,
    ):
        print("on_request_routed callback is called!!")
This code defines a simple uniform request router that routes each request to a
random replica, distributing load evenly regardless of each replica's queue length
or the body of the request. The router is a class that inherits from
RequestRouter and implements the choose_replicas
method, which returns a random replica for every incoming request. The return type
is a list of lists of replicas, where each inner list represents a rank of replicas:
the first rank is the most preferred and the last rank is the least preferred.
Within each rank, in order, Serve attempts to route the request to the replica with
the shortest request queue until a replica accepts it. If none of the replicas can
process the request, Serve calls choose_replicas
again with a backoff delay until one becomes available.
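For example, a hypothetical router that prefers a primary group of replicas but keeps the rest as a fallback could return two ranks. The following sketch illustrates the ranked return value; the two-replica split is arbitrary and not part of the guide's example app:

from typing import List, Optional

from ray.serve.request_router import (
    PendingRequest,
    RequestRouter,
    RunningReplica,
)


class PrimaryWithFallbackRouter(RequestRouter):
    """Hypothetical router: try the first two replicas, then the rest."""

    async def choose_replicas(
        self,
        candidate_replicas: List[RunningReplica],
        pending_request: Optional[PendingRequest] = None,
    ) -> List[List[RunningReplica]]:
        primary = candidate_replicas[:2]
        fallback = candidate_replicas[2:]
        # Rank 0 is tried first; rank 1 is only considered if no rank-0
        # replica can accept the request.
        return [primary, fallback] if fallback else [primary]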
Note
This request router also implements on_request_routed,
which lets you update the state of the request router after a request is routed.
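For instance, a router could use this callback for per-replica bookkeeping. Here's a minimal sketch (choose_replicas is omitted for brevity, and the routed_counts attribute is hypothetical, not part of the RequestRouter API):

from collections import Counter

from ray.serve.request_router import (
    PendingRequest,
    ReplicaID,
    ReplicaResult,
    RequestRouter,
)


class CountingRouter(RequestRouter):
    """Hypothetical router that counts requests routed to each replica."""

    def on_request_routed(
        self,
        pending_request: PendingRequest,
        replica_id: ReplicaID,
        result: ReplicaResult,
    ):
        # Lazily create the counter to avoid overriding the base constructor.
        if not hasattr(self, "routed_counts"):
            self.routed_counts = Counter()
        self.routed_counts[replica_id] += 1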
Deploy an app with the uniform request router
To use a custom request router, pass the request_router_class
argument to the deployment
decorator. Note that request_router_class
accepts either the imported class itself or a string containing the import path of
the class. Deploy a simple app that uses the uniform request router like this:
from ray import serve
from ray.serve.context import _get_internal_replica_context
from ray.serve.request_router import ReplicaID


@serve.deployment(
    request_router_class="custom_request_router:UniformRequestRouter",
    num_replicas=10,
    ray_actor_options={"num_cpus": 0},
)
class UniformRequestRouterApp:
    def __init__(self):
        context = _get_internal_replica_context()
        self.replica_id: ReplicaID = context.replica_id

    async def __call__(self):
        return self.replica_id


handle = serve.run(UniformRequestRouterApp.bind())
response = handle.remote().result()
print(f"Response from UniformRequestRouterApp: {response}")

# Example output:
# Response from UniformRequestRouterApp:
# Replica(id='67vc4ts5', deployment='UniformRequestRouterApp', app='default')
As the request is routed, both the "UniformRequestRouter routing request" and "on_request_routed callback is called!!" messages print to the console. Each request is routed to a random replica. You can verify this by sending more requests and observing that the distribution across replicas is roughly uniform.
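To check the distribution yourself, you can run a quick sketch like the following, reusing the handle from the example above:

from collections import Counter

# Send 100 requests and count which replica served each one. With the
# uniform router, the counts should be roughly equal across the 10 replicas.
counts = Counter(str(handle.remote().result()) for _ in range(100))
for replica, num_requests in sorted(counts.items()):
    print(replica, num_requests)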
Note
Currently, the only way to configure the request router is to pass it as an argument to the deployment decorator. This means you can't change the request router for an existing deployment handle with a running router. If you have a use case that requires reconfiguring the request router on a deployment handle, open a feature request on the Ray GitHub repository.
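As mentioned above, request_router_class also accepts the class object directly instead of an import-path string. A minimal sketch (the MyApp deployment is hypothetical):

from ray import serve

from custom_request_router import UniformRequestRouter


@serve.deployment(request_router_class=UniformRequestRouter)
class MyApp:
    async def __call__(self):
        return "ok"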
Utility mixins for request routing
Ray Serve provides utility mixins that extend the functionality of the request router. You can use them to implement common routing policies such as locality-aware routing, multiplexed model support, and FIFO request routing:
- FIFOMixin: Implements first-in-first-out (FIFO) request routing. The default behavior of the request router is out-of-order (OOO) routing, which routes each request to the exact replica that choose_replicas assigned for that request. FIFO routing is useful when the routing algorithm works independently of the request content, so requests can be routed as soon as possible in the order they were received. Including this mixin in your custom request router updates the request-matching algorithm to route requests FIFO. It requires no additional configuration flags and provides no additional helper methods.
- LocalityMixin: Implements locality-aware request routing. It updates internal state on replica updates to track which replicas run on the same node, in the same zone, and everywhere else. It offers the helpers apply_locality_routing and rank_replicas_via_locality to route and rank replicas based on their locality relative to the request, which can reduce latency and improve performance (see the sketch after this list).
- MultiplexMixin: When you use model multiplexing, you need to route requests based on which replica already has a hot version of the model. This mixin updates internal state on replica updates to track the models loaded on each replica and the size of each replica's model cache. It offers the helpers apply_multiplex_routing and rank_replicas_via_multiplex to route and rank replicas based on the multiplexed model ID of the request.
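For example, a minimal locality-first router could combine LocalityMixin with RequestRouter and return the locality-based ranking directly. This is a sketch, not part of the guide's example app:

from typing import List, Optional

from ray.serve.request_router import (
    LocalityMixin,
    PendingRequest,
    RequestRouter,
    RunningReplica,
)


class LocalityFirstRouter(LocalityMixin, RequestRouter):
    async def choose_replicas(
        self,
        candidate_replicas: List[RunningReplica],
        pending_request: Optional[PendingRequest] = None,
    ) -> List[List[RunningReplica]]:
        # rank_replicas_via_locality groups replicas by proximity: same
        # node first, then same zone, then everything else.
        return self.rank_replicas_via_locality(replicas=candidate_replicas)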
Define a complex throughput-aware request router
A fully featured request router can be more complex, taking into account the
multiplexed model, locality, the request queue length on each replica, and custom
statistics such as throughput when deciding which replica should receive each
request. The following class defines a throughput-aware request router that routes
requests with these factors in mind. Add the following code to the
custom_request_router.py
file:
from ray.serve.request_router import (
    FIFOMixin,
    LocalityMixin,
    MultiplexMixin,
    PendingRequest,
    RequestRouter,
    ReplicaID,
    ReplicaResult,
    RunningReplica,
)
from typing import (
    Dict,
    List,
    Optional,
)


class ThroughputAwareRequestRouter(
    FIFOMixin, MultiplexMixin, LocalityMixin, RequestRouter
):
    async def choose_replicas(
        self,
        candidate_replicas: List[RunningReplica],
        pending_request: Optional[PendingRequest] = None,
    ) -> List[List[RunningReplica]]:
        """Choose the best replica for the request based on multiplexed,
        locality, and custom throughput stats. The algorithm works as follows:

        1. Populate top_ranked_replicas with the available replicas for the
           request's multiplexed model ID.
        2. Populate and override top_ranked_replicas with the locality
           information of replicas (prefer replicas in the same vicinity as
           this deployment).
        3. Select the replica with the minimum throughput.
        """
        # Dictionary to hold the top-ranked replicas.
        top_ranked_replicas: Dict[ReplicaID, RunningReplica] = {}

        # Take the best set of replicas for the multiplexed model.
        if (
            pending_request is not None
            and pending_request.metadata.multiplexed_model_id
        ):
            ranked_replicas_multiplex: List[RunningReplica] = (
                self.rank_replicas_via_multiplex(
                    replicas=candidate_replicas,
                    multiplexed_model_id=pending_request.metadata.multiplexed_model_id,
                )
            )[0]

            # Filter out replicas that aren't available (queue length exceeds
            # max ongoing requests).
            ranked_replicas_multiplex = self.select_available_replicas(
                candidates=ranked_replicas_multiplex
            )
            for replica in ranked_replicas_multiplex:
                top_ranked_replicas[replica.replica_id] = replica

        # Take the best set of replicas in terms of locality.
        ranked_replicas_locality: List[
            RunningReplica
        ] = self.rank_replicas_via_locality(replicas=candidate_replicas)[0]

        # Filter out replicas that aren't available (queue length exceeds
        # max ongoing requests).
        ranked_replicas_locality = self.select_available_replicas(
            candidates=ranked_replicas_locality
        )
        for replica in ranked_replicas_locality:
            top_ranked_replicas[replica.replica_id] = replica

        print("ThroughputAwareRequestRouter routing request")

        # Take the replica with the minimum throughput.
        min_throughput_replica = min(
            top_ranked_replicas.values(),
            key=lambda r: r.routing_stats.get("throughput", 0),
        )
        return [[min_throughput_replica]]
This request router inherits from RequestRouter,
as well as FIFOMixin for FIFO
request routing, LocalityMixin
for locality-aware request routing, and
MultiplexMixin
for multiplexed model support. It implements
choose_replicas
to take the highest-ranked replicas from rank_replicas_via_multiplex
and rank_replicas_via_locality,
and uses the select_available_replicas
helper to filter out replicas that have reached their maximum request queue length.
Finally, it picks the replica with the minimum throughput from that pool and returns it.
Deploy an app with the throughput-aware request router
To use the throughput-aware request router, you can deploy an app like this:
import time
from collections import defaultdict
from ray import serve
from ray.serve.context import _get_internal_replica_context
from ray.serve.request_router import ReplicaID
from typing import Any, Dict


def _time_ms() -> int:
    return int(time.time() * 1000)


@serve.deployment(
    request_router_class="custom_request_router:ThroughputAwareRequestRouter",
    num_replicas=3,
    request_routing_stats_period_s=1,
    request_routing_stats_timeout_s=1,
    ray_actor_options={"num_cpus": 0},
)
class ThroughputAwareRequestRouterApp:
    def __init__(self):
        self.throughput_buckets: Dict[int, int] = defaultdict(int)
        self.last_throughput_buckets = _time_ms()
        context = _get_internal_replica_context()
        self.replica_id: ReplicaID = context.replica_id

    def __call__(self):
        self.update_throughput()
        return self.replica_id

    def update_throughput(self):
        current_timestamp_ms = _time_ms()
        # Under high concurrency, requests can arrive at different times.
        # Skip recording if current_timestamp_ms is more than a second older
        # than the most recent throughput bucket.
        if current_timestamp_ms < self.last_throughput_buckets - 1000:
            return

        # Record the request in its bucket.
        self.throughput_buckets[current_timestamp_ms] += 1
        self.last_throughput_buckets = current_timestamp_ms

    def record_routing_stats(self) -> Dict[str, Any]:
        current_timestamp_ms = _time_ms()
        throughput = 0
        for t, c in list(self.throughput_buckets.items()):
            if t < current_timestamp_ms - 1000:
                # Drop buckets older than one second.
                self.throughput_buckets.pop(t)
            else:
                throughput += c
        return {
            "throughput": throughput,
        }


handle = serve.run(ThroughputAwareRequestRouterApp.bind())
response = handle.remote().result()
print(f"Response from ThroughputAwareRequestRouterApp: {response}")

# Example output:
# Response from ThroughputAwareRequestRouterApp:
# Replica(id='tkywafya', deployment='ThroughputAwareRequestRouterApp', app='default')
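Because the router mixes in MultiplexMixin, a request that carries a multiplexed model ID prefers replicas that already have that model loaded. This app doesn't actually load models, but the following sketch shows how an ID (the "model_a" value is hypothetical) reaches the router through the request metadata:

# The multiplex ranking in choose_replicas only kicks in when the pending
# request carries a multiplexed_model_id.
response = handle.options(multiplexed_model_id="model_a").remote().result()
print(f"Routed with multiplexed model ID: {response}")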
As with the uniform request router, you configure the custom request router through
the request_router_class
argument of the deployment
decorator. The Serve controller pulls statistics from each replica of the deployment
by calling record_routing_stats. The request_routing_stats_period_s
and
request_routing_stats_timeout_s
arguments control how often the Serve controller polls each replica from its
background thread and how long it waits before timing out.
You can customize the statistics a replica emits by overriding record_routing_stats
in the deployment class definition. The custom request router can then read the
updated routing stats through the routing_stats
attribute of the running replicas and use them in its routing policy.
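For reference, here's a small sketch of how a router could consume these stats to rank replicas, assuming each replica reports a "throughput" key as in the example above:

from typing import List

from ray.serve.request_router import RunningReplica


def rank_by_throughput(replicas: List[RunningReplica]) -> List[RunningReplica]:
    # Sort ascending by reported throughput. Replicas that haven't reported
    # stats yet default to 0, so they're tried first.
    return sorted(replicas, key=lambda r: r.routing_stats.get("throughput", 0))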