c9021d217c2975784918ccd154e6385679a2c7e7
[osm/MON.git] / osm_mon / core / message_bus / consumer.py
1 # Copyright 2017 Intel Research and Development Ireland Limited
2 # *************************************************************
3
4 # This file is part of OSM Monitoring module
5 # All Rights Reserved to Intel Corporation
6
7 # Licensed under the Apache License, Version 2.0 (the "License"); you may
8 # not use this file except in compliance with the License. You may obtain
9 # a copy of the License at
10
11 # http://www.apache.org/licenses/LICENSE-2.0
12
13 # Unless required by applicable law or agreed to in writing, software
14 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
15 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
16 # License for the specific language governing permissions and limitations
17 # under the License.
18
19 # For those usages not covered by the Apache License, Version 2.0 please
20 # contact: prithiv.mohan@intel.com or adrian.hoban@intel.com
21 ##
22
23 '''
24 This is a kafka consumer app that reads the messages from the message bus for
25 alarms and metrics responses.
26
27 '''
28
29 __author__ = "Prithiv Mohan"
30 __date__ = "06/Sep/2017"
31
32
33 from kafka import KafkaConsumer
34 from kafka.errors import KafkaError
35 import json
36 import logging
37 import logging.config
38 import os
39
40
def logging_handler(filename, mode='a+', encoding=None):
    """Create the file handler used as the ``'()'`` factory in the log config.

    Args:
        filename: Path of the log file. It is created empty when missing.
        mode: Mode the handler opens the file with (append by default).
        encoding: Text encoding of the log file. ``None`` leaves the choice
            to ``logging.FileHandler``.

    Returns:
        A ``logging.FileHandler`` writing to ``filename``.
    """
    if not os.path.exists(filename):
        # Touch the file first so it exists before the handler opens it.
        with open(filename, 'a'):
            pass
    # Forward the encoding so the 'encoding' entry of the handler
    # configuration actually takes effect instead of being dropped.
    return logging.FileHandler(filename, mode, encoding=encoding)
45
# Logging configuration: every record is formatted the same way and appended
# to a single log file.
log_config = {
    'version': 1,
    'formatters': {
        'default': {
            'format': '%(asctime)s %(levelname)s %(name)s %(message)s'
        },
    },
    'handlers': {
        'file': {
            # Custom factory: builds the handler through logging_handler().
            '()': logging_handler,
            'level': 'DEBUG',
            'formatter': 'default',
            'filename': '/var/log/osm_mon.log',
            'mode': 'a+',
            'encoding': 'utf-8',
        },
    },
    # Per-logger settings must live under 'loggers'; a logger section placed
    # at the top level is ignored by dictConfig. Naming 'kafka' here also
    # keeps that logger (and its children) enabled, since dictConfig disables
    # already-existing loggers that the configuration does not mention.
    'loggers': {
        'kafka': {
            'handlers': ['file'],
            'level': 'DEBUG',
            # The root logger writes to the same handler; stop propagation
            # so each record is written to the file only once.
            'propagate': False,
        },
    },
    'root': {
        'handlers': ['file'],
        'level': 'DEBUG',
    },
}


logging.config.dictConfig(log_config)
logger = logging.getLogger('kafka')
76
# Kafka bootstrap address: taken from the BROKER_URI environment variable
# when it is set, otherwise the local default broker.
broker = os.environ.get("BROKER_URI", "localhost:9092")
81
# One consumer per response topic.
# NOTE(review): 'osm_mon' is passed positionally, so it is subscribed to as a
# second topic on each consumer. If it was meant to be the consumer group it
# should be group_id='osm_mon' -- confirm before changing.
alarm_consumer = KafkaConsumer(
    'alarm_response', 'osm_mon', bootstrap_servers=broker)
metric_consumer = KafkaConsumer(
    'metric_response', 'osm_mon', bootstrap_servers=broker)


def _log_pending(consumer, timeout_ms=1000):
    """Log at DEBUG level every message ``consumer`` has available.

    Waits at most ``timeout_ms`` milliseconds for records, so the caller can
    move on to another consumer when this one is idle.
    """
    batches = consumer.poll(timeout_ms=timeout_ms)
    for records in batches.values():
        for message in records:
            logger.debug(message)


try:
    # Iterating a KafkaConsumer directly blocks until the next message
    # arrives, so two back-to-back ``for`` loops would never reach the
    # second consumer. Poll both in turn instead.
    while True:
        _log_pending(alarm_consumer)
        _log_pending(metric_consumer)
except KafkaError:
    # logger.exception records the traceback along with the message.
    logger.exception("Kafka error while reading response messages")

# Reached only after a KafkaError ends the read loop. subscribe() replaces
# the current subscription and expects a list of topic names.
alarm_consumer.subscribe(['alarm_response'])
metric_consumer.subscribe(['metric_response'])