Browse Source

added a new hyperopt selector scipt called Selector and sample parameter spaces

tanja 3 years ago
parent
commit
0d882e185b

+ 496 - 0
cdplib/hyperopt/HyperoptPipelineSelector.py

@@ -0,0 +1,496 @@
+#!/usr/bin/env python3
+# -*- coding: utf-8 -*-
+"""
+Created on Tue Oct  6 15:04:25 2020
+
+@author: tanya
+@description:a class for selecting a machine learning
+ pipeline from a deterministic space of parameter distributions
+ over multiple pipelines.
+ The selection is though in such a way that a Trials object is being
+ maintained during the tuning process from which one can retrieve
+ the best pipeline so far as well as the entire tuning history
+ if needed.
+"""
+
+import os
+
+import pickle
+
+from copy import deepcopy
+
+import datetime
+
+import pandas as pd
+import numpy as np
+
+from sklearn.pipeline import Pipeline
+
+from hyperopt import fmin, tpe, rand, Trials, space_eval
+
+from cdplib.pipeline_selector.PipelineSelector import PipelineSelector,\
+     SpaceElementType
+
+from typing import Callable, Optional, Literal, Dict, Union, List
+
+
+class HyperoptPipelineSelector(PipelineSelector):
+    """
+    Use this class to perform a search
+    for a machine learning pipeline in a given parameter space.
+    The parameter space can include multiple types of Pipelines
+    (SVM, XGBOOST, random forest, etc),
+    as well as parameter distributions for each pipeline parameter.
+    See example in main for the expected space structure.
+
+    The search can be performed either randomly
+    or with a tree-based algorithm. (Other methods are currently
+    developped by hyperopt creators).
+
+    Attribute trials is responsible for book-keeping parameter
+    combinations that have already been tried out. This attribute
+    is saved to a binary file every n minutes as well as every time
+    a better pipeline was found.
+    """
+    def __init__(self,
+                 cost_func: Union[Callable, str],
+                 greater_is_better: bool,
+                 trials_path: str,
+                 backup_trials_freq: Optional[int] = None,
+                 cross_val_averaging_func: Callable = np.mean,
+                 additional_metrics: Optional[Dict[str, Callable]] = None,
+                 strategy_name: Optional[str] = None,
+                 stdout_log_level: Literal["INFO", "WARNING", "ERROR"]
+                 = "INFO"):
+        """
+        param Callable cost_func: function to minimize or maximize
+            over the elements of a given (pipeline/hyperparameter) space
+
+        :param bool greater_is_better: when True
+            cost_func is maximized, else minimized.
+
+        :param str trials_path: path at which the trials object is saved
+            in binary format. From the trials object we can
+            select information about the obtained scores, score variations,
+            and pipelines, and parameters tried out so far. If a trials object
+            already exists at the given path, it is loaded and the
+            search is continued, else, the search is started from scratch.
+
+        :param backup_trials_freq: frequecy in interations (trials)
+            of saving the trials object at the trials_path.
+            if None, the trials object is backed up avery time
+            the score improves.
+
+        :param Callable cross_val_averaging_func: Function to aggregate
+            the cross-validation scores.
+            Example different from the mean: mean - c*var.
+
+        :param additional_metics: dict of additional metrics to save
+            of the form {"metric_name": metric} where metric is a Callable.
+
+        :param str strategy_name:
+            a strategy is defined by the data set (columns/features and rows),
+            cv object, cost function.
+            When the strategy changes, one must start with new trials.
+
+        :param str stdout_log_level: can be INFO, WARNING, ERROR
+        """
+
+        try:
+
+            super().__init__(cost_func=cost_func,
+                             greater_is_better=greater_is_better,
+                             trials_path=trials_path,
+                             backup_trials_freq=backup_trials_freq,
+                             cross_val_averaging_func=cross_val_averaging_func,
+                             additional_metrics=additional_metrics,
+                             strategy_name=strategy_name,
+                             stdout_log_level=stdout_log_level)
+
+            self._logger = Log("HyperoptPipelineSelector: ",
+                               stdout_log_level=stdout_log_level)
+
+            self._trials = self._trials or Trials()
+
+        except Exception as e:
+            err = "Failed to intialize. Exit with error: {}".format(e)
+            self._logger.log_and_raise_error(err)
+
+    def run_trials(self,
+                   niter: int,
+                   algo: Literal[tpe.suggest, rand.suggest] = tpe.suggest)\
+            -> None:
+        '''
+        Method performing the search of the best pipeline in the given space.
+        Calls fmin function from the hyperopt library to minimize the output of
+        _objective.
+
+        :params int niter: number of search iterations
+        :param algo: now can only take supported by the hyperopt library.
+            For now these are tpe.suggest for a tree-based bayesian search
+            or rad.suggest for randomized search
+        '''
+        try:
+            self._trials = self._trials or Trials()
+
+            self._logger.info(("Starting {0} iterations of search "
+                               "additional to {1} previous"
+                               .format(niter, len(self._trials.trials))))
+
+            best_trial = fmin(fn=self._objective,
+                              space=self._space,
+                              algo=algo,
+                              trials=self._trials,
+                              max_evals=len(self._trials.trials) + niter)
+
+            self._logger.info(
+                    "Best score is {0} with variance {1}"
+                    .format(
+                     self._trials.best_trial["result"]["score"],
+                     self._trials.best_trial["result"]["score_variance"]))
+
+            self._logger.info(("Finished {0} iterations of search.\n"
+                               "Best parameters are:\n {1} ")
+                              .format(niter,
+                                      space_eval(self._space, best_trial)))
+
+            self.finished_tuning = True
+
+            self.total_tuning_time = datetime.datetime.today()\
+                - self.start_tuning_time
+
+            self._backup_trials()
+
+        except Exception as e:
+            err = ("Failed to select best "
+                   "pipeline! Exit with error: {}").format(e)
+
+            self._logger.log_and_raise_error(err)
+
+    @property
+    def number_of_trials(self) -> Union[int, None]:
+        """
+        :return: number of trials run so far
+            with the given Trials object
+        """
+
+        try:
+            return len(self._trials.trials)
+
+        except Exception as e:
+            err = ("Failed to retrieve the number of trials. "
+                   "Exit with error {}".format(e))
+
+            self._logger.log_and_raise_error(err)
+
+    def _get_space_element_from_trial(self, trial: Dict)\
+            -> Union[Dict[SpaceElementType], None]:
+        """
+        Hyperopt trials object does not contain the space
+             elements that result in the corresponding trials.
+             One has to use the function space_eval from
+             hyperopt to get the space element.
+
+        After retrieving the space element,
+            parameters of the pipeline are set.
+        """
+        try:
+            trial = deepcopy(trial)
+
+            assert(self.attached_space),\
+                "Hyperparameter space not attached."
+
+            space_element = space_eval(self._space,
+                                       {k: v[0] for k, v in
+                                        trial['misc']['vals'].items()
+                                        if len(v) > 0})
+
+            pipeline = deepcopy(space_element["pipeline"])
+            params = deepcopy(space_element["params"])
+            pipeline.set_params(**params)
+
+            space_element["pipeline"] = pipeline
+
+            return space_element
+
+        except Exception as e:
+            err = ("Failed to retrieve a space element from a trial. "
+                   "Exit with error: {}".format(e))
+
+            self._logger.log_and_raise_error(err)
+
+    def _get_space_element_from_index(self, i: int)\
+            -> Union[Dict[SpaceElementType], None]:
+        """
+        Gets the space element of shape
+        {"name": NAME, "params": PARAMS, "pipeline": PIPELINE}
+        from the trial number i.
+        """
+        try:
+            assert(len(self._trials.trials) > i),\
+                ("Trials object is not long enough "
+                 "to retrieve index {}".format(i))
+
+            return self._get_space_element_from_trial(self._trials.trials[i])
+
+        except Exception as e:
+            err = ("Failed to get space element from index. "
+                   "Exit with error {}".format(e))
+
+            self._logger.log_and_raise_error(err)
+
+    def _get_pipeline_from_index(self, i: int) -> Union[Pipeline, None]:
+        """
+        Gets a pipeline with set parameters from the trial number i
+        """
+        try:
+            space_element = self._get_space_element_from_index(i)
+
+            return space_element["pipeline"]
+
+        except Exception as e:
+            err = ("Failed to retrieve pipeline from index. "
+                   "Exit with error: {}".format(e))
+
+            self._logger.log_and_raise_error(err)
+
+    @property
+    def best_trial(self) -> Union[Dict, None]:
+        """
+        :return: dictionary with the summary of the best trial
+            and space element (name, pipeline, params)
+            resulting in the best trial
+        """
+        if len(self._trials.trials) == 0:
+
+            self._logger.log_and_throw_warning("Trials object is empty")
+            return {}
+
+        else:
+
+            try:
+                best_trial = deepcopy(self._trials.best_trial)
+
+                if self.attached_space:
+
+                    space_element = self._get_space_element_from_trial(
+                            best_trial)
+                else:
+                    space_element = {}
+
+                    warn = ("Space is not attached, "
+                            "To included the best pipeline "
+                            "attach the space")
+                    self._logger.log_and_throw_warning(warn)
+
+                best_trial = deepcopy(self._trials.best_trial["result"])
+
+                best_trial.update(space_element)
+
+                return best_trial
+
+            except Exception as e:
+                err = "Failed to retrieve best trial. Exit with error: {}"\
+                    .format(e)
+
+                self._logger.log_and_raise_error(err)
+
+    @property
+    def best_trial_score(self) -> Union[float, None]:
+        """
+        """
+        try:
+            if len(self.best_trial) > 0:
+                return self.best_trial["score"]
+            else:
+                return np.nan
+
+        except Exception as e:
+            err = ("Failed to retrieve best trial score. "
+                   "Exit with error: {}".format(e))
+
+            self._logger.log_and_raise_error(err)
+
+    @property
+    def best_trial_score_variance(self) -> Union[float, None]:
+        """
+        """
+        try:
+            if len(self.best_trial) > 0:
+                return self.best_trial["score_variance"]
+            else:
+                return np.nan
+
+        except Exception as e:
+            err = ("Failed to retrieve best trial score variance. "
+                   "Exit with error: {}".format(e))
+
+            self._logger.log_and_raise_error(err)
+
+    @property
+    def best_trial_pipeline(self) -> Union[Pipeline, None]:
+        """
+        """
+        try:
+            if len(self.best_trial) > 0:
+                return self.best_trial["pipeline"]
+            else:
+                return np.nan
+
+        except Exception as e:
+            err = ("Failed to retrieve best trial pipeline. "
+                   "Exit with error: {}".format(e))
+
+            self._logger.log_and_raise_error(err)
+
+    def get_n_best_trial_pipelines(self, n: int)\
+            -> Union[List[Pipeline], None]:
+        """
+        :return: the list of n best pipelines
+        documented in trials
+        """
+        try:
+            if len(self._trials.trials) == 0:
+                return []
+            else:
+                n_best_trials = sorted(self._trials.trials,
+                                       key=lambda x: x["result"]["score"],
+                                       reverse=True)[:n]
+
+                return [self._get_space_element_from_trial(trial)["pipeline"]
+                        for trial in n_best_trials]
+
+        except Exception as e:
+            err = ("Failed to retrieve n best pipelines. "
+                   "Exit with error: {}".format(e))
+
+            self._logger.log_and_raise_error(err)
+
+    def get_n_best_trial_pipelines_of_each_type(self, n: int)\
+            -> Union[Dict[str, List[Pipeline]], None]:
+        """
+        :return: a dictiionry where keys are pipeline names,
+        and values are lists of best pipelines with this name
+        """
+        try:
+            scores = [trial["result"]["score"]
+                      for trial in self._trials.trials]
+
+            names = [self._get_space_element_from_trial(trial)["name"]
+                     for trial in self._trials.trials]
+
+            return pd.DataFrame({"name": names, "score": scores})\
+                     .sort_values(by=["name", "score"], ascending=False)\
+                     .groupby("name")\
+                     .head(n)\
+                     .reset_index()\
+                     .assign(pipeline=lambda x: x["index"]
+                             .apply(self._get_pipeline_from_index))\
+                     .groupby("name")["pipeline"]\
+                     .apply(lambda x: list(x))\
+                     .to_dict()
+
+        except Exception as e:
+            err = ("Failed to get n best pipelines of each type. "
+                   "Exit with error: {}".format(e))
+
+            self._logger.log_and_raise_error(err)
+
+    def trials_to_excel(self, path: str = None) -> None:
+        """
+        Saves an excel file with pipeline names, scores,
+        parameters, and timestamps.
+        """
+        try:
+            results = [trial["result"] for trial in self._trials.trials]
+
+            space_elements = [self._get_space_element_from_trial(trial)
+                              for trial in self._trials.trials]
+
+            pd.DataFrame([{**result, **space_element}
+                          for result, space_element in
+                          zip(results, space_elements)]).to_excel(path)
+
+        except Exception as e:
+            err = ("Failed to write trials to excel. "
+                   "Exit with error: {}".format(e))
+
+            self._logger.log_and_raise_error(err)
+
+
+if __name__ == '__main__':
+
+    # elementary example
+
+    from sklearn.metrics import roc_auc_score, precision_score
+    from sklearn.datasets import load_breast_cancer
+    from cdplib.log import Log
+    from cdplib.db_handlers import MongodbHandler
+    from cdplib.hyperopt.space_sample import space
+    # from cdplib.hyperopt.composed_space_sample import space
+
+    trials_path = "hyperopt_trials_TEST.pkl"
+    additional_metrics = {"precision": precision_score}
+    strategy_name = "strategy_1"
+    data_path = "data_TEST.h5"
+    cv_path = "cv_TEST.pkl"
+    collection_name = 'TEST_' + strategy_name
+
+    logger = Log("HyperoptPipelineSelector__TEST:")
+
+    logger.info("Start test")
+
+    data_loader = load_breast_cancer()
+
+    X = data_loader["data"]
+    y = data_loader["target"]
+
+    pd.DataFrame(X).to_hdf(data_path, key="X_train")
+    pd.Series(y).to_hdf(data_path, key="y_train")
+
+    cv = [(list(range(len(X)//3)), list(range(len(X)//3, len(X)))),
+          (list(range(2*len(X)//3)), list(range(2*len(X)//3, len(X))))]
+
+    pickle.dump(cv, open(cv_path, "wb"))
+
+    hs = HyperoptPipelineSelector(cost_func=roc_auc_score,
+                                  greater_is_better=True,
+                                  trials_path=trials_path,
+                                  additional_metrics=additional_metrics,
+                                  strategy_name=strategy_name,
+                                  stdout_log_level="WARNING")
+
+    hs.attach_space(space=space)
+
+    hs.attach_data_from_hdf5(data_hdf5_store_path=data_path,
+                             cv_pickle_path=cv_path)
+
+    try:
+
+        # TODO: this line causes a pytype to throw not-callable error
+        # works fine with pytype on other class methods.
+        save_method = MongodbHandler().insert_data_into_collection
+        save_kwargs = {'collection_name': collection_name}
+
+        # save_method = pd.DataFrame.to_excel()
+        # save_kwargs = {'excel_writer': "TEST.xlsx"}
+
+        hs.configer_summary_saving(save_method=save_method,
+                                   kwargs=save_kwargs)
+
+        logger.info("Configured summary saving in mongo")
+
+    except Exception as e:
+
+        logger.warning(("Could not configure summary saving in mongo. "
+                        "Exit with error: {}".format(e)))
+
+    hs.run_trials(niter=10)
+
+    logger.info("Best Trial: {}".format(hs.best_trial))
+    logger.info("Total tuning time: {}".format(hs.total_tuning_time))
+
+    for file in [trials_path, data_path, cv_path]:
+        os.remove(file)
+
+    logger.info("End test")

+ 3 - 0
cdplib/hyperopt/__init__.py

@@ -1 +1,4 @@
 from .HyperoptPipelineSelection import *
+from .HyperoptPipelineSelector import *
+from .composed_space_sample import *
+from .space_sample import *

+ 116 - 0
cdplib/hyperopt/composed_space_sample.py

@@ -0,0 +1,116 @@
+#!/usr/bin/env python3
+# -*- coding: utf-8 -*-
+"""
+Created on Mon Jul  6 14:02:24 2020
+
+@author: tanya
+@description: space object to pass to HyperoptPipelineSelection class
+"""
+from sklearn.ensemble import RandomForestClassifier
+from sklearn.feature_selection import SelectFromModel, SelectPercentile,\
+    RFE, SelectFpr, f_classif, chi2, mutual_info_classif
+from xgboost import XGBRFClassifier
+from sklearn.svm import SVC
+from sklearn.linear_model import LogisticRegression
+from sklearn.decomposition import PCA
+from sklearn.pipeline import Pipeline
+from sklearn.preprocessing import StandardScaler
+from hyperopt import hp
+
+from cdplib.hyperparameter_space_composer.SpaceComposer import SpaceComposer
+
+# TODO: add sample spaces for encoders and transformers
+
+encoders = []
+
+transformers = []
+
+selectors = [
+    {"name": "kbest",
+     "object": SelectPercentile(),
+     "params": {
+       "percentile": 3 + hp.randint("kbest__percentile", 60),
+       "score_func": hp.choice("kbest__score_func",
+                               [f_classif, chi2, mutual_info_classif])}},
+
+    {"name": "fpr",
+     "object": SelectFpr(),
+     "params": {
+        "score_func": hp.choice("fpr__score_func",
+                                [f_classif, chi2]),
+        # mutual_info_classif does not work here
+        "alpha": hp.uniform("fpr__alpha", 0.1, 0.6)}},
+
+    {"name": "rfe_rf",
+     "object":
+         RFE(estimator=RandomForestClassifier(n_jobs=-1, random_state=33)),
+     "params": {
+         "n_features_to_select":
+             3 + hp.randint("rfe_rf__n_features_to_select", 200),
+         "estimator__n_estimators":
+             20 + hp.randint("rfe_rf__estimator__n_estimators", 70)}},
+
+    {"name": "rfm_rf",
+     "object":
+         SelectFromModel(estimator=RandomForestClassifier(n_jobs=-1,
+                                                          random_state=33)),
+     "params": {
+         "estimator__n_estimators":
+             20 + hp.randint("rfm_rf__estimator__n_estimators", 70)}},
+
+    {"name": "rfm_lr",
+     "object":
+         SelectFromModel(estimator=LogisticRegression(n_jobs=-1,
+                                                      random_state=33)),
+     "params": {
+          "estimator__C": hp.uniform("rfm_lr__estimator__C", 0.1, 1000)}},
+
+    {"name": "std_scaler_pca",
+     "object": Pipeline([
+             ("scaler", StandardScaler()),
+             ("pca", PCA(random_state=33))]),
+     "params": {
+        "pca__n_components": hp.uniform("pca__n_components", 0.1, 1),
+       }}
+    ]
+
+models = [
+        {"name": "xgb",
+         "object": XGBRFClassifier(n_jobs=-1, eval_metric="map", seed=33),
+         "params": {
+           "n_estimators": 50 + hp.randint('xgb__n_estimators', 100),
+           "max_depth": 3 + hp.randint("xgb__max_depth", 10),
+           "learning_rate": hp.loguniform("xgb__learning_rate", 0.01, 0.5)
+           }},
+
+        {"name": "rf",
+         "object": RandomForestClassifier(n_jobs=-1, random_state=33),
+         "params": {
+           "n_estimators": 50 + hp.randint('rf__n_estimators', 500),
+           "max_depth": 3 + hp.randint("rf__max_depth", 10),
+           "min_samples_leaf": 1 + hp.randint("rf__min_samples_leaf", 10)
+           }},
+
+        # the default solver does not accept l1 penalty
+        {"name": "lr",
+         "object": LogisticRegression(random_state=33,
+                                      solver='liblinear',
+                                      # n_jobs=-1
+                                      ),
+         "params":  {
+           "penalty": hp.choice("lr__penalty", ["l1", "l2"]),
+           "C": hp.uniform("lr__C", 0.1, 1000)}},
+
+        # svc does not support parallelizaiton, therefore is slow
+        {"name": "svc",
+         "object": SVC(random_state=33),
+         "params": {
+            "kernel": hp.choice("svc__kernel", ["linear", "poly", "rbf"]),
+            "degree": 2 + hp.randint("svc__degree", 3),
+            "C": hp.uniform("svc__C", 0.1, 1000)
+            }}
+        ]
+
+step_list = [encoders, transformers, selectors, models]
+
+space = SpaceComposer().compose_hyperopt_space(step_list)

+ 40 - 0
cdplib/hyperopt/space_sample.py

@@ -0,0 +1,40 @@
+#!/usr/bin/env python3
+# -*- coding: utf-8 -*-
+"""
+Created on Mon Oct  5 09:50:24 2020
+
+@author: tanya
+"""
+
+from sklearn.ensemble import RandomForestClassifier
+from sklearn.feature_selection import SelectPercentile
+from sklearn.linear_model import LogisticRegression
+from sklearn.decomposition import PCA
+from sklearn.pipeline import Pipeline
+from sklearn.preprocessing import StandardScaler
+from hyperopt import hp
+import numpy as np
+
+
+space = hp.choice("pipelines", [
+
+        {"name": "std_scaler_kbest_rf",
+         "pipeline": Pipeline([
+                 ("std_scaler", StandardScaler()),
+                 ("kbest", SelectPercentile()),
+                 ("rf", RandomForestClassifier())]),
+         "params": {"kbest__percentile":
+                    hp.choice('kbest__percentile', range(1, 3)),
+                    "rf__n_estimators":
+                    50 + hp.randint('rf__n_estimators', 50)}},
+
+        {"name": "std_scaler_pca_lr",
+         "pipeline": Pipeline([
+                 ("std_scaler", StandardScaler()),
+                 ("pca", PCA()),
+                 ("lr", LogisticRegression())]),
+         "params": {"lr__C":
+                    hp.loguniform("lr__C", np.log(0.01), np.log(0.1)),
+                    "pca__n_components":
+                    1 + hp.randint("pca__n_components", 4)}}
+        ])