Refactors codebase
[osm/MON.git] / osm_mon / core / message_bus / consumer.py
old mode 100755 (executable)
new mode 100644 (file)
index c9021d2..b12cd88
@@ -1,95 +1,15 @@
-# Copyright 2017 Intel Research and Development Ireland Limited
-# *************************************************************
-
-# This file is part of OSM Monitoring module
-# All Rights Reserved to Intel Corporation
-
-# Licensed under the Apache License, Version 2.0 (the "License"); you may
-# not use this file except in compliance with the License. You may obtain
-# a copy of the License at
-
-#         http://www.apache.org/licenses/LICENSE-2.0
-
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-# License for the specific language governing permissions and limitations
-# under the License.
-
-# For those usages not covered by the Apache License, Version 2.0 please
-# contact: prithiv.mohan@intel.com or adrian.hoban@intel.com
-##
-
-'''
-This is a kafka consumer app that reads the messages from the message bus for
-alarms and metrics responses.
-
-'''
-
-__author__ = "Prithiv Mohan"
-__date__ = "06/Sep/2017"
-
-
 from kafka import KafkaConsumer
-from kafka.errors import KafkaError
-import json
-import logging
-import logging.config
-import os
-
-
-def logging_handler(filename, mode='a+', encoding=None):
-    if not os.path.exists(filename):
-        open(filename, 'a').close()
-    return logging.FileHandler(filename, mode)
-
-log_config = {
-    'version': 1,
-    'formatters': {
-        'default': {
-            'format': '%(asctime)s %(levelname)s %(name)s %(message)s'
-        },
-    },
-    'handlers': {
-        'file': {
-            '()': logging_handler,
-            'level': 'DEBUG',
-            'formatter': 'default',
-            'filename': '/var/log/osm_mon.log',
-            'mode': 'a+',
-            'encoding': 'utf-8',
-        },
-    },
-    'kafka': {
-        'handlers': ['file'],
-        'level': 'DEBUG',
-    },
-    'root': {
-        'handlers': ['file'],
-        'level': 'DEBUG',
-    },
-}
-
-
-logging.config.dictConfig(log_config)
-logger = logging.getLogger('kafka')
 
-if "BROKER_URI" in os.environ:
-    broker = os.getenv("BROKER_URI")
-else:
-    broker = "localhost:9092"
+from osm_mon.core.settings import Config
 
-alarm_consumer = KafkaConsumer(
-    'alarm_response', 'osm_mon', bootstrap_servers=broker)
-metric_consumer = KafkaConsumer(
-    'metric_response', 'osm_mon', bootstrap_servers=broker)
-try:
-    for message in alarm_consumer:
-        logger.debug(message)
-    for message in metric_consumer:
-        logger.debug(message)
-except KafkaError:
-    log.exception()
 
-alarm_consumer.subscribe('alarm_response')
-metric_consumer.subscribe('metric_response')
+# noinspection PyAbstractClass
+class Consumer(KafkaConsumer):
+    def __init__(self, group_id):
+        cfg = Config.instance()
+        super().__init__(bootstrap_servers=cfg.BROKER_URI,
+                         key_deserializer=bytes.decode,
+                         value_deserializer=bytes.decode,
+                         group_id=group_id,
+                         session_timeout_ms=60000,
+                         heartbeat_interval_ms=20000)