
added gridsearch parameter selection and a parent abstract pipeline selector

tanja, 3 years ago
parent
commit 5d4bb77653

+ 173 - 0
cdplib/fine_tuning/FineTunedClassiferCV.py

@@ -0,0 +1,173 @@
+#!/usr/bin/env python3
+# -*- coding: utf-8 -*-
+"""
+Created on Thu Apr 23 08:51:53 2020
+
+@author: tanya
+
+@description: class for fine-tuning a sklearn classifier
+(optimizing the probability threshold)
+"""
+
+import pandas as pd
+import numpy as np
+
+from typing import Callable, Union
+
+from sklearn.base import (BaseEstimator, ClassifierMixin,
+                          clone, MetaEstimatorMixin)
+from sklearn.model_selection import StratifiedKFold
+
+from cdplib.log import Log
+
+from cdplib.utils.TyperConverter import TypeConverter
+
+
+class FineTunedClassifierCV(BaseEstimator, ClassifierMixin,
+                            MetaEstimatorMixin):
+    """
+    Probability threshold tuning for a given estimator.
+    Overrides the predict method of the given sklearn classifier
+    and returns predictions with the optimal value of
+    the probability threshold.
+
+    An object of this class can be passed to an sklearn Pipeline
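+
+    Example (a sketch for illustration; the estimator, cost function,
+    and cv below are placeholders):
+
+        clf = FineTunedClassifierCV(estimator=LogisticRegression(),
+                                    cost_func=accuracy_score,
+                                    greater_is_better=True,
+                                    cv=[(train_inds, val_inds)])
+        pipe = Pipeline([("scaler", StandardScaler()), ("clf", clf)])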
+    """
+    def __init__(self, estimator, cost_func: Callable, greater_is_better: bool,
+                 cv=None, threshold_step: float = 0.1):
+        """
+        """
+        self.estimator = estimator
+
+        self.is_fitted = False
+
+        self.greater_is_better = greater_is_better
+
+        # cv must be an iterable of (train_indices, validation_indices)
+        # pairs; if None, fit falls back to a 5-fold stratified split
+        # (an assumed default)
+        self.cv = cv
+
+        self.cost_func = cost_func
+
+        self.threshold_step = threshold_step
+
+        self.optimal_threshold = 0.5
+
+        self._logger = Log("FineTunedClassifierCV")
+
+    def _get_best_threshold(self, y_val: Union[pd.DataFrame, np.ndarray],
+                            proba_pred: Union[pd.DataFrame, np.ndarray]):
+        '''
+        '''
+        costs = {}
+
+        for t in np.arange(self.threshold_step, 1, self.threshold_step):
+            costs[t] = self.cost_func(y_val, (proba_pred >= t).astype(int))
+
+        if self.greater_is_better:
+            return max(costs, key=costs.get)
+        else:
+            return min(costs, key=costs.get)
+
+    def fit(self, X: Union[pd.DataFrame, np.ndarray],
+            y: Union[pd.DataFrame, np.ndarray, None] = None,
+            **fit_args):
+        """
+        """
+        X = TypeConverter().convert_to_ndarray(X)
+        if y is not None:
+            y = TypeConverter().convert_to_ndarray(y)
+
+        optimal_thrs_per_fold = []
+
+        # fall back to a 5-fold stratified split if no cv was given
+        # (an assumed default; pass an explicit cv for full control)
+        cv = self.cv if (self.cv is not None)\
+            else StratifiedKFold(n_splits=5).split(X, y)
+
+        for train_inds, val_inds in cv:
+            X_train, X_val = X[train_inds], X[val_inds]
+
+            if y is not None:
+                y_train, y_val = y[train_inds], y[val_inds]
+            else:
+                y_train, y_val = None, None
+
+            estimator = clone(self.estimator)
+
+            estimator.fit(X_train, y_train, **fit_args)
+
+            proba_pred = estimator.predict_proba(X_val)[:, 1]
+
+            optimal_thr = self._get_best_threshold(y_val, proba_pred)
+
+            optimal_thrs_per_fold.append(optimal_thr)
+
+        self.optimal_threshold = np.mean(optimal_thrs_per_fold)
+
+        self.estimator.fit(X, y, **fit_args)
+
+        self.is_fitted = True
+
+        return self
+
+    def predict(self, X: Union[pd.DataFrame, np.ndarray]) -> np.ndarray:
+        """
+        """
+        if self.is_fitted:
+
+            proba_pred = self.estimator.predict_proba(X)[:, 1]
+
+            return (proba_pred >= self.optimal_threshold).astype(int)
+
+        else:
+            self._logger.warning("You should fit first")
+
+    def get_params(self, deep: bool = True) -> dict:
+        """
+        """
+        params = self.estimator.get_params(deep=deep)
+
+        params.update({"cv": self.cv, "cost_func": self.cost_func})
+
+        return params
+
+    def set_params(self, **params: dict):
+        """
+        """
+        # pop the parameters of this wrapper before passing
+        # the rest on to the underlying estimator
+        if "cv" in params:
+            self.cv = params.pop("cv")
+
+        if "cost_func" in params:
+            self.cost_func = params.pop("cost_func")
+
+        self.estimator.set_params(**params)
+
+        return self
+
+
+if __name__ == "__main__":
+    # test
+    from sklearn.datasets import load_iris
+    from sklearn.metrics import accuracy_score
+    import gc
+    from xgboost import XGBRFClassifier
+
+    data = load_iris()
+    X, y = data["data"], data["target"]
+    y = (y == 1).astype(int)
+    del data
+    gc.collect()
+
+    # make a custom cv object
+    val_len = len(X)//10
+    split_inds = range(len(X)//2, len(X), val_len)
+
+    cv = []
+
+    for i in split_inds:
+        train_inds = list(range(i))
+        val_inds = list(range(i, i + val_len))
+        cv.append((train_inds, val_inds))
+
+    clf = XGBRFClassifier()
+
+    fine_tuned_clf = FineTunedClassifierCV(estimator=clf,
+                                           cv=cv,
+                                           greater_is_better=True,
+                                           cost_func=accuracy_score)
+
+    fine_tuned_clf.fit(X=X, y=y)
+

+ 1 - 0
cdplib/fine_tuning/__init__.py

@@ -0,0 +1 @@
+from .FineTunedClassiferCV import *

+ 375 - 0
cdplib/gridsearch/GridSearchPipelineSelector.py

@@ -0,0 +1,375 @@
+#!/usr/bin/env python3
+# -*- coding: utf-8 -*-
+"""
+Created on Wed Sep 30 14:15:17 2020
+
+@author: tanya
+@description: a class for selecting a machine learning
+ pipeline from a deterministic space of parameter distributions
+ over multiple pipelines.
+ The selection is designed in such a way that a Trials object is
+ maintained during the tuning process, from which one can retrieve
+ the best pipeline so far as well as the entire tuning history
+ if needed.
+"""
+
+import os
+import datetime
+import numpy as np
+import pandas as pd
+from itertools import product
+from collections import ChainMap
+from sklearn.pipeline import Pipeline
+from typing import Callable, Optional, Literal, Dict, Union, List
+
+from cdplib.log import Log
+from cdplib.pipeline_selector.PipelineSelector import PipelineSelector
+
+
+class GridSearchPipelineSelector(PipelineSelector):
+    """
+    A class for selecting a machine learning
+     pipeline from a deterministic space of parameter distributions
+     over multiple pipelines.
+     The selection is designed in such a way that a Trials object is
+     maintained during the tuning process, from which one can retrieve
+     the best pipeline so far as well as the entire tuning history
+     if needed.
+    """
+    def __init__(self,
+                 cost_func: Union[Callable, str],
+                 greater_is_better: bool,
+                 trials_path: str,
+                 backup_trials_freq: Optional[int] = None,
+                 cross_val_averaging_func: Callable = np.mean,
+                 additional_metrics: Optional[Dict[str, Callable]] = None,
+                 strategy_name: Optional[str] = None,
+                 stdout_log_level: Literal["INFO", "WARNING", "ERROR"]
+                 = "INFO"
+                 ):
+        """
+        :param Callable cost_func: function to minimize or maximize
+            over the elements of a given (pipeline/hyperparameter) space
+
+        :param bool greater_is_better: when True
+            cost_func is maximized, else minimized.
+
+        :param str trials_path: path at which the trials object is saved
+            in binary format. From the trials object we can
+            select information about the obtained scores, score variations,
+            and pipelines, and parameters tried out so far. If a trials object
+            already exists at the given path, it is loaded and the
+            search is continued, else, the search is started from scratch.
+
+        :param backup_trials_freq: frequency in iterations (trials)
+            of saving the trials object at the trials_path.
+            If None, the trials object is backed up every time
+            the score improves.
+
+        :param Callable cross_val_averaging_func: Function to aggregate
+            the cross-validation scores.
+            Example different from the mean: mean - c*var.
+
+        :param additional_metrics: dict of additional metrics to save
+            of the form {"metric_name": metric} where metric is a Callable.
+
+        :param str strategy_name:
+            a strategy is defined by the data set (columns/features and rows),
+            cv object, cost function.
+            When the strategy changes, one must start with new trials.
+
+        :param str stdout_log_level: can be INFO, WARNING, ERROR
+        """
+        try:
+
+            super().__init__(cost_func=cost_func,
+                             greater_is_better=greater_is_better,
+                             trials_path=trials_path,
+                             backup_trials_freq=backup_trials_freq,
+                             cross_val_averaging_func=cross_val_averaging_func,
+                             additional_metrics=additional_metrics,
+                             strategy_name=strategy_name,
+                             stdout_log_level=stdout_log_level)
+
+            self._logger = Log("GridsearchPipelineSelector: ",
+                               stdout_log_level=stdout_log_level)
+
+            self._trials = self._trials or []
+
+        except Exception as e:
+            err = "Failed initialization. Exit with error: {}".format(e)
+
+            self._logger.log_and_raise_error(err)
+
+    def run_trials(self) -> None:
+        """
+        """
+        try:
+            assert(self.attached_space),\
+                "Parameter distribution space must be attached"
+
+            done_trial_ids = [{"name": trial["name"],
+                               "params": trial["params"],
+                               "status": trial["status"]}
+                              for trial in self._trials]
+
+            # list (generator) of (flattened) dictionaries
+            # with all different combinations of
+            # parameters for different pipelines
+            # from the space definition.
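+            # For example (a sketch): a space element with
+            # params = {"a": [1, 2], "b": [3]} unfolds into the
+            # parameter sets {"a": 1, "b": 3} and {"a": 2, "b": 3}.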
+            space_unfolded = ({"name": param_dist["name"],
+                               "pipeline": param_dist["pipeline"],
+                               "params": param_set}
+                              for param_dist in self._space
+                              for param_set in
+                              (dict(ChainMap(*tup)) for tup in
+                               product(*[[{k: v} for v in
+                                          param_dist["params"][k]]
+                                         for k in param_dist["params"]])))
+
+            for space_element in space_unfolded:
+
+                # uniquely identifies the current space element
+                trial_id = {"name": space_element["name"],
+                            "params": space_element["params"],
+                            "status": 'ok'}
+
+                # verify if the current pipeline/parameters
+                # were already tested before
+                if trial_id in done_trial_ids:
+                    continue
+
+                result = self._objective(space_element)
+
+                pipeline = space_element["pipeline"].set_params(
+                        **space_element["params"])
+
+                trial = {"name": space_element["name"],
+                         "params": space_element["params"],
+                         "pipeline": pipeline}
+
+                trial.update(result)
+
+                self._trials.append(trial)
+
+            self.finished_tuning = True
+
+            self.total_tuning_time = datetime.datetime.today()\
+                - self.start_tuning_time
+
+            self._backup_trials()
+
+        except Exception as e:
+            err = "Failed to run trials. Exit with error: {}".format(e)
+            self._logger.log_and_raise_error(err)
+
+    @property
+    def number_of_trials(self) -> Union[int, None]:
+        """
+        Number of trials already run in the current trials object
+        """
+        try:
+            return len(self._trials)
+
+        except Exception as e:
+            err = ("Failed to retrieve the number of trials. "
+                   "Exit with error: {}".format(e))
+
+            self._logger.log_and_raise_error(err)
+
+    @property
+    def best_trial(self) -> Union[dict, None]:
+        """
+        """
+        try:
+            assert(len(self._trials) > 0),\
+                ("Trials object is empty. "
+                 "Call run_trials method.")
+
+            return max(self._trials, key=lambda x: x["score"])
+
+        except Exception as e:
+            err = ("Could not retrieve the best trial. "
+                   "Exit with error: {}".format(e))
+
+            self._logger.log_and_raise_error(err)
+
+    @property
+    def best_trial_score(self) -> Union[float, None]:
+        '''
+        '''
+        try:
+            assert(len(self._trials) > 0),\
+                ("Trials object is empty. "
+                 "Call run_trials method.")
+
+            return self.best_trial["score"]
+
+        except Exception as e:
+            err = ("Could not retrieve the best trial. "
+                   "Exit with error: {}".format(e))
+
+            self._logger.log_and_raise_error(err)
+
+    @property
+    def best_trial_score_variance(self) -> Union[float, None]:
+        '''
+        '''
+        try:
+            assert(len(self._trials) > 0),\
+                ("Trials object is empty. "
+                 "Call run_trials method.")
+
+            return self.best_trial["score_variance"]
+
+        except Exception as e:
+            err = ("Could not retrieve the best trial. "
+                   "Exit with error: {}".format(e))
+
+            self._logger.log_and_raise_error(err)
+
+    @property
+    def best_trial_pipeline(self) -> Union[Pipeline, None]:
+        '''
+        '''
+        try:
+            assert(len(self._trials) > 0),\
+                ("Trials object is empty. "
+                 "Call run_trials method.")
+
+            return self.best_trial["pipeline"]
+
+        except Exception as e:
+            err = ("Could not retrieve the best trial. "
+                   "Exit with error: {}".format(e))
+
+            self._logger.log_and_raise_error(err)
+
+    def get_n_best_trial_pipelines(self, n: int)\
+            -> Union[List[Pipeline], None]:
+        """
+        N best pipelines with corresponding
+        best hyperparameters
+        """
+        try:
+            assert(len(self._trials) > 0),\
+                ("Trials object is empty. "
+                 "Call run_trials method.")
+
+            return [trial["pipeline"] for trial in
+                    sorted(self._trials, key=lambda x: x["score"],
+                           reverse=True)[:n]]
+
+        except Exception as e:
+            err = ("Failed to retrieve n best trials. "
+                   "Exit with error: {}".format(e))
+
+            self._logger.log_and_raise_error(err)
+
+    def get_n_best_trial_pipelines_of_each_type(self, n: int)\
+            -> Union[Dict[str, List[Pipeline]], None]:
+        """
+        If the hyperparameter search is done over multiple
+        pipelines, then returns n different pipeline-types
+        with corresponding hyperparameters
+        """
+        try:
+            assert(len(self._trials) > 0),\
+                ("Trials object is empty. "
+                 "Call run_trials method.")
+
+            return pd.DataFrame(self._trials)\
+                     .sort_values(by=["name", "score"],
+                                  ascending=False)\
+                     .groupby("name")\
+                     .head(n)\
+                     .groupby("name")["pipeline"]\
+                     .apply(lambda x: list(x))\
+                     .to_dict()
+
+        except Exception as e:
+            err = ("Failed to retrieve n best trials of each type."
+                   "Exit with error: {}".format(e))
+
+            self._logger.log_and_raise_error(err)
+
+    def trials_to_excel(self, path: str) -> None:
+        """
+        Trials object in the shape of table written to excel,
+        should contain the run number, pipeline (as str),
+        hyperparamters (as str), self.best_result (see self._objective method)
+        as well as additional information configured
+        through self.save_result method.
+        """
+        try:
+            pd.DataFrame(self._trials).to_excel(path)
+
+        except Exception as e:
+            err = ("Failed to write trials to excel. "
+                   "Exit with error: {}".format(e))
+
+            self._logger.log_and_raise_error(err)
+
+
+if __name__ == "__main__":
+
+    # elementary example
+
+    from sklearn.datasets import load_breast_cancer
+    from sklearn.metrics import accuracy_score, precision_score
+    from cdplib.gridsearch.space_sample import space
+    from cdplib.log import Log
+    from cdplib.db_handlers import MongodbHandler
+    import pickle
+    import pandas as pd
+    import os
+
+    trials_path = "gridsearch_trials_TEST.pkl"
+    additional_metrics = {"precision": precision_score}
+    strategy_name = "strategy_1"
+    data_path = "data_TEST.h5"
+    cv_path = "cv_TEST.pkl"
+    collection_name = 'TEST_' + strategy_name
+
+    logger = Log("GridSearchPipelineSelector__TEST:")
+
+    logger.info("Start test")
+
+    data_loader = load_breast_cancer()
+
+    X = data_loader["data"]
+    y = data_loader["target"]
+
+    pd.DataFrame(X).to_hdf(data_path, key="X_train")
+    pd.Series(y).to_hdf(data_path, key="y_train")
+
+    cv = [(list(range(len(X)//3)), list(range(len(X)//3, len(X)))),
+          (list(range(2*len(X)//3)), list(range(2*len(X)//3, len(X))))]
+
+    pickle.dump(cv, open(cv_path, "wb"))
+
+    gs = GridSearchPipelineSelector(cost_func=accuracy_score,
+                                    greater_is_better=True,
+                                    trials_path=trials_path,
+                                    additional_metrics=additional_metrics,
+                                    strategy_name=strategy_name,
+                                    stdout_log_level="WARNING")
+
+    gs.attach_space(space=space)
+
+    gs.attach_data_from_hdf5(data_hdf5_store_path=data_path,
+                             cv_pickle_path=cv_path)
+
+    save_method = MongodbHandler().insert_data_into_collection
+    save_kwargs = {'collection_name': collection_name}
+
+    gs.configer_summary_saving(save_method=save_method,
+                               kwargs=save_kwargs)
+
+    gs.run_trials()
+
+    logger.info("Best trial: {}".format(gs.best_trial))
+    logger.info("Total tuning time: {}".format(gs.total_tuning_time))
+
+    for file in [trials_path, data_path, cv_path]:
+        os.remove(file)
+
+    logger.info("End test")

+ 2 - 0
cdplib/gridsearch/__init__.py

@@ -0,0 +1,2 @@
+from .GridSearchPipelineSelector import *
+from .space_sample import *

+ 33 - 0
cdplib/gridsearch/space_sample.py

@@ -0,0 +1,33 @@
+#!/usr/bin/env python3
+# -*- coding: utf-8 -*-
+"""
+Created on Mon Oct  5 09:50:24 2020
+
+@author: tanya
+"""
+
+from sklearn.ensemble import RandomForestClassifier
+from sklearn.feature_selection import SelectPercentile
+from sklearn.linear_model import LogisticRegression
+from sklearn.decomposition import PCA
+from sklearn.pipeline import Pipeline
+from sklearn.preprocessing import StandardScaler
+
+
+space = [
+        {"name": "std_scaler_kbest_rf",
+         "pipeline": Pipeline([
+                 ("std_scaler", StandardScaler()),
+                 ("kbest", SelectPercentile()),
+                 ("rf", RandomForestClassifier())]),
+         "params": {"kbest__percentile": [2, 3],
+                    "rf__n_estimators": [10, 20]}},
+
+        {"name": "std_scaler_pca_lr",
+         "pipeline": Pipeline([
+                 ("std_scaler", StandardScaler()),
+                 ("pca", PCA()),
+                 ("lr", LogisticRegression())]),
+         "params": {"lr__C": [0.5, 1],
+                    "pca__n_components": [2, 3]}}
+        ]
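+
+# Grid size of this sample space (for orientation): each element above
+# expands to the cartesian product of its parameter lists, i.e.
+# 2 * 2 = 4 candidate pipelines per element, 8 candidates in total.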

+ 85 - 0
cdplib/hyperparameter_space_composer/SpaceComposer.py

@@ -0,0 +1,85 @@
+#!/usr/bin/env python3
+# -*- coding: utf-8 -*-
+"""
+Created on Wed Sep 30 13:54:04 2020
+
+@author: tanya
+@description: a class that from a given list of pipeline steps
+ composes a space to be passed in the GridsearchPipelineSelector
+ or HyperoptPipelineSelector classes.
+ A classic list of steps would be: [encoders, transformers, selectors, models]
+"""
+from sklearn.pipeline import Pipeline
+from hyperopt import hp
+from itertools import product
+
+
+class SpaceComposer:
+    """
+    A class that from a given list of pipeline steps
+    composes a space to be passed to GridsearchPipelineSelector
+    or HyperoptPipelineSelector.
+    """
+    def compose_gridsearch_space(self, step_list: list) -> list:
+        """
+        Composes a hyperparameter space for input to the
+        GridsearchPipelineSelector class.
+
+        :param step_list: a classic list of steps would be
+        [encoders, transformers, selectors, models],
+        where, for example, selectors is a list
+        of sklearn feature selectors, each selector given as a dict:
+        for example {"name": "kbest",
+                     "object": SelectPercentile(),
+                     "params": {
+                             "percentile":
+                                 [5, 10, 20],
+                             "score_func":
+                                 [f_classif, chi2, mutual_info_classif]}}
+
+        :return: a list of dictionaries of form
+            {"name": NAME, "pipeline": PIPELINE, "params": PARAMS}
+        """
+        space = []
+
+        step_combinations = product(*[step for step in
+                                      step_list if len(step) > 0])
+
+        for step_combination in step_combinations:
+
+            space_element = {}
+
+            space_element["name"] = "_".join([step["name"]
+                                              for step in step_combination])
+
+            space_element["pipeline"] = Pipeline(
+                    [(step["name"], step["object"])
+                     for step in step_combination])
+
+            space_element["params"] =\
+                {step["name"] + "__" + param_name: param_dist
+                 for step in step_combination
+                 for param_name, param_dist
+                 in step["params"].items()}
+
+            space.append(space_element)
+
+        return space
+
+    def compose_hyperopt_space(self, step_list: list) -> hp.choice:
+        """
+        Composes a hyperopt space from a list of steps.
+        A classic list of steps would be
+        [encoders, transformers, selectors, models],
+        where, for example, selectors is a list
+        of sklearn feature selectors, each selector given as a dict:
+        for example {"name": "kbest",
+                     "object": SelectPercentile(),
+                     "params": {
+                             "percentile":
+                                 3 + hp.randint("kbest__percentile", 200),
+                             "score_func":
+                                 hp.choice("kbest__score_func",
+                                    [f_classif, chi2, mutual_info_classif])}}
+        """
+        return hp.choice("pipelines", self.compose_gridsearch_space(step_list))
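+
+
+if __name__ == "__main__":
+
+    # Minimal usage sketch (illustrative only; the step groups below are
+    # hypothetical): compose a gridsearch space from two step groups.
+    from sklearn.preprocessing import StandardScaler
+    from sklearn.ensemble import RandomForestClassifier
+
+    scalers = [{"name": "std_scaler",
+                "object": StandardScaler(),
+                "params": {}}]
+
+    models = [{"name": "rf",
+               "object": RandomForestClassifier(),
+               "params": {"n_estimators": [10, 20]}}]
+
+    space = SpaceComposer().compose_gridsearch_space([scalers, models])
+
+    print(space[0]["name"])    # std_scaler_rf
+    print(space[0]["params"])  # {'rf__n_estimators': [10, 20]}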

+ 1 - 0
cdplib/hyperparameter_space_composer/__init__.py

@@ -0,0 +1 @@
+from .SpaceComposer import *

+ 208 - 0
cdplib/ml_validation/CVComposer.py

@@ -0,0 +1,208 @@
+#!/usr/bin/env python3
+# -*- coding: utf-8 -*-
+"""
+Created on Wed Dec  9 10:27:39 2020
+
+@author: tanya
+"""
+
+from typing import Union, Iterable, Tuple, List, NewType
+import pandas as pd
+import numpy as np
+from itertools import accumulate, repeat, takewhile, chain
+
+from cdplib.log import Log
+
+
+CVType = NewType("CVType", Iterable[Tuple[List]])
+
+DataSetType = NewType("DataSetType",
+                      Union[pd.DataFrame, pd.Series, np.ndarray, List])
+
+
+class CVComposer:
+    """
+    Groups methods for composing cv objects
+    that follow standards from sklearn,
+    these cv objects can be passed to algorithms like gridsearch, etc
+    """
+    def __init__(self):
+        """
+        """
+        self._logger = Log("CVComposer: ")
+
+    def dummy_cv(
+            self,
+            train_set_size: Union[int, None] = None,
+            train_index: Union[pd.Series, np.ndarray, None] = None,
+            test_set_size: Union[int, None] = None,
+            test_index: DataSetType = None) -> CVType:
+        """
+        """
+        assert((train_index is None) != (train_set_size is None)),\
+            "Set train_index or train_set_size"
+
+        assert((test_index is None) != (test_set_size is None)),\
+            "Set test_index or test_set_size"
+
+        train_index = train_index if (train_index is not None)\
+            else list(range(train_set_size))
+
+        test_index = test_index if (test_index is not None)\
+            else list(range(train_set_size, train_set_size + test_set_size))
+
+        return [(train_index, test_index)]
+
+    def dummy_cv_and_concatenated_data_set(
+            self,
+            X_train: DataSetType,
+            X_test: DataSetType,
+            y_train: Union[DataSetType, None] = None,
+            y_test: Union[DataSetType, None] = None)\
+            -> Tuple[CVType, DataSetType, DataSetType]:
+        """
+        """
+        assert((y_test is None) == (y_train is None))
+
+        use_index = (isinstance(X_train, pd.DataFrame) and
+                     isinstance(X_test, pd.DataFrame) and
+                     (len(set(X_train.index) & set(X_test.index)) == 0))
+
+        if use_index:
+
+            cv = self.dummy_cv(train_index=X_train.index,
+                               test_index=X_test.index)
+
+            X = pd.concat([X_train, X_test], ignore_index=False, axis=0)
+
+        else:
+            cv = self.dummy_cv(train_set_size=len(X_train),
+                               test_set_size=len(X_test))
+
+            X = np.concatenate([X_train, X_test])
+
+        use_target_index = use_index and (
+                    isinstance(y_train, pd.Series) and
+                    isinstance(y_test, pd.Series) and
+                    (X_train.index.equals(y_train.index)) and
+                    (X_test.index.equals(y_test.index)))
+
+        if use_target_index:
+
+            y = pd.concat([y_train, y_test], ignore_index=False, axis=0)
+
+        else:
+
+            y = np.concatenate([y_train, y_test]) if (y_train is not None)\
+                else None
+
+        result_to_np = (
+            (isinstance(X_train, pd.DataFrame) !=
+             isinstance(X_test, pd.DataFrame)) or
+            (isinstance(X_train, pd.DataFrame) and
+             (len(set(X_train.index) & set(X_test.index)) != 0)))
+
+        if result_to_np:
+            self._logger.log_and_throw_warning(
+                    "The concatenated dataframe is converted to numpy")
+
+        return cv, X, y
+
+    def expanding_cv(self, test_proportion: float,
+                     start_train_proportion: float,
+                     step_proportion: float = None,
+                     expanding_test_size: bool = False,
+                     data_set_size: Union[float, None] = None,
+                     index: Union[pd.Series, np.ndarray, list, None] = None)\
+            -> Union[Iterable[Tuple[List]], None]:
+        """
+        """
+        try:
+            assert((index is None) != (data_set_size is None)),\
+                "Set index or data_set_size"
+
+            index = pd.Series(index) if (index is not None)\
+                else pd.Series(range(data_set_size))
+
+            data_set_size = data_set_size or len(index)
+
+            start_train_size = int(start_train_proportion * data_set_size)
+            step_size = int(step_proportion * data_set_size)
+
+            test_size = int(test_proportion * data_set_size)
+
+            train_inds_set = (list(range(train_size))
+                              for train_size in
+                              takewhile(
+                                      lambda x: x <= data_set_size - test_size,
+                                      accumulate(repeat(start_train_size),
+                                                 lambda x, _: x + step_size)))
+
+            for train_inds in train_inds_set:
+
+                if expanding_test_size:
+
+                    yield (index[train_inds],
+                           index[train_inds[-1] + 1:
+                                 train_inds[-1] + 1
+                                 + int(test_proportion*len(train_inds))])
+
+                else:
+
+                    yield (index[train_inds],
+                           index[train_inds[-1] + 1:
+                                 train_inds[-1] + 1 + test_size])
+
+        except Exception as e:
+            self._logger.log_and_raise_error(("Failed to make expanding cv. "
+                                              "Exit with error: {}".format(e)))
+
+    def sliding_window_cv(
+        self,
+        test_proportion: float,
+        train_proportion: float,
+        step_proportion: float = None,
+        data_set_size: Union[float, None] = None,
+        index: Union[pd.Series, np.ndarray, list, None] = None)\
+            -> Union[Iterable[Tuple[List]], None]:
+        """
+        """
+        try:
+            assert((index is None) != (data_set_size is None)),\
+                "Set index or data_set_size"
+
+            index = pd.Series(index) if (index is not None)\
+                else pd.Series(range(data_set_size))
+
+            data_set_size = data_set_size or len(index)
+
+            train_size = int(train_proportion * data_set_size)
+            test_size = int(test_proportion * data_set_size)
+            step_size = int(step_proportion * data_set_size)
+
+            train_sizes = takewhile(lambda x: x <= data_set_size - test_size,
+                                    accumulate(repeat(train_size),
+                                               lambda x, _: x + step_size))
+
+            train_starts = takewhile(lambda x: x <= data_set_size
+                                     - train_size - test_size,
+                                     accumulate(repeat(step_size),
+                                                lambda x, _: x + step_size))
+
+            train_starts = chain([0], train_starts)
+
+            train_inds_set = list(range(train_start, train_size)
+                                  for train_start, train_size in
+                                  zip(train_starts, train_sizes))
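+
+            # For example (a sketch): data_set_size=10,
+            # train_proportion=0.4, test_proportion=0.2 and
+            # step_proportion=0.2 give the windows
+            # ([0..3], [4, 5]), ([2..5], [6, 7]) and ([4..7], [8, 9]).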
+
+            cv = ((index[train_inds], index[train_inds[-1] + 1:
+                                            train_inds[-1] + 1 + test_size])
+                  for train_inds in train_inds_set)
+
+            return cv
+
+        except Exception as e:
+            self._logger.log_and_raise_error(
+                    ("Failed to make sliding window cv. "
+                     "Exit with error: {}".format(e)))
+

+ 3 - 0
cdplib/ml_validation/__init__.py

@@ -0,0 +1,3 @@
+from .cross_validate_with_fine_tuning import *
+from .CVComposer import *
+from .expanding_cv import *

+ 491 - 0
cdplib/ml_validation/cross_validate_with_fine_tuning.py

@@ -0,0 +1,491 @@
+#!/usr/bin/env python3
+# -*- coding: utf-8 -*-
+"""
+Created on Thu Oct 29 13:58:23 2020
+
+@author: tanya
+
+
+@description:
+
+* Input:
+    - pipeline/hyperparameter space
+    - data_train
+    - cv
+    - cv_folds
+
+* For each pipeline:
+
+    -> Split data_train into folds according to cv
+
+     -> For each fold:
+
+         => get data_train_fold, data_test_fold, cv_fold
+
+         => split data_train_fold into subfolds according to cv_fold
+
+         => For each subfold:
+
+             ==> get data_train_subfold, data_test_subfold
+
+             ==> train pipeline on data_train_subfold
+
+             ==> find best_threshold_subfold on data_test_subfold
+
+        => Find averaged_threshold_fold averaged over best_threshold_subfold
+
+        => train pipeline on data_train_fold
+
+        => find score_fold on data_test_fold with averaged_threshold_fold
+
+        => find best_threshold_fold on data_test_fold
+
+    -> find score averaged over score_fold
+
+    -> find averaged_threshold averaged over best_threshold_fold
+
+* choose (pipeline/hyperparameters, threshold) in the space with best score
+
+"""
+
+import pandas as pd
+import numpy as np
+from itertools import zip_longest
+from typing import Union, Callable, Dict, Iterable, Tuple, List
+from copy import deepcopy
+
+from sklearn.model_selection import StratifiedKFold
+
+from cdplib.log import Log
+from cdplib.ml_validation.expanding_cv import make_expanding_cv
+
+
+# TODO: write with yield !!!!
+
+def make_nested_expanding_cv(
+        test_proportion: float,
+        start_train_proportion: float,
+        step_proportion: float = None,
+        expanding_test_size: bool = False,
+        data_set_size: Union[float, None] = None,
+        index: Union[pd.Series, np.ndarray, list, None] = None)\
+        -> Iterable[Tuple[List]]:
+    """
+    """
+    logger = Log("make_nested_expanding_cv:")
+
+    try:
+        cv = make_expanding_cv(test_proportion=test_proportion,
+                               start_train_proportion=start_train_proportion,
+                               step_proportion=step_proportion,
+                               expanding_test_size=expanding_test_size,
+                               data_set_size=data_set_size,
+                               index=index)
+
+        nested_cv = []
+
+        for train_inds, test_inds in cv:
+
+            fold_index = train_inds if index is not None\
+                else None
+
+            fold_size = len(train_inds) if index is None else None
+
+            fold_cv = make_expanding_cv(
+                    test_proportion=test_proportion,
+                    start_train_proportion=start_train_proportion,
+                    step_proportion=step_proportion,
+                    expanding_test_size=expanding_test_size,
+                    data_set_size=fold_size,
+                    index=fold_index)
+
+            nested_cv.append(list(fold_cv))
+
+        return nested_cv
+
+    except Exception as e:
+        logger.log_and_raise_error(("Failed to make nested expanding cv. "
+                                    "Exit with error: {}".format(e)))
+
+
+def cv_slice_dataset(X, y, train_inds, test_inds)\
+        -> Tuple[Union[pd.DataFrame, np.ndarray],
+                 Union[pd.DataFrame, np.ndarray],
+                 Union[pd.Series, np.ndarray, None],
+                 Union[pd.Series, np.ndarray, None]]:
+    """
+    """
+    if isinstance(X, pd.DataFrame):
+        X_train = X.loc[train_inds]
+        X_val = X.loc[test_inds]
+    else:
+        X_train = X[train_inds]
+        X_val = X[test_inds]
+
+    if y is not None:
+        y_train = y[train_inds]
+        y_val = y[test_inds]
+    else:
+        y_train, y_val = None, None
+
+    return X_train, X_val, y_train, y_val
+
+
+def get_optimal_proba_threshold(score_func: Callable,
+                                y_true: Union[pd.Series, np.ndarray],
+                                proba: Union[pd.Series, np.ndarray],
+                                threshold_set: Union[Iterable, None] = None):
+    """
+    """
+    scores = {}
+
+    if threshold_set is None:
+        threshold_set = np.arange(0, 1, 0.1)
+
+    for threshold in threshold_set:
+
+        y_pred = (proba >= threshold).astype(int)
+
+        scores[threshold] = score_func(y_true, y_pred)
+
+    return max(scores, key=scores.get)
+
+
+def cross_validate_with_optimal_threshold(
+        estimator: object,
+        score_func: Callable,
+        X_train: Union[pd.DataFrame, np.ndarray],
+        y_train: Union[pd.Series, np.ndarray, None] = None,
+        X_val: Union[pd.DataFrame, np.ndarray, None] = None,
+        y_val: Union[pd.Series, np.ndarray, None] = None,
+        X_val_threshold: Union[pd.DataFrame, np.ndarray, None] = None,
+        y_val_threshold: Union[pd.Series, np.ndarray, None] = None,
+        cv: Union[Iterable, int, None] = None,
+        cv_threshold: Union[Iterable, int, None] = None,
+        additional_metrics: Union[Dict[str, Callable], None] = None,
+        threshold_set: Union[Iterable, None] = None,
+        scores: Dict = None)\
+            -> Dict:
+    """
+    """
+    logger = Log("cross_validate_with_optimal_threshold:")
+
+    X_train = deepcopy(X_train)
+    y_train = deepcopy(y_train)
+    X_val = deepcopy(X_val)
+    y_val = deepcopy(y_val)
+    X_val_threshold = deepcopy(X_val_threshold)
+    y_val_threshold = deepcopy(y_val_threshold)
+
+    scores = scores or {"test_threshold": [],
+                        "test_score": [],
+                        "train_score": []}
+
+    additional_metrics = additional_metrics or {}
+
+    for metric_name, metric in additional_metrics.items():
+        if "test_" + metric_name not in scores:
+            scores["test_" + metric_name] = []
+            scores["train_" + metric_name] = []
+
+    if cv is None:
+
+        # test score is calculated on X_vals
+
+        assert((X_val is not None) and (y_val is not None)),\
+            "Validation set must be set"
+
+        if cv_threshold is None:
+
+            refit = (X_val_threshold is not None)
+
+            # if a validation set for proba threshold tuning is not given,
+            # we use the validation set on which we calculate the test score
+            # (this might lead to overfitting)
+
+            X_val_threshold = X_val_threshold if refit else deepcopy(X_val)
+            y_val_threshold = y_val_threshold if refit else deepcopy(y_val)
+
+            cv_threshold, X_train, y_train = make_dummy_cv(
+                    X_train=X_train,
+                    y_train=y_train,
+                    X_val=X_val_threshold,
+                    y_val=y_val_threshold)
+        else:
+
+            # if cv_threshold is given, we find the optimal threshold
+            # on each fold and output the average value for the threshold
+
+            if (X_val_threshold is not None):
+                logger.log_and_throw_warning((
+                        "X_val_threshold is set "
+                        "but cv_threshold will be used"))
+
+            if isinstance(cv_threshold, int):
+                cv_threshold = StratifiedKFold(n_splits=cv_threshold)\
+                    .split(X=X_train, y=y_train)
+
+            refit = True
+
+        thresholds = []
+
+        for train_inds, val_inds in cv_threshold:
+
+            print("----- In cv threshold fold")
+
+            X_train_fold, X_val_fold, y_train_fold, y_val_fold =\
+                cv_slice_dataset(X=X_train,
+                                 y=y_train,
+                                 train_inds=train_inds,
+                                 test_inds=val_inds)
+
+            estimator.fit(X_train_fold, y_train_fold)
+
+            proba_val = estimator.predict_proba(X_val_fold)[:, 1]
+
+            threshold = get_optimal_proba_threshold(score_func=score_func,
+                                                    y_true=y_val_fold,
+                                                    proba=proba_val)
+
+            thresholds.append(threshold)
+
+            print("----- Threshold:", threshold)
+
+        scores["test_threshold"].append(np.mean(thresholds))
+
+        if refit:
+
+            estimator.fit(X_train, y_train)
+
+            proba_val = estimator.predict_proba(X_val)[:, 1]
+
+        proba_train = estimator.predict_proba(X_train)[:, 1]
+
+        pred_train = (proba_train >= threshold)
+        pred_val = (proba_val >= threshold)
+
+        train_score = score_func(y_train, pred_train)
+        test_score = score_func(y_val, pred_val)
+
+        for metric_name, metric in additional_metrics.items():
+            scores["train_" + metric_name].append(metric(y_train, pred_train))
+            scores["test_" + metric_name].append(metric(y_val, pred_val))
+
+        scores["train_score"].append(train_score)
+        scores["test_score"].append(test_score)
+
+        return scores
+
+    else:
+
+        if isinstance(cv, int):
+            cv = StratifiedKFold(n_splits=cv).split(X=X_train, y=y_train)
+
+        cv_threshold = cv_threshold or []
+
+        for (train_inds, val_inds), cv_fold in zip_longest(cv, cv_threshold):
+
+            print("=== In cv fold")
+
+            X_train_fold, X_val_fold, y_train_fold, y_val_fold =\
+                cv_slice_dataset(X=X_train,
+                                 y=y_train,
+                                 train_inds=train_inds,
+                                 test_inds=val_inds)
+
+            scores = cross_validate_with_optimal_threshold(
+                    estimator=estimator,
+                    score_func=score_func,
+                    X_train=X_train_fold,
+                    y_train=y_train_fold,
+                    X_val=X_val_fold,
+                    y_val=y_val_fold,
+                    cv_threshold=cv_fold,
+                    additional_metrics=additional_metrics,
+                    threshold_set=threshold_set,
+                    scores=scores)
+
+            print("=== scores:", scores)
+
+        return scores
+
+
+if __name__ == "__main__":
+
+    from sklearn.metrics import accuracy_score, precision_score
+    from sklearn.datasets import load_breast_cancer
+    from xgboost import XGBRFClassifier
+    from sklearn.model_selection import train_test_split
+
+    data_loader = load_breast_cancer()
+
+    X = data_loader["data"]
+    y = data_loader["target"]
+
+    X_train, X_val, y_train, y_val = train_test_split(X, y)
+
+    estimator = XGBRFClassifier()
+
+    score_func = accuracy_score
+
+    additional_metrics = {"precision": precision_score}
+
+    averaged_scores = []
+    averaged_thresholds = []
+
+    print("\nTesting cv=None, cv_threshold=None, X_val_threshold=None\n")
+
+    scores = cross_validate_with_optimal_threshold(
+            estimator=estimator,
+            score_func=accuracy_score,
+            X_train=X_train,
+            y_train=y_train,
+            X_val=X_val,
+            y_val=y_val,
+            X_val_threshold=None,
+            y_val_threshold=None,
+            cv=None,
+            cv_threshold=None,
+            additional_metrics=additional_metrics)
+
+    print("\nScores:", scores)
+
+    averaged_scores.append(np.mean(scores["test_score"]))
+    averaged_thresholds.append(np.mean(scores["test_threshold"]))
+
+    print("\n ########################################################## \n")
+
+    X_train, X_val_threshold, y_train, y_val_threshold =\
+        train_test_split(X_train, y_train)
+
+    print("\nTesting cv=None, cv_threshold=None, X_val_threshold\n")
+
+    scores = cross_validate_with_optimal_threshold(
+            estimator=estimator,
+            score_func=accuracy_score,
+            X_train=X_train,
+            y_train=y_train,
+            X_val=X_val,
+            y_val=y_val,
+            X_val_threshold=X_val_threshold,
+            y_val_threshold=y_val_threshold,
+            cv=None,
+            cv_threshold=None,
+            additional_metrics=additional_metrics)
+
+    print("\nScores:", scores)
+
+    averaged_scores.append(np.mean(scores["test_score"]))
+    averaged_thresholds.append(np.mean(scores["test_threshold"]))
+
+    print("\n ########################################################## \n")
+
+    print("\nTesting cv=None, cv_threshold=3 \n")
+
+    scores = cross_validate_with_optimal_threshold(
+            estimator=estimator,
+            score_func=accuracy_score,
+            X_train=X_train,
+            y_train=y_train,
+            X_val=X_val,
+            y_val=y_val,
+            X_val_threshold=X_val_threshold,
+            y_val_threshold=y_val_threshold,
+            cv=None,
+            cv_threshold=3,
+            additional_metrics=additional_metrics)
+
+    print("\nScores:", scores)
+
+    averaged_scores.append(np.mean(scores["test_score"]))
+    averaged_thresholds.append(np.mean(scores["test_threshold"]))
+
+    print("\n ########################################################## \n")
+
+    print("\nTesting cv=3, cv_threshold=None \n")
+
+    scores = cross_validate_with_optimal_threshold(
+            estimator=estimator,
+            score_func=accuracy_score,
+            X_train=X_train,
+            y_train=y_train,
+            X_val=X_val,
+            y_val=y_val,
+            X_val_threshold=X_val_threshold,
+            y_val_threshold=y_val_threshold,
+            cv=3,
+            cv_threshold=None,
+            additional_metrics=additional_metrics)
+
+    print("\nScores:", scores)
+
+    print("\n ########################################################## \n")
+
+    print("\nTesting cv=3, cv_threshold=[3, 3, 3] \n")
+
+    scores = cross_validate_with_optimal_threshold(
+            estimator=estimator,
+            score_func=accuracy_score,
+            X_train=X_train,
+            y_train=y_train,
+            X_val=X_val,
+            y_val=y_val,
+            X_val_threshold=X_val_threshold,
+            y_val_threshold=y_val_threshold,
+            cv=3,
+            cv_threshold=[3, 3, 3],
+            additional_metrics=additional_metrics)
+
+    print("\nScores:", scores)
+
+    averaged_scores.append(np.mean(scores["test_score"]))
+    averaged_thresholds.append(np.mean(scores["test_threshold"]))
+
+    print("\n ########################################################## \n")
+
+    # TODO: check overwriting X_train,
+    # additional metrics append instead of overwrite
+    # check the length of cv_threshold
+    # test custom cv, cv_threshold
+
+    print("\n Averaged test score:", averaged_scores)
+    print("\n Averaged threshold:", averaged_thresholds)

+ 97 - 0
cdplib/ml_validation/expanding_cv.py

@@ -0,0 +1,97 @@
+#!/usr/bin/env python3
+# -*- coding: utf-8 -*-
+"""
+Created on Wed Dec  9 09:55:52 2020
+
+@author: tanya
+"""
+
+from typing import Union, Iterable, Tuple, List
+import pandas as pd
+import numpy as np
+from itertools import accumulate, repeat, takewhile
+
+from cdplib.log import Log
+
+
+def make_expanding_cv(test_proportion: float,
+                      start_train_proportion: float,
+                      step_proportion: float = None,
+                      expanding_test_size: bool = False,
+                      data_set_size: Union[float, None] = None,
+                      index: Union[pd.Series, np.ndarray, list, None] = None)\
+        -> Union[Iterable[Tuple[List]], None]:
+    """
+
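+    For example (a sketch): data_set_size=10, test_proportion=0.2,
+    start_train_proportion=0.4 and step_proportion=0.2 yield the folds
+    ([0..3], [4, 5]), ([0..5], [6, 7]) and ([0..7], [8, 9]).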
+    """
+    logger = Log("make_expanding_cv:")
+
+    try:
+        assert((index is None) != (data_set_size is None)),\
+            "Set index or data_set_size"
+
+        index = index if (index is not None)\
+            else pd.Series(range(data_set_size))
+
+        data_set_size = data_set_size or len(index)
+
+        start_train_size = int(start_train_proportion * data_set_size)
+        step_size = int(step_proportion * data_set_size)
+
+        test_size = int(test_proportion * data_set_size)
+
+        train_inds_set = (list(range(train_size))
+                          for train_size in
+                          takewhile(
+                                  lambda x: x <= data_set_size - test_size,
+                                  accumulate(repeat(start_train_size),
+                                             lambda x, _: x + step_size)))
+
+        for train_inds in train_inds_set:
+
+            if expanding_test_size:
+
+                yield (index[train_inds],
+                       index[train_inds[-1] + 1:
+                             train_inds[-1] + 1
+                             + int(test_proportion*len(train_inds))])
+
+            else:
+
+                yield (index[train_inds],
+                       index[train_inds[-1] + 1:
+                             train_inds[-1] + 1 + test_size])
+
+    except Exception as e:
+        logger.log_and_raise_error(("Failed to make expanding cv. "
+                                    "Exit with error: {}".format(e)))
+
+
+if __name__ == "__main__":
+
+    logger = Log("Test_expanding_cv: ")
+
+    logger.info("Start Testing")
+
+    logger.info("Testing expanding cv: ")
+
+    cv = make_expanding_cv(data_set_size=50,
+                           test_proportion=0.1,
+                           start_train_proportion=0.6,
+                           step_proportion=0.1,
+                           expanding_test_size=True)
+
+    cv = list(cv)
+
+    logger.info("Testing expanding cv with datetime index")
+
+    cv = make_expanding_cv(
+            test_proportion=0.1,
+            start_train_proportion=0.6,
+            step_proportion=0.1,
+            index=pd.date_range(start=pd.to_datetime("2020-01-01"),
+                                periods=50))
+
+    cv = list(cv)
+
+    logger.info("Finish testing")

+ 789 - 0
cdplib/pipeline_selector/PipelineSelector.py

@@ -0,0 +1,789 @@
+#!/usr/bin/env python3
+# -*- coding: utf-8 -*-
+"""
+Created on Wed Sep 30 14:23:23 2020
+
+@author: tanya
+@description: an abstract class for selecting a machine learning
+ pipeline from a space (deterministic or random) of parameter distributions
+ over multiple pipelines.
+ The selection is designed in such a way that a Trials object is
+ maintained during the tuning process, from which one can retrieve
+ the best pipeline so far
+ as well as the entire tuning history if needed.
+ Methods configure_cross_validation and configure_result_saving
+ allow to use a custom cross-validation method and
+ save the current best result in a file or database during training.
+ Children classes: hyperopt and custom gridsearch.
+"""
+
+import pickle
+import os
+import sys
+import time
+import datetime
+import numpy as np
+import pandas as pd
+from copy import deepcopy
+from abc import ABC, abstractmethod, abstractproperty
+from typing import Callable, Optional, TypedDict,\
+    Literal, Dict, Iterable, List, Tuple, Union
+import functools
+from sklearn.pipeline import Pipeline
+from sklearn.model_selection import cross_validate as sklearn_cross_validation
+from sklearn.metrics import make_scorer
+from hyperopt import STATUS_OK, STATUS_FAIL
+from cdplib.log import Log
+from cdplib.utils import ExceptionsHandler
+from cdplib.utils import LoadingUtils
+from cdplib.ml_validation import CVComposer
+
+sys.path.append(os.getcwd())
+
+
+class SpaceElementType(TypedDict):
+    name: str
+    pipeline: Pipeline
+    params: dict
+
+
+class PipelineSelector(ABC):
+    """
+    An abstract class for selecting a machine learning
+    pipeline from a space (deterministic or random) of parameter
+    distributions over multiple pipelines.
+    The selection is designed in such a way that a Trials object is
+    maintained during the tuning process, from which one can retrieve
+    the best pipeline so far as well as the entire tuning history
+    if needed.
+    Methods configure_cross_validation and configure_result_saving
+    allow to use a custom cross-validation method and
+    save the current best result in a file or database during training.
+    Children classes: hyperopt and custom gridsearch.
+    """
+    def __init__(self,
+                 cost_func: Union[Callable, str],
+                 greater_is_better: bool,
+                 trials_path: str,
+                 backup_trials_freq: Optional[int] = None,
+                 cross_val_averaging_func: Callable = np.mean,
+                 additional_metrics: Optional[Dict[str, Callable]] = None,
+                 additional_averaging_funcs:
+                     Optional[Dict[str, Callable]] = None,
+                 strategy_name: Optional[str] = None,
+                 stdout_log_level: Literal["INFO", "WARNING", "ERROR"]
+                 = "INFO"):
+        """
+        :param Callable cost_func: function to minimize or maximize
+            over the elements of a given (pipeline/hyperparameter) space
+
+        :param bool greater_is_better: when True
+            cost_func is maximized, else minimized.
+
+        :param str trials_path: path at which the trials object is saved
+            in binary format. From the trials object we can
+            select information about the obtained scores, score variations,
+            and pipelines, and parameters tried out so far. If a trials object
+            already exists at the given path, it is loaded and the
+            search is continued, else, the search is started from scratch.
+
+        :param backup_trials_freq: frequency in iterations (trials)
+            of saving the trials object at the trials_path.
+            If None, the trials object is backed up every time
+            the score improves.
+
+        :param Callable cross_val_averaging_func: Function to aggregate
+            the cross-validation scores of the cost_func.
+            Example different from the mean: mean - c*var.
+
+        :param additional_metrics: dict of additional metrics to keep track of
+            in the trials of the form {"metric_name": metric}.
+
+        :param additional_averaging_funcs: functions used to aggregate
+            the output of the cross_validate function.
+            The output always contains the scores of the cost_func,
+            additional_metrics (if it is not empty),
+            but it can also contain additional information
+            (like probability threshold for example)
+            if different from cross_val_averaging_func.
+            Of the form {"metric_name": averaging_func}
+
+        :param str strategy_name:
+            a strategy is defined by the data set (columns/features and rows),
+            cv object, cost function.
+            When the strategy changes, one must start with new trials.
+
+        :param str stdout_log_level: can be INFO, WARNING, ERROR
+        """
+        self._logger = Log("PipelineSelector: ",
+                           stdout_log_level=stdout_log_level)
+
+        try:
+
+            ExceptionsHandler(self._logger)\
+                .assert_is_directory(path=trials_path)
+
+            self.attached_space = False
+            self.attached_data = False
+            self.configured_cross_validation = False
+            self.configured_summary_saving = False
+
+            self._cost_func = cost_func
+            # score factor is 1 when cost_func is minimized,
+            # -1 when cost func is maximized
+            self._score_factor = (not greater_is_better) - greater_is_better
+            self.trials_path = trials_path
+            self._backup_trials_freq = backup_trials_freq
+            self._strategy_name = strategy_name
+            self._data_path = None
+            self._cv_path = None
+
+            self._X = None
+            self._y = None
+            self._cv = None
+            self._space = None
+
+            # if cross-validation is not configured,
+            # sklearn cross-validation method is taken by default
+            self._cross_validation = sklearn_cross_validation
+
+            # if a trials object already exists at the given path,
+            # it is loaded and the search is continued. Else,
+            # the search is started from the beginning.
+            if os.path.isfile(self.trials_path):
+
+                with open(self.trials_path, "rb") as f:
+                    self._trials = pickle.load(f)
+
+                self._start_iteration = self.number_of_trials
+
+                self.best_score = self.best_trial_score
+
+                self._logger.info(("Loaded an existing trials object"
+                                   "Consisting of {} trials")
+                                  .format(self._start_iteration))
+
+            else:
+                self._logger.warning(("No existing trials object was found, "
+                                      "starting from scratch."))
+
+                self._trials = None
+                self._start_iteration = 0
+                self.best_score = np.nan
+
+            # keeping track of the current search iteration
+            self._iteration = self._start_iteration
+            self._score_improved = False
+
+            self.start_tuning_time = datetime.datetime.today()
+            self.total_tuning_time = None
+            self.finished_tuning = False
+
+        except Exception as e:
+            err = ("Failed to initialize the class. "
+                   "Exit with error: {}".format(e))
+
+            self._logger.log_and_raise_error(err)
+
+    def _backup_trials(self) -> None:
+        '''
+        Pickles (Saves) the trials object in binary format.
+        '''
+        try:
+            with open(self.trials_path, "wb") as f:
+                pickle.dump(self._trials, f)
+
+        except Exception as e:
+            err = "Could not backup trials. Exit with error: {}".format(e)
+            self._logger.log_and_raise_error(err)
+
+    def configure_cross_validation(self,
+                                   cross_validation: Callable,
+                                   kwargs: dict = None) -> None:
+        """
+        Method for attaching a custom cross-validation function
+
+        :param cross_validation: a function that has the same
+             signature as sklearn.model_selection.cross_validate
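+
+        Example (a sketch; assumes selector is an instance of a
+        concrete subclass and simply passes sklearn's cross_validate
+        with extra keyword arguments):
+
+            from sklearn.model_selection import cross_validate
+
+            selector.configure_cross_validation(
+                cross_validation=cross_validate,
+                kwargs={"n_jobs": -1, "return_train_score": False})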
+        """
+        try:
+            kwargs = kwargs or {}
+
+            self._cross_validation = functools.partial(
+                    cross_validation, **kwargs)
+
+            self.configured_cross_validation = True
+
+            self._logger.info("Configured cross validation")
+
+        except Exception as e:
+            err = ("Failed to configure cross-validation. "
+                   "Exit with error: {}".format(e))
+
+            self._logger.log_and_raise_error(err)
+
+    def configure_cross_validation_from_module(self,
+                                               module_path: str,
+                                               name: str) -> None:
+        """
+        Attaches a cross-validation function defined in
+        a different python module. This function must have
+        the same signature as sklearn.model_selection.cross_validate
+
+        :param str module_path: path to python module
+            where the cross_validation function is defined.
+
+        :param str name: name of the cross-validation function
+            loaded from a python module.
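+
+        Example (a sketch; the module path and function name are
+        assumptions for illustration):
+
+            selector.configure_cross_validation_from_module(
+                module_path="custom_cv.py",
+                name="cross_validate_with_threshold")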
+        """
+        try:
+            self._cross_validation = \
+                LoadingUtils().load_from_module(
+                        module_path=module_path, name=name)
+
+            self.configured_cross_validation = True
+
+            self._logger.info("Configured cross validation")
+
+        except Exception as e:
+            err = ("Failed to load cross-validation from module. "
+                   "Exit with error: {}".format(e))
+
+            self._logger.log_and_raise_error(err)
+
+    def attach_space(self, space) -> None:
+        """
+        Method for attaching the pipeline/hyperparameter space
+        over which the score_func is optimized.
+
+        :param space: space where
+            the search is performed. A space might be either
+            a list of dictionaries or a hyperopt space object
+            the elements of which are dictionaries with keys:
+            name, pipeline, params
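+
+        Example of a list-of-dictionaries space (a sketch; the pipeline
+        steps and hyperopt distributions are illustrative assumptions):
+
+            from hyperopt import hp
+            from sklearn.ensemble import RandomForestClassifier
+            from sklearn.pipeline import Pipeline
+            from sklearn.preprocessing import StandardScaler
+
+            space = [{
+                "name": "scaler_rf",
+                "pipeline": Pipeline([("scaler", StandardScaler()),
+                                      ("model", RandomForestClassifier())]),
+                "params": {"model__n_estimators":
+                           hp.choice("n_estimators", [50, 100, 200])}}]
+
+            selector.attach_space(space)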
+        """
+        try:
+            self._space = space
+
+            self.attached_space = True
+
+            self._logger.info("Attached parameter distribution space")
+
+        except Exception as e:
+            err = ("Failed to attach space. "
+                   "Exit with error: {}".format(e))
+
+            self._logger.log_and_raise_error(err)
+
+    def attach_space_from_module(self, module_path: str, name: str) -> None:
+        """
+        Attaches a space defined in a different python module.
+
+        :param str module_path: path to python module
+            where the space is defined.
+
+        :param str name: name of the space loaded from
+            a python module.
+        """
+        try:
+            self._space = LoadingUtils().load_from_module(
+                    module_path=module_path, name=name)
+
+            self.attached_space = True
+
+            self._logger.info("Attached parameter distribution space")
+
+        except Exception as e:
+            err = ("Failed to attach space from module. "
+                   "Exit with error {}".format(e))
+
+            self._logger.loger_and_raise_error(err)
+
+    def attach_data(self,
+                    X_train: Union[pd.DataFrame, np.ndarray],
+                    y_train: Optional[Union[pd.DataFrame, pd.Series,
+                                            np.ndarray]] = None,
+                    X_val: Optional[Union[pd.DataFrame,
+                                          np.ndarray]] = None,
+                    y_val: Optional[Union[pd.DataFrame, pd.Series,
+                                          np.ndarray]] = None,
+                    cv: Optional[Iterable[Tuple[List[int],
+                                                List[int]]]] = None) -> None:
+        '''
+        :param array X_train: data on which
+            machine learning pipelines are trained
+
+        :param array y_train: optional, vector with targets
+            (None in the case of unsupervised learning)
+
+        :param array X_val: optional, validation data.
+            When not provided, cross-validated value
+            of the cost_func is calculated.
+
+        :param array y_val: optional, validation targets
+
+        :param list cv: iterable of tuples containing
+            train and validation indices, or an integer representing
+            the number of folds for a random split of the data
+            during cross-validation
+            example: [([0,1,2], [3,4]), ([1,2,3], [4,5])]
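+
+        Example (a sketch; X and y are assumed to be numpy arrays):
+
+            from sklearn.model_selection import KFold
+
+            cv = list(KFold(n_splits=5).split(X))
+            selector.attach_data(X_train=X, y_train=y, cv=cv)
+
+            # or, with an explicit validation set instead of cv:
+            selector.attach_data(X_train=X_train, y_train=y_train,
+                                 X_val=X_val, y_val=y_val)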
+        '''
+        try:
+            assert((cv is None) == (X_val is not None)),\
+                "Either cv or X_val must be provided (and not both)"
+
+            if cv is None:
+
+                assert((y_val is None) == (y_train is None)),\
+                    "y_train and y_val must be provided simultaneously"
+
+                # Here we create a trivial cv object
+                # with one validation split:
+                # train on X_train, validate on X_val.
+                train_inds = list(range(len(X_train)))
+                val_inds = list(range(len(X_train),
+                                      len(X_train) + len(X_val)))
+
+                self._cv = [(train_inds, val_inds)]
+
+                self._X = np.concatenate([X_train, X_val])
+                self._y = None if y_train is None\
+                    else np.concatenate([y_train, y_val])
+
+            else:
+
+                self._cv = cv
+                self._X = X_train
+                self._y = y_train
+
+            self.attached_data = True
+
+            self._logger.info("Attached data")
+
+        except Exception as e:
+            err = ("Failed to attach data. "
+                   "Exit with error: {}".format(e))
+
+            self._logger.log_and_raise_error(err)
+
+    def attach_data_from_hdf5(self,
+                              data_hdf5_store_path: str,
+                              cv_pickle_path: str = None) -> None:
+        """
+        Method for attaching data from an hdf5 store
+        and a cv object from a pickled file.
+
+        The hdf5 store is a binary file; after loading,
+        it is a dictionary with the keys
+        X_train (and optionally y_train, X_val, y_val).
+
+        The cv object is loaded from a pickle file.
+
+        The reason to separate the data store from the cv store
+        is that hdf5 is optimized for storing large dataframes
+        (especially with simple types), while a small list of lists
+        like a cv object is better stored as a pickle file.
+
+        :param str data_hdf5_store_path: path to the hdf5 store
+            with train and validation data
+        :param str cv_pickle_path: path to the pickle file with
+            the cv data
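+
+        Example of preparing such a store (a sketch; the file paths and
+        the variables X_train_df, y_train_series and cv_folds are
+        assumptions for illustration):
+
+            import pickle
+            import pandas as pd
+
+            with pd.HDFStore("data/train_val.h5") as store:
+                store["X_train"] = X_train_df
+                store["y_train"] = y_train_series
+
+            with open("data/cv.pickle", "wb") as f:
+                pickle.dump(cv_folds, f)
+
+            selector.attach_data_from_hdf5(
+                data_hdf5_store_path="data/train_val.h5",
+                cv_pickle_path="data/cv.pickle")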
+        """
+        try:
+            assert(os.path.isfile(data_hdf5_store_path)),\
+                "Parameter hdf5_store_path is not a file"
+
+            store = pd.HDFStore(data_hdf5_store_path)
+
+            self._data_path = data_hdf5_store_path
+
+            data_input = {key: store[key] if key in store else None
+                          for key in ["X_train", "y_train", "X_val", "y_val"]}
+
+            if cv_pickle_path is not None:
+
+                assert(os.path.isfile(cv_pickle_path)),\
+                    "Parameter cv_pickle_path is not a file"
+
+                data_input["cv"] = pickle.load(open(cv_pickle_path, "rb"))
+
+                self._cv_path = cv_pickle_path
+
+            else:
+                data_input["cv"] = None
+
+            self.attach_data(**data_input)
+
+            store.close()
+
+        except Exception as e:
+            err = "Failed to attach data. Exit with error: {}".format(e)
+            self._logger.log_and_raise_error(err)
+
+    @property
+    def default_summary(self) -> dict:
+        """
+        Default summary of the strategy.
+        Every time the _objective function is called,
+        the current score and the information
+        about the tested space element are added to the
+        summary, and it is saved to the Trials object.
+        If summary saving is configured, the summary is also
+        saved to a file or a database when the score improves.
+        summary = {}
+
+        if self._strategy_name is not None:
+            summary["strategy_name"] = self._strategy_name
+
+        if isinstance(self._cost_func, str):
+            summary["cost_func"] = self._cost_func
+
+        elif hasattr(self._cost_func, "__name__"):
+            summary["cost_func"] = self._cost_func.__name__
+
+        summary["trials_path"] = self.trials_path
+
+        if self._data_path is not None:
+            summary["data_path"] = self._data_path
+
+        if self._cv_path is not None:
+            summary["cv_path"] = self._cv_path
+
+        summary["start_tuning_time"] = self.start_tuning_time
+
+        summary["iteration"] = self._iteration
+
+        return summary
+
+    def configer_summary_saving(self,
+                                save_method: Callable
+                                = functools.partial(
+                                        pd.DataFrame.to_excel,
+                                        **{"excel_writer": "result.xlsx"}),
+                                kwargs: Optional[dict] = None) -> None:
+        """
+        When the score calculated by _objective function improves,
+        the default summary is updated with information about the
+        current score and pipeline/hyperparameters
+        and can be saved to a file or database, depending
+        on the configured save_method.
+
+        :param Callable save_method: method for saving the result
+            of the pipeline selection. The method must accept
+            a pandas DataFrame as argument.
+            By default, saving to an excel file.
+
+            Examples:
+                functools.partial(pd.DataFrame.to_csv,
+                                  **{"path_or_buf": <PATH>})
+
+                functools.partial(SQLHandler(<URI>).append_to_table,
+                                  **{"tablename": <NAME>})
+
+                functools.partial(MongodbHandler(<URI>).insert_data_into_collection,
+                                  **{"collection_name": <NAME>})
+
+            Using functools can be avoided by providing the kwargs argument.
+
+        :param dict kwargs: a dictionary with keyword arguments
+            (like tablename) to provide to the save_method
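+
+        Example without functools (a sketch; the output path is an
+        assumption):
+
+            selector.configer_summary_saving(
+                save_method=pd.DataFrame.to_csv,
+                kwargs={"path_or_buf": "pipeline_selection_summary.csv"})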
+        """
+        try:
+            kwargs = kwargs or {}
+
+            self._save_method = functools.partial(save_method, **kwargs)
+
+            self.configured_summary_saving = True
+
+            self._logger.info("Configured summary saving")
+
+        except Exception as e:
+            err = ("Failed to configure the summary saving. "
+                   "Exit with error {}".format(e))
+
+            self._logger.log_and_raise_error(err)
+
+    def _save_summary(self, summary: dict) -> None:
+        """
+        Saves the updated summary (the default summary extended with
+        information about the current score and pipeline/hyperparameters)
+        to a file or database, using the configured save_method.
+        """
+        try:
+            assert(self.configured_summary_saving),\
+                "Result saving must be configured first"
+
+            self._save_method(pd.DataFrame([summary]))
+
+        except Exception as e:
+            err = ("Could not configure summary saving. "
+                   "Exit with error: {}".format(e))
+
+            self._logger.log_and_raise_error(err)
+
+    def _evaluate(self, pipeline: Pipeline) -> Union[Dict[str, float], None]:
+        """
+        Calculates the averaged cross-validated score and score variance,
+        as well as the averaged values and variances of the additional metrics.
+
+        This method is called in the _objective function that is
+        passed to the hyperopt optimizer.
+
+        This function can be overriden, when the cost
+        needs to be calculated differently,
+        for example with a tensorflow model.
+
+        :param Pipeline pipeline: machine learning pipeline
+            that will be evaluated with cross-validation
+
+        :return: dictionary with the aggregated
+            cross-validation scores and
+            the score variances for the scores in the output
+            of the cross-validation function.
+
+            form of the output:
+                {"score": 10, #score used in optimization,
+                 "score_variance": 0.5
+                 "additional_metric1": 5,
+                 "additional_metric1_variance": 7}
+
+            A custom cross-validation function can also return,
+            for example, a probability threshold for each fold;
+            in that case the output of this method will include the
+            average value and the variance of the probability
+            threshold over the folds.
+        """
+        try:
+            scoring = {"score": make_scorer(self.cost_func)}
+
+            scoring.update({metric_name: make_scorer(metric)
+                            for metric_name, metric
+                            in self._additional_metrics.items()})
+
+            scores = self._cross_validation(
+                    estimator=pipeline,
+                    X=self._X,
+                    y=self._y,
+                    cv=self._cv,
+                    scoring=scoring,
+                    error_score=np.nan)
+
+            averaging_funcs = {
+                    metric_name: self._additional_averaging_funcs[metric_name]
+                    if metric_name in self._additional_averaging_funcs
+                    else self._cross_val_averaging_func
+                    for metric_name in scores}
+
+            scores_average = {
+                    metric_name.replace("test_", ""):
+                    averaging_funcs[metric_name](scores[metric_name])
+                    for metric_name in scores
+                    if metric_name.startswith("test")}
+
+            scores_variance = {
+                    metric_name.replace("test_", "") + "_variance":
+                    np.var(scores[metric_name])
+                    for metric_name in scores
+                    if metric_name.startswith("test")}
+
+            return {**scores_average, **scores_variance}
+
+        except Exception as e:
+            err = "Failed to evaluate pipeline. Exit with error: {}".format(e)
+
+            self._logger.log_and_raise_error(err)
+
+    def _objective(self, space_element: SpaceElementType) -> dict:
+        '''
+        This method is called in the run_trials method,
+        which uses the hyperopt fmin optimizer.
+
+        Uses _evaluate method.
+
+        It must take as input a space element
+        and produce an output in the form of a dictionary
+        with two obligatory values, loss and status
+        (STATUS_OK or STATUS_FAIL). Other
+        values in the output are optional and can be
+        accessed later through the trials object.
+
+        :Warning: fmin minimizes the loss;
+        when _evaluate returns a value to be maximized,
+        it is multiplied by -1 to obtain the loss.
+
+        :param SpaceElementType space_element: element
+            of the space over which the optimization is done
+
+        :output: dictionary with keys
+            loss (minimized value),
+            status with values STATUS_OK or STATUS_FAIL
+            understood by hyperopt,
+            score (equal to loss or -loss),
+            score_variance,
+            timestamp (end of execution),
+            train_time: execution time
+            and other keys given in self.default_summary
+        '''
+        try:
+            start_time = time.time()
+
+            assert(self.attached_data),\
+                ("Data must be attached in order "
+                 "to perform the best pipeline search")
+
+            summary = deepcopy(self.default_summary)
+
+            # backup the current trials if the score improved
+            # at previous iteration or every ith iteration
+            # if the backup_trials_freq is set
+            backup_cond = ((self._backup_trials_freq is not None) and
+                           ((self._iteration - self._start_iteration - 1) %
+                            self._backup_trials_freq == 0)) or\
+                self._score_improved
+
+            if backup_cond:
+                self._backup_trials()
+                self._score_improved = False
+
+            pipeline = space_element['pipeline']
+            params = space_element['params']
+            pipeline.set_params(**params)
+
+            self._logger.info(("Iteration {0}: "
+                               "Current score is {1}: "
+                               "Training pipeline {2} "
+                               "with parameters: {3}. ").format(
+                                  self._iteration,
+                                  self.best_score,
+                                  space_element['name'],
+                                  params))
+
+            result = self._evaluate(pipeline)
+
+            summary.update(result)
+
+            end_time = time.time()
+
+            summary['status'] = STATUS_OK
+            summary['loss'] = self._score_factor * summary['score']
+            summary['timestamp'] = datetime.datetime.today()
+            summary['train_time'] = end_time - start_time
+
+            self._iteration += 1
+
+            # the score improved if this is the first valid score
+            # (best_score is NaN) or if the new loss is smaller
+            # than the best loss so far
+            self._score_improved = np.isnan(self.best_score) or\
+                                   (self._score_factor*result["score"] <
+                                    self._score_factor*self.best_score)
+
+            if self._score_improved:
+
+                self._logger.info("Score improved, new best score is: {}"
+                                  .format(result["score"]))
+
+                self.best_score = result['score']
+
+                if self.configured_summary_saving:
+                    self._save_summary(summary)
+
+        except Exception as e:
+
+            self._logger.warning("Trial failed with error {}".format(e))
+
+            summary = {}
+            summary['status'] = STATUS_FAIL
+            summary['timestamp'] = datetime.datetime.today()
+            summary['error'] = e
+            for key in ['loss', 'score', 'score_variance', 'train_time']:
+                summary[key] = np.nan
+
+        return summary
+
+    @abstractmethod
+    def run_trials(self):
+        """
+        Method that runs the hyperparameter tuning over possibly multiple
+        pipeline types specified in self.space.
+        When the run_trials method is finished, the flag self.finished_tuning
+        should be set to True, and the methods self._backup_trials and
+        optionally self._save_summary should be called.
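+
+        Example implementation (a sketch only; it assumes hyperopt's
+        fmin with the TPE algorithm, which this base class does not
+        prescribe):
+
+            from hyperopt import Trials, fmin, tpe
+
+            def run_trials(self, niter: int = 100) -> None:
+                self._trials = self._trials or Trials()
+
+                fmin(fn=self._objective,
+                     space=self._space,
+                     algo=tpe.suggest,
+                     trials=self._trials,
+                     max_evals=self._start_iteration + niter)
+
+                self.total_tuning_time = datetime.datetime.today()\
+                    - self.start_tuning_time
+                self.finished_tuning = True
+                self._backup_trials()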
+        """
+        pass
+
+    @abstractproperty
+    def number_of_trials(self) -> int:
+        """
+        Number of trials already run in the current trials object
+        """
+        pass
+
+    @abstractproperty
+    def best_trial(self) -> dict:
+        """
+        Best trial so far.
+        Should contain the status, pipeline,
+        hyperparameters, and the score (loss).
+        Other information is optional and is defined
+        by self.default_summary
+        """
+        pass
+
+    @abstractproperty
+    def best_trial_score(self) -> float:
+        """
+        Score of the best pipeline with the best hyperparameters
+        """
+        pass
+
+    @abstractproperty
+    def best_trial_score_variance(self) -> float:
+        """
+        Variance of the cross-validation score of the best pipeline
+        """
+        pass
+
+    @abstractproperty
+    def best_trial_pipeline(self) -> Pipeline:
+        """
+        Best pipeline with best hyperparameters
+        """
+        pass
+
+    @abstractmethod
+    def get_n_best_trial_pipelines(self, n: int) -> list:
+        """
+        N best pipelines with corresponding
+        best hyperparameters
+        """
+        pass
+
+    @abstractmethod
+    def get_n_best_trial_pipelines_of_each_type(self, n: int) -> list:
+        """
+        If the hyperparameter search is done over multiple
+        pipelines, then returns n different pipeline-types
+        with corresponding hyperparameters
+        """
+        pass
+
+    @abstractmethod
+    def trials_to_excel(self, path: str) -> None:
+        """
+        Trials object in the shape of a table written to excel,
+        should contain the iteration, pipeline (as str),
+        hyperparameters (as str), self.best_result (see self._objective method)
+        as well as additional information defined by self.default_summary
+        """
+        pass

+ 1 - 0
cdplib/pipeline_selector/__init__.py

@@ -0,0 +1 @@
+from .PipelineSelector import *