tsteuer 5 years ago
parent
revision
ec7d4fa4a1

+ 0 - 11
Animals/Birds.py

@@ -1,11 +0,0 @@
-class Birds:
-    def __init__(self):
-        ''' Constructor for this class. '''
-        # Create some member animals
-        self.members = ['Sparrow', 'Robin', 'Duck']
- 
- 
-    def printMembers(self):
-        print('Printing members of the Birds class')
-        for member in self.members:
-           print('\t%s ' % member)

+ 0 - 1
Animals/__init__.py

@@ -1 +0,0 @@
-from .Birds import *

+ 211 - 0
db_handlers/MongodbHandler.py

@@ -0,0 +1,211 @@
+#!/usr/bin/env python3
+# -*- coding: utf-8 -*-
+
+"""
+Created on Mon Sep 16 13:27:44 2019
+
+@author: oskar
+@description: Implementation of a database handler for abstraction of the mongodb.
+"""
+
+
+import json
+import simplejson
+import sys
+import os
+import jsonref
+
+from copy import deepcopy
+from pymongo import MongoClient
+import pandas as pd
+import numpy as np
+
+sys.path.append(os.getcwd())
+from libraries.log import Log
+from libraries.configuration import default as cfg
+
+class MongodbHandler:
+
+    '''
+    Handler class that abstracts access to a mongodb database:
+    collection creation, schema validation and data insertion.
+    '''
+
+    def __init__(self, database_url: str = cfg['MONGO_DB']['URI'],
+                 database_name: str = cfg['MONGO_DB']['DATABASE_NAME']):
+        '''
+        :param str database_url: Url for the mongodb database
+        :param str database_name: Name of the database the database handler should handle
+        '''
+        assert(isinstance(database_url, str)),\
+            "Parameter 'database_url' must be a string type"
+        assert(isinstance(database_name, str)),\
+            "Parameter 'database_name' must be a string type"
+
+        self._log = Log("\nMongodbHandler script")
+
+        self._log.info('Mongodb Handler has been initialized')
+        # Connect to the MongoDB
+        self._client = MongoClient(database_url)
+        # Connect to the database, or create it if it doesn't exist.
+        self._database = self._client[database_name]
+
+    def _read_schema(self, schema_path: str) -> dict:
+        '''
+        :param str schema_path: path to the schema file.
+        '''
+
+        assert(isinstance(schema_path, str)),\
+            "Parameter 'schema_path must be a string type"
+
+        with open(schema_path) as json_file:
+            schema = json.load(json_file)
+
+        if 'definitions' in schema:
+            schema = self._dereference_schema(schema)
+
+        return schema
+
+    def _dereference_schema(self, schema: dict) -> dict:
+        '''
+        :param dict schema: dictionary containing a schema which uses references.
+        '''
+
+        assert(isinstance(schema, dict)),\
+            "Parameter 'schema' must be a dictionary type"
+
+        schema = jsonref.loads(str(schema).replace("'", "\""))
+        schema = deepcopy(schema)
+        schema.pop('definitions', None)
+        return schema
+
+    def set_collection_schema(self, collection_name: str, schema_path: str,
+                              validation_level: str = 'moderate', validation_action: str = 'error'):
+        '''
+        :param str collection_name: name of the collection for which the schema will be set.
+        :param str schema_path: path to the schema file.
+        :param str validation_level: level of validation done by the mongodb.
+        :param str validation_action: action taken when validation fails: issue a warning or raise an error.
+        '''
+        assert(isinstance(collection_name, str)),\
+            "Parameter 'collection_name' must be a string type"
+        assert(isinstance(schema_path, str)),\
+            "Parameter 'schema_path' must be a string type"
+        assert(isinstance(validation_level, str)),\
+            "Parameter 'validation_lever' must be a string type"
+        assert(isinstance(validation_action, str)),\
+            "Parameter 'validation_action' must be a string type"
+
+        schema = self._read_schema(schema_path)
+
+        command = {
+                    'collMod': collection_name,
+                    'validator': {
+                        '$jsonSchema': schema
+                    },
+                    'validationLevel': validation_level,
+                    'validationAction': validation_action
+                    }
+
+        self._database.command(command)
+
+    def create_collection(self, collection_name: str):
+        '''
+        :param str collection_name: name of the collection to be created.
+        '''
+
+        assert(isinstance(collection_name, str)),\
+            "Parameter 'collection_name' must be a string type"
+
+        if collection_name not in self._database.list_collection_names():
+            self._log.info(("Collection '{}' has been created").format(collection_name))
+            return self._database.create_collection(collection_name)
+        else:
+            self._log.info(("Collection '{}' already exists").format(collection_name))
+            return self._database[collection_name]
+
+    def insert_data_into_collection(self, data: (dict, list, np.ndarray, pd.DataFrame, pd.Series),
+                                    collection_name: str,
+                                    ordered: bool = False):
+        '''
+        :param data: data to be inserted into the collection; a dict, list,
+         numpy array, pandas DataFrame or pandas Series.
+        :param str collection_name: name of the collection the data will be added to.
+        :param bool ordered: whether insert_many should insert the documents in order.
+        '''
+
+        allowed_types = (dict, list, np.ndarray, pd.DataFrame, pd.Series)
+
+        assert(isinstance(data, allowed_types)),\
+            "Parameter 'data' is of invalid type"
+
+        if isinstance(data, np.ndarray):
+            data = pd.DataFrame(data)
+
+        if isinstance(data, pd.DataFrame):
+
+            data = simplejson.loads(data.to_json(orient="records",
+                                                 date_format="iso"))
+
+        elif isinstance(data, pd.Series):
+
+            data = simplejson.loads(data.to_json(date_format="iso"))
+
+        if isinstance(data, dict):
+            # a single document
+            self._database[collection_name].insert_one(data)
+        elif len(data) == 1:
+            # a list holding exactly one document
+            self._database[collection_name].insert_one(data[0])
+        else:
+            self._database[collection_name].insert_many(data, ordered=ordered)
+
+        self._log.info(('Data has been inserted into the {} collection').format(collection_name))
+
+    def create_collection_and_set_schema(self, collection_name: str, schema_path: str):
+        '''
+        :param str collection_name: name of the collection to be created.
+        :param str schema_path: path to the schema file.
+        '''
+        assert(isinstance(collection_name, str)),\
+            "Parameter 'collection_name' must be a string type"
+        assert(isinstance(schema_path, str)),\
+            "Parameter 'schema_path' must be a string type"
+
+        self.create_collection(collection_name)
+        self.set_collection_schema(collection_name=collection_name, schema_path=schema_path)
+
+    def query_data_and_generate_dataframe(self, collection_name: str, attribute: str = None,
+                                          attribute_value: str = None, comparison_operator: str = '$eq'):
+        '''
+        :param str collection_name: name of the collection to query.
+        :param str attribute: optional field to filter on; if None, all documents are returned.
+        :param str attribute_value: value the filter field is compared against.
+        :param str comparison_operator: mongodb comparison operator, '$eq' by default.
+        '''
+        if attribute is None or attribute_value is None:
+            data = self._database[collection_name].find()
+        else:
+            data = self._database[collection_name].find({attribute: {comparison_operator: attribute_value}})
+
+        df = pd.DataFrame(list(data))
+        df.set_index('radsatznummer', inplace=True)
+        return df
+
+
+if __name__ == "__main__":
+
+    log = Log("Test MongodbHandler:")
+
+    log.info('Script started')
+
+    db_handler = MongodbHandler()
+
+    # Create a collection for each schema file and attach its schema to it.
+    for schema_path in [
+            os.path.join(".", "mongo_schema", "schema_wheelsets.json"),
+            os.path.join(".", "mongo_schema", "schema_process_instances.json"),
+            os.path.join(".", "mongo_schema", "schema_componets.json")]:
+
+        if os.path.isfile(schema_path):
+
+            collection_name = os.path.basename(schema_path).split(".")[0].replace("schema_", "", 1)
+
+            db_handler.create_collection_and_set_schema(collection_name, schema_path)
+
+    log.info(("Existing databases: {}, Collection in OEBB database {}")\
+             .format(db_handler._client.list_database_names(), db_handler._database.list_collection_names()))
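
For reference, a minimal sketch of the validator command that set_collection_schema() builds and sends through pymongo. The connection URI, database name and the tiny wheelset schema below are assumptions for illustration only, not part of this commit.

    from pymongo import MongoClient

    client = MongoClient("mongodb://localhost:27017")     # assumed local mongod
    db = client["oebb_db"]                                 # assumed database name

    wheelset_schema = {
        "bsonType": "object",
        "required": ["radsatznummer"],
        "properties": {"radsatznummer": {"bsonType": "string"}}
    }

    if "wheelsets" not in db.list_collection_names():
        db.create_collection("wheelsets")

    # this is the command MongodbHandler.set_collection_schema() issues
    db.command({"collMod": "wheelsets",
                "validator": {"$jsonSchema": wheelset_schema},
                "validationLevel": "moderate",
                "validationAction": "error"})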

+ 595 - 0
db_handlers/SQLHandler.py

@@ -0,0 +1,595 @@
+#!/usr/bin/env python3
+# -*- coding: utf-8 -*-
+"""
+Created on Tue Sep 18 16:20:50 2018
+
+@author: tanya
+"""
+
+import os
+import sys
+import re
+import sqlalchemy
+import sqlparse
+import pandas as pd
+import warnings
+
+sys.path.append(os.getcwd())
+
+
+class SQLHandler:
+    '''
+    Provides methods for executing sql queries
+    with different database connectors.
+    Remark: in each method we force a new opening and
+    closing of a database connection;
+    this avoids errors when parallelizing with multiprocessing.
+    '''
+
+    def __init__(self, db_uri: str = None,
+                 is_case_insensitive: bool = False):
+        '''
+        :param str db_uri:
+            of form
+            <sqlalchemy_dialect://user:password@host:port/dbname?charset=utf8&local_infile=1>
+
+         sqlalchemy dialects:
+             for mysql : mysql+pymysql
+             for db2: ibm_db_sa
+        '''
+
+        from libraries.log import Log
+        from libraries.configuration import default as cfg
+        from sqlalchemy_utils import database_exists, create_database
+
+        self._log = Log(name='SQLHandler')
+
+        if db_uri is None:
+            db_uri = cfg["SQL_DB"]["URI"]
+
+        assert(isinstance(db_uri, str)),\
+            "Parameter 'db_uri' must be of type str"
+
+        assert(re.match(r'.+://.+:(.+)?@.+:.+/.+', db_uri) is not None),\
+            ('database url does not match the pattern: '
+             'sqlalchemy_dialect://user:password@host:port/dbname')
+
+        self._db_uri = db_uri
+
+        engine = sqlalchemy.create_engine(self._db_uri)
+
+        if not database_exists(engine.url):
+            create_database(engine.url)
+
+        query = "CREATE DATABASE IF NOT EXISTS {}"\
+            .format(self._connection_params["db"])
+
+        with warnings.catch_warnings():
+            warnings.simplefilter("ignore")
+            engine.execute(query)
+
+        assert(isinstance(is_case_insensitive, bool)),\
+            "Parameter 'is_case_sensetive' must of type bool"
+
+        if 'ibm' in db_uri and not is_case_insensitive:
+            raise Exception('Ibm db2 is case insensitive')
+
+        self._is_case_insensitive = is_case_insensitive
+
+        self._engine = sqlalchemy.create_engine(self._db_uri)
+
+    @property
+    def _connection_params(self) -> dict:
+        '''
+        :return: connection parameters like user,
+         password, host, port, and database name
+        :rtype: dict
+        '''
+        try:
+            connection_params = {}
+
+            connection_params['user'], connection_params['password'] =\
+                self._db_uri.split('//')[1]\
+                            .split('@')[0]\
+                            .split(':')
+
+            connection_params['host'], connection_params['port'] =\
+                self._db_uri.split('//')[1]\
+                            .split('@')[1]\
+                            .split('/')[0]\
+                            .split(':')
+
+            connection_params['db'] = self._db_uri.split('/')[-1]\
+                                                  .split('?')[0]
+
+            return connection_params
+
+        except Exception as e:
+            err = ("Could not parse connection parameters."
+                   "Finished with error {}")\
+                   .format(e)
+
+            self._log.error(err)
+            raise Exception(err)
+
+    def drop_database(self):
+        '''
+        Drops the database specified in the connection uri, if it exists.
+        '''
+        database = self._connection_params["db"]
+        self.execute("DROP DATABASE IF EXISTS {}".format(database))
+
+    @property
+    def _db_metadata(self) -> dict:
+        '''
+        Returns sql-dialect specific information like the information schema
+        and the column names in information_schema.tables and
+        information_schema.columns
+        For ibm databases, information_schema is set to syscat,
+        else it is set to information_schema
+        If these default values do not exist in the given database,
+        the output of the method is set to None
+
+        :return: dictionary with information_schema, schema_col,
+            table_col, column_col, default_schema
+        '''
+
+        db_metadata = {}
+
+        if 'ibm' in self._db_uri:
+            db_metadata['information_schema'] = 'syscat'
+            db_metadata['schema_col'] = 'tabschema'
+            db_metadata['table_col'] = 'tabname'
+            db_metadata['column_col'] = 'colname'
+            db_metadata['default_schema'] =\
+                self._connection_params['user'].upper()
+        else:
+            db_metadata['information_schema'] = 'information_schema'
+            db_metadata['schema_col'] = 'TABLE_SCHEMA'
+            db_metadata['table_col'] = 'TABLE_NAME'
+            db_metadata['column_col'] = 'COLUMN_NAME'
+            db_metadata['default_schema'] =\
+                self._connection_params['db']
+
+        # check if it worked to create metadata
+        try:
+            query = """SELECT *
+                       FROM {}.tables
+                       LIMIT 1
+                    """.format(db_metadata['information_schema'])
+            self.execute(query)
+
+        except Exception as e:
+            self._log.error(e)
+            db_metadata = None
+
+        return db_metadata
+
+    def execute(self, query):
+        '''
+        Executes an sql query (a multi-statement query is split into
+        single statements, which are executed one by one).
+        Remark: queries like CREATE, DROP, SELECT work
+        for the majority of sqlalchemy dialects;
+        queries like SHOW TABLES, LOAD DATA, and queries using
+        INFORMATION_SCHEMA are mysql specific and might
+        not exist in a different dialect.
+
+        :param str query:
+        '''
+        connection = self._engine.connect()
+        transaction = connection.begin()
+
+        errors = []
+
+        # in the case of multi-query execute each query
+        for sub_query in sqlparse.split(query):
+            if len(sub_query) > 0:
+                try:
+                    connection.execute(sub_query, multi=True)
+
+                except Exception as e:
+                    errors.append(str(e))
+
+        if len(errors) > 0:
+            err = ('Could not execute some of the queries. '
+                   'Obtained exceptions: {}'
+                   .format('\n'.join(errors)))
+
+            self._log.error(err)
+            raise Exception(err)
+
+        transaction.commit()
+        connection.close()
+
+    def execute_query_from_file(self, filename: str):
+        '''
+        Reads a query from the given file and executes it.
+
+        :param str filename: path to a file containing the sql query
+        '''
+        with open(filename, 'r') as f:
+            query = f.read()
+
+        self.execute(query)
+
+    def get_tablenames(self, schema: str = None, query: str = None):
+        '''
+        Returns the list of table names in the given schema
+        (or in the default schema if none is given).
+        '''
+        if (self._db_metadata is None) and (query is None):
+            raise Exception('Please specify the query')
+
+        else:
+            try:
+                if query is None:
+                    schema_or_default_schema =\
+                        self._db_metadata['default_schema']\
+                        if schema is None else schema
+
+                    query = """SELECT DISTINCT {0}
+                               FROM {1}.tables
+                               WHERE {2} = '{3}'
+                            """.format(
+                            self._db_metadata['table_col'],
+                            self._db_metadata['information_schema'],
+                            self._db_metadata['schema_col'],
+                            schema_or_default_schema)
+
+                tables = self.read_sql_to_dataframe(query).iloc[:, 0].tolist()
+                return tables
+
+            except Exception as e:
+                err = ("Could not get tablenames"
+                       "Finished with error {}".format(e))
+
+                self._log.error(err)
+                raise Exception(err)
+
+    def check_if_table_exists(self, tablename: str,
+                              schema: str = None,
+                              query: str = None):
+        '''
+        Tries to retrieve table information from database with given query.
+        If this does not work, tries to select one row from the given table,
+        if this fails, assumes that the table does not exist.
+
+        :param str tablename:
+        :param str schema:
+        :param str query: if not specified, tries to find
+            tablename in information_schema specified in _db_metadata.
+        :return: if the table exists or not
+        :rtype: bool
+        '''
+        if self._is_case_insensitive:
+            tablename = tablename.upper()
+
+        try:
+            tablenames = self.get_tablenames(schema=schema, query=query)
+
+            table_exists = (tablename in tablenames)
+
+        except Exception as e:
+            self._log.warning(('Could not execute query to retrieve table '
+                               'information. Trying to execute a '
+                               'select statement. '
+                               'Got exception {}').format(e))
+            try:
+                query = """SELECT *
+                           FROM {0}{1}
+                           LIMIT 1
+                        """.format('' if schema is None else schema + '.',
+                                   tablename)
+
+                self.execute(query)
+
+                table_exists = True
+
+            except Exception as e:
+                self._log.warning(('Failed to select from {0}. '
+                                   'Finished with error {1}. '
+                                   'Conclusion: table does not exist')
+                                  .format(tablename, e))
+
+                table_exists = False
+
+        return table_exists
+
+    def create_schema(self, schema: str, query: str = None):
+        '''
+        Creates a schema if it does not exist, else does nothing
+
+        :param str schema:
+        :param str query: if None trying to read schemas from
+            information_schema specified in db_metadata
+        '''
+        if (query is None):
+
+            if self._db_metadata is None:
+                raise Exception('Please specify query')
+            else:
+                query = """SELECT DISTINCT {0}
+                           FROM {1}.tables""".format(
+                              self._db_metadata['schema_col'],
+                              self._db_metadata['information_schema'])
+
+        try:
+            schemas = self.read_sql_to_dataframe(query).iloc[:, 0].tolist()
+        except Exception as e:
+            err = ("Could not retrieve the list of schemas"
+                   "from the database. Finished with error {}"
+                   .format(e))
+
+            self._log.error(err)
+            raise Exception(err)
+
+        if schema not in schemas:
+            self.execute("CREATE SCHEMA {}".format(schema))
+
+    def drop_table_if_exists(self, tablename: str,
+                             schema: str = None,
+                             query: str = None):
+        '''
+        :param str tablename:
+        :param str schema:
+        :param str query: if not specified, default value is "DROP TABLE"
+        '''
+        if self._is_case_insensitive:
+            tablename = tablename.upper()
+
+        schema = '' if schema is None else schema + '.'
+
+        if query is None:
+            query = "DROP TABLE {0}{1};".format(schema, tablename)
+
+        try:
+            if self.check_if_table_exists(tablename):
+                self.execute(query)
+
+        except Exception as e:
+            err = ("Could not drop the table {0} ."
+                   "Finished with error {1}"
+                   .format(tablename, e))
+
+            self._log.error(err)
+            raise Exception(err)
+
+    def get_column_names(self, tablename: str,
+                         schema: str = None,
+                         query: str = None):
+        '''
+        Tries to retrieve column information from database with given query.
+        If this does not work, tries to select one row from the given table.
+
+        :param str tablename:
+        :param str schema:
+        :param str query: if not specified, tries to select column
+            names in the information_schema specified in db_metadata
+        '''
+        if self._is_case_insensitive:
+            tablename = tablename.upper()
+
+        if not self.check_if_table_exists(tablename=tablename,
+                                          schema=schema):
+
+            err = "Table {} does not exist".format(tablename)
+            self._log.error(err)
+            raise Exception(err)
+
+        try:
+            if query is None:
+                if self._db_metadata is None:
+                    raise Exception('Please specify the query')
+
+                else:
+                    schema_or_default_schema =\
+                        self._db_metadata['default_schema']\
+                        if schema is None else schema
+
+                    query = """SELECT DISTINCT {0}
+                               FROM {1}.columns
+                               WHERE {2} = '{3}'
+                               AND {4} = '{5}'
+                            """.format(
+                            self._db_metadata['column_col'],
+                            self._db_metadata['information_schema'],
+                            self._db_metadata['schema_col'],
+                            schema_or_default_schema,
+                            self._db_metadata['table_col'],
+                            tablename)
+
+            colnames = [c.lower() for c in
+                        self.read_sql_to_dataframe(query).iloc[:, 0].tolist()]
+
+        except Exception as e:
+            self._log.warning((
+                'Could not select columns from the '
+                'information schema. Trying to '
+                'load the table into a dataframe and select column names. '
+                'Obtained exception {}').format(e))
+
+            query = """SELECT *
+                       FROM {0}{1}
+                       LIMIT 1
+                    """.format('' if schema is None else schema + '.',
+                               tablename)
+
+            data = self.read_sql_to_dataframe(query)
+            colnames = data.columns.tolist()
+
+        return colnames
+
+    def load_csv_to_db(self, filename: str,
+                       tablename: str,
+                       schema: str = None,
+                       query: str = None,
+                       **kwargs):
+        '''
+        Tries to load data from csv file to database with a given query.
+        If this does not work, tries to load data from csv to a
+        pandas dataframe first, and then write it to the database.
+
+        :param str filename:
+        :param str tablename:
+        :param str schema:
+        :param str query: if not specified, tries to use
+        LOAD DATA LOCAL INFILE query
+        '''
+
+        if not self.check_if_table_exists(tablename=tablename,
+                                          schema=schema):
+
+            err = ('Table {} does not exist. '
+                   'Please create it first.').format(tablename)
+            self._log.error(err)
+            raise Exception(err)
+
+        else:
+            try:
+                if query is None:
+                    query = """LOAD DATA LOCAL INFILE '{0}'
+                               INTO TABLE {1}{2}
+                               COLUMNS TERMINATED BY ','
+                               OPTIONALLY ENCLOSED BY '"'
+                               LINES TERMINATED BY '\r\n'
+                               IGNORE 1 LINES
+                               ({3})
+                               ;""".format(
+                                   filename,
+                                   '' if schema is None else schema + '.',
+                                   tablename,
+                                   ','.join(self.get_column_names(tablename)))
+
+                self.execute(query)
+
+            except Exception as e:
+                err = ("Could not load the file {0} "
+                       "to the table {1} ."
+                       "Finished with error {2}")\
+                       .format(filename, tablename, e)
+
+                self._log.error(err)
+                raise Exception(err)
+
+    def read_sql_to_dataframe(self, query: str, **read_sql_kwargs):
+        '''
+        :param str query: normally a SELECT sql statement
+        :param read_sql_kwargs: additional arguments to pandas read_sql method
+        :return: selected data
+        :rtype: DataFrame
+        '''
+        try:
+            connection = self._engine.connect()
+
+            data = pd.read_sql(sql=query,
+                               con=connection,
+                               **read_sql_kwargs)
+
+            connection.close()
+            return data
+
+        except Exception as e:
+            err = ("Could not read the query to a dataframe. "
+                   "Finished with error {}").format(e)
+
+            self._log.error(err)
+            raise Exception(err)
+
+    def read_table(self, tablename: str,
+                   schema: str = None,
+                   **read_sql_kwargs):
+        '''
+        :param str tablename:
+        :param str schema:
+        :param read_sql_kwargs: additional arguments to pandas read_sql method
+        :return: selected table
+        :rtype: DataFrame
+        '''
+        schema = '' if schema is None else schema + '.'
+
+        try:
+            return self.read_sql_to_dataframe(
+                    query="SELECT * FROM {0}{1};".format(schema, tablename),
+                    **read_sql_kwargs)
+        except Exception as e:
+            err = ("Could not read the table {0} to a dataframe. "
+                   "Finished with error {1}").format(tablename, e)
+
+            self._log.error(err)
+            raise Exception(err)
+
+    def append_to_table(self, data: pd.DataFrame,
+                        tablename: str,
+                        schema: str = None,
+                        to_sql_kwargs={'index': False}):
+        '''
+        :param DataFrame data: data to append
+        :param str tablename: table where data is appended
+        :param str schema:
+        :param dict to_sql_kwargs: additional arguments to pandas to_sql method
+        '''
+        if schema is not None:
+            self.create_schema(schema)
+
+        try:
+            connection = self._engine.connect()
+
+            data.to_sql(name=tablename,
+                        schema=schema,
+                        con=connection,
+                        if_exists='append',
+                        **to_sql_kwargs)
+
+            connection.close()
+
+        except Exception as e:
+            err = ("Could append data to the table {0}. "
+                   "Finished with error {1}").format(tablename, e)
+
+            self._log.error(err)
+            raise Exception(err)
+
+    def overwrite_table(self, data: pd.DataFrame,
+                        tablename: str,
+                        schema: str = None,
+                        to_sql_kwargs={'index': False}):
+        '''
+        :param DataFrame data: data to write to the database
+        :param str tablename: table where data is written
+        :param str schema:
+        :param to_sql_kwargs: additional arguments to pandas to_sql method
+        '''
+
+        if schema is not None:
+            self.create_schema(schema)
+
+        try:
+
+            connection = self._engine.connect()
+
+            data.to_sql(name=tablename,
+                        schema=schema,
+                        con=connection,
+                        if_exists='replace',
+                        **to_sql_kwargs)
+
+            connection.close()
+
+        except Exception as e:
+            err = ("Could overwrite the table {0}. "
+                   "Finished with error {1}").format(tablename, e)
+
+            self._log.error(err)
+            raise Exception(err)
+
+    def draw_er_diagram_from_db(self, diagram_path: str = None,
+                                schema: str = None,
+                                include_tables: list = None):
+        '''
+        Renders an entity-relationship diagram of the database
+        to an image file (png by default) using eralchemy.
+        '''
+        if diagram_path is None:
+            diagram_path = "erd.png"
+        else:
+            diagram_dir = os.path.dirname(diagram_path)
+            if diagram_dir != "":
+                os.makedirs(diagram_dir, exist_ok=True)
+
+        import eralchemy
+        eralchemy.render_er(self._db_uri,
+                            diagram_path,
+                            schema=schema,
+                            include_tables=include_tables)
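
As a rough usage sketch (not part of the commit), assuming a local MySQL server, hypothetical credentials and the libraries.db_handlers package layout referenced elsewhere in this repository:

    import pandas as pd
    from libraries.db_handlers.SQLHandler import SQLHandler

    # hypothetical connection string following the URI pattern from the docstring
    db = SQLHandler("mysql+pymysql://user:password@localhost:3306/test_db"
                    "?charset=utf8&local_infile=1")

    db.execute("CREATE TABLE IF NOT EXISTS measurements (id INT, value DOUBLE)")

    df = pd.DataFrame({"id": [1, 2], "value": [0.5, 1.5]})
    db.append_to_table(data=df, tablename="measurements")

    print(db.get_tablenames())           # should now contain 'measurements'
    print(db.read_table("measurements"))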

+ 1 - 0
db_handlers/__init__.py

@@ -0,0 +1 @@
+from .db_handlers import *

binary
db_handlers/__pycache__/MongodbHandler.cpython-37.pyc


binary
db_handlers/__pycache__/SQLHandler.cpython-37.pyc


binary
db_handlers/__pycache__/SQLOperations.cpython-37.pyc


+ 352 - 0
db_migration/DataFrameToCollection.py

@@ -0,0 +1,352 @@
+#!/usr/bin/env python3
+# -*- coding: utf-8 -*-
+"""
+Created on Mon Jul 22 11:05:47 2019
+
+@author: tanya
+
+@description: a class to reshape a pandas dataframe to a list of
+(possibly nested) documents with respect to a (json) mongodb schema
+"""
+
+import pandas as pd
+import os
+import sys
+
+sys.path.append(os.getcwd())
+
+
+class DataFrameToCollection:
+    '''
+    Reshapes a pandas dataframe to a list of (possibly nested) documents
+    with respect to a (json) mongodb schema.
+    '''
+    def __init__(self, schema_path: str = None, log_path: str = None):
+        '''
+        '''
+        from libraries.log import Log
+        import json
+
+        self._log = Log("ParseJsonSchema")
+
+        if schema_path is not None:
+
+            if not os.path.isfile(schema_path):
+                err = "JsonSchema not found"
+                self._log.error(err)
+                raise FileNotFoundError(err)
+
+            # load schema to dictionary if it is a valid json file
+            try:
+                with open(schema_path, "r") as f:
+                    self.schema = json.load(f)
+
+            except Exception as e:
+                err = ("Could not load json schema, "
+                       "Obtained error {}".format(e))
+
+                self._log.error(err)
+                raise Exception(err)
+
+        else:
+            self.schema = None
+
+    def to_list_of_documents(self, data: pd.DataFrame,
+                             grp_fields: list,
+                             schema: dict = None,
+                             _return_data: bool = False) -> list:
+        '''
+        Reshapes a pandas dataframe to a list of documents according
+         to a complex (json) mongodb schema
+
+         Remark1: column names of data need to reflect the "nestedness"
+         of the field in the mongodb schema with the help of a "." separator
+         Example: field.sub_field_1, field.sub_field_2
+
+         Remark2: if the schema is stored as a json file, first load it
+         to a dictionary with the help of the python json module
+        '''
+        from copy import deepcopy
+        from libraries.log import Log
+
+        log = Log("reshape_dataframe_to_list_of_documents:")
+
+        data = self._melt_duplicated_columns(data)
+
+        reshaped_fields = []
+
+        if schema is None:
+            schema = self.schema
+
+        for field in schema["properties"]:
+
+            if field not in self._unroll_nested_names(data.columns):
+                continue
+
+            field_type = schema["properties"][field]["bsonType"]
+
+            # if field has a simple type
+            if field_type not in ["array", "object"]:
+
+                grp_fields = [c for c in grp_fields if c in data.columns]
+
+                n_distinct_values = data.groupby(grp_fields)[field].nunique()\
+                                        .max()
+
+                if n_distinct_values != 1:
+                    err = "Field {0} is not unique with respect to {1}"\
+                          .format(field, grp_fields)
+
+                    log.error(err)
+                    raise Exception(err)
+
+                if field not in grp_fields:
+                    reshaped_field = data.groupby(grp_fields)[field].first()
+                else:
+                    reshaped_field =\
+                        data[grp_fields].drop_duplicates()\
+                        .set_index(grp_fields, drop=False)[field]
+
+                reshaped_fields.append(reshaped_field)
+
+            # if field is sub-document (dictionary)
+            elif field_type == "object":
+
+                sub_schema = deepcopy(schema["properties"][field])
+
+                # rename sub-schema properties to match with data column names
+                sub_schema["properties"] =\
+                    {".".join([field, k]): v for k, v
+                     in sub_schema["properties"].items()}
+
+                sub_data = self.to_list_of_documents(
+                            data=data,
+                            schema=sub_schema,
+                            grp_fields=grp_fields,
+                            _return_data=True)
+
+                reshaped_field = sub_data.apply(self._make_dict, axis=1)
+                reshaped_field.name = field
+
+                reshaped_fields.append(reshaped_field)
+
+            # if field is a list of dictionaries
+            elif field_type == "array":
+
+                items_type = schema["properties"][field]["items"]["bsonType"]
+
+                if items_type == "object":
+
+                    sub_schema = deepcopy(schema["properties"][field]["items"])
+
+                    # rename sub-schema properties to match data column names
+                    sub_schema["properties"] =\
+                        {".".join([field, k]): v for k, v in
+                         sub_schema["properties"].items()}
+
+                    # extend grp fields by sub-fields of field simple types
+                    sub_grp_fields =\
+                        [f for f in sub_schema["properties"]
+                         if sub_schema["properties"][f]["bsonType"]
+                         not in ["array", "object"]]
+
+                    if len(sub_grp_fields) == 0:
+                        err = ("One of the sub-keys in a list of documents"
+                               " must be of simple type for the field {}"
+                               .format(field))
+
+                        log.error(err)
+                        raise Exception(err)
+
+                    # group and reshape sub-fields with complex types
+                    sub_data = self.to_list_of_documents(
+                                data=data,
+                                schema=sub_schema,
+                                grp_fields=grp_fields + sub_grp_fields,
+                                _return_data=True)
+
+                    if sub_data is not None:
+
+                        # gather the results into a list of dictionaries
+                        sub_data = sub_data.apply(self._make_dict, axis=1)
+
+                        sub_data.name = field
+                        sub_data = sub_data.reset_index(grp_fields)
+
+                        reshaped_field =\
+                            sub_data.groupby(grp_fields)[field]\
+                                    .apply(self._make_list_of_distinct)
+
+                        reshaped_fields.append(reshaped_field)
+
+                # if field is a list of values with simple type
+                else:
+
+                    grp_fields = [c for c in grp_fields if c in data.columns]
+
+                    if field in data.columns:
+
+                        reshaped_field = data.groupby(grp_fields)[field]\
+                                           .apply(self._make_list_of_distinct)
+
+                        reshaped_fields.append(reshaped_field)
+
+        if len(reshaped_fields) > 0:
+            reshaped_data = pd.concat(reshaped_fields, axis=1)
+
+            if not _return_data:
+
+                list_of_documents =\
+                    reshaped_data.drop(list(reshaped_data.index.names),
+                                       axis=1, errors="ignore")\
+                                 .reset_index(drop=False)
+
+                log.info("Done reshaping the dataframe to a list of documents")
+
+                return list_of_documents
+
+            else:
+
+                return reshaped_data
+
+    def _melt_duplicated_columns(self, data: pd.DataFrame) -> pd.DataFrame:
+        '''
+        '''
+        for c in set(data.columns):
+            if isinstance(data[c], pd.DataFrame):
+                data = pd.melt(data, id_vars=[cc for cc in data.columns
+                                              if cc != c], value_vars=c)\
+                         .drop("variable", axis=1)\
+                         .rename(columns={"value": c})
+
+        return data
+
+    def _make_dict(self, x: pd.Series) -> dict:
+        '''
+        :return: the pandas series transformed to a dictionary;
+         meant to be applied to a dataframe along axis=1, so that
+         the index of the input series consists of the column names
+         of the dataframe
+        '''
+        return {f.split(".")[-1]: x[f] for f in x.index}
+
+    def _make_list(self, x: pd.Series) -> list:
+        '''
+        return: list of values in a series
+        '''
+        return list(x)
+
+    def _make_list_of_distinct(self, x: pd.Series) -> list:
+        '''
+        return: list of unique values from a Series where
+         entries are arbitrary objects
+         (pandas unique() method does not work if entries are of complex types)
+        '''
+        distinct = []
+        for obj in x:
+            if obj not in distinct:
+                distinct.append(obj)
+        return distinct
+
+    def _unroll_nested_names(self, columns: list) -> list:
+        '''
+        '''
+        unrolled = []
+
+        for c in columns:
+            parts = c.split(".")
+            for i in range(len(parts)):
+                unrolled.append(".".join(parts[:i + 1]))
+
+        return unrolled
+
+
+if __name__ == "__main__":
+
+    # Testing
+
+    df = pd.DataFrame({
+                       "a": [1]*8 + [2]*8,
+                       "b": [10]*8 + [20]*8,
+                       "c": [100, 200]*8,
+                       "d.da": [11]*8 + [22]*8,
+                       "d.db": [33]*8 + [34]*8,
+                       "e.ea.eaa": [5]*8 + [55]*8,
+                       "e.ea.eab": [6]*8 + [66]*8,
+                       "e.eb": [2, 2, 3, 3]*4,
+                       "e.ec.eca": [1, 2, 3, 4]*4,
+                       "e.ec.ecb": [5, 6, 7, 8]*4,
+                       "f.fa": [1]*4 + [3]*4 + [11]*4 + [33]*4,
+                       "f.fb": [2]*4 + [3]*2 + [4]*2 + [22]*4 + [44]*4})
+
+    duplicate = pd.DataFrame({"c": [300, 400]*8})
+
+    df = pd.concat([df, duplicate], axis=1)
+
+    schm = {
+              "bsonType": "object",
+              "required": ["a"],
+              "properties": {
+
+                  "a": {"bsonType": "integer"},
+
+                  "b": {"bsonType": "integer"},
+
+                  "c": {
+                      "bsonType": "array",
+                      "items": {"bsonType": "integer"}
+                  },
+                  "d": {
+                      "bsonType": "object",
+                      "properties": {
+                          "da": {"bsonType": "integer"},
+                          "db": {"bsonType": "integer"}
+                       }
+                  },
+                  "e": {
+                      "bsonType": "object",
+                      "properties": {
+                          "ea": {
+                              "bsonType": "object",
+                              "properties": {
+                                  "eaa": {"bsonType": "integer"},
+                                  "eab": {"bsonType": "integer"}
+                               }
+
+                          },
+
+                          "eb": {
+                              "bsonType": "array",
+                              "items": {"bsonType": "integer"}
+                          },
+
+                          "ec": {
+                                "bsonType": "array",
+                                "items": {
+                                  "bsonType": "object",
+                                  "properties": {
+                                      "eca": {"bsonType": "integer"},
+                                      "ecb": {"bsonType": "integer"}
+                                    }
+                                  }
+                          }
+                      }
+                  },
+                  "f": {
+                      "bsonType": "array",
+                      "items": {
+                          "bsonType": "object",
+                          "properties": {
+                              "fa": {"bsonType": "integer"},
+                              "fb": {
+                                  "bsonType": "array",
+                                  "items": {"bsonType": "integer"}
+                              }
+                          }
+                      }
+                  }
+              }
+              }
+
+    grp_fields = ["a"]
+
+    result = DataFrameToCollection().to_list_of_documents(
+                    data=df,
+                    schema=schm,
+                    grp_fields=grp_fields)
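
A hedged end-to-end sketch of how the reshaped output is meant to feed the MongoDB insertion. The dataframe, schema and import path below are made up for illustration; the libraries.* layout follows the imports used elsewhere in this commit.

    import pandas as pd
    from libraries.db_migration.DataFrameToCollection import DataFrameToCollection

    flat = pd.DataFrame({"a": [1, 1, 2, 2],
                         "d.da": [10, 10, 20, 20],
                         "d.db": [5, 5, 6, 6]})

    schema = {"bsonType": "object",
              "properties": {
                  "a": {"bsonType": "integer"},
                  "d": {"bsonType": "object",
                        "properties": {"da": {"bsonType": "integer"},
                                       "db": {"bsonType": "integer"}}}}}

    documents = DataFrameToCollection().to_list_of_documents(data=flat,
                                                              schema=schema,
                                                              grp_fields=["a"])

    # 'documents' now has one row per value of "a"; the "d" column holds nested
    # dicts such as {"da": 10, "db": 5}, so the result can be handed to, e.g.,
    # MongodbHandler().insert_data_into_collection(documents, "some_collection")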

+ 520 - 0
db_migration/MigrationCleaning.py

@@ -0,0 +1,520 @@
+#!/usr/bin/env python3
+# -*- coding: utf-8 -*-
+"""
+Created on Wed Sep 25 08:09:52 2019
+
+@author: tanya
+"""
+
+import os
+import sys
+import pandas as pd
+import numpy as np
+import gc
+
+sys.path.append(os.getcwd())
+
+from libraries.db_migration.ParseMapping import ParseMapping
+from libraries.db_migration.ParseJsonSchema import ParseJsonSchema
+from libraries.utils.ClassLogging import ClassLogging
+from libraries.utils.CleaningUtils import CleaningUtils
+
+
+class MigrationCleaning(ClassLogging):
+    '''
+    Class for correcting and filtering the incorrect data.
+    We keep the correcting and the filtering methods separated,
+    since there might be other custom steps in between.
+    '''
+    def __init__(self, mapping_path: str,
+                 schema_paths: (str, list),
+                 inconsist_report_table: str = None,
+                 filter_index_columns: (str, list) = None,
+                 mapping_source: str = "internal_name",
+                 mapping_target: str = "mongo_name",
+                 mapping_parser: type = ParseMapping,
+                 schema_parser: type = ParseJsonSchema,
+                 log_name: str = "MigrationCleaning"):
+        '''
+        '''
+        super().__init__(log_name=log_name)
+
+        assert isinstance(inconsist_report_table, str),\
+            "Inconsistent report table should be a tablename string"
+
+        self._inconsist_report_table = inconsist_report_table
+
+        assert isinstance(filter_index_columns, (str, list)),\
+            "Filter index columns must be a str or a list"
+
+        self._filter_index_columns = [filter_index_columns]\
+            if isinstance(filter_index_columns, str) else list(filter_index_columns)
+
+        self._schema_parser = schema_parser(schema_paths)
+
+        self._mapping_parser = mapping_parser(mapping_path,
+                                              source=mapping_source,
+                                              target=mapping_target)
+
+        self._mapping_path = mapping_path
+        self._schema_paths = schema_paths
+
+    def _assert_dataframe_input(self, data: pd.DataFrame):
+        '''
+        '''
+        assert(isinstance(data, pd.DataFrame)),\
+            "Parameter 'data' must be a pandas dataframe"
+
+    @property
+    def _field_mapping(self):
+        '''
+        '''
+        return self._mapping_parser.get_field_mapping()
+
+    @property
+    def _required_fields(self):
+        '''
+        '''
+        source_required_fields = self._mapping_parser.get_required_fields()
+        target_required_fields = self._schema_parser.get_required_fields()
+
+        for source_field, target_field in self._field_mapping.items():
+
+            if (target_field in target_required_fields) and\
+                    (source_field not in source_required_fields):
+
+                source_required_fields.append(source_field)
+
+        return source_required_fields
+
+    @property
+    def _default_values(self):
+        '''
+        '''
+        default_values = {}
+
+        target_default_values = self._schema_parser.get_default_values()
+        source_default_values = self._mapping_parser.get_default_values()
+
+        for source_field, target_field in self._field_mapping.items():
+
+            if source_field not in source_default_values:
+                continue
+
+            elif target_field not in target_default_values:
+
+                target_default_values[target_field] = np.nan
+
+            default_values[source_field] = {
+                    target_default_values[target_field]:
+                    source_default_values[source_field]
+                    }
+
+        return default_values
+
+    @property
+    def _python_types(self):
+        '''
+        '''
+        target_types = self._schema_parser.get_python_types()
+
+        result = {}
+
+        for source_field, target_field in self._field_mapping.items():
+
+            if target_field in target_types:
+                result[source_field] = target_types[target_field]
+
+            """
+            date_type_mismatch =\
+                    (target_field in target_types) and\
+                    (source_field in source_types) and\
+                    (target_types[target_field] == str) and\
+                    (source_types[source_field] == np.dtype('<M8[ns]'))
+
+            if date_type_mismatch:
+                target_types[target_field] = np.dtype('<M8[ns]')
+
+            if (source_field in source_types) and\
+                    (target_field in target_types) and\
+                    (target_types[target_field] != source_types[source_field]):
+
+                self.log_and_raise(("Type {0} of field {1} "
+                                    "in schema does not match "
+                                    "type {2} of field {3} in "
+                                    "migration mapping")
+                                   .format(target_types[target_field],
+                                           target_field,
+                                           source_types[source_field],
+                                           source_field))
+
+            if target_field in target_types:
+                source_types[source_field] = target_types[target_field]
+
+            """
+
+        return result
+
+    @property
+    def _value_mappings(self):
+        '''
+        '''
+        return self._mapping_parser.get_value_mappings()
+
+    @property
+    def _date_formats(self):
+        '''
+        '''
+        return self._mapping_parser.get_date_formats()
+
+    def _get_mongo_schema_info(self, method_name: str):
+        '''
+        '''
+        result = {}
+
+        target_dict = getattr(self._schema_parser, method_name)()
+
+        for source_field, target_field in self._field_mapping.items():
+
+            if target_field in target_dict:
+
+                result[source_field] = target_dict[target_field]
+
+        return result
+
+    @property
+    def _allowed_values(self):
+        '''
+        '''
+        return self._get_mongo_schema_info("get_allowed_values")
+
+    @property
+    def _minimum_values(self):
+        '''
+        '''
+        return self._get_mongo_schema_info("get_minimum_value")
+
+    @property
+    def _maximum_values(self):
+        '''
+        '''
+        return self._get_mongo_schema_info("get_maximum_value")
+
+    @property
+    def _patterns(self):
+        '''
+        '''
+        return self._get_mongo_schema_info("get_patterns")
+
+    def _filter_invalid_data(self, data: pd.DataFrame,
+                             invalid_mask: pd.Series,
+                             reason: (str, pd.Series)) -> pd.DataFrame:
+        '''
+        '''
+        from libraries.db_handlers.SQLHandler import SQLHandler
+
+        assert((self._inconsist_report_table is not None) and
+               (self._filter_index_columns is not None)),\
+            "Inconsistent report table or filter index is not provided"
+
+        self._assert_dataframe_input(data)
+
+        data = data.copy(deep=True)
+
+        db = SQLHandler()
+
+        if invalid_mask.sum() == 0:
+
+            return data
+
+        data_inconsist = data.assign(reason=reason)\
+                             .loc[invalid_mask]\
+                             .reset_index(drop=True)
+
+        db.append_to_table(data=data_inconsist,
+                           tablename=self._inconsist_report_table)
+
+        n_rows_filtered = len(data_inconsist)
+        n_instances_filtered = len(data_inconsist[self._filter_index_columns].drop_duplicates())
+
+        del data_inconsist
+        gc.collect()
+
+        self._log.warning(("Filtering: {0} ."
+                           "Filtered {1} rows "
+                           "and {2} instances"
+                           .format(reason, n_rows_filtered, n_instances_filtered)))
+
+        nok_index_data = data.loc[invalid_mask, self._filter_index_columns]\
+                             .drop_duplicates().reset_index(drop=True)
+
+        nok_index = pd.MultiIndex.from_arrays([nok_index_data[c] for c in
+                                               self._filter_index_columns])
+
+        all_index = pd.MultiIndex.from_arrays([data[c] for c in
+                                               self._filter_index_columns])
+
+        data = data.loc[~all_index.isin(nok_index)].reset_index(drop=True)
+
+        return data
+
+    def _replace_values(self, data: pd.DataFrame,
+                        default: bool) -> pd.DataFrame:
+        '''
+        '''
+        if default:
+            default_str = "default"
+        else:
+            default_str = "equal"
+
+        self._assert_dataframe_input(data)
+
+        data = data.copy(deep=True)
+
+        if default:
+            mapping = self._default_values
+        else:
+            mapping = self._value_mappings
+
+        for column, d in mapping.items():
+
+            try:
+
+                if column not in data.columns:
+                    continue
+
+                dtype = data[column].dtype
+
+                for key, values in d.items():
+
+                    if not default:
+
+                        mask = (data[column].astype(str).isin(values))
+
+                    else:
+                        mask = (data[column].isin(values))
+
+                    if default:
+
+                        mask = mask | (data[column].isnull())
+
+                    data.loc[mask, column] = key
+
+                data[column] = data[column].astype(dtype)
+
+            except Exception as e:
+
+                self.log_and_raise(("Failed to replace {0} values "
+                                    "in {1}. Exit with error {2}"
+                                    .format(default_str, column, e)))
+
+        self._log.info("Replaced {} values".format(default_str))
+
+        return data
+
+    def replace_default_values(self, data: pd.DataFrame) -> pd.DataFrame:
+        '''
+        '''
+        return self._replace_values(data=data, default=True)
+
+    def map_equal_values(self, data: pd.DataFrame) -> pd.DataFrame:
+        '''
+        '''
+        return self._replace_values(data=data, default=False)
+
+    def convert_types(self, data: pd.DataFrame) -> pd.DataFrame:
+        '''
+        '''
+        self._assert_dataframe_input(data)
+
+        for column, python_type in self._python_types.items():
+
+            try:
+                if column not in data.columns:
+                    continue
+
+                elif column in self._date_formats:
+
+                    data[column] = CleaningUtils.convert_dates(
+                            series=data[column],
+                            formats=self._date_formats[column])
+
+                elif (python_type == int) and data[column].isnull().any():
+
+                    self.log_and_raise(("Column {} contains missing values "
+                                        "and cannot be of integer type"
+                                        .format(column)))
+
+                elif python_type == str:
+
+                    python_type = object
+
+                else:
+
+                    data[column] = data[column].astype(python_type)
+
+                if data[column].dtype != python_type:
+
+                    self._log.warning(("After conversion type in {0} "
+                                       "should be {1} "
+                                       "but is still {2}"
+                                       .format(column,
+                                               python_type,
+                                               data[column].dtype)))
+
+            except Exception as e:
+
+                self.log_and_raise(("Failed to convert types in {0}. "
+                                    "Exit with error {1}"
+                                    .format(column, e)))
+
+        self._log.info("Converted dtypes")
+
+        return data
+
+    def filter_invalid_null_values(self, data: pd.DataFrame) -> pd.DataFrame:
+        '''
+        '''
+        self._assert_dataframe_input(data)
+
+        for column in data.columns:
+
+            if (column in self._required_fields) and\
+                    (data[column].isnull().any()):
+
+                invalid_mask = data[column].isnull()
+
+                reason = "Null value in the required field {}"\
+                         .format(column)
+
+                data = self._filter_invalid_data(data=data,
+                                                 invalid_mask=invalid_mask,
+                                                 reason=reason)
+
+        return data
+
+    def filter_invalid_types(self, data: pd.DataFrame) -> pd.DataFrame():
+        '''
+        '''
+        self._assert_dataframe_input(data)
+
+        for column, python_type in self._python_types.items():
+
+            if data[column].dtype != python_type:
+
+                def mismatch_type(x):
+                    return type(x) != python_type
+
+                invalid_mask = data[column].apply(mismatch_type)
+
+                reason = "Type mismatch if field {}".format(column)
+
+                data = self._filter_invalid_data(data=data,
+                                                 invalid_mask=invalid_mask,
+                                                 reason=reason)
+
+        return data
+
+    def filter_invalid_patterns(self, data: pd.DataFrame) -> pd.DataFrame:
+        '''
+        '''
+        self._assert_dataframe_input(data)
+
+        for column, pattern in self._patterns.items():
+
+            invalid_mask = (~data[column].astype(str).str.match(pattern))
+
+            reason = "Pattern mismatch in field {}".format(column)
+
+            data = self._filter_invalid_data(data=data,
+                                             invalid_mask=invalid_mask,
+                                             reason=reason)
+
+        return data
+
+    def filter_notallowed_values(self, data: pd.DataFrame) -> pd.DataFrame:
+        '''
+        '''
+        for column, value in self._minimum_values.items():
+
+            invalid_mask = data[column] < value
+
+            reason = "Too small values in field {}".format(column)
+
+            data = self._filter_invalid_data(data=data,
+                                             invalid_mask=invalid_mask,
+                                             reason=reason)
+
+        for column, value in self._maximum_values.items():
+
+            invalid_mask = data[column] > value
+
+            reason = "Too large values in field {}".format(column)
+
+            data = self._filter_invalid_data(data=data,
+                                             invalid_mask=invalid_mask,
+                                             reason=reason)
+
+        for column, allowed_values in self._allowed_values.items():
+
+            invalid_mask = (~data[column].isin(allowed_values))
+
+            reason = "Not allowed values in field {}".format(column)
+
+            data = self._filter_invalid_data(data=data,
+                                             invalid_mask=invalid_mask,
+                                             reason=reason)
+
+        return data
+
+
+if __name__ == "__main__":
+
+    # testing
+
+    from libraries.db_handlers.SQLHandler import SQLHandler
+
+    mapping_path = os.path.join(".", "migration_mappings", "rs1_mapping.json")
+
+    schema_paths = [
+            os.path.join(".", "mongo_schema", "schema_wheelsets.json"),
+            os.path.join(".", "mongo_schema", "schema_process_instances.json")]
+
+    inconsist_report_table = "test_inconsist_report_rs1"
+
+    if all([os.path.isfile(p) for p in schema_paths + [mapping_path]]):
+
+        print("Found schemas!")
+
+        cleaner = MigrationCleaning(
+                mapping_path=mapping_path,
+                schema_paths=schema_paths,
+                mapping_source="internal_name",
+                mapping_target="mongo_name",
+                filter_index_columns=["radsatznummer"],
+                inconsist_report_table=inconsist_report_table)
+
+        db = SQLHandler()
+
+        data = db.read_sql_to_dataframe("select * from rs1 limit 100")
+
+        data = cleaner.replace_default_values(data)
+
+        data = cleaner.map_equal_values(data)
+
+        data = cleaner.convert_types(data)
+
+        non_filtered_len = len(data)
+
+        data = cleaner.filter_invalid_types(data)
+
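+        # presumably, re-running the conversion after rows with mismatched
+        # types were filtered out lets the remaining values reach the
+        # target dtypes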
+        if len(data) < non_filtered_len:
+
+            data = cleaner.convert_types(data)
+
+        data = cleaner.filter_invalid_null_values(data)
+
+        data = cleaner.filter_invalid_patterns(data)
+
+        data = cleaner.filter_notallowed_values(data)
+
+    print("Done!")

+ 62 - 0
db_migration/ParseDbSchema.py

@@ -0,0 +1,62 @@
+#!/usr/bin/env python3
+# -*- coding: utf-8 -*-
+"""
+Created on Wed Sep 25 08:22:20 2019
+
+@author: tanya
+"""
+
+import os
+import sys
+import abc
+sys.path.append(os.getcwd())
+
+
+class ParseDbSchema(metaclass=abc.ABCMeta):
+    '''
+    '''
+    def __init__(self, schema_paths: [list, str], log_file: str = None):
+        '''
+        '''
+        from libraries.log import Log
+
+        self._log = Log(name="ParseDbSchema:", log_file=log_file)
+
+        if isinstance(schema_paths, str):
+            schema_paths = [schema_paths]
+
+        for schema_path in schema_paths:
+            if not os.path.isfile(schema_path):
+                err = "Schema not found"
+                self._log.error(err)
+                raise FileNotFoundError(err)
+
+    @abc.abstractmethod
+    def get_fields(self) -> list:
+        '''
+        '''
+        return
+
+    @abc.abstractmethod
+    def get_datetime_fields(self) -> list:
+        '''
+        '''
+        return
+
+    @abc.abstractmethod
+    def get_python_types(self) -> list:
+        '''
+        '''
+        return
+
+    @abc.abstractmethod
+    def get_default_values(self) -> list:
+        '''
+        '''
+        return
+
+    @abc.abstractmethod
+    def get_allowed_values(self) -> list:
+        '''
+        '''
+        return

+ 332 - 0
db_migration/ParseJsonSchema.py

@@ -0,0 +1,332 @@
+#!/usr/bin/env python3
+# -*- coding: utf-8 -*-
+"""
+Created on Thu Jan 31 11:41:48 2019
+
+@author: tanya
+"""
+
+import os
+import sys
+from copy import deepcopy
+import numpy as np
+
+sys.path.append(os.getcwd())
+
+from libraries.db_migration.ParseDbSchema import ParseDbSchema
+
+
+class ParseJsonSchema(ParseDbSchema):
+    '''
+    Class for retrieving column properties from mongodb jsonSchema
+    '''
+
+    def __init__(self, schema_paths: [list, str], log_file: str = None):
+        '''
+        '''
+        import json
+        from libraries.log import Log
+
+        super().__init__(schema_paths=schema_paths, log_file=log_file)
+
+        self._log = Log(name="ParseJsonSchema", log_file=log_file)
+
+        # load schemas to dictionaries if they are valid json files
+
+        assert(isinstance(schema_paths, (list, str))),\
+            "Schema paths must be either str or lists"
+
+        if isinstance(schema_paths, str):
+            schema_paths = [schema_paths]
+
+        self.schemas = []
+
+        for schema_path in schema_paths:
+            try:
+                with open(schema_path, "r") as f:
+                    self.schemas.append(json.load(f))
+
+            except Exception as e:
+                err = ("Could not load json schema, "
+                       "Obtained error {}".format(e))
+
+                self._log.error(err)
+                raise Exception(err)
+
+    def get_fields(self) -> list:
+        '''
+        '''
+        return self._parse()
+
+    def get_required_fields(self) -> list:
+        '''
+        '''
+        return self._parse(required_only=True)
+
+    def get_mongo_types(self) -> dict:
+        '''
+        '''
+        return self._parse(field_info="bsonType")
+
+    def get_datetime_fields(self):
+        '''
+        '''
+        mongo_types = self.get_mongo_types()
+
+        return [k for k, v in mongo_types.items()
+                if v in ["date", "timestamp", "Date", "Timestamp"]]
+
+    def get_python_types(self) -> dict:
+        '''
+        '''
+        mongo_types = self.get_mongo_types()
+        python_types = {}
+
+        bson_to_python_types_except_dates = {"double": float,
+                                             "decimal": float,
+                                             "string": str,
+                                             "object": object,
+                                             "array": list,
+                                             "bool": bool,
+                                             "int": int,
+                                             "long": int,
+                                             "date": np.dtype('<M8[ns]'),
+                                             "timestamp": np.dtype('<M8[ns]')
+                                             }
+
+        for k, v in mongo_types.items():
+
+            if isinstance(v, list):
+                if ("date" in v) or ("timestamp" in v):
+                    v = "date"
+                elif "string" in v:
+                    v = "string"
+                elif ("double" in v) or ("decimal" in v):
+                    v = "double"
+                elif ("null" in v) and (len(v) == 2) and ("int" not in v):
+                    # pick the non-null type from a ["<type>", "null"] union
+                    v = [t for t in v if t != "null"][0]
+                else:
+                    err = "Type {0}: {1} is not convertible".format(k, v)
+                    self._log.error(err)
+                    raise Exception(err)
+
+            if v in bson_to_python_types_except_dates:
+                python_types[k] = bson_to_python_types_except_dates[v]
+
+        return python_types
+
+    def get_patterns(self) -> dict:
+        '''
+        '''
+        return self._parse(field_info="pattern")
+
+    def get_default_values(self) -> dict:
+        '''
+        '''
+        return self._parse(field_info="default")
+
+    def get_allowed_values(self) -> dict:
+        '''
+        '''
+        return self._parse(field_info="enum")
+
+    def get_maximum_value(self) -> dict:
+        '''
+        '''
+        return self._parse(field_info="maximum")
+
+    def get_minimum_value(self) -> dict:
+        '''
+        '''
+        return self._parse(field_info="minimum")
+
+    def get_max_items(self) -> dict:
+        '''
+        '''
+        return self._parse(field_info="maxItems")
+
+    def get_min_items(self) -> dict:
+        '''
+        '''
+        return self._parse(field_info="minItems")
+
+    def get_field_descriptions(self) -> dict:
+        '''
+        '''
+        return self._parse(field_info="description")
+
+    def _parse(self,
+               field_info: str = None,
+               required_only: bool = False):
+        '''
+        '''
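+        # parse the first schema, then merge the results of the remaining
+        # schemas into it (lists are extended, dictionaries are updated)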
+        result = self._parse_one(schema=self.schemas[0],
+                                 field_info=field_info,
+                                 required_only=required_only)
+
+        for schema in self.schemas[1:]:
+
+            next_result = self._parse_one(schema=schema,
+                                          field_info=field_info,
+                                          required_only=required_only)
+
+            if isinstance(result, list):
+                result.extend(next_result)
+            else:
+                result.update(next_result)
+
+        return result
+
+    def _parse_one(self,
+                   schema: dict,
+                   field_info: str = None,
+                   required_only: bool = False,
+                   super_field_name: str = None,
+                   already_parsed: (list, dict) = None) -> (list, dict):
+        '''
+        Recursive function that returns a list of (nested) field names or
+        a dictionary of (nested) field names with field characteristics.
+
+        :param schema: if None => entire self.schema, or a sub-schema
+            of self.schema
+
+        :param field_info: optional, if provided a dictionary of field
+            names with field characteristics is returned (for example,
+            the bsonType of each field), else a list of fields is returned
+
+        :param required_only: when True, only returns fields marked as
+            required in the mongo schema
+
+        :param super_field_name: needed for recursion
+            Example: the field 'article' has
+            subfields 'id' and 'supplier'.
+            If we parse the sub-document corresponding to article, then
+            super_field_name is 'article' and we might get an output like
+            {'article.id': string, 'article.supplier': string}
+
+        :param already_parsed: needed for recursion
+
+        '''
+        schema = deepcopy(schema)
+
+        assert(isinstance(schema, dict)),\
+            "Parameter 'schema' must be a dict"
+
+        if field_info is None:
+            # parse a list of fields
+            if already_parsed is None:
+                already_parsed = []
+            else:
+                assert(isinstance(already_parsed, list)),\
+                    "Parameter 'already_parsed' must be of type list"
+        else:
+            # parse a dictionary of field names with field characteristics
+            if already_parsed is None:
+                already_parsed = {}
+            else:
+                assert(isinstance(already_parsed, dict)),\
+                    "Parameter 'already_parsed' must be of type dict"
+
+        # If schema is nested, then
+        # either it is of bsonType object
+        # and the field information is stored under the key 'properties'
+        # or it is of bsonType array
+        # and the field information is stored in sub-schemas
+        # under the key 'items'
+
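+        # An illustrative fragment of such a nested schema (assumed shape
+        # of a typical mongodb jsonSchema, not copied from the real files):
+        # {"bsonType": "object",
+        #  "required": ["id"],
+        #  "properties": {
+        #      "id": {"bsonType": "string"},
+        #      "parts": {"bsonType": "array",
+        #                "items": {"bsonType": "object",
+        #                          "properties": {...}}}}}
+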
+        # if schema is of bsonType object
+        if "properties" in schema.keys():
+            if "required" in schema.keys():
+                required_subfields = schema["required"]
+
+            for sub_field_name in schema["properties"].keys():
+
+                sub_schema = schema["properties"][sub_field_name]
+
+                # only process fields that are required
+                if required_only and\
+                        (sub_field_name not in required_subfields):
+                    pass
+                else:
+                    if super_field_name is not None:
+                        field_name = '.'.join([super_field_name,
+                                               sub_field_name])
+                    else:
+                        field_name = sub_field_name
+
+                    # if the given sub-field is nested, parse the
+                    # sub-schema corresponding to this sub-field
+                    self._parse_one(
+                            schema=sub_schema,
+                            super_field_name=field_name,
+                            field_info=field_info,
+                            already_parsed=already_parsed,
+                            required_only=required_only)
+
+        # if schema is of bsonType array
+        elif "items" in schema.keys():
+            # one schema for all items
+            if isinstance(schema["items"], dict):
+
+                sub_schema = schema["items"]
+
+                self._parse_one(schema=sub_schema,
+                                super_field_name=super_field_name,
+                                field_info=field_info,
+                                already_parsed=already_parsed,
+                                required_only=required_only)
+
+            # list of separate schemas for each item
+            elif isinstance(schema["items"], list):
+
+                for sub_schema in schema["items"]:
+                    self._parse_one(schema=sub_schema,
+                                    super_field_name=super_field_name,
+                                    field_info=field_info,
+                                    already_parsed=already_parsed,
+                                    required_only=required_only)
+            else:
+                raise Exception(('Schema is not composed correctly: '
+                                 'items must be a dictionary or a list'))
+        else:
+            # If neither properties nor items is in schema keys
+            # we reached the last level of nestedness,
+            # field information is stored in the schema keys.
+            field_name = super_field_name
+
+            if field_info is None:
+                already_parsed.append(field_name)
+            else:
+                if field_info in schema.keys():
+                    already_parsed[field_name] = schema[field_info]
+                else:
+                    pass
+
+        return already_parsed
+
+
+if __name__ == "__main__":
+
+    # Only for testing
+
+    schema_path = os.path.join(".", "mongo_schema", "schema_wheelsets.json")
+
+    if os.path.isfile(schema_path):
+
+        parse_obj = ParseJsonSchema(schema_paths=schema_path)
+
+        fields = parse_obj.get_fields()
+
+        required_fields = parse_obj.get_required_fields()
+
+        patterns = parse_obj.get_patterns()
+
+        mongo_types = parse_obj.get_mongo_types()
+
+        python_types_except_dates = parse_obj.get_python_types()
+
+        datetime_fields = parse_obj.get_datetime_fields()
+
+        allowed_values = parse_obj.get_allowed_values()
+
+        descriptions = parse_obj.get_field_descriptions()

+ 157 - 0
db_migration/ParseMapping.py

@@ -0,0 +1,157 @@
+#!/usr/bin/env python3
+# -*- coding: utf-8 -*-
+"""
+Created on Fri Sep 20 15:33:17 2019
+
+@author: tanya
+"""
+
+import os
+import sys
+import numpy as np
+sys.path.append(os.getcwd())
+
+
+class ParseMapping:
+    '''
+    '''
+    def __init__(self, mapping_path: str, log_name: str = "ParseMapping",
+                 source: str = "original_name", target: str = "original_name"):
+        '''
+        '''
+        import json
+        from libraries.log import Log
+
+        self._log = Log(log_name)
+
+        if not os.path.isfile(mapping_path):
+            err = "Mapping not found"
+            self._log.error(err)
+            raise FileNotFoundError(err)
+
+        try:
+            with open(mapping_path, "r") as f:
+                self._mapping = json.load(f)
+
+        except Exception as e:
+            err = ("Could not load mapping. "
+                   "Exit with error {}".format(e))
+            self._log.error(err)
+            raise Exception(err)
+
+        self._source = source
+        self._target = target
+
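+    # Illustrative (assumed) structure of a single mapping entry, inferred
+    # from the keys accessed in this class; the real mapping files may differ:
+    # {"original_name": "...", "internal_name": "...", "mongo_name": "...",
+    #  "required": 1, "type": "Date", "date_format": "...",
+    #  "default_values": ..., "value_mapping": {...}, "column_number": 0}
+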
+    def get_field_mapping(self) -> dict:
+        '''
+        '''
+        assert(all([set([self._source, self._target]) <= set(d)
+                    for d in self._mapping]))
+
+        return {d[self._source]: d[self._target] for d in self._mapping}
+
+    def _get_fields_satisfying_condition(self, key: str, value) -> list:
+        '''
+        '''
+        assert(all([self._source in d for d in self._mapping])),\
+            "Invalid from field"
+
+        return [d[self._source] for d in self._mapping
+                if (key in d) and (d[key] == value)]
+
+    def get_required_fields(self) -> list:
+        '''
+        '''
+        return self._get_fields_satisfying_condition(key="required", value=1)
+
+    def get_date_fields(self) -> list:
+        '''
+        '''
+        return self._get_fields_satisfying_condition(key="type", value="Date")
+
+    def _get_info(self, key: str, value=None) -> dict:
+        '''
+        '''
+        assert(all([self._source in d for d in self._mapping])),\
+            "Invalid from field"
+
+        # collect the value stored under 'key' for every field that defines
+        # it; if 'value' is given, keep only fields where d[key] == value
+        return {d[self._source]: d[key] for d in self._mapping
+                if (key in d) and
+                ((value is None) or (d[key] == value))}
+
+    def get_default_values(self) -> dict:
+        '''
+        '''
+        return self._get_info(key="default_values")
+
+    def get_date_formats(self) -> dict:
+        '''
+        '''
+        return self._get_info(key="date_format")
+
+    def get_types(self) -> dict:
+        '''
+        '''
+        return self._get_info(key="type")
+
+    def get_python_types(self) -> dict:
+        '''
+        '''
+        sql_to_python_dtypes = {
+                "Text": str,
+                "Date": np.dtype('<M8[ns]'),
+                "Double": float,
+                "Integer": int
+                }
+
+        sql_types = self.get_types()
+
+        return {k: sql_to_python_dtypes[v] for k, v in sql_types.items()}
+
+    def get_value_mappings(self) -> dict:
+        '''
+        '''
+        return self._get_info(key="value_mapping")
+
+    def get_column_numbers(self) -> list:
+        '''
+        '''
+        if all(["column_number" in d for d in self._mapping]):
+            column_numbers = [d["column_number"] for d in self._mapping]
+
+        elif all(["column_number" not in d for d in self._mapping]):
+            column_numbers = list(range(len(self._mapping)))
+
+        else:
+            err = ("Incorrectly filled mapping. Column numbers should "
+                   "be present either in all or in none of the fields")
+            self._log.error(err)
+            raise Exception(err)
+
+        return column_numbers
+
+
+if __name__ == "__main__":
+
+    mapping_path = os.path.join(".", "migration_mappings", "rs0_mapping.json")
+
+    if os.path.isfile(mapping_path):
+
+        print("found mapping path")
+
+        parser = ParseMapping(mapping_path, source="internal_name",
+                              target="mongo_name")
+
+        internal_to_mongo_mapping = parser.get_field_mapping()
+
+        original_to_internal_mapping = parser.get_field_mapping()
+
+        default_values = parser.get_default_values()
+
+        types = parser.get_types()
+
+        column_numbers = parser.get_column_numbers()
+
+        print("Done testing!")

+ 1 - 0
db_migration/__init__.py

@@ -0,0 +1 @@
+from db_migration import *

binary
db_migration/__pycache__/DataFrameToCollection.cpython-37.pyc


binary
db_migration/__pycache__/MigrationCleaning.cpython-37.pyc


binary
db_migration/__pycache__/ParseDbSchema.cpython-37.pyc


binary
db_migration/__pycache__/ParseJsonSchema.cpython-37.pyc


binary
db_migration/__pycache__/ParseMapping.cpython-37.pyc


+ 1 - 1
setup.py

@@ -8,6 +8,6 @@ setup(name='cdplib',
       author_email='tanja.zolotareva@acdp.at',
       install_requires=['numpy'],
       license='MIT',
-      packages=['Animals'],
+      packages=['db_handlers', 'db_migration'],
       zip_safe=False)