123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185 |
- import os
- import sys
- from pandas.io.sql import table_exists
- from scipy.sparse import data
- sys.path.append(os.getcwd())
- import pandas as pd
- import sqlalchemy
- from cdplib.log import Log
class MSSQLHandler:
    """Convenience wrapper around a SQLAlchemy engine for MS SQL Server.

    Offers helpers to run raw SQL, read query results into pandas
    DataFrames, and create / inspect / drop databases and tables.
    ``query``, ``query_to_dataframe``, ``insert_data`` and
    ``_connection_params`` log failures through ``self._log`` and re-raise
    them as a plain ``Exception``; ``create_table`` and ``drop_table`` only
    log.
    """

    def __init__(self, db_uri: str = None):
        """Create the engine and make sure the target database exists.

        :param db_uri: SQLAlchemy connection URI (string or ``URL``). When
            ``None``, the URI is assembled from the ``"MSSQL"`` section of
            ``libraries.configuration.default``.
        """
        self._log = Log(name='MSSQLHandler')

        if db_uri is None:
            # Imported lazily: the configuration module is only needed when
            # no explicit URI is supplied.
            from libraries.configuration import default as cfg

            db_uri = sqlalchemy.engine.url.URL.create(
                cfg["MSSQL"]["MSSQL_DIALECT"],
                username=cfg["MSSQL"]["MSSQL_USER"],
                password=cfg["MSSQL"]["MSSQL_PASSWORD"],
                host=cfg["MSSQL"]["MSSQL_HOST"],
                port=cfg["MSSQL"]["MSSQL_PORT"],
                database=cfg["MSSQL"]["MSSQL_DATABASE_NAME"],
                query=dict(driver=cfg["MSSQL"]["MSSQL_DRIVER"])
            )

        self._engine = sqlalchemy.create_engine(db_uri)
        self._db_uri = str(db_uri)

        database = self._connection_params["db"]
        # A URI without a database part (e.g. a DSN-only URI) has nothing to
        # create; previously a garbage name was parsed out of the host part.
        # NOTE(review): the engine itself connects to this database, so the
        # login presumably only succeeds if it already exists — confirm.
        if database:
            self.create_database(database)

    @property
    def _connection_params(self) -> dict:
        """Parse the connection URI into its components.

        :return: dict with keys ``user``, ``password``, ``host``, ``port``
            and ``db``. Components absent from the URI are ``None``;
            ``port`` is returned as a string to match the previous
            string-splitting implementation.
        :raises Exception: if the URI cannot be parsed.
        """
        try:
            # make_url handles every URI form SQLAlchemy accepts (no port,
            # no credentials, percent-encoded special characters), which the
            # old split on '//', '@', ':' and '/' did not.
            url = sqlalchemy.engine.url.make_url(self._db_uri)
        except Exception as e:
            err = ("Could not parse connection parameters. "
                   "Finished with error {}").format(e)
            self._log.error(err)
            raise Exception(err) from e

        return {
            'user': url.username,
            'password': url.password,
            'host': url.host,
            'port': None if url.port is None else str(url.port),
            'db': url.database,
        }

    @staticmethod
    def _quote_identifier(name: str) -> str:
        """Return ``name`` as a bracket-delimited T-SQL identifier.

        Embedded ``]`` characters are doubled, mirroring T-SQL ``QUOTENAME``,
        so the name cannot close the delimiter early. Only suitable for
        single-part names (a database name, not ``schema.table``).
        """
        return "[" + name.replace("]", "]]") + "]"

    @staticmethod
    def _quote_literal(value: str) -> str:
        """Return ``value`` as a T-SQL Unicode string literal (``N'...'``).

        Embedded single quotes are doubled so the value cannot terminate the
        literal and inject SQL; the ``N`` prefix keeps non-ASCII names intact
        when compared against ``nvarchar``/``sysname`` catalog columns.
        """
        return "N'" + value.replace("'", "''") + "'"

    def query(self, query: str):
        """Execute raw SQL on the engine and return its result object.

        :param query: SQL text to execute.
        :raises Exception: wrapping any error raised by the engine.

        NOTE: ``Engine.execute`` is SQLAlchemy 1.x API (removed in 2.0).
        """
        try:
            return self._engine.execute(query)
        except Exception as e:
            err = ("Could not execute the query. "
                   "Finished with error {}").format(e)
            self._log.error(err)
            raise Exception(err) from e

    def query_to_dataframe(self, query: str, **read_sql_kwargs):
        """Run ``query`` and return the result as a pandas DataFrame.

        :param query: SQL text passed to ``pandas.read_sql``.
        :param read_sql_kwargs: extra keyword arguments for ``pd.read_sql``.
        :raises Exception: wrapping any error from connecting or reading.
        """
        try:
            # The context manager closes the connection on error too; the
            # previous explicit close() was skipped when read_sql raised.
            with self._engine.connect() as connection:
                return pd.read_sql(sql=query,
                                   con=connection,
                                   **read_sql_kwargs)
        except Exception as e:
            err = ("Could not read the query to a dataframe. "
                   "Finished with error {}").format(e)
            self._log.error(err)
            raise Exception(err) from e

    def create_database(self, database: str):
        """Create ``database`` on the server unless it already exists.

        :param database: database name; quoted as both a literal (for the
            existence check) and an identifier (for ``CREATE DATABASE``).
        :return: the result of :meth:`query`.
        """
        sql = ("IF NOT EXISTS (SELECT * FROM sys.databases WHERE name = "
               + self._quote_literal(database)
               + ") BEGIN CREATE DATABASE "
               + self._quote_identifier(database)
               + " END")
        # TODO : give user feedback that database was created
        return self.query(sql)

    def create_table(self, database: str, table: str, schema: str):
        """Create ``table`` with column definitions ``schema`` if missing.

        :param database: database name; only used in log messages.
        :param table: table name, inserted into the SQL verbatim (unquoted).
        :param schema: column definition list placed between the
            parentheses of ``CREATE TABLE``, e.g. ``"id INT, name NVARCHAR(50)"``.

        Errors while creating the table are logged, not raised.
        """
        if self.table_exist(table):
            self._log.info("The new table: {} already exist in database: {}".format(table, database))
            return

        # NOTE(review): `table` and `schema` are concatenated unescaped;
        # only pass trusted values.
        sql = "CREATE TABLE " + table + " ( " + schema + ")"
        # Was a stray debug print(); route it through the logger instead.
        self._log.info(sql)
        try:
            self.query(sql)
            self._log.info("The new table: {} has been created in database: {}".format(table, database))
        except Exception as error:
            self._log.error("An error occured when creating table: {} in database: {}, Error: {}".format(table, database, error))

    def get_databases(self, as_dataframe: bool = True):
        """List the server's databases from ``sys.databases``.

        :param as_dataframe: return a DataFrame when ``True``, otherwise the
            raw result of :meth:`query`.
        """
        sql = "SELECT * FROM sys.databases"
        if as_dataframe:
            return self.query_to_dataframe(sql)
        return self.query(sql)

    def get_tables_from_database(self, database: str, as_dataframe: bool = True):
        """List the tables of ``database`` from its ``INFORMATION_SCHEMA``.

        :param database: database name (quoted as an identifier).
        :param as_dataframe: return a DataFrame when ``True``, otherwise the
            raw result of :meth:`query`.
        """
        sql = ("SELECT * FROM " + self._quote_identifier(database)
               + ".INFORMATION_SCHEMA.TABLES")
        if as_dataframe:
            return self.query_to_dataframe(sql)
        return self.query(sql)

    def table_exist(self, table: str, schema: str = None):
        """Return whether a table named ``table`` exists in the current database.

        :param table: bare table name matched against
            ``INFORMATION_SCHEMA.TABLES.TABLE_NAME`` (in any schema).
        :param schema: currently ignored.
            NOTE(review): kept for interface compatibility; its intended
            meaning (SQL schema vs. column list) is ambiguous in this class.
        """
        sql = ("IF EXISTS (SELECT * FROM INFORMATION_SCHEMA.TABLES "
               "WHERE TABLE_NAME = " + self._quote_literal(table) + ") "
               "BEGIN SELECT 1 END ELSE BEGIN SELECT 0 END")
        return bool(self.query(sql).first()[0])

    def insert_data(self, data, table: str, schema: str = None,
                    to_sql_kwargs: dict = None, insert_method: str = "append"):
        """Write a pandas DataFrame into ``table``.

        If the table does not exist it is created first from ``schema``; if
        it does not exist and ``schema`` is ``None`` an error is raised.

        :param data: a ``pd.DataFrame``; any other value is silently ignored.
        :param table: target table name.
        :param schema: used twice: as the column definition list passed to
            :meth:`create_table` when the table is missing, and as the
            ``schema`` argument of ``DataFrame.to_sql``.
            NOTE(review): these two meanings conflict (a column list is not
            a valid SQL schema name); consider splitting the parameter.
        :param to_sql_kwargs: extra keyword arguments for ``to_sql``;
            defaults to ``{'index': False}``.
        :param insert_method: forwarded as ``to_sql(if_exists=...)``.
        :raises Exception: wrapping any error during the check or insert.
        """
        # Fresh dict per call instead of a shared mutable default.
        if to_sql_kwargs is None:
            to_sql_kwargs = {'index': False}

        try:
            # Previously `if not self.table_exist:` tested the bound method
            # (always truthy), so this branch could never run.
            if not self.table_exist(table):
                if schema is None:
                    err = ("Data could not be inserted, table does not exist "
                           "and no schema to create one was submitted. "
                           "Create table using MSSQLHandler.create_table or "
                           "submit a valid schema and try again")
                    raise Exception(err)
                self.create_table(self._connection_params["db"], table, schema)

            if isinstance(data, pd.DataFrame):
                # Closed on both success and failure.
                with self._engine.connect() as connection:
                    data.to_sql(name=table,
                                schema=schema,
                                con=connection,
                                if_exists=insert_method,
                                **to_sql_kwargs)
        except Exception as e:
            err = ("Could not insert data. "
                   "Finished with error {}").format(e)
            self._log.error(err)
            raise Exception(err) from e

    def drop_table(self, table: str):
        """Drop ``table`` if it exists; failures are logged, not raised.

        :param table: table name, inserted into ``DROP TABLE`` verbatim.
        """
        if not self.table_exist(table):
            self._log.info("The table can't be dropped because it does not exist")
            return

        try:
            # NOTE(review): `table` is concatenated unescaped; only pass
            # trusted values.
            self.query("DROP TABLE " + table)
            self._log.info("The table has been dropped")
        except Exception as error:
            self._log.error("Could not drop the table, Error {}".format(error))
|