#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Created on Mon Jul 22 11:05:47 2019

@author: tanya

@description: a class to reshape a pandas dataframe to a list of
(possibly nested) documents with respect to a (json) mongodb schema
"""

import os
import sys

import pandas as pd

sys.path.append(os.getcwd())


class DataFrameToCollection():
    '''
    Reshapes a pandas dataframe to a list of (possibly nested)
    documents according to a (json) mongodb schema.
    '''
    def __init__(self, schema_path: str = None):
        '''
        :param schema_path: path to a mongodb json schema file.
            May be omitted if a schema dictionary is passed directly
            to to_list_of_documents().
        '''
        from cdplib.log import Log
        import json

        self._log = Log("DataFrameToCollection")

        self.schema = None

        if schema_path is not None:

            if not os.path.isfile(schema_path):
                err = "JsonSchema not found"
                self._log.error(err)
                raise FileNotFoundError(err)

            # load the schema to a dictionary if it is a valid json file
            try:
                with open(schema_path, "r") as f:
                    self.schema = json.load(f)
            except Exception as e:
                err = ("Could not load json schema. "
                       "Obtained error: {}".format(e))
                self._log.error(err)
                raise Exception(err)

    def to_list_of_documents(self, data: pd.DataFrame,
                             grp_fields: list,
                             schema: dict = None,
                             _final_step: bool = True,
                             already_reshaped: list = None) -> pd.DataFrame:
        '''
        Reshapes a pandas dataframe to a list of documents according
        to a complex (json) mongodb schema. The rows of the returned
        dataframe correspond to the documents (None is returned if
        nothing was reshaped).

        Remark 1: column names of data need to reflect the "nestedness"
        of the field in the mongodb schema with the help of a "."
        separator. Example: field.sub_field_1, field.sub_field_2

        Remark 2: if the schema is stored as a json file, first load it
        to a dictionary with the help of the python json module.

        The function goes recursively through all the fields and reshapes
        them depending on whether the field is an array, an object, or a
        simple field. Each field is grouped by the grp_fields and reshaped
        accordingly; the result is a pandas Series. In the end all the
        series are collected and concatenated.
        '''
        from copy import deepcopy

        if already_reshaped is None:
            already_reshaped = []

        data = self._melt_duplicated_columns(data)

        reshaped_fields = []

        if schema is None:
            schema = self.schema

        for field in schema["properties"]:

            if field not in self._unroll_nested_names(data.columns):
                continue

            if field in already_reshaped:
                reshaped_field =\
                    data.groupby(grp_fields, sort=False)[field]\
                        .apply(self._make_flattened_list_of_distinct)

                reshaped_fields.append(reshaped_field)
            else:
                field_type = schema["properties"][field]["bsonType"]

                # if the field has a simple type
                if field_type not in ["array", "object"]:

                    grp_fields = [c for c in grp_fields if c in data.columns]

                    # check that there is only one possible value of this
                    # field within each group
                    n_distinct_values =\
                        data.groupby(grp_fields, sort=False)[field]\
                            .nunique().max()

                    # n_distinct_values can be 0 if the column only
                    # contains NaN values
                    if n_distinct_values > 1:
                        err = "Field {0} is not unique with respect to {1}"\
                              .format(field, grp_fields)

                        self._log.error(err)
                        raise Exception(err)

                    if field not in grp_fields:
                        reshaped_field =\
                            data.groupby(grp_fields, sort=False)[field].first()
                    else:
                        reshaped_field =\
                            data[grp_fields].drop_duplicates()\
                                .set_index(grp_fields, drop=False)[field]

                    reshaped_fields.append(reshaped_field)

                # if the field is a sub-document (dictionary)
                elif field_type == "object":

                    sub_schema = deepcopy(schema["properties"][field])

                    # rename sub-schema properties to match the data
                    # column names
                    sub_schema["properties"] =\
                        {".".join([field, k]): v for k, v
                         in sub_schema["properties"].items()}

                    sub_data = self.to_list_of_documents(
                            data=data,
                            schema=sub_schema,
                            grp_fields=grp_fields,
                            _final_step=False,
                            already_reshaped=already_reshaped)

                    # needs to be checked since child elements can be empty
                    if sub_data is not None:

                        reshaped_field = sub_data.apply(self._make_dict,
                                                        axis=1)
                        reshaped_field.name = field
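                        # Illustrative example: for a field "d" with
                        # sub-fields "da" and "db", the columns "d.da" and
                        # "d.db" collapse here into one Series named "d"
                        # whose entries are dicts like {"da": 11, "db": 33},
                        # one dict per group key.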
                        reshaped_fields.append(reshaped_field)

                # if the field is an array
                elif field_type == "array":

                    items_type =\
                        schema["properties"][field]["items"]["bsonType"]

                    # if the field is a list of documents
                    if items_type == "object":

                        sub_schema =\
                            deepcopy(schema["properties"][field]["items"])

                        # rename sub-schema properties to match the data
                        # column names
                        sub_schema["properties"] =\
                            {".".join([field, k]): v for k, v in
                             sub_schema["properties"].items()}

                        # extend grp_fields by the sub-fields of the field
                        # that have simple types
                        sub_grp_fields =\
                            [f for f in sub_schema["properties"]
                             if (sub_schema["properties"][f]["bsonType"]
                                 not in ["array", "object"])
                             and (f in data.columns)]

                        if len(sub_grp_fields) == 0:
                            err = ("One of the sub-keys in a list of documents"
                                   " must be of simple type for the field {}"
                                   .format(field))

                            self._log.error(err)
                            raise Exception(err)

                        # group and reshape sub-fields with complex types
                        sub_data = self.to_list_of_documents(
                                data=data,
                                schema=sub_schema,
                                grp_fields=grp_fields + sub_grp_fields,
                                _final_step=False,
                                already_reshaped=already_reshaped)

                        if sub_data is not None:

                            # gather the results into a list of dictionaries
                            sub_data = sub_data.apply(self._make_dict, axis=1)

                            sub_data.name = field
                            sub_data = sub_data.reset_index(grp_fields)

                            ######################################################
                            ######## OPTIMIZATIONS MAY BE POSSIBLE HERE ##########
                            reshaped_field =\
                                sub_data.groupby(grp_fields, sort=False)[field]\
                                        .apply(self._make_list_of_distinct)
                            ######################################################

                            reshaped_fields.append(reshaped_field)

                    # if the items are themselves arrays
                    elif items_type == "array":

                        grp_fields = [c for c in grp_fields
                                      if c in data.columns]

                        if field in data.columns:
                            reshaped_field =\
                                data.groupby(grp_fields, sort=False)[field]\
                                    .apply(self._make_list_of_distinct)

                            reshaped_fields.append(reshaped_field)

                    # if the field is a list of values of simple type
                    else:

                        grp_fields = [c for c in grp_fields
                                      if c in data.columns]

                        if field in data.columns:
                            reshaped_field =\
                                data.groupby(grp_fields, sort=False)[field]\
                                    .apply(self._make_flattened_list_of_distinct)

                            reshaped_fields.append(reshaped_field)

        if len(reshaped_fields) > 0:

            reshaped_fields = pd.concat(reshaped_fields, sort=False, axis=1)

            if _final_step:
                # drop the index names in the final step; in the
                # intermediate steps the index is needed for merging
                reshaped_fields =\
                    reshaped_fields.drop(list(reshaped_fields.index.names),
                                         axis=1, errors="ignore")\
                                   .reset_index(drop=False)

                self._log.info("Done reshaping the dataframe "
                               "to a list of documents")

            return reshaped_fields
        else:
            return None

    def _melt_duplicated_columns(self, data: pd.DataFrame) -> pd.DataFrame:
        '''
        Collapses duplicated column names into a single column whose
        entries are lists of the duplicated values.
        '''
        data = data.copy(deep=True)

        for c in set(data.columns):
            if isinstance(data[c], pd.DataFrame):
                data["temp"] = data[c].apply(self._make_list, axis=1)
                data.drop(c, axis=1, inplace=True)
                data = data.rename(columns={"temp": c})

        return data

    def _make_dict(self, x: pd.Series) -> dict:
        '''
        Transforms a pandas Series to a dictionary.
        Meant to be applied to a dataframe with axis=1, so that the
        index of the input Series holds the column names of the dataframe.
        '''
        def custom_notnull(y):
            # pd.notnull returns an array for list-like entries;
            # treat those as non-null
            if isinstance(pd.notnull(y), bool):
                return pd.notnull(y)
            else:
                return True

        return {f.split(".")[-1]: x[f] for f in x.index
                if custom_notnull(x[f])}
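    # Illustrative example for _make_dict (applied row-wise with axis=1):
    #   row = pd.Series({"e.ea.eaa": 5, "e.ea.eab": 6})
    #   _make_dict(row)  ->  {"eaa": 5, "eab": 6}
    # keys are the last "."-separated path component; null entries
    # are dropped.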
    def _make_list(self, x: pd.Series) -> list:
        '''
        return: list of values in a series
        '''
        return list(x)

    def _make_list_of_distinct(self, x: pd.Series) -> list:
        '''
        return: list of unique values from a Series where entries are
        arbitrary objects (the pandas unique() method does not work
        if entries are of complex types)
        '''
        if x.size == 1:
            uniques = x.tolist()
        else:
            # deduplicate via a string representation, since complex
            # entries (dicts, lists) are not hashable
            uniques = pd.DataFrame({"temp": x.values})\
                        .assign(temp_str=lambda y: y["temp"].astype(str))\
                        .drop_duplicates(subset=["temp_str"])\
                        .drop("temp_str", axis=1).iloc[:, 0].tolist()

        def is_empty(y):
            is_empty_dict = (isinstance(y, dict) and (len(y) == 0))
            is_empty_list = (isinstance(y, list) and (len(y) == 0))
            return is_empty_dict or is_empty_list

        return [el for el in uniques if not is_empty(el)]

    def _make_flattened_list_of_distinct(self, x: pd.Series):
        '''
        return: the first unique value from a Series where entries are
        arbitrary objects, or None if there is no such value
        '''
        uniques = self._make_list_of_distinct(x)

        if len(uniques) > 0:
            return uniques[0]
        else:
            return None

    def _unroll_nested_names(self, names: list) -> list:
        '''
        Example: transforms the list ["name.firstname", "name.surname"]
        into ["name", "name.firstname", "name.surname"]
        '''
        unrolled = []

        for c in names:
            parts = c.split(".")
            for i in range(len(parts)):
                unrolled.append(".".join(parts[:i+1]))

        return unrolled


if __name__ == "__main__":

    # Testing

    df = pd.DataFrame({
        "a": [1]*8 + [2]*8,
        "b": [10]*8 + [20]*8,
        "c": [100, 200]*8,
        "d.da": [11]*8 + [22]*8,
        "d.db": [33]*8 + [34]*8,
        "e.ea.eaa": [5]*8 + [55]*8,
        "e.ea.eab": [6]*8 + [66]*8,
        "e.eb": [2, 2, 3, 3]*4,
        "e.ec.eca": [1, 2, 3, 4]*4,
        "e.ec.ecb": [5, 6, 7, 8]*4,
        "f.fa": [1]*4 + [3]*4 + [11]*4 + [33]*4,
        "f.fb": [2]*4 + [3]*2 + [4]*2 + [22]*4 + [44]*4})

    duplicate = pd.DataFrame({"c": [300, 400]*8})

    df = pd.concat([df, duplicate], axis=1)

    schm = {
        "bsonType": "object",
        "required": ["a"],
        "properties": {

            "a": {"bsonType": "integer"},

            "b": {"bsonType": "integer"},

            "c": {
                "bsonType": "array",
                "items": {"bsonType": "integer"}
            },

            "d": {
                "bsonType": "object",
                "properties": {
                    "da": {"bsonType": "integer"},
                    "db": {"bsonType": "integer"}
                }
            },

            "e": {
                "bsonType": "object",
                "properties": {
                    "ea": {
                        "bsonType": "object",
                        "properties": {
                            "eaa": {"bsonType": "integer"},
                            "eab": {"bsonType": "integer"}
                        }
                    },

                    "eb": {
                        "bsonType": "array",
                        "items": {"bsonType": "integer"}
                    },

                    "ec": {
                        "bsonType": "array",
                        "items": {
                            "bsonType": "object",
                            "properties": {
                                "eca": {"bsonType": "integer"},
                                "ecb": {"bsonType": "integer"}
                            }
                        }
                    }
                }
            },

            "f": {
                "bsonType": "array",
                "items": {
                    "bsonType": "object",
                    "properties": {
                        "fa": {"bsonType": "integer"},
                        "fb": {
                            "bsonType": "array",
                            "items": {"bsonType": "integer"}
                        }
                    }
                }
            }
        }
    }

    grp_fields = ["a"]

    result = DataFrameToCollection().to_list_of_documents(
        data=df,
        schema=schm,
        grp_fields=grp_fields)
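    # Illustrative expectation (not asserted): `result` has one row per
    # value of "a"; nested fields arrive assembled, e.g. for a == 1 the
    # "d" entry is {"da": 11, "db": 33} and "e" contains
    # "ea": {"eaa": 5, "eab": 6} among its sub-fields.
    print(result)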