blob: f37aba745d1aafca40ba1b289a7d8468cf1ff508 [file] [log] [blame]
# -*- coding: utf-8 -*-
##
# Copyright 2018 University of Bristol - High Performance Networks Research
# Group
# All Rights Reserved.
#
# Contributors: Anderson Bravalheri, Dimitrios Gkounis, Abubakar Siddique
# Muqaddas, Navdeep Uniyal, Reza Nejabati and Dimitra Simeonidou
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
#
# For those usages not covered by the Apache License, Version 2.0 please
# contact with: <highperformance-networks@bristol.ac.uk>
#
# Neither the name of the University of Bristol nor the names of its
# contributors may be used to endorse or promote products derived from
# this software without specific prior written permission.
#
# This work has been performed in the context of DCMS UK 5G Testbeds
# & Trials Programme and in the framework of the Metro-Haul project -
# funded by the European Commission under Grant number 761727 through the
# Horizon 2020 and 5G-PPP programmes.
##
"""
Thread-based interaction with WIMs. Tasks are stored in the
database (vim_wim_actions table) and processed sequentially
Please check the Action class for information about the content of each action.
"""
import logging
import threading
from contextlib import contextmanager
from functools import partial
from itertools import islice, chain, takewhile
from operator import itemgetter, attrgetter
from sys import exc_info
from time import time, sleep
from six import reraise
from six.moves import queue
from . import wan_link_actions
from ..utils import ensure, partition, pipe
from .actions import IGNORE, PENDING, REFRESH
from .errors import (
DbBaseException,
QueueFull,
InvalidParameters as Invalid,
UndefinedAction,
)
from .failing_connector import FailingConnector
from .wimconn import WimConnectorError
from .wimconn_dynpac import DynpacConnector
from .wimconn_fake import FakeConnector
from .wimconn_ietfl2vpn import WimconnectorIETFL2VPN
ACTIONS = {
'instance_wim_nets': wan_link_actions.ACTIONS
}
CONNECTORS = {
# "odl": wimconn_odl.OdlConnector,
"dynpac": DynpacConnector,
"fake": FakeConnector,
"tapi": WimconnectorIETFL2VPN,
# Add extra connectors here
}
class WimThread(threading.Thread):
"""Specialized task queue implementation that runs in an isolated thread.
Objects of this class have a few methods that are intended to be used
outside of the thread:
- start
- insert_task
- reload
- exit
All the other methods are used internally to manipulate/process the task
queue.
"""
RETRY_SCHEDULED = 10 # 10 seconds
REFRESH_BUILD = 10 # 10 seconds
REFRESH_ACTIVE = 60 # 1 minute
BATCH = 10 # 10 actions per round
QUEUE_SIZE = 2000
RECOVERY_TIME = 5 # Sleep 5s to leave the system some time to recover
MAX_RECOVERY_TIME = 180
WAITING_TIME = 1 # Wait 1s for taks to arrive, when there are none
def __init__(self, persistence, wim_account, logger=None, ovim=None):
"""Init a thread.
Arguments:
persistence: Database abstraction layer
wim_account: Record containing wim_account, tenant and wim
information.
"""
name = '{}.{}.{}'.format(wim_account['wim']['name'],
wim_account['name'], wim_account['uuid'])
super(WimThread, self).__init__(name=name)
self.name = name
self.connector = None
self.wim_account = wim_account
self.logger = logger or logging.getLogger('openmano.wim.'+self.name)
self.persist = persistence
self.ovim = ovim
self.task_queue = queue.Queue(self.QUEUE_SIZE)
self.refresh_tasks = []
"""Time ordered task list for refreshing the status of WIM nets"""
self.pending_tasks = []
"""Time ordered task list for creation, deletion of WIM nets"""
self.grouped_tasks = {}
""" It contains all the creation/deletion pending tasks grouped by
its concrete vm, net, etc
<item><item_id>:
- <task1> # e.g. CREATE task
<task2> # e.g. DELETE task
"""
self._insert_task = {
PENDING: partial(self.schedule, list_name='pending'),
REFRESH: partial(self.schedule, list_name='refresh'),
IGNORE: lambda task, *_, **__: task.save(self.persist)}
"""Send the task to the right processing queue"""
def on_start(self):
"""Run a series of procedures every time the thread (re)starts"""
self.connector = self.get_connector()
self.reload_actions()
def get_connector(self):
"""Create an WimConnector instance according to the wim.type"""
error_msg = ''
account_id = self.wim_account['uuid']
try:
account = self.persist.get_wim_account_by(
uuid=account_id, hide=None) # Credentials need to be available
wim = account['wim']
mapping = self.persist.query('wim_port_mappings',
WHERE={'wim_id': wim['uuid']},
error_if_none=False)
return CONNECTORS[wim['type']](wim, account, {
'service_endpoint_mapping': mapping or []
})
except DbBaseException as ex:
error_msg = ('Error when retrieving WIM account ({})\n'
.format(account_id)) + str(ex)
self.logger.error(error_msg, exc_info=True)
except KeyError as ex:
error_msg = ('Unable to find the WIM connector for WIM ({})\n'
.format(wim['type'])) + str(ex)
self.logger.error(error_msg, exc_info=True)
except (WimConnectorError, Exception) as ex:
# TODO: Remove the Exception class here when the connector class is
# ready
error_msg = ('Error when loading WIM connector for WIM ({})\n'
.format(wim['type'])) + str(ex)
self.logger.error(error_msg, exc_info=True)
error_msg_extra = ('Any task targeting WIM account {} ({}) will fail.'
.format(account_id, self.wim_account.get('name')))
self.logger.warning(error_msg_extra)
return FailingConnector(error_msg + '\n' + error_msg_extra)
@contextmanager
def avoid_exceptions(self):
"""Make a real effort to keep the thread alive, by avoiding the
exceptions. They are instead logged as a critical errors.
"""
try:
yield
except Exception as ex:
self.logger.critical("Unexpected exception %s", ex, exc_info=True)
sleep(self.RECOVERY_TIME)
def reload_actions(self, group_limit=100):
"""Read actions from database and reload them at memory.
This method will clean and reload the attributes ``refresh_tasks``,
``pending_tasks`` and ``grouped_tasks``
Attributes:
group_limit (int): maximum number of action groups (those that
refer to the same ``<item, item_id>``) to be retrieved from the
database in each batch.
"""
# First we clean the cache to let the garbage collector work
self.refresh_tasks = []
self.pending_tasks = []
self.grouped_tasks = {}
offset = 0
while True:
# Do things in batches
task_groups = self.persist.get_actions_in_groups(
self.wim_account['uuid'], item_types=('instance_wim_nets',),
group_offset=offset, group_limit=group_limit)
offset += (group_limit - 1) # Update for the next batch
if not task_groups:
break
pending_groups = (g for _, g in task_groups if is_pending_group(g))
for task_list in pending_groups:
with self.avoid_exceptions():
self.insert_pending_tasks(filter_pending_tasks(task_list))
self.logger.debug(
'Reloaded wim actions pending: %d refresh: %d',
len(self.pending_tasks), len(self.refresh_tasks))
def insert_pending_tasks(self, task_list):
"""Insert task in the list of actions being processed"""
task_list = [action_from(task, self.logger) for task in task_list]
for task in task_list:
group = task.group_key
self.grouped_tasks.setdefault(group, [])
# Each task can try to supersede the other ones,
# but just DELETE actions will actually do
task.supersede(self.grouped_tasks[group])
self.grouped_tasks[group].append(task)
# We need a separate loop so each task can check all the other
# ones before deciding
for task in task_list:
self._insert_task[task.processing](task)
self.logger.debug('Insert WIM task: %s (%s): %s %s',
task.id, task.status, task.action, task.item)
def schedule(self, task, when=None, list_name='pending'):
"""Insert a task in the correct list, respecting the schedule.
The refreshing list is ordered by threshold_time (task.process_at)
It is assumed that this is called inside this thread
Arguments:
task (Action): object representing the task.
This object must implement the ``process`` method and inherit
from the ``Action`` class
list_name: either 'refresh' or 'pending'
when (float): unix time in seconds since as a float number
"""
processing_list = {'refresh': self.refresh_tasks,
'pending': self.pending_tasks}[list_name]
when = when or time()
task.process_at = when
schedule = (t.process_at for t in processing_list)
index = len(list(takewhile(lambda moment: moment <= when, schedule)))
processing_list.insert(index, task)
self.logger.debug(
'Schedule of %s in "%s" - waiting position: %d (%f)',
task.id, list_name, index, task.process_at)
return task
def process_list(self, list_name='pending'):
"""Process actions in batches and reschedule them if necessary"""
task_list, handler = {
'refresh': (self.refresh_tasks, self._refresh_single),
'pending': (self.pending_tasks, self._process_single)}[list_name]
now = time()
waiting = ((i, task) for i, task in enumerate(task_list)
if task.process_at is None or task.process_at <= now)
is_superseded = pipe(itemgetter(1), attrgetter('is_superseded'))
superseded, active = partition(is_superseded, waiting)
superseded = [(i, t.save(self.persist)) for i, t in superseded]
batch = islice(active, self.BATCH)
refreshed = [(i, handler(t)) for i, t in batch]
# Since pop changes the indexes in the list, we need to do it backwards
remove = sorted([i for i, _ in chain(refreshed, superseded)])
return len([task_list.pop(i) for i in reversed(remove)])
def _refresh_single(self, task):
"""Refresh just a single task, and reschedule it if necessary"""
now = time()
result = task.refresh(self.connector, self.persist)
self.logger.debug('Refreshing WIM task: %s (%s): %s %s => %r',
task.id, task.status, task.action, task.item, result)
interval = self.REFRESH_BUILD if task.is_build else self.REFRESH_ACTIVE
self.schedule(task, now + interval, 'refresh')
return result
def _process_single(self, task):
"""Process just a single task, and reschedule it if necessary"""
now = time()
result = task.process(self.connector, self.persist, self.ovim)
self.logger.debug('Executing WIM task: %s (%s): %s %s => %r',
task.id, task.status, task.action, task.item, result)
if task.action == 'DELETE':
del self.grouped_tasks[task.group_key]
self._insert_task[task.processing](task, now + self.RETRY_SCHEDULED)
return result
def insert_task(self, task):
"""Send a message to the running thread
This function is supposed to be called outside of the WIM Thread.
Arguments:
task (str or dict): `"exit"`, `"reload"` or dict representing a
task. For more information about the fields in task, please
check the Action class.
"""
try:
self.task_queue.put(task, False)
return None
except queue.Full:
ex = QueueFull(self.name)
reraise(ex.__class__, ex, exc_info()[2])
def reload(self):
"""Send a message to the running thread to reload itself"""
self.insert_task('reload')
def exit(self):
"""Send a message to the running thread to kill itself"""
self.insert_task('exit')
def run(self):
self.logger.debug('Starting: %s', self.name)
recovery_time = 0
while True:
self.on_start()
reload_thread = False
self.logger.debug('Reloaded: %s', self.name)
while True:
with self.avoid_exceptions():
while not self.task_queue.empty():
task = self.task_queue.get()
if isinstance(task, dict):
self.insert_pending_tasks([task])
elif isinstance(task, list):
self.insert_pending_tasks(task)
elif isinstance(task, str):
if task == 'exit':
self.logger.debug('Finishing: %s', self.name)
return 0
elif task == 'reload':
reload_thread = True
break
self.task_queue.task_done()
if reload_thread:
break
if not(self.process_list('pending') +
self.process_list('refresh')):
sleep(self.WAITING_TIME)
if isinstance(self.connector, FailingConnector):
# Wait sometime to try instantiating the connector
# again and restart
# Increase the recovery time if restarting is not
# working (up to a limit)
recovery_time = min(self.MAX_RECOVERY_TIME,
recovery_time + self.RECOVERY_TIME)
sleep(recovery_time)
break
else:
recovery_time = 0
self.logger.debug("Finishing")
def is_pending_group(group):
return all(task['action'] != 'DELETE' or
task['status'] == 'SCHEDULED'
for task in group)
def filter_pending_tasks(group):
return (t for t in group
if (t['status'] == 'SCHEDULED' or
t['action'] in ('CREATE', 'FIND')))
def action_from(record, logger=None, mapping=ACTIONS):
"""Create an Action object from a action record (dict)
Arguments:
mapping (dict): Nested data structure that maps the relationship
between action properties and object constructors. This data
structure should be a dict with 2 levels of keys: item type and
action type. Example::
{'wan_link':
{'CREATE': WanLinkCreate}
...}
...}
record (dict): action information
Return:
(Action.Base): Object representing the action
"""
ensure('item' in record, Invalid('`record` should contain "item"'))
ensure('action' in record, Invalid('`record` should contain "action"'))
try:
factory = mapping[record['item']][record['action']]
return factory(record, logger=logger)
except KeyError:
ex = UndefinedAction(record['item'], record['action'])
reraise(ex.__class__, ex, exc_info()[2])