vnf RO deploy using db_vnfrs.vim-account-id
[osm/LCM.git] / osm_lcm / ns.py
index bcd1af9..07f5845 100644 (file)
@@ -33,12 +33,13 @@ from osm_common.dbbase import DbException
 from osm_common.fsbase import FsException
 
 from n2vc.n2vc_juju_conn import N2VCJujuConnector
 from osm_common.fsbase import FsException
 
 from n2vc.n2vc_juju_conn import N2VCJujuConnector
-from n2vc.exceptions import N2VCException
+from n2vc.exceptions import N2VCException, N2VCNotFound, K8sException
 
 from copy import copy, deepcopy
 from http import HTTPStatus
 from time import time
 from uuid import uuid4
 
 from copy import copy, deepcopy
 from http import HTTPStatus
 from time import time
 from uuid import uuid4
+from functools import partial
 
 __author__ = "Alfonso Tierno"
 
 
 __author__ = "Alfonso Tierno"
 
@@ -277,10 +278,12 @@ class NsLcm(LcmBase):
             raise LcmException("Error parsing Jinja2 to cloud-init content at vnfd[id={}]:vdu[id={}]: {}".
                                format(vnfd["id"], vdu["id"], e))
 
             raise LcmException("Error parsing Jinja2 to cloud-init content at vnfd[id={}]:vdu[id={}]: {}".
                                format(vnfd["id"], vdu["id"], e))
 
-    def ns_params_2_RO(self, ns_params, nsd, vnfd_dict, n2vc_key_list):
+    def _ns_params_2_RO(self, ns_params, nsd, vnfd_dict, db_vnfrs, n2vc_key_list):
         """
         Creates a RO ns descriptor from OSM ns_instantiate params
         :param ns_params: OSM instantiate params
         """
         Creates a RO ns descriptor from OSM ns_instantiate params
         :param ns_params: OSM instantiate params
+        :param vnfd_dict: database content of vnfds, indexed by id (not _id). {id: {vnfd_object}, ...}
+        :param db_vnfrs: database content of vnfrs, indexed by member-vnf-index. {member-vnf-index: {vnfr_object}, ...}
         :return: The RO ns descriptor
         """
         vim_2_RO = {}
         :return: The RO ns descriptor
         """
         vim_2_RO = {}
@@ -342,6 +345,13 @@ class NsLcm(LcmBase):
             "wim_account": wim_account_2_RO(ns_params.get("wimAccountId")),
             # "scenario": ns_params["nsdId"],
         }
             "wim_account": wim_account_2_RO(ns_params.get("wimAccountId")),
             # "scenario": ns_params["nsdId"],
         }
+        # set vim_account of each vnf if different from general vim_account.
+        # Get this information from <vnfr> database content, key vim-account-id
+        # Vim account can be set by placement_engine and it may be different from
+        # the instantiate parameters (vnfs.member-vnf-index.datacenter).
+        for vnf_index, vnfr in db_vnfrs.items():
+            if vnfr.get("vim-account-id") and vnfr["vim-account-id"] != ns_params["vimAccountId"]:
+                populate_dict(RO_ns_params, ("vnfs", vnf_index, "datacenter"), vim_account_2_RO(vnfr["vim-account-id"]))
 
         n2vc_key_list = n2vc_key_list or []
         for vnfd_ref, vnfd in vnfd_dict.items():
 
         n2vc_key_list = n2vc_key_list or []
         for vnfd_ref, vnfd in vnfd_dict.items():
@@ -390,9 +400,6 @@ class NsLcm(LcmBase):
             else:
                 raise LcmException("Invalid instantiate parameter vnf:member-vnf-index={} is not present at nsd:"
                                    "constituent-vnfd".format(vnf_params["member-vnf-index"]))
             else:
                 raise LcmException("Invalid instantiate parameter vnf:member-vnf-index={} is not present at nsd:"
                                    "constituent-vnfd".format(vnf_params["member-vnf-index"]))
-            if vnf_params.get("vimAccountId"):
-                populate_dict(RO_ns_params, ("vnfs", vnf_params["member-vnf-index"], "datacenter"),
-                              vim_account_2_RO(vnf_params["vimAccountId"]))
 
             for vdu_params in get_iterable(vnf_params, "vdu"):
                 # TODO feature 1417: check that this VDU exist and it is not a PDU
 
             for vdu_params in get_iterable(vnf_params, "vdu"):
                 # TODO feature 1417: check that this VDU exist and it is not a PDU
@@ -751,6 +758,19 @@ class NsLcm(LcmBase):
 
     async def instantiate_RO(self, logging_text, nsr_id, nsd, db_nsr, db_nslcmop, db_vnfrs, db_vnfds_ref,
                              n2vc_key_list, stage):
 
     async def instantiate_RO(self, logging_text, nsr_id, nsd, db_nsr, db_nslcmop, db_vnfrs, db_vnfds_ref,
                              n2vc_key_list, stage):
+        """
+        Instantiate at RO
+        :param logging_text: preffix text to use at logging
+        :param nsr_id: nsr identity
+        :param nsd: database content of ns descriptor
+        :param db_nsr: database content of ns record
+        :param db_nslcmop: database content of ns operation, in this case, 'instantiate'
+        :param db_vnfrs:
+        :param db_vnfds_ref: database content of vnfds, indexed by id (not _id). {id: {vnfd_object}, ...}
+        :param n2vc_key_list: ssh-public-key list to be inserted to management vdus via cloud-init
+        :param stage: list with 3 items: [general stage, tasks, vim_specific]. This task will write over vim_specific
+        :return: None or exception
+        """
         try:
             db_nsr_update = {}
             RO_descriptor_number = 0   # number of descriptors created at RO
         try:
             db_nsr_update = {}
             RO_descriptor_number = 0   # number of descriptors created at RO
@@ -765,7 +785,7 @@ class NsLcm(LcmBase):
 
             # Check for and optionally request placement optimization. Database will be updated if placement activated
             stage[2] = "Waiting for Placement."
 
             # Check for and optionally request placement optimization. Database will be updated if placement activated
             stage[2] = "Waiting for Placement."
-            await self.do_placement(logging_text, db_nslcmop, db_vnfrs)
+            await self._do_placement(logging_text, db_nslcmop, db_vnfrs)
 
             # deploy RO
 
 
             # deploy RO
 
@@ -898,7 +918,7 @@ class NsLcm(LcmBase):
                             await asyncio.wait(task_dependency, timeout=3600)
 
                 stage[2] = "Checking instantiation parameters."
                             await asyncio.wait(task_dependency, timeout=3600)
 
                 stage[2] = "Checking instantiation parameters."
-                RO_ns_params = self.ns_params_2_RO(ns_params, nsd, db_vnfds_ref, n2vc_key_list)
+                RO_ns_params = self._ns_params_2_RO(ns_params, nsd, db_vnfds_ref, db_vnfrs, n2vc_key_list)
                 stage[2] = "Deploying ns at VIM."
                 db_nsr_update["detailed-status"] = " ".join(stage)
                 self.update_db_2("nsrs", nsr_id, db_nsr_update)
                 stage[2] = "Deploying ns at VIM."
                 db_nsr_update["detailed-status"] = " ".join(stage)
                 self.update_db_2("nsrs", nsr_id, db_nsr_update)
@@ -1460,28 +1480,40 @@ class NsLcm(LcmBase):
             self.logger.warn('Error writing configuration status={}, ns={}, vca_index={}: {}'
                              .format(status, nsr_id, vca_index, e))
 
             self.logger.warn('Error writing configuration status={}, ns={}, vca_index={}: {}'
                              .format(status, nsr_id, vca_index, e))
 
-    async def do_placement(self, logging_text, db_nslcmop, db_vnfrs):
+    async def _do_placement(self, logging_text, db_nslcmop, db_vnfrs):
+        """
+        Check and computes the placement, (vim account where to deploy). If it is decided by an external tool, it
+        sends the request via kafka and wait until the result is wrote at database (nslcmops _admin.plca).
+        Database is used because the result can be obtained from a different LCM worker in case of HA.
+        :param logging_text: contains the prefix for logging, with the ns and nslcmop identifiers
+        :param db_nslcmop: database content of nslcmop
+        :param db_vnfrs: database content of vnfrs, indexed by member-vnf-index.
+        :return: None. Modifies database vnfrs and parameter db_vnfr with the computed 'vim-account-id'
+        """
+        nslcmop_id = db_nslcmop['_id']
         placement_engine = deep_get(db_nslcmop, ('operationParams', 'placement-engine'))
         if placement_engine == "PLA":
         placement_engine = deep_get(db_nslcmop, ('operationParams', 'placement-engine'))
         if placement_engine == "PLA":
-            self.logger.debug(logging_text + "Invoke placement optimization for nslcmopId={}".format(db_nslcmop['id']))
-            await self.msg.aiowrite("pla", "get_placement", {'nslcmopId': db_nslcmop['_id']}, loop=self.loop)
+            self.logger.debug(logging_text + "Invoke and wait for placement optimization")
+            await self.msg.aiowrite("pla", "get_placement", {'nslcmopId': nslcmop_id}, loop=self.loop)
             db_poll_interval = 5
             db_poll_interval = 5
-            wait = db_poll_interval * 4
+            wait = db_poll_interval * 10
             pla_result = None
             while not pla_result and wait >= 0:
                 await asyncio.sleep(db_poll_interval)
                 wait -= db_poll_interval
             pla_result = None
             while not pla_result and wait >= 0:
                 await asyncio.sleep(db_poll_interval)
                 wait -= db_poll_interval
-                db_nslcmop = self.db.get_one("nslcmops", {"_id": db_nslcmop["_id"]})
+                db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})
                 pla_result = deep_get(db_nslcmop, ('_admin', 'pla'))
 
             if not pla_result:
                 pla_result = deep_get(db_nslcmop, ('_admin', 'pla'))
 
             if not pla_result:
-                raise LcmException("Placement timeout for nslcmopId={}".format(db_nslcmop['id']))
+                raise LcmException("Placement timeout for nslcmopId={}".format(nslcmop_id))
 
             for pla_vnf in pla_result['vnf']:
                 vnfr = db_vnfrs.get(pla_vnf['member-vnf-index'])
                 if not pla_vnf.get('vimAccountId') or not vnfr:
                     continue
                 self.db.set_one("vnfrs", {"_id": vnfr["_id"]}, {"vim-account-id": pla_vnf['vimAccountId']})
 
             for pla_vnf in pla_result['vnf']:
                 vnfr = db_vnfrs.get(pla_vnf['member-vnf-index'])
                 if not pla_vnf.get('vimAccountId') or not vnfr:
                     continue
                 self.db.set_one("vnfrs", {"_id": vnfr["_id"]}, {"vim-account-id": pla_vnf['vimAccountId']})
+                # Modifies db_vnfrs
+                vnfr["vim-account-id"] = pla_vnf['vimAccountId']
         return
 
     def update_nsrs_with_pla_result(self, params):
         return
 
     def update_nsrs_with_pla_result(self, params):
@@ -2072,6 +2104,25 @@ class NsLcm(LcmBase):
             self.logger.warn(logging_text + ' ERROR adding relations: {}'.format(e))
             return False
 
             self.logger.warn(logging_text + ' ERROR adding relations: {}'.format(e))
             return False
 
+    def _write_db_callback(self, task, item, _id, on_done=None, on_exc=None):
+        """
+        callback for kdu install intended to store the returned kdu_instance at database
+        :return: None
+        """
+        db_update = {}
+        try:
+            result = task.result()
+            if on_done:
+                db_update[on_done] = str(result)
+        except Exception as e:
+            if on_exc:
+                db_update[on_exc] = str(e)
+        if db_update:
+            try:
+                self.update_db_2(item, _id, db_update)
+            except Exception:
+                pass
+
     async def deploy_kdus(self, logging_text, nsr_id, nslcmop_id, db_vnfrs, db_vnfds, task_instantiation_info):
         # Launch kdus if present in the descriptor
 
     async def deploy_kdus(self, logging_text, nsr_id, nslcmop_id, db_vnfrs, db_vnfds, task_instantiation_info):
         # Launch kdus if present in the descriptor
 
@@ -2104,6 +2155,7 @@ class NsLcm(LcmBase):
                 for kdur in get_iterable(vnfr_data, "kdur"):
                     desc_params = self._format_additional_params(kdur.get("additionalParams"))
                     vnfd_id = vnfr_data.get('vnfd-id')
                 for kdur in get_iterable(vnfr_data, "kdur"):
                     desc_params = self._format_additional_params(kdur.get("additionalParams"))
                     vnfd_id = vnfr_data.get('vnfd-id')
+                    namespace = kdur.get("k8s-namespace")
                     if kdur.get("helm-chart"):
                         kdumodel = kdur["helm-chart"]
                         k8sclustertype = "helm-chart"
                     if kdur.get("helm-chart"):
                         kdumodel = kdur["helm-chart"]
                         k8sclustertype = "helm-chart"
@@ -2153,20 +2205,25 @@ class NsLcm(LcmBase):
                                         "k8scluster-type": k8sclustertype,
                                         "member-vnf-index": vnfr_data["member-vnf-index-ref"],
                                         "kdu-name": kdur["kdu-name"],
                                         "k8scluster-type": k8sclustertype,
                                         "member-vnf-index": vnfr_data["member-vnf-index-ref"],
                                         "kdu-name": kdur["kdu-name"],
-                                        "kdu-model": kdumodel}
-                    db_nsr_update["_admin.deployed.K8s.{}".format(index)] = k8s_instace_info
+                                        "kdu-model": kdumodel,
+                                        "namespace": namespace}
+                    db_path = "_admin.deployed.K8s.{}".format(index)
+                    db_nsr_update[db_path] = k8s_instace_info
                     self.update_db_2("nsrs", nsr_id, db_nsr_update)
 
                     db_dict = {"collection": "nsrs",
                                "filter": {"_id": nsr_id},
                     self.update_db_2("nsrs", nsr_id, db_nsr_update)
 
                     db_dict = {"collection": "nsrs",
                                "filter": {"_id": nsr_id},
-                               "path": "_admin.deployed.K8s.{}".format(index)}
+                               "path": db_path}
 
                     task = asyncio.ensure_future(
                         self.k8scluster_map[k8sclustertype].install(cluster_uuid=cluster_uuid, kdu_model=kdumodel,
                                                                     atomic=True, params=desc_params,
                                                                     db_dict=db_dict, timeout=600,
 
                     task = asyncio.ensure_future(
                         self.k8scluster_map[k8sclustertype].install(cluster_uuid=cluster_uuid, kdu_model=kdumodel,
                                                                     atomic=True, params=desc_params,
                                                                     db_dict=db_dict, timeout=600,
-                                                                    kdu_name=kdur["kdu-name"]))
+                                                                    kdu_name=kdur["kdu-name"], namespace=namespace))
 
 
+                    task.add_done_callback(partial(self._write_db_callback, item="nsrs", _id=nsr_id,
+                                                   on_done=db_path + ".kdu-instance",
+                                                   on_exc=db_path + ".detailed-status"))
                     self.lcm_tasks.register("ns", nsr_id, nslcmop_id, "instantiate_KDU-{}".format(index), task)
                     task_instantiation_info[task] = "Deploying KDU {}".format(kdur["kdu-name"])
 
                     self.lcm_tasks.register("ns", nsr_id, nslcmop_id, "instantiate_KDU-{}".format(index), task)
                     task_instantiation_info[task] = "Deploying KDU {}".format(kdur["kdu-name"])
 
@@ -2533,7 +2590,10 @@ class NsLcm(LcmBase):
     async def _delete_all_N2VC(self, db_nsr: dict):
         self._write_all_config_status(db_nsr=db_nsr, status='TERMINATING')
         namespace = "." + db_nsr["_id"]
     async def _delete_all_N2VC(self, db_nsr: dict):
         self._write_all_config_status(db_nsr=db_nsr, status='TERMINATING')
         namespace = "." + db_nsr["_id"]
-        await self.n2vc.delete_namespace(namespace=namespace, total_timeout=self.timeout_charm_delete)
+        try:
+            await self.n2vc.delete_namespace(namespace=namespace, total_timeout=self.timeout_charm_delete)
+        except N2VCNotFound:  # already deleted. Skip
+            pass
         self._write_all_config_status(db_nsr=db_nsr, status='DELETED')
 
     async def _terminate_RO(self, logging_text, nsr_deployed, nsr_id, nslcmop_id, stage):
         self._write_all_config_status(db_nsr=db_nsr, status='DELETED')
 
     async def _terminate_RO(self, logging_text, nsr_deployed, nsr_id, nslcmop_id, stage):
@@ -2927,7 +2987,8 @@ class NsLcm(LcmBase):
                     new_error = created_tasks_info[task] + ": {}".format(exc)
                     error_list.append(created_tasks_info[task])
                     error_detail_list.append(new_error)
                     new_error = created_tasks_info[task] + ": {}".format(exc)
                     error_list.append(created_tasks_info[task])
                     error_detail_list.append(new_error)
-                    if isinstance(exc, (str, DbException, N2VCException, ROclient.ROClientException, LcmException)):
+                    if isinstance(exc, (str, DbException, N2VCException, ROclient.ROClientException, LcmException,
+                                        K8sException)):
                         self.logger.error(logging_text + new_error)
                     else:
                         exc_traceback = "".join(traceback.format_exception(None, exc, exc.__traceback__))
                         self.logger.error(logging_text + new_error)
                     else:
                         exc_traceback = "".join(traceback.format_exception(None, exc, exc.__traceback__))
@@ -3151,8 +3212,11 @@ class NsLcm(LcmBase):
             else:
                 desc_params = self._format_additional_params(db_nsr.get("additionalParamsForNs"))
 
             else:
                 desc_params = self._format_additional_params(db_nsr.get("additionalParamsForNs"))
 
+            if kdu_name:
+                kdu_action = True if not deep_get(kdu, ("kdu-configuration", "juju")) else False
+
             # TODO check if ns is in a proper status
             # TODO check if ns is in a proper status
-            if kdu_name and primitive in ("upgrade", "rollback", "status"):
+            if kdu_name and (primitive in ("upgrade", "rollback", "status") or kdu_action):
                 # kdur and desc_params already set from before
                 if primitive_params:
                     desc_params.update(primitive_params)
                 # kdur and desc_params already set from before
                 if primitive_params:
                     desc_params.update(primitive_params)
@@ -3204,13 +3268,24 @@ class NsLcm(LcmBase):
                             cluster_uuid=kdu.get("k8scluster-uuid"),
                             kdu_instance=kdu.get("kdu-instance")),
                         timeout=timeout_ns_action)
                             cluster_uuid=kdu.get("k8scluster-uuid"),
                             kdu_instance=kdu.get("kdu-instance")),
                         timeout=timeout_ns_action)
+                else:
+                    kdu_instance = kdu.get("kdu-instance") or "{}-{}".format(kdu["kdu-name"], nsr_id)
+                    params = self._map_primitive_params(config_primitive_desc, primitive_params, desc_params)
+
+                    detailed_status = await asyncio.wait_for(
+                        self.k8scluster_map[kdu["k8scluster-type"]].exec_primitive(
+                            cluster_uuid=kdu.get("k8scluster-uuid"),
+                            kdu_instance=kdu_instance,
+                            primitive_name=primitive,
+                            params=params, db_dict=db_dict,
+                            timeout=timeout_ns_action),
+                        timeout=timeout_ns_action)
 
                 if detailed_status:
                     nslcmop_operation_state = 'COMPLETED'
                 else:
                     detailed_status = ''
                     nslcmop_operation_state = 'FAILED'
 
                 if detailed_status:
                     nslcmop_operation_state = 'COMPLETED'
                 else:
                     detailed_status = ''
                     nslcmop_operation_state = 'FAILED'
-
             else:
                 nslcmop_operation_state, detailed_status = await self._ns_execute_primitive(
                     self._look_for_deployed_vca(nsr_deployed["VCA"],
             else:
                 nslcmop_operation_state, detailed_status = await self._ns_execute_primitive(
                     self._look_for_deployed_vca(nsr_deployed["VCA"],
@@ -3227,7 +3302,7 @@ class NsLcm(LcmBase):
                                                                                    detailed_status))
             return  # database update is called inside finally
 
                                                                                    detailed_status))
             return  # database update is called inside finally
 
-        except (DbException, LcmException, N2VCException) as e:
+        except (DbException, LcmException, N2VCException, K8sException) as e:
             self.logger.error(logging_text + "Exit Exception {}".format(e))
             exc = e
         except asyncio.CancelledError:
             self.logger.error(logging_text + "Exit Exception {}".format(e))
             exc = e
         except asyncio.CancelledError: