Implement feature 5949

Enable dynamic connectivity setup in multi-site Network Services

The code required to implement the feature is contained in `osm_ro/wim`
as much as possible.
* `wim/engine.py` works together with `nfvo.py` to implement the
feature
* `wim/persistence.py` is equivalent to `nfvo_db.py` and tries to
  encapsulate most of the SQL-specific code, implementing a persistence
  layer
* `wim/http_handler.py` extends `httpserver.py` adding WIM-related HTTP
routes
* `wim/wim_thread.py` is similar to `vim_thread.py` and controls the
execution of WIM-related tasks
* `wim/actions.py` and `wim/wan_link_actions.py` implement the
  action-handling code, calling instances of the `wim/wimconn.py`
  subclasses
WIM connectors are still a work in progress
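
As a rough illustration of the multi-site logic (a simplified, standalone
sketch, not the code in `wim/engine.py`): networks are grouped by
`(instance_scenario_id, sce_net_id)`, and a WAN link is needed whenever one
group spans more than one datacenter. A WIM is a candidate when its port
mappings cover every datacenter of the group. The function names and the
sample data below are assumptions made up for this example.

```python
from collections import defaultdict


def groups_needing_wan_link(networks):
    """Map (instance_scenario_id, sce_net_id) to its set of datacenters,
    keeping only the groups that span more than one datacenter."""
    datacenters = defaultdict(set)
    for net in networks:
        if not net.get('sce_net_id'):
            continue  # no sce_net_id: all VNFs go to the same VIM
        key = (net['instance_scenario_id'], net['sce_net_id'])
        datacenters[key].add(net['datacenter_id'])
    return {k: v for k, v in datacenters.items() if len(v) > 1}


def common_wims(port_mappings, datacenter_ids):
    """Return the ids of the WIMs mapped to all the given datacenters."""
    reachable = defaultdict(set)
    for mapping in port_mappings:
        reachable[mapping['wim_id']].add(mapping['datacenter_id'])
    wanted = set(datacenter_ids)
    return sorted(w for w, dcs in reachable.items() if dcs >= wanted)


# Hypothetical sample data, only for illustration
networks = [
    {'instance_scenario_id': 'nsr1', 'sce_net_id': 'vld1',
     'datacenter_id': 'dc1'},
    {'instance_scenario_id': 'nsr1', 'sce_net_id': 'vld1',
     'datacenter_id': 'dc2'},
    {'instance_scenario_id': 'nsr1', 'sce_net_id': 'vld2',
     'datacenter_id': 'dc1'},
    {'instance_scenario_id': 'nsr1', 'sce_net_id': None,
     'datacenter_id': 'dc2'},
]
port_mappings = [
    {'wim_id': 'wim1', 'datacenter_id': 'dc1'},
    {'wim_id': 'wim1', 'datacenter_id': 'dc2'},
    {'wim_id': 'wim2', 'datacenter_id': 'dc1'},
]

for key, dcs in groups_needing_wan_link(networks).items():
    print(key, sorted(dcs), common_wims(port_mappings, dcs))
```

In this sample only `vld1` spans two datacenters, so it is the only group
for which a WAN link would be derived.
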
Individual change details (newer to older)
- Add errors for inconsistent state
- Delay re-scheduled tasks
- Move lock to inside the persistence object
- Better errors for connector failures
- Try to cache the wan_link information before it is deleted from the database
- Integrate WanLinkDelete to NFVO
- Add WanLinkDelete implementation draft with some tests
- Add basic wim network creation
- Add minimal documentation for actions
- Add checks to the create action
- Improve documentation, rearrange insert_pending and remove unused functions on WimThread
- Integrate Action classes in refresh_tasks
- Add Action classes to avoid intricate conditions
- Adding Proposed License
- Move grouping of actions to persistence
- Change WimThread to use SQL to do the heavy lifting
- Simplify WimThread reload_actions
- Add tests for derive_wan_links
- Implement find_common_wim(s)
- Add tests for create_wim_account
- Add migration scripts for version 33
- Changes to WIM and VIM threads for vim_wim_actions
- Implement wim_account management according to the discussion
- Add WimHandler integration inside httpserver
- Add quick instructions to run the tests
- Add WIM functional tests using real database
- Add DB WIM port mapping
- RO WIM-related console scripts
- Add WIM integration to NFVO
- Improve database support focusing on tests
- RO NBI WIM-related commands in HTTP server
- Adding WIM tables to MANO DB
- Add wim http handler initial implementation
- Move http utility functions to separate files
This separation allows the code to be reused more easily and avoids
circular dependencies.
(The httpserver can import other modules implementing http routes,
and those modules can then use the utility functions without having
to import back httpserver)
- Add an HTTP handler class and custom route decorator
  These tools can be used to create independent groups of bottle
  routes/callbacks in an OOP fashion
- Extract http error codes and related logic to a separate file
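
The idea of grouping routes in a handler class can be sketched as follows.
This is a hypothetical, bottle-free illustration and not the API added by
this change: `route`, `BaseHandler`, `ExampleHandler` and the `route_info`
attribute are names invented for the example.

```python
def route(method, path):
    """Tag a method with the HTTP verb and path it should serve."""
    def decorator(func):
        func.route_info = (method, path)
        return func
    return decorator


class BaseHandler(object):
    """Collect the tagged methods of a subclass into a routing table."""

    def routes(self):
        table = {}
        for name in dir(type(self)):
            func = getattr(type(self), name)
            info = getattr(func, 'route_info', None)
            if info:
                # Store the bound method, so the callback keeps its state
                table[info] = getattr(self, name)
        return table


class ExampleHandler(BaseHandler):
    """Independent group of WIM-like routes (illustrative only)."""

    def __init__(self, wims):
        self.wims = wims

    @route('GET', '/wims')
    def list_wims(self):
        return sorted(self.wims)

    @route('GET', '/wims/<name>')
    def get_wim(self, name):
        return self.wims[name]


handler = ExampleHandler({'wim-a': {'description': 'example record'}})
table = handler.routes()
print(sorted(table))
```

A server module could then register every entry of `table` with its web
framework, without the handler module having to import the server back.
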
Change-Id: Icd5fc9fa345852b8cf571e48f427dc10bdbd24c5
Signed-off-by: Anderson Bravalheri <a.bravalheri@bristol.ac.uk>
diff --git a/osm_ro/wim/__init__.py b/osm_ro/wim/__init__.py
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/osm_ro/wim/__init__.py
diff --git a/osm_ro/wim/actions.py b/osm_ro/wim/actions.py
new file mode 100644
index 0000000..f224460
--- /dev/null
+++ b/osm_ro/wim/actions.py
@@ -0,0 +1,423 @@
+# -*- coding: utf-8 -*-
+##
+# Copyright 2018 University of Bristol - High Performance Networks Research
+# Group
+# All Rights Reserved.
+#
+# Contributors: Anderson Bravalheri, Dimitrios Gkounis, Abubakar Siddique
+# Muqaddas, Navdeep Uniyal, Reza Nejabati and Dimitra Simeonidou
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+#
+# For those usages not covered by the Apache License, Version 2.0 please
+# contact with: <highperformance-networks@bristol.ac.uk>
+#
+# Neither the name of the University of Bristol nor the names of its
+# contributors may be used to endorse or promote products derived from
+# this software without specific prior written permission.
+#
+# This work has been performed in the context of DCMS UK 5G Testbeds
+# & Trials Programme and in the framework of the Metro-Haul project -
+# funded by the European Commission under Grant number 761727 through the
+# Horizon 2020 and 5G-PPP programmes.
+##
+# pylint: disable=E1101,E0203,W0201
+
+"""Common logic for task management"""
+import logging
+from time import time
+from types import StringTypes
+
+from six.moves import range
+
+import yaml
+
+from ..utils import (
+ filter_dict_keys,
+ filter_out_dict_keys,
+ merge_dicts,
+ remove_none_items,
+ truncate
+)
+
+PENDING, REFRESH, IGNORE = range(3)
+
+TIMEOUT = 1 * 60 * 60 # 1 hour
+MIN_ATTEMPTS = 10
+
+
+class Action(object):
+ """Create a basic object representing the action record.
+
+ Arguments:
+ record (dict): record as returned by the database
+ **kwargs: extra keyword arguments to overwrite the fields in record
+ """
+
+ PROPERTIES = [
+ 'task_index', # MD - Index number of the task.
+ # This together with the instance_action_id
+ # forms a unique key identifier
+ 'action', # MD - CREATE, DELETE, FIND
+ 'item', # MD - table name, eg. instance_wim_nets
+ 'item_id', # MD - uuid of the referenced entry in the
+ # previous table
+ 'instance_action_id', # MD - reference to a cohesive group of actions
+ # related to the same instance-scenario
+ 'wim_account_id', # MD - reference to the WIM account used
+ # by the thread/connector
+ 'wim_internal_id', # MD - internal ID used by the WIM to refer to
+ # the item
+ 'datacenter_vim_id', # MD - reference to the VIM account used
+ # by the thread/connector
+ 'vim_id', # MD - internal ID used by the VIM to refer to
+ # the item
+ 'status', # MD - SCHEDULED,BUILD,DONE,FAILED,SUPERSEDED
+ 'extra', # MD - text with yaml format at database,
+ # dict at memory with:
+ # `- params: list with the params to be sent to the VIM for CREATE
+ # or FIND. For DELETE the vim_id is taken from other
+ # related tasks
+ # `- find: (only for CREATE tasks) if present it should FIND
+ # before creating and use if existing.
+ # Contains the FIND params
+ # `- depends_on: list with the 'task_index'es of tasks that must be
+ # completed before. e.g. a vm creation depends on a net
+ # creation
+ # `- sdn_net_id: used for net.
+ # `- tries
+ # `- created_items:
+ # dictionary with extra elements created that need
+ # to be deleted. e.g. ports,
+ # `- volumes,...
+ # `- created: False if the VIM element is not created by
+ # other actions, and it should not be deleted
+ # `- wim_status: WIM status of the element. Stored also at database
+ # in the item table
+ 'params', # M - similar to extra[params]
+ 'depends_on', # M - similar to extra[depends_on]
+ 'depends', # M - dict with task_index(from depends_on) to
+ # task class
+ 'error_msg', # MD - descriptive text upon an error
+ 'created_at', # MD - task DB creation time
+ 'modified_at', # MD - last DB update time
+ 'process_at', # M - unix epoch when to process the task
+ ]
+
+ __slots__ = PROPERTIES + [
+ 'logger',
+ ]
+
+ def __init__(self, record, logger=None, **kwargs):
+ self.logger = logger or logging.getLogger('openmano.wim.action')
+ attrs = merge_dicts(dict.fromkeys(self.PROPERTIES), record, kwargs)
+ self.update(_expand_extra(attrs))
+
+ def __repr__(self):
+ return super(Action, self).__repr__() + repr(self.as_dict())
+
+ def as_dict(self, *fields):
+ """Representation of the object as a dict"""
+ attrs = (set(self.PROPERTIES) & set(fields)
+ if fields else self.PROPERTIES)
+ return {k: getattr(self, k) for k in attrs}
+
+    def as_record(self):
+        """Returns a dict that can be sent to the persistence layer"""
+ special = ['params', 'depends_on', 'depends']
+ record = self.as_dict()
+ record['extra'].update(self.as_dict(*special))
+ non_fields = special + ['process_at']
+
+ return remove_none_items(filter_out_dict_keys(record, non_fields))
+
+ def update(self, values=None, **kwargs):
+ """Update the in-memory representation of the task (works similarly to
+ dict.update). The update is NOT automatically persisted.
+ """
+ # "white-listed mass assignment"
+ updates = merge_dicts(values, kwargs)
+ for attr in set(self.PROPERTIES) & set(updates.keys()):
+ setattr(self, attr, updates[attr])
+
+ def save(self, persistence, **kwargs):
+ """Persist current state of the object to the database.
+
+ Arguments:
+ persistence: object encapsulating the database
+ **kwargs: extra properties to be updated before saving
+
+ Note:
+            If any keyword argument is passed, the object itself will be
+ changed as an extra side-effect.
+ """
+ action_id = self.instance_action_id
+ index = self.task_index
+ if kwargs:
+ self.update(kwargs)
+ properties = self.as_record()
+
+ return persistence.update_action(action_id, index, properties)
+
+ def fail(self, persistence, reason, status='FAILED'):
+ """Mark action as FAILED, updating tables accordingly"""
+ persistence.update_instance_action_counters(
+ self.instance_action_id,
+ failed=1,
+ done=(-1 if self.status == 'DONE' else 0))
+
+ self.status = status
+ self.error_msg = truncate(reason)
+ self.logger.error('%s %s: %s', self.id, status, reason)
+ return self.save(persistence)
+
+ def succeed(self, persistence, status='DONE'):
+ """Mark action as DONE, updating tables accordingly"""
+ persistence.update_instance_action_counters(
+ self.instance_action_id, done=1)
+ self.status = status
+ self.logger.debug('%s %s', self.id, status)
+ return self.save(persistence)
+
+ def defer(self, persistence, reason,
+ timeout=TIMEOUT, min_attempts=MIN_ATTEMPTS):
+ """Postpone the task processing, taking care to not timeout.
+
+ Arguments:
+ persistence: object encapsulating the database
+ reason (str): explanation for the delay
+            timeout (int): maximum delay tolerated since the last recorded
+                attempt. Note that this number is a time delta, in seconds
+            min_attempts (int): Number of attempts to try before giving up.
+        """
+        now = time()
+        last_attempt = self.extra.get('last_attempted_at') or now
+        attempts = self.extra.get('attempts') or 0
+
+        if now - last_attempt > timeout and attempts > min_attempts:
+            return self.fail(
+                persistence, 'Timeout reached. {} attempts, last {:d} min ago'
+                .format(attempts, int(now - last_attempt) // 60))
+
+ self.extra['last_attempted_at'] = time()
+ self.extra['attempts'] = attempts + 1
+ self.logger.info('%s DEFERRED: %s', self.id, reason)
+ return self.save(persistence)
+
+ @property
+    def group_key(self):
+        """Key defining the group to which this task belongs"""
+ return (self.item, self.item_id)
+
+ @property
+ def processing(self):
+ """Processing status for the task (PENDING, REFRESH, IGNORE)"""
+ if self.status == 'SCHEDULED':
+ return PENDING
+
+ return IGNORE
+
+ @property
+ def id(self):
+ """Unique identifier of this particular action"""
+ return '{}[{}]'.format(self.instance_action_id, self.task_index)
+
+ @property
+ def is_scheduled(self):
+ return self.status == 'SCHEDULED'
+
+ @property
+ def is_build(self):
+ return self.status == 'BUILD'
+
+ @property
+ def is_done(self):
+ return self.status == 'DONE'
+
+ @property
+ def is_failed(self):
+ return self.status == 'FAILED'
+
+ @property
+ def is_superseded(self):
+ return self.status == 'SUPERSEDED'
+
+ def refresh(self, connector, persistence):
+ """Use the connector/persistence to refresh the status of the item.
+
+ After the item status is refreshed any change in the task should be
+ persisted to the database.
+
+ Arguments:
+ connector: object containing the classes to access the WIM or VIM
+ persistence: object containing the methods necessary to query the
+ database and to persist the updates
+ """
+ self.logger.debug(
+ 'Action `%s` has no refresh to be done',
+ self.__class__.__name__)
+
+ def expand_dependency_links(self, task_group):
+ """Expand task indexes into actual IDs"""
+ if not self.depends_on or (
+ isinstance(self.depends, dict) and self.depends):
+ return
+
+ num_tasks = len(task_group)
+ references = {
+ "TASK-{}".format(i): task_group[i]
+ for i in self.depends_on
+ if i < num_tasks and task_group[i].task_index == i and
+ task_group[i].instance_action_id == self.instance_action_id
+ }
+ self.depends = references
+
+ def become_superseded(self, superseding):
+ """When another action tries to supersede this one,
+ we need to change both of them, so the surviving actions will be
+        logically consistent.
+
+ This method should do the required internal changes, and also
+ suggest changes for the other, superseding, action.
+
+ Arguments:
+ superseding: other task superseding this one
+
+ Returns:
+ dict: changes suggested to the action superseding this one.
+ A special key ``superseding_needed`` is used to
+ suggest if the superseding is actually required or not.
+ If not present, ``superseding_needed`` is assumed to
+ be False.
+ """
+ self.status = 'SUPERSEDED'
+ self.logger.debug(
+ 'Action `%s` was superseded by `%s`',
+ self.__class__.__name__, superseding.__class__.__name__)
+ return {}
+
+ def supersede(self, others):
+ """Supersede other tasks, if necessary
+
+ Arguments:
+ others (list): action objects being superseded
+
+        When the task decides to supersede others, this method should call
+ ``become_superseded`` on the other actions, collect the suggested
+ updates and perform the necessary changes
+ """
+ # By default actions don't supersede others
+ self.logger.debug(
+ 'Action `%s` does not supersede other actions',
+ self.__class__.__name__)
+
+ def process(self, connector, persistence, ovim):
+ """Abstract method, that needs to be implemented.
+ Process the current task.
+
+ Arguments:
+ connector: object with API for accessing the WAN
+ Infrastructure Manager system
+ persistence: abstraction layer for the database
+            ovim: instance of openvim, abstraction layer that enables
+ SDN-related operations
+ """
+ raise NotImplementedError
+
+
+class FindAction(Action):
+ """Abstract class that should be inherited for FIND actions, depending on
+ the item type.
+ """
+ @property
+ def processing(self):
+ if self.status in ('DONE', 'BUILD'):
+ return REFRESH
+
+ return super(FindAction, self).processing
+
+ def become_superseded(self, superseding):
+ super(FindAction, self).become_superseded(superseding)
+ info = ('vim_id', 'wim_internal_id')
+ return remove_none_items({f: getattr(self, f) for f in info})
+
+
+class CreateAction(Action):
+ """Abstract class that should be inherited for CREATE actions, depending on
+ the item type.
+ """
+ @property
+ def processing(self):
+ if self.status in ('DONE', 'BUILD'):
+ return REFRESH
+
+ return super(CreateAction, self).processing
+
+ def become_superseded(self, superseding):
+ super(CreateAction, self).become_superseded(superseding)
+
+ created = self.extra.get('created', True)
+ sdn_net_id = self.extra.get('sdn_net_id')
+ pending_info = self.wim_internal_id or self.vim_id or sdn_net_id
+ if not(created and pending_info):
+ return {}
+
+ extra_fields = ('sdn_net_id', 'interfaces', 'created_items')
+ extra_info = filter_dict_keys(self.extra or {}, extra_fields)
+
+ return {'superseding_needed': True,
+ 'wim_internal_id': self.wim_internal_id,
+ 'vim_id': self.vim_id,
+ 'extra': remove_none_items(extra_info)}
+
+
+class DeleteAction(Action):
+ """Abstract class that should be inherited for DELETE actions, depending on
+ the item type.
+ """
+ def supersede(self, others):
+ self.logger.debug('%s %s %s %s might supersede other actions',
+ self.id, self.action, self.item, self.item_id)
+ # First collect all the changes from the superseded tasks
+ changes = [other.become_superseded(self) for other in others]
+ needed = any(change.pop('superseding_needed', False)
+ for change in changes)
+
+ # Deal with the nested ones first
+ extras = [change.pop('extra', None) or {} for change in changes]
+ items = [extra.pop('created_items', None) or {} for extra in extras]
+ items = merge_dicts(self.extra.get('created_items', {}), *items)
+ self.extra = merge_dicts(self.extra, {'created_items': items}, *extras)
+
+ # Accept the other ones
+ change = ((key, value) for key, value in merge_dicts(*changes).items()
+ if key in self.PROPERTIES)
+ for attr, value in change:
+ setattr(self, attr, value)
+
+ # Reevaluate if the action itself is needed
+ if not needed:
+ self.status = 'SUPERSEDED'
+
+
+def _expand_extra(record):
+ extra = record.pop('extra', None) or {}
+ if isinstance(extra, StringTypes):
+ extra = yaml.safe_load(extra)
+
+ record['params'] = extra.get('params')
+ record['depends_on'] = extra.get('depends_on', [])
+ record['depends'] = extra.get('depends', None)
+ record['extra'] = extra
+
+ return record
diff --git a/osm_ro/wim/engine.py b/osm_ro/wim/engine.py
new file mode 100644
index 0000000..dde5dee
--- /dev/null
+++ b/osm_ro/wim/engine.py
@@ -0,0 +1,455 @@
+# -*- coding: utf-8 -*-
+##
+# Copyright 2018 University of Bristol - High Performance Networks Research
+# Group
+# All Rights Reserved.
+#
+# Contributors: Anderson Bravalheri, Dimitrios Gkounis, Abubakar Siddique
+# Muqaddas, Navdeep Uniyal, Reza Nejabati and Dimitra Simeonidou
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+#
+# For those usages not covered by the Apache License, Version 2.0 please
+# contact with: <highperformance-networks@bristol.ac.uk>
+#
+# Neither the name of the University of Bristol nor the names of its
+# contributors may be used to endorse or promote products derived from
+# this software without specific prior written permission.
+#
+# This work has been performed in the context of DCMS UK 5G Testbeds
+# & Trials Programme and in the framework of the Metro-Haul project -
+# funded by the European Commission under Grant number 761727 through the
+# Horizon 2020 and 5G-PPP programmes.
+##
+
+"""This module contains the domain logic, and the implementation of the
+required steps to perform VNF management and orchestration in a WAN
+environment.
+
+It works as an extension/complement to the main functions contained in the
+``nfvo.py`` file and avoids interacting directly with the database, by relying
+on the `persistence` module.
+
+No http request handling/direct interaction with the database should be present
+in this file.
+"""
+import json
+import logging
+from contextlib import contextmanager
+from itertools import groupby
+from operator import itemgetter
+from uuid import uuid4
+
+from ..utils import remove_none_items
+from .actions import Action
+from .errors import (
+ NoWimConnectedToDatacenters,
+ WimAccountNotActive
+)
+from .wim_thread import WimThread
+
+
+class WimEngine(object):
+ """Logic supporting the establishment of WAN links when NS spans across
+ different datacenters.
+ """
+ def __init__(self, persistence, logger=None, ovim=None):
+ self.persist = persistence
+ self.logger = logger or logging.getLogger('openmano.wim.engine')
+ self.threads = {}
+ self.connectors = {}
+ self.ovim = ovim
+
+ def create_wim(self, properties):
+ """Create a new wim record according to the properties
+
+ Please check the wim schema to have more information about
+ ``properties``.
+
+ Returns:
+ str: uuid of the newly created WIM record
+ """
+ return self.persist.create_wim(properties)
+
+ def get_wim(self, uuid_or_name, tenant_id=None):
+ """Retrieve existing WIM record by name or id.
+
+ If ``tenant_id`` is specified, the query will be
+ limited to the WIM associated to the given tenant.
+ """
+ # Since it is a pure DB operation, we can delegate it directly
+ return self.persist.get_wim(uuid_or_name, tenant_id)
+
+ def update_wim(self, uuid_or_name, properties):
+ """Edit an existing WIM record.
+
+ ``properties`` is a dictionary with the properties being changed,
+ if a property is not present, the old value will be preserved
+ """
+ return self.persist.update_wim(uuid_or_name, properties)
+
+ def delete_wim(self, uuid_or_name):
+ """Kill the corresponding wim threads and erase the WIM record"""
+        # Theoretically, we can rely on the database to drop the wim_accounts
+        # automatically, since we have configured 'ON DELETE CASCADE'.
+        # However, we use `delete_wim_accounts` to kill all the running
+        # threads.
+ self.delete_wim_accounts(uuid_or_name)
+ return self.persist.delete_wim(uuid_or_name)
+
+ def create_wim_account(self, wim, tenant, properties):
+ """Create an account that associates a tenant to a WIM.
+
+ As a side effect this function will spawn a new thread
+
+ Arguments:
+ wim (str): name or uuid of the WIM related to the account being
+ created
+ tenant (str): name or uuid of the nfvo tenant to which the account
+ will be created
+ properties (dict): properties of the account
+ (eg. username, password, ...)
+
+ Returns:
+ dict: Created record
+ """
+ uuid = self.persist.create_wim_account(wim, tenant, properties)
+ account = self.persist.get_wim_account_by(uuid=uuid)
+        # ^ We need to use get_wim_account_by here, since this method returns
+ # all the associations, and we need the wim to create the thread
+ self._spawn_thread(account)
+ return account
+
+ def _update_single_wim_account(self, account, properties):
+ """Update WIM Account, taking care to reload the corresponding thread
+
+ Arguments:
+ account (dict): Current account record
+ properties (dict): Properties to be updated
+
+ Returns:
+ dict: updated record
+ """
+ account = self.persist.update_wim_account(account['uuid'], properties)
+ self.threads[account['uuid']].reload()
+ return account
+
+    def update_wim_accounts(self, wim, tenant, properties):
+        """Update all the accounts related to a WIM and a tenant,
+        taking care of reloading threads.
+
+ Arguments:
+ wim (str): uuid or name of a WIM record
+ tenant (str): uuid or name of a NFVO tenant record
+ properties (dict): attributes with values to be updated
+
+ Returns
+ list: Records that were updated
+ """
+ accounts = self.persist.get_wim_accounts_by(wim, tenant)
+ return [self._update_single_wim_account(account, properties)
+ for account in accounts]
+
+ def _delete_single_wim_account(self, account):
+ """Delete WIM Account, taking care to remove the corresponding thread
+ and delete the internal WIM account, if it was automatically generated.
+
+ Arguments:
+            account (dict): Current account record (the `uuid` key is
+                required)
+
+ Returns:
+ dict: current record (same as input)
+ """
+ self.persist.delete_wim_account(account['uuid'])
+
+ if account['uuid'] not in self.threads:
+ raise WimAccountNotActive(
+                'Requests sent to the WIM Account %s are not currently '
+ 'being processed.', account['uuid'])
+ else:
+ self.threads[account['uuid']].exit()
+ del self.threads[account['uuid']]
+
+ return account
+
+    def delete_wim_accounts(self, wim, tenant=None, **kwargs):
+        """Delete all the accounts related to a WIM (and a tenant),
+        taking care of threads and internal WIM accounts.
+
+ Arguments:
+ wim (str): uuid or name of a WIM record
+ tenant (str): uuid or name of a NFVO tenant record
+
+ Returns
+ list: Records that were deleted
+ """
+ kwargs.setdefault('error_if_none', False)
+ accounts = self.persist.get_wim_accounts_by(wim, tenant, **kwargs)
+ return [self._delete_single_wim_account(a) for a in accounts]
+
+ def _reload_wim_threads(self, wim_id):
+ for thread in self.threads.values():
+ if thread.wim_account['wim_id'] == wim_id:
+ thread.reload()
+
+    def create_wim_port_mappings(self, wim, properties, tenant=None):
+        """Store information about port mappings in the database"""
+ # TODO: Review tenants... WIMs can exist across different tenants,
+ # and the port_mappings are a WIM property, not a wim_account
+ # property, so the concepts are not related
+ wim = self.persist.get_by_name_or_uuid('wims', wim)
+ result = self.persist.create_wim_port_mappings(wim, properties, tenant)
+ self._reload_wim_threads(wim['uuid'])
+ return result
+
+    def get_wim_port_mappings(self, wim):
+        """Retrieve information about port mappings from the database"""
+ return self.persist.get_wim_port_mappings(wim)
+
+ def delete_wim_port_mappings(self, wim):
+ """Erase the port mapping records associated with the WIM"""
+ wim = self.persist.get_by_name_or_uuid('wims', wim)
+ message = self.persist.delete_wim_port_mappings(wim['uuid'])
+ self._reload_wim_threads(wim['uuid'])
+ return message
+
+ def find_common_wims(self, datacenter_ids, tenant):
+ """Find WIMs that are common to all datacenters listed"""
+ mappings = self.persist.get_wim_port_mappings(
+ datacenter=datacenter_ids, tenant=tenant, error_if_none=False)
+
+ wim_id_of = itemgetter('wim_id')
+ sorted_mappings = sorted(mappings, key=wim_id_of) # needed by groupby
+ grouped_mappings = groupby(sorted_mappings, key=wim_id_of)
+ mapped_datacenters = {
+ wim_id: [m['datacenter_id'] for m in mappings]
+ for wim_id, mappings in grouped_mappings
+ }
+
+ return [
+ wim_id
+ for wim_id, connected_datacenters in mapped_datacenters.items()
+ if set(connected_datacenters) >= set(datacenter_ids)
+ ]
+
+ def find_common_wim(self, datacenter_ids, tenant):
+ """Find a single WIM that is able to connect all the datacenters
+ listed
+
+ Raises
+ NoWimConnectedToDatacenters: if no WIM connected to all datacenters
+ at once is found
+ """
+ suitable_wim_ids = self.find_common_wims(datacenter_ids, tenant)
+
+ if not suitable_wim_ids:
+ raise NoWimConnectedToDatacenters(datacenter_ids)
+
+        # TODO: use a criterion to determine which WIM is going to be used,
+ # instead of always using the first one (strategy pattern can be
+ # used here)
+ return suitable_wim_ids[0]
+
+    def derive_wan_link(self,
+                        instance_scenario_id, sce_net_id,
+                        networks, tenant):
+        """Create an instance_wim_nets record for the given information"""
+ datacenters = [n['datacenter_id'] for n in networks]
+ wim_id = self.find_common_wim(datacenters, tenant)
+
+ account = self.persist.get_wim_account_by(wim_id, tenant)
+
+ return {
+ 'uuid': str(uuid4()),
+ 'instance_scenario_id': instance_scenario_id,
+ 'sce_net_id': sce_net_id,
+ 'wim_id': wim_id,
+ 'wim_account_id': account['uuid']
+ }
+
+    def derive_wan_links(self, networks, tenant=None):
+        """Discover and return the wan_links that have to be created
+ considering a set of networks (VLDs) required for a scenario instance
+ (NSR).
+
+ Arguments:
+ networks(list): Dicts containing the information about the networks
+ that will be instantiated to materialize a Network Service
+ (scenario) instance.
+
+ Returns:
+ list: list of WAN links to be written to the database
+ """
+ # Group networks by key=(instance_scenario_id, sce_net_id)
+ filtered = _filter_multi_vim(networks)
+ grouped_networks = _group_networks(filtered)
+ datacenters_per_group = _count_datacenters(grouped_networks)
+        # For each group count the number of datacenters. If greater than 1,
+        # we have to create a wan link connecting them.
+ wan_groups = [key
+ for key, counter in datacenters_per_group
+ if counter > 1]
+
+ return [
+ self.derive_wan_link(key[0], key[1], grouped_networks[key], tenant)
+ for key in wan_groups
+ ]
+
+ def create_action(self, wan_link):
+ """For a single wan_link create the corresponding create action"""
+ return {
+ 'action': 'CREATE',
+ 'status': 'SCHEDULED',
+ 'item': 'instance_wim_nets',
+ 'item_id': wan_link['uuid'],
+ 'wim_account_id': wan_link['wim_account_id']
+ }
+
+ def create_actions(self, wan_links):
+ """For an array of wan_links, create all the corresponding actions"""
+ return [self.create_action(l) for l in wan_links]
+
+    def delete_action(self, wan_link):
+        """For a single wan_link create the corresponding delete action"""
+ return {
+ 'action': 'DELETE',
+ 'status': 'SCHEDULED',
+ 'item': 'instance_wim_nets',
+ 'item_id': wan_link['uuid'],
+ 'wim_account_id': wan_link['wim_account_id'],
+ 'extra': json.dumps({'wan_link': wan_link})
+ # We serialize and cache the wan_link here, because it can be
+ # deleted during the delete process
+ }
+
+    def delete_actions(self, wan_links=(), instance_scenario_id=None):
+        """Given an Instance Scenario, remove all the WAN Links created in the
+ past"""
+ if instance_scenario_id:
+ wan_links = self.persist.get_wan_links(
+ instance_scenario_id=instance_scenario_id)
+ return [self.delete_action(l) for l in wan_links]
+
+ def incorporate_actions(self, wim_actions, instance_action):
+ """Make the instance action consider new WIM actions and make the WIM
+ actions aware of the instance action
+ """
+ current = instance_action.setdefault('number_tasks', 0)
+ for i, action in enumerate(wim_actions):
+ action['task_index'] = current + i
+ action['instance_action_id'] = instance_action['uuid']
+ instance_action['number_tasks'] += len(wim_actions)
+
+ return wim_actions, instance_action
+
+ def dispatch(self, tasks):
+ """Enqueue a list of tasks for further processing.
+
+        This function is supposed to be called from outside the WIM Thread.
+ """
+ for task in tasks:
+ if task['wim_account_id'] not in self.threads:
+ error_msg = str(WimAccountNotActive(
+                    'Requests sent to the WIM Account %s are not currently '
+ 'being processed.', task['wim_account_id']))
+ Action(task, self.logger).fail(self.persist, error_msg)
+ self.persist.update_wan_link(task['item_id'],
+ {'status': 'ERROR',
+ 'error_msg': error_msg})
+ self.logger.error('Task %s %s %s not dispatched.\n%s',
+ task['action'], task['item'],
+                                  task['instance_action_id'], error_msg)
+ else:
+ self.threads[task['wim_account_id']].insert_task(task)
+ self.logger.debug('Task %s %s %s dispatched',
+ task['action'], task['item'],
+ task['instance_action_id'])
+
+ def _spawn_thread(self, wim_account):
+ """Spawn a WIM thread
+
+ Arguments:
+ wim_account (dict): WIM information (usually persisted)
+ The `wim` field is required to be set with a valid WIM record
+ inside the `wim_account` dict
+
+ Return:
+ threading.Thread: Thread object
+ """
+ thread = None
+ try:
+ thread = WimThread(self.persist, wim_account, ovim=self.ovim)
+ self.threads[wim_account['uuid']] = thread
+ thread.start()
+ except: # noqa
+ self.logger.error('Error when spawning WIM thread for %s',
+ wim_account['uuid'], exc_info=True)
+
+ return thread
+
+ def start_threads(self):
+ """Start the threads responsible for processing WIM Actions"""
+ accounts = self.persist.get_wim_accounts(error_if_none=False)
+ self.threads = remove_none_items(
+ {a['uuid']: self._spawn_thread(a) for a in accounts})
+
+ def stop_threads(self):
+ """Stop the threads responsible for processing WIM Actions"""
+        for uuid, thread in list(self.threads.items()):
+ thread.exit()
+ del self.threads[uuid]
+
+ @contextmanager
+ def threads_running(self):
+ """Ensure no thread will be left running"""
+ # This method is particularly important for testing :)
+ try:
+ self.start_threads()
+ yield
+ finally:
+ self.stop_threads()
+
+
+def _filter_multi_vim(networks):
+ """Ignore networks without sce_net_id (all VNFs go to the same VIM)"""
+ return [n for n in networks if 'sce_net_id' in n and n['sce_net_id']]
+
+
+def _group_networks(networks):
+ """Group networks that correspond to the same instance_scenario_id and
+ sce_net_id (NSR and VLD).
+
+ Arguments:
+ networks(list): Dicts containing the information about the networks
+ that will be instantiated to materialize a Network Service
+ (scenario) instance.
+ Returns:
+ dict: Keys are tuples (instance_scenario_id, sce_net_id) and values
+            are lists of networks.
+ """
+ criteria = itemgetter('instance_scenario_id', 'sce_net_id')
+
+ networks = sorted(networks, key=criteria)
+ return {k: list(v) for k, v in groupby(networks, key=criteria)}
+
+
+def _count_datacenters(grouped_networks):
+ """Count the number of datacenters in each group of networks
+
+ Returns:
+ generator of tuples: the first element is the group key, while the second
+ element is the number of datacenters in that group.
+ """
+ return ((key, len(set(n['datacenter_id'] for n in group)))
+ for key, group in grouped_networks.items())
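A minimal, self-contained sketch of how the three helpers above compose: networks without `sce_net_id` are dropped, the rest are grouped by `(instance_scenario_id, sce_net_id)`, and the distinct datacenters of each group are counted. The helper bodies mirror the patch (the filter uses `dict.get`, which is equivalent here), while the sample records and the final "more than one datacenter" selection are assumptions made only for illustration.

```python
from itertools import groupby
from operator import itemgetter


def filter_multi_vim(networks):
    # Networks without sce_net_id live in a single VIM and are ignored
    return [n for n in networks if n.get('sce_net_id')]


def group_networks(networks):
    criteria = itemgetter('instance_scenario_id', 'sce_net_id')
    # groupby only merges adjacent items, so the input is sorted first
    networks = sorted(networks, key=criteria)
    return {k: list(v) for k, v in groupby(networks, key=criteria)}


def count_datacenters(grouped_networks):
    # Lazily yields (group key, number of distinct datacenters)
    return ((key, len(set(n['datacenter_id'] for n in group)))
            for key, group in grouped_networks.items())


# Hypothetical sample records (not taken from the patch)
networks = [
    {'instance_scenario_id': 'nsr1', 'sce_net_id': 'vld1',
     'datacenter_id': 'dc1'},
    {'instance_scenario_id': 'nsr1', 'sce_net_id': 'vld1',
     'datacenter_id': 'dc2'},
    {'instance_scenario_id': 'nsr1', 'sce_net_id': 'vld2',
     'datacenter_id': 'dc1'},
    {'instance_scenario_id': 'nsr1', 'sce_net_id': None,
     'datacenter_id': 'dc1'},
]

grouped = group_networks(filter_multi_vim(networks))
counts = dict(count_datacenters(grouped))
# Assumption: only groups spanning several datacenters are multi-site
multi_site = sorted(key for key, n in counts.items() if n > 1)
```

Note that the result of `count_datacenters` can be consumed only once, since it is a generator; the sketch materializes it with `dict(...)`.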
diff --git a/osm_ro/wim/errors.py b/osm_ro/wim/errors.py
new file mode 100644
index 0000000..16c53b5
--- /dev/null
+++ b/osm_ro/wim/errors.py
@@ -0,0 +1,180 @@
+# -*- coding: utf-8 -*-
+##
+# Copyright 2018 University of Bristol - High Performance Networks Research
+# Group
+# All Rights Reserved.
+#
+# Contributors: Anderson Bravalheri, Dimitrios Gkounis, Abubakar Siddique
+# Muqaddas, Navdeep Uniyal, Reza Nejabati and Dimitra Simeonidou
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+#
+# For those usages not covered by the Apache License, Version 2.0 please
+# contact with: <highperformance-networks@bristol.ac.uk>
+#
+# Neither the name of the University of Bristol nor the names of its
+# contributors may be used to endorse or promote products derived from
+# this software without specific prior written permission.
+#
+# This work has been performed in the context of DCMS UK 5G Testbeds
+# & Trials Programme and in the framework of the Metro-Haul project -
+# funded by the European Commission under Grant number 761727 through the
+# Horizon 2020 and 5G-PPP programmes.
+##
+from six.moves import queue
+
+from ..db_base import db_base_Exception as DbBaseException
+from ..http_tools.errors import (
+ Bad_Request,
+ Conflict,
+ HttpMappedError,
+ Internal_Server_Error,
+ Not_Found
+)
+
+
+class NoRecordFound(DbBaseException):
+ """No record was found in the database"""
+
+ def __init__(self, criteria, table=None):
+ table_info = '{} - '.format(table) if table else ''
+ super(NoRecordFound, self).__init__(
+ '{}: {}`{}`'.format(self.__class__.__doc__, table_info, criteria),
+ http_code=Not_Found)
+
+
+class MultipleRecordsFound(DbBaseException):
+ """More than one record was found in the database"""
+
+ def __init__(self, criteria, table=None):
+ table_info = '{} - '.format(table) if table else ''
+ super(MultipleRecordsFound, self).__init__(
+ '{}: {}`{}`'.format(self.__class__.__doc__, table_info, criteria),
+ http_code=Conflict)
+
+
+class WimAndTenantNotAttached(DbBaseException):
+ """Wim and Tenant are not attached"""
+
+ def __init__(self, wim, tenant):
+ super(WimAndTenantNotAttached, self).__init__(
+ '{}: `{}` <> `{}`'.format(self.__class__.__doc__, wim, tenant),
+ http_code=Conflict)
+
+
+class WimAndTenantAlreadyAttached(DbBaseException):
+ """There is already a wim account attaching the given wim and tenant"""
+
+ def __init__(self, wim, tenant):
+ super(WimAndTenantAlreadyAttached, self).__init__(
+ '{}: `{}` <> `{}`'.format(self.__class__.__doc__, wim, tenant),
+ http_code=Conflict)
+
+
+class NoWimConnectedToDatacenters(NoRecordFound):
+ """No WIM that is able to connect the given datacenters was found"""
+
+
+class InvalidParameters(DbBaseException):
+ """The given parameters are invalid"""
+
+ def __init__(self, message, http_code=Bad_Request):
+ super(InvalidParameters, self).__init__(message, http_code)
+
+
+class UndefinedAction(HttpMappedError):
+ """No action found"""
+
+ def __init__(self, item_type, action, http_code=Internal_Server_Error):
+ message = ('The action {} {} is not defined'.format(action, item_type))
+ super(UndefinedAction, self).__init__(message, http_code)
+
+
+class UndefinedWimConnector(DbBaseException):
+ """The connector class for the specified wim type is not implemented"""
+
+ def __init__(self, wim_type, module_name, location_reference):
+ super(UndefinedWimConnector, self).__init__(
+ ('{}: `{}`. Could not find module `{}` '
+ '(check if it is necessary to install a plugin)'
+ .format(self.__class__.__doc__, wim_type, module_name)),
+ http_code=Bad_Request)
+
+
+class WimAccountOverwrite(DbBaseException):
+ """An attempt to overwrite an existing WIM account was identified"""
+
+ def __init__(self, wim_account, diff=None, tip=None):
+ message = self.__class__.__doc__
+ account_info = (
+ 'Account -- name: {name}, uuid: {uuid}'.format(**wim_account)
+ if wim_account else '')
+ diff_info = (
+ 'Differing fields: ' + ', '.join(diff.keys()) if diff else '')
+
+ super(WimAccountOverwrite, self).__init__(
+ '\n'.join(m for m in (message, account_info, diff_info, tip) if m),
+ http_code=Conflict)
+
+
+class UnexpectedDatabaseError(DbBaseException):
+ """The database didn't raise an exception, but the query was not
+ executed (maybe the connection had some problems?)
+ """
+
+
+class UndefinedUuidOrName(DbBaseException):
+ """Trying to query for a record using an empty uuid or name"""
+
+ def __init__(self, table=None):
+ table_info = '{} - '.format(table.split()[0]) if table else ''
+ super(UndefinedUuidOrName, self).__init__(
+ table_info + self.__class__.__doc__, http_code=Bad_Request)
+
+
+class UndefinedWanMappingType(InvalidParameters):
+ """The dict wan_service_mapping_info MUST contain a `type` field"""
+
+ def __init__(self, given):
+ super(UndefinedWanMappingType, self).__init__(
+ '{}. Given: `{}`'.format(self.__class__.__doc__, given))
+
+
+class QueueFull(HttpMappedError, queue.Full):
+ """Thread queue is full"""
+
+ def __init__(self, thread_name, http_code=Internal_Server_Error):
+ message = ('Thread {} queue is full'.format(thread_name))
+ super(QueueFull, self).__init__(message, http_code)
+
+
+class InconsistentState(HttpMappedError):
+ """An unexpected inconsistency was found in the state of the program"""
+
+ def __init__(self, arg, http_code=Internal_Server_Error):
+ if isinstance(arg, HttpMappedError):
+ http_code = arg.http_code
+ message = str(arg)
+ else:
+ message = arg
+
+ super(InconsistentState, self).__init__(message, http_code)
+
+
+class WimAccountNotActive(HttpMappedError, KeyError):
+ """WIM Account is not active yet (no thread is running)"""
+
+ def __init__(self, message, http_code=Internal_Server_Error):
+ message += ('\nThe thread responsible for processing the actions has '
+ 'suddenly stopped, or has never been spawned')
+ super(WimAccountNotActive, self).__init__(message, http_code)
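The error classes above reuse the class docstring as the leading part of the message, so a subclass only needs a new docstring to get a new message. A minimal sketch of that pattern follows; `BaseError` is a stand-in for `db_base_Exception` (assumed here to accept a message and an `http_code`), and the numeric status codes replace the `Not_Found`/`Bad_Request` constants for illustration only.

```python
class BaseError(Exception):
    """Stand-in for db_base_Exception (assumed signature)"""

    def __init__(self, message, http_code=400):
        super(BaseError, self).__init__(message)
        self.http_code = http_code


class NoRecordFound(BaseError):
    """No record was found in the database"""

    def __init__(self, criteria, table=None):
        table_info = '{} - '.format(table) if table else ''
        # self.__class__ is the concrete subclass, so its docstring is used
        super(NoRecordFound, self).__init__(
            '{}: {}`{}`'.format(self.__class__.__doc__, table_info, criteria),
            http_code=404)


class NoWimConnectedToDatacenters(NoRecordFound):
    """No WIM that is able to connect the given datacenters was found"""


try:
    raise NoWimConnectedToDatacenters(['dc1', 'dc2'])
except NoRecordFound as error:
    # The subclass is caught through its parent and keeps the parent's code
    message = str(error)
    status = error.http_code
```

One consequence of this design is that a docstring edit changes the text returned to API clients, so docstrings of these classes should be treated as user-facing.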
diff --git a/osm_ro/wim/failing_connector.py b/osm_ro/wim/failing_connector.py
new file mode 100644
index 0000000..b66551c
--- /dev/null
+++ b/osm_ro/wim/failing_connector.py
@@ -0,0 +1,82 @@
+# -*- coding: utf-8 -*-
+##
+# Copyright 2018 University of Bristol - High Performance Networks Research
+# Group
+# All Rights Reserved.
+#
+# Contributors: Anderson Bravalheri, Dimitrios Gkounis, Abubakar Siddique
+# Muqaddas, Navdeep Uniyal, Reza Nejabati and Dimitra Simeonidou
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+#
+# For those usages not covered by the Apache License, Version 2.0 please
+# contact with: <highperformance-networks@bristol.ac.uk>
+#
+# Neither the name of the University of Bristol nor the names of its
+# contributors may be used to endorse or promote products derived from
+# this software without specific prior written permission.
+#
+# This work has been performed in the context of DCMS UK 5G Testbeds
+# & Trials Programme and in the framework of the Metro-Haul project -
+# funded by the European Commission under Grant number 761727 through the
+# Horizon 2020 and 5G-PPP programmes.
+##
+
+"""If any error happens when trying to instantiate the WIM Connector,
+we need a replacement for it that raises an error every time we try to
+execute any action
+"""
+import json
+from .wimconn import WimConnectorError
+
+
+class FailingConnector(object):
+ """Placeholder for a connector whose instantiation failed.
+ This placeholder will just raise an error every time an action is needed
+ from the connector.
+
+ This way we can make sure that all the other parts of the program will work
+ but the user will have all the information available to fix the problem.
+ """
+ def __init__(self, error_msg):
+ self.error_msg = error_msg
+
+ def check_credentials(self):
+ raise WimConnectorError('Impossible to use WIM:\n' + self.error_msg)
+
+ def get_connectivity_service_status(self, service_uuid, _conn_info=None):
+ raise WimConnectorError('Impossible to retrieve status for {}\n\n{}'
+ .format(service_uuid, self.error_msg))
+
+ def create_connectivity_service(self, service_uuid, *args, **kwargs):
+ raise WimConnectorError('Impossible to connect {}.\n{}\n{}\n{}'
+ .format(service_uuid, self.error_msg,
+ json.dumps(args, indent=4),
+ json.dumps(kwargs, indent=4)))
+
+ def delete_connectivity_service(self, service_uuid, _conn_info=None):
+ raise WimConnectorError('Impossible to disconnect {}\n\n{}'
+ .format(service_uuid, self.error_msg))
+
+ def edit_connectivity_service(self, service_uuid, *args, **kwargs):
+ raise WimConnectorError('Impossible to change connection {}.\n{}\n'
+ '{}\n{}'
+ .format(service_uuid, self.error_msg,
+ json.dumps(args, indent=4),
+ json.dumps(kwargs, indent=4)))
+
+ def clear_all_connectivity_services(self):
+ raise WimConnectorError('Impossible to use WIM:\n' + self.error_msg)
+
+ def get_all_active_connectivity_services(self):
+ raise WimConnectorError('Impossible to use WIM:\n' + self.error_msg)
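A hedged sketch of how such a placeholder might be wired in: the construction of the real connector is wrapped so that a failure is captured once and reported later, whenever an action is attempted. `build_connector` and `BrokenConnector` are hypothetical names introduced for this illustration, and `WimConnectorError` is a local stand-in for the class in `wimconn.py`; the patch itself may integrate `FailingConnector` differently.

```python
class WimConnectorError(Exception):
    """Stand-in for wimconn.WimConnectorError"""


class FailingConnector(object):
    """Reduced copy of the placeholder: one method is enough here"""

    def __init__(self, error_msg):
        self.error_msg = error_msg

    def check_credentials(self):
        raise WimConnectorError('Impossible to use WIM:\n' + self.error_msg)


class BrokenConnector(object):
    """Hypothetical connector whose constructor always fails"""

    def __init__(self, wim, wim_account):
        raise ValueError('missing `wim_url`')


def build_connector(connector_class, wim, wim_account):
    """Hypothetical factory: never raises, falls back to FailingConnector"""
    try:
        return connector_class(wim, wim_account)
    except Exception as ex:
        # Keep the original failure so it can be shown to the user later
        return FailingConnector('{}: {}'.format(type(ex).__name__, ex))


connector = build_connector(BrokenConnector, {}, {})
try:
    connector.check_credentials()
    outcome = 'ok'
except WimConnectorError as ex:
    outcome = str(ex)
```

The caller does not need to distinguish a healthy connector from a failed one; the error only surfaces when an action is actually requested.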
diff --git a/osm_ro/wim/http_handler.py b/osm_ro/wim/http_handler.py
new file mode 100644
index 0000000..f5eeed9
--- /dev/null
+++ b/osm_ro/wim/http_handler.py
@@ -0,0 +1,223 @@
+# -*- coding: utf-8 -*-
+##
+# Copyright 2018 University of Bristol - High Performance Networks Research
+# Group
+# All Rights Reserved.
+#
+# Contributors: Anderson Bravalheri, Dimitrios Gkounis, Abubakar Siddique
+# Muqaddas, Navdeep Uniyal, Reza Nejabati and Dimitra Simeonidou
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+#
+# For those usages not covered by the Apache License, Version 2.0 please
+# contact with: <highperformance-networks@bristol.ac.uk>
+#
+# Neither the name of the University of Bristol nor the names of its
+# contributors may be used to endorse or promote products derived from
+# this software without specific prior written permission.
+#
+# This work has been performed in the context of DCMS UK 5G Testbeds
+# & Trials Programme and in the framework of the Metro-Haul project -
+# funded by the European Commission under Grant number 761727 through the
+# Horizon 2020 and 5G-PPP programmes.
+##
+
+"""This module works as an extension to the toplevel ``httpserver`` module,
+implementing callbacks for the HTTP routes related to the WIM features of OSM.
+
+Acting as a front-end, it is responsible for converting the HTTP request
+payload into native python objects, calling the correct engine methods
+and converting back the response objects into strings to be sent in the HTTP
+response payload.
+
+Direct domain/persistence logic should be avoided in this file, instead
+calls to other layers should be done.
+"""
+import logging
+
+from bottle import request
+
+from .. import utils
+from ..http_tools.errors import ErrorHandler
+from ..http_tools.handler import BaseHandler, route
+from ..http_tools.request_processing import (
+ filter_query_string,
+ format_in,
+ format_out
+)
+from .engine import WimEngine
+from .persistence import WimPersistence
+from .schemas import (
+ wim_account_schema,
+ wim_edit_schema,
+ wim_port_mapping_schema,
+ wim_schema
+)
+
+
+class WimHandler(BaseHandler):
+ """HTTP route implementations for WIM related URLs
+
+ Arguments:
+ db: instance of mydb [optional]. This argument must be provided
+ if ``persistence`` is not passed
+ persistence (WimPersistence): High-level data storage abstraction
+ [optional]. If this argument is not present, ``db`` must be.
+ engine (WimEngine): Implementation of the business logic
+ for the engine of WAN networks
+ logger (logging.Logger): logger object [optional]
+ url_base(str): Path fragment to be prepended to the routes [optional]
+ plugins(list): List of bottle plugins to be applied to routes
+ [optional]
+ """
+ def __init__(self, db=None, persistence=None, engine=None,
+ url_base='', logger=None, plugins=()):
+ self.persist = persistence or WimPersistence(db)
+ self.engine = engine or WimEngine(self.persist)
+ self.url_base = url_base
+ self.logger = logger or logging.getLogger('openmano.wim.http')
+ error_handler = ErrorHandler(self.logger)
+ self.plugins = [error_handler] + list(plugins)
+
+ @route('GET', '/<tenant_id>/wims')
+ def http_list_wims(self, tenant_id):
+ allowed_fields = ('uuid', 'name', 'wim_url', 'type', 'created_at')
+ select_, where_, limit_ = filter_query_string(
+ request.query, None, allowed_fields)
+ # ^ Since we allow the user to customize the db query using the HTTP
+ # query and it is quite difficult to re-use this query, let's just
+ # do an ad-hoc call to the db
+
+ from_ = 'wims'
+ if tenant_id != 'any':
+ where_['nfvo_tenant_id'] = tenant_id
+ if 'created_at' in select_:
+ select_[select_.index('created_at')] = (
+ 'w.created_at as created_at')
+ if 'created_at' in where_:
+ where_['w.created_at'] = where_.pop('created_at')
+ from_ = ('wims as w join wim_nfvo_tenants as wt '
+ 'on w.uuid=wt.wim_id')
+
+ wims = self.persist.query(
+ FROM=from_, SELECT=select_, WHERE=where_, LIMIT=limit_,
+ error_if_none=False)
+
+ utils.convert_float_timestamp2str(wims)
+ return format_out({'wims': wims})
+
+ @route('GET', '/<tenant_id>/wims/<wim_id>')
+ def http_get_wim(self, tenant_id, wim_id):
+ tenant_id = None if tenant_id == 'any' else tenant_id
+ wim = self.engine.get_wim(wim_id, tenant_id)
+ return format_out({'wim': wim})
+
+ @route('POST', '/wims')
+ def http_create_wim(self):
+ http_content, _ = format_in(wim_schema, confidential_data=True)
+ r = utils.remove_extra_items(http_content, wim_schema)
+ if r:
+ self.logger.debug("Remove extra items received %r", r)
+ data = self.engine.create_wim(http_content['wim'])
+ return self.http_get_wim('any', data)
+
+ @route('PUT', '/wims/<wim_id>')
+ def http_update_wim(self, wim_id):
+ '''Edit wim details; accepts either uuid or name'''
+ # parse input data
+ http_content, _ = format_in(wim_edit_schema)
+ r = utils.remove_extra_items(http_content, wim_edit_schema)
+ if r:
+ self.logger.debug("Remove received extra items %s", r)
+
+ wim_id = self.engine.update_wim(wim_id, http_content['wim'])
+ return self.http_get_wim('any', wim_id)
+
+ @route('DELETE', '/wims/<wim_id>')
+ def http_delete_wim(self, wim_id):
+ """Delete a wim from the database; accepts either uuid or name"""
+ data = self.engine.delete_wim(wim_id)
+ # TODO Remove WIM in orchestrator
+ return format_out({"result": "wim '" + data + "' deleted"})
+
+ @route('POST', '/<tenant_id>/wims/<wim_id>')
+ def http_create_wim_account(self, tenant_id, wim_id):
+ """Associate an existing wim to this tenant"""
+ # parse input data
+ http_content, _ = format_in(
+ wim_account_schema, confidential_data=True)
+ removed = utils.remove_extra_items(http_content, wim_account_schema)
+ removed and self.logger.debug("Remove extra items %r", removed)
+ account = self.engine.create_wim_account(
+ wim_id, tenant_id, http_content['wim_account'])
+ # check update succeeded
+ return format_out({"wim_account": account})
+
+ @route('PUT', '/<tenant_id>/wims/<wim_id>')
+ def http_update_wim_accounts(self, tenant_id, wim_id):
+ """Edit the association of an existing wim to this tenant"""
+ tenant_id = None if tenant_id == 'any' else tenant_id
+ # parse input data
+ http_content, _ = format_in(
+ wim_account_schema, confidential_data=True)
+ removed = utils.remove_extra_items(http_content, wim_account_schema)
+ removed and self.logger.debug("Remove extra items %r", removed)
+ accounts = self.engine.update_wim_accounts(
+ wim_id, tenant_id, http_content['wim_account'])
+
+ if tenant_id:
+ return format_out({'wim_account': accounts[0]})
+
+ return format_out({'wim_accounts': accounts})
+
+ @route('DELETE', '/<tenant_id>/wims/<wim_id>')
+ def http_delete_wim_accounts(self, tenant_id, wim_id):
+ """Dissociate an existing wim from this tenant"""
+ tenant_id = None if tenant_id == 'any' else tenant_id
+ accounts = self.engine.delete_wim_accounts(wim_id, tenant_id,
+ error_if_none=True)
+
+ properties = (
+ (account['name'], wim_id,
+ utils.safe_get(account, 'association.nfvo_tenant_id', tenant_id))
+ for account in accounts)
+
+ return format_out({
+ 'result': '\n'.join('WIM account `{}` deleted. '
+ 'Tenant `{}` detached from WIM `{}`'
+ .format(*p) for p in properties)
+ })
+
+ @route('POST', '/<tenant_id>/wims/<wim_id>/port_mapping')
+ def http_create_wim_port_mappings(self, tenant_id, wim_id):
+ """Set the wim port mapping for a wim"""
+ # parse input data
+ http_content, _ = format_in(wim_port_mapping_schema)
+
+ data = self.engine.create_wim_port_mappings(
+ wim_id, http_content['wim_port_mapping'], tenant_id)
+ return format_out({"wim_port_mapping": data})
+
+ @route('GET', '/<tenant_id>/wims/<wim_id>/port_mapping')
+ def http_get_wim_port_mappings(self, tenant_id, wim_id):
+ """Get wim port mapping details"""
+ # TODO: tenant_id is never used, so it should be removed
+ data = self.engine.get_wim_port_mappings(wim_id)
+ return format_out({"wim_port_mapping": data})
+
+ @route('DELETE', '/<tenant_id>/wims/<wim_id>/port_mapping')
+ def http_delete_wim_port_mappings(self, tenant_id, wim_id):
+ """Clean wim port mapping"""
+ # TODO: tenant_id is never used, so it should be removed
+ data = self.engine.delete_wim_port_mappings(wim_id)
+ return format_out({"result": data})
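The tenant-dependent query composition in `http_list_wims` can be sketched as a pure function, which makes the two branches easier to see and to test without a database. `compose_wim_list_query` is a hypothetical name used only for this illustration, and the comment about why `created_at` is qualified is our reading of the code, not something the patch states.

```python
def compose_wim_list_query(tenant_id, select, where):
    """Return (FROM, SELECT, WHERE) following the branches of http_list_wims"""
    # Work on copies so the caller's objects are left untouched
    select, where = list(select), dict(where)
    if tenant_id == 'any':
        return 'wims', select, where

    where['nfvo_tenant_id'] = tenant_id
    # Presumably `created_at` becomes ambiguous after the join,
    # so it is qualified with the `w` alias
    if 'created_at' in select:
        select[select.index('created_at')] = 'w.created_at as created_at'
    if 'created_at' in where:
        where['w.created_at'] = where.pop('created_at')
    from_ = 'wims as w join wim_nfvo_tenants as wt on w.uuid=wt.wim_id'
    return from_, select, where


from_any, select_any, where_any = compose_wim_list_query(
    'any', ['uuid', 'created_at'], {})
from_t1, select_t1, where_t1 = compose_wim_list_query(
    't1', ['uuid', 'created_at'], {'name': 'x'})
```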
diff --git a/osm_ro/wim/persistence.py b/osm_ro/wim/persistence.py
new file mode 100644
index 0000000..8a74d49
--- /dev/null
+++ b/osm_ro/wim/persistence.py
@@ -0,0 +1,1018 @@
+# -*- coding: utf-8 -*-
+##
+# Copyright 2018 University of Bristol - High Performance Networks Research
+# Group
+# All Rights Reserved.
+#
+# Contributors: Anderson Bravalheri, Dimitrios Gkounis, Abubakar Siddique
+# Muqaddas, Navdeep Uniyal, Reza Nejabati and Dimitra Simeonidou
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+#
+# For those usages not covered by the Apache License, Version 2.0 please
+# contact with: <highperformance-networks@bristol.ac.uk>
+#
+# Neither the name of the University of Bristol nor the names of its
+# contributors may be used to endorse or promote products derived from
+# this software without specific prior written permission.
+#
+# This work has been performed in the context of DCMS UK 5G Testbeds
+# & Trials Programme and in the framework of the Metro-Haul project -
+# funded by the European Commission under Grant number 761727 through the
+# Horizon 2020 and 5G-PPP programmes.
+##
+
+"""This module contains only logic related to managing records in a database
+which includes data format normalization, data format validation, etc.
+(It works as an extension to `nfvo_db.py` for the WIM feature)
+
+No domain logic/architectural concern should be present in this file.
+"""
+import json
+import logging
+from contextlib import contextmanager
+from hashlib import sha1
+from itertools import groupby
+from operator import itemgetter
+from sys import exc_info
+from threading import Lock
+from time import time
+from uuid import uuid1 as generate_uuid
+
+from six import reraise
+
+import yaml
+
+from ..utils import (
+ check_valid_uuid,
+ convert_float_timestamp2str,
+ expand_joined_fields,
+ filter_dict_keys,
+ filter_out_dict_keys,
+ merge_dicts,
+ remove_none_items
+)
+from .errors import (
+ DbBaseException,
+ InvalidParameters,
+ MultipleRecordsFound,
+ NoRecordFound,
+ UndefinedUuidOrName,
+ UndefinedWanMappingType,
+ UnexpectedDatabaseError,
+ WimAccountOverwrite,
+ WimAndTenantAlreadyAttached
+)
+
+_WIM = 'wims AS wim '
+
+_WIM_JOIN = (
+ _WIM +
+ ' JOIN wim_nfvo_tenants AS association '
+ ' ON association.wim_id=wim.uuid '
+ ' JOIN nfvo_tenants AS nfvo_tenant '
+ ' ON association.nfvo_tenant_id=nfvo_tenant.uuid '
+ ' JOIN wim_accounts AS wim_account '
+ ' ON association.wim_account_id=wim_account.uuid '
+)
+
+_WIM_ACCOUNT_JOIN = (
+ 'wim_accounts AS wim_account '
+ ' JOIN wim_nfvo_tenants AS association '
+ ' ON association.wim_account_id=wim_account.uuid '
+ ' JOIN wims AS wim '
+ ' ON association.wim_id=wim.uuid '
+ ' JOIN nfvo_tenants AS nfvo_tenant '
+ ' ON association.nfvo_tenant_id=nfvo_tenant.uuid '
+)
+
+_DATACENTER_JOIN = (
+ 'datacenters AS datacenter '
+ ' JOIN tenants_datacenters AS association '
+ ' ON association.datacenter_id=datacenter.uuid '
+ ' JOIN datacenter_tenants as datacenter_account '
+ ' ON association.datacenter_tenant_id=datacenter_account.uuid '
+ ' JOIN nfvo_tenants AS nfvo_tenant '
+ ' ON association.nfvo_tenant_id=nfvo_tenant.uuid '
+)
+
+_PORT_MAPPING = 'wim_port_mappings as wim_port_mapping '
+
+_PORT_MAPPING_JOIN_WIM = (
+ ' JOIN wims as wim '
+ ' ON wim_port_mapping.wim_id=wim.uuid '
+)
+
+_PORT_MAPPING_JOIN_DATACENTER = (
+ ' JOIN datacenters as datacenter '
+ ' ON wim_port_mapping.datacenter_id=datacenter.uuid '
+)
+
+_WIM_SELECT = [
+ 'wim.{0} as {0}'.format(_field)
+ for _field in 'uuid name description wim_url type config '
+ 'created_at modified_at'.split()
+]
+
+_WIM_ACCOUNT_SELECT = 'uuid name user password config'.split()
+
+_PORT_MAPPING_SELECT = ('wim_port_mapping.*', )
+
+_CONFIDENTIAL_FIELDS = ('password', 'passwd')
+
+_SERIALIZED_FIELDS = ('config', 'vim_info', 'wim_info', 'conn_info', 'extra',
+ 'wan_service_mapping_info')
+
+UNIQUE_PORT_MAPPING_INFO_FIELDS = {
+ 'dpid-port': ('wan_switch_dpid', 'wan_switch_port')
+}
+"""Fields that should be unique for each port mapping that relies on
+wan_service_mapping_info.
+
+For example, for port mappings of type 'dpid-port', each combination of
+wan_switch_dpid and wan_switch_port should be unique (the same switch cannot
+be connected to two different places using the same port)
+"""
+
+
+class WimPersistence(object):
+ """High level interactions with the WIM tables in the database"""
+
+ def __init__(self, db, logger=None, lock=None):
+ self.db = db
+ self.logger = logger or logging.getLogger('openmano.wim.persistence')
+ self.lock = lock or Lock()
+
+ def query(self,
+ FROM=None,
+ SELECT=None,
+ WHERE=None,
+ ORDER_BY=None,
+ LIMIT=None,
+ OFFSET=None,
+ error_if_none=True,
+ error_if_multiple=False,
+ postprocess=None,
+ hide=_CONFIDENTIAL_FIELDS,
+ **kwargs):
+ """Retrieve records from the database.
+
+ Keyword Arguments:
+ SELECT, FROM, WHERE, LIMIT, ORDER_BY: used to compose the SQL
+ query. See ``nfvo_db.get_rows``.
+ OFFSET: only valid when used together with LIMIT.
+ Ignore the OFFSET first results of the query.
+ error_if_none: by default an error is raised if no record is
+ found. With this option it is possible to disable this error.
+ error_if_multiple: by default no error is raised if more than one
+ record is found.
+ With this option it is possible to enable this error.
+ postprocess: function applied to every retrieved record.
+ This function receives a dict as input and must return it
+ after modifications. Moreover this function should accept a
+ second optional parameter ``hide`` indicating
+ the confidential fields to be obfuscated.
+ By default a minimal postprocessing function is applied,
+ obfuscating confidential fields and converting timestamps.
+ hide: option proxied to postprocess
+
+ All the remaining keyword arguments will be assumed to be ``name``s or
+ ``uuid``s to compose the WHERE statement, according to their format.
+ If the value corresponds to an array, the first element will determine
+ if it is a name or a UUID.
+
+ For example:
+ - ``wim="abcdef"`` will be turned into ``wim.name="abcdef"``,
+ - ``datacenter="5286a274-8a1b-4b8d-a667-9c94261ad855"``
+ will be turned into
+ ``datacenter.uuid="5286a274-8a1b-4b8d-a667-9c94261ad855"``.
+ - ``wim=["5286a274-8a1b-4b8d-a667-9c94261ad855", ...]``
+ will be turned into
+ ``wim.uuid=["5286a274-8a1b-4b8d-a667-9c94261ad855", ...]``
+
+ Raises:
+ NoRecordFound: if the query result set is empty
+ DbBaseException: errors occurring during the execution of the query.
+ """
+ # Defaults:
+ postprocess = postprocess or _postprocess_record
+ WHERE = WHERE or {}
+
+ # Find remaining keywords by name or uuid
+ WHERE.update(_compose_where_from_uuids_or_names(**kwargs))
+ WHERE = WHERE or None
+ # ^ If the where statement is empty, it is better to leave it as None,
+ # so it can be filtered out at a later stage
+ LIMIT = ('{:d},{:d}'.format(OFFSET, LIMIT)
+ if LIMIT and OFFSET else LIMIT)
+
+ query = remove_none_items({
+ 'SELECT': SELECT, 'FROM': FROM, 'WHERE': WHERE,
+ 'LIMIT': LIMIT, 'ORDER_BY': ORDER_BY})
+
+ with self.lock:
+ records = self.db.get_rows(**query)
+
+ table = FROM.split()[0]
+ if error_if_none and not records:
+ raise NoRecordFound(WHERE, table)
+
+ if error_if_multiple and len(records) > 1:
+ self.logger.error('Multiple records '
+ 'FROM %s WHERE %s:\n\n%s\n\n',
+ FROM, WHERE, json.dumps(records, indent=4))
+ raise MultipleRecordsFound(WHERE, table)
+
+ return [
+ expand_joined_fields(postprocess(record, hide))
+ for record in records
+ ]
+
+ def query_one(self, *args, **kwargs):
+ """Similar to ``query``, but ensuring just one result.
+ ``error_if_multiple`` is enabled by default.
+ """
+ kwargs.setdefault('error_if_multiple', True)
+ records = self.query(*args, **kwargs)
+ return records[0] if records else None
+
+ def get_by_uuid(self, table, uuid, **kwargs):
+ """Retrieve one record from the database based on its uuid
+
+ Arguments:
+ table (str): table name (to be used in SQL's FROM statement).
+ uuid (str): unique identifier for record.
+
+ For additional keyword arguments and exceptions see :obj:`~.query`
+ (``error_if_multiple`` is enabled by default).
+ """
+ if uuid is None:
+ raise UndefinedUuidOrName(table)
+ return self.query_one(table, WHERE={'uuid': uuid}, **kwargs)
+
+ def get_by_name_or_uuid(self, table, uuid_or_name, **kwargs):
+ """Retrieve a record from the database based on a value that can be its
+ uuid or name.
+
+ Arguments:
+ table (str): table name (to be used in SQL's FROM statement).
+ uuid_or_name (str): this value can correspond to either uuid or
+ name
+ For additional keyword arguments and exceptions see :obj:`~.query`
+ (``error_if_multiple`` is enabled by default).
+ """
+ if uuid_or_name is None:
+ raise UndefinedUuidOrName(table)
+
+ key = 'uuid' if check_valid_uuid(uuid_or_name) else 'name'
+ return self.query_one(table, WHERE={key: uuid_or_name}, **kwargs)
+
+ def get_wims(self, uuid_or_name=None, tenant=None, **kwargs):
+ """Retrieve information about one or more WIMs stored in the database
+
+ Arguments:
+ uuid_or_name (str): uuid or name for WIM
+ tenant (str): [optional] uuid or name for NFVO tenant
+
+ See :obj:`~.query` for additional keyword arguments.
+ """
+ kwargs.update(wim=uuid_or_name, tenant=tenant)
+ from_ = _WIM_JOIN if tenant else _WIM
+ select_ = _WIM_SELECT[:] + (['wim_account.*'] if tenant else [])
+
+ kwargs.setdefault('SELECT', select_)
+ return self.query(from_, **kwargs)
+
+ def get_wim(self, wim, tenant=None, **kwargs):
+ """Similar to ``get_wims`` but ensures only one result is returned"""
+ kwargs.setdefault('error_if_multiple', True)
+ return self.get_wims(wim, tenant, **kwargs)[0]
+
+ def create_wim(self, wim_descriptor):
+ """Create a new wim record inside the database and return its uuid
+
+ Arguments:
+ wim_descriptor (dict): properties of the record
+ (usually each field corresponds to a database column, but extra
+ information can be offloaded to another table or serialized as
+ JSON/YAML)
+ Returns:
+ str: UUID of the created WIM
+ """
+ if "config" in wim_descriptor:
+ wim_descriptor["config"] = _serialize(wim_descriptor["config"])
+
+ with self.lock:
+ return self.db.new_row(
+ "wims", wim_descriptor, add_uuid=True, confidential_data=True)
+
+ def update_wim(self, uuid_or_name, wim_descriptor):
+ """Change an existing WIM record on the database"""
+ # obtain data, check that only one exists
+ wim = self.get_by_name_or_uuid('wims', uuid_or_name)
+
+ # edit data
+ wim_id = wim['uuid']
+ where = {'uuid': wim['uuid']}
+
+ # unserialize config, edit and serialize it again
+ if wim_descriptor.get('config'):
+ new_config_dict = wim_descriptor["config"]
+ config_dict = remove_none_items(merge_dicts(
+ wim.get('config') or {}, new_config_dict))
+ wim_descriptor['config'] = (
+ _serialize(config_dict) if config_dict else None)
+
+ with self.lock:
+ self.db.update_rows('wims', wim_descriptor, where)
+
+ return wim_id
+
+ def delete_wim(self, wim):
+ # get wim info
+ wim = self.get_by_name_or_uuid('wims', wim)
+
+ with self.lock:
+ self.db.delete_row_by_id('wims', wim['uuid'])
+
+ return wim['uuid'] + ' ' + wim['name']
+
+ def get_wim_accounts_by(self, wim=None, tenant=None, uuid=None, **kwargs):
+ """Retrieve WIM account information from the database together
+ with the related records (wim, nfvo_tenant and wim_nfvo_tenant)
+
+ Arguments:
+ wim (str): uuid or name for WIM
+ tenant (str): [optional] uuid or name for NFVO tenant
+
+ See :obj:`~.query` for additional keyword arguments.
+ """
+ kwargs.update(wim=wim, tenant=tenant)
+ kwargs.setdefault('postprocess', _postprocess_wim_account)
+ if uuid:
+ kwargs.setdefault('WHERE', {'wim_account.uuid': uuid})
+ return self.query(FROM=_WIM_ACCOUNT_JOIN, **kwargs)
+
+ def get_wim_account_by(self, wim=None, tenant=None, **kwargs):
+ """Similar to ``get_wim_accounts_by``, but ensuring just one result"""
+ kwargs.setdefault('error_if_multiple', True)
+ return self.get_wim_accounts_by(wim, tenant, **kwargs)[0]
+
+ def get_wim_accounts(self, **kwargs):
+ """Retrieve all the accounts from the database"""
+ kwargs.setdefault('postprocess', _postprocess_wim_account)
+ return self.query(FROM=_WIM_ACCOUNT_JOIN, **kwargs)
+
+ def get_wim_account(self, uuid_or_name, **kwargs):
+ """Retrieve WIM Account record by UUID or name,
+ See :obj:`get_by_name_or_uuid` for keyword arguments.
+ """
+ kwargs.setdefault('postprocess', _postprocess_wim_account)
+ kwargs.setdefault('SELECT', _WIM_ACCOUNT_SELECT)
+ return self.get_by_name_or_uuid('wim_accounts', uuid_or_name, **kwargs)
+
+ @contextmanager
+ def _associate(self, wim_id, nfvo_tenant_id):
+ """Auxiliary method for ``create_wim_account``
+
+ This context manager holds the lock while the caller inserts the row
+ in ``wim_nfvo_tenants`` and translates duplicate-association errors
+ """
+ try:
+ with self.lock:
+ yield
+ except DbBaseException as db_exception:
+ error_msg = str(db_exception)
+ if all([msg in error_msg
+ for msg in ("already in use", "'wim_nfvo_tenant'")]):
+ ex = WimAndTenantAlreadyAttached(wim_id, nfvo_tenant_id)
+ reraise(ex.__class__, ex, exc_info()[2])
+
+ raise
+
+ def create_wim_account(self, wim, tenant, properties):
+ """Associate a wim to a tenant using the ``wim_nfvo_tenants`` table
+ and create a ``wim_account`` to store credentials and configurations.
+
+ For the sake of simplification, we assume that each NFVO tenant can be
+ attached to a WIM using only one WIM account. This is automatically
+ guaranteed via database constraints.
+ For corner cases, the same WIM can be registered twice using another
+ name.
+
+ Arguments:
+ wim (str): name or uuid of the WIM related to the account being
+ created
+ tenant (str): name or uuid of the nfvo tenant for which the account
+ will be created
+ properties (dict): properties of the account
+ (eg. user, password, ...)
+ """
+ wim_id = self.get_by_name_or_uuid('wims', wim, SELECT=['uuid'])['uuid']
+ tenant = self.get_by_name_or_uuid('nfvo_tenants', tenant,
+ SELECT=['uuid', 'name'])
+ account = properties.setdefault('name', tenant['name'])
+
+ wim_account = self.query_one('wim_accounts',
+ WHERE={'wim_id': wim_id, 'name': account},
+ error_if_none=False)
+
+ transaction = []
+ used_uuids = []
+
+ if wim_account is None:
+ # If a row for the wim account doesn't exist yet, we need to
+ # create one, otherwise we can just re-use it.
+ account_id = str(generate_uuid())
+ used_uuids.append(account_id)
+ row = merge_dicts(properties, wim_id=wim_id, uuid=account_id)
+ transaction.append({'wim_accounts': _preprocess_wim_account(row)})
+ else:
+ account_id = wim_account['uuid']
+ properties.pop('config', None) # Config is too complex to compare
+ diff = {k: v for k, v in properties.items() if v != wim_account[k]}
+ if diff:
+ tip = 'Edit the account first, and then attach it to a tenant'
+ raise WimAccountOverwrite(wim_account, diff, tip)
+
+ transaction.append({
+ 'wim_nfvo_tenants': {'nfvo_tenant_id': tenant['uuid'],
+ 'wim_id': wim_id,
+ 'wim_account_id': account_id}})
+
+ with self._associate(wim_id, tenant['uuid']):
+ self.db.new_rows(transaction, used_uuids, confidential_data=True)
+
+ return account_id
+
+ def update_wim_account(self, uuid, properties, hide=_CONFIDENTIAL_FIELDS):
+ """Update WIM account record by overwriting fields with new values
+
+ Specifically, for the ``config`` field this means that the new dict will
+ be merged into the existing one.
+
+ Arguments:
+ uuid (str): UUID for the WIM account
+ properties (dict): fields that should be overwritten
+
+ Returns:
+ Updated wim_account
+ """
+ wim_account = self.get_by_uuid('wim_accounts', uuid)
+ safe_fields = 'user password name created'.split()
+ updates = _preprocess_wim_account(
+ merge_dicts(wim_account, filter_dict_keys(properties, safe_fields))
+ )
+
+ if properties.get('config'):
+ old_config = wim_account.get('config') or {}
+ new_config = merge_dicts(old_config, properties['config'])
+ updates['config'] = _serialize(new_config)
+
+ with self.lock:
+ num_changes = self.db.update_rows(
+ 'wim_accounts', UPDATE=updates,
+ WHERE={'uuid': wim_account['uuid']})
+
+ if num_changes is None:
+ raise UnexpectedDatabaseError('Impossible to update wim_account '
+ '{name}:{uuid}'.format(**wim_account))
+
+ return self.get_wim_account(wim_account['uuid'], hide=hide)
+
+ def delete_wim_account(self, uuid):
+ """Remove WIM account record from the database"""
+ # Since the foreign keys are configured with ON DELETE CASCADE, we can
+ # rely on the database engine to guarantee consistency, deleting the
+ # dependent records
+ with self.lock:
+ return self.db.delete_row_by_id('wim_accounts', uuid)
+
+ def get_datacenters_by(self, datacenter=None, tenant=None, **kwargs):
+ """Retrieve datacenter information from the database together
+ with the related records (nfvo_tenant)
+
+ Arguments:
+ datacenter (str): uuid or name for datacenter
+ tenant (str): [optional] uuid or name for NFVO tenant
+
+ See :obj:`~.query` for additional keyword arguments.
+ """
+ kwargs.update(datacenter=datacenter, tenant=tenant)
+ return self.query(_DATACENTER_JOIN, **kwargs)
+
+ def get_datacenter_by(self, datacenter=None, tenant=None, **kwargs):
+ """Similar to ``get_datacenters_by``, but ensuring just one result"""
+ kwargs.setdefault('error_if_multiple', True)
+ return self.get_datacenters_by(datacenter, tenant, **kwargs)[0]
+
+ def _create_single_port_mapping(self, properties):
+ info = properties.setdefault('wan_service_mapping_info', {})
+ endpoint_id = properties.get('wan_service_endpoint_id')
+
+ if info.get('mapping_type') and not endpoint_id:
+ properties['wan_service_endpoint_id'] = (
+ self._generate_port_mapping_id(info))
+
+ properties['wan_service_mapping_info'] = _serialize(info)
+
+ try:
+ with self.lock:
+ self.db.new_row('wim_port_mappings', properties,
+ add_uuid=False, confidential_data=True)
+ except DbBaseException as old_exception:
+ self.logger.exception(old_exception)
+ ex = InvalidParameters(
+ "The mapping must contain the "
+ "'pop_switch_dpid', 'pop_switch_port', and "
+ "wan_service_mapping_info: "
+ "('wan_switch_dpid' and 'wan_switch_port') or "
+ "'wan_service_endpoint_id'")
+ reraise(ex.__class__, ex, exc_info()[2])
+
+ return properties
+
+ def create_wim_port_mappings(self, wim, port_mappings, tenant=None):
+ if not isinstance(wim, dict):
+ wim = self.get_by_name_or_uuid('wims', wim)
+
+ for port_mapping in port_mappings:
+ port_mapping['wim_name'] = wim['name']
+ datacenter = self.get_datacenter_by(
+ port_mapping['datacenter_name'], tenant)
+ for pop_wan_port_mapping in port_mapping['pop_wan_mappings']:
+ element = merge_dicts(pop_wan_port_mapping, {
+ 'wim_id': wim['uuid'],
+ 'datacenter_id': datacenter['uuid']})
+ self._create_single_port_mapping(element)
+
+ return port_mappings
+
+ def _filter_port_mappings_by_tenant(self, mappings, tenant):
+ """Make sure all the datacenters and wims listed in the port mapping
+ belong to a specific tenant
+ """
+
+ # NOTE: Theoretically this could be done at SQL level, but given the
+ # number of tables involved (wim_port_mappings, wim_accounts,
+ # wims, wim_nfvo_tenants, datacenters, datacenter_tenants,
+ # tenants_datacenters and nfvo_tenants), it would result in an
+ # extremely complex query. Moreover, the predicate can vary:
+ # for `get_wim_port_mappings` we can have any combination of
+ # (wim, datacenter, tenant), not all of them having the 3 values
+ # so we have combinatorial trouble to write the 'FROM' statement.
+
+ kwargs = {'tenant': tenant, 'error_if_none': False}
+ # Cache results to speed things up
+ datacenters = {}
+ wims = {}
+
+ def _get_datacenter(uuid):
+ return (
+ datacenters.get(uuid) or
+ datacenters.setdefault(
+ uuid, self.get_datacenters_by(uuid, **kwargs)))
+
+ def _get_wims(uuid):
+ return (wims.get(uuid) or
+ wims.setdefault(uuid, self.get_wims(uuid, **kwargs)))
+
+ return [
+ mapping
+ for mapping in mappings
+ if (_get_datacenter(mapping['datacenter_id']) and
+ _get_wims(mapping['wim_id']))
+ ]
+
+ def get_wim_port_mappings(self, wim=None, datacenter=None, tenant=None,
+ **kwargs):
+ """List all the port mappings, optionally filtering by wim, datacenter
+ AND/OR tenant
+ """
+ from_ = [_PORT_MAPPING,
+ _PORT_MAPPING_JOIN_WIM if wim else '',
+ _PORT_MAPPING_JOIN_DATACENTER if datacenter else '']
+
+ criteria = ('wim_id', 'datacenter_id')
+ kwargs.setdefault('error_if_none', False)
+ mappings = self.query(
+ ' '.join(from_),
+ SELECT=_PORT_MAPPING_SELECT,
+ ORDER_BY=['wim_port_mapping.{}'.format(c) for c in criteria],
+ wim=wim, datacenter=datacenter,
+ postprocess=_postprocess_wim_port_mapping,
+ **kwargs)
+
+ if tenant:
+ mappings = self._filter_port_mappings_by_tenant(mappings, tenant)
+
+ # We don't have to sort, since we have used 'ORDER_BY'
+ grouped_mappings = groupby(mappings, key=itemgetter(*criteria))
+
+ return [
+ {'wim_id': key[0],
+ 'datacenter_id': key[1],
+ 'wan_pop_port_mappings': [
+ filter_out_dict_keys(mapping, (
+ 'id', 'wim_id', 'datacenter_id',
+ 'created_at', 'modified_at'))
+ for mapping in group]}
+ for key, group in grouped_mappings
+ ]
+
+ def delete_wim_port_mappings(self, wim_id):
+ with self.lock:
+ self.db.delete_row(FROM='wim_port_mappings',
+ WHERE={"wim_id": wim_id})
+ return "port mapping for wim {} deleted.".format(wim_id)
+
+ def update_wim_port_mapping(self, id, properties):
+ original = self.query_one('wim_port_mappings', WHERE={'id': id})
+
+ mapping_info = remove_none_items(merge_dicts(
+ original.get('wan_service_mapping_info') or {},
+ properties.get('wan_service_mapping_info') or {}))
+
+ updates = preprocess_record(
+ merge_dicts(original, remove_none_items(properties),
+ wan_service_mapping_info=mapping_info))
+
+ with self.lock:
+ num_changes = self.db.update_rows(
+ 'wim_port_mappings', UPDATE=updates, WHERE={'id': id})
+
+ if num_changes is None:
+ raise UnexpectedDatabaseError(
+ 'Impossible to update wim_port_mappings {}:\n{}\n'.format(
+ id, _serialize(properties)))
+
+ return num_changes
+
+ def get_actions_in_groups(self, wim_account_id,
+ item_types=('instance_wim_nets',),
+ group_offset=0, group_limit=150):
+ """Retrieve actions from the database in groups.
+ Each group contains all the actions that have the same ``item`` type
+ and ``item_id``.
+
+ Arguments:
+ wim_account_id: restrict the search to actions to be performed
+ using the same account
+ item_types (list): [optional] filter the actions to the given
+ item types
+ group_limit (int): maximum number of groups returned by the
+ function
+ group_offset (int): skip the first N groups. Used together with
+ group_limit for pagination purposes.
+
+ Returns:
+ List of groups, where each group is a tuple ``(key, actions)``.
+ In turn, ``key`` is a tuple containing the values of
+ ``(item, item_id)`` used to create the group and ``actions`` is a
+ list of ``vim_wim_actions`` records (dicts).
+ """
+
+ type_options = set(
+ '"{}"'.format(self.db.escape_string(t)) for t in item_types)
+
+ items = ('SELECT DISTINCT a.item, a.item_id, a.wim_account_id '
+ 'FROM vim_wim_actions AS a '
+ 'WHERE a.wim_account_id="{}" AND a.item IN ({}) '
+ 'ORDER BY a.item, a.item_id '
+ 'LIMIT {:d},{:d}').format(
+ self.safe_str(wim_account_id),
+ ','.join(type_options),
+ group_offset, group_limit
+ )
+
+ join = 'vim_wim_actions NATURAL JOIN ({}) AS items'.format(items)
+ with self.lock:
+ db_results = self.db.get_rows(
+ FROM=join, ORDER_BY=('item', 'item_id', 'created_at'))
+
+ results = (_postprocess_action(r) for r in db_results)
+ criteria = itemgetter('item', 'item_id')
+ return [(k, list(g)) for k, g in groupby(results, key=criteria)]
+
+ def update_action(self, instance_action_id, task_index, properties):
+ condition = {'instance_action_id': instance_action_id,
+ 'task_index': task_index}
+ action = self.query_one('vim_wim_actions', WHERE=condition)
+
+ extra = remove_none_items(merge_dicts(
+ action.get('extra') or {},
+ properties.get('extra') or {}))
+
+ updates = preprocess_record(
+ merge_dicts(action, properties, extra=extra))
+
+ with self.lock:
+ num_changes = self.db.update_rows('vim_wim_actions',
+ UPDATE=updates, WHERE=condition)
+
+ if num_changes is None:
+ raise UnexpectedDatabaseError(
+ 'Impossible to update vim_wim_actions '
+ '{instance_action_id}[{task_index}]'.format(**action))
+
+ return num_changes
+
+ def get_wan_links(self, uuid=None, **kwargs):
+ """Retrieve WAN link records from the database
+
+ Keyword Arguments:
+ uuid, instance_scenario_id, sce_net_id, wim_id, wim_account_id:
+ attributes that can be used at the WHERE clause
+ """
+ kwargs.setdefault('uuid', uuid)
+ kwargs.setdefault('error_if_none', False)
+
+ criteria_fields = ('uuid', 'instance_scenario_id', 'sce_net_id',
+ 'wim_id', 'wim_account_id')
+ criteria = remove_none_items(filter_dict_keys(kwargs, criteria_fields))
+ kwargs = filter_out_dict_keys(kwargs, criteria_fields)
+
+ return self.query('instance_wim_nets', WHERE=criteria, **kwargs)
+
+ def update_wan_link(self, uuid, properties):
+ wan_link = self.get_by_uuid('instance_wim_nets', uuid)
+
+ wim_info = remove_none_items(merge_dicts(
+ wan_link.get('wim_info') or {},
+ properties.get('wim_info') or {}))
+
+ updates = preprocess_record(
+ merge_dicts(wan_link, properties, wim_info=wim_info))
+
+ self.logger.debug({'UPDATE': updates})
+ with self.lock:
+ num_changes = self.db.update_rows(
+ 'instance_wim_nets', UPDATE=updates,
+ WHERE={'uuid': wan_link['uuid']})
+
+ if num_changes is None:
+ raise UnexpectedDatabaseError(
+ 'Impossible to update instance_wim_nets ' + wan_link['uuid'])
+
+ return num_changes
+
+ def get_instance_nets(self, instance_scenario_id, sce_net_id, **kwargs):
+ """Retrieve all the instance nets related to the same instance_scenario
+ and scenario network
+ """
+ return self.query(
+ 'instance_nets',
+ WHERE={'instance_scenario_id': instance_scenario_id,
+ 'sce_net_id': sce_net_id},
+ ORDER_BY=kwargs.pop(
+ 'ORDER_BY', ('instance_scenario_id', 'sce_net_id')),
+ **kwargs)
+
+ def update_instance_action_counters(self, uuid, failed=None, done=None):
+ """Atomically increment/decrement number_done and number_failed fields
+ in the instance action table
+ """
+ changes = remove_none_items({
+ 'number_failed': failed and {'INCREMENT': failed},
+ 'number_done': done and {'INCREMENT': done}
+ })
+
+ if not changes:
+ return 0
+
+ with self.lock:
+ return self.db.update_rows('instance_actions',
+ WHERE={'uuid': uuid}, UPDATE=changes)
+
+ def get_only_vm_with_external_net(self, instance_net_id, **kwargs):
+ """Return an instance VM if that is the only VM connected to an
+ external network identified by instance_net_id
+ """
+ counting = ('SELECT DISTINCT instance_net_id '
+ 'FROM instance_interfaces '
+ 'WHERE instance_net_id="{}" AND type="external" '
+ 'GROUP BY instance_net_id '
+ 'HAVING COUNT(*)=1').format(self.safe_str(instance_net_id))
+
+ vm_item = ('SELECT DISTINCT instance_vm_id '
+ 'FROM instance_interfaces NATURAL JOIN ({}) AS a'
+ .format(counting))
+
+ return self.query_one(
+ 'instance_vms JOIN ({}) as instance_interface '
+ 'ON instance_vms.uuid=instance_interface.instance_vm_id'
+ .format(vm_item), **kwargs)
+
+ def safe_str(self, string):
+ """Return a SQL safe string"""
+ return self.db.escape_string(string)
+
+ def _generate_port_mapping_id(self, mapping_info):
+ """Given a port mapping represented by a dict with a 'mapping_type'
+ field, generate a unique string, in an injective way.
+ """
+ mapping_info = mapping_info.copy() # Avoid mutating original object
+ mapping_type = mapping_info.pop('mapping_type', None)
+ if not mapping_type:
+ raise UndefinedWanMappingType(mapping_info)
+
+ unique_fields = UNIQUE_PORT_MAPPING_INFO_FIELDS.get(mapping_type)
+
+ if unique_fields:
+ mapping_info = filter_dict_keys(mapping_info, unique_fields)
+ else:
+ self.logger.warning('Unique fields for WIM port mapping of type '
+ '%s not defined. Please add a list of fields '
+ 'whose combination should be unique in '
+ 'UNIQUE_PORT_MAPPING_INFO_FIELDS '
+ '(`wim/persistence.py`)', mapping_type)
+
+ repeatable_repr = json.dumps(mapping_info, encoding='utf-8',
+ sort_keys=True, indent=False)
+
+ return ':'.join([mapping_type, _str2id(repeatable_repr)])
+
+
+def _serialize(value):
+ """Serialize an arbitrary value in a consistent way,
+ so it can be stored in a database inside a text field
+ """
+ return yaml.safe_dump(value, default_flow_style=True, width=256)
+
+
+def _unserialize(text):
+ """Unserialize text representation into an arbitrary value,
+ so it can be loaded from the database
+ """
+ return yaml.safe_load(text)
+
+
+def preprocess_record(record):
+ """Small transformations to be applied to the data that comes from the
+ user before writing it to the database. By default, filter out timestamps,
+ and serialize the ``config`` field.
+ """
+ automatic_fields = ['created_at', 'modified_at']
+ record = serialize_fields(filter_out_dict_keys(record, automatic_fields))
+
+ return record
+
+
+def _preprocess_wim_account(wim_account):
+ """Do the default preprocessing and convert the 'created' field from
+ boolean to string
+ """
+ wim_account = preprocess_record(wim_account)
+
+ created = wim_account.get('created')
+ wim_account['created'] = (
+ 'true' if created is True or created == 'true' else 'false')
+
+ return wim_account
+
+
+def _postprocess_record(record, hide=_CONFIDENTIAL_FIELDS):
+ """By default, hide password fields, unserialize ``config`` fields, and
+ convert float timestamps to strings
+ """
+ record = hide_confidential_fields(record, hide)
+ record = unserialize_fields(record, hide)
+
+ convert_float_timestamp2str(record)
+
+ return record
+
+
+def _postprocess_action(action):
+ if action.get('extra'):
+ action['extra'] = _unserialize(action['extra'])
+
+ return action
+
+
+def _postprocess_wim_account(wim_account, hide=_CONFIDENTIAL_FIELDS):
+ """Do the default postprocessing and convert the 'created' field from
+ string to boolean
+ """
+ # Fix fields from join
+ for field in ('type', 'description', 'wim_url'):
+ if field in wim_account:
+ wim_account['wim.'+field] = wim_account.pop(field)
+
+ for field in ('id', 'nfvo_tenant_id', 'wim_account_id'):
+ if field in wim_account:
+ wim_account['association.'+field] = wim_account.pop(field)
+
+ wim_account = _postprocess_record(wim_account, hide)
+
+ created = wim_account.get('created')
+ wim_account['created'] = (created is True or created == 'true')
+
+ return wim_account
+
+
+def _postprocess_wim_port_mapping(mapping, hide=_CONFIDENTIAL_FIELDS):
+ mapping = _postprocess_record(mapping, hide=hide)
+ mapping_info = mapping.get('wan_service_mapping_info', None) or {}
+ mapping['wan_service_mapping_info'] = mapping_info
+ return mapping
+
+
+def hide_confidential_fields(record, fields=_CONFIDENTIAL_FIELDS):
+ """Obfuscate confidential fields from the input dict.
+
+ Note:
+ This function performs a SHALLOW operation.
+ """
+ if not(isinstance(record, dict) and fields):
+ return record
+
+ keys = record.iterkeys()
+ keys = (k for k in keys for f in fields if k == f or k.endswith('.'+f))
+
+ return merge_dicts(record, {k: '********' for k in keys if record[k]})
+
+
+def unserialize_fields(record, hide=_CONFIDENTIAL_FIELDS,
+ fields=_SERIALIZED_FIELDS):
+ """Unserialize fields that were stored in the database as a serialized
+ YAML (or JSON)
+ """
+ keys = record.iterkeys()
+ keys = (k for k in keys for f in fields if k == f or k.endswith('.'+f))
+
+ return merge_dicts(record, {
+ key: hide_confidential_fields(_unserialize(record[key]), hide)
+ for key in keys if record[key]
+ })
+
+
+def serialize_fields(record, fields=_SERIALIZED_FIELDS):
+ """Serialize fields to be stored in the database as YAML"""
+ keys = record.iterkeys()
+ keys = (k for k in keys for f in fields if k == f or k.endswith('.'+f))
+
+ return merge_dicts(record, {
+ key: _serialize(record[key])
+ for key in keys if record[key] is not None
+ })
+
+
+def _decide_name_or_uuid(value):
+ reference = value
+
+ if isinstance(value, (list, tuple)):
+ reference = value[0] if value else ''
+
+ return 'uuid' if check_valid_uuid(reference) else 'name'
+
+
+def _compose_where_from_uuids_or_names(**conditions):
+ """Create a dict containing the right conditions to be used in a database
+ query.
+
+ This function chooses between ``name`` and ``uuid`` fields based on the
+ format of the passed string.
+ If a list is passed, the first element of the list will be used to choose
+ the name of the field.
+ Conditions with an empty value (e.g. ``None``) are left out.
+
+ Note that this function automatically translates ``tenant`` to
+ ``nfvo_tenant`` for the sake of brevity.
+
+ Example:
+ >>> _compose_where_from_uuids_or_names(
+ wim='abcdef',
+ tenant=['xyz123', 'def456'],
+ datacenter='5286a274-8a1b-4b8d-a667-9c94261ad855')
+ {'wim.name': 'abcdef',
+ 'nfvo_tenant.name': ['xyz123', 'def456'],
+ 'datacenter.uuid': '5286a274-8a1b-4b8d-a667-9c94261ad855'}
+ """
+ if 'tenant' in conditions:
+ conditions['nfvo_tenant'] = conditions.pop('tenant')
+
+ return {
+ '{}.{}'.format(kind, _decide_name_or_uuid(value)): value
+ for kind, value in conditions.items() if value
+ }
+
+
+def _str2id(text):
+ """Create an ID (a SHA-1 hex digest) from a piece of arbitrary
+ text.
+
+ Different texts should generate different IDs, and the same text should
+ generate the same ID in a repeatable way.
+ """
+ return sha1(text).hexdigest()
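
Note on `_generate_port_mapping_id` and `_str2id` above: the ID is repeatable because the mapping info is reduced to its identifying fields, serialized with sorted keys, and then hashed. The following is a minimal standalone sketch of that idea, written for Python 3 rather than the Python 2 used in the patch. `UNIQUE_FIELDS` and `port_mapping_id` are hypothetical names used only for illustration; the real field table is `UNIQUE_PORT_MAPPING_INFO_FIELDS`, whose contents are not shown in this hunk.

```python
import json
from hashlib import sha1

# Hypothetical stand-in for UNIQUE_PORT_MAPPING_INFO_FIELDS; the real
# table may list different fields for each mapping type.
UNIQUE_FIELDS = {'dpid-port': ('wan_switch_dpid', 'wan_switch_port')}


def port_mapping_id(mapping_info):
    """Derive a repeatable ID from the fields that identify a mapping."""
    info = dict(mapping_info)  # avoid mutating the caller's dict
    mapping_type = info.pop('mapping_type', None)
    if not mapping_type:
        raise ValueError('mapping_type is required')

    fields = UNIQUE_FIELDS.get(mapping_type)
    if fields:
        # Keep only the fields whose combination must be unique
        info = {k: v for k, v in info.items() if k in fields}

    # sort_keys makes the serialization independent of dict ordering
    canonical = json.dumps(info, sort_keys=True)
    digest = sha1(canonical.encode('utf-8')).hexdigest()
    return ':'.join([mapping_type, digest])
```

Under these assumptions, two dicts that differ only in key order or in non-identifying fields yield the same ID, while a different switch port yields a different one.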
diff --git a/osm_ro/wim/schemas.py b/osm_ro/wim/schemas.py
new file mode 100644
index 0000000..a040405
--- /dev/null
+++ b/osm_ro/wim/schemas.py
@@ -0,0 +1,177 @@
+# -*- coding: utf-8 -*-
+##
+# Copyright 2018 University of Bristol - High Performance Networks Research
+# Group
+# All Rights Reserved.
+#
+# Contributors: Anderson Bravalheri, Dimitrios Gkounis, Abubakar Siddique
+# Muqaddas, Navdeep Uniyal, Reza Nejabati and Dimitra Simeonidou
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+#
+# For those usages not covered by the Apache License, Version 2.0 please
+# contact with: <highperformance-networks@bristol.ac.uk>
+#
+# Neither the name of the University of Bristol nor the names of its
+# contributors may be used to endorse or promote products derived from
+# this software without specific prior written permission.
+#
+# This work has been performed in the context of DCMS UK 5G Testbeds
+# & Trials Programme and in the framework of the Metro-Haul project -
+# funded by the European Commission under Grant number 761727 through the
+# Horizon 2020 and 5G-PPP programmes.
+##
+
+from ..openmano_schemas import (
+ description_schema,
+ name_schema,
+ nameshort_schema
+)
+
+# WIM -------------------------------------------------------------------------
+wim_types = ["tapi", "onos", "odl"]
+
+wim_schema_properties = {
+ "name": name_schema,
+ "description": description_schema,
+ "type": {
+ "type": "string",
+ "enum": wim_types
+ },
+ "wim_url": description_schema,
+ "config": {"type": "object"}
+}
+
+wim_schema = {
+ "title": "wim information schema",
+ "$schema": "http://json-schema.org/draft-04/schema#",
+ "type": "object",
+ "properties": {
+ "wim": {
+ "type": "object",
+ "properties": wim_schema_properties,
+ "required": ["name", "type", "wim_url"],
+ "additionalProperties": True
+ }
+ },
+ "required": ["wim"],
+ "additionalProperties": False
+}
+
+wim_edit_schema = {
+ "title": "wim edit information schema",
+ "$schema": "http://json-schema.org/draft-04/schema#",
+ "type": "object",
+ "properties": {
+ "wim": {
+ "type": "object",
+ "properties": wim_schema_properties,
+ "additionalProperties": False
+ }
+ },
+ "required": ["wim"],
+ "additionalProperties": False
+}
+
+wim_account_schema = {
+ "title": "wim account information schema",
+ "$schema": "http://json-schema.org/draft-04/schema#",
+ "type": "object",
+ "properties": {
+ "wim_account": {
+ "type": "object",
+ "properties": {
+ "name": name_schema,
+ "user": nameshort_schema,
+ "password": nameshort_schema,
+ "config": {"type": "object"}
+ },
+ "additionalProperties": True
+ }
+ },
+ "required": ["wim_account"],
+ "additionalProperties": False
+}
+
+dpid_type = {
+ "type": "string",
+ "pattern":
+ "^[0-9a-zA-Z]+(:[0-9a-zA-Z]+)*$"
+}
+
+port_type = {
+ "oneOf": [
+ {"type": "string",
+ "minLength": 1,
+ "maxLength": 5},
+ {"type": "integer",
+ "minimum": 1,
+ "maximum": 65534}
+ ]
+}
+
+wim_port_mapping_schema = {
+ "$schema": "http://json-schema.org/draft-04/schema#",
+ "title": "wim mapping information schema",
+ "type": "object",
+ "properties": {
+ "wim_port_mapping": {
+ "type": "array",
+ "items": {
+ "type": "object",
+ "properties": {
+ "datacenter_name": nameshort_schema,
+ "pop_wan_mappings": {
+ "type": "array",
+ "items": {
+ "type": "object",
+ "properties": {
+ "pop_switch_dpid": dpid_type,
+ "pop_switch_port": port_type,
+ "wan_service_endpoint_id": name_schema,
+ "wan_service_mapping_info": {
+ "type": "object",
+ "properties": {
+ "mapping_type": name_schema,
+ "wan_switch_dpid": dpid_type,
+ "wan_switch_port": port_type
+ },
+ "additionalProperties": True,
+ "required": ["mapping_type"]
+ }
+ },
+ "oneOf": [
+ {
+ "required": [
+ "pop_switch_dpid",
+ "pop_switch_port",
+ "wan_service_endpoint_id"
+ ]
+ },
+ {
+ "required": [
+ "pop_switch_dpid",
+ "pop_switch_port",
+ "wan_service_mapping_info"
+ ]
+ }
+ ]
+ }
+ }
+ },
+ "required": ["datacenter_name", "pop_wan_mappings"]
+ }
+ }
+ },
+ "required": ["wim_port_mapping"]
+}
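
Note on the `oneOf` clause in `wim_port_mapping_schema` above: in JSON Schema draft-04, `oneOf` is satisfied only when exactly one of the alternatives matches. Since both alternatives here only list `required` keys, an entry that carries both `wan_service_endpoint_id` and `wan_service_mapping_info` matches both and is therefore rejected. The sketch below emulates just that `required` logic in plain Python; it is not a schema validator, and `ALTERNATIVES` and `matches_exactly_one` are hypothetical names introduced for illustration.

```python
COMMON = ('pop_switch_dpid', 'pop_switch_port')

# The two `required` lists from the `oneOf` clause of the schema
ALTERNATIVES = (
    COMMON + ('wan_service_endpoint_id',),
    COMMON + ('wan_service_mapping_info',),
)


def matches_exactly_one(mapping):
    """Emulate `oneOf`: True only if a single alternative is satisfied."""
    matched = [
        alternative for alternative in ALTERNATIVES
        if all(key in mapping for key in alternative)
    ]
    return len(matched) == 1


by_endpoint = {'pop_switch_dpid': 'AA:AA', 'pop_switch_port': 1,
               'wan_service_endpoint_id': 'endpoint-1'}
by_info = {'pop_switch_dpid': 'AA:AA', 'pop_switch_port': 1,
           'wan_service_mapping_info': {'mapping_type': 'dpid-port'}}
both = dict(by_endpoint, **by_info)
```

With these sample dicts, `by_endpoint` and `by_info` each satisfy one alternative, whereas `both` satisfies two and would not pass the `oneOf` check.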
diff --git a/osm_ro/wim/tests/__init__.py b/osm_ro/wim/tests/__init__.py
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/osm_ro/wim/tests/__init__.py
diff --git a/osm_ro/wim/tests/fixtures.py b/osm_ro/wim/tests/fixtures.py
new file mode 100644
index 0000000..1b52e49
--- /dev/null
+++ b/osm_ro/wim/tests/fixtures.py
@@ -0,0 +1,307 @@
+# -*- coding: utf-8 -*-
+##
+# Copyright 2018 University of Bristol - High Performance Networks Research
+# Group
+# All Rights Reserved.
+#
+# Contributors: Anderson Bravalheri, Dimitrios Gkounis, Abubakar Siddique
+# Muqaddas, Navdeep Uniyal, Reza Nejabati and Dimitra Simeonidou
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+#
+# For those usages not covered by the Apache License, Version 2.0 please
+# contact with: <highperformance-networks@bristol.ac.uk>
+#
+# Neither the name of the University of Bristol nor the names of its
+# contributors may be used to endorse or promote products derived from
+# this software without specific prior written permission.
+#
+# This work has been performed in the context of DCMS UK 5G Testbeds
+# & Trials Programme and in the framework of the Metro-Haul project -
+# funded by the European Commission under Grant number 761727 through the
+# Horizon 2020 and 5G-PPP programmes.
+##
+# pylint: disable=W0621
+
+from __future__ import unicode_literals
+
+import json
+from time import time
+
+from six.moves import range
+
+from ...tests.db_helpers import uuid, sha1
+
+NUM_WIMS = 3
+NUM_TENANTS = 2
+NUM_DATACENTERS = 2
+
+
+# In the following functions, the identifiers should be simple integers
+
+
+def wim(identifier=0):
+ return {'name': 'wim%d' % identifier,
+ 'uuid': uuid('wim%d' % identifier),
+ 'wim_url': 'localhost',
+ 'type': 'tapi'}
+
+
+def tenant(identifier=0):
+ return {'name': 'tenant%d' % identifier,
+ 'uuid': uuid('tenant%d' % identifier)}
+
+
+def wim_account(wim, tenant):
+ return {'name': 'wim-account%d%d' % (tenant, wim),
+ 'uuid': uuid('wim-account%d%d' % (tenant, wim)),
+ 'user': 'user%d%d' % (tenant, wim),
+ 'password': 'password%d%d' % (tenant, wim),
+ 'wim_id': uuid('wim%d' % wim),
+ 'created': 'true'}
+
+
+def wim_tenant_association(wim, tenant):
+ return {'nfvo_tenant_id': uuid('tenant%d' % tenant),
+ 'wim_id': uuid('wim%d' % wim),
+ 'wim_account_id': uuid('wim-account%d%d' % (tenant, wim))}
+
+
+def wim_set(identifier=0, tenant=0):
+ """Records necessary to create a WIM and connect it to a tenant"""
+ return [
+ {'wims': [wim(identifier)]},
+ {'wim_accounts': [wim_account(identifier, tenant)]},
+ {'wim_nfvo_tenants': [wim_tenant_association(identifier, tenant)]}
+ ]
+
+
+def datacenter(identifier):
+ return {'uuid': uuid('dc%d' % identifier),
+ 'name': 'dc%d' % identifier,
+ 'type': 'openvim',
+ 'vim_url': 'localhost'}
+
+
+def datacenter_account(datacenter, tenant):
+ return {'name': 'dc-account%d%d' % (tenant, datacenter),
+ 'uuid': uuid('dc-account%d%d' % (tenant, datacenter)),
+ 'datacenter_id': uuid('dc%d' % datacenter),
+ 'created': 'true'}
+
+
+def datacenter_tenant_association(datacenter, tenant):
+ return {'nfvo_tenant_id': uuid('tenant%d' % tenant),
+ 'datacenter_id': uuid('dc%d' % datacenter),
+ 'datacenter_tenant_id':
+ uuid('dc-account%d%d' % (tenant, datacenter))}
+
+
+def datacenter_set(identifier, tenant):
+ """Records necessary to create a datacenter and connect it to a tenant"""
+ return [
+ {'datacenters': [datacenter(identifier)]},
+ {'datacenter_tenants': [datacenter_account(identifier, tenant)]},
+ {'tenants_datacenters': [
+ datacenter_tenant_association(identifier, tenant)
+ ]}
+ ]
+
+
+def wim_port_mapping(wim, datacenter,
+ pop_dpid='AA:AA:AA:AA:AA:AA:AA:AA', pop_port=0,
+ wan_dpid='BB:BB:BB:BB:BB:BB:BB:BB', wan_port=0):
+ mapping_info = {'mapping_type': 'dpid-port',
+ 'wan_switch_dpid': wan_dpid,
+ 'wan_switch_port': wan_port + datacenter + 1}
+ id_ = 'dpid-port|' + sha1(json.dumps(mapping_info, sort_keys=True))
+
+ return {'wim_id': uuid('wim%d' % wim),
+ 'datacenter_id': uuid('dc%d' % datacenter),
+ 'pop_switch_dpid': pop_dpid,
+ 'pop_switch_port': pop_port + wim + 1,
+ # ^ Datacenter router has one port managed by each WIM
+ 'wan_service_endpoint_id': id_,
+ # ^ WIM managed router has one port connected to each DC
+ 'wan_service_mapping_info': json.dumps(mapping_info)}
+
+
+def processed_port_mapping(wim, datacenter,
+ num_pairs=1,
+ pop_dpid='AA:AA:AA:AA:AA:AA:AA:AA',
+ wan_dpid='BB:BB:BB:BB:BB:BB:BB:BB'):
+ """Emulate the response of the Persistence class, where the records in the
+ database are grouped by wim and datacenter
+ """
+ return {
+ 'wim_id': uuid('wim%d' % wim),
+ 'datacenter_id': uuid('dc%d' % datacenter),
+ 'wan_pop_port_mappings': [
+ {'pop_switch_dpid': pop_dpid,
+ 'pop_switch_port': wim + 1 + i,
+ 'wan_service_endpoint_id':
+ sha1('dpid-port|%s|%d' % (wan_dpid, datacenter + 1 + i)),
+ 'wan_service_mapping_info': {
+ 'mapping_type': 'dpid-port',
+ 'wan_switch_dpid': wan_dpid,
+ 'wan_switch_port': datacenter + 1 + i}}
+ for i in range(num_pairs)
+ ]
+ }
+
+
+def consistent_set(num_wims=NUM_WIMS, num_tenants=NUM_TENANTS,
+ num_datacenters=NUM_DATACENTERS):
+ return [
+ {'nfvo_tenants': [tenant(i) for i in range(num_tenants)]},
+ {'wims': [wim(j) for j in range(num_wims)]},
+ {'wim_accounts': [
+ wim_account(j, i)
+ for i in range(num_tenants)
+ for j in range(num_wims)
+ ]},
+ {'wim_nfvo_tenants': [
+ wim_tenant_association(j, i)
+ for i in range(num_tenants)
+ for j in range(num_wims)
+ ]},
+ {'datacenters': [
+ datacenter(k)
+ for k in range(num_datacenters)
+ ]},
+ {'datacenter_tenants': [
+ datacenter_account(k, i)
+ for i in range(num_tenants)
+ for k in range(num_datacenters)
+ ]},
+ {'tenants_datacenters': [
+ datacenter_tenant_association(k, i)
+ for i in range(num_tenants)
+ for k in range(num_datacenters)
+ ]},
+ {'wim_port_mappings': [
+ wim_port_mapping(j, k)
+ for j in range(num_wims)
+ for k in range(num_datacenters)
+ ]},
+ ]
+
+
+def instance_nets(num_datacenters=2, num_links=2):
+ """Example of multi-site deploy with N datacenters and M WAN links between
+ them (e.g. M = 2 -> back and forth)
+ """
+ return [
+ {'uuid': uuid('net%d%d' % (k, l)),
+ 'datacenter_id': uuid('dc%d' % k),
+ 'datacenter_tenant_id': uuid('dc-account0%d' % k),
+ 'instance_scenario_id': uuid('nsr0'),
+ # ^ instance_scenario_id == NS Record id
+ 'sce_net_id': uuid('vld%d' % l),
+ # ^ scenario net id == VLD id
+ 'status': 'BUILD',
+ 'vim_net_id': None,
+ 'created': True}
+ for k in range(num_datacenters)
+ for l in range(num_links)
+ ]
+
+
+def wim_actions(action='CREATE', status='SCHEDULED',
+ action_id=None, instance=0,
+ wim=0, tenant=0, num_links=1):
+ """Create a list of actions for the WIM.
+
+ Arguments:
+ action: type of action (CREATE by default)
+ wim: WIM fixture index to create actions for
+ tenant: tenant fixture index to create actions for
+ num_links: number of WAN links to be established by each WIM
+ """
+
+ action_id = action_id or 'ACTION-{}'.format(time())
+
+ return [
+ {
+ 'action': action,
+ 'wim_internal_id': uuid('-wim-net%d%d%d' % (wim, instance, link)),
+ 'wim_account_id': uuid('wim-account%d%d' % (tenant, wim)),
+ 'instance_action_id': action_id,
+ 'item': 'instance_wim_nets',
+ 'item_id': uuid('wim-net%d%d%d' % (wim, instance, link)),
+ 'status': status,
+ 'task_index': link,
+ 'created_at': time(),
+ 'modified_at': time(),
+ 'extra': None
+ }
+ for link in range(num_links)
+ ]
+
+
+def instance_action(tenant=0, instance=0, action_id=None,
+ num_tasks=1, num_done=0, num_failed=0):
+ action_id = action_id or 'ACTION-{}'.format(time())
+
+ return {
+ 'uuid': action_id,
+ 'tenant_id': uuid('tenant%d' % tenant),
+ 'instance_id': uuid('nsr%d' % instance),
+ 'number_tasks': num_tasks,
+ 'number_done': num_done,
+ 'number_failed': num_failed,
+ }
+
+
+def instance_wim_nets(instance=0, wim=0, num_links=1,
+ status='SCHEDULED_CREATION'):
+ """Example of the WIM nets (WAN links) of a multi-site deploy: M links
+ established by a single WIM for a given instance (e.g. M = 2 -> back and
+ forth)
+ """
+ return [
+ {'uuid': uuid('wim-net%d%d%d' % (wim, instance, l)),
+ 'wim_id': uuid('wim%d' % wim),
+ 'wim_account_id': uuid('wim-account%d' % wim),
+ 'wim_internal_id': uuid('-net%d%d' % (wim, l)),
+ 'instance_scenario_id': uuid('nsr%d' % instance),
+ # ^ instance_scenario_id == NS Record id
+ 'sce_net_id': uuid('vld%d' % l),
+ # ^ scenario net id == VLD id
+ 'status': status,
+ 'created': False}
+ for l in range(num_links)
+ ]
+
+
+def instance_vm(instance=0, vim_info=None):
+ default = {'OS-EXT-SRV-ATTR:hypervisor_hostname': 'host%d' % instance}
+ return {
+ 'uuid': uuid('vm%d' % instance),
+ 'instance_vnf_id': uuid('vnf%d' % instance),
+ 'vm_id': uuid('vm%d' % instance),
+ 'vim_vm_id': uuid('vm%d' % instance),
+ 'status': 'ACTIVE',
+ 'vim_info': vim_info or default,
+ }
+
+
+def instance_interface(instance=0, interface=0, datacenter=0, link=0):
+ return {
+ 'uuid': uuid('interface%d%d' % (instance, interface)),
+ 'instance_vm_id': uuid('vm%d' % instance),
+ 'instance_net_id': uuid('net%d%d' % (datacenter, link)),
+ 'interface_id': uuid('iface%d' % interface),
+ 'type': 'external',
+ 'vlan': 3
+ }
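Note on the port mapping fixtures in this file: each `wan_service_endpoint_id` is built by hashing a `dpid-port|<dpid>|<port>` string, so the same switch/port pair always maps to the same identifier. The `sha1` helper is not part of this excerpt, so the following is only a minimal sketch of how such a deterministic id could be produced with the standard `hashlib` module. The name `endpoint_id` is hypothetical.

```python
import hashlib


def endpoint_id(wan_dpid, wan_port):
    """Hypothetical helper: derive a stable id for a WAN service endpoint."""
    text = 'dpid-port|%s|%d' % (wan_dpid, wan_port)
    # Hashing the same switch/port pair always yields the same hex digest
    return hashlib.sha1(text.encode('utf-8')).hexdigest()


first = endpoint_id('BB:BB:BB:BB:BB:BB:BB:BB', 1)
second = endpoint_id('BB:BB:BB:BB:BB:BB:BB:BB', 2)
```

Deterministic ids of this kind let a test recompute the expected value instead of hard-coding it.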
diff --git a/osm_ro/wim/tests/test_actions.py b/osm_ro/wim/tests/test_actions.py
new file mode 100644
index 0000000..920182b
--- /dev/null
+++ b/osm_ro/wim/tests/test_actions.py
@@ -0,0 +1,366 @@
+# -*- coding: utf-8 -*-
+##
+# Copyright 2018 University of Bristol - High Performance Networks Research
+# Group
+# All Rights Reserved.
+#
+# Contributors: Anderson Bravalheri, Dimitrios Gkounis, Abubakar Siddique
+# Muqaddas, Navdeep Uniyal, Reza Nejabati and Dimitra Simeonidou
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+#
+# For those usages not covered by the Apache License, Version 2.0 please
+# contact with: <highperformance-networks@bristol.ac.uk>
+#
+# Neither the name of the University of Bristol nor the names of its
+# contributors may be used to endorse or promote products derived from
+# this software without specific prior written permission.
+#
+# This work has been performed in the context of DCMS UK 5G Testbeds
+# & Trials Programme and in the framework of the Metro-Haul project -
+# funded by the European Commission under Grant number 761727 through the
+# Horizon 2020 and 5G-PPP programmes.
+##
+# pylint: disable=E1101
+
+from __future__ import unicode_literals, print_function
+
+import json
+import unittest
+from time import time
+
+from mock import MagicMock, patch
+
+from . import fixtures as eg
+from ...tests.db_helpers import (
+ TestCaseWithDatabasePerTest,
+ disable_foreign_keys,
+ uuid,
+)
+from ..persistence import WimPersistence
+from ..wan_link_actions import WanLinkCreate, WanLinkDelete
+from ..wimconn import WimConnectorError
+
+
+class TestActionsWithDb(TestCaseWithDatabasePerTest):
+ def setUp(self):
+ super(TestActionsWithDb, self).setUp()
+ self.persist = WimPersistence(self.db)
+ self.connector = MagicMock()
+ self.ovim = MagicMock()
+
+
+class TestCreate(TestActionsWithDb):
+ @disable_foreign_keys
+ def test_process__instance_nets_on_build(self):
+ # Given we want 1 WAN link between 2 datacenters
+ # and the local network in each datacenter is still being built
+ wan_link = eg.instance_wim_nets()
+ instance_nets = eg.instance_nets(num_datacenters=2, num_links=1)
+ for net in instance_nets:
+ net['status'] = 'BUILD'
+ self.populate([{'instance_nets': instance_nets,
+ 'instance_wim_nets': wan_link}])
+
+ # When we try to process a CREATE action that refers to the same
+ # instance_scenario_id and sce_net_id
+ now = time()
+ action = WanLinkCreate(eg.wim_actions('CREATE')[0])
+ action.instance_scenario_id = instance_nets[0]['instance_scenario_id']
+ action.sce_net_id = instance_nets[0]['sce_net_id']
+ # -- ensure it is in the database for updates --> #
+ action_record = action.as_record()
+ action_record['extra'] = json.dumps(action_record['extra'])
+ self.populate([{'vim_wim_actions': action_record}])
+ # <-- #
+ action.process(self.connector, self.persist, self.ovim)
+
+ # Then the action should be deferred
+ assert action.is_scheduled
+ self.assertEqual(action.extra['attempts'], 1)
+ self.assertGreater(action.extra['last_attempted_at'], now)
+
+ @disable_foreign_keys
+ def test_process__instance_nets_on_error(self):
+ # Given we want 1 WAN link between 2 datacenters
+ # and at least one local network is in a bad state (error, or
+ # being deleted)
+ instance_nets = eg.instance_nets(num_datacenters=2, num_links=1)
+ instance_nets[1]['status'] = 'SCHEDULED_DELETION'
+ wan_link = eg.instance_wim_nets()
+ self.populate([{'instance_nets': instance_nets,
+ 'instance_wim_nets': wan_link}])
+
+ # When we try to process a CREATE action that refers to the same
+ # instance_scenario_id and sce_net_id
+ action = WanLinkCreate(eg.wim_actions('CREATE')[0])
+ action.instance_scenario_id = instance_nets[0]['instance_scenario_id']
+ action.sce_net_id = instance_nets[0]['sce_net_id']
+ # -- ensure it is in the database for updates --> #
+ action_record = action.as_record()
+ action_record['extra'] = json.dumps(action_record['extra'])
+ self.populate([{'vim_wim_actions': action_record}])
+ # <-- #
+ action.process(self.connector, self.persist, self.ovim)
+
+ # Then the action should fail
+ assert action.is_failed
+ self.assertIn('issue with the local networks', action.error_msg)
+ self.assertIn('SCHEDULED_DELETION', action.error_msg)
+
+ def prepare_create__sdn(self):
+ db_state = [{'nfvo_tenants': eg.tenant()}] + eg.wim_set()
+
+ instance_nets = eg.instance_nets(num_datacenters=2, num_links=1)
+ port_mappings = [
+ eg.wim_port_mapping(0, 0),
+ eg.wim_port_mapping(0, 1)
+ ]
+ instance_action = eg.instance_action(action_id='ACTION-000')
+ for i, net in enumerate(instance_nets):
+ net['status'] = 'ACTIVE'
+ net['sdn_net_id'] = uuid('sdn-net%d' % i)
+
+ db_state += [{'instance_nets': instance_nets},
+ {'instance_wim_nets': eg.instance_wim_nets()},
+ {'wim_port_mappings': port_mappings},
+ {'instance_actions': instance_action}]
+
+ action = WanLinkCreate(
+ eg.wim_actions('CREATE', action_id='ACTION-000')[0])
+ # -- ensure it is in the database for updates --> #
+ action_record = action.as_record()
+ action_record['extra'] = json.dumps(action_record['extra'])
+ self.populate([{'vim_wim_actions': action_record}])
+
+ return db_state, action
+
+ @disable_foreign_keys
+ def test_process__sdn(self):
+ # Given we want 1 WAN link between 2 datacenters
+ # and the local network in each datacenter is already created
+ db_state, action = self.prepare_create__sdn()
+ self.populate(db_state)
+
+ instance_action = self.persist.get_by_uuid(
+ 'instance_actions', action.instance_action_id)
+ number_done = instance_action['number_done']
+ number_failed = instance_action['number_failed']
+
+ connector_patch = patch.object(
+ self.connector, 'create_connectivity_service',
+ lambda *_, **__: (uuid('random-id'), None))
+
+ ovim_patch = patch.object(
+ self.ovim, 'get_ports', MagicMock(return_value=[{
+ 'switch_dpid': 'AA:AA:AA:AA:AA:AA:AA:AA',
+ 'switch_port': 1,
+ }]))
+
+ # If the connector works fine
+ with connector_patch, ovim_patch:
+ # When we try to process a CREATE action that refers to the same
+ # instance_scenario_id and sce_net_id
+ action.process(self.connector, self.persist, self.ovim)
+
+ # Then the action should succeed
+ db_action = self.persist.query_one('vim_wim_actions', WHERE={
+ 'instance_action_id': action.instance_action_id,
+ 'task_index': action.task_index})
+ self.assertEqual(db_action['status'], 'DONE')
+
+ instance_action = self.persist.get_by_uuid(
+ 'instance_actions', action.instance_action_id)
+ self.assertEqual(instance_action['number_done'], number_done + 1)
+ self.assertEqual(instance_action['number_failed'], number_failed)
+
+ @disable_foreign_keys
+ def test_process__sdn_fail(self):
+ # Given we want 1 WAN link between 2 datacenters
+ # and the local network in each datacenter is already created
+ db_state, action = self.prepare_create__sdn()
+ self.populate(db_state)
+
+ instance_action = self.persist.get_by_uuid(
+ 'instance_actions', action.instance_action_id)
+ number_done = instance_action['number_done']
+ number_failed = instance_action['number_failed']
+
+ connector_patch = patch.object(
+ self.connector, 'create_connectivity_service',
+ MagicMock(side_effect=WimConnectorError('foobar')))
+
+ ovim_patch = patch.object(
+ self.ovim, 'get_ports', MagicMock(return_value=[{
+ 'switch_dpid': 'AA:AA:AA:AA:AA:AA:AA:AA',
+ 'switch_port': 1,
+ }]))
+
+ # If the connector throws an error
+ with connector_patch, ovim_patch:
+ # When we try to process a CREATE action that refers to the same
+ # instance_scenario_id and sce_net_id
+ action.process(self.connector, self.persist, self.ovim)
+
+ # Then the action should fail
+ db_action = self.persist.query_one('vim_wim_actions', WHERE={
+ 'instance_action_id': action.instance_action_id,
+ 'task_index': action.task_index})
+ self.assertEqual(db_action['status'], 'FAILED')
+
+ instance_action = self.persist.get_by_uuid(
+ 'instance_actions', action.instance_action_id)
+ self.assertEqual(instance_action['number_done'], number_done)
+ self.assertEqual(instance_action['number_failed'], number_failed + 1)
+
+
+class TestDelete(TestActionsWithDb):
+ @disable_foreign_keys
+ def test_process__no_internal_id(self):
+ # Given no WAN link was created yet,
+ # when we try to process a DELETE action, with no wim_internal_id
+ action = WanLinkDelete(eg.wim_actions('DELETE')[0])
+ action.wim_internal_id = None
+ # -- ensure it is in the database for updates --> #
+ action_record = action.as_record()
+ action_record['extra'] = json.dumps(action_record['extra'])
+ self.populate([{'vim_wim_actions': action_record,
+ 'instance_wim_nets': eg.instance_wim_nets()}])
+ # <-- #
+ action.process(self.connector, self.persist, self.ovim)
+
+ # Then the action should succeed
+ assert action.is_done
+
+ def prepare_delete(self):
+ db_state = [{'nfvo_tenants': eg.tenant()}] + eg.wim_set()
+
+ instance_nets = eg.instance_nets(num_datacenters=2, num_links=1)
+ port_mappings = [
+ eg.wim_port_mapping(0, 0),
+ eg.wim_port_mapping(0, 1)
+ ]
+ instance_action = eg.instance_action(action_id='ACTION-000')
+ for i, net in enumerate(instance_nets):
+ net['status'] = 'ACTIVE'
+ net['sdn_net_id'] = uuid('sdn-net%d' % i)
+
+ db_state += [{'instance_nets': instance_nets},
+ {'instance_wim_nets': eg.instance_wim_nets()},
+ {'wim_port_mappings': port_mappings},
+ {'instance_actions': instance_action}]
+
+ action = WanLinkDelete(
+ eg.wim_actions('DELETE', action_id='ACTION-000')[0])
+ # -- ensure it is in the database for updates --> #
+ action_record = action.as_record()
+ action_record['extra'] = json.dumps(action_record['extra'])
+ self.populate([{'vim_wim_actions': action_record}])
+
+ return db_state, action
+
+ @disable_foreign_keys
+ def test_process(self):
+ # Given we want to delete 1 WAN link between 2 datacenters
+ db_state, action = self.prepare_delete()
+ self.populate(db_state)
+
+ instance_action = self.persist.get_by_uuid(
+ 'instance_actions', action.instance_action_id)
+ number_done = instance_action['number_done']
+ number_failed = instance_action['number_failed']
+
+ connector_patch = patch.object(
+ self.connector, 'delete_connectivity_service')
+
+ # If the connector works fine
+ with connector_patch:
+ # When we try to process a DELETE action that refers to the same
+ # instance_scenario_id and sce_net_id
+ action.process(self.connector, self.persist, self.ovim)
+
+ # Then the action should succeed
+ db_action = self.persist.query_one('vim_wim_actions', WHERE={
+ 'instance_action_id': action.instance_action_id,
+ 'task_index': action.task_index})
+ self.assertEqual(db_action['status'], 'DONE')
+
+ instance_action = self.persist.get_by_uuid(
+ 'instance_actions', action.instance_action_id)
+ self.assertEqual(instance_action['number_done'], number_done + 1)
+ self.assertEqual(instance_action['number_failed'], number_failed)
+
+ @disable_foreign_keys
+ def test_process__wan_link_error(self):
+ # Given we have a delete action that targets a wan link with an error
+ db_state, action = self.prepare_delete()
+ wan_link = [tables for tables in db_state
+ if tables.get('instance_wim_nets')][0]['instance_wim_nets']
+ # Flag the WAN link as failed before the state is stored in the
+ # database
+ wan_link[0]['status'] = 'ERROR'
+ self.populate(db_state)
+
+ # When we try to process it
+ action.process(self.connector, self.persist, self.ovim)
+
+ # Then it should fail
+ assert action.is_failed
+
+ def create_action(self):
+ action = WanLinkCreate(
+ eg.wim_actions('CREATE', action_id='ACTION-000')[0])
+ # -- ensure it is in the database for updates --> #
+ action_record = action.as_record()
+ action_record['extra'] = json.dumps(action_record['extra'])
+ self.populate([{'vim_wim_actions': action_record}])
+
+ return action
+
+ @disable_foreign_keys
+ def test_create_and_delete(self):
+ # Given a CREATE action has succeeded
+ db_state, delete_action = self.prepare_delete()
+ delete_action.save(self.persist, task_index=1)
+ self.populate(db_state)
+ create_action = self.create_action()
+
+ connector_patch = patch.multiple(
+ self.connector,
+ delete_connectivity_service=MagicMock(),
+ create_connectivity_service=(
+ lambda *_, **__: (uuid('random-id'), None)))
+
+ ovim_patch = patch.object(
+ self.ovim, 'get_ports', MagicMock(return_value=[{
+ 'switch_dpid': 'AA:AA:AA:AA:AA:AA:AA:AA',
+ 'switch_port': 1,
+ }]))
+
+ with connector_patch, ovim_patch:
+ create_action.process(self.connector, self.persist, self.ovim)
+
+ # When we try to process a DELETE action that refers to the same
+ # instance_scenario_id and sce_net_id
+ with connector_patch:
+ delete_action.process(self.connector, self.persist, self.ovim)
+
+ # Then the DELETE action should be successful
+ db_action = self.persist.query_one('vim_wim_actions', WHERE={
+ 'instance_action_id': delete_action.instance_action_id,
+ 'task_index': delete_action.task_index})
+ self.assertEqual(db_action['status'], 'DONE')
+
+
+if __name__ == '__main__':
+ unittest.main()
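Note on the connector tests in test_actions.py: they replace `create_connectivity_service` either with a callable that returns an id or with a mock whose `side_effect` raises `WimConnectorError`, then check the resulting action status. The sketch below shows that mocking pattern in isolation. `ConnectorError` and `process` are hypothetical stand-ins, not the real `WimConnectorError` or `WanLinkCreate.process`, and the arguments passed to the connector are placeholders.

```python
try:
    from unittest.mock import MagicMock  # Python 3
except ImportError:
    from mock import MagicMock  # Python 2, as used by the tests in this patch


class ConnectorError(Exception):
    """Stand-in for WimConnectorError (assumption for this sketch)."""


def process(connector):
    """Hypothetical, simplified action: returns the resulting status."""
    try:
        connector.create_connectivity_service('ELINE', [])
    except ConnectorError:
        return 'FAILED'
    return 'DONE'


# A connector that works: the mocked call returns an id and no extra info
working = MagicMock()
working.create_connectivity_service.return_value = ('some-id', None)

# A connector that fails: the mocked call raises when invoked
broken = MagicMock()
broken.create_connectivity_service.side_effect = ConnectorError('foobar')

status_ok = process(working)
status_error = process(broken)
```

Using `side_effect` rather than a hand-written failing class keeps the failure case down to one line and still records the call for later assertions.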
diff --git a/osm_ro/wim/tests/test_engine.py b/osm_ro/wim/tests/test_engine.py
new file mode 100644
index 0000000..6fb2d8c
--- /dev/null
+++ b/osm_ro/wim/tests/test_engine.py
@@ -0,0 +1,180 @@
+# -*- coding: utf-8 -*-
+##
+# Copyright 2018 University of Bristol - High Performance Networks Research
+# Group
+# All Rights Reserved.
+#
+# Contributors: Anderson Bravalheri, Dimitrios Gkounis, Abubakar Siddique
+# Muqaddas, Navdeep Uniyal, Reza Nejabati and Dimitra Simeonidou
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+#
+# For those usages not covered by the Apache License, Version 2.0 please
+# contact with: <highperformance-networks@bristol.ac.uk>
+#
+# Neither the name of the University of Bristol nor the names of its
+# contributors may be used to endorse or promote products derived from
+# this software without specific prior written permission.
+#
+# This work has been performed in the context of DCMS UK 5G Testbeds
+# & Trials Programme and in the framework of the Metro-Haul project -
+# funded by the European Commission under Grant number 761727 through the
+# Horizon 2020 and 5G-PPP programmes.
+##
+
+from __future__ import unicode_literals
+
+import unittest
+
+from mock import MagicMock
+
+from . import fixtures as eg
+from ...tests.db_helpers import TestCaseWithDatabasePerTest, uuid
+from ..errors import NoWimConnectedToDatacenters
+from ..engine import WimEngine
+from ..persistence import WimPersistence
+
+
+class TestWimEngineDbMethods(TestCaseWithDatabasePerTest):
+ def setUp(self):
+ super(TestWimEngineDbMethods, self).setUp()
+ self.persist = WimPersistence(self.db)
+ self.engine = WimEngine(persistence=self.persist)
+ self.addCleanup(self.engine.stop_threads)
+
+ def populate(self, seeds=None):
+ super(TestWimEngineDbMethods, self).populate(
+ seeds or eg.consistent_set())
+
+ def test_find_common_wims(self):
+ # Given we have 2 WIMs, 3 datacenters, but just 1 of the WIMs has
+ # access to them
+ self.populate([{'nfvo_tenants': [eg.tenant(0)]}] +
+ eg.wim_set(0, 0) +
+ eg.wim_set(1, 0) +
+ eg.datacenter_set(0, 0) +
+ eg.datacenter_set(1, 0) +
+ eg.datacenter_set(2, 0) +
+ [{'wim_port_mappings': [
+ eg.wim_port_mapping(0, 0),
+ eg.wim_port_mapping(0, 1),
+ eg.wim_port_mapping(0, 2)]}])
+
+ # When we retrieve the wims interconnecting some datacenters
+ wim_ids = self.engine.find_common_wims(
+ [uuid('dc0'), uuid('dc1'), uuid('dc2')], tenant='tenant0')
+
+ # Then we should have just the first wim
+ self.assertEqual(len(wim_ids), 1)
+ self.assertEqual(wim_ids[0], uuid('wim0'))
+
+ def test_find_common_wims_multiple(self):
+ # Given we have 2 WIMs, 3 datacenters, and all the WIMs have access to
+ # all datacenters
+ self.populate([{'nfvo_tenants': [eg.tenant(0)]}] +
+ eg.wim_set(0, 0) +
+ eg.wim_set(1, 0) +
+ eg.datacenter_set(0, 0) +
+ eg.datacenter_set(1, 0) +
+ eg.datacenter_set(2, 0) +
+ [{'wim_port_mappings': [
+ eg.wim_port_mapping(0, 0),
+ eg.wim_port_mapping(0, 1),
+ eg.wim_port_mapping(0, 2),
+ eg.wim_port_mapping(1, 0),
+ eg.wim_port_mapping(1, 1),
+ eg.wim_port_mapping(1, 2)]}])
+
+ # When we retrieve the wims interconnecting three datacenters
+ wim_ids = self.engine.find_common_wims(
+ [uuid('dc0'), uuid('dc1'), uuid('dc2')], tenant='tenant0')
+
+ # Then we should have all the wims
+ self.assertEqual(len(wim_ids), 2)
+ self.assertItemsEqual(wim_ids, [uuid('wim0'), uuid('wim1')])
+
+ def test_find_common_wim(self):
+ # Given we have 1 WIM, 3 datacenters but the WIM has access to just 2
+ # of them
+ self.populate([{'nfvo_tenants': [eg.tenant(0)]}] +
+ eg.wim_set(0, 0) +
+ eg.datacenter_set(0, 0) +
+ eg.datacenter_set(1, 0) +
+ eg.datacenter_set(2, 0) +
+ [{'wim_port_mappings': [
+ eg.wim_port_mapping(0, 0),
+ eg.wim_port_mapping(0, 1)]}])
+
+ # When we retrieve the common wim for the 2 datacenters that are
+ # interconnected
+ wim_id = self.engine.find_common_wim(
+ [uuid('dc0'), uuid('dc1')], tenant='tenant0')
+
+ # Then we should find the wim
+ self.assertEqual(wim_id, uuid('wim0'))
+
+ # When we try to retrieve the common wim for all the datacenters
+ # Then a NoWimConnectedToDatacenters exception should be raised
+ with self.assertRaises(NoWimConnectedToDatacenters):
+ self.engine.find_common_wim(
+ [uuid('dc0'), uuid('dc1'), uuid('dc2')], tenant='tenant0')
+
+ def test_find_common_wim__different_tenants(self):
+ # Given we have 1 WIM and 2 datacenters connected but the WIM doesn't
+ # belong to the tenant we have access to...
+ self.populate([{'nfvo_tenants': [eg.tenant(0), eg.tenant(1)]}] +
+ eg.wim_set(0, 0) +
+ eg.datacenter_set(0, 0) +
+ eg.datacenter_set(1, 0) +
+ [{'wim_port_mappings': [
+ eg.wim_port_mapping(0, 0),
+ eg.wim_port_mapping(0, 1)]}])
+
+ # When we retrieve the common wim for the 2 datacenters that are
+ # interconnected, but using another tenant,
+ # Then we should get an exception
+ with self.assertRaises(NoWimConnectedToDatacenters):
+ self.engine.find_common_wim(
+ [uuid('dc0'), uuid('dc1')], tenant='tenant1')
+
+
+class TestWimEngine(unittest.TestCase):
+ def test_derive_wan_link(self):
+ # Given we have 2 datacenters connected by the same WIM, with port
+ # mappings registered
+ mappings = [eg.processed_port_mapping(0, 0),
+ eg.processed_port_mapping(0, 1)]
+ persist = MagicMock(
+ get_wim_port_mappings=MagicMock(return_value=mappings))
+
+ engine = WimEngine(persistence=persist)
+ self.addCleanup(engine.stop_threads)
+
+ # When we receive a list of 4 instance nets, representing
+ # 2 VLDs connecting 2 datacenters each
+ instance_nets = eg.instance_nets(2, 2)
+ wan_links = engine.derive_wan_links(
+ instance_nets, uuid('tenant0'))
+
+ # Then we should derive 2 wan_links with the same instance_scenario_id
+ # and different sce_net_id
+ self.assertEqual(len(wan_links), 2)
+ for link in wan_links:
+ self.assertEqual(link['instance_scenario_id'], uuid('nsr0'))
+ # Each VLD needs a network to be created in each datacenter
+ self.assertItemsEqual([l['sce_net_id'] for l in wan_links],
+ [uuid('vld0'), uuid('vld1')])
+
+
+if __name__ == '__main__':
+ unittest.main()
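Note on the `find_common_wims` tests in test_engine.py: the behaviour they describe is a set intersection, where a WIM qualifies only if it has port mappings for every requested datacenter. The sketch below illustrates that logic on plain dictionaries. It is not the engine's implementation (which works through the persistence layer and also filters by tenant), and `common_wims` and `reachability` are hypothetical names.

```python
def common_wims(wims_by_datacenter, datacenter_ids):
    """Return the sorted WIM ids that reach every datacenter in the list."""
    candidate_sets = [set(wims_by_datacenter.get(dc, ()))
                      for dc in datacenter_ids]
    if not candidate_sets:
        return []
    # A WIM is common only if it appears in the set of every datacenter
    return sorted(set.intersection(*candidate_sets))


# Same layout as test_find_common_wim: wim0 reaches dc0 and dc1 only
reachability = {'dc0': ['wim0'], 'dc1': ['wim0'], 'dc2': []}
two_sites = common_wims(reachability, ['dc0', 'dc1'])
three_sites = common_wims(reachability, ['dc0', 'dc1', 'dc2'])
```

In the tests, the empty result for the three-site case corresponds to `find_common_wim` raising `NoWimConnectedToDatacenters`.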
diff --git a/osm_ro/wim/tests/test_http_handler.py b/osm_ro/wim/tests/test_http_handler.py
new file mode 100644
index 0000000..04577e4
--- /dev/null
+++ b/osm_ro/wim/tests/test_http_handler.py
@@ -0,0 +1,508 @@
+# -*- coding: utf-8 -*-
+##
+# Copyright 2018 University of Bristol - High Performance Networks Research
+# Group
+# All Rights Reserved.
+#
+# Contributors: Anderson Bravalheri, Dimitrios Gkounis, Abubakar Siddique
+# Muqaddas, Navdeep Uniyal, Reza Nejabati and Dimitra Simeonidou
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+#
+# For those usages not covered by the Apache License, Version 2.0 please
+# contact with: <highperformance-networks@bristol.ac.uk>
+#
+# Neither the name of the University of Bristol nor the names of its
+# contributors may be used to endorse or promote products derived from
+# this software without specific prior written permission.
+#
+# This work has been performed in the context of DCMS UK 5G Testbeds
+# & Trials Programme and in the framework of the Metro-Haul project -
+# funded by the European Commission under Grant number 761727 through the
+# Horizon 2020 and 5G-PPP programmes.
+##
+
+from __future__ import unicode_literals
+
+import unittest
+
+import bottle
+from mock import MagicMock, patch
+from webtest import TestApp
+
+from . import fixtures as eg # "examples"
+from ...http_tools.errors import Conflict, Not_Found
+from ...tests.db_helpers import TestCaseWithDatabasePerTest, uuid
+from ...utils import merge_dicts
+from ..http_handler import WimHandler
+
+OK = 200
+
+
+@patch('osm_ro.wim.wim_thread.CONNECTORS', MagicMock()) # Avoid external calls
+@patch('osm_ro.wim.wim_thread.WimThread.start', MagicMock()) # Avoid running
+class TestHttpHandler(TestCaseWithDatabasePerTest):
+ def setUp(self):
+ super(TestHttpHandler, self).setUp()
+ bottle.debug(True)
+ handler = WimHandler(db=self.db)
+ self.engine = handler.engine
+ self.addCleanup(self.engine.stop_threads)
+ self.app = TestApp(handler.wsgi_app)
+
+ def populate(self, seeds=None):
+ super(TestHttpHandler, self).populate(seeds or eg.consistent_set())
+
+ def test_list_wims(self):
+ # Given some wims are registered in the database
+ self.populate()
+ # when a GET /<tenant_id>/wims request arrives
+ tenant_id = uuid('tenant0')
+ response = self.app.get('/{}/wims'.format(tenant_id))
+
+ # then the request should succeed
+ self.assertEqual(response.status_code, OK)
+ # and all the registered wims should be present
+ retrieved_wims = {v['name']: v for v in response.json['wims']}
+ for name in retrieved_wims:
+ identifier = int(name.replace('wim', ''))
+ self.assertDictContainsSubset(
+ eg.wim(identifier), retrieved_wims[name])
+
+ def test_show_wim(self):
+ # Given some wims are registered in the database
+ self.populate()
+ # when a GET /<tenant_id>/wims/<wim_id> request arrives
+ tenant_id = uuid('tenant0')
+ wim_id = uuid('wim1')
+ response = self.app.get('/{}/wims/{}'.format(tenant_id, wim_id))
+
+ # then the request should succeed
+ self.assertEqual(response.status_code, OK)
+ # and the registered wim (wim1) should be present
+ self.assertDictContainsSubset(eg.wim(1), response.json['wim'])
+ # Moreover, it also works with tenant_id = any
+ response = self.app.get('/any/wims/{}'.format(wim_id))
+ self.assertEqual(response.status_code, OK)
+ self.assertDictContainsSubset(eg.wim(1), response.json['wim'])
+
+ def test_show_wim__wim_doesnt_exists(self):
+ # Given wim_id does not refer to any already registered wim
+ self.populate()
+ # when a GET /<tenant_id>/wims/<wim_id> request arrives
+ tenant_id = uuid('tenant0')
+ wim_id = uuid('wim999')
+ response = self.app.get(
+ '/{}/wims/{}'.format(tenant_id, wim_id),
+ expect_errors=True)
+
+ # then the request should fail
+ self.assertEqual(response.status_code, Not_Found)
+
+ def test_show_wim__tenant_doesnt_exists(self):
+ # Given tenant_id does not refer to any already registered tenant
+ self.populate()
+ # when a GET /<tenant_id>/wims/<wim_id> request arrives
+ tenant_id = uuid('tenant999')
+ wim_id = uuid('wim0')
+ response = self.app.get(
+ '/{}/wims/{}'.format(tenant_id, wim_id),
+ expect_errors=True)
+
+ # then the request should fail
+ self.assertEqual(response.status_code, Not_Found)
+
+ def test_edit_wim(self):
+ # Given a WIM exists in the database
+ self.populate()
+ # when a PUT /wims/<wim_id> request arrives
+ wim_id = uuid('wim1')
+ response = self.app.put_json('/wims/{}'.format(wim_id), {
+ 'wim': {'name': 'My-New-Name'}})
+
+ # then the request should succeed
+ self.assertEqual(response.status_code, OK)
+ # and the response should contain the updated wim (wim1)
+ self.assertDictContainsSubset(
+ merge_dicts(eg.wim(1), name='My-New-Name'),
+ response.json['wim'])
+
+ def test_delete_wim(self):
+ # Given a WIM exists in the database
+ self.populate()
+ num_accounts = self.count('wim_accounts')
+ num_associations = self.count('wim_nfvo_tenants')
+ num_mappings = self.count('wim_port_mappings')
+
+ with self.engine.threads_running():
+ num_threads = len(self.engine.threads)
+ # when a DELETE /wims/<wim_id> request arrives
+ wim_id = uuid('wim1')
+ response = self.app.delete('/wims/{}'.format(wim_id))
+ num_threads_after = len(self.engine.threads)
+
+ # then the request should succeed
+ self.assertEqual(response.status_code, OK)
+ self.assertIn('deleted', response.json['result'])
+ # and the registered wim1 should be deleted
+ response = self.app.get(
+ '/any/wims/{}'.format(wim_id),
+ expect_errors=True)
+ self.assertEqual(response.status_code, Not_Found)
+ # and all the dependent records in other tables should be deleted:
+ # wim_accounts, wim_nfvo_tenants, wim_port_mappings
+ self.assertEqual(self.count('wim_nfvo_tenants'),
+ num_associations - eg.NUM_TENANTS)
+ self.assertLess(self.count('wim_port_mappings'), num_mappings)
+ self.assertEqual(self.count('wim_accounts'),
+ num_accounts - eg.NUM_TENANTS)
+ # And the threads associated with the wim accounts should be stopped
+ self.assertEqual(num_threads_after, num_threads - eg.NUM_TENANTS)
+
+ def test_create_wim(self):
+ # Given no WIM exists yet
+ # when a POST /wims request arrives with the right payload
+ response = self.app.post_json('/wims', {'wim': eg.wim(999)})
+
+ # then the request should succeed
+ self.assertEqual(response.status_code, OK)
+ self.assertEqual(response.json['wim']['name'], 'wim999')
+
+ def test_create_wim_account(self):
+ # Given a WIM and an NFVO tenant exist but are not associated
+ self.populate([{'wims': [eg.wim(0)]},
+ {'nfvo_tenants': [eg.tenant(0)]}])
+
+ with self.engine.threads_running():
+ num_threads = len(self.engine.threads)
+ # when a POST /<tenant_id>/wims/<wim_id> arrives
+ response = self.app.post_json(
+ '/{}/wims/{}'.format(uuid('tenant0'), uuid('wim0')),
+ {'wim_account': eg.wim_account(0, 0)})
+
+ num_threads_after = len(self.engine.threads)
+
+ # then a new thread should be created
+ self.assertEqual(num_threads_after, num_threads + 1)
+
+ # and the request should succeed
+ self.assertEqual(response.status_code, OK)
+ self.assertEqual(response.json['wim_account']['name'], 'wim-account00')
+
+ # and a new association record should be created
+ association = self.db.get_rows(FROM='wim_nfvo_tenants')
+ assert association
+ self.assertEqual(len(association), 1)
+ self.assertEqual(association[0]['wim_id'], uuid('wim0'))
+ self.assertEqual(association[0]['nfvo_tenant_id'], uuid('tenant0'))
+ self.assertEqual(association[0]['wim_account_id'],
+ response.json['wim_account']['uuid'])
+
+ def test_create_wim_account__existing_account(self):
+ # Given a WIM, a WIM account and an NFVO tenant exist
+ # but the NFVO tenant and the WIM are not associated
+ self.populate([
+ {'wims': [eg.wim(0)]},
+ {'nfvo_tenants': [eg.tenant(0)]},
+ {'wim_accounts': [eg.wim_account(0, 0)]}])
+
+ # when a POST /<tenant_id>/wims/<wim_id> arrives
+ # and it refers to an existing wim account
+ response = self.app.post_json(
+ '/{}/wims/{}'.format(uuid('tenant0'), uuid('wim0')),
+ {'wim_account': {'name': 'wim-account00'}})
+
+ # then the request should succeed
+ self.assertEqual(response.status_code, OK)
+ # and the association should be created
+ association = self.db.get_rows(
+ FROM='wim_nfvo_tenants',
+ WHERE={'wim_id': uuid('wim0'),
+ 'nfvo_tenant_id': uuid('tenant0')})
+ assert association
+ self.assertEqual(len(association), 1)
+ # but no new wim_account should be created
+ wim_accounts = self.db.get_rows(FROM='wim_accounts')
+ self.assertEqual(len(wim_accounts), 1)
+ self.assertEqual(wim_accounts[0]['name'], 'wim-account00')
+
+ def test_create_wim_account__existing_account__differing(self):
+ # Given a WIM, a WIM account and an NFVO tenant exist
+ # but the NFVO tenant and the WIM are not associated
+ self.populate([
+ {'wims': [eg.wim(0)]},
+ {'nfvo_tenants': [eg.tenant(0)]},
+ {'wim_accounts': [eg.wim_account(0, 0)]}])
+
+ # when a POST /<tenant_id>/wims/<wim_id> arrives
+ # and it refers to an existing wim account,
+ # but with different fields
+ response = self.app.post_json(
+ '/{}/wims/{}'.format(uuid('tenant0'), uuid('wim0')), {
+ 'wim_account': {
+ 'name': 'wim-account00',
+ 'user': 'john',
+ 'password': 'abc123'}},
+ expect_errors=True)
+
+ # then the request should not succeed
+ self.assertEqual(response.status_code, Conflict)
+ # some useful message should be displayed
+ response.mustcontain('attempt to overwrite', 'user', 'password')
+ # and the association should not be created
+ association = self.db.get_rows(
+ FROM='wim_nfvo_tenants',
+ WHERE={'wim_id': uuid('wim0'),
+ 'nfvo_tenant_id': uuid('tenant0')})
+ assert not association
+
+ def test_create_wim_account__association_already_exists(self):
+ # Given a WIM, a WIM account and an NFVO tenant exist
+ # and are correctly associated
+ self.populate()
+ num_assoc_before = self.count('wim_nfvo_tenants')
+
+ # when a POST /<tenant_id>/wims/<wim_id> arrives trying to connect a
+ # WIM and a tenant for the second time
+ response = self.app.post_json(
+ '/{}/wims/{}'.format(uuid('tenant0'), uuid('wim0')), {
+ 'wim_account': {
+ 'user': 'user999',
+ 'password': 'password999'}},
+ expect_errors=True)
+
+ # then the request should not succeed
+ self.assertEqual(response.status_code, Conflict)
+ # the message should be useful
+ response.mustcontain('There is already', uuid('wim0'), uuid('tenant0'))
+
+ num_assoc_after = self.count('wim_nfvo_tenants')
+
+ # and the number of association records should not increase
+ self.assertEqual(num_assoc_before, num_assoc_after)
+
+ def test_create_wim__tenant_doesnt_exist(self):
+ # Given a tenant does not exist
+ self.populate()
+
+ # But the user tries to create a wim_account anyway
+ response = self.app.post_json(
+ '/{}/wims/{}'.format(uuid('tenant999'), uuid('wim0')), {
+ 'wim_account': {
+ 'user': 'user999',
+ 'password': 'password999'}},
+ expect_errors=True)
+
+ # then the request should not succeed
+ self.assertEqual(response.status_code, Not_Found)
+ # the message should be useful
+ response.mustcontain('No record was found', uuid('tenant999'))
+
+ def test_create_wim__wim_doesnt_exist(self):
+ # Given a WIM does not exist
+ self.populate()
+
+ # But the user tries to create a wim_account anyway
+ response = self.app.post_json(
+ '/{}/wims/{}'.format(uuid('tenant0'), uuid('wim999')), {
+ 'wim_account': {
+ 'user': 'user999',
+ 'password': 'password999'}},
+ expect_errors=True)
+
+ # then the request should not succeed
+ self.assertEqual(response.status_code, Not_Found)
+ # the message should be useful
+ response.mustcontain('No record was found', uuid('wim999'))
+
+ def test_update_wim_account(self):
+ # Given a WIM account connecting a tenant and a WIM exists
+ self.populate()
+
+ with self.engine.threads_running():
+ num_threads = len(self.engine.threads)
+
+ thread = self.engine.threads[uuid('wim-account00')]
+ reload = MagicMock(wraps=thread.reload)
+
+ with patch.object(thread, 'reload', reload):
+ # when a PUT /<tenant_id>/wims/<wim_id> arrives
+ response = self.app.put_json(
+ '/{}/wims/{}'.format(uuid('tenant0'), uuid('wim0')), {
+ 'wim_account': {
+ 'name': 'account888',
+ 'user': 'user888'}})
+
+ num_threads_after = len(self.engine.threads)
+
+ # then the wim thread should be restarted
+ reload.assert_called_once()
+ # and no thread should be added or removed
+ self.assertEqual(num_threads_after, num_threads)
+
+ # and the request should succeed
+ self.assertEqual(response.status_code, OK)
+ self.assertEqual(response.json['wim_account']['name'], 'account888')
+ self.assertEqual(response.json['wim_account']['user'], 'user888')
+
+ def test_update_wim_account__multiple(self):
+ # Given a WIM account connected to several tenants
+ self.populate()
+
+ with self.engine.threads_running():
+ # when a PUT /any/wims/<wim_id> arrives
+ response = self.app.put_json(
+ '/any/wims/{}'.format(uuid('wim0')), {
+ 'wim_account': {
+ 'user': 'user888',
+ 'config': {'x': 888}}})
+
+ # then the request should succeed
+ self.assertEqual(response.status_code, OK)
+ self.assertEqual(len(response.json['wim_accounts']), eg.NUM_TENANTS)
+
+ for account in response.json['wim_accounts']:
+ self.assertEqual(account['user'], 'user888')
+ self.assertEqual(account['config']['x'], 888)
+
+ def test_delete_wim_account(self):
+ # Given a WIM account exists and it is connected to a tenant
+ self.populate()
+
+ num_accounts_before = self.count('wim_accounts')
+
+ with self.engine.threads_running():
+ thread = self.engine.threads[uuid('wim-account00')]
+ exit = MagicMock(wraps=thread.exit)
+ num_threads = len(self.engine.threads)
+
+ with patch.object(thread, 'exit', exit):
+ # when a DELETE /<tenant_id>/wims/<wim_id> arrives
+ response = self.app.delete_json(
+ '/{}/wims/{}'.format(uuid('tenant0'), uuid('wim0')))
+
+ num_threads_after = len(self.engine.threads)
+
+ # then the wim thread should exit
+ self.assertEqual(num_threads_after, num_threads - 1)
+ exit.assert_called_once()
+
+ # and the request should succeed
+ self.assertEqual(response.status_code, OK)
+ response.mustcontain('account `wim-account00` deleted')
+
+ # and the number of wim_accounts should decrease
+ num_accounts_after = self.count('wim_accounts')
+ self.assertEqual(num_accounts_after, num_accounts_before - 1)
+
+ def test_delete_wim_account__multiple(self):
+ # Given a WIM account exists and it is connected to several tenants
+ self.populate()
+
+ num_accounts_before = self.count('wim_accounts')
+
+ with self.engine.threads_running():
+ # when a DELETE /any/wims/<wim_id> arrives
+ response = self.app.delete_json(
+ '/any/wims/{}'.format(uuid('wim0')))
+
+ # then the request should succeed
+ self.assertEqual(response.status_code, OK)
+ response.mustcontain('account `wim-account00` deleted')
+ response.mustcontain('account `wim-account10` deleted')
+
+ # and the number of wim_accounts should decrease
+ num_accounts_after = self.count('wim_accounts')
+ self.assertEqual(num_accounts_after,
+ num_accounts_before - eg.NUM_TENANTS)
+
+ def test_delete_wim_account__doesnt_exist(self):
+ # Given we have a tenant that is not connected to a WIM
+ self.populate()
+ tenant = {'uuid': uuid('tenant888'), 'name': 'tenant888'}
+ self.populate([{'nfvo_tenants': [tenant]}])
+
+ num_accounts_before = self.count('wim_accounts')
+
+ # when a DELETE /<tenant_id>/wims/<wim_id> arrives
+ response = self.app.delete(
+ '/{}/wims/{}'.format(uuid('tenant888'), uuid('wim0')),
+ expect_errors=True)
+
+ # then the request should not succeed
+ self.assertEqual(response.status_code, Not_Found)
+
+ # and the number of wim_accounts should not decrease
+ num_accounts_after = self.count('wim_accounts')
+ self.assertEqual(num_accounts_after, num_accounts_before)
+
+ def test_create_port_mappings(self):
+ # Given we have a wim and datacenter without any port mappings
+ self.populate([{'nfvo_tenants': eg.tenant(0)}] +
+ eg.datacenter_set(888, 0) +
+ eg.wim_set(999, 0))
+
+ # when a POST /<tenant_id>/wims/<wim_id>/port_mapping arrives
+ response = self.app.post_json(
+ '/{}/wims/{}/port_mapping'.format(uuid('tenant0'), uuid('wim999')),
+ {'wim_port_mapping': [{
+ 'datacenter_name': 'dc888',
+ 'pop_wan_mappings': [
+ {'pop_switch_dpid': 'AA:AA:AA:AA:AA:AA:AA:AA',
+ 'pop_switch_port': 1,
+ 'wan_service_mapping_info': {
+ 'mapping_type': 'dpid-port',
+ 'wan_switch_dpid': 'BB:BB:BB:BB:BB:BB:BB:BB',
+ 'wan_switch_port': 1
+ }}
+ ]}
+ ]})
+
+ # then the request should succeed
+ self.assertEqual(response.status_code, OK)
+ # and port mappings should be stored in the database
+ port_mapping = self.db.get_rows(FROM='wim_port_mappings')
+ self.assertEqual(len(port_mapping), 1)
+
+ def test_get_port_mappings(self):
+ # Given WIMs and datacenters exist with port mappings between them
+ self.populate()
+ # when a GET /<tenant_id>/wims/<wim_id>/port_mapping arrives
+ response = self.app.get(
+ '/{}/wims/{}/port_mapping'.format(uuid('tenant0'), uuid('wim0')))
+ # then the request should succeed
+ self.assertEqual(response.status_code, OK)
+ # and we should see port mappings for each WIM, datacenter pair
+ mappings = response.json['wim_port_mapping']
+ self.assertEqual(len(mappings), eg.NUM_DATACENTERS)
+ # ^ In the fixture set all the datacenters are connected to all wims
+
+ def test_delete_port_mappings(self):
+ # Given WIMs and datacenters exist with port mappings between them
+ self.populate()
+ num_mappings_before = self.count('wim_port_mappings')
+
+ # when a DELETE /<tenant_id>/wims/<wim_id>/port_mapping arrives
+ response = self.app.delete(
+ '/{}/wims/{}/port_mapping'.format(uuid('tenant0'), uuid('wim0')))
+ # then the request should succeed
+ self.assertEqual(response.status_code, OK)
+ # and the number of port mappings should decrease
+ num_mappings_after = self.count('wim_port_mappings')
+ self.assertEqual(num_mappings_after,
+ num_mappings_before - eg.NUM_DATACENTERS)
+ # ^ In the fixture set all the datacenters are connected to all wims
+
+
+if __name__ == '__main__':
+ unittest.main()
diff --git a/osm_ro/wim/tests/test_persistence.py b/osm_ro/wim/tests/test_persistence.py
new file mode 100644
index 0000000..d09a116
--- /dev/null
+++ b/osm_ro/wim/tests/test_persistence.py
@@ -0,0 +1,265 @@
+# -*- coding: utf-8 -*-
+##
+# Copyright 2018 University of Bristol - High Performance Networks Research
+# Group
+# All Rights Reserved.
+#
+# Contributors: Anderson Bravalheri, Dimitrios Gkounis, Abubakar Siddique
+# Muqaddas, Navdeep Uniyal, Reza Nejabati and Dimitra Simeonidou
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+#
+# For those usages not covered by the Apache License, Version 2.0 please
+# contact with: <highperformance-networks@bristol.ac.uk>
+#
+# Neither the name of the University of Bristol nor the names of its
+# contributors may be used to endorse or promote products derived from
+# this software without specific prior written permission.
+#
+# This work has been performed in the context of DCMS UK 5G Testbeds
+# & Trials Programme and in the framework of the Metro-Haul project -
+# funded by the European Commission under Grant number 761727 through the
+# Horizon 2020 and 5G-PPP programmes.
+##
+
+from __future__ import unicode_literals
+
+import unittest
+from itertools import chain
+from types import StringType
+
+from six.moves import range
+
+from . import fixtures as eg
+from ...tests.db_helpers import (
+ TestCaseWithDatabasePerTest,
+ disable_foreign_keys,
+ uuid
+)
+from ..persistence import (
+ WimPersistence,
+ hide_confidential_fields,
+ serialize_fields,
+ unserialize_fields
+)
+
+
+class TestPersistenceUtils(unittest.TestCase):
+ def test_hide_confidential_fields(self):
+ example = {
+ 'password': '123456',
+ 'nested.password': '123456',
+ 'nested.secret': None,
+ }
+ result = hide_confidential_fields(example,
+ fields=('password', 'secret'))
+ for field in 'password', 'nested.password':
+ assert result[field].startswith('***')
+ self.assertIs(result['nested.secret'], None)
+
+ def test_serialize_fields(self):
+ example = {
+ 'config': dict(x=1),
+ 'nested.info': [1, 2, 3],
+ 'nested.config': None
+ }
+ result = serialize_fields(example, fields=('config', 'info'))
+ for field in 'config', 'nested.info':
+ self.assertIsInstance(result[field], StringType)
+ self.assertIs(result['nested.config'], None)
+
+ def test_unserialize_fields(self):
+ example = {
+ 'config': '{"x": 1}',
+ 'nested.info': '[1,2,3]',
+ 'nested.config': None,
+ 'confidential.info': '{"password": "abcdef"}'
+ }
+ result = unserialize_fields(example, fields=('config', 'info'))
+ self.assertEqual(result['config'], dict(x=1))
+ self.assertEqual(result['nested.info'], [1, 2, 3])
+ self.assertIs(result['nested.config'], None)
+ self.assertNotEqual(result['confidential.info']['password'], 'abcdef')
+ assert result['confidential.info']['password'].startswith('***')
+
+
+class TestWimPersistence(TestCaseWithDatabasePerTest):
+ def setUp(self):
+ super(TestWimPersistence, self).setUp()
+ self.persist = WimPersistence(self.db)
+
+ def populate(self, seeds=None):
+ super(TestWimPersistence, self).populate(seeds or eg.consistent_set())
+
+ def test_query_offset(self):
+ # Given a database contains 4 records
+ self.populate([{'wims': [eg.wim(i) for i in range(4)]}])
+
+ # When we query using a limit of 2 and an offset of 1
+ results = self.persist.query('wims',
+ ORDER_BY='name', LIMIT=2, OFFSET=1)
+ # Then we should have 2 results, skipping the first record
+ names = [r['name'] for r in results]
+ self.assertItemsEqual(names, ['wim1', 'wim2'])
+
+ def test_get_wim_account_by_wim_tenant(self):
+ # Given a database contains WIM accounts associated to Tenants
+ self.populate()
+
+ # when we retrieve the account using wim and tenant
+ wim_account = self.persist.get_wim_account_by(
+ uuid('wim0'), uuid('tenant0'))
+
+ # then the right record should be returned
+ self.assertEqual(wim_account['uuid'], uuid('wim-account00'))
+ self.assertEqual(wim_account['name'], 'wim-account00')
+ self.assertEqual(wim_account['user'], 'user00')
+
+ def test_get_wim_account_by_wim_tenant__names(self):
+ # Given a database contains WIM accounts associated to Tenants
+ self.populate()
+
+ # when we retrieve the account using wim and tenant
+ wim_account = self.persist.get_wim_account_by(
+ 'wim0', 'tenant0')
+
+ # then the right record should be returned
+ self.assertEqual(wim_account['uuid'], uuid('wim-account00'))
+ self.assertEqual(wim_account['name'], 'wim-account00')
+ self.assertEqual(wim_account['user'], 'user00')
+
+ def test_get_wim_accounts_by_wim(self):
+ # Given a database contains WIM accounts associated to Tenants
+ self.populate()
+
+ # when we retrieve the accounts using wim
+ wim_accounts = self.persist.get_wim_accounts_by(uuid('wim0'))
+
+ # then the right records should be returned
+ self.assertEqual(len(wim_accounts), eg.NUM_TENANTS)
+ for account in wim_accounts:
+ self.assertEqual(account['wim_id'], uuid('wim0'))
+
+ def test_get_wim_port_mappings(self):
+ # Given a database with WIMs, datacenters and port-mappings
+ self.populate()
+
+ # when we retrieve the port mappings for a list of datacenters
+ # using either names or uuids
+ for criteria in ([uuid('dc0'), uuid('dc1')], ['dc0', 'dc1']):
+ mappings = self.persist.get_wim_port_mappings(datacenter=criteria)
+
+ # then each result should have a datacenter_id
+ datacenters = [m['datacenter_id'] for m in mappings]
+ for datacenter in datacenters:
+ self.assertIn(datacenter, [uuid('dc0'), uuid('dc1')])
+
+ # a wim_id
+ wims = [m['wim_id'] for m in mappings]
+ for wim in wims:
+ self.assertIsNot(wim, None)
+
+ # and an array of 'wan' <> 'pop' connection pairs
+ pairs = chain(*(m['wan_pop_port_mappings'] for m in mappings))
+ self.assertEqual(len(list(pairs)), 2 * eg.NUM_WIMS)
+
+ def test_get_wim_port_mappings_multiple(self):
+ # Given we have more than one connection in a datacenter managed by the
+ # WIM
+ self.populate()
+ self.populate([{
+ 'wim_port_mappings': [
+ eg.wim_port_mapping(
+ 0, 0,
+ pop_dpid='CC:CC:CC:CC:CC:CC:CC:CC',
+ wan_dpid='DD:DD:DD:DD:DD:DD:DD:DD'),
+ eg.wim_port_mapping(
+ 0, 0,
+ pop_dpid='EE:EE:EE:EE:EE:EE:EE:EE',
+ wan_dpid='FF:FF:FF:FF:FF:FF:FF:FF')]}])
+
+ # when we retrieve the port mappings for the wim and datacenter:
+ mappings = (
+ self.persist.get_wim_port_mappings(wim='wim0', datacenter='dc0'))
+
+ # then it should return just a single result, grouped by wim and
+ # datacenter
+ self.assertEqual(len(mappings), 1)
+ self.assertEqual(mappings[0]['wim_id'], uuid('wim0'))
+ self.assertEqual(mappings[0]['datacenter_id'], uuid('dc0'))
+
+ self.assertEqual(len(mappings[0]['wan_pop_port_mappings']), 3)
+
+ # when we retrieve the mappings for more than one wim/datacenter
+ # the grouping should still work properly
+ mappings = self.persist.get_wim_port_mappings(
+ wim=['wim0', 'wim1'], datacenter=['dc0', 'dc1'])
+ self.assertEqual(len(mappings), 4)
+ pairs = chain(*(m['wan_pop_port_mappings'] for m in mappings))
+ self.assertEqual(len(list(pairs)), 6)
+
+ def test_get_actions_in_group(self):
+ # Given a good number of wim actions exist in the database
+ kwargs = {'action_id': uuid('action0')}
+ actions = (eg.wim_actions('CREATE', num_links=8, **kwargs) +
+ eg.wim_actions('FIND', num_links=8, **kwargs) +
+ eg.wim_actions('START', num_links=8, **kwargs))
+ for i, action in enumerate(actions):
+ action['task_index'] = i
+
+ self.populate([
+ {'nfvo_tenants': eg.tenant()}
+ ] + eg.wim_set() + [
+ {'instance_actions': eg.instance_action(**kwargs)},
+ {'vim_wim_actions': actions}
+ ])
+
+ # When we retrieve them in groups
+ limit = 5
+ results = self.persist.get_actions_in_groups(
+ uuid('wim-account00'), ['instance_wim_nets'], group_limit=limit)
+
+ # Then we should have N groups where N == limit
+ self.assertEqual(len(results), limit)
+ for _, task_list in results:
+ # And since for each link we have created 3 actions (create, find,
+ # start), we should find them in each group
+ self.assertEqual(len(task_list), 3)
+
+ @disable_foreign_keys
+ def test_update_instance_action_counters(self):
+ # Given we have one instance action in the database with 2 incomplete
+ # tasks
+ action = eg.instance_action(num_tasks=2)
+ self.populate([{'instance_actions': action}])
+ # When we update the done counter by 0, nothing should happen
+ self.persist.update_instance_action_counters(action['uuid'], done=0)
+ result = self.persist.get_by_uuid('instance_actions', action['uuid'])
+ self.assertEqual(result['number_done'], 0)
+ self.assertEqual(result['number_failed'], 0)
+ # When we update the done counter by 2, number_done should be 2
+ self.persist.update_instance_action_counters(action['uuid'], done=2)
+ result = self.persist.get_by_uuid('instance_actions', action['uuid'])
+ self.assertEqual(result['number_done'], 2)
+ self.assertEqual(result['number_failed'], 0)
+ # When we update the done counter by -1, and the failed counter by 1
+ self.persist.update_instance_action_counters(
+ action['uuid'], done=-1, failed=1)
+ # Then we should see 1 and 1
+ result = self.persist.get_by_uuid('instance_actions', action['uuid'])
+ self.assertEqual(result['number_done'], 1)
+ self.assertEqual(result['number_failed'], 1)
+
+
+if __name__ == '__main__':
+ unittest.main()
diff --git a/osm_ro/wim/tests/test_wim_thread.py b/osm_ro/wim/tests/test_wim_thread.py
new file mode 100644
index 0000000..6d61848
--- /dev/null
+++ b/osm_ro/wim/tests/test_wim_thread.py
@@ -0,0 +1,332 @@
+# -*- coding: utf-8 -*-
+##
+# Copyright 2018 University of Bristol - High Performance Networks Research
+# Group
+# All Rights Reserved.
+#
+# Contributors: Anderson Bravalheri, Dimitrios Gkounis, Abubakar Siddique
+# Muqaddas, Navdeep Uniyal, Reza Nejabati and Dimitra Simeonidou
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+#
+# For those usages not covered by the Apache License, Version 2.0 please
+# contact with: <highperformance-networks@bristol.ac.uk>
+#
+# Neither the name of the University of Bristol nor the names of its
+# contributors may be used to endorse or promote products derived from
+# this software without specific prior written permission.
+#
+# This work has been performed in the context of DCMS UK 5G Testbeds
+# & Trials Programme and in the framework of the Metro-Haul project -
+# funded by the European Commission under Grant number 761727 through the
+# Horizon 2020 and 5G-PPP programmes.
+##
+
+from __future__ import unicode_literals, print_function
+
+import unittest
+from difflib import unified_diff
+from operator import itemgetter
+from time import time
+
+import json
+
+from mock import MagicMock, patch
+
+from . import fixtures as eg
+from ...tests.db_helpers import (
+ TestCaseWithDatabasePerTest,
+ disable_foreign_keys,
+ uuid
+)
+from ..engine import WimEngine
+from ..persistence import WimPersistence
+from ..wim_thread import WimThread
+
+
+ignore_connector = patch('osm_ro.wim.wim_thread.CONNECTORS', MagicMock())
+
+
+def _repr(value):
+ return json.dumps(value, indent=4, sort_keys=True)
+
+
+@ignore_connector
+class TestWimThreadWithDb(TestCaseWithDatabasePerTest):
+ def setUp(self):
+ super(TestWimThreadWithDb, self).setUp()
+ self.persist = WimPersistence(self.db)
+ wim = eg.wim(0)
+ account = eg.wim_account(0, 0)
+ account['wim'] = wim
+ self.thread = WimThread(self.persist, account)
+ self.thread.connector = MagicMock()
+
+ def assertTasksEqual(self, left, right):
+ fields = itemgetter('item', 'item_id', 'action', 'status')
+ left_ = (t.as_dict() for t in left)
+ left_ = [fields(t) for t in left_]
+ right_ = [fields(t) for t in right]
+
+ try:
+ self.assertItemsEqual(left_, right_)
+ except AssertionError:
+ print('left', _repr(left))
+ print('left', len(left_), 'items')
+ print('right', len(right_), 'items')
+ result = list(unified_diff(_repr(sorted(left_)).split('\n'),
+ _repr(sorted(right_)).split('\n'),
+ 'left', 'right'))
+ print('diff:\n', '\n'.join(result))
+ raise
+
+ def test_reload_actions__all_create(self):
+ # Given we have 3 CREATE actions stored in the database
+ actions = eg.wim_actions('CREATE',
+ action_id=uuid('action0'), num_links=3)
+ self.populate([
+ {'nfvo_tenants': eg.tenant()}
+ ] + eg.wim_set() + [
+ {'instance_actions':
+ eg.instance_action(action_id=uuid('action0'))},
+ {'vim_wim_actions': actions}
+ ])
+
+ # When we reload the tasks
+ self.thread.reload_actions()
+ # All of them should be inserted as pending
+ self.assertTasksEqual(self.thread.pending_tasks, actions)
+
+ def test_reload_actions__all_refresh(self):
+ # Given just DONE tasks are in the database
+ actions = eg.wim_actions(status='DONE',
+ action_id=uuid('action0'), num_links=3)
+ self.populate([
+ {'nfvo_tenants': eg.tenant()}
+ ] + eg.wim_set() + [
+ {'instance_actions':
+ eg.instance_action(action_id=uuid('action0'))},
+ {'vim_wim_actions': actions}
+ ])
+
+ # When we reload the tasks
+ self.thread.reload_actions()
+ # All of them should be inserted as refresh
+ self.assertTasksEqual(self.thread.refresh_tasks, actions)
+
+ def test_reload_actions__grouped(self):
+ # Given we have 2 tasks for the same item in the database
+ kwargs = {'action_id': uuid('action0')}
+ actions = (eg.wim_actions('CREATE', **kwargs) +
+ eg.wim_actions('FIND', **kwargs))
+ for i, action in enumerate(actions):
+ action['task_index'] = i
+
+ self.populate([
+ {'nfvo_tenants': eg.tenant()}
+ ] + eg.wim_set() + [
+ {'instance_actions': eg.instance_action(**kwargs)},
+ {'vim_wim_actions': actions}
+ ])
+
+ # When we reload the tasks
+ self.thread.reload_actions()
+ # Just one group should be created
+ self.assertEqual(len(self.thread.grouped_tasks.values()), 1)
+
+ def test_reload_actions__delete_scheduled(self):
+ # Given we have 3 tasks for the same item in the database, but one of
+ # them is a DELETE task and it is SCHEDULED
+ kwargs = {'action_id': uuid('action0')}
+ actions = (eg.wim_actions('CREATE', **kwargs) +
+ eg.wim_actions('FIND', **kwargs) +
+ eg.wim_actions('DELETE', status='SCHEDULED', **kwargs))
+ for i, action in enumerate(actions):
+ action['task_index'] = i
+
+ self.populate([
+ {'nfvo_tenants': eg.tenant()}
+ ] + eg.wim_set() + [
+ {'instance_actions': eg.instance_action(**kwargs)},
+ {'vim_wim_actions': actions}
+ ])
+
+ # When we reload the tasks
+ self.thread.reload_actions()
+ # Just one group should be created
+ self.assertEqual(len(self.thread.grouped_tasks.values()), 1)
+
+ def test_reload_actions__delete_done(self):
+ # Given we have 3 tasks for the same item in the database, but one of
+ # them is a DELETE task and it is not SCHEDULED
+ kwargs = {'action_id': uuid('action0')}
+ actions = (eg.wim_actions('CREATE', **kwargs) +
+ eg.wim_actions('FIND', **kwargs) +
+ eg.wim_actions('DELETE', status='DONE', **kwargs))
+ for i, action in enumerate(actions):
+ action['task_index'] = i
+
+ self.populate([
+ {'nfvo_tenants': eg.tenant()}
+ ] + eg.wim_set() + [
+ {'instance_actions': eg.instance_action(**kwargs)},
+ {'vim_wim_actions': actions}
+ ])
+
+ # When we reload the tasks
+ self.thread.reload_actions()
+ # No pending task should be found
+ self.assertEqual(self.thread.pending_tasks, [])
+
+ def test_reload_actions__batch(self):
+ # Given the group_limit is 10, and we have 24 actions
+ group_limit = 10
+ kwargs = {'action_id': uuid('action0')}
+ actions = (eg.wim_actions('CREATE', num_links=8, **kwargs) +
+ eg.wim_actions('FIND', num_links=8, **kwargs) +
+ eg.wim_actions('FIND', num_links=8, **kwargs))
+ for i, action in enumerate(actions):
+ action['task_index'] = i
+
+ self.populate([
+ {'nfvo_tenants': eg.tenant()}
+ ] + eg.wim_set() + [
+ {'instance_actions': eg.instance_action(**kwargs)},
+ {'vim_wim_actions': actions}
+ ])
+
+ # When we reload the tasks
+ self.thread.reload_actions(group_limit)
+
+ # Then we should still see the actions in memory properly
+ self.assertTasksEqual(self.thread.pending_tasks, actions)
+ self.assertEqual(len(self.thread.grouped_tasks.values()), 8)
+
+ @disable_foreign_keys
+ def test_process_list__refresh(self):
+ update_wan_link = MagicMock(wraps=self.persist.update_wan_link)
+ update_action = MagicMock(wraps=self.persist.update_action)
+ patches = dict(update_wan_link=update_wan_link,
+ update_action=update_action)
+
+ with patch.multiple(self.persist, **patches):
+ # Given we have 2 tasks in the refresh queue
+ kwargs = {'action_id': uuid('action0')}
+ actions = (eg.wim_actions('FIND', 'DONE', **kwargs) +
+ eg.wim_actions('CREATE', 'BUILD', **kwargs))
+ for i, action in enumerate(actions):
+ action['task_index'] = i
+
+ self.populate(
+ [{'instance_wim_nets': eg.instance_wim_nets()}] +
+ [{'instance_actions':
+ eg.instance_action(num_tasks=2, **kwargs)}] +
+ [{'vim_wim_actions': actions}])
+
+ self.thread.insert_pending_tasks(actions)
+
+ # When we process the refresh list
+ processed = self.thread.process_list('refresh')
+
+ # Then we should have 2 updates
+ self.assertEqual(processed, 2)
+
+ # And the database should be updated accordingly
+ self.assertEqual(update_wan_link.call_count, 2)
+ self.assertEqual(update_action.call_count, 2)
+
+ @disable_foreign_keys
+ def test_delete_supersedes_create(self):
+ # Given we insert a scheduled CREATE task
+ instance_action = eg.instance_action(num_tasks=1)
+ self.thread.pending_tasks = []
+ engine = WimEngine(persistence=self.persist)
+ self.addCleanup(engine.stop_threads)
+ wan_links = eg.instance_wim_nets()
+ create_actions = engine.create_actions(wan_links)
+ delete_actions = engine.delete_actions(wan_links)
+ engine.incorporate_actions(create_actions + delete_actions,
+ instance_action)
+
+ self.populate(instance_actions=instance_action,
+ vim_wim_actions=create_actions + delete_actions)
+
+ self.thread.insert_pending_tasks(create_actions)
+
+ assert self.thread.pending_tasks[0].is_scheduled
+
+ # When we insert the equivalent DELETE task
+ self.thread.insert_pending_tasks(delete_actions)
+
+ # Then the CREATE task should be superseded
+ self.assertEqual(self.thread.pending_tasks[0].action, 'CREATE')
+ assert self.thread.pending_tasks[0].is_superseded
+
+ self.thread.process_list('pending')
+ self.thread.process_list('refresh')
+ self.assertFalse(self.thread.pending_tasks)
+
+
+@ignore_connector
+class TestWimThread(unittest.TestCase):
+ def setUp(self):
+ wim = eg.wim(0)
+ account = eg.wim_account(0, 0)
+ account['wim'] = wim
+ self.persist = MagicMock()
+ self.thread = WimThread(self.persist, account)
+ self.thread.connector = MagicMock()
+
+ super(TestWimThread, self).setUp()
+
+ def test_process_refresh(self):
+ # Given we have 30 tasks in the refresh queue
+ kwargs = {'action_id': uuid('action0')}
+ actions = eg.wim_actions('FIND', 'DONE', num_links=30, **kwargs)
+ self.thread.insert_pending_tasks(actions)
+
+ # When we process the refresh list
+ processed = self.thread.process_list('refresh')
+
+ # Then we should have BATCH updates
+ self.assertEqual(processed, self.thread.BATCH)
+
+ def test_process_refresh__with_superseded(self):
+ # Given we have 30 tasks but 15 of them are superseded
+ kwargs = {'action_id': uuid('action0')}
+ actions = eg.wim_actions('FIND', 'DONE', num_links=30, **kwargs)
+ self.thread.insert_pending_tasks(actions)
+ for task in self.thread.refresh_tasks[0:30:2]:
+ task.status = 'SUPERSEDED'
+
+ now = time()
+
+ # When we process the refresh list
+ processed = self.thread.process_list('refresh')
+
+ # Then we should have 25 updates (since SUPERSEDED updates are cheap,
+ # they are not counted for the limits)
+ self.assertEqual(processed, 25)
+
+ # The SUPERSEDED tasks should be removed, 5 tasks should be untouched,
+ # and 10 tasks should be rescheduled
+ refresh_tasks = self.thread.refresh_tasks
+ old = [t for t in refresh_tasks if t.process_at <= now]
+ new = [t for t in refresh_tasks if t.process_at > now]
+ self.assertEqual(len(old), 5)
+ self.assertEqual(len(new), 10)
+ self.assertEqual(len(self.thread.refresh_tasks), 15)
+
+
+if __name__ == '__main__':
+ unittest.main()
diff --git a/osm_ro/wim/tox.ini b/osm_ro/wim/tox.ini
new file mode 100644
index 0000000..29f1a8f
--- /dev/null
+++ b/osm_ro/wim/tox.ini
@@ -0,0 +1,58 @@
+# This tox file allows the devs to run unit tests only for this subpackage.
+# In order to do so, cd into the directory and run `tox`
+
+[tox]
+minversion = 1.8
+envlist = py27,flake8,radon
+skipsdist = True
+
+[testenv]
+passenv = *_DB_*
+setenv =
+ PATH = {env:PATH}:{toxinidir}/../../database_utils
+ DBUTILS = {toxinidir}/../../database_utils
+changedir = {toxinidir}
+commands =
+ nosetests -v -d {posargs:tests}
+deps =
+ WebTest
+ logging
+ bottle
+ coverage
+ jsonschema
+ mock
+ mysqlclient
+ nose
+ six
+ PyYaml
+ paramiko
+ ipdb
+ requests
+
+[testenv:flake8]
+changedir = {toxinidir}
+deps = flake8
+commands = flake8 {posargs:.}
+
+[testenv:radon]
+changedir = {toxinidir}
+deps = radon
+commands =
+ radon cc --show-complexity --total-average {posargs:.}
+ radon mi -s {posargs:.}
+
+[coverage:run]
+branch = True
+source = {toxinidir}
+omit =
+ tests
+ tests/*
+ */test_*
+ .tox/*
+
+[coverage:report]
+show_missing = True
+
+[flake8]
+exclude =
+ .tox
diff --git a/osm_ro/wim/wan_link_actions.py b/osm_ro/wim/wan_link_actions.py
new file mode 100644
index 0000000..1993ae7
--- /dev/null
+++ b/osm_ro/wim/wan_link_actions.py
@@ -0,0 +1,315 @@
+# -*- coding: utf-8 -*-
+##
+# Copyright 2018 University of Bristol - High Performance Networks Research
+# Group
+# All Rights Reserved.
+#
+# Contributors: Anderson Bravalheri, Dimitrios Gkounis, Abubakar Siddique
+# Muqaddas, Navdeep Uniyal, Reza Nejabati and Dimitra Simeonidou
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+#
+# For those usages not covered by the Apache License, Version 2.0 please
+# contact with: <highperformance-networks@bristol.ac.uk>
+#
+# Neither the name of the University of Bristol nor the names of its
+# contributors may be used to endorse or promote products derived from
+# this software without specific prior written permission.
+#
+# This work has been performed in the context of DCMS UK 5G Testbeds
+# & Trials Programme and in the framework of the Metro-Haul project -
+# funded by the European Commission under Grant number 761727 through the
+# Horizon 2020 and 5G-PPP programmes.
+##
+# pylint: disable=E1101,E0203,W0201
+import json
+from time import time
+
+from ..utils import filter_dict_keys as filter_keys
+from ..utils import merge_dicts, remove_none_items, safe_get, truncate
+from .actions import CreateAction, DeleteAction, FindAction
+from .errors import (
+ InconsistentState,
+ MultipleRecordsFound,
+ NoRecordFound,
+)
+from .wimconn import WimConnectorError
+
+INSTANCE_NET_STATUS_ERROR = ('DOWN', 'ERROR', 'VIM_ERROR',
+ 'DELETED', 'SCHEDULED_DELETION')
+INSTANCE_NET_STATUS_PENDING = ('BUILD', 'INACTIVE', 'SCHEDULED_CREATION')
+INSTANCE_VM_STATUS_ERROR = ('ERROR', 'VIM_ERROR',
+ 'DELETED', 'SCHEDULED_DELETION')
+
+
+class RefreshMixin(object):
+ def refresh(self, connector, persistence):
+ """Ask the external WAN Infrastructure Manager system for updates on
+ the status of the task.
+
+ Arguments:
+ connector: object with API for accessing the WAN
+ Infrastructure Manager system
+ persistence: abstraction layer for the database
+ """
+ fields = ('wim_status', 'wim_info', 'error_msg')
+ result = dict.fromkeys(fields)
+
+ try:
+ result.update(
+ connector
+ .get_connectivity_service_status(self.wim_internal_id))
+ except WimConnectorError as ex:
+ self.logger.exception(ex)
+ result.update(wim_status='WIM_ERROR', error_msg=truncate(ex))
+
+ result = filter_keys(result, fields)
+
+ action_changes = remove_none_items({
+ 'extra': merge_dicts(self.extra, result),
+ 'status': 'BUILD' if result['wim_status'] == 'BUILD' else None,
+ 'error_msg': result['error_msg'],
+ 'modified_at': time()})
+ link_changes = merge_dicts(result, status=result.pop('wim_status'))
+ # ^ Rename field: wim_status => status
+
+ persistence.update_wan_link(self.item_id,
+ remove_none_items(link_changes))
+
+ self.save(persistence, **action_changes)
+
+ return result
+
+
+class WanLinkCreate(RefreshMixin, CreateAction):
+ def fail(self, persistence, reason, status='FAILED'):
+ changes = {'status': 'ERROR', 'error_msg': truncate(reason)}
+ persistence.update_wan_link(self.item_id, changes)
+ return super(WanLinkCreate, self).fail(persistence, reason, status)
+
+ def process(self, connector, persistence, ovim):
+ """Process the current task.
+ First we check if all the dependencies are ready,
+ then we call ``execute`` to actually execute the action.
+
+ Arguments:
+ connector: object with API for accessing the WAN
+ Infrastructure Manager system
+ persistence: abstraction layer for the database
+            ovim: instance of openvim, abstraction layer that enables
+                SDN-related operations
+ """
+ wan_link = persistence.get_by_uuid('instance_wim_nets', self.item_id)
+
+ # First we check if all the dependencies are solved
+ instance_nets = persistence.get_instance_nets(
+ wan_link['instance_scenario_id'], wan_link['sce_net_id'])
+
+        try:
+            dependency_statuses = [n['status'] for n in instance_nets]
+        except KeyError:
+            self.logger.debug('`status` not found in %r', instance_nets)
+            raise  # otherwise `dependency_statuses` is undefined below
+ errored = [instance_nets[i]
+ for i, status in enumerate(dependency_statuses)
+ if status in INSTANCE_NET_STATUS_ERROR]
+ if errored:
+ return self.fail(
+ persistence,
+                'Impossible to establish WAN connectivity due to an issue '
+ 'with the local networks:\n\t' +
+ '\n\t'.join('{uuid}: {status}'.format(**n) for n in errored))
+
+ pending = [instance_nets[i]
+ for i, status in enumerate(dependency_statuses)
+ if status in INSTANCE_NET_STATUS_PENDING]
+ if pending:
+ return self.defer(
+ persistence,
+ 'Still waiting for the local networks to be active:\n\t' +
+ '\n\t'.join('{uuid}: {status}'.format(**n) for n in pending))
+
+ return self.execute(connector, persistence, ovim, instance_nets)
+
+ def get_endpoint(self, persistence, ovim, instance_net):
+        """Retrieve the endpoint (information about the connection PoP <> WAN)
+        """
+ wim_account = persistence.get_wim_account_by(uuid=self.wim_account_id)
+
+ # TODO: make more generic to support networks that are not created with
+ # the SDN assist. This method should have a consistent way of getting
+ # the endpoint for all different types of networks used in the VIM
+ # (provider networks, SDN assist, overlay networks, ...)
+ if instance_net.get('sdn_net_id'):
+ return self.get_endpoint_sdn(
+ persistence, ovim, instance_net, wim_account['wim_id'])
+ else:
+            raise InconsistentState(
+                'The field `instance_nets.sdn_net_id` was expected to be '
+                'found in the database for the record %s after the network '
+                'became active, but it is still NULL' % instance_net['uuid'])
+
+ def get_endpoint_sdn(self, persistence, ovim, instance_net, wim_id):
+ criteria = {'net_id': instance_net['sdn_net_id']}
+ local_port_mapping = ovim.get_ports(filter=criteria)
+
+ if len(local_port_mapping) > 1:
+ raise MultipleRecordsFound(criteria, 'ovim.ports')
+ local_port_mapping = local_port_mapping[0]
+
+ criteria = {
+ 'wim_id': wim_id,
+ 'pop_switch_dpid': local_port_mapping['switch_dpid'],
+ 'pop_switch_port': local_port_mapping['switch_port'],
+ 'datacenter_id': instance_net['datacenter_id']}
+
+ wan_port_mapping = persistence.query_one(
+ FROM='wim_port_mappings',
+ WHERE=criteria)
+
+ if local_port_mapping.get('vlan'):
+ wan_port_mapping['wan_service_mapping_info']['vlan'] = (
+ local_port_mapping['vlan'])
+
+ return wan_port_mapping
+
+ @staticmethod
+ def _derive_connection_point(endpoint):
+ point = {'service_endpoint_id': endpoint['wan_service_endpoint_id']}
+ # TODO: Cover other scenarios, e.g. VXLAN.
+ info = endpoint.get('wan_service_mapping_info', {})
+ if 'vlan' in info:
+ point['service_endpoint_encapsulation_type'] = 'dot1q'
+ point['service_endpoint_encapsulation_info'] = {
+ 'vlan': info['vlan']
+ }
+ else:
+ point['service_endpoint_encapsulation_type'] = 'none'
+ return point
+
+ @staticmethod
+ def _derive_service_type(connection_points):
+ # TODO: add multipoint and L3 connectivity.
+ if len(connection_points) == 2:
+ return 'ELINE'
+ else:
+ raise NotImplementedError('Multipoint connectivity is not '
+ 'supported yet.')
+
+ def _update_persistent_data(self, persistence, service_uuid,
+ endpoints, conn_info):
+ """Store plugin/connector specific information in the database"""
+ persistence.update_wan_link(self.item_id, {
+ 'wim_internal_id': service_uuid,
+ 'wim_info': {'conn_info': conn_info},
+ 'status': 'BUILD'})
+
+ def execute(self, connector, persistence, ovim, instance_nets):
+ """Actually execute the action, since now we are sure all the
+ dependencies are solved
+ """
+ try:
+ endpoints = [self.get_endpoint(persistence, ovim, net)
+ for net in instance_nets]
+ connection_points = [self._derive_connection_point(e)
+ for e in endpoints]
+
+ uuid, info = connector.create_connectivity_service(
+ self._derive_service_type(connection_points),
+ connection_points
+ # TODO: other properties, e.g. bandwidth
+ )
+ except (WimConnectorError, InconsistentState) as ex:
+ self.logger.exception(ex)
+ return self.fail(
+ persistence,
+                'Impossible to establish WAN connectivity.\n\t{}'.format(ex))
+
+ self.logger.debug('WAN connectivity established %s\n%s\n',
+ uuid, json.dumps(info, indent=4))
+ self.wim_internal_id = uuid
+ self._update_persistent_data(persistence, uuid, endpoints, info)
+ self.succeed(persistence)
+ return uuid
+
+
+class WanLinkDelete(DeleteAction):
+ def succeed(self, persistence):
+ try:
+ persistence.update_wan_link(self.item_id, {'status': 'DELETED'})
+ except NoRecordFound:
+ self.logger.debug('%s(%s) record already deleted',
+ self.item, self.item_id)
+
+ return super(WanLinkDelete, self).succeed(persistence)
+
+ def get_wan_link(self, persistence):
+ """Retrieve information about the wan_link
+
+ It might be cached, or arrive from the database
+ """
+ if self.extra.get('wan_link'):
+ # First try a cached version of the data
+ return self.extra['wan_link']
+
+ return persistence.get_by_uuid(
+ 'instance_wim_nets', self.item_id)
+
+ def process(self, connector, persistence, ovim):
+ """Delete a WAN link previously created"""
+ wan_link = self.get_wan_link(persistence)
+ if 'ERROR' in (wan_link.get('status') or ''):
+ return self.fail(
+ persistence,
+ 'Impossible to delete WAN connectivity, '
+ 'it was never successfully established:'
+ '\n\t{}'.format(wan_link['error_msg']))
+
+ internal_id = wan_link.get('wim_internal_id') or self.internal_id
+
+ if not internal_id:
+ self.logger.debug('No wim_internal_id found in\n%s\n%s\n'
+ 'Assuming no network was created yet, '
+                              'so no network has to be deleted.',
+ json.dumps(wan_link, indent=4),
+ json.dumps(self.as_dict(), indent=4))
+ return self.succeed(persistence)
+
+ try:
+ id = self.wim_internal_id
+ conn_info = safe_get(wan_link, 'wim_info.conn_info')
+ self.logger.debug('Connection Service %s (wan_link: %s):\n%s\n',
+ id, wan_link['uuid'],
+ json.dumps(conn_info, indent=4))
+ result = connector.delete_connectivity_service(id, conn_info)
+ except (WimConnectorError, InconsistentState) as ex:
+ self.logger.exception(ex)
+ return self.fail(
+ persistence,
+ 'Impossible to delete WAN connectivity.\n\t{}'.format(ex))
+
+ self.logger.debug('WAN connectivity removed %s', result)
+ self.succeed(persistence)
+
+ return result
+
+
+class WanLinkFind(RefreshMixin, FindAction):
+ pass
+
+
+ACTIONS = {
+ 'CREATE': WanLinkCreate,
+ 'DELETE': WanLinkDelete,
+ 'FIND': WanLinkFind,
+}
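The way `WanLinkCreate.execute` turns port-mapping records into connector arguments can be tried in isolation. The sketch below copies the logic of `_derive_connection_point` and `_derive_service_type` into plain functions; the endpoint records and their identifiers are made-up sample data, not values taken from a real `wim_port_mappings` table.

```python
def derive_connection_point(endpoint):
    """Standalone copy of WanLinkCreate._derive_connection_point."""
    point = {'service_endpoint_id': endpoint['wan_service_endpoint_id']}
    info = endpoint.get('wan_service_mapping_info', {})
    if 'vlan' in info:
        # A VLAN in the mapping info implies 802.1Q encapsulation
        point['service_endpoint_encapsulation_type'] = 'dot1q'
        point['service_endpoint_encapsulation_info'] = {'vlan': info['vlan']}
    else:
        point['service_endpoint_encapsulation_type'] = 'none'
    return point


def derive_service_type(connection_points):
    """Standalone copy of WanLinkCreate._derive_service_type."""
    if len(connection_points) == 2:
        return 'ELINE'
    raise NotImplementedError('Multipoint connectivity is not supported yet.')


# Hypothetical endpoint records (illustrative identifiers only)
endpoints = [
    {'wan_service_endpoint_id': 'pop-a-port-1',
     'wan_service_mapping_info': {'vlan': 100}},
    {'wan_service_endpoint_id': 'pop-b-port-7',
     'wan_service_mapping_info': {}},
]

connection_points = [derive_connection_point(e) for e in endpoints]
service_type = derive_service_type(connection_points)
```

With exactly two endpoints the service type is `ELINE`; any other count raises `NotImplementedError`, which `execute` does not catch, so multipoint requests surface as unexpected exceptions in the thread rather than as failed actions.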
diff --git a/osm_ro/wim/wim_thread.py b/osm_ro/wim/wim_thread.py
new file mode 100644
index 0000000..a13d6a2
--- /dev/null
+++ b/osm_ro/wim/wim_thread.py
@@ -0,0 +1,437 @@
+# -*- coding: utf-8 -*-
+##
+# Copyright 2018 University of Bristol - High Performance Networks Research
+# Group
+# All Rights Reserved.
+#
+# Contributors: Anderson Bravalheri, Dimitrios Gkounis, Abubakar Siddique
+# Muqaddas, Navdeep Uniyal, Reza Nejabati and Dimitra Simeonidou
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+#
+# For those usages not covered by the Apache License, Version 2.0 please
+# contact with: <highperformance-networks@bristol.ac.uk>
+#
+# Neither the name of the University of Bristol nor the names of its
+# contributors may be used to endorse or promote products derived from
+# this software without specific prior written permission.
+#
+# This work has been performed in the context of DCMS UK 5G Testbeds
+# & Trials Programme and in the framework of the Metro-Haul project -
+# funded by the European Commission under Grant number 761727 through the
+# Horizon 2020 and 5G-PPP programmes.
+##
+
+"""
+Thread-based interaction with WIMs. Tasks are stored in the
+database (vim_wim_actions table) and processed sequentially
+
+Please check the Action class for information about the content of each action.
+"""
+
+import logging
+import threading
+from contextlib import contextmanager
+from functools import partial
+from itertools import islice, chain, takewhile
+from operator import itemgetter, attrgetter
+from sys import exc_info
+from time import time, sleep
+
+from six import reraise
+from six.moves import queue
+
+from . import wan_link_actions, wimconn_odl # wimconn_tapi
+from ..utils import ensure, partition, pipe
+from .actions import IGNORE, PENDING, REFRESH
+from .errors import (
+ DbBaseException,
+ QueueFull,
+ InvalidParameters as Invalid,
+ UndefinedAction,
+)
+from .failing_connector import FailingConnector
+from .wimconn import WimConnectorError
+
+ACTIONS = {
+ 'instance_wim_nets': wan_link_actions.ACTIONS
+}
+
+CONNECTORS = {
+ "odl": wimconn_odl.OdlConnector,
+ # "tapi": wimconn_tapi
+ # Add extra connectors here
+}
+
+
+class WimThread(threading.Thread):
+ """Specialized task queue implementation that runs in an isolated thread.
+
+ Objects of this class have a few methods that are intended to be used
+ outside of the thread:
+
+ - start
+ - insert_task
+ - reload
+ - exit
+
+ All the other methods are used internally to manipulate/process the task
+ queue.
+ """
+ RETRY_SCHEDULED = 10 # 10 seconds
+ REFRESH_BUILD = 10 # 10 seconds
+ REFRESH_ACTIVE = 60 # 1 minute
+ BATCH = 10 # 10 actions per round
+ QUEUE_SIZE = 2000
+ RECOVERY_TIME = 5 # Sleep 5s to leave the system some time to recover
+ MAX_RECOVERY_TIME = 180
+    WAITING_TIME = 1  # Wait 1s for tasks to arrive, when there are none
+
+ def __init__(self, persistence, wim_account, logger=None, ovim=None):
+ """Init a thread.
+
+ Arguments:
+ persistence: Database abstraction layer
+ wim_account: Record containing wim_account, tenant and wim
+ information.
+ """
+ name = '{}.{}.{}'.format(wim_account['wim']['name'],
+ wim_account['name'], wim_account['uuid'])
+ super(WimThread, self).__init__(name=name)
+
+ self.name = name
+ self.connector = None
+ self.wim_account = wim_account
+
+ self.logger = logger or logging.getLogger('openmano.wim.'+self.name)
+ self.persist = persistence
+ self.ovim = ovim
+
+ self.task_queue = queue.Queue(self.QUEUE_SIZE)
+
+ self.refresh_tasks = []
+ """Time ordered task list for refreshing the status of WIM nets"""
+
+ self.pending_tasks = []
+ """Time ordered task list for creation, deletion of WIM nets"""
+
+ self.grouped_tasks = {}
+ """ It contains all the creation/deletion pending tasks grouped by
+ its concrete vm, net, etc
+
+ <item><item_id>:
+ - <task1> # e.g. CREATE task
+ <task2> # e.g. DELETE task
+ """
+
+ self._insert_task = {
+ PENDING: partial(self.schedule, list_name='pending'),
+ REFRESH: partial(self.schedule, list_name='refresh'),
+ IGNORE: lambda task, *_, **__: task.save(self.persist)}
+ """Send the task to the right processing queue"""
+
+ def on_start(self):
+ """Run a series of procedures every time the thread (re)starts"""
+ self.connector = self.get_connector()
+ self.reload_actions()
+
+ def get_connector(self):
+        """Create a WimConnector instance according to the wim.type"""
+ error_msg = ''
+ account_id = self.wim_account['uuid']
+ try:
+ account = self.persist.get_wim_account_by(
+ uuid=account_id, hide=None) # Credentials need to be available
+ wim = account['wim']
+ mapping = self.persist.query('wim_port_mappings',
+ WHERE={'wim_id': wim['uuid']},
+ error_if_none=False)
+ return CONNECTORS[wim['type']](wim, account, {
+ 'service_endpoint_mapping': mapping or []
+ })
+ except DbBaseException as ex:
+ error_msg = ('Error when retrieving WIM account ({})\n'
+ .format(account_id)) + str(ex)
+ self.logger.error(error_msg, exc_info=True)
+ except KeyError as ex:
+ error_msg = ('Unable to find the WIM connector for WIM ({})\n'
+ .format(wim['type'])) + str(ex)
+ self.logger.error(error_msg, exc_info=True)
+ except (WimConnectorError, Exception) as ex:
+ # TODO: Remove the Exception class here when the connector class is
+ # ready
+ error_msg = ('Error when loading WIM connector for WIM ({})\n'
+ .format(wim['type'])) + str(ex)
+ self.logger.error(error_msg, exc_info=True)
+
+ error_msg_extra = ('Any task targeting WIM account {} ({}) will fail.'
+ .format(account_id, self.wim_account.get('name')))
+ self.logger.warning(error_msg_extra)
+ return FailingConnector(error_msg + '\n' + error_msg_extra)
+
+ @contextmanager
+ def avoid_exceptions(self):
+ """Make a real effort to keep the thread alive, by avoiding the
+        exceptions. They are instead logged as critical errors.
+ """
+ try:
+ yield
+ except Exception as ex:
+ self.logger.critical("Unexpected exception %s", ex, exc_info=True)
+ sleep(self.RECOVERY_TIME)
+
+ def reload_actions(self, group_limit=100):
+        """Read actions from the database and reload them into memory.
+
+ This method will clean and reload the attributes ``refresh_tasks``,
+ ``pending_tasks`` and ``grouped_tasks``
+
+        Arguments:
+ group_limit (int): maximum number of action groups (those that
+ refer to the same ``<item, item_id>``) to be retrieved from the
+ database in each batch.
+ """
+
+ # First we clean the cache to let the garbage collector work
+ self.refresh_tasks = []
+ self.pending_tasks = []
+ self.grouped_tasks = {}
+
+ offset = 0
+
+ while True:
+ # Do things in batches
+ task_groups = self.persist.get_actions_in_groups(
+ self.wim_account['uuid'], item_types=('instance_wim_nets',),
+ group_offset=offset, group_limit=group_limit)
+ offset += (group_limit - 1) # Update for the next batch
+
+ if not task_groups:
+ break
+
+ pending_groups = (g for _, g in task_groups if is_pending_group(g))
+
+ for task_list in pending_groups:
+ with self.avoid_exceptions():
+ self.insert_pending_tasks(filter_pending_tasks(task_list))
+
+ self.logger.debug(
+ 'Reloaded wim actions pending: %d refresh: %d',
+ len(self.pending_tasks), len(self.refresh_tasks))
+
+ def insert_pending_tasks(self, task_list):
+ """Insert task in the list of actions being processed"""
+ task_list = [action_from(task, self.logger) for task in task_list]
+
+ for task in task_list:
+ group = task.group_key
+ self.grouped_tasks.setdefault(group, [])
+ # Each task can try to supersede the other ones,
+ # but just DELETE actions will actually do
+ task.supersede(self.grouped_tasks[group])
+ self.grouped_tasks[group].append(task)
+
+ # We need a separate loop so each task can check all the other
+ # ones before deciding
+ for task in task_list:
+ self._insert_task[task.processing](task)
+ self.logger.debug('Insert WIM task: %s (%s): %s %s',
+ task.id, task.status, task.action, task.item)
+
+ def schedule(self, task, when=None, list_name='pending'):
+ """Insert a task in the correct list, respecting the schedule.
+ The refreshing list is ordered by threshold_time (task.process_at)
+ It is assumed that this is called inside this thread
+
+ Arguments:
+ task (Action): object representing the task.
+ This object must implement the ``process`` method and inherit
+ from the ``Action`` class
+ list_name: either 'refresh' or 'pending'
+            when (float): Unix time in seconds, as a float number
+ """
+ processing_list = {'refresh': self.refresh_tasks,
+ 'pending': self.pending_tasks}[list_name]
+
+ when = when or time()
+ task.process_at = when
+
+ schedule = (t.process_at for t in processing_list)
+ index = len(list(takewhile(lambda moment: moment <= when, schedule)))
+
+ processing_list.insert(index, task)
+ self.logger.debug(
+ 'Schedule of %s in "%s" - waiting position: %d (%f)',
+ task.id, list_name, index, task.process_at)
+
+ return task
+
+ def process_list(self, list_name='pending'):
+ """Process actions in batches and reschedule them if necessary"""
+ task_list, handler = {
+ 'refresh': (self.refresh_tasks, self._refresh_single),
+ 'pending': (self.pending_tasks, self._process_single)}[list_name]
+
+ now = time()
+ waiting = ((i, task) for i, task in enumerate(task_list)
+ if task.process_at is None or task.process_at <= now)
+
+ is_superseded = pipe(itemgetter(1), attrgetter('is_superseded'))
+ superseded, active = partition(is_superseded, waiting)
+ superseded = [(i, t.save(self.persist)) for i, t in superseded]
+
+ batch = islice(active, self.BATCH)
+ refreshed = [(i, handler(t)) for i, t in batch]
+
+ # Since pop changes the indexes in the list, we need to do it backwards
+ remove = sorted([i for i, _ in chain(refreshed, superseded)])
+ return len([task_list.pop(i) for i in reversed(remove)])
+
+ def _refresh_single(self, task):
+ """Refresh just a single task, and reschedule it if necessary"""
+ now = time()
+
+ result = task.refresh(self.connector, self.persist)
+ self.logger.debug('Refreshing WIM task: %s (%s): %s %s => %r',
+ task.id, task.status, task.action, task.item, result)
+
+ interval = self.REFRESH_BUILD if task.is_build else self.REFRESH_ACTIVE
+ self.schedule(task, now + interval, 'refresh')
+
+ return result
+
+ def _process_single(self, task):
+ """Process just a single task, and reschedule it if necessary"""
+ now = time()
+
+ result = task.process(self.connector, self.persist, self.ovim)
+ self.logger.debug('Executing WIM task: %s (%s): %s %s => %r',
+ task.id, task.status, task.action, task.item, result)
+
+ if task.action == 'DELETE':
+ del self.grouped_tasks[task.group_key]
+
+ self._insert_task[task.processing](task, now + self.RETRY_SCHEDULED)
+
+ return result
+
+ def insert_task(self, task):
+ """Send a message to the running thread
+
+ This function is supposed to be called outside of the WIM Thread.
+
+ Arguments:
+ task (str or dict): `"exit"`, `"reload"` or dict representing a
+ task. For more information about the fields in task, please
+ check the Action class.
+ """
+ try:
+ self.task_queue.put(task, False)
+ return None
+ except queue.Full:
+ ex = QueueFull(self.name)
+ reraise(ex.__class__, ex, exc_info()[2])
+
+ def reload(self):
+ """Send a message to the running thread to reload itself"""
+ self.insert_task('reload')
+
+ def exit(self):
+ """Send a message to the running thread to kill itself"""
+ self.insert_task('exit')
+
+ def run(self):
+ self.logger.debug('Starting: %s', self.name)
+ recovery_time = 0
+ while True:
+ self.on_start()
+ reload_thread = False
+ self.logger.debug('Reloaded: %s', self.name)
+
+ while True:
+ with self.avoid_exceptions():
+ while not self.task_queue.empty():
+ task = self.task_queue.get()
+ if isinstance(task, dict):
+ self.insert_pending_tasks([task])
+ elif isinstance(task, list):
+ self.insert_pending_tasks(task)
+ elif isinstance(task, str):
+ if task == 'exit':
+ self.logger.debug('Finishing: %s', self.name)
+ return 0
+ elif task == 'reload':
+ reload_thread = True
+ break
+ self.task_queue.task_done()
+
+ if reload_thread:
+ break
+
+ if not(self.process_list('pending') +
+ self.process_list('refresh')):
+ sleep(self.WAITING_TIME)
+
+ if isinstance(self.connector, FailingConnector):
+                        # Wait some time to try instantiating the connector
+ # again and restart
+ # Increase the recovery time if restarting is not
+ # working (up to a limit)
+ recovery_time = min(self.MAX_RECOVERY_TIME,
+ recovery_time + self.RECOVERY_TIME)
+ sleep(recovery_time)
+ break
+ else:
+ recovery_time = 0
+
+ self.logger.debug("Finishing")
+
+
+def is_pending_group(group):
+ return all(task['action'] != 'DELETE' or
+ task['status'] == 'SCHEDULED'
+ for task in group)
+
+
+def filter_pending_tasks(group):
+ return (t for t in group
+ if (t['status'] == 'SCHEDULED' or
+ t['action'] in ('CREATE', 'FIND')))
+
+
+def action_from(record, logger=None, mapping=ACTIONS):
+    """Create an Action object from an action record (dict)
+
+ Arguments:
+ mapping (dict): Nested data structure that maps the relationship
+ between action properties and object constructors. This data
+ structure should be a dict with 2 levels of keys: item type and
+ action type. Example::
+                {'instance_wim_nets':
+                    {'CREATE': WanLinkCreate,
+                     ...},
+                 ...}
+ record (dict): action information
+
+ Return:
+ (Action.Base): Object representing the action
+ """
+ ensure('item' in record, Invalid('`record` should contain "item"'))
+ ensure('action' in record, Invalid('`record` should contain "action"'))
+
+ try:
+ factory = mapping[record['item']][record['action']]
+ return factory(record, logger=logger)
+ except KeyError:
+ ex = UndefinedAction(record['item'], record['action'])
+ reraise(ex.__class__, ex, exc_info()[2])
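The time-ordered insertion used by `WimThread.schedule` can be exercised without a database or connector. The sketch below reproduces only the `takewhile`-based index computation; the `Task` class and the timestamps are illustrative stand-ins, not the real `Action` objects.

```python
from itertools import takewhile


class Task(object):
    """Illustrative stand-in for an Action (only the fields used here)."""

    def __init__(self, name):
        self.name = name
        self.process_at = None


def schedule(processing_list, task, when):
    """Insert `task` so that `processing_list` stays ordered by process_at.

    Tasks with the same timestamp keep their arrival order, because the
    new task is placed after every entry whose time is <= `when`.
    """
    task.process_at = when
    moments = (t.process_at for t in processing_list)
    index = len(list(takewhile(lambda moment: moment <= when, moments)))
    processing_list.insert(index, task)
    return index


pending = []
schedule(pending, Task('a'), 10.0)
schedule(pending, Task('c'), 30.0)
schedule(pending, Task('b'), 20.0)
schedule(pending, Task('b2'), 20.0)
order = [t.name for t in pending]
```

The scan is linear in the length of the list. If the lists grew large, `bisect` on a parallel list of timestamps would be one possible alternative; that is a suggestion, not something present in this change.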
diff --git a/osm_ro/wim/wimconn.py b/osm_ro/wim/wimconn.py
new file mode 100644
index 0000000..92b6db0
--- /dev/null
+++ b/osm_ro/wim/wimconn.py
@@ -0,0 +1,236 @@
+# -*- coding: utf-8 -*-
+##
+# Copyright 2018 University of Bristol - High Performance Networks Research
+# Group
+# All Rights Reserved.
+#
+# Contributors: Anderson Bravalheri, Dimitrios Gkounis, Abubakar Siddique
+# Muqaddas, Navdeep Uniyal, Reza Nejabati and Dimitra Simeonidou
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+#
+# For those usages not covered by the Apache License, Version 2.0 please
+# contact with: <highperformance-networks@bristol.ac.uk>
+#
+# Neither the name of the University of Bristol nor the names of its
+# contributors may be used to endorse or promote products derived from
+# this software without specific prior written permission.
+#
+# This work has been performed in the context of DCMS UK 5G Testbeds
+# & Trials Programme and in the framework of the Metro-Haul project -
+# funded by the European Commission under Grant number 761727 through the
+# Horizon 2020 and 5G-PPP programmes.
+##
+"""The WIM connector is responsible for establishing wide area network
+connectivity.
+
+It receives information from the WimThread/WAN Actions about the endpoints of
+a link that spans multiple datacenters and establishes a path between them.
+"""
+import logging
+
+from ..http_tools.errors import HttpMappedError
+
+
+class WimConnectorError(HttpMappedError):
+ """Base Exception for all connector related errors"""
+
+
+class WimConnector(object):
+ """Abstract base class for all the WIM connectors
+
+ Arguments:
+ wim (dict): WIM record, as stored in the database
+ wim_account (dict): WIM account record, as stored in the database
+        config (dict): optional persistent information related to a specific
+ connector. Inside this dict, a special key,
+ ``service_endpoint_mapping`` provides the internal endpoint
+ mapping.
+ logger (logging.Logger): optional logger object. If none is passed
+ ``openmano.wim.wimconn`` is used.
+
+ The arguments of the constructor are converted to object attributes.
+ An extra property, ``service_endpoint_mapping`` is created from ``config``.
+ """
+ def __init__(self, wim, wim_account, config=None, logger=None):
+ self.logger = logger or logging.getLogger('openmano.wim.wimconn')
+
+ self.wim = wim
+ self.wim_account = wim_account
+ self.config = config or {}
+        self.service_endpoint_mapping = (
+            self.config.get('service_endpoint_mapping', []))
+
+ def check_credentials(self):
+ """Check if the connector itself can access the WIM.
+
+ Raises:
+ WimConnectorError: Issues regarding authorization, access to
+ external URLs, etc are detected.
+ """
+ raise NotImplementedError
+
+ def get_connectivity_service_status(self, service_uuid, conn_info=None):
+ """Monitor the status of the connectivity service established
+
+ Arguments:
+ service_uuid (str): UUID of the connectivity service
+ conn_info (dict or None): Information returned by the connector
+ during the service creation/edition and subsequently stored in
+ the database.
+
+ Returns:
+ dict: JSON/YAML-serializable dict that contains a mandatory key
+ ``wim_status`` associated with one of the following values::
+
+ {'wim_status': 'ACTIVE'}
+ # The service is up and running.
+
+ {'wim_status': 'INACTIVE'}
+ # The service was created, but the connector
+ # cannot determine yet if connectivity exists
+ # (ideally, the caller needs to wait and check again).
+
+ {'wim_status': 'DOWN'}
+ # Connection was previously established,
+ # but an error/failure was detected.
+
+ {'wim_status': 'ERROR'}
+ # An error occurred when trying to create the service/
+ # establish the connectivity.
+
+ {'wim_status': 'BUILD'}
+ # Still trying to create the service, the caller
+ # needs to wait and check again.
+
+ Additionally ``error_msg``(**str**) and ``wim_info``(**dict**)
+ keys can be used to provide additional status explanation or
+ new information available for the connectivity service.
+ """
+ raise NotImplementedError
+
+ def create_connectivity_service(self, service_type, connection_points,
+ **kwargs):
+        """Establish WAN connectivity between the endpoints
+
+ Arguments:
+ service_type (str): ``ELINE`` (L2), ``ELAN`` (L2), ``ETREE`` (L2),
+ ``L3``.
+ connection_points (list): each point corresponds to
+ an entry point from the DC to the transport network. One
+ connection point serves to identify the specific access and
+ some other service parameters, such as encapsulation type.
+ Represented by a dict as follows::
+
+ {
+ "service_endpoint_id": ..., (str[uuid])
+ "service_endpoint_encapsulation_type": ...,
+ (enum: none, dot1q, ...)
+ "service_endpoint_encapsulation_info": {
+ ... (dict)
+ "vlan": ..., (int, present if encapsulation is dot1q)
+ "vni": ... (int, present if encapsulation is vxlan),
+ "peers": [(ipv4_1), (ipv4_2)]
+ (present if encapsulation is vxlan)
+ }
+ }
+
+ The service endpoint ID should be previously informed to the WIM
+ engine in the RO when the WIM port mapping is registered.
+
+ Keyword Arguments:
+ bandwidth (int): value in kilobytes
+ latency (int): value in milliseconds
+
+ Other QoS might be passed as keyword arguments.
+
+ Returns:
+            tuple: ``(service_uuid, conn_info)`` containing:
+               - *service_uuid* (str): UUID of the established connectivity
+                 service
+               - *conn_info* (dict or None): Data stored in the database and
+                 later passed to :meth:`~.edit_connectivity_service` and
+                 :meth:`~.delete_connectivity_service`.
+                 **MUST** be JSON/YAML-serializable (plain data structures).
+
+ Raises:
+            WimConnectorError: In case of error.
+ """
+ raise NotImplementedError
+
+ def delete_connectivity_service(self, service_uuid, conn_info=None):
+ """Disconnect multi-site endpoints previously connected
+
+ This method should receive as arguments both the UUID and the
+ connection info dict (respectively), as returned by
+ :meth:`~.create_connectivity_service` and
+ :meth:`~.edit_connectivity_service`.
+
+ Arguments:
+ service_uuid (str): UUID of the connectivity service
+ conn_info (dict or None): Information returned by the connector
+ during the service creation and subsequently stored in the
+ database.
+
+ Raises:
+            WimConnectorError: In case of error.
+ """
+ raise NotImplementedError
+
+ def edit_connectivity_service(self, service_uuid, conn_info=None,
+ connection_points=None, **kwargs):
+ """Change an existing connectivity service.
+
+ This method's arguments and return value follow the same convention as
+ :meth:`~.create_connectivity_service`.
+
+ Arguments:
+ service_uuid (str): UUID of the connectivity service.
+ conn_info (dict or None): Information previously stored in the
+ database.
+ connection_points (list): If provided, the old list of connection
+ points will be replaced.
+
+ Returns:
+ dict or None: Information to be updated and stored at the
+ database.
+ When ``None`` is returned, no information should be changed.
+ When an empty dict is returned, the database record will be
+ deleted.
+ **MUST** be JSON/YAML-serializable (plain data structures).
+
+ Raises:
+            WimConnectorError: In case of error.
+ """
+ raise NotImplementedError
+
+ def clear_all_connectivity_services(self):
+ """Delete all WAN Links in a WIM.
+
+ This method is intended for debugging only, and should delete all the
+ connections controlled by the WIM, not only the WIM connections that
+ a specific RO is aware of.
+
+ Raises:
+            WimConnectorError: In case of error.
+ """
+ raise NotImplementedError
+
+ def get_all_active_connectivity_services(self):
+ """Provide information about all active connections provisioned by a
+ WIM.
+
+ Raises:
+ WimConnectorException: In case of error.
+ """
+ raise NotImplementedError
diff --git a/osm_ro/wim/wimconn_odl.py b/osm_ro/wim/wimconn_odl.py
new file mode 100644
index 0000000..2371046
--- /dev/null
+++ b/osm_ro/wim/wimconn_odl.py
@@ -0,0 +1,47 @@
+# -*- coding: utf-8 -*-
+##
+# Copyright 2018 University of Bristol - High Performance Networks Research
+# Group
+# All Rights Reserved.
+#
+# Contributors: Anderson Bravalheri, Dimitrios Gkounis, Abubakar Siddique
+# Muqaddas, Navdeep Uniyal, Reza Nejabati and Dimitra Simeonidou
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+#
+# For those usages not covered by the Apache License, Version 2.0 please
+# contact with: <highperformance-networks@bristol.ac.uk>
+#
+# Neither the name of the University of Bristol nor the names of its
+# contributors may be used to endorse or promote products derived from
+# this software without specific prior written permission.
+#
+# This work has been performed in the context of DCMS UK 5G Testbeds
+# & Trials Programme and in the framework of the Metro-Haul project -
+# funded by the European Commission under Grant number 761727 through the
+# Horizon 2020 and 5G-PPP programmes.
+##
+from .wimconn import WimConnector
+
+
+# TODO: Implement this connector; every method below is still a stub
+
+class OdlConnector(WimConnector):
+ def get_connectivity_service_status(self, link_uuid):
+ raise NotImplementedError
+
+ def create_connectivity_service(self, *args, **kwargs):
+ raise NotImplementedError
+
+ def delete_connectivity_service(self, service_uuid, conn_info=None):
+ raise NotImplementedError