1 # Copyright 2017 Intel Research and Development Ireland Limited
2 # *************************************************************
4 # This file is part of OSM Monitoring module
5 # All Rights Reserved to Intel Corporation
7 # Licensed under the Apache License, Version 2.0 (the "License"); you may
8 # not use this file except in compliance with the License. You may obtain
9 # a copy of the License at
11 # http://www.apache.org/licenses/LICENSE-2.0
13 # Unless required by applicable law or agreed to in writing, software
14 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
15 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
16 # License for the specific language governing permissions and limitations
19 # For those usages not covered by the Apache License, Version 2.0 please
20 # contact: helena.mcgough@intel.com or adrian.hoban@intel.com
22 # __author__ = "Helena McGough"
23 """Test an end to end Openstack access_credentials requests."""
30 from kafka
import KafkaConsumer
31 from kafka
import KafkaProducer
32 from kafka
.errors
import KafkaError
33 from keystoneclient
.v3
import client
35 from osm_mon
.plugins
.OpenStack
.Aodh
import alarming
36 from osm_mon
.plugins
.OpenStack
.common
import Common
38 log
= logging
.getLogger(__name__
)
41 # TODO: Remove this file
42 class AccessCredentialsTest(unittest
.TestCase
):
44 # Set up common and alarming class instances
45 self
.alarms
= alarming
.Alarming()
46 self
.openstack_auth
= Common()
49 self
.producer
= KafkaProducer(bootstrap_servers
='localhost:9092')
50 self
.req_consumer
= KafkaConsumer(bootstrap_servers
='localhost:9092',
52 consumer_timeout_ms
=2000)
53 self
.req_consumer
.subscribe(['access_credentials'])
55 self
.skipTest('Kafka server not present.')
57 @mock.patch
.object(client
, "Client")
58 def test_access_cred_req(self
, keyclient
):
59 """Test access credentials request message from KafkaProducer."""
60 # Set-up message, producer and consumer for tests
61 payload
= {"vim_type": "OpenStack",
63 {"openstack_site": "my_site",
65 "password": "my_password",
66 "vim_tenant_name": "my_tenant"}}
68 self
.producer
.send('access_credentials', value
=json
.dumps(payload
))
70 for message
in self
.req_consumer
:
71 # Check the vim desired by the message
72 vim_type
= json
.loads(message
.value
)["vim_type"].lower()
73 if vim_type
== "openstack":
74 self
.openstack_auth
._authenticate
(message
=message
)
76 # A keystone client is created with the valid access_credentials
77 keyclient
.assert_called_with(
78 auth_url
="my_site", username
="my_user", password
="my_password",
79 tenant_name
="my_tenant")