[PM] Adds group-id to Kafka consumer in agent 20/6320/1
authorBenjamin Diaz <bdiaz@whitestack.com>
Fri, 6 Jul 2018 15:03:35 +0000 (12:03 -0300)
committerBenjamin Diaz <bdiaz@whitestack.com>
Fri, 6 Jul 2018 15:03:35 +0000 (12:03 -0300)
Fixes CPU spikes during OSM startup

Signed-off-by: Benjamin Diaz <bdiaz@whitestack.com>
policy_module/osm_policy_module/core/agent.py

index 3a3b20c..cdd5dfc 100644 (file)
@@ -45,11 +45,10 @@ class PolicyModuleAgent:
 
         # Initialize Kafka consumer
         log.info("Connecting to Kafka server at %s", kafka_server)
-        # TODO: Add logic to handle deduplication of messages when using group_id.
-        # See: https://stackoverflow.com/a/29836412
         consumer = KafkaConsumer(bootstrap_servers=kafka_server,
                                  key_deserializer=bytes.decode,
-                                 value_deserializer=bytes.decode)
+                                 value_deserializer=bytes.decode,
+                                 group_id="pm-consumer")
         consumer.subscribe(['lcm_pm', 'alarm_response'])
 
         for message in consumer: