parallelized_import.py 2.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475
  1. #!/usr/bin/env python3
  2. # -*- coding: utf-8 -*-
  3. """
  4. Created on Tue Oct 1 11:15:03 2019
  5. @author: tanya
  6. """
  7. import os
  8. import sys
  9. from typing import Callable
  10. sys.path.append(os.getcwd())
  11. def get_all_wheelsets():
  12. '''
  13. return: list of distinct wheelset numbers in the process
  14. '''
  15. from libraries.db_handlers.SQLHandler import SQLHandler
  16. sql_db = SQLHandler()
  17. query = "SELECT DISTINCT radsatznummer FROM rs1"
  18. return sql_db.read_sql_to_dataframe(query)["radsatznummer"].tolist()
  19. def parallelized_import(all_instances: list,
  20. mongo_schema_path: str,
  21. import_chunk: Callable,
  22. log_name: str = None):
  23. from concurrent.futures import ThreadPoolExecutor
  24. from libraries.db_handlers.MongodbHandler import MongodbHandler
  25. from libraries.log import Log
  26. import argparse
  27. argparser = argparse.ArgumentParser(description='Import process instances collection')
  28. argparser.add_argument('--chunksize', type=int, default=100, help="Number of wheelsets processed at a time")
  29. argparser.add_argument('--max_workers', type=int, default=10, help="Number of workers in ThreadPoolExecutor")
  30. args = argparser.parse_args()
  31. log = Log(log_name)
  32. log.info("Start application")
  33. log.info("Processing {0} wheelsets at a time parallelized with {1} workers"
  34. .format(args.chunksize, args.max_workers))
  35. collection_name = os.path.basename(mongo_schema_path).strip("schema_").split(".")[0]
  36. mongodb = MongodbHandler()
  37. mongodb.create_collection_and_set_schema(
  38. collection_name=collection_name,
  39. schema_path=mongo_schema_path)
  40. try:
  41. n_chunks = len(all_instances)//args.chunksize + 1
  42. with ThreadPoolExecutor(max_workers=args.max_workers) as executor:
  43. for i in range(n_chunks):
  44. executor.submit(import_chunk,
  45. all_instances[i*args.chunksize:(i+1)*args.chunksize], i)
  46. except Exception as e:
  47. err = ("Failed to import {0} in mongodb. "
  48. "Exit with error: {1}".format(collection_name, e))
  49. log.error(err)
  50. raise Exception(e)
  51. log.info("Finished application")