123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497 |
- #!/usr/bin/env python3
- # -*- coding: utf-8 -*-
- """
- Created on Tue Oct 6 15:04:25 2020
- @author: tanya
- @description: a class for selecting a machine learning
- pipeline from a deterministic space of parameter distributions
- over multiple pipelines.
- The selection is designed in such a way that a Trials object is
- maintained during the tuning process, from which one can retrieve
- the best pipeline so far as well as the entire tuning history
- if needed.
- """
import datetime
import os
import pickle
from copy import deepcopy
from typing import Callable, Dict, List, Literal, Optional, Union

import numpy as np
import pandas as pd
from hyperopt import STATUS_OK, Trials, fmin, rand, space_eval, tpe
from sklearn.pipeline import Pipeline

from cdplib.log import Log
from cdplib.pipeline_selector.PipelineSelector import (PipelineSelector,
                                                       SpaceElementType)
class HyperoptPipelineSelector(PipelineSelector):
    """
    Use this class to perform a search
    for a machine learning pipeline in a given parameter space.

    The parameter space can include multiple types of Pipelines
    (SVM, XGBOOST, random forest, etc),
    as well as parameter distributions for each pipeline parameter.
    See the example in main for the expected space structure.

    The search can be performed either randomly
    or with a tree-based algorithm (other methods are currently
    being developed by the hyperopt creators).

    The trials attribute is responsible for book-keeping the parameter
    combinations that have already been tried out. It is backed up to a
    binary file at trials_path (see backup_trials_freq) and once more at
    the end of every run_trials call.
    """
    def __init__(self,
                 cost_func: Union[Callable, str],
                 greater_is_better: bool,
                 trials_path: str,
                 backup_trials_freq: Optional[int] = None,
                 cross_val_averaging_func: Callable = np.mean,
                 additional_metrics: Optional[Dict[str, Callable]] = None,
                 strategy_name: Optional[str] = None,
                 stdout_log_level: Literal["INFO", "WARNING", "ERROR"]
                 = "INFO"):
        """
        :param Callable cost_func: function to minimize or maximize
            over the elements of a given (pipeline/hyperparameter) space
        :param bool greater_is_better: when True
            cost_func is maximized, else minimized.
        :param str trials_path: path at which the trials object is saved
            in binary format. From the trials object we can
            select information about the obtained scores, score variations,
            pipelines, and parameters tried out so far. If a trials object
            already exists at the given path, it is loaded and the
            search is continued, else the search is started from scratch.
        :param backup_trials_freq: frequency in iterations (trials)
            of saving the trials object at the trials_path.
            If None, the trials object is backed up every time
            the score improves.
        :param Callable cross_val_averaging_func: function to aggregate
            the cross-validation scores.
            Example different from the mean: mean - c*var.
        :param additional_metrics: dict of additional metrics to save
            of the form {"metric_name": metric} where metric is a Callable.
        :param str strategy_name:
            a strategy is defined by the data set (columns/features and rows),
            cv object, cost function.
            When the strategy changes, one must start with new trials.
        :param str stdout_log_level: can be INFO, WARNING, ERROR
        """
        # Create the logger before anything that can fail, so that the
        # error handler below always has a logger to report through.
        logger = Log("HyperoptPipelineSelector: ",
                     stdout_log_level=stdout_log_level)
        self._logger = logger
        try:
            super().__init__(cost_func=cost_func,
                             greater_is_better=greater_is_better,
                             trials_path=trials_path,
                             backup_trials_freq=backup_trials_freq,
                             cross_val_averaging_func=cross_val_averaging_func,
                             additional_metrics=additional_metrics,
                             strategy_name=strategy_name,
                             stdout_log_level=stdout_log_level)
            # The parent constructor may assign its own logger; restore ours
            # so that messages carry this class name as prefix.
            self._logger = logger
            # Keep the trials set by the parent (e.g. loaded from
            # trials_path) if there are any, otherwise start fresh.
            self._trials = self._trials or Trials()
        except Exception as e:
            err = "Failed to intialize. Exit with error: {}".format(e)
            self._logger.log_and_raise_error(err)

    def run_trials(self,
                   niter: int,
                   algo: Callable = tpe.suggest) -> None:
        '''
        Method performing the search of the best pipeline in the given space.
        Calls the fmin function from the hyperopt library to minimize the
        output of _objective.

        :param int niter: number of search iterations to run in addition to
            the trials already recorded in the Trials object.
        :param Callable algo: a search algorithm supported by the hyperopt
            library. For now these are tpe.suggest for a tree-based bayesian
            search or rand.suggest for randomized search.
        '''
        try:
            self._trials = self._trials or Trials()
            n_previous = len(self._trials.trials)

            self._logger.info(("Starting {0} iterations of search "
                               "additional to {1} previous"
                               .format(niter, n_previous)))

            # fmin returns the best point in hyperopt's internal encoding
            # (e.g. indices for hp.choice); space_eval maps it back to
            # actual values below.
            best_point = fmin(fn=self._objective,
                              space=self._space,
                              algo=algo,
                              trials=self._trials,
                              # max_evals counts every trial stored in the
                              # Trials object, so add niter to the old ones.
                              max_evals=n_previous + niter)

            self._logger.info(
                "Best score is {0} with variance {1}"
                .format(
                    self._trials.best_trial["result"]["score"],
                    self._trials.best_trial["result"]["score_variance"]))

            self._logger.info(("Finished {0} iterations of search.\n"
                               "Best parameters are:\n {1} ")
                              .format(niter,
                                      space_eval(self._space, best_point)))

            self.finished_tuning = True

            self.total_tuning_time = datetime.datetime.today()\
                - self.start_tuning_time

            self._backup_trials()

        except Exception as e:
            err = ("Failed to select best "
                   "pipeline! Exit with error: {}").format(e)
            self._logger.log_and_raise_error(err)

    @property
    def number_of_trials(self) -> Union[int, None]:
        """
        :return: number of trials run so far
            with the given Trials object
        """
        try:
            return len(self._trials.trials)
        except Exception as e:
            err = ("Failed to retrieve the number of trials. "
                   "Exit with error {}".format(e))
            self._logger.log_and_raise_error(err)

    def _get_ok_trials_sorted_by_loss(self) -> List[dict]:
        """
        :return: the successfully finished trials (status STATUS_OK),
            ordered from best to worst, i.e. by ascending hyperopt "loss".

        fmin minimizes "loss" (see run_trials), so ascending loss is the
        best-first order whatever the value of greater_is_better. It is
        also the order hyperopt uses for Trials.best_trial. Trials with a
        NaN loss are put last.
        """
        ok_trials = [trial for trial in self._trials.trials
                     if trial["result"].get("status") == STATUS_OK]

        def sort_key(trial: dict):
            loss = float(trial["result"]["loss"])
            # (False, x) sorts before (True, nan): NaN losses go last.
            return (bool(np.isnan(loss)), loss)

        return sorted(ok_trials, key=sort_key)

    def _get_space_element_from_trial(self, trial: dict)\
            -> Union[Dict[str, SpaceElementType], None]:
        """
        Hyperopt trials object does not contain the space
        elements that result in the corresponding trials.
        One has to use the function space_eval from
        hyperopt to get the space element.

        After retrieving the space element,
        parameters of the pipeline are set on a copy of the pipeline.

        :param dict trial: a single entry of Trials.trials (not modified).
        :return: dict with (at least) keys "name", "params", "pipeline".
        """
        try:
            if not self.attached_space:
                raise RuntimeError("Hyperparameter space not attached.")

            # Only values of parameters that were actually sampled in this
            # trial are present (e.g. for unused hp.choice branches the
            # value list is empty).
            assignment = {k: v[0] for k, v in trial['misc']['vals'].items()
                          if len(v) > 0}
            space_element = space_eval(self._space, assignment)

            # space_eval hands back the pipeline object stored in the space
            # itself, so copy it before setting parameters on it.
            pipeline = deepcopy(space_element["pipeline"])
            params = deepcopy(space_element["params"])
            pipeline.set_params(**params)

            space_element["pipeline"] = pipeline

            return space_element

        except Exception as e:
            err = ("Failed to retrieve a space element from a trial. "
                   "Exit with error: {}".format(e))
            self._logger.log_and_raise_error(err)

    def _get_space_element_from_index(self, i: int)\
            -> Union[Dict[str, SpaceElementType], None]:
        """
        Gets the space element of shape
        {"name": NAME, "params": PARAMS, "pipeline": PIPELINE}
        from the trial number i.
        """
        try:
            if i >= len(self._trials.trials):
                raise IndexError("Trials object is not long enough "
                                 "to retrieve index {}".format(i))

            return self._get_space_element_from_trial(self._trials.trials[i])

        except Exception as e:
            err = ("Failed to get space element from index. "
                   "Exit with error {}".format(e))
            self._logger.log_and_raise_error(err)

    def _get_pipeline_from_index(self, i: int) -> Union[Pipeline, None]:
        """
        Gets a pipeline with set parameters from the trial number i
        """
        try:
            space_element = self._get_space_element_from_index(i)
            return space_element["pipeline"]

        except Exception as e:
            err = ("Failed to retrieve pipeline from index. "
                   "Exit with error: {}".format(e))
            self._logger.log_and_raise_error(err)

    @property
    def best_trial(self) -> Union[dict, None]:
        """
        :return: dictionary with the summary (result) of the best trial
            and the space element (name, pipeline, params)
            resulting in the best trial. Empty dict if no trial was run.
            Without an attached space only the result is returned.
        """
        if len(self._trials.trials) == 0:
            self._logger.log_and_throw_warning("Trials object is empty")
            return {}

        try:
            hyperopt_best_trial = self._trials.best_trial

            if self.attached_space:
                space_element = self._get_space_element_from_trial(
                    hyperopt_best_trial)
            else:
                space_element = {}
                warn = ("Space is not attached, "
                        "To included the best pipeline "
                        "attach the space")
                self._logger.log_and_throw_warning(warn)

            # Copy so that callers cannot mutate the stored trial result.
            best_trial = deepcopy(hyperopt_best_trial["result"])
            best_trial.update(space_element)

            return best_trial

        except Exception as e:
            err = "Failed to retrieve best trial. Exit with error: {}"\
                .format(e)
            self._logger.log_and_raise_error(err)

    @property
    def best_trial_score(self) -> Union[float, None]:
        """
        :return: score of the best trial, np.nan if there are no trials.
        """
        try:
            best_trial = self.best_trial
            return best_trial["score"] if best_trial else np.nan

        except Exception as e:
            err = ("Failed to retrieve best trial score. "
                   "Exit with error: {}".format(e))
            self._logger.log_and_raise_error(err)

    @property
    def best_trial_score_variance(self) -> Union[float, None]:
        """
        :return: score variance of the best trial,
            np.nan if there are no trials.
        """
        try:
            best_trial = self.best_trial
            return best_trial["score_variance"] if best_trial else np.nan

        except Exception as e:
            err = ("Failed to retrieve best trial score variance. "
                   "Exit with error: {}".format(e))
            self._logger.log_and_raise_error(err)

    @property
    def best_trial_pipeline(self) -> Union[Pipeline, None]:
        """
        :return: pipeline (with parameters set) of the best trial,
            None if there are no trials.
        """
        try:
            best_trial = self.best_trial
            return best_trial["pipeline"] if best_trial else None

        except Exception as e:
            err = ("Failed to retrieve best trial pipeline. "
                   "Exit with error: {}".format(e))
            self._logger.log_and_raise_error(err)

    def get_n_best_trial_pipelines(self, n: int)\
            -> Union[List[Pipeline], None]:
        """
        :param int n: maximal number of pipelines to return.
        :return: the list of (at most) n best pipelines documented
            in trials, best first. Failed trials are ignored.
        """
        try:
            n_best_trials = self._get_ok_trials_sorted_by_loss()[:n]

            return [self._get_space_element_from_trial(trial)["pipeline"]
                    for trial in n_best_trials]

        except Exception as e:
            err = ("Failed to retrieve n best pipelines. "
                   "Exit with error: {}".format(e))
            self._logger.log_and_raise_error(err)

    def get_n_best_trial_pipelines_of_each_type(self, n: int)\
            -> Union[Dict[str, List[Pipeline]], None]:
        """
        :param int n: maximal number of pipelines per pipeline name.
        :return: a dictionary where keys are pipeline names,
            and values are lists of (at most n) best pipelines with this
            name, best first. Failed trials are ignored.
        """
        try:
            best_per_name: Dict[str, List[Pipeline]] = {}

            # Trials come best-first, so the first n of each name are
            # the n best of that name.
            for trial in self._get_ok_trials_sorted_by_loss():
                space_element = self._get_space_element_from_trial(trial)
                pipelines = best_per_name.setdefault(space_element["name"],
                                                     [])
                if len(pipelines) < n:
                    pipelines.append(space_element["pipeline"])

            return {name: pipelines
                    for name, pipelines in best_per_name.items()
                    if pipelines}

        except Exception as e:
            err = ("Failed to get n best pipelines of each type. "
                   "Exit with error: {}".format(e))
            self._logger.log_and_raise_error(err)

    def trials_to_excel(self, path: Optional[str] = None) -> None:
        """
        Saves an excel file with pipeline names, scores,
        parameters, and timestamps (one row per trial).

        :param str path: path of the excel file to write. Required;
            a missing path is reported as an error.
        """
        try:
            if path is None:
                raise ValueError("No path given to save the trials "
                                 "to excel.")

            rows = [{**trial["result"],
                     **self._get_space_element_from_trial(trial)}
                    for trial in self._trials.trials]

            pd.DataFrame(rows).to_excel(path)

        except Exception as e:
            err = ("Failed to write trials to excel. "
                   "Exit with error: {}".format(e))
            self._logger.log_and_raise_error(err)
if __name__ == '__main__':

    # elementary example

    from sklearn.metrics import roc_auc_score, precision_score
    from sklearn.datasets import load_breast_cancer
    from cdplib.log import Log
    from cdplib.db_handlers import MongodbHandler
    from cdplib.hyperopt.space_sample import space
    # from cdplib.hyperopt.composed_space_sample import space

    trials_path = "hyperopt_trials_TEST.pkl"
    additional_metrics = {"precision": precision_score}
    strategy_name = "strategy_1"
    data_path = "data_TEST.h5"
    cv_path = "cv_TEST.pkl"
    collection_name = 'TEST_' + strategy_name

    logger = Log("HyperoptPipelineSelector__TEST:")

    logger.info("Start test")

    data_loader = load_breast_cancer()

    X = data_loader["data"]
    y = data_loader["target"]

    try:
        pd.DataFrame(X).to_hdf(data_path, key="X_train")
        pd.Series(y).to_hdf(data_path, key="y_train")

        # Two expanding-window splits: train on the first third / first
        # two thirds, test on the rest.
        cv = [(list(range(len(X)//3)), list(range(len(X)//3, len(X)))),
              (list(range(2*len(X)//3)), list(range(2*len(X)//3, len(X))))]

        with open(cv_path, "wb") as cv_file:
            pickle.dump(cv, cv_file)

        hs = HyperoptPipelineSelector(cost_func=roc_auc_score,
                                      greater_is_better=True,
                                      trials_path=trials_path,
                                      additional_metrics=additional_metrics,
                                      strategy_name=strategy_name,
                                      stdout_log_level="WARNING")

        hs.attach_space(space=space)

        hs.attach_data_from_hdf5(data_hdf5_store_path=data_path,
                                 cv_pickle_path=cv_path)

        try:

            # TODO: this line causes a pytype to throw not-callable error
            # works fine with pytype on other class methods.
            save_method = MongodbHandler().insert_data_into_collection
            save_kwargs = {'collection_name': collection_name}

            # save_method = pd.DataFrame.to_excel()
            # save_kwargs = {'excel_writer': "TEST.xlsx"}

            hs.configer_summary_saving(save_method=save_method,
                                       kwargs=save_kwargs)

            logger.info("Configured summary saving in mongo")

        except Exception as e:

            logger.warning(("Could not configure summary saving in mongo. "
                            "Exit with error: {}".format(e)))

        hs.run_trials(niter=10)

        logger.info("Best Trial: {}".format(hs.best_trial))

        logger.info("Total tuning time: {}".format(hs.total_tuning_time))

    finally:
        # Clean up the temporary files even if the example fails; a file
        # may not exist (e.g. trials never backed up), so check first.
        for file_path in [trials_path, data_path, cv_path]:
            if os.path.exists(file_path):
                os.remove(file_path)

    logger.info("End test")
|