X-Git-Url: https://osm.etsi.org/gitweb/?a=blobdiff_plain;f=osm_mon%2Fcore%2Fmessage_bus%2Fcommon_consumer.py;h=3e1f7455dda4aaa7c1a5e1286c2b462f2e2452a6;hb=cca77765cc8d43a0a5524e3754b7587b149300b4;hp=b2677d877f2f5ad027252f12d3088209cabd6081;hpb=4da146638bc3838270fa41c9f9fb91961f726c97;p=osm%2FMON.git diff --git a/osm_mon/core/message_bus/common_consumer.py b/osm_mon/core/message_bus/common_consumer.py index b2677d8..3e1f745 100755 --- a/osm_mon/core/message_bus/common_consumer.py +++ b/osm_mon/core/message_bus/common_consumer.py @@ -22,6 +22,7 @@ import json import logging import sys +import time from json import JSONDecodeError import six @@ -91,8 +92,22 @@ class CommonConsumer: topics = ['metric_request', 'alarm_request', 'vim_account'] common_consumer.subscribe(topics) - common_consumer.poll() - common_consumer.seek_to_end() + retries = 1 + max_retries = 5 + while True: + try: + common_consumer.poll() + common_consumer.seek_to_end() + break + except Exception: + log.error("Error getting Kafka partitions. Maybe Kafka is not ready yet.") + log.error("Retry number %d of %d", retries, max_retries) + if retries >= max_retries: + log.error("Achieved max number of retries. Logging exception and exiting...") + log.exception("Exception: ") + return + retries = retries + 1 + time.sleep(2) log.info("Listening for messages...") for message in common_consumer: