Merge "Updated Code of AWS plugin including Minor alternations of consumer/producer...
[osm/MON.git] / plugins / vRealiseOps / kafka_consumer_vrops.py
1 # -*- coding: utf-8 -*-
2 """
3 vROPs Kafka Consumer that consumes the request messages
4 """
5
6
7 from kafka import KafkaConsumer
8 from kafka.errors import KafkaError
9 import logging as log
10
11 class vROP_KafkaConsumer(object):
12 """
13 Kafka Consumer for vROPs
14 """
15
16 def __init__(self, topics=[], broker_uri=None):
17 """
18 Method to initize KafkaConsumer
19 Args:
20 broker_uri - hostname:port uri of Kafka broker
21 topics - list of topics to subscribe
22 Returns:
23 None
24 """
25
26 if broker_uri is None:
27 self.broker = '0.0.0.0:9092'
28 else:
29 self.broker = broker_uri
30
31 self.topic = topics
32 print ("vROPs Consumer started, Broker URI: {}".format(self.broker))
33 print ("Subscribed Topics {}".format(self.topic))
34 try:
35 self.vrops_consumer = KafkaConsumer(bootstrap_servers=self.broker)
36 self.vrops_consumer.subscribe(self.topic)
37 except Exception as exp:
38 msg = "fail to create consumer for topic {} with broker {} Error : {}"\
39 .format(self.topic, self.broker, exp)
40 log.error(msg)
41 raise Exception(msg)
42