InfluxdbHandler.py 5.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167
  1. #!/usr/bin/env python3
  2. # -*- coding: utf-8 -*-
  3. """
  4. Created on Tue Feb 23 19:44:22 2021
  5. @author: tanya
  6. """
  7. from cdplib.log import Log
  8. import pandas as pd
  9. from influxdb import DataFrameClient
  10. from datetime import datetime
  11. class InfluxdbHandler:
  12. """
  13. """
  14. def __init__(self, database_url: str = None):
  15. """
  16. :param database_url: DESCRIPTION
  17. :type database_url: str
  18. :return: DESCRIPTION
  19. :rtype: TYPE
  20. """
  21. self._logger = Log("InfluxdbHandler:")
  22. if database_url is None:
  23. database_url = self._read_url_from_env()
  24. self.client = DataFrameClient.from_dsn(database_url)
  25. def _read_url_from_env(self) -> str:
  26. """
  27. :return: parse database url from the configuration object.
  28. configuration object is create by the script
  29. /libraries.configuration.py and uses a configuration file
  30. (by default .env)
  31. :rtype: str
  32. """
  33. try:
  34. from libraries.configuration import default as cfg
  35. assert(cfg["INFLUX"] is not None),\
  36. "configuration file must contain [INFLUX]"
  37. assert(set(["INFLUX_HOST", "INFLUX_PORT", "INFLUX_DATABASE_NAME"])
  38. <= set(cfg["INFLUX"])),\
  39. ("configuration file must contain influx host, ",
  40. " port, and database name")
  41. database_url = "influxdb://"
  42. if "INFLUX_USER" in cfg["INFLUX"]:
  43. database_url += cfg["INFLUX"]["INFLUX_USER"]
  44. if "INFLUX_PASSWORD" in cfg["INFLUX"]:
  45. database_url += ":" + cfg["INFLUX"]["INFLUX_PASSWORD"]
  46. database_url += "@{0}:{1}/{2}".format(
  47. cfg["INFLUX"]["INFLUX_HOST"],
  48. cfg["INFLUX"]["INFLUX_PORT"],
  49. cfg["INFLUX"]["INFLUX_DATABASE_NAME"])
  50. return database_url
  51. except Exception as e:
  52. self._logger.log_and_raise_error(
  53. ("Could not parse url from configuration file. "
  54. "Exit with error {}".format(e)))
  55. def query_to_dataframe(self, query: str) -> pd.DataFrame:
  56. """
  57. :param query: DESCRIPTION
  58. :type query: str
  59. :return: DESCRIPTION
  60. :rtype: TYPE
  61. """
  62. try:
  63. # result of the query is a defaultdict
  64. result = self.client.query(query)
  65. return list(result.values())[0]
  66. except Exception as e:
  67. self._logger.log_and_raise_error(
  68. ("Could not query to dataframe. "
  69. "Exit with error {}".format(e)))
  70. def query_between_dates(self, columns: str,
  71. tables: str,
  72. start: str,
  73. stop: str) -> pd.DataFrame:
  74. """
  75. :param columns: DESCRIPTION
  76. :type columns: str
  77. :param tables: DESCRIPTION
  78. :type tables: str
  79. :param start: DESCRIPTION
  80. :type start: str
  81. :param stop: DESCRIPTION
  82. :type stop: str
  83. :return: DESCRIPTION
  84. :rtype: TYPE
  85. """
  86. if not isinstance(start, str):
  87. start = datetime.strftime(start, format="%Y-%m-%dT%H:%M:%SZ")
  88. if not isinstance(stop, str):
  89. stop = datetime.strftime(stop, format="%Y-%m-%dT%H:%M:%SZ")
  90. query = 'SELECT ' +\
  91. columns +\
  92. ' FROM \"' +\
  93. tables +\
  94. '\" WHERE time > \'' +\
  95. str(start) +\
  96. '\' AND time < \'' +\
  97. str(stop) +\
  98. '\' tz(\'Europe/Berlin\');'
  99. return self.query_to_dataframe(query)
  100. def insert_dataframe(self, dataframe: pd.DataFrame,
  101. tag_columns: list[str] = None,
  102. batch_size: int = 10000,
  103. time_precision: str = 'u'):
  104. """
  105. Writes each column of the dataframe which is not
  106. in tag_columns as a separate measurement to the database.
  107. Tag columns are put as tags to each measurement.
  108. The dataframe has to have a datatime index!
  109. :param dataframe: dataframe to write to the database
  110. :type dataframe: pd.DataFrame
  111. :param tag_columns: column names to be used as tags
  112. :type tag_columns: list
  113. :param betch_size:
  114. :type batch_size: int
  115. :param time_precision:
  116. :type tiime_precision: str
  117. """
  118. tags = {col: dataframe[col] for col in tag_columns}
  119. measurement_columns = [c for c in dataframe.columns
  120. if c not in tag_columns]
  121. for column in measurement_columns:
  122. try:
  123. self.client.write_points(
  124. dataframe=dataframe[column].to_frame(),
  125. measurement=column,
  126. tags=tags,
  127. protocol='line',
  128. batch_size=batch_size,
  129. time_precision=time_precision)
  130. except Exception as error:
  131. self._logger.loger_and_raise_error(
  132. ('Could not insert data, Error: {}'.format(error)))