+ raise LcmException(
+ "ns_update_vnfr: Not found member_vnf_index={} from VIM info".format(
+ vnf_index
+ )
+ )
+
+ def _get_ns_config_info(self, nsr_id):
+ """
+ Generates a mapping between vnf,vdu elements and the N2VC id
+ :param nsr_id: id of nsr to get last database _admin.deployed.VCA that contains this list
+ :return: a dictionary with {osm-config-mapping: {}} where its element contains:
+ "<member-vnf-index>": <N2VC-id> for a vnf configuration, or
+ "<member-vnf-index>.<vdu.id>.<vdu replica(0, 1,..)>": <N2VC-id> for a vdu configuration
+ """
+ db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
+ vca_deployed_list = db_nsr["_admin"]["deployed"]["VCA"]
+ mapping = {}
+ ns_config_info = {"osm-config-mapping": mapping}
+ for vca in vca_deployed_list:
+ if not vca["member-vnf-index"]:
+ continue
+ if not vca["vdu_id"]:
+ mapping[vca["member-vnf-index"]] = vca["application"]
+ else:
+ mapping[
+ "{}.{}.{}".format(
+ vca["member-vnf-index"], vca["vdu_id"], vca["vdu_count_index"]
+ )
+ ] = vca["application"]
+ return ns_config_info
+
+ async def _instantiate_ng_ro(
+ self,
+ logging_text,
+ nsr_id,
+ nsd,
+ db_nsr,
+ db_nslcmop,
+ db_vnfrs,
+ db_vnfds,
+ n2vc_key_list,
+ stage,
+ start_deploy,
+ timeout_ns_deploy,
+ ):
+
+ db_vims = {}
+
+ def get_vim_account(vim_account_id):
+ nonlocal db_vims
+ if vim_account_id in db_vims:
+ return db_vims[vim_account_id]
+ db_vim = self.db.get_one("vim_accounts", {"_id": vim_account_id})
+ db_vims[vim_account_id] = db_vim
+ return db_vim
+
+ # modify target_vld info with instantiation parameters
+ def parse_vld_instantiation_params(
+ target_vim, target_vld, vld_params, target_sdn
+ ):
+ if vld_params.get("ip-profile"):
+ target_vld["vim_info"][target_vim]["ip_profile"] = vld_params[
+ "ip-profile"
+ ]
+ if vld_params.get("provider-network"):
+ target_vld["vim_info"][target_vim]["provider_network"] = vld_params[
+ "provider-network"
+ ]
+ if "sdn-ports" in vld_params["provider-network"] and target_sdn:
+ target_vld["vim_info"][target_sdn]["sdn-ports"] = vld_params[
+ "provider-network"
+ ]["sdn-ports"]
+ if vld_params.get("wimAccountId"):
+ target_wim = "wim:{}".format(vld_params["wimAccountId"])
+ target_vld["vim_info"][target_wim] = {}
+ for param in ("vim-network-name", "vim-network-id"):
+ if vld_params.get(param):
+ if isinstance(vld_params[param], dict):
+ for vim, vim_net in vld_params[param].items():
+ other_target_vim = "vim:" + vim
+ populate_dict(
+ target_vld["vim_info"],
+ (other_target_vim, param.replace("-", "_")),
+ vim_net,
+ )
+ else: # isinstance str
+ target_vld["vim_info"][target_vim][
+ param.replace("-", "_")
+ ] = vld_params[param]
+ if vld_params.get("common_id"):
+ target_vld["common_id"] = vld_params.get("common_id")
+
+ # modify target["ns"]["vld"] with instantiation parameters to override vnf vim-account
+ def update_ns_vld_target(target, ns_params):
+ for vnf_params in ns_params.get("vnf", ()):
+ if vnf_params.get("vimAccountId"):
+ target_vnf = next(
+ (
+ vnfr
+ for vnfr in db_vnfrs.values()
+ if vnf_params["member-vnf-index"]
+ == vnfr["member-vnf-index-ref"]
+ ),
+ None,
+ )
+ vdur = next((vdur for vdur in target_vnf.get("vdur", ())), None)
+ for a_index, a_vld in enumerate(target["ns"]["vld"]):
+ target_vld = find_in_list(
+ get_iterable(vdur, "interfaces"),
+ lambda iface: iface.get("ns-vld-id") == a_vld["name"],
+ )
+ if target_vld:
+ if vnf_params.get("vimAccountId") not in a_vld.get(
+ "vim_info", {}
+ ):
+ target["ns"]["vld"][a_index].get("vim_info").update(
+ {
+ "vim:{}".format(vnf_params["vimAccountId"]): {
+ "vim_network_name": ""
+ }
+ }
+ )
+
+ nslcmop_id = db_nslcmop["_id"]
+ target = {
+ "name": db_nsr["name"],
+ "ns": {"vld": []},
+ "vnf": [],
+ "image": deepcopy(db_nsr["image"]),
+ "flavor": deepcopy(db_nsr["flavor"]),
+ "action_id": nslcmop_id,
+ "cloud_init_content": {},
+ }
+ for image in target["image"]:
+ image["vim_info"] = {}
+ for flavor in target["flavor"]:
+ flavor["vim_info"] = {}
+ if db_nsr.get("affinity-or-anti-affinity-group"):
+ target["affinity-or-anti-affinity-group"] = deepcopy(db_nsr["affinity-or-anti-affinity-group"])
+ for affinity_or_anti_affinity_group in target["affinity-or-anti-affinity-group"]:
+ affinity_or_anti_affinity_group["vim_info"] = {}
+
+ if db_nslcmop.get("lcmOperationType") != "instantiate":
+ # get parameters of instantiation:
+ db_nslcmop_instantiate = self.db.get_list(
+ "nslcmops",
+ {
+ "nsInstanceId": db_nslcmop["nsInstanceId"],
+ "lcmOperationType": "instantiate",
+ },
+ )[-1]
+ ns_params = db_nslcmop_instantiate.get("operationParams")
+ else:
+ ns_params = db_nslcmop.get("operationParams")
+ ssh_keys_instantiation = ns_params.get("ssh_keys") or []
+ ssh_keys_all = ssh_keys_instantiation + (n2vc_key_list or [])
+
+ cp2target = {}
+ for vld_index, vld in enumerate(db_nsr.get("vld")):
+ target_vim = "vim:{}".format(ns_params["vimAccountId"])
+ target_vld = {
+ "id": vld["id"],
+ "name": vld["name"],
+ "mgmt-network": vld.get("mgmt-network", False),
+ "type": vld.get("type"),
+ "vim_info": {
+ target_vim: {
+ "vim_network_name": vld.get("vim-network-name"),
+ "vim_account_id": ns_params["vimAccountId"],
+ }
+ },
+ }
+ # check if this network needs SDN assist
+ if vld.get("pci-interfaces"):
+ db_vim = get_vim_account(ns_params["vimAccountId"])
+ sdnc_id = db_vim["config"].get("sdn-controller")
+ if sdnc_id:
+ sdn_vld = "nsrs:{}:vld.{}".format(nsr_id, vld["id"])
+ target_sdn = "sdn:{}".format(sdnc_id)
+ target_vld["vim_info"][target_sdn] = {
+ "sdn": True,
+ "target_vim": target_vim,
+ "vlds": [sdn_vld],
+ "type": vld.get("type"),
+ }
+
+ nsd_vnf_profiles = get_vnf_profiles(nsd)
+ for nsd_vnf_profile in nsd_vnf_profiles:
+ for cp in nsd_vnf_profile["virtual-link-connectivity"]:
+ if cp["virtual-link-profile-id"] == vld["id"]:
+ cp2target[
+ "member_vnf:{}.{}".format(
+ cp["constituent-cpd-id"][0][
+ "constituent-base-element-id"
+ ],
+ cp["constituent-cpd-id"][0]["constituent-cpd-id"],
+ )
+ ] = "nsrs:{}:vld.{}".format(nsr_id, vld_index)
+
+ # check at nsd descriptor, if there is an ip-profile
+ vld_params = {}
+ nsd_vlp = find_in_list(
+ get_virtual_link_profiles(nsd),
+ lambda a_link_profile: a_link_profile["virtual-link-desc-id"]
+ == vld["id"],
+ )
+ if (
+ nsd_vlp
+ and nsd_vlp.get("virtual-link-protocol-data")
+ and nsd_vlp["virtual-link-protocol-data"].get("l3-protocol-data")
+ ):
+ ip_profile_source_data = nsd_vlp["virtual-link-protocol-data"][
+ "l3-protocol-data"
+ ]
+ ip_profile_dest_data = {}
+ if "ip-version" in ip_profile_source_data:
+ ip_profile_dest_data["ip-version"] = ip_profile_source_data[
+ "ip-version"
+ ]
+ if "cidr" in ip_profile_source_data:
+ ip_profile_dest_data["subnet-address"] = ip_profile_source_data[
+ "cidr"
+ ]
+ if "gateway-ip" in ip_profile_source_data:
+ ip_profile_dest_data["gateway-address"] = ip_profile_source_data[
+ "gateway-ip"
+ ]
+ if "dhcp-enabled" in ip_profile_source_data:
+ ip_profile_dest_data["dhcp-params"] = {
+ "enabled": ip_profile_source_data["dhcp-enabled"]
+ }
+ vld_params["ip-profile"] = ip_profile_dest_data
+
+ # update vld_params with instantiation params
+ vld_instantiation_params = find_in_list(
+ get_iterable(ns_params, "vld"),
+ lambda a_vld: a_vld["name"] in (vld["name"], vld["id"]),
+ )
+ if vld_instantiation_params:
+ vld_params.update(vld_instantiation_params)
+ parse_vld_instantiation_params(target_vim, target_vld, vld_params, None)
+ target["ns"]["vld"].append(target_vld)
+ # Update the target ns_vld if vnf vim_account is overriden by instantiation params
+ update_ns_vld_target(target, ns_params)
+
+ for vnfr in db_vnfrs.values():
+ vnfd = find_in_list(
+ db_vnfds, lambda db_vnf: db_vnf["id"] == vnfr["vnfd-ref"]
+ )
+ vnf_params = find_in_list(
+ get_iterable(ns_params, "vnf"),
+ lambda a_vnf: a_vnf["member-vnf-index"] == vnfr["member-vnf-index-ref"],
+ )
+ target_vnf = deepcopy(vnfr)
+ target_vim = "vim:{}".format(vnfr["vim-account-id"])
+ for vld in target_vnf.get("vld", ()):
+ # check if connected to a ns.vld, to fill target'
+ vnf_cp = find_in_list(
+ vnfd.get("int-virtual-link-desc", ()),
+ lambda cpd: cpd.get("id") == vld["id"],
+ )
+ if vnf_cp:
+ ns_cp = "member_vnf:{}.{}".format(
+ vnfr["member-vnf-index-ref"], vnf_cp["id"]
+ )
+ if cp2target.get(ns_cp):
+ vld["target"] = cp2target[ns_cp]
+
+ vld["vim_info"] = {
+ target_vim: {"vim_network_name": vld.get("vim-network-name")}
+ }
+ # check if this network needs SDN assist
+ target_sdn = None
+ if vld.get("pci-interfaces"):
+ db_vim = get_vim_account(vnfr["vim-account-id"])
+ sdnc_id = db_vim["config"].get("sdn-controller")
+ if sdnc_id:
+ sdn_vld = "vnfrs:{}:vld.{}".format(target_vnf["_id"], vld["id"])
+ target_sdn = "sdn:{}".format(sdnc_id)
+ vld["vim_info"][target_sdn] = {
+ "sdn": True,
+ "target_vim": target_vim,
+ "vlds": [sdn_vld],
+ "type": vld.get("type"),
+ }
+
+ # check at vnfd descriptor, if there is an ip-profile
+ vld_params = {}
+ vnfd_vlp = find_in_list(
+ get_virtual_link_profiles(vnfd),
+ lambda a_link_profile: a_link_profile["id"] == vld["id"],
+ )
+ if (
+ vnfd_vlp
+ and vnfd_vlp.get("virtual-link-protocol-data")
+ and vnfd_vlp["virtual-link-protocol-data"].get("l3-protocol-data")
+ ):
+ ip_profile_source_data = vnfd_vlp["virtual-link-protocol-data"][
+ "l3-protocol-data"
+ ]
+ ip_profile_dest_data = {}
+ if "ip-version" in ip_profile_source_data:
+ ip_profile_dest_data["ip-version"] = ip_profile_source_data[
+ "ip-version"
+ ]
+ if "cidr" in ip_profile_source_data:
+ ip_profile_dest_data["subnet-address"] = ip_profile_source_data[
+ "cidr"
+ ]
+ if "gateway-ip" in ip_profile_source_data:
+ ip_profile_dest_data[
+ "gateway-address"
+ ] = ip_profile_source_data["gateway-ip"]
+ if "dhcp-enabled" in ip_profile_source_data:
+ ip_profile_dest_data["dhcp-params"] = {
+ "enabled": ip_profile_source_data["dhcp-enabled"]
+ }
+
+ vld_params["ip-profile"] = ip_profile_dest_data
+ # update vld_params with instantiation params
+ if vnf_params:
+ vld_instantiation_params = find_in_list(
+ get_iterable(vnf_params, "internal-vld"),
+ lambda i_vld: i_vld["name"] == vld["id"],
+ )
+ if vld_instantiation_params:
+ vld_params.update(vld_instantiation_params)
+ parse_vld_instantiation_params(target_vim, vld, vld_params, target_sdn)
+
+ vdur_list = []
+ for vdur in target_vnf.get("vdur", ()):
+ if vdur.get("status") == "DELETING" or vdur.get("pdu-type"):
+ continue # This vdu must not be created
+ vdur["vim_info"] = {"vim_account_id": vnfr["vim-account-id"]}
+
+ self.logger.debug("NS > ssh_keys > {}".format(ssh_keys_all))
+
+ if ssh_keys_all:
+ vdu_configuration = get_configuration(vnfd, vdur["vdu-id-ref"])
+ vnf_configuration = get_configuration(vnfd, vnfd["id"])
+ if (
+ vdu_configuration
+ and vdu_configuration.get("config-access")
+ and vdu_configuration.get("config-access").get("ssh-access")
+ ):
+ vdur["ssh-keys"] = ssh_keys_all
+ vdur["ssh-access-required"] = vdu_configuration[
+ "config-access"
+ ]["ssh-access"]["required"]
+ elif (
+ vnf_configuration
+ and vnf_configuration.get("config-access")
+ and vnf_configuration.get("config-access").get("ssh-access")
+ and any(iface.get("mgmt-vnf") for iface in vdur["interfaces"])
+ ):
+ vdur["ssh-keys"] = ssh_keys_all
+ vdur["ssh-access-required"] = vnf_configuration[
+ "config-access"
+ ]["ssh-access"]["required"]
+ elif ssh_keys_instantiation and find_in_list(
+ vdur["interfaces"], lambda iface: iface.get("mgmt-vnf")
+ ):
+ vdur["ssh-keys"] = ssh_keys_instantiation
+
+ self.logger.debug("NS > vdur > {}".format(vdur))
+
+ vdud = get_vdu(vnfd, vdur["vdu-id-ref"])
+ # cloud-init
+ if vdud.get("cloud-init-file"):
+ vdur["cloud-init"] = "{}:file:{}".format(
+ vnfd["_id"], vdud.get("cloud-init-file")
+ )
+ # read file and put content at target.cloul_init_content. Avoid ng_ro to use shared package system
+ if vdur["cloud-init"] not in target["cloud_init_content"]:
+ base_folder = vnfd["_admin"]["storage"]
+ if base_folder["pkg-dir"]:
+ cloud_init_file = "{}/{}/cloud_init/{}".format(
+ base_folder["folder"],
+ base_folder["pkg-dir"],
+ vdud.get("cloud-init-file"),
+ )
+ else:
+ cloud_init_file = "{}/Scripts/cloud_init/{}".format(
+ base_folder["folder"],
+ vdud.get("cloud-init-file"),
+ )
+ with self.fs.file_open(cloud_init_file, "r") as ci_file:
+ target["cloud_init_content"][
+ vdur["cloud-init"]
+ ] = ci_file.read()
+ elif vdud.get("cloud-init"):
+ vdur["cloud-init"] = "{}:vdu:{}".format(
+ vnfd["_id"], get_vdu_index(vnfd, vdur["vdu-id-ref"])
+ )
+ # put content at target.cloul_init_content. Avoid ng_ro read vnfd descriptor
+ target["cloud_init_content"][vdur["cloud-init"]] = vdud[
+ "cloud-init"
+ ]
+ vdur["additionalParams"] = vdur.get("additionalParams") or {}
+ deploy_params_vdu = self._format_additional_params(
+ vdur.get("additionalParams") or {}
+ )
+ deploy_params_vdu["OSM"] = get_osm_params(
+ vnfr, vdur["vdu-id-ref"], vdur["count-index"]
+ )
+ vdur["additionalParams"] = deploy_params_vdu
+
+ # flavor
+ ns_flavor = target["flavor"][int(vdur["ns-flavor-id"])]
+ if target_vim not in ns_flavor["vim_info"]:
+ ns_flavor["vim_info"][target_vim] = {}
+
+ # deal with images
+ # in case alternative images are provided we must check if they should be applied
+ # for the vim_type, modify the vim_type taking into account
+ ns_image_id = int(vdur["ns-image-id"])
+ if vdur.get("alt-image-ids"):
+ db_vim = get_vim_account(vnfr["vim-account-id"])
+ vim_type = db_vim["vim_type"]
+ for alt_image_id in vdur.get("alt-image-ids"):
+ ns_alt_image = target["image"][int(alt_image_id)]
+ if vim_type == ns_alt_image.get("vim-type"):
+ # must use alternative image
+ self.logger.debug(
+ "use alternative image id: {}".format(alt_image_id)
+ )
+ ns_image_id = alt_image_id
+ vdur["ns-image-id"] = ns_image_id
+ break
+ ns_image = target["image"][int(ns_image_id)]
+ if target_vim not in ns_image["vim_info"]:
+ ns_image["vim_info"][target_vim] = {}
+
+ # Affinity groups
+ if vdur.get("affinity-or-anti-affinity-group-id"):
+ for ags_id in vdur["affinity-or-anti-affinity-group-id"]:
+ ns_ags = target["affinity-or-anti-affinity-group"][int(ags_id)]
+ if target_vim not in ns_ags["vim_info"]:
+ ns_ags["vim_info"][target_vim] = {}
+
+ vdur["vim_info"] = {target_vim: {}}
+ # instantiation parameters
+ # if vnf_params:
+ # vdu_instantiation_params = next((v for v in get_iterable(vnf_params, "vdu") if v["id"] ==
+ # vdud["id"]), None)
+ vdur_list.append(vdur)
+ target_vnf["vdur"] = vdur_list
+ target["vnf"].append(target_vnf)
+
+ desc = await self.RO.deploy(nsr_id, target)
+ self.logger.debug("RO return > {}".format(desc))
+ action_id = desc["action_id"]
+ await self._wait_ng_ro(
+ nsr_id, action_id, nslcmop_id, start_deploy, timeout_ns_deploy, stage
+ )
+
+ # Updating NSR
+ db_nsr_update = {
+ "_admin.deployed.RO.operational-status": "running",
+ "detailed-status": " ".join(stage),
+ }
+ # db_nsr["_admin.deployed.RO.detailed-status"] = "Deployed at VIM"
+ self.update_db_2("nsrs", nsr_id, db_nsr_update)
+ self._write_op_status(nslcmop_id, stage)
+ self.logger.debug(
+ logging_text + "ns deployed at RO. RO_id={}".format(action_id)
+ )
+ return
+
+ async def _wait_ng_ro(
+ self,
+ nsr_id,
+ action_id,
+ nslcmop_id=None,
+ start_time=None,
+ timeout=600,
+ stage=None,
+ ):
+ detailed_status_old = None
+ db_nsr_update = {}
+ start_time = start_time or time()
+ while time() <= start_time + timeout:
+ desc_status = await self.RO.status(nsr_id, action_id)
+ self.logger.debug("Wait NG RO > {}".format(desc_status))
+ if desc_status["status"] == "FAILED":
+ raise NgRoException(desc_status["details"])
+ elif desc_status["status"] == "BUILD":
+ if stage:
+ stage[2] = "VIM: ({})".format(desc_status["details"])
+ elif desc_status["status"] == "DONE":
+ if stage:
+ stage[2] = "Deployed at VIM"
+ break
+ else:
+ assert False, "ROclient.check_ns_status returns unknown {}".format(
+ desc_status["status"]
+ )
+ if stage and nslcmop_id and stage[2] != detailed_status_old:
+ detailed_status_old = stage[2]
+ db_nsr_update["detailed-status"] = " ".join(stage)
+ self.update_db_2("nsrs", nsr_id, db_nsr_update)
+ self._write_op_status(nslcmop_id, stage)
+ await asyncio.sleep(15, loop=self.loop)
+ else: # timeout_ns_deploy
+ raise NgRoException("Timeout waiting ns to deploy")
+
+ async def _terminate_ng_ro(
+ self, logging_text, nsr_deployed, nsr_id, nslcmop_id, stage
+ ):
+ db_nsr_update = {}
+ failed_detail = []
+ action_id = None
+ start_deploy = time()
+ try:
+ target = {
+ "ns": {"vld": []},
+ "vnf": [],
+ "image": [],
+ "flavor": [],
+ "action_id": nslcmop_id,
+ }
+ desc = await self.RO.deploy(nsr_id, target)
+ action_id = desc["action_id"]
+ db_nsr_update["_admin.deployed.RO.nsr_delete_action_id"] = action_id
+ db_nsr_update["_admin.deployed.RO.nsr_status"] = "DELETING"
+ self.logger.debug(
+ logging_text
+ + "ns terminate action at RO. action_id={}".format(action_id)
+ )
+
+ # wait until done
+ delete_timeout = 20 * 60 # 20 minutes
+ await self._wait_ng_ro(
+ nsr_id, action_id, nslcmop_id, start_deploy, delete_timeout, stage
+ )
+
+ db_nsr_update["_admin.deployed.RO.nsr_delete_action_id"] = None
+ db_nsr_update["_admin.deployed.RO.nsr_status"] = "DELETED"
+ # delete all nsr
+ await self.RO.delete(nsr_id)
+ except Exception as e:
+ if isinstance(e, NgRoException) and e.http_code == 404: # not found
+ db_nsr_update["_admin.deployed.RO.nsr_id"] = None
+ db_nsr_update["_admin.deployed.RO.nsr_status"] = "DELETED"
+ db_nsr_update["_admin.deployed.RO.nsr_delete_action_id"] = None
+ self.logger.debug(
+ logging_text + "RO_action_id={} already deleted".format(action_id)
+ )
+ elif isinstance(e, NgRoException) and e.http_code == 409: # conflict
+ failed_detail.append("delete conflict: {}".format(e))
+ self.logger.debug(
+ logging_text
+ + "RO_action_id={} delete conflict: {}".format(action_id, e)
+ )
+ else:
+ failed_detail.append("delete error: {}".format(e))
+ self.logger.error(
+ logging_text
+ + "RO_action_id={} delete error: {}".format(action_id, e)
+ )
+
+ if failed_detail:
+ stage[2] = "Error deleting from VIM"
+ else:
+ stage[2] = "Deleted from VIM"
+ db_nsr_update["detailed-status"] = " ".join(stage)
+ self.update_db_2("nsrs", nsr_id, db_nsr_update)
+ self._write_op_status(nslcmop_id, stage)
+
+ if failed_detail:
+ raise LcmException("; ".join(failed_detail))
+ return
+
+ async def instantiate_RO(
+ self,
+ logging_text,
+ nsr_id,
+ nsd,
+ db_nsr,
+ db_nslcmop,
+ db_vnfrs,
+ db_vnfds,
+ n2vc_key_list,
+ stage,
+ ):
+ """
+ Instantiate at RO
+ :param logging_text: preffix text to use at logging
+ :param nsr_id: nsr identity
+ :param nsd: database content of ns descriptor
+ :param db_nsr: database content of ns record
+ :param db_nslcmop: database content of ns operation, in this case, 'instantiate'
+ :param db_vnfrs:
+ :param db_vnfds: database content of vnfds, indexed by id (not _id). {id: {vnfd_object}, ...}
+ :param n2vc_key_list: ssh-public-key list to be inserted to management vdus via cloud-init
+ :param stage: list with 3 items: [general stage, tasks, vim_specific]. This task will write over vim_specific
+ :return: None or exception
+ """
+ try:
+ start_deploy = time()
+ ns_params = db_nslcmop.get("operationParams")
+ if ns_params and ns_params.get("timeout_ns_deploy"):
+ timeout_ns_deploy = ns_params["timeout_ns_deploy"]
+ else:
+ timeout_ns_deploy = self.timeout.get(
+ "ns_deploy", self.timeout_ns_deploy
+ )
+
+ # Check for and optionally request placement optimization. Database will be updated if placement activated
+ stage[2] = "Waiting for Placement."
+ if await self._do_placement(logging_text, db_nslcmop, db_vnfrs):
+ # in case of placement change ns_params[vimAcountId) if not present at any vnfrs
+ for vnfr in db_vnfrs.values():
+ if ns_params["vimAccountId"] == vnfr["vim-account-id"]:
+ break
+ else:
+ ns_params["vimAccountId"] == vnfr["vim-account-id"]
+
+ return await self._instantiate_ng_ro(
+ logging_text,
+ nsr_id,
+ nsd,
+ db_nsr,
+ db_nslcmop,
+ db_vnfrs,
+ db_vnfds,
+ n2vc_key_list,
+ stage,
+ start_deploy,
+ timeout_ns_deploy,
+ )
+ except Exception as e:
+ stage[2] = "ERROR deploying at VIM"
+ self.set_vnfr_at_error(db_vnfrs, str(e))
+ self.logger.error(
+ "Error deploying at VIM {}".format(e),
+ exc_info=not isinstance(
+ e,
+ (
+ ROclient.ROClientException,
+ LcmException,
+ DbException,
+ NgRoException,
+ ),
+ ),
+ )
+ raise
+
+ async def wait_kdu_up(self, logging_text, nsr_id, vnfr_id, kdu_name):
+ """
+ Wait for kdu to be up, get ip address
+ :param logging_text: prefix use for logging
+ :param nsr_id:
+ :param vnfr_id:
+ :param kdu_name:
+ :return: IP address
+ """
+
+ # self.logger.debug(logging_text + "Starting wait_kdu_up")
+ nb_tries = 0
+
+ while nb_tries < 360:
+ db_vnfr = self.db.get_one("vnfrs", {"_id": vnfr_id})
+ kdur = next(
+ (
+ x
+ for x in get_iterable(db_vnfr, "kdur")
+ if x.get("kdu-name") == kdu_name
+ ),
+ None,
+ )
+ if not kdur:
+ raise LcmException(
+ "Not found vnfr_id={}, kdu_name={}".format(vnfr_id, kdu_name)
+ )
+ if kdur.get("status"):
+ if kdur["status"] in ("READY", "ENABLED"):
+ return kdur.get("ip-address")
+ else:
+ raise LcmException(
+ "target KDU={} is in error state".format(kdu_name)
+ )
+
+ await asyncio.sleep(10, loop=self.loop)
+ nb_tries += 1
+ raise LcmException("Timeout waiting KDU={} instantiated".format(kdu_name))
+
+ async def wait_vm_up_insert_key_ro(
+ self, logging_text, nsr_id, vnfr_id, vdu_id, vdu_index, pub_key=None, user=None
+ ):
+ """
+ Wait for ip addres at RO, and optionally, insert public key in virtual machine
+ :param logging_text: prefix use for logging
+ :param nsr_id:
+ :param vnfr_id:
+ :param vdu_id:
+ :param vdu_index:
+ :param pub_key: public ssh key to inject, None to skip
+ :param user: user to apply the public ssh key
+ :return: IP address
+ """
+
+ self.logger.debug(logging_text + "Starting wait_vm_up_insert_key_ro")
+ ro_nsr_id = None
+ ip_address = None
+ nb_tries = 0
+ target_vdu_id = None
+ ro_retries = 0
+
+ while True:
+
+ ro_retries += 1
+ if ro_retries >= 360: # 1 hour
+ raise LcmException(
+ "Not found _admin.deployed.RO.nsr_id for nsr_id: {}".format(nsr_id)
+ )
+
+ await asyncio.sleep(10, loop=self.loop)
+
+ # get ip address
+ if not target_vdu_id:
+ db_vnfr = self.db.get_one("vnfrs", {"_id": vnfr_id})
+
+ if not vdu_id: # for the VNF case
+ if db_vnfr.get("status") == "ERROR":
+ raise LcmException(
+ "Cannot inject ssh-key because target VNF is in error state"
+ )
+ ip_address = db_vnfr.get("ip-address")
+ if not ip_address:
+ continue
+ vdur = next(
+ (
+ x
+ for x in get_iterable(db_vnfr, "vdur")
+ if x.get("ip-address") == ip_address
+ ),
+ None,
+ )
+ else: # VDU case
+ vdur = next(
+ (
+ x
+ for x in get_iterable(db_vnfr, "vdur")
+ if x.get("vdu-id-ref") == vdu_id
+ and x.get("count-index") == vdu_index
+ ),
+ None,
+ )
+
+ if (
+ not vdur and len(db_vnfr.get("vdur", ())) == 1
+ ): # If only one, this should be the target vdu
+ vdur = db_vnfr["vdur"][0]
+ if not vdur:
+ raise LcmException(
+ "Not found vnfr_id={}, vdu_id={}, vdu_index={}".format(
+ vnfr_id, vdu_id, vdu_index
+ )
+ )
+ # New generation RO stores information at "vim_info"
+ ng_ro_status = None
+ target_vim = None
+ if vdur.get("vim_info"):
+ target_vim = next(
+ t for t in vdur["vim_info"]
+ ) # there should be only one key
+ ng_ro_status = vdur["vim_info"][target_vim].get("vim_status")
+ if (
+ vdur.get("pdu-type")
+ or vdur.get("status") == "ACTIVE"
+ or ng_ro_status == "ACTIVE"
+ ):
+ ip_address = vdur.get("ip-address")
+ if not ip_address:
+ continue
+ target_vdu_id = vdur["vdu-id-ref"]
+ elif vdur.get("status") == "ERROR" or ng_ro_status == "ERROR":
+ raise LcmException(
+ "Cannot inject ssh-key because target VM is in error state"
+ )
+
+ if not target_vdu_id:
+ continue
+
+ # inject public key into machine
+ if pub_key and user:
+ self.logger.debug(logging_text + "Inserting RO key")
+ self.logger.debug("SSH > PubKey > {}".format(pub_key))
+ if vdur.get("pdu-type"):
+ self.logger.error(logging_text + "Cannot inject ssh-ky to a PDU")
+ return ip_address
+ try:
+ ro_vm_id = "{}-{}".format(
+ db_vnfr["member-vnf-index-ref"], target_vdu_id
+ ) # TODO add vdu_index
+ if self.ng_ro:
+ target = {
+ "action": {
+ "action": "inject_ssh_key",
+ "key": pub_key,
+ "user": user,
+ },
+ "vnf": [{"_id": vnfr_id, "vdur": [{"id": vdur["id"]}]}],
+ }
+ desc = await self.RO.deploy(nsr_id, target)
+ action_id = desc["action_id"]
+ await self._wait_ng_ro(nsr_id, action_id, timeout=600)
+ break
+ else:
+ # wait until NS is deployed at RO
+ if not ro_nsr_id:
+ db_nsrs = self.db.get_one("nsrs", {"_id": nsr_id})
+ ro_nsr_id = deep_get(
+ db_nsrs, ("_admin", "deployed", "RO", "nsr_id")
+ )
+ if not ro_nsr_id:
+ continue
+ result_dict = await self.RO.create_action(
+ item="ns",
+ item_id_name=ro_nsr_id,
+ descriptor={
+ "add_public_key": pub_key,
+ "vms": [ro_vm_id],
+ "user": user,
+ },
+ )
+ # result_dict contains the format {VM-id: {vim_result: 200, description: text}}
+ if not result_dict or not isinstance(result_dict, dict):
+ raise LcmException(
+ "Unknown response from RO when injecting key"
+ )
+ for result in result_dict.values():
+ if result.get("vim_result") == 200:
+ break
+ else:
+ raise ROclient.ROClientException(
+ "error injecting key: {}".format(
+ result.get("description")
+ )
+ )
+ break
+ except NgRoException as e:
+ raise LcmException(
+ "Reaching max tries injecting key. Error: {}".format(e)
+ )
+ except ROclient.ROClientException as e:
+ if not nb_tries:
+ self.logger.debug(
+ logging_text
+ + "error injecting key: {}. Retrying until {} seconds".format(
+ e, 20 * 10
+ )
+ )
+ nb_tries += 1
+ if nb_tries >= 20:
+ raise LcmException(
+ "Reaching max tries injecting key. Error: {}".format(e)
+ )
+ else:
+ break
+
+ return ip_address
+
+ async def _wait_dependent_n2vc(self, nsr_id, vca_deployed_list, vca_index):
+ """
+ Wait until dependent VCA deployments have been finished. NS wait for VNFs and VDUs. VNFs for VDUs
+ """
+ my_vca = vca_deployed_list[vca_index]
+ if my_vca.get("vdu_id") or my_vca.get("kdu_name"):
+ # vdu or kdu: no dependencies
+ return
+ timeout = 300
+ while timeout >= 0:
+ db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
+ vca_deployed_list = db_nsr["_admin"]["deployed"]["VCA"]
+ configuration_status_list = db_nsr["configurationStatus"]
+ for index, vca_deployed in enumerate(configuration_status_list):
+ if index == vca_index:
+ # myself
+ continue
+ if not my_vca.get("member-vnf-index") or (
+ vca_deployed.get("member-vnf-index")
+ == my_vca.get("member-vnf-index")
+ ):
+ internal_status = configuration_status_list[index].get("status")
+ if internal_status == "READY":
+ continue
+ elif internal_status == "BROKEN":
+ raise LcmException(
+ "Configuration aborted because dependent charm/s has failed"
+ )
+ else:
+ break
+ else:
+ # no dependencies, return
+ return
+ await asyncio.sleep(10)
+ timeout -= 1
+
+ raise LcmException("Configuration aborted because dependent charm/s timeout")
+
+ def get_vca_id(self, db_vnfr: dict, db_nsr: dict):
+ vca_id = None
+ if db_vnfr:
+ vca_id = deep_get(db_vnfr, ("vca-id",))
+ elif db_nsr:
+ vim_account_id = deep_get(db_nsr, ("instantiate_params", "vimAccountId"))
+ vca_id = VimAccountDB.get_vim_account_with_id(vim_account_id).get("vca")
+ return vca_id
+
+ async def instantiate_N2VC(
+ self,
+ logging_text,
+ vca_index,
+ nsi_id,
+ db_nsr,
+ db_vnfr,
+ vdu_id,
+ kdu_name,
+ vdu_index,
+ config_descriptor,
+ deploy_params,
+ base_folder,
+ nslcmop_id,
+ stage,
+ vca_type,
+ vca_name,
+ ee_config_descriptor,
+ ):
+ nsr_id = db_nsr["_id"]
+ db_update_entry = "_admin.deployed.VCA.{}.".format(vca_index)
+ vca_deployed_list = db_nsr["_admin"]["deployed"]["VCA"]
+ vca_deployed = db_nsr["_admin"]["deployed"]["VCA"][vca_index]
+ osm_config = {"osm": {"ns_id": db_nsr["_id"]}}
+ db_dict = {
+ "collection": "nsrs",
+ "filter": {"_id": nsr_id},
+ "path": db_update_entry,
+ }
+ step = ""
+ try:
+
+ element_type = "NS"
+ element_under_configuration = nsr_id
+
+ vnfr_id = None
+ if db_vnfr:
+ vnfr_id = db_vnfr["_id"]
+ osm_config["osm"]["vnf_id"] = vnfr_id
+
+ namespace = "{nsi}.{ns}".format(nsi=nsi_id if nsi_id else "", ns=nsr_id)
+
+ if vca_type == "native_charm":
+ index_number = 0
+ else:
+ index_number = vdu_index or 0
+
+ if vnfr_id:
+ element_type = "VNF"
+ element_under_configuration = vnfr_id
+ namespace += ".{}-{}".format(vnfr_id, index_number)
+ if vdu_id:
+ namespace += ".{}-{}".format(vdu_id, index_number)
+ element_type = "VDU"
+ element_under_configuration = "{}-{}".format(vdu_id, index_number)
+ osm_config["osm"]["vdu_id"] = vdu_id
+ elif kdu_name:
+ namespace += ".{}".format(kdu_name)
+ element_type = "KDU"
+ element_under_configuration = kdu_name
+ osm_config["osm"]["kdu_name"] = kdu_name
+
+ # Get artifact path
+ if base_folder["pkg-dir"]:
+ artifact_path = "{}/{}/{}/{}".format(
+ base_folder["folder"],
+ base_folder["pkg-dir"],
+ "charms"
+ if vca_type in ("native_charm", "lxc_proxy_charm", "k8s_proxy_charm")
+ else "helm-charts",
+ vca_name,
+ )
+ else:
+ artifact_path = "{}/Scripts/{}/{}/".format(
+ base_folder["folder"],
+ "charms"
+ if vca_type in ("native_charm", "lxc_proxy_charm", "k8s_proxy_charm")
+ else "helm-charts",
+ vca_name,
+ )
+
+ self.logger.debug("Artifact path > {}".format(artifact_path))
+
+ # get initial_config_primitive_list that applies to this element
+ initial_config_primitive_list = config_descriptor.get(
+ "initial-config-primitive"
+ )
+
+ self.logger.debug(
+ "Initial config primitive list > {}".format(
+ initial_config_primitive_list
+ )
+ )
+
+ # add config if not present for NS charm
+ ee_descriptor_id = ee_config_descriptor.get("id")
+ self.logger.debug("EE Descriptor > {}".format(ee_descriptor_id))
+ initial_config_primitive_list = get_ee_sorted_initial_config_primitive_list(
+ initial_config_primitive_list, vca_deployed, ee_descriptor_id
+ )
+
+ self.logger.debug(
+ "Initial config primitive list #2 > {}".format(
+ initial_config_primitive_list
+ )
+ )
+ # n2vc_redesign STEP 3.1
+ # find old ee_id if exists
+ ee_id = vca_deployed.get("ee_id")
+
+ vca_id = self.get_vca_id(db_vnfr, db_nsr)
+ # create or register execution environment in VCA
+ if vca_type in ("lxc_proxy_charm", "k8s_proxy_charm", "helm", "helm-v3"):
+
+ self._write_configuration_status(
+ nsr_id=nsr_id,
+ vca_index=vca_index,
+ status="CREATING",
+ element_under_configuration=element_under_configuration,
+ element_type=element_type,
+ )
+
+ step = "create execution environment"
+ self.logger.debug(logging_text + step)
+
+ ee_id = None
+ credentials = None
+ if vca_type == "k8s_proxy_charm":
+ ee_id = await self.vca_map[vca_type].install_k8s_proxy_charm(
+ charm_name=artifact_path[artifact_path.rfind("/") + 1 :],
+ namespace=namespace,
+ artifact_path=artifact_path,
+ db_dict=db_dict,
+ vca_id=vca_id,
+ )
+ elif vca_type == "helm" or vca_type == "helm-v3":
+ ee_id, credentials = await self.vca_map[
+ vca_type
+ ].create_execution_environment(
+ namespace=namespace,
+ reuse_ee_id=ee_id,
+ db_dict=db_dict,
+ config=osm_config,
+ artifact_path=artifact_path,
+ vca_type=vca_type,
+ )
+ else:
+ ee_id, credentials = await self.vca_map[
+ vca_type
+ ].create_execution_environment(
+ namespace=namespace,
+ reuse_ee_id=ee_id,
+ db_dict=db_dict,
+ vca_id=vca_id,
+ )
+
+ elif vca_type == "native_charm":
+ step = "Waiting to VM being up and getting IP address"
+ self.logger.debug(logging_text + step)
+ rw_mgmt_ip = await self.wait_vm_up_insert_key_ro(
+ logging_text,
+ nsr_id,
+ vnfr_id,
+ vdu_id,
+ vdu_index,
+ user=None,
+ pub_key=None,
+ )
+ credentials = {"hostname": rw_mgmt_ip}
+ # get username
+ username = deep_get(
+ config_descriptor, ("config-access", "ssh-access", "default-user")
+ )
+ # TODO remove this when changes on IM regarding config-access:ssh-access:default-user were
+ # merged. Meanwhile let's get username from initial-config-primitive
+ if not username and initial_config_primitive_list:
+ for config_primitive in initial_config_primitive_list:
+ for param in config_primitive.get("parameter", ()):
+ if param["name"] == "ssh-username":
+ username = param["value"]
+ break
+ if not username:
+ raise LcmException(
+ "Cannot determine the username neither with 'initial-config-primitive' nor with "
+ "'config-access.ssh-access.default-user'"
+ )
+ credentials["username"] = username
+ # n2vc_redesign STEP 3.2
+
+ self._write_configuration_status(
+ nsr_id=nsr_id,
+ vca_index=vca_index,
+ status="REGISTERING",
+ element_under_configuration=element_under_configuration,
+ element_type=element_type,
+ )
+
+ step = "register execution environment {}".format(credentials)
+ self.logger.debug(logging_text + step)
+ ee_id = await self.vca_map[vca_type].register_execution_environment(
+ credentials=credentials,
+ namespace=namespace,
+ db_dict=db_dict,
+ vca_id=vca_id,
+ )
+
+ # for compatibility with MON/POL modules, the need model and application name at database
+ # TODO ask MON/POL if needed to not assuming anymore the format "model_name.application_name"
+ ee_id_parts = ee_id.split(".")
+ db_nsr_update = {db_update_entry + "ee_id": ee_id}
+ if len(ee_id_parts) >= 2:
+ model_name = ee_id_parts[0]
+ application_name = ee_id_parts[1]
+ db_nsr_update[db_update_entry + "model"] = model_name
+ db_nsr_update[db_update_entry + "application"] = application_name
+
+ # n2vc_redesign STEP 3.3
+ step = "Install configuration Software"
+
+ self._write_configuration_status(
+ nsr_id=nsr_id,
+ vca_index=vca_index,
+ status="INSTALLING SW",
+ element_under_configuration=element_under_configuration,
+ element_type=element_type,
+ other_update=db_nsr_update,
+ )
+
+ # TODO check if already done
+ self.logger.debug(logging_text + step)
+ config = None
+ if vca_type == "native_charm":
+ config_primitive = next(
+ (p for p in initial_config_primitive_list if p["name"] == "config"),
+ None,
+ )
+ if config_primitive:
+ config = self._map_primitive_params(
+ config_primitive, {}, deploy_params
+ )
+ num_units = 1
+ if vca_type == "lxc_proxy_charm":
+ if element_type == "NS":
+ num_units = db_nsr.get("config-units") or 1
+ elif element_type == "VNF":
+ num_units = db_vnfr.get("config-units") or 1
+ elif element_type == "VDU":
+ for v in db_vnfr["vdur"]:
+ if vdu_id == v["vdu-id-ref"]:
+ num_units = v.get("config-units") or 1
+ break
+ if vca_type != "k8s_proxy_charm":
+ await self.vca_map[vca_type].install_configuration_sw(
+ ee_id=ee_id,
+ artifact_path=artifact_path,
+ db_dict=db_dict,
+ config=config,
+ num_units=num_units,
+ vca_id=vca_id,
+ vca_type=vca_type,
+ )
+
+ # write in db flag of configuration_sw already installed
+ self.update_db_2(
+ "nsrs", nsr_id, {db_update_entry + "config_sw_installed": True}
+ )
+
+ # add relations for this VCA (wait for other peers related with this VCA)
+ await self._add_vca_relations(
+ logging_text=logging_text,
+ nsr_id=nsr_id,
+ vca_type=vca_type,
+ vca_index=vca_index,
+ )
+
+ # if SSH access is required, then get execution environment SSH public
+ # if native charm we have waited already to VM be UP
+ if vca_type in ("k8s_proxy_charm", "lxc_proxy_charm", "helm", "helm-v3"):
+ pub_key = None
+ user = None
+ # self.logger.debug("get ssh key block")
+ if deep_get(
+ config_descriptor, ("config-access", "ssh-access", "required")
+ ):
+ # self.logger.debug("ssh key needed")
+ # Needed to inject a ssh key
+ user = deep_get(
+ config_descriptor,
+ ("config-access", "ssh-access", "default-user"),
+ )
+ step = "Install configuration Software, getting public ssh key"
+ pub_key = await self.vca_map[vca_type].get_ee_ssh_public__key(
+ ee_id=ee_id, db_dict=db_dict, vca_id=vca_id
+ )
+
+ step = "Insert public key into VM user={} ssh_key={}".format(
+ user, pub_key
+ )
+ else:
+ # self.logger.debug("no need to get ssh key")
+ step = "Waiting to VM being up and getting IP address"
+ self.logger.debug(logging_text + step)
+
+ # n2vc_redesign STEP 5.1
+ # wait for RO (ip-address) Insert pub_key into VM
+ if vnfr_id:
+ if kdu_name:
+ rw_mgmt_ip = await self.wait_kdu_up(
+ logging_text, nsr_id, vnfr_id, kdu_name
+ )
+ else:
+ rw_mgmt_ip = await self.wait_vm_up_insert_key_ro(
+ logging_text,
+ nsr_id,
+ vnfr_id,
+ vdu_id,
+ vdu_index,
+ user=user,
+ pub_key=pub_key,
+ )
+ else:
+ rw_mgmt_ip = None # This is for a NS configuration
+
+ self.logger.debug(logging_text + " VM_ip_address={}".format(rw_mgmt_ip))
+
+ # store rw_mgmt_ip in deploy params for later replacement
+ deploy_params["rw_mgmt_ip"] = rw_mgmt_ip
+
+ # n2vc_redesign STEP 6 Execute initial config primitive
+ step = "execute initial config primitive"
+
+ # wait for dependent primitives execution (NS -> VNF -> VDU)
+ if initial_config_primitive_list:
+ await self._wait_dependent_n2vc(nsr_id, vca_deployed_list, vca_index)
+
+ # stage, in function of element type: vdu, kdu, vnf or ns
+ my_vca = vca_deployed_list[vca_index]
+ if my_vca.get("vdu_id") or my_vca.get("kdu_name"):
+ # VDU or KDU
+ stage[0] = "Stage 3/5: running Day-1 primitives for VDU."
+ elif my_vca.get("member-vnf-index"):
+ # VNF
+ stage[0] = "Stage 4/5: running Day-1 primitives for VNF."
+ else:
+ # NS
+ stage[0] = "Stage 5/5: running Day-1 primitives for NS."
+
+ self._write_configuration_status(
+ nsr_id=nsr_id, vca_index=vca_index, status="EXECUTING PRIMITIVE"
+ )
+
+ self._write_op_status(op_id=nslcmop_id, stage=stage)
+
+ check_if_terminated_needed = True
+ for initial_config_primitive in initial_config_primitive_list:
+ # adding information on the vca_deployed if it is a NS execution environment
+ if not vca_deployed["member-vnf-index"]:
+ deploy_params["ns_config_info"] = json.dumps(
+ self._get_ns_config_info(nsr_id)
+ )
+ # TODO check if already done
+ primitive_params_ = self._map_primitive_params(
+ initial_config_primitive, {}, deploy_params
+ )
+
+ step = "execute primitive '{}' params '{}'".format(
+ initial_config_primitive["name"], primitive_params_
+ )
+ self.logger.debug(logging_text + step)
+ await self.vca_map[vca_type].exec_primitive(
+ ee_id=ee_id,
+ primitive_name=initial_config_primitive["name"],
+ params_dict=primitive_params_,
+ db_dict=db_dict,
+ vca_id=vca_id,
+ vca_type=vca_type,
+ )
+ # Once some primitive has been exec, check and write at db if it needs to exec terminated primitives
+ if check_if_terminated_needed:
+ if config_descriptor.get("terminate-config-primitive"):
+ self.update_db_2(
+ "nsrs", nsr_id, {db_update_entry + "needed_terminate": True}
+ )
+ check_if_terminated_needed = False
+
+ # TODO register in database that primitive is done
+
+ # STEP 7 Configure metrics
+ if vca_type == "helm" or vca_type == "helm-v3":
+ prometheus_jobs = await self.extract_prometheus_scrape_jobs(
+ ee_id=ee_id,
+ artifact_path=artifact_path,
+ ee_config_descriptor=ee_config_descriptor,
+ vnfr_id=vnfr_id,
+ nsr_id=nsr_id,
+ target_ip=rw_mgmt_ip,
+ )
+ if prometheus_jobs:
+ self.update_db_2(
+ "nsrs",
+ nsr_id,
+ {db_update_entry + "prometheus_jobs": prometheus_jobs},
+ )
+
+ for job in prometheus_jobs:
+ self.db.set_one(
+ "prometheus_jobs",
+ {
+ "job_name": job["job_name"]
+ },
+ job,
+ upsert=True,
+ fail_on_empty=False,
+ )
+
+ step = "instantiated at VCA"
+ self.logger.debug(logging_text + step)
+
+ self._write_configuration_status(
+ nsr_id=nsr_id, vca_index=vca_index, status="READY"
+ )
+
+ except Exception as e: # TODO not use Exception but N2VC exception
+ # self.update_db_2("nsrs", nsr_id, {db_update_entry + "instantiation": "FAILED"})
+ if not isinstance(
+ e, (DbException, N2VCException, LcmException, asyncio.CancelledError)
+ ):
+ self.logger.error(
+ "Exception while {} : {}".format(step, e), exc_info=True
+ )
+ self._write_configuration_status(
+ nsr_id=nsr_id, vca_index=vca_index, status="BROKEN"
+ )
+ raise LcmException("{} {}".format(step, e)) from e
+
+ def _write_ns_status(
+ self,
+ nsr_id: str,
+ ns_state: str,
+ current_operation: str,
+ current_operation_id: str,
+ error_description: str = None,
+ error_detail: str = None,
+ other_update: dict = None,
+ ):
+ """
+ Update db_nsr fields.
+ :param nsr_id:
+ :param ns_state:
+ :param current_operation:
+ :param current_operation_id:
+ :param error_description:
+ :param error_detail:
+ :param other_update: Other required changes at database if provided, will be cleared
+ :return:
+ """
+ try:
+ db_dict = other_update or {}
+ db_dict[
+ "_admin.nslcmop"
+ ] = current_operation_id # for backward compatibility
+ db_dict["_admin.current-operation"] = current_operation_id
+ db_dict["_admin.operation-type"] = (
+ current_operation if current_operation != "IDLE" else None
+ )
+ db_dict["currentOperation"] = current_operation
+ db_dict["currentOperationID"] = current_operation_id
+ db_dict["errorDescription"] = error_description
+ db_dict["errorDetail"] = error_detail
+
+ if ns_state:
+ db_dict["nsState"] = ns_state
+ self.update_db_2("nsrs", nsr_id, db_dict)
+ except DbException as e:
+ self.logger.warn("Error writing NS status, ns={}: {}".format(nsr_id, e))
+
+ def _write_op_status(
+ self,
+ op_id: str,
+ stage: list = None,
+ error_message: str = None,
+ queuePosition: int = 0,
+ operation_state: str = None,
+ other_update: dict = None,
+ ):
+ try:
+ db_dict = other_update or {}
+ db_dict["queuePosition"] = queuePosition
+ if isinstance(stage, list):
+ db_dict["stage"] = stage[0]
+ db_dict["detailed-status"] = " ".join(stage)
+ elif stage is not None:
+ db_dict["stage"] = str(stage)
+
+ if error_message is not None:
+ db_dict["errorMessage"] = error_message
+ if operation_state is not None:
+ db_dict["operationState"] = operation_state
+ db_dict["statusEnteredTime"] = time()
+ self.update_db_2("nslcmops", op_id, db_dict)
+ except DbException as e:
+ self.logger.warn(
+ "Error writing OPERATION status for op_id: {} -> {}".format(op_id, e)
+ )
+
+ def _write_all_config_status(self, db_nsr: dict, status: str):
+ try:
+ nsr_id = db_nsr["_id"]
+ # configurationStatus
+ config_status = db_nsr.get("configurationStatus")
+ if config_status:
+ db_nsr_update = {
+ "configurationStatus.{}.status".format(index): status
+ for index, v in enumerate(config_status)
+ if v
+ }
+ # update status
+ self.update_db_2("nsrs", nsr_id, db_nsr_update)
+
+ except DbException as e:
+ self.logger.warn(
+ "Error writing all configuration status, ns={}: {}".format(nsr_id, e)
+ )
+
+ def _write_configuration_status(
+ self,
+ nsr_id: str,
+ vca_index: int,
+ status: str = None,
+ element_under_configuration: str = None,
+ element_type: str = None,
+ other_update: dict = None,
+ ):
+
+ # self.logger.debug('_write_configuration_status(): vca_index={}, status={}'
+ # .format(vca_index, status))
+
+ try:
+ db_path = "configurationStatus.{}.".format(vca_index)
+ db_dict = other_update or {}
+ if status:
+ db_dict[db_path + "status"] = status
+ if element_under_configuration:
+ db_dict[
+ db_path + "elementUnderConfiguration"
+ ] = element_under_configuration
+ if element_type:
+ db_dict[db_path + "elementType"] = element_type
+ self.update_db_2("nsrs", nsr_id, db_dict)
+ except DbException as e:
+ self.logger.warn(
+ "Error writing configuration status={}, ns={}, vca_index={}: {}".format(
+ status, nsr_id, vca_index, e
+ )
+ )
+
+ async def _do_placement(self, logging_text, db_nslcmop, db_vnfrs):
+ """
+ Check and computes the placement, (vim account where to deploy). If it is decided by an external tool, it
+ sends the request via kafka and wait until the result is wrote at database (nslcmops _admin.plca).
+ Database is used because the result can be obtained from a different LCM worker in case of HA.
+ :param logging_text: contains the prefix for logging, with the ns and nslcmop identifiers
+ :param db_nslcmop: database content of nslcmop
+ :param db_vnfrs: database content of vnfrs, indexed by member-vnf-index.
+ :return: True if some modification is done. Modifies database vnfrs and parameter db_vnfr with the
+ computed 'vim-account-id'
+ """
+ modified = False
+ nslcmop_id = db_nslcmop["_id"]
+ placement_engine = deep_get(db_nslcmop, ("operationParams", "placement-engine"))
+ if placement_engine == "PLA":
+ self.logger.debug(
+ logging_text + "Invoke and wait for placement optimization"
+ )
+ await self.msg.aiowrite(
+ "pla", "get_placement", {"nslcmopId": nslcmop_id}, loop=self.loop
+ )
+ db_poll_interval = 5
+ wait = db_poll_interval * 10
+ pla_result = None
+ while not pla_result and wait >= 0:
+ await asyncio.sleep(db_poll_interval)
+ wait -= db_poll_interval
+ db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})
+ pla_result = deep_get(db_nslcmop, ("_admin", "pla"))
+
+ if not pla_result:
+ raise LcmException(
+ "Placement timeout for nslcmopId={}".format(nslcmop_id)
+ )
+
+ for pla_vnf in pla_result["vnf"]:
+ vnfr = db_vnfrs.get(pla_vnf["member-vnf-index"])
+ if not pla_vnf.get("vimAccountId") or not vnfr:
+ continue
+ modified = True
+ self.db.set_one(
+ "vnfrs",
+ {"_id": vnfr["_id"]},
+ {"vim-account-id": pla_vnf["vimAccountId"]},
+ )
+ # Modifies db_vnfrs
+ vnfr["vim-account-id"] = pla_vnf["vimAccountId"]
+ return modified
+
+ def update_nsrs_with_pla_result(self, params):
+ try:
+ nslcmop_id = deep_get(params, ("placement", "nslcmopId"))
+ self.update_db_2(
+ "nslcmops", nslcmop_id, {"_admin.pla": params.get("placement")}
+ )
+ except Exception as e:
+ self.logger.warn("Update failed for nslcmop_id={}:{}".format(nslcmop_id, e))