#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Created on Tue Feb 23 19:44:22 2021

@author: tanya
"""

from datetime import datetime
from typing import Optional, Union

import pandas as pd
from influxdb import DataFrameClient

from cdplib.log import Log


class InfluxdbHandler:
    """
    Read from and write to an InfluxDB database with pandas DataFrames,
    using influxdb's DataFrameClient.
    """

    # Time literals are rendered with a "Z" (UTC) suffix at second precision.
    _TIME_FORMAT = "%Y-%m-%dT%H:%M:%SZ"

    def __init__(self, database_url: Optional[str] = None):
        """
        :param database_url: DSN handed to DataFrameClient.from_dsn,
            e.g. "influxdb://user:password@host:8086/database".
            If None, the URL is built from the configuration object
            (see _read_url_from_env).
        :type database_url: str or None
        """
        self._logger = Log("InfluxdbHandler:")

        if database_url is None:
            database_url = self._read_url_from_env()

        self.client = DataFrameClient.from_dsn(database_url)

    def _read_url_from_env(self) -> str:
        """
        Build the database URL from the configuration object.

        The configuration object is created by the script
        libraries/configuration.py and reads a configuration file
        (by default .env). Its [INFLUX] section must contain INFLUX_HOST,
        INFLUX_PORT and INFLUX_DATABASE_NAME; INFLUX_USER and
        INFLUX_PASSWORD are optional.

        Any failure is reported through self._logger.log_and_raise_error.

        :return: URL of the form
            "influxdb://[user][:password]@host:port/database"
        :rtype: str
        """
        try:
            from libraries.configuration import default as cfg

            influx_cfg = cfg["INFLUX"]

            # Explicit checks rather than ``assert``: asserts are stripped
            # when Python runs with -O, which would silently skip validation.
            if influx_cfg is None:
                raise ValueError("configuration file must contain [INFLUX]")

            required = {"INFLUX_HOST", "INFLUX_PORT", "INFLUX_DATABASE_NAME"}
            missing = required - set(influx_cfg)
            if missing:
                raise ValueError(
                    "configuration file must contain influx host, port, "
                    "and database name (missing: {})".format(
                        ", ".join(sorted(missing))))

            database_url = "influxdb://"

            if "INFLUX_USER" in influx_cfg:
                database_url += "{}".format(influx_cfg["INFLUX_USER"])

            if "INFLUX_PASSWORD" in influx_cfg:
                database_url += ":{}".format(influx_cfg["INFLUX_PASSWORD"])

            database_url += "@{0}:{1}/{2}".format(
                influx_cfg["INFLUX_HOST"],
                influx_cfg["INFLUX_PORT"],
                influx_cfg["INFLUX_DATABASE_NAME"])

            return database_url

        except Exception as e:
            self._logger.log_and_raise_error(
                ("Could not parse url from configuration file. "
                 "Exit with error {}".format(e)))

    def query_to_dataframe(self, query: str) -> pd.DataFrame:
        """
        Run an InfluxQL query and return its result as a DataFrame.

        :param query: InfluxQL query string.
        :type query: str
        :return: the first DataFrame contained in the query result, or an
            empty DataFrame when the result holds nothing.
        :rtype: pd.DataFrame
        """
        try:
            # result of the query is a defaultdict; only its first value
            # is returned.
            result = self.client.query(query)
            return next(iter(result.values()), pd.DataFrame())

        except Exception as e:
            self._logger.log_and_raise_error(
                ("Could not query to dataframe. "
                 "Exit with error {}".format(e)))

    @classmethod
    def _to_time_literal(cls, value) -> Optional[str]:
        """
        Convert a time bound into the string placed inside an InfluxQL
        time literal.

        None and strings are returned unchanged. Anything else
        (datetime, date, pd.Timestamp, numpy datetime64, ...) goes through
        pd.Timestamp. Timezone-aware values are first converted to UTC,
        because the rendered literal carries a "Z" (UTC) suffix. Naive
        values are formatted as-is.
        """
        if value is None or isinstance(value, str):
            return value

        timestamp = pd.Timestamp(value)
        if timestamp.tzinfo is not None:
            timestamp = timestamp.tz_convert("UTC")
        return timestamp.strftime(cls._TIME_FORMAT)

    def query_between_dates(self,
                            columns: str,
                            tables: str,
                            start: Union[str, datetime, None] = None,
                            stop: Union[str, datetime, None] = None,
                            timezone: str = "Europe/Berlin"
                            ) -> pd.DataFrame:
        """
        Select columns from a measurement, optionally restricted to a
        time range.

        Both bounds are inclusive. When at least one bound is given, a
        tz(...) clause with the given timezone is appended to the query.

        :param columns: the SELECT expression, inserted verbatim
            (e.g. "*" or "value, other").
        :type columns: str
        :param tables: measurement name; it is wrapped in double quotes.
        :type tables: str
        :param start: lower time bound, as a string or a datetime-like
            object; None means no lower bound.
        :type start: str, datetime or None
        :param stop: upper time bound, as a string or a datetime-like
            object; None means no upper bound.
        :type stop: str, datetime or None
        :param timezone: timezone name used in the tz(...) clause.
        :type timezone: str
        :return: the query result (see query_to_dataframe).
        :rtype: pd.DataFrame
        """
        start = self._to_time_literal(start)
        stop = self._to_time_literal(stop)

        # The identifier quote is always closed; previously the
        # no-bounds query ended in an unterminated '"'.
        query = 'SELECT ' + columns + ' FROM "' + tables + '"'

        conditions = []
        if start is not None:
            conditions.append("time >= '{}'".format(start))
        if stop is not None:
            conditions.append("time <= '{}'".format(stop))

        if conditions:
            query += (" WHERE " + " AND ".join(conditions)
                      + " tz('{}')".format(timezone))

        query += ";"

        return self.query_to_dataframe(query)

    def insert_dataframe(self,
                         dataframe: pd.DataFrame,
                         tag_columns: Optional[list] = None,
                         batch_size: int = 10000,
                         time_precision: str = 'u'):
        """
        Write every column of the dataframe that is not in tag_columns as
        a separate measurement, named after the column.

        The tag columns are written as tags with each measurement.
        The dataframe must have a datetime index.

        :param dataframe: dataframe to write to the database
        :type dataframe: pd.DataFrame
        :param tag_columns: column names to be used as tags
            (None means no tags)
        :type tag_columns: list or None
        :param batch_size: number of points written per request,
            passed to DataFrameClient.write_points
        :type batch_size: int
        :param time_precision: timestamp precision,
            passed to DataFrameClient.write_points
        :type time_precision: str
        """
        # Copy into a fresh list. This avoids a shared mutable default and
        # lets tuples or other iterables be concatenated below.
        tag_columns = list(tag_columns) if tag_columns else []
        tag_set = set(tag_columns)

        measurement_columns = [c for c in dataframe.columns
                               if c not in tag_set]

        for column in measurement_columns:
            try:
                self.client.write_points(
                    dataframe=dataframe[[column] + tag_columns],
                    measurement=column,
                    tag_columns=tag_columns,
                    protocol='line',
                    batch_size=batch_size,
                    time_precision=time_precision)

            except Exception as error:
                self._logger.log_and_raise_error(
                    ('Could not insert data, Error: {}'.format(error)))


if __name__ == "__main__":

    influx_handler = InfluxdbHandler()