Adds OSMMON_KAFKA_LOG_LEVEL env var
Also removes unnecessary code from producer.py and moves
the consumer in common_consumer inside the run method to
ease mocking during tests.
Signed-off-by: Benjamin Diaz <bdiaz@whitestack.com>
Change-Id: Ib7e0b9423a7c7d2acdd12f1bcb75b20eb3a7ea72
diff --git a/docker/Dockerfile b/docker/Dockerfile
index e736b50..6f7a829 100644
--- a/docker/Dockerfile
+++ b/docker/Dockerfile
@@ -43,6 +43,7 @@
ENV OS_DEFAULT_GRANULARITY 300
ENV REQUEST_TIMEOUT 10
ENV OSMMON_LOG_LEVEL INFO
+ENV OSMMON_KAFKA_LOG_LEVEL INFO
EXPOSE 8662
diff --git a/osm_mon/core/message_bus/common_consumer.py b/osm_mon/core/message_bus/common_consumer.py
index b3ac8ae..cdc8548 100755
--- a/osm_mon/core/message_bus/common_consumer.py
+++ b/osm_mon/core/message_bus/common_consumer.py
@@ -48,6 +48,13 @@
level=logging.getLevelName(cfg.OSMMON_LOG_LEVEL))
log = logging.getLogger(__name__)
+kafka_logger = logging.getLogger('kafka')
+kafka_logger.setLevel(logging.getLevelName(cfg.OSMMON_KAFKA_LOG_LEVEL))
+kafka_formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
+kafka_handler = logging.StreamHandler(sys.stdout)
+kafka_handler.setFormatter(kafka_formatter)
+kafka_logger.addHandler(kafka_handler)
+
class CommonConsumer:
@@ -77,17 +84,6 @@
self.common_db.db_connect({'host': common_db_uri[0], 'port': int(common_db_uri[1]), 'name': 'osm'})
log.info("Connection successful.")
- # Initialize consumers for alarms and metrics
- self.common_consumer = KafkaConsumer(bootstrap_servers=cfg.BROKER_URI,
- key_deserializer=bytes.decode,
- value_deserializer=bytes.decode,
- group_id="mon-consumer")
-
- # Define subscribe the consumer for the plugins
- topics = ['metric_request', 'alarm_request', 'access_credentials', 'vim_account']
- # TODO: Remove access_credentials
- self.common_consumer.subscribe(topics)
-
def get_vim_type(self, vim_uuid):
"""Get the vim type that is required by the message."""
credentials = self.database_manager.get_credentials(vim_uuid)
@@ -107,8 +103,16 @@
return vnfr
def run(self):
+ common_consumer = KafkaConsumer(bootstrap_servers=cfg.BROKER_URI,
+ key_deserializer=bytes.decode,
+ value_deserializer=bytes.decode,
+ group_id="mon-consumer")
+
+ topics = ['metric_request', 'alarm_request', 'vim_account']
+ common_consumer.subscribe(topics)
+
log.info("Listening for messages...")
- for message in self.common_consumer:
+ for message in common_consumer:
t = threading.Thread(target=self.consume_message, args=(message,))
t.start()
diff --git a/osm_mon/core/message_bus/producer.py b/osm_mon/core/message_bus/producer.py
index f04ecf8..f6feba1 100755
--- a/osm_mon/core/message_bus/producer.py
+++ b/osm_mon/core/message_bus/producer.py
@@ -61,22 +61,16 @@
self.producer = kaf(
key_serializer=str.encode,
value_serializer=str.encode,
- bootstrap_servers=broker, api_version=(0, 10))
+ bootstrap_servers=broker, api_version=(0, 10, 1))
def publish(self, key, value, topic=None):
"""Send the required message on the Kafka message bus."""
try:
future = self.producer.send(topic=topic, key=key, value=value)
- record_metadata = future.get(timeout=10)
+ future.get(timeout=10)
except Exception:
logging.exception("Error publishing to {} topic." .format(topic))
raise
- try:
- logging.debug("TOPIC:", record_metadata.topic)
- logging.debug("PARTITION:", record_metadata.partition)
- logging.debug("OFFSET:", record_metadata.offset)
- except KafkaError:
- pass
def publish_alarm_request(self, key, message):
"""Publish an alarm request."""
diff --git a/osm_mon/core/settings.py b/osm_mon/core/settings.py
index 166018c..6574f59 100644
--- a/osm_mon/core/settings.py
+++ b/osm_mon/core/settings.py
@@ -66,6 +66,7 @@
CfgParam('OS_DEFAULT_GRANULARITY', "300", six.text_type),
CfgParam('REQUEST_TIMEOUT', 10, int),
CfgParam('OSMMON_LOG_LEVEL', "INFO", six.text_type),
+ CfgParam('OSMMON_KAFKA_LOG_LEVEL', "INFO", six.text_type),
]
_config_dict = {cfg.key: cfg for cfg in _configuration}
diff --git a/osm_mon/plugins/OpenStack/Aodh/notifier.py b/osm_mon/plugins/OpenStack/Aodh/notifier.py
index c43f238..e14ba7a 100644
--- a/osm_mon/plugins/OpenStack/Aodh/notifier.py
+++ b/osm_mon/plugins/OpenStack/Aodh/notifier.py
@@ -42,6 +42,13 @@
level=logging.getLevelName(cfg.OSMMON_LOG_LEVEL))
log = logging.getLogger(__name__)
+kafka_logger = logging.getLogger('kafka')
+kafka_logger.setLevel(logging.getLevelName(cfg.OSMMON_KAFKA_LOG_LEVEL))
+kafka_formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
+kafka_handler = logging.StreamHandler(sys.stdout)
+kafka_handler.setFormatter(kafka_formatter)
+kafka_logger.addHandler(kafka_handler)
+
sys.path.append(os.path.abspath(os.path.join(os.path.realpath(__file__), '..', '..', '..', '..', '..')))
from osm_mon.core.database import DatabaseManager