#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Created on Wed Sep 25 08:09:52 2019

@author: tanya
"""

import os
import sys
import json
import gc
from copy import deepcopy

import pandas as pd
import numpy as np
from boltons.iterutils import remap

sys.path.append(os.getcwd())

from cdplib.db_migration.ParseMapping import ParseMapping
from cdplib.db_migration.ParseJsonSchema import ParseJsonSchema
from cdplib.utils.ExceptionsHandler import ExceptionsHandler
from cdplib.utils.CleaningUtils import CleaningUtils
from cdplib.log import Log


class MigrationCleaning:
    '''
    Class for correcting and filtering incorrect data.
    We keep the correcting and the filtering methods separated,
    since there might be other custom steps in between.
    '''
    def __init__(self, mapping_paths: (str, list),
                 schema_paths: (str, list),
                 inconsist_report_table: str = None,
                 filter_index_columns: (str, list) = None,
                 mapping_source: str = "internal_name",
                 mapping_target: str = "mongo_name",
                 mapping_parser: type = ParseMapping,
                 schema_parser: type = ParseJsonSchema):
        '''
        '''
        self.log = Log('Migration Cleaning')
        self._exception_handler = ExceptionsHandler()

        assert isinstance(inconsist_report_table, str),\
            "Inconsistent report table should be a tablename string"

        self._inconsist_report_table = inconsist_report_table

        assert isinstance(filter_index_columns, (str, list)),\
            "Filter index columns must be a str or a list"

        # wrap a single column name in a list, otherwise list() below would
        # split the string into characters
        if isinstance(filter_index_columns, str):
            filter_index_columns = [filter_index_columns]

        self._filter_index_columns = list(filter_index_columns)

        self._schema_parser = schema_parser(schema_paths)

        self._mapping_parser = mapping_parser(mapping_paths,
                                              source=mapping_source,
                                              target=mapping_target)

        self._mapping_paths = mapping_paths
        self._schema_paths = schema_paths

        from cdplib.db_handlers.SQLHandler import SQLHandler
        self._sql_db = SQLHandler()

    def _assert_dataframe_input(self, data: pd.DataFrame):
        '''
        '''
        assert(isinstance(data, pd.DataFrame)),\
            "Parameter 'data' must be a pandas dataframe"

    def append_mapping_path(self, mapping_path: str):
        '''
        Appends a new mapping path to the _mapping_paths variable of
        MigrationCleaning and to mapping_paths of ParseMapping.
        '''
        assert(isinstance(mapping_path, str)),\
            "Parameter 'mapping_path' must be a string"

        # keep a flat list of paths instead of nesting the previous value
        if isinstance(self._mapping_paths, list):
            mapping_paths = list(self._mapping_paths)
        else:
            mapping_paths = [self._mapping_paths]

        mapping_paths.append(mapping_path)

        self._mapping_paths = mapping_paths
        self._mapping_parser._mapping_paths = mapping_paths

    @property
    def _field_mapping(self):
        '''
        '''
        return self._mapping_parser.get_field_mapping()

    @property
    def _required_fields(self):
        '''
        '''
        source_required_fields = self._mapping_parser.get_required_fields()
        target_required_fields = self._schema_parser.get_required_fields()

        for source_field, target_field in self._field_mapping.items():

            if (target_field in target_required_fields) and\
                    (source_field not in source_required_fields):

                source_required_fields.append(source_field)

        return source_required_fields

    @property
    def _default_values(self):
        '''
        Returns a dictionary in which the default values of the mongo schema
        are mapped to the default values of the migration mapping. In the
        migration mapping the default values should be specified as the values
        which do not carry any information and can therefore be treated as
        empty values.
        '''
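        # Illustrative shape of the returned dictionary (the column name
        # "mileage" and the mapping default "-1" are hypothetical):
        #     {"mileage": {np.nan: "-1"}}
        # i.e. per source column, the schema default keyed to the mapping
        # default value that will be replaced by it.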
        default_values = {}

        target_default_values = self._schema_parser.get_default_values()
        source_default_values = self._mapping_parser.get_default_values()

        for source_field, target_field in self._field_mapping.items():

            if source_field not in source_default_values:
                continue

            elif target_field not in target_default_values:
                target_default_values[target_field] = np.nan

            default_values[source_field] = {
                target_default_values[target_field]:
                source_default_values[source_field]
                }

        return default_values

    @property
    def _python_types(self):
        '''
        '''
        target_types = self._schema_parser.get_python_types()

        result = {}

        for source_field, target_field in self._field_mapping.items():

            if target_field in target_types:
                result[source_field] = target_types[target_field]

            """
            date_type_mismatch =\
                (target_field in target_types) and\
                (source_field in source_types) and\
                (target_types[target_field] == str) and\
                (source_types[source_field] == np.dtype('
            """

        return result

    def _filter_invalid_data(self, data: pd.DataFrame,
                             invalid_mask: pd.Series,
                             reason: str) -> pd.DataFrame:
        '''
        '''
        assert((self._inconsist_report_table is not None) and
               (self._filter_index_columns is not None)),\
            "Inconsistent report table or filter index is not provided"

        self._assert_dataframe_input(data)

        data = data.copy(deep=True)

        db = self._sql_db

        if invalid_mask.sum() == 0:
            return data

        data_inconsist = data.assign(reason=reason)\
                             .loc[invalid_mask]\
                             .reset_index(drop=True)

        if db.check_if_table_exists(self._inconsist_report_table):
            columns = db.get_column_names(tablename=self._inconsist_report_table)

            if len(columns) > 0:
                # TODO Tanya: the commented lines caused the reason to be
                # the same for all entries.
                # columns_not_in_data = [column for column in columns if column not in data.columns]
                # for value in columns_not_in_data:
                #     data_inconsist[value] = 'Column does not exist in the mongo database and has therefore been dropped'
                data_inconsist = data_inconsist[columns]

        db.append_to_table(data=data_inconsist,
                           tablename=self._inconsist_report_table)

        n_rows_filtered = len(data_inconsist)
        n_instances_filtered = len(
            data_inconsist[self._filter_index_columns].drop_duplicates())

        del data_inconsist
        gc.collect()

        self.log.warning(("Filtering: {0}. "
                          "Filtered {1} rows "
                          "and {2} instances"
                          .format(reason, n_rows_filtered,
                                  n_instances_filtered)))

        nok_index_data = data.loc[invalid_mask, self._filter_index_columns]\
                             .drop_duplicates().reset_index(drop=True)

        nok_index = pd.MultiIndex.from_arrays(
            [nok_index_data[c] for c in self._filter_index_columns])

        all_index = pd.MultiIndex.from_arrays(
            [data[c] for c in self._filter_index_columns])

        data = data.loc[~all_index.isin(nok_index)].reset_index(drop=True)

        return data

    def _replace_values(self, data: pd.DataFrame,
                        default: bool) -> pd.DataFrame:
        '''
        '''
        if default:
            default_str = "default"
        else:
            default_str = "equal"

        self._assert_dataframe_input(data)

        data = data.copy(deep=True)

        if default:
            mapping = self._default_values
        else:
            mapping = self._value_mappings

        for column, d in mapping.items():

            try:

                if column not in data.columns:
                    continue

                dtype = data[column].dtype

                for key, values in d.items():

                    if not default:
                        mask = (data[column].astype(str).isin(values))
                    else:
                        mask = (data[column].isin(values))

                    if default:
                        mask = mask | (data[column].isnull())

                    data.loc[mask, column] = key

                data[column] = data[column].astype(dtype)

            except Exception as e:
                self.log.log_and_raise_error(("Failed to replace {0} values "
                                              "in {1}. "
                                              "Exit with error {2}"
                                              .format(default_str, column, e)))

        self.log.info("Replaced {} values".format(default_str))

        return data

    def replace_default_values(self, data: pd.DataFrame) -> pd.DataFrame:
        '''
        '''
        return self._replace_values(data=data, default=True)

    def map_equal_values(self, data: pd.DataFrame) -> pd.DataFrame:
        '''
        '''
        return self._replace_values(data=data, default=False)

    def convert_types(self, data: pd.DataFrame) -> pd.DataFrame:
        '''
        '''
        self._assert_dataframe_input(data)

        for column, python_type in self._python_types.items():

            try:
                if column not in data.columns:
                    continue

                elif column in self._date_formats:
                    data[column] = CleaningUtils.convert_dates(
                        series=data[column],
                        formats=self._date_formats[column])

                elif (python_type == int) and data[column].isnull().any():
                    self.log.log_and_raise_error(
                        ("Column {} contains missing values "
                         "and cannot be of integer type"
                         .format(column)))

                elif python_type == bool:
                    accepted_bool = {'ja': True, 'j': True, '1': True,
                                     1: True, 'yes': True, 'y': True,
                                     'true': True, 't': True,
                                     'nein': False, 'n': False,
                                     'no': False, 'false': False,
                                     'f': False, '0': False, 0: False}
                    data[column] = data[column].map(accepted_bool)
                    data[column] = data[column].astype(bool)

                elif python_type == str:
                    # might not be the smoothest solution, but it works
                    python_type = str
                    data = data.copy(deep=True)
                    data[column] = data[column].astype(python_type)
                    python_type = object
                    data[column] = data[column].astype(python_type)

                elif python_type == float:
                    data[column] = data[column].fillna(np.inf)
                    # replaces empty fields when the column is of string type
                    if data[column].dtypes == object:
                        data[column] = data[column].replace(r'^\s*$',
                                                            str(np.inf),
                                                            regex=True)
                    data[column] = data[column].astype(python_type)

                else:
                    data = data.copy(deep=True)
                    data[column] = data[column].astype(python_type)

                if data[column].dtype != python_type:
                    self.log.warning(("After conversion type in {0} "
                                      "should be {1} "
                                      "but is still {2}"
                                      .format(column,
                                              python_type,
                                              data[column].dtype)))

            except Exception as e:
                self.log.log_and_raise_error(("Failed to convert types in {0}. "
                                              "Exit with error {1}"
                                              .format(column, e)))

        self.log.info("Converted dtypes")

        return data

    def filter_invalid_missing_values(self, data: pd.DataFrame) -> pd.DataFrame:
        '''
        '''
        self._assert_dataframe_input(data)

        for column in data.columns:

            if (column in self._required_fields) and\
                    (data[column].isnull().any()):

                invalid_mask = data[column].isnull()

                reason = "Null value in the required field {}"\
                         .format(column)

                data = self._filter_invalid_data(data=data,
                                                 invalid_mask=invalid_mask,
                                                 reason=reason)

        return data

    def filter_invalid_types(self, data: pd.DataFrame) -> pd.DataFrame:
        '''
        '''
        self._assert_dataframe_input(data)

        for column in data.columns:

            if column not in self._python_types:
                continue

            python_type = self._python_types[column]

            # needs to be done since the column dtype of strings is object
            if python_type == str:
                python_type = object

            if data[column].dtype != python_type:

                def mismatch_type(x):
                    return type(x) != python_type

                invalid_mask = data[column].apply(mismatch_type)

                reason = "Type mismatch in field {}".format(column)

                data = self._filter_invalid_data(data=data,
                                                 invalid_mask=invalid_mask,
                                                 reason=reason)

        return data

    def filter_invalid_patterns(self, data: pd.DataFrame) -> pd.DataFrame:
        '''
        '''
        self._assert_dataframe_input(data)

        for column in data.columns:

            if column not in self._patterns:
                continue

            pattern = self._patterns[column]

            invalid_mask = (~data[column].astype(str).str.match(pattern))

            reason = ("Pattern mismatch in field {0}. "
                      "Pattern: {1}. Example: {2}"
                      .format(column, pattern, data.iloc[0][column]))

            data = self._filter_invalid_data(data=data,
                                             invalid_mask=invalid_mask,
                                             reason=reason)

        return data

    def filter_invalid_values(self, data: pd.DataFrame) -> pd.DataFrame:
        '''
        '''
        for column in data.columns:

            if column in self._minimum_values:

                min_value = self._minimum_values[column]

                # values below the allowed minimum are invalid
                invalid_mask = data[column] < min_value

                reason = "Too small values in field {}".format(column)

                data = self._filter_invalid_data(data=data,
                                                 invalid_mask=invalid_mask,
                                                 reason=reason)

            elif column in self._maximum_values:

                max_value = self._maximum_values[column]

                # values above the allowed maximum are invalid
                invalid_mask = data[column] > max_value

                reason = "Too large values in field {}".format(column)

                data = self._filter_invalid_data(data=data,
                                                 invalid_mask=invalid_mask,
                                                 reason=reason)

            elif column in self._allowed_values:

                allowed_values = self._allowed_values[column]

                invalid_mask = (~data[column].isin(allowed_values))

                not_allowed_examples = data.loc[invalid_mask, column].unique()[:3]

                reason = "Not allowed values {0}... in field {1}"\
                         .format(not_allowed_examples, column)

                data = self._filter_invalid_data(data=data,
                                                 invalid_mask=invalid_mask,
                                                 reason=reason)

            else:
                continue

        return data

    def drop_columns_with_no_content(self, data: pd.DataFrame) -> pd.DataFrame:
        '''
        '''
        data = data.dropna(how='all', axis='columns')

        for column in data.columns:
            unique_values = data[column].unique()
            no_content_signs = [None, '-', 'n.a']
            intersection = list(set(unique_values) & set(no_content_signs))

            # drop the column if all of its unique values are no-content signs
            if len(intersection) == len(unique_values):
                data = data.drop(columns=[column])

        return data

    def clean_json_from_None_object(self, data: pd.DataFrame,
                                    clean_bool: bool = True) -> dict:
        data = data.to_json(date_format="iso")
        data = json.loads(data)
        new_data = remap(data, lambda p, k, v: v is not None)
        new_data = remap(new_data, lambda p, k, v: v != 'None')
        new_data = remap(new_data, lambda p, k, v: v != 'inf')
        # Cleans not only bool types but also ints which are 0 or 1.
        # Only use if necessary; this should be changed so that it only
        # considers True and False for bools.
        if clean_bool:
            new_data = remap(new_data,
                             lambda p, k, v: (isinstance(v, bool) or
                                              (not isinstance(v, bool) and bool(v))))
        return new_data

    def restrict_to_collection(self, data: pd.DataFrame,
                               collection_name: str) -> pd.DataFrame:
        '''
        '''
        mongo_fields = self._schema_parser.get_fields_restricted_to_collection(
            collection_name=collection_name)

        mapping_fields = self._mapping_parser.get_fields_restricted_to_collection(
            collection_name=collection_name)

        return data[[c for c in data.columns
                     if (c in mapping_fields) or (c in mongo_fields)]]

    def map_toleranzen_values(self, data: pd.DataFrame,
                              toleranzen: pd.DataFrame):

        toleranzen.drop('nr', axis=1, inplace=True)

        toleranzen.columns = [
            'toleranzbez_wellen_reference',
            'toleranzbez_innenring_reference',
            'wellenschenkel.geometrie.durchmesser.min',
            'wellenschenkel.geometrie.durchmesser.max',
            'innenring.geometrie.durchmesser.min',
            'innenring.geometrie.durchmesser.max',
            'wellenschenkel_innenring_difference.geometrie.durchmesser.min',
            'wellenschenkel_innenring_difference.geometrie.durchmesser.max']

        labyrinten_drop_columns = [
            'innenring.geometrie.durchmesser.min',
            'innenring.geometrie.durchmesser.max',
            'wellenschenkel_innenring_difference.geometrie.durchmesser.min',
            'wellenschenkel_innenring_difference.geometrie.durchmesser.max']

        labyrinten_columns = [
            'toleranzbez_wellen_reference',
            'toleranzbez_innenring_reference',
            'labyrinthring.geometrie.durchmesser.min',
            'labyrinthring.geometrie.durchmesser.max']
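
        # The column name sets below relabel the toleranzen lookup table for
        # the reparatur_stufe reference columns, so that the merged measurement
        # columns carry the target field names matching each reference type.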
        reparatur_stufe_labyrinten_columns = [
            'toleranzbez_wellen_reference',
            'toleranzbez_innenring_reference',
            'labyrinthring.reparatur_stufe.durchmesser.min',
            'labyrinthring.reparatur_stufe.durchmesser.max']

        reparatur_stufe_columns = [
            'toleranzbez_wellen_reference',
            'toleranzbez_innenring_reference',
            'wellenschenkel.reparatur_stufe.durchmesser.min',
            'wellenschenkel.reparatur_stufe.durchmesser.max',
            'innenring.reparatur_stufe.durchmesser.min',
            'innenring.reparatur_stufe.durchmesser.max',
            'wellenschenkel_innenring_difference.reparatur_stufe.durchmesser.min',
            'wellenschenkel_innenring_difference.reparatur_stufe.durchmesser.max']

        toleranzen_reference_columns = [
            'wellenschenkel_toleranz',
            'labyrinthring_toleranz',
            'wellen_reparatur_stufe_toleranz',
            'labyrinthring_reparatur_stufe_toleranz']

        available_columns = [column for column in data.columns
                             if column in toleranzen_reference_columns]

        for column in available_columns:

            merge_map = [False] * len(data.index)

            if 'toleranz' in column:

                temp_toleranzen = deepcopy(toleranzen)

                if 'labyrinthring' in column:

                    temp_toleranzen.drop(labyrinten_drop_columns, axis=1,
                                         inplace=True)

                    if 'reparatur_stufe' in column:
                        temp_toleranzen.columns = reparatur_stufe_labyrinten_columns
                        merge_map = data['labyrinthting_reparatur_stufe_zulaessig'] == 'Nein'
                    else:
                        temp_toleranzen.columns = labyrinten_columns

                elif 'reparatur_stufe' in column:
                    temp_toleranzen.columns = reparatur_stufe_columns
                    merge_map = data['innenring_reparatur_stufe_zulaessig'] == 'Ja'

            data_before = len(data.index)

            data = data.merge(temp_toleranzen, how='left', left_on=column,
                              right_on='toleranzbez_wellen_reference')

            data.loc[merge_map, temp_toleranzen.columns] = np.nan

            if data_before != len(data.index):
                print("WE'VE LOST DATA!!")
                print('before:', data_before, 'now:', len(data.index))
                sys.exit()

            data.drop(['toleranzbez_wellen_reference',
                       'toleranzbez_innenring_reference'],
                      axis=1, inplace=True)

        return data

    def label_is_level(
            self,
            data: pd.DataFrame,
            column: str = "is",
            include_schrott: bool = False,
            drop_rows_with_no_is: bool = False) -> pd.DataFrame:
        '''
        '''
        is_level_mapping = {0: None,
                            1: ["IS1"],
                            2: ["IS1L", "IL", "IS1 + IL"],
                            3: ["IS2"],
                            4: ["IS3"]}

        for k, v in is_level_mapping.items():
            if v is not None:
                data.loc[data[column].isin(v), column] = k
            else:
                data.loc[data[column].isnull(), column] = k

        if include_schrott and ("operation_type_2" in data.columns):
            schrott_mask = (data["operation_type_2"] == 2)
            data.loc[schrott_mask, column] = 5

        data.loc[~data[column].isin([0, 1, 2, 3, 4, 5]), column] = 0

        if drop_rows_with_no_is:
            data = data.loc[data[column] != 0].copy(deep=True)

        return data.reset_index(drop=True)


if __name__ == "__main__":

    # testing

    from cdplib.db_handlers.SQLHandler import SQLHandler

    mapping_path = os.path.join(".", "migration_mappings", "rs1_mapping.json")

    schema_paths = [
        os.path.join(".", "mongo_schema", "schema_wheelsets.json"),
        os.path.join(".", "mongo_schema", "schema_process_instances.json")]

    inconsist_report_table = "test_inconsist_report_rs1"

    if all([os.path.isfile(p) for p in schema_paths + [mapping_path]]):

        cleaner = MigrationCleaning(
            mapping_paths=mapping_path,
            schema_paths=schema_paths,
            mapping_source="internal_name",
            mapping_target="mongo_name",
            filter_index_columns=["radsatznummer"],
            inconsist_report_table=inconsist_report_table)

        db = SQLHandler()

        data = db.read_sql_to_dataframe("select * from rs1 limit 100")

        data = cleaner.replace_default_values(data)

        data = cleaner.map_equal_values(data)

        data = cleaner.convert_types(data)

        non_filtered_len = len(data)

        data = cleaner.filter_invalid_types(data)
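
        # if type filtering removed rows, attempt the dtype conversion again
        # on the remaining data before applying the rest of the filters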
        if len(data) < non_filtered_len:
            data = cleaner.convert_types(data)

        data = cleaner.filter_invalid_missing_values(data)

        data = cleaner.filter_invalid_patterns(data)

        data = cleaner.filter_invalid_values(data)

        print("Done!")
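
        # A minimal sketch of a possible follow-up step (illustrative only;
        # the collection name "process_instances" is an assumption based on
        # the schema file names above):
        #
        # restricted = cleaner.restrict_to_collection(
        #     data=data, collection_name="process_instances")
        # documents = cleaner.clean_json_from_None_object(restricted)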