Merge "[MON] Fixes notifier bug during alarm triggering" v4.0 BUILD_v4.0.1_1 BUILD_v4.0.1_2
authorlavado <glavado@whitestack.com>
Fri, 6 Jul 2018 19:14:23 +0000 (21:14 +0200)
committerGerrit Code Review <root@osm.etsi.org>
Fri, 6 Jul 2018 19:14:23 +0000 (21:14 +0200)
13 files changed:
osm_mon/core/message_bus/common_consumer.py
osm_mon/core/message_bus/producer.py
osm_mon/plugins/CloudWatch/plugin_alarm.py
osm_mon/plugins/CloudWatch/plugin_metric.py
osm_mon/plugins/OpenStack/Aodh/alarming.py
osm_mon/plugins/OpenStack/Aodh/notifier.py
osm_mon/plugins/OpenStack/Gnocchi/metrics.py
osm_mon/test/OpenStack/integration/test_alarm_integration.py
osm_mon/test/OpenStack/integration/test_metric_integration.py
osm_mon/test/OpenStack/integration/test_notify_alarm.py
osm_mon/test/OpenStack/unit/test_notifier.py
osm_mon/test/core/test_common_consumer.py
policy_module/osm_policy_module/core/agent.py

index dc5816e..60ad313 100755 (executable)
 import json
 import logging
 import sys
+import threading
 
 import six
 import yaml
-
 from kafka import KafkaConsumer
+from osm_common import dbmongo
 
+from osm_mon.core.auth import AuthManager
+from osm_mon.core.database import DatabaseManager
 from osm_mon.core.settings import Config
-from osm_mon.plugins.OpenStack.Aodh import alarming
-from osm_mon.plugins.OpenStack.Gnocchi import metrics
-
+from osm_mon.plugins.CloudWatch.access_credentials import AccessCredentials
+from osm_mon.plugins.CloudWatch.connection import Connection
 from osm_mon.plugins.CloudWatch.plugin_alarm import plugin_alarms
 from osm_mon.plugins.CloudWatch.plugin_metric import plugin_metrics
-from osm_mon.plugins.CloudWatch.connection import Connection
-from osm_mon.plugins.CloudWatch.access_credentials import AccessCredentials
-
+from osm_mon.plugins.OpenStack.Aodh import alarming
+from osm_mon.plugins.OpenStack.Gnocchi import metrics
 from osm_mon.plugins.vRealiseOps import plugin_receiver
 
-from osm_mon.core.auth import AuthManager
-from osm_mon.core.database import DatabaseManager
-
-from osm_common import dbmongo
-
 logging.basicConfig(stream=sys.stdout,
                     format='%(asctime)s %(message)s',
                     datefmt='%m/%d/%Y %I:%M:%S %p',
@@ -51,63 +47,70 @@ logging.basicConfig(stream=sys.stdout,
 log = logging.getLogger(__name__)
 
 
-def get_vim_type(db_manager, vim_uuid):
-    """Get the vim type that is required by the message."""
-    credentials = db_manager.get_credentials(vim_uuid)
-    return credentials.type
-
-
-def get_vdur(common_db, nsr_id, member_index, vdu_name):
-    vnfr = get_vnfr(common_db, nsr_id, member_index)
-    for vdur in vnfr['vdur']:
-        if vdur['vdu-id-ref'] == vdu_name:
-            return vdur
-    raise ValueError('vdur not found for nsr-id %s, member_index %s and vdu_name %s', nsr_id, member_index, vdu_name)
-
-
-def get_vnfr(common_db, nsr_id, member_index):
-    vnfr = common_db.get_one(table="vnfrs", filter={"nsr-id-ref": nsr_id, "member-vnf-index-ref": str(member_index)})
-    return vnfr
-
-
-def main():
-    cfg = Config.instance()
-    cfg.read_environ()
-
-    auth_manager = AuthManager()
-    database_manager = DatabaseManager()
-    database_manager.create_tables()
-
-    # Create OpenStack alarming and metric instances
-    openstack_metrics = metrics.Metrics()
-    openstack_alarms = alarming.Alarming()
-
-    # Create CloudWatch alarm and metric instances
-    cloudwatch_alarms = plugin_alarms()
-    cloudwatch_metrics = plugin_metrics()
-    aws_connection = Connection()
-    aws_access_credentials = AccessCredentials()
-
-    # Create vROps plugin_receiver class instance
-    vrops_rcvr = plugin_receiver.PluginReceiver()
-
-    common_db = dbmongo.DbMongo()
-    common_db_uri = cfg.MONGO_URI.split(':')
-    common_db.db_connect({'host': common_db_uri[0], 'port': int(common_db_uri[1]), 'name': 'osm'})
-
-    # Initialize consumers for alarms and metrics
-    common_consumer = KafkaConsumer(bootstrap_servers=cfg.BROKER_URI,
-                                    key_deserializer=bytes.decode,
-                                    value_deserializer=bytes.decode,
-                                    group_id="mon-consumer")
-
-    # Define subscribe the consumer for the plugins
-    topics = ['metric_request', 'alarm_request', 'access_credentials', 'vim_account']
-    # TODO: Remove access_credentials
-    common_consumer.subscribe(topics)
-
-    log.info("Listening for alarm_request and metric_request messages")
-    for message in common_consumer:
+class CommonConsumer:
+
+    def __init__(self):
+        cfg = Config.instance()
+
+        self.auth_manager = AuthManager()
+        self.database_manager = DatabaseManager()
+        self.database_manager.create_tables()
+
+        # Create OpenStack alarming and metric instances
+        self.openstack_metrics = metrics.Metrics()
+        self.openstack_alarms = alarming.Alarming()
+
+        # Create CloudWatch alarm and metric instances
+        self.cloudwatch_alarms = plugin_alarms()
+        self.cloudwatch_metrics = plugin_metrics()
+        self.aws_connection = Connection()
+        self.aws_access_credentials = AccessCredentials()
+
+        # Create vROps plugin_receiver class instance
+        self.vrops_rcvr = plugin_receiver.PluginReceiver()
+
+        log.info("Connecting to MongoDB...")
+        self.common_db = dbmongo.DbMongo()
+        common_db_uri = cfg.MONGO_URI.split(':')
+        self.common_db.db_connect({'host': common_db_uri[0], 'port': int(common_db_uri[1]), 'name': 'osm'})
+        log.info("Connection successful.")
+
+        # Initialize consumers for alarms and metrics
+        self.common_consumer = KafkaConsumer(bootstrap_servers=cfg.BROKER_URI,
+                                             key_deserializer=bytes.decode,
+                                             value_deserializer=bytes.decode,
+                                             group_id="mon-consumer")
+
+        # Define subscribe the consumer for the plugins
+        topics = ['metric_request', 'alarm_request', 'access_credentials', 'vim_account']
+        # TODO: Remove access_credentials
+        self.common_consumer.subscribe(topics)
+
+    def get_vim_type(self, vim_uuid):
+        """Get the vim type that is required by the message."""
+        credentials = self.database_manager.get_credentials(vim_uuid)
+        return credentials.type
+
+    def get_vdur(self, nsr_id, member_index, vdu_name):
+        vnfr = self.get_vnfr(nsr_id, member_index)
+        for vdur in vnfr['vdur']:
+            if vdur['vdu-id-ref'] == vdu_name:
+                return vdur
+        raise ValueError('vdur not found for nsr-id %s, member_index %s and vdu_name %s', nsr_id, member_index,
+                         vdu_name)
+
+    def get_vnfr(self, nsr_id, member_index):
+        vnfr = self.common_db.get_one(table="vnfrs",
+                                      filter={"nsr-id-ref": nsr_id, "member-vnf-index-ref": str(member_index)})
+        return vnfr
+
+    def run(self):
+        log.info("Listening for messages...")
+        for message in self.common_consumer:
+            t = threading.Thread(target=self.consume_message, args=(message,))
+            t.start()
+
+    def consume_message(self, message):
         log.info("Message arrived: %s", message)
         try:
             try:
@@ -117,9 +120,9 @@ def main():
 
             if message.topic == "vim_account":
                 if message.key == "create" or message.key == "edit":
-                    auth_manager.store_auth_credentials(values)
+                    self.auth_manager.store_auth_credentials(values)
                 if message.key == "delete":
-                    auth_manager.delete_auth_credentials(values)
+                    self.auth_manager.delete_auth_credentials(values)
 
             else:
                 # Get ns_id from message
@@ -140,39 +143,40 @@ def main():
                 vnf_index = values[list_index]['vnf_member_index'] if contains_list else values['vnf_member_index']
 
                 # Check the vim desired by the message
-                vnfr = get_vnfr(common_db, ns_id, vnf_index)
+                vnfr = self.get_vnfr(ns_id, vnf_index)
                 vim_uuid = vnfr['vim-account-id']
-                vim_type = get_vim_type(database_manager, vim_uuid)
 
                 if (contains_list and 'vdu_name' in values[list_index]) or 'vdu_name' in values:
                     vdu_name = values[list_index]['vdu_name'] if contains_list else values['vdu_name']
-                    vdur = get_vdur(common_db, ns_id, vnf_index, vdu_name)
+                    vdur = self.get_vdur(ns_id, vnf_index, vdu_name)
                     if contains_list:
                         values[list_index]['resource_uuid'] = vdur['vim-id']
                     else:
                         values['resource_uuid'] = vdur['vim-id']
                     message = message._replace(value=json.dumps(values))
 
+                vim_type = self.get_vim_type(vim_uuid)
+
                 if vim_type == "openstack":
                     log.info("This message is for the OpenStack plugin.")
                     if message.topic == "metric_request":
-                        openstack_metrics.metric_calls(message, vim_uuid)
+                        self.openstack_metrics.metric_calls(message, vim_uuid)
                     if message.topic == "alarm_request":
-                        openstack_alarms.alarming(message, vim_uuid)
+                        self.openstack_alarms.alarming(message, vim_uuid)
 
                 elif vim_type == "aws":
                     log.info("This message is for the CloudWatch plugin.")
-                    aws_conn = aws_connection.setEnvironment()
+                    aws_conn = self.aws_connection.setEnvironment()
                     if message.topic == "metric_request":
-                        cloudwatch_metrics.metric_calls(message, aws_conn)
+                        self.cloudwatch_metrics.metric_calls(message, aws_conn)
                     if message.topic == "alarm_request":
-                        cloudwatch_alarms.alarm_calls(message, aws_conn)
+                        self.cloudwatch_alarms.alarm_calls(message, aws_conn)
                     if message.topic == "access_credentials":
-                        aws_access_credentials.access_credential_calls(message)
+                        self.aws_access_credentials.access_credential_calls(message)
 
                 elif vim_type == "vmware":
                     log.info("This metric_request message is for the vROPs plugin.")
-                    vrops_rcvr.consume(message)
+                    self.vrops_rcvr.consume(message)
 
                 else:
                     log.debug("vim_type is misconfigured or unsupported; %s",
@@ -183,4 +187,4 @@ def main():
 
 
 if __name__ == '__main__':
-    main()
+    CommonConsumer().run()
index bf0839c..f04ecf8 100755 (executable)
@@ -67,97 +67,34 @@ class KafkaProducer(object):
         """Send the required message on the Kafka message bus."""
         try:
             future = self.producer.send(topic=topic, key=key, value=value)
-            self.producer.flush()
+            record_metadata = future.get(timeout=10)
         except Exception:
             logging.exception("Error publishing to {} topic." .format(topic))
             raise
         try:
-            record_metadata = future.get(timeout=10)
             logging.debug("TOPIC:", record_metadata.topic)
             logging.debug("PARTITION:", record_metadata.partition)
             logging.debug("OFFSET:", record_metadata.offset)
         except KafkaError:
             pass
 
-    def create_alarm_request(self, key, message):
-        """Create alarm request from SO to MON."""
+    def publish_alarm_request(self, key, message):
+        """Publish an alarm request."""
         # External to MON
 
         self.publish(key,
                      value=message,
                      topic='alarm_request')
 
-    def create_alarm_response(self, key, message):
-        """Response to a create alarm request from MON to SO."""
-        # Internal to MON
-
-        self.publish(key,
-                     value=message,
-                     topic='alarm_response')
-
-    def acknowledge_alarm(self, key, message):
-        """Alarm acknowledgement request from SO to MON."""
+    def publish_alarm_response(self, key, message):
+        """Publish an alarm response."""
         # Internal to MON
 
-        self.publish(key,
-                     value=message,
-                     topic='alarm_request')
-
-    def list_alarm_request(self, key, message):
-        """List alarms request from SO to MON."""
-        # External to MON
-
-        self.publish(key,
-                     value=message,
-                     topic='alarm_request')
-
-    def notify_alarm(self, key, message):
-        """Notify of triggered alarm from MON to SO."""
-
-        self.publish(key,
-                     value=message,
-                     topic='alarm_response')
-
-    def list_alarm_response(self, key, message):
-        """Response for list alarms request from MON to SO."""
-
-        self.publish(key,
-                     value=message,
-                     topic='alarm_response')
-
-    def update_alarm_request(self, key, message):
-        """Update alarm request from SO to MON."""
-        # External to Mon
-
-        self.publish(key,
-                     value=message,
-                     topic='alarm_request')
-
-    def update_alarm_response(self, key, message):
-        """Response from update alarm request from MON to SO."""
-        # Internal to Mon
-
-        self.publish(key,
-                     value=message,
-                     topic='alarm_response')
-
-    def delete_alarm_request(self, key, message):
-        """Delete alarm request from SO to MON."""
-        # External to Mon
-
-        self.publish(key,
-                     value=message,
-                     topic='alarm_request')
-
-    def delete_alarm_response(self, key, message):
-        """Response for a delete alarm request from MON to SO."""
-        # Internal to Mon
-
         self.publish(key,
                      value=message,
                      topic='alarm_response')
 
-    def create_metrics_request(self, key, message):
+    def publish_metrics_request(self, key, message):
         """Create metrics request from SO to MON."""
         # External to Mon
 
@@ -165,7 +102,7 @@ class KafkaProducer(object):
                      value=message,
                      topic='metric_request')
 
-    def create_metrics_resp(self, key, message):
+    def publish_metrics_response(self, key, message):
         """Response for a create metric request from MON to SO."""
         # Internal to Mon
 
index dea2b06..40e7fe5 100644 (file)
@@ -87,12 +87,12 @@ class plugin_alarms():
                         log.debug("Alarm Already exists")
                         payload = json.dumps(config_resp)                                   
                         file = open('../../core/models/create_alarm_resp.json','wb').write((payload))
-                        self.producer.create_alarm_response(key='create_alarm_response',message=payload)
+                        self.producer.publish_alarm_response(key='create_alarm_response',message=payload)
              
                     else: 
                         payload = json.dumps(config_resp)                                
                         file = open('../../core/models/create_alarm_resp.json','wb').write((payload))                           
-                        self.producer.create_alarm_response(key='create_alarm_response',message=payload)
+                        self.producer.publish_alarm_response(key='create_alarm_response',message=payload)
                         log.info("New alarm created with alarm info: %s", config_resp)                           
              
                 else:
index dc687db..36b89e3 100644 (file)
@@ -91,7 +91,7 @@ class plugin_metrics():
                             metric_response['metric_create_response'] = metric_resp
                             payload = json.dumps(metric_response)                                                                  
                             file = open('../../core/models/create_metric_resp.json','wb').write((payload))
-                            self.producer.create_metrics_resp(key='create_metric_response',message=payload,topic = 'metric_response')
+                            self.producer.publish_metrics_response(key='create_metric_response', message=payload, topic ='metric_response')
                             
                             log.info("Metric configured: %s", metric_resp)
                             return metric_response
index 44b8fdd..f380b4e 100644 (file)
@@ -177,7 +177,7 @@ class Alarming(object):
                 log.exception("Error updating alarm")
                 raise e
             finally:
-                self._generate_and_send_response('create_alarm_response',
+                self._generate_and_send_response('update_alarm_response',
                                                  alarm_details['correlation_id'],
                                                  status=status,
                                                  alarm_id=alarm_id)
@@ -387,13 +387,13 @@ class Alarming(object):
             log.exception("Desired Gnocchi metric not found:", e)
             raise e
 
-    def _generate_and_send_response(self, topic, correlation_id, **kwargs):
+    def _generate_and_send_response(self, key, correlation_id, **kwargs):
         try:
             resp_message = self._response.generate_response(
-                topic, cor_id=correlation_id, **kwargs)
+                key, cor_id=correlation_id, **kwargs)
             log.info("Response Message: %s", resp_message)
-            self._producer.create_alarm_response(
-                topic, resp_message)
+            self._producer.publish_alarm_response(
+                key, resp_message)
         except Exception as e:
             log.exception("Response creation failed:")
             raise e
index 8f3f0fe..1de3284 100644 (file)
@@ -107,7 +107,7 @@ class NotifierHandler(BaseHTTPRequestHandler):
             sev=values['severity'],
             date=a_date,
             state=values['current'])
-        producer.notify_alarm(
+        producer.publish_alarm_response(
             'notify_alarm', resp_message)
         log.info("Sent alarm notification: %s", resp_message)
 
index 7af9427..825671e 100644 (file)
@@ -103,7 +103,7 @@ class Metrics(object):
                     metric_id=metric_id,
                     r_id=resource_id)
                 log.info("Response messages: %s", resp_message)
-                self._producer.create_metrics_resp(
+                self._producer.publish_metrics_response(
                     'create_metric_response', resp_message)
             except Exception as exc:
                 log.warning("Failed to create response: %s", exc)
index 32532e1..bdd2033 100644 (file)
@@ -33,7 +33,7 @@ from kafka.errors import KafkaError
 
 from osm_mon.core.auth import AuthManager
 from osm_mon.core.database import DatabaseManager, VimCredentials
-from osm_mon.core.message_bus.producer import KafkaProducer as prod
+from osm_mon.core.message_bus.producer import KafkaProducer as Producer
 from osm_mon.plugins.OpenStack import response
 from osm_mon.plugins.OpenStack.Aodh import alarming
 from osm_mon.plugins.OpenStack.common import Common
@@ -44,6 +44,10 @@ mock_creds = VimCredentials()
 mock_creds.config = '{}'
 
 
+@mock.patch.object(Producer, "publish_alarm_request", mock.Mock())
+@mock.patch.object(DatabaseManager, "save_alarm", mock.Mock())
+@mock.patch.object(Common, "get_auth_token", mock.Mock())
+@mock.patch.object(Common, "get_endpoint", mock.Mock())
 class AlarmIntegrationTest(unittest.TestCase):
     def setUp(self):
         try:
@@ -67,21 +71,20 @@ class AlarmIntegrationTest(unittest.TestCase):
         self.producer.close()
         self.req_consumer.close()
 
-    @mock.patch.object(Common, "get_auth_token", mock.Mock())
-    @mock.patch.object(Common, "get_endpoint", mock.Mock())
+    @mock.patch.object(Common, "perform_request")
     @mock.patch.object(AuthManager, 'get_credentials')
-    @mock.patch.object(prod, "update_alarm_response")
     @mock.patch.object(alarming.Alarming, "update_alarm")
     @mock.patch.object(response.OpenStack_Response, "generate_response")
-    def test_update_alarm_req(self, resp, update_alarm, update_resp, get_creds):
+    def test_update_alarm_req(self, resp, update_alarm, get_creds, perf_req):
         """Test Aodh update alarm request message from KafkaProducer."""
         # Set-up message, producer and consumer for tests
-        payload = {"alarm_update_request":
-                       {"correlation_id": 123,
-                        "alarm_uuid": "alarm_id",
-                        "metric_uuid": "metric_id"}}
+        payload = {"alarm_update_request": {"correlation_id": 123,
+                                            "alarm_uuid": "alarm_id",
+                                            "metric_uuid": "metric_id"}}
 
         get_creds.return_value = mock_creds
+        perf_req.return_value = type('obj', (object,), {'text': json.dumps({"metrics": {"cpu_util": "1"}})})
+        resp.return_value = ''
 
         self.producer.send('alarm_request', key="update_alarm_request",
                            value=json.dumps(payload))
@@ -89,79 +92,71 @@ class AlarmIntegrationTest(unittest.TestCase):
         for message in self.req_consumer:
             if message.key == "update_alarm_request":
                 # Mock a valid alarm update
-                update_alarm.return_value = "alarm_id", True
+                update_alarm.return_value = "alarm_id"
                 self.alarms.alarming(message, 'test_id')
 
                 # A response message is generated and sent via MON's producer
                 resp.assert_called_with(
                     'update_alarm_response', alarm_id="alarm_id", cor_id=123,
                     status=True)
-                update_resp.assert_called_with(
-                    'update_alarm_response', resp.return_value)
 
                 return
         self.fail("No message received in consumer")
 
-    @mock.patch.object(DatabaseManager, "save_alarm", mock.Mock())
-    @mock.patch.object(Common, "get_auth_token", mock.Mock())
-    @mock.patch.object(Common, "get_endpoint", mock.Mock())
+    @mock.patch.object(Common, "perform_request")
     @mock.patch.object(AuthManager, 'get_credentials')
-    @mock.patch.object(prod, "create_alarm_response")
     @mock.patch.object(alarming.Alarming, "configure_alarm")
     @mock.patch.object(response.OpenStack_Response, "generate_response")
-    def test_create_alarm_req(self, resp, config_alarm, create_resp, get_creds):
+    def test_create_alarm_req(self, resp, config_alarm, get_creds, perf_req):
         """Test Aodh create alarm request message from KafkaProducer."""
         # Set-up message, producer and consumer for tests
-        payload = {"alarm_create_request":
-                       {"correlation_id": 123,
-                        "alarm_name": "my_alarm",
-                        "metric_name": "my_metric",
-                        "resource_uuid": "my_resource",
-                        "severity": "WARNING",
-                        "threshold_value": 60,
-                        "operation": "GT",
-                        "vdu_name": "vdu",
-                        "vnf_member_index": "1",
-                        "ns_id": "1"}}
+        payload = {"alarm_create_request": {"correlation_id": 123,
+                                            "alarm_name": "my_alarm",
+                                            "metric_name": "cpu_utilization",
+                                            "resource_uuid": "my_resource",
+                                            "severity": "WARNING",
+                                            "threshold_value": 60,
+                                            "operation": "GT",
+                                            "vdu_name": "vdu",
+                                            "vnf_member_index": "1",
+                                            "ns_id": "1"}}
 
         get_creds.return_value = mock_creds
-
+        perf_req.return_value = type('obj', (object,), {'text': json.dumps({"metrics": {"cpu_util": "1"}})})
+        resp.return_value = ''
         self.producer.send('alarm_request', key="create_alarm_request",
                            value=json.dumps(payload))
 
         for message in self.req_consumer:
             if message.key == "create_alarm_request":
                 # Mock a valid alarm creation
-                config_alarm.return_value = "alarm_id", True
+                config_alarm.return_value = "alarm_id"
                 self.alarms.alarming(message, 'test_id')
 
                 # A response message is generated and sent via MON's produce
                 resp.assert_called_with(
                     'create_alarm_response', status=True, alarm_id="alarm_id",
                     cor_id=123)
-                create_resp.assert_called_with(
-                    'create_alarm_response', resp.return_value)
 
                 return
         self.fail("No message received in consumer")
 
-    @mock.patch.object(Common, "get_auth_token", mock.Mock())
-    @mock.patch.object(Common, "get_endpoint", mock.Mock())
+    @mock.patch.object(Common, "perform_request")
     @mock.patch.object(AuthManager, 'get_credentials')
-    @mock.patch.object(prod, "list_alarm_response")
     @mock.patch.object(alarming.Alarming, "list_alarms")
     @mock.patch.object(response.OpenStack_Response, "generate_response")
-    def test_list_alarm_req(self, resp, list_alarm, list_resp, get_creds):
+    def test_list_alarm_req(self, resp, list_alarm, get_creds, perf_req):
         """Test Aodh list alarm request message from KafkaProducer."""
         # Set-up message, producer and consumer for tests
-        payload = {"alarm_list_request":
-                       {"correlation_id": 123,
-                        "resource_uuid": "resource_id", }}
+        payload = {"alarm_list_request": {"correlation_id": 123,
+                                          "resource_uuid": "resource_id", }}
 
         self.producer.send('alarm_request', key="list_alarm_request",
                            value=json.dumps(payload))
 
         get_creds.return_value = mock_creds
+        perf_req.return_value = type('obj', (object,), {'text': json.dumps([])})
+        resp.return_value = ''
 
         for message in self.req_consumer:
             if message.key == "list_alarm_request":
@@ -173,30 +168,26 @@ class AlarmIntegrationTest(unittest.TestCase):
                 resp.assert_called_with(
                     'list_alarm_response', alarm_list=[],
                     cor_id=123)
-                # Producer attempts to send the response message back to the SO
-                list_resp.assert_called_with(
-                    'list_alarm_response', resp.return_value)
 
                 return
         self.fail("No message received in consumer")
 
-    @mock.patch.object(Common, "get_auth_token", mock.Mock())
-    @mock.patch.object(Common, "get_endpoint", mock.Mock())
+    @mock.patch.object(Common, "perform_request")
     @mock.patch.object(AuthManager, 'get_credentials')
     @mock.patch.object(alarming.Alarming, "delete_alarm")
-    @mock.patch.object(prod, "delete_alarm_response")
     @mock.patch.object(response.OpenStack_Response, "generate_response")
-    def test_delete_alarm_req(self, resp, del_resp, del_alarm, get_creds):
+    def test_delete_alarm_req(self, resp, del_alarm, get_creds, perf_req):
         """Test Aodh delete alarm request message from KafkaProducer."""
         # Set-up message, producer and consumer for tests
-        payload = {"alarm_delete_request":
-                       {"correlation_id": 123,
-                        "alarm_uuid": "alarm_id", }}
+        payload = {"alarm_delete_request": {"correlation_id": 123,
+                                            "alarm_uuid": "alarm_id", }}
 
         self.producer.send('alarm_request', key="delete_alarm_request",
                            value=json.dumps(payload))
 
         get_creds.return_value = mock_creds
+        perf_req.return_value = type('obj', (object,), {'text': json.dumps([])})
+        resp.return_value = ''
 
         for message in self.req_consumer:
             if message.key == "delete_alarm_request":
@@ -205,22 +196,17 @@ class AlarmIntegrationTest(unittest.TestCase):
                 # Response message is generated and sent by MON's producer
                 resp.assert_called_with(
                     'delete_alarm_response', alarm_id="alarm_id",
-                    status=del_alarm.return_value, cor_id=123)
-                del_resp.assert_called_with(
-                    'delete_alarm_response', resp.return_value)
+                    status=True, cor_id=123)
 
                 return
         self.fail("No message received in consumer")
 
-    @mock.patch.object(Common, "get_auth_token", mock.Mock())
-    @mock.patch.object(Common, "get_endpoint", mock.Mock())
     @mock.patch.object(AuthManager, 'get_credentials')
     @mock.patch.object(alarming.Alarming, "update_alarm_state")
     def test_ack_alarm_req(self, ack_alarm, get_creds):
         """Test Aodh acknowledge alarm request message from KafkaProducer."""
         # Set-up message, producer and consumer for tests
-        payload = {"ack_details":
-                       {"alarm_uuid": "alarm_id", }}
+        payload = {"ack_details": {"alarm_uuid": "alarm_id", }}
 
         self.producer.send('alarm_request', key="acknowledge_alarm",
                            value=json.dumps(payload))
index 45a34d3..eb672da 100644 (file)
@@ -29,7 +29,9 @@ import unittest
 
 from kafka.errors import KafkaError
 
-from osm_mon.core.message_bus.producer import KafkaProducer as prod
+from osm_mon.core.auth import AuthManager
+from osm_mon.core.database import VimCredentials
+from osm_mon.core.message_bus.producer import KafkaProducer as Producer
 
 from kafka import KafkaConsumer
 from kafka import KafkaProducer
@@ -44,7 +46,13 @@ from osm_mon.plugins.OpenStack.common import Common
 
 log = logging.getLogger(__name__)
 
+mock_creds = VimCredentials()
+mock_creds.config = '{}'
 
+
+@mock.patch.object(Producer, "publish_alarm_request", mock.Mock())
+@mock.patch.object(Common, "get_auth_token", mock.Mock())
+@mock.patch.object(Common, "get_endpoint", mock.Mock())
 class MetricIntegrationTest(unittest.TestCase):
     def setUp(self):
         # Set up common and alarming class instances
@@ -65,18 +73,21 @@ class MetricIntegrationTest(unittest.TestCase):
         except KafkaError:
             self.skipTest('Kafka server not present.')
 
-    @mock.patch.object(Common, "get_auth_token", mock.Mock())
-    @mock.patch.object(Common, "get_endpoint", mock.Mock())
+    @mock.patch.object(Common, "perform_request")
+    @mock.patch.object(AuthManager, 'get_credentials')
     @mock.patch.object(metrics.Metrics, "configure_metric")
-    @mock.patch.object(prod, "create_metrics_resp")
     @mock.patch.object(response.OpenStack_Response, "generate_response")
-    def test_create_metric_req(self, resp, create_resp, config_metric):
+    def test_create_metric_req(self, resp, config_metric, get_creds, perf_req):
         """Test Gnocchi create metric request message from producer."""
         # Set-up message, producer and consumer for tests
         payload = {"metric_create_request": {"correlation_id": 123,
                                              "metric_name": "cpu_utilization",
                                              "resource_uuid": "resource_id"}}
 
+        get_creds.return_value = mock_creds
+        perf_req.return_value = type('obj', (object,), {'text': json.dumps({"metrics": {"cpu_util": "1"}})})
+        resp.return_value = ''
+
         self.producer.send('metric_request', key="create_metric_request",
                            value=json.dumps(payload))
 
@@ -90,18 +101,15 @@ class MetricIntegrationTest(unittest.TestCase):
                 resp.assert_called_with(
                     'create_metric_response', status=True, cor_id=123,
                     metric_id="metric_id", r_id="resource_id")
-                create_resp.assert_called_with(
-                    'create_metric_response', resp.return_value)
 
                 return
         self.fail("No message received in consumer")
 
-    @mock.patch.object(Common, "get_auth_token", mock.Mock())
-    @mock.patch.object(Common, "get_endpoint", mock.Mock())
+    @mock.patch.object(Common, "perform_request")
+    @mock.patch.object(AuthManager, 'get_credentials')
     @mock.patch.object(metrics.Metrics, "delete_metric")
-    @mock.patch.object(prod, "delete_metric_response")
     @mock.patch.object(response.OpenStack_Response, "generate_response")
-    def test_delete_metric_req(self, resp, del_resp, del_metric):
+    def test_delete_metric_req(self, resp, del_metric, get_creds, perf_req):
         """Test Gnocchi delete metric request message from producer."""
         # Set-up message, producer and consumer for tests
         payload = {"vim_type": "OpenSTACK",
@@ -110,6 +118,10 @@ class MetricIntegrationTest(unittest.TestCase):
                    "metric_name": "cpu_utilization",
                    "resource_uuid": "resource_id"}
 
+        get_creds.return_value = mock_creds
+        perf_req.return_value = type('obj', (object,), {'text': json.dumps({"metrics": {"cpu_util": "1"}})})
+        resp.return_value = ''
+
         self.producer.send('metric_request', key="delete_metric_request",
                            value=json.dumps(payload))
 
@@ -121,21 +133,18 @@ class MetricIntegrationTest(unittest.TestCase):
 
                 # A response message is generated and sent by MON's producer
                 resp.assert_called_with(
-                    'delete_metric_response', m_id=None,
+                    'delete_metric_response', m_id='1',
                     m_name="cpu_utilization", status=True, r_id="resource_id",
                     cor_id=123)
-                del_resp.assert_called_with(
-                    'delete_metric_response', resp.return_value)
 
                 return
         self.fail("No message received in consumer")
 
-    @mock.patch.object(Common, "get_auth_token", mock.Mock())
-    @mock.patch.object(Common, "get_endpoint", mock.Mock())
+    @mock.patch.object(Common, "perform_request")
+    @mock.patch.object(AuthManager, 'get_credentials')
     @mock.patch.object(metrics.Metrics, "read_metric_data")
-    @mock.patch.object(prod, "read_metric_data_response")
     @mock.patch.object(response.OpenStack_Response, "generate_response")
-    def test_read_metric_data_req(self, resp, read_resp, read_data):
+    def test_read_metric_data_req(self, resp, read_data, get_creds, perf_req):
         """Test Gnocchi read metric data request message from producer."""
         # Set-up message, producer and consumer for tests
         payload = {"vim_type": "OpenSTACK",
@@ -144,6 +153,10 @@ class MetricIntegrationTest(unittest.TestCase):
                    "metric_name": "cpu_utilization",
                    "resource_uuid": "resource_id"}
 
+        get_creds.return_value = mock_creds
+        perf_req.return_value = type('obj', (object,), {'text': json.dumps({"metrics": {"cpu_util": "1"}})})
+        resp.return_value = ''
+
         self.producer.send('metric_request', key="read_metric_data_request",
                            value=json.dumps(payload))
 
@@ -156,21 +169,18 @@ class MetricIntegrationTest(unittest.TestCase):
 
                 # A response message is generated and sent by MON's producer
                 resp.assert_called_with(
-                    'read_metric_data_response', m_id=None,
+                    'read_metric_data_response', m_id='1',
                     m_name="cpu_utilization", r_id="resource_id", cor_id=123, times=[],
                     metrics=[])
-                read_resp.assert_called_with(
-                    'read_metric_data_response', resp.return_value)
 
                 return
         self.fail("No message received in consumer")
 
-    @mock.patch.object(Common, "get_auth_token", mock.Mock())
-    @mock.patch.object(Common, "get_endpoint", mock.Mock())
+    @mock.patch.object(Common, "perform_request")
+    @mock.patch.object(AuthManager, 'get_credentials')
     @mock.patch.object(metrics.Metrics, "list_metrics")
-    @mock.patch.object(prod, "list_metric_response")
     @mock.patch.object(response.OpenStack_Response, "generate_response")
-    def test_list_metrics_req(self, resp, list_resp, list_metrics):
+    def test_list_metrics_req(self, resp, list_metrics, get_creds, perf_req):
         """Test Gnocchi list metrics request message from producer."""
         # Set-up message, producer and consumer for tests
         payload = {"vim_type": "OpenSTACK",
@@ -178,6 +188,10 @@ class MetricIntegrationTest(unittest.TestCase):
                    "metrics_list_request":
                        {"correlation_id": 123, }}
 
+        get_creds.return_value = mock_creds
+        perf_req.return_value = type('obj', (object,), {'text': json.dumps({"metrics": {"cpu_util": "1"}})})
+        resp.return_value = ''
+
         self.producer.send('metric_request', key="list_metric_request",
                            value=json.dumps(payload))
 
@@ -191,24 +205,25 @@ class MetricIntegrationTest(unittest.TestCase):
                 # A response message is generated and sent by MON's producer
                 resp.assert_called_with(
                     'list_metric_response', m_list=[], cor_id=123)
-                list_resp.assert_called_with(
-                    'list_metric_response', resp.return_value)
 
                 return
         self.fail("No message received in consumer")
 
-    @mock.patch.object(Common, "get_auth_token", mock.Mock())
-    @mock.patch.object(Common, "get_endpoint", mock.Mock())
+    @mock.patch.object(Common, "perform_request")
+    @mock.patch.object(AuthManager, 'get_credentials')
     @mock.patch.object(metrics.Metrics, "get_metric_id")
-    @mock.patch.object(prod, "update_metric_response")
     @mock.patch.object(response.OpenStack_Response, "generate_response")
-    def test_update_metrics_req(self, resp, update_resp, get_id):
+    def test_update_metrics_req(self, resp, get_id, get_creds, perf_req):
         """Test Gnocchi update metric request message from KafkaProducer."""
         # Set-up message, producer and consumer for tests
         payload = {"metric_create_request": {"metric_name": "my_metric",
                                              "correlation_id": 123,
                                              "resource_uuid": "resource_id", }}
 
+        get_creds.return_value = mock_creds
+        perf_req.return_value = type('obj', (object,), {'text': json.dumps({"metrics": {"cpu_util": "1"}})})
+        resp.return_value = ''
+
         self.producer.send('metric_request', key="update_metric_request",
                            value=json.dumps(payload))
 
@@ -224,8 +239,6 @@ class MetricIntegrationTest(unittest.TestCase):
                 resp.assert_called_with(
                     'update_metric_response', status=False, cor_id=123,
                     r_id="resource_id", m_id="metric_id")
-                update_resp.assert_called_with(
-                    'update_metric_response', resp.return_value)
 
                 return
         self.fail("No message received in consumer")
index 1b2c64c..0841446 100644 (file)
@@ -119,8 +119,8 @@ class MockNotifierHandler(BaseHTTPRequestHandler):
                         r_id=resource_id,
                         sev=values['severity'], date=a_date,
                         state=values['current'], vim_type="OpenStack")
-                    self._producer.notify_alarm(
-                        'notify_alarm', resp_message, 'alarm_response')
+                    self._producer.publish_alarm_response(
+                        'notify_alarm', resp_message)
                 except Exception:
                     pass
 
@@ -154,7 +154,7 @@ def test_do_get():
 
 
 class AlarmNotificationTest(unittest.TestCase):
-    @mock.patch.object(KafkaProducer, "notify_alarm")
+    @mock.patch.object(KafkaProducer, "publish_alarm_response")
     @mock.patch.object(OpenStack_Response, "generate_response")
     @mock.patch.object(Common, "perform_request")
     @mock.patch.object(Common, "get_endpoint")
@@ -188,4 +188,4 @@ class AlarmNotificationTest(unittest.TestCase):
             vim_type="OpenStack")
 
         # Response message is sent back to the SO via MON's producer
-        notify.assert_called_with("notify_alarm", mock.ANY, "alarm_response")
+        notify.assert_called_with("notify_alarm", mock.ANY)
index a135cec..e2695d4 100644 (file)
@@ -100,7 +100,7 @@ class TestNotifier(unittest.TestCase):
         set_head.assert_called_once()
         notify.assert_called_with(post_data)
 
-    @mock.patch.object(KafkaProducer, "notify_alarm")
+    @mock.patch.object(KafkaProducer, "publish_alarm_response")
     @mock.patch.object(DatabaseManager, "get_alarm")
     def test_notify_alarm_valid_alarm(
             self, get_alarm, notify):
@@ -116,7 +116,7 @@ class TestNotifier(unittest.TestCase):
 
         notify.assert_called_with("notify_alarm", mock.ANY)
 
-    @mock.patch.object(KafkaProducer, "notify_alarm")
+    @mock.patch.object(KafkaProducer, "publish_alarm_response")
     @mock.patch.object(DatabaseManager, "get_alarm")
     def test_notify_alarm_invalid_alarm(
             self, get_alarm, notify):
index 56ac492..ddbdf8b 100644 (file)
@@ -1,12 +1,25 @@
 import unittest
 
 import mock
+from kafka import KafkaProducer
+from kafka.errors import KafkaError
 
 from osm_mon.core.database import VimCredentials
 from osm_mon.core.message_bus.common_consumer import *
 
 
+@mock.patch.object(dbmongo.DbMongo, "db_connect", mock.Mock())
 class CommonConsumerTest(unittest.TestCase):
+
+    def setUp(self):
+        try:
+            KafkaProducer(bootstrap_servers='localhost:9092',
+                          key_serializer=str.encode,
+                          value_serializer=str.encode
+                          )
+        except KafkaError:
+            self.skipTest('Kafka server not present.')
+
     @mock.patch.object(DatabaseManager, "get_credentials")
     def test_get_vim_type(self, get_creds):
         mock_creds = VimCredentials()
@@ -19,8 +32,8 @@ class CommonConsumerTest(unittest.TestCase):
 
         get_creds.return_value = mock_creds
 
-        db_manager = DatabaseManager()
-        vim_type = get_vim_type(db_manager, 'test_id')
+        common_consumer = CommonConsumer()
+        vim_type = common_consumer.get_vim_type('test_id')
 
         self.assertEqual(vim_type, 'openstack')
 
@@ -46,9 +59,8 @@ class CommonConsumerTest(unittest.TestCase):
                                 'created-time': 1526044312.0999322,
                                 'vnfd-id': 'a314c865-aee7-4d9b-9c9d-079d7f857f01',
                                 'id': 'a314c865-aee7-4d9b-9c9d-079d7f857f01'}
-
-        common_db = dbmongo.DbMongo()
-        vdur = get_vdur(common_db, '5ec3f571-d540-4cb0-9992-971d1b08312e', '1', 'ubuntuvnf_vnfd-VM')
+        common_consumer = CommonConsumer()
+        vdur = common_consumer.get_vdur('5ec3f571-d540-4cb0-9992-971d1b08312e', '1', 'ubuntuvnf_vnfd-VM')
         expected_vdur = {
             'internal-connection-point': [],
             'vdu-id-ref': 'ubuntuvnf_vnfd-VM',
index 3a3b20c..cdd5dfc 100644 (file)
@@ -45,11 +45,10 @@ class PolicyModuleAgent:
 
         # Initialize Kafka consumer
         log.info("Connecting to Kafka server at %s", kafka_server)
-        # TODO: Add logic to handle deduplication of messages when using group_id.
-        # See: https://stackoverflow.com/a/29836412
         consumer = KafkaConsumer(bootstrap_servers=kafka_server,
                                  key_deserializer=bytes.decode,
-                                 value_deserializer=bytes.decode)
+                                 value_deserializer=bytes.decode,
+                                 group_id="pm-consumer")
         consumer.subscribe(['lcm_pm', 'alarm_response'])
 
         for message in consumer: