Model Composition¶

This section helps you:

  • compose multiple deployments containing ML logic or business logic into a single application

  • independently scale and configure each of your ML models and business logic steps

  • connect your Ray Serve deployments together with the deployment graph API

Composing Deployments using ServeHandles¶

You can call deployment methods from within other deployments using the ServeHandle. This lets you divide your application’s steps (such as preprocessing, model inference, and post-processing) into separate deployments that you can scale and configure independently.

To use the ServeHandle, call handle.remote to send a request to a deployment. A request can carry ordinary Python args and kwargs, which are passed directly to the method. The call returns a Ray ObjectRef whose result can be waited for or retrieved using await or ray.get.

Model Composition Example¶

Here’s an example:

 1# File name: hello.py
 2import ray
 3from ray import serve
 4
 5
 6@serve.deployment
 7class LanguageClassifier:
 8    def __init__(self, spanish_responder, french_responder):
 9        self.spanish_responder = spanish_responder
10        self.french_responder = french_responder
11
12    async def __call__(self, http_request):
13        request = await http_request.json()
14        language, name = request["language"], request["name"]
15
16        if language == "spanish":
17            ref = await self.spanish_responder.say_hello.remote(name)
18        elif language == "french":
19            ref = await self.french_responder.say_hello.remote(name)
20        else:
21            return "Please try again."
22
23        return await ref
24
25
26@serve.deployment
27class SpanishResponder:
28    def say_hello(self, name: str):
29        return f"Hola {name}"
30
31
32@serve.deployment
33class FrenchResponder:
34    def say_hello(self, name: str):
35        return f"Bonjour {name}"
36
37
38spanish_responder = SpanishResponder.bind()
39french_responder = FrenchResponder.bind()
40language_classifier = LanguageClassifier.bind(spanish_responder, french_responder)

In line 40, the LanguageClassifier deployment takes in the spanish_responder and french_responder as constructor arguments. At runtime, these arguments are converted into ServeHandles. LanguageClassifier can then call the spanish_responder and french_responder’s deployment methods using this handle.

For example, the LanguageClassifier’s __call__ method uses the HTTP request’s values to decide whether to respond in Spanish or French. It then forwards the request’s name to the spanish_responder or the french_responder on lines 17 and 19 using the ServeHandles. The calls are formatted as:

await self.spanish_responder.say_hello.remote(name)

This call has a few parts:

  • await lets us issue an asynchronous request through the ServeHandle.

  • self.spanish_responder is the SpanishResponder handle taken in through the constructor.

  • say_hello is the SpanishResponder method to invoke.

  • remote indicates that this is a ServeHandle call to another deployment. It is required whenever one deployment invokes another deployment’s method, and it is appended to the method name.

  • name is the argument for say_hello. You can pass any number of arguments or keyword arguments here.

This call returns a reference to the result, not the result itself. This pattern allows the call to execute asynchronously. To get the actual result, await the reference. await blocks until the asynchronous call executes, and then it returns the result. In this example, line 23 calls await ref and returns the resulting string. Note that getting the result takes two await statements in total. First, the script must await the ServeHandle call itself to retrieve a reference. Then it must await the reference to get the final result.
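The two-step pattern can be illustrated without Ray. The following is a minimal sketch in plain asyncio, not Ray Serve's implementation: submit is a hypothetical stand-in for a ServeHandle call, and the asyncio.Task it returns stands in for the reference.

```python
import asyncio


async def say_hello(name: str) -> str:
    # Stand-in for the remote deployment method.
    await asyncio.sleep(0.01)
    return f"Hola {name}"


async def submit(name: str) -> asyncio.Task:
    # Hypothetical stand-in for `handle.say_hello.remote(name)`:
    # it schedules the work and returns a reference to it, not the result.
    await asyncio.sleep(0)  # simulates waiting for the request to be accepted
    return asyncio.create_task(say_hello(name))


async def main() -> str:
    ref = await submit("Dora")  # first await: obtain the reference
    return await ref  # second await: obtain the result


greeting = asyncio.run(main())
print(greeting)
```

The first await only yields something that can be awaited later. The value itself becomes available after the second await.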

Warning

You can use the ray.get(ref) method to get the return value of remote ServeHandle calls. However, calling ray.get from inside a deployment is an antipattern. It blocks the deployment from executing any other code until the call is finished. Using await lets the deployment process other requests while waiting for the ServeHandle call to finish. You should use await instead of ray.get inside deployments.
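The difference between blocking and awaiting can be seen in a small plain-asyncio sketch. It does not use Ray: time.sleep stands in for a blocking call such as ray.get, and asyncio.sleep stands in for awaiting a reference. The handler names and the log format are made up for illustration.

```python
import asyncio
import time


async def handler_blocking(name: str, log: list) -> None:
    log.append(f"{name} start")
    time.sleep(0.01)  # blocks the whole event loop, like a blocking get
    log.append(f"{name} end")


async def handler_async(name: str, log: list) -> None:
    log.append(f"{name} start")
    await asyncio.sleep(0.01)  # yields control so other requests can run
    log.append(f"{name} end")


async def run_pair(handler) -> list:
    # Run two "requests" concurrently and record the order of events.
    log: list = []
    await asyncio.gather(handler("a", log), handler("b", log))
    return log


blocking_log = asyncio.run(run_pair(handler_blocking))
async_log = asyncio.run(run_pair(handler_async))
print(blocking_log)
print(async_log)
```

With the blocking handler, request b cannot start until request a has finished. With the awaiting handler, both requests start before either one ends.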

You can copy the hello.py script above and run it with serve run. Make sure to run the command from a directory containing hello.py, so it can locate the script:

$ serve run hello:language_classifier

You can use this client script to interact with the example:

# File name: hello_client.py
import requests

response = requests.post(
    "http://localhost:8000", json={"language": "spanish", "name": "Dora"}
)
greeting = response.text
print(greeting)

While the serve run command is running, open a separate terminal window and run this script:

$ python hello_client.py

Hola Dora

Note

Composition lets you break apart your application and independently scale each part. For instance, suppose this LanguageClassifier application’s requests were 75% Spanish and 25% French. You could scale your SpanishResponder to have 3 replicas and your FrenchResponder to have 1 replica, so you could meet your workload’s demand. This flexibility also applies to reserving resources like CPUs and GPUs, as well as any other configurations you can set for each deployment.

With composition, you can avoid application-level bottlenecks when serving models and business logic steps that use different types and amounts of resources.
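As a rough illustration of the sizing arithmetic in the note above, the sketch below splits a replica budget in proportion to each deployment's share of traffic. The replicas_for helper and the budget of 4 replicas are assumptions for illustration; they are not part of Ray Serve. The resulting numbers are the kind of values you might then set as num_replicas.

```python
def replicas_for(traffic_share: dict, total_replicas: int) -> dict:
    # Allocate replicas in proportion to traffic, with at least one per deployment.
    return {
        name: max(1, round(share * total_replicas))
        for name, share in traffic_share.items()
    }


plan = replicas_for(
    {"SpanishResponder": 0.75, "FrenchResponder": 0.25}, total_replicas=4
)
print(plan)
```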

ServeHandle Deep Dive¶

Conceptually, a ServeHandle is a client-side load balancer that routes requests to any of the replicas of a given deployment. It also buffers requests internally so that it doesn’t overwhelm the replicas, and it reports the current number of buffered requests to the autoscaler, which can then scale up the number of replicas.

[Figure: architecture diagram of the ServeHandle]
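To make the idea of a buffering, client-side router concrete, here is a toy sketch in plain Python. It is not how Ray Serve implements the ServeHandle: the ToyHandle class, its per-round capacity limit, and the in-order assignment of requests to replicas are all assumptions chosen to keep the example small.

```python
from collections import deque
from typing import Callable, Deque, List


class ToyHandle:
    """Toy client-side router with an internal buffer (illustration only)."""

    def __init__(self, replicas: List[Callable[[str], str]], capacity: int = 1):
        self.replicas = replicas
        self.capacity = capacity  # requests each replica accepts per round
        self.buffer: Deque[str] = deque()

    def submit(self, request: str) -> None:
        # Requests are buffered first instead of being pushed to a replica.
        self.buffer.append(request)

    def num_buffered(self) -> int:
        # A signal like this could be reported to an autoscaler.
        return len(self.buffer)

    def dispatch_round(self) -> List[str]:
        # Hand each replica at most `capacity` buffered requests.
        results = []
        for replica in self.replicas:
            for _ in range(self.capacity):
                if not self.buffer:
                    return results
                results.append(replica(self.buffer.popleft()))
        return results


handle = ToyHandle(
    [lambda r: f"replica-0:{r}", lambda r: f"replica-1:{r}"], capacity=1
)
for i in range(5):
    handle.submit(f"r{i}")

first_round = handle.dispatch_round()
print(first_round, handle.num_buffered())
```

After one round, two requests have been served and three remain buffered. A growing buffer is the kind of signal that would justify adding replicas.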

ServeHandles take request parameters and return a future object of type ray.ObjectRef, whose value will be filled with the result object. Because of the internal buffering, the time from submitting a request to getting a ray.ObjectRef can vary.

Because of this variability, Serve offers synchronous and asynchronous versions of the handle so that the buffering period is handled efficiently:

  • RayServeSyncHandle directly returns a ray.ObjectRef. It blocks the current thread until the request is matched to a replica.

  • RayServeDeploymentHandle returns an asyncio.Task upon submission. The asyncio.Task can be awaited to resolve to a ray.ObjectRef. While the current request is buffered, other requests can be processed concurrently.

serve.run deploys a deployment graph and returns the entrypoint node’s handle (the node you passed as an argument to serve.run). The return type is a RayServeSyncHandle. This is useful for interacting with and testing the newly created deployment graph.

from starlette.requests import Request

import ray
from ray import serve
from ray.serve.handle import RayServeSyncHandle


@serve.deployment
class Model:
    def __call__(self) -> str:
        return "hello"


handle: RayServeSyncHandle = serve.run(Model.bind())
ref: ray.ObjectRef = handle.remote()  # blocks until request is assigned to replica
assert ray.get(ref) == "hello"

In all other cases, RayServeDeploymentHandle is the default because the API is more performant than its blocking counterpart. For example, when implementing a dynamic dispatch node in a deployment graph, the handle is asynchronous.

import asyncio
import random
import ray
from ray import serve
from ray.serve.handle import RayServeDeploymentHandle, RayServeSyncHandle


@serve.deployment
class Model:
    def __call__(self) -> str:
        return "hello"


@serve.deployment
class DynamicDispatcher:
    def __init__(
        self, handle_a: RayServeDeploymentHandle, handle_b: RayServeDeploymentHandle
    ):
        self.handle_a = handle_a
        self.handle_b = handle_b

    async def __call__(self):
        handle_chosen = self.handle_a if random.random() < 0.5 else self.handle_b

        # The request is enqueued.
        submission_task: asyncio.Task = handle_chosen.remote()
        # The request is assigned to a replica.
        ref: ray.ObjectRef = await submission_task
        # The request has been processed by the replica.
        result = await ref

        return result


handle: RayServeSyncHandle = serve.run(
    DynamicDispatcher.bind(Model.bind(), Model.bind())
)
ref: ray.ObjectRef = handle.remote()
assert ray.get(ref) == "hello"

The result of deployment_handle.remote() can also be passed directly as an argument to other downstream handles, without having to await on it.

import asyncio
import ray
from ray import serve
from ray.serve.handle import RayServeDeploymentHandle, RayServeSyncHandle


@serve.deployment
class Model:
    def __call__(self, inp):
        return "hello " + inp


@serve.deployment
class Chain:
    def __init__(
        self, handle_a: RayServeDeploymentHandle, handle_b: RayServeDeploymentHandle
    ):
        self.handle_a = handle_a
        self.handle_b = handle_b

    async def __call__(self, inp):
        ref: ray.ObjectRef = await self.handle_b.remote(
            # Serve can take an enqueued task as a dependency.
            self.handle_a.remote(inp)
        )
        return await ref


handle: RayServeSyncHandle = serve.run(Chain.bind(Model.bind(), Model.bind()))
ref: ray.ObjectRef = handle.remote("Serve")
assert ray.get(ref) == "hello hello Serve"

In both types of ServeHandle, you can call a specific method by using the .method_name accessor. For example:

import ray
from ray import serve
from ray.serve.handle import RayServeSyncHandle


@serve.deployment
class Deployment:
    def method1(self, arg: str) -> str:
        return f"Method1: {arg}"

    def __call__(self, arg: str) -> str:
        return f"__call__: {arg}"


handle: RayServeSyncHandle = serve.run(Deployment.bind())

ray.get(handle.remote("hi"))  # Defaults to calling the __call__ method.
ray.get(handle.method1.remote("hi"))  # Call a different method.

Note

ray.ObjectRef corresponds to the result of a request submission. To retrieve the result, you can use the synchronous Ray Core API ray.get(ref) or the async API await ref. To wait for the result to be available without retrieving it, you can use the synchronous API ray.wait([ref]) or the async API await asyncio.wait([ref]). You can mix and match these calls, but we recommend using async APIs to increase concurrency.
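The distinction between waiting for a result and retrieving it can be sketched with plain asyncio. This example does not use Ray: an asyncio.Task stands in for the ray.ObjectRef, and the compute coroutine is a made-up placeholder for a request.

```python
import asyncio


async def compute() -> str:
    # Stand-in for a request being processed by a replica.
    await asyncio.sleep(0.01)
    return "hello"


async def main():
    task = asyncio.create_task(compute())
    # Wait until the task has finished, without retrieving its value.
    done, pending = await asyncio.wait([task])
    # Retrieve the value in a separate step.
    value = await task
    return task in done, len(pending), value


finished, num_pending, value = asyncio.run(main())
print(finished, num_pending, value)
```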

Deployment Graph API¶

Note

The call graph is in alpha, so its APIs are subject to change.

For more advanced composition patterns, it can be useful to surface the relationships between deployments, instead of hiding them inside individual deployment definitions.

Ray Serve’s deployment graph API lets you specify how to route requests through your deployments, so you can explicitly create a dependency graph. It also has additional features like HTTP adapters and input routing that help you build more expressive graphs.

Binding Deployments¶

The basic building block for all deployment graphs is the DeploymentNode. One type of DeploymentNode is the ClassNode. You can create ClassNodes by binding class-based deployments to their constructor’s arguments with the bind method. This may sound familiar because you’ve already been doing this whenever you bind and run class-based deployments, such as in the Composing Deployments using ServeHandles section.

As another example:

# File name: echo.py
from starlette.requests import Request

from ray import serve


@serve.deployment
class EchoClass:
    def __init__(self, echo_str: str):
        self.echo_str = echo_str

    def __call__(self, request: Request) -> str:
        return self.echo_str


# You can create ClassNodes from the EchoClass deployment
foo_node = EchoClass.bind("foo")
bar_node = EchoClass.bind("bar")
baz_node = EchoClass.bind("baz")

echo.py defines three ClassNodes: foo_node, bar_node, and baz_node. The nodes are defined by invoking bind on the EchoClass deployment. They have different behaviors because they use different arguments in the bind call.

Note that all three of these nodes were created from the same EchoClass deployment. Class deployments are essentially factories for ClassNodes. A single class deployment can produce multiple ClassNodes through multiple bind statements.

There are two options to run a node:

  1. serve.run(node): This Python call can be added to your Python script to run a particular node. This call starts a Ray cluster (if one isn’t already running), deploys the node to it, and then returns. You can call this function multiple times in the same script on different DeploymentNodes. Each time, it tears down any deployments it previously deployed and deploys the passed-in node’s deployment. After the script exits, the cluster and any nodes deployed by serve.run are torn down.

  2. serve run module:node: This CLI command starts a Ray cluster and runs the node at the import path module:node. It then blocks, allowing you to open a separate terminal window and issue requests to the running deployment. You can stop the serve run command with ctrl-c.

When you run a node, you are deploying the node’s deployment and its bound arguments. Ray Serve creates a deployment in Ray and instantiates your deployment’s class using the arguments. By default, you can send requests to your deployment at http://localhost:8000. These requests are converted to Starlette request objects and passed to your class’s __call__ method.

Note

Additionally, when you run a node, the deployment’s configurations (which you can set in the @serve.deployment decorator, through an options call, or a Serve config file) still apply to the deployment. You can use this to independently scale and configure your graph’s deployments by, for instance, setting different num_replicas, num_cpus, or num_gpus values for different deployments.

You can try this example out using the serve run CLI:

$ serve run echo:foo_node

Here’s a client script that can send requests to your node:

# File name: echo_client.py
import requests

response = requests.get("http://localhost:8000/")
echo = response.text
print(echo)

While the deployment is running with serve run, open a separate terminal window and issue a request to it with the echo_client.py script:

$ python echo_client.py

foo

Building the Call Graph: MethodNodes and FunctionNodes¶

After defining your ClassNodes, you can specify how HTTP requests should be processed using the call graph. As an example, let’s look at a deployment graph that implements this chain of arithmetic operations:

output = request + 2 - 1 + 3

Here’s the graph:

 1# File name: arithmetic.py
 2from ray import serve
 3from ray.serve.drivers import DAGDriver
 4from ray.serve.deployment_graph import InputNode
 5
 6from starlette.requests import Request
 7
 8
 9@serve.deployment
10class AddCls:
11    def __init__(self, addend: float):
12        self.addend = addend
13
14    def add(self, number: float) -> float:
15        return number + self.addend
16
17    async def unpack_request(self, http_request: Request) -> float:
18        return await http_request.json()
19
20
21@serve.deployment
22def subtract_one_fn(number: float) -> float:
23    return number - 1
24
25
26@serve.deployment
27async def unpack_request(http_request: Request) -> float:
28    return await http_request.json()
29
30
31add_2 = AddCls.bind(2)
32add_3 = AddCls.bind(3)
33
34with InputNode() as http_request:
35    request_number = unpack_request.bind(http_request)
36    add_2_output = add_2.add.bind(request_number)
37    subtract_1_output = subtract_one_fn.bind(add_2_output)
38    add_3_output = add_3.add.bind(subtract_1_output)
39
40graph = DAGDriver.bind(add_3_output)

Lines 31 and 32 bind two ClassNodes from the AddCls deployment. Line 34 starts the call graph:

with InputNode() as http_request:
    request_number = unpack_request.bind(http_request)
    add_2_output = add_2.add.bind(request_number)
    subtract_1_output = subtract_one_fn.bind(add_2_output)
    add_3_output = add_3.add.bind(subtract_1_output)

The with statement (known as a “context manager” in Python) initializes a special Ray Serve-provided object called an InputNode. This isn’t a DeploymentNode like ClassNodes, MethodNodes, or FunctionNodes. Rather, it’s the input of the graph. In this case, that input is an HTTP request. In a later section, you’ll learn how to change this input using another Ray Serve-provided object called the DAGDriver.

Note

The InputNode tells Ray Serve where to send the graph input at runtime. In this example, for instance, http_request is an InputNode object, so you can’t call request methods like .json() on it directly in the context manager. However, during runtime, Ray Serve passes incoming HTTP requests directly into the same functions and methods that http_request is passed into, so those functions and methods can call request methods like .json() on the request object that gets passed in.

You can use the InputNode to indicate which node(s) the graph input should be passed into by passing the InputNode into bind calls within the context manager. In this example, the http_request is passed to only one node, unpack_request. The output of that bind call, request_number, is a FunctionNode. FunctionNodes are produced when deployments containing functions are bound to arguments for that function using bind. request_number represents the output of unpack_request when called on incoming HTTP requests. unpack_request, which is defined on lines 26 through 28, processes the HTTP request’s JSON body and returns a number that can be passed into arithmetic operations.

Tip

If you don’t want to manually unpack HTTP requests, check out this guide’s section on HTTP adapters, which can handle unpacking for you.

The graph then passes request_number into a bind call on add_2’s add method. The output of this call, add_2_output, is a MethodNode. MethodNodes are produced when ClassNode methods are bound to arguments using bind. In this case, add_2_output represents the result of adding 2 to the number in the request.

The rest of the call graph uses another FunctionNode and MethodNode to finish the chain of arithmetic. add_2_output is bound to the subtract_one_fn deployment, producing the subtract_1_output FunctionNode. Then, the subtract_1_output is bound to the add_3.add method, producing the add_3_output MethodNode. This add_3_output MethodNode represents the final output from the chain of arithmetic operations.
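The idea of binding nodes first and supplying the input later can be sketched in plain Python. The Input and Node classes below are hypothetical stand-ins for InputNode and the nodes that bind produces; they are not Ray Serve classes, and the real graph runs each step in its own deployment rather than in one process.

```python
from typing import Any, Callable


class Input:
    """Placeholder for the value supplied at request time."""


class Node:
    """Records a function and its arguments without calling the function."""

    def __init__(self, fn: Callable[..., Any], *args: Any):
        self.fn = fn
        self.args = args

    def run(self, graph_input: Any) -> Any:
        # Resolve upstream nodes and the input placeholder, then call fn.
        resolved = [resolve(arg, graph_input) for arg in self.args]
        return self.fn(*resolved)


def resolve(arg: Any, graph_input: Any) -> Any:
    if isinstance(arg, Input):
        return graph_input
    if isinstance(arg, Node):
        return arg.run(graph_input)
    return arg


# Build the chain: output = request + 2 - 1 + 3
request_number = Input()
add_2_output = Node(lambda n: n + 2, request_number)
subtract_1_output = Node(lambda n: n - 1, add_2_output)
add_3_output = Node(lambda n: n + 3, subtract_1_output)

# Nothing has executed yet. The input is supplied only when the graph runs.
result = add_3_output.run(5)
print(result)
```

Building the nodes performs no arithmetic. The computation happens only when run is called on the final node with a concrete input, which mirrors how the final MethodNode represents the output of the whole chain.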

To run the call graph, you need to use a driver. Drivers are deployments that process the call graph that you’ve written and route incoming requests through your deployments based on that graph. Ray Serve provides a driver called DAGDriver, which is used on line 40:

graph = DAGDriver.bind(add_3_output)

Generally, the DAGDriver needs to be bound to the FunctionNode or MethodNode representing the final output of a graph. This bind call returns a ClassNode that you can run in serve.run or serve run. Running this ClassNode also deploys the rest of the graph’s deployments.

Note

The DAGDriver can also be bound to ClassNodes. This is useful if you construct a deployment graph where ClassNodes invoke other ClassNodes’ methods. In this case, you should pass in the “root” ClassNode to DAGDriver (i.e. the one that you would otherwise pass into serve.run). Check out the Composing Deployments using ServeHandles section for more info.

You can test this example using this client script:

# File name: arithmetic_client.py
import requests

response = requests.post("http://localhost:8000/", json=5)
output = response.json()
print(output)

Start the graph in the terminal:

$ serve run arithmetic:graph

In a separate terminal window, run the client script to make requests to the graph:

$ python arithmetic_client.py

9

Drivers and HTTP Adapters¶

Ray Serve provides the DAGDriver, which routes HTTP requests through your call graph. As mentioned in the call graph section, the DAGDriver takes in a DeploymentNode and it produces a ClassNode that you can run.

The DAGDriver also has an optional keyword argument: http_adapter. HTTP adapters are functions that get run on the HTTP request before it’s passed into the graph. Ray Serve provides a handful of these adapters, so you can rely on them to conveniently handle the HTTP parsing while focusing your attention on the graph itself.

For instance, you can use the Ray Serve-provided json_request adapter to simplify the arithmetic call graph by eliminating the unpack_request function. You can replace lines 31 through 40 with this graph:

# This import can go to the top of the file.
from ray.serve.http_adapters import json_request

add_2 = AddCls.bind(2)
add_3 = AddCls.bind(3)

with InputNode() as request_number:
    add_2_output = add_2.add.bind(request_number)
    subtract_1_output = subtract_one_fn.bind(add_2_output)
    add_3_output = add_3.add.bind(subtract_1_output)

graph = DAGDriver.bind(add_3_output, http_adapter=json_request)

Without an http_adapter, an InputNode represents an HTTP request, and at runtime, incoming HTTP request objects are passed into the same functions and methods that the InputNode is passed into. When you set an http_adapter, the InputNode represents the http_adapter’s output.

At runtime:

  1. Ray Serve sends each HTTP request object to the DAGDriver.

  2. The DAGDriver calls the http_adapter function on each request.

  3. The DAGDriver passes the http_adapter output to the same functions and methods that the InputNode is passed into, kicking off the request’s journey through the call graph.

In the example above, the InputNode represents the number packaged inside the request’s JSON body instead of the HTTP request itself. You can pass the JSON directly into the graph instead of first unpacking it from the request.
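The runtime steps above can be sketched in plain Python. ToyDriver and toy_json_adapter are hypothetical stand-ins for DAGDriver and json_request, the raw request is simplified to a JSON string, and the graph is collapsed into a single function. This shows the order of operations only, not Ray Serve's implementation.

```python
import json
from typing import Any, Callable, Optional


def toy_json_adapter(raw_body: str) -> Any:
    # Stand-in for an HTTP adapter: parse the request body as JSON.
    return json.loads(raw_body)


def arithmetic_graph(number: float) -> float:
    # Stand-in for the call graph: output = request + 2 - 1 + 3
    return number + 2 - 1 + 3


class ToyDriver:
    """Applies an optional adapter to each request, then runs the graph."""

    def __init__(
        self,
        graph: Callable[[Any], Any],
        http_adapter: Optional[Callable[[str], Any]] = None,
    ):
        self.graph = graph
        self.http_adapter = http_adapter

    def handle(self, raw_body: str) -> Any:
        # Steps 1 and 2: receive the request and run the adapter on it.
        graph_input = self.http_adapter(raw_body) if self.http_adapter else raw_body
        # Step 3: pass the adapter output into the graph.
        return self.graph(graph_input)


driver = ToyDriver(arithmetic_graph, http_adapter=toy_json_adapter)
output = driver.handle("5")
print(output)
```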

See the guide on http_adapters to learn more.

Testing the Graph with the Python API¶

The serve.run function returns a handle that you can use to test your graph in Python, without using HTTP requests.

To test your graph:

  1. Call serve.run on your graph and store the returned handle.

  2. Call handle.predict.remote(input). The input argument becomes the input represented by InputNode. Make sure to refactor your call graph accordingly, since it takes in this input directly, instead of an HTTP request. You can use an HTTP adapter to make sure the graph you’re testing matches the one you ultimately deploy.

  3. predict.remote returns a reference to the result, so the graph can execute asynchronously. Call ray.get on this reference to get the final result.

As an example, you can continue rewriting the arithmetic graph example from above to use predict.remote. You can add testing code to the example:

# These imports can go to the top of the file.
import ray
from ray.serve.http_adapters import json_request

add_2 = AddCls.bind(2)
add_3 = AddCls.bind(3)

with InputNode() as request_number:
    add_2_output = add_2.add.bind(request_number)
    subtract_1_output = subtract_one_fn.bind(add_2_output)
    add_3_output = add_3.add.bind(subtract_1_output)

graph = DAGDriver.bind(add_3_output, http_adapter=json_request)

handle = serve.run(graph)

ref = handle.predict.remote(5)
result = ray.get(ref)
print(result)

Note that the graph itself is still the same. The only change is the testing code added after it. You can run this Python script directly now to test the graph:

$ python arithmetic.py

9

Visualizing the Graph¶

You can render an illustration of your deployment graph to see its nodes and their connections.

Make sure you have pydot and graphviz to follow this section:

# macOS
pip install -U pydot && brew install graphviz
# Windows
pip install -U pydot && winget install graphviz
# Linux (Debian/Ubuntu)
pip install -U pydot && sudo apt-get install -y graphviz

Here’s an example graph:

# File name: deployment_graph_viz.py

from ray import serve
from ray.serve.deployment_graph import InputNode
from ray.dag.vis_utils import _dag_to_dot


@serve.deployment
class Model:
    def __init__(self, weight: int):
        self.weight = weight

    def forward(self, input: int) -> int:
        return input + self.weight


@serve.deployment
def combine(output_1: int, output_2: int, kwargs_output: int = 0) -> int:
    return output_1 + output_2 + kwargs_output


m1 = Model.bind(1)
m2 = Model.bind(2)

with InputNode() as user_input:
    m1_output = m1.forward.bind(user_input[0])
    m2_output = m2.forward.bind(user_input[1])
    combine_output = combine.bind(m1_output, m2_output, kwargs_output=user_input[2])

# m1_output visualization
graph = _dag_to_dot(m1_output)
to_string = graph.to_string()
print(to_string)

# Full graph visualization
graph = _dag_to_dot(combine_output)
to_string = graph.to_string()
print(to_string)

The ray.dag.vis_utils._dag_to_dot function takes in a DeploymentNode and produces a graph visualization. You can see the string form of the visualization by running the script:

$ python deployment_graph_viz.py

digraph G {
rankdir=LR;
INPUT_ATTRIBUTE_NODE -> forward;
INPUT_NODE -> INPUT_ATTRIBUTE_NODE;
Model -> forward;
}

digraph G {
rankdir=LR;
forward -> combine;
INPUT_ATTRIBUTE_NODE -> forward;
INPUT_NODE -> INPUT_ATTRIBUTE_NODE;
Model -> forward;
forward_1 -> combine;
INPUT_ATTRIBUTE_NODE_1 -> forward_1;
INPUT_NODE -> INPUT_ATTRIBUTE_NODE_1;
Model_1 -> forward_1;
INPUT_ATTRIBUTE_NODE_2 -> combine;
INPUT_NODE -> INPUT_ATTRIBUTE_NODE_2;
}

You can render these strings in graphviz tools such as https://dreampuf.github.io/GraphvizOnline.

When the script visualizes m1_output, it shows a partial execution path of the entire graph:

[Figure: visualization of the partial graph that produces m1_output]

This path includes only the dependencies needed to generate m1_output.

On the other hand, when the script visualizes the final graph output, combine_output, it captures all nodes used in execution since they’re all required to create the final output.

[Figure: visualization of the full graph that produces combine_output]
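You can also check the relationship between the two visualizations directly from their DOT strings. The sketch below uses only the Python standard library and the two outputs printed earlier. The edges helper is a made-up convenience that handles only the simple "a -> b;" lines shown here, not the full DOT language.

```python
import re

partial_dot = """digraph G {
rankdir=LR;
INPUT_ATTRIBUTE_NODE -> forward;
INPUT_NODE -> INPUT_ATTRIBUTE_NODE;
Model -> forward;
}"""

full_dot = """digraph G {
rankdir=LR;
forward -> combine;
INPUT_ATTRIBUTE_NODE -> forward;
INPUT_NODE -> INPUT_ATTRIBUTE_NODE;
Model -> forward;
forward_1 -> combine;
INPUT_ATTRIBUTE_NODE_1 -> forward_1;
INPUT_NODE -> INPUT_ATTRIBUTE_NODE_1;
Model_1 -> forward_1;
INPUT_ATTRIBUTE_NODE_2 -> combine;
INPUT_NODE -> INPUT_ATTRIBUTE_NODE_2;
}"""


def edges(dot: str) -> set:
    # Extract (source, target) pairs from lines of the form "a -> b;".
    return set(re.findall(r"(\w+)\s*->\s*(\w+);", dot))


partial_edges = edges(partial_dot)
full_edges = edges(full_dot)

# Every edge needed for m1_output also appears in the full graph.
print(partial_edges <= full_edges)
print(len(partial_edges), len(full_edges))
```

The edges of the m1_output graph form a subset of the edges of the combine_output graph, which matches the description of the first visualization as a partial execution path.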

Next Steps¶

To learn more about deployment graphs, check out some deployment graph patterns you can incorporate into your own graph!