Updates to the common producer 00/5800/2
authorHelena McGough <helena.mcgough@intel.com>
Wed, 20 Dec 2017 10:49:21 +0000 (10:49 +0000)
committerh.mcgough <helena.mcgough@intel.com>
Wed, 20 Dec 2017 15:45:00 +0000 (17:45 +0200)
 - Fixes request messages
 - Included docstrings

Change-Id: Ibde323df9bb1b30f3f718c9b399394438356d1e0
Signed-off-by: Helena McGough <helena.mcgough@intel.com>
osm_mon/core/message_bus/producer.py

index 85b608a..aad8b62 100755 (executable)
@@ -3,7 +3,6 @@
 
 # This file is part of OSM Monitoring module
 # All Rights Reserved to Intel Corporation
-
 # Licensed under the Apache License, Version 2.0 (the "License"); you may
 # not use this file except in compliance with the License. You may obtain
 # a copy of the License at
 # For those usages not covered by the Apache License, Version 2.0 please
 # contact: prithiv.mohan@intel.com or adrian.hoban@intel.com
 ##
-'''
-This is a kafka producer app that interacts with the SO and the plugins of the
-datacenters like OpenStack, VMWare, AWS.
-'''
+"""This is a common kafka producer app.
+
+It interacts with the SO and the plugins of the datacenters: OpenStack, VMWare
+and AWS.
+"""
 
-from kafka import KafkaProducer as kaf
-from kafka.errors import KafkaError
 import logging
-import json
-import jsmin
+
 import os
-from os import listdir
-from jsmin import jsmin
+
+import jsmin
+
+from kafka import KafkaProducer as kaf
+
+from kafka.errors import KafkaError
 
 __author__ = "Prithiv Mohan"
 __date__ = "06/Sep/2017"
 
-json_path=os.path.abspath(os.pardir+"/MON/osm_mon/core/models/")
+json_path = os.path.abspath(os.pardir + "/MON/osm_mon/core/models/")
+
+# TODO(): validate all of the request and response messages against the
+# json_schemas
+
 
 class KafkaProducer(object):
+    """A common KafkaProducer for requests and responses."""
 
     def __init__(self, topic):
-
+        """Initialize the common kafka producer."""
         self._topic = topic
 
         if "BROKER_URI" in os.environ:
@@ -61,6 +67,7 @@ class KafkaProducer(object):
             bootstrap_servers=broker, api_version=(0, 10))
 
     def publish(self, key, value, topic=None):
+        """Send the required message on the Kafka message bus."""
         try:
             future = self.producer.send(topic=topic, key=key, value=value)
             self.producer.flush()
@@ -76,17 +83,17 @@ class KafkaProducer(object):
             pass
 
     def create_alarm_request(self, key, message, topic):
-
+        """Create alarm request from SO to MON."""
         # External to MON
 
         payload_create_alarm = jsmin(
             open(os.path.join(json_path, 'create_alarm.json')).read())
         self.publish(key,
-                     value=json.dumps(payload_create_alarm),
+                     value=message,
                      topic='alarm_request')
 
     def create_alarm_response(self, key, message, topic):
-
+        """Response to a create alarm request from MON to SO."""
         # Internal to MON
 
         payload_create_alarm_resp = jsmin(
@@ -97,29 +104,29 @@ class KafkaProducer(object):
                      topic='alarm_response')
 
     def acknowledge_alarm(self, key, message, topic):
-
+        """Alarm acknowledgement request from SO to MON."""
         # Internal to MON
 
         payload_acknowledge_alarm = jsmin(
             open(os.path.join(json_path, 'acknowledge_alarm.json')).read())
 
         self.publish(key,
-                     value=json.dumps(payload_acknowledge_alarm),
+                     value=message,
                      topic='alarm_request')
 
     def list_alarm_request(self, key, message, topic):
-
+        """List alarms request from SO to MON."""
         # External to MON
 
         payload_alarm_list_req = jsmin(
             open(os.path.join(json_path, 'list_alarm_req.json')).read())
 
         self.publish(key,
-                     value=json.dumps(payload_alarm_list_req),
+                     value=message,
                      topic='alarm_request')
 
     def notify_alarm(self, key, message, topic):
-
+        """Notify of triggered alarm from MON to SO."""
         payload_notify_alarm = jsmin(
             open(os.path.join(json_path, 'notify_alarm.json')).read())
 
@@ -128,7 +135,7 @@ class KafkaProducer(object):
                      topic='alarm_response')
 
     def list_alarm_response(self, key, message, topic):
-
+        """Response for list alarms request from MON to SO."""
         payload_list_alarm_resp = jsmin(
             open(os.path.join(json_path, 'list_alarm_resp.json')).read())
 
@@ -137,18 +144,18 @@ class KafkaProducer(object):
                      topic='alarm_response')
 
     def update_alarm_request(self, key, message, topic):
-
+        """Update alarm request from SO to MON."""
         # External to Mon
 
         payload_update_alarm_req = jsmin(
             open(os.path.join(json_path, 'update_alarm_req.json')).read())
 
         self.publish(key,
-                     value=json.dumps(payload_update_alarm_req),
+                     value=message,
                      topic='alarm_request')
 
     def update_alarm_response(self, key, message, topic):
-
+        """Response from update alarm request from MON to SO."""
         # Internal to Mon
 
         payload_update_alarm_resp = jsmin(
@@ -159,18 +166,18 @@ class KafkaProducer(object):
                      topic='alarm_response')
 
     def delete_alarm_request(self, key, message, topic):
-
+        """Delete alarm request from SO to MON."""
         # External to Mon
 
         payload_delete_alarm_req = jsmin(
             open(os.path.join(json_path, 'delete_alarm_req.json')).read())
 
         self.publish(key,
-                     value=json.dumps(payload_delete_alarm_req),
+                     value=message,
                      topic='alarm_request')
 
     def delete_alarm_response(self, key, message, topic):
-
+        """Response for a delete alarm request from MON to SO."""
         # Internal to Mon
 
         payload_delete_alarm_resp = jsmin(
@@ -181,18 +188,18 @@ class KafkaProducer(object):
                      topic='alarm_response')
 
     def create_metrics_request(self, key, message, topic):
-
+        """Create metrics request from SO to MON."""
         # External to Mon
 
         payload_create_metrics_req = jsmin(
             open(os.path.join(json_path, 'create_metric_req.json')).read())
 
         self.publish(key,
-                     value=json.dumps(payload_create_metrics_req),
+                     value=message,
                      topic='metric_request')
 
     def create_metrics_resp(self, key, message, topic):
-
+        """Response for a create metric request from MON to SO."""
         # Internal to Mon
 
         payload_create_metrics_resp = jsmin(
@@ -203,18 +210,18 @@ class KafkaProducer(object):
                      topic='metric_response')
 
     def read_metric_data_request(self, key, message, topic):
-
+        """Read metric data request from SO to MON."""
         # External to Mon
 
         payload_read_metric_data_request = jsmin(
             open(os.path.join(json_path, 'read_metric_data_req.json')).read())
 
         self.publish(key,
-                     value=json.dumps(payload_read_metric_data_request),
+                     value=message,
                      topic='metric_request')
 
     def read_metric_data_response(self, key, message, topic):
-
+        """Response from MON to SO for read metric data request."""
         # Internal to Mon
 
         payload_metric_data_response = jsmin(
@@ -225,18 +232,18 @@ class KafkaProducer(object):
                      topic='metric_response')
 
     def list_metric_request(self, key, message, topic):
-
+        """List metric request from SO to MON."""
         # External to MON
 
         payload_metric_list_req = jsmin(
             open(os.path.join(json_path, 'list_metric_req.json')).read())
 
         self.publish(key,
-                     value=json.dumps(payload_metric_list_req),
+                     value=message,
                      topic='metric_request')
 
     def list_metric_response(self, key, message, topic):
-
+        """Response from SO to MON for list metrics request."""
         # Internal to MON
 
         payload_metric_list_resp = jsmin(
@@ -247,18 +254,18 @@ class KafkaProducer(object):
                      topic='metric_response')
 
     def delete_metric_request(self, key, message, topic):
-
+        """Delete metric request from SO to MON."""
         # External to Mon
 
         payload_delete_metric_req = jsmin(
             open(os.path.join(json_path, 'delete_metric_req.json')).read())
 
         self.publish(key,
-                     value=json.dumps(payload_delete_metric_req),
+                     value=message,
                      topic='metric_request')
 
     def delete_metric_response(self, key, message, topic):
-
+        """Response from MON to SO for delete metric request."""
         # Internal to Mon
 
         payload_delete_metric_resp = jsmin(
@@ -269,18 +276,18 @@ class KafkaProducer(object):
                      topic='metric_response')
 
     def update_metric_request(self, key, message, topic):
-
+        """Metric update request from SO to MON."""
         # External to Mon
 
         payload_update_metric_req = jsmin(
             open(os.path.join(json_path, 'update_metric_req.json')).read())
 
         self.publish(key,
-                     value=json.dumps(payload_update_metric_req),
+                     value=message,
                      topic='metric_request')
 
     def update_metric_response(self, key, message, topic):
-
+        """Reponse from MON to SO for metric update."""
         # Internal to Mon
 
         payload_update_metric_resp = jsmin(
@@ -291,10 +298,10 @@ class KafkaProducer(object):
                      topic='metric_response')
 
     def access_credentials(self, key, message, topic):
-
+        """Send access credentials to MON from SO."""
         payload_access_credentials = jsmin(
             open(os.path.join(json_path, 'access_credentials.json')).read())
 
         self.publish(key,
-                     value=json.dumps(payload_access_credentials),
+                     value=message,
                      topic='access_credentials')