#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Created on Tue Oct  1 11:15:03 2019

@author: tanya

Helpers for importing process instances into MongoDB in parallel chunks.
"""

import os
import sys
from typing import Callable, List, Optional

sys.path.append(os.getcwd())


def get_all_wheelsets() -> list:
    """Return the distinct wheelset numbers found in the process.

    Reads the distinct values of column ``radsatznummer`` from table ``rs1``
    through the project's ``SQLHandler`` and returns them as a plain list.
    """
    # Imported lazily so that importing this module does not require the
    # project's database handlers to be importable.
    from libraries.db_handlers.SQLHandler import SQLHandler

    sql_db = SQLHandler()

    query = "SELECT DISTINCT radsatznummer FROM rs1"

    return sql_db.read_sql_to_dataframe(query)["radsatznummer"].tolist()


def collection_name_from_schema_path(mongo_schema_path: str) -> str:
    """Derive the MongoDB collection name from a schema file path.

    The collection name is the schema file's base name without a leading
    ``"schema_"`` prefix and without anything after the first ``"."``.

    >>> collection_name_from_schema_path("mongo_schema/schema_components.json")
    'components'
    """
    prefix = "schema_"
    file_name = os.path.basename(mongo_schema_path)

    # NOTE: str.strip("schema_") would remove any of the *characters*
    # s, c, h, e, m, a, _ from both ends (turning "schema_components" into
    # "omponents"), so the prefix is removed explicitly instead.
    if file_name.startswith(prefix):
        file_name = file_name[len(prefix):]

    return file_name.split(".")[0]


def split_into_chunks(all_instances: list, chunksize: int) -> List[list]:
    """Split ``all_instances`` into consecutive slices of ``chunksize`` items.

    The last chunk may be shorter. No empty chunk is produced, so an empty
    input yields an empty list.

    :raises ValueError: if ``chunksize`` is smaller than 1
    """
    if chunksize < 1:
        raise ValueError(
            "chunksize must be a positive integer, got {0}".format(chunksize))

    return [all_instances[start:start + chunksize]
            for start in range(0, len(all_instances), chunksize)]


def parallelized_import(all_instances: list, mongo_schema_path: str,
                        import_chunk: Callable,
                        log_name: Optional[str] = None) -> None:
    """Import all instances into a MongoDB collection chunk by chunk.

    The chunk size and the number of worker threads are read from the
    command line (``--chunksize``, default 100; ``--max_workers``,
    default 10). The target collection is created, with the schema found at
    ``mongo_schema_path``, before any chunk is processed.

    :param all_instances: identifiers of the instances to import
    :param mongo_schema_path: path to the schema file; the collection name
        is derived from its file name
        (see ``collection_name_from_schema_path``)
    :param import_chunk: callable invoked as ``import_chunk(chunk, index)``
        for every chunk, where ``index`` is the zero-based chunk number
    :param log_name: name passed to the project's ``Log`` class
    :raises Exception: if splitting, scheduling or any ``import_chunk`` call
        fails; the original exception is attached as ``__cause__``
    """
    from concurrent.futures import ThreadPoolExecutor
    from libraries.db_handlers.MongodbHandler import MongodbHandler
    from libraries.log import Log
    import argparse

    argparser = argparse.ArgumentParser(
        description='Import process instances collection')

    argparser.add_argument('--chunksize', type=int, default=100,
                           help="Number of wheelsets processed at a time")

    argparser.add_argument('--max_workers', type=int, default=10,
                           help="Number of workers in ThreadPoolExecutor")

    args = argparser.parse_args()

    log = Log(log_name)

    log.info("Start application")

    log.info("Processing {0} wheelsets at a time parallelized with {1} workers"
             .format(args.chunksize, args.max_workers))

    collection_name = collection_name_from_schema_path(mongo_schema_path)

    mongodb = MongodbHandler()

    mongodb.create_collection_and_set_schema(
        collection_name=collection_name,
        schema_path=mongo_schema_path)

    try:
        chunks = split_into_chunks(all_instances, args.chunksize)

        with ThreadPoolExecutor(max_workers=args.max_workers) as executor:
            futures = [executor.submit(import_chunk, chunk, index)
                       for index, chunk in enumerate(chunks)]

            # submit() never raises a worker's exception by itself; calling
            # result() re-raises it here so that a failed chunk is reported
            # instead of being silently dropped. Leaving the ``with`` block
            # still waits for the chunks that were already submitted.
            for future in futures:
                future.result()

    except Exception as e:
        err = ("Failed to import {0} in mongodb. "
               "Exit with error: {1}".format(collection_name, e))

        log.error(err)

        raise Exception(e) from e

    log.info("Finished application")