2 # Copyright© 2017 Intel Research and Development Ireland Limited
3 # *************************************************************
5 # This file is part of OSM Monitoring module
6 # All Rights Reserved to Intel Corporation
8 # Licensed under the Apache License, Version 2.0 (the "License"); you may
9 # not use this file except in compliance with the License. You may obtain
10 # a copy of the License at
12 # http://www.apache.org/licenses/LICENSE-2.0
14 # Unless required by applicable law or agreed to in writing, software
15 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
16 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
17 # License for the specific language governing permissions and limitations
20 # For those usages not covered by the Apache License, Version 2.0 please
21 # contact: prithiv.mohan@intel.com or adrian.hoban@intel.com
25 This is a kafka consumer app that reads the messages from the message bus for
26 alarms and metrics responses.
28 #TODO: (Prithiv Mohan)
29 - Modify topics based on new schema definitions
30 - Include consumer logging
33 __author__
= "Prithiv Mohan"
34 __date__
= "06/Sep/2017"
36 from kafka
import KafkaConsumer
37 from kafka
.errors
import KafkaError
40 class KafkaConsumer(object):
41 """Adds messages to a kafka topic. Topic is hardcoded as 'alarms' and group as
46 def __init__(self
, uri
):
49 uri - kafka connection details
51 if not cfg
.CONF
.kafka
.uri
:
52 raise Exception("Kafka URI Not Found. Check the config file for Kafka URI")
54 broker
= cfg
.CONF
.kafka
.uri
55 consumer
= KafkaConsumer('alarms',
57 bootstrap_servers
=broker
, api_version
=(0,10))
58 #KafkaConsumer(value_deserializer=lambda m: json.loads(m.decode('ascii')))
60 def consume(self
, topic
, messages
):
61 for message
in self
._consumer
:
62 print ("%s:%d:%d: key=%s value=%s" % (message
.topic
, message
.partition
, message
.offset
, message
.key
, message
.value
))