X-Git-Url: https://osm.etsi.org/gitweb/?a=blobdiff_plain;f=osm_mon%2Fcore%2Fmessage_bus%2Fcommon_consumer.py;h=e32fa2bba7e691977598cec7e7d6a52e666a61ef;hb=27784a805d77f11d049e9a16704e6977e6967e85;hp=3af2d4abd61654499f078005af6b23819561997d;hpb=59c6f793318bb42f2b311b37ef9c6fc3ceebe36f;p=osm%2FMON.git diff --git a/osm_mon/core/message_bus/common_consumer.py b/osm_mon/core/message_bus/common_consumer.py index 3af2d4a..e32fa2b 100755 --- a/osm_mon/core/message_bus/common_consumer.py +++ b/osm_mon/core/message_bus/common_consumer.py @@ -22,22 +22,24 @@ import json import logging import sys -import threading +import time +from json import JSONDecodeError import six import yaml -from kafka import KafkaConsumer -from osm_common import dbmongo +from osm_mon.common.common_db_client import CommonDbClient from osm_mon.core.auth import AuthManager from osm_mon.core.database import DatabaseManager +from osm_mon.core.message_bus.consumer import Consumer +from osm_mon.core.message_bus.producer import Producer from osm_mon.core.settings import Config from osm_mon.plugins.CloudWatch.access_credentials import AccessCredentials from osm_mon.plugins.CloudWatch.connection import Connection from osm_mon.plugins.CloudWatch.plugin_alarm import plugin_alarms from osm_mon.plugins.CloudWatch.plugin_metric import plugin_metrics -from osm_mon.plugins.OpenStack.Aodh import alarming -from osm_mon.plugins.OpenStack.Gnocchi import metrics +from osm_mon.plugins.OpenStack.Aodh import alarm_handler +from osm_mon.plugins.OpenStack.Gnocchi import metric_handler from osm_mon.plugins.vRealiseOps import plugin_receiver cfg = Config.instance() @@ -50,24 +52,18 @@ log = logging.getLogger(__name__) kafka_logger = logging.getLogger('kafka') kafka_logger.setLevel(logging.getLevelName(cfg.OSMMON_KAFKA_LOG_LEVEL)) -kafka_formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s') -kafka_handler = logging.StreamHandler(sys.stdout) -kafka_handler.setFormatter(kafka_formatter) -kafka_logger.addHandler(kafka_handler) class CommonConsumer: def __init__(self): - cfg = Config.instance() - self.auth_manager = AuthManager() self.database_manager = DatabaseManager() self.database_manager.create_tables() # Create OpenStack alarming and metric instances - self.openstack_metrics = metrics.Metrics() - self.openstack_alarms = alarming.Alarming() + self.openstack_metrics = metric_handler.OpenstackMetricHandler() + self.openstack_alarms = alarm_handler.OpenstackAlarmHandler() # Create CloudWatch alarm and metric instances self.cloudwatch_alarms = plugin_alarms() @@ -79,9 +75,7 @@ class CommonConsumer: self.vrops_rcvr = plugin_receiver.PluginReceiver() log.info("Connecting to MongoDB...") - self.common_db = dbmongo.DbMongo() - common_db_uri = cfg.MONGO_URI.split(':') - self.common_db.db_connect({'host': common_db_uri[0], 'port': int(common_db_uri[1]), 'name': 'osm'}) + self.common_db = CommonDbClient() log.info("Connection successful.") def get_vim_type(self, vim_uuid): @@ -89,43 +83,42 @@ class CommonConsumer: credentials = self.database_manager.get_credentials(vim_uuid) return credentials.type - def get_vdur(self, nsr_id, member_index, vdu_name): - vnfr = self.get_vnfr(nsr_id, member_index) - for vdur in vnfr['vdur']: - if vdur['name'] == vdu_name: - return vdur - raise ValueError('vdur not found for nsr-id %s, member_index %s and vdu_name %s', nsr_id, member_index, - vdu_name) - - def get_vnfr(self, nsr_id, member_index): - vnfr = self.common_db.get_one("vnfrs", - {"nsr-id-ref": nsr_id, "member-vnf-index-ref": str(member_index)}) - return vnfr - def run(self): - common_consumer = KafkaConsumer(bootstrap_servers=cfg.BROKER_URI, - key_deserializer=bytes.decode, - value_deserializer=bytes.decode, - group_id="mon-consumer", - session_timeout_ms=60000, - heartbeat_interval_ms=20000) + common_consumer = Consumer("mon-consumer") topics = ['metric_request', 'alarm_request', 'vim_account'] common_consumer.subscribe(topics) + retries = 1 + max_retries = 5 + while True: + try: + common_consumer.poll() + common_consumer.seek_to_end() + break + except Exception: + log.error("Error getting Kafka partitions. Maybe Kafka is not ready yet.") + log.error("Retry number %d of %d", retries, max_retries) + if retries >= max_retries: + log.error("Achieved max number of retries. Logging exception and exiting...") + log.exception("Exception: ") + return + retries = retries + 1 + time.sleep(2) log.info("Listening for messages...") for message in common_consumer: - t = threading.Thread(target=self.consume_message, args=(message,)) - t.start() + self.consume_message(message) def consume_message(self, message): log.info("Message arrived: %s", message) try: try: values = json.loads(message.value) - except ValueError: + except JSONDecodeError: values = yaml.safe_load(message.value) + response = None + if message.topic == "vim_account": if message.key == "create" or message.key == "edit": self.auth_manager.store_auth_credentials(values) @@ -151,12 +144,12 @@ class CommonConsumer: vnf_index = values[list_index]['vnf_member_index'] if contains_list else values['vnf_member_index'] # Check the vim desired by the message - vnfr = self.get_vnfr(ns_id, vnf_index) + vnfr = self.common_db.get_vnfr(ns_id, vnf_index) vim_uuid = vnfr['vim-account-id'] if (contains_list and 'vdu_name' in values[list_index]) or 'vdu_name' in values: vdu_name = values[list_index]['vdu_name'] if contains_list else values['vdu_name'] - vdur = self.get_vdur(ns_id, vnf_index, vdu_name) + vdur = self.common_db.get_vdur(ns_id, vnf_index, vdu_name) if contains_list: values[list_index]['resource_uuid'] = vdur['vim-id'] else: @@ -168,31 +161,42 @@ class CommonConsumer: if vim_type == "openstack": log.info("This message is for the OpenStack plugin.") if message.topic == "metric_request": - self.openstack_metrics.metric_calls(message, vim_uuid) + response = self.openstack_metrics.handle_request(message.key, values, vim_uuid) if message.topic == "alarm_request": - self.openstack_alarms.alarming(message, vim_uuid) + response = self.openstack_alarms.handle_message(message.key, values, vim_uuid) elif vim_type == "aws": log.info("This message is for the CloudWatch plugin.") aws_conn = self.aws_connection.setEnvironment() if message.topic == "metric_request": - self.cloudwatch_metrics.metric_calls(message, aws_conn) + response = self.cloudwatch_metrics.metric_calls(message.key, values, aws_conn) if message.topic == "alarm_request": - self.cloudwatch_alarms.alarm_calls(message, aws_conn) - if message.topic == "access_credentials": - self.aws_access_credentials.access_credential_calls(message) + response = self.cloudwatch_alarms.alarm_calls(message.key, values, aws_conn) elif vim_type == "vmware": log.info("This metric_request message is for the vROPs plugin.") - self.vrops_rcvr.consume(message,vim_uuid) + if message.topic == "metric_request": + response = self.vrops_rcvr.handle_metric_requests(message.key, values, vim_uuid) + if message.topic == "alarm_request": + response = self.vrops_rcvr.handle_alarm_requests(message.key, values, vim_uuid) else: log.debug("vim_type is misconfigured or unsupported; %s", vim_type) + if response: + self._publish_response(message.topic, message.key, response) except Exception: log.exception("Exception processing message: ") + def _publish_response(self, topic: str, key: str, msg: dict): + topic = topic.replace('request', 'response') + key = key.replace('request', 'response') + producer = Producer() + producer.send(topic=topic, key=key, value=json.dumps(msg)) + producer.flush(timeout=5) + producer.close() + if __name__ == '__main__': CommonConsumer().run()