Browse Source

fix ParseMapping get functions and append mapping_path to MigrationCleaning

tsteuer 4 years ago
parent
commit
10ab0931b0
2 changed files with 130 additions and 76 deletions
  1. 47 32
      cdplib/db_migration/MigrationCleaning.py
  2. 83 44
      cdplib/db_migration/ParseMapping.py

+ 47 - 32
cdplib/db_migration/MigrationCleaning.py

@@ -69,6 +69,21 @@ class MigrationCleaning:
         assert(isinstance(data, pd.DataFrame)),\
             "Parameter 'data' must be a pandas dataframe"
 
+    def append_mapping_path(self, mapping_path = str):
+        '''
+        Appends a new mapping to the _mapping_paths variable from MigartionCleaning
+        and to mapping_paths from ParseMapping
+        '''
+        assert(isinstance(mapping_path, str)),\
+            "Parameter 'mapping_path' must be a string"
+
+        mapping_paths = []
+        mapping_paths.append(self._mapping_paths)
+        mapping_paths.append(mapping_path)
+        self._mapping_paths = mapping_paths
+        self._mapping_parser._mapping_paths = mapping_paths
+
+
     @property
     def _field_mapping(self):
         '''
@@ -98,7 +113,7 @@ class MigrationCleaning:
         are mapped to the default values of the migration mapping. In migration
         mapping the default values should be specified as the values which
         doesn't contain any information and can be seen therefore as an empty
-        value. 
+        value.
         '''
         default_values = {}
 
@@ -117,7 +132,7 @@ class MigrationCleaning:
                     target_default_values[target_field]:
                     source_default_values[source_field]
                     }
-  
+
         return default_values
 
     @property
@@ -174,21 +189,6 @@ class MigrationCleaning:
         '''
         return self._mapping_parser.get_date_formats()
 
-    def _get_mongo_schema_info(self, method_name: str):
-        '''
-        '''
-        result = {}
-
-        target_dict = getattr(self._schema_parser, method_name)()
-
-        for source_field, target_field in self._field_mapping.items():
-
-            if target_field in target_dict:
-
-                result[source_field] = target_dict[target_field]
-
-        return result
-
     @property
     def _allowed_values(self):
         '''
@@ -213,6 +213,21 @@ class MigrationCleaning:
         '''
         return self._get_mongo_schema_info("get_patterns")
 
+    def _get_mongo_schema_info(self, method_name: str):
+        '''
+        '''
+        result = {}
+
+        target_dict = getattr(self._schema_parser, method_name)()
+
+        for source_field, target_field in self._field_mapping.items():
+
+            if target_field in target_dict:
+
+                result[source_field] = target_dict[target_field]
+
+        return result
+
     def _filter_invalid_data(self, data: pd.DataFrame,
                              invalid_mask: pd.Series,
                              reason: (str, pd.Series)) -> pd.DataFrame:
@@ -234,7 +249,7 @@ class MigrationCleaning:
         data_inconsist = data.assign(reason=reason)\
                              .loc[invalid_mask]\
                              .reset_index(drop=True)
-                                  
+
         if db.check_if_table_exists(self._inconsist_report_table):
             columns = db.get_column_names(tablename=self._inconsist_report_table)
 
@@ -301,7 +316,7 @@ class MigrationCleaning:
                 for key, values in d.items():
 
                     if not default:
-                    
+
                         mask = (data[column].astype(str).isin(values))
 
                     else:
@@ -313,7 +328,7 @@ class MigrationCleaning:
                         mask = mask | (data[column].isnull())
 
                     data.loc[mask, column] = key
-                    
+
                 data[column] = data[column].astype(dtype)
 
             except Exception as e:
@@ -378,7 +393,7 @@ class MigrationCleaning:
                     data[column] = data[column].astype(python_type)
                     python_type = object
                     data[column] = data[column].astype(python_type)
-                    
+
                 elif python_type == float:
                     data = data.fillna(np.inf)
                     data[column] = data[column].astype(python_type)
@@ -439,11 +454,11 @@ class MigrationCleaning:
                 continue
 
             python_type = self._python_types[column]
-            
+
             #Needs to be done since coumn dtype of strings is a object
             if python_type == str:
                 python_type = object
-                       
+
             if data[column].dtype != python_type:
 
                 def mismatch_type(x):
@@ -511,7 +526,7 @@ class MigrationCleaning:
                                                  reason=reason)
 
             elif column in self._allowed_values:
-                
+
                 allowed_values = self._allowed_values[column]
 
                 invalid_mask = (~data[column].isin(allowed_values))
@@ -529,20 +544,20 @@ class MigrationCleaning:
                 continue
 
         return data
-    
+
     def drop_columns_with_no_content(self, data: pd.DataFrame) -> pd.DataFrame():
         '''
         '''
         data = data.dropna(how ='all', axis='columns')
-        for column in data.columns: 
-            unique_values = data[column].unique() 
-            no_content_signs = [None, '-', 'n.a'] 
+        for column in data.columns:
+            unique_values = data[column].unique()
+            no_content_signs = [None, '-', 'n.a']
             intersection = list(set(unique_values) & set(no_content_signs))
             if len(intersection) - len(unique_values) == 0:
                 data = data.drop(columns=[column])
-            
+
         return data
-    
+
     def clean_json_from_None_object(self, data: pd.DataFrame) -> pd.DataFrame():
         data = data.to_json(date_format="iso")
         data = json.loads(data)
@@ -556,9 +571,9 @@ class MigrationCleaning:
         '''
         '''
         mongo_fields = self._schema_parser.get_fields_restricted_to_collection(collection_name=collection_name)
-        
+
         mapping_fields = self._mapping_parser.get_fields_restricted_to_collection(collection_name=collection_name)
-          
+
         return data[[c for c in data.columns if (c in mapping_fields) or (c in mongo_fields)]]
 
 

+ 83 - 44
cdplib/db_migration/ParseMapping.py

@@ -24,46 +24,46 @@ class ParseMapping:
         '''
 
         self._log = Log('Parse Mapping')
-        
+
         assert(isinstance(mapping_paths, (list, str))),\
             "Mapping_paths must be either str or lists"
-            
+
         if isinstance(mapping_paths, str):
             mapping_paths = [mapping_paths]
-        
-        self._mapping_paths =  mapping_paths       
+
+        self._mapping_paths =  mapping_paths
         self._source = source
         self._target = target
         self._target_collections = target_collections
         self._update_mapping()
-        
+
     def _update_mapping(self):
         '''
         Since we can have multiple mappings per table we need to add them to
-        the object. I concatenated the mapping so that we don't have to adjust 
+        the object. I concatenated the mapping so that we don't have to adjust
         all function of the class to accept also list input. The class could
-        be adjusted to accept list or even a dictornary with the key name as 
-        name of the mapping and value the json mapping. 
-        !!! WARNING !!!! 
-        Since the mapping are just concatenated there is right now 
+        be adjusted to accept list or even a dictornary with the key name as
+        name of the mapping and value the json mapping.
+        !!! WARNING !!!!
+        Since the mapping are just concatenated there is right now
         no way to ditinguish from the object itself which item belongs to which
         mapping file.
         '''
         mappings = []
-    
+
         for mapping_path in self._mapping_paths:
             try:
                 with open(mapping_path, "r") as f:
-                    mapping = json.load(f) 
+                    mapping = json.load(f)
                 mappings.append(mapping)
-                        
+
             except Exception as e:
                 err = ("Could not load json schema:{1} , "
                        "Obtained error {0}".format(e, mapping_path))
 
                 self._log.error(err)
                 raise Exception(err)
-                
+
         if len(mappings) > 1:
             concatenate_mapping = []
             for mapping in mappings:
@@ -74,7 +74,7 @@ class ParseMapping:
             self._mapping = concatenate_mapping
         else:
             self._mapping = mappings[0]
-        
+
 
     def get_field_mapping(self) -> dict:
         '''
@@ -111,43 +111,83 @@ class ParseMapping:
         return self._get_fields_satistisfying_condition(key=self._target_collections,
                                                         value=collection_name)
 
-    def _get_info(self, key: str, value=None) -> dict:
+    def _get_property_from_mapping(self, property_names: list) -> dict:
         '''
+        Get specified property names from migration mapping json.
         '''
+        assert(isinstance(property_names,list)),\
+            "Parameter 'property_names' is not a list"
+
         assert(all([self._source in d for d in self._mapping])),\
-            "Invalid from field"
+            "Not all objects in the mapping json contain property tag " + self._source
 
         result = {}
-  
         for column_mapping in self._mapping:
+            for property_name in property_names:
+                if property_name in column_mapping and column_mapping[property_name]:
+                 result.update({column_mapping[self._source]: column_mapping[property_name]})
 
-            if (key in column_mapping and column_mapping[key] is not None\
-                and column_mapping[key] and (column_mapping[key] == value or value is None))\
-                or (column_mapping[key] == value):
-                    
-                 result.update({column_mapping[self._source]: column_mapping[key]})
-     
         return result
 
 
 
     def get_default_values(self) -> dict:
         '''
+        Get default values from migration mapping json. If more peorerty names
+        are beeing added also add them in the unit test.
+        '''
+        standard_default_names=["default_values"]
+
+        return self._get_property_from_mapping(standard_default_names)
+
+    def get_types(self) -> dict:
         '''
-        return self._get_info(key="default_values")
+        Get type from migration mapping json. If more peorerty names
+        are beeing added also add them in the unit test.
+        '''
+
+        standard_type_names=["type"]
+
+        return self._get_property_from_mapping(standard_type_names)
+
+    def get_value_mappings(self) -> dict:
+        '''
+        Get type from migration mapping json. If more peorerty names
+        are beeing added also add them in the unit test.
+        '''
+
+        standard_value_mapping_names=["value_mapping"]
+
+        return self._get_property_from_mapping(standard_value_mapping_names)
 
     def get_date_formats(self) -> dict:
         '''
+        Get date fromats from migration mapping json. If more peorerty names
+        are beeing added or value also add them in the unit test.
         '''
-        return self._get_info(key="type", value="DATETIME")
-    
+        assert(all([self._source in d for d in self._mapping])),\
+            "Not all objects in the mapping json contain property tag " + self._source
+
+        standard_property_names=["type"]
+        standard_property_values=["DATETIME"]
+
+        date_formats = {}
+        for column_mapping in self._mapping:
+            for property_name in standard_property_names:
+                if property_name in column_mapping and column_mapping[property_name]:
+                    for value in standard_property_values:
+                        if column_mapping[property_name] == value:
+                            date_formats.update({column_mapping[self._source]: column_mapping[property_name]})
+
+        return date_formats
+
     def get_internal_names(self) -> dict:
         '''
         '''
- 
+
         if all(["internal_name" in d for d in self._mapping]):
             internal_names = [d["internal_name"] for d in self._mapping]
-    
+
         elif all(["internal_name" not in d for d in self._mapping]):
             internal_names = list(range(len(self._mapping)))
 
@@ -177,10 +217,7 @@ class ParseMapping:
 
         return mongo_names
 
-    def get_types(self) -> dict:
-        '''
-        '''
-        return self._get_info(key="type")
+
 
     def get_python_types(self) -> dict:
         '''
@@ -196,10 +233,7 @@ class ParseMapping:
 
         return {k: sql_to_python_dtypes[v] for k, v in sql_types.items()}
 
-    def get_value_mappings(self) -> dict:
-        '''
-        '''
-        return self._get_info(key="value_mapping")
+
 
     def get_column_numbers(self) -> list:
         '''
@@ -221,7 +255,7 @@ class ParseMapping:
 
 if __name__ == "__main__":
 
-    mapping_path = os.path.join(".", "migration_mappings", "rs0_mapping.json")
+    mapping_path = os.path.join(".", "migration_mappings", "unit_test_migration_mapping.json")
 
     if os.path.isfile(mapping_path):
 
@@ -230,14 +264,19 @@ if __name__ == "__main__":
         parser = ParseMapping(mapping_path, source="internal_name",
                               target="mongo_name")
 
-        internal_to_mongo_mapping = parser.get_field_mapping()
-
-        original_to_internal_mapping = parser.get_field_mapping()
-
         default_values = parser.get_default_values()
-
+        print(default_values)
+        date_formats = parser.get_date_formats()
+        print(date_formats)
+        mongo_names = parser.get_mongo_names()
+        print(mongo_names)
         types = parser.get_types()
-
+        print(types)
         column_numbers = parser.get_column_numbers()
+        print(column_numbers)
+        value_mappings = parser.get_value_mappings()
+        print(value_mappings)
+        sys.exit()
+
 
         print("Done testing!")