--- /dev/null
+# -*- coding: utf-8 -*-
+##
+# Copyright 2018 University of Bristol - High Performance Networks Research
+# Group
+# All Rights Reserved.
+#
+# Contributors: Anderson Bravalheri, Dimitrios Gkounis, Abubakar Siddique
+# Muqaddas, Navdeep Uniyal, Reza Nejabati and Dimitra Simeonidou
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+#
+# For those usages not covered by the Apache License, Version 2.0 please
+# contact with: <highperformance-networks@bristol.ac.uk>
+#
+# Neither the name of the University of Bristol nor the names of its
+# contributors may be used to endorse or promote products derived from
+# this software without specific prior written permission.
+#
+# This work has been performed in the context of DCMS UK 5G Testbeds
+# & Trials Programme and in the framework of the Metro-Haul project -
+# funded by the European Commission under Grant number 761727 through the
+# Horizon 2020 and 5G-PPP programmes.
+##
+
+"""This module contains the domain logic, and the implementation of the
+required steps to perform VNF management and orchestration in a WAN
+environment.
+
+It works as an extension/complement to the main functions contained in the
+``nfvo.py`` file and avoids interacting directly with the database, by relying
+on the `persistence` module.
+
+No http request handling/direct interaction with the database should be present
+in this file.
+"""
+import json
+import logging
+from contextlib import contextmanager
+from itertools import groupby
+from operator import itemgetter
+from sys import exc_info
+from uuid import uuid4
+
+from ..utils import remove_none_items
+from .actions import Action
+from .errors import (
+ DbBaseException,
+ NoWimConnectedToDatacenters,
+ UnexpectedDatabaseError,
+ WimAccountNotActive
+)
+from .wim_thread import WimThread
+
+
+class WimEngine(object):
+ """Logic supporting the establishment of WAN links when NS spans across
+ different datacenters.
+ """
+ def __init__(self, persistence, logger=None, ovim=None):
+ self.persist = persistence
+ self.logger = logger or logging.getLogger('openmano.wim.engine')
+ self.threads = {}
+ self.connectors = {}
+ self.ovim = ovim
+
+ def create_wim(self, properties):
+ """Create a new wim record according to the properties
+
+ Please check the wim schema to have more information about
+ ``properties``.
+
+ The ``config`` property might contain a ``wim_port_mapping`` dict,
+ In this case, the method ``create_wim_port_mappings`` will be
+ automatically invoked.
+
+ Returns:
+ str: uuid of the newly created WIM record
+ """
+ port_mapping = ((properties.get('config', {}) or {})
+ .pop('wim_port_mapping', {}))
+ uuid = self.persist.create_wim(properties)
+
+ if port_mapping:
+ try:
+ self.create_wim_port_mappings(uuid, port_mapping)
+ except DbBaseException as e:
+ # Rollback
+ self.delete_wim(uuid)
+ ex = UnexpectedDatabaseError('Failed to create port mappings'
+ 'Rolling back wim creation')
+ self.logger.exception(str(ex))
+ raise ex from e
+
+ return uuid
+
+ def get_wim(self, uuid_or_name, tenant_id=None):
+ """Retrieve existing WIM record by name or id.
+
+ If ``tenant_id`` is specified, the query will be
+ limited to the WIM associated to the given tenant.
+ """
+ # Since it is a pure DB operation, we can delegate it directly
+ return self.persist.get_wim(uuid_or_name, tenant_id)
+
+ def update_wim(self, uuid_or_name, properties):
+ """Edit an existing WIM record.
+
+ ``properties`` is a dictionary with the properties being changed,
+ if a property is not present, the old value will be preserved
+
+ Similarly to create_wim, the ``config`` property might contain a
+ ``wim_port_mapping`` dict, In this case, port mappings will be
+ automatically updated.
+ """
+ port_mapping = ((properties.get('config', {}) or {})
+ .pop('wim_port_mapping', {}))
+ orig_props = self.persist.get_by_name_or_uuid('wims', uuid_or_name)
+ uuid = orig_props['uuid']
+
+ response = self.persist.update_wim(uuid, properties)
+
+ if port_mapping:
+ try:
+ # It is very complex to diff and update individually all the
+ # port mappings. Therefore a practical approach is just delete
+ # and create it again.
+ self.persist.delete_wim_port_mappings(uuid)
+ # ^ Calling from persistence avoid reloading twice the thread
+ self.create_wim_port_mappings(uuid, port_mapping)
+ except DbBaseException as e:
+ # Rollback
+ self.update_wim(uuid_or_name, orig_props)
+ ex = UnexpectedDatabaseError('Failed to update port mappings'
+ 'Rolling back wim updates\n')
+ self.logger.exception(str(ex))
+ raise ex from e
+
+ return response
+
+ def delete_wim(self, uuid_or_name):
+ """Kill the corresponding wim threads and erase the WIM record"""
+ # Theoretically, we can rely on the database to drop the wim_accounts
+ # automatically, since we have configures 'ON CASCADE DELETE'.
+ # However, use use `delete_wim_accounts` to kill all the running
+ # threads.
+ self.delete_wim_accounts(uuid_or_name)
+ return self.persist.delete_wim(uuid_or_name)
+
+ def create_wim_account(self, wim, tenant, properties):
+ """Create an account that associates a tenant to a WIM.
+
+ As a side effect this function will spawn a new thread
+
+ Arguments:
+ wim (str): name or uuid of the WIM related to the account being
+ created
+ tenant (str): name or uuid of the nfvo tenant to which the account
+ will be created
+ properties (dict): properties of the account
+ (eg. username, password, ...)
+
+ Returns:
+ dict: Created record
+ """
+ uuid = self.persist.create_wim_account(wim, tenant, properties)
+ account = self.persist.get_wim_account_by(uuid=uuid)
+ # ^ We need to use get_wim_account_by here, since this methods returns
+ # all the associations, and we need the wim to create the thread
+ self._spawn_thread(account)
+ return account
+
+ def _update_single_wim_account(self, account, properties):
+ """Update WIM Account, taking care to reload the corresponding thread
+
+ Arguments:
+ account (dict): Current account record
+ properties (dict): Properties to be updated
+
+ Returns:
+ dict: updated record
+ """
+ account = self.persist.update_wim_account(account['uuid'], properties)
+ self.threads[account['uuid']].reload()
+ return account
+
+ def update_wim_accounts(self, wim, tenant, properties):
+ """Update all the accounts related to a WIM and a tenant,
+ thanking care of reloading threads.
+
+ Arguments:
+ wim (str): uuid or name of a WIM record
+ tenant (str): uuid or name of a NFVO tenant record
+ properties (dict): attributes with values to be updated
+
+ Returns
+ list: Records that were updated
+ """
+ accounts = self.persist.get_wim_accounts_by(wim, tenant)
+ return [self._update_single_wim_account(account, properties)
+ for account in accounts]
+
+ def _delete_single_wim_account(self, account):
+ """Delete WIM Account, taking care to remove the corresponding thread
+ and delete the internal WIM account, if it was automatically generated.
+
+ Arguments:
+ account (dict): Current account record
+ properties (dict): Properties to be updated
+
+ Returns:
+ dict: current record (same as input)
+ """
+ self.persist.delete_wim_account(account['uuid'])
+
+ if account['uuid'] not in self.threads:
+ raise WimAccountNotActive(
+ 'Requests send to the WIM Account %s are not currently '
+ 'being processed.', account['uuid'])
+ else:
+ self.threads[account['uuid']].exit()
+ del self.threads[account['uuid']]
+
+ return account
+
+ def delete_wim_accounts(self, wim, tenant=None, **kwargs):
+ """Delete all the accounts related to a WIM (and a tenant),
+ thanking care of threads and internal WIM accounts.
+
+ Arguments:
+ wim (str): uuid or name of a WIM record
+ tenant (str): uuid or name of a NFVO tenant record
+
+ Returns
+ list: Records that were deleted
+ """
+ kwargs.setdefault('error_if_none', False)
+ accounts = self.persist.get_wim_accounts_by(wim, tenant, **kwargs)
+ return [self._delete_single_wim_account(a) for a in accounts]
+
+ def _reload_wim_threads(self, wim_id):
+ for thread in self.threads.values():
+ if thread.wim_account['wim_id'] == wim_id:
+ thread.reload()
+
+ def create_wim_port_mappings(self, wim, properties, tenant=None):
+ """Store information about port mappings from Database"""
+ # TODO: Review tenants... WIMs can exist across different tenants,
+ # and the port_mappings are a WIM property, not a wim_account
+ # property, so the concepts are not related
+ wim = self.persist.get_by_name_or_uuid('wims', wim)
+ result = self.persist.create_wim_port_mappings(wim, properties, tenant)
+ self._reload_wim_threads(wim['uuid'])
+ return result
+
+ def get_wim_port_mappings(self, wim):
+ """Retrive information about port mappings from Database"""
+ return self.persist.get_wim_port_mappings(wim)
+
+ def delete_wim_port_mappings(self, wim):
+ """Erase the port mapping records associated with the WIM"""
+ wim = self.persist.get_by_name_or_uuid('wims', wim)
+ message = self.persist.delete_wim_port_mappings(wim['uuid'])
+ self._reload_wim_threads(wim['uuid'])
+ return message
+
+ def find_common_wims(self, datacenter_ids, tenant):
+ """Find WIMs that are common to all datacenters listed"""
+ mappings = self.persist.get_wim_port_mappings(
+ datacenter=datacenter_ids, tenant=tenant, error_if_none=False)
+
+ wim_id_of = itemgetter('wim_id')
+ sorted_mappings = sorted(mappings, key=wim_id_of) # needed by groupby
+ grouped_mappings = groupby(sorted_mappings, key=wim_id_of)
+ mapped_datacenters = {
+ wim_id: [m['datacenter_id'] for m in mappings]
+ for wim_id, mappings in grouped_mappings
+ }
+
+ return [
+ wim_id
+ for wim_id, connected_datacenters in mapped_datacenters.items()
+ if set(connected_datacenters) >= set(datacenter_ids)
+ ]
+
+ def find_common_wim(self, datacenter_ids, tenant):
+ """Find a single WIM that is able to connect all the datacenters
+ listed
+
+ Raises:
+ NoWimConnectedToDatacenters: if no WIM connected to all datacenters
+ at once is found
+ """
+ suitable_wim_ids = self.find_common_wims(datacenter_ids, tenant)
+
+ if not suitable_wim_ids:
+ raise NoWimConnectedToDatacenters(datacenter_ids)
+
+ # TODO: use a criteria to determine which WIM is going to be used,
+ # instead of always using the first one (strategy pattern can be
+ # used here)
+ return suitable_wim_ids[0]
+
+ def find_suitable_wim_account(self, datacenter_ids, tenant):
+ """Find a WIM account that is able to connect all the datacenters
+ listed
+
+ Arguments:
+ datacenter_ids (list): List of UUIDs of all the datacenters (vims),
+ that need to be connected.
+ tenant (str): UUID of the OSM tenant
+
+ Returns:
+ object with the WIM account that is able to connect all the
+ datacenters.
+ """
+ wim_id = self.find_common_wim(datacenter_ids, tenant)
+ return self.persist.get_wim_account_by(wim_id, tenant)
+
+ def derive_wan_link(self,
+ wim_usage,
+ instance_scenario_id, sce_net_id,
+ networks, tenant, related=None):
+ """Create a instance_wim_nets record for the given information"""
+ if sce_net_id in wim_usage:
+ account_id = wim_usage[sce_net_id]
+ account = self.persist.get_wim_account_by(uuid=account_id)
+ wim_id = account['wim_id']
+ else:
+ datacenters = [n['datacenter_id'] for n in networks]
+ wim_id = self.find_common_wim(datacenters, tenant)
+ account = self.persist.get_wim_account_by(wim_id, tenant)
+
+ return {
+ 'uuid': str(uuid4()),
+ 'instance_scenario_id': instance_scenario_id,
+ 'sce_net_id': sce_net_id,
+ 'wim_id': wim_id,
+ 'wim_account_id': account['uuid'],
+ 'related': related
+ }
+
+ def derive_wan_links(self, wim_usage, networks, tenant=None):
+ """Discover and return what are the wan_links that have to be created
+ considering a set of networks (VLDs) required for a scenario instance
+ (NSR).
+
+ Arguments:
+ wim_usage(dict): Mapping between sce_net_id and wim_id. If wim_id is False, means not create wam_links
+ networks(list): Dicts containing the information about the networks
+ that will be instantiated to materialize a Network Service
+ (scenario) instance.
+ Corresponding to the ``instance_net`` record.
+
+ Returns:
+ list: list of WAN links to be written to the database
+ """
+ # Group networks by key=(instance_scenario_id, sce_net_id)
+ related = None
+ if networks:
+ related = networks[0].get("related")
+ filtered = _filter_multi_vim(networks)
+ grouped_networks = _group_networks(filtered)
+ datacenters_per_group = _count_datacenters(grouped_networks)
+ # For each group count the number of networks. If greater then 1,
+ # we have to create a wan link connecting them.
+ wan_groups = [key
+ for key, counter in datacenters_per_group
+ if counter > 1]
+ # Keys are tuples(instance_scenario_id, sce_net_id)
+ return [
+ self.derive_wan_link(wim_usage,
+ key[0], key[1], grouped_networks[key], tenant, related)
+ for key in wan_groups if wim_usage.get(key[1]) is not False
+ ]
+
+ def create_action(self, wan_link):
+ """For a single wan_link create the corresponding create action"""
+ return {
+ 'action': 'CREATE',
+ 'status': 'SCHEDULED',
+ 'item': 'instance_wim_nets',
+ 'item_id': wan_link['uuid'],
+ 'wim_account_id': wan_link['wim_account_id']
+ }
+
+ def create_actions(self, wan_links):
+ """For an array of wan_links, create all the corresponding actions"""
+ return [self.create_action(l) for l in wan_links]
+
+ def delete_action(self, wan_link):
+ """For a single wan_link create the corresponding create action"""
+ return {
+ 'action': 'DELETE',
+ 'status': 'SCHEDULED',
+ 'item': 'instance_wim_nets',
+ 'item_id': wan_link['uuid'],
+ 'wim_account_id': wan_link['wim_account_id'],
+ 'extra': json.dumps({'wan_link': wan_link})
+ # We serialize and cache the wan_link here, because it can be
+ # deleted during the delete process
+ }
+
+ def delete_actions(self, wan_links=(), instance_scenario_id=None):
+ """Given a Instance Scenario, remove all the WAN Links created in the
+ past"""
+ if instance_scenario_id:
+ wan_links = self.persist.get_wan_links(
+ instance_scenario_id=instance_scenario_id)
+ return [self.delete_action(l) for l in wan_links]
+
+ def incorporate_actions(self, wim_actions, instance_action):
+ """Make the instance action consider new WIM actions and make the WIM
+ actions aware of the instance action
+ """
+ current = instance_action.setdefault('number_tasks', 0)
+ for i, action in enumerate(wim_actions):
+ action['task_index'] = current + i
+ action['instance_action_id'] = instance_action['uuid']
+ instance_action['number_tasks'] += len(wim_actions)
+
+ return wim_actions, instance_action
+
+ def dispatch(self, tasks):
+ """Enqueue a list of tasks for further processing.
+
+ This function is supposed to be called outside from the WIM Thread.
+ """
+ for task in tasks:
+ if task['wim_account_id'] not in self.threads:
+ error_msg = str(WimAccountNotActive(
+ 'Requests send to the WIM Account %s are not currently '
+ 'being processed.', task['wim_account_id']))
+ Action(task, self.logger).fail(self.persist, error_msg)
+ self.persist.update_wan_link(task['item_id'],
+ {'status': 'ERROR',
+ 'error_msg': error_msg})
+ self.logger.error('Task %s %s %s not dispatched.\n%s',
+ task['action'], task['item'],
+ task['instance_account_id'], error_msg)
+ else:
+ self.threads[task['wim_account_id']].insert_task(task)
+ self.logger.debug('Task %s %s %s dispatched',
+ task['action'], task['item'],
+ task['instance_action_id'])
+
+ def _spawn_thread(self, wim_account):
+ """Spawn a WIM thread
+
+ Arguments:
+ wim_account (dict): WIM information (usually persisted)
+ The `wim` field is required to be set with a valid WIM record
+ inside the `wim_account` dict
+
+ Return:
+ threading.Thread: Thread object
+ """
+ thread = None
+ try:
+ thread = WimThread(self.persist, wim_account, ovim=self.ovim)
+ self.threads[wim_account['uuid']] = thread
+ thread.start()
+ except: # noqa
+ self.logger.error('Error when spawning WIM thread for %s',
+ wim_account['uuid'], exc_info=True)
+
+ return thread
+
+ def start_threads(self):
+ """Start the threads responsible for processing WIM Actions"""
+ accounts = self.persist.get_wim_accounts(error_if_none=False)
+ self.threads = remove_none_items(
+ {a['uuid']: self._spawn_thread(a) for a in accounts})
+
+ def stop_threads(self):
+ """Stop the threads responsible for processing WIM Actions"""
+ for uuid, thread in self.threads.items():
+ thread.exit()
+ self.threads.clear()
+
+ @contextmanager
+ def threads_running(self):
+ """Ensure no thread will be left running"""
+ # This method is particularly important for testing :)
+ try:
+ self.start_threads()
+ yield
+ finally:
+ self.stop_threads()
+
+
+def _filter_multi_vim(networks):
+ """Ignore networks without sce_net_id (all VNFs go to the same VIM)"""
+ return [n for n in networks if 'sce_net_id' in n and n['sce_net_id']]
+
+
+def _group_networks(networks):
+ """Group networks that correspond to the same instance_scenario_id and
+ sce_net_id (NSR and VLD).
+
+ Arguments:
+ networks(list): Dicts containing the information about the networks
+ that will be instantiated to materialize a Network Service
+ (scenario) instance.
+ Returns:
+ dict: Keys are tuples (instance_scenario_id, sce_net_id) and values
+ are list of networks.
+ """
+ criteria = itemgetter('instance_scenario_id', 'sce_net_id')
+
+ networks = sorted(networks, key=criteria)
+ return {k: list(v) for k, v in groupby(networks, key=criteria)}
+
+
+def _count_datacenters(grouped_networks):
+ """Count the number of datacenters in each group of networks
+
+ Returns:
+ list of tuples: the first element is the group key, while the second
+ element is the number of datacenters in each group.
+ """
+ return ((key, len(set(n['datacenter_id'] for n in group)))
+ for key, group in grouped_networks.items())