Merge "[MON] Implements multithreading for message consumption"
[osm/MON.git] / policy_module / osm_policy_module / core / agent.py
index aa3f3ff..cdd5dfc 100644 (file)
@@ -45,11 +45,10 @@ class PolicyModuleAgent:
 
         # Initialize Kafka consumer
         log.info("Connecting to Kafka server at %s", kafka_server)
-        # TODO: Add logic to handle deduplication of messages when using group_id.
-        # See: https://stackoverflow.com/a/29836412
         consumer = KafkaConsumer(bootstrap_servers=kafka_server,
                                  key_deserializer=bytes.decode,
-                                 value_deserializer=bytes.decode)
+                                 value_deserializer=bytes.decode,
+                                 group_id="pm-consumer")
         consumer.subscribe(['lcm_pm', 'alarm_response'])
 
         for message in consumer:
@@ -113,7 +112,7 @@ class PolicyModuleAgent:
                         log.info("Sending scaling action message for ns: %s", alarm_id)
                         lcm_client.scale(alarm.scaling_record.nsr_id, alarm.scaling_record.name, alarm.action)
                     except ScalingAlarm.DoesNotExist:
-                        log.info("There is no action configured for alarm %.", alarm_id)
+                        log.info("There is no action configured for alarm %s.", alarm_id)
             except Exception:
                 log.exception("Error consuming message: ")