Procházet zdrojové kódy

added an abstract class for pipeline selection and a class for gridsearch with trials

tanja před 3 roky
rodič
revize
6b4e27299b

+ 0 - 16
cdplib/gridsearch.py

@@ -1,16 +0,0 @@
-#!/usr/bin/env python3
-# -*- coding: utf-8 -*-
-"""
-Created on Wed Sep 30 14:15:17 2020
-
-@author: tanya
-"""
-
-from typing import Callable
-import numpy as np
-import pickle
-import os
-
-
-class GridSearchPipelineSelection:
-

+ 195 - 0
cdplib/gridsearch/gridsearch.py

@@ -0,0 +1,195 @@
+#!/usr/bin/env python3
+# -*- coding: utf-8 -*-
+"""
+Created on Wed Sep 30 14:15:17 2020
+
+@author: tanya
+"""
+
+import os
+import sys
+from itertools import product
+from collections import ChainMap
+from sklearn.pipeline import Pipeline
+from cdplib.pipeline_selector.PipelineSelector import PipelineSelector
+
+sys.path.append(os.getcwd())
+
+
+class GridSearchPipelineSelector(PipelineSelector):
+    """
+    Exhaustive grid search over several pipelines and their parameter grids.
+
+    Every combination of parameter values of every pipeline in the attached
+    space is passed once to self._objective. The outcomes are collected in
+    self._trials, a list of dictionaries with the keys
+    "name", "param_set", "pipeline" and "result".
+    """
+    def __init__(self,
+                 cost_func,
+                 greater_is_better: bool,
+                 trials_path: str,
+                 backup_trials_freq: int = 1,
+                 cross_val_averaging_func: callable = None):
+        '''
+        :param callable cost_func: function to minimize or maximize
+
+        :param bool greater_is_better: when True
+            cost_func is maximized, else minimized.
+
+        :param str trials_path: path at which the trials object is saved
+            in binary format. From the trials object we can
+            select information about the obtained scores, score variations,
+            and pipelines, and parameters tried out so far. If a trials object
+            already exists at the given path, it is loaded and the
+            search is continued, else, the search is started from
+            the beginning.
+
+        :param int backup_trials_freq: frequency in iterations (trials)
+            of saving the trials object at the trials_path.
+
+        :param callable cross_val_averaging_func: optional,
+            when not provided set to mean. Function
+            to aggregate the cross-validated values of the cost function.
+            Classic situation is to take the mean,
+            another example is, for example mean() - c*var().
+        '''
+        super().__init__(cost_func=cost_func,
+                         greater_is_better=greater_is_better,
+                         trials_path=trials_path,
+                         backup_trials_freq=backup_trials_freq,
+                         cross_val_averaging_func=cross_val_averaging_func)
+
+    def _unfold_space(self):
+        """
+        Yield one flat dictionary per (pipeline, parameter combination).
+
+        Each yielded dictionary has the keys "name", "pipeline" and
+        "param_set", where "param_set" maps every parameter name of the
+        pipeline's "params" grid to one of its candidate values.
+        """
+        for pipeline_dist in self._space:
+            param_names = list(pipeline_dist["params"])
+            value_grids = [pipeline_dist["params"][name]
+                           for name in param_names]
+
+            # cartesian product of the candidate values;
+            # an empty grid yields exactly one empty param_set
+            for values in product(*value_grids):
+                yield {"name": pipeline_dist["name"],
+                       "pipeline": pipeline_dist["pipeline"],
+                       "param_set": dict(zip(param_names, values))}
+
+    def run_trials(self):
+        """
+        Evaluate every not yet evaluated element of the unfolded space.
+
+        Combinations whose ("name", "param_set") pair is already present
+        in self._trials (e.g. loaded from trials_path) are skipped, so an
+        interrupted search can be continued.
+
+        Logs and raises an error when no space has been attached.
+        """
+        # local import: keeps the block self-contained
+        from copy import deepcopy
+
+        if not self._space_attached:
+            err = "Parameter distribution space must be attached"
+            self._logger.log_and_raise_error(err)
+
+        self._trials = self._trials or []
+
+        # dictionaries are not hashable, hence a list and not a set
+        done_trial_ids = [{"name": trial["name"],
+                           "param_set": trial["param_set"]}
+                          for trial in self._trials]
+
+        for space_element in self._unfold_space():
+
+            trial_id = {"name": space_element["name"],
+                        "param_set": space_element["param_set"]}
+
+            if trial_id in done_trial_ids:
+                continue
+
+            result = self._objective(space_element)
+
+            # set_params returns the very same (shared) pipeline object.
+            # Store a copy, otherwise all trials of one pipeline would
+            # reference a single object carrying the last param_set tried.
+            pipeline = deepcopy(space_element["pipeline"].set_params(
+                    **space_element["param_set"]))
+
+            self._trials.append({"name": space_element["name"],
+                                 "param_set": space_element["param_set"],
+                                 "pipeline": pipeline,
+                                 "result": result})
+
+    def _best_trial_entry(self, *keys):
+        """
+        Return self.best_trial[keys[0]][keys[1]]... for the given keys.
+
+        Logs an error and returns None when no trials exist yet; logs and
+        raises an error when the entry cannot be retrieved.
+        """
+        if self._trials is None:
+            self._logger.error(("Trials object is empty. "
+                                "Call run_trials method."))
+            return None
+
+        try:
+            entry = self.best_trial
+            for key in keys:
+                entry = entry[key]
+            return entry
+        except Exception as e:
+            err = ("Could not retrieve the best trial. "
+                   "Exit with error: {}".format(e))
+            self._logger.log_and_raise_error(err)
+
+    @property
+    def best_trial(self) -> dict:
+        """
+        Trial dictionary with the largest value of result["score"].
+
+        Returns None (after logging an error) when no trials exist yet.
+        """
+        # NOTE(review): max() is used regardless of greater_is_better;
+        # presumably _objective returns a score where larger is better.
+        # TODO confirm against PipelineSelector._objective.
+        if self._trials is None:
+            self._logger.error(("Trials object is empty. "
+                                "Call run_trials method."))
+            return None
+
+        try:
+            # use the instance's own trials, not a module-level object
+            return max(self._trials, key=lambda x: x["result"]["score"])
+        except Exception as e:
+            err = ("Could not retrieve the best trial. "
+                   "Exit with error: {}".format(e))
+            self._logger.log_and_raise_error(err)
+
+    @property
+    def best_trial_score(self) -> float:
+        '''
+        Value of result["score"] of the best trial.
+        '''
+        return self._best_trial_entry("result", "score")
+
+    @property
+    def best_trial_score_variance(self) -> float:
+        '''
+        Value of result["score_variance"] of the best trial.
+        '''
+        return self._best_trial_entry("result", "score_variance")
+
+    @property
+    def best_trial_pipeline(self) -> Pipeline:
+        '''
+        Pipeline stored under the key "pipeline" of the best trial.
+        '''
+        return self._best_trial_entry("pipeline")
+
+
+if __name__ == "__main__":
+
+    # Small manual test: grid search over the sample space
+    # on the breast cancer dataset with 5-fold cross-validation.
+
+    from sklearn.datasets import load_breast_cancer
+    from sklearn.metrics import accuracy_score
+    from cdplib.gridsearch.space_sample import space
+    from cdplib.log import Log
+
+    logger = Log("GridSearchPipelineSelector__TEST:")
+
+    logger.info("Start test")
+
+    data_loader = load_breast_cancer()
+
+    X = data_loader["data"]
+    y = data_loader["target"]
+
+    gs = GridSearchPipelineSelector(cost_func=accuracy_score,
+                                    greater_is_better=True,
+                                    trials_path="trials_TEST.pkl")
+
+    gs.attach_space(space=space)
+    gs.attach_data(X_train=X, y_train=y, cv=5)
+
+    gs.run_trials()
+
+    logger.info("Best trial: {}".format(gs.best_trial))
+    # distinct label, so the two log lines can be told apart
+    logger.info("Best trial pipeline: {}".format(gs.best_trial_pipeline))
+
+    logger.info("End test")

+ 33 - 0
cdplib/gridsearch/space_sample.py

@@ -0,0 +1,33 @@
+#!/usr/bin/env python3
+# -*- coding: utf-8 -*-
+"""
+Created on Mon Oct  5 09:50:24 2020
+
+@author: tanya
+"""
+
+from sklearn.ensemble import RandomForestClassifier
+from sklearn.feature_selection import SelectKBest
+from sklearn.linear_model import LogisticRegression
+from sklearn.decomposition import PCA
+from sklearn.pipeline import Pipeline
+from sklearn.preprocessing import StandardScaler
+
+
+# Sample search space: a list with one entry per candidate pipeline.
+# Each entry holds the pipeline's name, the pipeline itself and the grid
+# of values to try, keyed by "<step name>__<parameter name>".
+space = [
+    dict(name="std_scaler_kbest_rf",
+         pipeline=Pipeline(steps=[
+             ("std_scaler", StandardScaler()),
+             ("kbest", SelectKBest()),
+             ("rf", RandomForestClassifier()),
+         ]),
+         params={
+             "kbest__k": [2, 3],
+             "rf__n_estimators": [10, 20],
+         }),
+
+    dict(name="std_scaler_pca_lr",
+         pipeline=Pipeline(steps=[
+             ("std_scaler", StandardScaler()),
+             ("pca", PCA()),
+             ("lr", LogisticRegression()),
+         ]),
+         params={
+             "lr__C": [0.5, 1],
+             "pca__n_components": [2, 3],
+         }),
+]

+ 85 - 55
cdplib/pipeline_selector/PipelineSelector.py

@@ -4,6 +4,13 @@
 Created on Wed Sep 30 14:23:23 2020
 
 @author: tanya
+@description: an abstract class for selecting a machine learning
+ pipeline in a space of parameter distributions over multiple pipelines.
+ The selection is designed in such a way that a Trials object is being
+ maintained during the tuning process from which one can retrieve
+ the best pipeline so far as well as the entire tuning history
+ if needed.
+ Children classes: hyperopt and custom gridsearch.
 """
 
 import pickle
@@ -14,7 +21,7 @@ import datetime
 from typing import Callable
 import numpy as np
 import pandas as pd
-from abc import ABC, abstractmethod
+from abc import ABC, abstractmethod, abstractproperty
 from sklearn.pipeline import Pipeline
 from sklearn.model_selection import cross_validate as sklearn_cross_validator
 from sklearn.metrics import make_scorer
@@ -28,14 +35,19 @@ sys.path.append(os.getcwd())
 
 class PipelineSelector(ABC):
     """
+    An abstract class for selecting a machine learning
+    pipeline in a space of parameter distributions over multiple pipelines.
+    The selection is designed in such a way that a Trials object is being
+    maintained during the tuning process from which one can retrieve
+    the best pipeline so far as well as the entire tuning history
+    if needed.
     """
     def __init__(self,
                  cost_func,
                  greater_is_better: bool,
                  trials_path: str,
                  backup_trials_freq: int = 1,
-                 log_path: str = None,
-                 averaging_func: callable = None):
+                 cross_val_averaging_func: callable = None):
         '''
         :param callable cost_func: function to minimize or maximize
 
@@ -55,27 +67,31 @@ class PipelineSelector(ABC):
 
         :param str log_path: Optional, when not provided logs to stdout.
 
-        :param callable averaging_func: optional,
+        :param callable cross_val_averaging_func: optional,
             when not provided set to mean. Function
             to aggregate the cross-validated values of the cost function.
             Classic situation is to take the mean,
             another example is, for example mean() - c*var().
         '''
-
-        assert(callable(cost_func)),\
-            "Parameter 'cost_func' must be a callable"
-
-        assert(isinstance(greater_is_better, bool)),\
-            "Parameter 'greater_is_better' must be bool type"
-
-        assert(isinstance(trials_path, str)),\
-            "Parameter 'trials_path' must be of string type"
-
-        if averaging_func is not None:
-            assert(callable(averaging_func)),\
-                "Parameter 'averaging_func' must be a callable"
-
-        self._logger = Log("PipelineSelector")
+        self._logger = Log("PipelineSelector: ")
+
+        input_errors = [(cost_func, Callable,
+                         "Parameter 'cost_func' must be a callable"),
+                        (greater_is_better, bool,
+                         "Parameter 'greater_is_better' must be bool type"),
+                        (trials_path, str,
+                         "Parameter 'trials_path' must be of string type"),
+                        (cross_val_averaging_func, (Callable, None.__class__),
+                         ("Parameter 'cross_val_averaging_func'"
+                          "must be a callable")),
+                        (backup_trials_freq, int,
+                         "Parameter backup_trials_freq must be an int")]
+
+        for p, t, err in input_errors:
+            try:
+                assert(isinstance(p, t))
+            except AssertionError:
+                self._logger.log_and_raise_error(err)
 
         ExceptionsHandler(self._logger).assert_is_directory(path=trials_path)
 
@@ -83,16 +99,18 @@ class PipelineSelector(ABC):
         # is 1 when cost_func is minimized, -1 when cost func is maximized
         self._score_factor = (not greater_is_better) - greater_is_better
         self._trials_path = trials_path
-        # is initialized with empty trials object
-        self._trials = None
         self._backup_trials_freq = backup_trials_freq
-        self._averaging_func = averaging_func or np.mean
+        self._cross_val_averaging_func = cross_val_averaging_func or np.mean
         # keeping track of the current search iteration
         self._run_number = 0
         # space and data need to be attached to perform search.
         self._space_attached = False
         self._data_attached = False
         self._cross_validator_attached = False
+        # _best_score is the same as best_trial_score property
+        # but is defined in order not to go through all the trials
+        # at each iteration.
+        self._best_score = np.nan
 
         # if a trials object already exists at the given path,
         # it is loaded and the search is continued. Else,
@@ -102,20 +120,23 @@ class PipelineSelector(ABC):
                 with open(trials_path, "rb") as f:
                     self._trials = pickle.load(f)
 
+                self._best_score = self.best_trial_score
+
                 self._logger.info(("Loaded an existing trials object"
                                    "Consisting of {} trials")
                                   .format(len(self._trials.trials)))
 
             except Exception as e:
-                self._logger.error(("Trials object could not be loaded. "
-                                    "Training starts from the beginning. "
-                                    "Exit with error {}").format(e))
+                err = ("Trials object could not be loaded. "
+                       "Exit with error {}").format(e)
+                self._logger.log_and_raise_error(err)
+                self._trials = None
 
         else:
-            self._logger.info(("No existing trials object was found"
-                               "Initialized an empty trials object."))
+            self._logger.warning(("No existing trials object was found, "
+                                  "Starting from scratch."))
 
-        self._best_score = self.best_trial_score
+            self._trials = None
 
     def _backup_trials(self):
         '''
@@ -140,9 +161,13 @@ class PipelineSelector(ABC):
             a python module. Optional when the space
             is provided directly.
         """
-        assert((cross_validator is not None) or
-               ((module_path is not None) and (name is not None))),\
-            "Either space or (module_path, name) must be provided"
+        try:
+            assert((cross_validator is not None) or
+                   ((module_path is not None) and (name is not None)))
+        except AssertionError:
+            err = ("Either cross_validator or "
+                   "(module_path, name) must be provided")
+            self._logger.log_and_raise_error(err)
 
         self._cross_validator = cross_validator or\
             LoadingUtils().load_from_module(module_path=module_path, name=name)
@@ -169,9 +194,12 @@ class PipelineSelector(ABC):
             a python module. Optional when the space
             is provided directly.
         '''
-        assert((space is not None) or
-               ((module_path is not None) and (name is not None))),\
-            "Either space or (module_path, name) must be provided"
+        try:
+            assert((space is not None) or
+                   ((module_path is not None) and (name is not None)))
+        except AssertionError:
+            err = "Either space or (module_path, name) must be provided"
+            self._logger.log_and_raise_error(err)
 
         self._space = space or LoadingUtils().load_from_module(
                 module_path=module_path, name=name)
@@ -214,7 +242,8 @@ class PipelineSelector(ABC):
                        isinstance(y_train, (pd.Series, np.array,
                                             pd.DataFrame, NoneType)) and
                        isinstance(y_val, (pd.Series, np.array)) and
-                       (type(y_train) == type(y_val)))
+                       ((y_val is None) if (y_train is None)
+                        else (y_val is not None)))
             except AssertionError:
                 self._logger.log_and_raise_error(input_err)
 
@@ -276,7 +305,7 @@ class PipelineSelector(ABC):
                 scoring=make_scorer(self._cost_func),
                 error_score=np.nan)
 
-        return {'value': self._averaging_func(scores['test_score']),
+        return {'value': self._cross_val_averaging_func(scores['test_score']),
                 'variance': np.var(scores['test_score'])}
 
     def _objective(self, space_element: dict) -> dict:
@@ -388,26 +417,27 @@ class PipelineSelector(ABC):
         """
         """
         pass
-        self._trials = self._trials or []
-
-        finished_combinations = [trial["combination"]
-                                 for trial in self._trials]
 
-        for space_element in self._space:
-            combination = [(trial["name"],
-                            [(k, trial["params"][k])
-                            for k in trial["params"]])
-                           for trial in self._trials]
-
-            if combination not in finished_combinations:
-
-                result = self._objective(space_element)
-
-                pipeline = space_element["pipeline"].set_params(
-                        space_element["params"])
+    @property
+    @abstractmethod
+    def best_trial(self) -> dict:
+        """
+        Best trial found so far; to be implemented by the child classes.
+
+        GridSearchPipelineSelector returns the trial dictionary
+        with the largest result["score"].
+        """
+        pass
 
-                self._trials.append({"combination": combination,
-                                     "pipeline": pipeline,
-                                     "result": result})
+    @property
+    @abstractmethod
+    def best_trial_score(self) -> float:
+        """
+        Score of the best trial; to be implemented by the child classes.
+        """
+        pass
 
+    @property
+    @abstractmethod
+    def best_trial_score_variance(self) -> float:
+        """
+        Score variance of the best trial;
+        to be implemented by the child classes.
+        """
+        pass
 
+    @property
+    @abstractmethod
+    def best_trial_pipeline(self) -> Pipeline:
+        """
+        Pipeline of the best trial; to be implemented by the child classes.
+        """
+        pass

+ 1 - 1
cdplib/utils/LoadingUtils.py

@@ -9,7 +9,7 @@ Created on Thu Oct  1 12:58:58 2020
 
 import os
 import sys
-from cdp.log import Log
+from cdplib.log import Log
 
 
 class LoadingUtils: