Revert "Removing deprecated/unused/outdated code"
[osm/RO.git] / RO / osm_ro / wim / wim_thread.py
diff --git a/RO/osm_ro/wim/wim_thread.py b/RO/osm_ro/wim/wim_thread.py
new file mode 100644 (file)
index 0000000..7d57d55
--- /dev/null
@@ -0,0 +1,443 @@
+# -*- coding: utf-8 -*-
+##
+# Copyright 2018 University of Bristol - High Performance Networks Research
+# Group
+# All Rights Reserved.
+#
+# Contributors: Anderson Bravalheri, Dimitrios Gkounis, Abubakar Siddique
+# Muqaddas, Navdeep Uniyal, Reza Nejabati and Dimitra Simeonidou
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+#         http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+#
+# For those usages not covered by the Apache License, Version 2.0 please
+# contact with: <highperformance-networks@bristol.ac.uk>
+#
+# Neither the name of the University of Bristol nor the names of its
+# contributors may be used to endorse or promote products derived from
+# this software without specific prior written permission.
+#
+# This work has been performed in the context of DCMS UK 5G Testbeds
+# & Trials Programme and in the framework of the Metro-Haul project -
+# funded by the European Commission under Grant number 761727 through the
+# Horizon 2020 and 5G-PPP programmes.
+##
+
+"""
+Thread-based interaction with WIMs. Tasks are stored in the
+database (vim_wim_actions table) and processed sequentially
+
+Please check the Action class for information about the content of each action.
+"""
+
+import logging
+import threading
+from contextlib import contextmanager
+from functools import partial
+from itertools import islice, chain, takewhile
+from operator import itemgetter, attrgetter
+# from sys import exc_info
+from time import time, sleep
+
+import queue
+
+from . import wan_link_actions
+from ..utils import ensure, partition, pipe
+from .actions import IGNORE, PENDING, REFRESH
+from .errors import (
+    DbBaseException,
+    QueueFull,
+    InvalidParameters as Invalid,
+    UndefinedAction,
+)
+from osm_ro_plugin.sdn_failing import SdnFailingConnector
+from osm_ro_plugin.sdnconn import SdnConnectorError
+from osm_ro_plugin.sdn_dummy import SdnDummyConnector
+
+ACTIONS = {
+    'instance_wim_nets': wan_link_actions.ACTIONS
+}
+
+CONNECTORS = {
+    # "odl": wimconn_odl.OdlConnector,
+    "dummy": SdnDummyConnector,
+    # Add extra connectors here not managed via plugins
+}
+
+
+class WimThread(threading.Thread):
+    """Specialized task queue implementation that runs in an isolated thread.
+
+    Objects of this class have a few methods that are intended to be used
+    outside of the thread:
+
+    - start
+    - insert_task
+    - reload
+    - exit
+
+    All the other methods are used internally to manipulate/process the task
+    queue.
+    """
+    RETRY_SCHEDULED = 10  # 10 seconds
+    REFRESH_BUILD = 10    # 10 seconds
+    REFRESH_ACTIVE = 60   # 1 minute
+    BATCH = 10            # 10 actions per round
+    QUEUE_SIZE = 2000
+    RECOVERY_TIME = 5     # Sleep 5s to leave the system some time to recover
+    MAX_RECOVERY_TIME = 180
+    WAITING_TIME = 1      # Wait 1s for taks to arrive, when there are none
+
+    def __init__(self, persistence, plugins, wim_account, logger=None, ovim=None):
+        """Init a thread.
+
+        Arguments:
+            persistence: Database abstraction layer
+            plugins: dictionary with the vim/sdn plugins
+            wim_account: Record containing wim_account, tenant and wim
+                information.
+        """
+        name = '{}.{}.{}'.format(wim_account['wim']['name'],
+                                 wim_account['name'], wim_account['uuid'])
+        super(WimThread, self).__init__(name=name)
+        self.plugins = plugins
+        if "rosdn_dummy" not in self.plugins:
+            self.plugins["rosdn_dummy"] = SdnDummyConnector
+
+        self.name = name
+        self.connector = None
+        self.wim_account = wim_account
+
+        self.logger = logger or logging.getLogger('openmano.wim.'+self.name)
+        self.persist = persistence
+        self.ovim = ovim
+
+        self.task_queue = queue.Queue(self.QUEUE_SIZE)
+
+        self.refresh_tasks = []
+        """Time ordered task list for refreshing the status of WIM nets"""
+
+        self.pending_tasks = []
+        """Time ordered task list for creation, deletion of WIM nets"""
+
+        self.grouped_tasks = {}
+        """ It contains all the creation/deletion pending tasks grouped by
+        its concrete vm, net, etc
+
+            <item><item_id>:
+                -   <task1>  # e.g. CREATE task
+                    <task2>  # e.g. DELETE task
+        """
+
+        self._insert_task = {
+            PENDING: partial(self.schedule, list_name='pending'),
+            REFRESH: partial(self.schedule, list_name='refresh'),
+            IGNORE: lambda task, *_, **__: task.save(self.persist)}
+        """Send the task to the right processing queue"""
+
+    def on_start(self):
+        """Run a series of procedures every time the thread (re)starts"""
+        self.connector = self.get_connector()
+        self.reload_actions()
+
+    def get_connector(self):
+        """Create an WimConnector instance according to the wim.type"""
+        error_msg = ''
+        account_id = self.wim_account['uuid']
+        try:
+            account = self.persist.get_wim_account_by(
+                uuid=account_id, hide=None)  # Credentials need to be available
+            wim = account['wim']
+            mapping = self.persist.query('wim_port_mappings',
+                                         WHERE={'wim_id': wim['uuid']},
+                                         error_if_none=False)
+            if wim["type"] in CONNECTORS:
+                return CONNECTORS[wim['type']](wim, account, {'service_endpoint_mapping': mapping or []})
+            else:    # load a plugin
+                return self.plugins["rosdn_" + wim["type"]](
+                    wim, account, {'service_endpoint_mapping': mapping or []})
+        except DbBaseException as ex:
+            error_msg = ('Error when retrieving WIM account ({})\n'
+                         .format(account_id)) + str(ex)
+            self.logger.error(error_msg, exc_info=True)
+        except KeyError as ex:
+            error_msg = ('Unable to find the WIM connector for WIM ({})\n'
+                         .format(wim['type'])) + str(ex)
+            self.logger.error(error_msg)
+        except (SdnConnectorError, Exception) as ex:
+            # TODO: Remove the Exception class here when the connector class is
+            # ready
+            error_msg = ('Error when loading WIM connector for WIM ({})\n'
+                         .format(wim['type'])) + str(ex)
+            self.logger.error(error_msg, exc_info=True)
+
+        error_msg_extra = ('Any task targeting WIM account {} ({}) will fail.'
+                           .format(account_id, self.wim_account.get('name')))
+        self.logger.warning(error_msg_extra)
+        return SdnFailingConnector(error_msg + '\n' + error_msg_extra)
+
+    @contextmanager
+    def avoid_exceptions(self):
+        """Make a real effort to keep the thread alive, by avoiding the
+        exceptions. They are instead logged as a critical errors.
+        """
+        try:
+            yield
+        except Exception as ex:
+            self.logger.critical("Unexpected exception %s", ex, exc_info=True)
+            sleep(self.RECOVERY_TIME)
+
+    def reload_actions(self, group_limit=100):
+        """Read actions from database and reload them at memory.
+
+        This method will clean and reload the attributes ``refresh_tasks``,
+        ``pending_tasks`` and ``grouped_tasks``
+
+        Attributes:
+            group_limit (int): maximum number of action groups (those that
+                refer to the same ``<item, item_id>``) to be retrieved from the
+                database in each batch.
+        """
+
+        # First we clean the cache to let the garbage collector work
+        self.refresh_tasks = []
+        self.pending_tasks = []
+        self.grouped_tasks = {}
+
+        offset = 0
+
+        while True:
+            # Do things in batches
+            task_groups = self.persist.get_actions_in_groups(
+                self.wim_account['uuid'], item_types=('instance_wim_nets',),
+                group_offset=offset, group_limit=group_limit)
+            offset += (group_limit - 1)  # Update for the next batch
+
+            if not task_groups:
+                break
+
+            pending_groups = (g for _, g in task_groups if is_pending_group(g))
+
+            for task_list in pending_groups:
+                with self.avoid_exceptions():
+                    self.insert_pending_tasks(filter_pending_tasks(task_list))
+
+            self.logger.debug(
+                'Reloaded wim actions pending: %d refresh: %d',
+                len(self.pending_tasks), len(self.refresh_tasks))
+
+    def insert_pending_tasks(self, task_list):
+        """Insert task in the list of actions being processed"""
+        task_list = [action_from(task, self.logger) for task in task_list]
+
+        for task in task_list:
+            group = task.group_key
+            self.grouped_tasks.setdefault(group, [])
+            # Each task can try to supersede the other ones,
+            # but just DELETE actions will actually do
+            task.supersede(self.grouped_tasks[group])
+            self.grouped_tasks[group].append(task)
+
+        # We need a separate loop so each task can check all the other
+        # ones before deciding
+        for task in task_list:
+            self._insert_task[task.processing](task)
+            self.logger.debug('Insert WIM task: %s (%s): %s %s',
+                              task.id, task.status, task.action, task.item)
+
+    def schedule(self, task, when=None, list_name='pending'):
+        """Insert a task in the correct list, respecting the schedule.
+        The refreshing list is ordered by threshold_time (task.process_at)
+        It is assumed that this is called inside this thread
+
+        Arguments:
+            task (Action): object representing the task.
+                This object must implement the ``process`` method and inherit
+                from the ``Action`` class
+            list_name: either 'refresh' or 'pending'
+            when (float): unix time in seconds since as a float number
+        """
+        processing_list = {'refresh': self.refresh_tasks,
+                           'pending': self.pending_tasks}[list_name]
+
+        when = when or time()
+        task.process_at = when
+
+        schedule = (t.process_at for t in processing_list)
+        index = len(list(takewhile(lambda moment: moment <= when, schedule)))
+
+        processing_list.insert(index, task)
+        self.logger.debug(
+            'Schedule of %s in "%s" - waiting position: %d (%f)',
+            task.id, list_name, index, task.process_at)
+
+        return task
+
+    def process_list(self, list_name='pending'):
+        """Process actions in batches and reschedule them if necessary"""
+        task_list, handler = {
+            'refresh': (self.refresh_tasks, self._refresh_single),
+            'pending': (self.pending_tasks, self._process_single)}[list_name]
+
+        now = time()
+        waiting = ((i, task) for i, task in enumerate(task_list)
+                   if task.process_at is None or task.process_at <= now)
+
+        is_superseded = pipe(itemgetter(1), attrgetter('is_superseded'))
+        superseded, active = partition(is_superseded, waiting)
+        superseded = [(i, t.save(self.persist)) for i, t in superseded]
+
+        batch = islice(active, self.BATCH)
+        refreshed = [(i, handler(t)) for i, t in batch]
+
+        # Since pop changes the indexes in the list, we need to do it backwards
+        remove = sorted([i for i, _ in chain(refreshed, superseded)])
+        return len([task_list.pop(i) for i in reversed(remove)])
+
+    def _refresh_single(self, task):
+        """Refresh just a single task, and reschedule it if necessary"""
+        now = time()
+
+        result = task.refresh(self.connector, self.persist)
+        self.logger.debug('Refreshing WIM task: %s (%s): %s %s => %r',
+                          task.id, task.status, task.action, task.item, result)
+
+        interval = self.REFRESH_BUILD if task.is_build else self.REFRESH_ACTIVE
+        self.schedule(task, now + interval, 'refresh')
+
+        return result
+
+    def _process_single(self, task):
+        """Process just a single task, and reschedule it if necessary"""
+        now = time()
+
+        result = task.process(self.connector, self.persist, self.ovim)
+        self.logger.debug('Executing WIM task: %s (%s): %s %s => %r',
+                          task.id, task.status, task.action, task.item, result)
+
+        if task.action == 'DELETE':
+            del self.grouped_tasks[task.group_key]
+
+        self._insert_task[task.processing](task, now + self.RETRY_SCHEDULED)
+
+        return result
+
+    def insert_task(self, task):
+        """Send a message to the running thread
+
+        This function is supposed to be called outside of the WIM Thread.
+
+        Arguments:
+            task (str or dict): `"exit"`, `"reload"` or dict representing a
+                task. For more information about the fields in task, please
+                check the Action class.
+        """
+        try:
+            self.task_queue.put(task, False)
+            return None
+        except queue.Full as e:
+            ex = QueueFull(self.name)
+            raise ex from e
+
+    def reload(self):
+        """Send a message to the running thread to reload itself"""
+        self.insert_task('reload')
+
+    def exit(self):
+        """Send a message to the running thread to kill itself"""
+        self.insert_task('exit')
+
+    def run(self):
+        self.logger.debug('Starting: %s', self.name)
+        recovery_time = 0
+        while True:
+            self.on_start()
+            reload_thread = False
+            self.logger.debug('Reloaded: %s', self.name)
+
+            while True:
+                with self.avoid_exceptions():
+                    while not self.task_queue.empty():
+                        task = self.task_queue.get()
+                        if isinstance(task, dict):
+                            self.insert_pending_tasks([task])
+                        elif isinstance(task, list):
+                            self.insert_pending_tasks(task)
+                        elif isinstance(task, str):
+                            if task == 'exit':
+                                self.logger.debug('Finishing: %s', self.name)
+                                return 0
+                            elif task == 'reload':
+                                reload_thread = True
+                                break
+                        self.task_queue.task_done()
+
+                    if reload_thread:
+                        break
+
+                    if not(self.process_list('pending') +
+                           self.process_list('refresh')):
+                        sleep(self.WAITING_TIME)
+
+                    if isinstance(self.connector, SdnFailingConnector):
+                        # Wait sometime to try instantiating the connector
+                        # again and restart
+                        # Increase the recovery time if restarting is not
+                        # working (up to a limit)
+                        recovery_time = min(self.MAX_RECOVERY_TIME,
+                                            recovery_time + self.RECOVERY_TIME)
+                        sleep(recovery_time)
+                        break
+                    else:
+                        recovery_time = 0
+
+        self.logger.debug("Finishing")
+
+
+def is_pending_group(group):
+    return all(task['action'] != 'DELETE' or
+               task['status'] == 'SCHEDULED'
+               for task in group)
+
+
+def filter_pending_tasks(group):
+    return (t for t in group
+            if (t['status'] == 'SCHEDULED' or
+                t['action'] in ('CREATE', 'FIND')))
+
+
+def action_from(record, logger=None, mapping=ACTIONS):
+    """Create an Action object from a action record (dict)
+
+    Arguments:
+        mapping (dict): Nested data structure that maps the relationship
+            between action properties and object constructors.  This data
+            structure should be a dict with 2 levels of keys: item type and
+            action type. Example::
+                {'wan_link':
+                    {'CREATE': WanLinkCreate}
+                    ...}
+                ...}
+        record (dict): action information
+
+    Return:
+        (Action.Base): Object representing the action
+    """
+    ensure('item' in record, Invalid('`record` should contain "item"'))
+    ensure('action' in record, Invalid('`record` should contain "action"'))
+
+    try:
+        factory = mapping[record['item']][record['action']]
+        return factory(record, logger=logger)
+    except KeyError as e:
+        ex = UndefinedAction(record['item'], record['action'])
+        raise ex from e