|
@@ -20,7 +20,6 @@ import pandas as pd
|
|
|
import numpy as np
|
|
|
|
|
|
sys.path.append(os.getcwd())
|
|
|
-from pprint import pprint
|
|
|
from cdplib.log import Log
|
|
|
from cdplib.db_migration.ParseJsonSchema import ParseJsonSchema
|
|
|
from cdplib.Singleton_Threadsafe import SingletonThreadsafe
|
|
@@ -90,7 +89,7 @@ class MongodbHandler:
|
|
|
self._client.close()
|
|
|
except Exception as e:
|
|
|
self._log.log_and_raise_error(('An error occured when trying to dispose the SQL engine. Error: {}').format(e))
|
|
|
-
|
|
|
+
|
|
|
|
|
|
def set_database(self, database_name: str):
|
|
|
'''
|
|
@@ -110,7 +109,7 @@ class MongodbHandler:
|
|
|
self._client.drop_database(self._database_name)
|
|
|
except Exception as error:
|
|
|
self._log.log_and_raise_error(('Couldnt drop the database. Error: {}').format(error))
|
|
|
-
|
|
|
+
|
|
|
|
|
|
def drop_collection(self, collection_name: str):
|
|
|
'''
|
|
@@ -121,7 +120,7 @@ class MongodbHandler:
|
|
|
self._log.warning(('Couldnt drop the collection {}. Error: {}').format(collection_name, return_message['errmsg']))
|
|
|
except Exception as error:
|
|
|
self._log.log_and_raise_error(('Couldnt drop the collection {}. Error: {}').format(collection_name, error))
|
|
|
-
|
|
|
+
|
|
|
def create_index(self, collection_name: str, key: (str, int, list), direction: (str, int)='text'):
|
|
|
'''
|
|
|
:param str collection_name: name on the collection for which the schema will be set.
|
|
@@ -140,15 +139,15 @@ class MongodbHandler:
|
|
|
assert(isinstance(direction, str)),\
|
|
|
"Parameter 'direction' must be a string type"
|
|
|
if type(key) == list:
|
|
|
-
|
|
|
+
|
|
|
key_list=[]
|
|
|
for item in key:
|
|
|
key_list.append((item,direction))
|
|
|
-
|
|
|
+
|
|
|
self._database[collection_name].create_index(key_list,name='_'.join(key))
|
|
|
else:
|
|
|
self._database[collection_name].create_index([(key, direction)], name=key)
|
|
|
-
|
|
|
+
|
|
|
|
|
|
def set_collection_schema(self, collection_name: str, schema_path: str,
|
|
|
validation_level: str = 'moderate',validation_action: str = 'error'):
|
|
@@ -166,14 +165,13 @@ class MongodbHandler:
|
|
|
"Parameter 'validation_level' must be a string type"
|
|
|
assert(isinstance(validation_action, str)),\
|
|
|
"Parameter 'validation_action' must be a string type"
|
|
|
-
|
|
|
+
|
|
|
parse_obj = ParseJsonSchema(schema_paths=schema_path)
|
|
|
|
|
|
- schema = parse_obj.read_schema_and_parse_for_mongodb(schema_path)
|
|
|
command = {
|
|
|
'collMod': collection_name,
|
|
|
'validator': {
|
|
|
- '$jsonSchema': schema
|
|
|
+ '$jsonSchema': parse_obj.schemas[0]
|
|
|
},
|
|
|
'validationLevel': validation_level,
|
|
|
'validationAction': validation_action
|
|
@@ -241,19 +239,7 @@ class MongodbHandler:
|
|
|
self._database[collection_name].insert_many(data, ordered=ordered)
|
|
|
|
|
|
except Exception as error:
|
|
|
- if len(data) > 1:
|
|
|
- self._log.warning(('An error occured inserting {} documents into database: {} and collection: {}.').format(len(data), self._database_name, collection_name))
|
|
|
- self._log.warning('This might be because one or more documents are invalid.')
|
|
|
- self._log.warning('We will try to insert the documents one-by-one and report which are invalid.')
|
|
|
- self._log.warning(('Error: {}').format(error))
|
|
|
- for row in data:
|
|
|
- try:
|
|
|
- self._database[collection_name].insert_one(row)
|
|
|
- except Exception as error:
|
|
|
- pprint(row)
|
|
|
- self._log.warning(error)
|
|
|
- else:
|
|
|
- self._log.log_and_raise_error(('An error occured when trying to insert data into {}, {}. \nError: {}').format(self._database_name, collection_name, error))
|
|
|
+ self._log.log_and_raise_error(('An error occured when trying to insert data into {}, {}. \nError: {}').format(self._database_name, collection_name, error))
|
|
|
|
|
|
self._log.info(('Data has been inserted into the {} collection').format(collection_name))
|
|
|
|
|
@@ -281,25 +267,16 @@ class MongodbHandler:
|
|
|
|
|
|
try:
|
|
|
if attribute == None or attribute_value == None:
|
|
|
- query = {}
|
|
|
+ data = self._database[collection_name].find({},return_values)
|
|
|
else:
|
|
|
- query = {attribute: {comparison_operator: attribute_value}}
|
|
|
-
|
|
|
- data = self._database[collection_name].find(query, return_values)
|
|
|
+ data = self._database[collection_name].find({attribute: {comparison_operator: attribute_value}}, return_values)
|
|
|
|
|
|
except Exception as error:
|
|
|
self._log.log_and_raise_error(('An error occured trying to query data from {}, with query {}: {}:{}. \nError:{}').format(collection_name, attribute, comparison_operator, attribute_value, error))
|
|
|
- return None
|
|
|
-
|
|
|
-
|
|
|
- if data.collection.count_documents(query) != 0:
|
|
|
- if return_as_dataframe:
|
|
|
- return self.convert_mongo_data_into_dataframe(data, index, collection_name)
|
|
|
- else:
|
|
|
- return data
|
|
|
+ if return_as_dataframe:
|
|
|
+ return self.convert_mongo_data_into_dataframe(data, index, collection_name)
|
|
|
else:
|
|
|
- self._log.warning(('No data for the query was found').format())
|
|
|
- return None
|
|
|
+ return data
|
|
|
|
|
|
def aggregate_data_and_generate_dataframe(self, collection_name: str, aggregation_pipeline: list, index: str = None):
|
|
|
|
|
@@ -307,7 +284,6 @@ class MongodbHandler:
|
|
|
data = self._database[collection_name].aggregate(pipeline=aggregation_pipeline, allowDiskUse=True)
|
|
|
except Exception as error:
|
|
|
self._log.log_and_raise_error(('A problem occured when aggregating the collection {} with the pipeline {}. \nError: {}').format(collection_name, aggregation_pipeline, error))
|
|
|
- return None
|
|
|
|
|
|
return self.convert_mongo_data_into_dataframe(data, index, collection_name)
|
|
|
|
|
@@ -325,8 +301,7 @@ class MongodbHandler:
|
|
|
df.set_index(index, inplace=True)
|
|
|
return df
|
|
|
else:
|
|
|
- self._log.warning(('No data for the query was found when trying to create the dataframe').format())
|
|
|
- return None
|
|
|
+ self._log.warning(('No data for the query was found').format())
|
|
|
except Exception as error:
|
|
|
self._log.log_and_raise_error(('An error occured trying to convert mongo data into pd.Dataframe. \nError: {} ').format(error))
|
|
|
|
|
@@ -336,7 +311,7 @@ class MongodbHandler:
|
|
|
|
|
|
def push_data_into_collection(self, data: (dict, list, np.ndarray, pd.DataFrame, pd.Series),
|
|
|
collection_name: str,
|
|
|
- update_label: str,
|
|
|
+ update_label: str,
|
|
|
query_label: str,
|
|
|
query_value: str):
|
|
|
'''
|
|
@@ -371,8 +346,8 @@ class MongodbHandler:
|
|
|
:param str date_label: label of the attribute where the date is stored
|
|
|
:param str from_date_value: date which
|
|
|
:param str to_date_value: value for the query
|
|
|
- :param str index:
|
|
|
- :param bool return_as_dataframe:
|
|
|
+ :param str index:
|
|
|
+ :param bool return_as_dataframe:
|
|
|
'''
|
|
|
try:
|
|
|
data = self._database[collection_name].find({date_label: {'$gt': from_date_value, '$lt': to_date_value}})
|
|
@@ -386,11 +361,11 @@ class MongodbHandler:
|
|
|
return data
|
|
|
|
|
|
def query_oldest_or_newest_date_in_collection(self, collection_name: str, date_label: str, oldest: bool = False):
|
|
|
-
|
|
|
+
|
|
|
date = None
|
|
|
direction = pymongo.ASCENDING if oldest else pymongo.DESCENDING
|
|
|
try:
|
|
|
-
|
|
|
+
|
|
|
data = list(self._database[collection_name].find().sort(date_label, direction).limit(1))
|
|
|
if data:
|
|
|
date = self.convert_mongo_data_into_dataframe(data, collection_name=collection_name)[date_label].values[0]
|
|
@@ -405,7 +380,7 @@ class MongodbHandler:
|
|
|
def query_with_sorting_and_limit(self, collection_name: str, sort_label: str, limit:int, attribute: str = None,
|
|
|
attribute_value: str = None, comparison_operator: str = '$eq', ascending=True,
|
|
|
index = None, return_as_dataframe: bool = True, return_id: bool = False):
|
|
|
-
|
|
|
+
|
|
|
direction = pymongo.ASCENDING if ascending else pymongo.DESCENDING
|
|
|
try:
|
|
|
|
|
@@ -413,7 +388,7 @@ class MongodbHandler:
|
|
|
data = self._database[collection_name].find({},{'_id': return_id}).sort(sort_label, direction).limit(limit)
|
|
|
else:
|
|
|
data = self._database[collection_name].find({attribute: {comparison_operator: attribute_value}}, {'_id': return_id}).sort(sort_label, direction).limit(limit)
|
|
|
-
|
|
|
+
|
|
|
if len(list(data)) == 0:
|
|
|
self._log.warning('No data was found for the query')
|
|
|
return None
|
|
@@ -427,7 +402,7 @@ class MongodbHandler:
|
|
|
return data
|
|
|
|
|
|
def update_data_in_collection(self, update_label:str, update_value: str, collection_name:str, query_label: str = None, query_value: str = None, create_if_not_exist: bool = True, find_query: dict = None, update_many: bool = False):
|
|
|
-
|
|
|
+
|
|
|
if isinstance(update_value, pd.DataFrame):
|
|
|
update_value = simplejson.loads(update_value.to_json(orient="records",
|
|
|
date_format="iso"))
|
|
@@ -435,7 +410,7 @@ class MongodbHandler:
|
|
|
if update_many:
|
|
|
if query_label and query_value:
|
|
|
self._database[collection_name].update_many({query_label:query_value}, {"$set": {update_label: update_value}}, upsert=create_if_not_exist)
|
|
|
-
|
|
|
+
|
|
|
elif find_query:
|
|
|
self._database[collection_name].update_many(find_query, {"$set": {update_label: update_value}}, upsert=create_if_not_exist)
|
|
|
|
|
@@ -445,7 +420,7 @@ class MongodbHandler:
|
|
|
else:
|
|
|
if query_label and query_value:
|
|
|
self._database[collection_name].update_one({query_label:query_value}, {"$set": {update_label: update_value}}, upsert=create_if_not_exist)
|
|
|
-
|
|
|
+
|
|
|
elif find_query:
|
|
|
self._database[collection_name].update_one(find_query, {"$set": {update_label: update_value}}, upsert=create_if_not_exist)
|
|
|
|
|
@@ -457,7 +432,7 @@ class MongodbHandler:
|
|
|
except Exception as error:
|
|
|
self._log.log_and_raise_error(('There was a problem updating data for label: {}, Error: {}').format(update_label, error))
|
|
|
|
|
|
-
|
|
|
+
|
|
|
|
|
|
if __name__ == "__main__":
|
|
|
|