#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Created on Mon Sep 16 13:27:44 2019

@author: oskar
@description: Implementation of a database handler for abstraction of the mongodb.
"""

import simplejson
import sys
import os
import time
import pymongo
from pymongo import MongoClient
import pandas as pd
import numpy as np
from pprint import pprint

sys.path.append(os.getcwd())

from cdplib.log import Log
from cdplib.db_migration.ParseJsonSchema import ParseJsonSchema
from cdplib.Singleton_Threadsafe import SingletonThreadsafe


class MongodbHandlerPool(metaclass=SingletonThreadsafe):
#class MongodbHandlerPool():
    '''
    Thread-safe pool of reusable MongodbHandler instances.
    '''
    def __init__(self, size: int = 1):
        self._size = size
        self._log = Log("Mongodb Handler Pool")
        self._mongodb_handlers = [MongodbHandler() for _ in range(size)]

    def aquire(self):
        while not self._mongodb_handlers:
            self._mongodb_handlers = [MongodbHandler() for _ in range(self._size)]
            self._log.warning("Ran out of Mongodb handlers, {} more have been added. Are you sure you've returned yours?".format(self._size))
        return self._mongodb_handlers.pop()

    def release(self, mongodb_handler):
        if len(self._mongodb_handlers) < self._size:
            self._mongodb_handlers.append(mongodb_handler)
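
# Usage sketch (illustrative only): acquiring a handler from the pool and
# returning it when done. Assumes the MongoDB connection settings are
# available to MongodbHandler (see its __init__ below); names are placeholders.
#
#   pool = MongodbHandlerPool(size=2)
#   handler = pool.aquire()
#   try:
#       handler.create_collection("example_collection")
#   finally:
#       pool.release(handler)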


class MongodbHandler:
    '''
    Handler for connecting to and working with a MongoDB database.
    '''
    def __init__(self, database_url: str = None,
                 database_name: str = None):
        '''
        :param str database_url: Url for the mongodb database
        :param str database_name: Name of the database the database handler should handle
        '''
        if database_url is None:
            from libraries.configuration import default as cfg

            database_url = "mongodb://{0}:{1}@{2}:{3}"\
                .format(cfg["MONGO"]["MONGO_USER"],
                        cfg["MONGO"]["MONGO_PASSWORD"],
                        cfg["MONGO"]["MONGO_HOST"],
                        cfg["MONGO"]["MONGO_PORT"])

            if database_name is None:
                database_name = cfg["MONGO"]["MONGO_DATABASE_NAME"]

        assert(isinstance(database_url, str)),\
            "Parameter 'database_url' must be a string type"
        assert(isinstance(database_name, str)),\
            "Parameter 'database_name' must be a string type"

        self._log = Log("Mongodb Handler")

        # Connect to the MongoDB server.
        self._client = MongoClient(database_url)
        # Connect to the database; it is created on first write if it doesn't exist.
        self._database = self._client[database_name]
        self._database_name = database_name

    def __del__(self):
        try:
            self._client.close()
        except Exception as e:
            self._log.log_and_raise_error(('An error occurred when trying to close the MongoDB client. Error: {}').format(e))

    def set_database(self, database_name: str):
        '''
        :param str database_name: Name of the database.
        '''
        assert(isinstance(database_name, str)),\
            "Parameter 'database_name' must be a string type"

        if database_name not in self._client.list_database_names():
            self._log.info(('Database: {} does not exist yet, it will be created for you once a collection is created in it').format(database_name))

        self._database = self._client[database_name]

    def drop_database(self):
        '''
        Drops the database this handler is connected to.
        '''
        try:
            self._client.drop_database(self._database_name)
        except Exception as error:
            self._log.log_and_raise_error(("Couldn't drop the database. Error: {}").format(error))

    def drop_collection(self, collection_name: str):
        '''
        :param str collection_name: name of the collection to be dropped.
        '''
        try:
            return_message = self._database.drop_collection(collection_name)
            if 'errmsg' in return_message:
                self._log.warning(("Couldn't drop the collection {}. Error: {}").format(collection_name, return_message['errmsg']))
        except Exception as error:
            self._log.log_and_raise_error(("Couldn't drop the collection {}. Error: {}").format(collection_name, error))

    def create_index(self, collection_name: str, key: (str, int, list), direction: (str, int) = 'text'):
        '''
        :param str collection_name: name of the collection for which the index will be created.
        :param key: field name (or list of field names) to be used as the index.
        :param direction: see https://api.mongodb.com/python/current/api/pymongo/collection.html for reference.
        '''
        allowed_directions = [1, -1, '2d', 'geoHaystack', '2dsphere', 'hashed', 'text']

        assert(isinstance(collection_name, str)),\
            "Parameter 'collection_name' must be a string type"
        assert(isinstance(key, (str, int, list))),\
            "Parameter 'key' must be a string, integer or list type"
        assert(isinstance(direction, (str, int))),\
            "Parameter 'direction' must be a string or integer type"
        assert(direction in allowed_directions),\
            "Parameter 'direction' must be one of these values: 1, -1, '2d', 'geoHaystack', '2dsphere', 'hashed', 'text'"

        if isinstance(key, list):
            key_list = [(item, direction) for item in key]
            self._database[collection_name].create_index(key_list, name='_'.join(key))
        else:
            self._database[collection_name].create_index([(key, direction)], name=key)
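
    # Usage sketch (illustrative only): a single-field text index and a compound
    # index built from a list of keys. Collection and field names are placeholders.
    #
    #   db_handler = MongodbHandler(database_url=..., database_name=...)
    #   db_handler.create_index("wheelsets", "serial_number", direction="text")
    #   db_handler.create_index("wheelsets", ["axle_id", "mount_date"], direction=1)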

    def set_collection_schema(self, collection_name: str, schema_path: str,
                              validation_level: str = 'moderate', validation_action: str = 'error'):
        '''
        :param str collection_name: name of the collection for which the schema will be set.
        :param str schema_path: path to the schema file.
        :param str validation_level: level of validation done by the mongodb.
        :param str validation_action: what will happen upon validation error, warning or error message.
        '''
        assert(isinstance(collection_name, str)),\
            "Parameter 'collection_name' must be a string type"
        assert(isinstance(schema_path, str)),\
            "Parameter 'schema_path' must be a string type"
        assert(isinstance(validation_level, str)),\
            "Parameter 'validation_level' must be a string type"
        assert(isinstance(validation_action, str)),\
            "Parameter 'validation_action' must be a string type"

        parse_obj = ParseJsonSchema(schema_paths=schema_path)

        command = {
            'collMod': collection_name,
            'validator': {
                '$jsonSchema': parse_obj.load_and_parse_schema_for_mongodb(schema_path)
            },
            'validationLevel': validation_level,
            'validationAction': validation_action
        }

        try:
            self._database.command(command)
        except Exception as error:
            self._log.log_and_raise_error(('An error occurred when trying to set a schema for the collection: {}. \nError: {}').format(collection_name, error))

    def create_collection(self, collection_name: str):
        '''
        :param str collection_name: name of the collection to be created.
        '''
        assert(isinstance(collection_name, str)),\
            "Parameter 'collection_name' must be a string type"

        if collection_name not in self._database.list_collection_names():
            try:
                self._database.create_collection(collection_name)
                self._log.info(("Collection '{}' has been created").format(collection_name))
            except Exception as error:
                self._log.log_and_raise_error(('An error occurred while creating the new collection: {}. \nError: {}').format(collection_name, error))
        else:
            self._log.info(("Collection '{}' already exists").format(collection_name))

        return self._database[collection_name]

    def insert_data_into_collection(self, data: (dict, list, np.ndarray, pd.DataFrame, pd.Series),
                                    collection_name: str,
                                    ordered: bool = False):
        '''
        :param data: data to be inserted into the collection
        :param str collection_name: name of the collection the data will be added to.
        :param bool ordered: whether insert_many should insert the documents in order.
        '''
        allowed_types = (dict, list, np.ndarray, pd.DataFrame, pd.Series)

        assert(isinstance(data, allowed_types)),\
            "Parameter 'data' is of invalid type"

        if isinstance(data, np.ndarray):
            data = pd.DataFrame(data)

        if isinstance(data, pd.DataFrame):
            data = simplejson.loads(data.to_json(orient="records",
                                                 date_format="iso"))
        elif isinstance(data, pd.Series):
            data = simplejson.loads(data.to_json(date_format="iso"))

        try:
            if (len(data) == 1) or (isinstance(data, dict)):
                # After the conversions above, 'data' is either a dict or a
                # list of records, so a single record is inserted directly.
                if isinstance(data, list):
                    insert_data = data[0]
                else:
                    insert_data = data

                self._database[collection_name].insert_one(insert_data)
            else:
                self._database[collection_name].insert_many(data, ordered=ordered)

        except pymongo.errors.BulkWriteError as error:
            pprint(error.details)

        self._log.info(('Data has been inserted into the {} collection').format(collection_name))
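
    # Usage sketch (illustrative only): inserting a pandas DataFrame; it is
    # serialised to a list of JSON records before being written. Field and
    # collection names are placeholders.
    #
    #   df = pd.DataFrame([{"axle_id": 1, "status": "new"},
    #                      {"axle_id": 2, "status": "used"}])
    #   db_handler.insert_data_into_collection(df, "wheelsets")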

    def create_collection_and_set_schema(self, collection_name: str, schema_path: str):
        '''
        :param str collection_name: name of the collection to be created.
        :param str schema_path: path to the schema file.
        '''
        assert(isinstance(collection_name, str)),\
            "Parameter 'collection_name' must be a string type"
        assert(isinstance(schema_path, str)),\
            "Parameter 'schema_path' must be a string type"

        self.create_collection(collection_name)
        self.set_collection_schema(collection_name=collection_name, schema_path=schema_path)

    def query_data_and_generate_dataframe(self, collection_name: str, attribute: str = None,
                                          attribute_value: str = None, comparison_operator: str = '$eq',
                                          index=None, return_as_dataframe: bool = True, return_id: bool = False,
                                          find_query: dict = None, return_values: dict = None):
        '''
        :param str collection_name: collection to query.
        :param str attribute: attribute to filter on, used together with attribute_value.
        :param attribute_value: value the attribute is compared against.
        :param str comparison_operator: mongodb comparison operator, e.g. '$eq', '$gt'.
        :param index: column to set as the DataFrame index.
        :param bool return_as_dataframe: return a DataFrame instead of a cursor.
        :param bool return_id: include the '_id' field in the result.
        :param dict find_query: full find query, used when no attribute is given.
        :param dict return_values: projection defining which fields to return.
        '''
        if return_values is None:
            return_values = {'_id': return_id}

        try:
            if attribute is None or attribute_value is None:
                if find_query is None:
                    query = {}
                else:
                    query = find_query
                data = self._database[collection_name].find(query, return_values)
            else:
                query = {attribute: {comparison_operator: attribute_value}}
                data = self._database[collection_name].find(query, return_values)

        except Exception as error:
            self._log.log_and_raise_error(('An error occurred trying to query data from {}, with query {}: {}:{}. \nError:{}').format(collection_name, attribute, comparison_operator, attribute_value, error))
            return None

        if data.collection.count_documents(query) != 0:
            if return_as_dataframe:
                return self.convert_mongo_data_into_dataframe(data, index, collection_name)
            else:
                return data
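
    # Usage sketch (illustrative only): fetching all documents whose field value
    # exceeds a threshold as a DataFrame. Field and collection names are placeholders.
    #
    #   df = db_handler.query_data_and_generate_dataframe(
    #       "wheelsets", attribute="mileage", attribute_value=100000,
    #       comparison_operator="$gt", index="axle_id")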

    def aggregate_and_insert_into_collection(self,
                                             input_collection_name: str,
                                             output_collection_name: str,
                                             aggregation_pipeline: list = None):
        """
        Runs an aggregation on the input collection and writes the result
        into the output collection via a $out stage.
        """
        if aggregation_pipeline is None:
            aggregation_pipeline = [{"$out": output_collection_name}]
        else:
            aggregation_pipeline.append({"$out": output_collection_name})

        self.aggregate_data_and_generate_dataframe(
            collection_name=input_collection_name,
            aggregation_pipeline=aggregation_pipeline)
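
    # Usage sketch (illustrative only): materialising an aggregation result into
    # another collection; the method appends the $out stage itself. The $match
    # stage and all names are placeholders.
    #
    #   pipeline = [{"$match": {"status": "active"}}]
    #   db_handler.aggregate_and_insert_into_collection(
    #       "wheelsets", "active_wheelsets", aggregation_pipeline=pipeline)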

    def index_collection(self, collection_name: str, keys: list):
        """
        :param keys: compound index for the collection; either a list of
        (field_name, 1) or (field_name, -1) tuples giving the indexing
        order, or a list of field names, in which case the direction
        defaults to 1 (ascending).
        """
        keys = [(key, 1) if not isinstance(key, tuple) else key for key in keys]
        self._database[collection_name].create_index(keys)

    def aggregate_data_and_generate_dataframe(self, collection_name: str, aggregation_pipeline: list, index: str = None, return_as_dataframe=True):
        try:
            data = self._database[collection_name].aggregate(pipeline=aggregation_pipeline, allowDiskUse=True)
        except Exception as error:
            self._log.log_and_raise_error(('A problem occurred when aggregating the collection {} with the pipeline {}. \nError: {}').format(collection_name, aggregation_pipeline, error))
            return None

        if return_as_dataframe:
            return self.convert_mongo_data_into_dataframe(data, index, collection_name)
        else:
            return data

    def convert_mongo_data_into_dataframe(self, data, index: str = None, collection_name: str = None, chunksize: int = 500) -> pd.DataFrame:
        start_time = time.time()
        '''
        Previous, non-chunked implementation, kept for reference:

        self._log.info('Converting returned mongo data into a DataFrame')
        data = list(data)
        try:
            if len(data) > 0:
                if collection_name:
                    self._log.info(('{} rows were fetched from the {} collection').format(len(data), collection_name))
                else:
                    self._log.info(('{} rows were fetched from the database').format(len(data)))

                df = pd.DataFrame(data)
                if index is not None:
                    df.set_index(index, inplace=True)
                self._log.info(('DataFrame conversion is done, took {} seconds').format(time.time()-start_time))
                return df
            else:
                self._log.warning('No data for the query was found')
        except Exception as error:
            self._log.log_and_raise_error(('An error occurred trying to convert mongo data into pd.Dataframe. \nError: {} ').format(error))
        '''
        frames = []
        records = []
        for iteration, value in enumerate(data):
            records.append(value)
            if (iteration + 1) % chunksize == 0:
                frames.append(pd.DataFrame(records))
                records = []

        if records:
            frames.append(pd.DataFrame(records))

        if len(frames) > 1:
            return_df = pd.concat(frames, axis=0, sort=False)
        elif len(frames) == 0:
            self._log.warning('Query returned an empty cursor')
            return_df = pd.DataFrame()
        else:
            return_df = frames[0]

        if index is not None:
            return_df.set_index(index, inplace=True)

        self._log.info(('{} rows were fetched from {}. DataFrame conversion is done, took {} seconds').format(len(return_df.index), collection_name if collection_name is not None else 'the database', time.time() - start_time))

        return return_df

    #def update_data_in_collection(self, query_label: str, query_value: str, update_label: str, update_value: str, collection_name: str):
    #    self._database[collection_name].update_one({query_label: query_value}, {"$set": {update_label: update_value}})

    def push_data_into_collection(self, data: (dict, list, np.ndarray, pd.DataFrame, pd.Series),
                                  collection_name: str,
                                  update_label: str,
                                  query_label: str,
                                  query_value: str):
        '''
        Adds data to an existing array within the collection.

        :param data: data to add
        :param str collection_name: collection to add data to
        :param str update_label: label of the attribute where data is to be added
        :param str query_label: label for the query
        :param str query_value: value for the query
        '''
        if isinstance(data, list):
            self._database[collection_name].update_one({query_label: query_value}, {"$push": {update_label: {"$each": data}}})
        else:
            self._database[collection_name].update_one({query_label: query_value}, {"$push": {update_label: data}})

        self._log.info(('Data has been pushed into the {} array in the {} collection').format(update_label, collection_name))
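
    # Usage sketch (illustrative only): appending entries to an existing array
    # field of a single matching document. Field and collection names are placeholders.
    #
    #   db_handler.push_data_into_collection(
    #       data=[{"event": "inspection", "result": "ok"}],
    #       collection_name="wheelsets",
    #       update_label="events",
    #       query_label="axle_id",
    #       query_value=1)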

    def document_exists(self, collection_name: str, query_label: str, query_value: str):
        '''
        Checks whether at least one document matching the query exists.

        :param str collection_name: collection to search in
        :param str query_label: label for the query
        :param str query_value: value for the query
        '''
        return self._database[collection_name].count_documents({query_label: query_value}) > 0

    def query_data_between_dates_and_generate_dataframe(self, collection_name: str, date_label: str = None, from_date_value: str = None, to_date_value: str = None, index: str = None, return_id: bool = False, return_as_dataframe: bool = True, find_query: dict = None, return_values: dict = None):
        '''
        Queries data between two dates.

        :param str collection_name: collection to query
        :param str date_label: label of the attribute where the date is stored
        :param str from_date_value: lower bound of the date range (exclusive)
        :param str to_date_value: upper bound of the date range (exclusive)
        :param str index: column to set as the DataFrame index
        :param bool return_id: include the '_id' field in the result
        :param bool return_as_dataframe: return a DataFrame instead of a cursor
        :param dict find_query: full find query, used instead of the date range if given
        :param dict return_values: projection defining which fields to return
        '''
        assert(isinstance(collection_name, str)),\
            "Parameter 'collection_name' must be a string type"

        if return_values is None:
            return_values = {'_id': return_id}

        if (date_label and from_date_value and to_date_value) or find_query:
            try:
                if find_query:
                    query = find_query
                else:
                    query = {date_label: {'$gt': from_date_value, '$lt': to_date_value}}

                data = self._database[collection_name].find(query, return_values)

            except Exception as error:
                self._log.log_and_raise_error(('An error occurred trying to query data from {}, with query {}. \nError:{}').format(collection_name, query, error))

            if data.collection.count_documents(query) != 0:
                if return_as_dataframe:
                    return self.convert_mongo_data_into_dataframe(data, index, collection_name)
                else:
                    return data
            else:
                self._log.warning(('No data was found for the query: {}, in collection: {}').format(query, collection_name))
                return None
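
    # Usage sketch (illustrative only): fetching documents within a date range;
    # the dates are compared as stored, here assumed to be ISO-8601 strings.
    # Field and collection names are placeholders.
    #
    #   df = db_handler.query_data_between_dates_and_generate_dataframe(
    #       "process_instances", date_label="start_date",
    #       from_date_value="2019-01-01T00:00:00Z",
    #       to_date_value="2019-12-31T23:59:59Z",
    #       index="process_id")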

    def query_oldest_or_newest_date_in_collection(self, collection_name: str, date_label: str, oldest: bool = False):
        date = None
        direction = pymongo.ASCENDING if oldest else pymongo.DESCENDING

        try:
            data = list(self._database[collection_name].find().sort(date_label, direction).limit(1))
            if data:
                date = self.convert_mongo_data_into_dataframe(data, collection_name=collection_name)[date_label].values[0]
            else:
                self._log.warning('No date was found for this query.')
        except Exception as error:
            self._log.log_and_raise_error(('An error occurred trying to query data from {}, finding the oldest: {}, value for: {}. \nError:{}').format(collection_name, oldest, date_label, error))

        return date

    def query_with_sorting_and_limit(self, collection_name: str, sort_label: str, limit: int, attribute: str = None,
                                     attribute_value: str = None, comparison_operator: str = '$eq', ascending=True,
                                     index=None, return_as_dataframe: bool = True, return_id: bool = False):

        direction = pymongo.ASCENDING if ascending else pymongo.DESCENDING

        try:
            if attribute is None or attribute_value is None:
                query = {}
                data = self._database[collection_name].find(query, {'_id': return_id}).sort(sort_label, direction).limit(limit)
            else:
                query = {attribute: {comparison_operator: attribute_value}}
                data = self._database[collection_name].find(query, {'_id': return_id}).sort(sort_label, direction).limit(limit)

        except Exception as error:
            self._log.log_and_raise_error(('An error occurred trying to query data from {}. \nError:{}').format(collection_name, error))

        if data.collection.count_documents(query) != 0:
            if return_as_dataframe:
                return self.convert_mongo_data_into_dataframe(data, index, collection_name)
            else:
                return data
        else:
            self._log.warning('No data was found for the query')
            return None

    def update_data_in_collection(self, collection_name: str, update_label: str = None, update_value: str = None,
                                  query_label: str = None, query_value: str = None, create_if_not_exist: bool = True,
                                  find_query: dict = None, update_query: dict = None, update_many: bool = False):

        if isinstance(update_value, pd.DataFrame):
            update_value = simplejson.loads(update_value.to_json(orient="records",
                                                                 date_format="iso"))

        if update_query is None:
            update_query = {"$set": {update_label: update_value}}

        if find_query is None:
            if query_label and query_value:
                find_query = {query_label: query_value}

        try:
            if update_many:
                if find_query is not None:
                    self._database[collection_name].update_many(find_query, update_query, upsert=create_if_not_exist)
                else:
                    self._database[collection_name].update_many({}, update_query, upsert=create_if_not_exist)
            else:
                if find_query is not None:
                    self._database[collection_name].update_one(find_query, update_query, upsert=create_if_not_exist)
                else:
                    self._database[collection_name].update_one({}, update_query, upsert=create_if_not_exist)

            self._log.info(('Data for label: {} has been updated').format(update_label))

        except Exception as error:
            self._log.log_and_raise_error(('There was a problem updating data for label: {}, Error: {}').format(update_label, error))
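
    # Usage sketch (illustrative only): a single-document update via
    # query_label/query_value, and an update_many with explicit find and
    # update queries. Field and collection names are placeholders.
    #
    #   db_handler.update_data_in_collection(
    #       "wheelsets", update_label="status", update_value="scrapped",
    #       query_label="axle_id", query_value=1)
    #
    #   db_handler.update_data_in_collection(
    #       "wheelsets", find_query={"status": "scrapped"},
    #       update_query={"$set": {"archived": True}}, update_many=True)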

    def which_document_in_list_exists(self, collection_name: str, query_label: str, query_values: list):
        '''
        Checks which of the documents in the list exist and returns the
        matching values of query_label.

        :param str collection_name: collection to search in
        :param str query_label: label for the query
        :param list query_values: values to check for existence
        '''
        query = {query_label: {'$in': query_values}}
        data = self._database[collection_name].find(query, {query_label: 1, '_id': 0})

        if data.collection.count_documents(query) != 0:
            return [value[query_label] for value in data]
        else:
            return []

    def get_distinct_value_of_key(self, collection_name: str, key: str):
        assert(isinstance(collection_name, str)),\
            "Parameter 'collection_name' must be a string type"
        assert(isinstance(key, str)),\
            "Parameter 'key' must be a string type"

        return self._database[collection_name].distinct(key)

    def get_number_of_entries_in_collection(self, collection_name: str):
        assert(isinstance(collection_name, str)),\
            "Parameter 'collection_name' must be a string type"

        return self._database[collection_name].count_documents({})


if __name__ == "__main__":

    log = Log("Test MongodbHandler:")

    log.info('Script started')

    database_url = "mongodb://{0}:{1}@{2}:{3}"\
        .format('root',
                'oebb',
                'localhost',
                27017)

    database_name = 'test_database'

    db_handler = MongodbHandler(database_name=database_name, database_url=database_url)

    # Create a collection for each schema and attach the schema to it.
    for schema_path in [
            os.path.join(".", "mongo_schema", "schema_wheelsets.json"),
            os.path.join(".", "mongo_schema", "schema_process_instances.json"),
            os.path.join(".", "mongo_schema", "schema_componets.json")]:

        if os.path.isfile(schema_path):
            # Derive the collection name from the file name,
            # e.g. 'schema_wheelsets.json' -> 'wheelsets'.
            collection_name = os.path.basename(schema_path).split(".")[0].replace("schema_", "", 1)

            db_handler.create_collection_and_set_schema(collection_name, schema_path)
        else:
            log.log_and_raise_warning(('No file exists at path: {}').format(schema_path))

    log.info(("Existing databases: {}, Collections in the database: {}")\
        .format(db_handler._client.list_database_names(), db_handler._database.list_collection_names()))