Implement feature 5949

Enable dynamic connectivity setup in multi-site Network Services

The code required to implement the feature is contained in `osm_ro/wim`
as much as possible.

* `wim/engine.py` works together with `nfvo.py` to implement the
  feature
* `wim/persistence.py` is equivalent to `nfvo_db.py` and tries to
  encapsulate most of the SQL-specific code, implementing a persistence
  layer
* `wim/http_handler.py` extends `httpserver.py` adding WIM-related HTTP
  routes
* `wim/wim_thread.py` is similar to `vim_thread.py` and controls the
  execution of WIM-related tasks
* `wim/actions.py` and `wim/wan_link_actions.py` implement the
  action-handling code, calling instances of the `wim/wimconn.py`
  subclasses
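
To illustrate the kind of decision `wim/engine.py` makes, here is a minimal,
self-contained sketch of how networks could be grouped to find the VLDs that
span more than one datacenter, and how a WIM connected to all of them could be
picked. The function names (`groups_needing_wan_link`, `common_wims`) and the
sample records are assumptions made for this illustration; they are not the
names used in the patch, and no database is involved.

```python
from itertools import groupby
from operator import itemgetter


def groups_needing_wan_link(networks):
    """Return (instance_scenario_id, sce_net_id) keys spanning 2+ datacenters.

    Hypothetical helper, written only for this illustration.
    """
    key_of = itemgetter('instance_scenario_id', 'sce_net_id')
    # Networks without a sce_net_id stay inside a single VIM and are skipped
    candidates = sorted((n for n in networks if n.get('sce_net_id')),
                        key=key_of)  # groupby requires sorted input
    return [
        key
        for key, group in groupby(candidates, key=key_of)
        if len({n['datacenter_id'] for n in group}) > 1
    ]


def common_wims(port_mappings, datacenter_ids):
    """Return ids of the WIMs that have a port mapping for every datacenter.

    Hypothetical helper, written only for this illustration.
    """
    reachable = {}
    for mapping in port_mappings:
        reachable.setdefault(mapping['wim_id'], set()).add(
            mapping['datacenter_id'])
    wanted = set(datacenter_ids)
    return sorted(wim for wim, dcs in reachable.items() if dcs >= wanted)


# Invented sample data: one VLD spans dc1 and dc2, the others do not
networks = [
    {'instance_scenario_id': 'nsr1', 'sce_net_id': 'vld1',
     'datacenter_id': 'dc1'},
    {'instance_scenario_id': 'nsr1', 'sce_net_id': 'vld1',
     'datacenter_id': 'dc2'},
    {'instance_scenario_id': 'nsr1', 'sce_net_id': 'vld2',
     'datacenter_id': 'dc1'},
    {'instance_scenario_id': 'nsr1', 'sce_net_id': None,
     'datacenter_id': 'dc2'},
]
port_mappings = [
    {'wim_id': 'wim-a', 'datacenter_id': 'dc1'},
    {'wim_id': 'wim-a', 'datacenter_id': 'dc2'},
    {'wim_id': 'wim-b', 'datacenter_id': 'dc1'},
]

print(groups_needing_wan_link(networks))           # [('nsr1', 'vld1')]
print(common_wims(port_mappings, ['dc1', 'dc2']))  # ['wim-a']
```

Only the `vld1` group needs a WAN link, and only `wim-a` has port mappings for
both datacenters, so it is the only candidate in this sample.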

WIM connectors are still a work in progress

Individual change details (newer to older)

- Add errors for inconsistent state

- Delay re-scheduled tasks

- Move lock to inside the persistence object

- Better errors for connector failures

- Try to cache the wan_link information before it is deleted from the database

- Integrate WanLinkDelete to NFVO

- Add WanLinkDelete implementation draft with some tests

- Add basic wim network creation

- Add minimal documentation for actions

- Add checks to the create action

- Improve documentation, rearrange insert_pending and remove unused functions on WimThread

- Integrate Action classes in refresh_tasks

- Add Action classes to avoid intricate conditions

- Adding Proposed License

- Move grouping of actions to persistence

- Change WimThread to use SQL to do the heavy lifting

- Simplify WimThread reload_actions

- Add tests for derive_wan_links

- Implement find_common_wim(s)

- Add tests for create_wim_account

- Add migration scripts for version 33

- Changes to WIM and VIM threads for vim_wim_actions

- Implement wim_account management according to the discussion

- Add WimHandler integration inside httpserver

- Add quick instructions to run the tests

- Add WIM functional tests using real database

- Add DB WIM port mapping

- RO WIM-related console scripts

- Add WIM integration to NFVO

- Improve database support focusing on tests

- RO NBI WIM-related commands in HTTP server

- Adding WIM tables to MANO DB

- Add wim http handler initial implementation

- Move http utility functions to separate files

    This separation allows the code to be reused more easily and avoids
    circular dependencies.

    (The httpserver can import other modules implementing http routes,
    and those modules can then use the utility functions without having
    to import httpserver back)

- Add an HTTP handler class and custom route decorator

    These tools can be used to create independent groups of bottle
    routes/callbacks in an OOP fashion

- Extract http error codes and related logic to a separate file
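
The idea of grouping routes/callbacks in a handler class, mentioned in the
list above, can be pictured with the following minimal sketch. It uses only
the standard library, and every name in it (`route`, `BaseHandler`,
`WimRoutes`, the paths) is an assumption made for illustration; the actual
handler class and decorator added by this change may look different.

```python
def route(method, path):
    """Tag a method with the HTTP verb and path it should answer."""
    def decorator(func):
        func.route_info = (method, path)
        return func
    return decorator


class BaseHandler(object):
    """Collect every tagged method so a server can register it later."""

    def routes(self):
        found = []
        for name in sorted(dir(type(self))):
            info = getattr(getattr(type(self), name), 'route_info', None)
            if info is not None:
                method, path = info
                # Bound method: the callback already carries the handler state
                found.append((method, path, getattr(self, name)))
        return found


class WimRoutes(BaseHandler):
    """Hypothetical group of WIM-related callbacks."""

    @route('GET', '/wims')
    def list_wims(self):
        return ['wim-a']

    @route('DELETE', '/wims/<wim_id>')
    def delete_wim(self, wim_id):
        return 'deleted ' + wim_id


for method, path, callback in WimRoutes().routes():
    print(method, path, callback.__name__)
```

With bottle, each collected tuple could then be registered with something
like `bottle.route(path, method=method, callback=callback)`, which keeps the
main server module free of the callbacks themselves.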

Change-Id: Icd5fc9fa345852b8cf571e48f427dc10bdbd24c5
Signed-off-by: Anderson Bravalheri <a.bravalheri@bristol.ac.uk>
diff --git a/osm_ro/wim/engine.py b/osm_ro/wim/engine.py
new file mode 100644
index 0000000..dde5dee
--- /dev/null
+++ b/osm_ro/wim/engine.py
@@ -0,0 +1,455 @@
+# -*- coding: utf-8 -*-
+##
+# Copyright 2018 University of Bristol - High Performance Networks Research
+# Group
+# All Rights Reserved.
+#
+# Contributors: Anderson Bravalheri, Dimitrios Gkounis, Abubakar Siddique
+# Muqaddas, Navdeep Uniyal, Reza Nejabati and Dimitra Simeonidou
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+#         http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+#
+# For those usages not covered by the Apache License, Version 2.0 please
+# contact with: <highperformance-networks@bristol.ac.uk>
+#
+# Neither the name of the University of Bristol nor the names of its
+# contributors may be used to endorse or promote products derived from
+# this software without specific prior written permission.
+#
+# This work has been performed in the context of DCMS UK 5G Testbeds
+# & Trials Programme and in the framework of the Metro-Haul project -
+# funded by the European Commission under Grant number 761727 through the
+# Horizon 2020 and 5G-PPP programmes.
+##
+
+"""This module contains the domain logic, and the implementation of the
+required steps to perform VNF management and orchestration in a WAN
+environment.
+
+It works as an extension/complement to the main functions contained in the
+``nfvo.py`` file and avoids interacting directly with the database, by relying
+on the `persistence` module.
+
+No http request handling/direct interaction with the database should be present
+in this file.
+"""
+import json
+import logging
+from contextlib import contextmanager
+from itertools import groupby
+from operator import itemgetter
+from uuid import uuid4
+
+from ..utils import remove_none_items
+from .actions import Action
+from .errors import (
+    NoWimConnectedToDatacenters,
+    WimAccountNotActive
+)
+from .wim_thread import WimThread
+
+
+class WimEngine(object):
+    """Logic supporting the establishment of WAN links when an NS spans
+    different datacenters.
+    """
+    def __init__(self, persistence, logger=None, ovim=None):
+        self.persist = persistence
+        self.logger = logger or logging.getLogger('openmano.wim.engine')
+        self.threads = {}
+        self.connectors = {}
+        self.ovim = ovim
+
+    def create_wim(self, properties):
+        """Create a new wim record according to the properties
+
+        Please check the wim schema to have more information about
+        ``properties``.
+
+        Returns:
+            str: uuid of the newly created WIM record
+        """
+        return self.persist.create_wim(properties)
+
+    def get_wim(self, uuid_or_name, tenant_id=None):
+        """Retrieve existing WIM record by name or id.
+
+        If ``tenant_id`` is specified, the query will be
+        limited to the WIM associated to the given tenant.
+        """
+        # Since it is a pure DB operation, we can delegate it directly
+        return self.persist.get_wim(uuid_or_name, tenant_id)
+
+    def update_wim(self, uuid_or_name, properties):
+        """Edit an existing WIM record.
+
+        ``properties`` is a dictionary with the properties being changed,
+        if a property is not present, the old value will be preserved
+        """
+        return self.persist.update_wim(uuid_or_name, properties)
+
+    def delete_wim(self, uuid_or_name):
+        """Kill the corresponding wim threads and erase the WIM record"""
+        # Theoretically, we can rely on the database to drop the wim_accounts
+        # automatically, since we have configured 'ON DELETE CASCADE'.
+        # However, we use `delete_wim_accounts` to kill all the running
+        # threads.
+        self.delete_wim_accounts(uuid_or_name)
+        return self.persist.delete_wim(uuid_or_name)
+
+    def create_wim_account(self, wim, tenant, properties):
+        """Create an account that associates a tenant to a WIM.
+
+        As a side effect this function will spawn a new thread
+
+        Arguments:
+            wim (str): name or uuid of the WIM related to the account being
+                created
+            tenant (str): name or uuid of the nfvo tenant to which the account
+                will be created
+            properties (dict): properties of the account
+                (eg. username, password, ...)
+
+        Returns:
+            dict: Created record
+        """
+        uuid = self.persist.create_wim_account(wim, tenant, properties)
+        account = self.persist.get_wim_account_by(uuid=uuid)
+        # ^  We need to use get_wim_account_by here, since this method returns
+        #    all the associations, and we need the wim to create the thread
+        self._spawn_thread(account)
+        return account
+
+    def _update_single_wim_account(self, account, properties):
+        """Update WIM Account, taking care to reload the corresponding thread
+
+        Arguments:
+            account (dict): Current account record
+            properties (dict): Properties to be updated
+
+        Returns:
+            dict: updated record
+        """
+        account = self.persist.update_wim_account(account['uuid'], properties)
+        self.threads[account['uuid']].reload()
+        return account
+
+    def update_wim_accounts(self, wim, tenant, properties):
+        """Update all the accounts related to a WIM and a tenant,
+        taking care of reloading threads.
+
+        Arguments:
+            wim (str): uuid or name of a WIM record
+            tenant (str): uuid or name of a NFVO tenant record
+            properties (dict): attributes with values to be updated
+
+        Returns:
+            list: Records that were updated
+        """
+        accounts = self.persist.get_wim_accounts_by(wim, tenant)
+        return [self._update_single_wim_account(account, properties)
+                for account in accounts]
+
+    def _delete_single_wim_account(self, account):
+        """Delete WIM Account, taking care to remove the corresponding thread
+        and delete the internal WIM account, if it was automatically generated.
+
+        Arguments:
+            account (dict): Current account record (it must contain the
+                ``uuid`` field)
+
+        Returns:
+            dict: current record (same as input)
+        """
+        self.persist.delete_wim_account(account['uuid'])
+
+        if account['uuid'] not in self.threads:
+            raise WimAccountNotActive(
+                'Requests sent to the WIM Account %s are not currently '
+                'being processed.' % account['uuid'])
+        else:
+            self.threads[account['uuid']].exit()
+            del self.threads[account['uuid']]
+
+        return account
+
+    def delete_wim_accounts(self, wim, tenant=None, **kwargs):
+        """Delete all the accounts related to a WIM (and a tenant),
+        taking care of threads and internal WIM accounts.
+
+        Arguments:
+            wim (str): uuid or name of a WIM record
+            tenant (str): uuid or name of a NFVO tenant record
+
+        Returns:
+            list: Records that were deleted
+        """
+        kwargs.setdefault('error_if_none', False)
+        accounts = self.persist.get_wim_accounts_by(wim, tenant, **kwargs)
+        return [self._delete_single_wim_account(a) for a in accounts]
+
+    def _reload_wim_threads(self, wim_id):
+        for thread in self.threads.values():
+            if thread.wim_account['wim_id'] == wim_id:
+                thread.reload()
+
+    def create_wim_port_mappings(self, wim, properties, tenant=None):
+        """Store information about port mappings in the Database"""
+        # TODO: Review tenants... WIMs can exist across different tenants,
+        #       and the port_mappings are a WIM property, not a wim_account
+        #       property, so the concepts are not related
+        wim = self.persist.get_by_name_or_uuid('wims', wim)
+        result = self.persist.create_wim_port_mappings(wim, properties, tenant)
+        self._reload_wim_threads(wim['uuid'])
+        return result
+
+    def get_wim_port_mappings(self, wim):
+        """Retrieve information about port mappings from the Database"""
+        return self.persist.get_wim_port_mappings(wim)
+
+    def delete_wim_port_mappings(self, wim):
+        """Erase the port mapping records associated with the WIM"""
+        wim = self.persist.get_by_name_or_uuid('wims', wim)
+        message = self.persist.delete_wim_port_mappings(wim['uuid'])
+        self._reload_wim_threads(wim['uuid'])
+        return message
+
+    def find_common_wims(self, datacenter_ids, tenant):
+        """Find WIMs that are common to all datacenters listed"""
+        mappings = self.persist.get_wim_port_mappings(
+            datacenter=datacenter_ids, tenant=tenant, error_if_none=False)
+
+        wim_id_of = itemgetter('wim_id')
+        sorted_mappings = sorted(mappings, key=wim_id_of)  # needed by groupby
+        grouped_mappings = groupby(sorted_mappings, key=wim_id_of)
+        mapped_datacenters = {
+            wim_id: [m['datacenter_id'] for m in mappings]
+            for wim_id, mappings in grouped_mappings
+        }
+
+        return [
+            wim_id
+            for wim_id, connected_datacenters in mapped_datacenters.items()
+            if set(connected_datacenters) >= set(datacenter_ids)
+        ]
+
+    def find_common_wim(self, datacenter_ids, tenant):
+        """Find a single WIM that is able to connect all the datacenters
+        listed
+
+        Raises:
+            NoWimConnectedToDatacenters: if no WIM connected to all datacenters
+                at once is found
+        """
+        suitable_wim_ids = self.find_common_wims(datacenter_ids, tenant)
+
+        if not suitable_wim_ids:
+            raise NoWimConnectedToDatacenters(datacenter_ids)
+
+        # TODO: use a criterion to determine which WIM is going to be used,
+        #       instead of always using the first one (strategy pattern can be
+        #       used here)
+        return suitable_wim_ids[0]
+
+    def derive_wan_link(self,
+                        instance_scenario_id, sce_net_id,
+                        networks, tenant):
+        """Create an instance_wim_nets record for the given information"""
+        datacenters = [n['datacenter_id'] for n in networks]
+        wim_id = self.find_common_wim(datacenters, tenant)
+
+        account = self.persist.get_wim_account_by(wim_id, tenant)
+
+        return {
+            'uuid': str(uuid4()),
+            'instance_scenario_id': instance_scenario_id,
+            'sce_net_id': sce_net_id,
+            'wim_id': wim_id,
+            'wim_account_id': account['uuid']
+        }
+
+    def derive_wan_links(self, networks, tenant=None):
+        """Discover and return what are the wan_links that have to be created
+        considering a set of networks (VLDs) required for a scenario instance
+        (NSR).
+
+        Arguments:
+            networks(list): Dicts containing the information about the networks
+                that will be instantiated to materialize a Network Service
+                (scenario) instance.
+
+        Returns:
+            list: list of WAN links to be written to the database
+        """
+        # Group networks by key=(instance_scenario_id, sce_net_id)
+        filtered = _filter_multi_vim(networks)
+        grouped_networks = _group_networks(filtered)
+        datacenters_per_group = _count_datacenters(grouped_networks)
+        # For each group count the number of datacenters. If greater than 1,
+        # we have to create a wan link connecting them.
+        wan_groups = [key
+                      for key, counter in datacenters_per_group
+                      if counter > 1]
+
+        return [
+            self.derive_wan_link(key[0], key[1], grouped_networks[key], tenant)
+            for key in wan_groups
+        ]
+
+    def create_action(self, wan_link):
+        """For a single wan_link create the corresponding create action"""
+        return {
+            'action': 'CREATE',
+            'status': 'SCHEDULED',
+            'item': 'instance_wim_nets',
+            'item_id': wan_link['uuid'],
+            'wim_account_id': wan_link['wim_account_id']
+        }
+
+    def create_actions(self, wan_links):
+        """For an array of wan_links, create all the corresponding actions"""
+        return [self.create_action(l) for l in wan_links]
+
+    def delete_action(self, wan_link):
+        """For a single wan_link create the corresponding delete action"""
+        return {
+            'action': 'DELETE',
+            'status': 'SCHEDULED',
+            'item': 'instance_wim_nets',
+            'item_id': wan_link['uuid'],
+            'wim_account_id': wan_link['wim_account_id'],
+            'extra': json.dumps({'wan_link': wan_link})
+            # We serialize and cache the wan_link here, because it can be
+            # deleted during the delete process
+        }
+
+    def delete_actions(self, wan_links=(), instance_scenario_id=None):
+        """Given an Instance Scenario, remove all the WAN Links created in the
+        past"""
+        if instance_scenario_id:
+            wan_links = self.persist.get_wan_links(
+                instance_scenario_id=instance_scenario_id)
+        return [self.delete_action(l) for l in wan_links]
+
+    def incorporate_actions(self, wim_actions, instance_action):
+        """Make the instance action consider new WIM actions and make the WIM
+        actions aware of the instance action
+        """
+        current = instance_action.setdefault('number_tasks', 0)
+        for i, action in enumerate(wim_actions):
+            action['task_index'] = current + i
+            action['instance_action_id'] = instance_action['uuid']
+        instance_action['number_tasks'] += len(wim_actions)
+
+        return wim_actions, instance_action
+
+    def dispatch(self, tasks):
+        """Enqueue a list of tasks for further processing.
+
+        This function is supposed to be called from outside the WIM Thread.
+        """
+        for task in tasks:
+            if task['wim_account_id'] not in self.threads:
+                error_msg = str(WimAccountNotActive(
+                    'Requests sent to the WIM Account %s are not currently '
+                    'being processed.' % task['wim_account_id']))
+                Action(task, self.logger).fail(self.persist, error_msg)
+                self.persist.update_wan_link(task['item_id'],
+                                             {'status': 'ERROR',
+                                              'error_msg': error_msg})
+                self.logger.error('Task %s %s %s not dispatched.\n%s',
+                                  task['action'], task['item'],
+                                  task['instance_action_id'], error_msg)
+            else:
+                self.threads[task['wim_account_id']].insert_task(task)
+                self.logger.debug('Task %s %s %s dispatched',
+                                  task['action'], task['item'],
+                                  task['instance_action_id'])
+
+    def _spawn_thread(self, wim_account):
+        """Spawn a WIM thread
+
+        Arguments:
+            wim_account (dict): WIM information (usually persisted)
+                The `wim` field is required to be set with a valid WIM record
+                inside the `wim_account` dict
+
+        Return:
+            threading.Thread: Thread object
+        """
+        thread = None
+        try:
+            thread = WimThread(self.persist, wim_account, ovim=self.ovim)
+            self.threads[wim_account['uuid']] = thread
+            thread.start()
+        except:  # noqa
+            self.logger.error('Error when spawning WIM thread for %s',
+                              wim_account['uuid'], exc_info=True)
+
+        return thread
+
+    def start_threads(self):
+        """Start the threads responsible for processing WIM Actions"""
+        accounts = self.persist.get_wim_accounts(error_if_none=False)
+        self.threads = remove_none_items(
+            {a['uuid']: self._spawn_thread(a) for a in accounts})
+
+    def stop_threads(self):
+        """Stop the threads responsible for processing WIM Actions"""
+        for uuid, thread in list(self.threads.items()):
+            thread.exit()
+            del self.threads[uuid]
+
+    @contextmanager
+    def threads_running(self):
+        """Ensure no thread will be left running"""
+        # This method is particularly important for testing :)
+        try:
+            self.start_threads()
+            yield
+        finally:
+            self.stop_threads()
+
+
+def _filter_multi_vim(networks):
+    """Ignore networks without sce_net_id (all VNFs go to the same VIM)"""
+    return [n for n in networks if 'sce_net_id' in n and n['sce_net_id']]
+
+
+def _group_networks(networks):
+    """Group networks that correspond to the same instance_scenario_id and
+    sce_net_id (NSR and VLD).
+
+    Arguments:
+        networks(list): Dicts containing the information about the networks
+            that will be instantiated to materialize a Network Service
+            (scenario) instance.
+    Returns:
+        dict: Keys are tuples (instance_scenario_id, sce_net_id) and values
+            are lists of networks.
+    """
+    criteria = itemgetter('instance_scenario_id', 'sce_net_id')
+
+    networks = sorted(networks, key=criteria)
+    return {k: list(v) for k, v in groupby(networks, key=criteria)}
+
+
+def _count_datacenters(grouped_networks):
+    """Count the number of datacenters in each group of networks
+
+    Returns:
+        iterable of tuples: the first element is the group key, while the second
+            element is the number of datacenters in each group.
+    """
+    return ((key, len(set(n['datacenter_id'] for n in group)))
+            for key, group in grouped_networks.items())