##
import asyncio
import logging
+import multiprocessing
import time
-import concurrent.futures
from osm_mon.core.config import Config
from osm_mon.core.message_bus_client import MessageBusClient
class Evaluator:
-
def __init__(self, config: Config, loop=None):
# Keep the configuration on the instance and create the message bus client
# that notify_alarm later uses to publish alarm notifications.
# NOTE(review): this excerpt is a diff whose indentation and some context
# lines were stripped, so the body of `if not loop:` is not visible here.
# notify_alarm reads self.loop and evaluate reads self.service, so both are
# presumably assigned in this constructor -- confirm against the full file.
self.conf = config
if not loop:
self.msg_bus = MessageBusClient(config)
def evaluate_forever(self):
# Run self.evaluate() in an endless loop. After each successful round the
# loop sleeps for int(self.conf.get("evaluator", "interval")) seconds.
# The -/+ pairs below only switch string literals from single to double quotes.
- log.debug('evaluate_forever')
+ log.debug("evaluate_forever")
while True:
try:
self.evaluate()
- time.sleep(int(self.conf.get('evaluator', 'interval')))
+ time.sleep(int(self.conf.get("evaluator", "interval")))
# Any Exception is logged with its traceback and the loop keeps running.
# NOTE(review): the sleep is part of the try body, so when evaluate() raises
# the sleep is skipped and the next round starts immediately.
except Exception:
log.exception("Error evaluating alarms")
def evaluate(self):
# Evaluate all alarms once and send a notification for each result.
#
# New version ("+" lines): fetch (alarm, status) tuples from
# self.service.evaluate_alarms(), start one multiprocessing.Process per tuple
# with self.notify_alarm as target, then join every process with a 10 second
# timeout.
# Old version ("-" lines): submit Evaluator._notify_alarm to a
# concurrent.futures.ProcessPoolExecutor sized by the "process_pool_size"
# config value, wait for results bounded by "process_timeout", and finally
# terminate leftover workers through _stop_process_pool.
- log.debug('evaluate')
+ log.debug("evaluate")
+ log.info("Starting alarm evaluation")
alarms_tuples = self.service.evaluate_alarms()
- # Starting evaluate executor pool with pool size process_pool_size. Default process_pool_size is 20
- with concurrent.futures.ProcessPoolExecutor(self.conf.get('evaluator', 'process_pool_size')) as executor:
- log.info('Started evaluate process pool with pool size %s' % (self.conf.get('evaluator',
- 'process_pool_size')))
- evaluate_futures = []
- for alarm, status in alarms_tuples:
- evaluate_futures.append(executor.submit(Evaluator._notify_alarm, self.conf, alarm, status))
-
- try:
- # Wait for future calls to complete till process_timeout. Default is 50 seconds
- for evaluate_future in concurrent.futures.as_completed(evaluate_futures,
- self.conf.get('evaluator', 'process_timeout')):
- result = evaluate_future.result(timeout=int(self.conf.get('evaluator',
- 'process_timeout')))
- log.debug('result = %s' % (result))
- except concurrent.futures.TimeoutError as e:
- # Some processes have not completed due to timeout error
- log.info('Some processes have not finished due to TimeoutError exception')
- log.debug('concurrent.futures.TimeoutError exception %s' % (e))
-
- # Shutting down process pool executor
- Evaluator._stop_process_pool(executor)
-
# Removed helper: terminated every worker of the executor that was still alive
# (via the private executor._processes mapping) and then shut the executor down.
- @staticmethod
- def _stop_process_pool(executor):
- log.debug("_stop_process_pool")
- log.info('Shutting down process pool')
- try:
- log.debug('Stopping residual processes in the process pool')
- for pid, process in executor._processes.items():
- if process.is_alive():
- process.terminate()
- except Exception as e:
- log.info("Exception during process termination")
- log.debug("Exception %s" % (e))
-
- try:
- # Shutting down executor
- log.debug('Shutting down process pool executor')
- executor.shutdown()
- except RuntimeError as e:
- log.info('RuntimeError in shutting down executer')
- log.debug('RuntimeError %s' % (e))
- return
-
# Removed header of the old static notifier; its replacement is the instance
# method notify_alarm defined after this method.
- @staticmethod
- def _notify_alarm(conf: Config, alarm: Alarm, status: AlarmStatus):
# NOTE(review): one process is started per alarm with no upper bound; the
# removed code limited concurrency through the configured pool size.
# NOTE(review): target is a bound method, so with a start method that pickles
# the target (spawn/forkserver) `self` must be picklable -- confirm which start
# method the deployment uses.
+ processes = []
+ for alarm, status in alarms_tuples:
+ p = multiprocessing.Process(target=self.notify_alarm, args=(alarm, status))
+ p.start()
+ processes.append(p)
# NOTE(review): join(timeout=10) only stops waiting after 10 seconds; it does
# not terminate the child. Nothing here replaces the terminate() calls of the
# removed _stop_process_pool, so a hung notifier stays alive after this method
# returns. The joins run one after another, so the worst-case wait is 10
# seconds per started process, and the timeout is hard-coded instead of being
# read from the "process_timeout" config value used by the removed code.
+ for process in processes:
+ process.join(timeout=10)
+ log.info("Alarm evaluation is complete")
+
+ def notify_alarm(self, alarm: Alarm, status: AlarmStatus):
# Publish a notification for one evaluated alarm and store its new status.
#
# Steps in the new version ("+" and unmarked lines):
#   1. build the response message with self._build_alarm_response,
#   2. run self.msg_bus.aiowrite("alarm_response", "notify_alarm", message)
#      to completion on self.loop,
#   3. create an EvaluatorService from self.conf and call
#      update_alarm_status(status.value, alarm.uuid).
# The removed lines built a fresh MessageBusClient(conf) and fetched the loop
# with asyncio.get_event_loop() on every call; the new code reuses the
# instance attributes instead.
# NOTE(review): the debug text below still says "_notify_alarm" although the
# method is now named notify_alarm.
log.debug("_notify_alarm")
- resp_message = Evaluator._build_alarm_response(alarm, status)
- msg_bus = MessageBusClient(conf)
- loop = asyncio.get_event_loop()
+ resp_message = self._build_alarm_response(alarm, status)
# NOTE(review): "Sent alarm notification" is logged before the write is
# executed, so the message appears even if aiowrite subsequently raises.
log.info("Sent alarm notification: %s", resp_message)
- loop.run_until_complete(msg_bus.aiowrite('alarm_response', 'notify_alarm', resp_message))
+ self.loop.run_until_complete(
+ self.msg_bus.aiowrite("alarm_response", "notify_alarm", resp_message)
+ )
+ evaluator_service = EvaluatorService(self.conf)
+ evaluator_service.update_alarm_status(status.value, alarm.uuid)
return
- @staticmethod
- def _build_alarm_response(alarm: Alarm, status: AlarmStatus):
+ def _build_alarm_response(self, alarm: Alarm, status: AlarmStatus):
# Build the "notify_alarm" response for an alarm and its evaluated status.
# The diff turns the former static method into an instance method; `self` is
# not used in the lines visible here.
# Returns whatever ResponseBuilder.generate_response returns for the alarm's
# uuid, metric, operation and severity, status.value, a timestamp and the tags.
log.debug("_build_alarm_response")
response = ResponseBuilder()
tags = {}
# NOTE(review): `name` and `value` are not defined in the visible lines; the
# loop that fills `tags` was presumably dropped from this excerpt -- confirm
# against the full file.
tags[name] = value
# Timestamp formatted as "<dd-mm-YYYY> <locale time>"; time.strftime without a
# time tuple uses the current local time.
# NOTE(review): the two strftime calls read the clock separately, so the date
# and time parts can disagree if the call straddles a second/midnight boundary.
now = time.strftime("%d-%m-%Y") + " " + time.strftime("%X")
return response.generate_response(
- 'notify_alarm',
+ "notify_alarm",
alarm_id=alarm.uuid,
metric_name=alarm.metric,
operation=alarm.operation,
sev=alarm.severity,
status=status.value,
date=now,
- tags=tags)
+ tags=tags,
+ )