ray.workflow.run

ray.workflow.run(dag: DAGNode, *args, workflow_id: str | None = None, metadata: Dict[str, Any] | None = None, **kwargs) → Any

Run a workflow.

If a workflow with the given workflow_id already exists, it is resumed.

Examples

from typing import Any, List

import ray
from ray import workflow

@ray.remote
def book_flight(origin: str, dest: str):
    return f"Flight: {origin}->{dest}"

@ray.remote
def book_hotel(location: str):
    return f"Hotel: {location}"

@ray.remote
def finalize_trip(bookings: List[Any]):
    return ' | '.join(ray.get(bookings))

flight1 = book_flight.bind("OAK", "SAN")
flight2 = book_flight.bind("SAN", "OAK")
hotel = book_hotel.bind("SAN")
trip = finalize_trip.bind([flight1, flight2, hotel])
print(workflow.run(trip))

Flight: OAK->SAN | Flight: SAN->OAK | Hotel: SAN

Parameters:
  • workflow_id – A unique identifier that can be used to resume the workflow. If not specified, a random id will be generated.

  • metadata – The metadata to attach to the workflow. It must be JSON-serializable.
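Because metadata must serialize to JSON, it can help to check a dictionary before handing it to workflow.run. The following is a minimal sketch using only the standard library. The helper name check_metadata is hypothetical and not part of Ray, and the check assumes that "serializable to JSON" means the dictionary is accepted by json.dumps.

```python
import json
from typing import Any, Dict


def check_metadata(metadata: Dict[str, Any]) -> Dict[str, Any]:
    """Return metadata unchanged if json.dumps accepts it, else raise ValueError.

    Hypothetical helper for illustration; not part of the Ray API.
    """
    try:
        json.dumps(metadata)
    except (TypeError, ValueError) as exc:
        raise ValueError(f"metadata is not JSON-serializable: {exc}") from exc
    return metadata


# Plain strings and numbers serialize without trouble.
good = check_metadata({"owner": "travel-team", "retries": 3})

# A set has no JSON representation, so the helper rejects it.
try:
    check_metadata({"tags": {"a", "b"}})
    bad_rejected = False
except ValueError:
    bad_rejected = True

# With Ray installed, the checked dictionary could then be passed along
# with an explicit id (the id string here is an arbitrary example):
# workflow.run(trip, workflow_id="trip-oak-san", metadata=good)
```

Choosing an explicit workflow_id, as in the commented call, gives a known id to reuse when resuming, rather than a randomly generated one.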

Returns:

The result of running the workflow.

PublicAPI (alpha): This API is in alpha and may change before becoming stable.