|
- #!/usr/bin/env python3
- # -*- coding: utf-8 -*-
- """
- Created on Mon Sep 16 13:27:44 2019
- @author: oskar
- @description: Implementation of a database handler for abstraction of the mongodb.
- """
- import simplejson
- import sys
- import os
- import time
- import pymongo
- from pymongo import MongoClient
- import pandas as pd
- import numpy as np
- from pprint import pprint
- sys.path.append(os.getcwd())
- from cdplib.log import Log
- from cdplib.db_migration.ParseJsonSchema import ParseJsonSchema
- from cdplib.Singleton_Threadsafe import SingletonThreadsafe
class MongodbHandlerPool(metaclass=SingletonThreadsafe):
    '''
    Pool of reusable :class:`MongodbHandler` objects.

    Handlers are created eagerly with their default arguments, handed out
    with :meth:`aquire` and given back with :meth:`release`.

    NOTE(review): the class is built with the ``SingletonThreadsafe``
    metaclass, so presumably only one pool exists per process - confirm
    against the metaclass implementation.
    '''

    def __init__(self, size: int = 1):
        '''
        :param int size: number of handlers created up front; also the
            maximum number of handlers kept by :meth:`release`.
        '''
        self._size = size
        # The pool needs its own logger: there is no module-level ``log``
        # unless this file is executed as a script.
        self._log = Log("Mongodb Handler Pool")
        self._mongodb_handlers = [MongodbHandler() for _ in range(size)]

    def aquire(self):
        '''
        Take a handler out of the pool.

        When the pool is empty it is refilled with new handlers first and a
        warning is logged, since an empty pool usually means handlers were
        not released.

        :return: a :class:`MongodbHandler` that is no longer tracked by the
            pool until it is passed to :meth:`release`.
        '''
        if not self._mongodb_handlers:
            # Always create at least one handler so that a pool configured
            # with size 0 cannot spin forever or pop from an empty list.
            refill_count = max(self._size, 1)
            self._mongodb_handlers = [MongodbHandler() for _ in range(refill_count)]
            self._log.warning(("Ran out of Mongodb handlers, {} more have been added. "
                               "Are you sure you've returned yours?").format(refill_count))
        return self._mongodb_handlers.pop()

    # Correctly spelled alias; ``aquire`` is kept for existing callers.
    acquire = aquire

    def release(self, mongodb_handler):
        '''
        Give a handler back to the pool.

        The handler is only stored while the pool holds fewer than ``size``
        handlers; otherwise it is not kept.

        :param MongodbHandler mongodb_handler: handler to return.
        '''
        if len(self._mongodb_handlers) < self._size:
            self._mongodb_handlers.append(mongodb_handler)
class MongodbHandler:
    '''
    Convenience wrapper around a pymongo ``MongoClient`` bound to one database.

    It bundles collection management (create, drop, index, schema
    validation), inserts and updates, and queries whose results can be
    returned as pandas DataFrames.
    '''

    def __init__(self, database_url: str = None,
                 database_name: str = None):
        '''
        :param str database_url: Url for the mongodb database. When omitted it
            is built from the ``MONGO`` section of the project configuration.
        :param str database_name: Name of the database the database handler
            should handle. When omitted it is read from the project
            configuration.
        '''
        if database_url is None or database_name is None:
            # Imported lazily, and whenever either argument is missing, so the
            # configuration is available for both fallbacks.
            from libraries.configuration import default as cfg

            if database_url is None:
                database_url = "mongodb://{0}:{1}@{2}:{3}"\
                    .format(cfg["MONGO"]["MONGO_USER"],
                            cfg["MONGO"]["MONGO_PASSWORD"],
                            cfg["MONGO"]["MONGO_HOST"],
                            cfg["MONGO"]["MONGO_PORT"])
            if database_name is None:
                database_name = cfg["MONGO"]["MONGO_DATABASE_NAME"]

        assert isinstance(database_url, str),\
            "Parameter 'database_url' must be a string type"
        assert isinstance(database_name, str),\
            "Parameter 'database_name' must be a string type"

        self._log = Log("Mongodb Handler")

        # Connect to the MongoDB server.
        self._client = MongoClient(database_url)
        # Select the database; MongoDB creates it lazily on first write.
        self._database = self._client[database_name]
        self._database_name = database_name

    def __del__(self):
        '''
        Close the client connection when the handler is garbage collected.

        Nothing is raised from here: the attributes may be missing when
        ``__init__`` failed part-way, and exceptions raised inside
        ``__del__`` are ignored by the interpreter anyway.
        '''
        client = getattr(self, '_client', None)
        if client is None:
            return
        try:
            client.close()
        except Exception as error:
            log = getattr(self, '_log', None)
            if log is not None:
                log.warning(('An error occured when trying to close the MongoDB client. Error: {}').format(error))

    def set_database(self, database_name: str):
        '''
        Switch the handler to another database of the same client.

        :param str database_name: Name of the database.
        '''
        assert isinstance(database_name, str),\
            "Parameter 'database_name' must be a string type"

        if database_name not in self._client.list_database_names():
            self._log.info(('Database: {} didnt exist, it will be created for you once a collection is created in it').format(database_name))
        self._database = self._client[database_name]
        # Keep the name in sync, otherwise drop_database() would drop the
        # previously selected database.
        self._database_name = database_name

    def drop_database(self):
        '''
        Drop the database the handler is currently bound to.
        '''
        try:
            self._client.drop_database(self._database_name)
        except Exception as error:
            self._log.log_and_raise_error(('Couldnt drop the database. Error: {}').format(error))

    def drop_collection(self, collection_name: str):
        '''
        Drop a collection of the current database.

        A server response containing ``errmsg`` is logged as a warning;
        exceptions are passed to the logger's ``log_and_raise_error``.

        :param str collection_name: name of the collection to drop.
        '''
        try:
            return_message = self._database.drop_collection(collection_name)
            if 'errmsg' in return_message:
                self._log.warning(('Couldnt drop the collection {}. Error: {}').format(collection_name, return_message['errmsg']))
        except Exception as error:
            self._log.log_and_raise_error(('Couldnt drop the collection {}. Error: {}').format(collection_name, error))

    def create_index(self, collection_name: str, key: (str, int, list), direction: (str, int)='text'):
        '''
        Create an index on one key or a compound index on a list of keys.

        :param str collection_name: name on the collection for which the index will be created.
        :param key: Which value should be used as the index; a list creates a
            compound index in which every key uses the same ``direction``.
        :param direction: one of 1, -1, '2d', 'geoHaystack', '2dsphere',
            'hashed', 'text'; see
            https://api.mongodb.com/python/current/api/pymongo/collection.html for reference.
        '''
        allowed_directions = [1, -1, '2d', 'geoHaystack', '2dsphere', 'hashed', 'text']

        assert isinstance(collection_name, str),\
            "Parameter 'collection_name' must be a string type"
        assert isinstance(key, (str, int, list)),\
            "Parameter 'key' must be a string, integer or list type"
        # 1 and -1 are valid directions, so integers must be accepted too.
        assert isinstance(direction, (str, int)),\
            "Parameter 'direction' must be a string or integer type"
        assert direction in allowed_directions,\
            "Parameter 'direction' must be one of these values: 1, -1, '2d', 'geoHaystack', '2dsphere', 'hashed', 'text' "

        collection = self._database[collection_name]
        if isinstance(key, list):
            key_list = [(item, direction) for item in key]
            # str() keeps the join working when the list holds non-string keys.
            index_name = '_'.join(str(item) for item in key)
            collection.create_index(key_list, name=index_name)
        else:
            collection.create_index([(key, direction)], name=str(key))

    def set_collection_schema(self, collection_name: str, schema_path: str,
                              validation_level: str = 'moderate', validation_action: str = 'error'):
        '''
        Attach a ``$jsonSchema`` validator to an existing collection via ``collMod``.

        :param str collection_name: name on the collection for which the schema will be set.
        :param str schema_path: path to the schema file.
        :param str validation_level: level of validation done by the mongodb.
        :param str validation_action: what will happen upon validation error, warning or error message.
        '''
        assert isinstance(collection_name, str),\
            "Parameter 'collection_name' must be a string type"
        assert isinstance(schema_path, str),\
            "Parameter 'schema_path' must be a string type"
        assert isinstance(validation_level, str),\
            "Parameter 'validation_level' must be a string type"
        assert isinstance(validation_action, str),\
            "Parameter 'validation_action' must be a string type"

        parse_obj = ParseJsonSchema(schema_paths=schema_path)

        # 'collMod' must stay the first key: it names the command to run.
        command = {
            'collMod': collection_name,
            'validator': {
                '$jsonSchema': parse_obj.load_and_parse_schema_for_mongodb(schema_path)
            },
            'validationLevel': validation_level,
            'validationAction': validation_action
        }

        try:
            self._database.command(command)
        except Exception as error:
            self._log.log_and_raise_error(('An error occured when trying to set a schema for the collection: {}. \nError: {}').format(collection_name, error))

    def create_collection(self, collection_name):
        '''
        Create a collection unless one with that name already exists.

        :param str collection_name: name of the collection to be created.
        '''
        assert isinstance(collection_name, str),\
            "Parameter 'collection_name' must be a string type"

        if collection_name in self._database.list_collection_names():
            self._log.info(("Collection '{}' already exists").format(collection_name))
            return

        try:
            self._database.create_collection(collection_name)
            # Report success only after the collection really was created.
            self._log.info(("Collection '{}' has been created").format(collection_name))
        except Exception as error:
            self._log.log_and_raise_error(('An error occured while creating the new collection: {}. \nError: {}').format(collection_name, error))

    def insert_data_into_collection(self, data: (dict, list, np.ndarray, pd.DataFrame, pd.Series),
                                    collection_name: str,
                                    ordered: bool = False):
        '''
        Insert one or many documents into a collection.

        Arrays and DataFrames are converted to a list of records, a Series to
        a single document (both through a JSON round trip with ISO dates).
        When a bulk insert fails, the documents are inserted one by one so the
        failing ones can be reported.

        :param data: the document(s) to be inserted in the collection.
        :param str collection_name: name of the collection the data will be added to.
        :param bool ordered: forwarded to ``insert_many``.
        '''
        allowed_types = (dict, list, np.ndarray, pd.DataFrame, pd.Series)
        assert isinstance(data, allowed_types),\
            "Parameter 'data' is of invalid type"

        if isinstance(data, np.ndarray):
            data = pd.DataFrame(data)

        if isinstance(data, pd.DataFrame):
            data = simplejson.loads(data.to_json(orient="records",
                                                 date_format="iso"))
        elif isinstance(data, pd.Series):
            data = simplejson.loads(data.to_json(date_format="iso"))

        # From here on ``data`` is either one document (dict) or a list.
        if isinstance(data, list) and not data:
            self._log.warning(('No data to insert into the {} collection').format(collection_name))
            return

        try:
            if isinstance(data, dict):
                self._database[collection_name].insert_one(data)
            elif len(data) == 1:
                self._database[collection_name].insert_one(data[0])
            else:
                self._database[collection_name].insert_many(data, ordered=ordered)
        except Exception as error:
            # Only a list of several documents can be retried one-by-one;
            # iterating a dict would try to insert its keys.
            if isinstance(data, list) and len(data) > 1:
                self._log.warning(('An error occured inserting {} documents into database: {} and collection: {}.').format(len(data), self._database_name, collection_name))
                self._log.warning('This might be because one or more documents are invalid.')
                self._log.warning('We will try to insert the documents one-by-one and report which are invalid.')
                self._log.warning(('Error: {}').format(error))

                # NOTE(review): documents already written by the failed bulk
                # insert will presumably be reported here as duplicates -
                # confirm against the pymongo insert_many semantics.
                for row in data:
                    try:
                        self._database[collection_name].insert_one(row)
                    except Exception as row_error:
                        pprint(row)
                        self._log.warning(row_error)
            else:
                self._log.log_and_raise_error(('An error occured when trying to insert data into {}, {}. \nError: {}').format(self._database_name, collection_name, error))

        self._log.info(('Data has been inserted into the {} collection').format(collection_name))

    def create_collection_and_set_schema(self, collection_name: str, schema_path: str):
        '''
        Create the collection (if needed) and attach the schema validator.

        :param str collection_name: name of the collection to be created.
        :param str schema_path: path to the schema file.
        '''
        assert isinstance(collection_name, str),\
            "Parameter 'collection_name' must be a string type"
        assert isinstance(schema_path, str),\
            "Parameter 'schema_path' must be a string type"

        self.create_collection(collection_name)
        self.set_collection_schema(collection_name=collection_name, schema_path=schema_path)

    def _build_query_result(self, cursor, query: dict, index, collection_name: str,
                            return_as_dataframe: bool):
        '''
        Shared tail of the query methods: turn a cursor into the caller's result.

        :return: ``None`` when no document matches ``query``, otherwise a
            DataFrame or the cursor itself, depending on ``return_as_dataframe``.
        '''
        if cursor.collection.count_documents(query) == 0:
            self._log.warning('No data was found for the query')
            return None
        if return_as_dataframe:
            return self.convert_mongo_data_into_dataframe(cursor, index, collection_name)
        return cursor

    def query_data_and_generate_dataframe(self, collection_name: str, attribute: str = None,
                                          attribute_value: str = None, comparison_operator: str = '$eq',
                                          index = None, return_as_dataframe: bool = True, return_id: bool = False,
                                          find_query: dict = None, return_values: dict = None):
        '''
        Query a collection and return the matching documents.

        The filter is ``{attribute: {comparison_operator: attribute_value}}``
        when both ``attribute`` and ``attribute_value`` are given; otherwise
        ``find_query`` is used, or an empty filter when that is missing too.

        :param str collection_name: collection to query.
        :param str attribute: field to compare.
        :param attribute_value: value the field is compared with.
        :param str comparison_operator: MongoDB comparison operator.
        :param index: column used as the DataFrame index.
        :param bool return_as_dataframe: return a DataFrame instead of the cursor.
        :param bool return_id: whether ``_id`` is part of the default projection.
        :param dict find_query: filter used when no attribute/value pair is given.
        :param dict return_values: projection; defaults to ``{'_id': return_id}``.
        :return: DataFrame or cursor, ``None`` when nothing matches.
        '''
        if return_values is None:
            return_values = {'_id': return_id}

        if attribute is None or attribute_value is None:
            query = {} if find_query is None else find_query
        else:
            query = {attribute: {comparison_operator: attribute_value}}

        try:
            data = self._database[collection_name].find(query, return_values)
        except Exception as error:
            self._log.log_and_raise_error(('An error occured trying to query data from {}, with query {}: {}:{}. \nError:{}').format(collection_name, attribute, comparison_operator, attribute_value, error))
            return None

        return self._build_query_result(data, query, index, collection_name, return_as_dataframe)

    def aggregate_data_and_generate_dataframe(self, collection_name: str, aggregation_pipeline: list, index: str = None):
        '''
        Run an aggregation pipeline (with ``allowDiskUse``) and return a DataFrame.

        :param str collection_name: collection to aggregate.
        :param list aggregation_pipeline: pipeline stages handed to ``aggregate``.
        :param str index: column used as the DataFrame index.
        '''
        try:
            data = self._database[collection_name].aggregate(pipeline=aggregation_pipeline, allowDiskUse=True)
        except Exception as error:
            self._log.log_and_raise_error(('A problem occured when aggregating the collection {} with the pipeline {}. \nError: {}').format(collection_name, aggregation_pipeline, error))
            return None

        return self.convert_mongo_data_into_dataframe(data, index, collection_name)

    def convert_mongo_data_into_dataframe(self, data, index: str = None, collection_name: str = None, chunksize: int = 500) -> pd.DataFrame:
        '''
        Consume an iterable of documents and build one DataFrame from it.

        The documents are converted in chunks of ``chunksize`` records and the
        partial frames are concatenated with a fresh 0..n-1 index.

        :param data: iterable of documents (for example a cursor or a list of dicts).
        :param str index: column used as the index of the result.
        :param str collection_name: only used in the log message.
        :param int chunksize: number of documents per intermediate DataFrame.
        :return: the combined DataFrame; an empty DataFrame when ``data``
            yields no documents.
        '''
        assert isinstance(chunksize, int) and chunksize > 0,\
            "Parameter 'chunksize' must be a positive integer"

        start_time = time.time()

        frames = []
        records = []
        for record in data:
            records.append(record)
            if len(records) == chunksize:
                frames.append(pd.DataFrame(records))
                records = []

        if records:
            frames.append(pd.DataFrame(records))

        if not frames:
            # pd.concat raises on an empty list, so answer an empty result directly.
            self._log.warning('No data for the query was found')
            return pd.DataFrame()

        # ignore_index gives the same running index a single frame would have.
        return_df = pd.concat(frames, axis=0, sort=False, ignore_index=True)

        if index is not None:
            return_df.set_index(index, inplace=True)

        self._log.info(('{} Rows were fetched from {}. DataFrame conversion is done, took {} seconds').format(len(return_df.index), collection_name if collection_name is not None else 'the database', time.time()-start_time))

        return return_df

    def push_data_into_collection(self, data: (dict, list, np.ndarray, pd.DataFrame, pd.Series),
                                  collection_name: str,
                                  update_label: str,
                                  query_label: str,
                                  query_value: str):
        '''
        Adds data to an exising array within the collection.

        The first document matching ``{query_label: query_value}`` gets
        ``data`` appended to its ``update_label`` array via ``$push``.

        :param data: data to add
        :param str collection_name: collection to add data to
        :param str update_label: label of the attribute where data is to be added
        :param str query_label: label for the query
        :param str query_value: value for the query
        '''
        self._database[collection_name].update_one({query_label: query_value}, {"$push": {update_label: data}})
        # The array that received the data is ``update_label``, not the query value.
        self._log.info(('A document has been pushed into the {} array in the {} collection').format(update_label, collection_name))

    def document_exists(self, collection_name: str, query_label: str, query_value: str):
        '''
        Checking whether the document exists or not.

        :param str collection_name: collection to search
        :param str query_label: label for the query
        :param str query_value: value for the query
        :return: ``True`` when at least one document matches.
        '''
        # count_documents is what the other queries of this class use;
        # limit=1 is enough to answer an existence check.
        return self._database[collection_name].count_documents({query_label: query_value}, limit=1) > 0

    def query_data_between_dates_and_generate_dataframe(self, collection_name: str, date_label: str, from_date_value: str, to_date_value: str, index: str = None, return_id: bool = False, return_as_dataframe: bool = True):
        '''
        Queries data between two dates (both bounds exclusive).

        :param str collection_name: collection to query
        :param str date_label: label of the attribute where the date is stored
        :param str from_date_value: lower bound, compared with ``$gt``
        :param str to_date_value: upper bound, compared with ``$lt``
        :param str index: column used as the DataFrame index
        :param bool return_id: whether ``_id`` is part of the projection
        :param bool return_as_dataframe: return a DataFrame instead of the cursor
        :return: DataFrame or cursor, ``None`` when nothing matches.
        '''
        assert isinstance(collection_name, str),\
            "Parameter 'collection_name' must be a string type"

        query = {date_label: {'$gt': from_date_value, '$lt': to_date_value}}
        try:
            data = self._database[collection_name].find(query, {'_id': return_id})
        except Exception as error:
            self._log.log_and_raise_error(('An error occured trying to query data from {}, with query {}. \nError:{}').format(collection_name, query, error))
            return None

        return self._build_query_result(data, query, index, collection_name, return_as_dataframe)

    def query_oldest_or_newest_date_in_collection(self, collection_name: str, date_label: str, oldest: bool = False):
        '''
        Return the smallest or largest ``date_label`` value in a collection.

        :param str collection_name: collection to query
        :param str date_label: label of the attribute where the date is stored
        :param bool oldest: ``True`` for the oldest value, ``False`` for the newest
        :return: the value found, ``None`` when the collection has no documents.
        '''
        date = None
        direction = pymongo.ASCENDING if oldest else pymongo.DESCENDING
        try:
            data = list(self._database[collection_name].find().sort(date_label, direction).limit(1))
            if data:
                date = self.convert_mongo_data_into_dataframe(data, collection_name=collection_name)[date_label].values[0]
            else:
                self._log.warning('No date was found for this query.')
        except Exception as error:
            self._log.log_and_raise_error(('An error occured trying to query data from {}, finding the oldest: {}, value for: {}. \nError:{}').format(collection_name, oldest, date_label, error))

        return date

    def query_with_sorting_and_limit(self, collection_name: str, sort_label: str, limit: int, attribute: str = None,
                                     attribute_value: str = None, comparison_operator: str = '$eq', ascending=True,
                                     index = None, return_as_dataframe: bool = True, return_id: bool = False):
        '''
        Query a collection, sorted by one field and limited to ``limit`` documents.

        :param str collection_name: collection to query
        :param str sort_label: field to sort by
        :param int limit: maximum number of documents, forwarded to the cursor
        :param str attribute: optional field to filter on
        :param attribute_value: value the field is compared with
        :param str comparison_operator: MongoDB comparison operator
        :param bool ascending: sort direction
        :param index: column used as the DataFrame index
        :param bool return_as_dataframe: return a DataFrame instead of the cursor
        :param bool return_id: whether ``_id`` is part of the projection
        :return: DataFrame or cursor, ``None`` when nothing matches.
        '''
        direction = pymongo.ASCENDING if ascending else pymongo.DESCENDING

        if attribute is None or attribute_value is None:
            query = {}
        else:
            query = {attribute: {comparison_operator: attribute_value}}

        try:
            data = self._database[collection_name].find(query, {'_id': return_id}).sort(sort_label, direction).limit(limit)
        except Exception as error:
            self._log.log_and_raise_error(('An error occured trying to query data from {}, \nError:{}').format(collection_name, error))
            return None

        return self._build_query_result(data, query, index, collection_name, return_as_dataframe)

    def update_data_in_collection(self, update_label: str, update_value: str, collection_name: str, query_label: str = None, query_value: str = None, create_if_not_exist: bool = True, find_query: dict = None, update_query: dict = None, update_many: bool = False):
        '''
        Update one or many documents of a collection.

        :param str update_label: field that is set (and named in the log messages)
        :param update_value: new value; a DataFrame is converted to a list of records
        :param str collection_name: collection to update
        :param str query_label: field used to select the document(s)
        :param query_value: value ``query_label`` has to match
        :param bool create_if_not_exist: forwarded as ``upsert``
        :param dict find_query: explicit filter; takes precedence over label/value
        :param dict update_query: explicit update document; defaults to
            ``{"$set": {update_label: update_value}}``
        :param bool update_many: use ``update_many`` instead of ``update_one``
        '''
        if isinstance(update_value, pd.DataFrame):
            update_value = simplejson.loads(update_value.to_json(orient="records",
                                                                 date_format="iso"))

        if update_query is None:
            update_query = {"$set": {update_label: update_value}}

        if find_query is None:
            # Compare the value against None explicitly: 0, '' or False are
            # legitimate values and must not widen the filter to every document.
            if query_label and query_value is not None:
                find_query = {query_label: query_value}
            else:
                find_query = {}

        try:
            collection = self._database[collection_name]
            if update_many:
                collection.update_many(find_query, update_query, upsert=create_if_not_exist)
            else:
                collection.update_one(find_query, update_query, upsert=create_if_not_exist)

            self._log.info(('Data for label: {} has been updated').format(update_label))

        except Exception as error:
            self._log.log_and_raise_error(('There was a problem updating data for label: {}, Error: {}').format(update_label, error))
def collection_name_from_schema_path(schema_path: str, prefix: str = "schema_") -> str:
    '''
    Derive a collection name from the file name of a schema.

    The directory part, the leading ``prefix`` and everything from the first
    dot onwards are removed, e.g. ``./mongo_schema/schema_wheelsets.json``
    becomes ``wheelsets``.

    :param str schema_path: path to the schema file.
    :param str prefix: file name prefix that is not part of the collection name.
    :return: the collection name.
    '''
    file_name = os.path.basename(schema_path)
    # Remove the prefix as a whole; str.lstrip would strip a *set* of
    # characters and also eat leading letters of the actual name.
    if file_name.startswith(prefix):
        file_name = file_name[len(prefix):]
    return file_name.split(".")[0]


if __name__ == "__main__":

    log = Log("Test MongodbHandler:")

    log.info('Script started')

    # NOTE(review): hard-coded credentials of a local test instance - do not
    # reuse them outside of local testing.
    database_url = "mongodb://{0}:{1}@{2}:{3}"\
        .format('root',
                'oebb',
                'localhost',
                27017)

    database_name = 'test_database'

    db_handler = MongodbHandler(database_name=database_name, database_url=database_url)

    # Create a collection for every schema file and attach its schema.
    for schema_path in [
            os.path.join(".", "mongo_schema", "schema_wheelsets.json"),
            os.path.join(".", "mongo_schema", "schema_process_instances.json"),
            os.path.join(".", "mongo_schema", "schema_componets.json")]:

        if os.path.isfile(schema_path):
            collection_name = collection_name_from_schema_path(schema_path)
            db_handler.create_collection_and_set_schema(collection_name, schema_path)
        else:
            log.log_and_raise_warning(('No file exists at path: {}').format(schema_path))

    log.info(("Existing databases: {}, Collection in OEBB database {}")
             .format(db_handler._client.list_database_names(), db_handler._database.list_collection_names()))
|