MSSQLHandler.py 6.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185
  1. import os
  2. import sys
  3. from pandas.io.sql import table_exists
  4. from scipy.sparse import data
  5. sys.path.append(os.getcwd())
  6. import pandas as pd
  7. import sqlalchemy
  8. from cdplib.log import Log
  9. class MSSQLHandler:
  10. def __init__(self, db_uri: str = None):
  11. self._log = Log(name='MSSQLHandler')
  12. if db_uri is None:
  13. from libraries.configuration import default as cfg
  14. db_uri = sqlalchemy.engine.url.URL.create(
  15. cfg["MSSQL"]["MSSQL_DIALECT"],
  16. username=cfg["MSSQL"]["MSSQL_USER"],
  17. password=cfg["MSSQL"]["MSSQL_PASSWORD"],
  18. host=cfg["MSSQL"]["MSSQL_HOST"],
  19. port=cfg["MSSQL"]["MSSQL_PORT"],
  20. database=cfg["MSSQL"]["MSSQL_DATABASE_NAME"],
  21. query=dict(driver=cfg["MSSQL"]["MSSQL_DRIVER"])
  22. )
  23. self._engine = sqlalchemy.create_engine(db_uri)
  24. self._db_uri = str(db_uri)
  25. self.create_database(self._connection_params["db"])
  26. @property
  27. def _connection_params(self) -> dict:
  28. '''
  29. return: connection parameters like user,
  30. password, host, port, and database name
  31. rtype: dict
  32. '''
  33. try:
  34. connection_params = {}
  35. connection_params['user'], connection_params['password'] =\
  36. str(self._db_uri).split('//')[1]\
  37. .split('@')[0]\
  38. .split(':')
  39. connection_params['host'], connection_params['port'] =\
  40. str(self._db_uri).split('//')[1]\
  41. .split('@')[1]\
  42. .split('/')[0]\
  43. .split(':')
  44. connection_params['db'] = str(self._db_uri).split('/')[-1]\
  45. .split('?')[0]
  46. return connection_params
  47. except Exception as e:
  48. err = ("Could not parse connection parameters."
  49. "Finished with error {}")\
  50. .format(e)
  51. self._log.error(err)
  52. raise Exception(err)
  53. def query(self, query: str):
  54. try:
  55. return self._engine.execute(query)
  56. except Exception as e:
  57. err = ("Could not execute the query "
  58. "Finished with error {}").format(e)
  59. self._log.error(err)
  60. raise Exception(err)
  61. def query_to_dataframe(self, query: str, **read_sql_kwargs):
  62. try:
  63. connection = self._engine.connect()
  64. data = pd.read_sql(sql=query,
  65. con=connection,
  66. **read_sql_kwargs)
  67. connection.close()
  68. return data
  69. except Exception as e:
  70. err = ("Could not read the query to a dataframe. "
  71. "Finished with error {}").format(e)
  72. self._log.error(err)
  73. raise Exception(err)
  74. def create_database(self, database: str):
  75. sql = "IF NOT EXISTS (SELECT * FROM sys.databases WHERE name = '"+ database +"') BEGIN CREATE DATABASE "+ database +" END"
  76. # TODO : give user feedback that database was created
  77. return self.query(sql)
  78. def create_table(self, database: str, table: str, schema: str):
  79. # sql = "IF NOT EXISTS (SELECT * FROM "+ database +".INFORMATION_SCHEMA.TABLES WHERE TABLE_NAME = '"+ table +"') CREATE TABLE " + table + " ( "+ schema +")"
  80. if not self.table_exist(table):
  81. sql = "CREATE TABLE " + table + " ( "+ schema +")"
  82. print(sql)
  83. try:
  84. self.query(sql)
  85. self._log.info("The new table: {} has been created in database: {}".format(table, database))
  86. except Exception as error:
  87. self._log.error("An error occured when creating table: {} in database: {}, Error: {}".format(table, database, error))
  88. else:
  89. self._log.info("The new table: {} already exist in database: {}".format(table, database))
  90. def get_databases(self, as_dataframe: bool=True):
  91. sql = "SELECT * FROM sys.databases"
  92. if as_dataframe:
  93. return self.query_to_dataframe(sql)
  94. else:
  95. return self.query(sql)
  96. def get_tables_from_database(self, database: str, as_dataframe: bool=True):
  97. sql = "SELECT * FROM "+ database +".INFORMATION_SCHEMA.TABLES"
  98. if as_dataframe:
  99. return self.query_to_dataframe(sql)
  100. else:
  101. return self.query(sql)
  102. def table_exist(self, table: str, schema: str = None):
  103. sql = "IF EXISTS (SELECT * FROM INFORMATION_SCHEMA.TABLES WHERE TABLE_NAME = '"+ table +"') BEGIN SELECT 1 END ELSE BEGIN SELECT 0 END"
  104. return bool(self.query(sql).first()[0])
  105. def insert_data(self, data, table: str, schema: str = None, to_sql_kwargs: dict={'index': False}, insert_method: str="append"):
  106. try:
  107. connection = self._engine.connect()
  108. if not self.table_exist:
  109. if schema is None:
  110. err = "Data could not be inserted, table does not exist and no schema to create one was submitted. \
  111. Create table using MSSQLHandler.create_table or submit a valid schema and try again"
  112. raise Exception(err)
  113. else:
  114. self.create_table(self._connection_params["db"], table, schema)
  115. if type(data) == pd.DataFrame:
  116. data.to_sql(name=table,
  117. schema=schema,
  118. con=connection,
  119. if_exists=insert_method,
  120. **to_sql_kwargs)
  121. connection.close()
  122. except Exception as e:
  123. err = ("Could not insert data. "
  124. "Finished with error {}").format(e)
  125. self._log.error(err)
  126. raise Exception(err)
  127. def drop_table(self, table: str):
  128. if self.table_exist(table):
  129. try:
  130. sql = "DROP TABLE " + table
  131. self.query(sql)
  132. self._log.info("The table has been dropped")
  133. except Exception as error:
  134. self._log.error("Could not drop the table, Error {}".format(error))
  135. else:
  136. self._log.info("The table can't be dropped because it does not exist")