소스 검색

enable multiple mapping files

tsteuer 4 년 전
부모
커밋
4e464b745c
3개의 변경된 파일95개의 추가작업 그리고 57개의 파일을 삭제
  1. 29 28
      cdplib/db_migration/MigrationCleaning.py
  2. 1 2
      cdplib/db_migration/ParseJsonSchema.py
  3. 65 27
      cdplib/db_migration/ParseMapping.py

+ 29 - 28
cdplib/db_migration/MigrationCleaning.py

@@ -26,7 +26,7 @@ class MigrationCleaning:
     We keep the correcting and the filtering methods separated,
     since there might be other custom steps in between.
     '''
-    def __init__(self, mapping_path: str,
+    def __init__(self, mapping_paths: (str, list),
                  schema_paths: (str, list),
                  inconsist_report_table: str = None,
                  filter_index_columns: (str, list) = None,
@@ -51,11 +51,11 @@ class MigrationCleaning:
 
         self._schema_parser = schema_parser(schema_paths)
 
-        self._mapping_parser = mapping_parser(mapping_path,
+        self._mapping_parser = mapping_parser(mapping_paths,
                                               source=mapping_source,
                                               target=mapping_target)
 
-        self._mapping_path = mapping_path
+        self._mapping_paths = mapping_paths
         self._schema_paths = schema_paths
 
         from cdplib.db_handlers.SQLHandler import SQLHandler
@@ -92,6 +92,11 @@ class MigrationCleaning:
     @property
     def _default_values(self):
         '''
+        Returns a dictonary in which the default values of the mongo schema
+        are mapped to the default values of the migration mapping. In migration
+        mapping the default values should be specified as the values which
+        doesn't contain any information and can be seen therefore as an empty
+        value. 
         '''
         default_values = {}
 
@@ -104,14 +109,13 @@ class MigrationCleaning:
                 continue
 
             elif target_field not in target_default_values:
-
                 target_default_values[target_field] = np.nan
 
             default_values[source_field] = {
                     target_default_values[target_field]:
                     source_default_values[source_field]
                     }
-
+  
         return default_values
 
     @property
@@ -119,7 +123,6 @@ class MigrationCleaning:
         '''
         '''
         target_types = self._schema_parser.get_python_types()
-
         result = {}
 
         for source_field, target_field in self._field_mapping.items():
@@ -229,7 +232,7 @@ class MigrationCleaning:
         data_inconsist = data.assign(reason=reason)\
                              .loc[invalid_mask]\
                              .reset_index(drop=True)
-
+                                  
         if db.check_if_table_exists(self._inconsist_report_table):
             columns = db.get_column_names(tablename=self._inconsist_report_table)
 
@@ -264,8 +267,6 @@ class MigrationCleaning:
 
         data = data.loc[~all_index.isin(nok_index)].reset_index(drop=True)
 
-        #self._sql_db.release(db)
-
         return data
 
     def _replace_values(self, data: pd.DataFrame,
@@ -298,10 +299,11 @@ class MigrationCleaning:
                 for key, values in d.items():
 
                     if not default:
-
+                    
                         mask = (data[column].astype(str).isin(values))
 
                     else:
+
                         mask = (data[column].isin(values))
 
                     if default:
@@ -309,7 +311,7 @@ class MigrationCleaning:
                         mask = mask | (data[column].isnull())
 
                     data.loc[mask, column] = key
-
+                    
                 data[column] = data[column].astype(dtype)
 
             except Exception as e:
@@ -357,12 +359,11 @@ class MigrationCleaning:
 
                 elif python_type == bool:
 
-                    data[column] = data[column].str.lower()
-                    accepted_bool = {'ja': True, 'j': True, '1': True, 1:True,
+                    accepted_bool = {'ja': True, 'j': True, '1': True, 1: True,
                                      'yes': True, 'y': True, 'true':True,
                                      't': True, 'nein': False, 'n': False,
                                      'no': False, 'false': False, 'f': False,
-                                     '0': False, 0:False}
+                                     '0': False, 0: False}
                     data[column] = data[column].map(accepted_bool)
                     data[column] = data[column].astype(bool)
 
@@ -382,10 +383,6 @@ class MigrationCleaning:
                     data[column] = data[column].astype(python_type)
 
                 if data[column].dtype != python_type:
-                    print('---------------------------------------------')
-                    print(data[column].to_csv(column))
-                    print(python_type)
-                    print(column)
 
                     self.log.warning(("After conversion type in {0} "
                                        "should be {1} "
@@ -436,7 +433,11 @@ class MigrationCleaning:
                 continue
 
             python_type = self._python_types[column]
-
+            
+            #Needs to be done since coumn dtype of strings is a object
+            if python_type == str:
+                python_type = object
+                       
             if data[column].dtype != python_type:
 
                 def mismatch_type(x):
@@ -444,7 +445,7 @@ class MigrationCleaning:
 
                 invalid_mask = data[column].apply(mismatch_type)
 
-                reason = "Type mismatch if field {}".format(column)
+                reason = "Type mismatch in field {}".format(column)
 
                 data = self._filter_invalid_data(data=data,
                                                  invalid_mask=invalid_mask,
@@ -466,12 +467,12 @@ class MigrationCleaning:
 
             invalid_mask = (~data[column].astype(str).str.match(pattern))
 
-            reason = "Pattern mismatch in field {}".format(column)
+            reason = "Pattern mismatch in field {0}. Pattern: {1}Example: {2}"\
+                    .format(column,pattern,data.iloc[0][column])
 
             data = self._filter_invalid_data(data=data,
                                              invalid_mask=invalid_mask,
                                              reason=reason)
-
         return data
 
     def filter_invalid_values(self, data: pd.DataFrame) -> pd.DataFrame:
@@ -504,7 +505,7 @@ class MigrationCleaning:
                                                  reason=reason)
 
             elif column in self._allowed_values:
-
+                
                 allowed_values = self._allowed_values[column]
 
                 invalid_mask = (~data[column].isin(allowed_values))
@@ -527,10 +528,10 @@ class MigrationCleaning:
         '''
         '''
         mongo_fields = self._schema_parser.get_fields_restricted_to_collection(collection_name=collection_name)
-
-        fields = self._mapping_parser.get_fields_restricted_to_collecton(collection_name=collection_name)
-
-        return data[[c for c in data.columns if (c in fields) or (c in mongo_fields)]]
+        
+        mapping_fields = self._mapping_parser.get_fields_restricted_to_collection(collection_name=collection_name)
+          
+        return data[[c for c in data.columns if (c in mapping_fields) or (c in mongo_fields)]]
 
 
 if __name__ == "__main__":
@@ -550,7 +551,7 @@ if __name__ == "__main__":
     if all([os.path.isfile(p) for p in schema_paths + [mapping_path]]):
 
         cleaner = MigrationCleaning(
-                mapping_path=mapping_path,
+                mapping_paths=mapping_path,
                 schema_paths=schema_paths,
                 mapping_source="internal_name",
                 mapping_target="mongo_name",

+ 1 - 2
cdplib/db_migration/ParseJsonSchema.py

@@ -113,7 +113,6 @@ class ParseJsonSchema(ParseDbSchema):
         '''
         mongo_types = self.get_mongo_types()
         python_types = {}
-
         bson_to_python_types = {"double": float,
                                 "decimal": float,
                                 "string": str,
@@ -199,7 +198,7 @@ class ParseJsonSchema(ParseDbSchema):
         result = self._parse_one(schema=schemas[0],
                                  field_info=field_info,
                                  required_only=required_only)
-
+        
         for schema in schemas[1:]:
             
             next_result = self._parse_one(schema=schema,

+ 65 - 27
cdplib/db_migration/ParseMapping.py

@@ -9,39 +9,72 @@ Created on Fri Sep 20 15:33:17 2019
 import os
 import sys
 import numpy as np
+import json
+
+from cdplib.log import Log
 sys.path.append(os.getcwd())
 
 class ParseMapping:
     '''
     '''
-    def __init__(self, mapping_path: str, log_name: str = "ParseMapping",
+    def __init__(self, mapping_paths: (str, list), log_name: str = "ParseMapping",
                  source: str = "original_name", target: str = "mongo_name",
-                 target_collection: str = "mongo_collection"):
+                 target_collections: str = "mongo_collection"):
         '''
         '''
-        import json
-        from cdplib.log import Log
 
         self._log = Log('Parse Mapping')
-
-        if not os.path.isfile(mapping_path):
-            err = "Mapping not found "+mapping_path
-            self._log.error(err)
-            raise FileNotFoundError(err)
-
-        try:
-            with open(mapping_path, "r") as f:
-                self._mapping = json.load(f)
-
-        except Exception as e:
-            err = ("Could not load mapping. " + mapping_path +
-                   "Exit with error {}".format(e))
-            self._log.error(err)
-            raise Exception(err)
-
+        
+        assert(isinstance(mapping_paths, (list, str))),\
+            "Mapping_paths must be either str or lists"
+            
+        if isinstance(mapping_paths, str):
+            mapping_paths = [mapping_paths]
+        
+        self._mapping_paths =  mapping_paths       
         self._source = source
         self._target = target
-        self._target_collection = target_collection
+        self._target_collections = target_collections
+        self._update_mapping()
+        
+    def _update_mapping(self):
+        '''
+        Since we can have multiple mappings per table we need to add them to
+        the object. I concatenated the mapping so that we don't have to adjust 
+        all function of the class to accept also list input. The class could
+        be adjusted to accept list or even a dictornary with the key name as 
+        name of the mapping and value the json mapping. 
+        !!! WARNING !!!! 
+        Since the mapping are just concatenated there is right now 
+        no way to ditinguish from the object itself which item belongs to which
+        mapping file.
+        '''
+        mappings = []
+    
+        for mapping_path in self._mapping_paths:
+            try:
+                with open(mapping_path, "r") as f:
+                    mapping = json.load(f) 
+                mappings.append(mapping)
+                        
+            except Exception as e:
+                err = ("Could not load json schema:{1} , "
+                       "Obtained error {0}".format(e, mapping_path))
+
+                self._log.error(err)
+                raise Exception(err)
+                
+        if len(mappings) > 1:
+            concatenate_mapping = []
+            for mapping in mappings:
+                if not concatenate_mapping:
+                    concatenate_mapping = mapping
+                else:
+                   concatenate_mapping.extend(mapping)
+            self._mapping = concatenate_mapping
+        else:
+            self._mapping = mappings[0]
+        
 
     def get_field_mapping(self) -> dict:
         '''
@@ -58,7 +91,7 @@ class ParseMapping:
             "Invalid from field"
 
         return [d[self._source] for d in self._mapping
-                if (key in d) and (d[key] == value)]
+                if (key in d) and (value in d[key])]
 
     def get_required_fields(self) -> list:
         '''
@@ -72,10 +105,10 @@ class ParseMapping:
         return self._get_fields_satistisfying_condition(key="type",
                                                         value="Date")
 
-    def get_fields_restricted_to_collecton(self, collection_name: str) -> list:
+    def get_fields_restricted_to_collection(self, collection_name: str) -> list:
         '''
         '''
-        return self._get_fields_satistisfying_condition(key=self._target_collection,
+        return self._get_fields_satistisfying_condition(key=self._target_collections,
                                                         value=collection_name)
 
     def _get_info(self, key: str, value=None) -> dict:
@@ -84,9 +117,14 @@ class ParseMapping:
         assert(all([self._source in d for d in self._mapping])),\
             "Invalid from field"
 
-        return {d[self._source]: d[key] for d in self._mapping
-                if (key in d) and ((value is not None)
-                and (d[key] == value)) or (key in d)}
+        result = {}
+
+        for d in self._mapping: 
+             if key in d and d[key] is not None and d[key]:
+                 result.update({d[self._source]: d[key]})
+            
+        return result
+
 
     def get_default_values(self) -> dict:
         '''