|
@@ -5,11 +5,15 @@ Created on Wed Sep 30 14:23:23 2020
|
|
|
|
|
|
@author: tanya
|
|
|
@description: an abstract class for selecting a machine learning
|
|
|
- pipeline in a space of parameter distributions over multiple pipelines.
|
|
|
- The selection is though in such a way that a Trials object is being
|
|
|
+ pipeline from a space (deterministic or random) of parameter distributions
|
|
|
+ over multiple pipelines.
|
|
|
+ The selection is thought in such a way that a Trials object is being
|
|
|
maintained during the tuning process from which one can retrieve
|
|
|
- the best pipeline so far as well as the entire tuning history
|
|
|
- if needed.
|
|
|
+ the best pipeline so far
|
|
|
+ as well as the entire tuning history if needed.
|
|
|
+ Methods configure_cross_validation and configure_result_saving
|
|
|
+ allow the use of a custom cross-validation method and
|
|
|
+ save the current best result in a file or database during training.
|
|
|
Children classes: hyperopt and custom gridsearch.
|
|
|
"""
|
|
|
|
|
@@ -18,12 +22,13 @@ import os
|
|
|
import sys
|
|
|
import time
|
|
|
import datetime
|
|
|
-from typing import Callable
|
|
|
import numpy as np
|
|
|
import pandas as pd
|
|
|
from abc import ABC, abstractmethod, abstractproperty
|
|
|
+from typing import Callable
|
|
|
+import functools
|
|
|
from sklearn.pipeline import Pipeline
|
|
|
-from sklearn.model_selection import cross_validate as sklearn_cross_validator
|
|
|
+from sklearn.model_selection import cross_validate as sklearn_cross_validation
|
|
|
from sklearn.metrics import make_scorer
|
|
|
from hyperopt import STATUS_OK, STATUS_FAIL
|
|
|
from cdplib.log import Log
|
|
@@ -36,20 +41,28 @@ sys.path.append(os.getcwd())
|
|
|
class PipelineSelector(ABC):
|
|
|
"""
|
|
|
An abstract class for selecting a machine learning
|
|
|
- pipeline in a space of parameter distributions over multiple pipelines.
|
|
|
+ pipeline from a space (deterministic or random) of parameter
|
|
|
+ distributions over multiple pipelines.
|
|
|
The selection is though in such a way that a Trials object is being
|
|
|
maintained during the tuning process from which one can retrieve
|
|
|
the best pipeline so far as well as the entire tuning history
|
|
|
if needed.
|
|
|
+ Methods configure_cross_validation and configure_result_saving
|
|
|
+ allow the use of a custom cross-validation method and
|
|
|
+ save the current best result in a file or database during training.
|
|
|
+ Children classes: hyperopt and custom gridsearch.
|
|
|
"""
|
|
|
def __init__(self,
|
|
|
- cost_func,
|
|
|
+ cost_func: (Callable, str),
|
|
|
greater_is_better: bool,
|
|
|
trials_path: str,
|
|
|
- backup_trials_freq: int = 1,
|
|
|
- cross_val_averaging_func: callable = None):
|
|
|
- '''
|
|
|
- :param callable cost_func: function to minimize or maximize
|
|
|
+ backup_trials_freq: int = None,
|
|
|
+ cross_val_averaging_func: Callable = None,
|
|
|
+ additional_metrics: dict = None,
|
|
|
+ strategy_name: str = None,
|
|
|
+ stdout_log_level: str = "INFO"):
|
|
|
+ """
|
|
|
+ :param Callable cost_func: function to minimize or maximize
|
|
|
|
|
|
:param bool greater_is_better: when True
|
|
|
cost_func is maximized, else minimized.
|
|
@@ -64,67 +77,98 @@ class PipelineSelector(ABC):
|
|
|
|
|
|
:param backup_trials_freq: frequecy in interations (trials)
|
|
|
of saving the trials object at the trials_path.
|
|
|
+ if None, the trials object is backed up every time
|
|
|
+ the score improves.
|
|
|
|
|
|
:param str log_path: Optional, when not provided logs to stdout.
|
|
|
|
|
|
- :param callable cross_val_averaging_func: optional,
|
|
|
+ :param Callable cross_val_averaging_func: optional,
|
|
|
when not provided set to mean. Function
|
|
|
to aggregate the cross-validated values of the cost function.
|
|
|
Classic situation is to take the mean,
|
|
|
another example is, for example mean() - c*var().
|
|
|
- '''
|
|
|
- self._logger = Log("PipelineSelector: ")
|
|
|
+
|
|
|
+ :param additional_metrics: dict of additional metrics to save
|
|
|
+ of the form {"metric_name": metric} where metric is a Callable.
|
|
|
+
|
|
|
+ :param str strategy_name: a name might be assigned to the trials,
|
|
|
+ a strategy is defined by the data set, cv object, cost function.
|
|
|
+ When the strategy changes, one should start with new trials.
|
|
|
+
|
|
|
+ :param str stdout_log_level: can be INFO, WARNING, ERROR
|
|
|
+ """
|
|
|
+ self._logger = Log("PipelineSelector: ",
|
|
|
+ stdout_log_level=stdout_log_level)
|
|
|
|
|
|
input_errors = [(cost_func, Callable,
|
|
|
- "Parameter 'cost_func' must be a callable"),
|
|
|
+ "Parameter 'cost_func' must be a Callable"),
|
|
|
(greater_is_better, bool,
|
|
|
"Parameter 'greater_is_better' must be bool type"),
|
|
|
(trials_path, str,
|
|
|
"Parameter 'trials_path' must be of string type"),
|
|
|
(cross_val_averaging_func, (Callable, None.__class__),
|
|
|
("Parameter 'cross_val_averaging_func'"
|
|
|
- "must be a callable")),
|
|
|
- (backup_trials_freq, int,
|
|
|
- "Parameter backup_trials_freq must be an int")]
|
|
|
+ "must be a Callable")),
|
|
|
+ (backup_trials_freq, (int, None.__class__),
|
|
|
+ "Parameter backup_trials_freq must be an int"),
|
|
|
+ (additional_metrics, (dict, None.__class__),
|
|
|
+ "Parameter additional_metrics must be a dict"),
|
|
|
+ (strategy_name, (str, None.__class__),
|
|
|
+ "Parameter strategy_name must be a str"),
|
|
|
+ (stdout_log_level, str,
|
|
|
+ "Parameter stdout_log_level must be a str")]
|
|
|
|
|
|
for p, t, err in input_errors:
|
|
|
try:
|
|
|
assert(isinstance(p, t))
|
|
|
except AssertionError:
|
|
|
- self._logger.log_and_raise_error(err)
|
|
|
+ self._logger.log_and_raise_error(err, ErrorType=NameError)
|
|
|
+
|
|
|
+ try:
|
|
|
+ assert((additional_metrics is None) or
|
|
|
+ all([isinstance(metric, Callable)
|
|
|
+ for metric in additional_metrics.values()]))
|
|
|
+ except AssertionError:
|
|
|
+ err = "Metrics in additional_metrics must be Callables"
|
|
|
+ self._logger.log_and_raise_error(err, ErrorType=NameError)
|
|
|
|
|
|
ExceptionsHandler(self._logger).assert_is_directory(path=trials_path)
|
|
|
|
|
|
self._cost_func = cost_func
|
|
|
- # is 1 when cost_func is minimized, -1 when cost func is maximized
|
|
|
+ # score factor is 1 when cost_func is minimized,
|
|
|
+ # -1 when cost func is maximized
|
|
|
self._score_factor = (not greater_is_better) - greater_is_better
|
|
|
- self._trials_path = trials_path
|
|
|
+ self.trials_path = trials_path
|
|
|
self._backup_trials_freq = backup_trials_freq
|
|
|
self._cross_val_averaging_func = cross_val_averaging_func or np.mean
|
|
|
- # keeping track of the current search iteration
|
|
|
- self._run_number = 0
|
|
|
- # space and data need to be attached to perform search.
|
|
|
- self._space_attached = False
|
|
|
- self._data_attached = False
|
|
|
- self._cross_validator_attached = False
|
|
|
- # _best_score is the same as best_trial_score property
|
|
|
- # but is defined in order not to go through all the trials
|
|
|
- # at each iteration.
|
|
|
- self._best_score = np.nan
|
|
|
+ self._additional_metrics = additional_metrics or {}
|
|
|
+ self._strategy_name = strategy_name
|
|
|
+ self._data_path = None
|
|
|
+ self._cv_path = None
|
|
|
+
|
|
|
+ # best_score can be also read from trials
|
|
|
+ # but is kept explicitly in order not to
|
|
|
+ # search through the trials object every time
|
|
|
+ # loss is the opposite of score
|
|
|
+ self.best_score = np.nan
|
|
|
+
|
|
|
+ self._cross_validation = sklearn_cross_validation
|
|
|
|
|
|
# if a trials object already exists at the given path,
|
|
|
# it is loaded and the search is continued. Else,
|
|
|
# the search is started from the beginning.
|
|
|
- if os.path.isfile(trials_path):
|
|
|
+ if os.path.isfile(self.trials_path):
|
|
|
try:
|
|
|
- with open(trials_path, "rb") as f:
|
|
|
+ with open(self.trials_path, "rb") as f:
|
|
|
self._trials = pickle.load(f)
|
|
|
|
|
|
- self._best_score = self.best_trial_score
|
|
|
+ self._start_iteration = self.number_of_trials
|
|
|
+
|
|
|
+ self.best_score = self.best_trial_score
|
|
|
|
|
|
self._logger.info(("Loaded an existing trials object"
|
|
|
"Consisting of {} trials")
|
|
|
- .format(len(self._trials.trials)))
|
|
|
+ .format(self._start_iteration))
|
|
|
|
|
|
except Exception as e:
|
|
|
err = ("Trials object could not be loaded. "
|
|
@@ -137,75 +181,142 @@ class PipelineSelector(ABC):
|
|
|
"Starting from scratch."))
|
|
|
|
|
|
self._trials = None
|
|
|
+ self._start_iteration = 0
|
|
|
+
|
|
|
+ self.attached_space = False
|
|
|
+ self.attached_data = False
|
|
|
+ self.configured_cross_validation = False
|
|
|
+ self.configured_summary_saving = False
|
|
|
+
|
|
|
+ # keeping track of the current search iteration
|
|
|
+ self._iteration = self._start_iteration
|
|
|
+ self._score_improved = False
|
|
|
+
|
|
|
+ self.start_tuning_time = datetime.datetime.today()
|
|
|
+ self.end_tuning_time = None
|
|
|
+ self.finished_tuning = False
|
|
|
|
|
|
def _backup_trials(self):
|
|
|
'''
|
|
|
Pickles (Saves) the trials object.
|
|
|
Used in a scheduler.
|
|
|
'''
|
|
|
- with open(self._trials_path, "wb") as f:
|
|
|
- pickle.dump(self._trials, f)
|
|
|
+ try:
|
|
|
+ with open(self.trials_path, "wb") as f:
|
|
|
+ pickle.dump(self._trials, f)
|
|
|
+ except Exception as e:
|
|
|
+ err = "Could not backup trials. Exit with error: {}".format(e)
|
|
|
+ self._logger.log_and_raise_error(err)
|
|
|
|
|
|
- def attach_cross_validator(self, cross_validator: Callable = None,
|
|
|
- module_path: str = None,
|
|
|
- name: str = None):
|
|
|
+ def configure_cross_validation(self,
|
|
|
+ cross_validation: Callable,
|
|
|
+ kwargs: dict = None):
|
|
|
"""
|
|
|
Method for attaching a custom cross-validation function
|
|
|
- :param cross_validator: a function that has the same
|
|
|
+ :param cross_validation: a function that has the same
|
|
|
signature as sklearn.model_selection.cross_validate
|
|
|
+ """
|
|
|
+ try:
|
|
|
+ assert(isinstance(cross_validation, Callable))
|
|
|
+ except AssertionError:
|
|
|
+ err = "Parameter cross_validation must be a function"
|
|
|
+ self._logger.log_and_raise_error(err, ErrorType=NameError)
|
|
|
+
|
|
|
+ try:
|
|
|
+ kwargs = kwargs or {}
|
|
|
+ assert(isinstance(kwargs, dict))
|
|
|
+ except AssertionError:
|
|
|
+ err = "Paramter kwargs must be a dict"
|
|
|
+ self._logger.log_and_raise_error(err, ErrorType=NameError)
|
|
|
+
|
|
|
+ try:
|
|
|
+ self._cross_validation = functools.partial(
|
|
|
+ self._cross_validation, **kwargs)
|
|
|
+
|
|
|
+ self.configured_cross_validation = True
|
|
|
+
|
|
|
+ if hasattr(cross_validation, "__name__"):
|
|
|
+ self.best_result["cross_validation"] =\
|
|
|
+ cross_validation.__name__
|
|
|
+
|
|
|
+ self._logger.info("Configured cross validation")
|
|
|
+
|
|
|
+ except Exception as e:
|
|
|
+ err = ("Failed to configure cross-validation. "
|
|
|
+ "Exit with error: {}".format(e))
|
|
|
+ self._logger.log_and_raise_error(err)
|
|
|
+
|
|
|
+ def configure_cross_validation_from_module(self,
|
|
|
+ module_path: str,
|
|
|
+ name: str):
|
|
|
+ """
|
|
|
:param str module_path: path to python module
|
|
|
- where the space is defined. Optional when
|
|
|
- the space is provided directly.
|
|
|
+ where the cross_validation function is defined.
|
|
|
|
|
|
- :param str name: name of the space loaded from
|
|
|
- a python module. Optional when the space
|
|
|
- is provided directly.
|
|
|
+ :param str name: name of the cross validation function
|
|
|
+ loaded from a python module.
|
|
|
"""
|
|
|
try:
|
|
|
- assert((cross_validator is not None) or
|
|
|
- ((module_path is not None) and (name is not None)))
|
|
|
+ assert(isinstance(module_path, str) and
|
|
|
+ isinstance(name, str))
|
|
|
except AssertionError:
|
|
|
- err = ("Either cross_validator or "
|
|
|
- "(module_path, name) must be provided")
|
|
|
- self._logger.log_and_raise_error(err)
|
|
|
+ err = "Parameters module_path and name must be of str type"
|
|
|
+ self._logger.log_and_raise_error(err, ErrorType=NameError)
|
|
|
|
|
|
- self._cross_validator = cross_validator or\
|
|
|
- LoadingUtils().load_from_module(module_path=module_path, name=name)
|
|
|
+ try:
|
|
|
+ self._cross_validation = \
|
|
|
+ LoadingUtils().load_from_module(
|
|
|
+ module_path=module_path, name=name)
|
|
|
|
|
|
- self._logger.info("Attached a cross validator")
|
|
|
- self._cross_validator_attached = True
|
|
|
+ self.configured_cross_validation = True
|
|
|
|
|
|
- def attach_space(self, space=None,
|
|
|
- module_path: str = None,
|
|
|
- name: str = None):
|
|
|
- '''
|
|
|
+ self.best_result["cross_validation"] = name
|
|
|
+
|
|
|
+ self._logger.info("Configured cross validation")
|
|
|
+
|
|
|
+ except Exception as e:
|
|
|
+ err = ("Failed to load cross-validation from module. "
|
|
|
+ "Exit with error: {}".format(e))
|
|
|
+ self._logger.log_and_raise_error(e)
|
|
|
+
|
|
|
+ def attach_space(self, space):
|
|
|
+ """
|
|
|
:param space: space where
|
|
|
- the search is performed. Optional when a space
|
|
|
- is loaded from a python module. A space might be either
|
|
|
+ the search is performed. A space might be either
|
|
|
a list of dictionaries or a hyperopt space object
|
|
|
the elements of which are dictionaries with keys:
|
|
|
name, pipeline, params
|
|
|
+ """
|
|
|
+ self._space = space
|
|
|
+ self._logger.info("Attached parameter distribution space")
|
|
|
+ self.attached_space = True
|
|
|
|
|
|
+ def attach_space_from_module(self, module_path: str, name: str):
|
|
|
+ """
|
|
|
:param str module_path: path to python module
|
|
|
- where the space is defined. Optional when
|
|
|
- the space is provided directly.
|
|
|
+ where the space is defined.
|
|
|
|
|
|
:param str name: name of the space loaded from
|
|
|
- a python module. Optional when the space
|
|
|
- is provided directly.
|
|
|
- '''
|
|
|
+ a python module.
|
|
|
+ """
|
|
|
try:
|
|
|
- assert((space is not None) or
|
|
|
- ((module_path is not None) and (name is not None)))
|
|
|
+ assert(isinstance(module_path, str) and
|
|
|
+ isinstance(name, str))
|
|
|
except AssertionError:
|
|
|
- err = "Either space or (module_path, name) must be provided"
|
|
|
- self._logger.log_and_raise_error(err)
|
|
|
+ err = "Parameters module_path and name must be of str type"
|
|
|
+ self._logger.log_and_raise_error(err, ErrorType=NameError)
|
|
|
|
|
|
- self._space = space or LoadingUtils().load_from_module(
|
|
|
- module_path=module_path, name=name)
|
|
|
+ try:
|
|
|
+ self._space = LoadingUtils().load_from_module(
|
|
|
+ module_path=module_path, name=name)
|
|
|
|
|
|
- self._logger.info("Attached parameter distribution space")
|
|
|
- self._space_attached = True
|
|
|
+ self._logger.info("Attached parameter distribution space")
|
|
|
+
|
|
|
+ self.attached_space = True
|
|
|
+ except Exception as e:
|
|
|
+ err = ("Failed to attach space from module. "
|
|
|
+ "Exit with error {}".format(e))
|
|
|
+ self._logger.loger_and_raise_error(err)
|
|
|
|
|
|
def attach_data(self, X_train: (pd.DataFrame, np.ndarray),
|
|
|
y_train: (pd.DataFrame, pd.Series, np.ndarray) = None,
|
|
@@ -242,25 +353,28 @@ class PipelineSelector(ABC):
|
|
|
isinstance(y_train, (pd.Series, np.ndarray,
|
|
|
pd.DataFrame, NoneType)) and
|
|
|
isinstance(y_val, (pd.Series, np.ndarray)) and
|
|
|
- ((y_val is None) if (y_train is None)
|
|
|
- else (y_val is not None)))
|
|
|
+ (y_val is None) == (y_train is None))
|
|
|
except AssertionError:
|
|
|
self._logger.log_and_raise_error(input_err)
|
|
|
|
|
|
- # cost is evaluated with a cross validation function
|
|
|
- # that accepts an array and a cv object with
|
|
|
- # indices of the fold splits.
|
|
|
- # Here we create a trivial cv object
|
|
|
- # with one validation split.
|
|
|
-
|
|
|
- train_inds = list(range(len(X_train)))
|
|
|
- val_inds = list(range(len(X_train),
|
|
|
- len(X_train) + len(X_val)))
|
|
|
-
|
|
|
- self._cv = [(train_inds, val_inds)]
|
|
|
- self._X = np.concatenate([X_train, X_val])
|
|
|
- self._y = None if y_train is None\
|
|
|
- else np.concatenate([y_train, y_val])
|
|
|
+ try:
|
|
|
+ # cost is evaluated with a cross validation function
|
|
|
+ # that accepts an array and a cv object with
|
|
|
+ # indices of the fold splits.
|
|
|
+ # Here we create a trivial cv object
|
|
|
+ # with one validation split.
|
|
|
+
|
|
|
+ train_inds = list(range(len(X_train)))
|
|
|
+ val_inds = list(range(len(X_train),
|
|
|
+ len(X_train) + len(X_val)))
|
|
|
+
|
|
|
+ self._cv = [(train_inds, val_inds)]
|
|
|
+ self._X = np.concatenate([X_train, X_val])
|
|
|
+ self._y = None if y_train is None\
|
|
|
+ else np.concatenate([y_train, y_val])
|
|
|
+ except Exception as e:
|
|
|
+ err = "Failed to attach data. Exit with error: {}".format(e)
|
|
|
+ self._logger.log_and_raise_error(err)
|
|
|
|
|
|
else:
|
|
|
try:
|
|
@@ -276,10 +390,139 @@ class PipelineSelector(ABC):
|
|
|
self._y = y_train
|
|
|
|
|
|
self._logger.info("Attached data")
|
|
|
- self._data_attached = True
|
|
|
+ self.attached_data = True
|
|
|
|
|
|
- def _evaluate(self, pipeline: Pipeline) -> dict:
|
|
|
- '''
|
|
|
+ def attach_data_from_hdf5(self,
|
|
|
+ data_hdf5_store_path: str,
|
|
|
+ cv_pickle_path: str = None):
|
|
|
+ """
|
|
|
+ Method for attaching data from a hdf5 store.
|
|
|
+ The hdf5 store is a binary file,
|
|
|
+ after loading it, it is a dictionary with keys
|
|
|
+ X_train (y_train, X_val, y_val). The cv is loaded
|
|
|
+ from a pickle file. The reason to separate the data
|
|
|
+ store from the cv store, is the hdf5 is optimized to
|
|
|
+ store large dataframes (especially with simple types) and
|
|
|
+ a small list of lists like a cv-object is better
|
|
|
+ to be stored as a pickle file.
|
|
|
+ :param str data_hdf5_store_path: path to the hdf5 store
|
|
|
+ with train and validation data
|
|
|
+ :param str cv_pickle_path: path to the pickle file with
|
|
|
+ the cv data
|
|
|
+ """
|
|
|
+ try:
|
|
|
+ assert(os.path.isfile(data_hdf5_store_path))
|
|
|
+ except AssertionError:
|
|
|
+ err = "Parameter hdf5_store_path is not a file"
|
|
|
+ self._logger.log_and_raise_error(err, ErrorType=NameError)
|
|
|
+
|
|
|
+ # load the hdf5 store
|
|
|
+ try:
|
|
|
+ store = pd.HDFStore(data_hdf5_store_path)
|
|
|
+ self._data_path = data_hdf5_store_path
|
|
|
+ except Exception as e:
|
|
|
+ err = "Could not load the hdf5 store. Exit with error: {}."\
|
|
|
+ .format(e)
|
|
|
+ self._logger.log_and_raise_error(err)
|
|
|
+
|
|
|
+ data_input = {}
|
|
|
+
|
|
|
+ for key in ["/X_train", "/y_train", "/X_val", "/y_val"]:
|
|
|
+ if key not in store.keys():
|
|
|
+ data_input[key.replace("/", "")] = None
|
|
|
+ else:
|
|
|
+ data_input[key.replace("/", "")] = store[key]
|
|
|
+
|
|
|
+ if cv_pickle_path is not None:
|
|
|
+ try:
|
|
|
+ assert(os.path.isfile(cv_pickle_path))
|
|
|
+ except AssertionError:
|
|
|
+ err = "Parameter hdf5_store_path is not a file"
|
|
|
+ self._logger.log_and_raise_error(err, ErrorType=NameError)
|
|
|
+
|
|
|
+ try:
|
|
|
+ data_input["cv"] = pickle.load(open(cv_pickle_path, "rb"))
|
|
|
+ self._cv_path = cv_pickle_path
|
|
|
+ except Exception as e:
|
|
|
+ err = "Could not load the pickeled cv. Exit with error: {}."\
|
|
|
+ .format(e)
|
|
|
+ self._logger.log_and_raise_error(err)
|
|
|
+ else:
|
|
|
+ data_input["cv"] = None
|
|
|
+
|
|
|
+ self.attach_data(**data_input)
|
|
|
+
|
|
|
+ store.close()
|
|
|
+
|
|
|
+ def configer_summary_saving(self,
|
|
|
+ save_method: Callable = None,
|
|
|
+ kwargs: dict = None):
|
|
|
+ """
|
|
|
+ Attaching a method for saving information about
|
|
|
+ the trials/space/strategy and the result of
|
|
|
+ the current best pipeline. This method can
|
|
|
+ save the result in a txt or a json file,
|
|
|
+ or in a database for example. Arguments like
|
|
|
+ file path or the table name can be specified in kwargs.
|
|
|
+ :param Callable save_method: method for saving the result
|
|
|
+ of the pipeline selection. The method must accept
|
|
|
+ a pandas DataFrame as argument. See self._save_summary
|
|
|
+ method for the format of the argument being saved.
|
|
|
+ By default, saving to a csv file.
|
|
|
+ Examples:
|
|
|
+ functools.partial(pd.DataFrame.to_csv,
|
|
|
+ **{"path_or_buf": <PATH>})
|
|
|
+ functools.partial(np.savetxt, **{"fname": <PATH>})
|
|
|
+
|
|
|
+ functools.partial(SQLHandler(<URI>).append_to_table,
|
|
|
+ **{"tablename": <NAME>})
|
|
|
+
|
|
|
+ functools.partial(MongodbHandler(<URI>).insert_data_into_collection,
|
|
|
+ **{"collection_name": <NAME>})
|
|
|
+
|
|
|
+ using functools can be avoided by providing the kwarg argument
|
|
|
+ :param dict kwargs: a dictionary with keyword arguments
|
|
|
+ (like tablename) to provide to the save_method
|
|
|
+ """
|
|
|
+ try:
|
|
|
+ save_method = save_method or functools.partial(
|
|
|
+ pd.DataFrame.to_excel, **{"path_or_buf": "result.csv"})
|
|
|
+
|
|
|
+ kwargs = kwargs or {}
|
|
|
+
|
|
|
+ self._save_method = functools.partial(save_method, **kwargs)
|
|
|
+
|
|
|
+ self.configured_summary_saving = True
|
|
|
+
|
|
|
+ self._logger.info("Configured summary saving")
|
|
|
+
|
|
|
+ except Exception as e:
|
|
|
+ err = ("Failed to configure the summary saving. "
|
|
|
+ "Exit with error {}".format(e))
|
|
|
+ self._logger.log_and_raise_error(err)
|
|
|
+
|
|
|
+ def _save_summary(self, summary: dict):
|
|
|
+ """
|
|
|
+ """
|
|
|
+ try:
|
|
|
+ assert(self.configured_summary_saving)
|
|
|
+ except AssertionError:
|
|
|
+ err = "Result saving must be configured first"
|
|
|
+ self._logger.log_and_raise_error(err, ErrorType=AssertionError)
|
|
|
+
|
|
|
+ try:
|
|
|
+ self._save_method(summary)
|
|
|
+
|
|
|
+ except Exception as e:
|
|
|
+ err = ("Could not configure summary saving. "
|
|
|
+ "Exit with error: {}".format(e))
|
|
|
+
|
|
|
+ self._logger.log_and_raise_error(err)
|
|
|
+
|
|
|
+ def _evaluate(self, pipeline: Pipeline,
|
|
|
+ scoring: Callable = None,
|
|
|
+ cross_validation: Callable = None) -> dict:
|
|
|
+ """
|
|
|
This method is called in _objective.
|
|
|
|
|
|
Calculates the cost on the attached data.
|
|
@@ -289,24 +532,46 @@ class PipelineSelector(ABC):
|
|
|
|
|
|
:param Pipeline pipeline: machine learning pipeline
|
|
|
that will be evaluated with cross-validation
|
|
|
+ :param cross_validation: a function that has the same
|
|
|
+ signature as sklearn.model_selection.cross_validate
|
|
|
|
|
|
- :output: dictionary with the aggregated
|
|
|
+ :return: dictionary with the aggregated
|
|
|
cross-validation score and
|
|
|
the score variance.
|
|
|
- '''
|
|
|
- if not self._cross_validator_attached:
|
|
|
- self._cross_validator = sklearn_cross_validator
|
|
|
+ """
|
|
|
+ try:
|
|
|
+
|
|
|
+ scoring = {"score": make_scorer(self._cost_func)}
|
|
|
+
|
|
|
+ scoring.update({metric_name: make_scorer(metric)
|
|
|
+ for metric_name, metric
|
|
|
+ in self._additional_metrics.items()})
|
|
|
+
|
|
|
+ scores = self._cross_validation(
|
|
|
+ estimator=pipeline,
|
|
|
+ X=self._X,
|
|
|
+ y=self._y,
|
|
|
+ cv=self._cv or 5,
|
|
|
+ scoring=scoring,
|
|
|
+ error_score=np.nan)
|
|
|
+
|
|
|
+ scores_average = {
|
|
|
+ metric_name.replace("test_", ""):
|
|
|
+ self._cross_val_averaging_func(scores[metric_name])
|
|
|
+ for metric_name in scores
|
|
|
+ if metric_name.startswith("test")}
|
|
|
+
|
|
|
+ scores_variance = {
|
|
|
+ metric_name.replace("test_", "") + "_variance":
|
|
|
+ np.var(scores[metric_name])
|
|
|
+ for metric_name in scores
|
|
|
+ if metric_name.startswith("test")}
|
|
|
|
|
|
- scores = self._cross_validator(
|
|
|
- estimator=pipeline,
|
|
|
- X=self._X,
|
|
|
- y=self._y,
|
|
|
- cv=self._cv or 5,
|
|
|
- scoring=make_scorer(self._cost_func),
|
|
|
- error_score=np.nan)
|
|
|
+ return {**scores_average, **scores_variance}
|
|
|
|
|
|
- return {'value': self._cross_val_averaging_func(scores['test_score']),
|
|
|
- 'variance': np.var(scores['test_score'])}
|
|
|
+ except Exception as e:
|
|
|
+ err = "Failed to evaluate pipeline. Exit with error: {}".format(e)
|
|
|
+ self._logger.log_and_raise_error(err)
|
|
|
|
|
|
def _objective(self, space_element: dict) -> dict:
|
|
|
'''
|
|
@@ -354,79 +619,137 @@ class PipelineSelector(ABC):
|
|
|
|
|
|
start_time = time.time()
|
|
|
|
|
|
- if not self._data_attached:
|
|
|
+ try:
|
|
|
+ assert(self.attached_data)
|
|
|
+ except AssertionError:
|
|
|
err = ("Data must be attached in order "
|
|
|
"in order to effectuate the best"
|
|
|
"pipeline search")
|
|
|
self._logger.log_and_raise_error(err)
|
|
|
|
|
|
- self._run_number += 1
|
|
|
+ summary = {}
|
|
|
+
|
|
|
+ if self._strategy_name is not None:
|
|
|
+ summary["strategy_name"] = self._strategy_name
|
|
|
+
|
|
|
+ if isinstance(self._cost_func, str):
|
|
|
+ summary["cost_func"] = self._cost_func
|
|
|
+
|
|
|
+ elif hasattr(self._cost_func, "__name__"):
|
|
|
+ summary["cost_func"] = self._cost_func.__name__
|
|
|
+
|
|
|
+ summary["trials_path"] = self.trials_path
|
|
|
+
|
|
|
+ if self._data_path is not None:
|
|
|
+ summary["data_path"] = self._data_path
|
|
|
+
|
|
|
+ if self._cv_path is not None:
|
|
|
+ summary["cv_path"] = self._cv_path
|
|
|
|
|
|
- pipeline = space_element['pipeline']
|
|
|
- params = space_element['params']
|
|
|
- pipeline.set_params(**params)
|
|
|
+ summary["start_tuning_time"] = self.start_tuning_time
|
|
|
|
|
|
- self._logger.info(("Run number {0}: "
|
|
|
- "Current score is {1}: "
|
|
|
- "Training pipeline {2} "
|
|
|
- "with parameters: {3}. ").format(
|
|
|
- self._run_number,
|
|
|
- self._best_score,
|
|
|
- space_element['name'],
|
|
|
- params))
|
|
|
+ summary["iteration"] = self._iteration
|
|
|
+
|
|
|
+ backup_cond = (self._backup_trials_freq is not None) and\
|
|
|
+ ((self._iteration - self._start_iteration - 1) %
|
|
|
+ self._backup_trials_freq == 0) or\
|
|
|
+ self._score_improved
|
|
|
+
|
|
|
+ if backup_cond:
|
|
|
+ self._backup_trials()
|
|
|
+ self._score_improved = False
|
|
|
|
|
|
try:
|
|
|
+ pipeline = space_element['pipeline']
|
|
|
+ params = space_element['params']
|
|
|
+ pipeline.set_params(**params)
|
|
|
+
|
|
|
+ self._logger.info(("Iteration {0}: "
|
|
|
+ "Current score is {1}: "
|
|
|
+ "Training pipeline {2} "
|
|
|
+ "with parameters: {3}. ").format(
|
|
|
+ self._iteration,
|
|
|
+ self.best_score,
|
|
|
+ space_element['name'],
|
|
|
+ params))
|
|
|
+
|
|
|
result = self._evaluate(pipeline)
|
|
|
|
|
|
- assert(not np.isnan(result["value"]))
|
|
|
+ summary.update(result)
|
|
|
|
|
|
- if self._run_number % self._backup_trials_freq == 0:
|
|
|
- self._backup_trials()
|
|
|
+ end_time = time.time()
|
|
|
|
|
|
- if (self._best_score != self._best_score) or\
|
|
|
- self._score_factor*result["value"] <\
|
|
|
- self._score_factor*self._best_score:
|
|
|
+ assert(not np.isnan(result["score"])),\
|
|
|
+ "Score value is not in the output of the _evaluate method"
|
|
|
|
|
|
- self._logger.info("Score got better, new best score is: {}"
|
|
|
- .format(result["value"]))
|
|
|
+ summary['status'] = STATUS_OK
|
|
|
+ summary.update(result)
|
|
|
+ summary['loss'] = self._score_factor * summary['score']
|
|
|
+ summary['timestamp'] = datetime.datetime.today()
|
|
|
+ summary['train_time'] = end_time - start_time
|
|
|
|
|
|
- self._best_score = result['value']
|
|
|
+ self._iteration += 1
|
|
|
|
|
|
- end_time = time.time()
|
|
|
+ self._score_improved = (self.best_score != self.best_score) or\
|
|
|
+ (self._score_factor*result["score"] <
|
|
|
+ self._score_factor*self.best_score)
|
|
|
|
|
|
- return {'loss': self._score_factor * result["value"],
|
|
|
- 'status': STATUS_OK,
|
|
|
- 'score': result["value"],
|
|
|
- 'score_variance': result["variance"],
|
|
|
- 'timestamp': datetime.datetime.today(),
|
|
|
- 'train_time': end_time - start_time}
|
|
|
+ if self._score_improved:
|
|
|
+
|
|
|
+ self._logger.info("Score improved, new best score is: {}"
|
|
|
+ .format(result["score"]))
|
|
|
+
|
|
|
+ self.best_score = result['score']
|
|
|
+
|
|
|
+ if self.configured_summary_saving:
|
|
|
+ self._save_summary(summary)
|
|
|
|
|
|
except Exception as e:
|
|
|
|
|
|
self._logger.warning("Trial failed with error {}".format(e))
|
|
|
|
|
|
- return {'loss': np.nan,
|
|
|
- 'status': STATUS_FAIL,
|
|
|
- 'score': np.nan,
|
|
|
- 'score_variance': np.nan,
|
|
|
- 'timestamp': datetime.datetime.today(),
|
|
|
- 'train_time': np.nan}
|
|
|
+ summary['status'] = STATUS_FAIL
|
|
|
+ summary['timestamp'] = datetime.datetime.today()
|
|
|
+ summary['error'] = e
|
|
|
+ for key in ['loss', 'score', 'score_variance', 'train_time']:
|
|
|
+ summary[key] = np.nan
|
|
|
+
|
|
|
+ return summary
|
|
|
|
|
|
@abstractmethod
|
|
|
def run_trials(self):
|
|
|
"""
|
|
|
+ Method that runs the hyperparameter tuning over possibly multiple
|
|
|
+ pipeline types specified in self.space
|
|
|
+ When run_trials method is finished the flag self.finished_tuning
|
|
|
+ should be set to True and the methods self._backup_trials and
|
|
|
+ optionally self._save_summary should be called.
|
|
|
"""
|
|
|
pass
|
|
|
|
|
|
@abstractproperty
|
|
|
- def best_trial(self) -> float:
|
|
|
+ def number_of_trials(self) -> int:
|
|
|
"""
|
|
|
+ Number of trials already run in the current trials object
|
|
|
+ """
|
|
|
+ pass
|
|
|
+
|
|
|
+ @abstractproperty
|
|
|
+ def best_trial(self) -> dict:
|
|
|
+ """
|
|
|
+ Best trial so far.
|
|
|
+ Should contain the best pipeline,
|
|
|
+ best hyperparameters,
|
|
|
+ as well as an output of the self._objective method,
|
|
|
+ but the exact form of the output depends on the implementation
|
|
|
+ of the Trials object.
|
|
|
"""
|
|
|
pass
|
|
|
|
|
|
@abstractproperty
|
|
|
def best_trial_score(self) -> float:
|
|
|
"""
|
|
|
+ Score of the best pipeline with the best hyperparameters
|
|
|
"""
|
|
|
pass
|
|
|
|
|
@@ -439,5 +762,34 @@ class PipelineSelector(ABC):
|
|
|
@abstractproperty
|
|
|
def best_trial_pipeline(self) -> Pipeline:
|
|
|
"""
|
|
|
+ Best pipeline with best hyperparameters
|
|
|
+ """
|
|
|
+ pass
|
|
|
+
|
|
|
+ @abstractmethod
|
|
|
+ def get_n_best_trial_pipelines(self, n: int) -> list:
|
|
|
+ """
|
|
|
+ N best pipelines with corresponding
|
|
|
+ best hyperparameters
|
|
|
+ """
|
|
|
+ pass
|
|
|
+
|
|
|
+ @abstractmethod
|
|
|
+ def get_n_best_trial_pipelines_of_each_type(self, n_int) -> list:
|
|
|
+ """
|
|
|
+ If the hyperparameter search is done over multiple
|
|
|
+ pipelines, then returns n different pipeline-types
|
|
|
+ with corresponding hyperparameters
|
|
|
+ """
|
|
|
+ pass
|
|
|
+
|
|
|
+ @abstractmethod
|
|
|
+ def trials_to_excel(self, path: str):
|
|
|
+ """
|
|
|
+ Trials object in the shape of a table written to excel,
|
|
|
+ should contain the iteration, pipeline (as str),
|
|
|
+ hyperparameters (as str), self.best_result (see self._objective method)
|
|
|
+ as well as additional information configured
|
|
|
+ through self.save_result method.
|
|
|
"""
|
|
|
pass
|