|
@@ -293,22 +293,19 @@ class MongodbHandler:
|
|
|
else:
|
|
|
return data
|
|
|
|
|
|
- def aggregate_data_and_generate_dataframe(self, collection_name: str, aggregation_pipeline: list, index: str = None):
|
|
|
+ def aggregate_data_and_generate_dataframe(self, collection_name: str, aggregation_pipeline: list, index: str = None, return_as_dataframe=True):
|
|
|
|
|
|
try:
|
|
|
data = self._database[collection_name].aggregate(pipeline=aggregation_pipeline, allowDiskUse=True)
|
|
|
except Exception as error:
|
|
|
self._log.log_and_raise_error(('A problem occured when aggregating the collection {} with the pipeline {}. \nError: {}').format(collection_name, aggregation_pipeline, error))
|
|
|
return None
|
|
|
- if data.collection.count_documents(aggregation_pipeline) != 0:
|
|
|
- if return_as_dataframe:
|
|
|
- return self.convert_mongo_data_into_dataframe(data, index, collection_name)
|
|
|
- else:
|
|
|
- return data
|
|
|
+
|
|
|
+ if return_as_dataframe:
|
|
|
+ return self.convert_mongo_data_into_dataframe(data, index, collection_name)
|
|
|
else:
|
|
|
- self._log.warning('No data was found for the query')
|
|
|
- return None
|
|
|
-
|
|
|
+ return data
|
|
|
+
|
|
|
def convert_mongo_data_into_dataframe(self, data, index: str = None, collection_name: str = None, chunksize: int = 500) -> pd.DataFrame():
|
|
|
|
|
|
start_time = time.time()
|