X-Git-Url: https://osm.etsi.org/gitweb/?a=blobdiff_plain;f=osm_mon%2Fplugins%2FvRealiseOps%2Fkafka_consumer_vrops.py;fp=osm_mon%2Fplugins%2FvRealiseOps%2Fkafka_consumer_vrops.py;h=f5e11e677974f019518dc4d65e8dce646b23284a;hb=c7397b95dbaeebd7d872779eec809daed9e487cc;hp=0000000000000000000000000000000000000000;hpb=71ce7eca516321aff84332df56702e718968735b;p=osm%2FMON.git diff --git a/osm_mon/plugins/vRealiseOps/kafka_consumer_vrops.py b/osm_mon/plugins/vRealiseOps/kafka_consumer_vrops.py new file mode 100644 index 0000000..f5e11e6 --- /dev/null +++ b/osm_mon/plugins/vRealiseOps/kafka_consumer_vrops.py @@ -0,0 +1,64 @@ +# -*- coding: utf-8 -*- + +## +# Copyright 2016-2017 VMware Inc. +# This file is part of ETSI OSM +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +# +# For those usages not covered by the Apache License, Version 2.0 please +# contact: osslegalrouting@vmware.com +## + +""" +vROPs Kafka Consumer that consumes the request messages +""" + + +from kafka import KafkaConsumer +from kafka.errors import KafkaError +import logging as log + +class vROP_KafkaConsumer(object): + """ + Kafka Consumer for vROPs + """ + + def __init__(self, topics=[], broker_uri=None): + """ + Method to initize KafkaConsumer + Args: + broker_uri - hostname:port uri of Kafka broker + topics - list of topics to subscribe + Returns: + None + """ + + if broker_uri is None: + self.broker = '0.0.0.0:9092' + else: + self.broker = broker_uri + + self.topic = topics + print ("vROPs Consumer started, Broker URI: {}".format(self.broker)) + print ("Subscribed Topics {}".format(self.topic)) + try: + self.vrops_consumer = KafkaConsumer(bootstrap_servers=self.broker) + self.vrops_consumer.subscribe(self.topic) + except Exception as exp: + msg = "fail to create consumer for topic {} with broker {} Error : {}"\ + .format(self.topic, self.broker, exp) + log.error(msg) + raise Exception(msg) +