#!/usr/bin/env python3 # -*- coding: utf-8 -*- """ Created on Wed Sep 25 08:09:52 2019 @author: tanya """ import os import sys import pandas as pd import numpy as np import gc sys.path.append(os.getcwd()) from cdplib.db_migration.ParseMapping import ParseMapping from cdplib.db_migration.ParseJsonSchema import ParseJsonSchema from cdplib.utils.ExceptionsHandler import ExceptionsHandler from cdplib.utils.CleaningUtils import CleaningUtils from cdplib.log import Log class MigrationCleaning: ''' Class for correcting and filtering the incorrect data. We keep the correcting and the filtering methods separated, since there might be other custom steps in between. ''' def __init__(self, mapping_path: str, schema_paths: (str, list), inconsist_report_table: str = None, filter_index_columns: (str, list) = None, mapping_source_name_tag: str = "internal_name", mapping_target_name_tag: str = "mongo_name", target_collection_name: str = None, mapping_parser: type = ParseMapping, schema_parser: type = ParseJsonSchema): ''' ''' self.log = Log('Migration Cleaning') self._exception_handler = ExceptionsHandler() if inconsist_report_table is not None: assert isinstance(inconsist_report_table, str),\ "Inconsistent report table should be a tablename string" self._inconsist_report_table = inconsist_report_table if filter_index_columns is not None: assert isinstance(filter_index_columns, (str, list)),\ "Filter index columns must be a str or a list" self._filter_index_columns = list(filter_index_columns) else: self._filter_index_columns = None self._mapping_parser = mapping_parser(mapping_path, source_name_tag=mapping_source_name_tag, target_name_tag=mapping_target_name_tag, target_collection_name=target_collection_name) if target_collection_name is not None: schema_names = [os.path.basename(schema_path) for schema_path in schema_paths] convention_schema_name = "schema_" + target_collection_name + ".json" if convention_schema_name not in schema_names: self._log.log_and_raise_warning("Found no matching of the collection name {0} in schema paths {1}" .format(target_collection_name, schema_paths)) else: self._schema_parser = schema_parser(schema_paths[schema_names.index(convention_schema_name)]) else: self._schema_parser = schema_parser(schema_paths) self._mapping_path = mapping_path self._schema_paths = schema_paths from cdplib.db_handlers.SQLHandler import SQLHandlerPool self._sql_db = SQLHandlerPool(20) def _assert_dataframe_input(self, data: pd.DataFrame): ''' ''' assert(isinstance(data, pd.DataFrame)),\ "Parameter 'data' must be a pandas dataframe" @property def _field_mapping(self, collection_name: str = None): ''' ''' return self._mapping_parser.get_field_mapping() @property def _required_fields(self): ''' ''' source_required_fields = self._mapping_parser.get_required_fields() target_required_fields = self._schema_parser.get_required_fields() for source_field, target_field in self._field_mapping.items(): if (target_field in target_required_fields) and\ (source_field not in source_required_fields): source_required_fields.append(source_field) return source_required_fields @property def _default_values(self): ''' ''' default_values = {} target_default_values = self._schema_parser.get_default_values() source_default_values = self._mapping_parser.get_default_values() for source_field, target_field in self._field_mapping.items(): if source_field not in source_default_values: continue elif target_field not in target_default_values: target_default_values[target_field] = np.nan default_values[source_field] = { target_default_values[target_field]: source_default_values[source_field] } return default_values @property def _python_types(self): ''' ''' target_types = self._schema_parser.get_python_types() result = {} for source_field, target_field in self._field_mapping.items(): if target_field in target_types: result[source_field] = target_types[target_field] """ date_type_mismatch =\ (target_field in target_types) and\ (source_field in source_types) and\ (target_types[target_field] == str) and\ (source_types[source_field] == np.dtype(' pd.DataFrame: ''' ''' assert((self._inconsist_report_table is not None) and (self._filter_index_columns is not None)),\ "Inconsistent report table or filter index is not provided" self._assert_dataframe_input(data) data = data.copy(deep=True) #db = self._sql_db.aquire() from cdplib.db_handlers.SQLHandler import SQLHandler db = SQLHandler() if invalid_mask.sum() == 0: #self._sql_db.release(db) return data data_inconsist = data.assign(reason=reason)\ .loc[invalid_mask]\ .reset_index(drop=True) if db.check_if_table_exists(self._inconsist_report_table): columns = db.get_column_names(tablename=self._inconsist_report_table) if len(columns) > 0: data_inconsist = data_inconsist[columns] db.append_to_table(data=data_inconsist, tablename=self._inconsist_report_table) n_rows_filtered = len(data_inconsist) n_instances_filtered = len(data_inconsist[self._filter_index_columns].drop_duplicates()) del data_inconsist gc.collect() self.log.warning(("Filtering: {0} ." "Filtered {1} rows " "and {2} instances" .format(reason, n_rows_filtered, n_instances_filtered))) nok_index_data = data.loc[invalid_mask, self._filter_index_columns]\ .drop_duplicates().reset_index(drop=True) nok_index = pd.MultiIndex.from_arrays([nok_index_data[c] for c in self._filter_index_columns]) all_index = pd.MultiIndex.from_arrays([data[c] for c in self._filter_index_columns]) data = data.loc[~all_index.isin(nok_index)].reset_index(drop=True) #self._sql_db.release(db) return data def _replace_values(self, data: pd.DataFrame, default: bool) -> pd.DataFrame: ''' ''' if default: default_str = "default" else: default_str = "equal" self._assert_dataframe_input(data) data = data.copy(deep=True) if default: mapping = self._default_values else: mapping = self._value_mappings for column, d in mapping.items(): try: if column not in data.columns: continue dtype = data[column].dtype for key, values in d.items(): if not default: mask = (data[column].astype(str).isin(values)) else: mask = (data[column].isin(values)) if default: mask = mask | (data[column].isnull()) data.loc[mask, column] = key data[column] = data[column].astype(dtype) except Exception as e: self.log.log_and_raise_error(("Failed to replace {0} values " "in {1}. Exit with error {2}" .format(default_str, column, e))) self.log.info("Replaced {} values".format(default_str)) return data def replace_default_values(self, data: pd.DataFrame) -> pd.DataFrame: ''' ''' return self._replace_values(data=data, default=True) def map_equal_values(self, data: pd.DataFrame) -> pd.DataFrame: ''' ''' return self._replace_values(data=data, default=False) def convert_types(self, data: pd.DataFrame) -> pd.DataFrame: ''' ''' self._assert_dataframe_input(data) for column, python_type in self._python_types.items(): try: if column not in data.columns: continue elif column in self._date_formats: data[column] = CleaningUtils.convert_dates( series=data[column], formats=self._date_formats[column]) elif (python_type == int) and data[column].isnull().any(): self.log.log_and_raise_error(("Column {} contains missing values " "and cannot be of integer type" .format(column))) elif python_type == str: python_type = object else: data = data.copy(deep=True) data[column] = data[column].astype(python_type) if data[column].dtype != python_type: self.log.warning(("After conversion type in {0} " "should be {1} " "but is still {2}" .format(column, python_type, data[column].dtype))) except Exception as e: self.log.log_and_raise_error(("Failed to convert types in {0}. " "Exit with error {1}" .format(column, e))) self.log.info("Converted dtypes") return data def filter_invalid_missing_values(self, data: pd.DataFrame) -> pd.DataFrame: ''' ''' self._assert_dataframe_input(data) for column in data.columns: if (column in self._required_fields) and\ (data[column].isnull().any()): invalid_mask = data[column].isnull() reason = "Null value in the required field {}"\ .format(column) data = self._filter_invalid_data(data=data, invalid_mask=invalid_mask, reason=reason) return data def filter_invalid_types(self, data: pd.DataFrame) -> pd.DataFrame(): ''' ''' self._assert_dataframe_input(data) for column in data.columns: if column not in self._python_types: continue python_type = self._python_types[column] if data[column].dtype != python_type: def mismatch_type(x): return type(x) != python_type invalid_mask = data[column].apply(mismatch_type) reason = "Type mismatch if field {}".format(column) data = self._filter_invalid_data(data=data, invalid_mask=invalid_mask, reason=reason) return data def filter_invalid_patterns(self, data: pd.DataFrame) -> pd.DataFrame: ''' ''' self._assert_dataframe_input(data) for column in data.columns: if column not in self._patterns: continue pattern = self._patterns[column] invalid_mask = (~data[column].astype(str).str.match(pattern)) reason = "Pattern mismatch in field {}".format(column) data = self._filter_invalid_data(data=data, invalid_mask=invalid_mask, reason=reason) return data def filter_invalid_values(self, data: pd.DataFrame) -> pd.DataFrame: ''' ''' for column in data.columns: if column in self._minimum_values: min_value = self._minimum_values[column] invalid_mask = data[column] > min_value reason = "Too large values in field {}".format(column) data = self._filter_invalid_data(data=data, invalid_mask=invalid_mask, reason=reason) elif column in self._maximum_values: max_value = self._maximum_values[column] invalid_mask = data[column] < max_value reason = "Too small values in field {}".format(column) data = self._filter_invalid_data(data=data, invalid_mask=invalid_mask, reason=reason) elif column in self._allowed_values: allowed_values = self._allowed_values[column] invalid_mask = (~data[column].isin(allowed_values)) not_allowed_examples = data.loc[invalid_mask, column].unique()[:3] reason = "Not allowed values {0}... in field {1}"\ .format(not_allowed_examples, column) data = self._filter_invalid_data(data=data, invalid_mask=invalid_mask, reason=reason) else: continue return data """ def restrict_to_collection(self, data: pd.DataFrame, collection_name: str) -> pd.DataFrame: ''' ''' mongo_fields = self._schema_parser.get_fields_restricted_to_collection(collection_name=collection_name) fields = self._mapping_parser.get_fields_restricted_to_collecton(collection_name=collection_name) return data[[c for c in data.columns if (c in fields) or (c in mongo_fields)]] """ if __name__ == "__main__": # testing from cdplib.db_handlers.SQLHandler import SQLHandler mapping_path = os.path.join(".", "migration_mappings", "rs1_mapping.json") schema_paths = [ os.path.join(".", "mongo_schema", "schema_wheelsets.json"), os.path.join(".", "mongo_schema", "schema_process_instances.json")] inconsist_report_table = "test_inconsist_report_rs1" if all([os.path.isfile(p) for p in schema_paths + [mapping_path]]): cleaner = MigrationCleaning( mapping_path=mapping_path, schema_paths=schema_paths, mapping_source_name_tag="internal_name", mapping_target_name_tag="mongo_name", filter_index_columns=["radsatznummer"], inconsist_report_table=inconsist_report_table) db = SQLHandler() data = db.read_sql_to_dataframe("select * from rs1 limit 100") data = cleaner.replace_default_values(data) data = cleaner.map_equal_values(data) data = cleaner.convert_types(data) non_filtered_len = len(data) data = cleaner.filter_invalid_types(data) if len(data) < non_filtered_len: data = cleaner.convert_types(data) data = cleaner.filter_invalid_null_values(data) data = cleaner.filter_invalid_patterns(data) data = cleaner.filter_notallowed_values(data) print("Done!")