#!/usr/bin/env python3 # -*- coding: utf-8 -*- """ Created on Wed Oct 9 15:17:34 2019 @author: oskar @description: Class which flattens nested Dataframes, Dictionaries and Lists into tabular form """ import sys import os import time import pandas as pd import copy sys.path.append(os.getcwd()) from cdplib.log import Log class FlattenData(): def __init__(self): self._log = Log("Flatten data") def flatten(self, data, labels_to_ignore: list = []) -> pd.DataFrame(): ''' :parm data: data given in either dictionary, list or dataframe format. ''' assert(isinstance(data, (list, dict, pd.DataFrame, pd.Series))),\ "Parameter 'data' either be of List, Dictionary or DataFrame type" in_length=0 start = time.time() index_name=None if type(data) is pd.DataFrame: in_length = len(data.columns) index_name = data.index.name return_data = self.flatten_dataframe(data, labels_to_ignore=labels_to_ignore) elif type(data) is pd.Series: data = pd.DataFrame(data) in_length = len(data.columns) return_data = self.flatten_dataframe(data, labels_to_ignore=labels_to_ignore) elif type(data) is dict: in_length = len(data) return_data = self.flatten_dict(data, labels_to_ignore=labels_to_ignore) elif type(data) is list: in_length = len(data) return_data = self.flatten_list(data, labels_to_ignore=labels_to_ignore) else: self._log.log_and_raise_warning(("Input data type '{}' is not supported").format(type(data))) return None result_dataframe = pd.DataFrame.from_dict(return_data, orient='index') if index_name is not None: result_dataframe.index.name = index_name self._log.info(('Data has been flattened, created {} columns in {} seconds').format(len(result_dataframe.columns)- in_length, time.time()-start)) return result_dataframe def flatten_dataframe(self, dataframe: pd.DataFrame, incoming_key: str = None, labels_to_ignore: list = []): ''' :param pd.Dataframe dataframe: dataframe containing the data to be flattened :param str incoming_key: string to be appended to the key ''' assert(isinstance(dataframe, pd.DataFrame)),\ "Parameter 'dataframe' be of DataFrame type" if incoming_key is not None and not isinstance(incoming_key, str): incoming_key = str(incoming_key) result_dict = {} for index, row in dataframe.iterrows(): temp_result_dict = {} for key, value in row.iteritems(): if not isinstance(key, str): key = str(key) small_key = key if incoming_key is not None: key = incoming_key + '_' + key temp_result = {} if small_key in labels_to_ignore: temp_result_dict[key] = value else: if type(value) == list: temp_result = self.flatten_list(value, key, labels_to_ignore) elif type(value) == dict: temp_result = self.flatten_dict(value, key, labels_to_ignore) else: temp_result_dict[key] = value if len(temp_result) > 0: temp_result_dict = self.append_to_dict(temp_result_dict, temp_result) result_dict[index] = copy.deepcopy(temp_result_dict) return result_dict def flatten_dict(self, dictionary: dict, incoming_key: str = None, labels_to_ignore: list = []): ''' :param dict dictionary: dictionary containing the data to be flattened :param str incoming_key: string to be appended to the key ''' assert(isinstance(dictionary, dict)),\ "Parameter 'dictionary' be of Dictionary type" if incoming_key is not None and not isinstance(incoming_key, str): incoming_key = str(incoming_key) result_dict = {} for key in dictionary: temp_data = dictionary[key] if not isinstance(key, str): key = str(key) small_key = key if incoming_key is not None: key = incoming_key + '_' + key temp_result = {} if small_key in labels_to_ignore: result_dict[key] = temp_data else: if type(temp_data) == list: temp_result = self.flatten_list(temp_data, key, labels_to_ignore) elif type(temp_data) == dict: temp_result = self.flatten_dict(temp_data, key, labels_to_ignore) else: result_dict[key] = temp_data if len(temp_result) > 0: result_dict = self.append_to_dict(result_dict, temp_result) return result_dict def flatten_list(self, data_list: list, incoming_key: str = None, labels_to_ignore: list = []): ''' :param list data_list: list containing the data to be flattened :param str incoming_key: string to be appended to the key ''' assert(isinstance(data_list, list)),\ "Parameter 'data_list' be of List type" if incoming_key is not None and not isinstance(incoming_key, str): incoming_key = str(incoming_key) result_dict = {} for iteration, item in enumerate(data_list): temp_dataframe = item temp_result = {} key = incoming_key if not isinstance(key, str): key = str(key) if incoming_key is not None: list_iterator = self.add_list_iterator(data_list, iteration) key = incoming_key + '_' + list_iterator else: key = str(iteration) if type(temp_dataframe) == list: temp_result = self.flatten_list(temp_dataframe, key, labels_to_ignore) elif type(temp_dataframe) == dict: temp_result = self.flatten_dict(temp_dataframe, key, labels_to_ignore) else: result_dict[key] = temp_dataframe if len(temp_result) > 0: result_dict = self.append_to_dict(result_dict, temp_result) return result_dict def append_to_dict(self, dictionary: dict, to_append): ''' :param dict dictionary: dictionary which holds all the resulting data. :param dict to_append: data to be added to the resulting dictionary. ''' assert(isinstance(dictionary, dict)),\ "Parameter 'dictionary' be of Dictionary type" assert(isinstance(to_append, dict)),\ "Parameter 'to_append' be of Dictionary type" for key in to_append: dictionary[key] = to_append[key] return dictionary def flatten_if_not_flat(self, data: pd.DataFrame, labels_to_ignore: list = []): for data_type in data.dtypes: if data_type == object: return self.flatten(data, labels_to_ignore=labels_to_ignore) return data # Create class that inherits FlattenData and overwrite this function for specific implementations. def add_list_iterator(self, data_list: list, iteration: int): return str(iteration)