Deploying Ray Serve

This section should help you:

  • understand how Ray Serve runs on a Ray cluster beyond the basics mentioned in Core API: Deployments

  • deploy and update your Serve application over time

  • monitor your Serve application using the Ray Dashboard and logging

Lifetime of a Ray Serve Instance

Ray Serve instances run on top of Ray clusters and are started using serve.start. Once serve.start has been called, further API calls can be used to create and update the deployments that will be used to serve your Python code (including ML models). The Serve instance will be torn down when the script exits.

When running on a long-lived Ray cluster (e.g., one started using ray start and connected to using ray.init(address="auto", namespace="serve")), you can also deploy a Ray Serve instance as a long-running service using serve.start(detached=True). In this case, the Serve instance will continue to run on the Ray cluster even after the script that calls it exits. To update the Serve instance later, run another script that connects to the same Ray cluster and makes further API calls (e.g., to create, update, or delete a deployment). Note that there can only be one detached Serve instance on each Ray cluster.

All non-detached Serve instances will be started in the current namespace that was specified when connecting to the cluster. If a namespace is specified for a detached Serve instance, it will be used. Otherwise, if the current namespace is anonymous, the Serve instance will be started in the serve namespace.
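
The namespace rules above can be summarized as a small decision function. This is only a sketch of the behavior described in this section, not Serve's actual implementation: the name resolve_serve_namespace and its parameters are hypothetical, and the last branch (a detached instance started from a named namespace without an explicit one) is an assumption, since the text does not cover that case.

```python
from typing import Optional

DEFAULT_DETACHED_NAMESPACE = "serve"


def resolve_serve_namespace(
    detached: bool,
    current_namespace: str,
    current_is_anonymous: bool,
    requested_namespace: Optional[str] = None,
) -> str:
    """Return the namespace a Serve instance would be started in.

    Hypothetical helper that mirrors the rules described in the text.
    """
    if not detached:
        # Non-detached instances always use the current namespace.
        return current_namespace
    if requested_namespace is not None:
        # An explicitly requested namespace wins for detached instances.
        return requested_namespace
    if current_is_anonymous:
        # Detached, nothing requested, anonymous connection: use "serve".
        return DEFAULT_DETACHED_NAMESPACE
    # Assumption: otherwise the current (named) namespace is kept.
    return current_namespace
```

Reading the branches top to bottom gives the same order of precedence as the paragraph above: non-detached first, then an explicit namespace, then the anonymous fallback.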

If serve.start() is called again in a process in which there is already a running Serve instance, Serve will re-connect to the existing instance (regardless of whether the original instance was detached or not). To reconnect to a Serve instance that exists in the Ray cluster but not in the current process, connect to the cluster with the same namespace that was specified when starting the instance and run serve.start().

Deploying on a Single Node

While Ray Serve makes it easy to scale out on a multi-node Ray cluster, in some scenarios a single node may suit your needs. There are two ways you can run Ray Serve on a single node, shown below. In general, Option 2 is recommended for most users because it allows you to make full use of Serve’s ability to dynamically update running deployments.

  1. Start Ray and deploy with Ray Serve all in a single Python file.

import ray
from ray import serve
import time

# This will start Ray locally and start Serve on top of it.
serve.start()

@serve.deployment
def my_func(request):
    return "hello"

my_func.deploy()

# Serve will be shut down once the script exits, so keep it alive manually.
while True:
    time.sleep(5)
    print(serve.list_deployments())

  2. First run ray start --head on the machine, then connect to the running local Ray cluster using ray.init(address="auto", namespace="serve") in your Serve script(s). This is the Ray namespace, not a Kubernetes namespace, and you can specify any namespace you like. You can run multiple scripts to update your deployments over time.

ray start --head # Start local Ray cluster.
serve start # Start Serve on the local Ray cluster.

import ray
from ray import serve

# This will connect to the running Ray cluster.
ray.init(address="auto", namespace="serve")

@serve.deployment
def my_func(request):
    return "hello"

my_func.deploy()

Deploying on Kubernetes

In order to deploy Ray Serve on Kubernetes, we need to do the following:

  1. Start a Ray cluster on Kubernetes.

  2. Expose the head node of the cluster as a Service.

  3. Start Ray Serve on the cluster.

There are multiple ways to start a Ray cluster on Kubernetes; see Deploying on Kubernetes for more information. Here, we will be using the Ray Cluster Launcher tool, which has support for Kubernetes as a backend.

The cluster launcher takes in a YAML config file that describes the cluster. Here, we’ll be using the Kubernetes default config with a few small modifications. First, we need to make sure that the head node of the cluster, where Ray Serve will run its HTTP server, is exposed as a Kubernetes Service. There is already a default head node service defined in the services field of the config, so we just need to make sure that it’s exposing the right port: 8000, which Ray Serve binds to by default.

# Service that maps to the head node of the Ray cluster.
- apiVersion: v1
  kind: Service
  metadata:
      name: ray-head
  spec:
      # Must match the label in the head pod spec below.
      selector:
          component: ray-head
      ports:
          - protocol: TCP
            # Port that this service will listen on.
            port: 8000
            # Port that requests will be sent to in pods backing the service.
            targetPort: 8000

Then, we also need to make sure that the head node pod spec matches the selector defined here and exposes the same port:

head_node:
  apiVersion: v1
  kind: Pod
  metadata:
    # Automatically generates a name for the pod with this prefix.
    generateName: ray-head-

    # Matches the selector in the service definition above.
    labels:
        component: ray-head

  spec:
    # ...
    containers:
    - name: ray-node
      # ...
      ports:
          - containerPort: 8000 # Ray Serve default port.
    # ...
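
Before starting the cluster, it can help to check the two conditions above (labels match the selector, and the target port is exposed by a container). The following is a minimal sketch that does this on plain Python dictionaries mirroring the two config snippets. The helper names selector_matches and exposed_target_ports are hypothetical, and loading the dictionaries from the actual YAML file (e.g., with a YAML parser) is left out.

```python
from typing import Any, Dict, List


def selector_matches(service: Dict[str, Any], pod: Dict[str, Any]) -> bool:
    """True if every key/value in the Service selector is a label on the pod."""
    selector = service["spec"]["selector"]
    labels = pod["metadata"].get("labels", {})
    return all(labels.get(key) == value for key, value in selector.items())


def exposed_target_ports(service: Dict[str, Any], pod: Dict[str, Any]) -> List[int]:
    """Service targetPorts that some container in the pod actually exposes."""
    container_ports = {
        port["containerPort"]
        for container in pod["spec"]["containers"]
        for port in container.get("ports", [])
    }
    return [
        port["targetPort"]
        for port in service["spec"]["ports"]
        if port["targetPort"] in container_ports
    ]


# Dictionaries mirroring the Service and head pod definitions shown above.
service = {
    "metadata": {"name": "ray-head"},
    "spec": {
        "selector": {"component": "ray-head"},
        "ports": [{"protocol": "TCP", "port": 8000, "targetPort": 8000}],
    },
}
head_pod = {
    "metadata": {"generateName": "ray-head-", "labels": {"component": "ray-head"}},
    "spec": {"containers": [{"name": "ray-node", "ports": [{"containerPort": 8000}]}]},
}

print(selector_matches(service, head_pod))
print(exposed_target_ports(service, head_pod))
```

If either check comes back empty or false, the 'Endpoints' field of the Service will most likely stay unpopulated once the cluster is up.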

The rest of the config remains unchanged for this example, though you may want to change the container image or the number of worker pods started by default when running your own deployment. Now, we just need to start the cluster:

# Start the cluster.
$ ray up ray/python/ray/autoscaler/kubernetes/example-full.yaml

# Check the status of the service pointing to the head node. If configured
# properly, you should see the 'Endpoints' field populated with an IP
# address like below. If not, make sure the head node pod started
# successfully and the selector/labels match.
$ kubectl -n ray describe service ray-head
  Name:              ray-head
  Namespace:         ray
  Labels:            <none>
  Annotations:       <none>
  Selector:          component=ray-head
  Type:              ClusterIP
  IP:                10.100.188.203
  Port:              <unset>  8000/TCP
  TargetPort:        8000/TCP
  Endpoints:         192.168.73.98:8000
  Session Affinity:  None
  Events:            <none>

With the cluster now running, we can run a simple script to start Ray Serve and deploy a “hello world” deployment:

import ray
from ray import serve

# Connect to the running Ray cluster.
ray.init(address="auto", namespace="serve")
# Bind on 0.0.0.0 to expose the HTTP server on external IPs.
serve.start(detached=True, http_options={"host": "0.0.0.0"})


@serve.deployment(route_prefix="/hello")
def hello(request):
    return "hello world"

hello.deploy()

Save this script locally as deploy.py and run it on the head node using ray submit:

$ ray submit ray/python/ray/autoscaler/kubernetes/example-full.yaml deploy.py

Now we can try querying the service by sending an HTTP request to the service from within the Kubernetes cluster.

# Get a shell inside of the head node.
$ ray attach ray/python/ray/autoscaler/kubernetes/example-full.yaml

# Query the Ray Serve deployment. This can be run from anywhere in the
# Kubernetes cluster.
$ curl -X GET http://$RAY_HEAD_SERVICE_HOST:8000/hello
hello world

In order to expose the Ray Serve deployment externally, we would need to deploy the Service we created here behind an Ingress or a NodePort. Please refer to the Kubernetes documentation for more information.
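
The curl request above can also be issued from Python running inside the Kubernetes cluster. This is a minimal sketch using only the standard library; the helper names serve_url, query, and main are hypothetical, and it assumes the RAY_HEAD_SERVICE_HOST environment variable is set as in the curl example (falling back to 127.0.0.1 otherwise).

```python
import os
import urllib.request


def serve_url(host: str, route: str, port: int = 8000) -> str:
    """Build the URL of a Serve route on the given host (8000 is Serve's default port)."""
    return f"http://{host}:{port}/{route.lstrip('/')}"


def query(url: str, timeout: float = 5.0) -> str:
    """Send a GET request and return the response body as text."""
    with urllib.request.urlopen(url, timeout=timeout) as response:
        return response.read().decode("utf-8")


def main() -> None:
    # Assumption: this runs in a pod where the ray-head Service is reachable.
    host = os.environ.get("RAY_HEAD_SERVICE_HOST", "127.0.0.1")
    print(query(serve_url(host, "/hello")))
```

Calling main() from a pod in the cluster should return the same response as the curl command.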

Failure Recovery

Ray Serve is resilient to component failures within the Ray cluster out of the box. For details on how process and worker node failures are handled, see How does Serve handle fault tolerance?. However, when the Ray head node goes down, you need to recover the state by creating a new Ray cluster and redeploying all Serve deployments into that cluster.

Note

Ray currently cannot survive head node failure, and we recommend using application-specific failure recovery solutions. Although Ray is not currently highly available (HA), this is on the long-term roadmap and being actively worked on.

Ray Serve has an experimental feature to help recover the state. This feature enables Serve to write all your deployment configuration and code to a storage location. When the Ray cluster fails and restarts, you can simply call Serve to reconstruct the state.

Here is how to use it:

Warning

The API is experimental and subject to change. We welcome you to test it out and leave us feedback through GitHub issues or the discussion forum!

You can specify the checkpoint path using either the serve.start argument or the CLI:

serve.start(_checkpoint_path=...)

or

serve start --checkpoint-path ...

The checkpoint path argument accepts the following formats:

  • file://local_file_path

  • s3://bucket/path

  • gs://bucket/path

  • custom://importable.custom_python.Class/path

While we have native support for on disk, AWS S3, and Google Cloud Storage (GCS), there is no reason we cannot support more.
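
To make the four formats concrete, here is a small sketch that splits a checkpoint path into its parts. It only illustrates the path layout listed above and is not how Serve parses the argument internally; CheckpointLocation and parse_checkpoint_path are hypothetical names.

```python
from typing import NamedTuple

SUPPORTED_SCHEMES = ("file", "s3", "gs", "custom")


class CheckpointLocation(NamedTuple):
    scheme: str  # one of SUPPORTED_SCHEMES
    target: str  # local file path, bucket name, or importable class
    path: str    # path inside the bucket or custom store ("" for file)


def parse_checkpoint_path(checkpoint_path: str) -> CheckpointLocation:
    """Split '<scheme>://<target>/<path>' according to the formats listed above."""
    scheme, separator, rest = checkpoint_path.partition("://")
    if not separator or scheme not in SUPPORTED_SCHEMES:
        raise ValueError(f"Unsupported checkpoint path: {checkpoint_path!r}")
    if scheme == "file":
        # Everything after file:// is treated as the local file path.
        return CheckpointLocation(scheme, rest, "")
    target, _, path = rest.partition("/")
    return CheckpointLocation(scheme, target, path)


print(parse_checkpoint_path("s3://bucket/path"))
print(parse_checkpoint_path("custom://importable.custom_python.Class/path"))
```

For the custom:// form, the first component names an importable Python class and the remainder is the path handed to it.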

In a Kubernetes environment, we recommend using Persistent Volumes to create a disk and mount it into the Ray head node. For example, you can provision Azure Disk, AWS Elastic Block Store, or GCP Persistent Disk using the K8s Persistent Volumes API. Alternatively, you can write directly to an object store like S3.

You can plug in your own implementation by using the custom:// path and inheriting from the KVStoreBase class. Feel free to open new GitHub issues and contribute more storage backends!

Monitoring

Ray Dashboard

A high-level way to monitor your Ray Serve deployment (or any Ray application) is via the Ray Dashboard. See the Ray Dashboard documentation for a detailed overview, including instructions on how to view the dashboard.

Below is an example of what the Ray Dashboard might look like for a Serve deployment:

https://raw.githubusercontent.com/ray-project/Images/master/docs/dashboard/serve-dashboard.png

Here you can see the Serve controller actor, an HTTP proxy actor, and all of the replicas for each Serve deployment. To learn about the function of the controller and proxy actors, see the Serve Architecture page. In the example pictured above, we have a single-node cluster with a deployment named Counter with num_replicas=2.

Logging

Logging in Ray Serve uses Python’s standard logging facility.

Note

For a general overview of logging in Ray, see Ray Logging.

Tracing Deployments and Replicas

When looking through log files of your Ray Serve application, it is useful to know which deployment and replica each log line originated from. To automatically include the current deployment and replica in your logs, simply call logger = logging.getLogger("ray"), and use logger within your deployment code:

import logging

from ray import serve
import requests

serve.start()

logger = logging.getLogger("ray")


@serve.deployment
def f(*_args):
    logger.info("Some info!")


f.deploy()

requests.get("http://127.0.0.1:8000/f")

Querying the above deployment will produce a log line like the following:

(pid=42161) 2021-02-26 11:05:21,709     INFO snippet_logger.py:13 -- Some info! component=serve deployment=f replica=f#jZlnUI

To write your own custom logger using Python’s logging package, use the following method:

ray.serve.get_replica_context() → ray.serve.api.ReplicaContext

If called from a deployment, returns the deployment and replica tag.

A replica tag uniquely identifies a single replica for a Ray Serve deployment at runtime. Replica tags are of the form <deployment_name>#<random letters>.

Raises

RayServeException – if not called from within a Ray Serve deployment.

Example

>>> serve.get_replica_context().deployment # deployment_name
>>> serve.get_replica_context().replica_tag # deployment_name#krcwoa

PublicAPI: This API is stable across Ray releases.

Ray Serve logs can be ingested by your favorite external logging agent. Ray logs from the current session are exported to the directory /tmp/ray/session_latest/logs and remain there until the next session starts.
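
If you just want to inspect these files without an external agent, the deployment and replica tags shown in the sample log line earlier can be pulled out with a short script. This is a sketch that assumes log lines keep the component=... deployment=... replica=... suffix shown above; parse_serve_tags and iter_deployment_lines are hypothetical helper names.

```python
import glob
import os
import re
from typing import Dict, Iterator

LOG_GLOB = "/tmp/ray/session_latest/logs/*.*"
TAG_PATTERN = re.compile(r"\b(component|deployment|replica)=(\S+)")


def parse_serve_tags(line: str) -> Dict[str, str]:
    """Extract the component/deployment/replica tags from one log line."""
    return dict(TAG_PATTERN.findall(line))


def iter_deployment_lines(deployment: str, pattern: str = LOG_GLOB) -> Iterator[str]:
    """Yield every log line tagged with the given deployment name."""
    for path in glob.glob(pattern):
        if not os.path.isfile(path):
            continue  # skip directories that happen to match the pattern
        with open(path, errors="replace") as log_file:
            for line in log_file:
                if parse_serve_tags(line).get("deployment") == deployment:
                    yield line.rstrip("\n")
```

For example, list(iter_deployment_lines("f")) would collect the lines logged by the deployment f from the current session.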

Tutorial: Ray Serve with Loki

Here is a quick walkthrough of how to explore and filter your logs using Loki. Setup and configuration is very easy on Kubernetes, but in this tutorial we’ll just set things up manually.

First, install Loki and Promtail using the instructions on https://grafana.com. It will be convenient to save the Loki and Promtail executables in the same directory, and to navigate to this directory in your terminal before beginning this walkthrough.

Now let’s get our logs into Loki using Promtail.

Save the following file as promtail-local-config.yaml:

server:
  http_listen_port: 9080
  grpc_listen_port: 0

positions:
  filename: /tmp/positions.yaml

clients:
  - url: http://localhost:3100/loki/api/v1/push

scrape_configs:
- job_name: ray
  static_configs:
  - labels:
      job: ray
      __path__: /tmp/ray/session_latest/logs/*.*

The relevant part for Ray is the static_configs field, where we have indicated the location of our log files with __path__. The expression *.* matches names that contain a dot, which covers the log files but not the directories; matching directories would cause errors in Promtail.

We will run Loki locally. Grab the default config file for Loki with the following command in your terminal:

wget https://raw.githubusercontent.com/grafana/loki/v2.1.0/cmd/loki/loki-local-config.yaml

Now start Loki:

./loki-darwin-amd64 -config.file=loki-local-config.yaml

Here you may need to replace ./loki-darwin-amd64 with the path to your Loki executable file, which may have a different name depending on your operating system.

Start Promtail and pass in the path to the config file we saved earlier:

./promtail-darwin-amd64 -config.file=promtail-local-config.yaml

As above, you may need to replace ./promtail-darwin-amd64 with the appropriate filename and path.

Now we are ready to start our Ray Serve deployment. Start a long-running Ray cluster and Ray Serve instance in your terminal:

ray start --head
serve start

Now run the following Python script to deploy a basic Serve deployment with a Serve deployment logger:

import logging
import ray
from ray import serve
import requests

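# Connect in the "serve" namespace, where the instance started by "serve start" lives.
ray.init(address="auto", namespace="serve")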

logger = logging.getLogger("ray")


@serve.deployment
class Counter:
    def __init__(self):
        self.count = 0

    def __call__(self, request):
        self.count += 1
        logger.info(f"count: {self.count}")
        return {"count": self.count}


Counter.deploy()

for i in range(10):
    requests.get("http://127.0.0.1:8000/Counter")

Now install and run Grafana and navigate to http://localhost:3000, where you can log in with the default username “admin” and default password “admin”. On the welcome page, click “Add your first data source” and click “Loki” to add Loki as a data source.

Now click “Explore” in the left-side panel. You are ready to run some queries!

To filter all these Ray logs for the ones relevant to our deployment, use the following LogQL query:

{job="ray"} |= "deployment=Counter"

You should see something similar to the following:

https://raw.githubusercontent.com/ray-project/Images/master/docs/serve/loki-serve.png
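
The same LogQL query can also be run without Grafana through Loki's HTTP API. The sketch below uses the query_range endpoint and assumes Loki is listening on localhost:3100, as in the Promtail config above; build_query_url, extract_lines, and fetch_lines are hypothetical helper names.

```python
import json
import time
import urllib.parse
import urllib.request
from typing import List, Optional

LOKI_URL = "http://localhost:3100"  # same Loki instance Promtail pushes to


def build_query_url(
    logql: str,
    lookback_s: int = 3600,
    limit: int = 100,
    now_s: Optional[float] = None,
) -> str:
    """Build a query_range URL covering the last `lookback_s` seconds."""
    end_ns = int((time.time() if now_s is None else now_s) * 1e9)
    start_ns = end_ns - lookback_s * 1_000_000_000
    params = urllib.parse.urlencode(
        {"query": logql, "start": start_ns, "end": end_ns, "limit": limit}
    )
    return f"{LOKI_URL}/loki/api/v1/query_range?{params}"


def extract_lines(response_body: dict) -> List[str]:
    """Flatten the log lines out of a 'streams' query response."""
    return [
        line
        for stream in response_body["data"]["result"]
        for _timestamp, line in stream["values"]
    ]


def fetch_lines(logql: str) -> List[str]:
    """Run the query against Loki and return the matching log lines."""
    with urllib.request.urlopen(build_query_url(logql), timeout=10) as response:
        return extract_lines(json.load(response))
```

With Loki running, fetch_lines('{job="ray"} |= "deployment=Counter"') should return the same lines that the Explore view shows.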

Metrics

Ray Serve exposes important system metrics like the number of successful and errored requests through the Ray metrics monitoring infrastructure. By default, the metrics are exposed in Prometheus format on each node.

The following metrics are exposed by Ray Serve:

  • serve_deployment_request_counter: The number of queries that have been processed in this replica.

  • serve_deployment_error_counter: The number of exceptions that have occurred in the deployment.

  • serve_deployment_replica_starts: The number of times this replica has been restarted due to failure.

  • serve_deployment_queuing_latency_ms: The latency for queries in the replica’s queue waiting to be processed.

  • serve_deployment_processing_latency_ms: The latency for queries to be processed.

  • serve_replica_queued_queries: The current number of queries queued in the deployment replicas.

  • serve_replica_processing_queries: The current number of queries being processed.

  • serve_num_http_requests: The number of HTTP requests processed.

  • serve_num_router_requests: The number of requests processed by the router.

  • serve_handle_request_counter: The number of requests processed by this ServeHandle.

  • serve_deployment_queued_queries: The number of queries for this deployment waiting to be assigned to a replica.

To see this in action, run ray start --head --metrics-export-port=8080 in your terminal, and then run the following script:

import ray
from ray import serve

import time

ray.init(address="auto")
serve.start()


@serve.deployment
def f(request):
    time.sleep(1)


f.deploy()

handle = f.get_handle()
while True:
    ray.get(handle.remote())

In your web browser, navigate to localhost:8080. In the output there, you can search for serve_ to locate the metrics above. The metrics are updated once every ten seconds, and you will need to refresh the page to see the new values.

For example, after running the script for some time and refreshing localhost:8080 you might see something that looks like:

ray_serve_deployment_processing_latency_ms_count{...,deployment="f",...} 99.0
ray_serve_deployment_processing_latency_ms_sum{...,deployment="f",...} 99279.30498123169

which indicates that the average processing latency is just over one second, as expected.
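
The average is simply the _sum sample divided by the _count sample. As a sketch, the following parses the exported text and computes that ratio for one deployment; the function name average_processing_latency_ms is hypothetical, and the parser only handles simple "name{labels} value" lines like the two shown above.

```python
import re

METRIC_PREFIX = "ray_serve_deployment_processing_latency_ms"
SAMPLE_PATTERN = re.compile(
    r"^(?P<name>[A-Za-z_:][\w:]*)(?:\{(?P<labels>.*)\})?\s+(?P<value>\S+)$"
)


def average_processing_latency_ms(metrics_text: str, deployment: str) -> float:
    """Average latency in ms: total of _sum samples over total of _count samples."""
    wanted_label = f'deployment="{deployment}"'
    totals = {"_sum": 0.0, "_count": 0.0}
    for line in metrics_text.splitlines():
        match = SAMPLE_PATTERN.match(line.strip())
        if not match or wanted_label not in (match.group("labels") or ""):
            continue
        for suffix in totals:
            if match.group("name") == METRIC_PREFIX + suffix:
                totals[suffix] += float(match.group("value"))
    if totals["_count"] == 0:
        raise ValueError(f"No latency samples found for deployment {deployment!r}")
    return totals["_sum"] / totals["_count"]


sample_output = """
ray_serve_deployment_processing_latency_ms_count{...,deployment="f",...} 99.0
ray_serve_deployment_processing_latency_ms_sum{...,deployment="f",...} 99279.30498123169
"""
print(round(average_processing_latency_ms(sample_output, "f"), 2))
```

For the sample values this works out to roughly 1002.8 ms per request, matching the one-second sleep in the deployment plus a small overhead.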

You can even define a custom metric to use in your deployment, and tag it with the current deployment or replica. Here’s an example:

from ray import serve
from ray.util import metrics


@serve.deployment
class MyDeployment:
    def __init__(self):
        self.my_counter = metrics.Counter(
            "my_counter",
            description=("The number of excellent reqs to this deployment."),
            tag_keys=("deployment", ))
        # Tag every sample with the name of the current deployment.
        self.my_counter.set_default_tags({
            "deployment": serve.get_replica_context().deployment
        })

    def call(self, excellent=False):
        if excellent:
            self.my_counter.inc()


See the Ray Metrics documentation for more details, including instructions for scraping these metrics using Prometheus.