##
import asyncio
import logging
+import multiprocessing
import time
-import concurrent.futures
from osm_mon.core.config import Config
from osm_mon.core.message_bus_client import MessageBusClient
def evaluate(self):
log.debug('evaluate')
+ log.info('Starting alarm evaluation')
alarms_tuples = self.service.evaluate_alarms()
- # Starting evaluate executor pool with pool size process_pool_size. Default process_pool_size is 20
- with concurrent.futures.ProcessPoolExecutor(self.conf.get('evaluator', 'process_pool_size')) as executor:
- log.info('Started evaluate process pool with pool size %s' % (self.conf.get('evaluator',
- 'process_pool_size')))
- evaluate_futures = []
- for alarm, status in alarms_tuples:
- evaluate_futures.append(executor.submit(Evaluator._notify_alarm, self.conf, alarm, status))
-
- try:
- # Wait for future calls to complete till process_timeout. Default is 50 seconds
- for evaluate_future in concurrent.futures.as_completed(evaluate_futures,
- self.conf.get('evaluator', 'process_timeout')):
- result = evaluate_future.result(timeout=int(self.conf.get('evaluator',
- 'process_timeout')))
- log.debug('result = %s' % (result))
- except concurrent.futures.TimeoutError as e:
- # Some processes have not completed due to timeout error
- log.info('Some processes have not finished due to TimeoutError exception')
- log.debug('concurrent.futures.TimeoutError exception %s' % (e))
-
- # Shutting down process pool executor
- Evaluator._stop_process_pool(executor)
-
- @staticmethod
- def _stop_process_pool(executor):
- log.debug("_stop_process_pool")
- log.info('Shutting down process pool')
- try:
- log.debug('Stopping residual processes in the process pool')
- for pid, process in executor._processes.items():
- if process.is_alive():
- process.terminate()
- except Exception as e:
- log.info("Exception during process termination")
- log.debug("Exception %s" % (e))
-
- try:
- # Shutting down executor
- log.debug('Shutting down process pool executor')
- executor.shutdown()
- except RuntimeError as e:
- log.info('RuntimeError in shutting down executer')
- log.debug('RuntimeError %s' % (e))
- return
-
- @staticmethod
- def _notify_alarm(conf: Config, alarm: Alarm, status: AlarmStatus):
+ processes = []
+ for alarm, status in alarms_tuples:
+ p = multiprocessing.Process(target=self.notify_alarm,
+ args=(alarm, status))
+ p.start()
+ processes.append(p)
+ for process in processes:
+ process.join(timeout=10)
+ log.info('Alarm evaluation is complete')
+
+ def notify_alarm(self, alarm: Alarm, status: AlarmStatus):
log.debug("_notify_alarm")
- resp_message = Evaluator._build_alarm_response(alarm, status)
- msg_bus = MessageBusClient(conf)
- loop = asyncio.get_event_loop()
+ resp_message = self._build_alarm_response(alarm, status)
log.info("Sent alarm notification: %s", resp_message)
- loop.run_until_complete(msg_bus.aiowrite('alarm_response', 'notify_alarm', resp_message))
- return
+ self.loop.run_until_complete(self.msg_bus.aiowrite('alarm_response', 'notify_alarm', resp_message))
- @staticmethod
- def _build_alarm_response(alarm: Alarm, status: AlarmStatus):
+ def _build_alarm_response(self, alarm: Alarm, status: AlarmStatus):
log.debug("_build_alarm_response")
response = ResponseBuilder()
tags = {}
super().setUp()
self.config = Config()
- @mock.patch('concurrent.futures.ProcessPoolExecutor.submit')
- @mock.patch.object(Evaluator, "_notify_alarm")
+ @mock.patch('multiprocessing.Process')
+ @mock.patch.object(Evaluator, "notify_alarm")
@mock.patch.object(EvaluatorService, "evaluate_alarms")
- def test_evaluate(self, evaluate_alarms, _notify_alarm, futures):
+ def test_evaluate(self, evaluate_alarms, notify_alarm, process):
mock_alarm = mock.Mock()
mock_alarm.operation = 'gt'
mock_alarm.threshold = 50.0
evaluator = Evaluator(self.config)
evaluator.evaluate()
- futures.assert_called_with(Evaluator._notify_alarm, self.config, mock_alarm, AlarmStatus.ALARM)
+ process.assert_called_with(target=notify_alarm, args=(mock_alarm, AlarmStatus.ALARM))