#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Created on Wed Sep 30 14:15:17 2020

@author: tanya

@description: a class for selecting a machine learning
pipeline from a deterministic space of parameter distributions
over multiple pipelines.
The selection is done in such a way that a Trials object is
maintained during the tuning process, from which one can retrieve
the best pipeline so far as well as the entire tuning history
if needed.
"""
import os
import datetime
import numpy as np
import pandas as pd
from itertools import product
from collections import ChainMap
from sklearn.pipeline import Pipeline
from typing import Callable, Optional, Literal, Dict, Union, List
from cdplib.log import Log
from cdplib.pipeline_selector.PipelineSelector import PipelineSelector


class GridSearchPipelineSelector(PipelineSelector):
    """
    A class for selecting a machine learning
    pipeline from a deterministic space of parameter distributions
    over multiple pipelines.
    The selection is done in such a way that a Trials object is
    maintained during the tuning process, from which one can retrieve
    the best pipeline so far as well as the entire tuning history
    if needed.
    """
    def __init__(self,
                 cost_func: Union[Callable, str],
                 greater_is_better: bool,
                 trials_path: str,
                 backup_trials_freq: Optional[int] = None,
                 cross_val_averaging_func: Callable = np.mean,
                 additional_metrics: Optional[Dict[str, Callable]] = None,
                 strategy_name: Optional[str] = None,
                 stdout_log_level: Literal["INFO", "WARNING", "ERROR"]
                 = "INFO"):
- """
- ::param Callable cost_func: function to minimize or maximize
- over the elements of a given (pipeline/hyperparameter) space
- :param bool greater_is_better: when True
- cost_func is maximized, else minimized.
- :param str trials_path: path at which the trials object is saved
- in binary format. From the trials object we can
- select information about the obtained scores, score variations,
- and pipelines, and parameters tried out so far. If a trials object
- already exists at the given path, it is loaded and the
- search is continued, else, the search is started from scratch.
- :param backup_trials_freq: frequecy in interations (trials)
- of saving the trials object at the trials_path.
- if None, the trials object is backed up avery time
- the score improves.
- :param Callable cross_val_averaging_func: Function to aggregate
- the cross-validation scores.
- Example different from the mean: mean - c*var.
- :param additional_metics: dict of additional metrics to save
- of the form {"metric_name": metric} where metric is a Callable.
- :param str strategy_name:
- a strategy is defined by the data set (columns/features and rows),
- cv object, cost function.
- When the strategy changes, one must start with new trials.
- :param str stdout_log_level: can be INFO, WARNING, ERROR
- """
        # The logger is created before the call to super().__init__ so
        # that the except-branch below can log even if initialization
        # of the parent class fails.
        self._logger = Log("GridSearchPipelineSelector: ",
                           stdout_log_level=stdout_log_level)

        try:
            super().__init__(cost_func=cost_func,
                             greater_is_better=greater_is_better,
                             trials_path=trials_path,
                             backup_trials_freq=backup_trials_freq,
                             cross_val_averaging_func=cross_val_averaging_func,
                             additional_metrics=additional_metrics,
                             strategy_name=strategy_name,
                             stdout_log_level=stdout_log_level)

            self._trials = self._trials or []

        except Exception as e:
            err = "Failed initialization. Exit with error: {}".format(e)
            self._logger.log_and_raise_error(err)
    def run_trials(self) -> None:
        """
        Iterates over all pipeline/hyperparameter combinations in the
        attached space and evaluates those that are not yet present
        in the trials object.
        """
        try:
            assert self.attached_space,\
                "Parameter distribution space must be attached"

            done_trial_ids = [{"name": trial["name"],
                               "params": trial["params"],
                               "status": trial["status"]}
                              for trial in self._trials]

            # list (generator) of (flattened) dictionaries
            # with all different combinations of
            # parameters for different pipelines
            # from the space definition.
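            # For illustration: a "params" entry such as
            # {"a": [1, 2], "b": [3]} unfolds into the two parameter
            # sets {"a": 1, "b": 3} and {"a": 2, "b": 3}.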
            space_unfolded = ({"name": param_dist["name"],
                               "pipeline": param_dist["pipeline"],
                               "params": param_set}
                              for param_dist in self._space
                              for param_set in
                              (dict(ChainMap(*tup)) for tup in
                               product(*[[{k: v} for v in
                                          param_dist["params"][k]]
                                         for k in param_dist["params"]])))

            for space_element in space_unfolded:
                # uniquely identifies the current space element
                trial_id = {"name": space_element["name"],
                            "params": space_element["params"],
                            "status": 'ok'}

                # skip the current pipeline/parameters if they
                # were already tested before
                if trial_id in done_trial_ids:
                    continue

                result = self._objective(space_element)

                pipeline = space_element["pipeline"].set_params(
                    **space_element["params"])

                trial = {"name": space_element["name"],
                         "params": space_element["params"],
                         "pipeline": pipeline}

                trial.update(result)

                self._trials.append(trial)

            self.finished_tuning = True

            self.total_tuning_time = datetime.datetime.today()\
                - self.start_tuning_time

            self._backup_trials()

        except Exception as e:
            err = "Failed to run trials. Exit with error: {}".format(e)
            self._logger.log_and_raise_error(err)
    @property
    def number_of_trials(self) -> Union[int, None]:
        """
        Number of trials already run in the current trials object
        """
        try:
            return len(self._trials)

        except Exception as e:
            err = ("Failed to retrieve the number of trials. "
                   "Exit with error: {}".format(e))
            self._logger.log_and_raise_error(err)
    @property
    def best_trial(self) -> Union[dict, None]:
        """
        Trial with the highest score in the current trials object.
        """
        try:
            assert len(self._trials) > 0,\
                ("Trials object is empty. "
                 "Call the run_trials method.")

            return max(self._trials, key=lambda x: x["score"])

        except Exception as e:
            err = ("Could not retrieve the best trial. "
                   "Exit with error: {}".format(e))
            self._logger.log_and_raise_error(err)
    @property
    def best_trial_score(self) -> Union[float, None]:
        """
        Score of the best trial.
        """
        try:
            assert len(self._trials) > 0,\
                ("Trials object is empty. "
                 "Call the run_trials method.")

            return self.best_trial["score"]

        except Exception as e:
            err = ("Could not retrieve the best trial score. "
                   "Exit with error: {}".format(e))
            self._logger.log_and_raise_error(err)
    @property
    def best_trial_score_variance(self) -> Union[float, None]:
        """
        Cross-validation score variance of the best trial.
        """
        try:
            assert len(self._trials) > 0,\
                ("Trials object is empty. "
                 "Call the run_trials method.")

            return self.best_trial["score_variance"]

        except Exception as e:
            err = ("Could not retrieve the best trial score variance. "
                   "Exit with error: {}".format(e))
            self._logger.log_and_raise_error(err)
    @property
    def best_trial_pipeline(self) -> Union[Pipeline, None]:
        """
        Pipeline of the best trial, with its best hyperparameters set.
        """
        try:
            assert len(self._trials) > 0,\
                ("Trials object is empty. "
                 "Call the run_trials method.")

            return self.best_trial["pipeline"]

        except Exception as e:
            err = ("Could not retrieve the best trial pipeline. "
                   "Exit with error: {}".format(e))
            self._logger.log_and_raise_error(err)
    def get_n_best_trial_pipelines(self, n: int)\
            -> Union[List[Pipeline], None]:
        """
        Returns the n best pipelines, each with its corresponding
        best hyperparameters set.
        """
        try:
            assert len(self._trials) > 0,\
                ("Trials object is empty. "
                 "Call the run_trials method.")

            return [trial["pipeline"] for trial in
                    sorted(self._trials, key=lambda x: x["score"],
                           reverse=True)[:n]]

        except Exception as e:
            err = ("Failed to retrieve n best trials. "
                   "Exit with error: {}".format(e))
            self._logger.log_and_raise_error(err)
    def get_n_best_trial_pipelines_of_each_type(self, n: int)\
            -> Union[Dict[str, List[Pipeline]], None]:
        """
        If the hyperparameter search is conducted over multiple
        pipelines, returns, for each pipeline type, its n best
        pipelines with the corresponding hyperparameters set.
        """
        try:
            assert len(self._trials) > 0,\
                ("Trials object is empty. "
                 "Call the run_trials method.")

            return pd.DataFrame(self._trials)\
                     .sort_values(by=["name", "score"],
                                  ascending=False)\
                     .groupby("name")\
                     .head(n)\
                     .groupby("name")["pipeline"]\
                     .apply(list)\
                     .to_dict()

        except Exception as e:
            err = ("Failed to retrieve n best trials of each type. "
                   "Exit with error: {}".format(e))
            self._logger.log_and_raise_error(err)
    def trials_to_excel(self, path: str) -> None:
        """
        Writes the trials object as a table to an Excel file.
        The table contains the run number, the pipeline (as str),
        the hyperparameters (as str), self.best_result (see the
        self._objective method), as well as additional information
        configured through the self.save_result method.
        """
        try:
            pd.DataFrame(self._trials).to_excel(path)

        except Exception as e:
            err = ("Failed to write trials to excel. "
                   "Exit with error: {}".format(e))
            self._logger.log_and_raise_error(err)


- if __name__ == "__main__":
- # elementary example
- from sklearn.datasets import load_breast_cancer
- from sklearn.metrics import accuracy_score, precision_score
- from cdplib.gridsearch.space_sample import space
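    # `space` is assumed (from the keys accessed in run_trials) to be a
    # list of dictionaries of the form
    #     {"name": <str>, "pipeline": <sklearn Pipeline>,
    #      "params": {<param name>: <list of candidate values>}},
    # e.g. (hypothetical):
    #     [{"name": "logreg",
    #       "pipeline": Pipeline([("clf", LogisticRegression())]),
    #       "params": {"clf__C": [0.1, 1.0]}}]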
    from cdplib.db_handlers import MongodbHandler
    import pickle
    trials_path = "gridsearch_trials_TEST.pkl"
    additional_metrics = {"precision": precision_score}
    strategy_name = "strategy_1"
    data_path = "data_TEST.h5"
    cv_path = "cv_TEST.pkl"
    collection_name = 'TEST_' + strategy_name

    logger = Log("GridSearchPipelineSelector__TEST:")

    logger.info("Start test")

    data_loader = load_breast_cancer()

    X = data_loader["data"]
    y = data_loader["target"]

    pd.DataFrame(X).to_hdf(data_path, key="X_train")
    pd.Series(y).to_hdf(data_path, key="y_train")

    cv = [(list(range(len(X)//3)), list(range(len(X)//3, len(X)))),
          (list(range(2*len(X)//3)), list(range(2*len(X)//3, len(X))))]

    with open(cv_path, "wb") as f:
        pickle.dump(cv, f)
    gs = GridSearchPipelineSelector(cost_func=accuracy_score,
                                    greater_is_better=True,
                                    trials_path=trials_path,
                                    additional_metrics=additional_metrics,
                                    strategy_name=strategy_name,
                                    stdout_log_level="WARNING")

    gs.attach_space(space=space)

    gs.attach_data_from_hdf5(data_hdf5_store_path=data_path,
                             cv_pickle_path=cv_path)

    save_method = MongodbHandler().insert_data_into_collection
    save_kwargs = {'collection_name': collection_name}

    gs.configer_summary_saving(save_method=save_method,
                               kwargs=save_kwargs)

    gs.run_trials()

    logger.info("Best trial: {}".format(gs.best_trial))
    logger.info("Total tuning time: {}".format(gs.total_tuning_time))
    for file in [trials_path, data_path, cv_path]:
        os.remove(file)

    logger.info("End test")