X-Git-Url: https://osm.etsi.org/gitweb/?a=blobdiff_plain;f=osm_lcm%2Fns.py;h=0f92f9d91bc9ed2fcecc90ae5126d09591817608;hb=a8bbe67b84f4ef37107d882f0e7fab6ff6a1fc7e;hp=5b06c181b8679fccb0204f7d6b3112d829976376;hpb=e5d05979d8a2b6c3f1fca316053a997790f3d536;p=osm%2FLCM.git

diff --git a/osm_lcm/ns.py b/osm_lcm/ns.py
index 5b06c18..0f92f9d 100644
--- a/osm_lcm/ns.py
+++ b/osm_lcm/ns.py
@@ -28,6 +28,7 @@ from osm_lcm import ROclient
 from osm_lcm.ng_ro import NgRoClient, NgRoException
 from osm_lcm.lcm_utils import LcmException, LcmExceptionNoMgmtIP, LcmBase, deep_get, get_iterable, populate_dict
 from n2vc.k8s_helm_conn import K8sHelmConnector
+from n2vc.k8s_helm3_conn import K8sHelm3Connector
 from n2vc.k8s_juju_conn import K8sJujuConnector
 from osm_common.dbbase import DbException
@@ -48,35 +49,6 @@ from random import randint
 
 __author__ = "Alfonso Tierno <alfonso.tiernosepulveda@telefonica.com>"
 
-
-class N2VCJujuConnectorLCM(N2VCJujuConnector):
-
-    async def create_execution_environment(self, namespace: str, db_dict: dict, reuse_ee_id: str = None,
-                                           progress_timeout: float = None, total_timeout: float = None,
-                                           config: dict = None, artifact_path: str = None,
-                                           vca_type: str = None) -> (str, dict):
-        # admit two new parameters, artifact_path and vca_type
-        if vca_type == "k8s_proxy_charm":
-            ee_id = await self.install_k8s_proxy_charm(
-                charm_name=artifact_path[artifact_path.rfind("/") + 1:],
-                namespace=namespace,
-                artifact_path=artifact_path,
-                db_dict=db_dict)
-            return ee_id, None
-        else:
-            return await super().create_execution_environment(
-                namespace=namespace, db_dict=db_dict, reuse_ee_id=reuse_ee_id,
-                progress_timeout=progress_timeout, total_timeout=total_timeout)
-
-    async def install_configuration_sw(self, ee_id: str, artifact_path: str, db_dict: dict,
-                                       progress_timeout: float = None, total_timeout: float = None,
-                                       config: dict = None, num_units: int = 1, vca_type: str = "lxc_proxy_charm"):
-        if vca_type == "k8s_proxy_charm":
-            return
-        return await super().install_configuration_sw(
-            ee_id=ee_id, artifact_path=artifact_path, db_dict=db_dict, progress_timeout=progress_timeout,
-            total_timeout=total_timeout, config=config, num_units=num_units)
-
-
 class NsLcm(LcmBase):
     timeout_vca_on_error = 5 * 60  # Time for charm from first time at blocked,error status to mark as failed
     timeout_ns_deploy = 2 * 3600  # default global timeout for deployment a ns
@@ -111,7 +83,7 @@ class NsLcm(LcmBase):
         self.vca_config = config["VCA"].copy()
 
         # create N2VC connector
-        self.n2vc = N2VCJujuConnectorLCM(
+        self.n2vc = N2VCJujuConnector(
             db=self.db,
             fs=self.fs,
             log=self.logger,
@@ -133,7 +105,7 @@ class NsLcm(LcmBase):
             on_update_db=self._on_update_n2vc_db
         )
 
-        self.k8sclusterhelm = K8sHelmConnector(
+        self.k8sclusterhelm2 = K8sHelmConnector(
             kubectl_command=self.vca_config.get("kubectlpath"),
             helm_command=self.vca_config.get("helmpath"),
             fs=self.fs,
@@ -142,18 +114,30 @@ class NsLcm(LcmBase):
             on_update_db=None,
         )
 
+        self.k8sclusterhelm3 = K8sHelm3Connector(
+            kubectl_command=self.vca_config.get("kubectlpath"),
+            helm_command=self.vca_config.get("helm3path"),
+            fs=self.fs,
+            log=self.logger,
+            db=self.db,
+            on_update_db=None,
+        )
+
         self.k8sclusterjuju = K8sJujuConnector(
             kubectl_command=self.vca_config.get("kubectlpath"),
            juju_command=self.vca_config.get("jujupath"),
             fs=self.fs,
             log=self.logger,
             db=self.db,
+            loop=self.loop,
             on_update_db=None,
+            vca_config=self.vca_config,
         )
 
         self.k8scluster_map = {
-            "helm-chart": self.k8sclusterhelm,
-            "chart": self.k8sclusterhelm,
+            "helm-chart": self.k8sclusterhelm2,
+            "helm-chart-v3": self.k8sclusterhelm3,
+            "chart": self.k8sclusterhelm3,
             "juju-bundle": self.k8sclusterjuju,
             "juju": self.k8sclusterjuju,
         }
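The map above now routes the legacy "helm-chart"/"chart" keys to different connectors: v2 stays on K8sHelmConnector, while the default moves to Helm 3. As a minimal illustration (not part of the diff) of how a caller can resolve a connector through this map, assuming a kdur fragment with an optional "helm-version" field (the real selection logic is in deploy_kdus() further down):

    # hypothetical helper, mirroring the dispatch added in deploy_kdus()
    def _pick_k8s_connector(self, kdur: dict):
        cluster_type = "helm-chart-v3"          # the new default is Helm 3
        if kdur.get("helm-version") == "v2":    # a descriptor may pin Helm 2
            cluster_type = "helm-chart"
        return self.k8scluster_map[cluster_type]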
@@ -162,7 +146,8 @@ class NsLcm(LcmBase):
             "lxc_proxy_charm": self.n2vc,
             "native_charm": self.n2vc,
             "k8s_proxy_charm": self.n2vc,
-            "helm": self.conn_helm_ee
+            "helm": self.conn_helm_ee,
+            "helm-v3": self.conn_helm_ee
         }
 
         self.prometheus = prometheus
@@ -173,6 +158,26 @@ class NsLcm(LcmBase):
         else:
             self.RO = ROclient.ROClient(self.loop, **self.ro_config)
 
+    @staticmethod
+    def increment_ip_mac(ip_mac, vm_index=1):
+        if not isinstance(ip_mac, str):
+            return ip_mac
+        try:
+            # try with ipv4 look for last dot
+            i = ip_mac.rfind(".")
+            if i > 0:
+                i += 1
+                return "{}{}".format(ip_mac[:i], int(ip_mac[i:]) + vm_index)
+            # try with ipv6 or mac look for last colon. Operate in hex
+            i = ip_mac.rfind(":")
+            if i > 0:
+                i += 1
+                # format in hex, len can be 2 for mac or 4 for ipv6
+                return ("{}{:0" + str(len(ip_mac) - i) + "x}").format(ip_mac[:i], int(ip_mac[i:], 16) + vm_index)
+        except Exception:
+            pass
+        return None
+
     def _on_update_ro_db(self, nsrs_id, ro_descriptor):
 
         # self.logger.debug('_on_update_ro_db(nsrs_id={}'.format(nsrs_id))
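For reference, a quick sketch (illustrative values only) of what the new increment_ip_mac() helper produces: the last numeric group is incremented by vm_index, in decimal for IPv4 and in width-preserving hex for MAC/IPv6, with None returned for anything unparseable:

    NsLcm.increment_ip_mac("192.168.1.10", vm_index=2)       # -> "192.168.1.12"
    NsLcm.increment_ip_mac("52:54:00:4c:1a:0f", vm_index=1)  # -> "52:54:00:4c:1a:10"
    NsLcm.increment_ip_mac("2001:db8::0010", vm_index=1)     # -> "2001:db8::0011"
    NsLcm.increment_ip_mac("not-an-address")                 # -> None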
@@ -368,6 +373,24 @@ class NsLcm(LcmBase):
                 vdu.pop("cloud-init", None)
         return vnfd_RO
 
+    @staticmethod
+    def ip_profile_2_RO(ip_profile):
+        RO_ip_profile = deepcopy(ip_profile)
+        if "dns-server" in RO_ip_profile:
+            if isinstance(RO_ip_profile["dns-server"], list):
+                RO_ip_profile["dns-address"] = []
+                for ds in RO_ip_profile.pop("dns-server"):
+                    RO_ip_profile["dns-address"].append(ds['address'])
+            else:
+                RO_ip_profile["dns-address"] = RO_ip_profile.pop("dns-server")
+        if RO_ip_profile.get("ip-version") == "ipv4":
+            RO_ip_profile["ip-version"] = "IPv4"
+        if RO_ip_profile.get("ip-version") == "ipv6":
+            RO_ip_profile["ip-version"] = "IPv6"
+        if "dhcp-params" in RO_ip_profile:
+            RO_ip_profile["dhcp"] = RO_ip_profile.pop("dhcp-params")
+        return RO_ip_profile
+
     def _ns_params_2_RO(self, ns_params, nsd, vnfd_dict, db_vnfrs, n2vc_key_list):
         """
         Creates a RO ns descriptor from OSM ns_instantiate params
@@ -409,23 +432,6 @@ class NsLcm(LcmBase):
             else:
                 return wim_account
 
-        def ip_profile_2_RO(ip_profile):
-            RO_ip_profile = deepcopy((ip_profile))
-            if "dns-server" in RO_ip_profile:
-                if isinstance(RO_ip_profile["dns-server"], list):
-                    RO_ip_profile["dns-address"] = []
-                    for ds in RO_ip_profile.pop("dns-server"):
-                        RO_ip_profile["dns-address"].append(ds['address'])
-                else:
-                    RO_ip_profile["dns-address"] = RO_ip_profile.pop("dns-server")
-            if RO_ip_profile.get("ip-version") == "ipv4":
-                RO_ip_profile["ip-version"] = "IPv4"
-            if RO_ip_profile.get("ip-version") == "ipv6":
-                RO_ip_profile["ip-version"] = "IPv6"
-            if "dhcp-params" in RO_ip_profile:
-                RO_ip_profile["dhcp"] = RO_ip_profile.pop("dhcp-params")
-            return RO_ip_profile
-
         if not ns_params:
             return None
         RO_ns_params = {
@@ -548,7 +554,7 @@ class NsLcm(LcmBase):
             if internal_vld_params.get("ip-profile"):
                 populate_dict(RO_ns_params, ("vnfs", vnf_params["member-vnf-index"], "networks",
                                              internal_vld_params["name"], "ip-profile"),
-                              ip_profile_2_RO(internal_vld_params["ip-profile"]))
+                              self.ip_profile_2_RO(internal_vld_params["ip-profile"]))
             if internal_vld_params.get("provider-network"):
 
                 populate_dict(RO_ns_params, ("vnfs", vnf_params["member-vnf-index"], "networks",
@@ -585,7 +591,7 @@ class NsLcm(LcmBase):
         for vld_params in get_iterable(ns_params, "vld"):
             if "ip-profile" in vld_params:
                 populate_dict(RO_ns_params, ("networks", vld_params["name"], "ip-profile"),
                               self.ip_profile_2_RO(vld_params["ip-profile"]))
 
             if vld_params.get("provider-network"):
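The two call sites above now use the promoted static method. As an illustration (not from the diff; field values are hypothetical), the mapping it performs on an OSM ip-profile:

    ip_profile = {
        "ip-version": "ipv4",
        "subnet-address": "10.0.0.0/24",
        "dns-server": [{"address": "8.8.8.8"}],
        "dhcp-params": {"enabled": True},
    }
    NsLcm.ip_profile_2_RO(ip_profile)
    # -> {"ip-version": "IPv4", "subnet-address": "10.0.0.0/24",
    #     "dns-address": ["8.8.8.8"], "dhcp": {"enabled": True}}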
@@ -664,54 +670,53 @@ class NsLcm(LcmBase):
                                   cp_params["mac-address"])
         return RO_ns_params
 
-    def scale_vnfr(self, db_vnfr, vdu_create=None, vdu_delete=None):
-        # make a copy to do not change
-        vdu_create = copy(vdu_create)
-        vdu_delete = copy(vdu_delete)
-
-        vdurs = db_vnfr.get("vdur")
-        if vdurs is None:
-            vdurs = []
-        vdu_index = len(vdurs)
-        while vdu_index:
-            vdu_index -= 1
-            vdur = vdurs[vdu_index]
-            if vdur.get("pdu-type"):
-                continue
-            vdu_id_ref = vdur["vdu-id-ref"]
-            if vdu_create and vdu_create.get(vdu_id_ref):
-                vdur_copy = deepcopy(vdur)
-                vdur_copy["status"] = "BUILD"
-                vdur_copy["status-detailed"] = None
-                vdur_copy["ip_address"]: None
-                for iface in vdur_copy["interfaces"]:
-                    iface["ip-address"] = None
-                    iface["mac-address"] = None
-                    iface.pop("mgmt_vnf", None)  # only first vdu can be managment of vnf   # TODO ALF
-                for index in range(0, vdu_create[vdu_id_ref]):
+    def scale_vnfr(self, db_vnfr, vdu_create=None, vdu_delete=None, mark_delete=False):
+
+        db_vdu_push_list = []
+        db_update = {"_admin.modified": time()}
+        if vdu_create:
+            for vdu_id, vdu_count in vdu_create.items():
+                vdur = next((vdur for vdur in reversed(db_vnfr["vdur"]) if vdur["vdu-id-ref"] == vdu_id), None)
+                if not vdur:
+                    raise LcmException("Error scaling OUT VNFR for {}. There is not any existing vnfr. Scaled to 0?".
                                       format(vdu_id))
+
+                for count in range(vdu_count):
+                    vdur_copy = deepcopy(vdur)
+                    vdur_copy["status"] = "BUILD"
+                    vdur_copy["status-detailed"] = None
+                    vdur_copy["ip-address"] = None
                     vdur_copy["_id"] = str(uuid4())
-                    vdur_copy["count-index"] += 1
-                    vdurs.insert(vdu_index+1+index, vdur_copy)
-                    self.logger.debug("scale out, adding vdu={}".format(vdur_copy))
-                    vdur_copy = deepcopy(vdur_copy)
-
-                del vdu_create[vdu_id_ref]
-            if vdu_delete and vdu_delete.get(vdu_id_ref):
-                del vdurs[vdu_index]
-                vdu_delete[vdu_id_ref] -= 1
-                if not vdu_delete[vdu_id_ref]:
-                    del vdu_delete[vdu_id_ref]
-        # check all operations are done
-        if vdu_create or vdu_delete:
-            raise LcmException("Error scaling OUT VNFR for {}. There is not any existing vnfr. Scaled to 0?".format(
-                vdu_create))
+                    vdur_copy["count-index"] += count + 1
+                    vdur_copy["id"] = "{}-{}".format(vdur_copy["vdu-id-ref"], vdur_copy["count-index"])
+                    vdur_copy.pop("vim_info", None)
+                    for iface in vdur_copy["interfaces"]:
+                        if iface.get("fixed-ip"):
+                            iface["ip-address"] = self.increment_ip_mac(iface["ip-address"], count+1)
+                        else:
+                            iface.pop("ip-address", None)
+                        if iface.get("fixed-mac"):
+                            iface["mac-address"] = self.increment_ip_mac(iface["mac-address"], count+1)
+                        else:
+                            iface.pop("mac-address", None)
+                        iface.pop("mgmt_vnf", None)  # only first vdu can be management of vnf
+                    db_vdu_push_list.append(vdur_copy)
+                    # self.logger.debug("scale out, adding vdu={}".format(vdur_copy))
         if vdu_delete:
-            raise LcmException("Error scaling IN VNFR for {}. There is not any existing vnfr. Scaled to 0?".format(
-                vdu_delete))
-
-        vnfr_update = {"vdur": vdurs}
-        db_vnfr["vdur"] = vdurs
-        self.update_db_2("vnfrs", db_vnfr["_id"], vnfr_update)
+            for vdu_id, vdu_count in vdu_delete.items():
+                if mark_delete:
+                    indexes_to_delete = [iv[0] for iv in enumerate(db_vnfr["vdur"]) if iv[1]["vdu-id-ref"] == vdu_id]
+                    db_update.update({"vdur.{}.status".format(i): "DELETING" for i in indexes_to_delete[-vdu_count:]})
+                else:
+                    # it must be deleted one by one because common.db does not allow otherwise
+                    vdus_to_delete = [v for v in reversed(db_vnfr["vdur"]) if v["vdu-id-ref"] == vdu_id]
+                    for vdu in vdus_to_delete[:vdu_count]:
+                        self.db.set_one("vnfrs", {"_id": db_vnfr["_id"]}, None, pull={"vdur": {"_id": vdu["_id"]}})
+        db_push = {"vdur": db_vdu_push_list} if db_vdu_push_list else None
+        self.db.set_one("vnfrs", {"_id": db_vnfr["_id"]}, db_update, push_list=db_push)
+        # modify passed dictionary db_vnfr
+        db_vnfr_ = self.db.get_one("vnfrs", {"_id": db_vnfr["_id"]})
+        db_vnfr["vdur"] = db_vnfr_["vdur"]
 
     def ns_update_nsr(self, ns_update_nsr, db_nsr, nsr_desc_RO):
         """
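The rewritten scale_vnfr() leans on osm_common's MongoDB-style partial updates: new VDU records are appended with push_list, and scale-in victims are either flagged via a dotted-path set or removed one at a time with pull. A rough standalone sketch of the same update shapes, assuming an osm_common-like set_one() signature:

    # hypothetical illustration of the three update shapes used above
    db.set_one("vnfrs", {"_id": vnfr_id},
               update_dict={"_admin.modified": time(), "vdur.3.status": "DELETING"},
               push_list={"vdur": [new_vdur_a, new_vdur_b]})   # append several entries
    db.set_one("vnfrs", {"_id": vnfr_id}, None,
               pull={"vdur": {"_id": vdur_id}})                # remove one matching entry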
@@ -885,6 +890,38 @@ class NsLcm(LcmBase):
 
     async def _instantiate_ng_ro(self, logging_text, nsr_id, nsd, db_nsr, db_nslcmop, db_vnfrs, db_vnfds_ref,
                                  n2vc_key_list, stage, start_deploy, timeout_ns_deploy):
+
+        db_vims = {}
+
+        def get_vim_account(vim_account_id):
+            nonlocal db_vims
+            if vim_account_id in db_vims:
+                return db_vims[vim_account_id]
+            db_vim = self.db.get_one("vim_accounts", {"_id": vim_account_id})
+            db_vims[vim_account_id] = db_vim
+            return db_vim
+
+        # modify target_vld info with instantiation parameters
+        def parse_vld_instantiation_params(target_vim, target_vld, vld_params, target_sdn):
+            if vld_params.get("ip-profile"):
+                target_vld["vim_info"][target_vim]["ip_profile"] = vld_params["ip-profile"]
+            if vld_params.get("provider-network"):
+                target_vld["vim_info"][target_vim]["provider_network"] = vld_params["provider-network"]
+                if "sdn-ports" in vld_params["provider-network"] and target_sdn:
+                    target_vld["vim_info"][target_sdn]["sdn-ports"] = vld_params["provider-network"]["sdn-ports"]
+            if vld_params.get("wimAccountId"):
+                target_wim = "wim:{}".format(vld_params["wimAccountId"])
+                target_vld["vim_info"][target_wim] = {}
+            for param in ("vim-network-name", "vim-network-id"):
+                if vld_params.get(param):
+                    if isinstance(vld_params[param], dict):
+                        pass
+                        # for vim_account, vim_net in vld_params[param].items():
+                        #     TODO populate vim_info RO_vld_sites.append({
+                    else:  # isinstance str
+                        target_vld["vim_info"][target_vim][param.replace("-", "_")] = vld_params[param]
+            # TODO if vld_params.get("ns-net"):
+
         nslcmop_id = db_nslcmop["_id"]
         target = {
             "name": db_nsr["name"],
@@ -893,13 +930,20 @@ class NsLcm(LcmBase):
             "image": deepcopy(db_nsr["image"]),
             "flavor": deepcopy(db_nsr["flavor"]),
             "action_id": nslcmop_id,
+            "cloud_init_content": {},
         }
         for image in target["image"]:
-            image["vim_info"] = []
+            image["vim_info"] = {}
         for flavor in target["flavor"]:
-            flavor["vim_info"] = []
+            flavor["vim_info"] = {}
 
-        ns_params = db_nslcmop.get("operationParams")
+        if db_nslcmop.get("lcmOperationType") != "instantiate":
+            # get parameters of instantiation:
+            db_nslcmop_instantiate = self.db.get_list("nslcmops", {"nsInstanceId": db_nslcmop["nsInstanceId"],
+                                                                   "lcmOperationType": "instantiate"})[-1]
+            ns_params = db_nslcmop_instantiate.get("operationParams")
+        else:
+            ns_params = db_nslcmop.get("operationParams")
         ssh_keys = []
         if ns_params.get("ssh_keys"):
             ssh_keys += ns_params.get("ssh_keys")
@@ -907,36 +951,91 @@ class NsLcm(LcmBase):
             ssh_keys += n2vc_key_list
 
         cp2target = {}
-        for vld_index, vld in enumerate(nsd.get("vld")):
-            target_vld = {"id": vld["id"],
-                          "name": vld["name"],
-                          "mgmt-network": vld.get("mgmt-network", False),
-                          "type": vld.get("type"),
-                          "vim_info": [{"vim-network-name": vld.get("vim-network-name"),
-                                        "vim_account_id": ns_params["vimAccountId"]}],
-                          }
-            for cp in vld["vnfd-connection-point-ref"]:
+        for vld_index, vld in enumerate(db_nsr.get("vld")):
+            target_vim = "vim:{}".format(ns_params["vimAccountId"])
+            target_vld = {
+                "id": vld["id"],
+                "name": vld["name"],
+                "mgmt-network": vld.get("mgmt-network", False),
+                "type": vld.get("type"),
+                "vim_info": {
+                    target_vim: {"vim-network-name": vld.get("vim-network-name")}
+                }
+            }
+            # check if this network needs SDN assist
+            target_sdn = None
+            if vld.get("pci-interfaces"):
+                db_vim = get_vim_account(ns_params["vimAccountId"])
+                sdnc_id = db_vim["config"].get("sdn-controller")
+                if sdnc_id:
+                    sdn_vld = "nsrs:{}:vld.{}".format(nsr_id, vld["id"])
+                    target_sdn = "sdn:{}".format(sdnc_id)
+                    target_vld["vim_info"][target_sdn] = {
+                        "sdn": True, "target_vim": target_vim, "vlds": [sdn_vld], "type": vld.get("type")}
+
+            nsd_vld = next(v for v in nsd["vld"] if v["id"] == vld["id"])
+            for cp in nsd_vld["vnfd-connection-point-ref"]:
                 cp2target["member_vnf:{}.{}".format(cp["member-vnf-index-ref"], cp["vnfd-connection-point-ref"])] = \
                     "nsrs:{}:vld.{}".format(nsr_id, vld_index)
+
+            # check at nsd descriptor, if there is an ip-profile
+            vld_params = {}
+            if nsd_vld.get("ip-profile-ref"):
+                ip_profile = next(ipp for ipp in nsd["ip-profiles"] if ipp["name"] == nsd_vld["ip-profile-ref"])
+                vld_params["ip-profile"] = ip_profile["ip-profile-params"]
+            # update vld_params with instantiation params
+            vld_instantiation_params = next((v for v in get_iterable(ns_params, "vld")
+                                             if v["name"] in (vld["name"], vld["id"])), None)
+            if vld_instantiation_params:
+                vld_params.update(vld_instantiation_params)
+            parse_vld_instantiation_params(target_vim, target_vld, vld_params, target_sdn)
             target["ns"]["vld"].append(target_vld)
 
         for vnfr in db_vnfrs.values():
             vnfd = db_vnfds_ref[vnfr["vnfd-ref"]]
+            vnf_params = next((v for v in get_iterable(ns_params, "vnf")
+                               if v["member-vnf-index"] == vnfr["member-vnf-index-ref"]), None)
             target_vnf = deepcopy(vnfr)
+            target_vim = "vim:{}".format(vnfr["vim-account-id"])
             for vld in target_vnf.get("vld", ()):
-                # check if connected to a ns.vld
+                # check if connected to a ns.vld, to fill target'
                 vnf_cp = next((cp for cp in vnfd.get("connection-point", ())
                                if cp.get("internal-vld-ref") == vld["id"]), None)
                 if vnf_cp:
                     ns_cp = "member_vnf:{}.{}".format(vnfr["member-vnf-index-ref"], vnf_cp["id"])
                     if cp2target.get(ns_cp):
                         vld["target"] = cp2target[ns_cp]
-                vld["vim_info"] = [{"vim-network-name": vld.get("vim-network-name"),
-                                    "vim_account_id": vnfr["vim-account-id"]}]
-
+                vld["vim_info"] = {target_vim: {"vim-network-name": vld.get("vim-network-name")}}
+                # check if this network needs SDN assist
+                target_sdn = None
+                if vld.get("pci-interfaces"):
+                    db_vim = get_vim_account(vnfr["vim-account-id"])
+                    sdnc_id = db_vim["config"].get("sdn-controller")
+                    if sdnc_id:
+                        sdn_vld = "vnfrs:{}:vld.{}".format(target_vnf["_id"], vld["id"])
+                        target_sdn = "sdn:{}".format(sdnc_id)
+                        vld["vim_info"][target_sdn] = {
+                            "sdn": True, "target_vim": target_vim, "vlds": [sdn_vld], "type": vld.get("type")}
+
+                # check at vnfd descriptor, if there is an ip-profile
+                vld_params = {}
+                vnfd_vld = next(v for v in vnfd["internal-vld"] if v["id"] == vld["id"])
+                if vnfd_vld.get("ip-profile-ref"):
+                    ip_profile = next(ipp for ipp in vnfd["ip-profiles"] if ipp["name"] == vnfd_vld["ip-profile-ref"])
+                    vld_params["ip-profile"] = ip_profile["ip-profile-params"]
+                # update vld_params with instantiation params
+                if vnf_params:
+                    vld_instantiation_params = next((v for v in get_iterable(vnf_params, "internal-vld")
+                                                     if v["name"] == vld["id"]), None)
+                    if vld_instantiation_params:
+                        vld_params.update(vld_instantiation_params)
+                parse_vld_instantiation_params(target_vim, vld, vld_params, target_sdn)
+
+            vdur_list = []
             for vdur in target_vnf.get("vdur", ()):
-                vdur["vim_info"] = [{"vim_account_id": vnfr["vim-account-id"]}]
+                if vdur.get("status") == "DELETING" or vdur.get("pdu-type"):
+                    continue  # This vdu must not be created
+                vdur["vim_info"] = {target_vim: {}}
                 vdud_index, vdud = next(k for k in enumerate(vnfd["vdu"]) if k[1]["id"] == vdur["vdu-id-ref"])
-                # vdur["additionalParams"] = vnfr.get("additionalParamsForVnf")  # TODO additional params for VDU
 
                 if ssh_keys:
                     if deep_get(vdud, ("vdu-configuration", "config-access", "ssh-access", "required")):
@@ -950,26 +1049,43 @@ class NsLcm(LcmBase):
 
                 # cloud-init
                 if vdud.get("cloud-init-file"):
                     vdur["cloud-init"] = "{}:file:{}".format(vnfd["_id"], vdud.get("cloud-init-file"))
+                    # read file and put content at target.cloud_init_content. Avoid ng_ro using the shared package
+                    # system
+                    if vdur["cloud-init"] not in target["cloud_init_content"]:
+                        base_folder = vnfd["_admin"]["storage"]
+                        cloud_init_file = "{}/{}/cloud_init/{}".format(base_folder["folder"], base_folder["pkg-dir"],
+                                                                       vdud.get("cloud-init-file"))
+                        with self.fs.file_open(cloud_init_file, "r") as ci_file:
+                            target["cloud_init_content"][vdur["cloud-init"]] = ci_file.read()
                 elif vdud.get("cloud-init"):
                     vdur["cloud-init"] = "{}:vdu:{}".format(vnfd["_id"], vdud_index)
+                    # put content at target.cloud_init_content. Avoid ng_ro reading the vnfd descriptor
+                    target["cloud_init_content"][vdur["cloud-init"]] = vdud["cloud-init"]
+
+                vdur["additionalParams"] = vdur.get("additionalParams") or {}
+                deploy_params_vdu = self._format_additional_params(vdur.get("additionalParams") or {})
+                deploy_params_vdu["OSM"] = self._get_osm_params(vnfr, vdur["vdu-id-ref"], vdur["count-index"])
+                vdur["additionalParams"] = deploy_params_vdu
 
                 # flavor
                 ns_flavor = target["flavor"][int(vdur["ns-flavor-id"])]
-                if not next((vi for vi in ns_flavor["vim_info"] if
-                             vi and vi.get("vim_account_id") == vnfr["vim-account-id"]), None):
-                    ns_flavor["vim_info"].append({"vim_account_id": vnfr["vim-account-id"]})
+                if target_vim not in ns_flavor["vim_info"]:
+                    ns_flavor["vim_info"][target_vim] = {}
                 # image
                 ns_image = target["image"][int(vdur["ns-image-id"])]
-                if not next((vi for vi in ns_image["vim_info"] if
-                             vi and vi.get("vim_account_id") == vnfr["vim-account-id"]), None):
-                    ns_image["vim_info"].append({"vim_account_id": vnfr["vim-account-id"]})
-
-                vdur["vim_info"] = [{"vim_account_id": vnfr["vim-account-id"]}]
+                if target_vim not in ns_image["vim_info"]:
+                    ns_image["vim_info"][target_vim] = {}
+
+                vdur["vim_info"] = {target_vim: {}}
+                # instantiation parameters
+                # if vnf_params:
+                #     vdu_instantiation_params = next((v for v in get_iterable(vnf_params, "vdu") if v["id"] ==
+                #                                      vdud["id"]), None)
+                vdur_list.append(vdur)
+            target_vnf["vdur"] = vdur_list
             target["vnf"].append(target_vnf)
 
         desc = await self.RO.deploy(nsr_id, target)
         action_id = desc["action_id"]
-        await self._wait_ng_ro(self, nsr_id, action_id, nslcmop_id, start_deploy, timeout_ns_deploy, stage)
+        await self._wait_ng_ro(nsr_id, action_id, nslcmop_id, start_deploy, timeout_ns_deploy, stage)
 
         # Updating NSR
         db_nsr_update = {
@@ -982,21 +1098,24 @@ class NsLcm(LcmBase):
         self.logger.debug(logging_text + "ns deployed at RO. RO_id={}".format(action_id))
         return
 
-    async def _wait_ng_ro(self, nsr_id, action_id, nslcmop_id, start_time, timeout, stage):
+    async def _wait_ng_ro(self, nsr_id, action_id, nslcmop_id=None, start_time=None, timeout=600, stage=None):
         detailed_status_old = None
         db_nsr_update = {}
+        start_time = start_time or time()
         while time() <= start_time + timeout:
             desc_status = await self.RO.status(nsr_id, action_id)
             if desc_status["status"] == "FAILED":
                 raise NgRoException(desc_status["details"])
             elif desc_status["status"] == "BUILD":
-                stage[2] = "VIM: ({})".format(desc_status["details"])
+                if stage:
+                    stage[2] = "VIM: ({})".format(desc_status["details"])
             elif desc_status["status"] == "DONE":
-                stage[2] = "Deployed at VIM"
+                if stage:
+                    stage[2] = "Deployed at VIM"
                 break
             else:
                 assert False, "ROclient.check_ns_status returns unknown {}".format(desc_status["status"])
-            if stage[2] != detailed_status_old:
+            if stage and nslcmop_id and stage[2] != detailed_status_old:
                 detailed_status_old = stage[2]
                 db_nsr_update["detailed-status"] = " ".join(stage)
                 self.update_db_2("nsrs", nsr_id, db_nsr_update)
@@ -1016,6 +1135,7 @@ class NsLcm(LcmBase):
             "vnf": [],
             "image": [],
             "flavor": [],
+            "action_id": nslcmop_id
         }
         desc = await self.RO.deploy(nsr_id, target)
         action_id = desc["action_id"]
@@ -1025,7 +1145,7 @@ class NsLcm(LcmBase):
 
         # wait until done
         delete_timeout = 20 * 60  # 20 minutes
-        await self._wait_ng_ro(self, nsr_id, action_id, nslcmop_id, start_deploy, delete_timeout, stage)
+        await self._wait_ng_ro(nsr_id, action_id, nslcmop_id, start_deploy, delete_timeout, stage)
 
         db_nsr_update["_admin.deployed.RO.nsr_delete_action_id"] = None
         db_nsr_update["_admin.deployed.RO.nsr_status"] = "DELETED"
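The two call sites above also drop a stray extra self that was being passed to _wait_ng_ro(), and the new defaulted signature lets the poller be reused in a bare form, without stage/status reporting. Illustratively (this minimal form is in fact used later in this diff for ssh-key injection):

    # poll an NG-RO action with defaults: no stage, no nslcmop status writes, 600 s budget
    await self._wait_ng_ro(nsr_id, action_id, timeout=600)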
@@ -1292,9 +1412,12 @@ class NsLcm(LcmBase):
             self._write_op_status(nslcmop_id, stage)
             # await self._on_update_n2vc_db("nsrs", {"_id": nsr_id}, "_admin.deployed", db_nsr_update)
             # self.logger.debug(logging_text + "Deployed at VIM")
-        except (ROclient.ROClientException, LcmException, DbException, NgRoException) as e:
+        except Exception as e:
             stage[2] = "ERROR deploying at VIM"
             self.set_vnfr_at_error(db_vnfrs, str(e))
+            self.logger.error("Error deploying at VIM {}".format(e),
+                              exc_info=not isinstance(e, (ROclient.ROClientException, LcmException, DbException,
                                                           NgRoException)))
             raise
 
     async def wait_kdu_up(self, logging_text, nsr_id, vnfr_id, kdu_name):
@@ -1338,7 +1461,7 @@ class NsLcm(LcmBase):
         :return: IP address
         """
 
-        # self.logger.debug(logging_text + "Starting wait_vm_up_insert_key_ro")
+        self.logger.debug(logging_text + "Starting wait_vm_up_insert_key_ro")
         ro_nsr_id = None
         ip_address = None
         nb_tries = 0
@@ -1373,13 +1496,21 @@ class NsLcm(LcmBase):
                 if not vdur:
                     raise LcmException("Not found vnfr_id={}, vdu_id={}, vdu_index={}".format(vnfr_id, vdu_id,
                                                                                               vdu_index))
-
-                if vdur.get("pdu-type") or vdur.get("status") == "ACTIVE":
+                # New generation RO stores information at "vim_info"
+                ng_ro_status = None
+                target_vim = None
+                if vdur.get("vim_info"):
+                    target_vim = next(t for t in vdur["vim_info"])  # there should be only one key
+                    ng_ro_status = vdur["vim_info"][target_vim].get("vim_status")
+                if vdur.get("pdu-type") or vdur.get("status") == "ACTIVE" or ng_ro_status == "ACTIVE":
                     ip_address = vdur.get("ip-address")
                     if not ip_address:
                         continue
                     target_vdu_id = vdur["vdu-id-ref"]
-                elif vdur.get("status") == "ERROR":
+                elif (
+                    vdur.get("status") == "ERROR" or
+                    vdur.get("vim_info", {}).get(target_vim, {}).get("vim_status") == "ERROR"
+                ):
                    raise LcmException("Cannot inject ssh-key because target VM is in error state")
 
             if not target_vdu_id:
@@ -1387,14 +1518,7 @@ class NsLcm(LcmBase):
 
             # inject public key into machine
             if pub_key and user:
-                # wait until NS is deployed at RO
-                if not ro_nsr_id:
-                    db_nsrs = self.db.get_one("nsrs", {"_id": nsr_id})
-                    ro_nsr_id = deep_get(db_nsrs, ("_admin", "deployed", "RO", "nsr_id"))
-                    if not ro_nsr_id:
-                        continue
-
-                # self.logger.debug(logging_text + "Inserting RO key")
+                self.logger.debug(logging_text + "Inserting RO key")
                 if vdur.get("pdu-type"):
                     self.logger.error(logging_text + "Cannot inject ssh-key to a PDU")
                     return ip_address
@@ -1402,10 +1526,19 @@ class NsLcm(LcmBase):
                 ro_vm_id = "{}-{}".format(db_vnfr["member-vnf-index-ref"], target_vdu_id)  # TODO add vdu_index
                 if self.ng_ro:
                     target = {"action": "inject_ssh_key", "key": pub_key, "user": user,
-                              "vnf": [{"_id": vnfr_id, "vdur": [{"id": vdu_id}]}],
+                              "vnf": [{"_id": vnfr_id, "vdur": [{"id": vdur["id"]}]}],
                               }
-                    await self.RO.deploy(nsr_id, target)
+                    desc = await self.RO.deploy(nsr_id, target)
+                    action_id = desc["action_id"]
+                    await self._wait_ng_ro(nsr_id, action_id, timeout=600)
+                    break
                 else:
+                    # wait until NS is deployed at RO
+                    if not ro_nsr_id:
+                        db_nsrs = self.db.get_one("nsrs", {"_id": nsr_id})
+                        ro_nsr_id = deep_get(db_nsrs, ("_admin", "deployed", "RO", "nsr_id"))
+                        if not ro_nsr_id:
+                            continue
                     result_dict = await self.RO.create_action(
                         item="ns",
                        item_id_name=ro_nsr_id,
@@ -1531,8 +1664,14 @@ class NsLcm(LcmBase):
 
         # find old ee_id if exists
         ee_id = vca_deployed.get("ee_id")
 
+        vim_account_id = (
+            deep_get(db_vnfr, ("vim-account-id",)) or
+            deep_get(deploy_params, ("OSM", "vim_account_id"))
+        )
+        vca_cloud, vca_cloud_credential = self.get_vca_cloud_and_credentials(vim_account_id)
+        vca_k8s_cloud, vca_k8s_cloud_credential = self.get_vca_k8s_cloud_and_credentials(vim_account_id)
         # create or register execution environment in VCA
-        if vca_type in ("lxc_proxy_charm", "k8s_proxy_charm", "helm"):
+        if vca_type in ("lxc_proxy_charm", "k8s_proxy_charm", "helm", "helm-v3"):
 
             self._write_configuration_status(
                 nsr_id=nsr_id,
@@ -1544,13 +1683,26 @@ class NsLcm(LcmBase):
 
             step = "create execution environment"
             self.logger.debug(logging_text + step)
-            ee_id, credentials = await self.vca_map[vca_type].create_execution_environment(
-                namespace=namespace,
-                reuse_ee_id=ee_id,
-                db_dict=db_dict,
-                config=osm_config,
-                artifact_path=artifact_path,
-                vca_type=vca_type)
+
+            ee_id = None
+            credentials = None
+            if vca_type == "k8s_proxy_charm":
+                ee_id = await self.vca_map[vca_type].install_k8s_proxy_charm(
+                    charm_name=artifact_path[artifact_path.rfind("/") + 1:],
+                    namespace=namespace,
+                    artifact_path=artifact_path,
+                    db_dict=db_dict,
+                    cloud_name=vca_k8s_cloud,
+                    credential_name=vca_k8s_cloud_credential,
+                )
+            else:
+                ee_id, credentials = await self.vca_map[vca_type].create_execution_environment(
+                    namespace=namespace,
+                    reuse_ee_id=ee_id,
+                    db_dict=db_dict,
+                    cloud_name=vca_cloud,
+                    credential_name=vca_cloud_credential,
+                )
 
         elif vca_type == "native_charm":
             step = "Waiting to VM being up and getting IP address"
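The two cloud/credential pairs used in the dispatch above come from the VIM account's config (helpers added at the bottom of this diff). For orientation, an illustrative VIM account fragment that would feed them; the field names are exactly the keys those helpers read, while the values and overall record shape are hypothetical:

    vim_account = {
        "_id": "...",
        "config": {
            "vca_cloud": "lxd-cloud",
            "vca_cloud_credential": "lxd-cloud-cred",
            "vca_k8s_cloud": "k8s-cloud",
            "vca_k8s_cloud_credential": "k8s-cloud-cred",
        },
    }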
@@ -1585,7 +1737,12 @@ class NsLcm(LcmBase):
             step = "register execution environment {}".format(credentials)
             self.logger.debug(logging_text + step)
             ee_id = await self.vca_map[vca_type].register_execution_environment(
-                credentials=credentials, namespace=namespace, db_dict=db_dict)
+                credentials=credentials,
+                namespace=namespace,
+                db_dict=db_dict,
+                cloud_name=vca_cloud,
+                credential_name=vca_cloud_credential,
+            )
 
         # for compatibility with MON/POL modules, the need model and application name at database
         # TODO ask MON/POL if needed to not assuming anymore the format "model_name.application_name"
@@ -1631,15 +1788,14 @@ class NsLcm(LcmBase):
                     if vdu_id == v["vdu-id-ref"]:
                         num_units = v.get("config-units") or 1
                         break
-
-        await self.vca_map[vca_type].install_configuration_sw(
-            ee_id=ee_id,
-            artifact_path=artifact_path,
-            db_dict=db_dict,
-            config=config,
-            num_units=num_units,
-            vca_type=vca_type
-        )
+        if vca_type != "k8s_proxy_charm":
+            await self.vca_map[vca_type].install_configuration_sw(
+                ee_id=ee_id,
+                artifact_path=artifact_path,
+                db_dict=db_dict,
+                config=config,
+                num_units=num_units,
+            )
 
         # write in db flag of configuration_sw already installed
         self.update_db_2("nsrs", nsr_id, {db_update_entry + "config_sw_installed": True})
@@ -1650,7 +1806,7 @@ class NsLcm(LcmBase):
 
         # if SSH access is required, then get execution environment SSH public
         # if native charm we have waited already to VM be UP
-        if vca_type in ("k8s_proxy_charm", "lxc_proxy_charm", "helm"):
+        if vca_type in ("k8s_proxy_charm", "lxc_proxy_charm", "helm", "helm-v3"):
             pub_key = None
             user = None
             # self.logger.debug("get ssh key block")
@@ -1738,7 +1894,7 @@ class NsLcm(LcmBase):
             # TODO register in database that primitive is done
 
         # STEP 7 Configure metrics
-        if vca_type == "helm":
+        if vca_type == "helm" or vca_type == "helm-v3":
             prometheus_jobs = await self.add_prometheus_metrics(
                 ee_id=ee_id,
                 artifact_path=artifact_path,
@@ -2567,7 +2723,7 @@ class NsLcm(LcmBase):
     async def deploy_kdus(self, logging_text, nsr_id, nslcmop_id, db_vnfrs, db_vnfds, task_instantiation_info):
         # Launch kdus if present in the descriptor
 
-        k8scluster_id_2_uuic = {"helm-chart": {}, "juju-bundle": {}}
+        k8scluster_id_2_uuic = {"helm-chart-v3": {}, "helm-chart": {}, "juju-bundle": {}}
 
         async def _get_cluster_id(cluster_id, cluster_type):
             nonlocal k8scluster_id_2_uuic
@@ -2587,8 +2743,25 @@ class NsLcm(LcmBase):
 
             k8s_id = deep_get(db_k8scluster, ("_admin", cluster_type, "id"))
             if not k8s_id:
-                raise LcmException("K8s cluster '{}' has not been initialized for '{}'".format(cluster_id,
-                                                                                               cluster_type))
+                if cluster_type == "helm-chart-v3":
+                    try:
+                        # backward compatibility for existing clusters that have not been initialized for helm v3
+                        k8s_credentials = yaml.safe_dump(db_k8scluster.get("credentials"))
+                        k8s_id, uninstall_sw = await self.k8sclusterhelm3.init_env(k8s_credentials,
+                                                                                   reuse_cluster_uuid=cluster_id)
+                        db_k8scluster_update = {}
+                        db_k8scluster_update["_admin.helm-chart-v3.error_msg"] = None
+                        db_k8scluster_update["_admin.helm-chart-v3.id"] = k8s_id
+                        db_k8scluster_update["_admin.helm-chart-v3.created"] = uninstall_sw
+                        db_k8scluster_update["_admin.helm-chart-v3.operationalState"] = "ENABLED"
+                        self.update_db_2("k8sclusters", cluster_id, db_k8scluster_update)
+                    except Exception as e:
+                        self.logger.error(logging_text + "error initializing helm-v3 cluster: {}".format(str(e)))
+                        raise LcmException("K8s cluster '{}' has not been initialized for '{}'".format(cluster_id,
+                                                                                                       cluster_type))
+                else:
+                    raise LcmException("K8s cluster '{}' has not been initialized for '{}'".
+                                       format(cluster_id, cluster_type))
             k8scluster_id_2_uuic[cluster_type][cluster_id] = k8s_id
             return k8s_id
 
@@ -2600,6 +2773,7 @@ class NsLcm(LcmBase):
 
             index = 0
             updated_cluster_list = []
+            updated_v3_cluster_list = []
 
             for vnfr_data in db_vnfrs.values():
                 for kdu_index, kdur in enumerate(get_iterable(vnfr_data, "kdur")):
@@ -2610,7 +2784,11 @@ class NsLcm(LcmBase):
                     namespace = kdur.get("k8s-namespace")
                     if kdur.get("helm-chart"):
                         kdumodel = kdur["helm-chart"]
-                        k8sclustertype = "helm-chart"
+                        # Default version: helm3, if helm-version is v2 assign v2
+                        k8sclustertype = "helm-chart-v3"
+                        self.logger.debug("kdur: {}".format(kdur))
+                        if kdur.get("helm-version") and kdur.get("helm-version") == "v2":
+                            k8sclustertype = "helm-chart"
                     elif kdur.get("juju-bundle"):
                         kdumodel = kdur["juju-bundle"]
                         k8sclustertype = "juju-bundle"
@@ -2637,18 +2815,25 @@ class NsLcm(LcmBase):
                     cluster_uuid = await _get_cluster_id(k8s_cluster_id, k8sclustertype)
 
                     # Synchronize  repos
-                    if k8sclustertype == "helm-chart" and cluster_uuid not in updated_cluster_list:
+                    if (k8sclustertype == "helm-chart" and cluster_uuid not in updated_cluster_list)\
+                            or (k8sclustertype == "helm-chart-v3" and cluster_uuid not in updated_v3_cluster_list):
                         del_repo_list, added_repo_dict = await asyncio.ensure_future(
-                            self.k8sclusterhelm.synchronize_repos(cluster_uuid=cluster_uuid))
+                            self.k8scluster_map[k8sclustertype].synchronize_repos(cluster_uuid=cluster_uuid))
                         if del_repo_list or added_repo_dict:
-                            unset = {'_admin.helm_charts_added.' + item: None for item in del_repo_list}
-                            updated = {'_admin.helm_charts_added.' +
-                                       item: name for item, name in added_repo_dict.items()}
-                            self.logger.debug(logging_text + "repos synchronized on k8s cluster '{}' to_delete: {}, "
                                                             "to_add: {}".format(k8s_cluster_id, del_repo_list,
-                                                                                added_repo_dict))
+                            if k8sclustertype == "helm-chart":
+                                unset = {'_admin.helm_charts_added.' + item: None for item in del_repo_list}
+                                updated = {'_admin.helm_charts_added.' +
+                                           item: name for item, name in added_repo_dict.items()}
+                                updated_cluster_list.append(cluster_uuid)
+                            elif k8sclustertype == "helm-chart-v3":
+                                unset = {'_admin.helm_charts_v3_added.' + item: None for item in del_repo_list}
+                                updated = {'_admin.helm_charts_v3_added.' +
+                                           item: name for item, name in added_repo_dict.items()}
+                                updated_v3_cluster_list.append(cluster_uuid)
+                            self.logger.debug(logging_text + "repos synchronized on k8s cluster "
+                                                             "'{}' to_delete: {}, to_add: {}".
                                              format(k8s_cluster_id, del_repo_list, added_repo_dict))
                             self.db.set_one("k8sclusters", {"_id": k8s_cluster_id}, updated, unset=unset)
-                        updated_cluster_list.append(cluster_uuid)
 
                     # Instantiate kdu
                     step = "Instantiating KDU {}.{} in k8s cluster {}".format(vnfr_data["member-vnf-index-ref"],
@@ -2714,7 +2899,10 @@ class NsLcm(LcmBase):
                 vca_type = "native_charm"
             elif ee_item.get("helm-chart"):
                 vca_name = ee_item['helm-chart']
-                vca_type = "helm"
+                if ee_item.get("helm-version") and ee_item.get("helm-version") == "v2":
+                    vca_type = "helm"
+                else:
+                    vca_type = "helm-v3"
             else:
                 self.logger.debug(logging_text + "skipping non juju neither charm configuration")
                 continue
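For a concrete picture of the repo bookkeeping above (values hypothetical): if synchronize_repos() on a Helm-3 cluster reported del_repo_list = ["old-repo"] and added_repo_dict = {"bitnami": "https://charts.bitnami.com/bitnami"}, the write to the k8sclusters collection would take the shape:

    unset = {"_admin.helm_charts_v3_added.old-repo": None}
    updated = {"_admin.helm_charts_v3_added.bitnami": "https://charts.bitnami.com/bitnami"}
    self.db.set_one("k8sclusters", {"_id": k8s_cluster_id}, updated, unset=unset)

Helm-2 clusters keep using the pre-existing "_admin.helm_charts_added" prefix, so the two repo caches never collide.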
@@ -2903,7 +3091,7 @@ class NsLcm(LcmBase):
     #   'detailed-status' : status message
     #   'operationType': may be any type, in the case of scaling: 'PRE-SCALE' | 'POST-SCALE'
     # Status and operation type are currently only used for 'scale', but NOT for 'terminate' sub-operations.
-    def _add_suboperation(self, db_nslcmop, vnf_index, vdu_id, vdu_count_index, vdu_name, primitive, 
+    def _add_suboperation(self, db_nslcmop, vnf_index, vdu_id, vdu_count_index, vdu_name, primitive,
                           mapped_primitive_params, operationState=None, detailed_status=None, operationType=None,
                           RO_nsr_id=None, RO_scaling_info=None):
         if not db_nslcmop:
@@ -3328,7 +3516,7 @@ class NsLcm(LcmBase):
                                          vca.get("needed_terminate"))
                 # For helm we must destroy_ee. Also for native_charm, as juju_model cannot be deleted if there are
                 # pending native charms
-                destroy_ee = True if vca_type in ("helm", "native_charm") else False
+                destroy_ee = True if vca_type in ("helm", "helm-v3", "native_charm") else False
                 # self.logger.debug(logging_text + "vca_index: {}, ee_id: {}, vca_type: {} destroy_ee: {}".format(
                 #     vca_index, vca.get("ee_id"), vca_type, destroy_ee))
                 task = asyncio.ensure_future(
@@ -3505,11 +3693,11 @@ class NsLcm(LcmBase):
                     error_list.append(created_tasks_info[task])
                     error_detail_list.append(new_error)
                     if isinstance(exc, (str, DbException, N2VCException, ROclient.ROClientException, LcmException,
-                                        K8sException)):
+                                        K8sException, NgRoException)):
                         self.logger.error(logging_text + new_error)
                    else:
                         exc_traceback = "".join(traceback.format_exception(None, exc, exc.__traceback__))
-                        self.logger.error(logging_text + created_tasks_info[task] + exc_traceback)
+                        self.logger.error(logging_text + created_tasks_info[task] + " " + exc_traceback)
                 else:
                     self.logger.debug(logging_text + created_tasks_info[task] + ": Done")
             stage[1] = "{}/{}.".format(num_done, num_tasks)
@@ -3557,6 +3745,14 @@ class NsLcm(LcmBase):
                                                                    width=256)
             elif isinstance(calculated_params[param_name], str) and calculated_params[param_name].startswith("!!yaml "):
                 calculated_params[param_name] = calculated_params[param_name][7:]
+            if parameter.get("data-type") == "INTEGER":
+                try:
+                    calculated_params[param_name] = int(calculated_params[param_name])
+                except ValueError:  # error converting string to int
+                    raise LcmException(
+                        "Parameter {} of primitive {} must be integer".format(param_name, primitive_desc["name"]))
+            elif parameter.get("data-type") == "BOOLEAN":
+                calculated_params[param_name] = not ((str(calculated_params[param_name])).lower() == 'false')
 
         # add always ns_config_info if primitive name is config
         if primitive_desc["name"] == "config":
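The new INTEGER/BOOLEAN coercion above is worth a second look: the boolean branch treats any string other than a case-insensitive "false" as True. Traced through by hand:

    # "INTEGER": int("3") -> 3; int("3x") raises ValueError -> LcmException
    # "BOOLEAN": not (str(value).lower() == 'false')
    #   "False" -> False    "false" -> False
    #   "true"  -> True     "0"     -> True   (note: "0" is truthy here)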
@@ -3890,6 +4086,8 @@ class NsLcm(LcmBase):
             return
 
         logging_text = "Task ns={} scale={} ".format(nsr_id, nslcmop_id)
+        stage = ['', '', '']
+        # ^ stage, step, VIM progress
         self.logger.debug(logging_text + "Enter")
         # get all needed from database
         db_nsr = None
@@ -3936,7 +4134,7 @@ class NsLcm(LcmBase):
             # vdu_name = db_nslcmop["operationParams"].get("vdu_name")
             #######
 
-            RO_nsr_id = nsr_deployed["RO"]["nsr_id"]
+            RO_nsr_id = nsr_deployed["RO"].get("nsr_id")
             vnf_index = db_nslcmop["operationParams"]["scaleVnfData"]["scaleByStepData"]["member-vnf-index"]
             scaling_group = db_nslcmop["operationParams"]["scaleVnfData"]["scaleByStepData"]["scaling-group-descriptor"]
             scaling_type = db_nslcmop["operationParams"]["scaleVnfData"]["scaleVnfType"]
@@ -4034,14 +4232,13 @@ class NsLcm(LcmBase):
                     vdu_scaling_info["vdu-delete"][vdu_scale_info["vdu-id-ref"]] = vdu_scale_info.get("count", 1)
 
             # update VDU_SCALING_INFO with the VDUs to delete ip_addresses
-            vdu_create = vdu_scaling_info.get("vdu-create")
             vdu_delete = copy(vdu_scaling_info.get("vdu-delete"))
             if vdu_scaling_info["scaling_direction"] == "IN":
                 for vdur in reversed(db_vnfr["vdur"]):
                     if vdu_delete.get(vdur["vdu-id-ref"]):
                         vdu_delete[vdur["vdu-id-ref"]] -= 1
                         vdu_scaling_info["vdu"].append({
-                            "name": vdur["name"],
+                            "name": vdur.get("name") or vdur.get("vdu-name"),
                             "vdu_id": vdur["vdu-id-ref"],
                             "interface": []
                         })
@@ -4051,7 +4248,7 @@ class NsLcm(LcmBase):
                                 "ip_address": interface["ip-address"],
                                 "mac_address": interface.get("mac-address"),
                             })
-            vdu_delete = vdu_scaling_info.pop("vdu-delete")
+            # vdu_delete = vdu_scaling_info.pop("vdu-delete")
 
             # PRE-SCALE BEGIN
             step = "Executing pre-scale vnf-config-primitive"
@@ -4128,120 +4325,19 @@ class NsLcm(LcmBase):
                     scale_process = None
             # PRE-SCALE END
 
+            db_nsr_update["_admin.scaling-group.{}.nb-scale-op".format(admin_scale_index)] = nb_scale_op
+            db_nsr_update["_admin.scaling-group.{}.time".format(admin_scale_index)] = time()
+
             # SCALE RO - BEGIN
-            # Should this block be skipped if 'RO_nsr_id' == None ?
-            # if (RO_nsr_id and RO_scaling_info):
             if RO_scaling_info:
                 scale_process = "RO"
-                # Scale RO retry check: Check if this sub-operation has been executed before
-                op_index = self._check_or_add_scale_suboperation(
-                    db_nslcmop, vnf_index, None, None, 'SCALE-RO', RO_nsr_id, RO_scaling_info)
-                if op_index == self.SUBOPERATION_STATUS_SKIP:
-                    # Skip sub-operation
-                    result = 'COMPLETED'
-                    result_detail = 'Done'
-                    self.logger.debug(logging_text + "Skipped sub-operation RO, result {} {}".format(
-                        result, result_detail))
+                if self.ro_config.get("ng"):
+                    await self._scale_ng_ro(logging_text, db_nsr, db_nslcmop, db_vnfr, vdu_scaling_info, stage)
                 else:
-                    if op_index == self.SUBOPERATION_STATUS_NEW:
-                        # New sub-operation: Get index of this sub-operation
-                        op_index = len(db_nslcmop.get('_admin', {}).get('operations')) - 1
-                        self.logger.debug(logging_text + "New sub-operation RO")
-                    else:
-                        # retry: Get registered params for this existing sub-operation
-                        op = db_nslcmop.get('_admin', {}).get('operations', [])[op_index]
-                        RO_nsr_id = op.get('RO_nsr_id')
-                        RO_scaling_info = op.get('RO_scaling_info')
-                        self.logger.debug(logging_text + "Sub-operation RO retry for primitive {}".format(
-                            vnf_config_primitive))
-
-                    RO_desc = await self.RO.create_action("ns", RO_nsr_id, {"vdu-scaling": RO_scaling_info})
-                    db_nsr_update["_admin.scaling-group.{}.nb-scale-op".format(admin_scale_index)] = nb_scale_op
-                    db_nsr_update["_admin.scaling-group.{}.time".format(admin_scale_index)] = time()
-                    # wait until ready
-                    RO_nslcmop_id = RO_desc["instance_action_id"]
-                    db_nslcmop_update["_admin.deploy.RO"] = RO_nslcmop_id
-
-                    RO_task_done = False
-                    step = detailed_status = "Waiting for VIM to scale. RO_task_id={}.".format(RO_nslcmop_id)
-                    detailed_status_old = None
-                    self.logger.debug(logging_text + step)
-
-                    deployment_timeout = 1 * 3600   # One hour
-                    while deployment_timeout > 0:
-                        if not RO_task_done:
-                            desc = await self.RO.show("ns", item_id_name=RO_nsr_id, extra_item="action",
-                                                      extra_item_id=RO_nslcmop_id)
-
-                            # deploymentStatus
-                            self._on_update_ro_db(nsrs_id=nsr_id, ro_descriptor=desc)
-
-                            ns_status, ns_status_info = self.RO.check_action_status(desc)
-                            if ns_status == "ERROR":
-                                raise ROclient.ROClientException(ns_status_info)
-                            elif ns_status == "BUILD":
-                                detailed_status = step + "; {}".format(ns_status_info)
-                            elif ns_status == "ACTIVE":
-                                RO_task_done = True
-                                self.scale_vnfr(db_vnfr, vdu_create=vdu_create, vdu_delete=vdu_delete)
-                                step = detailed_status = "Waiting ns ready at RO. RO_id={}".format(RO_nsr_id)
-                                self.logger.debug(logging_text + step)
-                            else:
-                                assert False, "ROclient.check_action_status returns unknown {}".format(ns_status)
-                        else:
-                            desc = await self.RO.show("ns", RO_nsr_id)
-                            ns_status, ns_status_info = self.RO.check_ns_status(desc)
-                            # deploymentStatus
-                            self._on_update_ro_db(nsrs_id=nsr_id, ro_descriptor=desc)
-
-                            if ns_status == "ERROR":
-                                raise ROclient.ROClientException(ns_status_info)
-                            elif ns_status == "BUILD":
-                                detailed_status = step + "; {}".format(ns_status_info)
-                            elif ns_status == "ACTIVE":
-                                step = detailed_status = \
-                                    "Waiting for management IP address reported by the VIM. Updating VNFRs"
-                                try:
-                                    # nsr_deployed["nsr_ip"] = RO.get_ns_vnf_info(desc)
-                                    self.ns_update_vnfr({db_vnfr["member-vnf-index-ref"]: db_vnfr}, desc)
-                                    break
-                                except LcmExceptionNoMgmtIP:
-                                    pass
-                            else:
-                                assert False, "ROclient.check_ns_status returns unknown {}".format(ns_status)
-                        if detailed_status != detailed_status_old:
-                            self._update_suboperation_status(
-                                db_nslcmop, op_index, 'COMPLETED', detailed_status)
-                            detailed_status_old = db_nslcmop_update["detailed-status"] = detailed_status
-                            self.update_db_2("nslcmops", nslcmop_id, db_nslcmop_update)
-
-                        await asyncio.sleep(5, loop=self.loop)
-                        deployment_timeout -= 5
-                    if deployment_timeout <= 0:
-                        self._update_suboperation_status(
-                            db_nslcmop, nslcmop_id, op_index, 'FAILED', "Timeout when waiting for ns to get ready")
-                        raise ROclient.ROClientException("Timeout waiting ns to be ready")
-
-                    # update VDU_SCALING_INFO with the obtained ip_addresses
-                    if vdu_scaling_info["scaling_direction"] == "OUT":
-                        for vdur in reversed(db_vnfr["vdur"]):
-                            if vdu_scaling_info["vdu-create"].get(vdur["vdu-id-ref"]):
-                                vdu_scaling_info["vdu-create"][vdur["vdu-id-ref"]] -= 1
-                                vdu_scaling_info["vdu"].append({
-                                    "name": vdur["name"],
-                                    "vdu_id": vdur["vdu-id-ref"],
-                                    "interface": []
-                                })
-                                for interface in vdur["interfaces"]:
-                                    vdu_scaling_info["vdu"][-1]["interface"].append({
-                                        "name": interface["name"],
-                                        "ip_address": interface["ip-address"],
-                                        "mac_address": interface.get("mac-address"),
-                                    })
-                        del vdu_scaling_info["vdu-create"]
-
-                    self._update_suboperation_status(db_nslcmop, op_index, 'COMPLETED', 'Done')
-            # SCALE RO - END
+                    await self._RO_scale(logging_text, RO_nsr_id, RO_scaling_info, db_nslcmop, db_vnfr,
+                                         db_nslcmop_update, vdu_scaling_info)
+                vdu_scaling_info.pop("vdu-create", None)
+                vdu_scaling_info.pop("vdu-delete", None)
 
             scale_process = None
             if db_nsr_update:
@@ -4327,7 +4423,7 @@ class NsLcm(LcmBase):
                 else old_operational_status
             db_nsr_update["config-status"] = old_config_status
             return
-        except (ROclient.ROClientException, DbException, LcmException) as e:
+        except (ROclient.ROClientException, DbException, LcmException, NgRoException) as e:
             self.logger.error(logging_text + "Exit Exception {}".format(e))
             exc = e
         except asyncio.CancelledError:
@@ -4391,6 +4487,148 @@ class NsLcm(LcmBase):
         self.logger.debug(logging_text + "Exit")
         self.lcm_tasks.remove("ns", nsr_id, nslcmop_id, "ns_scale")
 
+    async def _scale_ng_ro(self, logging_text, db_nsr, db_nslcmop, db_vnfr, vdu_scaling_info, stage):
+        nsr_id = db_nslcmop["nsInstanceId"]
+        db_nsd = self.db.get_one("nsds", {"_id": db_nsr["nsd-id"]})
+        db_vnfrs = {}
+
+        # read from db: vnfd's for every vnf
+        db_vnfds = {}  # every vnfd data indexed by vnf id
+        db_vnfds_ref = {}  # every vnfd data indexed by vnfd id
+        db_vnfds = {}
+
+        # for each vnf in ns, read vnfd
+        for vnfr in self.db.get_list("vnfrs", {"nsr-id-ref": nsr_id}):
+            db_vnfrs[vnfr["member-vnf-index-ref"]] = vnfr
+            vnfd_id = vnfr["vnfd-id"]  # vnfd uuid for this vnf
+            vnfd_ref = vnfr["vnfd-ref"]  # vnfd name for this vnf
+            # if we haven't this vnfd, read it from db
+            if vnfd_id not in db_vnfds:
+                # read from db
+                vnfd = self.db.get_one("vnfds", {"_id": vnfd_id})
+                db_vnfds_ref[vnfd_ref] = vnfd  # vnfd's indexed by name
+                db_vnfds[vnfd_id] = vnfd  # vnfd's indexed by id
+        n2vc_key = self.n2vc.get_public_key()
+        n2vc_key_list = [n2vc_key]
+        self.scale_vnfr(db_vnfr, vdu_scaling_info.get("vdu-create"), vdu_scaling_info.get("vdu-delete"),
+                        mark_delete=True)
+        # db_vnfr has been updated, update db_vnfrs to use it
+        db_vnfrs[db_vnfr["member-vnf-index-ref"]] = db_vnfr
+        await self._instantiate_ng_ro(logging_text, nsr_id, db_nsd, db_nsr, db_nslcmop, db_vnfrs,
+                                      db_vnfds_ref, n2vc_key_list, stage=stage, start_deploy=time(),
+                                      timeout_ns_deploy=self.timeout_ns_deploy)
+        if vdu_scaling_info.get("vdu-delete"):
+            self.scale_vnfr(db_vnfr, None, vdu_scaling_info["vdu-delete"], mark_delete=False)
+
+    async def _RO_scale(self, logging_text, RO_nsr_id, RO_scaling_info, db_nslcmop, db_vnfr, db_nslcmop_update,
+                        vdu_scaling_info):
+        nslcmop_id = db_nslcmop["_id"]
+        nsr_id = db_nslcmop["nsInstanceId"]
+        vdu_create = vdu_scaling_info.get("vdu-create")
+        vdu_delete = vdu_scaling_info.get("vdu-delete")
+        # Scale RO retry check: Check if this sub-operation has been executed before
+        op_index = self._check_or_add_scale_suboperation(
+            db_nslcmop, db_vnfr["member-vnf-index-ref"], None, None, 'SCALE-RO', RO_nsr_id, RO_scaling_info)
+        if op_index == self.SUBOPERATION_STATUS_SKIP:
+            # Skip sub-operation
+            result = 'COMPLETED'
+            result_detail = 'Done'
+            self.logger.debug(logging_text + "Skipped sub-operation RO, result {} {}".format(result, result_detail))
+        else:
+            if op_index == self.SUBOPERATION_STATUS_NEW:
+                # New sub-operation: Get index of this sub-operation
+                op_index = len(db_nslcmop.get('_admin', {}).get('operations')) - 1
+                self.logger.debug(logging_text + "New sub-operation RO")
+            else:
+                # retry: Get registered params for this existing sub-operation
+                op = db_nslcmop.get('_admin', {}).get('operations', [])[op_index]
+                RO_nsr_id = op.get('RO_nsr_id')
+                RO_scaling_info = op.get('RO_scaling_info')
+                self.logger.debug(logging_text + "Sub-operation RO retry")
+
+            RO_desc = await self.RO.create_action("ns", RO_nsr_id, {"vdu-scaling": RO_scaling_info})
+            # wait until ready
+            RO_nslcmop_id = RO_desc["instance_action_id"]
+            db_nslcmop_update["_admin.deploy.RO"] = RO_nslcmop_id
+
+            RO_task_done = False
+            step = detailed_status = "Waiting for VIM to scale. RO_task_id={}.".format(RO_nslcmop_id)
+            detailed_status_old = None
+            self.logger.debug(logging_text + step)
+
+            deployment_timeout = 1 * 3600   # One hour
+            while deployment_timeout > 0:
+                if not RO_task_done:
+                    desc = await self.RO.show("ns", item_id_name=RO_nsr_id, extra_item="action",
+                                              extra_item_id=RO_nslcmop_id)
+
+                    # deploymentStatus
+                    self._on_update_ro_db(nsrs_id=nsr_id, ro_descriptor=desc)
+
+                    ns_status, ns_status_info = self.RO.check_action_status(desc)
+                    if ns_status == "ERROR":
+                        raise ROclient.ROClientException(ns_status_info)
+                    elif ns_status == "BUILD":
+                        detailed_status = step + "; {}".format(ns_status_info)
+                    elif ns_status == "ACTIVE":
+                        RO_task_done = True
+                        self.scale_vnfr(db_vnfr, vdu_create=vdu_create, vdu_delete=vdu_delete)
+                        step = detailed_status = "Waiting ns ready at RO. RO_id={}".format(RO_nsr_id)
+                        self.logger.debug(logging_text + step)
+                    else:
+                        assert False, "ROclient.check_action_status returns unknown {}".format(ns_status)
+                else:
+                    desc = await self.RO.show("ns", RO_nsr_id)
+                    ns_status, ns_status_info = self.RO.check_ns_status(desc)
+                    # deploymentStatus
+                    self._on_update_ro_db(nsrs_id=nsr_id, ro_descriptor=desc)
+
+                    if ns_status == "ERROR":
+                        raise ROclient.ROClientException(ns_status_info)
+                    elif ns_status == "BUILD":
+                        detailed_status = step + "; {}".format(ns_status_info)
+                    elif ns_status == "ACTIVE":
+                        step = detailed_status = \
+                            "Waiting for management IP address reported by the VIM. Updating VNFRs"
+                        try:
+                            # nsr_deployed["nsr_ip"] = RO.get_ns_vnf_info(desc)
+                            self.ns_update_vnfr({db_vnfr["member-vnf-index-ref"]: db_vnfr}, desc)
+                            break
+                        except LcmExceptionNoMgmtIP:
+                            pass
+                    else:
+                        assert False, "ROclient.check_ns_status returns unknown {}".format(ns_status)
+                if detailed_status != detailed_status_old:
+                    self._update_suboperation_status(
+                        db_nslcmop, op_index, 'COMPLETED', detailed_status)
+                    detailed_status_old = db_nslcmop_update["detailed-status"] = detailed_status
+                    self.update_db_2("nslcmops", nslcmop_id, db_nslcmop_update)
+
+                await asyncio.sleep(5, loop=self.loop)
+                deployment_timeout -= 5
+            if deployment_timeout <= 0:
+                self._update_suboperation_status(
+                    db_nslcmop, nslcmop_id, op_index, 'FAILED', "Timeout when waiting for ns to get ready")
+                raise ROclient.ROClientException("Timeout waiting ns to be ready")
+
+            # update VDU_SCALING_INFO with the obtained ip_addresses
+            if vdu_scaling_info["scaling_direction"] == "OUT":
+                for vdur in reversed(db_vnfr["vdur"]):
+                    if vdu_scaling_info["vdu-create"].get(vdur["vdu-id-ref"]):
+                        vdu_scaling_info["vdu-create"][vdur["vdu-id-ref"]] -= 1
+                        vdu_scaling_info["vdu"].append({
+                            "name": vdur["name"] or vdur.get("vdu-name"),
+                            "vdu_id": vdur["vdu-id-ref"],
+                            "interface": []
+                        })
+                        for interface in vdur["interfaces"]:
+                            vdu_scaling_info["vdu"][-1]["interface"].append({
+                                "name": interface["name"],
+                                "ip_address": interface["ip-address"],
+                                "mac_address": interface.get("mac-address"),
+                            })
+        self._update_suboperation_status(db_nslcmop, op_index, 'COMPLETED', 'Done')
+
     async def add_prometheus_metrics(self, ee_id, artifact_path, ee_config_descriptor, vnfr_id, nsr_id, target_ip):
         if not self.prometheus:
             return
@@ -4422,3 +4660,36 @@ class NsLcm(LcmBase):
         job_dict = {jl["job_name"]: jl for jl in job_list}
         if await self.prometheus.update(job_dict):
             return list(job_dict.keys())
+
+    def get_vca_cloud_and_credentials(self, vim_account_id: str) -> (str, str):
+        """
+        Get VCA Cloud and VCA Cloud Credentials for the VIM account
+
+        :param: vim_account_id: VIM Account ID
+
+        :return: (cloud_name, cloud_credential)
+        """
+        config = self.get_vim_account_config(vim_account_id)
+        return config.get("vca_cloud"), config.get("vca_cloud_credential")
+
+    def get_vca_k8s_cloud_and_credentials(self, vim_account_id: str) -> (str, str):
+        """
+        Get VCA K8s Cloud and VCA K8s Cloud Credentials for the VIM account
+
+        :param: vim_account_id: VIM Account ID
+
+        :return: (cloud_name, cloud_credential)
+        """
+        config = self.get_vim_account_config(vim_account_id)
+        return config.get("vca_k8s_cloud"), config.get("vca_k8s_cloud_credential")
+
+    def get_vim_account_config(self, vim_account_id: str) -> dict:
+        """
+        Get VIM Account config from the OSM Database
+
+        :param: vim_account_id: VIM Account ID
+
+        :return: Dictionary with the config of the vim account
+        """
+        vim_account = self.db.get_one(table="vim_accounts", q_filter={"_id": vim_account_id},
+                                      fail_on_empty=False)
+        return vim_account.get("config", {}) if vim_account else {}
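A note on the fallback behaviour of these helpers, with an illustrative call (names exactly as defined above): because get_one() is invoked with fail_on_empty=False, a missing VIM account or an account without a config block yields {}, so the credential helpers degrade to (None, None) rather than raising:

    vca_cloud, vca_cloud_credential = self.get_vca_cloud_and_credentials(vim_account_id)
    # -> (None, None) when the vim account config carries no "vca_cloud"/"vca_cloud_credential"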