Adds alarm engine
[osm/MON.git] / osm_mon / core / message_bus / common_consumer.py
diff --git a/osm_mon/core/message_bus/common_consumer.py b/osm_mon/core/message_bus/common_consumer.py
deleted file mode 100755 (executable)
index 3a95c76..0000000
+++ /dev/null
@@ -1,205 +0,0 @@
-# Copyright 2017 Intel Research and Development Ireland Limited
-# *************************************************************
-# This file is part of OSM Monitoring module
-# All Rights Reserved to Intel Corporation
-#
-# Licensed under the Apache License, Version 2.0 (the "License"); you
-# may not use this file except in compliance with the License. You may
-# obtain a copy of the License at
-#
-#         http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
-# implied. See the License for the specific language governing
-# permissions and limitations under the License.
-#
-# For those usages not covered by the Apache License, Version 2.0 please
-# contact: helena.mcgough@intel.com or adrian.hoban@intel.com
-"""A common KafkaConsumer for all MON plugins."""
-
-import json
-import logging
-import sys
-import time
-from json import JSONDecodeError
-
-import six
-import yaml
-
-from osm_mon.common.common_db_client import CommonDbClient
-from osm_mon.core.auth import AuthManager
-from osm_mon.core.database import DatabaseManager
-from osm_mon.core.message_bus.consumer import Consumer
-from osm_mon.core.message_bus.producer import Producer
-from osm_mon.core.settings import Config
-from osm_mon.plugins.CloudWatch.access_credentials import AccessCredentials
-from osm_mon.plugins.CloudWatch.connection import Connection
-from osm_mon.plugins.CloudWatch.plugin_alarm import plugin_alarms
-from osm_mon.plugins.CloudWatch.plugin_metric import plugin_metrics
-from osm_mon.plugins.OpenStack.Aodh import alarm_handler
-from osm_mon.plugins.OpenStack.Gnocchi import metric_handler
-from osm_mon.plugins.vRealiseOps import plugin_receiver
-
-cfg = Config.instance()
-
-logging.basicConfig(stream=sys.stdout,
-                    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
-                    datefmt='%m/%d/%Y %I:%M:%S %p',
-                    level=logging.getLevelName(cfg.OSMMON_LOG_LEVEL))
-log = logging.getLogger(__name__)
-
-kafka_logger = logging.getLogger('kafka')
-kafka_logger.setLevel(logging.getLevelName(cfg.OSMMON_KAFKA_LOG_LEVEL))
-
-
-class CommonConsumer:
-
-    def __init__(self):
-        self.auth_manager = AuthManager()
-        self.database_manager = DatabaseManager()
-        self.database_manager.create_tables()
-
-        # Create OpenStack alarming and metric instances
-        self.openstack_metrics = metric_handler.OpenstackMetricHandler()
-        self.openstack_alarms = alarm_handler.OpenstackAlarmHandler()
-
-        # Create CloudWatch alarm and metric instances
-        self.cloudwatch_alarms = plugin_alarms()
-        self.cloudwatch_metrics = plugin_metrics()
-        self.aws_connection = Connection()
-        self.aws_access_credentials = AccessCredentials()
-
-        # Create vROps plugin_receiver class instance
-        self.vrops_rcvr = plugin_receiver.PluginReceiver()
-
-        log.info("Connecting to MongoDB...")
-        self.common_db = CommonDbClient()
-        log.info("Connection successful.")
-
-    def get_vim_type(self, vim_uuid):
-        """Get the vim type that is required by the message."""
-        credentials = self.database_manager.get_credentials(vim_uuid)
-        return credentials.type
-
-    def run(self):
-        common_consumer = Consumer("mon-consumer")
-
-        topics = ['metric_request', 'alarm_request', 'vim_account']
-        common_consumer.subscribe(topics)
-        retries = 1
-        max_retries = 5
-        while True:
-            try:
-                common_consumer.poll()
-                common_consumer.seek_to_end()
-                break
-            except Exception:
-                log.error("Error getting Kafka partitions. Maybe Kafka is not ready yet.")
-                log.error("Retry number %d of %d", retries, max_retries)
-                if retries >= max_retries:
-                    log.error("Achieved max number of retries. Logging exception and exiting...")
-                    log.exception("Exception: ")
-                    return
-                retries = retries + 1
-                time.sleep(2)
-
-        log.info("Listening for messages...")
-        for message in common_consumer:
-            self.consume_message(message)
-
-    def consume_message(self, message):
-        log.info("Message arrived: %s", message)
-        try:
-            try:
-                values = json.loads(message.value)
-            except JSONDecodeError:
-                values = yaml.safe_load(message.value)
-
-            response = None
-
-            if message.topic == "vim_account":
-                if message.key == "create" or message.key == "edit":
-                    values['vim_password'] = self.common_db.decrypt_vim_password(values['vim_password'],
-                                                                                 values['schema_version'],
-                                                                                 values['_id'])
-                    self.auth_manager.store_auth_credentials(values)
-                if message.key == "delete":
-                    self.auth_manager.delete_auth_credentials(values)
-
-            else:
-                # Get ns_id from message
-                # TODO: Standardize all message models to avoid the need of figuring out where are certain fields
-                contains_list = False
-                list_index = None
-                for k, v in six.iteritems(values):
-                    if isinstance(v, dict):
-                        if 'ns_id' in v:
-                            contains_list = True
-                            list_index = k
-                            break
-                if not contains_list and 'ns_id' in values:
-                    ns_id = values['ns_id']
-                else:
-                    ns_id = values[list_index]['ns_id']
-
-                vnf_index = values[list_index]['vnf_member_index'] if contains_list else values['vnf_member_index']
-
-                # Check the vim desired by the message
-                vnfr = self.common_db.get_vnfr(ns_id, vnf_index)
-                vim_uuid = vnfr['vim-account-id']
-
-                if (contains_list and 'vdu_name' in values[list_index]) or 'vdu_name' in values:
-                    vdu_name = values[list_index]['vdu_name'] if contains_list else values['vdu_name']
-                    vdur = self.common_db.get_vdur(ns_id, vnf_index, vdu_name)
-                    if contains_list:
-                        values[list_index]['resource_uuid'] = vdur['vim-id']
-                    else:
-                        values['resource_uuid'] = vdur['vim-id']
-                    message = message._replace(value=json.dumps(values))
-
-                vim_type = self.get_vim_type(vim_uuid)
-
-                if vim_type == "openstack":
-                    log.info("This message is for the OpenStack plugin.")
-                    if message.topic == "metric_request":
-                        response = self.openstack_metrics.handle_request(message.key, values, vim_uuid)
-                    if message.topic == "alarm_request":
-                        response = self.openstack_alarms.handle_message(message.key, values, vim_uuid)
-
-                elif vim_type == "aws":
-                    log.info("This message is for the CloudWatch plugin.")
-                    aws_conn = self.aws_connection.setEnvironment()
-                    if message.topic == "metric_request":
-                        response = self.cloudwatch_metrics.metric_calls(message.key, values, aws_conn)
-                    if message.topic == "alarm_request":
-                        response = self.cloudwatch_alarms.alarm_calls(message.key, values, aws_conn)
-
-                elif vim_type == "vmware":
-                    log.info("This metric_request message is for the vROPs plugin.")
-                    if message.topic == "metric_request":
-                        response = self.vrops_rcvr.handle_metric_requests(message.key, values, vim_uuid)
-                    if message.topic == "alarm_request":
-                        response = self.vrops_rcvr.handle_alarm_requests(message.key, values, vim_uuid)
-
-                else:
-                    log.debug("vim_type is misconfigured or unsupported; %s",
-                              vim_type)
-            if response:
-                self._publish_response(message.topic, message.key, response)
-
-        except Exception:
-            log.exception("Exception processing message: ")
-
-    def _publish_response(self, topic: str, key: str, msg: dict):
-        topic = topic.replace('request', 'response')
-        key = key.replace('request', 'response')
-        producer = Producer()
-        producer.send(topic=topic, key=key, value=json.dumps(msg))
-        producer.flush(timeout=5)
-        producer.close()
-
-
-if __name__ == '__main__':
-    CommonConsumer().run()