| Anderson Bravalheri | 0446cd5 | 2018-08-17 15:26:19 +0100 | [diff] [blame] | 1 | # -*- coding: utf-8 -*- |
| 2 | ## |
| 3 | # Copyright 2018 University of Bristol - High Performance Networks Research |
| 4 | # Group |
| 5 | # All Rights Reserved. |
| 6 | # |
| 7 | # Contributors: Anderson Bravalheri, Dimitrios Gkounis, Abubakar Siddique |
| 8 | # Muqaddas, Navdeep Uniyal, Reza Nejabati and Dimitra Simeonidou |
| 9 | # |
| 10 | # Licensed under the Apache License, Version 2.0 (the "License"); you may |
| 11 | # not use this file except in compliance with the License. You may obtain |
| 12 | # a copy of the License at |
| 13 | # |
| 14 | # http://www.apache.org/licenses/LICENSE-2.0 |
| 15 | # |
| 16 | # Unless required by applicable law or agreed to in writing, software |
| 17 | # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT |
| 18 | # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the |
| 19 | # License for the specific language governing permissions and limitations |
| 20 | # under the License. |
| 21 | # |
| 22 | # For those usages not covered by the Apache License, Version 2.0 please |
| 23 | # contact with: <highperformance-networks@bristol.ac.uk> |
| 24 | # |
| 25 | # Neither the name of the University of Bristol nor the names of its |
| 26 | # contributors may be used to endorse or promote products derived from |
| 27 | # this software without specific prior written permission. |
| 28 | # |
| 29 | # This work has been performed in the context of DCMS UK 5G Testbeds |
| 30 | # & Trials Programme and in the framework of the Metro-Haul project - |
| 31 | # funded by the European Commission under Grant number 761727 through the |
| 32 | # Horizon 2020 and 5G-PPP programmes. |
| 33 | ## |
| 34 | |
| 35 | """ |
| 36 | Thread-based interaction with WIMs. Tasks are stored in the |
| 37 | database (vim_wim_actions table) and processed sequentially |
| 38 | |
| 39 | Please check the Action class for information about the content of each action. |
| 40 | """ |
| 41 | |
| 42 | import logging |
| 43 | import threading |
| 44 | from contextlib import contextmanager |
| 45 | from functools import partial |
| 46 | from itertools import islice, chain, takewhile |
| 47 | from operator import itemgetter, attrgetter |
| 48 | from sys import exc_info |
| 49 | from time import time, sleep |
| 50 | |
| 51 | from six import reraise |
| 52 | from six.moves import queue |
| 53 | |
| tierno | c3dfbfb | 2019-05-22 09:56:05 +0000 | [diff] [blame] | 54 | from . import wan_link_actions |
| Anderson Bravalheri | 0446cd5 | 2018-08-17 15:26:19 +0100 | [diff] [blame] | 55 | from ..utils import ensure, partition, pipe |
| 56 | from .actions import IGNORE, PENDING, REFRESH |
| 57 | from .errors import ( |
| 58 | DbBaseException, |
| 59 | QueueFull, |
| 60 | InvalidParameters as Invalid, |
| 61 | UndefinedAction, |
| 62 | ) |
| 63 | from .failing_connector import FailingConnector |
| 64 | from .wimconn import WimConnectorError |
| tierno | c3dfbfb | 2019-05-22 09:56:05 +0000 | [diff] [blame] | 65 | from .wimconn_dynpac import DynpacConnector |
| 66 | from .wimconn_fake import FakeConnector |
| 67 | from .wimconn_ietfl2vpn import WimconnectorIETFL2VPN |
| Anderson Bravalheri | 0446cd5 | 2018-08-17 15:26:19 +0100 | [diff] [blame] | 68 | |
# Maps each ``item`` type stored in the actions table to the nested table of
# action factories (indexed by the action name, e.g. 'CREATE') that handle it.
# Consumed by ``action_from``.
ACTIONS = dict(
    instance_wim_nets=wan_link_actions.ACTIONS,
)

# Maps the ``type`` field of a WIM record to the connector class used to talk
# to that WIM. Consumed by ``WimThread.get_connector``.
CONNECTORS = dict(
    # odl=wimconn_odl.OdlConnector,
    dynpac=DynpacConnector,
    fake=FakeConnector,
    # NOTE(review): the "tapi" type is served by the IETF L2VPN connector;
    # confirm this mapping is intended.
    tapi=WimconnectorIETFL2VPN,
    # Add extra connectors here
)
| 80 | |
| 81 | |
class WimThread(threading.Thread):
    """Specialized task queue implementation that runs in an isolated thread.

    Objects of this class have a few methods that are intended to be used
    outside of the thread:

    - start
    - insert_task
    - reload
    - exit

    All the other methods are used internally to manipulate/process the task
    queue.
    """
    RETRY_SCHEDULED = 10  # 10 seconds
    REFRESH_BUILD = 10  # 10 seconds
    REFRESH_ACTIVE = 60  # 1 minute
    BATCH = 10  # 10 actions per round
    QUEUE_SIZE = 2000
    RECOVERY_TIME = 5  # Sleep 5s to leave the system some time to recover
    MAX_RECOVERY_TIME = 180
    WAITING_TIME = 1  # Wait 1s for tasks to arrive, when there are none

    def __init__(self, persistence, wim_account, logger=None, ovim=None):
        """Init a thread.

        Arguments:
            persistence: Database abstraction layer
            wim_account: Record containing wim_account, tenant and wim
                information.
            logger: optional logger (defaults to ``openmano.wim.<name>``)
            ovim: optional object passed through to ``task.process``
        """
        name = '{}.{}.{}'.format(wim_account['wim']['name'],
                                 wim_account['name'], wim_account['uuid'])
        super(WimThread, self).__init__(name=name)

        self.name = name
        self.connector = None
        self.wim_account = wim_account

        self.logger = logger or logging.getLogger('openmano.wim.' + self.name)
        self.persist = persistence
        self.ovim = ovim

        self.task_queue = queue.Queue(self.QUEUE_SIZE)

        # Time ordered task list for refreshing the status of WIM nets
        self.refresh_tasks = []

        # Time ordered task list for creation, deletion of WIM nets
        self.pending_tasks = []

        # All the creation/deletion pending tasks grouped by their concrete
        # item (vm, net, etc):
        #     <group_key>: [<task1>,   # e.g. CREATE task
        #                   <task2>]   # e.g. DELETE task
        self.grouped_tasks = {}

        # Send the task to the right processing queue, according to
        # ``task.processing``
        self._insert_task = {
            PENDING: partial(self.schedule, list_name='pending'),
            REFRESH: partial(self.schedule, list_name='refresh'),
            IGNORE: lambda task, *_, **__: task.save(self.persist)}

    def on_start(self):
        """Run a series of procedures every time the thread (re)starts"""
        self.connector = self.get_connector()
        self.reload_actions()

    def get_connector(self):
        """Create a WimConnector instance according to the wim.type.

        Errors raised while reading the account/port mappings from the
        database, looking up the connector class or instantiating it are
        logged, and a ``FailingConnector`` carrying the error message is
        returned instead.
        """
        error_msg = ''
        account_id = self.wim_account['uuid']
        # Pre-bind ``wim`` so the handlers below can always report its type,
        # even when the failure happens before the WIM record is read.
        wim = {}
        try:
            account = self.persist.get_wim_account_by(
                uuid=account_id, hide=None)  # Credentials need to be available
            wim = account['wim']
            mapping = self.persist.query('wim_port_mappings',
                                         WHERE={'wim_id': wim['uuid']},
                                         error_if_none=False)
            return CONNECTORS[wim['type']](wim, account, {
                'service_endpoint_mapping': mapping or []
            })
        except DbBaseException as ex:
            error_msg = ('Error when retrieving WIM account ({})\n'
                         .format(account_id)) + str(ex)
            self.logger.error(error_msg, exc_info=True)
        except KeyError as ex:
            error_msg = ('Unable to find the WIM connector for WIM ({})\n'
                         .format((wim or {}).get('type'))) + str(ex)
            self.logger.error(error_msg, exc_info=True)
        except (WimConnectorError, Exception) as ex:
            # TODO: Remove the Exception class here when the connector class is
            # ready
            error_msg = ('Error when loading WIM connector for WIM ({})\n'
                         .format((wim or {}).get('type'))) + str(ex)
            self.logger.error(error_msg, exc_info=True)

        error_msg_extra = ('Any task targeting WIM account {} ({}) will fail.'
                           .format(account_id, self.wim_account.get('name')))
        self.logger.warning(error_msg_extra)
        return FailingConnector(error_msg + '\n' + error_msg_extra)

    @contextmanager
    def avoid_exceptions(self):
        """Make a real effort to keep the thread alive, by avoiding the
        exceptions. They are instead logged as critical errors, followed by
        a ``RECOVERY_TIME`` pause.
        """
        try:
            yield
        except Exception as ex:
            self.logger.critical("Unexpected exception %s", ex, exc_info=True)
            sleep(self.RECOVERY_TIME)

    def reload_actions(self, group_limit=100):
        """Read actions from database and reload them at memory.

        This method will clean and reload the attributes ``refresh_tasks``,
        ``pending_tasks`` and ``grouped_tasks``

        Arguments:
            group_limit (int): maximum number of action groups (those that
                refer to the same ``<item, item_id>``) to be retrieved from the
                database in each batch.
        """

        # First we clean the cache to let the garbage collector work
        self.refresh_tasks = []
        self.pending_tasks = []
        self.grouped_tasks = {}

        offset = 0

        while True:
            # Do things in batches
            task_groups = self.persist.get_actions_in_groups(
                self.wim_account['uuid'], item_types=('instance_wim_nets',),
                group_offset=offset, group_limit=group_limit)
            # Skip exactly the groups just read. Advancing by
            # ``group_limit - 1`` would re-read the last group of every batch
            # (inserting its tasks twice) and never advance for a limit of 1.
            offset += group_limit

            if not task_groups:
                break

            pending_groups = (g for _, g in task_groups if is_pending_group(g))

            for task_list in pending_groups:
                with self.avoid_exceptions():
                    self.insert_pending_tasks(filter_pending_tasks(task_list))

        self.logger.debug(
            'Reloaded wim actions pending: %d refresh: %d',
            len(self.pending_tasks), len(self.refresh_tasks))

    def insert_pending_tasks(self, task_list):
        """Insert tasks (action records) in the list of actions being processed
        """
        task_list = [action_from(task, self.logger) for task in task_list]

        for task in task_list:
            group = task.group_key
            self.grouped_tasks.setdefault(group, [])
            # Each task can try to supersede the other ones,
            # but just DELETE actions will actually do
            task.supersede(self.grouped_tasks[group])
            self.grouped_tasks[group].append(task)

        # We need a separate loop so each task can check all the other
        # ones before deciding
        for task in task_list:
            self._insert_task[task.processing](task)
            self.logger.debug('Insert WIM task: %s (%s): %s %s',
                              task.id, task.status, task.action, task.item)

    def schedule(self, task, when=None, list_name='pending'):
        """Insert a task in the correct list, respecting the schedule.
        The lists are ordered by threshold_time (task.process_at)
        It is assumed that this is called inside this thread

        Arguments:
            task (Action): object representing the task.
                This object must implement the ``process`` method and inherit
                from the ``Action`` class
            when (float): unix time (seconds since the epoch) as a float
                number; a falsy value means "now"
            list_name: either 'refresh' or 'pending'

        Returns:
            The same ``task`` object, with ``process_at`` updated.
        """
        processing_list = {'refresh': self.refresh_tasks,
                           'pending': self.pending_tasks}[list_name]

        when = when or time()
        task.process_at = when

        # Insert after every task scheduled at or before ``when`` (stable)
        schedule = (t.process_at for t in processing_list)
        index = len(list(takewhile(lambda moment: moment <= when, schedule)))

        processing_list.insert(index, task)
        self.logger.debug(
            'Schedule of %s in "%s" - waiting position: %d (%f)',
            task.id, list_name, index, task.process_at)

        return task

    def process_list(self, list_name='pending'):
        """Process actions in batches and reschedule them if necessary.

        Tasks whose ``process_at`` is unset or already in the past are due.
        Superseded due tasks are only saved; up to ``BATCH`` of the remaining
        due tasks are handed to the handler of the list. Every handled or
        superseded task is removed from its original position.

        Returns:
            int: number of tasks removed from the list.
        """
        task_list, handler = {
            'refresh': (self.refresh_tasks, self._refresh_single),
            'pending': (self.pending_tasks, self._process_single)}[list_name]

        now = time()
        waiting = ((i, task) for i, task in enumerate(task_list)
                   if task.process_at is None or task.process_at <= now)

        is_superseded = pipe(itemgetter(1), attrgetter('is_superseded'))
        superseded, active = partition(is_superseded, waiting)
        superseded = [(i, t.save(self.persist)) for i, t in superseded]

        batch = islice(active, self.BATCH)
        refreshed = [(i, handler(t)) for i, t in batch]

        # Since pop changes the indexes in the list, we need to do it backwards
        remove = sorted([i for i, _ in chain(refreshed, superseded)])
        return len([task_list.pop(i) for i in reversed(remove)])

    def _refresh_single(self, task):
        """Refresh just a single task, and reschedule it if necessary"""
        now = time()

        result = task.refresh(self.connector, self.persist)
        self.logger.debug('Refreshing WIM task: %s (%s): %s %s => %r',
                          task.id, task.status, task.action, task.item, result)

        interval = self.REFRESH_BUILD if task.is_build else self.REFRESH_ACTIVE
        self.schedule(task, now + interval, 'refresh')

        return result

    def _process_single(self, task):
        """Process just a single task, and reschedule it if necessary"""
        now = time()

        result = task.process(self.connector, self.persist, self.ovim)
        self.logger.debug('Executing WIM task: %s (%s): %s %s => %r',
                          task.id, task.status, task.action, task.item, result)

        if task.action == 'DELETE':
            # The group may already be gone (e.g. a DELETE being retried
            # after a previous attempt), so do not fail on a missing key.
            self.grouped_tasks.pop(task.group_key, None)

        self._insert_task[task.processing](task, now + self.RETRY_SCHEDULED)

        return result

    def insert_task(self, task):
        """Send a message to the running thread

        This function is supposed to be called outside of the WIM Thread.

        Arguments:
            task (str or dict): `"exit"`, `"reload"` or dict representing a
                task. For more information about the fields in task, please
                check the Action class.

        Raises:
            QueueFull: if the internal queue has no room for the message.
        """
        try:
            self.task_queue.put(task, False)
            return None
        except queue.Full:
            ex = QueueFull(self.name)
            reraise(ex.__class__, ex, exc_info()[2])

    def reload(self):
        """Send a message to the running thread to reload itself"""
        self.insert_task('reload')

    def exit(self):
        """Send a message to the running thread to kill itself"""
        self.insert_task('exit')

    def run(self):
        """Thread main loop: consume messages and process the task lists
        until an ``"exit"`` message is received."""
        self.logger.debug('Starting: %s', self.name)
        recovery_time = 0
        while True:
            self.on_start()
            reload_thread = False
            self.logger.debug('Reloaded: %s', self.name)

            while True:
                with self.avoid_exceptions():
                    while not self.task_queue.empty():
                        task = self.task_queue.get()
                        try:
                            if isinstance(task, dict):
                                self.insert_pending_tasks([task])
                            elif isinstance(task, list):
                                self.insert_pending_tasks(task)
                            elif isinstance(task, str):
                                if task == 'exit':
                                    self.logger.debug('Finishing: %s',
                                                      self.name)
                                    return 0
                                elif task == 'reload':
                                    reload_thread = True
                                    break
                        finally:
                            # Acknowledge every message, including control
                            # messages and those whose insertion failed
                            self.task_queue.task_done()

                    if reload_thread:
                        break

                    if not(self.process_list('pending') +
                           self.process_list('refresh')):
                        sleep(self.WAITING_TIME)

                    if isinstance(self.connector, FailingConnector):
                        # Wait sometime to try instantiating the connector
                        # again and restart
                        # Increase the recovery time if restarting is not
                        # working (up to a limit)
                        recovery_time = min(self.MAX_RECOVERY_TIME,
                                            recovery_time + self.RECOVERY_TIME)
                        sleep(recovery_time)
                        break
                    else:
                        recovery_time = 0
| 403 | |
| 404 | |
def is_pending_group(group):
    """Return True unless ``group`` holds a DELETE action that is no longer
    SCHEDULED.

    Arguments:
        group: iterable of action records (dicts with at least the
            ``action`` key, and ``status`` for DELETE actions).
    """
    return not any(record['action'] == 'DELETE' and
                   record['status'] != 'SCHEDULED'
                   for record in group)
| 409 | |
| 410 | |
def filter_pending_tasks(group):
    """Lazily select the action records of ``group`` that should be reloaded.

    A record is kept when its status is SCHEDULED or when it is a CREATE or
    FIND action (whatever its status).

    Returns:
        A generator over the selected records, in their original order.
    """
    def should_keep(record):
        if record['status'] == 'SCHEDULED':
            return True
        return record['action'] in ('CREATE', 'FIND')

    return (record for record in group if should_keep(record))
| 415 | |
| 416 | |
def action_from(record, logger=None, mapping=ACTIONS):
    """Create an Action object from an action record (dict)

    Arguments:
        record (dict): action information. It must contain the ``item`` and
            ``action`` keys, which select the factory in ``mapping``.
        logger: optional logger, forwarded to the factory.
        mapping (dict): Nested data structure that maps the relationship
            between action properties and object constructors. This data
            structure should be a dict with 2 levels of keys: item type and
            action type. Example::

                {'wan_link': {'CREATE': WanLinkCreate,
                              ...},
                 ...}

    Return:
        (Action.Base): Object representing the action

    Raises:
        UndefinedAction: if ``mapping`` has no factory for the record's
            ``item``/``action`` pair.
        InvalidParameters: presumably raised (via ``ensure``) when ``record``
            lacks ``item`` or ``action`` — confirm against ``utils.ensure``.
    """
    ensure('item' in record, Invalid('`record` should contain "item"'))
    ensure('action' in record, Invalid('`record` should contain "action"'))

    try:
        factory = mapping[record['item']][record['action']]
    except KeyError:
        ex = UndefinedAction(record['item'], record['action'])
        reraise(ex.__class__, ex, exc_info()[2])

    # Build the action outside the ``try``: a KeyError raised by the factory
    # itself (e.g. a malformed record) must not be reported as UndefinedAction
    return factory(record, logger=logger)