12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830 |
- #!/usr/bin/env python3
- # -*- coding: utf-8 -*-
- """
- Created on Wed Sep 30 14:23:23 2020
- @author: tanya
- @description: an abstract class for selecting a machine learning
- pipeline from a space (deterministic or random) of parameter distributions
- over multiple pipelines.
- The selection is designed in such a way that a Trials object is
- maintained during the tuning process, from which one can retrieve
- the best pipeline so far
- as well as the entire tuning history if needed.
- Methods configure_cross_validation and configer_summary_saving
- allow one to use a custom cross-validation method and to
- save the current best result in a file or database during training.
- Child classes: hyperopt and custom gridsearch.
- """
- import pickle
- import os
- import sys
- import time
- import datetime
- import numpy as np
- import pandas as pd
- from copy import deepcopy
- from abc import ABC, abstractmethod, abstractproperty
- if (sys.version_info.major == 3) & (sys.version_info.minor >= 8):
- print("I have python version {}.{} and will import typing".format(sys.version_info.major, sys.version_info.minor))
- from typing import Callable, TypedDict,\
- Literal, Dict, Iterable, List, Tuple, Union
- else:
- # from typing_extensions import *
- print("I have python version {}.{} and will import typing_extensions".format(sys.version_info.major, sys.version_info.minor))
- from typing_extensions import Callable, TypedDict,\
- Literal, Tuple, Union
- import functools
- from sklearn.pipeline import Pipeline
- from sklearn.model_selection import cross_validate as sklearn_cross_validation
- from sklearn.metrics import make_scorer
- from hyperopt import STATUS_OK, STATUS_FAIL
- from cdplib.log import Log
- from cdplib.utils.ExceptionsHandler import ExceptionsHandler
- from cdplib.utils import LoadingUtils
- from cdplib.ml_validation import CVComposer
- sys.path.append(os.getcwd())
class SpaceElementType(TypedDict):
    """
    Shape of a single element of the search space.

    PipelineSelector._objective reads the keys 'name', 'pipeline'
    and 'params' from every space element it receives.
    """

    # label of the pipeline, used in the log messages
    name: str
    # pipeline object on which set_params(**params) is called
    pipeline: Pipeline
    # keyword arguments passed to pipeline.set_params
    params: dict
-
- # TODO Tanya: add possibility to include confusion matrix in
- # additional metrics
- # check that cv object contains indices
class PipelineSelector(ABC):
    """
    Abstract base class for selecting a machine learning pipeline
    from a space (deterministic or random) of parameter distributions
    over multiple pipelines.

    A trials object is maintained during the tuning process. From it
    one can retrieve the best pipeline so far as well as the entire
    tuning history if needed.

    Methods configure_cross_validation and configer_summary_saving
    allow to use a custom cross-validation method and to save the
    current best result in a file or database during training.

    Subclasses must implement run_trials, the best_trial* /
    number_of_trials properties, the get_n_best_* methods and
    trials_to_excel.
    Children classes: hyperopt and custom gridsearch.
    """

    def __init__(self,
                 cost_func: Union[Callable, str],
                 greater_is_better: bool,
                 trials_path: str,
                 backup_trials_freq: int = None,
                 cross_validation_needs_scorer: bool = True,
                 cross_val_averaging_func: Callable = np.mean,
                 # additional_metrics: Dict[str, Callable] = None,
                 additional_metrics=None,
                 # additional_averaging_funcs: Dict[str, Callable] = None,
                 additional_averaging_funcs=None,
                 strategy_name: str = None,
                 stdout_log_level: Literal["INFO", "WARNING", "ERROR"]
                 = "INFO"):
        """
        :param Callable cost_func: function to minimize or maximize
            over the elements of a given (pipeline/hyperparameter) space
        :param bool greater_is_better: when True
            cost_func is maximized, else minimized.
        :param str trials_path: path at which the trials object is saved
            in binary format. From the trials object we can
            select information about the obtained scores, score variations,
            and pipelines, and parameters tried out so far. If a trials
            object already exists at the given path, it is loaded and the
            search is continued, else, the search is started from scratch.
        :param backup_trials_freq: frequency in iterations (trials)
            of saving the trials object at the trials_path.
            If None, the trials object is backed up every time
            the score improves.
        :param bool cross_validation_needs_scorer: when True, every
            callable metric is wrapped with sklearn's make_scorer
            before it is passed to the cross-validation function.
        :param Callable cross_val_averaging_func: Function to aggregate
            the cross-validation scores of the cost_func.
            Example different from the mean: mean - c*var.
        :param additional_metrics: dict of additional metrics to keep
            track of in the trials, of the form {"metric_name": metric}.
        :param additional_averaging_funcs: functions used to aggregate
            the output of the cross_validate function.
            The output always contains the scores of the cost_func,
            additional_metrics (if it is not empty),
            but it can also contain additional information
            (like probability threshold for example)
            if different from cross_val_averaging_func.
            Of the form {"metric_name": averaging_func}
        :param str strategy_name:
            a strategy is defined by the data set
            (columns/features and rows), cv object, cost function.
            When the strategy changes, one must start with new trials.
        :param str stdout_log_level: can be INFO, WARNING, ERROR
        """
        self._logger = Log("PipelineSelector: ",
                           stdout_log_level=stdout_log_level)

        try:
            ExceptionsHandler(self._logger)\
                .assert_is_directory(path=trials_path)

            self.attached_space = False
            self.attached_data = False
            self.configured_cross_validation = False
            self.configured_summary_saving = False

            self._cost_func = cost_func
            self._greater_is_better = greater_is_better
            # score factor is 1 when cost_func is minimized,
            # -1 when cost func is maximized
            self._score_factor = (not greater_is_better) - greater_is_better
            self._cross_val_averaging_func = cross_val_averaging_func
            # normalised to dicts so that _evaluate can merge them
            # without checking for None
            self._additional_metrics = additional_metrics or {}
            self._additional_averaging_funcs = \
                additional_averaging_funcs or {}

            self.trials_path = trials_path
            self._backup_trials_freq = backup_trials_freq
            self._strategy_name = strategy_name

            self._data_path = None
            self._cv_path = None
            self._X = None
            self._y = None
            self._cv = None
            self._space = None
            # set by configer_summary_saving
            self._save_method = None

            # if cross-validation is not configured,
            # sklearn cross-validation method is taken by default
            self._cross_validation = sklearn_cross_validation
            self._cross_validation_needs_scorer = \
                cross_validation_needs_scorer

            # if a trials object already exists at the given path,
            # it is loaded and the search is continued. Else,
            # the search is started from the beginning.
            self._trials = self._load_trials()

            if self._trials is not None:
                self._start_iteration = self.number_of_trials
                self.best_score = self.best_trial_score
                self._logger.info(("Loaded an existing trials object "
                                   "consisting of {} trials")
                                  .format(self._start_iteration))
            else:
                self._logger.warning(("No existing trials object was found, "
                                      "Starting from scratch."))
                self._start_iteration = 0
                self.best_score = np.nan

            # keeping track of the current search iteration
            self._iteration = self._start_iteration
            self._score_improved = False

            self.start_tuning_time = datetime.datetime.today()
            self.total_tuning_time = None
            self.finished_tuning = False

        except Exception as e:
            err = ("Failed to initialize the class. "
                   "Exit with error: {}".format(e))
            self._logger.log_and_raise_error(err)

    def _load_trials(self):
        """
        Loads the pickled trials object from self.trials_path.

        :return: the unpickled trials object, or None when the file
            does not exist or the stored object is None or empty.
        """
        if not os.path.isfile(self.trials_path):
            return None

        # NOTE: pickle can execute arbitrary code while loading;
        # trials_path must point to a trusted file.
        with open(self.trials_path, "rb") as f:
            trials = pickle.load(f)

        if trials is None or len(trials) == 0:
            return None

        return trials

    def _backup_trials(self) -> None:
        '''
        Pickles (Saves) the trials object in binary format
        at self.trials_path.
        '''
        try:
            with open(self.trials_path, "wb") as f:
                pickle.dump(self._trials, f)

        except Exception as e:
            err = "Could not backup trials. Exit with error: {}".format(e)
            self._logger.log_and_raise_error(err)

    def configure_cross_validation(self,
                                   cross_validation: Callable,
                                   kwargs: dict = None) -> None:
        """
        Method for attaching a custom cross-validation function

        :param cross_validation: a function that has the same
            signature as sklearn.model_selection.cross_validate
        :param dict kwargs: optional keyword arguments that are
            bound to cross_validation with functools.partial
        """
        try:
            kwargs = kwargs or {}

            self._cross_validation = functools.partial(
                    cross_validation, **kwargs)

            self.configured_cross_validation = True

            self._logger.info("Configured cross validation")

        except Exception as e:
            err = ("Failed to configure cross-validation. "
                   "Exit with error: {}".format(e))
            self._logger.log_and_raise_error(err)

    def configure_cross_validation_from_module(self,
                                               module_path: str,
                                               name: str) -> None:
        """
        Attaches a cross-validation function defined in
        a different python module. This function must have
        the same signature as sklearn.model_selection.cross_validate

        :param str module_path: path to python module
            where the cross_validation function is defined.
        :param str name: name of the cross validation function
            loaded from a python module.
        """
        try:
            self._cross_validation = \
                LoadingUtils().load_from_module(
                        module_path=module_path, name=name)

            self.configured_cross_validation = True

            self._logger.info("Configured cross validation")

        except Exception as e:
            err = ("Failed to load cross-validation from module. "
                   "Exit with error: {}".format(e))
            self._logger.log_and_raise_error(err)

    def attach_space(self, space) -> None:
        """
        Method for attaching the pipeline/hyperparameter space
        over which the score_func is optimized.

        :param space: space where
            the search is performed. A space might be either
            a list of dictionaries or a hyperopt space object
            the elements of which are dictionaries with keys:
            name, pipeline, params
        """
        try:
            self._space = space

            self.attached_space = True

            self._logger.info("Attached parameter distribution space")

        except Exception as e:
            err = ("Failed to attach space. "
                   "Exit with error: {}".format(e))
            self._logger.log_and_raise_error(err)

    def attach_space_from_module(self, module_path: str, name: str) -> None:
        """
        Attaches a space defined in a different python module.

        :param str module_path: path to python module
            where the space is defined.
        :param str name: name of the space loaded from
            a python module.
        """
        try:
            self._space = LoadingUtils().load_from_module(
                    module_path=module_path, name=name)

            self.attached_space = True

            self._logger.info("Attached parameter distribution space")

        except Exception as e:
            err = ("Failed to attach space from module. "
                   "Exit with error {}".format(e))
            # was misspelled 'loger_and_raise_error', which raised an
            # AttributeError instead of reporting the real failure
            self._logger.log_and_raise_error(err)

    def attach_data(self, X_train: Union[pd.DataFrame, np.ndarray],
                    y_train: Union[pd.DataFrame, pd.Series, np.ndarray]
                    = None,
                    X_val: Union[pd.DataFrame, np.ndarray]
                    = None,
                    y_val: Union[pd.DataFrame, pd.Series, np.ndarray]
                    = None,
                    # cv: Union[Iterable[Tuple[List[int], List[int]]]]
                    cv
                    = None) -> None:
        '''
        Attaches the data on which the pipelines are evaluated.
        Exactly one of cv and X_val has to be given.

        :param array X_train: data on which
            machine learning pipelines are trained
        :param array y_train: optional, vector with targets,
            (None in case of unsupervised learning)
        :param array X_val: optional, validation data.
            When not provided, cross-validated value
            of the cost_func is calculated.
        :param array y_val: optional, validation targets
        :param list cv: iterable of tuples containing
            train and validation indices or an integer representing
            the number of folds for a random split of data
            during cross-validation
            example: [([0,1,2], [3,4]), ([1,2,3], [4,5])]
        '''
        try:
            if (cv is None) == (X_val is None):
                raise ValueError(
                    "Exactly one of cv and X_val must be provided")

            if cv is None:
                if (y_val is None) != (y_train is None):
                    raise ValueError(
                        "y_train and y_val must be simultanious")

                # Train and validation data are stacked and a trivial
                # cv object with a single split is built: the first
                # len(X_train) positions are the train indices,
                # the remaining ones the validation indices.
                n_train = len(X_train)
                n_val = len(X_val)

                train_inds = list(range(n_train))
                val_inds = list(range(n_train, n_train + n_val))

                self._cv = [(train_inds, val_inds)]
                # NOTE(review): np.concatenate returns a numpy array,
                # so column names of DataFrame inputs are not kept.
                self._X = np.concatenate([X_train, X_val])
                self._y = None if y_train is None\
                    else np.concatenate([y_train, y_val])

            else:
                self._cv = cv
                self._X = X_train
                self._y = y_train

            self.attached_data = True

            self._logger.info("Attached data")

        except Exception as e:
            err = ("Failed to attach data. "
                   "Exit with error: {}".format(e))
            self._logger.log_and_raise_error(err)

    def attach_data_from_hdf5(self,
                              data_hdf5_store_path: str,
                              cv_pickle_path: str = None) -> None:
        """
        Method for attaching data from a hdf5 store
        and a cv object from a pickled file.

        The hdf5 store is a binary file,
        after loading it, it is a dictionary with keys
        X_train (y_train, X_val, y_val).

        The cv is loaded from a pickle file.

        The reason to separate the data
        store from the cv store, is the hdf5 is optimized to
        store large dataframes (especially with simple types) and
        a small list of lists like a cv-object is better
        to be stored as a pickle file.

        :param str data_hdf5_store_path: path to the hdf5 store
            with train and validation data
        :param str cv_pickle_path: path to the pickle file with
            the cv data
        """
        try:
            if not os.path.isfile(data_hdf5_store_path):
                raise ValueError("Parameter hdf5_store_path is not a file")

            # close all opened files, because hdf5 will
            # fail to reopen an opened (for some reason) file
            import tables
            tables.file._open_files.close_all()

            self._data_path = data_hdf5_store_path

            # the context manager closes the store even when
            # reading or attaching the data fails
            with pd.HDFStore(data_hdf5_store_path) as store:
                data_input = {
                    key: store[key] if key in store else None
                    for key in ["X_train", "y_train", "X_val", "y_val"]}

                if cv_pickle_path is not None:
                    if not os.path.isfile(cv_pickle_path):
                        raise ValueError(
                            "Parameter cv_pickle_path is not a file")

                    # NOTE: pickle can execute arbitrary code while
                    # loading; cv_pickle_path must be a trusted file.
                    with open(cv_pickle_path, "rb") as f:
                        data_input["cv"] = pickle.load(f)

                    self._cv_path = cv_pickle_path

                else:
                    data_input["cv"] = None

                self.attach_data(**data_input)

        except Exception as e:
            err = "Failed to attach data. Exit with error: {}".format(e)
            self._logger.log_and_raise_error(err)

    @property
    def default_summary(self) -> dict:
        """
        Default summary of the strategy.
        Every time the _objective function is called
        the current score and the information
        about the tested space element is added to the
        summary and it is saved to the Trials.
        If summary saving is configured it is also
        saved to a file, or a database when the score improves.
        """
        summary = {}

        if self._strategy_name is not None:
            summary["strategy_name"] = self._strategy_name

        if isinstance(self._cost_func, str):
            summary["cost_func"] = self._cost_func

        elif hasattr(self._cost_func, "__name__"):
            summary["cost_func"] = self._cost_func.__name__

        summary["trials_path"] = self.trials_path

        if self._data_path is not None:
            summary["data_path"] = self._data_path

        if self._cv_path is not None:
            summary["cv_path"] = self._cv_path

        summary["start_tuning_time"] = self.start_tuning_time

        summary["iteration"] = self._iteration

        return summary

    def configer_summary_saving(self,
                                save_method: Callable
                                = functools.partial(
                                        pd.DataFrame.to_excel,
                                        **{"path_or_buf": "result.csv"}),
                                kwargs: dict = None) -> None:
        """
        When the score calculated by _objective function improves,
        the default summary is updated with information about the
        current score and pipeline/hyperparameters
        and can be saved to a file or database, depending
        on the configured save_method.

        NOTE(review): the default save_method binds the keyword
        'path_or_buf' to pandas.DataFrame.to_excel, whose target
        argument is named 'excel_writer', and _save_summary passes
        the summary dict rather than a DataFrame - the default
        presumably fails when used; confirm and pass an explicit
        save_method.

        :param Callable save_method: method for saving the result
            of the pipeline selection. It is called with the
            summary of the trial as its only positional argument.
            Examples:
                functools.partial(pd.DataFrame.to_csv,
                                  **{"path_or_buf": PATH})
                functools.partial(np.savetxt, **{"fname": PATH})
                functools.partial(SQLHandler(URI).append_to_table,
                                  **{"tablename": NAME})
                functools.partial(
                    MongodbHandler(URI).insert_data_into_collection,
                    **{"collection_name": NAME})
            using functools can be avoided by providing the kwarg argument
        :param dict kwargs: a dictionary with keyword arguments
            (like tablename) to provide to the save_method
        """
        try:
            kwargs = kwargs or {}

            self._save_method = functools.partial(save_method, **kwargs)

            self.configured_summary_saving = True

            self._logger.info("Configured summary saving")

        except Exception as e:
            err = ("Failed to configure the summary saving. "
                   "Exit with error {}".format(e))
            self._logger.log_and_raise_error(err)

    def _save_summary(self, summary: dict) -> None:
        """
        Passes the summary to the configured save_method.
        Called from _objective when the score improves and
        summary saving is configured.

        :param dict summary: summary of the current trial
        """
        try:
            if not self.configured_summary_saving:
                raise ValueError("Result saving must be configured first")

            self._save_method(summary)

        except Exception as e:
            err = ("Could not save summary. "
                   "Exit with error: {}".format(e))
            self._logger.log_and_raise_error(err)

    @staticmethod
    def _accepts_error_score(cross_validation: Callable) -> bool:
        """
        Tells whether error_score should be passed to cross_validation.

        :return: True when the callable has a parameter named
            error_score that was not already bound through
            functools.partial, False otherwise (also when the
            signature cannot be inspected).
        """
        import inspect

        # an error_score bound in configure_cross_validation wins
        bound_kwargs = getattr(cross_validation, "keywords", None) or {}

        if "error_score" in bound_kwargs:
            return False

        try:
            parameters = inspect.signature(cross_validation).parameters
        except (TypeError, ValueError):
            return False

        return "error_score" in parameters

    def _evaluate(self, pipeline: "Pipeline") -> dict:
        """
        Calculates the averaged cross-validated score and score variance,
        as well as the averaged values and variances of the additional
        metrics.

        This method is called in the _objective function that is
        passed to the hyperopt optimizer.

        This function can be overridden, when the cost
        needs to be calculated differently,
        for example with a tensorflow model.

        :param Pipeline pipeline: machine learning pipeline
            that will be evaluated with cross-validation

        :return: dictionary with the aggregated
            cross-validation scores and
            the score variances for the scores in the output
            of the cross-validation function.

            form of the output:
                {"score": 10, #score used in optimization,
                 "score_variance": 0.5
                 "additional_metric1": 5,
                 "additional_metric1_variance": 7}

            a custom cross-validation function can also include for
            example probability threshold for each fold, then
            the output of this function will include the average
            value and the variance of the probability threshold
            over the folds.
        """
        try:
            # additional metrics may be None (the constructor default)
            scoring = {"score": self._cost_func,
                       **(self._additional_metrics or {})}

            if self._cross_validation_needs_scorer:
                # metrics given by name (str) are passed through as is,
                # only callables can be wrapped into a scorer.
                # NOTE(review): make_scorer negates the metric when
                # greater_is_better is False, and _objective multiplies
                # the score by _score_factor as well - confirm the
                # resulting sign convention of the loss.
                scoring = {
                    metric_name: (
                        make_scorer(
                            metric,
                            greater_is_better=self._greater_is_better)
                        if callable(metric) else metric)
                    for metric_name, metric in scoring.items()}

            cross_validation_input_args = {
                "estimator": pipeline,
                "X": self._X,
                "y": self._y,
                "cv": self._cv,
                "scoring": scoring
            }

            # works for plain functions as well as for the
            # functools.partial built in configure_cross_validation
            if self._accepts_error_score(self._cross_validation):
                cross_validation_input_args["error_score"] = np.nan

            scores = self._cross_validation(**cross_validation_input_args)

            prefix = "test_"
            averaging_funcs = self._additional_averaging_funcs or {}

            scores_average = {}
            scores_variance = {}

            for key, values in scores.items():
                if not key.startswith(prefix):
                    continue

                # strip only the leading prefix
                metric_name = key[len(prefix):]

                # an averaging function can be registered under the
                # raw key of the scores or under the metric name
                if key in averaging_funcs:
                    averaging_func = averaging_funcs[key]
                else:
                    averaging_func = averaging_funcs.get(
                        metric_name, self._cross_val_averaging_func)

                scores_average[metric_name] = averaging_func(values)
                scores_variance[metric_name + "_variance"] = np.var(values)

            return {**scores_average, **scores_variance}

        except Exception as e:
            err = "Failed to evaluate pipeline. Exit with error: {}".format(e)
            self._logger.log_and_raise_error(err)

    def _objective(self, space_element: "SpaceElementType") -> dict:
        '''
        This method is called in run_trials method
        that is using the hyperopt fmin optimizer.

        Uses _evaluate method.

        It must take as input a space element
        and produce an output in the form of dictionary
        with 2 obligatory values loss and status
        (STATUS_OK or STATUS_FAIL). Other
        values in the output are optional and can be
        accessed later through the trials object.

        :Warning: fmin minimizes the loss,
        when _evaluate returns a value to be maximized,
        it is multiplied by -1 to obtain loss.

        :param SpaceElementType space_element: element
            of the space over which the optimization is done

        :output: dictionary with keys
            loss (minimized value),
            status with values STATUS_OK or STATUS_FAIL
            understood by hyperopt,
            score (equal to loss or -loss),
            score_variance,
            timestamp (end of execution),
            train_time: execution time
            and other keys given in self.default_summary.
            When the trial fails, the dictionary contains status,
            timestamp, error and NaN for loss, score,
            score_variance and train_time.
        '''
        try:
            start_time = time.time()

            if not self.attached_data:
                raise ValueError("Data must be attached in order "
                                 "to effectuate the best "
                                 "pipeline search")

            summary = deepcopy(self.default_summary)

            # backup the current trials if the score improved
            # at previous iteration or every ith iteration
            # if the backup_trials_freq is set
            backup_cond = ((self._backup_trials_freq is not None) and
                           ((self._iteration - self._start_iteration - 1) %
                            self._backup_trials_freq == 0)) or\
                self._score_improved

            if backup_cond:
                self._backup_trials()
                self._score_improved = False

            pipeline = space_element['pipeline']
            params = space_element['params']
            pipeline.set_params(**params)

            self._logger.info(("Iteration {0}: "
                               "Current score is {1}: "
                               "Training pipeline {2} "
                               "with parameters: {3}. ").format(
                                  self._iteration,
                                  self.best_score,
                                  space_element['name'],
                                  params))

            result = self._evaluate(pipeline)

            end_time = time.time()

            summary['status'] = STATUS_OK
            summary.update(result)
            summary['loss'] = self._score_factor * summary['score']
            summary['timestamp'] = datetime.datetime.today()
            summary['train_time'] = end_time - start_time

            self._iteration += 1

            # best_score != best_score is True only for NaN,
            # i.e. when no score has been recorded yet
            self._score_improved = (self.best_score != self.best_score) or\
                                   (self._score_factor*result["score"] <
                                    self._score_factor*self.best_score)

            if self._score_improved:
                self._logger.info("Score improved, new best score is: {}"
                                  .format(result["score"]))

                self.best_score = result['score']

                if self.configured_summary_saving:
                    self._save_summary(summary)

        except Exception as e:
            # a failed trial is reported to the optimizer
            # instead of aborting the whole search
            self._logger.warning("Trial failed with error {}".format(e))

            summary = {}
            summary['status'] = STATUS_FAIL
            summary['timestamp'] = datetime.datetime.today()
            summary['error'] = e
            for key in ['loss', 'score', 'score_variance', 'train_time']:
                summary[key] = np.nan

        return summary

    @abstractmethod
    def run_trials(self):
        """
        Method that runs the hyperparameter tuning over possibly multiple
        pipeline types specified in self.space
        When run_trials method is finished the flag self.finished_tuning
        should be set to True and the methods self._backup_trials and
        optionally self._save_summary should be called.
        """
        pass

    @property
    @abstractmethod
    def number_of_trials(self) -> int:
        """
        Number of trials already run in the current trials object
        """
        pass

    @property
    @abstractmethod
    def best_trial(self) -> dict:
        """
        Best trial so far.
        Should contain the status, pipeline,
        hyperparameters, and the score (loss).
        Other information is optional and is defined
        by self.default_summary
        """
        pass

    @property
    @abstractmethod
    def best_trial_score(self) -> float:
        """
        Score of the best pipeline with the best hyperparameters
        """
        pass

    @property
    @abstractmethod
    def best_trial_score_variance(self) -> float:
        """
        Variance of the cross-validation score of the best pipeline
        """
        pass

    @property
    @abstractmethod
    def best_trial_pipeline(self) -> "Pipeline":
        """
        Best pipeline with best hyperparameters
        """
        pass

    @abstractmethod
    def get_n_best_trial_pipelines(self, n: int) -> list:
        """
        N best pipelines with corresponding
        best hyperparameters
        """
        pass

    # NOTE(review): the parameter is named n_int, presumably a typo
    # for "n: int"; kept unchanged so that keyword callers still work.
    @abstractmethod
    def get_n_best_trial_pipelines_of_each_type(self, n_int) -> list:
        """
        If the hyperparameter search is done over multiple
        pipelines, then returns n different pipeline-types
        with corresponding hyperparameters
        """
        pass

    @abstractmethod
    def trials_to_excel(self, path: str) -> None:
        """
        Trials object in the shape of table written to excel,
        should contain the iteration, pipeline (as str),
        hyperparameters (as str), self.best_result
        (see self._objective method)
        as well as additional information defined by self.default_summary
        """
        pass
|