|
@@ -0,0 +1,184 @@
|
|
|
+import os
|
|
|
+import sys
|
|
|
+
|
|
|
+from pandas.io.sql import table_exists
|
|
|
+from scipy.sparse import data
|
|
|
+sys.path.append(os.getcwd())
|
|
|
+import pandas as pd
|
|
|
+import sqlalchemy
|
|
|
+
|
|
|
+from cdplib.log import Log
|
|
|
+
|
|
|
+class MSSQLHandler:
|
|
|
+
|
|
|
+ def __init__(self, db_uri: str = None):
|
|
|
+
|
|
|
+ self._log = Log(name='MSSQLHandler')
|
|
|
+
|
|
|
+ if db_uri is None:
|
|
|
+
|
|
|
+ from libraries.configuration import default as cfg
|
|
|
+
|
|
|
+
|
|
|
+ db_uri = sqlalchemy.engine.url.URL.create(
|
|
|
+ cfg["SQL"]["SQL_DIALECT"],
|
|
|
+ username=cfg["SQL"]["SQL_USER"],
|
|
|
+ password=cfg["SQL"]["SQL_PASSWORD"],
|
|
|
+ host=cfg["SQL"]["SQL_HOST"],
|
|
|
+ port=cfg["SQL"]["SQL_PORT"],
|
|
|
+ database=cfg["SQL"]["SQL_DATABASE_NAME"],
|
|
|
+ query=dict(driver=cfg["SQL"]["SQL_DRIVER"])
|
|
|
+ )
|
|
|
+
|
|
|
+ self._engine = sqlalchemy.create_engine(db_uri)
|
|
|
+ self._db_uri = str(db_uri)
|
|
|
+
|
|
|
+ self.create_database(self._connection_params["db"])
|
|
|
+
|
|
|
+
|
|
|
+ @property
|
|
|
+ def _connection_params(self) -> dict:
|
|
|
+ '''
|
|
|
+ return: connection parameters like user,
|
|
|
+ password, host, port, and database name
|
|
|
+ rtype: dict
|
|
|
+ '''
|
|
|
+ try:
|
|
|
+ connection_params = {}
|
|
|
+
|
|
|
+ connection_params['user'], connection_params['password'] =\
|
|
|
+ str(self._db_uri).split('//')[1]\
|
|
|
+ .split('@')[0]\
|
|
|
+ .split(':')
|
|
|
+
|
|
|
+ connection_params['host'], connection_params['port'] =\
|
|
|
+ str(self._db_uri).split('//')[1]\
|
|
|
+ .split('@')[1]\
|
|
|
+ .split('/')[0]\
|
|
|
+ .split(':')
|
|
|
+
|
|
|
+ connection_params['db'] = str(self._db_uri).split('/')[-1]\
|
|
|
+ .split('?')[0]
|
|
|
+
|
|
|
+ return connection_params
|
|
|
+
|
|
|
+ except Exception as e:
|
|
|
+ err = ("Could not parse connection parameters."
|
|
|
+ "Finished with error {}")\
|
|
|
+ .format(e)
|
|
|
+
|
|
|
+ self._log.error(err)
|
|
|
+ raise Exception(err)
|
|
|
+
|
|
|
+
|
|
|
+ def query(self, query: str):
|
|
|
+ try:
|
|
|
+ return self._engine.execute(query)
|
|
|
+ except Exception as e:
|
|
|
+ err = ("Could not execute the query "
|
|
|
+ "Finished with error {}").format(e)
|
|
|
+
|
|
|
+ self._log.error(err)
|
|
|
+ raise Exception(err)
|
|
|
+
|
|
|
+ def query_to_dataframe(self, query: str, **read_sql_kwargs):
|
|
|
+ try:
|
|
|
+
|
|
|
+ connection = self._engine.connect()
|
|
|
+
|
|
|
+ data = pd.read_sql(sql=query,
|
|
|
+ con=connection,
|
|
|
+ **read_sql_kwargs)
|
|
|
+
|
|
|
+ connection.close()
|
|
|
+
|
|
|
+ return data
|
|
|
+
|
|
|
+ except Exception as e:
|
|
|
+ err = ("Could not read the query to a dataframe. "
|
|
|
+ "Finished with error {}").format(e)
|
|
|
+
|
|
|
+ self._log.error(err)
|
|
|
+ raise Exception(err)
|
|
|
+
|
|
|
+
|
|
|
+ def create_database(self, database: str):
|
|
|
+ sql = "IF NOT EXISTS (SELECT * FROM sys.databases WHERE name = '"+ database +"') BEGIN CREATE DATABASE "+ database +" END"
|
|
|
+ # TODO : give user feedback that database was created
|
|
|
+ return self.query(sql)
|
|
|
+
|
|
|
+ def create_table(self, database: str, table: str, schema: str):
|
|
|
+ # sql = "IF NOT EXISTS (SELECT * FROM "+ database +".INFORMATION_SCHEMA.TABLES WHERE TABLE_NAME = '"+ table +"') CREATE TABLE " + table + " ( "+ schema +")"
|
|
|
+ if not self.table_exist(table):
|
|
|
+ sql = "CREATE TABLE " + table + " ( "+ schema +")"
|
|
|
+ print(sql)
|
|
|
+ try:
|
|
|
+ self.query(sql)
|
|
|
+ self._log.info("The new table: {} has been created in database: {}".format(table, database))
|
|
|
+ except Exception as error:
|
|
|
+ self._log.error("An error occured when creating table: {} in database: {}, Error: {}".format(table, database, error))
|
|
|
+ else:
|
|
|
+ self._log.info("The new table: {} already exist in database: {}".format(table, database))
|
|
|
+
|
|
|
+ def get_databases(self, as_dataframe: bool=True):
|
|
|
+ sql = "SELECT * FROM sys.databases"
|
|
|
+ if as_dataframe:
|
|
|
+ return self.query_to_dataframe(sql)
|
|
|
+ else:
|
|
|
+ return self.query(sql)
|
|
|
+
|
|
|
+ def get_tables_from_database(self, database: str, as_dataframe: bool=True):
|
|
|
+ sql = "SELECT * FROM "+ database +".INFORMATION_SCHEMA.TABLES"
|
|
|
+ if as_dataframe:
|
|
|
+ return self.query_to_dataframe(sql)
|
|
|
+ else:
|
|
|
+ return self.query(sql)
|
|
|
+
|
|
|
+
|
|
|
+ def table_exist(self, table: str, schema: str = None):
|
|
|
+ sql = "IF EXISTS (SELECT * FROM INFORMATION_SCHEMA.TABLES WHERE TABLE_NAME = '"+ table +"') BEGIN SELECT 1 END ELSE BEGIN SELECT 0 END"
|
|
|
+ return bool(self.query(sql).first()[0])
|
|
|
+
|
|
|
+ def insert_data(self, data, table: str, schema: str = None, to_sql_kwargs: dict={'index': False}, insert_method: str="append"):
|
|
|
+
|
|
|
+ try:
|
|
|
+
|
|
|
+ connection = self._engine.connect()
|
|
|
+
|
|
|
+ if not self.table_exist:
|
|
|
+ if schema is None:
|
|
|
+ err = "Data could not be inserted, table does not exist and no schema to create one was submitted. \
|
|
|
+ Create table using MSSQLHandler.create_table or submit a valid schema and try again"
|
|
|
+ raise Exception(err)
|
|
|
+ else:
|
|
|
+ self.create_table(self._connection_params["db"], table, schema)
|
|
|
+
|
|
|
+
|
|
|
+ if type(data) == pd.DataFrame:
|
|
|
+ data.to_sql(name=table,
|
|
|
+ schema=schema,
|
|
|
+ con=connection,
|
|
|
+ if_exists=insert_method,
|
|
|
+ **to_sql_kwargs)
|
|
|
+
|
|
|
+ connection.close()
|
|
|
+
|
|
|
+
|
|
|
+ except Exception as e:
|
|
|
+ err = ("Could not insert data. "
|
|
|
+ "Finished with error {}").format(e)
|
|
|
+
|
|
|
+ self._log.error(err)
|
|
|
+ raise Exception(err)
|
|
|
+
|
|
|
+ def drop_table(self, table: str):
|
|
|
+ if self.table_exist(table):
|
|
|
+ try:
|
|
|
+ sql = "DROP TABLE " + table
|
|
|
+ self.query(sql)
|
|
|
+ self._log.info("The table has been dropped")
|
|
|
+ except Exception as error:
|
|
|
+ self._log.error("Could not drop the table, Error {}".format(error))
|
|
|
+
|
|
|
+ else:
|
|
|
+ self._log.info("The table can't be dropped because it does not exist")
|