5 Commits b3f46ff21a ... 195ff25fc2

Author SHA1 Message Date
  tsteuer 195ff25fc2 update migrationcleaning 4 years ago
  tsteuer 4e464b745c enable multiple mapping files 4 years ago
  tsteuer 094f32f2fb resolve conflict 4 years ago
  tsteuer 86a7442185 Update 'setup.py' 4 years ago
  tsteuer b6de516668 Update 'setup.py' 4 years ago

+ 57 - 28
cdplib/db_migration/MigrationCleaning.py

@@ -19,6 +19,8 @@ from cdplib.db_migration.ParseJsonSchema import ParseJsonSchema
 from cdplib.utils.ExceptionsHandler import ExceptionsHandler
 from cdplib.utils.ExceptionsHandler import ExceptionsHandler
 from cdplib.utils.CleaningUtils import CleaningUtils
 from cdplib.utils.CleaningUtils import CleaningUtils
 from cdplib.log import Log
 from cdplib.log import Log
+import json
+from boltons.iterutils import remap
 
 
 class MigrationCleaning:
 class MigrationCleaning:
     '''
     '''
@@ -26,7 +28,7 @@ class MigrationCleaning:
     We keep the correcting and the filtering methods separated,
     We keep the correcting and the filtering methods separated,
     since there might be other custom steps in between.
     since there might be other custom steps in between.
     '''
     '''
-    def __init__(self, mapping_path: str,
+    def __init__(self, mapping_paths: (str, list),
                  schema_paths: (str, list),
                  schema_paths: (str, list),
                  inconsist_report_table: str = None,
                  inconsist_report_table: str = None,
                  filter_index_columns: (str, list) = None,
                  filter_index_columns: (str, list) = None,
@@ -51,11 +53,11 @@ class MigrationCleaning:
 
 
         self._schema_parser = schema_parser(schema_paths)
         self._schema_parser = schema_parser(schema_paths)
 
 
-        self._mapping_parser = mapping_parser(mapping_path,
+        self._mapping_parser = mapping_parser(mapping_paths,
                                               source=mapping_source,
                                               source=mapping_source,
                                               target=mapping_target)
                                               target=mapping_target)
 
 
-        self._mapping_path = mapping_path
+        self._mapping_paths = mapping_paths
         self._schema_paths = schema_paths
         self._schema_paths = schema_paths
 
 
         from cdplib.db_handlers.SQLHandler import SQLHandler
         from cdplib.db_handlers.SQLHandler import SQLHandler
@@ -92,6 +94,11 @@ class MigrationCleaning:
     @property
     @property
     def _default_values(self):
     def _default_values(self):
         '''
         '''
+        Returns a dictionary in which the default values of the mongo schema
+        are mapped to the default values of the migration mapping. In the
+        migration mapping the default values should be specified as values
+        which don't contain any information and can therefore be seen as
+        empty values.
         '''
         '''
         default_values = {}
         default_values = {}
 
 
@@ -104,14 +111,13 @@ class MigrationCleaning:
                 continue
                 continue
 
 
             elif target_field not in target_default_values:
             elif target_field not in target_default_values:
-
                 target_default_values[target_field] = np.nan
                 target_default_values[target_field] = np.nan
 
 
             default_values[source_field] = {
             default_values[source_field] = {
                     target_default_values[target_field]:
                     target_default_values[target_field]:
                     source_default_values[source_field]
                     source_default_values[source_field]
                     }
                     }
-
+  
         return default_values
         return default_values
 
 
     @property
     @property
@@ -119,7 +125,6 @@ class MigrationCleaning:
         '''
         '''
         '''
         '''
         target_types = self._schema_parser.get_python_types()
         target_types = self._schema_parser.get_python_types()
-
         result = {}
         result = {}
 
 
         for source_field, target_field in self._field_mapping.items():
         for source_field, target_field in self._field_mapping.items():
@@ -229,7 +234,7 @@ class MigrationCleaning:
         data_inconsist = data.assign(reason=reason)\
         data_inconsist = data.assign(reason=reason)\
                              .loc[invalid_mask]\
                              .loc[invalid_mask]\
                              .reset_index(drop=True)
                              .reset_index(drop=True)
-
+                                  
         if db.check_if_table_exists(self._inconsist_report_table):
         if db.check_if_table_exists(self._inconsist_report_table):
             columns = db.get_column_names(tablename=self._inconsist_report_table)
             columns = db.get_column_names(tablename=self._inconsist_report_table)
 
 
@@ -264,8 +269,6 @@ class MigrationCleaning:
 
 
         data = data.loc[~all_index.isin(nok_index)].reset_index(drop=True)
         data = data.loc[~all_index.isin(nok_index)].reset_index(drop=True)
 
 
-        #self._sql_db.release(db)
-
         return data
         return data
 
 
     def _replace_values(self, data: pd.DataFrame,
     def _replace_values(self, data: pd.DataFrame,
@@ -298,10 +301,11 @@ class MigrationCleaning:
                 for key, values in d.items():
                 for key, values in d.items():
 
 
                     if not default:
                     if not default:
-
+                    
                         mask = (data[column].astype(str).isin(values))
                         mask = (data[column].astype(str).isin(values))
 
 
                     else:
                     else:
+
                         mask = (data[column].isin(values))
                         mask = (data[column].isin(values))
 
 
                     if default:
                     if default:
@@ -309,7 +313,7 @@ class MigrationCleaning:
                         mask = mask | (data[column].isnull())
                         mask = mask | (data[column].isnull())
 
 
                     data.loc[mask, column] = key
                     data.loc[mask, column] = key
-
+                    
                 data[column] = data[column].astype(dtype)
                 data[column] = data[column].astype(dtype)
 
 
             except Exception as e:
             except Exception as e:
@@ -357,12 +361,11 @@ class MigrationCleaning:
 
 
                 elif python_type == bool:
                 elif python_type == bool:
 
 
-                    data[column] = data[column].str.lower()
-                    accepted_bool = {'ja': True, 'j': True, '1': True, 1:True,
+                    accepted_bool = {'ja': True, 'j': True, '1': True, 1: True,
                                      'yes': True, 'y': True, 'true':True,
                                      'yes': True, 'y': True, 'true':True,
                                      't': True, 'nein': False, 'n': False,
                                      't': True, 'nein': False, 'n': False,
                                      'no': False, 'false': False, 'f': False,
                                      'no': False, 'false': False, 'f': False,
-                                     '0': False, 0:False}
+                                     '0': False, 0: False}
                     data[column] = data[column].map(accepted_bool)
                     data[column] = data[column].map(accepted_bool)
                     data[column] = data[column].astype(bool)
                     data[column] = data[column].astype(bool)
 
 
@@ -375,6 +378,10 @@ class MigrationCleaning:
                     data[column] = data[column].astype(python_type)
                     data[column] = data[column].astype(python_type)
                     python_type = object
                     python_type = object
                     data[column] = data[column].astype(python_type)
                     data[column] = data[column].astype(python_type)
+                    
+                elif python_type == float:
+                    data = data.fillna(np.inf)
+                    data[column] = data[column].astype(python_type)
 
 
                 else:
                 else:
 
 
@@ -382,10 +389,6 @@ class MigrationCleaning:
                     data[column] = data[column].astype(python_type)
                     data[column] = data[column].astype(python_type)
 
 
                 if data[column].dtype != python_type:
                 if data[column].dtype != python_type:
-                    print('---------------------------------------------')
-                    print(data[column].to_csv(column))
-                    print(python_type)
-                    print(column)
 
 
                     self.log.warning(("After conversion type in {0} "
                     self.log.warning(("After conversion type in {0} "
                                        "should be {1} "
                                        "should be {1} "
@@ -436,7 +439,11 @@ class MigrationCleaning:
                 continue
                 continue
 
 
             python_type = self._python_types[column]
             python_type = self._python_types[column]
-
+            
+            # Needs to be done since the column dtype of strings is object
+            if python_type == str:
+                python_type = object
+                       
             if data[column].dtype != python_type:
             if data[column].dtype != python_type:
 
 
                 def mismatch_type(x):
                 def mismatch_type(x):
@@ -444,7 +451,7 @@ class MigrationCleaning:
 
 
                 invalid_mask = data[column].apply(mismatch_type)
                 invalid_mask = data[column].apply(mismatch_type)
 
 
-                reason = "Type mismatch if field {}".format(column)
+                reason = "Type mismatch in field {}".format(column)
 
 
                 data = self._filter_invalid_data(data=data,
                 data = self._filter_invalid_data(data=data,
                                                  invalid_mask=invalid_mask,
                                                  invalid_mask=invalid_mask,
@@ -466,12 +473,12 @@ class MigrationCleaning:
 
 
             invalid_mask = (~data[column].astype(str).str.match(pattern))
             invalid_mask = (~data[column].astype(str).str.match(pattern))
 
 
-            reason = "Pattern mismatch in field {}".format(column)
+            reason = "Pattern mismatch in field {0}. Pattern: {1}Example: {2}"\
+                    .format(column,pattern,data.iloc[0][column])
 
 
             data = self._filter_invalid_data(data=data,
             data = self._filter_invalid_data(data=data,
                                              invalid_mask=invalid_mask,
                                              invalid_mask=invalid_mask,
                                              reason=reason)
                                              reason=reason)
-
         return data
         return data
 
 
     def filter_invalid_values(self, data: pd.DataFrame) -> pd.DataFrame:
     def filter_invalid_values(self, data: pd.DataFrame) -> pd.DataFrame:
@@ -504,7 +511,7 @@ class MigrationCleaning:
                                                  reason=reason)
                                                  reason=reason)
 
 
             elif column in self._allowed_values:
             elif column in self._allowed_values:
-
+                
                 allowed_values = self._allowed_values[column]
                 allowed_values = self._allowed_values[column]
 
 
                 invalid_mask = (~data[column].isin(allowed_values))
                 invalid_mask = (~data[column].isin(allowed_values))
@@ -522,15 +529,37 @@ class MigrationCleaning:
                 continue
                 continue
 
 
         return data
         return data
+    
+    def drop_columns_with_no_content(self, data: pd.DataFrame) -> pd.DataFrame():
+        '''
+        '''
+        data = data.dropna(how ='all', axis='columns')
+        for column in data.columns: 
+            unique_values = data[column].unique() 
+            no_content_signs = [None, '-', 'n.a'] 
+            intersection = list(set(unique_values) & set(no_content_signs))
+            if len(intersection) - len(unique_values) == 0:
+                data = data.drop(columns=[column])
+            
+        return data
+    
+    def clean_json_from_None_object(self, data: pd.DataFrame) -> pd.DataFrame():
+        data = data.to_json(date_format="iso")
+        data = json.loads(data)
+        new_data = remap(data, lambda p, k, v: v is not None)
+        new_data = remap(new_data, lambda p, k, v: v != 'None')
+        new_data = remap(new_data, lambda p, k, v: v != 'inf')
+        new_data = remap(new_data, lambda p, k, v: (isinstance(v,bool) or (not isinstance(v,bool) and bool(v))))
+        return new_data
 
 
     def restrict_to_collection(self, data: pd.DataFrame, collection_name: str) -> pd.DataFrame:
     def restrict_to_collection(self, data: pd.DataFrame, collection_name: str) -> pd.DataFrame:
         '''
         '''
         '''
         '''
         mongo_fields = self._schema_parser.get_fields_restricted_to_collection(collection_name=collection_name)
         mongo_fields = self._schema_parser.get_fields_restricted_to_collection(collection_name=collection_name)
-
-        fields = self._mapping_parser.get_fields_restricted_to_collecton(collection_name=collection_name)
-
-        return data[[c for c in data.columns if (c in fields) or (c in mongo_fields)]]
+        
+        mapping_fields = self._mapping_parser.get_fields_restricted_to_collection(collection_name=collection_name)
+          
+        return data[[c for c in data.columns if (c in mapping_fields) or (c in mongo_fields)]]
 
 
 
 
 if __name__ == "__main__":
 if __name__ == "__main__":
@@ -550,7 +579,7 @@ if __name__ == "__main__":
     if all([os.path.isfile(p) for p in schema_paths + [mapping_path]]):
     if all([os.path.isfile(p) for p in schema_paths + [mapping_path]]):
 
 
         cleaner = MigrationCleaning(
         cleaner = MigrationCleaning(
-                mapping_path=mapping_path,
+                mapping_paths=mapping_path,
                 schema_paths=schema_paths,
                 schema_paths=schema_paths,
                 mapping_source="internal_name",
                 mapping_source="internal_name",
                 mapping_target="mongo_name",
                 mapping_target="mongo_name",

+ 1 - 2
cdplib/db_migration/ParseJsonSchema.py

@@ -113,7 +113,6 @@ class ParseJsonSchema(ParseDbSchema):
         '''
         '''
         mongo_types = self.get_mongo_types()
         mongo_types = self.get_mongo_types()
         python_types = {}
         python_types = {}
-
         bson_to_python_types = {"double": float,
         bson_to_python_types = {"double": float,
                                 "decimal": float,
                                 "decimal": float,
                                 "string": str,
                                 "string": str,
@@ -199,7 +198,7 @@ class ParseJsonSchema(ParseDbSchema):
         result = self._parse_one(schema=schemas[0],
         result = self._parse_one(schema=schemas[0],
                                  field_info=field_info,
                                  field_info=field_info,
                                  required_only=required_only)
                                  required_only=required_only)
-
+        
         for schema in schemas[1:]:
         for schema in schemas[1:]:
             
             
             next_result = self._parse_one(schema=schema,
             next_result = self._parse_one(schema=schema,

+ 65 - 27
cdplib/db_migration/ParseMapping.py

@@ -9,39 +9,72 @@ Created on Fri Sep 20 15:33:17 2019
 import os
 import os
 import sys
 import sys
 import numpy as np
 import numpy as np
+import json
+
+from cdplib.log import Log
 sys.path.append(os.getcwd())
 sys.path.append(os.getcwd())
 
 
 class ParseMapping:
 class ParseMapping:
     '''
     '''
     '''
     '''
-    def __init__(self, mapping_path: str, log_name: str = "ParseMapping",
+    def __init__(self, mapping_paths: (str, list), log_name: str = "ParseMapping",
                  source: str = "original_name", target: str = "mongo_name",
                  source: str = "original_name", target: str = "mongo_name",
-                 target_collection: str = "mongo_collection"):
+                 target_collections: str = "mongo_collection"):
         '''
         '''
         '''
         '''
-        import json
-        from cdplib.log import Log
 
 
         self._log = Log('Parse Mapping')
         self._log = Log('Parse Mapping')
-
-        if not os.path.isfile(mapping_path):
-            err = "Mapping not found "+mapping_path
-            self._log.error(err)
-            raise FileNotFoundError(err)
-
-        try:
-            with open(mapping_path, "r") as f:
-                self._mapping = json.load(f)
-
-        except Exception as e:
-            err = ("Could not load mapping. " + mapping_path +
-                   "Exit with error {}".format(e))
-            self._log.error(err)
-            raise Exception(err)
-
+        
+        assert(isinstance(mapping_paths, (list, str))),\
+            "Mapping_paths must be either str or lists"
+            
+        if isinstance(mapping_paths, str):
+            mapping_paths = [mapping_paths]
+        
+        self._mapping_paths =  mapping_paths       
         self._source = source
         self._source = source
         self._target = target
         self._target = target
-        self._target_collection = target_collection
+        self._target_collections = target_collections
+        self._update_mapping()
+        
+    def _update_mapping(self):
+        '''
+        Since we can have multiple mappings per table we need to add them to
+        the object. The mappings are concatenated so that we don't have to
+        adjust all functions of the class to also accept list input. The class
+        could be adjusted to accept a list, or even a dictionary with the
+        mapping name as key and the json mapping as value.
+        !!! WARNING !!!
+        Since the mappings are just concatenated, there is currently no way to
+        distinguish from the object itself which item belongs to which
+        mapping file.
+        '''
+        mappings = []
+    
+        for mapping_path in self._mapping_paths:
+            try:
+                with open(mapping_path, "r") as f:
+                    mapping = json.load(f) 
+                mappings.append(mapping)
+                        
+            except Exception as e:
+                err = ("Could not load json schema:{1} , "
+                       "Obtained error {0}".format(e, mapping_path))
+
+                self._log.error(err)
+                raise Exception(err)
+                
+        if len(mappings) > 1:
+            concatenate_mapping = []
+            for mapping in mappings:
+                if not concatenate_mapping:
+                    concatenate_mapping = mapping
+                else:
+                   concatenate_mapping.extend(mapping)
+            self._mapping = concatenate_mapping
+        else:
+            self._mapping = mappings[0]
+        
 
 
     def get_field_mapping(self) -> dict:
     def get_field_mapping(self) -> dict:
         '''
         '''
@@ -58,7 +91,7 @@ class ParseMapping:
             "Invalid from field"
             "Invalid from field"
 
 
         return [d[self._source] for d in self._mapping
         return [d[self._source] for d in self._mapping
-                if (key in d) and (d[key] == value)]
+                if (key in d) and (value in d[key])]
 
 
     def get_required_fields(self) -> list:
     def get_required_fields(self) -> list:
         '''
         '''
@@ -72,10 +105,10 @@ class ParseMapping:
         return self._get_fields_satistisfying_condition(key="type",
         return self._get_fields_satistisfying_condition(key="type",
                                                         value="Date")
                                                         value="Date")
 
 
-    def get_fields_restricted_to_collecton(self, collection_name: str) -> list:
+    def get_fields_restricted_to_collection(self, collection_name: str) -> list:
         '''
         '''
         '''
         '''
-        return self._get_fields_satistisfying_condition(key=self._target_collection,
+        return self._get_fields_satistisfying_condition(key=self._target_collections,
                                                         value=collection_name)
                                                         value=collection_name)
 
 
     def _get_info(self, key: str, value=None) -> dict:
     def _get_info(self, key: str, value=None) -> dict:
@@ -84,9 +117,14 @@ class ParseMapping:
         assert(all([self._source in d for d in self._mapping])),\
         assert(all([self._source in d for d in self._mapping])),\
             "Invalid from field"
             "Invalid from field"
 
 
-        return {d[self._source]: d[key] for d in self._mapping
-                if (key in d) and ((value is not None)
-                and (d[key] == value)) or (key in d)}
+        result = {}
+
+        for d in self._mapping: 
+             if key in d and d[key] is not None and d[key]:
+                 result.update({d[self._source]: d[key]})
+            
+        return result
+
 
 
     def get_default_values(self) -> dict:
     def get_default_values(self) -> dict:
         '''
         '''

+ 3 - 3
setup.py

@@ -1,12 +1,12 @@
 from setuptools import setup,find_packages
 from setuptools import setup,find_packages
 
 
-INSTALL_REQUIRES = [    
+INSTALL_REQUIRES = [
         'pandas',
         'pandas',
         'sqlalchemy',
         'sqlalchemy',
         'sqlparse',
         'sqlparse',
-        'pymysql',      
+        'pymysql',
         'pymongo',
         'pymongo',
-        'jsonref', 
+        'jsonref',
         'simplejson',
         'simplejson',
         'mysql',
         'mysql',
         'sqlalchemy_utils',
         'sqlalchemy_utils',