
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Created on Mon Jul 22 11:05:47 2019

@author: tanya

@description: a function to reshape a pandas dataframe to a list of
(possibly nested) documents with respect to a (json) mongodb schema
"""

import pandas as pd
import os
import sys

sys.path.append(os.getcwd())


class DataFrameToCollection():
    '''
    Reshapes a pandas dataframe to a list of (possibly nested) documents
    that conform to a (json) mongodb schema.
    '''
    def __init__(self, schema_path: str = None):
        '''
        :param schema_path: path to a json file holding the mongodb schema;
            can be omitted if a schema dictionary is passed directly to
            to_list_of_documents
        '''
        from cdplib.log import Log
        import json

        self._log = Log("ParseJsonSchema")

        self.schema = None

        if schema_path is not None:

            if not os.path.isfile(schema_path):
                err = "JsonSchema not found"
                self._log.error(err)
                raise FileNotFoundError(err)

            # load the schema to a dictionary if it is a valid json file
            try:
                with open(schema_path, "r") as f:
                    self.schema = json.load(f)

            except Exception as e:
                err = ("Could not load json schema, "
                       "obtained error {}".format(e))
                self._log.error(err)
                raise Exception(err)

    def to_list_of_documents(self, data: pd.DataFrame,
                             grp_fields: list,
                             schema: dict = None,
                             _final_step: bool = True,
                             already_reshaped: list = None) -> list:
        '''
        Reshapes a pandas dataframe to a list of documents according
        to a complex (json) mongodb schema.

        Remark 1: column names of data need to reflect the "nestedness"
        of the field in the mongodb schema with the help of a "." separator.
        Example: field.sub_field_1, field.sub_field_2

        Remark 2: if the schema is stored as a json file, first load it
        to a dictionary with the help of the python json module.

        The function goes recursively through all the fields and reshapes
        them correspondingly depending on whether the field is an array,
        an object, or a simple field. For each field we group the data by
        grp_fields and reshape it accordingly; the result is a pandas Series.
        In the end all the series are collected and concatenated.
        '''
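        # Illustration (a hedged sketch; "id" and "name.*" are hypothetical
        # column names, not taken from the test data below):
        #   input columns: ["id", "name.first", "name.last"], grp_fields = ["id"]
        #   one resulting document: {"id": 1, "name": {"first": "Ann", "last": "Lee"}}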

        from copy import deepcopy

        if already_reshaped is None:
            already_reshaped = []

        data = self._melt_duplicated_columns(data)

        reshaped_fields = []

        if schema is None:
            schema = self.schema

        for field in schema["properties"]:

            if field not in self._unroll_nested_names(data.columns):
                continue

            if field in already_reshaped:
                reshaped_field =\
                    data.groupby(grp_fields, sort=False)[field]\
                        .apply(self._make_flattened_list_of_distinct)

                reshaped_fields.append(reshaped_field)

            else:
                field_type = schema["properties"][field]["bsonType"]

                # if the field has a simple type
                if field_type not in ["array", "object"]:

                    grp_fields = [c for c in grp_fields if c in data.columns]

                    # check that there is only one possible value of this
                    # field (n_distinct_values can be 0 if the column only
                    # contains NaN values)
                    n_distinct_values =\
                        data.groupby(grp_fields, sort=False)[field]\
                            .nunique().max()

                    if n_distinct_values > 1:
                        err = "Field {0} is not unique with respect to {1}"\
                              .format(field, grp_fields)
                        self._log.error(err)
                        raise Exception(err)

                    if field not in grp_fields:
                        reshaped_field =\
                            data.groupby(grp_fields, sort=False)[field].first()
                    else:
                        reshaped_field =\
                            data[grp_fields].drop_duplicates()\
                                .set_index(grp_fields, drop=False)[field]

                    reshaped_fields.append(reshaped_field)

                # if the field is a sub-document (dictionary)
                elif field_type == "object":

                    sub_schema = deepcopy(schema["properties"][field])

                    # rename sub-schema properties to match the data column names
                    sub_schema["properties"] =\
                        {".".join([field, k]): v for k, v
                         in sub_schema["properties"].items()}

                    sub_data = self.to_list_of_documents(
                            data=data,
                            schema=sub_schema,
                            grp_fields=grp_fields,
                            _final_step=False,
                            already_reshaped=already_reshaped)

                    # needs to be checked since child elements can be empty
                    if sub_data is not None:
                        reshaped_field = sub_data.apply(self._make_dict, axis=1)
                        reshaped_field.name = field

                        reshaped_fields.append(reshaped_field)

                # if the field is an array
                elif field_type == "array":

                    items_type = schema["properties"][field]["items"]["bsonType"]

                    # if the field is a list of dictionaries
                    if items_type == "object":

                        sub_schema = deepcopy(schema["properties"][field]["items"])

                        # rename sub-schema properties to match the data column names
                        sub_schema["properties"] =\
                            {".".join([field, k]): v for k, v in
                             sub_schema["properties"].items()}

                        # extend grp_fields by the sub-fields of simple type
                        sub_grp_fields =\
                            [f for f in sub_schema["properties"]
                             if (sub_schema["properties"][f]["bsonType"]
                                 not in ["array", "object"])
                             and (f in data.columns)]

                        if len(sub_grp_fields) == 0:
                            err = ("One of the sub-keys in a list of documents"
                                   " must be of simple type for the field {}"
                                   .format(field))
                            self._log.error(err)
                            raise Exception(err)

                        # group and reshape sub-fields with complex types
                        sub_data = self.to_list_of_documents(
                                data=data,
                                schema=sub_schema,
                                grp_fields=grp_fields + sub_grp_fields,
                                _final_step=False,
                                already_reshaped=already_reshaped)

                        if sub_data is not None:

                            # gather the results into a list of dictionaries
                            sub_data = sub_data.apply(self._make_dict, axis=1)

                            sub_data.name = field
                            sub_data = sub_data.reset_index(grp_fields)

                            ######################################################
                            ####### OPTIMIZATIONS MAY BE POSSIBLE HERE ###########
                            reshaped_field =\
                                sub_data.groupby(grp_fields, sort=False)[field]\
                                        .apply(self._make_list_of_distinct)
                            ######################################################

                            reshaped_fields.append(reshaped_field)

                    # if the field is a list of lists
                    elif items_type == "array":

                        grp_fields = [c for c in grp_fields if c in data.columns]

                        if field in data.columns:
                            reshaped_field =\
                                data.groupby(grp_fields, sort=False)[field]\
                                    .apply(self._make_list_of_distinct)

                            reshaped_fields.append(reshaped_field)

                    # if the field is a list of values of simple type
                    else:

                        grp_fields = [c for c in grp_fields if c in data.columns]

                        if field in data.columns:
                            reshaped_field =\
                                data.groupby(grp_fields, sort=False)[field]\
                                    .apply(self._make_flattened_list_of_distinct)

                            reshaped_fields.append(reshaped_field)

        if len(reshaped_fields) > 0:

            reshaped_fields = pd.concat(reshaped_fields, sort=False, axis=1)

            if _final_step:
                # drop the index names at the final step;
                # before that the index is still needed for merging
                reshaped_fields =\
                    reshaped_fields.drop(list(reshaped_fields.index.names),
                                         axis=1, errors="ignore")\
                                   .reset_index(drop=False)

                self._log.info("Done reshaping the dataframe to a list of documents")

            return reshaped_fields

        else:
            return

    def _melt_duplicated_columns(self, data: pd.DataFrame) -> pd.DataFrame:
        '''
        Collapses columns that share the same name into a single column
        whose entries are lists of the corresponding row values.
        '''
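        # _melt_duplicated_columns example (hedged, mirrors the test data
        # below): two columns both named "c" holding 100 and 300 in a row
        # collapse into one column "c" holding the list [100, 300].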

        data = data.copy(deep=True)

        for c in set(data.columns):

            if isinstance(data[c], pd.DataFrame):
                data["temp"] = data[c].apply(self._make_list, axis=1)
                data.drop(c, axis=1, inplace=True)
                data = data.rename(columns={"temp": c})

        return data

    def _make_dict(self, x: pd.Series) -> dict:
        '''
        Transforms a pandas series to a dictionary.
        Is meant to be applied to a dataframe with axis = 1,
        so the index of the input series holds the column names
        of the dataframe.
        '''
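        # _make_dict example (hedged, values as in the test data below):
        #   pd.Series({"d.da": 11, "d.db": 33}) -> {"da": 11, "db": 33}
        #   null scalar entries are dropped from the result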

        def custom_notnull(y):
            # pd.notnull returns an array for list-like entries;
            # such entries are always kept
            if isinstance(pd.notnull(y), bool):
                return pd.notnull(y)
            else:
                return True

        return {f.split(".")[-1]: x[f] for f in x.index
                if custom_notnull(x[f])}

    def _make_list(self, x: pd.Series) -> list:
        '''
        :return: list of values in a series
        '''
        return list(x)

    def _make_list_of_distinct(self, x: pd.Series) -> list:
        '''
        :return: list of unique values from a Series where
            entries are arbitrary objects
            (the pandas unique() method does not work
             if entries are of complex types)
        '''
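        # _make_list_of_distinct example (a hedged sketch, not taken from
        # the test data): pd.Series([[1, 2], [1, 2], [3]]) -> [[1, 2], [3]];
        # empty dicts and lists are filtered out of the result.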

        if x.size == 1:
            uniques = x.tolist()
        else:
            # deduplicate by comparing the string representations
            # of the entries
            uniques = pd.DataFrame({"temp": x.values})\
                        .assign(temp_str=lambda y: y["temp"].astype(str))\
                        .drop_duplicates(subset=["temp_str"])\
                        .drop("temp_str", axis=1).iloc[:, 0].tolist()

        def is_empty(y):
            is_empty_dict = (isinstance(y, dict) and (len(y) == 0))
            is_empty_list = (isinstance(y, list) and (len(y) == 0))
            return is_empty_dict or is_empty_list

        return [el for el in uniques if not is_empty(el)]

    def _make_flattened_list_of_distinct(self, x: pd.Series):
        '''
        :return: the first unique value from a Series where
            entries are arbitrary objects, or None if the Series
            holds no non-empty values
        '''
        uniques = self._make_list_of_distinct(x)

        if len(uniques) > 0:
            return uniques[0]
        else:
            return None

    def _unroll_nested_names(self, names: list) -> list:
        '''
        Example: transforms the list ["name.firstname", "name.surname"]
        into ["name", "name.firstname", "name.surname"]
        '''
        unrolled = []

        for c in names:
            splitted = c.split(".")
            for i in range(len(splitted)):
                unrolled.append(".".join(splitted[:i+1]))

        return unrolled


if __name__ == "__main__":

    # Testing

    df = pd.DataFrame({
        "a": [1]*8 + [2]*8,
        "b": [10]*8 + [20]*8,
        "c": [100, 200]*8,
        "d.da": [11]*8 + [22]*8,
        "d.db": [33]*8 + [34]*8,
        "e.ea.eaa": [5]*8 + [55]*8,
        "e.ea.eab": [6]*8 + [66]*8,
        "e.eb": [2, 2, 3, 3]*4,
        "e.ec.eca": [1, 2, 3, 4]*4,
        "e.ec.ecb": [5, 6, 7, 8]*4,
        "f.fa": [1]*4 + [3]*4 + [11]*4 + [33]*4,
        "f.fb": [2]*4 + [3]*2 + [4]*2 + [22]*4 + [44]*4})

    duplicate = pd.DataFrame({"c": [300, 400]*8})

    df = pd.concat([df, duplicate], axis=1)

    schm = {
        "bsonType": "object",
        "required": ["a"],
        "properties": {

            "a": {"bsonType": "integer"},

            "b": {"bsonType": "integer"},

            "c": {
                "bsonType": "array",
                "items": {"bsonType": "integer"}
            },

            "d": {
                "bsonType": "object",
                "properties": {
                    "da": {"bsonType": "integer"},
                    "db": {"bsonType": "integer"}
                }
            },

            "e": {
                "bsonType": "object",
                "properties": {
                    "ea": {
                        "bsonType": "object",
                        "properties": {
                            "eaa": {"bsonType": "integer"},
                            "eab": {"bsonType": "integer"}
                        }
                    },

                    "eb": {
                        "bsonType": "array",
                        "items": {"bsonType": "integer"}
                    },

                    "ec": {
                        "bsonType": "array",
                        "items": {
                            "bsonType": "object",
                            "properties": {
                                "eca": {"bsonType": "integer"},
                                "ecb": {"bsonType": "integer"}
                            }
                        }
                    }
                }
            },

            "f": {
                "bsonType": "array",
                "items": {
                    "bsonType": "object",
                    "properties": {
                        "fa": {"bsonType": "integer"},
                        "fb": {
                            "bsonType": "array",
                            "items": {"bsonType": "integer"}
                        }
                    }
                }
            }
        }
    }

    grp_fields = ["a"]

    result = DataFrameToCollection().to_list_of_documents(
        data=df,
        schema=schm,
        grp_fields=grp_fields)
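
    # A hedged inspection sketch (not part of the original test): each row
    # of `result` corresponds to one distinct value of the grouping field
    # "a" and can be read as one (nested) mongodb document.
    for document in result.to_dict("records"):
        print(document)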