API Comparisons

Comparison between Ray Core APIs and Workflows

Workflows is built on top of Ray, and offers a mostly consistent subset of its API while providing durability. This section highlights some of the differences:

func.remote vs func.step

With Ray tasks, func.remote submits a remote task for execution. In Ray workflows, func.step instead creates a Workflow object, and execution is deferred until .run(workflow_id="id") or .run_async(workflow_id="id") is called on it. Specifying the workflow id lets the workflow be resumed by that id after a cluster failure.

Actor.remote vs Actor.get_or_create

With Ray actors, Actor.remote will submit an actor creation task and create an actor process in the cluster. In Ray workflows, virtual actors are created by Actor.get_or_create. The actor state is tracked as a dynamic workflow (durably logged) instead of in a running process. This means that the actor uses no resources when inactive, and can be used even after cluster restarts.

actor.func.remote vs actor.func.run

With Ray actors, actor.func.remote submits a remote task to run, similar to func.remote. In contrast, actor.func.run on a virtual actor reads the actor state from storage, executes a step, and then writes the new state back to storage. If actor.func is decorated with workflow.virtual_actor.readonly, its result is not logged.
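The virtual-actor lifecycle described above (state loaded from storage on each call, written back after mutating calls, left untouched by read-only calls) can be sketched in plain Python. VirtualCounter and its method names are hypothetical, and this sketch uses local JSON files in place of the workflow storage Ray would use.

```python
import json
import os
import tempfile


class VirtualCounter:
    """Toy virtual actor: its state lives in a JSON file, not in a process."""

    def __init__(self, path: str) -> None:
        self.path = path

    @classmethod
    def get_or_create(cls, actor_id: str, storage_dir: str) -> "VirtualCounter":
        path = os.path.join(storage_dir, actor_id + ".json")
        if not os.path.exists(path):
            with open(path, "w") as f:
                json.dump({"count": 0}, f)  # initial state is logged once
        return cls(path)

    def _load(self) -> dict:
        with open(self.path) as f:
            return json.load(f)

    def _save(self, state: dict) -> None:
        tmp = self.path + ".tmp"
        with open(tmp, "w") as f:
            json.dump(state, f)
        os.replace(tmp, self.path)  # atomic swap: never leaves half-written state

    def incr(self) -> int:
        # Mutating method: read state, execute, write the new state back.
        state = self._load()
        state["count"] += 1
        self._save(state)
        return state["count"]

    def get(self) -> int:
        # Read-only method: reads the state but never writes it back.
        return self._load()["count"]


storage = tempfile.mkdtemp()
c = VirtualCounter.get_or_create("counter-1", storage)
c.incr()
c.incr()
# Simulate a restart: a fresh handle with the same id sees the logged state.
c2 = VirtualCounter.get_or_create("counter-1", storage)
print(c2.get())  # 2
```

No process holds the counter between calls, which is why such an actor consumes no resources while idle.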

Other Workflow Engines

Note: these comparisons are inspired by the Serverless workflows comparisons repo.

Argo API Comparison

The original source of these comparisons can be found here.

Conditionals

Argo version:
# https://github.com/argoproj/argo-workflows/tree/master/examples#conditionals
apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
  generateName: coinflip-
spec:
  entrypoint: coinflip
  templates:
  - name: coinflip
    steps:
    # flip a coin
    - - name: flip-coin
        template: flip-coin
    # evaluate the result in parallel
    - - name: heads
        template: heads                       # call heads template if "heads"
        when: "{{steps.flip-coin.outputs.result}} == heads"
      - name: tails
        template: tails                       # call tails template if "tails"
        when: "{{steps.flip-coin.outputs.result}} == tails"
    - - name: flip-again
        template: flip-coin
    - - name: complex-condition
        template: heads-tails-or-twice-tails
        # call heads template if first flip was "heads" and second was "tails" OR both were "tails"
        when: >-
            ( {{steps.flip-coin.outputs.result}} == heads &&
              {{steps.flip-again.outputs.result}} == tails
            ) ||
            ( {{steps.flip-coin.outputs.result}} == tails &&
              {{steps.flip-again.outputs.result}} == tails )
      - name: heads-regex
        template: heads                       # call heads template if ~ "hea"
        when: "{{steps.flip-again.outputs.result}} =~ hea"
      - name: tails-regex
        template: tails                       # call tails template if ~ "tai"
        when: "{{steps.flip-again.outputs.result}} =~ tai"

  # Return heads or tails based on a random number
  - name: flip-coin
    script:
      image: python:alpine3.6
      command: [python]
      source: |
        import random
        result = "heads" if random.randint(0,1) == 0 else "tails"
        print(result)

  - name: heads
    container:
      image: alpine:3.6
      command: [sh, -c]
      args: ["echo \"it was heads\""]

  - name: tails
    container:
      image: alpine:3.6
      command: [sh, -c]
      args: ["echo \"it was tails\""]
  
  - name: heads-tails-or-twice-tails
    container:
      image: alpine:3.6
      command: [sh, -c]
      args: ["echo \"it was heads the first flip and tails the second. Or it was two times tails.\""]
Workflow version:
from ray import workflow


@workflow.step
def handle_heads() -> str:
    return "It was heads"


@workflow.step
def handle_tails() -> str:
    return "It was tails"


@workflow.step
def flip_coin() -> str:
    import random

    @workflow.step
    def decide(heads: bool) -> str:
        if heads:
            return handle_heads.step()
        else:
            return handle_tails.step()

    return decide.step(random.random() > 0.5)


if __name__ == "__main__":
    workflow.init()
    print(flip_coin.step().run())
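The workflow version relies on one rule: when a step returns another step (decide returns handle_heads.step() or handle_tails.step()), the returned step is executed and its output becomes the caller's output. A minimal pure-Python model of that continuation rule follows; Step and run are hypothetical, and a seeded random.Random makes the branch reproducible.

```python
import random
from typing import Any, Callable


class Step:
    """Deferred call; if it returns another Step, that Step runs next."""

    def __init__(self, func: Callable[..., Any], *args: Any) -> None:
        self.func = func
        self.args = args


def run(s: Step) -> Any:
    result = s.func(*s.args)
    # A step that returns a Step hands its output over to that continuation.
    while isinstance(result, Step):
        result = result.func(*result.args)
    return result


def handle_heads() -> str:
    return "It was heads"


def handle_tails() -> str:
    return "It was tails"


def decide(heads: bool) -> Step:
    return Step(handle_heads) if heads else Step(handle_tails)


def flip_coin(rng: random.Random) -> Step:
    return Step(decide, rng.random() > 0.5)


print(run(Step(flip_coin, random.Random(0))))
```

Running continuations in a loop rather than through nested calls means a step that keeps returning new steps, as in the Recursion example, does not grow the Python call stack.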

DAG

Argo version:
# https://github.com/argoproj/argo-workflows/tree/master/examples#dag
apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
  generateName: dag-diamond-
spec:
  entrypoint: diamond
  templates:
  - name: echo
    inputs:
      parameters:
      - name: message
    container:
      image: alpine:3.7
      command: [echo, "{{inputs.parameters.message}}"]
  - name: diamond
    dag:
      tasks:
      - name: A
        template: echo
        arguments:
          parameters: [{name: message, value: A}]
      - name: B
        dependencies: [A]
        template: echo
        arguments:
          parameters: [{name: message, value: B}]
      - name: C
        dependencies: [A]
        template: echo
        arguments:
          parameters: [{name: message, value: C}]
      - name: D
        dependencies: [B, C]
        template: echo
        arguments:
          parameters: [{name: message, value: D}]
Workflow version:
from ray import workflow


@workflow.step
def echo(msg: str, *deps) -> None:
    print(msg)


if __name__ == "__main__":
    workflow.init()
    A = echo.options(name="A").step("A")
    B = echo.options(name="B").step("B", A)
    C = echo.options(name="C").step("C", A)
    D = echo.options(name="D").step("D", B, C)  # D waits on both B and C
    D.run()
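In the Argo diamond, B and C each depend on A, and D depends on both B and C. The order in which a scheduler may run such a DAG can be checked with the standard library's graphlib.TopologicalSorter (Python 3.9+). This sketch only computes "waves" of steps that could run in parallel; it does not execute anything.

```python
from graphlib import TopologicalSorter

# Diamond DAG from the Argo example: node -> set of dependencies.
deps = {"A": set(), "B": {"A"}, "C": {"A"}, "D": {"B", "C"}}

ts = TopologicalSorter(deps)
ts.prepare()
waves = []
while ts.is_active():
    ready = sorted(ts.get_ready())  # steps whose dependencies are all done
    waves.append(ready)             # these could run in parallel
    ts.done(*ready)

print(waves)  # [['A'], ['B', 'C'], ['D']]
```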

Multi-step Workflow

Argo version:
# https://github.com/argoproj/argo-workflows/tree/master/examples#steps
apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
  generateName: steps-
spec:
  entrypoint: hello-hello-hello

  # This spec contains two templates: hello-hello-hello and whalesay
  templates:
  - name: hello-hello-hello
    # Instead of just running a container
    # This template has a sequence of steps
    steps:
    - - name: hello1            # hello1 is run before the following steps
        template: whalesay
        arguments:
          parameters:
          - name: message
            value: "hello1"
    - - name: hello2a           # double dash => run after previous step
        template: whalesay
        arguments:
          parameters:
          - name: message
            value: "hello2a"
      - name: hello2b           # single dash => run in parallel with previous step
        template: whalesay
        arguments:
          parameters:
          - name: message
            value: "hello2b"

  # This is the same template as from the previous example
  - name: whalesay
    inputs:
      parameters:
      - name: message
    container:
      image: docker/whalesay
      command: [cowsay]
      args: ["{{inputs.parameters.message}}"]
Workflow version:
from ray import workflow


@workflow.step
def hello(msg: str, *deps) -> None:
    print(msg)


@workflow.step
def wait_all(*args) -> None:
    pass


if __name__ == "__main__":
    workflow.init()
    h1 = hello.options(name="hello1").step("hello1")
    # hello2a and hello2b both wait on hello1 but run in parallel with each other.
    h2a = hello.options(name="hello2a").step("hello2a", h1)
    h2b = hello.options(name="hello2b").step("hello2b", h1)
    wait_all.step(h2a, h2b).run()
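In the Argo spec, each double-dash entry is a sequential stage, and single-dash entries within a stage run in parallel. A small pure-Python sketch of that execution rule, using a thread pool as a stand-in for cluster workers; run_step_groups is a hypothetical helper, not a Ray API.

```python
from concurrent.futures import ThreadPoolExecutor
from typing import Callable, List

log: List[str] = []


def hello(msg: str) -> None:
    log.append(msg)


def run_step_groups(groups: List[List[str]], fn: Callable[[str], None]) -> None:
    """Run groups in order; items inside one group run concurrently."""
    with ThreadPoolExecutor() as pool:
        for group in groups:
            # list() waits for the whole group (and re-raises any error)
            # before the next group starts.
            list(pool.map(fn, group))


run_step_groups([["hello1"], ["hello2a", "hello2b"]], hello)
print(log[0])  # hello1
```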

Exit Handler

Argo version:
# https://github.com/argoproj/argo-workflows/tree/master/examples#exit-handlers
apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
  generateName: exit-handlers-
spec:
  entrypoint: intentional-fail
  onExit: exit-handler                  # invoke exit-handler template at end of the workflow
  templates:
  # primary workflow template
  - name: intentional-fail
    container:
      image: alpine:latest
      command: [sh, -c]
      args: ["echo intentional failure; exit 1"]

  # Exit handler templates
  # After the completion of the entrypoint template, the status of the
  # workflow is made available in the global variable {{workflow.status}}.
  # {{workflow.status}} will be one of: Succeeded, Failed, Error
  - name: exit-handler
    steps:
    - - name: notify
        template: send-email
      - name: celebrate
        template: celebrate
        when: "{{workflow.status}} == Succeeded"
      - name: cry
        template: cry
        when: "{{workflow.status}} != Succeeded"
  - name: send-email
    container:
      image: alpine:latest
      command: [sh, -c]
      args: ["echo send e-mail: {{workflow.name}} {{workflow.status}} {{workflow.duration}}"]
  - name: celebrate
    container:
      image: alpine:latest
      command: [sh, -c]
      args: ["echo hooray!"]
  - name: cry
    container:
      image: alpine:latest
      command: [sh, -c]
      args: ["echo boohoo!"]
Workflow version:
from typing import Tuple, Optional

from ray import workflow


@workflow.step
def intentional_fail() -> str:
    raise RuntimeError("oops")


@workflow.step
def cry(error: Exception) -> None:
    print("Sadly", error)


@workflow.step
def celebrate(result: str) -> None:
    print("Success!", result)


@workflow.step
def send_email(result: str) -> None:
    print("Sending email", result)


@workflow.step
def exit_handler(res: Tuple[Optional[str], Optional[Exception]]) -> None:
    result, error = res
    email = send_email.step("Raw result: {}, {}".format(result, error))
    if error:
        handler = cry.step(error)
    else:
        handler = celebrate.step(result)
    return wait_all.step(handler, email)


@workflow.step
def wait_all(*deps):
    pass


if __name__ == "__main__":
    workflow.init()
    res = intentional_fail.options(catch_exceptions=True).step()
    print(exit_handler.step(res).run())
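With catch_exceptions=True, the step's output becomes a (result, error) tuple instead of raising, and exit_handler branches on it. A pure-Python model of that contract follows; the catch_exceptions helper here is hypothetical and only mimics the tuple shape used above.

```python
from typing import Any, Callable, Optional, Tuple


def catch_exceptions(fn: Callable[[], Any]) -> Tuple[Optional[Any], Optional[Exception]]:
    """Return (result, None) on success or (None, error) on failure."""
    try:
        return fn(), None
    except Exception as e:  # broad on purpose: mirrors "catch any step failure"
        return None, e


def intentional_fail() -> str:
    raise RuntimeError("oops")


def exit_handler(res: Tuple[Optional[Any], Optional[Exception]]) -> str:
    result, error = res
    # The notification always runs; the branch depends on success or failure.
    status = "Failed" if error is not None else "Succeeded"
    print("Sending email", status)
    if error is not None:
        return "cry: {}".format(error)
    return "celebrate: {}".format(result)


print(exit_handler(catch_exceptions(intentional_fail)))  # cry: oops
print(exit_handler(catch_exceptions(lambda: "done")))    # celebrate: done
```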

Loops

Argo version:
# https://github.com/argoproj/argo-workflows/tree/master/examples#loops
apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
  generateName: loops-
spec:
  entrypoint: loop-example
  templates:
  - name: loop-example
    steps:
    - - name: print-message
        template: whalesay
        arguments:
          parameters:
          - name: message
            value: "{{item}}"
        withItems:              # invoke whalesay once for each item in parallel
        - hello world           # item 1
        - goodbye world         # item 2

  - name: whalesay
    inputs:
      parameters:
      - name: message
    container:
      image: docker/whalesay:latest
      command: [cowsay]
      args: ["{{inputs.parameters.message}}"]
Workflow version:
from ray import workflow


@workflow.step
def hello(msg: str) -> None:
    print(msg)


@workflow.step
def wait_all(*args) -> None:
    pass


if __name__ == "__main__":
    workflow.init()
    children = []
    for msg in ["hello world", "goodbye world"]:
        children.append(hello.step(msg))
    wait_all.step(*children).run()

Recursion

Argo version:
# https://github.com/argoproj/argo-workflows/tree/master/examples#recursion
apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
  generateName: coinflip-recursive-
spec:
  entrypoint: coinflip
  templates:
  - name: coinflip
    steps:
    # flip a coin
    - - name: flip-coin
        template: flip-coin
    # evaluate the result in parallel
    - - name: heads
        template: heads                 # call heads template if "heads"
        when: "{{steps.flip-coin.outputs.result}} == heads"
      - name: tails                     # keep flipping coins if "tails"
        template: coinflip
        when: "{{steps.flip-coin.outputs.result}} == tails"

  - name: flip-coin
    script:
      image: python:alpine3.6
      command: [python]
      source: |
        import random
        result = "heads" if random.randint(0,1) == 0 else "tails"
        print(result)

  - name: heads
    container:
      image: alpine:3.6
      command: [sh, -c]
      args: ["echo \"it was heads\""]
Workflow version:
from ray import workflow


@workflow.step
def handle_heads() -> str:
    return "It was heads"


@workflow.step
def handle_tails() -> str:
    print("It was tails, retrying")
    return flip_coin.step()


@workflow.step
def flip_coin() -> str:
    import random

    @workflow.step
    def decide(heads: bool) -> str:
        if heads:
            return handle_heads.step()
        else:
            return handle_tails.step()

    return decide.step(random.random() > 0.5)


if __name__ == "__main__":
    workflow.init()
    print(flip_coin.step().run())

Retries

Argo version:
# https://github.com/argoproj/argo-workflows/tree/master/examples#retrying-failed-or-errored-steps
apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
  generateName: retry-backoff-
spec:
  entrypoint: retry-backoff
  templates:
  - name: retry-backoff
    retryStrategy:
      limit: 10
      retryPolicy: "Always"
      backoff:
        duration: "1"      # Must be a string. Default unit is seconds. Could also be a Duration, e.g.: "2m", "6h", "1d"
        factor: 2
        maxDuration: "1m"  # Must be a string. Default unit is seconds. Could also be a Duration, e.g.: "2m", "6h", "1d"
      affinity:
        nodeAntiAffinity: {}
    container:
      image: python:alpine3.6
      command: ["python", -c]
      # fail with a 66% probability
      args: ["import random; import sys; exit_code = random.choice([0, 1, 1]); sys.exit(exit_code)"]
Workflow version:
from typing import Any, Tuple, Optional

from ray import workflow


@workflow.step
def flaky_step() -> str:
    import random

    if random.choice([0, 1, 1]) != 0:
        raise ValueError("oops")

    return "ok"


@workflow.step
def custom_retry_strategy(func: Any, num_retries: int, delay_s: int) -> str:
    import time

    @workflow.step
    def handle_result(res: Tuple[Optional[str], Optional[Exception]]) -> str:
        result, error = res
        if error is None:
            return result
        elif num_retries <= 0:
            raise error
        else:
            print("Retrying exception after delay", error)
            time.sleep(delay_s)
            return custom_retry_strategy.step(func, num_retries - 1, delay_s)

    res = func.options(catch_exceptions=True).step()
    return handle_result.step(res)


if __name__ == "__main__":
    workflow.init()
    # Default retry strategy.
    print(flaky_step.options(max_retries=10).step().run())
    # Custom strategy.
    print(custom_retry_strategy.step(flaky_step, 10, 1).run())
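The Argo retryStrategy adds exponential backoff (duration 1s, factor 2, maxDuration 1m), while custom_retry_strategy sleeps a fixed delay_s between attempts. Below is a pure-Python sketch of an exponential-backoff loop that could replace the fixed delay. retry_with_backoff is hypothetical, and it approximates Argo's maxDuration as the total time spent sleeping, which may not match Argo's exact semantics. The sleep function is injectable so the delays can be inspected without waiting.

```python
import time
from typing import Callable, List, TypeVar

T = TypeVar("T")


def retry_with_backoff(
    fn: Callable[[], T],
    limit: int = 10,
    duration: float = 1.0,
    factor: float = 2.0,
    max_duration: float = 60.0,
    sleep: Callable[[float], None] = time.sleep,
) -> T:
    """Call fn, retrying up to `limit` times with delays duration * factor**k.

    Gives up early once the total time spent sleeping would exceed max_duration.
    """
    waited = 0.0
    for attempt in range(limit + 1):
        try:
            return fn()
        except Exception:
            delay = duration * factor ** attempt
            if attempt == limit or waited + delay > max_duration:
                raise  # out of retries or time budget: surface the last error
            sleep(delay)
            waited += delay
    raise AssertionError("unreachable for limit >= 0")


# Usage: fail twice, then succeed; record the delays instead of sleeping.
delays: List[float] = []
outcomes = iter([ValueError("1"), ValueError("2"), "ok"])


def two_failures_then_ok() -> str:
    outcome = next(outcomes)
    if isinstance(outcome, Exception):
        raise outcome
    return outcome


print(retry_with_backoff(two_failures_then_ok, sleep=delays.append))  # ok
print(delays)  # [1.0, 2.0]
```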

Metaflow API Comparison

The original source of these comparisons can be found here.

Foreach

Metaflow version:
# https://docs.metaflow.org/metaflow/basics#foreach
from metaflow import FlowSpec, step


class ForeachFlow(FlowSpec):
    @step
    def start(self):
        self.titles = ["Stranger Things", "House of Cards", "Narcos"]
        self.next(self.a, foreach="titles")

    @step
    def a(self):
        self.title = "%s processed" % self.input
        self.next(self.join)

    @step
    def join(self, inputs):
        self.results = [input.title for input in inputs]
        self.next(self.end)

    @step
    def end(self):
        print("\n".join(self.results))


if __name__ == "__main__":
    ForeachFlow()
Workflow version:
from typing import List

from ray import workflow


@workflow.step
def start():
    titles = ["Stranger Things", "House of Cards", "Narcos"]
    children = []
    for t in titles:
        children.append(a.step(t))
    return end.step(children)


@workflow.step
def a(title: str) -> str:
    return "{} processed".format(title)


@workflow.step
def end(results: List[str]) -> str:
    return "\n".join(results)


if __name__ == "__main__":
    workflow.init()
    print(start.step().run())
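Both versions implement fan-out/fan-in: one task per title, then a join over all results. The same shape in plain Python, with a thread pool standing in for parallel steps (a sketch only; foreach is a hypothetical helper):

```python
from concurrent.futures import ThreadPoolExecutor
from typing import List


def a(title: str) -> str:
    return "{} processed".format(title)


def foreach(titles: List[str]) -> str:
    # Fan out: one call of `a` per title, run concurrently.
    with ThreadPoolExecutor() as pool:
        results = list(pool.map(a, titles))  # map preserves input order
    # Fan in: the join sees all results at once.
    return "\n".join(results)


print(foreach(["Stranger Things", "House of Cards", "Narcos"]))
```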

Cadence API Comparison

The original source of these comparisons can be found here.

Sub Workflows

Cadence version:
// https://github.com/uber/cadence-java-samples/blob/master/src/main/java/com/uber/cadence/samples/hello/HelloChild.java
public static class GreetingWorkflowImpl implements GreetingWorkflow {

  @Override
  public String getGreeting(String name) {
    // Workflows are stateful. So a new stub must be created for each new child.
    GreetingChild child = Workflow.newChildWorkflowStub(GreetingChild.class);

    // This is a non blocking call that returns immediately.
    // Use child.composeGreeting("Hello", name) to call synchronously.
    Promise<String> greeting = Async.function(child::composeGreeting, "Hello", name);
    // Do something else here.
    return greeting.get(); // blocks waiting for the child to complete.
  }

  // This example shows how a parent workflow can return right after starting a child workflow
  // and let the child keep running on its own.
  private String demoAsyncChildRun(String name) {
    GreetingChild child = Workflow.newChildWorkflowStub(GreetingChild.class);
    // non blocking call that initiated child workflow
    Async.function(child::composeGreeting, "Hello", name);
    // instead of using greeting.get() to block till child complete,
    // sometimes we just want to return parent immediately and keep child running
    Promise<WorkflowExecution> childPromise = Workflow.getWorkflowExecution(child);
    childPromise.get(); // block until child started,
    // otherwise child may not start because parent complete first.
    return "let child run, parent just return";
  }

  public static void main(String[] args) {
    // Start a worker that hosts both parent and child workflow implementations.
    Worker.Factory factory = new Worker.Factory(DOMAIN);
    Worker worker = factory.newWorker(TASK_LIST);
    worker.registerWorkflowImplementationTypes(GreetingWorkflowImpl.class, GreetingChildImpl.class);
    // Start listening to the workflow task list.
    factory.start();

    // Start a workflow execution. Usually this is done from another program.
    WorkflowClient workflowClient = WorkflowClient.newInstance(DOMAIN);
    // Get a workflow stub using the same task list the worker uses.
    GreetingWorkflow workflow = workflowClient.newWorkflowStub(GreetingWorkflow.class);
    // Execute a workflow waiting for it to complete.
    String greeting = workflow.getGreeting("World");
    System.out.println(greeting);
    System.exit(0);
  }
}
Workflow version:
from ray import workflow


@workflow.step
def compose_greeting(greeting: str, name: str) -> str:
    return greeting + ": " + name


@workflow.step
def main_workflow(name: str) -> str:
    return compose_greeting.step("Hello", name)


if __name__ == "__main__":
    workflow.init()
    wf = main_workflow.step("Alice")
    print(wf.run())

File Processing

Cadence version:
// https://github.com/uber/cadence-java-samples/tree/master/src/main/java/com/uber/cadence/samples/fileprocessing
public class FileProcessingWorkflowImpl implements FileProcessingWorkflow {

  // Uses the default task list shared by the pool of workers.
  private final StoreActivities defaultTaskListStore;

  public FileProcessingWorkflowImpl() {
    // Create activity clients.
    ActivityOptions ao =
        new ActivityOptions.Builder()
            .setScheduleToCloseTimeout(Duration.ofSeconds(10))
            .setTaskList(FileProcessingWorker.TASK_LIST)
            .build();
    this.defaultTaskListStore = Workflow.newActivityStub(StoreActivities.class, ao);
  }

  @Override
  public void processFile(URL source, URL destination) {
    RetryOptions retryOptions =
        new RetryOptions.Builder()
            .setExpiration(Duration.ofSeconds(10))
            .setInitialInterval(Duration.ofSeconds(1))
            .build();
    // Retries the whole sequence on any failure, potentially on a different host.
    Workflow.retry(retryOptions, () -> processFileImpl(source, destination));
  }

  private void processFileImpl(URL source, URL destination) {
    StoreActivities.TaskListFileNamePair downloaded = defaultTaskListStore.download(source);

    // Now initialize stubs that are specific to the returned task list.
    ActivityOptions hostActivityOptions =
        new ActivityOptions.Builder()
            .setTaskList(downloaded.getHostTaskList())
            .setScheduleToCloseTimeout(Duration.ofSeconds(10))
            .build();
    StoreActivities hostSpecificStore =
        Workflow.newActivityStub(StoreActivities.class, hostActivityOptions);

    // Call processFile activity to zip the file.
    // Call the activity to process the file using worker-specific task list.
    String processed = hostSpecificStore.process(downloaded.getFileName());
    // Call upload activity to upload the zipped file.
    hostSpecificStore.upload(processed, destination);
  }
}
Workflow version:
from typing import List

import ray
from ray import workflow

FILES_TO_PROCESS = ["file-{}".format(i) for i in range(100)]


# Mock method to download a file.
def download(url: str) -> str:
    return "contents" * 10000


# Mock method to process a file.
def process(contents: str) -> str:
    return "processed: " + contents


# Mock method to upload a file.
def upload(contents: str) -> None:
    pass


@workflow.step
def upload_all(file_contents: List[ray.ObjectRef]) -> None:
    @workflow.step
    def upload_one(contents: str) -> None:
        upload(contents)

    children = [upload_one.step(f) for f in file_contents]

    @workflow.step
    def wait_all(*deps) -> None:
        pass

    return wait_all.step(*children)


@workflow.step
def process_all(file_contents: List[ray.ObjectRef]) -> None:
    @workflow.step
    def process_one(contents: str) -> ray.ObjectRef:
        result = process(contents)
        # Result is too large to return directly; put in the object store.
        return ray.put(result)

    children = [process_one.step(f) for f in file_contents]
    return upload_all.step(children)


@workflow.step
def download_all(urls: List[str]) -> None:
    @workflow.step
    def download_one(url: str) -> ray.ObjectRef:
        return ray.put(download(url))

    children = [download_one.step(u) for u in urls]
    return process_all.step(children)


if __name__ == "__main__":
    workflow.init()
    res = download_all.step(FILES_TO_PROCESS)
    res.run()

Trip Booking

Cadence version:
// https://github.com/uber/cadence-java-samples/tree/master/src/main/java/com/uber/cadence/samples/bookingsaga
public class TripBookingWorkflowImpl implements TripBookingWorkflow {

  private final ActivityOptions options =
      new ActivityOptions.Builder().setScheduleToCloseTimeout(Duration.ofHours(1)).build();
  private final TripBookingActivities activities =
      Workflow.newActivityStub(TripBookingActivities.class, options);

  @Override
  public void bookTrip(String name) {
    Saga.Options sagaOptions = new Saga.Options.Builder().setParallelCompensation(true).build();
    Saga saga = new Saga(sagaOptions);
    try {
      String carReservationID = activities.reserveCar(name);
      saga.addCompensation(activities::cancelCar, carReservationID, name);

      String hotelReservationID = activities.bookHotel(name);
      saga.addCompensation(activities::cancelHotel, hotelReservationID, name);

      String flightReservationID = activities.bookFlight(name);
      saga.addCompensation(activities::cancelFlight, flightReservationID, name);
    } catch (ActivityException e) {
      saga.compensate();
      throw e;
    }
  }
}
Workflow version:
from typing import List, Tuple, Optional

from ray import workflow


# Mock method to make requests to an external service.
def make_request(*args) -> str:
    return "result"


# Generate an idempotency token (this is an extension to the cadence example).
@workflow.step
def generate_request_id():
    import uuid
    return uuid.uuid4().hex


@workflow.step
def cancel(request_id: str) -> None:
    make_request("cancel", request_id)


@workflow.step
def book_car(request_id: str) -> str:
    car_reservation_id = make_request("book_car", request_id)
    return car_reservation_id


@workflow.step
def book_hotel(request_id: str, *deps) -> str:
    hotel_reservation_id = make_request("book_hotel", request_id)
    return hotel_reservation_id


@workflow.step
def book_flight(request_id: str, *deps) -> str:
    flight_reservation_id = make_request("book_flight", request_id)
    return flight_reservation_id


@workflow.step
def book_all(car_req_id: str, hotel_req_id: str, flight_req_id: str) -> str:
    car_res_id = book_car.step(car_req_id)
    hotel_res_id = book_hotel.step(hotel_req_id, car_res_id)
    flight_res_id = book_flight.step(flight_req_id, hotel_res_id)

    @workflow.step
    def concat(*ids: str) -> str:
        return ", ".join(ids)

    return concat.step(car_res_id, hotel_res_id, flight_res_id)


@workflow.step
def handle_errors(
        car_req_id: str, hotel_req_id: str, flight_req_id: str,
        final_result: Tuple[Optional[str], Optional[Exception]]) -> str:
    result, error = final_result

    @workflow.step
    def wait_all(*deps) -> None:
        pass

    if error:
        return wait_all.step(
            cancel.step(car_req_id), cancel.step(hotel_req_id),
            cancel.step(flight_req_id))
    else:
        return result


if __name__ == "__main__":
    workflow.init()
    car_req_id = generate_request_id.step()
    hotel_req_id = generate_request_id.step()
    flight_req_id = generate_request_id.step()
    # TODO(ekl) we could create a Saga helper function that automates this
    # pattern of compensation workflows.
    saga_result = book_all.options(catch_exceptions=True) \
        .step(car_req_id, hotel_req_id, flight_req_id)
    final_result = handle_errors.step(car_req_id, hotel_req_id, flight_req_id,
                                      saga_result)
    print(final_result.run())
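The TODO above suggests a Saga helper. As a hedged sketch of what that pattern does in plain Python (Saga, book, and cancel here are hypothetical and unrelated to Cadence's or Ray's APIs): each successful booking registers a compensation, and on failure the registered compensations run in reverse order before the error is re-raised. Unlike the Cadence sample, which enables parallel compensation, this sketch compensates sequentially so the order is deterministic.

```python
from typing import Any, Callable, List, Tuple


class Saga:
    """Run actions; undo the completed ones if a later action fails."""

    def __init__(self) -> None:
        self._compensations: List[Tuple[Callable[..., Any], tuple]] = []

    def add_compensation(self, fn: Callable[..., Any], *args: Any) -> None:
        self._compensations.append((fn, args))

    def compensate(self) -> None:
        # Undo in reverse order of registration (last booked, first cancelled).
        for fn, args in reversed(self._compensations):
            fn(*args)


log: List[str] = []


def book(kind: str, fail: bool = False) -> str:
    if fail:
        raise RuntimeError("cannot book " + kind)
    log.append("booked " + kind)
    return kind + "-reservation"


def cancel(reservation_id: str) -> None:
    log.append("cancelled " + reservation_id)


def book_trip(fail_flight: bool) -> None:
    saga = Saga()
    try:
        car = book("car")
        saga.add_compensation(cancel, car)
        hotel = book("hotel")
        saga.add_compensation(cancel, hotel)
        flight = book("flight", fail=fail_flight)
        saga.add_compensation(cancel, flight)
    except RuntimeError:
        saga.compensate()
        raise


try:
    book_trip(fail_flight=True)
except RuntimeError as e:
    print(e)  # cannot book flight
print(log)
```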

Google Cloud Workflows API Comparison

The original source of these comparisons can be found here.

Data Conditional

Google Cloud version:
# https://github.com/GoogleCloudPlatform/workflows-samples/blob/main/src/step_conditional_jump.workflows.json
[
  {
    "firstStep": {
      "call": "http.get",
      "args": {
        "url": "https://www.example.com/callA"
      },
      "result": "firstResult"
    }
  },
  {
    "whereToJump": {
      "switch": [
        {
          "condition": "${firstResult.body.SomeField < 10}",
          "next": "small"
        },
        {
          "condition": "${firstResult.body.SomeField < 100}",
          "next": "medium"
        }
      ],
      "next": "large"
    }
  },
  {
    "small": {
      "call": "http.get",
      "args": {
        "url": "https://www.example.com/SmallFunc"
      },
      "next": "end"
    }
  },
  {
    "medium": {
      "call": "http.get",
      "args": {
        "url": "https://www.example.com/MediumFunc"
      },
      "next": "end"
    }
  },
  {
    "large": {
      "call": "http.get",
      "args": {
        "url": "https://www.example.com/LargeFunc"
      },
      "next": "end"
    }
  }
]
Workflow version:
from ray import workflow


# Mock method to make a request.
def make_request(url: str) -> str:
    return "42"


@workflow.step
def get_size() -> int:
    return int(make_request("https://www.example.com/callA"))


@workflow.step
def small(result: int) -> str:
    return make_request("https://www.example.com/SmallFunc")


@workflow.step
def medium(result: int) -> str:
    return make_request("https://www.example.com/MediumFunc")


@workflow.step
def large(result: int) -> str:
    return make_request("https://www.example.com/LargeFunc")


@workflow.step
def decide(result: int) -> str:
    if result < 10:
        return small.step(result)
    elif result < 100:
        return medium.step(result)
    else:
        return large.step(result)


if __name__ == "__main__":
    workflow.init()
    print(decide.step(get_size.step()).run())

Concat Array

Google Cloud version:
# https://github.com/GoogleCloudPlatform/workflows-samples/blob/main/src/array.workflows.json
[
  {
    "define": {
      "assign": [
        {
          "array": [
            "foo",
            "ba",
            "r"
          ]
        },
        {
          "result": ""
        },
        {
          "i": 0
        }
      ]
    }
  },
  {
    "check_condition": {
      "switch": [
        {
          "condition": "${len(array) > i}",
          "next": "iterate"
        }
      ],
      "next": "exit_loop"
    }
  },
  {
    "iterate": {
      "assign": [
        {
          "result": "${result + array[i]}"
        },
        {
          "i": "${i+1}"
        }
      ],
      "next": "check_condition"
    }
  },
  {
    "exit_loop": {
      "return": {
        "concat_result": "${result}"
      }
    }
  }
]
Workflow version:
from typing import List

from ray import workflow


@workflow.step
def iterate(array: List[str], result: str, i: int) -> str:
    if i >= len(array):
        return result
    return iterate.step(array, result + array[i], i + 1)


if __name__ == "__main__":
    workflow.init()
    print(iterate.step(["foo", "ba", "r"], "", 0).run())

Sub Workflows

Google Cloud version:
# https://github.com/GoogleCloudPlatform/workflows-samples/blob/main/src/subworkflow.workflows.json
{
  "main": {
    "steps": [
      {
        "first": {
          "call": "hello",
          "args": {
            "input": "Kristof"
          },
          "result": "someOutput"
        }
      },
      {
        "second": {
          "return": "${someOutput}"
        }
      }
    ]
  },
  "hello": {
    "params": [
      "input"
    ],
    "steps": [
      {
        "first": {
          "return": "${\"Hello \"+input}"
        }
      }
    ]
  }
}
Workflow version:
from ray import workflow


@workflow.step
def hello(name: str) -> str:
    return format_name.step(name)


@workflow.step
def format_name(name: str) -> str:
    return "hello, {}".format(name)


@workflow.step
def report(msg: str) -> None:
    print(msg)


if __name__ == "__main__":
    workflow.init()
    r1 = hello.step("Kristof")
    r2 = report.step(r1)
    r2.run()

Prefect API Comparison

The original source of these comparisons can be found here.

Looping

Prefect version:
# https://docs.prefect.io/core/advanced_tutorials/task-looping.html

import requests
from datetime import timedelta

import prefect
from prefect import task
from prefect import Flow, Parameter
from prefect.engine.signals import LOOP


@task(max_retries=5, retry_delay=timedelta(seconds=2))
def compute_large_fibonacci(M):
    # we extract the accumulated task loop result from context
    loop_payload = prefect.context.get("task_loop_result", {})

    n = loop_payload.get("n", 1)
    fib = loop_payload.get("fib", 1)

    next_fib = requests.post(
        "https://nemo.api.stdlib.com/fibonacci@0.0.1/", data={"nth": n}
    ).json()

    if next_fib > M:
        return fib  # return statements end the loop

    raise LOOP(message=f"Fib {n}={next_fib}", result=dict(n=n + 1, fib=next_fib))


if __name__ == "__main__":
    with Flow("fibonacci") as flow:
        M = Parameter("M")
        fib_num = compute_large_fibonacci(M)

    flow_state = flow.run(M=100)
    print(flow_state.result[fib_num].result) # 89
Workflow version:
from ray import workflow
import requests


@workflow.step
def compute_large_fib(M: int, n: int = 1, fib: int = 1):
    next_fib = requests.post(
        "https://nemo.api.stdlib.com/fibonacci@0.0.1/", data={
            "nth": n
        }).json()
    if next_fib > M:
        return fib
    else:
        return compute_large_fib.step(M, n + 1, next_fib)


if __name__ == "__main__":
    workflow.init()
    assert compute_large_fib.step(100).run() == 89
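Both versions depend on an external HTTP service to compute Fibonacci numbers. For trying the loop offline, a local function can replace the request. nth_fib and its indexing (F(1) = F(2) = 1) are assumptions, and the loop below follows the same logic as the recursive step, written iteratively; the fetch parameter is a hypothetical hook for plugging the HTTP call back in.

```python
from typing import Callable


def nth_fib(n: int) -> int:
    """Local stand-in for the HTTP call, assuming F(1) = F(2) = 1."""
    a, b = 0, 1
    for _ in range(n):
        a, b = b, a + b
    return a


def compute_large_fib(
    M: int, n: int = 1, fib: int = 1, fetch: Callable[[int], int] = nth_fib
) -> int:
    # Same logic as the recursive workflow step, written as a loop.
    while True:
        next_fib = fetch(n)
        if next_fib > M:
            return fib  # largest Fibonacci number seen that is <= M
        n, fib = n + 1, next_fib


print(compute_large_fib(100))  # 89
```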

Airflow API Comparison

The original source of these comparisons can be found here.

ETL Workflow

Airflow version:
# https://airflow.apache.org/docs/apache-airflow/stable/_modules/airflow/example_dags/tutorial_taskflow_api_etl.html

import json

from airflow.decorators import dag, task
from airflow.utils.dates import days_ago

# These args will get passed on to each operator
# You can override them on a per-task basis during operator initialization
default_args = {
    'owner': 'airflow',
}


@dag(default_args=default_args, schedule_interval=None, start_date=days_ago(2), tags=['example'])
def tutorial_taskflow_api_etl():
    """
    ### TaskFlow API Tutorial Documentation
    This is a simple ETL data pipeline example which demonstrates the use of
    the TaskFlow API using three simple tasks for Extract, Transform, and Load.
    Documentation that goes along with the Airflow TaskFlow API tutorial is
    located
    [here](https://airflow.apache.org/docs/apache-airflow/stable/tutorial_taskflow_api.html)
    """

    @task()
    def extract():
        """
        #### Extract task
        A simple Extract task to get data ready for the rest of the data
        pipeline. In this case, getting data is simulated by reading from a
        hardcoded JSON string.
        """
        data_string = '{"1001": 301.27, "1002": 433.21, "1003": 502.22}'

        order_data_dict = json.loads(data_string)
        return order_data_dict

    @task(multiple_outputs=True)
    def transform(order_data_dict: dict):
        """
        #### Transform task
        A simple Transform task which takes in the collection of order data and
        computes the total order value.
        """
        total_order_value = 0

        for value in order_data_dict.values():
            total_order_value += value

        return {"total_order_value": total_order_value}

    @task()
    def load(total_order_value: float):
        """
        #### Load task
        A simple Load task which takes in the result of the Transform task and
        instead of saving it to end user review, just prints it out.
        """

        print(f"Total order value is: {total_order_value:.2f}")

    order_data = extract()
    order_summary = transform(order_data)
    load(order_summary["total_order_value"])


tutorial_etl_dag = tutorial_taskflow_api_etl()
Workflow version:
import json

import ray
from ray import workflow


@workflow.step
def extract() -> dict:
    data_string = '{"1001": 301.27, "1002": 433.21, "1003": 502.22}'
    order_data_dict = json.loads(data_string)
    return order_data_dict


@workflow.step
def transform(order_data_dict: dict) -> dict:
    total_order_value = 0
    for value in order_data_dict.values():
        total_order_value += value
    return {"total_order_value": ray.put(total_order_value)}


@workflow.step
def load(data_dict: dict) -> str:
    total_order_value = ray.get(data_dict["total_order_value"])
    return f"Total order value is: {total_order_value:.2f}"


if __name__ == "__main__":
    workflow.init()
    order_data = extract.step()
    order_summary = transform.step(order_data)
    etl = load.step(order_summary)
    print(etl.run())