ray.workflow.run(dag: ray.dag.dag_node.DAGNode, *args, workflow_id: Optional[str] = None, metadata: Optional[Dict[str, Any]] = None, **kwargs) -> Any

Run a workflow.

If a workflow with the given ID already exists, it is resumed.


Examples:

>>> from typing import Any, List
>>> import ray
>>> from ray import workflow
>>> Flight, Reservation, Trip = ... 
>>> @ray.remote 
... def book_flight(origin: str, dest: str) -> Flight: 
...    return Flight(...) 
>>> @ray.remote 
... def book_hotel(location: str) -> Reservation: 
...    return Reservation(...) 
>>> @ray.remote 
... def finalize_trip(bookings: List[Any]) -> Trip: 
...    return Trip(...) 
>>> flight1 = book_flight.bind("OAK", "SAN") 
>>> flight2 = book_flight.bind("SAN", "OAK") 
>>> hotel = book_hotel.bind("SAN") 
>>> trip = finalize_trip.bind([flight1, flight2, hotel]) 
>>> result = workflow.run(trip) 

Parameters:

  • workflow_id – A unique identifier that can be used to resume the workflow. If not specified, a random ID is generated.

  • metadata – Metadata to attach to the workflow. It must be JSON-serializable.


Returns:

The result of running the workflow.

PublicAPI (alpha): This API is in alpha and may change before becoming stable.
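
Because metadata has to be JSON-serializable, it can help to validate it before starting a workflow. The following is a minimal sketch using only the standard library; `check_workflow_metadata` is a hypothetical helper written for illustration, not part of Ray, and the `trip_1` ID and the metadata keys are made-up values.

```python
import json
from typing import Any, Dict, Optional


def check_workflow_metadata(
    metadata: Optional[Dict[str, Any]],
) -> Optional[Dict[str, Any]]:
    """Hypothetical helper (not part of Ray): fail early on bad metadata."""
    if metadata is None:
        return None
    try:
        # json.dumps raises TypeError for unsupported types (e.g. sets)
        # and ValueError for circular references.
        json.dumps(metadata)
    except (TypeError, ValueError) as exc:
        raise ValueError(
            f"workflow metadata must be JSON-serializable: {exc}"
        ) from exc
    return metadata


# Plain strings and numbers serialize without trouble.
good = check_workflow_metadata({"owner": "alice", "retries": 3})

# A set cannot be encoded as JSON, so the helper rejects it.
try:
    check_workflow_metadata({"tags": {"a", "b"}})
    rejected = False
except ValueError:
    rejected = True

# With Ray installed, the validated dict could then be passed along, e.g.:
# workflow.run(trip, workflow_id="trip_1", metadata=good)
```

Checking up front keeps a serialization error from surfacing only after the DAG has already been built and submitted.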