X-Git-Url: https://osm.etsi.org/gitweb/?a=blobdiff_plain;f=osm_mon%2Fcore%2Fmessage_bus%2Fcommon_consumer.py;h=b3ac8ae156bf0f1efc3b4b1f9776a8e9cbc4e449;hb=b5b7819197730f5000d90a60ed13b32ba4e18fad;hp=dc5816e9b19ff505d990d018d1c1fc0ee4e9810c;hpb=ffe5a8e4e35d3fc7860eb9231618dc0e84e448b7;p=osm%2FMON.git diff --git a/osm_mon/core/message_bus/common_consumer.py b/osm_mon/core/message_bus/common_consumer.py index dc5816e..b3ac8ae 100755 --- a/osm_mon/core/message_bus/common_consumer.py +++ b/osm_mon/core/message_bus/common_consumer.py @@ -22,92 +22,97 @@ import json import logging import sys +import threading import six import yaml - from kafka import KafkaConsumer +from osm_common import dbmongo +from osm_mon.core.auth import AuthManager +from osm_mon.core.database import DatabaseManager from osm_mon.core.settings import Config -from osm_mon.plugins.OpenStack.Aodh import alarming -from osm_mon.plugins.OpenStack.Gnocchi import metrics - +from osm_mon.plugins.CloudWatch.access_credentials import AccessCredentials +from osm_mon.plugins.CloudWatch.connection import Connection from osm_mon.plugins.CloudWatch.plugin_alarm import plugin_alarms from osm_mon.plugins.CloudWatch.plugin_metric import plugin_metrics -from osm_mon.plugins.CloudWatch.connection import Connection -from osm_mon.plugins.CloudWatch.access_credentials import AccessCredentials - +from osm_mon.plugins.OpenStack.Aodh import alarming +from osm_mon.plugins.OpenStack.Gnocchi import metrics from osm_mon.plugins.vRealiseOps import plugin_receiver -from osm_mon.core.auth import AuthManager -from osm_mon.core.database import DatabaseManager - -from osm_common import dbmongo +cfg = Config.instance() logging.basicConfig(stream=sys.stdout, - format='%(asctime)s %(message)s', + format='%(asctime)s - %(name)s - %(levelname)s - %(message)s', datefmt='%m/%d/%Y %I:%M:%S %p', - level=logging.INFO) + level=logging.getLevelName(cfg.OSMMON_LOG_LEVEL)) log = logging.getLogger(__name__) -def get_vim_type(db_manager, vim_uuid): - """Get the vim type that is required by the message.""" - credentials = db_manager.get_credentials(vim_uuid) - return credentials.type - - -def get_vdur(common_db, nsr_id, member_index, vdu_name): - vnfr = get_vnfr(common_db, nsr_id, member_index) - for vdur in vnfr['vdur']: - if vdur['vdu-id-ref'] == vdu_name: - return vdur - raise ValueError('vdur not found for nsr-id %s, member_index %s and vdu_name %s', nsr_id, member_index, vdu_name) - - -def get_vnfr(common_db, nsr_id, member_index): - vnfr = common_db.get_one(table="vnfrs", filter={"nsr-id-ref": nsr_id, "member-vnf-index-ref": str(member_index)}) - return vnfr - - -def main(): - cfg = Config.instance() - cfg.read_environ() - - auth_manager = AuthManager() - database_manager = DatabaseManager() - database_manager.create_tables() - - # Create OpenStack alarming and metric instances - openstack_metrics = metrics.Metrics() - openstack_alarms = alarming.Alarming() - - # Create CloudWatch alarm and metric instances - cloudwatch_alarms = plugin_alarms() - cloudwatch_metrics = plugin_metrics() - aws_connection = Connection() - aws_access_credentials = AccessCredentials() - - # Create vROps plugin_receiver class instance - vrops_rcvr = plugin_receiver.PluginReceiver() - - common_db = dbmongo.DbMongo() - common_db_uri = cfg.MONGO_URI.split(':') - common_db.db_connect({'host': common_db_uri[0], 'port': int(common_db_uri[1]), 'name': 'osm'}) - - # Initialize consumers for alarms and metrics - common_consumer = KafkaConsumer(bootstrap_servers=cfg.BROKER_URI, - key_deserializer=bytes.decode, - value_deserializer=bytes.decode, - group_id="mon-consumer") - - # Define subscribe the consumer for the plugins - topics = ['metric_request', 'alarm_request', 'access_credentials', 'vim_account'] - # TODO: Remove access_credentials - common_consumer.subscribe(topics) - - log.info("Listening for alarm_request and metric_request messages") - for message in common_consumer: +class CommonConsumer: + + def __init__(self): + cfg = Config.instance() + + self.auth_manager = AuthManager() + self.database_manager = DatabaseManager() + self.database_manager.create_tables() + + # Create OpenStack alarming and metric instances + self.openstack_metrics = metrics.Metrics() + self.openstack_alarms = alarming.Alarming() + + # Create CloudWatch alarm and metric instances + self.cloudwatch_alarms = plugin_alarms() + self.cloudwatch_metrics = plugin_metrics() + self.aws_connection = Connection() + self.aws_access_credentials = AccessCredentials() + + # Create vROps plugin_receiver class instance + self.vrops_rcvr = plugin_receiver.PluginReceiver() + + log.info("Connecting to MongoDB...") + self.common_db = dbmongo.DbMongo() + common_db_uri = cfg.MONGO_URI.split(':') + self.common_db.db_connect({'host': common_db_uri[0], 'port': int(common_db_uri[1]), 'name': 'osm'}) + log.info("Connection successful.") + + # Initialize consumers for alarms and metrics + self.common_consumer = KafkaConsumer(bootstrap_servers=cfg.BROKER_URI, + key_deserializer=bytes.decode, + value_deserializer=bytes.decode, + group_id="mon-consumer") + + # Define subscribe the consumer for the plugins + topics = ['metric_request', 'alarm_request', 'access_credentials', 'vim_account'] + # TODO: Remove access_credentials + self.common_consumer.subscribe(topics) + + def get_vim_type(self, vim_uuid): + """Get the vim type that is required by the message.""" + credentials = self.database_manager.get_credentials(vim_uuid) + return credentials.type + + def get_vdur(self, nsr_id, member_index, vdu_name): + vnfr = self.get_vnfr(nsr_id, member_index) + for vdur in vnfr['vdur']: + if vdur['name'] == vdu_name: + return vdur + raise ValueError('vdur not found for nsr-id %s, member_index %s and vdu_name %s', nsr_id, member_index, + vdu_name) + + def get_vnfr(self, nsr_id, member_index): + vnfr = self.common_db.get_one("vnfrs", + {"nsr-id-ref": nsr_id, "member-vnf-index-ref": str(member_index)}) + return vnfr + + def run(self): + log.info("Listening for messages...") + for message in self.common_consumer: + t = threading.Thread(target=self.consume_message, args=(message,)) + t.start() + + def consume_message(self, message): log.info("Message arrived: %s", message) try: try: @@ -117,9 +122,9 @@ def main(): if message.topic == "vim_account": if message.key == "create" or message.key == "edit": - auth_manager.store_auth_credentials(values) + self.auth_manager.store_auth_credentials(values) if message.key == "delete": - auth_manager.delete_auth_credentials(values) + self.auth_manager.delete_auth_credentials(values) else: # Get ns_id from message @@ -140,39 +145,40 @@ def main(): vnf_index = values[list_index]['vnf_member_index'] if contains_list else values['vnf_member_index'] # Check the vim desired by the message - vnfr = get_vnfr(common_db, ns_id, vnf_index) + vnfr = self.get_vnfr(ns_id, vnf_index) vim_uuid = vnfr['vim-account-id'] - vim_type = get_vim_type(database_manager, vim_uuid) if (contains_list and 'vdu_name' in values[list_index]) or 'vdu_name' in values: vdu_name = values[list_index]['vdu_name'] if contains_list else values['vdu_name'] - vdur = get_vdur(common_db, ns_id, vnf_index, vdu_name) + vdur = self.get_vdur(ns_id, vnf_index, vdu_name) if contains_list: values[list_index]['resource_uuid'] = vdur['vim-id'] else: values['resource_uuid'] = vdur['vim-id'] message = message._replace(value=json.dumps(values)) + vim_type = self.get_vim_type(vim_uuid) + if vim_type == "openstack": log.info("This message is for the OpenStack plugin.") if message.topic == "metric_request": - openstack_metrics.metric_calls(message, vim_uuid) + self.openstack_metrics.metric_calls(message, vim_uuid) if message.topic == "alarm_request": - openstack_alarms.alarming(message, vim_uuid) + self.openstack_alarms.alarming(message, vim_uuid) elif vim_type == "aws": log.info("This message is for the CloudWatch plugin.") - aws_conn = aws_connection.setEnvironment() + aws_conn = self.aws_connection.setEnvironment() if message.topic == "metric_request": - cloudwatch_metrics.metric_calls(message, aws_conn) + self.cloudwatch_metrics.metric_calls(message, aws_conn) if message.topic == "alarm_request": - cloudwatch_alarms.alarm_calls(message, aws_conn) + self.cloudwatch_alarms.alarm_calls(message, aws_conn) if message.topic == "access_credentials": - aws_access_credentials.access_credential_calls(message) + self.aws_access_credentials.access_credential_calls(message) elif vim_type == "vmware": log.info("This metric_request message is for the vROPs plugin.") - vrops_rcvr.consume(message) + self.vrops_rcvr.consume(message,vim_uuid) else: log.debug("vim_type is misconfigured or unsupported; %s", @@ -183,4 +189,4 @@ def main(): if __name__ == '__main__': - main() + CommonConsumer().run()