#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Created on Tue Oct 16 16:08:47 2018

@author: tanya
"""

import types
import logging

import pandas as pd
from collections import defaultdict
from functools import reduce

from libraries.logging.logging_utils import configure_logging
from libraries.exception_handling import InputChecks


class StatisticalFeatures:
    '''
    Groups data by index columns and returns aggregated statistics
    for given columns.

    :param pandas.DataFrame data: input data
    :param list index_cols: columns to group by
    :param str path_to_log: optional path to the log file

    Aggregation specs ("kpis") passed to the methods are either a list of
    tuples of the form:
        [(colname_1, [aggfunc_1, aggfunc_2]), (colname_2, aggfunc_3)]
    or a dictionary of the form:
        {colname_1: [aggfunc_1, aggfunc_2], colname_2: aggfunc_3}
    where colname_i is a column to aggregate and aggfunc_i are either
    function objects or strings accepted by pandas as built-in
    aggregation-function names.

    REMARQUE: using strings for built-in functions will speed up the
    calculations by a factor >= 20.

    WARNING: if multiple aggfuncs with the same name are given for a given
    column (like 'sum' and np.sum), then only the first one is kept.

    WARNING: nan values are ignored by numpy and pandas built-in
    aggregation functions.
    '''
    def __init__(self, data, index_cols, path_to_log=None):
        '''
        :param pandas.DataFrame data: data to compute features from
        :param list index_cols: columns to group by
        :param str path_to_log: optional path to the log file
        '''
        configure_logging(path_to_log)
        self.logger = logging.getLogger(__name__)
        self.checks = InputChecks(logger=self.logger)

        self.data = data
        # BUG FIX: was a set literal {'data', [pd.DataFrame]} which raises
        # TypeError (lists are unhashable); a mapping of argument name to
        # allowed types is intended, as used elsewhere in this class.
        self.checks.assert_correct_type({'data': [pd.DataFrame]})

        self.index_cols = index_cols

        # warn about missing values in index columns: rows with missing
        # index values will not produce features
        for col in self.index_cols:
            if data[col].isnull().any():
                self.logger.warning('Index column ' + str(col) + ' contains missing values, no features for those will be returned')

    def get_kpis_by_aggregation(self, kpis):
        '''
        Aggregates given fields with given aggregation functions.

        USE CASE: per product find mean and standard variation of a price

        :param list or dict kpis: either a list of tuples like
            [(field1, [aggfunc1, aggfunc2]), (field2, aggfunc)]
            or a dictionary like
            {field1: [aggfunc1, aggfunc2], field2: aggfunc}
            where aggfunc-s are reducing functions of either function type
            or strings standing for functions built into the pandas module.

        :return: features with index- and kpi- columns
        :rtype: pandas DataFrame
        '''
        def get_valid_agg_dict_from_kpis(kpis):
            '''
            Filters inputs of incorrect shape or type,
            filters out columns not present in data,
            removes multiple functions with the same name,
            makes a quick check that the aggregation with given fields
            and functions does not fail on the first 2 lines,
            and reports problems to the log.

            :param list or dict kpis:
            :return: mapping column -> list of valid aggregation functions
            :rtype: defaultdict(list)
            '''
            def get_name(x):
                '''
                Returns the function name for a function
                and the value itself for a string.
                '''
                if isinstance(x, types.FunctionType):
                    return x.__name__
                else:
                    return x

            def passed_first_line_type_control(col, aggfunc):
                '''
                Checks if the aggregation works on the first 2 lines
                of the data.
                '''
                try:
                    # object-typed columns get their NaNs replaced by the
                    # string 'nan' so string aggfuncs do not choke on floats
                    cols_of_object_type = set(self.data.columns[self.data.dtypes.eq(object)]) - set(self.index_cols)
                    self.data.iloc[:2]\
                        .fillna(value={c: 'nan' for c in cols_of_object_type})\
                        .groupby(self.index_cols)\
                        .agg({col: aggfunc})
                    return True
                except Exception as e:
                    # BUG FIX: str(e) used to be passed as a second positional
                    # argument to logger.warning, where it was interpreted as
                    # an unused %-format argument and never logged.
                    self.logger.warning('Cannot use aggfunc ' + str(aggfunc) +
                                        ' on the column ' + str(col) +
                                        ' because of the error : ' + str(e))
                    return False

            valid_kpi_dict = defaultdict(list)

            if isinstance(kpis, list):
                incorrect_lengths = [len(kpi) != 2 for kpi in kpis]
                if sum(incorrect_lengths) > 0:
                    # BUG FIX: a python list cannot be indexed by a boolean
                    # mask (kpis[incorrect_lengths] raised TypeError);
                    # filter explicitly instead.
                    bad_kpis = [kpi for kpi, bad in zip(kpis, incorrect_lengths) if bad]
                    self.logger.warning('Inputs ' + str(bad_kpis) + ' do not have correct length.')
                cols = list(zip(*kpis))[0]
                kpis = [t for t in kpis if (len(t) == 2) and (t[0] in self.data.columns)]
            elif isinstance(kpis, dict):
                cols = list(kpis.keys())
                kpis = {k: v for k, v in kpis.items() if k in self.data.columns}.items()

            cols_not_in_data = set(cols) - set(self.data.columns)
            if len(cols_not_in_data) > 0:
                self.logger.warning('Columns ' + ', '.join([str(c) for c in cols_not_in_data]) + ' are not contained in data therefore cannot be used in feature generation.')

            for col, aggfuncs in kpis:
                if not isinstance(aggfuncs, list):
                    aggfuncs = [aggfuncs]

                for aggfunc in aggfuncs:
                    # keep only the first aggfunc of a given name per column
                    is_new_funcname = all([get_name(aggfunc) != get_name(f) for f in valid_kpi_dict[col]])
                    if not is_new_funcname:
                        self.logger.warning('Aggfunc ' + str(aggfunc) + ' cannot be used in column ' + str(col) + ', aggfunc with same name is already used.')
                    if passed_first_line_type_control(col, aggfunc) and is_new_funcname:
                        valid_kpi_dict[col].append(aggfunc)

            return valid_kpi_dict

        agg_dict = get_valid_agg_dict_from_kpis(kpis)

        if len(agg_dict) > 0:
            # flatten (column, aggfunc) pairs into "<col>_<funcname>" names
            new_names = ['_'.join([col, aggfunc.__name__])
                         if isinstance(aggfunc, types.FunctionType)
                         else '_'.join([col, str(aggfunc)])
                         for col, aggfuncs in agg_dict.items()
                         for aggfunc in aggfuncs]

            cols_of_object_type = set(self.data.columns[self.data.dtypes.eq(object)]) - set(self.index_cols)

            # NOTE(review): set_axis(..., inplace=False) was removed in
            # pandas 2.0 — confirm the pinned pandas version still accepts it
            return self.data.fillna(value={c: 'nan' for c in cols_of_object_type})\
                            .groupby(self.index_cols)\
                            .agg(agg_dict)\
                            .set_axis(new_names, axis='columns', inplace=False)\
                            .reset_index()
        else:
            # nothing valid to aggregate: return the distinct index rows only
            return self.data[self.index_cols].drop_duplicates().reset_index(drop=True)

    def get_value_stats(self, pivot_col, value_col=None, aggfunc=None, entries=None):
        '''
        A wrapper around the pandas crosstab method with index equal
        to index_cols.

        USE CASE: per product find standard variation of the price
        in each city

        :param str pivot_col: column whose values become columns in
            the output
        :param str value_col: column name to fill in values
        :param str or func aggfunc: count if None
        :param list entries: values of pivot_col to show

        :return: table with index- and kpi- columns
        :rtype: pandas DataFrame
        '''
        # assert that the types of the inputs are correct.
        # NOTE(review): the 'columns' key looks like it should be
        # 'pivot_col' — verify against InputChecks semantics.
        # BUG FIX: 'str' (the literal string) replaced by the type str.
        types_to_check = {'columns': [str],
                          'value_col': [str, type(None)],
                          'aggfunc': [str, types.FunctionType, type(None)],
                          'entries': [list, type(None)]}
        self.checks.assert_correct_type(types_to_check)

        cols_to_check = [pivot_col]
        if value_col is not None:
            cols_to_check.append(value_col)
        self.checks.assert_column_presence(data=self.data, colnames=cols_to_check)

        if entries is not None:
            # keep only rows whose pivot_col value is in entries
            entry_filter = reduce(lambda a, b: a | b, [(self.data[pivot_col] == ent) for ent in entries])
        else:
            entry_filter = pd.Series([True] * len(self.data))

        index = [self.data.loc[entry_filter, col] for col in self.index_cols]
        columns = self.data.loc[entry_filter, pivot_col]

        # BUG FIX: value_col used to be rebound to a Series and then reused
        # as a string prefix in the rename below, which raised a TypeError;
        # keep the values in a separate variable instead.
        if value_col is not None:
            values = self.data.loc[entry_filter, value_col]
        else:
            values = None

        result = pd.crosstab(index=index, columns=columns, values=values, aggfunc=aggfunc)
        # prefix output columns with the value column name; fall back to the
        # pivot column name for the pure-count case (value_col is None)
        prefix = value_col if value_col is not None else pivot_col
        result = result.rename(columns={c: prefix + '_' + str(c) for c in result.columns})\
                       .reset_index()
        return result

    def get_aggregated_value_stats(self, pivot_col, value_col=None,
                                   aggfunc_step1=None, aggfuncs_step2=None,
                                   entries=None):
        '''
        Aggregates values obtained with method get_value_stats.

        USE CASE: per product find average variation of the price
        over all cities

        :param str pivot_col:
        :param str value_col:
        :param str or func aggfunc_step1: aggfunc used in method
            get_value_stats
        :param list aggfuncs_step2: aggregation functions used to
            aggregate the output of method get_value_stats
        :param list entries:

        :return: table with index- and kpi- columns
        :rtype: pandas DataFrame
        '''
        self.checks.assert_correct_type({'aggfuncs_step2': [list, type(None)]})

        # BUG FIX: the original called nonexistent self.get_value_stat_kpis
        value_stat_kpis = self.get_value_stats(pivot_col=pivot_col,
                                               value_col=value_col,
                                               aggfunc=aggfunc_step1,
                                               entries=entries)

        result = value_stat_kpis[self.index_cols].copy(deep=True)

        def _name(f):
            '''Display name for an aggfunc: __name__ for functions, str otherwise.'''
            return f.__name__ if isinstance(f, types.FunctionType) else str(f)

        for aggfunc in aggfuncs_step2:
            # BUG FIX: str.join takes a single iterable of strings; the
            # original passed four positional arguments (some possibly
            # functions or None), which raised a TypeError.
            colname = '_'.join([_name(aggfunc), _name(aggfunc_step1),
                                str(value_col), str(pivot_col)])

            if isinstance(aggfunc, str):
                result[colname] = getattr(value_stat_kpis.set_index(self.index_cols), aggfunc)()\
                                      .reset_index(drop=True)
            else:
                result[colname] = value_stat_kpis.set_index(self.index_cols)\
                                                 .apply(aggfunc, axis=1)\
                                                 .reset_index(drop=True)

        return result

    def get_critical_value_stats(self, min_or_max, pivot_col, value_col=None, aggfunc=None):
        '''
        Finds argmin or argmax of a column.

        USE CASE: per product find the city with maximum variation
        of the price

        :param str min_or_max: must be in ['min', 'max']
        :param str pivot_col:
        :param str value_col:
        :param str aggfunc:

        :return: table with index- and kpi- columns
        :rtype: pandas DataFrame
        '''
        # NOTE(review): 'arname' looks like a typo for 'argname' — confirm
        # against the InputChecks.assert_valid_value signature.
        self.checks.assert_valid_value(arname='min_or_max',
                                       val=min_or_max,
                                       valid_values=['min', 'max'])

        if min_or_max == 'max':
            aggfuncs_step2 = ['idxmax']
        else:
            aggfuncs_step2 = ['idxmin']

        # BUG FIX: the original called nonexistent
        # self.get_aggregated_value_stat_kpis with the misspelled keyword
        # 'aggfucs_step2'.
        return self.get_aggregated_value_stats(pivot_col=pivot_col,
                                               value_col=value_col,
                                               aggfunc_step1=aggfunc,
                                               aggfuncs_step2=aggfuncs_step2)

# TODO : incorporate frequency, recency of numeric columns crossing a threshold value by default equal to 0.
# can also add pick detection from the other project and calculate the number of picks. Probably first create TimeSeriesManipulation class.
# write tests for all methods