Fault tolerance#
Ray is a distributed system, and that means failures can happen. Generally, Ray classifies failures into two classes: 1. application-level failures 2. system-level failures Bugs in user-level code or external system failures trigger application-level failures. Node failures, network failures, or just bugs in Ray trigger system-level failures. The following section contains the mechanisms that Ray provides to allow applications to recover from failures.
To handle application-level failures, Ray provides mechanisms to catch errors, retry failed code, and handle misbehaving code. See the pages for task and actor fault tolerance for more information on these mechanisms.
Ray also provides several mechanisms to automatically recover from internal system-level failures like node failures. In particular, Ray can automatically recover from some failures in the distributed object store.
How to write fault tolerant Ray applications#
There are several recommendations to make Ray applications fault tolerant:
First, if the fault tolerance mechanisms provided by Ray don’t work for you, you can always catch exceptions caused by failures and recover manually.
@ray.remote
class Actor:
    def read_only(self):
        import sys
        import random
        rand = random.random()
        if rand < 0.2:
            return 2 / 0
        elif rand < 0.3:
            sys.exit(1)
        return 2
actor = Actor.remote()
# Manually retry the actor task.
while True:
    try:
        print(ray.get(actor.read_only.remote()))
        break
    except ZeroDivisionError:
        pass
    except ray.exceptions.RayActorError:
        # Manually restart the actor
        actor = Actor.remote()
Second, avoid letting an ObjectRef outlive its owner task or actor
(the task or actor that creates the initial ObjectRef by calling ray.put() or foo.remote()).
As long as there are still references to an object,
the owner worker of the object keeps running even after the corresponding task or actor finishes.
If the owner worker fails, Ray cannot recover the object automatically for those who try to access the object.
One example of creating such outlived objects is returning ObjectRef created by ray.put() from a task:
import ray
# Non-fault tolerant version:
@ray.remote
def a():
    x_ref = ray.put(1)
    return x_ref
x_ref = ray.get(a.remote())
# Object x outlives its owner task A.
try:
    # If owner of x (i.e. the worker process running task A) dies,
    # the application can no longer get value of x.
    print(ray.get(x_ref))
except ray.exceptions.OwnerDiedError:
    pass
In the preceding example, object x outlives its owner task a.
If the worker process running task a fails, calling ray.get on x_ref afterwards results in an OwnerDiedError exception.
The following example is a fault tolerant version which returns x directly. In this example, the driver owns x and you only access it within the lifetime of the driver.
If x is lost, Ray can automatically recover it via lineage reconstruction.
See Anti-pattern: Returning ray.put() ObjectRefs from a task harms performance and fault tolerance for more details.
# Fault tolerant version:
@ray.remote
def a():
    # Here we return the value directly instead of calling ray.put() first.
    return 1
# The owner of x is the driver
# so x is accessible and can be auto recovered
# during the entire lifetime of the driver.
x_ref = a.remote()
print(ray.get(x_ref))
Third, avoid using custom resource requirements that only particular nodes can satisfy. If that particular node fails, Ray won’t retry the running tasks or actors.
@ray.remote
def b():
    return 1
# If the node with ip 127.0.0.3 fails while task b is running,
# Ray cannot retry the task on other nodes.
b.options(resources={"node:127.0.0.3": 1}).remote()
If you prefer running a task on a particular node, you can use the NodeAffinitySchedulingStrategy.
It allows you to specify the affinity as a soft constraint so even if the target node fails, the task can still be retried on other nodes.
# Prefer running on the particular node specified by node id
# but can also run on other nodes if the target node fails.
b.options(
    scheduling_strategy=ray.util.scheduling_strategies.NodeAffinitySchedulingStrategy(
        node_id=ray.get_runtime_context().get_node_id(), soft=True
    )
).remote()