InfluxdbHandler.py 5.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190
  1. #!/usr/bin/env python3
  2. # -*- coding: utf-8 -*-
  3. """
  4. Created on Tue Feb 23 19:44:22 2021
  5. @author: tanya
  6. """
  7. from cdplib.log import Log
  8. import pandas as pd
  9. from influxdb import DataFrameClient
  10. from datetime import datetime
  11. class InfluxdbHandler:
  12. """
  13. """
  14. def __init__(self, database_url: str = None):
  15. """
  16. :param database_url: DESCRIPTION
  17. :type database_url: str
  18. :return: DESCRIPTION
  19. :rtype: TYPE
  20. """
  21. self._logger = Log("InfluxdbHandler:")
  22. if database_url is None:
  23. database_url = self._read_url_from_env()
  24. self.client = DataFrameClient.from_dsn(database_url)
  25. def _read_url_from_env(self) -> str:
  26. """
  27. :return: parse database url from the configuration object.
  28. configuration object is create by the script
  29. /libraries.configuration.py and uses a configuration file
  30. (by default .env)
  31. :rtype: str
  32. """
  33. try:
  34. from libraries.configuration import default as cfg
  35. assert(cfg["INFLUX"] is not None),\
  36. "configuration file must contain [INFLUX]"
  37. assert(set(["INFLUX_HOST", "INFLUX_PORT", "INFLUX_DATABASE_NAME"])
  38. <= set(cfg["INFLUX"])),\
  39. ("configuration file must contain influx host, ",
  40. " port, and database name")
  41. database_url = "influxdb://"
  42. if "INFLUX_USER" in cfg["INFLUX"]:
  43. database_url += cfg["INFLUX"]["INFLUX_USER"]
  44. if "INFLUX_PASSWORD" in cfg["INFLUX"]:
  45. database_url += ":" + cfg["INFLUX"]["INFLUX_PASSWORD"]
  46. database_url += "@{0}:{1}/{2}".format(
  47. cfg["INFLUX"]["INFLUX_HOST"],
  48. cfg["INFLUX"]["INFLUX_PORT"],
  49. cfg["INFLUX"]["INFLUX_DATABASE_NAME"])
  50. return database_url
  51. except Exception as e:
  52. self._logger.log_and_raise_error(
  53. ("Could not parse url from configuration file. "
  54. "Exit with error {}".format(e)))
  55. def query_to_dataframe(self, query: str) -> pd.DataFrame:
  56. """
  57. :param query: DESCRIPTION
  58. :type query: str
  59. :return: DESCRIPTION
  60. :rtype: TYPE
  61. """
  62. try:
  63. # result of the query is a defaultdict
  64. result = self.client.query(query)
  65. if len(list(result.values())) > 0:
  66. return list(result.values())[0]
  67. else:
  68. return pd.DataFrame()
  69. except Exception as e:
  70. self._logger.log_and_raise_error(
  71. ("Could not query to dataframe. "
  72. "Exit with error {}".format(e)))
  73. def query_between_dates(self, columns: str,
  74. tables: str,
  75. start: str = None,
  76. stop: str = None) -> pd.DataFrame:
  77. """
  78. :param columns: DESCRIPTION
  79. :type columns: str
  80. :param tables: DESCRIPTION
  81. :type tables: str
  82. :param start: DESCRIPTION
  83. :type start: str
  84. :param stop: DESCRIPTION
  85. :type stop: str
  86. :return: DESCRIPTION
  87. :rtype: TYPE
  88. """
  89. if (start is not None) and (not isinstance(start, str)):
  90. start = datetime.strftime(start, format="%Y-%m-%dT%H:%M:%SZ")
  91. if (stop is not None) and (not isinstance(stop, str)):
  92. stop = datetime.strftime(stop, format="%Y-%m-%dT%H:%M:%SZ")
  93. query = 'SELECT ' + columns + ' FROM \"' + tables
  94. if (start is not None) and (stop is not None):
  95. query += '\" WHERE time > \'' +\
  96. str(start) +\
  97. '\' AND time < \'' +\
  98. str(stop) +\
  99. '\' tz(\'Europe/Berlin\');'
  100. elif start is not None:
  101. query += '\" WHERE time >= \'' + str(start) +\
  102. '\' tz(\'Europe/Berlin\');'
  103. elif stop is not None:
  104. query += '\" WHERE time <= \'' + str(stop) +\
  105. '\' tz(\'Europe/Berlin\');'
  106. else:
  107. query += ';'
  108. return self.query_to_dataframe(query)
  109. def insert_dataframe(self, dataframe: pd.DataFrame,
  110. tag_columns: list = [],
  111. batch_size: int = 10000,
  112. time_precision: str = 'u'):
  113. """
  114. Writes each column of the dataframe which is not
  115. in tag_columns as a separate measurement to the database.
  116. Tag columns are put as tags to each measurement.
  117. The dataframe has to have a datatime index!
  118. :param dataframe: dataframe to write to the database
  119. :type dataframe: pd.DataFrame
  120. :param tag_columns: column names to be used as tags
  121. :type tag_columns: list
  122. :param betch_size:
  123. :type batch_size: int
  124. :param time_precision:
  125. :type tiime_precision: str
  126. """
  127. measurement_columns = [c for c in dataframe.columns
  128. if c not in (tag_columns or [])]
  129. for column in measurement_columns:
  130. try:
  131. self.client.write_points(
  132. dataframe=dataframe[[column] + (tag_columns or [])],
  133. measurement=column,
  134. tag_columns=tag_columns,
  135. protocol='line',
  136. batch_size=batch_size,
  137. time_precision=time_precision)
  138. except Exception as error:
  139. self._logger.log_and_raise_error(
  140. ('Could not insert data, Error: {}'.format(error)))
  141. if __name__ == "__main__":
  142. influx_handler = InfluxdbHandler()