from functools import partial
import os
import pandas as pd
from typing import Optional, Union
from ray.air.result import Result
from ray.cloudpickle import cloudpickle
from ray.exceptions import RayTaskError
from ray.tune.analysis import ExperimentAnalysis
from ray.tune.error import TuneError
from ray.tune.experiment import Trial
from ray.tune.trainable.util import TrainableUtil
from ray.util import PublicAPI
@PublicAPI(stability="beta")
class ResultGrid:
"""A set of ``Result`` objects for interacting with Ray Tune results.
You can use it to inspect the trials and obtain the best result.
The constructor is a private API. This object can only be created as a result of
``Tuner.fit()``.
Example:
.. testcode::
import random
from ray import air, tune
def random_error_trainable(config):
if random.random() < 0.5:
return {"loss": 0.0}
else:
raise ValueError("This is an error")
tuner = tune.Tuner(
random_error_trainable,
run_config=air.RunConfig(name="example-experiment"),
tune_config=tune.TuneConfig(num_samples=10),
)
try:
result_grid = tuner.fit()
except ValueError:
pass
for i in range(len(result_grid)):
result = result_grid[i]
if not result.error:
print(f"Trial finishes successfully with metrics"
f"{result.metrics}.")
else:
print(f"Trial failed with error {result.error}.")
.. testoutput::
:hide:
...
You can also use ``result_grid`` for more advanced analysis.
>>> # Get the best result based on a particular metric.
>>> best_result = result_grid.get_best_result( # doctest: +SKIP
... metric="loss", mode="min")
>>> # Get the best checkpoint corresponding to the best result.
>>> best_checkpoint = best_result.checkpoint # doctest: +SKIP
>>> # Get a dataframe for the last reported results of all of the trials
>>> df = result_grid.get_dataframe() # doctest: +SKIP
>>> # Get a dataframe for the minimum loss seen for each trial
>>> df = result_grid.get_dataframe(metric="loss", mode="min") # doctest: +SKIP
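    You can also inspect whether any trials raised errors:

    >>> # Number of errored trials and the corresponding exceptions.
    >>> num_errors = result_grid.num_errors  # doctest: +SKIP
    >>> errors = result_grid.errors  # doctest: +SKIP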
Note that trials of all statuses are included in the final result grid.
    If a trial is not in a terminated state, its latest result and checkpoint as
seen by Tune will be provided.
See :doc:`/tune/examples/tune_analyze_results` for more usage examples.
"""
def __init__(
self,
experiment_analysis: ExperimentAnalysis,
):
self._experiment_analysis = experiment_analysis
self._results = [
self._trial_to_result(trial) for trial in self._experiment_analysis.trials
]
@property
def _local_path(self) -> str:
"""Return path pointing to the experiment directory on the local disk."""
return self._experiment_analysis._local_path
@property
def _remote_path(self) -> Optional[str]:
"""Return path pointing to the experiment directory on remote storage."""
return self._experiment_analysis._remote_path
@property
def experiment_path(self) -> str:
"""Path pointing to the experiment directory on persistent storage.
This can point to a remote storage location (e.g. S3) or to a local
location (path on the head node).
For instance, if your remote storage path is ``s3://bucket/location``,
this will point to ``s3://bucket/location/experiment_name``.
"""
return self._remote_path or self._local_path
    def get_best_result(
self,
metric: Optional[str] = None,
mode: Optional[str] = None,
scope: str = "last",
filter_nan_and_inf: bool = True,
) -> Result:
"""Get the best result from all the trials run.
Args:
metric: Key for trial info to order on. Defaults to
the metric specified in your Tuner's ``TuneConfig``.
mode: One of [min, max]. Defaults to the mode specified
in your Tuner's ``TuneConfig``.
scope: One of [all, last, avg, last-5-avg, last-10-avg].
If `scope=last`, only look at each trial's final step for
`metric`, and compare across trials based on `mode=[min,max]`.
If `scope=avg`, consider the simple average over all steps
for `metric` and compare across trials based on
`mode=[min,max]`. If `scope=last-5-avg` or `scope=last-10-avg`,
consider the simple average over the last 5 or 10 steps for
`metric` and compare across trials based on `mode=[min,max]`.
If `scope=all`, find each trial's min/max score for `metric`
based on `mode`, and compare trials based on `mode=[min,max]`.
filter_nan_and_inf: If True (default), NaN or infinite
values are disregarded and these trials are never selected as
the best trial.
"""
if len(self._experiment_analysis.trials) == 1:
return self._trial_to_result(self._experiment_analysis.trials[0])
if not metric and not self._experiment_analysis.default_metric:
raise ValueError(
"No metric is provided. Either pass in a `metric` arg to "
"`get_best_result` or specify a metric in the "
"`TuneConfig` of your `Tuner`."
)
if not mode and not self._experiment_analysis.default_mode:
raise ValueError(
"No mode is provided. Either pass in a `mode` arg to "
"`get_best_result` or specify a mode in the "
"`TuneConfig` of your `Tuner`."
)
best_trial = self._experiment_analysis.get_best_trial(
metric=metric,
mode=mode,
scope=scope,
filter_nan_and_inf=filter_nan_and_inf,
)
if not best_trial:
error_msg = (
"No best trial found for the given metric: "
f"{metric or self._experiment_analysis.default_metric}. "
"This means that no trial has reported this metric"
)
error_msg += (
", or all values reported for this metric are NaN. To not ignore NaN "
"values, you can set the `filter_nan_and_inf` arg to False."
if filter_nan_and_inf
else "."
)
raise RuntimeError(error_msg)
return self._trial_to_result(best_trial)
    def get_dataframe(
self,
filter_metric: Optional[str] = None,
filter_mode: Optional[str] = None,
) -> pd.DataFrame:
"""Return dataframe of all trials with their configs and reported results.
        By default, this returns the last reported results for each trial.

        If ``filter_metric`` and ``filter_mode`` are set, the results from each
        trial are filtered for this metric and mode. For example, if
        ``filter_metric="some_metric"`` and ``filter_mode="max"``, then for each
        trial, all reported results are examined and the one where ``some_metric``
        is maximal is used as that trial's row.
Example:
.. testcode::
from ray.air import session
from ray.air.config import RunConfig
from ray.tune import Tuner
def training_loop_per_worker(config):
session.report({"accuracy": 0.8})
result_grid = Tuner(
trainable=training_loop_per_worker,
run_config=RunConfig(name="my_tune_run")
).fit()
# Get last reported results per trial
df = result_grid.get_dataframe()
# Get best ever reported accuracy per trial
df = result_grid.get_dataframe(
filter_metric="accuracy", filter_mode="max"
)
.. testoutput::
:hide:
...
Args:
filter_metric: Metric to filter best result for.
            filter_mode: If ``filter_metric`` is given, one of ``["min", "max"]``
                to specify whether to find the minimum or maximum result.
Returns:
Pandas DataFrame with each trial as a row and their results as columns.
"""
return self._experiment_analysis.dataframe(
metric=filter_metric, mode=filter_mode
)
def __len__(self) -> int:
return len(self._results)
def __getitem__(self, i: int) -> Result:
"""Returns the i'th result in the grid."""
return self._results[i]
@property
def errors(self):
"""Returns the exceptions of errored trials."""
return [result.error for result in self if result.error]
@property
def num_errors(self):
"""Returns the number of errored trials."""
return len(
[t for t in self._experiment_analysis.trials if t.status == Trial.ERROR]
)
@property
def num_terminated(self):
"""Returns the number of terminated (but not errored) trials."""
return len(
[
t
for t in self._experiment_analysis.trials
if t.status == Trial.TERMINATED
]
)
@staticmethod
def _populate_exception(trial: Trial) -> Optional[Union[TuneError, RayTaskError]]:
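        # Prefer the pickled error file, which preserves the original exception
        # (e.g. a RayTaskError). Otherwise, fall back to the plain-text error
        # file below and wrap its contents in a TuneError.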
if trial.pickled_error_file and os.path.exists(trial.pickled_error_file):
with open(trial.pickled_error_file, "rb") as f:
e = cloudpickle.load(f)
return e
elif trial.error_file and os.path.exists(trial.error_file):
with open(trial.error_file, "r") as f:
return TuneError(f.read())
return None
def _trial_to_result(self, trial: Trial) -> Result:
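        # If the trial used cloud checkpointing, build a function that maps local
        # checkpoint paths to their corresponding remote storage paths.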
local_to_remote_path_fn = (
partial(
TrainableUtil.get_remote_storage_path,
local_path_prefix=trial.local_path,
remote_path_prefix=trial.remote_path,
)
if trial.uses_cloud_checkpointing
else None
)
checkpoint = trial.checkpoint.to_air_checkpoint(
local_to_remote_path_fn,
)
best_checkpoints = [
(
checkpoint.to_air_checkpoint(local_to_remote_path_fn),
checkpoint.metrics,
)
for checkpoint in trial.get_trial_checkpoints()
]
result = Result(
checkpoint=checkpoint,
metrics=trial.last_result.copy(),
error=self._populate_exception(trial),
_local_path=trial.local_path,
_remote_path=trial.remote_path,
metrics_dataframe=self._experiment_analysis.trial_dataframes.get(
trial.local_path
)
if self._experiment_analysis.trial_dataframes
else None,
best_checkpoints=best_checkpoints,
)
return result
def __repr__(self) -> str:
all_results_repr = [result._repr(indent=2) for result in self]
all_results_repr = ",\n".join(all_results_repr)
return f"ResultGrid<[\n{all_results_repr}\n]>"