123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167 |
- #!/usr/bin/env python3
- # -*- coding: utf-8 -*-
- """
- Created on Tue Feb 23 19:44:22 2021
- @author: tanya
- """
- from cdplib.log import Log
- import pandas as pd
- from influxdb import DataFrameClient
- from datetime import datetime
class InfluxdbHandler:
    """
    Small convenience wrapper around influxdb's ``DataFrameClient``.

    It builds the client from a DSN-style url (given explicitly or
    assembled from the project configuration), returns query results as
    pandas dataframes and writes dataframe columns as measurements.
    """

    # Timestamp layout used when non-string dates are put into a query.
    _TIME_FORMAT = "%Y-%m-%dT%H:%M:%SZ"

    def __init__(self, database_url: str = None):
        """
        :param database_url: connection url, as built by
            _read_url_from_env: influxdb://[user[:password]]@host:port/db.
            If None, the url is read from the project configuration.
        :type database_url: str
        """
        self._logger = Log("InfluxdbHandler:")

        if database_url is None:
            database_url = self._read_url_from_env()

        self.client = DataFrameClient.from_dsn(database_url)

    def _read_url_from_env(self) -> str:
        """
        Build the database url from the configuration object.

        The configuration object is created by the script
        /libraries.configuration.py and uses a configuration file
        (by default .env). Its [INFLUX] section must contain
        INFLUX_HOST, INFLUX_PORT and INFLUX_DATABASE_NAME;
        INFLUX_USER and INFLUX_PASSWORD are optional.

        Any failure (missing module, missing section or keys) is passed
        to the logger's log_and_raise_error.

        :return: url of the form influxdb://[user[:password]]@host:port/db
        :rtype: str
        """
        try:
            from libraries.configuration import default as cfg

            influx_cfg = cfg["INFLUX"]

            # Explicit raises instead of asserts: asserts are stripped
            # when python runs with -O, which would skip the validation.
            if influx_cfg is None:
                raise ValueError("configuration file must contain [INFLUX]")

            required = {"INFLUX_HOST", "INFLUX_PORT", "INFLUX_DATABASE_NAME"}

            if not required <= set(influx_cfg):
                raise ValueError(
                    "configuration file must contain influx host, "
                    "port, and database name")

            database_url = "influxdb://"

            if "INFLUX_USER" in influx_cfg:
                database_url += influx_cfg["INFLUX_USER"]

            if "INFLUX_PASSWORD" in influx_cfg:
                database_url += ":" + influx_cfg["INFLUX_PASSWORD"]

            # NOTE: the "@" separator is emitted even when no credentials
            # are configured; kept as is so the produced url is unchanged.
            database_url += "@{0}:{1}/{2}".format(
                influx_cfg["INFLUX_HOST"],
                influx_cfg["INFLUX_PORT"],
                influx_cfg["INFLUX_DATABASE_NAME"])

            return database_url

        except Exception as e:
            self._logger.log_and_raise_error(
                ("Could not parse url from configuration file. "
                 "Exit with error {}".format(e)))

    def query_to_dataframe(self, query: str) -> pd.DataFrame:
        """
        Run a query and return the first dataframe of its result.

        :param query: query string passed unchanged to the client
        :type query: str
        :return: first value of the mapping returned by the client,
            or an empty dataframe if the query returned nothing
        :rtype: pd.DataFrame
        """
        try:
            # result of the query is a dict-like mapping to dataframes
            result = self.client.query(query)

            # An empty result used to fail with "list index out of range";
            # an empty dataframe is the natural "no rows" answer.
            if not result:
                return pd.DataFrame()

            return next(iter(result.values()))

        except Exception as e:
            self._logger.log_and_raise_error(
                ("Could not query to dataframe. "
                 "Exit with error {}".format(e)))

    @classmethod
    def _format_time(cls, value) -> str:
        """Return strings unchanged, format anything else via strftime."""
        if isinstance(value, str):
            return value

        return value.strftime(cls._TIME_FORMAT)

    def query_between_dates(self, columns: str,
                            tables: str,
                            start: str,
                            stop: str) -> pd.DataFrame:
        """
        Select columns from a table for start < time < stop.

        The query is assembled by plain string interpolation, so all
        arguments must come from trusted code, never from user input.

        :param columns: column expression placed after SELECT
        :type columns: str
        :param tables: table name, it is wrapped in double quotes
        :type tables: str
        :param start: exclusive lower time bound; non-string values are
            formatted as %Y-%m-%dT%H:%M:%SZ
        :type start: str
        :param stop: exclusive upper time bound; non-string values are
            formatted as %Y-%m-%dT%H:%M:%SZ
        :type stop: str
        :return: result of query_to_dataframe for the built query
        :rtype: pd.DataFrame
        """
        start = self._format_time(start)
        stop = self._format_time(stop)

        query = (
            f'SELECT {columns} FROM "{tables}" '
            f"WHERE time > '{start}' AND time < '{stop}' "
            "tz('Europe/Berlin');"
        )

        return self.query_to_dataframe(query)

    def insert_dataframe(self, dataframe: pd.DataFrame,
                         tag_columns: list[str] = None,
                         batch_size: int = 10000,
                         time_precision: str = 'u'):
        """
        Writes each column of the dataframe which is not
        in tag_columns as a separate measurement to the database.

        Tag columns are put as tags to each measurement.

        The dataframe has to have a datetime index!

        :param dataframe: dataframe to write to the database
        :type dataframe: pd.DataFrame
        :param tag_columns: column names to be used as tags,
            None means no tags
        :type tag_columns: list
        :param batch_size: passed to the client's write_points
        :type batch_size: int
        :param time_precision: passed to the client's write_points
        :type time_precision: str
        """
        # None is the declared default, so it has to be usable.
        tag_columns = [] if tag_columns is None else list(tag_columns)

        measurement_columns = [c for c in dataframe.columns
                               if c not in tag_columns]

        for column in measurement_columns:
            try:
                # Per-row tags must travel inside the written frame and be
                # named through tag_columns; the client's `tags` argument
                # is only meant for scalar tags shared by all points.
                self.client.write_points(
                    dataframe=dataframe[[column] + tag_columns],
                    measurement=column,
                    tag_columns=tag_columns,
                    protocol='line',
                    batch_size=batch_size,
                    time_precision=time_precision)

            except Exception as error:
                self._logger.log_and_raise_error(
                    ('Could not insert data, Error: {}'.format(error)))
|