Ray Workflows API
Workflow Execution API
- ray.workflow.run(dag: ray.dag.dag_node.DAGNode, *args, workflow_id: Optional[str] = None, metadata: Optional[Dict[str, Any]] = None, **kwargs) -> Any
Run a workflow.
If the workflow with the given id already exists, it will be resumed.
Examples
>>> from typing import Any, List
>>> import ray
>>> from ray import workflow
>>> Flight, Reservation, Trip = ...
>>> @ray.remote
... def book_flight(origin: str, dest: str) -> Flight:
...     return Flight(...)
>>> @ray.remote
... def book_hotel(location: str) -> Reservation:
...     return Reservation(...)
>>> @ray.remote
... def finalize_trip(bookings: List[Any]) -> Trip:
...     return Trip(...)
>>> flight1 = book_flight.bind("OAK", "SAN")
>>> flight2 = book_flight.bind("SAN", "OAK")
>>> hotel = book_hotel.bind("SAN")
>>> trip = finalize_trip.bind([flight1, flight2, hotel])
>>> result = workflow.run(trip)
- Parameters
workflow_id – A unique identifier that can be used to resume the workflow. If not specified, a random id will be generated.
metadata – The metadata to add to the workflow. It must be JSON-serializable.
- Returns
The result of running the workflow.
PublicAPI (alpha): This API is in alpha and may change before becoming stable.
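The metadata argument of ray.workflow.run has to be serializable to JSON. One way to catch a bad value before submitting a workflow is to try encoding it first. The following is a minimal sketch using only the standard library; the helper name check_json_serializable is hypothetical and is not part of ray.workflow.

```python
import json
from typing import Any, Dict


def check_json_serializable(metadata: Dict[str, Any]) -> Dict[str, Any]:
    """Return metadata unchanged if json.dumps accepts it, else raise ValueError.

    Hypothetical helper for illustration; not part of ray.workflow.
    """
    try:
        json.dumps(metadata)
    except (TypeError, ValueError) as exc:
        raise ValueError(f"metadata is not JSON-serializable: {exc}") from exc
    return metadata


# Plain strings and numbers encode without trouble.
good = check_json_serializable({"owner": "alice", "retries": 3})

# A set has no JSON representation, so the check rejects it.
try:
    check_json_serializable({"tags": {"a", "b"}})
    rejected = False
except ValueError:
    rejected = True
```

A value that passes this check could then be handed to workflow.run(..., metadata=good).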
- ray.workflow.run_async(dag: ray.dag.dag_node.DAGNode, *args, workflow_id: Optional[str] = None, metadata: Optional[Dict[str, Any]] = None, **kwargs) -> ray.ObjectRef
Run a workflow asynchronously.
If the workflow with the given id already exists, it will be resumed.
- Parameters
workflow_id – A unique identifier that can be used to resume the workflow. If not specified, a random id will be generated.
metadata – The metadata to add to the workflow. It must be JSON-serializable.
- Returns
The result of running the workflow, as a ray.ObjectRef.
PublicAPI (alpha): This API is in alpha and may change before becoming stable.
Management API
- ray.workflow.resume(workflow_id: str) -> Any
Resume a workflow.
Resume a workflow and retrieve its output. If the workflow was incomplete, it will be re-executed from its checkpointed outputs. If the workflow was complete, returns the result immediately.
Examples
>>> import ray
>>> from ray import workflow
>>> start_trip = ...
>>> trip = start_trip.bind()
>>> res1 = workflow.run_async(trip, workflow_id="trip1")
>>> res2 = workflow.resume("trip1")
>>> assert ray.get(res1) == res2
- Parameters
workflow_id – The id of the workflow to resume.
- Returns
The output of the workflow.
PublicAPI (alpha): This API is in alpha and may change before becoming stable.
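The resume behaviour described above (an incomplete workflow re-executes from its checkpointed outputs, a complete one returns immediately) can be pictured with a toy model. The sketch below is plain Python and is not how Ray implements checkpointing: the dictionary standing in for storage, the run_or_resume function, and the two sample tasks are all assumptions made for illustration.

```python
from typing import Callable, Dict, List, Tuple

# Stand-in for durable storage: task name -> checkpointed output.
Checkpoints = Dict[str, int]


def run_or_resume(
    tasks: List[Tuple[str, Callable[[int], int]]],
    checkpoints: Checkpoints,
    calls: List[str],
) -> int:
    """Run tasks in order, feeding each result into the next task.

    Tasks whose output is already checkpointed are skipped.
    """
    value = 0
    for name, fn in tasks:
        if name in checkpoints:
            value = checkpoints[name]  # reuse the saved output
            continue
        calls.append(name)  # record each execution attempt
        value = fn(value)
        checkpoints[name] = value  # checkpoint only after success
    return value


attempts = {"double": 0}


def add_one(x: int) -> int:
    return x + 1


def double(x: int) -> int:
    # Fail on the first attempt to simulate a crash mid-workflow.
    attempts["double"] += 1
    if attempts["double"] == 1:
        raise RuntimeError("simulated crash")
    return x * 2


pipeline = [("add_one", add_one), ("double", double)]
store: Checkpoints = {}
calls: List[str] = []

try:
    run_or_resume(pipeline, store, calls)
except RuntimeError:
    pass  # add_one was checkpointed before double failed

result = run_or_resume(pipeline, store, calls)  # only double runs again
again = run_or_resume(pipeline, store, calls)   # complete: nothing runs
```

In this model add_one executes once even though the pipeline is started three times, which mirrors the idea that resuming does not redo checkpointed work.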
- ray.workflow.resume_async(workflow_id: str) -> ray.ObjectRef
Resume a workflow asynchronously.
Resume a workflow and retrieve its output. If the workflow was incomplete, it will be re-executed from its checkpointed outputs. If the workflow was complete, returns the result immediately.
Examples
>>> import ray
>>> from ray import workflow
>>> start_trip = ...
>>> trip = start_trip.bind()
>>> res1 = workflow.run_async(trip, workflow_id="trip1")
>>> res2 = workflow.resume_async("trip1")
>>> assert ray.get(res1) == ray.get(res2)
- Parameters
workflow_id – The id of the workflow to resume.
- Returns
An object reference that can be used to retrieve the workflow result.
PublicAPI (alpha): This API is in alpha and may change before becoming stable.
- ray.workflow.resume_all(include_failed: bool = False) -> List[Tuple[str, ray.ObjectRef]]
Resume all resumable workflow jobs.
This can be used after a cluster restart to resume all resumable workflows.
- Parameters
include_failed – Whether to resume FAILED workflows.
Examples
>>> import ray
>>> from ray import workflow
>>> failed_job = ...
>>> workflow_task = failed_job.bind()
>>> output = workflow.run_async(
...     workflow_task, workflow_id="failed_job")
>>> try:
...     ray.get(output)
... except Exception:
...     print("JobFailed")
>>> jobs = workflow.list_all()
>>> assert jobs == [("failed_job", workflow.FAILED)]
>>> # resume_all returns a list of (workflow_id, object_ref) pairs
>>> resumed = dict(workflow.resume_all(include_failed=True))
>>> assert resumed.get("failed_job") is not None
- Returns
A list of (workflow_id, returned_obj_ref) tuples, one for each resumed workflow.
PublicAPI (alpha): This API is in alpha and may change before becoming stable.
- ray.workflow.list_all(status_filter: Optional[Union[ray.workflow.common.WorkflowStatus, str, Set[Union[ray.workflow.common.WorkflowStatus, str]]]] = None) -> List[Tuple[str, ray.workflow.common.WorkflowStatus]]
List all workflows matching a given status filter. When returning "RESUMABLE" workflows, workflows that were running are listed before workflows that were pending.
- Parameters
status_filter – If given, only return workflows with that status. This can be a single status or a set of statuses. The string form of a status is also accepted, namely "RUNNING", "FAILED", "SUCCESSFUL", "CANCELED", "RESUMABLE", or "PENDING".
Examples
>>> import ray
>>> from ray import workflow
>>> long_running_job = ...
>>> workflow_task = long_running_job.bind()
>>> wf = workflow.run_async(workflow_task,
...     workflow_id="long_running_job")
>>> jobs = workflow.list_all()
>>> assert jobs == [("long_running_job", workflow.RUNNING)]
>>> ray.get(wf)
>>> jobs = workflow.list_all({workflow.RUNNING})
>>> assert jobs == []
>>> jobs = workflow.list_all(workflow.SUCCESSFUL)
>>> assert jobs == [
...     ("long_running_job", workflow.SUCCESSFUL)]
- Returns
A list of (workflow id, workflow status) tuples.
PublicAPI (alpha): This API is in alpha and may change before becoming stable.
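The status_filter parameter of list_all accepts three shapes: a single status, its string form, or a set mixing both. A rough sketch of how such an argument could be normalized into one set of statuses follows. The Status enum here is a hypothetical stand-in for ray.workflow.common.WorkflowStatus, normalize_filter is not a Ray function, and treating None as "no filtering" is an assumption.

```python
from enum import Enum
from typing import Optional, Set, Union


class Status(Enum):
    """Hypothetical stand-in for ray.workflow.common.WorkflowStatus."""

    RUNNING = "RUNNING"
    FAILED = "FAILED"
    SUCCESSFUL = "SUCCESSFUL"
    CANCELED = "CANCELED"
    RESUMABLE = "RESUMABLE"
    PENDING = "PENDING"


StatusLike = Union[Status, str]


def normalize_filter(
    status_filter: Optional[Union[StatusLike, Set[StatusLike]]] = None,
) -> Set[Status]:
    """Turn a single status, a status string, or a set of either into a set."""
    if status_filter is None:
        return set(Status)  # assumption: no filter means every status
    if isinstance(status_filter, (Status, str)):
        status_filter = {status_filter}
    # Status(...) accepts both members and their string values,
    # and raises ValueError for an unknown string.
    return {Status(s) for s in status_filter}


single = normalize_filter("RUNNING")
mixed = normalize_filter({Status.FAILED, "PENDING"})
everything = normalize_filter()
```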
- ray.workflow.get_status(workflow_id: str) -> ray.workflow.common.WorkflowStatus
Get the status for a given workflow.
- Parameters
workflow_id – The workflow to query.
Examples
>>> from ray import workflow
>>> trip = ...
>>> workflow_task = trip.bind()
>>> output = workflow.run(workflow_task, workflow_id="trip")
>>> assert workflow.SUCCESSFUL == workflow.get_status("trip")
- Returns
The status of that workflow.
PublicAPI (alpha): This API is in alpha and may change before becoming stable.
- ray.workflow.get_output(workflow_id: str, *, task_id: Optional[str] = None) -> Any
Get the output of a running workflow.
- Parameters
workflow_id – The workflow to get the output of.
task_id – If set, fetch the output of the specific task instead of the output of the workflow.
Examples
>>> import ray
>>> from ray import workflow
>>> start_trip = ...
>>> trip = start_trip.options(task_id="trip").bind()
>>> res1 = workflow.run_async(trip, workflow_id="trip1")
>>> # you could call "get_output()" on another machine
>>> res2 = workflow.get_output_async("trip1")
>>> assert ray.get(res1) == ray.get(res2)
>>> # task_id is keyword-only
>>> task_output = workflow.get_output_async("trip1", task_id="trip")
>>> assert ray.get(task_output) == ray.get(res1)
- Returns
The output of the workflow task.
PublicAPI (alpha): This API is in alpha and may change before becoming stable.
- ray.workflow.get_output_async(workflow_id: str, *, task_id: Optional[str] = None) -> ray.ObjectRef
Get the output of a running workflow asynchronously.
- Parameters
workflow_id – The workflow to get the output of.
task_id – If set, fetch the specific task output instead of the output of the workflow.
- Returns
An object reference that can be used to retrieve the workflow task result.
PublicAPI (alpha): This API is in alpha and may change before becoming stable.
- ray.workflow.get_metadata(workflow_id: str, task_id: Optional[str] = None) -> Dict[str, Any]
Get the metadata of the workflow.
This returns a dict of metadata for either the workflow (if only workflow_id is given) or a specific workflow task (if both workflow_id and task_id are given). A ValueError is raised if the given workflow id or task id does not exist.
If only workflow id is given, this will return metadata on workflow level, which includes running status, workflow-level user metadata and workflow-level running stats (e.g. the start time and end time of the workflow).
If both workflow id and task id are given, this will return metadata on workflow task level, which includes task inputs, task-level user metadata and task-level running stats (e.g. the start time and end time of the task).
- Parameters
workflow_id – The workflow to get the metadata of.
task_id – If set, fetch the metadata of the specific task instead of the metadata of the workflow.
Examples
>>> from ray import workflow
>>> trip = ...
>>> workflow_task = trip.options(
...     **workflow.options(task_id="trip", metadata={"k1": "v1"})).bind()
>>> workflow.run(workflow_task,
...     workflow_id="trip1", metadata={"k2": "v2"})
>>> workflow_metadata = workflow.get_metadata("trip1")
>>> assert workflow_metadata["status"] == "SUCCESSFUL"
>>> assert workflow_metadata["user_metadata"] == {"k2": "v2"}
>>> assert "start_time" in workflow_metadata["stats"]
>>> assert "end_time" in workflow_metadata["stats"]
>>> task_metadata = workflow.get_metadata("trip1", "trip")
>>> assert task_metadata["task_type"] == "FUNCTION"
>>> assert task_metadata["user_metadata"] == {"k1": "v1"}
>>> assert "start_time" in task_metadata["stats"]
>>> assert "end_time" in task_metadata["stats"]
- Returns
A dictionary containing the metadata of the workflow.
- Raises
ValueError – if given workflow or workflow task does not exist.
PublicAPI (alpha): This API is in alpha and may change before becoming stable.
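Since the metadata dict carries running stats such as start_time and end_time, one common use is computing how long a workflow or task took. The sketch below works on a hand-written dict shaped like the example above, not on a real get_metadata call. It assumes the two timestamps are numeric values in seconds, which the source does not state, and elapsed_seconds is a hypothetical helper.

```python
from typing import Any, Dict, Optional


def elapsed_seconds(metadata: Dict[str, Any]) -> Optional[float]:
    """Return end_time - start_time from the stats entry, or None if either
    timestamp is missing (for example, while the workflow is still running).

    Assumes both timestamps are numbers expressed in seconds.
    """
    stats = metadata.get("stats", {})
    start = stats.get("start_time")
    end = stats.get("end_time")
    if start is None or end is None:
        return None
    return end - start


# Hand-written sample mirroring the keys used in the example above.
finished = {
    "status": "SUCCESSFUL",
    "user_metadata": {"k2": "v2"},
    "stats": {"start_time": 1700000000.0, "end_time": 1700000012.5},
}
unfinished = {"status": "RUNNING", "stats": {"start_time": 1700000000.0}}

duration = elapsed_seconds(finished)
pending_duration = elapsed_seconds(unfinished)
```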
- ray.workflow.cancel(workflow_id: str) -> None
Cancel a workflow. Workflow checkpoints will still be saved in storage. To clean up saved checkpoints, see workflow.delete().
- Parameters
workflow_id – The workflow to cancel.
Examples
>>> from ray import workflow
>>> some_job = ...
>>> workflow_task = some_job.bind()
>>> output = workflow.run_async(workflow_task,
...     workflow_id="some_job")
>>> workflow.cancel(workflow_id="some_job")
>>> assert [
...     ("some_job", workflow.CANCELED)] == workflow.list_all()
- Returns
None
PublicAPI (alpha): This API is in alpha and may change before becoming stable.