#!/usr/bin/env python3 # -*- coding: utf-8 -*- """ Created on Wed Sep 30 14:23:23 2020 @author: tanya @description: an abstract class for selecting a machine learning pipeline from a space (deterministic or random) of parameter distributions over multiple pipelines. The selection is thought in such a way that a Trials object is being maintained during the tuning process from which one can retrieve the best pipeline so far as well as the entire tuning history if needed. Methods configure_cross_validation and configure_result_saving allow to use a custom cross-validation method and save the current best result in a file or database during training. Children classes: hyperopt and custom gridsearch. """ import pickle import os import sys import time import datetime import numpy as np import pandas as pd from copy import deepcopy from abc import ABC, abstractmethod, abstractproperty if (sys.version_info.major == 3) & (sys.version_info.minor >= 8): print("I have python version {}.{} and will import typing".format(sys.version_info.major, sys.version_info.minor)) from typing import Callable, TypedDict,\ Literal, Dict, Iterable, List, Tuple, Union else: # from typing_extensions import * print("I have python version {}.{} and will import typing_extensions".format(sys.version_info.major, sys.version_info.minor)) from typing_extensions import TypedDict import functools from sklearn.pipeline import Pipeline from sklearn.model_selection import cross_validate as sklearn_cross_validation from sklearn.metrics import make_scorer from hyperopt import STATUS_OK, STATUS_FAIL from cdplib.log import Log from cdplib.utils.ExceptionsHandler import ExceptionsHandler from cdplib.utils import LoadingUtils from cdplib.ml_validation import CVComposer sys.path.append(os.getcwd()) class SpaceElementType(TypedDict): name: str pipeline: Pipeline params: dict # TODO Tanya: add possibility to include confusion matrix in # additional metrics # check that cv object contains indices class PipelineSelector(ABC): """ An abstract class for selecting a machine learning pipeline from a space (deterministic or random) of parameter distributions over multiple pipelines. The selection is though in such a way that a Trials object is being maintained during the tuning process from which one can retrieve the best pipeline so far as well as the entire tuning history if needed. Methods configure_cross_validation and configure_result_saving allow to use a custom cross-validation method and save the current best result in a file or database during training. Children classes: hyperopt and custom gridsearch. """ def __init__(self, # cost_func: Union[Callable, str], cost_func, greater_is_better: bool, trials_path: str, backup_trials_freq: int = None, cross_validation_needs_scorer: bool = True, # cross_val_averaging_func: Callable = np.mean, cross_val_averaging_func = np.mean, # additional_metrics: Dict[str, Callable] = None, additional_metrics = {}, # additional_averaging_funcs: Dict[str, Callable] = None, additional_averaging_funcs = None, strategy_name: str = None, # stdout_log_level: Literal["INFO", "WARNING", "ERROR"] # = "INFO") stdout_log_level = "INFO"): """ :param Callable cost_func: function to minimize or maximize over the elements of a given (pipeline/hyperparameter) space :param bool greater_is_better: when True cost_func is maximized, else minimized. :param str trials_path: path at which the trials object is saved in binary format. From the trials object we can select information about the obtained scores, score variations, and pipelines, and parameters tried out so far. If a trials object already exists at the given path, it is loaded and the search is continued, else, the search is started from scratch. :param backup_trials_freq: frequecy in interations (trials) of saving the trials object at the trials_path. if None, the trials object is backed up avery time the score improves. :param Callable cross_val_averaging_func: Function to aggregate the cross-validation scores of the cost_func. Example different from the mean: mean - c*var. :param additional_metics: dict of additional metrics to keep track of in the trials of the form {"metric_name": metric}. :param additional_averaging_funcs: functions used to aggregate the output of the cross_validate function. The output always contains the scores of the cost_func, additional_metrics (if it is not empty), but it can also contain additional information (like probability threshold for example) if different from cross_val_averaging_func. Of the form {"metric_name": averaging_func} Remark: :param str strategy_name: a strategy is defined by the data set (columns/features and rows), cv object, cost function. When the strategy changes, one must start with new trials. :param str stdout_log_level: can be INFO, WARNING, ERROR """ self._logger = Log("PipelineSelector: ", stdout_log_level=stdout_log_level) try: ExceptionsHandler(self._logger)\ .assert_is_directory(path=trials_path) self.attached_space = False self.attached_data = False self.configured_cross_validation = False self.configured_summary_saving = False self._cost_func = cost_func self._greater_is_better = greater_is_better # score factor is 1 when cost_func is minimized, # -1 when cost func is maximized self._score_factor = (not greater_is_better) - greater_is_better self._cross_val_averaging_func = cross_val_averaging_func self._additional_metrics = additional_metrics self._additional_averaging_funcs = additional_averaging_funcs or {} self.trials_path = trials_path self._backup_trials_freq = backup_trials_freq self._strategy_name = strategy_name self._data_path = None self._cv_path = None self._X = None self._y = None self._cv = None self._space = None # if cross-valition is not configured, # sklearn cross-validation method is taken by default self._cross_validation = sklearn_cross_validation self._cross_validation_needs_scorer = cross_validation_needs_scorer # if a trials object already exists at the given path, # it is loaded and the search is continued. Else, # the search is started from the beginning. if os.path.isfile(self.trials_path): with open(self.trials_path, "rb") as f: self._trials = pickle.load(f) if len(self._trials) == 0: self._trials = None else: self._trials = None if self._trials is not None: self._start_iteration = self.number_of_trials self.best_score = self.best_trial_score self._logger.info(("Loaded an existing trials object" "Consisting of {} trials") .format(self._start_iteration)) else: self._logger.warning(("No existing trials object was found, " "Starting from scratch.")) self._trials = None self._start_iteration = 0 self.best_score = np.nan # keeping track of the current search iteration self._iteration = self._start_iteration self._score_improved = False self.start_tuning_time = datetime.datetime.today() self.total_tuning_time = None self.finished_tuning = False except Exception as e: err = ("Failed to initialize the class. " "Exit with error: {}".format(e)) self._logger.log_and_raise_error(err) def _backup_trials(self) -> None: ''' Pickles (Saves) the trials object in binary format. ''' try: with open(self.trials_path, "wb") as f: pickle.dump(self._trials, f) except Exception as e: err = "Could not backup trials. Exit with error: {}".format(e) self._logger.log_and_raise_error(err) def configure_cross_validation(self, # cross_validation: Callable, cross_validation, kwargs: dict = None) -> None: """ Method for attaching a custom cross-validation function :param cross_validation: a function that has the same signature as sklearn.model_selection.cross_validate """ try: kwargs = kwargs or {} self._cross_validation = functools.partial( cross_validation, **kwargs) self.configured_cross_validation = True self._logger.info("Configured cross validation") except Exception as e: err = ("Failed to configure cross-validation. " "Exit with error: {}".format(e)) self._logger.log_and_raise_error(err) def configure_cross_validation_from_module(self, module_path: str, name: str) -> None: """ Attaches a cross-validation funciton defined in a different python model. This function must have the same signature as sklearn.model_seclection.cross_validate :param str module_path: path to python module where the cross_validation function is defined. :param str name: name of the cross validation function loaded froma python module. """ try: self._cross_validation = \ LoadingUtils().load_from_module( module_path=module_path, name=name) self.configured_cross_validation = True self._logger.info("Configured cross validation") except Exception as e: err = ("Failed to load cross-validation from module. " "Exit with error: {}".format(e)) self._logger.log_and_raise_error(err) def attach_space(self, space) -> None: """ Method for attaching the pipeline/hyperparameter space over which the score_func is optimized. :param space: space where the search is performed. A space might be either a list of dictionaries or a hyperopt space object the elements of which are dictionaries with keys: name, pipeline, params """ try: self._space = space self.attached_space = True self._logger.info("Attached parameter distribution space") except Exception as e: err = ("Failed to attach space. " "Exit with error: {}".format(e)) self._logger.log_and_raise_error(err) def attach_space_from_module(self, module_path: str, name: str) -> None: """ Attaches a space defined in a different python module. :param str module_path: path to python module where the space is defined. :param str name: name of the space loaded from a python module. """ try: self._space = LoadingUtils().load_from_module( module_path=module_path, name=name) self.attached_space = True self._logger.info("Attached parameter distribution space") except Exception as e: err = ("Failed to attach space from module. " "Exit with error {}".format(e)) self._logger.loger_and_raise_error(err) def attach_data(self, # X_train: Union[pd.DataFrame, np.ndarray], # y_train: Union[pd.DataFrame, pd.Series, np.ndarray] # = None, # X_val: Union[pd.DataFrame, np.ndarray] # = None, # y_val: Union[pd.DataFrame, pd.Series, np.ndarray] # = None, # cv: Union[Iterable[Tuple[List[int], List[int]]]] # = None X_train, y_train = None, X_val = None, y_val = None, cv = None) -> None: ''' :param array X_train: data on which machine learning pipelines are trained :param array y_train: optional, vector with targets, (None in case of unsupervided learning) :param array X_val: optional, validation data. When not provided, cross-validated value of the cost_func is calculated. :param array y_val: optional, validation targets :param list cv: iterabe of tuples containing train and validation indices or an integer representing the number of folds for a random split of data during cross-validation example: [([0,1,2], [3,4]), ([1,2,3], [4,5])] ''' try: assert((cv is None) == (X_val is not None)),\ "Either cv or X_val must be provided" if cv is None: assert((y_val is None) == (y_train is None)),\ "y_train and y_val must be simultanious" # Here we create a trivial cv object # with one validation split. # XXX Tanya finish here cv = CVComposer.dummy_cv() train_inds = list(range(len(X_train))) val_inds = list(range(len(X_train), len(X_train) + len(X_val))) self._cv = [(train_inds, val_inds)] self._X = np.concatenate([X_train, X_val]) self._y = None if y_train is None\ else np.concatenate([y_train, y_val]) else: self._cv = cv self._X = X_train self._y = y_train self.attached_data = True self._logger.info("Attached data") except Exception as e: err = ("Failed to attach data. " "Exit with error: {}".format(e)) self._logger.log_and_raise_error(err) def attach_data_from_hdf5(self, data_hdf5_store_path: str, cv_pickle_path: str = None) -> None: """ Method for attaching data from a hdf5 store and a cv object from a pickled file. The hdf5 store is a binary file, after loading it, it is a dictionary with keys X_train (y_train, X_val, y_val). The cv is loaded from a pickle file. The reason to separate the data store from the cv store, is the hdf5 is optimized to store large dataframes (especially with simple types) and a a small list of lists like a cv-object is better to be stored as a pickle file. :param str data_hdf5_store_path: path to the hdf5 store with train and validation data :param str cv_pickle_path: path to the pickle file with the cv data """ try: assert(os.path.isfile(data_hdf5_store_path)),\ "Parameter hdf5_store_path is not a file" # close all opened files, because hdf5 will # fail to reopen an opened (for some reason) file import tables tables.file._open_files.close_all() store = pd.HDFStore(data_hdf5_store_path) self._data_path = data_hdf5_store_path data_input = {key: store[key] if key in store else None for key in ["X_train", "y_train", "X_val", "y_val"]} if cv_pickle_path is not None: assert(os.path.isfile(cv_pickle_path)),\ "Parameter cv_pickle_path is not a file" data_input["cv"] = pickle.load(open(cv_pickle_path, "rb")) self._cv_path = cv_pickle_path else: data_input["cv"] = None self.attach_data(**data_input) store.close() except Exception as e: err = "Failed to attach data. Exit with error: {}".format(e) self._logger.log_and_raise_error(err) @property def default_summary(self) -> dict: """ Default summary of the strategy. Every the _objective function is called the current score and the information about the tested space element is added to the summary and it is saved to the Trials. If summary saving is configured it is also saved to a file, or a database when the score improves. """ summary = {} if self._strategy_name is not None: summary["strategy_name"] = self._strategy_name if isinstance(self._cost_func, str): summary["cost_func"] = self._cost_func elif hasattr(self._cost_func, "__name__"): summary["cost_func"] = self._cost_func.__name__ summary["trials_path"] = self.trials_path if self._data_path is not None: summary["data_path"] = self._data_path if self._cv_path is not None: summary["cv_path"] = self._cv_path summary["start_tuning_time"] = self.start_tuning_time summary["iteration"] = self._iteration return summary def configer_summary_saving(self, # save_method: Callable save_method = functools.partial( pd.DataFrame.to_excel, **{"path_or_buf": "result.csv"}), kwargs: dict = None) -> None: """ When the score calculated by _objective function improves, the default summary is updated with information about the current score and pipeline/hyperparameters and can be saved to a file or database, depending on the configured save_method. :param Callable save_method: method for saving the result of the pipeline selection. The method must accept a pandas DataFrame as argument. By default, saving to an excel file. Examples: functools.partial(pd.DataFrame.to_csv, **{"path_or_buf": }) functools.partial(np.savetxt, **{"fname": }) functools.partial(SQLHandler().append_to_table, **{"tablename": }) functools.partial(MongodbHandler().insert_data_into_collection, **{"collection_name": }) using functools can be avoided by providing the kwarg argument :param dict kwargs: a dictionary with keyword arguments (like tablename) to provide to the save_method """ try: kwargs = kwargs or {} self._save_method = functools.partial(save_method, **kwargs) self.configured_summary_saving = True self._logger.info("Configured summary saving") except Exception as e: err = ("Failed to configure the summary saving. " "Exit with error {}".format(e)) self._logger.log_and_raise_error(err) def _save_summary(self, summary: dict) -> None: """ When the score calculated by _objective function improves, the default summary is updated with information about the current score and pipeline/hyperparameters and can be saved to a file or database, depending on the configured save_method. """ try: assert(self.configured_summary_saving),\ "Result saving must be configured first" self._save_method(summary) except Exception as e: err = ("Could not configure summary saving. " "Exit with error: {}".format(e)) self._logger.log_and_raise_error(err) def _evaluate(self, pipeline: Pipeline) :#-> Union[Dict[str, float], None]: """ Calculates the averaged cross-validated score and score variance, as well as the averaged values and variances of the additional metrics. This method is called in the _objective function that is passed to the hyperopt optimizer. This function can be overriden, when the cost needs to be calculated differently, for example with a tensorflow model. :param Pipeline pipeline: machine learning pipeline that will be evaluated with cross-validation :return: dictionary with the aggregated cross-validation scores and the score variances for the scores in the output of the cross-validation function. form of the output: {"score": 10, #score used in optimization, "score_variance": 0.5 "additional_metric1": 5, "additional_metric1_variance": 7} a custom cross-validation function can also include for example probability threshold for each fold, then the output of this function will include the average value and the variance of the probability threshold over the folds. """ try: # scoring = {"score": self._cost_func} | self._additional_metrics scoring = {"score": self._cost_func, **self._additional_metrics} if self._cross_validation_needs_scorer: for metric_name, metric in scoring.items(): scoring[metric_name] = make_scorer( metric, greater_is_better=self._greater_is_better) cross_validation_input_args = { "estimator": pipeline, "X": self._X, "y": self._y, "cv": self._cv, "scoring": scoring } if "error_score" in self._cross_validation.__annotations__: cross_validation_input_args["error_score"] = np.nan scores = self._cross_validation(**cross_validation_input_args) averaging_funcs = { metric_name: self._additional_averaging_funcs[metric_name] if metric_name in self._additional_averaging_funcs else self._cross_val_averaging_func for metric_name in scores} scores_average = { metric_name.replace("test_", ""): averaging_funcs[metric_name](scores[metric_name]) for metric_name in scores if metric_name.startswith("test")} scores_variance = { metric_name.replace("test_", "") + "_variance": np.var(scores[metric_name]) for metric_name in scores if metric_name.startswith("test")} return {**scores_average, **scores_variance} except Exception as e: err = "Failed to evaluate pipeline. Exit with error: {}".format(e) self._logger.log_and_raise_error(err) def _objective(self, space_element: SpaceElementType) -> dict: ''' This method is called in run_trials method that is using the hyperopt fmin opmizer. Uses _evaluate method. It must take as input a space element and produce an output in the form of dictionary with 2 obligatory values loss and status (STATUS_OK or STATUS_FAIL). Other values in the output are optional and can be accessed later through the trials object. :Warning: fmin minimizes the loss, when _evaluate returns a value to be maximized, it is multiplied by -1 to obtain loss. :param SpaceElementType space_element: element of the space over which the optimization is done :output: dictionary with keys loss (minimized value), status with values STATUS_OK or STATUS_FAIL uderstood by hyperopt, score (equal to loss or -loss), score_variance, timestamp (end of execution), train_time: execution time and other keys given in self.default_summary ''' try: start_time = time.time() assert(self.attached_data),\ ("Data must be attached in order " "in order to effectuate the best" "pipeline search") summary = deepcopy(self.default_summary) # backup the current trials if the score improved # at previous iteration or every ith iteration # if the backup_trials_freq is set backup_cond = ((self._backup_trials_freq is not None) and ((self._iteration - self._start_iteration - 1) % self._backup_trials_freq == 0)) or\ self._score_improved if backup_cond: self._backup_trials() self._score_improved = False pipeline = space_element['pipeline'] params = space_element['params'] pipeline.set_params(**params) self._logger.info(("Iteration {0}: " "Current score is {1}: " "Training pipeline {2} " "with parameters: {3}. ").format( self._iteration, self.best_score, space_element['name'], params)) result = self._evaluate(pipeline) summary.update(result) end_time = time.time() summary['status'] = STATUS_OK summary.update(result) summary['loss'] = self._score_factor * summary['score'] summary['timestamp'] = datetime.datetime.today() summary['train_time'] = end_time - start_time self._iteration += 1 self._score_improved = (self.best_score != self.best_score) or\ (self._score_factor*result["score"] < self._score_factor*self.best_score) if self._score_improved: self._logger.info("Score improved, new best score is: {}" .format(result["score"])) self.best_score = result['score'] if self.configured_summary_saving: self._save_summary(summary) except Exception as e: self._logger.warning("Trial failed with error {}".format(e)) summary = {} summary['status'] = STATUS_FAIL summary['timestamp'] = datetime.datetime.today() summary['error'] = e for key in ['loss', 'score', 'score_variance', 'train_time']: summary[key] = np.nan return summary @abstractmethod def run_trials(self): """ Method that runs the hyperparameter tuning over possibly multiple pipeline types specified in self.space When run_trials method is finished the flag self.finished_tuning should be set to True and the methods self._backup_trials and optionally self._save_result should be called. """ pass @abstractproperty def number_of_trials(self) -> int: """ Number of trials already run in the current trials object """ pass @abstractproperty def best_trial(self) -> dict: """ Best trial sor far. Should contain the status, pipeline, hyperparameters, and the score (loss). Other information is otional and is defined by self.default_summary """ pass @abstractproperty def best_trial_score(self) -> float: """ Score of the best pipeline with the best hyperparameters """ pass @abstractproperty def best_trial_score_variance(self) -> float: """ Variance of the cross-validation score of the best pipeline """ pass @abstractproperty def best_trial_pipeline(self) -> Pipeline: """ Best pipeline with best hyperparameters """ pass @abstractmethod def get_n_best_trial_pipelines(self, n: int) -> list: """ N best pipelines with corresponding best hyperparameters """ pass @abstractmethod def get_n_best_trial_pipelines_of_each_type(self, n_int) -> list: """ If the hyperparameter search is done over multiple pipelines, then returns n different pipeline-types with corresponding hyperparameters """ pass @abstractmethod def trials_to_excel(self, path: str) -> None: """ Trials object in the shape of table written to excel, should contain the iteration, pipeline (as str), hyperparamters (as str), self.best_result (see self._objective method) as well as additional information defined by self.default_summary """ pass