store kdu-instance by LCM after KDU deploy
[osm/LCM.git] / osm_lcm / ns.py
index 5d4f34d..b7805d7 100644 (file)
@@ -33,12 +33,13 @@ from osm_common.dbbase import DbException
 from osm_common.fsbase import FsException
 
 from n2vc.n2vc_juju_conn import N2VCJujuConnector
 from osm_common.fsbase import FsException
 
 from n2vc.n2vc_juju_conn import N2VCJujuConnector
-from n2vc.exceptions import N2VCException
+from n2vc.exceptions import N2VCException, N2VCNotFound, K8sException
 
 from copy import copy, deepcopy
 from http import HTTPStatus
 from time import time
 from uuid import uuid4
 
 from copy import copy, deepcopy
 from http import HTTPStatus
 from time import time
 from uuid import uuid4
+from functools import partial
 
 __author__ = "Alfonso Tierno"
 
 
 __author__ = "Alfonso Tierno"
 
@@ -214,11 +215,11 @@ class NsLcm(LcmBase):
             # write to database
             self.update_db_2("nsrs", nsr_id, db_dict)
 
             # write to database
             self.update_db_2("nsrs", nsr_id, db_dict)
 
+        except (asyncio.CancelledError, asyncio.TimeoutError):
+            raise
         except Exception as e:
             self.logger.warn('Error updating NS state for ns={}: {}'.format(nsr_id, e))
 
         except Exception as e:
             self.logger.warn('Error updating NS state for ns={}: {}'.format(nsr_id, e))
 
-        return
-
     def vnfd2RO(self, vnfd, new_id=None, additionalParams=None, nsrId=None):
         """
         Converts creates a new vnfd descriptor for RO base on input OSM IM vnfd
     def vnfd2RO(self, vnfd, new_id=None, additionalParams=None, nsrId=None):
         """
         Converts creates a new vnfd descriptor for RO base on input OSM IM vnfd
@@ -277,7 +278,7 @@ class NsLcm(LcmBase):
             raise LcmException("Error parsing Jinja2 to cloud-init content at vnfd[id={}]:vdu[id={}]: {}".
                                format(vnfd["id"], vdu["id"], e))
 
             raise LcmException("Error parsing Jinja2 to cloud-init content at vnfd[id={}]:vdu[id={}]: {}".
                                format(vnfd["id"], vdu["id"], e))
 
-    def ns_params_2_RO(self, ns_params, nsd, vnfd_dict, n2vc_key_list):
+    def _ns_params_2_RO(self, ns_params, nsd, vnfd_dict, n2vc_key_list):
         """
         Creates a RO ns descriptor from OSM ns_instantiate params
         :param ns_params: OSM instantiate params
         """
         Creates a RO ns descriptor from OSM ns_instantiate params
         :param ns_params: OSM instantiate params
@@ -765,7 +766,7 @@ class NsLcm(LcmBase):
 
             # Check for and optionally request placement optimization. Database will be updated if placement activated
             stage[2] = "Waiting for Placement."
 
             # Check for and optionally request placement optimization. Database will be updated if placement activated
             stage[2] = "Waiting for Placement."
-            await self.do_placement(logging_text, db_nslcmop, db_vnfrs)
+            await self._do_placement(logging_text, db_nslcmop, db_vnfrs)
 
             # deploy RO
 
 
             # deploy RO
 
@@ -898,7 +899,7 @@ class NsLcm(LcmBase):
                             await asyncio.wait(task_dependency, timeout=3600)
 
                 stage[2] = "Checking instantiation parameters."
                             await asyncio.wait(task_dependency, timeout=3600)
 
                 stage[2] = "Checking instantiation parameters."
-                RO_ns_params = self.ns_params_2_RO(ns_params, nsd, db_vnfds_ref, n2vc_key_list)
+                RO_ns_params = self._ns_params_2_RO(ns_params, nsd, db_vnfds_ref, n2vc_key_list)
                 stage[2] = "Deploying ns at VIM."
                 db_nsr_update["detailed-status"] = " ".join(stage)
                 self.update_db_2("nsrs", nsr_id, db_nsr_update)
                 stage[2] = "Deploying ns at VIM."
                 db_nsr_update["detailed-status"] = " ".join(stage)
                 self.update_db_2("nsrs", nsr_id, db_nsr_update)
@@ -1137,6 +1138,10 @@ class NsLcm(LcmBase):
                     namespace += ".{}-{}".format(vdu_id, vdu_index or 0)
                     element_type = 'VDU'
                     element_under_configuration = "{}-{}".format(vdu_id, vdu_index or 0)
                     namespace += ".{}-{}".format(vdu_id, vdu_index or 0)
                     element_type = 'VDU'
                     element_under_configuration = "{}-{}".format(vdu_id, vdu_index or 0)
+                elif kdu_name:
+                    namespace += ".{}".format(kdu_name)
+                    element_type = 'KDU'
+                    element_under_configuration = kdu_name
 
             # Get artifact path
             self.fs.sync()  # Sync from FSMongo
 
             # Get artifact path
             self.fs.sync()  # Sync from FSMongo
@@ -1212,9 +1217,9 @@ class NsLcm(LcmBase):
             ee_id_parts = ee_id.split('.')
             model_name = ee_id_parts[0]
             application_name = ee_id_parts[1]
             ee_id_parts = ee_id.split('.')
             model_name = ee_id_parts[0]
             application_name = ee_id_parts[1]
-            self.update_db_2("nsrs", nsr_id, {db_update_entry + "model": model_name,
-                                              db_update_entry + "application": application_name,
-                                              db_update_entry + "ee_id": ee_id})
+            db_nsr_update = {db_update_entry + "model": model_name,
+                             db_update_entry + "application": application_name,
+                             db_update_entry + "ee_id": ee_id}
 
             # n2vc_redesign STEP 3.3
 
 
             # n2vc_redesign STEP 3.3
 
@@ -1225,7 +1230,8 @@ class NsLcm(LcmBase):
                 vca_index=vca_index,
                 status='INSTALLING SW',
                 element_under_configuration=element_under_configuration,
                 vca_index=vca_index,
                 status='INSTALLING SW',
                 element_under_configuration=element_under_configuration,
-                element_type=element_type
+                element_type=element_type,
+                other_update=db_nsr_update
             )
 
             # TODO check if already done
             )
 
             # TODO check if already done
@@ -1420,33 +1426,30 @@ class NsLcm(LcmBase):
         except DbException as e:
             self.logger.warn('Error writing OPERATION status for op_id: {} -> {}'.format(op_id, e))
 
         except DbException as e:
             self.logger.warn('Error writing OPERATION status for op_id: {} -> {}'.format(op_id, e))
 
-    def _write_all_config_status(self, nsr_id: str, status: str):
+    def _write_all_config_status(self, db_nsr: dict, status: str):
         try:
         try:
-            # nsrs record
-            db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
+            nsr_id = db_nsr["_id"]
             # configurationStatus
             config_status = db_nsr.get('configurationStatus')
             if config_status:
             # configurationStatus
             config_status = db_nsr.get('configurationStatus')
             if config_status:
+                db_nsr_update = {"configurationStatus.{}.status".format(index): status for index, v in
+                                 enumerate(config_status) if v}
                 # update status
                 # update status
-                db_dict = dict()
-                db_dict['configurationStatus'] = list()
-                for c in config_status:
-                    c['status'] = status
-                    db_dict['configurationStatus'].append(c)
-                self.update_db_2("nsrs", nsr_id, db_dict)
+                self.update_db_2("nsrs", nsr_id, db_nsr_update)
 
         except DbException as e:
             self.logger.warn('Error writing all configuration status, ns={}: {}'.format(nsr_id, e))
 
     def _write_configuration_status(self, nsr_id: str, vca_index: int, status: str = None,
 
         except DbException as e:
             self.logger.warn('Error writing all configuration status, ns={}: {}'.format(nsr_id, e))
 
     def _write_configuration_status(self, nsr_id: str, vca_index: int, status: str = None,
-                                    element_under_configuration: str = None, element_type: str = None):
+                                    element_under_configuration: str = None, element_type: str = None,
+                                    other_update: dict = None):
 
         # self.logger.debug('_write_configuration_status(): vca_index={}, status={}'
         #                   .format(vca_index, status))
 
         try:
             db_path = 'configurationStatus.{}.'.format(vca_index)
 
         # self.logger.debug('_write_configuration_status(): vca_index={}, status={}'
         #                   .format(vca_index, status))
 
         try:
             db_path = 'configurationStatus.{}.'.format(vca_index)
-            db_dict = dict()
+            db_dict = other_update or {}
             if status:
                 db_dict[db_path + 'status'] = status
             if element_under_configuration:
             if status:
                 db_dict[db_path + 'status'] = status
             if element_under_configuration:
@@ -1458,28 +1461,40 @@ class NsLcm(LcmBase):
             self.logger.warn('Error writing configuration status={}, ns={}, vca_index={}: {}'
                              .format(status, nsr_id, vca_index, e))
 
             self.logger.warn('Error writing configuration status={}, ns={}, vca_index={}: {}'
                              .format(status, nsr_id, vca_index, e))
 
-    async def do_placement(self, logging_text, db_nslcmop, db_vnfrs):
+    async def _do_placement(self, logging_text, db_nslcmop, db_vnfrs):
+        """
+        Check and computes the placement, (vim account where to deploy). If it is decided by an external tool, it
+        sends the request via kafka and wait until the result is wrote at database (nslcmops _admin.plca).
+        Database is used because the result can be obtained from a different LCM worker in case of HA.
+        :param logging_text: contains the prefix for logging, with the ns and nslcmop identifiers
+        :param db_nslcmop: database content of nslcmop
+        :param db_vnfrs: database content of vnfrs, indexed by member-vnf-index.
+        :return: None. Modifies database vnfrs and parameter db_vnfr with the computed 'vim-account-id'
+        """
+        nslcmop_id = db_nslcmop['_id']
         placement_engine = deep_get(db_nslcmop, ('operationParams', 'placement-engine'))
         if placement_engine == "PLA":
         placement_engine = deep_get(db_nslcmop, ('operationParams', 'placement-engine'))
         if placement_engine == "PLA":
-            self.logger.debug(logging_text + "Invoke placement optimization for nslcmopId={}".format(db_nslcmop['id']))
-            await self.msg.aiowrite("pla", "get_placement", {'nslcmopId': db_nslcmop['_id']}, loop=self.loop)
+            self.logger.debug(logging_text + "Invoke and wait for placement optimization")
+            await self.msg.aiowrite("pla", "get_placement", {'nslcmopId': nslcmop_id}, loop=self.loop)
             db_poll_interval = 5
             db_poll_interval = 5
-            wait = db_poll_interval * 4
+            wait = db_poll_interval * 10
             pla_result = None
             while not pla_result and wait >= 0:
                 await asyncio.sleep(db_poll_interval)
                 wait -= db_poll_interval
             pla_result = None
             while not pla_result and wait >= 0:
                 await asyncio.sleep(db_poll_interval)
                 wait -= db_poll_interval
-                db_nslcmop = self.db.get_one("nslcmops", {"_id": db_nslcmop["_id"]})
+                db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})
                 pla_result = deep_get(db_nslcmop, ('_admin', 'pla'))
 
             if not pla_result:
                 pla_result = deep_get(db_nslcmop, ('_admin', 'pla'))
 
             if not pla_result:
-                raise LcmException("Placement timeout for nslcmopId={}".format(db_nslcmop['id']))
+                raise LcmException("Placement timeout for nslcmopId={}".format(nslcmop_id))
 
             for pla_vnf in pla_result['vnf']:
                 vnfr = db_vnfrs.get(pla_vnf['member-vnf-index'])
                 if not pla_vnf.get('vimAccountId') or not vnfr:
                     continue
                 self.db.set_one("vnfrs", {"_id": vnfr["_id"]}, {"vim-account-id": pla_vnf['vimAccountId']})
 
             for pla_vnf in pla_result['vnf']:
                 vnfr = db_vnfrs.get(pla_vnf['member-vnf-index'])
                 if not pla_vnf.get('vimAccountId') or not vnfr:
                     continue
                 self.db.set_one("vnfrs", {"_id": vnfr["_id"]}, {"vim-account-id": pla_vnf['vimAccountId']})
+                # Modifies db_vnfrs
+                vnfr["vim-account-id"] = pla_vnf['vimAccountId']
         return
 
     def update_nsrs_with_pla_result(self, params):
         return
 
     def update_nsrs_with_pla_result(self, params):
@@ -2070,6 +2085,25 @@ class NsLcm(LcmBase):
             self.logger.warn(logging_text + ' ERROR adding relations: {}'.format(e))
             return False
 
             self.logger.warn(logging_text + ' ERROR adding relations: {}'.format(e))
             return False
 
+    def _write_db_callback(self, task, item, _id, on_done=None, on_exc=None):
+        """
+        callback for kdu install intended to store the returned kdu_instance at database
+        :return: None
+        """
+        db_update = {}
+        try:
+            result = task.result()
+            if on_done:
+                db_update[on_done] = str(result)
+        except Exception as e:
+            if on_exc:
+                db_update[on_exc] = str(e)
+        if db_update:
+            try:
+                self.update_db_2(item, _id, db_update)
+            except Exception:
+                pass
+
     async def deploy_kdus(self, logging_text, nsr_id, nslcmop_id, db_vnfrs, db_vnfds, task_instantiation_info):
         # Launch kdus if present in the descriptor
 
     async def deploy_kdus(self, logging_text, nsr_id, nslcmop_id, db_vnfrs, db_vnfds, task_instantiation_info):
         # Launch kdus if present in the descriptor
 
@@ -2102,7 +2136,7 @@ class NsLcm(LcmBase):
                 for kdur in get_iterable(vnfr_data, "kdur"):
                     desc_params = self._format_additional_params(kdur.get("additionalParams"))
                     vnfd_id = vnfr_data.get('vnfd-id')
                 for kdur in get_iterable(vnfr_data, "kdur"):
                     desc_params = self._format_additional_params(kdur.get("additionalParams"))
                     vnfd_id = vnfr_data.get('vnfd-id')
-                    pkgdir = deep_get(db_vnfds.get(vnfd_id), ('_admin', 'storage', 'pkg-dir'))
+                    namespace = kdur.get("k8s-namespace")
                     if kdur.get("helm-chart"):
                         kdumodel = kdur["helm-chart"]
                         k8sclustertype = "helm-chart"
                     if kdur.get("helm-chart"):
                         kdumodel = kdur["helm-chart"]
                         k8sclustertype = "helm-chart"
@@ -2115,11 +2149,14 @@ class NsLcm(LcmBase):
                                            format(vnfr_data["member-vnf-index-ref"], kdur["kdu-name"]))
                     # check if kdumodel is a file and exists
                     try:
                                            format(vnfr_data["member-vnf-index-ref"], kdur["kdu-name"]))
                     # check if kdumodel is a file and exists
                     try:
-                        # path format: /vnfdid/pkkdir/kdumodel
-                        filename = '{}/{}/{}s/{}'.format(vnfd_id, pkgdir, k8sclustertype, kdumodel)
-                        if self.fs.file_exists(filename, mode='file') or self.fs.file_exists(filename, mode='dir'):
-                            kdumodel = self.fs.path + filename
-                    except asyncio.CancelledError:
+                        storage = deep_get(db_vnfds.get(vnfd_id), ('_admin', 'storage'))
+                        if storage and storage.get('pkg-dir'):  # may be not present if vnfd has not artifacts
+                            # path format: /vnfdid/pkkdir/helm-charts|juju-bundles/kdumodel
+                            filename = '{}/{}/{}s/{}'.format(storage["folder"], storage["'pkg-dir"], k8sclustertype,
+                                                             kdumodel)
+                            if self.fs.file_exists(filename, mode='file') or self.fs.file_exists(filename, mode='dir'):
+                                kdumodel = self.fs.path + filename
+                    except (asyncio.TimeoutError, asyncio.CancelledError):
                         raise
                     except Exception:       # it is not a file
                         pass
                         raise
                     except Exception:       # it is not a file
                         pass
@@ -2149,20 +2186,25 @@ class NsLcm(LcmBase):
                                         "k8scluster-type": k8sclustertype,
                                         "member-vnf-index": vnfr_data["member-vnf-index-ref"],
                                         "kdu-name": kdur["kdu-name"],
                                         "k8scluster-type": k8sclustertype,
                                         "member-vnf-index": vnfr_data["member-vnf-index-ref"],
                                         "kdu-name": kdur["kdu-name"],
-                                        "kdu-model": kdumodel}
-                    db_nsr_update["_admin.deployed.K8s.{}".format(index)] = k8s_instace_info
+                                        "kdu-model": kdumodel,
+                                        "namespace": namespace}
+                    db_path = "_admin.deployed.K8s.{}".format(index)
+                    db_nsr_update[db_path] = k8s_instace_info
                     self.update_db_2("nsrs", nsr_id, db_nsr_update)
 
                     db_dict = {"collection": "nsrs",
                                "filter": {"_id": nsr_id},
                     self.update_db_2("nsrs", nsr_id, db_nsr_update)
 
                     db_dict = {"collection": "nsrs",
                                "filter": {"_id": nsr_id},
-                               "path": "_admin.deployed.K8s.{}".format(index)}
+                               "path": db_path}
 
                     task = asyncio.ensure_future(
                         self.k8scluster_map[k8sclustertype].install(cluster_uuid=cluster_uuid, kdu_model=kdumodel,
                                                                     atomic=True, params=desc_params,
                                                                     db_dict=db_dict, timeout=600,
 
                     task = asyncio.ensure_future(
                         self.k8scluster_map[k8sclustertype].install(cluster_uuid=cluster_uuid, kdu_model=kdumodel,
                                                                     atomic=True, params=desc_params,
                                                                     db_dict=db_dict, timeout=600,
-                                                                    kdu_name=kdur["kdu-name"]))
+                                                                    kdu_name=kdur["kdu-name"], namespace=namespace))
 
 
+                    task.add_done_callback(partial(self._write_db_callback, item="nsrs", _id=nsr_id,
+                                                   on_done=db_path + ".kdu-instance",
+                                                   on_exc=db_path + ".detailed-status"))
                     self.lcm_tasks.register("ns", nsr_id, nslcmop_id, "instantiate_KDU-{}".format(index), task)
                     task_instantiation_info[task] = "Deploying KDU {}".format(kdur["kdu-name"])
 
                     self.lcm_tasks.register("ns", nsr_id, nslcmop_id, "instantiate_KDU-{}".format(index), task)
                     task_instantiation_info[task] = "Deploying KDU {}".format(kdur["kdu-name"])
 
@@ -2316,9 +2358,9 @@ class NsLcm(LcmBase):
 
     # sub-operations
 
 
     # sub-operations
 
-    def _reintent_or_skip_suboperation(self, db_nslcmop, op_index):
-        op = db_nslcmop.get('_admin', {}).get('operations', [])[op_index]
-        if (op.get('operationState') == 'COMPLETED'):
+    def _retry_or_skip_suboperation(self, db_nslcmop, op_index):
+        op = deep_get(db_nslcmop, ('_admin', 'operations'), [])[op_index]
+        if op.get('operationState') == 'COMPLETED':
             # b. Skip sub-operation
             # _ns_execute_primitive() or RO.create_action() will NOT be executed
             return self.SUBOPERATION_STATUS_SKIP
             # b. Skip sub-operation
             # _ns_execute_primitive() or RO.create_action() will NOT be executed
             return self.SUBOPERATION_STATUS_SKIP
@@ -2425,7 +2467,7 @@ class NsLcm(LcmBase):
                 'lcmOperationType': operationType
             }
         op_index = self._find_suboperation(db_nslcmop, match)
                 'lcmOperationType': operationType
             }
         op_index = self._find_suboperation(db_nslcmop, match)
-        if (op_index == self.SUBOPERATION_STATUS_NOT_FOUND):
+        if op_index == self.SUBOPERATION_STATUS_NOT_FOUND:
             # a. New sub-operation
             # The sub-operation does not exist, add it.
             # _ns_execute_primitive() will be called from scale() as usual, with non-modified arguments
             # a. New sub-operation
             # The sub-operation does not exist, add it.
             # _ns_execute_primitive() will be called from scale() as usual, with non-modified arguments
@@ -2433,7 +2475,7 @@ class NsLcm(LcmBase):
             vdu_id = None
             vdu_count_index = None
             vdu_name = None
             vdu_id = None
             vdu_count_index = None
             vdu_name = None
-            if (RO_nsr_id and RO_scaling_info):
+            if RO_nsr_id and RO_scaling_info:
                 vnf_config_primitive = None
                 primitive_params = None
             else:
                 vnf_config_primitive = None
                 primitive_params = None
             else:
@@ -2459,7 +2501,7 @@ class NsLcm(LcmBase):
         else:
             # Return either SUBOPERATION_STATUS_SKIP (operationState == 'COMPLETED'),
             # or op_index (operationState != 'COMPLETED')
         else:
             # Return either SUBOPERATION_STATUS_SKIP (operationState == 'COMPLETED'),
             # or op_index (operationState != 'COMPLETED')
-            return self._reintent_or_skip_suboperation(db_nslcmop, op_index)
+            return self._retry_or_skip_suboperation(db_nslcmop, op_index)
 
     # Function to return execution_environment id
 
 
     # Function to return execution_environment id
 
@@ -2526,11 +2568,14 @@ class NsLcm(LcmBase):
         if destroy_ee:
             await self.n2vc.delete_execution_environment(vca_deployed["ee_id"])
 
         if destroy_ee:
             await self.n2vc.delete_execution_environment(vca_deployed["ee_id"])
 
-    async def _delete_N2VC(self, nsr_id: str):
-        self._write_all_config_status(nsr_id=nsr_id, status='TERMINATING')
-        namespace = "." + nsr_id
-        await self.n2vc.delete_namespace(namespace=namespace, total_timeout=self.timeout_charm_delete)
-        self._write_all_config_status(nsr_id=nsr_id, status='DELETED')
+    async def _delete_all_N2VC(self, db_nsr: dict):
+        self._write_all_config_status(db_nsr=db_nsr, status='TERMINATING')
+        namespace = "." + db_nsr["_id"]
+        try:
+            await self.n2vc.delete_namespace(namespace=namespace, total_timeout=self.timeout_charm_delete)
+        except N2VCNotFound:  # already deleted. Skip
+            pass
+        self._write_all_config_status(db_nsr=db_nsr, status='DELETED')
 
     async def _terminate_RO(self, logging_text, nsr_deployed, nsr_id, nslcmop_id, stage):
         """
 
     async def _terminate_RO(self, logging_text, nsr_deployed, nsr_id, nslcmop_id, stage):
         """
@@ -2777,13 +2822,14 @@ class NsLcm(LcmBase):
 
             # remove All execution environments at once
             stage[0] = "Stage 3/3 delete all."
 
             # remove All execution environments at once
             stage[0] = "Stage 3/3 delete all."
-            stage[1] = "Deleting all execution environments."
-            self.logger.debug(logging_text + stage[1])
 
 
-            task_delete_ee = asyncio.ensure_future(asyncio.wait_for(self._delete_N2VC(nsr_id=nsr_id),
-                                                                    timeout=self.timeout_charm_delete))
-            # task_delete_ee = asyncio.ensure_future(self.n2vc.delete_namespace(namespace="." + nsr_id))
-            tasks_dict_info[task_delete_ee] = "Terminating all VCA"
+            if nsr_deployed.get("VCA"):
+                stage[1] = "Deleting all execution environments."
+                self.logger.debug(logging_text + stage[1])
+                task_delete_ee = asyncio.ensure_future(asyncio.wait_for(self._delete_all_N2VC(db_nsr=db_nsr),
+                                                                        timeout=self.timeout_charm_delete))
+                # task_delete_ee = asyncio.ensure_future(self.n2vc.delete_namespace(namespace="." + nsr_id))
+                tasks_dict_info[task_delete_ee] = "Terminating all VCA"
 
             # Delete from k8scluster
             stage[1] = "Deleting KDUs."
 
             # Delete from k8scluster
             stage[1] = "Deleting KDUs."
@@ -3146,8 +3192,11 @@ class NsLcm(LcmBase):
             else:
                 desc_params = self._format_additional_params(db_nsr.get("additionalParamsForNs"))
 
             else:
                 desc_params = self._format_additional_params(db_nsr.get("additionalParamsForNs"))
 
+            if kdu_name:
+                kdu_action = True if not deep_get(kdu, ("kdu-configuration", "juju")) else False
+
             # TODO check if ns is in a proper status
             # TODO check if ns is in a proper status
-            if kdu_name and primitive in ("upgrade", "rollback", "status"):
+            if kdu_name and (primitive in ("upgrade", "rollback", "status") or kdu_action):
                 # kdur and desc_params already set from before
                 if primitive_params:
                     desc_params.update(primitive_params)
                 # kdur and desc_params already set from before
                 if primitive_params:
                     desc_params.update(primitive_params)
@@ -3199,13 +3248,24 @@ class NsLcm(LcmBase):
                             cluster_uuid=kdu.get("k8scluster-uuid"),
                             kdu_instance=kdu.get("kdu-instance")),
                         timeout=timeout_ns_action)
                             cluster_uuid=kdu.get("k8scluster-uuid"),
                             kdu_instance=kdu.get("kdu-instance")),
                         timeout=timeout_ns_action)
+                else:
+                    kdu_instance = kdu.get("kdu-instance") or "{}-{}".format(kdu["kdu-name"], nsr_id)
+                    params = self._map_primitive_params(config_primitive_desc, primitive_params, desc_params)
+
+                    detailed_status = await asyncio.wait_for(
+                        self.k8scluster_map[kdu["k8scluster-type"]].exec_primitive(
+                            cluster_uuid=kdu.get("k8scluster-uuid"),
+                            kdu_instance=kdu_instance,
+                            primitive_name=primitive,
+                            params=params, db_dict=db_dict,
+                            timeout=timeout_ns_action),
+                        timeout=timeout_ns_action)
 
                 if detailed_status:
                     nslcmop_operation_state = 'COMPLETED'
                 else:
                     detailed_status = ''
                     nslcmop_operation_state = 'FAILED'
 
                 if detailed_status:
                     nslcmop_operation_state = 'COMPLETED'
                 else:
                     detailed_status = ''
                     nslcmop_operation_state = 'FAILED'
-
             else:
                 nslcmop_operation_state, detailed_status = await self._ns_execute_primitive(
                     self._look_for_deployed_vca(nsr_deployed["VCA"],
             else:
                 nslcmop_operation_state, detailed_status = await self._ns_execute_primitive(
                     self._look_for_deployed_vca(nsr_deployed["VCA"],
@@ -3222,7 +3282,7 @@ class NsLcm(LcmBase):
                                                                                    detailed_status))
             return  # database update is called inside finally
 
                                                                                    detailed_status))
             return  # database update is called inside finally
 
-        except (DbException, LcmException, N2VCException) as e:
+        except (DbException, LcmException, N2VCException, K8sException) as e:
             self.logger.error(logging_text + "Exit Exception {}".format(e))
             exc = e
         except asyncio.CancelledError:
             self.logger.error(logging_text + "Exit Exception {}".format(e))
             exc = e
         except asyncio.CancelledError: