Java Tutorial

To use Ray Serve from Java, add the following dependency to your pom.xml:

<dependency>
  <groupId>io.ray</groupId>
  <artifactId>ray-serve</artifactId>
  <version>${ray.version}</version>
  <scope>provided</scope>
</dependency>

NOTE: When Ray is installed through Python (for example, with pip), the Ray Serve Java jar is included in the local installation. The provided scope lets Java code that uses Ray Serve compile against that jar without bundling it, so it does not cause version conflicts when deployed on the cluster.

Example Model

Our example use case is derived from the production workflow of a financial application. The application needs to compute the best strategy for interacting with different banks for a single task.

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;

public class Strategy {

  public List<String> calc(Long time, Map<String, List<List<String>>> banksAndIndicators) {
    List<String> results = new ArrayList<>();
    for (Entry<String, List<List<String>>> e : banksAndIndicators.entrySet()) {
      String bank = e.getKey();
      for (List<String> indicators : e.getValue()) {
        results.addAll(calcBankIndicators(time, bank, indicators));
      }
    }
    return results;
  }

  public List<String> calcBankIndicators(Long time, String bank, List<String> indicators) {
    List<String> results = new ArrayList<>();
    for (String indicator : indicators) {
      results.add(calcIndicator(time, bank, indicator));
    }
    return results;
  }

  public String calcIndicator(Long time, String bank, String indicator) {
    // do bank data calculation
    return bank + "-" + indicator + "-" + time; // Demo;
  }
}

The Strategy class calculates the indicators of a number of banks.

  • The calc method is the entry point of the calculation. Its parameters are the time interval of the calculation and a map from each bank to its lists of indicators. The calc method contains two nested for loops that traverse each indicator list of each bank and call the calcBankIndicators method to calculate the indicators of that bank.

  • The calcBankIndicators method adds a third loop, which traverses each indicator and calls the calcIndicator method to calculate that specific indicator of the bank.

  • The calcIndicator method holds the actual calculation logic for a given bank, time interval, and indicator.
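Because each calcIndicator call depends only on its own (time, bank, indicator) arguments, the three nested loops boil down to a flat list of independent calls. The sketch below makes that explicit with Java streams; IndicatorTask and TaskFlattener are hypothetical names used only for illustration, not part of the tutorial's code.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

// Hypothetical value type: one (bank, indicator) pair = one calcIndicator call.
final class IndicatorTask {
  final String bank;
  final String indicator;

  IndicatorTask(String bank, String indicator) {
    this.bank = bank;
    this.indicator = indicator;
  }

  @Override
  public String toString() {
    return bank + "/" + indicator;
  }
}

final class TaskFlattener {
  // Walks the same bank -> indicator lists -> indicator nesting as Strategy.calc,
  // but records one task per calcIndicator call instead of computing it.
  static List<IndicatorTask> flatten(Map<String, List<List<String>>> banksAndIndicators) {
    return banksAndIndicators.entrySet().stream()
        .flatMap(e -> e.getValue().stream()
            .flatMap(List::stream)
            .map(indicator -> new IndicatorTask(e.getKey(), indicator)))
        .collect(Collectors.toList());
  }

  public static void main(String[] args) {
    Map<String, List<List<String>>> input = new LinkedHashMap<>();
    input.put("demo_bank_1", Arrays.asList(Arrays.asList("demo_indicator_1", "demo_indicator_2")));
    input.put("demo_bank_2",
        Arrays.asList(Arrays.asList("demo_indicator_1"), Arrays.asList("demo_indicator_2")));
    // demo_bank_1 has one list of two indicators, demo_bank_2 has two lists of one.
    System.out.println(flatten(input).size()); // prints 4
  }
}
```

The total number of calcIndicator calls is simply the sum of all indicator list sizes, and none of them depends on another, which is what makes the calculation a good fit for parallel execution.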

This is the code that uses the Strategy class:

import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class StrategyCalc {

  public static void main(String[] args) {
    long time = System.currentTimeMillis();
    String bank1 = "demo_bank_1";
    String bank2 = "demo_bank_2";
    String indicator1 = "demo_indicator_1";
    String indicator2 = "demo_indicator_2";
    Map<String, List<List<String>>> banksAndIndicators = new HashMap<>();
    banksAndIndicators.put(bank1, Arrays.asList(Arrays.asList(indicator1, indicator2)));
    banksAndIndicators.put(
        bank2, Arrays.asList(Arrays.asList(indicator1), Arrays.asList(indicator2)));

    Strategy strategy = new Strategy();
    List<String> results = strategy.calc(time, banksAndIndicators);

    System.out.println(results);
  }
}

As the number of banks and indicators grows, the three nested for loops slow down the calculation. Even if a thread pool is used to compute the indicators in parallel, a single machine may become a performance bottleneck. Moreover, the Strategy object cannot be reused as a resident service.
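For comparison, the thread-pool approach mentioned above looks roughly like this on a single machine. It is a minimal sketch built on java.util.concurrent; ThreadPoolStrategy and its threads parameter are hypothetical, and calcIndicator repeats the demo logic from Strategy. Every task still runs inside one JVM, so the speedup is capped by that machine's cores.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

final class ThreadPoolStrategy {

  // Same demo logic as Strategy.calcIndicator.
  static String calcIndicator(Long time, String bank, String indicator) {
    return bank + "-" + indicator + "-" + time;
  }

  // Runs every calcIndicator call on a local thread pool.
  static List<String> calc(Long time, Map<String, List<List<String>>> banksAndIndicators,
      int threads) throws InterruptedException, ExecutionException {
    List<Callable<String>> tasks = new ArrayList<>();
    for (Map.Entry<String, List<List<String>>> e : banksAndIndicators.entrySet()) {
      String bank = e.getKey();
      for (List<String> indicators : e.getValue()) {
        for (String indicator : indicators) {
          tasks.add(() -> calcIndicator(time, bank, indicator));
        }
      }
    }
    ExecutorService pool = Executors.newFixedThreadPool(threads);
    try {
      List<String> results = new ArrayList<>();
      // invokeAll waits for every task and returns futures in submission order.
      for (Future<String> f : pool.invokeAll(tasks)) {
        results.add(f.get());
      }
      return results;
    } finally {
      pool.shutdown(); // let the worker threads exit
    }
  }
}
```

This keeps the result order of the serial version, but adding threads beyond the machine's core count brings little benefit, which is the single-machine limit the paragraph above refers to.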

Converting to a Ray Serve Deployment

With Ray Serve, the core computing logic of Strategy can be deployed as a scalable, distributed computing service.

First, we extract the indicator calculation for each bank into a separate StrategyOnRayServe class:

public class StrategyOnRayServe {

  public String calcIndicator(Long time, String bank, String indicator) {
    // do bank data calculation
    return bank + "-" + indicator + "-" + time; // Demo;
  }
}

Next, we start the Ray Serve runtime and deploy StrategyOnRayServe as a deployment. This method, like the ones that follow, belongs to a StrategyCalcOnRayServe class:

  public void deploy() {
    Serve.start(null);

    Application deployment =
        Serve.deployment()
            .setName("strategy")
            .setDeploymentDef(StrategyOnRayServe.class.getName())
            .setNumReplicas(4)
            .bind();
    Serve.run(deployment);
  }

Serve.deployment() creates a deployment named “strategy” whose replicas run StrategyOnRayServe, and bind() turns it into an Application. After Serve.run executes, the “strategy” deployment runs in the Ray Serve instance with four replicas, and we can access it for distributed parallel computing.
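Conceptually, setNumReplicas(4) keeps four copies of StrategyOnRayServe running, and each request sent through a handle is served by one of them. The plain-Java sketch below imitates that locally with a simple round-robin router. ReplicaPool and CountingStrategy are hypothetical names; real Ray Serve replicas run as separate Ray actors, and Ray Serve's router uses its own load-balancing policy, which is not necessarily round-robin.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

// Hypothetical local stand-in for a deployment with N replicas.
final class ReplicaPool<T> {
  private final List<T> replicas = new ArrayList<>();
  private final AtomicInteger next = new AtomicInteger();

  ReplicaPool(int numReplicas, Supplier<T> factory) {
    for (int i = 0; i < numReplicas; i++) {
      replicas.add(factory.get()); // one independent instance per replica
    }
  }

  // Hands out replicas in turn; floorMod keeps the index valid after int overflow.
  T pick() {
    return replicas.get(Math.floorMod(next.getAndIncrement(), replicas.size()));
  }

  List<T> replicas() {
    return Collections.unmodifiableList(replicas);
  }
}

// Same demo logic as StrategyOnRayServe, plus a call counter for illustration.
// The counter is not thread-safe; this demo sends requests from one thread.
final class CountingStrategy {
  int calls;

  String calcIndicator(Long time, String bank, String indicator) {
    calls++;
    return bank + "-" + indicator + "-" + time;
  }
}
```

For example, creating `new ReplicaPool<>(4, CountingStrategy::new)` and sending eight requests through `pick()` leaves each of the four instances with two calls. Because StrategyOnRayServe keeps no state between calls, it does not matter which replica serves a given request.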

Testing the Ray Serve Deployment

Now we can test the “strategy” deployment from inside Ray through a deployment handle:

  public List<String> calc(Long time, Map<String, List<List<String>>> banksAndIndicators) {
    Deployment deployment = Serve.getDeployment("strategy");

    List<String> results = new ArrayList<>();
    for (Entry<String, List<List<String>>> e : banksAndIndicators.entrySet()) {
      String bank = e.getKey();
      for (List<String> indicators : e.getValue()) {
        for (String indicator : indicators) {
          results.add(
              (String)
                  deployment
                      .getHandle()
                      .method("calcIndicator")
                      .remote(time, bank, indicator)
                      .result());
        }
      }
    }
    return results;
  }

So far, the indicators are still computed one at a time: calc waits for each result() before sending the next request to Ray. We can make the calls concurrent by sending every request with remote() first and collecting the results afterwards, which not only improves calculation efficiency but also removes the single-machine bottleneck.

  public List<String> parallelCalc(Long time, Map<String, List<List<String>>> banksAndIndicators) {
    Deployment deployment = Serve.getDeployment("strategy");

    List<String> results = new ArrayList<>();
    List<DeploymentResponse> responses = new ArrayList<>();
    for (Entry<String, List<List<String>>> e : banksAndIndicators.entrySet()) {
      String bank = e.getKey();
      for (List<String> indicators : e.getValue()) {
        for (String indicator : indicators) {
          responses.add(
              deployment.getHandle().method("calcIndicator").remote(time, bank, indicator));
        }
      }
    }
    for (DeploymentResponse response : responses) {
      results.add((String) response.result());
    }
    return results;
  }
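The speedup in parallelCalc comes from calling remote() for every indicator before calling result() on any of them. The plain-Java sketch below imitates that pattern with CompletableFuture so it can run without Ray. FakeHandle and FanOutDemo are hypothetical names, FakeHandle is not part of the Ray Serve API, and the per-call latency is an arbitrary simulated delay.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical stand-in for a deployment handle: remote() returns at once,
// and join() on the returned future blocks, much like result() does.
final class FakeHandle implements AutoCloseable {
  private final ExecutorService replicas;
  private final long latencyMillis;

  FakeHandle(int numReplicas, long latencyMillis) {
    this.replicas = Executors.newFixedThreadPool(numReplicas);
    this.latencyMillis = latencyMillis;
  }

  CompletableFuture<String> remote(long time, String bank, String indicator) {
    return CompletableFuture.supplyAsync(() -> {
      try {
        Thread.sleep(latencyMillis); // simulated network + compute time
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
      }
      return bank + "-" + indicator + "-" + time;
    }, replicas);
  }

  @Override
  public void close() {
    replicas.shutdown();
  }
}

final class FanOutDemo {
  // Like calc(): waits for each response before sending the next request.
  static List<String> serial(FakeHandle h, long time, List<String> indicators) {
    List<String> results = new ArrayList<>();
    for (String ind : indicators) {
      results.add(h.remote(time, "demo_bank", ind).join());
    }
    return results;
  }

  // Like parallelCalc(): sends every request first, then collects in order.
  static List<String> parallel(FakeHandle h, long time, List<String> indicators) {
    List<CompletableFuture<String>> pending = new ArrayList<>();
    for (String ind : indicators) {
      pending.add(h.remote(time, "demo_bank", ind));
    }
    List<String> results = new ArrayList<>();
    for (CompletableFuture<String> f : pending) {
      results.add(f.join());
    }
    return results;
  }
}
```

With four simulated replicas and four indicators, serial() takes roughly four latencies while parallel() takes roughly one, and both return results in the same order because responses are collected in submission order.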

Now we can use StrategyCalcOnRayServe in a main method, just like the original example:

  public static void main(String[] args) {

    long time = System.currentTimeMillis();
    String bank1 = "demo_bank_1";
    String bank2 = "demo_bank_2";
    String indicator1 = "demo_indicator_1";
    String indicator2 = "demo_indicator_2";
    Map<String, List<List<String>>> banksAndIndicators = new HashMap<>();
    banksAndIndicators.put(bank1, Arrays.asList(Arrays.asList(indicator1, indicator2)));
    banksAndIndicators.put(
        bank2, Arrays.asList(Arrays.asList(indicator1), Arrays.asList(indicator2)));

    StrategyCalcOnRayServe strategy = new StrategyCalcOnRayServe();
    strategy.deploy();
    List<String> results = strategy.parallelCalc(time, banksAndIndicators);

    System.out.println(results);
  }

Calling Ray Serve Deployment with HTTP

Another way to test or call a deployment is through HTTP requests. However, Java deployments currently have two limitations:

  • HTTP requests can only be handled by the call method of the user class.

  • The call method can take only one input parameter, and both that parameter and the return value must be of type String.

If we want to call the strategy calculation via HTTP, the deployment class can be rewritten like this:

import com.google.gson.Gson;

public class HttpStrategyOnRayServe {

  static class BankIndicator {
    long time;
    String bank;
    String indicator;
  }

  private Gson gson = new Gson();

  public String call(String dataJson) {
    BankIndicator data = gson.fromJson(dataJson, BankIndicator.class);
    // do bank data calculation
    return data.bank + "-" + data.indicator + "-" + data.time; // Demo;
  }
}

After deploying HttpStrategyOnRayServe, we can access it with curl:

curl -d '{"time":1641038674, "bank":"test_bank", "indicator":"test_indicator"}' http://127.0.0.1:8000/http-strategy

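It can also be called from Java code with an HTTP client. The following method, part of an HttpStrategyCalcOnRayServe class, uses the Apache HttpClient Fluent API (Request.post):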

  private Gson gson = new Gson();

  public String httpCalc(Long time, String bank, String indicator) {
    Map<String, Object> data = new HashMap<>();
    data.put("time", time);
    data.put("bank", bank);
    data.put("indicator", indicator);

    String result;
    try {
      result =
          Request.post("http://127.0.0.1:8000/http-strategy")
              .bodyString(gson.toJson(data), null)
              .execute()
              .returnContent()
              .asString();
    } catch (IOException e) {
      result = "error";
    }

    return result;
  }
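httpCalc relies on the Apache HttpClient Fluent API. On Java 11 or later, the JDK's built-in java.net.http.HttpClient can send the same JSON POST without an extra dependency. In the sketch below, JdkHttpCalc is a hypothetical name, and a throwaway com.sun.net.httpserver.HttpServer that simply echoes the request body stands in for the Ray Serve endpoint so the example can run on its own; the /http-strategy path is taken from httpCalc.

```java
import com.sun.net.httpserver.HttpServer;
import java.io.IOException;
import java.io.OutputStream;
import java.net.InetSocketAddress;
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

final class JdkHttpCalc {
  // Plain HTTP/1.1 client; skips the HTTP/2 upgrade attempt on http:// URLs.
  private static final HttpClient CLIENT =
      HttpClient.newBuilder().version(HttpClient.Version.HTTP_1_1).build();

  // POSTs a JSON body (for example, one produced by Gson) and returns the reply.
  static String postJson(String url, String json) throws IOException, InterruptedException {
    HttpRequest request = HttpRequest.newBuilder(URI.create(url))
        .header("Content-Type", "application/json")
        .POST(HttpRequest.BodyPublishers.ofString(json))
        .build();
    HttpResponse<String> response = CLIENT.send(request, HttpResponse.BodyHandlers.ofString());
    if (response.statusCode() != 200) {
      throw new IOException("HTTP " + response.statusCode());
    }
    return response.body();
  }

  // Local stub that echoes the request body. It only stands in for the real
  // deployment so this sketch runs without a Ray cluster.
  static HttpServer startEchoServer() throws IOException {
    HttpServer server = HttpServer.create(new InetSocketAddress("127.0.0.1", 0), 0);
    server.createContext("/http-strategy", exchange -> {
      byte[] body = exchange.getRequestBody().readAllBytes();
      // -1 tells the server there is no response body.
      exchange.sendResponseHeaders(200, body.length == 0 ? -1 : body.length);
      try (OutputStream out = exchange.getResponseBody()) {
        out.write(body);
      }
    });
    server.start();
    return server;
  }
}
```

Against a real deployment, postJson would be called with the Serve URL (such as http://127.0.0.1:8000/http-strategy) and the JSON string built by Gson, and the reply would be the String returned by the call method.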

The strategy calculation that calls the deployment over HTTP looks like this:

  public List<String> calc(Long time, Map<String, List<List<String>>> banksAndIndicators) {

    List<String> results = new ArrayList<>();
    for (Entry<String, List<List<String>>> e : banksAndIndicators.entrySet()) {
      String bank = e.getKey();
      for (List<String> indicators : e.getValue()) {
        for (String indicator : indicators) {
          results.add(httpCalc(time, bank, indicator));
        }
      }
    }
    return results;
  }

This code can also be rewritten to support concurrency:

  private ExecutorService executorService = Executors.newFixedThreadPool(4);

  public List<String> parallelCalc(Long time, Map<String, List<List<String>>> banksAndIndicators) {

    List<String> results = new ArrayList<>();
    List<Future<String>> futures = new ArrayList<>();
    for (Entry<String, List<List<String>>> e : banksAndIndicators.entrySet()) {
      String bank = e.getKey();
      for (List<String> indicators : e.getValue()) {
        for (String indicator : indicators) {
          futures.add(executorService.submit(() -> httpCalc(time, bank, indicator)));
        }
      }
    }
    for (Future<String> future : futures) {
      try {
        results.add(future.get());
      } catch (InterruptedException | ExecutionException e1) {
        results.add("error");
      }
    }
    return results;
  }
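One practical detail: Executors.newFixedThreadPool creates non-daemon worker threads that stay alive while idle, so a JVM that still holds a live pool does not exit when main returns. The sketch below follows the shutdown sequence recommended in the ExecutorService Javadoc; PoolLifecycle is a hypothetical name and the timeout is the caller's choice.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;

final class PoolLifecycle {
  // Stops accepting new work, waits for queued tasks, then forces shutdown.
  static boolean shutdownGracefully(ExecutorService pool, long timeoutSeconds) {
    pool.shutdown();
    try {
      if (pool.awaitTermination(timeoutSeconds, TimeUnit.SECONDS)) {
        return true;
      }
      pool.shutdownNow(); // interrupt tasks that are still running
      return pool.awaitTermination(timeoutSeconds, TimeUnit.SECONDS);
    } catch (InterruptedException e) {
      pool.shutdownNow();
      Thread.currentThread().interrupt(); // preserve the interrupt status
      return false;
    }
  }
}
```

In HttpStrategyCalcOnRayServe, this would be applied to executorService once parallelCalc is no longer needed.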

Finally, here is the complete usage of HttpStrategyCalcOnRayServe:

  public static void main(String[] args) {

    long time = System.currentTimeMillis();
    String bank1 = "demo_bank_1";
    String bank2 = "demo_bank_2";
    String indicator1 = "demo_indicator_1";
    String indicator2 = "demo_indicator_2";
    Map<String, List<List<String>>> banksAndIndicators = new HashMap<>();
    banksAndIndicators.put(bank1, Arrays.asList(Arrays.asList(indicator1, indicator2)));
    banksAndIndicators.put(
        bank2, Arrays.asList(Arrays.asList(indicator1), Arrays.asList(indicator2)));

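    HttpStrategyCalcOnRayServe strategy = new HttpStrategyCalcOnRayServe();
    strategy.deploy();
    List<String> results = strategy.parallelCalc(time, banksAndIndicators);
    // Release the pool's non-daemon worker threads so the JVM can exit.
    strategy.executorService.shutdown();

    System.out.println(results);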
  }