123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126 |
- #!/usr/bin/env python3
- # -*- coding: utf-8 -*-
- """
- Created on Tue Feb 23 19:44:22 2021
- @author: tanya
- """
- from cdplib.log import Log
- import pandas as pd
- from influxdb import DataFrameClient
- from datetime import datetime
class InfluxdbHandler:
    """
    Convenience wrapper around :class:`influxdb.DataFrameClient`.

    The handler connects to an InfluxDB instance through a DSN of the form
    ``influxdb://user:password@host:port/database`` and returns query
    results as pandas DataFrames.
    """

    # Layout used to render datetime-like bounds into a query. The trailing
    # literal "Z" marks the value as UTC, so aware values are converted to
    # UTC before they are formatted.
    _TIME_FORMAT = "%Y-%m-%dT%H:%M:%SZ"

    def __init__(self, database_url: str = None):
        """
        Create the underlying client.

        :param database_url: DSN of the database, e.g.
            ``influxdb://user:password@host:8086/database``. When omitted,
            the DSN is assembled from the project configuration.
        :type database_url: str
        """
        self._logger = Log("InfluxdbHandler:")

        if database_url is None:
            database_url = self._read_url_from_env()

        self.client = DataFrameClient.from_dsn(database_url)

    def _read_url_from_env(self) -> str:
        """
        Assemble the database DSN from the project configuration object.

        The configuration object is created by libraries/configuration.py
        and uses a configuration file (by default .env).

        :return: DSN built from the ``INFLUX`` section of the configuration
        :rtype: str
        """
        try:
            # Imported lazily so that the handler can be used with an
            # explicit url even when the configuration module is absent.
            from libraries.configuration import default as cfg

            return self._build_dsn(cfg["INFLUX"])

        except Exception as e:
            self._logger.log_and_raise_error(
                ("Could not parse url from configuration file. "
                 "Exit with error {}".format(e)))

    @staticmethod
    def _build_dsn(influx_cfg) -> str:
        """
        Turn the ``INFLUX`` configuration section into a DSN string.

        :param influx_cfg: mapping that must contain ``INFLUX_HOST``,
            ``INFLUX_PORT`` and ``INFLUX_DATABASE_NAME`` and may contain
            ``INFLUX_USER`` and ``INFLUX_PASSWORD``
        :return: DSN of the form ``influxdb://[user[:password]@]host:port/db``
        :rtype: str
        :raises ValueError: if the section or a mandatory key is missing
        """
        # Explicit checks instead of ``assert``: assertions are removed when
        # Python runs with -O, which would silently skip the validation.
        if influx_cfg is None:
            raise ValueError("configuration file must contain [INFLUX]")

        required = {"INFLUX_HOST", "INFLUX_PORT", "INFLUX_DATABASE_NAME"}
        if not required <= set(influx_cfg):
            raise ValueError("configuration file must contain influx host, "
                             "port, and database name")

        credentials = ""
        if "INFLUX_USER" in influx_cfg:
            credentials += influx_cfg["INFLUX_USER"]
        if "INFLUX_PASSWORD" in influx_cfg:
            credentials += ":" + influx_cfg["INFLUX_PASSWORD"]
        if credentials:
            # Only emit the separator when there is something to separate.
            credentials += "@"

        return "influxdb://{0}{1}:{2}/{3}".format(
            credentials,
            influx_cfg["INFLUX_HOST"],
            influx_cfg["INFLUX_PORT"],
            influx_cfg["INFLUX_DATABASE_NAME"])

    def query_to_dataframe(self, query: str) -> pd.DataFrame:
        """
        Run a query and return the first DataFrame of the result.

        The client returns a dict-like object holding one DataFrame per
        entry; only the first entry is returned. Any failure, including an
        empty result, is reported through the handler's logger.

        :param query: query string passed unchanged to the client
        :type query: str
        :return: first DataFrame contained in the query result
        :rtype: pd.DataFrame
        """
        try:
            # result of the query is a dict-like object of DataFrames
            result = self.client.query(query)

            if not result:
                # Fail with a readable message rather than an opaque
                # "list index out of range".
                raise ValueError("query returned no results")

            return next(iter(result.values()))

        except Exception as e:
            self._logger.log_and_raise_error(
                ("Could not query to dataframe. "
                 "Exit with error {}".format(e)))

    @classmethod
    def _format_time(cls, moment) -> str:
        """
        Render a time bound as the string that is placed into the query.

        :param moment: a ready-made string (returned unchanged) or a
            date/datetime-like object offering ``strftime``
        :return: the bound as a string
        :rtype: str
        """
        if isinstance(moment, str):
            return moment

        utcoffset = getattr(moment, "utcoffset", None)
        if utcoffset is not None and utcoffset() is not None:
            # The format ends in a literal "Z", so an aware value has to be
            # expressed in UTC first; naive values are formatted as they are.
            from datetime import timezone
            moment = moment.astimezone(timezone.utc)

        return moment.strftime(cls._TIME_FORMAT)

    def query_between_dates(self, columns: str,
                            tables: str,
                            start: str,
                            stop: str) -> pd.DataFrame:
        """
        Select columns from a measurement within an open time interval.

        The generated statement is::

            SELECT <columns> FROM "<tables>"
            WHERE time > '<start>' AND time < '<stop>' tz('Europe/Berlin');

        NOTE(review): all arguments are interpolated into the statement
        without escaping, so they must come from a trusted source.

        :param columns: column expression placed after ``SELECT``
        :type columns: str
        :param tables: measurement name; it is wrapped in double quotes
        :type tables: str
        :param start: exclusive lower bound; either a string that is used
            verbatim or a date/datetime-like object, which is formatted as
            ``%Y-%m-%dT%H:%M:%SZ`` (aware values are converted to UTC first)
        :type start: str
        :param stop: exclusive upper bound, same conventions as ``start``
        :type stop: str
        :return: result of :meth:`query_to_dataframe` for the statement
        :rtype: pd.DataFrame
        """
        query = (
            "SELECT {columns} FROM \"{tables}\" "
            "WHERE time > '{start}' AND time < '{stop}' "
            "tz('Europe/Berlin');"
        ).format(columns=columns,
                 tables=tables,
                 start=self._format_time(start),
                 stop=self._format_time(stop))

        return self.query_to_dataframe(query)
|