@@ -0,0 +1,184 @@
+import os
+import sys
+from pandas.io.sql import table_exists
+from scipy.sparse import data
+import pandas as pd
+import sqlalchemy
+from cdplib.log import Log
class MSSQLHandler:
    '''
    Convenience wrapper around a SQLAlchemy engine for Microsoft SQL
    Server: runs raw queries, reads query results into pandas
    DataFrames, and creates / inspects / drops databases and tables.
    '''

    def __init__(self, db_uri: str = None):
        '''
        Create the engine and issue a "create database if missing"
        statement for the database named in the URI.

        :param db_uri: SQLAlchemy connection URI. When None, the URI is
            assembled from the "SQL" section of
            libraries.configuration.default.
        '''
        self._log = Log(name='MSSQLHandler')

        if db_uri is None:
            from libraries.configuration import default as cfg

            sql_cfg = cfg["SQL"]
            db_uri = sqlalchemy.engine.url.URL.create(
                sql_cfg["SQL_DIALECT"],
                username=sql_cfg["SQL_USER"],
                password=sql_cfg["SQL_PASSWORD"],
                host=sql_cfg["SQL_HOST"],
                port=sql_cfg["SQL_PORT"],
                database=sql_cfg["SQL_DATABASE_NAME"],
                query=dict(driver=sql_cfg["SQL_DRIVER"]),
            )

        self._engine = sqlalchemy.create_engine(db_uri)

        # Newer SQLAlchemy releases mask the password in str(URL), which
        # would make _connection_params report "***". Ask for the
        # unmasked form when the object offers it; plain strings fall
        # through to str().
        render = getattr(db_uri, "render_as_string", None)
        if callable(render):
            self._db_uri = render(hide_password=False)
        else:
            self._db_uri = str(db_uri)

        database = self._connection_params["db"]
        if database:
            self.create_database(database)

    @staticmethod
    def _quote_literal(value: str) -> str:
        '''Return value as a single-quoted T-SQL string literal.'''
        return "'" + str(value).replace("'", "''") + "'"

    @staticmethod
    def _quote_identifier(name: str) -> str:
        '''Return name as a bracket-quoted T-SQL identifier.'''
        name = str(name)
        if name.startswith('[') and name.endswith(']'):
            # Treated as already quoted by the caller.
            return name
        return '[' + name.replace(']', ']]') + ']'

    @property
    def _connection_params(self) -> dict:
        '''
        Split the stored connection URI into its components.

        Values are taken verbatim from the URI (no URL-decoding). A
        component whose delimiter is absent from the URI (no
        credentials, no password, no port, no database) is reported
        as None.

        :return: connection parameters under the keys 'user',
            'password', 'host', 'port' and 'db'
        :rtype: dict
        :raises Exception: if the URI has no '//' separator
        '''
        try:
            uri = str(self._db_uri)
            _, separator, remainder = uri.partition('//')
            if not separator:
                raise ValueError("no '//' found in the connection URI")

            # Credentials end at the first '@'.
            userinfo, at_sign, hostpart = remainder.partition('@')
            if at_sign:
                # Split on the first ':' only, so that a password
                # containing ':' stays intact.
                user, colon, password = userinfo.partition(':')
                if not colon:
                    password = None
            else:
                user, password, hostpart = None, None, remainder

            hostport = hostpart.split('/')[0].split('?')[0]
            if ':' in hostport and not hostport.endswith(']'):
                # Last ':' separates the port; the ']' guard leaves a
                # bracketed IPv6 host without a port untouched.
                host, _, port = hostport.rpartition(':')
            else:
                host, port = hostport, None

            _, slash, path = hostpart.partition('/')
            database = path.split('?')[0].split('/')[-1] if slash else None

            return {
                'user': user,
                'password': password,
                'host': host,
                'port': port,
                'db': database,
            }
        except Exception as e:
            err = ("Could not parse connection parameters."
                   "Finished with error {}")\
                .format(e)
            self._log.error(err)
            raise Exception(err) from e

    def query(self, query: str):
        '''
        Execute a raw SQL statement on the engine.

        NOTE(review): relies on Engine.execute, which is part of the
        SQLAlchemy 1.x API - confirm the pinned SQLAlchemy version.

        :param query: SQL statement to execute
        :return: whatever the engine's execute call returns
        :raises Exception: if the statement could not be executed
        '''
        try:
            return self._engine.execute(query)
        except Exception as e:
            err = ("Could not execute the query "
                   "Finished with error {}").format(e)
            self._log.error(err)
            raise Exception(err) from e

    def query_to_dataframe(self, query: str, **read_sql_kwargs):
        '''
        Run a query and return its result as a pandas DataFrame.

        :param query: SQL statement to execute
        :param read_sql_kwargs: forwarded unchanged to pandas.read_sql
        :return: the value returned by pandas.read_sql
        :raises Exception: if connecting or reading fails
        '''
        try:
            # The context manager closes the connection even when
            # read_sql raises.
            with self._engine.connect() as connection:
                return pd.read_sql(sql=query,
                                   con=connection,
                                   **read_sql_kwargs)
        except Exception as e:
            err = ("Could not read the query to a dataframe. "
                   "Finished with error {}").format(e)
            self._log.error(err)
            raise Exception(err) from e

    def create_database(self, database: str):
        '''
        Create the database unless sys.databases already lists it.

        :param database: name of the database
        :return: the result of self.query
        '''
        # The name appears twice: as a string literal in the existence
        # check and as an identifier in CREATE DATABASE; each position
        # needs its own escaping.
        sql = ("IF NOT EXISTS (SELECT * FROM sys.databases WHERE name = "
               + self._quote_literal(database)
               + ") BEGIN CREATE DATABASE "
               + self._quote_identifier(database)
               + " END")
        # TODO : give user feedback that database was created
        return self.query(sql)

    def create_table(self, database: str, table: str, schema: str):
        '''
        Create a table unless one with that name already exists.
        Failures are logged, not raised.

        :param database: database name, used in log messages only
        :param table: name of the table to create
        :param schema: column definitions placed between the
            parentheses of the CREATE TABLE statement
        '''
        if self.table_exist(table):
            self._log.info("The new table: {} already exist in database: {}".format(table, database))
            return

        sql = "CREATE TABLE " + table + " ( " + schema + ")"
        try:
            self.query(sql)
        except Exception as error:
            self._log.error("An error occured when creating table: {} in database: {}, Error: {}".format(table, database, error))
        else:
            self._log.info("The new table: {} has been created in database: {}".format(table, database))

    def get_databases(self, as_dataframe: bool = True):
        '''
        List the rows of sys.databases.

        :param as_dataframe: return a DataFrame when True, otherwise
            the raw result of self.query
        '''
        sql = "SELECT * FROM sys.databases"
        if as_dataframe:
            return self.query_to_dataframe(sql)
        return self.query(sql)

    def get_tables_from_database(self, database: str,
                                 as_dataframe: bool = True):
        '''
        List the rows of INFORMATION_SCHEMA.TABLES of a database.

        :param database: name of the database to inspect
        :param as_dataframe: return a DataFrame when True, otherwise
            the raw result of self.query
        '''
        sql = ("SELECT * FROM " + self._quote_identifier(database)
               + ".INFORMATION_SCHEMA.TABLES")
        if as_dataframe:
            return self.query_to_dataframe(sql)
        return self.query(sql)

    def table_exist(self, table: str, schema: str = None):
        '''
        Tell whether INFORMATION_SCHEMA.TABLES of the connected
        database lists a table with the given name.

        :param table: table name to look for
        :param schema: when given, additionally require this value in
            the TABLE_SCHEMA column
        :rtype: bool
        '''
        sql = ("SELECT COUNT(*) FROM INFORMATION_SCHEMA.TABLES "
               "WHERE TABLE_NAME = " + self._quote_literal(table))
        if schema is not None:
            sql += " AND TABLE_SCHEMA = " + self._quote_literal(schema)
        return bool(self.query(sql).first()[0])

    def insert_data(self, data, table: str, schema: str = None,
                    to_sql_kwargs: dict = None,
                    insert_method: str = "append"):
        '''
        Write a DataFrame into a table, creating the table first when
        it does not exist and a schema was supplied.

        NOTE(review): `schema` serves two purposes in this class - the
        column definitions handed to create_table and the `schema`
        argument of DataFrame.to_sql. It is forwarded to to_sql unless
        it was just used successfully to create the table. Splitting it
        into two parameters would remove the ambiguity - confirm with
        the callers.

        :param data: pandas DataFrame to insert; anything else is
            logged as an error and ignored
        :param table: name of the target table
        :param schema: see the note above
        :param to_sql_kwargs: extra keyword arguments for
            DataFrame.to_sql; defaults to {'index': False}
        :param insert_method: value for to_sql's `if_exists`
        :raises Exception: if the table is missing and no schema was
            given, or if the insert fails
        '''
        if to_sql_kwargs is None:
            to_sql_kwargs = {'index': False}

        try:
            if not isinstance(data, pd.DataFrame):
                self._log.error(
                    "Could not insert data. Expected a pandas DataFrame, "
                    "got {}".format(type(data).__name__))
                return

            to_sql_schema = schema
            if not self.table_exist(table):
                if schema is None:
                    err = ("Data could not be inserted, table does not "
                           "exist and no schema to create one was "
                           "submitted. Create table using "
                           "MSSQLHandler.create_table or submit a valid "
                           "schema and try again")
                    raise Exception(err)

                self.create_table(self._connection_params["db"],
                                  table, schema)
                # create_table logs instead of raising, so look again:
                # if the table is there now, `schema` was column
                # definitions and must not reach to_sql.
                if self.table_exist(table):
                    to_sql_schema = None

            with self._engine.connect() as connection:
                data.to_sql(name=table,
                            schema=to_sql_schema,
                            con=connection,
                            if_exists=insert_method,
                            **to_sql_kwargs)
        except Exception as e:
            err = ("Could not insert data. "
                   "Finished with error {}").format(e)
            self._log.error(err)
            raise Exception(err) from e

    def drop_table(self, table: str):
        '''
        Drop a table if it exists. Failures are logged, not raised.

        :param table: name of the table to drop
        '''
        if not self.table_exist(table):
            self._log.info("The table can't be dropped because it does not exist")
            return

        try:
            self.query("DROP TABLE " + table)
            self._log.info("The table has been dropped")
        except Exception as error:
            self._log.error("Could not drop the table, Error {}".format(error))