Explorar el Código

Create a new sql class to handle MS SQL dialect

ogert hace 3 años
padre
commit
dfa7128d51
Se han modificado 3 ficheros con 203 adiciones y 28 borrados
  1. 184 0
      cdplib/db_handlers/MSSQLHandler.py
  2. 16 27
      cdplib/db_handlers/SQLHandler.py
  3. 3 1
      cdplib/db_handlers/__init__.py

+ 184 - 0
cdplib/db_handlers/MSSQLHandler.py

@@ -0,0 +1,184 @@
+import os
+import sys
+
+from pandas.io.sql import table_exists
+from scipy.sparse import data
+sys.path.append(os.getcwd())
+import pandas as pd
+import sqlalchemy
+
+from cdplib.log import Log
+
+class MSSQLHandler:
+
+    def __init__(self, db_uri: str = None):
+        
+        self._log = Log(name='MSSQLHandler')
+
+        if db_uri is None:
+
+            from libraries.configuration import default as cfg
+
+            
+            db_uri = sqlalchemy.engine.url.URL.create(
+                        cfg["SQL"]["SQL_DIALECT"],
+                        username=cfg["SQL"]["SQL_USER"],
+                        password=cfg["SQL"]["SQL_PASSWORD"],
+                        host=cfg["SQL"]["SQL_HOST"],
+                        port=cfg["SQL"]["SQL_PORT"],
+                        database=cfg["SQL"]["SQL_DATABASE_NAME"],
+                        query=dict(driver=cfg["SQL"]["SQL_DRIVER"])
+                    )
+
+        self._engine = sqlalchemy.create_engine(db_uri)
+        self._db_uri = str(db_uri)
+
+        self.create_database(self._connection_params["db"])
+
+
+    @property
+    def _connection_params(self) -> dict:
+        '''
+        return: connection parameters like user,
+        password, host, port, and database name
+        rtype: dict
+        '''
+        try:
+            connection_params = {}
+
+            connection_params['user'], connection_params['password'] =\
+                str(self._db_uri).split('//')[1]\
+                            .split('@')[0]\
+                            .split(':')
+
+            connection_params['host'], connection_params['port'] =\
+                str(self._db_uri).split('//')[1]\
+                            .split('@')[1]\
+                            .split('/')[0]\
+                            .split(':')
+
+            connection_params['db'] = str(self._db_uri).split('/')[-1]\
+                                                  .split('?')[0]
+
+            return connection_params
+
+        except Exception as e:
+            err = ("Could not parse connection parameters."
+                   "Finished with error {}")\
+                   .format(e)
+
+            self._log.error(err)
+            raise Exception(err)
+
+
+    def query(self, query: str):
+        try:
+            return self._engine.execute(query)
+        except Exception as e:
+            err = ("Could not execute the query "
+                   "Finished with error {}").format(e)
+
+            self._log.error(err)
+            raise Exception(err)
+
+    def query_to_dataframe(self, query: str, **read_sql_kwargs):
+        try:
+            
+            connection = self._engine.connect()
+
+            data = pd.read_sql(sql=query,
+                               con=connection,
+                               **read_sql_kwargs)
+
+            connection.close()
+           
+            return data
+
+        except Exception as e:
+            err = ("Could not read the query to a dataframe. "
+                   "Finished with error {}").format(e)
+
+            self._log.error(err)
+            raise Exception(err)
+
+    
+    def create_database(self, database: str):
+        sql = "IF NOT EXISTS (SELECT * FROM sys.databases WHERE name = '"+ database +"') BEGIN CREATE DATABASE "+ database +" END"
+        # TODO : give user feedback that database was created
+        return self.query(sql)
+
+    def create_table(self, database: str, table: str, schema: str):
+        # sql = "IF NOT EXISTS (SELECT * FROM "+ database +".INFORMATION_SCHEMA.TABLES WHERE TABLE_NAME = '"+ table +"') CREATE TABLE " + table + " ( "+ schema +")"
+        if not self.table_exist(table):
+            sql = "CREATE TABLE " + table + " ( "+ schema +")"
+            print(sql)
+            try:
+                self.query(sql)
+                self._log.info("The new table: {} has been created in database: {}".format(table, database))
+            except Exception as error:
+                self._log.error("An error occured when creating table: {} in database: {}, Error: {}".format(table, database, error))
+        else:
+            self._log.info("The new table: {} already exist in database: {}".format(table, database))
+
+    def get_databases(self, as_dataframe: bool=True):
+        sql = "SELECT * FROM sys.databases"
+        if as_dataframe:
+            return self.query_to_dataframe(sql)
+        else:
+            return self.query(sql)
+
+    def get_tables_from_database(self, database: str, as_dataframe: bool=True):
+        sql = "SELECT * FROM "+ database +".INFORMATION_SCHEMA.TABLES"
+        if as_dataframe:
+            return self.query_to_dataframe(sql)
+        else:
+            return self.query(sql)
+
+
+    def table_exist(self, table: str, schema: str = None):
+        sql = "IF EXISTS (SELECT * FROM INFORMATION_SCHEMA.TABLES WHERE TABLE_NAME = '"+ table +"') BEGIN SELECT 1 END ELSE BEGIN SELECT 0 END"
+        return bool(self.query(sql).first()[0])
+
+    def insert_data(self, data, table: str, schema: str = None, to_sql_kwargs: dict={'index': False}, insert_method: str="append"):
+        
+        try:
+            
+            connection = self._engine.connect()
+
+            if not self.table_exist:
+                if schema is None:
+                    err = "Data could not be inserted, table does not exist and no schema to create one was submitted. \
+                            Create table using MSSQLHandler.create_table or submit a valid schema and try again"
+                    raise Exception(err)
+                else:    
+                    self.create_table(self._connection_params["db"], table, schema)
+
+
+            if type(data) == pd.DataFrame:
+                data.to_sql(name=table,
+                            schema=schema,
+                            con=connection,
+                            if_exists=insert_method,
+                            **to_sql_kwargs)
+
+            connection.close()
+            
+
+        except Exception as e:
+            err = ("Could not insert data. "
+                   "Finished with error {}").format(e)
+
+            self._log.error(err)
+            raise Exception(err)
+
+    def drop_table(self, table: str):
+        if self.table_exist(table):
+            try:
+                sql = "DROP TABLE " + table
+                self.query(sql)
+                self._log.info("The table has been dropped")
+            except Exception as error:
+                self._log.error("Could not drop the table, Error {}".format(error))
+
+        else:
+            self._log.info("The table can't be dropped because it does not exist")

+ 16 - 27
cdplib/db_handlers/SQLHandler.py

@@ -66,33 +66,22 @@ class SQLHandler:
             from libraries.configuration import default as cfg
 
             
-            if "SQL_DRIVER" in cfg["SQL"].keys():
-                                db_uri = sqlalchemy.engine.url.URL.create(
-                                cfg["SQL"]["SQL_DIALECT"],
-                                username=cfg["SQL"]["SQL_USER"],
-                                password=cfg["SQL"]["SQL_PASSWORD"],
-                                host=cfg["SQL"]["SQL_HOST"],
-                                port=cfg["SQL"]["SQL_PORT"],
-                                database=cfg["SQL"]["SQL_DATABASE_NAME"],
-                                query=dict(driver=cfg["SQL"]["SQL_DRIVER"])
-                                )
-            else:
-                # TODO : See if this still connect to Mariadb, also try to change to sqlalchemy.engine.url.URL.create
-                db_uri = "{0}://{1}:{2}@{3}:{4}/{5}?driver={}&charset=utf8&local_infile=1"\
-                     .format(cfg["SQL"]["SQL_DIALECT"],
-                             cfg["SQL"]["SQL_USER"],
-                             cfg["SQL"]["SQL_PASSWORD"],
-                             cfg["SQL"]["SQL_HOST"],
-                             cfg["SQL"]["SQL_PORT"],
-                             cfg["SQL"]["SQL_DATABASE_NAME"])
-
-
-        # assert(isinstance(db_uri, str)),\
-        #     "Parameter 'db_uri' must be of type str"
-
-        # assert(re.match(r'.+://.+:(.+)?@.+:.+/.+', db_uri) is not None),\
-        #     ('database url does not match the pattern: '
-        #      'sqlalchemy_dialect//user:password@host:port/dbname')
+            db_uri = "{0}://{1}:{2}@{3}:{4}/{5}?driver={}&charset=utf8&local_infile=1"\
+                    .format(cfg["SQL"]["SQL_DIALECT"],
+                            cfg["SQL"]["SQL_USER"],
+                            cfg["SQL"]["SQL_PASSWORD"],
+                            cfg["SQL"]["SQL_HOST"],
+                            cfg["SQL"]["SQL_PORT"],
+                            cfg["SQL"]["SQL_DATABASE_NAME"])
+
+
+        # TODO : Dont know if this is still true, it might have to be a "sqlalchemy.engine.url.URL.create"
+        assert(isinstance(db_uri, str)),\
+            "Parameter 'db_uri' must be of type str"
+
+        assert(re.match(r'.+://.+:(.+)?@.+:.+/.+', db_uri) is not None),\
+            ('database url does not match the pattern: '
+             'sqlalchemy_dialect//user:password@host:port/dbname')
 
         self._db_uri = db_uri
 

+ 3 - 1
cdplib/db_handlers/__init__.py

@@ -1,2 +1,4 @@
 from .MongodbHandler import *
-from .SQLHandler import *
+from .SQLHandler import *
+from .MSSQLHandler import *
+from .InfluxdbHandler import *