1 # -*- coding: utf-8 -*-
4 # Copyright 2016-2017 VMware Inc.
5 # This file is part of ETSI OSM
8 # Licensed under the Apache License, Version 2.0 (the "License"); you may
9 # not use this file except in compliance with the License. You may obtain
10 # a copy of the License at
12 # http://www.apache.org/licenses/LICENSE-2.0
14 # Unless required by applicable law or agreed to in writing, software
15 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
16 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
17 # License for the specific language governing permissions and limitations
20 # For those usages not covered by the Apache License, Version 2.0 please
21 # contact: osslegalrouting@vmware.com
25 vROPs Kafka Consumer that consumes the request messages
29 from kafka
import KafkaConsumer
30 from kafka
.errors
import KafkaError
33 class vROP_KafkaConsumer(object):
35 Kafka Consumer for vROPs
38 def __init__(self
, topics
=[], broker_uri
=None):
40 Method to initialize KafkaConsumer
42 broker_uri - hostname:port uri of Kafka broker
43 topics - list of topics to subscribe
48 if broker_uri
is None:
49 self
.broker
= '0.0.0.0:9092'
51 self
.broker
= broker_uri
54 print ("vROPs Consumer started, Broker URI: {}".format(self
.broker
))
55 print ("Subscribed Topics {}".format(self
.topic
))
57 self
.vrops_consumer
= KafkaConsumer(bootstrap_servers
=self
.broker
)
58 self
.vrops_consumer
.subscribe(self
.topic
)
59 except Exception as exp
:
60 msg
= "fail to create consumer for topic {} with broker {} Error : {}"\
61 .format(self
.topic
, self
.broker
, exp
)