Renaming folder structure
[osm/MON.git] / osm_mon / core / message_bus / consumer.py
diff --git a/osm_mon/core/message_bus/consumer.py b/osm_mon/core/message_bus/consumer.py
new file mode 100755 (executable)
index 0000000..c9021d2
--- /dev/null
@@ -0,0 +1,95 @@
+# Copyright 2017 Intel Research and Development Ireland Limited
+# *************************************************************
+
+# This file is part of OSM Monitoring module
+# All Rights Reserved to Intel Corporation
+
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+
+#         http://www.apache.org/licenses/LICENSE-2.0
+
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+# For those usages not covered by the Apache License, Version 2.0 please
+# contact: prithiv.mohan@intel.com or adrian.hoban@intel.com
+##
+
+'''
+This is a kafka consumer app that reads the messages from the message bus for
+alarms and metrics responses.
+
+'''
+
+__author__ = "Prithiv Mohan"
+__date__ = "06/Sep/2017"
+
+
+from kafka import KafkaConsumer
+from kafka.errors import KafkaError
+import json
+import logging
+import logging.config
+import os
+
+
+def logging_handler(filename, mode='a+', encoding=None):
+    if not os.path.exists(filename):
+        open(filename, 'a').close()
+    return logging.FileHandler(filename, mode)
+
+log_config = {
+    'version': 1,
+    'formatters': {
+        'default': {
+            'format': '%(asctime)s %(levelname)s %(name)s %(message)s'
+        },
+    },
+    'handlers': {
+        'file': {
+            '()': logging_handler,
+            'level': 'DEBUG',
+            'formatter': 'default',
+            'filename': '/var/log/osm_mon.log',
+            'mode': 'a+',
+            'encoding': 'utf-8',
+        },
+    },
+    'kafka': {
+        'handlers': ['file'],
+        'level': 'DEBUG',
+    },
+    'root': {
+        'handlers': ['file'],
+        'level': 'DEBUG',
+    },
+}
+
+
+logging.config.dictConfig(log_config)
+logger = logging.getLogger('kafka')
+
+if "BROKER_URI" in os.environ:
+    broker = os.getenv("BROKER_URI")
+else:
+    broker = "localhost:9092"
+
+alarm_consumer = KafkaConsumer(
+    'alarm_response', 'osm_mon', bootstrap_servers=broker)
+metric_consumer = KafkaConsumer(
+    'metric_response', 'osm_mon', bootstrap_servers=broker)
+try:
+    for message in alarm_consumer:
+        logger.debug(message)
+    for message in metric_consumer:
+        logger.debug(message)
+except KafkaError:
+    log.exception()
+
+alarm_consumer.subscribe('alarm_response')
+metric_consumer.subscribe('metric_response')