inject_user_key routine fixes
[osm/RO.git] / osm_ro / wim / tests / test_persistence.py
1 # -*- coding: utf-8 -*-
2 ##
3 # Copyright 2018 University of Bristol - High Performance Networks Research
4 # Group
5 # All Rights Reserved.
6 #
7 # Contributors: Anderson Bravalheri, Dimitrios Gkounis, Abubakar Siddique
8 # Muqaddas, Navdeep Uniyal, Reza Nejabati and Dimitra Simeonidou
9 #
10 # Licensed under the Apache License, Version 2.0 (the "License"); you may
11 # not use this file except in compliance with the License. You may obtain
12 # a copy of the License at
13 #
14 # http://www.apache.org/licenses/LICENSE-2.0
15 #
16 # Unless required by applicable law or agreed to in writing, software
17 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
18 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
19 # License for the specific language governing permissions and limitations
20 # under the License.
21 #
22 # For those usages not covered by the Apache License, Version 2.0 please
23 # contact with: <highperformance-networks@bristol.ac.uk>
24 #
25 # Neither the name of the University of Bristol nor the names of its
26 # contributors may be used to endorse or promote products derived from
27 # this software without specific prior written permission.
28 #
29 # This work has been performed in the context of DCMS UK 5G Testbeds
30 # & Trials Programme and in the framework of the Metro-Haul project -
31 # funded by the European Commission under Grant number 761727 through the
32 # Horizon 2020 and 5G-PPP programmes.
33 ##
34
35 from __future__ import unicode_literals
36
37 import unittest
38 from itertools import chain
39 from types import StringType
40
41 from six.moves import range
42
43 from . import fixtures as eg
44 from ...tests.db_helpers import (
45 TestCaseWithDatabasePerTest,
46 disable_foreign_keys,
47 uuid
48 )
49 from ..persistence import (
50 WimPersistence,
51 hide_confidential_fields,
52 serialize_fields,
53 unserialize_fields
54 )
55
56
57 class TestPersistenceUtils(unittest.TestCase):
58 def test_hide_confidential_fields(self):
59 example = {
60 'password': '123456',
61 'nested.password': '123456',
62 'nested.secret': None,
63 }
64 result = hide_confidential_fields(example,
65 fields=('password', 'secret'))
66 for field in 'password', 'nested.password':
67 assert result[field].startswith('***')
68 self.assertIs(result['nested.secret'], None)
69
70 def test_serialize_fields(self):
71 example = {
72 'config': dict(x=1),
73 'nested.info': [1, 2, 3],
74 'nested.config': None
75 }
76 result = serialize_fields(example, fields=('config', 'info'))
77 for field in 'config', 'nested.info':
78 self.assertIsInstance(result[field], StringType)
79 self.assertIs(result['nested.config'], None)
80
81 def test_unserialize_fields(self):
82 example = {
83 'config': '{"x": 1}',
84 'nested.info': '[1,2,3]',
85 'nested.config': None,
86 'confidential.info': '{"password": "abcdef"}'
87 }
88 result = unserialize_fields(example, fields=('config', 'info'))
89 self.assertEqual(result['config'], dict(x=1))
90 self.assertEqual(result['nested.info'], [1, 2, 3])
91 self.assertIs(result['nested.config'], None)
92 self.assertNotEqual(result['confidential.info']['password'], 'abcdef')
93 assert result['confidential.info']['password'].startswith('***')
94
95
96 class TestWimPersistence(TestCaseWithDatabasePerTest):
97 def setUp(self):
98 super(TestWimPersistence, self).setUp()
99 self.persist = WimPersistence(self.db)
100
101 def populate(self, seeds=None):
102 super(TestWimPersistence, self).populate(seeds or eg.consistent_set())
103
104 def test_query_offset(self):
105 # Given a database contains 4 records
106 self.populate([{'wims': [eg.wim(i) for i in range(4)]}])
107
108 # When we query using a limit of 2 and a offset of 1
109 results = self.persist.query('wims',
110 ORDER_BY='name', LIMIT=2, OFFSET=1)
111 # Then we should have 2 results, skipping the first record
112 names = [r['name'] for r in results]
113 self.assertItemsEqual(names, ['wim1', 'wim2'])
114
115 def test_get_wim_account_by_wim_tenant(self):
116 # Given a database contains WIM accounts associated to Tenants
117 self.populate()
118
119 # when we retrieve the account using wim and tenant
120 wim_account = self.persist.get_wim_account_by(
121 uuid('wim0'), uuid('tenant0'))
122
123 # then the right record should be returned
124 self.assertEqual(wim_account['uuid'], uuid('wim-account00'))
125 self.assertEqual(wim_account['name'], 'wim-account00')
126 self.assertEqual(wim_account['user'], 'user00')
127
128 def test_get_wim_account_by_wim_tenant__names(self):
129 # Given a database contains WIM accounts associated to Tenants
130 self.populate()
131
132 # when we retrieve the account using wim and tenant
133 wim_account = self.persist.get_wim_account_by(
134 'wim0', 'tenant0')
135
136 # then the right record should be returned
137 self.assertEqual(wim_account['uuid'], uuid('wim-account00'))
138 self.assertEqual(wim_account['name'], 'wim-account00')
139 self.assertEqual(wim_account['user'], 'user00')
140
141 def test_get_wim_accounts_by_wim(self):
142 # Given a database contains WIM accounts associated to Tenants
143 self.populate()
144
145 # when we retrieve the accounts using wim
146 wim_accounts = self.persist.get_wim_accounts_by(uuid('wim0'))
147
148 # then the right records should be returned
149 self.assertEqual(len(wim_accounts), eg.NUM_TENANTS)
150 for account in wim_accounts:
151 self.assertEqual(account['wim_id'], uuid('wim0'))
152
153 def test_get_wim_port_mappings(self):
154 # Given a database with WIMs, datacenters and port-mappings
155 self.populate()
156
157 # when we retrieve the port mappings for a list of datacenters
158 # using either names or uuids
159 for criteria in ([uuid('dc0'), uuid('dc1')], ['dc0', 'dc1']):
160 mappings = self.persist.get_wim_port_mappings(datacenter=criteria)
161
162 # then each result should have a datacenter_id
163 datacenters = [m['datacenter_id'] for m in mappings]
164 for datacenter in datacenters:
165 self.assertIn(datacenter, [uuid('dc0'), uuid('dc1')])
166
167 # a wim_id
168 wims = [m['wim_id'] for m in mappings]
169 for wim in wims:
170 self.assertIsNot(wim, None)
171
172 # and a array of pairs 'wan' <> 'pop' connections
173 pairs = chain(*(m['pop_wan_mappings'] for m in mappings))
174 self.assertEqual(len(list(pairs)), 2 * eg.NUM_WIMS)
175
176 def test_get_wim_port_mappings_multiple(self):
177 # Given we have more then one connection in a datacenter managed by the
178 # WIM
179 self.populate()
180 self.populate([{
181 'wim_port_mappings': [
182 eg.wim_port_mapping(
183 0, 0,
184 pop_dpid='CC:CC:CC:CC:CC:CC:CC:CC',
185 wan_dpid='DD:DD:DD:DD:DD:DD:DD:DD'),
186 eg.wim_port_mapping(
187 0, 0,
188 pop_dpid='EE:EE:EE:EE:EE:EE:EE:EE',
189 wan_dpid='FF:FF:FF:FF:FF:FF:FF:FF')]}])
190
191 # when we retrieve the port mappings for the wim and datacenter:
192 mappings = (
193 self.persist.get_wim_port_mappings(wim='wim0', datacenter='dc0'))
194
195 # then it should return just a single result, grouped by wim and
196 # datacenter
197 self.assertEqual(len(mappings), 1)
198 self.assertEqual(mappings[0]['wim_id'], uuid('wim0'))
199 self.assertEqual(mappings[0]['datacenter_id'], uuid('dc0'))
200
201 self.assertEqual(len(mappings[0]['pop_wan_mappings']), 3)
202
203 # when we retreive the mappings for more then one wim/datacenter
204 # the grouping should still work properly
205 mappings = self.persist.get_wim_port_mappings(
206 wim=['wim0', 'wim1'], datacenter=['dc0', 'dc1'])
207 self.assertEqual(len(mappings), 4)
208 pairs = chain(*(m['pop_wan_mappings'] for m in mappings))
209 self.assertEqual(len(list(pairs)), 6)
210
211 def test_get_actions_in_group(self):
212 # Given a good number of wim actions exist in the database
213 kwargs = {'action_id': uuid('action0')}
214 actions = (eg.wim_actions('CREATE', num_links=8, **kwargs) +
215 eg.wim_actions('FIND', num_links=8, **kwargs) +
216 eg.wim_actions('START', num_links=8, **kwargs))
217 for i, action in enumerate(actions):
218 action['task_index'] = i
219
220 self.populate([
221 {'nfvo_tenants': eg.tenant()}
222 ] + eg.wim_set() + [
223 {'instance_actions': eg.instance_action(**kwargs)},
224 {'vim_wim_actions': actions}
225 ])
226
227 # When we retrieve them in groups
228 limit = 5
229 results = self.persist.get_actions_in_groups(
230 uuid('wim-account00'), ['instance_wim_nets'], group_limit=limit)
231
232 # Then we should have N groups where N == limit
233 self.assertEqual(len(results), limit)
234 for _, task_list in results:
235 # And since for each link we have create 3 actions (create, find,
236 # start), we should find them in each group
237 self.assertEqual(len(task_list), 3)
238
239 @disable_foreign_keys
240 def test_update_instance_action_counters(self):
241 # Given we have one instance action in the database with 2 incomplete
242 # tasks
243 action = eg.instance_action(num_tasks=2)
244 self.populate([{'instance_actions': action}])
245 # When we update the done counter by 0, nothing should happen
246 self.persist.update_instance_action_counters(action['uuid'], done=0)
247 result = self.persist.get_by_uuid('instance_actions', action['uuid'])
248 self.assertEqual(result['number_done'], 0)
249 self.assertEqual(result['number_failed'], 0)
250 # When we update the done counter by 2, number_done should be 2
251 self.persist.update_instance_action_counters(action['uuid'], done=2)
252 result = self.persist.get_by_uuid('instance_actions', action['uuid'])
253 self.assertEqual(result['number_done'], 2)
254 self.assertEqual(result['number_failed'], 0)
255 # When we update the done counter by -1, and the failed counter by 1
256 self.persist.update_instance_action_counters(
257 action['uuid'], done=-1, failed=1)
258 # Then we should see 1 and 1
259 result = self.persist.get_by_uuid('instance_actions', action['uuid'])
260 self.assertEqual(result['number_done'], 1)
261 self.assertEqual(result['number_failed'], 1)
262
263
264 if __name__ == '__main__':
265 unittest.main()