c9021d217c2975784918ccd154e6385679a2c7e7
1 # Copyright 2017 Intel Research and Development Ireland Limited
2 # *************************************************************
4 # This file is part of OSM Monitoring module
5 # All Rights Reserved to Intel Corporation
7 # Licensed under the Apache License, Version 2.0 (the "License"); you may
8 # not use this file except in compliance with the License. You may obtain
9 # a copy of the License at
11 # http://www.apache.org/licenses/LICENSE-2.0
13 # Unless required by applicable law or agreed to in writing, software
14 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
15 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
16 # License for the specific language governing permissions and limitations
19 # For those usages not covered by the Apache License, Version 2.0 please
20 # contact: prithiv.mohan@intel.com or adrian.hoban@intel.com
24 This is a kafka consumer app that reads the messages from the message bus for
25 alarms and metrics responses.
29 __author__
= "Prithiv Mohan"
30 __date__
= "06/Sep/2017"
33 from kafka
import KafkaConsumer
34 from kafka
.errors
import KafkaError
def logging_handler(filename, mode='a+', encoding=None):
    """Create the file handler that writes log records to ``filename``.

    The function is referenced as the ``'()'`` factory of a handler entry
    in the logging configuration, so its keyword names must stay stable.

    Args:
        filename: Path of the log file.
        mode: Mode string handed to ``logging.FileHandler``.
        encoding: Text encoding for the log file. ``None`` keeps the
            ``logging.FileHandler`` default.

    Returns:
        A ``logging.FileHandler`` bound to ``filename``.
    """
    # Make sure the file exists before the handler opens it; the ``with``
    # block guarantees the temporary descriptor is closed.
    if not os.path.exists(filename):
        with open(filename, 'a'):
            pass
    # Forward ``encoding`` as well: it used to be accepted and then dropped,
    # so a configured encoding never reached the handler.
    return logging.FileHandler(filename, mode, encoding=encoding)
50 'format': '%(asctime)s %(levelname)s %(name)s %(message)s'
55 '()': logging_handler
,
57 'formatter': 'default',
58 'filename': '/var/log/osm_mon.log',
# Install the logging configuration declared in ``log_config`` and fetch
# the logger named 'kafka' for use by the rest of this script.
logging.config.dictConfig(log_config)
logger = logging.getLogger('kafka')
# Kafka bootstrap address: taken from the BROKER_URI environment variable
# when it is set, otherwise the local default. A single lookup with a
# default replaces the membership test followed by a second lookup.
broker = os.getenv("BROKER_URI", "localhost:9092")
# One consumer per response stream, both connected to the same broker.
# NOTE(review): 'osm_mon' is passed positionally; kafka-python treats
# positional arguments as topic names, so it is subscribed as a second
# topic rather than used as a consumer group id -- confirm this is intended.
alarm_consumer = KafkaConsumer(
    'alarm_response',
    'osm_mon',
    bootstrap_servers=broker,
)
metric_consumer = KafkaConsumer(
    'metric_response',
    'osm_mon',
    bootstrap_servers=broker,
)
87 for message
in alarm_consumer
:
89 for message
in metric_consumer
:
# Subscribe each consumer to its own response topic.
alarm_consumer.subscribe('alarm_response')
metric_consumer.subscribe('metric_response')