Serving ML Models

This section should help you:

  • batch requests to optimize performance

  • serve multiple models by composing deployments

Request Batching

Ray Serve can batch requests to improve performance, which is especially important for ML models that run on GPUs. To use this feature, you need to do two things:

  1. Use async def for your request handling logic to process queries concurrently.

  2. Use the @serve.batch decorator to batch individual queries that come into the replica. The method/function that’s decorated should handle a list of requests and return a list of the same length.

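from ray import serve


@serve.deployment(route_prefix="/increment")
class BatchingExample:
    def __init__(self):
        self.count = 0

    @serve.batch
    async def handle_batch(self, requests):
        # `requests` is a list of Starlette requests collected by Serve.
        responses = []
        for request in requests:
            # Starlette's request.json() is a coroutine, so await it.
            responses.append(await request.json())

        # Return one response per request, in the same order.
        return responses

    async def __call__(self, request):
        # Pass a single request; the caller gets back its single response.
        return await self.handle_batch(request)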

BatchingExample.deploy()
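To build intuition for how concurrent single calls end up as one list, here is a minimal pure-asyncio sketch. It is a toy stand-in and not Ray Serve's implementation: the MicroBatcher class and its max_batch_size and wait_s parameters are hypothetical names chosen for illustration. Each caller submits one item and awaits one result, while the batch function receives a list and must return a list of the same length.

```python
import asyncio


class MicroBatcher:
    """Toy batching helper (hypothetical); not Ray Serve's implementation."""

    def __init__(self, batch_fn, max_batch_size=4, wait_s=0.01):
        self.batch_fn = batch_fn
        self.max_batch_size = max_batch_size
        self.wait_s = wait_s
        self.pending = []       # (item, future) pairs waiting for a batch
        self.flush_task = None  # timer that flushes a partial batch

    async def submit(self, item):
        fut = asyncio.get_running_loop().create_future()
        self.pending.append((item, fut))
        if len(self.pending) >= self.max_batch_size:
            await self._flush()  # batch is full: run it now
        elif self.flush_task is None:
            self.flush_task = asyncio.create_task(self._flush_later())
        return await fut

    async def _flush_later(self):
        await asyncio.sleep(self.wait_s)
        self.flush_task = None
        await self._flush()

    async def _flush(self):
        batch, self.pending = self.pending, []
        if not batch:
            return
        results = await self.batch_fn([item for item, _ in batch])
        # The batch function must return one result per input.
        assert len(results) == len(batch)
        for (_, fut), result in zip(batch, results):
            fut.set_result(result)


batch_sizes = []


async def double_all(items):
    batch_sizes.append(len(items))
    return [x * 2 for x in items]


async def main():
    batcher = MicroBatcher(double_all, max_batch_size=4, wait_s=0.05)
    return await asyncio.gather(*(batcher.submit(i) for i in range(6)))


results = asyncio.run(main())
print(results, batch_sizes)
```

Six concurrent submissions are served by two calls to the batch function: one full batch of four, then a partial batch of two once the wait expires. This is also why the request handler needs to be async def: callers must be able to wait concurrently for a batch to form.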

Please take a look at the Batching Tutorial for a deep dive.

Model Composition

Note

Serve recently added an experimental API for building deployment graphs of multiple models. Please take a look at the Deployment Graph API and try it out!

Ray Serve supports composing individually scalable models into a single model out of the box. For instance, you can combine multiple models to perform stacking or ensembles.

To define a higher-level composed model you need to do three things:

  1. Define your underlying models (the ones that you will compose together) as Ray Serve deployments.

  2. Define your composed model, using the handles of the underlying models (see the example below).

  3. Define a deployment representing this composed model and query it!

To keep the composed model from executing synchronously (which would make calls to it very slow), define its request handler with async def. You'll see this in the example below.

That’s it. Let’s take a look at an example:

from random import random
import requests
import ray
from ray import serve

# Our pipeline will be structured as follows:
# - Input comes in, and the composed model sends it to model_one.
# - model_one outputs a random number between 0 and 1. If the value is
#   greater than 0.5, the data is also sent to model_two.
# - Otherwise, the result is returned to the user without calling model_two.

# Let's define two models that just print out the data they receive.


@serve.deployment
def model_one(data):
    print(f"Model 1 called with data:{data}")
    return random()


@serve.deployment
def model_two(data):
    print(f"Model 2 called with data:{data}")
    # Use this data sent from model_one.
    return data


# max_concurrent_queries is optional. By default, if you pass in an async
# function, Ray Serve sets the limit to a high number.
@serve.deployment(max_concurrent_queries=10, route_prefix="/composed")
class ComposedModel:
    def __init__(self):
        # Use the Python ServeHandle APIs.
        # Set sync=False to override the default, which is a synchronous
        # handle. We want these deployments to be called from within an
        # asynchronous event loop for concurrency. See the documentation for
        # the Sync and Async ServeHandle APIs for details:
        # https://docs.ray.io/en/latest/serve/http-servehandle.html
        self.model_one = model_one.get_handle(sync=False)
        self.model_two = model_two.get_handle(sync=False)

    # This method can be called concurrently.
    async def __call__(self, starlette_request):
        # At this point you are yielding to the event loop to take in another
        # request.
        data = await starlette_request.body()

        # Use await twice here for two reasons:
        # 1. We are inside an async def callable and want to call the
        # model_one deployment asynchronously, so we follow the standard
        # async-await pattern. This first await returns an ObjectRef.
        # 2. The second await waits on the ObjectRef, which implicitly does
        # a ray.get(ObjectRef) to fetch the actual value returned.
        # Hence two awaits.
        score = await (await self.model_one.remote(data=data))
        if score > 0.5:
            await (await self.model_two.remote(data=data))
            result = {"model_used: 1 & 2;  score": score}
        else:
            result = {"model_used: 1 ; score": score}

        return result


if __name__ == "__main__":

    # Start Ray with 8 CPUs, shutting down any existing instance first.
    if ray.is_initialized():
        ray.shutdown()
    ray.init(num_cpus=8)
    serve.start()
    # Start deployment instances.
    model_one.deploy()
    model_two.deploy()
    ComposedModel.deploy()

    # Now send requests.
    for _ in range(8):
        resp = requests.get("http://127.0.0.1:8000/composed", data="Hey!")
        print(resp.json())

    ray.shutdown()

# Output
# {'model_used: 1 ; score': 0.20814435670233788}
# {'model_used: 1 ; score': 0.02964993348224776}
# {'model_used: 1 & 2;  score': 0.7570845154147877}
# {'model_used: 1 & 2;  score': 0.8166808845518793}
# {'model_used: 1 ; score': 0.28354556740137904}
# {'model_used: 1 & 2;  score': 0.5826064390148368}
# {'model_used: 1 ; score': 0.4460146836937825}
# {'model_used: 1 ; score': 0.37099434069129833}
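The double-await pattern and the benefit of async def can be seen in isolation with plain asyncio. The sketch below is illustrative only: FakeHandle is a hypothetical stand-in whose remote() returns an asyncio task, loosely analogous to receiving an ObjectRef, and the sleeps stand in for inference latency. It does not use or reproduce the Ray Serve handle API.

```python
import asyncio
import random

in_flight = 0      # calls to model_one currently running
max_in_flight = 0  # highest overlap observed


async def model_one(data):
    global in_flight, max_in_flight
    in_flight += 1
    max_in_flight = max(max_in_flight, in_flight)
    await asyncio.sleep(0.01)  # stand-in for inference latency
    in_flight -= 1
    return random.random()


async def model_two(data):
    await asyncio.sleep(0.01)
    return data


class FakeHandle:
    """Hypothetical stand-in for an async handle; not the Ray Serve API."""

    def __init__(self, fn):
        self.fn = fn

    async def remote(self, **kwargs):
        # Submitting the call returns a future-like object right away.
        return asyncio.ensure_future(self.fn(**kwargs))


async def composed(handle_one, handle_two, data):
    # First await: submit the call. Second await: wait for its result.
    score = await (await handle_one.remote(data=data))
    models_used = [1]
    if score > 0.5:
        await (await handle_two.remote(data=data))
        models_used.append(2)
    return {"models_used": models_used, "score": score}


async def main():
    one, two = FakeHandle(model_one), FakeHandle(model_two)
    calls = (composed(one, two, f"req-{i}") for i in range(8))
    return await asyncio.gather(*calls)


results = asyncio.run(main())
print(max_in_flight)
```

Because every await yields to the event loop, all eight requests are inside model_one at the same time. A handler that blocked on each call instead would process them one after another.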

Integration with Model Registries

Ray Serve is flexible. If you can load your model as a Python function or class, then you can scale it up and serve it with Ray Serve.

For example, if you are using the MLflow Model Registry to manage your models, the following wrapper class will allow you to load a model using its MLflow Model URI:

import io

import mlflow.pyfunc
import pandas as pd
from ray import serve


@serve.deployment
class MLflowDeployment:
    def __init__(self, model_uri):
        self.model = mlflow.pyfunc.load_model(model_uri=model_uri)

    async def __call__(self, request):
        csv_bytes = await request.body()  # The body contains just raw CSV text.
        # pd.read_csv expects a path or file-like object, so wrap the bytes.
        df = pd.read_csv(io.BytesIO(csv_bytes))
        return self.model.predict(df)


# MLflow Model Registry URIs use the "models:/" scheme.
model_uri = "models:/my_registered_model/Production"
MLflowDeployment.deploy(model_uri)

To serve multiple different MLflow models in the same program, use the name option:

MLflowDeployment.options(name="my_mlflow_model_1").deploy(model_uri)

Tip

The above approach will work for any model registry, not just MLflow. Namely, load the model from the registry in __init__, and forward the request to the model in __call__.
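As a registry-agnostic illustration of that pattern, here is a minimal sketch. Everything in it is hypothetical: FAKE_REGISTRY, load_from_registry, and the registry:// URI stand in for whatever client and URI scheme your registry provides, and the class is a plain Python class rather than a Ray Serve deployment.

```python
import asyncio

# Hypothetical in-memory "registry": maps a model URI to a loader function.
FAKE_REGISTRY = {
    "registry://doubler/1": lambda: (lambda rows: [2 * x for x in rows]),
}


def load_from_registry(model_uri):
    """Stand-in for a registry client's load call (hypothetical)."""
    return FAKE_REGISTRY[model_uri]()


class RegistryBackedModel:
    def __init__(self, model_uri):
        # Load once at construction so every request reuses the same model.
        self.model = load_from_registry(model_uri)

    async def __call__(self, payload):
        # Forward the request payload to the loaded model.
        return self.model(payload)


model = RegistryBackedModel("registry://doubler/1")
prediction = asyncio.run(model([1, 2, 3]))
print(prediction)  # [2, 4, 6]
```

Loading in __init__ keeps the potentially slow registry download out of the request path, so each replica pays that cost once.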

For a complete, hands-on integration with MLflow, try this self-contained example on your laptop. First, install mlflow:

pip install mlflow

# This brief example shows how to deploy models saved in a model registry such as
# MLflow to Ray Serve, using the simple Ray Serve deployment APIs. You can peruse
# the saved models' metrics and parameters in the MLflow UI.
import json
import numpy as np
import pandas as pd
import requests
import os
import tempfile

from sklearn.datasets import load_iris
from sklearn.ensemble import GradientBoostingClassifier
from mlflow.tracking import MlflowClient

from ray import serve
import mlflow


def create_and_save_model():
    # load Iris data
    iris_data = load_iris()
    data, target, target_names = (iris_data['data'],
                                  iris_data['target'],
                                  iris_data['target_names'])

    # Instantiate a model
    model = GradientBoostingClassifier()

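    # Training and validation split.
    # Shuffle features and labels with the same permutation so that each
    # row stays paired with its label.
    perm = np.random.permutation(len(data))
    data, target = data[perm], target[perm]
    train_x, train_y = data[:100], target[:100]
    val_x, val_y = data[100:], target[100:]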

    # Create labels list as file
    LABEL_PATH = os.path.join(tempfile.gettempdir(), "iris_labels.json")
    with open(LABEL_PATH, "w") as f:
        json.dump(target_names.tolist(), f)

    # Train the model and save our label list as an MLflow artifact
    # mlflow.sklearn.autolog automatically logs all parameters and metrics during
    # the training.
    mlflow.sklearn.autolog()
    with mlflow.start_run() as run:
        model.fit(train_x, train_y)
        # Log label list as an artifact
        mlflow.log_artifact(LABEL_PATH, artifact_path="labels")
    return run.info.run_id

#
# Create our Ray Serve deployment class
#


@serve.deployment(route_prefix="/regressor")
class BoostingModel:
    def __init__(self, uri):
        # Load the model from the local
        # MLflow model registry as a PyFunc model
        self.model = mlflow.pyfunc.load_model(model_uri=uri)

        # Download the artifact list of labels
        local_dir = "/tmp/artifact_downloads"
        if not os.path.exists(local_dir):
            os.mkdir(local_dir)
        client = MlflowClient()
        # run_id is the module-level variable assigned in __main__ below.
        local_path = f"{client.download_artifacts(run_id, 'labels', local_dir)}/iris_labels.json"
        with open(local_path, "r") as f:
            self.label_list = json.load(f)

    async def __call__(self, starlette_request):
        payload = await starlette_request.json()
        print(f"Worker: received Starlette request with data: {payload}")

        # Get the input vector from the payload
        input_vector = [
            payload["sepal length"],
            payload["sepal width"],
            payload["petal length"],
            payload["petal width"],
        ]

        # Convert the input vector to a pandas DataFrame for prediction, since
        # an MLflow PyFunc model's predict(...) takes a pandas DataFrame
        prediction = self.model.predict(pd.DataFrame([input_vector]))[0]
        human_name = self.label_list[prediction]
        return {"result": human_name}


if __name__ == '__main__':

    # Train and save the model artifacts in MLflow.
    # Here our MLflow model registry is the local
    # directory ./mlruns
    run_id = create_and_save_model()

    # Start the Ray Serve instance
    serve.start()
    # Construct model uri to load the model from our model registry
    uri = f"runs:/{run_id}/model"
    # Deploy our model.
    BoostingModel.deploy(uri)

    # Send requests for samples of the label types virginica, setosa, and versicolor
    sample_request_inputs = [{
        "sepal length": 6.3,
        "sepal width": 3.3,
        "petal length": 6.0,
        "petal width": 2.5},
        {
        "sepal length": 5.1,
        "sepal width": 3.5,
        "petal length": 1.4,
        "petal width": 0.2},
        {
        "sepal length": 6.4,
        "sepal width": 3.2,
        "petal length": 4.5,
        "petal width": 1.5},
    ]
    for input_request in sample_request_inputs:
        response = requests.get("http://localhost:8000/regressor",
                                json=input_request)
        print(response.text)

    print("Launch MLflow ui to see the model parameters, metrics, and artifacts: `mlflow ui` from current directory.")

    # Example output (predictions can vary between runs because the
    # training data is shuffled randomly):
    #{
    #   "result": "versicolor"
    #}
    #{
    #    "result": "virginica"
    #}
    #{
    #    "result": "setosa"
    #}
    #
    # Launch MLflow ui to see the model parameters, metrics, and artifacts: `mlflow ui` from current directory.
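When shuffling a dataset before a training/validation split, features and labels must be reordered by the same permutation, otherwise rows lose their labels and the model trains on noise. The following is a minimal, dependency-free sketch of that idea. The function name paired_shuffle_split and the toy data are hypothetical, and plain lists are used instead of NumPy arrays to keep it self-contained.

```python
import random


def paired_shuffle_split(features, labels, n_train, seed=None):
    """Shuffle features and labels together, then split train/validation."""
    if len(features) != len(labels):
        raise ValueError("features and labels must have the same length")
    rng = random.Random(seed)
    order = list(range(len(features)))
    rng.shuffle(order)  # one permutation, applied to both sequences
    xs = [features[i] for i in order]
    ys = [labels[i] for i in order]
    return (xs[:n_train], ys[:n_train]), (xs[n_train:], ys[n_train:])


# Toy data where each label equals the first feature, so pairing is checkable.
features = [[i, i * 10] for i in range(6)]
labels = list(range(6))
(train_x, train_y), (val_x, val_y) = paired_shuffle_split(
    features, labels, n_train=4, seed=0
)
print(train_x, train_y)
```

With NumPy arrays, the same idea is to index both arrays with a single np.random.permutation(len(data)).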

For an even more hands-off and seamless integration with MLflow, check out the Ray Serve MLflow deployment plugin. A full tutorial is available here.

Framework-Specific Tutorials

Ray Serve seamlessly integrates with popular Python ML libraries. Below are tutorials with some of these frameworks to help get you started.