Workflow Metadata

Observability is important for workflows: sometimes you want not only the output but also insight into the internal state (e.g., to measure performance or find bottlenecks). Workflow metadata provides several stats that help you understand a workflow, from basic running status and task options to performance and user-defined metadata.

Retrieving metadata

Workflow metadata can be retrieved with workflow.get_metadata(workflow_id). For example:

import ray
from ray import workflow

@ray.remote
def add(left: int, right: int) -> int:
    return left + right

workflow.run(add.bind(10, 20), workflow_id="add_example")

workflow_metadata = workflow.get_metadata("add_example")

assert workflow_metadata["status"] == "SUCCESSFUL"
assert "start_time" in workflow_metadata["stats"]
assert "end_time" in workflow_metadata["stats"]

You can also retrieve metadata for an individual workflow task by providing its task ID:

workflow.run(
    add.options(
        **workflow.options(task_id="add_task")
    ).bind(10, 20), workflow_id="add_example_2")

task_metadata = workflow.get_metadata("add_example_2", task_id="add_task")

# Check the task-level metadata that was just retrieved.
assert "start_time" in task_metadata["stats"]
assert "end_time" in task_metadata["stats"]

User-defined metadata

You can attach custom metadata to a workflow or to a workflow task, which is useful for recording extra information about it.

  • workflow-level metadata can be added via .run(metadata=metadata)

  • task-level metadata can be added via .options(**workflow.options(metadata=metadata)) or in the decorator @workflow.options(metadata=metadata)

workflow.run(
    add.options(
        **workflow.options(task_id="add_task", metadata={"task_k": "task_v"})
    ).bind(10, 20),
    workflow_id="add_example_3",
    metadata={"workflow_k": "workflow_v"},
)

assert workflow.get_metadata("add_example_3")["user_metadata"] == {"workflow_k": "workflow_v"}
assert workflow.get_metadata("add_example_3", task_id="add_task")["user_metadata"] == {"task_k": "task_v"}

Note: user-defined metadata must be a Python dictionary whose values are JSON serializable.
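The JSON requirement in the note above can be checked before a workflow is submitted. The following is a minimal sketch using only the standard library; `check_user_metadata` is a hypothetical helper name (not part of the Ray API), and it is independent of whatever validation Ray itself performs.

```python
import json
from typing import Any, Dict


def check_user_metadata(metadata: Any) -> Dict[str, Any]:
    """Return `metadata` unchanged if it is a dict that json.dumps accepts.

    Hypothetical pre-check helper, not a Ray function.
    """
    if not isinstance(metadata, dict):
        raise TypeError(
            f"metadata must be a dict, got {type(metadata).__name__}"
        )
    try:
        # json.dumps raises TypeError for unsupported values (e.g. sets)
        # and ValueError for circular references.
        json.dumps(metadata)
    except (TypeError, ValueError) as exc:
        raise ValueError(f"metadata is not JSON serializable: {exc}") from exc
    return metadata
```

The checked dictionary could then be passed as the `metadata` argument shown earlier, for example `workflow.run(..., metadata=check_user_metadata({"workflow_k": "workflow_v"}))`.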

Available Metrics

Workflow level

  • status: the workflow state, one of RUNNING, FAILED, RESUMABLE, CANCELED, or SUCCESSFUL.

  • user_metadata: a Python dictionary of custom metadata provided by the user via workflow.run().

  • stats: workflow running stats, including workflow start time and end time.

Task level

  • name: name of the task, either provided by the user via task.options(**workflow.options(task_id=xxx)) or generated by the system.

  • task_options: options of the task, either provided by the user via task.options() or set to system defaults.

  • user_metadata: a Python dictionary of custom metadata provided by the user via task.options().

  • stats: task running stats, including task start time and end time.

Notes

1. Unlike get_output(), get_metadata() returns immediately with the metadata available at the time of the call. As a result, some fields may be missing if the corresponding metadata is not yet available (e.g., metadata["stats"]["end_time"] won't be available until the workflow is completed).

import time

@ray.remote
def simple():
    time.sleep(1000)
    return 0

workflow.run_async(simple.bind(), workflow_id="workflow_id")

# Give the workflow task time to start running.
time.sleep(2)

workflow_metadata = workflow.get_metadata("workflow_id")
assert workflow_metadata["status"] == "RUNNING"
assert "start_time" in workflow_metadata["stats"]
assert "end_time" not in workflow_metadata["stats"]

workflow.cancel("workflow_id")

workflow_metadata = workflow.get_metadata("workflow_id")
assert workflow_metadata["status"] == "CANCELED"
assert "start_time" in workflow_metadata["stats"]
assert "end_time" not in workflow_metadata["stats"]

2. For resumed workflows, the current behavior is that "stats" is updated each time the workflow is resumed.

from pathlib import Path

workflow_id = "simple"

error_flag = Path("error")
error_flag.touch()

@ray.remote
def simple():
    if error_flag.exists():
        raise ValueError()
    return 0

try:
    workflow.run(simple.bind(), workflow_id=workflow_id)
except ray.exceptions.RayTaskError:
    pass

workflow_metadata_failed = workflow.get_metadata(workflow_id)
assert workflow_metadata_failed["status"] == "FAILED"

# Remove the flag so that the task succeeds on resume.
error_flag.unlink()
ref = workflow.resume_async(workflow_id)
assert ray.get(ref) == 0

workflow_metadata_resumed = workflow.get_metadata(workflow_id)
assert workflow_metadata_resumed["status"] == "SUCCESSFUL"

# Verify that resuming updated the running stats.
assert workflow_metadata_resumed["stats"]["start_time"] > workflow_metadata_failed["stats"]["start_time"]
assert workflow_metadata_resumed["stats"]["end_time"] > workflow_metadata_failed["stats"]["end_time"]
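
Because fields such as "end_time" can be absent when get_metadata() is called (note 1), code that reads the stats may want to tolerate missing keys. The following is a minimal sketch of that idea. `elapsed_seconds` is a hypothetical helper, not part of the Ray API, and it assumes that "start_time" and "end_time" are numeric timestamps in seconds, which the comparisons above suggest but this page does not state.

```python
from typing import Any, Mapping, Optional


def elapsed_seconds(metadata: Mapping[str, Any]) -> Optional[float]:
    """Return end_time - start_time, or None if either field is missing.

    Hypothetical helper; assumes both stats are numeric timestamps
    expressed in seconds.
    """
    stats = metadata.get("stats", {})
    start = stats.get("start_time")
    end = stats.get("end_time")
    if start is None or end is None:
        # The workflow or task has not started or has not finished yet.
        return None
    return end - start
```

Under those assumptions, the helper accepts a dictionary returned by workflow.get_metadata(...) at either workflow or task level, and returns None for a workflow that is still running.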