+++ /dev/null
-# -*- coding: utf-8 -*-
-##
-# Copyright 2018 University of Bristol - High Performance Networks Research
-# Group
-# All Rights Reserved.
-#
-# Contributors: Anderson Bravalheri, Dimitrios Gkounis, Abubakar Siddique
-# Muqaddas, Navdeep Uniyal, Reza Nejabati and Dimitra Simeonidou
-#
-# Licensed under the Apache License, Version 2.0 (the "License"); you may
-# not use this file except in compliance with the License. You may obtain
-# a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-# License for the specific language governing permissions and limitations
-# under the License.
-#
-# For those usages not covered by the Apache License, Version 2.0 please
-# contact with: <highperformance-networks@bristol.ac.uk>
-#
-# Neither the name of the University of Bristol nor the names of its
-# contributors may be used to endorse or promote products derived from
-# this software without specific prior written permission.
-#
-# This work has been performed in the context of DCMS UK 5G Testbeds
-# & Trials Programme and in the framework of the Metro-Haul project -
-# funded by the European Commission under Grant number 761727 through the
-# Horizon 2020 and 5G-PPP programmes.
-##
-# pylint: disable=E1101,E0203,W0201
-
-"""Common logic for task management"""
-import logging
-from time import time
-
-import yaml
-
-from ..utils import (
- filter_dict_keys,
- filter_out_dict_keys,
- merge_dicts,
- remove_none_items,
- truncate
-)
-
-PENDING, REFRESH, IGNORE = range(3)
-
-TIMEOUT = 1 * 60 * 60 # 1 hour
-MIN_ATTEMPTS = 10
-
-
-class Action(object):
- """Create a basic object representing the action record.
-
- Arguments:
- record (dict): record as returned by the database
- **kwargs: extra keyword arguments to overwrite the fields in record
- """
-
- PROPERTIES = [
- 'task_index', # MD - Index number of the task.
- # This together with the instance_action_id
- # forms a unique key identifier
- 'action', # MD - CREATE, DELETE, FIND
- 'item', # MD - table name, eg. instance_wim_nets
- 'item_id', # MD - uuid of the referenced entry in the
- # previous table
- 'instance_action_id', # MD - reference to a cohesive group of actions
- # related to the same instance-scenario
- 'wim_account_id', # MD - reference to the WIM account used
- # by the thread/connector
- 'wim_internal_id', # MD - internal ID used by the WIM to refer to
- # the item
- 'datacenter_vim_id', # MD - reference to the VIM account used
- # by the thread/connector
- 'vim_id', # MD - internal ID used by the VIM to refer to
- # the item
- 'status', # MD - SCHEDULED,BUILD,DONE,FAILED,SUPERSEDED
- 'extra', # MD - text with yaml format at database,
- # dict at memory with:
- # `- params: list with the params to be sent to the VIM for CREATE
- # or FIND. For DELETE the vim_id is taken from other
- # related tasks
- # `- find: (only for CREATE tasks) if present it should FIND
- # before creating and use if existing.
- # Contains the FIND params
- # `- depends_on: list with the 'task_index'es of tasks that must be
- # completed before. e.g. a vm creation depends on a net
- # creation
- # `- sdn_net_id: used for net.
- # `- tries
- # `- created_items:
- # dictionary with extra elements created that need
- # to be deleted. e.g. ports,
- # `- volumes,...
- # `- created: False if the VIM element is not created by
- # other actions, and it should not be deleted
- # `- wim_status: WIM status of the element. Stored also at database
- # in the item table
- 'params', # M - similar to extra[params]
- 'depends_on', # M - similar to extra[depends_on]
- 'depends', # M - dict with task_index(from depends_on) to
- # task class
- 'error_msg', # MD - descriptive text upon an error
- 'created_at', # MD - task DB creation time
- 'modified_at', # MD - last DB update time
- 'process_at', # M - unix epoch when to process the task
- ]
-
- __slots__ = PROPERTIES + [
- 'logger',
- ]
-
- def __init__(self, record, logger=None, **kwargs):
- self.logger = logger or logging.getLogger('openmano.wim.action')
- attrs = merge_dicts(dict.fromkeys(self.PROPERTIES), record, kwargs)
- self.update(_expand_extra(attrs))
-
- def __repr__(self):
- return super(Action, self).__repr__() + repr(self.as_dict())
-
- def as_dict(self, *fields):
- """Representation of the object as a dict"""
- attrs = (set(self.PROPERTIES) & set(fields)
- if fields else self.PROPERTIES)
- return {k: getattr(self, k) for k in attrs}
-
- def as_record(self):
- """Returns a dict that can be send to the persistence layer"""
- special = ['params', 'depends_on', 'depends']
- record = self.as_dict()
- record['extra'].update(self.as_dict(*special))
- non_fields = special + ['process_at']
-
- return remove_none_items(filter_out_dict_keys(record, non_fields))
-
- def update(self, values=None, **kwargs):
- """Update the in-memory representation of the task (works similarly to
- dict.update). The update is NOT automatically persisted.
- """
- # "white-listed mass assignment"
- updates = merge_dicts(values, kwargs)
- for attr in set(self.PROPERTIES) & set(updates.keys()):
- setattr(self, attr, updates[attr])
-
- def save(self, persistence, **kwargs):
- """Persist current state of the object to the database.
-
- Arguments:
- persistence: object encapsulating the database
- **kwargs: extra properties to be updated before saving
-
- Note:
- If any key word argument is passed, the object itself will be
- changed as an extra side-effect.
- """
- action_id = self.instance_action_id
- index = self.task_index
- if kwargs:
- self.update(kwargs)
- properties = self.as_record()
-
- return persistence.update_action(action_id, index, properties)
-
- def fail(self, persistence, reason, status='FAILED'):
- """Mark action as FAILED, updating tables accordingly"""
- persistence.update_instance_action_counters(
- self.instance_action_id,
- failed=1,
- done=(-1 if self.status == 'DONE' else 0))
-
- self.status = status
- self.error_msg = truncate(reason)
- self.logger.error('%s %s: %s', self.id, status, reason)
- return self.save(persistence)
-
- def succeed(self, persistence, status='DONE'):
- """Mark action as DONE, updating tables accordingly"""
- persistence.update_instance_action_counters(
- self.instance_action_id, done=1)
- self.status = status
- self.logger.debug('%s %s', self.id, status)
- return self.save(persistence)
-
- def defer(self, persistence, reason,
- timeout=TIMEOUT, min_attempts=MIN_ATTEMPTS):
- """Postpone the task processing, taking care to not timeout.
-
- Arguments:
- persistence: object encapsulating the database
- reason (str): explanation for the delay
- timeout (int): maximum delay tolerated since the first attempt.
- Note that this number is a time delta, in seconds
- min_attempts (int): Number of attempts to try before giving up.
- """
- now = time()
- last_attempt = self.extra.get('last_attempted_at') or time()
- attempts = self.extra.get('attempts') or 0
-
- if last_attempt - now > timeout and attempts > min_attempts:
- self.fail(persistence,
- 'Timeout reached. {} attempts in the last {:d} min'
- .format(attempts, last_attempt / 60))
-
- self.extra['last_attempted_at'] = time()
- self.extra['attempts'] = attempts + 1
- self.logger.info('%s DEFERRED: %s', self.id, reason)
- return self.save(persistence)
-
- @property
- def group_key(self):
- """Key defining the group to which this tasks belongs"""
- return (self.item, self.item_id)
-
- @property
- def processing(self):
- """Processing status for the task (PENDING, REFRESH, IGNORE)"""
- if self.status == 'SCHEDULED':
- return PENDING
-
- return IGNORE
-
- @property
- def id(self):
- """Unique identifier of this particular action"""
- return '{}[{}]'.format(self.instance_action_id, self.task_index)
-
- @property
- def is_scheduled(self):
- return self.status == 'SCHEDULED'
-
- @property
- def is_build(self):
- return self.status == 'BUILD'
-
- @property
- def is_done(self):
- return self.status == 'DONE'
-
- @property
- def is_failed(self):
- return self.status == 'FAILED'
-
- @property
- def is_superseded(self):
- return self.status == 'SUPERSEDED'
-
- def refresh(self, connector, persistence):
- """Use the connector/persistence to refresh the status of the item.
-
- After the item status is refreshed any change in the task should be
- persisted to the database.
-
- Arguments:
- connector: object containing the classes to access the WIM or VIM
- persistence: object containing the methods necessary to query the
- database and to persist the updates
- """
- self.logger.debug(
- 'Action `%s` has no refresh to be done',
- self.__class__.__name__)
-
- def expand_dependency_links(self, task_group):
- """Expand task indexes into actual IDs"""
- if not self.depends_on or (
- isinstance(self.depends, dict) and self.depends):
- return
-
- num_tasks = len(task_group)
- references = {
- "TASK-{}".format(i): task_group[i]
- for i in self.depends_on
- if i < num_tasks and task_group[i].task_index == i and
- task_group[i].instance_action_id == self.instance_action_id
- }
- self.depends = references
-
- def become_superseded(self, superseding):
- """When another action tries to supersede this one,
- we need to change both of them, so the surviving actions will be
- logic consistent.
-
- This method should do the required internal changes, and also
- suggest changes for the other, superseding, action.
-
- Arguments:
- superseding: other task superseding this one
-
- Returns:
- dict: changes suggested to the action superseding this one.
- A special key ``superseding_needed`` is used to
- suggest if the superseding is actually required or not.
- If not present, ``superseding_needed`` is assumed to
- be False.
- """
- self.status = 'SUPERSEDED'
- self.logger.debug(
- 'Action `%s` was superseded by `%s`',
- self.__class__.__name__, superseding.__class__.__name__)
- return {}
-
- def supersede(self, others):
- """Supersede other tasks, if necessary
-
- Arguments:
- others (list): action objects being superseded
-
- When the task decide to supersede others, this method should call
- ``become_superseded`` on the other actions, collect the suggested
- updates and perform the necessary changes
- """
- # By default actions don't supersede others
- self.logger.debug(
- 'Action `%s` does not supersede other actions',
- self.__class__.__name__)
-
- def process(self, connector, persistence, ovim):
- """Abstract method, that needs to be implemented.
- Process the current task.
-
- Arguments:
- connector: object with API for accessing the WAN
- Infrastructure Manager system
- persistence: abstraction layer for the database
- ovim: instance of openvim, abstraction layer that enable
- SDN-related operations
- """
- raise NotImplementedError
-
-
-class FindAction(Action):
- """Abstract class that should be inherited for FIND actions, depending on
- the item type.
- """
- @property
- def processing(self):
- if self.status in ('DONE', 'BUILD'):
- return REFRESH
-
- return super(FindAction, self).processing
-
- def become_superseded(self, superseding):
- super(FindAction, self).become_superseded(superseding)
- info = ('vim_id', 'wim_internal_id')
- return remove_none_items({f: getattr(self, f) for f in info})
-
-
-class CreateAction(Action):
- """Abstract class that should be inherited for CREATE actions, depending on
- the item type.
- """
- @property
- def processing(self):
- if self.status in ('DONE', 'BUILD'):
- return REFRESH
-
- return super(CreateAction, self).processing
-
- def become_superseded(self, superseding):
- super(CreateAction, self).become_superseded(superseding)
-
- created = self.extra.get('created', True)
- sdn_net_id = self.extra.get('sdn_net_id')
- pending_info = self.wim_internal_id or self.vim_id or sdn_net_id
- if not(created and pending_info):
- return {}
-
- extra_fields = ('sdn_net_id', 'interfaces', 'created_items')
- extra_info = filter_dict_keys(self.extra or {}, extra_fields)
-
- return {'superseding_needed': True,
- 'wim_internal_id': self.wim_internal_id,
- 'vim_id': self.vim_id,
- 'extra': remove_none_items(extra_info)}
-
-
-class DeleteAction(Action):
- """Abstract class that should be inherited for DELETE actions, depending on
- the item type.
- """
- def supersede(self, others):
- self.logger.debug('%s %s %s %s might supersede other actions',
- self.id, self.action, self.item, self.item_id)
- # First collect all the changes from the superseded tasks
- changes = [other.become_superseded(self) for other in others]
- needed = any(change.pop('superseding_needed', False)
- for change in changes)
-
- # Deal with the nested ones first
- extras = [change.pop('extra', None) or {} for change in changes]
- items = [extra.pop('created_items', None) or {} for extra in extras]
- items = merge_dicts(self.extra.get('created_items', {}), *items)
- self.extra = merge_dicts(self.extra, {'created_items': items}, *extras)
-
- # Accept the other ones
- change = ((key, value) for key, value in merge_dicts(*changes).items()
- if key in self.PROPERTIES)
- for attr, value in change:
- setattr(self, attr, value)
-
- # Reevaluate if the action itself is needed
- if not needed:
- self.status = 'SUPERSEDED'
-
-
-def _expand_extra(record):
- extra = record.pop('extra', None) or {}
- if isinstance(extra, str):
- extra = yaml.safe_load(extra)
-
- record['params'] = extra.get('params')
- record['depends_on'] = extra.get('depends_on', [])
- record['depends'] = extra.get('depends', None)
- record['extra'] = extra
-
- return record