Scheduling

For each task or actor, Ray chooses a node to run it on. The scheduling decision is based on the following factors.

Resources

Each task or actor has specified resource requirements. Based on these requirements, a node is in one of the following states:

  • Feasible: the node has the required resources to run the task or actor. Depending on the current availability of these resources, there are two sub-states:

    • Available: the node has the required resources and they are free now.

    • Unavailable: the node has the required resources but they are currently being used by other tasks or actors.

  • Infeasible: the node doesn’t have the required resources. For example, a CPU-only node is infeasible for a GPU task.

Resource requirements are hard requirements, meaning that only feasible nodes are eligible to run the task or actor. If there are feasible nodes, Ray will either choose an available node or wait until an unavailable node becomes available, depending on other factors discussed below. If all nodes are infeasible, the task or actor cannot be scheduled until feasible nodes are added to the cluster.
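
The node states above can be sketched as a small classification function. This is a minimal illustration, not Ray's internal code: the function name node_state and the dictionary-based resource representation are assumptions made for this example.

```python
def node_state(total, available, demand):
    """Classify a node for one task or actor (hypothetical helper, not a Ray API).

    total, available, demand: dicts mapping a resource name to a quantity.
    """
    # Infeasible: the node could never satisfy the demand, even when idle.
    if any(total.get(name, 0) < amount for name, amount in demand.items()):
        return "infeasible"
    # Available: the node is feasible and the resources are free right now.
    if all(available.get(name, 0) >= amount for name, amount in demand.items()):
        return "available"
    # Unavailable: feasible, but other work currently holds the resources.
    return "unavailable"


gpu_task = {"CPU": 1, "GPU": 1}
cpu_only_node = node_state({"CPU": 8}, {"CPU": 8}, gpu_task)
busy_gpu_node = node_state({"CPU": 8, "GPU": 1}, {"CPU": 8, "GPU": 0}, gpu_task)
idle_gpu_node = node_state({"CPU": 8, "GPU": 1}, {"CPU": 4, "GPU": 1}, gpu_task)
```

Here the CPU-only node is infeasible for the GPU task, the node whose GPU is in use is feasible but unavailable, and the node with a free GPU is available.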

Scheduling Strategies

Tasks and actors support a scheduling_strategy option that specifies how Ray picks the best node among feasible nodes. The currently supported strategies are listed below.

“DEFAULT”

"DEFAULT" is the default strategy used by Ray. With the current implementation, Ray will try to pack tasks or actors on nodes until the resource utilization is beyond a certain threshold and spread them afterwards.

Currently Ray handles actors that don’t require any resources (i.e., num_cpus=0 with no other resources) specially by randomly choosing a node in the cluster without considering resource utilization. Since nodes are randomly chosen, actors that don’t require any resources are effectively SPREAD across the cluster.

import ray


@ray.remote
def func():
    return 1


@ray.remote(num_cpus=1)
class Actor:
    pass


# If unspecified, "DEFAULT" scheduling strategy is used.
func.remote()
actor = Actor.remote()
# Explicitly set scheduling strategy to "DEFAULT".
func.options(scheduling_strategy="DEFAULT").remote()
actor = Actor.options(scheduling_strategy="DEFAULT").remote()

# Zero-CPU (and no other resources) actors are randomly assigned to nodes.
actor = Actor.options(num_cpus=0).remote()
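
The pack-then-spread behavior described for "DEFAULT" can be illustrated with a toy model. This is only a sketch under stated assumptions, not Ray's scheduler: the Node class, the pick_node function, the CPU-only accounting, and the 0.5 threshold are all chosen for this example (the text above only says "a certain threshold").

```python
from dataclasses import dataclass


@dataclass
class Node:
    # Hypothetical node record used only in this toy model.
    name: str
    total_cpus: float
    used_cpus: float = 0.0

    @property
    def utilization(self) -> float:
        return self.used_cpus / self.total_cpus


def pick_node(nodes, demand_cpus, threshold=0.5):
    """Toy pack-then-spread choice; the threshold value is an assumption."""
    candidates = [n for n in nodes if n.total_cpus - n.used_cpus >= demand_cpus]
    if not candidates:
        return None

    def score(node):
        # Below the threshold every node scores 0, so the first node in list
        # order wins (packing). At or above it, lower utilization wins (spreading).
        return 0.0 if node.utilization < threshold else node.utilization

    # min() returns the first minimal element, which gives a stable tie-break.
    return min(candidates, key=score)


nodes = [Node("A", total_cpus=4), Node("B", total_cpus=4)]
placements = []
for _ in range(6):
    chosen = pick_node(nodes, demand_cpus=1)
    chosen.used_cpus += 1
    placements.append(chosen.name)
# placements == ["A", "A", "B", "B", "A", "B"]
```

In this model the first two tasks pack onto node A. Once A reaches the threshold, new tasks go to B, and after both nodes pass the threshold the tasks alternate toward whichever node is less utilized.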

“SPREAD”

The "SPREAD" strategy tries to spread tasks or actors among available nodes.

@ray.remote(scheduling_strategy="SPREAD")
def spread_func():
    return 2


@ray.remote(num_cpus=1)
class SpreadActor:
    pass


# Spread tasks across the cluster.
[spread_func.remote() for _ in range(10)]
# Spread actors across the cluster.
actors = [SpreadActor.options(scheduling_strategy="SPREAD").remote() for _ in range(10)]

PlacementGroupSchedulingStrategy

PlacementGroupSchedulingStrategy will schedule the task or actor to where the placement group is located. This is useful for actor gang scheduling. See Placement Group for more details.
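
The idea behind gang scheduling is that a group of resource bundles is reserved together or not at all. The following toy model sketches that all-or-nothing property in plain Python. It does not use or reproduce Ray's placement group implementation: the reserve_gang function, the CPU-only bundles, and the first-fit placement are assumptions made for this example.

```python
def reserve_gang(free_cpus, bundles):
    """Reserve every bundle or none (hypothetical helper, not a Ray API).

    free_cpus: dict mapping node name to free CPUs; updated only on success.
    bundles: list of CPU amounts, one per bundle.
    Returns {bundle_index: node_name}, or None if the gang does not fit.
    """
    remaining = dict(free_cpus)  # work on a copy so a failure changes nothing
    assignment = {}
    for index, cpus in enumerate(bundles):
        # First-fit: take the first node that still has room for this bundle.
        node = next((n for n, free in remaining.items() if free >= cpus), None)
        if node is None:
            return None
        remaining[node] -= cpus
        assignment[index] = node
    free_cpus.update(remaining)  # commit the whole reservation at once
    return assignment


cluster = {"a": 2, "b": 1}
gang = reserve_gang(cluster, [1, 1, 1])     # fits: all three bundles reserved
too_big = reserve_gang({"a": 1, "b": 1}, [1, 1, 1])  # does not fit: None
```

First-fit is used only to keep the sketch short. It can miss a valid placement that a smarter search would find.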

NodeAffinitySchedulingStrategy

NodeAffinitySchedulingStrategy is a low-level strategy that allows a task or actor to be scheduled onto a particular node specified by its node id. The soft flag specifies whether the task or actor is allowed to run somewhere else if the specified node doesn’t exist (e.g., if the node dies) or is infeasible because it does not have the resources required to run the task or actor. In these cases, if soft is True, the task or actor will be scheduled onto a different feasible node. Otherwise, the task or actor will fail with TaskUnschedulableError or ActorUnschedulableError.

As long as the specified node is alive and feasible, the task or actor will only run there, regardless of the soft flag. This means that if the node currently has no available resources, the task or actor will wait until resources become available.

This strategy should only be used if other high-level scheduling strategies (e.g., placement groups) cannot give the desired task or actor placements. It has the following known limitations:

  • It’s a low-level strategy which prevents optimizations by a smart scheduler.

  • It cannot fully utilize an autoscaling cluster since node ids must be known when the tasks or actors are created.

  • It can be difficult to make the best static placement decision especially in a multi-tenant cluster: for example, an application won’t know what else is being scheduled onto the same nodes.

@ray.remote
def node_affinity_func():
    return ray.get_runtime_context().node_id


@ray.remote(num_cpus=1)
class NodeAffinityActor:
    pass


# Only run the task on the local node.
node_affinity_func.options(
    scheduling_strategy=ray.util.scheduling_strategies.NodeAffinitySchedulingStrategy(
        node_id=ray.get_runtime_context().node_id,
        soft=False,
    )
).remote()

# Run the two node_affinity_func tasks on the same node if possible.
node_affinity_func.options(
    scheduling_strategy=ray.util.scheduling_strategies.NodeAffinitySchedulingStrategy(
        node_id=ray.get(node_affinity_func.remote()),
        soft=True,
    )
).remote()

# Only run the actor on the local node.
actor = NodeAffinityActor.options(
    scheduling_strategy=ray.util.scheduling_strategies.NodeAffinitySchedulingStrategy(
        node_id=ray.get_runtime_context().node_id,
        soft=False,
    )
).remote()
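
The soft flag rules described in this section can be summarized as a small decision function. This is a plain-Python restatement of the prose, not Ray code: the function name resolve_node_affinity and its string return values are made up for this example.

```python
def resolve_node_affinity(target_alive, target_feasible, soft):
    """Restate the documented soft-flag rules (hypothetical helper, not a Ray API)."""
    if target_alive and target_feasible:
        # The task or actor runs only on the target node, waiting there if the
        # node is currently busy. The soft flag makes no difference here.
        return "run on target node"
    if soft:
        # The target is gone or infeasible: fall back to another feasible node.
        return "run on a different feasible node"
    # soft=False: fail with TaskUnschedulableError or ActorUnschedulableError.
    return "fail as unschedulable"


healthy_hard = resolve_node_affinity(True, True, soft=False)
healthy_soft = resolve_node_affinity(True, True, soft=True)
dead_soft = resolve_node_affinity(False, False, soft=True)
dead_hard = resolve_node_affinity(False, False, soft=False)
```

Note that soft only matters when the target node is dead or infeasible. A busy but feasible target never triggers the fallback.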

Locality-Aware Scheduling

By default, Ray prefers available nodes that already hold a task’s large arguments locally, to avoid transferring data over the network. If there are multiple large task arguments, the node with the most object bytes local is preferred. This takes precedence over the "DEFAULT" scheduling strategy, which means Ray will try to run the task on the locality-preferred node regardless of the node’s resource utilization. However, if the locality-preferred node is not available, Ray may run the task somewhere else. When other scheduling strategies are specified, they have higher precedence and data locality is no longer considered.

Note

Locality-aware scheduling applies only to tasks, not actors.

@ray.remote
def large_object_func():
    # A large return value is stored in the local object store
    # (distributed memory) instead of being returned inline
    # directly to the caller.
    return [1] * (1024 * 1024)


@ray.remote
def small_object_func():
    # A small return value is returned inline directly to the caller
    # instead of being stored in distributed memory.
    return [1]


@ray.remote
def consume_func(data):
    return len(data)


large_object = large_object_func.remote()
small_object = small_object_func.remote()

# Ray will try to run consume_func on the same node
# where large_object_func runs.
consume_func.remote(large_object)

# Ray will try to spread consume_func across the entire cluster
# instead of only running on the node where large_object_func runs.
[
    consume_func.options(scheduling_strategy="SPREAD").remote(large_object)
    for i in range(10)
]

# Ray won't consider locality for scheduling consume_func
# since the argument is small and will be sent to the worker node inline directly.
consume_func.remote(small_object)
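
The "most object bytes local" rule can be sketched as follows. This is a toy model of the preference described in this section, not Ray's implementation: the locality_preferred_node function and its input format are assumptions made for this example, and only arguments that live in the object store are passed in.

```python
from collections import defaultdict


def locality_preferred_node(args, available_nodes):
    """Pick the available node holding the most argument bytes (toy model).

    args: list of (size_bytes, nodes_holding_a_copy) for object-store arguments.
    available_nodes: list of node names that can run the task right now.
    Returns a node name, or None if no available node holds any argument.
    """
    local_bytes = defaultdict(int)
    for size_bytes, holders in args:
        for node in holders:
            local_bytes[node] += size_bytes
    candidates = [n for n in available_nodes if local_bytes.get(n, 0) > 0]
    if not candidates:
        return None  # no locality preference; another policy decides
    return max(candidates, key=lambda n: local_bytes[n])


args = [(100, {"n1"}), (300, {"n2"}), (250, {"n1"})]
best = locality_preferred_node(args, ["n1", "n2"])   # n1 holds 350 bytes
fallback = locality_preferred_node(args, ["n2"])     # n1 is not available
no_preference = locality_preferred_node(args, ["n3"])
```

With both nodes available, n1 wins because it holds 350 bytes against 300 on n2. If n1 is not available, the choice falls to n2, and a node holding none of the arguments yields no preference at all.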