X-Git-Url: https://osm.etsi.org/gitweb/?a=blobdiff_plain;f=RO%2Fosm_ro%2Fwim%2Ftests%2Ftest_wim_thread.py;fp=RO%2Fosm_ro%2Fwim%2Ftests%2Ftest_wim_thread.py;h=7ad66c2d2f24039a1d92d38fa6018b38de451d1b;hb=51cc9c4c78bb54c84a5d2399207d206eb30ea247;hp=0000000000000000000000000000000000000000;hpb=9f40121f66e644ddf700720d8d4bdf464f6dd414;p=osm%2FRO.git diff --git a/RO/osm_ro/wim/tests/test_wim_thread.py b/RO/osm_ro/wim/tests/test_wim_thread.py new file mode 100644 index 00000000..7ad66c2d --- /dev/null +++ b/RO/osm_ro/wim/tests/test_wim_thread.py @@ -0,0 +1,330 @@ +# -*- coding: utf-8 -*- +## +# Copyright 2018 University of Bristol - High Performance Networks Research +# Group +# All Rights Reserved. +# +# Contributors: Anderson Bravalheri, Dimitrios Gkounis, Abubakar Siddique +# Muqaddas, Navdeep Uniyal, Reza Nejabati and Dimitra Simeonidou +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +# +# For those usages not covered by the Apache License, Version 2.0 please +# contact with: +# +# Neither the name of the University of Bristol nor the names of its +# contributors may be used to endorse or promote products derived from +# this software without specific prior written permission. +# +# This work has been performed in the context of DCMS UK 5G Testbeds +# & Trials Programme and in the framework of the Metro-Haul project - +# funded by the European Commission under Grant number 761727 through the +# Horizon 2020 and 5G-PPP programmes. +## + +import unittest +from difflib import unified_diff +from operator import itemgetter +from time import time + +import json + +from unittest.mock import MagicMock, patch + +from . import fixtures as eg +from ...tests.db_helpers import ( + TestCaseWithDatabasePerTest, + disable_foreign_keys, + uuid +) +from ..engine import WimEngine +from ..persistence import WimPersistence +from ..wim_thread import WimThread + + +ignore_connector = patch('osm_ro.wim.wim_thread.CONNECTORS', MagicMock()) + + +def _repr(value): + return json.dumps(value, indent=4, sort_keys=True) + + +@ignore_connector +class TestWimThreadWithDb(TestCaseWithDatabasePerTest): + def setUp(self): + super(TestWimThreadWithDb, self).setUp() + self.persist = WimPersistence(self.db) + wim = eg.wim(0) + account = eg.wim_account(0, 0) + account['wim'] = wim + self.thread = WimThread(self.persist, {}, account) + self.thread.connector = MagicMock() + + def assertTasksEqual(self, left, right): + fields = itemgetter('item', 'item_id', 'action', 'status') + left_ = (t.as_dict() for t in left) + left_ = [fields(t) for t in left_] + right_ = [fields(t) for t in right] + + try: + self.assertItemsEqual(left_, right_) + except AssertionError: + print('left', _repr(left)) + print('left', len(left_), 'items') + print('right', len(right_), 'items') + result = list(unified_diff(_repr(sorted(left_)).split('\n'), + _repr(sorted(right_)).split('\n'), + 'left', 'right')) + print('diff:\n', '\n'.join(result)) + raise + + def test_reload_actions__all_create(self): + # Given we have 3 CREATE actions stored in the database + actions = eg.wim_actions('CREATE', + action_id=uuid('action0'), num_links=3) + self.populate([ + {'nfvo_tenants': eg.tenant()} + ] + eg.wim_set() + [ + {'instance_actions': + eg.instance_action(action_id=uuid('action0'))}, + {'vim_wim_actions': actions} + ]) + + # When we reload the tasks + self.thread.reload_actions() + # All of them should be inserted as pending + self.assertTasksEqual(self.thread.pending_tasks, actions) + + def test_reload_actions__all_refresh(self): + # Given just DONE tasks are in the database + actions = eg.wim_actions(status='DONE', + action_id=uuid('action0'), num_links=3) + self.populate([ + {'nfvo_tenants': eg.tenant()} + ] + eg.wim_set() + [ + {'instance_actions': + eg.instance_action(action_id=uuid('action0'))}, + {'vim_wim_actions': actions} + ]) + + # When we reload the tasks + self.thread.reload_actions() + # All of them should be inserted as refresh + self.assertTasksEqual(self.thread.refresh_tasks, actions) + + def test_reload_actions__grouped(self): + # Given we have 2 tasks for the same item in the database + kwargs = {'action_id': uuid('action0')} + actions = (eg.wim_actions('CREATE', **kwargs) + + eg.wim_actions('FIND', **kwargs)) + for i, action in enumerate(actions): + action['task_index'] = i + + self.populate([ + {'nfvo_tenants': eg.tenant()} + ] + eg.wim_set() + [ + {'instance_actions': eg.instance_action(**kwargs)}, + {'vim_wim_actions': actions} + ]) + + # When we reload the tasks + self.thread.reload_actions() + # Just one group should be created + self.assertEqual(len(self.thread.grouped_tasks.values()), 1) + + def test_reload_actions__delete_scheduled(self): + # Given we have 3 tasks for the same item in the database, but one of + # them is a DELETE task and it is SCHEDULED + kwargs = {'action_id': uuid('action0')} + actions = (eg.wim_actions('CREATE', **kwargs) + + eg.wim_actions('FIND', **kwargs) + + eg.wim_actions('DELETE', status='SCHEDULED', **kwargs)) + for i, action in enumerate(actions): + action['task_index'] = i + + self.populate([ + {'nfvo_tenants': eg.tenant()} + ] + eg.wim_set() + [ + {'instance_actions': eg.instance_action(**kwargs)}, + {'vim_wim_actions': actions} + ]) + + # When we reload the tasks + self.thread.reload_actions() + # Just one group should be created + self.assertEqual(len(self.thread.grouped_tasks.values()), 1) + + def test_reload_actions__delete_done(self): + # Given we have 3 tasks for the same item in the database, but one of + # them is a DELETE task and it is not SCHEDULED + kwargs = {'action_id': uuid('action0')} + actions = (eg.wim_actions('CREATE', **kwargs) + + eg.wim_actions('FIND', **kwargs) + + eg.wim_actions('DELETE', status='DONE', **kwargs)) + for i, action in enumerate(actions): + action['task_index'] = i + + self.populate([ + {'nfvo_tenants': eg.tenant()} + ] + eg.wim_set() + [ + {'instance_actions': eg.instance_action(**kwargs)}, + {'vim_wim_actions': actions} + ]) + + # When we reload the tasks + self.thread.reload_actions() + # No pending task should be found + self.assertEqual(self.thread.pending_tasks, []) + + def test_reload_actions__batch(self): + # Given the group_limit is 10, and we have 24 + group_limit = 10 + kwargs = {'action_id': uuid('action0')} + actions = (eg.wim_actions('CREATE', num_links=8, **kwargs) + + eg.wim_actions('FIND', num_links=8, **kwargs) + + eg.wim_actions('FIND', num_links=8, **kwargs)) + for i, action in enumerate(actions): + action['task_index'] = i + + self.populate([ + {'nfvo_tenants': eg.tenant()} + ] + eg.wim_set() + [ + {'instance_actions': eg.instance_action(**kwargs)}, + {'vim_wim_actions': actions} + ]) + + # When we reload the tasks + self.thread.reload_actions(group_limit) + + # Then we should still see the actions in memory properly + self.assertTasksEqual(self.thread.pending_tasks, actions) + self.assertEqual(len(self.thread.grouped_tasks.values()), 8) + + @disable_foreign_keys + def test_process_list__refresh(self): + update_wan_link = MagicMock(wrap=self.persist.update_wan_link) + update_action = MagicMock(wrap=self.persist.update_wan_link) + patches = dict(update_wan_link=update_wan_link, + update_action=update_action) + + with patch.multiple(self.persist, **patches): + # Given we have 2 tasks in the refresh queue + kwargs = {'action_id': uuid('action0')} + actions = (eg.wim_actions('FIND', 'DONE', **kwargs) + + eg.wim_actions('CREATE', 'BUILD', **kwargs)) + for i, action in enumerate(actions): + action['task_index'] = i + + self.populate( + [{'instance_wim_nets': eg.instance_wim_nets()}] + + [{'instance_actions': + eg.instance_action(num_tasks=2, **kwargs)}] + + [{'vim_wim_actions': actions}]) + + self.thread.insert_pending_tasks(actions) + + # When we process the refresh list + processed = self.thread.process_list('refresh') + + # Then we should have 2 updates + self.assertEqual(processed, 2) + + # And the database should be updated accordingly + self.assertEqual(update_wan_link.call_count, 2) + self.assertEqual(update_action.call_count, 2) + + @disable_foreign_keys + def test_delete_superseed_create(self): + # Given we insert a scheduled CREATE task + instance_action = eg.instance_action(num_tasks=1) + self.thread.pending_tasks = [] + engine = WimEngine(persistence=self.persist) + self.addCleanup(engine.stop_threads) + wan_links = eg.instance_wim_nets() + create_actions = engine.create_actions(wan_links) + delete_actions = engine.delete_actions(wan_links) + engine.incorporate_actions(create_actions + delete_actions, + instance_action) + + self.populate(instance_actions=instance_action, + vim_wim_actions=create_actions + delete_actions) + + self.thread.insert_pending_tasks(create_actions) + + assert self.thread.pending_tasks[0].is_scheduled + + # When we insert the equivalent DELETE task + self.thread.insert_pending_tasks(delete_actions) + + # Then the CREATE task should be superseded + self.assertEqual(self.thread.pending_tasks[0].action, 'CREATE') + assert self.thread.pending_tasks[0].is_superseded + + self.thread.process_list('pending') + self.thread.process_list('refresh') + self.assertFalse(self.thread.pending_tasks) + + +@ignore_connector +class TestWimThread(unittest.TestCase): + def setUp(self): + wim = eg.wim(0) + account = eg.wim_account(0, 0) + account['wim'] = wim + self.persist = MagicMock() + self.thread = WimThread(self.persist, {}, account) + self.thread.connector = MagicMock() + + super(TestWimThread, self).setUp() + + def test_process_refresh(self): + # Given we have 30 tasks in the refresh queue + kwargs = {'action_id': uuid('action0')} + actions = eg.wim_actions('FIND', 'DONE', num_links=30, **kwargs) + self.thread.insert_pending_tasks(actions) + + # When we process the refresh list + processed = self.thread.process_list('refresh') + + # Then we should have REFRESH_BATCH updates + self.assertEqual(processed, self.thread.BATCH) + + def test_process_refresh__with_superseded(self): + # Given we have 30 tasks but 15 of them are superseded + kwargs = {'action_id': uuid('action0')} + actions = eg.wim_actions('FIND', 'DONE', num_links=30, **kwargs) + self.thread.insert_pending_tasks(actions) + for task in self.thread.refresh_tasks[0:30:2]: + task.status = 'SUPERSEDED' + + now = time() + + # When we call the refresh_elements + processed = self.thread.process_list('refresh') + + # Then we should have 25 updates (since SUPERSEDED updates are cheap, + # they are not counted for the limits) + self.assertEqual(processed, 25) + + # The SUPERSEDED tasks should be removed, 5 tasks should be untouched, + # and 10 tasks should be rescheduled + refresh_tasks = self.thread.refresh_tasks + old = [t for t in refresh_tasks if t.process_at <= now] + new = [t for t in refresh_tasks if t.process_at > now] + self.assertEqual(len(old), 5) + self.assertEqual(len(new), 10) + self.assertEqual(len(self.thread.refresh_tasks), 15) + + +if __name__ == '__main__': + unittest.main()