|
@@ -301,6 +301,123 @@ class MongodbHandler:
|
|
self._database[collection_name].update_one({query_label:query_value}, {"$set": {update_label: update_value}})
|
|
self._database[collection_name].update_one({query_label:query_value}, {"$set": {update_label: update_value}})
|
|
|
|
|
|
|
|
|
|
|
|
+ def push_data_into_collection(self, data: (dict, list, np.ndarray, pd.DataFrame, pd.Series),
|
|
|
|
+ collection_name: str,
|
|
|
|
+ update_label: str,
|
|
|
|
+ query_label: str,
|
|
|
|
+ query_value: str):
|
|
|
|
+ '''
|
|
|
|
+ Adds data to an exising array within the collection.
|
|
|
|
+
|
|
|
|
+ :param data: data to add
|
|
|
|
+ :param str collection_name: collection to add data to
|
|
|
|
+ :param str update_label: label of the attribute where data is to be added
|
|
|
|
+ :param str query_label: label for the query
|
|
|
|
+ :param str query_value: value for the query
|
|
|
|
+ '''
|
|
|
|
+
|
|
|
|
+
|
|
|
|
+ self._database[collection_name].update_one({query_label:query_value}, {"$push": {update_label: data}})
|
|
|
|
+ self._log.info(('A document has been pushed into the {} array in the {} collection').format(query_value, collection_name))
|
|
|
|
+
|
|
|
|
+ def document_exists(self, collection_name: str, query_label: str, query_value:str):
|
|
|
|
+ '''
|
|
|
|
+ Checking whether the document exists or not.
|
|
|
|
+
|
|
|
|
+ :param str collection_name: collection to add data to
|
|
|
|
+ :param str query_label: label for the query
|
|
|
|
+ :param str query_value: value for the query
|
|
|
|
+ '''
|
|
|
|
+ return self._database[collection_name].find({query_label:query_value}).count() > 0
|
|
|
|
+
|
|
|
|
+ def query_data_between_dates_and_generate_dataframe(self, collection_name: str, date_label: str, from_date_value: str, to_date_value: str, index: str = None, return_as_dataframe: bool = True):
|
|
|
|
+ '''
|
|
|
|
+ Queries data between two dates.
|
|
|
|
+
|
|
|
|
+ :param str collection_name: collection to add data to
|
|
|
|
+ :param str date_label: label of the attribute where the date is stored
|
|
|
|
+ :param str from_date_value: date which
|
|
|
|
+ :param str to_date_value: value for the query
|
|
|
|
+ :param str index:
|
|
|
|
+ :param bool return_as_dataframe:
|
|
|
|
+ '''
|
|
|
|
+ try:
|
|
|
|
+ data = self._database[collection_name].find({date_label: {'$gt': from_date_value, '$lt': to_date_value}})
|
|
|
|
+
|
|
|
|
+ except Exception as error:
|
|
|
|
+ self._log.log_and_raise_error(('An error occured trying to query data from {}, with query {}: $gt:{}, $lt:{}. \nError:{}').format(collection_name, date_label, from_date_value, to_date_value, error))
|
|
|
|
+
|
|
|
|
+ if return_as_dataframe:
|
|
|
|
+ return self.convert_mongo_data_into_dataframe(data, index, collection_name)
|
|
|
|
+ else:
|
|
|
|
+ return data
|
|
|
|
+
|
|
|
|
+ def query_oldest_or_newest_date_in_collection(self, collection_name: str, date_label: str, oldest: bool = False):
|
|
|
|
+
|
|
|
|
+ date = None
|
|
|
|
+ direction = pymongo.ASCENDING if oldest else pymongo.DESCENDING
|
|
|
|
+ try:
|
|
|
|
+
|
|
|
|
+ data = list(self._database[collection_name].find().sort(date_label, direction).limit(1))
|
|
|
|
+ if data:
|
|
|
|
+ date = self.convert_mongo_data_into_dataframe(data, collection_name=collection_name)[date_label].values[0]
|
|
|
|
+ else:
|
|
|
|
+ self._log.warning('No date was found for this query.')
|
|
|
|
+
|
|
|
|
+ except Exception as error:
|
|
|
|
+ self._log.log_and_raise_error(('An error occured trying to query data from {}, finding the oldest: {}, value for: {}. \nError:{}').format(collection_name, oldest, date_label, error))
|
|
|
|
+
|
|
|
|
+ return date
|
|
|
|
+
|
|
|
|
+ def query_with_sorting_and_limit(self, collection_name: str, sort_label: str, limit:int, attribute: str = None,
|
|
|
|
+ attribute_value: str = None, comparison_operator: str = '$eq', ascending=True,
|
|
|
|
+ index = None, return_as_dataframe: bool = True, return_id: bool = False):
|
|
|
|
+
|
|
|
|
+ direction = pymongo.ASCENDING if ascending else pymongo.DESCENDING
|
|
|
|
+ try:
|
|
|
|
+
|
|
|
|
+ if attribute == None or attribute_value == None:
|
|
|
|
+ data = self._database[collection_name].find({},{'_id': return_id}).sort(sort_label, direction).limit(limit)
|
|
|
|
+ else:
|
|
|
|
+ data = self._database[collection_name].find({attribute: {comparison_operator: attribute_value}}, {'_id': return_id}).sort(sort_label, direction).limit(limit)
|
|
|
|
+
|
|
|
|
+ except Exception as error:
|
|
|
|
+ self._log.log_and_raise_error(('An error occured trying to query data from {}, \nError:{}').format(collection_name, error))
|
|
|
|
+
|
|
|
|
+ if return_as_dataframe:
|
|
|
|
+ return self.convert_mongo_data_into_dataframe(data, index, collection_name)
|
|
|
|
+ else:
|
|
|
|
+ return data
|
|
|
|
+
|
|
|
|
+ def update_data_in_collection(self, update_label:str, update_value: str, collection_name:str, query_label: str = None, query_value: str = None, create_if_not_exist: bool = True, find_query: dict = None):
|
|
|
|
+
|
|
|
|
+ if isinstance(update_value, pd.DataFrame):
|
|
|
|
+ update_value = simplejson.loads(update_value.to_json(orient="records",
|
|
|
|
+ date_format="iso"))
|
|
|
|
+
|
|
|
|
+
|
|
|
|
+ print('-$-'*40)
|
|
|
|
+ print(('Data for label: {}').format(update_label))
|
|
|
|
+ print('Size of document:', sys.getsizeof(update_value), 'bytes')
|
|
|
|
+ print('-$-'*40)
|
|
|
|
+
|
|
|
|
+ try:
|
|
|
|
+ if query_label and query_value:
|
|
|
|
+ self._database[collection_name].update_one({query_label:query_value}, {"$set": {update_label: update_value}}, upsert=create_if_not_exist)
|
|
|
|
+
|
|
|
|
+ elif find_query:
|
|
|
|
+ self._database[collection_name].update_one(find_query, {"$set": {update_label: update_value}}, upsert=create_if_not_exist)
|
|
|
|
+
|
|
|
|
+ else:
|
|
|
|
+ self._database[collection_name].update_one({}, {"$set": {update_label: update_value}}, upsert=create_if_not_exist)
|
|
|
|
+
|
|
|
|
+ self._log.info(('Data for label: {} has been updated').format(update_label))
|
|
|
|
+
|
|
|
|
+ except Exception as error:
|
|
|
|
+ self._log.log_and_raise_error(('There was a problem updating data for label: {}, Error: {}').format(update_label, error))
|
|
|
|
+
|
|
|
|
+
|
|
|
|
+
|
|
if __name__ == "__main__":
|
|
if __name__ == "__main__":
|
|
|
|
|
|
log = Log("Test MongodbHandler:")
|
|
log = Log("Test MongodbHandler:")
|