#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Created on Wed Sep 25 08:09:52 2019
@author: tanya
"""
import os
import sys
import gc
from typing import Union
import pandas as pd
import numpy as np

sys.path.append(os.getcwd())

from cdplib.db_migration.ParseMapping import ParseMapping
from cdplib.db_migration.ParseJsonSchema import ParseJsonSchema
from cdplib.utils.ExceptionsHandler import ExceptionsHandler
from cdplib.utils.CleaningUtils import CleaningUtils
from cdplib.log import Log


class MigrationCleaning:
    '''
    Class for correcting and filtering incorrect data.
    We keep the correcting and the filtering methods separate,
    since there might be other custom steps in between.
    '''
    def __init__(self, mapping_path: str,
                 schema_paths: Union[str, list],
                 inconsist_report_table: str = None,
                 filter_index_columns: Union[str, list] = None,
                 mapping_source: str = "internal_name",
                 mapping_target: str = "mongo_name",
                 mapping_parser: type = ParseMapping,
                 schema_parser: type = ParseJsonSchema):
        '''
        Collects the mapping and schema parsers together with the
        configuration needed for reporting and filtering inconsistent rows.
        '''
        self.log = Log('Migration Cleaning')
        self._exception_handler = ExceptionsHandler()

        if inconsist_report_table is not None:
            assert isinstance(inconsist_report_table, str),\
                "Inconsistent report table should be a tablename string"

        self._inconsist_report_table = inconsist_report_table

        if filter_index_columns is not None:
            assert isinstance(filter_index_columns, (str, list)),\
                "Filter index columns must be a str or a list"

            if isinstance(filter_index_columns, str):
                filter_index_columns = [filter_index_columns]

        self._filter_index_columns = filter_index_columns

        self._schema_parser = schema_parser(schema_paths)

        self._mapping_parser = mapping_parser(mapping_path,
                                              source=mapping_source,
                                              target=mapping_target)

        self._mapping_path = mapping_path
        self._schema_paths = schema_paths

        # local import, as in the original module, so that the SQL
        # handler is only loaded when the class is instantiated
        from cdplib.db_handlers.SQLHandler import SQLHandler

        self._sql_db = SQLHandler()
    def _assert_dataframe_input(self, data: pd.DataFrame):
        '''
        Raises if 'data' is not a pandas dataframe.
        '''
        assert isinstance(data, pd.DataFrame),\
            "Parameter 'data' must be a pandas dataframe"
    @property
    def _field_mapping(self):
        '''
        Mapping from source (SQL) column names to target (mongo) field names.
        '''
        return self._mapping_parser.get_field_mapping()
    @property
    def _required_fields(self):
        '''
        Source fields that are required either by the migration mapping
        or by the target mongo schema.
        '''
        source_required_fields = self._mapping_parser.get_required_fields()
        target_required_fields = self._schema_parser.get_required_fields()

        for source_field, target_field in self._field_mapping.items():

            if (target_field in target_required_fields) and\
                    (source_field not in source_required_fields):

                source_required_fields.append(source_field)

        return source_required_fields
    @property
    def _default_values(self):
        '''
        Per source field: a mapping of the target default value to the
        source values that should be replaced by it.
        '''
        default_values = {}

        target_default_values = self._schema_parser.get_default_values()
        source_default_values = self._mapping_parser.get_default_values()

        for source_field, target_field in self._field_mapping.items():

            if source_field not in source_default_values:
                continue

            elif target_field not in target_default_values:
                target_default_values[target_field] = np.nan

            default_values[source_field] = {
                target_default_values[target_field]:
                    source_default_values[source_field]
            }

        return default_values
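
    # Illustrative sketch of the structure returned above (hypothetical
    # field names, not read from any real mapping): the property might
    # return
    #     {"price": {0.0: [-1, -99]}}
    # meaning that in the source column "price" the placeholder values
    # -1 and -99 (as well as nulls) are to be replaced by the target
    # default 0.0.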
    @property
    def _python_types(self):
        '''
        Python types per source field, taken from the target mongo schema.
        '''
        target_types = self._schema_parser.get_python_types()

        result = {}

        for source_field, target_field in self._field_mapping.items():

            if target_field in target_types:
                result[source_field] = target_types[target_field]

        """
        date_type_mismatch =\
                (target_field in target_types) and\
                (source_field in source_types) and\
                (target_types[target_field] == str) and\
                (source_types[source_field] == np.dtype('<M8[ns]'))

        if date_type_mismatch:
            target_types[target_field] = np.dtype('<M8[ns]')

        if (source_field in source_types) and\
                (target_field in target_types) and\
                (target_types[target_field] != source_types[source_field]):

            self.log.log_and_raise_error(("Type {0} of field {1} "
                                          "in schema does not match "
                                          "type {2} of field {3} in "
                                          "migration mapping")
                                         .format(target_types[target_field],
                                                 target_field,
                                                 source_types[source_field],
                                                 source_field))

        if target_field in target_types:
            source_types[source_field] = target_types[target_field]
        """

        return result
    @property
    def _value_mappings(self):
        '''
        Per source field: mapping of target values to the source values
        that are considered equal to them.
        '''
        return self._mapping_parser.get_value_mappings()

    @property
    def _date_formats(self):
        '''
        Date formats per source field, as defined in the migration mapping.
        '''
        return self._mapping_parser.get_date_formats()
    def _get_mongo_schema_info(self, method_name: str):
        '''
        Calls the given schema-parser method and re-keys its result
        by source field names using the field mapping.
        '''
        result = {}

        target_dict = getattr(self._schema_parser, method_name)()

        for source_field, target_field in self._field_mapping.items():

            if target_field in target_dict:

                result[source_field] = target_dict[target_field]

        return result
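
    # Sketch of the re-keying above (hypothetical names): if the schema
    # parser reports {"mongo_price": [1, 2, 3]} and the field mapping
    # contains {"sql_price": "mongo_price"}, the helper returns
    # {"sql_price": [1, 2, 3]}, i.e. schema information indexed by
    # source column names.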
    @property
    def _allowed_values(self):
        '''
        Allowed values per source field, from the mongo schema.
        '''
        return self._get_mongo_schema_info("get_allowed_values")

    @property
    def _minimum_values(self):
        '''
        Minimum allowed values per source field, from the mongo schema.
        '''
        return self._get_mongo_schema_info("get_minimum_value")

    @property
    def _maximum_values(self):
        '''
        Maximum allowed values per source field, from the mongo schema.
        '''
        return self._get_mongo_schema_info("get_maximum_value")

    @property
    def _patterns(self):
        '''
        Regex patterns per source field, from the mongo schema.
        '''
        return self._get_mongo_schema_info("get_patterns")
    def _filter_invalid_data(self, data: pd.DataFrame,
                             invalid_mask: pd.Series,
                             reason: Union[str, pd.Series]) -> pd.DataFrame:
        '''
        Appends the rows marked by 'invalid_mask' to the inconsistency
        report table and drops all rows belonging to the same instances.
        '''
        assert((self._inconsist_report_table is not None) and
               (self._filter_index_columns is not None)),\
            "Inconsistent report table or filter index is not provided"

        self._assert_dataframe_input(data)

        data = data.copy(deep=True)

        db = self._sql_db

        if invalid_mask.sum() == 0:
            return data

        data_inconsist = data.assign(reason=reason)\
                             .loc[invalid_mask]\
                             .reset_index(drop=True)

        if db.check_if_table_exists(self._inconsist_report_table):
            columns = db.get_column_names(
                    tablename=self._inconsist_report_table)

            if len(columns) > 0:
                data_inconsist = data_inconsist[columns]

        db.append_to_table(data=data_inconsist,
                           tablename=self._inconsist_report_table)

        n_rows_filtered = len(data_inconsist)
        n_instances_filtered = len(
                data_inconsist[self._filter_index_columns].drop_duplicates())

        del data_inconsist
        gc.collect()

        self.log.warning(("Filtering: {0}. "
                          "Filtered {1} rows "
                          "and {2} instances"
                          .format(reason, n_rows_filtered,
                                  n_instances_filtered)))

        nok_index_data = data.loc[invalid_mask, self._filter_index_columns]\
                             .drop_duplicates().reset_index(drop=True)

        nok_index = pd.MultiIndex.from_arrays(
                [nok_index_data[c] for c in self._filter_index_columns])

        all_index = pd.MultiIndex.from_arrays(
                [data[c] for c in self._filter_index_columns])

        data = data.loc[~all_index.isin(nok_index)].reset_index(drop=True)

        # self._sql_db.release(db)

        return data
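
    # Sketch of the instance-level filtering above, with a hypothetical
    # filter index ["radsatznummer"]: if any row of an instance is
    # invalid, the MultiIndex membership test drops *all* rows sharing
    # that "radsatznummer", so instances are removed as a whole rather
    # than row by row.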
    def _replace_values(self, data: pd.DataFrame,
                        default: bool) -> pd.DataFrame:
        '''
        Replaces either default values (default=True) or
        equally-meant values (default=False) in the dataframe.
        '''
        default_str = "default" if default else "equal"

        self._assert_dataframe_input(data)

        data = data.copy(deep=True)

        if default:
            mapping = self._default_values
        else:
            mapping = self._value_mappings

        for column, d in mapping.items():

            try:

                if column not in data.columns:
                    continue

                dtype = data[column].dtype

                for key, values in d.items():

                    if not default:
                        mask = (data[column].astype(str).isin(values))
                    else:
                        mask = (data[column].isin(values))

                    if default:
                        mask = mask | (data[column].isnull())

                    data.loc[mask, column] = key

                data[column] = data[column].astype(dtype)

            except Exception as e:
                self.log.log_and_raise_error(("Failed to replace {0} values "
                                              "in {1}. Exit with error {2}"
                                              .format(default_str,
                                                      column, e)))

        self.log.info("Replaced {} values".format(default_str))

        return data
    def replace_default_values(self, data: pd.DataFrame) -> pd.DataFrame:
        '''
        Replaces source default (placeholder) values and nulls
        by the target default values.
        '''
        return self._replace_values(data=data, default=True)

    def map_equal_values(self, data: pd.DataFrame) -> pd.DataFrame:
        '''
        Maps equally-meant source values to their target representation.
        '''
        return self._replace_values(data=data, default=False)
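
    # Usage sketch (assumed mapping shapes, not part of the parsers' API):
    # with self._value_mappings == {"status": {"ok": ["OK", "Ok", "okay"]}},
    # map_equal_values() rewrites every listed variant in data["status"]
    # to "ok"; with self._default_values == {"price": {0.0: [-1]}},
    # replace_default_values() rewrites -1 and nulls in data["price"]
    # to 0.0.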
    def convert_types(self, data: pd.DataFrame) -> pd.DataFrame:
        '''
        Casts columns to the python types required by the mongo schema.
        '''
        self._assert_dataframe_input(data)

        data = data.copy(deep=True)

        for column, python_type in self._python_types.items():

            try:
                if column not in data.columns:
                    continue

                elif column in self._date_formats:
                    data[column] = CleaningUtils.convert_dates(
                            series=data[column],
                            formats=self._date_formats[column])

                elif (python_type == int) and data[column].isnull().any():
                    self.log.log_and_raise_error(
                            ("Column {} contains missing values "
                             "and cannot be of integer type"
                             .format(column)))

                else:
                    # str columns are stored under the pandas 'object' dtype
                    if python_type == str:
                        python_type = object

                    data[column] = data[column].astype(python_type)

                if data[column].dtype != python_type:
                    self.log.warning(("After conversion type in {0} "
                                      "should be {1} "
                                      "but is still {2}"
                                      .format(column,
                                              python_type,
                                              data[column].dtype)))

            except Exception as e:
                self.log.log_and_raise_error(("Failed to convert types "
                                              "in {0}. Exit with error {1}"
                                              .format(column, e)))

        self.log.info("Converted dtypes")

        return data
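
    # Note on the integer branch above (plain pandas behaviour, no
    # assumptions about this class): pd.Series([1, None]).astype(int)
    # raises, because NaN is a float and has no integer representation,
    # which is why convert_types() refuses integer columns that contain
    # missing values.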
    def filter_invalid_missing_values(self,
                                      data: pd.DataFrame) -> pd.DataFrame:
        '''
        Filters out rows with null values in the required fields.
        '''
        self._assert_dataframe_input(data)

        for column in data.columns:

            if (column in self._required_fields) and\
                    (data[column].isnull().any()):

                invalid_mask = data[column].isnull()

                reason = "Null value in the required field {}"\
                    .format(column)

                data = self._filter_invalid_data(data=data,
                                                 invalid_mask=invalid_mask,
                                                 reason=reason)

        return data
    def filter_invalid_types(self, data: pd.DataFrame) -> pd.DataFrame:
        '''
        Filters out rows whose values do not have the python type
        required by the mongo schema.
        '''
        self._assert_dataframe_input(data)

        for column in data.columns:

            if column not in self._python_types:
                continue

            python_type = self._python_types[column]

            if data[column].dtype != python_type:

                def mismatch_type(x):
                    return type(x) != python_type

                invalid_mask = data[column].apply(mismatch_type)

                reason = "Type mismatch in field {}".format(column)

                data = self._filter_invalid_data(data=data,
                                                 invalid_mask=invalid_mask,
                                                 reason=reason)

        return data
    def filter_invalid_patterns(self, data: pd.DataFrame) -> pd.DataFrame:
        '''
        Filters out rows whose values do not match the regex pattern
        required by the mongo schema.
        '''
        self._assert_dataframe_input(data)

        for column in data.columns:

            if column not in self._patterns:
                continue

            pattern = self._patterns[column]

            invalid_mask = (~data[column].astype(str).str.match(pattern))

            reason = "Pattern mismatch in field {}".format(column)

            data = self._filter_invalid_data(data=data,
                                             invalid_mask=invalid_mask,
                                             reason=reason)

        return data
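
    # Note: pandas Series.str.match() anchors the regex at the start of
    # the string, e.g. pd.Series(["AB1", "XAB1"]).str.match(r"AB\d")
    # yields [True, False]; rows failing the schema pattern are routed
    # to the inconsistency report by _filter_invalid_data().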
    def filter_invalid_values(self, data: pd.DataFrame) -> pd.DataFrame:
        '''
        Filters out rows with values below the allowed minimum, above
        the allowed maximum, or outside the allowed value set.
        '''
        for column in data.columns:

            if column in self._minimum_values:

                min_value = self._minimum_values[column]

                invalid_mask = data[column] < min_value

                reason = "Too small values in field {}".format(column)

                data = self._filter_invalid_data(data=data,
                                                 invalid_mask=invalid_mask,
                                                 reason=reason)

            elif column in self._maximum_values:

                max_value = self._maximum_values[column]

                invalid_mask = data[column] > max_value

                reason = "Too large values in field {}".format(column)

                data = self._filter_invalid_data(data=data,
                                                 invalid_mask=invalid_mask,
                                                 reason=reason)

            elif column in self._allowed_values:

                allowed_values = self._allowed_values[column]

                invalid_mask = (~data[column].isin(allowed_values))

                not_allowed_examples = \
                    data.loc[invalid_mask, column].unique()[:3]

                reason = "Not allowed values {0}... in field {1}"\
                    .format(not_allowed_examples, column)

                data = self._filter_invalid_data(data=data,
                                                 invalid_mask=invalid_mask,
                                                 reason=reason)

            else:
                continue

        return data
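
    # Example (hypothetical schema values): with _minimum_values ==
    # {"temperature": 0}, a row with temperature == -5 is reported as
    # "Too small values in field temperature" and its whole instance
    # is dropped by _filter_invalid_data().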
    def restrict_to_collection(self, data: pd.DataFrame,
                               collection_name: str) -> pd.DataFrame:
        '''
        Keeps only the columns that belong to the given mongo collection.
        '''
        mongo_fields = \
            self._schema_parser.get_fields_restricted_to_collection(
                collection_name=collection_name)

        fields = self._mapping_parser.get_fields_restricted_to_collection(
                collection_name=collection_name)

        return data[[c for c in data.columns
                     if (c in fields) or (c in mongo_fields)]]

if __name__ == "__main__":

    # testing

    from cdplib.db_handlers.SQLHandler import SQLHandler

    mapping_path = os.path.join(".", "migration_mappings", "rs1_mapping.json")

    schema_paths = [
            os.path.join(".", "mongo_schema", "schema_wheelsets.json"),
            os.path.join(".", "mongo_schema", "schema_process_instances.json")]

    inconsist_report_table = "test_inconsist_report_rs1"

    if all([os.path.isfile(p) for p in schema_paths + [mapping_path]]):

        cleaner = MigrationCleaning(
                mapping_path=mapping_path,
                schema_paths=schema_paths,
                mapping_source="internal_name",
                mapping_target="mongo_name",
                filter_index_columns=["radsatznummer"],
                inconsist_report_table=inconsist_report_table)

        db = SQLHandler()

        data = db.read_sql_to_dataframe("select * from rs1 limit 100")

        data = cleaner.replace_default_values(data)

        data = cleaner.map_equal_values(data)

        data = cleaner.convert_types(data)

        non_filtered_len = len(data)

        data = cleaner.filter_invalid_types(data)

        # a filtering step may drop mixed-type rows, so retry the cast
        if len(data) < non_filtered_len:
            data = cleaner.convert_types(data)

        data = cleaner.filter_invalid_missing_values(data)

        data = cleaner.filter_invalid_patterns(data)

        data = cleaner.filter_invalid_values(data)

        print("Done!")