123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521 |
- #!/usr/bin/env python3
- # -*- coding: utf-8 -*-
- """
- Created on Wed Sep 25 08:09:52 2019
- @author: tanya
- """
- import os
- import sys
- import pandas as pd
- import numpy as np
- import gc
- sys.path.append(os.getcwd())
- from libraries.db_migration.ParseMapping import ParseMapping
- from libraries.db_migration.ParseJsonSchema import ParseJsonSchema
- from libraries.utils.ClassLogging import ClassLogging
- from libraries.utils.CleaningUtils import CleaningUtils
class MigrationCleaning(ClassLogging):
    '''
    Class for correcting and filtering the incorrect data.
    We keep the correcting and the filtering methods separated,
    since there might be other custom steps in between.
    '''

    def __init__(self, mapping_path: str,
                 schema_paths: (str, list),
                 inconsist_report_table: str = None,
                 filter_index_columns: (str, list) = None,
                 mapping_source: str = "internal_name",
                 mapping_target: str = "mongo_name",
                 mapping_parser: type = ParseMapping,
                 schema_parser: type = ParseJsonSchema,
                 log_name: str = "MigrationCleaning"):
        '''
        :param mapping_path: path to the migration mapping file
        :param schema_paths: path(s) to the mongo json schema file(s)
        :param inconsist_report_table: name of the sql table to which
            rows filtered out as inconsistent are appended; only
            required when the filter_* methods are used
        :param filter_index_columns: column(s) identifying one data
            instance; all rows of an instance are dropped when any of
            its rows is invalid; only required when the filter_*
            methods are used
        :param mapping_source: mapping-file field holding source names
        :param mapping_target: mapping-file field holding target names
        :param mapping_parser: class used to parse the mapping file
        :param schema_parser: class used to parse the json schema(s)
        :param log_name: name used for logging
        '''
        super().__init__(log_name=log_name)

        # Both parameters default to None and are only needed for the
        # filtering methods; _filter_invalid_data re-validates them, so
        # validate here only when they were actually provided.
        if inconsist_report_table is not None:
            assert isinstance(inconsist_report_table, str),\
                "Inconsistent report table should be a tablename string"

        self._inconsist_report_table = inconsist_report_table

        if filter_index_columns is not None:
            assert isinstance(filter_index_columns, (str, list)),\
                "Filter index columns must be a str or a list"

            # wrap a bare column name: list("abc") would split it into
            # single characters
            if isinstance(filter_index_columns, str):
                filter_index_columns = [filter_index_columns]

            self._filter_index_columns = list(filter_index_columns)
        else:
            self._filter_index_columns = None

        self._schema_parser = schema_parser(schema_paths)

        self._mapping_parser = mapping_parser(mapping_path,
                                              source=mapping_source,
                                              target=mapping_target)

        self._mapping_path = mapping_path
        self._schema_paths = schema_paths

    def _assert_dataframe_input(self, data: pd.DataFrame):
        '''
        Raise an AssertionError if data is not a pandas dataframe.
        '''
        assert(isinstance(data, pd.DataFrame)),\
            "Parameter 'data' must be a pandas dataframe"

    @property
    def _field_mapping(self):
        '''
        Mapping from source (internal) to target (mongo) field names.
        '''
        return self._mapping_parser.get_field_mapping()

    @property
    def _required_fields(self):
        '''
        Source field names required either by the migration mapping or
        by the mongo schema (schema requirements are translated back to
        source names through the field mapping).
        '''
        source_required_fields = self._mapping_parser.get_required_fields()
        target_required_fields = self._schema_parser.get_required_fields()

        for source_field, target_field in self._field_mapping.items():

            if (target_field in target_required_fields) and\
                    (source_field not in source_required_fields):

                source_required_fields.append(source_field)

        return source_required_fields

    @property
    def _default_values(self):
        '''
        For each source field that has a default in the mapping, a dict
        {target_default_value: source_default_value}. Used by
        replace_default_values to replace source defaults (and nulls)
        with the target default; a missing target default becomes NaN.
        '''
        default_values = {}

        target_default_values = self._schema_parser.get_default_values()
        source_default_values = self._mapping_parser.get_default_values()

        for source_field, target_field in self._field_mapping.items():

            if source_field not in source_default_values:
                continue

            elif target_field not in target_default_values:
                target_default_values[target_field] = np.nan

            default_values[source_field] = {
                target_default_values[target_field]:
                source_default_values[source_field]
            }

        return default_values

    @property
    def _python_types(self):
        '''
        Python type per source field, taken from the mongo schema and
        translated to source names through the field mapping.
        '''
        target_types = self._schema_parser.get_python_types()

        result = {}

        for source_field, target_field in self._field_mapping.items():

            if target_field in target_types:
                result[source_field] = target_types[target_field]

        return result

    @property
    def _value_mappings(self):
        '''
        Source-value to target-value mappings from the migration
        mapping.
        '''
        return self._mapping_parser.get_value_mappings()

    @property
    def _date_formats(self):
        '''
        Date format(s) per source field from the migration mapping.
        '''
        return self._mapping_parser.get_date_formats()

    def _get_mongo_schema_info(self, method_name: str):
        '''
        Call the given getter on the schema parser and translate its
        result from target to source field names.

        :param method_name: name of a schema-parser method returning a
            dict keyed by target field names
        :return: the same information keyed by source field names
        '''
        result = {}

        target_dict = getattr(self._schema_parser, method_name)()

        for source_field, target_field in self._field_mapping.items():

            if target_field in target_dict:
                result[source_field] = target_dict[target_field]

        return result

    @property
    def _allowed_values(self):
        '''
        Allowed values per source field, from the mongo schema.
        '''
        return self._get_mongo_schema_info("get_allowed_values")

    @property
    def _minimum_values(self):
        '''
        Minimum allowed value per source field, from the mongo schema.
        '''
        return self._get_mongo_schema_info("get_minimum_value")

    @property
    def _maximum_values(self):
        '''
        Maximum allowed value per source field, from the mongo schema.
        '''
        return self._get_mongo_schema_info("get_maximum_value")

    @property
    def _patterns(self):
        '''
        Regex pattern per source field, from the mongo schema.
        '''
        return self._get_mongo_schema_info("get_patterns")

    def _filter_invalid_data(self, data: pd.DataFrame,
                             invalid_mask: pd.Series,
                             reason: (str, pd.Series)) -> pd.DataFrame:
        '''
        Append the invalid rows to the inconsistency report table and
        drop ALL rows that belong to the affected instances (instances
        are identified by the filter index columns).

        :param data: dataframe to filter
        :param invalid_mask: boolean mask marking invalid rows
        :param reason: explanation stored with the reported rows
        :return: filtered copy of the dataframe
        '''
        # local import keeps the sql dependency optional when the
        # filtering methods are not used
        from libraries.db_handlers.SQLHandler import SQLHandler

        assert((self._inconsist_report_table is not None) and
               (self._filter_index_columns is not None)),\
            "Inconsistent report table or filter index is not provided"

        self._assert_dataframe_input(data)

        data = data.copy(deep=True)

        db = SQLHandler()

        if invalid_mask.sum() == 0:

            return data

        data_inconsist = data.assign(reason=reason)\
                             .loc[invalid_mask]\
                             .reset_index(drop=True)

        db.append_to_table(data=data_inconsist,
                           tablename=self._inconsist_report_table)

        n_rows_filtered = len(data_inconsist)
        n_instances_filtered = len(
            data_inconsist[self._filter_index_columns].drop_duplicates())

        # free the reported copy before building the index structures
        del data_inconsist
        gc.collect()

        self._log.warning(("Filtering: {0} ."
                           "Filtered {1} rows "
                           "and {2} instances"
                           .format(reason, n_rows_filtered,
                                   n_instances_filtered)))

        nok_index_data = data.loc[invalid_mask, self._filter_index_columns]\
                             .drop_duplicates().reset_index(drop=True)

        # drop every row whose instance index appears among the
        # invalid instances, not only the invalid rows themselves
        nok_index = pd.MultiIndex.from_arrays(
            [nok_index_data[c] for c in self._filter_index_columns])

        all_index = pd.MultiIndex.from_arrays(
            [data[c] for c in self._filter_index_columns])

        data = data.loc[~all_index.isin(nok_index)].reset_index(drop=True)

        return data

    def _replace_values(self, data: pd.DataFrame,
                        default: bool) -> pd.DataFrame:
        '''
        Replace values in the dataframe: either the default values
        (default=True) or the equal-value mappings (default=False).

        :param data: dataframe to transform
        :param default: if True, replace source default values (and
            nulls) with the target default; otherwise apply the
            equal-value mappings
        :return: transformed copy of the dataframe
        '''
        if default:
            default_str = "default"
        else:
            default_str = "equal"

        self._assert_dataframe_input(data)

        data = data.copy(deep=True)

        if default:
            mapping = self._default_values
        else:
            mapping = self._value_mappings

        for column, d in mapping.items():

            try:
                if column not in data.columns:
                    continue

                dtype = data[column].dtype

                for key, values in d.items():

                    if not default:
                        # compare as strings so numeric columns match
                        # string-valued mapping entries
                        mask = (data[column].astype(str).isin(values))
                    else:
                        mask = (data[column].isin(values))

                    if default:
                        # nulls are replaced by the default value too
                        mask = mask | (data[column].isnull())

                    data.loc[mask, column] = key

                # restore the original dtype after replacement
                data[column] = data[column].astype(dtype)

            except Exception as e:

                self.log_and_raise(("Failed to replace {0} values "
                                    "in {1}. Exit with error {2}"
                                    .format(default_str, column, e)))

        self._log.info("Replaced {} values".format(default_str))

        return data

    def replace_default_values(self, data: pd.DataFrame) -> pd.DataFrame:
        '''
        Replace source default values (and nulls) with the target
        defaults defined in the schema/mapping.
        '''
        return self._replace_values(data=data, default=True)

    def map_equal_values(self, data: pd.DataFrame) -> pd.DataFrame:
        '''
        Apply the equal-value mappings from the migration mapping.
        '''
        return self._replace_values(data=data, default=False)

    def convert_types(self, data: pd.DataFrame) -> pd.DataFrame:
        '''
        Convert dataframe columns to the python types required by the
        mongo schema; date columns are parsed with the formats from the
        migration mapping.

        :param data: dataframe to convert (modified in place and
            returned)
        '''
        self._assert_dataframe_input(data)

        for column, python_type in self._python_types.items():

            try:
                if column not in data.columns:
                    continue

                elif column in self._date_formats:
                    data[column] = CleaningUtils.convert_dates(
                        series=data[column],
                        formats=self._date_formats[column])

                elif (python_type == int) and data[column].isnull().any():
                    # int columns cannot hold NaN
                    self.log_and_raise(("Column {} contains missing values "
                                        "and cannot be of integer type"
                                        .format(column)))

                elif python_type == str:
                    # NOTE(review): str columns are not converted, only
                    # compared against object dtype below — looks
                    # intentional, confirm
                    python_type = object

                else:
                    data[column] = data[column].astype(python_type)

                if data[column].dtype != python_type:

                    self._log.warning(("After conversion type in {0} "
                                       "should be {1} "
                                       "but is still {2}"
                                       .format(column,
                                               python_type,
                                               data[column].dtype)))

            except Exception as e:

                self.log_and_raise(("Failed to convert types in {0}. "
                                    "Exit with error {1}"
                                    .format(column, e)))

        self._log.info("Converted dtypes")

        return data

    def filter_invalid_null_values(self, data: pd.DataFrame) -> pd.DataFrame:
        '''
        Report and drop instances with null values in required fields.
        '''
        self._assert_dataframe_input(data)

        for column in data.columns:

            if (column in self._required_fields) and\
                    (data[column].isnull().any()):

                invalid_mask = data[column].isnull()

                reason = "Null value in the required field {}"\
                         .format(column)

                data = self._filter_invalid_data(data=data,
                                                 invalid_mask=invalid_mask,
                                                 reason=reason)

        return data

    def filter_invalid_types(self, data: pd.DataFrame) -> pd.DataFrame:
        '''
        Report and drop instances whose values do not have the python
        type required by the schema.
        '''
        self._assert_dataframe_input(data)

        for column, python_type in self._python_types.items():

            if column not in data.columns:
                continue

            if data[column].dtype != python_type:

                # bind python_type as a default argument to avoid the
                # late-binding closure pitfall inside the loop
                def mismatch_type(x, python_type=python_type):
                    return type(x) != python_type

                invalid_mask = data[column].apply(mismatch_type)

                reason = "Type mismatch in field {}".format(column)

                data = self._filter_invalid_data(data=data,
                                                 invalid_mask=invalid_mask,
                                                 reason=reason)

        return data

    def filter_invalid_patterns(self, data: pd.DataFrame) -> pd.DataFrame:
        '''
        Report and drop instances whose string values do not match the
        regex pattern required by the schema.
        '''
        self._assert_dataframe_input(data)

        # _patterns is a dict {column: pattern}: iterate its items,
        # iterating the dict itself would yield only the keys
        for column, pattern in self._patterns.items():

            if column not in data.columns:
                continue

            invalid_mask = (~data[column].astype(str).str.match(pattern))

            reason = "Pattern mismatch in field {}".format(column)

            data = self._filter_invalid_data(data=data,
                                             invalid_mask=invalid_mask,
                                             reason=reason)

        return data

    def filter_notallowed_values(self, data: pd.DataFrame) -> pd.DataFrame:
        '''
        Report and drop instances with values below the minimum, above
        the maximum, or outside the allowed set defined in the schema.
        '''
        self._assert_dataframe_input(data)

        for column, value in self._minimum_values.items():

            if column not in data.columns:
                continue

            # rows BELOW the schema minimum are invalid
            invalid_mask = data[column] < value

            reason = "Too small values in field {}".format(column)

            data = self._filter_invalid_data(data=data,
                                             invalid_mask=invalid_mask,
                                             reason=reason)

        for column, value in self._maximum_values.items():

            if column not in data.columns:
                continue

            # rows ABOVE the schema maximum are invalid
            invalid_mask = data[column] > value

            reason = "Too large values in field {}".format(column)

            data = self._filter_invalid_data(data=data,
                                             invalid_mask=invalid_mask,
                                             reason=reason)

        for column, allowed_values in self._allowed_values.items():

            if column not in data.columns:
                continue

            invalid_mask = (~data[column].isin(allowed_values))

            reason = "Not allowed values in field {}".format(column)

            data = self._filter_invalid_data(data=data,
                                             invalid_mask=invalid_mask,
                                             reason=reason)

        return data
if __name__ == "__main__":

    # ad-hoc smoke test: run the full cleaning pipeline on a sample
    # of the rs1 table
    from libraries.db_handlers.SQLHandler import SQLHandler

    mapping_path = os.path.join(".", "migration_mappings", "rs1_mapping.json")

    schema_paths = [
        os.path.join(".", "mongo_schema", "schema_wheelsets.json"),
        os.path.join(".", "mongo_schema", "schema_process_instances.json")]

    inconsist_report_table = "test_inconsist_report_rs1"

    required_files = schema_paths + [mapping_path]

    if all(os.path.isfile(path) for path in required_files):

        print("Found schemas!")

        cleaner = MigrationCleaning(
            mapping_path=mapping_path,
            schema_paths=schema_paths,
            mapping_source="internal_name",
            mapping_target="mongo_name",
            filter_index_columns=["radsatznummer"],
            inconsist_report_table=inconsist_report_table)

        db = SQLHandler()

        data = db.read_sql_to_dataframe("select * from rs1 limit 100")

        # correction steps first, filtering afterwards
        data = cleaner.replace_default_values(data)
        data = cleaner.map_equal_values(data)
        data = cleaner.convert_types(data)

        non_filtered_len = len(data)

        data = cleaner.filter_invalid_types(data)

        # re-convert only if the type filter actually removed rows
        if len(data) < non_filtered_len:
            data = cleaner.convert_types(data)

        data = cleaner.filter_invalid_null_values(data)
        data = cleaner.filter_invalid_patterns(data)
        data = cleaner.filter_notallowed_values(data)

    print("Done!")
|