#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Created on Mon Jul 22 11:05:47 2019

@author: tanya

@description: a function to reshape a pandas dataframe to a list of
(possibly nested) documents with respect to a (json) mongodb schema
"""

import pandas as pd
import os
import sys

sys.path.append(os.getcwd())


class DataFrameToCollection:
    '''
    Reshapes a flat pandas DataFrame into a list of (possibly nested)
    documents conforming to a (json) mongodb schema.

    Column names of the input data encode nesting with a "." separator,
    e.g. "field.sub_field_1" maps to {"field": {"sub_field_1": ...}}.
    '''
    def __init__(self, schema_path: str = None, log_path: str = None):
        '''
        :param schema_path: path to a json file holding the mongodb schema;
            if None, a schema must be passed to to_list_of_documents.
        :param log_path: accepted for interface compatibility; not used here.
        :raises FileNotFoundError: if schema_path does not point to a file.
        :raises Exception: if the schema file is not valid json.
        '''
        from libraries.log import Log
        import json

        self._log = Log("ParseJsonSchema")

        if schema_path is not None:

            if not os.path.isfile(schema_path):
                err = "JsonSchema not found"
                self._log.error(err)
                raise FileNotFoundError(err)

            # load schema to dictionary if it is a valid json file
            try:
                with open(schema_path, "r") as f:
                    self.schema = json.load(f)

            except Exception as e:
                err = ("Could not load json schema, "
                       "Obtained error {}".format(e))

                self._log.error(err)
                raise Exception(err)

        else:
            self.schema = None

    def to_list_of_documents(self, data: pd.DataFrame,
                             grp_fields: list,
                             schema: dict = None,
                             _return_data: bool = False) -> list:
        '''
        Reshapes a pandas dataframe to a list of documents according
        to a complex (json) mongodb schema

        Remark1: column names of data need to reflect the "nestedness"
        of the field in the mongodb schema with the help of a "."
        separator
        Example: field.sub_field_1, field.sub_field_2

        Remark2: if the schema is stored as a json file, first load it
        to a dictionary with the help of the python json module

        :param data: flat dataframe with "."-separated column names.
        :param grp_fields: columns identifying one output document
            (rows sharing these values are merged into one document).
        :param schema: mongodb schema as a dict; defaults to the schema
            loaded in the constructor.
        :param _return_data: internal flag used by recursive calls —
            when True, the intermediate grouped dataframe is returned
            instead of the final list of documents.
        :raises Exception: if a simple-type field is not unique per group,
            or if an array-of-objects field has no simple-type sub-key.
        '''
        from copy import deepcopy
        from libraries.log import Log

        log = Log("reshape_dataframe_to_list_of_documents:")

        # duplicated column names are first melted into extra rows
        data = self._melt_duplicated_columns(data)

        reshaped_fields = []

        if schema is None:
            schema = self.schema

        for field in schema["properties"]:

            # skip schema fields that have no corresponding data columns
            if field not in self._unroll_nested_names(data.columns):
                continue

            field_type = schema["properties"][field]["bsonType"]

            # if field has a simple type
            if field_type not in ["array", "object"]:

                grp_fields = [c for c in grp_fields if c in data.columns]

                # a simple-type field must be constant within each group,
                # otherwise it cannot be represented as a scalar
                n_distinct_values = data.groupby(grp_fields)[field].nunique()\
                                        .max()

                if n_distinct_values != 1:
                    err = "Field {0} is not unique with respect to {1}"\
                          .format(field, grp_fields)

                    log.error(err)
                    raise Exception(err)

                if field not in grp_fields:
                    reshaped_field = data.groupby(grp_fields)[field].first()
                else:
                    # keep the group key itself as a column, indexed by
                    # the (deduplicated) group keys
                    reshaped_field =\
                        data[grp_fields].drop_duplicates()\
                        .set_index(grp_fields, drop=False)[field]

                reshaped_fields.append(reshaped_field)

            # if field is sub-document (dictionary)
            elif field_type == "object":

                sub_schema = deepcopy(schema["properties"][field])

                # rename sub-schema properties to match with data column names
                sub_schema["properties"] =\
                    {".".join([field, k]): v for k, v
                     in sub_schema["properties"].items()}

                # recurse: reshape the sub-fields, then collapse each row
                # into one dictionary
                sub_data = self.to_list_of_documents(
                            data=data,
                            schema=sub_schema,
                            grp_fields=grp_fields,
                            _return_data=True)

                reshaped_field = sub_data.apply(self._make_dict, axis=1)
                reshaped_field.name = field
                reshaped_fields.append(reshaped_field)

            # if field is a list of dictionaries
            elif field_type == "array":

                items_type = schema["properties"][field]["items"]["bsonType"]

                if items_type == "object":

                    sub_schema = deepcopy(schema["properties"][field]["items"])

                    # rename sub-schema properties to match data column names
                    sub_schema["properties"] =\
                        {".".join([field, k]): v for k, v in
                         sub_schema["properties"].items()}

                    # extend grp fields by sub-fields of field simple types
                    sub_grp_fields =\
                        [f for f in sub_schema["properties"]
                         if sub_schema["properties"][f]["bsonType"]
                         not in ["array", "object"]]

                    if len(sub_grp_fields) == 0:
                        err = ("One of the sub-keys in a list of documents"
                               " must be of simple type for the field {}"
                               .format(field))

                        log.error(err)
                        raise Exception(err)

                    # group and reshape sub-fields with complex types
                    sub_data = self.to_list_of_documents(
                                data=data,
                                schema=sub_schema,
                                grp_fields=grp_fields + sub_grp_fields,
                                _return_data=True)

                    if sub_data is not None:

                        # gather the results into a list of dictionaries
                        sub_data = sub_data.apply(self._make_dict, axis=1)

                        sub_data.name = field
                        sub_data = sub_data.reset_index(grp_fields)

                        reshaped_field =\
                            sub_data.groupby(grp_fields)[field]\
                                    .apply(self._make_list_of_distinct)

                        reshaped_fields.append(reshaped_field)

                # if field is a list of values with simple type
                else:

                    grp_fields = [c for c in grp_fields if c in data.columns]

                    if field in data.columns:

                        reshaped_field =\
                            data.groupby(grp_fields)[field]\
                                .apply(self._make_list_of_distinct)

                        reshaped_fields.append(reshaped_field)

        if len(reshaped_fields) > 0:

            reshaped_data = pd.concat(reshaped_fields, axis=1)

            if not _return_data:

                # drop index columns duplicated as data columns, then
                # expose the group keys as regular columns
                list_of_documents =\
                    reshaped_data.drop(list(reshaped_data.index.names),
                                       axis=1, errors="ignore")\
                                 .reset_index(drop=False)

                log.info("Done reshaping the dataframe to a list"
                         " of documents")

                return list_of_documents

            else:
                return reshaped_data

    def _melt_duplicated_columns(self, data: pd.DataFrame) -> pd.DataFrame:
        '''
        Melts each set of duplicated column names into extra rows, so
        every column name occurs exactly once in the result.
        '''
        for c in set(data.columns):
            # selecting a duplicated column name yields a DataFrame
            if isinstance(data[c], pd.DataFrame):

                data = pd.melt(data, id_vars=[cc for cc in data.columns
                                              if cc != c],
                               value_vars=c)\
                         .drop("variable", axis=1)\
                         .rename(columns={"value": c})

        return data

    def _make_dict(self, x: pd.Series) -> dict:
        '''
        :return: transforms pandas series to a dictionary
            is meant to be applied to a dataframe in axis = 1,
            then the index of the input series are the column names
            of the dataframe; only the last "."-component of each
            name is kept as the dictionary key
        '''
        return {f.split(".")[-1]: x[f] for f in x.index}

    def _make_list(self, x: pd.Series) -> list:
        '''
        :return: list of values in a series
        '''
        return list(x)

    def _make_list_of_distinct(self, x: pd.Series) -> list:
        '''
        :return: list of unique values from a Series where
            entries are arbitrary objects
            (pandas unique() method does not work if entries are
            of complex types)
        '''
        # plain loop with membership test keeps first-seen order and
        # works for unhashable entries like dicts and lists
        distinct = []
        for obj in x:
            if obj not in distinct:
                distinct.append(obj)
        return distinct

    def _unroll_nested_names(self, columns: list) -> list:
        '''
        :return: list of all "."-separated prefixes of the given
            column names, e.g. "a.b.c" yields "a", "a.b", "a.b.c"
        '''
        unrolled = []

        for c in columns:
            splitted = c.split(".")
            for i in range(len(splitted)):
                unrolled.append(".".join(splitted[:i+1]))

        return unrolled


if __name__ == "__main__":

    # Testing

    df = pd.DataFrame({
                       "a": [1]*8 + [2]*8,
                       "b": [10]*8 + [20]*8,
                       "c": [100, 200]*8,
                       "d.da": [11]*8 + [22]*8,
                       "d.db": [33]*8 + [34]*8,
                       "e.ea.eaa": [5]*8 + [55]*8,
                       "e.ea.eab": [6]*8 + [66]*8,
                       "e.eb": [2, 2, 3, 3]*4,
                       "e.ec.eca": [1, 2, 3, 4]*4,
                       "e.ec.ecb": [5, 6, 7, 8]*4,
                       "f.fa": [1]*4 + [3]*4 + [11]*4 + [33]*4,
                       "f.fb": [2]*4 + [3]*2 + [4]*2 + [22]*4 + [44]*4})

    duplicate = pd.DataFrame({"c": [300, 400]*8})

    df = pd.concat([df, duplicate], axis=1)

    schm = {
              "bsonType": "object",
              "required": ["a"],
              "properties": {

                  "a": {"bsonType": "integer"},

                  "b": {"bsonType": "integer"},

                  "c": {
                      "bsonType": "array",
                      "items": {"bsonType": "integer"}
                  },

                  "d": {
                      "bsonType": "object",
                      "properties": {
                          "da": {"bsonType": "integer"},
                          "db": {"bsonType": "integer"}
                       }
                  },

                  "e": {
                      "bsonType": "object",
                      "properties": {
                          "ea": {
                              "bsonType": "object",
                              "properties": {
                                  "eaa": {"bsonType": "integer"},
                                  "eab": {"bsonType": "integer"}
                               }
                          },

                          "eb": {
                              "bsonType": "array",
                              "items": {"bsonType": "integer"}
                          },

                          "ec": {
                                "bsonType": "array",
                                "items": {
                                    "bsonType": "object",
                                    "properties": {
                                        "eca": {"bsonType": "integer"},
                                        "ecb": {"bsonType": "integer"}
                                    }
                                }
                          }
                      }
                  },

                  "f": {
                      "bsonType": "array",
                      "items": {
                          "bsonType": "object",
                          "properties": {
                              "fa": {"bsonType": "integer"},
                              "fb": {
                                  "bsonType": "array",
                                  "items": {"bsonType": "integer"}
                              }
                          }
                      }
                  }
              }
           }

    grp_fields = ["a"]

    result = DataFrameToCollection().to_list_of_documents(
                    data=df,
                    schema=schm,
                    grp_fields=grp_fields)