X-Git-Url: https://osm.etsi.org/gitweb/?a=blobdiff_plain;f=osm_mon%2Fcore%2Fmessage_bus%2Fconsumer.py;h=b12cd8881f00967eaeff197bf625678f1c8877bd;hb=93699898c51364cde193d8d441f4aed45670e7bf;hp=c9021d217c2975784918ccd154e6385679a2c7e7;hpb=e80db311a29dc8562dc84ae3336af167bac2ec5b;p=osm%2FMON.git diff --git a/osm_mon/core/message_bus/consumer.py b/osm_mon/core/message_bus/consumer.py old mode 100755 new mode 100644 index c9021d2..b12cd88 --- a/osm_mon/core/message_bus/consumer.py +++ b/osm_mon/core/message_bus/consumer.py @@ -1,95 +1,15 @@ -# Copyright 2017 Intel Research and Development Ireland Limited -# ************************************************************* - -# This file is part of OSM Monitoring module -# All Rights Reserved to Intel Corporation - -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at - -# http://www.apache.org/licenses/LICENSE-2.0 - -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. - -# For those usages not covered by the Apache License, Version 2.0 please -# contact: prithiv.mohan@intel.com or adrian.hoban@intel.com -## - -''' -This is a kafka consumer app that reads the messages from the message bus for -alarms and metrics responses. - -''' - -__author__ = "Prithiv Mohan" -__date__ = "06/Sep/2017" - - from kafka import KafkaConsumer -from kafka.errors import KafkaError -import json -import logging -import logging.config -import os - - -def logging_handler(filename, mode='a+', encoding=None): - if not os.path.exists(filename): - open(filename, 'a').close() - return logging.FileHandler(filename, mode) - -log_config = { - 'version': 1, - 'formatters': { - 'default': { - 'format': '%(asctime)s %(levelname)s %(name)s %(message)s' - }, - }, - 'handlers': { - 'file': { - '()': logging_handler, - 'level': 'DEBUG', - 'formatter': 'default', - 'filename': '/var/log/osm_mon.log', - 'mode': 'a+', - 'encoding': 'utf-8', - }, - }, - 'kafka': { - 'handlers': ['file'], - 'level': 'DEBUG', - }, - 'root': { - 'handlers': ['file'], - 'level': 'DEBUG', - }, -} - - -logging.config.dictConfig(log_config) -logger = logging.getLogger('kafka') -if "BROKER_URI" in os.environ: - broker = os.getenv("BROKER_URI") -else: - broker = "localhost:9092" +from osm_mon.core.settings import Config -alarm_consumer = KafkaConsumer( - 'alarm_response', 'osm_mon', bootstrap_servers=broker) -metric_consumer = KafkaConsumer( - 'metric_response', 'osm_mon', bootstrap_servers=broker) -try: - for message in alarm_consumer: - logger.debug(message) - for message in metric_consumer: - logger.debug(message) -except KafkaError: - log.exception() -alarm_consumer.subscribe('alarm_response') -metric_consumer.subscribe('metric_response') +# noinspection PyAbstractClass +class Consumer(KafkaConsumer): + def __init__(self, group_id): + cfg = Config.instance() + super().__init__(bootstrap_servers=cfg.BROKER_URI, + key_deserializer=bytes.decode, + value_deserializer=bytes.decode, + group_id=group_id, + session_timeout_ms=60000, + heartbeat_interval_ms=20000)