Refactors codebase
[osm/MON.git] / osm_mon / core / message_bus / common_consumer.py
index fd97f83..b8e33d2 100755 (executable)
@@ -23,21 +23,23 @@ import json
 import logging
 import sys
 import threading
+from json import JSONDecodeError
 
 import six
 import yaml
-from kafka import KafkaConsumer
 
 from osm_mon.common.common_db_client import CommonDbClient
 from osm_mon.core.auth import AuthManager
 from osm_mon.core.database import DatabaseManager
+from osm_mon.core.message_bus.consumer import Consumer
+from osm_mon.core.message_bus.producer import Producer
 from osm_mon.core.settings import Config
 from osm_mon.plugins.CloudWatch.access_credentials import AccessCredentials
 from osm_mon.plugins.CloudWatch.connection import Connection
 from osm_mon.plugins.CloudWatch.plugin_alarm import plugin_alarms
 from osm_mon.plugins.CloudWatch.plugin_metric import plugin_metrics
-from osm_mon.plugins.OpenStack.Aodh import alarming
-from osm_mon.plugins.OpenStack.Gnocchi import metrics
+from osm_mon.plugins.OpenStack.Aodh import alarm_handler
+from osm_mon.plugins.OpenStack.Gnocchi import metric_handler
 from osm_mon.plugins.vRealiseOps import plugin_receiver
 
 cfg = Config.instance()
@@ -59,15 +61,13 @@ kafka_logger.addHandler(kafka_handler)
 class CommonConsumer:
 
     def __init__(self):
-        cfg = Config.instance()
-
         self.auth_manager = AuthManager()
         self.database_manager = DatabaseManager()
         self.database_manager.create_tables()
 
         # Create OpenStack alarming and metric instances
-        self.openstack_metrics = metrics.Metrics()
-        self.openstack_alarms = alarming.Alarming()
+        self.openstack_metrics = metric_handler.OpenstackMetricHandler()
+        self.openstack_alarms = alarm_handler.OpenstackAlarmHandler()
 
         # Create CloudWatch alarm and metric instances
         self.cloudwatch_alarms = plugin_alarms()
@@ -88,12 +88,7 @@ class CommonConsumer:
         return credentials.type
 
     def run(self):
-        common_consumer = KafkaConsumer(bootstrap_servers=cfg.BROKER_URI,
-                                        key_deserializer=bytes.decode,
-                                        value_deserializer=bytes.decode,
-                                        group_id="mon-consumer",
-                                        session_timeout_ms=60000,
-                                        heartbeat_interval_ms=20000)
+        common_consumer = Consumer("mon-consumer")
 
         topics = ['metric_request', 'alarm_request', 'vim_account']
         common_consumer.subscribe(topics)
@@ -108,9 +103,11 @@ class CommonConsumer:
         try:
             try:
                 values = json.loads(message.value)
-            except ValueError:
+            except JSONDecodeError:
                 values = yaml.safe_load(message.value)
 
+            response = None
+
             if message.topic == "vim_account":
                 if message.key == "create" or message.key == "edit":
                     self.auth_manager.store_auth_credentials(values)
@@ -153,31 +150,42 @@ class CommonConsumer:
                 if vim_type == "openstack":
                     log.info("This message is for the OpenStack plugin.")
                     if message.topic == "metric_request":
-                        self.openstack_metrics.metric_calls(message, vim_uuid)
+                        response = self.openstack_metrics.handle_request(message.key, values, vim_uuid)
                     if message.topic == "alarm_request":
-                        self.openstack_alarms.alarming(message, vim_uuid)
+                        response = self.openstack_alarms.handle_message(message.key, values, vim_uuid)
 
                 elif vim_type == "aws":
                     log.info("This message is for the CloudWatch plugin.")
                     aws_conn = self.aws_connection.setEnvironment()
                     if message.topic == "metric_request":
-                        self.cloudwatch_metrics.metric_calls(message, aws_conn)
+                        response = self.cloudwatch_metrics.metric_calls(message.key, values, aws_conn)
                     if message.topic == "alarm_request":
-                        self.cloudwatch_alarms.alarm_calls(message, aws_conn)
-                    if message.topic == "access_credentials":
-                        self.aws_access_credentials.access_credential_calls(message)
+                        response = self.cloudwatch_alarms.alarm_calls(message.key, values, aws_conn)
 
                 elif vim_type == "vmware":
                     log.info("This metric_request message is for the vROPs plugin.")
-                    self.vrops_rcvr.consume(message, vim_uuid)
+                    if message.topic == "metric_request":
+                        response = self.vrops_rcvr.handle_metric_requests(message.key, values, vim_uuid)
+                    if message.topic == "alarm_request":
+                        response = self.vrops_rcvr.handle_alarm_requests(message.key, values, vim_uuid)
 
                 else:
                     log.debug("vim_type is misconfigured or unsupported; %s",
                               vim_type)
+            if response:
+                self._publish_response(message.topic, message.key, response)
 
         except Exception:
             log.exception("Exception processing message: ")
 
+    def _publish_response(self, topic: str, key: str, msg: dict):
+        topic = topic.replace('request', 'response')
+        key = key.replace('request', 'response')
+        producer = Producer()
+        producer.send(topic=topic, key=key, value=json.dumps(msg))
+        producer.flush()
+        producer.close()
+
 
 if __name__ == '__main__':
     CommonConsumer().run()