- # Create CloudWatch alarm and metric instances
- cloudwatch_alarms = plugin_alarms()
- cloudwatch_metrics = plugin_metrics()
- aws_connection = Connection()
- aws_access_credentials = AccessCredentials()
-
- # Create vROps plugin_receiver class instance
- vrops_rcvr = plugin_receiver.PluginReceiver()
-
- common_db = dbmongo.DbMongo()
- common_db_uri = cfg.MONGO_URI.split(':')
- common_db.db_connect({'host': common_db_uri[0], 'port': int(common_db_uri[1]), 'name': 'osm'})
-
- # Initialize consumers for alarms and metrics
- common_consumer = KafkaConsumer(bootstrap_servers=cfg.BROKER_URI,
- key_deserializer=bytes.decode,
- value_deserializer=bytes.decode,
- group_id="mon-consumer")
-
- # Define subscribe the consumer for the plugins
- topics = ['metric_request', 'alarm_request', 'access_credentials', 'vim_account']
- # TODO: Remove access_credentials
- common_consumer.subscribe(topics)
-
- log.info("Listening for alarm_request and metric_request messages")
- for message in common_consumer:
+ topics = ['metric_request', 'alarm_request', 'vim_account']
+ common_consumer.subscribe(topics)
+ retries = 1
+ max_retries = 5
+ while True:
+ try:
+ common_consumer.poll()
+ common_consumer.seek_to_end()
+ break
+ except Exception:
+ log.error("Error getting Kafka partitions. Maybe Kafka is not ready yet.")
+ log.error("Retry number %d of %d", retries, max_retries)
+ if retries >= max_retries:
+ log.error("Achieved max number of retries. Logging exception and exiting...")
+ log.exception("Exception: ")
+ return
+ retries = retries + 1
+ time.sleep(2)
+
+ log.info("Listening for messages...")
+ for message in common_consumer:
+ self.consume_message(message)
+
+ def consume_message(self, message):