Adds OSMMON_KAFKA_LOG_LEVEL env var
[osm/MON.git] / osm_mon / core / message_bus / producer.py
index bf0839c..f6feba1 100755 (executable)
@@ -61,103 +61,34 @@ class KafkaProducer(object):
         self.producer = kaf(
             key_serializer=str.encode,
             value_serializer=str.encode,
-            bootstrap_servers=broker, api_version=(0, 10))
+            bootstrap_servers=broker, api_version=(0, 10, 1))
 
     def publish(self, key, value, topic=None):
         """Send the required message on the Kafka message bus."""
         try:
             future = self.producer.send(topic=topic, key=key, value=value)
-            self.producer.flush()
+            future.get(timeout=10)
         except Exception:
             logging.exception("Error publishing to {} topic." .format(topic))
             raise
-        try:
-            record_metadata = future.get(timeout=10)
-            logging.debug("TOPIC:", record_metadata.topic)
-            logging.debug("PARTITION:", record_metadata.partition)
-            logging.debug("OFFSET:", record_metadata.offset)
-        except KafkaError:
-            pass
-
-    def create_alarm_request(self, key, message):
-        """Create alarm request from SO to MON."""
-        # External to MON
-
-        self.publish(key,
-                     value=message,
-                     topic='alarm_request')
-
-    def create_alarm_response(self, key, message):
-        """Response to a create alarm request from MON to SO."""
-        # Internal to MON
-
-        self.publish(key,
-                     value=message,
-                     topic='alarm_response')
 
-    def acknowledge_alarm(self, key, message):
-        """Alarm acknowledgement request from SO to MON."""
-        # Internal to MON
-
-        self.publish(key,
-                     value=message,
-                     topic='alarm_request')
-
-    def list_alarm_request(self, key, message):
-        """List alarms request from SO to MON."""
+    def publish_alarm_request(self, key, message):
+        """Publish an alarm request."""
         # External to MON
 
         self.publish(key,
                      value=message,
                      topic='alarm_request')
 
-    def notify_alarm(self, key, message):
-        """Notify of triggered alarm from MON to SO."""
-
-        self.publish(key,
-                     value=message,
-                     topic='alarm_response')
-
-    def list_alarm_response(self, key, message):
-        """Response for list alarms request from MON to SO."""
-
-        self.publish(key,
-                     value=message,
-                     topic='alarm_response')
-
-    def update_alarm_request(self, key, message):
-        """Update alarm request from SO to MON."""
-        # External to Mon
-
-        self.publish(key,
-                     value=message,
-                     topic='alarm_request')
-
-    def update_alarm_response(self, key, message):
-        """Response from update alarm request from MON to SO."""
-        # Internal to Mon
-
-        self.publish(key,
-                     value=message,
-                     topic='alarm_response')
-
-    def delete_alarm_request(self, key, message):
-        """Delete alarm request from SO to MON."""
-        # External to Mon
-
-        self.publish(key,
-                     value=message,
-                     topic='alarm_request')
-
-    def delete_alarm_response(self, key, message):
-        """Response for a delete alarm request from MON to SO."""
-        # Internal to Mon
+    def publish_alarm_response(self, key, message):
+        """Publish an alarm response."""
+        # Internal to MON
 
         self.publish(key,
                      value=message,
                      topic='alarm_response')
 
-    def create_metrics_request(self, key, message):
+    def publish_metrics_request(self, key, message):
         """Create metrics request from SO to MON."""
         # External to Mon
 
@@ -165,7 +96,7 @@ class KafkaProducer(object):
                      value=message,
                      topic='metric_request')
 
-    def create_metrics_resp(self, key, message):
+    def publish_metrics_response(self, key, message):
         """Response for a create metric request from MON to SO."""
         # Internal to Mon