#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Created on Wed Sep 30 14:23:23 2020
@author: tanya
@description: an abstract class for selecting a machine learning
pipeline in a space of parameter distributions over multiple pipelines.
The selection is done in such a way that a Trials object is
maintained during the tuning process, from which one can retrieve
the best pipeline so far as well as the entire tuning history
if needed.
Child classes: hyperopt and custom gridsearch.
"""
import pickle
import os
import sys
import time
import datetime
from typing import Callable
import numpy as np
import pandas as pd
from abc import ABC, abstractmethod
from sklearn.pipeline import Pipeline
from sklearn.model_selection import cross_validate as sklearn_cross_validator
from sklearn.metrics import make_scorer
from hyperopt import STATUS_OK, STATUS_FAIL

# make project-local modules importable before importing cdplib
sys.path.append(os.getcwd())

from cdplib.log import Log
from cdplib.utils import ExceptionsHandler
from cdplib.utils import LoadingUtils


class PipelineSelector(ABC):
    """
    An abstract class for selecting a machine learning
    pipeline in a space of parameter distributions over multiple pipelines.
    The selection is done in such a way that a Trials object is
    maintained during the tuning process, from which one can retrieve
    the best pipeline so far as well as the entire tuning history
    if needed.
    """
    def __init__(self,
                 cost_func: Callable,
                 greater_is_better: bool,
                 trials_path: str,
                 backup_trials_freq: int = 1,
                 cross_val_averaging_func: Callable = None):
        '''
        :param Callable cost_func: function to minimize or maximize
        :param bool greater_is_better: when True
            cost_func is maximized, else minimized.
        :param str trials_path: path at which the trials object is saved
            in binary format. From the trials object one can
            retrieve the scores, score variances, pipelines,
            and parameters tried out so far. If a trials object
            already exists at the given path, it is loaded and the
            search is continued, else the search is started from
            the beginning.
        :param int backup_trials_freq: frequency in iterations (trials)
            of saving the trials object at trials_path.
        :param Callable cross_val_averaging_func: optional,
            defaults to the mean. Function used to aggregate
            the cross-validated values of the cost function.
            The classic choice is the mean;
            another example is mean() - c*var().
        '''
        self._logger = Log("PipelineSelector: ")

        input_errors = [(cost_func, Callable,
                         "Parameter 'cost_func' must be a callable"),
                        (greater_is_better, bool,
                         "Parameter 'greater_is_better' must be of bool type"),
                        (trials_path, str,
                         "Parameter 'trials_path' must be of string type"),
                        (cross_val_averaging_func, (Callable, None.__class__),
                         ("Parameter 'cross_val_averaging_func' "
                          "must be a callable")),
                        (backup_trials_freq, int,
                         "Parameter 'backup_trials_freq' must be an int")]

        for p, t, err in input_errors:
            try:
                assert isinstance(p, t)
            except AssertionError:
                self._logger.log_and_raise_error(err)

        ExceptionsHandler(self._logger).assert_is_directory(path=trials_path)

        self._cost_func = cost_func
        # equals 1 when cost_func is minimized, -1 when it is maximized
        self._score_factor = (not greater_is_better) - greater_is_better
        self._trials_path = trials_path
        self._backup_trials_freq = backup_trials_freq
        self._cross_val_averaging_func = cross_val_averaging_func or np.mean
        # keeps track of the current search iteration
        self._run_number = 0
        # space and data need to be attached to perform the search
        self._space_attached = False
        self._data_attached = False
        self._cross_validator_attached = False
        # _best_score caches the best_trial_score property
        # so that we do not have to go through all the trials
        # at each iteration.
        self._best_score = np.nan

        # if a trials object already exists at the given path,
        # it is loaded and the search is continued. Else,
        # the search is started from the beginning.
        if os.path.isfile(trials_path):
            try:
                with open(trials_path, "rb") as f:
                    self._trials = pickle.load(f)

                self._best_score = self.best_trial_score

                self._logger.info(("Loaded an existing trials object "
                                   "consisting of {} trials")
                                  .format(len(self._trials.trials)))

            except Exception as e:
                err = ("Trials object could not be loaded. "
                       "Exit with error {}").format(e)
                self._logger.log_and_raise_error(err)
                self._trials = None

        else:
            self._logger.warning("No existing trials object was found, "
                                 "starting from scratch.")
            self._trials = None

    def _backup_trials(self):
        '''
        Pickles (saves) the trials object.
        Used in a scheduler.
        '''
        with open(self._trials_path, "wb") as f:
            pickle.dump(self._trials, f)

    def attach_cross_validator(self, cross_validator: Callable = None,
                               module_path: str = None,
                               name: str = None):
        """
        Method for attaching a custom cross-validation function
        :param cross_validator: a function that has the same
            signature as sklearn.model_selection.cross_validate
        :param str module_path: path to the python module
            where the cross-validator is defined. Optional when
            the cross-validator is provided directly.
        :param str name: name of the cross-validator loaded from
            a python module. Optional when the cross-validator
            is provided directly.
        """
        try:
            assert (cross_validator is not None) or\
                   ((module_path is not None) and (name is not None))
        except AssertionError:
            err = ("Either cross_validator or "
                   "(module_path, name) must be provided")
            self._logger.log_and_raise_error(err)

        self._cross_validator = cross_validator or\
            LoadingUtils().load_from_module(module_path=module_path, name=name)

        self._logger.info("Attached a cross validator")
        self._cross_validator_attached = True

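    # A minimal sketch of a compatible custom cross-validator, assuming
    # numpy-array inputs and that the only key read downstream is
    # 'test_score' (see _evaluate); the function name is illustrative:
    #
    #     def naive_cross_validator(estimator, X, y=None, cv=None,
    #                               scoring=None, error_score=np.nan):
    #         fold_scores = []
    #         for train_inds, val_inds in cv:
    #             estimator.fit(X[train_inds],
    #                           None if y is None else y[train_inds])
    #             fold_scores.append(scoring(estimator, X[val_inds],
    #                                        None if y is None else y[val_inds]))
    #         return {'test_score': np.array(fold_scores)}
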
    def attach_space(self, space=None,
                     module_path: str = None,
                     name: str = None):
        '''
        :param space: space where
            the search is performed. Optional when the space
            is loaded from a python module. A space might be either
            a list of dictionaries or a hyperopt space object,
            the elements of which are dictionaries with keys:
            name, pipeline, params
        :param str module_path: path to the python module
            where the space is defined. Optional when
            the space is provided directly.
        :param str name: name of the space loaded from
            a python module. Optional when the space
            is provided directly.
        '''
        try:
            assert (space is not None) or\
                   ((module_path is not None) and (name is not None))
        except AssertionError:
            err = "Either space or (module_path, name) must be provided"
            self._logger.log_and_raise_error(err)

        self._space = space or LoadingUtils().load_from_module(
            module_path=module_path, name=name)

        self._logger.info("Attached parameter distribution space")
        self._space_attached = True

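    # An illustrative space in list-of-dictionaries form (the pipeline
    # steps and parameter values are placeholders, assuming sklearn's
    # StandardScaler and LogisticRegression are imported):
    #
    #     example_space = [
    #         {'name': 'logistic_regression',
    #          'pipeline': Pipeline([('scaler', StandardScaler()),
    #                                ('model', LogisticRegression())]),
    #          'params': {'model__C': 1.0}}]
    #
    #     selector.attach_space(space=example_space)
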
    def attach_data(self, X_train: (pd.DataFrame, np.ndarray),
                    y_train: (pd.DataFrame, pd.Series, np.ndarray) = None,
                    X_val: (pd.DataFrame, np.ndarray) = None,
                    y_val: (pd.DataFrame, pd.Series, np.ndarray) = None,
                    cv: (list, int) = None):
        '''
        :param array X_train: data on which
            machine learning pipelines are trained
        :param array y_train: optional, vector of targets
            (not all algorithms require targets)
        :param array X_val: optional, validation data.
            When not provided, the cross-validated value
            of the cost_func is calculated.
        :param array y_val: optional, validation targets
        :param cv: list of tuples containing
            train and validation indices, or an integer representing
            the number of folds for a random split of the data
            during cross-validation.
            Example: [([0,1,2], [3,4]), ([1,2,3], [4,5])]
        '''
        NoneType = None.__class__
        input_err = "Invalid combination of train and validation data types"

        if cv is None:
            try:
                assert (isinstance(X_train, (pd.DataFrame, np.ndarray)) and
                        isinstance(X_val, (pd.DataFrame, np.ndarray)) and
                        isinstance(y_train, (pd.Series, np.ndarray,
                                             pd.DataFrame, NoneType)) and
                        isinstance(y_val, (pd.Series, np.ndarray,
                                           pd.DataFrame, NoneType)) and
                        ((y_val is None) if (y_train is None)
                         else (y_val is not None)))
            except AssertionError:
                self._logger.log_and_raise_error(input_err)

            # cost is evaluated with a cross-validation function
            # that accepts an array and a cv object with
            # indices of the fold splits.
            # Here we create a trivial cv object
            # with one validation split.
            train_inds = list(range(len(X_train)))
            val_inds = list(range(len(X_train),
                                  len(X_train) + len(X_val)))

            self._cv = [(train_inds, val_inds)]
            self._X = np.concatenate([X_train, X_val])
            self._y = None if y_train is None\
                else np.concatenate([y_train, y_val])
        else:
            try:
                assert (isinstance(X_train, (pd.DataFrame, np.ndarray)) and
                        isinstance(y_train, (pd.Series, np.ndarray,
                                             pd.DataFrame, NoneType)) and
                        (X_val is None) and (y_val is None))
            except AssertionError:
                self._logger.log_and_raise_error(input_err)

            self._cv = cv
            self._X = X_train
            self._y = y_train

        self._logger.info("Attached data")
        self._data_attached = True

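    # The two supported calling modes, side by side (X, y, X_val, y_val
    # are illustrative array names):
    #
    #     # explicit validation set -> one trivial train/val split
    #     selector.attach_data(X_train=X, y_train=y,
    #                          X_val=X_val, y_val=y_val)
    #
    #     # cross-validation -> fold indices or a fold count
    #     selector.attach_data(X_train=X, y_train=y, cv=5)
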
    def _evaluate(self, pipeline: Pipeline) -> dict:
        '''
        This method is called in _objective.
        Calculates the cost on the attached data.
        It can be overridden when the cost
        needs to be calculated differently,
        for example with a tensorflow model.
        :param Pipeline pipeline: machine learning pipeline
            that will be evaluated with cross-validation
        :output: dictionary with the aggregated
            cross-validation score and
            the score variance
        '''
        if not self._cross_validator_attached:
            self._cross_validator = sklearn_cross_validator

        scores = self._cross_validator(
            estimator=pipeline,
            X=self._X,
            y=self._y,
            cv=self._cv or 5,
            scoring=make_scorer(self._cost_func),
            error_score=np.nan)

        return {'value': self._cross_val_averaging_func(scores['test_score']),
                'variance': np.var(scores['test_score'])}

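    # A minimal sketch of overriding _evaluate in a child class, assuming
    # numpy-array data and that self._cv holds explicit fold indices;
    # only the returned keys 'value' and 'variance' matter downstream:
    #
    #     def _evaluate(self, pipeline) -> dict:
    #         fold_scores = []
    #         for train_inds, val_inds in self._cv:
    #             pipeline.fit(self._X[train_inds], self._y[train_inds])
    #             predictions = pipeline.predict(self._X[val_inds])
    #             fold_scores.append(self._cost_func(self._y[val_inds],
    #                                                predictions))
    #         return {'value': self._cross_val_averaging_func(fold_scores),
    #                 'variance': np.var(fold_scores)}
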
    def _objective(self, space_element: dict) -> dict:
        '''
        This method is called in search_for_best_pipeline
        inside the hyperopt fmin method.
        Uses the _evaluate method.
        It must take a space element as input
        and produce as output a dictionary
        with the 2 obligatory values loss and status
        (STATUS_OK or STATUS_FAIL). Other
        values in the output are optional and can be
        accessed later through the trials object.
        :Warning: fmin minimizes the loss;
            when _evaluate returns a value to be maximized,
            it is multiplied by -1 to obtain the loss.
        :param dict space_element: must contain keys
            name (with the name of the pipeline),
            pipeline (Pipeline object),
            params (dict of pipeline params)
        :output: dictionary with keys
            loss (minimized value),
            status with values STATUS_OK or STATUS_FAIL
            understood by hyperopt,
            score (equal to loss or -loss),
            score_variance,
            timestamp (end of execution),
            train_time: execution time
        '''
        try:
            assert (isinstance(space_element, dict) and
                    set(['name', 'pipeline', 'params'])
                    <= space_element.keys())

            assert (isinstance(space_element['name'], str) and
                    isinstance(space_element['pipeline'], Pipeline) and
                    isinstance(space_element['params'], dict))
        except AssertionError:
            err = "Space elements are of the wrong form"
            self._logger.log_and_raise_error(err)

        start_time = time.time()

        if not self._data_attached:
            err = ("Data must be attached in order "
                   "to carry out the best "
                   "pipeline search")
            self._logger.log_and_raise_error(err)

        self._run_number += 1

        pipeline = space_element['pipeline']
        params = space_element['params']
        pipeline.set_params(**params)

        self._logger.info(("Run number {0}: "
                           "Current score is {1}: "
                           "Training pipeline {2} "
                           "with parameters: {3}. ").format(
                              self._run_number,
                              self._best_score,
                              space_element['name'],
                              params))

        try:
            result = self._evaluate(pipeline)

            assert not np.isnan(result["value"])

            if self._run_number % self._backup_trials_freq == 0:
                self._backup_trials()

            # (x != x) is a NaN check: it catches the case where
            # no best score has been recorded yet.
            if (self._best_score != self._best_score) or\
                self._score_factor*result["value"] <\
                self._score_factor*self._best_score:

                self._logger.info("Score got better, new best score is: {}"
                                  .format(result["value"]))

                self._best_score = result['value']

            end_time = time.time()

            return {'loss': self._score_factor * result["value"],
                    'status': STATUS_OK,
                    'score': result["value"],
                    'score_variance': result["variance"],
                    'timestamp': datetime.datetime.today(),
                    'train_time': end_time - start_time}

        except Exception as e:
            self._logger.warning("Trial failed with error {}".format(e))

            return {'loss': np.nan,
                    'status': STATUS_FAIL,
                    'score': np.nan,
                    'score_variance': np.nan,
                    'timestamp': datetime.datetime.today(),
                    'train_time': np.nan}

    @abstractmethod
    def run_trials(self):
        """
        Runs the pipeline search and fills the trials object.
        """
        pass

    @property
    @abstractmethod
    def best_trial(self) -> dict:
        """
        Best trial so far, as stored in the trials object.
        """
        pass

    @property
    @abstractmethod
    def best_trial_score(self) -> float:
        """
        Score of the best trial so far.
        """
        pass

    @property
    @abstractmethod
    def best_trial_score_variance(self) -> float:
        """
        Score variance of the best trial so far.
        """
        pass

    @property
    @abstractmethod
    def best_trial_pipeline(self) -> Pipeline:
        """
        Pipeline of the best trial so far.
        """
        pass
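
# A minimal sketch of a hyperopt-based child class, assuming the
# standard hyperopt API (fmin, tpe, Trials); everything except the
# inherited members is illustrative:
#
#     from hyperopt import fmin, tpe, Trials
#
#     class HyperoptPipelineSelector(PipelineSelector):
#         def run_trials(self, niter: int = 10):
#             self._trials = self._trials or Trials()
#             fmin(fn=self._objective,
#                  space=self._space,
#                  algo=tpe.suggest,
#                  trials=self._trials,
#                  max_evals=len(self._trials.trials) + niter)
#             self._backup_trials()
#
#         # best_trial, best_trial_score, best_trial_score_variance and
#         # best_trial_pipeline would read from self._trials here.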