
added current version of libraries

tanja 4 years ago
parent
commit
4dc75c35a3

+ 63 - 0
cdplib/ExceptionsHandler.py

@@ -0,0 +1,63 @@
+#!/usr/bin/env python3
+# -*- coding: utf-8 -*-
+"""
+Created on Fri Sep 27 14:20:58 2019
+
+@author: tanya
+"""
+
+import os
+import sys
+import logging
+import pandas as pd
+sys.path.append(os.getcwd())
+
+
+class ExceptionsHandler:
+    '''
+    '''
+    def __init__(self):
+        '''
+        '''
+        # used by check_is_file below; defaults to the root logger
+        self._log = logging.getLogger()
+
+    def check_is_file(self, path):
+        '''
+        '''
+        if not os.path.isfile(path):
+            err = "File {} not found".format(path)
+            self._log.error(err)
+            raise FileNotFoundError(err)
+
+    def _check_column_abscence(self, columns: (str, list), data: pd.DataFrame,
+                               error_or_warning: str, logger = None):
+        '''
+        '''
+        if logger is None:
+            logger = logging.getLogger()
+        if isinstance(columns, str):
+            columns = [columns]
+
+        for column in columns:
+
+            if column not in data.columns:
+                err = ("{} is not an internal column name".format(column))
+                getattr(logger, error_or_warning)(err)
+
+                if error_or_warning == "error":
+                    raise Exception(err)
+
+    def error_column_abscence(self, columns: (str, list), data: pd.DataFrame, logger = None):
+        '''
+        '''
+        return self._check_column_abscence(columns=columns,
+                                           data=data,
+                                           error_or_warning="error",
+                                           logger=logger)
+
+    def warn_column_abscence(self, columns: (str, list), data: pd.DataFrame, logger = None):
+        '''
+        '''
+        return self._check_column_abscence(columns=columns,
+                                           data=data,
+                                           error_or_warning="warning",
+                                           logger=logger)
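A minimal usage sketch of the handler above, assuming the import path cdplib.ExceptionsHandler from this commit is on the Python path; the column names are made up for the example:

import logging
import pandas as pd

from cdplib.ExceptionsHandler import ExceptionsHandler

logging.basicConfig(level=logging.INFO)

df = pd.DataFrame({"radsatznummer": ["w1", "w2"], "status": [0, 1]})
handler = ExceptionsHandler()

# logs a warning via the root logger, but does not raise
handler.warn_column_abscence(columns="datum", data=df)

# logs an error and raises, because "datum" is not a column of df
handler.error_column_abscence(columns=["datum", "status"], data=df)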

+ 0 - 207
cdplib/configuration.py

@@ -1,207 +0,0 @@
-"""
-@author: Juegen Pannosch (welser project), modified by Tanja Zolotareva
-
-@description: Here we define a data-structure that contains arguments
-used throughout the project. Arguments (like data locations) that can differ
-from person to person are loaded from the ./.config file, arguments that should
-be the same for everyone are defined directly in the data structure. All
-changes in this script should be committed to git.
-"""
-
-# -*- coding: utf-8 -*-
-import os
-import configparser
-
-
-class Configuration:
-
-    def __init__(self,
-                 config_file: str = os.path.join(os.getcwd(), ".env")):
-        '''
-        '''
-        assert isinstance(config_file, str), "the config_file must be a string"
-
-        assert os.path.isfile(config_file), "config file was not found"
-
-        self._parse_ini_file(config_file)
-
-    def __getitem__(self, item):
-        '''
-        '''
-        if item in self._config:
-            return self._config[item]
-        else:
-            return None
-
-    def _parse_ini_file(self, config_file: str):
-        '''
-        '''
-        self._config = dict()
-
-        config = configparser.ConfigParser()
-        config.read(config_file)
-
-        for key in config:
-            self._config[key] = {}
-            sub_config = config[key]
-
-            for sub_key in sub_config:
-                name = sub_key.upper()
-                value = sub_config[sub_key]
-
-                self._config[key][name] = value if (value != '') else None
-
-    @property
-    def labeled_history_folder(self):
-        '''
-        '''
-        return os.path.join(self._config["LOCATIONS"]["DATA_DIR"],
-                            "Aufarbeitungsdaten/2018/Datenextrakt einsatzfähige Radsätze 2018")
-
-    @property
-    def unlabeled_history_yearly_folders(self):
-        '''
-        '''
-        folders = []
-
-        for year in ["2016", "2017", "2018"]:
-
-            folders.append(os.path.join(self._config["LOCATIONS"]["DATA_DIR"],
-                                        "Aufarbeitungsdaten",
-                                        year,
-                                        "Datenextrakt alle Radsätze {} ausgehend von der Station 110").format(year))
-
-        return folders
-
-    @property
-    def additional_data_folder(self):
-        '''
-        '''
-        return os.path.join(self._config["LOCATIONS"]["DATA_DIR"],
-                            "Info-Austausch")
-
-    @property
-    def columns_rs516(self):
-        '''
-        '''
-        return {0: "radsatznummer",
-                1: "positionsnummer",
-                2: "status",
-                3: "taetigkeitsname",
-                4: "datum",
-                5: "presskrafdiagram_min",
-                6: "presskrafdiagram_max",
-                7: "presskrafdiagram_wert"}
-
-    @property
-    def ihs_labels(self):
-        '''
-        For analysis we replace the string IHS with an integer value,
-        which can be useful for comparing the IHS of two wheelsets
-        '''
-
-        ihs_labels = {"null": -1,
-                      "IS1": 0,
-                      "IS1L": 1,
-                      "IS2": 2,
-                      "IS3": 3}
-
-        return ihs_labels
-
-    @property
-    def schrott_schadcodes(self):
-        '''
-        If during the process one of the following schadcodes is assigned,
-         then the wheelset is scrap and is removed from the process.
-         This should correspond to aufarbeitungstyp = 2 in rs0, but if there
-         was a delay (or a mistake) in the maintenance of the table
-         rs0, this might not be the case. Count it as scrap anyway.
-        '''
-        schadcodes_schrott = ["RSAUS"]
-
-        return schadcodes_schrott
-
-    @property
-    def schrott_taetigkeiten(self):
-        '''
-        If during the process one of the following tätigkeiten is assigned,
-         then the wheelset is scrap and is removed from the process.
-         This should correspond to aufarbeitungstyp = 2 in rs0 and (or)
-         to assignment of a corresponding schadcode. Data might contain
-         inconsistencies. If such an activity is assigned, count as scrap.
-        '''
-        taetigkeiten_schrott = ["RADSATZ AUSSCHEIDEN"]
-
-        return taetigkeiten_schrott
-
-    @property
-    def status_labels(self):
-        '''
-        Used to standardize the column "Status" in the table rs1,
-         mapping it to integer values convenient for analysis
-        '''
-        status_labels = {"Scheiden": 2,
-                         "Schlecht": 1,
-                         "Fertig": 0,
-                         "Gut": 0}
-
-        return status_labels
-
-    @property
-    def process_stages(self):
-        '''
-        For machine learning predictions we divide the process into
-         big stages; stages can be skipped depending on the IHS of the
-         wheelset. We use all information gathered during the previous
-         process stages to make predictions for the next stage.
-        '''
-        import networkx as nx
-
-        critical_stations = {"A": [421, 110]}
-
-        critical_stations["B"] = [130, 131]
-
-        critical_stations["C"] = [140, 141, 142, 150]
-
-        critical_stations["D"] = [410, 420]
-
-        critical_stations["E"] = [510, 511, 520, 521, 535,
-                                  530, 531, 516, 550]
-
-        critical_stations["F"] = [490, 480, 430, 170]
-
-        critical_stations["G"] = [595, 190, 630]
-
-        critical_stations["H"] = [640, 641]
-
-        critical_stations["I"] = [650, 560]
-
-        critical_stations["J"] = [675]
-
-        critical_stations["K"] = [690]
-
-        critical_stations["L"] = [710, 670]
-
-        stages_graph = nx.DiGraph()
-
-        for stage in critical_stations:
-            stages_graph.add_node(stage, stations=critical_stations[stage])
-
-        stages_graph.add_edge("A", "B")
-        stages_graph.add_edge("B", "C")
-        stages_graph.add_edge("C", "D")
-        stages_graph.add_edge("D", "E")
-        stages_graph.add_edge("D", "F")
-        stages_graph.add_edge("E", "G")
-        stages_graph.add_edge("F", "G")
-        stages_graph.add_edge("G", "H")
-        stages_graph.add_edge("H", "I")
-        stages_graph.add_edge("I", "J")
-        stages_graph.add_edge("J", "K")
-        stages_graph.add_edge("K", "L")
-
-        return stages_graph
-
-
-# singleton
-default = Configuration()
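For reference, the deleted Configuration class was a thin wrapper around the standard-library configparser: every section of the .env file becomes a dictionary with upper-cased keys, and empty values are mapped to None. A small self-contained sketch of that parsing pattern; the sample section contents below are placeholders, not the real project configuration:

import configparser

SAMPLE_ENV = """
[LOCATIONS]
DATA_DIR = /data/oebb

[MONGO]
MONGO_USER =
"""

config = configparser.ConfigParser()
config.read_string(SAMPLE_ENV)

parsed = {}
for section in config:
    parsed[section] = {}
    for key in config[section]:
        value = config[section][key]
        # empty strings are treated as "not set", mirroring the removed _parse_ini_file
        parsed[section][key.upper()] = value if value != '' else None

print(parsed["LOCATIONS"]["DATA_DIR"])   # /data/oebb
print(parsed["MONGO"]["MONGO_USER"])     # None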

+ 63 - 0
cdplib/data_cleaning/DataCleaningUtils.py

@@ -0,0 +1,63 @@
+#!/usr/bin/env python3
+# -*- coding: utf-8 -*-
+"""
+Created on Fri Sep 27 16:20:03 2019
+
+@author: tanya
+"""
+
+import pandas as pd
+import numpy as np
+
+
+class CleaningUtils:
+    '''
+    '''
+    @staticmethod
+    def convert_dates(series: pd.Series, formats: (str, list)) -> pd.Series:
+        '''
+        Convert date strings to datetimes, trying the given format(s) in order.
+        '''
+        # wrap a single format string so list() does not split it into characters
+        if isinstance(formats, str):
+            formats = [formats]
+        formats = list(formats)
+
+        converted = pd.Series([pd.to_datetime(np.nan)] * len(series),
+                              index=series.index)
+
+        for formt in formats:
+            if formt == "%d%m%Y":
+                missing_leading_zero = (series.astype(str).str.len() == 7)
+
+                series = series.astype(str)
+
+                series.loc[missing_leading_zero] = "0" +\
+                    series.loc[missing_leading_zero]
+
+            converted_this_format = pd.to_datetime(series,
+                                                   format=formt,
+                                                   errors="coerce")
+
+            converted.fillna(converted_this_format, inplace=True)
+
+        return converted
+
+    def standarize_writing(self, s: str, to_lowercase: bool = True):
+        '''
+        '''
+        import re
+
+        german_character_mapping = {"ß": "ss",
+                                    "ü": "ue",
+                                    "Ü": "Ue",
+                                    "ä": "ae",
+                                    "Ä": "Ae",
+                                    "ö": "oe",
+                                    "Ö": "Oe"}
+
+        s = s.encode('raw_unicode_escape').decode('raw_unicode_escape')
+        for char, correct_char in german_character_mapping.items():
+            s = s.replace(char, correct_char)
+
+        if to_lowercase:
+            s = s.lower()
+
+        s = re.sub('[^0-9a-zA-Z]+', '_', s).lstrip("_").rstrip("_")
+
+        return s
+
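A short usage sketch of the two helpers above, assuming the import path cdplib.data_cleaning.DataCleaningUtils from this commit; the date values are made up, and the formats are passed as a list:

import pandas as pd

from cdplib.data_cleaning.DataCleaningUtils import CleaningUtils

dates = pd.Series(["31122018", "1012019", "2019-02-03"])
# "1012019" gets its missing leading zero restored before parsing with %d%m%Y
parsed = CleaningUtils.convert_dates(dates, ["%d%m%Y", "%Y-%m-%d"])
print(parsed.tolist())   # 2018-12-31, 2019-01-01, 2019-02-03

# umlauts are transliterated before non-alphanumeric characters become underscores
print(CleaningUtils().standarize_writing("Radsätze Prüfung"))   # radsaetze_pruefung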

+ 111 - 12
cdplib/db_handlers/MongodbHandler.py

@@ -21,34 +21,81 @@ import pandas as pd
 import numpy as np
 import numpy as np
 
 
 sys.path.append(os.getcwd())
 sys.path.append(os.getcwd())
-from cdplib.log import Log
-from cdplib.configuration import default as cfg
+from libraries.log import Log
+from libraries.configuration import default as cfg
+from libraries.Singleton_Threadsafe import SingletonThreadsafe
 
 
-class MongodbHandler:
 
 
+class MongodbHandlerPool(metaclass=SingletonThreadsafe):
+    '''
     '''
     '''
 
 
+    def __init__(self, size: int = 10):
+        self._log = Log("MongodbHandlerPool")
+        self._size = size
+        self._mongodb_handlers = [MongodbHandler() for _ in range(size)]
+
+    def aquire(self):
+        while not self._mongodb_handlers:
+            self._mongodb_handlers = [MongodbHandler() for _ in range(self._size)]
+            self._log.warning("Ran out of Mongodb handlers, {} more have been added. Are you sure you've returned yours?".format(self._size))
+        return self._mongodb_handlers.pop()
+        
+    def release(self, mongodb_handler):
+        if len(self._mongodb_handlers) < self._size:
+            self._mongodb_handlers.append(mongodb_handler)
+
+
+class MongodbHandler:
+
     '''
     '''
 
 
-    def __init__(self, database_url: str = cfg['MONGO_DB']['URI'],
-                 database_name: str = cfg['MONGO_DB']['DATABASE_NAME']):
+    '''
+    pass
+    def __init__(self, database_url: str = None,
+                 database_name: str = None):
         '''
         '''
         :param str database_url: Url for the mongodb database
         :param str database_url: Url for the mongodb database
         :param str database_name: Name of the database the database handler should handle
         :param str database_name: Name of the database the database handler should handle
         '''
         '''
+        if database_url is None:
+
+            database_url = "mongodb://{0}:{1}@{2}:{3}"\
+                           .format(cfg["MONGO"]["MONGO_USER"],
+                                   cfg["MONGO"]["MONGO_PASSWORD"],
+                                   cfg["MONGO"]["MONGO_HOST"],
+                                   cfg["MONGO"]["MONGO_PORT"])
+
+        if database_name is None:
+
+            database_name = cfg["MONGO"]["MONGO_DATABASE_NAME"]
+
         assert(isinstance(database_url, str)),\
         assert(isinstance(database_url, str)),\
             "Parameter 'database_url' must be a string type"
             "Parameter 'database_url' must be a string type"
         assert(isinstance(database_name, str)),\
         assert(isinstance(database_name, str)),\
             "Parameter 'database_name' must be a string type"
             "Parameter 'database_name' must be a string type"
 
 
-        self._log = Log("\nMongodbHandler script")
+        self._log = Log("Mongodb Handler")
 
 
-        self._log.info('Mongodb Handler has been initialized')
         # Connect to the MongoDB
         # Connect to the MongoDB
         self._client = MongoClient(database_url)
         self._client = MongoClient(database_url)
         # Connect to the oebb_db database, or create it if it doesnt exist.
         # Connect to the oebb_db database, or create it if it doesnt exist.
         self._database = self._client[database_name]
         self._database = self._client[database_name]
 
 
+        self._database_name = database_name
+
+    def set_database(self, database_name):
+        self._database = self._client[database_name]
+
+    def drop_database(self):
+        '''
+        '''
+        self._client.drop_database(self._database_name)
+
+    def drop_collection(self, collection_name: str):
+        '''
+        '''
+        self._database[collection_name].drop()
+
     def _read_schema(self, schema_path: str) -> dict:
     def _read_schema(self, schema_path: str) -> dict:
         '''
         '''
         :param str schema_path: path to the schema file.
         :param str schema_path: path to the schema file.
@@ -60,11 +107,29 @@ class MongodbHandler:
         with open(schema_path) as json_file:
         with open(schema_path) as json_file:
             schema = json.load(json_file)
             schema = json.load(json_file)
 
 
-        if 'definitions' in schema:
+        definitions_flag = self._analyze_schema(schema)
+
+        if definitions_flag:
             schema = self._dereference_schema(schema)
             schema = self._dereference_schema(schema)
 
 
         return schema
         return schema
 
 
+    def _analyze_schema(self, schema: dict, definitions_flag: bool = False) -> dict:
+
+
+        for key in schema:
+            if key == 'definitions':
+                definitions_flag = True
+                return definitions_flag
+
+            if key == 'default' or key == 'default_values':
+                return self._remove_defaults(schema)
+
+            if type(schema[key]) == dict:
+                definitions_flag = self._analyze_schema(schema[key], definitions_flag)
+
+        return definitions_flag
+
     def _dereference_schema(self, schema: dict) -> dict:
     def _dereference_schema(self, schema: dict) -> dict:
         '''
         '''
         :param dict schema: dictionary containing a schema which uses references.
         :param dict schema: dictionary containing a schema which uses references.
@@ -78,6 +143,20 @@ class MongodbHandler:
         schema.pop('definitions', None)
         schema.pop('definitions', None)
         return schema
         return schema
 
 
+    def _remove_defaults(self, schema: dict) -> dict:
+        '''
+        :param dict schema: dictionary containing a schema whose default entries should be removed.
+        '''
+        assert(isinstance(schema, dict)),\
+            "Parameter 'schema' must be a dictionary type"
+
+        if 'default' in schema:
+            del schema['default']
+        if 'default_values' in schema:
+            del schema['default_values']
+        return schema
+
     def set_collection_schema(self, collection_name: str, schema_path: str,
     def set_collection_schema(self, collection_name: str, schema_path: str,
                               validation_level: str = 'moderate',validation_action: str = 'error'):
                               validation_level: str = 'moderate',validation_action: str = 'error'):
         '''
         '''
@@ -152,6 +231,8 @@ class MongodbHandler:
 
 
             if isinstance(data, pd.DataFrame) and (len(data) == 1):
             if isinstance(data, pd.DataFrame) and (len(data) == 1):
                 data = data.iloc[0]
                 data = data.iloc[0]
+            elif type(data) is list:
+                data = data[0]
 
 
             self._database[collection_name].insert_one(data)
             self._database[collection_name].insert_one(data)
         else:
         else:
@@ -177,14 +258,32 @@ class MongodbHandler:
         '''
         '''
 
 
         '''
         '''
-        if attribute is None or attribute_value is None:
+        if attribute == None or attribute_value == None:
             data = self._database[collection_name].find()
             data = self._database[collection_name].find()
         else:
         else:
             data = self._database[collection_name].find({attribute: {comparison_operator: attribute_value}})
             data = self._database[collection_name].find({attribute: {comparison_operator: attribute_value}})
 
 
-        df = pd.DataFrame(list(data))
-        df.set_index('radsatznummer', inplace=True)
-        return df
+        if data.count() > 0:
+            df = pd.DataFrame(list(data))
+            df.set_index('radsatznummer', inplace=True)
+            return df
+        else:
+            self._log.warning("No data for the query was found")
+
+    def aggregate_data_and_generate_dataframe(self, collection_name: str, aggregation_pipeline: list):
+
+        data = list(self._database[collection_name].aggregate(pipeline=aggregation_pipeline, allowDiskUse=True))
+
+        if len(data) > 0:
+            df = pd.DataFrame(data)
+            df.set_index('radsatznummer', inplace=True)
+            return df
+        else:
+            self._log.warning("No data for the query was found")
+
+
+    def update_data_in_collection(self, query_label: str, query_value: str, update_label:str, update_value: str, collection_name:str):
+        self._database[collection_name].update_one({query_label:query_value}, {"$set": {update_label: update_value}})
 
 
 
 
 if __name__ == "__main__":
 if __name__ == "__main__":
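The pool added above is meant to be shared between threads (via the SingletonThreadsafe metaclass) and used in an acquire/release pattern around the MongodbHandler methods. A minimal sketch, assuming the import path from this commit, a MongoDB instance reachable with the [MONGO] settings of the .env file, and an illustrative collection name:

from cdplib.db_handlers.MongodbHandler import MongodbHandlerPool

pool = MongodbHandlerPool(size=5)

mongodb = pool.aquire()   # note: the pool method is spelled "aquire" in this commit
try:
    pipeline = [{"$match": {"status": 1}}]
    df = mongodb.aggregate_data_and_generate_dataframe("rs1", pipeline)
finally:
    pool.release(mongodb)  # always hand the handler back so the pool does not run dry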

+ 47 - 14
cdplib/db_handlers/SQLHandler.py

@@ -15,7 +15,25 @@ import pandas as pd
 import warnings
 import warnings
 
 
 sys.path.append(os.getcwd())
 sys.path.append(os.getcwd())
+from libraries.Singleton_Threadsafe import SingletonThreadsafe
 
 
+class SQLHandlerPool(metaclass=SingletonThreadsafe):
+    '''
+    '''
+
+    def __init__(self, size: int = 10):
+        self._log = Log(name='SQLHandlerPool')
+        self._size = size
+        self._sql_handlers = [SQLHandler() for _ in range(size)]
+
+    def aquire(self):
+        while not self._sql_handlers:
+            self._sql_handlers = [SQLHandler() for _ in range(self._size)]
+            self._log.warning("Ran out of SQL handlers, {} more have been added. Are you sure you've returned yours?".format(self._size))
+        return self._sql_handlers.pop()
+
+    def release(self, sql_handler):
+        if len(self._sql_handlers) < self._size:
+            self._sql_handlers.append(sql_handler)
 
 
 class SQLHandler:
 class SQLHandler:
     '''
     '''
@@ -25,7 +43,7 @@ class SQLHandler:
     closing of a database connection,
     closing of a database connection,
      this avoids errors when parallelizing with multiprocessing.
      this avoids errors when parallelizing with multiprocessing.
     '''
     '''
-
+    pass
     def __init__(self, db_uri: str = None,
     def __init__(self, db_uri: str = None,
                  is_case_insensitive: bool = False):
                  is_case_insensitive: bool = False):
         '''
         '''
@@ -45,7 +63,13 @@ class SQLHandler:
         self._log = Log(name='SQLHandler')
         self._log = Log(name='SQLHandler')
 
 
         if db_uri is None:
         if db_uri is None:
-            db_uri = cfg["SQL_DB"]["URI"]
+
+            db_uri = "mysql+pymysql://{0}:{1}@{2}:{3}/{4}?charset=utf8&local_infile=1"\
+                     .format(cfg["SQL"]["SQL_USER"],
+                             cfg["SQL"]["SQL_PASSWORD"],
+                             cfg["SQL"]["SQL_HOST"],
+                             cfg["SQL"]["SQL_PORT"],
+                             cfg["SQL"]["SQL_DATABASE_NAME"])
 
 
         assert(isinstance(db_uri, str)),\
         assert(isinstance(db_uri, str)),\
             "Parameter 'db_uri' must be of type str"
             "Parameter 'db_uri' must be of type str"
@@ -76,7 +100,7 @@ class SQLHandler:
 
 
         self._is_case_insensitive = is_case_insensitive
         self._is_case_insensitive = is_case_insensitive
 
 
-        self._engine = sqlalchemy.create_engine(self._db_uri)
+        self._engine = engine
 
 
     @property
     @property
     def _connection_params(self) -> dict:
     def _connection_params(self) -> dict:
@@ -117,7 +141,9 @@ class SQLHandler:
         '''
         '''
         database = self._connection_params["db"]
         database = self._connection_params["db"]
         self.execute("DROP DATABASE IF EXISTS {}".format(database))
         self.execute("DROP DATABASE IF EXISTS {}".format(database))
-
+        self._engine.execute("CREATE DATABASE {}".format(database))
+        self._engine.execute("USE {}".format(database))
+        
     @property
     @property
     def _db_metadata(self) -> dict:
     def _db_metadata(self) -> dict:
         '''
         '''
@@ -177,7 +203,7 @@ class SQLHandler:
         '''
         '''
         connection = self._engine.connect()
         connection = self._engine.connect()
         transaction = connection.begin()
         transaction = connection.begin()
-
+    
         errors = []
         errors = []
 
 
         # in the case of multi-query execute each query
         # in the case of multi-query execute each query
@@ -242,7 +268,7 @@ class SQLHandler:
 
 
     def check_if_table_exists(self, tablename: str,
     def check_if_table_exists(self, tablename: str,
                               schema: str = None,
                               schema: str = None,
-                              query: str = None):
+                              query: str = None) -> bool:
         '''
         '''
         Tries to retrieve table information from database with given query.
         Tries to retrieve table information from database with given query.
         If this does not work, tries to select one row from the given table,
         If this does not work, tries to select one row from the given table,
@@ -478,7 +504,7 @@ class SQLHandler:
             data = pd.read_sql(sql=query,
             data = pd.read_sql(sql=query,
                                con=connection,
                                con=connection,
                                **read_sql_kwargs)
                                **read_sql_kwargs)
-
+            #self._engine.dispose()
             connection.close()
             connection.close()
             return data
             return data
 
 
@@ -528,16 +554,24 @@ class SQLHandler:
         try:
         try:
             connection = self._engine.connect()
             connection = self._engine.connect()
 
 
-            data.to_sql(name=tablename,
-                        schema=schema,
-                        con=connection,
-                        if_exists='append',
-                        **to_sql_kwargs)
+            if self.check_if_table_exists(tablename=tablename, schema=schema):
+
+                data.to_sql(name=tablename,
+                            schema=schema,
+                            con=connection,
+                            if_exists='append',
+                            **to_sql_kwargs)
+            else:
+
+                self.overwrite_table(data=data,
+                                     tablename=tablename,
+                                     schema=schema,
+                                     to_sql_kwargs=to_sql_kwargs)
 
 
             connection.close()
             connection.close()
 
 
         except Exception as e:
         except Exception as e:
-            err = ("Could append data to the table {0}. "
+            err = ("Could not append data to the table {0}. "
                    "Finished with error {1}").format(tablename, e)
                    "Finished with error {1}").format(tablename, e)
 
 
             self._log.error(err)
             self._log.error(err)
@@ -566,7 +600,6 @@ class SQLHandler:
                         con=connection,
                         con=connection,
                         if_exists='replace',
                         if_exists='replace',
                         **to_sql_kwargs)
                         **to_sql_kwargs)
-
             connection.close()
             connection.close()
 
 
         except Exception as e:
         except Exception as e:
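Both database handlers now build their connection URIs from individual configuration keys instead of a single URI entry. The sketch below shows which keys are expected and what the resulting URIs look like; cfg is a plain dict standing in for the project's configuration object, and the credentials, host and database names are placeholders:

# key names follow the diff above, the values are placeholders
cfg = {
    "MONGO": {"MONGO_USER": "someuser", "MONGO_PASSWORD": "somepassword",
              "MONGO_HOST": "localhost", "MONGO_PORT": 27017,
              "MONGO_DATABASE_NAME": "oebb_db"},
    "SQL": {"SQL_USER": "someuser", "SQL_PASSWORD": "somepassword",
            "SQL_HOST": "localhost", "SQL_PORT": 3306,
            "SQL_DATABASE_NAME": "oebb"},
}

mongo_uri = "mongodb://{0}:{1}@{2}:{3}".format(cfg["MONGO"]["MONGO_USER"],
                                               cfg["MONGO"]["MONGO_PASSWORD"],
                                               cfg["MONGO"]["MONGO_HOST"],
                                               cfg["MONGO"]["MONGO_PORT"])

sql_uri = ("mysql+pymysql://{0}:{1}@{2}:{3}/{4}?charset=utf8&local_infile=1"
           .format(cfg["SQL"]["SQL_USER"],
                   cfg["SQL"]["SQL_PASSWORD"],
                   cfg["SQL"]["SQL_HOST"],
                   cfg["SQL"]["SQL_PORT"],
                   cfg["SQL"]["SQL_DATABASE_NAME"]))

print(mongo_uri)  # mongodb://someuser:somepassword@localhost:27017
print(sql_uri)    # mysql+pymysql://someuser:somepassword@localhost:3306/oebb?charset=utf8&local_infile=1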

BIN
cdplib/db_handlers/__pycache__/MongodbHandler.cpython-37.pyc


BIN
cdplib/db_handlers/__pycache__/SQLHandler.cpython-37.pyc


+ 98 - 54
cdplib/db_migration/DataFrameToCollection.py

@@ -10,16 +10,17 @@ Created on Mon Jul 22 11:05:47 2019
 """
 """
 
 
 import pandas as pd
 import pandas as pd
+import numpy as np
 import os
 import os
 import sys
 import sys
 
 
 sys.path.append(os.getcwd())
 sys.path.append(os.getcwd())
 
 
 
 
-class DataFrameToCollection:
+class DataFrameToCollection():
     '''
     '''
     '''
     '''
-    def __init__(self, schema_path: str = None, log_path: str = None):
+    def __init__(self, schema_path: str):
         '''
         '''
         '''
         '''
         from libraries.log import Log
         from libraries.log import Log
@@ -27,32 +28,29 @@ class DataFrameToCollection:
 
 
         self._log = Log("ParseJsonSchema")
         self._log = Log("ParseJsonSchema")
 
 
-        if schema_path is not None:
 
 
-            if not os.path.isfile(schema_path):
-                err = "JsonSchema not found"
-                self._log.error(err)
-                raise FileNotFoundError(err)
+        if not os.path.isfile(schema_path):
+            err = "JsonSchema not found"
+            self._log.error(err)
+            raise FileNotFoundError(err)
 
 
-            # load schema to dictionary if it is a valid json file
-            try:
-                with open(schema_path, "r") as f:
-                    self.schema = json.load(f)
+        # load schema to dictionary if it is a valid json file
+        try:
+            with open(schema_path, "r") as f:
+                self.schema = json.load(f)
 
 
-            except Exception as e:
-                err = ("Could not load json schema, "
-                       "Obtained error {}".format(e))
+        except Exception as e:
+            err = ("Could not load json schema, "
+                   "Obtained error {}".format(e))
 
 
-                self._log.error(err)
-                raise Exception(err)
+            self._log.error(err)
+            raise Exception(err)
 
 
-        else:
-            self.schema = None
 
 
     def to_list_of_documents(self, data: pd.DataFrame,
     def to_list_of_documents(self, data: pd.DataFrame,
                              grp_fields: list,
                              grp_fields: list,
                              schema: dict = None,
                              schema: dict = None,
-                             _return_data: bool = False) -> list:
+                             _final_step: bool = True) -> list:
         '''
         '''
         Reshapes a pandas dataframe to a list of documents according
         Reshapes a pandas dataframe to a list of documents according
          to a complex (json) mongodb schema
          to a complex (json) mongodb schema
@@ -63,11 +61,14 @@ class DataFrameToCollection:
 
 
          Remark2: if the schema is stored as a json file, first load it
          Remark2: if the schema is stored as a json file, first load it
          to a dictionary with the help of the python json module
          to a dictionary with the help of the python json module
+
+         The function goes recursively through all the fields and reshapes
+         them depending on whether the field is an array, an object, or a
+         simple field. For each field the data is grouped by the grp_fields
+         and reshaped accordingly; the result is a pandas Series. In the end
+         all the series are collected and concatenated.
         '''
         '''
         from copy import deepcopy
         from copy import deepcopy
-        from libraries.log import Log
-
-        log = Log("reshape_dataframe_to_list_of_documents:")
 
 
         data = self._melt_duplicated_columns(data)
         data = self._melt_duplicated_columns(data)
 
 
@@ -88,18 +89,18 @@ class DataFrameToCollection:
 
 
                 grp_fields = [c for c in grp_fields if c in data.columns]
                 grp_fields = [c for c in grp_fields if c in data.columns]
 
 
-                n_distinct_values = data.groupby(grp_fields)[field].nunique()\
-                                        .max()
+                # check that there is only one possible value of this field
+                n_distinct_values = data.groupby(grp_fields, sort=False)[field].nunique().max()
 
 
                 if n_distinct_values != 1:
                 if n_distinct_values != 1:
                     err = "Field {0} is not unique with respect to {1}"\
                     err = "Field {0} is not unique with respect to {1}"\
                           .format(field, grp_fields)
                           .format(field, grp_fields)
 
 
-                    log.error(err)
+                    self._log.error(err)
                     raise Exception(err)
                     raise Exception(err)
 
 
                 if field not in grp_fields:
                 if field not in grp_fields:
-                    reshaped_field = data.groupby(grp_fields)[field].first()
+                    reshaped_field = data.groupby(grp_fields, sort=False)[field].first()
                 else:
                 else:
                     reshaped_field =\
                     reshaped_field =\
                         data[grp_fields].drop_duplicates()\
                         data[grp_fields].drop_duplicates()\
@@ -121,7 +122,7 @@ class DataFrameToCollection:
                             data=data,
                             data=data,
                             schema=sub_schema,
                             schema=sub_schema,
                             grp_fields=grp_fields,
                             grp_fields=grp_fields,
-                            _return_data=True)
+                            _final_step=False)
 
 
                 reshaped_field = sub_data.apply(self._make_dict, axis=1)
                 reshaped_field = sub_data.apply(self._make_dict, axis=1)
                 reshaped_field.name = field
                 reshaped_field.name = field
@@ -143,17 +144,16 @@ class DataFrameToCollection:
                          sub_schema["properties"].items()}
                          sub_schema["properties"].items()}
 
 
                     # extend grp fields by sub-fields of field simple types
                     # extend grp fields by sub-fields of field simple types
-                    sub_grp_fields =\
-                        [f for f in sub_schema["properties"]
-                         if sub_schema["properties"][f]["bsonType"]
-                         not in ["array", "object"]]
+                    sub_grp_fields = [f for f in sub_schema["properties"]
+                                      if (sub_schema["properties"][f]["bsonType"] not in ["array", "object"])
+                                      and (f in data.columns)]
 
 
                     if len(sub_grp_fields) == 0:
                     if len(sub_grp_fields) == 0:
                         err = ("One of the sub-keys in a list of documents"
                         err = ("One of the sub-keys in a list of documents"
                                " must be of simple type for the field {}"
                                " must be of simple type for the field {}"
                                .format(field))
                                .format(field))
 
 
-                        log.error(err)
+                        self._log.error(err)
                         raise Exception(err)
                         raise Exception(err)
 
 
                     # group and reshape sub-fields with complex types
                     # group and reshape sub-fields with complex types
@@ -161,7 +161,7 @@ class DataFrameToCollection:
                                 data=data,
                                 data=data,
                                 schema=sub_schema,
                                 schema=sub_schema,
                                 grp_fields=grp_fields + sub_grp_fields,
                                 grp_fields=grp_fields + sub_grp_fields,
-                                _return_data=True)
+                                _final_step=False)
 
 
                     if sub_data is not None:
                     if sub_data is not None:
 
 
@@ -172,61 +172,86 @@ class DataFrameToCollection:
                         sub_data = sub_data.reset_index(grp_fields)
                         sub_data = sub_data.reset_index(grp_fields)
 
 
                         reshaped_field =\
                         reshaped_field =\
-                            sub_data.groupby(grp_fields)[field]\
+                            sub_data.groupby(grp_fields, sort=False)[field]\
                                     .apply(self._make_list_of_distinct)
                                     .apply(self._make_list_of_distinct)
 
 
                         reshaped_fields.append(reshaped_field)
                         reshaped_fields.append(reshaped_field)
 
 
                 # if field is a list of values with simple type
                 # if field is a list of values with simple type
+                elif items_type == "array":
+
+                    grp_fields = [c for c in grp_fields if c in data.columns]
+
+                    if field in data.columns:
+
+                        reshaped_field = data.groupby(grp_fields, sort=False)[field]\
+                                             .apply(self._make_list_of_distinct)
+
+                        reshaped_fields.append(reshaped_field)
+
                 else:
                 else:
 
 
                     grp_fields = [c for c in grp_fields if c in data.columns]
                     grp_fields = [c for c in grp_fields if c in data.columns]
 
 
                     if field in data.columns:
                     if field in data.columns:
 
 
-                        reshaped_field = data.groupby(grp_fields)[field]\
-                                           .apply(self._make_list_of_distinct)
+                        reshaped_field = data.groupby(grp_fields, sort=False)[field]\
+                                             .apply(self._make_flattened_list_of_distinct)
 
 
                         reshaped_fields.append(reshaped_field)
                         reshaped_fields.append(reshaped_field)
 
 
         if len(reshaped_fields) > 0:
         if len(reshaped_fields) > 0:
-            reshaped_data = pd.concat(reshaped_fields, axis=1)
-
-            if not _return_data:
 
 
-                list_of_documents =\
-                    reshaped_data.drop(list(reshaped_data.index.names),
-                                       axis=1, errors="ignore")\
-                                 .reset_index(drop=False)
+            reshaped_fields = pd.concat(reshaped_fields, sort=False, axis=1)
 
 
-                log.info("Done reshaping the dataframe to a list of documents")
+            if _final_step:
+                # dropping the index names if it is the final step,
+                # if not the index is needed for merging
+                reshaped_fields =\
+                    reshaped_fields.drop(list(reshaped_fields.index.names), axis=1, errors="ignore")\
+                                   .reset_index(drop=False)
 
 
-                return list_of_documents
+                self._log.info("Done reshaping the dataframe to a list of documents")
 
 
-            else:
+            return reshaped_fields
 
 
-                return reshaped_data
+        else:
+            return
 
 
     def _melt_duplicated_columns(self, data: pd.DataFrame) -> pd.DataFrame:
     def _melt_duplicated_columns(self, data: pd.DataFrame) -> pd.DataFrame:
         '''
         '''
         '''
         '''
+        data = data.copy(deep=True)
+
         for c in set(data.columns):
         for c in set(data.columns):
             if isinstance(data[c], pd.DataFrame):
             if isinstance(data[c], pd.DataFrame):
+                """
                 data = pd.melt(data, id_vars=[cc for cc in data.columns
                 data = pd.melt(data, id_vars=[cc for cc in data.columns
                                               if cc != c], value_vars=c)\
                                               if cc != c], value_vars=c)\
                          .drop("variable", axis=1)\
                          .drop("variable", axis=1)\
                          .rename(columns={"value": c})
                          .rename(columns={"value": c})
+                """
+                data["temp"] = data[c].apply(self._make_list, axis=1)
+                data.drop(c, axis=1, inplace=True)
+                data = data.rename(columns={"temp": c})
 
 
         return data
         return data
 
 
     def _make_dict(self, x: pd.Series) -> dict:
     def _make_dict(self, x: pd.Series) -> dict:
         '''
         '''
-        return: transforms pandas series to a dictionary
+        Transforms pandas series to a dictionary
          is meant to be applied to a dataframe in axis = 1,
          is meant to be applied to a dataframe in axis = 1,
          then the index of the input series are the column names
          then the index of the input series are the column names
          of the dataframe
          of the dataframe
         '''
         '''
-        return {f.split(".")[-1]: x[f] for f in x.index}
+        def custom_is_null(y):
+            # pd.notnull returns an array for list-like values; keep those
+            # and let the scalar check decide for everything else
+            if isinstance(pd.notnull(y), bool):
+                return pd.notnull(y)
+            else:
+                return True
+
+        return {f.split(".")[-1]: x[f] for f in x.index
+                if custom_is_null(x[f])}
 
 
     def _make_list(self, x: pd.Series) -> list:
     def _make_list(self, x: pd.Series) -> list:
         '''
         '''
@@ -240,16 +265,35 @@ class DataFrameToCollection:
          entries are arbitrary objects
          entries are arbitrary objects
          (pandas unique() method does not work if entries are of complex types)
          (pandas unique() method does not work if entries are of complex types)
         '''
         '''
-        distinct = []
-        [distinct.append(obj) for obj in x if obj not in distinct]
-        return distinct
+        uniques = pd.DataFrame({"temp": x.tolist()})\
+                    .assign(temp_str=lambda y: y["temp"].astype(str))\
+                    .drop_duplicates(subset=["temp_str"])\
+                    .drop("temp_str", axis=1).iloc[:, 0].tolist()
+
+        def is_empty(y):
+            is_empty_dict = (isinstance(y, dict) and (len(y) == 0))
+            is_empty_list = (isinstance(y, list) and (len(y) == 0))
+            return is_empty_dict or is_empty_list
+
+        return [el for el in uniques if not is_empty(el)]
+
+    def _make_flattened_list_of_distinct(self, x: pd.Series) -> list:
+        '''
+        return: the first of the unique values from a Series where
+         entries are arbitrary objects; used to flatten fields that hold
+         one (possibly list-valued) entry per group
+        '''
+        uniques = self._make_list_of_distinct(x)
+        # guard against an empty group so that the indexing below cannot fail
+        if len(uniques) == 0:
+            return uniques
+        return uniques[0]
 
 
-    def _unroll_nested_names(self, columns: list) -> list:
+    def _unroll_nested_names(self, names: list) -> list:
         '''
         '''
+        Example: transform a list ["name.firstname", "name.surname"]
+        into ["name", "name.firstname", "name.surname"]
         '''
         '''
         unrolled = []
         unrolled = []
 
 
-        for c in columns:
+        for c in names:
             splitted = c.split(".")
             splitted = c.split(".")
             for i in range(len(splitted)):
             for i in range(len(splitted)):
                 unrolled.append(".".join(splitted[:i+1]))
                 unrolled.append(".".join(splitted[:i+1]))
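The reshaping that to_list_of_documents performs can be pictured with plain pandas: rows sharing the grouping fields collapse into one document, and nested fields become a list of sub-documents. The hand-rolled sketch below illustrates only that idea for a single array-of-objects field; the "messung" columns are made up for the example, and the real method drives this recursion from the json schema instead:

import pandas as pd

# flat input: one row per (radsatznummer, station) measurement
df = pd.DataFrame({
    "radsatznummer": ["w1", "w1", "w2"],
    "status": ["Gut", "Gut", "Schlecht"],
    "messung.stationsnummer": [110, 130, 110],
    "messung.wert": [1.2, 3.4, 5.6],
})

documents = []
for key, group in df.groupby("radsatznummer", sort=False):
    documents.append({
        "radsatznummer": key,
        "status": group["status"].iloc[0],
        "messung": [{"stationsnummer": s, "wert": w}
                    for s, w in zip(group["messung.stationsnummer"],
                                    group["messung.wert"])],
    })

print(documents[0])
# {'radsatznummer': 'w1', 'status': 'Gut',
#  'messung': [{'stationsnummer': 110, 'wert': 1.2}, {'stationsnummer': 130, 'wert': 3.4}]}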

+ 156 - 0
cdplib/db_migration/FlattenData.py

@@ -0,0 +1,156 @@
+#!/usr/bin/env python3
+# -*- coding: utf-8 -*-
+"""
+Created on Wed Oct 9 15:17:34 2019
+
+@author: oskar
+@description: Class which flattens nested Dataframes, Dictionaries and Lists into tabular form
+"""
+
+import sys
+import os
+import time
+import pandas as pd
+import copy
+sys.path.append(os.getcwd())
+from libraries.log import Log
+log = Log("Flatten data")
+
+class FlattenData():
+
+    def __init__(self):
+        log.info('Flatten Data Initialized')
+
+    def flatten(self, data):
+        '''
+        :param data: data given in either dictionary, list or dataframe format.
+        '''
+
+        assert(isinstance(data, (list, dict, pd.DataFrame))),\
+            "Parameter 'data' must be of list, dict or DataFrame type"
+
+        start = time.time()
+        if type(data) is pd.DataFrame:
+            return_data = self.flatten_dataframe(data)
+            print(('Data has been flattened in {} seconds').format(time.time()-start))
+            return return_data
+        if type(data) is dict:
+            return self.flatten_dict(data)
+        if type(data) is list:
+            return self.flatten_list(data)
+
+    def flatten_dataframe(self, dataframe: pd.DataFrame, incoming_key: str = None):
+        '''
+        :param pd.Dataframe dataframe: dataframe containing the data to be flattened
+        :param str incoming_key: string to be appended to the key
+        '''
+        assert(isinstance(dataframe, pd.DataFrame)),\
+            "Parameter 'dataframe' must be of DataFrame type"
+        # incoming_key is optional, so None has to be accepted as well
+        assert(incoming_key is None or isinstance(incoming_key, str)),\
+            "Parameter 'incoming_key' must be of string type or None"
+
+        result_dict = {}
+        for index, row in dataframe.iterrows():
+            temp_result_dict = {}
+            for key, value in row.iteritems():
+                temp_result = {}
+                if incoming_key is not None:
+                    key = incoming_key + '_' + key
+                if type(value) == list:
+                    temp_result = self.flatten_list(value, key)
+                elif type(value) == dict:
+                    temp_result = self.flatten_dict(value, key)
+                else:
+                    temp_result_dict[key] = value
+
+                if len(temp_result) > 0:
+                    result_dict = self.append_to_dict(result_dict, temp_result)
+
+            result_dict[index] = copy.deepcopy(temp_result_dict)
+        
+        result_dataframe = pd.DataFrame.from_dict(result_dict, orient='index')
+        return result_dataframe
+
+    def flatten_dict(self, dictionary: dict, incoming_key: str = None):
+        '''
+        :param dict dictionary: dictionary containing the data to be flattened
+        :param str incoming_key: string to be appended to the key
+        '''
+        assert(isinstance(dictionary, dict)),\
+            "Parameter 'dictionary' must be of dict type"
+        assert(incoming_key is None or isinstance(incoming_key, str)),\
+            "Parameter 'incoming_key' must be of string type or None"
+
+
+        result_dict = {}
+        for key in dictionary:
+
+            temp_dataframe = dictionary[key]
+            temp_result = {}
+            if incoming_key is not None:
+                key = incoming_key + '_' + key
+            if type(temp_dataframe) == list:
+                temp_result = self.flatten_list(temp_dataframe, key)
+            elif type(temp_dataframe) == dict:
+                temp_result = self.flatten_dict(temp_dataframe, key)
+            else:
+                result_dict[key] = temp_dataframe
+
+            if len(temp_result) > 0:
+                result_dict = self.append_to_dict(result_dict, temp_result)
+
+        return result_dict
+
+    def flatten_list(self, data_list: list, incoming_key: str = None):
+        '''
+        :param list data_list: list containing the data to be flattened
+        :param str incoming_key: string to be appended to the key
+        '''
+
+        assert(isinstance(data_list, list)),\
+            "Parameter 'data_list' must be of list type"
+        assert(incoming_key is None or isinstance(incoming_key, str)),\
+            "Parameter 'incoming_key' must be of string type or None"
+
+        result_dict = {}
+        for iteration, item in enumerate(data_list):
+
+            temp_dataframe = item
+            temp_result = {}
+            key = incoming_key
+            if incoming_key is not None:
+                if type(data_list[iteration]) is dict:
+                    if 'stationsnummer' in data_list[iteration].keys() and 'stage' in data_list[iteration].keys() :
+                        
+                        key = incoming_key + '_' + str(data_list[iteration]['stationsnummer']) + '_' + str(data_list[iteration]['stage'])
+                else:
+                    key = incoming_key + '_' + str(iteration)
+            if type(temp_dataframe) == list:
+                temp_result = self.flatten_list(temp_dataframe, key)
+                result_dict = self.append_to_dict(result_dict, temp_result)
+            
+            elif type(temp_dataframe) == dict:
+                temp_result = self.flatten_dict(temp_dataframe, key)
+                result_dict = self.append_to_dict(result_dict, temp_result)
+            else:
+                result_dict[key] = temp_dataframe
+
+            if len(temp_result) > 0:
+                result_dict = self.append_to_dict(result_dict, temp_result)
+                
+        return result_dict
+
+    def append_to_dict(self, dictionary: dict, to_append):
+        '''
+        :param dict dictionary: dictionary which holds all the resulting data.
+        :param dict to_append: data to be added to the resulting dictionary.
+        '''
+        assert(isinstance(dictionary, (list, dict))),\
+            "Parameter 'dictionary' must be of dict type"
+        assert(isinstance(to_append, dict)),\
+            "Parameter 'to_append' must be of dict type"
+
+        for key in to_append:
+            dictionary[key] = to_append[key]
+        
+        return dictionary
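A small usage sketch of FlattenData on a nested dictionary, assuming the import path from this commit and that the libraries.log module used inside FlattenData.py is importable; the field name "messung" is illustrative. Nested keys are joined with an underscore:

from cdplib.db_migration.FlattenData import FlattenData

nested = {"radsatznummer": "w1",
          "messung": {"stationsnummer": 110, "stage": "A"}}

flat = FlattenData().flatten(nested)
print(flat)
# {'radsatznummer': 'w1', 'messung_stationsnummer': 110, 'messung_stage': 'A'}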

+ 79 - 35
cdplib/db_migration/MigrationCleaning.py

@@ -16,11 +16,11 @@ sys.path.append(os.getcwd())
 
 
 from libraries.db_migration.ParseMapping import ParseMapping
 from libraries.db_migration.ParseMapping import ParseMapping
 from libraries.db_migration.ParseJsonSchema import ParseJsonSchema
 from libraries.db_migration.ParseJsonSchema import ParseJsonSchema
-from libraries.utils.ClassLogging import ClassLogging
+from libraries.utils.ExceptionsHandler import ExceptionsHandler
 from libraries.utils.CleaningUtils import CleaningUtils
 from libraries.utils.CleaningUtils import CleaningUtils
+from libraries.log import Log
 
 
-
-class MigrationCleaning(ClassLogging):
+class MigrationCleaning:
     '''
     '''
     Class for correcting and filtering the incorrect data.
     Class for correcting and filtering the incorrect data.
     We keep the correcting and the filtering methods separated,
     We keep the correcting and the filtering methods separated,
@@ -33,12 +33,12 @@ class MigrationCleaning(ClassLogging):
                  mapping_source: str = "internal_name",
                  mapping_source: str = "internal_name",
                  mapping_target: str = "mongo_name",
                  mapping_target: str = "mongo_name",
                  mapping_parser: type = ParseMapping,
                  mapping_parser: type = ParseMapping,
-                 schema_parser: type = ParseJsonSchema,
-                 log_name: str = "MigrationCleaning"):
+                 schema_parser: type = ParseJsonSchema):
         '''
         '''
         '''
         '''
-        super().__init__(log_name=log_name)
-
+        self._log = Log('Migration Cleaning')
+        self._exception_handler = ExceptionsHandler()
+        
         assert isinstance(inconsist_report_table, str),\
         assert isinstance(inconsist_report_table, str),\
             "Inconsistent report table should be a tablename string"
             "Inconsistent report table should be a tablename string"
 
 
@@ -58,6 +58,9 @@ class MigrationCleaning(ClassLogging):
         self._mapping_path = mapping_path
         self._mapping_path = mapping_path
         self._schema_paths = schema_paths
         self._schema_paths = schema_paths
 
 
+        from libraries.db_handlers.SQLHandler import SQLHandlerPool
+        self._sql_db = SQLHandlerPool(20)
+
     def _assert_dataframe_input(self, data: pd.DataFrame):
     def _assert_dataframe_input(self, data: pd.DataFrame):
         '''
         '''
         '''
         '''
@@ -220,19 +223,26 @@ class MigrationCleaning(ClassLogging):
 
 
         data = data.copy(deep=True)
         data = data.copy(deep=True)
 
 
-        db = SQLHandler()
+        db = self._sql_db.aquire()
 
 
         if invalid_mask.sum() == 0:
         if invalid_mask.sum() == 0:
 
 
+            self._sql_db.release(db)
             return data
             return data
 
 
         data_inconsist = data.assign(reason=reason)\
         data_inconsist = data.assign(reason=reason)\
                              .loc[invalid_mask]\
                              .loc[invalid_mask]\
                              .reset_index(drop=True)
                              .reset_index(drop=True)
 
 
+        if db.check_if_table_exists(self._inconsist_report_table):
+            columns = db.get_column_names(tablename=self._inconsist_report_table)
+
+            if len(columns) > 0:
+                data_inconsist = data_inconsist[columns]
+
         db.append_to_table(data=data_inconsist,
         db.append_to_table(data=data_inconsist,
                            tablename=self._inconsist_report_table)
                            tablename=self._inconsist_report_table)
-
+       
         n_rows_filtered = len(data_inconsist)
         n_rows_filtered = len(data_inconsist)
         n_instances_filtered = len(data_inconsist[self._filter_index_columns].drop_duplicates())
         n_instances_filtered = len(data_inconsist[self._filter_index_columns].drop_duplicates())
 
 
@@ -255,6 +265,8 @@ class MigrationCleaning(ClassLogging):
 
 
         data = data.loc[~all_index.isin(nok_index)].reset_index(drop=True)
         data = data.loc[~all_index.isin(nok_index)].reset_index(drop=True)
 
 
+        self._sql_db.release(db)
+
         return data
         return data
 
 
     def _replace_values(self, data: pd.DataFrame,
     def _replace_values(self, data: pd.DataFrame,
@@ -303,7 +315,7 @@ class MigrationCleaning(ClassLogging):
 
 
             except Exception as e:
             except Exception as e:
 
 
-                self.log_and_raise(("Failed to replace {0} values "
+                self._exception_handler.log_and_raise(("Failed to replace {0} values "
                                     "in {1}. Exit with error {2}"
                                     "in {1}. Exit with error {2}"
                                     .format(default_str, column, e)))
                                     .format(default_str, column, e)))
 
 
@@ -350,6 +362,7 @@ class MigrationCleaning(ClassLogging):
 
 
                 else:
                 else:
 
 
+                    data = data.copy(deep=True)
                     data[column] = data[column].astype(python_type)
                     data[column] = data[column].astype(python_type)
 
 
                 if data[column].dtype != python_type:
                 if data[column].dtype != python_type:
@@ -363,7 +376,7 @@ class MigrationCleaning(ClassLogging):
 
 
             except Exception as e:
             except Exception as e:
 
 
-                self.log_and_raise(("Failed to convert types in {0}. "
+                self._exception_handler.log_and_raise(("Failed to convert types in {0}. "
                                     "Exit with error {1}"
                                     "Exit with error {1}"
                                     .format(column, e)))
                                     .format(column, e)))
 
 
@@ -371,7 +384,7 @@ class MigrationCleaning(ClassLogging):
 
 
         return data
         return data
 
 
-    def filter_invalid_null_values(self, data: pd.DataFrame) -> pd.DataFrame:
+    def filter_invalid_missing_values(self, data: pd.DataFrame) -> pd.DataFrame:
         '''
         '''
         '''
         '''
         self._assert_dataframe_input(data)
         self._assert_dataframe_input(data)
@@ -397,7 +410,12 @@ class MigrationCleaning(ClassLogging):
         '''
         '''
         self._assert_dataframe_input(data)
         self._assert_dataframe_input(data)
 
 
-        for column, python_type in self._python_types.items():
+        for column in data.columns:
+
+            if column not in self._python_types:
+                continue
+
+            python_type = self._python_types[column]
 
 
             if data[column].dtype != python_type:
             if data[column].dtype != python_type:
 
 
@@ -419,7 +437,12 @@ class MigrationCleaning(ClassLogging):
         '''
         '''
         self._assert_dataframe_input(data)
         self._assert_dataframe_input(data)
 
 
-        for column, pattern in self._patterns:
+        for column in data.columns:
+
+            if column not in self._patterns:
+                continue
+
+            pattern = self._patterns[column]
 
 
             invalid_mask = (~data[column].astype(str).str.match(pattern))
             invalid_mask = (~data[column].astype(str).str.match(pattern))
 
 
@@ -431,41 +454,64 @@ class MigrationCleaning(ClassLogging):
 
 
         return data
 
-    def filter_notallowed_values(self, data: pd.DataFrame) -> pd.DataFrame:
+    def filter_invalid_values(self, data: pd.DataFrame) -> pd.DataFrame:
         '''
         '''
-        for column, value in self._minimum_values.items():
+        for column in data.columns:
 
-            invalid_mask = data[column] > value
+            if column in self._minimum_values:
 
-            reason = "Too large values in field {}".format(column)
+                min_value = self._minimum_values[column]
 
-            data = self._filter_invalid_data(data=data,
-                                             invalid_mask=invalid_mask,
-                                             reason=reason)
+                invalid_mask = data[column] > min_value
 
-        for column, value in self._maximum_values.items():
+                reason = "Too large values in field {}".format(column)
 
-            invalid_mask = data[column] < value
+                data = self._filter_invalid_data(data=data,
+                                                 invalid_mask=invalid_mask,
+                                                 reason=reason)
 
-            reason = "Too small values in field {}".format(column)
+            elif column in self._maximum_values:
 
-            data = self._filter_invalid_data(data=data,
-                                             invalid_mask=invalid_mask,
-                                             reason=reason)
+                max_value = self._maximum_values[column]
 
-        for column, allowed_values in self._allowed_values.items():
+                invalid_mask = data[column] < max_value
 
-            invalid_mask = (~data[column].isin(allowed_values))
+                reason = "Too small values in field {}".format(column)
 
-            reason = "Too small values in field {}".format(column)
+                data = self._filter_invalid_data(data=data,
+                                                 invalid_mask=invalid_mask,
+                                                 reason=reason)
 
-            data = self._filter_invalid_data(data=data,
-                                             invalid_mask=invalid_mask,
-                                             reason=reason)
+            elif column in self._allowed_values:
+
+                allowed_values = self._allowed_values[column]
+
+                invalid_mask = (~data[column].isin(allowed_values))
+
+                not_allowed_examples = data.loc[invalid_mask, column].unique()[:3]
+
+                reason = "Not allowed values {0}... in field {1}"\
+                         .format(not_allowed_examples, column)
+
+                data = self._filter_invalid_data(data=data,
+                                                 invalid_mask=invalid_mask,
+                                                 reason=reason)
+
+            else:
+                continue
 
         return data
 
+    def restrict_to_collection(self, data: pd.DataFrame, collection_name: str) -> pd.DataFrame:
+        '''
+        '''
+        mongo_fields = self._schema_parser.get_fields_restricted_to_collection(collection_name=collection_name)
+
+        fields = self._mapping_parser.get_fields_restricted_to_collection(collection_name=collection_name)
+
+        return data[[c for c in data.columns if (c in fields) or (c in mongo_fields)]]
+
 
 
 if __name__ == "__main__":
 
@@ -483,8 +529,6 @@ if __name__ == "__main__":
 
     if all([os.path.isfile(p) for p in schema_paths + [mapping_path]]):
 
-        print("Found schemas!")
-
         cleaner = MigrationCleaning(
                 mapping_path=mapping_path,
                 schema_paths=schema_paths,

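A minimal usage sketch of the cleaning steps touched above (the constructor arguments and method names are taken from this diff; "raw_data" and the collection name "process" are hypothetical placeholders, and further cleaning steps of the class may be needed in practice):

# Illustrative only, not part of the commit.
cleaner = MigrationCleaning(mapping_path=mapping_path,
                            schema_paths=schema_paths)

data = cleaner.filter_invalid_missing_values(raw_data)
data = cleaner.filter_invalid_values(data)

# keep only the columns that belong to one target collection
data = cleaner.restrict_to_collection(data, collection_name="process")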
+ 38 - 16
cdplib/db_migration/ParseJsonSchema.py

@@ -39,6 +39,8 @@ class ParseJsonSchema(ParseDbSchema):
         if isinstance(schema_paths, str):
             schema_paths = [schema_paths]
 
+        self._schema_paths = schema_paths
+
         self.schemas = []
 
         for schema_path in schema_paths:
@@ -53,11 +55,25 @@ class ParseJsonSchema(ParseDbSchema):
                 self._log.error(err)
                 raise Exception(err)
 
+    @property
+    def _collection_names(self) -> list:
+        '''
+        '''
+        # Don't use strip() instead of replace(), since "schema_c".strip("schema_")
+        # will discard the c as well, which is not an appropriate output
+        return [os.path.basename(p).replace("schema_","").split(".")[0] for p in self._schema_paths]
+
     def get_fields(self) -> list:
         '''
         '''
         return self._parse()
 
+    def get_fields_restricted_to_collection(self, collection_name: str) -> list:
+        '''
+        '''
+        schemas = [self.schemas[self._collection_names.index(collection_name)]]
+        return self._parse(schemas=schemas)
+
     def get_required_fields(self) -> list:
         '''
         '''
@@ -82,17 +98,17 @@ class ParseJsonSchema(ParseDbSchema):
         mongo_types = self.get_mongo_types()
         python_types = {}
 
-        bson_to_python_types_except_dates = {"double": float,
-                                             "decimal": float,
-                                             "string": str,
-                                             "object": object,
-                                             "array": list,
-                                             "bool": bool,
-                                             "int": int,
-                                             "long": int,
-                                             "date": np.dtype('<M8[ns]'),
-                                             "timestamp": np.dtype('<M8[ns]')
-                                             }
+        bson_to_python_types = {"double": float,
+                                "decimal": float,
+                                "string": str,
+                                "object": object,
+                                "array": list,
+                                "bool": bool,
+                                "int": int,
+                                "long": int,
+                                "date": np.dtype('<M8[ns]'),
+                                "timestamp": np.dtype('<M8[ns]')
+                                }
 
         for k, v in mongo_types.items():
 
@@ -110,8 +126,8 @@ class ParseJsonSchema(ParseDbSchema):
                     self._log.error(err)
                     raise Exception(err)
 
-            if v in bson_to_python_types_except_dates:
-                python_types[k] = bson_to_python_types_except_dates[v]
+            if v in bson_to_python_types:
+                python_types[k] = bson_to_python_types[v]
 
         return python_types
 
@@ -157,14 +173,18 @@ class ParseJsonSchema(ParseDbSchema):
 
 
     def _parse(self,
                field_info: str = None,
-               required_only: bool = False):
+               required_only: bool = False,
+               schemas: list = None):
         '''
         '''
-        result = self._parse_one(schema=self.schemas[0],
+        if schemas is None:
+            schemas = self.schemas
+
+        result = self._parse_one(schema=schemas[0],
                                  field_info=field_info,
                                  required_only=required_only)
 
-        for schema in self.schemas[1:]:
+        for schema in schemas[1:]:
 
             next_result = self._parse_one(schema=schema,
                                           field_info=field_info,
@@ -238,6 +258,8 @@ class ParseJsonSchema(ParseDbSchema):
         if "properties" in schema.keys():
         if "properties" in schema.keys():
             if "required" in schema.keys():
             if "required" in schema.keys():
                 required_subfields = schema["required"]
                 required_subfields = schema["required"]
+            else:
+                required_subfields = []
 
 
             for sub_field_name in schema["properties"].keys():
             for sub_field_name in schema["properties"].keys():
 
 

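For reference, the `_collection_names` property above derives a collection name from a schema file name; a stand-alone sketch of the intended behaviour (the file path is hypothetical):

# Illustrative only, not part of the commit.
import os

path = "mongo_schema/schema_components.json"   # hypothetical schema path

# replace() keeps the rest of the file name intact
os.path.basename(path).replace("schema_", "").split(".")[0]   # -> "components"

# strip() removes any leading/trailing characters from the strip set,
# so "schema_c".strip("schema_") would return "" instead of "c" --
# which is why the property uses replace() instead.
"schema_c".strip("schema_")   # -> ""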
+ 11 - 3
cdplib/db_migration/ParseMapping.py

@@ -10,19 +10,20 @@ import os
 import sys
 import numpy as np
 sys.path.append(os.getcwd())
-
+from libraries.log import Log
 
 class ParseMapping:
     '''
     '''
     def __init__(self, mapping_path: str, log_name: str = "ParseMapping",
-                 source: str = "original_name", target: str = "original_name"):
+                 source: str = "original_name", target: str = "mongo_name",
+                 target_collection: str = "mongo_collection"):
         '''
         '''
         import json
         from libraries.log import Log
 
-        self._log = Log(log_name)
+        self._log = Log('Parse Mapping')
 
         if not os.path.isfile(mapping_path):
             err = "Mapping not found"
@@ -41,6 +42,7 @@ class ParseMapping:
 
 
         self._source = source
         self._target = target
+        self._target_collection = target_collection
 
     def get_field_mapping(self) -> dict:
         '''
@@ -71,6 +73,12 @@ class ParseMapping:
         return self._get_fields_satistisfying_condition(key="type",
                                                         value="Date")
 
+    def get_fields_restricted_to_collection(self, collection_name: str) -> list:
+        '''
+        '''
+        return self._get_fields_satistisfying_condition(key=self._target_collection,
+                                                        value=collection_name)
+
     def _get_info(self, key: str, value=None) -> dict:
         '''
         '''

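The new `mongo_collection` lookup assumes each mapping entry carries its target collection next to the source and target field names. A hypothetical entry and call (only the key names "original_name", "mongo_name", "type", "mongo_collection" appear in this diff; the values and the file name are made up):

# Illustrative only, not part of the commit.
mapping_entry = {
    "original_name": "LIEF_NR",
    "mongo_name": "delivery_number",
    "type": "Integer",
    "mongo_collection": "deliveries"
}

parser = ParseMapping(mapping_path="mapping.json")
# returns the mapped fields whose "mongo_collection" equals "deliveries"
fields = parser.get_fields_restricted_to_collection(collection_name="deliveries")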
BIN
cdplib/db_migration/__pycache__/DataFrameToCollection.cpython-37.pyc


BIN
cdplib/db_migration/__pycache__/MigrationCleaning.cpython-37.pyc


BIN
cdplib/db_migration/__pycache__/ParseDbSchema.cpython-37.pyc


BIN
cdplib/db_migration/__pycache__/ParseJsonSchema.cpython-37.pyc


BIN
cdplib/db_migration/__pycache__/ParseMapping.cpython-37.pyc


+ 798 - 0
cdplib/hyperopt/HyperoptPipelineSelection.py

@@ -0,0 +1,798 @@
+#!/usr/bin/env python3
+# -*- coding: utf-8 -*-
+"""
+Created on Fri Nov  9 13:27:44 2018
+
+@author: tanja
+@description: Implementation of machine learning
+                pipeline selection and tuning with hyperopt library
+"""
+
+import os
+import sys
+import gc
+import logging
+import importlib
+import pickle
+import time
+import datetime
+
+import pandas as pd
+import numpy as np
+
+from sklearn.pipeline import Pipeline
+
+from hyperopt import fmin, tpe, rand, Trials, hp, STATUS_OK, STATUS_FAIL,\
+    space_eval, pyll
+
+from sklearn.model_selection import cross_validate
+from sklearn.metrics import make_scorer
+
+
+class HyperoptPipelineSelection:
+    '''
+    Use this class to perform a search
+    for a machine learning pipeline in a given parameter space.
+    The parameter space can include multiple types of Pipelines
+    (SVM, XGBOOST, random forest, etc),
+    as well as parameter distributions for each pipeline parameter.
+    See example in main for the expected space structure.
+
+    The search can be performed either randomly
+    or with a tree-based algorithm. (Other methods are currently
+    developed by hyperopt creators).
+
+    Attribute trials is responsible for book-keeping parameter
+    combinations that have already been tried out. This attribute
+    is saved to a binary file every n minutes as well as every time
+    a better pipeline was found.
+    '''
+    def __init__(self,
+                 cost_func,
+                 greater_is_better: bool,
+                 trials_path: str,
+                 backup_trials_freq: int = 1,
+                 log_path: str = None,
+                 averaging_func: callable = None):
+        '''
+        :param callable cost_func: function to minimize or maximize
+
+        :param bool greater_is_better: when True
+            cost_func is maximized, else minimized.
+
+        :param str trials_path: path at which the trials object is saved
+            in binary format. From the trials object we can
+            select information about the obtained scores, score variations,
+            and pipelines, and parameters tried out so far. If a trials object
+            already exists at the given path, it is loaded and the
+            search is continued, else, the search is started from
+            the beginning.
+
+        :param backup_trials_freq: frequency in iterations (trials)
+            of saving the trials object at the trials_path.
+
+        :param str log_path: Optional, when not provided logs to stdout.
+
+        :param callable averaging_func: optional,
+            when not provided set to mean. Function
+            to aggregate the cross-validated values of the cost function.
+            The classic choice is to take the mean;
+            another example is mean() - c*var().
+        '''
+
+        assert(callable(cost_func)),\
+            "Parameter 'cost_func' must be a callable"
+
+        assert(isinstance(greater_is_better, bool)),\
+            "Parameter 'greater_is_better' must be bool type"
+
+        assert(isinstance(trials_path, str)),\
+            "Parameter 'trials_path' must be of string type"
+
+        if averaging_func is not None:
+            assert(callable(averaging_func)),\
+                "Parameter 'averaging_func' must be a callable"
+
+        self._assert_valid_directory(path=trials_path)
+
+        self._configer_logger(log_path)
+
+        self._cost_func = cost_func
+        # is 1 when cost_func is minimized, -1 when cost func is maximized
+        self._score_factor = (not greater_is_better) - greater_is_better
+        self._trials_path = trials_path
+        # is initialized with empty trials object
+        self._trials = Trials()
+        self._backup_trials_freq = backup_trials_freq
+        self._averaging_func = averaging_func or np.mean
+        # keeping track of the current search iteration
+        self._run_number = 0
+        # space and data need to be attached to perform search.
+        self._space_attached = False
+        self._data_attached = False
+
+        # if a trials object already exists at the given path,
+        # it is loaded and the search is continued. Else,
+        # the search is started from the beginning.
+        if os.path.isfile(trials_path):
+            try:
+                with open(trials_path, "rb") as f:
+                    self._trials = pickle.load(f)
+
+                self._logger.info(("Loaded an existing trials object "
+                                   "consisting of {} trials")
+                                  .format(len(self._trials.trials)))
+
+            except Exception as e:
+                self._logger.error(("Trials object could not be loaded. "
+                                    "Training starts from the beginning. "
+                                    "Exit with error {}").format(e))
+
+        else:
+            self._logger.info(("No existing trials object was found. "
+                               "Initialized an empty trials object."))
+
+        self._best_score = self.best_trial_score
+
+    def _configer_logger(self, log_path: str = None):
+        '''
+        Can be replaced with the existing script later.
+        When log_path is not provided, logs to stdout.
+        '''
+
+        self._logger = logging.getLogger(__name__)
+
+        if (self._logger.hasHandlers()):
+            self._logger.handlers.clear()
+
+        if log_path is not None:
+            assert(isinstance(log_path, str)),\
+                "Parameter 'log_path' must be of string type"
+            self._assert_valid_directory(log_path)
+
+            handler = logging.FileHandler(log_path)
+        else:
+            handler = logging.StreamHandler(sys.stdout)
+
+        formatter = logging.Formatter(
+                '\n %(asctime)s %(levelname)s %(message)s')
+
+        handler.setFormatter(formatter)
+        self._logger.addHandler(handler)
+        self._logger.setLevel("INFO")
+
+    def _backup_trials(self):
+        '''
+        Pickles (Saves) the trials object.
+        Used in a scheduler.
+        '''
+        with open(self._trials_path, "wb") as f:
+            pickle.dump(self._trials, f)
+
+    def _assert_valid_directory(self, path: str):
+        '''
+        If the directory of a path does not exist yet,
+        creates it.
+        '''
+        assert(isinstance(path, str)),\
+            "Parameter 'path' must be of str type"
+
+        dirname = os.path.dirname(path)
+
+        if len(dirname) > 0:
+            os.makedirs(dirname, exist_ok=True)
+
+    def attach_space(self, space: pyll.base.Apply = None,
+                     module_path: str = None,
+                     name: str = None):
+        '''
+        :param pyll.base.Apply space: hyperopt space where
+            the search is performed. Optional when a space
+            is loaded from a python module.
+
+        :param str module_path: path to python module
+            where the space is defined. Optional when
+            the space is provided directly.
+
+        :param str name: name of the space loaded from
+            a python module. Optional when the space
+            is provided directly.
+        '''
+        assert((space is not None) or
+               ((module_path is not None) and (name is not None))),\
+            "Either space or (module_path, name) must be provided"
+
+        if space is None:
+            for arg_name, arg in [("module_path", module_path),
+                                  ("name", name)]:
+                assert(isinstance(arg, str)),\
+                    "Parameter '{}' must be of str type".format(arg_name)
+
+            assert(os.path.isfile(module_path)),\
+                "Parameter 'module_path' must be a valid file"
+
+            module, extension = os.path.splitext(os.path.basename(module_path))
+            assert(extension == ".py"),\
+                "Parameter 'space' must be read from a python file"
+
+            sys.path.insert(0, os.path.dirname(module_path))
+
+            try:
+                space = getattr(importlib.import_module(module), name)
+            except (ImportError, AttributeError):
+                err = "Invalid space location or name"
+                self._logger.error(err)
+                raise Exception(err)
+
+        assert(isinstance(space, pyll.base.Apply)),\
+            "Parameter 'space' must be of hyperopt space type"
+
+        self._space = space
+        self._logger.info("Attached parameter distribution space")
+        self._space_attached = True
+
+    def _convert_to_array(self, x: (pd.DataFrame, np.ndarray))\
+            -> np.ndarray:
+        '''
+        Converts a pandas DataFrame or Series to a numpy array.
+        '''
+        if isinstance(x, np.ndarray):
+            return x
+
+        elif (isinstance(x, pd.core.frame.DataFrame))\
+                or (isinstance(x, pd.core.series.Series)):
+            return x.values
+
+        else:
+            e = 'The argument must be a numpy array or a pandas DataFrame'
+            self._logger.critical(e)
+            raise ValueError(e)
+
+    def attach_data(self, X_train: (pd.DataFrame, np.ndarray),
+                    y_train: (pd.DataFrame, pd.Series, np.ndarray) = None,
+                    X_val: (pd.DataFrame, np.ndarray) = None,
+                    y_val: (pd.DataFrame, pd.Series, np.ndarray) = None,
+                    cv: (list, int) = None):
+        '''
+        :param array X_train: data on which
+            machine learning pipelines are trained
+
+        :param array y_train: optional, vector with targets,
+            (not all algorithms require targets)
+
+        :param array X_val: optional, validation data.
+            When not provided, cross-validated value
+            of the cost_func is calculated.
+
+        :param array y_val: optional, validation targets
+
+        :param list cv: list of tuples containing
+            train and validation indices or an integer representing
+            the number of folds for a random split of data
+            during cross-validation
+            example: [([0,1,2], [3,4]), ([1,2,3], [4,5])]
+        '''
+
+        X_train = self._convert_to_array(X_train)
+        if y_train is not None:
+            y_train = self._convert_to_array(y_train)
+
+        if X_val is not None:
+            if cv is not None:
+                self._logger.warning(("Both validation set and cv object "
+                                      "are set. Validation score will be "
+                                      "calculated on the validation set!"))
+
+            X_val = self._convert_to_array(X_val)
+
+            train_inds = list(range(len(X_train)))
+            val_inds = list(range(len(X_train),
+                                  len(X_train) + len(X_val)))
+
+            # cost is evaluated with a cross validation function
+            # that accepts an array and a cv object with
+            # indices of the fold splits.
+            # Here we create a trivial cv object
+            # with one validation split.
+            self._cv = [(train_inds, val_inds)]
+            self._X = np.concatenate([X_train, X_val])
+
+            if y_train is not None:
+                if y_val is None:
+                    err = "Argument y_val must be provided"
+                    self._logger.critical(err)
+                    raise ValueError(err)
+                else:
+                    y_val = self._convert_to_array(y_val)
+                    self._y = np.concatenate([y_train, y_val])
+            else:
+                self._y = None
+        else:
+            if cv is None:
+                self._logger.warning(("Neither validation set nor cv object "
+                                      "are set. Validation score will be "
+                                      "calculated on 5 randomly "
+                                      "split folds."))
+
+            self._X = X_train
+            self._y = y_train
+            self._cv = cv
+
+        self._logger.info("Attached data")
+        self._data_attached = True
+
+    def _evaluate(self, pipeline: Pipeline) -> dict:
+        '''
+        This method is called in _objective.
+
+        Calculates the cost on the attached data.
+        This function can be overriden, when the cost
+        needs to be calculated differently,
+        for example with a tensorflow model.
+
+        :param Pipeline pipeline: machine learning pipeline
+            that will be evaluated with cross-validation
+
+        :output: dictionary with the aggregated
+            cross-validation score and
+            the score variance.
+        '''
+
+        scores = cross_validate(estimator=pipeline,
+                                X=self._X,
+                                y=self._y,
+                                cv=self._cv or 5,
+                                scoring=make_scorer(self._cost_func),
+                                error_score=np.nan)
+
+        return {'value': self._averaging_func(scores['test_score']),
+                'variance': np.var(scores['test_score'])}
+
+    def _objective(self, space_element: dict) -> dict:
+        '''
+        This method is called in search_for_best_pipeline
+        inside the hyperopt fmin method.
+
+        Uses _evaluate method.
+
+        It must take as input a space element
+        and produce an output in the form of dictionary
+        with 2 obligatory values loss and status
+        (STATUS_OK or STATUS_FAIL). Other
+        values in the output are optional and can be
+        accessed later through the trials object.
+
+        :Warning: fmin minimizes the loss,
+        when _evaluate returns a value to be maximized,
+        it should be multiplied by -1 to obtain loss.
+
+        :param dict space_element: must contain keys
+            name (with the name of the pipeline),
+            pipeline (Pipeline object),
+            params (dict of pipeline params)
+
+        :output: dictionary with keys
+            loss (minimized value),
+            status with values STATUS_OK or STATUS_FAIL
+            understood by hyperopt,
+            score (equal to loss or -loss),
+            score_variance,
+            timestamp (end of execution),
+            train_time: execution time
+        '''
+        assert(isinstance(space_element, dict) and
+               set(['name', 'pipeline', 'params']) <= space_element.keys())
+
+        assert(isinstance(space_element['name'], str) and
+               isinstance(space_element['pipeline'], Pipeline) and
+               isinstance(space_element['params'], dict))
+
+        start_time = time.time()
+
+        if not self._data_attached:
+            raise Exception(("Data must be attached in order "
+                             "in order to effectuate the best"
+                             "pipeline search"))
+
+        self._run_number += 1
+
+        pipeline = space_element['pipeline']
+        params = space_element['params']
+        pipeline.set_params(**params)
+
+        self._logger.info(("Run number {0}: "
+                           "Current score is {1}: "
+                           "Training pipeline {2} "
+                           "with parameters: {3}. ").format(
+                             self._run_number,
+                             self._best_score,
+                             space_element['name'],
+                             params))
+
+        try:
+            score_stats = self._evaluate(pipeline)
+            assert(not np.isnan(score_stats["value"])),\
+                "Returned null score"
+
+            if self._run_number % self._backup_trials_freq == 0:
+                self._backup_trials()
+
+            if (self._best_score != self._best_score) or\
+                self._score_factor*score_stats["value"] <\
+                    self._score_factor*self._best_score:
+
+                self._logger.info("Score got better, new best score is: {}"
+                                  .format(score_stats["value"]))
+
+                self._best_score = score_stats['value']
+
+                self._backup_trials()
+
+            end_time = time.time()
+
+            return {'loss': self._score_factor * score_stats["value"],
+                    'status': STATUS_OK,
+                    'score': score_stats["value"],
+                    'score_variance': score_stats["variance"],
+                    'timestamp': datetime.datetime.today(),
+                    'train_time': end_time - start_time}
+
+        except Exception as e:
+
+            self._logger.warning("Trial failed with error {}".format(e))
+
+            return {'loss': np.nan,
+                    'status': STATUS_FAIL,
+                    'score': np.nan,
+                    'score_variance': np.nan,
+                    'timestamp': datetime.datetime.today(),
+                    'train_time': np.nan}
+
+    def search_for_best_pipeline(self,
+                                 niter: int,
+                                 algo: callable = tpe.suggest):
+        '''
+        Method performing the search of the best pipeline in the given space.
+        Calls fmin function from the hyperopt library to minimize the output of
+        _objective.
+
+        :params int niter: number of search iterations
+        :param callable algo: now can only take values tpe for a tree-based
+            random search or random for random search
+        '''
+        assert(self._space_attached),\
+            "Space must be attach to be able to retrieve this information."
+
+        assert(isinstance(niter, int)),\
+            "Parameter 'niter' must be of int type"
+
+        # right now only two algorithms are provided by hyperopt
+        assert(algo in [tpe.suggest, rand.suggest]),\
+            ("Parameter 'algo' can currently only be tpe or random. "
+             "If other algorithms have been developed "
+             "by hyperopt, please add them to the list.")
+
+        try:
+            self._logger.info(("Starting {0} iterations of search "
+                               "additional to {1} previous"
+                               .format(niter, len(self._trials.trials))))
+
+            best = fmin(fn=self._objective,
+                        space=self._space,
+                        algo=algo,
+                        trials=self._trials,
+                        max_evals=len(self._trials.trials) + niter)
+
+            # print('AAAA', str(niter))
+
+            self._logger.info(
+                    "Best score is {0} with variance {1}"
+                    .format(
+                     self._trials.best_trial["result"]["score"],
+                     self._trials.best_trial["result"]["score_variance"]))
+
+            self._logger.info(("Finished {0} iterations of search.\n"
+                               "Best parameters are:\n {1} ")
+                              .format(niter,
+                                      space_eval(space, best)))
+
+            self._backup_trials()
+
+        except Exception as e:
+            raise ValueError(("Failed to select best "
+                             "pipeline! Exit with error: {}").format(e))
+
+    @property
+    def best_trial_score(self) -> float:
+        '''
+        '''
+        if len(self._trials.trials) > 0:
+            return self._trials.best_trial["result"]["score"]
+        else:
+            return np.nan
+
+    @property
+    def best_trial_score_variance(self) -> float:
+        '''
+        '''
+        if len(self._trials.trials) > 0:
+            return self._trials.best_trial["result"]["score_variance"]
+        else:
+            return np.nan
+
+    @property
+    def best_trial_pipeline(self) -> Pipeline:
+        '''
+        '''
+        assert(self._space_attached),\
+            "Space must be attach to be able to retrieve this information."
+
+        if len(self._trials.trials) > 0:
+
+            return space_eval(
+                    self._space,
+                    {k: v[0] for k, v in
+                     self._trials.best_trial['misc']['vals'].items()
+                     if len(v) > 0})["pipeline"]
+        else:
+            err = ("Trials object is empty. "
+                   "Best pipeline cannot be returned")
+
+            self._logger.error(err)
+            raise Exception(err)
+
+    def _ith_trial_loss(self, i: int) -> float:
+        '''
+        '''
+        if len(self._trials.trials) > i:
+            return self._trials.trials[i]['result']['loss']
+        else:
+            return np.nan
+
+    def _ith_trial_element(self, i: int, name: str) -> object:
+        '''
+        '''
+        assert(self._space_attached),\
+            "Space must be attach to be able to retrieve this information."
+
+        if len(self._trials.trials) > i:
+            return space_eval(self._space,
+                              {k: v[0] for k, v in
+                               self._trials.trials[i]['misc']['vals']
+                               .items() if len(v) > 0})[name]
+
+    def _ith_trial_pipeline(self, i: int) -> Pipeline:
+        '''
+        '''
+        return self._ith_trial_element(i=i, name='pipeline')
+
+    def _ith_trial_name(self, i: int) -> str:
+        '''
+        '''
+        return self._ith_trial_element(i=i, name='name')
+
+    def _ith_trial_params(self, i: int) -> dict:
+        '''
+        '''
+        return self._ith_trial_element(i=i, name='params')
+
+    def _ith_trial_timestamp(self, i: int) -> datetime.datetime:
+        '''
+        '''
+        if len(self._trials.trials) > i:
+            return self._trials.trials[i]["result"]["timestamp"]
+
+    def get_n_best_trial_pipelines(self, n: int, losses: list = None) -> list:
+        '''
+        Returns the list of n best pipelines
+        documented in trials
+        '''
+        if len(self._trials.trials) > 0:
+            if losses is None:
+                losses = [self._ith_trial_loss(i)
+                          for i in range(len(self._trials.trials))]
+
+            best_n_indices = [losses.index(l)
+                              for l in sorted(list(set(losses)))[:n]]
+
+            return [self._ith_trial_pipeline(i) for i in best_n_indices]
+        else:
+            err = ("Trials object is empty. "
+                   "Best pipeline cannot be returned")
+
+            self._logger.error(err)
+            raise Exception(err)
+
+    def get_n_best_trial_pipelines_of_each_type(self, n: int) -> dict:
+        '''
+        Returns a dictionary where keys are pipeline names,
+        and values are lists of best pipelines with this name
+        '''
+        assert(isinstance(n, int)), "Parameter 'n' must be an integer"
+
+        if len(self._trials.trials) > 0:
+
+            best_pipelines_per_type = {}
+            names = [self._ith_trial_name(i)
+                     for i in range(len(self._trials.trials))]
+
+            for nm in names:
+                losses = [self._ith_trial_loss(i)
+                          for i in range(len(self._trials.trials))
+                          if self._ith_trial_name(i) == nm]
+
+                best_pipelines_per_type[nm] = self.get_n_best_trial_pipelines(
+                                                        n=n,
+                                                        losses=losses)
+
+            return best_pipelines_per_type
+
+        else:
+            err = ("Trials object is empty. "
+                   "Best pipeline cannot be returned")
+
+            self._logger.error(err)
+            raise Exception(err)
+
+    def write_trials_documentation(self, path: str = None):
+        '''
+        Saves an excel file with pipeline names, scores,
+        parameters, and timestamps.
+        '''
+        if len(self._trials.trials) > 0:
+            path = path or "hyperopt_trials_documentation.xlsx"
+
+            assert(isinstance(path, str)),\
+                "Parameter 'path' must be of string type"
+
+            self._assert_valid_directory(path)
+
+            names = [self._ith_trial_name(i)
+                     for i in range(len(self._trials.trials))]
+            scores = [self._score_factor*self._ith_trial_loss(i)
+                      for i in range(len(self._trials.trials))]
+            params = [self._ith_trial_params(i)
+                      for i in range(len(self._trials.trials))]
+            timestamps = [self._ith_trial_timestamp(i)
+                          for i in range(len(self._trials.trials))]
+
+        else:
+            names = []
+            scores = []
+            params = []
+            timestamps = []
+
+        pd.DataFrame({"name": names,
+                      "score": scores,
+                      "params": params,
+                      "timestamp": timestamps})\
+          .to_excel(path)
+
+
+if __name__ == '__main__':
+
+    from sklearn.metrics import roc_auc_score, make_scorer
+    from xgboost import XGBClassifier
+    from sklearn.svm import SVC
+    from sklearn.feature_selection import SelectKBest
+    from sklearn.decomposition import PCA
+    from sklearn.datasets import load_iris
+    from pprint import pprint
+
+    data = load_iris()
+    X = pd.DataFrame(data.data)
+    y = pd.Series(data.target)
+    # produce a binary variable
+    y = (y == 2).astype(int)
+    del data
+    gc.collect()
+
+    # SPACE DEFINITION ########################################
+    # (can be moved to a separate python script)
+
+    """
+    A search space must be a list of dictionaries.
+    Each dictionary must have keys:
+        name (pipeline name or type),
+        pipeline (instance of sklearn.pipeline.Pipeline),
+        params (dictionary of distributions for the parameters of
+                the pipeline that we want to tune)
+
+    Here we have a space that consists of two dictionaries:
+    KBEST_XGBOOST and PCA_SVC
+    """
+    space = []
+
+    pipeline_dist_1 = {}
+    pipeline_dist_1["name"] = "KBEST_XGBOOST"
+
+    """
+    A pipeline consists of steps (tuples).
+    Each step has a name and an algorithm.
+    This pipeline, as a first step performs
+    feature selection with SelectKBest and
+    as a second step evaluates a machine learning algo (xgboost).
+
+    Like all sklearn algorithms, a Pipeline has methods
+    fit, predict, set_params, get_params
+    """
+    pipeline_dist_1["pipeline"] = Pipeline([
+                                     ('kbest', SelectKBest()),
+                                     ('xgb', XGBClassifier())
+                                     ])
+    """
+    Pipeline parameter dictionaries must be of the form:
+    {'kbest__k': 3, xgb__n_estimators: 20},
+    each parameter name consists of the step name, __, and parameter name.
+
+    Here, instead of values, the parameter names are followed
+    by hyperopt distributions.
+    Each hyperopt distribution also must have a name,
+    due to hyperopt functionality.
+
+    Here, we set the hyperopt distribution name to the step name,
+    but it does not have to be so. Hyperopt distribution names
+    must be different for different elements of the space.
+    """
+
+    pipeline_dist_1["params"] = {
+            'kbest__k': hp.choice('kbest__k', range(1, 5)),
+
+            'xgb__n_estimators':
+            50 + hp.randint('xgb__n_estimators', 50),
+
+            "xgb__learning_rate":
+            hp.loguniform('xgb__learning_rate', np.log(0.01), np.log(0.2))
+            }
+
+    space.append(pipeline_dist_1)
+
+    pipeline_dist_2 = {}
+    pipeline_dist_2["name"] = "PCA_SVC"
+
+    pipeline_dist_2["pipeline"] = Pipeline([
+                                     ('pca', PCA()),
+                                     ('svc', SVC(gamma="scale"))
+                                     ])
+
+    pipeline_dist_2["params"] = {
+            "pca__n_components": 1 + hp.randint("pca__n_components", 4),
+
+            "svc__C": hp.loguniform("svc__C", np.log(0.01), np.log(0.1))
+            }
+
+    space.append(pipeline_dist_2)
+
+    space = hp.choice('pipelines', space)
+
+    # TESTING ##########################################################
+
+    trials_path = 'TEST_hyperopt_trials.pkl'
+
+    doc_path = 'TEST_hyperopt_doc.xlsx'
+
+    hp_obj = HyperoptPipelineSelection(cost_func=roc_auc_score,
+                                       greater_is_better=True,
+                                       trials_path=trials_path)
+
+    hp_obj.attach_data(X_train=X, y_train=y)
+
+    hp_obj.attach_space(space=space)
+
+    hp_obj.search_for_best_pipeline(niter=10)
+
+    print('\n', '='*20, 'TESTING', '='*20)
+
+    print('\n', 'Best score:', hp_obj.best_trial_score)
+
+    print('\n', 'Best score variance:', hp_obj.best_trial_score_variance)
+
+    print('\n', 'Best pipeline', hp_obj.best_trial_pipeline)
+
+    print('\n', 'Best 3 pipelines: \n')
+    pprint(hp_obj.get_n_best_trial_pipelines(n=3))
+
+    print('\n', 'Best pipeline per type: \n')
+    pprint(hp_obj.get_n_best_trial_pipelines_of_each_type(n=1))
+
+    hp_obj.write_trials_documentation(path=doc_path)
+
+    # os.remove(doc_path)
+    # os.remove(trials_path)

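As an aside, the `attach_data` method above converts an explicit validation set into a single-split `cv` object so that `cross_validate` handles both cases; a stand-alone sketch of that trick (the data and the estimator are made up for illustration):

# Illustrative only, not part of the commit.
import numpy as np
from sklearn.linear_model import LogisticRegression
from sklearn.model_selection import cross_validate

X_train, y_train = np.random.rand(80, 3), np.random.randint(0, 2, 80)
X_val, y_val = np.random.rand(20, 3), np.random.randint(0, 2, 20)

X = np.concatenate([X_train, X_val])
y = np.concatenate([y_train, y_val])

# one (train_indices, validation_indices) pair == one fixed validation split
cv = [(list(range(80)), list(range(80, 100)))]

scores = cross_validate(LogisticRegression(), X, y, cv=cv)
print(scores["test_score"])   # a single score, computed on X_val/y_val only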
+ 38 - 6
cdplib/log.py

@@ -6,12 +6,19 @@
 import sys
 import os
 import logging
+from datetime import datetime
 
+sys.path.append(os.getcwd())
 
-class Log:
+
+class Log():
+    '''
+    '''
+    pass
     def __init__(self, name: str = None,
                  log_file: str = None,
-                 log_level: str = "INFO",
+                 log_level: str = "ERROR",
+                 stdout_log_level: str = "INFO",
                  print_to_stdout: bool = True):
         """Sets the log level and the path where the log file is stored
 
@@ -23,30 +30,34 @@ class Log:
 
 
         self._logger = logging.getLogger(name)
 
+        self._logger.setLevel("DEBUG")
+
         if (self._logger.hasHandlers()):
             self._logger.handlers.clear()
 
         if log_file is None:
-            log_file = os.path.join(".", "all.log")
+            log_file = os.path.join(".", "logs", str(datetime.today().date()) + ".log")
 
         assert(isinstance(log_file, str)),\
             "Parameter 'log_path' must be of string type"
 
+        os.makedirs(os.path.dirname(log_file), exist_ok=True)
+
         formatter = logging.Formatter(
                 '\n %(name)s %(asctime)s %(levelname)s %(message)s')
 
-        os.makedirs(os.path.dirname(log_file), exist_ok=True)
-
         file_handler = logging.FileHandler(log_file)
         file_handler.setFormatter(formatter)
+        file_handler.setLevel(log_level)
         self._logger.addHandler(file_handler)
 
         if print_to_stdout:
             stream_handler = logging.StreamHandler(sys.stdout)
             stream_handler.setFormatter(formatter)
+            stream_handler.setLevel(stdout_log_level)
             self._logger.addHandler(stream_handler)
 
-        self._logger.setLevel(log_level)
+        # self._logger.setLevel(log_level)
 
     def info(self, message: str):
         self._logger.info(message)
@@ -56,3 +67,24 @@ class Log:
 
 
     def error(self, message: str):
         self._logger.error(message)
+
+    def log_and_raise_error(self, message):
+        '''
+        '''
+        self._logger.error(message, exc_info=True)
+
+        raise Exception(message)
+
+    def log_and_raise_error_stack_info(self, message):
+        '''
+        '''
+        self._logger.error(message, exc_info=True, stack_info=True)
+
+        raise Exception(message)
+
+    def log_and_raise_warning(self, message):
+        '''
+        '''
+        self._logger.warning(message)
+
+        raise Warning(message)
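With the per-handler levels introduced above, file and console output can be filtered independently; a minimal usage sketch (the import path is assumed from the file location, and the logger name and messages are made up):

# Illustrative only, not part of the commit.
from cdplib.log import Log

log = Log(name="MigrationJob",
          log_level="ERROR",          # threshold of the file handler
          stdout_log_level="INFO")    # threshold of the stdout handler

log.info("visible on stdout only")
log.error("visible on stdout and written to ./logs/<today>.log")
log.log_and_raise_error("logged with traceback, then raised as an Exception")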