API Comparisons#

Comparison between Ray Core APIs and Workflows#

Ray Workflows is built on top of Ray, and offers a mostly consistent subset of its API while providing durability. This section highlights some of the differences:

func.remote vs func.bind#

With Ray tasks, func.remote will submit a remote task to run eagerly; func.bind will generate a node in a DAG, it will not be executed until the DAG is been executed.

Under the context of Ray Workflow, the execution of the DAG is deferred until workflow.run(dag, workflow_id=...) or workflow.run_async(dag, workflow_id=...) is called on the DAG. Specifying the workflow id allows for resuming of the workflow by its id in case of cluster failure.

Other Workflow Engines#

Note: these comparisons are inspired by the Serverless workflows comparisons repo.

Argo API Comparison#

The original source of these comparisons can be found here.

Conditionals#

Argo version:#
# https://github.com/argoproj/argo-workflows/tree/master/examples#conditionals
apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
  generateName: coinflip-
spec:
  entrypoint: coinflip
  templates:
  - name: coinflip
    steps:
    # flip a coin
    - - name: flip-coin
        template: flip-coin
    # evaluate the result in parallel
    - - name: heads
        template: heads                       # call heads template if "heads"
        when: "{{steps.flip-coin.outputs.result}} == heads"
      - name: tails
        template: tails                       # call tails template if "tails"
        when: "{{steps.flip-coin.outputs.result}} == tails"
    - - name: flip-again
        template: flip-coin
    - - name: complex-condition
        template: heads-tails-or-twice-tails
        # call heads template if first flip was "heads" and second was "tails" OR both were "tails"
        when: >-
            ( {{steps.flip-coin.outputs.result}} == heads &&
              {{steps.flip-again.outputs.result}} == tails
            ) ||
            ( {{steps.flip-coin.outputs.result}} == tails &&
              {{steps.flip-again.outputs.result}} == tails )
      - name: heads-regex
        template: heads                       # call heads template if ~ "hea"
        when: "{{steps.flip-again.outputs.result}} =~ hea"
      - name: tails-regex
        template: tails                       # call heads template if ~ "tai"
        when: "{{steps.flip-again.outputs.result}} =~ tai"

  # Return heads or tails based on a random number
  - name: flip-coin
    script:
      image: python:alpine3.6
      command: [python]
      source: |
        import random
        result = "heads" if random.randint(0,1) == 0 else "tails"
        print(result)

  - name: heads
    container:
      image: alpine:3.6
      command: [sh, -c]
      args: ["echo \"it was heads\""]

  - name: tails
    container:
      image: alpine:3.6
      command: [sh, -c]
      args: ["echo \"it was tails\""]
  
  - name: heads-tails-or-twice-tails
    container:
      image: alpine:3.6
      command: [sh, -c]
      args: ["echo \"it was heads the first flip and tails the second. Or it was two times tails.\""]
Workflow version:#
import ray
from ray import workflow


@ray.remote
def handle_heads() -> str:
    return "It was heads"


@ray.remote
def handle_tails() -> str:
    return "It was tails"


@ray.remote
def flip_coin() -> str:
    import random

    @ray.remote
    def decide(heads: bool) -> str:
        return workflow.continuation(
            handle_heads.bind() if heads else handle_tails.bind()
        )

    return workflow.continuation(decide.bind(random.random() > 0.5))


if __name__ == "__main__":
    print(workflow.run(flip_coin.bind()))

DAG#

Argo version:#
# https://github.com/argoproj/argo-workflows/tree/master/examples#dag
apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
  generateName: dag-diamond-
spec:
  entrypoint: diamond
  templates:
  - name: echo
    inputs:
      parameters:
      - name: message
    container:
      image: alpine:3.7
      command: [echo, "{{inputs.parameters.message}}"]
  - name: diamond
    dag:
      tasks:
      - name: A
        template: echo
        arguments:
          parameters: [{name: message, value: A}]
      - name: B
        dependencies: [A]
        template: echo
        arguments:
          parameters: [{name: message, value: B}]
      - name: C
        dependencies: [A]
        template: echo
        arguments:
          parameters: [{name: message, value: C}]
      - name: D
        dependencies: [B, C]
        template: echo
        arguments:
          parameters: [{name: message, value: D}]
Workflow version:#
import ray
from ray import workflow


@ray.remote
def echo(msg: str, *deps) -> None:
    print(msg)


if __name__ == "__main__":
    A = echo.options(**workflow.options(task_id="A")).bind("A")
    B = echo.options(**workflow.options(task_id="B")).bind("B", A)
    C = echo.options(**workflow.options(task_id="C")).bind("C", A)
    D = echo.options(**workflow.options(task_id="D")).bind("D", A, B)
    workflow.run(D)

Multi-step Workflow#

Argo version:#
# https://github.com/argoproj/argo-workflows/tree/master/examples#steps
apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
  generateName: steps-
spec:
  entrypoint: hello-hello-hello

  # This spec contains two templates: hello-hello-hello and whalesay
  templates:
  - name: hello-hello-hello
    # Instead of just running a container
    # This template has a sequence of steps
    steps:
    - - name: hello1            # hello1 is run before the following steps
        template: whalesay
        arguments:
          parameters:
          - name: message
            value: "hello1"
    - - name: hello2a           # double dash => run after previous step
        template: whalesay
        arguments:
          parameters:
          - name: message
            value: "hello2a"
      - name: hello2b           # single dash => run in parallel with previous step
        template: whalesay
        arguments:
          parameters:
          - name: message
            value: "hello2b"

  # This is the same template as from the previous example
  - name: whalesay
    inputs:
      parameters:
      - name: message
    container:
      image: docker/whalesay
      command: [cowsay]
      args: ["{{inputs.parameters.message}}"]
Workflow version:#
import ray
from ray import workflow


@ray.remote
def hello(msg: str, *deps) -> None:
    print(msg)


@ray.remote
def wait_all(*args) -> None:
    pass


if __name__ == "__main__":
    h1 = hello.options(**workflow.options(task_id="hello1")).bind("hello1")
    h2a = hello.options(**workflow.options(task_id="hello2a")).bind("hello2a")
    h2b = hello.options(**workflow.options(task_id="hello2b")).bind("hello2b", h2a)
    workflow.run(wait_all.bind(h1, h2b))

Exit Handler#

Argo version:#
# https://github.com/argoproj/argo-workflows/tree/master/examples#exit-handlers
apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
  generateName: exit-handlers-
spec:
  entrypoint: intentional-fail
  onExit: exit-handler                  # invoke exit-handler template at end of the workflow
  templates:
  # primary workflow template
  - name: intentional-fail
    container:
      image: alpine:latest
      command: [sh, -c]
      args: ["echo intentional failure; exit 1"]

  # Exit handler templates
  # After the completion of the entrypoint template, the status of the
  # workflow is made available in the global variable {{workflow.status}}.
  # {{workflow.status}} will be one of: Succeeded, Failed, Error
  - name: exit-handler
    steps:
    - - name: notify
        template: send-email
      - name: celebrate
        template: celebrate
        when: "{{workflow.status}} == Succeeded"
      - name: cry
        template: cry
        when: "{{workflow.status}} != Succeeded"
  - name: send-email
    container:
      image: alpine:latest
      command: [sh, -c]
      args: ["echo send e-mail: {{workflow.name}} {{workflow.status}} {{workflow.duration}}"]
  - name: celebrate
    container:
      image: alpine:latest
      command: [sh, -c]
      args: ["echo hooray!"]
  - name: cry
    container:
      image: alpine:latest
      command: [sh, -c]
      args: ["echo boohoo!"]
Workflow version:#
from typing import Tuple, Optional

import ray
from ray import workflow


@ray.remote
def intentional_fail() -> str:
    raise RuntimeError("oops")


@ray.remote
def cry(error: Exception) -> None:
    print("Sadly", error)


@ray.remote
def celebrate(result: str) -> None:
    print("Success!", result)


@ray.remote
def send_email(result: str) -> None:
    print("Sending email", result)


@ray.remote
def exit_handler(res: Tuple[Optional[str], Optional[Exception]]) -> None:
    result, error = res
    email = send_email.bind(f"Raw result: {result}, {error}")
    if error:
        handler = cry.bind(error)
    else:
        handler = celebrate.bind(result)
    return workflow.continuation(wait_all.bind(handler, email))


@ray.remote
def wait_all(*deps):
    return "done"


if __name__ == "__main__":
    res = intentional_fail.options(**workflow.options(catch_exceptions=True)).bind()
    print(workflow.run(exit_handler.bind(res)))

Loops#

Argo version:#
# https://github.com/argoproj/argo-workflows/tree/master/examples#loops
apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
  generateName: loops-
spec:
  entrypoint: loop-example
  templates:
  - name: loop-example
    steps:
    - - name: print-message
        template: whalesay
        arguments:
          parameters:
          - name: message
            value: "{{item}}"
        withItems:              # invoke whalesay once for each item in parallel
        - hello world           # item 1
        - goodbye world         # item 2

  - name: whalesay
    inputs:
      parameters:
      - name: message
    container:
      image: docker/whalesay:latest
      command: [cowsay]
      args: ["{{inputs.parameters.message}}"]
Workflow version:#
import ray
from ray import workflow


@ray.remote
def hello(msg: str) -> None:
    print(msg)


@ray.remote
def wait_all(*args) -> None:
    pass


if __name__ == "__main__":
    children = []
    for msg in ["hello world", "goodbye world"]:
        children.append(hello.bind(msg))
    workflow.run(wait_all.bind(*children))

Recursion#

Argo version:#
# https://github.com/argoproj/argo-workflows/tree/master/examples#recursion
apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
  generateName: coinflip-recursive-
spec:
  entrypoint: coinflip
  templates:
  - name: coinflip
    steps:
    # flip a coin
    - - name: flip-coin
        template: flip-coin
    # evaluate the result in parallel
    - - name: heads
        template: heads                 # call heads template if "heads"
        when: "{{steps.flip-coin.outputs.result}} == heads"
      - name: tails                     # keep flipping coins if "tails"
        template: coinflip
        when: "{{steps.flip-coin.outputs.result}} == tails"

  - name: flip-coin
    script:
      image: python:alpine3.6
      command: [python]
      source: |
        import random
        result = "heads" if random.randint(0,1) == 0 else "tails"
        print(result)

  - name: heads
    container:
      image: alpine:3.6
      command: [sh, -c]
      args: ["echo \"it was heads\""]
Workflow version:#
import ray
from ray import workflow


@ray.remote
def handle_heads() -> str:
    return "It was heads"


@ray.remote
def handle_tails() -> str:
    print("It was tails, retrying")
    return workflow.continuation(flip_coin.bind())


@ray.remote
def flip_coin() -> str:
    import random

    @ray.remote
    def decide(heads: bool) -> str:
        if heads:
            return workflow.continuation(handle_heads.bind())
        else:
            return workflow.continuation(handle_tails.bind())

    return workflow.continuation(decide.bind(random.random() > 0.5))


if __name__ == "__main__":
    print(workflow.run(flip_coin.bind()))

Retries#

Argo version:#
# https://github.com/argoproj/argo-workflows/tree/master/examples#retrying-failed-or-errored-steps
apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
  generateName: retry-backoff-
spec:
  entrypoint: retry-backoff
  templates:
  - name: retry-backoff
    retryStrategy:
      limit: 10
      retryPolicy: "Always"
      backoff:
        duration: "1"      # Must be a string. Default unit is seconds. Could also be a Duration, e.g.: "2m", "6h", "1d"
        factor: 2
        maxDuration: "1m"  # Must be a string. Default unit is seconds. Could also be a Duration, e.g.: "2m", "6h", "1d"
      affinity:
        nodeAntiAffinity: {}
    container:
      image: python:alpine3.6
      command: ["python", -c]
      # fail with a 66% probability
      args: ["import random; import sys; exit_code = random.choice([0, 1, 1]); sys.exit(exit_code)"]
Workflow version:#
from typing import Any, Tuple, Optional

import ray
from ray import workflow


@ray.remote
def flaky_step() -> str:
    import random

    if random.choice([0, 1, 1]) != 0:
        raise ValueError("oops")

    return "ok"


@ray.remote
def custom_retry_strategy(func: Any, num_retries: int, delay_s: int) -> str:
    import time

    @ray.remote
    def handle_result(res: Tuple[Optional[str], Optional[Exception]]) -> str:
        result, error = res
        if result:
            return res
        elif num_retries <= 0:
            raise error
        else:
            print("Retrying exception after delay", error)
            time.sleep(delay_s)
            return workflow.continuation(
                custom_retry_strategy.bind(func, num_retries - 1, delay_s)
            )

    res = func.options(**workflow.options(catch_exceptions=True)).bind()
    return workflow.continuation(handle_result.bind(res))


if __name__ == "__main__":
    # Default retry strategy.
    print(
        workflow.run(flaky_step.options(max_retries=10, retry_exceptions=True).bind())
    )
    # Custom strategy.
    print(workflow.run(custom_retry_strategy.bind(flaky_step, 10, 1)))

Metaflow API Comparison#

The original source of these comparisons can be found here.

Foreach#

Metaflow version:#
# https://docs.metaflow.org/metaflow/basics#foreach
from metaflow import FlowSpec, step


class ForeachFlow(FlowSpec):
    @step
    def start(self):
        self.titles = ["Stranger Things", "House of Cards", "Narcos"]
        self.next(self.a, foreach="titles")

    @step
    def a(self):
        self.title = "%s processed" % self.input
        self.next(self.join)

    @step
    def join(self, inputs):
        self.results = [input.title for input in inputs]
        self.next(self.end)

    @step
    def end(self):
        print("\n".join(self.results))


if __name__ == "__main__":
    ForeachFlow()
Workflow version:#
from typing import List

import ray
from ray import workflow


@ray.remote
def start():
    titles = ["Stranger Things", "House of Cards", "Narcos"]
    children = [a.bind(t) for t in titles]
    return workflow.continuation(end.bind(children))


@ray.remote
def a(title: str) -> str:
    return f"{title} processed"


@ray.remote
def end(results: "List[ray.ObjectRef[str]]") -> str:
    return "\n".join(ray.get(results))


if __name__ == "__main__":
    workflow.run(start.bind())

Cadence API Comparison#

The original source of these comparisons can be found here.

Sub Workflows#

Cadence version:#
// https://github.com/uber/cadence-java-samples/blob/master/src/main/java/com/uber/cadence/samples/hello/HelloChild.java
public static class GreetingWorkflowImpl implements GreetingWorkflow {

  @Override
  public String getGreeting(String name) {
    // Workflows are stateful. So a new stub must be created for each new child.
    GreetingChild child = Workflow.newChildWorkflowStub(GreetingChild.class);

    // This is a non blocking call that returns immediately.
    // Use child.composeGreeting("Hello", name) to call synchronously.
    Promise<String> greeting = Async.function(child::composeGreeting, "Hello", name);
    // Do something else here.
    return greeting.get(); // blocks waiting for the child to complete.
  }

  // This example shows how parent workflow return right after starting a child workflow,
  // and let the child run itself.
  private String demoAsyncChildRun(String name) {
    GreetingChild child = Workflow.newChildWorkflowStub(GreetingChild.class);
    // non blocking call that initiated child workflow
    Async.function(child::composeGreeting, "Hello", name);
    // instead of using greeting.get() to block till child complete,
    // sometimes we just want to return parent immediately and keep child running
    Promise<WorkflowExecution> childPromise = Workflow.getWorkflowExecution(child);
    childPromise.get(); // block until child started,
    // otherwise child may not start because parent complete first.
    return "let child run, parent just return";
  }

  public static void main(String[] args) {
    // Start a worker that hosts both parent and child workflow implementations.
    Worker.Factory factory = new Worker.Factory(DOMAIN);
    Worker worker = factory.newWorker(TASK_LIST);
    worker.registerWorkflowImplementationTypes(GreetingWorkflowImpl.class, GreetingChildImpl.class);
    // Start listening to the workflow task list.
    factory.start();

    // Start a workflow execution. Usually this is done from another program.
    WorkflowClient workflowClient = WorkflowClient.newInstance(DOMAIN);
    // Get a workflow stub using the same task list the worker uses.
    GreetingWorkflow workflow = workflowClient.newWorkflowStub(GreetingWorkflow.class);
    // Execute a workflow waiting for it to complete.
    String greeting = workflow.getGreeting("World");
    System.out.println(greeting);
    System.exit(0);
  }
}
Workflow version:#
import ray
from ray import workflow


@ray.remote
def compose_greeting(greeting: str, name: str) -> str:
    return greeting + ": " + name


@ray.remote
def main_workflow(name: str) -> str:
    return workflow.continuation(compose_greeting.bind("Hello", name))


if __name__ == "__main__":
    print(workflow.run(main_workflow.bind("Alice")))

File Processing#

Cadence version:#
// https://github.com/uber/cadence-java-samples/tree/master/src/main/java/com/uber/cadence/samples/fileprocessing
public class FileProcessingWorkflowImpl implements FileProcessingWorkflow {

  // Uses the default task list shared by the pool of workers.
  private final StoreActivities defaultTaskListStore;

  public FileProcessingWorkflowImpl() {
    // Create activity clients.
    ActivityOptions ao =
        new ActivityOptions.Builder()
            .setScheduleToCloseTimeout(Duration.ofSeconds(10))
            .setTaskList(FileProcessingWorker.TASK_LIST)
            .build();
    this.defaultTaskListStore = Workflow.newActivityStub(StoreActivities.class, ao);
  }

  @Override
  public void processFile(URL source, URL destination) {
    RetryOptions retryOptions =
        new RetryOptions.Builder()
            .setExpiration(Duration.ofSeconds(10))
            .setInitialInterval(Duration.ofSeconds(1))
            .build();
    // Retries the whole sequence on any failure, potentially on a different host.
    Workflow.retry(retryOptions, () -> processFileImpl(source, destination));
  }

  private void processFileImpl(URL source, URL destination) {
    StoreActivities.TaskListFileNamePair downloaded = defaultTaskListStore.download(source);

    // Now initialize stubs that are specific to the returned task list.
    ActivityOptions hostActivityOptions =
        new ActivityOptions.Builder()
            .setTaskList(downloaded.getHostTaskList())
            .setScheduleToCloseTimeout(Duration.ofSeconds(10))
            .build();
    StoreActivities hostSpecificStore =
        Workflow.newActivityStub(StoreActivities.class, hostActivityOptions);

    // Call processFile activity to zip the file.
    // Call the activity to process the file using worker-specific task list.
    String processed = hostSpecificStore.process(downloaded.getFileName());
    // Call upload activity to upload the zipped file.
    hostSpecificStore.upload(processed, destination);
  }
}
Workflow version:#
from typing import List

import ray
from ray import workflow

FILES_TO_PROCESS = ["file-{}".format(i) for i in range(100)]


# Mock method to download a file.
def download(url: str) -> str:
    return "contents" * 10000


# Mock method to process a file.
def process(contents: str) -> str:
    return "processed: " + contents


# Mock method to upload a file.
def upload(contents: str) -> None:
    pass


@ray.remote
def upload_all(file_contents: List[ray.ObjectRef]) -> None:
    @ray.remote
    def upload_one(contents: str) -> None:
        upload(contents)

    children = [upload_one.bind(f) for f in file_contents]

    @ray.remote
    def wait_all(*deps) -> None:
        pass

    return wait_all.bind(*children)


@ray.remote
def process_all(file_contents: List[ray.ObjectRef]) -> None:
    @ray.remote
    def process_one(contents: str) -> str:
        return process(contents)

    children = [process_one.bind(f) for f in file_contents]
    return upload_all.bind(children)


@ray.remote
def download_all(urls: List[str]) -> None:
    @ray.remote
    def download_one(url: str) -> str:
        return download(url)

    children = [download_one.bind(u) for u in urls]
    return process_all.bind(children)


if __name__ == "__main__":
    res = download_all.bind(FILES_TO_PROCESS)
    workflow.run(res)

Trip Booking#

Cadence version:#
// https://github.com/uber/cadence-java-samples/tree/master/src/main/java/com/uber/cadence/samples/bookingsaga
public class TripBookingWorkflowImpl implements TripBookingWorkflow {

  private final ActivityOptions options =
      new ActivityOptions.Builder().setScheduleToCloseTimeout(Duration.ofHours(1)).build();
  private final TripBookingActivities activities =
      Workflow.newActivityStub(TripBookingActivities.class, options);

  @Override
  public void bookTrip(String name) {
    Saga.Options sagaOptions = new Saga.Options.Builder().setParallelCompensation(true).build();
    Saga saga = new Saga(sagaOptions);
    try {
      String carReservationID = activities.reserveCar(name);
      saga.addCompensation(activities::cancelCar, carReservationID, name);

      String hotelReservationID = activities.bookHotel(name);
      saga.addCompensation(activities::cancelHotel, hotelReservationID, name);

      String flightReservationID = activities.bookFlight(name);
      saga.addCompensation(activities::cancelFlight, flightReservationID, name);
    } catch (ActivityException e) {
      saga.compensate();
      throw e;
    }
  }
}
Workflow version:#
from typing import List, Tuple, Optional

import ray
from ray import workflow


# Mock method to make requests to an external service.
def make_request(*args) -> None:
    return "-".join(args)


# Generate an idempotency token (this is an extension to the cadence example).
@ray.remote
def generate_request_id():
    import uuid

    return uuid.uuid4().hex


@ray.remote
def book_car(request_id: str) -> str:
    car_reservation_id = make_request("book_car", request_id)
    return car_reservation_id


@ray.remote
def book_hotel(request_id: str, *deps) -> str:
    hotel_reservation_id = make_request("book_hotel", request_id)
    return hotel_reservation_id


@ray.remote
def book_flight(request_id: str, *deps) -> str:
    flight_reservation_id = make_request("book_flight", request_id)
    return flight_reservation_id


@ray.remote
def book_all(car_req_id: str, hotel_req_id: str, flight_req_id: str) -> str:
    car_res_id = book_car.bind(car_req_id)
    hotel_res_id = book_hotel.bind(hotel_req_id, car_res_id)
    flight_res_id = book_flight.bind(hotel_req_id, hotel_res_id)

    @ray.remote
    def concat(*ids: List[str]) -> str:
        return ", ".join(ids)

    return workflow.continuation(concat.bind(car_res_id, hotel_res_id, flight_res_id))


@ray.remote
def handle_errors(
    car_req_id: str,
    hotel_req_id: str,
    flight_req_id: str,
    final_result: Tuple[Optional[str], Optional[Exception]],
) -> str:
    result, error = final_result

    @ray.remote
    def wait_all(*deps) -> None:
        pass

    @ray.remote
    def cancel(request_id: str) -> None:
        make_request("cancel", request_id)

    if error:
        return workflow.continuation(
            wait_all.bind(
                cancel.bind(car_req_id),
                cancel.bind(hotel_req_id),
                cancel.bind(flight_req_id),
            )
        )
    else:
        return result


if __name__ == "__main__":
    car_req_id = generate_request_id.bind()
    hotel_req_id = generate_request_id.bind()
    flight_req_id = generate_request_id.bind()
    # TODO(ekl) we could create a Saga helper function that automates this
    # pattern of compensation workflows.
    saga_result = book_all.options(**workflow.options(catch_exceptions=True)).bind(
        car_req_id, hotel_req_id, flight_req_id
    )
    final_result = handle_errors.bind(
        car_req_id, hotel_req_id, flight_req_id, saga_result
    )
    print(workflow.run(final_result))

Google Cloud Workflows API Comparison#

The original source of these comparisons can be found here.

Data Conditional#

Google Cloud version:#
# https://github.com/GoogleCloudPlatform/workflows-samples/blob/main/src/step_conditional_jump.workflows.json
[
  {
    "firstStep": {
      "call": "http.get",
      "args": {
        "url": "https://www.example.com/callA"
      },
      "result": "firstResult"
    }
  },
  {
    "whereToJump": {
      "switch": [
        {
          "condition": "${firstResult.body.SomeField < 10}",
          "next": "small"
        },
        {
          "condition": "${firstResult.body.SomeField < 100}",
          "next": "medium"
        }
      ],
      "next": "large"
    }
  },
  {
    "small": {
      "call": "http.get",
      "args": {
        "url": "https://www.example.com/SmallFunc"
      },
      "next": "end"
    }
  },
  {
    "medium": {
      "call": "http.get",
      "args": {
        "url": "https://www.example.com/MediumFunc"
      },
      "next": "end"
    }
  },
  {
    "large": {
      "call": "http.get",
      "args": {
        "url": "https://www.example.com/LargeFunc"
      },
      "next": "end"
    }
  }
]
Workflow version:#
import ray
from ray import workflow


# Mock method to make a request.
def make_request(url: str) -> str:
    return "42"


@ray.remote
def get_size() -> int:
    return int(make_request("https://www.example.com/callA"))


@ray.remote
def small(result: int) -> str:
    return make_request("https://www.example.com/SmallFunc")


@ray.remote
def medium(result: int) -> str:
    return make_request("https://www.example.com/MediumFunc")


@ray.remote
def large(result: int) -> str:
    return make_request("https://www.example.com/LargeFunc")


@ray.remote
def decide(result: int) -> str:
    if result < 10:
        return workflow.continuation(small.bind(result))
    elif result < 100:
        return workflow.continuation(medium.bind(result))
    else:
        return workflow.continuation(large.bind(result))


if __name__ == "__main__":
    print(workflow.run(decide.bind(get_size.bind())))

Concat Array#

Google Cloud version:#
# https://github.com/GoogleCloudPlatform/workflows-samples/blob/main/src/array.workflows.json
[
  {
    "define": {
      "assign": [
        {
          "array": [
            "foo",
            "ba",
            "r"
          ]
        },
        {
          "result": ""
        },
        {
          "i": 0
        }
      ]
    }
  },
  {
    "check_condition": {
      "switch": [
        {
          "condition": "${len(array) > i}",
          "next": "iterate"
        }
      ],
      "next": "exit_loop"
    }
  },
  {
    "iterate": {
      "assign": [
        {
          "result": "${result + array[i]}"
        },
        {
          "i": "${i+1}"
        }
      ],
      "next": "check_condition"
    }
  },
  {
    "exit_loop": {
      "return": {
        "concat_result": "${result}"
      }
    }
  }
]
Workflow version:#
from typing import List

import ray
from ray import workflow


@ray.remote
def iterate(array: List[str], result: str, i: int) -> str:
    if i >= len(array):
        return result
    return workflow.continuation(iterate.bind(array, result + array[i], i + 1))


if __name__ == "__main__":
    print(workflow.run(iterate.bind(["foo", "ba", "r"], "", 0)))

Sub Workflows#

Google Cloud version:#
# https://github.com/GoogleCloudPlatform/workflows-samples/blob/main/src/subworkflow.workflows.json
{
  "main": {
    "steps": [
      {
        "first": {
          "call": "hello",
          "args": {
            "input": "Kristof"
          },
          "result": "someOutput"
        }
      },
      {
        "second": {
          "return": "${someOutput}"
        }
      }
    ]
  },
  "hello": {
    "params": [
      "input"
    ],
    "steps": [
      {
        "first": {
          "return": "${\"Hello \"+input}"
        }
      }
    ]
  }
}
Workflow version:#
import ray
from ray import workflow


@ray.remote
def hello(name: str) -> str:
    return workflow.continuation(format_name.bind(name))


@ray.remote
def format_name(name: str) -> str:
    return f"hello, {name}"


@ray.remote
def report(msg: str) -> None:
    print(msg)


if __name__ == "__main__":
    r1 = hello.bind("Kristof")
    r2 = report.bind(r1)
    workflow.run(r2)

Prefect API Comparison#

The original source of these comparisons can be found here.

Looping#

Prefect version:#
# https://docs.prefect.io/core/advanced_tutorials/task-looping.html

import requests
from datetime import timedelta

import prefect
from prefect import task
from prefect import Flow, Parameter
from prefect.engine.signals import LOOP


@task(max_retries=5, retry_delay=timedelta(seconds=2))
def compute_large_fibonacci(M):
    # we extract the accumulated task loop result from context
    loop_payload = prefect.context.get("task_loop_result", {})

    n = loop_payload.get("n", 1)
    fib = loop_payload.get("fib", 1)

    next_fib = requests.post(
        "https://nemo.api.stdlib.com/[email protected]/", data={"nth": n}
    ).json()

    if next_fib > M:
        return fib  # return statements end the loop

    raise LOOP(message=f"Fib {n}={next_fib}", result=dict(n=n + 1, fib=next_fib))


if __name__ == "__main__":
    with Flow("fibonacci") as flow:
        M = Parameter("M")
        fib_num = compute_large_fibonacci(M)

    flow_state = flow.run(M=100)
    print(flow_state.result[fib_num].result) # 89
Workflow version:#
import tempfile

import ray
from ray import workflow
from ray.actor import ActorHandle


@ray.remote
class FibonacciActor:
    def __init__(self):
        self.cache = {}

    def compute(self, n):
        if n not in self.cache:
            assert n > 0
            a, b = 0, 1
            for _ in range(n - 1):
                a, b = b, a + b
            self.cache[n] = b
        return self.cache[n]


@ray.remote
def compute_large_fib(fibonacci_actor: ActorHandle, M: int, n: int = 1, fib: int = 1):
    next_fib = ray.get(fibonacci_actor.compute.remote(n))
    if next_fib > M:
        return fib
    else:
        return workflow.continuation(
            compute_large_fib.bind(fibonacci_actor, M, n + 1, next_fib)
        )


if __name__ == "__main__":
    ray.init(storage=f"file://{tempfile.TemporaryDirectory().name}")
    assert workflow.run(compute_large_fib.bind(FibonacciActor.remote(), 100)) == 89

AirFlow API Comparison#

The original source of these comparisons can be found here.

ETL Workflow#

AirFlow version:#
# https://airflow.apache.org/docs/apache-airflow/stable/_modules/airflow/example_dags/tutorial_taskflow_api_etl.html

import json

from airflow.decorators import dag, task
from airflow.utils.dates import days_ago

# These args will get passed on to each operator
# You can override them on a per-task basis during operator initialization
default_args = {
    'owner': 'airflow',
}


@dag(default_args=default_args, schedule_interval=None, start_date=days_ago(2), tags=['example'])
def tutorial_taskflow_api_etl():
    """
    ### TaskFlow API Tutorial Documentation
    This is a simple ETL data pipeline example which demonstrates the use of
    the TaskFlow API using three simple tasks for Extract, Transform, and Load.
    Documentation that goes along with the Airflow TaskFlow API tutorial is
    located
    [here](https://airflow.apache.org/docs/apache-airflow/stable/tutorial_taskflow_api.html)
    """

    @task()
    def extract():
        """
        #### Extract task
        A simple Extract task to get data ready for the rest of the data
        pipeline. In this case, getting data is simulated by reading from a
        hardcoded JSON string.
        """
        data_string = '{"1001": 301.27, "1002": 433.21, "1003": 502.22}'

        order_data_dict = json.loads(data_string)
        return order_data_dict

    @task(multiple_outputs=True)
    def transform(order_data_dict: dict):
        """
        #### Transform task
        A simple Transform task which takes in the collection of order data and
        computes the total order value.
        """
        total_order_value = 0

        for value in order_data_dict.values():
            total_order_value += value

        return {"total_order_value": total_order_value}

    @task()
    def load(total_order_value: float):
        """
        #### Load task
        A simple Load task which takes in the result of the Transform task and
        instead of saving it to end user review, just prints it out.
        """

        print(f"Total order value is: {total_order_value:.2f}")

    order_data = extract()
    order_summary = transform(order_data)
    load(order_summary["total_order_value"])


tutorial_etl_dag = tutorial_taskflow_api_etl()
Workflow version:#
import json

import ray
from ray import workflow


@ray.remote
def extract() -> dict:
    data_string = '{"1001": 301.27, "1002": 433.21, "1003": 502.22}'
    order_data_dict = json.loads(data_string)
    return order_data_dict


@ray.remote
def transform(order_data_dict: dict) -> dict:
    total_order_value = 0
    for value in order_data_dict.values():
        total_order_value += value
    return {"total_order_value": ray.put(total_order_value)}


@ray.remote
def load(data_dict: dict) -> str:
    total_order_value = ray.get(data_dict["total_order_value"])
    return f"Total order value is: {total_order_value:.2f}"


if __name__ == "__main__":
    order_data = extract.bind()
    order_summary = transform.bind(order_data)
    etl = load.bind(order_summary)
    print(workflow.run(etl))