@@ -19,6 +19,8 @@ from cdplib.db_migration.ParseJsonSchema import ParseJsonSchema
 from cdplib.utils.ExceptionsHandler import ExceptionsHandler
 from cdplib.utils.CleaningUtils import CleaningUtils
 from cdplib.log import Log
+import json
+from boltons.iterutils import remap

class MigrationCleaning:

    '''
@@ -26,7 +28,7 @@ class MigrationCleaning:
     We keep the correcting and the filtering methods separated,
     since there might be other custom steps in between.
     '''
-    def __init__(self, mapping_path: str,
+    def __init__(self, mapping_paths: (str, list),
                  schema_paths: (str, list),
                  inconsist_report_table: str = None,
                  filter_index_columns: (str, list) = None,
@@ -51,11 +53,11 @@ class MigrationCleaning:

         self._schema_parser = schema_parser(schema_paths)

-        self._mapping_parser = mapping_parser(mapping_path,
+        self._mapping_parser = mapping_parser(mapping_paths,
                                               source=mapping_source,
                                               target=mapping_target)

-        self._mapping_path = mapping_path
+        self._mapping_paths = mapping_paths
         self._schema_paths = schema_paths

         from cdplib.db_handlers.SQLHandler import SQLHandler
@@ -67,6 +69,21 @@ class MigrationCleaning:
         assert(isinstance(data, pd.DataFrame)),\
             "Parameter 'data' must be a pandas dataframe"

+    def append_mapping_path(self, mapping_path: str):
+        '''
+        Appends a new mapping path to the _mapping_paths attribute of
+        MigrationCleaning and to the mapping_paths of ParseMapping.
+        '''
+        assert(isinstance(mapping_path, str)),\
+            "Parameter 'mapping_path' must be a string"
+
+        if isinstance(self._mapping_paths, str):
+            mapping_paths = [self._mapping_paths]
+        else:
+            mapping_paths = list(self._mapping_paths)
+
+        mapping_paths.append(mapping_path)
+        self._mapping_paths = mapping_paths
+        self._mapping_parser._mapping_paths = mapping_paths
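+        # Usage sketch (hypothetical paths):
+        #   cleaner = MigrationCleaning(mapping_paths="map_a.json", ...)
+        #   cleaner.append_mapping_path("map_b.json")
+        #   cleaner._mapping_paths == ["map_a.json", "map_b.json"]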
+
     @property
     def _field_mapping(self):
         '''
@@ -92,6 +109,11 @@ class MigrationCleaning:
     @property
     def _default_values(self):
         '''
+        Returns a dictionary that maps the default values of the mongo
+        schema to the default values of the migration mapping. In the
+        migration mapping, a default value should be specified as the
+        value that carries no information and can therefore be treated
+        as an empty value.
         '''
         default_values = {}
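+        # Resulting shape (hypothetical field and values), e.g.
+        # {"source_field": {-1: np.nan}} when the mapping marks -1 as
+        # carrying no information and the schema default is missing.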
@@ -104,7 +126,6 @@ class MigrationCleaning:
                 continue

             elif target_field not in target_default_values:
-
                 target_default_values[target_field] = np.nan

             default_values[source_field] = {
@@ -119,7 +140,6 @@ class MigrationCleaning:
         '''
         '''
         target_types = self._schema_parser.get_python_types()
-
         result = {}

         for source_field, target_field in self._field_mapping.items():
@@ -169,21 +189,6 @@ class MigrationCleaning:
         '''
         return self._mapping_parser.get_date_formats()

-    def _get_mongo_schema_info(self, method_name: str):
-        '''
-        '''
-        result = {}
-
-        target_dict = getattr(self._schema_parser, method_name)()
-
-        for source_field, target_field in self._field_mapping.items():
-
-            if target_field in target_dict:
-
-                result[source_field] = target_dict[target_field]
-
-        return result
-
     @property
     def _allowed_values(self):
         '''
@@ -208,6 +213,21 @@ class MigrationCleaning:
         '''
         return self._get_mongo_schema_info("get_patterns")

+    def _get_mongo_schema_info(self, method_name: str):
+        '''
+        Returns a dictionary that maps each source field to the schema
+        information which the given ParseJsonSchema method returns for
+        the corresponding target field.
+        '''
+        result = {}
+
+        target_dict = getattr(self._schema_parser, method_name)()
+
+        for source_field, target_field in self._field_mapping.items():
+
+            if target_field in target_dict:
+
+                result[source_field] = target_dict[target_field]
+
+        return result
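+    # method_name must be the name of a ParseJsonSchema getter, as in
+    # the self._get_mongo_schema_info("get_patterns") call above.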
+
     def _filter_invalid_data(self, data: pd.DataFrame,
                              invalid_mask: pd.Series,
                              reason: (str, pd.Series)) -> pd.DataFrame:
@@ -264,8 +284,6 @@ class MigrationCleaning:

         data = data.loc[~all_index.isin(nok_index)].reset_index(drop=True)

-        #self._sql_db.release(db)
-
         return data

     def _replace_values(self, data: pd.DataFrame,
@@ -302,6 +320,7 @@ class MigrationCleaning:
                 mask = (data[column].astype(str).isin(values))

             else:
+
                 mask = (data[column].isin(values))

             if default:
@@ -357,12 +376,11 @@ class MigrationCleaning:

             elif python_type == bool:

-                data[column] = data[column].str.lower()
-                accepted_bool = {'ja': True, 'j': True, '1': True, 1:True,
+                accepted_bool = {'ja': True, 'j': True, '1': True, 1: True,
                                  'yes': True, 'y': True, 'true':True,
                                  't': True, 'nein': False, 'n': False,
                                  'no': False, 'false': False, 'f': False,
-                                 '0': False, 0:False}
+                                 '0': False, 0: False}
                 data[column] = data[column].map(accepted_bool)
                 data[column] = data[column].astype(bool)
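+                # Note: with str.lower() removed, matching is now
+                # case-sensitive ('ja' -> True, 'Ja' -> NaN), and any
+                # unmapped value becomes NaN, which astype(bool) turns
+                # into True.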
@@ -376,16 +394,17 @@ class MigrationCleaning:
                 python_type = object
                 data[column] = data[column].astype(python_type)

+            elif python_type == float:
+
+                data[column] = data[column].fillna(np.inf)
+                data[column] = data[column].astype(python_type)
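+                # Missing floats are parked as np.inf here; they are
+                # stripped again as 'inf' entries in
+                # clean_json_from_None_object below.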
+
             else:

                 data = data.copy(deep=True)
                 data[column] = data[column].astype(python_type)

             if data[column].dtype != python_type:
-                print('---------------------------------------------')
-                print(data[column].to_csv(column))
-                print(python_type)
-                print(column)

                 self.log.warning(("After conversion type in {0} "
                                   "should be {1} "
@@ -437,6 +456,10 @@ class MigrationCleaning:

             python_type = self._python_types[column]

+            # needs to be done since the column dtype of strings is object
+            if python_type == str:
+                python_type = object
+
             if data[column].dtype != python_type:

                 def mismatch_type(x):
@@ -444,7 +467,7 @@ class MigrationCleaning:

             invalid_mask = data[column].apply(mismatch_type)

-            reason = "Type mismatch if field {}".format(column)
+            reason = "Type mismatch in field {}".format(column)

             data = self._filter_invalid_data(data=data,
                                              invalid_mask=invalid_mask,
@@ -466,12 +489,12 @@ class MigrationCleaning:

             invalid_mask = (~data[column].astype(str).str.match(pattern))

-            reason = "Pattern mismatch in field {}".format(column)
+            reason = "Pattern mismatch in field {0}. Pattern: {1}. Example: {2}"\
+                .format(column, pattern, data.iloc[0][column])

             data = self._filter_invalid_data(data=data,
                                              invalid_mask=invalid_mask,
                                              reason=reason)
-
         return data

     def filter_invalid_values(self, data: pd.DataFrame) -> pd.DataFrame:
@@ -523,14 +546,36 @@ class MigrationCleaning:

         return data

+    def drop_columns_with_no_content(self, data: pd.DataFrame) -> pd.DataFrame:
+        '''
+        Drops all columns without content: columns that are entirely
+        NaN and columns whose only values are no-content signs such
+        as None, '-' or 'n.a'.
+        '''
+        data = data.dropna(how='all', axis='columns')
+        for column in data.columns:
+            unique_values = data[column].unique()
+            no_content_signs = [None, '-', 'n.a']
+            intersection = list(set(unique_values) & set(no_content_signs))
+            # drop the column if every unique value is a no-content sign
+            if len(intersection) == len(unique_values):
+                data = data.drop(columns=[column])
+
+        return data
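+    # Usage sketch (hypothetical frame): given
+    #   df = pd.DataFrame({"a": [1, 2], "b": ["-", "-"]})
+    # drop_columns_with_no_content(df) keeps only column "a".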
+
+    def clean_json_from_None_object(self, data: pd.DataFrame) -> dict:
+        '''
+        Converts the dataframe to a json-like dictionary and recursively
+        drops every entry that carries no information: None values,
+        'None' and 'inf' strings and empty non-boolean values.
+        '''
+        data = data.to_json(date_format="iso")
+        data = json.loads(data)
+        # each remap pass keeps only the entries for which the visit
+        # function returns True; remap works post-order, so containers
+        # emptied by the earlier passes are dropped by the last one
+        new_data = remap(data, lambda p, k, v: v is not None)
+        new_data = remap(new_data, lambda p, k, v: v != 'None')
+        new_data = remap(new_data, lambda p, k, v: v != 'inf')
+        new_data = remap(new_data, lambda p, k, v: isinstance(v, bool) or bool(v))
+        return new_data
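+    # Sketch of the effect on a hypothetical single-row frame:
+    #   pd.DataFrame([{"a": 1, "b": None, "c": "None"}])
+    # comes back as {"a": {"0": 1}}; "b" and "c" are dropped entirely.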
+
     def restrict_to_collection(self, data: pd.DataFrame, collection_name: str) -> pd.DataFrame:
         '''
         '''
         mongo_fields = self._schema_parser.get_fields_restricted_to_collection(collection_name=collection_name)

-        fields = self._mapping_parser.get_fields_restricted_to_collecton(collection_name=collection_name)
+        mapping_fields = self._mapping_parser.get_fields_restricted_to_collection(collection_name=collection_name)

-        return data[[c for c in data.columns if (c in fields) or (c in mongo_fields)]]
+        return data[[c for c in data.columns if (c in mapping_fields) or (c in mongo_fields)]]

if __name__ == "__main__":

@@ -550,7 +595,7 @@ if __name__ == "__main__":

     if all([os.path.isfile(p) for p in schema_paths + [mapping_path]]):

         cleaner = MigrationCleaning(
-            mapping_path=mapping_path,
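+            # mapping_paths accepts a single path or a list of paths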
+            mapping_paths=mapping_path,
             schema_paths=schema_paths,
             mapping_source="internal_name",
             mapping_target="mongo_name",