X-Git-Url: https://osm.etsi.org/gitweb/?a=blobdiff_plain;f=osm_mon%2Fcore%2Fmessage_bus%2Fcommon_consumer.py;h=b8e33d233fc61961ff83965453019fa2ffccfd04;hb=93699898c51364cde193d8d441f4aed45670e7bf;hp=fd97f833d354ea05e843b7873b388d5616e26d5f;hpb=e80db311a29dc8562dc84ae3336af167bac2ec5b;p=osm%2FMON.git diff --git a/osm_mon/core/message_bus/common_consumer.py b/osm_mon/core/message_bus/common_consumer.py index fd97f83..b8e33d2 100755 --- a/osm_mon/core/message_bus/common_consumer.py +++ b/osm_mon/core/message_bus/common_consumer.py @@ -23,21 +23,23 @@ import json import logging import sys import threading +from json import JSONDecodeError import six import yaml -from kafka import KafkaConsumer from osm_mon.common.common_db_client import CommonDbClient from osm_mon.core.auth import AuthManager from osm_mon.core.database import DatabaseManager +from osm_mon.core.message_bus.consumer import Consumer +from osm_mon.core.message_bus.producer import Producer from osm_mon.core.settings import Config from osm_mon.plugins.CloudWatch.access_credentials import AccessCredentials from osm_mon.plugins.CloudWatch.connection import Connection from osm_mon.plugins.CloudWatch.plugin_alarm import plugin_alarms from osm_mon.plugins.CloudWatch.plugin_metric import plugin_metrics -from osm_mon.plugins.OpenStack.Aodh import alarming -from osm_mon.plugins.OpenStack.Gnocchi import metrics +from osm_mon.plugins.OpenStack.Aodh import alarm_handler +from osm_mon.plugins.OpenStack.Gnocchi import metric_handler from osm_mon.plugins.vRealiseOps import plugin_receiver cfg = Config.instance() @@ -59,15 +61,13 @@ kafka_logger.addHandler(kafka_handler) class CommonConsumer: def __init__(self): - cfg = Config.instance() - self.auth_manager = AuthManager() self.database_manager = DatabaseManager() self.database_manager.create_tables() # Create OpenStack alarming and metric instances - self.openstack_metrics = metrics.Metrics() - self.openstack_alarms = alarming.Alarming() + self.openstack_metrics = metric_handler.OpenstackMetricHandler() + self.openstack_alarms = alarm_handler.OpenstackAlarmHandler() # Create CloudWatch alarm and metric instances self.cloudwatch_alarms = plugin_alarms() @@ -88,12 +88,7 @@ class CommonConsumer: return credentials.type def run(self): - common_consumer = KafkaConsumer(bootstrap_servers=cfg.BROKER_URI, - key_deserializer=bytes.decode, - value_deserializer=bytes.decode, - group_id="mon-consumer", - session_timeout_ms=60000, - heartbeat_interval_ms=20000) + common_consumer = Consumer("mon-consumer") topics = ['metric_request', 'alarm_request', 'vim_account'] common_consumer.subscribe(topics) @@ -108,9 +103,11 @@ class CommonConsumer: try: try: values = json.loads(message.value) - except ValueError: + except JSONDecodeError: values = yaml.safe_load(message.value) + response = None + if message.topic == "vim_account": if message.key == "create" or message.key == "edit": self.auth_manager.store_auth_credentials(values) @@ -153,31 +150,42 @@ class CommonConsumer: if vim_type == "openstack": log.info("This message is for the OpenStack plugin.") if message.topic == "metric_request": - self.openstack_metrics.metric_calls(message, vim_uuid) + response = self.openstack_metrics.handle_request(message.key, values, vim_uuid) if message.topic == "alarm_request": - self.openstack_alarms.alarming(message, vim_uuid) + response = self.openstack_alarms.handle_message(message.key, values, vim_uuid) elif vim_type == "aws": log.info("This message is for the CloudWatch plugin.") aws_conn = self.aws_connection.setEnvironment() if message.topic == "metric_request": - self.cloudwatch_metrics.metric_calls(message, aws_conn) + response = self.cloudwatch_metrics.metric_calls(message.key, values, aws_conn) if message.topic == "alarm_request": - self.cloudwatch_alarms.alarm_calls(message, aws_conn) - if message.topic == "access_credentials": - self.aws_access_credentials.access_credential_calls(message) + response = self.cloudwatch_alarms.alarm_calls(message.key, values, aws_conn) elif vim_type == "vmware": log.info("This metric_request message is for the vROPs plugin.") - self.vrops_rcvr.consume(message, vim_uuid) + if message.topic == "metric_request": + response = self.vrops_rcvr.handle_metric_requests(message.key, values, vim_uuid) + if message.topic == "alarm_request": + response = self.vrops_rcvr.handle_alarm_requests(message.key, values, vim_uuid) else: log.debug("vim_type is misconfigured or unsupported; %s", vim_type) + if response: + self._publish_response(message.topic, message.key, response) except Exception: log.exception("Exception processing message: ") + def _publish_response(self, topic: str, key: str, msg: dict): + topic = topic.replace('request', 'response') + key = key.replace('request', 'response') + producer = Producer() + producer.send(topic=topic, key=key, value=json.dumps(msg)) + producer.flush() + producer.close() + if __name__ == '__main__': CommonConsumer().run()