Actor Fault Tolerance#
Actors can fail if the actor process dies, or if the owner of the actor
dies. The owner of an actor is the worker that originally created the actor by
calling ActorClass.remote(). Detached actors do
not have an owner process and are cleaned up when the Ray cluster is destroyed.
Actor process failure#
Ray can automatically restart actors that crash unexpectedly.
This behavior is controlled using max_restarts,
which sets the maximum number of times that an actor will be restarted.
The default value of max_restarts is 0, meaning that the actor won’t be
restarted. If set to -1, the actor will be restarted infinitely many times.
When an actor is restarted, its state will be recreated by rerunning its
constructor.
Once the actor has used up the specified number of restarts and dies again,
subsequent actor methods will raise a RayActorError.
By default, actor tasks execute with at-most-once semantics
(max_task_retries=0 in the @ray.remote decorator). This means that if an
actor task is submitted to an actor that is unreachable, Ray will report the
error with RayActorError, a Python-level exception that is thrown when
ray.get is called on the future returned by the task. Note that this
exception may be thrown even though the task did indeed execute successfully.
For example, this can happen if the actor dies immediately after executing the
task.
Ray also offers at-least-once execution semantics for actor tasks
(max_task_retries=-1 or max_task_retries > 0). This means that if an
actor task is submitted to an actor that is unreachable, the system will
automatically retry the task. With this option, the system will only throw a
RayActorError to the application if one of the following occurs: (1) the
actor’s max_restarts limit has been exceeded and the actor cannot be
restarted anymore, or (2) the max_task_retries limit has been exceeded for
this particular task. Note that if the actor is currently restarting when a
task is submitted, this counts as one retry. The retry limit can be set to
infinity with max_task_retries=-1.
You can experiment with this behavior by running the following code.
import os
import ray

ray.init()

# This actor kills itself after executing 10 tasks.
@ray.remote(max_restarts=4, max_task_retries=-1)
class Actor:
    def __init__(self):
        self.counter = 0

    def increment_and_possibly_fail(self):
        # Exit after every 10 tasks.
        if self.counter == 10:
            os._exit(0)
        self.counter += 1
        return self.counter

actor = Actor.remote()

# The actor will be reconstructed up to 4 times, so we can execute up to 50
# tasks successfully. The actor is reconstructed by rerunning its constructor.
# Methods that were executing when the actor died will be retried and will not
# raise a `RayActorError`. Retried methods may execute twice, once on the
# failed actor and a second time on the restarted actor.
for _ in range(50):
    counter = ray.get(actor.increment_and_possibly_fail.remote())
    print(counter)  # Prints the sequence 1-10 5 times.

# After the actor has been restarted 4 times, all subsequent methods will
# raise a `RayActorError`.
for _ in range(10):
    try:
        counter = ray.get(actor.increment_and_possibly_fail.remote())
        print(counter)  # Unreachable.
    except ray.exceptions.RayActorError:
        print("FAILURE")  # Prints 10 times.
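The figure of 50 follows from the restart budget: the original actor plus 4 restarts gives 5 incarnations, and each serves 10 calls before exiting. The toy model below reproduces that accounting in plain Python. It is only a sketch of the bookkeeping, not Ray's implementation; `ToyActor`, `ActorDied`, and `run_calls` are hypothetical names introduced for illustration, and unlimited task retries are assumed.

```python
class ActorDied(Exception):
    """Stands in for the actor process exiting in the middle of a call."""


class ToyActor:
    def __init__(self):
        self.counter = 0

    def increment_and_possibly_fail(self):
        # Mirrors the example: "exit" on the call after the tenth success.
        if self.counter == 10:
            raise ActorDied()
        self.counter += 1
        return self.counter


def run_calls(num_calls, max_restarts):
    """Return one entry per call: the counter value, or "FAILURE"."""
    actor = ToyActor()
    restarts_used = 0
    results = []
    for _ in range(num_calls):
        while True:
            if actor is None:
                # Restart budget exhausted: every call now fails.
                results.append("FAILURE")
                break
            try:
                results.append(actor.increment_and_possibly_fail())
                break
            except ActorDied:
                if restarts_used < max_restarts:
                    restarts_used += 1
                    actor = ToyActor()  # Restart: rerun the constructor.
                    # Fall through to the loop, which retries the same call.
                else:
                    actor = None


results = None  # Placeholder so the name exists before the run below.


def _run_and_collect():
    calls = []
    actor = ToyActor()
    restarts_used = 0
    for _ in range(60):
        while True:
            if actor is None:
                calls.append("FAILURE")
                break
            try:
                calls.append(actor.increment_and_possibly_fail())
                break
            except ActorDied:
                if restarts_used < 4:
                    restarts_used += 1
                    actor = ToyActor()
                else:
                    actor = None
    return calls


results = _run_and_collect()
successes = [r for r in results if r != "FAILURE"]
print(len(successes))  # 50
print(results[:12])    # [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 1, 2]
print(results[50:])    # ten "FAILURE" entries
```

The eleventh call triggers the first exit, is retried on the fresh incarnation, and returns 1, which is why the sequence 1-10 repeats rather than skipping a value.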
For at-least-once actors, the system will still guarantee execution ordering
according to the initial submission order. For example, any tasks submitted
after a failed actor task will not execute on the actor until the failed actor
task has been successfully retried. The system will not attempt to re-execute
any tasks that executed successfully before the failure
(unless max_task_retries is nonzero and the task is needed for object
reconstruction).
Note
For async or threaded actors, tasks might be executed out of order. Upon actor restart, the system will only retry incomplete tasks. Previously completed tasks will not be re-executed.
At-least-once execution is best suited for read-only actors or actors with ephemeral state that does not need to be rebuilt after a failure. For actors that have critical state, the application is responsible for recovering the state, e.g., by taking periodic checkpoints and recovering from the checkpoint upon actor restart.
Actor checkpointing#
max_restarts automatically restarts the crashed actor,
but it doesn’t automatically restore application level state in your actor.
Instead, you should manually checkpoint your actor’s state and recover upon actor restart.
For actors that are restarted manually, the actor’s creator should manage the checkpoint and manually restart and recover the actor upon failure. This is recommended if you want the creator to decide when the actor should be restarted and/or if the creator is coordinating actor checkpoints with other execution:
import os
import sys
import ray
import json
import tempfile
import shutil

@ray.remote(num_cpus=1)
class Worker:
    def __init__(self):
        self.state = {"num_tasks_executed": 0}

    def execute_task(self, crash=False):
        if crash:
            sys.exit(1)
        # Execute the task
        # ...
        # Update the internal state
        self.state["num_tasks_executed"] = self.state["num_tasks_executed"] + 1

    def checkpoint(self):
        return self.state

    def restore(self, state):
        self.state = state

class Controller:
    def __init__(self):
        self.worker = Worker.remote()
        self.worker_state = ray.get(self.worker.checkpoint.remote())

    def execute_task_with_fault_tolerance(self):
        i = 0
        while True:
            i = i + 1
            try:
                ray.get(self.worker.execute_task.remote(crash=(i % 2 == 1)))
                # Checkpoint the latest worker state
                self.worker_state = ray.get(self.worker.checkpoint.remote())
                return
            except ray.exceptions.RayActorError:
                print("Actor crashes, restarting...")
                # Restart the actor and restore the state
                self.worker = Worker.remote()
                ray.get(self.worker.restore.remote(self.worker_state))

controller = Controller()
controller.execute_task_with_fault_tolerance()
controller.execute_task_with_fault_tolerance()
assert ray.get(controller.worker.checkpoint.remote())["num_tasks_executed"] == 2
Alternatively, if you are using Ray’s automatic actor restart, the actor can checkpoint itself manually and restore from a checkpoint in the constructor:
@ray.remote(max_restarts=-1, max_task_retries=-1)
class ImmortalActor:
    def __init__(self, checkpoint_file):
        self.checkpoint_file = checkpoint_file
        if os.path.exists(self.checkpoint_file):
            # Restore from a checkpoint
            with open(self.checkpoint_file, "r") as f:
                self.state = json.load(f)
        else:
            self.state = {}

    def update(self, key, value):
        import random
        if random.randrange(10) < 5:
            sys.exit(1)
        self.state[key] = value
        # Checkpoint the latest state
        with open(self.checkpoint_file, "w") as f:
            json.dump(self.state, f)

    def get(self, key):
        return self.state[key]

checkpoint_dir = tempfile.mkdtemp()
actor = ImmortalActor.remote(os.path.join(checkpoint_dir, "checkpoint.json"))
ray.get(actor.update.remote("1", 1))
ray.get(actor.update.remote("2", 2))
assert ray.get(actor.get.remote("1")) == 1
shutil.rmtree(checkpoint_dir)
Note
If the checkpoint is saved to external storage, make sure it’s accessible to the entire cluster since the actor can be restarted on a different node. For example, save the checkpoint to cloud storage (e.g., S3) or a shared directory (e.g., via NFS).
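Because an actor process can exit at any moment, a crash in the middle of writing a checkpoint could leave a truncated file that fails to load on restart. A common mitigation, offered here as a suggestion rather than something the examples above do, is to write to a temporary file and rename it into place. The sketch below assumes a filesystem where rename is atomic (true for `os.replace` on a local POSIX filesystem; verify this for network or object storage). `save_checkpoint` and `load_checkpoint` are hypothetical helper names.

```python
import json
import os
import tempfile


def save_checkpoint(path, state):
    """Write state as JSON so a reader sees either the old or the new file."""
    directory = os.path.dirname(os.path.abspath(path))
    # Create the temp file in the target directory so the rename stays
    # on one filesystem.
    fd, tmp_path = tempfile.mkstemp(dir=directory, suffix=".tmp")
    try:
        with os.fdopen(fd, "w") as f:
            json.dump(state, f)
            f.flush()
            os.fsync(f.fileno())
        os.replace(tmp_path, path)
    except BaseException:
        # Remove the partial temp file, then re-raise the original error.
        if os.path.exists(tmp_path):
            os.remove(tmp_path)
        raise


def load_checkpoint(path):
    """Return the saved state, or an empty dict if no checkpoint exists."""
    if not os.path.exists(path):
        return {}
    with open(path, "r") as f:
        return json.load(f)


checkpoint_dir = tempfile.mkdtemp()
checkpoint_path = os.path.join(checkpoint_dir, "checkpoint.json")

print(load_checkpoint(checkpoint_path))  # {}
save_checkpoint(checkpoint_path, {"1": 1})
save_checkpoint(checkpoint_path, {"1": 1, "2": 2})
print(load_checkpoint(checkpoint_path))  # {'1': 1, '2': 2}
```

An actor could call `load_checkpoint` in its constructor and `save_checkpoint` at the end of each state-changing method, in the same places where the `ImmortalActor` example reads and writes its file.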
Actor creator failure#
For non-detached actors, the owner of an actor is the
worker that created it, i.e. the worker that called ActorClass.remote(). Similar to
objects, if the owner of an actor dies, then
the actor will also fate-share with the owner. Ray will not automatically
recover an actor whose owner is dead, even if it has a nonzero
max_restarts.
Since detached actors do not have an owner, they will still be restarted by Ray even if their original creator dies. Detached actors will continue to be automatically restarted until the maximum restarts is exceeded, the actor is destroyed, or until the Ray cluster is destroyed.
You can try out this behavior in the following code.
import ray
import os
import signal

ray.init()

@ray.remote(max_restarts=-1)
class Actor:
    def ping(self):
        return "hello"

@ray.remote
class Parent:
    def generate_actors(self):
        self.child = Actor.remote()
        self.detached_actor = Actor.options(name="actor", lifetime="detached").remote()
        return self.child, self.detached_actor, os.getpid()

parent = Parent.remote()
actor, detached_actor, pid = ray.get(parent.generate_actors.remote())

os.kill(pid, signal.SIGKILL)

try:
    print("actor.ping:", ray.get(actor.ping.remote()))
except ray.exceptions.RayActorError as e:
    print("Failed to submit actor call", e)
# Failed to submit actor call The actor died unexpectedly before finishing this task.
# class_name: Actor
# actor_id: 56f541b178ff78470f79c3b601000000
# namespace: ea8b3596-7426-4aa8-98cc-9f77161c4d5f
# The actor is dead because because all references to the actor were removed.

try:
    print("detached_actor.ping:", ray.get(detached_actor.ping.remote()))
except ray.exceptions.RayActorError as e:
    print("Failed to submit detached actor call", e)
# detached_actor.ping: hello
Force-killing a misbehaving actor#
Sometimes application-level code can cause an actor to hang or leak resources.
In these cases, Ray allows you to recover from the failure by manually
terminating the actor. You can do this by calling
ray.kill on any handle to the actor. Note that it does not need to be the
original handle to the actor.
If max_restarts is set, you can also allow Ray to automatically restart the actor by passing no_restart=False
to ray.kill.
Actor method exceptions#
Sometimes you want to retry when an actor method raises an exception. Use max_task_retries
together with retry_exceptions to do so.
Note that retrying on user-raised exceptions is disabled by default. Before enabling it, make sure the method is idempotent, that is, invoking it multiple times should be equivalent to invoking it only once.
You can set retry_exceptions in the @ray.method(retry_exceptions=...)
decorator, or in .options(retry_exceptions=...) on the method call.
Retry behavior depends on the value you set retry_exceptions to:
- retry_exceptions == False (default): No retries for user exceptions.
- retry_exceptions == True: Ray retries a method on user exception up to max_task_retries times.
- retry_exceptions is a list of exceptions: Ray retries a method on user exception up to max_task_retries times, only if the method raises an exception from these specific classes.
max_task_retries applies to both exceptions and actor crashes. A Ray actor can set this option to apply to all of its methods. A method can also set an overriding option for itself. Ray searches for the first non-default value of max_task_retries in this order:
1. The method call’s value, for example, actor.method.options(max_task_retries=2). Ray ignores this value if you don’t set it.
2. The method definition’s value, for example, @ray.method(max_task_retries=2). Ray ignores this value if you don’t set it.
3. The actor creation call’s value, for example, Actor.options(max_task_retries=2). Ray ignores this value if you didn’t set it.
4. The Actor class definition’s value, for example, the @ray.remote(max_task_retries=2) decorator. Ray ignores this value if you didn’t set it.
5. The default value, 0.
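The lookup order can be pictured with a small plain-Python model. This is only an illustration of the precedence described above, not Ray's code: `resolve_max_task_retries` and its parameter names are hypothetical, and the model uses `None` to mean "not set", which is an assumption about how unset levels are represented.

```python
def resolve_max_task_retries(
    method_call=None,
    method_definition=None,
    actor_creation=None,
    actor_class=None,
):
    """Return the first value that was set, checked from most to least specific."""
    for value in (method_call, method_definition, actor_creation, actor_class):
        if value is not None:
            return value
    return 0  # Nothing was set anywhere: fall back to the default.


# The method call overrides everything below it.
print(resolve_max_task_retries(method_call=2, actor_class=5))        # 2
# With no method-level value, the actor creation call wins over the class.
print(resolve_max_task_retries(actor_creation=-1, actor_class=5))    # -1
# Only the class decorator set a value.
print(resolve_max_task_retries(actor_class=5))                       # 5
# Nothing set at any level.
print(resolve_max_task_retries())                                    # 0
```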
For example, if a method sets max_task_retries=5 and retry_exceptions=True, and the actor sets max_restarts=2, Ray executes the method up to 6 times: once for the initial invocation, and 5 additional retries. The 6 invocations may include 2 actor crashes. After the 6th invocation, a ray.get
call to the result Ray ObjectRef raises the exception raised in the last invocation, or ray.exceptions.RayActorError
if the actor crashed in the last invocation.
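The arithmetic in this example can be checked with a toy model that walks through a scripted sequence of attempt outcomes. It is a simplified sketch, not Ray's scheduler: `simulate` is a hypothetical function, retry_exceptions=True and a non-negative max_task_retries are assumed, and the returned strings are labels only.

```python
def simulate(outcomes, max_task_retries, max_restarts):
    """Walk scripted attempt outcomes ("ok", "exception", or "crash").

    Returns (attempts_made, final_result).
    """
    attempts_allowed = 1 + max_task_retries  # Initial invocation plus retries.
    restarts_used = 0
    attempts = 0
    last = None
    for outcome in outcomes[:attempts_allowed]:
        attempts += 1
        last = outcome
        if outcome == "ok":
            return attempts, "ok"
        if outcome == "crash":
            if restarts_used == max_restarts:
                # The actor cannot be restarted anymore.
                return attempts, "RayActorError"
            restarts_used += 1
    # Retry budget exhausted: report what the last attempt did.
    return attempts, "RayActorError" if last == "crash" else "user exception"


# Six failing attempts that include two crashes: all six run, and the
# caller sees the exception from the last one.
script = ["exception", "crash", "exception", "crash", "exception", "exception"]
print(simulate(script, max_task_retries=5, max_restarts=2))
# (6, 'user exception')

# Same budget, but the last attempt crashes the actor.
print(simulate(["exception"] * 5 + ["crash"], max_task_retries=5, max_restarts=2))
# (6, 'RayActorError')

# A third crash exceeds max_restarts=2, so retries stop early.
print(simulate(["crash"] * 6, max_task_retries=5, max_restarts=2))
# (3, 'RayActorError')
```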