Prepare LCM tasks for HA
[osm/LCM.git] / osm_lcm / lcm_utils.py
index 01701a5..b4d0455 100644 (file)
@@ -16,6 +16,7 @@
 # under the License.
 ##
 
+import asyncio
 from collections import OrderedDict
 # from osm_common.dbbase import DbException
 
@@ -45,7 +46,36 @@ def versiontuple(v):
     return tuple(filled)
 
 
-class TaskRegistry:
+# LcmBase must be listed before TaskRegistry, as it is a dependency.
+class LcmBase:
+
+    def __init__(self, db, msg, fs, logger):
+        """
+
+        :param db: database connection
+        """
+        self.db = db
+        self.msg = msg
+        self.fs = fs
+        self.logger = logger
+
+    def update_db_2(self, item, _id, _desc):
+        """
+        Updates database with _desc information. If success _desc is cleared
+        :param item:
+        :param _id:
+        :param _desc: dictionary with the content to update. Keys are dot separated keys for
+        :return: None. Exception is raised on error
+        """
+        if not _desc:
+            return
+        self.db.set_one(item, {"_id": _id}, _desc)
+        _desc.clear()
+        # except DbException as e:
+        #     self.logger.error("Updating {} _id={} with '{}'. Error: {}".format(item, _id, _desc, e))
+
+
+class TaskRegistry(LcmBase):
     """
     Implements a registry of task needed for later cancelation, look for related tasks that must be completed before
     etc. It stores a four level dict
@@ -53,9 +83,19 @@ class TaskRegistry:
     Second level is the _id
     Third level is the operation id
     Fourth level is a descriptive name, the value is the task class
+
+    The HA (High-Availability) methods are used when more than one LCM instance is running.
+    To register the current task in the external DB, use LcmBase as base class, to be able
+    to reuse LcmBase.update_db_2()
+    The DB registry uses the following fields to distinguish a task:
+    - op_type: operation type ("nslcmops" or "nsilcmops")
+    - op_id:   operation ID
+    - worker:  the worker ID for this process
     """
 
-    def __init__(self):
+    instance_id_label_dict = {'ns': 'nsInstanceId', 'nsi': 'netsliceInstanceId'}
+
+    def __init__(self, worker_id=None, db=None, logger=None):
         self.task_registry = {
             "ns": {},
             "nsi": {},
@@ -63,6 +103,9 @@ class TaskRegistry:
             "wim_account": {},
             "sdn": {},
         }
+        self.worker_id = worker_id
+        self.db = db
+        self.logger = logger
 
     def register(self, topic, _id, op_id, task_name, task):
         """
@@ -128,7 +171,7 @@ class TaskRegistry:
 
     def cancel(self, topic, _id, target_op_id=None, target_task_name=None):
         """
-        Cancel all active tasks of a concrete ns, nsi, vim_account, sdn identified for _id. If op_id is supplied only 
+        Cancel all active tasks of a concrete ns, nsi, vim_account, sdn identified for _id. If op_id is supplied only
         this is cancelled, and the same with task_name
         """
         if not self.task_registry[topic].get(_id):
@@ -144,30 +187,80 @@ class TaskRegistry:
                 # if result:
                 #     self.logger.debug("{} _id={} order_id={} task={} cancelled".format(topic, _id, op_id, task_name))
 
-
-class LcmBase:
-
-    def __init__(self, db, msg, fs, logger):
+    def lock_HA(self, topic, op_type, op_id):
         """
-
-        :param db: database connection
+        Lock an task, if possible, to indicate to the HA system that
+        the task will be executed in this LCM instance.
+        :param topic: Can be "ns", "nsi"
+        :param op_type: Operation type, can be "nslcmops", "nsilcmops"
+        :param op_id: id of the operation of the related item
+        :return:
+        True=lock successful => execute the task (not registered by any other LCM instance)
+        False=lock failed => do NOT execute the task (already registered by another LCM instance)
         """
-        self.db = db
-        self.msg = msg
-        self.fs = fs
-        self.logger = logger
 
-    def update_db_2(self, item, _id, _desc):
+        db_lock_task = self.db.set_one(op_type,
+                                       q_filter={'_id': op_id, '_admin.worker': None},
+                                       update_dict={'_admin.worker': self.worker_id},
+                                       fail_on_empty=False)
+
+        if db_lock_task is None:
+            self.logger.debug("Task {} operation={} already locked by another worker".format(topic, op_id))
+            return False
+        else:
+            return True
+
+    async def waitfor_related_HA(self, topic, op_type, op_id=None):
         """
-        Updates database with _desc information. If success _desc is cleared
-        :param item:
-        :param _id:
-        :param _desc: dictionary with the content to update. Keys are dot separated keys for
-        :return: None. Exception is raised on error
+        Wait for any pending related HA tasks
         """
-        if not _desc:
+
+        # InstanceId label
+        instance_id_label = self.instance_id_label_dict.get(topic)
+
+        # Get 'startTime' timestamp for this operation
+        step = "Getting timestamp for op_id={} from db".format(op_id)
+        db_lcmop = self.db.get_one(op_type,
+                                   {"_id": op_id},
+                                   fail_on_empty=False)
+        if not db_lcmop:
             return
-        self.db.set_one(item, {"_id": _id}, _desc)
-        _desc.clear()
-        # except DbException as e:
-        #     self.logger.error("Updating {} _id={} with '{}'. Error: {}".format(item, _id, _desc, e))
+        starttime_this_op = db_lcmop.get("startTime")
+        instance_id = db_lcmop.get(instance_id_label)
+
+        # For HA, get list of tasks from DB instead of from dictionary (in-memory) variable.
+        timeout_wait_for_task = 3600   # Max time (seconds) to wait for a related task to finish
+        # interval_wait_for_task = 30    #  A too long polling interval slows things down considerably
+        interval_wait_for_task = 10       # Interval in seconds for polling related tasks
+        time_left = timeout_wait_for_task
+        old_num_related_tasks = 0
+        while True:
+            # Get related tasks (operations within the same NS or NSI instance) which are
+            # still running (operationState='PROCESSING') and which were started before this task.
+            _filter = {instance_id_label: instance_id,
+                       'operationState': 'PROCESSING',
+                       'startTime.lt': starttime_this_op}
+            db_waitfor_related_task = self.db.get_list(op_type,
+                                                       q_filter=_filter)
+            new_num_related_tasks = len(db_waitfor_related_task)
+            if not new_num_related_tasks:
+                # There are no related tasks, no need to wait, so return.
+                return
+            # If number of pending related tasks have changed,
+            # update the 'detailed-status' field and log the change.
+            if new_num_related_tasks != old_num_related_tasks:
+                db_lcmops_update = {}
+                step = db_lcmops_update["detailed-status"] = \
+                    "Waiting for {} related tasks to be completed.".format(
+                        new_num_related_tasks)
+                self.logger.debug("Task {} operation={} {}".format(topic, op_id, step))
+                self.update_db_2(op_type, op_id, db_lcmops_update)
+                old_num_related_tasks = new_num_related_tasks
+            time_left -= interval_wait_for_task
+            if time_left < 0:
+                raise LcmException(
+                    "Timeout ({}) when waiting for related tasks to be completed".format(
+                        timeout_wait_for_task))
+            await asyncio.sleep(interval_wait_for_task)
+
+        return