ray.rllib.utils.metrics.metrics_logger.MetricsLogger.aggregate

MetricsLogger.aggregate(stats_dicts: List[Dict[str, Any]], *, key: str | Tuple[str, ...] | None = None) -> None

Merges n stats_dicts and logs the result by merging it on the time axis with existing stats.

The n stats_dicts should be generated by n parallel components such that merging their respective stats in parallel is meaningful. This lets us aggregate stats in a tree structure of MetricsLoggers.

If you want to log a dictionary of values (not Stats objects), use log_dict.

from ray.rllib.utils.metrics.metrics_logger import MetricsLogger
from ray.rllib.utils.test_utils import check

# Example: n Learners logging loss stats to be merged.
# Note that losses should usually be logged with window=1 so they don't
# get smeared over time and instead provide an accurate picture of the
# current situation.
main_logger = MetricsLogger()

# Each Learner has its own logger; the dict returned by `reduce()` is what
# gets handed to the main logger for merging.
logger_learner1 = MetricsLogger()
logger_learner1.log_value("loss", 0.1, window=1)
learner1_results = logger_learner1.reduce()

logger_learner2 = MetricsLogger()
logger_learner2.log_value("loss", 0.2, window=1)
learner2_results = logger_learner2.reduce()

# Merge the stats from both Learners and log them under the top-level
# key "learners".
main_logger.aggregate(
    [learner1_results, learner2_results],
    key="learners",
)
# The merged loss is the mean of the two Learners' losses: (0.1 + 0.2) / 2.
check(main_logger.peek(("learners", "loss")), 0.15)

# Example: m EnvRunners logging episode returns to be merged.
main_logger = MetricsLogger()

# Four values are logged with window=3, so only the last three are kept
# (see the internal values lists in the walkthrough below).
logger_env_runner1 = MetricsLogger()
logger_env_runner1.log_value("mean_ret", 100.0, window=3)
logger_env_runner1.log_value("mean_ret", 200.0, window=3)
logger_env_runner1.log_value("mean_ret", 300.0, window=3)
logger_env_runner1.log_value("mean_ret", 400.0, window=3)
env_runner1_results = logger_env_runner1.reduce()

logger_env_runner2 = MetricsLogger()
logger_env_runner2.log_value("mean_ret", 150.0, window=3)
logger_env_runner2.log_value("mean_ret", 250.0, window=3)
logger_env_runner2.log_value("mean_ret", 350.0, window=3)
logger_env_runner2.log_value("mean_ret", 450.0, window=3)
env_runner2_results = logger_env_runner2.reduce()

# Merge the stats from both EnvRunners and log them under the top-level
# key "env_runners".
main_logger.aggregate(
    [env_runner1_results, env_runner2_results],
    key="env_runners",
)
# The expected procedure is as follows:
# The individual internal values lists of the two loggers are as follows:
# env runner 1: [200, 300, 400]
# env runner 2: [250, 350, 450]
# Move backwards from index=-1 (each time, loop through both env runners)
# index=-1 -> [400, 450] -> mean -> [425] -> repeat 2 times (number
#   of env runners) -> [425, 425]
# index=-2 -> [300, 350] -> mean -> [325] -> repeat 2 times
#   -> append -> [425, 425, 325, 325] -> STOP b/c we have reached >= window.
# reverse the list -> [325, 325, 425, 425]
# deque(maxlen=3) -> keeps only the last 3 items -> [325, 425, 425]
check(
    main_logger.stats["env_runners"]["mean_ret"].values,
    [325, 425, 425],
)
# Peeking reduces the merged window to its mean.
check(main_logger.peek(("env_runners", "mean_ret")), (325 + 425 + 425) / 3)

# Example: Lifetime sum over n parallel components' stats.
# With reduce="sum" and window=None, no values are dropped: every logged
# value contributes to the total.
main_logger = MetricsLogger()

logger1 = MetricsLogger()
logger1.log_value("some_stat", 50, reduce="sum", window=None)
logger1.log_value("some_stat", 25, reduce="sum", window=None)
logger1_results = logger1.reduce()

logger2 = MetricsLogger()
logger2.log_value("some_stat", 75, reduce="sum", window=None)
logger2_results = logger2.reduce()

# Merge the stats from both loggers (no `key` given, so "some_stat" is
# logged at the top level).
main_logger.aggregate([logger1_results, logger2_results])
# Expected total: (50 + 25) from logger 1 plus 75 from logger 2 = 150.
check(main_logger.peek("some_stat"), 150)

# Example: Sum over n parallel components' stats with a window of 3.
main_logger = MetricsLogger()

logger1 = MetricsLogger()
logger1.log_value("some_stat", 50, reduce="sum", window=3)
logger1.log_value("some_stat", 25, reduce="sum", window=3)
logger1.log_value("some_stat", 10, reduce="sum", window=3)
logger1.log_value("some_stat", 5, reduce="sum", window=3)
logger1_results = logger1.reduce()

logger2 = MetricsLogger()
logger2.log_value("some_stat", 75, reduce="sum", window=3)
logger2.log_value("some_stat", 100, reduce="sum", window=3)
logger2_results = logger2.reduce()

# Merge the stats from both loggers.
main_logger.aggregate([logger1_results, logger2_results])
# The expected procedure is as follows:
# The individual internal values lists of the two loggers are as follows:
# logger 1: [50, 25, 10, 5]
# logger 2: [75, 100]
# NOTE(review): with window=3, logger 1 presumably only retains
#   [25, 10, 5]; the walkthrough only reads the last two entries of each
#   list, so the expected result is the same either way -- confirm.
# Move backwards from index=-1 (each time, loop through both loggers)
# index=-1 -> [5, 100] -> average across both loggers
#   -> [(5 + 100) / 2, (5 + 100) / 2] = [52.5, 52.5]
# result = [52.5, 52.5]
# len() = 2 < window = 3
# index=-2 -> [10, 75] -> average across both loggers
#   -> [(10 + 75) / 2, (10 + 75) / 2] = [42.5, 42.5]
# result = [42.5, 42.5, 52.5, 52.5]
# len() = 4 >= window = 3 -> STOP
# The last 3 items (window) get summed: 42.5 + 52.5 + 52.5 = 147.5
check(main_logger.peek("some_stat"), 147.5)  # sum over the window of 3
Parameters:
  • stats_dicts – List of n stats dicts to be merged and then logged.

  • key – Optional top-level key under which to log all keys/key sequences found in the n stats_dicts.