Pattern: Linear Pipeline

In this deployment graph pattern, deployments form a linear pipeline. Each request flows from one deployment to the next and is transformed at every step.
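
Without Ray Serve, the pattern is ordinary function composition: the output of one stage becomes the input of the next. The minimal sketch below assumes each stage is a plain Python callable. `run_linear_pipeline`, `double`, and `add_one` are hypothetical names for illustration and are not part of the Ray Serve API.

```python
from functools import reduce
from typing import Callable, Sequence

# A stage takes the value produced by the previous stage and returns a new value.
Stage = Callable[[float], float]


def run_linear_pipeline(stages: Sequence[Stage], value: float) -> float:
    """Feed `value` through each stage in order, passing each output to the next stage."""
    return reduce(lambda acc, stage: stage(acc), stages, value)


# Hypothetical stages, for illustration only.
def double(x: float) -> float:
    return x * 2


def add_one(x: float) -> float:
    return x + 1


print(run_linear_pipeline([double, add_one], 3.0))  # (3 * 2) + 1 = 7.0
print(run_linear_pipeline([add_one, double], 3.0))  # (3 + 1) * 2 = 8.0
```

Because the stages run strictly in sequence, their order matters whenever they do not commute, as the two calls above show.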

Code

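# File name: linear_pipeline.py

import ray
from ray import serve
from ray.serve.drivers import DAGDriver
from ray.serve.http_adapters import json_request
from ray.serve.deployment_graph import InputNode


@serve.deployment
class Model:
    def __init__(self, weight: float):
        # Each Model stores the weight it was constructed with.
        self.weight = weight

    def forward(self, x: float) -> float:
        # Add this model's weight to the incoming value.
        return x + self.weight


# Three Model deployments, each bound with a different weight.
nodes = [Model.bind(0), Model.bind(1), Model.bind(2)]
outputs = [None] * len(nodes)

with InputNode() as graph_input:
    # The first node consumes the graph's input directly.
    outputs[0] = nodes[0].forward.bind(graph_input)

    # Every later node consumes the previous node's output.
    for i in range(1, len(nodes)):
        outputs[i] = nodes[i].forward.bind(outputs[i - 1])

# The driver returns the last node's output as the graph's result
# and parses HTTP request bodies as JSON.
graph = DAGDriver.bind(outputs[-1], http_adapter=json_request)

handle = serve.run(graph)

# Run the pipeline on an input of 0: 0 + 0 + 1 + 2 = 3.
result = ray.get(handle.predict.remote(0))
print(result)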

Execution

This graph has three nodes, all of which are instances of the Model deployment. Each Model is constructed with a different weight, and its forward method adds that weight to its input.

The graph calls each deployment’s forward method in sequence, so the input accumulates every Model’s weight. The code runs the graph on an input of 0; after adding all the weights (0, 1, and 2), it returns a final sum of 3:

$ python linear_pipeline.py

3
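
To check this arithmetic without starting Ray, the minimal local sketch below mimics the graph. `LocalModel` and `run_graph` are hypothetical stand-ins for the Model deployment and the InputNode loop; they use no Ray or Serve APIs. Because every stage only adds its weight, the result is the input plus the sum of the weights, so an input of 1 would produce 4 rather than 3.

```python
from typing import Sequence


class LocalModel:
    """Hypothetical stand-in for the Model deployment: stores a weight and adds it in forward()."""

    def __init__(self, weight: float):
        self.weight = weight

    def forward(self, x: float) -> float:
        return x + self.weight


def run_graph(models: Sequence[LocalModel], x: float) -> float:
    # Mirror the graph: each model's forward() consumes the previous model's output.
    for model in models:
        x = model.forward(x)
    return x


models = [LocalModel(0), LocalModel(1), LocalModel(2)]
print(run_graph(models, 0))  # 0 + 0 + 1 + 2 = 3
print(run_graph(models, 1))  # 1 + 0 + 1 + 2 = 4
```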