Selaa lähdekoodia

resolve conflicts

tsteuer 5 vuotta sitten
vanhempi
commit
c495030ed5

+ 23 - 14
cdplib/FlattenData.py

@@ -20,23 +20,35 @@ class FlattenData():
     def __init__(self):
         self._log = Log("Flatten data")
     
-    def flatten(self, data):
+    def flatten(self, data) -> pd.DataFrame():
         '''
         :parm data: data given in either dictionary, list or dataframe format.
         '''
 
-        assert(isinstance(data, (list, dict, pd.DataFrame))),\
+        assert(isinstance(data, (list, dict, pd.DataFrame, pd.Series))),\
             "Parameter 'data' either be of List, Dictionary or DataFrame type"
-
+        in_length=0
         start = time.time()
         if type(data) is pd.DataFrame:
+            in_length = len(data.columns)
+            return_data = self.flatten_dataframe(data)
+        elif type(data) is pd.Series:
+            data = pd.DataFrame(data)
+            in_length = len(data.columns)
             return_data = self.flatten_dataframe(data)
-            self._log.info(('Data has been flattened, created {} columns in {} seconds').format(len(return_data.columns)- len(data.columns), time.time()-start))
-            return return_data
-        if type(data) is dict:
-            return self.flatten_dict(data)
-        if type(data) is list:
-            return self.flatten_list(data)
+        elif type(data) is dict:
+            in_length = len(data)
+            return_data = self.flatten_dict(data)
+        elif type(data) is list:
+            in_length = len(data)
+            return_data =  self.flatten_list(data)
+        else:
+            self._log.log_and_raise_warning(("Input data type '{}' is not supported").format(type(data)))
+            return None
+
+        result_dataframe = pd.DataFrame.from_dict(return_data, orient='index')
+        self._log.info(('Data has been flattened, created {} columns in {} seconds').format(len(result_dataframe.columns)- in_length, time.time()-start))
+        return result_dataframe
 
     def flatten_dataframe(self, dataframe: pd.DataFrame, incoming_key: str = None):
         '''
@@ -68,8 +80,7 @@ class FlattenData():
 
             result_dict[index] = copy.deepcopy(temp_result_dict)
 
-        result_dataframe = pd.DataFrame.from_dict(result_dict, orient='index')
-        return result_dataframe
+        return result_dict
 
     def flatten_dict(self, dictionary: dict, incoming_key: str = None):
         '''
@@ -122,9 +133,7 @@ class FlattenData():
             key = incoming_key
             if incoming_key is not None:
                 # OEBB SPECIFIC IF STATEMENT
-                if type(data_list[iteration]) is dict:
-                    if 'stationsnummer' in data_list[iteration].keys() and 'stage' in data_list[iteration].keys() :
-
+                if type(data_list[iteration]) is dict and 'stationsnummer' in data_list[iteration].keys() and 'stage' in data_list[iteration].keys() :
                         key = incoming_key + '_' + str(data_list[iteration]['stationsnummer']) + '_' + str(data_list[iteration]['stage'])
                 else:
                     key = incoming_key + '_' + str(iteration)

+ 20 - 11
cdplib/db_handlers/MongodbHandler.py

@@ -22,10 +22,10 @@ import numpy as np
 sys.path.append(os.getcwd())
 from cdplib.log import Log
 from cdplib.db_migration.ParseJsonSchema import ParseJsonSchema
+from cdplib.Singleton_Threadsafe import SingletonThreadsafe
 
-
-#class MongodbHandlerPool(metaclass=SingletonThreadsafe):
-class MongodbHandlerPool():
+class MongodbHandlerPool(metaclass=SingletonThreadsafe):
+#class MongodbHandlerPool():
     '''
     '''
 
@@ -84,6 +84,13 @@ class MongodbHandler:
 
         self._database_name = database_name
 
+    def __del__(self):
+        try:
+            self._client.close()
+        except Exception as e:
+            self._log.log_and_raise_error(('An error occured when trying to dispose the SQL engine. Error: {}').format(e))
+    
+
     def set_database(self, database_name: str):
         '''
         :param str database_name: Name of the database.
@@ -95,7 +102,6 @@ class MongodbHandler:
             self._log.info(('Database: {} didnt exist, it will be created for you once a collection is created in it').format(database_name))
         self._database = self._client[database_name]
 
-
     def drop_database(self):
         '''
         '''
@@ -134,7 +140,7 @@ class MongodbHandler:
             "Parameter 'direction' must be a string type"
             
         self._database[collection_name].create_index([(key, direction)], name=key)
-        #collection.create_index([('field_i_want_to_index', pymongo.TEXT)], name='search_index', default_language='english')
+        
 
     def set_collection_schema(self, collection_name: str, schema_path: str,
                               validation_level: str = 'moderate',validation_action: str = 'error'):
@@ -246,20 +252,23 @@ class MongodbHandler:
         self.set_collection_schema(collection_name=collection_name, schema_path=schema_path)
 
     def query_data_and_generate_dataframe(self, collection_name: str, attribute: str = None,
-                                          attribute_value: str = None, comparison_operator: str = '$eq', index = None):
+                                          attribute_value: str = None, comparison_operator: str = '$eq',
+                                          index = None, return_as_dataframe: bool = True, return_id: bool = False):
         '''
 
         '''
         try:
             if attribute == None or attribute_value == None:
-                data = self._database[collection_name].find()
+                data = self._database[collection_name].find({},{'_id': return_id})
             else:
-                data = self._database[collection_name].find({attribute: {comparison_operator: attribute_value}})
+                data = self._database[collection_name].find({attribute: {comparison_operator: attribute_value}}, {'_id': return_id})
 
         except Exception as error:
-            self._log.log_and_raise_error(('An error occured trying to query data from {}, with query {}: {}:{}. \nError:{}').format(collection_name, attribute_value, comparison_operator, attribute_value, error))
-
-        return self.convert_mongo_data_into_dataframe(data, index, collection_name)
+            self._log.log_and_raise_error(('An error occured trying to query data from {}, with query {}: {}:{}. \nError:{}').format(collection_name, attribute, comparison_operator, attribute_value, error))
+        if return_as_dataframe:
+            return self.convert_mongo_data_into_dataframe(data, index, collection_name)
+        else:
+            return data
 
     def aggregate_data_and_generate_dataframe(self, collection_name: str, aggregation_pipeline: list, index: str = None):
 

+ 16 - 4
cdplib/db_handlers/SQLHandler.py

@@ -19,11 +19,10 @@ from cdplib.log import Log
 from cdplib.Singleton_Threadsafe import SingletonThreadsafe
 
 class SQLHandlerPool(metaclass=SingletonThreadsafe):
-#class SQLHandlerPool():
     '''
     '''
 
-    def __init__(self, size: int = 1):
+    def __init__(self, size: int = 20):
         self._size = size
         self._log = Log(name='SQLHandlerPool')
         self._sql_handlers = [SQLHandler() for _ in range(size)]
@@ -34,9 +33,10 @@ class SQLHandlerPool(metaclass=SingletonThreadsafe):
             self._log.warning("Ran out of SQL handlers, 10 more have been added. Are you sure you've returned yours?")
         return self._sql_handlers.pop()
 
-    def release(self, mongodb_handler):
+    def release(self, sql_handler):
+        sql_handler._engine.dispose()
         if len(self._sql_handlers) < self._size:
-            self._sql_handlers.append(mongodb_handler)
+            self._sql_handlers.append(sql_handler)
 
 class SQLHandler:
     '''
@@ -104,6 +104,10 @@ class SQLHandler:
 
         self._engine = engine
 
+
+    def __del__(self):
+        self.dispose_engine()
+
     @property
     def _connection_params(self) -> dict:
         '''
@@ -501,6 +505,7 @@ class SQLHandler:
         :rtype: DataFrame
         '''
         try:
+            
             connection = self._engine.connect()
 
             data = pd.read_sql(sql=query,
@@ -508,6 +513,7 @@ class SQLHandler:
                                **read_sql_kwargs)
 
             connection.close()
+           
             return data
 
         except Exception as e:
@@ -628,3 +634,9 @@ class SQLHandler:
                             diagram_path,
                             schema=schema,
                             include_tables=include_tables)
+
+    def dispose_engine(self):
+        try:
+            self._engine.dispose()
+        except Exception as e:
+            self._log.log_and_raise_error(('An error occured when trying to dispose the SQL engine. Error: {}').format(e))

+ 34 - 26
cdplib/db_migration/DataFrameToCollection.py

@@ -10,8 +10,10 @@ Created on Mon Jul 22 11:05:47 2019
 """
 
 import pandas as pd
+import numpy as np
 import os
 import sys
+import time
 
 sys.path.append(os.getcwd())
 
@@ -67,6 +69,7 @@ class DataFrameToCollection():
          grp_fields and reshape it accordingly, the result is a pandas Series.
          In the end all the series are collected and concatenated.
         '''
+
         from copy import deepcopy
 
         data = self._melt_duplicated_columns(data)
@@ -77,7 +80,7 @@ class DataFrameToCollection():
             schema = self.schema
 
         for field in schema["properties"]:
-            
+
             if field not in self._unroll_nested_names(data.columns):
                 continue
 
@@ -90,8 +93,8 @@ class DataFrameToCollection():
 
                 # check that there is only one possible value of this field
                 n_distinct_values = data.groupby(grp_fields, sort=False)[field].nunique().max()
-                
-                #n_distinct_valus can be 0 if the column only contains NaN values
+
+                # n_distinct_valus can be 0 if the column only contains NaN values
                 if n_distinct_values > 1:
                     err = "Field {0} is not unique with respect to {1}"\
                           .format(field, grp_fields)
@@ -112,33 +115,34 @@ class DataFrameToCollection():
             elif field_type == "object":
 
                 sub_schema = deepcopy(schema["properties"][field])
-            
+
                 # rename sub-schema properties to match with data column names
                 sub_schema["properties"] =\
                     {".".join([field, k]): v for k, v
                      in sub_schema["properties"].items()}
-                
+
                 sub_data = self.to_list_of_documents(
                             data=data,
                             schema=sub_schema,
                             grp_fields=grp_fields,
                             _final_step=False)
-                
+
                 # Need to be checked since child elements can be empty
                 if sub_data is not None:
+
                     reshaped_field = sub_data.apply(self._make_dict, axis=1)
                     reshaped_field.name = field
-    
+
                     reshaped_fields.append(reshaped_field)
 
             # if field is a list of dictionaries
             elif field_type == "array":
-             
+
 
                 items_type = schema["properties"][field]["items"]["bsonType"]
 
                 if items_type == "object":
-
+                    array_object = time.time()
                     sub_schema = deepcopy(schema["properties"][field]["items"])
 
                     # rename sub-schema properties to match data column names
@@ -158,7 +162,7 @@ class DataFrameToCollection():
 
                         self._log.error(err)
                         raise Exception(err)
-                        
+
                     # group and reshape sub-fields with complex types
                     sub_data = self.to_list_of_documents(
                                 data=data,
@@ -173,16 +177,17 @@ class DataFrameToCollection():
 
                         sub_data.name = field
                         sub_data = sub_data.reset_index(grp_fields)
-
+                        ######################################################
+                        ######## OPTIMIZATIONS MAY BE POSSIBLE HERE ##########
                         reshaped_field =\
                             sub_data.groupby(grp_fields, sort=False)[field]\
                                     .apply(self._make_list_of_distinct)
-
+                        ######################################################
                         reshaped_fields.append(reshaped_field)
 
+
                 # if field is a list of values with simple type
                 elif items_type == "array":
-
                     grp_fields = [c for c in grp_fields if c in data.columns]
 
                     if field in data.columns:
@@ -191,7 +196,6 @@ class DataFrameToCollection():
                                              .apply(self._make_list_of_distinct)
 
                         reshaped_fields.append(reshaped_field)
-
                 else:
 
                     grp_fields = [c for c in grp_fields if c in data.columns]
@@ -268,10 +272,22 @@ class DataFrameToCollection():
          entries are arbitrary objects
          (pandas unique() method does not work if entries are of complex types)
         '''
-        uniques = pd.DataFrame({"temp": x.tolist()})\
-                    .assign(temp_str=lambda y: y["temp"].astype(str))\
-                    .drop_duplicates(subset=["temp_str"])\
-                    .drop("temp_str", axis=1).iloc[:, 0].tolist()
+
+
+        if x.size == 1:
+            uniques = x.tolist()
+            '''
+            if return_value == [{}]:
+                return []
+            return return_value
+            '''
+        else:
+
+            uniques = pd.DataFrame({"temp": x.values})\
+                        .assign(temp_str=lambda y: y["temp"].astype(np.str))\
+                        .drop_duplicates(subset=["temp_str"])\
+                        .drop("temp_str", axis=1).iloc[:, 0].tolist()
+
 
         def is_empty(y):
             is_empty_dict = (isinstance(y, dict) and (len(y) == 0))
@@ -397,11 +413,3 @@ if __name__ == "__main__":
                     data=df,
                     schema=schm,
                     grp_fields=grp_fields)
-
-    
-    
-    
-    
-    
-    
-    

+ 3 - 6
cdplib/db_migration/MigrationCleaning.py

@@ -58,8 +58,8 @@ class MigrationCleaning:
         self._mapping_path = mapping_path
         self._schema_paths = schema_paths
 
-        from cdplib.db_handlers.SQLHandler import SQLHandlerPool
-        self._sql_db = SQLHandlerPool(20)
+        from cdplib.db_handlers.SQLHandler import SQLHandler
+        self._sql_db = SQLHandler()
 
     def _assert_dataframe_input(self, data: pd.DataFrame):
         '''
@@ -221,13 +221,10 @@ class MigrationCleaning:
 
         data = data.copy(deep=True)
 
-        #db = self._sql_db.aquire()
-        from cdplib.db_handlers.SQLHandler import SQLHandler
-        db = SQLHandler()
+        db = self._sql_db
 
         if invalid_mask.sum() == 0:
 
-            #self._sql_db.release(db)
             return data
 
         data_inconsist = data.assign(reason=reason)\

+ 4 - 3
cdplib/unit_tests/TestFlattenData.py

@@ -2,6 +2,7 @@ import unittest
 import sys
 import os
 import pandas as pd
+from pprint import pprint
 sys.path.append(os.getcwd())
 from cdplib.log import Log
 from cdplib.FlattenData import FlattenData
@@ -34,10 +35,10 @@ class TestMongodbHandler(unittest.TestCase):
 
         flattened_dict = self.flattener.flatten(nested_data_dict)
         flattened_list = self.flattener.flatten(nested_data_list)
-        flattened_df = self.flattener.flatten(nested_data_df)
+        flattened_df = self.flattener.flatten(nested_data_df['two_levels']) 
 
-        self.assertEqual(nested_data_dict["two_levels"]["one_level"], flattened_dict['two_levels_one_level'])
-        self.assertEqual(nested_data_dict["two_levels"]["one_level"], flattened_list['0_two_levels_one_level'])
+        self.assertEqual(nested_data_dict["two_levels"]["one_level"], flattened_dict.loc['two_levels_one_level', 0])
+        self.assertEqual(nested_data_dict["two_levels"]["one_level"], flattened_list.loc['0_two_levels_one_level', 0])
         self.assertEqual(nested_data_dict["two_levels"]["one_level"], flattened_df.loc[0 , 'two_levels_one_level'])
         
 if __name__ == '__main__':

+ 1 - 1
cdplib/unit_tests/TestMongodbHandler.py

@@ -81,7 +81,7 @@ class TestMongodbHandler(unittest.TestCase):
         Fetch data and confirms thats it is the same as was entered into the database
         Do the same with more specific query
         '''
-        self.assertEqual(self.mongodb_handler.query_data_and_generate_dataframe(self.first_collection_name).to_dict()['test_value_double'][0], self.valid_input['test_value_double'])
+        self.assertEqual(self.mongodb_handler.query_data_and_generate_dataframe(self.first_collection_name).to_dict()['test_value_double'][0], self.valid_input['test_value_double'])
         self.assertEqual(self.mongodb_handler.query_data_and_generate_dataframe(self.first_collection_name, 'test_value_string', 'test_value').to_dict()['test_value_double'][0], self.valid_input['test_value_double'])
     
     def test_F_aggregate_data_and_generate_dataframe(self):