Actor Fault Tolerance#

Actors can fail if the actor process dies, or if the owner of the actor dies. The owner of an actor is the worker that originally created the actor by calling ActorClass.remote(). Detached actors do not have an owner process and are cleaned up when the Ray cluster is destroyed.

Actor process failure#

Ray can automatically restart actors that crash unexpectedly. This behavior is controlled using max_restarts, which sets the maximum number of times that an actor will be restarted. The default value of max_restarts is 0, meaning that the actor won’t be restarted. If set to -1, the actor will be restarted indefinitely. When an actor is restarted, its state will be recreated by rerunning its constructor. Once the actor has used up its restarts and fails again, subsequent actor method calls raise a RayActorError.

By default, actor tasks execute with at-most-once semantics (max_task_retries=0 in the @ray.remote decorator). This means that if an actor task is submitted to an actor that is unreachable, Ray will report the error with RayActorError, a Python-level exception that is thrown when ray.get is called on the future returned by the task. Note that this exception may be thrown even though the task did indeed execute successfully. For example, this can happen if the actor dies immediately after executing the task.

Ray also offers at-least-once execution semantics for actor tasks (max_task_retries=-1 or max_task_retries > 0). This means that if an actor task is submitted to an actor that is unreachable, the system will automatically retry the task. With this option, the system will only throw a RayActorError to the application if one of the following occurs: (1) the actor’s max_restarts limit has been exceeded and the actor cannot be restarted anymore, or (2) the max_task_retries limit has been exceeded for this particular task. Note that if the actor is currently restarting when a task is submitted, this will count for one retry. The retry limit can be set to infinity with max_task_retries = -1.

You can experiment with this behavior by running the following code.

import os
import ray

ray.init()

# This actor kills itself after executing 10 tasks.
@ray.remote(max_restarts=4, max_task_retries=-1)
class Actor:
    def __init__(self):
        self.counter = 0

    def increment_and_possibly_fail(self):
        # Exit after every 10 tasks.
        if self.counter == 10:
            os._exit(0)
        self.counter += 1
        return self.counter

actor = Actor.remote()

# The actor will be reconstructed up to 4 times, so we can execute up to 50
# tasks successfully. The actor is reconstructed by rerunning its constructor.
# Methods that were executing when the actor died will be retried and will not
# raise a `RayActorError`. Retried methods may execute twice, once on the
# failed actor and a second time on the restarted actor.
for _ in range(50):
    counter = ray.get(actor.increment_and_possibly_fail.remote())
    print(counter)  # Prints the sequence 1-10 5 times.

# After the actor has been restarted 4 times, all subsequent methods will
# raise a `RayActorError`.
for _ in range(10):
    try:
        counter = ray.get(actor.increment_and_possibly_fail.remote())
        print(counter)  # Unreachable.
    except ray.exceptions.RayActorError:
        print("FAILURE")  # Prints 10 times.

For at-least-once actors, the system will still guarantee execution ordering according to the initial submission order. For example, any tasks submitted after a failed actor task will not execute on the actor until the failed actor task has been successfully retried. The system will not attempt to re-execute any tasks that executed successfully before the failure (unless max_task_retries is nonzero and the task is needed for object reconstruction).

Note

For async or threaded actors, tasks might be executed out of order. Upon actor restart, the system will only retry incomplete tasks. Previously completed tasks will not be re-executed.

At-least-once execution is best suited for read-only actors or actors with ephemeral state that does not need to be rebuilt after a failure. For actors that have critical state, the application is responsible for recovering the state, e.g., by taking periodic checkpoints and recovering from the checkpoint upon actor restart.

Actor checkpointing#

max_restarts automatically restarts the crashed actor, but it doesn’t automatically restore application level state in your actor. Instead, you should manually checkpoint your actor’s state and recover upon actor restart.

For actors that are restarted manually, the actor’s creator should manage the checkpoint and manually restart and recover the actor upon failure. This is recommended if you want the creator to decide when the actor should be restarted and/or if the creator is coordinating actor checkpoints with other execution:

import os
import sys
import ray
import json
import tempfile
import shutil


@ray.remote(num_cpus=1)
class Worker:
    def __init__(self):
        self.state = {"num_tasks_executed": 0}

    def execute_task(self, crash=False):
        if crash:
            sys.exit(1)

        # Execute the task
        # ...

        # Update the internal state
        self.state["num_tasks_executed"] = self.state["num_tasks_executed"] + 1

    def checkpoint(self):
        return self.state

    def restore(self, state):
        self.state = state


class Controller:
    def __init__(self):
        self.worker = Worker.remote()
        self.worker_state = ray.get(self.worker.checkpoint.remote())

    def execute_task_with_fault_tolerance(self):
        i = 0
        while True:
            i = i + 1
            try:
                ray.get(self.worker.execute_task.remote(crash=(i % 2 == 1)))
                # Checkpoint the latest worker state
                self.worker_state = ray.get(self.worker.checkpoint.remote())
                return
            except ray.exceptions.RayActorError:
                print("Actor crashes, restarting...")
                # Restart the actor and restore the state
                self.worker = Worker.remote()
                ray.get(self.worker.restore.remote(self.worker_state))


controller = Controller()
controller.execute_task_with_fault_tolerance()
controller.execute_task_with_fault_tolerance()
assert ray.get(controller.worker.checkpoint.remote())["num_tasks_executed"] == 2

Alternatively, if you are using Ray’s automatic actor restart, the actor can checkpoint itself manually and restore from a checkpoint in the constructor:

@ray.remote(max_restarts=-1, max_task_retries=-1)
class ImmortalActor:
    def __init__(self, checkpoint_file):
        self.checkpoint_file = checkpoint_file

        if os.path.exists(self.checkpoint_file):
            # Restore from a checkpoint
            with open(self.checkpoint_file, "r") as f:
                self.state = json.load(f)
        else:
            self.state = {}

    def update(self, key, value):
        import random

        if random.randrange(10) < 5:
            sys.exit(1)

        self.state[key] = value

        # Checkpoint the latest state
        with open(self.checkpoint_file, "w") as f:
            json.dump(self.state, f)

    def get(self, key):
        return self.state[key]


checkpoint_dir = tempfile.mkdtemp()
actor = ImmortalActor.remote(os.path.join(checkpoint_dir, "checkpoint.json"))
ray.get(actor.update.remote("1", 1))
ray.get(actor.update.remote("2", 2))
assert ray.get(actor.get.remote("1")) == 1
shutil.rmtree(checkpoint_dir)

Note

If the checkpoint is saved to external storage, make sure it’s accessible to the entire cluster since the actor can be restarted on a different node. For example, save the checkpoint to cloud storage (e.g., S3) or a shared directory (e.g., via NFS).

Actor creator failure#

For non-detached actors, the owner of an actor is the worker that created it, i.e. the worker that called ActorClass.remote(). Similar to objects, if the owner of an actor dies, then the actor will also fate-share with the owner. Ray will not automatically recover an actor whose owner is dead, even if it has a nonzero max_restarts.

Since detached actors do not have an owner, they will still be restarted by Ray even if their original creator dies. Detached actors will continue to be automatically restarted until the maximum number of restarts is exceeded, the actor is destroyed, or the Ray cluster is destroyed.

You can try out this behavior in the following code.

import ray
import os
import signal
ray.init()

@ray.remote(max_restarts=-1)
class Actor:
    def ping(self):
        return "hello"

@ray.remote
class Parent:
    def generate_actors(self):
        self.child = Actor.remote()
        self.detached_actor = Actor.options(name="actor", lifetime="detached").remote()
        return self.child, self.detached_actor, os.getpid()

parent = Parent.remote()
actor, detached_actor, pid = ray.get(parent.generate_actors.remote())

os.kill(pid, signal.SIGKILL)

try:
    print("actor.ping:", ray.get(actor.ping.remote()))
except ray.exceptions.RayActorError as e:
    print("Failed to submit actor call", e)
# Failed to submit actor call The actor died unexpectedly before finishing this task.
# 	class_name: Actor
# 	actor_id: 56f541b178ff78470f79c3b601000000
# 	namespace: ea8b3596-7426-4aa8-98cc-9f77161c4d5f
# The actor is dead because because all references to the actor were removed.

try:
    print("detached_actor.ping:", ray.get(detached_actor.ping.remote()))
except ray.exceptions.RayActorError as e:
    print("Failed to submit detached actor call", e)
# detached_actor.ping: hello

Force-killing a misbehaving actor#

Sometimes application-level code can cause an actor to hang or leak resources. In these cases, Ray allows you to recover from the failure by manually terminating the actor. You can do this by calling ray.kill on any handle to the actor. Note that it does not need to be the original handle to the actor.

If max_restarts is set, you can also allow Ray to automatically restart the actor by passing no_restart=False to ray.kill.

Unavailable actors#

When an actor can’t accept method calls, a ray.get on the method’s returned object reference may raise ActorUnavailableError. This exception indicates the actor isn’t accessible at the moment, but may recover after waiting and retrying. Typical cases include:

  • The actor is restarting. For example, it’s waiting for resources or running the class constructor during the restart.

  • The actor is experiencing transient network issues, like connection outages.

  • The actor is dead, but the death hasn’t yet been reported to the system.

Actor method calls are executed at-most-once. When a ray.get() call raises the ActorUnavailableError exception, there’s no guarantee on whether the actor executed the task or not. If the method has side effects, they may or may not be observable. Ray does guarantee that the method won’t be executed twice, unless the actor or the method is configured with retries, as described in the next section.

The actor may or may not recover in the next calls. Those subsequent calls may raise ActorDiedError if the actor is confirmed dead, ActorUnavailableError if it’s still unreachable, or return values normally if the actor recovered.

As a best practice, if the caller gets the ActorUnavailableError error, it should “quarantine” the actor and stop sending traffic to the actor. It can then periodically ping the actor until it raises ActorDiedError or returns OK.
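The quarantine-and-ping practice can be sketched as a small polling loop. This is a minimal illustration, not Ray code: it runs without a cluster, and `Unavailable`, `Died`, `probe_until_settled`, and `fake_ping` are hypothetical names. In a Ray program, `ping` would wrap something like a `ray.get` on a cheap actor method, and the two stand-in exceptions would correspond to ActorUnavailableError and ActorDiedError.

```python
import time


class Unavailable(Exception):
    """Stand-in for ActorUnavailableError (hypothetical, for illustration)."""


class Died(Exception):
    """Stand-in for ActorDiedError (hypothetical, for illustration)."""


def probe_until_settled(ping, attempts=5, interval_s=0.0):
    """Ping a quarantined actor until it recovers or is confirmed dead.

    Returns "recovered", "dead", or "still_unavailable".
    """
    for _ in range(attempts):
        try:
            ping()
            return "recovered"
        except Died:
            return "dead"
        except Unavailable:
            # Still unreachable: wait, then probe again.
            time.sleep(interval_s)
    return "still_unavailable"


# Simulated actor that is unreachable for two pings, then answers.
responses = iter([Unavailable(), Unavailable(), "OK"])


def fake_ping():
    item = next(responses)
    if isinstance(item, Exception):
        raise item
    return item


status = probe_until_settled(fake_ping)
print(status)  # recovered
```

While the loop reports "still_unavailable", the caller would keep the actor out of rotation and route traffic elsewhere.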

If a task has max_task_retries > 0 and it received ActorUnavailableError, Ray will retry the task up to max_task_retries times. If the actor is restarting in its constructor, the task retry will fail, consuming one retry count. If there are still retries remaining, Ray will retry again after RAY_task_retry_delay_ms, until all retries are consumed or the actor is ready to accept tasks. If the constructor takes a long time to run, consider increasing max_task_retries or increasing RAY_task_retry_delay_ms.
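To see why a slow constructor can exhaust the retry budget, here is a deliberately simplified model. It does not reproduce Ray's scheduler: the helper `task_outcome` is hypothetical, the numbers are illustrative, and the sketch assumes attempts are spaced exactly one retry delay apart and ignores all other latency.

```python
def task_outcome(constructor_ms, max_task_retries, retry_delay_ms):
    """Return how many attempts the task used, or None if it never got through.

    Simplified model: attempt k (0 is the initial call) happens at
    k * retry_delay_ms, and an attempt succeeds once the constructor is done.
    """
    for attempt in range(max_task_retries + 1):
        if attempt * retry_delay_ms >= constructor_ms:
            return attempt + 1
    return None


# A 10-second constructor with 3 retries spaced 1 second apart: budget exhausted.
print(task_outcome(constructor_ms=10_000, max_task_retries=3, retry_delay_ms=1_000))  # None

# More retries cover the restart window.
print(task_outcome(constructor_ms=10_000, max_task_retries=10, retry_delay_ms=1_000))  # 11

# So does a longer delay between retries.
print(task_outcome(constructor_ms=10_000, max_task_retries=3, retry_delay_ms=5_000))  # 3
```

In this model, either knob widens the window during which the task can still land on the restarted actor.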

Actor method exceptions#

Sometimes you want to retry when an actor method raises an exception. Use max_task_retries together with retry_exceptions to enable this.

Note that retrying on user-raised exceptions is disabled by default. Before you enable it, make sure the method is idempotent, that is, invoking it multiple times should be equivalent to invoking it only once.

You can set retry_exceptions in the @ray.method(retry_exceptions=...) decorator, or in the options(retry_exceptions=...) in the method call.

Retry behavior depends on the value you set retry_exceptions to:

  • retry_exceptions == False (default): No retries for user exceptions.

  • retry_exceptions == True: Ray retries a method on user exception up to max_task_retries times.

  • retry_exceptions is a list of exceptions: Ray retries a method on user exception up to max_task_retries times, only if the method raises an exception from these specific classes.

max_task_retries applies to both exceptions and actor crashes. A Ray actor can set this option to apply to all of its methods. A method can also set an overriding option for itself. Ray searches for the first non-default value of max_task_retries in this order:

  • The method call’s value, for example, actor.method.options(max_task_retries=2). Ray ignores this value if you don’t set it.

  • The method definition’s value, for example, @ray.method(max_task_retries=2). Ray ignores this value if you don’t set it.

  • The actor creation call’s value, for example, Actor.options(max_task_retries=2). Ray ignores this value if you don’t set it.

  • The Actor class definition’s value, for example, the @ray.remote(max_task_retries=2) decorator. Ray ignores this value if you don’t set it.

  • The default value, 0.
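The lookup order above can be modeled as a first-match search. This is an illustrative sketch only, not Ray's implementation: `resolve_max_task_retries` and its parameter names are hypothetical, and `None` stands for "not set". How Ray treats an explicitly passed default value is not modeled here.

```python
def resolve_max_task_retries(call=None, method=None, creation=None, cls=None):
    """Return the first value that was set, searching from the most specific
    level (the method call) to the least specific (the class definition).

    Falls back to the default of 0 when no level sets a value.
    """
    for value in (call, method, creation, cls):
        if value is not None:
            return value
    return 0


# The method definition overrides the class definition.
print(resolve_max_task_retries(method=2, cls=5))  # 2

# Only the class definition sets a value.
print(resolve_max_task_retries(cls=5))  # 5

# Nothing is set anywhere, so the default applies.
print(resolve_max_task_retries())  # 0
```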

For example, if a method sets max_task_retries=5 and retry_exceptions=True, and the actor sets max_restarts=2, Ray executes the method up to 6 times: once for the initial invocation, and 5 additional retries. The 6 invocations may include 2 actor crashes. After the 6th invocation, calling ray.get on the method’s ObjectRef raises the exception from the last invocation, or ray.exceptions.RayActorError if the actor crashed in the last invocation.
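The invocation count can be checked with a small pure-Python model of the three retry_exceptions modes. It is a sketch of the semantics described in this section, not Ray's implementation: `run_with_retries` and `always_fails` are hypothetical names, and actor crashes are left out, so only user exceptions are modeled.

```python
def run_with_retries(method, max_task_retries, retry_exceptions=False):
    """Model of retrying a method on user exceptions.

    retry_exceptions may be False, True, or a list of exception classes.
    max_task_retries=-1 means retry without limit.
    Returns (result, invocations), or re-raises the last exception.
    """
    invocations = 0
    while True:
        invocations += 1
        try:
            return method(), invocations
        except Exception as exc:
            if retry_exceptions is True:
                retryable = True
            elif retry_exceptions:
                retryable = isinstance(exc, tuple(retry_exceptions))
            else:
                retryable = False
            retries_used = invocations - 1
            exhausted = max_task_retries != -1 and retries_used >= max_task_retries
            if not retryable or exhausted:
                raise


calls = []


def always_fails():
    calls.append(1)
    raise ValueError("boom")


try:
    run_with_retries(always_fails, max_task_retries=5, retry_exceptions=True)
except ValueError:
    pass

print(len(calls))  # 6: one initial invocation plus 5 retries
```

Passing a list such as `[KeyError]` for `retry_exceptions` would make the same call give up after the first `ValueError`, because that exception class isn't in the list.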