@@ -0,0 +1,396 @@
+#!/usr/bin/env python3
+# -*- coding: utf-8 -*-
+"""
+Created on Mon Jul 22 11:05:47 2019
+
+@author: tanya
+
+@description: a class to reshape a pandas dataframe to a list of
+(possibly nested) documents with respect to a (json) mongodb schema
+"""
+
+import pandas as pd
+import numpy as np
+import os
+import sys
+
+sys.path.append(os.getcwd())
+
+
+class DataFrameToCollection():
+    '''
+    Reshapes a flat pandas dataframe into a list of (possibly nested)
+    documents that conform to a (json) mongodb schema
+    '''
+    def __init__(self, schema_path: str = None):
+        '''
+        :param schema_path: path to a json file holding the mongodb schema;
+         if None, a schema dictionary must be passed to to_list_of_documents
+        '''
+        from cdplib.log import Log
+        import json
+
+        self._log = Log("DataFrameToCollection")
+
+        self.schema = None
+
+        if schema_path is not None:
+
+            if not os.path.isfile(schema_path):
+                err = "Json schema not found at {}".format(schema_path)
+                self._log.error(err)
+                raise FileNotFoundError(err)
+
+            # load the schema to a dictionary if it is a valid json file
+            try:
+                with open(schema_path, "r") as f:
+                    self.schema = json.load(f)
+
+            except Exception as e:
+                err = ("Could not load json schema, "
+                       "obtained error {}".format(e))
+
+                self._log.error(err)
+                raise Exception(err)
+
+
+    def to_list_of_documents(self, data: pd.DataFrame,
+                             grp_fields: list,
+                             schema: dict = None,
+                             _final_step: bool = True) -> pd.DataFrame:
+        '''
+        Reshapes a pandas dataframe to a list of documents according
+        to a complex (json) mongodb schema
+
+        Remark1: column names of data need to reflect the "nestedness"
+        of the field in the mongodb schema with the help of a "." separator
+        Example: field.sub_field_1, field.sub_field_2
+
+        Remark2: if the schema is stored as a json file, first load it
+        to a dictionary with the help of the python json module
+
+        The function goes recursively through all the fields and reshapes
+        them correspondingly depending on whether the field is an array,
+        an object, or a simple field. For each field we group the data by
+        the grp_fields and reshape it accordingly; the result is a pandas
+        Series. In the end all the series are collected and concatenated.
+
+        return: a dataframe where each row holds the field values of one
+        document; apply .to_dict(orient="records") to obtain the actual list
+        '''
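+        # Illustrative sketch (hypothetical data): with grp_fields=["id"]
+        # and a schema where "name" is an object with sub-fields "first"
+        # and "last", a dataframe with columns
+        # ["id", "name.first", "name.last"] and a single row
+        # (1, "Ann", "Lee") yields one document of the form
+        # {"id": 1, "name": {"first": "Ann", "last": "Lee"}}.
+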
+        from copy import deepcopy
+
+        data = self._melt_duplicated_columns(data)
+
+        reshaped_fields = []
+
+        if schema is None:
+            schema = self.schema
+
+        for field in schema["properties"]:
+
+            if field not in self._unroll_nested_names(data.columns):
+                continue
+
+            field_type = schema["properties"][field]["bsonType"]
+
+            # if field has a simple type
+            if field_type not in ["array", "object"]:
+
+                grp_fields = [c for c in grp_fields if c in data.columns]
+
+                # check that there is only one possible value of this field
+                n_distinct_values = data.groupby(grp_fields, sort=False)[field].nunique().max()
+
+                if n_distinct_values != 1:
+                    err = "Field {0} is not unique with respect to {1}"\
+                          .format(field, grp_fields)
+
+                    self._log.error(err)
+                    raise Exception(err)
+
+                if field not in grp_fields:
+                    reshaped_field = data.groupby(grp_fields, sort=False)[field].first()
+                else:
+                    reshaped_field =\
+                        data[grp_fields].drop_duplicates()\
+                        .set_index(grp_fields, drop=False)[field]
+
+                reshaped_fields.append(reshaped_field)
+
+            # if field is sub-document (dictionary)
+            elif field_type == "object":
+
+                sub_schema = deepcopy(schema["properties"][field])
+
+                # rename sub-schema properties to match with data column names
+                sub_schema["properties"] =\
+                    {".".join([field, k]): v for k, v
+                     in sub_schema["properties"].items()}
+
+                sub_data = self.to_list_of_documents(
+                        data=data,
+                        schema=sub_schema,
+                        grp_fields=grp_fields,
+                        _final_step=False)
+
+                reshaped_field = sub_data.apply(self._make_dict, axis=1)
+                reshaped_field.name = field
+
+                reshaped_fields.append(reshaped_field)
+
+            # if field is an array
+            elif field_type == "array":
+
+                items_type = schema["properties"][field]["items"]["bsonType"]
+
+                # if field is a list of dictionaries
+                if items_type == "object":
+
+                    sub_schema = deepcopy(schema["properties"][field]["items"])
+
+                    # rename sub-schema properties to match data column names
+                    sub_schema["properties"] =\
+                        {".".join([field, k]): v for k, v in
+                         sub_schema["properties"].items()}
+
+                    # extend grp fields by the sub-fields of simple type
+                    sub_grp_fields = [f for f in sub_schema["properties"]
+                                      if (sub_schema["properties"][f]["bsonType"] not in ["array", "object"])
+                                      and (f in data.columns)]
+
+                    if len(sub_grp_fields) == 0:
+                        err = ("One of the sub-keys in a list of documents"
+                               " must be of simple type for the field {}"
+                               .format(field))
+
+                        self._log.error(err)
+                        raise Exception(err)
+
+                    # group and reshape sub-fields with complex types
+                    sub_data = self.to_list_of_documents(
+                                data=data,
+                                schema=sub_schema,
+                                grp_fields=grp_fields + sub_grp_fields,
+                                _final_step=False)
+
+                    if sub_data is not None:
+
+                        # gather the results into a list of dictionaries
+                        sub_data = sub_data.apply(self._make_dict, axis=1)
+
+                        sub_data.name = field
+                        sub_data = sub_data.reset_index(grp_fields)
+
+                        reshaped_field =\
+                            sub_data.groupby(grp_fields, sort=False)[field]\
+                                    .apply(self._make_list_of_distinct)
+
+                        reshaped_fields.append(reshaped_field)
+
+                # if field is a list of lists
+                elif items_type == "array":
+
+                    grp_fields = [c for c in grp_fields if c in data.columns]
+
+                    if field in data.columns:
+
+                        reshaped_field = data.groupby(grp_fields, sort=False)[field]\
+                                             .apply(self._make_list_of_distinct)
+
+                        reshaped_fields.append(reshaped_field)
+
+                # if field is a list of values of simple type
+                else:
+
+                    grp_fields = [c for c in grp_fields if c in data.columns]
+
+                    if field in data.columns:
+
+                        reshaped_field = data.groupby(grp_fields, sort=False)[field]\
+                                             .apply(self._make_flattened_list_of_distinct)
+
+                        reshaped_fields.append(reshaped_field)
+
+        if len(reshaped_fields) > 0:
+
+            reshaped_fields = pd.concat(reshaped_fields, sort=False, axis=1)
+
+            if _final_step:
+                # dropping the index names at the final step;
+                # before that the index is needed for merging
+                reshaped_fields =\
+                    reshaped_fields.drop(list(reshaped_fields.index.names), axis=1, errors="ignore")\
+                                   .reset_index(drop=False)
+
+                self._log.info("Done reshaping the dataframe to a list of documents")
+
+            return reshaped_fields
+
+        else:
+            return
+
+    def _melt_duplicated_columns(self, data: pd.DataFrame) -> pd.DataFrame:
+        '''
+        Collapses columns that share the same name into a single column
+        whose cells hold the list of values of the duplicated columns
+        '''
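+        # Illustrative sketch (hypothetical data): if "c" appears twice as
+        # a column, with row values 100 and 300, the two columns are
+        # collapsed into one column "c" whose cell holds [100, 300].
+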
+        data = data.copy(deep=True)
+
+        for c in set(data.columns):
+            if isinstance(data[c], pd.DataFrame):
+                # data[c] is a dataframe exactly when the name c is
+                # duplicated; collect each of its rows into a list
+                data["temp"] = data[c].apply(self._make_list, axis=1)
+                data.drop(c, axis=1, inplace=True)
+                data = data.rename(columns={"temp": c})
+
+        return data
+
+    def _make_dict(self, x: pd.Series) -> dict:
+        '''
+        Transforms a pandas Series into a dictionary.
+        It is meant to be applied to a dataframe along axis=1,
+        so that the index of the input Series consists of the
+        column names of the dataframe
+        '''
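+        # Illustrative sketch: pd.Series({"name.first": "Ann",
+        # "name.last": np.nan}) becomes {"first": "Ann"}: the parent
+        # prefixes are stripped and null scalar entries are dropped.
+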
+        def custom_notnull(y):
+            # pd.notnull returns an array for list-like entries;
+            # keep those, and keep scalars only if they are not null
+            if isinstance(pd.notnull(y), bool):
+                return pd.notnull(y)
+            else:
+                return True
+
+        return {f.split(".")[-1]: x[f] for f in x.index
+                if custom_notnull(x[f])}
+
+    def _make_list(self, x: pd.Series) -> list:
+        '''
+        return: list of values in a series
+        '''
+        return list(x)
+
+    def _make_list_of_distinct(self, x: pd.Series) -> list:
+        '''
+        return: list of unique values from a Series whose entries
+        are arbitrary objects
+        (the pandas unique() method does not work if the entries
+        are of complex types)
+        '''
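+        # Illustrative sketch: pd.Series([[1, 2], [1, 2], {}]) becomes
+        # [[1, 2]]: duplicates are compared by their string representation
+        # and empty lists/dicts are dropped.
+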
+        uniques = pd.DataFrame({"temp": x.tolist()})\
+                    .assign(temp_str=lambda y: y["temp"].astype(str))\
+                    .drop_duplicates(subset=["temp_str"])\
+                    .drop("temp_str", axis=1).iloc[:, 0].tolist()
+
+        def is_empty(y):
+            is_empty_dict = (isinstance(y, dict) and (len(y) == 0))
+            is_empty_list = (isinstance(y, list) and (len(y) == 0))
+            return is_empty_dict or is_empty_list
+
+        return [el for el in uniques if not is_empty(el)]
+
+    def _make_flattened_list_of_distinct(self, x: pd.Series) -> list:
+        '''
+        return: a flat list of unique values from a Series whose
+        entries are scalars or lists of scalars
+        '''
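+        # Illustrative sketch: pd.Series([[100, 300], [200, 400]]) becomes
+        # [100, 300, 200, 400], and pd.Series([2, 2, 3]) becomes [2, 3].
+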
+        uniques = self._make_list_of_distinct(x)
+
+        # flatten the distinct entries (scalars are wrapped into lists)
+        # and deduplicate again, since different entries may share values
+        flattened = [el for entry in uniques
+                     for el in (entry if isinstance(entry, list) else [entry])]
+
+        return self._make_list_of_distinct(pd.Series(flattened))
+
+    def _unroll_nested_names(self, names: list) -> list:
+        '''
+        Example: transform a list ["name.firstname", "name.surname"]
+        into ["name", "name.firstname", "name.surname"]
+        '''
+        unrolled = []
+
+        for c in names:
+            parts = c.split(".")
+            for i in range(len(parts)):
+                unrolled.append(".".join(parts[:i+1]))
+
+        return unrolled
+
+
+if __name__ == "__main__":
+
+    # Testing
+
+    df = pd.DataFrame({
+        "a": [1]*8 + [2]*8,
+        "b": [10]*8 + [20]*8,
+        "c": [100, 200]*8,
+        "d.da": [11]*8 + [22]*8,
+        "d.db": [33]*8 + [34]*8,
+        "e.ea.eaa": [5]*8 + [55]*8,
+        "e.ea.eab": [6]*8 + [66]*8,
+        "e.eb": [2, 2, 3, 3]*4,
+        "e.ec.eca": [1, 2, 3, 4]*4,
+        "e.ec.ecb": [5, 6, 7, 8]*4,
+        "f.fa": [1]*4 + [3]*4 + [11]*4 + [33]*4,
+        "f.fb": [2]*4 + [3]*2 + [4]*2 + [22]*4 + [44]*4})
+
+    # a duplicated column name to exercise _melt_duplicated_columns
+    duplicate = pd.DataFrame({"c": [300, 400]*8})
+
+    df = pd.concat([df, duplicate], axis=1)
+
+    schm = {
+        "bsonType": "object",
+        "required": ["a"],
+        "properties": {
+
+            "a": {"bsonType": "integer"},
+
+            "b": {"bsonType": "integer"},
+
+            "c": {
+                "bsonType": "array",
+                "items": {"bsonType": "integer"}
+            },
+            "d": {
+                "bsonType": "object",
+                "properties": {
+                    "da": {"bsonType": "integer"},
+                    "db": {"bsonType": "integer"}
+                }
+            },
+            "e": {
+                "bsonType": "object",
+                "properties": {
+                    "ea": {
+                        "bsonType": "object",
+                        "properties": {
+                            "eaa": {"bsonType": "integer"},
+                            "eab": {"bsonType": "integer"}
+                        }
+                    },
+
+                    "eb": {
+                        "bsonType": "array",
+                        "items": {"bsonType": "integer"}
+                    },
+
+                    "ec": {
+                        "bsonType": "array",
+                        "items": {
+                            "bsonType": "object",
+                            "properties": {
+                                "eca": {"bsonType": "integer"},
+                                "ecb": {"bsonType": "integer"}
+                            }
+                        }
+                    }
+                }
+            },
+            "f": {
+                "bsonType": "array",
+                "items": {
+                    "bsonType": "object",
+                    "properties": {
+                        "fa": {"bsonType": "integer"},
+                        "fb": {
+                            "bsonType": "array",
+                            "items": {"bsonType": "integer"}
+                        }
+                    }
+                }
+            }
+        }
+    }
+
+    grp_fields = ["a"]
+
+    result = DataFrameToCollection().to_list_of_documents(
+            data=df,
+            schema=schm,
+            grp_fields=grp_fields)
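+
+    # Illustrative check: each row of `result` corresponds to one
+    # document; converting with to_dict yields the documents themselves,
+    # one per distinct value of "a".
+    print(result.to_dict(orient="records"))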