Pipeline API (Experimental)

This section should help you:

  • understand the experimental pipeline API.

  • build on top of the API to construct your multi-model inference pipelines.


This API is experimental and the API is subject to change. We are actively looking for feedback via the Ray Forum or GitHub Issues

Serve Pipeline is a new experimental package purposely built to help developing and deploying multi-models inference pipelines, also known as model composition.

Model composition is common in real-world ML applications. In many cases, you need to:

  • Split CPU bounded preprocessing and GPU bounded model inference to scale each phase separately.

  • Chain multiple models together for a single tasks.

  • Combine the output from multiple models to create ensemble result.

  • Dynamically select models based on attribute of the input data.

The Serve Pipeline has the following features:

  • It has a python based, declarative API for constructing pipeline DAG.

  • It gives you visibility into the whole pipeline without losing the flexibility of coding arbitrary graph using code.

  • You can develop and test pipeline locally with local execution mode.

  • Each model in the DAG can be scaled to many replicas across the Ray cluster. You can fine-tune the resource usage to achieve maximum throughput and utilization.

Compare to ServeHandle, Serve Pipeline is more explicit about the dependencies of each model in the pipeline and let you deploy the entire DAG at once.

Compare to KServe (formerly KFServing), Serve Pipeline enables writing pipeline as code and arbitrary control flow operation using Python.

Compare to building your own orchestration micro-services, Serve Pipeline helps you to be productive in building scalable pipeline in hours.

Basic API

Serve Pipeline is a standalone package that can be used without Ray Serve. However, the expected usage is to use it inside your Serve deployment.

You can import it as:

from ray.serve import pipeline

You can decorate any function or class using pipeline.step. You can then combine these steps into a pipeline by calling the decorated steps. In the example below, we have a single step that takes the special node pipeline.INPUT, , which is a placeholder for the arguments that will be passed into the pipeline.

Once you have defined the pipeline by combining one or more steps, you can call .deploy() to instantiate it. Once you have instantiated the pipeline, you can call .call(inp) to invoke synchronously.

def echo(inp):
    return inp

my_node = echo(pipeline.INPUT)
my_pipeline = my_node.deploy()
assert my_pipeline.call(42) == 42

The input to a pipeline node can be the pipeline.INPUT special node or one or more other pipeline nodes. Here is an example of simple chaining pipeline.

def add_one(inp):
    return inp + 1

def double(inp):
    return inp**2

my_node = double(add_one(pipeline.INPUT))
my_pipeline = my_node.deploy()
assert my_pipeline.call(1) == 4

For classes, you need to instantiate them with init args first, then pass in their upstream nodes. This allows you to have the same code definition but pass different arguments, like URIs for model weights (you can see an example of this in the ensemble example section.)

class Adder:
    def __init__(self, value):
        self.value = value

    def __call__(self, inp):
        return self.value + inp

my_pipeline = Adder(2)(pipeline.INPUT).deploy()
assert my_pipeline.call(2) == 4

The decorator also takes two arguments to configure where the node will be executed.

ray.serve.pipeline.step(_func_or_class: Optional[Callable] = None, execution_mode: Union[ray.serve.pipeline.common.ExecutionMode, str] = <ExecutionMode.LOCAL: 1>, num_replicas: int = 1) → Callable[[Callable], ray.serve.pipeline.step.PipelineStep][source]

Decorator used to define a pipeline step.

  • execution_mode (ExecutionMode) –

    The execution mode for this step. Supported modes:

    • ExecutionMode.LOCAL (default): executes this step inline in the calling process.

    • ExecutionMode.TASKS: executes this step in Ray tasks.

    • ExecutionMode.ACTORS: executes this step in Ray actors.

  • num_replicas (int) – The number of Ray actors to start that will run this step (default to 1). Only valid when using ExecutionMode.ACTORS.


>>> @pipeline.step(execution_mode="actors", num_replicas=10)
    def my_step(*args):


Here is an example pipeline that uses actors instead of local execution mode. The local execution mode is the default running mode. It runs the node directly within the process instead of distributing them out. This mode is useful for local testing and development.

@pipeline.step(execution_mode="actors", num_replicas=2)
def echo(inp):
    return inp

my_pipeline = echo(pipeline.INPUT).deploy()
assert my_pipeline.call(42) == 42

Chaining Example

In this section, we show how to implement a two stage pipeline that’s common for computer vision tasks. For workloads like image classification, the preprocessing steps are CPU bounded and hard to parallelize. The actual inference steps can run on GPU and batched (batching helps improving throughput without sacrificing latency, you can learn more in our batching tutorial). They are often split up into separate stages and scaled separately to increase throughput.

def preprocess(img_bytes):
    from torchvision import transforms
    import PIL.Image
    import io

    preprocessor = transforms.Compose([
        transforms.Lambda(lambda t: t[:3, ...]),  # remove alpha channel
            mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
    return preprocessor(PIL.Image.open(io.BytesIO(img_bytes))).unsqueeze(0)

@pipeline.step(execution_mode="actors", num_replicas=2)
class ClassificationModel:
    def __init__(self, model_name):
        import torchvision.models.resnet
        self.model = getattr(torchvision.models.resnet,

    def __call__(self, inp_tensor):
        import torch
        with torch.no_grad():
            output = self.model(inp_tensor).squeeze()
            sorted_value, sorted_idx = output.sort()
        return {
            "top_5_categories": sorted_idx.numpy().tolist()[-5:],
            "top_5_scores": sorted_value.numpy().tolist()[-5:]

import PIL.Image
import io
import numpy as np

# Generate dummy input
_buffer = io.BytesIO()
    np.zeros((720, 720, 3), int), mode="RGB").save(_buffer, "png")
dummy_png_bytes = _buffer.getvalue()

sequential_pipeline = (ClassificationModel("resnet18")(preprocess(
result = sequential_pipeline.call(dummy_png_bytes)
assert result["top_5_categories"] == [898, 412, 600, 731, 463]

Ensemble Example

We will now expand on previous example to construct an ensemble pipeline. In the previous example, our pipeline looks like: preprocess -> resnet18. What if we want to aggregate the output from many different models? You can build this scatter-gather pattern easily with Pipeline. The below code snippet shows how to construct a pipeline looks like: preprocess -> [resnet18, resnet34] -> combine_output.

def combine_output(*classifier_outputs):
    # Here will we will just concatenate the result from multiple models
    # You can easily extend this to other ensemble techniques like voting
    # or weighted average.
    return sum([out["top_5_categories"] for out in classifier_outputs], [])

preprocess_node = preprocess(pipeline.INPUT)
model_nodes = [
    for model in ["resnet18", "resnet34"]
ensemble_pipeline = combine_output(*model_nodes).deploy()
result = ensemble_pipeline.call(dummy_png_bytes)
assert result == [898, 412, 600, 731, 463, 899, 618, 733, 463, 600]

More Use Case Examples

There are even more use cases for Serve Pipeline.


Please feel free to suggest more use cases and contribute your examples by sending a Github Pull Requests!

Combining business logic + ML models

Based off the previous ensemble example, you can put arbitrary business logic in your pipeline step.

def dynamic_weighting_combine(*classifier_outputs):
    # Pseudo-code:
    # Example of bringing in custom business logic and arbitrary Python code.
    # You can issue database queries, log metrics, and run complex computation.
    my_weights = my_db.get("dynamic_weights")
    weighted_output = average(classifier_outputs, my_weights)
    my_api_response = my_response_model.reshape(
        [out.astype("int") for out in weighted_output])
    return my_api_response