Advanced Usage

This page covers more advanced examples of using Ray’s flexible programming model.

Synchronization

Tasks or actors can often contend over the same resource or need to communicate with each other. Here are some standard ways to perform synchronization across Ray processes.

Inter-process synchronization using FileLock

If you have several tasks or actors writing to the same file or downloading a file on a single node, you can use FileLock to synchronize.

This often occurs for data loading and preprocessing.

import ray
from filelock import FileLock

@ray.remote
def write_to_file(text):
    # Create a filelock object. Consider using an absolute path for the lock.
    with FileLock("my_data.txt.lock"):
        with open("my_data.txt", "a") as f:
            f.write(text)

ray.init()
ray.get([write_to_file.remote("hi there!\n") for i in range(3)])

with open("my_data.txt") as f:
    print(f.read())

## Output is:

# hi there!
# hi there!
# hi there!
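
The comment in the example suggests an absolute path for the lock. A minimal sketch of one way to derive it, assuming the lock file should live next to the data file. `lock_path_for` is a hypothetical helper, not part of filelock or Ray:

```python
import os


def lock_path_for(data_path):
    """Return an absolute lock-file path next to data_path (hypothetical helper)."""
    # An absolute path does not depend on the working directory of the
    # process that opens the lock.
    return os.path.abspath(data_path) + ".lock"


lock_path = lock_path_for("my_data.txt")
print(lock_path)
```

The result can be passed to FileLock in place of "my_data.txt.lock".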

Multi-node synchronization using SignalActor

When you have multiple tasks that need to wait on some condition, you can use a SignalActor to coordinate.

# Also available via `from ray.test_utils import SignalActor`
import ray
import asyncio

@ray.remote(num_cpus=0)
class SignalActor:
    def __init__(self):
        self.ready_event = asyncio.Event()

    def send(self, clear=False):
        self.ready_event.set()
        if clear:
            self.ready_event.clear()

    async def wait(self, should_wait=True):
        if should_wait:
            await self.ready_event.wait()

@ray.remote
def wait_and_go(signal):
    ray.get(signal.wait.remote())

    print("go!")

ray.init()
signal = SignalActor.remote()
tasks = [wait_and_go.remote(signal) for _ in range(4)]
print("ready...")
# Tasks will all be waiting for the signals.
print("set..")
ray.get(signal.send.remote())

# Tasks are unblocked.
ray.get(tasks)

## Output is:
# ready...
# set..

# (pid=77366) go!
# (pid=77372) go!
# (pid=77367) go!
# (pid=77358) go!
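
SignalActor wraps an asyncio.Event. The sketch below uses only the standard library (no Ray) to show the behavior the actor relies on: a single set() releases every coroutine that is already waiting, even when clear() follows immediately, as in send(clear=True). The names `waiter` and `main` are illustrative.

```python
import asyncio


async def waiter(event, results, i):
    # Blocks until the event is set, like SignalActor.wait().
    await event.wait()
    results.append(i)


async def main():
    event = asyncio.Event()
    results = []
    tasks = [asyncio.create_task(waiter(event, results, i)) for i in range(4)]
    await asyncio.sleep(0)  # give every waiter a chance to start waiting
    assert results == []    # nothing has been released yet

    # Equivalent to send(clear=True): wake current waiters, then reset.
    event.set()
    event.clear()

    await asyncio.gather(*tasks)
    return sorted(results), event.is_set()


released, still_set = asyncio.run(main())
print(released, still_set)
```

After the clear, the event is unset again, so later waiters block until the next send.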

Message passing using Ray Queue

Sometimes just using one signal to synchronize is not enough. If you need to send data among many tasks or actors, you can use ray.util.queue.Queue.

import ray
from ray.util.queue import Queue

ray.init()
# You can pass this object around to different tasks/actors
queue = Queue(maxsize=100)

@ray.remote
def consumer(queue):
    next_item = queue.get(block=True)
    print(f"got work {next_item}")

consumers = [consumer.remote(queue) for _ in range(2)]

for i in range(10):
    queue.put(i)

Ray’s Queue API is similar to Python’s asyncio.Queue and queue.Queue.
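
To illustrate that similarity, here is a sketch of the same producer/consumer shape using the standard-library queue.Queue with threads standing in for Ray tasks. It is not Ray code: the `STOP` sentinel and the looping consumer are assumptions added so the example terminates cleanly.

```python
import queue
import threading

work_queue = queue.Queue(maxsize=100)
results = []
results_lock = threading.Lock()
STOP = object()  # sentinel telling a consumer to exit (illustrative)


def consumer():
    while True:
        # Same call shape as queue.get(block=True) in the Ray example.
        item = work_queue.get(block=True)
        if item is STOP:
            break
        with results_lock:
            results.append(item)


threads = [threading.Thread(target=consumer) for _ in range(2)]
for t in threads:
    t.start()

for i in range(10):
    work_queue.put(i)
for _ in threads:
    work_queue.put(STOP)  # one sentinel per consumer
for t in threads:
    t.join()

print(sorted(results))
```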

Dynamic Remote Parameters

You can dynamically adjust resource requirements or return values of ray.remote during execution with .options.

For example, here we instantiate many copies of the same actor with varying resource requirements. Note that to create these actors successfully, Ray will need to be started with sufficient CPU resources and the relevant custom resources:

@ray.remote(num_cpus=4)
class Counter(object):
    def __init__(self):
        self.value = 0

    def increment(self):
        self.value += 1
        return self.value

a1 = Counter.options(num_cpus=1, resources={"Custom1": 1}).remote()
a2 = Counter.options(num_cpus=2, resources={"Custom2": 1}).remote()
a3 = Counter.options(num_cpus=3, resources={"Custom3": 1}).remote()
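
As a rough check of what "sufficient resources" means here, the sketch below adds up the requests of the three actors. It is plain Python, not a Ray API; `actor_requests` is written out by hand from the calls above.

```python
from collections import Counter as Tally

# (num_cpus, custom resources) requested by a1, a2 and a3 above.
actor_requests = [
    (1, {"Custom1": 1}),
    (2, {"Custom2": 1}),
    (3, {"Custom3": 1}),
]

required = Tally()
for num_cpus, custom in actor_requests:
    required["CPU"] += num_cpus
    required.update(custom)  # adds each custom resource amount

print(dict(required))

# One way to start a local Ray instance that satisfies this demand:
# ray.init(num_cpus=required["CPU"],
#          resources={k: v for k, v in required.items() if k != "CPU"})
```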

You can specify different resource requirements for tasks (but not for actor methods):

@ray.remote
def g():
    return ray.get_gpu_ids()

# g requests no GPUs by default, so no GPU IDs are assigned to it.
object_gpu_ids = g.remote()
assert ray.get(object_gpu_ids) == []

dynamic_object_gpu_ids = g.options(num_cpus=1, num_gpus=1).remote()
assert ray.get(dynamic_object_gpu_ids) == [0]

And vary the number of return values for tasks (and actor methods too):

@ray.remote
def f(n):
    return list(range(n))

id1, id2 = f.options(num_returns=2).remote(2)
assert ray.get(id1) == 0
assert ray.get(id2) == 1

And specify a name for tasks (and actor methods too) at task submission time:

import setproctitle

@ray.remote
def f(x):
    assert setproctitle.getproctitle() == "ray::special_f"
    return x + 1

obj = f.options(name="special_f").remote(3)
assert ray.get(obj) == 4

This name will appear as the task name in the machine view of the dashboard, will appear as the worker process name when this task is executing (if a Python task), and will appear as the task name in the logs.


Dynamic Custom Resources

Custom resources give developers explicit control over task and actor placement. Users can also adjust custom resources programmatically at runtime with ray.experimental.set_resource. This allows a Ray application to implement virtually any scheduling policy, including task affinity, data locality, anti-affinity, load balancing, gang scheduling, and priority-based scheduling.

import ray

ray.init()
resource_name = "test_resource"
resource_capacity = 1.0

@ray.remote
def set_resource(resource_name, resource_capacity):
    ray.experimental.set_resource(resource_name, resource_capacity)

ray.get(set_resource.remote(resource_name, resource_capacity))

available_resources = ray.available_resources()
cluster_resources = ray.cluster_resources()

assert available_resources[resource_name] == resource_capacity
assert cluster_resources[resource_name] == resource_capacity

ray.experimental.set_resource(resource_name, capacity, client_id=None)

Set a resource to a specified capacity.

This creates, updates or deletes a custom resource for a target NodeID. If the resource already exists, its capacity is updated to the new value. If the capacity is set to 0, the resource is deleted. If NodeID is not specified or set to None, the resource is created on the local client where the actor is running.

Parameters
  • resource_name (str) – Name of the resource to be created

  • capacity (int) – Capacity of the new resource. Resource is deleted if capacity is 0.

  • client_id (str) – The NodeID of the node where the resource is to be set.

Returns

None

Raises

ValueError – This exception is raised when a negative capacity is specified.
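
The create/update/delete rules can be summarized with a toy model over a plain dictionary. This is only a sketch of the documented semantics, not Ray's implementation; `apply_set_resource` is a hypothetical name, and it assumes the ValueError is meant for negative capacities.

```python
def apply_set_resource(node_resources, resource_name, capacity):
    """Toy model of the documented create/update/delete semantics."""
    if capacity < 0:
        raise ValueError("capacity must not be negative")
    if capacity == 0:
        node_resources.pop(resource_name, None)   # delete
    else:
        node_resources[resource_name] = capacity  # create or update
    return node_resources


node = {"CPU": 16.0}
apply_set_resource(node, "test_resource", 1.0)  # create
apply_set_resource(node, "test_resource", 4.0)  # update
after_update = dict(node)
apply_set_resource(node, "test_resource", 0)    # delete
print(after_update, node)
```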

Accelerator Types

Ray supports requesting specific accelerator types as resources. The accelerator_type field can be used to force a task to run on a node with a specific type of accelerator. Under the hood, the accelerator type option is implemented as a custom resource demand of "accelerator_type:<type>": 0.001. This forces the task to be placed on a node with that particular accelerator type available. This also lets the multi-node-type autoscaler know that there is demand for that type of resource, potentially triggering the launch of new nodes providing that accelerator.

import ray
from ray.util.accelerators import NVIDIA_TESLA_V100

@ray.remote(num_gpus=1, accelerator_type=NVIDIA_TESLA_V100)
def train(data):
    return "This function was run on a node with a Tesla V100 GPU"

See ray.util.accelerators for the available accelerator types. Currently, automatically detected accelerator types include Nvidia GPUs.
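
The custom resource demand described above can be pictured with a small sketch. It is plain Python rather than Ray's scheduler: the helper names, the "V100" type string, and the node resource dictionaries are all assumptions made for illustration.

```python
def accelerator_demand(accelerator_type):
    """Build the custom resource demand for an accelerator type (illustrative)."""
    return {f"accelerator_type:{accelerator_type}": 0.001}


def node_can_host(node_resources, demand):
    """True if the node offers at least the demanded amount of every resource."""
    return all(
        node_resources.get(name, 0) >= amount for name, amount in demand.items()
    )


demand = accelerator_demand("V100")  # "V100" is an assumed type string
v100_node = {"CPU": 8, "GPU": 1, "accelerator_type:V100": 1}  # hypothetical node
cpu_node = {"CPU": 8}                                         # hypothetical node

print(node_can_host(v100_node, demand), node_can_host(cpu_node, demand))
```

Because the demand is tiny (0.001), it restricts placement to matching nodes without meaningfully limiting how many such tasks a node can host.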

Nested Remote Functions

Remote functions can call other remote functions, resulting in nested tasks. For example, consider the following.

@ray.remote
def f():
    return 1

@ray.remote
def g():
    # Call f 4 times and return the resulting object refs.
    return [f.remote() for _ in range(4)]

@ray.remote
def h():
    # Call f 4 times, block until those 4 tasks finish,
    # retrieve the results, and return the values.
    return ray.get([f.remote() for _ in range(4)])

Then calling g and h produces the following behavior.

>>> ray.get(g.remote())
[ObjectRef(b1457ba0911ae84989aae86f89409e953dd9a80e),
 ObjectRef(7c14a1d13a56d8dc01e800761a66f09201104275),
 ObjectRef(99763728ffc1a2c0766a2000ebabded52514e9a6),
 ObjectRef(9c2f372e1933b04b2936bb6f58161285829b9914)]

>>> ray.get(h.remote())
[1, 1, 1, 1]

One limitation is that f must be defined before g and h. As soon as g is defined, it is pickled and shipped to the workers, so if f hasn’t been defined yet, the pickled definition of g will be incomplete.

Circular Dependencies

Consider the following remote function.

@ray.remote(num_cpus=1, num_gpus=1)
def g():
    return ray.get(f.remote())

When a g task is executing, it will release its CPU resources when it gets blocked in the call to ray.get. It will reacquire the CPU resources when ray.get returns. It will retain its GPU resources throughout the lifetime of the task because the task will most likely continue to use GPU memory.
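
The acquire/release sequence can be traced with a toy resource ledger. This is a sketch in plain Python, not Ray's scheduler; `ToyNode` is hypothetical, and it assumes f uses the default of one CPU on a node with exactly one CPU and one GPU.

```python
class ToyNode:
    """Toy resource ledger for one node (illustrative only)."""

    def __init__(self, **capacity):
        self.available = dict(capacity)

    def acquire(self, **request):
        # Grant the request only if every resource is available.
        if any(self.available.get(k, 0) < v for k, v in request.items()):
            return False
        for k, v in request.items():
            self.available[k] -= v
        return True

    def release(self, **request):
        for k, v in request.items():
            self.available[k] += v


node = ToyNode(CPU=1, GPU=1)

g_started = node.acquire(CPU=1, GPU=1)  # g starts with 1 CPU and 1 GPU
node.release(CPU=1)                     # g blocks in ray.get: CPU released, GPU kept
while_blocked = dict(node.available)
f_ran = node.acquire(CPU=1)             # f can be scheduled on the freed CPU
node.release(CPU=1)                     # f finishes
node.acquire(CPU=1)                     # ray.get returns: g reacquires its CPU
node.release(CPU=1, GPU=1)              # g finishes

print(while_blocked, f_ran)
```

In this toy model, f could not run at all if g held on to the only CPU while blocked.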

Cython Code in Ray

To use Cython code in Ray, run the following from the directory $RAY_HOME/examples/cython:

pip install scipy # For BLAS example
pip install -e .
python cython_main.py --help

You can import the cython_examples module from a Python script or interpreter.

Notes

  • You must include the following two lines at the top of any *.pyx file:

#!python
# cython: embedsignature=True, binding=True

  • You cannot decorate Cython functions within a *.pyx file (there are ways around this, but they create a leaky abstraction between Cython and Python that would be very challenging to support generally). Instead, prefer the following in your Python code:

some_cython_func = ray.remote(some_cython_module.some_cython_func)

  • You cannot transfer memory buffers to a remote function (see example8, which currently fails); your remote function must return a value.

  • Have a look at cython_main.py, cython_simple.pyx, and setup.py for examples of how to call, define, and build Cython code, respectively. The Cython documentation is also very helpful.

  • Several limitations come from Cython’s own unsupported Python features.

  • We currently do not support compiling and distributing Cython code to Ray clusters. In other words, Cython developers are responsible for compiling and distributing any Cython code to their cluster (much as would be the case for users who need Python packages like scipy).

  • For most simple use cases, developers need not worry about Python 2 or 3, but users who do need to care can have a look at the language_level Cython compiler directive (see the Cython documentation on compiler directives).

Inspecting Cluster State

Applications written on top of Ray will often want to have some information or diagnostics about the cluster. Some common questions include:

  1. How many nodes are in my autoscaling cluster?

  2. What resources are currently available in my cluster, both used and total?

  3. What are the objects currently in my cluster?

For this, you can use the global state API.

Node Information

To get information about the current nodes in your cluster, you can use ray.nodes():

ray.nodes()

Get a list of the nodes in the cluster (for debugging only).

Returns

Information about the Ray clients in the cluster.

import ray

ray.init()

print(ray.nodes())

"""
[{'NodeID': '2691a0c1aed6f45e262b2372baf58871734332d7',
  'Alive': True,
  'NodeManagerAddress': '192.168.1.82',
  'NodeManagerHostname': 'host-MBP.attlocal.net',
  'NodeManagerPort': 58472,
  'ObjectManagerPort': 52383,
  'ObjectStoreSocketName': '/tmp/ray/session_2020-08-04_11-00-17_114725_17883/sockets/plasma_store',
  'RayletSocketName': '/tmp/ray/session_2020-08-04_11-00-17_114725_17883/sockets/raylet',
  'MetricsExportPort': 64860,
  'alive': True,
  'Resources': {'CPU': 16.0, 'memory': 100.0, 'object_store_memory': 34.0, 'node:192.168.1.82': 1.0}}]
"""

The above information includes:

  • NodeID: A unique identifier for the raylet.

  • alive: Whether the node is still alive.

  • NodeManagerAddress: Private IP of the node that the raylet is on.

  • Resources: The total resource capacity on the node.

  • MetricsExportPort: The port number at which metrics are exposed through a Prometheus endpoint.
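
A short sketch of how this list might be summarized, for example to count live nodes. To stay runnable without a cluster, it uses an abridged copy of the sample output above in place of a live ray.nodes() call.

```python
# Abridged copy of the sample ray.nodes() output shown above.
nodes = [
    {
        "NodeID": "2691a0c1aed6f45e262b2372baf58871734332d7",
        "Alive": True,
        "NodeManagerAddress": "192.168.1.82",
        "Resources": {
            "CPU": 16.0,
            "memory": 100.0,
            "object_store_memory": 34.0,
            "node:192.168.1.82": 1.0,
        },
    }
]

alive_nodes = [n for n in nodes if n["Alive"]]
total_cpus = sum(n["Resources"].get("CPU", 0.0) for n in alive_nodes)
print(f"{len(alive_nodes)} alive node(s), {total_cpus} CPUs")
```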

Resource Information

To get information about the current total resource capacity of your cluster, you can use ray.cluster_resources().

ray.cluster_resources()

Get the current total cluster resources.

Note that this information can grow stale as nodes are added to or removed from the cluster.

Returns

A dictionary mapping resource name to the total quantity of that resource in the cluster.

To get information about the current available resource capacity of your cluster, you can use ray.available_resources().

ray.available_resources()

Get the current available cluster resources.

This is different from cluster_resources in that this will return idle (available) resources rather than total resources.

Note that this information can grow stale as tasks start and finish.

Returns

A dictionary mapping resource name to the available quantity of that resource in the cluster.
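
Combining the two dictionaries gives the amount of each resource in use. The sketch below uses hypothetical snapshots in place of live calls to ray.cluster_resources() and ray.available_resources(); `used_resources` is an illustrative helper, and treating a resource missing from the available snapshot as fully used is an assumption.

```python
def used_resources(total, available):
    """Per-resource usage: total capacity minus what is currently idle."""
    # Assumption: a resource absent from `available` has nothing idle.
    return {name: total[name] - available.get(name, 0.0) for name in total}


# Hypothetical snapshots standing in for the two API calls.
total = {"CPU": 16.0, "GPU": 2.0}
available = {"CPU": 12.0}

used = used_resources(total, available)
print(used)
```

Since both snapshots can grow stale, the result is best treated as an estimate.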