SQLHandler.py 23 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663
  1. """
  2. Created on Tue Sep 18 16:20:50 2018
  3. @author: tanya
  4. """
  5. import os
  6. import sys
  7. import re
  8. import sqlalchemy
  9. import sqlparse
  10. import pandas as pd
  11. import warnings
  12. sys.path.append(os.getcwd())
  13. from cdplib.log import Log
  14. from cdplib.Singleton_Threadsafe import SingletonThreadsafe
  15. class SQLHandlerPool(metaclass=SingletonThreadsafe):
  16. '''
  17. '''
  18. def __init__(self, size: int = 20):
  19. self._size = size
  20. self._log = Log(name='SQLHandlerPool')
  21. self._sql_handlers = [SQLHandler() for _ in range(size)]
  22. def aquire(self):
  23. while not self._sql_handlers:
  24. self._sql_handlers = [SQLHandler() for _ in range(self._size)]
  25. self._log.warning("Ran out of SQL handlers, 10 more have been added. Are you sure you've returned yours?")
  26. return self._sql_handlers.pop()
  27. def release(self, sql_handler):
  28. sql_handler._engine.dispose()
  29. if len(self._sql_handlers) < self._size:
  30. self._sql_handlers.append(sql_handler)
  31. class SQLHandler:
  32. '''
  33. Resembles methods for executing sql queries
  34. with different dabase connectors.
  35. Remark:in each method we force new opening and
  36. closing of a database connection,
  37. this avoids errors when parallelizing with multiprocessing.
  38. '''
  39. pass
  40. def __init__(self, db_uri: str = None,
  41. is_case_insensitive: bool = False):
  42. '''
  43. :param str db_uri:
  44. of form
  45. <sqlalchemy_dialect//user:password@host:port/dbname?charset=utf8&local_infile=1>
  46. sqlalchemy dialects:
  47. for mysql : mysql+pymysql
  48. for db2: ibm_db_sa
  49. for mssql: mssql+pyodbc
  50. '''
  51. from sqlalchemy_utils import database_exists, create_database
  52. self._log = Log(name='SQLHandler')
  53. if db_uri is None:
  54. from libraries.configuration import default as cfg
  55. if "SQL_DRIVER" in cfg["SQL"].keys():
  56. db_uri = sqlalchemy.engine.url.URL.create(
  57. cfg["SQL"]["SQL_DIALECT"],
  58. username=cfg["SQL"]["SQL_USER"],
  59. password=cfg["SQL"]["SQL_PASSWORD"],
  60. host=cfg["SQL"]["SQL_HOST"],
  61. port=cfg["SQL"]["SQL_PORT"],
  62. database=cfg["SQL"]["SQL_DATABASE_NAME"],
  63. query=dict(driver=cfg["SQL"]["SQL_DRIVER"])
  64. )
  65. else:
  66. # TODO : See if this still connect to Mariadb, also try to change to sqlalchemy.engine.url.URL.create
  67. db_uri = "{0}://{1}:{2}@{3}:{4}/{5}?driver={}&charset=utf8&local_infile=1"\
  68. .format(cfg["SQL"]["SQL_DIALECT"],
  69. cfg["SQL"]["SQL_USER"],
  70. cfg["SQL"]["SQL_PASSWORD"],
  71. cfg["SQL"]["SQL_HOST"],
  72. cfg["SQL"]["SQL_PORT"],
  73. cfg["SQL"]["SQL_DATABASE_NAME"])
  74. # assert(isinstance(db_uri, str)),\
  75. # "Parameter 'db_uri' must be of type str"
  76. # assert(re.match(r'.+://.+:(.+)?@.+:.+/.+', db_uri) is not None),\
  77. # ('database url does not match the pattern: '
  78. # 'sqlalchemy_dialect//user:password@host:port/dbname')
  79. self._db_uri = db_uri
  80. engine = sqlalchemy.create_engine(self._db_uri)
  81. if not database_exists(engine.url):
  82. create_database(engine.url)
  83. query = "CREATE DATABASE IF NOT EXISTS {}"\
  84. .format(self._connection_params["db"])
  85. with warnings.catch_warnings():
  86. warnings.simplefilter("ignore")
  87. engine.execute(query)
  88. assert(isinstance(is_case_insensitive, bool)),\
  89. "Parameter 'is_case_sensetive' must of type bool"
  90. if 'ibm' in db_uri and not is_case_insensitive:
  91. raise Exception('Ibm db2 is case insensitive')
  92. self._is_case_insensitive = is_case_insensitive
  93. self._engine = engine
  94. def __del__(self):
  95. self.dispose_engine()
  96. @property
  97. def _connection_params(self) -> dict:
  98. '''
  99. return: connection parameters like user,
  100. password, host, port, and database name
  101. rtype: dict
  102. '''
  103. try:
  104. connection_params = {}
  105. connection_params['user'], connection_params['password'] =\
  106. str(self._db_uri).split('//')[1]\
  107. .split('@')[0]\
  108. .split(':')
  109. connection_params['host'], connection_params['port'] =\
  110. str(self._db_uri).split('//')[1]\
  111. .split('@')[1]\
  112. .split('/')[0]\
  113. .split(':')
  114. connection_params['db'] = str(self._db_uri).split('/')[-1]\
  115. .split('?')[0]
  116. return connection_params
  117. except Exception as e:
  118. err = ("Could not parse connection parameters."
  119. "Finished with error {}")\
  120. .format(e)
  121. self._log.error(err)
  122. raise Exception(err)
  123. def drop_database(self):
  124. '''
  125. '''
  126. database = self._connection_params["db"]
  127. self.execute("DROP DATABASE IF EXISTS {}".format(database))
  128. self._engine.execute("CREATE DATABASE {}".format(database))
  129. self._engine.execute("USE {}".format(database))
  130. @property
  131. def _db_metadata(self) -> dict:
  132. '''
  133. Returns a sql-dialect specific information like information schema
  134. and columnames in information_schema.tables and
  135. information_schema.columns
  136. For ibm databases, information_schema is set to syscat,
  137. else it is set to information_schema
  138. If these default values do not exist in the given database,
  139. the output of the method is set to None
  140. :return: dictionary with information_schema, schema_col,
  141. table_col, column_col, default_schema
  142. '''
  143. db_metadata = {}
  144. if 'ibm' in self._db_uri:
  145. db_metadata['information_schema'] = 'syscat'
  146. db_metadata['schema_col'] = 'tabschema'
  147. db_metadata['table_col'] = 'tabname'
  148. db_metadata['column_col'] = 'colname'
  149. db_metadata['default_schema'] =\
  150. self._connection_params['user'].upper()
  151. else:
  152. db_metadata['information_schema'] = 'information_schema'
  153. db_metadata['schema_col'] = 'TABLE_SCHEMA'
  154. db_metadata['table_col'] = 'TABLE_NAME'
  155. db_metadata['column_col'] = 'COLUMN_NAME'
  156. db_metadata['default_schema'] =\
  157. self._connection_params['db']
  158. # check if it worked to create metadata
  159. try:
  160. query = """SELECT *
  161. FROM {}.tables
  162. LIMIT 1
  163. """.format(db_metadata['information_schema'])
  164. self.execute(query)
  165. except Exception as e:
  166. self._log.error(e)
  167. db_metadata = None
  168. return db_metadata
  169. def execute(self, query):
  170. '''
  171. Executes an sql-queries.
  172. Remark: queries like CREATE, DROP, SELECT work
  173. for majority of sqlalchemy dialects.
  174. queries like SHOW TABLES, LOAD DATA, and using
  175. INFORMATION_SCHEMA are mysql specific and might
  176. not exist in a different dialect.
  177. :param str query:
  178. '''
  179. connection = self._engine.connect()
  180. transaction = connection.begin()
  181. errors = []
  182. results = []
  183. # in the case of multi-query execute each query
  184. for sub_query in sqlparse.split(query):
  185. if len(sub_query) > 0:
  186. try:
  187. result = connection.execute(sub_query)
  188. if result.returns_rows:
  189. data = pd.DataFrame(result.fetchall())
  190. data.columns = result.keys()
  191. results.append(data)
  192. except Exception as e:
  193. errors.append(str(e))
  194. if len(errors) > 0:
  195. err = ('Could not execute some of the queries. '
  196. 'Obtained exceptions: {}'
  197. .format('\n'.join(errors)))
  198. self._log.error(err)
  199. raise Exception(err)
  200. transaction.commit()
  201. connection.close()
  202. return results
  203. def execute_query_from_file(self, filename: str):
  204. '''
  205. '''
  206. with open(filename, 'r') as f:
  207. query = f.read()
  208. self.execute(query)
  209. def get_tablenames(self, schema: str = None, query: str = None):
  210. '''
  211. '''
  212. if (self._db_metadata is None) and (query is None):
  213. raise Exception('Please specify the query')
  214. else:
  215. try:
  216. if query is None:
  217. schema_or_default_schema =\
  218. self._db_metadata['default_schema']\
  219. if schema is None else schema
  220. query = """SELECT DISTINCT {0}
  221. FROM {1}.tables
  222. WHERE {2} = '{3}'
  223. """.format(
  224. self._db_metadata['table_col'],
  225. self._db_metadata['information_schema'],
  226. self._db_metadata['schema_col'],
  227. schema_or_default_schema)
  228. tables = self.read_sql_to_dataframe(query).iloc[:, 0].tolist()
  229. return tables
  230. except Exception as e:
  231. err = ("Could not get tablenames"
  232. "Finished with error {}".format(e))
  233. self._log.error(err)
  234. raise Exception(err)
  235. def check_if_table_exists(self, tablename: str,
  236. schema: str = None,
  237. query: str = None) -> bool:
  238. '''
  239. Tries to retrieve table information from database with given query.
  240. If this does not work, tries to select one row from the given table,
  241. if this fails, assumes that the table does not exist.
  242. :param str tablename:
  243. :param str schema:
  244. :param str query: if not specified, tries to find
  245. tablename in information_schema specified in _db_metadata.
  246. :return: if the table exists or not
  247. :rtype: bool
  248. '''
  249. if self._is_case_insensitive:
  250. tablename = tablename.upper()
  251. try:
  252. tablenames = self.get_tablenames(schema=schema, query=query)
  253. table_exists = (tablename in tablenames)
  254. except Exception as e:
  255. self._log.warning(('Could not execute query to retrieve table '
  256. 'information. Trying to execute a'
  257. 'select statement. '
  258. 'Got exeption {}').format(e))
  259. try:
  260. query = """SELECT *
  261. FROM {0}{1}
  262. LIMIT 1
  263. """.format('' if schema is None else schema + '.',
  264. tablename)
  265. self.execute(query)
  266. table_exists = True
  267. except Exception as e:
  268. self._log.warning(('Failed to select from {0}. '
  269. 'Finished with error {1}'
  270. 'Conclusion: table does not exist')
  271. .format(tablename, e))
  272. table_exists = False
  273. return table_exists
  274. def create_schema(self, schema: str, query: str = None):
  275. '''
  276. Creates a schema if it does not exist, else does nothing
  277. :param str schema:
  278. :param str query: if None trying to read schemas from
  279. information_schema specified in db_metadata
  280. '''
  281. if (query is None):
  282. if self._db_metadata is None:
  283. raise Exception('Please specify query')
  284. else:
  285. query = """SELECT DISTINCT {0}
  286. FROM {1}.tables""".format(
  287. self._db_metadata['schema_col'],
  288. self._db_metadata['information_schema'])
  289. try:
  290. schemas = self.read_sql_to_dataframe(query).iloc[:, 0].tolist()
  291. except Exception as e:
  292. err = ("Could not retrieve the list of schemas"
  293. "from the database. Finished with error {}"
  294. .format(e))
  295. self._log.error(err)
  296. raise Exception(err)
  297. if schema not in schemas:
  298. self.execute("CREATE SCHEMA {}".format(schema))
  299. def drop_table_if_exists(self, tablename: str,
  300. schema: str = None,
  301. query: str = None):
  302. '''
  303. :param str tablename:
  304. :param str schema:
  305. :param str query: if not specified, default value is "DROP TABLE"
  306. '''
  307. if self._is_case_insensitive:
  308. tablename = tablename.upper()
  309. schema = '' if schema is None else schema + '.'
  310. if query is None:
  311. query = "DROP TABLE {0}{1};".format(schema, tablename)
  312. try:
  313. if self.check_if_table_exists(tablename):
  314. self.execute(query)
  315. except Exception as e:
  316. err = ("Could not drop the table {0} ."
  317. "Finished with error {1}"
  318. .format(tablename, e))
  319. self._log.error(err)
  320. raise Exception(err)
  321. def get_column_names(self, tablename: str,
  322. schema: str = None,
  323. query: str = None) -> list:
  324. '''
  325. Tries to retrieve column information from database with given query.
  326. If this does not work, tries to select one row from the given table.
  327. :param str tablename:
  328. :param str schema:
  329. :param str query: if not specified, tries to select column
  330. names in the information_schema specified in db_metadata
  331. '''
  332. if self._is_case_insensitive:
  333. tablename = tablename.upper()
  334. if not self.check_if_table_exists(tablename=tablename,
  335. schema=schema):
  336. err = "Table {} does not exist".format(tablename)
  337. self._log.error(err)
  338. raise Exception(err)
  339. try:
  340. if query is None:
  341. if self._db_metadata is None:
  342. raise Exception('Please specify the query')
  343. else:
  344. schema_or_default_schema =\
  345. self._db_metadata['default_schema']\
  346. if schema is None else schema
  347. query = """SELECT DISTINCT {0}
  348. FROM {1}.columns
  349. WHERE {2} = '{3}'
  350. AND {4} = '{5}'
  351. """.format(
  352. self._db_metadata['column_col'],
  353. self._db_metadata['information_schema'],
  354. self._db_metadata['schema_col'],
  355. schema_or_default_schema,
  356. self._db_metadata['table_col'],
  357. tablename)
  358. colnames = [c.lower() for c in
  359. self.read_sql_to_dataframe(query).iloc[:, 0].tolist()]
  360. except Exception as e:
  361. self._log.warn((
  362. 'Could not select columns from '
  363. 'informational schema. Trying to '
  364. 'load the table into a dataframe and selec column names.'
  365. 'Obtained exception {}').format(e))
  366. query = """SELECT *
  367. FROM {0}{1}
  368. LIMIT 1
  369. """.format('' if schema is None else schema + '.',
  370. tablename)
  371. data = self.execute(query)
  372. colnames = data[0].columns.tolist()
  373. return colnames
  374. def load_csv_to_db(self, filename: str,
  375. tablename: str,
  376. schema: str = None,
  377. query: str = None,
  378. **kwargs):
  379. '''
  380. Tries to load data from csv file to database with a given query.
  381. If this does not work, tries to load data from csv to a
  382. pandas dataframe first, and then write it to the database.
  383. :param str filename:
  384. :param str tablename:
  385. :param str schema:
  386. :param str query: if not specified, tries to use
  387. LOAD DATA LOCAL INFILE query
  388. '''
  389. if not self.check_if_table_exists(tablename=tablename,
  390. schema=schema):
  391. err = ('Table {} test does not exit.'
  392. 'Please create it first').format(tablename)
  393. self._log.error(err)
  394. raise Exception(err)
  395. else:
  396. try:
  397. if query is None:
  398. query = """LOAD DATA LOCAL INFILE '{0}'
  399. INTO TABLE {1}{2}
  400. COLUMNS TERMINATED BY ','
  401. OPTIONALLY ENCLOSED BY '"'
  402. LINES TERMINATED BY '\r\n'
  403. IGNORE 1 LINES
  404. ({3})
  405. ;""".format(
  406. filename,
  407. '' if schema is None else schema + '.',
  408. tablename,
  409. ','.join(self.get_column_names(tablename)))
  410. self.execute(query)
  411. except Exception as e:
  412. err = ("Could not load the file {0} "
  413. "to the table {1} ."
  414. "Finished with error {2}")\
  415. .format(filename, tablename, e)
  416. self._log.error(err)
  417. raise Exception(err)
  418. def read_sql_to_dataframe(self, query: str, **read_sql_kwargs):
  419. '''
  420. :param str query: normally a SELECT sql statement
  421. :param read_sql_kwargs: additional arguments to pandas read_sql method
  422. :return: selected data
  423. :rtype: DataFrame
  424. '''
  425. try:
  426. connection = self._engine.connect()
  427. data = pd.read_sql(sql=query,
  428. con=connection,
  429. **read_sql_kwargs)
  430. connection.close()
  431. return data
  432. except Exception as e:
  433. err = ("Could not read the query to a dataframe. "
  434. "Finished with error {}").format(e)
  435. self._log.error(err)
  436. raise Exception(err)
  437. def read_table(self, tablename: str,
  438. schema: str = None,
  439. **read_sql_kwargs):
  440. '''
  441. :param str tablename:
  442. :param str schema:
  443. :param read_sql_kwargs: additional arguments to pands read_sql_method
  444. :return: selected table
  445. :rtype: DataFrame
  446. '''
  447. schema = '' if schema is None else schema + '.'
  448. try:
  449. return self.read_sql_to_dataframe(
  450. query="SELECT * FROM {0}{1};".format(schema, tablename),
  451. **read_sql_kwargs)
  452. except Exception as e:
  453. err = ("Could not read the table {0} to a dataframe. "
  454. "Finished with error {1}").format(tablename, e)
  455. self._log.error(err)
  456. raise Exception(err)
  457. def append_to_table(self, data: pd.DataFrame,
  458. tablename: str,
  459. schema: str = None,
  460. to_sql_kwargs={'index': False}):
  461. '''
  462. :param DataFrame data: data to append
  463. :param str tablename: table where data is appended
  464. :param str schema:
  465. :param dict to_sql_kwargs: additional arguments to pandas to_sql method
  466. '''
  467. if schema is not None:
  468. self.create_schema(schema)
  469. try:
  470. connection = self._engine.connect()
  471. if self.check_if_table_exists(tablename=tablename, schema=schema):
  472. data.to_sql(name=tablename,
  473. schema=schema,
  474. con=connection,
  475. if_exists='append',
  476. **to_sql_kwargs)
  477. else:
  478. self.overwrite_table(data=data,
  479. tablename=tablename,
  480. schema=schema,
  481. to_sql_kwargs=to_sql_kwargs)
  482. connection.close()
  483. except Exception as e:
  484. err = ("Could not append data to the table {0}. "
  485. "Finished with error {1}").format(tablename, e)
  486. self._log.error(err)
  487. raise Exception(err)
  488. def overwrite_table(self, data: pd.DataFrame,
  489. tablename: str,
  490. schema: str = None,
  491. to_sql_kwargs={'index': False}):
  492. '''
  493. :param DataFrame data: data to write to dabase
  494. :param str tablename: table where data is written
  495. :param str schema:
  496. :param to_sql_kwargs: additional arguments to pandas to_sql method
  497. '''
  498. if schema is not None:
  499. self.create_schema(schema)
  500. try:
  501. connection = self._engine.connect()
  502. data.to_sql(name=tablename,
  503. schema=schema,
  504. con=connection,
  505. if_exists='replace',
  506. **to_sql_kwargs)
  507. connection.close()
  508. except Exception as e:
  509. err = ("Could overwrite the table {0}. "
  510. "Finished with error {1}").format(tablename, e)
  511. self._log.error(err)
  512. raise Exception(err)
  513. def draw_er_diagram_from_db(self, diagram_path: str = None,
  514. schema: str = None,
  515. include_tables: list = None):
  516. '''
  517. '''
  518. if diagram_path is None:
  519. diagram_path = "erd.png"
  520. else:
  521. diagram_dir = os.path.dirname(diagram_path)
  522. if diagram_dir != "":
  523. os.makedirs(diagram_dir, exist_ok=True)
  524. import eralchemy
  525. eralchemy.render_er(self._db_uri,
  526. diagram_path,
  527. schema=schema,
  528. include_tables=include_tables)
  529. def dispose_engine(self):
  530. try:
  531. self._engine.dispose()
  532. except Exception as e:
  533. print(('An error occured when trying to dispose the SQL engine. Error: {}').format(e))
  534. raise Exception(e)