A Gentle Introduction to Ray

Ray provides a simple, universal API for building distributed applications.

Ray accomplishes this mission by:

  1. Providing simple primitives for building and running distributed applications.

  2. Enabling end users to parallelize single-machine code, with little to no code changes.

  3. Including a large ecosystem of applications, libraries, and tools on top of Ray Core to enable complex applications.

Ray Core provides the simple primitives for application building.

On top of Ray Core are several libraries for solving problems in machine learning, including Tune for hyperparameter tuning and RLlib for reinforcement learning; both are introduced later in this tutorial.

There are also many community integrations with Ray, including Dask, MARS, Modin, Horovod, Hugging Face, Scikit-learn, and others. Check out the Ray documentation for the full list of Ray distributed libraries.

This tutorial will provide a tour of the core features of Ray.

Ray provides Python and Java APIs. To use Ray in Python, first install Ray with pip install ray. To use Ray in Java, first add the ray-api and ray-runtime dependencies to your project. You can then use Ray to parallelize your program.

Parallelizing Python/Java Functions with Ray Tasks

First, import ray and initialize the Ray runtime with ray.init(). Then decorate your function with @ray.remote to declare that you want to run this function remotely. Lastly, call that function with .remote() instead of calling it normally. This remote call yields a future, or ObjectRef, that you can then fetch with ray.get.

import ray
ray.init()

@ray.remote
def f(x):
    return x * x

futures = [f.remote(i) for i in range(4)]
print(ray.get(futures)) # [0, 1, 4, 9]

First, use Ray.init to initialize the Ray runtime. Then you can use Ray.task(...).remote() to convert any Java static method into a Ray task. The task will run asynchronously in a remote worker process. The remote call returns an ObjectRef, and you can then fetch the actual result with Ray.get.

import io.ray.api.ObjectRef;
import io.ray.api.Ray;
import java.util.ArrayList;
import java.util.List;

public class RayDemo {

  public static int square(int x) {
    return x * x;
  }

  public static void main(String[] args) {
    // Initialize the Ray runtime.
    Ray.init();
    List<ObjectRef<Integer>> objectRefList = new ArrayList<>();
    // Invoke the `square` method 4 times remotely as Ray tasks.
    // The tasks will run in parallel in the background.
    for (int i = 0; i < 4; i++) {
      objectRefList.add(Ray.task(RayDemo::square, i).remote());
    }
    // Get the actual results of the tasks.
    System.out.println(Ray.get(objectRefList));  // [0, 1, 4, 9]
  }
}

In the above code block we defined some Ray Tasks. While these are great for stateless operations, sometimes you need to maintain the state of your application. You can do that with Ray Actors.

Parallelizing Python/Java Classes with Ray Actors

Ray provides actors to allow you to parallelize an instance of a class in Python/Java. When you instantiate a class that is a Ray actor, Ray will start a remote instance of that class in the cluster. This actor can then execute remote method calls and maintain its own internal state.

import ray
ray.init() # Only call this once.

@ray.remote
class Counter(object):
    def __init__(self):
        self.n = 0

    def increment(self):
        self.n += 1

    def read(self):
        return self.n

counters = [Counter.remote() for i in range(4)]
[c.increment.remote() for c in counters]
futures = [c.read.remote() for c in counters]
print(ray.get(futures)) # [1, 1, 1, 1]

The same actor example in Java:

import io.ray.api.ActorHandle;
import io.ray.api.ObjectRef;
import io.ray.api.Ray;
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;

public class RayDemo {

  public static class Counter {

    private int value = 0;

    public void increment() {
      this.value += 1;
    }

    public int read() {
      return this.value;
    }
  }

  public static void main(String[] args) {
    // Initialize the Ray runtime.
    Ray.init();
    List<ActorHandle<Counter>> counters = new ArrayList<>();
    // Create 4 actors from the `Counter` class.
    // They will run in remote worker processes.
    for (int i = 0; i < 4; i++) {
      counters.add(Ray.actor(Counter::new).remote());
    }

    // Invoke the `increment` method on each actor.
    // This will send an actor task to each remote actor.
    for (ActorHandle<Counter> counter : counters) {
      counter.task(Counter::increment).remote();
    }
    // Invoke the `read` method on each actor, and print the results.
    List<ObjectRef<Integer>> objectRefList = counters.stream()
        .map(counter -> counter.task(Counter::read).remote())
        .collect(Collectors.toList());
    System.out.println(Ray.get(objectRefList));  // [1, 1, 1, 1]
  }
}
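
Because each actor maintains its own internal state, repeated calls on the same handle accumulate rather than starting from zero each time. Below is a minimal Python sketch reusing the Counter actor defined above:

# Minimal sketch reusing the Python `Counter` actor defined above.
# Method calls on a single actor handle execute one at a time on that
# actor, so its internal state accumulates across calls.
counter = Counter.remote()
for _ in range(5):
    counter.increment.remote()
print(ray.get(counter.read.remote()))  # 5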

An Overview of the Ray Libraries

Ray has a rich ecosystem of libraries and frameworks built on top of it. Two of the main ones, Tune and RLlib, are introduced below with quick-start examples.

Tune Quick Start

Tune is a library for hyperparameter tuning at any scale. With Tune, you can launch a multi-node distributed hyperparameter sweep in less than 10 lines of code. Tune supports any deep learning framework, including PyTorch, TensorFlow, and Keras.

Note

To run this example, you will need to install the following:

$ pip install 'ray[tune]'

This example runs a small grid search with an iterative training function.

from ray import tune


def objective(step, alpha, beta):
    return (0.1 + alpha * step / 100)**(-1) + beta * 0.1


def training_function(config):
    # Hyperparameters
    alpha, beta = config["alpha"], config["beta"]
    for step in range(10):
        # Iterative training function - can be any arbitrary training procedure.
        intermediate_score = objective(step, alpha, beta)
        # Feed the score back to Tune.
        tune.report(mean_loss=intermediate_score)


analysis = tune.run(
    training_function,
    config={
        "alpha": tune.grid_search([0.001, 0.01, 0.1]),
        "beta": tune.choice([1, 2, 3])
    })

print("Best config: ", analysis.get_best_config(
    metric="mean_loss", mode="min"))

# Get a dataframe for analyzing trial results.
df = analysis.results_df
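
analysis.results_df is a regular pandas DataFrame with one row per trial; the minimal sketch below assumes the reported mean_loss metric appears as a column, as metrics passed to tune.report typically do:

# Sort trials by the reported "mean_loss" metric; sampled config values
# also appear as columns alongside the metrics.
print(df.sort_values("mean_loss").head())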

If TensorBoard is installed, automatically visualize all trial results:

tensorboard --logdir ~/ray_results

RLlib Quick Start

RLlib is an open-source library for reinforcement learning built on top of Ray that offers both high scalability and a unified API for a variety of applications.

pip install tensorflow  # or tensorflow-gpu
pip install ray[rllib]  # also recommended: ray[debug]

import gym
from gym.spaces import Discrete, Box
from ray import tune

# Example custom env: the agent starts at position 0 and must walk right
# (action 1) down the corridor to reach the goal at `corridor_length`.
class SimpleCorridor(gym.Env):
    def __init__(self, config):
        self.end_pos = config["corridor_length"]
        self.cur_pos = 0
        self.action_space = Discrete(2)
        self.observation_space = Box(0.0, self.end_pos, shape=(1, ))

    def reset(self):
        self.cur_pos = 0
        return [self.cur_pos]

    def step(self, action):
        if action == 0 and self.cur_pos > 0:
            self.cur_pos -= 1
        elif action == 1:
            self.cur_pos += 1
        done = self.cur_pos >= self.end_pos
        return [self.cur_pos], 1 if done else 0, done, {}

tune.run(
    "PPO",
    config={
        "env": SimpleCorridor,
        "num_workers": 4,
        "env_config": {"corridor_length": 5}})

Where to go next?

Visit the Walkthrough page for a more comprehensive overview of Ray features.

Ray programs can run on a single machine, and can also seamlessly scale to large clusters. To execute the above Ray script in the cloud, download an example cluster configuration file and run:

ray submit [CLUSTER.YAML] example.py --start

Read more about launching clusters.