123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164 |
- #!/usr/bin/env python3
- # -*- coding: utf-8 -*-
- """
- Created on Wed Oct 9 15:17:34 2019
- @author: oskar
- @description: Class which flattens nested DataFrames, dictionaries and lists into tabular form
- """
- import sys
- import os
- import time
- import pandas as pd
- import copy
- sys.path.append(os.getcwd())
- from cdplib.log import Log
class FlattenData():
    """Flatten nested DataFrames, dictionaries and lists into tabular form.

    Nested containers are expanded recursively. The key of every leaf value
    is built by joining the keys (for dictionaries) or positions (for lists)
    on the path to it with underscores, e.g. ``{'a': {'b': [1, 2]}}`` becomes
    ``{'a_b_0': 1, 'a_b_1': 2}``.
    """

    def __init__(self):
        self._log = Log("Flatten data")

    def flatten(self, data) -> pd.DataFrame:
        '''
        Flatten the given data and return the result as a DataFrame.

        The flattened mapping is converted with
        ``pd.DataFrame.from_dict(..., orient='index')``: for a DataFrame
        input each original row becomes one row of the result; for a
        dictionary or list input each flattened key becomes one row label.

        :param data: data given in either dictionary, list or dataframe format.
        :return: the flattened data, or None if the type is not supported
            (only reachable when assertions are disabled).
        '''
        assert(isinstance(data, (list, dict, pd.DataFrame))),\
            "Parameter 'data' either be of List, Dictionary or DataFrame type"

        start = time.time()

        # Exactly one branch must run. The branches used to be independent
        # ``if`` statements, so DataFrame and dict inputs fell through to the
        # "not supported" branch and returned None.
        if isinstance(data, pd.DataFrame):
            return_data = self.flatten_dataframe(data)
            original_columns = len(data.columns)
        elif isinstance(data, dict):
            return_data = self.flatten_dict(data)
            original_columns = 0
        elif isinstance(data, list):
            return_data = self.flatten_list(data)
            original_columns = 0
        else:
            self._log.log_and_raise_warning('Input data type is not supported')
            return None

        result_dataframe = pd.DataFrame.from_dict(return_data, orient='index')

        # Only a DataFrame input has columns to subtract; dict and list
        # inputs have no ``columns`` attribute.
        self._log.info(('Data has been flattened, created {} columns in {} seconds').format(len(result_dataframe.columns) - original_columns, time.time() - start))
        return result_dataframe

    def flatten_dataframe(self, dataframe: pd.DataFrame, incoming_key: str = None):
        '''
        Flatten every row of a DataFrame into a flat dictionary.

        :param pd.Dataframe dataframe: dataframe containing the data to be flattened
        :param str incoming_key: string to be prepended to every column key
        :return: dictionary mapping each row's index label to the flattened
            row; rows sharing an index label overwrite each other.
        '''
        assert(isinstance(dataframe, pd.DataFrame)),\
            "Parameter 'dataframe' be of DataFrame type"
        if incoming_key is not None:
            assert(isinstance(incoming_key, str)),\
                "Parameter 'incoming_key' be of String type"

        result_dict = {}
        for index, row in dataframe.iterrows():
            flat_row = {}
            # ``Series.items`` replaces ``iteritems``, which pandas 2.0 removed.
            for column, value in row.items():
                key = self._join_key(incoming_key, column)
                nested = self._flatten_nested(value, key)
                if nested is None:
                    flat_row[key] = value
                elif nested:
                    flat_row = self.append_to_dict(flat_row, nested)
            # Copy so the result does not share mutable cell values with the
            # source DataFrame.
            result_dict[index] = copy.deepcopy(flat_row)

        # The accumulated mapping is the result (the previous code returned
        # an undefined name here).
        return result_dict

    def flatten_dict(self, dictionary: dict, incoming_key: str = None):
        '''
        Flatten a (possibly nested) dictionary into a single-level dictionary.

        Values that are empty lists or empty dictionaries contribute no keys.

        :param dict dictionary: dictionary containing the data to be flattened
        :param str incoming_key: string to be prepended to every key
        :return: flat dictionary of leaf values.
        '''
        assert(isinstance(dictionary, dict)),\
            "Parameter 'dictionary' be of Dictionary type"
        if incoming_key is not None:
            assert(isinstance(incoming_key, str)),\
                "Parameter 'incoming_key' be of String type"

        result_dict = {}
        for original_key, value in dictionary.items():
            key = self._join_key(incoming_key, original_key)
            nested = self._flatten_nested(value, key)
            if nested is None:
                result_dict[key] = value
            elif nested:
                result_dict = self.append_to_dict(result_dict, nested)

        return result_dict

    def flatten_list(self, data_list: list, incoming_key: str = None):
        '''
        Flatten a (possibly nested) list into a single-level dictionary.

        Each element is keyed by its position in the list, prefixed with
        ``incoming_key`` when one is given.

        :param list data_list: list containing the data to be flattened
        :param str incoming_key: string to be prepended to every key
        :return: flat dictionary of leaf values.
        '''
        assert(isinstance(data_list, list)),\
            "Parameter 'data_list' be of List type"
        if incoming_key is not None:
            assert(isinstance(incoming_key, str)),\
                "Parameter 'incoming_key' be of String type"

        result_dict = {}
        for position, item in enumerate(data_list):
            if incoming_key is None:
                key = str(position)
            elif (isinstance(item, dict)
                    and 'stationsnummer' in item and 'stage' in item):
                # OEBB-specific: name the entry after its station number and
                # stage instead of its position in the list.
                key = incoming_key + '_' + str(item['stationsnummer']) + '_' + str(item['stage'])
            else:
                # Every other element gets a positional suffix so siblings
                # can never overwrite each other.
                key = incoming_key + '_' + str(position)

            nested = self._flatten_nested(item, key)
            if nested is None:
                result_dict[key] = item
            elif nested:
                result_dict = self.append_to_dict(result_dict, nested)

        return result_dict

    def append_to_dict(self, dictionary: dict, to_append):
        '''
        Copy every entry of ``to_append`` into ``dictionary`` (in place).

        :param dict dictionary: dictionary which holds all the resulting data.
        :param dict to_append: data to be added to the resulting dictionary.
        :return: the updated ``dictionary`` object itself.
        '''
        assert(isinstance(dictionary, dict)),\
            "Parameter 'dictionary' be of Dictionary type"
        assert(isinstance(to_append, dict)),\
            "Parameter 'to_append' be of Dictionary type"

        dictionary.update(to_append)
        return dictionary

    @staticmethod
    def _join_key(incoming_key, key):
        '''Return ``key`` prefixed with ``incoming_key`` and an underscore.

        Without a prefix the key is returned unchanged; with one, the key is
        converted to a string first so non-string labels can be joined.
        '''
        if incoming_key is None:
            return key
        return incoming_key + '_' + str(key)

    def _flatten_nested(self, value, key):
        '''Flatten ``value`` if it is a list or dict, else return None.

        A return value of None marks a leaf that the caller stores as is.
        '''
        # The recursive methods require a string prefix.
        prefix = key if isinstance(key, str) else str(key)
        if isinstance(value, list):
            return self.flatten_list(value, prefix)
        if isinstance(value, dict):
            return self.flatten_dict(value, prefix)
        return None
|