ray.rllib.utils.metrics.metrics_logger.MetricsLogger.merge_and_log_n_dicts
- MetricsLogger.merge_and_log_n_dicts(stats_dicts: List[Dict[str, Any]], *, key: str | Tuple[str, ...] | None = None, reduce: str | None = 'mean', window: int | float | None = None, ema_coeff: float | None = None, clear_on_reduce: bool = False) → None [source]
Merges n dicts, generated by n parallel components, and logs the results.
from ray.rllib.utils.metrics.metrics_logger import MetricsLogger
from ray.rllib.utils.test_utils import check

# Example: n Learners logging loss stats to be merged.
# Note that losses should usually be logged with a window=1 so they don't
# get smeared over time and instead provide an accurate picture of the
# current situation.
main_logger = MetricsLogger()

logger_learner1 = MetricsLogger()
logger_learner1.log_value("loss", 0.1, window=1)
learner1_results = logger_learner1.reduce()

logger_learner2 = MetricsLogger()
logger_learner2.log_value("loss", 0.2, window=1)
learner2_results = logger_learner2.reduce()

# Merge the stats from both Learners.
main_logger.merge_and_log_n_dicts(
    [learner1_results, learner2_results],
    key="learners",
)
check(main_logger.peek(("learners", "loss")), 0.15)

# Example: m EnvRunners logging episode returns to be merged.
main_logger = MetricsLogger()

logger_env_runner1 = MetricsLogger()
logger_env_runner1.log_value("mean_ret", 100.0, window=3)
logger_env_runner1.log_value("mean_ret", 200.0)
logger_env_runner1.log_value("mean_ret", 300.0)
logger_env_runner1.log_value("mean_ret", 400.0)
env_runner1_results = logger_env_runner1.reduce()

logger_env_runner2 = MetricsLogger()
logger_env_runner2.log_value("mean_ret", 150.0, window=3)
logger_env_runner2.log_value("mean_ret", 250.0)
logger_env_runner2.log_value("mean_ret", 350.0)
logger_env_runner2.log_value("mean_ret", 450.0)
env_runner2_results = logger_env_runner2.reduce()

# Merge the stats from both EnvRunners.
main_logger.merge_and_log_n_dicts(
    [env_runner1_results, env_runner2_results],
    key="env_runners",
)
# The expected procedure is as follows:
# The individual internal values lists of the two loggers are as follows:
# env runner 1: [100, 200, 300, 400]
# env runner 2: [150, 250, 350, 450]
# Move backwards from index=-1 (each time, loop through both env runners):
# index=-1 -> [400, 450] -> reduce-mean -> [425] -> repeat 2 times (number
# of env runners) -> [425, 425]
# index=-2 -> [300, 350] -> reduce-mean -> [325] -> repeat 2 times
# -> append -> [425, 425, 325, 325] -> STOP b/c we have reached >= window.
# Reverse the list -> [325, 325, 425, 425]
check(
    main_logger.stats["env_runners"]["mean_ret"].values,
    [325, 325, 425, 425],
)
check(main_logger.peek(("env_runners", "mean_ret")), (325 + 425 + 425) / 3)

# Example: Lifetime sum over n parallel components' stats.
main_logger = MetricsLogger()

logger1 = MetricsLogger()
logger1.log_value("some_stat", 50, reduce="sum", window=None)
logger1.log_value("some_stat", 25, reduce="sum", window=None)
logger1_results = logger1.reduce()

logger2 = MetricsLogger()
logger2.log_value("some_stat", 75, reduce="sum", window=None)
logger2_results = logger2.reduce()

# Merge the stats from both loggers.
main_logger.merge_and_log_n_dicts([logger1_results, logger2_results])
check(main_logger.peek("some_stat"), 150)

# Example: Sum over n parallel components' stats with a window of 3.
main_logger = MetricsLogger()

logger1 = MetricsLogger()
logger1.log_value("some_stat", 50, reduce="sum", window=3)
logger1.log_value("some_stat", 25, reduce="sum")
logger1.log_value("some_stat", 10, reduce="sum")
logger1.log_value("some_stat", 5, reduce="sum")
logger1_results = logger1.reduce()

logger2 = MetricsLogger()
logger2.log_value("some_stat", 75, reduce="sum", window=3)
logger2.log_value("some_stat", 100, reduce="sum")
logger2_results = logger2.reduce()

# Merge the stats from both loggers.
main_logger.merge_and_log_n_dicts([logger1_results, logger2_results])
# The expected procedure is as follows:
# The individual internal values lists of the two loggers are as follows:
# logger 1: [50, 25, 10, 5]
# logger 2: [75, 100]
# Move backwards from index=-1 (each time, loop through both loggers):
# index=-1 -> [5, 100] -> leave as-is, b/c we are sum'ing -> [5, 100]
# index=-2 -> [10, 75] -> leave as-is -> [5, 100, 10, 75] -> STOP b/c we
# have reached >= window.
# Reverse the list -> [75, 10, 100, 5]
check(main_logger.peek("some_stat"), 115)  # last 3 items (window) get sum'd
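The backwards-merge procedure traced in the comments above can be summarized in a few lines of plain Python. The following is an illustrative sketch only, not RLlib's actual implementation (the helper name merge_windowed is hypothetical); it reproduces both walkthroughs:

# Illustrative sketch of the documented merge procedure; NOT RLlib's code.
def merge_windowed(values_lists, reduce, window):
    merged = []
    index = -1
    # Walk backwards through the components' values lists in lockstep,
    # stopping once the merged list has reached the window size.
    while len(merged) < window and any(len(vals) >= -index for vals in values_lists):
        at_index = [vals[index] for vals in values_lists if len(vals) >= -index]
        if reduce == "mean":
            # Mean-reduce across components, then repeat the result once per
            # contributing component (as in the walkthrough above).
            mean_val = sum(at_index) / len(at_index)
            merged.extend([mean_val] * len(at_index))
        else:  # e.g. "sum": leave the raw values as-is.
            merged.extend(at_index)
        index -= 1
    # Values were collected newest-first; reverse into chronological order.
    merged.reverse()
    return merged

# Reproduces the EnvRunner (mean) example above:
print(merge_windowed([[100, 200, 300, 400], [150, 250, 350, 450]], "mean", 3))
# -> [325.0, 325.0, 425.0, 425.0]
# And the windowed-sum example:
print(merge_windowed([[50, 25, 10, 5], [75, 100]], "sum", 3))
# -> [75, 10, 100, 5]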
- Parameters:
    - stats_dicts – List of n stats dicts to be merged and then logged.
    - key – Optional top-level key under which to log all keys/key sequences found in the n stats_dicts.
    - reduce – The reduction method to apply, once self.reduce() is called. If None, will collect all logged values under key in a list (and also return that list upon calling self.reduce()).
    - window – An optional window size to reduce over. If not None, the reduction operation is only applied to the most recent window items, and - after reduction - the internal values list under key is shortened to hold at most window items (the most recent ones). Must be None if ema_coeff is provided. If None (and ema_coeff is also None), reduction must not be "mean".
    - ema_coeff – An optional EMA coefficient to use if reduce is "mean" and no window is provided. Note that if both window and ema_coeff are provided, an error is thrown. Also, if ema_coeff is provided, reduce must be "mean". The reduction formula for EMA is: EMA(t1) = (1.0 - ema_coeff) * EMA(t0) + ema_coeff * new_value.
    - clear_on_reduce – If True, all values under key are emptied after self.reduce() is called. Setting this to True is useful for cases in which the internal values list would otherwise grow indefinitely, for example if reduce is None and no window is provided.
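To make the EMA reduction formula above concrete, here is a tiny plain-Python sketch of how an EMA-reduced value evolves over a stream of logged values; seeding the EMA with the first value is an assumption made for illustration:

# Illustrative only: manual computation of the documented EMA formula.
ema_coeff = 0.01

ema = None
for new_value in [0.1, 0.2, 0.3]:
    if ema is None:
        # Assumption for illustration: seed the EMA with the first value.
        ema = new_value
    else:
        # EMA(t1) = (1.0 - ema_coeff) * EMA(t0) + ema_coeff * new_value
        ema = (1.0 - ema_coeff) * ema + ema_coeff * new_value

print(ema)  # ~0.103: with a small ema_coeff, new values shift the average only slightly.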