1 # -*- coding: utf-8 -*-
3 # Copyright 2018 University of Bristol - High Performance Networks Research
7 # Contributors: Anderson Bravalheri, Dimitrios Gkounis, Abubakar Siddique
8 # Muqaddas, Navdeep Uniyal, Reza Nejabati and Dimitra Simeonidou
10 # Licensed under the Apache License, Version 2.0 (the "License"); you may
11 # not use this file except in compliance with the License. You may obtain
12 # a copy of the License at
14 # http://www.apache.org/licenses/LICENSE-2.0
16 # Unless required by applicable law or agreed to in writing, software
17 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
18 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
19 # License for the specific language governing permissions and limitations
22 # For those usages not covered by the Apache License, Version 2.0 please
23 # contact with: <highperformance-networks@bristol.ac.uk>
25 # Neither the name of the University of Bristol nor the names of its
26 # contributors may be used to endorse or promote products derived from
27 # this software without specific prior written permission.
29 # This work has been performed in the context of DCMS UK 5G Testbeds
30 # & Trials Programme and in the framework of the Metro-Haul project -
31 # funded by the European Commission under Grant number 761727 through the
32 # Horizon 2020 and 5G-PPP programmes.
35 from __future__
import unicode_literals
, print_function
38 from difflib
import unified_diff
39 from operator
import itemgetter
44 from mock
import MagicMock
, patch
46 from . import fixtures
as eg
47 from ...tests
.db_helpers
import (
48 TestCaseWithDatabasePerTest
,
52 from ..engine
import WimEngine
53 from ..persistence
import WimPersistence
54 from ..wim_thread
import WimThread
57 ignore_connector
= patch('osm_ro.wim.wim_thread.CONNECTORS', MagicMock())
61 return json
.dumps(value
, indent
=4, sort_keys
=True)
65 class TestWimThreadWithDb(TestCaseWithDatabasePerTest
):
67 super(TestWimThreadWithDb
, self
).setUp()
68 self
.persist
= WimPersistence(self
.db
)
70 account
= eg
.wim_account(0, 0)
72 self
.thread
= WimThread(self
.persist
, account
)
73 self
.thread
.connector
= MagicMock()
75 def assertTasksEqual(self
, left
, right
):
76 fields
= itemgetter('item', 'item_id', 'action', 'status')
77 left_
= (t
.as_dict() for t
in left
)
78 left_
= [fields(t
) for t
in left_
]
79 right_
= [fields(t
) for t
in right
]
82 self
.assertItemsEqual(left_
, right_
)
83 except AssertionError:
84 print('left', _repr(left
))
85 print('left', len(left_
), 'items')
86 print('right', len(right_
), 'items')
87 result
= list(unified_diff(_repr(sorted(left_
)).split('\n'),
88 _repr(sorted(right_
)).split('\n'),
90 print('diff:\n', '\n'.join(result
))
93 def test_reload_actions__all_create(self
):
94 # Given we have 3 CREATE actions stored in the database
95 actions
= eg
.wim_actions('CREATE',
96 action_id
=uuid('action0'), num_links
=3)
98 {'nfvo_tenants': eg
.tenant()}
101 eg
.instance_action(action_id
=uuid('action0'))},
102 {'vim_wim_actions': actions
}
105 # When we reload the tasks
106 self
.thread
.reload_actions()
107 # All of them should be inserted as pending
108 self
.assertTasksEqual(self
.thread
.pending_tasks
, actions
)
110 def test_reload_actions__all_refresh(self
):
111 # Given just DONE tasks are in the database
112 actions
= eg
.wim_actions(status
='DONE',
113 action_id
=uuid('action0'), num_links
=3)
115 {'nfvo_tenants': eg
.tenant()}
118 eg
.instance_action(action_id
=uuid('action0'))},
119 {'vim_wim_actions': actions
}
122 # When we reload the tasks
123 self
.thread
.reload_actions()
124 # All of them should be inserted as refresh
125 self
.assertTasksEqual(self
.thread
.refresh_tasks
, actions
)
127 def test_reload_actions__grouped(self
):
128 # Given we have 2 tasks for the same item in the database
129 kwargs
= {'action_id': uuid('action0')}
130 actions
= (eg
.wim_actions('CREATE', **kwargs
) +
131 eg
.wim_actions('FIND', **kwargs
))
132 for i
, action
in enumerate(actions
):
133 action
['task_index'] = i
136 {'nfvo_tenants': eg
.tenant()}
138 {'instance_actions': eg
.instance_action(**kwargs
)},
139 {'vim_wim_actions': actions
}
142 # When we reload the tasks
143 self
.thread
.reload_actions()
144 # Just one group should be created
145 self
.assertEqual(len(self
.thread
.grouped_tasks
.values()), 1)
147 def test_reload_actions__delete_scheduled(self
):
148 # Given we have 3 tasks for the same item in the database, but one of
149 # them is a DELETE task and it is SCHEDULED
150 kwargs
= {'action_id': uuid('action0')}
151 actions
= (eg
.wim_actions('CREATE', **kwargs
) +
152 eg
.wim_actions('FIND', **kwargs
) +
153 eg
.wim_actions('DELETE', status
='SCHEDULED', **kwargs
))
154 for i
, action
in enumerate(actions
):
155 action
['task_index'] = i
158 {'nfvo_tenants': eg
.tenant()}
160 {'instance_actions': eg
.instance_action(**kwargs
)},
161 {'vim_wim_actions': actions
}
164 # When we reload the tasks
165 self
.thread
.reload_actions()
166 # Just one group should be created
167 self
.assertEqual(len(self
.thread
.grouped_tasks
.values()), 1)
169 def test_reload_actions__delete_done(self
):
170 # Given we have 3 tasks for the same item in the database, but one of
171 # them is a DELETE task and it is not SCHEDULED
172 kwargs
= {'action_id': uuid('action0')}
173 actions
= (eg
.wim_actions('CREATE', **kwargs
) +
174 eg
.wim_actions('FIND', **kwargs
) +
175 eg
.wim_actions('DELETE', status
='DONE', **kwargs
))
176 for i
, action
in enumerate(actions
):
177 action
['task_index'] = i
180 {'nfvo_tenants': eg
.tenant()}
182 {'instance_actions': eg
.instance_action(**kwargs
)},
183 {'vim_wim_actions': actions
}
186 # When we reload the tasks
187 self
.thread
.reload_actions()
188 # No pending task should be found
189 self
.assertEqual(self
.thread
.pending_tasks
, [])
191 def test_reload_actions__batch(self
):
192 # Given the group_limit is 10, and we have 24
194 kwargs
= {'action_id': uuid('action0')}
195 actions
= (eg
.wim_actions('CREATE', num_links
=8, **kwargs
) +
196 eg
.wim_actions('FIND', num_links
=8, **kwargs
) +
197 eg
.wim_actions('FIND', num_links
=8, **kwargs
))
198 for i
, action
in enumerate(actions
):
199 action
['task_index'] = i
202 {'nfvo_tenants': eg
.tenant()}
204 {'instance_actions': eg
.instance_action(**kwargs
)},
205 {'vim_wim_actions': actions
}
208 # When we reload the tasks
209 self
.thread
.reload_actions(group_limit
)
211 # Then we should still see the actions in memory properly
212 self
.assertTasksEqual(self
.thread
.pending_tasks
, actions
)
213 self
.assertEqual(len(self
.thread
.grouped_tasks
.values()), 8)
215 @disable_foreign_keys
216 def test_process_list__refresh(self
):
217 update_wan_link
= MagicMock(wrap
=self
.persist
.update_wan_link
)
218 update_action
= MagicMock(wrap
=self
.persist
.update_wan_link
)
219 patches
= dict(update_wan_link
=update_wan_link
,
220 update_action
=update_action
)
222 with patch
.multiple(self
.persist
, **patches
):
223 # Given we have 2 tasks in the refresh queue
224 kwargs
= {'action_id': uuid('action0')}
225 actions
= (eg
.wim_actions('FIND', 'DONE', **kwargs
) +
226 eg
.wim_actions('CREATE', 'BUILD', **kwargs
))
227 for i
, action
in enumerate(actions
):
228 action
['task_index'] = i
231 [{'instance_wim_nets': eg
.instance_wim_nets()}] +
232 [{'instance_actions':
233 eg
.instance_action(num_tasks
=2, **kwargs
)}] +
234 [{'vim_wim_actions': actions
}])
236 self
.thread
.insert_pending_tasks(actions
)
238 # When we process the refresh list
239 processed
= self
.thread
.process_list('refresh')
241 # Then we should have 2 updates
242 self
.assertEqual(processed
, 2)
244 # And the database should be updated accordingly
245 self
.assertEqual(update_wan_link
.call_count
, 2)
246 self
.assertEqual(update_action
.call_count
, 2)
248 @disable_foreign_keys
249 def test_delete_superseed_create(self
):
250 # Given we insert a scheduled CREATE task
251 instance_action
= eg
.instance_action(num_tasks
=1)
252 self
.thread
.pending_tasks
= []
253 engine
= WimEngine(persistence
=self
.persist
)
254 self
.addCleanup(engine
.stop_threads
)
255 wan_links
= eg
.instance_wim_nets()
256 create_actions
= engine
.create_actions(wan_links
)
257 delete_actions
= engine
.delete_actions(wan_links
)
258 engine
.incorporate_actions(create_actions
+ delete_actions
,
261 self
.populate(instance_actions
=instance_action
,
262 vim_wim_actions
=create_actions
+ delete_actions
)
264 self
.thread
.insert_pending_tasks(create_actions
)
266 assert self
.thread
.pending_tasks
[0].is_scheduled
268 # When we insert the equivalent DELETE task
269 self
.thread
.insert_pending_tasks(delete_actions
)
271 # Then the CREATE task should be superseded
272 self
.assertEqual(self
.thread
.pending_tasks
[0].action
, 'CREATE')
273 assert self
.thread
.pending_tasks
[0].is_superseded
275 self
.thread
.process_list('pending')
276 self
.thread
.process_list('refresh')
277 self
.assertFalse(self
.thread
.pending_tasks
)
281 class TestWimThread(unittest
.TestCase
):
284 account
= eg
.wim_account(0, 0)
286 self
.persist
= MagicMock()
287 self
.thread
= WimThread(self
.persist
, account
)
288 self
.thread
.connector
= MagicMock()
290 super(TestWimThread
, self
).setUp()
292 def test_process_refresh(self
):
293 # Given we have 30 tasks in the refresh queue
294 kwargs
= {'action_id': uuid('action0')}
295 actions
= eg
.wim_actions('FIND', 'DONE', num_links
=30, **kwargs
)
296 self
.thread
.insert_pending_tasks(actions
)
298 # When we process the refresh list
299 processed
= self
.thread
.process_list('refresh')
301 # Then we should have REFRESH_BATCH updates
302 self
.assertEqual(processed
, self
.thread
.BATCH
)
304 def test_process_refresh__with_superseded(self
):
305 # Given we have 30 tasks but 15 of them are superseded
306 kwargs
= {'action_id': uuid('action0')}
307 actions
= eg
.wim_actions('FIND', 'DONE', num_links
=30, **kwargs
)
308 self
.thread
.insert_pending_tasks(actions
)
309 for task
in self
.thread
.refresh_tasks
[0:30:2]:
310 task
.status
= 'SUPERSEDED'
314 # When we call the refresh_elements
315 processed
= self
.thread
.process_list('refresh')
317 # Then we should have 25 updates (since SUPERSEDED updates are cheap,
318 # they are not counted for the limits)
319 self
.assertEqual(processed
, 25)
321 # The SUPERSEDED tasks should be removed, 5 tasks should be untouched,
322 # and 10 tasks should be rescheduled
323 refresh_tasks
= self
.thread
.refresh_tasks
324 old
= [t
for t
in refresh_tasks
if t
.process_at
<= now
]
325 new
= [t
for t
in refresh_tasks
if t
.process_at
> now
]
326 self
.assertEqual(len(old
), 5)
327 self
.assertEqual(len(new
), 10)
328 self
.assertEqual(len(self
.thread
.refresh_tasks
), 15)
331 if __name__
== '__main__':