Experimental Direct Ingress#

In the 2.1, Serve provides an alpha version of gRPC ingress.

With RPC protocol, You will get:

  • Standardized inference request/response schema during client and serve.

  • High performant endpoint than HTTP protocol.

In this section, you will learn how to

  • use Serve’s built-in gRPC schema to receive client traffic

  • bring your own gRPC schema into your Serve application

Use Serve’s Schema#

Serve provides a simple gRPC schema to machine learning inference workload. It is designed to be kept simple, and you are encouraged to adapt it for your own need.

message PredictRequest {
  map<string, bytes> input = 2;
}

message PredictResponse {
  bytes prediction = 1;
}

service PredictAPIsService {
  rpc Predict(PredictRequest) returns (PredictResponse);
}

Take a look at the following code samples for using DefaultgRPCDriver in Ray Serve.

To implement the Serve, your class needs to inherit ray.serve.drivers.DefaultgRPCDriver.

import ray
from ray import serve
from ray.serve.drivers import DefaultgRPCDriver
from ray.serve.handle import RayServeDeploymentHandle
from ray.serve.deployment_graph import InputNode
from typing import Dict
import struct


@serve.deployment
class FruitMarket:
    def __init__(
        self,
        orange_stand: RayServeDeploymentHandle,
        apple_stand: RayServeDeploymentHandle,
    ):
        self.directory = {
            "ORANGE": orange_stand,
            "APPLE": apple_stand,
        }

    async def check_price(self, inputs: Dict[str, bytes]) -> float:
        costs = 0
        for fruit, amount in inputs.items():
            if fruit not in self.directory:
                return
            fruit_stand = self.directory[fruit]
            ref: ray.ObjectRef = await fruit_stand.remote(int(amount))
            result = await ref
            costs += result
        return bytearray(struct.pack("f", costs))


@serve.deployment
class OrangeStand:
    def __init__(self):
        self.price = 2.0

    def __call__(self, num_oranges: int):
        return num_oranges * self.price


@serve.deployment
class AppleStand:
    def __init__(self):
        self.price = 3.0

    def __call__(self, num_oranges: int):
        return num_oranges * self.price


with InputNode() as input:
    orange_stand = OrangeStand.bind()
    apple_stand = AppleStand.bind()
    fruit_market = FruitMarket.bind(orange_stand, apple_stand)
    my_deployment = DefaultgRPCDriver.bind(fruit_market.check_price.bind(input))

serve.run(my_deployment)

Client: You can use Serve’s built-in gRPC client to send query to the model.

import grpc
from ray.serve.generated import serve_pb2, serve_pb2_grpc
import asyncio
import struct


async def send_request():
    async with grpc.aio.insecure_channel("localhost:9000") as channel:
        stub = serve_pb2_grpc.PredictAPIsServiceStub(channel)
        response = await stub.Predict(
            serve_pb2.PredictRequest(
                input={"ORANGE": bytes("10", "utf-8"), "APPLE": bytes("3", "utf-8")}
            )
        )
    return response


async def main():
    resp = await send_request()
    print(struct.unpack("f", resp.prediction))


# for python>=3.7, please use asyncio.run(main())
asyncio.get_event_loop().run_until_complete(main())

Note

  • input is a dictionary of map<string, bytes> following the schema described above.

  • The user input data needs to be serialized to bytes type and fed into the input.

  • The response will be under bytes type, which means the user code is responsible for serializing the output into bytes.

  • By default, the gRPC port is 9000. You can change it by passing port number when calling DefaultgRPCDriver bind function.

  • If the serialization/deserialization cost is huge and unnecessary, you can also bring your own schema to use! Checkout Bring your own schema section!

  • There is no difference of scaling config for your business code in gRPC case, you can set the config scaling/autoscaling config inside the serve.deployment decorator.

Client schema code generation#

You can use the client either by importing it from the ray Python package. Alternatively, you can just copy Serve’s protobuf file to generate the gRPC client.

  • Install the gRPC code generation tools

pip install grpcio-tools
  • Generate gRPC code based on the schema

python -m grpc_tools.protoc --proto_path=src/ray/protobuf/ --python_out=. --grpc_python_out=. src/ray/protobuf/serve.proto

After the two steps above, you should have serve_pb2.py and serve_pb2_grpc.py files generated.

Bring your own schema#

If you have a customized schema to use, Serve also supports it!

Assume you have the following customized schema and have generated the corresponding gRPC code:

message PingRequest {
  bool no_reply = 1;
}
message PingReply {
}

message PingTimeoutRequest {}
message PingTimeoutReply {}

service TestService {
  rpc Ping(PingRequest) returns (PingReply);
  rpc PingTimeout(PingTimeoutRequest) returns (PingTimeoutReply);
}

After the code is generated, you can implement the business logic for gRPC server by creating a subclass of the generated TestServiceServicer, and then you just need two extra steps to adopt your schema into Ray Serve.

  • Inherit ray.serve.drivers.gRPCIngress in your implementation class.

  • Add the @serve.deployment(is_driver_deployment=True) decorator.

Server:

from ray import serve
from ray.serve.drivers import gRPCIngress
import test_service_pb2_grpc, test_service_pb2


@serve.deployment(is_driver_deployment=True)
class MyDriver(test_service_pb2_grpc.TestServiceServicer, gRPCIngress):
    def __init__(self):
        super().__init__()

    async def Ping(self, request, context):
        # play with your dag and then reply
        return test_service_pb2.PingReply()


my_deployment = MyDriver.bind()

serve.run(my_deployment)

Client: You can directly use the client code to play it!

import grpc
import test_service_pb2_grpc, test_service_pb2

channel = grpc.aio.insecure_channel("localhost:9000")
stub = test_service_pb2_grpc.TestServiceStub(channel)
response = stub.Ping(test_service_pb2.PingRequest())

Note

  • is_driver_deployment (experimental flag) is needed to mark the class as driver, serve will make sure the driver class deployment gets deployed one replica per node.

  • gRPCIngress is used for starting a gRPC server. Your driver class needs to inherit from it.