1 # -*- coding: utf-8 -*-
3 # Copyright 2018 University of Bristol - High Performance Networks Research
7 # Contributors: Anderson Bravalheri, Dimitrios Gkounis, Abubakar Siddique
8 # Muqaddas, Navdeep Uniyal, Reza Nejabati and Dimitra Simeonidou
10 # Licensed under the Apache License, Version 2.0 (the "License"); you may
11 # not use this file except in compliance with the License. You may obtain
12 # a copy of the License at
14 # http://www.apache.org/licenses/LICENSE-2.0
16 # Unless required by applicable law or agreed to in writing, software
17 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
18 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
19 # License for the specific language governing permissions and limitations
22 # For those usages not covered by the Apache License, Version 2.0 please
23 # contact with: <highperformance-networks@bristol.ac.uk>
25 # Neither the name of the University of Bristol nor the names of its
26 # contributors may be used to endorse or promote products derived from
27 # this software without specific prior written permission.
29 # This work has been performed in the context of DCMS UK 5G Testbeds
30 # & Trials Programme and in the framework of the Metro-Haul project -
31 # funded by the European Commission under Grant number 761727 through the
32 # Horizon 2020 and 5G-PPP programmes.
35 from __future__
import unicode_literals
38 from itertools
import chain
39 from types
import StringType
41 from six
.moves
import range
43 from . import fixtures
as eg
44 from ...tests
.db_helpers
import (
45 TestCaseWithDatabasePerTest
,
49 from ..persistence
import (
51 hide_confidential_fields
,
57 class TestPersistenceUtils(unittest
.TestCase
):
58 def test_hide_confidential_fields(self
):
61 'nested.password': '123456',
62 'nested.secret': None,
64 result
= hide_confidential_fields(example
,
65 fields
=('password', 'secret'))
66 for field
in 'password', 'nested.password':
67 assert result
[field
].startswith('***')
68 self
.assertIs(result
['nested.secret'], None)
70 def test_serialize_fields(self
):
73 'nested.info': [1, 2, 3],
76 result
= serialize_fields(example
, fields
=('config', 'info'))
77 for field
in 'config', 'nested.info':
78 self
.assertIsInstance(result
[field
], StringType
)
79 self
.assertIs(result
['nested.config'], None)
81 def test_unserialize_fields(self
):
84 'nested.info': '[1,2,3]',
85 'nested.config': None,
86 'confidential.info': '{"password": "abcdef"}'
88 result
= unserialize_fields(example
, fields
=('config', 'info'))
89 self
.assertEqual(result
['config'], dict(x
=1))
90 self
.assertEqual(result
['nested.info'], [1, 2, 3])
91 self
.assertIs(result
['nested.config'], None)
92 self
.assertNotEqual(result
['confidential.info']['password'], 'abcdef')
93 assert result
['confidential.info']['password'].startswith('***')
96 class TestWimPersistence(TestCaseWithDatabasePerTest
):
98 super(TestWimPersistence
, self
).setUp()
99 self
.persist
= WimPersistence(self
.db
)
def populate(self, seeds=None):
    """Seed the test database, defaulting to the shared fixture set.

    Arguments:
        seeds: records forwarded to the parent class ``populate``. Any
            falsy value (``None``, empty list, ...) is replaced by
            ``eg.consistent_set()``.
    """
    if not seeds:
        seeds = eg.consistent_set()
    super(TestWimPersistence, self).populate(seeds)
def test_query_offset(self):
    """Check that ``query`` honours ``LIMIT``/``OFFSET`` with ``ORDER_BY``.

    Four WIM records are seeded, then a page of 2 records is requested,
    sorted by name and starting after the first record.
    """
    # Given a database contains 4 records
    self.populate([{'wims': [eg.wim(i) for i in range(4)]}])

    # When we query using a limit of 2 and an offset of 1
    results = self.persist.query('wims',
                                 ORDER_BY='name', LIMIT=2, OFFSET=1)

    # Then we should have 2 results, skipping the first record.
    # The comparison is order-sensitive on purpose: the query asks for
    # the records sorted by name, so an order-insensitive check would
    # leave ORDER_BY unverified. ``assertEqual`` is also available on
    # both Python 2 and 3, unlike ``assertItemsEqual``.
    names = [r['name'] for r in results]
    self.assertEqual(names, ['wim1', 'wim2'])
115 def test_get_wim_account_by_wim_tenant(self
):
116 # Given a database contains WIM accounts associated to Tenants
119 # when we retrieve the account using wim and tenant
120 wim_account
= self
.persist
.get_wim_account_by(
121 uuid('wim0'), uuid('tenant0'))
123 # then the right record should be returned
124 self
.assertEqual(wim_account
['uuid'], uuid('wim-account00'))
125 self
.assertEqual(wim_account
['name'], 'wim-account00')
126 self
.assertEqual(wim_account
['user'], 'user00')
128 def test_get_wim_account_by_wim_tenant__names(self
):
129 # Given a database contains WIM accounts associated to Tenants
132 # when we retrieve the account using wim and tenant
133 wim_account
= self
.persist
.get_wim_account_by(
136 # then the right record should be returned
137 self
.assertEqual(wim_account
['uuid'], uuid('wim-account00'))
138 self
.assertEqual(wim_account
['name'], 'wim-account00')
139 self
.assertEqual(wim_account
['user'], 'user00')
141 def test_get_wim_accounts_by_wim(self
):
142 # Given a database contains WIM accounts associated to Tenants
145 # when we retrieve the accounts using wim
146 wim_accounts
= self
.persist
.get_wim_accounts_by(uuid('wim0'))
148 # then the right records should be returned
149 self
.assertEqual(len(wim_accounts
), eg
.NUM_TENANTS
)
150 for account
in wim_accounts
:
151 self
.assertEqual(account
['wim_id'], uuid('wim0'))
153 def test_get_wim_port_mappings(self
):
154 # Given a database with WIMs, datacenters and port-mappings
157 # when we retrieve the port mappings for a list of datacenters
158 # using either names or uuids
159 for criteria
in ([uuid('dc0'), uuid('dc1')], ['dc0', 'dc1']):
160 mappings
= self
.persist
.get_wim_port_mappings(datacenter
=criteria
)
162 # then each result should have a datacenter_id
163 datacenters
= [m
['datacenter_id'] for m
in mappings
]
164 for datacenter
in datacenters
:
165 self
.assertIn(datacenter
, [uuid('dc0'), uuid('dc1')])
168 wims
= [m
['wim_id'] for m
in mappings
]
170 self
.assertIsNot(wim
, None)
172 # and an array of pairs 'wan' <> 'pop' connections
173 pairs
= chain(*(m
['wan_pop_port_mappings'] for m
in mappings
))
174 self
.assertEqual(len(list(pairs
)), 2 * eg
.NUM_WIMS
)
176 def test_get_wim_port_mappings_multiple(self
):
177 # Given we have more than one connection in a datacenter managed by the
181 'wim_port_mappings': [
184 pop_dpid
='CC:CC:CC:CC:CC:CC:CC:CC',
185 wan_dpid
='DD:DD:DD:DD:DD:DD:DD:DD'),
188 pop_dpid
='EE:EE:EE:EE:EE:EE:EE:EE',
189 wan_dpid
='FF:FF:FF:FF:FF:FF:FF:FF')]}])
191 # when we retrieve the port mappings for the wim and datacenter:
193 self
.persist
.get_wim_port_mappings(wim
='wim0', datacenter
='dc0'))
195 # then it should return just a single result, grouped by wim and
197 self
.assertEqual(len(mappings
), 1)
198 self
.assertEqual(mappings
[0]['wim_id'], uuid('wim0'))
199 self
.assertEqual(mappings
[0]['datacenter_id'], uuid('dc0'))
201 self
.assertEqual(len(mappings
[0]['wan_pop_port_mappings']), 3)
203 # when we retrieve the mappings for more than one wim/datacenter
204 # the grouping should still work properly
205 mappings
= self
.persist
.get_wim_port_mappings(
206 wim
=['wim0', 'wim1'], datacenter
=['dc0', 'dc1'])
207 self
.assertEqual(len(mappings
), 4)
208 pairs
= chain(*(m
['wan_pop_port_mappings'] for m
in mappings
))
209 self
.assertEqual(len(list(pairs
)), 6)
211 def test_get_actions_in_group(self
):
212 # Given a good number of wim actions exist in the database
213 kwargs
= {'action_id': uuid('action0')}
214 actions
= (eg
.wim_actions('CREATE', num_links
=8, **kwargs
) +
215 eg
.wim_actions('FIND', num_links
=8, **kwargs
) +
216 eg
.wim_actions('START', num_links
=8, **kwargs
))
217 for i
, action
in enumerate(actions
):
218 action
['task_index'] = i
221 {'nfvo_tenants': eg
.tenant()}
223 {'instance_actions': eg
.instance_action(**kwargs
)},
224 {'vim_wim_actions': actions
}
227 # When we retrieve them in groups
229 results
= self
.persist
.get_actions_in_groups(
230 uuid('wim-account00'), ['instance_wim_nets'], group_limit
=limit
)
232 # Then we should have N groups where N == limit
233 self
.assertEqual(len(results
), limit
)
234 for _
, task_list
in results
:
235 # And since for each link we have created 3 actions (create, find,
236 # start), we should find them in each group
237 self
.assertEqual(len(task_list
), 3)
@disable_foreign_keys
def test_update_instance_action_counters(self):
    """Apply successive deltas to an instance action and check its counters.

    After each call to ``update_instance_action_counters`` the record is
    read back with ``get_by_uuid`` and both ``number_done`` and
    ``number_failed`` are compared against the expected values.
    """
    # Given we have one instance action in the database, created with
    # num_tasks=2
    instance_action = eg.instance_action(num_tasks=2)
    self.populate([{'instance_actions': instance_action}])
    action_id = instance_action['uuid']

    def check_counters(expected_done, expected_failed):
        # Re-read the stored record so each check sees the persisted state
        record = self.persist.get_by_uuid('instance_actions', action_id)
        self.assertEqual(record['number_done'], expected_done)
        self.assertEqual(record['number_failed'], expected_failed)

    # When we update the done counter by 0, nothing should happen
    self.persist.update_instance_action_counters(action_id, done=0)
    check_counters(0, 0)

    # When we update the done counter by 2, number_done should be 2
    self.persist.update_instance_action_counters(action_id, done=2)
    check_counters(2, 0)

    # When we update the done counter by -1, and the failed counter by 1
    self.persist.update_instance_action_counters(
        action_id, done=-1, failed=1)
    # Then we should see 1 and 1
    check_counters(1, 1)
264 if __name__
== '__main__':