import collections
import logging
import warnings

import numpy
import pandas
import GPy
import GPyOpt as gpyopt_package

import sherpa
from sherpa.algorithms import Algorithm
from sherpa.core import Choice, Continuous, Discrete, Ordinal

bayesoptlogger = logging.getLogger(__name__)
class GPyOpt(Algorithm):
"""
Sherpa wrapper around the GPyOpt package
(https://github.com/SheffieldML/GPyOpt).
Args:
model_type (str): The model used:
- 'GP', standard Gaussian process.
- 'GP_MCMC', Gaussian process with prior in the hyper-parameters.
- 'sparseGP', sparse Gaussian process.
            - 'warpedGP', warped Gaussian process.
            - 'InputWarpedGP', input warped Gaussian process.
- 'RF', random forest (scikit-learn).
        num_initial_data_points (int): Number of data points to collect before
            fitting the model. Needs to be greater than or equal to the number
            of hyperparameters being optimized. The default 'infer' corresponds
            to the number of hyperparameters + 1, or to 0 if results are
            already available.
        initial_data_points (list[dict] or pandas.DataFrame): Specifies initial
            data points. If len(initial_data_points) < num_initial_data_points,
            the remainder is sampled randomly. Use this option to provide
            hyperparameter configurations that are known to be good.
acquisition_type (str): Type of acquisition function to use.
- 'EI', expected improvement.
- 'EI_MCMC', integrated expected improvement (requires GP_MCMC model).
- 'MPI', maximum probability of improvement.
- 'MPI_MCMC', maximum probability of improvement (requires GP_MCMC model).
- 'LCB', GP-Lower confidence bound.
- 'LCB_MCMC', integrated GP-Lower confidence bound (requires GP_MCMC model).
        max_concurrent (int): The number of concurrent trials. This generates
            a batch of max_concurrent trials from GPyOpt to evaluate. When a
            new observation becomes available, the model is re-fit and a new
            batch is created, regardless of whether the previous batch was
            used up. Batch suggestions are generated via local penalization.
verbosity (bool): Print models and other options during the optimization.
        max_num_trials (int): Maximum number of trials to run; None means no
            limit.
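
    Example (a minimal sketch of Sherpa's API mode; ``train_model`` is a
    hypothetical user-supplied function returning the objective)::

        parameters = [sherpa.Continuous('lr', [1e-4, 1e-1], scale='log'),
                      sherpa.Choice('activation', ['relu', 'tanh'])]
        algorithm = GPyOpt(max_concurrent=1, max_num_trials=50)
        study = sherpa.Study(parameters=parameters, algorithm=algorithm,
                             lower_is_better=True, disable_dashboard=True)
        for trial in study:
            loss = train_model(trial.parameters)
            study.add_observation(trial, objective=loss, iteration=1)
            study.finalize(trial)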
"""
allows_repetition = True
    def __init__(self, model_type='GP', num_initial_data_points='infer',
                 initial_data_points=None, acquisition_type='EI',
                 max_concurrent=4, verbosity=False, max_num_trials=None):
self.model_type = model_type
assert (num_initial_data_points == 'infer'
or isinstance(num_initial_data_points, int)),\
"num_initial_data_points needs to be 'infer' or int."
self.num_initial_data_points = num_initial_data_points
self._num_initial_data_points = -1
        self.initial_data_points = ([] if initial_data_points is None
                                    else initial_data_points)
self.acquisition_type = acquisition_type
        if max_concurrent > 1:
            assert (model_type != 'GP_MCMC'
                    and acquisition_type != 'EI_MCMC'), \
                "GPyOpt has a bug for _MCMC with batch size > 1."
self.max_concurrent = max_concurrent
self.verbosity = verbosity
self.next_trials = collections.deque()
self.num_points_seen_by_model = 0
self.random_search = sherpa.algorithms.RandomSearch()
self.domain = []
self.max_num_trials = max_num_trials
self.count = 0
def get_suggestion(self, parameters, results, lower_is_better):
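        """
        Returns the next suggested hyperparameter configuration. Queued
        initial data points are served first; once enough trials have
        completed, a fresh GPyOpt batch replaces the queue; a random
        suggestion is used as a fallback whenever the queue is empty.
        """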
self.count += 1
if self.max_num_trials and self.count > self.max_num_trials:
return None
# setup
if self._num_initial_data_points == -1:
self._num_initial_data_points = self._infer_num_initial_data_points(
self.num_initial_data_points,
parameters)
self.next_trials.extend(
self._process_initial_data_points(self.initial_data_points,
parameters))
num_completed_trials = self._num_completed_trials(results)
if (num_completed_trials >= self._num_initial_data_points
and num_completed_trials > self.num_points_seen_by_model):
# generate a new batch from bayes opt and set it as next
# observations
# clear previous batch since new data is available
self.next_trials.clear()
X, y, y_var = self._prepare_data_for_bayes_opt(parameters, results)
domain = self._initialize_domain(parameters)
batch = self._generate_bayesopt_batch(X, y, lower_is_better, domain)
batch_list_of_dicts = self._reverse_to_sherpa_format(batch,
parameters)
self.next_trials.extend(batch_list_of_dicts)
self.num_points_seen_by_model = num_completed_trials
if len(self.next_trials) == 0:
random_trial = self.random_search.get_suggestion(parameters,
results,
lower_is_better)
self.next_trials.append(random_trial)
return self.next_trials.popleft()
@classmethod
def _num_completed_trials(cls, results):
return (len(results.query("Status == 'COMPLETED'"))
if results is not None and len(results) > 0 else 0)
def _generate_bayesopt_batch(self, X, y, lower_is_better, domain):
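        """
        Fits a GPyOpt BayesianOptimization model to (X, y) and requests a
        batch of max_concurrent locations via local penalization. The sign
        of y is flipped when lower_is_better is False, since the optimizer
        is always configured to minimize.
        """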
y_adjusted = y * (-1)**(not lower_is_better)
bo_step = gpyopt_package.methods.BayesianOptimization(f=None,
domain=domain,
X=X, Y=y_adjusted,
acquisition_type=self.acquisition_type,
evaluator_type='local_penalization',
batch_size=self.max_concurrent,
verbosity=self.verbosity,
maximize=False,
exact_feval=False,
model_type=self.model_type)
return bo_step.suggest_next_locations()
def get_best_pred(self, parameters, results, lower_is_better):
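        """
        Returns the hyperparameter setting that the current model predicts
        to be best, or an empty dict if fewer than the required number of
        initial trials have completed.
        """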
if self._num_completed_trials(results) >= self._num_initial_data_points:
X, y, y_var = self._prepare_data_for_bayes_opt(parameters, results)
domain = self._initialize_domain(parameters)
best_pred = self._generate_best_predicted(X, y, lower_is_better, domain)
list_of_dict = self._reverse_to_sherpa_format(best_pred,
parameters)
return list_of_dict[0]
else:
return {}
def _generate_best_predicted(self, X, y, lower_is_better, domain):
"""
        This is a workaround that uses GPyOpt's acquisition-function optimizer
        to obtain the parameter setting the model predicts to be best: with
        the exploration weight set to zero, the LCB acquisition reduces to
        the posterior mean.
"""
y_adjusted = y * (-1)**(not lower_is_better)
bo = gpyopt_package.methods.BayesianOptimization(f=None,
domain=domain,
X=X,
Y=y_adjusted,
acquisition_type='LCB',
batch_size=1,
verbosity=self.verbosity,
maximize=False,
exact_feval=False,
model_type=self.model_type)
bo.acquisition.exploration_weight = 0.
return bo.suggest_next_locations()
@staticmethod
def _infer_num_initial_data_points(num_initial_data_points,
parameters):
"""
        Infers the number of initial data points, or warns and overrides the
        user's setting if it is smaller than the number of parameters.
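
        Example (illustrative, with three parameters)::

            _infer_num_initial_data_points('infer', parameters)  # -> 4
            _infer_num_initial_data_points(2, parameters)        # warns, -> 4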
"""
if num_initial_data_points == 'infer':
return len(parameters) + 1
elif num_initial_data_points >= len(parameters):
return num_initial_data_points
else:
warnings.warn("num_initial_data_points < number of "
"parameters found. Setting "
"num_initial_data_points to "
"len(parameters)+1.", UserWarning)
return len(parameters) + 1
@staticmethod
def _process_initial_data_points(initial_data_points, parameters):
"""
        Turns initial_data_points into a list of dicts (if given as a
        pandas.DataFrame) and ensures that every defined parameter has a
        setting in each of the initial data points.
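
        Example (illustrative)::

            _process_initial_data_points([{'lr': 0.01}],
                                         [Continuous('lr', [1e-4, 1e-1])])
            # -> [{'lr': 0.01}]; raises ValueError if 'lr' were missing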
"""
if isinstance(initial_data_points, pandas.DataFrame):
_initial_data_points = list(initial_data_points.T.to_dict().values())
else:
_initial_data_points = initial_data_points
for p in parameters:
if not all(p.name in data_point
for data_point in _initial_data_points):
raise ValueError("Missing parameter in initial_data_point. Check that you " \
"included all specified hyperparameters.")
return _initial_data_points
@staticmethod
def _prepare_data_for_bayes_opt(parameters, results):
"""
Turn historical data from Sherpa results dataframe into design matrix
X and objective values y to be consumed by GPyOpt.
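
        Example (a minimal sketch)::

            results = pandas.DataFrame({'Status': ['COMPLETED', 'COMPLETED'],
                                        'x': [0.1, 0.2],
                                        'Objective': [1.0, 0.5]})
            X, y, y_var = GPyOpt._prepare_data_for_bayes_opt(
                [Continuous('x', [0., 1.])], results)
            # X -> [[0.1], [0.2]], y -> [[1.0], [0.5]], y_var -> None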
"""
completed = results.query("Status == 'COMPLETED'")
X = numpy.zeros((len(completed), len(parameters)))
for i, p in enumerate(parameters):
transform = ParameterTransform.from_parameter(p)
historical_data = completed[p.name]
X[:, i] = transform.sherpa_format_to_gpyopt_design_format(
historical_data)
y = numpy.array(completed.Objective).reshape((-1, 1))
if 'ObjectiveStdErr' in completed.columns:
y_var = numpy.array(completed.ObjectiveStdErr).reshape((-1, 1))
else:
y_var = None
return X, y, y_var
@staticmethod
def _initialize_domain(parameters):
"""
Turn Sherpa parameter definitions into GPyOpt parameter definitions.
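
        Example of the resulting GPyOpt domain (illustrative)::

            _initialize_domain([Continuous('lr', [1e-4, 1e-1], scale='log'),
                                Choice('act', ['relu', 'tanh'])])
            # -> [{'name': 'lr', 'type': 'continuous', 'domain': (-4.0, -1.0)},
            #     {'name': 'act', 'type': 'categorical', 'domain': array([0, 1])}]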
"""
domain = []
for p in parameters:
domain.append(
ParameterTransform.from_parameter(p).to_gpyopt_domain())
return domain
@staticmethod
def _reverse_to_sherpa_format(X_next, parameters):
"""
Turn design matrix from GPyOpt back into a list of dictionaries with
Sherpa-style parameters.
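
        Example (illustrative, for parameters [Continuous('lr', [1e-4, 1e-1],
        scale='log'), Choice('act', ['relu', 'tanh'])])::

            _reverse_to_sherpa_format(numpy.array([[-2.0, 1.0]]), parameters)
            # -> [{'lr': 0.01, 'act': 'tanh'}]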
"""
col_dict = {}
for i, p in enumerate(parameters):
transform = ParameterTransform.from_parameter(p)
col_dict[p.name] = transform.gpyopt_design_format_to_list_in_sherpa_format(X_next[:, i])
return list(pandas.DataFrame(col_dict).T.to_dict().values())
class ParameterTransform(object):
"""
    ParameterTransform base class; ``from_parameter`` creates the correct
    transform subclass for a given Sherpa parameter.
"""
def __init__(self, parameter):
self.parameter = parameter
@staticmethod
def from_parameter(parameter):
if isinstance(parameter, Choice) or isinstance(parameter, Ordinal):
return ChoiceTransform(parameter)
elif isinstance(parameter, Continuous):
if parameter.scale == 'log':
return LogContinuousTransform(parameter)
else:
return ContinuousTransform(parameter)
elif isinstance(parameter, Discrete):
if parameter.scale == 'log':
warnings.warn("GPyOpt discrete parameter does not "
"support log-scale.", UserWarning)
return DiscreteTransform(parameter)
def to_gpyopt_domain(self):
raise NotImplementedError
def gpyopt_design_format_to_list_in_sherpa_format(self, x):
raise NotImplementedError
def sherpa_format_to_gpyopt_design_format(self, x):
raise NotImplementedError
class ContinuousTransform(ParameterTransform):
"""
Transforms/reverses Continuous variables.
"""
def to_gpyopt_domain(self):
return {'name': self.parameter.name,
'type': 'continuous',
'domain': tuple(self.parameter.range)}
def gpyopt_design_format_to_list_in_sherpa_format(self, x):
return x
def sherpa_format_to_gpyopt_design_format(self, x):
return x
class LogContinuousTransform(ParameterTransform):
"""
Transforms/reverses Continuous variables if on log-scale.
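
    Example (illustrative)::

        t = LogContinuousTransform(Continuous('lr', [1e-4, 1e-1], scale='log'))
        t.to_gpyopt_domain()['domain']  # approximately (-4.0, -1.0)
        t.gpyopt_design_format_to_list_in_sherpa_format(-2.0)  # 0.01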
"""
def to_gpyopt_domain(self):
return {'name': self.parameter.name,
'type': 'continuous',
'domain': (numpy.log10(self.parameter.range[0]),
numpy.log10(self.parameter.range[1]))}
def gpyopt_design_format_to_list_in_sherpa_format(self, x):
return 10**x
def sherpa_format_to_gpyopt_design_format(self, x):
return numpy.log10(x)
class ChoiceTransform(ParameterTransform):
"""
Transforms/reverses Choice variables to numeric choices since GPyOpt
does not accept string choices.
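
    Example (illustrative)::

        t = ChoiceTransform(Choice('act', ['relu', 'tanh', 'sigmoid']))
        t.sherpa_format_to_gpyopt_design_format(['tanh', 'relu'])  # [1, 0]
        t.gpyopt_design_format_to_list_in_sherpa_format(
            numpy.array([2., 0.]))  # ['sigmoid', 'relu']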
"""
def to_gpyopt_domain(self):
return {'name': self.parameter.name, 'type': 'categorical',
'domain': numpy.array(range(len(self.parameter.range)))}
def gpyopt_design_format_to_list_in_sherpa_format(self, x):
return [self.parameter.range[int(elem)] for elem in x]
def sherpa_format_to_gpyopt_design_format(self, x):
return [self.parameter.range.index(elem) for elem in x]
class DiscreteTransform(ParameterTransform):
"""
    Transforms/reverses Discrete parameters for GPyOpt.
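
    Example (illustrative)::

        t = DiscreteTransform(Discrete('units', [32, 35]))
        t.to_gpyopt_domain()['domain']  # (32, 33, 34, 35)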
"""
def to_gpyopt_domain(self):
return {'name': self.parameter.name,
'type': 'discrete',
'domain': tuple(range(self.parameter.range[0],
self.parameter.range[1]+1))}
def gpyopt_design_format_to_list_in_sherpa_format(self, x):
return list(x.astype('int'))
def sherpa_format_to_gpyopt_design_format(self, x):
return x