X-Git-Url: https://osm.etsi.org/gitweb/?a=blobdiff_plain;f=osm_mon%2Fcore%2Fmessage_bus%2Fcommon_consumer.py;h=3af2d4abd61654499f078005af6b23819561997d;hb=59c6f793318bb42f2b311b37ef9c6fc3ceebe36f;hp=60ad313b65e30f14fe669866cede7da5739f538f;hpb=326907a0151bed5641a9e9e241bc3b05bf0b71b9;p=osm%2FMON.git diff --git a/osm_mon/core/message_bus/common_consumer.py b/osm_mon/core/message_bus/common_consumer.py index 60ad313..3af2d4a 100755 --- a/osm_mon/core/message_bus/common_consumer.py +++ b/osm_mon/core/message_bus/common_consumer.py @@ -40,12 +40,21 @@ from osm_mon.plugins.OpenStack.Aodh import alarming from osm_mon.plugins.OpenStack.Gnocchi import metrics from osm_mon.plugins.vRealiseOps import plugin_receiver +cfg = Config.instance() + logging.basicConfig(stream=sys.stdout, - format='%(asctime)s %(message)s', + format='%(asctime)s - %(name)s - %(levelname)s - %(message)s', datefmt='%m/%d/%Y %I:%M:%S %p', - level=logging.INFO) + level=logging.getLevelName(cfg.OSMMON_LOG_LEVEL)) log = logging.getLogger(__name__) +kafka_logger = logging.getLogger('kafka') +kafka_logger.setLevel(logging.getLevelName(cfg.OSMMON_KAFKA_LOG_LEVEL)) +kafka_formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s') +kafka_handler = logging.StreamHandler(sys.stdout) +kafka_handler.setFormatter(kafka_formatter) +kafka_logger.addHandler(kafka_handler) + class CommonConsumer: @@ -75,17 +84,6 @@ class CommonConsumer: self.common_db.db_connect({'host': common_db_uri[0], 'port': int(common_db_uri[1]), 'name': 'osm'}) log.info("Connection successful.") - # Initialize consumers for alarms and metrics - self.common_consumer = KafkaConsumer(bootstrap_servers=cfg.BROKER_URI, - key_deserializer=bytes.decode, - value_deserializer=bytes.decode, - group_id="mon-consumer") - - # Define subscribe the consumer for the plugins - topics = ['metric_request', 'alarm_request', 'access_credentials', 'vim_account'] - # TODO: Remove access_credentials - self.common_consumer.subscribe(topics) - def get_vim_type(self, vim_uuid): """Get the vim type that is required by the message.""" credentials = self.database_manager.get_credentials(vim_uuid) @@ -94,19 +92,29 @@ class CommonConsumer: def get_vdur(self, nsr_id, member_index, vdu_name): vnfr = self.get_vnfr(nsr_id, member_index) for vdur in vnfr['vdur']: - if vdur['vdu-id-ref'] == vdu_name: + if vdur['name'] == vdu_name: return vdur raise ValueError('vdur not found for nsr-id %s, member_index %s and vdu_name %s', nsr_id, member_index, vdu_name) def get_vnfr(self, nsr_id, member_index): - vnfr = self.common_db.get_one(table="vnfrs", - filter={"nsr-id-ref": nsr_id, "member-vnf-index-ref": str(member_index)}) + vnfr = self.common_db.get_one("vnfrs", + {"nsr-id-ref": nsr_id, "member-vnf-index-ref": str(member_index)}) return vnfr def run(self): + common_consumer = KafkaConsumer(bootstrap_servers=cfg.BROKER_URI, + key_deserializer=bytes.decode, + value_deserializer=bytes.decode, + group_id="mon-consumer", + session_timeout_ms=60000, + heartbeat_interval_ms=20000) + + topics = ['metric_request', 'alarm_request', 'vim_account'] + common_consumer.subscribe(topics) + log.info("Listening for messages...") - for message in self.common_consumer: + for message in common_consumer: t = threading.Thread(target=self.consume_message, args=(message,)) t.start() @@ -176,7 +184,7 @@ class CommonConsumer: elif vim_type == "vmware": log.info("This metric_request message is for the vROPs plugin.") - self.vrops_rcvr.consume(message) + self.vrops_rcvr.consume(message,vim_uuid) else: log.debug("vim_type is misconfigured or unsupported; %s",