Serve a Java App#
To use Java Ray Serve, you need the following dependency in your pom.xml.
<dependency>
  <groupId>io.ray</groupId>
  <artifactId>ray-serve</artifactId>
  <version>${ray.version}</version>
  <scope>provided</scope>
</dependency>
NOTE: After installing Ray with Python, the local environment includes the Java jar of Ray Serve. The provided scope ensures that you can compile the Java code using Ray Serve without version conflicts when you deploy on the cluster.
Example model#
This example use case is a production workflow of a financial application. The application needs to compute the best strategy to interact with different banks for a single task.
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;

public class Strategy {

  public List<String> calc(Long time, Map<String, List<List<String>>> banksAndIndicators) {
    List<String> results = new ArrayList<>();
    for (Entry<String, List<List<String>>> e : banksAndIndicators.entrySet()) {
      String bank = e.getKey();
      for (List<String> indicators : e.getValue()) {
        results.addAll(calcBankIndicators(time, bank, indicators));
      }
    }
    return results;
  }

  public List<String> calcBankIndicators(Long time, String bank, List<String> indicators) {
    List<String> results = new ArrayList<>();
    for (String indicator : indicators) {
      results.add(calcIndicator(time, bank, indicator));
    }
    return results;
  }

  public String calcIndicator(Long time, String bank, String indicator) {
    // do bank data calculation
    return bank + "-" + indicator + "-" + time; // Demo;
  }
}
This example uses the Strategy class to calculate the indicators of a number of banks.
The calc method is the entry point of the calculation. Its input parameters are the time interval of the calculation and a map from each bank to its lists of indicators. The calc method contains a two-tier for loop that traverses each indicator list of each bank and calls the calcBankIndicators method to calculate the indicators of the specified bank.
The calcBankIndicators method contains another for loop, which traverses each indicator and calls the calcIndicator method to calculate that indicator for the bank.
The calcIndicator method holds the specific calculation logic, based on the bank, the specified time interval, and the indicator.
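The traversal described above flattens the map of banks into one call to calcIndicator per indicator. As a minimal sketch of that same flattening, written with Java streams instead of nested loops, the following may help. The class name StreamStrategy is hypothetical and is not part of the original example; it only reproduces the demo calculation.

```java
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

// Hypothetical class name; an alternative formulation of Strategy.calc for illustration only.
class StreamStrategy {

  // Same demo result as Strategy.calcIndicator.
  static String calcIndicator(long time, String bank, String indicator) {
    return bank + "-" + indicator + "-" + time;
  }

  // Flattens bank -> indicator lists -> indicator, preserving the map's iteration order.
  static List<String> calc(long time, Map<String, List<List<String>>> banksAndIndicators) {
    return banksAndIndicators.entrySet().stream()
        .flatMap(
            e ->
                e.getValue().stream()
                    .flatMap(List::stream)
                    .map(indicator -> calcIndicator(time, e.getKey(), indicator)))
        .collect(Collectors.toList());
  }
}
```

The order of the results follows the iteration order of the map that is passed in, just as it does for the loop-based version.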
This code uses the Strategy class:
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class StrategyCalc {

  public static void main(String[] args) {
    long time = System.currentTimeMillis();
    String bank1 = "demo_bank_1";
    String bank2 = "demo_bank_2";
    String indicator1 = "demo_indicator_1";
    String indicator2 = "demo_indicator_2";
    Map<String, List<List<String>>> banksAndIndicators = new HashMap<>();
    banksAndIndicators.put(bank1, Arrays.asList(Arrays.asList(indicator1, indicator2)));
    banksAndIndicators.put(
        bank2, Arrays.asList(Arrays.asList(indicator1), Arrays.asList(indicator2)));

    Strategy strategy = new Strategy();
    List<String> results = strategy.calc(time, banksAndIndicators);

    System.out.println(results);
  }
}
When the number of banks and indicators grows, the three-tier for loop slows down the calculation. Even if you use a thread pool to calculate each indicator in parallel, you may hit the performance limit of a single machine. Moreover, you can't use this Strategy object as a resident service.
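For comparison, the thread pool approach mentioned above could look roughly like the following sketch. It is an illustration under stated assumptions, not part of the original example: the class name ThreadPoolStrategy and the threads parameter are hypothetical, and the calculation is the same demo string concatenation. All of the work still runs inside one JVM, which is the limitation the rest of this page addresses.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical class name; a single-machine variant for illustration only.
class ThreadPoolStrategy {

  // Same demo result as Strategy.calcIndicator.
  static String calcIndicator(long time, String bank, String indicator) {
    return bank + "-" + indicator + "-" + time;
  }

  static List<String> calc(
      long time, Map<String, List<List<String>>> banksAndIndicators, int threads) {
    ExecutorService pool = Executors.newFixedThreadPool(threads);
    try {
      // Submit every indicator calculation first so the tasks can run in parallel.
      List<Future<String>> futures = new ArrayList<>();
      for (Map.Entry<String, List<List<String>>> e : banksAndIndicators.entrySet()) {
        String bank = e.getKey();
        for (List<String> indicators : e.getValue()) {
          for (String indicator : indicators) {
            futures.add(pool.submit(() -> calcIndicator(time, bank, indicator)));
          }
        }
      }
      // Collect the results in submission order.
      List<String> results = new ArrayList<>();
      for (Future<String> future : futures) {
        results.add(future.get());
      }
      return results;
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException("Interrupted while waiting for results", e);
    } catch (ExecutionException e) {
      throw new IllegalStateException("Indicator calculation failed", e);
    } finally {
      // Release the worker threads once the batch is done.
      pool.shutdown();
    }
  }
}
```

However many threads you configure, the throughput of this version is bounded by the cores and memory of the one machine that runs it.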
Converting to a Ray Serve Deployment#
Through Ray Serve, you can deploy the core computing logic of Strategy as a scalable distributed computing service.
First, extract the indicator calculation of each institution into a separate StrategyOnRayServe class:
public class StrategyOnRayServe {

  public String calcIndicator(Long time, String bank, String indicator) {
    // do bank data calculation
    return bank + "-" + indicator + "-" + time; // Demo;
  }
}
Next, start the Ray Serve runtime and deploy StrategyOnRayServe as a deployment.
public void deploy() {
  Serve.start(null);

  Application deployment =
      Serve.deployment()
          .setName("strategy")
          .setDeploymentDef(StrategyOnRayServe.class.getName())
          .setNumReplicas(4)
          .bind();
  Serve.run(deployment);
}
Serve.deployment() builds a deployment named strategy, and bind() turns it into an Application. After Serve.run executes, the Ray Serve instance deploys this strategy deployment with four replicas, and you can access it for distributed parallel computing.
Testing the Ray Serve Deployment#
You can test the strategy deployment using RayServeHandle inside Ray:
public List<String> calc(Long time, Map<String, List<List<String>>> banksAndIndicators) {
  Deployment deployment = Serve.getDeployment("strategy");

  List<String> results = new ArrayList<>();
  for (Entry<String, List<List<String>>> e : banksAndIndicators.entrySet()) {
    String bank = e.getKey();
    for (List<String> indicators : e.getValue()) {
      for (String indicator : indicators) {
        results.add(
            (String)
                deployment
                    .getHandle()
                    .method("calcIndicator")
                    .remote(time, bank, indicator)
                    .result());
      }
    }
  }
  return results;
}
This code calls result() on each response before sending the next request, so the indicator calculations run serially even though Ray executes them. You can instead send all the requests first and collect the results afterward, which makes the calculation concurrent across the replicas and removes the single-machine bottleneck:
public List<String> parallelCalc(Long time, Map<String, List<List<String>>> banksAndIndicators) {
  Deployment deployment = Serve.getDeployment("strategy");

  List<String> results = new ArrayList<>();
  List<DeploymentResponse> responses = new ArrayList<>();
  for (Entry<String, List<List<String>>> e : banksAndIndicators.entrySet()) {
    String bank = e.getKey();
    for (List<String> indicators : e.getValue()) {
      for (String indicator : indicators) {
        responses.add(
            deployment.getHandle().method("calcIndicator").remote(time, bank, indicator));
      }
    }
  }
  for (DeploymentResponse response : responses) {
    results.add((String) response.result());
  }
  return results;
}
You can use StrategyCalcOnRayServe like the example in the main method:
public static void main(String[] args) {
  long time = System.currentTimeMillis();
  String bank1 = "demo_bank_1";
  String bank2 = "demo_bank_2";
  String indicator1 = "demo_indicator_1";
  String indicator2 = "demo_indicator_2";
  Map<String, List<List<String>>> banksAndIndicators = new HashMap<>();
  banksAndIndicators.put(bank1, Arrays.asList(Arrays.asList(indicator1, indicator2)));
  banksAndIndicators.put(
      bank2, Arrays.asList(Arrays.asList(indicator1), Arrays.asList(indicator2)));

  StrategyCalcOnRayServe strategy = new StrategyCalcOnRayServe();
  strategy.deploy();
  List<String> results = strategy.parallelCalc(time, banksAndIndicators);

  System.out.println(results);
}
Calling Ray Serve Deployment with HTTP#
Another way to test or call a deployment is through an HTTP request. However, two limitations exist for Java deployments:
Only the call method of the user class can process HTTP requests.
The call method can have only one input parameter, and both the input parameter and the return value must be of type String.
If you want to call the strategy deployment with HTTP, then you can rewrite the class like this code:
import com.google.gson.Gson;

public class HttpStrategyOnRayServe {

  static class BankIndicator {
    long time;
    String bank;
    String indicator;
  }

  private Gson gson = new Gson();

  public String call(String dataJson) {
    BankIndicator data = gson.fromJson(dataJson, BankIndicator.class);
    // do bank data calculation
    return data.bank + "-" + data.indicator + "-" + data.time; // Demo;
  }
}
After deploying this deployment, you can access it with the curl command:
curl -d '{"time":1641038674, "bank":"test_bank", "indicator":"test_indicator"}' http://127.0.0.1:8000/strategy
You can also access it using an HTTP client in Java code:
private Gson gson = new Gson();

public String httpCalc(Long time, String bank, String indicator) {
  Map<String, Object> data = new HashMap<>();
  data.put("time", time);
  data.put("bank", bank);
  data.put("indicator", indicator);

  String result;
  try {
    result =
        Request.post("http://127.0.0.1:8000/http-strategy")
            .bodyString(gson.toJson(data), null)
            .execute()
            .returnContent()
            .asString();
  } catch (IOException e) {
    result = "error";
  }

  return result;
}
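If you prefer not to add an HTTP client dependency, the same request can be sketched with the JDK's built-in java.net.http.HttpClient, assuming Java 11 or later. The class name JdkHttpStrategyClient and its helper methods are hypothetical. The URL is copied from the httpCalc example above and must match the route your deployment is actually served under. The JSON is assembled by hand with only minimal escaping, which is an assumption that the bank and indicator names contain no control characters.

```java
import java.io.IOException;
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

// Hypothetical class name; a dependency-free alternative to the httpCalc example.
class JdkHttpStrategyClient {

  // Assumed endpoint, copied from the httpCalc example; adjust it to your deployment's route.
  static final String URL = "http://127.0.0.1:8000/http-strategy";

  // Minimal escaping: handles only backslashes and double quotes.
  static String escape(String s) {
    return s.replace("\\", "\\\\").replace("\"", "\\\"");
  }

  // Builds the JSON body whose fields match the BankIndicator class.
  static String toJson(long time, String bank, String indicator) {
    return "{\"time\":" + time
        + ",\"bank\":\"" + escape(bank)
        + "\",\"indicator\":\"" + escape(indicator) + "\"}";
  }

  // Builds the POST request without sending it.
  static HttpRequest buildRequest(long time, String bank, String indicator) {
    return HttpRequest.newBuilder(URI.create(URL))
        .POST(HttpRequest.BodyPublishers.ofString(toJson(time, bank, indicator)))
        .build();
  }

  // Sends the request and returns the response body, or "error" as in the original example.
  static String httpCalc(long time, String bank, String indicator) {
    try {
      HttpResponse<String> response =
          HttpClient.newHttpClient()
              .send(buildRequest(time, bank, indicator), HttpResponse.BodyHandlers.ofString());
      return response.body();
    } catch (IOException e) {
      return "error";
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      return "error";
    }
  }
}
```

Separating buildRequest from httpCalc makes it possible to inspect the request, for example its URI and body, without a running Serve instance.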
The following example runs the strategy calculation by accessing the deployment over HTTP:
public List<String> calc(Long time, Map<String, List<List<String>>> banksAndIndicators) {
  List<String> results = new ArrayList<>();
  for (Entry<String, List<List<String>>> e : banksAndIndicators.entrySet()) {
    String bank = e.getKey();
    for (List<String> indicators : e.getValue()) {
      for (String indicator : indicators) {
        results.add(httpCalc(time, bank, indicator));
      }
    }
  }
  return results;
}
You can also rewrite this code to support concurrency:
private ExecutorService executorService = Executors.newFixedThreadPool(4);

public List<String> parallelCalc(Long time, Map<String, List<List<String>>> banksAndIndicators) {
  List<String> results = new ArrayList<>();
  List<Future<String>> futures = new ArrayList<>();
  for (Entry<String, List<List<String>>> e : banksAndIndicators.entrySet()) {
    String bank = e.getKey();
    for (List<String> indicators : e.getValue()) {
      for (String indicator : indicators) {
        futures.add(executorService.submit(() -> httpCalc(time, bank, indicator)));
      }
    }
  }
  for (Future<String> future : futures) {
    try {
      results.add(future.get());
    } catch (InterruptedException | ExecutionException e1) {
      results.add("error");
    }
  }
  return results;
}
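The parallelCalc method above creates a fixed thread pool but never shuts it down. Threads created by Executors.newFixedThreadPool are non-daemon by default, so they can keep the JVM alive after main returns. A minimal sketch of a shutdown helper follows. The class name PoolShutdown and the method shutdownAndAwait are hypothetical, and where to call it (for example at the end of main) is left to your application.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;

// Hypothetical helper; not part of the original example.
class PoolShutdown {

  // Stops accepting new tasks, waits for running ones, and reports whether the pool terminated.
  static boolean shutdownAndAwait(ExecutorService pool, long timeoutSeconds) {
    pool.shutdown();
    try {
      if (!pool.awaitTermination(timeoutSeconds, TimeUnit.SECONDS)) {
        // Tasks are still running after the timeout, so interrupt them.
        pool.shutdownNow();
        return false;
      }
      return true;
    } catch (InterruptedException e) {
      pool.shutdownNow();
      Thread.currentThread().interrupt();
      return false;
    }
  }
}
```

For the class above, this could be called as PoolShutdown.shutdownAndAwait(executorService, 10) once all the results have been collected.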
Finally, the complete usage of HttpStrategyCalcOnRayServe is like this code:
public static void main(String[] args) {
  long time = System.currentTimeMillis();
  String bank1 = "demo_bank_1";
  String bank2 = "demo_bank_2";
  String indicator1 = "demo_indicator_1";
  String indicator2 = "demo_indicator_2";
  Map<String, List<List<String>>> banksAndIndicators = new HashMap<>();
  banksAndIndicators.put(bank1, Arrays.asList(Arrays.asList(indicator1, indicator2)));
  banksAndIndicators.put(
      bank2, Arrays.asList(Arrays.asList(indicator1), Arrays.asList(indicator2)));

  HttpStrategyCalcOnRayServe strategy = new HttpStrategyCalcOnRayServe();
  strategy.deploy();
  List<String> results = strategy.parallelCalc(time, banksAndIndicators);

  System.out.println(results);
}