blob: 3fdd03244f125abdde809788deb62cf134d1a4a0 [file] [log] [blame]
# -*- coding: utf-8 -*-
##
# Copyright 2018 University of Bristol - High Performance Networks Research
# Group
# All Rights Reserved.
#
# Contributors: Anderson Bravalheri, Dimitrios Gkounis, Abubakar Siddique
# Muqaddas, Navdeep Uniyal, Reza Nejabati and Dimitra Simeonidou
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
#
# For those usages not covered by the Apache License, Version 2.0 please
# contact with: <highperformance-networks@bristol.ac.uk>
#
# Neither the name of the University of Bristol nor the names of its
# contributors may be used to endorse or promote products derived from
# this software without specific prior written permission.
#
# This work has been performed in the context of DCMS UK 5G Testbeds
# & Trials Programme and in the framework of the Metro-Haul project -
# funded by the European Commission under Grant number 761727 through the
# Horizon 2020 and 5G-PPP programmes.
##
"""This module contains the domain logic, and the implementation of the
required steps to perform VNF management and orchestration in a WAN
environment.
It works as an extension/complement to the main functions contained in the
``nfvo.py`` file and avoids interacting directly with the database, by relying
on the `persistence` module.
No http request handling/direct interaction with the database should be present
in this file.
"""
import json
import logging
from contextlib import contextmanager
from itertools import groupby
from operator import itemgetter
from sys import exc_info
from uuid import uuid4
from six import reraise
from ..utils import remove_none_items
from .actions import Action
from .errors import (
DbBaseException,
NoWimConnectedToDatacenters,
UnexpectedDatabaseError,
WimAccountNotActive
)
from .wim_thread import WimThread
class WimEngine(object):
"""Logic supporting the establishment of WAN links when NS spans across
different datacenters.
"""
def __init__(self, persistence, logger=None, ovim=None):
self.persist = persistence
self.logger = logger or logging.getLogger('openmano.wim.engine')
self.threads = {}
self.connectors = {}
self.ovim = ovim
def create_wim(self, properties):
"""Create a new wim record according to the properties
Please check the wim schema to have more information about
``properties``.
The ``config`` property might contain a ``wim_port_mapping`` dict,
In this case, the method ``create_wim_port_mappings`` will be
automatically invoked.
Returns:
str: uuid of the newly created WIM record
"""
port_mapping = ((properties.get('config', {}) or {})
.pop('wim_port_mapping', {}))
uuid = self.persist.create_wim(properties)
if port_mapping:
try:
self.create_wim_port_mappings(uuid, port_mapping)
except DbBaseException:
# Rollback
self.delete_wim(uuid)
ex = UnexpectedDatabaseError('Failed to create port mappings'
'Rolling back wim creation')
self.logger.exception(str(ex))
reraise(ex.__class__, ex, exc_info()[2])
return uuid
def get_wim(self, uuid_or_name, tenant_id=None):
"""Retrieve existing WIM record by name or id.
If ``tenant_id`` is specified, the query will be
limited to the WIM associated to the given tenant.
"""
# Since it is a pure DB operation, we can delegate it directly
return self.persist.get_wim(uuid_or_name, tenant_id)
def update_wim(self, uuid_or_name, properties):
"""Edit an existing WIM record.
``properties`` is a dictionary with the properties being changed,
if a property is not present, the old value will be preserved
Similarly to create_wim, the ``config`` property might contain a
``wim_port_mapping`` dict, In this case, port mappings will be
automatically updated.
"""
port_mapping = ((properties.get('config', {}) or {})
.pop('wim_port_mapping', {}))
orig_props = self.persist.get_by_name_or_uuid('wims', uuid_or_name)
uuid = orig_props['uuid']
response = self.persist.update_wim(uuid, properties)
if port_mapping:
try:
# It is very complex to diff and update individually all the
# port mappings. Therefore a practical approach is just delete
# and create it again.
self.persist.delete_wim_port_mappings(uuid)
# ^ Calling from persistence avoid reloading twice the thread
self.create_wim_port_mappings(uuid, port_mapping)
except DbBaseException:
# Rollback
self.update_wim(uuid_or_name, orig_props)
ex = UnexpectedDatabaseError('Failed to update port mappings'
'Rolling back wim updates\n')
self.logger.exception(str(ex))
reraise(ex.__class__, ex, exc_info()[2])
return response
def delete_wim(self, uuid_or_name):
"""Kill the corresponding wim threads and erase the WIM record"""
# Theoretically, we can rely on the database to drop the wim_accounts
# automatically, since we have configures 'ON CASCADE DELETE'.
# However, use use `delete_wim_accounts` to kill all the running
# threads.
self.delete_wim_accounts(uuid_or_name)
return self.persist.delete_wim(uuid_or_name)
def create_wim_account(self, wim, tenant, properties):
"""Create an account that associates a tenant to a WIM.
As a side effect this function will spawn a new thread
Arguments:
wim (str): name or uuid of the WIM related to the account being
created
tenant (str): name or uuid of the nfvo tenant to which the account
will be created
properties (dict): properties of the account
(eg. username, password, ...)
Returns:
dict: Created record
"""
uuid = self.persist.create_wim_account(wim, tenant, properties)
account = self.persist.get_wim_account_by(uuid=uuid)
# ^ We need to use get_wim_account_by here, since this methods returns
# all the associations, and we need the wim to create the thread
self._spawn_thread(account)
return account
def _update_single_wim_account(self, account, properties):
"""Update WIM Account, taking care to reload the corresponding thread
Arguments:
account (dict): Current account record
properties (dict): Properties to be updated
Returns:
dict: updated record
"""
account = self.persist.update_wim_account(account['uuid'], properties)
self.threads[account['uuid']].reload()
return account
def update_wim_accounts(self, wim, tenant, properties):
"""Update all the accounts related to a WIM and a tenant,
thanking care of reloading threads.
Arguments:
wim (str): uuid or name of a WIM record
tenant (str): uuid or name of a NFVO tenant record
properties (dict): attributes with values to be updated
Returns
list: Records that were updated
"""
accounts = self.persist.get_wim_accounts_by(wim, tenant)
return [self._update_single_wim_account(account, properties)
for account in accounts]
def _delete_single_wim_account(self, account):
"""Delete WIM Account, taking care to remove the corresponding thread
and delete the internal WIM account, if it was automatically generated.
Arguments:
account (dict): Current account record
properties (dict): Properties to be updated
Returns:
dict: current record (same as input)
"""
self.persist.delete_wim_account(account['uuid'])
if account['uuid'] not in self.threads:
raise WimAccountNotActive(
'Requests send to the WIM Account %s are not currently '
'being processed.', account['uuid'])
else:
self.threads[account['uuid']].exit()
del self.threads[account['uuid']]
return account
def delete_wim_accounts(self, wim, tenant=None, **kwargs):
"""Delete all the accounts related to a WIM (and a tenant),
thanking care of threads and internal WIM accounts.
Arguments:
wim (str): uuid or name of a WIM record
tenant (str): uuid or name of a NFVO tenant record
Returns
list: Records that were deleted
"""
kwargs.setdefault('error_if_none', False)
accounts = self.persist.get_wim_accounts_by(wim, tenant, **kwargs)
return [self._delete_single_wim_account(a) for a in accounts]
def _reload_wim_threads(self, wim_id):
for thread in self.threads.values():
if thread.wim_account['wim_id'] == wim_id:
thread.reload()
def create_wim_port_mappings(self, wim, properties, tenant=None):
"""Store information about port mappings from Database"""
# TODO: Review tenants... WIMs can exist across different tenants,
# and the port_mappings are a WIM property, not a wim_account
# property, so the concepts are not related
wim = self.persist.get_by_name_or_uuid('wims', wim)
result = self.persist.create_wim_port_mappings(wim, properties, tenant)
self._reload_wim_threads(wim['uuid'])
return result
def get_wim_port_mappings(self, wim):
"""Retrive information about port mappings from Database"""
return self.persist.get_wim_port_mappings(wim)
def delete_wim_port_mappings(self, wim):
"""Erase the port mapping records associated with the WIM"""
wim = self.persist.get_by_name_or_uuid('wims', wim)
message = self.persist.delete_wim_port_mappings(wim['uuid'])
self._reload_wim_threads(wim['uuid'])
return message
def find_common_wims(self, datacenter_ids, tenant):
"""Find WIMs that are common to all datacenters listed"""
mappings = self.persist.get_wim_port_mappings(
datacenter=datacenter_ids, tenant=tenant, error_if_none=False)
wim_id_of = itemgetter('wim_id')
sorted_mappings = sorted(mappings, key=wim_id_of) # needed by groupby
grouped_mappings = groupby(sorted_mappings, key=wim_id_of)
mapped_datacenters = {
wim_id: [m['datacenter_id'] for m in mappings]
for wim_id, mappings in grouped_mappings
}
return [
wim_id
for wim_id, connected_datacenters in mapped_datacenters.items()
if set(connected_datacenters) >= set(datacenter_ids)
]
def find_common_wim(self, datacenter_ids, tenant):
"""Find a single WIM that is able to connect all the datacenters
listed
Raises:
NoWimConnectedToDatacenters: if no WIM connected to all datacenters
at once is found
"""
suitable_wim_ids = self.find_common_wims(datacenter_ids, tenant)
if not suitable_wim_ids:
raise NoWimConnectedToDatacenters(datacenter_ids)
# TODO: use a criteria to determine which WIM is going to be used,
# instead of always using the first one (strategy pattern can be
# used here)
return suitable_wim_ids[0]
def find_suitable_wim_account(self, datacenter_ids, tenant):
"""Find a WIM account that is able to connect all the datacenters
listed
Arguments:
datacenter_ids (list): List of UUIDs of all the datacenters (vims),
that need to be connected.
tenant (str): UUID of the OSM tenant
Returns:
object with the WIM account that is able to connect all the
datacenters.
"""
wim_id = self.find_common_wim(datacenter_ids, tenant)
return self.persist.get_wim_account_by(wim_id, tenant)
def derive_wan_link(self,
wim_usage,
instance_scenario_id, sce_net_id,
networks, tenant, related=None):
"""Create a instance_wim_nets record for the given information"""
if sce_net_id in wim_usage:
account_id = wim_usage[sce_net_id]
account = self.persist.get_wim_account_by(uuid=account_id)
wim_id = account['wim_id']
else:
datacenters = [n['datacenter_id'] for n in networks]
wim_id = self.find_common_wim(datacenters, tenant)
account = self.persist.get_wim_account_by(wim_id, tenant)
return {
'uuid': str(uuid4()),
'instance_scenario_id': instance_scenario_id,
'sce_net_id': sce_net_id,
'wim_id': wim_id,
'wim_account_id': account['uuid'],
'related': related
}
def derive_wan_links(self, wim_usage, networks, tenant=None):
"""Discover and return what are the wan_links that have to be created
considering a set of networks (VLDs) required for a scenario instance
(NSR).
Arguments:
wim_usage(dict): Mapping between sce_net_id and wim_id. If wim_id is False, means not create wam_links
networks(list): Dicts containing the information about the networks
that will be instantiated to materialize a Network Service
(scenario) instance.
Corresponding to the ``instance_net`` record.
Returns:
list: list of WAN links to be written to the database
"""
# Group networks by key=(instance_scenario_id, sce_net_id)
related = None
if networks:
related = networks[0].get("related")
filtered = _filter_multi_vim(networks)
grouped_networks = _group_networks(filtered)
datacenters_per_group = _count_datacenters(grouped_networks)
# For each group count the number of networks. If greater then 1,
# we have to create a wan link connecting them.
wan_groups = [key
for key, counter in datacenters_per_group
if counter > 1]
# Keys are tuples(instance_scenario_id, sce_net_id)
return [
self.derive_wan_link(wim_usage,
key[0], key[1], grouped_networks[key], tenant, related)
for key in wan_groups if wim_usage.get(key[1]) is not False
]
def create_action(self, wan_link):
"""For a single wan_link create the corresponding create action"""
return {
'action': 'CREATE',
'status': 'SCHEDULED',
'item': 'instance_wim_nets',
'item_id': wan_link['uuid'],
'wim_account_id': wan_link['wim_account_id']
}
def create_actions(self, wan_links):
"""For an array of wan_links, create all the corresponding actions"""
return [self.create_action(l) for l in wan_links]
def delete_action(self, wan_link):
"""For a single wan_link create the corresponding create action"""
return {
'action': 'DELETE',
'status': 'SCHEDULED',
'item': 'instance_wim_nets',
'item_id': wan_link['uuid'],
'wim_account_id': wan_link['wim_account_id'],
'extra': json.dumps({'wan_link': wan_link})
# We serialize and cache the wan_link here, because it can be
# deleted during the delete process
}
def delete_actions(self, wan_links=(), instance_scenario_id=None):
"""Given a Instance Scenario, remove all the WAN Links created in the
past"""
if instance_scenario_id:
wan_links = self.persist.get_wan_links(
instance_scenario_id=instance_scenario_id)
return [self.delete_action(l) for l in wan_links]
def incorporate_actions(self, wim_actions, instance_action):
"""Make the instance action consider new WIM actions and make the WIM
actions aware of the instance action
"""
current = instance_action.setdefault('number_tasks', 0)
for i, action in enumerate(wim_actions):
action['task_index'] = current + i
action['instance_action_id'] = instance_action['uuid']
instance_action['number_tasks'] += len(wim_actions)
return wim_actions, instance_action
def dispatch(self, tasks):
"""Enqueue a list of tasks for further processing.
This function is supposed to be called outside from the WIM Thread.
"""
for task in tasks:
if task['wim_account_id'] not in self.threads:
error_msg = str(WimAccountNotActive(
'Requests send to the WIM Account %s are not currently '
'being processed.', task['wim_account_id']))
Action(task, self.logger).fail(self.persist, error_msg)
self.persist.update_wan_link(task['item_id'],
{'status': 'ERROR',
'error_msg': error_msg})
self.logger.error('Task %s %s %s not dispatched.\n%s',
task['action'], task['item'],
task['instance_account_id'], error_msg)
else:
self.threads[task['wim_account_id']].insert_task(task)
self.logger.debug('Task %s %s %s dispatched',
task['action'], task['item'],
task['instance_action_id'])
def _spawn_thread(self, wim_account):
"""Spawn a WIM thread
Arguments:
wim_account (dict): WIM information (usually persisted)
The `wim` field is required to be set with a valid WIM record
inside the `wim_account` dict
Return:
threading.Thread: Thread object
"""
thread = None
try:
thread = WimThread(self.persist, wim_account, ovim=self.ovim)
self.threads[wim_account['uuid']] = thread
thread.start()
except: # noqa
self.logger.error('Error when spawning WIM thread for %s',
wim_account['uuid'], exc_info=True)
return thread
def start_threads(self):
"""Start the threads responsible for processing WIM Actions"""
accounts = self.persist.get_wim_accounts(error_if_none=False)
self.threads = remove_none_items(
{a['uuid']: self._spawn_thread(a) for a in accounts})
def stop_threads(self):
"""Stop the threads responsible for processing WIM Actions"""
for uuid, thread in self.threads.items():
thread.exit()
del self.threads[uuid]
@contextmanager
def threads_running(self):
"""Ensure no thread will be left running"""
# This method is particularly important for testing :)
try:
self.start_threads()
yield
finally:
self.stop_threads()
def _filter_multi_vim(networks):
"""Ignore networks without sce_net_id (all VNFs go to the same VIM)"""
return [n for n in networks if 'sce_net_id' in n and n['sce_net_id']]
def _group_networks(networks):
"""Group networks that correspond to the same instance_scenario_id and
sce_net_id (NSR and VLD).
Arguments:
networks(list): Dicts containing the information about the networks
that will be instantiated to materialize a Network Service
(scenario) instance.
Returns:
dict: Keys are tuples (instance_scenario_id, sce_net_id) and values
are list of networks.
"""
criteria = itemgetter('instance_scenario_id', 'sce_net_id')
networks = sorted(networks, key=criteria)
return {k: list(v) for k, v in groupby(networks, key=criteria)}
def _count_datacenters(grouped_networks):
"""Count the number of datacenters in each group of networks
Returns:
list of tuples: the first element is the group key, while the second
element is the number of datacenters in each group.
"""
return ((key, len(set(n['datacenter_id'] for n in group)))
for key, group in grouped_networks.items())