#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Created on Wed Dec 9 10:27:39 2020

@author: tanya
"""
from itertools import accumulate, repeat, takewhile, chain
from typing import (Iterable, Iterator, List, NewType, Optional, Tuple,
                    Union)

import numpy as np
import pandas as pd

from cdplib.log import Log

  12. CVType = NewType("CVType", Iterable[Tuple[List]])
  13. DataSetType = NewType("DataSetType",
  14. Union[pd.DataFrame, pd.Series, np.ndarray, List])
  15. class CVComposer:
  16. """
  17. Groups methods for composing cv objects
  18. that follow standards from sklearn,
  19. these cv objects can be passed to algorithms like gridsearch, etc
  20. """
  21. def __init__(self):
  22. """
  23. """
  24. self._logger = Log("CVComposer: ")
  25. def dummy_cv(
  26. self,
  27. train_set_size: Union[int, None] = None,
  28. train_index: Union[pd.Series, np.ndarray, None] = None,
  29. test_set_size: Union[int, None] = None,
  30. test_index: DataSetType = None) -> CVType:
  31. """
  32. """
  33. assert((train_index is None) != (train_set_size is None)),\
  34. "Set train_index or train_set_size"
  35. assert((test_index is None) != (test_set_size is None)),\
  36. "Set train_index or train_set_size"
  37. train_index = train_index if (train_index is not None)\
  38. else list(range(train_set_size))
  39. test_index = test_index if (test_index is not None)\
  40. else list(range(train_set_size, train_set_size + test_set_size))
  41. return [(train_index, test_index)]
  42. def dummy_cv_and_concatenated_data_set(
  43. self,
  44. X_train: DataSetType,
  45. X_test: DataSetType,
  46. y_train: Union[DataSetType, None] = None,
  47. y_test: Union[DataSetType, None] = None)\
  48. -> Tuple[DataSetType, DataSetType, CVType]:
  49. """
  50. """
  51. assert((y_test is None) == (y_train is None))
  52. use_index = (isinstance(X_train, pd.DataFrame) and
  53. isinstance(X_test, pd.DataFrame) and
  54. (len(set(X_train.index) and set(X_test.index)) == 0))
  55. if use_index:
  56. cv = self.dummy_cv(train_set_index=X_train.index,
  57. test_set_index=X_test.index)
  58. X = pd.concat([X_train, X_test], ignore_index=False, axis=0)
  59. else:
  60. cv = self.dummy_cv(train_set_size=len(X_train),
  61. test_set_size=len(X_test))
  62. X = np.concatenate([X_train, X_test])
  63. use_target_index = use_index and (
  64. isinstance(y_train, pd.Series) and
  65. isinstance(y_test, pd.Series) and
  66. (X_train.index.equals(y_train.index)) and
  67. (X_test.index.equals(y_test.index)))
  68. if use_target_index:
  69. y = pd.concat([y_train, y_test], ignore_index=False, axis=0)
  70. else:
  71. y = np.concatenate([y_train, y_test]) if (y_train is not None)\
  72. else None
  73. result_to_np = (
  74. (isinstance(X_train, pd.DataFrame) !=
  75. isinstance(X_test, pd.DataFrame)) or
  76. (isinstance(X_train, pd.DataFrame)) and
  77. (len(set(X_train.index) and set(X_test.index)) != 0))
  78. if result_to_np:
  79. self._logger.log_and_throw_warning(
  80. "The concatenated dataframe is converted to numpy")
  81. return cv, X, y
  82. def expanding_cv(self, test_proportion: float,
  83. start_train_proportion: float,
  84. step_proportion: float = None,
  85. expanding_test_size: bool = False,
  86. data_set_size: Union[float, None] = None,
  87. index: Union[pd.Series, np.ndarray, list, None] = None)\
  88. -> Union[Iterable[Tuple[List]], None]:
  89. """
  90. """
  91. try:
  92. assert((index is None) != (data_set_size is None)),\
  93. "Set index or data_set_size"
  94. index = pd.Series(index) if (index is not None)\
  95. else pd.Series(range(data_set_size))
  96. data_set_size = data_set_size or len(index)
  97. start_train_size = int(start_train_proportion * data_set_size)
  98. step_size = int(step_proportion * data_set_size)
  99. test_size = int(test_proportion * data_set_size)
  100. train_inds_set = (list(range(train_size))
  101. for train_size in
  102. takewhile(
  103. lambda x: x <= data_set_size - test_size,
  104. accumulate(repeat(start_train_size),
  105. lambda x, _: x + step_size)))
  106. for train_inds in train_inds_set:
  107. if expanding_test_size:
  108. yield (index[train_inds],
  109. index[train_inds[-1] + 1:
  110. train_inds[-1] + 1
  111. + int(test_proportion*len(train_inds))])
  112. else:
  113. yield (index[train_inds],
  114. index[train_inds[-1] + 1:
  115. train_inds[-1] + 1 + test_size])
  116. except Exception as e:
  117. self._logger.log_and_raise_error(("Failed to make expanding cv. "
  118. "Exit with error: {}".format(e)))
  119. def sliding_window_cv(
  120. self,
  121. test_proportion: float,
  122. train_proportion: float,
  123. step_proportion: float = None,
  124. data_set_size: Union[float, None] = None,
  125. index: Union[pd.Series, np.ndarray, list, None] = None)\
  126. -> Union[Iterable[Tuple[List]], None]:
  127. """
  128. """
  129. try:
  130. assert((index is None) != (data_set_size is None)),\
  131. "Set index or data_set_size"
  132. index = pd.Series(index) if (index is not None)\
  133. else pd.Series(range(data_set_size))
  134. data_set_size = data_set_size or len(index)
  135. train_size = int(train_proportion * data_set_size)
  136. test_size = int(test_proportion * data_set_size)
  137. step_size = int(step_proportion * data_set_size)
  138. train_sizes = takewhile(lambda x: x <= data_set_size - test_size,
  139. accumulate(repeat(train_size),
  140. lambda x, _: x + step_size))
  141. train_starts = takewhile(lambda x: x <= data_set_size
  142. - train_size - test_size,
  143. accumulate(repeat(step_size),
  144. lambda x, _: x + step_size))
  145. train_starts = chain([0], train_starts)
  146. train_inds_set = list(range(train_start, train_size)
  147. for train_start, train_size in
  148. zip(train_starts, train_sizes))
  149. cv = ((index[train_inds], index[train_inds[-1] + 1:
  150. train_inds[-1] + 1 + test_size])
  151. for train_inds in train_inds_set)
  152. return cv
  153. except Exception as e:
  154. self._logger.log_and_raise_error(
  155. ("Failed to make sliding window cv. "
  156. "Exit with error: {}".format(e)))
  157. def nested_expanding_cv(self,
  158. test_proportion: float,
  159. start_train_proportion: float,
  160. step_proportion: float = None,
  161. expanding_test_size: bool = False,
  162. data_set_size: Union[float, None] = None,
  163. index: Union[pd.Series, np.ndarray, list, None] = None)\
  164. -> Iterable[Tuple[List]]:
  165. """
  166. """
  167. logger = Log("make_nested_expanding_cv:")
  168. try:
  169. cv = self.expanding_cv(test_proportion=test_proportion,
  170. start_train_proportion=start_train_proportion,
  171. step_proportion=step_proportion,
  172. expanding_test_size=expanding_test_size,
  173. data_set_size=data_set_size,
  174. index=index)
  175. nested_cv = []
  176. for train_inds, test_inds in cv:
  177. fold_index = train_inds if index is not None\
  178. else None
  179. fold_size = len(train_inds) if index is None else None
  180. fold_cv = self.expanding_cv(
  181. test_proportion=test_proportion,
  182. start_train_proportion=start_train_proportion,
  183. step_proportion=step_proportion,
  184. expanding_test_size=expanding_test_size,
  185. data_set_size=fold_size,
  186. index=fold_index)
  187. nested_cv.append(list(fold_cv))
  188. return nested_cv
  189. except Exception as e:
  190. logger.log_and_raise_error(("Failed to make nested expanding cv. "
  191. "Exit with error: {}".format(e)))
  192. def cv_slice_dataset(self, X, y, train_inds, test_inds)\
  193. -> Tuple[Union[pd.DataFrame, np.ndarray],
  194. Union[pd.Series, np.ndarray]]:
  195. """
  196. """
  197. if isinstance(X, pd.DataFrame):
  198. X_train = X.loc[train_inds]
  199. X_val = X.loc[test_inds]
  200. else:
  201. X_train = X[train_inds]
  202. X_val = X[test_inds]
  203. if y is not None:
  204. y_train = y[train_inds]
  205. y_val = y[test_inds]
  206. return X_train, X_val, y_train, y_val