#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Created on Fri Nov 9 13:27:44 2018

@author: tanja
@description: Implementation of machine learning pipeline selection
    and tuning with the hyperopt library
"""

import os
import sys
import gc
import logging
import pickle
import time
import datetime
import importlib

import pandas as pd
import numpy as np

from sklearn.pipeline import Pipeline
from sklearn.model_selection import cross_validate
from sklearn.metrics import make_scorer

from hyperopt import fmin, tpe, rand, Trials, hp, STATUS_OK, STATUS_FAIL,\
    space_eval, pyll


class HyperoptPipelineSelection:
    '''
    Use this class to perform a search
    for a machine learning pipeline in a given parameter space.
    The parameter space can include multiple types of Pipelines
    (SVM, XGBOOST, random forest, etc),
    as well as parameter distributions for each pipeline parameter.
    See the example in main for the expected space structure.

    The search can be performed either randomly
    or with a tree-based algorithm. (Other methods are currently
    being developed by the hyperopt creators.)

    The attribute trials is responsible for book-keeping of the parameter
    combinations that have already been tried out. This attribute is saved
    to a binary file every backup_trials_freq iterations,
    as well as every time a better pipeline is found.
    '''
    def __init__(self,
                 cost_func,
                 greater_is_better: bool,
                 trials_path: str,
                 backup_trials_freq: int = 1,
                 log_path: str = None,
                 averaging_func: callable = None):
        '''
        :param callable cost_func: function to minimize or maximize

        :param bool greater_is_better: when True cost_func is maximized,
            else minimized.

        :param str trials_path: path at which the trials object is saved
            in binary format. From the trials object we can retrieve the
            obtained scores, score variances, pipelines, and parameters
            tried out so far. If a trials object already exists at the
            given path, it is loaded and the search is continued,
            else the search is started from the beginning.

        :param backup_trials_freq: frequency in iterations (trials)
            of saving the trials object at trials_path.

        :param str log_path: optional, when not provided logs to stdout.

        :param callable averaging_func: optional, when not provided set
            to mean. Function to aggregate the cross-validated values of
            the cost function. The classic choice is the mean; another
            example is mean() - c*var().
        '''

        assert(callable(cost_func)),\
            "Parameter 'cost_func' must be a callable"

        assert(isinstance(greater_is_better, bool)),\
            "Parameter 'greater_is_better' must be of bool type"

        assert(isinstance(trials_path, str)),\
            "Parameter 'trials_path' must be of string type"

        if averaging_func is not None:
            assert(callable(averaging_func)),\
                "Parameter 'averaging_func' must be a callable"

        self._assert_valid_directory(path=trials_path)

        self._configure_logger(log_path)

        self._cost_func = cost_func

        # is 1 when cost_func is minimized, -1 when cost_func is maximized
        self._score_factor = (not greater_is_better) - greater_is_better

        self._trials_path = trials_path

        # is initialized with an empty trials object
        self._trials = Trials()

        self._backup_trials_freq = backup_trials_freq

        self._averaging_func = averaging_func or np.mean

        # keeping track of the current search iteration
        self._run_number = 0

        # space and data need to be attached to perform the search
        self._space_attached = False
        self._data_attached = False

        # if a trials object already exists at the given path,
        # it is loaded and the search is continued, else
        # the search is started from the beginning.
        if os.path.isfile(trials_path):
            try:
                with open(trials_path, "rb") as f:
                    self._trials = pickle.load(f)

                self._logger.info(("Loaded an existing trials object "
                                   "consisting of {} trials.")
                                  .format(len(self._trials.trials)))

            except Exception as e:
                self._logger.error(("Trials object could not be loaded. "
                                    "Training starts from the beginning. "
                                    "Exit with error: {}").format(e))

        else:
            self._logger.info(("No existing trials object was found. "
                               "Initialized an empty trials object."))

        self._best_score = self.best_trial_score

    def _configure_logger(self, log_path: str = None):
        '''
        Can be replaced with the existing logging script later.
        When log_path is not provided, logs to stdout.
        '''
        self._logger = logging.getLogger(__name__)

        if self._logger.hasHandlers():
            self._logger.handlers.clear()

        if log_path is not None:
            assert(isinstance(log_path, str)),\
                "Parameter 'log_path' must be of string type"

            self._assert_valid_directory(log_path)

            handler = logging.FileHandler(log_path)
        else:
            handler = logging.StreamHandler(sys.stdout)

        formatter = logging.Formatter(
                '\n %(asctime)s %(levelname)s %(message)s')

        handler.setFormatter(formatter)
        self._logger.addHandler(handler)
        self._logger.setLevel("INFO")

    def _backup_trials(self):
        '''
        Pickles (saves) the trials object. Used in a scheduler.
        '''
        with open(self._trials_path, "wb") as f:
            pickle.dump(self._trials, f)

    def _assert_valid_directory(self, path: str):
        '''
        If the directory of a path does not exist yet, creates it.
        '''
        assert(isinstance(path, str)),\
            "Parameter 'path' must be of str type"

        dirname = os.path.dirname(path)

        if len(dirname) > 0:
            os.makedirs(dirname, exist_ok=True)

    def attach_space(self, space: pyll.base.Apply = None,
                     module_path: str = None,
                     name: str = None):
        '''
        :param pyll.base.Apply space: hyperopt space where
            the search is performed. Optional when a space
            is loaded from a python module.

        :param str module_path: path to the python module where
            the space is defined. Optional when the space
            is provided directly.

        :param str name: name of the space loaded from
            a python module. Optional when the space
            is provided directly.
        '''
        assert((space is not None) or
               ((module_path is not None) and (name is not None))),\
            "Either space or (module_path, name) must be provided"

        if space is None:
            for param, param_name in [(module_path, "module_path"),
                                      (name, "name")]:
                assert(isinstance(param, str)),\
                    "Parameter '{}' must be of str type".format(param_name)

            assert(os.path.isfile(module_path)),\
                "Parameter 'module_path' must be a valid file"

            module, extension = os.path.splitext(
                    os.path.basename(module_path))

            assert(extension == ".py"),\
                "Parameter 'space' must be read from a python file"

            sys.path.insert(0, os.path.dirname(module_path))

            try:
                space = getattr(importlib.import_module(module), name)
            except (ImportError, AttributeError):
                err = "Invalid space location or name"
                self._logger.error(err)
                raise Exception(err)

        assert(isinstance(space, pyll.base.Apply)),\
            "Parameter 'space' must be of hyperopt space type"

        self._space = space
        self._logger.info("Attached parameter distribution space")
        self._space_attached = True

    def _convert_to_array(self, x: (pd.DataFrame, np.ndarray))\
            -> np.ndarray:
        '''
        Converts a pandas DataFrame or Series to a numpy array.
        '''
        if isinstance(x, np.ndarray):
            return x

        elif (isinstance(x, pd.core.frame.DataFrame))\
                or (isinstance(x, pd.core.series.Series)):
            return x.values

        else:
            e = 'The argument must be a numpy array or a pandas DataFrame'
            self._logger.critical(e)
            raise ValueError(e)

    def attach_data(self, X_train: (pd.DataFrame, np.ndarray),
                    y_train: (pd.DataFrame, pd.Series, np.ndarray) = None,
                    X_val: (pd.DataFrame, np.ndarray) = None,
                    y_val: (pd.DataFrame, pd.Series, np.ndarray) = None,
                    cv: (list, int) = None):
        '''
        :param array X_train: data on which
            machine learning pipelines are trained

        :param array y_train: optional, vector of targets
            (not all algorithms require targets)

        :param array X_val: optional, validation data.
            When not provided, the cross-validated value
            of the cost_func is calculated.

        :param array y_val: optional, validation targets

        :param list cv: list of tuples containing
            train and validation indices, or an integer representing
            the number of folds for a random split of the data
            during cross-validation.
            Example: [([0, 1, 2], [3, 4]), ([1, 2, 3], [4, 5])]
        '''
        X_train = self._convert_to_array(X_train)
        if y_train is not None:
            y_train = self._convert_to_array(y_train)

        if X_val is not None:
            if cv is not None:
                self._logger.warning(("Both validation set and cv object "
                                      "are set. Validation score will be "
                                      "calculated on the validation set!"))

            X_val = self._convert_to_array(X_val)

            train_inds = list(range(len(X_train)))
            val_inds = list(range(len(X_train),
                                  len(X_train) + len(X_val)))

            # cost is evaluated with a cross validation function
            # that accepts an array and a cv object with
            # indices of the fold splits.
            # Here we create a trivial cv object
            # with one validation split.
            self._cv = [(train_inds, val_inds)]
            self._X = np.concatenate([X_train, X_val])

            if y_train is not None:
                if y_val is None:
                    err = "Argument y_val must be provided"
                    self._logger.critical(err)
                    raise ValueError(err)
                else:
                    y_val = self._convert_to_array(y_val)
                    self._y = np.concatenate([y_train, y_val])
            else:
                self._y = None
        else:
            if cv is None:
                self._logger.warning(("Neither validation set nor cv object "
                                      "are set. Validation score will be "
                                      "calculated on 5 randomly "
                                      "split folds."))

            self._X = X_train
            self._y = y_train
            self._cv = cv

        self._logger.info("Attached data")
        self._data_attached = True

    def _evaluate(self, pipeline: Pipeline) -> dict:
        '''
        This method is called in _objective.

        Calculates the cost on the attached data.
        This method can be overridden when the cost needs to be
        calculated differently, for example with a tensorflow model.

        :param Pipeline pipeline: machine learning pipeline
            that will be evaluated with cross-validation

        :output: dictionary with the aggregated
            cross-validation score and the score variance.
        '''
        scores = cross_validate(estimator=pipeline,
                                X=self._X,
                                y=self._y,
                                cv=self._cv or 5,
                                scoring=make_scorer(self._cost_func),
                                error_score=np.nan)

        return {'value': self._averaging_func(scores['test_score']),
                'variance': np.var(scores['test_score'])}

    def _objective(self, space_element: dict) -> dict:
        '''
        This method is called in search_for_best_pipeline
        inside the hyperopt fmin method.

        Uses the _evaluate method.

        It must take a space element as input and produce an output
        in the form of a dictionary with 2 obligatory values:
        loss and status (STATUS_OK or STATUS_FAIL). Other values
        in the output are optional and can be accessed later
        through the trials object.

        :Warning: fmin minimizes the loss; when _evaluate returns
        a value to be maximized, it must be multiplied by -1
        to obtain the loss.
        :param dict space_element: must contain keys
            name (with the name of the pipeline),
            pipeline (Pipeline object),
            params (dict of pipeline params)

        :output: dictionary with keys
            loss (minimized value),
            status with values STATUS_OK or STATUS_FAIL
            understood by hyperopt,
            score (equal to loss or -loss),
            score_variance,
            timestamp (end of execution),
            train_time (execution time)
        '''
        assert(isinstance(space_element, dict) and
               set(['name', 'pipeline', 'params']) <= space_element.keys())

        assert(isinstance(space_element['name'], str) and
               isinstance(space_element['pipeline'], Pipeline) and
               isinstance(space_element['params'], dict))

        start_time = time.time()

        if not self._data_attached:
            raise Exception(("Data must be attached in order "
                             "to perform the best pipeline search"))

        self._run_number += 1

        pipeline = space_element['pipeline']
        params = space_element['params']
        pipeline.set_params(**params)

        self._logger.info(("Run number {0}: "
                           "Current score is {1}: "
                           "Training pipeline {2} "
                           "with parameters: {3}. ").format(
                              self._run_number,
                              self._best_score,
                              space_element['name'],
                              params))

        try:
            score_stats = self._evaluate(pipeline)

            assert(not np.isnan(score_stats["value"])),\
                "Returned null score"

            if self._run_number % self._backup_trials_freq == 0:
                self._backup_trials()

            # the first condition is True only when self._best_score
            # is np.nan, i.e. when no score has been obtained yet
            if (self._best_score != self._best_score) or\
                self._score_factor*score_stats["value"] <\
                    self._score_factor*self._best_score:

                self._logger.info("Score got better, new best score is: {}"
                                  .format(score_stats["value"]))

                self._best_score = score_stats['value']

                self._backup_trials()

            end_time = time.time()

            return {'loss': self._score_factor * score_stats["value"],
                    'status': STATUS_OK,
                    'score': score_stats["value"],
                    'score_variance': score_stats["variance"],
                    'timestamp': datetime.datetime.today(),
                    'train_time': end_time - start_time}

        except Exception as e:
            self._logger.warning("Trial failed with error {}".format(e))

            return {'loss': np.nan,
                    'status': STATUS_FAIL,
                    'score': np.nan,
                    'score_variance': np.nan,
                    'timestamp': datetime.datetime.today(),
                    'train_time': np.nan}

    def search_for_best_pipeline(self,
                                 niter: int,
                                 algo: callable = tpe.suggest):
        '''
        Method performing the search for the best pipeline
        in the given space.

        Calls the fmin function from the hyperopt library
        to minimize the output of _objective.

        :param int niter: number of search iterations

        :param callable algo: currently can only take the values
            tpe.suggest for a tree-based search
            or rand.suggest for a random search
        '''
        assert(self._space_attached),\
            "Space must be attached to be able to perform the search."

        assert(isinstance(niter, int)),\
            "Parameter 'niter' must be of int type"

        # right now only two algorithms are provided by hyperopt
        assert(algo in [tpe.suggest, rand.suggest]),\
            ("Parameter 'algo' can currently only be tpe.suggest or "
             "rand.suggest. If other algorithms have been developed "
             "by hyperopt, please add them to the list.")

        try:
            self._logger.info(("Starting {0} iterations of search "
                               "in addition to {1} previous trials")
                              .format(niter, len(self._trials.trials)))

            best = fmin(fn=self._objective,
                        space=self._space,
                        algo=algo,
                        trials=self._trials,
                        max_evals=len(self._trials.trials) + niter)

            self._logger.info(
                    "Best score is {0} with variance {1}"
                    .format(
                        self._trials.best_trial["result"]["score"],
                        self._trials.best_trial["result"]["score_variance"]))

            self._logger.info(("Finished {0} iterations of search.\n"
                               "Best parameters are:\n {1} ")
                              .format(niter,
                                      space_eval(self._space, best)))

            self._backup_trials()

        except Exception as e:
            raise ValueError(("Failed to select the best "
                              "pipeline! Exit with error: {}").format(e))
Exit with error: {}").format(e)) @property def best_trial_score(self) -> float: ''' ''' if len(self._trials.trials) > 0: return self._trials.best_trial["result"]["score"] else: return np.nan @property def best_trial_score_variance(self) -> float: ''' ''' if len(self._trials.trials) > 0: return self._trials.best_trial["result"]["score_variance"] else: return np.nan @property def best_trial_pipeline(self) -> Pipeline: ''' ''' assert(self._space_attached),\ "Space must be attach to be able to retrieve this information." if len(self._trials.trials) > 0: return space_eval( space, {k: v[0] for k, v in self._trials.best_trial['misc']['vals'].items() if len(v) > 0})["pipeline"] else: err = ("Trials object is empty. " "Best pipeline cannot be returned") self._logger.error(err) raise Exception(err) def _ith_trial_loss(self, i: int) -> float: ''' ''' if len(self._trials.trials) >= i: return self._trials.trials[i]['result']['loss'] else: return np.nan def _ith_trial_element(self, i: int, name: str) -> object: ''' ''' assert(self._space_attached),\ "Space must be attach to be able to retrieve this information." if len(self._trials.trials) >= i: return space_eval(self._space, {k: v[0] for k, v in self._trials.trials[i]['misc']['vals'] .items() if len(v) > 0})[name] def _ith_trial_pipeline(self, i: int) -> Pipeline: ''' ''' return self._ith_trial_element(i=i, name='pipeline') def _ith_trial_name(self, i: int) -> str: ''' ''' return self._ith_trial_element(i=i, name='name') def _ith_trial_params(self, i: int) -> dict: ''' ''' return self._ith_trial_element(i=i, name='params') def _ith_trial_timestamp(self, i: int) -> datetime.datetime: ''' ''' if len(self._trials.trials) >= i: return self._trials.trials[i]["result"]["timestamp"] def get_n_best_trial_pipelines(self, n: int, losses: list = None) -> list: ''' Returns the list of n best pipelines documented in trials ''' if len(self._trials.trials) > 0: if losses is None: losses = [self._ith_trial_loss(i) for i in range(len(self._trials.trials))] best_n_indices = [losses.index(l) for l in sorted(list(set(losses)))[:n]] return [self._ith_trial_pipeline(i) for i in best_n_indices] else: err = ("Trials object is empty. " "Best pipeline cannot be returned") self._logger.error(err) raise Exception(err) def get_n_best_trial_pipelines_of_each_type(self, n: int) -> dict: ''' Returns a dictiionry where keys are pipeline names, and values are lists of best pipelines with this name ''' assert(isinstance(n, int)), "Parameter 'n' must be an integer" if len(self._trials.trials) > 0: best_pipelines_per_type = {} names = [self._ith_trial_name(i) for i in range(len(self._trials.trials))] for nm in names: losses = [self._ith_trial_loss(i) for i in range(len(self._trials.trials)) if self._ith_trial_name(i) == nm] best_pipelines_per_type[nm] = self.get_n_best_trial_pipelines( n=n, losses=losses) return best_pipelines_per_type else: err = ("Trials object is empty. " "Best pipeline cannot be returned") self._logger.error(err) raise Exception(err) def write_trials_documentation(self, path: str = None): ''' Saves an excel file with pipeline names, scores, parameters, and timestamps. 
        '''
        path = path or "hyperopt_trials_documentation.xlsx"

        assert(isinstance(path, str)),\
            "Parameter 'path' must be of string type"

        self._assert_valid_directory(path)

        if len(self._trials.trials) > 0:
            names = [self._ith_trial_name(i)
                     for i in range(len(self._trials.trials))]
            scores = [self._score_factor*self._ith_trial_loss(i)
                      for i in range(len(self._trials.trials))]
            params = [self._ith_trial_params(i)
                      for i in range(len(self._trials.trials))]
            timestamps = [self._ith_trial_timestamp(i)
                          for i in range(len(self._trials.trials))]

        else:
            names = []
            scores = []
            params = []
            timestamps = []

        pd.DataFrame({"name": names,
                      "score": scores,
                      "params": params,
                      "timestamp": timestamps})\
          .to_excel(path)


if __name__ == '__main__':

    from sklearn.metrics import roc_auc_score
    from xgboost import XGBClassifier
    from sklearn.svm import SVC
    from sklearn.feature_selection import SelectKBest
    from sklearn.decomposition import PCA
    from sklearn.datasets import load_iris
    from pprint import pprint

    data = load_iris()
    X = pd.DataFrame(data.data)
    y = pd.Series(data.target)
    # produce a binary variable
    y = (y == 2).astype(int)
    del data
    gc.collect()

    # SPACE DEFINITION ########################################
    # (can be moved to a separate python script)

    """
    A search space must be a list of dictionaries.
    Each dictionary must have the keys:

    name (pipeline name or type),
    pipeline (instance of sklearn.pipeline.Pipeline),
    params (dictionary of distributions for the parameters
            of the pipeline that we want to tune)

    Here we have a space that consists of two dictionaries:
    KBEST_XGBOOST and PCA_SVC
    """
    space = []

    pipeline_dist_1 = {}

    pipeline_dist_1["name"] = "KBEST_XGBOOST"

    """
    A pipeline consists of steps (tuples).
    Each step has a name and an algorithm.
    This pipeline performs feature selection with SelectKBest
    as a first step and evaluates a machine learning algorithm
    (xgboost) as a second step.
    Like all sklearn algorithms, a Pipeline has the methods
    fit, predict, set_params, get_params.
    """
    pipeline_dist_1["pipeline"] = Pipeline([
                                     ('kbest', SelectKBest()),
                                     ('xgb', XGBClassifier())
                                     ])

    """
    Pipeline parameter dictionaries must be of the form
    {'kbest__k': 3, 'xgb__n_estimators': 20}: each parameter name
    consists of the step name, two underscores, and the parameter name.

    Here, instead of values, the parameter names are followed by
    hyperopt distributions.

    Each hyperopt distribution also must have a name,
    due to hyperopt functionality.

    Here, we set the hyperopt distribution name to the step name,
    but it does not have to be so. Hyperopt distribution names must be
    different for different elements of the space.
""" pipeline_dist_1["params"] = { 'kbest__k': hp.choice('kbest__k', range(1, 5)), 'xgb__n_estimators': 50 + hp.randint('xgb__n_estimators', 50), "xgb__learning_rate": hp.loguniform('xgb__learning_rate', np.log(0.01), np.log(0.2)) } space.append(pipeline_dist_1) pipeline_dist_2 = {} pipeline_dist_2["name"] = "PCA_SVC" pipeline_dist_2["pipeline"] = Pipeline([ ('pca', PCA()), ('svc', SVC(gamma="scale")) ]) pipeline_dist_2["params"] = { "pca__n_components": 1 + hp.randint("pca__n_components", 4), "svc__C": hp.loguniform("svc__C", np.log(0.01), np.log(0.1)) } space.append(pipeline_dist_2) space = hp.choice('pipelines', space) # TESTING ########################################################## trials_path = 'TEST_hyperopt_trials.pkl' doc_path = 'TEST_hyperopt_doc.xlsx' hp_obj = HyperoptPipelineSelection(cost_func=roc_auc_score, greater_is_better=True, trials_path=trials_path) hp_obj.attach_data(X_train=X, y_train=y) hp_obj.attach_space(space=space) hp_obj.search_for_best_pipeline(niter=10) print('\n', '='*20, 'TESTING', '='*20) print('\n', 'Best score:', hp_obj.best_trial_score) print('\n', 'Best score variance:', hp_obj.best_trial_score_variance) print('\n', 'Best pipeline', hp_obj.best_trial_pipeline) print('\n', 'Best 3 pipelines: \n') pprint(hp_obj.get_n_best_trial_pipelines(n=3)) print('\n', 'Best pipeline per type: \n') pprint(hp_obj.get_n_best_trial_pipelines_of_each_type(n=1)) hp_obj.write_trials_documentation(path=doc_path) # os.remove(doc_path) # os.remove(trials_path)