#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Created on Tue Oct 16 16:08:47 2018
@author: tanya
"""
import types
import logging
import pandas as pd
from collections import defaultdict
from functools import reduce
from libraries.logging.logging_utils import configure_logging
from libraries.exception_handling import InputChecks

class StatisticalFeatures:
    '''
    Groups data by index columns and returns aggregated statistics
    for given columns.

    :param pd.DataFrame data: data to aggregate
    :param list index_cols: names of the columns to group by
    :param str path_to_log: optional path to a log file
    '''

    def __init__(self, data, index_cols, path_to_log=None):
        '''
        :param pd.DataFrame data: data to aggregate
        :param list index_cols: names of the columns to group by
        :param str path_to_log: optional path to a log file
        '''
        configure_logging(path_to_log)

        self.logger = logging.getLogger(__name__)

        self.checks = InputChecks(logger=self.logger)

        self.data = data

        self.checks.assert_correct_type({'data': [pd.DataFrame]})

        self.index_cols = index_cols

        # warn about missing values in the index columns
        for col in self.index_cols:
            if data[col].isnull().any():
                self.logger.warning('Index column ' + str(col) + ' contains missing values,'
                                    ' no features will be returned for those rows')

    def get_kpis_by_aggregation(self, kpis):
        '''
        Aggregates given fields with given aggregation functions.
        USE CASE: per product, find the mean and the standard deviation of the price.

        :param list or dict kpis: either a list of tuples of the form
            [(colname_1, [aggfunc_1, aggfunc_2]), (colname_2, aggfunc_3)]
            or a dictionary of the form
            {colname_1: [aggfunc_1, aggfunc_2], colname_2: aggfunc_3},
            where colname_i is a column to aggregate and aggfunc_i is either
            a function or a string accepted by pandas as a built-in function name.
            NOTE: using strings for built-in functions speeds up the calculation
            by a factor >= 20.
            WARNING: if multiple aggfuncs with the same name are given for a column
            (like 'sum' and np.sum), only the first one is kept.
            WARNING: NaN values are ignored by the numpy and pandas built-in
            aggregation functions.

        :return: features with index- and kpi-columns
        :rtype: pandas DataFrame
        '''
        def get_valid_agg_dict_from_kpis(kpis):
            '''
            Filters out inputs of incorrect shape or type,
            filters out columns not present in the data,
            removes multiple functions with the same name,
            makes a quick check that the aggregation with the given fields
            and functions does not fail on the first 2 lines of the data,
            and reports the dropped entries to the log.

            :param list or dict kpis:
            '''
            def get_name(x):
                '''
                Returns the function name for a function and
                the value itself for a string
                '''
                if isinstance(x, types.FunctionType):
                    return x.__name__
                else:
                    return x

            def passed_first_line_type_control(col, aggfunc):
                '''
                Checks if the aggregation works on the first 2 lines of the data
                '''
                try:
                    cols_of_object_type = set(self.data.columns[self.data.dtypes.eq(object)]) - set(self.index_cols)
                    self.data.iloc[:2]\
                        .fillna(value={c: 'nan' for c in cols_of_object_type})\
                        .groupby(self.index_cols)\
                        .agg({col: aggfunc})
                    return True
                except Exception as e:
                    self.logger.warning('Cannot use aggfunc ' + str(aggfunc) +
                                        ' on the column ' + str(col) +
                                        ' because of the error: ' + str(e))
                    return False

            valid_kpi_dict = defaultdict(list)

            if isinstance(kpis, list):
                incorrect_kpis = [kpi for kpi in kpis if len(kpi) != 2]
                if len(incorrect_kpis) > 0:
                    self.logger.warning('Inputs ' + str(incorrect_kpis) + ' do not have the correct length.')

                cols = [t[0] for t in kpis if len(t) > 0]
                kpis = [t for t in kpis if (len(t) == 2) and (t[0] in self.data.columns)]
            elif isinstance(kpis, dict):
                cols = list(kpis.keys())
                kpis = {k: v for k, v in kpis.items() if k in self.data.columns}.items()

            cols_not_in_data = set(cols) - set(self.data.columns)
            if len(cols_not_in_data) > 0:
                self.logger.warning('Columns ' + ', '.join([str(c) for c in cols_not_in_data]) +
                                    ' are not contained in the data and therefore cannot be used in feature generation.')

            for col, aggfuncs in kpis:
                if not isinstance(aggfuncs, list):
                    aggfuncs = [aggfuncs]

                for aggfunc in aggfuncs:
                    is_new_funcname = all([get_name(aggfunc) != get_name(f) for f in valid_kpi_dict[col]])
                    if not is_new_funcname:
                        self.logger.warning('Aggfunc ' + str(aggfunc) + ' cannot be used for column ' + str(col) +
                                            ', an aggfunc with the same name is already used.')

                    if passed_first_line_type_control(col, aggfunc) and is_new_funcname:
                        valid_kpi_dict[col].append(aggfunc)

            return valid_kpi_dict
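
        # For illustration (hypothetical column names), assuming 'price' is a
        # numeric column of the data and 'foo' is not in the data:
        #
        #   get_valid_agg_dict_from_kpis({'price': ['mean', np.mean], 'foo': 'sum'})
        #
        # would log warnings about np.mean (same name as 'mean') and about the
        # missing column 'foo', and return {'price': ['mean']}.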

        agg_dict = get_valid_agg_dict_from_kpis(kpis)

        if len(agg_dict) > 0:

            new_names = ['_'.join([col, aggfunc.__name__]) if isinstance(aggfunc, types.FunctionType)
                         else '_'.join([col, str(aggfunc)])
                         for col, aggfuncs in agg_dict.items() for aggfunc in aggfuncs]

            cols_of_object_type = set(self.data.columns[self.data.dtypes.eq(object)]) - set(self.index_cols)
            return self.data.fillna(value={c: 'nan' for c in cols_of_object_type})\
                            .groupby(self.index_cols)\
                            .agg(agg_dict)\
                            .set_axis(new_names, axis='columns')\
                            .reset_index()
        else:
            return self.data[self.index_cols].drop_duplicates().reset_index(drop=True)
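
    # A minimal usage sketch (hypothetical column names), assuming `df` is a
    # pandas DataFrame with columns 'product' and 'price':
    #
    #   sf = StatisticalFeatures(data=df, index_cols=['product'])
    #   features = sf.get_kpis_by_aggregation({'price': ['mean', 'std']})
    #
    # `features` then has one row per product and the kpi columns
    # 'price_mean' and 'price_std'.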

    def get_value_stats(self, pivot_col, value_col=None, aggfunc=None, entries=None):
        '''
        A wrapper around the pandas crosstab method with the index equal to index_cols.
        USE CASE: per product, find the standard deviation of the price in each city.

        :param str pivot_col: column whose values become columns in the output
        :param str value_col: name of the column used to fill in the values
        :param str or func aggfunc: count if None
        :param list entries: values of pivot_col to show
        :return: table with index- and kpi-columns
        :rtype: pandas DataFrame
        '''
        # assert that the types of the inputs are correct
        types_to_check = {'pivot_col': [str],
                          'value_col': [str, type(None)],
                          'aggfunc': [str, types.FunctionType, type(None)],
                          'entries': [list, type(None)]}

        self.checks.assert_correct_type(types_to_check)

        cols_to_check = [pivot_col]
        if value_col is not None:
            cols_to_check.append(value_col)
        self.checks.assert_column_presence(data=self.data, colnames=cols_to_check)

        if entries is not None:
            entry_filter = reduce(lambda a, b: a | b, [(self.data[pivot_col] == ent) for ent in entries])
        else:
            entry_filter = pd.Series(True, index=self.data.index)

        index = [self.data.loc[entry_filter, col] for col in self.index_cols]
        columns = self.data.loc[entry_filter, pivot_col]
        if value_col is not None:
            values = self.data.loc[entry_filter, value_col]
        else:
            values = None

        result = pd.crosstab(index=index, columns=columns, values=values, aggfunc=aggfunc)

        # prefix the kpi columns with the value column name
        # (or with the pivot column name for plain counts)
        prefix = value_col if value_col is not None else pivot_col
        result = result.rename(columns={c: prefix + '_' + str(c) for c in result.columns})\
                       .reset_index()
        return result
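
    # A minimal usage sketch (hypothetical column names), assuming `df` has the
    # columns 'product', 'city' and 'price':
    #
    #   sf = StatisticalFeatures(data=df, index_cols=['product'])
    #   table = sf.get_value_stats(pivot_col='city', value_col='price', aggfunc='std')
    #
    # `table` then has one row per product and one column 'price_<city>' per
    # city, holding the standard deviation of the price in that city.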

    def get_aggregated_value_stats(self, pivot_col, value_col=None, aggfunc_step1=None, aggfuncs_step2=None, entries=None):
        '''
        Aggregates the values obtained with the method get_value_stats.
        USE CASE: per product, find the average variation of the price over all cities.

        :param str pivot_col:
        :param str value_col:
        :param str or func aggfunc_step1: aggfunc used in the method get_value_stats
        :param list aggfuncs_step2: aggregation functions used to aggregate the output of the method get_value_stats
        :param list entries:
        :return: table with index- and kpi-columns
        :rtype: pandas DataFrame
        '''
        self.checks.assert_correct_type({'aggfuncs_step2': [list, type(None)]})

        value_stat_kpis = self.get_value_stats(pivot_col=pivot_col, value_col=value_col,
                                               aggfunc=aggfunc_step1, entries=entries)
        result = value_stat_kpis[self.index_cols].copy(deep=True)

        for aggfunc in (aggfuncs_step2 or []):
            name_parts = [f.__name__ if isinstance(f, types.FunctionType) else str(f)
                          for f in [aggfunc, aggfunc_step1, value_col, pivot_col]
                          if f is not None]
            colname = '_'.join(name_parts)

            # aggregate over the kpi columns (axis=1), one value per index row
            if isinstance(aggfunc, str):
                result[colname] = getattr(value_stat_kpis.set_index(self.index_cols), aggfunc)(axis=1)\
                                  .reset_index(drop=True)
            else:
                result[colname] = value_stat_kpis.set_index(self.index_cols)\
                                                 .apply(aggfunc, axis=1)\
                                                 .reset_index(drop=True)

        return result
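
    # A minimal usage sketch (hypothetical column names): the mean over all
    # cities of the per-city price variation computed by get_value_stats:
    #
    #   sf.get_aggregated_value_stats(pivot_col='city', value_col='price',
    #                                 aggfunc_step1='std', aggfuncs_step2=['mean'])
    #
    # returns one row per product with the kpi column 'mean_std_price_city'.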

    def get_critical_value_stats(self, min_or_max, pivot_col, value_col=None, aggfunc=None):
        '''
        Finds the argmin or argmax of a column.
        USE CASE: per product, find the city with the maximum variation of the price.

        :param str min_or_max: must be in ['min', 'max']
        :param str pivot_col:
        :param str value_col:
        :param str aggfunc:
        '''
        self.checks.assert_valid_value(arname='min_or_max', val=min_or_max, valid_values=['min', 'max'])

        if min_or_max == 'max':
            aggfuncs_step2 = ['idxmax']
        else:
            aggfuncs_step2 = ['idxmin']

        return self.get_aggregated_value_stats(pivot_col=pivot_col,
                                               value_col=value_col,
                                               aggfunc_step1=aggfunc,
                                               aggfuncs_step2=aggfuncs_step2)
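
    # A minimal usage sketch (hypothetical column names): per product, the city
    # with the largest price variation:
    #
    #   sf.get_critical_value_stats(min_or_max='max', pivot_col='city',
    #                               value_col='price', aggfunc='std')
    #
    # The returned kpi column 'idxmax_std_price_city' holds, per product, the
    # name of the kpi column ('price_<city>') with the maximum value.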

    # TODO: incorporate frequency and recency of numeric columns crossing a
    # threshold value, by default equal to 0.

    # TODO: also add peak detection from the other project and calculate the
    # number of peaks. Probably first create a TimeSeriesManipulation class.

    # TODO: write tests for all methods
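

if __name__ == '__main__':
    # A minimal, self-contained smoke test / usage sketch. The DataFrame and
    # its column names below are hypothetical; running this assumes the
    # project's `libraries` package (imported at the top of this module) is
    # on the python path.
    df = pd.DataFrame({'product': ['a', 'a', 'b', 'b'],
                       'city': ['x', 'y', 'x', 'y'],
                       'price': [1.0, 2.0, 3.0, 5.0]})

    sf = StatisticalFeatures(data=df, index_cols=['product'])

    # mean and standard deviation of the price per product
    print(sf.get_kpis_by_aggregation({'price': ['mean', 'std']}))

    # mean price per product and city
    print(sf.get_value_stats(pivot_col='city', value_col='price', aggfunc='mean'))

    # per product, the city column with the maximum mean price
    print(sf.get_critical_value_stats(min_or_max='max', pivot_col='city',
                                      value_col='price', aggfunc='mean'))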