#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Created on Wed Sep 25 08:09:52 2019

@author: tanya
"""

import os
import sys
import gc

import pandas as pd
import numpy as np

sys.path.append(os.getcwd())

from libraries.db_migration.ParseMapping import ParseMapping
from libraries.db_migration.ParseJsonSchema import ParseJsonSchema
from libraries.utils.ClassLogging import ClassLogging
from libraries.utils.CleaningUtils import CleaningUtils


class MigrationCleaning(ClassLogging):
    '''
    Class for correcting and filtering the incorrect data.
    We keep the correcting and the filtering methods separated,
    since there might be other custom steps in between.
    '''

    def __init__(self, mapping_path: str,
                 schema_paths: (str, list),
                 inconsist_report_table: str = None,
                 filter_index_columns: (str, list) = None,
                 mapping_source: str = "internal_name",
                 mapping_target: str = "mongo_name",
                 mapping_parser: type = ParseMapping,
                 schema_parser: type = ParseJsonSchema,
                 log_name: str = "MigrationCleaning"):
        '''
        :param mapping_path: path to the mapping json file
        :param schema_paths: path or list of paths to mongo schema files
        :param inconsist_report_table: SQL table name where filtered-out
            (inconsistent) rows are appended
        :param filter_index_columns: column(s) identifying an "instance";
            all rows of an instance are dropped if any row is invalid
        :param mapping_source: mapping key naming the source fields
        :param mapping_target: mapping key naming the target (mongo) fields
        :param mapping_parser: class used to parse the mapping file
        :param schema_parser: class used to parse the schema files
        :param log_name: name used for logging

        NOTE(review): despite the None defaults, the asserts below make
        inconsist_report_table and filter_index_columns effectively
        required — confirm whether None should be tolerated.
        '''
        super().__init__(log_name=log_name)

        assert isinstance(inconsist_report_table, str),\
            "Inconsistent report table should be a tablename string"

        self._inconsist_report_table = inconsist_report_table

        assert isinstance(filter_index_columns, (str, list)),\
            "Filter index columns must be a str or a list"

        # BUGFIX: list("abc") would split a plain string into single
        # characters; wrap a string into a one-element list instead.
        if isinstance(filter_index_columns, str):
            filter_index_columns = [filter_index_columns]

        self._filter_index_columns = list(filter_index_columns)

        self._schema_parser = schema_parser(schema_paths)

        self._mapping_parser = mapping_parser(mapping_path,
                                              source=mapping_source,
                                              target=mapping_target)

        self._mapping_path = mapping_path
        self._schema_paths = schema_paths

    def _assert_dataframe_input(self, data: pd.DataFrame):
        '''
        Raises an AssertionError if data is not a pandas DataFrame.
        '''
        assert(isinstance(data, pd.DataFrame)),\
            "Parameter 'data' must be a pandas dataframe"

    @property
    def _field_mapping(self):
        '''
        Dictionary mapping source field names to target field names.
        '''
        return self._mapping_parser.get_field_mapping()

    @property
    def _required_fields(self):
        '''
        Source field names that are required either by the mapping or
        because their target counterpart is required by the mongo schema.
        '''
        source_required_fields = self._mapping_parser.get_required_fields()
        target_required_fields = self._schema_parser.get_required_fields()

        for source_field, target_field in self._field_mapping.items():
            if (target_field in target_required_fields) and\
                    (source_field not in source_required_fields):
                source_required_fields.append(source_field)

        return source_required_fields

    @property
    def _default_values(self):
        '''
        For each source field, a one-entry dict mapping the target default
        value (np.nan when the schema defines none) to the source default
        values it should replace.
        '''
        default_values = {}

        target_default_values = self._schema_parser.get_default_values()
        source_default_values = self._mapping_parser.get_default_values()

        for source_field, target_field in self._field_mapping.items():

            if source_field not in source_default_values:
                continue

            elif target_field not in target_default_values:
                target_default_values[target_field] = np.nan

            default_values[source_field] = {
                    target_default_values[target_field]:
                    source_default_values[source_field]
                    }

        return default_values

    @property
    def _python_types(self):
        '''
        Python types per source field, derived from the target (mongo)
        schema types via the field mapping.
        '''
        target_types = self._schema_parser.get_python_types()

        result = {}

        for source_field, target_field in self._field_mapping.items():

            if target_field in target_types:
                result[source_field] = target_types[target_field]

        return result

    # NOTE(review): the original text of the following helper and
    # properties was lost in the source; they are reconstructed from how
    # they are used below — verify against the ParseMapping and
    # ParseJsonSchema APIs.

    def _get_mongo_schema_info(self, method_name: str) -> dict:
        '''
        Helper: calls the given schema-parser method (keyed by target
        field) and translates the result to source field names.
        '''
        result = {}

        target_dict = getattr(self._schema_parser, method_name)()

        for source_field, target_field in self._field_mapping.items():

            if target_field in target_dict:
                result[source_field] = target_dict[target_field]

        return result

    @property
    def _date_formats(self):
        '''
        Date formats per source field, as defined in the mapping.
        '''
        return self._mapping_parser.get_date_formats()

    @property
    def _value_mappings(self):
        '''
        Per source field: target value -> list of equivalent source values.
        '''
        return self._mapping_parser.get_value_mappings()

    @property
    def _patterns(self):
        '''
        Regex patterns per source field, from the mongo schema.
        '''
        return self._get_mongo_schema_info("get_patterns")

    @property
    def _minimum_values(self):
        '''
        Minimum allowed values per source field, from the mongo schema.
        '''
        return self._get_mongo_schema_info("get_minimum_value")

    @property
    def _maximum_values(self):
        '''
        Maximum allowed values per source field, from the mongo schema.
        '''
        return self._get_mongo_schema_info("get_maximum_value")

    @property
    def _allowed_values(self):
        '''
        Allowed (enum) values per source field, from the mongo schema.
        '''
        return self._get_mongo_schema_info("get_allowed_values")

    def _filter_invalid_data(self, data: pd.DataFrame,
                             invalid_mask: pd.Series,
                             reason: (str, pd.Series)) -> pd.DataFrame:
        '''
        Appends the rows flagged by invalid_mask (together with the given
        reason) to the inconsistency report table, then drops every
        instance — as identified by the filter index columns — that
        contains at least one invalid row.

        :param data: dataframe to filter
        :param invalid_mask: boolean mask of the invalid rows
        :param reason: explanation stored in the report table
        :return: filtered copy of the dataframe
        '''
        from libraries.db_handlers.SQLHandler import SQLHandler

        assert((self._inconsist_report_table is not None) and
               (self._filter_index_columns is not None)),\
            "Inconsistent report table or filter index is not provided"

        self._assert_dataframe_input(data)

        data = data.copy(deep=True)

        db = SQLHandler()

        if invalid_mask.sum() == 0:
            return data

        data_inconsist = data.assign(reason=reason)\
                             .loc[invalid_mask]\
                             .reset_index(drop=True)

        db.append_to_table(data=data_inconsist,
                           tablename=self._inconsist_report_table)

        n_rows_filtered = len(data_inconsist)
        n_instances_filtered = len(
                data_inconsist[self._filter_index_columns].drop_duplicates())

        # the report frame can be large; release it before building the
        # MultiIndexes below
        del data_inconsist
        gc.collect()

        self._log.warning(("Filtering: {0} ."
                           "Filtered {1} rows "
                           "and {2} instances"
                           .format(reason, n_rows_filtered,
                                   n_instances_filtered)))

        nok_index_data = data.loc[invalid_mask, self._filter_index_columns]\
                             .drop_duplicates().reset_index(drop=True)

        nok_index = pd.MultiIndex.from_arrays(
                [nok_index_data[c] for c in self._filter_index_columns])

        all_index = pd.MultiIndex.from_arrays(
                [data[c] for c in self._filter_index_columns])

        # drop ALL rows of any instance that had at least one invalid row
        data = data.loc[~all_index.isin(nok_index)].reset_index(drop=True)

        return data

    def _replace_values(self, data: pd.DataFrame,
                        default: bool) -> pd.DataFrame:
        '''
        Replaces values in the dataframe columns: either the source
        default values by the target default values (default=True), or
        equivalent source values by their target value (default=False).
        The original dtype of each column is preserved.

        :param data: dataframe to transform
        :param default: selects default-value replacement vs equal-value
            mapping
        :return: transformed copy of the dataframe
        '''
        if default:
            default_str = "default"
        else:
            default_str = "equal"

        self._assert_dataframe_input(data)

        data = data.copy(deep=True)

        if default:
            mapping = self._default_values
        else:
            mapping = self._value_mappings

        for column, d in mapping.items():

            try:

                if column not in data.columns:
                    continue

                dtype = data[column].dtype

                for key, values in d.items():

                    if not default:
                        # equal-value mapping compares string
                        # representations
                        mask = (data[column].astype(str).isin(values))
                    else:
                        mask = (data[column].isin(values))

                    if default:
                        # nulls also get the default value
                        mask = mask | (data[column].isnull())

                    data.loc[mask, column] = key

                # restore the column dtype changed by the assignment
                data[column] = data[column].astype(dtype)

            except Exception as e:

                self.log_and_raise(("Failed to replace {0} values "
                                    "in {1}. Exit with error {2}"
                                    .format(default_str, column, e)))

        self._log.info("Replaced {} values".format(default_str))

        return data

    def replace_default_values(self, data: pd.DataFrame) -> pd.DataFrame:
        '''
        Replaces source default values by target default values.
        '''
        return self._replace_values(data=data, default=True)

    def map_equal_values(self, data: pd.DataFrame) -> pd.DataFrame:
        '''
        Maps equivalent source values to their target value.
        '''
        return self._replace_values(data=data, default=False)

    def convert_types(self, data: pd.DataFrame) -> pd.DataFrame:
        '''
        Converts the dataframe columns to the python types derived from
        the mongo schema; date columns are parsed with their configured
        formats. Raises on integer columns containing missing values.
        '''
        self._assert_dataframe_input(data)

        for column, python_type in self._python_types.items():

            try:
                if column not in data.columns:
                    continue

                elif column in self._date_formats:

                    data[column] = CleaningUtils.convert_dates(
                            series=data[column],
                            formats=self._date_formats[column])

                elif (python_type == int) and data[column].isnull().any():

                    self.log_and_raise(("Column {} contains missing values "
                                        "and cannot be of integer type"
                                        .format(column)))

                elif python_type == str:
                    # NOTE(review): str columns are only re-labelled as
                    # object for the dtype check below, never converted —
                    # confirm this is intended.
                    python_type = object

                else:

                    data[column] = data[column].astype(python_type)

                if data[column].dtype != python_type:

                    self._log.warning(("After conversion type in {0} "
                                       "should be {1} "
                                       "but is still {2}"
                                       .format(column,
                                               python_type,
                                               data[column].dtype)))

            except Exception as e:

                self.log_and_raise(("Failed to convert types in {0}. "
                                    "Exit with error {1}"
                                    .format(column, e)))

        self._log.info("Converted dtypes")

        return data

    def filter_invalid_null_values(self, data: pd.DataFrame) -> pd.DataFrame:
        '''
        Filters out instances with null values in required fields.
        '''
        self._assert_dataframe_input(data)

        for column in data.columns:

            if (column in self._required_fields) and\
                    (data[column].isnull().any()):

                invalid_mask = data[column].isnull()

                reason = "Null value in the required field {}"\
                         .format(column)

                data = self._filter_invalid_data(data=data,
                                                 invalid_mask=invalid_mask,
                                                 reason=reason)

        return data

    # BUGFIX: return annotation was pd.DataFrame() (a constructor call
    # evaluated at definition time) instead of the type pd.DataFrame
    def filter_invalid_types(self, data: pd.DataFrame) -> pd.DataFrame:
        '''
        Filters out instances whose values do not have the python type
        expected for their column.
        '''
        self._assert_dataframe_input(data)

        for column, python_type in self._python_types.items():

            # guard against mapped columns absent from the data,
            # consistent with convert_types
            if column not in data.columns:
                continue

            if data[column].dtype != python_type:

                def mismatch_type(x):
                    return type(x) != python_type

                invalid_mask = data[column].apply(mismatch_type)

                # BUGFIX: reason said "mismatch if field"
                reason = "Type mismatch in field {}".format(column)

                data = self._filter_invalid_data(data=data,
                                                 invalid_mask=invalid_mask,
                                                 reason=reason)

        return data

    def filter_invalid_patterns(self, data: pd.DataFrame) -> pd.DataFrame:
        '''
        Filters out instances whose values do not match the regex pattern
        defined for their column in the mongo schema.
        '''
        self._assert_dataframe_input(data)

        for column, pattern in self._patterns.items():

            if column not in data.columns:
                continue

            invalid_mask = (~data[column].astype(str).str.match(pattern))

            reason = "Pattern mismatch in field {}".format(column)

            data = self._filter_invalid_data(data=data,
                                             invalid_mask=invalid_mask,
                                             reason=reason)

        return data

    def filter_notallowed_values(self, data: pd.DataFrame) -> pd.DataFrame:
        '''
        Filters out instances with values outside the minimum/maximum
        bounds or not contained in the allowed values of their column.

        NOTE(review): for _minimum_values the mask flags values ABOVE the
        bound (and vice versa for _maximum_values) — confirm the intended
        semantics of the schema bounds.
        '''
        self._assert_dataframe_input(data)

        for column, value in self._minimum_values.items():

            if column not in data.columns:
                continue

            invalid_mask = data[column] > value

            reason = "Too large values in field {}".format(column)

            data = self._filter_invalid_data(data=data,
                                             invalid_mask=invalid_mask,
                                             reason=reason)

        for column, value in self._maximum_values.items():

            if column not in data.columns:
                continue

            invalid_mask = data[column] < value

            reason = "Too small values in field {}".format(column)

            data = self._filter_invalid_data(data=data,
                                             invalid_mask=invalid_mask,
                                             reason=reason)

        for column, allowed_values in self._allowed_values.items():

            if column not in data.columns:
                continue

            invalid_mask = (~data[column].isin(allowed_values))

            # BUGFIX: reason was a copy-paste of "Too small values ..."
            reason = "Not allowed values in field {}".format(column)

            data = self._filter_invalid_data(data=data,
                                             invalid_mask=invalid_mask,
                                             reason=reason)

        return data


if __name__ == "__main__":

    # testing

    from libraries.db_handlers.SQLHandler import SQLHandler

    mapping_path = os.path.join(".", "migration_mappings",
                                "rs1_mapping.json")

    schema_paths = [
            os.path.join(".", "mongo_schema", "schema_wheelsets.json"),
            os.path.join(".", "mongo_schema",
                         "schema_process_instances.json")]

    inconsist_report_table = "test_inconsist_report_rs1"

    if all([os.path.isfile(p) for p in schema_paths + [mapping_path]]):

        print("Found schemas!")

        cleaner = MigrationCleaning(
                mapping_path=mapping_path,
                schema_paths=schema_paths,
                mapping_source="internal_name",
                mapping_target="mongo_name",
                filter_index_columns=["radsatznummer"],
                inconsist_report_table=inconsist_report_table)

        db = SQLHandler()

        data = db.read_sql_to_dataframe("select * from rs1 limit 100")

        data = cleaner.replace_default_values(data)

        data = cleaner.map_equal_values(data)

        data = cleaner.convert_types(data)

        non_filtered_len = len(data)

        data = cleaner.filter_invalid_types(data)

        # types may only be convertible after invalid rows are removed
        if len(data) < non_filtered_len:
            data = cleaner.convert_types(data)

        data = cleaner.filter_invalid_null_values(data)

        data = cleaner.filter_invalid_patterns(data)

        data = cleaner.filter_notallowed_values(data)

    print("Done!")