123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212 |
- #!/usr/bin/env python3
- # -*- coding: utf-8 -*-
- """
- Created on Mon Sep 16 13:27:44 2019
- @author: oskar
- @description: Implementation of a database handler for abstraction of the mongodb.
- """
- import json
- import simplejson
- import sys
- import os
- import jsonref
- from copy import deepcopy
- from pymongo import MongoClient
- import pandas as pd
- import numpy as np
- sys.path.append(os.getcwd())
- from libraries.log import Log
- from libraries.configuration import default as cfg
class MongodbHandler:
    '''
    Thin wrapper around a pymongo database: creates collections, attaches
    JSON-schema validators to them, inserts data (dicts, lists, numpy arrays,
    pandas objects) and queries documents back as pandas DataFrames.
    '''

    def __init__(self, database_url: str = cfg['MONGO_DB']['URI'],
                 database_name: str = cfg['MONGO_DB']['DATABASE_NAME']):
        '''
        Create a MongoClient for ``database_url`` and select ``database_name``.

        :param str database_url: Url for the mongodb database
        :param str database_name: Name of the database the database handler should handle
        :raises AssertionError: if either argument is not a string.
        '''
        assert(isinstance(database_url, str)),\
            "Parameter 'database_url' must be a string type"
        assert(isinstance(database_name, str)),\
            "Parameter 'database_name' must be a string type"

        self._log = Log("\nMongodbHandler script")

        # Connect to the MongoDB
        self._client = MongoClient(database_url)
        # Select the database (per the original note, it is created if it doesn't exist).
        self._database = self._client[database_name]

        # Log only once the client and database handles actually exist.
        self._log.info('Mongodb Handler has been initialized')

    def _read_schema(self, schema_path: str) -> dict:
        '''
        Load a JSON schema file; if it has a 'definitions' section, resolve
        its '$ref' references via ``_dereference_schema``.

        :param str schema_path: path to the schema file.
        :return: the schema as a dictionary.
        :raises AssertionError: if ``schema_path`` is not a string.
        '''
        assert(isinstance(schema_path, str)),\
            "Parameter 'schema_path' must be a string type"

        # Explicit encoding so parsing does not depend on the platform locale.
        with open(schema_path, encoding="utf-8") as json_file:
            schema = json.load(json_file)

        if 'definitions' in schema:
            schema = self._dereference_schema(schema)

        return schema

    def _dereference_schema(self, schema: dict) -> dict:
        '''
        Resolve all JSON '$ref' references in ``schema`` and drop its
        'definitions' section. MongoDB's $jsonSchema does not accept '$ref',
        so references must be inlined before the schema is sent to the server.

        :param dict schema: dictionary containing a schema which uses references.
        :return: the schema with references inlined and 'definitions' removed.
        :raises AssertionError: if ``schema`` is not a dict.
        '''
        assert(isinstance(schema, dict)),\
            "Parameter 'schema' must be a dictionary type"

        # Serialize with json.dumps, not str(): str() produces a Python repr
        # (True/False/None, unescaped apostrophes inside strings) that is not
        # valid JSON and made jsonref.loads fail or corrupt the schema.
        schema = jsonref.loads(json.dumps(schema))
        # NOTE(review): deepcopy presumably turns jsonref's lazy reference
        # proxies into plain objects before 'definitions' is removed - confirm.
        schema = deepcopy(schema)
        schema.pop('definitions', None)

        return schema

    def set_collection_schema(self, collection_name: str, schema_path: str,
                              validation_level: str = 'moderate', validation_action: str = 'error'):
        '''
        Attach the schema in ``schema_path`` as a $jsonSchema validator to an
        existing collection, using the 'collMod' database command.

        :param str collection_name: name on the collection for which the schema will be set.
        :param str schema_path: path to the schema file.
        :param str validation_level: level of validation done by the mongodb.
        :param str validation_action: what happens upon a validation failure ('error' or 'warn').
        :raises AssertionError: if any argument is not a string.
        '''
        assert(isinstance(collection_name, str)),\
            "Parameter 'collection_name' must be a string type"
        assert(isinstance(schema_path, str)),\
            "Parameter 'schema_path' must be a string type"
        assert(isinstance(validation_level, str)),\
            "Parameter 'validation_level' must be a string type"
        assert(isinstance(validation_action, str)),\
            "Parameter 'validation_action' must be a string type"

        schema = self._read_schema(schema_path)

        command = {
            'collMod': collection_name,
            'validator': {
                '$jsonSchema': schema
            },
            'validationLevel': validation_level,
            'validationAction': validation_action
        }

        self._database.command(command)

    def create_collection(self, collection_name: str):
        '''
        Return the collection ``collection_name``, creating it if it does not
        exist yet.

        :param str collection_name: name of the collection to be created.
        :return: the pymongo collection object.
        :raises AssertionError: if ``collection_name`` is not a string.
        '''
        assert(isinstance(collection_name, str)),\
            "Parameter 'collection_name' must be a string type"

        if collection_name in self._database.list_collection_names():
            self._log.info(("Collection '{}' already exists").format(collection_name))
            return self._database[collection_name]

        collection = self._database.create_collection(collection_name)
        # Log after the call so the message is only emitted on success.
        self._log.info(("Collection '{}' has been created").format(collection_name))
        return collection

    def insert_data_into_collection(self, data: (dict, list, np.ndarray, pd.DataFrame, pd.Series),
                                    collection_name: str,
                                    ordered: bool = False):
        '''
        Insert ``data`` into the collection ``collection_name``.

        A dict is inserted as one document. A list is treated as a list of
        documents. A numpy array or DataFrame becomes one document per row,
        and a Series becomes one document keyed by its index.

        :param data: dict, list of dicts, numpy array, DataFrame or Series to insert.
        :param str collection_name: name of the collection the data will be added to.
        :param bool ordered: forwarded to pymongo's ``insert_many``.
        :raises AssertionError: if ``data`` is not one of the supported types.
        '''
        allowed_types = (dict, list, np.ndarray, pd.DataFrame, pd.Series)
        assert(isinstance(data, allowed_types)),\
            "Parameter 'data' is of invalid type"

        if isinstance(data, np.ndarray):
            data = pd.DataFrame(data)

        # Round-trip pandas objects through JSON, presumably so numpy scalars
        # and timestamps become plain Python values (ISO date strings).
        if isinstance(data, pd.DataFrame):
            data = simplejson.loads(data.to_json(orient="records",
                                                 date_format="iso"))
        elif isinstance(data, pd.Series):
            data = simplejson.loads(data.to_json(date_format="iso"))

        collection = self._database[collection_name]

        if isinstance(data, dict):
            collection.insert_one(data)
        elif len(data) == 1:
            # insert_one takes a single document, not a one-element list.
            collection.insert_one(data[0])
        elif data:
            collection.insert_many(data, ordered=ordered)
        else:
            # pymongo's insert_many rejects an empty list; nothing to insert.
            self._log.info(('No data to insert into the {} collection').format(collection_name))
            return

        self._log.info(('Data has been inserted into the {} collection').format(collection_name))

    def create_collection_and_set_schema(self, collection_name: str, schema_path: str,
                                         validation_level: str = 'moderate',
                                         validation_action: str = 'error'):
        '''
        Create ``collection_name`` if needed and attach the schema found at
        ``schema_path`` to it.

        :param str collection_name: name of the collection to be created.
        :param str schema_path: path to the schema file.
        :param str validation_level: forwarded to ``set_collection_schema``.
        :param str validation_action: forwarded to ``set_collection_schema``.
        :raises AssertionError: if ``collection_name`` or ``schema_path`` is not a string.
        '''
        assert(isinstance(collection_name, str)),\
            "Parameter 'collection_name' must be a string type"
        assert(isinstance(schema_path, str)),\
            "Parameter 'schema_path' must be a string type"

        self.create_collection(collection_name)
        self.set_collection_schema(collection_name=collection_name,
                                   schema_path=schema_path,
                                   validation_level=validation_level,
                                   validation_action=validation_action)

    def query_data_and_generate_dataframe(self, collection_name: str, attribute: str = None,
                                          attribute_value: str = None, comparison_operator: str = '$eq',
                                          index_column: str = 'radsatznummer'):
        '''
        Query ``collection_name`` and return the matching documents as a DataFrame.

        If either ``attribute`` or ``attribute_value`` is None, every document
        is returned. Otherwise the filter
        ``{attribute: {comparison_operator: attribute_value}}`` is applied.

        :param str collection_name: collection to query.
        :param str attribute: document field to filter on.
        :param attribute_value: value the field is compared against.
        :param str comparison_operator: MongoDB query operator, e.g. '$eq' or '$gt'.
        :param str index_column: column used as the DataFrame index; None keeps
            the default integer index.
        :return: DataFrame with one row per matching document (empty if none match).
        :raises KeyError: if documents were found but lack ``index_column``.
        '''
        if attribute is None or attribute_value is None:
            query = {}
        else:
            query = {attribute: {comparison_operator: attribute_value}}

        df = pd.DataFrame(list(self._database[collection_name].find(query)))

        # An empty result has no columns, so set_index would raise KeyError.
        if index_column is not None and not df.empty:
            df.set_index(index_column, inplace=True)

        return df
def schema_path_to_collection_name(schema_path: str, prefix: str = "schema_") -> str:
    '''
    Derive a collection name from a schema file path: drop the directory, a
    leading ``prefix`` and everything from the first '.' on, e.g.
    './mongo_schema/schema_wheelsets.json' -> 'wheelsets'.

    :param str schema_path: path to the schema file.
    :param str prefix: file-name prefix to remove when present.
    :return: the collection name.
    '''
    name = os.path.basename(schema_path)
    # Remove the prefix explicitly. str.lstrip("_schema") strips any of those
    # *characters*, which turned "schema_componets.json" into "omponets".
    if name.startswith(prefix):
        name = name[len(prefix):]
    return name.split(".")[0]


if __name__ == "__main__":
    log = Log("Test MongodbHandler:")
    log.info('Script started')

    db_handler = MongodbHandler()

    # Create a collection for each schema file that exists and give it its schema.
    # NOTE(review): "schema_componets.json" looks like a typo of "components";
    # it is kept because it must match the file name on disk - confirm.
    for schema_path in [
            os.path.join(".", "mongo_schema", "schema_wheelsets.json"),
            os.path.join(".", "mongo_schema", "schema_process_instances.json"),
            os.path.join(".", "mongo_schema", "schema_componets.json")]:
        if os.path.isfile(schema_path):
            collection_name = schema_path_to_collection_name(schema_path)
            db_handler.create_collection_and_set_schema(collection_name, schema_path)

    log.info(("Existing databases: {}, Collection in OEBB database {}")
             .format(db_handler._client.list_database_names(),
                     db_handler._database.list_collection_names()))
|