12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904 |
- #!/usr/bin/env python3
- # -*- coding: utf-8 -*-
- """
- Created on Tue Apr 28 12:01:07 2020
- @author: tanya
- @description: a class to reshape a pandas dataframe to a dataframe
- where rows are (possibly nested) documents
- according to a mongodb JsonSchema
- """
- import pandas as pd
- import numpy as np
- import os
- import sys
- from copy import deepcopy
- from cdplib.log import Log
- sys.path.append(os.getcwd())
class DataFrameToCollection():
    '''
    Reshapes a pandas dataframe to a dataframe where rows are
    (possibly nested) documents conforming to a mongodb JsonSchema.
    '''
    def __init__(self,
                 grp_fields: list,
                 schema_path: str = None,
                 schema: dict = None):
        '''
        Method to_documents transforms a dataframe
        to a dataframe where each row is a (possibly nested) document
        columns of the dataframe represent nested fields of the schema
        separated by "."
        Example: customer.name.first, customer.name.last

        :param grp_fields: the unique identifiers of the documents
        :param schema_path: path to a json file holding the JsonSchema
        :param schema: JsonSchema for the mongodb collection the data should
            be conform with; exactly one of schema / schema_path must be
            given (schema takes precedence if both are provided)
        '''
        import json

        self._logger = Log("DataFrameToCollection")
        self._grp_fields = grp_fields

        if (schema_path is None) and (schema is None):
            err = "Specify either schema or schema_path"
            self._logger.log_and_raise_error(err)
        elif schema is not None:
            self.schema = schema
        else:
            if not os.path.isfile(schema_path):
                # include the offending path so the error is actionable
                err = "JsonSchema not found at {}".format(schema_path)
                self._logger.log_and_raise_error(err)

            # load schema to dictionary if it is a valid json file
            try:
                with open(schema_path, "r") as f:
                    self.schema = json.load(f)
            except Exception as e:
                # fixed missing space between the concatenated parts
                err = ("Failed to load the schema, "
                       "exit with error: {}".format(e))
                self._logger.log_and_raise_error(err)
- def to_documents(self,
- data: pd.DataFrame,
- grp_fields: list = None,
- schema: dict = None):
- '''
- Transforms a dataframe to a dataframe where each
- row is a (possibly nested) document.
- columns of the dataframe represent nested fields of the schema
- separated by "."
- Example: customer.name.first, customer.name.last
- '''
- data = self._concatenate_duplicated_columns(data)
- if grp_fields is None:
- grp_fields = self._grp_fields
- grp_fields = [c for c in grp_fields if c in data.columns]
- if schema is None:
- schema = self.schema
- reshaped_df = data[grp_fields].drop_duplicates().set_index(grp_fields)
- for field in schema["properties"]:
- field_type = self._field_type(field=field, schema=schema)
- if not self._field_is_a_column:
- continue
- args = {"field": field, "schema": schema,
- "data": data, "grp_fields": grp_fields}
- if field_type == "object":
- reshaped_column = self._reshape_object_column(**args)
- elif field_type == "array":
- reshaped_column = self._reshape_array_column(**args)
- else:
- # has simple type
- reshaped_column = self._reshape_simple_column(**args)
- reshaped_df = self._bring_to_same_index(
- df1=reshaped_df,
- df2=reshaped_column)
- reshaped_df = pd.concat([reshaped_df, reshaped_column],
- axis=1, ignore_index=False)
- # reset the index
- reshaped_df = reshaped_df\
- .drop(list(reshaped_df.index.names), axis=1, errors="ignore")\
- .reset_index(drop=False)
- return reshaped_df
- def _reshape_simple_column(self, field: str,
- data: pd.DataFrame,
- grp_fields: list,
- schema: dict) -> pd.DataFrame:
- '''
- Example: grp_fields = [product_id, store_id],
- field = price,
- some_other_field = customer_id
- product_id | store_id | price | customer_id
- p1 s1 5.99 c1
- p1 s1 5.99 c2
- p2 s1 7.99 c3
- result:
- index price
- product_id store_id
- p1 s1 5.99
- p2 s1 7.99
- '''
- self._assert_is_one_to_one(field=field,
- data=data,
- grp_fields=grp_fields)
- if field not in grp_fields:
- fields = grp_fields + [field]
- else:
- fields = grp_fields
- return data[fields].drop_duplicates(subset=grp_fields)\
- .set_index(grp_fields, drop=False)[field]
- def _reshape_object_column(self, field: str,
- data: pd.DataFrame,
- grp_fields: list,
- schema: dict) -> pd.Series:
- '''
- Reshapes a DataFrame to a Series with one column of dictionary dtype
- Example 1 (simple):
- grp_fields: ["transaction_id"]
- schema = {"bsonType": object,
- "properties": {
- "ransaction_id": str,
- "commodity": {
- "bsonType": object,
- "properties": {
- "id": String,
- "name": "fruits",
- }
- },
- "customer": {
- "bsonType": "object",
- "properties": {
- "name": {
- "bsonType": "object",
- "properties": {
- "first": str,
- "last": str
- },
- "bank": str
- }
- }
- }
- }
- }
- data:
- =====================================================
- transaction_id | commodity.id | commodity.name | customer.name.first | customer.name.last | customer.bank
- t1 com1 fruits Jude Law sberbank
- t2 com2 meat Meryl Streep raiffeisen
- ===================================================
- field 1: commodity (level of nestedness 1)
- result 1:
- Index commodity
- transaction_id
- p1 {id: com1, name: fruits}
- p2 {id: com2, name: meat}
- ====================================================
- field 2: customer (level of nestedness 2)
- result 2:
- Index customer
- transaction_id
- t1 {name: {first: Jude, last: Law}, bank: sberbank}
- t2 {name: {first: Meryl, last: Streep}, bank: raiffeisen}
- =====================================================
- '''
- data = data.copy(deep=True)
- field_subschema = self._object_subschema(
- field=field,
- schema=schema)
- # if sub_schema is nested then recursively apply the function
- # to reshape its sub-fields first
- reshaped_df = self.to_documents(
- data=data,
- grp_fields=grp_fields,
- schema=field_subschema)
- reshaped_df = reshaped_df\
- .set_index(grp_fields, drop=False)\
- .drop(self._grp_fields, axis=1, errors="ignore")
- if reshaped_df is not None:
- reshaped_column = reshaped_df.apply(self._row_to_dict, axis=1)
- reshaped_column.name = field
- else:
- reshaped_column = None
- return reshaped_column
- def _reshape_array_column(self, field: str,
- data: pd.DataFrame,
- grp_fields: list,
- schema: dict) -> pd.DataFrame:
- '''
- Example
- grp_fields = [customer]
- schema = {"bsonType": object,
- "properties": {
- "customer": str,
- "product": {
- "bsonType": array,
- "items": {
- "bsonType": "str"
- }
- },
- "commodity": {
- "bsonType": array,
- "items": {
- "bsonType": "object",
- "properties": {
- "id": String,
- "name": "fruits",
- }
- }
- }
- }
- }
- data:
- =====================================================
- customer | product | commodity.id | commodity.name
- c1 p1 com1 fruits
- c1 p2 com1 fruits
- c1 p3 com2 meat
- c1 p4 com1 fruits
- c2 p1 com2 meat
- ===================================================
- field 1: product
- result 1:
- Index product
- customer
- c1 [p1, p2, p3, p4]
- c2 [p1]
- ====================================================
- field 2: commodity
- result 2:
- Index commodity
- customer
- c1 [{id: com1, name: fruits}, {id: com2, name: meat}]
- c2 [{id: com2, name: meant}]
- '''
- data = data.copy(deep=True)
- items_type = self._items_type(field=field, schema=schema)
- if items_type == "array":
- reshaped_column = data.groupby(grp_fields, sort=False)[field]\
- .apply(self._column_to_uniques, axis=0)
- elif items_type == "object":
- items_subschema = self._items_subschema(field=field, schema=schema)
- simple_subfields = self._simple_subfields(field=field,
- schema=items_subschema)
- reshaped_column = self._reshape_object_column(
- field=field,
- data=data,
- grp_fields=grp_fields + simple_subfields,
- schema=items_subschema,)
- reshaped_column = reshaped_column\
- .reset_index(grp_fields)\
- .groupby(grp_fields, sort=False)[field]\
- .apply(self._column_to_uniques)
- else:
- # items type is simple type
- reshaped_column = data\
- .groupby(grp_fields, sort=False)[field]\
- .apply(self._column_to_uniques_flattened)
- return reshaped_column
- def _concatenate_duplicated_columns(self,
- data: pd.DataFrame) -> pd.DataFrame:
- '''
- Example:
- data:
- =====================================================
- customer | product | product
- c1 p1 p2
- c2 p1 p3
- result
- customer | product
- c1 [p1, p2]
- c2 [p1, p3]
- '''
- data = data.copy(deep=True)
- def row_to_list(row: pd.Series) -> list:
- return list(row)
- for c in set(data.columns):
- if isinstance(data[c], pd.DataFrame):
- data["temp"] = data[c].apply(row_to_list, axis=1)
- data.drop(c, axis=1, inplace=True)
- data = data.rename(columns={"temp": c})
- return data
- def _field_is_a_column(self, field: str,
- data: pd.DataFrame,
- schema: dict) -> bool:
- '''
- Example:
- schema = {"bsonType": object,
- "properties": {
- "customer": str,
- "commodity": {
- "bsonType": array,
- "items": {
- "bsonType": "object",
- "properties": {
- "id": String,
- "name": "fruits",
- }
- }
- }
- }
- }
- data:
- =====================================================
- customer | commodity.id | commodity.name
- c1 com1 fruits
- c1 com1 fruits
- c1 com2 meat
- c1 com1 fruits
- c2 com2 meat
- ===================================================
- Returns True for customer, commodity, commodity.id, commodity.name
- '''
- unrolled_names = []
- for c in data.columns:
- splitted = c.split(".")
- for i in range(len(splitted)):
- unrolled_names.append(".".join(splitted[:i+1]))
- return (field in unrolled_names)
- def _field_type(self, field: str, schema: dict) -> str:
- '''
- Can be simple type, object, or array
- '''
- return schema["properties"][field]["bsonType"]
- def _items_type(self, field: str, schema: dict) -> str:
- '''
- type of items of an array field (can be simple, object, or array)
- '''
- return schema["properties"][field]["items"]["bsonType"]
- def _has_simple_type(self, field: str, schema: dict) -> bool:
- '''
- True if is not array and is not object
- '''
- field_type = self._field_type(field=field, schema=schema)
- return field_type not in ["array", "object"]
- def _simple_subfields(self, field: str, schema: dict) -> list:
- '''
- Returns list of fields that have a simple type
- Example:
- "customer": {
- "bsonType": "object",
- "properties": {
- "name": {
- "bsonType": "object",
- "properties": {
- "first": str,
- "last": str
- },
- }
- "bank": str
- }
- }
- returns [customer.bank]
- '''
- subschema = schema["properties"][field]
- return [field + "." + f for f in subschema["properties"]
- if self._has_simple_type(field=f, schema=subschema)]
- def _assert_is_one_to_one(self, field: str,
- data: pd.DataFrame,
- grp_fields: list):
- '''
- A column with simple or object type should not have duplicated entries
- per combination of grp columns.
- Example: grp_fields = [product_id, store_id],
- field = price,
- some_other_field = customer_id
- Good example:
- product_id | store_id | price | customer_id
- p1 s1 5.99 c1
- p1 s1 5.99 c2
- p2 s1 7.99 c3
- Bad example:
- product_id | store_id | price | customer_id
- p1 s1 5.99 c1
- p1 s1 6.99 c2
- p2 s1 7.99 c3
- '''
- is_one_to_one = \
- (len(data[grp_fields + [field]].drop_duplicates()) ==
- len(data[grp_fields].drop_duplicates()))
- if not is_one_to_one:
- print(data[grp_fields + [field]])
- err = ("Column {0} should be one to one "
- "with {1}").format(field, grp_fields)
- self._logger.log_and_raise_error(err)
- def _bring_to_same_index(self,
- df1: pd.DataFrame,
- df2: pd.DataFrame):
- '''
- if index names of df2 are in index or column names of df1,
- sets the index of df1 to the index of df2,
- else raises an error
- '''
- def has_same_index(df1, df2):
- expr = df1.index.names != df2.index.names
- if hasattr(expr, "any"):
- return expr.any()
- else:
- return expr
- try:
- if has_same_index(df1, df2):
- df2_index_names = list(df2.index.names)
- df1 = df2\
- .reset_index(drop=False)\
- .drop("index", axis=1, errors="ignore")\
- .set_index(df2_index_names)
- return df1
- except Exception as e:
- err = ("Error in the index of the reshaped dataframe."
- " Exit with error: {}".format(e))
- self._logger.log_and_raise_error(err)
- def _object_subschema(self, field: str, schema: dict) -> dict:
- '''
- If a field is an object, its value is a sub-schema.
- This method completes the sub-schema key names
- to the key names of the initial schema.
- Example: field: commodity
- schema = {"bsonType": object,
- "properties": {
- "commodity": {
- "bsonType": object,
- "properties": {
- "id": "com1",
- "name": "fruits",
- }
- },
- "product_id": p1,
- "store_id": s1
- }
- }
- result
- {commodity.id : com1, commodity.name : fruits}
- '''
- subschema = deepcopy(schema["properties"][field])
- subschema["properties"] = {".".join([field, k]): v
- for k, v
- in subschema["properties"].items()}
- return subschema
- def _items_subschema(self, field: str, schema: dict) -> dict:
- '''
- schema = {"bsonType": object,
- "properties": {
- "commodity": {
- "bsonType": array,
- "items": {
- "bsonType": "object",
- "properties": {
- "id": String,
- "name": "fruits",
- }
- }
- }
- }
- }
- field = commodity
- returns {"bsonType": "object",
- "properties": {
- "id": String,
- "name": "fruits",
- }
- }
- '''
- subschema = deepcopy(schema["properties"][field]["items"])
- return {"bsonType": "object",
- "properties": {field: subschema}}
- def _row_to_dict(self, row: pd.Series) -> dict:
- '''
- Transforms pandas series to a dictionary
- is meant to be applied to a dataframe in axis = 1
- Example:
- row:
- Index commodity.id | commodity.name
- transaction_id
- t1 com1 fruits
- result:
- Index commodity
- transaction_id
- t1 {id: com1, name: fruits}
- '''
- def drop_prefix(field: str) -> str:
- return field.split(".")[-1]
- def row_not_null(entry) -> str:
- '''
- entry can be dict, list (or array), scalar
- check these three cases
- '''
- if isinstance(entry, dict):
- # not null, if one of the keys is not null
- return pd.notnull(list(entry.values())).any()
- # list
- elif hasattr(pd.notnull(entry), "any"):
- return pd.notnull(entry).any()
- # scalar
- else:
- return pd.notnull(entry)
- return {drop_prefix(field): row[field] for field in row.index
- if row_not_null(row[field])}
- def _column_to_uniques(self, column: pd.Series) -> pd.DataFrame:
- '''
- return: list of unique values from a Series where
- entries are arbitrary objects
- (pandas unique() method does not work if entries are of complex types)
- Example:
- column:
- 1
- 1
- 2
- [1,2]
- [1,2],
- [1,2,3]
- [[1,2], [1,2]]
- {"a": 1, "b": 2}
- Result:
- 1
- 2
- [1,2]
- [1,2,3],
- [[1,2], [1,2]]
- {"a": 1, "b": 2}
- '''
- is_simple_dtype = (np.issubdtype(pd.Series(column), np.number)) or\
- (pd.Series(column).dtype == str)
- if is_simple_dtype:
- uniques = list(pd.Series(column).unique())
- else:
- uniques = pd.DataFrame({"temp": column.tolist()})\
- .assign(temp_str=lambda y: y["temp"].astype(str))\
- .drop_duplicates(subset=["temp_str"])\
- .drop("temp_str", axis=1).iloc[:, 0].tolist()
- def is_empty(y):
- is_empty_dict = (isinstance(y, dict) and (len(y) == 0))
- is_empty_list = (isinstance(y, list) and (len(y) == 0))
- return is_empty_dict or is_empty_list
- uniques = [el for el in uniques if not is_empty(el)]
- return uniques
- def _column_to_uniques_flattened(self, column: pd.Series) -> pd.Series:
- '''
- Does the same as colum_to_uniques, but flattens lists of lists
- Example:
- column:
- 1
- 1
- 2
- [1,2]
- [1,2],
- [1,2,3]
- [[1,2], [1,2]]
- {"a": 1, "b": 2}
- Result:
- 1
- 2
- [1,2]
- [1,2,3],
- {"a": 1, "b": 2}
- '''
- uniques = self._column_to_uniques(column)
- def flatten_list(l):
- from itertools import chain
- return list(chain.from_iterable(l))
- is_list_of_lists = (isinstance(uniques, list)) \
- and (isinstance(uniques[0], list))
- if is_list_of_lists:
- uniques = flatten_list(uniques)
- return uniques
def test():
    '''
    Smoke test: builds a flat dataframe with dotted column names and a
    matching JsonSchema, reshapes it to documents and prints the result.
    '''
    df = pd.DataFrame({
        "a": [5]*8 + [6]*8,
        "b": [10]*8 + [20]*8,
        "c1": [100, 200]*8,
        "c": [100, 200]*8,
        "d.da": [11]*8 + [22]*8,
        "d.db": [33]*8 + [34]*8,
        "e.ea.eaa": [5]*8 + [55]*8,
        "e.ea.eab": [6]*8 + [66]*8,
        "e.eb": [2, 2, 3, 3]*4,
        "e.ec.eca": [1, 2, 3, 4]*4,
        "e.ec.ecb": [5, 6, 7, 8]*4,
        "f.fa": [1]*4 + [3]*4 + [11]*4 + [33]*4,
        "f.fb": [2]*4 + [3]*2 + [4]*2 + [22]*4 + [44]*4,
        "g.ga": [1, 2, 3, 4]*4,
    })

    print("<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<")
    print("DataFrame looks like")
    print(df)
    print("<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<")

    # deliberately duplicate column "c" to exercise
    # _concatenate_duplicated_columns
    duplicate = pd.DataFrame({"c": [300, 400]*8})
    df = pd.concat([df, duplicate], axis=1)

    schm = {"bsonType": "object",
            "required": ["a"],
            "properties": {
                "a": {
                    "bsonType": "integer"
                },
                "b": {
                    "bsonType": "integer"
                },
                "c": {
                    "bsonType": "array",
                    "items": {
                        "bsonType": "integer"
                    }
                },
                "c1": {
                    "bsonType": "array",
                    "items": {
                        "bsonType": "integer"
                    }
                },
                "d": {
                    "bsonType": "object",
                    "properties": {
                        "da": {"bsonType": "integer"},
                        "db": {"bsonType": "integer"}
                    }
                },
                "e": {
                    "bsonType": "object",
                    "properties": {
                        "ea": {
                            "bsonType": "object",
                            "properties": {
                                "eaa": {"bsonType": "integer"},
                                "eab": {"bsonType": "integer"}
                            }
                        },
                        "eb": {
                            "bsonType": "array",
                            "items": {
                                "bsonType": "integer"
                            }
                        },
                        "ec": {
                            "bsonType": "array",
                            "items": {
                                "bsonType": "object",
                                "properties": {
                                    "eca": {"bsonType": "integer"},
                                    "ecb": {"bsonType": "integer"}
                                }
                            }
                        }
                    }
                },
                "f": {
                    "bsonType": "array",
                    "items": {
                        "bsonType": "object",
                        "properties": {
                            "fa": {"bsonType": "integer"},
                            "fb": {
                                "bsonType": "array",
                                "items": {"bsonType": "integer"}
                            }
                        }
                    }
                },
                "g": {
                    "bsonType": "array",
                    "items": {
                        "bsonType": "object",
                        "properties": {
                            "ga": {"bsonType": "integer"}
                        }
                    }
                }
            }
            }

    grp_fields = ["a"]

    reshaper = DataFrameToCollection(grp_fields=grp_fields, schema=schm)
    reshaped_df = reshaper.to_documents(df)

    print("<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<")
    # bug fix: this banner used to repeat "DataFrame looks like",
    # mislabeling the reshaped output
    print("Reshaped DataFrame looks like")
    print(reshaped_df)
    print("<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<")


if __name__ == "__main__":
    test()
|