Workflows API Reference

Core API

ray.workflow.init(storage: Union[str, ray.workflow.storage.base.Storage, None] = None) → None[source]

Initialize workflow.

Parameters

storage – The external storage URL or a custom storage class. If not specified, /tmp/ray/workflow_data will be used.

PublicAPI (beta): This API is in beta and may change before becoming stable.

ray.workflow.step(*args, **kwargs)[source]

A decorator used for creating workflow steps.

Examples

>>> @workflow.step
... def book_flight(origin: str, dest: str) -> Flight:
...    return Flight(...)
>>> @workflow.step(max_retries=3, catch_exceptions=True)
... def book_hotel(dest: str) -> Hotel:
...    return Hotel(...)

PublicAPI (beta): This API is in beta and may change before becoming stable.
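The examples above use the decorator both bare (@workflow.step) and with keyword options (@workflow.step(max_retries=3, ...)). The sketch below shows one common way a decorator with a (*args, **kwargs) signature can support both forms. It is a plain-Python illustration, not Ray's implementation: the step function here and its step_options attribute are hypothetical stand-ins.

```python
import functools
from typing import Any, Callable


def step(*args: Any, **options: Any) -> Callable:
    """Hypothetical stand-in: a decorator usable bare or with options."""

    def make_step(func: Callable) -> Callable:
        @functools.wraps(func)
        def wrapper(*f_args: Any, **f_kwargs: Any) -> Any:
            return func(*f_args, **f_kwargs)

        # Record the options for inspection; illustrative attribute only.
        wrapper.step_options = dict(options)
        return wrapper

    # Bare form: "@step" passes the function as the only positional argument.
    if len(args) == 1 and callable(args[0]) and not options:
        return make_step(args[0])
    if args:
        raise TypeError("step() accepts only keyword options")
    # Parameterized form: "@step(max_retries=3)" returns the real decorator.
    return make_step


@step
def book_flight(origin: str, dest: str) -> str:
    return f"{origin}->{dest}"


@step(max_retries=3, catch_exceptions=True)
def book_hotel(dest: str) -> str:
    return f"hotel in {dest}"
```

Both decorated functions remain callable; only the second carries non-empty options.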

class ray.workflow.common.Workflow(workflow_data: ray.workflow.common.WorkflowData, prepare_inputs: Optional[Callable] = None)[source]
property data

Get the workflow data.

run(workflow_id: Optional[str] = None, metadata: Optional[Dict[str, Any]] = None) → Any[source]

Run a workflow.

If the workflow with the given id already exists, it will be resumed.

Examples:
>>> @workflow.step
... def book_flight(origin: str, dest: str) -> Flight:
...    return Flight(...)
>>> @workflow.step
... def book_hotel(location: str) -> Reservation:
...    return Reservation(...)
>>> @workflow.step
... def finalize_trip(bookings: List[Any]) -> Trip:
...    return Trip(...)
>>> flight1 = book_flight.step("OAK", "SAN")
>>> flight2 = book_flight.step("SAN", "OAK")
>>> hotel = book_hotel.step("SAN")
>>> trip = finalize_trip.step([flight1, flight2, hotel])
>>> result = trip.run()
Args:
    workflow_id: A unique identifier that can be used to resume the workflow. If not specified, a random id will be generated.
    metadata: The metadata to add to the workflow. It must be serializable to JSON.

Returns:

The result of the workflow run.

PublicAPI (beta): This API is in beta and may change before becoming stable.
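The semantics described for run() (a random id when none is given, reuse of an existing id, JSON-serializable metadata) can be pictured with a toy model. This sketch assumes an in-memory dictionary in place of real workflow storage and caches only the final result, whereas the actual library resumes from step checkpoints. The names run, _results, and _metadata are hypothetical.

```python
import json
import uuid
from typing import Any, Callable, Dict, Optional

# Hypothetical in-memory stand-ins for workflow storage.
_results: Dict[str, Any] = {}
_metadata: Dict[str, Dict[str, Any]] = {}


def run(func: Callable[[], Any],
        workflow_id: Optional[str] = None,
        metadata: Optional[Dict[str, Any]] = None) -> Any:
    if metadata is not None:
        # json.dumps raises TypeError if the metadata is not JSON-serializable.
        json.dumps(metadata)
    if workflow_id is None:
        workflow_id = uuid.uuid4().hex  # random id when none is given
    if workflow_id in _results:
        # Existing id: return the saved result instead of starting over.
        return _results[workflow_id]
    _results[workflow_id] = func()
    _metadata[workflow_id] = metadata or {}
    return _results[workflow_id]


calls = []


def plan_trip() -> str:
    calls.append(1)  # track how many times the body actually executes
    return "trip booked"


first = run(plan_trip, workflow_id="trip1", metadata={"owner": "alice"})
second = run(plan_trip, workflow_id="trip1")  # same id: body is not re-run
```

Passing metadata that contains, for example, a Python set would fail the json.dumps check before any work starts.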

run_async(workflow_id: Optional[str] = None, metadata: Optional[Dict[str, Any]] = None) → ray.ObjectRef[source]

Run a workflow asynchronously.

If the workflow with the given id already exists, it will be resumed.

Examples:
>>> @workflow.step
... def book_flight(origin: str, dest: str) -> Flight:
...    return Flight(...)
>>> @workflow.step
... def book_hotel(location: str) -> Reservation:
...    return Reservation(...)
>>> @workflow.step
... def finalize_trip(bookings: List[Any]) -> Trip:
...    return Trip(...)
>>> flight1 = book_flight.step("OAK", "SAN")
>>> flight2 = book_flight.step("SAN", "OAK")
>>> hotel = book_hotel.step("SAN")
>>> trip = finalize_trip.step([flight1, flight2, hotel])
>>> result = ray.get(trip.run_async())
Args:
    workflow_id: A unique identifier that can be used to resume the workflow. If not specified, a random id will be generated.
    metadata: The metadata to add to the workflow. It must be serializable to JSON.

Returns:

The result of the workflow run, as a ray.ObjectRef.

PublicAPI (beta): This API is in beta and may change before becoming stable.

Virtual Actors

ray.workflow.virtual_actor(_cls: type) → VirtualActorClass

A decorator used for creating a virtual actor based on a class.

The decorated class must define the __getstate__ and __setstate__ methods.

Examples

>>> @workflow.virtual_actor
... class Counter:
...     def __init__(self, x: int):
...         self.x = x
...
...     # Mark a method as readonly: it does not modify the
...     # state of the virtual actor.
...     @workflow.virtual_actor.readonly
...     def get(self):
...         return self.x
...
...     def incr(self):
...         self.x += 1
...         return self.x
...
...     def __getstate__(self):
...         return self.x
...
...     def __setstate__(self, state):
...         self.x = state
...
>>> # Create and run a virtual actor.
>>> counter = Counter.get_or_create(actor_id="Counter", x=1)
>>> assert ray.get(counter.incr.run_async()) == 2

PublicAPI (beta): This API is in beta and may change before becoming stable.
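The requirement for __getstate__ and __setstate__ is easier to see with a small round trip: the state returned by __getstate__ is all that needs to be stored, and __setstate__ rebuilds an equivalent object from it. The sketch below is plain Python with no Ray dependency. The checkpoint and restore helpers are hypothetical, and JSON is chosen only for illustration; the serialization format the library actually uses is not stated here.

```python
import json


class Counter:
    def __init__(self, x: int):
        self.x = x

    def incr(self) -> int:
        self.x += 1
        return self.x

    def __getstate__(self):
        return self.x

    def __setstate__(self, state):
        self.x = state


def checkpoint(actor: Counter) -> str:
    # Persist only what __getstate__ exposes (hypothetical helper).
    return json.dumps(actor.__getstate__())


def restore(cls, blob: str):
    # Skip __init__ and rebuild the instance purely from the saved state.
    actor = cls.__new__(cls)
    actor.__setstate__(json.loads(blob))
    return actor


counter = Counter(1)
counter.incr()
saved = checkpoint(counter)
revived = restore(Counter, saved)
```

After the round trip, revived carries the same state as counter and continues counting from it.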

class ray.workflow.virtual_actor_class.VirtualActorClass[source]

The virtual actor class used to create a virtual actor.

get_or_create(actor_id: str, *args, **kwargs) → ray.workflow.virtual_actor_class.VirtualActor[source]

Create an actor. See VirtualActorClassBase.create().

options() → ray.workflow.virtual_actor_class.VirtualActorClassBase[source]

Configures and overrides the actor instantiation parameters.

Management API

ray.workflow.resume_all(include_failed: bool = False) → Dict[str, ray.ObjectRef][source]

Resume all resumable workflow jobs.

This can be used after cluster restart to resume all tasks.

Parameters

include_failed – Whether to resume FAILED workflows.

Examples

>>> workflow_step = failed_job.step()
>>> output = workflow_step.run_async(workflow_id="failed_job")
>>> try:
...     ray.get(output)
... except Exception:
...     print("JobFailed")
>>> jobs = workflow.list_all()
>>> assert jobs == [("failed_job", workflow.FAILED)]
>>> assert workflow.resume_all(
...     include_failed=True).get("failed_job") is not None
Returns

A dict mapping each resumed workflow_id to its returned object reference.

PublicAPI (beta): This API is in beta and may change before becoming stable.
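The effect of include_failed can be sketched as a selection over workflow statuses. This is a toy model under a stated assumption: that RESUMABLE workflows are always selected and FAILED ones only when include_failed is True. The function select_resumable and the string status constants are hypothetical and are not part of the API.

```python
from typing import Dict, List

# Hypothetical string stand-ins for the workflow status values.
RESUMABLE = "RESUMABLE"
FAILED = "FAILED"
SUCCESSFUL = "SUCCESSFUL"


def select_resumable(statuses: Dict[str, str],
                     include_failed: bool = False) -> List[str]:
    wanted = {RESUMABLE}
    if include_failed:
        wanted.add(FAILED)  # assumption: FAILED is opt-in only
    # Sorted so the result is deterministic for the example.
    return sorted(wid for wid, status in statuses.items() if status in wanted)


statuses = {
    "job_a": RESUMABLE,
    "job_b": FAILED,
    "job_c": SUCCESSFUL,
}
default_selection = select_resumable(statuses)
with_failed = select_resumable(statuses, include_failed=True)
```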

ray.workflow.resume(workflow_id: str) → ray.ObjectRef[source]

Resume a workflow.

Resume a workflow and retrieve its output. If the workflow was incomplete, it will be re-executed from its checkpointed outputs. If the workflow was complete, returns the result immediately.

Examples

>>> trip = start_trip.step()
>>> res1 = trip.run_async(workflow_id="trip1")
>>> res2 = workflow.resume("trip1")
>>> assert ray.get(res1) == ray.get(res2)
Parameters

workflow_id – The id of the workflow to resume.

Returns

An object reference that can be used to retrieve the workflow result.

PublicAPI (beta): This API is in beta and may change before becoming stable.

ray.workflow.list_all(status_filter: Union[ray.workflow.common.WorkflowStatus, str, Set[Union[ray.workflow.common.WorkflowStatus, str]], None] = None) → List[Tuple[str, ray.workflow.common.WorkflowStatus]][source]

List all workflows matching a given status filter.

Parameters

status_filter – If given, only workflows with that status are returned. This can be a single status or a set of statuses. The string form of a status is also accepted, i.e., "RUNNING", "FAILED", "SUCCESSFUL", "CANCELED", or "RESUMABLE".

Examples

>>> workflow_step = long_running_job.step()
>>> wf = workflow_step.run_async(workflow_id="long_running_job")
>>> jobs = workflow.list_all()
>>> assert jobs == [ ("long_running_job", workflow.RUNNING) ]
>>> ray.get(wf)
>>> jobs = workflow.list_all({workflow.RUNNING})
>>> assert jobs == []
>>> jobs = workflow.list_all(workflow.SUCCESSFUL)
>>> assert jobs == [ ("long_running_job", workflow.SUCCESSFUL) ]
Returns

A list of (workflow id, workflow status) tuples.

PublicAPI (beta): This API is in beta and may change before becoming stable.
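The filter argument accepts three shapes: a single status, its string form, or a set of either. One way to handle that is to normalize every shape into a set of enum members before filtering. The sketch below is a plain-Python illustration under that assumption. The Status enum, normalize_filter, and this list_all (which takes the workflows as an explicit dict) are hypothetical and only mirror the names listed above.

```python
from enum import Enum
from typing import Dict, List, Set, Tuple, Union


class Status(Enum):
    """Hypothetical mirror of the status names listed above."""
    RUNNING = "RUNNING"
    FAILED = "FAILED"
    SUCCESSFUL = "SUCCESSFUL"
    CANCELED = "CANCELED"
    RESUMABLE = "RESUMABLE"


StatusLike = Union[Status, str]


def normalize_filter(
    status_filter: Union[StatusLike, Set[StatusLike], None]
) -> Set[Status]:
    if status_filter is None:
        return set(Status)  # no filter: every status matches
    if isinstance(status_filter, (Status, str)):
        status_filter = {status_filter}  # single status -> one-element set
    # Status("RUNNING") looks a member up by value; Status(member) returns it.
    return {Status(s) for s in status_filter}


def list_all(
    workflows: Dict[str, Status],
    status_filter: Union[StatusLike, Set[StatusLike], None] = None,
) -> List[Tuple[str, Status]]:
    wanted = normalize_filter(status_filter)
    return [(wid, st) for wid, st in workflows.items() if st in wanted]


jobs = {"a": Status.RUNNING, "b": Status.FAILED}
```

An unknown string such as "DONE" raises ValueError during normalization, which surfaces typos early instead of silently returning an empty list.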

ray.workflow.get_status(workflow_id: str) → ray.workflow.common.WorkflowStatus[source]

Get the status for a given workflow.

Parameters

workflow_id – The workflow to query.

Examples

>>> workflow_step = trip.step()
>>> output = workflow_step.run(workflow_id="trip")
>>> assert workflow.SUCCESSFUL == workflow.get_status("trip")
Returns

The status of that workflow.

PublicAPI (beta): This API is in beta and may change before becoming stable.

ray.workflow.get_output(workflow_id: str, *, name: Optional[str] = None) → ray.ObjectRef[source]

Get the output of a running workflow.

Parameters
  • workflow_id – The workflow to get the output of.

  • name – If set, fetch the output of the step with this name instead of the output of the whole workflow.

Examples

>>> trip = start_trip.options(name="trip").step()
>>> res1 = trip.run_async(workflow_id="trip1")
>>> # You could call get_output() on another machine.
>>> res2 = workflow.get_output("trip1")
>>> assert ray.get(res1) == ray.get(res2)
>>> step_output = workflow.get_output("trip1", name="trip")
>>> assert ray.get(step_output) == ray.get(res1)
Returns

An object reference that can be used to retrieve the workflow result.

PublicAPI (beta): This API is in beta and may change before becoming stable.

ray.workflow.get_actor(actor_id: str) → VirtualActor[source]

Get a virtual actor.

Parameters

actor_id – The ID of the actor.

Returns

A virtual actor.

PublicAPI (beta): This API is in beta and may change before becoming stable.

ray.workflow.cancel(workflow_id: str) → None[source]

Cancel a workflow. Workflow checkpoints will still be saved in storage. To clean up saved checkpoints, see workflow.delete().

Parameters

workflow_id – The workflow to cancel.

Examples

>>> workflow_step = some_job.step()
>>> output = workflow_step.run_async(workflow_id="some_job")
>>> workflow.cancel(workflow_id="some_job")
>>> assert [("some_job", workflow.CANCELED)] == workflow.list_all()
Returns

None

PublicAPI (beta): This API is in beta and may change before becoming stable.