Source code for ray.tune.search.sigopt.sigopt_search

import copy
import json
import os
import logging
import pickle
from pathlib import Path
from typing import Dict, List, Optional, Union

try:
    import sigopt as sgo

    Connection = sgo.Connection
except ImportError:
    sgo = None
    Connection = None

from ray.tune.result import DEFAULT_METRIC
from ray.tune.search import Searcher

logger = logging.getLogger(__name__)


def load_sigopt_key(overwrite: bool = False) -> bool:
    """Load SigOpt key from config file and save in environment.

    Args:
        overwrite: If True, will overwrite an existing SIGOPT_KEY env variable.

    Returns:
        True if a key was loaded into the environment.
    """
    if "SIGOPT_KEY" in os.environ and not overwrite:
        return False

    sigopt_key_file = Path("~/.sigopt/client/config.json").expanduser()
    if not sigopt_key_file.exists():
        return False

    try:
        with open(sigopt_key_file, "rt") as f:
            config = json.load(f)
        os.environ["SIGOPT_KEY"] = config["api_token"]
        return True
    except Exception:
        return False


[docs]class SigOptSearch(Searcher): """A wrapper around SigOpt to provide trial suggestions. You must install SigOpt and have a SigOpt API key to use this module. Store the API token as an environment variable ``SIGOPT_KEY`` as follows: .. code-block:: bash pip install -U sigopt export SIGOPT_KEY= ... You will need to use the `SigOpt experiment and space specification <https://docs.sigopt.com/ai-module-api-references/experiments>`_. This searcher manages its own concurrency. If this Searcher is used in a ``ConcurrencyLimiter``, the ``max_concurrent`` value passed to it will override the value passed here. Parameters: space: SigOpt configuration. Parameters will be sampled from this configuration and will be used to override parameters generated in the variant generation process. Not used if existing experiment_id is given name: Name of experiment. Required by SigOpt. max_concurrent: Number of maximum concurrent trials supported based on the user's SigOpt plan. Defaults to 1. If this Searcher is used in a ``ConcurrencyLimiter``, the ``max_concurrent`` value passed to it will override the value passed here. connection: An existing connection to SigOpt. experiment_id: Optional, if given will connect to an existing experiment. This allows for a more interactive experience with SigOpt, such as prior beliefs and constraints. observation_budget: Optional, can improve SigOpt performance. project: Optional, Project name to assign this experiment to. SigOpt can group experiments by project metric (str or list(str)): If str then the training result objective value attribute. If list(str) then a list of metrics that can be optimized together. SigOpt currently supports up to 2 metrics. mode: If experiment_id is given then this field is ignored, If str then must be one of {min, max}. If list then must be comprised of {min, max, obs}. Determines whether objective is minimizing or maximizing the metric attribute. If metrics is a list then mode must be a list of the same length as metric. Example: .. code-block:: python space = [ { 'name': 'width', 'type': 'int', 'bounds': { 'min': 0, 'max': 20 }, }, { 'name': 'height', 'type': 'int', 'bounds': { 'min': -100, 'max': 100 }, }, ] algo = SigOptSearch( space, name="SigOpt Example Experiment", metric="mean_loss", mode="min") Example: .. code-block:: python space = [ { 'name': 'width', 'type': 'int', 'bounds': { 'min': 0, 'max': 20 }, }, { 'name': 'height', 'type': 'int', 'bounds': { 'min': -100, 'max': 100 }, }, ] algo = SigOptSearch( space, name="SigOpt Multi Objective Example Experiment", metric=["average", "std"], mode=["max", "min"]) """ OBJECTIVE_MAP = { "max": {"objective": "maximize", "strategy": "optimize"}, "min": {"objective": "minimize", "strategy": "optimize"}, "obs": {"strategy": "store"}, } def __init__( self, space: List[Dict] = None, name: str = "Default Tune Experiment", max_concurrent: int = 1, connection: Optional[Connection] = None, experiment_id: Optional[str] = None, observation_budget: Optional[int] = None, project: Optional[str] = None, metric: Optional[Union[str, List[str]]] = "episode_reward_mean", mode: Optional[Union[str, List[str]]] = "max", points_to_evaluate: Optional[List[Dict]] = None, **kwargs, ): assert (experiment_id is None) ^ ( space is None ), "space xor experiment_id must be set" assert type(max_concurrent) is int and max_concurrent > 0 self._experiment_id = experiment_id self._name = name self._max_concurrent = max_concurrent self._connection = connection self._observation_budget = observation_budget self._project = project self._space = space self._metric = metric self._mode = mode self._live_trial_mapping = {} self._points_to_evaluate = points_to_evaluate self.experiment = None super(SigOptSearch, self).__init__(metric=metric, mode=mode, **kwargs) self._setup_optimizer() def _setup_optimizer(self): if self._metric is None and self._mode: # If only a mode was passed, use anonymous metric self._metric = DEFAULT_METRIC if self._mode is None: raise ValueError("`mode` argument passed to SigOptSearch must be set.") if isinstance(self._metric, str): self._metric = [self._metric] if isinstance(self._mode, str): self._mode = [self._mode] if self._connection is not None: self.conn = self._connection else: assert ( sgo is not None ), """SigOpt must be installed! You can install SigOpt with the command: `pip install -U sigopt`.""" load_sigopt_key() assert ( "SIGOPT_KEY" in os.environ ), "SigOpt API key must be stored as environ variable at SIGOPT_KEY" # Create a connection with SigOpt API, requires API key self.conn = sgo.Connection(client_token=os.environ["SIGOPT_KEY"]) if self._experiment_id is None: sigopt_params = dict( name=self._name, parameters=self._space, parallel_bandwidth=self._max_concurrent, ) if self._observation_budget is not None: sigopt_params["observation_budget"] = self._observation_budget if self._project is not None: sigopt_params["project"] = self._project if len(self._metric) > 1 and self._observation_budget is None: raise ValueError( "observation_budget is required for an" "experiment with more than one optimized metric" ) sigopt_params["metrics"] = self.serialize_metric(self._metric, self._mode) self.experiment = self.conn.experiments().create(**sigopt_params) else: self.experiment = self.conn.experiments(self._experiment_id).fetch() def set_search_properties( self, metric: Optional[str], mode: Optional[str], config: Dict, **spec ) -> bool: if config or self.experiment: # no automatic conversion of search space just yet return False if metric: self._metric = metric if mode: self._mode = mode self._setup_optimizer() return True def set_max_concurrency(self, max_concurrent: int) -> bool: self._max_concurrent = max_concurrent self.experiment = None return True def suggest(self, trial_id: str): # Required here and not in on __init__ # to make sure set_max_concurrency works correctly if not self.experiment: self._setup_optimizer() if self._max_concurrent: if len(self._live_trial_mapping) >= self._max_concurrent: return None suggestion_kwargs = {} if self._points_to_evaluate: config = self._points_to_evaluate.pop(0) suggestion_kwargs = {"assignments": config} # Get new suggestion from SigOpt suggestion = ( self.conn.experiments(self.experiment.id) .suggestions() .create(**suggestion_kwargs) ) self._live_trial_mapping[trial_id] = suggestion.id return copy.deepcopy(suggestion.assignments)
[docs] def on_trial_complete( self, trial_id: str, result: Optional[Dict] = None, error: bool = False ): """Notification for the completion of trial. If a trial fails, it will be reported as a failed Observation, telling the optimizer that the Suggestion led to a metric failure, which updates the feasible region and improves parameter recommendation. Creates SigOpt Observation object for trial. """ if result: payload = dict( suggestion=self._live_trial_mapping[trial_id], values=self.serialize_result(result), ) self.conn.experiments(self.experiment.id).observations().create(**payload) # Update the experiment object self.experiment = self.conn.experiments(self.experiment.id).fetch() elif error: # Reports a failed Observation self.conn.experiments(self.experiment.id).observations().create( failed=True, suggestion=self._live_trial_mapping[trial_id] ) del self._live_trial_mapping[trial_id]
[docs] @staticmethod def serialize_metric(metrics: List[str], modes: List[str]): """ Converts metrics to https://app.sigopt.com/docs/objects/metric """ serialized_metric = [] for metric, mode in zip(metrics, modes): serialized_metric.append( dict(name=metric, **SigOptSearch.OBJECTIVE_MAP[mode].copy()) ) return serialized_metric
[docs] def serialize_result(self, result: Dict): """ Converts experiments results to https://app.sigopt.com/docs/objects/metric_evaluation """ missing_scores = [metric for metric in self._metric if metric not in result] if missing_scores: raise ValueError( f"Some metrics specified during initialization are missing. " f"Missing metrics: {missing_scores}, provided result {result}" ) values = [] for metric in self._metric: value = dict(name=metric, value=result[metric]) values.append(value) return values
def save(self, checkpoint_path: str): trials_object = ( self.experiment.id, self._live_trial_mapping, self._points_to_evaluate, ) with open(checkpoint_path, "wb") as outputFile: pickle.dump(trials_object, outputFile) def restore(self, checkpoint_path: str): with open(checkpoint_path, "rb") as inputFile: trials_object = pickle.load(inputFile) ( experiment_id, self._live_trial_mapping, self._points_to_evaluate, ) = trials_object self.experiment = self.conn.experiments(experiment_id).fetch()