Monitoring and Logging Metrics#
Ray Train provides an API for attaching metrics to checkpoints from the training function by calling ray.train.report(metrics, checkpoint).
The results will be collected from the distributed workers and passed to the Ray Train driver process for book-keeping.
The primary use case is reporting metrics (accuracy, loss, etc.) at the end of each training epoch. See Saving checkpoints during training for usage examples.
Only the result reported by the rank 0 worker will be attached to the checkpoint.
However, to ensure consistency, train.report() acts as a barrier and must be called on each worker.
To aggregate results from multiple workers, see How to obtain and aggregate results from different workers?.
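The barrier behavior described above can be pictured with a small thread-based simulation. This is only a toy model: `toy_report`, `attached`, and `NUM_WORKERS` are hypothetical names made up for illustration, and the code does not use Ray or reflect how Ray Train is implemented. It shows the two rules from the text: every worker must make the call, and only the rank 0 metrics are kept.

```python
import threading

NUM_WORKERS = 4
barrier = threading.Barrier(NUM_WORKERS)
attached = {}  # what the simulated "driver" keeps next to the checkpoint


def toy_report(rank, metrics):
    """Hypothetical stand-in for train.report(); not Ray's implementation."""
    if rank == 0:
        attached.update(metrics)  # only the rank 0 metrics are kept
    # Every worker must arrive here before any of them continues.
    barrier.wait(timeout=5)


def worker(rank):
    for epoch in range(2):
        # Each worker computes its own (made-up) loss value.
        toy_report(rank, {"epoch": epoch, "loss": 1.0 / (epoch + 1) + rank})


threads = [threading.Thread(target=worker, args=(r,)) for r in range(NUM_WORKERS)]
for t in threads:
    t.start()
for t in threads:
    t.join()

print(attached)
```

If one of the simulated workers skipped the `toy_report` call, the others would block at the barrier until the timeout expired, which is the reason the real call has to happen on every worker.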
How to obtain and aggregate results from different workers?#
In real applications, you may want to calculate optimization metrics besides accuracy and loss: recall, precision, Fbeta, etc. You may also want to collect metrics from multiple workers. While Ray Train currently only reports metrics from the rank 0 worker, you can use third-party libraries or distributed primitives of your machine learning framework to report metrics from multiple workers.
Ray Train natively supports TorchMetrics, which provides a collection of machine learning metrics for distributed, scalable PyTorch models.
Here is an example of reporting the aggregated mean absolute percentage error (MAPE) and the mean validation loss from all workers, alongside the validation loss computed on the rank 0 worker alone.
# First, pip install torchmetrics
# This code is tested with torchmetrics==0.7.3 and torch==1.12.1
import os
import tempfile
import ray.train.torch
from ray import train
from ray.train import ScalingConfig
from ray.train.torch import TorchTrainer
import torch
import torch.nn as nn
import torchmetrics
from torch.optim import Adam
import numpy as np
def train_func(config):
    n = 100
    # create a toy dataset
    X = torch.Tensor(np.random.normal(0, 1, size=(n, 4)))
    X_valid = torch.Tensor(np.random.normal(0, 1, size=(n, 4)))
    Y = torch.Tensor(np.random.uniform(0, 1, size=(n, 1)))
    Y_valid = torch.Tensor(np.random.uniform(0, 1, size=(n, 1)))
    # toy neural network : 1-layer
    # wrap the model in DDP
    model = ray.train.torch.prepare_model(nn.Linear(4, 1))
    criterion = nn.MSELoss()

    mape = torchmetrics.MeanAbsolutePercentageError()
    # for averaging loss
    mean_valid_loss = torchmetrics.MeanMetric()

    optimizer = Adam(model.parameters(), lr=3e-4)
    for epoch in range(config["num_epochs"]):
        model.train()
        y = model.forward(X)

        # compute loss
        loss = criterion(y, Y)

        # back-propagate loss
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        # evaluate
        model.eval()
        with torch.no_grad():
            pred = model(X_valid)
            valid_loss = criterion(pred, Y_valid)
            # save loss in aggregator
            mean_valid_loss(valid_loss)
            mape(pred, Y_valid)

        # collect all metrics
        # use .item() to obtain a value that can be reported
        valid_loss = valid_loss.item()
        mape_collected = mape.compute().item()
        mean_valid_loss_collected = mean_valid_loss.compute().item()

        with tempfile.TemporaryDirectory() as temp_checkpoint_dir:
            torch.save(
                model.state_dict(), os.path.join(temp_checkpoint_dir, "model.pt")
            )

            train.report(
                {
                    "mape_collected": mape_collected,
                    "valid_loss": valid_loss,
                    "mean_valid_loss_collected": mean_valid_loss_collected,
                },
                checkpoint=train.Checkpoint.from_directory(temp_checkpoint_dir),
            )

        # reset for next epoch
        mape.reset()
        mean_valid_loss.reset()

trainer = TorchTrainer(
train_func,
train_loop_config={"num_epochs": 5},
scaling_config=ScalingConfig(num_workers=2),
)
result = trainer.fit()
print(result.metrics["valid_loss"], result.metrics["mean_valid_loss_collected"])
# 0.5109779238700867 0.5512474775314331
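The difference between the two printed values comes from cross-worker aggregation: valid_loss is the rank 0 value only, while mean_valid_loss_collected combines all workers. As a framework-free sketch of the general pattern (sum the per-worker running totals, then divide), assuming each worker tracks a loss sum and a sample count, the following uses made-up numbers and a hypothetical `all_reduce_sum` helper in place of a real distributed primitive. It is not TorchMetrics code.

```python
# Per-worker running totals. The numbers are invented for illustration.
worker_states = [
    {"loss_sum": 50.0, "count": 100},  # worker 0: mean loss 0.50 over 100 samples
    {"loss_sum": 20.0, "count": 25},   # worker 1: mean loss 0.80 over 25 samples
]


def all_reduce_sum(states, key):
    """Toy stand-in for a distributed sum-reduction across workers."""
    return sum(state[key] for state in states)


# Reduce the raw states first, then compute the metric once.
total_loss = all_reduce_sum(worker_states, "loss_sum")
total_count = all_reduce_sum(worker_states, "count")
global_mean = total_loss / total_count

# Averaging the per-worker means ignores how many samples each worker saw.
naive_mean = sum(s["loss_sum"] / s["count"] for s in worker_states) / len(worker_states)

print(global_mean, naive_mean)
```

The two results differ whenever workers process different numbers of samples, which is why reducing sums and counts is usually safer than averaging per-worker averages. In the example above every worker contributes one loss value per epoch, so the distinction does not arise there.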
(Deprecated) Reporting free-floating metrics#
Reporting metrics with ray.train.report(metrics, checkpoint=None) from every worker writes the metrics to the Ray Tune log files (progress.csv, result.json) and makes them accessible via Result.metrics_dataframe on the Result returned by trainer.fit().
As of Ray 2.43, this behavior is deprecated and will not be supported in Ray Train V2, which is an overhaul of Ray Train’s implementation and select APIs.
Ray Train V2 keeps only the slim set of experiment tracking features that are necessary for fault tolerance, so it does not support reporting free-floating metrics that are not attached to checkpoints. For metric tracking, the recommendation is to report metrics directly from the workers to experiment tracking tools such as MLflow and WandB. See Experiment Tracking for examples.
In Ray Train V2, reporting metrics without a checkpoint is a no-op, regardless of which worker reports them. However, the results reported by all workers are still accessible, so you can implement custom metric-handling logic, for example in a callback.
import os
assert os.environ["RAY_TRAIN_V2_ENABLED"] == "1"
from typing import Any, Dict, List, Optional
import ray.train
import ray.train.torch
def train_fn_per_worker(config):
    # Free-floating metrics can be accessed from the callback below.
    ray.train.report({"rank": ray.train.get_context().get_world_rank()})


class CustomMetricsCallback(ray.train.UserCallback):
    def after_report(
        self,
        run_context,
        metrics: List[Dict[str, Any]],
        checkpoint: Optional[ray.train.Checkpoint],
    ):
        rank_0_metrics = metrics[0]
        print(rank_0_metrics)
        # Ex: Write metrics to a file...


trainer = ray.train.torch.TorchTrainer(
train_fn_per_worker,
scaling_config=ray.train.ScalingConfig(num_workers=2),
run_config=ray.train.RunConfig(callbacks=[CustomMetricsCallback()]),
)
trainer.fit()
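The metrics argument in the callback above is a list with one dict per worker, so custom handling often starts by combining those dicts. The following is a minimal sketch of one such combination, averaging every numeric key across workers. The helper name `average_numeric_metrics` is hypothetical and not part of Ray; it uses only the standard library and could be called from inside after_report.

```python
from statistics import mean
from typing import Any, Dict, List


def average_numeric_metrics(metrics: List[Dict[str, Any]]) -> Dict[str, float]:
    """Average each numeric key across per-worker metric dicts.

    Hypothetical helper for illustration; non-numeric values and
    booleans are skipped, and keys missing on some workers are
    averaged over the workers that reported them.
    """
    keys = set()
    for worker_metrics in metrics:
        keys.update(worker_metrics.keys())

    averaged = {}
    for key in sorted(keys):
        values = [
            m[key]
            for m in metrics
            if key in m
            and isinstance(m[key], (int, float))
            and not isinstance(m[key], bool)
        ]
        if values:
            averaged[key] = mean(values)
    return averaged


# Example input shaped like the list passed to after_report.
per_worker = [
    {"rank": 0, "loss": 0.5, "tag": "a"},
    {"rank": 1, "loss": 0.7, "tag": "b"},
]
averaged = average_numeric_metrics(per_worker)
print(averaged)
```

Whether a plain average is the right reduction depends on the metric; counts are usually summed instead, and sample-weighted means need the per-worker sample counts as well.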
To use Ray Tune Callbacks that depend on free-floating metrics reported by workers, run Ray Train as a single Ray Tune trial.
See the following resources for more information:
Train V2 REP: Technical details about the API changes in Train V2
Train V2 Migration Guide: Full migration guide for Train V2