123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311 |
- #!/usr/bin/env python3
- # -*- coding: utf-8 -*-
- """
- Created on Mon Sep 16 13:27:44 2019
- @author: oskar
- @description: Implementation of a database handler for abstraction of the mongodb.
- """
- import json
- import simplejson
- import sys
- import os
- import jsonref
- from copy import deepcopy
- from pymongo import MongoClient
- import pandas as pd
- import numpy as np
- sys.path.append(os.getcwd())
- from libraries.log import Log
- from libraries.configuration import default as cfg
- from libraries.Singleton_Threadsafe import SingletonThreadsafe
class MongodbHandlerPool(metaclass=SingletonThreadsafe):
    '''
    Thread-safe singleton pool of reusable MongodbHandler instances.
    '''

    def __init__(self, size: int = 10):
        '''
        :param int size: number of handlers kept in the pool.
        '''
        self._size = size
        self._log = Log("Mongodb Handler Pool")
        self._mongodb_handlers = [MongodbHandler() for _ in range(size)]

    def aquire(self):
        '''
        Hand out a handler, refilling the pool when it runs dry.

        NOTE: method name kept as 'aquire' (sic) for backward compatibility.
        Fixed: the old code logged through the module-global 'log', which is
        only defined under __main__ and raised NameError in normal use, and
        it hardcoded "10" in the message regardless of the pool size.
        '''
        while not self._mongodb_handlers:
            # Refill instead of blocking; warn because exhaustion usually
            # means a caller forgot to release() its handler.
            self._mongodb_handlers = [MongodbHandler() for _ in range(self._size)]
            self._log.warning(
                "Ran out of Mongodb handlers, {} more have been added. "
                "Are you sure you've returned yours?".format(self._size))
        return self._mongodb_handlers.pop()

    def release(self, mongodb_handler):
        '''
        Return a handler to the pool; handlers beyond the pool size are
        simply discarded.
        '''
        if len(self._mongodb_handlers) < self._size:
            self._mongodb_handlers.append(mongodb_handler)
class MongodbHandler:
    '''
    Thin abstraction layer over a MongoDB database accessed via pymongo.
    '''

    # Fixed: removed a stray dead 'pass' statement that preceded __init__.

    def __init__(self, database_url: str = None,
                 database_name: str = None):
        '''
        :param str database_url: url for the mongodb database.
        :param str database_name: name of the database this handler manages.
        '''
        if database_url is None:
            # Build the connection url from the application configuration.
            database_url = "mongodb://{0}:{1}@{2}:{3}"\
                .format(cfg["MONGO"]["MONGO_USER"],
                        cfg["MONGO"]["MONGO_PASSWORD"],
                        cfg["MONGO"]["MONGO_HOST"],
                        cfg["MONGO"]["MONGO_PORT"])
        if database_name is None:
            database_name = cfg["MONGO"]["MONGO_DATABASE_NAME"]

        assert(isinstance(database_url, str)),\
            "Parameter 'database_url' must be a string type"
        assert(isinstance(database_name, str)),\
            "Parameter 'database_name' must be a string type"

        self._log = Log("Mongodb Handler")
        # Connect to the MongoDB server (pymongo connects lazily).
        self._client = MongoClient(database_url)
        # Select the database; it is created on first write if missing.
        self._database = self._client[database_name]
        self._database_name = database_name
- def set_database(self, database_name):
- self._database = self._client[database_name]
- def drop_database(self):
- '''
- '''
- self._client.drop_database(self._database_name)
- def drop_collection(self, collection_name: str):
- '''
- '''
- self._database[collection_name].drop()
- def _read_schema(self, schema_path: str) -> dict:
- '''
- :param str schema_path: path to the schema file.
- '''
- assert(isinstance(schema_path, str)),\
- "Parameter 'schema_path must be a string type"
- with open(schema_path) as json_file:
- schema = json.load(json_file)
- definitions_flag = self._analyze_schema(schema)
- if definitions_flag:
- schema = self._dereference_schema(schema)
- return schema
- def _analyze_schema(self, schema: dict, definitions_flag: bool = False) -> dict:
- for key in schema:
- if key == 'definitions':
- definitions_flag = True
- return definitions_flag
- if key == 'default' or key == 'default_values':
- return self._remove_defaults(schema)
- if type(schema[key]) == dict:
- definitions_flag = self._analyze_schema(schema[key], definitions_flag)
- return definitions_flag
- def _dereference_schema(self, schema: dict) -> dict:
- '''
- :param dict schema: dictionary containing a schema which uses references.
- '''
- assert(isinstance(schema, dict)),\
- "Parameter 'schema' must be a dictionary type"
- schema = jsonref.loads(str(schema).replace("'", "\""))
- schema = deepcopy(schema)
- schema.pop('definitions', None)
- return schema
- def _remove_defaults(self, schema: dict) -> dict:
- '''
- :param dict schema: dictionary containing a schema which uses references.
- '''
- if 'default' in schema:
- del schema['default']
- if 'default_values' in schema:
- del schema['default_values']
- return schema
- assert(isinstance(schema, dict)),\
- "Parameter 'schema' must be a dictionary type"
- def set_collection_schema(self, collection_name: str, schema_path: str,
- validation_level: str = 'moderate',validation_action: str = 'error'):
- '''
- :param str collection_name: name on the collection for which the schema will be set.
- :param str schema_path: path to the schema file.
- :param str validation_level: level of validation done by the mongodb.
- :param str validation_action: what will happen upon validation error, warning or error message.
- '''
- assert(isinstance(collection_name, str)),\
- "Parameter 'collection_name' must be a string type"
- assert(isinstance(schema_path, str)),\
- "Parameter 'schema_path' must be a string type"
- assert(isinstance(validation_level, str)),\
- "Parameter 'validation_lever' must be a string type"
- assert(isinstance(validation_action, str)),\
- "Parameter 'validation_action' must be a string type"
- schema = self._read_schema(schema_path)
- command = {
- 'collMod': collection_name,
- 'validator': {
- '$jsonSchema': schema
- },
- 'validationLevel': validation_level,
- 'validationAction': validation_action
- }
- self._database.command(command)
- def create_collection(self, collection_name):
- '''
- :param str collection_name: name of the collection to be created.
- '''
- assert(isinstance(collection_name, str)),\
- "Parameter 'collection_name' must be a string type"
- if collection_name not in self._database.list_collection_names():
- self._log.info(("Collection '{}' has been created").format(collection_name))
- return self._database.create_collection(collection_name)
- else:
- self._log.info(("Collection '{}' already exists").format(collection_name))
- return self._database[collection_name]
- def insert_data_into_collection(self, data: (dict, list, np.ndarray, pd.DataFrame, pd.Series),
- collection_name: str,
- ordered: bool = False):
- '''
- :param dict data: dictionary containing the data to be inserted in the collection
- :param pymongo.database.Collection collection: The collection the data will be added to.
- '''
- allowed_types = (dict, list, np.ndarray, pd.DataFrame, pd.Series)
- assert(isinstance(data, allowed_types)),\
- "Parameter 'data' is of invalid type"
- if isinstance(data, np.ndarray):
- data = pd.DataFrame(data)
- if isinstance(data, pd.DataFrame):
- data = simplejson.loads(data.to_json(orient="records",
- date_format="iso"))
- elif isinstance(data, pd.Series):
- data = simplejson.loads(data.to_json(date_format="iso"))
- if (len(data) == 1) or (isinstance(data, dict)):
- if isinstance(data, pd.DataFrame) and (len(data) == 1):
- data = data.iloc[0]
- elif type(data) is list:
- data = data[0]
- self._database[collection_name].insert_one(data)
- else:
- self._database[collection_name].insert_many(data, ordered=ordered)
- self._log.info(('Data has been inserted into the {} collection').format(collection_name))
- def create_collection_and_set_schema(self, collection_name: str, schema_path: str):
- '''
- :param str collection_name: name of the collection to be created.
- :param str schema_path: path to the schema file.
- '''
- assert(isinstance(collection_name, str)),\
- "Parameter 'collection_name' must be a string type"
- assert(isinstance(schema_path, str)),\
- "Parameter 'schema_path' must be a string type"
- self.create_collection(collection_name)
- self.set_collection_schema(collection_name=collection_name, schema_path=schema_path)
- def query_data_and_generate_dataframe(self, collection_name: str, attribute: str = None,
- attribute_value: str = None, comparison_operator: str = '$eq'):
- '''
- '''
- if attribute == None or attribute_value == None:
- data = self._database[collection_name].find()
- else:
- data = self._database[collection_name].find({attribute: {comparison_operator: attribute_value}})
- if data.count() > 0:
- df = pd.DataFrame(list(data))
- df.set_index('radsatznummer', inplace=True)
- return df
- else:
- self._log.warning(('No data for the query was found').format())
- def aggregate_data_and_generate_dataframe(self, collection_name: str, aggregation_pipeline: list):
- data = list(self._database[collection_name].aggregate(pipeline=aggregation_pipeline, allowDiskUse=True))
- if len(data)> 0:
- df = pd.DataFrame(data)
- df.set_index('radsatznummer', inplace=True)
- return df
- else:
- self._log.warning(('No data for the query was found').format())
- def update_data_in_collection(self, query_label: str, query_value: str, update_label:str, update_value: str, collection_name:str):
- self._database[collection_name].update_one({query_label:query_value}, {"$set": {update_label: update_value}})
if __name__ == "__main__":
    log = Log("Test MongodbHandler:")
    log.info('Script started')
    db_handler = MongodbHandler()

    # Create a collection for each schema file and attach its schema.
    for schema_path in [
            os.path.join(".", "mongo_schema", "schema_wheelsets.json"),
            os.path.join(".", "mongo_schema", "schema_process_instances.json"),
            os.path.join(".", "mongo_schema", "schema_componets.json")]:
        if os.path.isfile(schema_path):
            # Fixed: lstrip("_schema") strips a CHARACTER SET, not a prefix —
            # it turned "schema_componets" into collection "omponets".
            # Strip the literal "schema_" prefix instead.
            basename = os.path.basename(schema_path).split(".")[0]
            if basename.startswith("schema_"):
                collection_name = basename[len("schema_"):]
            else:
                collection_name = basename
            db_handler.create_collection_and_set_schema(collection_name, schema_path)

    log.info(("Existing databases: {}, Collection in OEBB database {}")\
        .format(db_handler._client.list_database_names(),
                db_handler._database.list_collection_names()))
|