MongodbHandler.py 11 KB


  1. #!/usr/bin/env python3
  2. # -*- coding: utf-8 -*-
  3. """
  4. Created on Mon Sep 16 13:27:44 2019
  5. @author: oskar
  6. @description: Implementation of a database handler for abstraction of the mongodb.
  7. """
  8. import json
  9. import simplejson
  10. import sys
  11. import os
  12. import jsonref
  13. from copy import deepcopy
  14. from pymongo import MongoClient
  15. import pandas as pd
  16. import numpy as np
  17. sys.path.append(os.getcwd())
  18. from libraries.log import Log
  19. from libraries.configuration import default as cfg
  20. from libraries.Singleton_Threadsafe import SingletonThreadsafe
  21. class MongodbHandlerPool(metaclass=SingletonThreadsafe):
  22. '''
  23. '''
  24. def __init__(self, size: int = 10):
  25. self._size = size
  26. self._mongodb_handlers = [MongodbHandler() for _ in range(size)]
  27. def aquire(self):
  28. while not self._mongodb_handlers:
  29. self._mongodb_handlers = [MongodbHandler() for _ in range(self._size)]
  30. log.warning("Ran out of Mongodb handlers, 10 more have been added. Are you sure you've returned yours?")
  31. return self._mongodb_handlers.pop()
  32. def release(self, mongodb_handler):
  33. if len(self._mongodb_handlers) < self._size:
  34. self._mongodb_handlers.append(mongodb_handler)
  35. class MongodbHandler:
  36. '''
  37. '''
  38. pass
  39. def __init__(self, database_url: str = None,
  40. database_name: str = None):
  41. '''
  42. :param str database_url: Url for the mongodb database
  43. :param str database_name: Name of the database the database handler should handle
  44. '''
  45. if database_url is None:
  46. database_url = "mongodb://{0}:{1}@{2}:{3}"\
  47. .format(cfg["MONGO"]["MONGO_USER"],
  48. cfg["MONGO"]["MONGO_PASSWORD"],
  49. cfg["MONGO"]["MONGO_HOST"],
  50. cfg["MONGO"]["MONGO_PORT"])
  51. if database_name is None:
  52. database_name = cfg["MONGO"]["MONGO_DATABASE_NAME"]
  53. assert(isinstance(database_url, str)),\
  54. "Parameter 'database_url' must be a string type"
  55. assert(isinstance(database_name, str)),\
  56. "Parameter 'database_name' must be a string type"
  57. self._log = Log("Mongodb Handler")
  58. # Connect to the MongoDB
  59. self._client = MongoClient(database_url)
  60. # Connect to the oebb_db database, or create it if it doesnt exist.
  61. self._database = self._client[database_name]
  62. self._database_name = database_name
  63. def set_database(self, database_name):
  64. self._database = self._client[database_name]
  65. def drop_database(self):
  66. '''
  67. '''
  68. self._client.drop_database(self._database_name)
  69. def drop_collection(self, collection_name: str):
  70. '''
  71. '''
  72. self._database[collection_name].drop()
  73. def _read_schema(self, schema_path: str) -> dict:
  74. '''
  75. :param str schema_path: path to the schema file.
  76. '''
  77. assert(isinstance(schema_path, str)),\
  78. "Parameter 'schema_path must be a string type"
  79. with open(schema_path) as json_file:
  80. schema = json.load(json_file)
  81. definitions_flag = self._analyze_schema(schema)
  82. if definitions_flag:
  83. schema = self._dereference_schema(schema)
  84. return schema
  85. def _analyze_schema(self, schema: dict, definitions_flag: bool = False) -> dict:
  86. for key in schema:
  87. if key == 'definitions':
  88. definitions_flag = True
  89. return definitions_flag
  90. if key == 'default' or key == 'default_values':
  91. return self._remove_defaults(schema)
  92. if type(schema[key]) == dict:
  93. definitions_flag = self._analyze_schema(schema[key], definitions_flag)
  94. return definitions_flag
  95. def _dereference_schema(self, schema: dict) -> dict:
  96. '''
  97. :param dict schema: dictionary containing a schema which uses references.
  98. '''
  99. assert(isinstance(schema, dict)),\
  100. "Parameter 'schema' must be a dictionary type"
  101. schema = jsonref.loads(str(schema).replace("'", "\""))
  102. schema = deepcopy(schema)
  103. schema.pop('definitions', None)
  104. return schema
  105. def _remove_defaults(self, schema: dict) -> dict:
  106. '''
  107. :param dict schema: dictionary containing a schema which uses references.
  108. '''
  109. if 'default' in schema:
  110. del schema['default']
  111. if 'default_values' in schema:
  112. del schema['default_values']
  113. return schema
  114. assert(isinstance(schema, dict)),\
  115. "Parameter 'schema' must be a dictionary type"
  116. def set_collection_schema(self, collection_name: str, schema_path: str,
  117. validation_level: str = 'moderate',validation_action: str = 'error'):
  118. '''
  119. :param str collection_name: name on the collection for which the schema will be set.
  120. :param str schema_path: path to the schema file.
  121. :param str validation_level: level of validation done by the mongodb.
  122. :param str validation_action: what will happen upon validation error, warning or error message.
  123. '''
  124. assert(isinstance(collection_name, str)),\
  125. "Parameter 'collection_name' must be a string type"
  126. assert(isinstance(schema_path, str)),\
  127. "Parameter 'schema_path' must be a string type"
  128. assert(isinstance(validation_level, str)),\
  129. "Parameter 'validation_lever' must be a string type"
  130. assert(isinstance(validation_action, str)),\
  131. "Parameter 'validation_action' must be a string type"
  132. schema = self._read_schema(schema_path)
  133. command = {
  134. 'collMod': collection_name,
  135. 'validator': {
  136. '$jsonSchema': schema
  137. },
  138. 'validationLevel': validation_level,
  139. 'validationAction': validation_action
  140. }
  141. self._database.command(command)
  142. def create_collection(self, collection_name):
  143. '''
  144. :param str collection_name: name of the collection to be created.
  145. '''
  146. assert(isinstance(collection_name, str)),\
  147. "Parameter 'collection_name' must be a string type"
  148. if collection_name not in self._database.list_collection_names():
  149. self._log.info(("Collection '{}' has been created").format(collection_name))
  150. return self._database.create_collection(collection_name)
  151. else:
  152. self._log.info(("Collection '{}' already exists").format(collection_name))
  153. return self._database[collection_name]
  154. def insert_data_into_collection(self, data: (dict, list, np.ndarray, pd.DataFrame, pd.Series),
  155. collection_name: str,
  156. ordered: bool = False):
  157. '''
  158. :param dict data: dictionary containing the data to be inserted in the collection
  159. :param pymongo.database.Collection collection: The collection the data will be added to.
  160. '''
  161. allowed_types = (dict, list, np.ndarray, pd.DataFrame, pd.Series)
  162. assert(isinstance(data, allowed_types)),\
  163. "Parameter 'data' is of invalid type"
  164. if isinstance(data, np.ndarray):
  165. data = pd.DataFrame(data)
  166. if isinstance(data, pd.DataFrame):
  167. data = simplejson.loads(data.to_json(orient="records",
  168. date_format="iso"))
  169. elif isinstance(data, pd.Series):
  170. data = simplejson.loads(data.to_json(date_format="iso"))
  171. if (len(data) == 1) or (isinstance(data, dict)):
  172. if isinstance(data, pd.DataFrame) and (len(data) == 1):
  173. data = data.iloc[0]
  174. elif type(data) is list:
  175. data = data[0]
  176. self._database[collection_name].insert_one(data)
  177. else:
  178. self._database[collection_name].insert_many(data, ordered=ordered)
  179. self._log.info(('Data has been inserted into the {} collection').format(collection_name))
  180. def create_collection_and_set_schema(self, collection_name: str, schema_path: str):
  181. '''
  182. :param str collection_name: name of the collection to be created.
  183. :param str schema_path: path to the schema file.
  184. '''
  185. assert(isinstance(collection_name, str)),\
  186. "Parameter 'collection_name' must be a string type"
  187. assert(isinstance(schema_path, str)),\
  188. "Parameter 'schema_path' must be a string type"
  189. self.create_collection(collection_name)
  190. self.set_collection_schema(collection_name=collection_name, schema_path=schema_path)
  191. def query_data_and_generate_dataframe(self, collection_name: str, attribute: str = None,
  192. attribute_value: str = None, comparison_operator: str = '$eq'):
  193. '''
  194. '''
  195. if attribute == None or attribute_value == None:
  196. data = self._database[collection_name].find()
  197. else:
  198. data = self._database[collection_name].find({attribute: {comparison_operator: attribute_value}})
  199. if data.count() > 0:
  200. df = pd.DataFrame(list(data))
  201. df.set_index('radsatznummer', inplace=True)
  202. return df
  203. else:
  204. self._log.warning(('No data for the query was found').format())
  205. def aggregate_data_and_generate_dataframe(self, collection_name: str, aggregation_pipeline: list):
  206. data = list(self._database[collection_name].aggregate(pipeline=aggregation_pipeline, allowDiskUse=True))
  207. if len(data)> 0:
  208. df = pd.DataFrame(data)
  209. df.set_index('radsatznummer', inplace=True)
  210. return df
  211. else:
  212. self._log.warning(('No data for the query was found').format())
  213. def update_data_in_collection(self, query_label: str, query_value: str, update_label:str, update_value: str, collection_name:str):
  214. self._database[collection_name].update_one({query_label:query_value}, {"$set": {update_label: update_value}})
  215. if __name__ == "__main__":
  216. log = Log("Test MongodbHandler:")
  217. log.info('Script started')
  218. db_handler = MongodbHandler()
  219. # Create a colleciton for the wheelsets and give it its schema.
  220. for schema_path in [
  221. os.path.join(".", "mongo_schema", "schema_wheelsets.json"),
  222. os.path.join(".", "mongo_schema", "schema_process_instances.json"),
  223. os.path.join(".", "mongo_schema", "schema_componets.json")]:
  224. if os.path.isfile(schema_path):
  225. collection_name = os.path.basename(schema_path).lstrip("_schema").split(".")[0]
  226. db_handler.create_collection_and_set_schema(collection_name, schema_path)
  227. log.info(("Existing databases: {}, Collection in OEBB database {}")\
  228. .format(db_handler._client.list_database_names(), db_handler._database.list_collection_names()))