Online serving#


This tutorial launches an online service that deploys the trained model to generate predictions and autoscales based on incoming traffic.

%%bash
pip install -q "matplotlib==3.10.0" "torch==2.7.0" "transformers==4.52.3" "scikit-learn==1.6.0" "mlflow==2.19.0" "ipywidgets==8.1.3"
Successfully registered `matplotlib, torch` and 4 other packages to be installed on all cluster nodes.
View and update dependencies here: https://console.anyscale.com/cld_kvedZWag2qA8i5BjxUevf5i7/prj_cz951f43jjdybtzkx1s5sjgz99/workspaces/expwrk_eys8cskj5aivghbf773dp2vmcd?workspace-tab=dependencies
%load_ext autoreload
%autoreload all
import os
import ray
import sys
sys.path.append(os.path.abspath(".."))
ray.init(runtime_env={"working_dir": "../"})
2025-06-23 20:03:54,080	INFO worker.py:1723 -- Connecting to existing Ray cluster at address: 10.0.61.28:6379...
2025-06-23 20:03:54,091	INFO worker.py:1908 -- Connected to Ray cluster. View the dashboard at https://session-gcwehd9xxjzkv5lxv8lgcdgx2n.i.anyscaleuserdata.com 
2025-06-23 20:03:54,133	INFO packaging.py:588 -- Creating a file package for local module '../'.
2025-06-23 20:03:54,190	INFO packaging.py:380 -- Pushing file package 'gcs://_ray_pkg_60b8ab9607f9a287.zip' (12.99MiB) to Ray cluster...
2025-06-23 20:03:54,250	INFO packaging.py:393 -- Successfully pushed file package 'gcs://_ray_pkg_60b8ab9607f9a287.zip'.
import os
from fastapi import FastAPI
import mlflow
import requests
from starlette.requests import Request
from urllib.parse import urlparse
from ray import serve
import numpy as np
from PIL import Image
import torch
from transformers import CLIPModel, CLIPProcessor
from doggos.infer import TorchPredictor
from doggos.model import collate_fn
from doggos.utils import url_to_array

Deployments#

First, create a deployment for the trained model that generates a probability distribution for a given image URL. You can specify the compute you want to use with ray_actor_options, and how to horizontally scale this specific deployment with num_replicas.

@serve.deployment(
    num_replicas=1,
    ray_actor_options={
        "num_gpus": 1, 
        "accelerator_type": "L4",
    },
)
class ClassPredictor:
    def __init__(self, model_id, artifacts_dir, device="cuda"):
        """Initialize the model."""
        # Embedding model
        self.processor = CLIPProcessor.from_pretrained(model_id)
        self.model = CLIPModel.from_pretrained(model_id)
        self.model.to(device=device)
        self.device = device

        # Trained classifier
        self.predictor = TorchPredictor.from_artifacts_dir(artifacts_dir=artifacts_dir)
        self.preprocessor = self.predictor.preprocessor

    def get_probabilities(self, url):
        image = Image.fromarray(np.uint8(url_to_array(url=url))).convert("RGB")
        inputs = self.processor(images=[image], return_tensors="pt", padding=True).to(self.device)
        with torch.inference_mode():
            embedding = self.model.get_image_features(**inputs).cpu().numpy()
        outputs = self.predictor.predict_probabilities(
            collate_fn({"embedding": embedding}))
        return {"probabilities": outputs["probabilities"][0]}
🧱 Model composition

Ray Serve makes it easy to do model composition, where you can compose multiple deployments containing ML models or business logic into a single application. You can independently configure and scale each of your deployments, even with fractional resources, as sketched below.

https://raw.githubusercontent.com/anyscale/multimodal-ai/refs/heads/main/images/serve_composition.png
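
For example, here's a minimal sketch of composing two deployments that share a single GPU through fractional ray_actor_options. The deployment names, methods, and resource amounts are illustrative assumptions, not part of this tutorial's application:

from ray import serve

# Hypothetical embedding deployment: two replicas, each using a quarter of a GPU.
@serve.deployment(num_replicas=2, ray_actor_options={"num_gpus": 0.25})
class Embedder:
    def embed(self, text: str):
        ...  # run an embedding model here

# Hypothetical downstream deployment: one replica using half of a GPU.
@serve.deployment(num_replicas=1, ray_actor_options={"num_gpus": 0.5})
class Ranker:
    def __init__(self, embedder):
        self.embedder = embedder  # handle to the Embedder deployment

    async def __call__(self, text: str):
        # Call the upstream deployment, then apply business logic on top.
        embedding = await self.embedder.embed.remote(text)
        return embedding

# Compose both deployments into a single application.
composed_app = Ranker.bind(embedder=Embedder.bind())

Each deployment scales independently, so you can add Embedder replicas without touching Ranker. You would run this the same way the doggos app is run below, with serve.run.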

Application#

# Define app.
api = FastAPI(
    title="doggos", 
    description="classify your dog", 
    version="0.1",
)
@serve.deployment
@serve.ingress(api)
class Doggos:
    def __init__(self, classifier):
        self.classifier = classifier
        
    @api.post("/predict/")
    async def predict(self, request: Request):
        data = await request.json()
        probabilities = await self.classifier.get_probabilities.remote(url=data["url"])
        return probabilities
# Model registry.
model_registry = "/mnt/cluster_storage/mlflow/doggos"
experiment_name = "doggos"
mlflow.set_tracking_uri(f"file:{model_registry}")
# Get best_run's artifact_dir.
mlflow.set_tracking_uri(f"file:{model_registry}")
sorted_runs = mlflow.search_runs(
    experiment_names=[experiment_name], 
    order_by=["metrics.val_loss ASC"])
best_run = sorted_runs.iloc[0]
artifacts_dir = urlparse(best_run.artifact_uri).path
# Define app.
app = Doggos.bind(
    classifier=ClassPredictor.bind(
        model_id="openai/clip-vit-base-patch32",
        artifacts_dir=artifacts_dir,
        device="cuda"
    )
)
# Run service locally.
serve.run(app, route_prefix="/")
(ProxyActor pid=75693) INFO 2025-06-23 20:04:07,726 proxy 10.0.61.28 -- Proxy starting on node b4c1ef3393280e7df5c15725708ef231f52e1e31e050f75f5d32a41a (HTTP port: 8000).
(ProxyActor pid=75693) INFO 2025-06-23 20:04:07,794 proxy 10.0.61.28 -- Got updated endpoints: {}.
INFO 2025-06-23 20:04:07,815 serve 75456 -- Started Serve in namespace "serve".
(ServeController pid=75629) INFO 2025-06-23 20:04:07,905 controller 75629 -- Deploying new version of Deployment(name='ClassPredictor', app='default') (initial target replicas: 1).
(ServeController pid=75629) INFO 2025-06-23 20:04:07,907 controller 75629 -- Deploying new version of Deployment(name='Doggos', app='default') (initial target replicas: 1).
(ProxyActor pid=75693) INFO 2025-06-23 20:04:07,910 proxy 10.0.61.28 -- Got updated endpoints: {Deployment(name='Doggos', app='default'): EndpointInfo(route='/', app_is_cross_language=False)}.
(ServeController pid=75629) INFO 2025-06-23 20:04:08,013 controller 75629 -- Adding 1 replica to Deployment(name='ClassPredictor', app='default').
(ServeController pid=75629) INFO 2025-06-23 20:04:08,014 controller 75629 -- Adding 1 replica to Deployment(name='Doggos', app='default').
(ProxyActor pid=75693) INFO 2025-06-23 20:04:07,922 proxy 10.0.61.28 -- Started <ray.serve._private.router.SharedRouterLongPollClient object at 0x74a85c10e6c0>.
(ServeController pid=75629) WARNING 2025-06-23 20:04:38,040 controller 75629 -- Deployment 'ClassPredictor' in application 'default' has 1 replicas that have taken more than 30s to be scheduled. This may be due to waiting for the cluster to auto-scale or for a runtime environment to be installed. Resources required for each replica: {"CPU": 1, "GPU": 1, "accelerator_type:L4": 0.001}, total resources available: {"accelerator_type:L4": 0.999, "CPU": 2.0}. Use `ray status` for more details.
(ServeController pid=75629) WARNING 2025-06-23 20:04:38,041 controller 75629 -- Deployment 'Doggos' in application 'default' has 1 replicas that have taken more than 30s to be scheduled. This may be due to waiting for the cluster to auto-scale or for a runtime environment to be installed. Resources required for each replica: {"CPU": 1}, total resources available: {"CPU": 2.0}. Use `ray status` for more details.
(ServeReplica:default:Doggos pid=19668, ip=10.0.95.114) INFO 2025-06-23 20:05:03,231 default_Doggos 21c29nfb -- Direct ingress is disabled, skipping direct ingress server start
(ProxyActor pid=19768, ip=10.0.95.114) INFO 2025-06-23 20:05:05,037 proxy 10.0.95.114 -- Proxy starting on node 760a1c063ba581ef6100d697d1e1d263b0b354b603658541229768ae (HTTP port: 8000).
(ProxyActor pid=19768, ip=10.0.95.114) INFO 2025-06-23 20:05:05,092 proxy 10.0.95.114 -- Got updated endpoints: {Deployment(name='Doggos', app='default'): EndpointInfo(route='/', app_is_cross_language=False)}.
(ProxyActor pid=19768, ip=10.0.95.114) INFO 2025-06-23 20:05:05,105 proxy 10.0.95.114 -- Started <ray.serve._private.router.SharedRouterLongPollClient object at 0x79942d0e5730>.
(ServeReplica:default:ClassPredictor pid=19669, ip=10.0.95.114) Using a slow image processor as `use_fast` is unset and a slow processor was saved with this model. `use_fast=True` will be the default behavior in v4.52, even if the model was saved with a slow processor. This will result in minor differences in outputs. You'll still be able to use a slow processor with `use_fast=False`.
(ServeController pid=75629) WARNING 2025-06-23 20:05:08,122 controller 75629 -- Deployment 'ClassPredictor' in application 'default' has 1 replicas that have taken more than 30s to initialize.
(ServeController pid=75629) This may be caused by a slow __init__ or reconfigure method.
(ServeReplica:default:ClassPredictor pid=19669, ip=10.0.95.114) INFO 2025-06-23 20:05:09,415 default_ClassPredictor fyf5xp23 -- Direct ingress is disabled, skipping direct ingress server start
INFO 2025-06-23 20:05:10,065 serve 75456 -- Application 'default' is ready at http://127.0.0.1:8000/.
INFO 2025-06-23 20:05:10,071 serve 75456 -- Started <ray.serve._private.router.SharedRouterLongPollClient object at 0x727c2ab61430>.
DeploymentHandle(deployment='Doggos')
# Send a request.
url = "https://doggos-dataset.s3.us-west-2.amazonaws.com/samara.png"
data = {"url": url}
response = requests.post("http://127.0.0.1:8000/predict/", json=data)
probabilities = response.json()["probabilities"]
sorted_probabilities = sorted(probabilities.items(), key=lambda x: x[1], reverse=True)
sorted_probabilities[0:3]
[('collie', 0.2568000853061676),
 ('border_collie', 0.16908691823482513),
 ('bernese_mountain_dog', 0.0767023041844368)]
(autoscaler +38m14s) Tip: use `ray status` to view detailed cluster status. To disable these messages, set RAY_SCHEDULER_EVENTS=0.

Ray Serve#

Ray Serve is a highly scalable and flexible model serving library for building online inference APIs that allows you to:

  • Wrap models and business logic as separate Serve deployments and connect them together (pipeline, ensemble, etc.).

  • Avoid one large service that’s network- and compute-bound and an inefficient use of resources.

  • Utilize fractional heterogeneous resources, which isn’t possible with SageMaker, Vertex, KServe, etc., and horizontally scale with num_replicas.

  • Autoscale up and down based on traffic (see the sketch after this list).

  • Integrate with FastAPI and HTTP.

  • Set up a gRPC service to build distributed systems and microservices.

  • Enable dynamic batching based on batch size, time, etc. (also sketched after this list).

  • Access a suite of utilities for serving LLMs that are inference-engine agnostic and have batteries-included support for LLM-specific features such as multi-LoRA support.

https://raw.githubusercontent.com/anyscale/multimodal-ai/refs/heads/main/images/ray_serve.png
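
Below is a hedged sketch of the autoscaling and dynamic batching items from the list above. The deployment, method names, and parameter values are illustrative assumptions rather than the configuration used by the doggos service:

from typing import List

from ray import serve
from starlette.requests import Request

@serve.deployment(
    # Scale between 1 and 4 replicas based on the number of ongoing requests.
    autoscaling_config={
        "min_replicas": 1,
        "max_replicas": 4,
        "target_ongoing_requests": 2,
    },
)
class BatchedPredictor:
    @serve.batch(max_batch_size=8, batch_wait_timeout_s=0.1)
    async def predict_batch(self, urls: List[str]) -> List[dict]:
        # Serve groups concurrent requests into one list so you can run a
        # single batched forward pass (placeholder logic shown here).
        return [{"url": url, "probabilities": {}} for url in urls]

    async def __call__(self, request: Request) -> dict:
        data = await request.json()
        # Each caller still sends one URL; batching happens transparently.
        return await self.predict_batch(data["url"])

app = BatchedPredictor.bind()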

🔥 RayTurbo Serve on Anyscale has more functionality on top of Ray Serve:

  • fast autoscaling and model loading to get services up and running even faster, with 5x improvements even for LLMs.

  • 54% higher QPS and up to 3x streaming tokens per second for high-traffic serving use cases with no proxy bottlenecks.

  • replica compaction into fewer nodes where possible to reduce resource fragmentation and improve hardware utilization.

  • zero-downtime incremental rollouts so your service is never interrupted.

  • different environments for each service in a multi-serve application.

  • multi-availability-zone aware scheduling of Ray Serve replicas to provide higher redundancy against availability-zone failures.

Observability#

The Ray dashboard, and specifically its Serve view, automatically captures observability for Ray Serve applications. You can view the service’s deployments and their replicas, and inspect time-series metrics to monitor the service’s health.

https://raw.githubusercontent.com/anyscale/multimodal-ai/refs/heads/main/images/serve_dashboard.png

Production services#

Anyscale Services (API ref) offers a fault-tolerant, scalable, and optimized way to serve Ray Serve applications. You can:

  • roll out and update services with canary deployments and zero-downtime upgrades.

  • monitor services through a dedicated service page, unified log viewer, tracing, alerting, etc.

  • scale a service with num_replicas=auto (see the sketch after this list) and utilize replica compaction to consolidate fractionally utilized nodes.

  • get head node fault tolerance. OSS Ray recovers from failed workers and replicas but not head node crashes.

  • serve multiple applications in a single service.

https://raw.githubusercontent.com/anyscale/multimodal-ai/refs/heads/main/images/canary.png
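
As a minimal sketch of the num_replicas=auto option mentioned in the list above (illustrative; not the exact configuration the doggos service uses):

from ray import serve

@serve.deployment(num_replicas="auto")  # opt into Serve's default autoscaling behavior
class AutoscaledClassifier:
    async def __call__(self, request):
        ...  # model inference logic goes here

app = AutoscaledClassifier.bind()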

Note:

  • This tutorial uses a containerfile to define dependencies, but you could easily use a pre-built image as well.

  • You can specify the compute as a compute config or inline in a Service config file.

  • When you don’t specify compute while launching from a workspace, the service defaults to the workspace’s compute configuration.

# Production online service.
anyscale service deploy doggos.serve:app --name=doggos-app \
    --containerfile="/home/ray/default/containerfile" \
    --compute-config="/home/ray/default/configs/aws.yaml" \
    --working-dir="/home/ray/default" \
    --exclude=""
(anyscale +1.9s) Restarting existing service 'doggos-app'.
(anyscale +3.2s) Uploading local dir '/home/ray/default' to cloud storage.
(anyscale +5.2s) Including workspace-managed pip dependencies.
(anyscale +5.8s) Service 'doggos-app' deployed (version ID: akz9ul28).
(anyscale +5.8s) View the service in the UI: 'https://console.anyscale.com/services/service2_6hxismeqf1fkd2h7pfmljmncvm'
(anyscale +5.8s) Query the service once it's running using the following curl command (add the path you want to query):
(anyscale +5.8s) curl -H "Authorization: Bearer <BEARER_TOKEN>" https://doggos-app-bxauk.cld-kvedzwag2qa8i5bj.s.anyscaleuserdata.com/
curl -X POST "https://doggos-app-bxauk.cld-kvedzwag2qa8i5bj.s.anyscaleuserdata.com/predict/" \
     -H "Authorization: Bearer <BEARER_TOKEN>" \
     -H "Content-Type: application/json" \
     -d '{"url": "https://doggos-dataset.s3.us-west-2.amazonaws.com/samara.png", "k": 4}'
# Terminate service.
anyscale service terminate --name doggos-app
(anyscale +1.5s) Service service2_6hxismeqf1fkd2h7pfmljmncvm terminate initiated.
(anyscale +1.5s) View the service in the UI at https://console.anyscale.com/services/service2_6hxismeqf1fkd2h7pfmljmncvm

CI/CD#

While Anyscale Jobs and Services are useful atomic concepts that help you productionize workloads, they’re also useful as nodes in a larger ML DAG or CI/CD workflow. You can chain Jobs together, store results, and then serve your application with those artifacts. From there, you can trigger updates to your service and retrigger the Jobs based on events, time, etc. While you can simply use the Anyscale CLI to integrate with any orchestration platform, Anyscale also supports purpose-built integrations like Airflow and Prefect.

https://raw.githubusercontent.com/anyscale/multimodal-ai/refs/heads/main/images/cicd.png

🚨 Note: Reset this notebook using the “🔄 Restart” button located in the notebook’s menu bar. This frees up all the variables, utilities, etc. used in this notebook.