| # -*- coding: utf-8 -*- |
| ## |
| # Copyright 2018 University of Bristol - High Performance Networks Research |
| # Group |
| # All Rights Reserved. |
| # |
| # Contributors: Anderson Bravalheri, Dimitrios Gkounis, Abubakar Siddique |
| # Muqaddas, Navdeep Uniyal, Reza Nejabati and Dimitra Simeonidou |
| # |
| # Licensed under the Apache License, Version 2.0 (the "License"); you may |
| # not use this file except in compliance with the License. You may obtain |
| # a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, software |
| # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT |
| # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the |
| # License for the specific language governing permissions and limitations |
| # under the License. |
| # |
| # For those usages not covered by the Apache License, Version 2.0 please |
| # contact with: <highperformance-networks@bristol.ac.uk> |
| # |
| # Neither the name of the University of Bristol nor the names of its |
| # contributors may be used to endorse or promote products derived from |
| # this software without specific prior written permission. |
| # |
| # This work has been performed in the context of DCMS UK 5G Testbeds |
| # & Trials Programme and in the framework of the Metro-Haul project - |
| # funded by the European Commission under Grant number 761727 through the |
| # Horizon 2020 and 5G-PPP programmes. |
| ## |
| |
| """ |
| Thread-based interaction with WIMs. Tasks are stored in the |
| database (vim_wim_actions table) and processed sequentially |
| |
| Please check the Action class for information about the content of each action. |
| """ |
| |
| import logging |
| import threading |
| from contextlib import contextmanager |
| from functools import partial |
| from itertools import islice, chain, takewhile |
| from operator import itemgetter, attrgetter |
| from sys import exc_info |
| from time import time, sleep |
| |
| from six import reraise |
| from six.moves import queue |
| |
| from . import wan_link_actions |
| from ..utils import ensure, partition, pipe |
| from .actions import IGNORE, PENDING, REFRESH |
| from .errors import ( |
| DbBaseException, |
| QueueFull, |
| InvalidParameters as Invalid, |
| UndefinedAction, |
| ) |
| from .failing_connector import FailingConnector |
| from .wimconn import WimConnectorError |
| from .wimconn_dynpac import DynpacConnector |
| from .wimconn_fake import FakeConnector |
| from .wimconn_ietfl2vpn import WimconnectorIETFL2VPN |
| |
| ACTIONS = { |
| 'instance_wim_nets': wan_link_actions.ACTIONS |
| } |
| |
| CONNECTORS = { |
| # "odl": wimconn_odl.OdlConnector, |
| "dynpac": DynpacConnector, |
| "fake": FakeConnector, |
| "tapi": WimconnectorIETFL2VPN, |
| # Add extra connectors here |
| } |
| |
| |
| class WimThread(threading.Thread): |
| """Specialized task queue implementation that runs in an isolated thread. |
| |
| Objects of this class have a few methods that are intended to be used |
| outside of the thread: |
| |
| - start |
| - insert_task |
| - reload |
| - exit |
| |
| All the other methods are used internally to manipulate/process the task |
| queue. |
| """ |
| RETRY_SCHEDULED = 10 # 10 seconds |
| REFRESH_BUILD = 10 # 10 seconds |
| REFRESH_ACTIVE = 60 # 1 minute |
| BATCH = 10 # 10 actions per round |
| QUEUE_SIZE = 2000 |
| RECOVERY_TIME = 5 # Sleep 5s to leave the system some time to recover |
| MAX_RECOVERY_TIME = 180 |
| WAITING_TIME = 1 # Wait 1s for taks to arrive, when there are none |
| |
| def __init__(self, persistence, wim_account, logger=None, ovim=None): |
| """Init a thread. |
| |
| Arguments: |
| persistence: Database abstraction layer |
| wim_account: Record containing wim_account, tenant and wim |
| information. |
| """ |
| name = '{}.{}.{}'.format(wim_account['wim']['name'], |
| wim_account['name'], wim_account['uuid']) |
| super(WimThread, self).__init__(name=name) |
| |
| self.name = name |
| self.connector = None |
| self.wim_account = wim_account |
| |
| self.logger = logger or logging.getLogger('openmano.wim.'+self.name) |
| self.persist = persistence |
| self.ovim = ovim |
| |
| self.task_queue = queue.Queue(self.QUEUE_SIZE) |
| |
| self.refresh_tasks = [] |
| """Time ordered task list for refreshing the status of WIM nets""" |
| |
| self.pending_tasks = [] |
| """Time ordered task list for creation, deletion of WIM nets""" |
| |
| self.grouped_tasks = {} |
| """ It contains all the creation/deletion pending tasks grouped by |
| its concrete vm, net, etc |
| |
| <item><item_id>: |
| - <task1> # e.g. CREATE task |
| <task2> # e.g. DELETE task |
| """ |
| |
| self._insert_task = { |
| PENDING: partial(self.schedule, list_name='pending'), |
| REFRESH: partial(self.schedule, list_name='refresh'), |
| IGNORE: lambda task, *_, **__: task.save(self.persist)} |
| """Send the task to the right processing queue""" |
| |
| def on_start(self): |
| """Run a series of procedures every time the thread (re)starts""" |
| self.connector = self.get_connector() |
| self.reload_actions() |
| |
| def get_connector(self): |
| """Create an WimConnector instance according to the wim.type""" |
| error_msg = '' |
| account_id = self.wim_account['uuid'] |
| try: |
| account = self.persist.get_wim_account_by( |
| uuid=account_id, hide=None) # Credentials need to be available |
| wim = account['wim'] |
| mapping = self.persist.query('wim_port_mappings', |
| WHERE={'wim_id': wim['uuid']}, |
| error_if_none=False) |
| return CONNECTORS[wim['type']](wim, account, { |
| 'service_endpoint_mapping': mapping or [] |
| }) |
| except DbBaseException as ex: |
| error_msg = ('Error when retrieving WIM account ({})\n' |
| .format(account_id)) + str(ex) |
| self.logger.error(error_msg, exc_info=True) |
| except KeyError as ex: |
| error_msg = ('Unable to find the WIM connector for WIM ({})\n' |
| .format(wim['type'])) + str(ex) |
| self.logger.error(error_msg, exc_info=True) |
| except (WimConnectorError, Exception) as ex: |
| # TODO: Remove the Exception class here when the connector class is |
| # ready |
| error_msg = ('Error when loading WIM connector for WIM ({})\n' |
| .format(wim['type'])) + str(ex) |
| self.logger.error(error_msg, exc_info=True) |
| |
| error_msg_extra = ('Any task targeting WIM account {} ({}) will fail.' |
| .format(account_id, self.wim_account.get('name'))) |
| self.logger.warning(error_msg_extra) |
| return FailingConnector(error_msg + '\n' + error_msg_extra) |
| |
| @contextmanager |
| def avoid_exceptions(self): |
| """Make a real effort to keep the thread alive, by avoiding the |
| exceptions. They are instead logged as a critical errors. |
| """ |
| try: |
| yield |
| except Exception as ex: |
| self.logger.critical("Unexpected exception %s", ex, exc_info=True) |
| sleep(self.RECOVERY_TIME) |
| |
| def reload_actions(self, group_limit=100): |
| """Read actions from database and reload them at memory. |
| |
| This method will clean and reload the attributes ``refresh_tasks``, |
| ``pending_tasks`` and ``grouped_tasks`` |
| |
| Attributes: |
| group_limit (int): maximum number of action groups (those that |
| refer to the same ``<item, item_id>``) to be retrieved from the |
| database in each batch. |
| """ |
| |
| # First we clean the cache to let the garbage collector work |
| self.refresh_tasks = [] |
| self.pending_tasks = [] |
| self.grouped_tasks = {} |
| |
| offset = 0 |
| |
| while True: |
| # Do things in batches |
| task_groups = self.persist.get_actions_in_groups( |
| self.wim_account['uuid'], item_types=('instance_wim_nets',), |
| group_offset=offset, group_limit=group_limit) |
| offset += (group_limit - 1) # Update for the next batch |
| |
| if not task_groups: |
| break |
| |
| pending_groups = (g for _, g in task_groups if is_pending_group(g)) |
| |
| for task_list in pending_groups: |
| with self.avoid_exceptions(): |
| self.insert_pending_tasks(filter_pending_tasks(task_list)) |
| |
| self.logger.debug( |
| 'Reloaded wim actions pending: %d refresh: %d', |
| len(self.pending_tasks), len(self.refresh_tasks)) |
| |
| def insert_pending_tasks(self, task_list): |
| """Insert task in the list of actions being processed""" |
| task_list = [action_from(task, self.logger) for task in task_list] |
| |
| for task in task_list: |
| group = task.group_key |
| self.grouped_tasks.setdefault(group, []) |
| # Each task can try to supersede the other ones, |
| # but just DELETE actions will actually do |
| task.supersede(self.grouped_tasks[group]) |
| self.grouped_tasks[group].append(task) |
| |
| # We need a separate loop so each task can check all the other |
| # ones before deciding |
| for task in task_list: |
| self._insert_task[task.processing](task) |
| self.logger.debug('Insert WIM task: %s (%s): %s %s', |
| task.id, task.status, task.action, task.item) |
| |
| def schedule(self, task, when=None, list_name='pending'): |
| """Insert a task in the correct list, respecting the schedule. |
| The refreshing list is ordered by threshold_time (task.process_at) |
| It is assumed that this is called inside this thread |
| |
| Arguments: |
| task (Action): object representing the task. |
| This object must implement the ``process`` method and inherit |
| from the ``Action`` class |
| list_name: either 'refresh' or 'pending' |
| when (float): unix time in seconds since as a float number |
| """ |
| processing_list = {'refresh': self.refresh_tasks, |
| 'pending': self.pending_tasks}[list_name] |
| |
| when = when or time() |
| task.process_at = when |
| |
| schedule = (t.process_at for t in processing_list) |
| index = len(list(takewhile(lambda moment: moment <= when, schedule))) |
| |
| processing_list.insert(index, task) |
| self.logger.debug( |
| 'Schedule of %s in "%s" - waiting position: %d (%f)', |
| task.id, list_name, index, task.process_at) |
| |
| return task |
| |
| def process_list(self, list_name='pending'): |
| """Process actions in batches and reschedule them if necessary""" |
| task_list, handler = { |
| 'refresh': (self.refresh_tasks, self._refresh_single), |
| 'pending': (self.pending_tasks, self._process_single)}[list_name] |
| |
| now = time() |
| waiting = ((i, task) for i, task in enumerate(task_list) |
| if task.process_at is None or task.process_at <= now) |
| |
| is_superseded = pipe(itemgetter(1), attrgetter('is_superseded')) |
| superseded, active = partition(is_superseded, waiting) |
| superseded = [(i, t.save(self.persist)) for i, t in superseded] |
| |
| batch = islice(active, self.BATCH) |
| refreshed = [(i, handler(t)) for i, t in batch] |
| |
| # Since pop changes the indexes in the list, we need to do it backwards |
| remove = sorted([i for i, _ in chain(refreshed, superseded)]) |
| return len([task_list.pop(i) for i in reversed(remove)]) |
| |
| def _refresh_single(self, task): |
| """Refresh just a single task, and reschedule it if necessary""" |
| now = time() |
| |
| result = task.refresh(self.connector, self.persist) |
| self.logger.debug('Refreshing WIM task: %s (%s): %s %s => %r', |
| task.id, task.status, task.action, task.item, result) |
| |
| interval = self.REFRESH_BUILD if task.is_build else self.REFRESH_ACTIVE |
| self.schedule(task, now + interval, 'refresh') |
| |
| return result |
| |
| def _process_single(self, task): |
| """Process just a single task, and reschedule it if necessary""" |
| now = time() |
| |
| result = task.process(self.connector, self.persist, self.ovim) |
| self.logger.debug('Executing WIM task: %s (%s): %s %s => %r', |
| task.id, task.status, task.action, task.item, result) |
| |
| if task.action == 'DELETE': |
| del self.grouped_tasks[task.group_key] |
| |
| self._insert_task[task.processing](task, now + self.RETRY_SCHEDULED) |
| |
| return result |
| |
| def insert_task(self, task): |
| """Send a message to the running thread |
| |
| This function is supposed to be called outside of the WIM Thread. |
| |
| Arguments: |
| task (str or dict): `"exit"`, `"reload"` or dict representing a |
| task. For more information about the fields in task, please |
| check the Action class. |
| """ |
| try: |
| self.task_queue.put(task, False) |
| return None |
| except queue.Full: |
| ex = QueueFull(self.name) |
| reraise(ex.__class__, ex, exc_info()[2]) |
| |
| def reload(self): |
| """Send a message to the running thread to reload itself""" |
| self.insert_task('reload') |
| |
| def exit(self): |
| """Send a message to the running thread to kill itself""" |
| self.insert_task('exit') |
| |
| def run(self): |
| self.logger.debug('Starting: %s', self.name) |
| recovery_time = 0 |
| while True: |
| self.on_start() |
| reload_thread = False |
| self.logger.debug('Reloaded: %s', self.name) |
| |
| while True: |
| with self.avoid_exceptions(): |
| while not self.task_queue.empty(): |
| task = self.task_queue.get() |
| if isinstance(task, dict): |
| self.insert_pending_tasks([task]) |
| elif isinstance(task, list): |
| self.insert_pending_tasks(task) |
| elif isinstance(task, str): |
| if task == 'exit': |
| self.logger.debug('Finishing: %s', self.name) |
| return 0 |
| elif task == 'reload': |
| reload_thread = True |
| break |
| self.task_queue.task_done() |
| |
| if reload_thread: |
| break |
| |
| if not(self.process_list('pending') + |
| self.process_list('refresh')): |
| sleep(self.WAITING_TIME) |
| |
| if isinstance(self.connector, FailingConnector): |
| # Wait sometime to try instantiating the connector |
| # again and restart |
| # Increase the recovery time if restarting is not |
| # working (up to a limit) |
| recovery_time = min(self.MAX_RECOVERY_TIME, |
| recovery_time + self.RECOVERY_TIME) |
| sleep(recovery_time) |
| break |
| else: |
| recovery_time = 0 |
| |
| self.logger.debug("Finishing") |
| |
| |
| def is_pending_group(group): |
| return all(task['action'] != 'DELETE' or |
| task['status'] == 'SCHEDULED' |
| for task in group) |
| |
| |
| def filter_pending_tasks(group): |
| return (t for t in group |
| if (t['status'] == 'SCHEDULED' or |
| t['action'] in ('CREATE', 'FIND'))) |
| |
| |
| def action_from(record, logger=None, mapping=ACTIONS): |
| """Create an Action object from a action record (dict) |
| |
| Arguments: |
| mapping (dict): Nested data structure that maps the relationship |
| between action properties and object constructors. This data |
| structure should be a dict with 2 levels of keys: item type and |
| action type. Example:: |
| {'wan_link': |
| {'CREATE': WanLinkCreate} |
| ...} |
| ...} |
| record (dict): action information |
| |
| Return: |
| (Action.Base): Object representing the action |
| """ |
| ensure('item' in record, Invalid('`record` should contain "item"')) |
| ensure('action' in record, Invalid('`record` should contain "action"')) |
| |
| try: |
| factory = mapping[record['item']][record['action']] |
| return factory(record, logger=logger) |
| except KeyError: |
| ex = UndefinedAction(record['item'], record['action']) |
| reraise(ex.__class__, ex, exc_info()[2]) |