#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Created on Wed Sep 25 08:09:52 2019

@author: tanya
"""
import os
import sys
import json
import gc
from copy import deepcopy
import pandas as pd
import numpy as np
from boltons.iterutils import remap

sys.path.append(os.getcwd())

from cdplib.db_migration.ParseMapping import ParseMapping
from cdplib.db_migration.ParseJsonSchema import ParseJsonSchema
from cdplib.utils.ExceptionsHandler import ExceptionsHandler
from cdplib.utils.CleaningUtils import CleaningUtils
from cdplib.log import Log


class MigrationCleaning:
    '''
    Class for correcting and filtering incorrect data.
    The correcting and filtering methods are kept separate, since there
    might be other custom steps in between.
    '''
    def __init__(self, mapping_paths: (str, list),
                 schema_paths: (str, list),
                 inconsist_report_table: str = None,
                 filter_index_columns: (str, list) = None,
                 mapping_source: str = "internal_name",
                 mapping_target: str = "mongo_name",
                 mapping_parser: type = ParseMapping,
                 schema_parser: type = ParseJsonSchema):
        '''
        '''
        self.log = Log('Migration Cleaning')
        self._exception_handler = ExceptionsHandler()

        assert isinstance(inconsist_report_table, str),\
            "Inconsistent report table should be a tablename string"

        self._inconsist_report_table = inconsist_report_table

        assert isinstance(filter_index_columns, (str, list)),\
            "Filter index columns must be a str or a list"

        # wrap a single column name in a list; list() on a string would
        # split it into characters
        if isinstance(filter_index_columns, str):
            filter_index_columns = [filter_index_columns]

        self._filter_index_columns = list(filter_index_columns)

        self._schema_parser = schema_parser(schema_paths)

        self._mapping_parser = mapping_parser(mapping_paths,
                                              source=mapping_source,
                                              target=mapping_target)

        self._mapping_paths = mapping_paths
        self._schema_paths = schema_paths

        from cdplib.db_handlers.SQLHandler import SQLHandler
        self._sql_db = SQLHandler()
    def _assert_dataframe_input(self, data: pd.DataFrame):
        '''
        '''
        assert isinstance(data, pd.DataFrame),\
            "Parameter 'data' must be a pandas dataframe"
    def append_mapping_path(self, mapping_path: str):
        '''
        Appends a new mapping path to the _mapping_paths attribute of
        MigrationCleaning and to the mapping paths of the ParseMapping
        instance.
        '''
        assert isinstance(mapping_path, str),\
            "Parameter 'mapping_path' must be a string"

        # keep the paths as a flat list instead of nesting the previous
        # value inside the new list
        if isinstance(self._mapping_paths, str):
            self._mapping_paths = [self._mapping_paths]

        mapping_paths = self._mapping_paths + [mapping_path]

        self._mapping_paths = mapping_paths
        self._mapping_parser._mapping_paths = mapping_paths
    @property
    def _field_mapping(self):
        '''
        '''
        return self._mapping_parser.get_field_mapping()

    @property
    def _required_fields(self):
        '''
        '''
        source_required_fields = self._mapping_parser.get_required_fields()
        target_required_fields = self._schema_parser.get_required_fields()

        for source_field, target_field in self._field_mapping.items():

            if (target_field in target_required_fields) and\
                    (source_field not in source_required_fields):

                source_required_fields.append(source_field)

        return source_required_fields
    @property
    def _default_values(self):
        '''
        Returns a dictionary that maps, for each source field, the default
        value of the mongo schema to the default value of the migration
        mapping. In the migration mapping, the default values should be
        specified as values that carry no information and can therefore be
        treated as empty.
        '''
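        # Illustration (hypothetical field names): if the migration mapping
        # lists "-" as the empty value of "stationsnummer" and the mongo
        # schema defines None as its default, the result contains
        # {"stationsnummer": {None: "-"}}.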
        default_values = {}

        target_default_values = self._schema_parser.get_default_values()
        source_default_values = self._mapping_parser.get_default_values()

        for source_field, target_field in self._field_mapping.items():

            if source_field not in source_default_values:
                continue

            elif target_field not in target_default_values:
                target_default_values[target_field] = np.nan

            default_values[source_field] = {
                target_default_values[target_field]:
                source_default_values[source_field]
            }

        return default_values
    @property
    def _python_types(self):
        '''
        '''
        target_types = self._schema_parser.get_python_types()

        result = {}

        for source_field, target_field in self._field_mapping.items():

            if target_field in target_types:
                result[source_field] = target_types[target_field]

        """
        date_type_mismatch =\
            (target_field in target_types) and\
            (source_field in source_types) and\
            (target_types[target_field] == str) and\
            (source_types[source_field] == np.dtype('<M8[ns]'))

        if date_type_mismatch:
            target_types[target_field] = np.dtype('<M8[ns]')

        if (source_field in source_types) and\
                (target_field in target_types) and\
                (target_types[target_field] != source_types[source_field]):

            self.log.log_and_raise_error(("Type {0} of field {1} "
                                          "in schema does not match "
                                          "type {2} of field {3} in "
                                          "migration mapping")
                                         .format(target_types[target_field],
                                                 target_field,
                                                 source_types[source_field],
                                                 source_field))

        if target_field in target_types:
            source_types[source_field] = target_types[target_field]
        """

        return result
    @property
    def _value_mappings(self):
        '''
        '''
        return self._mapping_parser.get_value_mappings()

    @property
    def _date_formats(self):
        '''
        '''
        return self._mapping_parser.get_date_formats()

    @property
    def _allowed_values(self):
        '''
        '''
        return self._get_mongo_schema_info("get_allowed_values")

    @property
    def _minimum_values(self):
        '''
        '''
        return self._get_mongo_schema_info("get_minimum_value")

    @property
    def _maximum_values(self):
        '''
        '''
        return self._get_mongo_schema_info("get_maximum_value")

    @property
    def _patterns(self):
        '''
        '''
        return self._get_mongo_schema_info("get_patterns")
    def _get_mongo_schema_info(self, method_name: str):
        '''
        '''
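        # Illustration (hypothetical fields): if the schema parser returns
        # {"status": ["ok", "nok"]} keyed by mongo names, the result is the
        # same constraint keyed by the source name, e.g.
        # {"status_intern": ["ok", "nok"]}.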
        result = {}

        target_dict = getattr(self._schema_parser, method_name)()

        for source_field, target_field in self._field_mapping.items():

            if target_field in target_dict:
                result[source_field] = target_dict[target_field]

        return result
    def _filter_invalid_data(self, data: pd.DataFrame,
                             invalid_mask: pd.Series,
                             reason: (str, pd.Series)) -> pd.DataFrame:
        '''
        '''
        assert((self._inconsist_report_table is not None) and
               (self._filter_index_columns is not None)),\
            "Inconsistent report table or filter index is not provided"

        self._assert_dataframe_input(data)

        data = data.copy(deep=True)

        db = self._sql_db

        if invalid_mask.sum() == 0:
            return data

        data_inconsist = data.assign(reason=reason)\
                             .loc[invalid_mask]\
                             .reset_index(drop=True)

        if db.check_if_table_exists(self._inconsist_report_table):
            columns = db.get_column_names(tablename=self._inconsist_report_table)

            if len(columns) > 0:
                columns_not_in_data = [column for column in columns
                                       if column not in data.columns]

                for value in columns_not_in_data:
                    data_inconsist[value] = ('Column does not exist in the '
                                             'mongo database and has therefore '
                                             'been dropped')

                data_inconsist = data_inconsist[columns]

        db.append_to_table(data=data_inconsist,
                           tablename=self._inconsist_report_table)

        n_rows_filtered = len(data_inconsist)
        n_instances_filtered = len(
            data_inconsist[self._filter_index_columns].drop_duplicates())

        del data_inconsist
        gc.collect()

        self.log.warning(("Filtering: {0}. "
                          "Filtered {1} rows "
                          "and {2} instances"
                          .format(reason, n_rows_filtered, n_instances_filtered)))

        nok_index_data = data.loc[invalid_mask, self._filter_index_columns]\
                             .drop_duplicates().reset_index(drop=True)

        nok_index = pd.MultiIndex.from_arrays([nok_index_data[c] for c in
                                               self._filter_index_columns])

        all_index = pd.MultiIndex.from_arrays([data[c] for c in
                                               self._filter_index_columns])

        data = data.loc[~all_index.isin(nok_index)].reset_index(drop=True)

        return data
    def _replace_values(self, data: pd.DataFrame,
                        default: bool) -> pd.DataFrame:
        '''
        '''
        if default:
            default_str = "default"
        else:
            default_str = "equal"

        self._assert_dataframe_input(data)

        data = data.copy(deep=True)

        if default:
            mapping = self._default_values
        else:
            mapping = self._value_mappings
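        # Shape of `mapping` as consumed below (field names hypothetical):
        # {column: {replacement_value: values_to_replace}}, e.g. for default
        # replacement {"stationsnummer": {None: ["-"]}} replaces "-" (and
        # nulls) with None.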
        for column, d in mapping.items():

            try:

                if column not in data.columns:
                    continue

                dtype = data[column].dtype

                for key, values in d.items():

                    if not default:
                        mask = (data[column].astype(str).isin(values))
                    else:
                        mask = (data[column].isin(values))

                    if default:
                        mask = mask | (data[column].isnull())

                    data.loc[mask, column] = key

                data[column] = data[column].astype(dtype)

            except Exception as e:

                self.log.log_and_raise_error(("Failed to replace {0} values "
                                              "in {1}. Exit with error {2}"
                                              .format(default_str, column, e)))

        self.log.info("Replaced {} values".format(default_str))

        return data
    def replace_default_values(self, data: pd.DataFrame) -> pd.DataFrame:
        '''
        '''
        return self._replace_values(data=data, default=True)

    def map_equal_values(self, data: pd.DataFrame) -> pd.DataFrame:
        '''
        '''
        return self._replace_values(data=data, default=False)
    def convert_types(self, data: pd.DataFrame) -> pd.DataFrame:
        '''
        '''
        self._assert_dataframe_input(data)

        for column, python_type in self._python_types.items():

            try:
                if column not in data.columns:
                    continue

                elif column in self._date_formats:
                    data[column] = CleaningUtils.convert_dates(
                            series=data[column],
                            formats=self._date_formats[column])

                elif (python_type == int) and data[column].isnull().any():
                    self.log.log_and_raise_error(("Column {} contains missing values "
                                                  "and cannot be of integer type"
                                                  .format(column)))

                elif python_type == bool:
                    accepted_bool = {'ja': True, 'j': True, '1': True, 1: True,
                                     'yes': True, 'y': True, 'true': True,
                                     't': True, 'nein': False, 'n': False,
                                     'no': False, 'false': False, 'f': False,
                                     '0': False, 0: False}
                    data[column] = data[column].map(accepted_bool)
                    data[column] = data[column].astype(bool)

                elif python_type == str:
                    # might not be the smoothest solution, but it works:
                    # cast to str first, then store the column as object dtype
                    data = data.copy(deep=True)
                    data[column] = data[column].astype(str)
                    python_type = object
                    data[column] = data[column].astype(python_type)

                elif python_type == float:
                    data[column] = data[column].fillna(np.inf)
                    # replaces empty fields when the column holds strings
                    if data[column].dtypes == object:
                        data[column] = data[column].replace(r'^\s*$', str(np.inf),
                                                            regex=True)
                    data[column] = data[column].astype(python_type)

                else:
                    data = data.copy(deep=True)
                    data[column] = data[column].astype(python_type)

                if data[column].dtype != python_type:
                    self.log.warning(("After conversion type in {0} "
                                      "should be {1} "
                                      "but is still {2}"
                                      .format(column,
                                              python_type,
                                              data[column].dtype)))

            except Exception as e:
                self.log.log_and_raise_error(("Failed to convert types in {0}. "
                                              "Exit with error {1}"
                                              .format(column, e)))

        self.log.info("Converted dtypes")

        return data
    def filter_invalid_missing_values(self, data: pd.DataFrame) -> pd.DataFrame:
        '''
        '''
        self._assert_dataframe_input(data)

        for column in data.columns:

            if (column in self._required_fields) and\
                    (data[column].isnull().any()):

                invalid_mask = data[column].isnull()

                reason = "Null value in the required field {}"\
                    .format(column)

                data = self._filter_invalid_data(data=data,
                                                 invalid_mask=invalid_mask,
                                                 reason=reason)

        return data
    def filter_invalid_types(self, data: pd.DataFrame) -> pd.DataFrame:
        '''
        '''
        self._assert_dataframe_input(data)

        for column in data.columns:

            if column not in self._python_types:
                continue

            python_type = self._python_types[column]

            # needed because the column dtype of strings is object
            if python_type == str:
                python_type = object

            if data[column].dtype != python_type:

                def mismatch_type(x):
                    return type(x) != python_type

                invalid_mask = data[column].apply(mismatch_type)

                reason = "Type mismatch in field {}".format(column)

                data = self._filter_invalid_data(data=data,
                                                 invalid_mask=invalid_mask,
                                                 reason=reason)

        return data
    def filter_invalid_patterns(self, data: pd.DataFrame) -> pd.DataFrame:
        '''
        '''
        self._assert_dataframe_input(data)

        for column in data.columns:

            if column not in self._patterns:
                continue

            pattern = self._patterns[column]

            invalid_mask = (~data[column].astype(str).str.match(pattern))

            reason = "Pattern mismatch in field {0}. Pattern: {1}. Example: {2}"\
                .format(column, pattern, data.iloc[0][column])

            data = self._filter_invalid_data(data=data,
                                             invalid_mask=invalid_mask,
                                             reason=reason)

        return data
    def filter_invalid_values(self, data: pd.DataFrame) -> pd.DataFrame:
        '''
        '''
        for column in data.columns:

            if column in self._minimum_values:

                min_value = self._minimum_values[column]

                # values below the allowed minimum are invalid
                invalid_mask = data[column] < min_value

                reason = "Too small values in field {}".format(column)

                data = self._filter_invalid_data(data=data,
                                                 invalid_mask=invalid_mask,
                                                 reason=reason)

            elif column in self._maximum_values:

                max_value = self._maximum_values[column]

                # values above the allowed maximum are invalid
                invalid_mask = data[column] > max_value

                reason = "Too large values in field {}".format(column)

                data = self._filter_invalid_data(data=data,
                                                 invalid_mask=invalid_mask,
                                                 reason=reason)

            elif column in self._allowed_values:

                allowed_values = self._allowed_values[column]

                invalid_mask = (~data[column].isin(allowed_values))

                not_allowed_examples = data.loc[invalid_mask, column].unique()[:3]

                reason = "Not allowed values {0}... in field {1}"\
                    .format(not_allowed_examples, column)

                data = self._filter_invalid_data(data=data,
                                                 invalid_mask=invalid_mask,
                                                 reason=reason)

            else:
                continue

        return data
    def drop_columns_with_no_content(self, data: pd.DataFrame) -> pd.DataFrame:
        '''
        '''
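        # Illustration: a column whose unique values are only, e.g.,
        # {None, '-'} carries no content and is dropped entirely.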
        data = data.dropna(how='all', axis='columns')

        for column in data.columns:
            unique_values = data[column].unique()
            no_content_signs = [None, '-', 'n.a']
            intersection = list(set(unique_values) & set(no_content_signs))
            # drop the column if every unique value is a no-content sign
            if len(intersection) == len(unique_values):
                data = data.drop(columns=[column])

        return data
    def clean_json_from_None_object(self, data: pd.DataFrame,
                                    clean_bool: bool = True) -> dict:
        data = data.to_json(date_format="iso")
        data = json.loads(data)
        new_data = remap(data, lambda p, k, v: v is not None)
        new_data = remap(new_data, lambda p, k, v: v != 'None')
        new_data = remap(new_data, lambda p, k, v: v != 'inf')
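        # Illustration (hypothetical record): after the three remap passes,
        # {"a": None, "b": "None", "c": "inf", "d": 1} becomes {"d": 1}.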
        # This cleans not only bools but also ints that are 0; only use it
        # if necessary. Has to be changed so that it only considers True
        # and False for bools.
        if clean_bool:
            new_data = remap(new_data,
                             lambda p, k, v: (isinstance(v, bool) or
                                              (not isinstance(v, bool) and bool(v))))
        return new_data
    def restrict_to_collection(self, data: pd.DataFrame,
                               collection_name: str) -> pd.DataFrame:
        '''
        '''
        mongo_fields = self._schema_parser.get_fields_restricted_to_collection(
            collection_name=collection_name)

        mapping_fields = self._mapping_parser.get_fields_restricted_to_collection(
            collection_name=collection_name)

        return data[[c for c in data.columns
                     if (c in mapping_fields) or (c in mongo_fields)]]
    def map_toleranzen_values(self, data: pd.DataFrame, toleranzen: pd.DataFrame):

        toleranzen.drop('nr', axis=1, inplace=True)

        toleranzen.columns = ['toleranzbez_wellen_reference',
                              'toleranzbez_innenring_reference',
                              'wellenschenkel.geometrie.durchmesser.min',
                              'wellenschenkel.geometrie.durchmesser.max',
                              'innenring.geometrie.durchmesser.min',
                              'innenring.geometrie.durchmesser.max',
                              'wellenschenkel_innenring_difference.geometrie.durchmesser.min',
                              'wellenschenkel_innenring_difference.geometrie.durchmesser.max']

        labyrinten_drop_columns = ['innenring.geometrie.durchmesser.min',
                                   'innenring.geometrie.durchmesser.max',
                                   'wellenschenkel_innenring_difference.geometrie.durchmesser.min',
                                   'wellenschenkel_innenring_difference.geometrie.durchmesser.max']

        labyrinten_columns = ['toleranzbez_wellen_reference',
                              'toleranzbez_innenring_reference',
                              'labyrinthring.geometrie.durchmesser.min',
                              'labyrinthring.geometrie.durchmesser.max']

        reparatur_stufe_labyrinten_columns = ['toleranzbez_wellen_reference',
                                              'toleranzbez_innenring_reference',
                                              'labyrinthring.reparatur_stufe.durchmesser.min',
                                              'labyrinthring.reparatur_stufe.durchmesser.max']

        reparatur_stufe_columns = ['toleranzbez_wellen_reference',
                                   'toleranzbez_innenring_reference',
                                   'wellenschenkel.reparatur_stufe.durchmesser.min',
                                   'wellenschenkel.reparatur_stufe.durchmesser.max',
                                   'innenring.reparatur_stufe.durchmesser.min',
                                   'innenring.reparatur_stufe.durchmesser.max',
                                   'wellenschenkel_innenring_difference.reparatur_stufe.durchmesser.min',
                                   'wellenschenkel_innenring_difference.reparatur_stufe.durchmesser.max']

        toleranzen_reference_columns = ['wellenschenkel_toleranz',
                                        'labyrinthring_toleranz',
                                        'wellen_reparatur_stufe_toleranz',
                                        'labyrinthring_reparatur_stufe_toleranz']

        available_columns = [column for column in data.columns
                             if column in toleranzen_reference_columns]

        for column in available_columns:
            merge_map = [False] * len(data.index)

            if 'toleranz' in column:
                temp_toleranzen = deepcopy(toleranzen)

                if 'labyrinthring' in column:
                    temp_toleranzen.drop(labyrinten_drop_columns, axis=1,
                                         inplace=True)

                    if 'reparatur_stufe' in column:
                        temp_toleranzen.columns = reparatur_stufe_labyrinten_columns
                        merge_map = data['labyrinthting_reparatur_stufe_zulaessig'] == 'Nein'
                    else:
                        temp_toleranzen.columns = labyrinten_columns

                elif 'reparatur_stufe' in column:
                    temp_toleranzen.columns = reparatur_stufe_columns
                    merge_map = data['innenring_reparatur_stufe_zulaessig'] == 'Ja'

            data_before = len(data.index)

            data = data.merge(temp_toleranzen, how='left', left_on=column,
                              right_on='toleranzbez_wellen_reference')

            data.loc[merge_map, temp_toleranzen.columns] = np.nan

            if data_before != len(data.index):
                self.log.log_and_raise_error(("Data was lost during the merge: "
                                              "before: {0}, now: {1}")
                                             .format(data_before, len(data.index)))

            data.drop(['toleranzbez_wellen_reference',
                       'toleranzbez_innenring_reference'], axis=1, inplace=True)

        return data
    def label_is_level(
            self,
            data: pd.DataFrame,
            column: str = "is",
            include_schrott: bool = False,
            drop_rows_with_no_is: bool = False) -> pd.DataFrame:
        '''
        '''
        is_level_mapping = {0: None,
                            1: ["IS1"],
                            2: ["IS1L", "IL", "IS1 + IL"],
                            3: ["IS2"],
                            4: ["IS3"]}

        for k, v in is_level_mapping.items():
            if v is not None:
                data.loc[data[column].isin(v), column] = k
            else:
                data.loc[data[column].isnull(), column] = k

        if include_schrott and ("operation_type_2" in data.columns):
            schrott_mask = (data["operation_type_2"] == 2)
            data.loc[schrott_mask, column] = 5

        data.loc[~data[column].isin([0, 1, 2, 3, 4, 5]), column] = 0

        if drop_rows_with_no_is:
            data = data.loc[data[column] != 0].copy(deep=True)

        return data.reset_index(drop=True)

if __name__ == "__main__":

    # testing
    from cdplib.db_handlers.SQLHandler import SQLHandler

    mapping_path = os.path.join(".", "migration_mappings", "rs1_mapping.json")

    schema_paths = [
        os.path.join(".", "mongo_schema", "schema_wheelsets.json"),
        os.path.join(".", "mongo_schema", "schema_process_instances.json")]

    inconsist_report_table = "test_inconsist_report_rs1"

    if all([os.path.isfile(p) for p in schema_paths + [mapping_path]]):

        cleaner = MigrationCleaning(
            mapping_paths=mapping_path,
            schema_paths=schema_paths,
            mapping_source="internal_name",
            mapping_target="mongo_name",
            filter_index_columns=["radsatznummer"],
            inconsist_report_table=inconsist_report_table)

        db = SQLHandler()

        data = db.read_sql_to_dataframe("select * from rs1 limit 100")

        data = cleaner.replace_default_values(data)

        data = cleaner.map_equal_values(data)

        data = cleaner.convert_types(data)

        non_filtered_len = len(data)

        data = cleaner.filter_invalid_types(data)

        if len(data) < non_filtered_len:
            data = cleaner.convert_types(data)

        data = cleaner.filter_invalid_missing_values(data)

        data = cleaner.filter_invalid_patterns(data)

        data = cleaner.filter_invalid_values(data)

        print("Done!")