tanja 3 years ago
parent
commit
7c3e6b725b
9 changed files with 315 additions and 0 deletions
  1. .gitignore (+0 -0)
  2. cdplib/db_handlers/InfluxdbHandler.py (+23 -0)
  3. classes.png (+0 -0)
  4. hooks/README.txt (+0 -0)
  5. hooks/pre-commit (+0 -0)
  6. packages.png (+0 -0)
  7. setup.py (+0 -0)
  8. tests/testSQLOperations.py (+115 -0)
  9. tests/testStatisticalFeatures.py (+177 -0)

+ 0 - 0
.gitignore


+ 23 - 0
cdplib/db_handlers/InfluxdbHandler.py

@@ -123,3 +123,26 @@ class InfluxdbHandler:
                 '\' tz(\'Europe/Berlin\');'
 
         return self.query_to_dataframe(query)
+
+    def insert_dataframe(self, dataframe: pd.DataFrame,
+                         batch_size: int = 10000,
+                         time_precision: str = 'u'):
+        """
+        :param dataframe: DESCRIPTION
+        :type dataframe: pd.DataFrame
+        :return: DESCRIPTION
+        :rtype: TYPE
+
+        """
+        for column in dataframe.columns:
+            try:
+                self.client.write_points(
+                    dataframe=dataframe[column].to_frame(),
+                    measurement=column,
+                    protocol='line',
+                    batch_size=batch_size,
+                    time_precision=time_precision)
+
+            except Exception as error:
+                self._logger.log_and_raise_error(
+                    'Could not insert data. Error: {}'.format(error))
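
For reference, a minimal usage sketch of the new insert_dataframe method (the handler construction and the column names below are illustrative assumptions, not part of this commit):

    # Hypothetical usage sketch; constructor arguments and data are assumed.
    import pandas as pd
    from cdplib.db_handlers.InfluxdbHandler import InfluxdbHandler

    handler = InfluxdbHandler()  # assumed default construction

    # One measurement is written per column; the DatetimeIndex supplies the timestamps.
    data = pd.DataFrame(
        {'temperature': [20.1, 20.4], 'pressure': [1.01, 1.02]},
        index=pd.to_datetime(['2021-01-01 00:00', '2021-01-01 00:01']))

    handler.insert_dataframe(data, batch_size=5000, time_precision='u')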

+ 0 - 0
classes.png


+ 0 - 0
hooks/README.txt


+ 0 - 0
hooks/pre-commit


+ 0 - 0
packages.png


+ 0 - 0
setup.py


+ 115 - 0
tests/testSQLOperations.py

@@ -0,0 +1,115 @@
+#!/usr/bin/env python3
+# -*- coding: utf-8 -*-
+"""
+Created on Wed Sep 19 14:34:22 2018
+
+@author: tanya
+"""
+
+import os
+import unittest
+import pandas as pd
+
+from libraries.database_operations_library import SQLOperations
+
+
+class TestSQLOperations(unittest.TestCase):
+    '''
+    '''
+    def __init__(self, test_df = None):
+        print('\n', '='*5, 'Testing class : SQLOperations', '='*5)
+        self.inst = SQLOperations(db_url = None)
+        print('Connected to', self.inst.db_url)
+        
+        if test_df is None:
+            self.test_df = pd.DataFrame({'a' : [1,2,3,4,5], 'b' : ['A', 'B', 'C', 'A', 'V'], 'c' : [0.1, 0.2, 0.3, 0.4, 0.5]})
+        else:
+            self.test_df = test_df
+
+        
+    def _create_test_table(self, test_tablename, create_table_query = None):
+        '''
+        '''
+        self.inst.drop_table_if_exists(test_tablename)        
+        
+        if create_table_query is None:
+            if 'ibm_db' in self.inst.db_url:
+                create_table_query = """CREATE TABLE {} (
+                                        a INT,
+                                        b CHAR,
+                                        c DECIMAL(10 , 2 )
+                                        );""".format(test_tablename)
+            else:
+                create_table_query = """CREATE TABLE test (
+                                        a INT,
+                                        b TEXT,
+                                        c DECIMAL(10 , 2 )
+                                        );"""
+                
+        self.inst.execute(create_table_query)
+
+
+        
+class TestExecute(TestSQLOperations):
+    '''
+    '''
+    def __init__(self):
+        super(TestExecute, self).__init__()
+        print('\n', '-'*2, 'Testing method : execute')
+        
+    def test_create_table(self, test_tablename, create_table_query = None):
+        '''
+        '''
+        print('-'*4, 'Testing create table operation')
+        self._create_test_table(test_tablename = test_tablename, create_table_query = create_table_query)
+        self.assertTrue(self.inst.check_if_table_exists(test_tablename))
+        self.inst.drop_table_if_exists(test_tablename)
+        print('-'*4, 'Test ran successfully!')
+        
+class TestLoad_csv_to_db(TestSQLOperations):
+    '''
+    '''
+    def __init__(self):
+        super(TestLoad_csv_to_db, self).__init__()
+        print('\n', '-'*2, 'Testing method : load_csv_to_db')
+    
+    def test_correct_content(self, test_csv_path, test_tablename, create_table_query = None):
+        '''
+        '''
+        print('-'*4, 'Testing that the load operation gives the correct result')
+        os.makedirs(os.path.dirname(test_csv_path), exist_ok = True)
+        if not self.inst.drop_table_if_exists(test_tablename):
+            self._create_test_table(test_tablename)
+    
+        self.test_df.to_csv(test_csv_path, index = False)
+        self.inst.load_csv_to_db(csv_path = test_csv_path, tablename = test_tablename)
+        try:
+            connection = self.inst.engine.connect()
+            test_df_from_sql = pd.read_sql(sql = "SELECT * FROM {}".format(test_tablename), con = connection)
+            connection.close()
+        except Exception as e:
+            raise Exception('ERROR: the test csv file has not been loaded to SQL at all, exiting with {}'.format(e))
+        
+        print('-'*4, 'Testing data has correct shape')
+        self.assertTupleEqual(self.test_df.shape, test_df_from_sql.shape)
+        
+        print('-'*4,'Testing data has correct columns')
+        self.assertSetEqual(set(self.test_df.columns), set(test_df_from_sql.columns))
+        
+        print('-'*4,'Testing data has correct content')
+        for col in self.test_df.columns:
+            test_df_from_sql[col] = test_df_from_sql[col].astype(self.test_df[col].dtype)
+        pd.testing.assert_frame_equal(self.test_df, test_df_from_sql)
+        
+        print('-'*4, 'Test ran successfully!')
+
+                        
+if __name__ == '__main__':
+    
+    test_tablename = 'test10'
+    test_csv_path = '/home/tanya/acdp/data_samples/test.csv'
+    
+    TestExecute().test_create_table(test_tablename = test_tablename)
+    TestLoad_csv_to_db().test_correct_content(test_csv_path = test_csv_path, test_tablename = test_tablename)
+    
+    print('Done!', '\n')
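
The test above exercises the SQLOperations interface roughly as follows (a condensed sketch; the method semantics are inferred from the test code, not from the library's documentation):

    # Condensed sketch of the workflow exercised by testSQLOperations.py;
    # the path and the table name are illustrative.
    import pandas as pd
    from libraries.database_operations_library import SQLOperations

    ops = SQLOperations(db_url=None)      # None falls back to the default connection URL
    ops.drop_table_if_exists('test10')    # start from a clean state
    ops.execute('CREATE TABLE test10 (a INT, b TEXT, c DECIMAL(10, 2));')

    # Write a small csv and load it into the table.
    pd.DataFrame({'a': [1], 'b': ['A'], 'c': [0.1]}).to_csv('/tmp/test.csv', index=False)
    ops.load_csv_to_db(csv_path='/tmp/test.csv', tablename='test10')

    # Read the data back for comparison.
    connection = ops.engine.connect()
    loaded = pd.read_sql(sql='SELECT * FROM test10', con=connection)
    connection.close()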

+ 177 - 0
tests/testStatisticalFeatures.py

@@ -0,0 +1,177 @@
+#!/usr/bin/env python3
+# -*- coding: utf-8 -*-
+"""
+Created on Thu Oct 18 16:26:47 2018
+
+@author: tanya
+"""
+
+import os
+import unittest
+import logging
+import pandas as pd
+import numpy as np
+
+from pandas.util.testing import assert_frame_equal
+
+from libraries.feature_engineering.in_memory_feature_engineering.StatisticalFeatures import StatisticalFeatures
+from libraries.logging.logging_utils import configure_logging
+
+
+class TestStatisticalFeatures(unittest.TestCase):
+    '''
+    '''
+    def __init__(self, data = None, index_cols = None, path_to_log = None):
+        '''
+        '''        
+        if index_cols is None:
+            self.index_cols = ['id1', 'id2']
+        else:
+            self.index_cols = index_cols
+        
+        if data is None:
+            self.data = pd.DataFrame({'int' : [1,2,3,2,55,3,7],
+                                                     'float' : [0.1, 7, 0.1, 99.9, 99.9, np.nan, 7],
+                                                     'str' : ['a', np.nan, 'c', 'a', 'a', '', 'c'],
+                                                     'datetime' : [pd.datetime(2017, 1, 2), np.nan, pd.datetime(2017, 5, 3), pd.datetime(2017, 1, 4),
+                                                                   '2018-01-19', pd.datetime(2018, 1, 4), pd.datetime(2019, 3, 23)],              
+                                                     'nan' : [np.nan]*7,
+                                                     'id1' : [1,1,3,3,3,1,1],
+                                                     'id2' : ['a', 'a', 'b', 'b', 'a', 'a', np.nan]})\
+                                                     .sort_values(by = self.index_cols)
+        else:
+            self.data = data
+            
+        
+        self.obj = StatisticalFeatures(data = self.data, index_cols = self.index_cols, path_to_log = path_to_log)
+            
+class TestKpisByAggregation(TestStatisticalFeatures):
+    '''
+    '''
+    def __init__(self, data = None, index_cols = None, path_to_log = None):
+        '''
+        '''
+        super(TestKpisByAggregation, self).__init__(data = data, index_cols = index_cols, path_to_log = path_to_log)
+    
+    def test_builtin_aggfuncs_numeric_cols(self, answer = None, kpis = None):
+        '''Tests the expected behaviour of pandas builtin aggregation function,
+           in particular behaviour with missing values
+           
+           :param DataFrame data:
+           :param list index_cols:
+           :param DataFrame answer:
+           :param list of tuples or dict kpis:    
+        '''
+        kpis = kpis or [('int', ['min', 'std']),
+                        ('float', ['mean', np.sum]),
+                        ('float', 'sum'),
+                        ('nan', 'mean')]
+            
+        
+        answer = answer or pd.DataFrame([
+                            {'id1' : 1, 'id2' : 'a', 'int_min' : 1, 'int_std' : pd.Series([1,2,3]).std(), 'float_mean' : np.mean([0.1, 7.0]), 'float_sum' : 7.1, 'nan_mean' : np.nan},
+                            {'id1' : 3, 'id2' : 'b', 'int_min' : 2, 'int_std' : pd.Series([2,3]).std(), 'float_mean' : np.mean([0.1, 99.9]), 'float_sum' : 100, 'nan_mean' : np.nan},
+                            {'id1' : 3, 'id2' : 'a', 'int_min' : 55, 'int_std' : np.nan, 'float_mean' : 99.9, 'float_sum' : 99.9, 'nan_mean' : np.nan},
+                            ]).sort_values(self.index_cols).set_index(self.index_cols)
+            
+        result = self.obj.get_kpis_by_aggregation(kpis = kpis).sort_values(self.index_cols).set_index(self.index_cols)
+        
+        assert_frame_equal(result, answer[result.columns])
+        
+        
+    def test_dict_kpi(self, kpis = None, answer = None):
+        '''
+        '''
+        kpis = kpis or {'int' : ['min', 'std'], 'float' : 'mean'}
+            
+        answer = answer or pd.DataFrame([
+                            {'id1' : 1, 'id2' : 'a', 'int_min' : 1, 'int_std' : pd.Series([1,2,3]).std(), 'float_mean' : np.mean([0.1, 7.0])},
+                            {'id1' : 3, 'id2' : 'b', 'int_min' : 2, 'int_std' : pd.Series([2,3]).std(), 'float_mean' : np.mean([0.1, 99.9])},
+                            {'id1' : 3, 'id2' : 'a', 'int_min' : 55, 'int_std' : np.nan, 'float_mean' : 99.9},
+                            ]).sort_values(self.index_cols).set_index(self.index_cols)
+                
+        result = self.obj.get_kpis_by_aggregation(kpis = kpis).sort_values(self.index_cols).set_index(self.index_cols)
+        
+        assert_frame_equal(result, answer[result.columns])
+        
+        
+    def test_string_cols(self, kpis = None, answer = None):
+        '''
+        '''
+        kpis = kpis or {'str' : ['sum']}
+            
+        answer = answer or pd.DataFrame([
+                            {'id1' : 1, 'id2' : 'a', 'str_sum' : 'anan'},
+                            {'id1' : 3, 'id2' : 'b', 'str_sum' : 'ca'},
+                            {'id1' : 3, 'id2' : 'a', 'str_sum' : 'a'},
+                            ]).sort_values(self.index_cols).set_index(self.index_cols)
+                
+        result = self.obj.get_kpis_by_aggregation(kpis = kpis).sort_values(self.index_cols).set_index(self.index_cols)
+        
+        assert_frame_equal(result, answer[result.columns])
+        
+        
+    def test_custom_aggfunc(self, kpis = None, answer = None):
+        '''
+        '''
+        
+        if kpis is None:
+            def custom_sum(x):
+                return np.sum(x)
+            
+            kpis = {'int' : custom_sum}
+        
+        answer = answer or pd.DataFrame([
+                           {'id1' : 1, 'id2' : 'a', 'int_custom_sum' : 6},
+                           {'id1' : 3, 'id2' : 'b', 'int_custom_sum' : 55},
+                           {'id1' : 3, 'id2' : 'a', 'int_custom_sum' : 5},
+                           ]).sort_values(self.index_cols).set_index(self.index_cols)
+                
+        result = self.obj.get_kpis_by_aggregation(kpis = kpis).sort_values(self.index_cols).set_index(self.index_cols)
+        
+        assert_frame_equal(result, answer[result.columns])
+            
+        
+    def test_some_wrong_col(self, kpis = None, answer = None):
+        '''
+        '''
+        kpis = kpis or {'bla' : 'sum', 'int' : 'sum'}
+            
+        answer = answer or pd.DataFrame([
+                   {'id1' : 1, 'id2' : 'a', 'int_sum' : 6},
+                   {'id1' : 3, 'id2' : 'a', 'int_sum' : 55},
+                   {'id1' : 3, 'id2' : 'b', 'int_sum' : 5},
+                   ]).sort_values(self.index_cols).set_index(self.index_cols)
+                
+        result = self.obj.get_kpis_by_aggregation(kpis = kpis).sort_values(self.index_cols).set_index(self.index_cols)
+                
+        assert_frame_equal(result, answer[result.columns])
+        
+    def test_all_wrong_cols(self, kpis = None, answer = None):
+        '''
+        '''
+        kpis = kpis or {'bla' : 'sum', 'blub' : 'sum'}
+            
+        result = self.obj.get_kpis_by_aggregation(kpis = kpis)
+            
+        answer = self.data[self.index_cols].drop_duplicates().reset_index(drop = True)
+                
+        assert_frame_equal(result, answer[result.columns])
+        
+if __name__ == '__main__':
+    
+    path_to_log = os.path.join(os.environ.get('PROJECT_DIR'),
+                               'tests', 'test_feature_engineering','test_in_memory_feature_engineering',
+                               'test_kpis_by_aggregation.log')
+    
+    configure_logging(path_to_log)
+    logger = logging.getLogger(__name__)
+
+    inst = TestKpisByAggregation(path_to_log = path_to_log)
+    inst.test_builtin_aggfuncs_numeric_cols()
+    inst.test_dict_kpi()
+    inst.test_string_cols()
+    inst.test_some_wrong_col()
+    inst.test_all_wrong_cols()
+        
+    logger.info('Done testing method get_kpis_by_aggregation!')
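
As the tests above show, get_kpis_by_aggregation accepts the kpis argument either as a list of (column, aggregation) tuples or as a dict mapping columns to aggregations. A minimal sketch of both call styles, reusing the toy data from the tests (the library's behaviour is inferred from the test expectations, not documented here):

    # Both kpi formats exercised by TestKpisByAggregation.
    import numpy as np
    import pandas as pd
    from libraries.feature_engineering.in_memory_feature_engineering.StatisticalFeatures import StatisticalFeatures

    data = pd.DataFrame({'int': [1, 2, 3, 2, 55, 3, 7],
                         'float': [0.1, 7, 0.1, 99.9, 99.9, np.nan, 7],
                         'id1': [1, 1, 3, 3, 3, 1, 1],
                         'id2': ['a', 'a', 'b', 'b', 'a', 'a', np.nan]})

    # path_to_log=None is assumed to disable file logging.
    features = StatisticalFeatures(data=data, index_cols=['id1', 'id2'], path_to_log=None)

    # list-of-tuples form: (column, aggregation or list of aggregations)
    print(features.get_kpis_by_aggregation(kpis=[('int', ['min', 'std']), ('float', 'sum')]))

    # dict form: column -> aggregation or list of aggregations
    print(features.get_kpis_by_aggregation(kpis={'int': ['min', 'std'], 'float': 'mean'}))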