Ray Core Walkthrough

This walkthrough will overview the core concepts of Ray:

  1. Starting Ray

  2. Using remote functions (tasks)

  3. Fetching results (object refs)

  4. Using remote classes (actors)

With Ray, your code works on a single machine and can be easily scaled to a large cluster.

Java demo code in this documentation can be found here.

Installation

To run this walkthrough, install Ray with pip install -U ray. For the latest wheels (a snapshot of master), follow the instructions at Daily Releases (Nightlies).

Starting Ray

You can start Ray on a single machine by adding this to your code.

Note

In recent versions of Ray (>=1.5), ray.init() will automatically be called on the first use of a Ray remote API.

import ray

# Start Ray. If you're connecting to an existing cluster, you would use
# ray.init(address=<cluster-address>) instead.
ray.init()

...

Ray will then be able to utilize all cores of your machine. Find out how to configure the number of cores Ray will use at Configuring Ray.
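For example, a minimal sketch that caps the resources Ray manages on this machine (the values are illustrative):

# Let Ray manage only 4 CPUs and no GPUs on this machine.
ray.init(num_cpus=4, num_gpus=0)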

To start a multi-node Ray cluster, see the cluster setup page.

Remote functions (Tasks)

Ray enables arbitrary functions to be executed asynchronously. These asynchronous Ray functions are called “remote functions”. Here is an example.

# A regular Python function.
def my_function():
    return 1

# By adding the `@ray.remote` decorator, a regular Python function
# becomes a Ray remote function.
@ray.remote
def my_function():
    return 1

# To invoke this remote function, use the `remote` method.
# This will immediately return an object ref (a future) and then create
# a task that will be executed on a worker process.
obj_ref = my_function.remote()

# The result can be retrieved with ``ray.get``.
assert ray.get(obj_ref) == 1

import time

@ray.remote
def slow_function():
    time.sleep(10)
    return 1

# Invocations of Ray remote functions happen in parallel.
# All computation is performed in the background, driven by Ray's internal event loop.
for _ in range(4):
    # This doesn't block.
    slow_function.remote()

See the ray.remote package reference page for specific documentation on how to use ray.remote.
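To block until the parallel invocations above have all finished, collect the returned object refs and pass them to ray.get, as in this small sketch:

# Collect the refs so the results can be retrieved later.
obj_refs = [slow_function.remote() for _ in range(4)]

# ray.get blocks until every task has finished. Because the four tasks run
# concurrently (assuming at least four CPUs are available), this takes
# roughly 10 seconds rather than 40.
assert ray.get(obj_refs) == [1, 1, 1, 1]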

Passing object refs to remote functions

Object refs can also be passed into remote functions. When the function actually gets executed, the argument will be retrieved as a regular object. For example, take this function:

@ray.remote
def function_with_an_argument(value):
    return value + 1


obj_ref1 = my_function.remote()
assert ray.get(obj_ref1) == 1

# You can pass an object ref as an argument to another Ray remote function.
obj_ref2 = function_with_an_argument.remote(obj_ref1)
assert ray.get(obj_ref2) == 2

Note the following behaviors:

  • The second task will not be executed until the first task has finished executing, because the second task depends on the output of the first task (see the sketch after this list).

  • If the two tasks are scheduled on different machines, the output of the first task (the value corresponding to obj_ref1) will be sent over the network to the machine where the second task is scheduled.
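For example, because of the dependency described in the first point above, chaining calls produces tasks that execute one after another (a small illustrative sketch reusing the functions defined above):

# Each call depends on the previous task's output, so these tasks run
# sequentially rather than in parallel.
ref = my_function.remote()  # evaluates to 1
for _ in range(3):
    ref = function_with_an_argument.remote(ref)

assert ray.get(ref) == 4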

Specifying required resources

Oftentimes, you may want to specify a task's resource requirements (for example, one task may require a GPU). Ray will automatically detect the available GPUs and CPUs on the machine. However, you can override this default behavior by passing in specific resources when starting Ray.

ray.init(num_cpus=8, num_gpus=4, resources={'Custom': 2})

Ray also allows specifying a task's resource requirements (e.g., CPU, GPU, and custom resources). The task will only run on a machine if there are enough resources available to execute it.

# Specify required resources.
@ray.remote(num_cpus=4, num_gpus=2)
def my_function():
    return 1

Note

  • If you do not specify any resources, the default is 1 CPU resource and no other resources.

  • If specifying CPUs, Ray does not enforce isolation (i.e., your task is expected to honor its request).

  • If specifying GPUs, Ray does provide isolation in forms of visible devices (setting the environment variable CUDA_VISIBLE_DEVICES), but it is the task’s responsibility to actually use the GPUs (e.g., through a deep learning framework like TensorFlow or PyTorch).

The resource requirements of a task have implications for Ray's scheduling concurrency. In particular, the sum of the resource requirements of all of the concurrently executing tasks on a given node cannot exceed the node's total resources.
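As an illustrative sketch (assuming Ray was started with 4 CPUs, e.g. ray.init(num_cpus=4)), tasks that each request 2 CPUs can run at most two at a time:

# Each task requests 2 CPUs, so with 4 CPUs on the node at most two of
# these tasks execute concurrently; the others wait in the scheduling queue.
@ray.remote(num_cpus=2)
def heavy_task():
    return 1

refs = [heavy_task.remote() for _ in range(4)]  # runs in two waves of two
assert ray.get(refs) == [1, 1, 1, 1]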

Below are more examples of resource specifications:

# Ray also supports fractional resource requirements.
@ray.remote(num_gpus=0.5)
def h():
    return 1

# Ray supports custom resources too.
@ray.remote(resources={'Custom': 1})
def f():
    return 1

Multiple returns

Python remote functions can return multiple object refs.

@ray.remote(num_returns=3)
def return_multiple():
    return 1, 2, 3

a, b, c = return_multiple.remote()
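Here a, b, and c are each object refs, one per returned value. As a small follow-up, the values can be fetched individually or as a batch:

# Each name is bound to a separate object ref.
assert ray.get(a) == 1
assert ray.get([a, b, c]) == [1, 2, 3]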

Cancelling tasks

Remote functions can be canceled by calling ray.cancel on the returned object ref. Remote actor functions can be stopped by killing the actor using the ray.kill interface.

@ray.remote
def blocking_operation():
    time.sleep(10e6)

obj_ref = blocking_operation.remote()
ray.cancel(obj_ref)

from ray.exceptions import TaskCancelledError

try:
    ray.get(obj_ref)
except TaskCancelledError:
    print("Object reference was cancelled.")

Objects in Ray

In Ray, we can create and compute on objects. We refer to these objects as remote objects, and we use object refs to refer to them. Remote objects are stored in shared-memory object stores, and there is one object store per node in the cluster. In the cluster setting, we may not actually know which machine each object lives on.

An object ref is essentially a unique ID that can be used to refer to a remote object. If you’re familiar with futures, our object refs are conceptually similar.

Object refs can be created in multiple ways.

  1. They are returned by remote function calls.

  2. They are returned by ray.put.

# Put an object in Ray's object store.
y = 1
object_ref = ray.put(y)

Note

Remote objects are immutable. That is, their values cannot be changed after creation. This allows remote objects to be replicated in multiple object stores without needing to synchronize the copies.

Fetching Results

You can use ray.get to fetch the result of a remote object from an object ref. If the current node's object store does not contain the object, the object is downloaded.

If the object is a numpy array or a collection of numpy arrays, the get call is zero-copy and returns arrays backed by shared object store memory. Otherwise, we deserialize the object data into a Python object.
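A practical consequence is that numpy arrays fetched this way are typically read-only views of shared memory; copy them before mutating. A small illustrative sketch:

import numpy as np

# Put a large numpy array into the object store and fetch it back.
array_ref = ray.put(np.zeros(1024 * 1024))
arr = ray.get(array_ref)  # zero-copy view backed by the object store

try:
    arr[0] = 1  # the view is read-only, so this raises ValueError
except ValueError:
    arr = arr.copy()  # make a private, writable copy instead
    arr[0] = 1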

# Get the value of one object ref.
obj_ref = ray.put(1)
assert ray.get(obj_ref) == 1

# Get the values of multiple object refs in parallel.
assert ray.get([ray.put(i) for i in range(3)]) == [0, 1, 2]

# You can also set a timeout to return early from a ``get`` that's blocking for too long.
from ray.exceptions import GetTimeoutError

@ray.remote
def long_running_function():
    time.sleep(8)

obj_ref = long_running_function.remote()
try:
    ray.get(obj_ref, timeout=4)
except GetTimeoutError:
    print("`get` timed out.")

After launching a number of tasks, you may want to know which ones have finished executing. This can be done with ray.wait, which works as follows.

ready_refs, remaining_refs = ray.wait(object_refs, num_returns=1, timeout=None)
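A common pattern is to call ray.wait in a loop and process results as they become available; a minimal sketch reusing slow_function from above:

# Launch several tasks up front.
object_refs = [slow_function.remote() for _ in range(4)]

# Handle each result as soon as it is ready instead of waiting for all of them.
while object_refs:
    ready_refs, object_refs = ray.wait(object_refs, num_returns=1)
    result = ray.get(ready_refs[0])
    # ... process `result` here ...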

Object Spilling

When the object store gets full, objects will be spilled to disk. This feature is available in Ray 1.3+.

Remote Classes (Actors)

Actors extend the Ray API from functions (tasks) to classes. An actor is essentially a stateful worker.

The ray.remote decorator indicates that instances of the Counter class will be actors. Each actor runs in its own Python process.

@ray.remote
class Counter(object):
    def __init__(self):
        self.value = 0

    def increment(self):
        self.value += 1
        return self.value

# Create an actor from this class.
counter = Counter.remote()

Specifying required resources

You can specify resource requirements in actors too (see the Actors section for more details.)

# Specify required resources for an actor.
@ray.remote(num_cpus=2, num_gpus=0.5)
class Actor(object):
    pass

Calling the actor

We can interact with the actor by calling its methods with the remote operator. We can then call get on the object ref to retrieve the actual value.

# Call the actor.
obj_ref = counter.increment.remote()
assert ray.get(obj_ref) == 1

Methods called on different actors can execute in parallel, and methods called on the same actor are executed serially in the order that they are called. Methods on the same actor will share state with one another, as shown below.

# Create ten Counter actors.
counters = [Counter.remote() for _ in range(10)]

# Increment each Counter once and get the results. These tasks all happen in
# parallel.
results = ray.get([c.increment.remote() for c in counters])
print(results)  # prints [1, 1, 1, 1, 1, 1, 1, 1, 1, 1]

# Increment the first Counter five times. These tasks are executed serially
# and share state.
results = ray.get([counters[0].increment.remote() for _ in range(5)])
print(results)  # prints [2, 3, 4, 5, 6]

To learn more about Ray Actors, see the Actors section.