feature8030 move WIM connector to plugins
[osm/RO.git] / RO / osm_ro / wim / tests / test_wim_thread.py
1 # -*- coding: utf-8 -*-
2 ##
3 # Copyright 2018 University of Bristol - High Performance Networks Research
4 # Group
5 # All Rights Reserved.
6 #
7 # Contributors: Anderson Bravalheri, Dimitrios Gkounis, Abubakar Siddique
8 # Muqaddas, Navdeep Uniyal, Reza Nejabati and Dimitra Simeonidou
9 #
10 # Licensed under the Apache License, Version 2.0 (the "License"); you may
11 # not use this file except in compliance with the License. You may obtain
12 # a copy of the License at
13 #
14 # http://www.apache.org/licenses/LICENSE-2.0
15 #
16 # Unless required by applicable law or agreed to in writing, software
17 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
18 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
19 # License for the specific language governing permissions and limitations
20 # under the License.
21 #
22 # For those usages not covered by the Apache License, Version 2.0 please
23 # contact with: <highperformance-networks@bristol.ac.uk>
24 #
25 # Neither the name of the University of Bristol nor the names of its
26 # contributors may be used to endorse or promote products derived from
27 # this software without specific prior written permission.
28 #
29 # This work has been performed in the context of DCMS UK 5G Testbeds
30 # & Trials Programme and in the framework of the Metro-Haul project -
31 # funded by the European Commission under Grant number 761727 through the
32 # Horizon 2020 and 5G-PPP programmes.
33 ##
34
35 import unittest
36 from difflib import unified_diff
37 from operator import itemgetter
38 from time import time
39
40 import json
41
42 from unittest.mock import MagicMock, patch
43
44 from . import fixtures as eg
45 from ...tests.db_helpers import (
46 TestCaseWithDatabasePerTest,
47 disable_foreign_keys,
48 uuid
49 )
50 from ..engine import WimEngine
51 from ..persistence import WimPersistence
52 from ..wim_thread import WimThread
53
54
55 ignore_connector = patch('osm_ro.wim.wim_thread.CONNECTORS', MagicMock())
56
57
58 def _repr(value):
59 return json.dumps(value, indent=4, sort_keys=True)
60
61
62 @ignore_connector
63 class TestWimThreadWithDb(TestCaseWithDatabasePerTest):
64 def setUp(self):
65 super(TestWimThreadWithDb, self).setUp()
66 self.persist = WimPersistence(self.db)
67 wim = eg.wim(0)
68 account = eg.wim_account(0, 0)
69 account['wim'] = wim
70 self.thread = WimThread(self.persist, {}, account)
71 self.thread.connector = MagicMock()
72
73 def assertTasksEqual(self, left, right):
74 fields = itemgetter('item', 'item_id', 'action', 'status')
75 left_ = (t.as_dict() for t in left)
76 left_ = [fields(t) for t in left_]
77 right_ = [fields(t) for t in right]
78
79 try:
80 self.assertItemsEqual(left_, right_)
81 except AssertionError:
82 print('left', _repr(left))
83 print('left', len(left_), 'items')
84 print('right', len(right_), 'items')
85 result = list(unified_diff(_repr(sorted(left_)).split('\n'),
86 _repr(sorted(right_)).split('\n'),
87 'left', 'right'))
88 print('diff:\n', '\n'.join(result))
89 raise
90
91 def test_reload_actions__all_create(self):
92 # Given we have 3 CREATE actions stored in the database
93 actions = eg.wim_actions('CREATE',
94 action_id=uuid('action0'), num_links=3)
95 self.populate([
96 {'nfvo_tenants': eg.tenant()}
97 ] + eg.wim_set() + [
98 {'instance_actions':
99 eg.instance_action(action_id=uuid('action0'))},
100 {'vim_wim_actions': actions}
101 ])
102
103 # When we reload the tasks
104 self.thread.reload_actions()
105 # All of them should be inserted as pending
106 self.assertTasksEqual(self.thread.pending_tasks, actions)
107
108 def test_reload_actions__all_refresh(self):
109 # Given just DONE tasks are in the database
110 actions = eg.wim_actions(status='DONE',
111 action_id=uuid('action0'), num_links=3)
112 self.populate([
113 {'nfvo_tenants': eg.tenant()}
114 ] + eg.wim_set() + [
115 {'instance_actions':
116 eg.instance_action(action_id=uuid('action0'))},
117 {'vim_wim_actions': actions}
118 ])
119
120 # When we reload the tasks
121 self.thread.reload_actions()
122 # All of them should be inserted as refresh
123 self.assertTasksEqual(self.thread.refresh_tasks, actions)
124
125 def test_reload_actions__grouped(self):
126 # Given we have 2 tasks for the same item in the database
127 kwargs = {'action_id': uuid('action0')}
128 actions = (eg.wim_actions('CREATE', **kwargs) +
129 eg.wim_actions('FIND', **kwargs))
130 for i, action in enumerate(actions):
131 action['task_index'] = i
132
133 self.populate([
134 {'nfvo_tenants': eg.tenant()}
135 ] + eg.wim_set() + [
136 {'instance_actions': eg.instance_action(**kwargs)},
137 {'vim_wim_actions': actions}
138 ])
139
140 # When we reload the tasks
141 self.thread.reload_actions()
142 # Just one group should be created
143 self.assertEqual(len(self.thread.grouped_tasks.values()), 1)
144
145 def test_reload_actions__delete_scheduled(self):
146 # Given we have 3 tasks for the same item in the database, but one of
147 # them is a DELETE task and it is SCHEDULED
148 kwargs = {'action_id': uuid('action0')}
149 actions = (eg.wim_actions('CREATE', **kwargs) +
150 eg.wim_actions('FIND', **kwargs) +
151 eg.wim_actions('DELETE', status='SCHEDULED', **kwargs))
152 for i, action in enumerate(actions):
153 action['task_index'] = i
154
155 self.populate([
156 {'nfvo_tenants': eg.tenant()}
157 ] + eg.wim_set() + [
158 {'instance_actions': eg.instance_action(**kwargs)},
159 {'vim_wim_actions': actions}
160 ])
161
162 # When we reload the tasks
163 self.thread.reload_actions()
164 # Just one group should be created
165 self.assertEqual(len(self.thread.grouped_tasks.values()), 1)
166
167 def test_reload_actions__delete_done(self):
168 # Given we have 3 tasks for the same item in the database, but one of
169 # them is a DELETE task and it is not SCHEDULED
170 kwargs = {'action_id': uuid('action0')}
171 actions = (eg.wim_actions('CREATE', **kwargs) +
172 eg.wim_actions('FIND', **kwargs) +
173 eg.wim_actions('DELETE', status='DONE', **kwargs))
174 for i, action in enumerate(actions):
175 action['task_index'] = i
176
177 self.populate([
178 {'nfvo_tenants': eg.tenant()}
179 ] + eg.wim_set() + [
180 {'instance_actions': eg.instance_action(**kwargs)},
181 {'vim_wim_actions': actions}
182 ])
183
184 # When we reload the tasks
185 self.thread.reload_actions()
186 # No pending task should be found
187 self.assertEqual(self.thread.pending_tasks, [])
188
189 def test_reload_actions__batch(self):
190 # Given the group_limit is 10, and we have 24
191 group_limit = 10
192 kwargs = {'action_id': uuid('action0')}
193 actions = (eg.wim_actions('CREATE', num_links=8, **kwargs) +
194 eg.wim_actions('FIND', num_links=8, **kwargs) +
195 eg.wim_actions('FIND', num_links=8, **kwargs))
196 for i, action in enumerate(actions):
197 action['task_index'] = i
198
199 self.populate([
200 {'nfvo_tenants': eg.tenant()}
201 ] + eg.wim_set() + [
202 {'instance_actions': eg.instance_action(**kwargs)},
203 {'vim_wim_actions': actions}
204 ])
205
206 # When we reload the tasks
207 self.thread.reload_actions(group_limit)
208
209 # Then we should still see the actions in memory properly
210 self.assertTasksEqual(self.thread.pending_tasks, actions)
211 self.assertEqual(len(self.thread.grouped_tasks.values()), 8)
212
213 @disable_foreign_keys
214 def test_process_list__refresh(self):
215 update_wan_link = MagicMock(wrap=self.persist.update_wan_link)
216 update_action = MagicMock(wrap=self.persist.update_wan_link)
217 patches = dict(update_wan_link=update_wan_link,
218 update_action=update_action)
219
220 with patch.multiple(self.persist, **patches):
221 # Given we have 2 tasks in the refresh queue
222 kwargs = {'action_id': uuid('action0')}
223 actions = (eg.wim_actions('FIND', 'DONE', **kwargs) +
224 eg.wim_actions('CREATE', 'BUILD', **kwargs))
225 for i, action in enumerate(actions):
226 action['task_index'] = i
227
228 self.populate(
229 [{'instance_wim_nets': eg.instance_wim_nets()}] +
230 [{'instance_actions':
231 eg.instance_action(num_tasks=2, **kwargs)}] +
232 [{'vim_wim_actions': actions}])
233
234 self.thread.insert_pending_tasks(actions)
235
236 # When we process the refresh list
237 processed = self.thread.process_list('refresh')
238
239 # Then we should have 2 updates
240 self.assertEqual(processed, 2)
241
242 # And the database should be updated accordingly
243 self.assertEqual(update_wan_link.call_count, 2)
244 self.assertEqual(update_action.call_count, 2)
245
246 @disable_foreign_keys
247 def test_delete_superseed_create(self):
248 # Given we insert a scheduled CREATE task
249 instance_action = eg.instance_action(num_tasks=1)
250 self.thread.pending_tasks = []
251 engine = WimEngine(persistence=self.persist)
252 self.addCleanup(engine.stop_threads)
253 wan_links = eg.instance_wim_nets()
254 create_actions = engine.create_actions(wan_links)
255 delete_actions = engine.delete_actions(wan_links)
256 engine.incorporate_actions(create_actions + delete_actions,
257 instance_action)
258
259 self.populate(instance_actions=instance_action,
260 vim_wim_actions=create_actions + delete_actions)
261
262 self.thread.insert_pending_tasks(create_actions)
263
264 assert self.thread.pending_tasks[0].is_scheduled
265
266 # When we insert the equivalent DELETE task
267 self.thread.insert_pending_tasks(delete_actions)
268
269 # Then the CREATE task should be superseded
270 self.assertEqual(self.thread.pending_tasks[0].action, 'CREATE')
271 assert self.thread.pending_tasks[0].is_superseded
272
273 self.thread.process_list('pending')
274 self.thread.process_list('refresh')
275 self.assertFalse(self.thread.pending_tasks)
276
277
278 @ignore_connector
279 class TestWimThread(unittest.TestCase):
280 def setUp(self):
281 wim = eg.wim(0)
282 account = eg.wim_account(0, 0)
283 account['wim'] = wim
284 self.persist = MagicMock()
285 self.thread = WimThread(self.persist, {}, account)
286 self.thread.connector = MagicMock()
287
288 super(TestWimThread, self).setUp()
289
290 def test_process_refresh(self):
291 # Given we have 30 tasks in the refresh queue
292 kwargs = {'action_id': uuid('action0')}
293 actions = eg.wim_actions('FIND', 'DONE', num_links=30, **kwargs)
294 self.thread.insert_pending_tasks(actions)
295
296 # When we process the refresh list
297 processed = self.thread.process_list('refresh')
298
299 # Then we should have REFRESH_BATCH updates
300 self.assertEqual(processed, self.thread.BATCH)
301
302 def test_process_refresh__with_superseded(self):
303 # Given we have 30 tasks but 15 of them are superseded
304 kwargs = {'action_id': uuid('action0')}
305 actions = eg.wim_actions('FIND', 'DONE', num_links=30, **kwargs)
306 self.thread.insert_pending_tasks(actions)
307 for task in self.thread.refresh_tasks[0:30:2]:
308 task.status = 'SUPERSEDED'
309
310 now = time()
311
312 # When we call the refresh_elements
313 processed = self.thread.process_list('refresh')
314
315 # Then we should have 25 updates (since SUPERSEDED updates are cheap,
316 # they are not counted for the limits)
317 self.assertEqual(processed, 25)
318
319 # The SUPERSEDED tasks should be removed, 5 tasks should be untouched,
320 # and 10 tasks should be rescheduled
321 refresh_tasks = self.thread.refresh_tasks
322 old = [t for t in refresh_tasks if t.process_at <= now]
323 new = [t for t in refresh_tasks if t.process_at > now]
324 self.assertEqual(len(old), 5)
325 self.assertEqual(len(new), 10)
326 self.assertEqual(len(self.thread.refresh_tasks), 15)
327
328
329 if __name__ == '__main__':
330 unittest.main()