Model Composition#
This section helps you:
- compose multiple deployments containing ML logic or business logic into a single application
- independently scale and configure each of your ML models and business logic steps
- connect your Ray Serve deployments together with the deployment graph API
Composing Deployments using ServeHandles#
You can call deployment methods from within other deployments using the ServeHandle. This lets you divide your application’s steps (such as preprocessing, model inference, and post-processing) into independent deployments that can be independently scaled and configured.
To use the ServeHandle, call handle.remote to send requests to a deployment. These requests can contain ordinary Python args and kwargs, which are passed directly to the method. The method call returns a Ray ObjectRef whose result can be waited for or retrieved using await or ray.get.
Model Composition Example#
Here’s an example:
1# File name: hello.py
2import ray
3from ray import serve
4
5
6@serve.deployment
7class LanguageClassifier:
8 def __init__(self, spanish_responder, french_responder):
9 self.spanish_responder = spanish_responder
10 self.french_responder = french_responder
11
12 async def __call__(self, http_request):
13 request = await http_request.json()
14 language, name = request["language"], request["name"]
15
16 if language == "spanish":
17 ref = await self.spanish_responder.say_hello.remote(name)
18 elif language == "french":
19 ref = await self.french_responder.say_hello.remote(name)
20 else:
21 return "Please try again."
22
23 return await ref
24
25
26@serve.deployment
27class SpanishResponder:
28 def say_hello(self, name: str):
29 return f"Hola {name}"
30
31
32@serve.deployment
33class FrenchResponder:
34 def say_hello(self, name: str):
35 return f"Bonjour {name}"
36
37
38spanish_responder = SpanishResponder.bind()
39french_responder = FrenchResponder.bind()
40language_classifier = LanguageClassifier.bind(spanish_responder, french_responder)
In line 40, the LanguageClassifier deployment takes in the spanish_responder and french_responder as constructor arguments. At runtime, these arguments are converted into ServeHandles. LanguageClassifier can then call the spanish_responder's and french_responder's deployment methods using these handles.
For example, the LanguageClassifier's __call__ method uses the HTTP request's values to decide whether to respond in Spanish or French. It then forwards the request's name to the spanish_responder or the french_responder on lines 17 and 19 using the ServeHandles. The calls are formatted as:
await self.spanish_responder.say_hello.remote(name)
This call has a few parts:
- await lets us issue an asynchronous request through the ServeHandle.
- self.spanish_responder is the SpanishResponder handle taken in through the constructor.
- say_hello is the SpanishResponder method to invoke.
- remote indicates that this is a ServeHandle call to another deployment. It's required when invoking a deployment's method through another deployment, and it's added to the method name.
- name is the argument for say_hello. You can pass any number of arguments or keyword arguments here.
This call returns a reference to the result, not the result itself. This pattern allows the call to execute asynchronously. To get the actual result, await the reference. await blocks until the asynchronous call executes, and then it returns the result. In this example, line 23 calls await ref and returns the resulting string. Note that getting the result requires two await statements in total: first, the script must await the ServeHandle call itself to retrieve a reference; then it must await the reference to get the final result.
Warning
You can use ray.get(ref) to get the return value of remote ServeHandle calls. However, calling ray.get from inside a deployment is an antipattern: it blocks the deployment from executing any other code until the call finishes. Using await lets the deployment process other requests while waiting for the ServeHandle call to finish. Use await instead of ray.get inside deployments.
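To make this concrete, here's a minimal sketch of the recommended flow inside an async deployment method, based on the example above:

# Sketch: the two-await pattern inside an async deployment method.
ref = await self.spanish_responder.say_hello.remote(name)  # first await: returns an ObjectRef
result = await ref  # second await: returns the actual result
# Antipattern: ray.get(ref) here would block the replica until the call finishes.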
You can copy the hello.py script above and run it with serve run. Make sure to run the command from a directory containing hello.py, so it can locate the script:
$ serve run hello:language_classifier
You can use this client script to interact with the example:
# File name: hello_client.py
import requests

response = requests.post(
    "http://localhost:8000", json={"language": "spanish", "name": "Dora"}
)
greeting = response.text
print(greeting)
While the serve run command is running, open a separate terminal window and run this script:
$ python hello_client.py
Hola Dora
Note
Composition lets you break apart your application and independently scale each part. For instance, suppose this LanguageClassifier application's requests were 75% Spanish and 25% French. You could scale your SpanishResponder to 3 replicas and your FrenchResponder to 1 replica to meet your workload's demand. This flexibility also applies to reserving resources like CPUs and GPUs, as well as any other configurations you can set for each deployment.
With composition, you can avoid application-level bottlenecks when serving models and business logic steps that use different types and amounts of resources.
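For instance, here's a minimal sketch of setting those replica counts with the options API (the numbers are illustrative):

spanish_responder = SpanishResponder.options(num_replicas=3).bind()
french_responder = FrenchResponder.options(num_replicas=1).bind()
language_classifier = LanguageClassifier.bind(spanish_responder, french_responder)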
ServeHandle Deep Dive#
Conceptually, a ServeHandle is a client-side load balancer that routes requests to the replicas of a given deployment. It also buffers requests internally so that it doesn't overwhelm the replicas. Based on the current number of buffered requests, it informs the autoscaler when to scale up the number of replicas.
ServeHandles take request parameters and return a future object of type ray.ObjectRef, whose value is filled in with the result object. Because of the internal buffering, the time from submitting a request to getting a ray.ObjectRef can vary.
Because of this variability, Serve offers two types of handles, a synchronous and an asynchronous version, to ensure the buffering period is handled efficiently:
- RayServeSyncHandle directly returns a ray.ObjectRef. It blocks the current thread until the request is matched to a replica.
- RayServeDeploymentHandle returns an asyncio.Task upon submission, which can be awaited to resolve to a ray.ObjectRef. While the current request is buffered, other requests can be processed concurrently.
serve.run deploys a deployment graph and returns the entrypoint node's handle (the node you passed as an argument to serve.run). The return type is a RayServeSyncHandle. This is useful for interacting with and testing the newly created deployment graph.
from starlette.requests import Request

import ray
from ray import serve
from ray.serve.handle import RayServeSyncHandle


@serve.deployment
class Model:
    def __call__(self) -> str:
        return "hello"


handle: RayServeSyncHandle = serve.run(Model.bind())
ref: ray.ObjectRef = handle.remote()  # blocks until request is assigned to replica
assert ray.get(ref) == "hello"
In all other cases, RayServeDeploymentHandle is the default because its API is more performant than its blocking counterpart. For example, when implementing a dynamic dispatch node in a deployment graph, the handle is asynchronous:
import asyncio
import random

import ray
from ray import serve
from ray.serve.handle import RayServeDeploymentHandle, RayServeSyncHandle


@serve.deployment
class Model:
    def __call__(self) -> str:
        return "hello"


@serve.deployment
class DynamicDispatcher:
    def __init__(
        self, handle_a: RayServeDeploymentHandle, handle_b: RayServeDeploymentHandle
    ):
        self.handle_a = handle_a
        self.handle_b = handle_b

    async def __call__(self):
        handle_chosen = self.handle_a if random.random() < 0.5 else self.handle_b

        # The request is enqueued.
        submission_task: asyncio.Task = handle_chosen.remote()
        # The request is assigned to a replica.
        ref: ray.ObjectRef = await submission_task
        # The request has been processed by the replica.
        result = await ref

        return result


handle: RayServeSyncHandle = serve.run(
    DynamicDispatcher.bind(Model.bind(), Model.bind())
)
ref: ray.ObjectRef = handle.remote()
assert ray.get(ref) == "hello"
The result of deployment_handle.remote() can also be passed directly as an argument to other downstream handles, without awaiting it:
import ray
from ray import serve
from ray.serve.handle import RayServeDeploymentHandle, RayServeSyncHandle


@serve.deployment
class Model:
    def __call__(self, inp):
        return "hello " + inp


@serve.deployment
class Chain:
    def __init__(
        self, handle_a: RayServeDeploymentHandle, handle_b: RayServeDeploymentHandle
    ):
        self.handle_a = handle_a
        self.handle_b = handle_b

    async def __call__(self, inp):
        # Serve can handle an enqueued task as a dependency.
        ref: ray.ObjectRef = await self.handle_b.remote(
            self.handle_a.remote(inp)
        )
        return await ref


handle: RayServeSyncHandle = serve.run(Chain.bind(Model.bind(), Model.bind()))
ref: ray.ObjectRef = handle.remote("Serve")
assert ray.get(ref) == "hello hello Serve"
In both types of ServeHandle, you can call a specific method using the .method_name accessor. For example:
import ray
from ray import serve
from ray.serve.handle import RayServeSyncHandle


@serve.deployment
class Deployment:
    def method1(self, arg: str) -> str:
        return f"Method1: {arg}"

    def __call__(self, arg: str) -> str:
        return f"__call__: {arg}"


handle: RayServeSyncHandle = serve.run(Deployment.bind())

ray.get(handle.remote("hi"))  # Defaults to calling the __call__ method.
ray.get(handle.method1.remote("hi"))  # Call a different method.
Note
A ray.ObjectRef corresponds to the result of a request submission. To retrieve the result, you can use the synchronous Ray Core API ray.get(ref) or the async API await ref. To wait for the result to be available without retrieving it, you can use the synchronous API ray.wait([ref]) or the async API await asyncio.wait([ref]). You can mix and match these calls, but we recommend using the async APIs to increase concurrency.
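As a sketch of these options (assuming ref is a ray.ObjectRef returned from a handle call, and that ray and asyncio are imported):

# Synchronous APIs, e.g. in a driver script:
result = ray.get(ref)  # block until the result is available, then retrieve it
ready, not_ready = ray.wait([ref])  # wait for availability without retrieving

# Async APIs, e.g. inside a deployment:
result = await ref  # retrieve the result without blocking the event loop
done, pending = await asyncio.wait([ref])  # wait without retrieving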
Deployment Graph API#
Note
The call graph is in alpha, so its APIs are subject to change.
For more advanced composition patterns, it can be useful to surface the relationships between deployments, instead of hiding them inside individual deployment definitions.
Ray Serve’s deployment graph API lets you specify how to route requests through your deployments, so you can explicitly create a dependency graph. It also has additional features like HTTP adapters and input routing that help you build more expressive graphs.
Binding Deployments#
The basic building block for all deployment graphs is the DeploymentNode. One type of DeploymentNode is the ClassNode. You can create ClassNodes by binding class-based deployments to their constructor's arguments with the bind method. This may sound familiar: you've already been doing this whenever you bind and run class-based deployments, such as in the Composing Deployments using ServeHandles section.
As another example:
# File name: echo.py
from starlette.requests import Request

from ray import serve


@serve.deployment
class EchoClass:
    def __init__(self, echo_str: str):
        self.echo_str = echo_str

    def __call__(self, request: Request) -> str:
        return self.echo_str


# You can create ClassNodes from the EchoClass deployment.
foo_node = EchoClass.bind("foo")
bar_node = EchoClass.bind("bar")
baz_node = EchoClass.bind("baz")
echo.py defines three ClassNodes: foo_node, bar_node, and baz_node. The nodes are defined by invoking bind on the EchoClass deployment. They have different behaviors because they use different arguments in the bind call.
Note that all three of these nodes were created from the same EchoClass deployment. Class deployments are essentially factories for ClassNodes: a single class deployment can produce multiple ClassNodes through multiple bind statements.
There are two options to run a node:
- serve.run(node): This Python call can be added to your Python script to run a particular node. It starts a Ray cluster (if one isn't already running), deploys the node to it, and then returns. You can call this function multiple times in the same script on different DeploymentNodes. Each time, it tears down any deployments it previously deployed and deploys the passed-in node's deployment. After the script exits, the cluster and any nodes deployed by serve.run are torn down. See the sketch after this list for an example.
- serve run module:node: This CLI command starts a Ray cluster and runs the node at the import path module:node. It then blocks, allowing you to open a separate terminal window and issue requests to the running deployment. You can stop the serve run command with ctrl-c.
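Here's a minimal sketch of the first option, assuming the echo.py script above is on the import path:

# Sketch: running foo_node from Python instead of the CLI.
import requests
from ray import serve
from echo import foo_node

serve.run(foo_node)  # starts Ray if needed, deploys the node, then returns
print(requests.get("http://localhost:8000/").text)  # prints "foo"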
When you run a node, you deploy the node's deployment and its bound arguments. Ray Serve creates a deployment in Ray and instantiates your deployment's class using those arguments. By default, you can send requests to your deployment at http://localhost:8000. These requests are converted to Starlette request objects and passed to your class's __call__ method.
Note
Additionally, when you run a node, the deployment's configurations (which you can set in the @serve.deployment decorator, through an options call, or in a Serve config file) still apply to the deployment. You can use this to independently scale and configure your graph's deployments by, for instance, setting different num_replicas, num_cpus, or num_gpus values for different deployments.
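For instance, here's a sketch of setting these values for the EchoClass deployment (the values are illustrative):

# Set configurations in the decorator...
@serve.deployment(num_replicas=2, ray_actor_options={"num_cpus": 1})
class EchoClass:
    ...

# ...or override them with an options call when binding.
foo_node = EchoClass.options(num_replicas=4).bind("foo")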
You can try this example out using the serve run CLI:
$ serve run echo:foo_node
Here’s a client script that can send requests to your node:
# File name: echo_client.py
import requests

response = requests.get("http://localhost:8000/")
echo = response.text
print(echo)
While the deployment is running with serve run, open a separate terminal window and issue a request to it with the echo_client.py script:
$ python echo_client.py
foo
Building the Call Graph: MethodNodes and FunctionNodes#
After defining your ClassNodes, you can specify how HTTP requests should be processed using the call graph. As an example, let's look at a deployment graph that implements this chain of arithmetic operations:
output = request + 2 - 1 + 3
Here’s the graph:
1# File name: arithmetic.py
2from ray import serve
3from ray.serve.drivers import DAGDriver
4from ray.serve.deployment_graph import InputNode
5
6from starlette.requests import Request
7
8
9@serve.deployment
10class AddCls:
11 def __init__(self, addend: float):
12 self.addend = addend
13
14 def add(self, number: float) -> float:
15 return number + self.addend
16
17 async def unpack_request(self, http_request: Request) -> float:
18 return await http_request.json()
19
20
21@serve.deployment
22def subtract_one_fn(number: float) -> float:
23 return number - 1
24
25
26@serve.deployment
27async def unpack_request(http_request: Request) -> float:
28 return await http_request.json()
29
30
31add_2 = AddCls.bind(2)
32add_3 = AddCls.bind(3)
33
34with InputNode() as http_request:
35 request_number = unpack_request.bind(http_request)
36 add_2_output = add_2.add.bind(request_number)
37 subtract_1_output = subtract_one_fn.bind(add_2_output)
38 add_3_output = add_3.add.bind(subtract_1_output)
39
40graph = DAGDriver.bind(add_3_output)
Lines 31 and 32 bind two ClassNodes from the AddCls deployment. Line 34 starts the call graph:
with InputNode() as http_request:
    request_number = unpack_request.bind(http_request)
    add_2_output = add_2.add.bind(request_number)
    subtract_1_output = subtract_one_fn.bind(add_2_output)
    add_3_output = add_3.add.bind(subtract_1_output)
The with statement (known as a "context manager" in Python) initializes a special Ray Serve-provided object called an InputNode. This isn't a DeploymentNode like ClassNodes, MethodNodes, or FunctionNodes. Rather, it's the input of the graph. In this case, that input is an HTTP request. In a later section, you'll learn how to change this input using another Ray Serve-provided object called the DAGDriver.
Note
The InputNode tells Ray Serve where to send the graph input at runtime. In this example, http_request is an InputNode object, so you can't call request methods like .json() on it directly in the context manager. However, at runtime, Ray Serve passes incoming HTTP requests directly into the same functions and methods that http_request is passed into, so those functions and methods can call request methods like .json() on the request object that gets passed in.
You can use the InputNode to indicate which node(s) the graph input should be passed into by passing the InputNode into bind calls within the context manager. In this example, the http_request is passed to only one node, unpack_request. The output of that bind call, request_number, is a FunctionNode. FunctionNodes are produced when deployments containing functions are bound to arguments for those functions using bind. request_number represents the output of unpack_request when called on incoming HTTP requests. unpack_request, defined on line 26, processes the HTTP request's JSON body and returns a number that can be passed into arithmetic operations.
Tip
If you don’t want to manually unpack HTTP requests, check out this guide’s section on HTTP adapters, which can handle unpacking for you.
The graph then passes request_number into a bind call on add_2's add method. The output of this call, add_2_output, is a MethodNode. MethodNodes are produced when ClassNode methods are bound to arguments using bind. In this case, add_2_output represents the result of adding 2 to the number in the request.
The rest of the call graph uses another FunctionNode and MethodNode to finish the chain of arithmetic. add_2_output is bound to the subtract_one_fn deployment, producing the subtract_1_output FunctionNode. Then, the subtract_1_output is bound to the add_3.add method, producing the add_3_output MethodNode. This add_3_output MethodNode represents the final output from the chain of arithmetic operations.
To run the call graph, you need to use a driver. Drivers are deployments that process the call graph you've written and route incoming requests through your deployments based on that graph. Ray Serve provides a driver called the DAGDriver, used on line 40:
graph = DAGDriver.bind(add_3_output)
Generally, the DAGDriver needs to be bound to the FunctionNode or MethodNode representing the final output of a graph. This bind call returns a ClassNode that you can run with serve.run or serve run. Running this ClassNode also deploys the rest of the graph's deployments.
Note
The DAGDriver can also be bound to ClassNodes. This is useful if you construct a deployment graph where ClassNodes invoke other ClassNodes' methods. In this case, you should pass the "root" ClassNode to the DAGDriver (that is, the one that you would otherwise pass into serve.run). Check out the Composing Deployments using ServeHandles section for more info.
You can test this example using this client script:
# File name: arithmetic_client.py
import requests

response = requests.post("http://localhost:8000/", json=5)
output = response.json()
print(output)
Start the graph in the terminal:
$ serve run arithmetic:graph
In a separate terminal window, run the client script to make requests to the graph:
$ python arithmetic_client.py
9
Drivers and HTTP Adapters#
Ray Serve provides the DAGDriver, which routes HTTP requests through your call graph. As mentioned in the call graph section, the DAGDriver takes in a DeploymentNode and produces a ClassNode that you can run.
The DAGDriver also has an optional keyword argument: http_adapter. HTTP adapters are functions that run on the HTTP request before it's passed into the graph. Ray Serve provides a handful of these adapters, so you can rely on them to conveniently handle the HTTP parsing while focusing your attention on the graph itself.
For instance, you can use the Ray Serve-provided json_request adapter to simplify the arithmetic call graph by eliminating the unpack_request function. You can replace lines 26 through 40 with this graph:
# This import can go at the top of the file.
from ray.serve.http_adapters import json_request

add_2 = AddCls.bind(2)
add_3 = AddCls.bind(3)

with InputNode() as request_number:
    add_2_output = add_2.add.bind(request_number)
    subtract_1_output = subtract_one_fn.bind(add_2_output)
    add_3_output = add_3.add.bind(subtract_1_output)

graph = DAGDriver.bind(add_3_output, http_adapter=json_request)
Without an http_adapter, an InputNode represents an HTTP request, and at runtime, incoming HTTP request objects are passed into the same functions and methods that the InputNode is passed into. When you set an http_adapter, the InputNode represents the http_adapter's output.
At runtime:
1. Ray Serve sends each HTTP request object to the DAGDriver.
2. The DAGDriver calls the http_adapter function on each request.
3. The DAGDriver passes the http_adapter output to the same functions and methods that the InputNode is passed into, kicking off the request's journey through the call graph.
In the example above, the InputNode represents the number packaged inside the request's JSON body instead of the HTTP request itself. You can pass the JSON directly into the graph instead of first unpacking it from the request.
See the guide on http_adapters to learn more.
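As a sketch of what an adapter looks like, it's simply a function (sync or async) that takes the HTTP request and returns the graph input. The built-in json_request behaves roughly like this hypothetical my_json_adapter:

# Sketch: my_json_adapter is a hypothetical stand-in for json_request.
from starlette.requests import Request

async def my_json_adapter(request: Request) -> float:
    return await request.json()

graph = DAGDriver.bind(add_3_output, http_adapter=my_json_adapter)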
Testing the Graph with the Python API#
The serve.run function returns a handle that you can use to test your graph in Python, without using HTTP requests.
To test your graph:
1. Call serve.run on your graph and store the returned handle.
2. Call handle.predict.remote(input). The input argument becomes the input represented by InputNode. Make sure to refactor your call graph accordingly, since it takes in this input directly, instead of an HTTP request. You can use an HTTP adapter to make sure the graph you're testing matches the one you ultimately deploy.
3. predict.remote returns a reference to the result, so the graph can execute asynchronously. Call ray.get on this reference to get the final result.
As an example, you can continue rewriting the arithmetic graph example from above to use predict.remote. You can add testing code to the example:
# These imports can go at the top of the file.
import ray
from ray.serve.http_adapters import json_request

add_2 = AddCls.bind(2)
add_3 = AddCls.bind(3)

with InputNode() as request_number:
    add_2_output = add_2.add.bind(request_number)
    subtract_1_output = subtract_one_fn.bind(add_2_output)
    add_3_output = add_3.add.bind(subtract_1_output)

graph = DAGDriver.bind(add_3_output, http_adapter=json_request)

handle = serve.run(graph)
ref = handle.predict.remote(5)
result = ray.get(ref)
print(result)
Note that the graph itself is still the same. The only change is the testing code added after it. You can run this Python script directly now to test the graph:
$ python arithmetic.py
9
Visualizing the Graph#
You can render an illustration of your deployment graph to see its nodes and their connections.
Make sure you have pydot and graphviz installed to follow this section:
MacOS: pip install -U pydot && brew install graphviz
Windows: pip install -U pydot && winget install graphviz
Linux: pip install -U pydot && sudo apt-get install -y graphviz
Here’s an example graph:
# File name: deployment_graph_viz.py
from ray import serve
from ray.serve.deployment_graph import InputNode
from ray.dag.vis_utils import _dag_to_dot


@serve.deployment
class Model:
    def __init__(self, weight: int):
        self.weight = weight

    def forward(self, input: int) -> int:
        return input + self.weight


@serve.deployment
def combine(output_1: int, output_2: int, kwargs_output: int = 0) -> int:
    return output_1 + output_2 + kwargs_output


m1 = Model.bind(1)
m2 = Model.bind(2)

with InputNode() as user_input:
    m1_output = m1.forward.bind(user_input[0])
    m2_output = m2.forward.bind(user_input[1])
    combine_output = combine.bind(m1_output, m2_output, kwargs_output=user_input[2])

# m1_output visualization
graph = _dag_to_dot(m1_output)
to_string = graph.to_string()
print(to_string)

# Full graph visualization
graph = _dag_to_dot(combine_output)
to_string = graph.to_string()
print(to_string)
The ray.dag.vis_utils._dag_to_dot method takes in a DeploymentNode and produces a graph visualization. You can see the string form of the visualization by running the script:
$ python deployment_graph_viz.py
digraph G {
rankdir=LR;
INPUT_ATTRIBUTE_NODE -> forward;
INPUT_NODE -> INPUT_ATTRIBUTE_NODE;
Model -> forward;
}
digraph G {
rankdir=LR;
forward -> combine;
INPUT_ATTRIBUTE_NODE -> forward;
INPUT_NODE -> INPUT_ATTRIBUTE_NODE;
Model -> forward;
forward_1 -> combine;
INPUT_ATTRIBUTE_NODE_1 -> forward_1;
INPUT_NODE -> INPUT_ATTRIBUTE_NODE_1;
Model_1 -> forward_1;
INPUT_ATTRIBUTE_NODE_2 -> combine;
INPUT_NODE -> INPUT_ATTRIBUTE_NODE_2;
}
You can render these strings in graphviz tools such as https://dreampuf.github.io/GraphvizOnline.
When the script visualizes m1_output, it shows only a partial execution path of the entire graph: the path includes only the dependencies needed to generate m1_output.
On the other hand, when the script visualizes the final graph output, combine_output, it captures all the nodes used in execution, since they're all required to create the final output.
Visualizing the Graph with Gradio#
Another option is to visualize your deployment graph through Gradio. Check out the Graph Visualization with Gradio Tutorial to learn how to interactively run your deployment graph through the Gradio UI and see the intermediate outputs of each node in real time as they finish evaluation.
Next Steps#
To learn more about deployment graphs, check out some deployment graph patterns you can incorporate into your own graph!