X-Git-Url: https://osm.etsi.org/gitweb/?a=blobdiff_plain;f=osm_mon%2Fplugins%2FOpenStack%2FAodh%2Fnotifier.py;h=698784a46b35f916ab25d9cf0034bcf6d19df0e7;hb=6eda48bf7489c00fee4e1f276404ab82db23fd1d;hp=92674cb80800e0aa6240e9e4db1305633e7eaa91;hpb=326907a0151bed5641a9e9e241bc3b05bf0b71b9;p=osm%2FMON.git diff --git a/osm_mon/plugins/OpenStack/Aodh/notifier.py b/osm_mon/plugins/OpenStack/Aodh/notifier.py index 92674cb..698784a 100644 --- a/osm_mon/plugins/OpenStack/Aodh/notifier.py +++ b/osm_mon/plugins/OpenStack/Aodh/notifier.py @@ -25,6 +25,7 @@ import json import logging import os +import re import sys import time @@ -32,19 +33,29 @@ from six.moves.BaseHTTPServer import BaseHTTPRequestHandler from six.moves.BaseHTTPServer import HTTPServer # Initialise a logger for alarm notifier +from osm_mon.core.message_bus.producer import Producer +from osm_mon.core.settings import Config + +cfg = Config.instance() logging.basicConfig(stream=sys.stdout, - format='%(asctime)s %(message)s', + format='%(asctime)s - %(name)s - %(levelname)s - %(message)s', datefmt='%m/%d/%Y %I:%M:%S %p', - level=logging.INFO) + level=logging.getLevelName(cfg.OSMMON_LOG_LEVEL)) log = logging.getLogger(__name__) +kafka_logger = logging.getLogger('kafka') +kafka_logger.setLevel(logging.getLevelName(cfg.OSMMON_KAFKA_LOG_LEVEL)) +kafka_formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s') +kafka_handler = logging.StreamHandler(sys.stdout) +kafka_handler.setFormatter(kafka_formatter) +kafka_logger.addHandler(kafka_handler) + sys.path.append(os.path.abspath(os.path.join(os.path.realpath(__file__), '..', '..', '..', '..', '..'))) from osm_mon.core.database import DatabaseManager -from osm_mon.core.message_bus.producer import KafkaProducer -from osm_mon.plugins.OpenStack.response import OpenStack_Response +from osm_mon.plugins.OpenStack.response import OpenStackResponseBuilder class NotifierHandler(BaseHTTPRequestHandler): @@ -72,11 +83,11 @@ class NotifierHandler(BaseHTTPRequestHandler): post_data = post_data.decode() except AttributeError: pass - log.info("This alarm was triggered: %s", json.dumps(post_data)) + log.info("This alarm was triggered: %s", post_data) # Send alarm notification to message bus try: - self.notify_alarm(json.dumps(post_data)) + self.notify_alarm(json.loads(post_data)) except Exception: log.exception("Error notifying alarm") @@ -84,8 +95,7 @@ class NotifierHandler(BaseHTTPRequestHandler): """Sends alarm notification message to bus.""" # Initialise configuration and authentication for response message - response = OpenStack_Response() - producer = KafkaProducer('alarm_response') + response = OpenStackResponseBuilder() database_manager = DatabaseManager() @@ -97,27 +107,30 @@ class NotifierHandler(BaseHTTPRequestHandler): # Generate and send response resp_message = response.generate_response( 'notify_alarm', - a_id=alarm_id, - vdu_name=alarm.vdu_name, + alarm_id=alarm_id, + vdu_name=alarm.vdur_name, vnf_member_index=alarm.vnf_member_index, - ns_id=alarm.ns_id, - metric_name=alarm.metric_name, + ns_id=alarm.nsr_id, + metric_name=alarm.monitoring_param, operation=alarm.operation, threshold_value=alarm.threshold, sev=values['severity'], date=a_date, state=values['current']) - producer.publish_alarm_response( - 'notify_alarm', resp_message) + self._publish_response('notify_alarm', json.dumps(resp_message)) log.info("Sent alarm notification: %s", resp_message) + def _publish_response(self, key: str, msg: str): + producer = Producer() + producer.send(topic='alarm_response', key=key, value=msg) + producer.flush() + def run(server_class=HTTPServer, handler_class=NotifierHandler, port=8662): """Run the webserver application to retrieve alarm notifications.""" try: server_address = ('', port) httpd = server_class(server_address, handler_class) - print('Starting alarm notifier...') log.info("Starting alarm notifier server on port: %s", port) httpd.serve_forever() except Exception as exc: @@ -125,10 +138,11 @@ def run(server_class=HTTPServer, handler_class=NotifierHandler, port=8662): if __name__ == "__main__": - from sys import argv - - # Runs the webserver - if len(argv) == 2: - run(port=int(argv[1])) + cfg = Config.instance() + p = re.compile(':(\d+)', re.IGNORECASE) + m = p.search(cfg.OS_NOTIFIER_URI) + if m: + port = m.group(1) + run(port=int(port)) else: run()