| Anderson Bravalheri | 0446cd5 | 2018-08-17 15:26:19 +0100 | [diff] [blame] | 1 | # -*- coding: utf-8 -*- |
| 2 | ## |
| 3 | # Copyright 2018 University of Bristol - High Performance Networks Research |
| 4 | # Group |
| 5 | # All Rights Reserved. |
| 6 | # |
| 7 | # Contributors: Anderson Bravalheri, Dimitrios Gkounis, Abubakar Siddique |
| 8 | # Muqaddas, Navdeep Uniyal, Reza Nejabati and Dimitra Simeonidou |
| 9 | # |
| 10 | # Licensed under the Apache License, Version 2.0 (the "License"); you may |
| 11 | # not use this file except in compliance with the License. You may obtain |
| 12 | # a copy of the License at |
| 13 | # |
| 14 | # http://www.apache.org/licenses/LICENSE-2.0 |
| 15 | # |
| 16 | # Unless required by applicable law or agreed to in writing, software |
| 17 | # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT |
| 18 | # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the |
| 19 | # License for the specific language governing permissions and limitations |
| 20 | # under the License. |
| 21 | # |
| 22 | # For those usages not covered by the Apache License, Version 2.0 please |
| 23 | # contact with: <highperformance-networks@bristol.ac.uk> |
| 24 | # |
| 25 | # Neither the name of the University of Bristol nor the names of its |
| 26 | # contributors may be used to endorse or promote products derived from |
| 27 | # this software without specific prior written permission. |
| 28 | # |
| 29 | # This work has been performed in the context of DCMS UK 5G Testbeds |
| 30 | # & Trials Programme and in the framework of the Metro-Haul project - |
| 31 | # funded by the European Commission under Grant number 761727 through the |
| 32 | # Horizon 2020 and 5G-PPP programmes. |
| 33 | ## |
| 34 | # pylint: disable=E1101,E0203,W0201 |
| 35 | |
| 36 | """Common logic for task management""" |
| 37 | import logging |
| 38 | from time import time |
| 39 | from types import StringTypes |
| 40 | |
| 41 | from six.moves import range |
| 42 | |
| 43 | import yaml |
| 44 | |
| 45 | from ..utils import ( |
| 46 | filter_dict_keys, |
| 47 | filter_out_dict_keys, |
| 48 | merge_dicts, |
| 49 | remove_none_items, |
| 50 | truncate |
| 51 | ) |
| 52 | |
# Values returned by the ``processing`` property of the actions
PENDING, REFRESH, IGNORE = range(3)

# Default limits used by ``Action.defer``
TIMEOUT = 60 * 60  # seconds, i.e. 1 hour
MIN_ATTEMPTS = 10
| 57 | |
| 58 | |
class Action(object):
    """Create a basic object representing the action record.

    Arguments:
        record (dict): record as returned by the database
        logger: optional logger object. When omitted, the
            ``openmano.wim.action`` logger is used
        **kwargs: extra keyword arguments to overwrite the fields in record
    """

    # Markers used in the comments below:
    #   MD - property kept in memory and sent to the database
    #   M  - property kept in memory only (``as_record`` strips it)
    PROPERTIES = [
        'task_index',          # MD - Index number of the task.
                               #      This together with the instance_action_id
                               #      forms a unique key identifier
        'action',              # MD - CREATE, DELETE, FIND
        'item',                # MD - table name, eg. instance_wim_nets
        'item_id',             # MD - uuid of the referenced entry in the
                               #      previous table
        'instance_action_id',  # MD - reference to a cohesive group of actions
                               #      related to the same instance-scenario
        'wim_account_id',      # MD - reference to the WIM account used
                               #      by the thread/connector
        'wim_internal_id',     # MD - internal ID used by the WIM to refer to
                               #      the item
        'datacenter_vim_id',   # MD - reference to the VIM account used
                               #      by the thread/connector
        'vim_id',              # MD - internal ID used by the VIM to refer to
                               #      the item
        'status',              # MD - SCHEDULED,BUILD,DONE,FAILED,SUPERSEDED
        'extra',               # MD - text with yaml format at database,
        #                             dict at memory with:
        # `- params:     list with the params to be sent to the VIM for CREATE
        #                or FIND. For DELETE the vim_id is taken from other
        #                related tasks
        # `- find:       (only for CREATE tasks) if present it should FIND
        #                before creating and use if existing.
        #                Contains the FIND params
        # `- depends_on: list with the 'task_index'es of tasks that must be
        #                completed before. e.g. a vm creation depends on a net
        #                creation
        # `- sdn_net_id: used for net.
        # `- tries
        # `- created_items:
        #                dictionary with extra elements created that need
        #                to be deleted. e.g. ports, volumes,...
        # `- created:    False if the VIM element is not created by
        #                other actions, and it should not be deleted
        # `- wim_status: WIM status of the element. Stored also at database
        #                in the item table
        'params',              # M  - similar to extra[params]
        'depends_on',          # M  - similar to extra[depends_on]
        'depends',             # M  - dict with task_index(from depends_on) to
                               #      task class
        'error_msg',           # MD - descriptive text upon an error
        'created_at',          # MD - task DB creation time
        'modified_at',         # MD - last DB update time
        'process_at',          # M  - unix epoch when to process the task
    ]

    __slots__ = PROPERTIES + [
        'logger',
    ]

    def __init__(self, record, logger=None, **kwargs):
        self.logger = logger or logging.getLogger('openmano.wim.action')
        # Start with every property set to None, so all the slots are
        # populated even when the record does not mention them
        attrs = merge_dicts(dict.fromkeys(self.PROPERTIES), record, kwargs)
        self.update(_expand_extra(attrs))

    def __repr__(self):
        return super(Action, self).__repr__() + repr(self.as_dict())

    def as_dict(self, *fields):
        """Representation of the object as a dict

        Arguments:
            *fields: names of the properties to be included. Names that are
                not listed in ``PROPERTIES`` are ignored. When no name is
                given, all the properties are included.

        Returns:
            dict: property names mapped to the current values
        """
        attrs = (set(self.PROPERTIES) & set(fields)
                 if fields else self.PROPERTIES)
        return {k: getattr(self, k) for k in attrs}

    def as_record(self):
        """Returns a dict that can be send to the persistence layer

        The memory-only properties (``params``, ``depends_on``, ``depends``
        and ``process_at``) are removed from the top level of the record.
        Keys with ``None`` values are dropped by ``remove_none_items``.
        """
        special = ['params', 'depends_on', 'depends']
        record = self.as_dict()
        # ``record['extra']`` is the very same dict object as ``self.extra``,
        # therefore this also copies the special properties into it
        record['extra'].update(self.as_dict(*special))
        non_fields = special + ['process_at']

        return remove_none_items(filter_out_dict_keys(record, non_fields))

    def update(self, values=None, **kwargs):
        """Update the in-memory representation of the task (works similarly to
        dict.update). The update is NOT automatically persisted.

        Arguments:
            values (dict): properties to be changed
            **kwargs: properties to be changed, given as keyword arguments

        Keys that are not listed in ``PROPERTIES`` are silently ignored.
        """
        # "white-listed mass assignment"
        updates = merge_dicts(values, kwargs)
        for attr in set(self.PROPERTIES) & set(updates.keys()):
            setattr(self, attr, updates[attr])

    def save(self, persistence, **kwargs):
        """Persist current state of the object to the database.

        Arguments:
            persistence: object encapsulating the database
            **kwargs: extra properties to be updated before saving

        Returns:
            The value returned by ``persistence.update_action``

        Note:
            If any key word argument is passed, the object itself will be
            changed as an extra side-effect.
        """
        # The identifiers are read before applying ``kwargs``, so the row
        # being updated is the one matching the current key
        action_id = self.instance_action_id
        index = self.task_index
        if kwargs:
            self.update(kwargs)
        properties = self.as_record()

        return persistence.update_action(action_id, index, properties)

    def fail(self, persistence, reason, status='FAILED'):
        """Mark action as FAILED, updating tables accordingly

        Arguments:
            persistence: object encapsulating the database
            reason (str): explanation for the failure, stored (truncated) as
                ``error_msg``
            status (str): status to be assigned to the action

        Returns:
            The value returned by ``save``
        """
        persistence.update_instance_action_counters(
            self.instance_action_id,
            failed=1,
            # An action previously counted as done is not done anymore
            done=(-1 if self.status == 'DONE' else 0))

        self.status = status
        self.error_msg = truncate(reason)
        self.logger.error('%s %s: %s', self.id, status, reason)
        return self.save(persistence)

    def succeed(self, persistence, status='DONE'):
        """Mark action as DONE, updating tables accordingly

        Arguments:
            persistence: object encapsulating the database
            status (str): status to be assigned to the action

        Returns:
            The value returned by ``save``
        """
        persistence.update_instance_action_counters(
            self.instance_action_id, done=1)
        self.status = status
        self.logger.debug('%s %s', self.id, status)
        return self.save(persistence)

    def defer(self, persistence, reason,
              timeout=TIMEOUT, min_attempts=MIN_ATTEMPTS):
        """Postpone the task processing, taking care to not timeout.

        The action fails (instead of being deferred again) when more than
        ``timeout`` seconds passed since the first attempt AND more than
        ``min_attempts`` attempts were already made.

        Arguments:
            persistence: object encapsulating the database
            reason (str): explanation for the delay
            timeout (int): maximum delay tolerated since the first attempt.
                Note that this number is a time delta, in seconds
            min_attempts (int): Number of attempts to try before giving up.

        Returns:
            The value returned by ``save`` (or by ``fail`` when the limits
            are exceeded)
        """
        now = time()
        attempts = self.extra.get('attempts') or 0
        # Records written before ``first_attempted_at`` was tracked only
        # carry ``last_attempted_at``: use it as the best starting point
        first_attempt = (self.extra.get('first_attempted_at') or
                         self.extra.get('last_attempted_at') or now)
        elapsed = now - first_attempt

        if elapsed > timeout and attempts > min_attempts:
            # Give up: the action is reported as failed and it is not
            # deferred again. ``{:d}`` requires an integer.
            return self.fail(
                persistence,
                'Timeout reached. {} attempts in the last {:d} min'
                .format(attempts, int(elapsed // 60)))

        self.extra['first_attempted_at'] = first_attempt
        self.extra['last_attempted_at'] = now
        self.extra['attempts'] = attempts + 1
        self.logger.info('%s DEFERRED: %s', self.id, reason)
        return self.save(persistence)

    @property
    def group_key(self):
        """Key defining the group to which this tasks belongs"""
        return (self.item, self.item_id)

    @property
    def processing(self):
        """Processing status for the task (PENDING, REFRESH, IGNORE)"""
        if self.status == 'SCHEDULED':
            return PENDING

        return IGNORE

    @property
    def id(self):
        """Unique identifier of this particular action"""
        return '{}[{}]'.format(self.instance_action_id, self.task_index)

    @property
    def is_scheduled(self):
        """True when the status is SCHEDULED"""
        return self.status == 'SCHEDULED'

    @property
    def is_build(self):
        """True when the status is BUILD"""
        return self.status == 'BUILD'

    @property
    def is_done(self):
        """True when the status is DONE"""
        return self.status == 'DONE'

    @property
    def is_failed(self):
        """True when the status is FAILED"""
        return self.status == 'FAILED'

    @property
    def is_superseded(self):
        """True when the status is SUPERSEDED"""
        return self.status == 'SUPERSEDED'

    def refresh(self, connector, persistence):
        """Use the connector/persistence to refresh the status of the item.

        After the item status is refreshed any change in the task should be
        persisted to the database.

        This default implementation only logs that there is nothing to do.

        Arguments:
            connector: object containing the classes to access the WIM or VIM
            persistence: object containing the methods necessary to query the
                database and to persist the updates
        """
        self.logger.debug(
            'Action `%s` has no refresh to be done',
            self.__class__.__name__)

    def expand_dependency_links(self, task_group):
        """Expand task indexes into references to the actual tasks

        The result is stored in ``depends`` as a dict whose keys have the
        form ``TASK-<index>``. Nothing is done if there are no dependencies
        or if ``depends`` is already a non-empty dict.

        Arguments:
            task_group: sequence of tasks, accessed by position using the
                indexes listed in ``depends_on``
        """
        if not self.depends_on or (
                isinstance(self.depends, dict) and self.depends):
            return

        num_tasks = len(task_group)
        # Only accept tasks stored at the position matching their own
        # ``task_index`` and belonging to the same instance action
        references = {
            "TASK-{}".format(i): task_group[i]
            for i in self.depends_on
            if i < num_tasks and task_group[i].task_index == i and
            task_group[i].instance_action_id == self.instance_action_id
        }
        self.depends = references

    def become_superseded(self, superseding):
        """When another action tries to supersede this one,
        we need to change both of them, so the surviving actions will be
        logic consistent.

        This method should do the required internal changes, and also
        suggest changes for the other, superseding, action.

        Arguments:
            superseding: other task superseding this one

        Returns:
            dict: changes suggested to the action superseding this one.
                A special key ``superseding_needed`` is used to
                suggest if the superseding is actually required or not.
                If not present, ``superseding_needed`` is assumed to
                be False.
        """
        self.status = 'SUPERSEDED'
        self.logger.debug(
            'Action `%s` was superseded by `%s`',
            self.__class__.__name__, superseding.__class__.__name__)
        return {}

    def supersede(self, others):
        """Supersede other tasks, if necessary

        Arguments:
            others (list): action objects being superseded

        When the task decide to supersede others, this method should call
        ``become_superseded`` on the other actions, collect the suggested
        updates and perform the necessary changes
        """
        # By default actions don't supersede others
        self.logger.debug(
            'Action `%s` does not supersede other actions',
            self.__class__.__name__)

    def process(self, connector, persistence, ovim):
        """Abstract method, that needs to be implemented.
        Process the current task.

        Arguments:
            connector: object with API for accessing the WAN
                Infrastructure Manager system
            persistence: abstraction layer for the database
            ovim: instance of openvim, abstraction layer that enable
                SDN-related operations

        Raises:
            NotImplementedError: always, unless overridden
        """
        raise NotImplementedError
| 336 | |
| 337 | |
class FindAction(Action):
    """Base class for the FIND actions. It is abstract and should be
    specialised according to the item type.
    """

    @property
    def processing(self):
        """Processing status: REFRESH once the status is DONE or BUILD,
        otherwise whatever the parent class reports.
        """
        needs_refresh = self.status in ('DONE', 'BUILD')
        return REFRESH if needs_refresh else super(FindAction, self).processing

    def become_superseded(self, superseding):
        """Run the parent class logic and suggest the known identifiers
        (``vim_id`` and ``wim_internal_id``) to the superseding action.

        Arguments:
            superseding: other task superseding this one

        Returns:
            dict: the identifiers, after being filtered by
                ``remove_none_items``
        """
        super(FindAction, self).become_superseded(superseding)

        suggestion = {}
        for field in ('vim_id', 'wim_internal_id'):
            suggestion[field] = getattr(self, field)

        return remove_none_items(suggestion)
| 353 | |
| 354 | |
class CreateAction(Action):
    """Base class for the CREATE actions. It is abstract and should be
    specialised according to the item type.
    """

    @property
    def processing(self):
        """Processing status: REFRESH once the status is DONE or BUILD,
        otherwise whatever the parent class reports.
        """
        if self.status not in ('DONE', 'BUILD'):
            return super(CreateAction, self).processing

        return REFRESH

    def become_superseded(self, superseding):
        """Run the parent class logic and, if this action is flagged as
        created and holds any identifier, hand the identifiers over to the
        superseding action.

        Arguments:
            superseding: other task superseding this one

        Returns:
            dict: empty when ``extra['created']`` is false or when none of
                ``wim_internal_id``, ``vim_id`` and ``extra['sdn_net_id']``
                is set. Otherwise a dict with ``superseding_needed`` set to
                True, the identifiers and a subset of ``extra``.
        """
        super(CreateAction, self).become_superseded(superseding)

        # ``created`` is assumed to be True when absent
        was_created = self.extra.get('created', True)
        identifier = (self.wim_internal_id or self.vim_id or
                      self.extra.get('sdn_net_id'))
        if not was_created or not identifier:
            return {}

        inherited = filter_dict_keys(
            self.extra or {}, ('sdn_net_id', 'interfaces', 'created_items'))

        return {
            'superseding_needed': True,
            'wim_internal_id': self.wim_internal_id,
            'vim_id': self.vim_id,
            'extra': remove_none_items(inherited),
        }
| 382 | |
| 383 | |
class DeleteAction(Action):
    """Base class for the DELETE actions. It is abstract and should be
    specialised according to the item type.
    """

    def supersede(self, others):
        """Supersede the given actions, absorbing the changes they suggest.

        Each of the other actions is asked to ``become_superseded``. The
        suggested ``extra`` entries are merged into ``self.extra``
        (``created_items`` dicts are merged together), and the remaining
        suggestions whose keys are listed in ``PROPERTIES`` are assigned to
        this action. If no suggestion carries a true ``superseding_needed``,
        this action is marked as SUPERSEDED as well.

        Arguments:
            others (list): action objects being superseded
        """
        self.logger.debug('%s %s %s %s might supersede other actions',
                          self.id, self.action, self.item, self.item_id)

        # Gather the suggestions from every superseded task
        suggestions = [task.become_superseded(self) for task in others]
        still_needed = any(suggestion.pop('superseding_needed', False)
                           for suggestion in suggestions)

        # The nested dicts are handled apart from the flat properties
        nested = []
        created = []
        for suggestion in suggestions:
            extra = suggestion.pop('extra', None) or {}
            created.append(extra.pop('created_items', None) or {})
            nested.append(extra)

        own_items = self.extra.get('created_items', {})
        all_items = merge_dicts(own_items, *created)
        self.extra = merge_dicts(
            self.extra, {'created_items': all_items}, *nested)

        # What is left in the suggestions are plain properties
        for attr, value in merge_dicts(*suggestions).items():
            if attr in self.PROPERTIES:
                setattr(self, attr, value)

        # If nothing requires this action, it is not needed either
        if not still_needed:
            self.status = 'SUPERSEDED'
| 411 | |
| 412 | |
def _expand_extra(record):
    """Convert the ``extra`` field of a record into a dict and expose some of
    its entries as top-level fields.

    Arguments:
        record (dict): action record. It is changed in place.

    Returns:
        dict: the same ``record`` object, where ``extra`` is a dict (parsed
            from YAML if it was text) and ``params``, ``depends_on`` and
            ``depends`` are copied from it (``depends_on`` defaults to an
            empty list, the others to None).
    """
    extra = record.pop('extra', None) or {}
    if isinstance(extra, StringTypes):
        # An empty YAML document (blank text, ``null``, ...) is loaded as
        # None, so fall back to an empty dict to keep the lookups working
        extra = yaml.safe_load(extra) or {}

    record['params'] = extra.get('params')
    record['depends_on'] = extra.get('depends_on', [])
    record['depends'] = extra.get('depends', None)
    record['extra'] = extra

    return record