import copy
import logging
from typing import Dict, List, Optional, Tuple
import ray
import ray.cloudpickle as pickle
from ray.tune.result import DEFAULT_METRIC
from ray.tune.search import (
UNDEFINED_METRIC_MODE,
UNDEFINED_SEARCH_SPACE,
UNRESOLVED_SEARCH_SPACE,
Searcher,
)
from ray.tune.search.sample import (
Categorical,
Domain,
Float,
Integer,
Quantized,
Uniform,
)
from ray.tune.search.variant_generator import parse_spec_vars
from ray.tune.utils.util import unflatten_dict
# ZOOpt is an optional dependency: degrade gracefully when it is absent so
# this module can still be imported (ZOOptSearch.__init__ asserts on it).
try:
    import zoopt
    from zoopt import Solution, ValueType
except ImportError:
    zoopt = None
    Solution = ValueType = None

logger = logging.getLogger(__name__)
class ZOOptSearch(Searcher):
    """A wrapper around ZOOpt to provide trial suggestions.

    ZOOptSearch is a library for derivative-free optimization. It is backed by
    the `ZOOpt <https://github.com/polixir/ZOOpt>`__ package. Currently,
    Asynchronous Sequential RAndomized COordinate Shrinking (ASRacos)
    is implemented in Tune.

    To use ZOOptSearch, install zoopt (>=0.4.1): ``pip install -U zoopt``.

    Tune automatically converts search spaces to ZOOpt's format:

    .. code-block:: python

        from ray import train, tune
        from ray.tune.search.zoopt import ZOOptSearch

        "config": {
            "iterations": 10,  # evaluation times
            "width": tune.uniform(-10, 10),
            "height": tune.uniform(-10, 10)
        }

        zoopt_search_config = {
            "parallel_num": 8,  # how many workers to parallel
        }

        zoopt_search = ZOOptSearch(
            algo="Asracos",  # only support Asracos currently
            budget=20,  # must match `num_samples` in `tune.TuneConfig()`.
            dim_dict=dim_dict,
            metric="mean_loss",
            mode="min",
            **zoopt_search_config
        )

        tuner = tune.Tuner(
            my_objective,
            tune_config=tune.TuneConfig(
                search_alg=zoopt_search,
                num_samples=20
            ),
            run_config=train.RunConfig(
                name="zoopt_search",
                stop={"timesteps_total": 10}
            ),
            param_space=config
        )
        tuner.fit()

    If you would like to pass the search space manually, the code would
    look like this:

    .. code-block:: python

        from ray import train, tune
        from ray.tune.search.zoopt import ZOOptSearch
        from zoopt import ValueType

        dim_dict = {
            "height": (ValueType.CONTINUOUS, [-10, 10], 1e-2),
            "width": (ValueType.DISCRETE, [-10, 10], False),
            "layers": (ValueType.GRID, [4, 8, 16])
        }

        "config": {
            "iterations": 10,  # evaluation times
        }

        zoopt_search_config = {
            "parallel_num": 8,  # how many workers to parallel
        }

        zoopt_search = ZOOptSearch(
            algo="Asracos",  # only support Asracos currently
            budget=20,  # must match `num_samples` in `tune.TuneConfig()`.
            dim_dict=dim_dict,
            metric="mean_loss",
            mode="min",
            **zoopt_search_config
        )

        tuner = tune.Tuner(
            my_objective,
            tune_config=tune.TuneConfig(
                search_alg=zoopt_search,
                num_samples=20
            ),
            run_config=train.RunConfig(
                name="zoopt_search",
                stop={"timesteps_total": 10}
            ),
        )
        tuner.fit()

    Parameters:
        algo: To specify an algorithm in zoopt you want to use.
            Only support ASRacos currently.
        budget: Number of samples.
        dim_dict: Dimension dictionary.
            For continuous dimensions: (continuous, search_range, precision);
            For discrete dimensions: (discrete, search_range, has_order);
            For grid dimensions: (grid, grid_list).
            More details can be found in zoopt package.
        metric: The training result objective value attribute. If None
            but a mode was passed, the anonymous metric `_metric` will be used
            per default.
        mode: One of {min, max}. Determines whether objective is
            minimizing or maximizing the metric attribute.
        points_to_evaluate: Initial parameter suggestions to be run
            first. This is for when you already have some good parameters
            you want to run first to help the algorithm make better suggestions
            for future parameters. Needs to be a list of dicts containing the
            configurations.
        parallel_num: How many workers to parallel. Note that initial
            phase may start less workers than this number. More details can
            be found in zoopt package.
    """

    # Class-level default so the attribute exists even before __init__ runs.
    optimizer = None

    def __init__(
        self,
        algo: str = "asracos",
        budget: Optional[int] = None,
        dim_dict: Optional[Dict] = None,
        metric: Optional[str] = None,
        mode: Optional[str] = None,
        points_to_evaluate: Optional[List[Dict]] = None,
        parallel_num: int = 1,
        **kwargs
    ):
        assert (
            zoopt is not None
        ), "ZOOpt not found - please install zoopt by `pip install -U zoopt`."
        assert budget is not None, "`budget` should not be None!"
        if mode:
            assert mode in ["min", "max"], "`mode` must be 'min' or 'max'."
        _algo = algo.lower()
        assert _algo in [
            "asracos",
            "sracos",
        ], "`algo` must be in ['asracos', 'sracos'] currently"
        self._algo = _algo

        # If a raw Tune search space was passed (tune.uniform etc.), convert
        # it to ZOOpt's (ValueType, range, ...) tuple format and warn.
        if isinstance(dim_dict, dict) and dim_dict:
            resolved_vars, domain_vars, grid_vars = parse_spec_vars(dim_dict)
            if domain_vars or grid_vars:
                logger.warning(
                    UNRESOLVED_SEARCH_SPACE.format(par="dim_dict", cls=type(self))
                )
                dim_dict = self.convert_search_space(dim_dict, join=True)

        self._dim_dict = dim_dict
        self._budget = budget
        self._metric = metric
        # ZOOpt minimizes, so "max" flips the sign of reported results.
        # NOTE: if mode is None here, _metric_op is set later by
        # set_search_properties().
        if mode == "max":
            self._metric_op = -1.0
        elif mode == "min":
            self._metric_op = 1.0
        self._points_to_evaluate = copy.deepcopy(points_to_evaluate)
        self._live_trial_mapping = {}  # trial_id -> suggested config
        self._dim_keys = []  # stable key order used to (un)pack solutions
        self.solution_dict = {}  # trial_id -> zoopt Solution
        self.best_solution_list = []
        self.optimizer = None
        self.kwargs = kwargs
        self.parallel_num = parallel_num
        super(ZOOptSearch, self).__init__(metric=self._metric, mode=mode)

        if self._dim_dict:
            self._setup_zoopt()

    def _setup_zoopt(self):
        """Instantiate the underlying ZOOpt optimizer from the current
        dim_dict/budget/points_to_evaluate state."""
        if self._metric is None and self._mode:
            # If only a mode was passed, use anonymous metric
            self._metric = DEFAULT_METRIC

        _dim_list = []
        for k in self._dim_dict:
            self._dim_keys.append(k)
            _dim_list.append(self._dim_dict[k])

        init_samples = None
        if self._points_to_evaluate:
            logger.warning(
                "`points_to_evaluate` is ignored by ZOOpt in versions <= 0.4.1."
            )
            init_samples = [
                Solution(x=tuple(point[dim] for dim in self._dim_keys))
                for point in self._points_to_evaluate
            ]
        dim = zoopt.Dimension2(_dim_list)
        par = zoopt.Parameter(budget=self._budget, init_samples=init_samples)
        if self._algo == "sracos" or self._algo == "asracos":
            from zoopt.algos.opt_algorithms.racos.sracos import SRacosTune

            self.optimizer = SRacosTune(
                dimension=dim,
                parameter=par,
                parallel_num=self.parallel_num,
                **self.kwargs
            )

    def set_search_properties(
        self, metric: Optional[str], mode: Optional[str], config: Dict, **spec
    ) -> bool:
        """Late-bind metric/mode/search space from the Tuner config.

        Returns False if a search space was already configured (in which
        case the Tuner-provided one is ignored), True otherwise.
        """
        if self._dim_dict:
            return False
        space = self.convert_search_space(config)
        self._dim_dict = space

        if metric:
            self._metric = metric
        if mode:
            self._mode = mode

        if self._mode == "max":
            self._metric_op = -1.0
        elif self._mode == "min":
            self._metric_op = 1.0

        self._setup_zoopt()
        return True

    def suggest(self, trial_id: str) -> Optional[Dict]:
        """Return a new configuration for ``trial_id``, or a sentinel.

        Returns ``Searcher.FINISHED`` once the ZOOpt budget is exhausted,
        ``None`` if no suggestion is currently available.
        """
        if not self._dim_dict or not self.optimizer:
            raise RuntimeError(
                UNDEFINED_SEARCH_SPACE.format(
                    cls=self.__class__.__name__, space="dim_dict"
                )
            )
        if not self._metric or not self._mode:
            raise RuntimeError(
                UNDEFINED_METRIC_MODE.format(
                    cls=self.__class__.__name__, metric=self._metric, mode=self._mode
                )
            )

        _solution = self.optimizer.suggest()

        if _solution == "FINISHED":
            # Ray >= 0.8.7 understands the FINISHED sentinel; older versions
            # expect None. Compare versions numerically — a plain string
            # comparison would misorder e.g. "0.10.0" vs "0.8.7".
            try:
                ray_version = tuple(
                    int(part) for part in ray.__version__.split(".")[:3]
                )
            except ValueError:
                # Dev/nightly version strings: assume a modern Ray.
                ray_version = (0, 8, 7)
            if ray_version >= (0, 8, 7):
                return Searcher.FINISHED
            else:
                return None

        if _solution:
            self.solution_dict[str(trial_id)] = _solution
            _x = _solution.get_x()
            new_trial = dict(zip(self._dim_keys, _x))
            self._live_trial_mapping[trial_id] = new_trial
            return unflatten_dict(new_trial)

    def on_trial_complete(
        self, trial_id: str, result: Optional[Dict] = None, error: bool = False
    ):
        """Notification for the completion of trial."""
        if result:
            _solution = self.solution_dict[str(trial_id)]
            # Feed the (sign-adjusted) objective back to ZOOpt; it returns
            # the best solution found so far, if it improved.
            _best_solution_so_far = self.optimizer.complete(
                _solution, self._metric_op * result[self._metric]
            )
            if _best_solution_so_far:
                self.best_solution_list.append(_best_solution_so_far)

        del self._live_trial_mapping[trial_id]

    def save(self, checkpoint_path: str):
        """Pickle the full searcher state to ``checkpoint_path``."""
        save_object = self.__dict__
        with open(checkpoint_path, "wb") as outputFile:
            pickle.dump(save_object, outputFile)

    def restore(self, checkpoint_path: str):
        """Restore searcher state previously written by ``save()``."""
        with open(checkpoint_path, "rb") as inputFile:
            save_object = pickle.load(inputFile)
        self.__dict__.update(save_object)

    @staticmethod
    def convert_search_space(spec: Dict, join: bool = False) -> Dict[str, Tuple]:
        """Convert a Tune search space into ZOOpt dimension tuples.

        If ``join`` is True, the converted entries are merged back into (a
        copy of) the original spec so non-domain keys are preserved.

        Raises:
            ValueError: if the spec contains grid search parameters or
                domain/sampler combinations ZOOpt cannot represent.
        """
        spec = copy.deepcopy(spec)
        resolved_vars, domain_vars, grid_vars = parse_spec_vars(spec)

        if not domain_vars and not grid_vars:
            return {}

        if grid_vars:
            raise ValueError(
                "Grid search parameters cannot be automatically converted "
                "to a ZOOpt search space."
            )

        def resolve_value(domain: Domain) -> Tuple:
            quantize = None

            sampler = domain.get_sampler()
            if isinstance(sampler, Quantized):
                quantize = sampler.q
                sampler = sampler.sampler

            if isinstance(domain, Float):
                precision = quantize or 1e-12
                if isinstance(sampler, Uniform):
                    return (
                        ValueType.CONTINUOUS,
                        [domain.lower, domain.upper],
                        precision,
                    )

            elif isinstance(domain, Integer):
                if isinstance(sampler, Uniform):
                    # Tune's Integer upper bound is exclusive; ZOOpt's
                    # DISCRETE range is inclusive, hence upper - 1.
                    return (ValueType.DISCRETE, [domain.lower, domain.upper - 1], True)

            elif isinstance(domain, Categorical):
                # Categorical variables would use ValueType.DISCRETE with
                # has_partial_order=False, however, currently we do not
                # keep track of category values and cannot automatically
                # translate back and forth between them.
                if isinstance(sampler, Uniform):
                    return (ValueType.GRID, domain.categories)

            raise ValueError(
                "ZOOpt does not support parameters of type "
                "`{}` with samplers of type `{}`".format(
                    type(domain).__name__, type(domain.sampler).__name__
                )
            )

        conv_spec = {
            "/".join(path): resolve_value(domain) for path, domain in domain_vars
        }

        if join:
            spec.update(conv_spec)
            conv_spec = spec

        return conv_spec