--- /dev/null
+# -*- coding: utf-8 -*-
+##
+# Copyright 2018 University of Bristol - High Performance Networks Research
+# Group
+# All Rights Reserved.
+#
+# Contributors: Anderson Bravalheri, Dimitrios Gkounis, Abubakar Siddique
+# Muqaddas, Navdeep Uniyal, Reza Nejabati and Dimitra Simeonidou
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+#
+# For those usages not covered by the Apache License, Version 2.0 please
+# contact with: <highperformance-networks@bristol.ac.uk>
+#
+# Neither the name of the University of Bristol nor the names of its
+# contributors may be used to endorse or promote products derived from
+# this software without specific prior written permission.
+#
+# This work has been performed in the context of DCMS UK 5G Testbeds
+# & Trials Programme and in the framework of the Metro-Haul project -
+# funded by the European Commission under Grant number 761727 through the
+# Horizon 2020 and 5G-PPP programmes.
+##
+# pylint: disable=E1101,E0203,W0201
+
+"""Common logic for task management"""
+import logging
+from time import time
+from types import StringTypes
+
+from six.moves import range
+
+import yaml
+
+from ..utils import (
+ filter_dict_keys,
+ filter_out_dict_keys,
+ merge_dicts,
+ remove_none_items,
+ truncate
+)
+
+PENDING, REFRESH, IGNORE = range(3)
+
+TIMEOUT = 1 * 60 * 60 # 1 hour
+MIN_ATTEMPTS = 10
+
+
+class Action(object):
+ """Create a basic object representing the action record.
+
+ Arguments:
+ record (dict): record as returned by the database
+ **kwargs: extra keyword arguments to overwrite the fields in record
+ """
+
+ PROPERTIES = [
+ 'task_index', # MD - Index number of the task.
+ # This together with the instance_action_id
+ # forms a unique key identifier
+ 'action', # MD - CREATE, DELETE, FIND
+ 'item', # MD - table name, eg. instance_wim_nets
+ 'item_id', # MD - uuid of the referenced entry in the
+ # previous table
+ 'instance_action_id', # MD - reference to a cohesive group of actions
+ # related to the same instance-scenario
+ 'wim_account_id', # MD - reference to the WIM account used
+ # by the thread/connector
+ 'wim_internal_id', # MD - internal ID used by the WIM to refer to
+ # the item
+ 'datacenter_vim_id', # MD - reference to the VIM account used
+ # by the thread/connector
+ 'vim_id', # MD - internal ID used by the VIM to refer to
+ # the item
+ 'status', # MD - SCHEDULED,BUILD,DONE,FAILED,SUPERSEDED
+ 'extra', # MD - text with yaml format at database,
+ # dict at memory with:
+ # `- params: list with the params to be sent to the VIM for CREATE
+ # or FIND. For DELETE the vim_id is taken from other
+ # related tasks
+ # `- find: (only for CREATE tasks) if present it should FIND
+ # before creating and use if existing.
+ # Contains the FIND params
+ # `- depends_on: list with the 'task_index'es of tasks that must be
+ # completed before. e.g. a vm creation depends on a net
+ # creation
+ # `- sdn_net_id: used for net.
+ # `- tries
+ # `- created_items:
+ # dictionary with extra elements created that need
+ # to be deleted. e.g. ports,
+ # `- volumes,...
+ # `- created: False if the VIM element is not created by
+ # other actions, and it should not be deleted
+ # `- wim_status: WIM status of the element. Stored also at database
+ # in the item table
+ 'params', # M - similar to extra[params]
+ 'depends_on', # M - similar to extra[depends_on]
+ 'depends', # M - dict with task_index(from depends_on) to
+ # task class
+ 'error_msg', # MD - descriptive text upon an error
+ 'created_at', # MD - task DB creation time
+ 'modified_at', # MD - last DB update time
+ 'process_at', # M - unix epoch when to process the task
+ ]
+
+ __slots__ = PROPERTIES + [
+ 'logger',
+ ]
+
+ def __init__(self, record, logger=None, **kwargs):
+ self.logger = logger or logging.getLogger('openmano.wim.action')
+ attrs = merge_dicts(dict.fromkeys(self.PROPERTIES), record, kwargs)
+ self.update(_expand_extra(attrs))
+
+ def __repr__(self):
+ return super(Action, self).__repr__() + repr(self.as_dict())
+
+ def as_dict(self, *fields):
+ """Representation of the object as a dict"""
+ attrs = (set(self.PROPERTIES) & set(fields)
+ if fields else self.PROPERTIES)
+ return {k: getattr(self, k) for k in attrs}
+
+ def as_record(self):
+ """Returns a dict that can be send to the persistence layer"""
+ special = ['params', 'depends_on', 'depends']
+ record = self.as_dict()
+ record['extra'].update(self.as_dict(*special))
+ non_fields = special + ['process_at']
+
+ return remove_none_items(filter_out_dict_keys(record, non_fields))
+
+ def update(self, values=None, **kwargs):
+ """Update the in-memory representation of the task (works similarly to
+ dict.update). The update is NOT automatically persisted.
+ """
+ # "white-listed mass assignment"
+ updates = merge_dicts(values, kwargs)
+ for attr in set(self.PROPERTIES) & set(updates.keys()):
+ setattr(self, attr, updates[attr])
+
+ def save(self, persistence, **kwargs):
+ """Persist current state of the object to the database.
+
+ Arguments:
+ persistence: object encapsulating the database
+ **kwargs: extra properties to be updated before saving
+
+ Note:
+ If any key word argument is passed, the object itself will be
+ changed as an extra side-effect.
+ """
+ action_id = self.instance_action_id
+ index = self.task_index
+ if kwargs:
+ self.update(kwargs)
+ properties = self.as_record()
+
+ return persistence.update_action(action_id, index, properties)
+
+ def fail(self, persistence, reason, status='FAILED'):
+ """Mark action as FAILED, updating tables accordingly"""
+ persistence.update_instance_action_counters(
+ self.instance_action_id,
+ failed=1,
+ done=(-1 if self.status == 'DONE' else 0))
+
+ self.status = status
+ self.error_msg = truncate(reason)
+ self.logger.error('%s %s: %s', self.id, status, reason)
+ return self.save(persistence)
+
+ def succeed(self, persistence, status='DONE'):
+ """Mark action as DONE, updating tables accordingly"""
+ persistence.update_instance_action_counters(
+ self.instance_action_id, done=1)
+ self.status = status
+ self.logger.debug('%s %s', self.id, status)
+ return self.save(persistence)
+
+ def defer(self, persistence, reason,
+ timeout=TIMEOUT, min_attempts=MIN_ATTEMPTS):
+ """Postpone the task processing, taking care to not timeout.
+
+ Arguments:
+ persistence: object encapsulating the database
+ reason (str): explanation for the delay
+ timeout (int): maximum delay tolerated since the first attempt.
+ Note that this number is a time delta, in seconds
+ min_attempts (int): Number of attempts to try before giving up.
+ """
+ now = time()
+ last_attempt = self.extra.get('last_attempted_at') or time()
+ attempts = self.extra.get('attempts') or 0
+
+ if last_attempt - now > timeout and attempts > min_attempts:
+ self.fail(persistence,
+ 'Timeout reached. {} attempts in the last {:d} min'
+ .format(attempts, last_attempt / 60))
+
+ self.extra['last_attempted_at'] = time()
+ self.extra['attempts'] = attempts + 1
+ self.logger.info('%s DEFERRED: %s', self.id, reason)
+ return self.save(persistence)
+
+ @property
+ def group_key(self):
+ """Key defining the group to which this tasks belongs"""
+ return (self.item, self.item_id)
+
+ @property
+ def processing(self):
+ """Processing status for the task (PENDING, REFRESH, IGNORE)"""
+ if self.status == 'SCHEDULED':
+ return PENDING
+
+ return IGNORE
+
+ @property
+ def id(self):
+ """Unique identifier of this particular action"""
+ return '{}[{}]'.format(self.instance_action_id, self.task_index)
+
+ @property
+ def is_scheduled(self):
+ return self.status == 'SCHEDULED'
+
+ @property
+ def is_build(self):
+ return self.status == 'BUILD'
+
+ @property
+ def is_done(self):
+ return self.status == 'DONE'
+
+ @property
+ def is_failed(self):
+ return self.status == 'FAILED'
+
+ @property
+ def is_superseded(self):
+ return self.status == 'SUPERSEDED'
+
+ def refresh(self, connector, persistence):
+ """Use the connector/persistence to refresh the status of the item.
+
+ After the item status is refreshed any change in the task should be
+ persisted to the database.
+
+ Arguments:
+ connector: object containing the classes to access the WIM or VIM
+ persistence: object containing the methods necessary to query the
+ database and to persist the updates
+ """
+ self.logger.debug(
+ 'Action `%s` has no refresh to be done',
+ self.__class__.__name__)
+
+ def expand_dependency_links(self, task_group):
+ """Expand task indexes into actual IDs"""
+ if not self.depends_on or (
+ isinstance(self.depends, dict) and self.depends):
+ return
+
+ num_tasks = len(task_group)
+ references = {
+ "TASK-{}".format(i): task_group[i]
+ for i in self.depends_on
+ if i < num_tasks and task_group[i].task_index == i and
+ task_group[i].instance_action_id == self.instance_action_id
+ }
+ self.depends = references
+
+ def become_superseded(self, superseding):
+ """When another action tries to supersede this one,
+ we need to change both of them, so the surviving actions will be
+ logic consistent.
+
+ This method should do the required internal changes, and also
+ suggest changes for the other, superseding, action.
+
+ Arguments:
+ superseding: other task superseding this one
+
+ Returns:
+ dict: changes suggested to the action superseding this one.
+ A special key ``superseding_needed`` is used to
+ suggest if the superseding is actually required or not.
+ If not present, ``superseding_needed`` is assumed to
+ be False.
+ """
+ self.status = 'SUPERSEDED'
+ self.logger.debug(
+ 'Action `%s` was superseded by `%s`',
+ self.__class__.__name__, superseding.__class__.__name__)
+ return {}
+
+ def supersede(self, others):
+ """Supersede other tasks, if necessary
+
+ Arguments:
+ others (list): action objects being superseded
+
+ When the task decide to supersede others, this method should call
+ ``become_superseded`` on the other actions, collect the suggested
+ updates and perform the necessary changes
+ """
+ # By default actions don't supersede others
+ self.logger.debug(
+ 'Action `%s` does not supersede other actions',
+ self.__class__.__name__)
+
+ def process(self, connector, persistence, ovim):
+ """Abstract method, that needs to be implemented.
+ Process the current task.
+
+ Arguments:
+ connector: object with API for accessing the WAN
+ Infrastructure Manager system
+ persistence: abstraction layer for the database
+ ovim: instance of openvim, abstraction layer that enable
+ SDN-related operations
+ """
+ raise NotImplementedError
+
+
+class FindAction(Action):
+ """Abstract class that should be inherited for FIND actions, depending on
+ the item type.
+ """
+ @property
+ def processing(self):
+ if self.status in ('DONE', 'BUILD'):
+ return REFRESH
+
+ return super(FindAction, self).processing
+
+ def become_superseded(self, superseding):
+ super(FindAction, self).become_superseded(superseding)
+ info = ('vim_id', 'wim_internal_id')
+ return remove_none_items({f: getattr(self, f) for f in info})
+
+
+class CreateAction(Action):
+ """Abstract class that should be inherited for CREATE actions, depending on
+ the item type.
+ """
+ @property
+ def processing(self):
+ if self.status in ('DONE', 'BUILD'):
+ return REFRESH
+
+ return super(CreateAction, self).processing
+
+ def become_superseded(self, superseding):
+ super(CreateAction, self).become_superseded(superseding)
+
+ created = self.extra.get('created', True)
+ sdn_net_id = self.extra.get('sdn_net_id')
+ pending_info = self.wim_internal_id or self.vim_id or sdn_net_id
+ if not(created and pending_info):
+ return {}
+
+ extra_fields = ('sdn_net_id', 'interfaces', 'created_items')
+ extra_info = filter_dict_keys(self.extra or {}, extra_fields)
+
+ return {'superseding_needed': True,
+ 'wim_internal_id': self.wim_internal_id,
+ 'vim_id': self.vim_id,
+ 'extra': remove_none_items(extra_info)}
+
+
+class DeleteAction(Action):
+ """Abstract class that should be inherited for DELETE actions, depending on
+ the item type.
+ """
+ def supersede(self, others):
+ self.logger.debug('%s %s %s %s might supersede other actions',
+ self.id, self.action, self.item, self.item_id)
+ # First collect all the changes from the superseded tasks
+ changes = [other.become_superseded(self) for other in others]
+ needed = any(change.pop('superseding_needed', False)
+ for change in changes)
+
+ # Deal with the nested ones first
+ extras = [change.pop('extra', None) or {} for change in changes]
+ items = [extra.pop('created_items', None) or {} for extra in extras]
+ items = merge_dicts(self.extra.get('created_items', {}), *items)
+ self.extra = merge_dicts(self.extra, {'created_items': items}, *extras)
+
+ # Accept the other ones
+ change = ((key, value) for key, value in merge_dicts(*changes).items()
+ if key in self.PROPERTIES)
+ for attr, value in change:
+ setattr(self, attr, value)
+
+ # Reevaluate if the action itself is needed
+ if not needed:
+ self.status = 'SUPERSEDED'
+
+
+def _expand_extra(record):
+ extra = record.pop('extra', None) or {}
+ if isinstance(extra, StringTypes):
+ extra = yaml.safe_load(extra)
+
+ record['params'] = extra.get('params')
+ record['depends_on'] = extra.get('depends_on', [])
+ record['depends'] = extra.get('depends', None)
+ record['extra'] = extra
+
+ return record