# Initialize Kafka consumer
log.info("Connecting to Kafka server at %s", kafka_server)
+ # TODO: Add logic to handle deduplication of messages when using group_id.
+ # See: https://stackoverflow.com/a/29836412
consumer = KafkaConsumer(bootstrap_servers=kafka_server,
key_deserializer=bytes.decode,
- value_deserializer=bytes.decode,
- group_id="policy-module-agent")
+ value_deserializer=bytes.decode)
consumer.subscribe(['lcm_pm', 'alarm_response'])
for message in consumer:
log.info("Message arrived: %s", message)
- log.info("Message key: %s", message.key)
try:
if message.key == 'configure_scaling':
content = json.loads(message.value)
alarm = ScalingAlarm.select().where(ScalingAlarm.alarm_id == alarm_id).get()
if alarm:
lcm_client = LcmClient()
- log.info("Sending scaling action message: %s", json.dumps(alarm))
+ log.info("Sending scaling action message for ns: %s", alarm_id)
lcm_client.scale(alarm.scaling_record.nsr_id, alarm.scaling_record.name, alarm.action)
except Exception:
log.exception("Error consuming message: ")