ray.rllib.utils.metrics.metrics_logger.MetricsLogger.aggregate
- MetricsLogger.aggregate(stats_dicts: List[Dict[str, Any]], *, key: str | Tuple[str, ...] | None = None) -> None
Merges n stats_dicts and logs the result, merging on the time axis with any existing stats.
The n stats_dicts should be generated by n parallel components such that merging their respective stats in parallel is meaningful. This lets us aggregate stats in a tree structure of MetricsLoggers.
If you want to log a dictionary of values (not Stats objects), use log_dict instead.
```python
from ray.rllib.utils.metrics.metrics_logger import MetricsLogger
from ray.rllib.utils.test_utils import check

# Example: n Learners logging loss stats to be merged.
# Note that losses should usually be logged with a window=1 so they don't
# get smeared over time and instead provide an accurate picture of the
# current situation.
main_logger = MetricsLogger()

logger_learner1 = MetricsLogger()
logger_learner1.log_value("loss", 0.1, window=1)
learner1_results = logger_learner1.reduce()

logger_learner2 = MetricsLogger()
logger_learner2.log_value("loss", 0.2, window=1)
learner2_results = logger_learner2.reduce()

# Merge the stats from both Learners.
main_logger.aggregate(
    [learner1_results, learner2_results],
    key="learners",
)
check(main_logger.peek(("learners", "loss")), 0.15)

# Example: m EnvRunners logging episode returns to be merged.
main_logger = MetricsLogger()

logger_env_runner1 = MetricsLogger()
logger_env_runner1.log_value("mean_ret", 100.0, window=3)
logger_env_runner1.log_value("mean_ret", 200.0, window=3)
logger_env_runner1.log_value("mean_ret", 300.0, window=3)
logger_env_runner1.log_value("mean_ret", 400.0, window=3)
env_runner1_results = logger_env_runner1.reduce()

logger_env_runner2 = MetricsLogger()
logger_env_runner2.log_value("mean_ret", 150.0, window=3)
logger_env_runner2.log_value("mean_ret", 250.0, window=3)
logger_env_runner2.log_value("mean_ret", 350.0, window=3)
logger_env_runner2.log_value("mean_ret", 450.0, window=3)
env_runner2_results = logger_env_runner2.reduce()

# Merge the stats from both EnvRunners.
main_logger.aggregate(
    [env_runner1_results, env_runner2_results],
    key="env_runners",
)
# The expected procedure is as follows:
# The individual internal values lists of the two loggers are:
# env runner 1: [200, 300, 400]
# env runner 2: [250, 350, 450]
# Move backwards from index=-1 (each time, loop through both env runners):
# index=-1 -> [400, 450] -> mean -> [425] -> repeat 2 times (number
#   of env runners) -> [425, 425]
# index=-2 -> [300, 350] -> mean -> [325] -> repeat 2 times
#   -> append -> [425, 425, 325, 325] -> STOP b/c we have reached >= window.
# Reverse the list -> [325, 325, 425, 425]
# deque(maxlen=3) -> [325, 425, 425]
check(
    main_logger.stats["env_runners"]["mean_ret"].values,
    [325, 425, 425],
)
check(main_logger.peek(("env_runners", "mean_ret")), (325 + 425 + 425) / 3)

# Example: Lifetime sum over n parallel components' stats.
main_logger = MetricsLogger()

logger1 = MetricsLogger()
logger1.log_value("some_stat", 50, reduce="sum", window=None)
logger1.log_value("some_stat", 25, reduce="sum", window=None)
logger1_results = logger1.reduce()

logger2 = MetricsLogger()
logger2.log_value("some_stat", 75, reduce="sum", window=None)
logger2_results = logger2.reduce()

# Merge the stats from both components.
main_logger.aggregate([logger1_results, logger2_results])
check(main_logger.peek("some_stat"), 150)

# Example: Sum over n parallel components' stats with a window of 3.
main_logger = MetricsLogger()

logger1 = MetricsLogger()
logger1.log_value("some_stat", 50, reduce="sum", window=3)
logger1.log_value("some_stat", 25, reduce="sum", window=3)
logger1.log_value("some_stat", 10, reduce="sum", window=3)
logger1.log_value("some_stat", 5, reduce="sum", window=3)
logger1_results = logger1.reduce()

logger2 = MetricsLogger()
logger2.log_value("some_stat", 75, reduce="sum", window=3)
logger2.log_value("some_stat", 100, reduce="sum", window=3)
logger2_results = logger2.reduce()

# Merge the stats from both components.
main_logger.aggregate([logger1_results, logger2_results])
# The expected procedure is as follows:
# The individual internal values lists of the two loggers are:
# logger 1: [50, 25, 10, 5]
# logger 2: [75, 100]
# Move backwards from index=-1 (each time, loop through both loggers):
# index=-1 -> [5, 100] -> reduce over both indices ->
#   [(5 + 100) / 2, (5 + 100) / 2] = [52.5, 52.5]
# Result = [52.5, 52.5]; len() = 2 < window = 3
# index=-2 -> [10, 75] -> reduce over both indices ->
#   [(10 + 75) / 2, (10 + 75) / 2] = [42.5, 42.5]
# Result = [42.5, 42.5, 52.5, 52.5]; len() = 4 >= window = 3 -> STOP
check(main_logger.peek("some_stat"), 147.5)  # last 3 items (window) get summed
```
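The backwards merge traced in the comments above can be sketched in plain, dependency-free Python. This is a minimal illustration, not RLlib's actual implementation: `merge_on_time_axis` and `mean` are hypothetical helpers introduced here, and the sketch assumes the cross-component reduction is an arithmetic mean, as in both traced examples.

```python
from collections import deque


def mean(xs):
    """Arithmetic mean of a list of numbers."""
    return sum(xs) / len(xs)


def merge_on_time_axis(values_lists, window, cross_reduce=mean):
    """Sketch of the backwards merge procedure traced above.

    Walks the parallel components' value lists from the newest entry
    (index=-1) backwards, reduces across components at each index,
    repeats the result once per contributing component, and stops as
    soon as at least `window` merged values have been collected.
    """
    merged = []
    max_len = max(len(v) for v in values_lists)
    for i in range(1, max_len + 1):  # i=1 corresponds to index=-1, etc.
        step = [v[-i] for v in values_lists if len(v) >= i]
        reduced = cross_reduce(step)
        # Repeat the reduced value once per contributing component.
        merged.extend([reduced] * len(step))
        if len(merged) >= window:
            break
    merged.reverse()
    # Keep only the last `window` values, like deque(maxlen=window).
    return list(deque(merged, maxlen=window))


# EnvRunner example: windowed mean over [200, 300, 400] and [250, 350, 450].
print(merge_on_time_axis([[200, 300, 400], [250, 350, 450]], window=3))
# -> [325.0, 425.0, 425.0]

# Windowed-sum example: peek() then sums the last `window` merged values.
merged = merge_on_time_axis([[50, 25, 10, 5], [75, 100]], window=3)
print(sum(merged))
# -> 147.5
```

The repeat-per-component step is what keeps a single effective window: each merged time step contributes as many entries as there were components, so the final `window`-length deque spans the same number of underlying time steps regardless of how many parallel loggers were merged.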
- Parameters:
  - stats_dicts – List of n stats dicts to be merged and then logged.
  - key – Optional top-level key under which to log all keys/key sequences found in the n stats_dicts.