Advanced Topics

Skipping Checkpoints

Ray Workflows provides strong fault tolerance and exactly-once execution semantics through checkpointing. However, checkpointing can be time-consuming, especially when workflow tasks have large inputs and outputs. When exactly-once execution semantics are not required, you can skip some checkpoints to speed up your workflow.

Checkpoints can be skipped by passing checkpoint=False to workflow.options():

data = read_data.options(**workflow.options(checkpoint=False)).bind(10)

This example skips checkpointing the output of read_data. During recovery, read_data is executed again only if recovery needs its output.
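To make the recovery behavior concrete, here is a toy, pure-Python model. It is a hypothetical sketch, not Ray's implementation: ToyWorkflow, run_task, pipeline, and run_with_recovery are illustrative names, and a dict stands in for workflow storage. A task's output is persisted only when checkpointing is enabled, and a recovery attempt reuses whatever checkpoints survived. read_data has checkpointing disabled, so whether it re-runs depends on whether a downstream checkpoint already covers its output.

```python
from typing import Callable, Dict, List, Optional, Tuple


class ToyWorkflow:
    """Hypothetical stand-in for a workflow runtime; NOT Ray's implementation."""

    def __init__(self) -> None:
        self.storage: Dict[str, object] = {}  # simulated durable workflow storage
        self.executions: List[str] = []       # every task body that actually ran

    def run_task(self, name: str, fn: Callable[[], object], checkpoint: bool = True) -> object:
        # On recovery, a checkpointed output is loaded instead of recomputed.
        if name in self.storage:
            return self.storage[name]
        self.executions.append(name)
        result = fn()
        # Only checkpointed tasks persist their output; checkpoint=False skips this.
        if checkpoint:
            self.storage[name] = result
        return result


def pipeline(wf: ToyWorkflow, crash_in: Optional[str]) -> object:
    def read_data() -> List[int]:
        return list(range(10))

    def process() -> int:
        # Plays the role of a read_data task bound with checkpoint=False.
        data = wf.run_task("read_data", read_data, checkpoint=False)
        if crash_in == "process":
            raise RuntimeError("simulated crash in process")
        return sum(data)

    def finish() -> int:
        total = wf.run_task("process", process)  # checkpointed (the default)
        if crash_in == "finish":
            raise RuntimeError("simulated crash in finish")
        return total * 2

    return wf.run_task("finish", finish)


def run_with_recovery(crash_in: str) -> Tuple[object, int]:
    wf = ToyWorkflow()
    try:
        pipeline(wf, crash_in)          # first attempt crashes
    except RuntimeError:
        pass
    result = pipeline(wf, None)         # recovery reuses surviving checkpoints
    return result, wf.executions.count("read_data")


for where in ("process", "finish"):
    result, reads = run_with_recovery(where)
    print(f"crash in {where}: result={result}, read_data ran {reads}x")
```

A crash in process means no checkpoint covers read_data's output, so it runs twice. A crash in finish happens after process was checkpointed, so recovery loads that output and read_data runs only once.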

If the output of a task is another task (i.e., for dynamic workflows), we skip checkpointing the entire task.

Use Workflows with Ray Client

Ray Workflows supports the Ray Client API, so you can submit workflows to a remote Ray cluster. This requires starting the Ray cluster with the --storage=<storage_uri> option to specify the workflow storage.

To submit a workflow to a remote cluster, all you need to do is connect Ray to the cluster before submitting the workflow. No code changes are required. For example:

import subprocess
import ray
from ray import workflow

@ray.remote
def hello(count):
    return ["hello world"] * count

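try:
    # Start a head node with the Ray Client server on port 10001 and a
    # local directory as the workflow storage.
    subprocess.check_call(
        ["ray", "start", "--head", "--ray-client-server-port=10001", "--storage=file:///tmp/ray/workflow_data"])
    # Connect through Ray Client; the workflow code itself is unchanged.
    ray.init("ray://127.0.0.1:10001")
    assert workflow.run(hello.bind(3)) == ["hello world"] * 3
finally:
    # Shut down the cluster started above.
    subprocess.check_call(["ray", "stop"])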

Warning

Ray Client support is still experimental and has some limitations. One known limitation is that workflows do not work properly when ObjectRefs are passed as workflow task inputs, as in workflow.run(task.bind(ray.put(123))).