Ray Direct Transport (RDT)#

Ray objects are normally serialized into Ray’s CPU-based object store, then copied and deserialized each time a Ray task or actor accesses them. For GPU data specifically, this can lead to unnecessary and expensive data transfers. For example, passing a CUDA torch.Tensor from one Ray task to another requires a copy from GPU to CPU memory, and then back again to GPU memory on the receiving side.

Ray Direct Transport (RDT) is a new feature that allows Ray to store and pass objects directly between Ray actors. This feature augments the familiar Ray ObjectRef API by:

  • Keeping GPU data in GPU memory until a transfer is needed

  • Avoiding expensive serialization and copies to and from the Ray object store

  • Using efficient data transports like collective communication libraries (Gloo or NCCL) or point-to-point RDMA (via NVIDIA’s NIXL) to transfer data directly between devices, including both CPU and GPUs

Note

RDT is currently in alpha. Not all Ray Core APIs are supported yet. Future releases may introduce breaking API changes. See the limitations section for more details.

Getting started#

Tip

RDT currently supports torch.Tensor objects created by Ray actor tasks. Other datatypes and non-actor Ray tasks may be supported in future releases.

This walkthrough will show how to create and use RDT with different tensor transports, i.e. the mechanism used to transfer the tensor between actors. Currently, RDT supports the following tensor transports:

  1. Gloo: A collective communication library for CPUs, commonly used with PyTorch.

  2. NVIDIA NCCL: A collective communication library for NVIDIA GPUs.

  3. NVIDIA NIXL (backed by UCX): A library that accelerates point-to-point transfers via RDMA, especially between NVIDIA GPUs and other types of memory.

For ease of following along, we’ll start with the Gloo transport, which can be used without any physical GPUs.

Usage with Gloo (CPUs only)#

Installation#

Note

Under construction.

Walkthrough#

To get started, define an actor class with an actor task that returns a torch.Tensor:

import torch
import ray


@ray.remote
class MyActor:
    def random_tensor(self):
        return torch.randn(1000, 1000)


As written, when the torch.Tensor is returned, it will be copied into Ray’s CPU-based object store. For CPU-based tensors, this can require an expensive step to copy and serialize the object, while GPU-based tensors additionally require a copy to and from CPU memory.
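
For reference, the baseline usage without RDT looks like the following. The returned tensor round-trips through the object store when the caller retrieves it:

actor = MyActor.remote()
ref = actor.random_tensor.remote()
# The tensor is serialized into Ray's CPU-based object store by the actor and
# copied out again (deserialized) by the caller.
tensor = ray.get(ref)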

To enable RDT, use the tensor_transport option in the @ray.method decorator.

@ray.remote
class MyActor:
    @ray.method(tensor_transport="gloo")
    def random_tensor(self):
        return torch.randn(1000, 1000)


This decorator can be added to any actor tasks that return a torch.Tensor, or that return torch.Tensors nested inside other Python objects. Adding this decorator will change Ray’s behavior in the following ways:

  1. When returning the tensor, Ray will store a reference to the tensor instead of copying it to CPU memory.

  2. When the ray.ObjectRef is passed to another task, Ray will use Gloo to transfer the tensor to the destination task.

Note that for (2) to work, the @ray.method(tensor_transport) decorator only needs to be added to the actor task that returns the tensor. It should not be added to actor tasks that consume the tensor (unless those tasks also return tensors).

Also, for (2) to work, we must first create a collective group of actors.

Creating a collective group#

To create a collective group for use with RDT:

  1. Create multiple Ray actors.

  2. Create a collective group on the actors using the ray.experimental.collective.create_collective_group function. The backend specified must match the tensor_transport used in the @ray.method decorator.

Here is an example:

import torch
import ray
from ray.experimental.collective import create_collective_group


@ray.remote
class MyActor:
    @ray.method(tensor_transport="gloo")
    def random_tensor(self):
        return torch.randn(1000, 1000)

    def sum(self, tensor: torch.Tensor):
        return torch.sum(tensor)


sender, receiver = MyActor.remote(), MyActor.remote()
# The tensor_transport specified here must match the one used in the @ray.method
# decorator.
group = create_collective_group([sender, receiver], backend="torch_gloo")

The actors can now communicate directly via Gloo. The group can also be destroyed using the ray.experimental.collective.destroy_collective_group function. After calling this function, a new collective group can be created on the same actors.
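
Continuing the example above, here is a rough sketch of that lifecycle. The exact argument accepted by destroy_collective_group is an assumption here; this sketch assumes it takes the same list of actor handles that was passed to create_collective_group:

from ray.experimental.collective import destroy_collective_group

# Tear down the existing group (assumed to accept the same actor handles that
# were passed to create_collective_group).
destroy_collective_group([sender, receiver])

# A new group can then be created on the same actors.
group = create_collective_group([sender, receiver], backend="torch_gloo")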

Passing objects to other actors#

Now that we have a collective group, we can create and pass RDT objects between the actors. Here is a full example:

import torch
import ray
from ray.experimental.collective import create_collective_group


@ray.remote
class MyActor:
    @ray.method(tensor_transport="gloo")
    def random_tensor(self):
        return torch.randn(1000, 1000)

    def sum(self, tensor: torch.Tensor):
        return torch.sum(tensor)


sender, receiver = MyActor.remote(), MyActor.remote()
group = create_collective_group([sender, receiver], backend="torch_gloo")

# The tensor will be stored by the `sender` actor instead of in Ray's object
# store.
tensor = sender.random_tensor.remote()
result = receiver.sum.remote(tensor)
print(ray.get(result))

When the ray.ObjectRef is passed to another task, Ray will use Gloo to transfer the tensor directly from the source actor to the destination actor instead of going through the object store. Note that the @ray.method(tensor_transport) decorator is only added to the actor task that returns the tensor; once this hint has been added, the receiving actor task receiver.sum will automatically use Gloo to receive the tensor. In this example, because MyActor.sum does not have the @ray.method(tensor_transport) decorator, it uses the default Ray object store transport to return torch.sum(tensor).
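
An actor task that both consumes and returns tensors can also be decorated, so that its own output stays with the actor instead of going through the object store. The following is a hypothetical sketch of such a pipeline (PipelineActor and its method names are illustrative, not part of the RDT API), assuming all three actors are placed in the same Gloo collective group:

import torch
import ray
from ray.experimental.collective import create_collective_group


@ray.remote
class PipelineActor:
    @ray.method(tensor_transport="gloo")
    def produce(self):
        return torch.randn(1000, 1000)

    @ray.method(tensor_transport="gloo")
    def double(self, tensor: torch.Tensor):
        # Consumes a tensor and also returns one, so it is decorated too.
        return tensor * 2

    def sum(self, tensor: torch.Tensor):
        return torch.sum(tensor)


a, b, c = PipelineActor.remote(), PipelineActor.remote(), PipelineActor.remote()
group = create_collective_group([a, b, c], backend="torch_gloo")

t1 = a.produce.remote()
t2 = b.double.remote(t1)  # Transferred a -> b via Gloo; the result stays on b.
result = c.sum.remote(t2)  # Transferred b -> c via Gloo.
print(ray.get(result))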

RDT also supports passing tensors nested inside Python data structures, as well as actor tasks that return multiple tensors, like in this example:

import torch
import ray
from ray.experimental.collective import create_collective_group


@ray.remote
class MyActor:
    @ray.method(tensor_transport="gloo")
    def random_tensor_dict(self):
        return {"tensor1": torch.randn(1000, 1000), "tensor2": torch.randn(1000, 1000)}

    def sum(self, tensor_dict: dict):
        return torch.sum(tensor_dict["tensor1"]) + torch.sum(tensor_dict["tensor2"])


sender, receiver = MyActor.remote(), MyActor.remote()
group = create_collective_group([sender, receiver], backend="torch_gloo")

# Both tensor values in the dictionary will be stored by the `sender` actor
# instead of in Ray's object store.
tensor_dict = sender.random_tensor_dict.remote()
result = receiver.sum.remote(tensor_dict)
print(ray.get(result))

Passing RDT objects to the actor that produced them#

RDT ray.ObjectRefs can also be passed to the actor that produced them. This avoids any copies and just provides a reference to the same torch.Tensor that was previously created. For example:

import torch
import ray
from ray.experimental.collective import create_collective_group


@ray.remote
class MyActor:
    @ray.method(tensor_transport="gloo")
    def random_tensor(self):
        return torch.randn(1000, 1000)

    def sum(self, tensor: torch.Tensor):
        return torch.sum(tensor)


sender, receiver = MyActor.remote(), MyActor.remote()
group = create_collective_group([sender, receiver], backend="torch_gloo")

tensor = sender.random_tensor.remote()
# Pass the ObjectRef back to the actor that produced it. The tensor will be
# passed back to the same actor without copying.
sum1 = sender.sum.remote(tensor)
sum2 = receiver.sum.remote(tensor)
assert torch.allclose(*ray.get([sum1, sum2]))

Note

Ray only keeps a reference to the tensor created by the user, so the tensor objects are mutable. If sender.sum were to modify the tensor in the above example, the changes would also be seen by receiver.sum. This differs from the normal Ray Core API, which always makes an immutable copy of data returned by actors.
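
For instance, a hypothetical variant of sum that zeroes its input in place would affect what other consumers of the same ObjectRef observe. The sum_and_zero method below is illustrative only, and whether receiver.sum actually sees the mutation depends on when the transfer to the receiver happens:

import torch
import ray
from ray.experimental.collective import create_collective_group


@ray.remote
class MyActor:
    @ray.method(tensor_transport="gloo")
    def random_tensor(self):
        return torch.randn(1000, 1000)

    def sum(self, tensor: torch.Tensor):
        return torch.sum(tensor)

    def sum_and_zero(self, tensor: torch.Tensor):
        # In-place mutation: Ray holds a reference to the tensor, not a copy,
        # so later consumers of the same ObjectRef may observe the change.
        s = torch.sum(tensor)
        tensor.zero_()
        return s


sender, receiver = MyActor.remote(), MyActor.remote()
group = create_collective_group([sender, receiver], backend="torch_gloo")

tensor = sender.random_tensor.remote()
sum1 = sender.sum_and_zero.remote(tensor)  # Mutates the tensor stored on `sender`.
sum2 = receiver.sum.remote(tensor)  # May observe the zeroed tensor.
print(ray.get([sum1, sum2]))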

ray.get#

The ray.get function can also be used as usual to retrieve the result of an RDT object, via Ray’s object store. Continuing the example above:

print(ray.get(tensor))
# tensor([[...]])

Usage with NCCL (NVIDIA GPUs only)#

RDT requires just a few lines of code change to switch tensor transports. Here is the Gloo example, modified to use NVIDIA GPUs and the NCCL library for collective GPU communication.

import torch
import ray
from ray.experimental.collective import create_collective_group


@ray.remote(num_gpus=1)
class MyActor:
    @ray.method(tensor_transport="nccl")
    def random_tensor(self):
        return torch.randn(1000, 1000).cuda()

    def sum(self, tensor: torch.Tensor):
        return torch.sum(tensor)


sender, receiver = MyActor.remote(), MyActor.remote()
group = create_collective_group([sender, receiver], backend="nccl")

# The tensor will be stored by the `sender` actor instead of in Ray's object
# store.
tensor = sender.random_tensor.remote()
result = receiver.sum.remote(tensor)
ray.get(result)

The main code differences are:

  1. The actor is declared with @ray.remote(num_gpus=1) so that each actor is assigned a GPU.

  2. The @ray.method uses tensor_transport="nccl" instead of tensor_transport="gloo".

  3. The create_collective_group call uses backend="nccl" instead of backend="torch_gloo".

  4. The tensor is created on the GPU using the .cuda() method.

Usage with NIXL (CPUs or NVIDIA GPUs)#

NIXL can transfer data between different devices, including CPUs and NVIDIA GPUs, but doesn’t require a collective group to be created ahead of time. This means that any actor that has NIXL installed in its environment can be used to create and pass an RDT object.

Otherwise, the usage is the same as in the Gloo example.

Here is an example showing how to use NIXL to transfer an RDT object between two actors:

import torch
import ray


@ray.remote(num_gpus=1)
class MyActor:
    @ray.method(tensor_transport="nixl")
    def random_tensor(self):
        return torch.randn(1000, 1000).cuda()

    def sum(self, tensor: torch.Tensor):
        return torch.sum(tensor)


# No collective group is needed. The two actors just need to have NIXL
# installed.
sender, receiver = MyActor.remote(), MyActor.remote()

# The tensor will be stored by the `sender` actor instead of in Ray's object
# store.
tensor = sender.random_tensor.remote()
result = receiver.sum.remote(tensor)
ray.get(result)

Compared to the Gloo example, the main code differences are:

  1. The @ray.method uses tensor_transport="nixl" instead of tensor_transport="gloo".

  2. No collective group is needed.

  3. As in the NCCL example, each actor requests a GPU and the tensor is created on the GPU using the .cuda() method (NIXL can also transfer CPU memory).

Summary#

RDT allows Ray to store and pass objects directly between Ray actors, using accelerated transports like Gloo, NCCL, and NIXL. Here are the main points to keep in mind:

  • If using a collective-based tensor transport (Gloo or NCCL), a collective group must be created ahead of time. NIXL just requires all involved actors to have NIXL installed.

  • Unlike objects in the Ray object store, RDT objects are mutable, meaning that Ray only holds a reference, not a copy, to the stored tensor(s).

  • Otherwise, actors can be used as normal.

For a full list of limitations, see the limitations section.

Microbenchmarks#

Note

Under construction.

Limitations#

RDT is currently in alpha and has the following limitations, which may be addressed in future releases:

  • Support for torch.Tensor objects only.

  • Support for Ray actors only, not Ray tasks.

  • Support for the following transports: Gloo, NCCL, and NIXL.

  • Support for CPUs and NVIDIA GPUs only.

  • RDT objects are mutable. This means that Ray only holds a reference to the tensor, and will not copy it until a transfer is requested. Thus, if the application code also keeps a reference to a tensor before returning it, and modifies the tensor in place, then some or all of the changes may be seen by the receiving actor.

For collective-based tensor transports (Gloo and NCCL):

  • Only the process that created the collective group can submit actor tasks that return and pass RDT objects. If the creating process passes the actor handles to other processes, those processes can submit actor tasks as usual, but will not be able to use RDT objects.

  • Similarly, the process that created the collective group cannot serialize and pass RDT ray.ObjectRefs to other Ray tasks or actors. Instead, the ray.ObjectRefs can only be passed as direct arguments to other actor tasks, and those actors must be in the same collective group.

  • Each actor can only be in one collective group per tensor transport at a time.

  • No support for ray.put.

  • If a system-level error occurs during a collective operation, the collective group will be destroyed and the actors will no longer be able to communicate via the collective group. Note that application-level errors, i.e., exceptions raised by user code, will not destroy the collective group and will instead be propagated to any dependent task(s), as for non-RDT Ray objects (see the sketch after this list). System-level errors include:

    • Errors internal to the third-party transport, e.g., NCCL network errors

    • Actor and node failure

    • Tensors returned by the user that are located on an unsupported device, e.g., a CPU tensor when using NCCL

    • Any unexpected system bugs
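
As a hedged sketch of the application-level case (the bad_tensor method is an illustrative name, and the example relies on Ray's usual behavior of re-raising a task's exception at ray.get):

import torch
import ray
from ray.experimental.collective import create_collective_group


@ray.remote
class MyActor:
    @ray.method(tensor_transport="gloo")
    def bad_tensor(self):
        # An application-level error raised by user code.
        raise ValueError("failed to produce a tensor")

    def sum(self, tensor: torch.Tensor):
        return torch.sum(tensor)


sender, receiver = MyActor.remote(), MyActor.remote()
group = create_collective_group([sender, receiver], backend="torch_gloo")

ref = sender.bad_tensor.remote()
try:
    ray.get(receiver.sum.remote(ref))
except ValueError:
    # The error propagates to the dependent task and surfaces here; the
    # collective group itself remains usable.
    pass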

Advanced: RDT Internals#

Note

Under construction.