#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Created on Wed Oct 9 15:17:34 2019

@author: oskar
@description: Class which flattens nested Dataframes, Dictionaries and Lists
              into tabular form
"""

import sys
import os
import time
import pandas as pd
import copy
sys.path.append(os.getcwd())

from cdplib.log import Log


class FlattenData:
    '''
    Flattens nested DataFrames, dictionaries and lists into a flat mapping
    whose keys are the nesting path joined with underscores.

    Nested lists contribute the element position to the key, nested
    dictionaries contribute their own keys, e.g.
    ``{'a': {'b': [1, 2]}}`` becomes ``{'a_b_0': 1, 'a_b_1': 2}``.
    '''

    def __init__(self):
        self._log = Log("Flatten data")

    def flatten(self, data) -> pd.DataFrame:
        '''
        Flatten the given data and return the result as a DataFrame.

        The flattened mapping is passed to ``pd.DataFrame.from_dict`` with
        ``orient='index'``, so its top-level keys become the index of the
        returned DataFrame.

        :param data: data given in either dictionary, list, series or
            dataframe format.
        :return: DataFrame built from the flattened data.
        :raises AssertionError: if data is of none of the supported types.
        '''
        assert(isinstance(data, (list, dict, pd.DataFrame, pd.Series))),\
            "Parameter 'data' either be of List, Dictionary or DataFrame type"

        in_length = 0
        start = time.time()

        # isinstance (rather than an exact type comparison) so that
        # subclasses accepted by the assertion above, e.g. OrderedDict,
        # are dispatched instead of being reported as unsupported.
        if isinstance(data, pd.DataFrame):
            in_length = len(data.columns)
            return_data = self.flatten_dataframe(data)
        elif isinstance(data, pd.Series):
            data = pd.DataFrame(data)
            in_length = len(data.columns)
            return_data = self.flatten_dataframe(data)
        elif isinstance(data, dict):
            in_length = len(data)
            return_data = self.flatten_dict(data)
        elif isinstance(data, list):
            in_length = len(data)
            return_data = self.flatten_list(data)
        else:
            # Only reachable when assertions are disabled (python -O).
            self._log.log_and_raise_warning(
                ("Input data type '{}' is not supported").format(type(data)))
            return None

        result_dataframe = pd.DataFrame.from_dict(return_data, orient='index')

        self._log.info(
            ('Data has been flattened, created {} columns in {} seconds')
            .format(len(result_dataframe.columns) - in_length,
                    time.time() - start))
        return result_dataframe

    def flatten_dataframe(self, dataframe: pd.DataFrame,
                          incoming_key: str = None) -> dict:
        '''
        Flatten every row of a DataFrame.

        :param pd.Dataframe dataframe: dataframe containing the data to be
            flattened
        :param str incoming_key: string to be prepended to every key,
            separated by an underscore
        :return: dictionary mapping each row index to the flattened
            dictionary of that row.
        :raises AssertionError: if a parameter is of the wrong type.
        '''
        assert(isinstance(dataframe, pd.DataFrame)),\
            "Parameter 'dataframe' be of DataFrame type"
        if incoming_key is not None:
            assert(isinstance(incoming_key, str)),\
                "Parameter 'incoming_key' be of String type"

        result_dict = {}
        for index, row in dataframe.iterrows():
            flat_row = {}
            # Series.items() replaces Series.iteritems(), which no longer
            # exists in pandas >= 2.0.
            for key, value in row.items():
                if incoming_key is not None:
                    # Formatting also copes with non-string column labels.
                    key = '{}_{}'.format(incoming_key, key)
                self._flatten_into(flat_row, key, value)

            # Deep copy so the result shares no objects with the input.
            result_dict[index] = copy.deepcopy(flat_row)

        return result_dict

    def flatten_dict(self, dictionary: dict,
                     incoming_key: str = None) -> dict:
        '''
        Flatten a (possibly nested) dictionary into a single-level one.

        :param dict dictionary: dictionary containing the data to be
            flattened
        :param str incoming_key: string to be prepended to every key,
            separated by an underscore
        :return: flat dictionary; entries whose value is an empty list or
            an empty dictionary produce no key.
        :raises AssertionError: if a parameter is of the wrong type.
        '''
        assert(isinstance(dictionary, dict)),\
            "Parameter 'dictionary' be of Dictionary type"
        if incoming_key is not None:
            assert(isinstance(incoming_key, str)),\
                "Parameter 'incoming_key' be of String type"

        result_dict = {}
        for key, value in dictionary.items():
            if incoming_key is not None:
                # Formatting also copes with non-string dictionary keys.
                key = '{}_{}'.format(incoming_key, key)
            self._flatten_into(result_dict, key, value)

        return result_dict

    def flatten_list(self, data_list: list,
                     incoming_key: str = None) -> dict:
        '''
        Flatten a (possibly nested) list into a single-level dictionary.

        Each element is keyed by its position in the list, prefixed with
        ``incoming_key`` when one is given.

        :param list data_list: list containing the data to be flattened
        :param str incoming_key: string to be prepended to every key,
            separated by an underscore
        :return: flat dictionary; elements that are an empty list or an
            empty dictionary produce no key.
        :raises AssertionError: if a parameter is of the wrong type.
        '''
        assert(isinstance(data_list, list)),\
            "Parameter 'data_list' be of List type"
        if incoming_key is not None:
            assert(isinstance(incoming_key, str)),\
                "Parameter 'incoming_key' be of String type"

        result_dict = {}
        for iteration, item in enumerate(data_list):
            if incoming_key is None:
                key = str(iteration)
            # OEBB SPECIFIC IF STATEMENT: dictionaries carrying both
            # 'stationsnummer' and 'stage' are keyed by those two values
            # instead of by their position in the list.
            elif (isinstance(item, dict)
                    and 'stationsnummer' in item
                    and 'stage' in item):
                key = (incoming_key + '_' + str(item['stationsnummer'])
                       + '_' + str(item['stage']))
            else:
                key = incoming_key + '_' + str(iteration)

            self._flatten_into(result_dict, key, item)

        return result_dict

    def append_to_dict(self, dictionary: dict, to_append) -> dict:
        '''
        Copy all entries of ``to_append`` into ``dictionary`` in place,
        overwriting entries that share a key.

        :param dict dictionary: dictionary which holds all the resulting
            data.
        :param dict to_append: data to be added to the resulting dictionary.
        :return: the updated ``dictionary`` (same object as passed in).
        :raises AssertionError: if a parameter is not a dictionary.
        '''
        assert(isinstance(dictionary, dict)),\
            "Parameter 'dictionary' be of Dictionary type"
        assert(isinstance(to_append, dict)),\
            "Parameter 'to_append' be of Dictionary type"

        dictionary.update(to_append)
        return dictionary

    def _flatten_into(self, target: dict, key, value) -> None:
        '''
        Store ``value`` in ``target`` under ``key``, flattening it first
        when it is a list or a dictionary.

        :param dict target: dictionary collecting the flattened entries;
            modified in place.
        :param key: key (or key prefix for nested values) of the entry.
        :param value: value to store or to flatten recursively.
        '''
        if isinstance(value, list):
            # The nested call requires a string prefix, so non-string keys
            # (e.g. integer column labels) are converted here.
            self.append_to_dict(target, self.flatten_list(value, str(key)))
        elif isinstance(value, dict):
            self.append_to_dict(target, self.flatten_dict(value, str(key)))
        else:
            target[key] = value