소스 검색

added new classes LoadingUtils, PipelineSelector and GridsearchPipelineSelector

tanja 3 년 전
부모
커밋
c49f49730c
4개의 변경된 파일503개의 추가작업 그리고 18개의 파일을 삭제
  1. 16 0
      cdplib/gridsearch.py
  2. 413 0
      cdplib/pipeline_selector/PipelineSelector.py
  3. 28 18
      cdplib/utils/ExceptionsHandler.py
  4. 46 0
      cdplib/utils/LoadingUtils.py

+ 16 - 0
cdplib/gridsearch.py

@@ -0,0 +1,16 @@
+#!/usr/bin/env python3
+# -*- coding: utf-8 -*-
+"""
+Created on Wed Sep 30 14:15:17 2020
+
+@author: tanya
+"""
+
+from typing import Callable
+import numpy as np
+import pickle
+import os
+
+
+class GridSearchPipelineSelection:
+

+ 413 - 0
cdplib/pipeline_selector/PipelineSelector.py

@@ -0,0 +1,413 @@
+#!/usr/bin/env python3
+# -*- coding: utf-8 -*-
+"""
+Created on Wed Sep 30 14:23:23 2020
+
+@author: tanya
+"""
+
+import pickle
+import os
+import sys
+import time
+import datetime
+from typing import Callable
+import numpy as np
+import pandas as pd
+from abc import ABC, abstractmethod
+from sklearn.pipeline import Pipeline
+from sklearn.model_selection import cross_validate as sklearn_cross_validator
+from sklearn.metrics import make_scorer
+from hyperopt import STATUS_OK, STATUS_FAIL
+from cdplib.log import Log
+from cdplib.utils import ExceptionsHandler
+from cdplib.utils import LoadingUtils
+
+sys.path.append(os.getcwd())
+
+
+class PipelineSelector(ABC):
+    """
+    """
+    def __init__(self,
+                 cost_func,
+                 greater_is_better: bool,
+                 trials_path: str,
+                 backup_trials_freq: int = 1,
+                 log_path: str = None,
+                 averaging_func: callable = None):
+        '''
+        :param callable cost_func: function to minimize or maximize
+
+        :param bool greater_is_better: when True
+            cost_func is maximized, else minimized.
+
+        :param str trials_path: path at which the trials object is saved
+            in binary format. From the trials object we can
+            select information about the obtained scores, score variations,
+            and pipelines, and parameters tried out so far. If a trials object
+            already exists at the given path, it is loaded and the
+            search is continued, else, the search is started from
+            the beginning.
+
+        :param backup_trials_freq: frequecy in interations (trials)
+            of saving the trials object at the trials_path.
+
+        :param str log_path: Optional, when not provided logs to stdout.
+
+        :param callable averaging_func: optional,
+            when not provided set to mean. Function
+            to aggregate the cross-validated values of the cost function.
+            Classic situation is to take the mean,
+            another example is, for example mean() - c*var().
+        '''
+
+        assert(callable(cost_func)),\
+            "Parameter 'cost_func' must be a callable"
+
+        assert(isinstance(greater_is_better, bool)),\
+            "Parameter 'greater_is_better' must be bool type"
+
+        assert(isinstance(trials_path, str)),\
+            "Parameter 'trials_path' must be of string type"
+
+        if averaging_func is not None:
+            assert(callable(averaging_func)),\
+                "Parameter 'averaging_func' must be a callable"
+
+        self._logger = Log("PipelineSelector")
+
+        ExceptionsHandler(self._logger).assert_is_directory(path=trials_path)
+
+        self._cost_func = cost_func
+        # is 1 when cost_func is minimized, -1 when cost func is maximized
+        self._score_factor = (not greater_is_better) - greater_is_better
+        self._trials_path = trials_path
+        # is initialized with empty trials object
+        self._trials = None
+        self._backup_trials_freq = backup_trials_freq
+        self._averaging_func = averaging_func or np.mean
+        # keeping track of the current search iteration
+        self._run_number = 0
+        # space and data need to be attached to perform search.
+        self._space_attached = False
+        self._data_attached = False
+        self._cross_validator_attached = False
+
+        # if a trials object already exists at the given path,
+        # it is loaded and the search is continued. Else,
+        # the search is started from the beginning.
+        if os.path.isfile(trials_path):
+            try:
+                with open(trials_path, "rb") as f:
+                    self._trials = pickle.load(f)
+
+                self._logger.info(("Loaded an existing trials object"
+                                   "Consisting of {} trials")
+                                  .format(len(self._trials.trials)))
+
+            except Exception as e:
+                self._logger.error(("Trials object could not be loaded. "
+                                    "Training starts from the beginning. "
+                                    "Exit with error {}").format(e))
+
+        else:
+            self._logger.info(("No existing trials object was found"
+                               "Initialized an empty trials object."))
+
+        self._best_score = self.best_trial_score
+
+    def _backup_trials(self):
+        '''
+        Pickles (Saves) the trials object.
+        Used in a scheduler.
+        '''
+        with open(self._trials_path, "wb") as f:
+            pickle.dump(self._trials, f)
+
+    def attach_cross_validator(self, cross_validator: Callable = None,
+                               module_path: str = None,
+                               name: str = None):
+        """
+        Method for attaching a custom cross-validation function
+        :param cross_validator: a function that has the same
+             signature as sklearn.model_selection.cross_validate
+        :param str module_path: path to python module
+            where the space is defined. Optional when
+            the space is provided directly.
+
+        :param str name: name of the space loaded from
+            a python module. Optional when the space
+            is provided directly.
+        """
+        assert((cross_validator is not None) or
+               ((module_path is not None) and (name is not None))),\
+            "Either space or (module_path, name) must be provided"
+
+        self._cross_validator = cross_validator or\
+            LoadingUtils().load_from_module(module_path=module_path, name=name)
+
+        self._logger.info("Attached a cross validator")
+        self._cross_validator_attached = True
+
+    def attach_space(self, space=None,
+                     module_path: str = None,
+                     name: str = None):
+        '''
+        :param space: space where
+            the search is performed. Optional when a space
+            is loaded from a python module. A space might be either
+            a list of dictionaries or a hyperopt space object
+            the elements of which are dictionaries with keys:
+            name, pipeline, params
+
+        :param str module_path: path to python module
+            where the space is defined. Optional when
+            the space is provided directly.
+
+        :param str name: name of the space loaded from
+            a python module. Optional when the space
+            is provided directly.
+        '''
+        assert((space is not None) or
+               ((module_path is not None) and (name is not None))),\
+            "Either space or (module_path, name) must be provided"
+
+        self._space = space or LoadingUtils().load_from_module(
+                module_path=module_path, name=name)
+
+        self._logger.info("Attached parameter distribution space")
+        self._space_attached = True
+
+    def attach_data(self, X_train: (pd.DataFrame, np.ndarray),
+                    y_train: (pd.DataFrame, pd.Series, np.ndarray) = None,
+                    X_val: (pd.DataFrame, np.ndarray) = None,
+                    y_val: (pd.DataFrame, pd.Series, np.ndarray) = None,
+                    cv: (list, int) = None):
+        '''
+        :param array X_train: data on which
+            machine learning pipelines are trained
+
+        :param array y_train: optional, vector with targets,
+            (not all algorithms require a targets)
+
+        :param array X_val: optional, validation data.
+            When not provided, cross-validated value
+            of the cost_func is calculated.
+
+        :param array y_val: optional, validation targets
+
+        :param list cv: list of tuples containing
+            train and validation indices or an integer representing
+            the number of folds for a random split of data
+            during cross-validation
+            example: [([0,1,2], [3,4]), ([1,2,3], [4,5])]
+        '''
+        NoneType = None.__classe__
+
+        input_err = "Non-valid combination of train and val data types"
+
+        if cv is None:
+            try:
+                assert(isinstance(X_train, (pd.DataFrame, np.array)) and
+                       isinstance(X_val, (pd.DataFrame, np.array)) and
+                       isinstance(y_train, (pd.Series, np.array,
+                                            pd.DataFrame, NoneType)) and
+                       isinstance(y_val, (pd.Series, np.array)) and
+                       (type(y_train) == type(y_val)))
+            except AssertionError:
+                self._logger.log_and_raise_error(input_err)
+
+            # cost is evaluated with a cross validation function
+            # that accepts an array and a cv object with
+            # indices of the fold splits.
+            # Here we create a trivial cv object
+            # with one validation split.
+
+            train_inds = list(range(len(X_train)))
+            val_inds = list(range(len(X_train),
+                                  len(X_train) + len(X_val)))
+
+            self._cv = [(train_inds, val_inds)]
+            self._X = np.concatenate([X_train, X_val])
+            self._y = None if y_train is None\
+                else np.concatenate([y_train, y_val])
+
+        else:
+            try:
+                assert(isinstance(X_train, (pd.DataFrame, np.array)) and
+                       isinstance(y_train, (pd.Series, np.array,
+                                            pd.DataFrame, NoneType)) and
+                       (X_val is None) and (y_val is None))
+            except AssertionError:
+                self._logger.log_and_raise_error(input_err)
+
+            self._cv = cv
+            self._X = X_train
+            self._y = y_train
+
+        self._logger.info("Attached data")
+        self._data_attached = True
+
+    def _evaluate(self, pipeline: Pipeline) -> dict:
+        '''
+        This method is called in _objective.
+
+        Calculates the cost on the attached data.
+        This function can be overriden, when the cost
+        needs to be calculated differently,
+        for example with a tensorflow model.
+
+        :param Pipeline pipeline: machine learning pipeline
+            that will be evaluated with cross-validation
+
+        :output: dictionary with the aggregated
+            cross-validation score and
+            the score variance.
+        '''
+        if not self._cross_validator_attached:
+            self._cross_validator = sklearn_cross_validator
+
+        scores = self._cross_validator(
+                estimator=pipeline,
+                X=self._X,
+                y=self._y,
+                cv=self._cv or 5,
+                scoring=make_scorer(self._cost_func),
+                error_score=np.nan)
+
+        return {'value': self._averaging_func(scores['test_score']),
+                'variance': np.var(scores['test_score'])}
+
+    def _objective(self, space_element: dict) -> dict:
+        '''
+        This method is called in search_for_best_pipeline
+        inside the hyperopt fmin method.
+
+        Uses _evaluate method.
+
+        It must take as input a space element
+        and produce an output in the form of dictionary
+        with 2 obligatory values loss and status
+        (STATUS_OK or STATUS_FAIL). Other
+        values in the output are optional and can be
+        accessed later through the trials object.
+
+        :Warning: fmin minimizes the loss,
+        when _evaluate returns a value to be maximized,
+        it should be multiplied by -1 to obtain loss.
+
+        :param dict space_element: must contain keys
+            name (with the name of the pipeline),
+            pipeline (Pipeline object),
+            params (dict of pipeline params)
+
+        :output: dictionary with keys
+            loss (minimized value),
+            status with values STATUS_OK or STATUS_FAIL
+            uderstood by hyperopt,
+            score (equal to loss or -loss),
+            score_variance,
+            timestamp (end of execution),
+            train_time: execution time
+        '''
+        try:
+            assert(isinstance(space_element, dict) and
+                   set(['name', 'pipeline', 'params']) <= space_element.keys())
+
+            assert(isinstance(space_element['name'], str) and
+                   isinstance(space_element['pipeline'], Pipeline) and
+                   isinstance(space_element['params'], dict))
+
+        except AssertionError:
+            err = "Space elements are of wrong form"
+            self._logger.log_and_raise_error(err)
+
+        start_time = time.time()
+
+        if not self._data_attached:
+            err = ("Data must be attached in order "
+                   "in order to effectuate the best"
+                   "pipeline search")
+            self._logger.log_and_raise_error(err)
+
+        self._run_number += 1
+
+        pipeline = space_element['pipeline']
+        params = space_element['params']
+        pipeline.set_params(**params)
+
+        self._logger.info(("Run number {0}: "
+                           "Current score is {1}: "
+                           "Training pipeline {2} "
+                           "with parameters: {3}. ").format(
+                             self._run_number,
+                             self._best_score,
+                             space_element['name'],
+                             params))
+
+        try:
+            result = self._evaluate(pipeline)
+
+            assert(not np.isnan(result["value"]))
+
+            if self._run_number % self._backup_trials_freq == 0:
+                self._backup_trials()
+
+            if (self._best_score != self._best_score) or\
+                self._score_factor*result["value"] <\
+                    self._score_factor*self._best_score:
+
+                self._logger.info("Score got better, new best score is: {}"
+                                  .format(result["value"]))
+
+                self._best_score = result['value']
+
+            end_time = time.time()
+
+            return {'loss': self._score_factor * result["value"],
+                    'status': STATUS_OK,
+                    'score': result["value"],
+                    'score_variance': result["variance"],
+                    'timestamp': datetime.datetime.today(),
+                    'train_time': end_time - start_time}
+
+        except Exception as e:
+
+            self._logger.warning("Trial failed with error {}".format(e))
+
+            return {'loss': np.nan,
+                    'status': STATUS_FAIL,
+                    'score': np.nan,
+                    'score_variance': np.nan,
+                    'timestamp': datetime.datetime.today(),
+                    'train_time': np.nan}
+
+    @abstractmethod
+    def run_trials(self):
+        """
+        """
+        pass
+        self._trials = self._trials or []
+
+        finished_combinations = [trial["combination"]
+                                 for trial in self._trials]
+
+        for space_element in self._space:
+            combination = [(trial["name"],
+                            [(k, trial["params"][k])
+                            for k in trial["params"]])
+                           for trial in self._trials]
+
+            if combination not in finished_combinations:
+
+                result = self._objective(space_element)
+
+                pipeline = space_element["pipeline"].set_params(
+                        space_element["params"])
+
+                self._trials.append({"combination": combination,
+                                     "pipeline": pipeline,
+                                     "result": result})
+
+

+ 28 - 18
cdplib/utils/ExceptionsHandler.py

@@ -8,35 +8,45 @@ Created on Fri Sep 27 14:20:58 2019
 
 import os
 import sys
-import logging
 import pandas as pd
+from cdplib.log import Log
+
 sys.path.append(os.getcwd())
 
 
 class ExceptionsHandler:
     '''
     '''
-    def __init__(self):
+    def __init__(self, logger: Log = None):
         '''
         '''
+        self._logger = logger or Log("ExceptionHandler")
 
-    def check_is_file(self, path, logger=None):
+    def check_is_file(self, path: str):
         '''
         '''
-        if logger is None:
-            logger = logging.getLogger()
-
         if not os.path.isfile(path):
             err = "File {} not found".format(path)
-            logger.error(err)
+            self._logger.error(err)
             raise FileNotFoundError(err)
 
-    def _check_column_abscence(self, columns: (str, list), data: pd.DataFrame,
-                               error_or_warning: str, logger = None):
+    def assert_is_directory(self, path: str):
+        ""
+        ""
+        assert(isinstance(path, str)),\
+            "Parameter 'path' must of str type"
+
+        dirname = os.path.dirname("path")
+
+        if len(dirname) > 0:
+            os.mkdir(dirname, exists_ok=True)
+
+    def _check_column_abscence(self,
+                               columns: (str, list),
+                               data: pd.DataFrame,
+                               error_or_warning: str):
         '''
         '''
-        if logger is None:
-            logger = logging.getLogger()
         if isinstance(columns, str):
             columns = [columns]
 
@@ -44,23 +54,23 @@ class ExceptionsHandler:
 
             if column not in data.columns:
                 err = ("{} is not an internal column name".format(column))
-                getattr(logger, error_or_warning)(err)
+                getattr(self._logger, error_or_warning)(err)
 
                 if error_or_warning == "error":
                     raise Exception(err)
 
-    def error_column_abscence(self, columns: (str, list), data: pd.DataFrame, logger = None):
+    def error_column_abscence(self,
+                              columns: (str, list),
+                              data: pd.DataFrame):
         '''
         '''
         return self._check_column_abscence(columns=columns,
                                            data=data,
-                                           error_or_warning="error",
-                                           logger=logger)
+                                           error_or_warning="error")
 
-    def warn_column_abscence(self, columns: (str, list), data: pd.DataFrame, logger = None):
+    def warn_column_abscence(self, columns: (str, list), data: pd.DataFrame):
         '''
         '''
         return self._check_column_abscence(columns=columns,
                                            data=data,
-                                           error_or_warning="warning",
-                                           logger=logger)
+                                           error_or_warning="warning")

+ 46 - 0
cdplib/utils/LoadingUtils.py

@@ -0,0 +1,46 @@
+#!/usr/bin/env python3
+# -*- coding: utf-8 -*-
+"""
+Created on Thu Oct  1 12:58:58 2020
+
+@author: tanya
+@description: class for methods of loading data from external sources
+"""
+
+import os
+import sys
+from cdp.log import Log
+
+
+class LoadingUtils:
+    """
+    """
+    def __init__(self, logger=None):
+        """
+        """
+        self._logger = logger or Log("LoadingUtils")
+
+    def load_from_module(self, module_path: str, name: str):
+        """
+        """
+        for p in ["modele_path", "name"]:
+            assert(isinstance(p, str)),\
+                "Parameter '{}' must be of str type".format(p)
+
+            assert(os.path.isfile(module_path)),\
+                "Parameter 'module_path' must be a valid file"
+
+            module, extension = os.path.splitext(os.path.basename(module_path))
+
+            assert(extension == ",py"),\
+                "Parameter 'space' must be read from a python file"
+
+            sys.path.insert(module_path)
+
+            try:
+                from module import name
+                return name
+
+            except ImportError:
+                err = "Invalid space location or name"
+                self._logger.log_and_raise_error(err)