X-Git-Url: https://osm.etsi.org/gitweb/?a=blobdiff_plain;f=osm_ro%2Fwim%2Fwim_thread.py;fp=osm_ro%2Fwim%2Fwim_thread.py;h=a13d6a25f42d3eedf11339838dcb5d528594c524;hb=0446cd5df24c38f95cea13b995c553e9b2403f21;hp=0000000000000000000000000000000000000000;hpb=63056c57eea17465ada68bcc076a0159d9c5f93f;p=osm%2FRO.git diff --git a/osm_ro/wim/wim_thread.py b/osm_ro/wim/wim_thread.py new file mode 100644 index 00000000..a13d6a25 --- /dev/null +++ b/osm_ro/wim/wim_thread.py @@ -0,0 +1,437 @@ +# -*- coding: utf-8 -*- +## +# Copyright 2018 University of Bristol - High Performance Networks Research +# Group +# All Rights Reserved. +# +# Contributors: Anderson Bravalheri, Dimitrios Gkounis, Abubakar Siddique +# Muqaddas, Navdeep Uniyal, Reza Nejabati and Dimitra Simeonidou +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +# +# For those usages not covered by the Apache License, Version 2.0 please +# contact with: +# +# Neither the name of the University of Bristol nor the names of its +# contributors may be used to endorse or promote products derived from +# this software without specific prior written permission. +# +# This work has been performed in the context of DCMS UK 5G Testbeds +# & Trials Programme and in the framework of the Metro-Haul project - +# funded by the European Commission under Grant number 761727 through the +# Horizon 2020 and 5G-PPP programmes. +## + +""" +Thread-based interaction with WIMs. Tasks are stored in the +database (vim_wim_actions table) and processed sequentially + +Please check the Action class for information about the content of each action. +""" + +import logging +import threading +from contextlib import contextmanager +from functools import partial +from itertools import islice, chain, takewhile +from operator import itemgetter, attrgetter +from sys import exc_info +from time import time, sleep + +from six import reraise +from six.moves import queue + +from . import wan_link_actions, wimconn_odl # wimconn_tapi +from ..utils import ensure, partition, pipe +from .actions import IGNORE, PENDING, REFRESH +from .errors import ( + DbBaseException, + QueueFull, + InvalidParameters as Invalid, + UndefinedAction, +) +from .failing_connector import FailingConnector +from .wimconn import WimConnectorError + +ACTIONS = { + 'instance_wim_nets': wan_link_actions.ACTIONS +} + +CONNECTORS = { + "odl": wimconn_odl.OdlConnector, + # "tapi": wimconn_tapi + # Add extra connectors here +} + + +class WimThread(threading.Thread): + """Specialized task queue implementation that runs in an isolated thread. + + Objects of this class have a few methods that are intended to be used + outside of the thread: + + - start + - insert_task + - reload + - exit + + All the other methods are used internally to manipulate/process the task + queue. + """ + RETRY_SCHEDULED = 10 # 10 seconds + REFRESH_BUILD = 10 # 10 seconds + REFRESH_ACTIVE = 60 # 1 minute + BATCH = 10 # 10 actions per round + QUEUE_SIZE = 2000 + RECOVERY_TIME = 5 # Sleep 5s to leave the system some time to recover + MAX_RECOVERY_TIME = 180 + WAITING_TIME = 1 # Wait 1s for taks to arrive, when there are none + + def __init__(self, persistence, wim_account, logger=None, ovim=None): + """Init a thread. + + Arguments: + persistence: Database abstraction layer + wim_account: Record containing wim_account, tenant and wim + information. + """ + name = '{}.{}.{}'.format(wim_account['wim']['name'], + wim_account['name'], wim_account['uuid']) + super(WimThread, self).__init__(name=name) + + self.name = name + self.connector = None + self.wim_account = wim_account + + self.logger = logger or logging.getLogger('openmano.wim.'+self.name) + self.persist = persistence + self.ovim = ovim + + self.task_queue = queue.Queue(self.QUEUE_SIZE) + + self.refresh_tasks = [] + """Time ordered task list for refreshing the status of WIM nets""" + + self.pending_tasks = [] + """Time ordered task list for creation, deletion of WIM nets""" + + self.grouped_tasks = {} + """ It contains all the creation/deletion pending tasks grouped by + its concrete vm, net, etc + + : + - # e.g. CREATE task + # e.g. DELETE task + """ + + self._insert_task = { + PENDING: partial(self.schedule, list_name='pending'), + REFRESH: partial(self.schedule, list_name='refresh'), + IGNORE: lambda task, *_, **__: task.save(self.persist)} + """Send the task to the right processing queue""" + + def on_start(self): + """Run a series of procedures every time the thread (re)starts""" + self.connector = self.get_connector() + self.reload_actions() + + def get_connector(self): + """Create an WimConnector instance according to the wim.type""" + error_msg = '' + account_id = self.wim_account['uuid'] + try: + account = self.persist.get_wim_account_by( + uuid=account_id, hide=None) # Credentials need to be available + wim = account['wim'] + mapping = self.persist.query('wim_port_mappings', + WHERE={'wim_id': wim['uuid']}, + error_if_none=False) + return CONNECTORS[wim['type']](wim, account, { + 'service_endpoint_mapping': mapping or [] + }) + except DbBaseException as ex: + error_msg = ('Error when retrieving WIM account ({})\n' + .format(account_id)) + str(ex) + self.logger.error(error_msg, exc_info=True) + except KeyError as ex: + error_msg = ('Unable to find the WIM connector for WIM ({})\n' + .format(wim['type'])) + str(ex) + self.logger.error(error_msg, exc_info=True) + except (WimConnectorError, Exception) as ex: + # TODO: Remove the Exception class here when the connector class is + # ready + error_msg = ('Error when loading WIM connector for WIM ({})\n' + .format(wim['type'])) + str(ex) + self.logger.error(error_msg, exc_info=True) + + error_msg_extra = ('Any task targeting WIM account {} ({}) will fail.' + .format(account_id, self.wim_account.get('name'))) + self.logger.warning(error_msg_extra) + return FailingConnector(error_msg + '\n' + error_msg_extra) + + @contextmanager + def avoid_exceptions(self): + """Make a real effort to keep the thread alive, by avoiding the + exceptions. They are instead logged as a critical errors. + """ + try: + yield + except Exception as ex: + self.logger.critical("Unexpected exception %s", ex, exc_info=True) + sleep(self.RECOVERY_TIME) + + def reload_actions(self, group_limit=100): + """Read actions from database and reload them at memory. + + This method will clean and reload the attributes ``refresh_tasks``, + ``pending_tasks`` and ``grouped_tasks`` + + Attributes: + group_limit (int): maximum number of action groups (those that + refer to the same ````) to be retrieved from the + database in each batch. + """ + + # First we clean the cache to let the garbage collector work + self.refresh_tasks = [] + self.pending_tasks = [] + self.grouped_tasks = {} + + offset = 0 + + while True: + # Do things in batches + task_groups = self.persist.get_actions_in_groups( + self.wim_account['uuid'], item_types=('instance_wim_nets',), + group_offset=offset, group_limit=group_limit) + offset += (group_limit - 1) # Update for the next batch + + if not task_groups: + break + + pending_groups = (g for _, g in task_groups if is_pending_group(g)) + + for task_list in pending_groups: + with self.avoid_exceptions(): + self.insert_pending_tasks(filter_pending_tasks(task_list)) + + self.logger.debug( + 'Reloaded wim actions pending: %d refresh: %d', + len(self.pending_tasks), len(self.refresh_tasks)) + + def insert_pending_tasks(self, task_list): + """Insert task in the list of actions being processed""" + task_list = [action_from(task, self.logger) for task in task_list] + + for task in task_list: + group = task.group_key + self.grouped_tasks.setdefault(group, []) + # Each task can try to supersede the other ones, + # but just DELETE actions will actually do + task.supersede(self.grouped_tasks[group]) + self.grouped_tasks[group].append(task) + + # We need a separate loop so each task can check all the other + # ones before deciding + for task in task_list: + self._insert_task[task.processing](task) + self.logger.debug('Insert WIM task: %s (%s): %s %s', + task.id, task.status, task.action, task.item) + + def schedule(self, task, when=None, list_name='pending'): + """Insert a task in the correct list, respecting the schedule. + The refreshing list is ordered by threshold_time (task.process_at) + It is assumed that this is called inside this thread + + Arguments: + task (Action): object representing the task. + This object must implement the ``process`` method and inherit + from the ``Action`` class + list_name: either 'refresh' or 'pending' + when (float): unix time in seconds since as a float number + """ + processing_list = {'refresh': self.refresh_tasks, + 'pending': self.pending_tasks}[list_name] + + when = when or time() + task.process_at = when + + schedule = (t.process_at for t in processing_list) + index = len(list(takewhile(lambda moment: moment <= when, schedule))) + + processing_list.insert(index, task) + self.logger.debug( + 'Schedule of %s in "%s" - waiting position: %d (%f)', + task.id, list_name, index, task.process_at) + + return task + + def process_list(self, list_name='pending'): + """Process actions in batches and reschedule them if necessary""" + task_list, handler = { + 'refresh': (self.refresh_tasks, self._refresh_single), + 'pending': (self.pending_tasks, self._process_single)}[list_name] + + now = time() + waiting = ((i, task) for i, task in enumerate(task_list) + if task.process_at is None or task.process_at <= now) + + is_superseded = pipe(itemgetter(1), attrgetter('is_superseded')) + superseded, active = partition(is_superseded, waiting) + superseded = [(i, t.save(self.persist)) for i, t in superseded] + + batch = islice(active, self.BATCH) + refreshed = [(i, handler(t)) for i, t in batch] + + # Since pop changes the indexes in the list, we need to do it backwards + remove = sorted([i for i, _ in chain(refreshed, superseded)]) + return len([task_list.pop(i) for i in reversed(remove)]) + + def _refresh_single(self, task): + """Refresh just a single task, and reschedule it if necessary""" + now = time() + + result = task.refresh(self.connector, self.persist) + self.logger.debug('Refreshing WIM task: %s (%s): %s %s => %r', + task.id, task.status, task.action, task.item, result) + + interval = self.REFRESH_BUILD if task.is_build else self.REFRESH_ACTIVE + self.schedule(task, now + interval, 'refresh') + + return result + + def _process_single(self, task): + """Process just a single task, and reschedule it if necessary""" + now = time() + + result = task.process(self.connector, self.persist, self.ovim) + self.logger.debug('Executing WIM task: %s (%s): %s %s => %r', + task.id, task.status, task.action, task.item, result) + + if task.action == 'DELETE': + del self.grouped_tasks[task.group_key] + + self._insert_task[task.processing](task, now + self.RETRY_SCHEDULED) + + return result + + def insert_task(self, task): + """Send a message to the running thread + + This function is supposed to be called outside of the WIM Thread. + + Arguments: + task (str or dict): `"exit"`, `"reload"` or dict representing a + task. For more information about the fields in task, please + check the Action class. + """ + try: + self.task_queue.put(task, False) + return None + except queue.Full: + ex = QueueFull(self.name) + reraise(ex.__class__, ex, exc_info()[2]) + + def reload(self): + """Send a message to the running thread to reload itself""" + self.insert_task('reload') + + def exit(self): + """Send a message to the running thread to kill itself""" + self.insert_task('exit') + + def run(self): + self.logger.debug('Starting: %s', self.name) + recovery_time = 0 + while True: + self.on_start() + reload_thread = False + self.logger.debug('Reloaded: %s', self.name) + + while True: + with self.avoid_exceptions(): + while not self.task_queue.empty(): + task = self.task_queue.get() + if isinstance(task, dict): + self.insert_pending_tasks([task]) + elif isinstance(task, list): + self.insert_pending_tasks(task) + elif isinstance(task, str): + if task == 'exit': + self.logger.debug('Finishing: %s', self.name) + return 0 + elif task == 'reload': + reload_thread = True + break + self.task_queue.task_done() + + if reload_thread: + break + + if not(self.process_list('pending') + + self.process_list('refresh')): + sleep(self.WAITING_TIME) + + if isinstance(self.connector, FailingConnector): + # Wait sometime to try instantiating the connector + # again and restart + # Increase the recovery time if restarting is not + # working (up to a limit) + recovery_time = min(self.MAX_RECOVERY_TIME, + recovery_time + self.RECOVERY_TIME) + sleep(recovery_time) + break + else: + recovery_time = 0 + + self.logger.debug("Finishing") + + +def is_pending_group(group): + return all(task['action'] != 'DELETE' or + task['status'] == 'SCHEDULED' + for task in group) + + +def filter_pending_tasks(group): + return (t for t in group + if (t['status'] == 'SCHEDULED' or + t['action'] in ('CREATE', 'FIND'))) + + +def action_from(record, logger=None, mapping=ACTIONS): + """Create an Action object from a action record (dict) + + Arguments: + mapping (dict): Nested data structure that maps the relationship + between action properties and object constructors. This data + structure should be a dict with 2 levels of keys: item type and + action type. Example:: + {'wan_link': + {'CREATE': WanLinkCreate} + ...} + ...} + record (dict): action information + + Return: + (Action.Base): Object representing the action + """ + ensure('item' in record, Invalid('`record` should contain "item"')) + ensure('action' in record, Invalid('`record` should contain "action"')) + + try: + factory = mapping[record['item']][record['action']] + return factory(record, logger=logger) + except KeyError: + ex = UndefinedAction(record['item'], record['action']) + reraise(ex.__class__, ex, exc_info()[2])