|
@@ -0,0 +1,270 @@
|
|
|
+#!/usr/bin/env python3
|
|
|
+# -*- coding: utf-8 -*-
|
|
|
+"""
|
|
|
+Created on Tue Oct 16 16:08:47 2018
|
|
|
+
|
|
|
+@author: tanya
|
|
|
+"""
|
|
|
+import types
|
|
|
+import logging
|
|
|
+import pandas as pd
|
|
|
+
|
|
|
+from collections import defaultdict
|
|
|
+from functools import reduce
|
|
|
+
|
|
|
+from libraries.logging.logging_utils import configure_logging
|
|
|
+from libraries.exception_handling import InputChecks
|
|
|
+
|
|
|
class StatisticalFeatures:
    '''
    Groups data by index columns and returns aggregated statistics for given columns

    :param pandas.DataFrame data: table the features are computed from
    :param list index_cols: names of the columns of data to group by
        (a single column name given as a string is wrapped into a list)
    :param str path_to_log: forwarded to configure_logging

    Aggregations ("kpis") accepted by the methods are given either as
    a list of tuples of form: [(colname_1, [aggfunc_1, aggfunc_2]),
                               (colname_2, aggfunc_3)]
    or as a dictionary of form: {colname_1 : [aggfunc_1, aggfunc_2], colname_2 : aggfunc_3}
    where colname_i is column to aggregate and aggfunc_i are either
    function variables or strings accepted by pandas for built-in function names.

    REMARK: using strings for built-in functions will speed up the calculations by a factor >= 20.
    WARNING: if multiple aggfuncs with the same name are given for a given column (like 'sum' and np.sum),
        then only the first one is kept.
    WARNING: nan values are ignored by numpy and pandas built-in aggregation functions.
    '''
    def __init__(self, data, index_cols, path_to_log = None):
        '''
        Stores the data and the grouping columns and warns about missing
        values in the grouping columns.

        :param pandas.DataFrame data: table the features are computed from
        :param list index_cols: columns of data to group by
        :param str path_to_log: forwarded to configure_logging
        '''
        configure_logging(path_to_log)

        self.logger = logging.getLogger(__name__)

        self.checks = InputChecks(logger = self.logger)

        self.data = data

        # NOTE(review): the mapping is passed as a dict, like in the other calls of
        # assert_correct_type in this class (a set literal containing a list cannot
        # even be constructed). Presumably the keys are names of local variables
        # of the caller - confirm against InputChecks.
        self.checks.assert_correct_type({'data' : [pd.DataFrame]})

        # a bare string would otherwise be iterated character by character
        if isinstance(index_cols, str):
            index_cols = [index_cols]

        self.index_cols = index_cols

        # make warning about missing values in index columns
        for col in self.index_cols:
            if data[col].isnull().any():
                self.logger.warning('Index column %s contains missing values, no features for those will be returned', col)

    @staticmethod
    def _get_aggfunc_name(aggfunc):
        '''
        Returns the name of a callable (its string representation if it has
        no __name__) and returns anything else, e.g. a string, unchanged
        '''
        if callable(aggfunc):
            return getattr(aggfunc, '__name__', str(aggfunc))
        return aggfunc

    def _fill_missing_in_object_cols(self, data):
        '''
        Replaces missing values by the string 'nan' in all columns of object
        type which are not index columns

        :param pandas.DataFrame data: self.data or a slice of it
        :rtype: pandas DataFrame
        '''
        cols_of_object_type = set(data.columns[data.dtypes.eq(object)]) - set(self.index_cols)

        if not cols_of_object_type:
            return data

        return data.fillna(value = {c : 'nan' for c in cols_of_object_type})

    def _passes_first_lines_control(self, sample, col, aggfunc):
        '''
        Checks if aggregation of the column with the aggfunc works on the sample

        :param pandas.DataFrame sample: first lines of the data
        :return: True if the aggregation did not raise, False otherwise
        :rtype: bool
        '''
        try:
            sample.groupby(self.index_cols).agg({col : aggfunc})
        # any failure, whatever its type, disqualifies the aggfunc
        except Exception as e:
            self.logger.warning('Cannot use aggfunc %s on the column %s because of the error : %s', aggfunc, col, e)
            return False

        return True

    def _get_valid_agg_dict(self, kpis):
        '''
        Filters inputs of incorrect shape or type,
        Filters out columns not present in data
        Removes multiple functions with the same name
        Makes a quick check that the aggregation with given fields and functions does not fail on the first 2 lines
        Reports to the log

        :param list or dict kpis:
        :return: dictionary {column : [aggfuncs]} containing only columns
            with at least one usable aggfunc
        :rtype: dict
        :raises TypeError: if kpis is neither a list nor a dict
        '''
        if isinstance(kpis, dict):
            candidates = list(kpis.items())
        elif isinstance(kpis, list):
            candidates, malformed = [], []
            for kpi in kpis:
                if isinstance(kpi, (tuple, list)) and len(kpi) == 2:
                    candidates.append(tuple(kpi))
                else:
                    malformed.append(kpi)

            if malformed:
                self.logger.warning('Inputs %s do not have correct length.', malformed)
        else:
            raise TypeError('kpis must be a list of tuples or a dict, got ' + type(kpis).__name__)

        # dict.fromkeys removes duplicates and keeps the order of appearance
        cols_not_in_data = list(dict.fromkeys(col for col, _ in candidates if col not in self.data.columns))
        if cols_not_in_data:
            self.logger.warning('Columns %s are not contained in data therefore cannot be used in feature generation.',
                                ', '.join([str(c) for c in cols_not_in_data]))

        # the control sample is the same for every aggfunc, build it once
        sample = self._fill_missing_in_object_cols(self.data.iloc[:2])

        valid_kpi_dict = {}

        for col, aggfuncs in candidates:
            if col not in self.data.columns:
                continue

            if not isinstance(aggfuncs, list):
                aggfuncs = [aggfuncs]

            for aggfunc in aggfuncs:
                # .get does not create an entry, so a column whose aggfuncs are all
                # rejected never reaches the final aggregation with an empty list
                used_names = [self._get_aggfunc_name(f) for f in valid_kpi_dict.get(col, [])]

                if self._get_aggfunc_name(aggfunc) in used_names:
                    self.logger.warning('Aggfunc %s cannot be used in column %s, aggfunc with same name is already used.', aggfunc, col)
                    continue

                if self._passes_first_lines_control(sample, col, aggfunc):
                    valid_kpi_dict.setdefault(col, []).append(aggfunc)

        return valid_kpi_dict

    def get_kpis_by_aggregation(self, kpis):
        '''
        Aggregates given fields with given aggregation functions
        USE CASE: per product find mean and standard deviation of a price

        Columns missing in data, aggfuncs whose name is already used for the
        same column and aggfuncs failing on the first 2 lines of the data are
        skipped with a warning in the log. Missing values in object columns
        which are not index columns are replaced by the string 'nan' before
        the aggregation. Resulting columns are named <field>_<aggfunc name>.

        :param list or dict kpis: either a list of tuples like [(field1, [aggfunc1, aggfunc2]), (field2, aggfunc)]
         or a dictionary like {field1 : [aggfunc1, aggfunc2], field2 : aggfunc}
         where aggfunc-s are reducing functions of either function type or strings standing for functions built in pandas module

        :return: features with index- and kpi- columns; only the distinct
            combinations of the index columns if no valid kpi is left
        :rtype: pandas DataFrame
        :raises TypeError: if kpis is neither a list nor a dict
        '''
        agg_dict = self._get_valid_agg_dict(kpis)

        if not agg_dict:
            return self.data[self.index_cols].drop_duplicates().reset_index(drop = True)

        new_names = ['_'.join([str(col), str(self._get_aggfunc_name(aggfunc))])
                     for col, aggfuncs in agg_dict.items() for aggfunc in aggfuncs]

        result = self._fill_missing_in_object_cols(self.data)\
                     .groupby(self.index_cols)\
                     .agg(agg_dict)

        # plain assignment instead of set_axis(..., inplace = False):
        # the inplace argument does not exist anymore in recent pandas versions
        result.columns = new_names

        return result.reset_index()

    def get_value_stats(self, pivot_col, value_col = None, aggfunc = None, entries = None):
        '''
        A wrapper crosstab method with index equal to index_cols
        USE CASE: per product find standard deviation of the price in each city

        The columns of the output are named <value_col>_<value of pivot_col>,
        or <pivot_col>_<value of pivot_col> if value_col is None.

        :param str pivot_col: column values of which become columns in the output
        :param str value_col: column name to fill in values
        :param str or func aggfunc: count if None
        :param list entries: values of pivot_col to show
        :return: table with index- and kpi- columns
        :rtype: pandas DataFrame
        '''
        # assert that types of the inputs are correct
        # NOTE(review): keys are the parameter names of this method and values are
        # lists of types, mirroring the other assert_correct_type calls in this
        # class - confirm against InputChecks.
        types_to_check = {'pivot_col' : [str],
                          'value_col' : [str, type(None)],
                          'aggfunc' : [str, types.FunctionType, type(None)],
                          'entries' : [list, type(None)]}

        self.checks.assert_correct_type(types_to_check)

        cols_to_check = [pivot_col]
        if value_col is not None:
            cols_to_check.append(value_col)
        self.checks.assert_column_presence(data = self.data, colnames = cols_to_check)

        # the masks share the index of the data, otherwise .loc cannot align them
        if entries is None:
            entry_filter = pd.Series(True, index = self.data.index)
        else:
            # the all-False initial value makes an empty list of entries select nothing
            entry_filter = reduce(lambda a, b: a | b,
                                  [self.data[pivot_col] == ent for ent in entries],
                                  pd.Series(False, index = self.data.index))

        selected = self.data.loc[entry_filter]

        index = [selected[col] for col in self.index_cols]
        columns = selected[pivot_col]
        # value_col itself stays a column name, it is needed below as a prefix
        values = None if value_col is None else selected[value_col]

        result = pd.crosstab(index = index, columns = columns, values = values, aggfunc = aggfunc)

        prefix = pivot_col if value_col is None else value_col

        return result.rename(columns = {c : prefix + '_' + str(c) for c in result.columns})\
                     .reset_index()

    def get_aggregated_value_stats(self, pivot_col, value_col = None, aggfunc_step1 = None, aggfuncs_step2 = None, entries = None):
        '''
        Aggregates values obtained with method get_value_stats
        USE CASE: per product find average variation of the price over all cities

        Every aggfunc of step 2 is applied row-wise, i.e. over the columns
        created from the values of pivot_col. The resulting column is named by
        joining with '_' the names of the step 2 aggfunc, the step 1 aggfunc,
        value_col and pivot_col, skipping those which are None.

        :param str pivot_col:
        :param str value_col:
        :param str or func aggfunc_step1: aggfunc used in method get_value_stats
        :param list aggfuncs_step2: aggregation functions used to aggregate the output of method get_value_stats,
            strings are looked up as DataFrame methods; nothing is aggregated if None
        :param list entries:
        :return: table with index- and kpi- columns
        :rtype: pandas DataFrame
        '''
        self.checks.assert_correct_type({'aggfuncs_step2' : [list, type(None)]})

        value_stat_kpis = self.get_value_stats(pivot_col = pivot_col, value_col = value_col, aggfunc = aggfunc_step1, entries = entries)

        result = value_stat_kpis[self.index_cols].copy(deep = True)

        values = value_stat_kpis.set_index(self.index_cols)

        for aggfunc in (aggfuncs_step2 or []):
            name_parts = [aggfunc, aggfunc_step1, value_col, pivot_col]
            colname = '_'.join(str(self._get_aggfunc_name(part)) for part in name_parts if part is not None)

            if isinstance(aggfunc, str):
                # axis = 1 : one value per combination of the index columns
                aggregated = getattr(values, aggfunc)(axis = 1)
            else:
                aggregated = values.apply(aggfunc, axis = 1)

            # positional index, so that the values align with the rows of result
            result[colname] = aggregated.reset_index(drop = True)

        return result

    def get_critical_value_stats(self, min_or_max, pivot_col, value_col = None, aggfunc = None):
        '''
        Finds argmin or argmax of a column
        USE CASE: per product find the city with maximum variation of the price

        :param str min_or_max: must be in ['min', 'max']
        :param str pivot_col:
        :param str value_col:
        :param str aggfunc: aggfunc used in method get_value_stats
        :return: table with index columns and the column holding the name of
            the minimal / maximal column of the output of get_value_stats
        :rtype: pandas DataFrame
        '''
        # NOTE(review): keyword 'arname' is kept as found, it may be a typo
        # for 'argname' - confirm against the signature of InputChecks.assert_valid_value
        self.checks.assert_valid_value(arname = 'min_or_max', val = min_or_max, valid_values = ['min', 'max'])

        aggfuncs_step2 = ['idxmax'] if min_or_max == 'max' else ['idxmin']

        return self.get_aggregated_value_stats(pivot_col = pivot_col,
                                               value_col = value_col,
                                               aggfunc_step1 = aggfunc,
                                               aggfuncs_step2 = aggfuncs_step2)
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+ # TODO : incorporate frequency, recency of numeric columns crossing a threshold value by default equal to 0.
|
|
|
+
|
|
|
+ # can also add peak detection from the other project and calculate the number of peaks. Probably first create TimeSeriesManipulation class.
|
|
|
+
|
|
|
+ # write tests for all methods
|