Fault Tolerance

Ray is a distributed system, which means failures can happen. Failures generally fall into two classes: 1) application-level failures and 2) system-level failures. The former are caused by bugs in user-level code or by failures of external systems. The latter are triggered by node failures, network failures, or bugs in Ray itself. Here, we describe the mechanisms that Ray provides to let applications recover from failures.

To handle application-level failures, Ray provides mechanisms to catch errors, retry failed code, and handle misbehaving code. See the pages for task and actor fault tolerance for more information on these mechanisms.
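For example, automatic retries for tasks and automatic restarts for actors can be enabled declaratively through remote options. A minimal sketch (the retry counts here are arbitrary):

import ray


# Retry the task up to 3 times on system failures; retry_exceptions=True
# also retries when the task's own code raises an exception.
@ray.remote(max_retries=3, retry_exceptions=True)
def flaky_task():
    return 1


# Restart the actor's process up to 3 times if it dies, and retry
# in-flight actor tasks up to 3 times after a restart.
@ray.remote(max_restarts=3, max_task_retries=3)
class RestartableActor:
    def ping(self):
        return "pong"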

Ray also provides several mechanisms to automatically recover from internal system-level failures like node failures. In particular, Ray can automatically recover from some failures in the distributed object store.
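For instance, if a task's return value stored in the distributed object store is lost (e.g., because the node holding it dies), Ray can re-execute the task that created it to reconstruct the object, a mechanism known as lineage reconstruction. A sketch of the idea (the task and retry count are illustrative):

import ray


# If the stored copy of this task's return value is lost, Ray can
# transparently re-execute the task to reconstruct the object,
# consuming one of its max_retries each time.
@ray.remote(max_retries=3)
def create_data():
    return [0] * 1_000_000  # large value, stored in the object store


data_ref = create_data.remote()
# ray.get blocks until the object is available; if it was lost,
# Ray reconstructs it behind the scenes before returning.
print(len(ray.get(data_ref)))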

How to Write Fault Tolerant Ray Applications

Follow these recommendations to make your Ray applications fault tolerant:

First, if the fault tolerance mechanisms provided by Ray don’t work for you, you can always catch exceptions caused by failures and recover manually.

import random
import sys

import ray


@ray.remote
class Actor:
    def read_only(self):

        rand = random.random()
        if rand < 0.2:
            return 2 / 0
        elif rand < 0.3:
            sys.exit(1)

        return 2


actor = Actor.remote()
# Manually retry the actor task.
while True:
    try:
        print(ray.get(actor.read_only.remote()))
        break
    except ZeroDivisionError:
        pass
    except ray.exceptions.RayActorError:
        # Manually restart the actor
        actor = Actor.remote()

Second, avoid letting an ObjectRef outlive its owner task or actor (the task or actor that creates the initial ObjectRef by calling ray.put() or foo.remote()). As long as there are still references to an object, the owner worker of the object keeps running even after the corresponding task or actor finishes. If the owner worker fails, Ray cannot automatically recover the object for any worker that later tries to access it. One way to create such an outlived object is to return an ObjectRef created by ray.put() from a task:

import ray


# Non-fault tolerant version:
@ray.remote
def a():
    x_ref = ray.put(1)
    return x_ref


x_ref = ray.get(a.remote())
# Object x outlives its owner task a.
try:
    # If the owner of x (i.e. the worker process running task a) dies,
    # the application can no longer get the value of x.
    print(ray.get(x_ref))
except ray.exceptions.OwnerDiedError:
    pass

In the above example, object x outlives its owner task a. If the worker process running task a fails, calling ray.get on x_ref afterwards will result in an OwnerDiedError exception.

The fault tolerant version returns x directly, so that the driver owns it and it is only accessed within the driver's lifetime. If x is lost, Ray can automatically recover it via lineage reconstruction. See Anti-pattern: Returning ray.put() ObjectRefs from a task harms performance and fault tolerance for more details.

# Fault tolerant version:
@ray.remote
def a():
    # Here we return the value directly instead of calling ray.put() first.
    return 1


# The owner of x is the driver
# so x is accessible and can be auto recovered
# during the entire lifetime of the driver.
x_ref = a.remote()
print(ray.get(x_ref))

Third, avoid custom resource requirements that only a particular node can satisfy. If that node fails, the running tasks or actors cannot be retried on other nodes.

@ray.remote
def b():
    return 1


# If the node with ip 127.0.0.3 fails while task b is running,
# Ray cannot retry the task on other nodes.
b.options(resources={"node:127.0.0.3": 1}).remote()

If you prefer running a task on a particular node, use the NodeAffinitySchedulingStrategy. It lets you specify the affinity as a soft constraint, so that even if the target node fails, the task can still be retried on other nodes.

# Prefer running on the particular node specified by node id
# but can also run on other nodes if the target node fails.
b.options(
    scheduling_strategy=ray.util.scheduling_strategies.NodeAffinitySchedulingStrategy(
        node_id=ray.get_runtime_context().get_node_id(), soft=True
    )
).remote()

More about Ray Fault Tolerance