Browse Source

Solve merge conflict

ogert 4 years ago
parent
commit
0f6321d948

+ 26 - 51
cdplib/db_handlers/MongodbHandler.py

@@ -20,7 +20,6 @@ import pandas as pd
 import numpy as np
 
 sys.path.append(os.getcwd())
-from pprint import pprint
 from cdplib.log import Log
 from cdplib.db_migration.ParseJsonSchema import ParseJsonSchema
 from cdplib.Singleton_Threadsafe import SingletonThreadsafe
@@ -90,7 +89,7 @@ class MongodbHandler:
             self._client.close()
         except Exception as e:
             self._log.log_and_raise_error(('An error occured when trying to dispose the SQL engine. Error: {}').format(e))
-    
+
 
     def set_database(self, database_name: str):
         '''
@@ -110,7 +109,7 @@ class MongodbHandler:
             self._client.drop_database(self._database_name)
         except Exception as error:
             self._log.log_and_raise_error(('Couldnt drop the database. Error: {}').format(error))
-    
+
 
     def drop_collection(self, collection_name: str):
         '''
@@ -121,7 +120,7 @@ class MongodbHandler:
                 self._log.warning(('Couldnt drop the collection {}. Error: {}').format(collection_name, return_message['errmsg']))
         except Exception as error:
             self._log.log_and_raise_error(('Couldnt drop the collection {}. Error: {}').format(collection_name, error))
-    
+
     def create_index(self, collection_name: str, key: (str, int, list), direction: (str, int)='text'):
         '''
         :param str collection_name: name on the collection for which the schema will be set.
@@ -140,15 +139,15 @@ class MongodbHandler:
         assert(isinstance(direction, str)),\
             "Parameter 'direction' must be a string type"
         if type(key) == list:
-            
+
             key_list=[]
             for item in key:
                 key_list.append((item,direction))
-            
+
             self._database[collection_name].create_index(key_list,name='_'.join(key))
         else:
             self._database[collection_name].create_index([(key, direction)], name=key)
-        
+
 
     def set_collection_schema(self, collection_name: str, schema_path: str,
                               validation_level: str = 'moderate',validation_action: str = 'error'):
@@ -166,14 +165,13 @@ class MongodbHandler:
             "Parameter 'validation_level' must be a string type"
         assert(isinstance(validation_action, str)),\
             "Parameter 'validation_action' must be a string type"
-            
+
         parse_obj = ParseJsonSchema(schema_paths=schema_path)
 
-        schema = parse_obj.read_schema_and_parse_for_mongodb(schema_path)
         command = {
                     'collMod': collection_name,
                     'validator': {
-                        '$jsonSchema': schema
+                        '$jsonSchema': parse_obj.schemas[0]
                     },
                     'validationLevel': validation_level,
                     'validationAction': validation_action
@@ -241,19 +239,7 @@ class MongodbHandler:
                 self._database[collection_name].insert_many(data, ordered=ordered)
 
         except Exception as error:
-            if len(data) > 1:
-                self._log.warning(('An error occured inserting {} documents into database: {} and collection: {}.').format(len(data), self._database_name, collection_name))
-                self._log.warning('This might be because one or more documents are invalid.') 
-                self._log.warning('We will try to insert the documents one-by-one and report which are invalid.')
-                self._log.warning(('Error: {}').format(error))
-                for row in data:
-                    try:
-                        self._database[collection_name].insert_one(row)
-                    except Exception as error:
-                        pprint(row)
-                        self._log.warning(error)
-            else:
-                self._log.log_and_raise_error(('An error occured when trying to insert data into {}, {}. \nError: {}').format(self._database_name, collection_name, error))
+            self._log.log_and_raise_error(('An error occured when trying to insert data into {}, {}. \nError: {}').format(self._database_name, collection_name, error))
 
         self._log.info(('Data has been inserted into the {} collection').format(collection_name))
 
@@ -281,25 +267,16 @@ class MongodbHandler:
 
         try:
             if attribute == None or attribute_value == None:
-                query = {}
+                data = self._database[collection_name].find({},return_values)
             else:
-                query = {attribute: {comparison_operator: attribute_value}}
-
-            data = self._database[collection_name].find(query, return_values)
+                data = self._database[collection_name].find({attribute: {comparison_operator: attribute_value}}, return_values)
 
         except Exception as error:
             self._log.log_and_raise_error(('An error occured trying to query data from {}, with query {}: {}:{}. \nError:{}').format(collection_name, attribute, comparison_operator, attribute_value, error))
-            return None
-
-
-        if data.collection.count_documents(query) != 0:
-            if return_as_dataframe:
-                return self.convert_mongo_data_into_dataframe(data, index, collection_name)
-            else:
-                return data
+        if return_as_dataframe:
+            return self.convert_mongo_data_into_dataframe(data, index, collection_name)
         else:
-            self._log.warning(('No data for the query was found').format())
-            return None
+            return data
 
     def aggregate_data_and_generate_dataframe(self, collection_name: str, aggregation_pipeline: list, index: str = None):
 
@@ -307,7 +284,6 @@ class MongodbHandler:
             data = self._database[collection_name].aggregate(pipeline=aggregation_pipeline, allowDiskUse=True)
         except Exception as error:
             self._log.log_and_raise_error(('A problem occured when aggregating the collection {} with the pipeline {}. \nError: {}').format(collection_name, aggregation_pipeline, error))
-            return None
 
         return self.convert_mongo_data_into_dataframe(data, index, collection_name)
 
@@ -325,8 +301,7 @@ class MongodbHandler:
                     df.set_index(index, inplace=True)
                 return df
             else:
-                self._log.warning(('No data for the query was found when trying to create the dataframe').format())
-                return None
+                self._log.warning(('No data for the query was found').format())
         except Exception as error:
             self._log.log_and_raise_error(('An error occured trying to convert mongo data into pd.Dataframe. \nError: {} ').format(error))
 
@@ -336,7 +311,7 @@ class MongodbHandler:
 
     def push_data_into_collection(self, data: (dict, list, np.ndarray, pd.DataFrame, pd.Series),
                                                                         collection_name: str,
-                                                                        update_label: str, 
+                                                                        update_label: str,
                                                                         query_label: str,
                                                                         query_value: str):
         '''
@@ -371,8 +346,8 @@ class MongodbHandler:
             :param str date_label: label of the attribute where the date is stored
             :param str from_date_value: date which
             :param str to_date_value: value for the query
-            :param str index: 
-            :param bool return_as_dataframe: 
+            :param str index:
+            :param bool return_as_dataframe:
         '''
         try:
             data = self._database[collection_name].find({date_label: {'$gt': from_date_value, '$lt': to_date_value}})
@@ -386,11 +361,11 @@ class MongodbHandler:
             return data
 
     def query_oldest_or_newest_date_in_collection(self, collection_name: str, date_label: str, oldest: bool = False):
-        
+
         date = None
         direction = pymongo.ASCENDING if oldest else pymongo.DESCENDING
         try:
-        
+
             data = list(self._database[collection_name].find().sort(date_label, direction).limit(1))
             if data:
                 date = self.convert_mongo_data_into_dataframe(data, collection_name=collection_name)[date_label].values[0]
@@ -405,7 +380,7 @@ class MongodbHandler:
     def query_with_sorting_and_limit(self, collection_name: str, sort_label: str, limit:int, attribute: str = None,
                                           attribute_value: str = None, comparison_operator: str = '$eq', ascending=True,
                                           index = None, return_as_dataframe: bool = True, return_id: bool = False):
-     
+
         direction = pymongo.ASCENDING if ascending else pymongo.DESCENDING
         try:
 
@@ -413,7 +388,7 @@ class MongodbHandler:
                 data = self._database[collection_name].find({},{'_id': return_id}).sort(sort_label, direction).limit(limit)
             else:
                 data = self._database[collection_name].find({attribute: {comparison_operator: attribute_value}}, {'_id': return_id}).sort(sort_label, direction).limit(limit)
-            
+
             if len(list(data)) == 0:
                 self._log.warning('No data was found for the query')
                 return None
@@ -427,7 +402,7 @@ class MongodbHandler:
             return data
 
     def update_data_in_collection(self, update_label:str, update_value: str, collection_name:str, query_label: str = None, query_value: str = None, create_if_not_exist: bool = True, find_query: dict = None, update_many: bool = False):
-        
+
         if isinstance(update_value, pd.DataFrame):
             update_value = simplejson.loads(update_value.to_json(orient="records",
                                                  date_format="iso"))
@@ -435,7 +410,7 @@ class MongodbHandler:
             if update_many:
                 if query_label and query_value:
                     self._database[collection_name].update_many({query_label:query_value}, {"$set": {update_label: update_value}}, upsert=create_if_not_exist)
-                
+
                 elif find_query:
                     self._database[collection_name].update_many(find_query, {"$set": {update_label: update_value}}, upsert=create_if_not_exist)
 
@@ -445,7 +420,7 @@ class MongodbHandler:
             else:
                 if query_label and query_value:
                     self._database[collection_name].update_one({query_label:query_value}, {"$set": {update_label: update_value}}, upsert=create_if_not_exist)
-                
+
                 elif find_query:
                     self._database[collection_name].update_one(find_query, {"$set": {update_label: update_value}}, upsert=create_if_not_exist)
 
@@ -457,7 +432,7 @@ class MongodbHandler:
         except Exception as error:
             self._log.log_and_raise_error(('There was a problem updating data for label: {}, Error: {}').format(update_label, error))
 
-    
+
 
 if __name__ == "__main__":
 

+ 10 - 7
cdplib/db_migration/MigrationCleaning.py

@@ -350,16 +350,16 @@ class MigrationCleaning:
                             formats=self._date_formats[column])
 
                 elif (python_type == int) and data[column].isnull().any():
-                    
+
                     self.log.log_and_raise_error(("Column {} contains missing values "
                                         "and cannot be of integer type"
                                         .format(column)))
-                    
+
                 elif python_type == bool:
-                    
+
                     data[column] = data[column].str.lower()
-                    accepted_bool = {'ja': True, 'j': True, '1': True, 
-                                     'yes': True, 'y': True, 'true':True, 
+                    accepted_bool = {'ja': True, 'j': True, '1': True,
+                                     'yes': True, 'y': True, 'true':True,
                                      't': True, 'nein': False, 'n': False,
                                      'no': False, 'false': False, 'f': False,
                                      '0': False}
@@ -368,7 +368,7 @@ class MigrationCleaning:
 
 
                 elif python_type == str:
-                     
+
                     # might not be the smoothes solution but it works
                     python_type = str
                     data = data.copy(deep=True)
@@ -382,6 +382,10 @@ class MigrationCleaning:
                     data[column] = data[column].astype(python_type)
 
                 if data[column].dtype != python_type:
+                    print('---------------------------------------------')
+                    print(data[column].to_csv(column))
+                    print(python_type)
+                    print(column)
 
                     self.log.warning(("After conversion type in {0} "
                                        "should be {1} "
@@ -578,4 +582,3 @@ if __name__ == "__main__":
         data = cleaner.filter_notallowed_values(data)
 
     print("Done!")
-    

+ 98 - 59
cdplib/db_migration/ParseJsonSchema.py

@@ -49,14 +49,22 @@ class ParseJsonSchema(ParseDbSchema):
         for schema_path in schema_paths:
             try:
                 with open(schema_path, "r") as f:
-                   schema = json.load(f) 
-                # Load schmea dereferenced
-                self.schemas.append(self._dereference_schema(schema))
-#                self.schemas.append(schema)
+                    schema = json.load(f)
+
+                ref_flag = self._analyze_schema(schema)
+
+                if ref_flag:
+                    schema = self._format_schema_for_mongo(schema)
+                    schema = self._dereference_schema(schema)
+                    schema = self._format_schema_for_mongo(schema)
+                    self.schemas.append(schema)
+                else:
+                    schema = self._format_schema_for_mongo(schema)
+                    self.schemas.append(schema)
 
             except Exception as e:
-                err = ("Could not load json schema, "
-                       "Obtained error {}".format(e))
+                err = ("Could not load json schema:{1} , "
+                       "Obtained error {0}".format(e,schema_path))
 
                 self._log.error(err)
                 raise Exception(err)
@@ -332,42 +340,9 @@ class ParseJsonSchema(ParseDbSchema):
 
         return already_parsed
 
-    def _dereference_schema(self, schema: dict) -> dict:
-        '''
-        :param dict schema: dictionary containing a schema which uses references.
-        '''
-
-        assert(isinstance(schema, dict)),\
-            "Parameter 'schema' must be a dictionary type"
-            
-        # check if schema contains no null values in list of the example attribute    
-        self._analyze_schema(schema)
-            
-        base_dir_url = Path(os.path.join(os.getcwd(), "mongo_schema")).as_uri() + '/'
-        schema = jsonref.loads(str(schema).replace("'", "\""), base_uri=base_dir_url)
-        schema = deepcopy(schema)
-        schema.pop('definitions', None)
-        return schema
-
-    def _remove_defaults(self, schema: dict) -> dict:
-        '''
-        :param dict schema: dictionary containing a schema which uses references.
-        '''
-        if 'default' in schema:
-            del schema['default']
-        if 'default_values' in schema:
-            del schema['default_values']
-        return schema
-    
-        assert(isinstance(schema, dict)),\
-        "Parameter 'schema' must be a dictionary type"
-    
-    # Need to parse schmema for importing to mongo db 
-    # Reason:
-    # We need to drop default values since MongoDB can't handle them
-    # We need to deference json before import to Mongo DB pymongo can't deal with references
     def read_schema_and_parse_for_mongodb(self, schema_path: str) -> dict:
         '''
+        We need to dereference the JSON before importing into MongoDB, since pymongo can't deal with references.
         :param str schema_path: path to the schema file.
         '''
 
@@ -383,29 +358,100 @@ class ParseJsonSchema(ParseDbSchema):
             schema = self._dereference_schema(schema)
 
         return schema
-           
-   
-    def _analyze_schema(self, schema: dict, definitions_flag: bool = False) -> dict:
 
+    def _analyze_schema (self, schema: dict, definitions_flag: bool = False) -> dict:
 
         for key in list(schema):
 
             if key == '$ref':
                 definitions_flag = True
                 return definitions_flag
-            
-            if key == 'examples' and None in schema[key]:
-                err = ("Schema can not contain list with None values. Jsonref dosen't support it.")
-                self._log.log_and_raise_error(err)
-    
+
             if type(schema[key]) == dict:
                 definitions_flag = self._analyze_schema(schema[key], definitions_flag)
-                
+
+        return definitions_flag
+
+    def _format_schema_for_mongo(self, schema: dict) -> dict:
+        '''
+        The schema uses tags which are not supported by mongo and therefore
+        must be taken care of before setting the schema for mongo.
+        :param dict schema: schema dictionary to clean up for MongoDB.
+        '''
+
+        for key in list(schema):
+
+            if key == 'description':
+                cleaned_description = self._remove_single_quotes_from_description_tag(schema[key])
+                schema[key] = cleaned_description
+
+            if type(schema[key]) == dict:
+                self._format_schema_for_mongo(schema[key])
+
+            if key == 'examples':
+                self._remove_examples(schema)
+
+
             if key == 'default' or key == 'default_values':
                 self._remove_defaults(schema)
-            
-        return definitions_flag
 
+        return schema
+
+    def _dereference_schema(self, schema: dict) -> dict:
+        '''
+        :param dict schema: dictionary containing a schema which uses references.
+        '''
+
+        assert(isinstance(schema, dict)),\
+            "Parameter 'schema' must be a dictionary type"
+
+        base_dir_url = Path(os.path.join(os.getcwd(), "mongo_schema")).as_uri() + '/'
+        # str(schema) renders the dict with single quotes, but jsonref expects
+        # JSON text with double quotes, hence the replace before loading.
+        schema = str(schema).replace("'", "\"")
+        schema = jsonref.loads(schema, base_uri=base_dir_url)
+        schema = deepcopy(schema)
+        #schema.pop('definitions', None)
+        return schema
+
+    def _remove_defaults(self, schema: dict) -> dict:
+        '''
+        :param dict schema: dictionary containing a schema which uses 'default' tags.
+        '''
+        assert(isinstance(schema, dict)),\
+        "Parameter 'schema' must be a dictionary type"
+
+        if 'default' in schema:
+            del schema['default']
+        if 'default_values' in schema:
+            del schema['default_values']
+
+
+    def _remove_examples(self, schema: dict) -> dict:
+        '''
+        :param dict schema: dictionary containing a schema with 'examples' tags.
+        '''
+
+        assert(isinstance(schema, dict)),\
+        "Parameter 'schema' must be a dictionary type"
+
+        if 'examples' in schema:
+            del schema['examples']
+
+        assert(isinstance(schema, dict)),\
+        "Parameter 'schema' must be a dictionary type"
+
+    def _remove_single_quotes_from_description_tag(self, description: str) -> str:
+        '''
+        :param str description: description text to strip single quotes from.
+        '''
+
+        assert(isinstance(description, str)),\
+        "Parameter 'description' must be a string type"
+
+        description = description.replace("'", "")
+
+        return description
 
 
 if __name__ == "__main__":
@@ -413,7 +459,7 @@ if __name__ == "__main__":
 #     Only for testing
 
     schema_path = os.path.join(".", "mongo_schema", "schema_components.json")
-    
+
     if os.path.isfile(schema_path):
 
         parse_obj = ParseJsonSchema(schema_paths=schema_path)
@@ -433,12 +479,5 @@ if __name__ == "__main__":
         allowed_values = parse_obj.get_allowed_values()
 
         descriptions = parse_obj.get_field_descriptions()
-    
-    
 
     
-    
-    
-    
-    
-