X-Git-Url: https://osm.etsi.org/gitweb/?a=blobdiff_plain;f=osm_ro%2Fwim%2Factions.py;h=e199fd07a00bf8a238769b01cfe95c76a4e4dd80;hb=HEAD;hp=f224460858d43bd058c697d251da0b6bfaacd259;hpb=0446cd5df24c38f95cea13b995c553e9b2403f21;p=osm%2FRO.git diff --git a/osm_ro/wim/actions.py b/osm_ro/wim/actions.py deleted file mode 100644 index f2244608..00000000 --- a/osm_ro/wim/actions.py +++ /dev/null @@ -1,423 +0,0 @@ -# -*- coding: utf-8 -*- -## -# Copyright 2018 University of Bristol - High Performance Networks Research -# Group -# All Rights Reserved. -# -# Contributors: Anderson Bravalheri, Dimitrios Gkounis, Abubakar Siddique -# Muqaddas, Navdeep Uniyal, Reza Nejabati and Dimitra Simeonidou -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. -# -# For those usages not covered by the Apache License, Version 2.0 please -# contact with: -# -# Neither the name of the University of Bristol nor the names of its -# contributors may be used to endorse or promote products derived from -# this software without specific prior written permission. -# -# This work has been performed in the context of DCMS UK 5G Testbeds -# & Trials Programme and in the framework of the Metro-Haul project - -# funded by the European Commission under Grant number 761727 through the -# Horizon 2020 and 5G-PPP programmes. -## -# pylint: disable=E1101,E0203,W0201 - -"""Common logic for task management""" -import logging -from time import time -from types import StringTypes - -from six.moves import range - -import yaml - -from ..utils import ( - filter_dict_keys, - filter_out_dict_keys, - merge_dicts, - remove_none_items, - truncate -) - -PENDING, REFRESH, IGNORE = range(3) - -TIMEOUT = 1 * 60 * 60 # 1 hour -MIN_ATTEMPTS = 10 - - -class Action(object): - """Create a basic object representing the action record. - - Arguments: - record (dict): record as returned by the database - **kwargs: extra keyword arguments to overwrite the fields in record - """ - - PROPERTIES = [ - 'task_index', # MD - Index number of the task. - # This together with the instance_action_id - # forms a unique key identifier - 'action', # MD - CREATE, DELETE, FIND - 'item', # MD - table name, eg. instance_wim_nets - 'item_id', # MD - uuid of the referenced entry in the - # previous table - 'instance_action_id', # MD - reference to a cohesive group of actions - # related to the same instance-scenario - 'wim_account_id', # MD - reference to the WIM account used - # by the thread/connector - 'wim_internal_id', # MD - internal ID used by the WIM to refer to - # the item - 'datacenter_vim_id', # MD - reference to the VIM account used - # by the thread/connector - 'vim_id', # MD - internal ID used by the VIM to refer to - # the item - 'status', # MD - SCHEDULED,BUILD,DONE,FAILED,SUPERSEDED - 'extra', # MD - text with yaml format at database, - # dict at memory with: - # `- params: list with the params to be sent to the VIM for CREATE - # or FIND. For DELETE the vim_id is taken from other - # related tasks - # `- find: (only for CREATE tasks) if present it should FIND - # before creating and use if existing. - # Contains the FIND params - # `- depends_on: list with the 'task_index'es of tasks that must be - # completed before. e.g. a vm creation depends on a net - # creation - # `- sdn_net_id: used for net. - # `- tries - # `- created_items: - # dictionary with extra elements created that need - # to be deleted. e.g. ports, - # `- volumes,... - # `- created: False if the VIM element is not created by - # other actions, and it should not be deleted - # `- wim_status: WIM status of the element. Stored also at database - # in the item table - 'params', # M - similar to extra[params] - 'depends_on', # M - similar to extra[depends_on] - 'depends', # M - dict with task_index(from depends_on) to - # task class - 'error_msg', # MD - descriptive text upon an error - 'created_at', # MD - task DB creation time - 'modified_at', # MD - last DB update time - 'process_at', # M - unix epoch when to process the task - ] - - __slots__ = PROPERTIES + [ - 'logger', - ] - - def __init__(self, record, logger=None, **kwargs): - self.logger = logger or logging.getLogger('openmano.wim.action') - attrs = merge_dicts(dict.fromkeys(self.PROPERTIES), record, kwargs) - self.update(_expand_extra(attrs)) - - def __repr__(self): - return super(Action, self).__repr__() + repr(self.as_dict()) - - def as_dict(self, *fields): - """Representation of the object as a dict""" - attrs = (set(self.PROPERTIES) & set(fields) - if fields else self.PROPERTIES) - return {k: getattr(self, k) for k in attrs} - - def as_record(self): - """Returns a dict that can be send to the persistence layer""" - special = ['params', 'depends_on', 'depends'] - record = self.as_dict() - record['extra'].update(self.as_dict(*special)) - non_fields = special + ['process_at'] - - return remove_none_items(filter_out_dict_keys(record, non_fields)) - - def update(self, values=None, **kwargs): - """Update the in-memory representation of the task (works similarly to - dict.update). The update is NOT automatically persisted. - """ - # "white-listed mass assignment" - updates = merge_dicts(values, kwargs) - for attr in set(self.PROPERTIES) & set(updates.keys()): - setattr(self, attr, updates[attr]) - - def save(self, persistence, **kwargs): - """Persist current state of the object to the database. - - Arguments: - persistence: object encapsulating the database - **kwargs: extra properties to be updated before saving - - Note: - If any key word argument is passed, the object itself will be - changed as an extra side-effect. - """ - action_id = self.instance_action_id - index = self.task_index - if kwargs: - self.update(kwargs) - properties = self.as_record() - - return persistence.update_action(action_id, index, properties) - - def fail(self, persistence, reason, status='FAILED'): - """Mark action as FAILED, updating tables accordingly""" - persistence.update_instance_action_counters( - self.instance_action_id, - failed=1, - done=(-1 if self.status == 'DONE' else 0)) - - self.status = status - self.error_msg = truncate(reason) - self.logger.error('%s %s: %s', self.id, status, reason) - return self.save(persistence) - - def succeed(self, persistence, status='DONE'): - """Mark action as DONE, updating tables accordingly""" - persistence.update_instance_action_counters( - self.instance_action_id, done=1) - self.status = status - self.logger.debug('%s %s', self.id, status) - return self.save(persistence) - - def defer(self, persistence, reason, - timeout=TIMEOUT, min_attempts=MIN_ATTEMPTS): - """Postpone the task processing, taking care to not timeout. - - Arguments: - persistence: object encapsulating the database - reason (str): explanation for the delay - timeout (int): maximum delay tolerated since the first attempt. - Note that this number is a time delta, in seconds - min_attempts (int): Number of attempts to try before giving up. - """ - now = time() - last_attempt = self.extra.get('last_attempted_at') or time() - attempts = self.extra.get('attempts') or 0 - - if last_attempt - now > timeout and attempts > min_attempts: - self.fail(persistence, - 'Timeout reached. {} attempts in the last {:d} min' - .format(attempts, last_attempt / 60)) - - self.extra['last_attempted_at'] = time() - self.extra['attempts'] = attempts + 1 - self.logger.info('%s DEFERRED: %s', self.id, reason) - return self.save(persistence) - - @property - def group_key(self): - """Key defining the group to which this tasks belongs""" - return (self.item, self.item_id) - - @property - def processing(self): - """Processing status for the task (PENDING, REFRESH, IGNORE)""" - if self.status == 'SCHEDULED': - return PENDING - - return IGNORE - - @property - def id(self): - """Unique identifier of this particular action""" - return '{}[{}]'.format(self.instance_action_id, self.task_index) - - @property - def is_scheduled(self): - return self.status == 'SCHEDULED' - - @property - def is_build(self): - return self.status == 'BUILD' - - @property - def is_done(self): - return self.status == 'DONE' - - @property - def is_failed(self): - return self.status == 'FAILED' - - @property - def is_superseded(self): - return self.status == 'SUPERSEDED' - - def refresh(self, connector, persistence): - """Use the connector/persistence to refresh the status of the item. - - After the item status is refreshed any change in the task should be - persisted to the database. - - Arguments: - connector: object containing the classes to access the WIM or VIM - persistence: object containing the methods necessary to query the - database and to persist the updates - """ - self.logger.debug( - 'Action `%s` has no refresh to be done', - self.__class__.__name__) - - def expand_dependency_links(self, task_group): - """Expand task indexes into actual IDs""" - if not self.depends_on or ( - isinstance(self.depends, dict) and self.depends): - return - - num_tasks = len(task_group) - references = { - "TASK-{}".format(i): task_group[i] - for i in self.depends_on - if i < num_tasks and task_group[i].task_index == i and - task_group[i].instance_action_id == self.instance_action_id - } - self.depends = references - - def become_superseded(self, superseding): - """When another action tries to supersede this one, - we need to change both of them, so the surviving actions will be - logic consistent. - - This method should do the required internal changes, and also - suggest changes for the other, superseding, action. - - Arguments: - superseding: other task superseding this one - - Returns: - dict: changes suggested to the action superseding this one. - A special key ``superseding_needed`` is used to - suggest if the superseding is actually required or not. - If not present, ``superseding_needed`` is assumed to - be False. - """ - self.status = 'SUPERSEDED' - self.logger.debug( - 'Action `%s` was superseded by `%s`', - self.__class__.__name__, superseding.__class__.__name__) - return {} - - def supersede(self, others): - """Supersede other tasks, if necessary - - Arguments: - others (list): action objects being superseded - - When the task decide to supersede others, this method should call - ``become_superseded`` on the other actions, collect the suggested - updates and perform the necessary changes - """ - # By default actions don't supersede others - self.logger.debug( - 'Action `%s` does not supersede other actions', - self.__class__.__name__) - - def process(self, connector, persistence, ovim): - """Abstract method, that needs to be implemented. - Process the current task. - - Arguments: - connector: object with API for accessing the WAN - Infrastructure Manager system - persistence: abstraction layer for the database - ovim: instance of openvim, abstraction layer that enable - SDN-related operations - """ - raise NotImplementedError - - -class FindAction(Action): - """Abstract class that should be inherited for FIND actions, depending on - the item type. - """ - @property - def processing(self): - if self.status in ('DONE', 'BUILD'): - return REFRESH - - return super(FindAction, self).processing - - def become_superseded(self, superseding): - super(FindAction, self).become_superseded(superseding) - info = ('vim_id', 'wim_internal_id') - return remove_none_items({f: getattr(self, f) for f in info}) - - -class CreateAction(Action): - """Abstract class that should be inherited for CREATE actions, depending on - the item type. - """ - @property - def processing(self): - if self.status in ('DONE', 'BUILD'): - return REFRESH - - return super(CreateAction, self).processing - - def become_superseded(self, superseding): - super(CreateAction, self).become_superseded(superseding) - - created = self.extra.get('created', True) - sdn_net_id = self.extra.get('sdn_net_id') - pending_info = self.wim_internal_id or self.vim_id or sdn_net_id - if not(created and pending_info): - return {} - - extra_fields = ('sdn_net_id', 'interfaces', 'created_items') - extra_info = filter_dict_keys(self.extra or {}, extra_fields) - - return {'superseding_needed': True, - 'wim_internal_id': self.wim_internal_id, - 'vim_id': self.vim_id, - 'extra': remove_none_items(extra_info)} - - -class DeleteAction(Action): - """Abstract class that should be inherited for DELETE actions, depending on - the item type. - """ - def supersede(self, others): - self.logger.debug('%s %s %s %s might supersede other actions', - self.id, self.action, self.item, self.item_id) - # First collect all the changes from the superseded tasks - changes = [other.become_superseded(self) for other in others] - needed = any(change.pop('superseding_needed', False) - for change in changes) - - # Deal with the nested ones first - extras = [change.pop('extra', None) or {} for change in changes] - items = [extra.pop('created_items', None) or {} for extra in extras] - items = merge_dicts(self.extra.get('created_items', {}), *items) - self.extra = merge_dicts(self.extra, {'created_items': items}, *extras) - - # Accept the other ones - change = ((key, value) for key, value in merge_dicts(*changes).items() - if key in self.PROPERTIES) - for attr, value in change: - setattr(self, attr, value) - - # Reevaluate if the action itself is needed - if not needed: - self.status = 'SUPERSEDED' - - -def _expand_extra(record): - extra = record.pop('extra', None) or {} - if isinstance(extra, StringTypes): - extra = yaml.safe_load(extra) - - record['params'] = extra.get('params') - record['depends_on'] = extra.get('depends_on', []) - record['depends'] = extra.get('depends', None) - record['extra'] = extra - - return record