#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Created on Wed Sep 30 14:23:23 2020

@author: tanya

@description: an abstract class for selecting a machine learning
pipeline in a space of parameter distributions over multiple pipelines.
The selection is done in such a way that a Trials object is
maintained during the tuning process, from which one can retrieve
the best pipeline so far as well as the entire tuning history
if needed.

Children classes: hyperopt and custom gridsearch.
"""
import pickle
import os
import sys
import time
import datetime
from typing import Callable
import numpy as np
import pandas as pd
from abc import ABC, abstractmethod, abstractproperty
from sklearn.pipeline import Pipeline
from sklearn.model_selection import cross_validate as sklearn_cross_validator
from sklearn.metrics import make_scorer
from hyperopt import STATUS_OK, STATUS_FAIL
from cdplib.log import Log
from cdplib.utils import ExceptionsHandler
from cdplib.utils import LoadingUtils

sys.path.append(os.getcwd())


class PipelineSelector(ABC):
    """
    An abstract class for selecting a machine learning
    pipeline in a space of parameter distributions over multiple pipelines.
    The selection is done in such a way that a Trials object is
    maintained during the tuning process, from which one can retrieve
    the best pipeline so far as well as the entire tuning history
    if needed.
    """
    def __init__(self,
                 cost_func: Callable,
                 greater_is_better: bool,
                 trials_path: str,
                 backup_trials_freq: int = 1,
                 cross_val_averaging_func: Callable = None):
        '''
        :param Callable cost_func: function to minimize or maximize
        :param bool greater_is_better: when True
            cost_func is maximized, else minimized.
        :param str trials_path: path at which the trials object is saved
            in binary format. From the trials object one can
            retrieve the obtained scores, the score variances,
            and the pipelines and parameters tried out so far.
            If a trials object already exists at the given path,
            it is loaded and the search is continued; else
            the search is started from the beginning.
        :param int backup_trials_freq: frequency, in iterations (trials),
            at which the trials object is saved to trials_path.
        :param Callable cross_val_averaging_func: optional,
            defaults to the mean. Function used
            to aggregate the cross-validated values of the cost function.
            The classic choice is the mean; another example
            is mean() - c*var() for some constant c.
        '''
        self._logger = Log("PipelineSelector: ")
        input_errors = [(cost_func, Callable,
                         "Parameter 'cost_func' must be a callable"),
                        (greater_is_better, bool,
                         "Parameter 'greater_is_better' must be of bool type"),
                        (trials_path, str,
                         "Parameter 'trials_path' must be of string type"),
                        (cross_val_averaging_func, (Callable, None.__class__),
                         ("Parameter 'cross_val_averaging_func' "
                          "must be a callable")),
                        (backup_trials_freq, int,
                         "Parameter 'backup_trials_freq' must be an int")]
        for p, t, err in input_errors:
            try:
                assert(isinstance(p, t))
            except AssertionError:
                self._logger.log_and_raise_error(err)
        ExceptionsHandler(self._logger).assert_is_directory(path=trials_path)
        self._cost_func = cost_func
        # is 1 when cost_func is minimized, -1 when cost_func is maximized
        self._score_factor = (not greater_is_better) - greater_is_better
        self._trials_path = trials_path
        self._backup_trials_freq = backup_trials_freq
        self._cross_val_averaging_func = cross_val_averaging_func or np.mean
        # keeps track of the current search iteration
        self._run_number = 0
        # space and data need to be attached to perform the search
        self._space_attached = False
        self._data_attached = False
        self._cross_validator_attached = False
        # _best_score caches the best_trial_score property
        # so that we do not have to go through all the trials
        # at each iteration.
        self._best_score = np.nan
        # if a trials object already exists at the given path,
        # it is loaded and the search is continued; else
        # the search is started from the beginning.
        if os.path.isfile(trials_path):
            try:
                with open(trials_path, "rb") as f:
                    self._trials = pickle.load(f)
                self._best_score = self.best_trial_score
                self._logger.info(("Loaded an existing trials object "
                                   "consisting of {} trials")
                                  .format(len(self._trials.trials)))
            except Exception as e:
                err = ("Trials object could not be loaded. "
                       "Exiting with error {}").format(e)
                self._logger.log_and_raise_error(err)
                self._trials = None
        else:
            self._logger.warning(("No existing trials object was found, "
                                  "starting from scratch."))
            self._trials = None
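
    # Example (illustration only; 'SomeChildSelector' and the use of
    # sklearn's accuracy_score are assumptions): a cross_val_averaging_func
    # of the mean() - c*var() form mentioned in the docstring above:
    #
    #     from sklearn.metrics import accuracy_score
    #
    #     def conservative_mean(scores):
    #         return np.mean(scores) - 0.1 * np.var(scores)
    #
    #     selector = SomeChildSelector(
    #         cost_func=accuracy_score,
    #         greater_is_better=True,
    #         trials_path="trials.pkl",
    #         cross_val_averaging_func=conservative_mean)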

    def _backup_trials(self):
        '''
        Pickles (saves) the trials object.
        Used in a scheduler.
        '''
        with open(self._trials_path, "wb") as f:
            pickle.dump(self._trials, f)

    def attach_cross_validator(self, cross_validator: Callable = None,
                               module_path: str = None,
                               name: str = None):
        """
        Method for attaching a custom cross-validation function
        :param cross_validator: a function that has the same
            signature as sklearn.model_selection.cross_validate
        :param str module_path: path to the python module
            where the cross-validator is defined. Optional when
            the cross-validator is provided directly.
        :param str name: name of the cross-validator loaded from
            the python module. Optional when the cross-validator
            is provided directly.
        """
        try:
            assert((cross_validator is not None) or
                   ((module_path is not None) and (name is not None)))
        except AssertionError:
            err = ("Either cross_validator or "
                   "(module_path, name) must be provided")
            self._logger.log_and_raise_error(err)
        self._cross_validator = cross_validator or\
            LoadingUtils().load_from_module(module_path=module_path, name=name)
        self._logger.info("Attached a cross validator")
        self._cross_validator_attached = True
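
    # Example (illustration only; 'my_cross_validate' is a hypothetical
    # name, and the sketch assumes numpy arrays and a cv given as a list
    # of index tuples): a custom cross-validator has to mirror the keyword
    # arguments used in _evaluate below and return a dict containing a
    # 'test_score' array:
    #
    #     from sklearn.base import clone
    #
    #     def my_cross_validate(estimator, X, y, cv, scoring, error_score):
    #         scores = []
    #         for train_inds, val_inds in cv:
    #             est = clone(estimator).fit(X[train_inds], y[train_inds])
    #             scores.append(scoring(est, X[val_inds], y[val_inds]))
    #         return {'test_score': np.array(scores)}
    #
    #     selector.attach_cross_validator(cross_validator=my_cross_validate)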

    def attach_space(self, space=None,
                     module_path: str = None,
                     name: str = None):
        '''
        :param space: space in which
            the search is performed. Optional when the space
            is loaded from a python module. A space might be either
            a list of dictionaries or a hyperopt space object,
            the elements of which are dictionaries with keys:
            name, pipeline, params
        :param str module_path: path to the python module
            where the space is defined. Optional when
            the space is provided directly.
        :param str name: name of the space loaded from
            the python module. Optional when the space
            is provided directly.
        '''
        try:
            assert((space is not None) or
                   ((module_path is not None) and (name is not None)))
        except AssertionError:
            err = "Either space or (module_path, name) must be provided"
            self._logger.log_and_raise_error(err)
        self._space = space or LoadingUtils().load_from_module(
            module_path=module_path, name=name)
        self._logger.info("Attached parameter distribution space")
        self._space_attached = True
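
    # Example (illustration only; the estimator and parameter values are
    # assumptions): a space given directly as a list of dictionaries with
    # the keys name, pipeline and params:
    #
    #     from sklearn.linear_model import LogisticRegression
    #
    #     space = [{'name': 'logreg_weak',
    #               'pipeline': Pipeline([('clf', LogisticRegression())]),
    #               'params': {'clf__C': 0.1}},
    #              {'name': 'logreg_strong',
    #               'pipeline': Pipeline([('clf', LogisticRegression())]),
    #               'params': {'clf__C': 1.0}}]
    #     selector.attach_space(space=space)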

    def attach_data(self, X_train: (pd.DataFrame, np.ndarray),
                    y_train: (pd.DataFrame, pd.Series, np.ndarray) = None,
                    X_val: (pd.DataFrame, np.ndarray) = None,
                    y_val: (pd.DataFrame, pd.Series, np.ndarray) = None,
                    cv: (list, int) = None):
        '''
        :param array X_train: data on which
            the machine learning pipelines are trained
        :param array y_train: optional, vector of targets
            (not all algorithms require targets)
        :param array X_val: optional, validation data.
            When not provided, the cross-validated value
            of the cost_func is calculated.
        :param array y_val: optional, validation targets
        :param cv: list of tuples containing
            train and validation indices, or an integer representing
            the number of folds for a random split of the data
            during cross-validation.
            Example: [([0,1,2], [3,4]), ([1,2,3], [4,5])]
        '''
        NoneType = None.__class__
        input_err = "Invalid combination of train and val data types"
        if cv is None:
            try:
                assert(isinstance(X_train, (pd.DataFrame, np.ndarray)) and
                       isinstance(X_val, (pd.DataFrame, np.ndarray)) and
                       isinstance(y_train, (pd.Series, np.ndarray,
                                            pd.DataFrame, NoneType)) and
                       isinstance(y_val, (pd.Series, np.ndarray,
                                          pd.DataFrame, NoneType)) and
                       ((y_val is None) if (y_train is None)
                        else (y_val is not None)))
            except AssertionError:
                self._logger.log_and_raise_error(input_err)
            # the cost is evaluated with a cross-validation function
            # that accepts an array and a cv object with
            # indices of the fold splits.
            # Here we create a trivial cv object
            # with a single validation split.
            train_inds = list(range(len(X_train)))
            val_inds = list(range(len(X_train),
                                  len(X_train) + len(X_val)))
            self._cv = [(train_inds, val_inds)]
            self._X = np.concatenate([X_train, X_val])
            self._y = None if y_train is None\
                else np.concatenate([y_train, y_val])
        else:
            try:
                assert(isinstance(X_train, (pd.DataFrame, np.ndarray)) and
                       isinstance(y_train, (pd.Series, np.ndarray,
                                            pd.DataFrame, NoneType)) and
                       (X_val is None) and (y_val is None))
            except AssertionError:
                self._logger.log_and_raise_error(input_err)
            self._cv = cv
            self._X = X_train
            self._y = y_train
        self._logger.info("Attached data")
        self._data_attached = True
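
    # Note: attach_data(X_train=..., y_train=..., X_val=..., y_val=...)
    # concatenates train and validation data into one array with a single
    # explicit train/validation split, while attach_data(X_train=...,
    # y_train=..., cv=5) requests a random 5-fold cross-validation
    # in _evaluate.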

    def _evaluate(self, pipeline: Pipeline) -> dict:
        '''
        This method is called in _objective.
        Calculates the cost on the attached data.
        This method can be overridden when the cost
        needs to be calculated differently,
        for example with a tensorflow model.
        :param Pipeline pipeline: machine learning pipeline
            that will be evaluated with cross-validation
        :output: dictionary with the aggregated
            cross-validation score and
            the score variance.
        '''
        if not self._cross_validator_attached:
            self._cross_validator = sklearn_cross_validator
        scores = self._cross_validator(
            estimator=pipeline,
            X=self._X,
            y=self._y,
            cv=self._cv or 5,
            scoring=make_scorer(self._cost_func),
            error_score=np.nan)
        return {'value': self._cross_val_averaging_func(scores['test_score']),
                'variance': np.var(scores['test_score'])}

    def _objective(self, space_element: dict) -> dict:
        '''
        This method is called in search_for_best_pipeline
        inside the hyperopt fmin method.
        Uses the _evaluate method.
        It must take a space element as input
        and produce as output a dictionary
        with the two obligatory values loss and status
        (STATUS_OK or STATUS_FAIL). Other
        values in the output are optional and can be
        accessed later through the trials object.
        :Warning: fmin minimizes the loss;
        when _evaluate returns a value that is to be maximized,
        it must be multiplied by -1 to obtain the loss.
        :param dict space_element: must contain keys
            name (the name of the pipeline),
            pipeline (Pipeline object),
            params (dict of pipeline params)
        :output: dictionary with keys
            loss (minimized value),
            status (STATUS_OK or STATUS_FAIL,
            understood by hyperopt),
            score (equal to loss or -loss),
            score_variance,
            timestamp (end of execution),
            train_time (execution time)
        '''
        try:
            assert(isinstance(space_element, dict) and
                   set(['name', 'pipeline', 'params']) <=
                   space_element.keys())
            assert(isinstance(space_element['name'], str) and
                   isinstance(space_element['pipeline'], Pipeline) and
                   isinstance(space_element['params'], dict))
        except AssertionError:
            err = "Space elements are of the wrong form"
            self._logger.log_and_raise_error(err)
        start_time = time.time()
        if not self._data_attached:
            err = ("Data must be attached in order "
                   "to perform the best-pipeline search")
            self._logger.log_and_raise_error(err)
        self._run_number += 1
        pipeline = space_element['pipeline']
        params = space_element['params']
        pipeline.set_params(**params)
        self._logger.info(("Run number {0}: "
                           "current best score is {1}: "
                           "training pipeline {2} "
                           "with parameters: {3}. ").format(
                               self._run_number,
                               self._best_score,
                               space_element['name'],
                               params))
        try:
            result = self._evaluate(pipeline)
            assert(not np.isnan(result["value"]))
            if self._run_number % self._backup_trials_freq == 0:
                self._backup_trials()
            # NaN-safe comparison: NaN != NaN, so the first condition
            # holds when no best score has been recorded yet.
            if (self._best_score != self._best_score) or\
               self._score_factor*result["value"] <\
               self._score_factor*self._best_score:
                self._logger.info("Score got better, new best score is: {}"
                                  .format(result["value"]))
                self._best_score = result['value']
            end_time = time.time()
            return {'loss': self._score_factor * result["value"],
                    'status': STATUS_OK,
                    'score': result["value"],
                    'score_variance': result["variance"],
                    'timestamp': datetime.datetime.today(),
                    'train_time': end_time - start_time}
        except Exception as e:
            self._logger.warning("Trial failed with error {}".format(e))
            return {'loss': np.nan,
                    'status': STATUS_FAIL,
                    'score': np.nan,
                    'score_variance': np.nan,
                    'timestamp': datetime.datetime.today(),
                    'train_time': np.nan}

    @abstractmethod
    def run_trials(self):
        """
        Runs the pipeline search and maintains the trials object.
        """
        pass

    @abstractproperty
    def best_trial(self) -> dict:
        """
        Best trial so far, as stored in the trials object.
        """
        pass

    @abstractproperty
    def best_trial_score(self) -> float:
        """
        Score of the best trial so far.
        """
        pass

    @abstractproperty
    def best_trial_score_variance(self) -> float:
        """
        Cross-validation score variance of the best trial so far.
        """
        pass

    @abstractproperty
    def best_trial_pipeline(self) -> Pipeline:
        """
        Pipeline of the best trial so far.
        """
        pass
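

# What follows is a minimal sketch (illustration only) of how a concrete
# child class could implement the abstract methods for an exhaustive,
# gridsearch-style strategy over a list-of-dicts space. The real children
# mentioned in the module docstring (hyperopt and custom gridsearch) are
# defined elsewhere; the class names below, the _SimpleTrials stand-in for
# the hyperopt Trials object, and the toy iris example are all assumptions,
# not part of this module's API.
if __name__ == "__main__":
    from sklearn.datasets import load_iris
    from sklearn.linear_model import LogisticRegression
    from sklearn.metrics import accuracy_score

    class _SimpleTrials:
        """Lightweight stand-in for a hyperopt Trials object."""
        def __init__(self):
            self.trials = []

    class GridSearchPipelineSelector(PipelineSelector):
        """Evaluates every element of the attached space exactly once."""
        def run_trials(self):
            self._trials = self._trials or _SimpleTrials()
            for space_element in self._space:
                result = self._objective(space_element)
                result['space_element'] = space_element
                self._trials.trials.append(result)
            self._backup_trials()

        @property
        def best_trial(self) -> dict:
            ok_trials = [t for t in self._trials.trials
                         if t['status'] == STATUS_OK]
            return min(ok_trials, key=lambda t: t['loss'])

        @property
        def best_trial_score(self) -> float:
            return self.best_trial['score']

        @property
        def best_trial_score_variance(self) -> float:
            return self.best_trial['score_variance']

        @property
        def best_trial_pipeline(self) -> Pipeline:
            return self.best_trial['space_element']['pipeline']

    # toy usage: maximize cross-validated accuracy over two candidates
    X, y = load_iris(return_X_y=True)
    selector = GridSearchPipelineSelector(cost_func=accuracy_score,
                                          greater_is_better=True,
                                          trials_path="trials.pkl")
    selector.attach_space(space=[
        {'name': 'logreg',
         'pipeline': Pipeline([('clf', LogisticRegression(max_iter=500))]),
         'params': {'clf__C': c}}
        for c in (0.1, 1.0)])
    selector.attach_data(X_train=X, y_train=y, cv=3)
    selector.run_trials()
    print("best score:", selector.best_trial_score)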