Adds Dockerfile
[osm/MON.git] / osm_mon / test / functional / test_vim_account.py
1 # -*- coding: utf-8 -*-
2
3 # Copyright 2018 Whitestack, LLC
4 # *************************************************************
5
6 # This file is part of OSM Monitoring module
7 # All Rights Reserved to Whitestack, LLC
8
9 # Licensed under the Apache License, Version 2.0 (the "License"); you may
10 # not use this file except in compliance with the License. You may obtain
11 # a copy of the License at
12
13 # http://www.apache.org/licenses/LICENSE-2.0
14
15 # Unless required by applicable law or agreed to in writing, software
16 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
17 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
18 # License for the specific language governing permissions and limitations
19 # under the License.
20
21 # For those usages not covered by the Apache License, Version 2.0 please
22 # contact: bdiaz@whitestack.com or glavado@whitestack.com
23 ##
24
25 """Test end-to-end OpenStack vim_account requests."""
26
27 import json
28 import logging
29 import time
30 import unittest
31
32 from kafka import KafkaConsumer
33 from kafka import KafkaProducer
34 from kafka.errors import KafkaError
35
36 from osm_mon.core.auth import AuthManager
37
38 log = logging.getLogger(__name__)  # module-level logger, named after this module via __name__
39
40
class VimAccountTest(unittest.TestCase):
    """Functional test of the vim_account create/edit/delete flow over Kafka.

    The test publishes messages on the ``vim_account`` topic of a Kafka
    broker at ``localhost:9092`` and then checks, through ``AuthManager``,
    that the stored credentials reflect each message.

    NOTE(review): whatever consumes the topic and updates the credentials
    store is not visible in this file; it presumably has to be running for
    the assertions to pass -- confirm before enabling the test.
    """

    def setUp(self):
        """Create the Kafka clients and the AuthManager used by the test.

        The test is skipped when any of these steps raises ``KafkaError``.
        """
        try:
            self.producer = KafkaProducer(bootstrap_servers='localhost:9092')
            # Register the close right away: cleanups run even when setUp
            # skips or fails later, so the producer is never leaked.
            self.addCleanup(self.producer.close)
            self.consumer = KafkaConsumer(bootstrap_servers='localhost:9092',
                                          group_id='osm_mon')
            self.addCleanup(self.consumer.close)
            self.consumer.subscribe(['vim_account'])
            self.auth_manager = AuthManager()
        except KafkaError:
            self.skipTest('Kafka server not present.')

    @staticmethod
    def _encode(payload):
        """Return *payload* serialized as UTF-8 encoded JSON bytes.

        The producer is created without a ``value_serializer``, so the
        message value handed to ``send`` has to be bytes already.
        """
        return json.dumps(payload).encode('utf-8')

    def _publish(self, key, payload):
        """Send *payload* on the ``vim_account`` topic under *key*, then wait.

        Args:
            key: message key as bytes (``b'create'``, ``b'edit'``, ``b'delete'``).
            payload: JSON-serializable dict used as the message value.
        """
        self.producer.send('vim_account', key=key, value=self._encode(payload))
        self.producer.flush()
        # Fixed pause before the caller reads the credentials back;
        # presumably this gives the consuming side time to apply the message.
        time.sleep(1)

    # TODO: Refactor
    @unittest.skip("Correct support for functional tests is pending.")
    def test_create_edit_delete_vim_account(self):
        """Create, edit and delete a vim_account and check stored credentials."""
        create_payload = {
            "_id": "test_id",
            "name": "test_name",
            "vim_type": "openstack",
            "vim_url": "auth_url",
            "vim_user": "user",
            "vim_password": "password",
            "vim_tenant_name": "tenant",
            "config": {
                "foo": "bar"
            }
        }
        self._publish(b'create', create_payload)

        creds = self.auth_manager.get_credentials('test_id')
        self.assertIsNotNone(creds)
        self.assertEqual(creds.name, create_payload['name'])
        self.assertEqual(json.loads(creds.config), create_payload['config'])

        # Same account id with a new name and config.
        edit_payload = {
            "_id": "test_id",
            "name": "test_name_edited",
            "vim_type": "openstack",
            "vim_url": "auth_url",
            "vim_user": "user",
            "vim_password": "password",
            "vim_tenant_name": "tenant",
            "config": {
                "foo_edited": "bar_edited"
            }
        }
        self._publish(b'edit', edit_payload)

        creds = self.auth_manager.get_credentials('test_id')
        # Fail with a clear assertion instead of an AttributeError on None.
        self.assertIsNotNone(creds)
        self.assertEqual(creds.name, edit_payload['name'])
        self.assertEqual(json.loads(creds.config), edit_payload['config'])

        delete_payload = {
            "_id": "test_id"
        }
        self._publish(b'delete', delete_payload)

        creds = self.auth_manager.get_credentials('test_id')
        self.assertIsNone(creds)