Core API: Deployments

This section should help you:

  • create, query, update and configure deployments

  • configure resources of your deployments

  • specify different Python dependencies across different deployments using Runtime Environments

Tip

Get in touch with us if you’re using or considering using Ray Serve.

Creating a Deployment

Deployments are the central concept in Ray Serve. They allow you to define and update your business logic or models that will handle incoming requests as well as how this is exposed over HTTP or in Python.

A deployment is defined using @serve.deployment on a Python class (or function for simple use cases). You can specify arguments to be passed to the constructor when you call Deployment.deploy(), shown below.

A deployment consists of a number of replicas, which are individual copies of the function or class that are started in separate Ray Actors (processes).

@serve.deployment
class MyFirstDeployment:
  # Take the message to return as an argument to the constructor.
  def __init__(self, msg):
      self.msg = msg

  def __call__(self, request):
      return self.msg

  def other_method(self, arg):
      return self.msg

MyFirstDeployment.deploy("Hello world!")

Deployments can be exposed in two ways: over HTTP or in Python via the ServeHandle API. By default, HTTP requests will be forwarded to the __call__ method of the class (or the function) and a Starlette Request object will be the sole argument. You can also define a deployment that wraps a FastAPI app for more flexible handling of HTTP requests. See FastAPI HTTP Deployments for details.

To serve multiple deployments defined by the same class, use the name option:

MyFirstDeployment.options(name="hello_service").deploy("Hello!")
MyFirstDeployment.options(name="hi_service").deploy("Hi!")

You can also list all available deployments and dynamically get references to them:

>>> serve.list_deployments()
{'A': Deployment(name=A,version=None,route_prefix=/A),
 'MyFirstDeployment': Deployment(name=MyFirstDeployment,version=None,route_prefix=/MyFirstDeployment)}

# Returns the same object as the original MyFirstDeployment object.
# This can be used to redeploy, get a handle, etc.
deployment = serve.get_deployment("MyFirstDeployment")

Exposing a Deployment

By default, deployments are exposed over HTTP at http://localhost:8000/<deployment_name>. The HTTP path that the deployment is available at can be changed using the route_prefix option. All requests to /{route_prefix} and any subpaths will be routed to the deployment (using a longest-prefix match for overlapping route prefixes).
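To make the longest-prefix rule concrete, here is a minimal pure-Python sketch of how such matching could work. It is not Serve's router code: the match_route function, the routes mapping, and the "v2_deployment" name are hypothetical and exist only for illustration.

```python
from typing import Dict, Optional


def match_route(path: str, routes: Dict[str, str]) -> Optional[str]:
    """Return the deployment whose route prefix is the longest match for path.

    Illustrative only: `routes` maps a route prefix to a deployment name.
    """
    best_prefix = None
    for prefix in routes:
        # A prefix matches the path itself and any subpath below it.
        normalized = prefix.rstrip("/")
        if path == normalized or path.startswith(normalized + "/"):
            if best_prefix is None or len(prefix) > len(best_prefix):
                best_prefix = prefix
    return routes[best_prefix] if best_prefix is not None else None


# Two overlapping prefixes (hypothetical deployment names).
routes = {"/api": "http_deployment", "/api/v2": "v2_deployment"}
print(match_route("/api/users", routes))     # http_deployment
print(match_route("/api/v2/users", routes))  # v2_deployment
print(match_route("/other", routes))         # None
```

In this sketch, a request to /api/v2/users matches both prefixes, and the longer one, /api/v2, wins.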

Here’s an example:

@serve.deployment(name="http_deployment", route_prefix="/api")
class HTTPDeployment:
  def __call__(self, request):
      return "Hello world!"

# Deploy it so that the HTTP server starts routing /api to this class.
HTTPDeployment.deploy()

Once the deployment has been created, it is exposed by the HTTP server and handles requests using the specified class. We can query the model to verify that it’s working.

import requests
print(requests.get("http://127.0.0.1:8000/api").text)

We can also query the deployment using the ServeHandle interface.

import ray

# To get a handle from the same script, use the Deployment object directly:
handle = HTTPDeployment.get_handle()

# To get a handle from a different script, reference it by name:
handle = serve.get_deployment("http_deployment").get_handle()

print(ray.get(handle.remote()))

As noted above, there are two ways to expose deployments. The first is the ServeHandle interface, which lets you call a deployment directly from Python code. The second is HTTP, which lets any web client send requests to the deployment.

Let’s look at a simple end-to-end example that uses both ways to expose and access deployments. Your output may vary due to the random nature of how the prediction is computed; however, the example illustrates two things: 1) how to expose and use deployments and 2) how requests are sent to replicas. Note that each pid is a separate replica associated with one of the deployment names, rep-1 or rep-2.

#
# This brief example shows how to create, deploy, and expose access to
# deployment models, using the simple Ray Serve deployment APIs.
# Once deployed, you can access a deployment via two methods:
# the ServeHandle API and HTTP.
#
import os
from random import random

import requests
import starlette
from starlette.requests import Request
import ray
from ray import serve

#
# A simple example model stored in a pickled format at an accessible path
# that can be reloaded and deserialized into a model instance. Once deployed
# in Ray Serve, we can use it for prediction. The prediction is a fake condition,
# based on threshold of weight greater than 0.5.
#


class Model:
    def __init__(self, path):
        self.path = path

    def predict(self, data):
        return random() + data if data > 0.5 else data


@serve.deployment
class Deployment:
    # Take in a path to load your desired model
    def __init__(self, path: str) -> None:
        self.path = path
        self.model = Model(path)
        # Get the pid of the process this replica is running in
        self.pid = os.getpid()

    # Deployments are callable. Here we simply return a prediction from
    # our request
    def __call__(self, starlette_request) -> str:
        # Request came via HTTP
        if isinstance(starlette_request, starlette.requests.Request):
            data = starlette_request.query_params['data']
        else:
            # Request came via a ServeHandle API method call.
            data = starlette_request
        pred = self.model.predict(float(data))
        return f"(pid: {self.pid}); path: {self.path}; data: {float(data):.3f}; prediction: {pred:.3f}"


if __name__ == '__main__':

    # Start a Ray Serve instance. This will automatically start
    # or connect to an existing Ray cluster.
    serve.start()

    # Create two distinct deployments of the same class, each with
    # two replicas. Associate each deployment with a unique 'name'.
    # This name can be used to fetch its respective serve handle.
    # See code below for method 1.
    Deployment.options(name="rep-1", num_replicas=2).deploy("/model/rep-1.pkl")
    Deployment.options(name="rep-2", num_replicas=2).deploy("/model/rep-2.pkl")

    # Get the current list of deployments
    print(serve.list_deployments())

    print("ServerHandle API responses: " + "--" * 5)

    # Method 1) Access each deployment using the ServeHandle API
    for _ in range(2):
        for d_name in ["rep-1", "rep-2"]:
            # Get a handle to each deployment and invoke its method.
            # Which replica the request is dispatched to is determined
            # by the Router actor.
            handle = serve.get_deployment(d_name).get_handle()
            print(f"handle name : {d_name}")
            print(f"prediction  : {ray.get(handle.remote(random()))}")
            print("-" * 2)

    print("HTTP responses: " + "--" * 5)

    # Method 2) Access deployment via HTTP Request
    for _ in range(2):
        for d_name in ["rep-1", "rep-2"]:
            # Send HTTP request along with data payload
            url = f"http://127.0.0.1:8000/{d_name}"
            print(f"handle name : {d_name}")
            print(f"prediction  : {requests.get(url, params={'data': random()}).text}")
# Output:
# {'rep-1': Deployment(name=rep-1,version=None,route_prefix=/rep-1),
# 'rep-2': Deployment(name=rep-2,version=None,route_prefix=/rep-2)}
#
# ServerHandle API responses: ----------
# handle name : rep-1
# prediction  : (pid: 62636); path: /model/rep-1.pkl; data: 0.600; prediction: 1.292
# --
# handle name : rep-2
# prediction  : (pid: 62635); path: /model/rep-2.pkl; data: 0.075; prediction: 0.075
# --
# handle name : rep-1
# prediction  : (pid: 62634); path: /model/rep-1.pkl; data: 0.186; prediction: 0.186
# --
# handle name : rep-2
# prediction  : (pid: 62637); path: /model/rep-2.pkl; data: 0.751; prediction: 1.444
# --
# HTTP responses: ----------
# handle name : rep-1
# prediction  : (pid: 62636); path: /model/rep-1.pkl; data: 0.582; prediction: 1.481
# handle name : rep-2
# prediction  : (pid: 62637); path: /model/rep-2.pkl; data: 0.778; prediction: 1.678
# handle name : rep-1
# prediction  : (pid: 62634); path: /model/rep-1.pkl; data: 0.139; prediction: 0.139
# handle name : rep-2
# prediction  : (pid: 62635); path: /model/rep-2.pkl; data: 0.569; prediction: 1.262

Updating a Deployment

Often you want to be able to update your code or configuration options for a deployment over time. Deployments can be updated simply by updating the code or configuration options and calling deploy() again.

@serve.deployment(name="my_deployment", num_replicas=1)
class SimpleDeployment:
    pass

# Creates one initial replica.
SimpleDeployment.deploy()

# Re-deploys, creating an additional replica.
# This could be the SAME Python script, modified and re-run.
@serve.deployment(name="my_deployment", num_replicas=2)
class SimpleDeployment:
    pass

SimpleDeployment.deploy()

# You can also use Deployment.options() to change options without redefining
# the class. This is useful for programmatically updating deployments.
SimpleDeployment.options(num_replicas=2).deploy()

By default, each call to .deploy() causes a redeployment, even if the underlying code and options didn’t change. This can be detrimental if you have many deployments in a script and only want to update one: if you re-run the script, all of the deployments will be redeployed, not just the one you updated. To prevent this, you can provide a version string for the deployment as a keyword argument in the decorator or in Deployment.options(). If a version is provided, the replicas are only updated when the value of version changes; if it is unchanged, the call to .deploy() is a no-op. When a redeployment happens, Serve performs a rolling update, bringing down at most 20% of the replicas at any given time.
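The versioning rule and the 20% cap can be sketched in plain Python. This is an illustration of the behavior described above, not Serve's controller logic. The needs_redeploy and rolling_batches helpers are hypothetical, and the rounding (rounding the cap down, but always updating at least one replica per batch) is an assumption.

```python
from typing import List, Optional


def needs_redeploy(current_version: Optional[str], new_version: Optional[str]) -> bool:
    """Unversioned deployments always redeploy; versioned ones only on change."""
    if current_version is None or new_version is None:
        return True
    return current_version != new_version


def rolling_batches(num_replicas: int, max_percent: int = 20) -> List[int]:
    """Split replicas into update batches of at most max_percent each.

    Assumption: the cap is rounded down, with a minimum of one replica per batch.
    """
    batch_size = max(1, num_replicas * max_percent // 100)
    batches = []
    remaining = num_replicas
    while remaining > 0:
        step = min(batch_size, remaining)
        batches.append(step)
        remaining -= step
    return batches


print(needs_redeploy("v1", "v1"))  # False: same version, deploy() is a no-op
print(needs_redeploy("v1", "v2"))  # True: version changed
print(rolling_batches(10))         # [2, 2, 2, 2, 2]
```

Under these assumptions, a 10-replica deployment would be updated two replicas at a time.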

Configuring a Deployment

There are a number of things you’ll likely want to do with your serving application including scaling out or configuring the maximum number of in-flight requests for a deployment. All of these options can be specified either in @serve.deployment or in Deployment.options().

To update the config options for a running deployment, simply redeploy it with the new options set.

Scaling Out

To scale out a deployment to many processes, simply configure the number of replicas.

# Create with a single replica.
@serve.deployment(num_replicas=1)
def func(*args):
    pass

func.deploy()

# Scale up to 10 replicas.
func.options(num_replicas=10).deploy()

# Scale back down to 1 replica.
func.options(num_replicas=1).deploy()

Autoscaling

Serve also has experimental support for a demand-based replica autoscaler. It reacts to traffic spikes by observing queue sizes and making scaling decisions. To configure it, set the _autoscaling_config field in the deployment options.

Warning

The API is experimental and subject to change. We welcome you to test it out and leave us feedback through GitHub Issues or our discussion forum!

import time

@serve.deployment(
    _autoscaling_config={
        "min_replicas": 1,
        "max_replicas": 5,
        "target_num_ongoing_requests_per_replica": 10,
    },
    version="v1")
def func(_):
    time.sleep(1)
    return ""

func.deploy() # The func deployment will now autoscale based on request demand.

The min_replicas and max_replicas fields configure the range of replicas which the Serve autoscaler chooses from. Deployments will start with min_replicas initially.

The target_num_ongoing_requests_per_replica configuration specifies how aggressively the autoscaler should react to traffic. Serve will try to make sure that each replica has roughly that number of requests being processed and waiting in the queue. For example, if your processing time is 10ms and the latency constraint is 100ms, you can have at most 10 requests ongoing per replica so that the last request can finish within the latency constraint. We recommend that you benchmark your application code and set this number based on your end-to-end latency objective.
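The arithmetic in this example can be written out as a small calculation. The sketch below is a simplified model, not the actual autoscaling policy: both helper names are hypothetical, and the replica estimate assumes the autoscaler simply divides the total number of ongoing requests by the target and clamps the result to the configured range.

```python
import math


def max_ongoing_requests(processing_ms: int, latency_budget_ms: int) -> int:
    """How many requests one replica can hold while the last still meets the budget."""
    return max(1, latency_budget_ms // processing_ms)


def desired_replicas(total_ongoing: int, target_per_replica: int,
                     min_replicas: int, max_replicas: int) -> int:
    """Simplified estimate: enough replicas to hit the target, clamped to the range."""
    wanted = math.ceil(total_ongoing / target_per_replica)
    return max(min_replicas, min(max_replicas, wanted))


# 10ms per request with a 100ms latency constraint.
print(max_ongoing_requests(10, 100))   # 10

# With min_replicas=1, max_replicas=5 and a target of 10 per replica:
print(desired_replicas(35, 10, 1, 5))  # 4
print(desired_replicas(0, 10, 1, 5))   # 1 (never below min_replicas)
print(desired_replicas(200, 10, 1, 5)) # 5 (never above max_replicas)
```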

Note

The version field is required for autoscaling. We are actively working on removing this limitation.

Resource Management (CPUs, GPUs)

To assign hardware resources per replica, you can pass resource requirements to ray_actor_options. By default, each replica requires one CPU. To learn about the options you can pass in, see the Resources with Actors guide.

For example, to create a deployment where each replica uses a single GPU, you can do the following:

@serve.deployment(ray_actor_options={"num_gpus": 1})
def func(*args):
    return do_something_with_my_gpu()

Fractional Resources

The resources specified in ray_actor_options can also be fractional. This allows you to flexibly share resources between replicas. For example, if you have two models and each doesn’t fully saturate a GPU, you might want to have them share a GPU by allocating 0.5 GPUs each. The same could be done to multiplex over CPUs.

@serve.deployment(name="deployment1", ray_actor_options={"num_gpus": 0.5})
def func(*args):
    return do_something_with_my_gpu()

@serve.deployment(name="deployment2", ray_actor_options={"num_gpus": 0.5})
def func(*args):
    return do_something_with_my_gpu()
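As a rough capacity check, the number of replicas that can share one GPU follows directly from the fraction each replica requests. The helper below is a hypothetical sketch, and it assumes that a fractional request must fit entirely within a single GPU.

```python
import math
from fractions import Fraction


def replicas_per_gpu(num_gpus_per_replica: float) -> int:
    """Number of replicas that fit on one GPU for a fractional request.

    Assumption: a fractional request is never split across GPUs.
    """
    # Go through str() so that 0.3 becomes exactly 3/10 rather than a float approximation.
    share = Fraction(str(num_gpus_per_replica))
    if not 0 < share <= 1:
        raise ValueError("expected a fraction in the range (0, 1]")
    return math.floor(1 / share)


print(replicas_per_gpu(0.5))   # 2
print(replicas_per_gpu(0.25))  # 4
print(replicas_per_gpu(0.3))   # 3
```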

Configuring Parallelism with OMP_NUM_THREADS

Deep learning frameworks like PyTorch and TensorFlow often use multithreading when performing inference. The number of CPUs they use is controlled by the OMP_NUM_THREADS environment variable. To avoid contention, Ray sets OMP_NUM_THREADS=1 by default because Ray workers and actors use a single CPU by default. If you do want to enable this parallelism in your Serve deployment, set OMP_NUM_THREADS to the desired value either when starting Ray or in your function/class definition:

OMP_NUM_THREADS=12 ray start --head
OMP_NUM_THREADS=12 ray start --address=$HEAD_NODE_ADDRESS

import os

@serve.deployment
class MyDeployment:
    def __init__(self, parallelism):
        # Environment variable values must be strings.
        os.environ["OMP_NUM_THREADS"] = str(parallelism)
        # Download model weights, initialize model, etc.

# The constructor argument is passed through deploy().
MyDeployment.deploy(12)

Note

Some other libraries may not respect OMP_NUM_THREADS and have their own way to configure parallelism. For example, if you’re using OpenCV, you’ll need to manually set the number of threads using cv2.setNumThreads(num_threads) (set to 0 to disable multi-threading). You can check the configuration using cv2.getNumThreads() and cv2.getNumberOfCPUs().
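When setting OMP_NUM_THREADS from Python, keep in mind that os.environ only accepts string values. A small helper along these lines (hypothetical, not part of Ray) can validate and convert the value. It would typically need to run before the framework initializes its thread pools.

```python
import os


def set_omp_num_threads(num_threads: int) -> None:
    """Set OMP_NUM_THREADS for the current process."""
    if num_threads < 1:
        raise ValueError("num_threads must be a positive integer")
    # os.environ only accepts strings; assigning an int raises TypeError.
    os.environ["OMP_NUM_THREADS"] = str(num_threads)


set_omp_num_threads(4)
print(os.environ["OMP_NUM_THREADS"])  # 4
```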

User Configuration (Experimental)

Suppose you want to update a parameter in your model without needing to restart the replicas in your deployment. You can do this by writing a reconfigure method for the class underlying your deployment. At runtime, you can then pass in your new parameters by setting the user_config option.

The following simple example will make the usage clear:

import requests
import random

from ray import serve

serve.start()


@serve.deployment(route_prefix="/threshold")
class Threshold:
    def __init__(self):
        # self.model won't be changed by reconfigure.
        self.model = random.Random()  # Imagine this is some heavyweight model.

    def reconfigure(self, config):
        # This will be called when the class is created and when
        # the user_config is updated.
        self.threshold = config["threshold"]

    def __call__(self, request):
        return self.model.random() > self.threshold


Threshold.options(user_config={"threshold": 0.01}).deploy()
print(requests.get("http://127.0.0.1:8000/threshold").text)  # true, probably

Threshold.options(user_config={"threshold": 0.99}).deploy()
print(requests.get("http://127.0.0.1:8000/threshold").text)  # false, probably

The reconfigure method is called when the class is created if user_config is set. In particular, it’s also called when new replicas are created in the future if you scale up your deployment later. The reconfigure method is also called each time user_config is updated.
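The call order described here can be simulated without Ray. In the sketch below, FakeReplica is a hypothetical stand-in for a Serve replica: it calls reconfigure once at creation and again on every config update, while the constructor (and the "heavyweight" model it builds) runs only once.

```python
import random


class Threshold:
    def __init__(self):
        # Stands in for an expensive model that should not be rebuilt.
        self.model = random.Random()

    def reconfigure(self, config):
        self.threshold = config["threshold"]

    def __call__(self):
        return self.model.random() > self.threshold


class FakeReplica:
    """Hypothetical stand-in for a Serve replica; not part of Ray."""

    def __init__(self, cls, user_config):
        self.instance = cls()
        # reconfigure is called right after the class is created.
        self.instance.reconfigure(user_config)

    def update_user_config(self, user_config):
        # An update only calls reconfigure; __init__ does not run again.
        self.instance.reconfigure(user_config)


replica = FakeReplica(Threshold, {"threshold": 0.01})
model_before = replica.instance.model
replica.update_user_config({"threshold": 0.99})
print(replica.instance.model is model_before)  # True
print(replica.instance.threshold)              # 0.99
```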

Handling Dependencies

Ray Serve supports serving deployments with different (possibly conflicting) Python dependencies. For example, you can simultaneously serve one deployment that uses legacy TensorFlow 1 and another that uses TensorFlow 2.

This is supported on macOS and Linux using Ray’s Runtime Environments feature. As with all other Ray actor options, pass the runtime environment in via ray_actor_options in your deployment. Be sure to first run pip install "ray[default]" to ensure the Runtime Environments feature is installed.

Example:

import requests
from ray import serve

serve.start()


@serve.deployment
def requests_version(request):
    return requests.__version__


requests_version.options(
    name="25",
    ray_actor_options={
        "runtime_env": {
            "pip": {"packages": ["ray[serve]", "requests==2.25.1"], "pip_check": False}
        }
    },
).deploy()
requests_version.options(
    name="26",
    ray_actor_options={
        "runtime_env": {
            "pip": {"packages": ["ray[serve]", "requests==2.26.0"], "pip_check": False}
        }
    },
).deploy()

assert requests.get("http://127.0.0.1:8000/25").text == "2.25.1"
assert requests.get("http://127.0.0.1:8000/26").text == "2.26.0"

Note

When using a Ray library (for example, Ray Serve) in a runtime environment, it must explicitly be included in the dependencies, as in the above example. This is not required when just using Ray Core.
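One way to avoid forgetting the Ray library entry is to build the options dictionary with a small helper. The pip_actor_options function below is hypothetical. It only assembles the same dictionary shape used in the example above and always puts "ray[serve]" first.

```python
from typing import Any, Dict, List


def pip_actor_options(packages: List[str], ray_extra: str = "ray[serve]") -> Dict[str, Any]:
    """Build ray_actor_options with a pip runtime_env that always includes Ray Serve."""
    # Skip ray_extra if the caller already listed it, to avoid a duplicate entry.
    all_packages = [ray_extra] + [p for p in packages if p != ray_extra]
    return {
        "runtime_env": {
            "pip": {"packages": all_packages, "pip_check": False}
        }
    }


options = pip_actor_options(["requests==2.25.1"])
print(options["runtime_env"]["pip"]["packages"])
# ['ray[serve]', 'requests==2.25.1']
```

The result could then be passed as ray_actor_options=options in Deployment.options().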

Tip

Avoid dynamically installing packages that install from source: these can be slow and use up all resources while installing, leading to problems with the Ray cluster. Consider precompiling such packages in a private repository or Docker image.

The dependencies required in the deployment may be different from the dependencies installed in the driver program (the one running Serve API calls). In this case, you should use a delayed import within the class to avoid importing unavailable packages in the driver. This applies even when not using runtime environments.

Example:

from ray import serve

serve.start()


@serve.deployment
class MyDeployment:
    def __init__(self, model_path):
        # Delayed import: resolved inside the replica, not in the driver.
        from my_module import my_model

        self.model = my_model.load(model_path)


MyDeployment.deploy("/model_path.pkl")
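The effect of a delayed import can be seen without Ray. In the sketch below, module_not_installed_in_driver is a deliberately hypothetical module name: defining the class succeeds even though the module is missing, and the import error only surfaces when the class is instantiated (which, in Serve, would happen inside the replica).

```python
class LazyModel:
    def __init__(self):
        # Delayed import: resolved only when the class is instantiated.
        import module_not_installed_in_driver  # hypothetical module name

        self.module = module_not_installed_in_driver


# Defining LazyModel above did not raise, although the module is missing here.
try:
    LazyModel()
except ImportError as exc:
    print(f"import deferred until instantiation: {exc}")
```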