|
@@ -240,7 +240,22 @@ class MongodbHandler:
|
|
|
self._database[collection_name].insert_many(data, ordered=ordered)
|
|
|
|
|
|
except Exception as error:
|
|
|
- self._log.log_and_raise_error(('An error occured when trying to insert data into {}, {}. \nError: {}').format(self._database_name, collection_name, error))
|
|
|
+ if len(data) > 1:
|
|
|
+
|
|
|
+ self._log.warning(('An error occured inserting {} documents into database: {} and collection: {}.').format(len(data), self._database_name, collection_name))
|
|
|
+ self._log.warning('This might be because one or more documents are invalid.')
|
|
|
+ self._log.warning('We will try to insert the documents one-by-one and report which are invalid.')
|
|
|
+ self._log.warning(('Error: {}').format(error))
|
|
|
+
|
|
|
+ for row in data:
|
|
|
+
|
|
|
+ try:
|
|
|
+ self._database[collection_name].insert_one(row)
|
|
|
+ except Exception as error:
|
|
|
+ pprint(row)
|
|
|
+ self._log.warning(error)
|
|
|
+ else:
|
|
|
+ self._log.log_and_raise_error(('An error occured when trying to insert data into {}, {}. \nError: {}').format(self._database_name, collection_name, error))
|
|
|
|
|
|
self._log.info(('Data has been inserted into the {} collection').format(collection_name))
|
|
|
|
|
@@ -274,10 +289,13 @@ class MongodbHandler:
|
|
|
|
|
|
except Exception as error:
|
|
|
self._log.log_and_raise_error(('An error occured trying to query data from {}, with query {}: {}:{}. \nError:{}').format(collection_name, attribute, comparison_operator, attribute_value, error))
|
|
|
- if return_as_dataframe:
|
|
|
- return self.convert_mongo_data_into_dataframe(data, index, collection_name)
|
|
|
- else:
|
|
|
- return data
|
|
|
+ return None
|
|
|
+
|
|
|
+ if data.collection.count_documents(query) != 0:
|
|
|
+ if return_as_dataframe:
|
|
|
+ return self.convert_mongo_data_into_dataframe(data, index, collection_name)
|
|
|
+ else:
|
|
|
+ return data
|
|
|
|
|
|
def aggregate_data_and_generate_dataframe(self, collection_name: str, aggregation_pipeline: list, index: str = None):
|
|
|
|
|
@@ -285,6 +303,7 @@ class MongodbHandler:
|
|
|
data = self._database[collection_name].aggregate(pipeline=aggregation_pipeline, allowDiskUse=True)
|
|
|
except Exception as error:
|
|
|
self._log.log_and_raise_error(('A problem occured when aggregating the collection {} with the pipeline {}. \nError: {}').format(collection_name, aggregation_pipeline, error))
|
|
|
+ return None
|
|
|
|
|
|
return self.convert_mongo_data_into_dataframe(data, index, collection_name)
|
|
|
|
|
@@ -312,27 +331,31 @@ class MongodbHandler:
|
|
|
except Exception as error:
|
|
|
self._log.log_and_raise_error(('An error occured trying to convert mongo data into pd.Dataframe. \nError: {} ').format(error))
|
|
|
'''
|
|
|
- frames = []
|
|
|
- records = []
|
|
|
- for iteration, value in enumerate(data):
|
|
|
+ if data.collection.count_documents(query) != 0:
|
|
|
+ frames = []
|
|
|
+ records = []
|
|
|
+ for iteration, value in enumerate(data):
|
|
|
|
|
|
- records.append(value)
|
|
|
- if iteration % chunksize == 0:
|
|
|
- frames.append(pd.DataFrame(records))
|
|
|
- records = []
|
|
|
+ records.append(value)
|
|
|
+ if iteration % chunksize == 0:
|
|
|
+ frames.append(pd.DataFrame(records))
|
|
|
+ records = []
|
|
|
|
|
|
- if records:
|
|
|
- frames.append(pd.DataFrame(records))
|
|
|
- return_df = pd.concat(frames, axis=0, sort=False)
|
|
|
+ if records:
|
|
|
+ frames.append(pd.DataFrame(records))
|
|
|
+ return_df = pd.concat(frames, axis=0, sort=False)
|
|
|
|
|
|
+ if index is not None:
|
|
|
+ return_df.set_index(index, inplace=True)
|
|
|
|
|
|
- print(index)
|
|
|
- if index is not None:
|
|
|
- return_df.set_index(index, inplace=True)
|
|
|
+ self._log.info(('{} Rows were fetched from {}. DataFrame conversion is done, took {} seconds').format(len(return_df.index), collection_name if collection_name is not None else 'the database', time.time()-start_time))
|
|
|
+
|
|
|
+ return return_df
|
|
|
|
|
|
- self._log.info(('{} Rows were fetched from {}. DataFrame conversion is done, took {} seconds').format(len(return_df.index), collection_name if collection_name is not None else 'the database', time.time()-start_time))
|
|
|
+ else:
|
|
|
+ self._log.warning('No data for the query was found when trying to create the dataframe')
|
|
|
+ return None
|
|
|
|
|
|
- return return_df
|
|
|
|
|
|
#def update_data_in_collection(self, query_label: str, query_value: str, update_label:str, update_value: str, collection_name:str):
|
|
|
# self._database[collection_name].update_one({query_label:query_value}, {"$set": {update_label: update_value}})
|