Source code for ray.tune.integration.ray_train
from typing import Any, Dict, List, Optional
import ray.tune
from ray.train import Checkpoint as RayTrainCheckpoint
from ray.train.v2._internal.execution.context import TrainRunContext
from ray.train.v2.api.callback import UserCallback
from ray.util.annotations import DeveloperAPI
CHECKPOINT_PATH_KEY = "checkpoint_path"
[docs]
@DeveloperAPI
class TuneReportCallback(UserCallback):
"""Propagate metrics and checkpoint paths from Ray Train workers to Ray Tune."""
def after_report(
self,
run_context: TrainRunContext,
metrics: List[Dict[str, Any]],
checkpoint: Optional[RayTrainCheckpoint],
):
# TODO: This can be changed to aggregate the metrics from all workers.
# For now, just achieve feature parity with the old Tune+Train integration.
metrics = metrics[0].copy()
# If a checkpoint is provided, add the checkpoint path to the metrics.
# Don't report the checkpoint again since it's already been uploaded
# to storage.
if checkpoint:
metrics[CHECKPOINT_PATH_KEY] = checkpoint.path
ray.tune.report(metrics=metrics)