Revert "Removing deprecated/unused/outdated code"
[osm/RO.git] / RO / osm_ro / wim / tests / test_wim_thread.py
diff --git a/RO/osm_ro/wim/tests/test_wim_thread.py b/RO/osm_ro/wim/tests/test_wim_thread.py
new file mode 100644 (file)
index 0000000..7ad66c2
--- /dev/null
@@ -0,0 +1,330 @@
+# -*- coding: utf-8 -*-
+##
+# Copyright 2018 University of Bristol - High Performance Networks Research
+# Group
+# All Rights Reserved.
+#
+# Contributors: Anderson Bravalheri, Dimitrios Gkounis, Abubakar Siddique
+# Muqaddas, Navdeep Uniyal, Reza Nejabati and Dimitra Simeonidou
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+#         http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+#
+# For those usages not covered by the Apache License, Version 2.0 please
+# contact with: <highperformance-networks@bristol.ac.uk>
+#
+# Neither the name of the University of Bristol nor the names of its
+# contributors may be used to endorse or promote products derived from
+# this software without specific prior written permission.
+#
+# This work has been performed in the context of DCMS UK 5G Testbeds
+# & Trials Programme and in the framework of the Metro-Haul project -
+# funded by the European Commission under Grant number 761727 through the
+# Horizon 2020 and 5G-PPP programmes.
+##
+
+import unittest
+from difflib import unified_diff
+from operator import itemgetter
+from time import time
+
+import json
+
+from unittest.mock import MagicMock, patch
+
+from . import fixtures as eg
+from ...tests.db_helpers import (
+    TestCaseWithDatabasePerTest,
+    disable_foreign_keys,
+    uuid
+)
+from ..engine import WimEngine
+from ..persistence import WimPersistence
+from ..wim_thread import WimThread
+
+
+ignore_connector = patch('osm_ro.wim.wim_thread.CONNECTORS', MagicMock())
+
+
+def _repr(value):
+    return json.dumps(value, indent=4, sort_keys=True)
+
+
+@ignore_connector
+class TestWimThreadWithDb(TestCaseWithDatabasePerTest):
+    def setUp(self):
+        super(TestWimThreadWithDb, self).setUp()
+        self.persist = WimPersistence(self.db)
+        wim = eg.wim(0)
+        account = eg.wim_account(0, 0)
+        account['wim'] = wim
+        self.thread = WimThread(self.persist, {}, account)
+        self.thread.connector = MagicMock()
+
+    def assertTasksEqual(self, left, right):
+        fields = itemgetter('item', 'item_id', 'action', 'status')
+        left_ = (t.as_dict() for t in left)
+        left_ = [fields(t) for t in left_]
+        right_ = [fields(t) for t in right]
+
+        try:
+            self.assertItemsEqual(left_, right_)
+        except AssertionError:
+            print('left', _repr(left))
+            print('left', len(left_), 'items')
+            print('right', len(right_), 'items')
+            result = list(unified_diff(_repr(sorted(left_)).split('\n'),
+                                       _repr(sorted(right_)).split('\n'),
+                                       'left', 'right'))
+            print('diff:\n', '\n'.join(result))
+            raise
+
+    def test_reload_actions__all_create(self):
+        # Given we have 3 CREATE actions stored in the database
+        actions = eg.wim_actions('CREATE',
+                                 action_id=uuid('action0'), num_links=3)
+        self.populate([
+            {'nfvo_tenants': eg.tenant()}
+        ] + eg.wim_set() + [
+            {'instance_actions':
+                eg.instance_action(action_id=uuid('action0'))},
+            {'vim_wim_actions': actions}
+        ])
+
+        # When we reload the tasks
+        self.thread.reload_actions()
+        # All of them should be inserted as pending
+        self.assertTasksEqual(self.thread.pending_tasks, actions)
+
+    def test_reload_actions__all_refresh(self):
+        # Given just DONE tasks are in the database
+        actions = eg.wim_actions(status='DONE',
+                                 action_id=uuid('action0'), num_links=3)
+        self.populate([
+            {'nfvo_tenants': eg.tenant()}
+        ] + eg.wim_set() + [
+            {'instance_actions':
+                eg.instance_action(action_id=uuid('action0'))},
+            {'vim_wim_actions': actions}
+        ])
+
+        # When we reload the tasks
+        self.thread.reload_actions()
+        # All of them should be inserted as refresh
+        self.assertTasksEqual(self.thread.refresh_tasks, actions)
+
+    def test_reload_actions__grouped(self):
+        # Given we have 2 tasks for the same item in the database
+        kwargs = {'action_id': uuid('action0')}
+        actions = (eg.wim_actions('CREATE', **kwargs) +
+                   eg.wim_actions('FIND', **kwargs))
+        for i, action in enumerate(actions):
+            action['task_index'] = i
+
+        self.populate([
+            {'nfvo_tenants': eg.tenant()}
+        ] + eg.wim_set() + [
+            {'instance_actions': eg.instance_action(**kwargs)},
+            {'vim_wim_actions': actions}
+        ])
+
+        # When we reload the tasks
+        self.thread.reload_actions()
+        # Just one group should be created
+        self.assertEqual(len(self.thread.grouped_tasks.values()), 1)
+
+    def test_reload_actions__delete_scheduled(self):
+        # Given we have 3 tasks for the same item in the database, but one of
+        # them is a DELETE task and it is SCHEDULED
+        kwargs = {'action_id': uuid('action0')}
+        actions = (eg.wim_actions('CREATE', **kwargs) +
+                   eg.wim_actions('FIND', **kwargs) +
+                   eg.wim_actions('DELETE', status='SCHEDULED', **kwargs))
+        for i, action in enumerate(actions):
+            action['task_index'] = i
+
+        self.populate([
+            {'nfvo_tenants': eg.tenant()}
+        ] + eg.wim_set() + [
+            {'instance_actions': eg.instance_action(**kwargs)},
+            {'vim_wim_actions': actions}
+        ])
+
+        # When we reload the tasks
+        self.thread.reload_actions()
+        # Just one group should be created
+        self.assertEqual(len(self.thread.grouped_tasks.values()), 1)
+
+    def test_reload_actions__delete_done(self):
+        # Given we have 3 tasks for the same item in the database, but one of
+        # them is a DELETE task and it is not SCHEDULED
+        kwargs = {'action_id': uuid('action0')}
+        actions = (eg.wim_actions('CREATE', **kwargs) +
+                   eg.wim_actions('FIND', **kwargs) +
+                   eg.wim_actions('DELETE', status='DONE', **kwargs))
+        for i, action in enumerate(actions):
+            action['task_index'] = i
+
+        self.populate([
+            {'nfvo_tenants': eg.tenant()}
+        ] + eg.wim_set() + [
+            {'instance_actions': eg.instance_action(**kwargs)},
+            {'vim_wim_actions': actions}
+        ])
+
+        # When we reload the tasks
+        self.thread.reload_actions()
+        # No pending task should be found
+        self.assertEqual(self.thread.pending_tasks, [])
+
+    def test_reload_actions__batch(self):
+        # Given the group_limit is 10, and we have 24
+        group_limit = 10
+        kwargs = {'action_id': uuid('action0')}
+        actions = (eg.wim_actions('CREATE', num_links=8, **kwargs) +
+                   eg.wim_actions('FIND', num_links=8, **kwargs) +
+                   eg.wim_actions('FIND', num_links=8, **kwargs))
+        for i, action in enumerate(actions):
+            action['task_index'] = i
+
+        self.populate([
+            {'nfvo_tenants': eg.tenant()}
+        ] + eg.wim_set() + [
+            {'instance_actions': eg.instance_action(**kwargs)},
+            {'vim_wim_actions': actions}
+        ])
+
+        # When we reload the tasks
+        self.thread.reload_actions(group_limit)
+
+        # Then we should still see the actions in memory properly
+        self.assertTasksEqual(self.thread.pending_tasks, actions)
+        self.assertEqual(len(self.thread.grouped_tasks.values()), 8)
+
+    @disable_foreign_keys
+    def test_process_list__refresh(self):
+        update_wan_link = MagicMock(wrap=self.persist.update_wan_link)
+        update_action = MagicMock(wrap=self.persist.update_wan_link)
+        patches = dict(update_wan_link=update_wan_link,
+                       update_action=update_action)
+
+        with patch.multiple(self.persist, **patches):
+            # Given we have 2 tasks in the refresh queue
+            kwargs = {'action_id': uuid('action0')}
+            actions = (eg.wim_actions('FIND', 'DONE', **kwargs) +
+                       eg.wim_actions('CREATE', 'BUILD', **kwargs))
+            for i, action in enumerate(actions):
+                action['task_index'] = i
+
+            self.populate(
+                [{'instance_wim_nets': eg.instance_wim_nets()}] +
+                [{'instance_actions':
+                    eg.instance_action(num_tasks=2, **kwargs)}] +
+                [{'vim_wim_actions': actions}])
+
+            self.thread.insert_pending_tasks(actions)
+
+            # When we process the refresh list
+            processed = self.thread.process_list('refresh')
+
+            # Then we should have 2 updates
+            self.assertEqual(processed, 2)
+
+            # And the database should be updated accordingly
+            self.assertEqual(update_wan_link.call_count, 2)
+            self.assertEqual(update_action.call_count, 2)
+
+    @disable_foreign_keys
+    def test_delete_superseed_create(self):
+        # Given we insert a scheduled CREATE task
+        instance_action = eg.instance_action(num_tasks=1)
+        self.thread.pending_tasks = []
+        engine = WimEngine(persistence=self.persist)
+        self.addCleanup(engine.stop_threads)
+        wan_links = eg.instance_wim_nets()
+        create_actions = engine.create_actions(wan_links)
+        delete_actions = engine.delete_actions(wan_links)
+        engine.incorporate_actions(create_actions + delete_actions,
+                                   instance_action)
+
+        self.populate(instance_actions=instance_action,
+                      vim_wim_actions=create_actions + delete_actions)
+
+        self.thread.insert_pending_tasks(create_actions)
+
+        assert self.thread.pending_tasks[0].is_scheduled
+
+        # When we insert the equivalent DELETE task
+        self.thread.insert_pending_tasks(delete_actions)
+
+        # Then the CREATE task should be superseded
+        self.assertEqual(self.thread.pending_tasks[0].action, 'CREATE')
+        assert self.thread.pending_tasks[0].is_superseded
+
+        self.thread.process_list('pending')
+        self.thread.process_list('refresh')
+        self.assertFalse(self.thread.pending_tasks)
+
+
+@ignore_connector
+class TestWimThread(unittest.TestCase):
+    def setUp(self):
+        wim = eg.wim(0)
+        account = eg.wim_account(0, 0)
+        account['wim'] = wim
+        self.persist = MagicMock()
+        self.thread = WimThread(self.persist, {}, account)
+        self.thread.connector = MagicMock()
+
+        super(TestWimThread, self).setUp()
+
+    def test_process_refresh(self):
+        # Given we have 30 tasks in the refresh queue
+        kwargs = {'action_id': uuid('action0')}
+        actions = eg.wim_actions('FIND', 'DONE', num_links=30, **kwargs)
+        self.thread.insert_pending_tasks(actions)
+
+        # When we process the refresh list
+        processed = self.thread.process_list('refresh')
+
+        # Then we should have REFRESH_BATCH updates
+        self.assertEqual(processed, self.thread.BATCH)
+
+    def test_process_refresh__with_superseded(self):
+        # Given we have 30 tasks but 15 of them are superseded
+        kwargs = {'action_id': uuid('action0')}
+        actions = eg.wim_actions('FIND', 'DONE', num_links=30, **kwargs)
+        self.thread.insert_pending_tasks(actions)
+        for task in self.thread.refresh_tasks[0:30:2]:
+            task.status = 'SUPERSEDED'
+
+        now = time()
+
+        # When we call the refresh_elements
+        processed = self.thread.process_list('refresh')
+
+        # Then we should have 25 updates (since SUPERSEDED updates are cheap,
+        # they are not counted for the limits)
+        self.assertEqual(processed, 25)
+
+        # The SUPERSEDED tasks should be removed, 5 tasks should be untouched,
+        # and 10 tasks should be rescheduled
+        refresh_tasks = self.thread.refresh_tasks
+        old = [t for t in refresh_tasks if t.process_at <= now]
+        new = [t for t in refresh_tasks if t.process_at > now]
+        self.assertEqual(len(old), 5)
+        self.assertEqual(len(new), 10)
+        self.assertEqual(len(self.thread.refresh_tasks), 15)
+
+
+if __name__ == '__main__':
+    unittest.main()