#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Created on Mon Sep 16 13:27:44 2019

@author: oskar
@description: Implementation of a database handler for abstraction of the mongodb.
"""

import json
import simplejson
import sys
import os
import jsonref

from copy import deepcopy
from typing import Union
from pymongo import MongoClient
import pandas as pd
import numpy as np

sys.path.append(os.getcwd())
from libraries.log import Log
from libraries.configuration import default as cfg


class MongodbHandler:
    '''
    Thin wrapper around a single pymongo database: creates collections,
    attaches JSON-schema validators to them, inserts dict/list/numpy/pandas
    data and reads query results back into pandas DataFrames.
    '''

    # NOTE: the default arguments are read from cfg once, when the class is
    # defined (i.e. at import time), not on every instantiation.
    def __init__(self, database_url: str = cfg['MONGO_DB']['URI'],
                 database_name: str = cfg['MONGO_DB']['DATABASE_NAME']):
        '''
        :param str database_url: Url for the mongodb database
        :param str database_name: Name of the database the database handler should handle
        '''
        assert isinstance(database_url, str), \
            "Parameter 'database_url' must be a string type"
        assert isinstance(database_name, str), \
            "Parameter 'database_name' must be a string type"

        self._log = Log("\nMongodbHandler script")
        self._log.info('Mongodb Handler has been initialized')

        # Connect to the MongoDB server.
        self._client = MongoClient(database_url)

        # Get a handle to the database named `database_name`; MongoDB creates
        # it lazily on the first write if it does not exist yet.
        self._database = self._client[database_name]

    def _read_schema(self, schema_path: str) -> dict:
        '''
        Load a JSON schema from disk. If the schema has a 'definitions'
        section, its references are resolved via _dereference_schema.

        :param str schema_path: path to the schema file.
        :return: the schema as a dictionary.
        '''
        assert isinstance(schema_path, str), \
            "Parameter 'schema_path' must be a string type"

        # JSON text is UTF-8 by specification; don't depend on the locale.
        with open(schema_path, encoding='utf-8') as json_file:
            schema = json.load(json_file)

        if 'definitions' in schema:
            schema = self._dereference_schema(schema)

        return schema

    def _dereference_schema(self, schema: dict) -> dict:
        '''
        Inline every '$ref' of the schema and drop the 'definitions' section
        (MongoDB's $jsonSchema validator does not support $ref/definitions).

        :param dict schema: dictionary containing a schema which uses references.
        :return: the schema with references resolved and without 'definitions'.
        '''
        assert isinstance(schema, dict), \
            "Parameter 'schema' must be a dictionary type"

        # Serialise with json.dumps, not str(): the Python repr writes
        # True/False/None and mangles strings containing quotes, which is
        # not valid JSON and makes jsonref fail.
        schema = jsonref.loads(json.dumps(schema))
        # Presumably copied to turn jsonref's lazy reference proxies into
        # ordinary objects before the 'definitions' they point to are removed.
        schema = deepcopy(schema)
        schema.pop('definitions', None)

        return schema

    def set_collection_schema(self, collection_name: str, schema_path: str,
                              validation_level: str = 'moderate', validation_action: str = 'error'):
        '''
        Attach the JSON schema stored at schema_path as a validator to an
        existing collection (via the 'collMod' database command).

        :param str collection_name: name on the collection for which the schema will be set.
        :param str schema_path: path to the schema file.
        :param str validation_level: level of validation done by the mongodb.
        :param str validation_action: what will happen upon validation error, warning or error message.
        '''
        assert isinstance(collection_name, str), \
            "Parameter 'collection_name' must be a string type"
        assert isinstance(schema_path, str), \
            "Parameter 'schema_path' must be a string type"
        assert isinstance(validation_level, str), \
            "Parameter 'validation_level' must be a string type"
        assert isinstance(validation_action, str), \
            "Parameter 'validation_action' must be a string type"

        schema = self._read_schema(schema_path)

        command = {
            'collMod': collection_name,
            'validator': {
                '$jsonSchema': schema
            },
            'validationLevel': validation_level,
            'validationAction': validation_action
        }

        self._database.command(command)

    def create_collection(self, collection_name: str):
        '''
        Create the collection if it does not exist yet.

        :param str collection_name: name of the collection to be created.
        :return: the newly created or already existing pymongo collection.
        '''
        assert isinstance(collection_name, str), \
            "Parameter 'collection_name' must be a string type"

        if collection_name not in self._database.list_collection_names():
            # Create first, then log, so the message is only emitted on success.
            collection = self._database.create_collection(collection_name)
            self._log.info(("Collection '{}' has been created").format(collection_name))
            return collection

        self._log.info(("Collection '{}' already exists").format(collection_name))
        return self._database[collection_name]

    def insert_data_into_collection(self, data: Union[dict, list, np.ndarray, pd.DataFrame, pd.Series],
                                    collection_name: str, ordered: bool = False):
        '''
        Insert data into a collection.

        numpy arrays and DataFrames are converted to a list of row records;
        a Series is converted to a single document.

        :param data: a document (dict), a list of documents, or numpy/pandas
            data convertible to documents.
        :param str collection_name: name of the collection the data will be added to.
        :param bool ordered: passed to insert_many; when False, the server
            keeps inserting the remaining documents after a failed one.
        '''
        allowed_types = (dict, list, np.ndarray, pd.DataFrame, pd.Series)
        assert isinstance(data, allowed_types), \
            "Parameter 'data' is of invalid type"

        if isinstance(data, np.ndarray):
            data = pd.DataFrame(data)

        # Round-trip through JSON so numpy scalars/timestamps become plain
        # JSON-compatible Python values (dates as ISO strings).
        if isinstance(data, pd.DataFrame):
            data = simplejson.loads(data.to_json(orient="records", date_format="iso"))
        elif isinstance(data, pd.Series):
            data = simplejson.loads(data.to_json(date_format="iso"))

        collection = self._database[collection_name]

        if isinstance(data, dict):
            collection.insert_one(data)
        elif len(data) == 0:
            # insert_many refuses an empty list; there is nothing to insert.
            self._log.info(('No data to insert into the {} collection').format(collection_name))
            return
        elif len(data) == 1:
            # insert_one needs the document itself, not a one-element list.
            collection.insert_one(data[0])
        else:
            collection.insert_many(data, ordered=ordered)

        self._log.info(('Data has been inserted into the {} collection').format(collection_name))

    def create_collection_and_set_schema(self, collection_name: str, schema_path: str):
        '''
        Create the collection (if missing) and attach the JSON schema at
        schema_path to it with the default validation settings.

        :param str collection_name: name of the collection to be created.
        :param str schema_path: path to the schema file.
        '''
        assert isinstance(collection_name, str), \
            "Parameter 'collection_name' must be a string type"
        assert isinstance(schema_path, str), \
            "Parameter 'schema_path' must be a string type"

        self.create_collection(collection_name)

        self.set_collection_schema(collection_name=collection_name, schema_path=schema_path)

    def query_data_and_generate_dataframe(self, collection_name: str, attribute: str = None,
                                          attribute_value: str = None, comparison_operator: str = '$eq',
                                          index_column: str = 'radsatznummer'):
        '''
        Query a collection and return the matching documents as a DataFrame.

        :param str collection_name: collection to query.
        :param str attribute: field to filter on. If it or attribute_value is
            None, every document of the collection is returned.
        :param attribute_value: value that attribute is compared against.
        :param str comparison_operator: MongoDB comparison operator used for
            the filter, e.g. '$eq', '$gt', '$in'.
        :param str index_column: column used as the DataFrame index; pass None
            to keep the default RangeIndex.
        :return: pandas.DataFrame with one row per matching document
            (empty if nothing matched).
        '''
        if attribute is None or attribute_value is None:
            query = {}
        else:
            query = {attribute: {comparison_operator: attribute_value}}

        data = self._database[collection_name].find(query)

        df = pd.DataFrame(list(data))

        # An empty result has no columns at all, so set_index would raise KeyError.
        if index_column is not None and not df.empty:
            df.set_index(index_column, inplace=True)

        return df


if __name__ == "__main__":

    log = Log("Test MongodbHandler:")

    log.info('Script started')

    db_handler = MongodbHandler()

    # Create a collection for every schema file found and give it its schema.
    schema_prefix = "schema_"
    for schema_path in [
            os.path.join(".", "mongo_schema", "schema_wheelsets.json"),
            os.path.join(".", "mongo_schema", "schema_process_instances.json"),
            # NOTE(review): "componets" looks like a typo for "components";
            # confirm the actual file name on disk before renaming it.
            os.path.join(".", "mongo_schema", "schema_componets.json")]:

        if os.path.isfile(schema_path):

            file_stem = os.path.basename(schema_path).split(".")[0]
            # Strip the literal prefix. str.lstrip("_schema") would remove any
            # leading characters from that *set*, e.g. the "c" of "componets".
            if file_stem.startswith(schema_prefix):
                collection_name = file_stem[len(schema_prefix):]
            else:
                collection_name = file_stem

            db_handler.create_collection_and_set_schema(collection_name, schema_path)

    log.info(("Existing databases: {}, Collection in OEBB database {}")
             .format(db_handler._client.list_database_names(),
                     db_handler._database.list_collection_names()))