HyperoptPipelineSelector.py 18 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535
  1. #!/usr/bin/env python3
  2. # -*- coding: utf-8 -*-
  3. """
  4. Created on Tue Oct 6 15:04:25 2020
  5. @author: tanya
  6. @description:a class for selecting a machine learning
  7. pipeline from a deterministic space of parameter distributions
  8. over multiple pipelines.
  9. The selection is designed in such a way that a Trials object is being
  10. maintained during the tuning process from which one can retrieve
  11. the best pipeline so far as well as the entire tuning history
  12. if needed.
  13. """
  14. import os
  15. import sys
  16. import pickle
  17. from copy import deepcopy
  18. import datetime
  19. import pandas as pd
  20. import numpy as np
  21. from sklearn.pipeline import Pipeline
  22. from hyperopt import fmin, tpe, rand, Trials, space_eval
  23. from cdplib.pipeline_selector.PipelineSelector import PipelineSelector,\
  24. SpaceElementType
  25. # from typing import Callable, Optional, Literal, Dict, Union, List
  26. if (sys.version_info.major == 3) & (sys.version_info.minor >= 8):
  27. from typing import Callable, Optional,\
  28. Literal, Dict, List, Union
  29. else:
  30. # from typing_extensions import *
  31. # from typing_extensions import Callable, Optional,\
  32. # Literal
  33. from cdplib.log import Log
  34. class HyperoptPipelineSelector(PipelineSelector):
  35. """
  36. Use this class to perform a search
  37. for a machine learning pipeline in a given parameter space.
  38. The parameter space can include multiple types of Pipelines
  39. (SVM, XGBOOST, random forest, etc),
  40. as well as parameter distributions for each pipeline parameter.
  41. See example in main for the expected space structure.
  42. The search can be performed either randomly
  43. or with a tree-based algorithm. (Other methods are currently
  44. developped by hyperopt creators).
  45. Attribute trials is responsible for book-keeping parameter
  46. combinations that have already been tried out. This attribute
  47. is saved to a binary file every n minutes as well as every time
  48. a better pipeline was found.
  49. """
  50. def __init__(self,
  51. # cost_func: Union[Callable, str],
  52. cost_func,
  53. greater_is_better: bool,
  54. trials_path: str,
  55. # backup_trials_freq: Optional[int] = None,
  56. backup_trials_freq = None,
  57. cross_validation_needs_scorer: bool = True,
  58. cross_val_averaging_func: Callable = np.mean,
  59. # additional_metrics: Optional[Dict[str, Callable]] = None,
  60. additional_metrics = None,
  61. # strategy_name: Optional[str] = None,
  62. strategy_name = None,
  63. # stdout_log_level: Literal["INFO", "WARNING", "ERROR"]
  64. # = "INFO")
  65. stdout_log_level = "INFO"):
  66. """
  67. param Callable cost_func: function to minimize or maximize
  68. over the elements of a given (pipeline/hyperparameter) space
  69. :param bool greater_is_better: when True
  70. cost_func is maximized, else minimized.
  71. :param str trials_path: path at which the trials object is saved
  72. in binary format. From the trials object we can
  73. select information about the obtained scores, score variations,
  74. and pipelines, and parameters tried out so far. If a trials object
  75. already exists at the given path, it is loaded and the
  76. search is continued, else, the search is started from scratch.
  77. :param backup_trials_freq: frequecy in interations (trials)
  78. of saving the trials object at the trials_path.
  79. if None, the trials object is backed up avery time
  80. the score improves.
  81. :param Callable cross_val_averaging_func: Function to aggregate
  82. the cross-validation scores.
  83. Example different from the mean: mean - c*var.
  84. :param additional_metics: dict of additional metrics to save
  85. of the form {"metric_name": metric} where metric is a Callable.
  86. :param str strategy_name:
  87. a strategy is defined by the data set (columns/features and rows),
  88. cv object, cost function.
  89. When the strategy changes, one must start with new trials.
  90. :param str stdout_log_level: can be INFO, WARNING, ERROR
  91. """
  92. try:
  93. super().__init__(cost_func=cost_func,
  94. greater_is_better=greater_is_better,
  95. trials_path=trials_path,
  96. backup_trials_freq=backup_trials_freq,
  97. cross_validation_needs_scorer=
  98. cross_validation_needs_scorer,
  99. cross_val_averaging_func=cross_val_averaging_func,
  100. additional_metrics=additional_metrics,
  101. strategy_name=strategy_name,
  102. stdout_log_level=stdout_log_level)
  103. self._logger = Log("HyperoptPipelineSelector: ",
  104. stdout_log_level=stdout_log_level)
  105. self._trials = self._trials or Trials()
  106. except Exception as e:
  107. err = "Failed to intialize. Exit with error: {}".format(e)
  108. self._logger.log_and_raise_error(err)
  109. def run_trials(self,
  110. niter: int,
  111. # algo: Literal[tpe.suggest, rand.suggest] = tpe.suggest)\
  112. algo = tpe.suggest)\
  113. -> None:
  114. '''
  115. Method performing the search of the best pipeline in the given space.
  116. Calls fmin function from the hyperopt library to minimize the output of
  117. _objective.
  118. :params int niter: number of search iterations
  119. :param algo: now can only take supported by the hyperopt library.
  120. For now these are tpe.suggest for a tree-based bayesian search
  121. or rad.suggest for randomized search
  122. '''
  123. try:
  124. self._trials = self._trials or Trials()
  125. self._logger.info(("Starting {0} iterations of search "
  126. "additional to {1} previous"
  127. .format(niter, len(self._trials.trials))))
  128. best_trial = fmin(fn=self._objective,
  129. space=self._space,
  130. algo=algo,
  131. trials=self._trials,
  132. max_evals=len(self._trials.trials) + niter)
  133. self._logger.info(
  134. "Best score is {0} with variance {1}"
  135. .format(
  136. self._trials.best_trial["result"]["score"],
  137. self._trials.best_trial["result"]["score_variance"]))
  138. self._logger.info(("Finished {0} iterations of search.\n"
  139. "Best parameters are:\n {1} ")
  140. .format(niter,
  141. space_eval(self._space, best_trial)))
  142. self.finished_tuning = True
  143. self.total_tuning_time = datetime.datetime.today()\
  144. - self.start_tuning_time
  145. self._backup_trials()
  146. except Exception as e:
  147. err = ("Failed to select best "
  148. "pipeline! Exit with error: {}").format(e)
  149. self._logger.log_and_raise_error(err)
  150. @property
  151. def number_of_trials(self):# -> Union[int, None]:
  152. """
  153. :return: number of trials run so far
  154. with the given Trials object
  155. """
  156. try:
  157. return len(self._trials.trials)
  158. except Exception as e:
  159. err = ("Failed to retrieve the number of trials. "
  160. "Exit with error {}".format(e))
  161. self._logger.log_and_raise_error(err)
  162. def _get_space_element_from_trial(self, trial: dict):#\
  163. # -> Union[Dict[str, SpaceElementType], None]:
  164. """
  165. Hyperopt trials object does not contain the space
  166. elements that result in the corresponding trials.
  167. One has to use the function space_eval from
  168. hyperopt to get the space element.
  169. After retrieving the space element,
  170. parameters of the pipeline are set.
  171. """
  172. try:
  173. trial = deepcopy(trial)
  174. assert(self.attached_space),\
  175. "Hyperparameter space not attached."
  176. space_element = space_eval(self._space,
  177. {k: v[0] for k, v in
  178. trial['misc']['vals'].items()
  179. if len(v) > 0})
  180. pipeline = deepcopy(space_element["pipeline"])
  181. params = deepcopy(space_element["params"])
  182. pipeline.set_params(**params)
  183. space_element["pipeline"] = pipeline
  184. return space_element
  185. except Exception as e:
  186. err = ("Failed to retrieve a space element from a trial. "
  187. "Exit with error: {}".format(e))
  188. self._logger.log_and_raise_error(err)
  189. def _get_space_element_from_index(self, i: int): #\
  190. # -> Union[Dict[str, SpaceElementType], None]:
  191. """
  192. Gets the space element of shape
  193. {"name": NAME, "params": PARAMS, "pipeline": PIPELINE}
  194. from the trial number i.
  195. """
  196. try:
  197. assert(len(self._trials.trials) > i),\
  198. ("Trials object is not long enough "
  199. "to retrieve index {}".format(i))
  200. return self._get_space_element_from_trial(self._trials.trials[i])
  201. except Exception as e:
  202. err = ("Failed to get space element from index. "
  203. "Exit with error {}".format(e))
  204. self._logger.log_and_raise_error(err)
  205. def _get_pipeline_from_index(self, i: int):# -> Union[Pipeline, None]:
  206. """
  207. Gets a pipeline with set parameters from the trial number i
  208. """
  209. try:
  210. space_element = self._get_space_element_from_index(i)
  211. return space_element["pipeline"]
  212. except Exception as e:
  213. err = ("Failed to retrieve pipeline from index. "
  214. "Exit with error: {}".format(e))
  215. self._logger.log_and_raise_error(err)
  216. @property
  217. def best_trial(self):# -> Union[dict, None]:
  218. """
  219. :return: dictionary with the summary of the best trial
  220. and space element (name, pipeline, params)
  221. resulting in the best trial
  222. """
  223. if len(self._trials.trials) == 0:
  224. self._logger.log_and_throw_warning("Trials object is empty")
  225. return {}
  226. else:
  227. try:
  228. best_trial = deepcopy(self._trials.best_trial)
  229. if self.attached_space:
  230. space_element = self._get_space_element_from_trial(
  231. best_trial)
  232. else:
  233. space_element = {}
  234. warn = ("Space is not attached, "
  235. "To included the best pipeline "
  236. "attach the space")
  237. self._logger.log_and_throw_warning(warn)
  238. best_trial = deepcopy(self._trials.best_trial["result"])
  239. best_trial.update(space_element)
  240. return best_trial
  241. except Exception as e:
  242. err = "Failed to retrieve best trial. Exit with error: {}"\
  243. .format(e)
  244. self._logger.log_and_raise_error(err)
  245. @property
  246. def best_trial_score(self):# -> Union[float, None]:
  247. """
  248. """
  249. try:
  250. if len(self.best_trial) > 0:
  251. return self.best_trial["score"]
  252. else:
  253. return np.nan
  254. except Exception as e:
  255. err = ("Failed to retrieve best trial score. "
  256. "Exit with error: {}".format(e))
  257. self._logger.log_and_raise_error(err)
  258. @property
  259. def best_trial_score_variance(self):# -> Union[float, None]:
  260. """
  261. """
  262. try:
  263. if len(self.best_trial) > 0:
  264. return self.best_trial["score_variance"]
  265. else:
  266. return np.nan
  267. except Exception as e:
  268. err = ("Failed to retrieve best trial score variance. "
  269. "Exit with error: {}".format(e))
  270. self._logger.log_and_raise_error(err)
  271. @property
  272. def best_trial_pipeline(self):# -> Union[Pipeline, None]:
  273. """
  274. """
  275. try:
  276. if len(self.best_trial) > 0:
  277. return self.best_trial["pipeline"]
  278. else:
  279. return np.nan
  280. except Exception as e:
  281. err = ("Failed to retrieve best trial pipeline. "
  282. "Exit with error: {}".format(e))
  283. self._logger.log_and_raise_error(err)
  284. def get_n_best_trial(self, n: int)\
  285. -> Union[List[Pipeline], None]:
  286. """
  287. :return: the list of n best trails
  288. """
  289. try:
  290. if len(self._trials.trials) == 0:
  291. return []
  292. else:
  293. n_best_trials = sorted(self._trials.trials,
  294. key=lambda x: x["result"]["score"],
  295. reverse=True)[:n]
  296. return n_best_trials
  297. except Exception as e:
  298. err = ("Failed to retrieve n best pipelines. "
  299. "Exit with error: {}".format(e))
  300. self._logger.log_and_raise_error(err)
  301. def get_n_best_trial_pipelines(self, n: int):#\
  302. # -> Union[List[Pipeline], None]:
  303. """
  304. :return: the list of n best pipelines
  305. documented in trials
  306. """
  307. try:
  308. if len(self._trials.trials) == 0:
  309. return []
  310. else:
  311. n_best_trials = sorted(self._trials.trials,
  312. key=lambda x: x["result"]["score"],
  313. reverse=True)[:n]
  314. return [self._get_space_element_from_trial(trial)["pipeline"]
  315. for trial in n_best_trials]
  316. except Exception as e:
  317. err = ("Failed to retrieve n best pipelines. "
  318. "Exit with error: {}".format(e))
  319. self._logger.log_and_raise_error(err)
  320. def get_n_best_trial_pipelines_of_each_type(self, n: int): #\
  321. # -> Union[Dict[str, List[Pipeline]], None]:
  322. """
  323. :return: a dictiionry where keys are pipeline names,
  324. and values are lists of best pipelines with this name
  325. """
  326. try:
  327. scores = [trial["result"]["score"]
  328. for trial in self._trials.trials]
  329. names = [self._get_space_element_from_trial(trial)["name"]
  330. for trial in self._trials.trials]
  331. return pd.DataFrame({"name": names, "score": scores})\
  332. .sort_values(by=["name", "score"], ascending=False)\
  333. .groupby("name")\
  334. .head(n)\
  335. .reset_index()\
  336. .assign(pipeline=lambda x: x["index"]
  337. .apply(self._get_pipeline_from_index))\
  338. .groupby("name")["pipeline"]\
  339. .apply(lambda x: list(x))\
  340. .to_dict()
  341. except Exception as e:
  342. err = ("Failed to get n best pipelines of each type. "
  343. "Exit with error: {}".format(e))
  344. self._logger.log_and_raise_error(err)
  345. def trials_to_excel(self, path: str = None) -> None:
  346. """
  347. Saves an excel file with pipeline names, scores,
  348. parameters, and timestamps.
  349. """
  350. try:
  351. results = [trial["result"] for trial in self._trials.trials]
  352. space_elements = [self._get_space_element_from_trial(trial)
  353. for trial in self._trials.trials]
  354. pd.DataFrame([{**result, **space_element}
  355. for result, space_element in
  356. zip(results, space_elements)]).to_excel(path)
  357. except Exception as e:
  358. err = ("Failed to write trials to excel. "
  359. "Exit with error: {}".format(e))
  360. self._logger.log_and_raise_error(err)
  361. if __name__ == '__main__':
  362. # elementary example
  363. from sklearn.metrics import roc_auc_score, precision_score
  364. from sklearn.datasets import load_breast_cancer
  365. from cdplib.log import Log
  366. from cdplib.db_handlers import MongodbHandler
  367. from cdplib.hyperopt.space_sample import space
  368. # from cdplib.hyperopt.composed_space_sample import space
  369. trials_path = "hyperopt_trials_TEST.pkl"
  370. additional_metrics = {"precision": precision_score}
  371. strategy_name = "strategy_1"
  372. data_path = "data_TEST.h5"
  373. cv_path = "cv_TEST.pkl"
  374. collection_name = 'TEST_' + strategy_name
  375. logger = Log("HyperoptPipelineSelector__TEST:")
  376. logger.info("Start test")
  377. data_loader = load_breast_cancer()
  378. X = data_loader["data"]
  379. y = data_loader["target"]
  380. pd.DataFrame(X).to_hdf(data_path, key="X_train")
  381. pd.Series(y).to_hdf(data_path, key="y_train")
  382. cv = [(list(range(len(X)//3)), list(range(len(X)//3, len(X)))),
  383. (list(range(2*len(X)//3)), list(range(2*len(X)//3, len(X))))]
  384. pickle.dump(cv, open(cv_path, "wb"))
  385. hs = HyperoptPipelineSelector(cost_func=roc_auc_score,
  386. greater_is_better=True,
  387. trials_path=trials_path,
  388. additional_metrics=additional_metrics,
  389. strategy_name=strategy_name,
  390. stdout_log_level="WARNING")
  391. hs.attach_space(space=space)
  392. hs.attach_data_from_hdf5(data_hdf5_store_path=data_path,
  393. cv_pickle_path=cv_path)
  394. try:
  395. # TODO: this line causes a pytype to throw not-callable error
  396. # works fine with pytype on other class methods.
  397. save_method = MongodbHandler().insert_data_into_collection
  398. save_kwargs = {'collection_name': collection_name}
  399. # save_method = pd.DataFrame.to_excel()
  400. # save_kwargs = {'excel_writer': "TEST.xlsx"}
  401. hs.configer_summary_saving(save_method=save_method,
  402. kwargs=save_kwargs)
  403. logger.info("Configured summary saving in mongo")
  404. except Exception as e:
  405. logger.warning(("Could not configure summary saving in mongo. "
  406. "Exit with error: {}".format(e)))
  407. hs.run_trials(niter=10)
  408. logger.info("Best Trial: {}".format(hs.best_trial))
  409. logger.info("Total tuning time: {}".format(hs.total_tuning_time))
  410. for file in [trials_path, data_path, cv_path]:
  411. os.remove(file)
  412. logger.info("End test")