1 # -*- coding: utf-8 -*-
3 # Copyright 2018 University of Bristol - High Performance Networks Research
7 # Contributors: Anderson Bravalheri, Dimitrios Gkounis, Abubakar Siddique
8 # Muqaddas, Navdeep Uniyal, Reza Nejabati and Dimitra Simeonidou
10 # Licensed under the Apache License, Version 2.0 (the "License"); you may
11 # not use this file except in compliance with the License. You may obtain
12 # a copy of the License at
14 # http://www.apache.org/licenses/LICENSE-2.0
16 # Unless required by applicable law or agreed to in writing, software
17 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
18 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
19 # License for the specific language governing permissions and limitations
22 # For those usages not covered by the Apache License, Version 2.0 please
23 # contact with: <highperformance-networks@bristol.ac.uk>
25 # Neither the name of the University of Bristol nor the names of its
26 # contributors may be used to endorse or promote products derived from
27 # this software without specific prior written permission.
29 # This work has been performed in the context of DCMS UK 5G Testbeds
30 # & Trials Programme and in the framework of the Metro-Haul project -
31 # funded by the European Commission under Grant number 761727 through the
32 # Horizon 2020 and 5G-PPP programmes.
36 from itertools
import chain
38 from . import fixtures
as eg
39 from ...tests
.db_helpers
import (
40 TestCaseWithDatabasePerTest
,
44 from ..persistence
import (
46 hide_confidential_fields
,
52 class TestPersistenceUtils(unittest
.TestCase
):
53 def test_hide_confidential_fields(self
):
56 'nested.password': '123456',
57 'nested.secret': None,
59 result
= hide_confidential_fields(example
,
60 fields
=('password', 'secret'))
61 for field
in 'password', 'nested.password':
62 assert result
[field
].startswith('***')
63 self
.assertIs(result
['nested.secret'], None)
65 def test_serialize_fields(self
):
68 'nested.info': [1, 2, 3],
71 result
= serialize_fields(example
, fields
=('config', 'info'))
72 for field
in 'config', 'nested.info':
73 self
.assertIsInstance(result
[field
], str)
74 self
.assertIs(result
['nested.config'], None)
76 def test_unserialize_fields(self
):
79 'nested.info': '[1,2,3]',
80 'nested.config': None,
81 'confidential.info': '{"password": "abcdef"}'
83 result
= unserialize_fields(example
, fields
=('config', 'info'))
84 self
.assertEqual(result
['config'], dict(x
=1))
85 self
.assertEqual(result
['nested.info'], [1, 2, 3])
86 self
.assertIs(result
['nested.config'], None)
87 self
.assertNotEqual(result
['confidential.info']['password'], 'abcdef')
88 assert result
['confidential.info']['password'].startswith('***')
91 class TestWimPersistence(TestCaseWithDatabasePerTest
):
93 super(TestWimPersistence
, self
).setUp()
94 self
.persist
= WimPersistence(self
.db
)
def populate(self, seeds=None):
    """Load ``seeds`` into the test database through the parent class.

    When ``seeds`` is falsy (``None``, an empty list, ...), the records
    produced by ``eg.consistent_set()`` are loaded instead.
    """
    if not seeds:
        seeds = eg.consistent_set()
    super(TestWimPersistence, self).populate(seeds)
def test_query_offset(self):
    # Given a database that contains 4 WIM records
    wim_records = [eg.wim(index) for index in range(4)]
    self.populate([{'wims': wim_records}])

    # When we query using a limit of 2 and an offset of 1
    rows = self.persist.query('wims', ORDER_BY='name', LIMIT=2, OFFSET=1)

    # Then we should have 2 results, skipping the first record
    found = []
    for row in rows:
        found.append(row['name'])
    self.assertItemsEqual(found, ['wim1', 'wim2'])
110 def test_get_wim_account_by_wim_tenant(self
):
111 # Given a database contains WIM accounts associated to Tenants
114 # when we retrieve the account using wim and tenant
115 wim_account
= self
.persist
.get_wim_account_by(
116 uuid('wim0'), uuid('tenant0'))
118 # then the right record should be returned
119 self
.assertEqual(wim_account
['uuid'], uuid('wim-account00'))
120 self
.assertEqual(wim_account
['name'], 'wim-account00')
121 self
.assertEqual(wim_account
['user'], 'user00')
123 def test_get_wim_account_by_wim_tenant__names(self
):
124 # Given a database contains WIM accounts associated to Tenants
127 # when we retrieve the account using wim and tenant
128 wim_account
= self
.persist
.get_wim_account_by(
131 # then the right record should be returned
132 self
.assertEqual(wim_account
['uuid'], uuid('wim-account00'))
133 self
.assertEqual(wim_account
['name'], 'wim-account00')
134 self
.assertEqual(wim_account
['user'], 'user00')
136 def test_get_wim_accounts_by_wim(self
):
137 # Given a database contains WIM accounts associated to Tenants
140 # when we retrieve the accounts using wim
141 wim_accounts
= self
.persist
.get_wim_accounts_by(uuid('wim0'))
143 # then the right records should be returned
144 self
.assertEqual(len(wim_accounts
), eg
.NUM_TENANTS
)
145 for account
in wim_accounts
:
146 self
.assertEqual(account
['wim_id'], uuid('wim0'))
148 def test_get_wim_port_mappings(self
):
149 # Given a database with WIMs, datacenters and port-mappings
152 # when we retrieve the port mappings for a list of datacenters
153 # using either names or uuids
154 for criteria
in ([uuid('dc0'), uuid('dc1')], ['dc0', 'dc1']):
155 mappings
= self
.persist
.get_wim_port_mappings(datacenter
=criteria
)
157 # then each result should have a datacenter_id
158 datacenters
= [m
['datacenter_id'] for m
in mappings
]
159 for datacenter
in datacenters
:
160 self
.assertIn(datacenter
, [uuid('dc0'), uuid('dc1')])
163 wims
= [m
['wim_id'] for m
in mappings
]
165 self
.assertIsNot(wim
, None)
167 # and an array of pairs 'wan' <> 'pop' connections
168 pairs
= chain(*(m
['pop_wan_mappings'] for m
in mappings
))
169 self
.assertEqual(len(list(pairs
)), 2 * eg
.NUM_WIMS
)
171 def test_get_wim_port_mappings_multiple(self
):
172 # Given we have more than one connection in a datacenter managed by the
176 'wim_port_mappings': [
179 pop_dpid
='CC:CC:CC:CC:CC:CC:CC:CC',
180 wan_dpid
='DD:DD:DD:DD:DD:DD:DD:DD'),
183 pop_dpid
='EE:EE:EE:EE:EE:EE:EE:EE',
184 wan_dpid
='FF:FF:FF:FF:FF:FF:FF:FF')]}])
186 # when we retrieve the port mappings for the wim and datacenter:
188 self
.persist
.get_wim_port_mappings(wim
='wim0', datacenter
='dc0'))
190 # then it should return just a single result, grouped by wim and
192 self
.assertEqual(len(mappings
), 1)
193 self
.assertEqual(mappings
[0]['wim_id'], uuid('wim0'))
194 self
.assertEqual(mappings
[0]['datacenter_id'], uuid('dc0'))
196 self
.assertEqual(len(mappings
[0]['pop_wan_mappings']), 3)
198 # when we retrieve the mappings for more than one wim/datacenter
199 # the grouping should still work properly
200 mappings
= self
.persist
.get_wim_port_mappings(
201 wim
=['wim0', 'wim1'], datacenter
=['dc0', 'dc1'])
202 self
.assertEqual(len(mappings
), 4)
203 pairs
= chain(*(m
['pop_wan_mappings'] for m
in mappings
))
204 self
.assertEqual(len(list(pairs
)), 6)
206 def test_get_actions_in_group(self
):
207 # Given a good number of wim actions exist in the database
208 kwargs
= {'action_id': uuid('action0')}
209 actions
= (eg
.wim_actions('CREATE', num_links
=8, **kwargs
) +
210 eg
.wim_actions('FIND', num_links
=8, **kwargs
) +
211 eg
.wim_actions('START', num_links
=8, **kwargs
))
212 for i
, action
in enumerate(actions
):
213 action
['task_index'] = i
216 {'nfvo_tenants': eg
.tenant()}
218 {'instance_actions': eg
.instance_action(**kwargs
)},
219 {'vim_wim_actions': actions
}
222 # When we retrieve them in groups
224 results
= self
.persist
.get_actions_in_groups(
225 uuid('wim-account00'), ['instance_wim_nets'], group_limit
=limit
)
227 # Then we should have N groups where N == limit
228 self
.assertEqual(len(results
), limit
)
229 for _
, task_list
in results
:
230 # And since for each link we have created 3 actions (create, find,
231 # start), we should find them in each group
232 self
.assertEqual(len(task_list
), 3)
@disable_foreign_keys
def test_update_instance_action_counters(self):
    # Given we have one instance action in the database, created with
    # num_tasks=2
    action = eg.instance_action(num_tasks=2)
    self.populate([{'instance_actions': action}])
    action_id = action['uuid']

    # Each step pairs the counter increments to apply with the totals
    # expected afterwards, written as (number_done, number_failed).
    steps = (
        # updating the done counter by 0: nothing should happen
        ({'done': 0}, (0, 0)),
        # updating the done counter by 2: number_done should be 2
        ({'done': 2}, (2, 0)),
        # updating done by -1 and failed by 1: we should see 1 and 1
        ({'done': -1, 'failed': 1}, (1, 1)),
    )
    for increments, (done, failed) in steps:
        # When we update the counters
        self.persist.update_instance_action_counters(action_id, **increments)
        # Then the stored record should show the expected totals
        record = self.persist.get_by_uuid('instance_actions', action_id)
        self.assertEqual(record['number_done'], done)
        self.assertEqual(record['number_failed'], failed)
259 if __name__
== '__main__':