API Comparisons#
Comparison between Ray Core APIs and Workflows#
Ray Workflows is built on top of Ray, and offers a mostly consistent subset of its API while providing durability. This section highlights some of the differences:
func.remote
vs func.bind
#
With Ray tasks, func.remote
will submit a remote task to run eagerly; func.bind
will generate
a node in a DAG, it will not be executed until the DAG is been executed.
Under the context of Ray Workflow, the execution of the DAG is deferred until workflow.run(dag, workflow_id=...)
or workflow.run_async(dag, workflow_id=...)
is called on the DAG.
Specifying the workflow id allows for resuming of the workflow by its id in case of cluster failure.
Other Workflow Engines#
Note: these comparisons are inspired by the Serverless workflows comparisons repo.
Argo API Comparison#
The original source of these comparisons can be found here.
Conditionals#
# https://github.com/argoproj/argo-workflows/tree/master/examples#conditionals
apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
generateName: coinflip-
spec:
entrypoint: coinflip
templates:
- name: coinflip
steps:
# flip a coin
- - name: flip-coin
template: flip-coin
# evaluate the result in parallel
- - name: heads
template: heads # call heads template if "heads"
when: "{{steps.flip-coin.outputs.result}} == heads"
- name: tails
template: tails # call tails template if "tails"
when: "{{steps.flip-coin.outputs.result}} == tails"
- - name: flip-again
template: flip-coin
- - name: complex-condition
template: heads-tails-or-twice-tails
# call heads template if first flip was "heads" and second was "tails" OR both were "tails"
when: >-
( {{steps.flip-coin.outputs.result}} == heads &&
{{steps.flip-again.outputs.result}} == tails
) ||
( {{steps.flip-coin.outputs.result}} == tails &&
{{steps.flip-again.outputs.result}} == tails )
- name: heads-regex
template: heads # call heads template if ~ "hea"
when: "{{steps.flip-again.outputs.result}} =~ hea"
- name: tails-regex
template: tails # call heads template if ~ "tai"
when: "{{steps.flip-again.outputs.result}} =~ tai"
# Return heads or tails based on a random number
- name: flip-coin
script:
image: python:alpine3.6
command: [python]
source: |
import random
result = "heads" if random.randint(0,1) == 0 else "tails"
print(result)
- name: heads
container:
image: alpine:3.6
command: [sh, -c]
args: ["echo \"it was heads\""]
- name: tails
container:
image: alpine:3.6
command: [sh, -c]
args: ["echo \"it was tails\""]
- name: heads-tails-or-twice-tails
container:
image: alpine:3.6
command: [sh, -c]
args: ["echo \"it was heads the first flip and tails the second. Or it was two times tails.\""]
import ray
from ray import workflow
@ray.remote
def handle_heads() -> str:
return "It was heads"
@ray.remote
def handle_tails() -> str:
return "It was tails"
@ray.remote
def flip_coin() -> str:
import random
@ray.remote
def decide(heads: bool) -> str:
return workflow.continuation(
handle_heads.bind() if heads else handle_tails.bind()
)
return workflow.continuation(decide.bind(random.random() > 0.5))
if __name__ == "__main__":
print(workflow.run(flip_coin.bind()))
DAG#
# https://github.com/argoproj/argo-workflows/tree/master/examples#dag
apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
generateName: dag-diamond-
spec:
entrypoint: diamond
templates:
- name: echo
inputs:
parameters:
- name: message
container:
image: alpine:3.7
command: [echo, "{{inputs.parameters.message}}"]
- name: diamond
dag:
tasks:
- name: A
template: echo
arguments:
parameters: [{name: message, value: A}]
- name: B
dependencies: [A]
template: echo
arguments:
parameters: [{name: message, value: B}]
- name: C
dependencies: [A]
template: echo
arguments:
parameters: [{name: message, value: C}]
- name: D
dependencies: [B, C]
template: echo
arguments:
parameters: [{name: message, value: D}]
import ray
from ray import workflow
@ray.remote
def echo(msg: str, *deps) -> None:
print(msg)
if __name__ == "__main__":
A = echo.options(**workflow.options(task_id="A")).bind("A")
B = echo.options(**workflow.options(task_id="B")).bind("B", A)
C = echo.options(**workflow.options(task_id="C")).bind("C", A)
D = echo.options(**workflow.options(task_id="D")).bind("D", A, B)
workflow.run(D)
Multi-step Workflow#
# https://github.com/argoproj/argo-workflows/tree/master/examples#steps
apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
generateName: steps-
spec:
entrypoint: hello-hello-hello
# This spec contains two templates: hello-hello-hello and whalesay
templates:
- name: hello-hello-hello
# Instead of just running a container
# This template has a sequence of steps
steps:
- - name: hello1 # hello1 is run before the following steps
template: whalesay
arguments:
parameters:
- name: message
value: "hello1"
- - name: hello2a # double dash => run after previous step
template: whalesay
arguments:
parameters:
- name: message
value: "hello2a"
- name: hello2b # single dash => run in parallel with previous step
template: whalesay
arguments:
parameters:
- name: message
value: "hello2b"
# This is the same template as from the previous example
- name: whalesay
inputs:
parameters:
- name: message
container:
image: docker/whalesay
command: [cowsay]
args: ["{{inputs.parameters.message}}"]
import ray
from ray import workflow
@ray.remote
def hello(msg: str, *deps) -> None:
print(msg)
@ray.remote
def wait_all(*args) -> None:
pass
if __name__ == "__main__":
h1 = hello.options(**workflow.options(task_id="hello1")).bind("hello1")
h2a = hello.options(**workflow.options(task_id="hello2a")).bind("hello2a")
h2b = hello.options(**workflow.options(task_id="hello2b")).bind("hello2b", h2a)
workflow.run(wait_all.bind(h1, h2b))
Exit Handler#
# https://github.com/argoproj/argo-workflows/tree/master/examples#exit-handlers
apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
generateName: exit-handlers-
spec:
entrypoint: intentional-fail
onExit: exit-handler # invoke exit-handler template at end of the workflow
templates:
# primary workflow template
- name: intentional-fail
container:
image: alpine:latest
command: [sh, -c]
args: ["echo intentional failure; exit 1"]
# Exit handler templates
# After the completion of the entrypoint template, the status of the
# workflow is made available in the global variable {{workflow.status}}.
# {{workflow.status}} will be one of: Succeeded, Failed, Error
- name: exit-handler
steps:
- - name: notify
template: send-email
- name: celebrate
template: celebrate
when: "{{workflow.status}} == Succeeded"
- name: cry
template: cry
when: "{{workflow.status}} != Succeeded"
- name: send-email
container:
image: alpine:latest
command: [sh, -c]
args: ["echo send e-mail: {{workflow.name}} {{workflow.status}} {{workflow.duration}}"]
- name: celebrate
container:
image: alpine:latest
command: [sh, -c]
args: ["echo hooray!"]
- name: cry
container:
image: alpine:latest
command: [sh, -c]
args: ["echo boohoo!"]
from typing import Tuple, Optional
import ray
from ray import workflow
@ray.remote
def intentional_fail() -> str:
raise RuntimeError("oops")
@ray.remote
def cry(error: Exception) -> None:
print("Sadly", error)
@ray.remote
def celebrate(result: str) -> None:
print("Success!", result)
@ray.remote
def send_email(result: str) -> None:
print("Sending email", result)
@ray.remote
def exit_handler(res: Tuple[Optional[str], Optional[Exception]]) -> None:
result, error = res
email = send_email.bind(f"Raw result: {result}, {error}")
if error:
handler = cry.bind(error)
else:
handler = celebrate.bind(result)
return workflow.continuation(wait_all.bind(handler, email))
@ray.remote
def wait_all(*deps):
return "done"
if __name__ == "__main__":
res = intentional_fail.options(**workflow.options(catch_exceptions=True)).bind()
print(workflow.run(exit_handler.bind(res)))
Loops#
# https://github.com/argoproj/argo-workflows/tree/master/examples#loops
apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
generateName: loops-
spec:
entrypoint: loop-example
templates:
- name: loop-example
steps:
- - name: print-message
template: whalesay
arguments:
parameters:
- name: message
value: "{{item}}"
withItems: # invoke whalesay once for each item in parallel
- hello world # item 1
- goodbye world # item 2
- name: whalesay
inputs:
parameters:
- name: message
container:
image: docker/whalesay:latest
command: [cowsay]
args: ["{{inputs.parameters.message}}"]
import ray
from ray import workflow
@ray.remote
def hello(msg: str) -> None:
print(msg)
@ray.remote
def wait_all(*args) -> None:
pass
if __name__ == "__main__":
children = []
for msg in ["hello world", "goodbye world"]:
children.append(hello.bind(msg))
workflow.run(wait_all.bind(*children))
Recursion#
# https://github.com/argoproj/argo-workflows/tree/master/examples#recursion
apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
generateName: coinflip-recursive-
spec:
entrypoint: coinflip
templates:
- name: coinflip
steps:
# flip a coin
- - name: flip-coin
template: flip-coin
# evaluate the result in parallel
- - name: heads
template: heads # call heads template if "heads"
when: "{{steps.flip-coin.outputs.result}} == heads"
- name: tails # keep flipping coins if "tails"
template: coinflip
when: "{{steps.flip-coin.outputs.result}} == tails"
- name: flip-coin
script:
image: python:alpine3.6
command: [python]
source: |
import random
result = "heads" if random.randint(0,1) == 0 else "tails"
print(result)
- name: heads
container:
image: alpine:3.6
command: [sh, -c]
args: ["echo \"it was heads\""]
import ray
from ray import workflow
@ray.remote
def handle_heads() -> str:
return "It was heads"
@ray.remote
def handle_tails() -> str:
print("It was tails, retrying")
return workflow.continuation(flip_coin.bind())
@ray.remote
def flip_coin() -> str:
import random
@ray.remote
def decide(heads: bool) -> str:
if heads:
return workflow.continuation(handle_heads.bind())
else:
return workflow.continuation(handle_tails.bind())
return workflow.continuation(decide.bind(random.random() > 0.5))
if __name__ == "__main__":
print(workflow.run(flip_coin.bind()))
Retries#
# https://github.com/argoproj/argo-workflows/tree/master/examples#retrying-failed-or-errored-steps
apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
generateName: retry-backoff-
spec:
entrypoint: retry-backoff
templates:
- name: retry-backoff
retryStrategy:
limit: 10
retryPolicy: "Always"
backoff:
duration: "1" # Must be a string. Default unit is seconds. Could also be a Duration, e.g.: "2m", "6h", "1d"
factor: 2
maxDuration: "1m" # Must be a string. Default unit is seconds. Could also be a Duration, e.g.: "2m", "6h", "1d"
affinity:
nodeAntiAffinity: {}
container:
image: python:alpine3.6
command: ["python", -c]
# fail with a 66% probability
args: ["import random; import sys; exit_code = random.choice([0, 1, 1]); sys.exit(exit_code)"]
from typing import Any, Tuple, Optional
import ray
from ray import workflow
@ray.remote
def flaky_step() -> str:
import random
if random.choice([0, 1, 1]) != 0:
raise ValueError("oops")
return "ok"
@ray.remote
def custom_retry_strategy(func: Any, num_retries: int, delay_s: int) -> str:
import time
@ray.remote
def handle_result(res: Tuple[Optional[str], Optional[Exception]]) -> str:
result, error = res
if result:
return res
elif num_retries <= 0:
raise error
else:
print("Retrying exception after delay", error)
time.sleep(delay_s)
return workflow.continuation(
custom_retry_strategy.bind(func, num_retries - 1, delay_s)
)
res = func.options(**workflow.options(catch_exceptions=True)).bind()
return workflow.continuation(handle_result.bind(res))
if __name__ == "__main__":
# Default retry strategy.
print(
workflow.run(flaky_step.options(max_retries=10, retry_exceptions=True).bind())
)
# Custom strategy.
print(workflow.run(custom_retry_strategy.bind(flaky_step, 10, 1)))
Metaflow API Comparison#
The original source of these comparisons can be found here.
Foreach#
# https://docs.metaflow.org/metaflow/basics#foreach
from metaflow import FlowSpec, step
class ForeachFlow(FlowSpec):
@step
def start(self):
self.titles = ["Stranger Things", "House of Cards", "Narcos"]
self.next(self.a, foreach="titles")
@step
def a(self):
self.title = "%s processed" % self.input
self.next(self.join)
@step
def join(self, inputs):
self.results = [input.title for input in inputs]
self.next(self.end)
@step
def end(self):
print("\n".join(self.results))
if __name__ == "__main__":
ForeachFlow()
from typing import List
import ray
from ray import workflow
@ray.remote
def start():
titles = ["Stranger Things", "House of Cards", "Narcos"]
children = [a.bind(t) for t in titles]
return workflow.continuation(end.bind(children))
@ray.remote
def a(title: str) -> str:
return f"{title} processed"
@ray.remote
def end(results: "List[ray.ObjectRef[str]]") -> str:
return "\n".join(ray.get(results))
if __name__ == "__main__":
workflow.run(start.bind())
Cadence API Comparison#
The original source of these comparisons can be found here.
Sub Workflows#
// https://github.com/uber/cadence-java-samples/blob/master/src/main/java/com/uber/cadence/samples/hello/HelloChild.java
public static class GreetingWorkflowImpl implements GreetingWorkflow {
@Override
public String getGreeting(String name) {
// Workflows are stateful. So a new stub must be created for each new child.
GreetingChild child = Workflow.newChildWorkflowStub(GreetingChild.class);
// This is a non blocking call that returns immediately.
// Use child.composeGreeting("Hello", name) to call synchronously.
Promise<String> greeting = Async.function(child::composeGreeting, "Hello", name);
// Do something else here.
return greeting.get(); // blocks waiting for the child to complete.
}
// This example shows how parent workflow return right after starting a child workflow,
// and let the child run itself.
private String demoAsyncChildRun(String name) {
GreetingChild child = Workflow.newChildWorkflowStub(GreetingChild.class);
// non blocking call that initiated child workflow
Async.function(child::composeGreeting, "Hello", name);
// instead of using greeting.get() to block till child complete,
// sometimes we just want to return parent immediately and keep child running
Promise<WorkflowExecution> childPromise = Workflow.getWorkflowExecution(child);
childPromise.get(); // block until child started,
// otherwise child may not start because parent complete first.
return "let child run, parent just return";
}
public static void main(String[] args) {
// Start a worker that hosts both parent and child workflow implementations.
Worker.Factory factory = new Worker.Factory(DOMAIN);
Worker worker = factory.newWorker(TASK_LIST);
worker.registerWorkflowImplementationTypes(GreetingWorkflowImpl.class, GreetingChildImpl.class);
// Start listening to the workflow task list.
factory.start();
// Start a workflow execution. Usually this is done from another program.
WorkflowClient workflowClient = WorkflowClient.newInstance(DOMAIN);
// Get a workflow stub using the same task list the worker uses.
GreetingWorkflow workflow = workflowClient.newWorkflowStub(GreetingWorkflow.class);
// Execute a workflow waiting for it to complete.
String greeting = workflow.getGreeting("World");
System.out.println(greeting);
System.exit(0);
}
}
import ray
from ray import workflow
@ray.remote
def compose_greeting(greeting: str, name: str) -> str:
return greeting + ": " + name
@ray.remote
def main_workflow(name: str) -> str:
return workflow.continuation(compose_greeting.bind("Hello", name))
if __name__ == "__main__":
print(workflow.run(main_workflow.bind("Alice")))
File Processing#
// https://github.com/uber/cadence-java-samples/tree/master/src/main/java/com/uber/cadence/samples/fileprocessing
public class FileProcessingWorkflowImpl implements FileProcessingWorkflow {
// Uses the default task list shared by the pool of workers.
private final StoreActivities defaultTaskListStore;
public FileProcessingWorkflowImpl() {
// Create activity clients.
ActivityOptions ao =
new ActivityOptions.Builder()
.setScheduleToCloseTimeout(Duration.ofSeconds(10))
.setTaskList(FileProcessingWorker.TASK_LIST)
.build();
this.defaultTaskListStore = Workflow.newActivityStub(StoreActivities.class, ao);
}
@Override
public void processFile(URL source, URL destination) {
RetryOptions retryOptions =
new RetryOptions.Builder()
.setExpiration(Duration.ofSeconds(10))
.setInitialInterval(Duration.ofSeconds(1))
.build();
// Retries the whole sequence on any failure, potentially on a different host.
Workflow.retry(retryOptions, () -> processFileImpl(source, destination));
}
private void processFileImpl(URL source, URL destination) {
StoreActivities.TaskListFileNamePair downloaded = defaultTaskListStore.download(source);
// Now initialize stubs that are specific to the returned task list.
ActivityOptions hostActivityOptions =
new ActivityOptions.Builder()
.setTaskList(downloaded.getHostTaskList())
.setScheduleToCloseTimeout(Duration.ofSeconds(10))
.build();
StoreActivities hostSpecificStore =
Workflow.newActivityStub(StoreActivities.class, hostActivityOptions);
// Call processFile activity to zip the file.
// Call the activity to process the file using worker-specific task list.
String processed = hostSpecificStore.process(downloaded.getFileName());
// Call upload activity to upload the zipped file.
hostSpecificStore.upload(processed, destination);
}
}
from typing import List
import ray
from ray import workflow
FILES_TO_PROCESS = ["file-{}".format(i) for i in range(100)]
# Mock method to download a file.
def download(url: str) -> str:
return "contents" * 10000
# Mock method to process a file.
def process(contents: str) -> str:
return "processed: " + contents
# Mock method to upload a file.
def upload(contents: str) -> None:
pass
@ray.remote
def upload_all(file_contents: List[ray.ObjectRef]) -> None:
@ray.remote
def upload_one(contents: str) -> None:
upload(contents)
children = [upload_one.bind(f) for f in file_contents]
@ray.remote
def wait_all(*deps) -> None:
pass
return wait_all.bind(*children)
@ray.remote
def process_all(file_contents: List[ray.ObjectRef]) -> None:
@ray.remote
def process_one(contents: str) -> str:
return process(contents)
children = [process_one.bind(f) for f in file_contents]
return upload_all.bind(children)
@ray.remote
def download_all(urls: List[str]) -> None:
@ray.remote
def download_one(url: str) -> str:
return download(url)
children = [download_one.bind(u) for u in urls]
return process_all.bind(children)
if __name__ == "__main__":
res = download_all.bind(FILES_TO_PROCESS)
workflow.run(res)
Trip Booking#
// https://github.com/uber/cadence-java-samples/tree/master/src/main/java/com/uber/cadence/samples/bookingsaga
public class TripBookingWorkflowImpl implements TripBookingWorkflow {
private final ActivityOptions options =
new ActivityOptions.Builder().setScheduleToCloseTimeout(Duration.ofHours(1)).build();
private final TripBookingActivities activities =
Workflow.newActivityStub(TripBookingActivities.class, options);
@Override
public void bookTrip(String name) {
Saga.Options sagaOptions = new Saga.Options.Builder().setParallelCompensation(true).build();
Saga saga = new Saga(sagaOptions);
try {
String carReservationID = activities.reserveCar(name);
saga.addCompensation(activities::cancelCar, carReservationID, name);
String hotelReservationID = activities.bookHotel(name);
saga.addCompensation(activities::cancelHotel, hotelReservationID, name);
String flightReservationID = activities.bookFlight(name);
saga.addCompensation(activities::cancelFlight, flightReservationID, name);
} catch (ActivityException e) {
saga.compensate();
throw e;
}
}
}
from typing import List, Tuple, Optional
import ray
from ray import workflow
# Mock method to make requests to an external service.
def make_request(*args) -> None:
return "-".join(args)
# Generate an idempotency token (this is an extension to the cadence example).
@ray.remote
def generate_request_id():
import uuid
return uuid.uuid4().hex
@ray.remote
def book_car(request_id: str) -> str:
car_reservation_id = make_request("book_car", request_id)
return car_reservation_id
@ray.remote
def book_hotel(request_id: str, *deps) -> str:
hotel_reservation_id = make_request("book_hotel", request_id)
return hotel_reservation_id
@ray.remote
def book_flight(request_id: str, *deps) -> str:
flight_reservation_id = make_request("book_flight", request_id)
return flight_reservation_id
@ray.remote
def book_all(car_req_id: str, hotel_req_id: str, flight_req_id: str) -> str:
car_res_id = book_car.bind(car_req_id)
hotel_res_id = book_hotel.bind(hotel_req_id, car_res_id)
flight_res_id = book_flight.bind(hotel_req_id, hotel_res_id)
@ray.remote
def concat(*ids: List[str]) -> str:
return ", ".join(ids)
return workflow.continuation(concat.bind(car_res_id, hotel_res_id, flight_res_id))
@ray.remote
def handle_errors(
car_req_id: str,
hotel_req_id: str,
flight_req_id: str,
final_result: Tuple[Optional[str], Optional[Exception]],
) -> str:
result, error = final_result
@ray.remote
def wait_all(*deps) -> None:
pass
@ray.remote
def cancel(request_id: str) -> None:
make_request("cancel", request_id)
if error:
return workflow.continuation(
wait_all.bind(
cancel.bind(car_req_id),
cancel.bind(hotel_req_id),
cancel.bind(flight_req_id),
)
)
else:
return result
if __name__ == "__main__":
car_req_id = generate_request_id.bind()
hotel_req_id = generate_request_id.bind()
flight_req_id = generate_request_id.bind()
# TODO(ekl) we could create a Saga helper function that automates this
# pattern of compensation workflows.
saga_result = book_all.options(**workflow.options(catch_exceptions=True)).bind(
car_req_id, hotel_req_id, flight_req_id
)
final_result = handle_errors.bind(
car_req_id, hotel_req_id, flight_req_id, saga_result
)
print(workflow.run(final_result))
Google Cloud Workflows API Comparison#
The original source of these comparisons can be found here.
Data Conditional#
# https://github.com/GoogleCloudPlatform/workflows-samples/blob/main/src/step_conditional_jump.workflows.json
[
{
"firstStep": {
"call": "http.get",
"args": {
"url": "https://www.example.com/callA"
},
"result": "firstResult"
}
},
{
"whereToJump": {
"switch": [
{
"condition": "${firstResult.body.SomeField < 10}",
"next": "small"
},
{
"condition": "${firstResult.body.SomeField < 100}",
"next": "medium"
}
],
"next": "large"
}
},
{
"small": {
"call": "http.get",
"args": {
"url": "https://www.example.com/SmallFunc"
},
"next": "end"
}
},
{
"medium": {
"call": "http.get",
"args": {
"url": "https://www.example.com/MediumFunc"
},
"next": "end"
}
},
{
"large": {
"call": "http.get",
"args": {
"url": "https://www.example.com/LargeFunc"
},
"next": "end"
}
}
]
import ray
from ray import workflow
# Mock method to make a request.
def make_request(url: str) -> str:
return "42"
@ray.remote
def get_size() -> int:
return int(make_request("https://www.example.com/callA"))
@ray.remote
def small(result: int) -> str:
return make_request("https://www.example.com/SmallFunc")
@ray.remote
def medium(result: int) -> str:
return make_request("https://www.example.com/MediumFunc")
@ray.remote
def large(result: int) -> str:
return make_request("https://www.example.com/LargeFunc")
@ray.remote
def decide(result: int) -> str:
if result < 10:
return workflow.continuation(small.bind(result))
elif result < 100:
return workflow.continuation(medium.bind(result))
else:
return workflow.continuation(large.bind(result))
if __name__ == "__main__":
print(workflow.run(decide.bind(get_size.bind())))
Concat Array#
# https://github.com/GoogleCloudPlatform/workflows-samples/blob/main/src/array.workflows.json
[
{
"define": {
"assign": [
{
"array": [
"foo",
"ba",
"r"
]
},
{
"result": ""
},
{
"i": 0
}
]
}
},
{
"check_condition": {
"switch": [
{
"condition": "${len(array) > i}",
"next": "iterate"
}
],
"next": "exit_loop"
}
},
{
"iterate": {
"assign": [
{
"result": "${result + array[i]}"
},
{
"i": "${i+1}"
}
],
"next": "check_condition"
}
},
{
"exit_loop": {
"return": {
"concat_result": "${result}"
}
}
}
]
from typing import List
import ray
from ray import workflow
@ray.remote
def iterate(array: List[str], result: str, i: int) -> str:
if i >= len(array):
return result
return workflow.continuation(iterate.bind(array, result + array[i], i + 1))
if __name__ == "__main__":
print(workflow.run(iterate.bind(["foo", "ba", "r"], "", 0)))
Sub Workflows#
# https://github.com/GoogleCloudPlatform/workflows-samples/blob/main/src/subworkflow.workflows.json
{
"main": {
"steps": [
{
"first": {
"call": "hello",
"args": {
"input": "Kristof"
},
"result": "someOutput"
}
},
{
"second": {
"return": "${someOutput}"
}
}
]
},
"hello": {
"params": [
"input"
],
"steps": [
{
"first": {
"return": "${\"Hello \"+input}"
}
}
]
}
}
import ray
from ray import workflow
@ray.remote
def hello(name: str) -> str:
return workflow.continuation(format_name.bind(name))
@ray.remote
def format_name(name: str) -> str:
return f"hello, {name}"
@ray.remote
def report(msg: str) -> None:
print(msg)
if __name__ == "__main__":
r1 = hello.bind("Kristof")
r2 = report.bind(r1)
workflow.run(r2)
Prefect API Comparison#
The original source of these comparisons can be found here.
Looping#
# https://docs.prefect.io/core/advanced_tutorials/task-looping.html
import requests
from datetime import timedelta
import prefect
from prefect import task
from prefect import Flow, Parameter
from prefect.engine.signals import LOOP
@task(max_retries=5, retry_delay=timedelta(seconds=2))
def compute_large_fibonacci(M):
# we extract the accumulated task loop result from context
loop_payload = prefect.context.get("task_loop_result", {})
n = loop_payload.get("n", 1)
fib = loop_payload.get("fib", 1)
next_fib = requests.post(
"https://nemo.api.stdlib.com/[email protected]/", data={"nth": n}
).json()
if next_fib > M:
return fib # return statements end the loop
raise LOOP(message=f"Fib {n}={next_fib}", result=dict(n=n + 1, fib=next_fib))
if __name__ == "__main__":
with Flow("fibonacci") as flow:
M = Parameter("M")
fib_num = compute_large_fibonacci(M)
flow_state = flow.run(M=100)
print(flow_state.result[fib_num].result) # 89
import tempfile
import ray
from ray import workflow
from ray.actor import ActorHandle
@ray.remote
class FibonacciActor:
def __init__(self):
self.cache = {}
def compute(self, n):
if n not in self.cache:
assert n > 0
a, b = 0, 1
for _ in range(n - 1):
a, b = b, a + b
self.cache[n] = b
return self.cache[n]
@ray.remote
def compute_large_fib(fibonacci_actor: ActorHandle, M: int, n: int = 1, fib: int = 1):
next_fib = ray.get(fibonacci_actor.compute.remote(n))
if next_fib > M:
return fib
else:
return workflow.continuation(
compute_large_fib.bind(fibonacci_actor, M, n + 1, next_fib)
)
if __name__ == "__main__":
ray.init(storage=f"file://{tempfile.TemporaryDirectory().name}")
assert workflow.run(compute_large_fib.bind(FibonacciActor.remote(), 100)) == 89
AirFlow API Comparison#
The original source of these comparisons can be found here.
ETL Workflow#
# https://airflow.apache.org/docs/apache-airflow/stable/_modules/airflow/example_dags/tutorial_taskflow_api_etl.html
import json
from airflow.decorators import dag, task
from airflow.utils.dates import days_ago
# These args will get passed on to each operator
# You can override them on a per-task basis during operator initialization
default_args = {
'owner': 'airflow',
}
@dag(default_args=default_args, schedule_interval=None, start_date=days_ago(2), tags=['example'])
def tutorial_taskflow_api_etl():
"""
### TaskFlow API Tutorial Documentation
This is a simple ETL data pipeline example which demonstrates the use of
the TaskFlow API using three simple tasks for Extract, Transform, and Load.
Documentation that goes along with the Airflow TaskFlow API tutorial is
located
[here](https://airflow.apache.org/docs/apache-airflow/stable/tutorial_taskflow_api.html)
"""
@task()
def extract():
"""
#### Extract task
A simple Extract task to get data ready for the rest of the data
pipeline. In this case, getting data is simulated by reading from a
hardcoded JSON string.
"""
data_string = '{"1001": 301.27, "1002": 433.21, "1003": 502.22}'
order_data_dict = json.loads(data_string)
return order_data_dict
@task(multiple_outputs=True)
def transform(order_data_dict: dict):
"""
#### Transform task
A simple Transform task which takes in the collection of order data and
computes the total order value.
"""
total_order_value = 0
for value in order_data_dict.values():
total_order_value += value
return {"total_order_value": total_order_value}
@task()
def load(total_order_value: float):
"""
#### Load task
A simple Load task which takes in the result of the Transform task and
instead of saving it to end user review, just prints it out.
"""
print(f"Total order value is: {total_order_value:.2f}")
order_data = extract()
order_summary = transform(order_data)
load(order_summary["total_order_value"])
tutorial_etl_dag = tutorial_taskflow_api_etl()
import json
import ray
from ray import workflow
@ray.remote
def extract() -> dict:
data_string = '{"1001": 301.27, "1002": 433.21, "1003": 502.22}'
order_data_dict = json.loads(data_string)
return order_data_dict
@ray.remote
def transform(order_data_dict: dict) -> dict:
total_order_value = 0
for value in order_data_dict.values():
total_order_value += value
return {"total_order_value": ray.put(total_order_value)}
@ray.remote
def load(data_dict: dict) -> str:
total_order_value = ray.get(data_dict["total_order_value"])
return f"Total order value is: {total_order_value:.2f}"
if __name__ == "__main__":
order_data = extract.bind()
order_summary = transform.bind(order_data)
etl = load.bind(order_summary)
print(workflow.run(etl))