Revert "Removing deprecated/unused/outdated code"
[osm/RO.git] / RO / osm_ro / wim / engine.py
diff --git a/RO/osm_ro/wim/engine.py b/RO/osm_ro/wim/engine.py
new file mode 100644 (file)
index 0000000..ec01b23
--- /dev/null
@@ -0,0 +1,555 @@
+# -*- coding: utf-8 -*-
+##
+# Copyright 2018 University of Bristol - High Performance Networks Research
+# Group
+# All Rights Reserved.
+#
+# Contributors: Anderson Bravalheri, Dimitrios Gkounis, Abubakar Siddique
+# Muqaddas, Navdeep Uniyal, Reza Nejabati and Dimitra Simeonidou
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+#         http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+#
+# For those usages not covered by the Apache License, Version 2.0 please
+# contact with: <highperformance-networks@bristol.ac.uk>
+#
+# Neither the name of the University of Bristol nor the names of its
+# contributors may be used to endorse or promote products derived from
+# this software without specific prior written permission.
+#
+# This work has been performed in the context of DCMS UK 5G Testbeds
+# & Trials Programme and in the framework of the Metro-Haul project -
+# funded by the European Commission under Grant number 761727 through the
+# Horizon 2020 and 5G-PPP programmes.
+##
+
+"""This module contains the domain logic, and the implementation of the
+required steps to perform VNF management and orchestration in a WAN
+environment.
+
+It works as an extension/complement to the main functions contained in the
+``nfvo.py`` file and avoids interacting directly with the database, by relying
+on the `persistence` module.
+
+No http request handling/direct interaction with the database should be present
+in this file.
+"""
+import json
+import logging
+from contextlib import contextmanager
+from itertools import groupby
+from operator import itemgetter
+# from sys import exc_info
+from uuid import uuid4
+
+from ..utils import remove_none_items
+from .actions import Action
+from .errors import (
+    DbBaseException,
+    NoWimConnectedToDatacenters,
+    UnexpectedDatabaseError,
+    WimAccountNotActive,
+    UndefinedWimConnector
+)
+from .wim_thread import WimThread
+# from ..http_tools.errors import Bad_Request
+from pkg_resources import iter_entry_points
+
+
+class WimEngine(object):
+    """Logic supporting the establishment of WAN links when NS spans across
+    different datacenters.
+    """
+    def __init__(self, persistence, plugins, logger=None, ovim=None):
+        self.persist = persistence
+        self.plugins = plugins if plugins is not None else {}
+        self.logger = logger or logging.getLogger('openmano.wim.engine')
+        self.threads = {}
+        self.connectors = {}
+        self.ovim = ovim
+
+    def _load_plugin(self, name, type="sdn"):
+        # type can be vim or sdn
+        for v in iter_entry_points('osm_ro{}.plugins'.format(type), name):
+            self.plugins[name] = v.load()
+        if name and name not in self.plugins:
+            raise UndefinedWimConnector(type, name)
+
+    def create_wim(self, properties):
+        """Create a new wim record according to the properties
+
+        Please check the wim schema to have more information about
+        ``properties``.
+
+        The ``config`` property might contain a ``wim_port_mapping`` dict,
+        In this case, the method ``create_wim_port_mappings`` will be
+        automatically invoked.
+
+        Returns:
+            str: uuid of the newly created WIM record
+        """
+        port_mapping = ((properties.get('config', {}) or {})
+                        .pop('wim_port_mapping', {}))
+        plugin_name = "rosdn_" + properties["type"]
+        if plugin_name not in self.plugins:
+            self._load_plugin(plugin_name, type="sdn")
+
+        uuid = self.persist.create_wim(properties)
+
+        if port_mapping:
+            try:
+                self.create_wim_port_mappings(uuid, port_mapping)
+            except DbBaseException as e:
+                # Rollback
+                self.delete_wim(uuid)
+                ex = UnexpectedDatabaseError('Failed to create port mappings'
+                                             'Rolling back wim creation')
+                self.logger.exception(str(ex))
+                raise ex from e
+
+        return uuid
+
+    def get_wim(self, uuid_or_name, tenant_id=None):
+        """Retrieve existing WIM record by name or id.
+
+        If ``tenant_id`` is specified, the query will be
+        limited to the WIM associated to the given tenant.
+        """
+        # Since it is a pure DB operation, we can delegate it directly
+        return self.persist.get_wim(uuid_or_name, tenant_id)
+
+    def update_wim(self, uuid_or_name, properties):
+        """Edit an existing WIM record.
+
+        ``properties`` is a dictionary with the properties being changed,
+        if a property is not present, the old value will be preserved
+
+        Similarly to create_wim, the ``config`` property might contain a
+        ``wim_port_mapping`` dict, In this case, port mappings will be
+        automatically updated.
+        """
+        port_mapping = ((properties.get('config', {}) or {})
+                        .pop('wim_port_mapping', {}))
+        orig_props = self.persist.get_by_name_or_uuid('wims', uuid_or_name)
+        uuid = orig_props['uuid']
+
+        response = self.persist.update_wim(uuid, properties)
+
+        if port_mapping:
+            try:
+                # It is very complex to diff and update individually all the
+                # port mappings. Therefore a practical approach is just delete
+                # and create it again.
+                self.persist.delete_wim_port_mappings(uuid)
+                # ^  Calling from persistence avoid reloading twice the thread
+                self.create_wim_port_mappings(uuid, port_mapping)
+            except DbBaseException as e:
+                # Rollback
+                self.update_wim(uuid_or_name, orig_props)
+                ex = UnexpectedDatabaseError('Failed to update port mappings'
+                                             'Rolling back wim updates\n')
+                self.logger.exception(str(ex))
+                raise ex from e
+
+        return response
+
+    def delete_wim(self, uuid_or_name):
+        """Kill the corresponding wim threads and erase the WIM record"""
+        # Theoretically, we can rely on the database to drop the wim_accounts
+        # automatically, since we have configures 'ON CASCADE DELETE'.
+        # However, use use `delete_wim_accounts` to kill all the running
+        # threads.
+        self.delete_wim_accounts(uuid_or_name)
+        return self.persist.delete_wim(uuid_or_name)
+
+    def create_wim_account(self, wim, tenant, properties):
+        """Create an account that associates a tenant to a WIM.
+
+        As a side effect this function will spawn a new thread
+
+        Arguments:
+            wim (str): name or uuid of the WIM related to the account being
+                created
+            tenant (str): name or uuid of the nfvo tenant to which the account
+                will be created
+            properties (dict): properties of the account
+                (eg. username, password, ...)
+
+        Returns:
+            dict: Created record
+        """
+        uuid = self.persist.create_wim_account(wim, tenant, properties)
+        account = self.persist.get_wim_account_by(uuid=uuid)
+        # ^  We need to use get_wim_account_by here, since this methods returns
+        #    all the associations, and we need the wim to create the thread
+        self._spawn_thread(account)
+        return account
+
+    def _update_single_wim_account(self, account, properties):
+        """Update WIM Account, taking care to reload the corresponding thread
+
+        Arguments:
+            account (dict): Current account record
+            properties (dict): Properties to be updated
+
+        Returns:
+            dict: updated record
+        """
+        account = self.persist.update_wim_account(account['uuid'], properties)
+        self.threads[account['uuid']].reload()
+        return account
+
+    def update_wim_accounts(self, wim, tenant, properties):
+        """Update all the accounts related to a WIM and a tenant,
+        thanking care of reloading threads.
+
+        Arguments:
+            wim (str): uuid or name of a WIM record
+            tenant (str): uuid or name of a NFVO tenant record
+            properties (dict): attributes with values to be updated
+
+        Returns
+            list: Records that were updated
+        """
+        accounts = self.persist.get_wim_accounts_by(wim, tenant)
+        return [self._update_single_wim_account(account, properties)
+                for account in accounts]
+
+    def _delete_single_wim_account(self, account):
+        """Delete WIM Account, taking care to remove the corresponding thread
+        and delete the internal WIM account, if it was automatically generated.
+
+        Arguments:
+            account (dict): Current account record
+            properties (dict): Properties to be updated
+
+        Returns:
+            dict: current record (same as input)
+        """
+        self.persist.delete_wim_account(account['uuid'])
+
+        if account['uuid'] not in self.threads:
+            raise WimAccountNotActive(
+                'Requests send to the WIM Account %s are not currently '
+                'being processed.', account['uuid'])
+        else:
+            self.threads[account['uuid']].exit()
+            del self.threads[account['uuid']]
+
+        return account
+
+    def delete_wim_accounts(self, wim, tenant=None, **kwargs):
+        """Delete all the accounts related to a WIM (and a tenant),
+        thanking care of threads and internal WIM accounts.
+
+        Arguments:
+            wim (str): uuid or name of a WIM record
+            tenant (str): uuid or name of a NFVO tenant record
+
+        Returns
+            list: Records that were deleted
+        """
+        kwargs.setdefault('error_if_none', False)
+        accounts = self.persist.get_wim_accounts_by(wim, tenant, **kwargs)
+        return [self._delete_single_wim_account(a) for a in accounts]
+
+    def _reload_wim_threads(self, wim_id):
+        for thread in self.threads.values():
+            if thread.wim_account['wim_id'] == wim_id:
+                thread.reload()
+
+    def create_wim_port_mappings(self, wim, properties, tenant=None):
+        """Store information about port mappings from Database"""
+        # TODO: Review tenants... WIMs can exist across different tenants,
+        #       and the port_mappings are a WIM property, not a wim_account
+        #       property, so the concepts are not related
+        wim = self.persist.get_by_name_or_uuid('wims', wim)
+        result = self.persist.create_wim_port_mappings(wim, properties, tenant)
+        self._reload_wim_threads(wim['uuid'])
+        return result
+
+    def get_wim_port_mappings(self, wim):
+        """Retrive information about port mappings from Database"""
+        return self.persist.get_wim_port_mappings(wim)
+
+    def delete_wim_port_mappings(self, wim):
+        """Erase the port mapping records associated with the WIM"""
+        wim = self.persist.get_by_name_or_uuid('wims', wim)
+        message = self.persist.delete_wim_port_mappings(wim['uuid'])
+        self._reload_wim_threads(wim['uuid'])
+        return message
+
+    def find_common_wims(self, datacenter_ids, tenant):
+        """Find WIMs that are common to all datacenters listed"""
+        mappings = self.persist.get_wim_port_mappings(
+            datacenter=datacenter_ids, tenant=tenant, error_if_none=False)
+
+        wim_id_of = itemgetter('wim_id')
+        sorted_mappings = sorted(mappings, key=wim_id_of)  # needed by groupby
+        grouped_mappings = groupby(sorted_mappings, key=wim_id_of)
+        mapped_datacenters = {
+            wim_id: [m['datacenter_id'] for m in mappings]
+            for wim_id, mappings in grouped_mappings
+        }
+
+        return [
+            wim_id
+            for wim_id, connected_datacenters in mapped_datacenters.items()
+            if set(connected_datacenters) >= set(datacenter_ids)
+        ]
+
+    def find_common_wim(self, datacenter_ids, tenant):
+        """Find a single WIM that is able to connect all the datacenters
+        listed
+
+        Raises:
+            NoWimConnectedToDatacenters: if no WIM connected to all datacenters
+                at once is found
+        """
+        suitable_wim_ids = self.find_common_wims(datacenter_ids, tenant)
+
+        if not suitable_wim_ids:
+            raise NoWimConnectedToDatacenters(datacenter_ids)
+
+        # TODO: use a criteria to determine which WIM is going to be used,
+        #       instead of always using the first one (strategy pattern can be
+        #       used here)
+        return suitable_wim_ids[0]
+
+    def find_suitable_wim_account(self, datacenter_ids, tenant):
+        """Find a WIM account that is able to connect all the datacenters
+        listed
+
+        Arguments:
+            datacenter_ids (list): List of UUIDs of all the datacenters (vims),
+                that need to be connected.
+            tenant (str): UUID of the OSM tenant
+
+        Returns:
+            object with the WIM account that is able to connect all the
+                 datacenters.
+        """
+        wim_id = self.find_common_wim(datacenter_ids, tenant)
+        return self.persist.get_wim_account_by(wim_id, tenant)
+
+    def derive_wan_link(self,
+                        wim_usage,
+                        instance_scenario_id, sce_net_id,
+                        networks, tenant, related=None):
+        """Create a instance_wim_nets record for the given information"""
+        if sce_net_id in wim_usage:
+            account_id = wim_usage[sce_net_id]
+            account = self.persist.get_wim_account_by(uuid=account_id)
+            wim_id = account['wim_id']
+        else:
+            datacenters = [n['datacenter_id'] for n in networks]
+            wim_id = self.find_common_wim(datacenters, tenant)
+            account = self.persist.get_wim_account_by(wim_id, tenant)
+
+        return {
+            'uuid': str(uuid4()),
+            'instance_scenario_id': instance_scenario_id,
+            'sce_net_id': sce_net_id,
+            'wim_id': wim_id,
+            'wim_account_id': account['uuid'],
+            'related': related
+        }
+
+    def derive_wan_links(self, wim_usage, networks, tenant=None):
+        """Discover and return what are the wan_links that have to be created
+        considering a set of networks (VLDs) required for a scenario instance
+        (NSR).
+
+        Arguments:
+            wim_usage(dict): Mapping between sce_net_id and wim_id. If wim_id is False, means not create wam_links
+            networks(list): Dicts containing the information about the networks
+                that will be instantiated to materialize a Network Service
+                (scenario) instance.
+                Corresponding to the ``instance_net`` record.
+
+        Returns:
+            list: list of WAN links to be written to the database
+        """
+        # Group networks by key=(instance_scenario_id, sce_net_id)
+        related = None
+        if networks:
+            related = networks[0].get("related")
+        filtered = _filter_multi_vim(networks)
+        grouped_networks = _group_networks(filtered)
+        datacenters_per_group = _count_datacenters(grouped_networks)
+        # For each group count the number of networks. If greater then 1,
+        # we have to create a wan link connecting them.
+        wan_groups = [key
+                      for key, counter in datacenters_per_group
+                      if counter > 1]
+        # Keys are tuples(instance_scenario_id, sce_net_id)
+        return [
+            self.derive_wan_link(wim_usage,
+                                 key[0], key[1], grouped_networks[key], tenant, related)
+            for key in wan_groups if wim_usage.get(key[1]) is not False
+        ]
+
+    def create_action(self, wan_link):
+        """For a single wan_link create the corresponding create action"""
+        return {
+            'action': 'CREATE',
+            'status': 'SCHEDULED',
+            'item': 'instance_wim_nets',
+            'item_id': wan_link['uuid'],
+            'wim_account_id': wan_link['wim_account_id']
+        }
+
+    def create_actions(self, wan_links):
+        """For an array of wan_links, create all the corresponding actions"""
+        return [self.create_action(li) for li in wan_links]
+
+    def delete_action(self, wan_link):
+        """For a single wan_link create the corresponding create action"""
+        return {
+            'action': 'DELETE',
+            'status': 'SCHEDULED',
+            'item': 'instance_wim_nets',
+            'item_id': wan_link['uuid'],
+            'wim_account_id': wan_link['wim_account_id'],
+            'extra': json.dumps({'wan_link': wan_link})
+            # We serialize and cache the wan_link here, because it can be
+            # deleted during the delete process
+        }
+
+    def delete_actions(self, wan_links=(), instance_scenario_id=None):
+        """Given a Instance Scenario, remove all the WAN Links created in the
+        past"""
+        if instance_scenario_id:
+            wan_links = self.persist.get_wan_links(
+                instance_scenario_id=instance_scenario_id, sdn='false')
+        return [self.delete_action(li) for li in wan_links]
+
+    def incorporate_actions(self, wim_actions, instance_action):
+        """Make the instance action consider new WIM actions and make the WIM
+        actions aware of the instance action
+        """
+        current = instance_action.setdefault('number_tasks', 0)
+        for i, action in enumerate(wim_actions):
+            action['task_index'] = current + i
+            action['instance_action_id'] = instance_action['uuid']
+        instance_action['number_tasks'] += len(wim_actions)
+
+        return wim_actions, instance_action
+
+    def dispatch(self, tasks):
+        """Enqueue a list of tasks for further processing.
+
+        This function is supposed to be called outside from the WIM Thread.
+        """
+        for task in tasks:
+            if task['wim_account_id'] not in self.threads:
+                error_msg = str(WimAccountNotActive(
+                    'Requests send to the WIM Account %s are not currently '
+                    'being processed.', task['wim_account_id']))
+                Action(task, self.logger).fail(self.persist, error_msg)
+                self.persist.update_wan_link(task['item_id'],
+                                             {'status': 'ERROR',
+                                              'error_msg': error_msg})
+                self.logger.error('Task %s %s %s not dispatched.\n%s',
+                                  task['action'], task['item'],
+                                  task['instance_account_id'], error_msg)
+            else:
+                self.threads[task['wim_account_id']].insert_task(task)
+                self.logger.debug('Task %s %s %s dispatched',
+                                  task['action'], task['item'],
+                                  task['instance_action_id'])
+
+    def _spawn_thread(self, wim_account):
+        """Spawn a WIM thread
+
+        Arguments:
+            wim_account (dict): WIM information (usually persisted)
+                The `wim` field is required to be set with a valid WIM record
+                inside the `wim_account` dict
+
+        Return:
+            threading.Thread: Thread object
+        """
+        thread = None
+        try:
+            thread = WimThread(self.persist, self.plugins, wim_account, ovim=self.ovim)
+            self.threads[wim_account['uuid']] = thread
+            thread.start()
+        except:  # noqa
+            self.logger.error('Error when spawning WIM thread for %s',
+                              wim_account['uuid'], exc_info=True)
+
+        return thread
+
+    def start_threads(self):
+        """Start the threads responsible for processing WIM Actions"""
+        accounts = self.persist.get_wim_accounts(error_if_none=False)
+        thread_dict = {}
+        for account in accounts:
+            try:
+                plugin_name = "rosdn_" + account["wim"]["type"]
+                if plugin_name not in self.plugins:
+                    self._load_plugin(plugin_name, type="sdn")
+                thread_dict[account["uuid"]] = self._spawn_thread(account)
+            except UndefinedWimConnector as e:
+                self.logger.error(e)
+        self.threads = remove_none_items(thread_dict)
+
+    def stop_threads(self):
+        """Stop the threads responsible for processing WIM Actions"""
+        for uuid, thread in self.threads.items():
+            thread.exit()
+        self.threads.clear()
+
+    @contextmanager
+    def threads_running(self):
+        """Ensure no thread will be left running"""
+        # This method is particularly important for testing :)
+        try:
+            self.start_threads()
+            yield
+        finally:
+            self.stop_threads()
+
+
+def _filter_multi_vim(networks):
+    """Ignore networks without sce_net_id (all VNFs go to the same VIM)"""
+    return [n for n in networks if 'sce_net_id' in n and n['sce_net_id']]
+
+
+def _group_networks(networks):
+    """Group networks that correspond to the same instance_scenario_id and
+    sce_net_id (NSR and VLD).
+
+    Arguments:
+        networks(list): Dicts containing the information about the networks
+            that will be instantiated to materialize a Network Service
+            (scenario) instance.
+    Returns:
+        dict: Keys are tuples (instance_scenario_id, sce_net_id) and values
+            are list of networks.
+    """
+    criteria = itemgetter('instance_scenario_id', 'sce_net_id')
+
+    networks = sorted(networks, key=criteria)
+    return {k: list(v) for k, v in groupby(networks, key=criteria)}
+
+
+def _count_datacenters(grouped_networks):
+    """Count the number of datacenters in each group of networks
+
+    Returns:
+        list of tuples: the first element is the group key, while the second
+            element is the number of datacenters in each group.
+    """
+    return ((key, len(set(n['datacenter_id'] for n in group)))
+            for key, group in grouped_networks.items())