|
@@ -13,11 +13,13 @@ Created on Mon Sep 16 13:27:44 2019
|
|
import simplejson
|
|
import simplejson
|
|
import sys
|
|
import sys
|
|
import os
|
|
import os
|
|
|
|
+import time
|
|
|
|
|
|
import pymongo
|
|
import pymongo
|
|
from pymongo import MongoClient
|
|
from pymongo import MongoClient
|
|
import pandas as pd
|
|
import pandas as pd
|
|
import numpy as np
|
|
import numpy as np
|
|
|
|
+from pprint import pprint
|
|
|
|
|
|
sys.path.append(os.getcwd())
|
|
sys.path.append(os.getcwd())
|
|
from cdplib.log import Log
|
|
from cdplib.log import Log
|
|
@@ -171,7 +173,7 @@ class MongodbHandler:
|
|
command = {
|
|
command = {
|
|
'collMod': collection_name,
|
|
'collMod': collection_name,
|
|
'validator': {
|
|
'validator': {
|
|
- '$jsonSchema': parse_obj.schemas[0]
|
|
|
|
|
|
+ '$jsonSchema': parse_obj.load_and_parse_schema_for_mongodb(schema_path)
|
|
},
|
|
},
|
|
'validationLevel': validation_level,
|
|
'validationLevel': validation_level,
|
|
'validationAction': validation_action
|
|
'validationAction': validation_action
|
|
@@ -239,7 +241,22 @@ class MongodbHandler:
|
|
self._database[collection_name].insert_many(data, ordered=ordered)
|
|
self._database[collection_name].insert_many(data, ordered=ordered)
|
|
|
|
|
|
except Exception as error:
|
|
except Exception as error:
|
|
- self._log.log_and_raise_error(('An error occured when trying to insert data into {}, {}. \nError: {}').format(self._database_name, collection_name, error))
|
|
|
|
|
|
+ if len(data) > 1:
|
|
|
|
+
|
|
|
|
+ self._log.warning(('An error occured inserting {} documents into database: {} and collection: {}.').format(len(data), self._database_name, collection_name))
|
|
|
|
+ self._log.warning('This might be because one or more documents are invalid.')
|
|
|
|
+ self._log.warning('We will try to insert the documents one-by-one and report which are invalid.')
|
|
|
|
+ self._log.warning(('Error: {}').format(error))
|
|
|
|
+
|
|
|
|
+ for row in data:
|
|
|
|
+
|
|
|
|
+ try:
|
|
|
|
+ self._database[collection_name].insert_one(row)
|
|
|
|
+ except Exception as error:
|
|
|
|
+ pprint(row)
|
|
|
|
+ self._log.warning(error)
|
|
|
|
+ else:
|
|
|
|
+ self._log.log_and_raise_error(('An error occured when trying to insert data into {}, {}. \nError: {}').format(self._database_name, collection_name, error))
|
|
|
|
|
|
self._log.info(('Data has been inserted into the {} collection').format(collection_name))
|
|
self._log.info(('Data has been inserted into the {} collection').format(collection_name))
|
|
|
|
|
|
@@ -267,16 +284,22 @@ class MongodbHandler:
|
|
|
|
|
|
try:
|
|
try:
|
|
if attribute == None or attribute_value == None:
|
|
if attribute == None or attribute_value == None:
|
|
- data = self._database[collection_name].find({},return_values)
|
|
|
|
|
|
+ query = {}
|
|
|
|
+ data = self._database[collection_name].find(query,return_values)
|
|
|
|
+
|
|
else:
|
|
else:
|
|
- data = self._database[collection_name].find({attribute: {comparison_operator: attribute_value}}, return_values)
|
|
|
|
|
|
+ query = {attribute: {comparison_operator: attribute_value}}
|
|
|
|
+ data = self._database[collection_name].find(query, return_values)
|
|
|
|
|
|
except Exception as error:
|
|
except Exception as error:
|
|
self._log.log_and_raise_error(('An error occured trying to query data from {}, with query {}: {}:{}. \nError:{}').format(collection_name, attribute, comparison_operator, attribute_value, error))
|
|
self._log.log_and_raise_error(('An error occured trying to query data from {}, with query {}: {}:{}. \nError:{}').format(collection_name, attribute, comparison_operator, attribute_value, error))
|
|
- if return_as_dataframe:
|
|
|
|
- return self.convert_mongo_data_into_dataframe(data, index, collection_name)
|
|
|
|
- else:
|
|
|
|
- return data
|
|
|
|
|
|
+ return None
|
|
|
|
+
|
|
|
|
+ if data.collection.count_documents(query) != 0:
|
|
|
|
+ if return_as_dataframe:
|
|
|
|
+ return self.convert_mongo_data_into_dataframe(data, index, collection_name)
|
|
|
|
+ else:
|
|
|
|
+ return data
|
|
|
|
|
|
def aggregate_data_and_generate_dataframe(self, collection_name: str, aggregation_pipeline: list, index: str = None):
|
|
def aggregate_data_and_generate_dataframe(self, collection_name: str, aggregation_pipeline: list, index: str = None):
|
|
|
|
|
|
@@ -284,11 +307,16 @@ class MongodbHandler:
|
|
data = self._database[collection_name].aggregate(pipeline=aggregation_pipeline, allowDiskUse=True)
|
|
data = self._database[collection_name].aggregate(pipeline=aggregation_pipeline, allowDiskUse=True)
|
|
except Exception as error:
|
|
except Exception as error:
|
|
self._log.log_and_raise_error(('A problem occured when aggregating the collection {} with the pipeline {}. \nError: {}').format(collection_name, aggregation_pipeline, error))
|
|
self._log.log_and_raise_error(('A problem occured when aggregating the collection {} with the pipeline {}. \nError: {}').format(collection_name, aggregation_pipeline, error))
|
|
|
|
+ return None
|
|
|
|
|
|
return self.convert_mongo_data_into_dataframe(data, index, collection_name)
|
|
return self.convert_mongo_data_into_dataframe(data, index, collection_name)
|
|
|
|
|
|
- def convert_mongo_data_into_dataframe(self, data, index: str = None, collection_name: str = None) -> pd.DataFrame():
|
|
|
|
|
|
+ def convert_mongo_data_into_dataframe(self, data, index: str = None, collection_name: str = None, chunksize: int = 500) -> pd.DataFrame():
|
|
|
|
|
|
|
|
+ start_time = time.time()
|
|
|
|
+ '''
|
|
|
|
+ self._log.info('Converting returned mongo data into a DataFrame')
|
|
|
|
+
|
|
data = list(data)
|
|
data = list(data)
|
|
try:
|
|
try:
|
|
if len(data)> 0:
|
|
if len(data)> 0:
|
|
@@ -299,11 +327,38 @@ class MongodbHandler:
|
|
df = pd.DataFrame(data)
|
|
df = pd.DataFrame(data)
|
|
if index is not None:
|
|
if index is not None:
|
|
df.set_index(index, inplace=True)
|
|
df.set_index(index, inplace=True)
|
|
|
|
+
|
|
|
|
+ self._log.info(('DataFrame conversion is done, took {} seconds').format(time.time()-start_time))
|
|
return df
|
|
return df
|
|
else:
|
|
else:
|
|
self._log.warning(('No data for the query was found').format())
|
|
self._log.warning(('No data for the query was found').format())
|
|
except Exception as error:
|
|
except Exception as error:
|
|
self._log.log_and_raise_error(('An error occured trying to convert mongo data into pd.Dataframe. \nError: {} ').format(error))
|
|
self._log.log_and_raise_error(('An error occured trying to convert mongo data into pd.Dataframe. \nError: {} ').format(error))
|
|
|
|
+ '''
|
|
|
|
+
|
|
|
|
+ frames = []
|
|
|
|
+ records = []
|
|
|
|
+ for iteration, value in enumerate(data):
|
|
|
|
+
|
|
|
|
+ records.append(value)
|
|
|
|
+ if iteration + 1 % chunksize == 0:
|
|
|
|
+ frames.append(pd.DataFrame(records))
|
|
|
|
+ records = []
|
|
|
|
+
|
|
|
|
+ if records:
|
|
|
|
+ frames.append(pd.DataFrame(records))
|
|
|
|
+
|
|
|
|
+ return_df = pd.concat(frames, axis=0, sort=False)
|
|
|
|
+
|
|
|
|
+ if index is not None:
|
|
|
|
+ return_df.set_index(index, inplace=True)
|
|
|
|
+
|
|
|
|
+ self._log.info(('{} Rows were fetched from {}. DataFrame conversion is done, took {} seconds').format(len(return_df.index), collection_name if collection_name is not None else 'the database', time.time()-start_time))
|
|
|
|
+
|
|
|
|
+ return return_df
|
|
|
|
+
|
|
|
|
+
|
|
|
|
+
|
|
|
|
|
|
#def update_data_in_collection(self, query_label: str, query_value: str, update_label:str, update_value: str, collection_name:str):
|
|
#def update_data_in_collection(self, query_label: str, query_value: str, update_label:str, update_value: str, collection_name:str):
|
|
# self._database[collection_name].update_one({query_label:query_value}, {"$set": {update_label: update_value}})
|
|
# self._database[collection_name].update_one({query_label:query_value}, {"$set": {update_label: update_value}})
|
|
@@ -338,7 +393,7 @@ class MongodbHandler:
|
|
'''
|
|
'''
|
|
return self._database[collection_name].find({query_label:query_value}).count() > 0
|
|
return self._database[collection_name].find({query_label:query_value}).count() > 0
|
|
|
|
|
|
- def query_data_between_dates_and_generate_dataframe(self, collection_name: str, date_label: str, from_date_value: str, to_date_value: str, index: str = None, return_as_dataframe: bool = True):
|
|
|
|
|
|
+ def query_data_between_dates_and_generate_dataframe(self, collection_name: str, date_label: str, from_date_value: str, to_date_value: str, index: str = None, return_id: bool = False, return_as_dataframe: bool = True):
|
|
'''
|
|
'''
|
|
Queries data between two dates.
|
|
Queries data between two dates.
|
|
|
|
|
|
@@ -349,16 +404,20 @@ class MongodbHandler:
|
|
:param str index:
|
|
:param str index:
|
|
:param bool return_as_dataframe:
|
|
:param bool return_as_dataframe:
|
|
'''
|
|
'''
|
|
|
|
+ assert(isinstance(collection_name, str)),\
|
|
|
|
+ "Parameter 'collection_name' must be a string type"
|
|
try:
|
|
try:
|
|
- data = self._database[collection_name].find({date_label: {'$gt': from_date_value, '$lt': to_date_value}})
|
|
|
|
|
|
+ query = {date_label: {'$gt': from_date_value, '$lt': to_date_value}}
|
|
|
|
+ data = self._database[collection_name].find(query, {'_id': return_id})
|
|
|
|
|
|
except Exception as error:
|
|
except Exception as error:
|
|
- self._log.log_and_raise_error(('An error occured trying to query data from {}, with query {}: $gt:{}, $lt:{}. \nError:{}').format(collection_name, date_label, from_date_value, to_date_value, error))
|
|
|
|
|
|
+ self._log.log_and_raise_error(('An error occured trying to query data from {}, with query {}. \nError:{}').format(collection_name, query, error))
|
|
|
|
|
|
- if return_as_dataframe:
|
|
|
|
- return self.convert_mongo_data_into_dataframe(data, index, collection_name)
|
|
|
|
- else:
|
|
|
|
- return data
|
|
|
|
|
|
+ if data.collection.count_documents(query) != 0:
|
|
|
|
+ if return_as_dataframe:
|
|
|
|
+ return self.convert_mongo_data_into_dataframe(data, index, collection_name)
|
|
|
|
+ else:
|
|
|
|
+ return data
|
|
|
|
|
|
def query_oldest_or_newest_date_in_collection(self, collection_name: str, date_label: str, oldest: bool = False):
|
|
def query_oldest_or_newest_date_in_collection(self, collection_name: str, date_label: str, oldest: bool = False):
|
|
|
|
|
|
@@ -385,21 +444,23 @@ class MongodbHandler:
|
|
try:
|
|
try:
|
|
|
|
|
|
if attribute == None or attribute_value == None:
|
|
if attribute == None or attribute_value == None:
|
|
- data = self._database[collection_name].find({},{'_id': return_id}).sort(sort_label, direction).limit(limit)
|
|
|
|
|
|
+ query = {}
|
|
|
|
+ data = self._database[collection_name].find(query,{'_id': return_id}).sort(sort_label, direction).limit(limit)
|
|
else:
|
|
else:
|
|
- data = self._database[collection_name].find({attribute: {comparison_operator: attribute_value}}, {'_id': return_id}).sort(sort_label, direction).limit(limit)
|
|
|
|
-
|
|
|
|
- if len(list(data)) == 0:
|
|
|
|
- self._log.warning('No data was found for the query')
|
|
|
|
- return None
|
|
|
|
|
|
+ query = {attribute: {comparison_operator: attribute_value}}
|
|
|
|
+ data = self._database[collection_name].find(query, {'_id': return_id}).sort(sort_label, direction).limit(limit)
|
|
|
|
|
|
except Exception as error:
|
|
except Exception as error:
|
|
self._log.log_and_raise_error(('An error occured trying to query data from {}, \nError:{}').format(collection_name, error))
|
|
self._log.log_and_raise_error(('An error occured trying to query data from {}, \nError:{}').format(collection_name, error))
|
|
|
|
|
|
- if return_as_dataframe:
|
|
|
|
- return self.convert_mongo_data_into_dataframe(data, index, collection_name)
|
|
|
|
|
|
+ if data.collection.count_documents(query) != 0:
|
|
|
|
+ if return_as_dataframe:
|
|
|
|
+ return self.convert_mongo_data_into_dataframe(data, index, collection_name)
|
|
|
|
+ else:
|
|
|
|
+ return data
|
|
else:
|
|
else:
|
|
- return data
|
|
|
|
|
|
+ self._log.warning('No data was found for the query')
|
|
|
|
+ return None
|
|
|
|
|
|
def update_data_in_collection(self, update_label:str, update_value: str, collection_name:str, query_label: str = None, query_value: str = None, create_if_not_exist: bool = True, find_query: dict = None, update_many: bool = False):
|
|
def update_data_in_collection(self, update_label:str, update_value: str, collection_name:str, query_label: str = None, query_value: str = None, create_if_not_exist: bool = True, find_query: dict = None, update_many: bool = False):
|
|
|
|
|