X-Git-Url: https://osm.etsi.org/gitweb/?a=blobdiff_plain;f=osm_mon%2Fcore%2Fmessage_bus%2Fcommon_consumer.py;h=3a95c768aac2d554bedbb8046cb17fdda5e312fa;hb=e27def0d99cc73c5c0b7550a28e95abd6c1cd996;hp=b8e33d233fc61961ff83965453019fa2ffccfd04;hpb=93699898c51364cde193d8d441f4aed45670e7bf;p=osm%2FMON.git diff --git a/osm_mon/core/message_bus/common_consumer.py b/osm_mon/core/message_bus/common_consumer.py index b8e33d2..3a95c76 100755 --- a/osm_mon/core/message_bus/common_consumer.py +++ b/osm_mon/core/message_bus/common_consumer.py @@ -22,7 +22,7 @@ import json import logging import sys -import threading +import time from json import JSONDecodeError import six @@ -52,10 +52,6 @@ log = logging.getLogger(__name__) kafka_logger = logging.getLogger('kafka') kafka_logger.setLevel(logging.getLevelName(cfg.OSMMON_KAFKA_LOG_LEVEL)) -kafka_formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s') -kafka_handler = logging.StreamHandler(sys.stdout) -kafka_handler.setFormatter(kafka_formatter) -kafka_logger.addHandler(kafka_handler) class CommonConsumer: @@ -92,11 +88,26 @@ class CommonConsumer: topics = ['metric_request', 'alarm_request', 'vim_account'] common_consumer.subscribe(topics) + retries = 1 + max_retries = 5 + while True: + try: + common_consumer.poll() + common_consumer.seek_to_end() + break + except Exception: + log.error("Error getting Kafka partitions. Maybe Kafka is not ready yet.") + log.error("Retry number %d of %d", retries, max_retries) + if retries >= max_retries: + log.error("Achieved max number of retries. Logging exception and exiting...") + log.exception("Exception: ") + return + retries = retries + 1 + time.sleep(2) log.info("Listening for messages...") for message in common_consumer: - t = threading.Thread(target=self.consume_message, args=(message,)) - t.start() + self.consume_message(message) def consume_message(self, message): log.info("Message arrived: %s", message) @@ -110,6 +121,9 @@ class CommonConsumer: if message.topic == "vim_account": if message.key == "create" or message.key == "edit": + values['vim_password'] = self.common_db.decrypt_vim_password(values['vim_password'], + values['schema_version'], + values['_id']) self.auth_manager.store_auth_credentials(values) if message.key == "delete": self.auth_manager.delete_auth_credentials(values) @@ -183,7 +197,7 @@ class CommonConsumer: key = key.replace('request', 'response') producer = Producer() producer.send(topic=topic, key=key, value=json.dumps(msg)) - producer.flush() + producer.flush(timeout=5) producer.close()