Ray Workflows provides strong fault tolerance and exactly-once execution semantics by checkpointing. However, checkpointing could be time consuming, especially when you have large inputs and outputs for workflow tasks. When exactly-once execution semantics is not required, you can skip some checkpoints to speed up your workflow.
Checkpoints can be skipped by specifying
data = read_data.options(**workflow.options(checkpoint=False)).bind(10)
This example skips checkpointing the output of
read_data. During recovery,
read_data would be executed again if recovery requires its output.
If the output of a task is another task (i.e., for dynamic workflows), we skip checkpointing the entire task.
Use Workflows with Ray Client¶
Ray Workflows supports Ray Client API, so you can submit workflows to a remote
Ray cluster. This requires starting the Ray cluster with the
for specifying the workflow storage.
To submit a workflow to a remote cluster, all you need is connect Ray to the cluster before submitting a workflow. No code changes are required. For example:
import subprocess import ray from ray import workflow @ray.remote def hello(count): return ["hello world"] * count try: subprocess.check_call( ["ray", "start", "--head", "--ray-client-server-port=10001", "--storage=file:///tmp/ray/workflow_data"]) ray.init("ray://127.0.0.1:10001") assert workflow.run(hello.bind(3)) == ["hello world"] * 3 finally: subprocess.check_call(["ray", "stop"])
Ray client support is still experimental and has some limitations. One known limitation is that
workflows will not work properly with ObjectRefs as workflow task inputs. For example,