Set Up FastAPI and HTTP#
This section helps you understand how to:
Send HTTP requests to Serve deployments
Use Ray Serve to integrate with FastAPI
Choose which feature to use for your use case
Set the keep-alive timeout
Choosing the right HTTP feature#
Serve offers a layered approach to expose your model with the right HTTP API.
Depending on your use case, you can choose the right level of abstraction:
If you are comfortable working with the raw request object, use the starlette.requests.Request API.
If you want a fully fledged API server with validation and doc generation, use the FastAPI integration.
Calling Deployments via HTTP#
When you deploy a Serve application, the ingress deployment (the one passed to serve.run
) is exposed over HTTP.
import starlette.requests
import requests
from ray import serve
@serve.deployment
class Counter:
def __call__(self, request: starlette.requests.Request):
return request.query_params
serve.run(Counter.bind())
resp = requests.get("http://localhost:8000?a=b&c=d")
assert resp.json() == {"a": "b", "c": "d"}
Requests to the Serve HTTP server at /
are routed to the deployment’s __call__
method with a Starlette Request object as the sole argument. The __call__
method can return any JSON-serializable object or a Starlette Response object (e.g., to return a custom status code or custom headers). A Serve app’s route prefix can be changed from /
to another string by setting route_prefix
in serve.run()
or the Serve config file.
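For example, here's a minimal sketch (the deployment name, header, status code, and route prefix are illustrative, not part of the example above) of a __call__ method that returns a Starlette Response with a custom status code and header, served under a non-default route prefix:
import requests
import starlette.requests
from starlette.responses import Response
from ray import serve
@serve.deployment
class Greeter:
    def __call__(self, request: starlette.requests.Request) -> Response:
        # Illustrative: return a custom status code and header instead of a JSON body.
        return Response(
            "created",
            status_code=201,
            headers={"X-Example-Header": "hello"},
            media_type="text/plain",
        )
# Serve the app under /greet instead of the default / prefix.
serve.run(Greeter.bind(), route_prefix="/greet")
resp = requests.get("http://localhost:8000/greet")
assert resp.status_code == 201 and resp.text == "created"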
Request cancellation#
When processing a request takes longer than the end-to-end timeout or an HTTP client disconnects before receiving a response, Serve cancels the in-flight request:
If the proxy hasn’t yet sent the request to a replica, Serve simply drops the request.
If the request has been sent to a replica, Serve attempts to interrupt the replica and cancel the request. The asyncio.Task running the handler on the replica is cancelled, raising an asyncio.CancelledError the next time it enters an await statement. See the asyncio docs for more info. Handle this exception in a try-except block to customize your deployment’s behavior when a request is cancelled:
import asyncio
from ray import serve
@serve.deployment
async def startled():
try:
print("Replica received request!")
await asyncio.sleep(10000)
except asyncio.CancelledError:
# Add custom behavior that should run
# upon cancellation here.
print("Request got cancelled!")
If no await
statements are left in the deployment’s code before the request completes, the replica processes the request as usual, sends the response back to the proxy, and the proxy discards the response. Use await
statements for blocking operations in a deployment, so Serve can cancel in-flight requests without waiting for the blocking operation to complete.
Cancellation cascades to any downstream deployment handle, task, or actor calls that were spawned in the deployment’s request-handling method. These can handle the asyncio.CancelledError
in the same way as the ingress deployment.
To prevent an async call from being interrupted by asyncio.CancelledError
, use asyncio.shield()
:
import asyncio
from ray import serve
@serve.deployment
class SnoringSleeper:
async def snore(self):
await asyncio.sleep(1)
print("ZZZ")
async def __call__(self):
try:
print("SnoringSleeper received request!")
# Prevent the snore() method from being cancelled
await asyncio.shield(self.snore())
except asyncio.CancelledError:
print("SnoringSleeper's request was cancelled!")
app = SnoringSleeper.bind()
When the request is cancelled, a cancellation error is raised inside the SnoringSleeper
deployment’s __call__()
method. However, the cancellation is not raised inside the snore()
call, so ZZZ
is printed even if the request is cancelled. Note that asyncio.shield
cannot be used on a DeploymentHandle
call to prevent the downstream handler from being cancelled. You need to explicitly handle the cancellation error in that handler as well.
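For example, here's a rough sketch (the deployment and method names are illustrative) where the ingress calls a downstream deployment through a DeploymentHandle and the downstream handler catches the cascaded cancellation itself:
import asyncio
from ray import serve
from ray.serve.handle import DeploymentHandle
@serve.deployment
class Downstream:
    async def work(self) -> str:
        try:
            await asyncio.sleep(10000)
            return "done"
        except asyncio.CancelledError:
            # Runs when the ingress request is cancelled and the
            # cancellation cascades to this in-flight handle call.
            print("Downstream call was cancelled!")
            raise
@serve.deployment
class Ingress:
    def __init__(self, downstream: DeploymentHandle):
        self._downstream = downstream
    async def __call__(self) -> str:
        try:
            return await self._downstream.work.remote()
        except asyncio.CancelledError:
            print("Ingress request was cancelled!")
            raise
app = Ingress.bind(Downstream.bind())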
FastAPI HTTP Deployments#
If you want to define more complex HTTP handling logic, Serve integrates with FastAPI. This allows you to define a Serve deployment using the @serve.ingress
decorator that wraps a FastAPI app with its full range of features. The most basic example of this is shown below, but for more details on all that FastAPI has to offer, such as variable routes, automatic type validation, and dependency injection (e.g., for database connections), see the FastAPI documentation.
Note
A Serve application that’s integrated with FastAPI still respects the route_prefix
set through Serve. The routes that are registered through the FastAPI app
object are layered on top of the route prefix. For instance, if your Serve application has route_prefix = /my_app
and you decorate a method with @app.get("/fetch_data")
, then you can call that method by sending a GET request to the path /my_app/fetch_data
.
import ray
import requests
from fastapi import FastAPI
from ray import serve
app = FastAPI()
@serve.deployment
@serve.ingress(app)
class MyFastAPIDeployment:
@app.get("/")
def root(self):
return "Hello, world!"
serve.run(MyFastAPIDeployment.bind(), route_prefix="/hello")
resp = requests.get("http://localhost:8000/hello")
assert resp.json() == "Hello, world!"
Now if you send a request to /hello, it's routed to the root method of the deployment. You can also use FastAPI to define multiple routes with different HTTP methods:
import ray
import requests
from fastapi import FastAPI
from ray import serve
app = FastAPI()
@serve.deployment
@serve.ingress(app)
class MyFastAPIDeployment:
@app.get("/")
def root(self):
return "Hello, world!"
    @app.post("/{subpath}")
    def say_hi(self, subpath: str):
        return f"Hello from {subpath}!"
serve.run(MyFastAPIDeployment.bind(), route_prefix="/hello")
resp = requests.post("http://localhost:8000/hello/Serve")
assert resp.json() == "Hello from Serve!"
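FastAPI's automatic type validation also applies to these routes. As a rough sketch (the deployment, route, and parameter names are illustrative), typed query parameters are parsed and validated before your method runs, and invalid input gets a 422 response from FastAPI:
import requests
from fastapi import FastAPI
from ray import serve
app = FastAPI()
@serve.deployment
@serve.ingress(app)
class Adder:
    @app.get("/add")
    def add(self, x: int, y: int) -> int:
        # FastAPI converts and validates the query parameters as ints.
        return x + y
serve.run(Adder.bind(), route_prefix="/math")
assert requests.get("http://localhost:8000/math/add?x=1&y=2").json() == 3
# Non-integer input is rejected by FastAPI before the method is called.
assert requests.get("http://localhost:8000/math/add?x=oops&y=2").status_code == 422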
You can also pass in an existing FastAPI app to a deployment to serve it as-is:
import ray
import requests
from fastapi import FastAPI
from ray import serve
app = FastAPI()
@app.get("/")
def f():
return "Hello from the root!"
@serve.deployment
@serve.ingress(app)
class FastAPIWrapper:
pass
serve.run(FastAPIWrapper.bind(), route_prefix="/")
resp = requests.get("http://localhost:8000/")
assert resp.json() == "Hello from the root!"
This is useful for scaling out an existing FastAPI app with no modifications necessary. Existing middlewares, automatic OpenAPI documentation generation, and other advanced FastAPI features should work as-is.
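For example, here's a minimal sketch (the middleware settings are illustrative) that adds standard FastAPI/Starlette CORS middleware to the app before wrapping it in a deployment; the middleware runs for requests served through Serve just as it would when running the app directly:
import requests
from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware
from ray import serve
app = FastAPI()
# Existing middleware keeps working when the app is served through Serve.
app.add_middleware(CORSMiddleware, allow_origins=["*"], allow_methods=["*"])
@app.get("/")
def f():
    return "Hello from the root!"
@serve.deployment
@serve.ingress(app)
class FastAPIWrapper:
    pass
serve.run(FastAPIWrapper.bind(), route_prefix="/")
resp = requests.get("http://localhost:8000/")
assert resp.json() == "Hello from the root!"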
WebSockets#
Serve supports WebSockets via FastAPI:
from fastapi import FastAPI, WebSocket, WebSocketDisconnect
from ray import serve
app = FastAPI()
@serve.deployment
@serve.ingress(app)
class EchoServer:
@app.websocket("/")
async def echo(self, ws: WebSocket):
await ws.accept()
try:
while True:
text = await ws.receive_text()
await ws.send_text(text)
except WebSocketDisconnect:
print("Client disconnected.")
serve_app = serve.run(EchoServer.bind())
Decorate the function that handles WebSocket requests with @app.websocket
. Read more about FastAPI WebSockets in the FastAPI documentation.
Query the deployment using the websockets
package (pip install websockets
):
from websockets.sync.client import connect
with connect("ws://localhost:8000") as websocket:
websocket.send("Eureka!")
assert websocket.recv() == "Eureka!"
websocket.send("I've found it!")
assert websocket.recv() == "I've found it!"
Streaming Responses#
Some applications must stream incremental results back to the caller. This is common for text generation using large language models (LLMs) or video processing applications. The full forward pass may take multiple seconds, so returning incremental results as they become available offers a much better user experience.
To use HTTP response streaming, return a StreamingResponse that wraps a generator from your HTTP handler.
This is supported for basic HTTP ingress deployments using a __call__
method and when using the FastAPI integration.
The code below defines a Serve application that incrementally streams numbers up to a provided max
.
The client-side code is also updated to handle the streaming outputs.
This code uses the stream=True
option to the requests library.
import time
from typing import Generator
import requests
from starlette.responses import StreamingResponse
from starlette.requests import Request
from ray import serve
@serve.deployment
class StreamingResponder:
def generate_numbers(self, max: int) -> Generator[str, None, None]:
for i in range(max):
yield str(i)
time.sleep(0.1)
def __call__(self, request: Request) -> StreamingResponse:
max = request.query_params.get("max", "25")
gen = self.generate_numbers(int(max))
return StreamingResponse(gen, status_code=200, media_type="text/plain")
serve.run(StreamingResponder.bind())
r = requests.get("http://localhost:8000?max=10", stream=True)
start = time.time()
r.raise_for_status()
for chunk in r.iter_content(chunk_size=None, decode_unicode=True):
print(f"Got result {round(time.time()-start, 1)}s after start: '{chunk}'")
Save this code in stream.py
and run it:
$ python stream.py
[2023-05-25 10:44:23] INFO ray._private.worker::Started a local Ray instance. View the dashboard at http://127.0.0.1:8265
(ServeController pid=40401) INFO 2023-05-25 10:44:25,296 controller 40401 deployment_state.py:1259 - Deploying new version of deployment default_StreamingResponder.
(ProxyActor pid=40403) INFO: Started server process [40403]
(ServeController pid=40401) INFO 2023-05-25 10:44:25,333 controller 40401 deployment_state.py:1498 - Adding 1 replica to deployment default_StreamingResponder.
Got result 0.0s after start: '0'
Got result 0.1s after start: '1'
Got result 0.2s after start: '2'
Got result 0.3s after start: '3'
Got result 0.4s after start: '4'
Got result 0.5s after start: '5'
Got result 0.6s after start: '6'
Got result 0.7s after start: '7'
Got result 0.8s after start: '8'
Got result 0.9s after start: '9'
(ServeReplica:default_StreamingResponder pid=41052) INFO 2023-05-25 10:49:52,230 default_StreamingResponder default_StreamingResponder#qlZFCa yomKnJifNJ / default replica.py:634 - __CALL__ OK 1017.6ms
Terminating the stream when a client disconnects#
In some cases, you may want to cease processing a request when the client disconnects before the full stream has been returned.
If you pass an async generator to StreamingResponse
, it is cancelled and raises an asyncio.CancelledError
when the client disconnects.
Note that you must await
at some point in the generator for the cancellation to occur.
In the example below, the generator streams responses forever until the client disconnects, then it prints that it was cancelled and exits. Save this code in stream.py
and run it:
import asyncio
import time
from typing import AsyncGenerator
import requests
from starlette.responses import StreamingResponse
from starlette.requests import Request
from ray import serve
@serve.deployment
class StreamingResponder:
async def generate_forever(self) -> AsyncGenerator[str, None]:
try:
i = 0
while True:
yield str(i)
i += 1
await asyncio.sleep(0.1)
except asyncio.CancelledError:
print("Cancelled! Exiting.")
def __call__(self, request: Request) -> StreamingResponse:
gen = self.generate_forever()
return StreamingResponse(gen, status_code=200, media_type="text/plain")
serve.run(StreamingResponder.bind())
r = requests.get("http://localhost:8000", stream=True)
start = time.time()
r.raise_for_status()
for i, chunk in enumerate(r.iter_content(chunk_size=None, decode_unicode=True)):
print(f"Got result {round(time.time()-start, 1)}s after start: '{chunk}'")
if i == 10:
print("Client disconnecting")
break
$ python stream.py
[2023-07-10 16:08:41] INFO ray._private.worker::Started a local Ray instance. View the dashboard at http://127.0.0.1:8265
(ServeController pid=50801) INFO 2023-07-10 16:08:42,296 controller 40401 deployment_state.py:1259 - Deploying new version of deployment default_StreamingResponder.
(ProxyActor pid=50803) INFO: Started server process [50803]
(ServeController pid=50805) INFO 2023-07-10 16:08:42,963 controller 50805 deployment_state.py:1586 - Adding 1 replica to deployment default_StreamingResponder.
Got result 0.0s after start: '0'
Got result 0.1s after start: '1'
Got result 0.2s after start: '2'
Got result 0.3s after start: '3'
Got result 0.4s after start: '4'
Got result 0.5s after start: '5'
Got result 0.6s after start: '6'
Got result 0.7s after start: '7'
Got result 0.8s after start: '8'
Got result 0.9s after start: '9'
Got result 1.0s after start: '10'
Client disconnecting
(ServeReplica:default_StreamingResponder pid=50842) Cancelled! Exiting.
(ServeReplica:default_StreamingResponder pid=50842) INFO 2023-07-10 16:08:45,756 default_StreamingResponder default_StreamingResponder#cmpnmF ahteNDQSWx / default replica.py:691 - __CALL__ OK 1019.1ms
Set keep alive timeout#
Serve uses a Uvicorn HTTP server internally to serve HTTP requests. By default, Uvicorn keeps HTTP connections alive for 5 seconds between requests. Modify the keep-alive timeout by setting keep_alive_timeout_s in the http_options field of the Serve config file. This config is global to your Ray cluster, and you can't update it at runtime. You can also set the RAY_SERVE_HTTP_KEEP_ALIVE_TIMEOUT_S environment variable to set the keep-alive timeout; it takes precedence over the keep_alive_timeout_s config if both are set. See Uvicorn's keep alive timeout guide for more information.
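For example, a rough sketch of a Serve config file that sets the timeout globally (the 30-second value is illustrative):
http_options:
  keep_alive_timeout_s: 30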