Adds OSMMON_KAFKA_LOG_LEVEL env var (Gerrit change 70/6670/2)
author    Benjamin Diaz <bdiaz@whitestack.com>
          Tue, 9 Oct 2018 21:16:25 +0000 (18:16 -0300)
committer Benjamin Diaz <bdiaz@whitestack.com>
          Tue, 9 Oct 2018 21:47:30 +0000 (18:47 -0300)
Also removes unnecessary code from producer.py and moves the
consumer instantiation in common_consumer into the run method
to ease mocking during tests.

Signed-off-by: Benjamin Diaz <bdiaz@whitestack.com>
Change-Id: Ib7e0b9423a7c7d2acdd12f1bcb75b20eb3a7ea72
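
Note: with the KafkaConsumer now created inside run() (see the
common_consumer.py diff below), unit tests can patch the consumer at its
point of use and never need a live broker. A minimal sketch using
unittest.mock; the test name and the __init__ bypass are illustrative,
not part of this change:

    import unittest
    from unittest import mock

    from osm_mon.core.message_bus.common_consumer import CommonConsumer

    class CommonConsumerTest(unittest.TestCase):
        # Bypass __init__, which connects to the common DB; run() only
        # depends on module-level cfg and log.
        @mock.patch.object(CommonConsumer, '__init__', lambda self: None)
        @mock.patch('osm_mon.core.message_bus.common_consumer.KafkaConsumer')
        def test_run_subscribes_to_request_topics(self, consumer_mock):
            fake_consumer = mock.MagicMock()
            fake_consumer.__iter__.return_value = iter([])  # no messages: run() returns
            consumer_mock.return_value = fake_consumer
            CommonConsumer().run()
            fake_consumer.subscribe.assert_called_with(
                ['metric_request', 'alarm_request', 'vim_account'])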

docker/Dockerfile
osm_mon/core/message_bus/common_consumer.py
osm_mon/core/message_bus/producer.py
osm_mon/core/settings.py
osm_mon/plugins/OpenStack/Aodh/notifier.py

diff --git a/docker/Dockerfile b/docker/Dockerfile
index e736b50..6f7a829 100644
@@ -43,6 +43,7 @@ ENV OS_NOTIFIER_URI localhost:8662
 ENV OS_DEFAULT_GRANULARITY 300
 ENV REQUEST_TIMEOUT 10
 ENV OSMMON_LOG_LEVEL INFO
+ENV OSMMON_KAFKA_LOG_LEVEL INFO
 
 EXPOSE 8662
 
diff --git a/osm_mon/core/message_bus/common_consumer.py b/osm_mon/core/message_bus/common_consumer.py
index b3ac8ae..cdc8548 100755
@@ -48,6 +48,13 @@ logging.basicConfig(stream=sys.stdout,
                     level=logging.getLevelName(cfg.OSMMON_LOG_LEVEL))
 log = logging.getLogger(__name__)
 
+kafka_logger = logging.getLogger('kafka')
+kafka_logger.setLevel(logging.getLevelName(cfg.OSMMON_KAFKA_LOG_LEVEL))
+kafka_formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
+kafka_handler = logging.StreamHandler(sys.stdout)
+kafka_handler.setFormatter(kafka_formatter)
+kafka_logger.addHandler(kafka_handler)
+
 
 class CommonConsumer:
 
@@ -77,17 +84,6 @@ class CommonConsumer:
         self.common_db.db_connect({'host': common_db_uri[0], 'port': int(common_db_uri[1]), 'name': 'osm'})
         log.info("Connection successful.")
 
-        # Initialize consumers for alarms and metrics
-        self.common_consumer = KafkaConsumer(bootstrap_servers=cfg.BROKER_URI,
-                                             key_deserializer=bytes.decode,
-                                             value_deserializer=bytes.decode,
-                                             group_id="mon-consumer")
-
-        # Define subscribe the consumer for the plugins
-        topics = ['metric_request', 'alarm_request', 'access_credentials', 'vim_account']
-        # TODO: Remove access_credentials
-        self.common_consumer.subscribe(topics)
-
     def get_vim_type(self, vim_uuid):
         """Get the vim type that is required by the message."""
         credentials = self.database_manager.get_credentials(vim_uuid)
@@ -107,8 +103,16 @@ class CommonConsumer:
         return vnfr
 
     def run(self):
+        common_consumer = KafkaConsumer(bootstrap_servers=cfg.BROKER_URI,
+                                        key_deserializer=bytes.decode,
+                                        value_deserializer=bytes.decode,
+                                        group_id="mon-consumer")
+
+        topics = ['metric_request', 'alarm_request', 'vim_account']
+        common_consumer.subscribe(topics)
+
         log.info("Listening for messages...")
-        for message in self.common_consumer:
+        for message in common_consumer:
             t = threading.Thread(target=self.consume_message, args=(message,))
             t.start()
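
Note: kafka-python logs under the 'kafka' logger hierarchy, so the block
added above gives that hierarchy its own level and stdout handler,
decoupled from OSMMON_LOG_LEVEL. A standalone sketch of the effect, with
illustrative level values:

    import logging
    import sys

    logging.basicConfig(stream=sys.stdout, level=logging.DEBUG)
    logging.getLogger('kafka').setLevel(logging.WARNING)

    logging.getLogger('osm_mon').debug("app debug: printed")
    # Child loggers such as 'kafka.conn' inherit the 'kafka' level,
    # so library DEBUG chatter is filtered out:
    logging.getLogger('kafka.conn').debug("kafka debug: suppressed")

Since propagation is left on, kafka records at or above the configured
level also reach the root handler set up by basicConfig and can appear
twice on stdout; whether that matters is a deployment choice.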
 
diff --git a/osm_mon/core/message_bus/producer.py b/osm_mon/core/message_bus/producer.py
index f04ecf8..f6feba1 100755
@@ -61,22 +61,16 @@ class KafkaProducer(object):
         self.producer = kaf(
             key_serializer=str.encode,
             value_serializer=str.encode,
-            bootstrap_servers=broker, api_version=(0, 10))
+            bootstrap_servers=broker, api_version=(0, 10, 1))
 
     def publish(self, key, value, topic=None):
         """Send the required message on the Kafka message bus."""
         try:
             future = self.producer.send(topic=topic, key=key, value=value)
-            record_metadata = future.get(timeout=10)
+            future.get(timeout=10)
         except Exception:
             logging.exception("Error publishing to {} topic." .format(topic))
             raise
-        try:
-            logging.debug("TOPIC:", record_metadata.topic)
-            logging.debug("PARTITION:", record_metadata.partition)
-            logging.debug("OFFSET:", record_metadata.offset)
-        except KafkaError:
-            pass
 
     def publish_alarm_request(self, key, message):
         """Publish an alarm request."""
diff --git a/osm_mon/core/settings.py b/osm_mon/core/settings.py
index 166018c..6574f59 100644
@@ -66,6 +66,7 @@ class Config(object):
         CfgParam('OS_DEFAULT_GRANULARITY', "300", six.text_type),
         CfgParam('REQUEST_TIMEOUT', 10, int),
         CfgParam('OSMMON_LOG_LEVEL', "INFO", six.text_type),
+        CfgParam('OSMMON_KAFKA_LOG_LEVEL', "INFO", six.text_type),
     ]
 
     _config_dict = {cfg.key: cfg for cfg in _configuration}
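
Note: like OSMMON_LOG_LEVEL, the new parameter is resolved with
logging.getLevelName(), which maps a level name to its numeric value but
passes unrecognized names through as a placeholder string, so a typo in
OSMMON_KAFKA_LOG_LEVEL only fails once setLevel() is called. Illustration
(Python 3 logging semantics):

    import logging

    assert logging.getLevelName('DEBUG') == 10   # name -> numeric level
    print(logging.getLevelName('TRAZE'))         # prints "Level TRAZE"
    try:
        logging.getLogger('kafka').setLevel(logging.getLevelName('TRAZE'))
    except ValueError as err:
        print(err)                               # Unknown level: 'Level TRAZE'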
diff --git a/osm_mon/plugins/OpenStack/Aodh/notifier.py b/osm_mon/plugins/OpenStack/Aodh/notifier.py
index c43f238..e14ba7a 100644
@@ -42,6 +42,13 @@ logging.basicConfig(stream=sys.stdout,
                     level=logging.getLevelName(cfg.OSMMON_LOG_LEVEL))
 log = logging.getLogger(__name__)
 
+kafka_logger = logging.getLogger('kafka')
+kafka_logger.setLevel(logging.getLevelName(cfg.OSMMON_KAFKA_LOG_LEVEL))
+kafka_formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
+kafka_handler = logging.StreamHandler(sys.stdout)
+kafka_handler.setFormatter(kafka_formatter)
+kafka_logger.addHandler(kafka_handler)
+
 sys.path.append(os.path.abspath(os.path.join(os.path.realpath(__file__), '..', '..', '..', '..', '..')))
 
 from osm_mon.core.database import DatabaseManager