Feature 10954 to automatically select WIMs for inter-datacenter networks
[osm/LCM.git] / osm_lcm / ns.py
index 2e9c1bc..e3655df 100644 (file)
@@ -102,6 +102,11 @@ from osm_common.fsbase import FsException
 
 from osm_lcm.data_utils.database.database import Database
 from osm_lcm.data_utils.filesystem.filesystem import Filesystem
+from osm_lcm.data_utils.wim import (
+    get_sdn_ports,
+    get_target_wim_attrs,
+    select_feasible_wim_account,
+)
 
 from n2vc.n2vc_juju_conn import N2VCJujuConnector
 from n2vc.exceptions import N2VCException, N2VCNotFound, K8sException
@@ -134,6 +139,7 @@ class NsLcm(LcmBase):
     )  # timeout for some progress in a primitive execution
     timeout_migrate = 1800  # default global timeout for migrating vnfs
     timeout_operate = 1800  # default global timeout for migrating vnfs
+    timeout_verticalscale = 1800   # default global timeout for Vertical Sclaing
     SUBOPERATION_STATUS_NOT_FOUND = -1
     SUBOPERATION_STATUS_NEW = -2
     SUBOPERATION_STATUS_SKIP = -3
@@ -224,6 +230,8 @@ class NsLcm(LcmBase):
             "termination": self.RO.status,
             "migrate": self.RO.status,
             "healing": self.RO.recreate_status,
+            "verticalscale": self.RO.status,
+            "start_stop_rebuild": self.RO.status,
         }
 
     @staticmethod
@@ -421,7 +429,7 @@ class NsLcm(LcmBase):
     @staticmethod
     def _parse_cloud_init(cloud_init_text, additional_params, vnfd_id, vdu_id):
         try:
-            env = Environment(undefined=StrictUndefined)
+            env = Environment(undefined=StrictUndefined, autoescape=True)
             template = env.from_string(cloud_init_text)
             return template.render(additional_params or {})
         except UndefinedError as e:
@@ -858,9 +866,30 @@ class NsLcm(LcmBase):
                     target_vld["vim_info"][target_sdn]["sdn-ports"] = vld_params[
                         "provider-network"
                     ]["sdn-ports"]
-            if vld_params.get("wimAccountId"):
-                target_wim = "wim:{}".format(vld_params["wimAccountId"])
-                target_vld["vim_info"][target_wim] = {}
+
+            # check if WIM is needed; if needed, choose a feasible WIM able to connect VIMs
+            # if wim_account_id is specified in vld_params, validate if it is feasible.
+            wim_account_id, db_wim = select_feasible_wim_account(
+                db_nsr, db_vnfrs, target_vld, vld_params, self.logger
+            )
+
+            if wim_account_id:
+                # WIM is needed and a feasible one was found, populate WIM target and SDN ports
+                self.logger.info("WIM selected: {:s}".format(str(wim_account_id)))
+                # update vld_params with correct WIM account Id
+                vld_params["wimAccountId"] = wim_account_id
+
+                target_wim = "wim:{}".format(wim_account_id)
+                target_wim_attrs = get_target_wim_attrs(nsr_id, target_vld, vld_params)
+                sdn_ports = get_sdn_ports(vld_params, db_wim)
+                if len(sdn_ports) > 0:
+                    target_vld["vim_info"][target_wim] = target_wim_attrs
+                    target_vld["vim_info"][target_wim]["sdn-ports"] = sdn_ports
+
+                self.logger.debug(
+                    "Target VLD with WIM data: {:s}".format(str(target_vld))
+                )
+
             for param in ("vim-network-name", "vim-network-id"):
                 if vld_params.get(param):
                     if isinstance(vld_params[param], dict):
@@ -897,18 +926,55 @@ class NsLcm(LcmBase):
                             get_iterable(vdur, "interfaces"),
                             lambda iface: iface.get("ns-vld-id") == a_vld["name"],
                         )
+
+                        vld_params = find_in_list(
+                            get_iterable(ns_params, "vld"),
+                            lambda v_vld: v_vld["name"] in (a_vld["name"], a_vld["id"]),
+                        )
                         if target_vld:
+
                             if vnf_params.get("vimAccountId") not in a_vld.get(
                                 "vim_info", {}
                             ):
+                                target_vim_network_list = [
+                                    v for _, v in a_vld.get("vim_info").items()
+                                ]
+                                target_vim_network_name = next(
+                                    (
+                                        item.get("vim_network_name", "")
+                                        for item in target_vim_network_list
+                                    ),
+                                    "",
+                                )
+
                                 target["ns"]["vld"][a_index].get("vim_info").update(
                                     {
                                         "vim:{}".format(vnf_params["vimAccountId"]): {
-                                            "vim_network_name": ""
+                                            "vim_network_name": target_vim_network_name,
                                         }
                                     }
                                 )
 
+                                if vld_params:
+                                    for param in ("vim-network-name", "vim-network-id"):
+                                        if vld_params.get(param) and isinstance(
+                                            vld_params[param], dict
+                                        ):
+                                            for vim, vim_net in vld_params[
+                                                param
+                                            ].items():
+                                                other_target_vim = "vim:" + vim
+                                                populate_dict(
+                                                    target["ns"]["vld"][a_index].get(
+                                                        "vim_info"
+                                                    ),
+                                                    (
+                                                        other_target_vim,
+                                                        param.replace("-", "_"),
+                                                    ),
+                                                    vim_net,
+                                                )
+
         nslcmop_id = db_nslcmop["_id"]
         target = {
             "name": db_nsr["name"],
@@ -965,16 +1031,16 @@ class NsLcm(LcmBase):
             # check if this network needs SDN assist
             if vld.get("pci-interfaces"):
                 db_vim = get_vim_account(ns_params["vimAccountId"])
-                sdnc_id = db_vim["config"].get("sdn-controller")
-                if sdnc_id:
-                    sdn_vld = "nsrs:{}:vld.{}".format(nsr_id, vld["id"])
-                    target_sdn = "sdn:{}".format(sdnc_id)
-                    target_vld["vim_info"][target_sdn] = {
-                        "sdn": True,
-                        "target_vim": target_vim,
-                        "vlds": [sdn_vld],
-                        "type": vld.get("type"),
-                    }
+                if vim_config := db_vim.get("config"):
+                    if sdnc_id := vim_config.get("sdn-controller"):
+                        sdn_vld = "nsrs:{}:vld.{}".format(nsr_id, vld["id"])
+                        target_sdn = "sdn:{}".format(sdnc_id)
+                        target_vld["vim_info"][target_sdn] = {
+                            "sdn": True,
+                            "target_vim": target_vim,
+                            "vlds": [sdn_vld],
+                            "type": vld.get("type"),
+                        }
 
             nsd_vnf_profiles = get_vnf_profiles(nsd)
             for nsd_vnf_profile in nsd_vnf_profiles:
@@ -2844,9 +2910,9 @@ class NsLcm(LcmBase):
             self.logger.debug(logging_text + "Exit")
             self.lcm_tasks.remove("ns", nsr_id, nslcmop_id, "ns_instantiate")
 
-    def _get_vnfd(self, vnfd_id: str, cached_vnfds: Dict[str, Any]):
+    def _get_vnfd(self, vnfd_id: str, projects_read: str, cached_vnfds: Dict[str, Any]):
         if vnfd_id not in cached_vnfds:
-            cached_vnfds[vnfd_id] = self.db.get_one("vnfds", {"id": vnfd_id})
+            cached_vnfds[vnfd_id] = self.db.get_one("vnfds", {"id": vnfd_id, "_admin.projects_read": projects_read})
         return cached_vnfds[vnfd_id]
 
     def _get_vnfr(self, nsr_id: str, vnf_profile_id: str, cached_vnfrs: Dict[str, Any]):
@@ -2888,7 +2954,8 @@ class NsLcm(LcmBase):
         ]:
             vnf_profile = get_vnf_profile(nsd, ee_relation_data["vnf-profile-id"])
             vnfd_id = vnf_profile["vnfd-id"]
-            db_vnfd = self._get_vnfd(vnfd_id, cached_vnfds)
+            project = nsd["_admin"]["projects_read"][0]
+            db_vnfd = self._get_vnfd(vnfd_id, project, cached_vnfds)
             entity_id = (
                 vnfd_id
                 if ee_relation_level == EELevel.VNF
@@ -2961,7 +3028,8 @@ class NsLcm(LcmBase):
         vnf_profile = get_vnf_profile(nsd, vca.vnf_profile_id)
         vnf_profile_id = vnf_profile["id"]
         vnfd_id = vnf_profile["vnfd-id"]
-        db_vnfd = self._get_vnfd(vnfd_id, cached_vnfds)
+        project = nsd["_admin"]["projects_read"][0]
+        db_vnfd = self._get_vnfd(vnfd_id, project, cached_vnfds)
         db_vnf_relations = get_relation_list(db_vnfd, vnfd_id)
         for r in db_vnf_relations:
             provider_dict = None
@@ -3016,7 +3084,8 @@ class NsLcm(LcmBase):
             vnf_profiles,
             lambda vnf_profile: vnf_profile["id"] == ee_relation.vnf_profile_id,
         )["vnfd-id"]
-        db_vnfd = self._get_vnfd(vnfd_id, cached_vnfds)
+        project = nsd["_admin"]["projects_read"][0]
+        db_vnfd = self._get_vnfd(vnfd_id, project, cached_vnfds)
         kdu_resource_profile = get_kdu_resource_profile(
             db_vnfd, ee_relation.kdu_resource_profile_id
         )
@@ -5196,12 +5265,17 @@ class NsLcm(LcmBase):
                         parts = kdu_model.split(sep=":")
                         if len(parts) == 2:
                             kdu_model = parts[0]
+                    if desc_params.get("kdu_atomic_upgrade"):
+                        atomic_upgrade = desc_params.get("kdu_atomic_upgrade").lower() in ("yes", "true", "1")
+                        del desc_params["kdu_atomic_upgrade"]
+                    else:
+                        atomic_upgrade = True
 
                     detailed_status = await asyncio.wait_for(
                         self.k8scluster_map[kdu["k8scluster-type"]].upgrade(
                             cluster_uuid=kdu.get("k8scluster-uuid"),
                             kdu_instance=kdu.get("kdu-instance"),
-                            atomic=True,
+                            atomic=atomic_upgrade,
                             kdu_model=kdu_model,
                             params=desc_params,
                             db_dict=db_dict,
@@ -7236,13 +7310,17 @@ class NsLcm(LcmBase):
             db_vnfr = self.db.get_one("vnfrs", {"_id": vnf_id})
             vim_account_id = db_vnfr.get("vim-account-id")
             vim_info_key = "vim:" + vim_account_id
+            vdu_id = additional_param["vdu_id"]
+            vdurs = [item for item in db_vnfr["vdur"] if item["vdu-id-ref"] == vdu_id]
             vdur = find_in_list(
-                db_vnfr["vdur"], lambda vdu: vdu["count-index"] == additional_param["count-index"]
+                vdurs, lambda vdu: vdu["count-index"] == additional_param["count-index"]
                 )
             if vdur:
                 vdu_vim_name = vdur["name"]
                 vim_vm_id = vdur["vim_info"][vim_info_key]["vim_id"]
                 target_vim, _ = next(k_v for k_v in vdur["vim_info"].items())
+            else:
+                raise LcmException("Target vdu is not found")
             self.logger.info("vdu_vim_name >> {} ".format(vdu_vim_name))
             # wait for any previous tasks in process
             stage[1] = "Waiting for previous operations to terminate"
@@ -7281,7 +7359,8 @@ class NsLcm(LcmBase):
             self.logger.info("response from RO: {}".format(result_dict))
             action_id = result_dict["action_id"]
             await self._wait_ng_ro(
-                nsr_id, action_id, nslcmop_id, start_deploy, self.timeout_operate
+                nsr_id, action_id, nslcmop_id, start_deploy,
+                self.timeout_operate, None, "start_stop_rebuild",
             )
             return "COMPLETED", "Done"
         except (ROclient.ROClientException, DbException, LcmException) as e:
@@ -7473,17 +7552,22 @@ class NsLcm(LcmBase):
             self.update_db_2("nsrs", nsr_id, db_nsr_update)
 
             step = "Sending heal order to VIM"
-            task_ro = asyncio.ensure_future(
-                self.heal_RO(
+            #task_ro = asyncio.ensure_future(
+            #    self.heal_RO(
+            #        logging_text=logging_text,
+            #        nsr_id=nsr_id,
+            #        db_nslcmop=db_nslcmop,
+            #        stage=stage,
+            #    )
+            #)
+            #self.lcm_tasks.register("ns", nsr_id, nslcmop_id, "heal_RO", task_ro)
+            #tasks_dict_info[task_ro] = "Healing at VIM"
+            await self.heal_RO(
                     logging_text=logging_text,
                     nsr_id=nsr_id,
                     db_nslcmop=db_nslcmop,
                     stage=stage,
                 )
-            )
-            self.lcm_tasks.register("ns", nsr_id, nslcmop_id, "heal_RO", task_ro)
-            tasks_dict_info[task_ro] = "Healing at VIM"
-
             # VCA tasks
             # read from db: nsd
             stage[1] = "Getting nsd={} from db.".format(db_nsr["nsd-id"])
@@ -7517,7 +7601,17 @@ class NsLcm(LcmBase):
                     member_vnf_index = db_vnfr.get("member-vnf-index-ref")
 
                     # Check each target VDU and deploy N2VC
-                    for target_vdu in target_vnf["additionalParams"].get("vdu", None):
+                    target_vdu_list = target_vnf.get("additionalParams", {}).get("vdu", [])
+                    if not target_vdu_list:
+                        # Codigo nuevo para crear diccionario
+                        target_vdu_list = []
+                        for existing_vdu in db_vnfr.get("vdur"):
+                            vdu_name = existing_vdu.get("vdu-name", None)
+                            vdu_index = existing_vdu.get("count-index", 0)
+                            vdu_run_day1 = target_vnf.get("additionalParams", {}).get("run-day1", False)
+                            vdu_to_be_healed = {"vdu-id": vdu_name, "count-index": vdu_index, "run-day1": vdu_run_day1}
+                            target_vdu_list.append(vdu_to_be_healed)
+                    for target_vdu in target_vdu_list:
                         deploy_params_vdu = target_vdu
                         # Set run-day1 vnf level value if not vdu level value exists
                         if not deploy_params_vdu.get("run-day1") and target_vnf["additionalParams"].get("run-day1"):
@@ -8337,3 +8431,99 @@ class NsLcm(LcmBase):
             await asyncio.sleep(15, loop=self.loop)
         else:  # timeout_ns_deploy
             raise NgRoException("Timeout waiting ns to deploy")
+
+    async def vertical_scale(self, nsr_id, nslcmop_id):
+        """
+        Vertical Scale the VDUs in a NS
+
+        :param: nsr_id: NS Instance ID
+        :param: nslcmop_id: nslcmop ID of migrate
+
+        """
+        # Try to lock HA task here
+        task_is_locked_by_me = self.lcm_tasks.lock_HA("ns", "nslcmops", nslcmop_id)
+        if not task_is_locked_by_me:
+            return
+        logging_text = "Task ns={} vertical scale ".format(nsr_id)
+        self.logger.debug(logging_text + "Enter")
+        # get all needed from database
+        db_nslcmop = None
+        db_nslcmop_update = {}
+        nslcmop_operation_state = None
+        db_nsr_update = {}
+        target = {}
+        exc = None
+        # in case of error, indicates what part of scale was failed to put nsr at error status
+        start_deploy = time()
+
+        try:
+            # wait for any previous tasks in process
+            step = "Waiting for previous operations to terminate"
+            await self.lcm_tasks.waitfor_related_HA('ns', 'nslcmops', nslcmop_id)
+
+            self._write_ns_status(
+                nsr_id=nsr_id,
+                ns_state=None,
+                current_operation="VerticalScale",
+                current_operation_id=nslcmop_id
+            )
+            step = "Getting nslcmop from database"
+            self.logger.debug(step + " after having waited for previous tasks to be completed")
+            db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})
+            operationParams = db_nslcmop.get("operationParams")
+            target = {}
+            target.update(operationParams)
+            desc = await self.RO.vertical_scale(nsr_id, target)
+            self.logger.debug("RO return > {}".format(desc))
+            action_id = desc["action_id"]
+            await self._wait_ng_ro(
+                nsr_id, action_id, nslcmop_id, start_deploy, self.timeout_verticalscale,
+                operation="verticalscale"
+            )
+        except (ROclient.ROClientException, DbException, LcmException) as e:
+            self.logger.error("Exit Exception {}".format(e))
+            exc = e
+        except asyncio.CancelledError:
+            self.logger.error("Cancelled Exception while '{}'".format(step))
+            exc = "Operation was cancelled"
+        except Exception as e:
+            exc = traceback.format_exc()
+            self.logger.critical("Exit Exception {} {}".format(type(e).__name__, e), exc_info=True)
+        finally:
+            self._write_ns_status(
+                nsr_id=nsr_id,
+                ns_state=None,
+                current_operation="IDLE",
+                current_operation_id=None,
+            )
+            if exc:
+                db_nslcmop_update[
+                    "detailed-status"
+                ] = "FAILED {}: {}".format(step, exc)
+                nslcmop_operation_state = "FAILED"
+            else:
+                nslcmop_operation_state = "COMPLETED"
+                db_nslcmop_update["detailed-status"] = "Done"
+                db_nsr_update["detailed-status"] = "Done"
+
+            self._write_op_status(
+                op_id=nslcmop_id,
+                stage="",
+                error_message="",
+                operation_state=nslcmop_operation_state,
+                other_update=db_nslcmop_update,
+            )
+            if nslcmop_operation_state:
+                try:
+                    msg = {
+                        "nsr_id": nsr_id,
+                        "nslcmop_id": nslcmop_id,
+                        "operationState": nslcmop_operation_state,
+                    }
+                    await self.msg.aiowrite("ns", "verticalscaled", msg, loop=self.loop)
+                except Exception as e:
+                    self.logger.error(
+                        logging_text + "kafka_write notification Exception {}".format(e)
+                    )
+            self.logger.debug(logging_text + "Exit")
+            self.lcm_tasks.remove("ns", nsr_id, nslcmop_id, "ns_verticalscale")