Lazy Computation Graphs with the Ray DAG API#

With ray.remote you have the flexibility of running an application where computation is executed remotely at runtime. For a ray.remote decorated class or function, you can also use .bind on the body to build a static computation graph.

Note

Ray DAG is designed to be a developer facing API where recommended use cases are

  1. Locally iterate and test your application authored by higher level libraries.

  2. Build libraries on top of the Ray DAG APIs.

When .bind() is called on a ray.remote decorated class or function, it will generate an intermediate representation (IR) node that act as backbone and building blocks of the DAG that is statically holding the computation graph together, where each IR node is resolved to value at execution time with respect to their topological order.

The IR node can also be assigned to a variable and passed into other nodes as arguments.

Ray DAG with functions#

The IR node generated by .bind() on a ray.remote decorated function is executed as a Ray Task upon execution which will be solved to the task output.

This example shows how to build a chain of functions where each node can be executed as root node while iterating, or used as input args or kwargs of other functions to form more complex DAGs.

Any IR node can be executed directly dag_node.execute() that acts as root of the DAG, where all other non-reachable nodes from the root will be igored.

import ray

ray.init()

@ray.remote
def func(src, inc=1):
    return src + inc

a_ref = func.bind(1, inc=2)
assert ray.get(a_ref.execute()) == 3 # 1 + 2 = 3
b_ref = func.bind(a_ref, inc=3)
assert ray.get(b_ref.execute()) == 6 # (1 + 2) + 3 = 6
c_ref = func.bind(b_ref, inc=a_ref)
assert ray.get(c_ref.execute()) == 9 # ((1 + 2) + 3) + (1 + 2) = 9

Ray DAG with classes and class methods#

The IR node generated by .bind() on a ray.remote decorated class is executed as a Ray Actor upon execution. The Actor will be instantiated every time the node is executed, and the classmethod calls can form a chain of function calls specific to the parent actor instance.

DAG IR nodes generated from a function, class or classmethod can be combined together to form a DAG.

import ray

ray.init()

@ray.remote
class Actor:
    def __init__(self, init_value):
        self.i = init_value

    def inc(self, x):
        self.i += x

    def get(self):
        return self.i

a1 = Actor.bind(10)  # Instantiate Actor with init_value 10.
val = a1.get.bind()  # ClassMethod that returns value from get() from
                     # the actor created.
assert ray.get(val.execute()) == 10

@ray.remote
def combine(x, y):
    return x + y

a2 = Actor.bind(10) # Instantiate another Actor with init_value 10.
a1.inc.bind(2)  # Call inc() on the actor created with increment of 2.
a1.inc.bind(4)  # Call inc() on the actor created with increment of 4.
a2.inc.bind(6)  # Call inc() on the actor created with increment of 6.

# Combine outputs from a1.get() and a2.get()
dag = combine.bind(a1.get.bind(), a2.get.bind())

# a1 +  a2 + inc(2) + inc(4) + inc(6)
# 10 + (10 + ( 2   +    4    +   6)) = 32
assert ray.get(dag.execute()) == 32

Ray DAG with custom InputNode#

InputNode is the singleton node of a DAG that represents user input value at runtime. It should be used within a context manager with no args, and called as args of dag_node.execute()

import ray

ray.init()

from ray.dag.input_node import InputNode

@ray.remote
def a(user_input):
    return user_input * 2

@ray.remote
def b(user_input):
    return user_input + 1

@ray.remote
def c(x, y):
    return x + y

with InputNode() as dag_input:
    a_ref = a.bind(dag_input)
    b_ref = b.bind(dag_input)
    dag = c.bind(a_ref, b_ref)

#   a(2)  +   b(2)  = c
# (2 * 2) + (2 + 1)
assert ray.get(dag.execute(2)) == 7

#   a(3)  +   b(3)  = c
# (3 * 2) + (3 + 1)
assert ray.get(dag.execute(3)) == 10

Ray DAG with multiple MultiOutputNode#

MultiOutputNode is useful when you have more than 1 output from a DAG. dag_node.execute() returns a list of Ray object references passed to MultiOutputNode. The below example shows the multi output node of 2 outputs.

import ray

from ray.dag.input_node import InputNode
from ray.dag.output_node import MultiOutputNode

@ray.remote
def f(input):
    return input + 1

with InputNode() as input_data:
    dag = MultiOutputNode([f.bind(input_data["x"]), f.bind(input_data["y"])])

refs = dag.execute({"x": 1, "y": 2})
assert ray.get(refs) == [2, 3]

Reuse Ray Actors in DAGs#

Actors can be a part of the DAG definition with the Actor.bind() API. However, when a DAG finishes execution, Ray kills Actors created with bind.

You can avoid killing your Actors whenever DAG finishes by creating Actors with Actor.remote().

import ray
from ray.dag.input_node import InputNode
from ray.dag.output_node import MultiOutputNode

@ray.remote
class Worker:
    def __init__(self):
        self.forwarded = 0

    def forward(self, input_data: int):
        self.forwarded += 1
        return input_data + 1
    
    def num_forwarded(self):
        return self.forwarded
    
# Create an actor via ``remote`` API not ``bind`` API to avoid
# killing actors when a DAG is finished.
worker = Worker.remote()

with InputNode() as input_data:
    dag = MultiOutputNode([worker.forward.bind(input_data)])

# Actors are reused. The DAG definition doesn't include
# actor creation.
assert ray.get(dag.execute(1)) == [2]
assert ray.get(dag.execute(2)) == [3]
assert ray.get(dag.execute(3)) == [4]

# You can still use other actor methods via `remote` API.
assert ray.get(worker.num_forwarded.remote()) == 3

More resources#

You can find more application patterns and examples in the following resources from other Ray libraries built on top of Ray DAG API with the same mechanism.