#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Created on Wed Sep 25 08:09:52 2019

@author: tanya
"""
import os
import sys
import gc
from typing import Union

import pandas as pd
import numpy as np

sys.path.append(os.getcwd())

from libraries.db_migration.ParseMapping import ParseMapping
from libraries.db_migration.ParseJsonSchema import ParseJsonSchema
from libraries.utils.ExceptionsHandler import ExceptionsHandler
from libraries.utils.CleaningUtils import CleaningUtils
from libraries.log import Log


class MigrationCleaning:
    '''
    Class for correcting and filtering incorrect data.
    The correcting and the filtering methods are kept separate,
    since there might be other custom steps in between.
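
    A typical pipeline, as exercised in the __main__ block at the bottom
    of this module, is:

        cleaner = MigrationCleaning(mapping_path=...,
                                    schema_paths=...,
                                    filter_index_columns=...,
                                    inconsist_report_table=...)
        data = cleaner.replace_default_values(data)
        data = cleaner.map_equal_values(data)
        data = cleaner.convert_types(data)
        data = cleaner.filter_invalid_types(data)
        data = cleaner.filter_invalid_missing_values(data)
        data = cleaner.filter_invalid_patterns(data)
        data = cleaner.filter_invalid_values(data)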
    '''

    def __init__(self, mapping_path: str,
                 schema_paths: Union[str, list],
                 inconsist_report_table: str = None,
                 filter_index_columns: Union[str, list] = None,
                 mapping_source: str = "internal_name",
                 mapping_target: str = "mongo_name",
                 mapping_parser: type = ParseMapping,
                 schema_parser: type = ParseJsonSchema):
        '''
        Initializes the mapping and schema parsers and stores the
        configuration for reporting and filtering inconsistent data.
        '''
        self._log = Log('Migration Cleaning')
        self._exception_handler = ExceptionsHandler()

        assert (inconsist_report_table is None) or\
            isinstance(inconsist_report_table, str),\
            "Inconsistent report table should be a tablename string"

        self._inconsist_report_table = inconsist_report_table

        if isinstance(filter_index_columns, str):
            filter_index_columns = [filter_index_columns]

        assert (filter_index_columns is None) or\
            isinstance(filter_index_columns, list),\
            "Filter index columns must be a str or a list"

        self._filter_index_columns = filter_index_columns

        self._schema_parser = schema_parser(schema_paths)

        self._mapping_parser = mapping_parser(mapping_path,
                                              source=mapping_source,
                                              target=mapping_target)

        self._mapping_path = mapping_path
        self._schema_paths = schema_paths

        from libraries.db_handlers.SQLHandler import SQLHandlerPool
        self._sql_db = SQLHandlerPool(20)

    def _assert_dataframe_input(self, data: pd.DataFrame):
        '''
        Raises an AssertionError if data is not a pandas dataframe.
        '''
        assert isinstance(data, pd.DataFrame),\
            "Parameter 'data' must be a pandas dataframe"

    @property
    def _field_mapping(self):
        '''
        Mapping from source column names to target (mongo) field names.
        '''
        return self._mapping_parser.get_field_mapping()

    @property
    def _required_fields(self):
        '''
        Source fields required either by the migration mapping or by the
        mongo schema (schema requirements translated via the field mapping).
        '''
        source_required_fields = self._mapping_parser.get_required_fields()
        target_required_fields = self._schema_parser.get_required_fields()

        for source_field, target_field in self._field_mapping.items():

            if (target_field in target_required_fields) and\
                    (source_field not in source_required_fields):
                source_required_fields.append(source_field)

        return source_required_fields

    @property
    def _default_values(self):
        '''
        Default-value replacement rules per source column, in the form
        consumed by _replace_values.
        '''
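        # The result maps each source column to a one-entry dict of the form
        # {target_default: source_default}: rows holding the source-side
        # default (or nulls) get replaced by the target-side default.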
        default_values = {}

        target_default_values = self._schema_parser.get_default_values()
        source_default_values = self._mapping_parser.get_default_values()

        for source_field, target_field in self._field_mapping.items():

            if source_field not in source_default_values:
                continue

            elif target_field not in target_default_values:
                target_default_values[target_field] = np.nan

            default_values[source_field] = {
                target_default_values[target_field]:
                source_default_values[source_field]
            }

        return default_values

    @property
    def _python_types(self):
        '''
        Python type per source column, as prescribed by the mongo schema.
        '''
        target_types = self._schema_parser.get_python_types()

        result = {}

        for source_field, target_field in self._field_mapping.items():
            if target_field in target_types:
                result[source_field] = target_types[target_field]

        # Disabled consistency check between mapping and schema types
        # (references a source_types mapping that is not defined in the
        # current version of this class):
        """
        date_type_mismatch =\
                (target_field in target_types) and\
                (source_field in source_types) and\
                (target_types[target_field] == str) and\
                (source_types[source_field] == np.dtype('<M8[ns]'))

        if date_type_mismatch:
            target_types[target_field] = np.dtype('<M8[ns]')

        if (source_field in source_types) and\
                (target_field in target_types) and\
                (target_types[target_field] != source_types[source_field]):
            self._exception_handler.log_and_raise(
                ("Type {0} of field {1} in schema does not match "
                 "type {2} of field {3} in migration mapping")
                .format(target_types[target_field], target_field,
                        source_types[source_field], source_field))

        if target_field in target_types:
            source_types[source_field] = target_types[target_field]
        """
        return result

    @property
    def _value_mappings(self):
        '''
        Equal-value replacement rules from the migration mapping.
        '''
        return self._mapping_parser.get_value_mappings()

    @property
    def _date_formats(self):
        '''
        Date formats per source column, from the migration mapping.
        '''
        return self._mapping_parser.get_date_formats()

    def _get_mongo_schema_info(self, method_name: str):
        '''
        Calls the given getter on the schema parser and translates the
        result from target (mongo) field names to source column names.
        '''
        result = {}

        target_dict = getattr(self._schema_parser, method_name)()

        for source_field, target_field in self._field_mapping.items():

            if target_field in target_dict:
                result[source_field] = target_dict[target_field]

        return result

    @property
    def _allowed_values(self):
        '''
        Allowed values per source column, from the mongo schema.
        '''
        return self._get_mongo_schema_info("get_allowed_values")

    @property
    def _minimum_values(self):
        '''
        Minimum allowed value per source column, from the mongo schema.
        '''
        return self._get_mongo_schema_info("get_minimum_value")

    @property
    def _maximum_values(self):
        '''
        Maximum allowed value per source column, from the mongo schema.
        '''
        return self._get_mongo_schema_info("get_maximum_value")

    @property
    def _patterns(self):
        '''
        Regex patterns per source column, from the mongo schema.
        '''
        return self._get_mongo_schema_info("get_patterns")

    def _filter_invalid_data(self, data: pd.DataFrame,
                             invalid_mask: pd.Series,
                             reason: Union[str, pd.Series]) -> pd.DataFrame:
        '''
        Appends the rows flagged by invalid_mask to the inconsistency
        report table and drops all rows that belong to the same instances
        (as identified by the filter index columns) from the data.
        '''
        assert (self._inconsist_report_table is not None) and\
            (self._filter_index_columns is not None),\
            "Inconsistent report table or filter index is not provided"

        self._assert_dataframe_input(data)

        data = data.copy(deep=True)
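        # NOTE: aquire()/release() are assumed to check a connection out of /
        # back into the SQLHandlerPool created in __init__; the spelling
        # "aquire" follows the existing call site.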
        db = self._sql_db.aquire()

        if invalid_mask.sum() == 0:
            self._sql_db.release(db)
            return data

        data_inconsist = data.assign(reason=reason)\
                             .loc[invalid_mask]\
                             .reset_index(drop=True)

        if db.check_if_table_exists(self._inconsist_report_table):
            columns = db.get_column_names(tablename=self._inconsist_report_table)

            if len(columns) > 0:
                data_inconsist = data_inconsist[columns]

        db.append_to_table(data=data_inconsist,
                           tablename=self._inconsist_report_table)

        n_rows_filtered = len(data_inconsist)
        n_instances_filtered = len(
            data_inconsist[self._filter_index_columns].drop_duplicates())

        del data_inconsist
        gc.collect()

        self._log.warning(("Filtering: {0}. "
                           "Filtered {1} rows "
                           "and {2} instances")
                          .format(reason, n_rows_filtered,
                                  n_instances_filtered))

        # Remove every row whose filter-index combination occurs among the
        # invalid rows, so that affected instances are dropped as a whole.
        nok_index_data = data.loc[invalid_mask, self._filter_index_columns]\
                             .drop_duplicates().reset_index(drop=True)

        nok_index = pd.MultiIndex.from_arrays(
            [nok_index_data[c] for c in self._filter_index_columns])

        all_index = pd.MultiIndex.from_arrays(
            [data[c] for c in self._filter_index_columns])

        data = data.loc[~all_index.isin(nok_index)].reset_index(drop=True)

        self._sql_db.release(db)

        return data

    def _replace_values(self, data: pd.DataFrame,
                        default: bool) -> pd.DataFrame:
        '''
        Replaces values according to either the default-value rules
        (default=True) or the equal-value rules (default=False).
        '''
        if default:
            default_str = "default"
        else:
            default_str = "equal"

        self._assert_dataframe_input(data)

        data = data.copy(deep=True)

        if default:
            mapping = self._default_values
        else:
            mapping = self._value_mappings

        for column, d in mapping.items():

            try:
                if column not in data.columns:
                    continue

                dtype = data[column].dtype

                for key, values in d.items():

                    if not default:
                        mask = (data[column].astype(str).isin(values))
                    else:
                        mask = (data[column].isin(values))

                    if default:
                        mask = mask | (data[column].isnull())

                    data.loc[mask, column] = key

                data[column] = data[column].astype(dtype)

            except Exception as e:
                self._exception_handler.log_and_raise(
                    ("Failed to replace {0} values in {1}. "
                     "Exit with error {2}")
                    .format(default_str, column, e))

        self._log.info("Replaced {} values".format(default_str))

        return data

    def replace_default_values(self, data: pd.DataFrame) -> pd.DataFrame:
        '''
        Replaces source-side default values by the target-side defaults.
        '''
        return self._replace_values(data=data, default=True)

    def map_equal_values(self, data: pd.DataFrame) -> pd.DataFrame:
        '''
        Applies the equal-value mappings from the migration mapping.
        '''
        return self._replace_values(data=data, default=False)

    def convert_types(self, data: pd.DataFrame) -> pd.DataFrame:
        '''
        Converts each column to the python type prescribed by the mongo
        schema, parsing dates with the formats from the migration mapping.
        '''
        self._assert_dataframe_input(data)

        data = data.copy(deep=True)

        for column, python_type in self._python_types.items():

            try:
                if column not in data.columns:
                    continue

                elif column in self._date_formats:
                    data[column] = CleaningUtils.convert_dates(
                        series=data[column],
                        formats=self._date_formats[column])
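                # a numpy-backed int64 column cannot hold NaN, so an integer
                # cast is impossible while missing values remain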
                elif (python_type == int) and data[column].isnull().any():
                    self._exception_handler.log_and_raise(
                        ("Column {} contains missing values "
                         "and cannot be of integer type").format(column))

                else:
                    # strings are held as numpy objects in a dataframe
                    if python_type == str:
                        python_type = object

                    data[column] = data[column].astype(python_type)

                if data[column].dtype != python_type:
                    self._log.warning(("After conversion type in {0} "
                                       "should be {1} "
                                       "but is still {2}")
                                      .format(column,
                                              python_type,
                                              data[column].dtype))

            except Exception as e:
                self._exception_handler.log_and_raise(
                    ("Failed to convert types in {0}. "
                     "Exit with error {1}").format(column, e))

        self._log.info("Converted dtypes")

        return data

    def filter_invalid_missing_values(self, data: pd.DataFrame) -> pd.DataFrame:
        '''
        Filters out instances with null values in required fields.
        '''
        self._assert_dataframe_input(data)

        for column in data.columns:

            if (column in self._required_fields) and\
                    (data[column].isnull().any()):

                invalid_mask = data[column].isnull()

                reason = "Null value in the required field {}"\
                         .format(column)

                data = self._filter_invalid_data(data=data,
                                                 invalid_mask=invalid_mask,
                                                 reason=reason)

        return data

    def filter_invalid_types(self, data: pd.DataFrame) -> pd.DataFrame:
        '''
        Filters out instances whose values do not have the python type
        prescribed by the mongo schema.
        '''
        self._assert_dataframe_input(data)

        for column in data.columns:

            if column not in self._python_types:
                continue

            python_type = self._python_types[column]

            if data[column].dtype != python_type:

                def mismatch_type(x):
                    return type(x) != python_type

                invalid_mask = data[column].apply(mismatch_type)

                reason = "Type mismatch in field {}".format(column)

                data = self._filter_invalid_data(data=data,
                                                 invalid_mask=invalid_mask,
                                                 reason=reason)

        return data

    def filter_invalid_patterns(self, data: pd.DataFrame) -> pd.DataFrame:
        '''
        Filters out instances whose values do not match the regex pattern
        prescribed by the mongo schema.
        '''
        self._assert_dataframe_input(data)

        for column in data.columns:

            if column not in self._patterns:
                continue

            pattern = self._patterns[column]
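            # pandas .str.match anchors the regex at the start of each string,
            # so a pattern is checked as a prefix match unless it ends with
            # an explicit anchor such as "$"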
            invalid_mask = (~data[column].astype(str).str.match(pattern))

            reason = "Pattern mismatch in field {}".format(column)

            data = self._filter_invalid_data(data=data,
                                             invalid_mask=invalid_mask,
                                             reason=reason)

        return data

    def filter_invalid_values(self, data: pd.DataFrame) -> pd.DataFrame:
        '''
        Filters out instances with values below the minimum, above the
        maximum, or outside the allowed values from the mongo schema.
        '''
        for column in data.columns:

            if column in self._minimum_values:

                min_value = self._minimum_values[column]

                invalid_mask = data[column] < min_value

                reason = "Too small values in field {}".format(column)

                data = self._filter_invalid_data(data=data,
                                                 invalid_mask=invalid_mask,
                                                 reason=reason)

            if column in self._maximum_values:

                max_value = self._maximum_values[column]

                invalid_mask = data[column] > max_value

                reason = "Too large values in field {}".format(column)

                data = self._filter_invalid_data(data=data,
                                                 invalid_mask=invalid_mask,
                                                 reason=reason)

            if column in self._allowed_values:

                allowed_values = self._allowed_values[column]

                invalid_mask = (~data[column].isin(allowed_values))

                not_allowed_examples = data.loc[invalid_mask, column].unique()[:3]

                reason = "Not allowed values {0}... in field {1}"\
                         .format(not_allowed_examples, column)

                data = self._filter_invalid_data(data=data,
                                                 invalid_mask=invalid_mask,
                                                 reason=reason)

        return data

    def restrict_to_collection(self, data: pd.DataFrame,
                               collection_name: str) -> pd.DataFrame:
        '''
        Keeps only the columns that belong to the given mongo collection
        according to the schema and the migration mapping.
        '''
        mongo_fields = self._schema_parser.get_fields_restricted_to_collection(
            collection_name=collection_name)

        fields = self._mapping_parser.get_fields_restricted_to_collection(
            collection_name=collection_name)

        return data[[c for c in data.columns
                     if (c in fields) or (c in mongo_fields)]]


if __name__ == "__main__":

    # testing
    from libraries.db_handlers.SQLHandler import SQLHandler

    mapping_path = os.path.join(".", "migration_mappings", "rs1_mapping.json")

    schema_paths = [
        os.path.join(".", "mongo_schema", "schema_wheelsets.json"),
        os.path.join(".", "mongo_schema", "schema_process_instances.json")]

    inconsist_report_table = "test_inconsist_report_rs1"

    if all([os.path.isfile(p) for p in schema_paths + [mapping_path]]):

        cleaner = MigrationCleaning(
            mapping_path=mapping_path,
            schema_paths=schema_paths,
            mapping_source="internal_name",
            mapping_target="mongo_name",
            filter_index_columns=["radsatznummer"],
            inconsist_report_table=inconsist_report_table)

        db = SQLHandler()

        data = db.read_sql_to_dataframe("select * from rs1 limit 100")

        data = cleaner.replace_default_values(data)
        data = cleaner.map_equal_values(data)
        data = cleaner.convert_types(data)

        non_filtered_len = len(data)

        data = cleaner.filter_invalid_types(data)

        # re-convert: filtering may have removed the rows that blocked a cast
        if len(data) < non_filtered_len:
            data = cleaner.convert_types(data)

        data = cleaner.filter_invalid_missing_values(data)
        data = cleaner.filter_invalid_patterns(data)
        data = cleaner.filter_invalid_values(data)

    print("Done!")