API Comparisons#


The experimental Ray Workflows library has been deprecated and will be removed in a future version of Ray.

Comparison between Ray Core APIs and Workflows#

Ray Workflows is built on top of Ray, and offers a mostly consistent subset of its API while providing durability. This section highlights some of the differences:

func.remote vs func.bind#

With Ray tasks, func.remote will submit a remote task to run eagerly; func.bind will generate a node in a DAG, it will not be executed until the DAG is been executed.

Under the context of Ray Workflow, the execution of the DAG is deferred until workflow.run(dag, workflow_id=...) or workflow.run_async(dag, workflow_id=...) is called on the DAG. Specifying the workflow id allows for resuming of the workflow by its id in case of cluster failure.

Other Workflow Engines#

Note: these comparisons are inspired by the Serverless workflows comparisons repo.

Argo API Comparison#

The original source of these comparisons can be found here.


Argo version:#
# https://github.com/argoproj/argo-workflows/tree/master/examples#conditionals
apiVersion: argoproj.io/v1alpha1
kind: Workflow
  generateName: coinflip-
  entrypoint: coinflip
  - name: coinflip
    # flip a coin
    - - name: flip-coin
        template: flip-coin
    # evaluate the result in parallel
    - - name: heads
        template: heads                       # call heads template if "heads"
        when: "{{steps.flip-coin.outputs.result}} == heads"
      - name: tails
        template: tails                       # call tails template if "tails"
        when: "{{steps.flip-coin.outputs.result}} == tails"
    - - name: flip-again
        template: flip-coin
    - - name: complex-condition
        template: heads-tails-or-twice-tails
        # call heads template if first flip was "heads" and second was "tails" OR both were "tails"
        when: >-
            ( {{steps.flip-coin.outputs.result}} == heads &&
              {{steps.flip-again.outputs.result}} == tails
            ) ||
            ( {{steps.flip-coin.outputs.result}} == tails &&
              {{steps.flip-again.outputs.result}} == tails )
      - name: heads-regex
        template: heads                       # call heads template if ~ "hea"
        when: "{{steps.flip-again.outputs.result}} =~ hea"
      - name: tails-regex
        template: tails                       # call heads template if ~ "tai"
        when: "{{steps.flip-again.outputs.result}} =~ tai"

  # Return heads or tails based on a random number
  - name: flip-coin
      image: python:alpine3.6
      command: [python]
      source: |
        import random
        result = "heads" if random.randint(0,1) == 0 else "tails"

  - name: heads
      image: alpine:3.6
      command: [sh, -c]
      args: ["echo \"it was heads\""]

  - name: tails
      image: alpine:3.6
      command: [sh, -c]
      args: ["echo \"it was tails\""]

  - name: heads-tails-or-twice-tails
      image: alpine:3.6
      command: [sh, -c]
      args: ["echo \"it was heads the first flip and tails the second. Or it was two times tails.\""]
Workflow version:#
import ray
from ray import workflow

def handle_heads() -> str:
    return "It was heads"

def handle_tails() -> str:
    return "It was tails"

def flip_coin() -> str:
    import random

    def decide(heads: bool) -> str:
        return workflow.continuation(
            handle_heads.bind() if heads else handle_tails.bind()

    return workflow.continuation(decide.bind(random.random() > 0.5))

if __name__ == "__main__":


Argo version:#
# https://github.com/argoproj/argo-workflows/tree/master/examples#dag
apiVersion: argoproj.io/v1alpha1
kind: Workflow
  generateName: dag-diamond-
  entrypoint: diamond
  - name: echo
      - name: message
      image: alpine:3.7
      command: [echo, "{{inputs.parameters.message}}"]
  - name: diamond
      - name: A
        template: echo
          parameters: [{name: message, value: A}]
      - name: B
        dependencies: [A]
        template: echo
          parameters: [{name: message, value: B}]
      - name: C
        dependencies: [A]
        template: echo
          parameters: [{name: message, value: C}]
      - name: D
        dependencies: [B, C]
        template: echo
          parameters: [{name: message, value: D}]
Workflow version:#
import ray
from ray import workflow

def echo(msg: str, *deps) -> None:

if __name__ == "__main__":
    A = echo.options(**workflow.options(task_id="A")).bind("A")
    B = echo.options(**workflow.options(task_id="B")).bind("B", A)
    C = echo.options(**workflow.options(task_id="C")).bind("C", A)
    D = echo.options(**workflow.options(task_id="D")).bind("D", A, B)

Multi-step Workflow#

Argo version:#
# https://github.com/argoproj/argo-workflows/tree/master/examples#steps
apiVersion: argoproj.io/v1alpha1
kind: Workflow
  generateName: steps-
  entrypoint: hello-hello-hello

  # This spec contains two templates: hello-hello-hello and whalesay
  - name: hello-hello-hello
    # Instead of just running a container
    # This template has a sequence of steps
    - - name: hello1            # hello1 is run before the following steps
        template: whalesay
          - name: message
            value: "hello1"
    - - name: hello2a           # double dash => run after previous step
        template: whalesay
          - name: message
            value: "hello2a"
      - name: hello2b           # single dash => run in parallel with previous step
        template: whalesay
          - name: message
            value: "hello2b"

  # This is the same template as from the previous example
  - name: whalesay
      - name: message
      image: docker/whalesay
      command: [cowsay]
      args: ["{{inputs.parameters.message}}"]
Workflow version:#
import ray
from ray import workflow

def hello(msg: str, *deps) -> None:

def wait_all(*args) -> None:

if __name__ == "__main__":
    h1 = hello.options(**workflow.options(task_id="hello1")).bind("hello1")
    h2a = hello.options(**workflow.options(task_id="hello2a")).bind("hello2a")
    h2b = hello.options(**workflow.options(task_id="hello2b")).bind("hello2b", h2a)
    workflow.run(wait_all.bind(h1, h2b))

Exit Handler#

Argo version:#
# https://github.com/argoproj/argo-workflows/tree/master/examples#exit-handlers
apiVersion: argoproj.io/v1alpha1
kind: Workflow
  generateName: exit-handlers-
  entrypoint: intentional-fail
  onExit: exit-handler                  # invoke exit-handler template at end of the workflow
  # primary workflow template
  - name: intentional-fail
      image: alpine:latest
      command: [sh, -c]
      args: ["echo intentional failure; exit 1"]

  # Exit handler templates
  # After the completion of the entrypoint template, the status of the
  # workflow is made available in the global variable {{workflow.status}}.
  # {{workflow.status}} will be one of: Succeeded, Failed, Error
  - name: exit-handler
    - - name: notify
        template: send-email
      - name: celebrate
        template: celebrate
        when: "{{workflow.status}} == Succeeded"
      - name: cry
        template: cry
        when: "{{workflow.status}} != Succeeded"
  - name: send-email
      image: alpine:latest
      command: [sh, -c]
      args: ["echo send e-mail: {{workflow.name}} {{workflow.status}} {{workflow.duration}}"]
  - name: celebrate
      image: alpine:latest
      command: [sh, -c]
      args: ["echo hooray!"]
  - name: cry
      image: alpine:latest
      command: [sh, -c]
      args: ["echo boohoo!"]
Workflow version:#
from typing import Tuple, Optional

import ray
from ray import workflow

def intentional_fail() -> str:
    raise RuntimeError("oops")

def cry(error: Exception) -> None:
    print("Sadly", error)

def celebrate(result: str) -> None:
    print("Success!", result)

def send_email(result: str) -> None:
    print("Sending email", result)

def exit_handler(res: Tuple[Optional[str], Optional[Exception]]) -> None:
    result, error = res
    email = send_email.bind(f"Raw result: {result}, {error}")
    if error:
        handler = cry.bind(error)
        handler = celebrate.bind(result)
    return workflow.continuation(wait_all.bind(handler, email))

def wait_all(*deps):
    return "done"

if __name__ == "__main__":
    res = intentional_fail.options(**workflow.options(catch_exceptions=True)).bind()


Argo version:#
# https://github.com/argoproj/argo-workflows/tree/master/examples#loops
apiVersion: argoproj.io/v1alpha1
kind: Workflow
  generateName: loops-
  entrypoint: loop-example
  - name: loop-example
    - - name: print-message
        template: whalesay
          - name: message
            value: "{{item}}"
        withItems:              # invoke whalesay once for each item in parallel
        - hello world           # item 1
        - goodbye world         # item 2

  - name: whalesay
      - name: message
      image: docker/whalesay:latest
      command: [cowsay]
      args: ["{{inputs.parameters.message}}"]
Workflow version:#
import ray
from ray import workflow

def hello(msg: str) -> None:

def wait_all(*args) -> None:

if __name__ == "__main__":
    children = []
    for msg in ["hello world", "goodbye world"]:


Argo version:#
# https://github.com/argoproj/argo-workflows/tree/master/examples#recursion
apiVersion: argoproj.io/v1alpha1
kind: Workflow
  generateName: coinflip-recursive-
  entrypoint: coinflip
  - name: coinflip
    # flip a coin
    - - name: flip-coin
        template: flip-coin
    # evaluate the result in parallel
    - - name: heads
        template: heads                 # call heads template if "heads"
        when: "{{steps.flip-coin.outputs.result}} == heads"
      - name: tails                     # keep flipping coins if "tails"
        template: coinflip
        when: "{{steps.flip-coin.outputs.result}} == tails"

  - name: flip-coin
      image: python:alpine3.6
      command: [python]
      source: |
        import random
        result = "heads" if random.randint(0,1) == 0 else "tails"

  - name: heads
      image: alpine:3.6
      command: [sh, -c]
      args: ["echo \"it was heads\""]
Workflow version:#
import ray
from ray import workflow

def handle_heads() -> str:
    return "It was heads"

def handle_tails() -> str:
    print("It was tails, retrying")
    return workflow.continuation(flip_coin.bind())

def flip_coin() -> str:
    import random

    def decide(heads: bool) -> str:
        if heads:
            return workflow.continuation(handle_heads.bind())
            return workflow.continuation(handle_tails.bind())

    return workflow.continuation(decide.bind(random.random() > 0.5))

if __name__ == "__main__":


Argo version:#
# https://github.com/argoproj/argo-workflows/tree/master/examples#retrying-failed-or-errored-steps
apiVersion: argoproj.io/v1alpha1
kind: Workflow
  generateName: retry-backoff-
  entrypoint: retry-backoff
  - name: retry-backoff
      limit: 10
      retryPolicy: "Always"
        duration: "1"      # Must be a string. Default unit is seconds. Could also be a Duration, e.g.: "2m", "6h", "1d"
        factor: 2
        maxDuration: "1m"  # Must be a string. Default unit is seconds. Could also be a Duration, e.g.: "2m", "6h", "1d"
        nodeAntiAffinity: {}
      image: python:alpine3.6
      command: ["python", -c]
      # fail with a 66% probability
      args: ["import random; import sys; exit_code = random.choice([0, 1, 1]); sys.exit(exit_code)"]
Workflow version:#
from typing import Any, Tuple, Optional

import ray
from ray import workflow

def flaky_step() -> str:
    import random

    if random.choice([0, 1, 1]) != 0:
        raise ValueError("oops")

    return "ok"

def custom_retry_strategy(func: Any, num_retries: int, delay_s: int) -> str:
    import time

    def handle_result(res: Tuple[Optional[str], Optional[Exception]]) -> str:
        result, error = res
        if result:
            return res
        elif num_retries <= 0:
            raise error
            print("Retrying exception after delay", error)
            return workflow.continuation(
                custom_retry_strategy.bind(func, num_retries - 1, delay_s)

    res = func.options(**workflow.options(catch_exceptions=True)).bind()
    return workflow.continuation(handle_result.bind(res))

if __name__ == "__main__":
    # Default retry strategy.
        workflow.run(flaky_step.options(max_retries=10, retry_exceptions=True).bind())
    # Custom strategy.
    print(workflow.run(custom_retry_strategy.bind(flaky_step, 10, 1)))

Metaflow API Comparison#

The original source of these comparisons can be found here.


Metaflow version:#
# https://docs.metaflow.org/metaflow/basics#foreach
from metaflow import FlowSpec, step

class ForeachFlow(FlowSpec):
    def start(self):
        self.titles = ["Stranger Things", "House of Cards", "Narcos"]
        self.next(self.a, foreach="titles")

    def a(self):
        self.title = "%s processed" % self.input

    def join(self, inputs):
        self.results = [input.title for input in inputs]

    def end(self):

if __name__ == "__main__":
Workflow version:#
from typing import List

import ray
from ray import workflow

def start():
    titles = ["Stranger Things", "House of Cards", "Narcos"]
    children = [a.bind(t) for t in titles]
    return workflow.continuation(end.bind(children))

def a(title: str) -> str:
    return f"{title} processed"

def end(results: "List[ray.ObjectRef[str]]") -> str:
    return "\n".join(ray.get(results))

if __name__ == "__main__":

Cadence API Comparison#

The original source of these comparisons can be found here.

Sub Workflows#

Cadence version:#
// https://github.com/uber/cadence-java-samples/blob/master/src/main/java/com/uber/cadence/samples/hello/HelloChild.java
public static class GreetingWorkflowImpl implements GreetingWorkflow {

  public String getGreeting(String name) {
    // Workflows are stateful. So a new stub must be created for each new child.
    GreetingChild child = Workflow.newChildWorkflowStub(GreetingChild.class);

    // This is a non blocking call that returns immediately.
    // Use child.composeGreeting("Hello", name) to call synchronously.
    Promise<String> greeting = Async.function(child::composeGreeting, "Hello", name);
    // Do something else here.
    return greeting.get(); // blocks waiting for the child to complete.

  // This example shows how parent workflow return right after starting a child workflow,
  // and let the child run itself.
  private String demoAsyncChildRun(String name) {
    GreetingChild child = Workflow.newChildWorkflowStub(GreetingChild.class);
    // non blocking call that initiated child workflow
    Async.function(child::composeGreeting, "Hello", name);
    // instead of using greeting.get() to block till child complete,
    // sometimes we just want to return parent immediately and keep child running
    Promise<WorkflowExecution> childPromise = Workflow.getWorkflowExecution(child);
    childPromise.get(); // block until child started,
    // otherwise child may not start because parent complete first.
    return "let child run, parent just return";

  public static void main(String[] args) {
    // Start a worker that hosts both parent and child workflow implementations.
    Worker.Factory factory = new Worker.Factory(DOMAIN);
    Worker worker = factory.newWorker(TASK_LIST);
    worker.registerWorkflowImplementationTypes(GreetingWorkflowImpl.class, GreetingChildImpl.class);
    // Start listening to the workflow task list.

    // Start a workflow execution. Usually this is done from another program.
    WorkflowClient workflowClient = WorkflowClient.newInstance(DOMAIN);
    // Get a workflow stub using the same task list the worker uses.
    GreetingWorkflow workflow = workflowClient.newWorkflowStub(GreetingWorkflow.class);
    // Execute a workflow waiting for it to complete.
    String greeting = workflow.getGreeting("World");
Workflow version:#
import ray
from ray import workflow

def compose_greeting(greeting: str, name: str) -> str:
    return greeting + ": " + name

def main_workflow(name: str) -> str:
    return workflow.continuation(compose_greeting.bind("Hello", name))

if __name__ == "__main__":

File Processing#

Cadence version:#
// https://github.com/uber/cadence-java-samples/tree/master/src/main/java/com/uber/cadence/samples/fileprocessing
public class FileProcessingWorkflowImpl implements FileProcessingWorkflow {

  // Uses the default task list shared by the pool of workers.
  private final StoreActivities defaultTaskListStore;

  public FileProcessingWorkflowImpl() {
    // Create activity clients.
    ActivityOptions ao =
        new ActivityOptions.Builder()
    this.defaultTaskListStore = Workflow.newActivityStub(StoreActivities.class, ao);

  public void processFile(URL source, URL destination) {
    RetryOptions retryOptions =
        new RetryOptions.Builder()
    // Retries the whole sequence on any failure, potentially on a different host.
    Workflow.retry(retryOptions, () -> processFileImpl(source, destination));

  private void processFileImpl(URL source, URL destination) {
    StoreActivities.TaskListFileNamePair downloaded = defaultTaskListStore.download(source);

    // Now initialize stubs that are specific to the returned task list.
    ActivityOptions hostActivityOptions =
        new ActivityOptions.Builder()
    StoreActivities hostSpecificStore =
        Workflow.newActivityStub(StoreActivities.class, hostActivityOptions);

    // Call processFile activity to zip the file.
    // Call the activity to process the file using worker-specific task list.
    String processed = hostSpecificStore.process(downloaded.getFileName());
    // Call upload activity to upload the zipped file.
    hostSpecificStore.upload(processed, destination);
Workflow version:#
from typing import List

import ray
from ray import workflow

FILES_TO_PROCESS = ["file-{}".format(i) for i in range(100)]

# Mock method to download a file.
def download(url: str) -> str:
    return "contents" * 10000

# Mock method to process a file.
def process(contents: str) -> str:
    return "processed: " + contents

# Mock method to upload a file.
def upload(contents: str) -> None:

def upload_all(file_contents: List[ray.ObjectRef]) -> None:
    def upload_one(contents: str) -> None:

    children = [upload_one.bind(f) for f in file_contents]

    def wait_all(*deps) -> None:

    return wait_all.bind(*children)

def process_all(file_contents: List[ray.ObjectRef]) -> None:
    def process_one(contents: str) -> str:
        return process(contents)

    children = [process_one.bind(f) for f in file_contents]
    return upload_all.bind(children)

def download_all(urls: List[str]) -> None:
    def download_one(url: str) -> str:
        return download(url)

    children = [download_one.bind(u) for u in urls]
    return process_all.bind(children)

if __name__ == "__main__":
    res = download_all.bind(FILES_TO_PROCESS)

Trip Booking#

Cadence version:#
// https://github.com/uber/cadence-java-samples/tree/master/src/main/java/com/uber/cadence/samples/bookingsaga
public class TripBookingWorkflowImpl implements TripBookingWorkflow {

  private final ActivityOptions options =
      new ActivityOptions.Builder().setScheduleToCloseTimeout(Duration.ofHours(1)).build();
  private final TripBookingActivities activities =
      Workflow.newActivityStub(TripBookingActivities.class, options);

  public void bookTrip(String name) {
    Saga.Options sagaOptions = new Saga.Options.Builder().setParallelCompensation(true).build();
    Saga saga = new Saga(sagaOptions);
    try {
      String carReservationID = activities.reserveCar(name);
      saga.addCompensation(activities::cancelCar, carReservationID, name);

      String hotelReservationID = activities.bookHotel(name);
      saga.addCompensation(activities::cancelHotel, hotelReservationID, name);

      String flightReservationID = activities.bookFlight(name);
      saga.addCompensation(activities::cancelFlight, flightReservationID, name);
    } catch (ActivityException e) {
      throw e;
Workflow version:#
from typing import List, Tuple, Optional

import ray
from ray import workflow

# Mock method to make requests to an external service.
def make_request(*args) -> None:
    return "-".join(args)

# Generate an idempotency token (this is an extension to the cadence example).
def generate_request_id():
    import uuid

    return uuid.uuid4().hex

def book_car(request_id: str) -> str:
    car_reservation_id = make_request("book_car", request_id)
    return car_reservation_id

def book_hotel(request_id: str, *deps) -> str:
    hotel_reservation_id = make_request("book_hotel", request_id)
    return hotel_reservation_id

def book_flight(request_id: str, *deps) -> str:
    flight_reservation_id = make_request("book_flight", request_id)
    return flight_reservation_id

def book_all(car_req_id: str, hotel_req_id: str, flight_req_id: str) -> str:
    car_res_id = book_car.bind(car_req_id)
    hotel_res_id = book_hotel.bind(hotel_req_id, car_res_id)
    flight_res_id = book_flight.bind(hotel_req_id, hotel_res_id)

    def concat(*ids: List[str]) -> str:
        return ", ".join(ids)

    return workflow.continuation(concat.bind(car_res_id, hotel_res_id, flight_res_id))

def handle_errors(
    car_req_id: str,
    hotel_req_id: str,
    flight_req_id: str,
    final_result: Tuple[Optional[str], Optional[Exception]],
) -> str:
    result, error = final_result

    def wait_all(*deps) -> None:

    def cancel(request_id: str) -> None:
        make_request("cancel", request_id)

    if error:
        return workflow.continuation(
        return result

if __name__ == "__main__":
    car_req_id = generate_request_id.bind()
    hotel_req_id = generate_request_id.bind()
    flight_req_id = generate_request_id.bind()
    # TODO(ekl) we could create a Saga helper function that automates this
    # pattern of compensation workflows.
    saga_result = book_all.options(**workflow.options(catch_exceptions=True)).bind(
        car_req_id, hotel_req_id, flight_req_id
    final_result = handle_errors.bind(
        car_req_id, hotel_req_id, flight_req_id, saga_result

Google Cloud Workflows API Comparison#

The original source of these comparisons can be found here.

Data Conditional#

Google Cloud version:#
# https://github.com/GoogleCloudPlatform/workflows-samples/blob/main/src/step_conditional_jump.workflows.json
    "firstStep": {
      "call": "http.get",
      "args": {
        "url": "https://www.example.com/callA"
      "result": "firstResult"
    "whereToJump": {
      "switch": [
          "condition": "${firstResult.body.SomeField < 10}",
          "next": "small"
          "condition": "${firstResult.body.SomeField < 100}",
          "next": "medium"
      "next": "large"
    "small": {
      "call": "http.get",
      "args": {
        "url": "https://www.example.com/SmallFunc"
      "next": "end"
    "medium": {
      "call": "http.get",
      "args": {
        "url": "https://www.example.com/MediumFunc"
      "next": "end"
    "large": {
      "call": "http.get",
      "args": {
        "url": "https://www.example.com/LargeFunc"
      "next": "end"
Workflow version:#
import ray
from ray import workflow

# Mock method to make a request.
def make_request(url: str) -> str:
    return "42"

def get_size() -> int:
    return int(make_request("https://www.example.com/callA"))

def small(result: int) -> str:
    return make_request("https://www.example.com/SmallFunc")

def medium(result: int) -> str:
    return make_request("https://www.example.com/MediumFunc")

def large(result: int) -> str:
    return make_request("https://www.example.com/LargeFunc")

def decide(result: int) -> str:
    if result < 10:
        return workflow.continuation(small.bind(result))
    elif result < 100:
        return workflow.continuation(medium.bind(result))
        return workflow.continuation(large.bind(result))

if __name__ == "__main__":

Concat Array#

Google Cloud version:#
# https://github.com/GoogleCloudPlatform/workflows-samples/blob/main/src/array.workflows.json
    "define": {
      "assign": [
          "array": [
          "result": ""
          "i": 0
    "check_condition": {
      "switch": [
          "condition": "${len(array) > i}",
          "next": "iterate"
      "next": "exit_loop"
    "iterate": {
      "assign": [
          "result": "${result + array[i]}"
          "i": "${i+1}"
      "next": "check_condition"
    "exit_loop": {
      "return": {
        "concat_result": "${result}"
Workflow version:#
from typing import List

import ray
from ray import workflow

def iterate(array: List[str], result: str, i: int) -> str:
    if i >= len(array):
        return result
    return workflow.continuation(iterate.bind(array, result + array[i], i + 1))

if __name__ == "__main__":
    print(workflow.run(iterate.bind(["foo", "ba", "r"], "", 0)))

Sub Workflows#

Google Cloud version:#
# https://github.com/GoogleCloudPlatform/workflows-samples/blob/main/src/subworkflow.workflows.json
  "main": {
    "steps": [
        "first": {
          "call": "hello",
          "args": {
            "input": "Kristof"
          "result": "someOutput"
        "second": {
          "return": "${someOutput}"
  "hello": {
    "params": [
    "steps": [
        "first": {
          "return": "${\"Hello \"+input}"
Workflow version:#
import ray
from ray import workflow

def hello(name: str) -> str:
    return workflow.continuation(format_name.bind(name))

def format_name(name: str) -> str:
    return f"hello, {name}"

def report(msg: str) -> None:

if __name__ == "__main__":
    r1 = hello.bind("Kristof")
    r2 = report.bind(r1)

Prefect API Comparison#

The original source of these comparisons can be found here.


Prefect version:#
# https://docs.prefect.io/core/advanced_tutorials/task-looping.html

import requests
from datetime import timedelta

import prefect
from prefect import task
from prefect import Flow, Parameter
from prefect.engine.signals import LOOP

@task(max_retries=5, retry_delay=timedelta(seconds=2))
def compute_large_fibonacci(M):
    # we extract the accumulated task loop result from context
    loop_payload = prefect.context.get("task_loop_result", {})

    n = loop_payload.get("n", 1)
    fib = loop_payload.get("fib", 1)

    next_fib = requests.post(
        "https://nemo.api.stdlib.com/[email protected]/", data={"nth": n}

    if next_fib > M:
        return fib  # return statements end the loop

    raise LOOP(message=f"Fib {n}={next_fib}", result=dict(n=n + 1, fib=next_fib))

if __name__ == "__main__":
    with Flow("fibonacci") as flow:
        M = Parameter("M")
        fib_num = compute_large_fibonacci(M)

    flow_state = flow.run(M=100)
    print(flow_state.result[fib_num].result) # 89
Workflow version:#
import tempfile

import ray
from ray import workflow
from ray.actor import ActorHandle

class FibonacciActor:
    def __init__(self):
        self.cache = {}

    def compute(self, n):
        if n not in self.cache:
            assert n > 0
            a, b = 0, 1
            for _ in range(n - 1):
                a, b = b, a + b
            self.cache[n] = b
        return self.cache[n]

def compute_large_fib(fibonacci_actor: ActorHandle, M: int, n: int = 1, fib: int = 1):
    next_fib = ray.get(fibonacci_actor.compute.remote(n))
    if next_fib > M:
        return fib
        return workflow.continuation(
            compute_large_fib.bind(fibonacci_actor, M, n + 1, next_fib)

if __name__ == "__main__":
    assert workflow.run(compute_large_fib.bind(FibonacciActor.remote(), 100)) == 89

AirFlow API Comparison#

The original source of these comparisons can be found here.

ETL Workflow#

AirFlow version:#
# https://airflow.apache.org/docs/apache-airflow/stable/_modules/airflow/example_dags/tutorial_taskflow_api_etl.html

import json

from airflow.decorators import dag, task
from airflow.utils.dates import days_ago

# These args will get passed on to each operator
# You can override them on a per-task basis during operator initialization
default_args = {
    'owner': 'airflow',

@dag(default_args=default_args, schedule_interval=None, start_date=days_ago(2), tags=['example'])
def tutorial_taskflow_api_etl():
    ### TaskFlow API Tutorial Documentation
    This is a simple ETL data pipeline example which demonstrates the use of
    the TaskFlow API using three simple tasks for Extract, Transform, and Load.
    Documentation that goes along with the Airflow TaskFlow API tutorial is

    def extract():
        #### Extract task
        A simple Extract task to get data ready for the rest of the data
        pipeline. In this case, getting data is simulated by reading from a
        hardcoded JSON string.
        data_string = '{"1001": 301.27, "1002": 433.21, "1003": 502.22}'

        order_data_dict = json.loads(data_string)
        return order_data_dict

    def transform(order_data_dict: dict):
        #### Transform task
        A simple Transform task which takes in the collection of order data and
        computes the total order value.
        total_order_value = 0

        for value in order_data_dict.values():
            total_order_value += value

        return {"total_order_value": total_order_value}

    def load(total_order_value: float):
        #### Load task
        A simple Load task which takes in the result of the Transform task and
        instead of saving it to end user review, just prints it out.

        print(f"Total order value is: {total_order_value:.2f}")

    order_data = extract()
    order_summary = transform(order_data)

tutorial_etl_dag = tutorial_taskflow_api_etl()
Workflow version:#
import json

import ray
from ray import workflow

def extract() -> dict:
    data_string = '{"1001": 301.27, "1002": 433.21, "1003": 502.22}'
    order_data_dict = json.loads(data_string)
    return order_data_dict

def transform(order_data_dict: dict) -> dict:
    total_order_value = 0
    for value in order_data_dict.values():
        total_order_value += value
    return {"total_order_value": ray.put(total_order_value)}

def load(data_dict: dict) -> str:
    total_order_value = ray.get(data_dict["total_order_value"])
    return f"Total order value is: {total_order_value:.2f}"

if __name__ == "__main__":
    order_data = extract.bind()
    order_summary = transform.bind(order_data)
    etl = load.bind(order_summary)