ddbdf8b796c50878e83cd79eaafadcdeef28e330
[osm/MON.git] / osm_mon / test / core / test_common_consumer.py
1 import unittest
2
3 import mock
4 from kafka import KafkaProducer
5 from kafka.errors import KafkaError
6
7 from osm_mon.core.database import VimCredentials
8 from osm_mon.core.message_bus.common_consumer import *
9
10
11 @mock.patch.object(dbmongo.DbMongo, "db_connect", mock.Mock())
12 class CommonConsumerTest(unittest.TestCase):
13
14 def setUp(self):
15 try:
16 KafkaProducer(bootstrap_servers='localhost:9092',
17 key_serializer=str.encode,
18 value_serializer=str.encode
19 )
20 except KafkaError:
21 self.skipTest('Kafka server not present.')
22
23 @mock.patch.object(DatabaseManager, "get_credentials")
24 def test_get_vim_type(self, get_creds):
25 mock_creds = VimCredentials()
26 mock_creds.id = 'test_id'
27 mock_creds.user = 'user'
28 mock_creds.url = 'url'
29 mock_creds.password = 'password'
30 mock_creds.tenant_name = 'tenant_name'
31 mock_creds.type = 'openstack'
32
33 get_creds.return_value = mock_creds
34
35 common_consumer = CommonConsumer()
36 vim_type = common_consumer.get_vim_type('test_id')
37
38 self.assertEqual(vim_type, 'openstack')
39
40 @mock.patch.object(dbmongo.DbMongo, "get_one")
41 def test_get_vdur(self, get_one):
42 get_one.return_value = {'_id': 'a314c865-aee7-4d9b-9c9d-079d7f857f01',
43 '_admin': {
44 'projects_read': ['admin'], 'created': 1526044312.102287,
45 'modified': 1526044312.102287, 'projects_write': ['admin']
46 },
47 'vim-account-id': 'c1740601-7287-48c8-a2c9-bce8fee459eb',
48 'nsr-id-ref': '5ec3f571-d540-4cb0-9992-971d1b08312e',
49 'vdur': [
50 {
51 'internal-connection-point': [],
52 'vdu-id-ref': 'ubuntuvnf_vnfd-VM',
53 'id': 'ffd73f33-c8bb-4541-a977-44dcc3cbe28d',
54 'vim-id': '27042672-5190-4209-b844-95bbaeea7ea7'
55 }
56 ],
57 'vnfd-ref': 'ubuntuvnf_vnfd',
58 'member-vnf-index-ref': '1',
59 'created-time': 1526044312.0999322,
60 'vnfd-id': 'a314c865-aee7-4d9b-9c9d-079d7f857f01',
61 'id': 'a314c865-aee7-4d9b-9c9d-079d7f857f01'}
62 common_consumer = CommonConsumer()
63 vdur = common_consumer.get_vdur('5ec3f571-d540-4cb0-9992-971d1b08312e', '1', 'ubuntuvnf_vnfd-VM')
64 expected_vdur = {
65 'internal-connection-point': [],
66 'vdu-id-ref': 'ubuntuvnf_vnfd-VM',
67 'id': 'ffd73f33-c8bb-4541-a977-44dcc3cbe28d',
68 'vim-id': '27042672-5190-4209-b844-95bbaeea7ea7'
69 }
70
71 self.assertDictEqual(vdur, expected_vdur)
72
73
74 if __name__ == '__main__':
75 unittest.main()