瀏覽代碼

added InfluxdbHandler

tanja 3 年之前
父節點
當前提交
cdce792d33
共有 1 個文件被更改,包括 117 次插入0 次删除
  1. 117 0
      cdplib/db_handlers/InfluxdbHandler.py

+ 117 - 0
cdplib/db_handlers/InfluxdbHandler.py

@@ -0,0 +1,117 @@
+#!/usr/bin/env python3
+# -*- coding: utf-8 -*-
+"""
+Created on Tue Feb 23 19:44:22 2021
+
+@author: tanya
+"""
+
+from cdplib.log import Log
+
+import pandas as pd
+
+from influxdb import DataFrameClient
+
+
+class InfluxdbHandler:
+    """
+    """
+    def __init__(self, database_url: str = None):
+        """
+        :param database_url: DESCRIPTION
+        :type database_url: str
+        :return: DESCRIPTION
+        :rtype: TYPE
+        """
+        self._logger = Log("InfluxdbHandler:")
+
+        if database_url is None:
+            database_url = self._read_url_from_env()
+
+        self.client = DataFrameClient.from_dsn(database_url)
+
+    def _read_url_from_env(self) -> str:
+        """
+        :return: parse database url from the configuration object.
+         configuration object is create by the script
+         /libraries.configuration.py and uses a configuration file
+         (by default .env)
+        :rtype: str
+
+        """
+        try:
+            from libraries.configuration import default as cfg
+
+            assert(cfg["INFLUX"] is not None),\
+                "configuration file must contain [INFLUX]"
+
+            assert(set(["INFLUX_HOST", "INFLUX_PORT", "INFLUX_DATABASE_NAME"])
+                   <= set(cfg["INFLUX"])),\
+                ("configuration file must contain influx host, ",
+                 " port, and database name")
+
+            database_url = "influxdb://"
+
+            if "INFLUX_USER" in cfg["INFLUX"]:
+                database_url += cfg["INFLUX"]["INFLUX_USER"]
+
+            if "INFLUX_PASSWORD" in cfg["INFLUX"]:
+                database_url += ":" + cfg["INFLUX"]["INFLUX_PASSWORD"]
+
+            database_url += "@{0}:{1}/{2}".format(
+                cfg["INFLUX"]["INFLUX_HOST"],
+                cfg["INFLUX"]["INFLUX_PORT"],
+                cfg["INFLUX"]["INFLUX_DATABASE_NAME"])
+
+            return database_url
+
+        except Exception as e:
+            self._logger.log_and_raise_error(
+                ("Could not parse url from configuration file. "
+                 "Exit with error {}".format(e)))
+
+    def query_to_dataframe(self, query: str) -> pd.DataFrame:
+        """
+        :param query: DESCRIPTION
+        :type query: str
+        :return: DESCRIPTION
+        :rtype: TYPE
+        """
+        try:
+            # result of the query is a defaultdict
+            result = self.client.query(query)
+
+            return list(result.values())[0]
+        except Exception as e:
+            self._logger.log_and_raise_error(
+                ("Could not query to dataframe. "
+                 "Exit with error {}".format(e)))
+
+    def query_between_dates(self, columns: str,
+                            tables: str,
+                            start: str,
+                            stop: str) -> pd.DataFrame:
+        """
+        :param columns: DESCRIPTION
+        :type columns: str
+        :param tables: DESCRIPTION
+        :type tables: str
+        :param start: DESCRIPTION
+        :type start: str
+        :param stop: DESCRIPTION
+        :type stop: str
+        :return: DESCRIPTION
+        :rtype: TYPE
+
+        """
+        query = 'SELECT ' +\
+                columns +\
+                ' FROM \"' +\
+                tables +\
+                '\" WHERE time > \'' +\
+                str(start) +\
+                '\' AND time  < \'' +\
+                str(stop) +\
+                '\' tz(\'Europe/Berlin\');'
+
+        return self.query_to_dataframe(query)