ray.dag.input_node.InputNode

class ray.dag.input_node.InputNode(*args, input_type: type | Dict[int | str, type] | None = None, _other_args_to_resolve=None, **kwargs)

Bases: DAGNode

Ray DAG node used in the DAG building API to mark the entrypoints of a DAG.

An entrypoint should only be a function or a class method. A DAG can have multiple entrypoints, but only one InputNode instance exists per DAG, shared among all DAGNodes.

Example:

        m1.forward
        /       \
dag_input     ensemble -> dag_output
        \       /
        m2.forward

In this pipeline, each user input is broadcast to both m1.forward and m2.forward as the first stop of the DAG. The DAG is authored as follows:

import ray
from ray.dag.input_node import InputNode

@ray.remote
class Model:
    def __init__(self, val):
        self.val = val
    def forward(self, input):
        return self.val * input

@ray.remote
def combine(a, b):
    return a + b

with InputNode() as dag_input:
    m1 = Model.bind(1)
    m2 = Model.bind(2)
    m1_output = m1.forward.bind(dag_input[0])
    m2_output = m2.forward.bind(dag_input.x)
    ray_dag = combine.bind(m1_output, m2_output)

# Pass mix of args and kwargs as input.
ray_dag.execute(1, x=2) # 1 sent to m1, 2 sent to m2

# Alternatively, the user can pass a single data object, list, or dict
# and access its contents via object attribute, list index, or dict key.
ray_dag.execute(UserDataObject(m1=1, m2=2))
# dag_input.m1, dag_input.m2
ray_dag.execute([1, 2])
# dag_input[0], dag_input[1]
ray_dag.execute({"m1": 1, "m2": 2})
# dag_input["m1"], dag_input["m2"]
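The access patterns above can be pictured with a small pure-Python model. This is a sketch for illustration only, not Ray's implementation: `pick` is a hypothetical helper, and the rule it encodes (a lone positional argument is treated as the data object; otherwise integer keys index positional arguments and string keys name keyword arguments) is an assumption drawn from the examples above.

```python
from types import SimpleNamespace


def pick(accessor, key, *args, **kwargs):
    """Hypothetical helper (not part of Ray): select one value from the
    arguments that would be passed to execute()."""
    if len(args) == 1 and not kwargs:
        # A single data object, list, or dict was passed.
        data = args[0]
        if accessor == "attr":
            return getattr(data, key)  # models dag_input.m1
        return data[key]               # models dag_input[0] or dag_input["m1"]
    # A mix of args and kwargs was passed (assumed rule):
    # integer keys index positionals, string keys name keywords.
    return args[key] if isinstance(key, int) else kwargs[key]


# execute(1, x=2): dag_input[0] and dag_input.x
first = pick("item", 0, 1, x=2)
named = pick("attr", "x", 1, x=2)

# execute(obj), execute([1, 2]), execute({"m1": 1, "m2": 2})
obj = SimpleNamespace(m1=1, m2=2)  # stand-in for a user data object
from_obj = pick("attr", "m1", obj)
from_list = pick("item", 1, [1, 2])
from_dict = pick("item", "m2", {"m1": 1, "m2": 2})
```

The model keeps attribute access and item access separate because a dict key and an object attribute can share the same string name but are reached differently.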

DeveloperAPI: This API may change across minor Ray releases.

Methods

__init__

InputNode should take only attributes used to validate and convert input data, not the input data itself.

apply_functional

Apply a given function to DAGNodes in source_input_list, and return the replaced inputs without mutating or copying any DAGNode.

apply_recursive

Apply callable on each node in this DAG in a bottom-up tree walk.

execute

Execute this DAG using the Ray default executor _execute_impl().

experimental_compile

Compile an accelerated execution path for this DAG.

get_args

Return the tuple of arguments for this node.

get_kwargs

Return the dict of keyword arguments for this node.

get_object_refs_from_last_execute

Gets cached object refs from the last call to execute().

get_options

Return the dict of options arguments for this node.

get_other_args_to_resolve

Return the dict of other args to resolve arguments for this node.

get_result_type

Get type of the output of this DAGNode.

get_stable_uuid

Return stable uuid for this node.

set_context

Set a field in the parent DAGNode attribute that can be resolved in both pickle and JSON serialization.

traverse_and_apply

Traverse all nodes in the connected component of the DAG that contains the self node, and apply the given function to each node.

with_tensor_transport

Configure the torch tensor transport for this node.
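The bottom-up tree walk mentioned under apply_recursive can be illustrated with a toy graph shaped like the example diagram. This is a minimal sketch in plain Python, not Ray's code: `ToyNode` and `apply_bottom_up` are hypothetical names, and caching results so that a shared node is visited only once is an assumption made for the illustration.

```python
from dataclasses import dataclass, field


@dataclass
class ToyNode:
    """Hypothetical stand-in for a DAG node; `children` are its upstream inputs."""
    name: str
    children: list = field(default_factory=list)


def apply_bottom_up(node, fn, cache=None):
    """Apply fn to every upstream node before applying it to `node` itself."""
    cache = {} if cache is None else cache
    if id(node) in cache:
        # Shared node (e.g. the single input node): reuse the earlier result.
        return cache[id(node)]
    child_results = [apply_bottom_up(c, fn, cache) for c in node.children]
    result = fn(node, child_results)
    cache[id(node)] = result
    return result


# Same shape as the diagram: dag_input feeds m1 and m2, which feed ensemble.
dag_input = ToyNode("dag_input")
m1 = ToyNode("m1.forward", [dag_input])
m2 = ToyNode("m2.forward", [dag_input])
ensemble = ToyNode("ensemble", [m1, m2])

order = []


def record(node, child_results):
    order.append(node.name)
    return node.name


apply_bottom_up(ensemble, record)
```

In this sketch the shared input node is recorded once, ahead of both model nodes, and the ensemble node is recorded last.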

Attributes

type_hint