Ray Workflows API
Core API
- ray.workflow.init(storage: Optional[Union[str, ray.workflow.storage.base.Storage]] = None) -> None
Initialize workflow.
- Parameters
storage – The external storage URL or a custom storage class. If not specified, /tmp/ray/workflow_data will be used.
PublicAPI (beta): This API is in beta and may change before becoming stable.
- ray.workflow.step(*args, **kwargs)
A decorator used for creating workflow steps.
Examples
>>> from ray import workflow
>>> @workflow.step
... def book_flight(origin: str, dest: str) -> Flight:
...     return Flight(...)
>>> @workflow.step(max_retries=3, catch_exceptions=True)
... def book_hotel(dest: str) -> Hotel:
...     return Hotel(...)
PublicAPI (beta): This API is in beta and may change before becoming stable.
- class ray.workflow.common.Workflow(workflow_data: ray.workflow.common.WorkflowData, prepare_inputs: Optional[Callable] = None)
This class represents a workflow.
A Workflow object is either a workflow that has not been executed yet or, when ‘workflow.ref’ is not None, a reference to a running workflow.
- property data: ray.workflow.common.WorkflowData
Get the workflow data.
- run(workflow_id: Optional[str] = None, metadata: Optional[Dict[str, Any]] = None) -> Any
Run a workflow.
If the workflow with the given id already exists, it will be resumed.
- Examples:
>>> @workflow.step
... def book_flight(origin: str, dest: str) -> Flight:
...     return Flight(...)
>>> @workflow.step
... def book_hotel(location: str) -> Reservation:
...     return Reservation(...)
>>> @workflow.step
... def finalize_trip(bookings: List[Any]) -> Trip:
...     return Trip(...)
>>> flight1 = book_flight.step("OAK", "SAN")
>>> flight2 = book_flight.step("SAN", "OAK")
>>> hotel = book_hotel.step("SAN")
>>> trip = finalize_trip.step([flight1, flight2, hotel])
>>> result = trip.run()
- Args:
- workflow_id: A unique identifier that can be used to resume the
workflow. If not specified, a random id will be generated.
- metadata: The metadata to add to the workflow. It must be JSON-serializable.
- Returns:
The running result.
PublicAPI (beta): This API is in beta and may change before becoming stable.
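Since `metadata` has to be JSON-serializable, it can be useful to check a dictionary before passing it to `run()`. The sketch below is a stand-alone illustration using only the standard `json` module; `is_json_serializable` is a hypothetical helper and is not part of `ray.workflow`.

```python
import json
from typing import Any, Dict


def is_json_serializable(metadata: Dict[str, Any]) -> bool:
    """Return True if `metadata` can be encoded as JSON.

    Hypothetical helper for illustration; not part of ray.workflow.
    """
    try:
        json.dumps(metadata)
        return True
    except (TypeError, ValueError):
        # TypeError: unsupported value type; ValueError: e.g. circular reference.
        return False


ok_metadata = {"owner": "alice", "attempt": 1, "tags": ["trip", "demo"]}
bad_metadata = {"callback": lambda: None}  # functions cannot be encoded as JSON

print(is_json_serializable(ok_metadata))
print(is_json_serializable(bad_metadata))
```

Strings, numbers, booleans, None, lists, and dicts with string keys pass this check; sets, functions, and arbitrary objects do not.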
- run_async(workflow_id: Optional[str] = None, metadata: Optional[Dict[str, Any]] = None) -> ray.ObjectRef
Run a workflow asynchronously.
If the workflow with the given id already exists, it will be resumed.
- Examples:
>>> @workflow.step
... def book_flight(origin: str, dest: str) -> Flight:
...     return Flight(...)
>>> @workflow.step
... def book_hotel(location: str) -> Reservation:
...     return Reservation(...)
>>> @workflow.step
... def finalize_trip(bookings: List[Any]) -> Trip:
...     return Trip(...)
>>> flight1 = book_flight.step("OAK", "SAN")
>>> flight2 = book_flight.step("SAN", "OAK")
>>> hotel = book_hotel.step("SAN")
>>> trip = finalize_trip.step([flight1, flight2, hotel])
>>> result = ray.get(trip.run_async())
- Args:
- workflow_id: A unique identifier that can be used to resume the
workflow. If not specified, a random id will be generated.
- metadata: The metadata to add to the workflow. It must be JSON-serializable.
- Returns:
A ray.ObjectRef that resolves to the running result.
PublicAPI (beta): This API is in beta and may change before becoming stable.
Virtual Actors
- ray.workflow.virtual_actor() -> VirtualActorClass
A decorator used for creating a virtual actor based on a class.
The decorated class must define the “__getstate__” and “__setstate__” methods.
Examples
>>> @workflow.virtual_actor
... class Counter:
...     def __init__(self, x: int):
...         self.x = x
...
...     # Mark a method as read-only: it does not modify the
...     # state of the virtual actor.
...     @workflow.virtual_actor.readonly
...     def get(self):
...         return self.x
...
...     def incr(self):
...         self.x += 1
...         return self.x
...
...     def __getstate__(self):
...         return self.x
...
...     def __setstate__(self, state):
...         self.x = state
>>> # Create and run a virtual actor.
>>> counter = Counter.get_or_create(actor_id="Counter", x=1)
>>> assert counter.incr.run() == 2
PublicAPI (beta): This API is in beta and may change before becoming stable.
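The role of `__getstate__` and `__setstate__` can be illustrated without Ray: the first returns whatever should be persisted, and the second restores an instance from that value. The sketch below is plain Python, uses JSON text as a stand-in for external storage, and makes no claim about how Ray actually serializes virtual actor state.

```python
import json


class Counter:
    """Plain Python class (no Ray decorator) with explicit state hooks."""

    def __init__(self, x: int):
        self.x = x

    def incr(self) -> int:
        self.x += 1
        return self.x

    def __getstate__(self):
        # Only the counter value needs to be persisted.
        return self.x

    def __setstate__(self, state):
        self.x = state


counter = Counter(1)
counter.incr()  # counter.x is now 2

# "Save" the state; JSON text here stands in for external storage.
saved = json.dumps(counter.__getstate__())

# "Load" the state into a fresh instance without calling __init__.
restored = Counter.__new__(Counter)
restored.__setstate__(json.loads(saved))

print(restored.x)
```

Keeping the persisted state small and explicit, as `Counter` does, makes it straightforward to rebuild the object after a restart.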
Management API
- ray.workflow.resume_all(include_failed: bool = False) -> Dict[str, ray.ObjectRef]
Resume all resumable workflow jobs.
This can be used after cluster restart to resume all tasks.
- Parameters
include_failed – Whether to resume FAILED workflows.
Examples
>>> workflow_step = failed_job.step()
>>> output = workflow_step.run_async(workflow_id="failed_job")
>>> try:
...     ray.get(output)
... except Exception:
...     print("JobFailed")
>>> jobs = workflow.list_all()
>>> assert jobs == [("failed_job", workflow.FAILED)]
>>> assert workflow.resume_all(
...     include_failed=True).get("failed_job") is not None
- Returns
A dictionary mapping the workflow_id of each resumed workflow to its returned object reference.
PublicAPI (beta): This API is in beta and may change before becoming stable.
- ray.workflow.resume(workflow_id: str) -> ray.ObjectRef
Resume a workflow.
Resume a workflow and retrieve its output. If the workflow was incomplete, it will be re-executed from its checkpointed outputs. If the workflow was complete, returns the result immediately.
Examples
>>> trip = start_trip.step()
>>> res1 = trip.run_async(workflow_id="trip1")
>>> res2 = workflow.resume("trip1")
>>> assert ray.get(res1) == ray.get(res2)
- Parameters
workflow_id – The id of the workflow to resume.
- Returns
An object reference that can be used to retrieve the workflow result.
PublicAPI (beta): This API is in beta and may change before becoming stable.
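The idea of re-executing "from checkpointed outputs" can be sketched with a toy pipeline: each step's output is saved after it succeeds, and a later run skips any step whose output is already saved. This is a simplified, in-memory illustration of the concept only; `run_steps`, `checkpoints`, and the step functions are hypothetical and do not reflect Ray's implementation or storage layout.

```python
from typing import Any, Callable, Dict, List, Tuple

# In-memory stand-in for durable storage: step name -> saved output.
checkpoints: Dict[str, Any] = {}
calls: List[str] = []  # records which steps actually executed


def run_steps(steps: List[Tuple[str, Callable[[Any], Any]]], value: Any) -> Any:
    """Run steps in order, skipping any whose output is already checkpointed."""
    for name, fn in steps:
        if name in checkpoints:
            value = checkpoints[name]  # reuse the saved output
            continue
        value = fn(value)
        calls.append(name)
        checkpoints[name] = value  # checkpoint only after the step succeeds
    return value


attempts = {"double": 0}


def add_one(x: int) -> int:
    return x + 1


def flaky_double(x: int) -> int:
    # Fails the first time it is called, then succeeds.
    attempts["double"] += 1
    if attempts["double"] == 1:
        raise RuntimeError("simulated crash")
    return x * 2


pipeline = [("add_one", add_one), ("double", flaky_double)]

try:
    run_steps(pipeline, 1)
except RuntimeError:
    pass  # the first run crashed after "add_one" was checkpointed

# "Resume": add_one is not re-executed, only the failed step runs again.
result = run_steps(pipeline, 1)
print(result, calls)
```

Running the pipeline a third time returns the saved result without executing any step, which mirrors the "returns the result immediately" behavior described for completed workflows.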
- ray.workflow.list_all(status_filter: Optional[Union[ray.workflow.common.WorkflowStatus, str, Set[Union[ray.workflow.common.WorkflowStatus, str]]]] = None) -> List[Tuple[str, ray.workflow.common.WorkflowStatus]]
List all workflows matching a given status filter.
- Parameters
status_filter – If given, only workflows with that status are returned. This can be a single status or a set of statuses. The string form of a status is also accepted, i.e., “RUNNING”/“FAILED”/“SUCCESSFUL”/“CANCELED”/“RESUMABLE”.
Examples
>>> workflow_step = long_running_job.step()
>>> wf = workflow_step.run_async(workflow_id="long_running_job")
>>> jobs = workflow.list_all()
>>> assert jobs == [("long_running_job", workflow.RUNNING)]
>>> ray.get(wf)
>>> jobs = workflow.list_all({workflow.RUNNING})
>>> assert jobs == []
>>> jobs = workflow.list_all(workflow.SUCCESSFUL)
>>> assert jobs == [("long_running_job", workflow.SUCCESSFUL)]
- Returns
A list of (workflow ID, workflow status) tuples.
PublicAPI (beta): This API is in beta and may change before becoming stable.
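A filter that accepts a single status, its string form, or a set mixing both is typically normalized to one set before matching. The sketch below shows one way to do that in plain Python; `Status`, `normalize_filter`, and `filter_workflows` are hypothetical stand-ins written for this example and are not the `ray.workflow` implementation.

```python
from enum import Enum
from typing import List, Optional, Set, Tuple, Union


class Status(Enum):
    # Hypothetical stand-in for ray.workflow.common.WorkflowStatus.
    RUNNING = "RUNNING"
    FAILED = "FAILED"
    SUCCESSFUL = "SUCCESSFUL"
    CANCELED = "CANCELED"
    RESUMABLE = "RESUMABLE"


StatusLike = Union[Status, str]


def normalize_filter(
    status_filter: Optional[Union[StatusLike, Set[StatusLike]]],
) -> Set[Status]:
    """Turn a single status, a string, or a set of either into a set of Status."""
    if status_filter is None:
        return set(Status)  # no filter: match every status
    if isinstance(status_filter, (Status, str)):
        status_filter = {status_filter}
    # Status("FAILED") looks the member up by value; unknown strings raise ValueError.
    return {s if isinstance(s, Status) else Status(s) for s in status_filter}


def filter_workflows(
    workflows: List[Tuple[str, Status]],
    status_filter: Optional[Union[StatusLike, Set[StatusLike]]] = None,
) -> List[Tuple[str, Status]]:
    """Keep only the (workflow_id, status) pairs whose status matches the filter."""
    wanted = normalize_filter(status_filter)
    return [(wid, status) for wid, status in workflows if status in wanted]


jobs = [("a", Status.RUNNING), ("b", Status.FAILED), ("c", Status.SUCCESSFUL)]

print(filter_workflows(jobs, "FAILED"))
print(filter_workflows(jobs, {Status.RUNNING, "SUCCESSFUL"}))
```

Normalizing up front keeps the matching logic to a single membership test regardless of how the caller spelled the filter.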
- ray.workflow.get_status(workflow_id: str) -> ray.workflow.common.WorkflowStatus
Get the status for a given workflow.
- Parameters
workflow_id – The workflow to query.
Examples
>>> workflow_step = trip.step()
>>> output = workflow_step.run(workflow_id="trip")
>>> assert workflow.SUCCESSFUL == workflow.get_status("trip")
- Returns
The status of the workflow.
PublicAPI (beta): This API is in beta and may change before becoming stable.
- ray.workflow.get_output(workflow_id: str, *, name: Optional[str] = None) -> ray.ObjectRef
Get the output of a running workflow.
- Parameters
workflow_id – The workflow to get the output of.
name – If set, fetch the output of the step with this name instead of the output of the workflow.
Examples
>>> trip = start_trip.options(name="trip").step()
>>> res1 = trip.run_async(workflow_id="trip1")
>>> # get_output() can also be called from another machine
>>> res2 = workflow.get_output("trip1")
>>> assert ray.get(res1) == ray.get(res2)
>>> # "name" is keyword-only, so it must be passed by name
>>> step_output = workflow.get_output("trip1", name="trip")
>>> assert ray.get(step_output) == ray.get(res1)
- Returns
An object reference that can be used to retrieve the workflow result.
PublicAPI (beta): This API is in beta and may change before becoming stable.
- ray.workflow.get_metadata(workflow_id: str, name: Optional[str] = None) -> Dict[str, Any]
Get the metadata of the workflow.
This will return a dict of metadata of either the workflow (if only workflow_id is given) or a specific workflow step (if both workflow_id and step name are given). A ValueError is raised if the given workflow id or step name does not exist.
If only workflow id is given, this will return metadata on workflow level, which includes running status, workflow-level user metadata and workflow-level running stats (e.g. the start time and end time of the workflow).
If both workflow id and step name are given, this will return metadata on workflow step level, which includes step inputs, step-level user metadata and step-level running stats (e.g. the start time and end time of the step).
- Parameters
workflow_id – The workflow to get the metadata of.
name – If set, fetch the metadata of the specific step instead of the metadata of the workflow.
Examples
>>> workflow_step = trip.options(
...     name="trip", metadata={"k1": "v1"}).step()
>>> workflow_step.run(workflow_id="trip1", metadata={"k2": "v2"})
>>> workflow_metadata = workflow.get_metadata("trip1")
>>> assert workflow_metadata["status"] == "SUCCESSFUL"
>>> assert workflow_metadata["user_metadata"] == {"k2": "v2"}
>>> assert "start_time" in workflow_metadata["stats"]
>>> assert "end_time" in workflow_metadata["stats"]
>>> step_metadata = workflow.get_metadata("trip1", "trip")
>>> assert step_metadata["step_type"] == "FUNCTION"
>>> assert step_metadata["user_metadata"] == {"k1": "v1"}
>>> assert "start_time" in step_metadata["stats"]
>>> assert "end_time" in step_metadata["stats"]
- Returns
A dictionary containing the metadata of the workflow.
- Raises
ValueError – if given workflow or workflow step does not exist.
PublicAPI (beta): This API is in beta and may change before becoming stable.
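As a small illustration of consuming the returned dictionary, the sketch below computes a duration from the `start_time` and `end_time` entries under `stats`. The sample dictionary is invented for this example and only mirrors the keys used in the example above; treating the timestamps as numeric seconds is an assumption, and `duration_seconds` is a hypothetical helper, not part of `ray.workflow`.

```python
from typing import Any, Dict, Optional


def duration_seconds(metadata: Dict[str, Any]) -> Optional[float]:
    """Return end_time - start_time, or None if either value is missing.

    Assumes both timestamps are numbers expressed in seconds.
    """
    stats = metadata.get("stats", {})
    if "start_time" not in stats or "end_time" not in stats:
        return None
    return stats["end_time"] - stats["start_time"]


# Invented sample shaped like the workflow-level metadata in the example above.
sample_metadata = {
    "status": "SUCCESSFUL",
    "user_metadata": {"k2": "v2"},
    "stats": {"start_time": 1000.0, "end_time": 1002.5},
}

# A workflow that is still running may have no end_time yet.
running_metadata = {"status": "RUNNING", "stats": {"start_time": 1000.0}}

print(duration_seconds(sample_metadata))
print(duration_seconds(running_metadata))
```

Returning None for incomplete stats avoids a KeyError when the helper is applied to a workflow that has not finished.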
- ray.workflow.get_actor(actor_id: str) -> VirtualActor
Get a virtual actor.
- Parameters
actor_id – The ID of the actor.
- Returns
A virtual actor.
PublicAPI (beta): This API is in beta and may change before becoming stable.
- ray.workflow.cancel(workflow_id: str) -> None
Cancel a workflow. Workflow checkpoints will still be saved in storage. To clean up saved checkpoints, see workflow.delete().
- Parameters
workflow_id – The workflow to cancel.
Examples
>>> workflow_step = some_job.step()
>>> output = workflow_step.run_async(workflow_id="some_job")
>>> workflow.cancel(workflow_id="some_job")
>>> assert [("some_job", workflow.CANCELED)] == workflow.list_all()
- Returns
None
PublicAPI (beta): This API is in beta and may change before becoming stable.