MON vROPs plugin updates -
[osm/MON.git] / core / message_bus / producer.py
index 621d63f..f5abce2 100644 (file)
@@ -1,4 +1,4 @@
-# Copyright© 2017 Intel Research and Development Ireland Limited
+# Copyright 2017 Intel Research and Development Ireland Limited
 # *************************************************************
 
 # This file is part of OSM Monitoring module
 # *************************************************************
 
 # This file is part of OSM Monitoring module
 '''
 This is a kafka producer app that interacts with the SO and the plugins of the
 datacenters like OpenStack, VMWare, AWS.
 '''
 This is a kafka producer app that interacts with the SO and the plugins of the
 datacenters like OpenStack, VMWare, AWS.
-#TODO: Interfacing with the APIs of the monitoring tool plugins (Prithiv Mohan).
 '''
 
 __author__ = "Prithiv Mohan"
 __date__   = "06/Sep/2017"
 
 
 '''
 
 __author__ = "Prithiv Mohan"
 __date__   = "06/Sep/2017"
 
 
-from kafka import KafkaProducer
+from kafka import KafkaProducer as kaf
 from kafka.errors import KafkaError
 import logging
 import json
 from kafka.errors import KafkaError
 import logging
 import json
+import jsmin
 import os
 from os import listdir
 import os
 from os import listdir
+from jsmin import jsmin
 
 
+json_path = os.path.join(os.pardir+"/models/")
 
 
 class KafkaProducer(object):
 
 
 
 class KafkaProducer(object):
 
-    def __init__(self, topic, message):
+    def __init__(self, topic):
 
         self._topic= topic
 
         self._topic= topic
-        self._message = message
 
 
-    if "ZOOKEEPER_URI" in os.environ:
-        broker = os.getenv("ZOOKEEPER_URI")
-    else:
-        broker = "localhost:2181"
+        if "BROKER_URI" in os.environ:
+            broker = os.getenv("BROKER_URI")
+        else:
+            broker = "localhost:9092"
 
         '''
 
         '''
-        If the zookeeper broker URI is not set in the env, by default,
+        If the broker URI is not set in the env, by default,
         localhost container is taken as the host because an instance of
         is already running.
         '''
 
         localhost container is taken as the host because an instance of
         is already running.
         '''
 
-        producer = KafkaProducer(key_serializer=str.encode,
+        self.producer = kaf(key_serializer=str.encode,
                    value_serializer=lambda v: json.dumps(v).encode('ascii'),
                    bootstrap_servers=broker, api_version=(0,10))
 
 
                    value_serializer=lambda v: json.dumps(v).encode('ascii'),
                    bootstrap_servers=broker, api_version=(0,10))
 
 
-    def publish(self, key, message, topic=None):
+    def publish(self, key, value, topic=None):
         try:
         try:
-            future = producer.send('alarms', key, payload)
-            producer.flush()
+            future = self.producer.send(key, value, topic)
+            self.producer.flush()
         except Exception:
         except Exception:
-            log.exception("Error publishing to {} topic." .format(topic))
+            logging.exception("Error publishing to {} topic." .format(topic))
             raise
         try:
             record_metadata = future.get(timeout=10)
             raise
         try:
             record_metadata = future.get(timeout=10)
-            self._log.debug("TOPIC:", record_metadata.topic)
-            self._log.debug("PARTITION:", record_metadata.partition)
-            self._log.debug("OFFSET:", record_metadata.offset)
+            logging.debug("TOPIC:", record_metadata.topic)
+            logging.debug("PARTITION:", record_metadata.partition)
+            logging.debug("OFFSET:", record_metadata.offset)
         except KafkaError:
             pass
 
         except KafkaError:
             pass
 
-    json_path = os.path.join(os.pardir+"/models/")
-
     def create_alarm_request(self, key, message, topic):
 
        #External to MON
 
     def create_alarm_request(self, key, message, topic):
 
        #External to MON
 
-        payload_create_alarm = json.loads(open(os.path.join(json_path,
+        payload_create_alarm = jsmin(open(os.path.join(json_path,
                                          'create_alarm.json')).read())
                                          'create_alarm.json')).read())
-        publish(key,
+        self.publish(key,
                 value=json.dumps(payload_create_alarm),
                 topic='alarm_request')
 
                 value=json.dumps(payload_create_alarm),
                 topic='alarm_request')
 
@@ -93,41 +92,51 @@ class KafkaProducer(object):
 
        #Internal to MON
 
 
        #Internal to MON
 
-        payload_create_alarm_resp = json.loads(open(os.path.join(json_path,
+        payload_create_alarm_resp = jsmin(open(os.path.join(json_path,
                                          'create_alarm_resp.json')).read())
 
                                          'create_alarm_resp.json')).read())
 
-        publish(key,
-                value = json.dumps(payload_create_alarm_resp),
+        self.publish(key,
+                value = message,
                 topic = 'alarm_response')
 
                 topic = 'alarm_response')
 
+    def acknowledge_alarm(self, key, message, topic):
+
+       #Internal to MON
+
+        payload_acknowledge_alarm = jsmin(open(os.path.join(json_path,
+                                         'acknowledge_alarm.json')).read())
+
+        self.publish(key,
+                value = json.dumps(payload_acknowledge_alarm),
+                topic = 'alarm_request')
 
     def list_alarm_request(self, key, message, topic):
 
         #External to MON
 
 
     def list_alarm_request(self, key, message, topic):
 
         #External to MON
 
-        payload_alarm_list_req = json.loads(open(os.path.join(json_path,
+        payload_alarm_list_req = jsmin(open(os.path.join(json_path,
                                       'list_alarm_req.json')).read())
 
                                       'list_alarm_req.json')).read())
 
-        publish(key,
+        self.publish(key,
                 value=json.dumps(payload_alarm_list_req),
                 topic='alarm_request')
 
     def notify_alarm(self, key, message, topic):
 
                 value=json.dumps(payload_alarm_list_req),
                 topic='alarm_request')
 
     def notify_alarm(self, key, message, topic):
 
-        payload_notify_alarm = json.loads(open(os.path.join(json_path,
+        payload_notify_alarm = jsmin(open(os.path.join(json_path,
                                           'notify_alarm.json')).read())
 
                                           'notify_alarm.json')).read())
 
-        publish(key,
-                value=json.dumps(payload_notify_alarm),
+        self.publish(key,
+                value=message,
                 topic='alarm_response')
 
     def list_alarm_response(self, key, message, topic):
 
                 topic='alarm_response')
 
     def list_alarm_response(self, key, message, topic):
 
-        payload_list_alarm_resp = json.loads(open(os.path.join(json_path,
+        payload_list_alarm_resp = jsmin(open(os.path.join(json_path,
                                              'list_alarm_resp.json')).read())
 
                                              'list_alarm_resp.json')).read())
 
-        publish(key,
-                value=json.dumps(payload_list_alarm_resp),
+        self.publish(key,
+                value=message,
                 topic='alarm_response')
 
 
                 topic='alarm_response')
 
 
@@ -135,10 +144,10 @@ class KafkaProducer(object):
 
       # External to Mon
 
 
       # External to Mon
 
-        payload_update_alarm_req = json.loads(open(os.path.join(json_path,
+        payload_update_alarm_req = jsmin(open(os.path.join(json_path,
                                         'update_alarm_req.json')).read())
 
                                         'update_alarm_req.json')).read())
 
-        publish(key,
+        self.publish(key,
                 value=json.dumps(payload_update_alarm_req),
                 topic='alarm_request')
 
                 value=json.dumps(payload_update_alarm_req),
                 topic='alarm_request')
 
@@ -147,11 +156,11 @@ class KafkaProducer(object):
 
         # Internal to Mon 
 
 
         # Internal to Mon 
 
-        payload_update_alarm_resp = json.loads(open(os.path.join(json_path,
+        payload_update_alarm_resp = jsmin(open(os.path.join(json_path,
                                         'update_alarm_resp.json')).read())
 
                                         'update_alarm_resp.json')).read())
 
-        publish(key,
-                value=json.dumps(payload_update_alarm_resp),
+        self.publish(key,
+                value=message,
                 topic='alarm_response')
 
 
                 topic='alarm_response')
 
 
@@ -159,10 +168,10 @@ class KafkaProducer(object):
 
       # External to Mon
 
 
       # External to Mon
 
-        payload_delete_alarm_req = json.loads(open(os.path.join(json_path,
+        payload_delete_alarm_req = jsmin(open(os.path.join(json_path,
                                         'delete_alarm_req.json')).read())
 
                                         'delete_alarm_req.json')).read())
 
-        publish(key,
+        self.publish(key,
                 value=json.dumps(payload_delete_alarm_req),
                 topic='alarm_request')
 
                 value=json.dumps(payload_delete_alarm_req),
                 topic='alarm_request')
 
@@ -170,11 +179,11 @@ class KafkaProducer(object):
 
       # Internal to Mon
 
 
       # Internal to Mon
 
-        payload_delete_alarm_resp = json.loads(open(os.path.join(json_path,
+        payload_delete_alarm_resp = jsmin(open(os.path.join(json_path,
                                                'delete_alarm_resp.json')).read())
 
                                                'delete_alarm_resp.json')).read())
 
-        publish(key,
-                value=json.dumps(payload_delete_alarm_resp),
+        self.publish(key,
+                value=message,
                 topic='alarm_response')
 
 
                 topic='alarm_response')
 
 
@@ -183,10 +192,10 @@ class KafkaProducer(object):
 
         # External to Mon
 
 
         # External to Mon
 
-        payload_create_metrics_req = json.loads(open(os.path.join(json_path,
+        payload_create_metrics_req = jsmin(open(os.path.join(json_path,
                                                 'create_metric_req.json')).read())
 
                                                 'create_metric_req.json')).read())
 
-        publish(key,
+        self.publish(key,
                 value=json.dumps(payload_create_metrics_req),
                 topic='metric_request')
 
                 value=json.dumps(payload_create_metrics_req),
                 topic='metric_request')
 
@@ -195,11 +204,11 @@ class KafkaProducer(object):
 
         # Internal to Mon
 
 
         # Internal to Mon
 
-        payload_create_metrics_resp = json.loads(open(os.path.join(json_path,
+        payload_create_metrics_resp = jsmin(open(os.path.join(json_path,
                                                  'create_metric_resp.json')).read())
 
                                                  'create_metric_resp.json')).read())
 
-        publish(key,
-                value=json.dumps(payload_create_metrics_resp),
+        self.publish(key,
+                value=message,
                 topic='metric_response')
 
 
                 topic='metric_response')
 
 
@@ -207,10 +216,10 @@ class KafkaProducer(object):
 
         # External to Mon
 
 
         # External to Mon
 
-        payload_read_metric_data_request = json.loads(open(os.path.join(json_path,
+        payload_read_metric_data_request = jsmin(open(os.path.join(json_path,
                                                       'read_metric_data_req.json')).read())
 
                                                       'read_metric_data_req.json')).read())
 
-        publish(key,
+        self.publish(key,
                 value=json.dumps(payload_read_metric_data_request),
                 topic='metric_request')
 
                 value=json.dumps(payload_read_metric_data_request),
                 topic='metric_request')
 
@@ -219,11 +228,11 @@ class KafkaProducer(object):
 
         # Internal to Mon
 
 
         # Internal to Mon
 
-        payload_metric_data_response = json.loads(open(os.path.join(json_path,
+        payload_metric_data_response = jsmin(open(os.path.join(json_path,
                                                   'read_metric_data_resp.json')).read())
 
                                                   'read_metric_data_resp.json')).read())
 
-        publish(key,
-                value=json.dumps(payload_metric_data_response),
+        self.publish(key,
+                value=message,
                 topic='metric_response')
 
 
                 topic='metric_response')
 
 
@@ -231,10 +240,10 @@ class KafkaProducer(object):
 
         #External to MON
 
 
         #External to MON
 
-        payload_metric_list_req = json.loads(open(os.path.join(json_path,
+        payload_metric_list_req = jsmin(open(os.path.join(json_path,
                                              'list_metric_req.json')).read())
 
                                              'list_metric_req.json')).read())
 
-        publish(key,
+        self.publish(key,
                 value=json.dumps(payload_metric_list_req),
                 topic='metric_request')
 
                 value=json.dumps(payload_metric_list_req),
                 topic='metric_request')
 
@@ -242,11 +251,11 @@ class KafkaProducer(object):
 
       #Internal to MON
 
 
       #Internal to MON
 
-        payload_metric_list_resp = json.loads(open(os.path.join(json_path,
+        payload_metric_list_resp = jsmin(open(os.path.join(json_path,
                                               'list_metrics_resp.json')).read())
 
                                               'list_metrics_resp.json')).read())
 
-        publish(key,
-                value=json.dumps(payload_metric_list_resp),
+        self.publish(key,
+                value=message,
                 topic='metric_response')
 
 
                 topic='metric_response')
 
 
@@ -254,10 +263,10 @@ class KafkaProducer(object):
 
       # External to Mon
 
 
       # External to Mon
 
-        payload_delete_metric_req = json.loads(open(os.path.join(json_path,
+        payload_delete_metric_req = jsmin(open(os.path.join(json_path,
                                                'delete_metric_req.json')).read())
 
                                                'delete_metric_req.json')).read())
 
-        publish(key,
+        self.publish(key,
                 value=json.dumps(payload_delete_metric_req),
                 topic='metric_request')
 
                 value=json.dumps(payload_delete_metric_req),
                 topic='metric_request')
 
@@ -266,11 +275,11 @@ class KafkaProducer(object):
 
       # Internal to Mon
 
 
       # Internal to Mon
 
-        payload_delete_metric_resp = json.loads(open(os.path.join(json_path,
+        payload_delete_metric_resp = jsmin(open(os.path.join(json_path,
                                                 'delete_metric_resp.json')).read())
 
                                                 'delete_metric_resp.json')).read())
 
-        publish(key,
-                value=json.dumps(payload_delete_metric_resp),
+        self.publish(key,
+                value=message,
                 topic='metric_response')
 
 
                 topic='metric_response')
 
 
@@ -278,10 +287,10 @@ class KafkaProducer(object):
 
         # External to Mon
 
 
         # External to Mon
 
-        payload_update_metric_req = json.loads(open(os.path.join(json_path,
+        payload_update_metric_req = jsmin(open(os.path.join(json_path,
                                                'update_metric_req.json')).read())
 
                                                'update_metric_req.json')).read())
 
-        publish(key,
+        self.publish(key,
                 value=json.dumps(payload_update_metric_req),
                 topic='metric_request')
 
                 value=json.dumps(payload_update_metric_req),
                 topic='metric_request')
 
@@ -290,18 +299,18 @@ class KafkaProducer(object):
 
         # Internal to Mon
 
 
         # Internal to Mon
 
-        payload_update_metric_resp = json.loads(open(os.path.join(json_path,
+        payload_update_metric_resp = jsmin(open(os.path.join(json_path,
                                                 'update_metric_resp.json')).read())
 
                                                 'update_metric_resp.json')).read())
 
-        publish(key,
-                value=json.dumps(payload_update_metric_resp),
-                topic='metric_response)
+        self.publish(key,
+                value=message,
+                topic='metric_response')
 
     def access_credentials(self, key, message, topic):
 
 
     def access_credentials(self, key, message, topic):
 
-        payload_access_credentials = json.loads(open(os.path.join(json_path,
+        payload_access_credentials = jsmin(open(os.path.join(json_path,
                                                 'access_credentials.json')).read())
 
                                                 'access_credentials.json')).read())
 
-        publish(key,
+        self.publish(key,
                 value=json.dumps(payload_access_credentials),
                 topic='access_credentials')
                 value=json.dumps(payload_access_credentials),
                 topic='access_credentials')