
added HyperoptPipelineSelector

tanja, 3 years ago
commit d90c42d5da
1 file changed, 448 insertions(+), 0 deletions(-)

cdplib/hyperopt/HyperoptPipelineSelector.py (+448, -0)

@@ -0,0 +1,448 @@
+#!/usr/bin/env python3
+# -*- coding: utf-8 -*-
+"""
+Created on Tue Oct  6 15:04:25 2020
+
+@author: tanya
+@description: a class for selecting a machine learning
+ pipeline from a deterministic space of parameter distributions
+ over multiple pipelines.
+ The selection is done in such a way that a Trials object is
+ maintained during the tuning process, from which one can retrieve
+ the best pipeline so far as well as the entire tuning history
+ if needed.
+"""
+
+import os
+
+import pickle
+
+from copy import deepcopy
+
+from typing import Callable, Union, Optional
+
+import pandas as pd
+import numpy as np
+
+from sklearn.pipeline import Pipeline
+
+from hyperopt import fmin, tpe, rand, Trials, space_eval
+
+from cdplib.pipeline_selector.PipelineSelector import PipelineSelector
+
+
+class HyperoptPipelineSelector(PipelineSelector):
+    """
+    Use this class to perform a search
+    for a machine learning pipeline in a given parameter space.
+    The parameter space can include multiple types of Pipelines
+    (SVM, XGBOOST, random forest, etc),
+    as well as parameter distributions for each pipeline parameter.
+    See example in main for the expected space structure.
+
+    The search can be performed either randomly
+    or with the Tree-structured Parzen Estimator (TPE) algorithm.
+    (Other methods are currently being developed by the hyperopt creators.)
+
+    The trials attribute keeps track of the parameter
+    combinations that have already been tried out. It is saved
+    to a binary file at regular intervals (backup_trials_freq),
+    as well as every time a better pipeline is found.
+    """
+    def __init__(self,
+                 cost_func: Union[Callable, str],
+                 greater_is_better: bool,
+                 trials_path: str,
+                 backup_trials_freq: Optional[int] = None,
+                 cross_val_averaging_func: Optional[Callable] = None,
+                 additional_metrics: Optional[dict] = None,
+                 strategy_name: Optional[str] = None,
+                 stdout_log_level: str = "INFO"):
+        """
+        :param callable cost_func: function to minimize or maximize
+
+        :param bool greater_is_better: when True
+            cost_func is maximized, else minimized.
+
+        :param str trials_path: path at which the trials object is saved
+            in binary format. From the trials object we can
+            select information about the obtained scores, score variations,
+            and pipelines, and parameters tried out so far. If a trials object
+            already exists at the given path, it is loaded and the
+            search is continued, else, the search is started from
+            the beginning.
+
+        :param int backup_trials_freq: frequency in iterations (trials)
+            of saving the trials object at the trials_path.
+
+        :param callable cross_val_averaging_func: optional, defaults
+            to the mean. Function to aggregate the cross-validated
+            values of the cost function. The classic choice is
+            the mean; another example is mean() - c*var().
+        :param dict additional_metrics: dict of additional metrics to save,
+            of the form {"metric_name": metric} where metric is a Callable.
+
+        :param str strategy_name: a name that may be assigned to the trials.
+            A strategy is defined by the data set, cv object, and cost
+            function. When the strategy changes, one should start with
+            new trials.
+
+        :param str stdout_log_level: can be INFO, WARNING, ERROR
+        """
+
+        super().__init__(cost_func=cost_func,
+                         greater_is_better=greater_is_better,
+                         trials_path=trials_path,
+                         backup_trials_freq=backup_trials_freq,
+                         cross_val_averaging_func=cross_val_averaging_func,
+                         additional_metrics=additional_metrics,
+                         strategy_name=strategy_name,
+                         stdout_log_level=stdout_log_level)
+
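+        # continue from a Trials object loaded from trials_path by the
+        # parent class, or start with a fresh Trials object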
+        self._trials = self._trials or Trials()
+
+    def run_trials(self,
+                   niter: int,
+                   algo: Callable = tpe.suggest):
+        '''
+        Method performing the search for the best pipeline
+        in the given space. Calls the fmin function from the
+        hyperopt library to minimize the output of _objective.
+
+        :param int niter: number of search iterations
+        :param callable algo: currently can only be tpe.suggest
+            (Tree-structured Parzen Estimator) or rand.suggest
+            (random search)
+        '''
+        try:
+            assert(self.attached_space)
+        except AssertionError:
+            err = ("Space must be attach to be able to "
+                   "retrieve this information.")
+            self._logger.log_and_raise_error(err)
+
+        try:
+            assert(isinstance(niter, int))
+        except AssertionError:
+            err = "Parameter 'niter' must be of int type"
+            self._logger.log_and_raise_error(err, ErrorType=TypeError)
+
+        try:
+            # right now only two algorithms are provided by hyperopt
+            assert(algo in [tpe.suggest, rand.suggest])
+        except AssertionError:
+            err = ("Parameter 'algo' can be now only tpe or random. "
+                   "If other algorithms have been developped by "
+                   "by hyperopt, plased add them to the list.")
+            self._logger.log_and_raise_error(err)
+
+        try:
+            self._trials = self._trials or Trials()
+
+            self._logger.info(("Starting {0} iterations of search "
+                               "additional to {1} previous"
+                               .format(niter, len(self._trials.trials))))
+
+            best_trial = fmin(fn=self._objective,
+                              space=self._space,
+                              algo=algo,
+                              trials=self._trials,
+                              max_evals=len(self._trials.trials) + niter)
+
+            self._logger.info(
+                    "Best score is {0} with variance {1}"
+                    .format(
+                     self._trials.best_trial["result"]["score"],
+                     self._trials.best_trial["result"]["score_variance"]))
+
+            self._logger.info(("Finished {0} iterations of search.\n"
+                               "Best parameters are:\n {1} ")
+                              .format(niter,
+                                      space_eval(self._space, best_trial)))
+
+            self.finished_tuning = True
+
+            self._backup_trials()
+
+        except Exception as e:
+            raise ValueError(("Failed to select best "
+                             "pipeline! Exit with error: {}").format(e))
+
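+    # Usage sketch (hypothetical selector instance): resume the same
+    # Trials object with plain random search instead of TPE:
+    #
+    #     selector.run_trials(niter=50, algo=rand.suggest)
+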
+    @property
+    def number_of_trials(self) -> int:
+        """
+        :return: number of trials run so far
+            with the given Trials object
+        """
+
+        try:
+            return len(self._trials.trials)
+        except Exception as e:
+            err = ("Failed to retrieve the number of trials. "
+                   "Exit with error {}".format(e))
+            self._logger.log_and_raise_error(err)
+
+    def _get_space_element_from_trial(self, trial) -> dict:
+        """
+        A hyperopt Trials object does not contain the space
+        elements that result in the corresponding trials;
+        one has to use the space_eval function from
+        hyperopt to recover the space element.
+
+        After retrieving the space element,
+        the parameters of the pipeline are set.
+        """
+        trial = deepcopy(trial)
+
+        try:
+            assert(self.attached_space)
+        except AssertionError:
+            err = "Hyperparameter space not attached."
+            self._logger.log_and_raise_error(err)
+
+        try:
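+            # hyperopt stores the sampled values in trial['misc']['vals']
+            # as lists (empty for inactive conditional parameters);
+            # unwrap the singletons so space_eval can decode the trial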
+            space_element = space_eval(self._space,
+                                       {k: v[0] for k, v in
+                                        trial['misc']['vals'].items()
+                                        if len(v) > 0})
+
+            pipeline = deepcopy(space_element["pipeline"])
+            params = deepcopy(space_element["params"])
+            pipeline.set_params(**params)
+
+            space_element["pipeline"] = pipeline
+
+            return space_element
+
+        except Exception as e:
+            err = ("Failed to retrieve a space element from a trial. "
+                   "Exit with error: {}".format(e))
+            self._logger.log_and_raise_error(err)
+
+    def _get_space_element_from_index(self, i: int) -> dict:
+        """
+        Gets the space element of shape
+        {"name": NAME, "params": PARAMS, "pipeline": PIPELINE}
+        from the trial number i.
+        """
+        try:
+            assert(len(self._trials.trials) > i)
+        except AssertionError:
+            err = ("Trials object is not long enough "
+                   "to retrieve index {}".format(i))
+            self._logger.log_and_raise_error(err, ErrorType=IndexError)
+
+        try:
+            return self._get_space_element_from_trial(self._trials.trials[i])
+
+        except Exception as e:
+            err = ("Failed to get space element from index. "
+                   "Exit with error {}".format(e))
+            self._logger.log_and_raise_error(err)
+
+    def _get_pipeline_from_index(self, i: int) -> Pipeline:
+        """
+        Gets a pipeline with set parameters from the trial number i
+        """
+        try:
+            space_element = self._get_space_element_from_index(i)
+
+            return space_element["pipeline"]
+
+        except Exception as e:
+            err = ("Failed to retrieve pipeline from index. "
+                   "Exit with error: {}".format(e))
+            self._logger.log_and_raise_error(err)
+
+    @property
+    def best_trial(self) -> dict:
+        """
+        :return: dictionary with the summary of the best trial
+            and space element (name, pipeline, params)
+            resulting in the best trial
+        """
+        if len(self._trials.trials) == 0:
+            self._logger.log_and_throw_warning("Trials object is empty")
+            return {}
+        else:
+            try:
+                assert(self.attached_space)
+            except AssertionError:
+                err = "Space is not attached"
+
+            try:
+                best_trial = deepcopy(self._trials.best_trial)
+
+                space_element = self._get_space_element_from_trial(best_trial)
+
+                best_trial = deepcopy(self._trials.best_trial["result"])
+
+                best_trial.update(space_element)
+
+                return best_trial
+
+            except Exception as e:
+                err = "Failed to retrieve best trial. Exit with error: {}"\
+                    .format(e)
+                self._logger.log_and_raise_error(err)
+
+    @property
+    def best_trial_score(self) -> float:
+        """
+        """
+        try:
+            if len(self.best_trial) > 0:
+                return self.best_trial["score"]
+            else:
+                return np.nan
+
+        except Exception as e:
+            err = ("Failed to retrieve best trial score. "
+                   "Exit with error: {}".format(e))
+            self._logger.log_and_raise_error(err)
+
+    @property
+    def best_trial_score_variance(self) -> float:
+        """
+        """
+        try:
+            if len(self.best_trial) > 0:
+                return self.best_trial["score_variance"]
+            else:
+                return np.nan
+
+        except Exception as e:
+            err = ("Failed to retrieve best trial score variance. "
+                   "Exit with error: {}".format(e))
+            self._logger.log_and_raise_error(err)
+
+    @property
+    def best_trial_pipeline(self) -> Pipeline:
+        """
+        """
+        try:
+            if len(self.best_trial) > 0:
+                return self.best_trial["pipeline"]
+            else:
+                return np.nan
+
+        except Exception as e:
+            err = ("Failed to retrieve best trial pipeline. "
+                   "Exit with error: {}".format(e))
+            self._logger.log_and_raise_error(err)
+
+    def get_n_best_trial_pipelines(self, n: int) -> list:
+        """
+        :return: the list of n best pipelines
+        documented in trials
+        """
+        try:
+            if len(self._trials.trials) == 0:
+                return []
+            else:
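+                # NOTE: sorting in descending order assumes that a
+                # higher score is better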
+                n_best_trials = sorted(self._trials.trials,
+                                       key=lambda x: x["result"]["score"],
+                                       reverse=True)[:n]
+
+                return [self._get_space_element_from_trial(trial)["pipeline"]
+                        for trial in n_best_trials]
+
+        except Exception as e:
+            err = ("Failed to retrieve n best pipelines. "
+                   "Exit with error: {}".format(e))
+            self._logger.log_and_raise_error(err)
+
+    def get_n_best_trial_pipelines_of_each_type(self, n: int) -> dict:
+        """
+        :return: a dictionary where keys are pipeline names,
+        and values are lists of the best pipelines with that name
+        """
+        scores = [trial["result"]["score"] for trial in self._trials.trials]
+
+        names = [self._get_space_element_from_trial(trial)["name"]
+                 for trial in self._trials.trials]
+
+        return pd.DataFrame({"name": names, "score": scores})\
+                 .sort_values(by=["name", "score"], ascending=False)\
+                 .groupby("name")\
+                 .head(n)\
+                 .reset_index()\
+                 .assign(pipeline=lambda x: x["index"]
+                         .apply(self._get_pipeline_from_index))\
+                 .groupby("name")["pipeline"]\
+                 .apply(lambda x: list(x))\
+                 .to_dict()
+
+    def trials_to_excel(self, path: str = None):
+        """
+        Saves an Excel file with pipeline names, scores,
+        parameters, and timestamps.
+        """
+        results = [trial["result"] for trial in self._trials.trials]
+
+        space_elements = [self._get_space_element_from_trial(trial)
+                          for trial in self._trials.trials]
+
+        pd.DataFrame([{**result, **space_element}
+                      for result, space_element in
+                      zip(results, space_elements)]).to_excel(path)
+
+
+if __name__ == '__main__':
+
+    # elementary example
+
+    from sklearn.metrics import roc_auc_score, precision_score
+    from sklearn.datasets import load_breast_cancer
+    from cdplib.log import Log
+    from cdplib.db_handlers import MongodbHandler
+    from cdplib.hyperopt.space_sample import space
+
+    trials_path = "hyperopt_trials_TEST.pkl"
+    additional_metrics = {"precision": precision_score}
+    strategy_name = "strategy_1"
+    data_path = "data_TEST.h5"
+    cv_path = "cv_TEST.pkl"
+    collection_name = 'TEST_' + strategy_name
+
+    logger = Log("HyperoptPipelineSelector__TEST:")
+
+    logger.info("Start test")
+
+    data_loader = load_breast_cancer()
+
+    X = data_loader["data"]
+    y = data_loader["target"]
+
+    pd.DataFrame(X).to_hdf(data_path, key="X_train")
+    pd.Series(y).to_hdf(data_path, key="y_train")
+
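+    # two hand-made (train, test) index splits: first third vs. the
+    # rest, and first two thirds vs. the last third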
+    cv = [(list(range(len(X)//3)), list(range(len(X)//3, len(X)))),
+          (list(range(2*len(X)//3)), list(range(2*len(X)//3, len(X))))]
+
+    pickle.dump(cv, open(cv_path, "wb"))
+
+    hs = HyperoptPipelineSelector(cost_func=roc_auc_score,
+                                  greater_is_better=True,
+                                  trials_path=trials_path,
+                                  additional_metrics=additional_metrics,
+                                  strategy_name=strategy_name,
+                                  stdout_log_level="WARNING")
+
+    hs.attach_space(space=space)
+
+    hs.attach_data_from_hdf5(data_hdf5_store_path=data_path,
+                             cv_pickle_path=cv_path)
+
+    save_method = MongodbHandler().insert_data_into_collection
+    save_kwargs = {'collection_name': collection_name}
+
+    hs.configer_summary_saving(save_method=save_method,
+                               kwargs=save_kwargs)
+
+    hs.run_trials(niter=10)
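+
+    # hypothetical follow-up (not part of the test): inspect the results
+    #
+    #     best_pipeline = hs.best_trial_pipeline
+    #     top_pipelines = hs.get_n_best_trial_pipelines(n=3)
+    #     hs.trials_to_excel("trials_TEST.xlsx")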
+
+    for file in [trials_path, data_path, cv_path]:
+        os.remove(file)
+
+    logger.info("End test")