@@ -242,8 +242,23 @@ class MongodbHandler:
self._database[collection_name].insert_many(data, ordered=ordered)
- except pymongo.errors.BulkWriteError as error:
- pprint(error.details)
+ except Exception as error:
+ if len(data) > 1:
+ self._log.warning(('An error occured inserting {} documents into database: {} and collection: {}.').format(len(data), self._database_name, collection_name))
+ self._log.warning('This might be because one or more documents are invalid.')
+ self._log.warning('We will try to insert the documents one-by-one and report which are invalid.')
+ self._log.warning(('Error: {}').format(error))
+ for row in data:
+ try:
+ self._database[collection_name].insert_one(row)
+ except Exception as error:
+ pprint(row)
+ self._log.warning(error)
+ else:
+ self._log.log_and_raise_error(('An error occured when trying to insert data into {}, {}. \nError: {}').format(self._database_name, collection_name, error))
self._log.info(('Data has been inserted into the {} collection').format(collection_name))
@@ -340,10 +355,7 @@ class MongodbHandler:
if records:
- if len(frames) > 1:
- return_df = pd.concat(frames, axis=0, sort=False)
- else:
- return_df = frames[0]
+ return_df = pd.concat(frames, axis=0, sort=False)
if index is not None:
return_df.set_index(index, inplace=True)
@@ -373,10 +385,12 @@ class MongodbHandler:
:param str query_label: label for the query
:param str query_value: value for the query
- self._database[collection_name].update_one({query_label:query_value}, {"$push": {update_label: data}})
- self._log.info(('A document has been pushed into the {} array in the {} collection').format(query_value, collection_name))
+ if type(data) == list:
+ self._database[collection_name].update_one({query_label:query_value}, {"$push": {update_label: {"$each": data}}})
+ self._log.info(('A document has been pushed into the {} array in the {} collection').format(query_value, collection_name))
+ else:
+ self._database[collection_name].update_one({query_label:query_value}, {"$push": {update_label: data}})
+ self._log.info(('A document has been pushed into the {} array in the {} collection').format(query_value, collection_name))
def document_exists(self, collection_name: str, query_label: str, query_value:str):
@@ -401,7 +415,7 @@ class MongodbHandler:
assert(isinstance(collection_name, str)),\
"Parameter 'collection_name' must be a string type"
if return_values is None:
return_values = {'_id': return_id}
@@ -419,15 +433,11 @@ class MongodbHandler:
except Exception as error:
self._log.log_and_raise_error(('An error occured trying to query data from {}, with query {}. \nError:{}').format(collection_name, query, error))
- if data.collection.count_documents(query) != 0:
- if return_as_dataframe:
- return self.convert_mongo_data_into_dataframe(data, index, collection_name)
- else:
- return data
+ if data.collection.count_documents(query) != 0:
+ if return_as_dataframe:
+ return self.convert_mongo_data_into_dataframe(data, index, collection_name)
- self._log.warning(('No data was found for the query: {}, in collection: {}').format(query, collection_name))
- return None
+ return data
def query_oldest_or_newest_date_in_collection(self, collection_name: str, date_label: str, oldest: bool = False):
@@ -506,20 +516,6 @@ class MongodbHandler:
except Exception as error:
self._log.log_and_raise_error(('There was a problem updating data for label: {}, Error: {}').format(update_label, error))
- def which_document_in_list_exists(self, collection_name: str, query_label: str, query_values:list):
- '''
- Checking whether the document in the list exist or not.
- :param str collection_name: collection to add data to
- :param str query_label: label for the query
- :param list query_values: values to see if they exist or not
- '''
- query = {query_label:{'$in': query_values}}
- data = self._database[collection_name].find(query,{query_label:1, '_id':0})
- if data.collection.count_documents(query) != 0:
- return [value[query_label] for value in data]
- else:
- return []
if __name__ == "__main__":