@@ -0,0 +1,824 @@
+#!/usr/bin/env python3
+# -*- coding: utf-8 -*-
+"""
+Created on Wed Sep 30 14:23:23 2020
+
+@author: tanya
+
+@description: an abstract class for selecting a machine learning
+ pipeline from a space (deterministic or random) of parameter distributions
+ over multiple pipelines.
+ The selection is designed in such a way that a Trials object is
+ maintained during the tuning process, from which one can retrieve
+ the best pipeline so far
+ as well as the entire tuning history if needed.
+ The methods configure_cross_validation and configure_summary_saving
+ allow using a custom cross-validation method and
+ saving the current best result to a file or database during training.
+ Child classes: hyperopt and custom gridsearch.
+"""
+
+import pickle
+import os
+import sys
+import time
+import datetime
+import numpy as np
+import pandas as pd
+from copy import deepcopy
+from abc import ABC, abstractmethod, abstractproperty
+
+if sys.version_info >= (3, 8):
+    from typing import Callable, TypedDict,\
+        Literal, Dict, Iterable, List, Tuple, Union
+else:
+    from typing_extensions import Callable, TypedDict,\
+        Literal, Dict, Iterable, List, Tuple, Union
+
+import functools
+from sklearn.pipeline import Pipeline
+from sklearn.model_selection import cross_validate as sklearn_cross_validation
+from sklearn.metrics import make_scorer
+from hyperopt import STATUS_OK, STATUS_FAIL
+from cdplib.log import Log
+from cdplib.utils import ExceptionsHandler
+from cdplib.utils import LoadingUtils
+from cdplib.ml_validation import CVComposer
+
+sys.path.append(os.getcwd())
+
+
+class SpaceElementType(TypedDict):
+    name: str
+    pipeline: Pipeline
+    params: dict
+
+# TODO Tanya: add possibility to include confusion matrix in
+# additional metrics
+# check that cv object contains indices
+
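+# Illustrative example (an assumption for documentation purposes, not
+# executed here): a space element conforming to SpaceElementType could be
+#
+#     {"name": "logistic_regression",
+#      "pipeline": Pipeline([("scaler", StandardScaler()),
+#                            ("clf", LogisticRegression())]),
+#      "params": {"clf__C": 1.0}}
+#
+# assuming StandardScaler and LogisticRegression are imported from sklearn.
+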
+class PipelineSelector(ABC):
+    """
+    An abstract class for selecting a machine learning
+    pipeline from a space (deterministic or random) of parameter
+    distributions over multiple pipelines.
+    The selection is designed in such a way that a Trials object is
+    maintained during the tuning process, from which one can retrieve
+    the best pipeline so far as well as the entire tuning history
+    if needed.
+    The methods configure_cross_validation and configure_summary_saving
+    allow using a custom cross-validation method and
+    saving the current best result to a file or database during training.
+    Child classes: hyperopt and custom gridsearch.
+    """
+    def __init__(self,
+                 cost_func: Union[Callable, str],
+                 greater_is_better: bool,
+                 trials_path: str,
+                 backup_trials_freq: int = None,
+                 cross_validation_needs_scorer: bool = True,
+                 cross_val_averaging_func: Callable = np.mean,
+                 additional_metrics: Dict[str, Callable] = None,
+                 additional_averaging_funcs: Dict[str, Callable] = None,
+                 strategy_name: str = None,
+                 stdout_log_level: Literal["INFO", "WARNING", "ERROR"]
+                 = "INFO"):
+        """
+        :param Callable cost_func: function to minimize or maximize
+            over the elements of a given (pipeline/hyperparameter) space
+
+        :param bool greater_is_better: when True
+            cost_func is maximized, else minimized.
+
+        :param str trials_path: path at which the trials object is saved
+            in binary format. From the trials object we can
+            select information about the obtained scores, score variations,
+            and the pipelines and parameters tried out so far. If a trials
+            object already exists at the given path, it is loaded and the
+            search is continued, else the search is started from scratch.
+
+        :param backup_trials_freq: frequency in iterations (trials)
+            of saving the trials object at the trials_path.
+            If None, the trials object is backed up every time
+            the score improves.
+
+        :param bool cross_validation_needs_scorer: if True, the cost_func
+            and the additional metrics are wrapped with sklearn make_scorer
+            before being passed to the cross-validation function.
+
+        :param Callable cross_val_averaging_func: function to aggregate
+            the cross-validation scores of the cost_func.
+            Example different from the mean: mean - c*var.
+
+        :param additional_metrics: dict of additional metrics to keep track of
+            in the trials of the form {"metric_name": metric}.
+
+        :param additional_averaging_funcs: functions used to aggregate
+            the output of the cross_validate function.
+            The output always contains the scores of the cost_func and of the
+            additional_metrics (if not empty),
+            but it can also contain additional information
+            (like a probability threshold, for example).
+            Only needed if different from cross_val_averaging_func.
+            Of the form {"metric_name": averaging_func}
+
+        :param str strategy_name:
+            a strategy is defined by the data set (columns/features and rows),
+            the cv object and the cost function.
+            When the strategy changes, one must start with new trials.
+
+        :param str stdout_log_level: can be INFO, WARNING, ERROR
+        """
+        self._logger = Log("PipelineSelector: ",
+                           stdout_log_level=stdout_log_level)
+
+        try:
+
+            ExceptionsHandler(self._logger)\
+                .assert_is_directory(path=trials_path)
+
+            self.attached_space = False
+            self.attached_data = False
+            self.configured_cross_validation = False
+            self.configured_summary_saving = False
+
+            self._cost_func = cost_func
+            self._greater_is_better = greater_is_better
+            # score factor is 1 when cost_func is minimized,
+            # -1 when cost_func is maximized
+            self._score_factor = (not greater_is_better) - greater_is_better
+            self._cross_val_averaging_func = cross_val_averaging_func
+            self._additional_metrics = additional_metrics or {}
+            self._additional_averaging_funcs = additional_averaging_funcs or {}
+
+            self.trials_path = trials_path
+            self._backup_trials_freq = backup_trials_freq
+
+            self._strategy_name = strategy_name
+            self._data_path = None
+            self._cv_path = None
+            self._X = None
+            self._y = None
+            self._cv = None
+            self._space = None
+
+            # if cross-validation is not configured,
+            # the sklearn cross-validation method is taken by default
+            self._cross_validation = sklearn_cross_validation
+
+            self._cross_validation_needs_scorer = cross_validation_needs_scorer
+
+            # if a trials object already exists at the given path,
+            # it is loaded and the search is continued. Else,
+            # the search is started from the beginning.
+            if os.path.isfile(self.trials_path):
+
+                with open(self.trials_path, "rb") as f:
+                    self._trials = pickle.load(f)
+
+                if len(self._trials) == 0:
+                    self._trials = None
+
+            else:
+                self._trials = None
+
+            if self._trials is not None:
+
+                self._start_iteration = self.number_of_trials
+
+                self.best_score = self.best_trial_score
+
+                self._logger.info(("Loaded an existing trials object "
+                                   "consisting of {} trials")
+                                  .format(self._start_iteration))
+
+            else:
+                self._logger.warning(("No existing trials object was found, "
+                                      "starting from scratch."))
+
+                self._trials = None
+                self._start_iteration = 0
+                self.best_score = np.nan
+
+            # keeping track of the current search iteration
+            self._iteration = self._start_iteration
+            self._score_improved = False
+
+            self.start_tuning_time = datetime.datetime.today()
+            self.total_tuning_time = None
+            self.finished_tuning = False
+
+        except Exception as e:
+            err = ("Failed to initialize the class. "
+                   "Exit with error: {}".format(e))
+
+            self._logger.log_and_raise_error(err)
+
+    def _backup_trials(self) -> None:
+        '''
+        Pickles (Saves) the trials object in binary format.
+        '''
+        try:
+            with open(self.trials_path, "wb") as f:
+                pickle.dump(self._trials, f)
+
+        except Exception as e:
+            err = "Could not backup trials. Exit with error: {}".format(e)
+            self._logger.log_and_raise_error(err)
+
+    def configure_cross_validation(self,
+                                   cross_validation: Callable,
+                                   kwargs: dict = None) -> None:
+        """
+        Method for attaching a custom cross-validation function
+
+        :param cross_validation: a function that has the same
+            signature as sklearn.model_selection.cross_validate
+        """
+        try:
+            kwargs = kwargs or {}
+
+            self._cross_validation = functools.partial(
+                cross_validation, **kwargs)
+
+            self.configured_cross_validation = True
+
+            self._logger.info("Configured cross validation")
+
+        except Exception as e:
+            err = ("Failed to configure cross-validation. "
+                   "Exit with error: {}".format(e))
+
+            self._logger.log_and_raise_error(err)
+
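+    # Illustrative usage sketch (assumes a concrete child instance
+    # `selector`): the sklearn cross_validate function can be reused with
+    # extra keyword arguments, e.g.
+    #
+    #     selector.configure_cross_validation(
+    #         cross_validation=sklearn_cross_validation,
+    #         kwargs={"n_jobs": -1, "return_train_score": False})
+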
+    def configure_cross_validation_from_module(self,
+                                               module_path: str,
+                                               name: str) -> None:
+        """
+        Attaches a cross-validation function defined in
+        a different python module. This function must have
+        the same signature as sklearn.model_selection.cross_validate
+
+        :param str module_path: path to the python module
+            where the cross_validation function is defined.
+
+        :param str name: name of the cross-validation function
+            loaded from the python module.
+        """
+        try:
+            self._cross_validation = \
+                LoadingUtils().load_from_module(
+                    module_path=module_path, name=name)
+
+            self.configured_cross_validation = True
+
+            self._logger.info("Configured cross validation")
+
+        except Exception as e:
+            err = ("Failed to load cross-validation from module. "
+                   "Exit with error: {}".format(e))
+
+            self._logger.log_and_raise_error(err)
+
+    def attach_space(self, space) -> None:
+        """
+        Method for attaching the pipeline/hyperparameter space
+        over which the cost_func is optimized.
+
+        :param space: space where
+            the search is performed. A space might be either
+            a list of dictionaries or a hyperopt space object
+            the elements of which are dictionaries with the keys:
+            name, pipeline, params
+        """
+        try:
+            self._space = space
+
+            self.attached_space = True
+
+            self._logger.info("Attached parameter distribution space")
+
+        except Exception as e:
+            err = ("Failed to attach space. "
+                   "Exit with error: {}".format(e))
+
+            self._logger.log_and_raise_error(err)
+
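+    # A minimal sketch of a deterministic space (estimator names are
+    # illustrative, not part of this module): a list of dictionaries with
+    # the keys name, pipeline, params. For a random search, a hyperopt
+    # space object (e.g. built with hp.choice) whose elements have the
+    # same structure can be attached instead.
+    #
+    #     space = [
+    #         {"name": "logistic_regression",
+    #          "pipeline": Pipeline([("clf", LogisticRegression())]),
+    #          "params": {"clf__C": 0.5}},
+    #         {"name": "random_forest",
+    #          "pipeline": Pipeline([("clf", RandomForestClassifier())]),
+    #          "params": {"clf__n_estimators": 100}}]
+    #
+    #     selector.attach_space(space)
+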
+    def attach_space_from_module(self, module_path: str, name: str) -> None:
+        """
+        Attaches a space defined in a different python module.
+
+        :param str module_path: path to the python module
+            where the space is defined.
+
+        :param str name: name of the space loaded from
+            the python module.
+        """
+        try:
+            self._space = LoadingUtils().load_from_module(
+                module_path=module_path, name=name)
+
+            self.attached_space = True
+
+            self._logger.info("Attached parameter distribution space")
+
+        except Exception as e:
+            err = ("Failed to attach space from module. "
+                   "Exit with error: {}".format(e))
+
+            self._logger.log_and_raise_error(err)
+
+    def attach_data(self, X_train: Union[pd.DataFrame, np.ndarray],
+                    y_train: Union[pd.DataFrame, pd.Series, np.ndarray]
+                    = None,
+                    X_val: Union[pd.DataFrame, np.ndarray]
+                    = None,
+                    y_val: Union[pd.DataFrame, pd.Series, np.ndarray]
+                    = None,
+                    cv: Union[Iterable[Tuple[List[int], List[int]]], int]
+                    = None) -> None:
+        '''
+        :param array X_train: data on which
+            machine learning pipelines are trained
+
+        :param array y_train: optional, vector with targets
+            (None in case of unsupervised learning)
+
+        :param array X_val: optional, validation data.
+            When not provided, the cross-validated value
+            of the cost_func is calculated.
+
+        :param array y_val: optional, validation targets
+
+        :param list cv: iterable of tuples containing
+            train and validation indices, or an integer representing
+            the number of folds for a random split of the data
+            during cross-validation.
+            Example: [([0,1,2], [3,4]), ([1,2,3], [4,5])]
+        '''
+        try:
+            assert((cv is None) == (X_val is not None)),\
+                "Either cv or X_val must be provided (but not both)"
+
+            if cv is None:
+
+                assert((y_val is None) == (y_train is None)),\
+                    "y_train and y_val must both be provided or both be None"
+
+                # Here we create a trivial cv object
+                # with one validation split.
+                # TODO Tanya: this could be delegated to CVComposer.dummy_cv
+
+                train_inds = list(range(len(X_train)))
+                val_inds = list(range(len(X_train),
+                                      len(X_train) + len(X_val)))
+
+                self._cv = [(train_inds, val_inds)]
+
+                self._X = np.concatenate([X_train, X_val])
+                self._y = None if y_train is None\
+                    else np.concatenate([y_train, y_val])
+
+            else:
+
+                self._cv = cv
+                self._X = X_train
+                self._y = y_train
+
+            self.attached_data = True
+
+            self._logger.info("Attached data")
+
+        except Exception as e:
+            err = ("Failed to attach data. "
+                   "Exit with error: {}".format(e))
+
+            self._logger.log_and_raise_error(err)
+
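+    # Illustrative usage (hypothetical arrays): either pass an explicit
+    # validation set
+    #
+    #     selector.attach_data(X_train, y_train, X_val=X_val, y_val=y_val)
+    #
+    # or pass the training data together with a cv object holding index
+    # splits, e.g.
+    #
+    #     selector.attach_data(X_train, y_train,
+    #                          cv=[([0, 1, 2], [3, 4]), ([1, 2, 3], [4, 5])])
+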
+    def attach_data_from_hdf5(self,
+                              data_hdf5_store_path: str,
+                              cv_pickle_path: str = None) -> None:
+        """
+        Method for attaching data from an hdf5 store
+        and a cv object from a pickled file.
+
+        The hdf5 store is a binary file;
+        after loading, it behaves like a dictionary with the keys
+        X_train (y_train, X_val, y_val).
+
+        The cv is loaded from a pickle file.
+
+        The reason to separate the data
+        store from the cv store is that hdf5 is optimized to
+        store large dataframes (especially with simple types), while
+        a small list of lists like a cv object is better
+        stored as a pickle file.
+
+        :param str data_hdf5_store_path: path to the hdf5 store
+            with train and validation data
+        :param str cv_pickle_path: path to the pickle file with
+            the cv data
+        """
+        try:
+            assert(os.path.isfile(data_hdf5_store_path)),\
+                "Parameter data_hdf5_store_path is not a file"
+
+            # close all opened files, because hdf5 will
+            # fail to reopen an already opened file
+            import tables
+            tables.file._open_files.close_all()
+
+            store = pd.HDFStore(data_hdf5_store_path)
+
+            self._data_path = data_hdf5_store_path
+
+            data_input = {key: store[key] if key in store else None
+                          for key in ["X_train", "y_train", "X_val", "y_val"]}
+
+            if cv_pickle_path is not None:
+
+                assert(os.path.isfile(cv_pickle_path)),\
+                    "Parameter cv_pickle_path is not a file"
+
+                with open(cv_pickle_path, "rb") as f:
+                    data_input["cv"] = pickle.load(f)
+
+                self._cv_path = cv_pickle_path
+
+            else:
+                data_input["cv"] = None
+
+            self.attach_data(**data_input)
+
+            store.close()
+
+        except Exception as e:
+            err = "Failed to attach data. Exit with error: {}".format(e)
+            self._logger.log_and_raise_error(err)
+
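+    # A sketch of how such a store and cv file could be produced beforehand;
+    # the paths and variable names are illustrative, the keys match the
+    # docstring above:
+    #
+    #     with pd.HDFStore("data.h5") as store:
+    #         store["X_train"] = X_train_df
+    #         store["y_train"] = y_train_series
+    #
+    #     with open("cv.pkl", "wb") as f:
+    #         pickle.dump([(train_inds, val_inds)], f)
+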
+    @property
+    def default_summary(self) -> dict:
+        """
+        Default summary of the strategy.
+        Every time the _objective function is called,
+        the current score and the information
+        about the tested space element are added to the
+        summary and it is saved to the Trials.
+        If summary saving is configured, it is also
+        saved to a file or a database when the score improves.
+        """
+        summary = {}
+
+        if self._strategy_name is not None:
+            summary["strategy_name"] = self._strategy_name
+
+        if isinstance(self._cost_func, str):
+            summary["cost_func"] = self._cost_func
+
+        elif hasattr(self._cost_func, "__name__"):
+            summary["cost_func"] = self._cost_func.__name__
+
+        summary["trials_path"] = self.trials_path
+
+        if self._data_path is not None:
+            summary["data_path"] = self._data_path
+
+        if self._cv_path is not None:
+            summary["cv_path"] = self._cv_path
+
+        summary["start_tuning_time"] = self.start_tuning_time
+
+        summary["iteration"] = self._iteration
+
+        return summary
+
+    def configure_summary_saving(self,
+                                 save_method: Callable
+                                 = functools.partial(
+                                     pd.DataFrame.to_excel,
+                                     **{"excel_writer": "result.xlsx"}),
+                                 kwargs: dict = None) -> None:
+        """
+        When the score calculated by the _objective function improves,
+        the default summary is updated with information about the
+        current score and pipeline/hyperparameters
+        and can be saved to a file or database, depending
+        on the configured save_method.
+
+        :param Callable save_method: method for saving the result
+            of the pipeline selection. The method must accept
+            a pandas DataFrame as argument.
+            By default, saving to an excel file.
+
+            Examples:
+            functools.partial(pd.DataFrame.to_csv,
+                              **{"path_or_buf": <PATH>})
+            functools.partial(np.savetxt, **{"fname": <PATH>})
+
+            functools.partial(SQLHandler(<URI>).append_to_table,
+                              **{"tablename": <NAME>})
+
+            functools.partial(MongodbHandler(<URI>).insert_data_into_collection,
+                              **{"collection_name": <NAME>})
+
+            using functools can be avoided by providing the kwargs argument
+
+        :param dict kwargs: a dictionary with keyword arguments
+            (like tablename) to provide to the save_method
+        """
+        try:
+            kwargs = kwargs or {}
+
+            self._save_method = functools.partial(save_method, **kwargs)
+
+            self.configured_summary_saving = True
+
+            self._logger.info("Configured summary saving")
+
+        except Exception as e:
+            err = ("Failed to configure the summary saving. "
+                   "Exit with error: {}".format(e))
+
+            self._logger.log_and_raise_error(err)
+
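+    # Illustrative usage: the same effect as the functools examples above
+    # can be achieved via the kwargs argument; the output path is an
+    # assumption.
+    #
+    #     selector.configure_summary_saving(
+    #         save_method=pd.DataFrame.to_excel,
+    #         kwargs={"excel_writer": "result.xlsx"})
+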
+    def _save_summary(self, summary: dict) -> None:
+        """
+        When the score calculated by the _objective function improves,
+        the default summary is updated with information about the
+        current score and pipeline/hyperparameters
+        and can be saved to a file or database, depending
+        on the configured save_method.
+        """
+        try:
+            assert(self.configured_summary_saving),\
+                "Result saving must be configured first"
+
+            self._save_method(pd.DataFrame([summary]))
+
+        except Exception as e:
+            err = ("Could not save the summary. "
+                   "Exit with error: {}".format(e))
+
+            self._logger.log_and_raise_error(err)
+
+    def _evaluate(self, pipeline: Pipeline) -> Union[Dict[str, float], None]:
+        """
+        Calculates the averaged cross-validated score and score variance,
+        as well as the averaged values and variances of the additional metrics.
+
+        This method is called in the _objective function that is
+        passed to the hyperopt optimizer.
+
+        This function can be overridden when the cost
+        needs to be calculated differently,
+        for example with a tensorflow model.
+
+        :param Pipeline pipeline: machine learning pipeline
+            that will be evaluated with cross-validation
+
+        :return: dictionary with the aggregated
+            cross-validation scores and
+            the score variances for the scores in the output
+            of the cross-validation function.
+
+            Form of the output:
+                {"score": 10,  # score used in optimization
+                 "score_variance": 0.5,
+                 "additional_metric1": 5,
+                 "additional_metric1_variance": 7}
+
+            A custom cross-validation function can also include, for
+            example, a probability threshold for each fold; then
+            the output of this function will include the average
+            value and the variance of the probability threshold
+            over the folds.
+        """
+        try:
+
+            scoring = {"score": self._cost_func, **self._additional_metrics}
+
+            if self._cross_validation_needs_scorer:
+                for metric_name, metric in scoring.items():
+                    scoring[metric_name] = make_scorer(
+                        metric, greater_is_better=self._greater_is_better)
+
+            cross_validation_input_args = {
+                "estimator": pipeline,
+                "X": self._X,
+                "y": self._y,
+                "cv": self._cv,
+                "scoring": scoring
+                }
+
+            if "error_score" in getattr(
+                    self._cross_validation, "__annotations__", {}):
+                cross_validation_input_args["error_score"] = np.nan
+
+            scores = self._cross_validation(**cross_validation_input_args)
+
+            averaging_funcs = {
+                metric_name: self._additional_averaging_funcs[metric_name]
+                if metric_name in self._additional_averaging_funcs
+                else self._cross_val_averaging_func
+                for metric_name in scores}
+
+            scores_average = {
+                metric_name.replace("test_", ""):
+                averaging_funcs[metric_name](scores[metric_name])
+                for metric_name in scores
+                if metric_name.startswith("test")}
+
+            scores_variance = {
+                metric_name.replace("test_", "") + "_variance":
+                np.var(scores[metric_name])
+                for metric_name in scores
+                if metric_name.startswith("test")}
+
+            return {**scores_average, **scores_variance}
+
+        except Exception as e:
+            err = "Failed to evaluate pipeline. Exit with error: {}".format(e)
+
+            self._logger.log_and_raise_error(err)
+
+    def _objective(self, space_element: SpaceElementType) -> dict:
+        '''
+        This method is called in the run_trials method
+        that is using the hyperopt fmin optimizer.
+
+        Uses the _evaluate method.
+
+        It must take as input a space element
+        and produce an output in the form of a dictionary
+        with 2 obligatory values, loss and status
+        (STATUS_OK or STATUS_FAIL). Other
+        values in the output are optional and can be
+        accessed later through the trials object.
+
+        :Warning: fmin minimizes the loss;
+        when _evaluate returns a value to be maximized,
+        it is multiplied by -1 to obtain the loss.
+
+        :param SpaceElementType space_element: element
+        of the space over which the optimization is done
+
+        :output: dictionary with keys
+            loss (minimized value),
+            status with values STATUS_OK or STATUS_FAIL
+            understood by hyperopt,
+            score (equal to loss or -loss),
+            score_variance,
+            timestamp (end of execution),
+            train_time: execution time
+            and other keys given in self.default_summary
+        '''
+        try:
+            start_time = time.time()
+
+            assert(self.attached_data),\
+                ("Data must be attached in order "
+                 "to run the best pipeline search")
+
+            summary = deepcopy(self.default_summary)
+
+            # backup the current trials if the score improved
+            # at the previous iteration or every ith iteration
+            # if the backup_trials_freq is set
+            backup_cond = ((self._backup_trials_freq is not None) and
+                           ((self._iteration - self._start_iteration - 1) %
+                            self._backup_trials_freq == 0)) or\
+                self._score_improved
+
+            if backup_cond:
+                self._backup_trials()
+                self._score_improved = False
+
+            pipeline = space_element['pipeline']
+            params = space_element['params']
+            pipeline.set_params(**params)
+
+            self._logger.info(("Iteration {0}: "
+                               "Current best score is {1}: "
+                               "Training pipeline {2} "
+                               "with parameters: {3}. ").format(
+                                   self._iteration,
+                                   self.best_score,
+                                   space_element['name'],
+                                   params))
+
+            result = self._evaluate(pipeline)
+
+            end_time = time.time()
+
+            summary['status'] = STATUS_OK
+            summary.update(result)
+            summary['loss'] = self._score_factor * summary['score']
+            summary['timestamp'] = datetime.datetime.today()
+            summary['train_time'] = end_time - start_time
+
+            self._iteration += 1
+
+            self._score_improved = (self.best_score != self.best_score) or\
+                                   (self._score_factor*result["score"] <
+                                    self._score_factor*self.best_score)
+
+            if self._score_improved:
+
+                self._logger.info("Score improved, new best score is: {}"
+                                  .format(result["score"]))
+
+                self.best_score = result['score']
+
+                if self.configured_summary_saving:
+                    self._save_summary(summary)
+
+        except Exception as e:
+
+            self._logger.warning("Trial failed with error {}".format(e))
+
+            summary = {}
+            summary['status'] = STATUS_FAIL
+            summary['timestamp'] = datetime.datetime.today()
+            summary['error'] = e
+            for key in ['loss', 'score', 'score_variance', 'train_time']:
+                summary[key] = np.nan
+
+        return summary
+
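+    # A sketch of how a hyperopt-based child class might implement
+    # run_trials around _objective (niter is an assumed argument of the
+    # child's run_trials, not part of this base class):
+    #
+    #     from hyperopt import fmin, tpe, Trials
+    #
+    #     self._trials = self._trials or Trials()
+    #
+    #     fmin(fn=self._objective,
+    #          space=self._space,
+    #          algo=tpe.suggest,
+    #          trials=self._trials,
+    #          max_evals=self._start_iteration + niter)
+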
+    @abstractmethod
+    def run_trials(self):
+        """
+        Method that runs the hyperparameter tuning over possibly multiple
+        pipeline types specified in self._space.
+        When the run_trials method is finished, the flag self.finished_tuning
+        should be set to True and the method self._backup_trials (and
+        optionally self._save_summary) should be called.
+        """
+        pass
+
+    @abstractproperty
+    def number_of_trials(self) -> int:
+        """
+        Number of trials already run in the current trials object
+        """
+        pass
+
+    @abstractproperty
+    def best_trial(self) -> dict:
+        """
+        Best trial so far.
+        Should contain the status, pipeline,
+        hyperparameters, and the score (loss).
+        Other information is optional and is defined
+        by self.default_summary
+        """
+        pass
+
+    @abstractproperty
+    def best_trial_score(self) -> float:
+        """
+        Score of the best pipeline with the best hyperparameters
+        """
+        pass
+
+    @abstractproperty
+    def best_trial_score_variance(self) -> float:
+        """
+        Variance of the cross-validation score of the best pipeline
+        """
+        pass
+
+    @abstractproperty
+    def best_trial_pipeline(self) -> Pipeline:
+        """
+        Best pipeline with best hyperparameters
+        """
+        pass
+
+    @abstractmethod
+    def get_n_best_trial_pipelines(self, n: int) -> list:
+        """
+        N best pipelines with corresponding
+        best hyperparameters
+        """
+        pass
+
+    @abstractmethod
+    def get_n_best_trial_pipelines_of_each_type(self, n: int) -> list:
+        """
+        If the hyperparameter search is done over multiple
+        pipelines, then returns n different pipeline types
+        with corresponding hyperparameters
+        """
+        pass
+
+    @abstractmethod
+    def trials_to_excel(self, path: str) -> None:
+        """
+        Trials object in the shape of a table written to excel;
+        should contain the iteration, pipeline (as str),
+        hyperparameters (as str), self.best_result (see self._objective method)
+        as well as additional information defined by self.default_summary
+        """
+        pass
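+
+
+# Illustrative end-to-end flow with a hypothetical concrete child class;
+# the class name and variables below are assumptions, not part of this
+# module:
+#
+#     selector = SomeHyperoptPipelineSelector(
+#         cost_func=accuracy_score,
+#         greater_is_better=True,
+#         trials_path="trials/trials.pkl")
+#
+#     selector.attach_space(space)
+#     selector.attach_data(X_train, y_train, cv=cv)
+#     selector.configure_summary_saving()  # optional
+#     selector.run_trials()
+#
+#     best_pipeline = selector.best_trial_pipeline
+#     best_score = selector.best_trial_score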