+ # wait until done
+ delete_timeout = 20 * 60 # 20 minutes
+ await self._wait_ng_ro(
+ nsr_id, action_id, nslcmop_id, start_deploy, delete_timeout, stage
+ )
+
+ db_nsr_update["_admin.deployed.RO.nsr_delete_action_id"] = None
+ db_nsr_update["_admin.deployed.RO.nsr_status"] = "DELETED"
+ # delete all nsr
+ await self.RO.delete(nsr_id)
+ except Exception as e:
+ if isinstance(e, NgRoException) and e.http_code == 404: # not found
+ db_nsr_update["_admin.deployed.RO.nsr_id"] = None
+ db_nsr_update["_admin.deployed.RO.nsr_status"] = "DELETED"
+ db_nsr_update["_admin.deployed.RO.nsr_delete_action_id"] = None
+ self.logger.debug(
+ logging_text + "RO_action_id={} already deleted".format(action_id)
+ )
+ elif isinstance(e, NgRoException) and e.http_code == 409: # conflict
+ failed_detail.append("delete conflict: {}".format(e))
+ self.logger.debug(
+ logging_text
+ + "RO_action_id={} delete conflict: {}".format(action_id, e)
+ )
+ else:
+ failed_detail.append("delete error: {}".format(e))
+ self.logger.error(
+ logging_text
+ + "RO_action_id={} delete error: {}".format(action_id, e)
+ )
+
+ if failed_detail:
+ stage[2] = "Error deleting from VIM"
+ else:
+ stage[2] = "Deleted from VIM"
+ db_nsr_update["detailed-status"] = " ".join(stage)
+ self.update_db_2("nsrs", nsr_id, db_nsr_update)
+ self._write_op_status(nslcmop_id, stage)
+
+ if failed_detail:
+ raise LcmException("; ".join(failed_detail))
+ return
+
+ async def instantiate_RO(
+ self,
+ logging_text,
+ nsr_id,
+ nsd,
+ db_nsr,
+ db_nslcmop,
+ db_vnfrs,
+ db_vnfds,
+ n2vc_key_list,
+ stage,
+ ):
+ """
+ Instantiate at RO
+ :param logging_text: preffix text to use at logging
+ :param nsr_id: nsr identity
+ :param nsd: database content of ns descriptor
+ :param db_nsr: database content of ns record
+ :param db_nslcmop: database content of ns operation, in this case, 'instantiate'
+ :param db_vnfrs:
+ :param db_vnfds: database content of vnfds, indexed by id (not _id). {id: {vnfd_object}, ...}
+ :param n2vc_key_list: ssh-public-key list to be inserted to management vdus via cloud-init
+ :param stage: list with 3 items: [general stage, tasks, vim_specific]. This task will write over vim_specific
+ :return: None or exception
+ """
+ try:
+ start_deploy = time()
+ ns_params = db_nslcmop.get("operationParams")
+ if ns_params and ns_params.get("timeout_ns_deploy"):
+ timeout_ns_deploy = ns_params["timeout_ns_deploy"]
+ else:
+ timeout_ns_deploy = self.timeout.get(
+ "ns_deploy", self.timeout_ns_deploy
+ )
+
+ # Check for and optionally request placement optimization. Database will be updated if placement activated
+ stage[2] = "Waiting for Placement."
+ if await self._do_placement(logging_text, db_nslcmop, db_vnfrs):
+ # in case of placement change ns_params[vimAcountId) if not present at any vnfrs
+ for vnfr in db_vnfrs.values():
+ if ns_params["vimAccountId"] == vnfr["vim-account-id"]:
+ break
+ else:
+ ns_params["vimAccountId"] == vnfr["vim-account-id"]
+
+ return await self._instantiate_ng_ro(
+ logging_text,
+ nsr_id,
+ nsd,
+ db_nsr,
+ db_nslcmop,
+ db_vnfrs,
+ db_vnfds,
+ n2vc_key_list,
+ stage,
+ start_deploy,
+ timeout_ns_deploy,
+ )
+ except Exception as e:
+ stage[2] = "ERROR deploying at VIM"
+ self.set_vnfr_at_error(db_vnfrs, str(e))
+ self.logger.error(
+ "Error deploying at VIM {}".format(e),
+ exc_info=not isinstance(
+ e,
+ (
+ ROclient.ROClientException,
+ LcmException,
+ DbException,
+ NgRoException,
+ ),
+ ),
+ )
+ raise
+
+ async def wait_kdu_up(self, logging_text, nsr_id, vnfr_id, kdu_name):
+ """
+ Wait for kdu to be up, get ip address
+ :param logging_text: prefix use for logging
+ :param nsr_id:
+ :param vnfr_id:
+ :param kdu_name:
+ :return: IP address
+ """
+
+ # self.logger.debug(logging_text + "Starting wait_kdu_up")
+ nb_tries = 0
+
+ while nb_tries < 360:
+ db_vnfr = self.db.get_one("vnfrs", {"_id": vnfr_id})
+ kdur = next(
+ (
+ x
+ for x in get_iterable(db_vnfr, "kdur")
+ if x.get("kdu-name") == kdu_name
+ ),
+ None,
+ )
+ if not kdur:
+ raise LcmException(
+ "Not found vnfr_id={}, kdu_name={}".format(vnfr_id, kdu_name)
+ )
+ if kdur.get("status"):
+ if kdur["status"] in ("READY", "ENABLED"):
+ return kdur.get("ip-address")
+ else:
+ raise LcmException(
+ "target KDU={} is in error state".format(kdu_name)
+ )
+
+ await asyncio.sleep(10, loop=self.loop)
+ nb_tries += 1
+ raise LcmException("Timeout waiting KDU={} instantiated".format(kdu_name))
+
    async def wait_vm_up_insert_key_ro(
        self, logging_text, nsr_id, vnfr_id, vdu_id, vdu_index, pub_key=None, user=None
    ):
        """
        Wait for ip address at RO, and optionally, insert public key in virtual machine
        :param logging_text: prefix use for logging
        :param nsr_id: ns record identity
        :param vnfr_id: vnf record identity
        :param vdu_id: vdu-id-ref of the target VDU; None means the VNF management VDU
        :param vdu_index: count-index of the target VDU
        :param pub_key: public ssh key to inject, None to skip
        :param user: user to apply the public ssh key
        :return: IP address
        """

        self.logger.debug(logging_text + "Starting wait_vm_up_insert_key_ro")
        ro_nsr_id = None
        ip_address = None
        nb_tries = 0  # retries of the classic-RO key-injection request
        target_vdu_id = None  # set once the target VDU is ACTIVE and has an IP
        ro_retries = 0  # total polling iterations, one every 10 seconds

        while True:

            ro_retries += 1
            if ro_retries >= 360:  # 1 hour
                raise LcmException(
                    "Not found _admin.deployed.RO.nsr_id for nsr_id: {}".format(nsr_id)
                )

            await asyncio.sleep(10, loop=self.loop)

            # get ip address
            if not target_vdu_id:
                db_vnfr = self.db.get_one("vnfrs", {"_id": vnfr_id})

                if not vdu_id:  # for the VNF case
                    if db_vnfr.get("status") == "ERROR":
                        raise LcmException(
                            "Cannot inject ssh-key because target VNF is in error state"
                        )
                    ip_address = db_vnfr.get("ip-address")
                    if not ip_address:
                        continue
                    # locate the vdur that owns the VNF management IP
                    vdur = next(
                        (
                            x
                            for x in get_iterable(db_vnfr, "vdur")
                            if x.get("ip-address") == ip_address
                        ),
                        None,
                    )
                else:  # VDU case
                    vdur = next(
                        (
                            x
                            for x in get_iterable(db_vnfr, "vdur")
                            if x.get("vdu-id-ref") == vdu_id
                            and x.get("count-index") == vdu_index
                        ),
                        None,
                    )

                if (
                    not vdur and len(db_vnfr.get("vdur", ())) == 1
                ):  # If only one, this should be the target vdu
                    vdur = db_vnfr["vdur"][0]
                if not vdur:
                    raise LcmException(
                        "Not found vnfr_id={}, vdu_id={}, vdu_index={}".format(
                            vnfr_id, vdu_id, vdu_index
                        )
                    )
                # New generation RO stores information at "vim_info"
                ng_ro_status = None
                target_vim = None
                if vdur.get("vim_info"):
                    target_vim = next(
                        t for t in vdur["vim_info"]
                    )  # there should be only one key
                    ng_ro_status = vdur["vim_info"][target_vim].get("vim_status")
                # a PDU needs no VIM status; otherwise require ACTIVE (classic or NG-RO)
                if (
                    vdur.get("pdu-type")
                    or vdur.get("status") == "ACTIVE"
                    or ng_ro_status == "ACTIVE"
                ):
                    ip_address = vdur.get("ip-address")
                    if not ip_address:
                        continue
                    target_vdu_id = vdur["vdu-id-ref"]
                elif vdur.get("status") == "ERROR" or ng_ro_status == "ERROR":
                    raise LcmException(
                        "Cannot inject ssh-key because target VM is in error state"
                    )

            if not target_vdu_id:
                continue

            # inject public key into machine
            if pub_key and user:
                self.logger.debug(logging_text + "Inserting RO key")
                self.logger.debug("SSH > PubKey > {}".format(pub_key))
                if vdur.get("pdu-type"):
                    self.logger.error(logging_text + "Cannot inject ssh-ky to a PDU")
                    return ip_address
                try:
                    ro_vm_id = "{}-{}".format(
                        db_vnfr["member-vnf-index-ref"], target_vdu_id
                    )  # TODO add vdu_index
                    if self.ng_ro:
                        # NG-RO: key injection is requested as a deploy "action"
                        target = {
                            "action": {
                                "action": "inject_ssh_key",
                                "key": pub_key,
                                "user": user,
                            },
                            "vnf": [{"_id": vnfr_id, "vdur": [{"id": vdur["id"]}]}],
                        }
                        desc = await self.RO.deploy(nsr_id, target)
                        action_id = desc["action_id"]
                        await self._wait_ng_ro(nsr_id, action_id, timeout=600)
                        break
                    else:
                        # wait until NS is deployed at RO
                        if not ro_nsr_id:
                            db_nsrs = self.db.get_one("nsrs", {"_id": nsr_id})
                            ro_nsr_id = deep_get(
                                db_nsrs, ("_admin", "deployed", "RO", "nsr_id")
                            )
                        if not ro_nsr_id:
                            continue
                        result_dict = await self.RO.create_action(
                            item="ns",
                            item_id_name=ro_nsr_id,
                            descriptor={
                                "add_public_key": pub_key,
                                "vms": [ro_vm_id],
                                "user": user,
                            },
                        )
                        # result_dict contains the format {VM-id: {vim_result: 200, description: text}}
                        if not result_dict or not isinstance(result_dict, dict):
                            raise LcmException(
                                "Unknown response from RO when injecting key"
                            )
                        for result in result_dict.values():
                            if result.get("vim_result") == 200:
                                break
                            else:
                                raise ROclient.ROClientException(
                                    "error injecting key: {}".format(
                                        result.get("description")
                                    )
                                )
                        break
                except NgRoException as e:
                    raise LcmException(
                        "Reaching max tries injecting key. Error: {}".format(e)
                    )
                except ROclient.ROClientException as e:
                    # classic RO may fail transiently; retry up to 20 times,
                    # logging only on the first failure
                    if not nb_tries:
                        self.logger.debug(
                            logging_text
                            + "error injecting key: {}. Retrying until {} seconds".format(
                                e, 20 * 10
                            )
                        )
                    nb_tries += 1
                    if nb_tries >= 20:
                        raise LcmException(
                            "Reaching max tries injecting key. Error: {}".format(e)
                        )
            else:
                # nothing to inject; the VM is up and the IP is known
                break

        return ip_address
+
+ async def _wait_dependent_n2vc(self, nsr_id, vca_deployed_list, vca_index):
+ """
+ Wait until dependent VCA deployments have been finished. NS wait for VNFs and VDUs. VNFs for VDUs
+ """
+ my_vca = vca_deployed_list[vca_index]
+ if my_vca.get("vdu_id") or my_vca.get("kdu_name"):
+ # vdu or kdu: no dependencies
+ return
+ timeout = 300
+ while timeout >= 0:
+ db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
+ vca_deployed_list = db_nsr["_admin"]["deployed"]["VCA"]
+ configuration_status_list = db_nsr["configurationStatus"]
+ for index, vca_deployed in enumerate(configuration_status_list):
+ if index == vca_index:
+ # myself
+ continue
+ if not my_vca.get("member-vnf-index") or (
+ vca_deployed.get("member-vnf-index")
+ == my_vca.get("member-vnf-index")
+ ):
+ internal_status = configuration_status_list[index].get("status")
+ if internal_status == "READY":
+ continue
+ elif internal_status == "BROKEN":
+ raise LcmException(
+ "Configuration aborted because dependent charm/s has failed"
+ )
+ else:
+ break
+ else:
+ # no dependencies, return
+ return
+ await asyncio.sleep(10)
+ timeout -= 1
+
+ raise LcmException("Configuration aborted because dependent charm/s timeout")
+
+ def get_vca_id(self, db_vnfr: dict, db_nsr: dict):
+ return deep_get(db_vnfr, ("vca-id",)) or deep_get(
+ db_nsr, ("instantiate_params", "vcaId")
+ )
+
    async def instantiate_N2VC(
        self,
        logging_text,
        vca_index,
        nsi_id,
        db_nsr,
        db_vnfr,
        vdu_id,
        kdu_name,
        vdu_index,
        config_descriptor,
        deploy_params,
        base_folder,
        nslcmop_id,
        stage,
        vca_type,
        vca_name,
        ee_config_descriptor,
    ):
        """
        Deploy one VCA execution environment and run its Day-1 configuration.

        Creates or registers the execution environment (proxy charm, helm chart
        or native charm), installs the configuration software, waits for the
        target VM/KDU and optionally injects the ssh key, executes the
        initial-config-primitives and, for helm, configures prometheus metric
        jobs. Progress is persisted in the nsr record under
        _admin.deployed.VCA.<vca_index> and configurationStatus.<vca_index>.

        :param logging_text: prefix to use at logging
        :param vca_index: index inside _admin.deployed.VCA of the target entry
        :param nsi_id: netslice instance id, or None
        :param db_nsr: database content of the ns record
        :param db_vnfr: database content of the vnf record, or None for NS level
        :param vdu_id: vdu-id-ref when configuring a VDU, else None
        :param kdu_name: kdu name when configuring a KDU, else None
        :param vdu_index: count-index of the vdu/kdu
        :param config_descriptor: descriptor section with the config primitives
        :param deploy_params: parameters for primitive rendering; "rw_mgmt_ip"
            and possibly "ns_config_info" are added here
        :param base_folder: vnfd/nsd storage info used to build the artifact path
        :param nslcmop_id: operation id, used for writing operation status
        :param stage: 3-item stage list updated with Day-1 progress
        :param vca_type: one of native_charm/lxc_proxy_charm/k8s_proxy_charm/helm/helm-v3
        :param vca_name: charm or chart name inside the package
        :param ee_config_descriptor: execution-environment config descriptor
        :raises LcmException: wrapping any failure, after marking status BROKEN
        """
        nsr_id = db_nsr["_id"]
        # dotted prefix of this VCA entry inside the nsr record
        db_update_entry = "_admin.deployed.VCA.{}.".format(vca_index)
        vca_deployed_list = db_nsr["_admin"]["deployed"]["VCA"]
        vca_deployed = db_nsr["_admin"]["deployed"]["VCA"][vca_index]
        osm_config = {"osm": {"ns_id": db_nsr["_id"]}}
        db_dict = {
            "collection": "nsrs",
            "filter": {"_id": nsr_id},
            "path": db_update_entry,
        }
        step = ""  # kept up to date so the except handler can report where it failed
        try:

            element_type = "NS"
            element_under_configuration = nsr_id

            vnfr_id = None
            if db_vnfr:
                vnfr_id = db_vnfr["_id"]
                osm_config["osm"]["vnf_id"] = vnfr_id

            namespace = "{nsi}.{ns}".format(nsi=nsi_id if nsi_id else "", ns=nsr_id)

            if vnfr_id:
                element_type = "VNF"
                element_under_configuration = vnfr_id
                namespace += ".{}-{}".format(vnfr_id, vdu_index or 0)
                if vdu_id:
                    namespace += ".{}-{}".format(vdu_id, vdu_index or 0)
                    element_type = "VDU"
                    element_under_configuration = "{}-{}".format(vdu_id, vdu_index or 0)
                    osm_config["osm"]["vdu_id"] = vdu_id
                elif kdu_name:
                    namespace += ".{}.{}".format(kdu_name, vdu_index or 0)
                    element_type = "KDU"
                    element_under_configuration = kdu_name
                    osm_config["osm"]["kdu_name"] = kdu_name

            # Get artifact path
            artifact_path = "{}/{}/{}/{}".format(
                base_folder["folder"],
                base_folder["pkg-dir"],
                "charms"
                if vca_type in ("native_charm", "lxc_proxy_charm", "k8s_proxy_charm")
                else "helm-charts",
                vca_name,
            )

            self.logger.debug("Artifact path > {}".format(artifact_path))

            # get initial_config_primitive_list that applies to this element
            initial_config_primitive_list = config_descriptor.get(
                "initial-config-primitive"
            )

            self.logger.debug(
                "Initial config primitive list > {}".format(
                    initial_config_primitive_list
                )
            )

            # add config if not present for NS charm
            ee_descriptor_id = ee_config_descriptor.get("id")
            self.logger.debug("EE Descriptor > {}".format(ee_descriptor_id))
            initial_config_primitive_list = get_ee_sorted_initial_config_primitive_list(
                initial_config_primitive_list, vca_deployed, ee_descriptor_id
            )

            self.logger.debug(
                "Initial config primitive list #2 > {}".format(
                    initial_config_primitive_list
                )
            )
            # n2vc_redesign STEP 3.1
            # find old ee_id if exists
            ee_id = vca_deployed.get("ee_id")

            vca_id = self.get_vca_id(db_vnfr, db_nsr)
            # create or register execution environment in VCA
            if vca_type in ("lxc_proxy_charm", "k8s_proxy_charm", "helm", "helm-v3"):

                self._write_configuration_status(
                    nsr_id=nsr_id,
                    vca_index=vca_index,
                    status="CREATING",
                    element_under_configuration=element_under_configuration,
                    element_type=element_type,
                )

                step = "create execution environment"
                self.logger.debug(logging_text + step)

                ee_id = None
                credentials = None
                if vca_type == "k8s_proxy_charm":
                    ee_id = await self.vca_map[vca_type].install_k8s_proxy_charm(
                        # last path segment is the charm name
                        charm_name=artifact_path[artifact_path.rfind("/") + 1 :],
                        namespace=namespace,
                        artifact_path=artifact_path,
                        db_dict=db_dict,
                        vca_id=vca_id,
                    )
                elif vca_type == "helm" or vca_type == "helm-v3":
                    ee_id, credentials = await self.vca_map[
                        vca_type
                    ].create_execution_environment(
                        namespace=namespace,
                        reuse_ee_id=ee_id,
                        db_dict=db_dict,
                        config=osm_config,
                        artifact_path=artifact_path,
                        vca_type=vca_type,
                    )
                else:
                    ee_id, credentials = await self.vca_map[
                        vca_type
                    ].create_execution_environment(
                        namespace=namespace,
                        reuse_ee_id=ee_id,
                        db_dict=db_dict,
                        vca_id=vca_id,
                    )

            elif vca_type == "native_charm":
                # the charm runs inside the workload VM, so its IP is needed first
                step = "Waiting to VM being up and getting IP address"
                self.logger.debug(logging_text + step)
                rw_mgmt_ip = await self.wait_vm_up_insert_key_ro(
                    logging_text,
                    nsr_id,
                    vnfr_id,
                    vdu_id,
                    vdu_index,
                    user=None,
                    pub_key=None,
                )
                credentials = {"hostname": rw_mgmt_ip}
                # get username
                username = deep_get(
                    config_descriptor, ("config-access", "ssh-access", "default-user")
                )
                # TODO remove this when changes on IM regarding config-access:ssh-access:default-user were
                # merged. Meanwhile let's get username from initial-config-primitive
                if not username and initial_config_primitive_list:
                    for config_primitive in initial_config_primitive_list:
                        for param in config_primitive.get("parameter", ()):
                            if param["name"] == "ssh-username":
                                username = param["value"]
                                break
                if not username:
                    raise LcmException(
                        "Cannot determine the username neither with 'initial-config-primitive' nor with "
                        "'config-access.ssh-access.default-user'"
                    )
                credentials["username"] = username
                # n2vc_redesign STEP 3.2

                self._write_configuration_status(
                    nsr_id=nsr_id,
                    vca_index=vca_index,
                    status="REGISTERING",
                    element_under_configuration=element_under_configuration,
                    element_type=element_type,
                )

                step = "register execution environment {}".format(credentials)
                self.logger.debug(logging_text + step)
                ee_id = await self.vca_map[vca_type].register_execution_environment(
                    credentials=credentials,
                    namespace=namespace,
                    db_dict=db_dict,
                    vca_id=vca_id,
                )

            # for compatibility with MON/POL modules, the need model and application name at database
            # TODO ask MON/POL if needed to not assuming anymore the format "model_name.application_name"
            ee_id_parts = ee_id.split(".")
            db_nsr_update = {db_update_entry + "ee_id": ee_id}
            if len(ee_id_parts) >= 2:
                model_name = ee_id_parts[0]
                application_name = ee_id_parts[1]
                db_nsr_update[db_update_entry + "model"] = model_name
                db_nsr_update[db_update_entry + "application"] = application_name

            # n2vc_redesign STEP 3.3
            step = "Install configuration Software"

            self._write_configuration_status(
                nsr_id=nsr_id,
                vca_index=vca_index,
                status="INSTALLING SW",
                element_under_configuration=element_under_configuration,
                element_type=element_type,
                other_update=db_nsr_update,
            )

            # TODO check if already done
            self.logger.debug(logging_text + step)
            config = None
            if vca_type == "native_charm":
                # native charms may take a "config" primitive as install config
                config_primitive = next(
                    (p for p in initial_config_primitive_list if p["name"] == "config"),
                    None,
                )
                if config_primitive:
                    config = self._map_primitive_params(
                        config_primitive, {}, deploy_params
                    )
            num_units = 1
            if vca_type == "lxc_proxy_charm":
                # config-units may be set at ns, vnf or vdu level
                if element_type == "NS":
                    num_units = db_nsr.get("config-units") or 1
                elif element_type == "VNF":
                    num_units = db_vnfr.get("config-units") or 1
                elif element_type == "VDU":
                    for v in db_vnfr["vdur"]:
                        if vdu_id == v["vdu-id-ref"]:
                            num_units = v.get("config-units") or 1
                            break
            if vca_type != "k8s_proxy_charm":
                await self.vca_map[vca_type].install_configuration_sw(
                    ee_id=ee_id,
                    artifact_path=artifact_path,
                    db_dict=db_dict,
                    config=config,
                    num_units=num_units,
                    vca_id=vca_id,
                )

            # write in db flag of configuration_sw already installed
            self.update_db_2(
                "nsrs", nsr_id, {db_update_entry + "config_sw_installed": True}
            )

            # add relations for this VCA (wait for other peers related with this VCA)
            await self._add_vca_relations(
                logging_text=logging_text,
                nsr_id=nsr_id,
                vca_index=vca_index,
                vca_id=vca_id,
                vca_type=vca_type,
            )

            # if SSH access is required, then get execution environment SSH public
            # if native charm we have waited already to VM be UP
            if vca_type in ("k8s_proxy_charm", "lxc_proxy_charm", "helm", "helm-v3"):
                pub_key = None
                user = None
                # self.logger.debug("get ssh key block")
                if deep_get(
                    config_descriptor, ("config-access", "ssh-access", "required")
                ):
                    # self.logger.debug("ssh key needed")
                    # Needed to inject a ssh key
                    user = deep_get(
                        config_descriptor,
                        ("config-access", "ssh-access", "default-user"),
                    )
                    step = "Install configuration Software, getting public ssh key"
                    pub_key = await self.vca_map[vca_type].get_ee_ssh_public__key(
                        ee_id=ee_id, db_dict=db_dict, vca_id=vca_id
                    )

                    step = "Insert public key into VM user={} ssh_key={}".format(
                        user, pub_key
                    )
                else:
                    # self.logger.debug("no need to get ssh key")
                    step = "Waiting to VM being up and getting IP address"
                self.logger.debug(logging_text + step)

                # n2vc_redesign STEP 5.1
                # wait for RO (ip-address) Insert pub_key into VM
                if vnfr_id:
                    if kdu_name:
                        rw_mgmt_ip = await self.wait_kdu_up(
                            logging_text, nsr_id, vnfr_id, kdu_name
                        )
                    else:
                        rw_mgmt_ip = await self.wait_vm_up_insert_key_ro(
                            logging_text,
                            nsr_id,
                            vnfr_id,
                            vdu_id,
                            vdu_index,
                            user=user,
                            pub_key=pub_key,
                        )
                else:
                    rw_mgmt_ip = None  # This is for a NS configuration

                self.logger.debug(logging_text + " VM_ip_address={}".format(rw_mgmt_ip))

            # store rw_mgmt_ip in deploy params for later replacement
            deploy_params["rw_mgmt_ip"] = rw_mgmt_ip

            # n2vc_redesign STEP 6 Execute initial config primitive
            step = "execute initial config primitive"

            # wait for dependent primitives execution (NS -> VNF -> VDU)
            if initial_config_primitive_list:
                await self._wait_dependent_n2vc(nsr_id, vca_deployed_list, vca_index)

            # stage, in function of element type: vdu, kdu, vnf or ns
            my_vca = vca_deployed_list[vca_index]
            if my_vca.get("vdu_id") or my_vca.get("kdu_name"):
                # VDU or KDU
                stage[0] = "Stage 3/5: running Day-1 primitives for VDU."
            elif my_vca.get("member-vnf-index"):
                # VNF
                stage[0] = "Stage 4/5: running Day-1 primitives for VNF."
            else:
                # NS
                stage[0] = "Stage 5/5: running Day-1 primitives for NS."

            self._write_configuration_status(
                nsr_id=nsr_id, vca_index=vca_index, status="EXECUTING PRIMITIVE"
            )

            self._write_op_status(op_id=nslcmop_id, stage=stage)

            check_if_terminated_needed = True
            for initial_config_primitive in initial_config_primitive_list:
                # adding information on the vca_deployed if it is a NS execution environment
                if not vca_deployed["member-vnf-index"]:
                    deploy_params["ns_config_info"] = json.dumps(
                        self._get_ns_config_info(nsr_id)
                    )
                # TODO check if already done
                primitive_params_ = self._map_primitive_params(
                    initial_config_primitive, {}, deploy_params
                )

                step = "execute primitive '{}' params '{}'".format(
                    initial_config_primitive["name"], primitive_params_
                )
                self.logger.debug(logging_text + step)
                await self.vca_map[vca_type].exec_primitive(
                    ee_id=ee_id,
                    primitive_name=initial_config_primitive["name"],
                    params_dict=primitive_params_,
                    db_dict=db_dict,
                    vca_id=vca_id,
                )
                # Once some primitive has been exec, check and write at db if it needs to exec terminated primitives
                if check_if_terminated_needed:
                    if config_descriptor.get("terminate-config-primitive"):
                        self.update_db_2(
                            "nsrs", nsr_id, {db_update_entry + "needed_terminate": True}
                        )
                    check_if_terminated_needed = False

                # TODO register in database that primitive is done

            # STEP 7 Configure metrics
            if vca_type == "helm" or vca_type == "helm-v3":
                prometheus_jobs = await self.add_prometheus_metrics(
                    ee_id=ee_id,
                    artifact_path=artifact_path,
                    ee_config_descriptor=ee_config_descriptor,
                    vnfr_id=vnfr_id,
                    nsr_id=nsr_id,
                    target_ip=rw_mgmt_ip,
                )
                if prometheus_jobs:
                    self.update_db_2(
                        "nsrs",
                        nsr_id,
                        {db_update_entry + "prometheus_jobs": prometheus_jobs},
                    )

            step = "instantiated at VCA"
            self.logger.debug(logging_text + step)

            self._write_configuration_status(
                nsr_id=nsr_id, vca_index=vca_index, status="READY"
            )

        except Exception as e:  # TODO not use Exception but N2VC exception
            # self.update_db_2("nsrs", nsr_id, {db_update_entry + "instantiation": "FAILED"})
            # unexpected exception types are logged with traceback
            if not isinstance(
                e, (DbException, N2VCException, LcmException, asyncio.CancelledError)
            ):
                self.logger.error(
                    "Exception while {} : {}".format(step, e), exc_info=True
                )
            self._write_configuration_status(
                nsr_id=nsr_id, vca_index=vca_index, status="BROKEN"
            )
            raise LcmException("{} {}".format(step, e)) from e
+
+ def _write_ns_status(
+ self,
+ nsr_id: str,
+ ns_state: str,
+ current_operation: str,
+ current_operation_id: str,
+ error_description: str = None,
+ error_detail: str = None,
+ other_update: dict = None,
+ ):
+ """
+ Update db_nsr fields.
+ :param nsr_id:
+ :param ns_state:
+ :param current_operation:
+ :param current_operation_id:
+ :param error_description:
+ :param error_detail:
+ :param other_update: Other required changes at database if provided, will be cleared
+ :return:
+ """
+ try:
+ db_dict = other_update or {}
+ db_dict[
+ "_admin.nslcmop"
+ ] = current_operation_id # for backward compatibility
+ db_dict["_admin.current-operation"] = current_operation_id
+ db_dict["_admin.operation-type"] = (
+ current_operation if current_operation != "IDLE" else None
+ )
+ db_dict["currentOperation"] = current_operation
+ db_dict["currentOperationID"] = current_operation_id
+ db_dict["errorDescription"] = error_description
+ db_dict["errorDetail"] = error_detail
+
+ if ns_state:
+ db_dict["nsState"] = ns_state
+ self.update_db_2("nsrs", nsr_id, db_dict)
+ except DbException as e:
+ self.logger.warn("Error writing NS status, ns={}: {}".format(nsr_id, e))
+
+ def _write_op_status(
+ self,
+ op_id: str,
+ stage: list = None,
+ error_message: str = None,
+ queuePosition: int = 0,
+ operation_state: str = None,
+ other_update: dict = None,
+ ):
+ try:
+ db_dict = other_update or {}
+ db_dict["queuePosition"] = queuePosition
+ if isinstance(stage, list):
+ db_dict["stage"] = stage[0]
+ db_dict["detailed-status"] = " ".join(stage)
+ elif stage is not None:
+ db_dict["stage"] = str(stage)
+
+ if error_message is not None:
+ db_dict["errorMessage"] = error_message
+ if operation_state is not None:
+ db_dict["operationState"] = operation_state
+ db_dict["statusEnteredTime"] = time()
+ self.update_db_2("nslcmops", op_id, db_dict)
+ except DbException as e:
+ self.logger.warn(
+ "Error writing OPERATION status for op_id: {} -> {}".format(op_id, e)
+ )
+
+ def _write_all_config_status(self, db_nsr: dict, status: str):
+ try:
+ nsr_id = db_nsr["_id"]
+ # configurationStatus
+ config_status = db_nsr.get("configurationStatus")
+ if config_status:
+ db_nsr_update = {
+ "configurationStatus.{}.status".format(index): status
+ for index, v in enumerate(config_status)
+ if v
+ }
+ # update status
+ self.update_db_2("nsrs", nsr_id, db_nsr_update)
+
+ except DbException as e:
+ self.logger.warn(
+ "Error writing all configuration status, ns={}: {}".format(nsr_id, e)
+ )
+
+ def _write_configuration_status(
+ self,
+ nsr_id: str,
+ vca_index: int,
+ status: str = None,
+ element_under_configuration: str = None,
+ element_type: str = None,
+ other_update: dict = None,
+ ):
+
+ # self.logger.debug('_write_configuration_status(): vca_index={}, status={}'
+ # .format(vca_index, status))
+
+ try:
+ db_path = "configurationStatus.{}.".format(vca_index)
+ db_dict = other_update or {}
+ if status:
+ db_dict[db_path + "status"] = status
+ if element_under_configuration:
+ db_dict[
+ db_path + "elementUnderConfiguration"
+ ] = element_under_configuration
+ if element_type:
+ db_dict[db_path + "elementType"] = element_type
+ self.update_db_2("nsrs", nsr_id, db_dict)
+ except DbException as e:
+ self.logger.warn(
+ "Error writing configuration status={}, ns={}, vca_index={}: {}".format(
+ status, nsr_id, vca_index, e
+ )
+ )
+
    async def _do_placement(self, logging_text, db_nslcmop, db_vnfrs):
        """
        Check and computes the placement, (vim account where to deploy). If it is decided by an external tool, it
        sends the request via kafka and waits until the result is written at database (nslcmops _admin.pla).
        Database is used because the result can be obtained from a different LCM worker in case of HA.
        :param logging_text: contains the prefix for logging, with the ns and nslcmop identifiers
        :param db_nslcmop: database content of nslcmop
        :param db_vnfrs: database content of vnfrs, indexed by member-vnf-index.
        :return: True if some modification is done. Modifies database vnfrs and parameter db_vnfr with the
            computed 'vim-account-id'
        """
        modified = False
        nslcmop_id = db_nslcmop["_id"]
        placement_engine = deep_get(db_nslcmop, ("operationParams", "placement-engine"))
        if placement_engine == "PLA":
            self.logger.debug(
                logging_text + "Invoke and wait for placement optimization"
            )
            # ask the PLA module for a placement via kafka
            await self.msg.aiowrite(
                "pla", "get_placement", {"nslcmopId": nslcmop_id}, loop=self.loop
            )
            db_poll_interval = 5
            wait = db_poll_interval * 10  # total wait budget: 50 seconds
            pla_result = None
            while not pla_result and wait >= 0:
                await asyncio.sleep(db_poll_interval)
                wait -= db_poll_interval
                # the result is read from the database so any LCM worker (HA) can consume it
                db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})
                pla_result = deep_get(db_nslcmop, ("_admin", "pla"))

            if not pla_result:
                raise LcmException(
                    "Placement timeout for nslcmopId={}".format(nslcmop_id)
                )

            # apply the per-vnf vim account decided by PLA
            for pla_vnf in pla_result["vnf"]:
                vnfr = db_vnfrs.get(pla_vnf["member-vnf-index"])
                if not pla_vnf.get("vimAccountId") or not vnfr:
                    continue
                modified = True
                self.db.set_one(
                    "vnfrs",
                    {"_id": vnfr["_id"]},
                    {"vim-account-id": pla_vnf["vimAccountId"]},
                )
                # Modifies db_vnfrs
                vnfr["vim-account-id"] = pla_vnf["vimAccountId"]
        return modified
+
+ def update_nsrs_with_pla_result(self, params):
+ try:
+ nslcmop_id = deep_get(params, ("placement", "nslcmopId"))
+ self.update_db_2(
+ "nslcmops", nslcmop_id, {"_admin.pla": params.get("placement")}
+ )
+ except Exception as e:
+ self.logger.warn("Update failed for nslcmop_id={}:{}".format(nslcmop_id, e))
+
+ async def instantiate(self, nsr_id, nslcmop_id):
+ """
+
+ :param nsr_id: ns instance to deploy
+ :param nslcmop_id: operation to run
+ :return:
+ """
+
+ # Try to lock HA task here
+ task_is_locked_by_me = self.lcm_tasks.lock_HA("ns", "nslcmops", nslcmop_id)
+ if not task_is_locked_by_me:
+ self.logger.debug(
+ "instantiate() task is not locked by me, ns={}".format(nsr_id)
+ )
+ return
+
+ logging_text = "Task ns={} instantiate={} ".format(nsr_id, nslcmop_id)
+ self.logger.debug(logging_text + "Enter")
+
+ # get all needed from database
+
+ # database nsrs record
+ db_nsr = None
+
+ # database nslcmops record
+ db_nslcmop = None
+
+ # update operation on nsrs
+ db_nsr_update = {}
+ # update operation on nslcmops
+ db_nslcmop_update = {}
+
+ nslcmop_operation_state = None
+ db_vnfrs = {} # vnf's info indexed by member-index
+ # n2vc_info = {}
+ tasks_dict_info = {} # from task to info text
+ exc = None
+ error_list = []
+ stage = [
+ "Stage 1/5: preparation of the environment.",
+ "Waiting for previous operations to terminate.",
+ "",
+ ]
+ # ^ stage, step, VIM progress
+ try:
+ # wait for any previous tasks in process
+ await self.lcm_tasks.waitfor_related_HA("ns", "nslcmops", nslcmop_id)
+
+ # STEP 0: Reading database (nslcmops, nsrs, nsds, vnfrs, vnfds)
+ stage[1] = "Reading from database."
+ # nsState="BUILDING", currentOperation="INSTANTIATING", currentOperationID=nslcmop_id
+ db_nsr_update["detailed-status"] = "creating"
+ db_nsr_update["operational-status"] = "init"
+ self._write_ns_status(
+ nsr_id=nsr_id,
+ ns_state="BUILDING",
+ current_operation="INSTANTIATING",
+ current_operation_id=nslcmop_id,
+ other_update=db_nsr_update,
+ )
+ self._write_op_status(op_id=nslcmop_id, stage=stage, queuePosition=0)
+
+ # read from db: operation
+ stage[1] = "Getting nslcmop={} from db.".format(nslcmop_id)
+ db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})
+ ns_params = db_nslcmop.get("operationParams")
+ if ns_params and ns_params.get("timeout_ns_deploy"):
+ timeout_ns_deploy = ns_params["timeout_ns_deploy"]
+ else:
+ timeout_ns_deploy = self.timeout.get(
+ "ns_deploy", self.timeout_ns_deploy
+ )
+
+ # read from db: ns
+ stage[1] = "Getting nsr={} from db.".format(nsr_id)
+ db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
+ stage[1] = "Getting nsd={} from db.".format(db_nsr["nsd-id"])
+ nsd = self.db.get_one("nsds", {"_id": db_nsr["nsd-id"]})
+ self.fs.sync(db_nsr["nsd-id"])
+ db_nsr["nsd"] = nsd
+ # nsr_name = db_nsr["name"] # TODO short-name??
+
+ # read from db: vnf's of this ns
+ stage[1] = "Getting vnfrs from db."
+ self.logger.debug(logging_text + stage[1])
+ db_vnfrs_list = self.db.get_list("vnfrs", {"nsr-id-ref": nsr_id})
+
+ # read from db: vnfd's for every vnf
+ db_vnfds = [] # every vnfd data
+
+ # for each vnf in ns, read vnfd
+ for vnfr in db_vnfrs_list:
+ db_vnfrs[vnfr["member-vnf-index-ref"]] = vnfr
+ vnfd_id = vnfr["vnfd-id"]
+ vnfd_ref = vnfr["vnfd-ref"]
+ self.fs.sync(vnfd_id)
+
+ # if we haven't this vnfd, read it from db
+ if vnfd_id not in db_vnfds:
+ # read from db
+ stage[1] = "Getting vnfd={} id='{}' from db.".format(
+ vnfd_id, vnfd_ref
+ )
+ self.logger.debug(logging_text + stage[1])
+ vnfd = self.db.get_one("vnfds", {"_id": vnfd_id})
+
+ # store vnfd
+ db_vnfds.append(vnfd)
+
+ # Get or generates the _admin.deployed.VCA list
+ vca_deployed_list = None
+ if db_nsr["_admin"].get("deployed"):
+ vca_deployed_list = db_nsr["_admin"]["deployed"].get("VCA")
+ if vca_deployed_list is None:
+ vca_deployed_list = []
+ configuration_status_list = []
+ db_nsr_update["_admin.deployed.VCA"] = vca_deployed_list
+ db_nsr_update["configurationStatus"] = configuration_status_list
+ # add _admin.deployed.VCA to db_nsr dictionary, value=vca_deployed_list
+ populate_dict(db_nsr, ("_admin", "deployed", "VCA"), vca_deployed_list)
+ elif isinstance(vca_deployed_list, dict):
+ # maintain backward compatibility. Change a dict to list at database
+ vca_deployed_list = list(vca_deployed_list.values())
+ db_nsr_update["_admin.deployed.VCA"] = vca_deployed_list
+ populate_dict(db_nsr, ("_admin", "deployed", "VCA"), vca_deployed_list)
+
+ if not isinstance(
+ deep_get(db_nsr, ("_admin", "deployed", "RO", "vnfd")), list
+ ):
+ populate_dict(db_nsr, ("_admin", "deployed", "RO", "vnfd"), [])
+ db_nsr_update["_admin.deployed.RO.vnfd"] = []
+
+ # set state to INSTANTIATED. When instantiated NBI will not delete directly
+ db_nsr_update["_admin.nsState"] = "INSTANTIATED"
+ self.update_db_2("nsrs", nsr_id, db_nsr_update)
+ self.db.set_list(
+ "vnfrs", {"nsr-id-ref": nsr_id}, {"_admin.nsState": "INSTANTIATED"}
+ )
+
+ # n2vc_redesign STEP 2 Deploy Network Scenario
+ stage[0] = "Stage 2/5: deployment of KDUs, VMs and execution environments."
+ self._write_op_status(op_id=nslcmop_id, stage=stage)
+
+ stage[1] = "Deploying KDUs."
+ # self.logger.debug(logging_text + "Before deploy_kdus")
+ # Call to deploy_kdus in case exists the "vdu:kdu" param
+ await self.deploy_kdus(
+ logging_text=logging_text,
+ nsr_id=nsr_id,
+ nslcmop_id=nslcmop_id,
+ db_vnfrs=db_vnfrs,
+ db_vnfds=db_vnfds,
+ task_instantiation_info=tasks_dict_info,
+ )
+
+ stage[1] = "Getting VCA public key."
+ # n2vc_redesign STEP 1 Get VCA public ssh-key
+ # feature 1429. Add n2vc public key to needed VMs
+ n2vc_key = self.n2vc.get_public_key()
+ n2vc_key_list = [n2vc_key]
+ if self.vca_config.get("public_key"):
+ n2vc_key_list.append(self.vca_config["public_key"])
+
+ stage[1] = "Deploying NS at VIM."
+ task_ro = asyncio.ensure_future(
+ self.instantiate_RO(
+ logging_text=logging_text,
+ nsr_id=nsr_id,
+ nsd=nsd,
+ db_nsr=db_nsr,
+ db_nslcmop=db_nslcmop,
+ db_vnfrs=db_vnfrs,
+ db_vnfds=db_vnfds,
+ n2vc_key_list=n2vc_key_list,
+ stage=stage,
+ )
+ )
+ self.lcm_tasks.register("ns", nsr_id, nslcmop_id, "instantiate_RO", task_ro)
+ tasks_dict_info[task_ro] = "Deploying at VIM"
+
+ # n2vc_redesign STEP 3 to 6 Deploy N2VC
+ stage[1] = "Deploying Execution Environments."
+ self.logger.debug(logging_text + stage[1])
+
+ nsi_id = None # TODO put nsi_id when this nsr belongs to a NSI
+ for vnf_profile in get_vnf_profiles(nsd):
+ vnfd_id = vnf_profile["vnfd-id"]
+ vnfd = find_in_list(db_vnfds, lambda a_vnf: a_vnf["id"] == vnfd_id)
+ member_vnf_index = str(vnf_profile["id"])
+ db_vnfr = db_vnfrs[member_vnf_index]
+ base_folder = vnfd["_admin"]["storage"]
+ vdu_id = None
+ vdu_index = 0
+ vdu_name = None
+ kdu_name = None
+
+ # Get additional parameters
+ deploy_params = {"OSM": get_osm_params(db_vnfr)}
+ if db_vnfr.get("additionalParamsForVnf"):
+ deploy_params.update(
+ parse_yaml_strings(db_vnfr["additionalParamsForVnf"].copy())
+ )
+
+ descriptor_config = get_configuration(vnfd, vnfd["id"])
+ if descriptor_config:
+ self._deploy_n2vc(
+ logging_text=logging_text
+ + "member_vnf_index={} ".format(member_vnf_index),
+ db_nsr=db_nsr,
+ db_vnfr=db_vnfr,
+ nslcmop_id=nslcmop_id,
+ nsr_id=nsr_id,
+ nsi_id=nsi_id,
+ vnfd_id=vnfd_id,
+ vdu_id=vdu_id,
+ kdu_name=kdu_name,
+ member_vnf_index=member_vnf_index,
+ vdu_index=vdu_index,
+ vdu_name=vdu_name,
+ deploy_params=deploy_params,
+ descriptor_config=descriptor_config,
+ base_folder=base_folder,
+ task_instantiation_info=tasks_dict_info,
+ stage=stage,
+ )
+
+ # Deploy charms for each VDU that supports one.
+ for vdud in get_vdu_list(vnfd):
+ vdu_id = vdud["id"]
+ descriptor_config = get_configuration(vnfd, vdu_id)
+ vdur = find_in_list(
+ db_vnfr["vdur"], lambda vdu: vdu["vdu-id-ref"] == vdu_id
+ )
+
+ if vdur.get("additionalParams"):
+ deploy_params_vdu = parse_yaml_strings(vdur["additionalParams"])
+ else:
+ deploy_params_vdu = deploy_params
+ deploy_params_vdu["OSM"] = get_osm_params(
+ db_vnfr, vdu_id, vdu_count_index=0
+ )
+ vdud_count = get_vdu_profile(vnfd, vdu_id).get(
+ "max-number-of-instances", 1
+ )
+
+ self.logger.debug("VDUD > {}".format(vdud))
+ self.logger.debug(
+ "Descriptor config > {}".format(descriptor_config)
+ )
+ if descriptor_config:
+ vdu_name = None
+ kdu_name = None
+ for vdu_index in range(vdud_count):
+ # TODO vnfr_params["rw_mgmt_ip"] = vdur["ip-address"]
+ self._deploy_n2vc(
+ logging_text=logging_text
+ + "member_vnf_index={}, vdu_id={}, vdu_index={} ".format(
+ member_vnf_index, vdu_id, vdu_index
+ ),
+ db_nsr=db_nsr,
+ db_vnfr=db_vnfr,
+ nslcmop_id=nslcmop_id,
+ nsr_id=nsr_id,
+ nsi_id=nsi_id,
+ vnfd_id=vnfd_id,
+ vdu_id=vdu_id,
+ kdu_name=kdu_name,
+ member_vnf_index=member_vnf_index,
+ vdu_index=vdu_index,
+ vdu_name=vdu_name,
+ deploy_params=deploy_params_vdu,
+ descriptor_config=descriptor_config,
+ base_folder=base_folder,
+ task_instantiation_info=tasks_dict_info,
+ stage=stage,
+ )
+ for kdud in get_kdu_list(vnfd):
+ kdu_name = kdud["name"]
+ descriptor_config = get_configuration(vnfd, kdu_name)
+ if descriptor_config:
+ vdu_id = None
+ vdu_index = 0
+ vdu_name = None
+ kdur = next(
+ x for x in db_vnfr["kdur"] if x["kdu-name"] == kdu_name
+ )
+ deploy_params_kdu = {"OSM": get_osm_params(db_vnfr)}
+ if kdur.get("additionalParams"):
+ deploy_params_kdu = parse_yaml_strings(
+ kdur["additionalParams"]
+ )
+
+ self._deploy_n2vc(
+ logging_text=logging_text,
+ db_nsr=db_nsr,
+ db_vnfr=db_vnfr,
+ nslcmop_id=nslcmop_id,
+ nsr_id=nsr_id,
+ nsi_id=nsi_id,
+ vnfd_id=vnfd_id,
+ vdu_id=vdu_id,
+ kdu_name=kdu_name,
+ member_vnf_index=member_vnf_index,
+ vdu_index=vdu_index,
+ vdu_name=vdu_name,
+ deploy_params=deploy_params_kdu,
+ descriptor_config=descriptor_config,
+ base_folder=base_folder,
+ task_instantiation_info=tasks_dict_info,
+ stage=stage,
+ )
+
+ # Check if this NS has a charm configuration
+ descriptor_config = nsd.get("ns-configuration")
+ if descriptor_config and descriptor_config.get("juju"):
+ vnfd_id = None
+ db_vnfr = None
+ member_vnf_index = None
+ vdu_id = None
+ kdu_name = None
+ vdu_index = 0
+ vdu_name = None
+
+ # Get additional parameters
+ deploy_params = {"OSM": {"vim_account_id": ns_params["vimAccountId"]}}
+ if db_nsr.get("additionalParamsForNs"):
+ deploy_params.update(
+ parse_yaml_strings(db_nsr["additionalParamsForNs"].copy())
+ )
+ base_folder = nsd["_admin"]["storage"]
+ self._deploy_n2vc(
+ logging_text=logging_text,
+ db_nsr=db_nsr,
+ db_vnfr=db_vnfr,
+ nslcmop_id=nslcmop_id,
+ nsr_id=nsr_id,
+ nsi_id=nsi_id,
+ vnfd_id=vnfd_id,
+ vdu_id=vdu_id,
+ kdu_name=kdu_name,
+ member_vnf_index=member_vnf_index,
+ vdu_index=vdu_index,
+ vdu_name=vdu_name,
+ deploy_params=deploy_params,
+ descriptor_config=descriptor_config,
+ base_folder=base_folder,
+ task_instantiation_info=tasks_dict_info,
+ stage=stage,
+ )
+
+ # rest of staff will be done at finally
+
+ except (
+ ROclient.ROClientException,
+ DbException,
+ LcmException,
+ N2VCException,
+ ) as e:
+ self.logger.error(
+ logging_text + "Exit Exception while '{}': {}".format(stage[1], e)
+ )
+ exc = e
+ except asyncio.CancelledError:
+ self.logger.error(
+ logging_text + "Cancelled Exception while '{}'".format(stage[1])
+ )
+ exc = "Operation was cancelled"
+ except Exception as e:
+ exc = traceback.format_exc()
+ self.logger.critical(
+ logging_text + "Exit Exception while '{}': {}".format(stage[1], e),
+ exc_info=True,
+ )
+ finally:
+ if exc:
+ error_list.append(str(exc))
+ try:
+ # wait for pending tasks
+ if tasks_dict_info:
+ stage[1] = "Waiting for instantiate pending tasks."
+ self.logger.debug(logging_text + stage[1])
+ error_list += await self._wait_for_tasks(
+ logging_text,
+ tasks_dict_info,
+ timeout_ns_deploy,
+ stage,
+ nslcmop_id,
+ nsr_id=nsr_id,
+ )
+ stage[1] = stage[2] = ""
+ except asyncio.CancelledError:
+ error_list.append("Cancelled")
+ # TODO cancel all tasks
+ except Exception as exc:
+ error_list.append(str(exc))
+
+ # update operation-status
+ db_nsr_update["operational-status"] = "running"
+ # let's begin with VCA 'configured' status (later we can change it)
+ db_nsr_update["config-status"] = "configured"
+ for task, task_name in tasks_dict_info.items():
+ if not task.done() or task.cancelled() or task.exception():
+ if task_name.startswith(self.task_name_deploy_vca):
+ # A N2VC task is pending
+ db_nsr_update["config-status"] = "failed"
+ else:
+ # RO or KDU task is pending
+ db_nsr_update["operational-status"] = "failed"
+
+ # update status at database
+ if error_list:
+ error_detail = ". ".join(error_list)
+ self.logger.error(logging_text + error_detail)
+ error_description_nslcmop = "{} Detail: {}".format(
+ stage[0], error_detail
+ )
+ error_description_nsr = "Operation: INSTANTIATING.{}, {}".format(
+ nslcmop_id, stage[0]
+ )
+
+ db_nsr_update["detailed-status"] = (
+ error_description_nsr + " Detail: " + error_detail
+ )
+ db_nslcmop_update["detailed-status"] = error_detail
+ nslcmop_operation_state = "FAILED"
+ ns_state = "BROKEN"
+ else:
+ error_detail = None
+ error_description_nsr = error_description_nslcmop = None
+ ns_state = "READY"
+ db_nsr_update["detailed-status"] = "Done"
+ db_nslcmop_update["detailed-status"] = "Done"
+ nslcmop_operation_state = "COMPLETED"
+
+ if db_nsr:
+ self._write_ns_status(
+ nsr_id=nsr_id,
+ ns_state=ns_state,
+ current_operation="IDLE",
+ current_operation_id=None,
+ error_description=error_description_nsr,
+ error_detail=error_detail,
+ other_update=db_nsr_update,
+ )
+ self._write_op_status(
+ op_id=nslcmop_id,
+ stage="",
+ error_message=error_description_nslcmop,
+ operation_state=nslcmop_operation_state,
+ other_update=db_nslcmop_update,
+ )
+
+ if nslcmop_operation_state:
+ try:
+ await self.msg.aiowrite(
+ "ns",
+ "instantiated",
+ {
+ "nsr_id": nsr_id,
+ "nslcmop_id": nslcmop_id,
+ "operationState": nslcmop_operation_state,
+ },
+ loop=self.loop,
+ )
+ except Exception as e:
+ self.logger.error(
+ logging_text + "kafka_write notification Exception {}".format(e)
+ )
+
+ self.logger.debug(logging_text + "Exit")
+ self.lcm_tasks.remove("ns", nsr_id, nslcmop_id, "ns_instantiate")
+
+ async def _add_vca_relations(
+ self,
+ logging_text,
+ nsr_id,
+ vca_index: int,
+ timeout: int = 3600,
+ vca_type: str = None,
+ vca_id: str = None,
+ ) -> bool:
+
+ # steps:
+ # 1. find all relations for this VCA
+ # 2. wait for other peers related
+ # 3. add relations
+
+ try:
+ vca_type = vca_type or "lxc_proxy_charm"
+
+ # STEP 1: find all relations for this VCA
+
+ # read nsr record
+ db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
+ nsd = self.db.get_one("nsds", {"_id": db_nsr["nsd-id"]})
+
+ # this VCA data
+ my_vca = deep_get(db_nsr, ("_admin", "deployed", "VCA"))[vca_index]
+
+ # read all ns-configuration relations
+ ns_relations = list()
+ db_ns_relations = deep_get(nsd, ("ns-configuration", "relation"))
+ if db_ns_relations:
+ for r in db_ns_relations:
+ # check if this VCA is in the relation
+ if my_vca.get("member-vnf-index") in (
+ r.get("entities")[0].get("id"),
+ r.get("entities")[1].get("id"),
+ ):
+ ns_relations.append(r)
+
+ # read all vnf-configuration relations
+ vnf_relations = list()
+ db_vnfd_list = db_nsr.get("vnfd-id")
+ if db_vnfd_list:
+ for vnfd in db_vnfd_list:
+ db_vnf_relations = None
+ db_vnfd = self.db.get_one("vnfds", {"_id": vnfd})
+ db_vnf_configuration = get_configuration(db_vnfd, db_vnfd["id"])
+ if db_vnf_configuration:
+ db_vnf_relations = db_vnf_configuration.get("relation", [])
+ if db_vnf_relations:
+ for r in db_vnf_relations:
+ # check if this VCA is in the relation
+ if my_vca.get("vdu_id") in (
+ r.get("entities")[0].get("id"),
+ r.get("entities")[1].get("id"),
+ ):
+ vnf_relations.append(r)
+
+ # if no relations, terminate
+ if not ns_relations and not vnf_relations:
+ self.logger.debug(logging_text + " No relations")
+ return True
+
+ self.logger.debug(
+ logging_text
+ + " adding relations\n {}\n {}".format(
+ ns_relations, vnf_relations
+ )
+ )
+
+ # add all relations
+ start = time()
+ while True:
+ # check timeout
+ now = time()
+ if now - start >= timeout:
+ self.logger.error(logging_text + " : timeout adding relations")
+ return False
+
+ # reload nsr from database (we need to update record: _admin.deloyed.VCA)
+ db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
+
+ # for each defined NS relation, find the VCA's related
+ for r in ns_relations.copy():
+ from_vca_ee_id = None
+ to_vca_ee_id = None
+ from_vca_endpoint = None
+ to_vca_endpoint = None
+ vca_list = deep_get(db_nsr, ("_admin", "deployed", "VCA"))
+ for vca in vca_list:
+ if vca.get("member-vnf-index") == r.get("entities")[0].get(
+ "id"
+ ) and vca.get("config_sw_installed"):
+ from_vca_ee_id = vca.get("ee_id")
+ from_vca_endpoint = r.get("entities")[0].get("endpoint")
+ if vca.get("member-vnf-index") == r.get("entities")[1].get(
+ "id"
+ ) and vca.get("config_sw_installed"):
+ to_vca_ee_id = vca.get("ee_id")
+ to_vca_endpoint = r.get("entities")[1].get("endpoint")
+ if from_vca_ee_id and to_vca_ee_id:
+ # add relation
+ await self.vca_map[vca_type].add_relation(
+ ee_id_1=from_vca_ee_id,
+ ee_id_2=to_vca_ee_id,
+ endpoint_1=from_vca_endpoint,
+ endpoint_2=to_vca_endpoint,
+ vca_id=vca_id,
+ )
+ # remove entry from relations list
+ ns_relations.remove(r)
+ else:
+ # check failed peers
+ try:
+ vca_status_list = db_nsr.get("configurationStatus")
+ if vca_status_list:
+ for i in range(len(vca_list)):
+ vca = vca_list[i]
+ vca_status = vca_status_list[i]
+ if vca.get("member-vnf-index") == r.get("entities")[
+ 0
+ ].get("id"):
+ if vca_status.get("status") == "BROKEN":
+ # peer broken: remove relation from list
+ ns_relations.remove(r)
+ if vca.get("member-vnf-index") == r.get("entities")[
+ 1
+ ].get("id"):
+ if vca_status.get("status") == "BROKEN":
+ # peer broken: remove relation from list
+ ns_relations.remove(r)
+ except Exception:
+ # ignore
+ pass
+
+ # for each defined VNF relation, find the VCA's related
+ for r in vnf_relations.copy():
+ from_vca_ee_id = None
+ to_vca_ee_id = None
+ from_vca_endpoint = None
+ to_vca_endpoint = None
+ vca_list = deep_get(db_nsr, ("_admin", "deployed", "VCA"))
+ for vca in vca_list:
+ key_to_check = "vdu_id"
+ if vca.get("vdu_id") is None:
+ key_to_check = "vnfd_id"
+ if vca.get(key_to_check) == r.get("entities")[0].get(
+ "id"
+ ) and vca.get("config_sw_installed"):
+ from_vca_ee_id = vca.get("ee_id")
+ from_vca_endpoint = r.get("entities")[0].get("endpoint")
+ if vca.get(key_to_check) == r.get("entities")[1].get(
+ "id"
+ ) and vca.get("config_sw_installed"):
+ to_vca_ee_id = vca.get("ee_id")
+ to_vca_endpoint = r.get("entities")[1].get("endpoint")
+ if from_vca_ee_id and to_vca_ee_id:
+ # add relation
+ await self.vca_map[vca_type].add_relation(
+ ee_id_1=from_vca_ee_id,
+ ee_id_2=to_vca_ee_id,
+ endpoint_1=from_vca_endpoint,
+ endpoint_2=to_vca_endpoint,
+ vca_id=vca_id,
+ )
+ # remove entry from relations list
+ vnf_relations.remove(r)
+ else:
+ # check failed peers
+ try:
+ vca_status_list = db_nsr.get("configurationStatus")
+ if vca_status_list:
+ for i in range(len(vca_list)):
+ vca = vca_list[i]
+ vca_status = vca_status_list[i]
+ if vca.get("vdu_id") == r.get("entities")[0].get(
+ "id"
+ ):
+ if vca_status.get("status") == "BROKEN":
+ # peer broken: remove relation from list
+ vnf_relations.remove(r)
+ if vca.get("vdu_id") == r.get("entities")[1].get(
+ "id"
+ ):
+ if vca_status.get("status") == "BROKEN":
+ # peer broken: remove relation from list
+ vnf_relations.remove(r)
+ except Exception:
+ # ignore
+ pass
+
+ # wait for next try
+ await asyncio.sleep(5.0)
+
+ if not ns_relations and not vnf_relations:
+ self.logger.debug("Relations added")
+ break
+
+ return True
+
+ except Exception as e:
+ self.logger.warn(logging_text + " ERROR adding relations: {}".format(e))
+ return False
+
    async def _install_kdu(
        self,
        nsr_id: str,
        nsr_db_path: str,
        vnfr_data: dict,
        kdu_index: int,
        kdud: dict,
        vnfd: dict,
        k8s_instance_info: dict,
        k8params: dict = None,
        timeout: int = 600,
        vca_id: str = None,
    ):
        """Install one KDU on its K8s cluster and run its initial config primitives.

        On success it records the services and management IP at the vnfr and
        sets the kdur status to READY; on failure it records the error at both
        the nsr and the vnfr and re-raises the original exception.

        :param nsr_id: nsr identity
        :param nsr_db_path: dot-path inside the nsr where this KDU deployment
            info lives (e.g. "_admin.deployed.K8s.<index>")
        :param vnfr_data: database content of the vnfr owning this KDU
        :param kdu_index: position of this KDU inside the vnfr "kdur" list
        :param kdud: KDU descriptor (entry of the vnfd "kdu" list)
        :param vnfd: database content of the vnfd
        :param k8s_instance_info: deployment info (cluster uuid/type, kdu
            name/model, namespace, optional kdu-deployment-name)
        :param k8params: KDU instantiation parameters
        :param timeout: seconds allowed for the install and for each primitive
        :param vca_id: VCA identity, forwarded to the k8s connector
        :return: the kdu_instance name used for the deployment
        """

        try:
            k8sclustertype = k8s_instance_info["k8scluster-type"]
            # Instantiate kdu
            db_dict_install = {
                "collection": "nsrs",
                "filter": {"_id": nsr_id},
                "path": nsr_db_path,
            }

            # use the descriptor-provided deployment name when present;
            # otherwise let the connector generate one
            if k8s_instance_info.get("kdu-deployment-name"):
                kdu_instance = k8s_instance_info.get("kdu-deployment-name")
            else:
                kdu_instance = self.k8scluster_map[
                    k8sclustertype
                ].generate_kdu_instance_name(
                    db_dict=db_dict_install,
                    kdu_model=k8s_instance_info["kdu-model"],
                    kdu_name=k8s_instance_info["kdu-name"],
                )
            # persist the instance name before installing so it is available
            # even if the install itself fails
            self.update_db_2(
                "nsrs", nsr_id, {nsr_db_path + ".kdu-instance": kdu_instance}
            )
            await self.k8scluster_map[k8sclustertype].install(
                cluster_uuid=k8s_instance_info["k8scluster-uuid"],
                kdu_model=k8s_instance_info["kdu-model"],
                atomic=True,
                params=k8params,
                db_dict=db_dict_install,
                timeout=timeout,
                kdu_name=k8s_instance_info["kdu-name"],
                namespace=k8s_instance_info["namespace"],
                kdu_instance=kdu_instance,
                vca_id=vca_id,
            )
            # re-write after install (install progress updates the same record)
            self.update_db_2(
                "nsrs", nsr_id, {nsr_db_path + ".kdu-instance": kdu_instance}
            )

            # Obtain services to obtain management service ip
            services = await self.k8scluster_map[k8sclustertype].get_services(
                cluster_uuid=k8s_instance_info["k8scluster-uuid"],
                kdu_instance=kdu_instance,
                namespace=k8s_instance_info["namespace"],
            )

            # Obtain management service info (if exists)
            vnfr_update_dict = {}
            kdu_config = get_configuration(vnfd, kdud["name"])
            if kdu_config:
                target_ee_list = kdu_config.get("execution-environment-list", [])
            else:
                target_ee_list = []

            if services:
                vnfr_update_dict["kdur.{}.services".format(kdu_index)] = services
                # descriptor services flagged as management services
                mgmt_services = [
                    service
                    for service in kdud.get("service", [])
                    if service.get("mgmt-service")
                ]
                for mgmt_service in mgmt_services:
                    for service in services:
                        # cluster may suffix service names, hence startswith
                        if service["name"].startswith(mgmt_service["name"]):
                            # Mgmt service found, Obtain service ip
                            ip = service.get("external_ip", service.get("cluster_ip"))
                            if isinstance(ip, list) and len(ip) == 1:
                                ip = ip[0]

                            vnfr_update_dict[
                                "kdur.{}.ip-address".format(kdu_index)
                            ] = ip

                            # Check if must update also mgmt ip at the vnf
                            service_external_cp = mgmt_service.get(
                                "external-connection-point-ref"
                            )
                            if service_external_cp:
                                if (
                                    deep_get(vnfd, ("mgmt-interface", "cp"))
                                    == service_external_cp
                                ):
                                    vnfr_update_dict["ip-address"] = ip

                                if find_in_list(
                                    target_ee_list,
                                    lambda ee: ee.get(
                                        "external-connection-point-ref", ""
                                    )
                                    == service_external_cp,
                                ):
                                    vnfr_update_dict[
                                        "kdur.{}.ip-address".format(kdu_index)
                                    ] = ip
                            break
                    else:
                        # no cluster service matched this mgmt service name
                        self.logger.warn(
                            "Mgmt service name: {} not found".format(
                                mgmt_service["name"]
                            )
                        )

            vnfr_update_dict["kdur.{}.status".format(kdu_index)] = "READY"
            self.update_db_2("vnfrs", vnfr_data.get("_id"), vnfr_update_dict)

            # run initial-config-primitives only when there is no juju
            # execution environment handling configuration for this KDU
            kdu_config = get_configuration(vnfd, k8s_instance_info["kdu-name"])
            if (
                kdu_config
                and kdu_config.get("initial-config-primitive")
                and get_juju_ee_ref(vnfd, k8s_instance_info["kdu-name"]) is None
            ):
                initial_config_primitive_list = kdu_config.get(
                    "initial-config-primitive"
                )
                # primitives must execute in ascending "seq" order
                initial_config_primitive_list.sort(key=lambda val: int(val["seq"]))

                for initial_config_primitive in initial_config_primitive_list:
                    primitive_params_ = self._map_primitive_params(
                        initial_config_primitive, {}, {}
                    )

                    await asyncio.wait_for(
                        self.k8scluster_map[k8sclustertype].exec_primitive(
                            cluster_uuid=k8s_instance_info["k8scluster-uuid"],
                            kdu_instance=kdu_instance,
                            primitive_name=initial_config_primitive["name"],
                            params=primitive_params_,
                            db_dict=db_dict_install,
                            vca_id=vca_id,
                        ),
                        timeout=timeout,
                    )

        except Exception as e:
            # Prepare update db with error and raise exception
            try:
                self.update_db_2(
                    "nsrs", nsr_id, {nsr_db_path + ".detailed-status": str(e)}
                )
                self.update_db_2(
                    "vnfrs",
                    vnfr_data.get("_id"),
                    {"kdur.{}.status".format(kdu_index): "ERROR"},
                )
            except Exception:
                # ignore to keep original exception
                pass
            # reraise original error
            raise

        return kdu_instance
+
    async def deploy_kdus(
        self,
        logging_text,
        nsr_id,
        nslcmop_id,
        db_vnfrs,
        db_vnfds,
        task_instantiation_info,
    ):
        """Launch an _install_kdu task for every KDU declared at the vnfrs.

        Resolves each KDU's K8s cluster (initializing helm-v3 clusters for
        backward compatibility when needed), synchronizes helm repos once per
        cluster, records the deployment info at _admin.deployed.K8s.<index>,
        and registers one asyncio task per KDU into task_instantiation_info
        (tasks are awaited by the caller, not here).

        :param logging_text: prefix text to use at logging
        :param nsr_id: nsr identity
        :param nslcmop_id: operation identity, used to register tasks
        :param db_vnfrs: vnfr records indexed by member-vnf-index
        :param db_vnfds: list of vnfd records
        :param task_instantiation_info: dict task -> description, filled here
        :raises LcmException: on descriptor/cluster errors (other exceptions
            are wrapped into LcmException)
        """
        # Launch kdus if present in the descriptor

        # cache of resolved cluster internal ids: cluster_type -> {_id: k8s_id}
        k8scluster_id_2_uuic = {
            "helm-chart-v3": {},
            "helm-chart": {},
            "juju-bundle": {},
        }

        async def _get_cluster_id(cluster_id, cluster_type):
            # Resolve (and cache) the connector-internal id of a K8s cluster,
            # waiting for any in-flight k8scluster task and lazily initializing
            # helm-v3 on clusters created before v3 support.
            nonlocal k8scluster_id_2_uuic
            if cluster_id in k8scluster_id_2_uuic[cluster_type]:
                return k8scluster_id_2_uuic[cluster_type][cluster_id]

            # check if K8scluster is creating and wait look if previous tasks in process
            task_name, task_dependency = self.lcm_tasks.lookfor_related(
                "k8scluster", cluster_id
            )
            if task_dependency:
                text = "Waiting for related tasks '{}' on k8scluster {} to be completed".format(
                    task_name, cluster_id
                )
                self.logger.debug(logging_text + text)
                await asyncio.wait(task_dependency, timeout=3600)

            db_k8scluster = self.db.get_one(
                "k8sclusters", {"_id": cluster_id}, fail_on_empty=False
            )
            if not db_k8scluster:
                raise LcmException("K8s cluster {} cannot be found".format(cluster_id))

            k8s_id = deep_get(db_k8scluster, ("_admin", cluster_type, "id"))
            if not k8s_id:
                if cluster_type == "helm-chart-v3":
                    try:
                        # backward compatibility for existing clusters that have not been initialized for helm v3
                        k8s_credentials = yaml.safe_dump(
                            db_k8scluster.get("credentials")
                        )
                        k8s_id, uninstall_sw = await self.k8sclusterhelm3.init_env(
                            k8s_credentials, reuse_cluster_uuid=cluster_id
                        )
                        db_k8scluster_update = {}
                        db_k8scluster_update["_admin.helm-chart-v3.error_msg"] = None
                        db_k8scluster_update["_admin.helm-chart-v3.id"] = k8s_id
                        db_k8scluster_update[
                            "_admin.helm-chart-v3.created"
                        ] = uninstall_sw
                        db_k8scluster_update[
                            "_admin.helm-chart-v3.operationalState"
                        ] = "ENABLED"
                        self.update_db_2(
                            "k8sclusters", cluster_id, db_k8scluster_update
                        )
                    except Exception as e:
                        self.logger.error(
                            logging_text
                            + "error initializing helm-v3 cluster: {}".format(str(e))
                        )
                        raise LcmException(
                            "K8s cluster '{}' has not been initialized for '{}'".format(
                                cluster_id, cluster_type
                            )
                        )
                else:
                    raise LcmException(
                        "K8s cluster '{}' has not been initialized for '{}'".format(
                            cluster_id, cluster_type
                        )
                    )
            k8scluster_id_2_uuic[cluster_type][cluster_id] = k8s_id
            return k8s_id

        logging_text += "Deploy kdus: "
        step = ""
        try:
            # reset the deployed K8s list at the nsr before (re)deploying
            db_nsr_update = {"_admin.deployed.K8s": []}
            self.update_db_2("nsrs", nsr_id, db_nsr_update)

            index = 0
            # clusters whose helm repos were already synchronized in this call
            updated_cluster_list = []
            updated_v3_cluster_list = []

            for vnfr_data in db_vnfrs.values():
                vca_id = self.get_vca_id(vnfr_data, {})
                for kdu_index, kdur in enumerate(get_iterable(vnfr_data, "kdur")):
                    # Step 0: Prepare and set parameters
                    desc_params = parse_yaml_strings(kdur.get("additionalParams"))
                    vnfd_id = vnfr_data.get("vnfd-id")
                    vnfd_with_id = find_in_list(
                        db_vnfds, lambda vnfd: vnfd["_id"] == vnfd_id
                    )
                    # KDU descriptor matching this kdur by name
                    kdud = next(
                        kdud
                        for kdud in vnfd_with_id["kdu"]
                        if kdud["name"] == kdur["kdu-name"]
                    )
                    namespace = kdur.get("k8s-namespace")
                    kdu_deployment_name = kdur.get("kdu-deployment-name")
                    if kdur.get("helm-chart"):
                        kdumodel = kdur["helm-chart"]
                        # Default version: helm3, if helm-version is v2 assign v2
                        k8sclustertype = "helm-chart-v3"
                        self.logger.debug("kdur: {}".format(kdur))
                        if (
                            kdur.get("helm-version")
                            and kdur.get("helm-version") == "v2"
                        ):
                            k8sclustertype = "helm-chart"
                    elif kdur.get("juju-bundle"):
                        kdumodel = kdur["juju-bundle"]
                        k8sclustertype = "juju-bundle"
                    else:
                        raise LcmException(
                            "kdu type for kdu='{}.{}' is neither helm-chart nor "
                            "juju-bundle. Maybe an old NBI version is running".format(
                                vnfr_data["member-vnf-index-ref"], kdur["kdu-name"]
                            )
                        )
                    # check if kdumodel is a file and exists
                    try:
                        vnfd_with_id = find_in_list(
                            db_vnfds, lambda vnfd: vnfd["_id"] == vnfd_id
                        )
                        storage = deep_get(vnfd_with_id, ("_admin", "storage"))
                        if storage and storage.get(
                            "pkg-dir"
                        ):  # may be not present if vnfd has not artifacts
                            # path format: /vnfdid/pkkdir/helm-charts|juju-bundles/kdumodel
                            filename = "{}/{}/{}s/{}".format(
                                storage["folder"],
                                storage["pkg-dir"],
                                k8sclustertype,
                                kdumodel,
                            )
                            if self.fs.file_exists(
                                filename, mode="file"
                            ) or self.fs.file_exists(filename, mode="dir"):
                                # package artifact found: use its full fs path
                                kdumodel = self.fs.path + filename
                    except (asyncio.TimeoutError, asyncio.CancelledError):
                        raise
                    except Exception:  # it is not a file
                        pass

                    k8s_cluster_id = kdur["k8s-cluster"]["id"]
                    step = "Synchronize repos for k8s cluster '{}'".format(
                        k8s_cluster_id
                    )
                    cluster_uuid = await _get_cluster_id(k8s_cluster_id, k8sclustertype)

                    # Synchronize repos
                    if (
                        k8sclustertype == "helm-chart"
                        and cluster_uuid not in updated_cluster_list
                    ) or (
                        k8sclustertype == "helm-chart-v3"
                        and cluster_uuid not in updated_v3_cluster_list
                    ):
                        del_repo_list, added_repo_dict = await asyncio.ensure_future(
                            self.k8scluster_map[k8sclustertype].synchronize_repos(
                                cluster_uuid=cluster_uuid
                            )
                        )
                        if del_repo_list or added_repo_dict:
                            if k8sclustertype == "helm-chart":
                                unset = {
                                    "_admin.helm_charts_added." + item: None
                                    for item in del_repo_list
                                }
                                updated = {
                                    "_admin.helm_charts_added." + item: name
                                    for item, name in added_repo_dict.items()
                                }
                                updated_cluster_list.append(cluster_uuid)
                            elif k8sclustertype == "helm-chart-v3":
                                unset = {
                                    "_admin.helm_charts_v3_added." + item: None
                                    for item in del_repo_list
                                }
                                updated = {
                                    "_admin.helm_charts_v3_added." + item: name
                                    for item, name in added_repo_dict.items()
                                }
                                updated_v3_cluster_list.append(cluster_uuid)
                            self.logger.debug(
                                logging_text + "repos synchronized on k8s cluster "
                                "'{}' to_delete: {}, to_add: {}".format(
                                    k8s_cluster_id, del_repo_list, added_repo_dict
                                )
                            )
                            self.db.set_one(
                                "k8sclusters",
                                {"_id": k8s_cluster_id},
                                updated,
                                unset=unset,
                            )

                    # Instantiate kdu
                    step = "Instantiating KDU {}.{} in k8s cluster {}".format(
                        vnfr_data["member-vnf-index-ref"],
                        kdur["kdu-name"],
                        k8s_cluster_id,
                    )
                    k8s_instance_info = {
                        "kdu-instance": None,
                        "k8scluster-uuid": cluster_uuid,
                        "k8scluster-type": k8sclustertype,
                        "member-vnf-index": vnfr_data["member-vnf-index-ref"],
                        "kdu-name": kdur["kdu-name"],
                        "kdu-model": kdumodel,
                        "namespace": namespace,
                        "kdu-deployment-name": kdu_deployment_name,
                    }
                    db_path = "_admin.deployed.K8s.{}".format(index)
                    db_nsr_update[db_path] = k8s_instance_info
                    self.update_db_2("nsrs", nsr_id, db_nsr_update)
                    vnfd_with_id = find_in_list(
                        db_vnfds, lambda vnf: vnf["_id"] == vnfd_id
                    )
                    # launch the install as a background task; the caller
                    # awaits it through task_instantiation_info
                    task = asyncio.ensure_future(
                        self._install_kdu(
                            nsr_id,
                            db_path,
                            vnfr_data,
                            kdu_index,
                            kdud,
                            vnfd_with_id,
                            k8s_instance_info,
                            k8params=desc_params,
                            timeout=600,
                            vca_id=vca_id,
                        )
                    )
                    self.lcm_tasks.register(
                        "ns",
                        nsr_id,
                        nslcmop_id,
                        "instantiate_KDU-{}".format(index),
                        task,
                    )
                    task_instantiation_info[task] = "Deploying KDU {}".format(
                        kdur["kdu-name"]
                    )

                    index += 1

        except (LcmException, asyncio.CancelledError):
            raise
        except Exception as e:
            msg = "Exception {} while {}: {}".format(type(e).__name__, step, e)
            if isinstance(e, (N2VCException, DbException)):
                self.logger.error(logging_text + msg)
            else:
                self.logger.critical(logging_text + msg, exc_info=True)
            raise LcmException(msg)
        finally:
            # persist whatever was deployed, even on error paths
            if db_nsr_update:
                self.update_db_2("nsrs", nsr_id, db_nsr_update)
+
    def _deploy_n2vc(
        self,
        logging_text,
        db_nsr,
        db_vnfr,
        nslcmop_id,
        nsr_id,
        nsi_id,
        vnfd_id,
        vdu_id,
        kdu_name,
        member_vnf_index,
        vdu_index,
        vdu_name,
        deploy_params,
        descriptor_config,
        base_folder,
        task_instantiation_info,
        stage,
    ):
        """Launch one instantiate_N2VC task per execution environment in
        descriptor_config, registering each task into task_instantiation_info.

        For every execution environment it reuses the matching entry of
        _admin.deployed.VCA or creates a new one at database, then starts the
        N2VC instantiation as a background asyncio task (tasks are awaited by
        the caller, not here).

        :param descriptor_config: configuration section of the ns/vnf/vdu/kdu
            descriptor ("execution-environment-list" or a plain "juju" entry)
        :param task_instantiation_info: dict task -> description, filled here
        """
        # launch instantiate_N2VC in a asyncio task and register task object
        # Look where information of this charm is at database <nsrs>._admin.deployed.VCA
        # if not found, create one entry and update database
        # fill db_nsr._admin.deployed.VCA.<index>

        self.logger.debug(
            logging_text + "_deploy_n2vc vnfd_id={}, vdu_id={}".format(vnfd_id, vdu_id)
        )
        if "execution-environment-list" in descriptor_config:
            ee_list = descriptor_config.get("execution-environment-list", [])
        elif "juju" in descriptor_config:
            ee_list = [descriptor_config]  # ns charms
        else:  # other types as script are not supported
            ee_list = []

        for ee_item in ee_list:
            self.logger.debug(
                logging_text
                + "_deploy_n2vc ee_item juju={}, helm={}".format(
                    ee_item.get("juju"), ee_item.get("helm-chart")
                )
            )
            ee_descriptor_id = ee_item.get("id")
            if ee_item.get("juju"):
                vca_name = ee_item["juju"].get("charm")
                # proxy charm unless the descriptor says otherwise
                vca_type = (
                    "lxc_proxy_charm"
                    if ee_item["juju"].get("charm") is not None
                    else "native_charm"
                )
                if ee_item["juju"].get("cloud") == "k8s":
                    vca_type = "k8s_proxy_charm"
                elif ee_item["juju"].get("proxy") is False:
                    vca_type = "native_charm"
            elif ee_item.get("helm-chart"):
                vca_name = ee_item["helm-chart"]
                if ee_item.get("helm-version") and ee_item.get("helm-version") == "v2":
                    vca_type = "helm"
                else:
                    vca_type = "helm-v3"
            else:
                self.logger.debug(
                    logging_text + "skipping non juju neither charm configuration"
                )
                continue

            # look for an existing VCA entry matching this execution environment
            vca_index = -1
            for vca_index, vca_deployed in enumerate(
                db_nsr["_admin"]["deployed"]["VCA"]
            ):
                if not vca_deployed:
                    continue
                if (
                    vca_deployed.get("member-vnf-index") == member_vnf_index
                    and vca_deployed.get("vdu_id") == vdu_id
                    and vca_deployed.get("kdu_name") == kdu_name
                    and vca_deployed.get("vdu_count_index", 0) == vdu_index
                    and vca_deployed.get("ee_descriptor_id") == ee_descriptor_id
                ):
                    break
            else:
                # not found, create one.
                target = (
                    "ns" if not member_vnf_index else "vnf/{}".format(member_vnf_index)
                )
                if vdu_id:
                    target += "/vdu/{}/{}".format(vdu_id, vdu_index or 0)
                elif kdu_name:
                    target += "/kdu/{}".format(kdu_name)
                vca_deployed = {
                    "target_element": target,
                    # ^ target_element will replace member-vnf-index, kdu_name, vdu_id ... in a single string
                    "member-vnf-index": member_vnf_index,
                    "vdu_id": vdu_id,
                    "kdu_name": kdu_name,
                    "vdu_count_index": vdu_index,
                    "operational-status": "init",  # TODO revise
                    "detailed-status": "",  # TODO revise
                    "step": "initial-deploy",  # TODO revise
                    "vnfd_id": vnfd_id,
                    "vdu_name": vdu_name,
                    "type": vca_type,
                    "ee_descriptor_id": ee_descriptor_id,
                }
                # vca_index holds the last enumerate position (or -1 when the
                # list was empty); the new entry is appended right after it
                vca_index += 1

                # create VCA and configurationStatus in db
                db_dict = {
                    "_admin.deployed.VCA.{}".format(vca_index): vca_deployed,
                    "configurationStatus.{}".format(vca_index): dict(),
                }
                self.update_db_2("nsrs", nsr_id, db_dict)

                # keep the in-memory record aligned with the database
                db_nsr["_admin"]["deployed"]["VCA"].append(vca_deployed)

            self.logger.debug("N2VC > NSR_ID > {}".format(nsr_id))
            self.logger.debug("N2VC > DB_NSR > {}".format(db_nsr))
            self.logger.debug("N2VC > VCA_DEPLOYED > {}".format(vca_deployed))

            # Launch task
            task_n2vc = asyncio.ensure_future(
                self.instantiate_N2VC(
                    logging_text=logging_text,
                    vca_index=vca_index,
                    nsi_id=nsi_id,
                    db_nsr=db_nsr,
                    db_vnfr=db_vnfr,
                    vdu_id=vdu_id,
                    kdu_name=kdu_name,
                    vdu_index=vdu_index,
                    deploy_params=deploy_params,
                    config_descriptor=descriptor_config,
                    base_folder=base_folder,
                    nslcmop_id=nslcmop_id,
                    stage=stage,
                    vca_type=vca_type,
                    vca_name=vca_name,
                    ee_config_descriptor=ee_item,
                )
            )
            self.lcm_tasks.register(
                "ns",
                nsr_id,
                nslcmop_id,
                "instantiate_N2VC-{}".format(vca_index),
                task_n2vc,
            )
            task_instantiation_info[
                task_n2vc
            ] = self.task_name_deploy_vca + " {}.{}".format(
                member_vnf_index or "", vdu_id or ""
            )
+
+ @staticmethod
+ def _create_nslcmop(nsr_id, operation, params):
+ """
+ Creates a ns-lcm-opp content to be stored at database.
+ :param nsr_id: internal id of the instance
+ :param operation: instantiate, terminate, scale, action, ...
+ :param params: user parameters for the operation
+ :return: dictionary following SOL005 format
+ """
+ # Raise exception if invalid arguments
+ if not (nsr_id and operation and params):
+ raise LcmException(
+ "Parameters 'nsr_id', 'operation' and 'params' needed to create primitive not provided"
+ )
+ now = time()
+ _id = str(uuid4())
+ nslcmop = {
+ "id": _id,
+ "_id": _id,
+ # COMPLETED,PARTIALLY_COMPLETED,FAILED_TEMP,FAILED,ROLLING_BACK,ROLLED_BACK
+ "operationState": "PROCESSING",
+ "statusEnteredTime": now,
+ "nsInstanceId": nsr_id,
+ "lcmOperationType": operation,
+ "startTime": now,
+ "isAutomaticInvocation": False,
+ "operationParams": params,
+ "isCancelPending": False,
+ "links": {
+ "self": "/osm/nslcm/v1/ns_lcm_op_occs/" + _id,
+ "nsInstance": "/osm/nslcm/v1/ns_instances/" + nsr_id,
+ },
+ }
+ return nslcmop
+
+ def _format_additional_params(self, params):
+ params = params or {}
+ for key, value in params.items():
+ if str(value).startswith("!!yaml "):
+ params[key] = yaml.safe_load(value[7:])
+ return params
+
+ def _get_terminate_primitive_params(self, seq, vnf_index):
+ primitive = seq.get("name")
+ primitive_params = {}
+ params = {
+ "member_vnf_index": vnf_index,
+ "primitive": primitive,
+ "primitive_params": primitive_params,
+ }
+ desc_params = {}
+ return self._map_primitive_params(seq, params, desc_params)
+
+ # sub-operations
+
+ def _retry_or_skip_suboperation(self, db_nslcmop, op_index):
+ op = deep_get(db_nslcmop, ("_admin", "operations"), [])[op_index]
+ if op.get("operationState") == "COMPLETED":
+ # b. Skip sub-operation
+ # _ns_execute_primitive() or RO.create_action() will NOT be executed
+ return self.SUBOPERATION_STATUS_SKIP
+ else:
+ # c. retry executing sub-operation
+ # The sub-operation exists, and operationState != 'COMPLETED'
+ # Update operationState = 'PROCESSING' to indicate a retry.
+ operationState = "PROCESSING"
+ detailed_status = "In progress"
+ self._update_suboperation_status(
+ db_nslcmop, op_index, operationState, detailed_status
+ )
+ # Return the sub-operation index
+ # _ns_execute_primitive() or RO.create_action() will be called from scale()
+ # with arguments extracted from the sub-operation
+ return op_index
+
+ # Find a sub-operation where all keys in a matching dictionary must match
+ # Returns the index of the matching sub-operation, or SUBOPERATION_STATUS_NOT_FOUND if no match
+ def _find_suboperation(self, db_nslcmop, match):
+ if db_nslcmop and match:
+ op_list = db_nslcmop.get("_admin", {}).get("operations", [])
+ for i, op in enumerate(op_list):
+ if all(op.get(k) == match[k] for k in match):
+ return i
+ return self.SUBOPERATION_STATUS_NOT_FOUND
+
+ # Update status for a sub-operation given its index
+ def _update_suboperation_status(
+ self, db_nslcmop, op_index, operationState, detailed_status
+ ):
+ # Update DB for HA tasks
+ q_filter = {"_id": db_nslcmop["_id"]}
+ update_dict = {
+ "_admin.operations.{}.operationState".format(op_index): operationState,
+ "_admin.operations.{}.detailed-status".format(op_index): detailed_status,
+ }
+ self.db.set_one(
+ "nslcmops", q_filter=q_filter, update_dict=update_dict, fail_on_empty=False
+ )
+
+ # Add sub-operation, return the index of the added sub-operation
+ # Optionally, set operationState, detailed-status, and operationType
+ # Status and type are currently set for 'scale' sub-operations:
+ # 'operationState' : 'PROCESSING' | 'COMPLETED' | 'FAILED'
+ # 'detailed-status' : status message
+ # 'operationType': may be any type, in the case of scaling: 'PRE-SCALE' | 'POST-SCALE'
+ # Status and operation type are currently only used for 'scale', but NOT for 'terminate' sub-operations.
+ def _add_suboperation(
+ self,
+ db_nslcmop,
+ vnf_index,
+ vdu_id,
+ vdu_count_index,
+ vdu_name,
+ primitive,
+ mapped_primitive_params,
+ operationState=None,
+ detailed_status=None,
+ operationType=None,
+ RO_nsr_id=None,
+ RO_scaling_info=None,
+ ):
+ if not db_nslcmop:
+ return self.SUBOPERATION_STATUS_NOT_FOUND
+ # Get the "_admin.operations" list, if it exists
+ db_nslcmop_admin = db_nslcmop.get("_admin", {})
+ op_list = db_nslcmop_admin.get("operations")
+ # Create or append to the "_admin.operations" list
+ new_op = {
+ "member_vnf_index": vnf_index,
+ "vdu_id": vdu_id,
+ "vdu_count_index": vdu_count_index,
+ "primitive": primitive,
+ "primitive_params": mapped_primitive_params,
+ }
+ if operationState:
+ new_op["operationState"] = operationState
+ if detailed_status:
+ new_op["detailed-status"] = detailed_status
+ if operationType:
+ new_op["lcmOperationType"] = operationType
+ if RO_nsr_id:
+ new_op["RO_nsr_id"] = RO_nsr_id
+ if RO_scaling_info:
+ new_op["RO_scaling_info"] = RO_scaling_info
+ if not op_list:
+ # No existing operations, create key 'operations' with current operation as first list element
+ db_nslcmop_admin.update({"operations": [new_op]})
+ op_list = db_nslcmop_admin.get("operations")
+ else:
+ # Existing operations, append operation to list
+ op_list.append(new_op)
+
+ db_nslcmop_update = {"_admin.operations": op_list}
+ self.update_db_2("nslcmops", db_nslcmop["_id"], db_nslcmop_update)
+ op_index = len(op_list) - 1
+ return op_index
+
+ # Helper methods for scale() sub-operations
+
+ # pre-scale/post-scale:
+ # Check for 3 different cases:
+ # a. New: First time execution, return SUBOPERATION_STATUS_NEW
+ # b. Skip: Existing sub-operation exists, operationState == 'COMPLETED', return SUBOPERATION_STATUS_SKIP
+ # c. retry: Existing sub-operation exists, operationState != 'COMPLETED', return op_index to re-execute
+ def _check_or_add_scale_suboperation(
+ self,
+ db_nslcmop,
+ vnf_index,
+ vnf_config_primitive,
+ primitive_params,
+ operationType,
+ RO_nsr_id=None,
+ RO_scaling_info=None,
+ ):
+ # Find this sub-operation
+ if RO_nsr_id and RO_scaling_info:
+ operationType = "SCALE-RO"
+ match = {
+ "member_vnf_index": vnf_index,
+ "RO_nsr_id": RO_nsr_id,
+ "RO_scaling_info": RO_scaling_info,
+ }
+ else:
+ match = {
+ "member_vnf_index": vnf_index,
+ "primitive": vnf_config_primitive,
+ "primitive_params": primitive_params,
+ "lcmOperationType": operationType,
+ }
+ op_index = self._find_suboperation(db_nslcmop, match)
+ if op_index == self.SUBOPERATION_STATUS_NOT_FOUND:
+ # a. New sub-operation
+ # The sub-operation does not exist, add it.
+ # _ns_execute_primitive() will be called from scale() as usual, with non-modified arguments
+ # The following parameters are set to None for all kind of scaling:
+ vdu_id = None
+ vdu_count_index = None
+ vdu_name = None
+ if RO_nsr_id and RO_scaling_info:
+ vnf_config_primitive = None
+ primitive_params = None
+ else:
+ RO_nsr_id = None
+ RO_scaling_info = None
+ # Initial status for sub-operation
+ operationState = "PROCESSING"
+ detailed_status = "In progress"
+ # Add sub-operation for pre/post-scaling (zero or more operations)
+ self._add_suboperation(
+ db_nslcmop,
+ vnf_index,
+ vdu_id,
+ vdu_count_index,
+ vdu_name,
+ vnf_config_primitive,
+ primitive_params,
+ operationState,
+ detailed_status,
+ operationType,
+ RO_nsr_id,
+ RO_scaling_info,
+ )
+ return self.SUBOPERATION_STATUS_NEW
+ else:
+ # Return either SUBOPERATION_STATUS_SKIP (operationState == 'COMPLETED'),
+ # or op_index (operationState != 'COMPLETED')
+ return self._retry_or_skip_suboperation(db_nslcmop, op_index)
+
+ # Function to return execution_environment id
+
+ def _get_ee_id(self, vnf_index, vdu_id, vca_deployed_list):
+ # TODO vdu_index_count
+ for vca in vca_deployed_list:
+ if vca["member-vnf-index"] == vnf_index and vca["vdu_id"] == vdu_id:
+ return vca["ee_id"]
+
    async def destroy_N2VC(
        self,
        logging_text,
        db_nslcmop,
        vca_deployed,
        config_descriptor,
        vca_index,
        destroy_ee=True,
        exec_primitives=True,
        scaling_in=False,
        vca_id: str = None,
    ):
        """
        Execute the terminate primitives and destroy the execution environment
        (if destroy_ee=True).
        :param logging_text: prefix for every log message
        :param db_nslcmop: database content of the ns operation
        :param vca_deployed: Dictionary of deployment info at db_nsr._admin.deployed.VCA.<INDEX>
        :param config_descriptor: Configuration descriptor of the NSD, VNFD, VNFD.vdu or VNFD.kdu
        :param vca_index: index in the database _admin.deployed.VCA
        :param destroy_ee: False to not destroy the execution environment here,
            because all of them will be destroyed at once elsewhere
        :param exec_primitives: False to not execute terminate primitives, because the config is not completed or has
            not executed properly
        :param scaling_in: True destroys the application, False destroys the model
        :param vca_id: VCA identifier, forwarded to primitive execution and EE deletion
        :return: None or exception
        """

        self.logger.debug(
            logging_text
            + " vca_index: {}, vca_deployed: {}, config_descriptor: {}, destroy_ee: {}".format(
                vca_index, vca_deployed, config_descriptor, destroy_ee
            )
        )

        # Connector type selects the entry of self.vca_map used below
        vca_type = vca_deployed.get("type", "lxc_proxy_charm")

        # execute terminate_primitives
        if exec_primitives:
            terminate_primitives = get_ee_sorted_terminate_config_primitive_list(
                config_descriptor.get("terminate-config-primitive"),
                vca_deployed.get("ee_descriptor_id"),
            )
            vdu_id = vca_deployed.get("vdu_id")
            vdu_count_index = vca_deployed.get("vdu_count_index")
            vdu_name = vca_deployed.get("vdu_name")
            vnf_index = vca_deployed.get("member-vnf-index")
            # Only run the primitives when the VCA was flagged as needing termination
            if terminate_primitives and vca_deployed.get("needed_terminate"):
                for seq in terminate_primitives:
                    # For each sequence in list, get primitive and call _ns_execute_primitive()
                    step = "Calling terminate action for vnf_member_index={} primitive={}".format(
                        vnf_index, seq.get("name")
                    )
                    self.logger.debug(logging_text + step)
                    # Create the primitive for each sequence, i.e. "primitive": "touch"
                    primitive = seq.get("name")
                    mapped_primitive_params = self._get_terminate_primitive_params(
                        seq, vnf_index
                    )

                    # Add sub-operation
                    self._add_suboperation(
                        db_nslcmop,
                        vnf_index,
                        vdu_id,
                        vdu_count_index,
                        vdu_name,
                        primitive,
                        mapped_primitive_params,
                    )
                    # Sub-operations: Call _ns_execute_primitive() instead of action()
                    try:
                        result, result_detail = await self._ns_execute_primitive(
                            vca_deployed["ee_id"],
                            primitive,
                            mapped_primitive_params,
                            vca_type=vca_type,
                            vca_id=vca_id,
                        )
                    except LcmException:
                        # this happens when VCA is not deployed. In this case it is not needed to terminate
                        continue
                    result_ok = ["COMPLETED", "PARTIALLY_COMPLETED"]
                    if result not in result_ok:
                        raise LcmException(
                            "terminate_primitive {} for vnf_member_index={} fails with "
                            "error {}".format(seq.get("name"), vnf_index, result_detail)
                        )
                # set that this VCA do not need terminated
                db_update_entry = "_admin.deployed.VCA.{}.needed_terminate".format(
                    vca_index
                )
                self.update_db_2(
                    "nsrs", db_nslcmop["nsInstanceId"], {db_update_entry: False}
                )

        # Remove the Prometheus scrape jobs that were registered for this VCA
        if vca_deployed.get("prometheus_jobs") and self.prometheus:
            await self.prometheus.update(remove_jobs=vca_deployed["prometheus_jobs"])

        # Finally tear down the execution environment itself (unless deferred)
        if destroy_ee:
            await self.vca_map[vca_type].delete_execution_environment(
                vca_deployed["ee_id"],
                scaling_in=scaling_in,
                vca_id=vca_id,
            )
+
+ async def _delete_all_N2VC(self, db_nsr: dict, vca_id: str = None):
+ self._write_all_config_status(db_nsr=db_nsr, status="TERMINATING")
+ namespace = "." + db_nsr["_id"]
+ try:
+ await self.n2vc.delete_namespace(
+ namespace=namespace,
+ total_timeout=self.timeout_charm_delete,
+ vca_id=vca_id,
+ )
+ except N2VCNotFound: # already deleted. Skip
+ pass
+ self._write_all_config_status(db_nsr=db_nsr, status="DELETED")
+
+ async def _terminate_RO(
+ self, logging_text, nsr_deployed, nsr_id, nslcmop_id, stage
+ ):
+ """
+ Terminates a deployment from RO
+ :param logging_text:
+ :param nsr_deployed: db_nsr._admin.deployed
+ :param nsr_id:
+ :param nslcmop_id:
+ :param stage: list of string with the content to write on db_nslcmop.detailed-status.
+ this method will update only the index 2, but it will write on database the concatenated content of the list
+ :return:
+ """
+ db_nsr_update = {}
+ failed_detail = []
+ ro_nsr_id = ro_delete_action = None
+ if nsr_deployed and nsr_deployed.get("RO"):
+ ro_nsr_id = nsr_deployed["RO"].get("nsr_id")
+ ro_delete_action = nsr_deployed["RO"].get("nsr_delete_action_id")
+ try:
+ if ro_nsr_id:
+ stage[2] = "Deleting ns from VIM."
+ db_nsr_update["detailed-status"] = " ".join(stage)
+ self._write_op_status(nslcmop_id, stage)
+ self.logger.debug(logging_text + stage[2])
+ self.update_db_2("nsrs", nsr_id, db_nsr_update)
+ self._write_op_status(nslcmop_id, stage)
+ desc = await self.RO.delete("ns", ro_nsr_id)
+ ro_delete_action = desc["action_id"]
+ db_nsr_update[
+ "_admin.deployed.RO.nsr_delete_action_id"
+ ] = ro_delete_action
+ db_nsr_update["_admin.deployed.RO.nsr_id"] = None
+ db_nsr_update["_admin.deployed.RO.nsr_status"] = "DELETED"
+ if ro_delete_action:
+ # wait until NS is deleted from VIM
+ stage[2] = "Waiting ns deleted from VIM."
+ detailed_status_old = None
+ self.logger.debug(
+ logging_text
+ + stage[2]
+ + " RO_id={} ro_delete_action={}".format(
+ ro_nsr_id, ro_delete_action
+ )
+ )
+ self.update_db_2("nsrs", nsr_id, db_nsr_update)
+ self._write_op_status(nslcmop_id, stage)
+
+ delete_timeout = 20 * 60 # 20 minutes
+ while delete_timeout > 0:
+ desc = await self.RO.show(
+ "ns",
+ item_id_name=ro_nsr_id,
+ extra_item="action",
+ extra_item_id=ro_delete_action,
+ )
+
+ # deploymentStatus
+ self._on_update_ro_db(nsrs_id=nsr_id, ro_descriptor=desc)
+
+ ns_status, ns_status_info = self.RO.check_action_status(desc)
+ if ns_status == "ERROR":
+ raise ROclient.ROClientException(ns_status_info)
+ elif ns_status == "BUILD":
+ stage[2] = "Deleting from VIM {}".format(ns_status_info)
+ elif ns_status == "ACTIVE":
+ db_nsr_update["_admin.deployed.RO.nsr_delete_action_id"] = None
+ db_nsr_update["_admin.deployed.RO.nsr_status"] = "DELETED"
+ break
+ else:
+ assert (
+ False
+ ), "ROclient.check_action_status returns unknown {}".format(
+ ns_status
+ )
+ if stage[2] != detailed_status_old:
+ detailed_status_old = stage[2]
+ db_nsr_update["detailed-status"] = " ".join(stage)
+ self._write_op_status(nslcmop_id, stage)
+ self.update_db_2("nsrs", nsr_id, db_nsr_update)
+ await asyncio.sleep(5, loop=self.loop)
+ delete_timeout -= 5
+ else: # delete_timeout <= 0:
+ raise ROclient.ROClientException(
+ "Timeout waiting ns deleted from VIM"
+ )