1 # -*- coding: utf-8 -*-
3 # Copyright 2018 University of Bristol - High Performance Networks Research
7 # Contributors: Anderson Bravalheri, Dimitrios Gkounis, Abubakar Siddique
8 # Muqaddas, Navdeep Uniyal, Reza Nejabati and Dimitra Simeonidou
10 # Licensed under the Apache License, Version 2.0 (the "License"); you may
11 # not use this file except in compliance with the License. You may obtain
12 # a copy of the License at
14 # http://www.apache.org/licenses/LICENSE-2.0
16 # Unless required by applicable law or agreed to in writing, software
17 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
18 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
19 # License for the specific language governing permissions and limitations
22 # For those usages not covered by the Apache License, Version 2.0 please
23 # contact with: <highperformance-networks@bristol.ac.uk>
25 # Neither the name of the University of Bristol nor the names of its
26 # contributors may be used to endorse or promote products derived from
27 # this software without specific prior written permission.
29 # This work has been performed in the context of DCMS UK 5G Testbeds
30 # & Trials Programme and in the framework of the Metro-Haul project -
31 # funded by the European Commission under Grant number 761727 through the
32 # Horizon 2020 and 5G-PPP programmes.
import json
import unittest
from difflib import unified_diff
from operator import itemgetter
from time import time
from unittest.mock import MagicMock, patch

from . import fixtures as eg
from ...tests.db_helpers import (
    TestCaseWithDatabasePerTest,
    disable_foreign_keys,
    uuid,
)
from ..engine import WimEngine
from ..persistence import WimPersistence
from ..wim_thread import WimThread
# Reusable patcher: while active, the ``CONNECTORS`` attribute of
# ``osm_ro.wim.wim_thread`` is replaced by a ``MagicMock``.
ignore_connector = patch('osm_ro.wim.wim_thread.CONNECTORS', MagicMock())
def _repr(value):
    """Return ``value`` serialised as JSON, indented by 4 and with sorted keys.

    The stable key order makes two dumps comparable line by line, which is
    what the diff printed by ``assertTasksEqual`` relies on.

    Raises:
        TypeError: if ``value`` contains objects ``json`` cannot serialise.
    """
    return json.dumps(value, indent=4, sort_keys=True)
# NOTE(review): ``ignore_connector`` is defined at module level but its use
# was not legible in the reviewed listing; applying it to the test classes is
# the presumed intent -- confirm.
@ignore_connector
class TestWimThreadWithDb(TestCaseWithDatabasePerTest):
    """Tests for ``WimThread`` backed by a ``WimPersistence`` on ``self.db``.

    Every test stores fixture records with ``self.populate`` and then checks
    how the thread loads, groups and processes the corresponding tasks.
    """

    def setUp(self):
        """Build the persistence layer and a thread with a mocked connector."""
        super(TestWimThreadWithDb, self).setUp()
        self.persist = WimPersistence(self.db)
        # NOTE(review): attaching the WIM record to the account was
        # reconstructed from a damaged listing -- confirm against fixtures.
        wim = eg.wim(0)
        account = eg.wim_account(0, 0)
        account['wim'] = wim
        self.thread = WimThread(self.persist, {}, account)
        self.thread.connector = MagicMock()

    # -- helpers ----------------------------------------------------------

    @staticmethod
    def _number_tasks(actions):
        """Set ``task_index`` of each action to its position in the list."""
        for index, action in enumerate(actions):
            action['task_index'] = index
        return actions

    def _populate_actions(self, actions, instance_action, extra_records=()):
        """Store a tenant, the WIM fixtures, ``extra_records`` and the actions.

        Args:
            actions: records for the ``vim_wim_actions`` table.
            instance_action: record for the ``instance_actions`` table.
            extra_records: additional ``{table: records}`` dicts inserted
                before the instance action.
        """
        records = [{'nfvo_tenants': eg.tenant()}]
        # NOTE(review): ``eg.wim_set()`` reconstructed from a damaged
        # listing; presumably yields the WIM/account records -- confirm.
        records.extend(eg.wim_set())
        records.extend(extra_records)
        records.append({'instance_actions': instance_action})
        records.append({'vim_wim_actions': actions})
        self.populate(records)

    def assertTasksEqual(self, left, right):
        """Assert that tasks and action records describe the same items.

        Args:
            left: task objects exposing ``as_dict()``.
            right: action records (mappings).

        Only ``item``, ``item_id``, ``action`` and ``status`` are compared,
        ignoring order.  On mismatch a diff is printed and the original
        ``AssertionError`` is re-raised.
        """
        fields = itemgetter('item', 'item_id', 'action', 'status')
        left_ = [fields(task.as_dict()) for task in left]
        right_ = [fields(record) for record in right]

        # ``assertItemsEqual`` only exists on Python 2; its Python 3 name is
        # ``assertCountEqual``.  Prefer whichever the test case provides.
        assert_same_items = (getattr(self, 'assertItemsEqual', None) or
                             self.assertCountEqual)
        try:
            assert_same_items(left_, right_)
        except AssertionError:
            # NOTE(review): ``left`` holds task objects; if they are not JSON
            # serialisable this print raises TypeError -- confirm.
            print('left', _repr(left))
            print('left', len(left_), 'items')
            print('right', len(right_), 'items')
            result = list(unified_diff(_repr(sorted(left_)).split('\n'),
                                       _repr(sorted(right_)).split('\n'),
                                       'left', 'right', lineterm=''))
            print('diff:\n', '\n'.join(result))
            # Diagnostics are only a debugging aid: the failure must surface
            raise

    # -- reload_actions ---------------------------------------------------

    def test_reload_actions__all_create(self):
        # Given we have 3 CREATE actions stored in the database
        action_id = uuid('action0')
        actions = eg.wim_actions('CREATE', action_id=action_id, num_links=3)
        self._populate_actions(
            actions, eg.instance_action(action_id=action_id))

        # When we reload the tasks
        self.thread.reload_actions()

        # All of them should be inserted as pending
        self.assertTasksEqual(self.thread.pending_tasks, actions)

    def test_reload_actions__all_refresh(self):
        # Given just DONE tasks are in the database
        action_id = uuid('action0')
        actions = eg.wim_actions(status='DONE',
                                 action_id=action_id, num_links=3)
        self._populate_actions(
            actions, eg.instance_action(action_id=action_id))

        # When we reload the tasks
        self.thread.reload_actions()

        # All of them should be inserted as refresh
        self.assertTasksEqual(self.thread.refresh_tasks, actions)

    def test_reload_actions__grouped(self):
        # Given we have 2 tasks for the same item in the database
        kwargs = {'action_id': uuid('action0')}
        actions = self._number_tasks(
            eg.wim_actions('CREATE', **kwargs) +
            eg.wim_actions('FIND', **kwargs))
        self._populate_actions(actions, eg.instance_action(**kwargs))

        # When we reload the tasks
        self.thread.reload_actions()

        # Just one group should be created
        self.assertEqual(len(self.thread.grouped_tasks.values()), 1)

    def test_reload_actions__delete_scheduled(self):
        # Given we have 3 tasks for the same item in the database, but one of
        # them is a DELETE task and it is SCHEDULED
        kwargs = {'action_id': uuid('action0')}
        actions = self._number_tasks(
            eg.wim_actions('CREATE', **kwargs) +
            eg.wim_actions('FIND', **kwargs) +
            eg.wim_actions('DELETE', status='SCHEDULED', **kwargs))
        self._populate_actions(actions, eg.instance_action(**kwargs))

        # When we reload the tasks
        self.thread.reload_actions()

        # Just one group should be created
        self.assertEqual(len(self.thread.grouped_tasks.values()), 1)

    def test_reload_actions__delete_done(self):
        # Given we have 3 tasks for the same item in the database, but one of
        # them is a DELETE task and it is not SCHEDULED
        kwargs = {'action_id': uuid('action0')}
        actions = self._number_tasks(
            eg.wim_actions('CREATE', **kwargs) +
            eg.wim_actions('FIND', **kwargs) +
            eg.wim_actions('DELETE', status='DONE', **kwargs))
        self._populate_actions(actions, eg.instance_action(**kwargs))

        # When we reload the tasks
        self.thread.reload_actions()

        # No pending task should be found
        self.assertEqual(self.thread.pending_tasks, [])

    def test_reload_actions__batch(self):
        # Given the group_limit is 10, and we have 24 actions
        # (8 CREATE + 16 FIND)
        group_limit = 10
        kwargs = {'action_id': uuid('action0')}
        actions = self._number_tasks(
            eg.wim_actions('CREATE', num_links=8, **kwargs) +
            eg.wim_actions('FIND', num_links=8, **kwargs) +
            eg.wim_actions('FIND', num_links=8, **kwargs))
        self._populate_actions(actions, eg.instance_action(**kwargs))

        # When we reload the tasks
        self.thread.reload_actions(group_limit)

        # Then we should still see the actions in memory properly
        self.assertTasksEqual(self.thread.pending_tasks, actions)
        self.assertEqual(len(self.thread.grouped_tasks.values()), 8)

    # -- process_list -----------------------------------------------------

    @disable_foreign_keys
    def test_process_list__refresh(self):
        # NOTE(review): ``MagicMock`` has no ``wrap`` argument (the
        # pass-through option is spelled ``wraps``), so these mocks only
        # record calls and never reach the database.  Left unchanged because
        # switching to ``wraps`` would feed values produced by the mocked
        # connector into the real persistence layer -- confirm the intent.
        update_wan_link = MagicMock(wrap=self.persist.update_wan_link)
        update_action = MagicMock(wrap=self.persist.update_action)
        patches = dict(update_wan_link=update_wan_link,
                       update_action=update_action)

        with patch.multiple(self.persist, **patches):
            # Given we have 2 tasks in the refresh queue
            kwargs = {'action_id': uuid('action0')}
            actions = self._number_tasks(
                eg.wim_actions('FIND', 'DONE', **kwargs) +
                eg.wim_actions('CREATE', 'BUILD', **kwargs))
            self._populate_actions(
                actions,
                eg.instance_action(num_tasks=2, **kwargs),
                extra_records=[
                    {'instance_wim_nets': eg.instance_wim_nets()},
                ])

            self.thread.insert_pending_tasks(actions)

            # When we process the refresh list
            processed = self.thread.process_list('refresh')

            # Then we should have 2 updates
            self.assertEqual(processed, 2)

            # And the persistence layer should be called accordingly
            self.assertEqual(update_wan_link.call_count, 2)
            self.assertEqual(update_action.call_count, 2)

    @disable_foreign_keys
    def test_delete_superseed_create(self):
        # Given we insert a scheduled CREATE task
        instance_action = eg.instance_action(num_tasks=1)
        self.thread.pending_tasks = []
        engine = WimEngine(persistence=self.persist)
        self.addCleanup(engine.stop_threads)
        wan_links = eg.instance_wim_nets()
        create_actions = engine.create_actions(wan_links)
        delete_actions = engine.delete_actions(wan_links)
        all_actions = create_actions + delete_actions
        # NOTE(review): second argument reconstructed from a damaged
        # listing -- confirm.
        engine.incorporate_actions(all_actions, instance_action)

        self.populate(instance_actions=instance_action,
                      vim_wim_actions=all_actions)

        self.thread.insert_pending_tasks(create_actions)

        self.assertTrue(self.thread.pending_tasks[0].is_scheduled)

        # When we insert the equivalent DELETE task
        self.thread.insert_pending_tasks(delete_actions)

        # Then the CREATE task should be superseded
        self.assertEqual(self.thread.pending_tasks[0].action, 'CREATE')
        self.assertTrue(self.thread.pending_tasks[0].is_superseded)

        # And processing both queues should leave nothing pending
        self.thread.process_list('pending')
        self.thread.process_list('refresh')
        self.assertFalse(self.thread.pending_tasks)
# NOTE(review): ``ignore_connector`` is defined at module level but its use
# was not legible in the reviewed listing; applying it to the test classes is
# the presumed intent -- confirm.
@ignore_connector
class TestWimThread(unittest.TestCase):
    """Tests for ``WimThread`` using a ``MagicMock`` persistence (no DB)."""

    def setUp(self):
        """Create a thread whose persistence and connector are mocks."""
        # NOTE(review): attaching the WIM record to the account was
        # reconstructed from a damaged listing -- confirm against fixtures.
        wim = eg.wim(0)
        account = eg.wim_account(0, 0)
        account['wim'] = wim
        self.persist = MagicMock()
        self.thread = WimThread(self.persist, {}, account)
        self.thread.connector = MagicMock()

        super(TestWimThread, self).setUp()

    def test_process_refresh(self):
        # Given we have 30 tasks in the refresh queue
        kwargs = {'action_id': uuid('action0')}
        actions = eg.wim_actions('FIND', 'DONE', num_links=30, **kwargs)
        self.thread.insert_pending_tasks(actions)

        # When we process the refresh list
        processed = self.thread.process_list('refresh')

        # Then we should have BATCH updates
        self.assertEqual(processed, self.thread.BATCH)

    def test_process_refresh__with_superseded(self):
        # Given we have 30 tasks but 15 of them are superseded
        kwargs = {'action_id': uuid('action0')}
        actions = eg.wim_actions('FIND', 'DONE', num_links=30, **kwargs)
        self.thread.insert_pending_tasks(actions)
        for task in self.thread.refresh_tasks[0:30:2]:
            task.status = 'SUPERSEDED'

        # Reference instant, taken before processing, used below to tell
        # untouched tasks from those given a later ``process_at``
        now = time()

        # When we call the refresh_elements
        processed = self.thread.process_list('refresh')

        # Then we should have 25 updates (since SUPERSEDED updates are cheap,
        # they are not counted for the limits)
        self.assertEqual(processed, 25)

        # The SUPERSEDED tasks should be removed, 5 tasks should be untouched,
        # and 10 tasks should be rescheduled
        refresh_tasks = self.thread.refresh_tasks
        untouched = [t for t in refresh_tasks if t.process_at <= now]
        rescheduled = [t for t in refresh_tasks if t.process_at > now]
        self.assertEqual(len(untouched), 5)
        self.assertEqual(len(rescheduled), 10)
        self.assertEqual(len(self.thread.refresh_tasks), 15)
329 if __name__
== '__main__':