feature 8029 change RO to python3. Using vim plugins
[osm/RO.git] / RO / osm_ro / wim / actions.py
diff --git a/RO/osm_ro/wim/actions.py b/RO/osm_ro/wim/actions.py
new file mode 100644 (file)
index 0000000..e199fd0
--- /dev/null
@@ -0,0 +1,420 @@
+# -*- coding: utf-8 -*-
+##
+# Copyright 2018 University of Bristol - High Performance Networks Research
+# Group
+# All Rights Reserved.
+#
+# Contributors: Anderson Bravalheri, Dimitrios Gkounis, Abubakar Siddique
+# Muqaddas, Navdeep Uniyal, Reza Nejabati and Dimitra Simeonidou
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+#         http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+#
+# For those usages not covered by the Apache License, Version 2.0 please
+# contact with: <highperformance-networks@bristol.ac.uk>
+#
+# Neither the name of the University of Bristol nor the names of its
+# contributors may be used to endorse or promote products derived from
+# this software without specific prior written permission.
+#
+# This work has been performed in the context of DCMS UK 5G Testbeds
+# & Trials Programme and in the framework of the Metro-Haul project -
+# funded by the European Commission under Grant number 761727 through the
+# Horizon 2020 and 5G-PPP programmes.
+##
+# pylint: disable=E1101,E0203,W0201
+
+"""Common logic for task management"""
+import logging
+from time import time
+
+import yaml
+
+from ..utils import (
+    filter_dict_keys,
+    filter_out_dict_keys,
+    merge_dicts,
+    remove_none_items,
+    truncate
+)
+
+PENDING, REFRESH, IGNORE = range(3)
+
+TIMEOUT = 1 * 60 * 60  # 1 hour
+MIN_ATTEMPTS = 10
+
+
+class Action(object):
+    """Create a basic object representing the action record.
+
+    Arguments:
+        record (dict): record as returned by the database
+        **kwargs: extra keyword arguments to overwrite the fields in record
+    """
+
+    PROPERTIES = [
+        'task_index',          # MD - Index number of the task.
+                               #      This together with the instance_action_id
+                               #      forms a unique key identifier
+        'action',              # MD - CREATE, DELETE, FIND
+        'item',                # MD - table name, eg. instance_wim_nets
+        'item_id',             # MD - uuid of the referenced entry in the
+                               #      previous table
+        'instance_action_id',  # MD - reference to a cohesive group of actions
+                               #      related to the same instance-scenario
+        'wim_account_id',      # MD - reference to the WIM account used
+                               #      by the thread/connector
+        'wim_internal_id',     # MD - internal ID used by the WIM to refer to
+                               #      the item
+        'datacenter_vim_id',   # MD - reference to the VIM account used
+                               #      by the thread/connector
+        'vim_id',              # MD - internal ID used by the VIM to refer to
+                               #      the item
+        'status',              # MD - SCHEDULED,BUILD,DONE,FAILED,SUPERSEDED
+        'extra',               # MD - text with yaml format at database,
+        #                             dict at memory with:
+        # `- params:     list with the params to be sent to the VIM for CREATE
+        #                or FIND. For DELETE the vim_id is taken from other
+        #                related tasks
+        # `- find:       (only for CREATE tasks) if present it should FIND
+        #                before creating and use if existing.
+        #                Contains the FIND params
+        # `- depends_on: list with the 'task_index'es of tasks that must be
+        #                completed before. e.g. a vm creation depends on a net
+        #                creation
+        # `- sdn_net_id: used for net.
+        # `- tries
+        # `- created_items:
+        #                dictionary with extra elements created that need
+        #                to be deleted. e.g. ports,
+        # `- volumes,...
+        # `- created:    False if the VIM element is not created by
+        #                other actions, and it should not be deleted
+        # `- wim_status: WIM status of the element. Stored also at database
+        #                in the item table
+        'params',              # M  - similar to extra[params]
+        'depends_on',          # M  - similar to extra[depends_on]
+        'depends',             # M  - dict with task_index(from depends_on) to
+                               #      task class
+        'error_msg',           # MD - descriptive text upon an error
+        'created_at',          # MD - task DB creation time
+        'modified_at',         # MD - last DB update time
+        'process_at',          # M  - unix epoch when to process the task
+    ]
+
+    __slots__ = PROPERTIES + [
+        'logger',
+    ]
+
+    def __init__(self, record, logger=None, **kwargs):
+        self.logger = logger or logging.getLogger('openmano.wim.action')
+        attrs = merge_dicts(dict.fromkeys(self.PROPERTIES), record, kwargs)
+        self.update(_expand_extra(attrs))
+
+    def __repr__(self):
+        return super(Action, self).__repr__() + repr(self.as_dict())
+
+    def as_dict(self, *fields):
+        """Representation of the object as a dict"""
+        attrs = (set(self.PROPERTIES) & set(fields)
+                 if fields else self.PROPERTIES)
+        return {k: getattr(self, k) for k in attrs}
+
+    def as_record(self):
+        """Returns a dict that can be send to the persistence layer"""
+        special = ['params', 'depends_on', 'depends']
+        record = self.as_dict()
+        record['extra'].update(self.as_dict(*special))
+        non_fields = special + ['process_at']
+
+        return remove_none_items(filter_out_dict_keys(record, non_fields))
+
+    def update(self, values=None, **kwargs):
+        """Update the in-memory representation of the task (works similarly to
+        dict.update). The update is NOT automatically persisted.
+        """
+        # "white-listed mass assignment"
+        updates = merge_dicts(values, kwargs)
+        for attr in set(self.PROPERTIES) & set(updates.keys()):
+            setattr(self, attr, updates[attr])
+
+    def save(self, persistence, **kwargs):
+        """Persist current state of the object to the database.
+
+        Arguments:
+            persistence: object encapsulating the database
+            **kwargs: extra properties to be updated before saving
+
+        Note:
+            If any key word argument is passed, the object itself will be
+            changed as an extra side-effect.
+        """
+        action_id = self.instance_action_id
+        index = self.task_index
+        if kwargs:
+            self.update(kwargs)
+        properties = self.as_record()
+
+        return persistence.update_action(action_id, index, properties)
+
+    def fail(self, persistence, reason, status='FAILED'):
+        """Mark action as FAILED, updating tables accordingly"""
+        persistence.update_instance_action_counters(
+            self.instance_action_id,
+            failed=1,
+            done=(-1 if self.status == 'DONE' else 0))
+
+        self.status = status
+        self.error_msg = truncate(reason)
+        self.logger.error('%s %s: %s', self.id, status, reason)
+        return self.save(persistence)
+
+    def succeed(self, persistence, status='DONE'):
+        """Mark action as DONE, updating tables accordingly"""
+        persistence.update_instance_action_counters(
+            self.instance_action_id, done=1)
+        self.status = status
+        self.logger.debug('%s %s', self.id, status)
+        return self.save(persistence)
+
+    def defer(self, persistence, reason,
+              timeout=TIMEOUT, min_attempts=MIN_ATTEMPTS):
+        """Postpone the task processing, taking care to not timeout.
+
+        Arguments:
+            persistence: object encapsulating the database
+            reason (str): explanation for the delay
+            timeout (int): maximum delay tolerated since the first attempt.
+                Note that this number is a time delta, in seconds
+            min_attempts (int): Number of attempts to try before giving up.
+        """
+        now = time()
+        last_attempt = self.extra.get('last_attempted_at') or time()
+        attempts = self.extra.get('attempts') or 0
+
+        if last_attempt - now > timeout and attempts > min_attempts:
+            self.fail(persistence,
+                      'Timeout reached. {} attempts in the last {:d} min'
+                      .format(attempts, last_attempt / 60))
+
+        self.extra['last_attempted_at'] = time()
+        self.extra['attempts'] = attempts + 1
+        self.logger.info('%s DEFERRED: %s', self.id, reason)
+        return self.save(persistence)
+
+    @property
+    def group_key(self):
+        """Key defining the group to which this tasks belongs"""
+        return (self.item, self.item_id)
+
+    @property
+    def processing(self):
+        """Processing status for the task (PENDING, REFRESH, IGNORE)"""
+        if self.status == 'SCHEDULED':
+            return PENDING
+
+        return IGNORE
+
+    @property
+    def id(self):
+        """Unique identifier of this particular action"""
+        return '{}[{}]'.format(self.instance_action_id, self.task_index)
+
+    @property
+    def is_scheduled(self):
+        return self.status == 'SCHEDULED'
+
+    @property
+    def is_build(self):
+        return self.status == 'BUILD'
+
+    @property
+    def is_done(self):
+        return self.status == 'DONE'
+
+    @property
+    def is_failed(self):
+        return self.status == 'FAILED'
+
+    @property
+    def is_superseded(self):
+        return self.status == 'SUPERSEDED'
+
+    def refresh(self, connector, persistence):
+        """Use the connector/persistence to refresh the status of the item.
+
+        After the item status is refreshed any change in the task should be
+        persisted to the database.
+
+        Arguments:
+            connector: object containing the classes to access the WIM or VIM
+            persistence: object containing the methods necessary to query the
+                database and to persist the updates
+        """
+        self.logger.debug(
+            'Action `%s` has no refresh to be done',
+            self.__class__.__name__)
+
+    def expand_dependency_links(self, task_group):
+        """Expand task indexes into actual IDs"""
+        if not self.depends_on or (
+                isinstance(self.depends, dict) and self.depends):
+            return
+
+        num_tasks = len(task_group)
+        references = {
+            "TASK-{}".format(i): task_group[i]
+            for i in self.depends_on
+            if i < num_tasks and task_group[i].task_index == i and
+            task_group[i].instance_action_id == self.instance_action_id
+        }
+        self.depends = references
+
+    def become_superseded(self, superseding):
+        """When another action tries to supersede this one,
+        we need to change both of them, so the surviving actions will be
+        logic consistent.
+
+        This method should do the required internal changes, and also
+        suggest changes for the other, superseding, action.
+
+        Arguments:
+            superseding: other task superseding this one
+
+        Returns:
+            dict: changes suggested to the action superseding this one.
+                  A special key ``superseding_needed`` is used to
+                  suggest if the superseding is actually required or not.
+                  If not present, ``superseding_needed`` is assumed to
+                  be False.
+        """
+        self.status = 'SUPERSEDED'
+        self.logger.debug(
+            'Action `%s` was superseded by `%s`',
+            self.__class__.__name__, superseding.__class__.__name__)
+        return {}
+
+    def supersede(self, others):
+        """Supersede other tasks, if necessary
+
+        Arguments:
+            others (list): action objects being superseded
+
+        When the task decide to supersede others, this method should call
+        ``become_superseded`` on the other actions, collect the suggested
+        updates and perform the necessary changes
+        """
+        # By default actions don't supersede others
+        self.logger.debug(
+            'Action `%s` does not supersede other actions',
+            self.__class__.__name__)
+
+    def process(self, connector, persistence, ovim):
+        """Abstract method, that needs to be implemented.
+        Process the current task.
+
+        Arguments:
+            connector: object with API for accessing the WAN
+                Infrastructure Manager system
+            persistence: abstraction layer for the database
+            ovim: instance of openvim, abstraction layer that enable
+                SDN-related operations
+        """
+        raise NotImplementedError
+
+
+class FindAction(Action):
+    """Abstract class that should be inherited for FIND actions, depending on
+    the item type.
+    """
+    @property
+    def processing(self):
+        if self.status in ('DONE', 'BUILD'):
+            return REFRESH
+
+        return super(FindAction, self).processing
+
+    def become_superseded(self, superseding):
+        super(FindAction, self).become_superseded(superseding)
+        info = ('vim_id', 'wim_internal_id')
+        return remove_none_items({f: getattr(self, f) for f in info})
+
+
+class CreateAction(Action):
+    """Abstract class that should be inherited for CREATE actions, depending on
+    the item type.
+    """
+    @property
+    def processing(self):
+        if self.status in ('DONE', 'BUILD'):
+            return REFRESH
+
+        return super(CreateAction, self).processing
+
+    def become_superseded(self, superseding):
+        super(CreateAction, self).become_superseded(superseding)
+
+        created = self.extra.get('created', True)
+        sdn_net_id = self.extra.get('sdn_net_id')
+        pending_info = self.wim_internal_id or self.vim_id or sdn_net_id
+        if not(created and pending_info):
+            return {}
+
+        extra_fields = ('sdn_net_id', 'interfaces', 'created_items')
+        extra_info = filter_dict_keys(self.extra or {}, extra_fields)
+
+        return {'superseding_needed': True,
+                'wim_internal_id': self.wim_internal_id,
+                'vim_id': self.vim_id,
+                'extra': remove_none_items(extra_info)}
+
+
+class DeleteAction(Action):
+    """Abstract class that should be inherited for DELETE actions, depending on
+    the item type.
+    """
+    def supersede(self, others):
+        self.logger.debug('%s %s %s %s might supersede other actions',
+                          self.id, self.action, self.item, self.item_id)
+        # First collect all the changes from the superseded tasks
+        changes = [other.become_superseded(self) for other in others]
+        needed = any(change.pop('superseding_needed', False)
+                     for change in changes)
+
+        # Deal with the nested ones first
+        extras = [change.pop('extra', None) or {} for change in changes]
+        items = [extra.pop('created_items', None) or {} for extra in extras]
+        items = merge_dicts(self.extra.get('created_items', {}), *items)
+        self.extra = merge_dicts(self.extra, {'created_items': items}, *extras)
+
+        # Accept the other ones
+        change = ((key, value) for key, value in merge_dicts(*changes).items()
+                  if key in self.PROPERTIES)
+        for attr, value in change:
+            setattr(self, attr, value)
+
+        # Reevaluate if the action itself is needed
+        if not needed:
+            self.status = 'SUPERSEDED'
+
+
+def _expand_extra(record):
+    extra = record.pop('extra', None) or {}
+    if isinstance(extra, str):
+        extra = yaml.safe_load(extra)
+
+    record['params'] = extra.get('params')
+    record['depends_on'] = extra.get('depends_on', [])
+    record['depends'] = extra.get('depends', None)
+    record['extra'] = extra
+
+    return record