+# TODO: Remove this file
+class AccessCredentialsTest(unittest.TestCase):
+ def setUp(self):
+ # Set up common and alarming class instances
+ self.alarms = alarming.Alarming()
+ self.openstack_auth = Common()
+
+ try:
+ self.producer = KafkaProducer(bootstrap_servers='localhost:9092')
+ self.req_consumer = KafkaConsumer(bootstrap_servers='localhost:9092',
+ group_id='osm_mon',
+ consumer_timeout_ms=2000)
+ self.req_consumer.subscribe(['access_credentials'])
+ except KafkaError:
+ self.skipTest('Kafka server not present.')
+
+ @mock.patch.object(client, "Client")
+ def test_access_cred_req(self, keyclient):
+ """Test access credentials request message from KafkaProducer."""
+ # Set-up message, producer and consumer for tests
+ payload = {"vim_type": "OpenStack",
+ "access_config":
+ {"openstack_site": "my_site",
+ "user": "my_user",
+ "password": "my_password",
+ "vim_tenant_name": "my_tenant"}}
+
+ self.producer.send('access_credentials', value=json.dumps(payload))
+
+ for message in self.req_consumer:
+ # Check the vim desired by the message
+ vim_type = json.loads(message.value)["vim_type"].lower()
+ if vim_type == "openstack":
+ self.openstack_auth._authenticate(message=message)
+
+ # A keystone client is created with the valid access_credentials
+ keyclient.assert_called_with(
+ auth_url="my_site", username="my_user", password="my_password",
+ tenant_name="my_tenant")
+
+ return