Explorar o código

Merge branch 'master' of https://intra.acdp.at/gogs/tanja/cdplib

ogert %!s(int64=3) %!d(string=hai) anos
pai
achega
623f748855
Modificáronse 2 ficheiros con 218 adicións e 18 borrados
  1. 164 0
      cdplib/db_handlers/InfluxdbHandler.py
  2. 54 18
      cdplib/db_handlers/MongodbHandler.py

+ 164 - 0
cdplib/db_handlers/InfluxdbHandler.py

@@ -0,0 +1,164 @@
+#!/usr/bin/env python3
+# -*- coding: utf-8 -*-
+"""
+Created on Tue Feb 23 19:44:22 2021
+
+@author: tanya
+"""
+
+from cdplib.log import Log
+
+import pandas as pd
+
+from influxdb import DataFrameClient
+
+from datetime import datetime
+
+
+class InfluxdbHandler:
+    """
+    """
+    def __init__(self, database_url: str = None):
+        """
+        :param database_url: DESCRIPTION
+        :type database_url: str
+        :return: DESCRIPTION
+        :rtype: TYPE
+        """
+        self._logger = Log("InfluxdbHandler:")
+
+        if database_url is None:
+            database_url = self._read_url_from_env()
+
+        self.client = DataFrameClient.from_dsn(database_url)
+
+    def _read_url_from_env(self) -> str:
+        """
+        :return: parse database url from the configuration object.
+         configuration object is create by the script
+         /libraries.configuration.py and uses a configuration file
+         (by default .env)
+        :rtype: str
+
+        """
+        try:
+            from libraries.configuration import default as cfg
+
+            assert(cfg["INFLUX"] is not None),\
+                "configuration file must contain [INFLUX]"
+
+            assert(set(["INFLUX_HOST", "INFLUX_PORT", "INFLUX_DATABASE_NAME"])
+                   <= set(cfg["INFLUX"])),\
+                ("configuration file must contain influx host, ",
+                 " port, and database name")
+
+            database_url = "influxdb://"
+
+            if "INFLUX_USER" in cfg["INFLUX"]:
+                database_url += cfg["INFLUX"]["INFLUX_USER"]
+
+            if "INFLUX_PASSWORD" in cfg["INFLUX"]:
+                database_url += ":" + cfg["INFLUX"]["INFLUX_PASSWORD"]
+
+            database_url += "@{0}:{1}/{2}".format(
+                cfg["INFLUX"]["INFLUX_HOST"],
+                cfg["INFLUX"]["INFLUX_PORT"],
+                cfg["INFLUX"]["INFLUX_DATABASE_NAME"])
+
+            return database_url
+
+        except Exception as e:
+            self._logger.log_and_raise_error(
+                ("Could not parse url from configuration file. "
+                 "Exit with error {}".format(e)))
+
+    def query_to_dataframe(self, query: str) -> pd.DataFrame:
+        """
+        :param query: DESCRIPTION
+        :type query: str
+        :return: DESCRIPTION
+        :rtype: TYPE
+        """
+        try:
+            # result of the query is a defaultdict
+            result = self.client.query(query)
+
+            return list(result.values())[0]
+        except Exception as e:
+            self._logger.log_and_raise_error(
+                ("Could not query to dataframe. "
+                 "Exit with error {}".format(e)))
+
+    def query_between_dates(self, columns: str,
+                            tables: str,
+                            start: str,
+                            stop: str) -> pd.DataFrame:
+        """
+        :param columns: DESCRIPTION
+        :type columns: str
+        :param tables: DESCRIPTION
+        :type tables: str
+        :param start: DESCRIPTION
+        :type start: str
+        :param stop: DESCRIPTION
+        :type stop: str
+        :return: DESCRIPTION
+        :rtype: TYPE
+
+        """
+        if not isinstance(start, str):
+            start = datetime.strftime(start, format="%Y-%m-%dT%H:%M:%SZ")
+
+        if not isinstance(stop, str):
+            stop = datetime.strftime(stop, format="%Y-%m-%dT%H:%M:%SZ")
+
+        query = 'SELECT ' +\
+                columns +\
+                ' FROM \"' +\
+                tables +\
+                '\" WHERE time > \'' +\
+                str(start) +\
+                '\' AND time  < \'' +\
+                str(stop) +\
+                '\' tz(\'Europe/Berlin\');'
+
+        return self.query_to_dataframe(query)
+
+    def insert_dataframe(self, dataframe: pd.DataFrame,
+                         tag_columns: list[str] = None,
+                         batch_size: int = 10000,
+                         time_precision: str = 'u'):
+        """
+        Writes each column of the dataframe which is not 
+        in tag_columns as a separate measurement to the database.
+        
+        Tag columns are put as tags to each measurement.
+        
+        The dataframe has to have a datatime index!
+        
+        :param dataframe: dataframe to write to the database
+        :type dataframe: pd.DataFrame
+        :param tag_columns: column names to be used as tags
+        :type tag_columns: list
+        :param betch_size:
+        :type batch_size: int
+        :param time_precision:
+        :type tiime_precision: str
+        """
+        
+        measurement_columns = [c for c in dataframe.columns
+                               if c not in tag_columns]
+        
+        for column in measurement_columns:
+            try:
+                self.client.write_points(
+                    dataframe=dataframe[[column] + tag_columns],
+                    measurement=column,
+                    tag_columns=tag_columns,
+                    protocol='line',
+                    batch_size=batch_size,
+                    time_precision=time_precision)
+
+            except Exception as error:
+                self._logger.log_and_raise_error(
+                    ('Could not insert data, Error: {}'.format(error)))

+ 54 - 18
cdplib/db_handlers/MongodbHandler.py

@@ -278,7 +278,7 @@ class MongodbHandler:
                     query = find_query
 
                 data = self._database[collection_name].find(query,return_values)
-                
+
             else:
                 query = {attribute: {comparison_operator: attribute_value}}
                 data = self._database[collection_name].find(query, return_values)
@@ -293,6 +293,33 @@ class MongodbHandler:
             else:
                 return data
 
+    def aggregate_and_insert_into_collection(self,
+                                         input_collection_name: str,
+                                         output_collection_name: str,
+                                         aggregation_pipeline: list = None):
+        """
+        """
+        if aggregation_pipeline is None:
+            aggregation_pipeline = [{"$out": output_collection_name}]
+        else:
+            aggregation_pipeline.append({"$out": output_collection_name})
+
+        self.aggregate_data_and_generate_dataframe(
+                collection_name=input_collection_name,
+                aggregation_pipeline=aggregation_pipeline)
+
+    def index_collection(self, collection_name: str, keys: list):
+        """
+        :param keys: compound indexes for the collection,
+         is either a list of tuples of shape (field_name, 1) or (field_name, -1)
+         for the indexing order, or a tuple of field namse, then the second element of the
+         tuple is set to 1
+        """
+        keys = [(key, 1) if not isinstance(key, tuple) else key for key in keys]
+
+        self._database[collection_name].create_index(keys)
+
+
     def aggregate_data_and_generate_dataframe(self, collection_name: str, aggregation_pipeline: list, index: str = None, return_as_dataframe=True):
 
         try:
@@ -300,18 +327,18 @@ class MongodbHandler:
         except Exception as error:
             self._log.log_and_raise_error(('A problem occured when aggregating the collection {} with the pipeline {}. \nError: {}').format(collection_name, aggregation_pipeline, error))
             return None
-       
+
         if return_as_dataframe:
             return self.convert_mongo_data_into_dataframe(data, index, collection_name)
         else:
             return data
-      
+
     def convert_mongo_data_into_dataframe(self, data, index: str = None, collection_name: str = None, chunksize: int = 500) -> pd.DataFrame():
 
         start_time = time.time()
         '''
         self._log.info('Converting returned mongo data into a DataFrame')
-        
+
         data = list(data)
         try:
             if len(data)> 0:
@@ -330,7 +357,7 @@ class MongodbHandler:
         except Exception as error:
             self._log.log_and_raise_error(('An error occured trying to convert mongo data into pd.Dataframe. \nError: {} ').format(error))
         '''
-    
+
         frames = []
         records = []
         for iteration, value in enumerate(data):
@@ -356,11 +383,11 @@ class MongodbHandler:
             return_df.set_index(index, inplace=True)
 
         self._log.info(('{} Rows were fetched from {}. DataFrame conversion is done, took {} seconds').format(len(return_df.index), collection_name if collection_name is not None else 'the database', time.time()-start_time))
-        
+
         return return_df
 
- 
-        
+
+
 
     #def update_data_in_collection(self, query_label: str, query_value: str, update_label:str, update_value: str, collection_name:str):
     #    self._database[collection_name].update_one({query_label:query_value}, {"$set": {update_label: update_value}})
@@ -382,7 +409,7 @@ class MongodbHandler:
         '''
         if type(data) == list:
             self._database[collection_name].update_one({query_label:query_value}, {"$push": {update_label: {"$each": data}}})
-            self._log.info(('A document has been pushed into the {} array in the {} collection').format(query_value, collection_name))   
+            self._log.info(('A document has been pushed into the {} array in the {} collection').format(query_value, collection_name))
         else:
             self._database[collection_name].update_one({query_label:query_value}, {"$push": {update_label: data}})
             self._log.info(('A document has been pushed into the {} array in the {} collection').format(query_value, collection_name))
@@ -410,7 +437,7 @@ class MongodbHandler:
         '''
         assert(isinstance(collection_name, str)),\
             "Parameter 'collection_name' must be a string type"
-       
+
         if return_values is None:
             return_values = {'_id': return_id}
 
@@ -422,7 +449,7 @@ class MongodbHandler:
                     query = find_query
                 else:
                     query = {date_label: {'$gt': from_date_value, '$lt': to_date_value}}
-                
+
                 data = self._database[collection_name].find(query, return_values)
 
             except Exception as error:
@@ -492,8 +519,8 @@ class MongodbHandler:
         if find_query is None:
             if query_label and query_value:
                 find_query = {query_label:query_value}
-                
-                                             
+
+
         try:
             if update_many:
                 if find_query is not None:
@@ -528,17 +555,26 @@ class MongodbHandler:
             return [value[query_label] for value in data]
         else:
             return []
-        
+
     def get_distinct_value_of_key(self, collection_name: str, key: str):
-        
+
         assert(isinstance(collection_name, str)),\
             "Parameter 'collection_name' must be a string type"
-        
+
         assert(isinstance(key, str)),\
             "Parameter 'key' must be a string type"
-        
+
         data = self._database[collection_name].distinct(key)
-        
+
+        return data
+
+    def get_number_of_entries_in_collection(self, collection_name: str):
+
+        assert(isinstance(collection_name, str)),\
+            "Parameter 'collection_name' must be a string type"
+
+        data = self._database[collection_name].count()
+
         return data