X-Git-Url: https://osm.etsi.org/gitweb/?a=blobdiff_plain;f=osm_mon%2Fcore%2Fmessage_bus%2Fcommon_consumer.py;h=df3133dabf32c0bac53608b3557e32a59a18ade8;hb=cf507461d5f0d7e0121ef734d4ff70d40d6c61ec;hp=97e37fae2ec2b1d35eaed7bee043280d7eb3cb57;hpb=821a62e1e29bb603de56b028d92ad885f06fd68c;p=osm%2FMON.git diff --git a/osm_mon/core/message_bus/common_consumer.py b/osm_mon/core/message_bus/common_consumer.py index 97e37fa..df3133d 100755 --- a/osm_mon/core/message_bus/common_consumer.py +++ b/osm_mon/core/message_bus/common_consumer.py @@ -21,158 +21,170 @@ import json import logging -import os import sys -import yaml +import threading +import six import yaml -import logstash +from kafka import KafkaConsumer +from osm_common import dbmongo + +from osm_mon.core.auth import AuthManager +from osm_mon.core.database import DatabaseManager +from osm_mon.core.settings import Config +from osm_mon.plugins.CloudWatch.access_credentials import AccessCredentials +from osm_mon.plugins.CloudWatch.connection import Connection +from osm_mon.plugins.CloudWatch.plugin_alarm import plugin_alarms +from osm_mon.plugins.CloudWatch.plugin_metric import plugin_metrics +from osm_mon.plugins.OpenStack.Aodh import alarming +from osm_mon.plugins.OpenStack.Gnocchi import metrics +from osm_mon.plugins.vRealiseOps import plugin_receiver logging.basicConfig(stream=sys.stdout, format='%(asctime)s %(message)s', datefmt='%m/%d/%Y %I:%M:%S %p', level=logging.INFO) log = logging.getLogger(__name__) -log.addHandler(logstash.TCPLogstashHandler('dockerelk_logstash_1', 5000, version=1)) -sys.path.append(os.path.abspath(os.path.join(os.path.realpath(__file__), '..', '..', '..', '..'))) +class CommonConsumer: -from kafka import KafkaConsumer + def __init__(self): + cfg = Config.instance() -from osm_mon.plugins.OpenStack.Aodh import alarming -from osm_mon.plugins.OpenStack.Gnocchi import metrics + self.auth_manager = AuthManager() + self.database_manager = DatabaseManager() + self.database_manager.create_tables() -from osm_mon.plugins.CloudWatch.plugin_alarm import plugin_alarms -from osm_mon.plugins.CloudWatch.plugin_metric import plugin_metrics -from osm_mon.plugins.CloudWatch.connection import Connection -from osm_mon.plugins.CloudWatch.access_credentials import AccessCredentials + # Create OpenStack alarming and metric instances + self.openstack_metrics = metrics.Metrics() + self.openstack_alarms = alarming.Alarming() -from osm_mon.plugins.vRealiseOps import plugin_receiver + # Create CloudWatch alarm and metric instances + self.cloudwatch_alarms = plugin_alarms() + self.cloudwatch_metrics = plugin_metrics() + self.aws_connection = Connection() + self.aws_access_credentials = AccessCredentials() -from osm_mon.core.auth import AuthManager -from osm_mon.core.database import DatabaseManager - -# Initialize servers -if "BROKER_URI" in os.environ: - server = {'server': os.getenv("BROKER_URI")} -else: - server = {'server': 'localhost:9092'} - -# Initialize consumers for alarms and metrics -common_consumer = KafkaConsumer(bootstrap_servers=server['server'], - key_deserializer=bytes.decode, - value_deserializer=bytes.decode, - group_id="mon-consumer") - -auth_manager = AuthManager() -database_manager = DatabaseManager() -database_manager.create_tables() - -# Create OpenStack alarming and metric instances -openstack_metrics = metrics.Metrics() -openstack_alarms = alarming.Alarming() + # Create vROps plugin_receiver class instance + self.vrops_rcvr = plugin_receiver.PluginReceiver() -# Create CloudWatch alarm and metric instances -cloudwatch_alarms = plugin_alarms() -cloudwatch_metrics = plugin_metrics() -aws_connection = Connection() -aws_access_credentials = AccessCredentials() + log.info("Connecting to MongoDB...") + self.common_db = dbmongo.DbMongo() + common_db_uri = cfg.MONGO_URI.split(':') + self.common_db.db_connect({'host': common_db_uri[0], 'port': int(common_db_uri[1]), 'name': 'osm'}) + log.info("Connection successful.") -# Create vROps plugin_receiver class instance -vrops_rcvr = plugin_receiver.PluginReceiver() + # Initialize consumers for alarms and metrics + self.common_consumer = KafkaConsumer(bootstrap_servers=cfg.BROKER_URI, + key_deserializer=bytes.decode, + value_deserializer=bytes.decode, + group_id="mon-consumer") + # Define subscribe the consumer for the plugins + topics = ['metric_request', 'alarm_request', 'access_credentials', 'vim_account'] + # TODO: Remove access_credentials + self.common_consumer.subscribe(topics) -def get_vim_type(vim_uuid): - """Get the vim type that is required by the message.""" - try: - credentials = database_manager.get_credentials(vim_uuid) + def get_vim_type(self, vim_uuid): + """Get the vim type that is required by the message.""" + credentials = self.database_manager.get_credentials(vim_uuid) return credentials.type - except Exception as exc: - log.exception("Error getting vim_type: ") - return None - -# Define subscribe the consumer for the plugins -topics = ['metric_request', 'alarm_request', 'access_credentials', 'vim_account'] -# TODO: Remove access_credentials -common_consumer.subscribe(topics) - -log.info("Listening for alarm_request and metric_request messages") -for message in common_consumer: - log.info("Message arrived: %s", message) - try: + def get_vdur(self, nsr_id, member_index, vdu_name): + vnfr = self.get_vnfr(nsr_id, member_index) + for vdur in vnfr['vdur']: + if vdur['name'] == vdu_name: + return vdur + raise ValueError('vdur not found for nsr-id %s, member_index %s and vdu_name %s', nsr_id, member_index, + vdu_name) + + def get_vnfr(self, nsr_id, member_index): + vnfr = self.common_db.get_one("vnfrs", + {"nsr-id-ref": nsr_id, "member-vnf-index-ref": str(member_index)}) + return vnfr + + def run(self): + log.info("Listening for messages...") + for message in self.common_consumer: + t = threading.Thread(target=self.consume_message, args=(message,)) + t.start() + + def consume_message(self, message): + log.info("Message arrived: %s", message) try: - values = json.loads(message.value) - except ValueError: - values = yaml.safe_load(message.value) - # Check the message topic - if message.topic == "metric_request": - # Check the vim desired by the message - vim_type = get_vim_type(values['vim_uuid']) - - if vim_type == "openstack": - log.info("This message is for the OpenStack plugin.") - openstack_metrics.metric_calls(message) - elif vim_type == "aws": - log.info("This message is for the CloudWatch plugin.") - aws_conn = aws_connection.setEnvironment() - cloudwatch_metrics.metric_calls(message, aws_conn) - - elif vim_type == "vmware": - log.info("This metric_request message is for the vROPs plugin.") - vrops_rcvr.consume(message) - - else: - log.debug("vim_type is misconfigured or unsupported; %s", - vim_type) - - elif message.topic == "alarm_request": - # Check the vim desired by the message - vim_type = get_vim_type(values['vim_uuid']) - if vim_type == "openstack": - log.info("This message is for the OpenStack plugin.") - openstack_alarms.alarming(message) + try: + values = json.loads(message.value) + except ValueError: + values = yaml.safe_load(message.value) - elif vim_type == "aws": - log.info("This message is for the CloudWatch plugin.") - aws_conn = aws_connection.setEnvironment() - cloudwatch_alarms.alarm_calls(message, aws_conn) - - elif vim_type == "vmware": - log.info("This alarm_request message is for the vROPs plugin.") - vrops_rcvr.consume(message) - - else: - log.debug("vim_type is misconfigured or unsupported; %s", - vim_type) - - elif message.topic == "vim_account": - if message.key == "create" or message.key == "edit": - auth_manager.store_auth_credentials(values) - if message.key == "delete": - auth_manager.delete_auth_credentials(values) - - # TODO: Remove in the near future when all plugins support vim_uuid. Modify tests accordingly. - elif message.topic == "access_credentials": - # Check the vim desired by the message - vim_type = get_vim_type(values['vim_uuid']) - - if vim_type == "aws": - log.info("This message is for the CloudWatch plugin.") - aws_access_credentials.access_credential_calls(message) - - elif vim_type == "vmware": - log.info("This access_credentials message is for the vROPs plugin.") - vrops_rcvr.consume(message) + if message.topic == "vim_account": + if message.key == "create" or message.key == "edit": + self.auth_manager.store_auth_credentials(values) + if message.key == "delete": + self.auth_manager.delete_auth_credentials(values) else: - log.debug("vim_type is misconfigured or unsupported; %s", - vim_type) - - else: - log.info("This topic is not relevant to any of the MON plugins.") - - - except Exception as exc: - log.exception("Exception: %s") + # Get ns_id from message + # TODO: Standardize all message models to avoid the need of figuring out where are certain fields + contains_list = False + list_index = None + for k, v in six.iteritems(values): + if isinstance(v, dict): + if 'ns_id' in v: + contains_list = True + list_index = k + break + if not contains_list and 'ns_id' in values: + ns_id = values['ns_id'] + else: + ns_id = values[list_index]['ns_id'] + + vnf_index = values[list_index]['vnf_member_index'] if contains_list else values['vnf_member_index'] + + # Check the vim desired by the message + vnfr = self.get_vnfr(ns_id, vnf_index) + vim_uuid = vnfr['vim-account-id'] + + if (contains_list and 'vdu_name' in values[list_index]) or 'vdu_name' in values: + vdu_name = values[list_index]['vdu_name'] if contains_list else values['vdu_name'] + vdur = self.get_vdur(ns_id, vnf_index, vdu_name) + if contains_list: + values[list_index]['resource_uuid'] = vdur['vim-id'] + else: + values['resource_uuid'] = vdur['vim-id'] + message = message._replace(value=json.dumps(values)) + + vim_type = self.get_vim_type(vim_uuid) + + if vim_type == "openstack": + log.info("This message is for the OpenStack plugin.") + if message.topic == "metric_request": + self.openstack_metrics.metric_calls(message, vim_uuid) + if message.topic == "alarm_request": + self.openstack_alarms.alarming(message, vim_uuid) + + elif vim_type == "aws": + log.info("This message is for the CloudWatch plugin.") + aws_conn = self.aws_connection.setEnvironment() + if message.topic == "metric_request": + self.cloudwatch_metrics.metric_calls(message, aws_conn) + if message.topic == "alarm_request": + self.cloudwatch_alarms.alarm_calls(message, aws_conn) + if message.topic == "access_credentials": + self.aws_access_credentials.access_credential_calls(message) + + elif vim_type == "vmware": + log.info("This metric_request message is for the vROPs plugin.") + self.vrops_rcvr.consume(message,vim_uuid) + + else: + log.debug("vim_type is misconfigured or unsupported; %s", + vim_type) + + except Exception: + log.exception("Exception processing message: ") + + +if __name__ == '__main__': + CommonConsumer().run()