@@ -0,0 +1,348 @@
+#!/usr/bin/env python3
+# -*- coding: utf-8 -*-
+"""
+Created on Wed Sep 30 14:15:17 2020
+
+@author: tanya
+
+@description: a class for selecting a machine learning
+    pipeline from a deterministic space of parameter distributions
+    over multiple pipelines.
+    The selection is done in such a way that a Trials object is
+    maintained during the tuning process, from which one can retrieve
+    the best pipeline so far as well as the entire tuning history
+    if needed.
+"""
+
+import os
+import sys
+from itertools import product
+from collections import ChainMap
+
+import pandas as pd
+from sklearn.pipeline import Pipeline
+
+sys.path.append(os.getcwd())
+
+from cdplib.pipeline_selector.PipelineSelector import PipelineSelector
+
+
+class GridSearchPipelineSelector(PipelineSelector):
+    """
+    A class for selecting a machine learning
+    pipeline from a deterministic space of parameter distributions
+    over multiple pipelines.
+    The selection is done in such a way that a Trials object is
+    maintained during the tuning process, from which one can retrieve
+    the best pipeline so far as well as the entire tuning history
+    if needed.
+    """
+    def __init__(self,
+                 cost_func,
+                 greater_is_better: bool,
+                 trials_path: str,
+                 backup_trials_freq: int = 1,
+                 cross_val_averaging_func: callable = None,
+                 additional_metrics: dict = None,
+                 strategy_name: str = None,
+                 stdout_log_level: str = "INFO"
+                 ):
+        """
+        :param callable cost_func: function to minimize or maximize
+
+        :param bool greater_is_better: when True
+            cost_func is maximized, else minimized.
+
+        :param str trials_path: path at which the trials object is saved
+            in binary format. From the trials object we can
+            select information about the obtained scores, score variations,
+            pipelines, and parameters tried out so far. If a trials object
+            already exists at the given path, it is loaded and the
+            search is continued, else the search is started from
+            the beginning.
+
+        :param int backup_trials_freq: frequency in iterations (trials)
+            of saving the trials object at the trials_path.
+
+        :param callable cross_val_averaging_func: optional,
+            when not provided set to mean. Function
+            to aggregate the cross-validated values of the cost function.
+            The classic choice is the mean,
+            another example is mean() - c*var().
+
+        :param dict additional_metrics: dict of additional metrics to save
+            of the form {"metric_name": metric} where metric is a Callable.
+
+        :param str strategy_name: a name that might be assigned to the trials.
+            A strategy is defined by the data set, cv object and cost function.
+            When the strategy changes, one should start with new trials.
+
+        :param str stdout_log_level: can be INFO, WARNING, ERROR
+        """
+        super().__init__(cost_func=cost_func,
+                         greater_is_better=greater_is_better,
+                         trials_path=trials_path,
+                         backup_trials_freq=backup_trials_freq,
+                         cross_val_averaging_func=cross_val_averaging_func,
+                         additional_metrics=additional_metrics,
+                         strategy_name=strategy_name,
+                         stdout_log_level=stdout_log_level)
+
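+        # self._trials may already hold trials loaded from trials_path
+        # by the parent class (see the trials_path docstring); fall back
+        # to an empty list otherwise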
+        self._trials = self._trials or []
+
+    def run_trials(self):
+        """
+        Run the objective function on each parameter combination
+        in the attached space and collect the results in the trials
+        object, backing it up to trials_path as the search proceeds.
+        """
+        try:
+            assert(self.attached_space)
+        except AssertionError:
+            err = "Parameter distribution space must be attached"
+            self._logger.log_and_raise_error(err)
+
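+        # parameter combinations already present in self._trials with
+        # status "ok" (e.g. loaded from a previous run at trials_path)
+        # are skipped in the loop below, so that an interrupted grid
+        # search resumes where it left off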
+        done_trial_ids = [{"name": trial["name"],
+                           "params": trial["params"],
+                           "status": trial["status"]}
+                          for trial in self._trials]
+
+        # list (generator) of (flattened) dictionaries
+        # with all different combinations of
+        # parameters for different pipelines
+        # from the space definition.
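+        # For illustration (hypothetical entry, not part of this repo):
+        # a space entry such as
+        #     {"name": "logreg",
+        #      "pipeline": Pipeline([("clf", LogisticRegression())]),
+        #      "params": {"clf__C": [0.1, 1.0], "clf__penalty": ["l1", "l2"]}}
+        # unfolds into 2 x 2 = 4 space elements, e.g.
+        #     {"name": "logreg", "pipeline": ...,
+        #      "params": {"clf__C": 0.1, "clf__penalty": "l1"}}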
+        space_unfolded = ({"name": pipeline_dist["name"],
+                           "pipeline": pipeline_dist["pipeline"],
+                           "params": param_set}
+                          for pipeline_dist in self._space
+                          for param_set in
+                          (dict(ChainMap(*tup)) for tup in
+                           product(*[[{k: v} for v in
+                                      pipeline_dist["params"][k]]
+                                     for k in pipeline_dist["params"]])))
+
+        for space_element in space_unfolded:
+
+            trial_id = {"name": space_element["name"],
+                        "params": space_element["params"],
+                        "status": 'ok'}
+
+            if trial_id in done_trial_ids:
+                continue
+
+            result = self._objective(space_element)
+
+            pipeline = space_element["pipeline"].set_params(
+                **space_element["params"])
+
+            trial = {"name": space_element["name"],
+                     "params": space_element["params"],
+                     "pipeline": pipeline}
+
+            trial.update(result)
+
+            self._trials.append(trial)
+
+            self._backup_trials()
+
+        self.finished_tuning = True
+
+    @property
+    def number_of_trials(self) -> int:
+        """
+        Number of trials already run in the current trials object
+        """
+        if self._trials is None:
+            return 0
+        else:
+            return len(self._trials)
+
+    @property
+    def best_trial(self) -> dict:
+        """
+        Trial with the highest score in the current trials object.
+        """
+        try:
+            assert(self._trials is not None and len(self._trials) > 0)
+        except AssertionError:
+            err = ("Trials object is empty. "
+                   "Call run_trials method.")
+            self._logger.log_and_raise_error(err)
+        try:
+            return max(self._trials, key=lambda x: x["score"])
+        except Exception as e:
+            err = ("Could not retrieve the best trial. "
+                   "Exit with error: {}".format(e))
+            self._logger.log_and_raise_error(err)
+
+    @property
+    def best_trial_score(self) -> float:
+        '''
+        Score of the best trial.
+        '''
+        try:
+            assert(self._trials is not None and len(self._trials) > 0)
+        except AssertionError:
+            err = ("Trials object is empty. "
+                   "Call run_trials method.")
+            self._logger.log_and_raise_error(err)
+        try:
+            return self.best_trial["score"]
+        except Exception as e:
+            err = ("Could not retrieve the best trial. "
+                   "Exit with error: {}".format(e))
+            self._logger.log_and_raise_error(err)
+
+    @property
+    def best_trial_score_variance(self) -> float:
+        '''
+        Score variance of the best trial.
+        '''
+        try:
+            assert(self._trials is not None and len(self._trials) > 0)
+        except AssertionError:
+            err = ("Trials object is empty. "
+                   "Call run_trials method.")
+            self._logger.log_and_raise_error(err)
+        try:
+            return self.best_trial["score_variance"]
+        except Exception as e:
+            err = ("Could not retrieve the best trial. "
+                   "Exit with error: {}".format(e))
+            self._logger.log_and_raise_error(err)
+
+    @property
+    def best_trial_pipeline(self) -> Pipeline:
+        '''
+        Pipeline of the best trial with its parameters set.
+        '''
+        try:
+            assert(self._trials is not None and len(self._trials) > 0)
+        except AssertionError:
+            err = ("Trials object is empty. "
+                   "Call run_trials method.")
+            self._logger.log_and_raise_error(err)
+        try:
+            return self.best_trial["pipeline"]
+        except Exception as e:
+            err = ("Could not retrieve the best trial. "
+                   "Exit with error: {}".format(e))
+            self._logger.log_and_raise_error(err)
+
+    def get_n_best_trial_pipelines(self, n: int) -> list:
+        """
+        N best pipelines with corresponding
+        best hyperparameters
+        """
+        try:
+            assert(isinstance(n, int))
+        except AssertionError:
+            err = "Parameter n must be an int"
+            self._logger.log_and_raise_error(err)
+
+        try:
+            assert(self._trials is not None and len(self._trials) > 0)
+        except AssertionError:
+            err = ("Trials object is empty. "
+                   "Call run_trials method.")
+            self._logger.log_and_raise_error(err)
+        try:
+            return [trial["pipeline"] for trial in
+                    sorted(self._trials, key=lambda x: x["score"],
+                           reverse=True)[:n]]
+
+        except Exception as e:
+            err = ("Failed to retrieve n best trials. "
+                   "Exit with error: {}".format(e))
+            self._logger.log_and_raise_error(err)
+
+    def get_n_best_trial_pipelines_of_each_type(self, n: int) -> pd.DataFrame:
+        """
+        If the hyperparameter search is done over multiple
+        pipelines, returns the n best pipelines (with their
+        corresponding hyperparameters set) for each pipeline type,
+        as a one-column DataFrame of pipelines.
+        """
+        try:
+            assert(self._trials is not None and len(self._trials) > 0)
+        except AssertionError:
+            err = ("Trials object is empty. "
+                   "Call run_trials method.")
+            self._logger.log_and_raise_error(err)
+        try:
+            return pd.DataFrame(self._trials)\
+                     .sort_values(by=["name", "score"],
+                                  ascending=False)\
+                     .groupby("name")\
+                     .head(n)[["pipeline"]]
+
+        except Exception as e:
+            err = ("Failed to retrieve n best trials of each type. "
+                   "Exit with error: {}".format(e))
+            self._logger.log_and_raise_error(err)
+
+    def trials_to_excel(self, path: str):
+        """
+        Write the trials object to excel as a table containing
+        the run number, the pipeline (as str), the hyperparameters
+        (as str), self.best_result (see the self._objective method)
+        as well as additional information configured
+        through the self.save_result method.
+        """
+        pd.DataFrame(self._trials).to_excel(path)
+
+
+if __name__ == "__main__":
+
+    # elementary example
+
+    from sklearn.datasets import load_breast_cancer
+    from sklearn.metrics import accuracy_score, precision_score
+    from cdplib.gridsearch.space_sample import space
+    from cdplib.log import Log
+    from cdplib.db_handlers import MongodbHandler
+    import pickle
+    import pandas as pd
+    import os
+
+    trials_path = "gridsearch_trials_TEST.pkl"
+    additional_metrics = {"precision": precision_score}
+    strategy_name = "strategy_1"
+    data_path = "data_TEST.h5"
+    cv_path = "cv_TEST.pkl"
+    collection_name = 'TEST_' + strategy_name
+
+    logger = Log("GridSearchPipelineSelector__TEST:")
+
+    logger.info("Start test")
+
+    data_loader = load_breast_cancer()
+
+    X = data_loader["data"]
+    y = data_loader["target"]
+
+    pd.DataFrame(X).to_hdf(data_path, key="X_train")
+    pd.Series(y).to_hdf(data_path, key="y_train")
+
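+    # a deterministic cv object: a list of (train_indices, test_indices)
+    # splits over the loaded data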
+    cv = [(list(range(len(X)//3)), list(range(len(X)//3, len(X)))),
+          (list(range(2*len(X)//3)), list(range(2*len(X)//3, len(X))))]
+
+    with open(cv_path, "wb") as cv_file:
+        pickle.dump(cv, cv_file)
+
+    gs = GridSearchPipelineSelector(cost_func=accuracy_score,
+                                    greater_is_better=True,
+                                    trials_path=trials_path,
+                                    additional_metrics=additional_metrics,
+                                    strategy_name=strategy_name,
+                                    stdout_log_level="WARNING")
+
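+    # `space` is imported from cdplib.gridsearch.space_sample and not shown
+    # here; judging by how run_trials consumes it, each entry is expected to
+    # look roughly like (names and values below are illustrative only):
+    #     {"name": "some_pipeline",
+    #      "pipeline": Pipeline([("scaler", ...), ("clf", ...)]),
+    #      "params": {"clf__max_depth": [3, 5], "clf__n_estimators": [50, 100]}}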
+    gs.attach_space(space=space)
+
+    gs.attach_data_from_hdf5(data_hdf5_store_path=data_path,
+                             cv_pickle_path=cv_path)
+
+    save_method = MongodbHandler().insert_data_into_collection
+    save_kwargs = {'collection_name': collection_name}
+
+    gs.configer_summary_saving(save_method=save_method,
+                               kwargs=save_kwargs)
+
+    gs.run_trials()
+
+    logger.info("Best trial: {}".format(gs.best_trial))
+
+    for file in [trials_path, data_path, cv_path]:
+        os.remove(file)
+
+    logger.info("End test")