Ray Workflows API

Workflow Execution API

ray.workflow.run(dag: ray.dag.dag_node.DAGNode, *args, workflow_id: Optional[str] = None, metadata: Optional[Dict[str, Any]] = None, **kwargs) -> Any

Run a workflow.

If the workflow with the given id already exists, it will be resumed.

Examples

>>> from typing import Any, List
>>> import ray
>>> from ray import workflow
>>> Flight = Reservation = Trip = ...  # placeholder result types
>>> @ray.remote
... def book_flight(origin: str, dest: str) -> Flight:
...     return Flight(...)
>>> @ray.remote
... def book_hotel(location: str) -> Reservation:
...     return Reservation(...)
>>> @ray.remote
... def finalize_trip(bookings: List[Any]) -> Trip:
...     return Trip(...)
>>> flight1 = book_flight.bind("OAK", "SAN")
>>> flight2 = book_flight.bind("SAN", "OAK")
>>> hotel = book_hotel.bind("SAN")
>>> trip = finalize_trip.bind([flight1, flight2, hotel])
>>> result = workflow.run(trip)

Parameters
  • workflow_id – A unique identifier that can be used to resume the workflow. If not specified, a random id will be generated.

  • metadata – The metadata to add to the workflow. It must be JSON-serializable.

Returns

The result of the workflow.

PublicAPI (alpha): This API is in alpha and may change before becoming stable.

ray.workflow.run_async(dag: ray.dag.dag_node.DAGNode, *args, workflow_id: Optional[str] = None, metadata: Optional[Dict[str, Any]] = None, **kwargs) -> ray.ObjectRef

Run a workflow asynchronously.

If the workflow with the given id already exists, it will be resumed.

Parameters
  • workflow_id – A unique identifier that can be used to resume the workflow. If not specified, a random id will be generated.

  • metadata – The metadata to add to the workflow. It must be JSON-serializable.

Returns

The result of the workflow as a ray.ObjectRef.

PublicAPI (alpha): This API is in alpha and may change before becoming stable.

Management API

ray.workflow.resume(workflow_id: str) -> Any

Resume a workflow.

Resume a workflow and retrieve its output. If the workflow was incomplete, it is re-executed from its checkpointed outputs. If the workflow was already complete, its result is returned immediately.
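The re-execution behavior described above can be pictured with a toy, in-memory model. This is a sketch under stated assumptions, not Ray's implementation: run_with_checkpoints, flaky, and the step names are hypothetical, a plain dict stands in for workflow storage, and the workflow is simplified to a linear chain of steps rather than a DAG. Steps that already have a saved output are skipped on resume, and a fully checkpointed workflow returns its stored result without re-running anything.

```python
from typing import Any, Callable, Dict, List, Tuple

# Hypothetical stand-in for workflow storage: workflow_id -> {step_name: output}.
Store = Dict[str, Dict[str, Any]]
Step = Tuple[str, Callable[[Any], Any]]


def run_with_checkpoints(
    store: Store, workflow_id: str, steps: List[Step]
) -> Tuple[Any, List[str]]:
    """Toy model: run steps in order, reusing checkpointed outputs.

    Returns the final output and the names of the steps that actually executed.
    """
    checkpoints = store.setdefault(workflow_id, {})
    executed: List[str] = []
    value: Any = None
    for name, fn in steps:
        if name in checkpoints:
            # Output saved by an earlier attempt: reuse it instead of re-running.
            value = checkpoints[name]
            continue
        value = fn(value)
        executed.append(name)
        # Save the output before moving on, so a later crash cannot lose it.
        checkpoints[name] = value
    return value, executed


calls = {"flaky": 0}


def flaky(x: int) -> int:
    # Fails on its first call to simulate a crash partway through the workflow.
    calls["flaky"] += 1
    if calls["flaky"] == 1:
        raise RuntimeError("simulated crash")
    return x + 1


steps: List[Step] = [("load", lambda _: 1), ("double", lambda x: x * 2), ("flaky", flaky)]
store: Store = {}

try:
    run_with_checkpoints(store, "trip1", steps)
except RuntimeError:
    pass  # "load" and "double" were checkpointed before the crash

resumed_result, resumed_steps = run_with_checkpoints(store, "trip1", steps)  # only "flaky" runs
final_result, final_steps = run_with_checkpoints(store, "trip1", steps)  # nothing runs
```

Calling the same workflow id a third time executes no steps at all, which mirrors the "already complete" case.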

Examples

>>> import ray
>>> from ray import workflow
>>> start_trip = ...
>>> trip = start_trip.bind()
>>> res1 = workflow.run_async(trip, workflow_id="trip1")
>>> res2 = workflow.resume("trip1")
>>> assert ray.get(res1) == res2

Parameters

workflow_id – The id of the workflow to resume.

Returns

The output of the workflow.

PublicAPI (alpha): This API is in alpha and may change before becoming stable.

ray.workflow.resume_async(workflow_id: str) -> ray.ObjectRef

Resume a workflow asynchronously.

Resume a workflow and retrieve its output. If the workflow was incomplete, it is re-executed from its checkpointed outputs. If the workflow was already complete, its result is returned immediately.

Examples

>>> import ray
>>> from ray import workflow
>>> start_trip = ...
>>> trip = start_trip.bind()
>>> res1 = workflow.run_async(trip, workflow_id="trip1")
>>> res2 = workflow.resume_async("trip1")
>>> assert ray.get(res1) == ray.get(res2)

Parameters

workflow_id – The id of the workflow to resume.

Returns

An object reference that can be used to retrieve the workflow result.

PublicAPI (alpha): This API is in alpha and may change before becoming stable.

ray.workflow.resume_all(include_failed: bool = False) -> List[Tuple[str, ray.ObjectRef]]

Resume all resumable workflow jobs.

This can be used after a cluster restart to resume all workflows.

Parameters

include_failed – Whether to resume FAILED workflows.
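How include_failed changes the set of workflows that get resumed can be sketched with plain Python. This is a toy under stated assumptions, not Ray's code: select_for_resume and fake_resume are hypothetical helpers, statuses are plain strings, strings stand in for ObjectRefs, and the sketch assumes RESUMABLE workflows are always selected while FAILED ones are added only when include_failed is True. Because the documented return value is a list of (workflow_id, returned_obj_ref) tuples, wrapping it in dict() gives lookup by workflow id.

```python
from typing import Dict, List, Tuple


def select_for_resume(statuses: Dict[str, str], include_failed: bool = False) -> List[str]:
    """Hypothetical helper: ids that a resume_all-style call would target."""
    targets = {"RESUMABLE"}  # assumption: resumable workflows are always selected
    if include_failed:
        targets.add("FAILED")  # failed workflows are added only on request
    return sorted(wid for wid, status in statuses.items() if status in targets)


def fake_resume(statuses: Dict[str, str], include_failed: bool = False) -> List[Tuple[str, str]]:
    # Returns (workflow_id, ref) pairs; the string "ref-<id>" replaces a real ObjectRef.
    return [(wid, f"ref-{wid}") for wid in select_for_resume(statuses, include_failed)]


statuses = {"a": "RESUMABLE", "b": "FAILED", "c": "SUCCESSFUL", "d": "CANCELED"}
default_ids = select_for_resume(statuses)  # only the RESUMABLE workflow
with_failed = dict(fake_resume(statuses, include_failed=True))  # id -> ref lookup
```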

Examples

>>> import ray
>>> from ray import workflow
>>> failed_job = ...
>>> workflow_task = failed_job.bind()
>>> output = workflow.run_async(
...     workflow_task, workflow_id="failed_job")
>>> try:
...     ray.get(output)
... except Exception:
...     print("JobFailed")
JobFailed
>>> jobs = workflow.list_all()
>>> assert jobs == [("failed_job", workflow.FAILED)]
>>> resumed = dict(workflow.resume_all(include_failed=True))
>>> assert resumed.get("failed_job") is not None

Returns

A list of (workflow_id, returned_obj_ref) tuples, one for each resumed workflow.

PublicAPI (alpha): This API is in alpha and may change before becoming stable.

ray.workflow.list_all(status_filter: Optional[Union[ray.workflow.common.WorkflowStatus, str, Set[Union[ray.workflow.common.WorkflowStatus, str]]]] = None) -> List[Tuple[str, ray.workflow.common.WorkflowStatus]]

List all workflows matching a given status filter. When returning “RESUMABLE” workflows, those that were running are listed before those that were pending.

Parameters

status_filter – If given, only workflows with that status are returned. This can be a single status or a set of statuses. The string form of a status is also accepted, i.e., “RUNNING”, “FAILED”, “SUCCESSFUL”, “CANCELED”, “RESUMABLE”, or “PENDING”.
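The two rules above, accepting a single status, a string, or a set of either as the filter, and listing previously running workflows ahead of previously pending ones among RESUMABLE results, can be sketched in plain Python. The WorkflowStatus enum below is a local stand-in built from the status names listed above, not an import from Ray, and normalize_filter and order_resumable are hypothetical helpers. The sketch also assumes that a RESUMABLE workflow is one whose last recorded status was RUNNING or PENDING.

```python
from enum import Enum
from typing import List, Optional, Set, Tuple, Union


class WorkflowStatus(str, Enum):
    # Local stand-in using the status names from the documentation above.
    RUNNING = "RUNNING"
    FAILED = "FAILED"
    SUCCESSFUL = "SUCCESSFUL"
    CANCELED = "CANCELED"
    RESUMABLE = "RESUMABLE"
    PENDING = "PENDING"


StatusLike = Union[WorkflowStatus, str]


def normalize_filter(
    status_filter: Optional[Union[StatusLike, Set[StatusLike]]]
) -> Set[WorkflowStatus]:
    """Turn any accepted filter form into a set of enum members."""
    if status_filter is None:
        return set(WorkflowStatus)  # no filter: every status matches
    if isinstance(status_filter, str):  # also true for WorkflowStatus members
        status_filter = {status_filter}
    # WorkflowStatus("RUNNING") and WorkflowStatus(WorkflowStatus.RUNNING) both
    # return the member; an unknown string raises ValueError.
    return {WorkflowStatus(s) for s in status_filter}


def order_resumable(
    last_known: List[Tuple[str, WorkflowStatus]]
) -> List[Tuple[str, WorkflowStatus]]:
    """Report stalled RUNNING/PENDING workflows as RESUMABLE, running ones first."""
    running = [wid for wid, s in last_known if s == WorkflowStatus.RUNNING]
    pending = [wid for wid, s in last_known if s == WorkflowStatus.PENDING]
    return [(wid, WorkflowStatus.RESUMABLE) for wid in running + pending]
```

For example, normalize_filter("FAILED") and normalize_filter({WorkflowStatus.FAILED}) both yield {WorkflowStatus.FAILED}.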

Examples

>>> import ray
>>> from ray import workflow
>>> long_running_job = ...
>>> workflow_task = long_running_job.bind()
>>> wf = workflow.run_async(workflow_task,
...     workflow_id="long_running_job")
>>> jobs = workflow.list_all()
>>> assert jobs == [("long_running_job", workflow.RUNNING)]
>>> ray.get(wf)
>>> jobs = workflow.list_all({workflow.RUNNING})
>>> assert jobs == []
>>> jobs = workflow.list_all(workflow.SUCCESSFUL)
>>> assert jobs == [
...     ("long_running_job", workflow.SUCCESSFUL)]

Returns

A list of (workflow_id, workflow_status) tuples.

PublicAPI (alpha): This API is in alpha and may change before becoming stable.

ray.workflow.get_status(workflow_id: str) -> ray.workflow.common.WorkflowStatus

Get the status for a given workflow.

Parameters

workflow_id – The workflow to query.

Examples

>>> from ray import workflow
>>> trip = ... 
>>> workflow_task = trip.bind() 
>>> output = workflow.run(workflow_task, workflow_id="trip") 
>>> assert workflow.SUCCESSFUL == workflow.get_status("trip") 

Returns

The status of that workflow

PublicAPI (alpha): This API is in alpha and may change before becoming stable.

ray.workflow.get_output(workflow_id: str, *, task_id: Optional[str] = None) -> Any

Get the output of a running workflow.

Parameters
  • workflow_id – The workflow to get the output of.

  • task_id – If set, fetch the output of the specific task instead of the output of the workflow.

Examples

>>> import ray
>>> from ray import workflow
>>> start_trip = ...
>>> trip = start_trip.options(
...     **workflow.options(task_id="trip")).bind()
>>> res1 = workflow.run_async(trip, workflow_id="trip1")
>>> # You could also call get_output() from another machine.
>>> res2 = workflow.get_output("trip1")
>>> assert ray.get(res1) == res2
>>> task_output = workflow.get_output("trip1", task_id="trip")
>>> assert task_output == ray.get(res1)

Returns

The output of the workflow, or of the specified task if task_id is set.

PublicAPI (alpha): This API is in alpha and may change before becoming stable.

ray.workflow.get_output_async(workflow_id: str, *, task_id: Optional[str] = None) -> ray.ObjectRef

Get the output of a running workflow asynchronously.

Parameters
  • workflow_id – The workflow to get the output of.

  • task_id – If set, fetch the specific task output instead of the output of the workflow.

Returns

An object reference that can be used to retrieve the workflow task result.

PublicAPI (alpha): This API is in alpha and may change before becoming stable.

ray.workflow.get_metadata(workflow_id: str, task_id: Optional[str] = None) -> Dict[str, Any]

Get the metadata of the workflow.

This returns a dict of metadata for either the workflow (if only workflow_id is given) or a specific workflow task (if both workflow_id and task_id are given). An exception is raised if the given workflow id or task id does not exist.

If only workflow_id is given, the metadata is at the workflow level, which includes the running status, workflow-level user metadata, and workflow-level running stats (e.g., the start time and end time of the workflow).

If both workflow_id and task_id are given, the metadata is at the task level, which includes the task inputs, task-level user metadata, and task-level running stats (e.g., the start time and end time of the task).

Parameters
  • workflow_id – The workflow to get the metadata of.

  • task_id – If set, fetch the metadata of the specific task instead of the metadata of the workflow.

Examples

>>> from ray import workflow
>>> trip = ... 
>>> workflow_task = trip.options( 
...     **workflow.options(task_id="trip", metadata={"k1": "v1"})).bind()
>>> workflow.run(workflow_task, 
...     workflow_id="trip1", metadata={"k2": "v2"})
>>> workflow_metadata = workflow.get_metadata("trip1") 
>>> assert workflow_metadata["status"] == "SUCCESSFUL" 
>>> assert workflow_metadata["user_metadata"] == {"k2": "v2"} 
>>> assert "start_time" in workflow_metadata["stats"] 
>>> assert "end_time" in workflow_metadata["stats"] 
>>> task_metadata = workflow.get_metadata("trip1", "trip") 
>>> assert task_metadata["task_type"] == "FUNCTION" 
>>> assert task_metadata["user_metadata"] == {"k1": "v1"} 
>>> assert "start_time" in task_metadata["stats"] 
>>> assert "end_time" in task_metadata["stats"] 

Returns

A dictionary containing the metadata of the workflow or of the specified workflow task.

Raises

ValueError – if the given workflow or workflow task does not exist.

PublicAPI (alpha): This API is in alpha and may change before becoming stable.

ray.workflow.cancel(workflow_id: str) -> None

Cancel a workflow. Workflow checkpoints remain in storage. To clean up saved checkpoints, see workflow.delete().

Parameters

workflow_id – The workflow to cancel.

Examples

>>> from ray import workflow
>>> some_job = ... 
>>> workflow_task = some_job.bind() 
>>> output = workflow.run_async(workflow_task,  
...     workflow_id="some_job")
>>> workflow.cancel(workflow_id="some_job") 
>>> assert [ 
...     ("some_job", workflow.CANCELED)] == workflow.list_all()

Returns

None

PublicAPI (alpha): This API is in alpha and may change before becoming stable.