123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596 |
- #!/usr/bin/env python3
- # -*- coding: utf-8 -*-
- """
- Created on Tue Sep 18 16:20:50 2018
- @author: tanya
- """
- import os
- import sys
- import re
- import sqlalchemy
- import sqlparse
- import pandas as pd
- import warnings
# Put the current working directory on the import path (at the end, so it
# never shadows installed packages). SQLHandler.__init__ imports the
# project-local package `libraries` lazily, which presumably relies on the
# process being started from the project root - verify against deployment.
sys.path.extend([os.getcwd()])
class SQLHandler:
    '''
    Bundles methods for executing sql queries
    with different database connectors.

    Remark: in each method a new database connection is opened and closed
    again before the method returns (also when an error occurs);
    this avoids errors when parallelizing with multiprocessing.
    '''

    def __init__(self, db_uri: str = None,
                 is_case_insensitive: bool = False):
        '''
        Validates the connection uri, creates the database if it does not
        exist yet and sets up the sqlalchemy engine.

        :param str db_uri:
            of form
            <sqlalchemy_dialect//user:password@host:port/dbname?charset=utf8&local_infile=1>
            sqlalchemy dialects:
                for mysql : mysql+pymysql
                for db2: ibm_db_sa
            If None, the uri is read from cfg["SQL_DB"]["URI"].
        :param bool is_case_insensitive: if True, table names are
            upper-cased before they are looked up; must be True for ibm db2.
        :raises AssertionError: if a parameter has the wrong type or the
            uri does not match the expected pattern
        :raises Exception: if an ibm uri is combined with
            is_case_insensitive=False
        '''
        from libraries.log import Log
        from libraries.configuration import default as cfg
        from sqlalchemy_utils import database_exists, create_database

        self._log = Log(name='SQLHandler')

        if db_uri is None:
            db_uri = cfg["SQL_DB"]["URI"]

        # Validate every argument before touching the database, so that an
        # invalid call does not leave a freshly created database behind.
        assert isinstance(db_uri, str), \
            "Parameter 'db_uri' must be of type str"

        assert re.match(r'.+://.+:(.+)?@.+:.+/.+', db_uri) is not None, \
            ('database url does not match the pattern: '
             'sqlalchemy_dialect//user:password@host:port/dbname')

        assert isinstance(is_case_insensitive, bool), \
            "Parameter 'is_case_insensitive' must be of type bool"

        if 'ibm' in db_uri and not is_case_insensitive:
            raise Exception('Ibm db2 is case insensitive')

        self._db_uri = db_uri
        self._is_case_insensitive = is_case_insensitive

        # One engine is enough; connections are only opened by the methods.
        self._engine = sqlalchemy.create_engine(self._db_uri)

        # create_database guarantees that the database exists afterwards, so
        # no additional (mysql-only) "CREATE DATABASE IF NOT EXISTS" is run.
        if not database_exists(self._engine.url):
            create_database(self._engine.url)

    @property
    def _connection_params(self) -> dict:
        '''
        Parses user, password, host, port and database name out of the
        database uri of form dialect://user:password@host:port/dbname?opts

        The credentials end at the first '@' and the user name at the first
        ':', so a password may itself contain ':' or '/'. The database name
        is the path segment right after host:port, so a query string
        containing '/' does not disturb it.

        :return: dict with the (string) values 'user', 'password', 'host',
            'port' and 'db'
        :rtype: dict
        :raises Exception: if the uri cannot be parsed
        '''
        try:
            _, scheme_sep, rest = self._db_uri.partition('//')
            credentials, at_sep, location = rest.partition('@')
            user, user_sep, password = credentials.partition(':')
            netloc, path_sep, path = location.partition('/')
            host, port_sep, port = netloc.partition(':')

            if not (scheme_sep and at_sep and user_sep
                    and path_sep and port_sep):
                raise ValueError('expected a uri of form '
                                 'dialect://user:password@host:port/dbname')

            return {'user': user,
                    'password': password,
                    'host': host,
                    'port': port,
                    'db': path.partition('?')[0]}

        except Exception as e:
            err = ("Could not parse connection parameters. "
                   "Finished with error {}")\
                .format(e)

            self._log.error(err)
            raise Exception(err)

    def drop_database(self):
        '''
        Drops the database named in the connection uri, if it exists.

        Remark: uses "DROP DATABASE IF EXISTS", which might not be
        supported by every sqlalchemy dialect.
        '''
        database = self._connection_params["db"]
        self.execute("DROP DATABASE IF EXISTS {}".format(database))

    @property
    def _db_metadata(self) -> dict:
        '''
        Returns sql-dialect specific information: the information schema
        and the column names used in its tables/columns views.

        For ibm databases, information_schema is set to syscat and the
        default schema to the upper-cased user; otherwise information_schema
        is used and the default schema is the database name.

        Remark: every access runs a probe query against the database, so
        callers that need several entries should read it only once.

        :return: dictionary with information_schema, schema_col,
            table_col, column_col, default_schema; None if selecting from
            <information_schema>.tables fails
        :rtype: dict or None
        '''
        if 'ibm' in self._db_uri:
            db_metadata = {
                'information_schema': 'syscat',
                'schema_col': 'tabschema',
                'table_col': 'tabname',
                'column_col': 'colname',
                'default_schema': self._connection_params['user'].upper(),
            }
        else:
            db_metadata = {
                'information_schema': 'information_schema',
                'schema_col': 'TABLE_SCHEMA',
                'table_col': 'TABLE_NAME',
                'column_col': 'COLUMN_NAME',
                'default_schema': self._connection_params['db'],
            }

        # check if the chosen information schema can actually be queried
        try:
            query = """SELECT *
                       FROM {}.tables
                       LIMIT 1
                    """.format(db_metadata['information_schema'])
            self.execute(query)

        except Exception as e:
            self._log.error(e)
            db_metadata = None

        return db_metadata

    def execute(self, query):
        '''
        Executes one or several sql statements.

        The query is split into single statements with sqlparse and all of
        them run on one connection inside one transaction. Every statement
        is attempted even if an earlier one failed. The transaction is
        committed only if all statements succeeded; otherwise the collected
        error messages are logged and raised together. The connection is
        closed in every case.

        Remark: queries like CREATE, DROP, SELECT work
        for majority of sqlalchemy dialects.
        queries like SHOW TABLES, LOAD DATA, and using
        INFORMATION_SCHEMA are mysql specific and might
        not exist in a different dialect.

        :param str query: one statement or several ';'-separated statements
        :raises Exception: if at least one statement failed
        '''
        connection = self._engine.connect()
        try:
            transaction = connection.begin()
            errors = []

            # in the case of multi-query execute each query
            for sub_query in sqlparse.split(query):
                if not sub_query.strip():
                    continue
                try:
                    # NOTE(review): multi=True is forwarded unchanged to
                    # connection.execute; its effect depends on the
                    # sqlalchemy version / DBAPI driver - confirm it is
                    # still needed.
                    connection.execute(sub_query, multi=True)
                except Exception as e:
                    errors.append(str(e))

            if errors:
                err = ('Could not execute some of the queries. '
                       'Obtained exceptions: {}'
                       .format('\n'.join(errors)))
                self._log.error(err)
                # the transaction is left uncommitted; the connection is
                # still closed by the finally clause
                raise Exception(err)

            transaction.commit()
        finally:
            connection.close()

    def execute_query_from_file(self, filename: str):
        '''
        Reads sql statements from a file and executes them with execute.

        :param str filename: path of the file containing the statements
        '''
        with open(filename, 'r') as f:
            query = f.read()

        self.execute(query)

    def get_tablenames(self, schema: str = None, query: str = None):
        '''
        Returns the names of the tables in a schema.

        :param str schema: schema to list; defaults to the default schema
            from _db_metadata. Only used when query is None.
        :param str query: custom query whose first column holds the table
            names; if None, the information schema is queried.
        :return: table names
        :rtype: list
        :raises Exception: if query is None and _db_metadata is unavailable,
            or if the query fails
        '''
        if query is None:
            # read once: every access of _db_metadata queries the database
            db_metadata = self._db_metadata
            if db_metadata is None:
                raise Exception('Please specify the query')

            schema_or_default_schema = (db_metadata['default_schema']
                                        if schema is None else schema)

            query = """SELECT DISTINCT {0}
                       FROM {1}.tables
                       WHERE {2} = '{3}'
                    """.format(
                        db_metadata['table_col'],
                        db_metadata['information_schema'],
                        db_metadata['schema_col'],
                        schema_or_default_schema)

        try:
            return self.read_sql_to_dataframe(query).iloc[:, 0].tolist()

        except Exception as e:
            err = ("Could not get tablenames. "
                   "Finished with error {}".format(e))

            self._log.error(err)
            raise Exception(err)

    def check_if_table_exists(self, tablename: str,
                              schema: str = None,
                              query: str = None):
        '''
        Tries to retrieve table information from database with given query.
        If this does not work, tries to select one row from the given table,
        if this fails, assumes that the table does not exist.

        :param str tablename: upper-cased first if the handler is case
            insensitive
        :param str schema:
        :param str query: if not specified, tries to find
            tablename in information_schema specified in _db_metadata.
        :return: if the table exists or not
        :rtype: bool
        '''
        if self._is_case_insensitive:
            tablename = tablename.upper()

        try:
            tablenames = self.get_tablenames(schema=schema, query=query)
            return tablename in tablenames

        except Exception as e:
            self._log.warning(('Could not execute query to retrieve table '
                               'information. Trying to execute a '
                               'select statement. '
                               'Got exception {}').format(e))

        try:
            query = """SELECT *
                       FROM {0}{1}
                       LIMIT 1
                    """.format('' if schema is None else schema + '.',
                               tablename)

            self.execute(query)

        except Exception as e:
            self._log.warning(('Failed to select from {0}. '
                               'Finished with error {1}. '
                               'Conclusion: table does not exist')
                              .format(tablename, e))
            return False

        return True

    def create_schema(self, schema: str, query: str = None):
        '''
        Creates a schema if it does not exist, else does nothing

        :param str schema:
        :param str query: query whose first column holds the existing
            schema names; if None trying to read schemas from
            information_schema specified in db_metadata
        :raises Exception: if query is None and _db_metadata is unavailable,
            or if the list of schemas cannot be retrieved
        '''
        if query is None:
            # read once: every access of _db_metadata queries the database
            db_metadata = self._db_metadata
            if db_metadata is None:
                raise Exception('Please specify query')

            # NOTE(review): the schemas are read from the *tables* view, so
            # presumably an existing schema without any table is not found
            # and CREATE SCHEMA below would fail for it - verify.
            query = """SELECT DISTINCT {0}
                       FROM {1}.tables""".format(
                           db_metadata['schema_col'],
                           db_metadata['information_schema'])

        try:
            schemas = self.read_sql_to_dataframe(query).iloc[:, 0].tolist()

        except Exception as e:
            err = ("Could not retrieve the list of schemas "
                   "from the database. Finished with error {}"
                   .format(e))

            self._log.error(err)
            raise Exception(err)

        if schema not in schemas:
            self.execute("CREATE SCHEMA {}".format(schema))

    def drop_table_if_exists(self, tablename: str,
                             schema: str = None,
                             query: str = None):
        '''
        Drops a table if it exists in the given schema, else does nothing.

        :param str tablename: upper-cased first if the handler is case
            insensitive
        :param str schema: schema of the table; None means the default
            schema
        :param str query: if not specified, default value is "DROP TABLE"
        :raises Exception: if the existence check or the drop fails
        '''
        if self._is_case_insensitive:
            tablename = tablename.upper()

        schema_prefix = '' if schema is None else schema + '.'

        if query is None:
            query = "DROP TABLE {0}{1};".format(schema_prefix, tablename)

        try:
            # look for the table in the same schema it is dropped from
            if self.check_if_table_exists(tablename, schema=schema):
                self.execute(query)

        except Exception as e:
            err = ("Could not drop the table {0}. "
                   "Finished with error {1}"
                   .format(tablename, e))

            self._log.error(err)
            raise Exception(err)

    def get_column_names(self, tablename: str,
                         schema: str = None,
                         query: str = None):
        '''
        Tries to retrieve column information from database with given query.
        If this does not work, selects one row from the given table and
        takes the column names of the result.

        :param str tablename: upper-cased first if the handler is case
            insensitive
        :param str schema:
        :param str query: if not specified, tries to select column
            names in the information_schema specified in db_metadata
        :return: lower-cased column names
        :rtype: list
        :raises Exception: if the table does not exist
        '''
        if self._is_case_insensitive:
            tablename = tablename.upper()

        if not self.check_if_table_exists(tablename=tablename,
                                          schema=schema):
            err = "Table {} does not exist".format(tablename)
            self._log.error(err)
            raise Exception(err)

        try:
            if query is None:
                # read once: every access of _db_metadata queries the db
                db_metadata = self._db_metadata
                if db_metadata is None:
                    raise Exception('Please specify the query')

                schema_or_default_schema = (db_metadata['default_schema']
                                            if schema is None else schema)

                query = """SELECT DISTINCT {0}
                           FROM {1}.columns
                           WHERE {2} = '{3}'
                           AND {4} = '{5}'
                        """.format(
                            db_metadata['column_col'],
                            db_metadata['information_schema'],
                            db_metadata['schema_col'],
                            schema_or_default_schema,
                            db_metadata['table_col'],
                            tablename)

            colnames = [c.lower() for c in
                        self.read_sql_to_dataframe(query).iloc[:, 0].tolist()]

        except Exception as e:
            self._log.warning((
                'Could not select columns from '
                'informational schema. Trying to '
                'load the table into a dataframe and select column names. '
                'Obtained exception {}').format(e))

            query = """SELECT *
                       FROM {0}{1}
                       LIMIT 1
                    """.format('' if schema is None else schema + '.',
                               tablename)

            # a dataframe is needed here: execute() does not return results
            data = self.read_sql_to_dataframe(query)
            colnames = [c.lower() for c in data.columns.tolist()]

        return colnames

    def load_csv_to_db(self, filename: str,
                       tablename: str,
                       schema: str = None,
                       query: str = None,
                       **kwargs):
        '''
        Loads data from a csv file into an existing table with a given query.

        :param str filename:
        :param str tablename:
        :param str schema:
        :param str query: if not specified, a mysql LOAD DATA LOCAL INFILE
            query is used (comma separated, '"' quoted, CRLF line endings,
            header line skipped, columns in the order of get_column_names)
        :param kwargs: currently unused
        :raises Exception: if the table does not exist or loading fails
        '''
        if not self.check_if_table_exists(tablename=tablename,
                                          schema=schema):
            err = ('Table {} does not exist. '
                   'Please create it first').format(tablename)
            self._log.error(err)
            raise Exception(err)

        try:
            if query is None:
                query = """LOAD DATA LOCAL INFILE '{0}'
                           INTO TABLE {1}{2}
                           COLUMNS TERMINATED BY ','
                           OPTIONALLY ENCLOSED BY '"'
                           LINES TERMINATED BY '\r\n'
                           IGNORE 1 LINES
                           ({3})
                           ;""".format(
                               filename,
                               '' if schema is None else schema + '.',
                               tablename,
                               ','.join(self.get_column_names(
                                   tablename, schema=schema)))

            self.execute(query)

        except Exception as e:
            err = ("Could not load the file {0} "
                   "to the table {1}. "
                   "Finished with error {2}")\
                .format(filename, tablename, e)

            self._log.error(err)
            raise Exception(err)

    def read_sql_to_dataframe(self, query: str, **read_sql_kwargs):
        '''
        Runs a query with pandas.read_sql on a fresh connection, which is
        closed again even if reading fails.

        :param str query: normally a SELECT sql statement
        :param read_sql_kwargs: additional arguments to pandas read_sql method
        :return: selected data
        :rtype: DataFrame
        :raises Exception: if the query cannot be read
        '''
        try:
            with self._engine.connect() as connection:
                return pd.read_sql(sql=query,
                                   con=connection,
                                   **read_sql_kwargs)

        except Exception as e:
            err = ("Could not read the query to a dataframe. "
                   "Finished with error {}").format(e)

            self._log.error(err)
            raise Exception(err)

    def read_table(self, tablename: str,
                   schema: str = None,
                   **read_sql_kwargs):
        '''
        Reads a whole table into a dataframe.

        :param str tablename:
        :param str schema:
        :param read_sql_kwargs: additional arguments to pandas read_sql method
        :return: selected table
        :rtype: DataFrame
        :raises Exception: if the table cannot be read
        '''
        schema = '' if schema is None else schema + '.'

        try:
            return self.read_sql_to_dataframe(
                query="SELECT * FROM {0}{1};".format(schema, tablename),
                **read_sql_kwargs)

        except Exception as e:
            err = ("Could not read the table {0} to a dataframe. "
                   "Finished with error {1}").format(tablename, e)

            self._log.error(err)
            raise Exception(err)

    def _write_dataframe(self, data: pd.DataFrame,
                         tablename: str,
                         schema: str,
                         if_exists: str,
                         to_sql_kwargs: dict,
                         failure_msg: str):
        '''
        Shared implementation of append_to_table and overwrite_table:
        creates the schema if given, then writes data with DataFrame.to_sql
        on a fresh connection that is closed even if writing fails.

        :param str if_exists: passed on to DataFrame.to_sql
        :param dict to_sql_kwargs: None means {'index': False}
        :param str failure_msg: first part of the error message on failure
        '''
        if to_sql_kwargs is None:
            to_sql_kwargs = {'index': False}

        if schema is not None:
            self.create_schema(schema)

        try:
            with self._engine.connect() as connection:
                data.to_sql(name=tablename,
                            schema=schema,
                            con=connection,
                            if_exists=if_exists,
                            **to_sql_kwargs)

        except Exception as e:
            err = "{0} Finished with error {1}".format(failure_msg, e)

            self._log.error(err)
            raise Exception(err)

    def append_to_table(self, data: pd.DataFrame,
                        tablename: str,
                        schema: str = None,
                        to_sql_kwargs: dict = None):
        '''
        :param DataFrame data: data to append
        :param str tablename: table where data is appended
        :param str schema: created first if it does not exist
        :param dict to_sql_kwargs: additional arguments to pandas to_sql
            method; defaults to {'index': False}
        :raises Exception: if the data cannot be written
        '''
        self._write_dataframe(
            data, tablename, schema, 'append', to_sql_kwargs,
            'Could not append data to the table {0}.'.format(tablename))

    def overwrite_table(self, data: pd.DataFrame,
                        tablename: str,
                        schema: str = None,
                        to_sql_kwargs: dict = None):
        '''
        :param DataFrame data: data to write to database
        :param str tablename: table where data is written (replaced)
        :param str schema: created first if it does not exist
        :param dict to_sql_kwargs: additional arguments to pandas to_sql
            method; defaults to {'index': False}
        :raises Exception: if the data cannot be written
        '''
        self._write_dataframe(
            data, tablename, schema, 'replace', to_sql_kwargs,
            'Could not overwrite the table {0}.'.format(tablename))

    def draw_er_diagram_from_db(self, diagram_path: str = None,
                                schema: str = None,
                                include_tables: list = None):
        '''
        Renders an entity-relationship diagram of the database with
        eralchemy.

        :param str diagram_path: output file; defaults to "erd.png"
            (relative to the current working directory). Missing parent
            directories are created.
        :param str schema: passed on to eralchemy.render_er
        :param list include_tables: passed on to eralchemy.render_er
        '''
        if diagram_path is None:
            diagram_path = "erd.png"
        else:
            diagram_dir = os.path.dirname(diagram_path)
            if diagram_dir != "":
                os.makedirs(diagram_dir, exist_ok=True)

        import eralchemy
        eralchemy.render_er(self._db_uri,
                            diagram_path,
                            schema=schema,
                            include_tables=include_tables)
|