# DataFrameToCollection.py
  1. #!/usr/bin/env python3
  2. # -*- coding: utf-8 -*-
  3. """
  4. Created on Mon Jul 22 11:05:47 2019
  5. @author: tanya
  6. @description: a function to reshape a pandas dataframe to a list of
  7. (possibly nested) documents with respect to a (json) mongodb schema
  8. """
  9. import pandas as pd
  10. import os
  11. import sys
  12. sys.path.append(os.getcwd())
  13. class DataFrameToCollection:
  14. '''
  15. '''
  16. def __init__(self, schema_path: str = None, log_path: str = None):
  17. '''
  18. '''
  19. from libraries.log import Log
  20. import json
  21. self._log = Log("ParseJsonSchema")
  22. if schema_path is not None:
  23. if not os.path.isfile(schema_path):
  24. err = "JsonSchema not found"
  25. self._log.error(err)
  26. raise FileNotFoundError(err)
  27. # load schema to dictionary if it is a valid json file
  28. try:
  29. with open(schema_path, "r") as f:
  30. self.schema = json.load(f)
  31. except Exception as e:
  32. err = ("Could not load json schema, "
  33. "Obtained error {}".format(e))
  34. self._log.error(err)
  35. raise Exception(err)
  36. else:
  37. self.schema = None
  38. def to_list_of_documents(self, data: pd.DataFrame,
  39. grp_fields: list,
  40. schema: dict = None,
  41. _return_data: bool = False) -> list:
  42. '''
  43. Reshapes a pandas dataframe to a list of documents according
  44. to a complex (json) mongodb schema
  45. Remark1: column names of data need to reflect the "nestedness"
  46. of the field in the mongodb schema with the help of a "." separator
  47. Example: field.sub_field_1, field.sub_field_2
  48. Remark2: if the schema is stored as a json file, first load it
  49. to a dictionary with the help of the python json module
  50. '''
  51. from copy import deepcopy
  52. from libraries.log import Log
  53. log = Log("reshape_dataframe_to_list_of_documents:")
  54. data = self._melt_duplicated_columns(data)
  55. reshaped_fields = []
  56. if schema is None:
  57. schema = self.schema
  58. for field in schema["properties"]:
  59. if field not in self._unroll_nested_names(data.columns):
  60. continue
  61. field_type = schema["properties"][field]["bsonType"]
  62. # if field has a simple type
  63. if field_type not in ["array", "object"]:
  64. grp_fields = [c for c in grp_fields if c in data.columns]
  65. n_distinct_values = data.groupby(grp_fields)[field].nunique()\
  66. .max()
  67. if n_distinct_values != 1:
  68. err = "Field {0} is not unique with respect to {1}"\
  69. .format(field, grp_fields)
  70. log.error(err)
  71. raise Exception(err)
  72. if field not in grp_fields:
  73. reshaped_field = data.groupby(grp_fields)[field].first()
  74. else:
  75. reshaped_field =\
  76. data[grp_fields].drop_duplicates()\
  77. .set_index(grp_fields, drop=False)[field]
  78. reshaped_fields.append(reshaped_field)
  79. # if field is sub-document (dictionary)
  80. elif field_type == "object":
  81. sub_schema = deepcopy(schema["properties"][field])
  82. # rename sub-schema properties to match with data column names
  83. sub_schema["properties"] =\
  84. {".".join([field, k]): v for k, v
  85. in sub_schema["properties"].items()}
  86. sub_data = self.to_list_of_documents(
  87. data=data,
  88. schema=sub_schema,
  89. grp_fields=grp_fields,
  90. _return_data=True)
  91. reshaped_field = sub_data.apply(self._make_dict, axis=1)
  92. reshaped_field.name = field
  93. reshaped_fields.append(reshaped_field)
  94. # if field is a list of dictionaries
  95. elif field_type == "array":
  96. items_type = schema["properties"][field]["items"]["bsonType"]
  97. if items_type == "object":
  98. sub_schema = deepcopy(schema["properties"][field]["items"])
  99. # rename sub-schema properties to match data column names
  100. sub_schema["properties"] =\
  101. {".".join([field, k]): v for k, v in
  102. sub_schema["properties"].items()}
  103. # extend grp fields by sub-fields of field simple types
  104. sub_grp_fields =\
  105. [f for f in sub_schema["properties"]
  106. if sub_schema["properties"][f]["bsonType"]
  107. not in ["array", "object"]]
  108. if len(sub_grp_fields) == 0:
  109. err = ("One of the sub-keys in a list of documents"
  110. " must be of simple type for the field {}"
  111. .format(field))
  112. log.error(err)
  113. raise Exception(err)
  114. # group and reshape sub-fields with complex types
  115. sub_data = self.to_list_of_documents(
  116. data=data,
  117. schema=sub_schema,
  118. grp_fields=grp_fields + sub_grp_fields,
  119. _return_data=True)
  120. if sub_data is not None:
  121. # gether the results into a list of dictionaries
  122. sub_data = sub_data.apply(self._make_dict, axis=1)
  123. sub_data.name = field
  124. sub_data = sub_data.reset_index(grp_fields)
  125. reshaped_field =\
  126. sub_data.groupby(grp_fields)[field]\
  127. .apply(self._make_list_of_distinct)
  128. reshaped_fields.append(reshaped_field)
  129. # if field is a list of values with simple type
  130. else:
  131. grp_fields = [c for c in grp_fields if c in data.columns]
  132. if field in data.columns:
  133. reshaped_field = data.groupby(grp_fields)[field]\
  134. .apply(self._make_list_of_distinct)
  135. reshaped_fields.append(reshaped_field)
  136. if len(reshaped_fields) > 0:
  137. reshaped_data = pd.concat(reshaped_fields, axis=1)
  138. if not _return_data:
  139. list_of_documents =\
  140. reshaped_data.drop(list(reshaped_data.index.names),
  141. axis=1, errors="ignore")\
  142. .reset_index(drop=False)
  143. log.info("Done reshaping the dataframe to a list of documents")
  144. return list_of_documents
  145. else:
  146. return reshaped_data
  147. def _melt_duplicated_columns(self, data: pd.DataFrame) -> pd.DataFrame:
  148. '''
  149. '''
  150. for c in set(data.columns):
  151. if isinstance(data[c], pd.DataFrame):
  152. data = pd.melt(data, id_vars=[cc for cc in data.columns
  153. if cc != c], value_vars=c)\
  154. .drop("variable", axis=1)\
  155. .rename(columns={"value": c})
  156. return data
  157. def _make_dict(self, x: pd.Series) -> dict:
  158. '''
  159. return: transforms pandas series to a dictionary
  160. is meant to be applied to a dataframe in axis = 1,
  161. then the index of the input series are the column names
  162. of the dataframe
  163. '''
  164. return {f.split(".")[-1]: x[f] for f in x.index}
  165. def _make_list(self, x: pd.Series) -> list:
  166. '''
  167. return: list of values in a series
  168. '''
  169. return list(x)
  170. def _make_list_of_distinct(self, x: pd.Series) -> list:
  171. '''
  172. return: list of unique values from a Series where
  173. entries are arbitrary objects
  174. (pandas unique() method does not work if entries are of complex types)
  175. '''
  176. distinct = []
  177. [distinct.append(obj) for obj in x if obj not in distinct]
  178. return distinct
  179. def _unroll_nested_names(self, columns: list) -> list:
  180. '''
  181. '''
  182. unrolled = []
  183. for c in columns:
  184. splitted = c.split(".")
  185. for i in range(len(splitted)):
  186. unrolled.append(".".join(splitted[:i+1]))
  187. return unrolled
  188. if __name__ == "__main__":
  189. # Testing
  190. df = pd.DataFrame({
  191. "a": [1]*8 + [2]*8,
  192. "b": [10]*8 + [20]*8,
  193. "c": [100, 200]*8,
  194. "d.da": [11]*8 + [22]*8,
  195. "d.db": [33]*8 + [34]*8,
  196. "e.ea.eaa": [5]*8 + [55]*8,
  197. "e.ea.eab": [6]*8 + [66]*8,
  198. "e.eb": [2, 2, 3, 3]*4,
  199. "e.ec.eca": [1, 2, 3, 4]*4,
  200. "e.ec.ecb": [5, 6, 7, 8]*4,
  201. "f.fa": [1]*4 + [3]*4 + [11]*4 + [33]*4,
  202. "f.fb": [2]*4 + [3]*2 + [4]*2 + [22]*4 + [44]*4})
  203. duplicate = pd.DataFrame({"c": [300, 400]*8})
  204. df = pd.concat([df, duplicate], axis=1)
  205. schm = {
  206. "bsonType": "object",
  207. "required": ["a"],
  208. "properties": {
  209. "a": {"bsonType": "integer"},
  210. "b": {"bsonType": "integer"},
  211. "c": {
  212. "bsonType": "array",
  213. "items": {"bsonType": "integer"}
  214. },
  215. "d": {
  216. "bsonType": "object",
  217. "properties": {
  218. "da": {"bsonType": "integer"},
  219. "db": {"bsonType": "integer"}
  220. }
  221. },
  222. "e": {
  223. "bsonType": "object",
  224. "properties": {
  225. "ea": {
  226. "bsonType": "object",
  227. "properties": {
  228. "eaa": {"bsonType": "integer"},
  229. "eab": {"bsonType": "integer"}
  230. }
  231. },
  232. "eb": {
  233. "bsonType": "array",
  234. "items": {"bsonType": "integer"}
  235. },
  236. "ec": {
  237. "bsonType": "array",
  238. "items": {
  239. "bsonType": "object",
  240. "properties": {
  241. "eca": {"bsonType": "integer"},
  242. "ecb": {"bsonType": "integer"}
  243. }
  244. }
  245. }
  246. }
  247. },
  248. "f": {
  249. "bsonType": "array",
  250. "items": {
  251. "bsonType": "object",
  252. "properties": {
  253. "fa": {"bsonType": "integer"},
  254. "fb": {
  255. "bsonType": "array",
  256. "items": {"bsonType": "integer"}
  257. }
  258. }
  259. }
  260. }
  261. }
  262. }
  263. grp_fields = ["a"]
  264. result = DataFrameToCollection().to_list_of_documents(
  265. data=df,
  266. schema=schm,
  267. grp_fields=grp_fields)