- values = json.loads(message.value)
- except JSONDecodeError:
- values = yaml.safe_load(message.value)
-
- response = None
-
- if message.topic == "vim_account":
- if message.key == "create" or message.key == "edit":
- values['vim_password'] = self.common_db.decrypt_vim_password(values['vim_password'],
- values['schema_version'],
- values['_id'])
- self.auth_manager.store_auth_credentials(values)
- if message.key == "delete":
- self.auth_manager.delete_auth_credentials(values)
-
- elif message.topic == "alarm_request":
- if message.key == "create_alarm_request":
- alarm_details = values['alarm_create_request']
+ await self.msg_bus.aioread(topics, self._process_msg)
+ break
+ except Exception as e:
+ # Failed to subscribe to kafka topic
+ log.error("Error when subscribing to topic(s) %s", str(topics))
+ log.exception("Exception %s", str(e))
+ # Wait for some time for kaka to stabilize and then reattempt to subscribe again
+ time.sleep(wait_time)
+ log.info("Retrying to subscribe the kafka topic(s) %s", str(topics))
+
+ async def _process_msg(self, topic, key, values):
+ log.info("Message arrived: %s", values)
+ try:
+
+ if topic == "alarm_request":
+ if key == "create_alarm_request":
+ alarm_details = values["alarm_create_request"]
+ cor_id = alarm_details["correlation_id"]