@@ -0,0 +1,798 @@
+#!/usr/bin/env python3
+# -*- coding: utf-8 -*-
+"""
+Created on Fri Nov 9 13:27:44 2018
+
+@author: tanja
+@description: Implementation of machine learning
+              pipeline selection and tuning with the hyperopt library
+"""
+
+import os
+import sys
+import gc
+import logging
+import pickle
+import time
+import datetime
+import importlib
+
+import pandas as pd
+import numpy as np
+
+from sklearn.pipeline import Pipeline
+from sklearn.metrics import make_scorer
+from sklearn.model_selection import cross_validate
+
+from hyperopt import fmin, tpe, rand, Trials, hp, STATUS_OK, STATUS_FAIL,\
+    space_eval, pyll
+
+
+class HyperoptPipelineSelection:
+    '''
+    Use this class to perform a search
+    for a machine learning pipeline in a given parameter space.
+    The parameter space can include multiple types of Pipelines
+    (SVM, XGBOOST, random forest, etc.),
+    as well as parameter distributions for each pipeline parameter.
+    See the example in main for the expected space structure.
+
+    The search can be performed either randomly
+    or with a tree-based algorithm. (Other methods are currently
+    being developed by the hyperopt creators.)
+
+    The attribute trials is responsible for book-keeping of the parameter
+    combinations that have already been tried out. This attribute
+    is saved to a binary file every backup_trials_freq iterations
+    as well as every time a better pipeline is found.
+    '''
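+    # Minimal usage sketch (assuming a hyperopt space object `space` and
+    # training data X, y; see the __main__ section below for a full,
+    # runnable example):
+    #
+    #     hp_obj = HyperoptPipelineSelection(cost_func=roc_auc_score,
+    #                                        greater_is_better=True,
+    #                                        trials_path="trials.pkl")
+    #     hp_obj.attach_data(X_train=X, y_train=y)
+    #     hp_obj.attach_space(space=space)
+    #     hp_obj.search_for_best_pipeline(niter=10)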
+    def __init__(self,
+                 cost_func,
+                 greater_is_better: bool,
+                 trials_path: str,
+                 backup_trials_freq: int = 1,
+                 log_path: str = None,
+                 averaging_func: callable = None):
+        '''
+        :param callable cost_func: function to minimize or maximize
+
+        :param bool greater_is_better: when True
+            cost_func is maximized, else minimized.
+
+        :param str trials_path: path at which the trials object is saved
+            in binary format. From the trials object we can
+            select information about the obtained scores, score variations,
+            and the pipelines and parameters tried out so far. If a trials
+            object already exists at the given path, it is loaded and the
+            search is continued, else the search is started from
+            the beginning.
+
+        :param backup_trials_freq: frequency in iterations (trials)
+            of saving the trials object at the trials_path.
+
+        :param str log_path: Optional, when not provided logs to stdout.
+
+        :param callable averaging_func: optional,
+            when not provided set to mean. Function
+            to aggregate the cross-validated values of the cost function.
+            The classic choice is the mean; another possibility is,
+            for example, mean() - c*var().
+        '''
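+        # A hedged example of a custom aggregation: penalizing unstable
+        # pipelines by subtracting a multiple of the score variance
+        # (the constant 0.1 below is an arbitrary illustration):
+        #
+        #     averaging_func=lambda scores: np.mean(scores) - 0.1*np.var(scores)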
+
+        assert(callable(cost_func)),\
+            "Parameter 'cost_func' must be a callable"
+
+        assert(isinstance(greater_is_better, bool)),\
+            "Parameter 'greater_is_better' must be bool type"
+
+        assert(isinstance(trials_path, str)),\
+            "Parameter 'trials_path' must be of string type"
+
+        if averaging_func is not None:
+            assert(callable(averaging_func)),\
+                "Parameter 'averaging_func' must be a callable"
+
+        self._assert_valid_directory(path=trials_path)
+
+        self._configure_logger(log_path)
+
+        self._cost_func = cost_func
+        # is 1 when cost_func is minimized, -1 when cost_func is maximized
+        self._score_factor = (not greater_is_better) - greater_is_better
+        self._trials_path = trials_path
+        # is initialized with an empty trials object
+        self._trials = Trials()
+        self._backup_trials_freq = backup_trials_freq
+        self._averaging_func = averaging_func or np.mean
+        # keeping track of the current search iteration
+        self._run_number = 0
+        # space and data need to be attached to perform the search.
+        self._space_attached = False
+        self._data_attached = False
+
+        # if a trials object already exists at the given path,
+        # it is loaded and the search is continued. Else,
+        # the search is started from the beginning.
+        if os.path.isfile(trials_path):
+            try:
+                with open(trials_path, "rb") as f:
+                    self._trials = pickle.load(f)
+
+                self._logger.info(("Loaded an existing trials object "
+                                   "consisting of {} trials")
+                                  .format(len(self._trials.trials)))
+
+            except Exception as e:
+                self._logger.error(("Trials object could not be loaded. "
+                                    "Training starts from the beginning. "
+                                    "Exit with error {}").format(e))
+
+        else:
+            self._logger.info(("No existing trials object was found. "
+                               "Initialized an empty trials object."))
+
+        self._best_score = self.best_trial_score
+
+    def _configure_logger(self, log_path: str = None):
+        '''
+        Configures the logger. Can be replaced with an existing
+        logging script later. When log_path is not provided,
+        logs to stdout.
+        '''
+
+        self._logger = logging.getLogger(__name__)
+
+        if (self._logger.hasHandlers()):
+            self._logger.handlers.clear()
+
+        if log_path is not None:
+            assert(isinstance(log_path, str)),\
+                "Parameter 'log_path' must be of string type"
+            self._assert_valid_directory(log_path)
+
+            handler = logging.FileHandler(log_path)
+        else:
+            handler = logging.StreamHandler(sys.stdout)
+
+        formatter = logging.Formatter(
+                '\n %(asctime)s %(levelname)s %(message)s')
+
+        handler.setFormatter(formatter)
+        self._logger.addHandler(handler)
+        self._logger.setLevel("INFO")
+
+    def _backup_trials(self):
+        '''
+        Pickles (Saves) the trials object.
+        Used in a scheduler.
+        '''
+        with open(self._trials_path, "wb") as f:
+            pickle.dump(self._trials, f)
+
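+    # A minimal sketch of how a backed-up trials file can be inspected
+    # offline (assuming a file written by _backup_trials at
+    # "hyperopt_trials.pkl"):
+    #
+    #     with open("hyperopt_trials.pkl", "rb") as f:
+    #         trials = pickle.load(f)
+    #     print(len(trials.trials), trials.best_trial["result"])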
+    def _assert_valid_directory(self, path: str):
+        '''
+        If the directory of a path does not exist yet,
+        creates it.
+        '''
+        assert(isinstance(path, str)),\
+            "Parameter 'path' must be of str type"
+
+        dirname = os.path.dirname(path)
+
+        if len(dirname) > 0:
+            os.makedirs(dirname, exist_ok=True)
+
+    def attach_space(self, space: pyll.base.Apply = None,
+                     module_path: str = None,
+                     name: str = None):
+        '''
+        :param pyll.base.Apply space: hyperopt space where
+            the search is performed. Optional when a space
+            is loaded from a python module.
+
+        :param str module_path: path to the python module
+            where the space is defined. Optional when
+            the space is provided directly.
+
+        :param str name: name of the space loaded from
+            a python module. Optional when the space
+            is provided directly.
+        '''
+        assert((space is not None) or
+               ((module_path is not None) and (name is not None))),\
+            "Either space or (module_path, name) must be provided"
+
+        if space is None:
+            for p_name, p in [("module_path", module_path), ("name", name)]:
+                assert(isinstance(p, str)),\
+                    "Parameter '{}' must be of str type".format(p_name)
+
+            assert(os.path.isfile(module_path)),\
+                "Parameter 'module_path' must be a valid file"
+
+            module, extension = os.path.splitext(os.path.basename(module_path))
+            assert(extension == ".py"),\
+                "Parameter 'space' must be read from a python file"
+
+            sys.path.insert(0, os.path.dirname(module_path))
+
+            try:
+                space = getattr(importlib.import_module(module), name)
+            except (ImportError, AttributeError):
+                err = "Invalid space location or name"
+                self._logger.error(err)
+                raise Exception(err)
+
+        assert(isinstance(space, pyll.base.Apply)),\
+            "Parameter 'space' must be of hyperopt space type"
+
+        self._space = space
+        self._logger.info("Attached parameter distribution space")
+        self._space_attached = True
+
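+    # Usage sketch (hypothetical module): if a file my_space.py defines
+    #     space = hp.choice('pipelines', [...])
+    # the space can be attached either directly
+    #     hp_obj.attach_space(space=space)
+    # or from the module
+    #     hp_obj.attach_space(module_path="my_space.py", name="space")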
+    def _convert_to_array(self, x: (pd.DataFrame, np.ndarray))\
+            -> np.ndarray:
+        '''
+        Converts a pandas DataFrame or Series to a numpy array.
+        '''
+        if isinstance(x, np.ndarray):
+            return x
+
+        elif (isinstance(x, pd.core.frame.DataFrame))\
+                or (isinstance(x, pd.core.series.Series)):
+            return x.values
+
+        else:
+            e = 'The argument must be a numpy array or a pandas DataFrame'
+            self._logger.critical(e)
+            raise ValueError(e)
+
+    def attach_data(self, X_train: (pd.DataFrame, np.ndarray),
+                    y_train: (pd.DataFrame, pd.Series, np.ndarray) = None,
+                    X_val: (pd.DataFrame, np.ndarray) = None,
+                    y_val: (pd.DataFrame, pd.Series, np.ndarray) = None,
+                    cv: (list, int) = None):
+        '''
+        :param array X_train: data on which
+            machine learning pipelines are trained
+
+        :param array y_train: optional, vector of targets
+            (not all algorithms require targets)
+
+        :param array X_val: optional, validation data.
+            When not provided, the cross-validated value
+            of the cost_func is calculated.
+
+        :param array y_val: optional, validation targets
+
+        :param cv: list of tuples containing
+            train and validation indices, or an integer representing
+            the number of folds for a random split of the data
+            during cross-validation,
+            example: [([0,1,2], [3,4]), ([1,2,3], [4,5])]
+        '''
+
+        X_train = self._convert_to_array(X_train)
+        if y_train is not None:
+            y_train = self._convert_to_array(y_train)
+
+        if X_val is not None:
+            if cv is not None:
+                self._logger.warning(("Both validation set and cv object "
+                                      "are set. Validation score will be "
+                                      "calculated on the validation set!"))
+
+            X_val = self._convert_to_array(X_val)
+
+            train_inds = list(range(len(X_train)))
+            val_inds = list(range(len(X_train),
+                                  len(X_train) + len(X_val)))
+
+            # cost is evaluated with a cross validation function
+            # that accepts an array and a cv object with
+            # indices of the fold splits.
+            # Here we create a trivial cv object
+            # with one validation split.
+            self._cv = [(train_inds, val_inds)]
+            self._X = np.concatenate([X_train, X_val])
+
+            if y_train is not None:
+                if y_val is None:
+                    err = "Argument y_val must be provided"
+                    self._logger.critical(err)
+                    raise ValueError(err)
+                else:
+                    y_val = self._convert_to_array(y_val)
+                    self._y = np.concatenate([y_train, y_val])
+            else:
+                self._y = None
+        else:
+            if cv is None:
+                self._logger.warning(("Neither validation set nor cv object "
+                                      "are set. Validation score will be "
+                                      "calculated on 5 randomly "
+                                      "split folds."))
+
+            self._X = X_train
+            self._y = y_train
+            self._cv = cv
+
+        self._logger.info("Attached data")
+        self._data_attached = True
+
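+    # Usage sketches (hypothetical data): pass an explicit validation set
+    #     hp_obj.attach_data(X_train, y_train, X_val=X_val, y_val=y_val)
+    # or explicit fold indices
+    #     hp_obj.attach_data(X, y, cv=[([0, 1, 2], [3, 4]), ([1, 2, 3], [4, 5])])
+    # or nothing besides the training data, in which case a random
+    # 5-fold cross-validation is used.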
+    def _evaluate(self, pipeline: Pipeline) -> dict:
+        '''
+        This method is called in _objective.
+
+        Calculates the cost on the attached data.
+        This function can be overridden when the cost
+        needs to be calculated differently,
+        for example with a tensorflow model.
+
+        :param Pipeline pipeline: machine learning pipeline
+            that will be evaluated with cross-validation
+
+        :output: dictionary with the aggregated
+            cross-validation score and
+            the score variance.
+        '''
+
+        scores = cross_validate(estimator=pipeline,
+                                X=self._X,
+                                y=self._y,
+                                cv=self._cv or 5,
+                                scoring=make_scorer(self._cost_func),
+                                error_score=np.nan)
+
+        return {'value': self._averaging_func(scores['test_score']),
+                'variance': np.var(scores['test_score'])}
+
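+    # Sketch of a possible override (hypothetical subclass): evaluating on
+    # a single holdout split instead of cross-validation, while keeping
+    # the output contract of _evaluate:
+    #
+    #     class HoldoutPipelineSelection(HyperoptPipelineSelection):
+    #         def _evaluate(self, pipeline):
+    #             (train_inds, val_inds) = self._cv[0]
+    #             pipeline.fit(self._X[train_inds], self._y[train_inds])
+    #             value = self._cost_func(self._y[val_inds],
+    #                                     pipeline.predict(self._X[val_inds]))
+    #             return {'value': value, 'variance': np.nan}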
+    def _objective(self, space_element: dict) -> dict:
+        '''
+        This method is called in search_for_best_pipeline
+        inside the hyperopt fmin method.
+
+        Uses the _evaluate method.
+
+        It must take a space element as input
+        and produce an output in the form of a dictionary
+        with 2 obligatory values, loss and status
+        (STATUS_OK or STATUS_FAIL). Other
+        values in the output are optional and can be
+        accessed later through the trials object.
+
+        :Warning: fmin minimizes the loss;
+            when _evaluate returns a value to be maximized,
+            it must be multiplied by -1 to obtain the loss.
+
+        :param dict space_element: must contain keys
+            name (with the name of the pipeline),
+            pipeline (Pipeline object),
+            params (dict of pipeline params)
+
+        :output: dictionary with keys
+            loss (minimized value),
+            status with values STATUS_OK or STATUS_FAIL
+            understood by hyperopt,
+            score (equal to loss or -loss),
+            score_variance,
+            timestamp (end of execution),
+            train_time: execution time
+        '''
+        assert(isinstance(space_element, dict) and
+               set(['name', 'pipeline', 'params']) <= space_element.keys())
+
+        assert(isinstance(space_element['name'], str) and
+               isinstance(space_element['pipeline'], Pipeline) and
+               isinstance(space_element['params'], dict))
+
+        start_time = time.time()
+
+        if not self._data_attached:
+            raise Exception(("Data must be attached in order "
+                             "to perform the best "
+                             "pipeline search"))
+
+        self._run_number += 1
+
+        pipeline = space_element['pipeline']
+        params = space_element['params']
+        pipeline.set_params(**params)
+
+        self._logger.info(("Run number {0}: "
+                           "Current best score is {1}: "
+                           "Training pipeline {2} "
+                           "with parameters: {3}. ").format(
+                              self._run_number,
+                              self._best_score,
+                              space_element['name'],
+                              params))
+
+        try:
+            score_stats = self._evaluate(pipeline)
+            assert(not np.isnan(score_stats["value"])),\
+                "Returned null score"
+
+            if self._run_number % self._backup_trials_freq == 0:
+                self._backup_trials()
+
+            # (x != x) is True only when x is nan, i.e. when no
+            # best score has been recorded yet
+            if (self._best_score != self._best_score) or\
+               self._score_factor*score_stats["value"] <\
+               self._score_factor*self._best_score:
+
+                self._logger.info("Score got better, new best score is: {}"
+                                  .format(score_stats["value"]))
+
+                self._best_score = score_stats['value']
+
+                self._backup_trials()
+
+            end_time = time.time()
+
+            return {'loss': self._score_factor * score_stats["value"],
+                    'status': STATUS_OK,
+                    'score': score_stats["value"],
+                    'score_variance': score_stats["variance"],
+                    'timestamp': datetime.datetime.today(),
+                    'train_time': end_time - start_time}
+
+        except Exception as e:
+
+            self._logger.warning("Trial failed with error {}".format(e))
+
+            return {'loss': np.nan,
+                    'status': STATUS_FAIL,
+                    'score': np.nan,
+                    'score_variance': np.nan,
+                    'timestamp': datetime.datetime.today(),
+                    'train_time': np.nan}
+
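+    # Each run of _objective appends a record to the trials object;
+    # the stored results can be read back later, e.g.
+    #     [t['result'] for t in self._trials.trials]
+    # yields dicts with the loss, status, score, score_variance,
+    # timestamp and train_time values returned above.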
+    def search_for_best_pipeline(self,
+                                 niter: int,
+                                 algo: callable = tpe.suggest):
+        '''
+        Method performing the search for the best pipeline
+        in the given space. Calls the fmin function from the
+        hyperopt library to minimize the output of _objective.
+
+        :param int niter: number of search iterations
+        :param callable algo: currently can only take the values
+            tpe.suggest for a tree-based search
+            or rand.suggest for a random search
+        '''
+        assert(self._space_attached),\
+            "Space must be attached to be able to retrieve this information."
+
+        assert(isinstance(niter, int)),\
+            "Parameter 'niter' must be of int type"
+
+        # right now only two algorithms are provided by hyperopt
+        assert(algo in [tpe.suggest, rand.suggest]),\
+            ("Parameter 'algo' can currently only be tpe or random. "
+             "If other algorithms have been developed by "
+             "hyperopt, please add them to the list.")
+
+        try:
+            self._logger.info(("Starting {0} iterations of search "
+                               "additional to {1} previous ones")
+                              .format(niter, len(self._trials.trials)))
+
+            best = fmin(fn=self._objective,
+                        space=self._space,
+                        algo=algo,
+                        trials=self._trials,
+                        max_evals=len(self._trials.trials) + niter)
+
+            self._logger.info(
+                    "Best score is {0} with variance {1}"
+                    .format(
+                        self._trials.best_trial["result"]["score"],
+                        self._trials.best_trial["result"]["score_variance"]))
+
+            self._logger.info(("Finished {0} iterations of search.\n"
+                               "Best parameters are:\n {1} ")
+                              .format(niter,
+                                      space_eval(self._space, best)))
+
+            self._backup_trials()
+
+        except Exception as e:
+            raise ValueError(("Failed to select best "
+                              "pipeline! Exit with error: {}").format(e))
+
+    @property
+    def best_trial_score(self) -> float:
+        '''
+        Score of the best trial documented so far,
+        or nan when the trials object is empty.
+        '''
+        if len(self._trials.trials) > 0:
+            return self._trials.best_trial["result"]["score"]
+        else:
+            return np.nan
+
+    @property
+    def best_trial_score_variance(self) -> float:
+        '''
+        Score variance of the best trial documented so far,
+        or nan when the trials object is empty.
+        '''
+        if len(self._trials.trials) > 0:
+            return self._trials.best_trial["result"]["score_variance"]
+        else:
+            return np.nan
+
+    @property
+    def best_trial_pipeline(self) -> Pipeline:
+        '''
+        Pipeline of the best trial documented so far.
+        '''
+        assert(self._space_attached),\
+            "Space must be attached to be able to retrieve this information."
+
+        if len(self._trials.trials) > 0:
+
+            return space_eval(
+                self._space,
+                {k: v[0] for k, v in
+                 self._trials.best_trial['misc']['vals'].items()
+                 if len(v) > 0})["pipeline"]
+        else:
+            err = ("Trials object is empty. "
+                   "Best pipeline cannot be returned")
+
+            self._logger.error(err)
+            raise Exception(err)
+
+    def _ith_trial_loss(self, i: int) -> float:
+        '''
+        Loss of the i-th trial, nan when out of range.
+        '''
+        if len(self._trials.trials) > i:
+            return self._trials.trials[i]['result']['loss']
+        else:
+            return np.nan
+
+    def _ith_trial_element(self, i: int, name: str) -> object:
+        '''
+        Element (name, pipeline, or params) of the i-th trial.
+        '''
+        assert(self._space_attached),\
+            "Space must be attached to be able to retrieve this information."
+
+        if len(self._trials.trials) > i:
+            return space_eval(self._space,
+                              {k: v[0] for k, v in
+                               self._trials.trials[i]['misc']['vals']
+                               .items() if len(v) > 0})[name]
+
+    def _ith_trial_pipeline(self, i: int) -> Pipeline:
+        '''
+        Pipeline of the i-th trial.
+        '''
+        return self._ith_trial_element(i=i, name='pipeline')
+
+    def _ith_trial_name(self, i: int) -> str:
+        '''
+        Name of the i-th trial.
+        '''
+        return self._ith_trial_element(i=i, name='name')
+
+    def _ith_trial_params(self, i: int) -> dict:
+        '''
+        Parameters of the i-th trial.
+        '''
+        return self._ith_trial_element(i=i, name='params')
+
+    def _ith_trial_timestamp(self, i: int) -> datetime.datetime:
+        '''
+        Timestamp of the i-th trial.
+        '''
+        if len(self._trials.trials) > i:
+            return self._trials.trials[i]["result"]["timestamp"]
+
+    def get_n_best_trial_pipelines(self, n: int, losses: list = None) -> list:
+        '''
+        Returns the list of n best pipelines
+        documented in trials. The optional losses list
+        must be aligned with the indices of the trials.
+        '''
+        if len(self._trials.trials) > 0:
+            if losses is None:
+                losses = [self._ith_trial_loss(i)
+                          for i in range(len(self._trials.trials))]
+
+            # sort the trial indices by loss, ignoring failed
+            # trials (nan loss) and masked-out entries (inf loss)
+            finite = [i for i in range(len(losses))
+                      if np.isfinite(losses[i])]
+            best_n_indices = sorted(finite, key=lambda i: losses[i])[:n]
+
+            return [self._ith_trial_pipeline(i) for i in best_n_indices]
+        else:
+            err = ("Trials object is empty. "
+                   "Best pipeline cannot be returned")
+
+            self._logger.error(err)
+            raise Exception(err)
+
+    def get_n_best_trial_pipelines_of_each_type(self, n: int) -> dict:
+        '''
+        Returns a dictionary where keys are pipeline names,
+        and values are lists of the best pipelines with this name
+        '''
+        assert(isinstance(n, int)), "Parameter 'n' must be an integer"
+
+        if len(self._trials.trials) > 0:
+
+            best_pipelines_per_type = {}
+            names = [self._ith_trial_name(i)
+                     for i in range(len(self._trials.trials))]
+
+            for nm in set(names):
+                # mask out the trials of other pipeline types with inf,
+                # so that the losses stay aligned with the trial indices
+                losses = [self._ith_trial_loss(i)
+                          if names[i] == nm else np.inf
+                          for i in range(len(self._trials.trials))]
+
+                best_pipelines_per_type[nm] = self.get_n_best_trial_pipelines(
+                                                        n=n,
+                                                        losses=losses)
+
+            return best_pipelines_per_type
+
+        else:
+            err = ("Trials object is empty. "
+                   "Best pipeline cannot be returned")
+
+            self._logger.error(err)
+            raise Exception(err)
+
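+    # For the example space defined in __main__ below, a call like
+    #     hp_obj.get_n_best_trial_pipelines_of_each_type(n=1)
+    # returns (sketch) a dict of the form
+    #     {"KBEST_XGBOOST": [Pipeline(...)], "PCA_SVC": [Pipeline(...)]}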
+
|
|
|
+ def write_trials_documentation(self, path: str = None):
|
|
|
+ '''
|
|
|
+ Saves an excel file with pipeline names, scores,
|
|
|
+ parameters, and timestamps.
|
|
|
+ '''
|
|
|
+ if len(self._trials.trials) > 0:
|
|
|
+ path = path or "hyperopt_trials_documentation.xlsx"
|
|
|
+
|
|
|
+ assert(isinstance(path, str)),\
|
|
|
+ "Parameter 'path' must be of string type"
|
|
|
+
|
|
|
+ self._assert_valid_directory(path)
|
|
|
+
|
|
|
+ names = [self._ith_trial_name(i)
|
|
|
+ for i in range(len(self._trials.trials))]
|
|
|
+ scores = [self._score_factor*self._ith_trial_loss(i)
|
|
|
+ for i in range(len(self._trials.trials))]
|
|
|
+ params = [self._ith_trial_params(i)
|
|
|
+ for i in range(len(self._trials.trials))]
|
|
|
+ timestamps = [self._ith_trial_timestamp(i)
|
|
|
+ for i in range(len(self._trials.trials))]
|
|
|
+
|
|
|
+ else:
|
|
|
+ names = []
|
|
|
+ scores = []
|
|
|
+ params = []
|
|
|
+ timestamps = []
|
|
|
+
|
|
|
+ pd.DataFrame({"name": names,
|
|
|
+ "score": scores,
|
|
|
+ "params": params,
|
|
|
+ "timestamp": timestamps})\
|
|
|
+ .to_excel(path)
|
|
|
+
|
|
|
+
|
|
|
+if __name__ == '__main__':
+
+    from sklearn.metrics import roc_auc_score
+    from xgboost import XGBClassifier
+    from sklearn.svm import SVC
+    from sklearn.feature_selection import SelectKBest
+    from sklearn.decomposition import PCA
+    from sklearn.datasets import load_iris
+    from pprint import pprint
+
+    data = load_iris()
+    X = pd.DataFrame(data.data)
+    y = pd.Series(data.target)
+    # produce a binary variable
+    y = (y == 2).astype(int)
+    del data
+    gc.collect()
+
+    # SPACE DEFINITION ########################################
+    # (can be moved to a separate python script)
+
+    """
+    A search space must be a list of dictionaries.
+    Each dictionary must have the keys:
+    name (pipeline name or type),
+    pipeline (instance of sklearn.pipeline.Pipeline),
+    params (dictionary of distributions for the parameters of
+            the pipeline that we want to tune)
+
+    Here we have a space that consists of two dictionaries:
+    KBEST_XGBOOST and PCA_SVC
+    """
+    space = []
+
+
+    pipeline_dist_1 = {}
+    pipeline_dist_1["name"] = "KBEST_XGBOOST"
+
+    """
+    A pipeline consists of steps (tuples).
+    Each step has a name and an algorithm.
+    This pipeline, as a first step, performs
+    feature selection with SelectKBest and,
+    as a second step, fits a machine learning algorithm (xgboost).
+
+    Like all sklearn algorithms, a Pipeline has the methods
+    fit, predict, set_params, get_params
+    """
+    pipeline_dist_1["pipeline"] = Pipeline([
+            ('kbest', SelectKBest()),
+            ('xgb', XGBClassifier())
+            ])
+ """
|
|
|
+ Pipeline parameter dictionaries must be of the form:
|
|
|
+ {'kbest__k': 3, xgb__n_estimators: 20},
|
|
|
+ each parameter name consists of the step name, __, and parameter name.
|
|
|
+
|
|
|
+ Here, instead of values, the parameter names are followed
|
|
|
+ by hyperopt distributions.
|
|
|
+ Each hyperopt distribution also must have a name,
|
|
|
+ due to hyperopt functionality.
|
|
|
+
|
|
|
+ Here, we set the hyperopt distribution name to the step name,
|
|
|
+ but it does not have to be so. Hyperopt distribution names
|
|
|
+ must be different for different elements of the space.
|
|
|
+ """
|
|
|
+
|
|
|
+ pipeline_dist_1["params"] = {
|
|
|
+ 'kbest__k': hp.choice('kbest__k', range(1, 5)),
|
|
|
+
|
|
|
+ 'xgb__n_estimators':
|
|
|
+ 50 + hp.randint('xgb__n_estimators', 50),
|
|
|
+
|
|
|
+ "xgb__learning_rate":
|
|
|
+ hp.loguniform('xgb__learning_rate', np.log(0.01), np.log(0.2))
|
|
|
+ }
|
|
|
+
|
|
|
+ space.append(pipeline_dist_1)
|
|
|
+
|
|
|
+    pipeline_dist_2 = {}
+    pipeline_dist_2["name"] = "PCA_SVC"
+
+    pipeline_dist_2["pipeline"] = Pipeline([
+            ('pca', PCA()),
+            ('svc', SVC(gamma="scale"))
+            ])
+
+    pipeline_dist_2["params"] = {
+            "pca__n_components": 1 + hp.randint("pca__n_components", 4),
+
+            "svc__C": hp.loguniform("svc__C", np.log(0.01), np.log(0.1))
+            }
+
+    space.append(pipeline_dist_2)
+
+    space = hp.choice('pipelines', space)
+
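+    # Optional sanity-check sketch: drawing a random element from the
+    # space shows one concrete pipeline with sampled parameters
+    # (pyll is imported from hyperopt at the top of this module):
+    #     pprint(pyll.stochastic.sample(space))
+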
+    # TESTING ##########################################################
+
+    trials_path = 'TEST_hyperopt_trials.pkl'
+
+    doc_path = 'TEST_hyperopt_doc.xlsx'
+
+    hp_obj = HyperoptPipelineSelection(cost_func=roc_auc_score,
+                                       greater_is_better=True,
+                                       trials_path=trials_path)
+
+    hp_obj.attach_data(X_train=X, y_train=y)
+
+    hp_obj.attach_space(space=space)
+
+    hp_obj.search_for_best_pipeline(niter=10)
+
+    print('\n', '='*20, 'TESTING', '='*20)
+
+    print('\n', 'Best score:', hp_obj.best_trial_score)
+
+    print('\n', 'Best score variance:', hp_obj.best_trial_score_variance)
+
+    print('\n', 'Best pipeline', hp_obj.best_trial_pipeline)
+
+    print('\n', 'Best 3 pipelines: \n')
+    pprint(hp_obj.get_n_best_trial_pipelines(n=3))
+
+    print('\n', 'Best pipeline per type: \n')
+    pprint(hp_obj.get_n_best_trial_pipelines_of_each_type(n=1))
+
+    hp_obj.write_trials_documentation(path=doc_path)
+
+    # os.remove(doc_path)
+    # os.remove(trials_path)