Set Up FastAPI and HTTP#

This section helps you understand how to:

  • Send HTTP requests to Serve deployments

  • Use Ray Serve to integrate with FastAPI

  • Use customized HTTP adapters

  • Choose which feature to use for your use case

  • Set up keep alive timeout

Choosing the right HTTP feature#

Serve offers a layered approach to expose your model with the right HTTP API.

Considering your use case, you can choose the right level of abstraction:

Calling Deployments via HTTP#

When you deploy a Serve application, the ingress deployment (the one passed to serve.run) is exposed over HTTP.

import starlette.requests
import requests
from ray import serve


@serve.deployment
class Counter:
    def __call__(self, request: starlette.requests.Request):
        return request.query_params


serve.run(Counter.bind())
resp = requests.get("http://localhost:8000?a=b&c=d")
assert resp.json() == {"a": "b", "c": "d"}

Requests to the Serve HTTP server at / are routed to the deployment’s __call__ method with a Starlette Request object as the sole argument. The __call__ method can return any JSON-serializable object or a Starlette Response object (e.g., to return a custom status code or custom headers). A Serve app’s route prefix can be changed from / to another string by setting route_prefix in serve.run() or the Serve config file.

Request cancellation#

When processing a request takes longer than the end-to-end timeout or an HTTP client disconnects before receiving a response, Serve cancels the in-flight request:

  • If the proxy hasn’t yet sent the request to a replica, Serve simply drops the request.

  • If the request has been sent to a replica, Serve attempts to interrupt the replica and cancel the request. The asyncio.Task running the handler on the replica is cancelled, raising an asyncio.CancelledError the next time it enters an await statement. See the asyncio docs for more info. Handle this exception in a try-except block to customize your deployment’s behavior when a request is cancelled:

import asyncio
from ray import serve


@serve.deployment
async def startled():
    try:
        print("Replica received request!")
        await asyncio.sleep(10000)
    except asyncio.CancelledError:
        # Add custom behavior that should run
        # upon cancellation here.
        print("Request got cancelled!")

If no await statements are left in the deployment’s code before the request completes, the replica processes the request as usual, sends the response back to the proxy, and the proxy discards the response. Use await statements for blocking operations in a deployment, so Serve can cancel in-flight requests without waiting for the blocking operation to complete.

Cancellation cascades to any downstream deployment handle, task, or actor calls that were spawned in the deployment’s request-handling method. These can handle the asyncio.CancelledError in the same way as the ingress deployment.

To prevent an async call from being interrupted by asyncio.CancelledError, use asyncio.shield():

import asyncio
from ray import serve


@serve.deployment
class SnoringSleeper:
    async def snore(self):
        await asyncio.sleep(1)
        print("ZZZ")

    async def __call__(self):
        try:
            print("SnoringSleeper received request!")

            # Prevent the snore() method from being cancelled
            await asyncio.shield(self.snore())

        except asyncio.CancelledError:
            print("SnoringSleeper's request was cancelled!")


app = SnoringSleeper.bind()

When the request is cancelled, a cancellation error is raised inside the SnoringSleeper deployment’s __call__() method. However, the cancellation is not raised inside the snore() call, so ZZZ is printed even if the request is cancelled. Note that asyncio.shield cannot be used on a DeploymentHandle call to prevent the downstream handler from being cancelled. You need to explicitly handle the cancellation error in that handler as well.

FastAPI HTTP Deployments#

If you want to define more complex HTTP handling logic, Serve integrates with FastAPI. This allows you to define a Serve deployment using the @serve.ingress decorator that wraps a FastAPI app with its full range of features. The most basic example of this is shown below, but for more details on all that FastAPI has to offer such as variable routes, automatic type validation, dependency injection (e.g., for database connections), and more, please check out their documentation.

Note

A Serve application that’s integrated with FastAPI still respects the route_prefix set through Serve. The routes are that registered through the FastAPI app object are layered on top of the route prefix. For instance, if your Serve application has route_prefix = /my_app and you decorate a method with @app.get("/fetch_data"), then you can call that method by sending a GET request to the path /my_app/fetch_data.

import ray
import requests
from fastapi import FastAPI
from ray import serve

app = FastAPI()


@serve.deployment
@serve.ingress(app)
class MyFastAPIDeployment:
    @app.get("/")
    def root(self):
        return "Hello, world!"


serve.run(MyFastAPIDeployment.bind(), route_prefix="/hello")
resp = requests.get("http://localhost:8000/hello")
assert resp.json() == "Hello, world!"

Now if you send a request to /hello, this will be routed to the root method of our deployment. We can also easily leverage FastAPI to define multiple routes with different HTTP methods:

import ray
import requests
from fastapi import FastAPI
from ray import serve

app = FastAPI()


@serve.deployment
@serve.ingress(app)
class MyFastAPIDeployment:
    @app.get("/")
    def root(self):
        return "Hello, world!"

    @app.post("/{subpath}")
    def root(self, subpath: str):
        return f"Hello from {subpath}!"


serve.run(MyFastAPIDeployment.bind(), route_prefix="/hello")
resp = requests.post("http://localhost:8000/hello/Serve")
assert resp.json() == "Hello from Serve!"

You can also pass in an existing FastAPI app to a deployment to serve it as-is:

import ray
import requests
from fastapi import FastAPI
from ray import serve

app = FastAPI()


@app.get("/")
def f():
    return "Hello from the root!"


@serve.deployment
@serve.ingress(app)
class FastAPIWrapper:
    pass


serve.run(FastAPIWrapper.bind(), route_prefix="/")
resp = requests.get("http://localhost:8000/")
assert resp.json() == "Hello from the root!"

This is useful for scaling out an existing FastAPI app with no modifications necessary. Existing middlewares, automatic OpenAPI documentation generation, and other advanced FastAPI features should work as-is.

WebSockets#

Serve supports WebSockets via FastAPI:

from fastapi import FastAPI, WebSocket, WebSocketDisconnect

from ray import serve


app = FastAPI()


@serve.deployment
@serve.ingress(app)
class EchoServer:
    @app.websocket("/")
    async def echo(self, ws: WebSocket):
        await ws.accept()

        try:
            while True:
                text = await ws.receive_text()
                await ws.send_text(text)
        except WebSocketDisconnect:
            print("Client disconnected.")


serve_app = serve.run(EchoServer.bind())

Decorate the function that handles WebSocket requests with @app.websocket. Read more about FastAPI WebSockets in the FastAPI documentation.

Query the deployment using the websockets package (pip install websockets):

from websockets.sync.client import connect

with connect("ws://localhost:8000") as websocket:
    websocket.send("Eureka!")
    assert websocket.recv() == "Eureka!"

    websocket.send("I've found it!")
    assert websocket.recv() == "I've found it!"

Streaming Responses#

Some applications must stream incremental results back to the caller. This is common for text generation using large language models (LLMs) or video processing applications. The full forward pass may take multiple seconds, so providing incremental results as they’re available provides a much better user experience.

To use HTTP response streaming, return a StreamingResponse that wraps a generator from your HTTP handler. This is supported for basic HTTP ingress deployments using a __call__ method and when using the FastAPI integration.

The code below defines a Serve application that incrementally streams numbers up to a provided max. The client-side code is also updated to handle the streaming outputs. This code uses the stream=True option to the requests library.

import time
from typing import Generator

import requests
from starlette.responses import StreamingResponse
from starlette.requests import Request

from ray import serve


@serve.deployment
class StreamingResponder:
    def generate_numbers(self, max: int) -> Generator[str, None, None]:
        for i in range(max):
            yield str(i)
            time.sleep(0.1)

    def __call__(self, request: Request) -> StreamingResponse:
        max = request.query_params.get("max", "25")
        gen = self.generate_numbers(int(max))
        return StreamingResponse(gen, status_code=200, media_type="text/plain")


serve.run(StreamingResponder.bind())

r = requests.get("http://localhost:8000?max=10", stream=True)
start = time.time()
r.raise_for_status()
for chunk in r.iter_content(chunk_size=None, decode_unicode=True):
    print(f"Got result {round(time.time()-start, 1)}s after start: '{chunk}'")

Save this code in stream.py and run it:

$ python stream.py
[2023-05-25 10:44:23]  INFO ray._private.worker::Started a local Ray instance. View the dashboard at http://127.0.0.1:8265
(ServeController pid=40401) INFO 2023-05-25 10:44:25,296 controller 40401 deployment_state.py:1259 - Deploying new version of deployment default_StreamingResponder.
(ProxyActor pid=40403) INFO:     Started server process [40403]
(ServeController pid=40401) INFO 2023-05-25 10:44:25,333 controller 40401 deployment_state.py:1498 - Adding 1 replica to deployment default_StreamingResponder.
Got result 0.0s after start: '0'
Got result 0.1s after start: '1'
Got result 0.2s after start: '2'
Got result 0.3s after start: '3'
Got result 0.4s after start: '4'
Got result 0.5s after start: '5'
Got result 0.6s after start: '6'
Got result 0.7s after start: '7'
Got result 0.8s after start: '8'
Got result 0.9s after start: '9'
(ServeReplica:default_StreamingResponder pid=41052) INFO 2023-05-25 10:49:52,230 default_StreamingResponder default_StreamingResponder#qlZFCa yomKnJifNJ / default replica.py:634 - __CALL__ OK 1017.6ms

Terminating the stream when a client disconnects#

In some cases, you may want to cease processing a request when the client disconnects before the full stream has been returned. If you pass an async generator to StreamingResponse, it is cancelled and raises an asyncio.CancelledError when the client disconnects. Note that you must await at some point in the generator for the cancellation to occur.

In the example below, the generator streams responses forever until the client disconnects, then it prints that it was cancelled and exits. Save this code in stream.py and run it:

import asyncio
import time
from typing import AsyncGenerator

import requests
from starlette.responses import StreamingResponse
from starlette.requests import Request

from ray import serve


@serve.deployment
class StreamingResponder:
    async def generate_forever(self) -> AsyncGenerator[str, None]:
        try:
            i = 0
            while True:
                yield str(i)
                i += 1
                await asyncio.sleep(0.1)
        except asyncio.CancelledError:
            print("Cancelled! Exiting.")

    def __call__(self, request: Request) -> StreamingResponse:
        gen = self.generate_forever()
        return StreamingResponse(gen, status_code=200, media_type="text/plain")


serve.run(StreamingResponder.bind())

r = requests.get("http://localhost:8000?max=10", stream=True)
start = time.time()
r.raise_for_status()
for i, chunk in enumerate(r.iter_content(chunk_size=None, decode_unicode=True)):
    print(f"Got result {round(time.time()-start, 1)}s after start: '{chunk}'")
    if i == 10:
        print("Client disconnecting")
        break
$ python stream.py
[2023-07-10 16:08:41]  INFO ray._private.worker::Started a local Ray instance. View the dashboard at http://127.0.0.1:8265
(ServeController pid=50801) INFO 2023-07-10 16:08:42,296 controller 40401 deployment_state.py:1259 - Deploying new version of deployment default_StreamingResponder.
(ProxyActor pid=50803) INFO:     Started server process [50803]
(ServeController pid=50805) INFO 2023-07-10 16:08:42,963 controller 50805 deployment_state.py:1586 - Adding 1 replica to deployment default_StreamingResponder.
Got result 0.0s after start: '0'
Got result 0.1s after start: '1'
Got result 0.2s after start: '2'
Got result 0.3s after start: '3'
Got result 0.4s after start: '4'
Got result 0.5s after start: '5'
Got result 0.6s after start: '6'
Got result 0.7s after start: '7'
Got result 0.8s after start: '8'
Got result 0.9s after start: '9'
Got result 1.0s after start: '10'
Client disconnecting
(ServeReplica:default_StreamingResponder pid=50842) Cancelled! Exiting.
(ServeReplica:default_StreamingResponder pid=50842) INFO 2023-07-10 16:08:45,756 default_StreamingResponder default_StreamingResponder#cmpnmF ahteNDQSWx / default replica.py:691 - __CALL__ OK 1019.1ms

Set keep alive timeout#

Serve uses a Uvicorn HTTP server internally to serve HTTP requests. By default, Uvicorn keeps HTTP connections alive for 5 seconds between requests. Modify the keep-alive timeout by setting the keep_alive_timeout_s in the http_options field of the Serve config files. This config is global to your Ray cluster, and you can’t update it during runtime. You can also set the RAY_SERVE_HTTP_KEEP_ALIVE_TIMEOUT_S environment variable to set the keep alive timeout. RAY_SERVE_HTTP_KEEP_ALIVE_TIMEOUT_S takes precedence over the keep_alive_timeout_s config if both are set. See Uvicorn’s keep alive timeout guide for more information.