123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353 |
- #!/usr/bin/env python3
- # -*- coding: utf-8 -*-
- """
- Created on Mon Jul 22 11:05:47 2019
- @author: tanya
- @description: a function to reshape a pandas dataframe to a list of
- (possibly nested) documents with respect to a (json) mongodb schema
- """
- import pandas as pd
- import os
- import sys
- sys.path.append(os.getcwd())
- class DataFrameToCollection:
- '''
- '''
- def __init__(self, schema_path: str = None, log_path: str = None):
- '''
- '''
- from libraries.log import Log
- import json
- self._log = Log("ParseJsonSchema")
- if schema_path is not None:
- if not os.path.isfile(schema_path):
- err = "JsonSchema not found"
- self._log.error(err)
- raise FileNotFoundError(err)
- # load schema to dictionary if it is a valid json file
- try:
- with open(schema_path, "r") as f:
- self.schema = json.load(f)
- except Exception as e:
- err = ("Could not load json schema, "
- "Obtained error {}".format(e))
- self._log.error(err)
- raise Exception(err)
- else:
- self.schema = None
- def to_list_of_documents(self, data: pd.DataFrame,
- grp_fields: list,
- schema: dict = None,
- _return_data: bool = False) -> list:
- '''
- Reshapes a pandas dataframe to a list of documents according
- to a complex (json) mongodb schema
- Remark1: column names of data need to reflect the "nestedness"
- of the field in the mongodb schema with the help of a "." separator
- Example: field.sub_field_1, field.sub_field_2
- Remark2: if the schema is stored as a json file, first load it
- to a dictionary with the help of the python json module
- '''
- from copy import deepcopy
- from libraries.log import Log
- log = Log("reshape_dataframe_to_list_of_documents:")
- data = self._melt_duplicated_columns(data)
- reshaped_fields = []
- if schema is None:
- schema = self.schema
- for field in schema["properties"]:
- if field not in self._unroll_nested_names(data.columns):
- continue
- field_type = schema["properties"][field]["bsonType"]
- # if field has a simple type
- if field_type not in ["array", "object"]:
- grp_fields = [c for c in grp_fields if c in data.columns]
- n_distinct_values = data.groupby(grp_fields)[field].nunique()\
- .max()
- if n_distinct_values != 1:
- err = "Field {0} is not unique with respect to {1}"\
- .format(field, grp_fields)
- log.error(err)
- raise Exception(err)
- if field not in grp_fields:
- reshaped_field = data.groupby(grp_fields)[field].first()
- else:
- reshaped_field =\
- data[grp_fields].drop_duplicates()\
- .set_index(grp_fields, drop=False)[field]
- reshaped_fields.append(reshaped_field)
- # if field is sub-document (dictionary)
- elif field_type == "object":
- sub_schema = deepcopy(schema["properties"][field])
- # rename sub-schema properties to match with data column names
- sub_schema["properties"] =\
- {".".join([field, k]): v for k, v
- in sub_schema["properties"].items()}
- sub_data = self.to_list_of_documents(
- data=data,
- schema=sub_schema,
- grp_fields=grp_fields,
- _return_data=True)
- reshaped_field = sub_data.apply(self._make_dict, axis=1)
- reshaped_field.name = field
- reshaped_fields.append(reshaped_field)
- # if field is a list of dictionaries
- elif field_type == "array":
- items_type = schema["properties"][field]["items"]["bsonType"]
- if items_type == "object":
- sub_schema = deepcopy(schema["properties"][field]["items"])
- # rename sub-schema properties to match data column names
- sub_schema["properties"] =\
- {".".join([field, k]): v for k, v in
- sub_schema["properties"].items()}
- # extend grp fields by sub-fields of field simple types
- sub_grp_fields =\
- [f for f in sub_schema["properties"]
- if sub_schema["properties"][f]["bsonType"]
- not in ["array", "object"]]
- if len(sub_grp_fields) == 0:
- err = ("One of the sub-keys in a list of documents"
- " must be of simple type for the field {}"
- .format(field))
- log.error(err)
- raise Exception(err)
- # group and reshape sub-fields with complex types
- sub_data = self.to_list_of_documents(
- data=data,
- schema=sub_schema,
- grp_fields=grp_fields + sub_grp_fields,
- _return_data=True)
- if sub_data is not None:
- # gether the results into a list of dictionaries
- sub_data = sub_data.apply(self._make_dict, axis=1)
- sub_data.name = field
- sub_data = sub_data.reset_index(grp_fields)
- reshaped_field =\
- sub_data.groupby(grp_fields)[field]\
- .apply(self._make_list_of_distinct)
- reshaped_fields.append(reshaped_field)
- # if field is a list of values with simple type
- else:
- grp_fields = [c for c in grp_fields if c in data.columns]
- if field in data.columns:
- reshaped_field = data.groupby(grp_fields)[field]\
- .apply(self._make_list_of_distinct)
- reshaped_fields.append(reshaped_field)
- if len(reshaped_fields) > 0:
- reshaped_data = pd.concat(reshaped_fields, axis=1)
- if not _return_data:
- list_of_documents =\
- reshaped_data.drop(list(reshaped_data.index.names),
- axis=1, errors="ignore")\
- .reset_index(drop=False)
- log.info("Done reshaping the dataframe to a list of documents")
- return list_of_documents
- else:
- return reshaped_data
- def _melt_duplicated_columns(self, data: pd.DataFrame) -> pd.DataFrame:
- '''
- '''
- for c in set(data.columns):
- if isinstance(data[c], pd.DataFrame):
- data = pd.melt(data, id_vars=[cc for cc in data.columns
- if cc != c], value_vars=c)\
- .drop("variable", axis=1)\
- .rename(columns={"value": c})
- return data
- def _make_dict(self, x: pd.Series) -> dict:
- '''
- return: transforms pandas series to a dictionary
- is meant to be applied to a dataframe in axis = 1,
- then the index of the input series are the column names
- of the dataframe
- '''
- return {f.split(".")[-1]: x[f] for f in x.index}
- def _make_list(self, x: pd.Series) -> list:
- '''
- return: list of values in a series
- '''
- return list(x)
- def _make_list_of_distinct(self, x: pd.Series) -> list:
- '''
- return: list of unique values from a Series where
- entries are arbitrary objects
- (pandas unique() method does not work if entries are of complex types)
- '''
- distinct = []
- [distinct.append(obj) for obj in x if obj not in distinct]
- return distinct
- def _unroll_nested_names(self, columns: list) -> list:
- '''
- '''
- unrolled = []
- for c in columns:
- splitted = c.split(".")
- for i in range(len(splitted)):
- unrolled.append(".".join(splitted[:i+1]))
- return unrolled
- if __name__ == "__main__":
- # Testing
- df = pd.DataFrame({
- "a": [1]*8 + [2]*8,
- "b": [10]*8 + [20]*8,
- "c": [100, 200]*8,
- "d.da": [11]*8 + [22]*8,
- "d.db": [33]*8 + [34]*8,
- "e.ea.eaa": [5]*8 + [55]*8,
- "e.ea.eab": [6]*8 + [66]*8,
- "e.eb": [2, 2, 3, 3]*4,
- "e.ec.eca": [1, 2, 3, 4]*4,
- "e.ec.ecb": [5, 6, 7, 8]*4,
- "f.fa": [1]*4 + [3]*4 + [11]*4 + [33]*4,
- "f.fb": [2]*4 + [3]*2 + [4]*2 + [22]*4 + [44]*4})
- duplicate = pd.DataFrame({"c": [300, 400]*8})
- df = pd.concat([df, duplicate], axis=1)
- schm = {
- "bsonType": "object",
- "required": ["a"],
- "properties": {
- "a": {"bsonType": "integer"},
- "b": {"bsonType": "integer"},
- "c": {
- "bsonType": "array",
- "items": {"bsonType": "integer"}
- },
- "d": {
- "bsonType": "object",
- "properties": {
- "da": {"bsonType": "integer"},
- "db": {"bsonType": "integer"}
- }
- },
- "e": {
- "bsonType": "object",
- "properties": {
- "ea": {
- "bsonType": "object",
- "properties": {
- "eaa": {"bsonType": "integer"},
- "eab": {"bsonType": "integer"}
- }
- },
- "eb": {
- "bsonType": "array",
- "items": {"bsonType": "integer"}
- },
- "ec": {
- "bsonType": "array",
- "items": {
- "bsonType": "object",
- "properties": {
- "eca": {"bsonType": "integer"},
- "ecb": {"bsonType": "integer"}
- }
- }
- }
- }
- },
- "f": {
- "bsonType": "array",
- "items": {
- "bsonType": "object",
- "properties": {
- "fa": {"bsonType": "integer"},
- "fb": {
- "bsonType": "array",
- "items": {"bsonType": "integer"}
- }
- }
- }
- }
- }
- }
- grp_fields = ["a"]
- result = DataFrameToCollection().to_list_of_documents(
- data=df,
- schema=schm,
- grp_fields=grp_fields)
|