MongodbHandler.py 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302
  1. #!/usr/bin/env python3
  2. # -*- coding: utf-8 -*-
  3. """
  4. Created on Mon Sep 16 13:27:44 2019
  5. @author: oskar
  6. @description: Database handler that abstracts access to a MongoDB database.
  7. """
  8. import simplejson
  9. import sys
  10. import os
  11. import pymongo
  12. from pymongo import MongoClient
  13. import pandas as pd
  14. import numpy as np
  15. sys.path.append(os.getcwd())
  16. from cdplib.log import Log
  17. from cdplib.db_migration.ParseJsonSchema import ParseJsonSchema
  18. #class MongodbHandlerPool(metaclass=SingletonThreadsafe):
  19. class MongodbHandlerPool():
  20. '''
  21. '''
  22. def __init__(self, size: int = 1):
  23. self._size = size
  24. self._mongodb_handlers = [MongodbHandler() for _ in range(size)]
  25. def aquire(self):
  26. while not self._mongodb_handlers:
  27. self._mongodb_handlers = [MongodbHandler() for _ in range(self._size)]
  28. log.warning("Ran out of Mongodb handlers, 10 more have been added. Are you sure you've returned yours?")
  29. return self._mongodb_handlers.pop()
  30. def release(self, mongodb_handler):
  31. if len(self._mongodb_handlers) < self._size:
  32. self._mongodb_handlers.append(mongodb_handler)
  33. class MongodbHandler:
  34. '''
  35. '''
  36. pass
  37. def __init__(self, database_url: str = None,
  38. database_name: str = None):
  39. '''
  40. :param str database_url: Url for the mongodb database
  41. :param str database_name: Name of the database the database handler should handle
  42. '''
  43. if database_url is None:
  44. from libraries.configuration import default as cfg
  45. database_url = "mongodb://{0}:{1}@{2}:{3}"\
  46. .format(cfg["MONGO"]["MONGO_USER"],
  47. cfg["MONGO"]["MONGO_PASSWORD"],
  48. cfg["MONGO"]["MONGO_HOST"],
  49. cfg["MONGO"]["MONGO_PORT"])
  50. if database_name is None:
  51. database_name = cfg["MONGO"]["MONGO_DATABASE_NAME"]
  52. assert(isinstance(database_url, str)),\
  53. "Parameter 'database_url' must be a string type"
  54. assert(isinstance(database_name, str)),\
  55. "Parameter 'database_name' must be a string type"
  56. self._log = Log("Mongodb Handler")
  57. # Connect to the MongoDB
  58. self._client = MongoClient(database_url)
  59. # Connect to the oebb_db database, or create it if it doesnt exist.
  60. self._database = self._client[database_name]
  61. self._database_name = database_name
  62. def set_database(self, database_name):
  63. self._database = self._client[database_name]
  64. def drop_database(self):
  65. '''
  66. '''
  67. self._client.drop_database(self._database_name)
  68. def drop_collection(self, collection_name: str):
  69. '''
  70. '''
  71. self._database[collection_name].drop()
  72. def create_index(self, collection_name: str, key: str, direction: (str, int)='text'):
  73. '''
  74. :param str collection_name: name on the collection for which the schema will be set.
  75. :param str key: Which value should be used as the index.
  76. :param str direction: see https://api.mongodb.com/python/current/api/pymongo/collection.html for reference.
  77. '''
  78. allowed_directions = [1, -1, '2d', 'geoHaystack', '2dsphere', 'hashed', 'text']
  79. assert(isinstance(collection_name, str)),\
  80. "Parameter 'collection_name' must be a string type"
  81. assert(isinstance(key, (str, int)) & key in allowed_directions),\
  82. "Parameter 'key' must be a string or integer type and must be one of these values: 1, -1, '2d', 'geoHaystack', '2dsphere', 'hashed', 'text' "
  83. assert(isinstance(direction, str)),\
  84. "Parameter 'direction' must be a string type"
  85. self._database[collection_name].create_index([key, direction])
  86. def set_collection_schema(self, collection_name: str, schema_path: str,
  87. validation_level: str = 'moderate',validation_action: str = 'error'):
  88. '''
  89. :param str collection_name: name on the collection for which the schema will be set.
  90. :param str schema_path: path to the schema file.
  91. :param str validation_level: level of validation done by the mongodb.
  92. :param str validation_action: what will happen upon validation error, warning or error message.
  93. '''
  94. assert(isinstance(collection_name, str)),\
  95. "Parameter 'collection_name' must be a string type"
  96. assert(isinstance(schema_path, str)),\
  97. "Parameter 'schema_path' must be a string type"
  98. assert(isinstance(validation_level, str)),\
  99. "Parameter 'validation_level' must be a string type"
  100. assert(isinstance(validation_action, str)),\
  101. "Parameter 'validation_action' must be a string type"
  102. parse_obj = ParseJsonSchema(schema_paths=schema_path)
  103. schema = parse_obj.read_schema_and_parse_for_mongodb(schema_path)
  104. command = {
  105. 'collMod': collection_name,
  106. 'validator': {
  107. '$jsonSchema': schema
  108. },
  109. 'validationLevel': validation_level,
  110. 'validationAction': validation_action
  111. }
  112. try:
  113. self._database.command(command)
  114. except Exception as error:
  115. self._log.log_and_raise_error(('An error occured when trying to set a schema for the collection: {}. \nError: {}').format(collection_name, error))
  116. def create_collection(self, collection_name):
  117. '''
  118. :param str collection_name: name of the collection to be created.
  119. '''
  120. assert(isinstance(collection_name, str)),\
  121. "Parameter 'collection_name' must be a string type"
  122. if collection_name not in self._database.list_collection_names():
  123. try:
  124. self._log.info(("Collection '{}' has been created").format(collection_name))
  125. return self._database.create_collection(collection_name)
  126. except Exception as error:
  127. self._log.log_and_raise_error(('An error occured while creating the new collection: {}. \nError: {}').format(collection_name, error))
  128. else:
  129. self._log.info(("Collection '{}' already exists").format(collection_name))
  130. return self._database[collection_name]
  131. def insert_data_into_collection(self, data: (dict, list, np.ndarray, pd.DataFrame, pd.Series),
  132. collection_name: str,
  133. ordered: bool = False):
  134. '''
  135. :param dict data: dictionary containing the data to be inserted in the collection
  136. :param pymongo.database.Collection collection: The collection the data will be added to.
  137. '''
  138. allowed_types = (dict, list, np.ndarray, pd.DataFrame, pd.Series)
  139. assert(isinstance(data, allowed_types)),\
  140. "Parameter 'data' is of invalid type"
  141. if isinstance(data, np.ndarray):
  142. data = pd.DataFrame(data)
  143. if isinstance(data, pd.DataFrame):
  144. data = simplejson.loads(data.to_json(orient="records",
  145. date_format="iso"))
  146. elif isinstance(data, pd.Series):
  147. data = simplejson.loads(data.to_json(date_format="iso"))
  148. try:
  149. if (len(data) == 1) or (isinstance(data, dict)):
  150. if isinstance(data, pd.DataFrame) and (len(data) == 1):
  151. data = data.iloc[0]
  152. elif type(data) is list:
  153. data = data[0]
  154. self._database[collection_name].insert_one(data)
  155. else:
  156. self._database[collection_name].insert_many(data, ordered=ordered)
  157. except Exception as error:
  158. self._log.log_and_raise_error(('An error occured when trying to insert data into {}, {}. \nError: {}').format(self._database_name, collection_name, error))
  159. self._log.info(('Data has been inserted into the {} collection').format(collection_name))
  160. def create_collection_and_set_schema(self, collection_name: str, schema_path: str):
  161. '''
  162. :param str collection_name: name of the collection to be created.
  163. :param str schema_path: path to the schema file.
  164. '''
  165. assert(isinstance(collection_name, str)),\
  166. "Parameter 'collection_name' must be a string type"
  167. assert(isinstance(schema_path, str)),\
  168. "Parameter 'schema_path' must be a string type"
  169. self.create_collection(collection_name)
  170. self.set_collection_schema(collection_name=collection_name, schema_path=schema_path)
  171. def query_data_and_generate_dataframe(self, collection_name: str, attribute: str = None,
  172. attribute_value: str = None, comparison_operator: str = '$eq'):
  173. '''
  174. '''
  175. try:
  176. if attribute == None or attribute_value == None:
  177. data = self._database[collection_name].find()
  178. else:
  179. data = self._database[collection_name].find({attribute: {comparison_operator: attribute_value}})
  180. except Exception as error:
  181. self._log.log_and_raise_error(('An error occured trying to query data from {}, with query {}: {}:{}. \nError:{}').format(collection_name, attribute_value, comparison_operator, attribute_value, error))
  182. return self.convert_mongo_data_into_dataframe(data)
  183. def aggregate_data_and_generate_dataframe(self, collection_name: str, aggregation_pipeline: list):
  184. try:
  185. data = self._database[collection_name].aggregate(pipeline=aggregation_pipeline, allowDiskUse=True)
  186. except Exception as error:
  187. self._log.log_and_raise_error(('A problem occured when aggregating the collection {} with the pipeline {}. \nError: {}').format(collection_name, aggregation_pipeline, error))
  188. return self.convert_mongo_data_into_dataframe(data)
  189. def convert_mongo_data_into_dataframe(self, data) -> pd.DataFrame():
  190. data = list(data)
  191. try:
  192. if len(data)> 0:
  193. df = pd.DataFrame(data)
  194. df.set_index('radsatznummer', inplace=True)
  195. return df
  196. else:
  197. self._log.warning(('No data for the query was found').format())
  198. except Exception as error:
  199. self._log.log_and_raise_error(('An error occured trying to convert mongo data into pd.Dataframe. \nError: {} ').format(error))
  200. def update_data_in_collection(self, query_label: str, query_value: str, update_label:str, update_value: str, collection_name:str):
  201. self._database[collection_name].update_one({query_label:query_value}, {"$set": {update_label: update_value}})
  202. if __name__ == "__main__":
  203. log = Log("Test MongodbHandler:")
  204. log.info('Script started')
  205. database_url = "mongodb://{0}:{1}@{2}:{3}"\
  206. .format('root',
  207. 'oebb',
  208. 'localhost',
  209. 27017)
  210. database_name = 'test_database'
  211. db_handler = MongodbHandler(database_name=database_name, database_url=database_url)
  212. # Create a colleciton for the wheelsets and give it its schema.
  213. for schema_path in [
  214. os.path.join(".", "mongo_schema", "schema_wheelsets.json"),
  215. os.path.join(".", "mongo_schema", "schema_process_instances.json"),
  216. os.path.join(".", "mongo_schema", "schema_componets.json")]:
  217. if os.path.isfile(schema_path):
  218. collection_name = os.path.basename(schema_path).lstrip("_schema").split(".")[0]
  219. db_handler.create_collection_and_set_schema(collection_name, schema_path)
  220. else:
  221. log.log_and_raise_warning(('No file exists at path: {}').format(schema_path))
  222. log.info(("Existing databases: {}, Collection in OEBB database {}")\
  223. .format(db_handler._client.list_database_names(), db_handler._database.list_collection_names()))