#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Created on Mon Jul 22 11:05:47 2019

@author: tanya

@description: a function to reshape a pandas dataframe to a list of
(possibly nested) documents with respect to a (json) mongodb schema
"""
import pandas as pd
import os
import sys

sys.path.append(os.getcwd())

class DataFrameToCollection():
    '''
    Reshapes a pandas dataframe into a list of (possibly nested)
    documents that satisfy a (json) mongodb schema.
    '''
    def __init__(self, schema_path: str = None):
        '''
        :param schema_path: path to a json file holding the mongodb schema;
         may be omitted if a schema dictionary is passed directly to
         to_list_of_documents.
        '''
        from libraries.log import Log
        import json

        self._log = Log("DataFrameToCollection")

        self.schema = None

        if schema_path is not None:

            if not os.path.isfile(schema_path):
                err = "JsonSchema not found"
                self._log.error(err)
                raise FileNotFoundError(err)

            # load the schema to a dictionary if it is a valid json file
            try:
                with open(schema_path, "r") as f:
                    self.schema = json.load(f)
            except Exception as e:
                err = ("Could not load json schema, "
                       "obtained error {}".format(e))
                self._log.error(err)
                raise Exception(err)
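
    # A hypothetical instantiation with a schema file (the path below is
    # an assumption for illustration only):
    #   transformer = DataFrameToCollection(schema_path="schemas/my_schema.json")
    #   docs = transformer.to_list_of_documents(data=df, grp_fields=["id"])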

    def to_list_of_documents(self, data: pd.DataFrame,
                             grp_fields: list,
                             schema: dict = None,
                             _final_step: bool = True) -> list:
        '''
        Reshapes a pandas dataframe to a list of documents according
        to a complex (json) mongodb schema.

        Remark 1: the column names of data need to reflect the "nestedness"
         of the fields in the mongodb schema with the help of a "." separator.
         Example: field.sub_field_1, field.sub_field_2

        Remark 2: if the schema is stored as a json file, first load it
         to a dictionary with the help of the python json module.

        The function goes recursively through all the fields and reshapes
        them depending on whether a field is an array, an object, or of
        simple type. For each field the data is grouped by the grp_fields
        and reshaped accordingly; the result is a pandas Series. In the
        end all the series are collected and concatenated.
        '''
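        # Illustration with hypothetical column names: a dataframe with
        # columns ["id", "name.first", "name.last"] and grp_fields=["id"]
        # yields one document per id of the form
        # {"id": ..., "name": {"first": ..., "last": ...}}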
        from copy import deepcopy

        data = self._melt_duplicated_columns(data)

        reshaped_fields = []

        if schema is None:
            schema = self.schema

        if schema is None:
            err = "No schema was provided"
            self._log.error(err)
            raise Exception(err)

        for field in schema["properties"]:

            if field not in self._unroll_nested_names(data.columns):
                continue
            field_type = schema["properties"][field]["bsonType"]

            # if the field has a simple type
            if field_type not in ["array", "object"]:

                grp_fields = [c for c in grp_fields if c in data.columns]

                # check that there is only one possible value of this field
                n_distinct_values = data.groupby(grp_fields, sort=False)[field]\
                                        .nunique().max()

                if n_distinct_values != 1:
                    err = "Field {0} is not unique with respect to {1}"\
                          .format(field, grp_fields)
                    self._log.error(err)
                    raise Exception(err)

                if field not in grp_fields:
                    reshaped_field =\
                        data.groupby(grp_fields, sort=False)[field].first()
                else:
                    reshaped_field =\
                        data[grp_fields].drop_duplicates()\
                        .set_index(grp_fields, drop=False)[field]

                reshaped_fields.append(reshaped_field)
            # if the field is a sub-document (dictionary)
            elif field_type == "object":

                sub_schema = deepcopy(schema["properties"][field])

                # rename the sub-schema properties to match the data column names
                sub_schema["properties"] =\
                    {".".join([field, k]): v for k, v
                     in sub_schema["properties"].items()}

                sub_data = self.to_list_of_documents(
                        data=data,
                        schema=sub_schema,
                        grp_fields=grp_fields,
                        _final_step=False)

                reshaped_field = sub_data.apply(self._make_dict, axis=1)
                reshaped_field.name = field

                reshaped_fields.append(reshaped_field)
            # if the field is a list of dictionaries
            elif field_type == "array":

                items_type = schema["properties"][field]["items"]["bsonType"]

                if items_type == "object":

                    sub_schema = deepcopy(schema["properties"][field]["items"])

                    # rename the sub-schema properties to match the data column names
                    sub_schema["properties"] =\
                        {".".join([field, k]): v for k, v in
                         sub_schema["properties"].items()}

                    # extend the grp fields by the sub-fields of simple type
                    sub_grp_fields =\
                        [f for f in sub_schema["properties"]
                         if (sub_schema["properties"][f]["bsonType"]
                             not in ["array", "object"])
                         and (f in data.columns)]

                    if len(sub_grp_fields) == 0:
                        err = ("One of the sub-keys in a list of documents"
                               " must be of simple type for the field {}"
                               .format(field))
                        self._log.error(err)
                        raise Exception(err)

                    # group and reshape the sub-fields with complex types
                    sub_data = self.to_list_of_documents(
                            data=data,
                            schema=sub_schema,
                            grp_fields=grp_fields + sub_grp_fields,
                            _final_step=False)
                    if sub_data is not None:

                        # gather the results into a list of dictionaries
                        sub_data = sub_data.apply(self._make_dict, axis=1)

                        sub_data.name = field
                        sub_data = sub_data.reset_index(grp_fields)

                        reshaped_field =\
                            sub_data.groupby(grp_fields, sort=False)[field]\
                                    .apply(self._make_list_of_distinct)

                        reshaped_fields.append(reshaped_field)
                # if the items are themselves arrays
                elif items_type == "array":

                    grp_fields = [c for c in grp_fields if c in data.columns]

                    if field in data.columns:

                        reshaped_field =\
                            data.groupby(grp_fields, sort=False)[field]\
                                .apply(self._make_list_of_distinct)

                        reshaped_fields.append(reshaped_field)

                # if the items are of simple type
                else:

                    grp_fields = [c for c in grp_fields if c in data.columns]

                    if field in data.columns:

                        reshaped_field =\
                            data.groupby(grp_fields, sort=False)[field]\
                                .apply(self._make_flattened_list_of_distinct)

                        reshaped_fields.append(reshaped_field)
        if len(reshaped_fields) > 0:

            reshaped_fields = pd.concat(reshaped_fields, sort=False, axis=1)

            if _final_step:
                # dropping the index names in the final step;
                # before that the index is needed for merging
                reshaped_fields =\
                    reshaped_fields.drop(list(reshaped_fields.index.names),
                                         axis=1, errors="ignore")\
                    .reset_index(drop=False)

                self._log.info("Done reshaping the dataframe to a list of documents")

            return reshaped_fields

        else:
            return
    def _melt_duplicated_columns(self, data: pd.DataFrame) -> pd.DataFrame:
        '''
        Collects the values of duplicated column names into a single
        column whose cells hold the lists of those values.
        '''
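        # e.g. two columns both named "c" with row values 100 and 300
        # become one column "c" whose row value is [100, 300]
        # (values are illustrative only)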
        data = data.copy(deep=True)

        for c in set(data.columns):

            if isinstance(data[c], pd.DataFrame):
                # collect the duplicated values into a list instead of
                # melting them into separate rows
                data["temp"] = data[c].apply(self._make_list, axis=1)
                data.drop(c, axis=1, inplace=True)
                data = data.rename(columns={"temp": c})

        return data
    def _make_dict(self, x: pd.Series) -> dict:
        '''
        Transforms a pandas series to a dictionary;
        meant to be applied to a dataframe with axis = 1,
        so that the index of the input series holds the column names
        of the dataframe
        '''
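        # e.g. a series with index ["d.da", "d.db"] and values [11, 33]
        # becomes {"da": 11, "db": 33} (values are illustrative only)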
        def custom_notnull(y):
            # pd.notnull returns an array for list-like entries;
            # such entries are always kept
            if isinstance(pd.notnull(y), bool):
                return pd.notnull(y)
            else:
                return True

        return {f.split(".")[-1]: x[f] for f in x.index
                if custom_notnull(x[f])}
    def _make_list(self, x: pd.Series) -> list:
        '''
        return: list of values in a series
        '''
        return list(x)
    def _make_list_of_distinct(self, x: pd.Series) -> list:
        '''
        return: list of unique values from a Series whose
         entries are arbitrary objects
         (the pandas unique() method does not work if the entries
         are of complex types)
        '''
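        # e.g. a series holding the dicts [{"fa": 1}, {"fa": 1}, {"fa": 3}]
        # yields [{"fa": 1}, {"fa": 3}] (values are illustrative only)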
        # deduplicate via the string representation of the entries,
        # since complex objects are not hashable
        uniques = pd.DataFrame({"temp": x.tolist()})\
                    .assign(temp_str=lambda y: y["temp"].astype(str))\
                    .drop_duplicates(subset=["temp_str"])\
                    .drop("temp_str", axis=1).iloc[:, 0].tolist()

        def is_empty(y):
            is_empty_dict = (isinstance(y, dict) and (len(y) == 0))
            is_empty_list = (isinstance(y, list) and (len(y) == 0))
            return is_empty_dict or is_empty_list

        return [el for el in uniques if not is_empty(el)]
    def _make_flattened_list_of_distinct(self, x: pd.Series) -> list:
        '''
        return: the single distinct list of values in a Series whose
         entries are lists; used for array fields with items of simple
         type, where after _melt_duplicated_columns every row of a
         group holds the same list
        '''
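        # e.g. a group whose rows all hold the list [100, 200] yields
        # [100, 200] (values are illustrative only)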
        uniques = self._make_list_of_distinct(x)
        return uniques[0] if len(uniques) > 0 else []
    def _unroll_nested_names(self, names: list) -> list:
        '''
        Example: transforms the list ["name.firstname", "name.surname"]
        into ["name", "name.firstname", "name.surname"]
        '''
        unrolled = []

        for c in names:
            splitted = c.split(".")
            for i in range(len(splitted)):
                unrolled.append(".".join(splitted[:i+1]))

        return unrolled

if __name__ == "__main__":

    # Testing

    df = pd.DataFrame({
        "a": [1]*8 + [2]*8,
        "b": [10]*8 + [20]*8,
        "c": [100, 200]*8,
        "d.da": [11]*8 + [22]*8,
        "d.db": [33]*8 + [34]*8,
        "e.ea.eaa": [5]*8 + [55]*8,
        "e.ea.eab": [6]*8 + [66]*8,
        "e.eb": [2, 2, 3, 3]*4,
        "e.ec.eca": [1, 2, 3, 4]*4,
        "e.ec.ecb": [5, 6, 7, 8]*4,
        "f.fa": [1]*4 + [3]*4 + [11]*4 + [33]*4,
        "f.fb": [2]*4 + [3]*2 + [4]*2 + [22]*4 + [44]*4})

    duplicate = pd.DataFrame({"c": [300, 400]*8})

    df = pd.concat([df, duplicate], axis=1)

    schm = {
        "bsonType": "object",
        "required": ["a"],
        "properties": {

            "a": {"bsonType": "integer"},

            "b": {"bsonType": "integer"},

            "c": {
                "bsonType": "array",
                "items": {"bsonType": "integer"}
            },

            "d": {
                "bsonType": "object",
                "properties": {
                    "da": {"bsonType": "integer"},
                    "db": {"bsonType": "integer"}
                }
            },

            "e": {
                "bsonType": "object",
                "properties": {

                    "ea": {
                        "bsonType": "object",
                        "properties": {
                            "eaa": {"bsonType": "integer"},
                            "eab": {"bsonType": "integer"}
                        }
                    },

                    "eb": {
                        "bsonType": "array",
                        "items": {"bsonType": "integer"}
                    },

                    "ec": {
                        "bsonType": "array",
                        "items": {
                            "bsonType": "object",
                            "properties": {
                                "eca": {"bsonType": "integer"},
                                "ecb": {"bsonType": "integer"}
                            }
                        }
                    }
                }
            },

            "f": {
                "bsonType": "array",
                "items": {
                    "bsonType": "object",
                    "properties": {
                        "fa": {"bsonType": "integer"},
                        "fb": {
                            "bsonType": "array",
                            "items": {"bsonType": "integer"}
                        }
                    }
                }
            }
        }
    }

    grp_fields = ["a"]

    result = DataFrameToCollection().to_list_of_documents(
            data=df,
            schema=schm,
            grp_fields=grp_fields)
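
    print(result.to_dict("records"))

    # Expected shape of the output (values abbreviated, content depends on
    # the grouping): one document per value of "a", roughly of the form
    # {"a": 1, "b": 10, "c": [...], "d": {"da": 11, "db": 33},
    #  "e": {"ea": {...}, "eb": [...], "ec": [...]}, "f": [...]}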