#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Created on Mon Jul 22 11:05:47 2019

@author: tanya

@description: a function to reshape a pandas dataframe to a list of
(possibly nested) documents with respect to a (json) mongodb schema
"""
import pandas as pd
import os
import sys

sys.path.append(os.getcwd())


class DataFrameToCollection():
    '''
    Reshapes a pandas dataframe to a list of (possibly nested) documents
    that conform to a (json) mongodb schema.
    '''
    def __init__(self, schema_path: str = None):
        '''
        :param schema_path: path to a json file holding the mongodb schema;
            optional, since a schema dictionary can also be passed directly
            to to_list_of_documents.
        '''
        from cdplib.log import Log
        import json

        self._log = Log("DataFrameToCollection")

        self.schema = None

        if schema_path is not None:

            if not os.path.isfile(schema_path):
                err = "JsonSchema not found"
                self._log.error(err)
                raise FileNotFoundError(err)

            # load the schema to a dictionary if it is a valid json file
            try:
                with open(schema_path, "r") as f:
                    self.schema = json.load(f)
            except Exception as e:
                err = ("Could not load json schema, "
                       "obtained error {}".format(e))
                self._log.error(err)
                raise Exception(err)
    def to_list_of_documents(self, data: pd.DataFrame,
                             grp_fields: list,
                             schema: dict = None,
                             _final_step: bool = True) -> list:
        '''
        Reshapes a pandas dataframe to a list of documents according
        to a complex (json) mongodb schema.

        Remark 1: the column names of data need to reflect the "nestedness"
        of the fields in the mongodb schema with the help of a "."
        separator.
        Example: field.sub_field_1, field.sub_field_2

        Remark 2: if the schema is stored as a json file, first load it
        to a dictionary with the help of the python json module.

        The function goes recursively through all the fields and reshapes
        them depending on whether a field is an array, an object, or of a
        simple type. For each field the data is grouped by the grp_fields
        and reshaped accordingly; the result is a pandas Series. In the
        end all the series are collected and concatenated.
        '''
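        # Illustration (a hypothetical sketch, values for exposition only;
        # cf. the test block under __main__): with grp_fields == ["a"],
        # the flat columns "d.da" and "d.db" of the input dataframe are
        # gathered into one document-valued field per distinct "a":
        #   d == {"da": <value>, "db": <value>}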
        from copy import deepcopy

        data = self._melt_duplicated_columns(data)

        reshaped_fields = []

        if schema is None:
            schema = self.schema

        for field in schema["properties"]:

            if field not in self._unroll_nested_names(data.columns):
                continue

            field_type = schema["properties"][field]["bsonType"]

            # if the field has a simple type
            if field_type not in ["array", "object"]:

                grp_fields = [c for c in grp_fields if c in data.columns]

                # check that the field takes only one value per group;
                # n_distinct_values can be 0 if the column contains
                # only NaN values
                n_distinct_values = data.groupby(grp_fields, sort=False)[field]\
                                        .nunique().max()

                if n_distinct_values > 1:
                    err = "Field {0} is not unique with respect to {1}"\
                          .format(field, grp_fields)
                    self._log.error(err)
                    raise Exception(err)

                if field not in grp_fields:
                    reshaped_field = data.groupby(grp_fields, sort=False)[field]\
                                         .first()
                else:
                    reshaped_field =\
                        data[grp_fields].drop_duplicates()\
                            .set_index(grp_fields, drop=False)[field]

                reshaped_fields.append(reshaped_field)
            # if the field is a sub-document (dictionary)
            elif field_type == "object":

                sub_schema = deepcopy(schema["properties"][field])

                # rename the sub-schema properties to match the data
                # column names
                sub_schema["properties"] =\
                    {".".join([field, k]): v for k, v
                     in sub_schema["properties"].items()}

                sub_data = self.to_list_of_documents(
                        data=data,
                        schema=sub_schema,
                        grp_fields=grp_fields,
                        _final_step=False)

                # needs to be checked since child elements can be empty
                if sub_data is not None:
                    reshaped_field = sub_data.apply(self._make_dict, axis=1)
                    reshaped_field.name = field
                    reshaped_fields.append(reshaped_field)
            # if the field is an array
            elif field_type == "array":

                items_type = schema["properties"][field]["items"]["bsonType"]

                # if the field is a list of documents (dictionaries)
                if items_type == "object":

                    sub_schema = deepcopy(schema["properties"][field]["items"])

                    # rename the sub-schema properties to match the data
                    # column names
                    sub_schema["properties"] =\
                        {".".join([field, k]): v for k, v in
                         sub_schema["properties"].items()}

                    # extend grp_fields by the sub-fields of simple type
                    sub_grp_fields =\
                        [f for f in sub_schema["properties"]
                         if (sub_schema["properties"][f]["bsonType"]
                             not in ["array", "object"])
                         and (f in data.columns)]

                    if len(sub_grp_fields) == 0:
                        err = ("One of the sub-keys in a list of documents"
                               " must be of simple type for the field {}"
                               .format(field))
                        self._log.error(err)
                        raise Exception(err)

                    # group and reshape the sub-fields with complex types
                    sub_data = self.to_list_of_documents(
                            data=data,
                            schema=sub_schema,
                            grp_fields=grp_fields + sub_grp_fields,
                            _final_step=False)

                    if sub_data is not None:

                        # gather the results into a list of dictionaries
                        sub_data = sub_data.apply(self._make_dict, axis=1)

                        sub_data.name = field
                        sub_data = sub_data.reset_index(grp_fields)
                        reshaped_field =\
                            sub_data.groupby(grp_fields, sort=False)[field]\
                                    .apply(self._make_list_of_distinct)
                        reshaped_fields.append(reshaped_field)

                # if the items are themselves arrays
                elif items_type == "array":

                    grp_fields = [c for c in grp_fields if c in data.columns]

                    if field in data.columns:
                        reshaped_field =\
                            data.groupby(grp_fields, sort=False)[field]\
                                .apply(self._make_list_of_distinct)
                        reshaped_fields.append(reshaped_field)

                # if the field is a list of values of simple type
                else:

                    grp_fields = [c for c in grp_fields if c in data.columns]

                    if field in data.columns:
                        reshaped_field =\
                            data.groupby(grp_fields, sort=False)[field]\
                                .apply(self._make_flattened_list_of_distinct)
                        reshaped_fields.append(reshaped_field)
        if len(reshaped_fields) > 0:

            reshaped_fields = pd.concat(reshaped_fields, sort=False, axis=1)

            if _final_step:
                # drop the index names in the final step;
                # before that the index is needed for merging
                reshaped_fields =\
                    reshaped_fields.drop(list(reshaped_fields.index.names),
                                         axis=1, errors="ignore")\
                                   .reset_index(drop=False)

                self._log.info("Done reshaping the dataframe "
                               "to a list of documents")

            return reshaped_fields

        else:
            return

    def _melt_duplicated_columns(self, data: pd.DataFrame) -> pd.DataFrame:
        '''
        Collapses duplicated columns into a single column whose cells
        hold the list of values of the duplicates.
        '''
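        # Illustration (hypothetical values, cf. the duplicated "c" column
        # in the test block under __main__): two columns both named "c"
        # with row values (100, 300) and (200, 400) are collapsed into a
        # single column "c" with the cells [100, 300] and [200, 400].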
        data = data.copy(deep=True)

        for c in set(data.columns):
            if isinstance(data[c], pd.DataFrame):
                # alternative solution via pd.melt:
                # data = pd.melt(data, id_vars=[cc for cc in data.columns
                #                               if cc != c], value_vars=c)\
                #          .drop("variable", axis=1)\
                #          .rename(columns={"value": c})

                data["temp"] = data[c].apply(self._make_list, axis=1)
                data.drop(c, axis=1, inplace=True)
                data = data.rename(columns={"temp": c})

        return data

    def _make_dict(self, x: pd.Series) -> dict:
        '''
        Transforms a pandas series to a dictionary;
        meant to be applied to a dataframe with axis = 1,
        so that the index of the input series consists of the
        column names of the dataframe.
        '''
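        # Illustration (hypothetical values): a series with the index
        # ["d.da", "d.db"] and the values (11, 33) becomes
        # {"da": 11, "db": 33}; null scalar entries are dropped.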
        def custom_notnull(y):
            # pd.notnull returns an array for list-like entries;
            # keep such entries in any case
            if isinstance(pd.notnull(y), bool):
                return pd.notnull(y)
            else:
                return True

        return {f.split(".")[-1]: x[f] for f in x.index
                if custom_notnull(x[f])}

    def _make_list(self, x: pd.Series) -> list:
        '''
        :return: list of values in a series
        '''
        return list(x)

    def _make_list_of_distinct(self, x: pd.Series) -> list:
        '''
        :return: list of unique values from a series whose
            entries are arbitrary objects
            (the pandas unique() method does not work
             if the entries are of complex types)
        '''
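        # Illustration (hypothetical values):
        # pd.Series([{"x": 1}, {"x": 1}, {}]) -> [{"x": 1}]
        # (duplicates are detected via the string representation,
        #  empty dicts and lists are dropped)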
        uniques = pd.DataFrame({"temp": x.tolist()})\
                    .assign(temp_str=lambda y: y["temp"].astype(str))\
                    .drop_duplicates(subset=["temp_str"])\
                    .drop("temp_str", axis=1).iloc[:, 0].tolist()

        def is_empty(y):
            is_empty_dict = (isinstance(y, dict) and (len(y) == 0))
            is_empty_list = (isinstance(y, list) and (len(y) == 0))
            return is_empty_dict or is_empty_list

        return [el for el in uniques if not is_empty(el)]

    def _make_flattened_list_of_distinct(self, x: pd.Series) -> list:
        '''
        :return: the first of the unique values in a series whose
            entries are arbitrary objects; used for array fields
            whose items are of simple type
        '''
        uniques = self._make_list_of_distinct(x)

        return uniques[0] if len(uniques) > 0 else None

    def _unroll_nested_names(self, names: list) -> list:
        '''
        Example: transforms the list ["name.firstname", "name.surname"]
        into ["name", "name.firstname", "name.surname"]
        '''
        unrolled = []

        for c in names:
            splitted = c.split(".")
            for i in range(len(splitted)):
                unrolled.append(".".join(splitted[:i+1]))

        return unrolled


if __name__ == "__main__":

    # Testing

    df = pd.DataFrame({
        "a": [1]*8 + [2]*8,
        "b": [10]*8 + [20]*8,
        "c": [100, 200]*8,
        "d.da": [11]*8 + [22]*8,
        "d.db": [33]*8 + [34]*8,
        "e.ea.eaa": [5]*8 + [55]*8,
        "e.ea.eab": [6]*8 + [66]*8,
        "e.eb": [2, 2, 3, 3]*4,
        "e.ec.eca": [1, 2, 3, 4]*4,
        "e.ec.ecb": [5, 6, 7, 8]*4,
        "f.fa": [1]*4 + [3]*4 + [11]*4 + [33]*4,
        "f.fb": [2]*4 + [3]*2 + [4]*2 + [22]*4 + [44]*4})

    # a duplicated column name produces list-valued cells after melting
    duplicate = pd.DataFrame({"c": [300, 400]*8})

    df = pd.concat([df, duplicate], axis=1)
    schm = {
        "bsonType": "object",
        "required": ["a"],
        "properties": {

            "a": {"bsonType": "integer"},

            "b": {"bsonType": "integer"},

            "c": {
                "bsonType": "array",
                "items": {"bsonType": "integer"}
            },

            "d": {
                "bsonType": "object",
                "properties": {
                    "da": {"bsonType": "integer"},
                    "db": {"bsonType": "integer"}
                }
            },

            "e": {
                "bsonType": "object",
                "properties": {
                    "ea": {
                        "bsonType": "object",
                        "properties": {
                            "eaa": {"bsonType": "integer"},
                            "eab": {"bsonType": "integer"}
                        }
                    },

                    "eb": {
                        "bsonType": "array",
                        "items": {"bsonType": "integer"}
                    },

                    "ec": {
                        "bsonType": "array",
                        "items": {
                            "bsonType": "object",
                            "properties": {
                                "eca": {"bsonType": "integer"},
                                "ecb": {"bsonType": "integer"}
                            }
                        }
                    }
                }
            },

            "f": {
                "bsonType": "array",
                "items": {
                    "bsonType": "object",
                    "properties": {
                        "fa": {"bsonType": "integer"},
                        "fb": {
                            "bsonType": "array",
                            "items": {"bsonType": "integer"}
                        }
                    }
                }
            }
        }
    }
    grp_fields = ["a"]

    result = DataFrameToCollection().to_list_of_documents(
            data=df,
            schema=schm,
            grp_fields=grp_fields)
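
    # one row per distinct value of the grouping field "a";
    # each row holds the (nested) document fields defined in schm
    print(result)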