Browse files

added latest code version

tanja 5 years ago
parent
commit
ccc71e2181

+ 15 - 11
cdplib/FlattenData.py

@@ -13,7 +13,7 @@ import time
 import pandas as pd
 import copy
 sys.path.append(os.getcwd())
-from cdplib.log import Log
+from libraries.log import Log
 log = Log("Flatten data")
 
 class FlattenData():
@@ -46,8 +46,9 @@ class FlattenData():
         '''
         assert(isinstance(dataframe, pd.DataFrame)),\
             "Parameter 'dataframe' be of DataFrame type"
-        assert(isinstance(incoming_key, str)),\
-            "Parameter 'incoming_key' be of String type"
+        if incoming_key is not None:
+            assert(isinstance(incoming_key, str)),\
+                "Parameter 'incoming_key' be of String type"
 
         result_dict = {}
         for index, row in dataframe.iterrows():
@@ -64,7 +65,7 @@ class FlattenData():
                     temp_result_dict[key] = value
 
                 if len(temp_result) > 0:
-                    result_dict = self.append_to_dict(result_dict, temp_result)
+                    temp_result_dict = self.append_to_dict(temp_result_dict, temp_result)
 
             result_dict[index] = copy.deepcopy(temp_result_dict)
 
@@ -76,10 +77,11 @@ class FlattenData():
         :param dict dictionary: dictionary containing the data to be flattened
         :param str incoming_key: string to be appended to the key
         '''
-        assert(isinstance(dictionary, pd.DataFrame)),\
+        assert(isinstance(dictionary, dict)),\
             "Parameter 'dictionary' be of Dictionary type"
-        assert(isinstance(incoming_key, str)),\
-            "Parameter 'incoming_key' be of String type"
+        if incoming_key is not None:
+            assert(isinstance(incoming_key, str)),\
+                "Parameter 'incoming_key' be of String type"
 
 
         result_dict = {}
@@ -107,10 +109,11 @@ class FlattenData():
         :param str incoming_key: string to be appended to the key
         '''
 
-        assert(isinstance(data_list, pd.DataFrame)),\
+        assert(isinstance(data_list, list)),\
             "Parameter 'data_list' be of List type"
-        assert(isinstance(incoming_key, str)),\
-            "Parameter 'incoming_key' be of String type"
+        if incoming_key is not None:
+            assert(isinstance(incoming_key, str)),\
+                "Parameter 'incoming_key' be of String type"
 
         result_dict = {}
         for iteration, item in enumerate(data_list):
@@ -119,6 +122,7 @@ class FlattenData():
             temp_result = {}
             key = incoming_key
             if incoming_key is not None:
+                # OEBB SPECIFIC IF STATEMENT
                 if type(data_list[iteration]) is dict:
                     if 'stationsnummer' in data_list[iteration].keys() and 'stage' in data_list[iteration].keys() :
 
@@ -145,7 +149,7 @@ class FlattenData():
         :param dict dictionary: dictionary which holds all the resulting data.
         :param dict to_append: data to be added to the resulting dictionary.
         '''
-        assert(isinstance(dictionary, (list, dict))),\
+        assert(isinstance(dictionary, dict)),\
             "Parameter 'dictionary' be of Dictionary type"
         assert(isinstance(to_append, dict)),\
             "Parameter 'to_append' be of Dictionary type"

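Note on cdplib/FlattenData.py: every hunk above applies the same guard pattern, correcting the asserted container types and checking incoming_key only when one is actually passed. A minimal standalone sketch of that pattern (the function name here is illustrative, not part of the commit):

def _check_flatten_args(data_list, incoming_key=None):
    # Assert the actual container type instead of pd.DataFrame.
    assert isinstance(data_list, list), \
        "Parameter 'data_list' must be of List type"
    # Validate incoming_key only when the caller supplied one.
    if incoming_key is not None:
        assert isinstance(incoming_key, str), \
            "Parameter 'incoming_key' must be of String type"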
+ 178 - 0
cdplib/SimplifiedProcessModel.py

@@ -0,0 +1,178 @@
+
+import sys
+import os
+import time
+sys.path.append(os.getcwd())
+from cdplib.log import Log
+log = Log("Simplified Process Model")
+
+
+class SimplifiedProcessModel:
+
+    def __init__(self):
+        self._log = Log("Simplified Process Model")
+
+    @property
+    def process_stages(self):
+        '''
+        For machine learning predictions we divide the process into
+        big stages; stages can be skipped depending on the IHS of the
+        wheelset. We use all information gathered during the previous
+        process stages to make predictions for the next stage.
+        '''
+        import networkx as nx
+        '''
+        critical_stations = {"A": [421, 110]}
+
+        critical_stations["B"] = [130]
+
+        critical_stations["C"] = [140, 150]
+
+        critical_stations["D"] = [410]
+
+        critical_stations["E"] = [510, 520, 535,
+                                530, 550]
+
+        critical_stations["F"] = [490, 480, 430, 170]
+
+        critical_stations["G"] = [595, 190, 630]
+
+        critical_stations["H"] = [640]
+
+        critical_stations["I"] = [650, 560]
+
+        critical_stations["J"] = [675]
+
+        critical_stations["K"] = [690]
+
+        critical_stations["L"] = [710, 670]
+
+        stages_graph = nx.DiGraph()
+
+        for stage in critical_stations:
+            stages_graph.add_node(stage, stations=critical_stations[stage])
+
+        stages_graph.add_edge("A", "B")
+        stages_graph.add_edge("B", "C")
+        stages_graph.add_edge("C", "D")
+        stages_graph.add_edge("D", "E")
+        stages_graph.add_edge("D", "F")
+        stages_graph.add_edge("E", "G")
+        stages_graph.add_edge("F", "G")
+        stages_graph.add_edge("G", "H")
+        stages_graph.add_edge("H", "I")
+        stages_graph.add_edge("I", "J")
+        stages_graph.add_edge("J", "K")
+        stages_graph.add_edge("K", "L")
+        '''
+
+        critical_stations = {"A": [421, 110]}
+
+        critical_stations["B"] = [130]
+
+        critical_stations["C"] = [140, 150]
+
+        critical_stations["D"] = [410]
+
+        critical_stations["E"] = [510, 520, 535,
+                                530, 320, 550]
+
+        #critical_stations["F"] = [490, 480, 430, 595]
+
+        #critical_stations["G"] = [170, 506, 430]
+
+        # Merge the two above since they both contain station 430
+        critical_stations["F"] = [490, 480, 430, 595, 170, 506, 430]
+
+        critical_stations["H"] = [190]
+
+        critical_stations["I"] = [320, 340, 630]
+
+        #This path is in parallel with the rest of the paths. handle as an exception
+        #critical_stations["J"] = [680]
+
+        critical_stations["K"] = [640]
+
+        stages_graph = nx.DiGraph()
+        for stage in critical_stations:
+            stages_graph.add_node(node_for_adding=stage, stations=critical_stations[stage])
+
+        stages_graph.add_edge("A", "B")
+
+        stages_graph.add_edge("B", "C")
+        stages_graph.add_edge("B", "K")
+        #stages_graph.add_edge("B", "J")
+
+        stages_graph.add_edge("C", "D")
+
+        stages_graph.add_edge("D", "E")
+        stages_graph.add_edge("D", "F")
+        #stages_graph.add_edge("D", "G")
+        stages_graph.add_edge("D", "H")
+
+        stages_graph.add_edge("E", "H")
+
+        #stages_graph.add_edge("F", "G")
+
+        #stages_graph.add_edge("G", "H")
+
+        stages_graph.add_edge("E", "I")
+        stages_graph.add_edge("H", "I")
+
+        #stages_graph.add_edge("J", "K")
+
+        stages_graph.add_edge("I", "K")
+
+        return stages_graph
+
+    def process_stages_paths(self):
+        import networkx as nx
+        result_graph = self.process_stages
+
+        result_dict = {}
+        stage_list = result_graph.nodes()
+        for stage in stage_list:
+            for path in nx.all_simple_paths(result_graph, source="A", target=stage):
+                station_list = []
+                for stage in path:
+                        for station in result_graph.nodes()[stage]['stations']:
+                            station_list.append(station)
+                result_dict[str(path)] = station_list
+        return result_dict
+
+    def get_stage(self, station_nummer: int):
+        import networkx as nx
+        result_graph = self.process_stages
+        for path in nx.all_simple_paths(result_graph, source="A", target="K"):
+            station_list = []
+            for stage in path:
+                    for station in result_graph.nodes()[stage]['stations']:
+                        if station == station_nummer:
+                            return stage
+        return
+
+    def calculate_stage_path(self, stations_list: list):
+        position_list = list(self.process_stages.nodes())
+        current_index = 0
+        stage_path = []
+        for station in stations_list:
+            stage = self.get_stage(station)
+            if stage is not None:
+                new_index = position_list.index(stage)
+                if current_index <= new_index:
+                    if stage not in stage_path:
+                        stage_path.append(stage)
+                        current_index = new_index
+                else:
+                    self._log.warning('Path does not conform to the model, no prediction can be made')
+                    return
+        return str(stage_path)
+
+    def get_stations_for_prediction_model(self, stations_list):
+        stages_paths = self.process_stages_paths()
+        prediction_stations = stages_paths[self.calculate_stage_path(stations_list)]
+
+        # Hack solution for handling that station 680 is running in parallel with most other stations
+        if len(prediction_stations) > 3:
+            prediction_stations.append(680)
+        return prediction_stations

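A short usage sketch for the new SimplifiedProcessModel, assuming the module is importable under the path it is added at and that networkx is installed; the station numbers below are taken from the stage definitions above and are purely illustrative:

from cdplib.SimplifiedProcessModel import SimplifiedProcessModel

model = SimplifiedProcessModel()

# Map a single station number to its stage letter.
model.get_stage(130)                       # -> "B"

# Stage path for an observed station sequence of one wheelset.
stations_seen = [421, 130, 140, 410]
model.calculate_stage_path(stations_seen)  # -> "['A', 'B', 'C', 'D']"

# Stations whose data the prediction model for this path should use
# (station 680 is appended by the parallel-path workaround).
model.get_stations_for_prediction_model(stations_seen)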
+ 59 - 37
cdplib/db_handlers/MongodbHandler.py

@@ -21,15 +21,17 @@ import pandas as pd
 import numpy as np
 
 sys.path.append(os.getcwd())
-from cdplib.log import Log
-from cdplib.Singleton_Threadsafe import SingletonThreadsafe
+from libraries.log import Log
+from libraries.configuration import default as cfg
+from libraries.Singleton_Threadsafe import SingletonThreadsafe
 
 
-class MongodbHandlerPool(metaclass=SingletonThreadsafe):
+#class MongodbHandlerPool(metaclass=SingletonThreadsafe):
+class MongodbHandlerPool():
     '''
     '''
 
-    def __init__(self, size: int = 10):
+    def __init__(self, size: int = 1):
         self._size = size
         self._mongodb_handlers = [MongodbHandler() for _ in range(size)]
 
@@ -38,7 +40,7 @@ class MongodbHandlerPool(metaclass=SingletonThreadsafe):
             self._mongodb_handlers = [MongodbHandler() for _ in range(self._size)]
             log.warning("Ran out of Mongodb handlers, 10 more have been added. Are you sure you've returned yours?")
         return self._mongodb_handlers.pop()
-
+        
     def release(self, mongodb_handler):
         if len(self._mongodb_handlers) < self._size:
             self._mongodb_handlers.append(mongodb_handler)
@@ -58,8 +60,6 @@ class MongodbHandler:
         '''
         if database_url is None:
 
-            from libraries.configuration import default as cfg
-
             database_url = "mongodb://{0}:{1}@{2}:{3}"\
                            .format(cfg["MONGO"]["MONGO_USER"],
                                    cfg["MONGO"]["MONGO_PASSWORD"],
@@ -186,7 +186,11 @@ class MongodbHandler:
                     'validationAction': validation_action
                     }
 
-        self._database.command(command)
+        try:
+            self._database.command(command)
+
+        except Exception as error:
+            self._log.log_and_raise_error(('An error occured when trying to set a schema for the collection: {}. \nError: {}').format(collection_name, error))
 
     def create_collection(self, collection_name):
         '''
@@ -197,8 +201,12 @@ class MongodbHandler:
             "Parameter 'collection_name' must be a string type"
 
         if collection_name not in self._database.list_collection_names():
-            self._log.info(("Collection '{}' has been created").format(collection_name))
-            return self._database.create_collection(collection_name)
+            try:
+                self._log.info(("Collection '{}' has been created").format(collection_name))
+                return self._database.create_collection(collection_name)
+
+            except Exception as error:
+                self._log.log_and_raise_error(('An error occured while creating the new collection: {}. \nError: {}').format(collection_name, error))
         else:
             self._log.info(("Collection '{}' already exists").format(collection_name))
             return self._database[collection_name]
@@ -228,16 +236,20 @@ class MongodbHandler:
 
             data = simplejson.loads(data.to_json(date_format="iso"))
 
-        if (len(data) == 1) or (isinstance(data, dict)):
+        try:
+            if (len(data) == 1) or (isinstance(data, dict)):
 
-            if isinstance(data, pd.DataFrame) and (len(data) == 1):
-                data = data.iloc[0]
-            elif type(data) is list:
-                data = data[0]
+                if isinstance(data, pd.DataFrame) and (len(data) == 1):
+                    data = data.iloc[0]
+                elif type(data) is list:
+                    data = data[0]
 
-            self._database[collection_name].insert_one(data)
-        else:
-            self._database[collection_name].insert_many(data, ordered=ordered)
+                self._database[collection_name].insert_one(data)
+            else:
+                self._database[collection_name].insert_many(data, ordered=ordered)
+
+        except Exception as error:
+            self._log.log_and_raise_error(('An error occured when trying to insert data into {}, {}. \nError: {}').format(self._database_name, collection_name, error))
 
         self._log.info(('Data has been inserted into the {} collection').format(collection_name))
 
@@ -259,29 +271,39 @@ class MongodbHandler:
         '''
 
         '''
-        if attribute == None or attribute_value == None:
-            data = self._database[collection_name].find()
-        else:
-            data = self._database[collection_name].find({attribute: {comparison_operator: attribute_value}})
+        try:
+            if attribute == None or attribute_value == None:
+                data = self._database[collection_name].find()
+            else:
+                data = self._database[collection_name].find({attribute: {comparison_operator: attribute_value}})
 
-        if data.count() > 0:
-            df = pd.DataFrame(list(data))
-            df.set_index('radsatznummer', inplace=True)
-            return df
-        else:
-            self._log.warning(('No data for the query was found').format())
-
-    def aggregate_data_and_generate_dataframe(self, collection_name: str, aggregation_pipeline: list):
+        except Exception as error:
+            self._log.log_and_raise_error(('An error occured trying to query data from {}, with query {}: {}:{}. \nError:{}').format(collection_name, attribute_value, comparison_operator, attribute_value, error))
 
-        data = list(self._database[collection_name].aggregate(pipeline=aggregation_pipeline, allowDiskUse=True))
+        return self.convert_mongo_data_into_dataframe(data)
 
-        if len(data)> 0:
-            df = pd.DataFrame(data)
-            df.set_index('radsatznummer', inplace=True)
-            return df
-        else:
-            self._log.warning(('No data for the query was found').format())
+    def aggregate_data_and_generate_dataframe(self, collection_name: str, aggregation_pipeline: list):
 
+        try:
+            data = self._database[collection_name].aggregate(pipeline=aggregation_pipeline, allowDiskUse=True)
+
+        except Exception as error:
+            self._log.log_and_raise_error(('A problem occured when aggregating the collection {} with the pipeline {}. \nError: {}').format(collection_name, aggregation_pipeline, error))
+
+        return self.convert_mongo_data_into_dataframe(data)
+
+    def convert_mongo_data_into_dataframe(self, data) -> pd.DataFrame():
+
+        data = list(data)
+        try:
+            if len(data)> 0:
+                df = pd.DataFrame(data)
+                df.set_index('radsatznummer', inplace=True)
+                return df
+            else:
+                self._log.warning(('No data for the query was found').format())
+        except Exception as error:
+            self._log.log_and_raise_error(('An error occured trying to convert mongo data into pd.Dataframe. \nError: {} ').format(errors))
 
     def update_data_in_collection(self, query_label: str, query_value: str, update_label:str, update_value: str, collection_name:str):
         self._database[collection_name].update_one({query_label:query_value}, {"$set": {update_label: update_value}})

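The net effect of the MongodbHandler changes above is that both the find-based query and the aggregation are wrapped in try/except with log_and_raise_error, and both now delegate DataFrame construction to the new convert_mongo_data_into_dataframe helper. A standalone sketch of that helper's behaviour (assuming a pymongo cursor or any iterable of documents as input; the function name is illustrative):

import pandas as pd

def cursor_to_dataframe(cursor, index_column="radsatznummer"):
    # Materialise the cursor so it can be length-checked.
    records = list(cursor)
    if not records:
        # Mirrors the handler's "No data for the query was found" warning path.
        return None
    df = pd.DataFrame(records)
    df.set_index(index_column, inplace=True)
    return df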
+ 151 - 0
cdplib/db_handlers/OebbMongodbHandler.py

@@ -0,0 +1,151 @@
+"""
+Created on Mon Sep 16 13:27:44 2019
+
+@author: oskar
+@description: Oebb specific Mongodb Handler which inherits from the 'general' Mongodb Handler
+"""
+
+
+import json
+import simplejson
+import sys
+import os
+import jsonref
+
+from copy import deepcopy
+from pymongo import MongoClient
+import pandas as pd
+import numpy as np
+
+sys.path.append(os.getcwd())
+from libraries.log import Log
+from libraries.configuration import default as cfg
+from libraries.db_handlers.MongodbHandler import MongodbHandler
+from libraries.Singleton_Threadsafe import SingletonThreadsafe
+
+
+class OebbMongodbHandlerPool(metaclass=SingletonThreadsafe):
+    '''
+    '''
+
+    def __init__(self, size: int = 10):
+        self._size = size
+        self._mongodb_handlers = [OebbMongodbHandler() for _ in range(size)]
+
+    def aquire(self):
+        while not self._mongodb_handlers:
+            self._mongodb_handlers = [OebbMongodbHandler() for _ in range(self._size)]
+            log.warning("Ran out of Mongodb handlers, 10 more have been added. Are you sure you've returned yours?")
+        return self._mongodb_handlers.pop()
+        
+    def release(self, mongodb_handler):
+        if len(self._mongodb_handlers) < self._size:
+            self._mongodb_handlers.append(mongodb_handler)
+
+
+class OebbMongodbHandler(MongodbHandler):
+
+    def __init__(self, database_url: str = None,
+                 database_name: str = None):
+        '''
+        '''
+        super().__init__(
+                database_url=database_url,
+                database_name=database_name)
+
+        self._log = Log("Oebb Mongodb Handler")
+
+
+    def query_process_instances_with_stage_filtering(self, stage_path: list):
+        '''
+        :param list stage_path: Stage path for which the data will be queried. 
+
+        Method will return all wheelsets for which stage_path is a subset of their final stage path.
+        The wheelsets will only be returned with the actual stations from the stages in the stage_path.
+        '''
+    
+        assert(isinstance(stage_path, list)),\
+            "Parameter 'stage_path must be a list type"
+
+        aggregation_pipeline = [
+                                {"$match": {"$expr": {"$setIsSubset": [stage_path, '$final_state.stage_path']}}},
+                                {"$unwind": "$process"},
+                                {"$sort": {"process.step_number": 1}},
+                                {"$group": {
+                                    "_id": {
+                                            "_id": "$_id", 
+                                            "radsatznummer": "$radsatznummer",
+                                            "final_state": "$final_state"
+                                            }, 
+                                        "process": {"$push": "$process"}
+                                    }
+                                },
+                                {"$project": {
+                                    "_id": "$_id._id",
+                                    "radsatznummer": "$_id.radsatznummer",
+                                    "final_state": "$_id.final_state",
+                                    "process":{
+                                        "$filter":{
+                                            "input": "$process",
+                                            "as": "process",
+                                            "cond":{
+                                                "$setIsSubset": [["$$process.stage"], stage_path]
+                                            },      
+                                        }
+                                    }
+                                }
+                                }
+                            ]
+
+        return self.aggregate_data_and_generate_dataframe('process_instances',aggregation_pipeline)                
+
+    def query_process_instances_with_guaranteed_station_sorting(self, attribute: str = None, 
+                                                attribute_value: str = None, comparison_operator: str = '$eq', ascending: bool = True):
+        '''
+        :param str attribute:
+        :param str attribute_value:
+        :param str comparison_operator:
+        :param bool ascending:
+
+        Method will return all process instances matching the attribute filter, with their
+        process steps guaranteed to be sorted by step_number.
+        '''
+        if attribute is not None:
+            assert(isinstance(attribute, str)),\
+                "Parameter 'attribute' must be a string type"
+
+        if attribute_value is not None:
+            assert(isinstance(attribute_value, (str, int, bool))),\
+                "Parameter 'attribute_value' must be a string, integer or boolean type"
+    
+        assert(isinstance(comparison_operator, str)),\
+            "Parameter 'comparison_operator' must be a string type"
+
+        assert(isinstance(ascending, bool)),\
+            "Parameter 'ascending' must be a boolean type"
+
+        
+        order = 1 if ascending else -1
+
+        aggregation_pipeline = [
+                                {"$match":{ attribute:{comparison_operator: attribute_value}}},
+                                {"$unwind": "$process"},
+                                {"$sort": {"process.step_number": order}},
+                                {"$group": {
+                                    "_id": {
+                                            "_id": "$_id", 
+                                            "radsatznummer": "$radsatznummer",
+                                            "final_state": "$final_state"
+                                            }, 
+                                        "process": {"$push": "$process"}
+                                    }
+                                },
+                                {"$project": {
+                                    "_id": "$_id._id",
+                                    "radsatznummer": "$_id.radsatznummer",
+                                    "final_state": "$_id.final_state",
+                                    "process": "$process"}
+                                }
+                            ]
+                    
+        return self.aggregate_data_and_generate_dataframe('process_instances',aggregation_pipeline)

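A usage sketch for the new OebbMongodbHandler, assuming it is importable under the path it is added at and that the Mongo connection settings in the configuration are in place; the stage letters correspond to SimplifiedProcessModel above and the attribute value is illustrative:

from cdplib.db_handlers.OebbMongodbHandler import OebbMongodbHandler

handler = OebbMongodbHandler()

# Wheelsets whose final stage path contains stages A, B and C, with their
# process entries restricted to stations belonging to those stages.
df = handler.query_process_instances_with_stage_filtering(["A", "B", "C"])

# The same collection filtered on one attribute, with the process steps
# guaranteed to come back sorted by step_number.
df_sorted = handler.query_process_instances_with_guaranteed_station_sorting(
    attribute="radsatznummer",
    attribute_value="12345",
    ascending=True)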
+ 12 - 10
cdplib/db_handlers/SQLHandler.py

@@ -15,22 +15,25 @@ import pandas as pd
 import warnings
 
 sys.path.append(os.getcwd())
-from cdplib.Singleton_Threadsafe import SingletonThreadsafe
+from libraries.log import Log
+from libraries.Singleton_Threadsafe import SingletonThreadsafe
 
 class SQLHandlerPool(metaclass=SingletonThreadsafe):
+#class SQLHandlerPool():
     '''
     '''
 
-    def __init__(self, size: int = 10):
+    def __init__(self, size: int = 1):
         self._size = size
+        self._log = Log(name='SQLHandlerPool')
         self._sql_handlers = [SQLHandler() for _ in range(size)]
 
     def aquire(self):
         while not self._sql_handlers:
             self._sql_handlers = [SQLHandler() for _ in range(self._size)]
-            log.warning("Ran out of SQL handlers, 10 more have been added. Are you sure you've returned yours?")
+            self._log.warning("Ran out of SQL handlers, 10 more have been added. Are you sure you've returned yours?")
         return self._sql_handlers.pop()
-
+        
     def release(self, mongodb_handler):
         if len(self._sql_handlers) < self._size:
             self._sql_handlers.append(mongodb_handler)
@@ -56,15 +59,14 @@ class SQLHandler:
              for db2: ibm_db_sa
         '''
 
-        from cdplib.log import Log
+        
+        from libraries.configuration import default as cfg
         from sqlalchemy_utils import database_exists, create_database
 
         self._log = Log(name='SQLHandler')
 
         if db_uri is None:
 
-            from libraries.configuration import default as cfg
-
             db_uri = "mysql+pymysql://{0}:{1}@{2}:{3}/{4}?charset=utf8&local_infile=1"\
                      .format(cfg["SQL"]["SQL_USER"],
                              cfg["SQL"]["SQL_PASSWORD"],
@@ -144,7 +146,7 @@ class SQLHandler:
         self.execute("DROP DATABASE IF EXISTS {}".format(database))
         self._engine.execute("CREATE DATABASE {}".format(database))
         self._engine.execute("USE {}".format(database))
-
+        
     @property
     def _db_metadata(self) -> dict:
         '''
@@ -204,7 +206,7 @@ class SQLHandler:
         '''
         connection = self._engine.connect()
         transaction = connection.begin()
-
+    
         errors = []
 
         # in the case of multi-query execute each query
@@ -505,7 +507,7 @@ class SQLHandler:
             data = pd.read_sql(sql=query,
                                con=connection,
                                **read_sql_kwargs)
-            #self._engine.dispose()
+                               
             connection.close()
             return data
 

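Both handler pools keep the same acquire/release contract after this change (note the method is spelled aquire in the code); a minimal sketch, assuming SQLHandlerPool is importable from cdplib.db_handlers.SQLHandler and the SQL connection settings are configured:

from cdplib.db_handlers.SQLHandler import SQLHandlerPool

pool = SQLHandlerPool(size=1)  # SingletonThreadsafe metaclass: every call returns the same pool

handler = pool.aquire()        # refills the pool (and warns) if it has run dry
try:
    pass                       # use the SQLHandler instance here
finally:
    pool.release(handler)      # handlers beyond the configured size are dropped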
BIN
cdplib/db_handlers/__pycache__/MongodbHandler.cpython-37.pyc


BIN
cdplib/db_handlers/__pycache__/SQLHandler.cpython-37.pyc


+ 0 - 396
cdplib/db_migration/DataFrameToCollection.py

@@ -1,396 +0,0 @@
-#!/usr/bin/env python3
-# -*- coding: utf-8 -*-
-"""
-Created on Mon Jul 22 11:05:47 2019
-
-@author: tanya
-
-@description: a function to reshape a pandas dataframe to a list of
-(possibly nested) documents with respect to a (json) mongodb schema
-"""
-
-import pandas as pd
-import numpy as np
-import os
-import sys
-
-sys.path.append(os.getcwd())
-
-
-class DataFrameToCollection():
-    '''
-    '''
-    def __init__(self, schema_path: str):
-        '''
-        '''
-        from cdplib.log import Log
-        import json
-
-        self._log = Log("ParseJsonSchema")
-
-
-        if not os.path.isfile(schema_path):
-            err = "JsonSchema not found"
-            self._log.error(err)
-            raise FileNotFoundError(err)
-
-        # load schema to dictionary if it is a valid json file
-        try:
-            with open(schema_path, "r") as f:
-                self.schema = json.load(f)
-
-        except Exception as e:
-            err = ("Could not load json schema, "
-                   "Obtained error {}".format(e))
-
-            self._log.error(err)
-            raise Exception(err)
-
-
-    def to_list_of_documents(self, data: pd.DataFrame,
-                             grp_fields: list,
-                             schema: dict = None,
-                             _final_step: bool = True) -> list:
-        '''
-        Reshapes a pandas dataframe to a list of documents according
-         to a complex (json) mongodb schema
-
-         Remark1: column names of data need to reflect the "nestedness"
-         of the field in the mongodb schema with the help of a "." separator
-         Example: field.sub_field_1, field.sub_field_2
-
-         Remark2: if the schema is stored as a json file, first load it
-         to a dictionary with the help of the python json module
-
-         The function goes recurisively through all the fields and reshapes
-         them correspondingly depending on whether the field is an array,
-         an object, or simple field. For each field we group the data by the
-         grp_fields and reshape it accordingly, the result is a pandas Series.
-         In the end all the series are collected and concatenated.
-        '''
-        from copy import deepcopy
-
-        data = self._melt_duplicated_columns(data)
-
-        reshaped_fields = []
-
-        if schema is None:
-            schema = self.schema
-
-        for field in schema["properties"]:
-
-            if field not in self._unroll_nested_names(data.columns):
-                continue
-
-            field_type = schema["properties"][field]["bsonType"]
-
-            # if field has a simple type
-            if field_type not in ["array", "object"]:
-
-                grp_fields = [c for c in grp_fields if c in data.columns]
-
-                # check that there is only one possible value of this field
-                n_distinct_values = data.groupby(grp_fields, sort=False)[field].nunique().max()
-
-                if n_distinct_values != 1:
-                    err = "Field {0} is not unique with respect to {1}"\
-                          .format(field, grp_fields)
-
-                    self._log.error(err)
-                    raise Exception(err)
-
-                if field not in grp_fields:
-                    reshaped_field = data.groupby(grp_fields, sort=False)[field].first()
-                else:
-                    reshaped_field =\
-                        data[grp_fields].drop_duplicates()\
-                        .set_index(grp_fields, drop=False)[field]
-
-                reshaped_fields.append(reshaped_field)
-
-            # if field is sub-document (dictionary)
-            elif field_type == "object":
-
-                sub_schema = deepcopy(schema["properties"][field])
-
-                # rename sub-schema properties to match with data column names
-                sub_schema["properties"] =\
-                    {".".join([field, k]): v for k, v
-                     in sub_schema["properties"].items()}
-
-                sub_data = self.to_list_of_documents(
-                            data=data,
-                            schema=sub_schema,
-                            grp_fields=grp_fields,
-                            _final_step=False)
-
-                reshaped_field = sub_data.apply(self._make_dict, axis=1)
-                reshaped_field.name = field
-
-                reshaped_fields.append(reshaped_field)
-
-            # if field is a list of dictionaries
-            elif field_type == "array":
-
-                items_type = schema["properties"][field]["items"]["bsonType"]
-
-                if items_type == "object":
-
-                    sub_schema = deepcopy(schema["properties"][field]["items"])
-
-                    # rename sub-schema properties to match data column names
-                    sub_schema["properties"] =\
-                        {".".join([field, k]): v for k, v in
-                         sub_schema["properties"].items()}
-
-                    # extend grp fields by sub-fields of field simple types
-                    sub_grp_fields = [f for f in sub_schema["properties"]
-                                      if (sub_schema["properties"][f]["bsonType"] not in ["array", "object"])
-                                      and (f in data.columns)]
-
-                    if len(sub_grp_fields) == 0:
-                        err = ("One of the sub-keys in a list of documents"
-                               " must be of simple type for the field {}"
-                               .format(field))
-
-                        self._log.error(err)
-                        raise Exception(err)
-
-                    # group and reshape sub-fields with complex types
-                    sub_data = self.to_list_of_documents(
-                                data=data,
-                                schema=sub_schema,
-                                grp_fields=grp_fields + sub_grp_fields,
-                                _final_step=False)
-
-                    if sub_data is not None:
-
-                        # gether the results into a list of dictionaries
-                        sub_data = sub_data.apply(self._make_dict, axis=1)
-
-                        sub_data.name = field
-                        sub_data = sub_data.reset_index(grp_fields)
-
-                        reshaped_field =\
-                            sub_data.groupby(grp_fields, sort=False)[field]\
-                                    .apply(self._make_list_of_distinct)
-
-                        reshaped_fields.append(reshaped_field)
-
-                # if field is a list of values with simple type
-                elif items_type == "array":
-
-                    grp_fields = [c for c in grp_fields if c in data.columns]
-
-                    if field in data.columns:
-
-                        reshaped_field = data.groupby(grp_fields, sort=False)[field]\
-                                             .apply(self._make_list_of_distinct)
-
-                        reshaped_fields.append(reshaped_field)
-
-                else:
-
-                    grp_fields = [c for c in grp_fields if c in data.columns]
-
-                    if field in data.columns:
-
-                        reshaped_field = data.groupby(grp_fields, sort=False)[field]\
-                                             .apply(self._make_flattened_list_of_distinct)
-
-                        reshaped_fields.append(reshaped_field)
-
-        if len(reshaped_fields) > 0:
-
-            reshaped_fields = pd.concat(reshaped_fields, sort=False, axis=1)
-
-            if _final_step:
-                # dropping the index names if it is the final step,
-                # if not the index is needed for merging
-                reshaped_fields =\
-                    reshaped_fields.drop(list(reshaped_fields.index.names), axis=1, errors="ignore")\
-                                   .reset_index(drop=False)
-
-                self._log.info("Done reshaping the dataframe to a list of documents")
-
-            return reshaped_fields
-
-        else:
-            return
-
-    def _melt_duplicated_columns(self, data: pd.DataFrame) -> pd.DataFrame:
-        '''
-        '''
-        data = data.copy(deep=True)
-
-        for c in set(data.columns):
-            if isinstance(data[c], pd.DataFrame):
-                """
-                data = pd.melt(data, id_vars=[cc for cc in data.columns
-                                              if cc != c], value_vars=c)\
-                         .drop("variable", axis=1)\
-                         .rename(columns={"value": c})
-                """
-                data["temp"] = data[c].apply(self._make_list, axis=1)
-                data.drop(c, axis=1, inplace=True)
-                data = data.rename(columns={"temp": c})
-
-        return data
-
-    def _make_dict(self, x: pd.Series) -> dict:
-        '''
-        Transforms pandas series to a dictionary
-         is meant to be applied to a dataframe in axis = 1,
-         then the index of the input series are the column names
-         of the dataframe
-        '''
-        def custom_is_null(y):
-            if isinstance(pd.notnull(y), bool):
-                return pd.notnull(y)
-            else:
-                return True
-
-        return {f.split(".")[-1]: x[f] for f in x.index
-                if custom_is_null(x[f])}
-
-    def _make_list(self, x: pd.Series) -> list:
-        '''
-        return: list of values in a series
-        '''
-        return list(x)
-
-    def _make_list_of_distinct(self, x: pd.Series) -> list:
-        '''
-        return: list of unique values from a Series where
-         entries are arbitrary objects
-         (pandas unique() method does not work if entries are of complex types)
-        '''
-        uniques = pd.DataFrame({"temp": x.tolist()})\
-                    .assign(temp_str=lambda y: y["temp"].astype(str))\
-                    .drop_duplicates(subset=["temp_str"])\
-                    .drop("temp_str", axis=1).iloc[:, 0].tolist()
-
-        def is_empty(y):
-            is_empty_dict = (isinstance(y, dict) and (len(y) == 0))
-            is_empty_list = (isinstance(y, list) and (len(y) == 0))
-            return is_empty_dict or is_empty_list
-
-        return [el for el in uniques if not is_empty(el)]
-
-    def _make_flattened_list_of_distinct(self, x: pd.Series) -> list:
-        '''
-        return: list of unique values from a Series where
-         entries are arbitrary objects
-         (pandas unique() method does not work if entries are of complex types)
-        '''
-        uniques = self._make_list_of_distinct(x)
-        return uniques[0]
-
-    def _unroll_nested_names(self, names: list) -> list:
-        '''
-        Example: transform a list ["name.firstname", "name.surname"]
-        into ["name", "name.firstname", "name.surname"]
-        '''
-        unrolled = []
-
-        for c in names:
-            splitted = c.split(".")
-            for i in range(len(splitted)):
-                unrolled.append(".".join(splitted[:i+1]))
-
-        return unrolled
-
-
-if __name__ == "__main__":
-
-    # Testing
-
-    df = pd.DataFrame({
-                       "a": [1]*8 + [2]*8,
-                       "b": [10]*8 + [20]*8,
-                       "c": [100, 200]*8,
-                       "d.da": [11]*8 + [22]*8,
-                       "d.db": [33]*8 + [34]*8,
-                       "e.ea.eaa": [5]*8 + [55]*8,
-                       "e.ea.eab": [6]*8 + [66]*8,
-                       "e.eb": [2, 2, 3, 3]*4,
-                       "e.ec.eca": [1, 2, 3, 4]*4,
-                       "e.ec.ecb": [5, 6, 7, 8]*4,
-                       "f.fa": [1]*4 + [3]*4 + [11]*4 + [33]*4,
-                       "f.fb": [2]*4 + [3]*2 + [4]*2 + [22]*4 + [44]*4})
-
-    duplicate = pd.DataFrame({"c": [300, 400]*8})
-
-    df = pd.concat([df, duplicate], axis=1)
-
-    schm = {
-              "bsonType": "object",
-              "required": ["a"],
-              "properties": {
-
-                  "a": {"bsonType": "integer"},
-
-                  "b": {"bsonType": "integer"},
-
-                  "c": {
-                      "bsonType": "array",
-                      "items": {"bsonType": "integer"}
-                  },
-                  "d": {
-                      "bsonType": "object",
-                      "properties": {
-                          "da": {"bsonType": "integer"},
-                          "db": {"bsonType": "integer"}
-                       }
-                  },
-                  "e": {
-                      "bsonType": "object",
-                      "properties": {
-                          "ea": {
-                              "bsonType": "object",
-                              "properties": {
-                                  "eaa": {"bsonType": "integer"},
-                                  "eab": {"bsonType": "integer"}
-                               }
-
-                          },
-
-                          "eb": {
-                              "bsonType": "array",
-                              "items": {"bsonType": "integer"}
-                          },
-
-                          "ec": {
-                                "bsonType": "array",
-                                "items": {
-                                  "bsonType": "object",
-                                  "properties": {
-                                      "eca": {"bsonType": "integer"},
-                                      "ecb": {"bsonType": "integer"}
-                                    }
-                                  }
-                          }
-                      }
-                  },
-                  "f": {
-                      "bsonType": "array",
-                      "items": {
-                          "bsonType": "object",
-                          "properties": {
-                              "fa": {"bsonType": "integer"},
-                              "fb": {
-                                  "bsonType": "array",
-                                  "items": {"bsonType": "integer"}
-                              }
-                          }
-                      }
-                  }
-              }
-              }
-
-    grp_fields = ["a"]
-
-    result = DataFrameToCollection().to_list_of_documents(
-                    data=df,
-                    schema=schm,
-                    grp_fields=grp_fields)

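For context on what is removed here: DataFrameToCollection.to_list_of_documents reshaped a flat dataframe, whose column names encode nesting with a "." separator, into (possibly nested) documents grouped by grp_fields. A conceptual illustration of that mapping (not an invocation of the deleted class):

import pandas as pd

# Flat input; "d.da" and "d.db" are sub-fields of the object field "d".
df = pd.DataFrame({"a": [1, 1, 2, 2],
                   "d.da": [11, 11, 22, 22],
                   "d.db": [33, 33, 34, 34]})

# Grouped by grp_fields=["a"], the reshaping described in the docstring yields:
# [{"a": 1, "d": {"da": 11, "db": 33}},
#  {"a": 2, "d": {"da": 22, "db": 34}}]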
+ 12 - 12
cdplib/db_migration/MigrationCleaning.py

@@ -14,11 +14,11 @@ import gc
 
 sys.path.append(os.getcwd())
 
-from cdplib.db_migration.ParseMapping import ParseMapping
-from cdplib.db_migration.ParseJsonSchema import ParseJsonSchema
-from cdplib.utils.ExceptionsHandler import ExceptionsHandler
-from cdplib.utils.CleaningUtils import CleaningUtils
-from cdplib.log import Log
+from libraries.db_migration.ParseMapping import ParseMapping
+from libraries.db_migration.ParseJsonSchema import ParseJsonSchema
+from libraries.utils.ExceptionsHandler import ExceptionsHandler
+from libraries.utils.CleaningUtils import CleaningUtils
+from libraries.log import Log
 
 class MigrationCleaning:
     '''
@@ -58,7 +58,7 @@ class MigrationCleaning:
         self._mapping_path = mapping_path
         self._schema_paths = schema_paths
 
-        from cdplib.db_handlers.SQLHandler import SQLHandlerPool
+        from libraries.db_handlers.SQLHandler import SQLHandlerPool
         self._sql_db = SQLHandlerPool(20)
 
     def _assert_dataframe_input(self, data: pd.DataFrame):
@@ -213,8 +213,6 @@ class MigrationCleaning:
                              reason: (str, pd.Series)) -> pd.DataFrame:
         '''
         '''
-        from cdplib.db_handlers.SQLHandler import SQLHandler
-
         assert((self._inconsist_report_table is not None) and
                (self._filter_index_columns is not None)),\
             "Inconsistent report table or filter index is not provided"
@@ -223,11 +221,13 @@ class MigrationCleaning:
 
 
         data = data.copy(deep=True)
         data = data.copy(deep=True)
 
 
-        db = self._sql_db.aquire()#SQLHandler()
+        #db = self._sql_db.aquire()
+        from libraries.db_handlers.SQLHandler import SQLHandler
+        db = SQLHandler()
 
 
         if invalid_mask.sum() == 0:
         if invalid_mask.sum() == 0:
 
 
-            self._sql_db.release(db)
+            #self._sql_db.release(db)
             return data
             return data
 
 
         data_inconsist = data.assign(reason=reason)\
         data_inconsist = data.assign(reason=reason)\
@@ -265,7 +265,7 @@ class MigrationCleaning:
 
 
         data = data.loc[~all_index.isin(nok_index)].reset_index(drop=True)
         data = data.loc[~all_index.isin(nok_index)].reset_index(drop=True)
 
 
-        self._sql_db.release(db)
+        #self._sql_db.release(db)
 
 
         return data
         return data
 
 
@@ -517,7 +517,7 @@ if __name__ == "__main__":
 
     # testing
 
-    from cdplib.db_handlers.SQLHandler import SQLHandler
+    from libraries.db_handlers.SQLHandler import SQLHandler
 
     mapping_path = os.path.join(".", "migration_mappings", "rs1_mapping.json")
 

+ 0 - 62
cdplib/db_migration/ParseDbSchema.py

@@ -1,62 +0,0 @@
-#!/usr/bin/env python3
-# -*- coding: utf-8 -*-
-"""
-Created on Wed Sep 25 08:22:20 2019
-
-@author: tanya
-"""
-
-import os
-import sys
-import abc
-sys.path.append(os.getcwd())
-
-
-class ParseDbSchema(metaclass=abc.ABCMeta):
-    '''
-    '''
-    def __init__(self, schema_paths: [list, str], log_file: str = None):
-        '''
-        '''
-        from cdplib.log import Log
-
-        self._log = Log(name="ParseDbSchema:", log_file=log_file)
-
-        if isinstance(schema_paths, str):
-            schema_paths = [schema_paths]
-
-        for schema_path in schema_paths:
-            if not os.path.isfile(schema_path):
-                err = "Schema not found"
-                self._log.error(err)
-                raise FileNotFoundError(err)
-
-    @abc.abstractmethod
-    def get_fields(self) -> list:
-        '''
-        '''
-        return
-
-    @abc.abstractmethod
-    def get_datetime_fields(self) -> list:
-        '''
-        '''
-        return
-
-    @abc.abstractmethod
-    def get_python_types(self) -> list:
-        '''
-        '''
-        return
-
-    @abc.abstractmethod
-    def get_default_values(self) -> list:
-        '''
-        '''
-        return
-
-    @abc.abstractmethod
-    def get_allowed_values(self) -> list:
-        '''
-        '''
-        return

+ 0 - 354
cdplib/db_migration/ParseJsonSchema.py

@@ -1,354 +0,0 @@
-#!/usr/bin/env python3
-# -*- coding: utf-8 -*-
-"""
-Created on Thu Jan 31 11:41:48 2019
-
-@author: tanya
-"""
-
-import os
-import sys
-from copy import deepcopy
-import numpy as np
-
-sys.path.append(os.getcwd())
-
-from cdplib.db_migration.ParseDbSchema import ParseDbSchema
-
-
-class ParseJsonSchema(ParseDbSchema):
-    '''
-    Class for retrieving column properties from mongodb jsonSchema
-    '''
-
-    def __init__(self, schema_paths: [list, str], log_file: str = None):
-        '''
-        '''
-        import json
-        from cdplib.log import Log
-
-        super().__init__(schema_paths=schema_paths, log_file=log_file)
-
-        self._log = Log(name="ParseJsonSchema", log_file=log_file)
-
-        # load schemas to dictionaries if they are valid json files
-
-        assert(isinstance(schema_paths, (list, str))),\
-            "Schema paths must be either str or lists"
-
-        if isinstance(schema_paths, str):
-            schema_paths = [schema_paths]
-
-        self._schema_paths = schema_paths
-
-        self.schemas = []
-
-        for schema_path in schema_paths:
-            try:
-                with open(schema_path, "r") as f:
-                    self.schemas.append(json.load(f))
-
-            except Exception as e:
-                err = ("Could not load json schema, "
-                       "Obtained error {}".format(e))
-
-                self._log.error(err)
-                raise Exception(err)
-
-    @property
-    def _collection_names(self) -> list:
-        '''
-        '''
-        # Don't use strip() instaed of replace since schema_c.strip(schema_)
-        # will discard the c as well which is not a appropriate output
-        return [os.path.basename(p).replace("schema_","").split(".")[0] for p in self._schema_paths]
-
-    def get_fields(self) -> list:
-        '''
-        '''
-        return self._parse()
-
-    def get_fields_restricted_to_collection(self, collection_name: str) -> list:
-        '''
-        '''
-        schemas = [self.schemas[self._collection_names.index(collection_name)]]
-        return self._parse(schemas=schemas)
-
-    def get_required_fields(self) -> list:
-        '''
-        '''
-        return self._parse(required_only=True)
-
-    def get_mongo_types(self) -> dict:
-        '''
-        '''
-        return self._parse(field_info="bsonType")
-
-    def get_datetime_fields(self):
-        '''
-        '''
-        mongo_types = self.get_mongo_types()
-
-        return [k for k, v in mongo_types.items()
-                if v in ["date", "timestamp", "Date", "Timestamp"]]
-
-    def get_python_types(self) -> dict:
-        '''
-        '''
-        mongo_types = self.get_mongo_types()
-        python_types = {}
-
-        bson_to_python_types = {"double": float,
-                                "decimal": float,
-                                "string": str,
-                                "object": object,
-                                "array": list,
-                                "bool": bool,
-                                "int": int,
-                                "long": int,
-                                "date": np.dtype('<M8[ns]'),
-                                "timestamp": np.dtype('<M8[ns]')
-                                }
-
-        for k, v in mongo_types.items():
-
-            if isinstance(v, list):
-                if ("date" in v) or ("timestamp" in v):
-                    v = "date"
-                elif "string" in v:
-                    v = "string"
-                elif ("double" in v) or ("decimal" in v):
-                    v = "double"
-                elif ("null" in v) and (len(v) == 2) and ("int" not in v):
-                    v = [t for t in v if type != "null"][0]
-                else:
-                    err = "Type {0}: {1} not convertibale".format(k, v)
-                    self._log.error(err)
-                    raise Exception(err)
-
-            if v in bson_to_python_types:
-                python_types[k] = bson_to_python_types[v]
-
-        return python_types
-
-    def get_patterns(self) -> dict:
-        '''
-        '''
-        return self._parse(field_info="pattern")
-
-    def get_default_values(self) -> dict:
-        '''
-        '''
-        return self._parse(field_info="default")
-
-    def get_allowed_values(self) -> dict:
-        '''
-        '''
-        return self._parse(field_info="enum")
-
-    def get_maximum_value(self) -> dict:
-        '''
-        '''
-        return self._parse(field_info="maximum")
-
-    def get_minimum_value(self) -> dict:
-        '''
-        '''
-        return self._parse(field_info="minimum")
-
-    def get_max_items(self) -> dict:
-        '''
-        '''
-        return self._parse(field_info="maxItems")
-
-    def get_min_items(self) -> dict:
-        '''
-        '''
-        return self._parse(field_info="minItems")
-
-    def get_field_descriptions(self) -> dict:
-        '''
-        '''
-        return self._parse(field_info="description")
-
-    def _parse(self,
-               field_info: str = None,
-               required_only: bool = False,
-               schemas: list = None):
-        '''
-        '''
-        if schemas is None:
-            schemas = self.schemas
-
-        result = self._parse_one(schema=schemas[0],
-                                 field_info=field_info,
-                                 required_only=required_only)
-
-        for schema in schemas[1:]:
-
-            next_result = self._parse_one(schema=schema,
-                                          field_info=field_info,
-                                          required_only=required_only)
-
-            if isinstance(result, list):
-                result.extend(next_result)
-            else:
-                result.update(next_result)
-
-        return result
-
-    def _parse_one(self,
-                   schema: dict,
-                   field_info: str = None,
-                   required_only: bool = False,
-                   super_field_name: str = None,
-                   already_parsed: (list, dict) = None) -> (list, dict):
-        '''
-        Recursive function that returns a list of (nested) field names or
-        a dictionary of (nested) field names with field characteristics.
-
-        :param schema: if None => entire self.schema, or a sub-schema
-            of self.schema
-
-        :param field_info: optional, if provided a dictionary of field
-            names with field characteristics is returned (for examples
-            bsonType of each field), else a list of fields is returned
-
-        :param required_only: when True, only returns fields marked as
-            required in the mongo schema
-
-        :param super_field_name: needed for recursion
-            Example: the field 'article' has
-            subfields 'id' and 'supplier'.
-            If we parse the sub-document corresponding to article, then
-            super_field_name is 'article' and we might get an output like
-            {'article.id': string, 'article.supplier': string}
-
-        :param already_parsed: needed for recursion
-
-        '''
-        schema = deepcopy(schema)
-
-        assert(isinstance(schema, dict)),\
-            "Parameter 'schema' must be a dict"
-
-        if field_info is None:
-            # parse a list of fields
-            if already_parsed is None:
-                already_parsed = []
-            else:
-                assert(isinstance(already_parsed, list)),\
-                    "Parameter 'already_parsed' must be of type list"
-        else:
-            # parse a dictionary of field names with field characteristics
-            if already_parsed is None:
-                already_parsed = {}
-            else:
-                assert(isinstance(already_parsed, dict)),\
-                    "Parameter 'already_parsed' must be of type dict"
-
-        # If schema is nested, then
-        # either it is of bsonType object
-        # and the field information is stored under the key 'properties'
-        # or it is of bsonType array
-        # and the field information is stored in sub-schemas
-        # under the key 'items'
-
-        # if schema is of bsonType object
-        if "properties" in schema.keys():
-            if "required" in schema.keys():
-                required_subfields = schema["required"]
-            else:
-                required_subfields = []
-
-            for sub_field_name in schema["properties"].keys():
-
-                sub_schema = schema["properties"][sub_field_name]
-
-                # only process fields that are required
-                if required_only and\
-                        (sub_field_name not in required_subfields):
-                    pass
-                else:
-                    if super_field_name is not None:
-                        field_name = '.'.join([super_field_name,
-                                               sub_field_name])
-                    else:
-                        field_name = sub_field_name
-
-                    # if the given sub-field is nested, parse the
-                    # sub-schema corresponding to this sub-field
-                    self._parse_one(
-                            schema=sub_schema,
-                            super_field_name=field_name,
-                            field_info=field_info,
-                            already_parsed=already_parsed,
-                            required_only=required_only)
-
-        # if schema is of bsonType array
-        elif "items" in schema.keys():
-            # one schema for all items
-            if isinstance(schema["items"], dict):
-
-                sub_schema = schema["items"]
-
-                self._parse_one(schema=sub_schema,
-                                super_field_name=super_field_name,
-                                field_info=field_info,
-                                already_parsed=already_parsed,
-                                required_only=required_only)
-
-            # list of separate schemas for each item
-            elif isinstance(schema["items"], list):
-
-                for sub_schema in schema["items"]:
-                    self._parse_one(schema=sub_schema,
-                                    super_field_name=super_field_name,
-                                    field_info=field_info,
-                                    already_parsed=already_parsed,
-                                    required_only=required_only)
-            else:
-                raise Exception(('Schema is not composed correctly: '
-                                 'items must be a dictionary or a list'))
-        else:
-            # If neither properties nor items is in schema keys
-            # we reached the last level of nestedness,
-            # field information is stored in the schema keys.
-            field_name = super_field_name
-
-            if field_info is None:
-                already_parsed.append(field_name)
-            else:
-                if field_info in schema.keys():
-                    already_parsed[field_name] = schema[field_info]
-                else:
-                    pass
-
-        return already_parsed
-
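To make the dot-notation output concrete, the snippet below is a self-contained sketch that flattens a small nested schema in the same way _parse_one does for field_info="bsonType"; the schema content is invented for illustration:

# Minimal illustration of the recursive flattening; hypothetical schema.
def leaf_types(schema, prefix=None, out=None):
    out = {} if out is None else out
    if "properties" in schema:
        for name, sub in schema["properties"].items():
            full_name = name if prefix is None else prefix + "." + name
            leaf_types(sub, full_name, out)
    elif "items" in schema:
        items = schema["items"]
        for sub in (items if isinstance(items, list) else [items]):
            leaf_types(sub, prefix, out)
    elif "bsonType" in schema:
        out[prefix] = schema["bsonType"]
    return out

schema = {"properties": {
    "article": {"bsonType": "object",
                "properties": {"id": {"bsonType": "string"},
                               "supplier": {"bsonType": "string"}}}}}
print(leaf_types(schema))
# {'article.id': 'string', 'article.supplier': 'string'}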
-
-if __name__ == "__main__":
-
-    # Only for testing
-
-    schema_path = os.path.join(".", "mongo_schema", "schema_wheelsets.json")
-
-    if os.path.isfile(schema_path):
-
-        parse_obj = ParseJsonSchema(schema_paths=schema_path)
-
-        fields = parse_obj.get_fields()
-
-        required_fields = parse_obj.get_required_fields()
-
-        patterns = parse_obj.get_patterns()
-
-        mongo_types = parse_obj.get_mongo_types()
-
-        python_types_except_dates = parse_obj.get_python_types()
-
-        datetime_fields = parse_obj.get_datetime_fields()
-
-        allowed_values = parse_obj.get_allowed_values()
-
-        descriptions = parse_obj.get_field_descriptions()

+ 0 - 165
cdplib/db_migration/ParseMapping.py

@@ -1,165 +0,0 @@
-#!/usr/bin/env python3
-# -*- coding: utf-8 -*-
-"""
-Created on Fri Sep 20 15:33:17 2019
-
-@author: tanya
-"""
-
-import os
-import sys
-import numpy as np
-sys.path.append(os.getcwd())
-from cdplib.log import Log
-
-class ParseMapping:
-    '''
-    '''
-    def __init__(self, mapping_path: str, log_name: str = "ParseMapping",
-                 source: str = "original_name", target: str = "mongo_name",
-                 target_collection: str = "mongo_collection"):
-        '''
-        '''
-        import json
-
-        self._log = Log(log_name)
-
-        if not os.path.isfile(mapping_path):
-            err = "Mapping not found"
-            self._log.error(err)
-            raise FileNotFoundError(err)
-
-        try:
-            with open(mapping_path, "r") as f:
-                self._mapping = json.load(f)
-
-        except Exception as e:
-            err = ("Could not load mapping. "
-                   "Exit with error {}".format(e))
-            self._log.error(err)
-            raise Exception(err)
-
-        self._source = source
-        self._target = target
-        self._target_collection = target_collection
-
-    def get_field_mapping(self) -> dict:
-        '''
-        '''
-        assert(all([set([self._source, self._target]) <= set(d)
-                    for d in self._mapping]))
-
-        return {d[self._source]: d[self._target] for d in self._mapping}
-
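For reference, with a hypothetical mapping file of the shape below (source="original_name", target="mongo_name"), get_field_mapping() reduces to a simple projection; the field names are invented:

# Hypothetical mapping entries and the resulting source-to-target dict.
mapping = [
    {"original_name": "Radsatznummer", "mongo_name": "wheelset_id", "required": 1},
    {"original_name": "Hersteller", "mongo_name": "manufacturer", "type": "Text"},
]

field_mapping = {d["original_name"]: d["mongo_name"] for d in mapping}
print(field_mapping)
# {'Radsatznummer': 'wheelset_id', 'Hersteller': 'manufacturer'}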
-    def _get_fields_satistisfying_condition(self, key: str, value) -> list:
-        '''
-        '''
-        assert(all([self._source in d for d in self._mapping])),\
-            "Invalid from field"
-
-        return [d[self._source] for d in self._mapping
-                if (key in d) and (d[key] == value)]
-
-    def get_required_fields(self) -> list:
-        '''
-        '''
-        return self._get_fields_satistisfying_condition(key="required",
-                                                        value=1)
-
-    def get_date_fields(self) -> list:
-        '''
-        '''
-        return self._get_fields_satistisfying_condition(key="type",
-                                                        value="Date")
-
-    def get_fields_restricted_to_collecton(self, collection_name: str) -> list:
-        '''
-        '''
-        return self._get_fields_satistisfying_condition(key=self._target_collection,
-                                                        value=collection_name)
-
-    def _get_info(self, key: str, value=None) -> dict:
-        '''
-        '''
-        assert(all([self._source in d for d in self._mapping])),\
-            "Invalid from field"
-
-        return {d[self._source]: d[key] for d in self._mapping
-                if (key in d) and ((value is None)
-                or (d[key] == value))}
-
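A quick standalone check of the intended filter in _get_info(): keep entries that carry the key and, when a value is passed, also match it (the entries below are invented):

# Hypothetical entries exercising the _get_info() filter semantics.
mapping = [
    {"original_name": "built_on", "type": "Date", "date_format": "%Y-%m-%d"},
    {"original_name": "supplier", "type": "Text"},
]

def get_info(mapping, key, value=None):
    return {d["original_name"]: d[key] for d in mapping
            if (key in d) and ((value is None) or (d[key] == value))}

print(get_info(mapping, "type"))          # {'built_on': 'Date', 'supplier': 'Text'}
print(get_info(mapping, "type", "Date"))  # {'built_on': 'Date'}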
-    def get_default_values(self) -> dict:
-        '''
-        '''
-        return self._get_info(key="default_values")
-
-    def get_date_formats(self) -> dict:
-        '''
-        '''
-        return self._get_info(key="date_format")
-
-    def get_types(self) -> dict:
-        '''
-        '''
-        return self._get_info(key="type")
-
-    def get_python_types(self) -> dict:
-        '''
-        '''
-        sql_to_python_dtypes = {
-                "Text": str,
-                "Date": np.dtype('<M8[ns]'),
-                "Double": float,
-                "Integer": int
-                }
-
-        sql_types = self.get_types()
-
-        return {k: sql_to_python_dtypes[v] for k, v in sql_types.items()}
-
-    def get_value_mappings(self) -> dict:
-        '''
-        '''
-        return self._get_info(key="value_mapping")
-
-    def get_column_numbers(self) -> list:
-        '''
-        '''
-        if all(["column_number" in d for d in self._mapping]):
-            column_numbers = [d["column_number"] for d in self._mapping]
-
-        elif all(["column_number" not in d for d in self._mapping]):
-            column_numbers = list(range(len(self._mapping)))
-
-        else:
-            err = ("Incorrectly filled mapping. Column numbers should ",
-                   "either in all or in neither of the fields")
-            self.log.err(err)
-            raise Exception(err)
-
-        return column_numbers
-
-
-if __name__ == "__main__":
-
-    mapping_path = os.path.join(".", "migration_mappings", "rs0_mapping.json")
-
-    if os.path.isfile(mapping_path):
-
-        print("found mapping path")
-
-        parser = ParseMapping(mapping_path, source="internal_name",
-                              target="mongo_name")
-
-        internal_to_mongo_mapping = parser.get_field_mapping()
-
-        original_to_internal_mapping = parser.get_field_mapping()
-
-        default_values = parser.get_default_values()
-
-        types = parser.get_types()
-
-        column_numbers = parser.get_column_numbers()
-
-        print("Done testing!")

+ 0 - 8
cdplib/db_migration/Test.py

@@ -1,8 +0,0 @@
-class Test:
-
-    def __init__(self, name):
-        self.name = name
-
-    def print_name(self):
-        print(self.name)
-

+ 0 - 7
cdplib/db_migration/__init__.py

@@ -1,7 +0,0 @@
-from .Test import *
-from .DataFrameToCollection import *
-from .MigrationCleaning import *
-from .ParseDbSchema import *
-from .ParseJsonSchema import *
-from .ParseMapping import *
-

BIN
cdplib/db_migration/__pycache__/DataFrameToCollection.cpython-37.pyc


BIN
cdplib/db_migration/__pycache__/MigrationCleaning.cpython-37.pyc


BIN
cdplib/db_migration/__pycache__/ParseJsonSchema.cpython-37.pyc


BIN
cdplib/db_migration/__pycache__/ParseMapping.cpython-37.pyc


+ 1 - 1
cdplib/log.py

@@ -85,6 +85,6 @@ class Log():
     def log_and_raise_warning(self, message):
         '''
         '''
-        self._loggger.warning(message)
+        self._logger.warning(message)
 
         raise Warning(message)

+ 4 - 3
cdplib/utils/CleaningUtils.py

@@ -37,7 +37,7 @@ class CleaningUtils:
 
 
         return converted
 
-    def standarize_writing(self, s: str):
+    def standarize_writing(self, s: str, to_lowercase: bool = True):
         '''
         '''
         import re
@@ -54,9 +54,10 @@ class CleaningUtils:
         for char, correct_char in german_character_mapping.items():
             s = s.replace(char, correct_char)
 
-        s = s.lower()
+        if to_lowercase:
+            s = s.lower()
 
-        s = re.sub('[^0-9a-zA-Z]+', '_', s)
+        s = re.sub('[^0-9a-zA-Z]+', '_', s).lstrip("_").rstrip("_")
 
         return s
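The behavioural effect of the two changes above can be seen with a short standalone sketch that reproduces only the changed post-processing (not the full method, which also applies the German-character mapping); the input strings are invented:

# Sketch of the new to_lowercase flag and the underscore stripping.
import re

def tidy(s, to_lowercase=True):
    if to_lowercase:
        s = s.lower()
    return re.sub('[^0-9a-zA-Z]+', '_', s).lstrip("_").rstrip("_")

print(tidy(" Druck-Ventil (neu) "))                # 'druck_ventil_neu'
print(tidy(" Druck-Ventil ", to_lowercase=False))  # 'Druck_Ventil'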
 
 

+ 5 - 2
cdplib/utils/ExceptionsHandler.py

@@ -20,12 +20,15 @@ class ExceptionsHandler:
         '''
         '''
 
-    def check_is_file(self, path):
+    def check_is_file(self, path, logger=None):
         '''
         '''
+        if logger is None:
+            logger = logging.getLogger()
+
         if not os.path.isfile(path):
             err = "File {} not found".format(path)
-            self._log.error(err)
+            logger.error(err)
             raise FileNotFoundError(err)
 
     def _check_column_abscence(self, columns: (str, list), data: pd.DataFrame,
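A minimal usage sketch of the new optional logger argument; it assumes ExceptionsHandler can be constructed without arguments, that the module is importable under the path shown, and that `import logging` is present at the top of ExceptionsHandler.py:

# Hypothetical usage of check_is_file() with an injected logger.
import logging
from cdplib.utils.ExceptionsHandler import ExceptionsHandler

logging.basicConfig(level=logging.ERROR)
handler = ExceptionsHandler()

try:
    handler.check_is_file("./migration_mappings/rs0_mapping.json",
                          logger=logging.getLogger("migration"))
except FileNotFoundError as err:
    print(err)  # "File ./migration_mappings/rs0_mapping.json not found"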

BIN
cdplib/utils/__pycache__/ClassLogging.cpython-37.pyc


BIN
cdplib/utils/__pycache__/CleaningUtils.cpython-37.pyc


BIN
cdplib/utils/__pycache__/ExceptionsHandler.cpython-37.pyc