Adds support for storing and getting vim creds
[osm/MON.git] / osm_mon / test / integration / test_access_cred.py
index 532a2f6..231711b 100644 (file)
 """Test an end to end Openstack access_credentials requests."""
 
 import json
-
 import logging
+import unittest
 
+import mock
 from kafka import KafkaConsumer
 from kafka import KafkaProducer
-
+from kafka.errors import KafkaError
 from keystoneclient.v3 import client
 
-import mock
-
+from osm_mon.plugins.OpenStack.Aodh import alarming
 from osm_mon.plugins.OpenStack.common import Common
 
 log = logging.getLogger(__name__)
 
-# Create an instance of the common openstack class, producer and consumer
-openstack_auth = Common()
-
-producer = KafkaProducer(bootstrap_servers='localhost:9092')
-req_consumer = KafkaConsumer(bootstrap_servers='localhost:9092',
-                             group_id='osm_mon')
-req_consumer.subscribe("access_credentials")
-
-
-@mock.patch.object(client, "Client")
-def test_access_cred_req(keyclient):
-    """Test access credentials request message from KafkaProducer."""
-    # Set-up message, producer and consumer for tests
-    payload = {"vim_type": "OpenStack",
-               "access_config":
-               {"openstack_site": "my_site",
-                "user": "my_user",
-                "password": "my_password",
-                "vim_tenant_name": "my_tenant"}}
-
-    producer.send('access_credentials', value=json.dumps(payload))
-
-    for message in req_consumer:
-        # Check the vim desired by the message
-        vim_type = json.loads(message.value)["vim_type"].lower()
-        if vim_type == "openstack":
-            openstack_auth._authenticate(message=message)
-
-            # A keystone client is created with the valid access_credentials
-            keyclient.assert_called_with(
-                auth_url="my_site", username="my_user", password="my_password",
-                tenant_name="my_tenant")
 
-            return
+# TODO: Remove this file
+class AccessCredentialsTest(unittest.TestCase):
+    def setUp(self):
+        # Set up common and alarming class instances
+        self.alarms = alarming.Alarming()
+        self.openstack_auth = Common()
+
+        try:
+            self.producer = KafkaProducer(bootstrap_servers='localhost:9092')
+            self.req_consumer = KafkaConsumer(bootstrap_servers='localhost:9092',
+                                              group_id='osm_mon',
+                                              consumer_timeout_ms=2000)
+            self.req_consumer.subscribe(['access_credentials'])
+        except KafkaError:
+            self.skipTest('Kafka server not present.')
+
+    @mock.patch.object(client, "Client")
+    def test_access_cred_req(self, keyclient):
+        """Test access credentials request message from KafkaProducer."""
+        # Set-up message, producer and consumer for tests
+        payload = {"vim_type": "OpenStack",
+                   "access_config":
+                       {"openstack_site": "my_site",
+                        "user": "my_user",
+                        "password": "my_password",
+                        "vim_tenant_name": "my_tenant"}}
+
+        self.producer.send('access_credentials', value=json.dumps(payload))
+
+        for message in self.req_consumer:
+            # Check the vim desired by the message
+            vim_type = json.loads(message.value)["vim_type"].lower()
+            if vim_type == "openstack":
+                self.openstack_auth._authenticate(message=message)
+
+                # A keystone client is created with the valid access_credentials
+                keyclient.assert_called_with(
+                    auth_url="my_site", username="my_user", password="my_password",
+                    tenant_name="my_tenant")
+
+                return