Quickstart
Hello World
This “hello world” example uses Ray Compiled Graph. First, install Ray.
pip install "ray[adag]"
Next, define a simple actor that returns its input unchanged.
import ray
import time
@ray.remote
class SimpleActor:
    def echo(self, msg):
        return msg
Create an instance of the actor and call it with the classic Ray Core APIs, remote and ray.get, timing a single round trip.
a = SimpleActor.remote()
# warmup
for _ in range(5):
    msg_ref = a.echo.remote("hello")
    ray.get(msg_ref)
start = time.perf_counter()
msg_ref = a.echo.remote("hello")
ray.get(msg_ref)
end = time.perf_counter()
print(f"Execution takes {(end - start) * 1000 * 1000} us")
Execution takes 969.0364822745323 us
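A single timed call can be noisy, so it may help to take several samples and report the median. The following is a minimal sketch rather than part of the Ray API: the helper name time_call_us and its parameters are hypothetical, and a plain Python callable stands in for the actor call so that the snippet runs without a Ray cluster.

```python
import statistics
import time


def time_call_us(fn, warmup=5, repeats=100):
    """Return the median latency of fn() in microseconds.

    Hypothetical helper for illustration; not part of Ray.
    """
    # Warm up so one-time setup costs do not skew the samples.
    for _ in range(warmup):
        fn()

    samples = []
    for _ in range(repeats):
        start = time.perf_counter()
        fn()
        samples.append((time.perf_counter() - start) * 1_000_000)
    return statistics.median(samples)


# Stand-in workload; replace it with the call you want to measure.
latency_us = time_call_us(lambda: sum(range(100)))
print(f"Median latency: {latency_us:.2f} us")
```

With the actor defined above, the callable could be lambda: ray.get(a.echo.remote("hello")).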
Now create an equivalent program using Ray Compiled Graph. Note four key differences from the classic Ray Core APIs:
- Create a static DAG using the with InputNode() as inp: context manager.
- Use bind instead of remote.
- Use the experimental_compile API for compilation.
- Use execute to execute the DAG.
Define a graph and compile it using the experimental_compile API.
import ray.dag
with ray.dag.InputNode() as inp:
    # Note that it uses `bind` instead of `remote`.
    dag = a.echo.bind(inp)
# experimental_compile is the key for optimizing the performance.
dag = dag.experimental_compile()
Next, execute the DAG and measure the performance.
# warmup
for _ in range(5):
    msg_ref = dag.execute("hello")
    ray.get(msg_ref)
start = time.perf_counter()
# `dag.execute` runs the DAG and returns a future. You can call `ray.get` on it.
msg_ref = dag.execute("hello")
ray.get(msg_ref)
end = time.perf_counter()
print(f"Execution takes {(end - start) * 1000 * 1000} us")
Execution takes 86.72196418046951 us
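As a quick check of the arithmetic, the two timings printed above can be compared directly. The values below are copied from the sample outputs in this section, the variable names are only illustrative, and the numbers will differ on other machines.

```python
# Sample timings copied from the two runs shown above (microseconds).
classic_us = 969.0364822745323    # classic Ray Core: remote + ray.get
compiled_us = 86.72196418046951   # Compiled Graph: execute + ray.get

speedup = classic_us / compiled_us
saved_us = classic_us - compiled_us

print(f"Speedup: {speedup:.1f}x")                       # Speedup: 11.2x
print(f"Overhead removed per call: {saved_us:.0f} us")  # Overhead removed per call: 882 us
```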
The same DAG runs roughly 10X faster. The improvement is this large because echo is cheap, so its latency is dominated by system overhead. Due to various bookkeeping and distributed protocols, the classic Ray Core APIs usually carry 1 ms or more of system overhead per call.
Because the DAG is known ahead of time, Compiled Graph can pre-allocate all necessary resources and greatly reduce that overhead.
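The idea of paying setup costs once instead of on every call can be illustrated with a toy, single-process analogy. This is not how Ray implements Compiled Graph: the function and class names below are hypothetical, and threads and queues only stand in for the workers and channels a real system would set up.

```python
import queue
import threading


def echo(msg):
    return msg


def call_with_per_call_setup(msg):
    """Create a fresh queue and thread for every call (setup paid each time)."""
    out = queue.Queue(maxsize=1)
    worker = threading.Thread(target=lambda: out.put(echo(msg)))
    worker.start()
    worker.join()
    return out.get()


class PreallocatedPipeline:
    """Create one worker thread and two queues up front, then reuse them."""

    _STOP = object()  # sentinel that tells the worker loop to exit

    def __init__(self, fn):
        self._fn = fn
        self._inbox = queue.Queue(maxsize=1)
        self._outbox = queue.Queue(maxsize=1)
        self._worker = threading.Thread(target=self._loop, daemon=True)
        self._worker.start()

    def _loop(self):
        while True:
            item = self._inbox.get()
            if item is self._STOP:
                break
            self._outbox.put(self._fn(item))

    def execute(self, msg):
        # No per-call setup: only a put and a get on existing queues.
        self._inbox.put(msg)
        return self._outbox.get()

    def teardown(self):
        self._inbox.put(self._STOP)
        self._worker.join()


pipeline = PreallocatedPipeline(echo)
result_per_call = call_with_per_call_setup("hello")
result_preallocated = pipeline.execute("hello")
pipeline.teardown()
print(result_per_call, result_preallocated)
```

Both paths return the same value; the difference is that the second one does its allocation once, before the first call, which is the same trade-off a statically known graph makes possible.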
GPU to GPU communication
Consider a very simple GPU to GPU example. With a type hint, Compiled Graph can prepare the NCCL communicator and the required operations ahead of time, which avoids deadlocks and lets compute overlap with communication.
Ray Compiled Graph uses the cupy library under the hood to support NCCL operations, so the NCCL version depends on the installed cupy version. The Ray team also plans to support custom communicators in the future, for example to support collectives across CPUs or to reuse existing collective groups.
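Because the NCCL version follows the cupy build, it can be useful to check both before running the GPU example. The sketch below assumes that the installed cupy exposes cupy.cuda.nccl.get_version(); the helper name describe_cupy_nccl is hypothetical, and the function returns None when cupy or its NCCL bindings cannot be imported.

```python
def describe_cupy_nccl():
    """Return (cupy_version, nccl_version_code), or None if unavailable.

    Hypothetical helper for illustration; not part of Ray.
    """
    try:
        import cupy
        from cupy.cuda import nccl
    except ImportError:
        # cupy is not installed, or its NCCL bindings are missing.
        return None
    return cupy.__version__, nccl.get_version()


info = describe_cupy_nccl()
if info is None:
    print("cupy with NCCL support is not available in this environment")
else:
    print(f"cupy {info[0]}, NCCL version code {info[1]}")
```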
Next, create sender and receiver actors.
import ray
import ray.dag
import torch
from ray.experimental.channel.torch_tensor_type import TorchTensorType
ray.init()
# Note that the following example requires at least 2 GPUs.
# Default to 0 so the check fails cleanly when the cluster reports no GPUs.
assert ray.available_resources().get("GPU", 0) >= 2, "At least 2 GPUs are required to run this example."
@ray.remote(num_gpus=1)
class GPUSender:
    def send(self, shape):
        # Allocate the tensor directly on this actor's GPU.
        return torch.zeros(shape, device="cuda")
@ray.remote(num_gpus=1)
class GPUReceiver:
    def recv(self, tensor: torch.Tensor):
        # The received tensor should already live on a GPU.
        assert tensor.device.type == "cuda"
        return tensor.shape
sender = GPUSender.remote()
receiver = GPUReceiver.remote()
To support GPU to GPU RDMA with NCCL, use the with_type_hint API with Compiled Graph.
with ray.dag.InputNode() as inp:
    dag = sender.send.bind(inp)
    # It gives a type hint that the return value of `send` should use
    # NCCL.
    dag = dag.with_type_hint(TorchTensorType(transport="nccl"))
    dag = receiver.recv.bind(dag)
# The compile API prepares the NCCL communicator across all workers and schedules
# operations accordingly.
dag = dag.experimental_compile()
assert ray.get(dag.execute((10, ))) == (10, )