inject_user_key routine fixes
[osm/RO.git] / osm_ro / wim / tests / test_wim_thread.py
1 # -*- coding: utf-8 -*-
2 ##
3 # Copyright 2018 University of Bristol - High Performance Networks Research
4 # Group
5 # All Rights Reserved.
6 #
7 # Contributors: Anderson Bravalheri, Dimitrios Gkounis, Abubakar Siddique
8 # Muqaddas, Navdeep Uniyal, Reza Nejabati and Dimitra Simeonidou
9 #
10 # Licensed under the Apache License, Version 2.0 (the "License"); you may
11 # not use this file except in compliance with the License. You may obtain
12 # a copy of the License at
13 #
14 # http://www.apache.org/licenses/LICENSE-2.0
15 #
16 # Unless required by applicable law or agreed to in writing, software
17 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
18 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
19 # License for the specific language governing permissions and limitations
20 # under the License.
21 #
22 # For those usages not covered by the Apache License, Version 2.0 please
23 # contact with: <highperformance-networks@bristol.ac.uk>
24 #
25 # Neither the name of the University of Bristol nor the names of its
26 # contributors may be used to endorse or promote products derived from
27 # this software without specific prior written permission.
28 #
29 # This work has been performed in the context of DCMS UK 5G Testbeds
30 # & Trials Programme and in the framework of the Metro-Haul project -
31 # funded by the European Commission under Grant number 761727 through the
32 # Horizon 2020 and 5G-PPP programmes.
33 ##
34
35 from __future__ import unicode_literals, print_function
36
37 import unittest
38 from difflib import unified_diff
39 from operator import itemgetter
40 from time import time
41
42 import json
43
44 from mock import MagicMock, patch
45
46 from . import fixtures as eg
47 from ...tests.db_helpers import (
48 TestCaseWithDatabasePerTest,
49 disable_foreign_keys,
50 uuid
51 )
52 from ..engine import WimEngine
53 from ..persistence import WimPersistence
54 from ..wim_thread import WimThread
55
56
# Decorator that swaps the connector registry looked up by ``wim_thread`` for
# a mock while a decorated test case runs.
ignore_connector = patch(
    'osm_ro.wim.wim_thread.CONNECTORS',
    new=MagicMock(),
)
58
59
60 def _repr(value):
61 return json.dumps(value, indent=4, sort_keys=True)
62
63
@ignore_connector
class TestWimThreadWithDb(TestCaseWithDatabasePerTest):
    """Exercise ``WimThread`` on top of a ``WimPersistence`` built from
    ``self.db``, with the thread's connector replaced by a ``MagicMock``.
    """

    def setUp(self):
        """Create the persistence object and the thread under test."""
        super(TestWimThreadWithDb, self).setUp()
        self.persist = WimPersistence(self.db)
        wim = eg.wim(0)
        account = eg.wim_account(0, 0)
        account['wim'] = wim
        self.thread = WimThread(self.persist, account)
        self.thread.connector = MagicMock()

    def _store_actions(self, actions, **kwargs):
        """Populate the database with the records needed by ``actions``.

        Inserts a tenant, the records returned by ``eg.wim_set()``, one
        instance action built from ``kwargs`` and finally ``actions``
        themselves (as ``vim_wim_actions``).
        """
        self.populate([
            {'nfvo_tenants': eg.tenant()}
        ] + eg.wim_set() + [
            {'instance_actions': eg.instance_action(**kwargs)},
            {'vim_wim_actions': actions}
        ])

    @staticmethod
    def _number_tasks(actions):
        """Set ``task_index`` of every action to its position in the list.

        The list is modified in place and also returned for convenience.
        """
        for i, action in enumerate(actions):
            action['task_index'] = i
        return actions

    def assertTasksEqual(self, left, right):
        """Assert that two task collections describe the same tasks.

        Only the ``item``, ``item_id``, ``action`` and ``status`` fields are
        compared, and the order of the tasks is ignored.

        Arguments:
            left: iterable of task objects exposing ``as_dict()``
            right: iterable of dict-like task records

        Raises:
            AssertionError: when the collections differ; a dump of both
                sides and a unified diff are printed before re-raising.
        """
        fields = itemgetter('item', 'item_id', 'action', 'status')
        left_ = [fields(t.as_dict()) for t in left]
        right_ = [fields(t) for t in right]

        # ``assertItemsEqual`` only exists on Python 2; it was renamed to
        # ``assertCountEqual`` on Python 3. Use whichever is available.
        assert_same_items = (getattr(self, 'assertCountEqual', None) or
                             self.assertItemsEqual)

        try:
            assert_same_items(left_, right_)
        except AssertionError:
            # Dump the projected fields, not the raw task objects: the raw
            # objects need ``as_dict()`` before they can be serialised, and a
            # serialisation error here would hide the assertion failure.
            print('left', _repr(left_))
            print('left', len(left_), 'items')
            print('right', len(right_), 'items')
            result = list(unified_diff(_repr(sorted(left_)).split('\n'),
                                       _repr(sorted(right_)).split('\n'),
                                       'left', 'right'))
            print('diff:\n', '\n'.join(result))
            raise

    def test_reload_actions__all_create(self):
        # Given we have 3 CREATE actions stored in the database
        action_id = uuid('action0')
        actions = eg.wim_actions('CREATE', action_id=action_id, num_links=3)
        self._store_actions(actions, action_id=action_id)

        # When we reload the tasks
        self.thread.reload_actions()
        # All of them should be inserted as pending
        self.assertTasksEqual(self.thread.pending_tasks, actions)

    def test_reload_actions__all_refresh(self):
        # Given just DONE tasks are in the database
        action_id = uuid('action0')
        actions = eg.wim_actions(status='DONE',
                                 action_id=action_id, num_links=3)
        self._store_actions(actions, action_id=action_id)

        # When we reload the tasks
        self.thread.reload_actions()
        # All of them should be inserted as refresh
        self.assertTasksEqual(self.thread.refresh_tasks, actions)

    def test_reload_actions__grouped(self):
        # Given we have 2 tasks for the same item in the database
        kwargs = {'action_id': uuid('action0')}
        actions = self._number_tasks(
            eg.wim_actions('CREATE', **kwargs) +
            eg.wim_actions('FIND', **kwargs))
        self._store_actions(actions, **kwargs)

        # When we reload the tasks
        self.thread.reload_actions()
        # Just one group should be created
        self.assertEqual(len(self.thread.grouped_tasks.values()), 1)

    def test_reload_actions__delete_scheduled(self):
        # Given we have 3 tasks for the same item in the database, but one of
        # them is a DELETE task and it is SCHEDULED
        kwargs = {'action_id': uuid('action0')}
        actions = self._number_tasks(
            eg.wim_actions('CREATE', **kwargs) +
            eg.wim_actions('FIND', **kwargs) +
            eg.wim_actions('DELETE', status='SCHEDULED', **kwargs))
        self._store_actions(actions, **kwargs)

        # When we reload the tasks
        self.thread.reload_actions()
        # Just one group should be created
        self.assertEqual(len(self.thread.grouped_tasks.values()), 1)

    def test_reload_actions__delete_done(self):
        # Given we have 3 tasks for the same item in the database, but one of
        # them is a DELETE task and it is not SCHEDULED
        kwargs = {'action_id': uuid('action0')}
        actions = self._number_tasks(
            eg.wim_actions('CREATE', **kwargs) +
            eg.wim_actions('FIND', **kwargs) +
            eg.wim_actions('DELETE', status='DONE', **kwargs))
        self._store_actions(actions, **kwargs)

        # When we reload the tasks
        self.thread.reload_actions()
        # No pending task should be found
        self.assertEqual(self.thread.pending_tasks, [])

    def test_reload_actions__batch(self):
        # Given the group_limit is 10, and we have 24 tasks
        group_limit = 10
        kwargs = {'action_id': uuid('action0')}
        actions = self._number_tasks(
            eg.wim_actions('CREATE', num_links=8, **kwargs) +
            eg.wim_actions('FIND', num_links=8, **kwargs) +
            eg.wim_actions('FIND', num_links=8, **kwargs))
        self._store_actions(actions, **kwargs)

        # When we reload the tasks
        self.thread.reload_actions(group_limit)

        # Then we should still see the actions in memory properly
        self.assertTasksEqual(self.thread.pending_tasks, actions)
        self.assertEqual(len(self.thread.grouped_tasks.values()), 8)

    @disable_foreign_keys
    def test_process_list__refresh(self):
        # Each double is built from the persistence method it stands in for
        # (``update_action`` used to be built from ``update_wan_link``).
        # NOTE(review): ``wrap=`` is not mock's ``wraps=`` keyword, so these
        # doubles only record calls and never delegate to the real
        # persistence methods. Confirm whether delegation was intended
        # before switching, since it would make this test write to the
        # database.
        update_wan_link = MagicMock(wrap=self.persist.update_wan_link)
        update_action = MagicMock(wrap=self.persist.update_action)
        patches = dict(update_wan_link=update_wan_link,
                       update_action=update_action)

        with patch.multiple(self.persist, **patches):
            # Given we have 2 tasks in the refresh queue
            kwargs = {'action_id': uuid('action0')}
            actions = self._number_tasks(
                eg.wim_actions('FIND', 'DONE', **kwargs) +
                eg.wim_actions('CREATE', 'BUILD', **kwargs))

            self.populate(
                [{'instance_wim_nets': eg.instance_wim_nets()}] +
                [{'instance_actions':
                  eg.instance_action(num_tasks=2, **kwargs)}] +
                [{'vim_wim_actions': actions}])

            self.thread.insert_pending_tasks(actions)

            # When we process the refresh list
            processed = self.thread.process_list('refresh')

            # Then we should have 2 updates
            self.assertEqual(processed, 2)

            # And the persistence layer should be asked to update accordingly
            self.assertEqual(update_wan_link.call_count, 2)
            self.assertEqual(update_action.call_count, 2)

    @disable_foreign_keys
    def test_delete_superseed_create(self):
        # Given we insert a scheduled CREATE task
        instance_action = eg.instance_action(num_tasks=1)
        self.thread.pending_tasks = []
        engine = WimEngine(persistence=self.persist)
        self.addCleanup(engine.stop_threads)
        wan_links = eg.instance_wim_nets()
        create_actions = engine.create_actions(wan_links)
        delete_actions = engine.delete_actions(wan_links)
        engine.incorporate_actions(create_actions + delete_actions,
                                   instance_action)

        self.populate(instance_actions=instance_action,
                      vim_wim_actions=create_actions + delete_actions)

        self.thread.insert_pending_tasks(create_actions)

        # unittest assertions are used instead of bare ``assert`` so the
        # checks are not stripped when Python runs with ``-O``
        self.assertTrue(self.thread.pending_tasks[0].is_scheduled)

        # When we insert the equivalent DELETE task
        self.thread.insert_pending_tasks(delete_actions)

        # Then the CREATE task should be superseded
        self.assertEqual(self.thread.pending_tasks[0].action, 'CREATE')
        self.assertTrue(self.thread.pending_tasks[0].is_superseded)

        # And after processing both lists no pending task should remain
        self.thread.process_list('pending')
        self.thread.process_list('refresh')
        self.assertFalse(self.thread.pending_tasks)
278
279
@ignore_connector
class TestWimThread(unittest.TestCase):
    """Exercise ``WimThread.process_list`` with both the persistence object
    and the connector replaced by ``MagicMock`` instances.
    """

    def setUp(self):
        """Build a thread whose collaborators are all mocks."""
        wim_record = eg.wim(0)
        wim_account = eg.wim_account(0, 0)
        wim_account['wim'] = wim_record

        self.persist = MagicMock()
        thread = WimThread(self.persist, wim_account)
        thread.connector = MagicMock()
        self.thread = thread

        super(TestWimThread, self).setUp()

    def test_process_refresh(self):
        # Given 30 FIND/DONE tasks were handed to the thread
        tasks = eg.wim_actions(
            'FIND', 'DONE', num_links=30, action_id=uuid('action0'))
        self.thread.insert_pending_tasks(tasks)

        # When the refresh list is processed
        count = self.thread.process_list('refresh')

        # Then the number of processed tasks equals the thread's BATCH value
        self.assertEqual(count, self.thread.BATCH)

    def test_process_refresh__with_superseded(self):
        # Given 30 FIND/DONE tasks, every other one marked as SUPERSEDED
        tasks = eg.wim_actions(
            'FIND', 'DONE', num_links=30, action_id=uuid('action0'))
        self.thread.insert_pending_tasks(tasks)
        for superseded in self.thread.refresh_tasks[:30:2]:
            superseded.status = 'SUPERSEDED'

        # Reference instant used below to tell rescheduled tasks apart
        now = time()

        # When the refresh list is processed
        count = self.thread.process_list('refresh')

        # Then 25 tasks are reported as processed (SUPERSEDED updates are
        # cheap, so they are not counted for the limits)
        self.assertEqual(count, 25)

        # The SUPERSEDED tasks should be gone, 5 tasks should be untouched,
        # and 10 tasks should have been rescheduled
        remaining = self.thread.refresh_tasks
        untouched = sum(1 for t in remaining if t.process_at <= now)
        rescheduled = sum(1 for t in remaining if t.process_at > now)
        self.assertEqual(untouched, 5)
        self.assertEqual(rescheduled, 10)
        self.assertEqual(len(self.thread.refresh_tasks), 15)
329
330
# Run the test cases of this module when it is executed directly as a script.
if __name__ == '__main__':
    unittest.main()