Ray Workflows API

Core API

ray.workflow.step(*args, **kwargs)[source]

A decorator used for creating workflow steps.

Examples

>>> from ray import workflow
>>> Flight, Hotel = ... 
>>> @workflow.step 
... def book_flight(origin: str, dest: str) -> Flight: 
...    return Flight(...) 
>>> @workflow.step(max_retries=3, catch_exceptions=True) 
... def book_hotel(dest: str) -> Hotel: 
...    return Hotel(...) 

PublicAPI (beta): This API is in beta and may change before becoming stable.
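
The `max_retries` and `catch_exceptions` options in the example above can be pictured with a plain-Python sketch that does not use Ray at all. It assumes that `max_retries` counts additional attempts after the first failure and that `catch_exceptions=True` turns the return value into a `(result, exception)` pair; the helper `run_with_retries` and the function `flaky_book_hotel` are hypothetical names for illustration, and the library's actual behavior may differ.

```python
from typing import Any, Callable


def run_with_retries(
    func: Callable[..., Any],
    *args: Any,
    max_retries: int = 3,
    catch_exceptions: bool = False,
    **kwargs: Any,
) -> Any:
    """Hypothetical helper: call func, retrying up to max_retries extra times."""
    if max_retries < 0:
        raise ValueError("max_retries must be non-negative")
    last_error = None
    for _attempt in range(max_retries + 1):
        try:
            result = func(*args, **kwargs)
        except Exception as exc:  # remember the failure and try again
            last_error = exc
            continue
        # Success: optionally wrap the result in a (result, exception) pair.
        return (result, None) if catch_exceptions else result
    # All attempts failed.
    if catch_exceptions:
        return (None, last_error)
    raise last_error


calls = {"n": 0}


def flaky_book_hotel(dest: str) -> str:
    """Hypothetical step body that fails twice before succeeding."""
    calls["n"] += 1
    if calls["n"] < 3:
        raise RuntimeError("no rooms available")
    return f"hotel in {dest}"


booking, error = run_with_retries(
    flaky_book_hotel, "SAN", max_retries=3, catch_exceptions=True
)
```

With `catch_exceptions=False` in this sketch, the final exception is re-raised once the retries are exhausted, so the caller decides which failure mode it wants to handle.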

class ray.workflow.common.Workflow(workflow_data: ray.workflow.common.WorkflowData, prepare_inputs: Optional[Callable] = None)[source]

This class represents a workflow.

It is either a workflow that has not been executed, or a reference to a running workflow when ‘workflow.ref’ is not None.

property data: ray.workflow.common.WorkflowData

Get the workflow data.

run(workflow_id: Optional[str] = None, metadata: Optional[Dict[str, Any]] = None) Any[source]

Run a workflow.

If the workflow with the given id already exists, it will be resumed.

Examples:
>>> from typing import Any, List
>>> from ray import workflow
>>> Flight, Reservation, Trip = ... 
>>> @workflow.step 
... def book_flight(origin: str, dest: str) -> Flight: 
...    return Flight(...) 
>>> @workflow.step 
... def book_hotel(location: str) -> Reservation: 
...    return Reservation(...) 
>>> @workflow.step 
... def finalize_trip(bookings: List[Any]) -> Trip: 
...    return Trip(...) 
>>> flight1 = book_flight.step("OAK", "SAN") 
>>> flight2 = book_flight.step("SAN", "OAK") 
>>> hotel = book_hotel.step("SAN") 
>>> trip = finalize_trip.step([flight1, flight2, hotel]) 
>>> result = trip.run() 
Parameters
  • workflow_id – A unique identifier that can be used to resume the workflow. If not specified, a random id will be generated.

  • metadata – The metadata to add to the workflow. It must be serializable to JSON.

Returns

The result of running the workflow.

PublicAPI (beta): This API is in beta and may change before becoming stable.
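
Because the `metadata` argument has to be JSON-serializable, it can be worth validating it before submitting a workflow. The following is a minimal sketch using only the standard library; `is_json_serializable` is a hypothetical helper, not part of `ray.workflow`.

```python
import json
from typing import Any, Dict


def is_json_serializable(metadata: Dict[str, Any]) -> bool:
    """Hypothetical check: return True if json.dumps accepts the metadata."""
    try:
        json.dumps(metadata)
    except (TypeError, ValueError):
        # TypeError: unsupported type (e.g. a set); ValueError: circular reference.
        return False
    return True


good_metadata = {"k2": "v2", "attempt": 1}
bad_metadata = {"tags": {"a", "b"}}  # sets are not JSON-serializable

good_ok = is_json_serializable(good_metadata)
bad_ok = is_json_serializable(bad_metadata)
```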

run_async(workflow_id: Optional[str] = None, metadata: Optional[Dict[str, Any]] = None) ray.ObjectRef[source]

Run a workflow asynchronously.

If the workflow with the given id already exists, it will be resumed.

Examples:
>>> from typing import Any, List
>>> import ray
>>> from ray import workflow
>>> Flight, Reservation, Trip = ... 
>>> @workflow.step 
... def book_flight(origin: str, dest: str) -> Flight: 
...    return Flight(...) 
>>> @workflow.step 
... def book_hotel(location: str) -> Reservation: 
...    return Reservation(...) 
>>> @workflow.step 
... def finalize_trip(bookings: List[Any]) -> Trip: 
...    return Trip(...) 
>>> flight1 = book_flight.step("OAK", "SAN") 
>>> flight2 = book_flight.step("SAN", "OAK") 
>>> hotel = book_hotel.step("SAN") 
>>> trip = finalize_trip.step([flight1, flight2, hotel]) 
>>> result = ray.get(trip.run_async()) 
Parameters
  • workflow_id – A unique identifier that can be used to resume the workflow. If not specified, a random id will be generated.

  • metadata – The metadata to add to the workflow. It must be serializable to JSON.

Returns

The result of running the workflow, as a ray.ObjectRef.

PublicAPI (beta): This API is in beta and may change before becoming stable.

Virtual Actors

ray.workflow.virtual_actor() VirtualActorClass

A decorator used for creating a virtual actor based on a class.

The class it is based on must define the “__getstate__” and “__setstate__” methods.

Examples

>>> from ray import workflow
>>> @workflow.virtual_actor
... class Counter:
...     def __init__(self, x: int):
...         self.x = x
...
...     # Mark a method as a readonly method. It would not modify the
...     # state of the virtual actor.
...     @workflow.virtual_actor.readonly
...     def get(self):
...         return self.x
...
...     def incr(self):
...         self.x += 1
...         return self.x
...
...     def __getstate__(self):
...         return self.x
...
...     def __setstate__(self, state):
...         self.x = state
...
>>> # Create and run a virtual actor.
>>> counter = Counter.get_or_create(actor_id="Counter", x=1)
>>> assert counter.incr.run() == 2

PublicAPI (beta): This API is in beta and may change before becoming stable.
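
The `__getstate__` / `__setstate__` requirement can be illustrated without Ray: the two methods must round-trip the actor's state so that it can be saved and later restored. The sketch below uses an undecorated copy of the `Counter` class and restores it by hand; how the library actually persists the state is not shown here and is not implied by this example.

```python
class Counter:
    """Plain (undecorated) version of the Counter class from the example."""

    def __init__(self, x: int):
        self.x = x

    def incr(self) -> int:
        self.x += 1
        return self.x

    def __getstate__(self):
        # Everything needed to rebuild the object later.
        return self.x

    def __setstate__(self, state):
        self.x = state


original = Counter(1)
original.incr()

# Save the state, then rebuild a fresh object from it without calling __init__.
saved_state = original.__getstate__()
restored = Counter.__new__(Counter)
restored.__setstate__(saved_state)
```

If `__setstate__` does not restore every attribute that the methods rely on, the restored object will fail at the first method call that touches the missing attribute.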

class ray.workflow.virtual_actor_class.VirtualActorClass[source]

The virtual actor class used to create a virtual actor.

get_or_create(actor_id: str, *args, **kwargs) ray.workflow.virtual_actor_class.VirtualActor[source]

Create an actor. See VirtualActorClassBase.create().

options() ray.workflow.virtual_actor_class.VirtualActorClassBase[source]

Configures and overrides the actor instantiation parameters.

Management API

ray.workflow.resume_all(include_failed: bool = False) Dict[str, ray.ObjectRef][source]

Resume all resumable workflow jobs.

This can be used after a cluster restart to resume all tasks.

Parameters

include_failed – Whether to resume FAILED workflows.

Examples

>>> import ray
>>> from ray import workflow
>>> failed_job = ... 
>>> workflow_step = failed_job.step() 
>>> output = workflow_step.run_async(workflow_id="failed_job") 
>>> try:
...     ray.get(output)
... except Exception:
...     print("JobFailed")
>>> jobs = workflow.list_all() 
>>> assert jobs == [("failed_job", workflow.FAILED)] 
>>> assert workflow.resume_all( 
...    include_failed=True).get("failed_job") is not None 
Returns

A dict mapping each resumed workflow_id to the object reference of its result.

PublicAPI (beta): This API is in beta and may change before becoming stable.

ray.workflow.resume(workflow_id: str) ray.ObjectRef[source]

Resume a workflow.

Resume a workflow and retrieve its output. If the workflow was incomplete, it will be re-executed from its checkpointed outputs. If the workflow was complete, returns the result immediately.

Examples

>>> import ray
>>> from ray import workflow
>>> start_trip = ... 
>>> trip = start_trip.step() 
>>> res1 = trip.run_async(workflow_id="trip1") 
>>> res2 = workflow.resume("trip1") 
>>> assert ray.get(res1) == ray.get(res2) 
Parameters

workflow_id – The id of the workflow to resume.

Returns

An object reference that can be used to retrieve the workflow result.

PublicAPI (beta): This API is in beta and may change before becoming stable.
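
The resume behavior described above (re-execute from checkpointed outputs, or return immediately if everything is already complete) can be sketched in plain Python. This is only a conceptual model under simplifying assumptions: steps run in a fixed linear order and checkpoints live in an in-memory dict. The names `run_steps`, `make_step`, and `checkpoints` are hypothetical and not part of `ray.workflow`.

```python
from typing import Any, Callable, Dict, List, Tuple

Step = Tuple[str, Callable[[Any], Any]]

executed: List[str] = []  # records which steps actually ran


def make_step(name: str, func: Callable[[Any], Any]) -> Step:
    """Wrap func so that every real execution is logged."""
    def wrapped(value: Any) -> Any:
        executed.append(name)
        return func(value)
    return name, wrapped


def run_steps(steps: List[Step], checkpoints: Dict[str, Any]) -> Any:
    """Run steps in order, reusing any output already checkpointed."""
    value = None
    for name, func in steps:
        if name in checkpoints:
            value = checkpoints[name]  # skip: output was saved earlier
            continue
        value = func(value)
        checkpoints[name] = value  # save the output for future resumes
    return value


steps = [
    make_step("load", lambda _: 2),
    make_step("double", lambda v: v * 2),
    make_step("increment", lambda v: v + 1),
]

# Pretend an earlier run was interrupted after "load" had been checkpointed.
checkpoints: Dict[str, Any] = {"load": 2}
first_result = run_steps(steps, checkpoints)
ran_on_resume = list(executed)

# Resuming a completed run re-executes nothing and returns the saved result.
second_result = run_steps(steps, checkpoints)
```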

ray.workflow.list_all(status_filter: Optional[Union[ray.workflow.common.WorkflowStatus, str, Set[Union[ray.workflow.common.WorkflowStatus, str]]]] = None) List[Tuple[str, ray.workflow.common.WorkflowStatus]][source]

List all workflows matching a given status filter.

Parameters

status_filter – If given, only workflows with that status are returned. This can be a single status or a set of statuses. The string form of a status is also accepted, i.e., “RUNNING”, “FAILED”, “SUCCESSFUL”, “CANCELED”, or “RESUMABLE”.

Examples

>>> import ray
>>> from ray import workflow
>>> long_running_job = ... 
>>> workflow_step = long_running_job.step() 
>>> wf = workflow_step.run_async( 
...     workflow_id="long_running_job")
>>> jobs = workflow.list_all() 
>>> assert jobs == [ ("long_running_job", workflow.RUNNING) ] 
>>> ray.get(wf) 
>>> jobs = workflow.list_all({workflow.RUNNING}) 
>>> assert jobs == [] 
>>> jobs = workflow.list_all(workflow.SUCCESSFUL) 
>>> assert jobs == [ 
...     ("long_running_job", workflow.SUCCESSFUL)]
Returns

A list of (workflow id, workflow status) tuples.

PublicAPI (beta): This API is in beta and may change before becoming stable.
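
Since the status filter may be a single status, its string form, or a set mixing both, a caller-side normalizer can make that flexibility explicit. The sketch below uses a stand-in `Status` enum rather than the real `WorkflowStatus` class, and it assumes that passing `None` means "no filtering"; both are assumptions for illustration.

```python
from enum import Enum
from typing import Optional, Set, Union


class Status(str, Enum):
    """Hypothetical stand-in for ray.workflow.common.WorkflowStatus."""
    RUNNING = "RUNNING"
    FAILED = "FAILED"
    SUCCESSFUL = "SUCCESSFUL"
    CANCELED = "CANCELED"
    RESUMABLE = "RESUMABLE"


StatusLike = Union[Status, str]


def normalize_status_filter(
    status_filter: Optional[Union[StatusLike, Set[StatusLike]]] = None,
) -> Set[Status]:
    """Turn any accepted filter form into a set of Status members."""
    if status_filter is None:
        return set(Status)  # assumption: no filter matches every status
    if isinstance(status_filter, str):  # also true for Status members
        status_filter = {status_filter}
    # Status(...) raises ValueError for an unknown status string.
    return {Status(item) for item in status_filter}


single = normalize_status_filter("FAILED")
mixed = normalize_status_filter({Status.RUNNING, "SUCCESSFUL"})
everything = normalize_status_filter()
```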

ray.workflow.get_status(workflow_id: str) ray.workflow.common.WorkflowStatus[source]

Get the status for a given workflow.

Parameters

workflow_id – The workflow to query.

Examples

>>> from ray import workflow
>>> trip = ... 
>>> workflow_step = trip.step() 
>>> output = workflow_step.run(workflow_id="trip") 
>>> assert workflow.SUCCESSFUL == workflow.get_status("trip") 
Returns

The status of that workflow.

PublicAPI (beta): This API is in beta and may change before becoming stable.

ray.workflow.get_output(workflow_id: str, *, name: Optional[str] = None) ray.ObjectRef[source]

Get the output of a running workflow.

Parameters
  • workflow_id – The workflow to get the output of.

  • name – If set, fetch the output of the specific step instead of the output of the workflow.

Examples

>>> import ray
>>> from ray import workflow
>>> start_trip = ... 
>>> trip = start_trip.options(name="trip").step() 
>>> res1 = trip.run_async(workflow_id="trip1") 
>>> # you could "get_output()" in another machine
>>> res2 = workflow.get_output("trip1") 
>>> assert ray.get(res1) == ray.get(res2) 
>>> step_output = workflow.get_output("trip1", name="trip")
>>> assert ray.get(step_output) == ray.get(res1) 
Returns

An object reference that can be used to retrieve the workflow result.

PublicAPI (beta): This API is in beta and may change before becoming stable.

ray.workflow.get_metadata(workflow_id: str, name: Optional[str] = None) Dict[str, Any][source]

Get the metadata of the workflow.

This returns a dict of metadata for either the workflow (if only workflow_id is given) or a specific workflow step (if both workflow_id and step name are given). An exception is raised if the given workflow id or step name does not exist.

If only workflow id is given, this will return metadata on workflow level, which includes running status, workflow-level user metadata and workflow-level running stats (e.g. the start time and end time of the workflow).

If both workflow id and step name are given, this will return metadata on workflow step level, which includes step inputs, step-level user metadata and step-level running stats (e.g. the start time and end time of the step).

Parameters
  • workflow_id – The workflow to get the metadata of.

  • name – If set, fetch the metadata of the specific step instead of the metadata of the workflow.

Examples

>>> from ray import workflow
>>> trip = ... 
>>> workflow_step = trip.options( 
...     name="trip", metadata={"k1": "v1"}).step()
>>> workflow_step.run( 
...     workflow_id="trip1", metadata={"k2": "v2"})
>>> workflow_metadata = workflow.get_metadata("trip1") 
>>> assert workflow_metadata["status"] == "SUCCESSFUL" 
>>> assert workflow_metadata["user_metadata"] == {"k2": "v2"} 
>>> assert "start_time" in workflow_metadata["stats"] 
>>> assert "end_time" in workflow_metadata["stats"] 
>>> step_metadata = workflow.get_metadata("trip1", "trip") 
>>> assert step_metadata["step_type"] == "FUNCTION" 
>>> assert step_metadata["user_metadata"] == {"k1": "v1"} 
>>> assert "start_time" in step_metadata["stats"] 
>>> assert "end_time" in step_metadata["stats"] 
Returns

A dictionary containing the metadata of the workflow.

Raises

ValueError – if given workflow or workflow step does not exist.

PublicAPI (beta): This API is in beta and may change before becoming stable.
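
The returned dictionary can be post-processed like any other dict. As a small sketch, the helper below derives an elapsed time from the `stats` entry; it assumes that `start_time` and `end_time` are numeric timestamps in seconds, which the text above does not state, and the sample dictionary is made-up data shaped like the example, not real output.

```python
from typing import Any, Dict, Optional


def elapsed_seconds(metadata: Dict[str, Any]) -> Optional[float]:
    """Hypothetical helper: end_time - start_time, or None if either is missing."""
    stats = metadata.get("stats", {})
    if "start_time" not in stats or "end_time" not in stats:
        return None  # e.g. the workflow or step has not finished yet
    return stats["end_time"] - stats["start_time"]


# Made-up sample with the same keys as the example above.
sample_metadata = {
    "status": "SUCCESSFUL",
    "user_metadata": {"k2": "v2"},
    "stats": {"start_time": 1000.0, "end_time": 1002.5},
}
unfinished_metadata = {"status": "RUNNING", "stats": {"start_time": 1000.0}}

finished_elapsed = elapsed_seconds(sample_metadata)
unfinished_elapsed = elapsed_seconds(unfinished_metadata)
```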

ray.workflow.get_actor(actor_id: str) VirtualActor[source]

Get a virtual actor.

Parameters

actor_id – The ID of the actor.

Returns

A virtual actor.

PublicAPI (beta): This API is in beta and may change before becoming stable.

ray.workflow.cancel(workflow_id: str) None[source]

Cancel a workflow. Workflow checkpoints will still be saved in storage. To clean up saved checkpoints, see workflow.delete().

Parameters

workflow_id – The workflow to cancel.

Examples

>>> from ray import workflow
>>> some_job = ... 
>>> workflow_step = some_job.step() 
>>> output = workflow_step.run_async(workflow_id="some_job") 
>>> workflow.cancel(workflow_id="some_job") 
>>> assert [ 
...     ("some_job", workflow.CANCELED)] == workflow.list_all()
Returns

None

PublicAPI (beta): This API is in beta and may change before becoming stable.