PipelineSelector.py 28 KB

  1. #!/usr/bin/env python3
  2. # -*- coding: utf-8 -*-
  3. """
  4. Created on Wed Sep 30 14:23:23 2020
  5. @author: tanya
@description: an abstract class for selecting a machine learning
pipeline from a space (deterministic or random) of parameter distributions
over multiple pipelines.
The selection is designed so that a Trials object is
maintained during the tuning process, from which one can retrieve
the best pipeline so far
as well as the entire tuning history if needed.
The methods configure_cross_validation and configer_summary_saving
make it possible to use a custom cross-validation method and to
save the current best result to a file or database during training.
Child classes: hyperopt and custom gridsearch.
  17. """
  18. import pickle
  19. import os
  20. import sys
  21. import time
  22. import datetime
  23. import numpy as np
  24. import pandas as pd
  25. from copy import deepcopy
  26. from abc import ABC, abstractmethod, abstractproperty
  27. from typing import Callable, Optional, TypedDict,\
  28. Literal, Dict, Iterable, List, Tuple, Union
  29. import functools
  30. from sklearn.pipeline import Pipeline
  31. from sklearn.model_selection import cross_validate as sklearn_cross_validation
  32. from sklearn.metrics import make_scorer
  33. from hyperopt import STATUS_OK, STATUS_FAIL
  34. from cdplib.log import Log
  35. from cdplib.utils import ExceptionsHandler
  36. from cdplib.utils import LoadingUtils
  37. from cdplib.ml_validation import CVComposer
  38. sys.path.append(os.getcwd())
  39. class SpaceElementType(TypedDict):
  40. name: str
  41. pipeline: Pipeline
  42. params: dict
  43. class PipelineSelector(ABC):
  44. """
    An abstract class for selecting a machine learning
    pipeline from a space (deterministic or random) of parameter
    distributions over multiple pipelines.
    The selection is designed so that a Trials object is
    maintained during the tuning process, from which one can retrieve
    the best pipeline so far as well as the entire tuning history
    if needed.
    The methods configure_cross_validation and configer_summary_saving
    make it possible to use a custom cross-validation method and to
    save the current best result to a file or database during training.
    Child classes: hyperopt and custom gridsearch.
  56. """
  57. def __init__(self,
  58. cost_func: Union[Callable, str],
  59. greater_is_better: bool,
  60. trials_path: str,
  61. backup_trials_freq: Optional[int] = None,
  62. cross_val_averaging_func: Callable = np.mean,
  63. additional_metrics: Optional[Dict[str, Callable]] = None,
  64. additional_averaging_funcs:
  65. Optional[Dict[str, Callable]] = None,
  66. strategy_name: Optional[str] = None,
  67. stdout_log_level: Literal["INFO", "WARNING", "ERROR"]
  68. = "INFO"):
  69. """
  70. :param Callable cost_func: function to minimize or maximize
  71. over the elements of a given (pipeline/hyperparameter) space
  72. :param bool greater_is_better: when True
  73. cost_func is maximized, else minimized.
  74. :param str trials_path: path at which the trials object is saved
  75. in binary format. From the trials object we can
  76. select information about the obtained scores, score variations,
  77. and pipelines, and parameters tried out so far. If a trials object
  78. already exists at the given path, it is loaded and the
  79. search is continued, else, the search is started from scratch.
  80. :param backup_trials_freq: frequecy in interations (trials)
  81. of saving the trials object at the trials_path.
  82. if None, the trials object is backed up avery time
  83. the score improves.
  84. :param Callable cross_val_averaging_func: Function to aggregate
  85. the cross-validation scores of the cost_func.
  86. Example different from the mean: mean - c*var.
  87. :param additional_metics: dict of additional metrics to keep track of
  88. in the trials of the form {"metric_name": metric}.
  89. :param additional_averaging_funcs: functions used to aggregate
  90. the output of the cross_validate function.
  91. The output always contains the scores of the cost_func,
  92. additional_metrics (if it is not empty),
  93. but it can also contain additional information
  94. (like probability threshold for example)
  95. if different from cross_val_averaging_func.
  96. Of the form {"metric_name": averaging_func}
  97. Remark:
  98. :param str strategy_name:
  99. a strategy is defined by the data set (columns/features and rows),
  100. cv object, cost function.
  101. When the strategy changes, one must start with new trials.
  102. :param str stdout_log_level: can be INFO, WARNING, ERROR
  103. """
  104. self._logger = Log("PipelineSelector: ",
  105. stdout_log_level=stdout_log_level)
  106. try:
  107. ExceptionsHandler(self._logger)\
  108. .assert_is_directory(path=trials_path)
  109. self.attached_space = False
  110. self.attached_data = False
  111. self.configured_cross_validation = False
  112. self.configured_summary_saving = False
  113. self._cost_func = cost_func
  114. # score factor is 1 when cost_func is minimized,
  115. # -1 when cost func is maximized
  116. self._score_factor = (not greater_is_better) - greater_is_better
  117. self.trials_path = trials_path
  118. self._backup_trials_freq = backup_trials_freq
  119. self._strategy_name = strategy_name
  120. self._data_path = None
  121. self._cv_path = None
  122. self._X = None
  123. self._y = None
  124. self._cv = None
  125. self._space = None
  126. # if cross-valition is not configured,
  127. # sklearn cross-validation method is taken by default
  128. self._cross_validation = sklearn_cross_validation
  129. # if a trials object already exists at the given path,
  130. # it is loaded and the search is continued. Else,
  131. # the search is started from the beginning.
  132. if os.path.isfile(self.trials_path):
  133. with open(self.trials_path, "rb") as f:
  134. self._trials = pickle.load(f)
  135. self._start_iteration = self.number_of_trials
  136. self.best_score = self.best_trial_score
  137. self._logger.info(("Loaded an existing trials object"
  138. "Consisting of {} trials")
  139. .format(self._start_iteration))
  140. else:
  141. self._logger.warning(("No existing trials object was found, "
  142. "Starting from scratch."))
  143. self._trials = None
  144. self._start_iteration = 0
  145. self.best_score = np.nan
  146. # keeping track of the current search iteration
  147. self._iteration = self._start_iteration
  148. self._score_improved = False
  149. self.start_tuning_time = datetime.datetime.today()
  150. self.total_tuning_time = None
  151. self.finished_tuning = False
  152. except Exception as e:
  153. err = ("Failed to initialize the class. "
  154. "Exit with error: {}".format(e))
  155. self._logger.log_and_raise_error(err)
  156. def _backup_trials(self) -> None:
  157. '''
  158. Pickles (Saves) the trials object in binary format.
  159. '''
  160. try:
  161. with open(self.trials_path, "wb") as f:
  162. pickle.dump(self._trials, f)
  163. except Exception as e:
  164. err = "Could not backup trials. Exit with error: {}".format(e)
  165. self._logger.log_and_raise_error(err)
  166. def configure_cross_validation(self,
  167. cross_validation: Callable,
  168. kwargs: dict = None) -> None:
  169. """
  170. Method for attaching a custom cross-validation function
  171. :param cross_validation: a function that has the same
  172. signature as sklearn.model_selection.cross_validate
  173. """
  174. try:
  175. kwargs = kwargs or {}
  176. self._cross_validation = functools.partial(
  177. self._cross_validation, **kwargs)
  178. self.configured_cross_validation = True
  179. self._logger.info("Configured cross validation")
  180. except Exception as e:
  181. err = ("Failed to configure cross-validation. "
  182. "Exit with error: {}".format(e))
  183. self._logger.log_and_raise_error(err)
  184. def configure_cross_validation_from_module(self,
  185. module_path: str,
  186. name: str) -> None:
  187. """
  188. Attaches a cross-validation funciton defined in
  189. a different python model. This function must have
  190. the same signature as sklearn.model_seclection.cross_validate
  191. :param str module_path: path to python module
  192. where the cross_validation function is defined.
  193. :param str name: name of the cross validation function
  194. loaded froma python module.
  195. """
  196. try:
  197. self._cross_validation = \
  198. LoadingUtils().load_from_module(
  199. module_path=module_path, name=name)
  200. self.configured_cross_validation = True
  201. self._logger.info("Configured cross validation")
  202. except Exception as e:
  203. err = ("Failed to load cross-validation from module. "
  204. "Exit with error: {}".format(e))
  205. self._logger.log_and_raise_error(err)
  206. def attach_space(self, space) -> None:
  207. """
  208. Method for attaching the pipeline/hyperparameter space
  209. over which the score_func is optimized.
  210. :param space: space where
  211. the search is performed. A space might be either
  212. a list of dictionaries or a hyperopt space object
  213. the elements of which are dictionaries with keys:
  214. name, pipeline, params
  215. """
  216. try:
  217. self._space = space
  218. self.attached_space = True
  219. self._logger.info("Attached parameter distribution space")
  220. except Exception as e:
  221. err = ("Failed to attach space. "
  222. "Exit with error: {}".format(e))
  223. self._logger.log_and_raise_error(err)
  224. def attach_space_from_module(self, module_path: str, name: str) -> None:
  225. """
  226. Attaches a space defined in a different python module.
  227. :param str module_path: path to python module
  228. where the space is defined.
  229. :param str name: name of the space loaded from
  230. a python module.
  231. """
  232. try:
  233. self._space = LoadingUtils().load_from_module(
  234. module_path=module_path, name=name)
  235. self.attached_space = True
  236. self._logger.info("Attached parameter distribution space")
  237. except Exception as e:
  238. err = ("Failed to attach space from module. "
  239. "Exit with error {}".format(e))
  240. self._logger.loger_and_raise_error(err)
  241. def attach_data(self, X_train: Union[pd.DataFrame, np.ndarray],
  242. y_train: Optional[pd.DataFrame, pd.Series, np.ndarray]
  243. = None,
  244. X_val: Optional[pd.DataFrame, np.ndarray]
  245. = None,
  246. y_val: Optional[pd.DataFrame, pd.Series, np.ndarray]
  247. = None,
  248. cv: Optional[Iterable[Tuple[List[int], List[int]]]]
  249. = None) -> None:
  250. '''
  251. :param array X_train: data on which
  252. machine learning pipelines are trained
  253. :param array y_train: optional, vector with targets,
  254. (None in case of unsupervided learning)
  255. :param array X_val: optional, validation data.
  256. When not provided, cross-validated value
  257. of the cost_func is calculated.
  258. :param array y_val: optional, validation targets
  259. :param list cv: iterabe of tuples containing
  260. train and validation indices or an integer representing
  261. the number of folds for a random split of data
  262. during cross-validation
  263. example: [([0,1,2], [3,4]), ([1,2,3], [4,5])]
  264. '''
  265. try:
  266. assert((cv is None) == (X_val is not None)),\
  267. "Either cv or X_val must be provided"
  268. if cv is None:
  269. assert((y_val is None) == (y_train is None)),\
  270. "y_train and y_val must be simultanious"
  271. # Here we create a trivial cv object
  272. # with one validation split.
  273. cv = CVComposer.dummy_cv()
  274. train_inds = list(range(len(X_train)))
  275. val_inds = list(range(len(X_train),
  276. len(X_train) + len(X_val)))
  277. self._cv = [(train_inds, val_inds)]
  278. self._X = np.concatenate([X_train, X_val])
  279. self._y = None if y_train is None\
  280. else np.concatenate([y_train, y_val])
  281. else:
  282. self._cv = cv
  283. self._X = X_train
  284. self._y = y_train
  285. self.attached_data = True
  286. self._logger.info("Attached data")
  287. except Exception as e:
  288. err = ("Failed to attach data. "
  289. "Exit with error: {}".format(e))
  290. self._logger.log_and_raise_error(err)
  291. def attach_data_from_hdf5(self,
  292. data_hdf5_store_path: str,
  293. cv_pickle_path: str = None) -> None:
  294. """
  295. Method for attaching data from a hdf5 store
  296. and a cv object from a pickled file.
  297. The hdf5 store is a binary file,
  298. after loading it, it is a dictionary with keys
  299. X_train (y_train, X_val, y_val).
  300. The cv is loaded from a pickle file.
  301. The reason to separate the data
  302. store from the cv store, is the hdf5 is optimized to
  303. store large dataframes (especially with simple types) and
  304. a a small list of lists like a cv-object is better
  305. to be stored as a pickle file.
  306. :param str data_hdf5_store_path: path to the hdf5 store
  307. with train and validation data
  308. :param str cv_pickle_path: path to the pickle file with
  309. the cv data
  310. """
  311. try:
  312. assert(os.path.isfile(data_hdf5_store_path)),\
  313. "Parameter hdf5_store_path is not a file"
  314. store = pd.HDFStore(data_hdf5_store_path)
  315. self._data_path = data_hdf5_store_path
  316. data_input = {key: store["key"] if key in store else None
  317. for key in ["X_train", "y_train", "X_val", "y_val"]}
  318. if cv_pickle_path is not None:
  319. assert(os.path.isfile(cv_pickle_path)),\
  320. "Parameter cv_pickle_path is not a file"
  321. data_input["cv"] = pickle.load(open(cv_pickle_path, "rb"))
  322. self._cv_path = cv_pickle_path
  323. else:
  324. data_input["cv"] = None
  325. self.attach_data(**data_input)
  326. store.close()
  327. except Exception as e:
  328. err = "Failed to attach data. Exit with error: {}".format(e)
  329. self._logger.log_and_raise_error(err)
  330. @property
  331. def default_summary(self) -> dict:
  332. """
  333. Default summary of the strategy.
  334. Every the _objective function is called
  335. the current score and the information
  336. about the tested space element is added to the
  337. summary and it is saved to the Trials.
  338. If summary saving is configured it is also
  339. saved to a file, or a database when the score improves.
  340. """
  341. summary = {}
  342. if self._strategy_name is not None:
  343. summary["strategy_name"] = self._strategy_name
  344. if isinstance(self._cost_func, str):
  345. summary["cost_func"] = self._cost_func
  346. elif hasattr(self._cost_func, "__name__"):
  347. summary["cost_func"] = self._cost_func.__name__
  348. summary["trials_path"] = self.trials_path
  349. if self._data_path is not None:
  350. summary["data_path"] = self._data_path
  351. if self._cv_path is not None:
  352. summary["cv_path"] = self._cv_path
  353. summary["start_tuning_time"] = self.start_tuning_time
  354. summary["iteration"] = self._iteration
  355. return summary
  356. def configer_summary_saving(self,
  357. save_method: Callable
  358. = functools.partial(
  359. pd.DataFrame.to_excel,
  360. **{"path_or_buf": "result.csv"}),
  361. kwargs: Optional[dict] = None) -> None:
  362. """
  363. When the score calculated by _objective function improves,
  364. the default summary is updated with information about the
  365. current score and pipeline/hyperparameters
  366. and can be saved to a file or database, depending
  367. on the configured save_method.
  368. :param Callable save_method: method for saving the result
  369. of the pipeline selection. The method must accept
  370. a pandas DataFrame as argument.
  371. By default, saving to an excel file.
  372. Examples:
  373. functools.partial(pd.DataFrame.to_csv,
  374. **{"path_or_buf": <PATH>})
  375. functools.partial(np.savetxt, **{"fname": <PATH>})
  376. functools.partial(SQLHandler(<URI>).append_to_table,
  377. **{"tablename": <NAME>})
  378. functools.partial(MongodbHandler(<URI>).insert_data_into_collection,
  379. **{"collection_name": <NAME>})
  380. using functools can be avoided by providing the kwarg argument
  381. :param dict kwargs: a dictionary with keyword arguments
  382. (like tablename) to provide to the save_method
  383. """
  384. try:
  385. kwargs = kwargs or {}
  386. self._save_method = functools.partial(save_method, **kwargs)
  387. self.configured_summary_saving = True
  388. self._logger.info("Configured summary saving")
  389. except Exception as e:
  390. err = ("Failed to configure the summary saving. "
  391. "Exit with error {}".format(e))
  392. self._logger.log_and_raise_error(err)
  393. def _save_summary(self, summary: dict) -> None:
  394. """
  395. When the score calculated by _objective function improves,
  396. the default summary is updated with information about the
  397. current score and pipeline/hyperparameters
  398. and can be saved to a file or database, depending
  399. on the configured save_method.
  400. """
  401. try:
  402. assert(self.configured_summary_saving),\
  403. "Result saving must be configured first"
  404. self._save_method(summary)
  405. except Exception as e:
  406. err = ("Could not configure summary saving. "
  407. "Exit with error: {}".format(e))
  408. self._logger.log_and_raise_error(err)
  409. def _evaluate(self, pipeline: Pipeline) -> Union[Dict[str, float], None]:
  410. """
  411. Calculates the averaged cross-validated score and score variance,
  412. as well as the averaged values and variances of the additional metrics.
  413. This method is called in the _objective function that is
  414. passed to the hyperopt optimizer.
  415. This function can be overriden, when the cost
  416. needs to be calculated differently,
  417. for example with a tensorflow model.
  418. :param Pipeline pipeline: machine learning pipeline
  419. that will be evaluated with cross-validation
  420. :return: dictionary with the aggregated
  421. cross-validation scores and
  422. the score variances for the scores in the output
  423. of the cross-validation function.
  424. form of the output:
  425. {"score": 10, #score used in optimization,
  426. "score_variance": 0.5
  427. "additional_metric1": 5,
  428. "additional_metric1_variance": 7}
  429. a custom cross-validation function can also include for
  430. example probability threshold for each fold, then
  431. the output of this function will include the average
  432. value and the variance of the probability threshold
  433. over the folds.
  434. """
  435. try:
  436. scoring = {"score": make_scorer(self.cost_func)}
  437. scoring.update({metric_name: make_scorer(metric)
  438. for metric_name, metric
  439. in self._additional_metrics.items()})
  440. scores = self._cross_validation(
  441. estimator=pipeline,
  442. X=self._X,
  443. y=self._y,
  444. cv=self._cv,
  445. scoring=self._scoring,
  446. error_score=np.nan)
  447. averaging_funcs = {
  448. metric_name: self._additional_averaging_funcs[metric_name]
  449. if metric_name in self._additional_averaging_funcs
  450. else self._cross_val_averaging_func
  451. for metric_name in scores}
  452. scores_average = {
  453. metric_name.replace("test_", ""):
  454. averaging_funcs[metric_name](scores[metric_name])
  455. for metric_name in scores
  456. if metric_name.startswith("test")}
  457. scores_variance = {
  458. metric_name.replace("test_", "") + "_variance":
  459. np.var(scores[metric_name])
  460. for metric_name in scores
  461. if metric_name.startswith("test")}
  462. return {**scores_average, **scores_variance}
  463. except Exception as e:
  464. err = "Failed to evaluate pipeline. Exit with error: {}".format(e)
  465. self._logger.log_and_raise_error(err)
  466. def _objective(self, space_element: SpaceElementType) -> dict:
  467. '''
  468. This method is called in run_trials method
  469. that is using the hyperopt fmin opmizer.
  470. Uses _evaluate method.
  471. It must take as input a space element
  472. and produce an output in the form of dictionary
  473. with 2 obligatory values loss and status
  474. (STATUS_OK or STATUS_FAIL). Other
  475. values in the output are optional and can be
  476. accessed later through the trials object.
  477. :Warning: fmin minimizes the loss,
  478. when _evaluate returns a value to be maximized,
  479. it is multiplied by -1 to obtain loss.
  480. :param SpaceElementType space_element: element
  481. of the space over which the optimization is done
  482. :output: dictionary with keys
  483. loss (minimized value),
  484. status with values STATUS_OK or STATUS_FAIL
  485. uderstood by hyperopt,
  486. score (equal to loss or -loss),
  487. score_variance,
  488. timestamp (end of execution),
  489. train_time: execution time
  490. and other keys given in self.default_summary
  491. '''
  492. try:
  493. start_time = time.time()
  494. assert(self.attached_data),\
  495. ("Data must be attached in order "
  496. "in order to effectuate the best"
  497. "pipeline search")
  498. summary = deepcopy(self.default_summary)
  499. # backup the current trials if the score improved
  500. # at previous iteration or every ith iteration
  501. # if the backup_trials_freq is set
  502. backup_cond = ((self._backup_trials_freq is not None) and
  503. ((self._iteration - self._start_iteration - 1) %
  504. self._backup_trials_freq == 0)) or\
  505. self._score_improved
  506. if backup_cond:
  507. self._backup_trials()
  508. self._score_improved = False
  509. pipeline = space_element['pipeline']
  510. params = space_element['params']
  511. pipeline.set_params(**params)
  512. self._logger.info(("Iteration {0}: "
  513. "Current score is {1}: "
  514. "Training pipeline {2} "
  515. "with parameters: {3}. ").format(
  516. self._iteration,
  517. self.best_score,
  518. space_element['name'],
  519. params))
  520. result = self._evaluate(pipeline)
  521. summary.update(result)
  522. end_time = time.time()
  523. summary['status'] = STATUS_OK
  524. summary.update(result)
  525. summary['loss'] = self._score_factor * summary['score']
  526. summary['timestamp'] = datetime.datetime.today()
  527. summary['train_time'] = end_time - start_time
  528. self._iteration += 1
  529. self._score_improved = (self.best_score != self.best_score) or\
  530. (self._score_factor*result["score"] <
  531. self._score_factor*self.best_score)
  532. if self._score_improved:
  533. self._logger.info("Score improved, new best score is: {}"
  534. .format(result["score"]))
  535. self.best_score = result['score']
  536. if self.configured_summary_saving:
  537. self._save_summary(summary)
  538. except Exception as e:
  539. self._logger.warning("Trial failed with error {}".format(e))
  540. summary = {}
  541. summary['status'] = STATUS_FAIL
  542. summary['timestamp'] = datetime.datetime.today()
  543. summary['error'] = e
  544. for key in ['loss', 'score', 'score_variance', 'train_time']:
  545. summary[key] = np.nan
  546. return summary
  547. @abstractmethod
  548. def run_trials(self):
  549. """
  550. Method that runs the hyperparameter tuning over possibly multiple
  551. pipeline types specified in self.space
  552. When run_trials method is finished the flag self.finished_tuning
  553. should be set to True and the methods self._backup_trials and
  554. optionally self._save_result should be called.
  555. """
  556. pass
  557. @abstractproperty
  558. def number_of_trials(self) -> int:
  559. """
  560. Number of trials already run in the current trials object
  561. """
  562. pass
  563. @abstractproperty
  564. def best_trial(self) -> dict:
  565. """
  566. Best trial sor far.
  567. Should contain the status, pipeline,
  568. hyperparameters, and the score (loss).
  569. Other information is otional and is defined
  570. by self.default_summary
  571. """
  572. pass
  573. @abstractproperty
  574. def best_trial_score(self) -> float:
  575. """
  576. Score of the best pipeline with the best hyperparameters
  577. """
  578. pass
  579. @abstractproperty
  580. def best_trial_score_variance(self) -> float:
  581. """
  582. Variance of the cross-validation score of the best pipeline
  583. """
  584. pass
  585. @abstractproperty
  586. def best_trial_pipeline(self) -> Pipeline:
  587. """
  588. Best pipeline with best hyperparameters
  589. """
  590. pass
  591. @abstractmethod
  592. def get_n_best_trial_pipelines(self, n: int) -> list:
  593. """
  594. N best pipelines with corresponding
  595. best hyperparameters
  596. """
  597. pass
  598. @abstractmethod
  599. def get_n_best_trial_pipelines_of_each_type(self, n_int) -> list:
  600. """
  601. If the hyperparameter search is done over multiple
  602. pipelines, then returns n different pipeline-types
  603. with corresponding hyperparameters
  604. """
  605. pass
  606. @abstractmethod
  607. def trials_to_excel(self, path: str) -> None:
  608. """
  609. Trials object in the shape of table written to excel,
  610. should contain the iteration, pipeline (as str),
  611. hyperparamters (as str), self.best_result (see self._objective method)
  612. as well as additional information defined by self.default_summary
  613. """
  614. pass