#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Created on Wed Sep 25 08:09:52 2019
@author: tanya
"""
import os
import sys
import json
import gc

import pandas as pd
import numpy as np
from boltons.iterutils import remap

sys.path.append(os.getcwd())

from cdplib.db_migration.ParseMapping import ParseMapping
from cdplib.db_migration.ParseJsonSchema import ParseJsonSchema
from cdplib.utils.ExceptionsHandler import ExceptionsHandler
from cdplib.utils.CleaningUtils import CleaningUtils
from cdplib.log import Log


class MigrationCleaning:
    '''
    Class for correcting and filtering incorrect data.
    The correcting and filtering methods are kept separate,
    since there might be other custom steps in between.
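
    Typical call order, as exercised in the test block at the bottom of
    this file: replace_default_values, map_equal_values, convert_types,
    then the filter_invalid_* methods.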
    '''

    def __init__(self, mapping_paths: (str, list),
                 schema_paths: (str, list),
                 inconsist_report_table: str = None,
                 filter_index_columns: (str, list) = None,
                 mapping_source: str = "internal_name",
                 mapping_target: str = "mongo_name",
                 mapping_parser: type = ParseMapping,
                 schema_parser: type = ParseJsonSchema):
        '''
        :param mapping_paths: path(s) to the migration mapping file(s)
        :param schema_paths: path(s) to the mongo schema file(s)
        :param inconsist_report_table: name of the sql table to which
         filtered-out (inconsistent) rows are appended
        :param filter_index_columns: column(s) that identify one instance
        '''
        self.log = Log('Migration Cleaning')
        self._exception_handler = ExceptionsHandler()

        if inconsist_report_table is not None:
            assert isinstance(inconsist_report_table, str),\
                "Parameter 'inconsist_report_table' must be a tablename string"

        self._inconsist_report_table = inconsist_report_table

        if filter_index_columns is not None:
            assert isinstance(filter_index_columns, (str, list)),\
                "Parameter 'filter_index_columns' must be a str or a list"

            if isinstance(filter_index_columns, str):
                filter_index_columns = [filter_index_columns]

        self._filter_index_columns = filter_index_columns

        self._schema_parser = schema_parser(schema_paths)

        self._mapping_parser = mapping_parser(mapping_paths,
                                              source=mapping_source,
                                              target=mapping_target)

        self._mapping_paths = mapping_paths
        self._schema_paths = schema_paths
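
        # Imported here rather than at module level, presumably to avoid a
        # circular import between the db_handlers and db_migration packages.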
        from cdplib.db_handlers.SQLHandler import SQLHandler
        self._sql_db = SQLHandler()

    def _assert_dataframe_input(self, data: pd.DataFrame):
        '''
        Raises if 'data' is not a pandas dataframe.
        '''
        assert isinstance(data, pd.DataFrame),\
            "Parameter 'data' must be a pandas dataframe"

    @property
    def _field_mapping(self):
        '''
        Mapping from source (mapping) field names to target (schema)
        field names.
        '''
        return self._mapping_parser.get_field_mapping()

    @property
    def _required_fields(self):
        '''
        Source fields that are required either in the migration mapping
        or (via the field mapping) in the mongo schema.
        '''
        source_required_fields = self._mapping_parser.get_required_fields()
        target_required_fields = self._schema_parser.get_required_fields()

        for source_field, target_field in self._field_mapping.items():

            if (target_field in target_required_fields) and\
                    (source_field not in source_required_fields):

                source_required_fields.append(source_field)

        return source_required_fields

    @property
    def _default_values(self):
        '''
        Returns a dictionary that maps, for each source field, the default
        value of the mongo schema to the default values of the migration
        mapping. In the migration mapping, the default values should be
        specified as values that carry no information and can therefore be
        treated as empty.
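
        Illustrative example (hypothetical field names): if the migration
        mapping lists ['-', 'n.a.'] as empty markers for a source field
        'position' and the mongo schema defines no default for its target
        field, the result contains {'position': {nan: ['-', 'n.a.']}}.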
        '''
        default_values = {}

        target_default_values = self._schema_parser.get_default_values()
        source_default_values = self._mapping_parser.get_default_values()

        for source_field, target_field in self._field_mapping.items():

            if source_field not in source_default_values:
                continue

            elif target_field not in target_default_values:
                target_default_values[target_field] = np.nan

            default_values[source_field] = {
                target_default_values[target_field]:
                source_default_values[source_field]
            }

        return default_values

    @property
    def _python_types(self):
        '''
        Maps each source field to the python type of its target field in
        the mongo schema.
        '''
        target_types = self._schema_parser.get_python_types()

        result = {}

        for source_field, target_field in self._field_mapping.items():
            if target_field in target_types:
                result[source_field] = target_types[target_field]
  104. """
  105. date_type_mismatch =\
  106. (target_field in target_types) and\
  107. (source_field in source_types) and\
  108. (target_types[target_field] == str) and\
  109. (source_types[source_field] == np.dtype('<M8[ns]'))
  110. if date_type_mismatch:
  111. target_types[target_field] = np.dtype('<M8[ns]')
  112. if (source_field in source_types) and\
  113. (target_field in target_types) and\
  114. (target_types[target_field] != source_types[source_field]):
  115. self.log.log_and_raise_error(("Type {0} of field {1} "
  116. "in schema does not match "
  117. "type {2} of field {3} in "
  118. "migration mapping")
  119. .format(target_types[target_field],
  120. target_field,
  121. source_types[source_field],
  122. source_field))
  123. if target_field in target_types:
  124. source_types[source_field] = target_types[target_field]
  125. """

        return result

    @property
    def _value_mappings(self):
        '''
        Value replacement rules defined in the migration mapping.
        '''
        return self._mapping_parser.get_value_mappings()

    @property
    def _date_formats(self):
        '''
        Date formats per source field, as defined in the migration mapping.
        '''
        return self._mapping_parser.get_date_formats()

    def _get_mongo_schema_info(self, method_name: str):
        '''
        Calls the given schema-parser method and re-keys its result from
        target (schema) field names to source (mapping) field names.
        '''
        result = {}

        target_dict = getattr(self._schema_parser, method_name)()

        for source_field, target_field in self._field_mapping.items():
            if target_field in target_dict:
                result[source_field] = target_dict[target_field]

        return result

    @property
    def _allowed_values(self):
        '''
        Allowed values per source field, from the mongo schema.
        '''
        return self._get_mongo_schema_info("get_allowed_values")

    @property
    def _minimum_values(self):
        '''
        Minimum allowed values per source field, from the mongo schema.
        '''
        return self._get_mongo_schema_info("get_minimum_value")

    @property
    def _maximum_values(self):
        '''
        Maximum allowed values per source field, from the mongo schema.
        '''
        return self._get_mongo_schema_info("get_maximum_value")

    @property
    def _patterns(self):
        '''
        Regex patterns per source field, from the mongo schema.
        '''
        return self._get_mongo_schema_info("get_patterns")

    def _filter_invalid_data(self, data: pd.DataFrame,
                             invalid_mask: pd.Series,
                             reason: (str, pd.Series)) -> pd.DataFrame:
        '''
        Appends the rows marked by 'invalid_mask' to the inconsistency
        report table and removes the affected instances from 'data'.
        '''
        assert((self._inconsist_report_table is not None) and
               (self._filter_index_columns is not None)),\
            "Inconsistency report table or filter index columns are not provided"

        self._assert_dataframe_input(data)

        data = data.copy(deep=True)

        db = self._sql_db

        if invalid_mask.sum() == 0:
            return data

        data_inconsist = data.assign(reason=reason)\
                             .loc[invalid_mask]\
                             .reset_index(drop=True)

        if db.check_if_table_exists(self._inconsist_report_table):
            columns = db.get_column_names(
                tablename=self._inconsist_report_table)

            if len(columns) > 0:
                columns_not_in_data = [column for column in columns
                                       if column not in data.columns]

                for value in columns_not_in_data:
                    data_inconsist[value] = 'Column does not exist in the '\
                                            'mongo database and has '\
                                            'therefore been dropped'

                data_inconsist = data_inconsist[columns]

        db.append_to_table(data=data_inconsist,
                           tablename=self._inconsist_report_table)

        n_rows_filtered = len(data_inconsist)
        n_instances_filtered = len(
            data_inconsist[self._filter_index_columns].drop_duplicates())

        del data_inconsist
        gc.collect()

        self.log.warning(("Filtering: {0}. "
                          "Filtered {1} rows "
                          "and {2} instances."
                          .format(reason, n_rows_filtered,
                                  n_instances_filtered)))
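
        # Remove every row of an instance (combination of the filter index
        # columns) that contains at least one invalid row, not only the
        # invalid rows themselves.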
        nok_index_data = data.loc[invalid_mask, self._filter_index_columns]\
                             .drop_duplicates().reset_index(drop=True)

        nok_index = pd.MultiIndex.from_arrays(
            [nok_index_data[c] for c in self._filter_index_columns])

        all_index = pd.MultiIndex.from_arrays(
            [data[c] for c in self._filter_index_columns])

        data = data.loc[~all_index.isin(nok_index)].reset_index(drop=True)

        return data

    def _replace_values(self, data: pd.DataFrame,
                        default: bool) -> pd.DataFrame:
        '''
        Replaces either the default (empty-marker) values or the mapped
        (equal) values in 'data', as defined in the migration mapping.
        '''
        if default:
            default_str = "default"
        else:
            default_str = "equal"

        self._assert_dataframe_input(data)
        data = data.copy(deep=True)

        if default:
            mapping = self._default_values
        else:
            mapping = self._value_mappings

        for column, d in mapping.items():
            try:
                if column not in data.columns:
                    continue

                dtype = data[column].dtype

                for key, values in d.items():
                    if not default:
                        mask = (data[column].astype(str).isin(values))
                    else:
                        mask = (data[column].isin(values))
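
                    # The default replacement additionally catches missing
                    # values: nulls are treated as empty markers too.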
                    if default:
                        mask = mask | (data[column].isnull())

                    data.loc[mask, column] = key

                data[column] = data[column].astype(dtype)

            except Exception as e:
                self.log.log_and_raise_error(("Failed to replace {0} values "
                                              "in {1}. Exit with error {2}"
                                              .format(default_str,
                                                      column, e)))

        self.log.info("Replaced {} values".format(default_str))

        return data

    def replace_default_values(self, data: pd.DataFrame) -> pd.DataFrame:
        '''
        Replaces the default (empty-marker) values defined in the mapping.
        '''
        return self._replace_values(data=data, default=True)

    def map_equal_values(self, data: pd.DataFrame) -> pd.DataFrame:
        '''
        Applies the value mappings defined in the migration mapping.
        '''
        return self._replace_values(data=data, default=False)

    def convert_types(self, data: pd.DataFrame) -> pd.DataFrame:
        '''
        Converts the columns of 'data' to the python types defined in the
        mongo schema, parsing dates with the formats from the mapping.
        '''
        self._assert_dataframe_input(data)

        for column, python_type in self._python_types.items():
            try:
                if column not in data.columns:
                    continue

                elif column in self._date_formats:
                    data[column] = CleaningUtils.convert_dates(
                        series=data[column],
                        formats=self._date_formats[column])

                elif (python_type == int) and data[column].isnull().any():
                    self.log.log_and_raise_error(
                        ("Column {} contains missing values "
                         "and cannot be of integer type").format(column))

                elif python_type == bool:
                    accepted_bool = {'ja': True, 'j': True, '1': True,
                                     1: True, 'yes': True, 'y': True,
                                     'true': True, 't': True,
                                     'nein': False, 'n': False,
                                     'no': False, 'false': False,
                                     'f': False, '0': False, 0: False}
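                    # Values outside this mapping become NaN after map();
                    # astype(bool) then converts NaN to True, since NaN is
                    # truthy.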
                    data[column] = data[column].map(accepted_bool)
                    data[column] = data[column].astype(bool)

                elif python_type == str:
                    # Might not be the smoothest solution, but it works:
                    # cast to str first, then compare against object dtype,
                    # since pandas stores strings in object columns.
                    data = data.copy(deep=True)
                    data[column] = data[column].astype(str)
                    python_type = object
                    data[column] = data[column].astype(python_type)

                elif python_type == float:
                    data[column] = data[column].fillna(np.inf)
                    data[column] = data[column].astype(python_type)

                else:
                    data = data.copy(deep=True)
                    data[column] = data[column].astype(python_type)

                if data[column].dtype != python_type:
                    self.log.warning(("After conversion type in {0} "
                                      "should be {1} "
                                      "but is still {2}"
                                      .format(column,
                                              python_type,
                                              data[column].dtype)))

            except Exception as e:
                self.log.log_and_raise_error(("Failed to convert types "
                                              "in {0}. Exit with error {1}"
                                              .format(column, e)))

        self.log.info("Converted dtypes")

        return data

    def filter_invalid_missing_values(self,
                                      data: pd.DataFrame) -> pd.DataFrame:
        '''
        Filters out rows with null values in required fields.
        '''
        self._assert_dataframe_input(data)

        for column in data.columns:
            if (column in self._required_fields) and\
                    (data[column].isnull().any()):
                invalid_mask = data[column].isnull()

                reason = "Null value in the required field {}"\
                    .format(column)

                data = self._filter_invalid_data(data=data,
                                                 invalid_mask=invalid_mask,
                                                 reason=reason)

        return data

    def filter_invalid_types(self, data: pd.DataFrame) -> pd.DataFrame:
        '''
        Filters out rows whose values do not have the python type defined
        in the mongo schema for their column.
        '''
        self._assert_dataframe_input(data)

        for column in data.columns:
            if column not in self._python_types:
                continue

            python_type = self._python_types[column]

            # Compare against object, since pandas stores string columns
            # with object dtype.
            if python_type == str:
                python_type = object

            if data[column].dtype != python_type:

                def mismatch_type(x):
                    return type(x) != python_type

                invalid_mask = data[column].apply(mismatch_type)

                reason = "Type mismatch in field {}".format(column)

                data = self._filter_invalid_data(data=data,
                                                 invalid_mask=invalid_mask,
                                                 reason=reason)

        return data

    def filter_invalid_patterns(self, data: pd.DataFrame) -> pd.DataFrame:
        '''
        Filters out rows whose values do not match the regex pattern
        defined in the mongo schema for their column.
        '''
        self._assert_dataframe_input(data)

        for column in data.columns:
            if column not in self._patterns:
                continue

            pattern = self._patterns[column]

            invalid_mask = (~data[column].astype(str).str.match(pattern))

            reason = "Pattern mismatch in field {0}. "\
                     "Pattern: {1}. Example: {2}"\
                     .format(column, pattern, data.iloc[0][column])

            data = self._filter_invalid_data(data=data,
                                             invalid_mask=invalid_mask,
                                             reason=reason)

        return data

    def filter_invalid_values(self, data: pd.DataFrame) -> pd.DataFrame:
        '''
        Filters out rows with values below the minimum, above the maximum,
        or outside the allowed values of the mongo schema.
        '''
        for column in data.columns:

            if column in self._minimum_values:
                min_value = self._minimum_values[column]

                invalid_mask = data[column] < min_value

                reason = "Too small values in field {}".format(column)

                data = self._filter_invalid_data(data=data,
                                                 invalid_mask=invalid_mask,
                                                 reason=reason)

            if column in self._maximum_values:
                max_value = self._maximum_values[column]

                invalid_mask = data[column] > max_value

                reason = "Too large values in field {}".format(column)

                data = self._filter_invalid_data(data=data,
                                                 invalid_mask=invalid_mask,
                                                 reason=reason)

            if column in self._allowed_values:
                allowed_values = self._allowed_values[column]

                invalid_mask = (~data[column].isin(allowed_values))

                not_allowed_examples = \
                    data.loc[invalid_mask, column].unique()[:3]

                reason = "Not allowed values {0}... in field {1}"\
                    .format(not_allowed_examples, column)

                data = self._filter_invalid_data(data=data,
                                                 invalid_mask=invalid_mask,
                                                 reason=reason)

        return data

    def drop_columns_with_no_content(self,
                                     data: pd.DataFrame) -> pd.DataFrame:
        '''
        Drops columns that are either completely empty or contain only
        no-content markers such as '-' or 'n.a'.
        '''
        data = data.dropna(how='all', axis='columns')

        for column in data.columns:
            unique_values = data[column].unique()

            no_content_signs = [None, '-', 'n.a']

            intersection = list(set(unique_values) & set(no_content_signs))

            if len(intersection) == len(unique_values):
                data = data.drop(columns=[column])

        return data

    def clean_json_from_None_object(self, data: pd.DataFrame) -> dict:
        '''
        Serializes 'data' to JSON and recursively removes None values,
        the strings 'None' and 'inf', and other empty values from the
        resulting object.
        '''
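        # Illustrative example (hypothetical record): an entry parsed as
        # {'a': 1, 'b': None, 'c': 'None', 'd': []} is pruned to {'a': 1}.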
        data = data.to_json(date_format="iso")
        data = json.loads(data)
        new_data = remap(data, lambda p, k, v: v is not None)
        new_data = remap(new_data, lambda p, k, v: v != 'None')
        new_data = remap(new_data, lambda p, k, v: v != 'inf')
        # Keep booleans, drop every other falsy value ('', [], {}, 0, ...).
        new_data = remap(new_data, lambda p, k, v:
                         (isinstance(v, bool) or bool(v)))

        return new_data

    def restrict_to_collection(self, data: pd.DataFrame,
                               collection_name: str) -> pd.DataFrame:
        '''
        Restricts 'data' to the columns that belong to the given mongo
        collection, according to the schema and the migration mapping.
        '''
        mongo_fields = \
            self._schema_parser.get_fields_restricted_to_collection(
                collection_name=collection_name)

        mapping_fields = \
            self._mapping_parser.get_fields_restricted_to_collection(
                collection_name=collection_name)

        return data[[c for c in data.columns
                     if (c in mapping_fields) or (c in mongo_fields)]]


if __name__ == "__main__":

    # testing
    from cdplib.db_handlers.SQLHandler import SQLHandler

    mapping_path = os.path.join(".", "migration_mappings",
                                "rs1_mapping.json")

    schema_paths = [
        os.path.join(".", "mongo_schema", "schema_wheelsets.json"),
        os.path.join(".", "mongo_schema", "schema_process_instances.json")]

    inconsist_report_table = "test_inconsist_report_rs1"

    if all([os.path.isfile(p) for p in schema_paths + [mapping_path]]):

        cleaner = MigrationCleaning(
            mapping_paths=mapping_path,
            schema_paths=schema_paths,
            mapping_source="internal_name",
            mapping_target="mongo_name",
            filter_index_columns=["radsatznummer"],
            inconsist_report_table=inconsist_report_table)

        db = SQLHandler()

        data = db.read_sql_to_dataframe("select * from rs1 limit 100")

        data = cleaner.replace_default_values(data)

        data = cleaner.map_equal_values(data)

        data = cleaner.convert_types(data)

        non_filtered_len = len(data)

        data = cleaner.filter_invalid_types(data)
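
        # Re-convert after filtering: rows with mismatching types may have
        # prevented a clean dtype conversion on the first pass.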
        if len(data) < non_filtered_len:
            data = cleaner.convert_types(data)

        data = cleaner.filter_invalid_missing_values(data)

        data = cleaner.filter_invalid_patterns(data)

        data = cleaner.filter_invalid_values(data)

        print("Done!")