From 53ad3c4c5d87a5b392a74cf386c29c67276ed3cb Mon Sep 17 00:00:00 2001 From: Benjamin Diaz Date: Tue, 9 Oct 2018 18:16:25 -0300 Subject: [PATCH] Adds OSMMON_KAFKA_LOG_LEVEL env var Also removes unnecessary code from producer.py and moves the consumer in common_consumer inside run method to ease mocking during tests. Signed-off-by: Benjamin Diaz Change-Id: Ib7e0b9423a7c7d2acdd12f1bcb75b20eb3a7ea72 --- docker/Dockerfile | 1 + osm_mon/core/message_bus/common_consumer.py | 28 ++++++++++++--------- osm_mon/core/message_bus/producer.py | 10 ++------ osm_mon/core/settings.py | 1 + osm_mon/plugins/OpenStack/Aodh/notifier.py | 7 ++++++ 5 files changed, 27 insertions(+), 20 deletions(-) diff --git a/docker/Dockerfile b/docker/Dockerfile index e736b50..6f7a829 100644 --- a/docker/Dockerfile +++ b/docker/Dockerfile @@ -43,6 +43,7 @@ ENV OS_NOTIFIER_URI localhost:8662 ENV OS_DEFAULT_GRANULARITY 300 ENV REQUEST_TIMEOUT 10 ENV OSMMON_LOG_LEVEL INFO +ENV OSMMON_KAFKA_LOG_LEVEL INFO EXPOSE 8662 diff --git a/osm_mon/core/message_bus/common_consumer.py b/osm_mon/core/message_bus/common_consumer.py index b3ac8ae..cdc8548 100755 --- a/osm_mon/core/message_bus/common_consumer.py +++ b/osm_mon/core/message_bus/common_consumer.py @@ -48,6 +48,13 @@ logging.basicConfig(stream=sys.stdout, level=logging.getLevelName(cfg.OSMMON_LOG_LEVEL)) log = logging.getLogger(__name__) +kafka_logger = logging.getLogger('kafka') +kafka_logger.setLevel(logging.getLevelName(cfg.OSMMON_KAFKA_LOG_LEVEL)) +kafka_formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s') +kafka_handler = logging.StreamHandler(sys.stdout) +kafka_handler.setFormatter(kafka_formatter) +kafka_logger.addHandler(kafka_handler) + class CommonConsumer: @@ -77,17 +84,6 @@ class CommonConsumer: self.common_db.db_connect({'host': common_db_uri[0], 'port': int(common_db_uri[1]), 'name': 'osm'}) log.info("Connection successful.") - # Initialize consumers for alarms and metrics - self.common_consumer = KafkaConsumer(bootstrap_servers=cfg.BROKER_URI, - key_deserializer=bytes.decode, - value_deserializer=bytes.decode, - group_id="mon-consumer") - - # Define subscribe the consumer for the plugins - topics = ['metric_request', 'alarm_request', 'access_credentials', 'vim_account'] - # TODO: Remove access_credentials - self.common_consumer.subscribe(topics) - def get_vim_type(self, vim_uuid): """Get the vim type that is required by the message.""" credentials = self.database_manager.get_credentials(vim_uuid) @@ -107,8 +103,16 @@ class CommonConsumer: return vnfr def run(self): + common_consumer = KafkaConsumer(bootstrap_servers=cfg.BROKER_URI, + key_deserializer=bytes.decode, + value_deserializer=bytes.decode, + group_id="mon-consumer") + + topics = ['metric_request', 'alarm_request', 'vim_account'] + common_consumer.subscribe(topics) + log.info("Listening for messages...") - for message in self.common_consumer: + for message in common_consumer: t = threading.Thread(target=self.consume_message, args=(message,)) t.start() diff --git a/osm_mon/core/message_bus/producer.py b/osm_mon/core/message_bus/producer.py index f04ecf8..f6feba1 100755 --- a/osm_mon/core/message_bus/producer.py +++ b/osm_mon/core/message_bus/producer.py @@ -61,22 +61,16 @@ class KafkaProducer(object): self.producer = kaf( key_serializer=str.encode, value_serializer=str.encode, - bootstrap_servers=broker, api_version=(0, 10)) + bootstrap_servers=broker, api_version=(0, 10, 1)) def publish(self, key, value, topic=None): """Send the required message on the Kafka message bus.""" try: future = self.producer.send(topic=topic, key=key, value=value) - record_metadata = future.get(timeout=10) + future.get(timeout=10) except Exception: logging.exception("Error publishing to {} topic." .format(topic)) raise - try: - logging.debug("TOPIC:", record_metadata.topic) - logging.debug("PARTITION:", record_metadata.partition) - logging.debug("OFFSET:", record_metadata.offset) - except KafkaError: - pass def publish_alarm_request(self, key, message): """Publish an alarm request.""" diff --git a/osm_mon/core/settings.py b/osm_mon/core/settings.py index 166018c..6574f59 100644 --- a/osm_mon/core/settings.py +++ b/osm_mon/core/settings.py @@ -66,6 +66,7 @@ class Config(object): CfgParam('OS_DEFAULT_GRANULARITY', "300", six.text_type), CfgParam('REQUEST_TIMEOUT', 10, int), CfgParam('OSMMON_LOG_LEVEL', "INFO", six.text_type), + CfgParam('OSMMON_KAFKA_LOG_LEVEL', "INFO", six.text_type), ] _config_dict = {cfg.key: cfg for cfg in _configuration} diff --git a/osm_mon/plugins/OpenStack/Aodh/notifier.py b/osm_mon/plugins/OpenStack/Aodh/notifier.py index c43f238..e14ba7a 100644 --- a/osm_mon/plugins/OpenStack/Aodh/notifier.py +++ b/osm_mon/plugins/OpenStack/Aodh/notifier.py @@ -42,6 +42,13 @@ logging.basicConfig(stream=sys.stdout, level=logging.getLevelName(cfg.OSMMON_LOG_LEVEL)) log = logging.getLogger(__name__) +kafka_logger = logging.getLogger('kafka') +kafka_logger.setLevel(logging.getLevelName(cfg.OSMMON_KAFKA_LOG_LEVEL)) +kafka_formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s') +kafka_handler = logging.StreamHandler(sys.stdout) +kafka_handler.setFormatter(kafka_formatter) +kafka_logger.addHandler(kafka_handler) + sys.path.append(os.path.abspath(os.path.join(os.path.realpath(__file__), '..', '..', '..', '..', '..'))) from osm_mon.core.database import DatabaseManager -- 2.17.1