# StatisticalFeatures.py
  1. #!/usr/bin/env python3
  2. # -*- coding: utf-8 -*-
  3. """
  4. Created on Tue Oct 16 16:08:47 2018
  5. @author: tanya
  6. """
  7. import types
  8. import logging
  9. import pandas as pd
  10. from collections import defaultdict
  11. from functools import reduce
  12. from libraries.logging.logging_utils import configure_logging
  13. from libraries.exception_handling import InputChecks
  14. class StatisticalFeatures:
  15. '''
  16. Groups data by index columns and returns aggregated statistics for given columns
  17. :param list of tuples or dict index_cols:
  18. is either a list of tuples of form: [(colname_1, [aggfunc_1, aggfunc_2]),
  19. (colname_2, aggfunc_3)]
  20. or a dictionary of form: {colname_1 : [aggfunc_1, aggfunc_2], colname_2 : aggfunc_3}
  21. where colname_i is column to aggregate and aggfunc_i are either
  22. function variables or strings accepted by pandas for built-in function names.
  23. REMARQUE: using strings for built-in functions will speed up the calculations by a factor >= 20.
  24. WARNING: if multiple aggfuncs with the same name are given for a given column (like 'sum' and np.sum),
  25. then only the first one is kept.
  26. WARNING: nan values are ignored numpy and pandas built-in aggregation functions.
  27. '''
  28. def __init__(self, data, index_cols, path_to_log = None):
  29. '''
  30. '''
  31. configure_logging(path_to_log)
  32. self.logger = logging.getLogger(__name__)
  33. self.checks = InputChecks(logger = self.logger)
  34. self.data = data
  35. self.checks.assert_correct_type({'data', [pd.DataFrame]})
  36. self.index_cols = index_cols
  37. # make warning about missing values in index columns
  38. for col in self.index_cols:
  39. if data[col].isnull().any():
  40. self.logger.warning('Index column ' + str(col) + ' contains missing values, no features for those will be returned')
  41. def get_kpis_by_aggregation(self, kpis):
  42. '''
  43. Aggregates given fields with given aggregation functions
  44. USE CASE: per product find mean and standard variation of a price
  45. :param list or dict kpis: either a list of tuples like [(field1, [aggfunc1, aggfunc2]), (field2, aggfunc)]
  46. or a dictionary like {field1 : [aggfunc1, aggfunc2], field2 : aggfunc}
  47. where aggfunc-s are reducing functions of either function type or strings standing for functions built in pandas module
  48. :return: features with index- and kpi- columns
  49. :rtype: pandas DataFrame
  50. '''
  51. def get_valid_agg_dict_from_kpis(kpis):
  52. '''
  53. Filters inputs of incorrect shape or type,
  54. Filters out columns not present in data
  55. Removes multiple functions with the same name
  56. Makes an a quick check that the aggregation with given fields and functions does not fail on the first 2 lines
  57. Reports to the log
  58. :param list or dict kpis:
  59. '''
  60. def get_name(x):
  61. '''
  62. Returns function name for function and does nothing for string
  63. '''
  64. if isinstance(x, types.FunctionType):
  65. return x.__name__
  66. else:
  67. return x
  68. def passed_first_line_type_control(col, aggfunc):
  69. '''
  70. Checks if aggregation works on the first 2 lines of the data
  71. '''
  72. try:
  73. cols_of_object_type = set(self.data.columns[self.data.dtypes.eq(object)]) - set(self.index_cols)
  74. self.data.iloc[:2]\
  75. .fillna(value = {c:'nan' for c in cols_of_object_type})\
  76. .groupby(self.index_cols)\
  77. .agg({col : aggfunc})
  78. return True
  79. except Exception as e:
  80. self.logger.warning('Cannot use aggfunc ' + str(aggfunc) + ' on the column ' + str(col) + ' because of the error : ', str(e))
  81. return False
  82. valid_kpi_dict = defaultdict(list)
  83. if isinstance(kpis, list):
  84. incorrect_lengths = [len(kpi) !=2 for kpi in kpis]
  85. if sum(incorrect_lengths) > 0:
  86. self.logger.warning('Inputs ' + str(kpis[incorrect_lengths]) + 'do not have correct length.')
  87. cols = list(zip(*kpis))[0]
  88. kpis = [t for t in kpis if (len(t) == 2) and (t[0] in self.data.columns)]
  89. elif isinstance(kpis, dict):
  90. cols = list(kpis.keys())
  91. kpis = {k:v for k,v in kpis.items() if k in self.data.columns}.items()
  92. cols_not_in_data = set(cols) - set(self.data.columns)
  93. if len(cols_not_in_data) > 0:
  94. self.logger.warning('Columns ' + ', '.join([str(c) for c in cols_not_in_data]) + ' are not contained in data therefore cannot be used in feature generation.')
  95. for col, aggfuncs in kpis:
  96. if not isinstance(aggfuncs, list):
  97. aggfuncs = [aggfuncs]
  98. for aggfunc in aggfuncs:
  99. is_new_funcname = all([get_name(aggfunc) != get_name(f) for f in valid_kpi_dict[col]])
  100. if not is_new_funcname:
  101. self.logger.warning('Aggfunc ' + str(aggfunc) + ' cannot be used in column ' + str(col) + ', aggfunc with same name is already used.')
  102. if passed_first_line_type_control(col, aggfunc) and is_new_funcname:
  103. valid_kpi_dict[col].append(aggfunc)
  104. return valid_kpi_dict
  105. agg_dict = get_valid_agg_dict_from_kpis(kpis)
  106. if len(agg_dict) > 0:
  107. new_names = ['_'.join([col, aggfunc.__name__]) if isinstance(aggfunc, types.FunctionType)
  108. else '_'.join([col, str(aggfunc)])
  109. for col, aggfuncs in agg_dict.items() for aggfunc in aggfuncs]
  110. cols_of_object_type = set(self.data.columns[self.data.dtypes.eq(object)]) - set(self.index_cols)
  111. return self.data.fillna(value = {c:'nan' for c in cols_of_object_type})\
  112. .groupby(self.index_cols)\
  113. .agg(agg_dict)\
  114. .set_axis(new_names, axis = 'columns', inplace = False)\
  115. .reset_index()
  116. else:
  117. return self.data[self.index_cols].drop_duplicates().reset_index(drop = True)
  118. def get_value_stats(self, pivot_col, value_col = None, aggfunc = None, entries = None):
  119. '''
  120. A wrapper crosstab method with index equal to index_cols
  121. USE CASE: per product find standart variation of the price in each city
  122. :param str pivot_col: column values of which become columns in the output
  123. :param str value_col: column name to fillin vlaues
  124. :param str or func aggfunc: count if None
  125. :param list entries: values of pivot_col to show
  126. :return: table with index- and kpi- columns
  127. :rtype: pandas DataFrame
  128. '''
  129. # assert that types of the inputs are correct
  130. types_to_check = {'columns' : [str],
  131. 'value_col' : [str, type(None)],
  132. 'aggfunc' : ['str', types.FunctionType, type(None)],
  133. 'entries' : [list, type(None)]}
  134. self.checks.assert_correct_type(types_to_check)
  135. cols_to_check = [pivot_col]
  136. if not value_col is None:
  137. cols_to_check.append(value_col)
  138. self.checks.assert_column_presence(data = self.data, colnames = cols_to_check)
  139. if not entries is None:
  140. entry_filter = reduce(lambda a,b: a|b, [(self.data[pivot_col] == ent) for ent in entries])
  141. else:
  142. entry_filter = pd.Series([True]*len(self.data))
  143. index = [self.data.loc[entry_filter, col] for col in self.index_cols]
  144. columns = self.data.loc[entry_filter, pivot_col]
  145. if not value_col is None:
  146. value_col = self.data.loc[entry_filter, value_col]
  147. result = pd.crosstab(index = index, columns = columns, values = value_col, aggfunc = aggfunc)
  148. result = result.rename(columns = {c : value_col + '_' + str(c) for c in result.columns})\
  149. .reset_index()
  150. return result
  151. def get_aggregated_value_stats(self, pivot_col, value_col = None, aggfunc_step1 = None, aggfuncs_step2 = None, entries = None):
  152. '''
  153. Aggregates values obtained with method get_value_stats
  154. USE CASE: per product find average variation of the price over all cities
  155. :param str pivot_col:
  156. :param str value_col:
  157. :param str or func aggfunc_step1: aggfunc used in method get_value_stats
  158. :param list aggfuncs_step2: aggregation functions used to aggregate the output of method get_value_stats
  159. :param list entries:
  160. :return: table with index- and kpi- columns
  161. :rtype: pandas DataFrame
  162. '''
  163. self.checks.assert_correct_type({'aggfuncs_step2' : [list, type(None)]})
  164. value_stat_kpis = self.get_value_stat_kpis(pivot_col = pivot_col, value_col = value_col, aggfunc = aggfunc_step1, entries = entries)
  165. result = value_stat_kpis[self.index_cols].copy(deep = True)
  166. for aggfunc in aggfuncs_step2:
  167. colname = '_'.join(aggfunc, aggfunc_step1, value_col, pivot_col)
  168. if isinstance(aggfunc, str):
  169. result[colname] = getattr(value_stat_kpis.set_index(self.index_cols), aggfunc)().reset_index(drop = True)
  170. else:
  171. result[colname] = value_stat_kpis.set_index(self.index_cols)\
  172. .apply(aggfunc, axis = 1)\
  173. .reset_index(drop = True)
  174. return result
  175. def get_critical_value_stats(self, min_or_max, pivot_col, value_col = None, aggfunc = None):
  176. '''
  177. Finds argmin or argmax of a column
  178. USE CASE: per product find the city with maximum variation of the price
  179. :param str min_or_max: must be in ['min', 'max']
  180. :param str pivot_col:
  181. :param str value_col:
  182. :param str aggfunc:
  183. '''
  184. self.checks.assert_valid_value(arname = 'min_or_max', val = min_or_max, valid_values = ['min', 'max'])
  185. if min_or_max == 'max':
  186. aggfuncs_step2 = ['idxmax']
  187. else:
  188. aggfuncs_step2 = ['idxmin']
  189. return self.get_aggregated_value_stat_kpis(pivot_col = pivot_col,
  190. value_col = value_col,
  191. aggfunc_step1 = aggfunc,
  192. aggfucs_step2 = aggfuncs_step2)
# TODO : incorporate frequency, recency of numeric columns crossing a threshold value by default equal to 0.
# can also add peak detection from the other project and calculate the number of peaks. Probably first create TimeSeriesManipulation class.
# write tests for all methods