X-Git-Url: https://osm.etsi.org/gitweb/?a=blobdiff_plain;f=osm_mon%2Fcore%2Fmessage_bus%2Fcommon_consumer.py;h=cdc85487cb53b194a98ffdd5b0dfb549d8097293;hb=refs%2Fchanges%2F70%2F6670%2F2;hp=b3ac8ae156bf0f1efc3b4b1f9776a8e9cbc4e449;hpb=b5b7819197730f5000d90a60ed13b32ba4e18fad;p=osm%2FMON.git diff --git a/osm_mon/core/message_bus/common_consumer.py b/osm_mon/core/message_bus/common_consumer.py index b3ac8ae..cdc8548 100755 --- a/osm_mon/core/message_bus/common_consumer.py +++ b/osm_mon/core/message_bus/common_consumer.py @@ -48,6 +48,13 @@ logging.basicConfig(stream=sys.stdout, level=logging.getLevelName(cfg.OSMMON_LOG_LEVEL)) log = logging.getLogger(__name__) +kafka_logger = logging.getLogger('kafka') +kafka_logger.setLevel(logging.getLevelName(cfg.OSMMON_KAFKA_LOG_LEVEL)) +kafka_formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s') +kafka_handler = logging.StreamHandler(sys.stdout) +kafka_handler.setFormatter(kafka_formatter) +kafka_logger.addHandler(kafka_handler) + class CommonConsumer: @@ -77,17 +84,6 @@ class CommonConsumer: self.common_db.db_connect({'host': common_db_uri[0], 'port': int(common_db_uri[1]), 'name': 'osm'}) log.info("Connection successful.") - # Initialize consumers for alarms and metrics - self.common_consumer = KafkaConsumer(bootstrap_servers=cfg.BROKER_URI, - key_deserializer=bytes.decode, - value_deserializer=bytes.decode, - group_id="mon-consumer") - - # Define subscribe the consumer for the plugins - topics = ['metric_request', 'alarm_request', 'access_credentials', 'vim_account'] - # TODO: Remove access_credentials - self.common_consumer.subscribe(topics) - def get_vim_type(self, vim_uuid): """Get the vim type that is required by the message.""" credentials = self.database_manager.get_credentials(vim_uuid) @@ -107,8 +103,16 @@ class CommonConsumer: return vnfr def run(self): + common_consumer = KafkaConsumer(bootstrap_servers=cfg.BROKER_URI, + key_deserializer=bytes.decode, + value_deserializer=bytes.decode, + group_id="mon-consumer") + + topics = ['metric_request', 'alarm_request', 'vim_account'] + common_consumer.subscribe(topics) + log.info("Listening for messages...") - for message in self.common_consumer: + for message in common_consumer: t = threading.Thread(target=self.consume_message, args=(message,)) t.start()