#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Created on Wed Sep 25 08:09:52 2019

@author: tanya
"""
import os
import sys
import pandas as pd
import numpy as np
import gc

sys.path.append(os.getcwd())

from cdplib.db_migration.ParseMapping import ParseMapping
from cdplib.db_migration.ParseJsonSchema import ParseJsonSchema
from cdplib.utils.ExceptionsHandler import ExceptionsHandler
from cdplib.utils.CleaningUtils import CleaningUtils
from cdplib.log import Log


class MigrationCleaning:
    '''
    Class for correcting and filtering incorrect data.
    The correcting and the filtering methods are kept separate,
    since there might be other custom steps in between.
    '''
    def __init__(self, mapping_path: str,
                 schema_paths: (str, list),
                 inconsist_report_table: str = None,
                 filter_index_columns: (str, list) = None,
                 mapping_source_name_tag: str = "internal_name",
                 mapping_target_name_tag: str = "mongo_name",
                 target_collection_name: str = None,
                 mapping_parser: type = ParseMapping,
                 schema_parser: type = ParseJsonSchema):
        '''
        '''
        self.log = Log('Migration Cleaning')
        self._exception_handler = ExceptionsHandler()

        if inconsist_report_table is not None:
            assert isinstance(inconsist_report_table, str),\
                "Inconsistent report table should be a tablename string"

        self._inconsist_report_table = inconsist_report_table

        if filter_index_columns is not None:
            assert isinstance(filter_index_columns, (str, list)),\
                "Filter index columns must be a str or a list"
            # wrap a single column name in a list instead of letting
            # list() split the string into characters
            if isinstance(filter_index_columns, str):
                filter_index_columns = [filter_index_columns]
            self._filter_index_columns = list(filter_index_columns)
        else:
            self._filter_index_columns = None

        self._mapping_parser = mapping_parser(
            mapping_path,
            source_name_tag=mapping_source_name_tag,
            target_name_tag=mapping_target_name_tag,
            target_collection_name=target_collection_name)

        if target_collection_name is not None:
            schema_names = [os.path.basename(schema_path)
                            for schema_path in schema_paths]

            convention_schema_name =\
                "schema_" + target_collection_name + ".json"

            if convention_schema_name not in schema_names:
                self.log.log_and_raise_warning(
                    "Found no schema matching the collection name {0} "
                    "in the schema paths {1}"
                    .format(target_collection_name, schema_paths))
            else:
                self._schema_parser = schema_parser(
                    schema_paths[schema_names.index(convention_schema_name)])
        else:
            self._schema_parser = schema_parser(schema_paths)

        self._mapping_path = mapping_path
        self._schema_paths = schema_paths

        from cdplib.db_handlers.SQLHandler import SQLHandlerPool
        self._sql_db = SQLHandlerPool(20)

    def _assert_dataframe_input(self, data: pd.DataFrame):
        '''
        '''
        assert isinstance(data, pd.DataFrame),\
            "Parameter 'data' must be a pandas dataframe"

    @property
    def _field_mapping(self):
        '''
        Mapping from source (internal) column names
        to target (mongo) field names.
        '''
        return self._mapping_parser.get_field_mapping()

    @property
    def _required_fields(self):
        '''
        Source fields that are required, either by the migration
        mapping or because their target field is required in the schema.
        '''
        source_required_fields = self._mapping_parser.get_required_fields()
        target_required_fields = self._schema_parser.get_required_fields()

        for source_field, target_field in self._field_mapping.items():
            if (target_field in target_required_fields) and\
                    (source_field not in source_required_fields):
                source_required_fields.append(source_field)

        return source_required_fields

    @property
    def _default_values(self):
        '''
        Per-column replacement rules for default values,
        keyed by source column name.
        '''
        default_values = {}

        target_default_values = self._schema_parser.get_default_values()
        source_default_values = self._mapping_parser.get_default_values()

        for source_field, target_field in self._field_mapping.items():
            if source_field not in source_default_values:
                continue
            elif target_field not in target_default_values:
                target_default_values[target_field] = np.nan

            default_values[source_field] = {
                target_default_values[target_field]:
                    source_default_values[source_field]}

        return default_values
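
    # A sketch of the structure _default_values is assumed to return,
    # based on how _replace_values consumes it (the column and values
    # below are hypothetical):
    #
    #   {"zustand": {np.nan: ["", "unbekannt"]}}
    #
    # i.e. for each source column, the dict key is the target default
    # that gets written, and the dict value is the collection of source
    # placeholders (plus nulls) that it replaces.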

    @property
    def _python_types(self):
        '''
        Python types per source column, taken from the mongo schema
        of the corresponding target field.
        '''
        target_types = self._schema_parser.get_python_types()

        result = {}

        for source_field, target_field in self._field_mapping.items():
            if target_field in target_types:
                result[source_field] = target_types[target_field]

        """
        date_type_mismatch =\
            (target_field in target_types) and\
            (source_field in source_types) and\
            (target_types[target_field] == str) and\
            (source_types[source_field] == np.dtype('<M8[ns]'))

        if date_type_mismatch:
            target_types[target_field] = np.dtype('<M8[ns]')

        if (source_field in source_types) and\
                (target_field in target_types) and\
                (target_types[target_field] != source_types[source_field]):
            self.log.log_and_raise_error(("Type {0} of field {1} "
                                          "in schema does not match "
                                          "type {2} of field {3} in "
                                          "migration mapping")
                                         .format(target_types[target_field],
                                                 target_field,
                                                 source_types[source_field],
                                                 source_field))

        if target_field in target_types:
            source_types[source_field] = target_types[target_field]
        """
        return result

    @property
    def _value_mappings(self):
        '''
        '''
        return self._mapping_parser.get_value_mappings()

    @property
    def _date_formats(self):
        '''
        '''
        return self._mapping_parser.get_date_formats()

    def _get_mongo_schema_info(self, method_name: str):
        '''
        Call the given schema-parser method and re-key its result
        from target field names to source column names.
        '''
        result = {}

        target_dict = getattr(self._schema_parser, method_name)()

        for source_field, target_field in self._field_mapping.items():
            if target_field in target_dict:
                result[source_field] = target_dict[target_field]

        return result

    @property
    def _allowed_values(self):
        '''
        '''
        return self._get_mongo_schema_info("get_allowed_values")

    @property
    def _minimum_values(self):
        '''
        '''
        return self._get_mongo_schema_info("get_minimum_value")

    @property
    def _maximum_values(self):
        '''
        '''
        return self._get_mongo_schema_info("get_maximum_value")

    @property
    def _patterns(self):
        '''
        '''
        return self._get_mongo_schema_info("get_patterns")

    def _filter_invalid_data(self, data: pd.DataFrame,
                             invalid_mask: pd.Series,
                             reason: (str, pd.Series)) -> pd.DataFrame:
        '''
        Report the rows flagged by invalid_mask to the inconsistency
        table and drop every row of the affected instances.
        '''
        assert((self._inconsist_report_table is not None) and
               (self._filter_index_columns is not None)),\
            "Inconsistent report table or filter index is not provided"

        self._assert_dataframe_input(data)
        data = data.copy(deep=True)

        # db = self._sql_db.acquire()
        from cdplib.db_handlers.SQLHandler import SQLHandler
        db = SQLHandler()

        if invalid_mask.sum() == 0:
            # self._sql_db.release(db)
            return data

        data_inconsist = data.assign(reason=reason)\
                             .loc[invalid_mask]\
                             .reset_index(drop=True)

        if db.check_if_table_exists(self._inconsist_report_table):
            columns = db.get_column_names(
                tablename=self._inconsist_report_table)

            if len(columns) > 0:
                data_inconsist = data_inconsist[columns]

        db.append_to_table(data=data_inconsist,
                           tablename=self._inconsist_report_table)

        n_rows_filtered = len(data_inconsist)
        n_instances_filtered =\
            len(data_inconsist[self._filter_index_columns].drop_duplicates())

        del data_inconsist
        gc.collect()

        self.log.warning(("Filtering: {0}. "
                          "Filtered {1} rows "
                          "and {2} instances")
                         .format(reason, n_rows_filtered,
                                 n_instances_filtered))

        nok_index_data = data.loc[invalid_mask, self._filter_index_columns]\
                             .drop_duplicates().reset_index(drop=True)

        nok_index = pd.MultiIndex.from_arrays(
            [nok_index_data[c] for c in self._filter_index_columns])
        all_index = pd.MultiIndex.from_arrays(
            [data[c] for c in self._filter_index_columns])

        data = data.loc[~all_index.isin(nok_index)].reset_index(drop=True)

        # self._sql_db.release(db)
        return data
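
    # Note that _filter_invalid_data removes more than the flagged rows:
    # it drops all rows whose filter-index values (e.g. "radsatznummer"
    # in the test block below) match a flagged row, so an instance is
    # either migrated completely or not at all.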

    def _replace_values(self, data: pd.DataFrame,
                        default: bool) -> pd.DataFrame:
        '''
        '''
        if default:
            default_str = "default"
        else:
            default_str = "equal"

        self._assert_dataframe_input(data)
        data = data.copy(deep=True)

        if default:
            mapping = self._default_values
        else:
            mapping = self._value_mappings

        for column, d in mapping.items():
            try:
                if column not in data.columns:
                    continue

                dtype = data[column].dtype

                for key, values in d.items():
                    if not default:
                        mask = (data[column].astype(str).isin(values))
                    else:
                        mask = (data[column].isin(values))

                    if default:
                        mask = mask | (data[column].isnull())

                    data.loc[mask, column] = key

                data[column] = data[column].astype(dtype)
            except Exception as e:
                self.log.log_and_raise_error(("Failed to replace {0} values "
                                              "in {1}. Exit with error {2}")
                                             .format(default_str, column, e))

        self.log.info("Replaced {} values".format(default_str))

        return data

    def replace_default_values(self, data: pd.DataFrame) -> pd.DataFrame:
        '''
        Replace source default/placeholder values (and nulls)
        by the target default values.
        '''
        return self._replace_values(data=data, default=True)

    def map_equal_values(self, data: pd.DataFrame) -> pd.DataFrame:
        '''
        Map source values to the target values
        defined in the migration mapping.
        '''
        return self._replace_values(data=data, default=False)
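
    # A sketch of the value-mapping structure map_equal_values is
    # assumed to consume, mirroring the loop in _replace_values
    # (the column and values below are hypothetical):
    #
    #   {"status": {"open": ["OPEN", "Offen"],
    #               "closed": ["CLOSED", "Geschlossen"]}}
    #
    # Each key is the target value written wherever the column's
    # string representation matches one of the listed source values.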

    def convert_types(self, data: pd.DataFrame) -> pd.DataFrame:
        '''
        '''
        self._assert_dataframe_input(data)

        for column, python_type in self._python_types.items():
            try:
                if column not in data.columns:
                    continue
                elif column in self._date_formats:
                    data[column] = CleaningUtils.convert_dates(
                        series=data[column],
                        formats=self._date_formats[column])
                elif (python_type == int) and data[column].isnull().any():
                    self.log.log_and_raise_error(
                        ("Column {} contains missing values "
                         "and cannot be of integer type").format(column))
                else:
                    if python_type == str:
                        # pandas stores strings in object columns
                        python_type = object

                    data = data.copy(deep=True)
                    data[column] = data[column].astype(python_type)

                if data[column].dtype != python_type:
                    self.log.warning(("After conversion type in {0} "
                                      "should be {1} "
                                      "but is still {2}")
                                     .format(column,
                                             python_type,
                                             data[column].dtype))
            except Exception as e:
                self.log.log_and_raise_error(("Failed to convert types "
                                              "in {0}. Exit with error {1}")
                                             .format(column, e))

        self.log.info("Converted dtypes")

        return data
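
    # Note on the integer case above: plain numpy integer columns cannot
    # hold NaN, which is why missing values raise an error there. If the
    # holes must survive the conversion, pandas' nullable integer dtype
    # is an alternative (not used by this class):
    #
    #   data[column] = data[column].astype("Int64")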

    def filter_invalid_missing_values(self,
                                      data: pd.DataFrame) -> pd.DataFrame:
        '''
        '''
        self._assert_dataframe_input(data)

        for column in data.columns:
            if (column in self._required_fields) and\
                    (data[column].isnull().any()):
                invalid_mask = data[column].isnull()

                reason = "Null value in the required field {}"\
                         .format(column)

                data = self._filter_invalid_data(data=data,
                                                 invalid_mask=invalid_mask,
                                                 reason=reason)

        return data

    def filter_invalid_types(self, data: pd.DataFrame) -> pd.DataFrame:
        '''
        '''
        self._assert_dataframe_input(data)

        for column in data.columns:
            if column not in self._python_types:
                continue

            python_type = self._python_types[column]

            if data[column].dtype != python_type:

                def mismatch_type(x):
                    return type(x) != python_type

                invalid_mask = data[column].apply(mismatch_type)
                reason = "Type mismatch in field {}".format(column)

                data = self._filter_invalid_data(data=data,
                                                 invalid_mask=invalid_mask,
                                                 reason=reason)

        return data

    def filter_invalid_patterns(self, data: pd.DataFrame) -> pd.DataFrame:
        '''
        '''
        self._assert_dataframe_input(data)

        for column in data.columns:
            if column not in self._patterns:
                continue

            pattern = self._patterns[column]

            invalid_mask = (~data[column].astype(str).str.match(pattern))
            reason = "Pattern mismatch in field {}".format(column)

            data = self._filter_invalid_data(data=data,
                                             invalid_mask=invalid_mask,
                                             reason=reason)

        return data

    def filter_invalid_values(self, data: pd.DataFrame) -> pd.DataFrame:
        '''
        '''
        self._assert_dataframe_input(data)

        # each column is checked against at most one constraint:
        # minimum value, else maximum value, else allowed values
        for column in data.columns:
            if column in self._minimum_values:
                min_value = self._minimum_values[column]

                invalid_mask = data[column] < min_value
                reason = "Too small values in field {}".format(column)

                data = self._filter_invalid_data(data=data,
                                                 invalid_mask=invalid_mask,
                                                 reason=reason)
            elif column in self._maximum_values:
                max_value = self._maximum_values[column]

                invalid_mask = data[column] > max_value
                reason = "Too large values in field {}".format(column)

                data = self._filter_invalid_data(data=data,
                                                 invalid_mask=invalid_mask,
                                                 reason=reason)
            elif column in self._allowed_values:
                allowed_values = self._allowed_values[column]

                invalid_mask = (~data[column].isin(allowed_values))

                not_allowed_examples =\
                    data.loc[invalid_mask, column].unique()[:3]
                reason = "Not allowed values {0}... in field {1}"\
                         .format(not_allowed_examples, column)

                data = self._filter_invalid_data(data=data,
                                                 invalid_mask=invalid_mask,
                                                 reason=reason)
            else:
                continue

        return data

    """
    def restrict_to_collection(self, data: pd.DataFrame,
                               collection_name: str) -> pd.DataFrame:
        '''
        '''
        mongo_fields = self._schema_parser.get_fields_restricted_to_collection(
            collection_name=collection_name)

        fields = self._mapping_parser.get_fields_restricted_to_collecton(
            collection_name=collection_name)

        return data[[c for c in data.columns
                     if (c in fields) or (c in mongo_fields)]]
    """


if __name__ == "__main__":

    # testing

    from cdplib.db_handlers.SQLHandler import SQLHandler

    mapping_path = os.path.join(".", "migration_mappings", "rs1_mapping.json")

    schema_paths = [
        os.path.join(".", "mongo_schema", "schema_wheelsets.json"),
        os.path.join(".", "mongo_schema", "schema_process_instances.json")]

    inconsist_report_table = "test_inconsist_report_rs1"

    if all([os.path.isfile(p) for p in schema_paths + [mapping_path]]):

        cleaner = MigrationCleaning(
            mapping_path=mapping_path,
            schema_paths=schema_paths,
            mapping_source_name_tag="internal_name",
            mapping_target_name_tag="mongo_name",
            filter_index_columns=["radsatznummer"],
            inconsist_report_table=inconsist_report_table)

        db = SQLHandler()

        data = db.read_sql_to_dataframe("select * from rs1 limit 100")

        data = cleaner.replace_default_values(data)
        data = cleaner.map_equal_values(data)
        data = cleaner.convert_types(data)

        non_filtered_len = len(data)

        data = cleaner.filter_invalid_types(data)

        # if rows were dropped, the remaining ones may convert cleanly now
        if len(data) < non_filtered_len:
            data = cleaner.convert_types(data)

        data = cleaner.filter_invalid_missing_values(data)
        data = cleaner.filter_invalid_patterns(data)
        data = cleaner.filter_invalid_values(data)

        print("Done!")