Refactors code in OpenStack plugin
[osm/MON.git] / osm_mon / plugins / vRealiseOps / kafka_consumer_vrops.py
1 # -*- coding: utf-8 -*-
2
3 ##
4 # Copyright 2016-2017 VMware Inc.
5 # This file is part of ETSI OSM
6 # All Rights Reserved.
7 #
8 # Licensed under the Apache License, Version 2.0 (the "License"); you may
9 # not use this file except in compliance with the License. You may obtain
10 # a copy of the License at
11 #
12 # http://www.apache.org/licenses/LICENSE-2.0
13 #
14 # Unless required by applicable law or agreed to in writing, software
15 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
16 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
17 # License for the specific language governing permissions and limitations
18 # under the License.
19 #
20 # For those usages not covered by the Apache License, Version 2.0 please
21 # contact: osslegalrouting@vmware.com
22 ##
23
24 """
25 vROPs Kafka Consumer that consumes the request messages
26 """
27
28
29 from kafka import KafkaConsumer
30 from kafka.errors import KafkaError
31 import logging as log
32
33 class vROP_KafkaConsumer(object):
34 """
35 Kafka Consumer for vROPs
36 """
37
38 def __init__(self, topics=[], broker_uri=None):
39 """
40 Method to initize KafkaConsumer
41 Args:
42 broker_uri - hostname:port uri of Kafka broker
43 topics - list of topics to subscribe
44 Returns:
45 None
46 """
47
48 if broker_uri is None:
49 self.broker = '0.0.0.0:9092'
50 else:
51 self.broker = broker_uri
52
53 self.topic = topics
54 print ("vROPs Consumer started, Broker URI: {}".format(self.broker))
55 print ("Subscribed Topics {}".format(self.topic))
56 try:
57 self.vrops_consumer = KafkaConsumer(bootstrap_servers=self.broker)
58 self.vrops_consumer.subscribe(self.topic)
59 except Exception as exp:
60 msg = "fail to create consumer for topic {} with broker {} Error : {}"\
61 .format(self.topic, self.broker, exp)
62 log.error(msg)
63 raise Exception(msg)
64