X-Git-Url: https://osm.etsi.org/gitweb/?a=blobdiff_plain;f=osm_mon%2Fevaluator%2Fevaluator.py;h=61b788a0dfb2c5e2954bac01122bcca1c4035254;hb=072b0bdeb1a0919c5cc8de128fa3c62e8393ed7c;hp=d3fdfd50f9051162250bcebb5603e7a1c2ad8f74;hpb=a97bdb3eafa4f3d07d61d32635f7f36f5cc36c58;p=osm%2FMON.git diff --git a/osm_mon/evaluator/evaluator.py b/osm_mon/evaluator/evaluator.py index d3fdfd5..61b788a 100644 --- a/osm_mon/evaluator/evaluator.py +++ b/osm_mon/evaluator/evaluator.py @@ -25,11 +25,9 @@ import logging import multiprocessing import time -import peewee - from osm_mon.core.config import Config -from osm_mon.core.database import Alarm from osm_mon.core.message_bus_client import MessageBusClient +from osm_mon.core.models import Alarm from osm_mon.core.response import ResponseBuilder from osm_mon.evaluator.service import EvaluatorService, AlarmStatus @@ -37,53 +35,59 @@ log = logging.getLogger(__name__) class Evaluator: - - def __init__(self, config: Config, loop=None): + def __init__(self, config: Config): self.conf = config - if not loop: - loop = asyncio.get_event_loop() - self.loop = loop self.service = EvaluatorService(config) self.msg_bus = MessageBusClient(config) def evaluate_forever(self): - log.debug('evaluate_forever') + log.debug("evaluate_forever") while True: try: self.evaluate() - time.sleep(int(self.conf.get('evaluator', 'interval'))) - except peewee.PeeweeException: - log.exception("Database error evaluating alarms: ") - raise + time.sleep(int(self.conf.get("evaluator", "interval"))) except Exception: log.exception("Error evaluating alarms") def evaluate(self): - log.debug('evaluate') + log.debug("evaluate") + log.info("Starting alarm evaluation") alarms_tuples = self.service.evaluate_alarms() + processes = [] for alarm, status in alarms_tuples: - p = multiprocessing.Process(target=self.notify_alarm, - args=(alarm, status)) + p = multiprocessing.Process(target=self.notify_alarm, args=(alarm, status)) p.start() + processes.append(p) + for process in processes: + process.join(timeout=10) + log.info("Alarm evaluation is complete") def notify_alarm(self, alarm: Alarm, status: AlarmStatus): - log.debug("notify_alarm") + log.debug("_notify_alarm") resp_message = self._build_alarm_response(alarm, status) log.info("Sent alarm notification: %s", resp_message) - self.loop.run_until_complete(self.msg_bus.aiowrite('alarm_response', 'notify_alarm', resp_message)) + asyncio.run( + self.msg_bus.aiowrite("alarm_response", "notify_alarm", resp_message) + ) + evaluator_service = EvaluatorService(self.conf) + evaluator_service.update_alarm_status(status.value, alarm.uuid) + return def _build_alarm_response(self, alarm: Alarm, status: AlarmStatus): + log.debug("_build_alarm_response") response = ResponseBuilder() + tags = {} + for name, value in alarm.tags.items(): + tags[name] = value now = time.strftime("%d-%m-%Y") + " " + time.strftime("%X") return response.generate_response( - 'notify_alarm', + "notify_alarm", alarm_id=alarm.uuid, - vdu_name=alarm.vdur_name, - vnf_member_index=alarm.vnf_member_index, - ns_id=alarm.nsr_id, - metric_name=alarm.monitoring_param, + metric_name=alarm.metric, operation=alarm.operation, threshold_value=alarm.threshold, sev=alarm.severity, status=status.value, - date=now) + date=now, + tags=tags, + )