| Anderson Bravalheri | 0446cd5 | 2018-08-17 15:26:19 +0100 | [diff] [blame] | 1 | # -*- coding: utf-8 -*- |
| 2 | ## |
| 3 | # Copyright 2018 University of Bristol - High Performance Networks Research |
| 4 | # Group |
| 5 | # All Rights Reserved. |
| 6 | # |
| 7 | # Contributors: Anderson Bravalheri, Dimitrios Gkounis, Abubakar Siddique |
| 8 | # Muqaddas, Navdeep Uniyal, Reza Nejabati and Dimitra Simeonidou |
| 9 | # |
| 10 | # Licensed under the Apache License, Version 2.0 (the "License"); you may |
| 11 | # not use this file except in compliance with the License. You may obtain |
| 12 | # a copy of the License at |
| 13 | # |
| 14 | # http://www.apache.org/licenses/LICENSE-2.0 |
| 15 | # |
| 16 | # Unless required by applicable law or agreed to in writing, software |
| 17 | # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT |
| 18 | # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the |
| 19 | # License for the specific language governing permissions and limitations |
| 20 | # under the License. |
| 21 | # |
| 22 | # For those usages not covered by the Apache License, Version 2.0 please |
| 23 | # contact with: <highperformance-networks@bristol.ac.uk> |
| 24 | # |
| 25 | # Neither the name of the University of Bristol nor the names of its |
| 26 | # contributors may be used to endorse or promote products derived from |
| 27 | # this software without specific prior written permission. |
| 28 | # |
| 29 | # This work has been performed in the context of DCMS UK 5G Testbeds |
| 30 | # & Trials Programme and in the framework of the Metro-Haul project - |
| 31 | # funded by the European Commission under Grant number 761727 through the |
| 32 | # Horizon 2020 and 5G-PPP programmes. |
| 33 | ## |
| 34 | |
| 35 | """ |
| 36 | Thread-based interaction with WIMs. Tasks are stored in the |
| 37 | database (vim_wim_actions table) and processed sequentially |
| 38 | |
| 39 | Please check the Action class for information about the content of each action. |
| 40 | """ |
| 41 | |
| 42 | import logging |
| 43 | import threading |
| 44 | from contextlib import contextmanager |
| 45 | from functools import partial |
| 46 | from itertools import islice, chain, takewhile |
| 47 | from operator import itemgetter, attrgetter |
| 48 | from sys import exc_info |
| 49 | from time import time, sleep |
| 50 | |
| 51 | from six import reraise |
| 52 | from six.moves import queue |
| 53 | |
| David García | 00e29dd | 2018-12-10 09:43:50 +0100 | [diff] [blame] | 54 | from . import wan_link_actions, wimconn_odl, wimconn_dynpac # wimconn_tapi |
| Anderson Bravalheri | 0446cd5 | 2018-08-17 15:26:19 +0100 | [diff] [blame] | 55 | from ..utils import ensure, partition, pipe |
| 56 | from .actions import IGNORE, PENDING, REFRESH |
| 57 | from .errors import ( |
| 58 | DbBaseException, |
| 59 | QueueFull, |
| 60 | InvalidParameters as Invalid, |
| 61 | UndefinedAction, |
| 62 | ) |
| 63 | from .failing_connector import FailingConnector |
| 64 | from .wimconn import WimConnectorError |
| 65 | |
# Default factory table used by ``action_from``: the first key is the item
# type (``record['item']``) and its value is looked up again by action type
# (``record['action']``) to find the Action constructor.
ACTIONS = dict(
    instance_wim_nets=wan_link_actions.ACTIONS,
)

# Connector classes indexed by the ``type`` field of the WIM record
# (see ``WimThread.get_connector``).
CONNECTORS = dict(
    odl=wimconn_odl.OdlConnector,
    # tapi=wimconn_tapi,
    # Add extra connectors here
    dynpac=wimconn_dynpac.DynpacConnector,
)
| 76 | |
| 77 | |
class WimThread(threading.Thread):
    """Specialized task queue implementation that runs in an isolated thread.

    Objects of this class have a few methods that are intended to be used
    outside of the thread:

    - start
    - insert_task
    - reload
    - exit

    All the other methods are used internally to manipulate/process the task
    queue.
    """
    RETRY_SCHEDULED = 10  # 10 seconds
    REFRESH_BUILD = 10  # 10 seconds
    REFRESH_ACTIVE = 60  # 1 minute
    BATCH = 10  # 10 actions per round
    QUEUE_SIZE = 2000
    RECOVERY_TIME = 5  # Sleep 5s to leave the system some time to recover
    MAX_RECOVERY_TIME = 180
    WAITING_TIME = 1  # Wait 1s for tasks to arrive, when there are none

    def __init__(self, persistence, wim_account, logger=None, ovim=None):
        """Init a thread.

        Arguments:
            persistence: Database abstraction layer
            wim_account: Record containing wim_account, tenant and wim
                information.
            logger: optional logger; when omitted a logger named
                ``openmano.wim.<thread name>`` is used.
            ovim: object forwarded to every ``task.process`` call.
        """
        name = '{}.{}.{}'.format(wim_account['wim']['name'],
                                 wim_account['name'], wim_account['uuid'])
        super(WimThread, self).__init__(name=name)

        self.name = name
        self.connector = None
        self.wim_account = wim_account

        self.logger = logger or logging.getLogger('openmano.wim.' + self.name)
        self.persist = persistence
        self.ovim = ovim

        # Messages (task dicts/lists, "exit", "reload") sent from other threads
        self.task_queue = queue.Queue(self.QUEUE_SIZE)

        # Time ordered task list for refreshing the status of WIM nets
        self.refresh_tasks = []

        # Time ordered task list for creation, deletion of WIM nets
        self.pending_tasks = []

        # It contains all the creation/deletion pending tasks grouped by
        # their concrete vm, net, etc (keyed by ``task.group_key``):
        #
        #     <item><item_id>:
        #         - <task1>  # e.g. CREATE task
        #           <task2>  # e.g. DELETE task
        self.grouped_tasks = {}

        # Send the task to the right processing queue, according to
        # ``task.processing``
        self._insert_task = {
            PENDING: partial(self.schedule, list_name='pending'),
            REFRESH: partial(self.schedule, list_name='refresh'),
            IGNORE: lambda task, *_, **__: task.save(self.persist)}

    def on_start(self):
        """Run a series of procedures every time the thread (re)starts"""
        self.connector = self.get_connector()
        self.reload_actions()

    def get_connector(self):
        """Create a WimConnector instance according to the wim.type

        Errors while reading the account from the database, finding a
        connector class for the WIM type or instantiating it are logged, and
        a ``FailingConnector`` carrying the error message is returned instead.
        """
        error_msg = ''
        account_id = self.wim_account['uuid']
        # Bound before the ``try`` so the error handlers can always build
        # their messages, even when the failure happens before the WIM record
        # is read (referencing ``wim`` there used to raise UnboundLocalError).
        wim_type = None
        try:
            account = self.persist.get_wim_account_by(
                uuid=account_id, hide=None)  # Credentials need to be available
            wim = account['wim']
            wim_type = wim.get('type')
            mapping = self.persist.query('wim_port_mappings',
                                         WHERE={'wim_id': wim['uuid']},
                                         error_if_none=False)
            return CONNECTORS[wim['type']](wim, account, {
                'service_endpoint_mapping': mapping or []
            })
        except DbBaseException as ex:
            error_msg = ('Error when retrieving WIM account ({})\n'
                         .format(account_id)) + str(ex)
            self.logger.error(error_msg, exc_info=True)
        except KeyError as ex:
            error_msg = ('Unable to find the WIM connector for WIM ({})\n'
                         .format(wim_type)) + str(ex)
            self.logger.error(error_msg, exc_info=True)
        except (WimConnectorError, Exception) as ex:
            # TODO: Remove the Exception class here when the connector class is
            # ready
            error_msg = ('Error when loading WIM connector for WIM ({})\n'
                         .format(wim_type)) + str(ex)
            self.logger.error(error_msg, exc_info=True)

        error_msg_extra = ('Any task targeting WIM account {} ({}) will fail.'
                           .format(account_id, self.wim_account.get('name')))
        self.logger.warning(error_msg_extra)
        return FailingConnector(error_msg + '\n' + error_msg_extra)

    @contextmanager
    def avoid_exceptions(self):
        """Make a real effort to keep the thread alive, by avoiding the
        exceptions. They are instead logged as critical errors, followed by a
        ``RECOVERY_TIME`` pause.
        """
        try:
            yield
        except Exception as ex:
            self.logger.critical("Unexpected exception %s", ex, exc_info=True)
            sleep(self.RECOVERY_TIME)

    def reload_actions(self, group_limit=100):
        """Read actions from database and reload them at memory.

        This method will clean and reload the attributes ``refresh_tasks``,
        ``pending_tasks`` and ``grouped_tasks``

        Arguments:
            group_limit (int): maximum number of action groups (those that
                refer to the same ``<item, item_id>``) to be retrieved from the
                database in each batch.
        """

        # First we clean the cache to let the garbage collector work
        self.refresh_tasks = []
        self.pending_tasks = []
        self.grouped_tasks = {}

        offset = 0

        while True:
            # Do things in batches
            task_groups = self.persist.get_actions_in_groups(
                self.wim_account['uuid'], item_types=('instance_wim_nets',),
                group_offset=offset, group_limit=group_limit)
            # The offset counts whole groups: advancing by ``group_limit - 1``
            # re-read (and re-scheduled) the last group of every batch, and
            # never advanced at all when ``group_limit == 1``.
            offset += group_limit  # Update for the next batch

            if not task_groups:
                break

            pending_groups = (g for _, g in task_groups if is_pending_group(g))

            for task_list in pending_groups:
                with self.avoid_exceptions():
                    self.insert_pending_tasks(filter_pending_tasks(task_list))

        self.logger.debug(
            'Reloaded wim actions pending: %d refresh: %d',
            len(self.pending_tasks), len(self.refresh_tasks))

    def insert_pending_tasks(self, task_list):
        """Convert action records into Action objects and dispatch them.

        Each task is registered in ``grouped_tasks`` (after giving it the
        chance to supersede the tasks already in its group) and then sent to
        the processing queue selected by ``task.processing``.
        """
        task_list = [action_from(task, self.logger) for task in task_list]

        for task in task_list:
            group = task.group_key
            self.grouped_tasks.setdefault(group, [])
            # Each task can try to supersede the other ones,
            # but just DELETE actions will actually do
            task.supersede(self.grouped_tasks[group])
            self.grouped_tasks[group].append(task)

        # We need a separate loop so each task can check all the other
        # ones before deciding
        for task in task_list:
            self._insert_task[task.processing](task)
            self.logger.debug('Insert WIM task: %s (%s): %s %s',
                              task.id, task.status, task.action, task.item)

    def schedule(self, task, when=None, list_name='pending'):
        """Insert a task in the correct list, respecting the schedule.
        The list is kept ordered by ``task.process_at``; a task scheduled for
        the same moment as existing ones is placed after them.
        It is assumed that this is called inside this thread

        Arguments:
            task (Action): object representing the task.
                This object must implement the ``process`` method and inherit
                from the ``Action`` class
            when (float): unix timestamp (seconds since the epoch) at which
                the task should be processed; a falsy value means "now".
            list_name: either 'refresh' or 'pending'

        Returns:
            The same ``task`` object, with ``process_at`` updated.
        """
        processing_list = {'refresh': self.refresh_tasks,
                           'pending': self.pending_tasks}[list_name]

        when = when or time()
        task.process_at = when

        schedule = (t.process_at for t in processing_list)
        index = len(list(takewhile(lambda moment: moment <= when, schedule)))

        processing_list.insert(index, task)
        self.logger.debug(
            'Schedule of %s in "%s" - waiting position: %d (%f)',
            task.id, list_name, index, task.process_at)

        return task

    def process_list(self, list_name='pending'):
        """Process actions in batches and reschedule them if necessary

        Superseded due tasks are only saved; up to ``BATCH`` of the remaining
        due tasks are handed to the list's handler. Every handled task is
        removed from its original position in the list.

        Returns:
            int: number of entries removed from the list.
        """
        task_list, handler = {
            'refresh': (self.refresh_tasks, self._refresh_single),
            'pending': (self.pending_tasks, self._process_single)}[list_name]

        now = time()
        waiting = ((i, task) for i, task in enumerate(task_list)
                   if task.process_at is None or task.process_at <= now)

        is_superseded = pipe(itemgetter(1), attrgetter('is_superseded'))
        superseded, active = partition(is_superseded, waiting)
        superseded = [(i, t.save(self.persist)) for i, t in superseded]

        batch = islice(active, self.BATCH)
        refreshed = [(i, handler(t)) for i, t in batch]

        # Since pop changes the indexes in the list, we need to do it backwards
        remove = sorted([i for i, _ in chain(refreshed, superseded)])
        return len([task_list.pop(i) for i in reversed(remove)])

    def _refresh_single(self, task):
        """Refresh just a single task, and reschedule it if necessary"""
        now = time()

        result = task.refresh(self.connector, self.persist)
        self.logger.debug('Refreshing WIM task: %s (%s): %s %s => %r',
                          task.id, task.status, task.action, task.item, result)

        interval = self.REFRESH_BUILD if task.is_build else self.REFRESH_ACTIVE
        self.schedule(task, now + interval, 'refresh')

        return result

    def _process_single(self, task):
        """Process just a single task, and reschedule it if necessary"""
        now = time()

        result = task.process(self.connector, self.persist, self.ovim)
        self.logger.debug('Executing WIM task: %s (%s): %s %s => %r',
                          task.id, task.status, task.action, task.item, result)

        if task.action == 'DELETE':
            # The group may already be gone (e.g. this DELETE is a retry, or
            # another DELETE of the same group ran first). A plain ``del``
            # raised KeyError, aborting ``process_list`` before the task was
            # popped, so the DELETE was re-executed on every loop iteration.
            self.grouped_tasks.pop(task.group_key, None)

        self._insert_task[task.processing](task, now + self.RETRY_SCHEDULED)

        return result

    def insert_task(self, task):
        """Send a message to the running thread

        This function is supposed to be called outside of the WIM Thread.

        Arguments:
            task (str or dict): `"exit"`, `"reload"` or dict representing a
                task. For more information about the fields in task, please
                check the Action class.

        Raises:
            QueueFull: if the message queue is full (the call never blocks).
        """
        try:
            self.task_queue.put(task, False)
            return None
        except queue.Full:
            ex = QueueFull(self.name)
            reraise(ex.__class__, ex, exc_info()[2])

    def reload(self):
        """Send a message to the running thread to reload itself"""
        self.insert_task('reload')

    def exit(self):
        """Send a message to the running thread to kill itself"""
        self.insert_task('exit')

    def run(self):
        """Thread main loop.

        Consumes the messages sent via ``insert_task`` and processes the
        pending/refresh lists. A ``"reload"`` message (or a failing connector)
        restarts the loop through ``on_start``; ``"exit"`` finishes the thread.
        """
        self.logger.debug('Starting: %s', self.name)
        recovery_time = 0
        while True:
            self.on_start()
            reload_thread = False
            self.logger.debug('Reloaded: %s', self.name)

            while True:
                with self.avoid_exceptions():
                    while not self.task_queue.empty():
                        task = self.task_queue.get()
                        if isinstance(task, dict):
                            self.insert_pending_tasks([task])
                        elif isinstance(task, list):
                            self.insert_pending_tasks(task)
                        elif isinstance(task, str):
                            if task == 'exit':
                                # Keep the queue's unfinished-task count
                                # balanced for control messages too
                                self.task_queue.task_done()
                                self.logger.debug('Finishing: %s', self.name)
                                return 0
                            elif task == 'reload':
                                self.task_queue.task_done()
                                reload_thread = True
                                break
                        self.task_queue.task_done()

                    if reload_thread:
                        break

                    if not(self.process_list('pending') +
                           self.process_list('refresh')):
                        sleep(self.WAITING_TIME)

                    if isinstance(self.connector, FailingConnector):
                        # Wait sometime to try instantiating the connector
                        # again and restart
                        # Increase the recovery time if restarting is not
                        # working (up to a limit)
                        recovery_time = min(self.MAX_RECOVERY_TIME,
                                            recovery_time + self.RECOVERY_TIME)
                        sleep(recovery_time)
                        break
                    else:
                        recovery_time = 0

        self.logger.debug("Finishing")
| 399 | |
| 400 | |
def is_pending_group(group):
    """Tell whether an action group still has to be loaded into memory.

    A group stops being pending as soon as it contains a DELETE action whose
    status is no longer ``'SCHEDULED'``. An empty group counts as pending.
    """
    return not any(
        record['action'] == 'DELETE' and record['status'] != 'SCHEDULED'
        for record in group)
| 405 | |
| 406 | |
def filter_pending_tasks(group):
    """Lazily yield the action records of ``group`` worth reloading.

    A record is kept when its status is ``'SCHEDULED'`` or when its action is
    ``'CREATE'`` or ``'FIND'``; the original order is preserved.
    """
    for record in group:
        if record['status'] == 'SCHEDULED':
            yield record
        elif record['action'] in ('CREATE', 'FIND'):
            yield record
| 411 | |
| 412 | |
def action_from(record, logger=None, mapping=ACTIONS):
    """Create an Action object from an action record (dict)

    Arguments:
        record (dict): action information. Its ``item`` and ``action`` keys
            select the factory in ``mapping``.
        logger: optional logger, forwarded to the factory as ``logger=``.
        mapping (dict): Nested data structure that maps the relationship
            between action properties and object constructors. This data
            structure should be a dict with 2 levels of keys: item type and
            action type. Example::

                {'wan_link':
                    {'CREATE': WanLinkCreate,
                     ...},
                 ...}

    Return:
        (Action.Base): Object representing the action

    Raises:
        UndefinedAction: when ``mapping`` has no factory for the
            ``(item, action)`` pair of ``record``.

    Note:
        An ``InvalidParameters`` error is handed to ``ensure`` when ``record``
        lacks ``item`` or ``action`` (presumably raised by it).
    """
    ensure('item' in record, Invalid('`record` should contain "item"'))
    ensure('action' in record, Invalid('`record` should contain "action"'))

    # Only the lookup is guarded: a KeyError raised while building the action
    # itself is a different problem and must not be reported as an undefined
    # action.
    try:
        factory = mapping[record['item']][record['action']]
    except KeyError:
        ex = UndefinedAction(record['item'], record['action'])
        reraise(ex.__class__, ex, exc_info()[2])
    else:
        return factory(record, logger=logger)