bug 760 skip get-ssh-public-key,generate-ssh-key,verify-ssh-credentials methods for...
[osm/LCM.git] / osm_lcm / lcm_utils.py
index cb74118..b4d0455 100644 (file)
@@ -1,9 +1,24 @@
-#!/usr/bin/python3
 # -*- coding: utf-8 -*-
 
+##
+# Copyright 2018 Telefonica S.A.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+#         http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+##
 
+import asyncio
 from collections import OrderedDict
-from osm_common.dbbase import DbException
+from osm_common.dbbase import DbException
 
 __author__ = "Alfonso Tierno"
 
@@ -16,6 +31,10 @@ class LcmExceptionNoMgmtIP(LcmException):
     pass
 
 
+class LcmExceptionExit(LcmException):
+    pass
+
+
 def versiontuple(v):
     """utility for compare dot separate versions. Fills with zeros to proper number comparison
     package version will be something like 4.0.1.post11+gb3f024d.dirty-1. Where 4.0.1 is the git tag, postXX is the
@@ -27,7 +46,36 @@ def versiontuple(v):
     return tuple(filled)
 
 
-class TaskRegistry:
+# LcmBase must be listed before TaskRegistry, as it is a dependency.
+class LcmBase:
+
+    def __init__(self, db, msg, fs, logger):
+        """
+
+        :param db: database connection
+        """
+        self.db = db
+        self.msg = msg
+        self.fs = fs
+        self.logger = logger
+
+    def update_db_2(self, item, _id, _desc):
+        """
+        Updates database with _desc information. If success _desc is cleared
+        :param item:
+        :param _id:
+        :param _desc: dictionary with the content to update. Keys are dot separated keys for
+        :return: None. Exception is raised on error
+        """
+        if not _desc:
+            return
+        self.db.set_one(item, {"_id": _id}, _desc)
+        _desc.clear()
+        # except DbException as e:
+        #     self.logger.error("Updating {} _id={} with '{}'. Error: {}".format(item, _id, _desc, e))
+
+
+class TaskRegistry(LcmBase):
     """
     Implements a registry of task needed for later cancelation, look for related tasks that must be completed before
     etc. It stores a four level dict
@@ -35,19 +83,34 @@ class TaskRegistry:
     Second level is the _id
     Third level is the operation id
     Fourth level is a descriptive name, the value is the task class
+
+    The HA (High-Availability) methods are used when more than one LCM instance is running.
+    To register the current task in the external DB, use LcmBase as base class, to be able
+    to reuse LcmBase.update_db_2()
+    The DB registry uses the following fields to distinguish a task:
+    - op_type: operation type ("nslcmops" or "nsilcmops")
+    - op_id:   operation ID
+    - worker:  the worker ID for this process
     """
 
-    def __init__(self):
+    instance_id_label_dict = {'ns': 'nsInstanceId', 'nsi': 'netsliceInstanceId'}
+
+    def __init__(self, worker_id=None, db=None, logger=None):
         self.task_registry = {
             "ns": {},
+            "nsi": {},
             "vim_account": {},
+            "wim_account": {},
             "sdn": {},
         }
+        self.worker_id = worker_id
+        self.db = db
+        self.logger = logger
 
     def register(self, topic, _id, op_id, task_name, task):
         """
         Register a new task
-        :param topic: Can be "ns", "vim_account", "sdn"
+        :param topic: Can be "ns", "nsi", "vim_account", "sdn"
         :param _id: _id of the related item
         :param op_id: id of the operation of the related item
         :param task_name: Task descriptive name, as create, instantiate, terminate. Must be unique in this op_id
@@ -64,23 +127,27 @@ class TaskRegistry:
 
     def remove(self, topic, _id, op_id, task_name=None):
         """
-        When task is ended, it should removed. It ignores missing tasks
-        :param topic: Can be "ns", "vim_account", "sdn"
+        When task is ended, it should be removed. It ignores missing tasks. It also removes tasks done with this _id
+        :param topic: Can be "ns", "nsi", "vim_account", "sdn"
         :param _id: _id of the related item
         :param op_id: id of the operation of the related item
-        :param task_name: Task descriptive name. If note it deletes all
-        :return:
+        :param task_name: Task descriptive name. If none it deletes all tasks with same _id and op_id
+        :return: None
         """
-        if not self.task_registry[topic].get(_id) or not self.task_registry[topic][_id].get(op_id):
+        if not self.task_registry[topic].get(_id):
             return
         if not task_name:
-            # print("deleting tasks", topic, _id, op_id, self.task_registry[topic][_id][op_id])
-            del self.task_registry[topic][_id][op_id]
-        elif task_name in self.task_registry[topic][_id][op_id]:
-            # print("deleting tasks", topic, _id, op_id, task_name, self.task_registry[topic][_id][op_id][task_name])
-            del self.task_registry[topic][_id][op_id][task_name]
-            if not self.task_registry[topic][_id][op_id]:
-                del self.task_registry[topic][_id][op_id]
+            self.task_registry[topic][_id].pop(op_id, None)
+        elif self.task_registry[topic][_id].get(op_id):
+            self.task_registry[topic][_id][op_id].pop(task_name, None)
+
+        # delete done tasks
+        for op_id_ in list(self.task_registry[topic][_id]):
+            for name, task in self.task_registry[topic][_id][op_id_].items():
+                if not task.done():
+                    break
+            else:
+                del self.task_registry[topic][_id][op_id_]
         if not self.task_registry[topic][_id]:
             del self.task_registry[topic][_id]
 
@@ -96,15 +163,16 @@ class TaskRegistry:
                 continue
 
             for task_name, task in self.task_registry[topic][_id][op_id].items():
-                task_list.append(task)
-                task_name_list.append(task_name)
+                if not task.done():
+                    task_list.append(task)
+                    task_name_list.append(task_name)
             break
         return ", ".join(task_name_list), task_list
 
     def cancel(self, topic, _id, target_op_id=None, target_task_name=None):
         """
-        Cancel all active tasks of a concrete ns, vim_account, sdn identified for _id. If op_id is supplied only this is
-        cancelled, and the same with task_name
+        Cancel all active tasks of a concrete ns, nsi, vim_account, sdn identified for _id. If op_id is supplied only
+        this is cancelled, and the same with task_name
         """
         if not self.task_registry[topic].get(_id):
             return
@@ -119,31 +187,80 @@ class TaskRegistry:
                 # if result:
                 #     self.logger.debug("{} _id={} order_id={} task={} cancelled".format(topic, _id, op_id, task_name))
 
-
-class LcmBase:
-
-    def __init__(self, db, msg, fs, logger):
+    def lock_HA(self, topic, op_type, op_id):
         """
-
-        :param db: database connection
+        Lock an task, if possible, to indicate to the HA system that
+        the task will be executed in this LCM instance.
+        :param topic: Can be "ns", "nsi"
+        :param op_type: Operation type, can be "nslcmops", "nsilcmops"
+        :param op_id: id of the operation of the related item
+        :return:
+        True=lock successful => execute the task (not registered by any other LCM instance)
+        False=lock failed => do NOT execute the task (already registered by another LCM instance)
         """
-        self.db = db
-        self.msg = msg
-        self.fs = fs
-        self.logger = logger
 
-    def update_db_2(self, item, _id, _desc):
+        db_lock_task = self.db.set_one(op_type,
+                                       q_filter={'_id': op_id, '_admin.worker': None},
+                                       update_dict={'_admin.worker': self.worker_id},
+                                       fail_on_empty=False)
+
+        if db_lock_task is None:
+            self.logger.debug("Task {} operation={} already locked by another worker".format(topic, op_id))
+            return False
+        else:
+            return True
+
+    async def waitfor_related_HA(self, topic, op_type, op_id=None):
         """
-        Updates database with _desc information. Upon success _desc is cleared
-        :param item:
-        :param _id:
-        :param _desc:
-        :return:
+        Wait for any pending related HA tasks
         """
-        if not _desc:
+
+        # InstanceId label
+        instance_id_label = self.instance_id_label_dict.get(topic)
+
+        # Get 'startTime' timestamp for this operation
+        step = "Getting timestamp for op_id={} from db".format(op_id)
+        db_lcmop = self.db.get_one(op_type,
+                                   {"_id": op_id},
+                                   fail_on_empty=False)
+        if not db_lcmop:
             return
-        try:
-            self.db.set_one(item, {"_id": _id}, _desc)
-            _desc.clear()
-        except DbException as e:
-            self.logger.error("Updating {} _id={} with '{}'. Error: {}".format(item, _id, _desc, e))
+        starttime_this_op = db_lcmop.get("startTime")
+        instance_id = db_lcmop.get(instance_id_label)
+
+        # For HA, get list of tasks from DB instead of from dictionary (in-memory) variable.
+        timeout_wait_for_task = 3600   # Max time (seconds) to wait for a related task to finish
+        # interval_wait_for_task = 30    #  A too long polling interval slows things down considerably
+        interval_wait_for_task = 10       # Interval in seconds for polling related tasks
+        time_left = timeout_wait_for_task
+        old_num_related_tasks = 0
+        while True:
+            # Get related tasks (operations within the same NS or NSI instance) which are
+            # still running (operationState='PROCESSING') and which were started before this task.
+            _filter = {instance_id_label: instance_id,
+                       'operationState': 'PROCESSING',
+                       'startTime.lt': starttime_this_op}
+            db_waitfor_related_task = self.db.get_list(op_type,
+                                                       q_filter=_filter)
+            new_num_related_tasks = len(db_waitfor_related_task)
+            if not new_num_related_tasks:
+                # There are no related tasks, no need to wait, so return.
+                return
+            # If number of pending related tasks have changed,
+            # update the 'detailed-status' field and log the change.
+            if new_num_related_tasks != old_num_related_tasks:
+                db_lcmops_update = {}
+                step = db_lcmops_update["detailed-status"] = \
+                    "Waiting for {} related tasks to be completed.".format(
+                        new_num_related_tasks)
+                self.logger.debug("Task {} operation={} {}".format(topic, op_id, step))
+                self.update_db_2(op_type, op_id, db_lcmops_update)
+                old_num_related_tasks = new_num_related_tasks
+            time_left -= interval_wait_for_task
+            if time_left < 0:
+                raise LcmException(
+                    "Timeout ({}) when waiting for related tasks to be completed".format(
+                        timeout_wait_for_task))
+            await asyncio.sleep(interval_wait_for_task)
+
+        return