123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475 |
- #!/usr/bin/env python3
- # -*- coding: utf-8 -*-
- """
- Created on Tue Oct 1 11:15:03 2019
- @author: tanya
- """
- import os
- import sys
- from typing import Callable
# Expose packages living under the current working directory (such as the
# ``libraries`` package imported lazily by the functions below) to the import
# system; presumably the script is launched from the project root — confirm.
sys.path.extend([os.getcwd()])
def get_all_wheelsets():
    '''
    Fetch the distinct wheelset numbers present in the process.

    Reads the ``radsatznummer`` column of table ``rs1`` via ``SQLHandler``.

    :return: list of distinct wheelset numbers (converted with ``tolist()``)
    '''
    # Imported lazily so the module can be loaded without the DB layer.
    from libraries.db_handlers.SQLHandler import SQLHandler

    handler = SQLHandler()
    column = "radsatznummer"
    frame = handler.read_sql_to_dataframe("SELECT DISTINCT radsatznummer FROM rs1")
    wheelset_series = frame[column]
    return wheelset_series.tolist()
def _collection_name_from_schema_path(mongo_schema_path: str) -> str:
    """
    Derive a MongoDB collection name from a schema file path.

    The name is the file's base name, minus a literal leading ``schema_``,
    cut at the first ``.``. For example, ``.../schema_process_instances.json``
    becomes ``process_instances``.

    Only the exact prefix is removed. ``str.strip("schema_")`` would instead
    remove any of the characters ``s c h e m a _`` from both ends and mangle
    names such as ``schema_cases.json``.
    """
    prefix = "schema_"
    file_name = os.path.basename(mongo_schema_path)
    if file_name.startswith(prefix):
        file_name = file_name[len(prefix):]
    return file_name.split(".")[0]


def parallelized_import(all_instances: list,
                        mongo_schema_path: str,
                        import_chunk: Callable,
                        log_name: str = None):
    """
    Create a MongoDB collection from a schema file and import instances into
    it in parallel chunks.

    Chunk size and worker count come from the command line:
    ``--chunksize`` (default 100) and ``--max_workers`` (default 10).

    :param all_instances: items to import; sliced into consecutive chunks of
        at most ``--chunksize`` items
    :param mongo_schema_path: path to the schema file; the collection name is
        its base name without a leading ``schema_`` and without extension
    :param import_chunk: callable invoked in a worker thread as
        ``import_chunk(chunk, chunk_index)``, with ``chunk_index`` starting at 0
    :param log_name: name passed to ``Log``
    :raises Exception: the first exception raised by ``import_chunk`` is
        logged and re-raised unchanged
    """
    from concurrent.futures import ThreadPoolExecutor
    from libraries.db_handlers.MongodbHandler import MongodbHandler
    from libraries.log import Log
    import argparse

    argparser = argparse.ArgumentParser(description='Import process instances collection')
    argparser.add_argument('--chunksize', type=int, default=100,
                           help="Number of wheelsets processed at a time")
    argparser.add_argument('--max_workers', type=int, default=10,
                           help="Number of workers in ThreadPoolExecutor")
    args = argparser.parse_args()
    # Reject non-positive values up front. Otherwise a zero chunksize fails
    # deep inside the import and a negative one silently imports nothing.
    if args.chunksize < 1:
        argparser.error("--chunksize must be a positive integer")
    if args.max_workers < 1:
        argparser.error("--max_workers must be a positive integer")

    log = Log(log_name)
    log.info("Start application")
    log.info("Processing {0} wheelsets at a time parallelized with {1} workers"
             .format(args.chunksize, args.max_workers))

    collection_name = _collection_name_from_schema_path(mongo_schema_path)
    mongodb = MongodbHandler()
    mongodb.create_collection_and_set_schema(
        collection_name=collection_name,
        schema_path=mongo_schema_path)

    chunksize = args.chunksize
    try:
        with ThreadPoolExecutor(max_workers=args.max_workers) as executor:
            # Stepping the range by chunksize yields ceil(n / chunksize)
            # chunks: there is no trailing empty chunk when n is a multiple
            # of chunksize.
            futures = [
                executor.submit(import_chunk,
                                all_instances[start:start + chunksize], i)
                for i, start in enumerate(range(0, len(all_instances), chunksize))
            ]
            # Future.result() re-raises any exception raised by import_chunk.
            # Without this call, worker failures would be silently discarded.
            for future in futures:
                future.result()
    except Exception as e:
        err = ("Failed to import {0} in mongodb. "
               "Exit with error: {1}".format(collection_name, e))
        log.error(err)
        # Bare raise keeps the original exception type and traceback.
        raise
    log.info("Finished application")
|