blob: 6a232a452586317b1632df2e01d2a346ef3f7c74 [file] [log] [blame]
# -*- coding: utf-8 -*-
##
# Copyright 2018 University of Bristol - High Performance Networks Research
# Group
# All Rights Reserved.
#
# Contributors: Anderson Bravalheri, Dimitrios Gkounis, Abubakar Siddique
# Muqaddas, Navdeep Uniyal, Reza Nejabati and Dimitra Simeonidou
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
#
# For those usages not covered by the Apache License, Version 2.0 please
# contact with: <highperformance-networks@bristol.ac.uk>
#
# Neither the name of the University of Bristol nor the names of its
# contributors may be used to endorse or promote products derived from
# this software without specific prior written permission.
#
# This work has been performed in the context of DCMS UK 5G Testbeds
# & Trials Programme and in the framework of the Metro-Haul project -
# funded by the European Commission under Grant number 761727 through the
# Horizon 2020 and 5G-PPP programmes.
##
"""This module contains the domain logic, and the implementation of the
required steps to perform VNF management and orchestration in a WAN
environment.
It works as an extension/complement to the main functions contained in the
``nfvo.py`` file and avoids interacting directly with the database, by relying
on the `persistence` module.
No http request handling/direct interaction with the database should be present
in this file.
"""
import json
import logging
from contextlib import contextmanager
from itertools import groupby
from operator import itemgetter
# from sys import exc_info
from uuid import uuid4
from ..utils import remove_none_items
from .actions import Action
from .errors import (
DbBaseException,
NoWimConnectedToDatacenters,
UnexpectedDatabaseError,
WimAccountNotActive,
UndefinedWimConnector
)
from .wim_thread import WimThread
# from ..http_tools.errors import Bad_Request
from pkg_resources import iter_entry_points
class WimEngine(object):
"""Logic supporting the establishment of WAN links when NS spans across
different datacenters.
"""
def __init__(self, persistence, plugins, logger=None, ovim=None):
self.persist = persistence
self.plugins = plugins if plugins is not None else {}
self.logger = logger or logging.getLogger('openmano.wim.engine')
self.threads = {}
self.connectors = {}
self.ovim = ovim
def _load_plugin(self, name, type="sdn"):
# type can be vim or sdn
for v in iter_entry_points('osm_ro{}.plugins'.format(type), name):
self.plugins[name] = v.load()
if name and name not in self.plugins:
raise UndefinedWimConnector(type, name)
def create_wim(self, properties):
"""Create a new wim record according to the properties
Please check the wim schema to have more information about
``properties``.
The ``config`` property might contain a ``wim_port_mapping`` dict,
In this case, the method ``create_wim_port_mappings`` will be
automatically invoked.
Returns:
str: uuid of the newly created WIM record
"""
port_mapping = ((properties.get('config', {}) or {})
.pop('wim_port_mapping', {}))
plugin_name = "rosdn_" + properties["type"]
if plugin_name not in self.plugins:
self._load_plugin(plugin_name, type="sdn")
uuid = self.persist.create_wim(properties)
if port_mapping:
try:
self.create_wim_port_mappings(uuid, port_mapping)
except DbBaseException as e:
# Rollback
self.delete_wim(uuid)
ex = UnexpectedDatabaseError('Failed to create port mappings'
'Rolling back wim creation')
self.logger.exception(str(ex))
raise ex from e
return uuid
def get_wim(self, uuid_or_name, tenant_id=None):
"""Retrieve existing WIM record by name or id.
If ``tenant_id`` is specified, the query will be
limited to the WIM associated to the given tenant.
"""
# Since it is a pure DB operation, we can delegate it directly
return self.persist.get_wim(uuid_or_name, tenant_id)
def update_wim(self, uuid_or_name, properties):
"""Edit an existing WIM record.
``properties`` is a dictionary with the properties being changed,
if a property is not present, the old value will be preserved
Similarly to create_wim, the ``config`` property might contain a
``wim_port_mapping`` dict, In this case, port mappings will be
automatically updated.
"""
port_mapping = ((properties.get('config', {}) or {})
.pop('wim_port_mapping', {}))
orig_props = self.persist.get_by_name_or_uuid('wims', uuid_or_name)
uuid = orig_props['uuid']
response = self.persist.update_wim(uuid, properties)
if port_mapping:
try:
# It is very complex to diff and update individually all the
# port mappings. Therefore a practical approach is just delete
# and create it again.
self.persist.delete_wim_port_mappings(uuid)
# ^ Calling from persistence avoid reloading twice the thread
self.create_wim_port_mappings(uuid, port_mapping)
except DbBaseException as e:
# Rollback
self.update_wim(uuid_or_name, orig_props)
ex = UnexpectedDatabaseError('Failed to update port mappings'
'Rolling back wim updates\n')
self.logger.exception(str(ex))
raise ex from e
return response
def delete_wim(self, uuid_or_name):
"""Kill the corresponding wim threads and erase the WIM record"""
# Theoretically, we can rely on the database to drop the wim_accounts
# automatically, since we have configures 'ON CASCADE DELETE'.
# However, use use `delete_wim_accounts` to kill all the running
# threads.
self.delete_wim_accounts(uuid_or_name)
return self.persist.delete_wim(uuid_or_name)
def create_wim_account(self, wim, tenant, properties):
"""Create an account that associates a tenant to a WIM.
As a side effect this function will spawn a new thread
Arguments:
wim (str): name or uuid of the WIM related to the account being
created
tenant (str): name or uuid of the nfvo tenant to which the account
will be created
properties (dict): properties of the account
(eg. username, password, ...)
Returns:
dict: Created record
"""
uuid = self.persist.create_wim_account(wim, tenant, properties)
account = self.persist.get_wim_account_by(uuid=uuid)
# ^ We need to use get_wim_account_by here, since this methods returns
# all the associations, and we need the wim to create the thread
self._spawn_thread(account)
return account
def _update_single_wim_account(self, account, properties):
"""Update WIM Account, taking care to reload the corresponding thread
Arguments:
account (dict): Current account record
properties (dict): Properties to be updated
Returns:
dict: updated record
"""
account = self.persist.update_wim_account(account['uuid'], properties)
self.threads[account['uuid']].reload()
return account
def update_wim_accounts(self, wim, tenant, properties):
"""Update all the accounts related to a WIM and a tenant,
thanking care of reloading threads.
Arguments:
wim (str): uuid or name of a WIM record
tenant (str): uuid or name of a NFVO tenant record
properties (dict): attributes with values to be updated
Returns
list: Records that were updated
"""
accounts = self.persist.get_wim_accounts_by(wim, tenant)
return [self._update_single_wim_account(account, properties)
for account in accounts]
def _delete_single_wim_account(self, account):
"""Delete WIM Account, taking care to remove the corresponding thread
and delete the internal WIM account, if it was automatically generated.
Arguments:
account (dict): Current account record
properties (dict): Properties to be updated
Returns:
dict: current record (same as input)
"""
self.persist.delete_wim_account(account['uuid'])
if account['uuid'] not in self.threads:
raise WimAccountNotActive(
'Requests send to the WIM Account %s are not currently '
'being processed.', account['uuid'])
else:
self.threads[account['uuid']].exit()
del self.threads[account['uuid']]
return account
def delete_wim_accounts(self, wim, tenant=None, **kwargs):
"""Delete all the accounts related to a WIM (and a tenant),
thanking care of threads and internal WIM accounts.
Arguments:
wim (str): uuid or name of a WIM record
tenant (str): uuid or name of a NFVO tenant record
Returns
list: Records that were deleted
"""
kwargs.setdefault('error_if_none', False)
accounts = self.persist.get_wim_accounts_by(wim, tenant, **kwargs)
return [self._delete_single_wim_account(a) for a in accounts]
def _reload_wim_threads(self, wim_id):
for thread in self.threads.values():
if thread.wim_account['wim_id'] == wim_id:
thread.reload()
def create_wim_port_mappings(self, wim, properties, tenant=None):
"""Store information about port mappings from Database"""
# TODO: Review tenants... WIMs can exist across different tenants,
# and the port_mappings are a WIM property, not a wim_account
# property, so the concepts are not related
wim = self.persist.get_by_name_or_uuid('wims', wim)
result = self.persist.create_wim_port_mappings(wim, properties, tenant)
self._reload_wim_threads(wim['uuid'])
return result
def get_wim_port_mappings(self, wim):
"""Retrive information about port mappings from Database"""
return self.persist.get_wim_port_mappings(wim)
def delete_wim_port_mappings(self, wim):
"""Erase the port mapping records associated with the WIM"""
wim = self.persist.get_by_name_or_uuid('wims', wim)
message = self.persist.delete_wim_port_mappings(wim['uuid'])
self._reload_wim_threads(wim['uuid'])
return message
def find_common_wims(self, datacenter_ids, tenant):
"""Find WIMs that are common to all datacenters listed"""
mappings = self.persist.get_wim_port_mappings(
datacenter=datacenter_ids, tenant=tenant, error_if_none=False)
wim_id_of = itemgetter('wim_id')
sorted_mappings = sorted(mappings, key=wim_id_of) # needed by groupby
grouped_mappings = groupby(sorted_mappings, key=wim_id_of)
mapped_datacenters = {
wim_id: [m['datacenter_id'] for m in mappings]
for wim_id, mappings in grouped_mappings
}
return [
wim_id
for wim_id, connected_datacenters in mapped_datacenters.items()
if set(connected_datacenters) >= set(datacenter_ids)
]
def find_common_wim(self, datacenter_ids, tenant):
"""Find a single WIM that is able to connect all the datacenters
listed
Raises:
NoWimConnectedToDatacenters: if no WIM connected to all datacenters
at once is found
"""
suitable_wim_ids = self.find_common_wims(datacenter_ids, tenant)
if not suitable_wim_ids:
raise NoWimConnectedToDatacenters(datacenter_ids)
# TODO: use a criteria to determine which WIM is going to be used,
# instead of always using the first one (strategy pattern can be
# used here)
return suitable_wim_ids[0]
def find_suitable_wim_account(self, datacenter_ids, tenant):
"""Find a WIM account that is able to connect all the datacenters
listed
Arguments:
datacenter_ids (list): List of UUIDs of all the datacenters (vims),
that need to be connected.
tenant (str): UUID of the OSM tenant
Returns:
object with the WIM account that is able to connect all the
datacenters.
"""
wim_id = self.find_common_wim(datacenter_ids, tenant)
return self.persist.get_wim_account_by(wim_id, tenant)
def derive_wan_link(self,
wim_usage,
instance_scenario_id, sce_net_id,
networks, tenant, related=None):
"""Create a instance_wim_nets record for the given information"""
if sce_net_id in wim_usage:
account_id = wim_usage[sce_net_id]
account = self.persist.get_wim_account_by(uuid=account_id)
wim_id = account['wim_id']
else:
datacenters = [n['datacenter_id'] for n in networks]
wim_id = self.find_common_wim(datacenters, tenant)
account = self.persist.get_wim_account_by(wim_id, tenant)
return {
'uuid': str(uuid4()),
'instance_scenario_id': instance_scenario_id,
'sce_net_id': sce_net_id,
'wim_id': wim_id,
'wim_account_id': account['uuid'],
'related': related
}
def derive_wan_links(self, wim_usage, networks, tenant=None):
"""Discover and return what are the wan_links that have to be created
considering a set of networks (VLDs) required for a scenario instance
(NSR).
Arguments:
wim_usage(dict): Mapping between sce_net_id and wim_id. If wim_id is False, means not create wam_links
networks(list): Dicts containing the information about the networks
that will be instantiated to materialize a Network Service
(scenario) instance.
Corresponding to the ``instance_net`` record.
Returns:
list: list of WAN links to be written to the database
"""
# Group networks by key=(instance_scenario_id, sce_net_id)
related = None
if networks:
related = networks[0].get("related")
filtered = _filter_multi_vim(networks)
grouped_networks = _group_networks(filtered)
datacenters_per_group = _count_datacenters(grouped_networks)
# For each group count the number of networks. If greater then 1,
# we have to create a wan link connecting them.
wan_groups = [key
for key, counter in datacenters_per_group
if counter > 1]
# Keys are tuples(instance_scenario_id, sce_net_id)
return [
self.derive_wan_link(wim_usage,
key[0], key[1], grouped_networks[key], tenant, related)
for key in wan_groups if wim_usage.get(key[1]) is not False
]
def create_action(self, wan_link):
"""For a single wan_link create the corresponding create action"""
return {
'action': 'CREATE',
'status': 'SCHEDULED',
'item': 'instance_wim_nets',
'item_id': wan_link['uuid'],
'wim_account_id': wan_link['wim_account_id']
}
def create_actions(self, wan_links):
"""For an array of wan_links, create all the corresponding actions"""
return [self.create_action(l) for l in wan_links]
def delete_action(self, wan_link):
"""For a single wan_link create the corresponding create action"""
return {
'action': 'DELETE',
'status': 'SCHEDULED',
'item': 'instance_wim_nets',
'item_id': wan_link['uuid'],
'wim_account_id': wan_link['wim_account_id'],
'extra': json.dumps({'wan_link': wan_link})
# We serialize and cache the wan_link here, because it can be
# deleted during the delete process
}
def delete_actions(self, wan_links=(), instance_scenario_id=None):
"""Given a Instance Scenario, remove all the WAN Links created in the
past"""
if instance_scenario_id:
wan_links = self.persist.get_wan_links(
instance_scenario_id=instance_scenario_id, sdn='false')
return [self.delete_action(l) for l in wan_links]
def incorporate_actions(self, wim_actions, instance_action):
"""Make the instance action consider new WIM actions and make the WIM
actions aware of the instance action
"""
current = instance_action.setdefault('number_tasks', 0)
for i, action in enumerate(wim_actions):
action['task_index'] = current + i
action['instance_action_id'] = instance_action['uuid']
instance_action['number_tasks'] += len(wim_actions)
return wim_actions, instance_action
def dispatch(self, tasks):
"""Enqueue a list of tasks for further processing.
This function is supposed to be called outside from the WIM Thread.
"""
for task in tasks:
if task['wim_account_id'] not in self.threads:
error_msg = str(WimAccountNotActive(
'Requests send to the WIM Account %s are not currently '
'being processed.', task['wim_account_id']))
Action(task, self.logger).fail(self.persist, error_msg)
self.persist.update_wan_link(task['item_id'],
{'status': 'ERROR',
'error_msg': error_msg})
self.logger.error('Task %s %s %s not dispatched.\n%s',
task['action'], task['item'],
task['instance_account_id'], error_msg)
else:
self.threads[task['wim_account_id']].insert_task(task)
self.logger.debug('Task %s %s %s dispatched',
task['action'], task['item'],
task['instance_action_id'])
def _spawn_thread(self, wim_account):
"""Spawn a WIM thread
Arguments:
wim_account (dict): WIM information (usually persisted)
The `wim` field is required to be set with a valid WIM record
inside the `wim_account` dict
Return:
threading.Thread: Thread object
"""
thread = None
try:
thread = WimThread(self.persist, self.plugins, wim_account, ovim=self.ovim)
self.threads[wim_account['uuid']] = thread
thread.start()
except: # noqa
self.logger.error('Error when spawning WIM thread for %s',
wim_account['uuid'], exc_info=True)
return thread
def start_threads(self):
"""Start the threads responsible for processing WIM Actions"""
accounts = self.persist.get_wim_accounts(error_if_none=False)
thread_dict = {}
for account in accounts:
try:
plugin_name = "rosdn_" + account["wim"]["type"]
if plugin_name not in self.plugins:
self._load_plugin(plugin_name, type="sdn")
thread_dict[account["uuid"]] = self._spawn_thread(account)
except UndefinedWimConnector as e:
self.logger.error(e)
self.threads = remove_none_items(thread_dict)
def stop_threads(self):
"""Stop the threads responsible for processing WIM Actions"""
for uuid, thread in self.threads.items():
thread.exit()
self.threads.clear()
@contextmanager
def threads_running(self):
"""Ensure no thread will be left running"""
# This method is particularly important for testing :)
try:
self.start_threads()
yield
finally:
self.stop_threads()
def _filter_multi_vim(networks):
"""Ignore networks without sce_net_id (all VNFs go to the same VIM)"""
return [n for n in networks if 'sce_net_id' in n and n['sce_net_id']]
def _group_networks(networks):
"""Group networks that correspond to the same instance_scenario_id and
sce_net_id (NSR and VLD).
Arguments:
networks(list): Dicts containing the information about the networks
that will be instantiated to materialize a Network Service
(scenario) instance.
Returns:
dict: Keys are tuples (instance_scenario_id, sce_net_id) and values
are list of networks.
"""
criteria = itemgetter('instance_scenario_id', 'sce_net_id')
networks = sorted(networks, key=criteria)
return {k: list(v) for k, v in groupby(networks, key=criteria)}
def _count_datacenters(grouped_networks):
"""Count the number of datacenters in each group of networks
Returns:
list of tuples: the first element is the group key, while the second
element is the number of datacenters in each group.
"""
return ((key, len(set(n['datacenter_id'] for n in group)))
for key, group in grouped_networks.items())