Deploying Ray Serve

In Key Concepts, you saw the basics of writing Serve applications. This section dives deeper into how Ray Serve runs on a Ray cluster and how you can deploy and update your Serve application over time.

To deploy a Ray Serve instance, you need three things:

  1. A running Ray cluster (you can deploy one on your local machine for testing). To learn more about Ray clusters see Deploying Ray.

  2. A Ray Serve instance.

  3. Your Ray Serve endpoint(s) and backend(s).

Deploying a Model with Ray Serve

Let’s deploy our first Ray Serve application. The first step is to start a Ray cluster. You can do that with the Ray autoscaler, but here we’ll create one on our local machine. To learn more about Ray clusters, see Deploying Ray.

Starting the Cluster

We do that by running:

ray start --head

That starts a cluster on our local machine. We can shut it down by running ray stop, which you should do once you have completed this tutorial.

Setup: Training a Model

Make sure scikit-learn is installed (for example, with pip install scikit-learn).

Place the following in a Python script and run it. In this example we train a model and save it to disk so that our Ray Serve app can load it.

import pickle
import json
import numpy as np

from sklearn.datasets import load_iris
from sklearn.ensemble import GradientBoostingClassifier
from sklearn.metrics import mean_squared_error

# Load data
iris_dataset = load_iris()
data, target, target_names = iris_dataset["data"], iris_dataset[
    "target"], iris_dataset["target_names"]

# Instantiate model
model = GradientBoostingClassifier()

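# Training and validation split.
# Shuffle features and labels with one shared permutation so they stay aligned.
shuffled_indices = np.random.permutation(len(data))
data, target = data[shuffled_indices], target[shuffled_indices]
train_x, train_y = data[:100], target[:100]
val_x, val_y = data[100:], target[100:]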

# Train and evaluate models
model.fit(train_x, train_y)
print("MSE:", mean_squared_error(model.predict(val_x), val_y))

# Save the model and label to file
with open("/tmp/iris_model_logistic_regression.pkl", "wb") as f:
    pickle.dump(model, f)
with open("/tmp/iris_labels.json", "w") as f:
    json.dump(target_names.tolist(), f)

As discussed in other Tutorials, we can use any framework to build these models. In general, you’ll just want to have the ability to persist these models to disk.
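Whatever framework produces the model, the save-and-load round trip can be checked on its own before Serve is involved. The following is a minimal sketch under stated assumptions: the helper names (save_artifacts, load_artifacts), the file names, and the dictionary standing in for a fitted model are all hypothetical and not part of the tutorial.

```python
import json
import pickle
import tempfile
from pathlib import Path


def save_artifacts(directory, model, labels):
    """Write the model and its label names side by side."""
    directory = Path(directory)
    with open(directory / "model.pkl", "wb") as f:
        pickle.dump(model, f)
    with open(directory / "labels.json", "w") as f:
        json.dump(labels, f)


def load_artifacts(directory):
    """Read back what save_artifacts wrote."""
    directory = Path(directory)
    with open(directory / "model.pkl", "rb") as f:
        model = pickle.load(f)
    with open(directory / "labels.json") as f:
        labels = json.load(f)
    return model, labels


# Hypothetical stand-in for a fitted model; any picklable object works here.
stand_in_model = {"kind": "stand-in", "weights": [0.1, 0.2, 0.3]}
stand_in_labels = ["setosa", "versicolor", "virginica"]

with tempfile.TemporaryDirectory() as tmp:
    save_artifacts(tmp, stand_in_model, stand_in_labels)
    restored_model, restored_labels = load_artifacts(tmp)

print(restored_model == stand_in_model, restored_labels == stand_in_labels)
```

Keep in mind that pickle files should only be loaded from sources you trust, since unpickling can execute arbitrary code.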

Now that we’ve trained that model and saved it to disk (keep in mind this could also be a service like S3), we’ll need to create a backend to serve the model.

Creating a Model and Serving it

In the following snippet we will complete two things:

  1. Define a servable model by defining a class with a __call__ method.

  2. Connect to our running Ray cluster (ray.init(...)) and then start or connect to the Ray Serve instance on that cluster (serve.init(...)).

Defining the model is straightforward: we write it like a typical Python class.

After creating a backend in Serve for our model (and versioning it with a string), we make it accept traffic by creating an endpoint that routes to that backend. Later on, .set_traffic lets us change how an endpoint’s traffic is divided between backends.

import pickle  # noqa: E402
import json  # noqa: E402

from ray import serve  # noqa: E402
import ray  # noqa: E402


class BoostingModel:
    def __init__(self):
        with open("/tmp/iris_model_logistic_regression.pkl", "rb") as f:
            self.model = pickle.load(f)
        with open("/tmp/iris_labels.json") as f:
            self.label_list = json.load(f)

    def __call__(self, flask_request):
        payload = flask_request.json
        print("Worker: received flask request with data", payload)

        input_vector = [
            payload["sepal length"],
            payload["sepal width"],
            payload["petal length"],
            payload["petal width"],
        ]
        prediction = self.model.predict([input_vector])[0]
        human_name = self.label_list[prediction]
        return {"result": human_name}


# Connect to our existing Ray cluster.
ray.init(address="auto")

# Now we start or connect to the Ray Serve instance.
# Listen on 0.0.0.0 to make the HTTP server accessible from other machines.
serve.init(http_host="0.0.0.0")
serve.create_backend("lr:v1", BoostingModel)
serve.create_endpoint("iris_classifier", backend="lr:v1", route="/regressor")

When we run this code, Serve stores the model as a Ray actor and routes traffic to it as the endpoint is queried, in this case over HTTP. Note that for this endpoint to be accessible from other machines, we need to specify http_host="0.0.0.0" in serve.init, as we did here.
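Because the backend is an ordinary Python class, its request handling can be exercised locally without starting Serve or sending HTTP traffic. The sketch below is an illustration under assumptions: LocalBoostingModel, StubModel, and FEATURE_KEYS are hypothetical names, the stub replaces the pickled classifier, and the fake request only provides the .json attribute that __call__ reads.

```python
from types import SimpleNamespace

FEATURE_KEYS = ["sepal length", "sepal width", "petal length", "petal width"]


class StubModel:
    """Hypothetical stand-in for the pickled classifier."""

    def predict(self, rows):
        # Pretend rows with a short petal are class 0, all others class 2.
        return [0 if row[2] < 2.5 else 2 for row in rows]


class LocalBoostingModel:
    """Same request handling as the tutorial class, with injected dependencies."""

    def __init__(self, model, label_list):
        self.model = model
        self.label_list = label_list

    def __call__(self, flask_request):
        payload = flask_request.json
        input_vector = [payload[key] for key in FEATURE_KEYS]
        prediction = self.model.predict([input_vector])[0]
        return {"result": self.label_list[prediction]}


# A fake request only needs the .json attribute that __call__ reads.
fake_request = SimpleNamespace(json={
    "sepal length": 1.2,
    "sepal width": 1.0,
    "petal length": 1.1,
    "petal width": 0.9,
})
backend = LocalBoostingModel(StubModel(), ["setosa", "versicolor", "virginica"])
local_result = backend(fake_request)
print(local_result)  # {'result': 'setosa'}
```

Passing the model and labels into the constructor is a design choice made for this sketch only; the tutorial class loads them from disk so that Serve can construct it without arguments.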

Now let’s query our endpoint to see the result.

Querying our Endpoint

We’ll use the requests library to query our endpoint and get a result.

import requests  # noqa: E402

sample_request_input = {
    "sepal length": 1.2,
    "sepal width": 1.0,
    "petal length": 1.1,
    "petal width": 0.9,
}
response = requests.get(
    "http://localhost:8000/regressor", json=sample_request_input)
print(response.text)
# Result:
# {
#  "result": "setosa"
# }
# This result may vary, since the training parameters may change.
# As we update this model, this result will also change over time.

Now that we have defined a model and have it running on our Ray cluster, let’s update it with a new version of the code.

Updating Your Model Over Time

Updating our model is as simple as deploying the first one. The code snippet is long, but all it does is define a new model, save it, and load it into Serve. The key lines are at the end.

import pickle  # noqa: E402
import json  # noqa: E402
import numpy as np  # noqa: E402

from sklearn.datasets import load_iris  # noqa: E402
from sklearn.ensemble import GradientBoostingClassifier  # noqa: E402
from sklearn.metrics import mean_squared_error  # noqa: E402

# Load data
iris_dataset = load_iris()
data, target, target_names = iris_dataset["data"], iris_dataset[
    "target"], iris_dataset["target_names"]

# Instantiate model
model = GradientBoostingClassifier()

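# Training and validation split.
# Shuffle features and labels with one shared permutation so they stay aligned.
shuffled_indices = np.random.permutation(len(data))
data, target = data[shuffled_indices], target[shuffled_indices]
train_x, train_y = data[:100], target[:100]
val_x, val_y = data[100:], target[100:]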

# Train and evaluate models
model.fit(train_x, train_y)
print("MSE:", mean_squared_error(model.predict(val_x), val_y))

# Save the model and label to file
with open("/tmp/iris_model_logistic_regression_2.pkl", "wb") as f:
    pickle.dump(model, f)
with open("/tmp/iris_labels_2.json", "w") as f:
    json.dump(target_names.tolist(), f)


import pickle  # noqa: E402
import json  # noqa: E402

from ray import serve  # noqa: E402
import ray  # noqa: E402


class BoostingModelv2:
    def __init__(self):
        with open("/tmp/iris_model_logistic_regression_2.pkl", "rb") as f:
            self.model = pickle.load(f)
        with open("/tmp/iris_labels_2.json") as f:
            self.label_list = json.load(f)

    def __call__(self, flask_request):
        payload = flask_request.json
        print("Worker: received flask request with data", payload)

        input_vector = [
            payload["sepal length"],
            payload["sepal width"],
            payload["petal length"],
            payload["petal width"],
        ]
        prediction = self.model.predict([input_vector])[0]
        human_name = self.label_list[prediction]
        return {"result": human_name, "version": "v2"}

# Connect to our existing Ray cluster. ignore_reinit_error suppresses the
# error if this process has already called ray.init().
ray.init(address="auto", ignore_reinit_error=True)

# Now we connect to the Ray Serve instance that is already running.
serve.init()
serve.create_backend("lr:v2", BoostingModelv2)
serve.set_traffic("iris_classifier", {"lr:v2": 0.25, "lr:v1": 0.75})

Because Ray Serve runs as a service, all we need to tell it is (a) that there’s a new model and (b) how much of an endpoint’s traffic to send to that model.

We do that with the line at the end of the code snippet, which allows us to split traffic between these two models.

serve.set_traffic("iris_classifier", {"lr:v2": 0.25, "lr:v1": 0.75})
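To build intuition for what a 25/75 split means over many requests, the following sketch simulates weighted routing with the standard library. It is not Ray Serve’s routing implementation; validate_traffic and simulate_routing are hypothetical helpers, and the sketch assumes the weights are meant to sum to 1.

```python
import random
from collections import Counter


def validate_traffic(traffic):
    """Check that the weights are non-negative and sum to 1."""
    if any(weight < 0 for weight in traffic.values()):
        raise ValueError("traffic weights must be non-negative")
    total = sum(traffic.values())
    if abs(total - 1.0) > 1e-9:
        raise ValueError(f"traffic weights must sum to 1, got {total}")


def simulate_routing(traffic, num_requests, seed=0):
    """Pick a backend per request with probability equal to its weight."""
    validate_traffic(traffic)
    rng = random.Random(seed)
    backends = list(traffic)
    weights = [traffic[name] for name in backends]
    picks = rng.choices(backends, weights=weights, k=num_requests)
    return Counter(picks)


counts = simulate_routing({"lr:v2": 0.25, "lr:v1": 0.75}, num_requests=10_000)
share_v2 = counts["lr:v2"] / 10_000
print(counts, share_v2)
```

With 10,000 simulated requests the share sent to lr:v2 lands close to 0.25; with only a handful of requests the observed split can deviate noticeably from the configured weights.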

While this is a simple operation, you may want to see Splitting Traffic Between Backends for more information. You may also want to consider Session Affinity, which ensures that queries from the same user or client are always mapped to the same backend version.

Now that we’re serving two models in production, let’s query the endpoint several times. You’ll notice that traffic is now split between the two models.

Querying our Endpoint

We’ll use the requests library to query our endpoint and get a result.

import requests  # noqa: E402

sample_request_input = {
    "sepal length": 1.2,
    "sepal width": 1.0,
    "petal length": 1.1,
    "petal width": 0.9,
}
response = requests.get(
    "http://localhost:8000/regressor", json=sample_request_input)
print(response.text)
# Result when the request is routed to lr:v2:
# {
#  "result": "setosa",
#  "version": "v2"
# }
# Requests routed to lr:v1 return only the "result" field.
# This result may vary, since the training parameters may change.

If you run this code several times, you’ll notice that the output changes. This is because the two models we created above are now serving traffic in parallel.
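Rather than eyeballing repeated runs, the responses can be tallied by version. The sketch below assumes a hypothetical tally_versions helper that takes any zero-argument callable returning a JSON string; a fake fetcher stands in for the HTTP call so the example runs offline. Since the v1 class in this tutorial returns no "version" key, a missing key is counted as v1.

```python
import json
from collections import Counter


def tally_versions(fetch, num_requests):
    """Call fetch() repeatedly and count which model version answered."""
    counts = Counter()
    for _ in range(num_requests):
        body = json.loads(fetch())
        # The v1 class in this tutorial returns no "version" key.
        counts[body.get("version", "v1")] += 1
    return counts


def make_fake_fetch():
    """Hypothetical stand-in that answers like v2 on every fourth call."""
    state = {"calls": 0}

    def fetch():
        state["calls"] += 1
        if state["calls"] % 4 == 0:
            return json.dumps({"result": "setosa", "version": "v2"})
        return json.dumps({"result": "setosa"})

    return fetch


version_counts = tally_versions(make_fake_fetch(), 20)
print(version_counts["v1"], version_counts["v2"])  # 15 5
```

Against the running endpoint, fetch could be a function that returns requests.get("http://localhost:8000/regressor", json=sample_request_input).text.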

After finishing the tutorial above, run ray stop to shut down the Ray cluster on your local machine.

Deploying as a Kubernetes Service

In order to deploy Ray Serve on Kubernetes, we need to do the following:

  1. Start a Ray cluster on Kubernetes.

  2. Expose the head node of the cluster as a Service.

  3. Start Ray Serve on the cluster.

There are multiple ways to start a Ray cluster on Kubernetes; see Deploying on Kubernetes for more information. Here, we will be using the Automatic Cluster Setup tool, which has support for Kubernetes as a backend.

The autoscaler takes in a yaml config file that describes the cluster. Here, we’ll be using the Kubernetes default config with a few small modifications. First, we need to make sure that the head node of the cluster, where Ray Serve will run its HTTP server, is exposed as a Kubernetes Service. There is already a default head node service defined in the services field of the config, so we just need to make sure that it’s exposing the right port: 8000, which Ray Serve binds on by default.

# Service that maps to the head node of the Ray cluster.
- apiVersion: v1
  kind: Service
  metadata:
      name: ray-head
  spec:
      # Must match the label in the head pod spec below.
      selector:
          component: ray-head
      ports:
          - protocol: TCP
            # Port that this service will listen on.
            port: 8000
            # Port that requests will be sent to in pods backing the service.
            targetPort: 8000

Then, we also need to make sure that the head node pod spec matches the selector defined here and exposes the same port:

head_node:
  apiVersion: v1
  kind: Pod
  metadata:
    # Automatically generates a name for the pod with this prefix.
    generateName: ray-head-

    # Matches the selector in the service definition above.
    labels:
        component: ray-head

  spec:
    # ...
    containers:
    - name: ray-node
      # ...
      ports:
          - containerPort: 8000 # Ray Serve default port.
    # ...

The rest of the config remains unchanged for this example, though you may want to change the container image or the number of worker pods started by default when running your own deployment. Now, we just need to start the cluster:

# Start the cluster.
$ ray up ray/python/ray/autoscaler/kubernetes/example-full.yaml

# Check the status of the service pointing to the head node. If configured
# properly, you should see the 'Endpoints' field populated with an IP
# address like below. If not, make sure the head node pod started
# successfully and the selector/labels match.
$ kubectl -n ray describe service ray-head
  Name:              ray-head
  Namespace:         ray
  Labels:            <none>
  Annotations:       <none>
  Selector:          component=ray-head
  Type:              ClusterIP
  IP:                10.100.188.203
  Port:              <unset>  8000/TCP
  TargetPort:        8000/TCP
  Endpoints:         192.168.73.98:8000
  Session Affinity:  None
  Events:            <none>

With the cluster now running, we can run a simple script to start Ray Serve and deploy a “hello world” backend:

import ray
from ray import serve

# Connect to the running Ray cluster.
ray.init(address="auto")
# Bind on 0.0.0.0 to expose the HTTP server on external IPs.
serve.init(http_host="0.0.0.0")

def hello(flask_request):
    return "hello world"

serve.create_backend("hello_backend", hello)
serve.create_endpoint("hello_endpoint", backend="hello_backend", route="/hello")

Save this script locally as deploy.py and run it on the head node using ray submit:

$ ray submit ray/python/ray/autoscaler/kubernetes/example-full.yaml deploy.py

Now we can try querying the service by sending an HTTP request to the service from within the Kubernetes cluster.

# Get a shell inside of the head node.
$ ray attach ray/python/ray/autoscaler/kubernetes/example-full.yaml

# Query the Ray Serve endpoint. This can be run from anywhere in the
# Kubernetes cluster.
$ curl -X GET http://$RAY_HEAD_SERVICE_HOST:8000/hello
hello world
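The same query can be issued from Python inside the cluster. The sketch below builds the URL from the RAY_HEAD_SERVICE_HOST environment variable used in the curl command; head_service_url and query are hypothetical helpers, and the address in the offline demonstration is made up.

```python
import os
import urllib.request


def head_service_url(route, port=8000, environ=os.environ):
    """Build the endpoint URL from the env var set for the ray-head Service."""
    host = environ.get("RAY_HEAD_SERVICE_HOST")
    if not host:
        raise RuntimeError("RAY_HEAD_SERVICE_HOST is not set; run inside the cluster")
    return f"http://{host}:{port}{route}"


def query(route, timeout=5.0):
    """Send a GET request and return the response body as text."""
    with urllib.request.urlopen(head_service_url(route), timeout=timeout) as response:
        return response.read().decode("utf-8")


# Offline demonstration with a made-up address; query("/hello") needs the cluster.
example_url = head_service_url("/hello", environ={"RAY_HEAD_SERVICE_HOST": "10.0.0.1"})
print(example_url)  # http://10.0.0.1:8000/hello
```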

In order to expose the Ray Serve endpoint externally, we would need to deploy the Service we created here behind an Ingress or a NodePort. Please refer to the Kubernetes documentation for more information.

Deployment FAQ

Best practices for local development

You may notice that we never have to write a while True loop or anything similar to keep the Ray Serve process running. We don’t recommend forever loops; instead, launch a Ray cluster locally and connect to it, as we did in Deploying a Model with Ray Serve. To learn more about Ray clusters in general, see Deploying Ray.

Deploying Multiple Serve Instances on a Single Ray Cluster

You can run multiple serve instances on the same Ray cluster by providing a name in serve.init().

from ray import serve


# Placeholder handler so the snippet is self-contained; substitute your own.
def function(flask_request):
    return "hello world"


# Create a first Serve instance whose HTTP server listens on 8000.
serve.init(name="cluster1", http_port=8000)

# Create a second Serve instance whose HTTP server listens on 8001.
serve.init(name="cluster2", http_port=8001)

# Create a backend that will be served on the second instance.
serve.create_backend("backend2", function)
serve.create_endpoint("endpoint2", backend="backend2", route="/increment")

# Switch back to the first instance and create the same backend on it.
serve.init(name="cluster1")
serve.create_backend("backend1", function)
serve.create_endpoint("endpoint1", backend="backend1", route="/increment")