API Comparisons
Comparison between Ray Core APIs and Workflows
Workflows is built on top of Ray, and offers a mostly consistent subset of its API while providing durability. This section highlights some of the differences:
func.remote vs func.step
With Ray tasks, func.remote submits a remote task for execution. In Ray workflows, func.step instead creates a Workflow object; execution of the workflow is deferred until .run(workflow_id="id") or .run_async(workflow_id="id") is called on the Workflow. Specifying the workflow id allows the workflow to be resumed by its id in case of cluster failure.
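For example, here is a minimal sketch of the difference, assuming a trivial add function (the function and the "add_example" workflow id are illustrative):
import ray
from ray import workflow

@ray.remote
def add(a: int, b: int) -> int:
    return a + b

@workflow.step
def add_step(a: int, b: int) -> int:
    return a + b

if __name__ == "__main__":
    workflow.init()
    # Ray task: submits immediately and returns an ObjectRef.
    assert ray.get(add.remote(1, 2)) == 3
    # Workflow step: returns a Workflow object; nothing executes until
    # .run() is called, and progress is durably logged under the id.
    assert add_step.step(1, 2).run(workflow_id="add_example") == 3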
Actor.remote vs Actor.get_or_create
With Ray actors, Actor.remote
will submit an actor creation task and create an actor process in the cluster. In Ray workflows, virtual actors are created by Actor.get_or_create
. The actor state is tracked as a dynamic workflow (durably logged) instead of in a running process. This means that the actor uses no resources when inactive, and can be used even after cluster restarts.
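For instance, a minimal sketch of a durable counter (the Counter class, its methods, and the "counter" id are illustrative):
from ray import workflow

@workflow.virtual_actor
class Counter:
    def __init__(self, init_val: int):
        self._val = init_val

    def incr(self, val: int = 1) -> int:
        self._val += val
        return self._val

    @workflow.virtual_actor.readonly
    def value(self) -> int:
        return self._val

if __name__ == "__main__":
    workflow.init()
    # Creates the actor if "counter" is a new id; otherwise attaches to
    # the existing durably logged state, even after a cluster restart.
    counter = Counter.get_or_create("counter", 0)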
actor.func.remote vs actor.func.run
With Ray actors, actor.func.remote submits a remote task to run, similar to func.remote. By contrast, calling actor.func.run on a virtual actor reads the actor state from storage, executes a step, and then writes the new state back to storage. If actor.func is decorated with workflow.virtual_actor.readonly, its result will not be logged.
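Continuing the Counter sketch above, where counter is the handle returned by Counter.get_or_create:
# Each incr call reads the actor state from storage, executes the step,
# and writes the new state back before returning.
assert counter.incr.run() == 1
assert counter.incr.run() == 2
# value is declared readonly: it reads the state without writing it
# back, and its result is not logged.
assert counter.value.run() == 2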
Other Workflow Engines
Note: these comparisons are inspired by the Serverless workflows comparisons repo.
Argo API Comparison
The original source of these comparisons can be found here.
Conditionals
# https://github.com/argoproj/argo-workflows/tree/master/examples#conditionals
apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
generateName: coinflip-
spec:
entrypoint: coinflip
templates:
- name: coinflip
steps:
# flip a coin
- - name: flip-coin
template: flip-coin
# evaluate the result in parallel
- - name: heads
template: heads # call heads template if "heads"
when: "{{steps.flip-coin.outputs.result}} == heads"
- name: tails
template: tails # call tails template if "tails"
when: "{{steps.flip-coin.outputs.result}} == tails"
- - name: flip-again
template: flip-coin
- - name: complex-condition
template: heads-tails-or-twice-tails
# call heads template if first flip was "heads" and second was "tails" OR both were "tails"
when: >-
( {{steps.flip-coin.outputs.result}} == heads &&
{{steps.flip-again.outputs.result}} == tails
) ||
( {{steps.flip-coin.outputs.result}} == tails &&
{{steps.flip-again.outputs.result}} == tails )
- name: heads-regex
template: heads # call heads template if ~ "hea"
when: "{{steps.flip-again.outputs.result}} =~ hea"
- name: tails-regex
template: tails # call heads template if ~ "tai"
when: "{{steps.flip-again.outputs.result}} =~ tai"
# Return heads or tails based on a random number
- name: flip-coin
script:
image: python:alpine3.6
command: [python]
source: |
import random
result = "heads" if random.randint(0,1) == 0 else "tails"
print(result)
- name: heads
container:
image: alpine:3.6
command: [sh, -c]
args: ["echo \"it was heads\""]
- name: tails
container:
image: alpine:3.6
command: [sh, -c]
args: ["echo \"it was tails\""]
- name: heads-tails-or-twice-tails
container:
image: alpine:3.6
command: [sh, -c]
args: ["echo \"it was heads the first flip and tails the second. Or it was two times tails.\""]
from ray import workflow
@workflow.step
def handle_heads() -> str:
return "It was heads"
@workflow.step
def handle_tails() -> str:
return "It was tails"
@workflow.step
def flip_coin() -> str:
import random
@workflow.step
def decide(heads: bool) -> str:
if heads:
return handle_heads.step()
else:
return handle_tails.step()
return decide.step(random.random() > 0.5)
if __name__ == "__main__":
workflow.init()
print(flip_coin.step().run())
DAG
# https://github.com/argoproj/argo-workflows/tree/master/examples#dag
apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
generateName: dag-diamond-
spec:
entrypoint: diamond
templates:
- name: echo
inputs:
parameters:
- name: message
container:
image: alpine:3.7
command: [echo, "{{inputs.parameters.message}}"]
- name: diamond
dag:
tasks:
- name: A
template: echo
arguments:
parameters: [{name: message, value: A}]
- name: B
dependencies: [A]
template: echo
arguments:
parameters: [{name: message, value: B}]
- name: C
dependencies: [A]
template: echo
arguments:
parameters: [{name: message, value: C}]
- name: D
dependencies: [B, C]
template: echo
arguments:
parameters: [{name: message, value: D}]
from ray import workflow
@workflow.step
def echo(msg: str, *deps) -> None:
print(msg)
if __name__ == "__main__":
workflow.init()
A = echo.options(name="A").step("A")
B = echo.options(name="B").step("B", A)
C = echo.options(name="C").step("C", A)
    D = echo.options(name="D").step("D", B, C)
D.run()
Multi-step Workflow
# https://github.com/argoproj/argo-workflows/tree/master/examples#steps
apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
generateName: steps-
spec:
entrypoint: hello-hello-hello
# This spec contains two templates: hello-hello-hello and whalesay
templates:
- name: hello-hello-hello
# Instead of just running a container
# This template has a sequence of steps
steps:
- - name: hello1 # hello1 is run before the following steps
template: whalesay
arguments:
parameters:
- name: message
value: "hello1"
- - name: hello2a # double dash => run after previous step
template: whalesay
arguments:
parameters:
- name: message
value: "hello2a"
- name: hello2b # single dash => run in parallel with previous step
template: whalesay
arguments:
parameters:
- name: message
value: "hello2b"
# This is the same template as from the previous example
- name: whalesay
inputs:
parameters:
- name: message
container:
image: docker/whalesay
command: [cowsay]
args: ["{{inputs.parameters.message}}"]
from ray import workflow
@workflow.step
def hello(msg: str, *deps) -> None:
print(msg)
@workflow.step
def wait_all(*args) -> None:
pass
if __name__ == "__main__":
workflow.init()
    h1 = hello.options(name="hello1").step("hello1")
    h2a = hello.options(name="hello2a").step("hello2a", h1)
    h2b = hello.options(name="hello2b").step("hello2b", h1)
    wait_all.step(h2a, h2b).run()
Exit Handler
# https://github.com/argoproj/argo-workflows/tree/master/examples#exit-handlers
apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
generateName: exit-handlers-
spec:
entrypoint: intentional-fail
onExit: exit-handler # invoke exit-handler template at end of the workflow
templates:
# primary workflow template
- name: intentional-fail
container:
image: alpine:latest
command: [sh, -c]
args: ["echo intentional failure; exit 1"]
# Exit handler templates
# After the completion of the entrypoint template, the status of the
# workflow is made available in the global variable {{workflow.status}}.
# {{workflow.status}} will be one of: Succeeded, Failed, Error
- name: exit-handler
steps:
- - name: notify
template: send-email
- name: celebrate
template: celebrate
when: "{{workflow.status}} == Succeeded"
- name: cry
template: cry
when: "{{workflow.status}} != Succeeded"
- name: send-email
container:
image: alpine:latest
command: [sh, -c]
args: ["echo send e-mail: {{workflow.name}} {{workflow.status}} {{workflow.duration}}"]
- name: celebrate
container:
image: alpine:latest
command: [sh, -c]
args: ["echo hooray!"]
- name: cry
container:
image: alpine:latest
command: [sh, -c]
args: ["echo boohoo!"]
from typing import Tuple, Optional
from ray import workflow
@workflow.step
def intentional_fail() -> str:
raise RuntimeError("oops")
@workflow.step
def cry(error: Exception) -> None:
print("Sadly", error)
@workflow.step
def celebrate(result: str) -> None:
print("Success!", result)
@workflow.step
def send_email(result: str) -> None:
print("Sending email", result)
@workflow.step
def exit_handler(res: Tuple[Optional[str], Optional[Exception]]) -> None:
result, error = res
email = send_email.step("Raw result: {}, {}".format(result, error))
if error:
handler = cry.step(error)
else:
handler = celebrate.step(result)
return wait_all.step(handler, email)
@workflow.step
def wait_all(*deps):
pass
if __name__ == "__main__":
workflow.init()
res = intentional_fail.options(catch_exceptions=True).step()
print(exit_handler.step(res).run())
Loops
# https://github.com/argoproj/argo-workflows/tree/master/examples#loops
apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
generateName: loops-
spec:
entrypoint: loop-example
templates:
- name: loop-example
steps:
- - name: print-message
template: whalesay
arguments:
parameters:
- name: message
value: "{{item}}"
withItems: # invoke whalesay once for each item in parallel
- hello world # item 1
- goodbye world # item 2
- name: whalesay
inputs:
parameters:
- name: message
container:
image: docker/whalesay:latest
command: [cowsay]
args: ["{{inputs.parameters.message}}"]
from ray import workflow
@workflow.step
def hello(msg: str) -> None:
print(msg)
@workflow.step
def wait_all(*args) -> None:
pass
if __name__ == "__main__":
workflow.init()
children = []
for msg in ["hello world", "goodbye world"]:
children.append(hello.step(msg))
wait_all.step(*children).run()
Recursion
# https://github.com/argoproj/argo-workflows/tree/master/examples#recursion
apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
generateName: coinflip-recursive-
spec:
entrypoint: coinflip
templates:
- name: coinflip
steps:
# flip a coin
- - name: flip-coin
template: flip-coin
# evaluate the result in parallel
- - name: heads
template: heads # call heads template if "heads"
when: "{{steps.flip-coin.outputs.result}} == heads"
- name: tails # keep flipping coins if "tails"
template: coinflip
when: "{{steps.flip-coin.outputs.result}} == tails"
- name: flip-coin
script:
image: python:alpine3.6
command: [python]
source: |
import random
result = "heads" if random.randint(0,1) == 0 else "tails"
print(result)
- name: heads
container:
image: alpine:3.6
command: [sh, -c]
args: ["echo \"it was heads\""]
from ray import workflow
@workflow.step
def handle_heads() -> str:
return "It was heads"
@workflow.step
def handle_tails() -> str:
print("It was tails, retrying")
return flip_coin.step()
@workflow.step
def flip_coin() -> str:
import random
@workflow.step
def decide(heads: bool) -> str:
if heads:
return handle_heads.step()
else:
return handle_tails.step()
return decide.step(random.random() > 0.5)
if __name__ == "__main__":
workflow.init()
print(flip_coin.step().run())
Retries
# https://github.com/argoproj/argo-workflows/tree/master/examples#retrying-failed-or-errored-steps
apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
generateName: retry-backoff-
spec:
entrypoint: retry-backoff
templates:
- name: retry-backoff
retryStrategy:
limit: 10
retryPolicy: "Always"
backoff:
duration: "1" # Must be a string. Default unit is seconds. Could also be a Duration, e.g.: "2m", "6h", "1d"
factor: 2
maxDuration: "1m" # Must be a string. Default unit is seconds. Could also be a Duration, e.g.: "2m", "6h", "1d"
affinity:
nodeAntiAffinity: {}
container:
image: python:alpine3.6
command: ["python", -c]
# fail with a 66% probability
args: ["import random; import sys; exit_code = random.choice([0, 1, 1]); sys.exit(exit_code)"]
from typing import Any, Tuple, Optional
from ray import workflow
@workflow.step
def flaky_step() -> str:
import random
if random.choice([0, 1, 1]) != 0:
raise ValueError("oops")
return "ok"
@workflow.step
def custom_retry_strategy(func: Any, num_retries: int, delay_s: int) -> str:
import time
@workflow.step
def handle_result(res: Tuple[Optional[str], Optional[Exception]]) -> str:
result, error = res
if result:
            return result
elif num_retries <= 0:
raise error
else:
print("Retrying exception after delay", error)
time.sleep(delay_s)
return custom_retry_strategy.step(func, num_retries - 1, delay_s)
res = func.options(catch_exceptions=True).step()
return handle_result.step(res)
if __name__ == "__main__":
workflow.init()
# Default retry strategy.
print(flaky_step.options(max_retries=10).step().run())
# Custom strategy.
print(custom_retry_strategy.step(flaky_step, 10, 1).run())
Metaflow API Comparison
The original source of these comparisons can be found here.
Foreach
# https://docs.metaflow.org/metaflow/basics#foreach
from metaflow import FlowSpec, step
class ForeachFlow(FlowSpec):
@step
def start(self):
self.titles = ["Stranger Things", "House of Cards", "Narcos"]
self.next(self.a, foreach="titles")
@step
def a(self):
self.title = "%s processed" % self.input
self.next(self.join)
@step
def join(self, inputs):
self.results = [input.title for input in inputs]
self.next(self.end)
@step
def end(self):
print("\n".join(self.results))
if __name__ == "__main__":
ForeachFlow()
from typing import List
from ray import workflow
@workflow.step
def start():
titles = ["Stranger Things", "House of Cards", "Narcos"]
children = []
for t in titles:
children.append(a.step(t))
return end.step(children)
@workflow.step
def a(title: str) -> str:
return "{} processed".format(title)
@workflow.step
def end(results: List[str]) -> str:
return "\n".join(results)
if __name__ == "__main__":
workflow.init()
start.step().run()
Cadence API Comparison
The original source of these comparisons can be found here.
Sub Workflows
// https://github.com/uber/cadence-java-samples/blob/master/src/main/java/com/uber/cadence/samples/hello/HelloChild.java
public static class GreetingWorkflowImpl implements GreetingWorkflow {
@Override
public String getGreeting(String name) {
// Workflows are stateful. So a new stub must be created for each new child.
GreetingChild child = Workflow.newChildWorkflowStub(GreetingChild.class);
// This is a non blocking call that returns immediately.
// Use child.composeGreeting("Hello", name) to call synchronously.
Promise<String> greeting = Async.function(child::composeGreeting, "Hello", name);
// Do something else here.
return greeting.get(); // blocks waiting for the child to complete.
}
// This example shows how parent workflow return right after starting a child workflow,
// and let the child run itself.
private String demoAsyncChildRun(String name) {
GreetingChild child = Workflow.newChildWorkflowStub(GreetingChild.class);
// non blocking call that initiated child workflow
Async.function(child::composeGreeting, "Hello", name);
// instead of using greeting.get() to block till child complete,
// sometimes we just want to return parent immediately and keep child running
Promise<WorkflowExecution> childPromise = Workflow.getWorkflowExecution(child);
childPromise.get(); // block until child started,
// otherwise child may not start because parent complete first.
return "let child run, parent just return";
}
public static void main(String[] args) {
// Start a worker that hosts both parent and child workflow implementations.
Worker.Factory factory = new Worker.Factory(DOMAIN);
Worker worker = factory.newWorker(TASK_LIST);
worker.registerWorkflowImplementationTypes(GreetingWorkflowImpl.class, GreetingChildImpl.class);
// Start listening to the workflow task list.
factory.start();
// Start a workflow execution. Usually this is done from another program.
WorkflowClient workflowClient = WorkflowClient.newInstance(DOMAIN);
// Get a workflow stub using the same task list the worker uses.
GreetingWorkflow workflow = workflowClient.newWorkflowStub(GreetingWorkflow.class);
// Execute a workflow waiting for it to complete.
String greeting = workflow.getGreeting("World");
System.out.println(greeting);
System.exit(0);
}
}
from ray import workflow
@workflow.step
def compose_greeting(greeting: str, name: str) -> str:
return greeting + ": " + name
@workflow.step
def main_workflow(name: str) -> str:
return compose_greeting.step("Hello", name)
if __name__ == "__main__":
workflow.init()
wf = main_workflow.step("Alice")
print(wf.run())
File Processing
// https://github.com/uber/cadence-java-samples/tree/master/src/main/java/com/uber/cadence/samples/fileprocessing
public class FileProcessingWorkflowImpl implements FileProcessingWorkflow {
// Uses the default task list shared by the pool of workers.
private final StoreActivities defaultTaskListStore;
public FileProcessingWorkflowImpl() {
// Create activity clients.
ActivityOptions ao =
new ActivityOptions.Builder()
.setScheduleToCloseTimeout(Duration.ofSeconds(10))
.setTaskList(FileProcessingWorker.TASK_LIST)
.build();
this.defaultTaskListStore = Workflow.newActivityStub(StoreActivities.class, ao);
}
@Override
public void processFile(URL source, URL destination) {
RetryOptions retryOptions =
new RetryOptions.Builder()
.setExpiration(Duration.ofSeconds(10))
.setInitialInterval(Duration.ofSeconds(1))
.build();
// Retries the whole sequence on any failure, potentially on a different host.
Workflow.retry(retryOptions, () -> processFileImpl(source, destination));
}
private void processFileImpl(URL source, URL destination) {
StoreActivities.TaskListFileNamePair downloaded = defaultTaskListStore.download(source);
// Now initialize stubs that are specific to the returned task list.
ActivityOptions hostActivityOptions =
new ActivityOptions.Builder()
.setTaskList(downloaded.getHostTaskList())
.setScheduleToCloseTimeout(Duration.ofSeconds(10))
.build();
StoreActivities hostSpecificStore =
Workflow.newActivityStub(StoreActivities.class, hostActivityOptions);
// Call processFile activity to zip the file.
// Call the activity to process the file using worker-specific task list.
String processed = hostSpecificStore.process(downloaded.getFileName());
// Call upload activity to upload the zipped file.
hostSpecificStore.upload(processed, destination);
}
}
from typing import List
import ray
from ray import workflow
FILES_TO_PROCESS = ["file-{}".format(i) for i in range(100)]
# Mock method to download a file.
def download(url: str) -> str:
return "contents" * 10000
# Mock method to process a file.
def process(contents: str) -> str:
return "processed: " + contents
# Mock method to upload a file.
def upload(contents: str) -> None:
pass
@workflow.step
def upload_all(file_contents: List[ray.ObjectRef]) -> None:
@workflow.step
def upload_one(contents: str) -> None:
upload(contents)
children = [upload_one.step(f) for f in file_contents]
@workflow.step
def wait_all(*deps) -> None:
pass
return wait_all.step(*children)
@workflow.step
def process_all(file_contents: List[ray.ObjectRef]) -> None:
@workflow.step
def process_one(contents: str) -> ray.ObjectRef:
result = process(contents)
# Result is too large to return directly; put in the object store.
return ray.put(result)
children = [process_one.step(f) for f in file_contents]
return upload_all.step(children)
@workflow.step
def download_all(urls: List[str]) -> None:
@workflow.step
def download_one(url: str) -> ray.ObjectRef:
return ray.put(download(url))
children = [download_one.step(u) for u in urls]
return process_all.step(children)
if __name__ == "__main__":
workflow.init()
res = download_all.step(FILES_TO_PROCESS)
res.run()
Trip Booking
// https://github.com/uber/cadence-java-samples/tree/master/src/main/java/com/uber/cadence/samples/bookingsaga
public class TripBookingWorkflowImpl implements TripBookingWorkflow {
private final ActivityOptions options =
new ActivityOptions.Builder().setScheduleToCloseTimeout(Duration.ofHours(1)).build();
private final TripBookingActivities activities =
Workflow.newActivityStub(TripBookingActivities.class, options);
@Override
public void bookTrip(String name) {
Saga.Options sagaOptions = new Saga.Options.Builder().setParallelCompensation(true).build();
Saga saga = new Saga(sagaOptions);
try {
String carReservationID = activities.reserveCar(name);
saga.addCompensation(activities::cancelCar, carReservationID, name);
String hotelReservationID = activities.bookHotel(name);
saga.addCompensation(activities::cancelHotel, hotelReservationID, name);
String flightReservationID = activities.bookFlight(name);
saga.addCompensation(activities::cancelFlight, flightReservationID, name);
} catch (ActivityException e) {
saga.compensate();
throw e;
}
}
}
from typing import List, Tuple, Optional
from ray import workflow
# Mock method to make requests to an external service.
def make_request(*args) -> str:
return "result"
# Generate an idempotency token (this is an extension to the cadence example).
@workflow.step
def generate_request_id():
import uuid
return uuid.uuid4().hex
@workflow.step
def cancel(request_id: str) -> None:
make_request("cancel", request_id)
@workflow.step
def book_car(request_id: str) -> str:
car_reservation_id = make_request("book_car", request_id)
return car_reservation_id
@workflow.step
def book_hotel(request_id: str, *deps) -> str:
hotel_reservation_id = make_request("book_hotel", request_id)
return hotel_reservation_id
@workflow.step
def book_flight(request_id: str, *deps) -> str:
flight_reservation_id = make_request("book_flight", request_id)
return flight_reservation_id
@workflow.step
def book_all(car_req_id: str, hotel_req_id: str, flight_req_id: str) -> str:
car_res_id = book_car.step(car_req_id)
hotel_res_id = book_hotel.step(hotel_req_id, car_res_id)
    flight_res_id = book_flight.step(flight_req_id, hotel_res_id)
@workflow.step
    def concat(*ids: str) -> str:
return ", ".join(ids)
return concat.step(car_res_id, hotel_res_id, flight_res_id)
@workflow.step
def handle_errors(
car_req_id: str,
hotel_req_id: str,
flight_req_id: str,
final_result: Tuple[Optional[str], Optional[Exception]],
) -> str:
result, error = final_result
@workflow.step
def wait_all(*deps) -> None:
pass
if error:
return wait_all.step(
cancel.step(car_req_id),
cancel.step(hotel_req_id),
cancel.step(flight_req_id),
)
else:
return result
if __name__ == "__main__":
workflow.init()
car_req_id = generate_request_id.step()
hotel_req_id = generate_request_id.step()
flight_req_id = generate_request_id.step()
# TODO(ekl) we could create a Saga helper function that automates this
# pattern of compensation workflows.
saga_result = book_all.options(catch_exceptions=True).step(
car_req_id, hotel_req_id, flight_req_id
)
final_result = handle_errors.step(
car_req_id, hotel_req_id, flight_req_id, saga_result
)
print(final_result.run())
Google Cloud Workflows API Comparison
The original source of these comparisons can be found here.
Data Conditional
# https://github.com/GoogleCloudPlatform/workflows-samples/blob/main/src/step_conditional_jump.workflows.json
[
{
"firstStep": {
"call": "http.get",
"args": {
"url": "https://www.example.com/callA"
},
"result": "firstResult"
}
},
{
"whereToJump": {
"switch": [
{
"condition": "${firstResult.body.SomeField < 10}",
"next": "small"
},
{
"condition": "${firstResult.body.SomeField < 100}",
"next": "medium"
}
],
"next": "large"
}
},
{
"small": {
"call": "http.get",
"args": {
"url": "https://www.example.com/SmallFunc"
},
"next": "end"
}
},
{
"medium": {
"call": "http.get",
"args": {
"url": "https://www.example.com/MediumFunc"
},
"next": "end"
}
},
{
"large": {
"call": "http.get",
"args": {
"url": "https://www.example.com/LargeFunc"
},
"next": "end"
}
}
]
from ray import workflow
# Mock method to make a request.
def make_request(url: str) -> str:
return "42"
@workflow.step
def get_size() -> int:
return int(make_request("https://www.example.com/callA"))
@workflow.step
def small(result: int) -> str:
return make_request("https://www.example.com/SmallFunc")
@workflow.step
def medium(result: int) -> str:
return make_request("https://www.example.com/MediumFunc")
@workflow.step
def large(result: int) -> str:
return make_request("https://www.example.com/LargeFunc")
@workflow.step
def decide(result: int) -> str:
if result < 10:
return small.step(result)
elif result < 100:
return medium.step(result)
else:
return large.step(result)
if __name__ == "__main__":
workflow.init()
print(decide.step(get_size.step()).run())
Concat Array
# https://github.com/GoogleCloudPlatform/workflows-samples/blob/main/src/array.workflows.json
[
{
"define": {
"assign": [
{
"array": [
"foo",
"ba",
"r"
]
},
{
"result": ""
},
{
"i": 0
}
]
}
},
{
"check_condition": {
"switch": [
{
"condition": "${len(array) > i}",
"next": "iterate"
}
],
"next": "exit_loop"
}
},
{
"iterate": {
"assign": [
{
"result": "${result + array[i]}"
},
{
"i": "${i+1}"
}
],
"next": "check_condition"
}
},
{
"exit_loop": {
"return": {
"concat_result": "${result}"
}
}
}
]
from typing import List
from ray import workflow
@workflow.step
def iterate(array: List[str], result: str, i: int) -> str:
if i >= len(array):
return result
return iterate.step(array, result + array[i], i + 1)
if __name__ == "__main__":
workflow.init()
print(iterate.step(["foo", "ba", "r"], "", 0).run())
Sub Workflows
# https://github.com/GoogleCloudPlatform/workflows-samples/blob/main/src/subworkflow.workflows.json
{
"main": {
"steps": [
{
"first": {
"call": "hello",
"args": {
"input": "Kristof"
},
"result": "someOutput"
}
},
{
"second": {
"return": "${someOutput}"
}
}
]
},
"hello": {
"params": [
"input"
],
"steps": [
{
"first": {
"return": "${\"Hello \"+input}"
}
}
]
}
}
from ray import workflow
@workflow.step
def hello(name: str) -> str:
return format_name.step(name)
@workflow.step
def format_name(name: str) -> str:
return "hello, {}".format(name)
@workflow.step
def report(msg: str) -> None:
print(msg)
if __name__ == "__main__":
workflow.init()
r1 = hello.step("Kristof")
r2 = report.step(r1)
r2.run()
Prefect API Comparison
The original source of these comparisons can be found here.
Looping
# https://docs.prefect.io/core/advanced_tutorials/task-looping.html
import requests
from datetime import timedelta
import prefect
from prefect import task
from prefect import Flow, Parameter
from prefect.engine.signals import LOOP
@task(max_retries=5, retry_delay=timedelta(seconds=2))
def compute_large_fibonacci(M):
# we extract the accumulated task loop result from context
loop_payload = prefect.context.get("task_loop_result", {})
n = loop_payload.get("n", 1)
fib = loop_payload.get("fib", 1)
next_fib = requests.post(
"https://nemo.api.stdlib.com/fibonacci@0.0.1/", data={"nth": n}
).json()
if next_fib > M:
return fib # return statements end the loop
raise LOOP(message=f"Fib {n}={next_fib}", result=dict(n=n + 1, fib=next_fib))
if __name__ == "__main__":
with Flow("fibonacci") as flow:
M = Parameter("M")
fib_num = compute_large_fibonacci(M)
flow_state = flow.run(M=100)
print(flow_state.result[fib_num].result) # 89
from ray import workflow
import requests
@workflow.step
def compute_large_fib(M: int, n: int = 1, fib: int = 1):
next_fib = requests.post(
"https://nemo.api.stdlib.com/fibonacci@0.0.1/", data={"nth": n}
).json()
if next_fib > M:
return fib
else:
return compute_large_fib.step(M, n + 1, next_fib)
if __name__ == "__main__":
workflow.init()
assert compute_large_fib.step(100).run() == 89
Airflow API Comparison
The original source of these comparisons can be found here.
ETL Workflow
# https://airflow.apache.org/docs/apache-airflow/stable/_modules/airflow/example_dags/tutorial_taskflow_api_etl.html
import json
from airflow.decorators import dag, task
from airflow.utils.dates import days_ago
# These args will get passed on to each operator
# You can override them on a per-task basis during operator initialization
default_args = {
'owner': 'airflow',
}
@dag(default_args=default_args, schedule_interval=None, start_date=days_ago(2), tags=['example'])
def tutorial_taskflow_api_etl():
"""
### TaskFlow API Tutorial Documentation
This is a simple ETL data pipeline example which demonstrates the use of
the TaskFlow API using three simple tasks for Extract, Transform, and Load.
Documentation that goes along with the Airflow TaskFlow API tutorial is
located
[here](https://airflow.apache.org/docs/apache-airflow/stable/tutorial_taskflow_api.html)
"""
@task()
def extract():
"""
#### Extract task
A simple Extract task to get data ready for the rest of the data
pipeline. In this case, getting data is simulated by reading from a
hardcoded JSON string.
"""
data_string = '{"1001": 301.27, "1002": 433.21, "1003": 502.22}'
order_data_dict = json.loads(data_string)
return order_data_dict
@task(multiple_outputs=True)
def transform(order_data_dict: dict):
"""
#### Transform task
A simple Transform task which takes in the collection of order data and
computes the total order value.
"""
total_order_value = 0
for value in order_data_dict.values():
total_order_value += value
return {"total_order_value": total_order_value}
@task()
def load(total_order_value: float):
"""
#### Load task
A simple Load task which takes in the result of the Transform task and
instead of saving it to end user review, just prints it out.
"""
print(f"Total order value is: {total_order_value:.2f}")
order_data = extract()
order_summary = transform(order_data)
load(order_summary["total_order_value"])
tutorial_etl_dag = tutorial_taskflow_api_etl()
import json
import ray
from ray import workflow
@workflow.step
def extract() -> dict:
data_string = '{"1001": 301.27, "1002": 433.21, "1003": 502.22}'
order_data_dict = json.loads(data_string)
return order_data_dict
@workflow.step
def transform(order_data_dict: dict) -> dict:
total_order_value = 0
for value in order_data_dict.values():
total_order_value += value
return {"total_order_value": ray.put(total_order_value)}
@workflow.step
def load(data_dict: dict) -> str:
total_order_value = ray.get(data_dict["total_order_value"])
return f"Total order value is: {total_order_value:.2f}"
if __name__ == "__main__":
workflow.init()
order_data = extract.step()
order_summary = transform.step(order_data)
etl = load.step(order_summary)
print(etl.run())