# DataFrameToCollection.py
  1. #!/usr/bin/env python3
  2. # -*- coding: utf-8 -*-
  3. """
  4. Created on Mon Jul 22 11:05:47 2019
  5. @author: tanya
  6. @description: a function to reshape a pandas dataframe to a list of
  7. (possibly nested) documents with respect to a (json) mongodb schema
  8. """
  9. import pandas as pd
  10. import numpy as np
  11. import os
  12. import sys
  13. import time
  14. sys.path.append(os.getcwd())
  15. class DataFrameToCollection():
  16. '''
  17. '''
  18. def __init__(self, schema_path: str):
  19. '''
  20. '''
  21. from cdplib.log import Log
  22. import json
  23. self._log = Log("ParseJsonSchema")
  24. if not os.path.isfile(schema_path):
  25. err = "JsonSchema not found"
  26. self._log.error(err)
  27. raise FileNotFoundError(err)
  28. # load schema to dictionary if it is a valid json file
  29. try:
  30. with open(schema_path, "r") as f:
  31. self.schema = json.load(f)
  32. except Exception as e:
  33. err = ("Could not load json schema, "
  34. "Obtained error {}".format(e))
  35. self._log.error(err)
  36. raise Exception(err)
  37. def to_list_of_documents(self, data: pd.DataFrame,
  38. grp_fields: list,
  39. schema: dict = None,
  40. _final_step: bool = True) -> list:
  41. '''
  42. Reshapes a pandas dataframe to a list of documents according
  43. to a complex (json) mongodb schema
  44. Remark1: column names of data need to reflect the "nestedness"
  45. of the field in the mongodb schema with the help of a "." separator
  46. Example: field.sub_field_1, field.sub_field_2
  47. Remark2: if the schema is stored as a json file, first load it
  48. to a dictionary with the help of the python json module
  49. The function goes recurisively through all the fields and reshapes
  50. them correspondingly depending on whether the field is an array,
  51. an object, or simple field. For each field we group the data by the
  52. grp_fields and reshape it accordingly, the result is a pandas Series.
  53. In the end all the series are collected and concatenated.
  54. '''
  55. from copy import deepcopy
  56. data = self._melt_duplicated_columns(data)
  57. reshaped_fields = []
  58. if schema is None:
  59. schema = self.schema
  60. for field in schema["properties"]:
  61. if field not in self._unroll_nested_names(data.columns):
  62. continue
  63. field_type = schema["properties"][field]["bsonType"]
  64. # if field has a simple type
  65. if field_type not in ["array", "object"]:
  66. grp_fields = [c for c in grp_fields if c in data.columns]
  67. # check that there is only one possible value of this field
  68. n_distinct_values = data.groupby(grp_fields, sort=False)[field].nunique().max()
  69. # n_distinct_valus can be 0 if the column only contains NaN values
  70. if n_distinct_values > 1:
  71. err = "Field {0} is not unique with respect to {1}"\
  72. .format(field, grp_fields)
  73. self._log.error(err)
  74. raise Exception(err)
  75. if field not in grp_fields:
  76. reshaped_field = data.groupby(grp_fields, sort=False)[field].first()
  77. else:
  78. reshaped_field =\
  79. data[grp_fields].drop_duplicates()\
  80. .set_index(grp_fields, drop=False)[field]
  81. reshaped_fields.append(reshaped_field)
  82. # if field is sub-document (dictionary)
  83. elif field_type == "object":
  84. sub_schema = deepcopy(schema["properties"][field])
  85. # rename sub-schema properties to match with data column names
  86. sub_schema["properties"] =\
  87. {".".join([field, k]): v for k, v
  88. in sub_schema["properties"].items()}
  89. sub_data = self.to_list_of_documents(
  90. data=data,
  91. schema=sub_schema,
  92. grp_fields=grp_fields,
  93. _final_step=False)
  94. # Need to be checked since child elements can be empty
  95. if sub_data is not None:
  96. reshaped_field = sub_data.apply(self._make_dict, axis=1)
  97. reshaped_field.name = field
  98. reshaped_fields.append(reshaped_field)
  99. # if field is a list of dictionaries
  100. elif field_type == "array":
  101. items_type = schema["properties"][field]["items"]["bsonType"]
  102. if items_type == "object":
  103. array_object = time.time()
  104. sub_schema = deepcopy(schema["properties"][field]["items"])
  105. # rename sub-schema properties to match data column names
  106. sub_schema["properties"] =\
  107. {".".join([field, k]): v for k, v in
  108. sub_schema["properties"].items()}
  109. # extend grp fields by sub-fields of field simple types
  110. sub_grp_fields = [f for f in sub_schema["properties"]
  111. if (sub_schema["properties"][f]["bsonType"] not in ["array", "object"])
  112. and (f in data.columns)]
  113. if len(sub_grp_fields) == 0:
  114. err = ("One of the sub-keys in a list of documents"
  115. " must be of simple type for the field {}"
  116. .format(field))
  117. self._log.error(err)
  118. raise Exception(err)
  119. # group and reshape sub-fields with complex types
  120. sub_data = self.to_list_of_documents(
  121. data=data,
  122. schema=sub_schema,
  123. grp_fields=grp_fields + sub_grp_fields,
  124. _final_step=False)
  125. if sub_data is not None:
  126. # gether the results into a list of dictionaries
  127. sub_data = sub_data.apply(self._make_dict, axis=1)
  128. sub_data.name = field
  129. sub_data = sub_data.reset_index(grp_fields)
  130. ######################################################
  131. ######## OPTIMIZATIONS MAY BE POSSIBLE HERE ##########
  132. reshaped_field =\
  133. sub_data.groupby(grp_fields, sort=False)[field]\
  134. .apply(self._make_list_of_distinct)
  135. ######################################################
  136. reshaped_fields.append(reshaped_field)
  137. # if field is a list of values with simple type
  138. elif items_type == "array":
  139. grp_fields = [c for c in grp_fields if c in data.columns]
  140. if field in data.columns:
  141. reshaped_field = data.groupby(grp_fields, sort=False)[field]\
  142. .apply(self._make_list_of_distinct)
  143. reshaped_fields.append(reshaped_field)
  144. else:
  145. grp_fields = [c for c in grp_fields if c in data.columns]
  146. if field in data.columns:
  147. reshaped_field = data.groupby(grp_fields, sort=False)[field]\
  148. .apply(self._make_flattened_list_of_distinct)
  149. reshaped_fields.append(reshaped_field)
  150. if len(reshaped_fields) > 0:
  151. reshaped_fields = pd.concat(reshaped_fields, sort=False, axis=1)
  152. if _final_step:
  153. # dropping the index names if it is the final step,
  154. # if not the index is needed for merging
  155. reshaped_fields =\
  156. reshaped_fields.drop(list(reshaped_fields.index.names), axis=1, errors="ignore")\
  157. .reset_index(drop=False)
  158. self._log.info("Done reshaping the dataframe to a list of documents")
  159. return reshaped_fields
  160. else:
  161. return
  162. def _melt_duplicated_columns(self, data: pd.DataFrame) -> pd.DataFrame:
  163. '''
  164. '''
  165. data = data.copy(deep=True)
  166. for c in set(data.columns):
  167. if isinstance(data[c], pd.DataFrame):
  168. """
  169. data = pd.melt(data, id_vars=[cc for cc in data.columns
  170. if cc != c], value_vars=c)\
  171. .drop("variable", axis=1)\
  172. .rename(columns={"value": c})
  173. """
  174. data["temp"] = data[c].apply(self._make_list, axis=1)
  175. data.drop(c, axis=1, inplace=True)
  176. data = data.rename(columns={"temp": c})
  177. return data
  178. def _make_dict(self, x: pd.Series) -> dict:
  179. '''
  180. Transforms pandas series to a dictionary
  181. is meant to be applied to a dataframe in axis = 1,
  182. then the index of the input series are the column names
  183. of the dataframe
  184. '''
  185. def custom_is_null(y):
  186. if isinstance(pd.notnull(y), bool):
  187. return pd.notnull(y)
  188. else:
  189. return True
  190. return {f.split(".")[-1]: x[f] for f in x.index
  191. if custom_is_null(x[f])}
  192. def _make_list(self, x: pd.Series) -> list:
  193. '''
  194. return: list of values in a series
  195. '''
  196. return list(x)
  197. def _make_list_of_distinct(self, x: pd.Series) -> list:
  198. '''
  199. return: list of unique values from a Series where
  200. entries are arbitrary objects
  201. (pandas unique() method does not work if entries are of complex types)
  202. '''
  203. if x.size == 1:
  204. uniques = x.tolist()
  205. '''
  206. if return_value == [{}]:
  207. return []
  208. return return_value
  209. '''
  210. else:
  211. uniques = pd.DataFrame({"temp": x.values})\
  212. .assign(temp_str=lambda y: y["temp"].astype(np.str))\
  213. .drop_duplicates(subset=["temp_str"])\
  214. .drop("temp_str", axis=1).iloc[:, 0].tolist()
  215. def is_empty(y):
  216. is_empty_dict = (isinstance(y, dict) and (len(y) == 0))
  217. is_empty_list = (isinstance(y, list) and (len(y) == 0))
  218. return is_empty_dict or is_empty_list
  219. return [el for el in uniques if not is_empty(el)]
  220. def _make_flattened_list_of_distinct(self, x: pd.Series) -> list:
  221. '''
  222. return: list of unique values from a Series where
  223. entries are arbitrary objects
  224. (pandas unique() method does not work if entries are of complex types)
  225. '''
  226. uniques = self._make_list_of_distinct(x)
  227. return uniques[0]
  228. def _unroll_nested_names(self, names: list) -> list:
  229. '''
  230. Example: transform a list ["name.firstname", "name.surname"]
  231. into ["name", "name.firstname", "name.surname"]
  232. '''
  233. unrolled = []
  234. for c in names:
  235. splitted = c.split(".")
  236. for i in range(len(splitted)):
  237. unrolled.append(".".join(splitted[:i+1]))
  238. return unrolled
  239. if __name__ == "__main__":
  240. # Testing
  241. df = pd.DataFrame({
  242. "a": [1]*8 + [2]*8,
  243. "b": [10]*8 + [20]*8,
  244. "c": [100, 200]*8,
  245. "d.da": [11]*8 + [22]*8,
  246. "d.db": [33]*8 + [34]*8,
  247. "e.ea.eaa": [5]*8 + [55]*8,
  248. "e.ea.eab": [6]*8 + [66]*8,
  249. "e.eb": [2, 2, 3, 3]*4,
  250. "e.ec.eca": [1, 2, 3, 4]*4,
  251. "e.ec.ecb": [5, 6, 7, 8]*4,
  252. "f.fa": [1]*4 + [3]*4 + [11]*4 + [33]*4,
  253. "f.fb": [2]*4 + [3]*2 + [4]*2 + [22]*4 + [44]*4})
  254. duplicate = pd.DataFrame({"c": [300, 400]*8})
  255. df = pd.concat([df, duplicate], axis=1)
  256. schm = {
  257. "bsonType": "object",
  258. "required": ["a"],
  259. "properties": {
  260. "a": {"bsonType": "integer"},
  261. "b": {"bsonType": "integer"},
  262. "c": {
  263. "bsonType": "array",
  264. "items": {"bsonType": "integer"}
  265. },
  266. "d": {
  267. "bsonType": "object",
  268. "properties": {
  269. "da": {"bsonType": "integer"},
  270. "db": {"bsonType": "integer"}
  271. }
  272. },
  273. "e": {
  274. "bsonType": "object",
  275. "properties": {
  276. "ea": {
  277. "bsonType": "object",
  278. "properties": {
  279. "eaa": {"bsonType": "integer"},
  280. "eab": {"bsonType": "integer"}
  281. }
  282. },
  283. "eb": {
  284. "bsonType": "array",
  285. "items": {"bsonType": "integer"}
  286. },
  287. "ec": {
  288. "bsonType": "array",
  289. "items": {
  290. "bsonType": "object",
  291. "properties": {
  292. "eca": {"bsonType": "integer"},
  293. "ecb": {"bsonType": "integer"}
  294. }
  295. }
  296. }
  297. }
  298. },
  299. "f": {
  300. "bsonType": "array",
  301. "items": {
  302. "bsonType": "object",
  303. "properties": {
  304. "fa": {"bsonType": "integer"},
  305. "fb": {
  306. "bsonType": "array",
  307. "items": {"bsonType": "integer"}
  308. }
  309. }
  310. }
  311. }
  312. }
  313. }
  314. grp_fields = ["a"]
  315. result = DataFrameToCollection().to_list_of_documents(
  316. data=df,
  317. schema=schm,
  318. grp_fields=grp_fields)