#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Created on Wed Sep 30 14:15:17 2020

@author: tanya

@description: a class for selecting a machine learning
 pipeline from a deterministic space of parameter distributions
 over multiple pipelines.
 The selection is done in such a way that a Trials object is
 maintained during the tuning process, from which one can retrieve
 the best pipeline so far as well as the entire tuning history
 if needed.
"""

import os
import datetime
import numpy as np
import pandas as pd
from copy import deepcopy
from itertools import product
from collections import ChainMap
from sklearn.pipeline import Pipeline
from typing import Callable, Optional, Literal, Dict, Union, List

from cdplib.log import Log
from cdplib.pipeline_selector.PipelineSelector import PipelineSelector


class GridSearchPipelineSelector(PipelineSelector):
    """
    A class for selecting a machine learning pipeline
    from a deterministic space of parameter distributions
    over multiple pipelines.
    The selection is done in such a way that a Trials object is
    maintained during the tuning process, from which one can retrieve
    the best pipeline so far as well as the entire tuning history
    if needed.
    """
    def __init__(self,
                 cost_func: Union[Callable, str],
                 greater_is_better: bool,
                 trials_path: str,
                 backup_trials_freq: Optional[int] = None,
                 cross_validation_needs_scorer: bool = True,
                 cross_val_averaging_func: Callable = np.mean,
                 additional_metrics: Optional[Dict[str, Callable]] = None,
                 strategy_name: Optional[str] = None,
                 stdout_log_level: Literal["INFO", "WARNING",
                                           "ERROR"] = "INFO"):
        """
        :param Callable cost_func: function to minimize or maximize
            over the elements of a given (pipeline/hyperparameter) space

        :param bool greater_is_better: when True
            cost_func is maximized, else minimized.

        :param str trials_path: path at which the trials object is saved
            in binary format. From the trials object we can
            select information about the obtained scores, score variations,
            pipelines, and parameters tried out so far. If a trials object
            already exists at the given path, it is loaded and the
            search is continued; else, the search is started from scratch.

        :param backup_trials_freq: frequency in iterations (trials)
            of saving the trials object at trials_path.
            If None, the trials object is backed up every time
            the score improves.

        :param Callable cross_val_averaging_func: function to aggregate
            the cross-validation scores.
            Example different from the mean: mean - c*var.

        :param additional_metrics: dict of additional metrics to save,
            of the form {"metric_name": metric}, where metric is a Callable.

        :param str strategy_name: a strategy is defined by the data set
            (columns/features and rows), cv object, and cost function.
            When the strategy changes, one must start with new trials.

        :param str stdout_log_level: can be INFO, WARNING, ERROR
        """
        try:
            super().__init__(cost_func=cost_func,
                             greater_is_better=greater_is_better,
                             trials_path=trials_path,
                             backup_trials_freq=backup_trials_freq,
                             cross_validation_needs_scorer=cross_validation_needs_scorer,
                             cross_val_averaging_func=cross_val_averaging_func,
                             additional_metrics=additional_metrics,
                             strategy_name=strategy_name,
                             stdout_log_level=stdout_log_level)

            self._logger = Log("GridsearchPipelineSelector: ",
                               stdout_log_level=stdout_log_level)

            self._trials = self._trials or []

        except Exception as e:
            err = "Failed initialization. Exit with error: {}".format(e)

            self._logger.log_and_raise_error(err)

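    # Illustrative note (schema inferred from how run_trials consumes the
    # attached space; the example names below are assumptions, not part of
    # the library API): the space is expected to be an iterable of dicts
    # of the form
    #   {"name": <str>,
    #    "pipeline": <sklearn Pipeline>,
    #    "params": {<step__param>: <list of candidate values>, ...}},
    # e.g.
    #   {"name": "logreg",
    #    "pipeline": Pipeline([("clf", LogisticRegression())]),
    #    "params": {"clf__C": [0.1, 1.0]}}.
    # run_trials unfolds every combination of the listed parameter values
    # into a separate trial.
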
Exit with error: {}".format(e) self._logger.log_and_raise_error(err) def run_trials(self) -> None: """ """ try: assert(self.attached_space),\ "Parameter distribution space must be attached" # XXX Tanya: if the list of values is empty # in the space element, remove it done_trial_ids = [{"name": trial["name"], "params": trial["params"], "status": trial["status"]} for trial in self._trials] # list (generator) of (flattened) dictionaries # with all different combinations of # parameters for different pipelines # from the space definition. space_unfolded = ({"name": param_dist["name"], "pipeline": param_dist["pipeline"], "params": param_set} for param_dist in self._space for param_set in (dict(ChainMap(*tup)) for tup in product(*[[{k: v} for v in param_dist["params"][k]] for k in param_dist["params"]]))) for space_element in space_unfolded: # uniquely identifies the current space element trial_id = {"name": space_element["name"], "params": space_element["params"], "status": 'ok'} # verify if the current pipline/parameters # were already tested before if trial_id in done_trial_ids: continue result = self._objective(space_element) pipeline = deepcopy(space_element["pipeline"]) pipeline = pipeline.set_params(**space_element["params"]) trial = {"name": space_element["name"], "params": space_element["params"], "pipeline": pipeline} trial.update(result) self._trials.append(trial) self.finished_tuning = True self.total_tuning_time = datetime.datetime.today()\ - self.start_tuning_time self._backup_trials() except Exception as e: err = "Failed to run trials. Exit with error: {}".format(e) self._logger.log_and_raise_error(err) @property def number_of_trials(self) -> Union[int, None]: """ Number of trials already run in the current trials object """ try: return len(self._trials) except Exception as e: err = ("Failed to retrieve the number of trials. " "Exit with error: {}".format(e)) self._logger.log_and_raise_error(err) @property def best_trial(self) -> Union[dict, None]: """ """ try: assert(len(self._trials) > 0),\ ("Trials object is empty. " "Call run_trials method.") return max(self._trials, key=lambda x: x["score"]) except Exception as e: err = ("Could not retrieve the best trial. " "Exit with error: {}".format(e)) self._logger.log_and_raise_error(err) @property def best_trial_score(self) -> Union[float, None]: ''' ''' try: assert(len(self._trials) > 0),\ ("Trials object is empty. " "Call run_trials method.") return self.best_trial["score"] except Exception as e: err = ("Could not retrieve the best trial. " "Exit with error: {}".format(e)) self._logger.log_and_raise_error(err) @property def best_trial_score_variance(self) -> Union[float, None]: ''' ''' try: assert(len(self._trials) > 0),\ ("Trials object is empty. " "Call run_trials method.") return self.best_trial["score_variance"] except Exception as e: err = ("Could not retrieve the best trial. " "Exit with error: {}".format(e)) self._logger.log_and_raise_error(err) @property def best_trial_pipeline(self) -> Union[Pipeline, None]: ''' ''' try: assert(len(self._trials) > 0),\ ("Trials object is empty. " "Call run_trials method.") return self.best_trial["pipeline"] except Exception as e: err = ("Could not retrieve the best trial. " "Exit with error: {}".format(e)) self._logger.log_and_raise_error(err) def get_n_best_trial_pipelines(self, n: int)\ -> Union[List[Pipeline], None]: """ N best pipelines with corresponding best hyperparameters """ try: assert(len(self._trials) > 0),\ ("Trials object is empty. 
" "Call run_trials method.") return [trial["pipeline"] for trial in sorted(self._trials, key=lambda x: x["score"], reverse=True)[:n]] except Exception as e: err = ("Failed to retrieve n best trials. " "Exit with error: {}".format(e)) self._logger.log_and_raise_error(err) def get_n_best_trial_pipelines_of_each_type(self, n: int)\ -> Union[Dict[str, List[Pipeline]], None]: """ If the hyperparameter search is done over multiple pipelines, then returns n different pipeline-types with corresponding hyperparameters """ try: assert(len(self._trials) > 0),\ ("Trials object is empty. " "Call run_trials method.") return pd.DataFrame(self._trials)\ .sort_values(by=["name", "score"], ascending=False)\ .groupby("name")\ .head(n)\ .groupby("name")["pipeline"]\ .apply(lambda x: list(x))\ .to_dict() except Exception as e: err = ("Failed to retrieve n best trials of each type." "Exit with error: {}".format(e)) self._logger.log_and_raise_error(err) def trials_to_excel(self, path: str) -> None: """ Trials object in the shape of table written to excel, should contain the run number, pipeline (as str), hyperparamters (as str), self.best_result (see self._objective method) as well as additional information configured through self.save_result method. """ try: pd.DataFrame(self._trials).to_excel(path) except Exception as e: err = ("Failed to write trials to excel. " "Exit with error: {}".format(e)) self._logger.log_and_raise_error(err) if __name__ == "__main__": # elementary example from sklearn.datasets import load_breast_cancer from sklearn.metrics import accuracy_score, precision_score from cdplib.gridsearch.space_sample import space from cdplib.db_handlers import MongodbHandler import pickle import pandas as pd trials_path = "gridsearch_trials_TEST.pkl" additional_metrics = {"precision": precision_score} strategy_name = "strategy_1" data_path = "data_TEST.h5" cv_path = "cv_TEST.pkl" collection_name = 'TEST_' + strategy_name logger = Log("GridSearchPipelineSelector__TEST:") logger.info("Start test") data_loader = load_breast_cancer() X = data_loader["data"] y = data_loader["target"] pd.DataFrame(X).to_hdf(data_path, key="X_train") pd.Series(y).to_hdf(data_path, key="y_train") cv = [(list(range(len(X)//3)), list(range(len(X)//3, len(X)))), (list(range(2*len(X)//3)), list(range(2*len(X)//3, len(X))))] pickle.dump(cv, open(cv_path, "wb")) gs = GridSearchPipelineSelector(cost_func=accuracy_score, greater_is_better=True, trials_path=trials_path, additional_metrics=additional_metrics, strategy_name=strategy_name, stdout_log_level="WARNING") gs.attach_space(space=space) gs.attach_data_from_hdf5(data_hdf5_store_path=data_path, cv_pickle_path=cv_path) save_method = MongodbHandler().insert_data_into_collection save_kwargs = {'collection_name': collection_name} gs.configer_summary_saving(save_method=save_method, kwargs=save_kwargs) gs.run_trials() logger.info("Best trial: {}".format(gs.best_trial)) logger.info("Total tuning time: {}".format(gs.total_tuning_time)) for file in [trials_path, data_path, cv_path]: os.remove(file) logger.info("End test") # XXX Tanya check warnings