Merge "Added vROPs consumer, receiver that will consume messages & act on it. Aligned...
[osm/MON.git] / core / message_bus / northbound_consumer.py
1
2 # Copyright© 2017 Intel Research and Development Ireland Limited
3 # *************************************************************
4
5 # This file is part of OSM Monitoring module
6 # All Rights Reserved to Intel Corporation
7
8 # Licensed under the Apache License, Version 2.0 (the "License"); you may
9 # not use this file except in compliance with the License. You may obtain
10 # a copy of the License at
11
12 # http://www.apache.org/licenses/LICENSE-2.0
13
14 # Unless required by applicable law or agreed to in writing, software
15 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
16 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
17 # License for the specific language governing permissions and limitations
18 # under the License.
19
20 # For those usages not covered by the Apache License, Version 2.0 please
21 # contact: prithiv.mohan@intel.com or adrian.hoban@intel.com
22 ##
23
24 '''
25 This is a kafka consumer app that reads the messages from the message bus for
26 alarms and metrics responses.
27
28 #TODO: (Prithiv Mohan)
29 - Modify topics based on new schema definitions
30 - Include consumer logging
31 '''
32
33 __author__ = "Prithiv Mohan"
34 __date__ = "06/Sep/2017"
35
36 from kafka import KafkaConsumer
37 from kafka.errors import KafkaError
38 import logging
39
class KafkaConsumer(object):
    """Read messages from a Kafka topic and print them.

    The topic defaults to 'alarms' and the consumer group to 'my_group';
    both can be overridden when the consumer is created.

    NOTE: this class deliberately keeps the same name as
    ``kafka.KafkaConsumer`` (callers depend on it), which means it shadows
    the module-level import of the client class.
    """

    def __init__(self, uri, topic='alarms', group_id='my_group'):
        """Create the underlying Kafka client.

        uri      - kafka connection details (bootstrap server address)
        topic    - topic to subscribe to, 'alarms' by default
        group_id - consumer group to join, 'my_group' by default

        Raises Exception when no uri is supplied.
        """
        if not uri:
            raise Exception("Kafka URI Not Found. Check the config file for Kafka URI")

        # Inside this module the name ``KafkaConsumer`` refers to this very
        # class, so the client has to be imported under an alias; calling
        # ``KafkaConsumer(...)`` here would re-enter this constructor.
        from kafka import KafkaConsumer as _KafkaClient

        # Keep the client on the instance so that consume() can iterate it.
        # No value_deserializer is configured, so message values are passed
        # through exactly as the client delivers them.
        self._consumer = _KafkaClient(topic,
                                      group_id=group_id,
                                      bootstrap_servers=uri,
                                      api_version=(0, 10))

    def consume(self, topic, messages):
        """Print every message received by the underlying consumer.

        topic    - accepted for interface compatibility; currently unused,
                   the subscription is fixed when the consumer is created
        messages - accepted for interface compatibility; currently unused

        Iterates until the underlying consumer stops yielding messages.
        """
        for message in self._consumer:
            print("%s:%d:%d: key=%s value=%s" % (message.topic,
                                                 message.partition,
                                                 message.offset,
                                                 message.key,
                                                 message.value))