ray.rllib.utils.metrics.metrics_logger.MetricsLogger.merge_and_log_n_dicts

MetricsLogger.merge_and_log_n_dicts(stats_dicts: List[Dict[str, Any]], *, key: str | Tuple[str, ...] | None = None, reduce: str | None = 'mean', window: int | float | None = None, ema_coeff: float | None = None, clear_on_reduce: bool = False) -> None

Merges n dicts, generated by n parallel components, and logs the results.

from ray.rllib.utils.metrics.metrics_logger import MetricsLogger
from ray.rllib.utils.test_utils import check

# Example: n Learners logging loss stats to be merged.
# Losses are normally logged with window=1 so that each reduced result
# reflects only the most recent update instead of a time-smeared average.
main_logger = MetricsLogger()

# First Learner logs its loss and produces an (already reduced) results dict.
learner1_logger = MetricsLogger()
learner1_logger.log_value("loss", 0.1, window=1)
results_from_learner1 = learner1_logger.reduce()

# Second Learner does the same with its own loss value.
learner2_logger = MetricsLogger()
learner2_logger.log_value("loss", 0.2, window=1)
results_from_learner2 = learner2_logger.reduce()

# Merge both Learners' stats under the top-level "learners" key.
main_logger.merge_and_log_n_dicts(
    [results_from_learner1, results_from_learner2],
    key="learners",
)
# Parallel-merged values are averaged: (0.1 + 0.2) / 2 = 0.15.
check(main_logger.peek(("learners", "loss")), 0.15)

# Example: m EnvRunners logging episode returns to be merged.
main_logger = MetricsLogger()

# EnvRunner 1 logs four return values into a size-3 window ...
env_runner1_logger = MetricsLogger()
env_runner1_logger.log_value("mean_ret", 100.0, window=3)
env_runner1_logger.log_value("mean_ret", 200.0)
env_runner1_logger.log_value("mean_ret", 300.0)
env_runner1_logger.log_value("mean_ret", 400.0)
results_from_env_runner1 = env_runner1_logger.reduce()

# ... and EnvRunner 2 does the same with its own values.
env_runner2_logger = MetricsLogger()
env_runner2_logger.log_value("mean_ret", 150.0, window=3)
env_runner2_logger.log_value("mean_ret", 250.0)
env_runner2_logger.log_value("mean_ret", 350.0)
env_runner2_logger.log_value("mean_ret", 450.0)
results_from_env_runner2 = env_runner2_logger.reduce()

# Merge both EnvRunners' stats under the top-level "env_runners" key.
main_logger.merge_and_log_n_dicts(
    [results_from_env_runner1, results_from_env_runner2],
    key="env_runners",
)
# Expected merge procedure:
# The two loggers' internal values lists are:
# env runner 1: [100, 200, 300, 400]
# env runner 2: [150, 250, 350, 450]
# Walk backwards from index=-1, each step looping over both env runners:
# index=-1 -> [400, 450] -> reduce-mean -> [425] -> repeat 2 times (number
#   of env runners) -> [425, 425]
# index=-2 -> [300, 350] -> reduce-mean -> [325] -> repeat 2 times
#   -> append -> [425, 425, 325, 325] -> STOP b/c we have reached >= window.
# Reverse the list -> [325, 325, 425, 425]
check(
    main_logger.stats["env_runners"]["mean_ret"].values,
    [325, 325, 425, 425],
)
# Peeking reduces over the last window=3 items: mean(325, 425, 425).
check(main_logger.peek(("env_runners", "mean_ret")), (325 + 425 + 425) / 3)

# Example: Lifetime sum over n parallel components' stats.
main_logger = MetricsLogger()

# reduce="sum" with window=None yields a lifetime (never windowed) sum.
first_logger = MetricsLogger()
first_logger.log_value("some_stat", 50, reduce="sum", window=None)
first_logger.log_value("some_stat", 25, reduce="sum", window=None)
results_from_first = first_logger.reduce()

second_logger = MetricsLogger()
second_logger.log_value("some_stat", 75, reduce="sum", window=None)
results_from_second = second_logger.reduce()

# Merge the stats from both loggers (no top-level key this time).
main_logger.merge_and_log_n_dicts([results_from_first, results_from_second])
# Lifetime sums are added across components: 50 + 25 + 75 = 150.
check(main_logger.peek("some_stat"), 150)

# Example: Sum over n parallel components' stats with a window of 3.
main_logger = MetricsLogger()

# Logger 1 records four values into a size-3 sum window ...
first_logger = MetricsLogger()
first_logger.log_value("some_stat", 50, reduce="sum", window=3)
first_logger.log_value("some_stat", 25, reduce="sum")
first_logger.log_value("some_stat", 10, reduce="sum")
first_logger.log_value("some_stat", 5, reduce="sum")
results_from_first = first_logger.reduce()

# ... while logger 2 records only two.
second_logger = MetricsLogger()
second_logger.log_value("some_stat", 75, reduce="sum", window=3)
second_logger.log_value("some_stat", 100, reduce="sum")
results_from_second = second_logger.reduce()

# Merge the stats from both loggers.
main_logger.merge_and_log_n_dicts([results_from_first, results_from_second])
# Expected merge procedure:
# The two loggers' internal values lists are:
# logger 1: [50, 25, 10, 5]
# logger 2: [75, 100]
# Walk backwards from index=-1, each step looping over both loggers:
# index=-1 -> [5, 100] -> leave as-is, b/c we are sum'ing -> [5, 100]
# index=-2 -> [10, 75] -> leave as-is -> [5, 100, 10, 75] -> STOP b/c we
# have reached >= window.
# Reverse the list -> [75, 10, 100, 5]
check(main_logger.peek("some_stat"), 115)  # last 3 items (window) get sum'd
Parameters:
  • stats_dicts – List of n stats dicts to be merged and then logged.

  • key – Optional top-level key under which to log all keys/key sequences found in the n stats_dicts.

  • reduce – The reduction method to apply, once self.reduce() is called. If None, will collect all logged values under key in a list (and also return that list upon calling self.reduce()).

  • window – An optional window size to reduce over. If not None, then the reduction operation is only applied to the most recent window items, and - after reduction - the internal values list under key is shortened to hold at most window items (the most recent ones). Must be None if ema_coeff is provided. If None (and ema_coeff is None), reduction must not be “mean”.

  • ema_coeff – An optional EMA coefficient to use if reduce is “mean” and no window is provided. Note that if both window and ema_coeff are provided, an error is thrown. Also, if ema_coeff is provided, reduce must be “mean”. The reduction formula for EMA is: EMA(t1) = (1.0 - ema_coeff) * EMA(t0) + ema_coeff * new_value

  • clear_on_reduce – If True, all values under key will be emptied after self.reduce() is called. Setting this to True is useful for cases, in which the internal values list would otherwise grow indefinitely, for example if reduce is None and there is no window provided.