Experimental Direct Ingress#
In version 2.1, Serve provides an alpha version of gRPC ingress.
With the gRPC protocol, you get:
A standardized inference request/response schema between the client and Serve.
A higher-performance endpoint than the HTTP protocol.
In this section, you will learn how to:
use Serve’s built-in gRPC schema to receive client traffic
bring your own gRPC schema into your Serve application
Use Serve’s Schema#
Serve provides a simple gRPC schema for machine learning inference workloads. It is intentionally minimal, and you are encouraged to adapt it to your own needs.
message PredictRequest {
  map<string, bytes> input = 2;
}

message PredictResponse {
  bytes prediction = 1;
}

service PredictAPIsService {
  rpc Predict(PredictRequest) returns (PredictResponse);
}
The following code samples show how to use DefaultgRPCDriver in Ray Serve.
Server: To implement the server, your driver class needs to inherit from ray.serve.drivers.DefaultgRPCDriver. The example below uses DefaultgRPCDriver directly.
import struct
from typing import Dict, Optional

import ray
from ray import serve
from ray.serve.deployment_graph import InputNode
from ray.serve.drivers import DefaultgRPCDriver
from ray.serve.handle import RayServeDeploymentHandle


@serve.deployment
class FruitMarket:
    def __init__(
        self,
        orange_stand: RayServeDeploymentHandle,
        apple_stand: RayServeDeploymentHandle,
    ):
        self.directory = {
            "ORANGE": orange_stand,
            "APPLE": apple_stand,
        }

    async def check_price(self, inputs: Dict[str, bytes]) -> Optional[bytearray]:
        costs = 0
        for fruit, amount in inputs.items():
            if fruit not in self.directory:
                # Unknown fruit: give up without computing a price.
                return
            fruit_stand = self.directory[fruit]
            # Amounts arrive as bytes (for example b"10"); int() parses them.
            ref: ray.ObjectRef = await fruit_stand.remote(int(amount))
            result = await ref
            costs += result
        # Pack the total as a 4-byte float so it fits PredictResponse.prediction.
        return bytearray(struct.pack("f", costs))


@serve.deployment
class OrangeStand:
    def __init__(self):
        self.price = 2.0

    def __call__(self, num_oranges: int):
        return num_oranges * self.price


@serve.deployment
class AppleStand:
    def __init__(self):
        self.price = 3.0

    def __call__(self, num_apples: int):
        return num_apples * self.price


with InputNode() as input:
    orange_stand = OrangeStand.bind()
    apple_stand = AppleStand.bind()
    fruit_market = FruitMarket.bind(orange_stand, apple_stand)
    my_deployment = DefaultgRPCDriver.bind(fruit_market.check_price.bind(input))

serve.run(my_deployment)
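To see what the graph above computes without starting Ray, the pricing logic can be sketched in plain Python. This is only an illustration under stated assumptions: LocalStand and check_price_local are hypothetical stand-ins for the deployments and are not part of Ray Serve.

```python
import struct
from typing import Dict, Optional


class LocalStand:
    """Hypothetical stand-in for OrangeStand/AppleStand (no Ray involved)."""

    def __init__(self, price: float):
        self.price = price

    def __call__(self, amount: int) -> float:
        return amount * self.price


def check_price_local(
    directory: Dict[str, LocalStand], inputs: Dict[str, bytes]
) -> Optional[bytes]:
    """Mirror FruitMarket.check_price: bytes in, packed 4-byte float out."""
    costs = 0.0
    for fruit, amount in inputs.items():
        if fruit not in directory:
            return None  # same early exit as the deployment above
        # int() accepts ASCII digits given as bytes, e.g. int(b"10") == 10
        costs += directory[fruit](int(amount))
    return struct.pack("f", costs)


directory = {"ORANGE": LocalStand(2.0), "APPLE": LocalStand(3.0)}
payload = check_price_local(directory, {"ORANGE": b"10", "APPLE": b"3"})
total = struct.unpack("f", payload)[0]
print(total)  # 10 * 2.0 + 3 * 3.0 = 29.0
```

The request values and the returned payload are both raw bytes, which is the contract the built-in schema imposes on check_price.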
Client: You can use Serve’s built-in gRPC client to send queries to the model.
import asyncio
import struct

import grpc
from ray.serve.generated import serve_pb2, serve_pb2_grpc


async def send_request():
    async with grpc.aio.insecure_channel("localhost:9000") as channel:
        stub = serve_pb2_grpc.PredictAPIsServiceStub(channel)
        response = await stub.Predict(
            serve_pb2.PredictRequest(
                input={"ORANGE": bytes("10", "utf-8"), "APPLE": bytes("3", "utf-8")}
            )
        )
    return response


async def main():
    resp = await send_request()
    # The prediction is a packed 4-byte float; unpack returns a 1-tuple.
    print(struct.unpack("f", resp.prediction))


# On Python < 3.7, use asyncio.get_event_loop().run_until_complete(main()) instead.
asyncio.run(main())
Note
input is a dictionary of type map<string, bytes>, following the schema described above.
User input data must be serialized to bytes before it is placed in input.
The response is also of type bytes, which means your code is responsible for serializing the output into bytes.
By default, the gRPC port is 9000. You can change it by passing a port number when calling the bind function of DefaultgRPCDriver.
If the serialization/deserialization cost is high and unnecessary, you can bring your own schema instead. See the Bring your own schema section.
The scaling configuration for your business code is the same as in the non-gRPC case: set the scaling or autoscaling config inside the serve.deployment decorator.
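The byte handling described in the note can be wrapped in two small helpers. This is a minimal sketch, assuming amounts are sent as UTF-8 digit strings and the price comes back as a single packed 32-bit float, as in the example above. The names encode_amounts and decode_price are hypothetical and not part of Serve.

```python
import struct
from typing import Dict


def encode_amounts(amounts: Dict[str, int]) -> Dict[str, bytes]:
    """Serialize each amount to UTF-8 digits, matching map<string, bytes>."""
    return {fruit: str(amount).encode("utf-8") for fruit, amount in amounts.items()}


def decode_price(prediction: bytes, fmt: str = "f") -> float:
    """Unpack a single float; fmt must match the format the server packed with."""
    (price,) = struct.unpack(fmt, prediction)
    return price


request_input = encode_amounts({"ORANGE": 10, "APPLE": 3})
print(request_input)  # {'ORANGE': b'10', 'APPLE': b'3'}

# Simulate the server's reply for a total of 29.0 (no gRPC call is made here).
fake_prediction = struct.pack("f", 29.0)
print(decode_price(fake_prediction))  # 29.0
```

The "f" format uses the machine's native byte order. If the client and server may run on machines with different byte orders, using an explicit format such as "<f" on both sides is one way to keep them in agreement.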
Client schema code generation#
You can use the client by importing it from the ray Python package. Alternatively, you can copy Serve’s protobuf file and generate the gRPC client yourself.
Install the gRPC code generation tools
pip install grpcio-tools
Generate gRPC code based on the schema
python -m grpc_tools.protoc --proto_path=src/ray/protobuf/ --python_out=. --grpc_python_out=. src/ray/protobuf/serve.proto
After the two steps above, you should have the serve_pb2.py and serve_pb2_grpc.py files generated.
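The generation step can also be scripted. The sketch below only assembles the same grpc_tools.protoc command shown above and runs it on request. build_protoc_command and generate are hypothetical helpers, and the sketch assumes grpcio-tools is installed and that the paths match your checkout.

```python
import subprocess
import sys
from typing import List


def build_protoc_command(
    proto_dir: str, proto_file: str, out_dir: str = "."
) -> List[str]:
    """Assemble the grpc_tools.protoc invocation used in the step above."""
    return [
        sys.executable,
        "-m",
        "grpc_tools.protoc",
        f"--proto_path={proto_dir}",
        f"--python_out={out_dir}",
        f"--grpc_python_out={out_dir}",
        proto_file,
    ]


def generate(command: List[str]) -> None:
    """Run protoc; raises CalledProcessError if generation fails."""
    subprocess.run(command, check=True)


command = build_protoc_command("src/ray/protobuf/", "src/ray/protobuf/serve.proto")
print(" ".join(command[1:]))

# Call generate(command) to actually write serve_pb2.py and serve_pb2_grpc.py.
```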
Bring your own schema#
If you have a customized schema to use, Serve also supports it!
Assume you have the following customized schema and have generated the corresponding gRPC code:
message PingRequest {
  bool no_reply = 1;
}

message PingReply {
}

message PingTimeoutRequest {}

message PingTimeoutReply {}

service TestService {
  rpc Ping(PingRequest) returns (PingReply);
  rpc PingTimeout(PingTimeoutRequest) returns (PingTimeoutReply);
}
After the code is generated, you can implement the business logic for the gRPC server by creating a subclass of the generated TestServiceServicer. You then need two extra steps to adopt your schema into Ray Serve:
Inherit from ray.serve.drivers.gRPCIngress in your implementation class.
Add the @serve.deployment(is_driver_deployment=True) decorator.
Server:
from ray import serve
from ray.serve.drivers import gRPCIngress

import test_service_pb2_grpc, test_service_pb2


@serve.deployment(is_driver_deployment=True)
class MyDriver(test_service_pb2_grpc.TestServiceServicer, gRPCIngress):
    def __init__(self):
        super().__init__()

    async def Ping(self, request, context):
        # play with your dag and then reply
        return test_service_pb2.PingReply()


my_deployment = MyDriver.bind()
serve.run(my_deployment)
Client: You can use the generated client code directly to call the service.
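import asyncio

import grpc
import test_service_pb2_grpc, test_service_pb2


async def main():
    async with grpc.aio.insecure_channel("localhost:9000") as channel:
        stub = test_service_pb2_grpc.TestServiceStub(channel)
        # Calls made through a grpc.aio channel must be awaited.
        response = await stub.Ping(test_service_pb2.PingRequest())
        print(response)


asyncio.run(main())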
Note
is_driver_deployment (experimental flag) is needed to mark the class as a driver. Serve makes sure the driver deployment gets one replica per node.
gRPCIngress is used for starting a gRPC server. Your driver class needs to inherit from it.