- # Starting evaluate executor pool with pool size process_pool_size. Default process_pool_size is 20
- with concurrent.futures.ProcessPoolExecutor(self.conf.get('evaluator', 'process_pool_size')) as executor:
- log.info('Started evaluate process pool with pool size %s' % (self.conf.get('evaluator',
- 'process_pool_size')))
- evaluate_futures = []
- for alarm, status in alarms_tuples:
- evaluate_futures.append(executor.submit(Evaluator._notify_alarm, self.conf, alarm, status))
-
- try:
- # Wait for future calls to complete till process_timeout. Default is 50 seconds
- for evaluate_future in concurrent.futures.as_completed(evaluate_futures,
- self.conf.get('evaluator', 'process_timeout')):
- result = evaluate_future.result(timeout=int(self.conf.get('evaluator',
- 'process_timeout')))
- log.debug('result = %s' % (result))
- except concurrent.futures.TimeoutError as e:
- # Some processes have not completed due to timeout error
- log.info('Some processes have not finished due to TimeoutError exception')
- log.debug('concurrent.futures.TimeoutError exception %s' % (e))
- Evaluator._stop_process_pool(executor)
-
- @staticmethod
- def _stop_process_pool(executor):
- log.debug("_stop_process_pool")
- log.info('Stopping all processes in the process pool')
- try:
- for pid, process in executor._processes.items():
- if process.is_alive():
- process.terminate()
- except Exception as e:
- log.info("Exception during process termination")
- log.debug("Exception %s" % (e))
- executor.shutdown()
- return
-
- @staticmethod
- def _notify_alarm(conf: Config, alarm: Alarm, status: AlarmStatus):
+ processes = []
+ for alarm, status in alarms_tuples:
+ p = multiprocessing.Process(target=self.notify_alarm, args=(alarm, status))
+ p.start()
+ processes.append(p)
+ for process in processes:
+ process.join(timeout=10)
+ log.info("Alarm evaluation is complete")
+
+ def notify_alarm(self, alarm: Alarm, status: AlarmStatus):