12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799 |
- #!/usr/bin/env python3
- # -*- coding: utf-8 -*-
- """
- Created on Fri Nov 9 13:27:44 2018
- @author: tanja
- @description: Implementation of machine learning
- pipeline selection and tuning with hyperopt library
- """
- import os
- import sys
- import gc
- import logging
- import pickle
- import time
- import datetime
- import pandas as pd
- import numpy as np
- from sklearn.pipeline import Pipeline
- from hyperopt import fmin, tpe, rand, Trials, hp, STATUS_OK, STATUS_FAIL,\
- space_eval, pyll
- from sklearn.model_selection import cross_validate
class HyperoptPipelineSelection:
    '''
    Use this class to perform a search
    for a machine learning pipeline in a given parameter space.
    The parameter space can include multiple types of Pipelines
    (SVM, XGBOOST, random forest, etc),
    as well as parameter distributions for each pipeline parameter.
    See example in main for the expected space structure.

    The search can be performed either randomly
    or with a tree-based algorithm (hyperopt's tpe).

    Attribute _trials is responsible for book-keeping parameter
    combinations that have already been tried out. It is pickled
    to trials_path every backup_trials_freq iterations, every time
    a better pipeline is found, and at the end of a search.
    '''

    def __init__(self,
                 cost_func,
                 greater_is_better: bool,
                 trials_path: str,
                 backup_trials_freq: int = 1,
                 log_path: str = None,
                 averaging_func: callable = None):
        '''
        :param callable cost_func: function to minimize or maximize
        :param bool greater_is_better: when True
            cost_func is maximized, else minimized.
        :param str trials_path: path at which the trials object is saved
            in binary format. From the trials object we can
            select information about the obtained scores, score variations,
            and pipelines, and parameters tried out so far. If a trials
            object already exists at the given path, it is loaded and the
            search is continued, else, the search is started from
            the beginning.
        :param int backup_trials_freq: frequency in iterations (trials)
            of saving the trials object at the trials_path.
            Must be a positive integer.
        :param str log_path: Optional, when not provided logs to stdout.
        :param callable averaging_func: optional,
            when not provided set to mean. Function
            to aggregate the cross-validated values of the cost function.
            Classic situation is to take the mean,
            another example is, for example mean() - c*var().
        '''
        assert(callable(cost_func)),\
            "Parameter 'cost_func' must be a callable"

        assert(isinstance(greater_is_better, bool)),\
            "Parameter 'greater_is_better' must be bool type"

        assert(isinstance(trials_path, str)),\
            "Parameter 'trials_path' must be of string type"

        # a zero frequency would make the modulo in _objective fail
        # and silently mark every trial as failed
        assert(isinstance(backup_trials_freq, (int, np.integer))
               and backup_trials_freq > 0),\
            "Parameter 'backup_trials_freq' must be a positive integer"

        if averaging_func is not None:
            assert(callable(averaging_func)),\
                "Parameter 'averaging_func' must be a callable"

        self._assert_valid_directory(path=trials_path)

        self._configer_logger(log_path)

        self._cost_func = cost_func
        # is 1 when cost_func is minimized, -1 when cost func is maximized
        self._score_factor = -1 if greater_is_better else 1
        self._trials_path = trials_path
        # is initialized with empty trials object
        self._trials = Trials()
        self._backup_trials_freq = backup_trials_freq
        self._averaging_func = averaging_func or np.mean
        # keeping track of the current search iteration
        self._run_number = 0
        # space and data need to be attached to perform search.
        self._space = None
        self._space_attached = False
        self._data_attached = False

        # if a trials object already exists at the given path,
        # it is loaded and the search is continued. Else,
        # the search is started from the beginning.
        if os.path.isfile(trials_path):
            try:
                # NOTE(security): unpickling executes arbitrary code;
                # trials_path must only ever point to a trusted file.
                with open(trials_path, "rb") as f:
                    loaded_trials = pickle.load(f)

                n_loaded = len(loaded_trials.trials)
                self._trials = loaded_trials

                self._logger.info(("Loaded an existing trials object "
                                   "consisting of {} trials")
                                  .format(n_loaded))

            except Exception as e:
                # never keep a half-loaded or foreign object around
                self._trials = Trials()
                self._logger.error(("Trials object could not be loaded. "
                                    "Training starts from the beginning. "
                                    "Exit with error {}").format(e))

        else:
            self._logger.info(("No existing trials object was found. "
                               "Initialized an empty trials object."))

        self._best_score = self.best_trial_score

    def _configer_logger(self, log_path: str = None):
        '''
        Can be replaced with the existing script later.
        When log_path is not provided, logs to stdout.

        The logger is shared by all instances in the module, therefore
        handlers installed by a previous instance are removed first.
        '''
        self._logger = logging.getLogger(__name__)

        # close the replaced handlers so that log files are not leaked
        for old_handler in list(self._logger.handlers):
            self._logger.removeHandler(old_handler)
            old_handler.close()

        if log_path is not None:
            assert(isinstance(log_path, str)),\
                "Parameter 'log_path' must be of string type"
            self._assert_valid_directory(log_path)

            handler = logging.FileHandler(log_path)
        else:
            handler = logging.StreamHandler(sys.stdout)

        formatter = logging.Formatter(
                '\n %(asctime)s %(levelname)s %(message)s')

        handler.setFormatter(formatter)
        self._logger.addHandler(handler)
        self._logger.setLevel(logging.INFO)

    def _backup_trials(self):
        '''
        Pickles (Saves) the trials object at trials_path.

        The object is first written to a temporary file next to the
        target and then moved over it, so that an interrupted write
        cannot destroy the previous backup.
        '''
        tmp_path = self._trials_path + ".tmp"

        with open(tmp_path, "wb") as f:
            pickle.dump(self._trials, f)

        os.replace(tmp_path, self._trials_path)

    def _assert_valid_directory(self, path: str):
        '''
        If the directory of a path does not exist yet,
        creates it (including missing parent directories).

        :param str path: path to a file whose directory must exist
        '''
        assert(isinstance(path, str)),\
            "Parameter 'path' must of str type"

        dirname = os.path.dirname(path)

        # an empty dirname means the current working directory
        if len(dirname) > 0:
            os.makedirs(dirname, exist_ok=True)

    def attach_space(self, space: "pyll.base.Apply" = None,
                     module_path: str = None,
                     name: str = None):
        '''
        :param pyll.base.Apply space: hyperopt space where
            the search is performed. Optional when a space
            is loaded from a python module.
        :param str module_path: path to python module
            where the space is defined. Optional when
            the space is provided directly.
        :param str name: name of the space loaded from
            a python module. Optional when the space
            is provided directly.
        :raises Exception: when the module cannot be executed or does
            not define an object with the given name.
        '''
        import importlib.util

        assert((space is not None) or
               ((module_path is not None) and (name is not None))),\
            "Either space or (module_path, name) must be provided"

        if space is None:
            for p_name, p_value in (("module_path", module_path),
                                    ("name", name)):
                assert(isinstance(p_value, str)),\
                    "Parameter '{}' must be of str type".format(p_name)

            assert(os.path.isfile(module_path)),\
                "Parameter 'module_path' must be a valid file"

            module_name, extension = os.path.splitext(
                    os.path.basename(module_path))
            assert(extension == ".py"),\
                "Parameter 'space' must be read from a python file"

            try:
                # load the module from its file location without
                # touching sys.path
                spec = importlib.util.spec_from_file_location(
                        module_name, module_path)
                module = importlib.util.module_from_spec(spec)
                spec.loader.exec_module(module)
                space = getattr(module, name)
            except Exception as e:
                err = "Invalid space location or name"
                self._logger.error(err)
                raise Exception(err) from e

        assert(isinstance(space, pyll.base.Apply)),\
            "Parameter 'space' must be of hyperopt space type"

        self._space = space
        self._logger.info("Attached parameter distribution space")
        self._space_attached = True

    def _convert_to_array(self, x: (pd.DataFrame, np.ndarray))\
            -> np.ndarray:
        '''
        Converts a DataFrame or a Series to a numpy array.
        Numpy arrays are returned unchanged.

        :raises ValueError: for any other input type
        '''
        if isinstance(x, np.ndarray):
            return x

        if isinstance(x, (pd.DataFrame, pd.Series)):
            return x.values

        e = 'The argument must be a numpy array or a pandas DataFrame'
        self._logger.critical(e)
        raise ValueError(e)

    def attach_data(self, X_train: (pd.DataFrame, np.ndarray),
                    y_train: (pd.DataFrame, pd.Series, np.ndarray) = None,
                    X_val: (pd.DataFrame, np.ndarray) = None,
                    y_val: (pd.DataFrame, pd.Series, np.ndarray) = None,
                    cv: (list, int) = None):
        '''
        :param array X_train: data on which
            machine learning pipelines are trained
        :param array y_train: optional, vector with targets,
            (not all algorithms require a targets)
        :param array X_val: optional, validation data.
            When not provided, cross-validated value
            of the cost_func is calculated.
        :param array y_val: optional, validation targets
        :param list cv: list of tuples containing
            train and validation indices or an integer representing
            the number of folds for a random split of data
            during cross-validation
            example: [([0,1,2], [3,4]), ([1,2,3], [4,5])]
        :raises ValueError: when y_train and X_val are given
            without y_val
        '''
        X_train = self._convert_to_array(X_train)
        if y_train is not None:
            y_train = self._convert_to_array(y_train)

        if X_val is not None:
            if cv is not None:
                self._logger.warning(("Both validation set and cv object "
                                      "are set. Validation score will be "
                                      "calculated on the validation set!"))

            X_val = self._convert_to_array(X_val)

            train_inds = list(range(len(X_train)))
            val_inds = list(range(len(X_train),
                                  len(X_train) + len(X_val)))

            # cost is evaluated with a cross validation function
            # that accepts an array and a cv object with
            # indices of the fold splits.
            # Here we create a trivial cv object
            # with one validation split.
            self._cv = [(train_inds, val_inds)]
            self._X = np.concatenate([X_train, X_val])

            if y_train is not None:
                if y_val is None:
                    err = "Argument y_val must be provided"
                    self._logger.critical(err)
                    raise ValueError(err)

                y_val = self._convert_to_array(y_val)
                self._y = np.concatenate([y_train, y_val])
            else:
                self._y = None
        else:
            if cv is None:
                self._logger.warning(("Neither validation set nor cv object "
                                      "are set. Validation score will be "
                                      "calculated on 5 randomly "
                                      "splitted folds."))

            self._X = X_train
            self._y = y_train
            self._cv = cv

        self._logger.info("Attached data")
        self._data_attached = True

    def _evaluate(self, pipeline: "Pipeline") -> dict:
        '''
        This method is called in _objective.

        Calculates the cost on the attached data.
        This function can be overriden, when the cost
        needs to be calculated differently,
        for example with a tensorflow model.

        :param Pipeline pipeline: machine learning pipeline
            that will be evaluated with cross-validation

        :output: dictionary with the aggregated
            cross-validation score ('value') and
            the score variance ('variance').
        '''
        # imported here because the module imports make_scorer
        # only inside its __main__ section
        from sklearn.metrics import make_scorer

        scores = cross_validate(estimator=pipeline,
                                X=self._X,
                                y=self._y,
                                cv=self._cv or 5,
                                scoring=make_scorer(self._cost_func),
                                error_score=np.nan)

        return {'value': self._averaging_func(scores['test_score']),
                'variance': np.var(scores['test_score'])}

    def _objective(self, space_element: dict) -> dict:
        '''
        This method is called in search_for_best_pipeline
        inside the hyperopt fmin method.

        Uses _evaluate method.

        It must take as input a space element
        and produce an output in the form of dictionary
        with 2 obligatory values loss and status
        (STATUS_OK or STATUS_FAIL). Other
        values in the output are optional and can be
        accessed later through the trials object.

        :Warning: fmin minimizes the loss,
        when _evaluate returns a value to be maximized,
        it should be multiplied by -1 to obtain loss.

        :param dict space_element: must contain keys
            name (with the name of the pipeline),
            pipeline (Pipeline object),
            params (dict of pipeline params)

        :output: dictionary with keys
            loss (minimized value),
            status with values STATUS_OK or STATUS_FAIL
            understood by hyperopt,
            score (equal to loss or -loss),
            score_variance,
            timestamp (end of execution),
            train_time: execution time
        '''
        assert(isinstance(space_element, dict) and
               set(['name', 'pipeline', 'params']) <= space_element.keys())

        assert(isinstance(space_element['name'], str) and
               isinstance(space_element['pipeline'], Pipeline) and
               isinstance(space_element['params'], dict))

        start_time = time.time()

        if not self._data_attached:
            raise Exception(("Data must be attached in order "
                             "to perform the best pipeline search"))

        self._run_number += 1

        pipeline = space_element['pipeline']
        params = space_element['params']
        pipeline.set_params(**params)

        self._logger.info(("Run number {0}: "
                           "Current best score is {1}: "
                           "Training pipeline {2} "
                           "with parameters: {3}. ").format(
                               self._run_number,
                               self._best_score,
                               space_element['name'],
                               params))

        # a failing trial must not abort the whole search:
        # it is reported to hyperopt with STATUS_FAIL instead
        try:
            score_stats = self._evaluate(pipeline)
            assert(not np.isnan(score_stats["value"])),\
                "Returned null score"

            if self._run_number % self._backup_trials_freq == 0:
                self._backup_trials()

            # the best score is NaN until the first successful trial
            if np.isnan(self._best_score) or\
                self._score_factor*score_stats["value"] <\
                    self._score_factor*self._best_score:

                self._logger.info("Score got better, new best score is: {}"
                                  .format(score_stats["value"]))

                self._best_score = score_stats['value']

                self._backup_trials()

            end_time = time.time()

            return {'loss': self._score_factor * score_stats["value"],
                    'status': STATUS_OK,
                    'score': score_stats["value"],
                    'score_variance': score_stats["variance"],
                    'timestamp': datetime.datetime.today(),
                    'train_time': end_time - start_time}

        except Exception as e:

            self._logger.warning("Trial failed with error {}".format(e))

            return {'loss': np.nan,
                    'status': STATUS_FAIL,
                    'score': np.nan,
                    'score_variance': np.nan,
                    'timestamp': datetime.datetime.today(),
                    'train_time': np.nan}

    def search_for_best_pipeline(self,
                                 niter: int,
                                 algo: callable = None):
        '''
        Method performing the search of the best pipeline in the given space.
        Calls fmin function from the hyperopt library to minimize the output of
        _objective.

        :params int niter: number of search iterations
        :param callable algo: now can only take values tpe.suggest
            for a tree-based random search or rand.suggest for random
            search. When not provided, tpe.suggest is used.
        :raises ValueError: when the search fails
        '''
        if algo is None:
            algo = tpe.suggest

        assert(self._space_attached),\
            "Space must be attached before starting the search."

        assert(self._data_attached),\
            "Data must be attached before starting the search."

        assert(isinstance(niter, int)),\
            "Parameter 'niter' must be of int type"

        # right now only two algorithms are provided by hyperopt
        assert(algo in [tpe.suggest, rand.suggest]),\
            ("Parameter 'algo' can be now only tpe or random. "
             "If other algorithms have been developed "
             "by hyperopt, please add them to the list.")

        try:
            n_previous = len(self._trials.trials)

            self._logger.info(("Starting {0} iterations of search "
                               "additional to {1} previous"
                               .format(niter, n_previous)))

            # max_evals counts the trials that are already stored
            best = fmin(fn=self._objective,
                        space=self._space,
                        algo=algo,
                        trials=self._trials,
                        max_evals=n_previous + niter)

            # save the result before anything else can go wrong
            self._backup_trials()

            self._logger.info(
                    "Best score is {0} with variance {1}"
                    .format(self.best_trial_score,
                            self.best_trial_score_variance))

            self._logger.info(("Finished {0} iterations of search.\n"
                               "Best parameters are:\n {1} ")
                              .format(niter,
                                      space_eval(self._space, best)))

        except Exception as e:
            raise ValueError(("Failed to select best "
                              "pipeline! Exit with error: {}")
                             .format(e)) from e

    @staticmethod
    def _trial_point(trial: dict) -> dict:
        '''
        Extracts from a trial the sampled values in the form
        expected by hyperopt's space_eval.
        '''
        return {k: v[0] for k, v in trial['misc']['vals'].items()
                if len(v) > 0}

    def _best_trial(self) -> dict:
        '''
        Returns the successful trial with the lowest loss,
        or None when no trial has succeeded so far.
        '''
        candidates = []

        for trial in self._trials.trials:
            result = trial['result']
            loss = result.get('loss')

            if result.get('status') != STATUS_OK:
                continue
            if loss is None or np.isnan(loss):
                continue

            candidates.append(trial)

        if len(candidates) == 0:
            return None

        return min(candidates, key=lambda t: t['result']['loss'])

    @property
    def best_trial_score(self) -> float:
        '''
        Score of the best successful trial,
        NaN when there is none.
        '''
        best = self._best_trial()
        return np.nan if best is None else best["result"]["score"]

    @property
    def best_trial_score_variance(self) -> float:
        '''
        Score variance of the best successful trial,
        NaN when there is none.
        '''
        best = self._best_trial()
        return np.nan if best is None\
            else best["result"]["score_variance"]

    @property
    def best_trial_pipeline(self) -> "Pipeline":
        '''
        Pipeline of the best successful trial.

        :raises Exception: when there is no successful trial
        '''
        assert(self._space_attached),\
            "Space must be attach to be able to retrieve this information."

        best = self._best_trial()

        if best is None:
            err = ("Trials object is empty. "
                   "Best pipeline cannot be returned")
            self._logger.error(err)
            raise Exception(err)

        return space_eval(self._space,
                          self._trial_point(best))["pipeline"]

    def _has_trial(self, i: int) -> bool:
        '''
        True when i is a valid index of a stored trial.
        '''
        return 0 <= i < len(self._trials.trials)

    def _ith_trial_loss(self, i: int) -> float:
        '''
        Loss of the i-th trial, NaN when the trial does not exist
        or has no loss yet.
        '''
        if self._has_trial(i):
            return self._trials.trials[i]['result'].get('loss', np.nan)

        return np.nan

    def _ith_trial_element(self, i: int, name: str) -> object:
        '''
        Element (name, pipeline or params) of the space point
        sampled in the i-th trial, None when the trial does not exist.
        '''
        assert(self._space_attached),\
            "Space must be attach to be able to retrieve this information."

        if self._has_trial(i):
            return space_eval(
                    self._space,
                    self._trial_point(self._trials.trials[i]))[name]

        return None

    def _ith_trial_pipeline(self, i: int) -> "Pipeline":
        '''
        Pipeline of the i-th trial.
        '''
        return self._ith_trial_element(i=i, name='pipeline')

    def _ith_trial_name(self, i: int) -> str:
        '''
        Pipeline name of the i-th trial.
        '''
        return self._ith_trial_element(i=i, name='name')

    def _ith_trial_params(self, i: int) -> dict:
        '''
        Pipeline parameters of the i-th trial.
        '''
        return self._ith_trial_element(i=i, name='params')

    def _ith_trial_timestamp(self, i: int) -> datetime.datetime:
        '''
        Timestamp of the i-th trial, None when the trial
        does not exist or has no timestamp yet.
        '''
        if self._has_trial(i):
            return self._trials.trials[i]["result"].get("timestamp")

        return None

    @staticmethod
    def _n_best_trial_indices(n: int, indexed_losses) -> list:
        '''
        Selects the trial indices belonging to the n lowest
        distinct losses.

        Missing and NaN losses (failed trials) are ignored. When
        several trials share a loss only the first one is kept.

        :param int n: maximal number of indices to return
        :param indexed_losses: iterable of (trial index, loss) pairs
        '''
        first_index_by_loss = {}

        for index, loss in indexed_losses:
            if loss is None or np.isnan(loss):
                continue
            first_index_by_loss.setdefault(loss, index)

        return [first_index_by_loss[loss]
                for loss in sorted(first_index_by_loss)[:n]]

    def get_n_best_trial_pipelines(self, n: int, losses: list = None) -> list:
        '''
        Returns the list of n best pipelines
        documented in trials

        :param int n: maximal number of pipelines to return
        :param list losses: optional list of losses where position i
            holds the loss of the i-th trial. When not provided,
            the losses stored in the trials object are used.
        :raises Exception: when the trials object is empty
        '''
        assert(isinstance(n, int)), "Parameter 'n' must be an integer"

        if len(self._trials.trials) == 0:
            err = ("Trials object is empty. "
                   "Best pipeline cannot be returned")
            self._logger.error(err)
            raise Exception(err)

        if losses is None:
            losses = [self._ith_trial_loss(i)
                      for i in range(len(self._trials.trials))]

        best_n_indices = self._n_best_trial_indices(n, enumerate(losses))

        return [self._ith_trial_pipeline(i) for i in best_n_indices]

    def get_n_best_trial_pipelines_of_each_type(self, n: int) -> dict:
        '''
        Returns a dictionary where keys are pipeline names,
        and values are lists of best pipelines with this name

        :param int n: maximal number of pipelines per name
        :raises Exception: when the trials object is empty
        '''
        assert(isinstance(n, int)), "Parameter 'n' must be an integer"

        if len(self._trials.trials) == 0:
            err = ("Trials object is empty. "
                   "Best pipeline cannot be returned")
            self._logger.error(err)
            raise Exception(err)

        # group the trial indices by pipeline name; the indices
        # must stay global to address the right trials later
        indices_by_name = {}
        for i in range(len(self._trials.trials)):
            indices_by_name.setdefault(self._ith_trial_name(i), [])\
                .append(i)

        best_pipelines_per_type = {}

        for nm, indices in indices_by_name.items():
            best_indices = self._n_best_trial_indices(
                    n, [(i, self._ith_trial_loss(i)) for i in indices])

            best_pipelines_per_type[nm] = [self._ith_trial_pipeline(i)
                                           for i in best_indices]

        return best_pipelines_per_type

    def write_trials_documentation(self, path: str = None):
        '''
        Saves an excel file with pipeline names, scores,
        parameters, and timestamps.

        :param str path: optional, path of the excel file,
            by default hyperopt_trials_documentation.xlsx
            in the working directory.
        '''
        # resolved up front, also needed when there are no trials
        path = path or "hyperopt_trials_documentation.xlsx"

        assert(isinstance(path, str)),\
            "Parameter 'path' must be of string type"

        self._assert_valid_directory(path)

        trial_indices = range(len(self._trials.trials))

        names = [self._ith_trial_name(i) for i in trial_indices]

        # loss is the score multiplied with the score factor (1 or -1)
        scores = [self._score_factor*self._ith_trial_loss(i)
                  for i in trial_indices]

        params = [self._ith_trial_params(i) for i in trial_indices]

        timestamps = [self._ith_trial_timestamp(i) for i in trial_indices]

        pd.DataFrame({"name": names,
                      "score": scores,
                      "params": params,
                      "timestamp": timestamps})\
          .to_excel(path)
if __name__ == '__main__':

    # make_scorer is not called in this script itself;
    # the import is retained at module level.
    from sklearn.metrics import roc_auc_score, make_scorer
    from xgboost import XGBClassifier
    from sklearn.svm import SVC
    from sklearn.feature_selection import SelectKBest
    from sklearn.decomposition import PCA
    from sklearn.datasets import load_iris
    from pprint import pprint

    # DATA ################################################################

    data = load_iris()
    X = pd.DataFrame(data.data)
    # turn the three-class iris target into a binary variable
    y = (pd.Series(data.target) == 2).astype(int)
    del data
    gc.collect()

    # SPACE DEFINITION ####################################################
    # (can be moved to a separate python script)
    #
    # A search space must be a list of dictionaries.
    # Each dictionary must have keys:
    #     name (pipeline name or type),
    #     pipeline (instance of sklearn.pipeline.Pipeline),
    #     params (dictionary of distributions for the parameters of
    #             the pipeline that we want to tune)
    #
    # Here we have a space that consists of two dictionaries:
    # KBEST_XGBOOST and PCA_SVC
    #
    # A pipeline consists of steps (tuples).
    # Each step has a name and an algorithm.
    # The first pipeline, as a first step performs
    # feature selection with SelectKBest and
    # as a second step evaluates a machine learning algo (xgboost).
    # Like all sklearn algorithms, a Pipeline has methods
    # fit, predict, set_params, get_params
    #
    # Pipeline parameter dictionaries must be of the form:
    # {'kbest__k': 3, xgb__n_estimators: 20},
    # each parameter name consists of the step name, __, and parameter name.
    # Here, instead of values, the parameter names are followed
    # by hyperopt distributions.
    # Each hyperopt distribution also must have a name,
    # due to hyperopt functionality.
    # Here, we set the hyperopt distribution name to the step name,
    # but it does not have to be so. Hyperopt distribution names
    # must be different for different elements of the space.

    kbest_xgboost = {
        "name": "KBEST_XGBOOST",
        "pipeline": Pipeline([
            ('kbest', SelectKBest()),
            ('xgb', XGBClassifier())
        ]),
        "params": {
            'kbest__k': hp.choice('kbest__k', range(1, 5)),
            'xgb__n_estimators':
                50 + hp.randint('xgb__n_estimators', 50),
            "xgb__learning_rate":
                hp.loguniform('xgb__learning_rate',
                              np.log(0.01), np.log(0.2)),
        },
    }

    pca_svc = {
        "name": "PCA_SVC",
        "pipeline": Pipeline([
            ('pca', PCA()),
            ('svc', SVC(gamma="scale"))
        ]),
        "params": {
            "pca__n_components": 1 + hp.randint("pca__n_components", 4),
            "svc__C": hp.loguniform("svc__C", np.log(0.01), np.log(0.1)),
        },
    }

    space = hp.choice('pipelines', [kbest_xgboost, pca_svc])

    # TESTING #############################################################

    trials_path = 'TEST_hyperopt_trials.pkl'
    doc_path = 'TEST_hyperopt_doc.xlsx'

    hp_obj = HyperoptPipelineSelection(cost_func=roc_auc_score,
                                       greater_is_better=True,
                                       trials_path=trials_path)

    hp_obj.attach_data(X_train=X, y_train=y)
    hp_obj.attach_space(space=space)
    hp_obj.search_for_best_pipeline(niter=10)

    print('\n', '='*20, 'TESTING', '='*20)

    for caption, value in (
            ('Best score:', hp_obj.best_trial_score),
            ('Best score variance:', hp_obj.best_trial_score_variance),
            ('Best pipeline', hp_obj.best_trial_pipeline)):
        print('\n', caption, value)

    print('\n', 'Best 3 pipelines: \n')
    pprint(hp_obj.get_n_best_trial_pipelines(n=3))

    print('\n', 'Best pipeline per type: \n')
    pprint(hp_obj.get_n_best_trial_pipelines_of_each_type(n=1))

    hp_obj.write_trials_documentation(path=doc_path)

    # the produced files are left in place; uncomment to clean up
    # os.remove(doc_path)
    # os.remove(trials_path)
|