Task Fault Tolerance#
Tasks can fail due to application-level errors (e.g., Python-level exceptions) or system-level failures (e.g., a machine failing). Here, we describe the mechanisms that an application developer can use to recover from these errors.
Catching application-level failures#
Ray surfaces application-level failures as Python-level exceptions. When a task
on a remote worker or actor fails due to a Python-level exception, Ray wraps
the original exception in a RayTaskError and stores this as the task's
return value. This wrapped exception is thrown to any worker that tries
to get the result, either by calling ray.get or when the worker is executing
another task that depends on the object. If the user's exception type can be subclassed,
the raised exception is an instance of both RayTaskError and the user's exception type,
so the user can try-except either of them. Otherwise, the wrapped exception is just
a RayTaskError, and the actual user exception type can be accessed via the cause
field of the RayTaskError.
import ray

@ray.remote
def f():
    raise Exception("the real error")

@ray.remote
def g(x):
    return

try:
    ray.get(f.remote())
except ray.exceptions.RayTaskError as e:
    print(e)
    # ray::f() (pid=71867, ip=XXX.XX.XXX.XX)
    #   File "errors.py", line 5, in f
    #     raise Exception("the real error")
    # Exception: the real error

try:
    ray.get(g.remote(f.remote()))
except ray.exceptions.RayTaskError as e:
    print(e)
    # ray::g() (pid=73085, ip=128.32.132.47)
    # At least one of the input arguments for this task could not be computed:
    # ray.exceptions.RayTaskError: ray::f() (pid=73085, ip=XXX.XX.XXX.XX)
    #   File "errors.py", line 5, in f
    #     raise Exception("the real error")
    # Exception: the real error
Here is example code that catches the user's exception type when the exception type can be subclassed:
class MyException(Exception):
    ...

@ray.remote
def raises_my_exc():
    raise MyException("a user exception")

try:
    ray.get(raises_my_exc.remote())
except MyException as e:
    print(e)
    # ray::raises_my_exc() (pid=15329, ip=127.0.0.1)
    #   File "<$PWD>/task_exceptions.py", line 45, in raises_my_exc
    #     raise MyException("a user exception")
    # MyException: a user exception
Here is example code that accesses the user's exception type when the exception type cannot be subclassed:
class MyFinalException(Exception):
    def __init_subclass__(cls, /, *args, **kwargs):
        raise TypeError("Cannot subclass this little exception class.")

@ray.remote
def raises_my_final_exc():
    raise MyFinalException("a *final* user exception")

try:
    ray.get(raises_my_final_exc.remote())
except ray.exceptions.RayTaskError as e:
    assert isinstance(e.cause, MyFinalException)
    print(e)
    # 2024-04-08 21:11:47,417 WARNING exceptions.py:177 -- User exception type <class '__main__.MyFinalException'> in RayTaskError can not be subclassed! This exception will be raised as RayTaskError only. You can use `ray_task_error.cause` to access the user exception. Failure in subclassing: Cannot subclass this little exception class.
    # ray::raises_my_final_exc() (pid=88226, ip=127.0.0.1)
    #   File "<$PWD>/task_exceptions.py", line 66, in raises_my_final_exc
    #     raise MyFinalException("a *final* user exception")
    # MyFinalException: a *final* user exception
    print(type(e.cause))
    # <class '__main__.MyFinalException'>
    print(e.cause)
    # a *final* user exception
If Ray can't serialize the user's exception, it converts the exception to a RayError.
import threading

class UnserializableException(Exception):
    def __init__(self):
        self.lock = threading.Lock()

@ray.remote
def raise_unserializable_error():
    raise UnserializableException

try:
    ray.get(raise_unserializable_error.remote())
except ray.exceptions.RayTaskError as e:
    print(e)
    # ray::raise_unserializable_error() (pid=328577, ip=172.31.5.154)
    #   File "/home/ubuntu/ray/tmp~/main.py", line 25, in raise_unserializable_error
    #     raise UnserializableException
    # UnserializableException
    print(type(e.cause))
    # <class 'ray.exceptions.RayError'>
    print(e.cause)
    # The original cause of the RayTaskError (<class '__main__.UnserializableException'>) isn't serializable: cannot pickle '_thread.lock' object. Overwriting the cause to a RayError.
Use ray list tasks from the State API CLI to query task exit details:
# This API is only available when you download Ray via `pip install "ray[default]"`
ray list tasks
======== List: 2023-05-26 10:32:00.962610 ========
Stats:
------------------------------
Total: 3
Table:
------------------------------
TASK_ID ATTEMPT_NUMBER NAME STATE JOB_ID ACTOR_ID TYPE FUNC_OR_CLASS_NAME PARENT_TASK_ID NODE_ID WORKER_ID ERROR_TYPE
0 16310a0f0a45af5cffffffffffffffffffffffff01000000 0 f FAILED 01000000 NORMAL_TASK f ffffffffffffffffffffffffffffffffffffffff01000000 767bd47b72efb83f33dda1b661621cce9b969b4ef00788140ecca8ad b39e3c523629ab6976556bd46be5dbfbf319f0fce79a664122eb39a9 TASK_EXECUTION_EXCEPTION
1 c2668a65bda616c1ffffffffffffffffffffffff01000000 0 g FAILED 01000000 NORMAL_TASK g ffffffffffffffffffffffffffffffffffffffff01000000 767bd47b72efb83f33dda1b661621cce9b969b4ef00788140ecca8ad b39e3c523629ab6976556bd46be5dbfbf319f0fce79a664122eb39a9 TASK_EXECUTION_EXCEPTION
2 c8ef45ccd0112571ffffffffffffffffffffffff01000000 0 f FAILED 01000000 NORMAL_TASK f ffffffffffffffffffffffffffffffffffffffff01000000 767bd47b72efb83f33dda1b661621cce9b969b4ef00788140ecca8ad b39e3c523629ab6976556bd46be5dbfbf319f0fce79a664122eb39a9 TASK_EXECUTION_EXCEPTION
Retrying failed tasks#
When a worker is executing a task, if the worker dies unexpectedly, either
because the process crashed or because the machine failed, Ray will rerun
the task until either the task succeeds or the maximum number of retries is
exceeded. The default number of retries is 3 and can be overridden by
specifying max_retries
in the @ray.remote
decorator. Specifying -1
allows infinite retries, and 0 disables retries. To override the default number
of retries for all tasks submitted, set the OS environment variable
RAY_TASK_MAX_RETRIES
. e.g., by passing this to your driver script or by
using runtime environments.
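For example, a minimal sketch of setting this variable for all tasks through a job-level runtime environment (the value 5 here is only an illustration) could look like the following:

import ray

# All tasks submitted in this job inherit RAY_TASK_MAX_RETRIES=5 from the
# job-level runtime environment, so their default number of retries is 5
# unless a task overrides it with max_retries in its @ray.remote decorator.
ray.init(runtime_env={"env_vars": {"RAY_TASK_MAX_RETRIES": "5"}})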
You can experiment with the retry behavior by running the following code.
import numpy as np
import os
import ray
import time

ray.init(ignore_reinit_error=True)

@ray.remote(max_retries=1)
def potentially_fail(failure_probability):
    time.sleep(0.2)
    if np.random.random() < failure_probability:
        os._exit(0)
    return 0

for _ in range(3):
    try:
        # If this task crashes, Ray will retry it up to one additional
        # time. If either of the attempts succeeds, the call to ray.get
        # below will return normally. Otherwise, it will raise an
        # exception.
        ray.get(potentially_fail.remote(0.5))
        print('SUCCESS')
    except ray.exceptions.WorkerCrashedError:
        print('FAILURE')
When a task returns a result in the Ray object store, it is possible for the
resulting object to be lost after the original task has already finished.
In these cases, Ray will also try to automatically recover the object by
re-executing the tasks that created the object. This can be configured through
the same max_retries
option described here. See object fault
tolerance for more information.
By default, Ray will not retry tasks upon exceptions thrown by application
code. However, you may control whether application-level errors are retried,
and even which application-level errors are retried, via the
retry_exceptions
argument. This is False
by default. To enable retries
upon application-level errors, set retry_exceptions=True
to retry upon any
exception, or pass a list of retryable exceptions. An example is shown below.
import numpy as np
import os
import ray
import time

ray.init(ignore_reinit_error=True)

class RandomError(Exception):
    pass

@ray.remote(max_retries=1, retry_exceptions=True)
def potentially_fail(failure_probability):
    if failure_probability < 0 or failure_probability > 1:
        raise ValueError(
            "failure_probability must be between 0 and 1, but got: "
            f"{failure_probability}"
        )
    time.sleep(0.2)
    if np.random.random() < failure_probability:
        raise RandomError("Failed!")
    return 0

for _ in range(3):
    try:
        # If this task crashes, Ray will retry it up to one additional
        # time. If either of the attempts succeeds, the call to ray.get
        # below will return normally. Otherwise, it will raise an
        # exception.
        ray.get(potentially_fail.remote(0.5))
        print('SUCCESS')
    except RandomError:
        print('FAILURE')

# Provide the exceptions that we want to retry as an allowlist.
retry_on_exception = potentially_fail.options(retry_exceptions=[RandomError])
try:
    # This will fail since we're passing in -1 for the failure_probability,
    # which will raise a ValueError in the task and does not match the RandomError
    # exception that we provided.
    ray.get(retry_on_exception.remote(-1))
except ValueError:
    print("FAILED AS EXPECTED")
else:
    raise RuntimeError("An exception should be raised so this shouldn't be reached.")

# These will retry on the RandomError exception.
for _ in range(3):
    try:
        # If this task crashes, Ray will retry it up to one additional
        # time. If either of the attempts succeeds, the call to ray.get
        # below will return normally. Otherwise, it will raise an
        # exception.
        ray.get(retry_on_exception.remote(0.5))
        print('SUCCESS')
    except RandomError:
        print('FAILURE AFTER RETRIES')
Use ray list tasks -f task_id=<task_id> from the State API CLI to see a task's attempts, failures, and retries:
# This API is only available when you download Ray via `pip install "ray[default]"`
ray list tasks -f task_id=16310a0f0a45af5cffffffffffffffffffffffff01000000
======== List: 2023-05-26 10:38:08.809127 ========
Stats:
------------------------------
Total: 2
Table:
------------------------------
TASK_ID ATTEMPT_NUMBER NAME STATE JOB_ID ACTOR_ID TYPE FUNC_OR_CLASS_NAME PARENT_TASK_ID NODE_ID WORKER_ID ERROR_TYPE
0 16310a0f0a45af5cffffffffffffffffffffffff01000000 0 potentially_fail FAILED 01000000 NORMAL_TASK potentially_fail ffffffffffffffffffffffffffffffffffffffff01000000 94909e0958e38d10d668aa84ed4143d0bf2c23139ae1a8b8d6ef8d9d b36d22dbf47235872ad460526deaf35c178c7df06cee5aa9299a9255 WORKER_DIED
1 16310a0f0a45af5cffffffffffffffffffffffff01000000 1 potentially_fail FINISHED 01000000 NORMAL_TASK potentially_fail ffffffffffffffffffffffffffffffffffffffff01000000 94909e0958e38d10d668aa84ed4143d0bf2c23139ae1a8b8d6ef8d9d 22df7f2a9c68f3db27498f2f435cc18582de991fbcaf49ce0094ddb0
Cancelling misbehaving tasks#
If a task is hanging, you may want to cancel the task to continue to make
progress. You can do this by calling ray.cancel
on an ObjectRef
returned by the task. By default, this will send a KeyboardInterrupt to the
task’s worker if it is mid-execution. Passing force=True
to ray.cancel
will force-exit the worker. See the API reference for ray.cancel for more details.
Note that currently, Ray will not automatically retry tasks that have been cancelled.
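A minimal sketch of cancelling a hanging task is shown below; the blocking_task function and its sleep duration are only illustrative, and the exact exception raised by ray.get can depend on when the cancellation takes effect.

import time
import ray

@ray.remote
def blocking_task():
    # Simulates a hanging task.
    time.sleep(10_000)

obj_ref = blocking_task.remote()

# By default, this sends a KeyboardInterrupt to the worker that is
# executing the task. Pass force=True to force-exit the worker instead.
ray.cancel(obj_ref)

try:
    ray.get(obj_ref)
except (ray.exceptions.TaskCancelledError, ray.exceptions.RayTaskError) as e:
    print("Task was cancelled:", e)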
Sometimes, application-level code may cause memory leaks on a worker after
repeated task executions, e.g., due to bugs in third-party libraries.
To make progress in these cases, you can set the max_calls
option in a
task’s @ray.remote
decorator. Once a worker has executed this many
invocations of the given remote function, it will automatically exit. By
default, max_calls
is set to infinity.
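A minimal sketch of using max_calls is shown below; the function name and the limit of 5 calls are only illustrative.

import ray

# After a worker process has executed this remote function 5 times, that
# worker exits and Ray starts a fresh worker for subsequent invocations,
# releasing any memory leaked by repeated executions.
@ray.remote(max_calls=5)
def leaky_task():
    # Imagine this calls into a third-party library that leaks memory.
    return 0

results = ray.get([leaky_task.remote() for _ in range(20)])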