|
@@ -242,8 +242,23 @@ class MongodbHandler:
|
|
|
else:
|
|
|
self._database[collection_name].insert_many(data, ordered=ordered)
|
|
|
|
|
|
- except pymongo.errors.BulkWriteError as error:
|
|
|
- pprint(error.details)
|
|
|
+ except Exception as error:
|
|
|
+ if len(data) > 1:
|
|
|
+
|
|
|
+ self._log.warning(('An error occured inserting {} documents into database: {} and collection: {}.').format(len(data), self._database_name, collection_name))
|
|
|
+ self._log.warning('This might be because one or more documents are invalid.')
|
|
|
+ self._log.warning('We will try to insert the documents one-by-one and report which are invalid.')
|
|
|
+ self._log.warning(('Error: {}').format(error))
|
|
|
+
|
|
|
+ for row in data:
|
|
|
+
|
|
|
+ try:
|
|
|
+ self._database[collection_name].insert_one(row)
|
|
|
+ except Exception as error:
|
|
|
+ pprint(row)
|
|
|
+ self._log.warning(error)
|
|
|
+ else:
|
|
|
+ self._log.log_and_raise_error(('An error occured when trying to insert data into {}, {}. \nError: {}').format(self._database_name, collection_name, error))
|
|
|
|
|
|
self._log.info(('Data has been inserted into the {} collection').format(collection_name))
|
|
|
|
|
@@ -340,10 +355,7 @@ class MongodbHandler:
|
|
|
if records:
|
|
|
frames.append(pd.DataFrame(records))
|
|
|
|
|
|
- if len(frames) > 1:
|
|
|
- return_df = pd.concat(frames, axis=0, sort=False)
|
|
|
- else:
|
|
|
- return_df = frames[0]
|
|
|
+ return_df = pd.concat(frames, axis=0, sort=False)
|
|
|
|
|
|
if index is not None:
|
|
|
return_df.set_index(index, inplace=True)
|
|
@@ -373,10 +385,12 @@ class MongodbHandler:
|
|
|
:param str query_label: label for the query
|
|
|
:param str query_value: value for the query
|
|
|
'''
|
|
|
-
|
|
|
-
|
|
|
- self._database[collection_name].update_one({query_label:query_value}, {"$push": {update_label: data}})
|
|
|
- self._log.info(('A document has been pushed into the {} array in the {} collection').format(query_value, collection_name))
|
|
|
+ if type(data) == list:
|
|
|
+ self._database[collection_name].update_one({query_label:query_value}, {"$push": {update_label: {"$each": data}}})
|
|
|
+ self._log.info(('A document has been pushed into the {} array in the {} collection').format(query_value, collection_name))
|
|
|
+ else:
|
|
|
+ self._database[collection_name].update_one({query_label:query_value}, {"$push": {update_label: data}})
|
|
|
+ self._log.info(('A document has been pushed into the {} array in the {} collection').format(query_value, collection_name))
|
|
|
|
|
|
def document_exists(self, collection_name: str, query_label: str, query_value:str):
|
|
|
'''
|
|
@@ -388,7 +402,7 @@ class MongodbHandler:
|
|
|
'''
|
|
|
return self._database[collection_name].find({query_label:query_value}).count() > 0
|
|
|
|
|
|
- def query_data_between_dates_and_generate_dataframe(self, collection_name: str, date_label: str = None, from_date_value: str = None, to_date_value: str = None, index: str = None, return_id: bool = False, return_as_dataframe: bool = True, find_query: dict = None):
|
|
|
+ def query_data_between_dates_and_generate_dataframe(self, collection_name: str, date_label: str, from_date_value: str, to_date_value: str, index: str = None, return_id: bool = False, return_as_dataframe: bool = True):
|
|
|
'''
|
|
|
Queries data between two dates.
|
|
|
|
|
@@ -401,30 +415,18 @@ class MongodbHandler:
|
|
|
'''
|
|
|
assert(isinstance(collection_name, str)),\
|
|
|
"Parameter 'collection_name' must be a string type"
|
|
|
+ try:
|
|
|
+ query = {date_label: {'$gt': from_date_value, '$lt': to_date_value}}
|
|
|
+ data = self._database[collection_name].find(query, {'_id': return_id})
|
|
|
|
|
|
- if date_label and from_date_value and to_date_value or find_query:
|
|
|
-
|
|
|
- try:
|
|
|
-
|
|
|
- if find_query:
|
|
|
- query = find_query
|
|
|
- else:
|
|
|
- query = {date_label: {'$gt': from_date_value, '$lt': to_date_value}}
|
|
|
-
|
|
|
- data = self._database[collection_name].find(query, {'_id': return_id})
|
|
|
-
|
|
|
- except Exception as error:
|
|
|
- self._log.log_and_raise_error(('An error occured trying to query data from {}, with query {}. \nError:{}').format(collection_name, query, error))
|
|
|
+ except Exception as error:
|
|
|
+ self._log.log_and_raise_error(('An error occured trying to query data from {}, with query {}. \nError:{}').format(collection_name, query, error))
|
|
|
|
|
|
- if data.collection.count_documents(query) != 0:
|
|
|
- if return_as_dataframe:
|
|
|
- return self.convert_mongo_data_into_dataframe(data, index, collection_name)
|
|
|
- else:
|
|
|
- return data
|
|
|
+ if data.collection.count_documents(query) != 0:
|
|
|
+ if return_as_dataframe:
|
|
|
+ return self.convert_mongo_data_into_dataframe(data, index, collection_name)
|
|
|
else:
|
|
|
- self._log.warning(('No data was found for the query: {}, in collection: {}').format(query, collection_name))
|
|
|
- return None
|
|
|
-
|
|
|
+ return data
|
|
|
|
|
|
def query_oldest_or_newest_date_in_collection(self, collection_name: str, date_label: str, oldest: bool = False):
|
|
|
|
|
@@ -503,20 +505,6 @@ class MongodbHandler:
|
|
|
except Exception as error:
|
|
|
self._log.log_and_raise_error(('There was a problem updating data for label: {}, Error: {}').format(update_label, error))
|
|
|
|
|
|
- def which_document_in_list_exists(self, collection_name: str, query_label: str, query_values:list):
|
|
|
- '''
|
|
|
- Checking whether the document in the list exist or not.
|
|
|
-
|
|
|
- :param str collection_name: collection to add data to
|
|
|
- :param str query_label: label for the query
|
|
|
- :param list query_values: values to see if they exist or not
|
|
|
- '''
|
|
|
- query = {query_label:{'$in': query_values}}
|
|
|
- data = self._database[collection_name].find(query,{query_label:1, '_id':0})
|
|
|
- if data.collection.count_documents(query) != 0:
|
|
|
- return [value[query_label] for value in data]
|
|
|
- else:
|
|
|
- return []
|
|
|
|
|
|
|
|
|
if __name__ == "__main__":
|