Prepare LCM tasks for HA 73/7673/14
author kuuse <johan.kuuse@altran.com>
Tue, 18 Jun 2019 10:09:24 +0000 (12:09 +0200)
committer kuuse <johan.kuuse@altran.com>
Mon, 1 Jul 2019 13:02:30 +0000 (15:02 +0200)
Change-Id: I6146d5903e95d6d710d349a08fcf5ae03127dfed
Signed-off-by: kuuse <johan.kuuse@altran.com>
osm_lcm/lcm.py
osm_lcm/lcm_utils.py
osm_lcm/netslice.py
osm_lcm/ns.py

index 8f66055..27a44cb 100644 (file)
@@ -71,8 +71,6 @@ class Lcm:
         self.consecutive_errors = 0
         self.first_start = False
 
-        # contains created tasks/futures to be able to cancel
-        self.lcm_tasks = TaskRegistry()
         # logging
         self.logger = logging.getLogger('lcm')
         # get id
@@ -174,6 +172,9 @@ class Lcm:
             self.logger.critical(str(e), exc_info=True)
             raise LcmException(str(e))
 
+        # contains created tasks/futures to be able to cancel
+        self.lcm_tasks = TaskRegistry(self.worker_id, self.db, self.logger)
+
         self.ns = ns.NsLcm(self.db, self.msg, self.fs, self.lcm_tasks, self.ro_config, self.vca_config, self.loop)
         self.netslice = netslice.NetsliceLcm(self.db, self.msg, self.fs, self.lcm_tasks, self.ro_config, 
                                              self.vca_config, self.loop)
index 01701a5..b4d0455 100644 (file)
@@ -16,6 +16,7 @@
 # under the License.
 ##
 
+import asyncio
 from collections import OrderedDict
 # from osm_common.dbbase import DbException
 
@@ -45,7 +46,36 @@ def versiontuple(v):
     return tuple(filled)
 
 
-class TaskRegistry:
+# LcmBase must be listed before TaskRegistry, as it is a dependency.
+class LcmBase:
+
+    def __init__(self, db, msg, fs, logger):
+        """
+
+        :param db: database connection
+        """
+        self.db = db
+        self.msg = msg
+        self.fs = fs
+        self.logger = logger
+
+    def update_db_2(self, item, _id, _desc):
+        """
+        Updates database with _desc information. If success _desc is cleared
+        :param item:
+        :param _id:
+        :param _desc: dictionary with the content to update. Keys are dot separated keys for nested fields
+        :return: None. Exception is raised on error
+        """
+        if not _desc:
+            return
+        self.db.set_one(item, {"_id": _id}, _desc)
+        _desc.clear()
+        # except DbException as e:
+        #     self.logger.error("Updating {} _id={} with '{}'. Error: {}".format(item, _id, _desc, e))
+
+
+class TaskRegistry(LcmBase):
     """
     Implements a registry of task needed for later cancelation, look for related tasks that must be completed before
     etc. It stores a four level dict
@@ -53,9 +83,19 @@ class TaskRegistry:
     Second level is the _id
     Third level is the operation id
     Fourth level is a descriptive name, the value is the task class
+
+    The HA (High-Availability) methods are used when more than one LCM instance is running.
+    To register the current task in the external DB, use LcmBase as base class, to be able
+    to reuse LcmBase.update_db_2()
+    The DB registry uses the following fields to distinguish a task:
+    - op_type: operation type ("nslcmops" or "nsilcmops")
+    - op_id:   operation ID
+    - worker:  the worker ID for this process
     """
 
-    def __init__(self):
+    instance_id_label_dict = {'ns': 'nsInstanceId', 'nsi': 'netsliceInstanceId'}
+
+    def __init__(self, worker_id=None, db=None, logger=None):
         self.task_registry = {
             "ns": {},
             "nsi": {},
@@ -63,6 +103,9 @@ class TaskRegistry:
             "wim_account": {},
             "sdn": {},
         }
+        self.worker_id = worker_id
+        self.db = db
+        self.logger = logger
 
     def register(self, topic, _id, op_id, task_name, task):
         """
@@ -128,7 +171,7 @@ class TaskRegistry:
 
     def cancel(self, topic, _id, target_op_id=None, target_task_name=None):
         """
-        Cancel all active tasks of a concrete ns, nsi, vim_account, sdn identified for _id. If op_id is supplied only 
+        Cancel all active tasks of a concrete ns, nsi, vim_account, sdn identified for _id. If op_id is supplied only
         this is cancelled, and the same with task_name
         """
         if not self.task_registry[topic].get(_id):
@@ -144,30 +187,80 @@ class TaskRegistry:
                 # if result:
                 #     self.logger.debug("{} _id={} order_id={} task={} cancelled".format(topic, _id, op_id, task_name))
 
-
-class LcmBase:
-
-    def __init__(self, db, msg, fs, logger):
+    def lock_HA(self, topic, op_type, op_id):
         """
-
-        :param db: database connection
+        Lock a task, if possible, to indicate to the HA system that
+        the task will be executed in this LCM instance.
+        :param topic: Can be "ns", "nsi"
+        :param op_type: Operation type, can be "nslcmops", "nsilcmops"
+        :param op_id: id of the operation of the related item
+        :return:
+        True=lock successful => execute the task (not registered by any other LCM instance)
+        False=lock failed => do NOT execute the task (already registered by another LCM instance)
         """
-        self.db = db
-        self.msg = msg
-        self.fs = fs
-        self.logger = logger
 
-    def update_db_2(self, item, _id, _desc):
+        db_lock_task = self.db.set_one(op_type,
+                                       q_filter={'_id': op_id, '_admin.worker': None},
+                                       update_dict={'_admin.worker': self.worker_id},
+                                       fail_on_empty=False)
+
+        if db_lock_task is None:
+            self.logger.debug("Task {} operation={} already locked by another worker".format(topic, op_id))
+            return False
+        else:
+            return True
+
+    async def waitfor_related_HA(self, topic, op_type, op_id=None):
         """
-        Updates database with _desc information. If success _desc is cleared
-        :param item:
-        :param _id:
-        :param _desc: dictionary with the content to update. Keys are dot separated keys for
-        :return: None. Exception is raised on error
+        Wait for any pending related HA tasks
         """
-        if not _desc:
+
+        # InstanceId label
+        instance_id_label = self.instance_id_label_dict.get(topic)
+
+        # Get 'startTime' timestamp for this operation
+        step = "Getting timestamp for op_id={} from db".format(op_id)
+        db_lcmop = self.db.get_one(op_type,
+                                   {"_id": op_id},
+                                   fail_on_empty=False)
+        if not db_lcmop:
             return
-        self.db.set_one(item, {"_id": _id}, _desc)
-        _desc.clear()
-        # except DbException as e:
-        #     self.logger.error("Updating {} _id={} with '{}'. Error: {}".format(item, _id, _desc, e))
+        starttime_this_op = db_lcmop.get("startTime")
+        instance_id = db_lcmop.get(instance_id_label)
+
+        # For HA, get list of tasks from DB instead of from dictionary (in-memory) variable.
+        timeout_wait_for_task = 3600   # Max time (seconds) to wait for a related task to finish
+        # interval_wait_for_task = 30    #  A too long polling interval slows things down considerably
+        interval_wait_for_task = 10       # Interval in seconds for polling related tasks
+        time_left = timeout_wait_for_task
+        old_num_related_tasks = 0
+        while True:
+            # Get related tasks (operations within the same NS or NSI instance) which are
+            # still running (operationState='PROCESSING') and which were started before this task.
+            _filter = {instance_id_label: instance_id,
+                       'operationState': 'PROCESSING',
+                       'startTime.lt': starttime_this_op}
+            db_waitfor_related_task = self.db.get_list(op_type,
+                                                       q_filter=_filter)
+            new_num_related_tasks = len(db_waitfor_related_task)
+            if not new_num_related_tasks:
+                # There are no related tasks, no need to wait, so return.
+                return
+            # If the number of pending related tasks has changed,
+            # update the 'detailed-status' field and log the change.
+            if new_num_related_tasks != old_num_related_tasks:
+                db_lcmops_update = {}
+                step = db_lcmops_update["detailed-status"] = \
+                    "Waiting for {} related tasks to be completed.".format(
+                        new_num_related_tasks)
+                self.logger.debug("Task {} operation={} {}".format(topic, op_id, step))
+                self.update_db_2(op_type, op_id, db_lcmops_update)
+                old_num_related_tasks = new_num_related_tasks
+            time_left -= interval_wait_for_task
+            if time_left < 0:
+                raise LcmException(
+                    "Timeout ({}) when waiting for related tasks to be completed".format(
+                        timeout_wait_for_task))
+            await asyncio.sleep(interval_wait_for_task)
+
+        return
index df32473..da428d1 100644 (file)
@@ -70,6 +70,12 @@ class NetsliceLcm(LcmBase):
                 raise LcmException("ns_update_nsir: Not found vld={} at RO info".format(vld["id"]))
 
     async def instantiate(self, nsir_id, nsilcmop_id):
+
+        # Try to lock HA task here
+        task_is_locked_by_me = self.lcm_tasks.lock_HA('nsi', 'nsilcmops', nsilcmop_id)
+        if not task_is_locked_by_me:
+            return
+
         logging_text = "Task netslice={} instantiate={} ".format(nsir_id, nsilcmop_id)
         self.logger.debug(logging_text + "Enter")
         # get all needed from database
@@ -218,7 +224,7 @@ class NetsliceLcm(LcmBase):
                 db_nsir_update_RO["netslice_scenario_id"] = desc["uuid"]
                 db_nsir_update_RO["vld_id"] = RO_ns_params["name"]
                 db_nsir_update["_admin.deployed.RO"].append(db_nsir_update_RO)
-        
+
         def overwrite_nsd_params(self, db_nsir, nslcmop):
             RO_list = []
             vld_op_list = []
@@ -261,38 +267,30 @@ class NetsliceLcm(LcmBase):
             return nsr_id, nslcmop
 
         try:
+            # wait for any previous tasks in process
+            await self.lcm_tasks.waitfor_related_HA('nsi', 'nsilcmops', nsilcmop_id)
+
             step = "Getting nsir={} from db".format(nsir_id)
             db_nsir = self.db.get_one("nsis", {"_id": nsir_id})
             step = "Getting nsilcmop={} from db".format(nsilcmop_id)
             db_nsilcmop = self.db.get_one("nsilcmops", {"_id": nsilcmop_id})
 
-            # look if previous tasks is in process
-            task_name, task_dependency = self.lcm_tasks.lookfor_related("nsi", nsir_id, nsilcmop_id)
-            if task_dependency:
-                step = db_nsilcmop_update["detailed-status"] = \
-                    "Waiting for related tasks to be completed: {}".format(task_name)
-                self.logger.debug(logging_text + step)
-                self.update_db_2("nsilcmops", nsilcmop_id, db_nsilcmop_update)
-                _, pending = await asyncio.wait(task_dependency, timeout=3600)
-                if pending:
-                    raise LcmException("Timeout waiting related tasks to be completed")
-
             # Empty list to keep track of network service records status in the netslice
             nsir_admin = db_nsir_admin = db_nsir.get("_admin")
 
             # Slice status Creating
             db_nsir_update["detailed-status"] = "creating"
             db_nsir_update["operational-status"] = "init"
-            self.update_db_2("nsis", nsir_id, db_nsir_update)     
-            
+            self.update_db_2("nsis", nsir_id, db_nsir_update)
+
             # Creating netslice VLDs networking before NS instantiation
             db_nsir_update["_admin.deployed.RO"] = db_nsir_admin["deployed"]["RO"]
             for vld_item in get_iterable(nsir_admin, "netslice-vld"):
                 await netslice_scenario_create(self, vld_item, nsir_id, db_nsir, db_nsir_admin, db_nsir_update)
             self.update_db_2("nsis", nsir_id, db_nsir_update)
-            
+
             db_nsir_update["detailed-status"] = "Creating netslice subnets at RO"
-            self.update_db_2("nsis", nsir_id, db_nsir_update)  
+            self.update_db_2("nsis", nsir_id, db_nsir_update)
 
             db_nsir = self.db.get_one("nsis", {"_id": nsir_id})
 
@@ -300,9 +298,9 @@ class NetsliceLcm(LcmBase):
             # netslice_scenarios = db_nsir["_admin"]["deployed"]["RO"]
             # db_nsir_update_RO = deepcopy(netslice_scenarios)
             # for netslice_scenario in netslice_scenarios:
-            #    await netslice_scenario_check(self, netslice_scenario["netslice_scenario_id"], 
+            #    await netslice_scenario_check(self, netslice_scenario["netslice_scenario_id"],
             #                                  nsir_id, db_nsir_update_RO)
-            
+
             # db_nsir_update["_admin.deployed.RO"] = db_nsir_update_RO
             # self.update_db_2("nsis", nsir_id, db_nsir_update)
 
@@ -319,12 +317,12 @@ class NetsliceLcm(LcmBase):
                 step = "Launching ns={} instantiate={} task".format(nsr_id, nslcmop_id)
                 task = asyncio.ensure_future(self.ns.instantiate(nsr_id, nslcmop_id))
                 self.lcm_tasks.register("ns", nsr_id, nslcmop_id, "ns_instantiate", task)
-            
+
             # Wait until Network Slice is ready
             step = nsir_status_detailed = " Waiting nsi ready. nsi_id={}".format(nsir_id)
             nsrs_detailed_list_old = None
             self.logger.debug(logging_text + step)
-            
+
             # TODO: substitute while for await (all task to be done or not)
             deployment_timeout = 2 * 3600   # Two hours
             while deployment_timeout > 0:
@@ -414,9 +412,15 @@ class NetsliceLcm(LcmBase):
             self.lcm_tasks.remove("nsi", nsir_id, nsilcmop_id, "nsi_instantiate")
 
     async def terminate(self, nsir_id, nsilcmop_id):
+
+        # Try to lock HA task here
+        task_is_locked_by_me = self.lcm_tasks.lock_HA('nsi', 'nsilcmops', nsilcmop_id)
+        if not task_is_locked_by_me:
+            return
+
         logging_text = "Task nsi={} terminate={} ".format(nsir_id, nsilcmop_id)
         self.logger.debug(logging_text + "Enter")
-        exc = None 
+        exc = None
         db_nsir = None
         db_nsilcmop = None
         db_nsir_update = {"_admin.nsilcmop": nsilcmop_id}
@@ -427,12 +431,15 @@ class NetsliceLcm(LcmBase):
         nsilcmop_operation_state = None
         autoremove = False  # autoremove after terminated
         try:
+            # wait for any previous tasks in process
+            await self.lcm_tasks.waitfor_related_HA('nsi', 'nsilcmops', nsilcmop_id)
+
             step = "Getting nsir={} from db".format(nsir_id)
             db_nsir = self.db.get_one("nsis", {"_id": nsir_id})
             nsir_deployed = deepcopy(db_nsir["_admin"].get("deployed"))
             step = "Getting nsilcmop={} from db".format(nsilcmop_id)
             db_nsilcmop = self.db.get_one("nsilcmops", {"_id": nsilcmop_id})
-            
+
             # TODO: Check if makes sense check the nsiState=NOT_INSTANTIATED when terminate
             # CASE: Instance was terminated but there is a second request to terminate the instance
             if db_nsir["_admin"]["nsiState"] == "NOT_INSTANTIATED":
@@ -444,23 +451,12 @@ class NetsliceLcm(LcmBase):
             db_nsir_update["detailed-status"] = "Terminating Netslice subnets"
             self.update_db_2("nsis", nsir_id, db_nsir_update)
 
-            # look if previous tasks is in process
-            task_name, task_dependency = self.lcm_tasks.lookfor_related("nsi", nsir_id, nsilcmop_id)
-            if task_dependency:
-                step = db_nsilcmop_update["detailed-status"] = \
-                    "Waiting for related tasks to be completed: {}".format(task_name)
-                self.logger.debug(logging_text + step)
-                self.update_db_2("nsilcmops", nsilcmop_id, db_nsilcmop_update)
-                _, pending = await asyncio.wait(task_dependency, timeout=3600)
-                if pending:
-                    raise LcmException("Timeout waiting related tasks to be completed")
-                       
             # Gets the list to keep track of network service records status in the netslice
             nsir_admin = db_nsir["_admin"]
-            nsrs_detailed_list = []       
+            nsrs_detailed_list = []
 
             # Iterate over the network services operation ids to terminate NSs
-            # TODO: (future improvement) look another way check the tasks instead of keep asking  
+            # TODO: (future improvement) look another way check the tasks instead of keep asking
             # -> https://docs.python.org/3/library/asyncio-task.html#waiting-primitives
             # steps: declare ns_tasks, add task when terminate is called, await asyncio.wait(vca_task_list, timeout=300)
             nslcmop_ids = db_nsilcmop["operationParams"].get("nslcmops_ids")
@@ -489,7 +485,7 @@ class NetsliceLcm(LcmBase):
             step = nsir_status_detailed = " Waiting nsi terminated. nsi_id={}".format(nsir_id)
             nsrs_detailed_list_old = None
             self.logger.debug(logging_text + step)
-            
+
             termination_timeout = 2 * 3600   # Two hours
             while termination_timeout > 0:
                 # Check ns termination status
index 4a16835..a04f0a8 100644 (file)
@@ -628,6 +628,12 @@ class NsLcm(LcmBase):
                 raise LcmException("ns_update_vnfr: Not found member_vnf_index={} at RO info".format(vnf_index))
 
     async def instantiate(self, nsr_id, nslcmop_id):
+
+        # Try to lock HA task here
+        task_is_locked_by_me = self.lcm_tasks.lock_HA('ns', 'nslcmops', nslcmop_id)
+        if not task_is_locked_by_me:
+            return
+
         logging_text = "Task ns={} instantiate={} ".format(nsr_id, nslcmop_id)
         self.logger.debug(logging_text + "Enter")
         # get all needed from database
@@ -644,6 +650,9 @@ class NsLcm(LcmBase):
         n2vc_key_list = []  # list of public keys to be injected as authorized to VMs
         exc = None
         try:
+            # wait for any previous tasks in process
+            await self.lcm_tasks.waitfor_related_HA('ns', 'nslcmops', nslcmop_id)
+
             step = "Getting nslcmop={} from db".format(nslcmop_id)
             db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})
             step = "Getting nsr={} from db".format(nsr_id)
@@ -652,17 +661,6 @@ class NsLcm(LcmBase):
             nsd = db_nsr["nsd"]
             nsr_name = db_nsr["name"]   # TODO short-name??
 
-            # look if previous tasks in process
-            task_name, task_dependency = self.lcm_tasks.lookfor_related("ns", nsr_id, nslcmop_id)
-            if task_dependency:
-                step = db_nslcmop_update["detailed-status"] = \
-                    "Waiting for related tasks to be completed: {}".format(task_name)
-                self.logger.debug(logging_text + step)
-                self.update_db_2("nslcmops", nslcmop_id, db_nslcmop_update)
-                _, pending = await asyncio.wait(task_dependency, timeout=3600)
-                if pending:
-                    raise LcmException("Timeout waiting related tasks to be completed")
-
             step = "Getting vnfrs from db"
             db_vnfrs_list = self.db.get_list("vnfrs", {"nsr-id-ref": nsr_id})
             db_vnfds_ref = {}
@@ -1650,6 +1648,12 @@ class NsLcm(LcmBase):
                             vnf_index, seq.get("name"), nslcmop_operation_state_detail))
 
     async def terminate(self, nsr_id, nslcmop_id):
+
+        # Try to lock HA task here
+        task_is_locked_by_me = self.lcm_tasks.lock_HA('ns', 'nslcmops', nslcmop_id)
+        if not task_is_locked_by_me:
+            return
+
         logging_text = "Task ns={} terminate={} ".format(nsr_id, nslcmop_id)
         self.logger.debug(logging_text + "Enter")
         db_nsr = None
@@ -1662,6 +1666,9 @@ class NsLcm(LcmBase):
         nslcmop_operation_state = None
         autoremove = False  # autoremove after terminated
         try:
+            # wait for any previous tasks in process
+            await self.lcm_tasks.waitfor_related_HA("ns", 'nslcmops', nslcmop_id)
+
             step = "Getting nslcmop={} from db".format(nslcmop_id)
             db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})
             step = "Getting nsr={} from db".format(nsr_id)
@@ -1998,6 +2005,12 @@ class NsLcm(LcmBase):
             return "FAILED", str(e)
 
     async def action(self, nsr_id, nslcmop_id):
+
+        # Try to lock HA task here
+        task_is_locked_by_me = self.lcm_tasks.lock_HA('ns', 'nslcmops', nslcmop_id)
+        if not task_is_locked_by_me:
+            return
+
         logging_text = "Task ns={} action={} ".format(nsr_id, nslcmop_id)
         self.logger.debug(logging_text + "Enter")
         # get all needed from database
@@ -2009,6 +2022,9 @@ class NsLcm(LcmBase):
         nslcmop_operation_state_detail = None
         exc = None
         try:
+            # wait for any previous tasks in process
+            await self.lcm_tasks.waitfor_related_HA('ns', 'nslcmops', nslcmop_id)
+
             step = "Getting information from database"
             db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})
             db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
@@ -2031,17 +2047,6 @@ class NsLcm(LcmBase):
                     step = "Getting nsd from database"
                     db_nsd = self.db.get_one("nsds", {"_id": db_nsr["nsd-id"]})
 
-            # look if previous tasks in process
-            task_name, task_dependency = self.lcm_tasks.lookfor_related("ns", nsr_id, nslcmop_id)
-            if task_dependency:
-                step = db_nslcmop_update["detailed-status"] = \
-                    "Waiting for related tasks to be completed: {}".format(task_name)
-                self.logger.debug(logging_text + step)
-                self.update_db_2("nslcmops", nslcmop_id, db_nslcmop_update)
-                _, pending = await asyncio.wait(task_dependency, timeout=3600)
-                if pending:
-                    raise LcmException("Timeout waiting related tasks to be completed")
-
             # for backward compatibility
             if nsr_deployed and isinstance(nsr_deployed.get("VCA"), dict):
                 nsr_deployed["VCA"] = list(nsr_deployed["VCA"].values())
@@ -2129,6 +2134,12 @@ class NsLcm(LcmBase):
             return nslcmop_operation_state, nslcmop_operation_state_detail
 
     async def scale(self, nsr_id, nslcmop_id):
+
+        # Try to lock HA task here
+        task_is_locked_by_me = self.lcm_tasks.lock_HA('ns', 'nslcmops', nslcmop_id)
+        if not task_is_locked_by_me:
+            return
+
         logging_text = "Task ns={} scale={} ".format(nsr_id, nslcmop_id)
         self.logger.debug(logging_text + "Enter")
         # get all needed from database
@@ -2144,16 +2155,8 @@ class NsLcm(LcmBase):
         old_config_status = ""
         vnfr_scaled = False
         try:
-            # look if previous tasks in process
-            task_name, task_dependency = self.lcm_tasks.lookfor_related("ns", nsr_id, nslcmop_id)
-            if task_dependency:
-                step = db_nslcmop_update["detailed-status"] = \
-                    "Waiting for related tasks to be completed: {}".format(task_name)
-                self.logger.debug(logging_text + step)
-                self.update_db_2("nslcmops", nslcmop_id, db_nslcmop_update)
-                _, pending = await asyncio.wait(task_dependency, timeout=3600)
-                if pending:
-                    raise LcmException("Timeout waiting related tasks to be completed")
+            # wait for any previous tasks in process
+            await self.lcm_tasks.waitfor_related_HA('ns', 'nslcmops', nslcmop_id)
 
             step = "Getting nslcmop from database"
             self.logger.debug(step + " after having waited for previous tasks to be completed")