) # Time for charm from first time at blocked,error status to mark as failed
timeout_ns_deploy = 2 * 3600 # default global timeout for deployment a ns
timeout_ns_terminate = 1800 # default global timeout for un deployment a ns
+ timeout_ns_heal = 1800 # default global timeout for un deployment a ns
timeout_charm_delete = 10 * 60
timeout_primitive = 30 * 60 # timeout for primitive execution
timeout_ns_update = 30 * 60 # timeout for ns update
# create RO client
self.RO = NgRoClient(self.loop, **self.ro_config)
+ self.op_status_map = {
+ "instantiation": self.RO.status,
+ "termination": self.RO.status,
+ "migrate": self.RO.status,
+ "healing": self.RO.recreate_status,
+ }
+
@staticmethod
def increment_ip_mac(ip_mac, vm_index=1):
if not isinstance(ip_mac, str):
target_vnf["vdur"] = vdur_list
target["vnf"].append(target_vnf)
+ self.logger.debug("Send to RO > nsr_id={} target={}".format(nsr_id, target))
desc = await self.RO.deploy(nsr_id, target)
self.logger.debug("RO return > {}".format(desc))
action_id = desc["action_id"]
await self._wait_ng_ro(
- nsr_id, action_id, nslcmop_id, start_deploy, timeout_ns_deploy, stage
+ nsr_id, action_id, nslcmop_id, start_deploy, timeout_ns_deploy, stage,
+ operation="instantiation"
)
# Updating NSR
start_time=None,
timeout=600,
stage=None,
+ operation=None,
):
detailed_status_old = None
db_nsr_update = {}
start_time = start_time or time()
while time() <= start_time + timeout:
- desc_status = await self.RO.status(nsr_id, action_id)
+ desc_status = await self.op_status_map[operation](nsr_id, action_id)
self.logger.debug("Wait NG RO > {}".format(desc_status))
if desc_status["status"] == "FAILED":
raise NgRoException(desc_status["details"])
# wait until done
delete_timeout = 20 * 60 # 20 minutes
await self._wait_ng_ro(
- nsr_id, action_id, nslcmop_id, start_deploy, delete_timeout, stage
+ nsr_id, action_id, nslcmop_id, start_deploy, delete_timeout, stage,
+ operation="termination"
)
db_nsr_update["_admin.deployed.RO.nsr_delete_action_id"] = None
}
desc = await self.RO.deploy(nsr_id, target)
action_id = desc["action_id"]
- await self._wait_ng_ro(nsr_id, action_id, timeout=600)
+ await self._wait_ng_ro(nsr_id, action_id, timeout=600, operation="instantiation")
break
else:
# wait until NS is deployed at RO
self.logger.debug("RO return > {}".format(desc))
action_id = desc["action_id"]
await self._wait_ng_ro(
- nsr_id, action_id, nslcmop_id, start_deploy, self.timeout_migrate
+ nsr_id, action_id, nslcmop_id, start_deploy, self.timeout_migrate,
+ operation="migrate"
)
except (ROclient.ROClientException, DbException, LcmException) as e:
self.logger.error("Exit Exception {}".format(e))
)
self.logger.debug(logging_text + "Exit")
self.lcm_tasks.remove("ns", nsr_id, nslcmop_id, "ns_migrate")
+
+
+ async def heal(self, nsr_id, nslcmop_id):
+ """
+ Heal NS
+
+ :param nsr_id: ns instance to heal
+ :param nslcmop_id: operation to run
+ :return:
+ """
+
+ # Try to lock HA task here
+ task_is_locked_by_me = self.lcm_tasks.lock_HA("ns", "nslcmops", nslcmop_id)
+ if not task_is_locked_by_me:
+ return
+
+ logging_text = "Task ns={} heal={} ".format(nsr_id, nslcmop_id)
+ stage = ["", "", ""]
+ tasks_dict_info = {}
+ # ^ stage, step, VIM progress
+ self.logger.debug(logging_text + "Enter")
+ # get all needed from database
+ db_nsr = None
+ db_nslcmop_update = {}
+ db_nsr_update = {}
+ db_vnfrs = {} # vnf's info indexed by _id
+ exc = None
+ old_operational_status = ""
+ old_config_status = ""
+ nsi_id = None
+ try:
+ # wait for any previous tasks in process
+ step = "Waiting for previous operations to terminate"
+ await self.lcm_tasks.waitfor_related_HA("ns", "nslcmops", nslcmop_id)
+ self._write_ns_status(
+ nsr_id=nsr_id,
+ ns_state=None,
+ current_operation="HEALING",
+ current_operation_id=nslcmop_id,
+ )
+
+ step = "Getting nslcmop from database"
+ self.logger.debug(
+ step + " after having waited for previous tasks to be completed"
+ )
+ db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})
+
+ step = "Getting nsr from database"
+ db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
+ old_operational_status = db_nsr["operational-status"]
+ old_config_status = db_nsr["config-status"]
+
+ db_nsr_update = {
+ "_admin.deployed.RO.operational-status": "healing",
+ }
+ self.update_db_2("nsrs", nsr_id, db_nsr_update)
+
+ step = "Sending heal order to VIM"
+ task_ro = asyncio.ensure_future(
+ self.heal_RO(
+ logging_text=logging_text,
+ nsr_id=nsr_id,
+ db_nslcmop=db_nslcmop,
+ stage=stage,
+ )
+ )
+ self.lcm_tasks.register("ns", nsr_id, nslcmop_id, "heal_RO", task_ro)
+ tasks_dict_info[task_ro] = "Healing at VIM"
+
+ # VCA tasks
+ # read from db: nsd
+ stage[1] = "Getting nsd={} from db.".format(db_nsr["nsd-id"])
+ self.logger.debug(logging_text + stage[1])
+ nsd = self.db.get_one("nsds", {"_id": db_nsr["nsd-id"]})
+ self.fs.sync(db_nsr["nsd-id"])
+ db_nsr["nsd"] = nsd
+ # read from db: vnfr's of this ns
+ step = "Getting vnfrs from db"
+ db_vnfrs_list = self.db.get_list("vnfrs", {"nsr-id-ref": nsr_id})
+ for vnfr in db_vnfrs_list:
+ db_vnfrs[vnfr["_id"]] = vnfr
+ self.logger.debug("ns.heal db_vnfrs={}".format(db_vnfrs))
+
+ # Check for each target VNF
+ target_list = db_nslcmop.get("operationParams", {}).get("healVnfData", {})
+ for target_vnf in target_list:
+ # Find this VNF in the list from DB
+ vnfr_id = target_vnf.get("vnfInstanceId", None)
+ if vnfr_id:
+ db_vnfr = db_vnfrs[vnfr_id]
+ vnfd_id = db_vnfr.get("vnfd-id")
+ vnfd_ref = db_vnfr.get("vnfd-ref")
+ vnfd = self.db.get_one("vnfds", {"_id": vnfd_id})
+ base_folder = vnfd["_admin"]["storage"]
+ vdu_id = None
+ vdu_index = 0
+ vdu_name = None
+ kdu_name = None
+ nsi_id = None # TODO put nsi_id when this nsr belongs to a NSI
+ member_vnf_index = db_vnfr.get("member-vnf-index-ref")
+
+ # Check each target VDU and deploy N2VC
+ for target_vdu in target_vnf["additionalParams"].get("vdu", None):
+ deploy_params_vdu = target_vdu
+ # Set run-day1 vnf level value if not vdu level value exists
+ if not deploy_params_vdu.get("run-day1") and target_vnf["additionalParams"].get("run-day1"):
+ deploy_params_vdu["run-day1"] = target_vnf["additionalParams"].get("run-day1")
+ vdu_name = target_vdu.get("vdu-id", None)
+ # TODO: Get vdu_id from vdud.
+ vdu_id = vdu_name
+ # For multi instance VDU count-index is mandatory
+ # For single session VDU count-indes is 0
+ vdu_index = target_vdu.get("count-index",0)
+
+ # n2vc_redesign STEP 3 to 6 Deploy N2VC
+ stage[1] = "Deploying Execution Environments."
+ self.logger.debug(logging_text + stage[1])
+
+ # VNF Level charm. Normal case when proxy charms.
+ # If target instance is management machine continue with actions: recreate EE for native charms or reinject juju key for proxy charms.
+ descriptor_config = get_configuration(vnfd, vnfd_ref)
+ if descriptor_config:
+ # Continue if healed machine is management machine
+ vnf_ip_address = db_vnfr.get("ip-address")
+ target_instance = None
+ for instance in db_vnfr.get("vdur", None):
+ if ( instance["vdu-name"] == vdu_name and instance["count-index"] == vdu_index ):
+ target_instance = instance
+ break
+ if vnf_ip_address == target_instance.get("ip-address"):
+ self._heal_n2vc(
+ logging_text=logging_text
+ + "member_vnf_index={}, vdu_name={}, vdu_index={} ".format(
+ member_vnf_index, vdu_name, vdu_index
+ ),
+ db_nsr=db_nsr,
+ db_vnfr=db_vnfr,
+ nslcmop_id=nslcmop_id,
+ nsr_id=nsr_id,
+ nsi_id=nsi_id,
+ vnfd_id=vnfd_ref,
+ vdu_id=None,
+ kdu_name=None,
+ member_vnf_index=member_vnf_index,
+ vdu_index=0,
+ vdu_name=None,
+ deploy_params=deploy_params_vdu,
+ descriptor_config=descriptor_config,
+ base_folder=base_folder,
+ task_instantiation_info=tasks_dict_info,
+ stage=stage,
+ )
+
+ # VDU Level charm. Normal case with native charms.
+ descriptor_config = get_configuration(vnfd, vdu_name)
+ if descriptor_config:
+ self._heal_n2vc(
+ logging_text=logging_text
+ + "member_vnf_index={}, vdu_name={}, vdu_index={} ".format(
+ member_vnf_index, vdu_name, vdu_index
+ ),
+ db_nsr=db_nsr,
+ db_vnfr=db_vnfr,
+ nslcmop_id=nslcmop_id,
+ nsr_id=nsr_id,
+ nsi_id=nsi_id,
+ vnfd_id=vnfd_ref,
+ vdu_id=vdu_id,
+ kdu_name=kdu_name,
+ member_vnf_index=member_vnf_index,
+ vdu_index=vdu_index,
+ vdu_name=vdu_name,
+ deploy_params=deploy_params_vdu,
+ descriptor_config=descriptor_config,
+ base_folder=base_folder,
+ task_instantiation_info=tasks_dict_info,
+ stage=stage,
+ )
+
+ except (
+ ROclient.ROClientException,
+ DbException,
+ LcmException,
+ NgRoException,
+ ) as e:
+ self.logger.error(logging_text + "Exit Exception {}".format(e))
+ exc = e
+ except asyncio.CancelledError:
+ self.logger.error(
+ logging_text + "Cancelled Exception while '{}'".format(step)
+ )
+ exc = "Operation was cancelled"
+ except Exception as e:
+ exc = traceback.format_exc()
+ self.logger.critical(
+ logging_text + "Exit Exception {} {}".format(type(e).__name__, e),
+ exc_info=True,
+ )
+ finally:
+ if tasks_dict_info:
+ stage[1] = "Waiting for healing pending tasks."
+ self.logger.debug(logging_text + stage[1])
+ exc = await self._wait_for_tasks(
+ logging_text,
+ tasks_dict_info,
+ self.timeout_ns_deploy,
+ stage,
+ nslcmop_id,
+ nsr_id=nsr_id,
+ )
+ if exc:
+ db_nslcmop_update[
+ "detailed-status"
+ ] = error_description_nslcmop = "FAILED {}: {}".format(step, exc)
+ nslcmop_operation_state = "FAILED"
+ if db_nsr:
+ db_nsr_update["operational-status"] = old_operational_status
+ db_nsr_update["config-status"] = old_config_status
+ db_nsr_update[
+ "detailed-status"
+ ] = "FAILED healing nslcmop={} {}: {}".format(
+ nslcmop_id, step, exc
+ )
+ for task, task_name in tasks_dict_info.items():
+ if not task.done() or task.cancelled() or task.exception():
+ if task_name.startswith(self.task_name_deploy_vca):
+ # A N2VC task is pending
+ db_nsr_update["config-status"] = "failed"
+ else:
+ # RO task is pending
+ db_nsr_update["operational-status"] = "failed"
+ else:
+ error_description_nslcmop = None
+ nslcmop_operation_state = "COMPLETED"
+ db_nslcmop_update["detailed-status"] = "Done"
+ db_nsr_update["detailed-status"] = "Done"
+ db_nsr_update["operational-status"] = "running"
+ db_nsr_update["config-status"] = "configured"
+
+ self._write_op_status(
+ op_id=nslcmop_id,
+ stage="",
+ error_message=error_description_nslcmop,
+ operation_state=nslcmop_operation_state,
+ other_update=db_nslcmop_update,
+ )
+ if db_nsr:
+ self._write_ns_status(
+ nsr_id=nsr_id,
+ ns_state=None,
+ current_operation="IDLE",
+ current_operation_id=None,
+ other_update=db_nsr_update,
+ )
+
+ if nslcmop_operation_state:
+ try:
+ msg = {
+ "nsr_id": nsr_id,
+ "nslcmop_id": nslcmop_id,
+ "operationState": nslcmop_operation_state,
+ }
+ await self.msg.aiowrite("ns", "healed", msg, loop=self.loop)
+ except Exception as e:
+ self.logger.error(
+ logging_text + "kafka_write notification Exception {}".format(e)
+ )
+ self.logger.debug(logging_text + "Exit")
+ self.lcm_tasks.remove("ns", nsr_id, nslcmop_id, "ns_heal")
+
+ async def heal_RO(
+ self,
+ logging_text,
+ nsr_id,
+ db_nslcmop,
+ stage,
+ ):
+ """
+ Heal at RO
+ :param logging_text: preffix text to use at logging
+ :param nsr_id: nsr identity
+ :param db_nslcmop: database content of ns operation, in this case, 'instantiate'
+ :param stage: list with 3 items: [general stage, tasks, vim_specific]. This task will write over vim_specific
+ :return: None or exception
+ """
+ def get_vim_account(vim_account_id):
+ nonlocal db_vims
+ if vim_account_id in db_vims:
+ return db_vims[vim_account_id]
+ db_vim = self.db.get_one("vim_accounts", {"_id": vim_account_id})
+ db_vims[vim_account_id] = db_vim
+ return db_vim
+
+ try:
+ start_heal = time()
+ ns_params = db_nslcmop.get("operationParams")
+ if ns_params and ns_params.get("timeout_ns_heal"):
+ timeout_ns_heal = ns_params["timeout_ns_heal"]
+ else:
+ timeout_ns_heal = self.timeout.get(
+ "ns_heal", self.timeout_ns_heal
+ )
+
+ db_vims = {}
+
+ nslcmop_id = db_nslcmop["_id"]
+ target = {
+ "action_id": nslcmop_id,
+ }
+ self.logger.warning("db_nslcmop={} and timeout_ns_heal={}".format(db_nslcmop,timeout_ns_heal))
+ target.update(db_nslcmop.get("operationParams", {}))
+
+ self.logger.debug("Send to RO > nsr_id={} target={}".format(nsr_id, target))
+ desc = await self.RO.recreate(nsr_id, target)
+ self.logger.debug("RO return > {}".format(desc))
+ action_id = desc["action_id"]
+ # waits for RO to complete because Reinjecting juju key at ro can find VM in state Deleted
+ await self._wait_ng_ro(
+ nsr_id, action_id, nslcmop_id, start_heal, timeout_ns_heal, stage,
+ operation="healing"
+ )
+
+ # Updating NSR
+ db_nsr_update = {
+ "_admin.deployed.RO.operational-status": "running",
+ "detailed-status": " ".join(stage),
+ }
+ self.update_db_2("nsrs", nsr_id, db_nsr_update)
+ self._write_op_status(nslcmop_id, stage)
+ self.logger.debug(
+ logging_text + "ns healed at RO. RO_id={}".format(action_id)
+ )
+
+ except Exception as e:
+ stage[2] = "ERROR healing at VIM"
+ #self.set_vnfr_at_error(db_vnfrs, str(e))
+ self.logger.error(
+ "Error healing at VIM {}".format(e),
+ exc_info=not isinstance(
+ e,
+ (
+ ROclient.ROClientException,
+ LcmException,
+ DbException,
+ NgRoException,
+ ),
+ ),
+ )
+ raise
+
+ def _heal_n2vc(
+ self,
+ logging_text,
+ db_nsr,
+ db_vnfr,
+ nslcmop_id,
+ nsr_id,
+ nsi_id,
+ vnfd_id,
+ vdu_id,
+ kdu_name,
+ member_vnf_index,
+ vdu_index,
+ vdu_name,
+ deploy_params,
+ descriptor_config,
+ base_folder,
+ task_instantiation_info,
+ stage,
+ ):
+ # launch instantiate_N2VC in a asyncio task and register task object
+ # Look where information of this charm is at database <nsrs>._admin.deployed.VCA
+ # if not found, create one entry and update database
+ # fill db_nsr._admin.deployed.VCA.<index>
+
+ self.logger.debug(
+ logging_text + "_deploy_n2vc vnfd_id={}, vdu_id={}".format(vnfd_id, vdu_id)
+ )
+ if "execution-environment-list" in descriptor_config:
+ ee_list = descriptor_config.get("execution-environment-list", [])
+ elif "juju" in descriptor_config:
+ ee_list = [descriptor_config] # ns charms
+ else: # other types as script are not supported
+ ee_list = []
+
+ for ee_item in ee_list:
+ self.logger.debug(
+ logging_text
+ + "_deploy_n2vc ee_item juju={}, helm={}".format(
+ ee_item.get("juju"), ee_item.get("helm-chart")
+ )
+ )
+ ee_descriptor_id = ee_item.get("id")
+ if ee_item.get("juju"):
+ vca_name = ee_item["juju"].get("charm")
+ vca_type = (
+ "lxc_proxy_charm"
+ if ee_item["juju"].get("charm") is not None
+ else "native_charm"
+ )
+ if ee_item["juju"].get("cloud") == "k8s":
+ vca_type = "k8s_proxy_charm"
+ elif ee_item["juju"].get("proxy") is False:
+ vca_type = "native_charm"
+ elif ee_item.get("helm-chart"):
+ vca_name = ee_item["helm-chart"]
+ if ee_item.get("helm-version") and ee_item.get("helm-version") == "v2":
+ vca_type = "helm"
+ else:
+ vca_type = "helm-v3"
+ else:
+ self.logger.debug(
+ logging_text + "skipping non juju neither charm configuration"
+ )
+ continue
+
+ vca_index = -1
+ for vca_index, vca_deployed in enumerate(
+ db_nsr["_admin"]["deployed"]["VCA"]
+ ):
+ if not vca_deployed:
+ continue
+ if (
+ vca_deployed.get("member-vnf-index") == member_vnf_index
+ and vca_deployed.get("vdu_id") == vdu_id
+ and vca_deployed.get("kdu_name") == kdu_name
+ and vca_deployed.get("vdu_count_index", 0) == vdu_index
+ and vca_deployed.get("ee_descriptor_id") == ee_descriptor_id
+ ):
+ break
+ else:
+ # not found, create one.
+ target = (
+ "ns" if not member_vnf_index else "vnf/{}".format(member_vnf_index)
+ )
+ if vdu_id:
+ target += "/vdu/{}/{}".format(vdu_id, vdu_index or 0)
+ elif kdu_name:
+ target += "/kdu/{}".format(kdu_name)
+ vca_deployed = {
+ "target_element": target,
+ # ^ target_element will replace member-vnf-index, kdu_name, vdu_id ... in a single string
+ "member-vnf-index": member_vnf_index,
+ "vdu_id": vdu_id,
+ "kdu_name": kdu_name,
+ "vdu_count_index": vdu_index,
+ "operational-status": "init", # TODO revise
+ "detailed-status": "", # TODO revise
+ "step": "initial-deploy", # TODO revise
+ "vnfd_id": vnfd_id,
+ "vdu_name": vdu_name,
+ "type": vca_type,
+ "ee_descriptor_id": ee_descriptor_id,
+ }
+ vca_index += 1
+
+ # create VCA and configurationStatus in db
+ db_dict = {
+ "_admin.deployed.VCA.{}".format(vca_index): vca_deployed,
+ "configurationStatus.{}".format(vca_index): dict(),
+ }
+ self.update_db_2("nsrs", nsr_id, db_dict)
+
+ db_nsr["_admin"]["deployed"]["VCA"].append(vca_deployed)
+
+ self.logger.debug("N2VC > NSR_ID > {}".format(nsr_id))
+ self.logger.debug("N2VC > DB_NSR > {}".format(db_nsr))
+ self.logger.debug("N2VC > VCA_DEPLOYED > {}".format(vca_deployed))
+
+ # Launch task
+ task_n2vc = asyncio.ensure_future(
+ self.heal_N2VC(
+ logging_text=logging_text,
+ vca_index=vca_index,
+ nsi_id=nsi_id,
+ db_nsr=db_nsr,
+ db_vnfr=db_vnfr,
+ vdu_id=vdu_id,
+ kdu_name=kdu_name,
+ vdu_index=vdu_index,
+ deploy_params=deploy_params,
+ config_descriptor=descriptor_config,
+ base_folder=base_folder,
+ nslcmop_id=nslcmop_id,
+ stage=stage,
+ vca_type=vca_type,
+ vca_name=vca_name,
+ ee_config_descriptor=ee_item,
+ )
+ )
+ self.lcm_tasks.register(
+ "ns",
+ nsr_id,
+ nslcmop_id,
+ "instantiate_N2VC-{}".format(vca_index),
+ task_n2vc,
+ )
+ task_instantiation_info[
+ task_n2vc
+ ] = self.task_name_deploy_vca + " {}.{}".format(
+ member_vnf_index or "", vdu_id or ""
+ )
+
+ async def heal_N2VC(
+ self,
+ logging_text,
+ vca_index,
+ nsi_id,
+ db_nsr,
+ db_vnfr,
+ vdu_id,
+ kdu_name,
+ vdu_index,
+ config_descriptor,
+ deploy_params,
+ base_folder,
+ nslcmop_id,
+ stage,
+ vca_type,
+ vca_name,
+ ee_config_descriptor,
+ ):
+ nsr_id = db_nsr["_id"]
+ db_update_entry = "_admin.deployed.VCA.{}.".format(vca_index)
+ vca_deployed_list = db_nsr["_admin"]["deployed"]["VCA"]
+ vca_deployed = db_nsr["_admin"]["deployed"]["VCA"][vca_index]
+ osm_config = {"osm": {"ns_id": db_nsr["_id"]}}
+ db_dict = {
+ "collection": "nsrs",
+ "filter": {"_id": nsr_id},
+ "path": db_update_entry,
+ }
+ step = ""
+ try:
+
+ element_type = "NS"
+ element_under_configuration = nsr_id
+
+ vnfr_id = None
+ if db_vnfr:
+ vnfr_id = db_vnfr["_id"]
+ osm_config["osm"]["vnf_id"] = vnfr_id
+
+ namespace = "{nsi}.{ns}".format(nsi=nsi_id if nsi_id else "", ns=nsr_id)
+
+ if vca_type == "native_charm":
+ index_number = 0
+ else:
+ index_number = vdu_index or 0
+
+ if vnfr_id:
+ element_type = "VNF"
+ element_under_configuration = vnfr_id
+ namespace += ".{}-{}".format(vnfr_id, index_number)
+ if vdu_id:
+ namespace += ".{}-{}".format(vdu_id, index_number)
+ element_type = "VDU"
+ element_under_configuration = "{}-{}".format(vdu_id, index_number)
+ osm_config["osm"]["vdu_id"] = vdu_id
+ elif kdu_name:
+ namespace += ".{}".format(kdu_name)
+ element_type = "KDU"
+ element_under_configuration = kdu_name
+ osm_config["osm"]["kdu_name"] = kdu_name
+
+ # Get artifact path
+ if base_folder["pkg-dir"]:
+ artifact_path = "{}/{}/{}/{}".format(
+ base_folder["folder"],
+ base_folder["pkg-dir"],
+ "charms"
+ if vca_type
+ in ("native_charm", "lxc_proxy_charm", "k8s_proxy_charm")
+ else "helm-charts",
+ vca_name,
+ )
+ else:
+ artifact_path = "{}/Scripts/{}/{}/".format(
+ base_folder["folder"],
+ "charms"
+ if vca_type
+ in ("native_charm", "lxc_proxy_charm", "k8s_proxy_charm")
+ else "helm-charts",
+ vca_name,
+ )
+
+ self.logger.debug("Artifact path > {}".format(artifact_path))
+
+ # get initial_config_primitive_list that applies to this element
+ initial_config_primitive_list = config_descriptor.get(
+ "initial-config-primitive"
+ )
+
+ self.logger.debug(
+ "Initial config primitive list > {}".format(
+ initial_config_primitive_list
+ )
+ )
+
+ # add config if not present for NS charm
+ ee_descriptor_id = ee_config_descriptor.get("id")
+ self.logger.debug("EE Descriptor > {}".format(ee_descriptor_id))
+ initial_config_primitive_list = get_ee_sorted_initial_config_primitive_list(
+ initial_config_primitive_list, vca_deployed, ee_descriptor_id
+ )
+
+ self.logger.debug(
+ "Initial config primitive list #2 > {}".format(
+ initial_config_primitive_list
+ )
+ )
+ # n2vc_redesign STEP 3.1
+ # find old ee_id if exists
+ ee_id = vca_deployed.get("ee_id")
+
+ vca_id = self.get_vca_id(db_vnfr, db_nsr)
+ # create or register execution environment in VCA. Only for native charms when healing
+ if vca_type == "native_charm":
+ step = "Waiting to VM being up and getting IP address"
+ self.logger.debug(logging_text + step)
+ rw_mgmt_ip = await self.wait_vm_up_insert_key_ro(
+ logging_text,
+ nsr_id,
+ vnfr_id,
+ vdu_id,
+ vdu_index,
+ user=None,
+ pub_key=None,
+ )
+ credentials = {"hostname": rw_mgmt_ip}
+ # get username
+ username = deep_get(
+ config_descriptor, ("config-access", "ssh-access", "default-user")
+ )
+ # TODO remove this when changes on IM regarding config-access:ssh-access:default-user were
+ # merged. Meanwhile let's get username from initial-config-primitive
+ if not username and initial_config_primitive_list:
+ for config_primitive in initial_config_primitive_list:
+ for param in config_primitive.get("parameter", ()):
+ if param["name"] == "ssh-username":
+ username = param["value"]
+ break
+ if not username:
+ raise LcmException(
+ "Cannot determine the username neither with 'initial-config-primitive' nor with "
+ "'config-access.ssh-access.default-user'"
+ )
+ credentials["username"] = username
+
+ # n2vc_redesign STEP 3.2
+ # TODO: Before healing at RO it is needed to destroy native charm units to be deleted.
+ self._write_configuration_status(
+ nsr_id=nsr_id,
+ vca_index=vca_index,
+ status="REGISTERING",
+ element_under_configuration=element_under_configuration,
+ element_type=element_type,
+ )
+
+ step = "register execution environment {}".format(credentials)
+ self.logger.debug(logging_text + step)
+ ee_id = await self.vca_map[vca_type].register_execution_environment(
+ credentials=credentials,
+ namespace=namespace,
+ db_dict=db_dict,
+ vca_id=vca_id,
+ )
+
+ # update ee_id en db
+ db_dict_ee_id = {
+ "_admin.deployed.VCA.{}.ee_id".format(vca_index): ee_id,
+ }
+ self.update_db_2("nsrs", nsr_id, db_dict_ee_id)
+
+ # for compatibility with MON/POL modules, the need model and application name at database
+ # TODO ask MON/POL if needed to not assuming anymore the format "model_name.application_name"
+ # Not sure if this need to be done when healing
+ """
+ ee_id_parts = ee_id.split(".")
+ db_nsr_update = {db_update_entry + "ee_id": ee_id}
+ if len(ee_id_parts) >= 2:
+ model_name = ee_id_parts[0]
+ application_name = ee_id_parts[1]
+ db_nsr_update[db_update_entry + "model"] = model_name
+ db_nsr_update[db_update_entry + "application"] = application_name
+ """
+
+ # n2vc_redesign STEP 3.3
+ # Install configuration software. Only for native charms.
+ step = "Install configuration Software"
+
+ self._write_configuration_status(
+ nsr_id=nsr_id,
+ vca_index=vca_index,
+ status="INSTALLING SW",
+ element_under_configuration=element_under_configuration,
+ element_type=element_type,
+ #other_update=db_nsr_update,
+ other_update=None,
+ )
+
+ # TODO check if already done
+ self.logger.debug(logging_text + step)
+ config = None
+ if vca_type == "native_charm":
+ config_primitive = next(
+ (p for p in initial_config_primitive_list if p["name"] == "config"),
+ None,
+ )
+ if config_primitive:
+ config = self._map_primitive_params(
+ config_primitive, {}, deploy_params
+ )
+ await self.vca_map[vca_type].install_configuration_sw(
+ ee_id=ee_id,
+ artifact_path=artifact_path,
+ db_dict=db_dict,
+ config=config,
+ num_units=1,
+ vca_id=vca_id,
+ vca_type=vca_type,
+ )
+
+ # write in db flag of configuration_sw already installed
+ self.update_db_2(
+ "nsrs", nsr_id, {db_update_entry + "config_sw_installed": True}
+ )
+
+ # Not sure if this need to be done when healing
+ """
+ # add relations for this VCA (wait for other peers related with this VCA)
+ await self._add_vca_relations(
+ logging_text=logging_text,
+ nsr_id=nsr_id,
+ vca_type=vca_type,
+ vca_index=vca_index,
+ )
+ """
+
+ # if SSH access is required, then get execution environment SSH public
+ # if native charm we have waited already to VM be UP
+ if vca_type in ("k8s_proxy_charm", "lxc_proxy_charm", "helm", "helm-v3"):
+ pub_key = None
+ user = None
+ # self.logger.debug("get ssh key block")
+ if deep_get(
+ config_descriptor, ("config-access", "ssh-access", "required")
+ ):
+ # self.logger.debug("ssh key needed")
+ # Needed to inject a ssh key
+ user = deep_get(
+ config_descriptor,
+ ("config-access", "ssh-access", "default-user"),
+ )
+ step = "Install configuration Software, getting public ssh key"
+ pub_key = await self.vca_map[vca_type].get_ee_ssh_public__key(
+ ee_id=ee_id, db_dict=db_dict, vca_id=vca_id
+ )
+
+ step = "Insert public key into VM user={} ssh_key={}".format(
+ user, pub_key
+ )
+ else:
+ # self.logger.debug("no need to get ssh key")
+ step = "Waiting to VM being up and getting IP address"
+ self.logger.debug(logging_text + step)
+
+ # n2vc_redesign STEP 5.1
+ # wait for RO (ip-address) Insert pub_key into VM
+ # IMPORTANT: We need do wait for RO to complete healing operation.
+ await self._wait_heal_ro(nsr_id,self.timeout_ns_heal)
+ if vnfr_id:
+ if kdu_name:
+ rw_mgmt_ip = await self.wait_kdu_up(
+ logging_text, nsr_id, vnfr_id, kdu_name
+ )
+ else:
+ rw_mgmt_ip = await self.wait_vm_up_insert_key_ro(
+ logging_text,
+ nsr_id,
+ vnfr_id,
+ vdu_id,
+ vdu_index,
+ user=user,
+ pub_key=pub_key,
+ )
+ else:
+ rw_mgmt_ip = None # This is for a NS configuration
+
+ self.logger.debug(logging_text + " VM_ip_address={}".format(rw_mgmt_ip))
+
+ # store rw_mgmt_ip in deploy params for later replacement
+ deploy_params["rw_mgmt_ip"] = rw_mgmt_ip
+
+ # Day1 operations.
+ # get run-day1 operation parameter
+ runDay1 = deploy_params.get("run-day1",False)
+ self.logger.debug(" Healing vnf={}, vdu={}, runDay1 ={}".format(vnfr_id,vdu_id,runDay1))
+ if runDay1:
+ # n2vc_redesign STEP 6 Execute initial config primitive
+ step = "execute initial config primitive"
+
+ # wait for dependent primitives execution (NS -> VNF -> VDU)
+ if initial_config_primitive_list:
+ await self._wait_dependent_n2vc(nsr_id, vca_deployed_list, vca_index)
+
+ # stage, in function of element type: vdu, kdu, vnf or ns
+ my_vca = vca_deployed_list[vca_index]
+ if my_vca.get("vdu_id") or my_vca.get("kdu_name"):
+ # VDU or KDU
+ stage[0] = "Stage 3/5: running Day-1 primitives for VDU."
+ elif my_vca.get("member-vnf-index"):
+ # VNF
+ stage[0] = "Stage 4/5: running Day-1 primitives for VNF."
+ else:
+ # NS
+ stage[0] = "Stage 5/5: running Day-1 primitives for NS."
+
+ self._write_configuration_status(
+ nsr_id=nsr_id, vca_index=vca_index, status="EXECUTING PRIMITIVE"
+ )
+
+ self._write_op_status(op_id=nslcmop_id, stage=stage)
+
+ check_if_terminated_needed = True
+ for initial_config_primitive in initial_config_primitive_list:
+ # adding information on the vca_deployed if it is a NS execution environment
+ if not vca_deployed["member-vnf-index"]:
+ deploy_params["ns_config_info"] = json.dumps(
+ self._get_ns_config_info(nsr_id)
+ )
+ # TODO check if already done
+ primitive_params_ = self._map_primitive_params(
+ initial_config_primitive, {}, deploy_params
+ )
+
+ step = "execute primitive '{}' params '{}'".format(
+ initial_config_primitive["name"], primitive_params_
+ )
+ self.logger.debug(logging_text + step)
+ await self.vca_map[vca_type].exec_primitive(
+ ee_id=ee_id,
+ primitive_name=initial_config_primitive["name"],
+ params_dict=primitive_params_,
+ db_dict=db_dict,
+ vca_id=vca_id,
+ vca_type=vca_type,
+ )
+ # Once some primitive has been exec, check and write at db if it needs to exec terminated primitives
+ if check_if_terminated_needed:
+ if config_descriptor.get("terminate-config-primitive"):
+ self.update_db_2(
+ "nsrs", nsr_id, {db_update_entry + "needed_terminate": True}
+ )
+ check_if_terminated_needed = False
+
+ # TODO register in database that primitive is done
+
+ # STEP 7 Configure metrics
+ # Not sure if this need to be done when healing
+ """
+ if vca_type == "helm" or vca_type == "helm-v3":
+ prometheus_jobs = await self.extract_prometheus_scrape_jobs(
+ ee_id=ee_id,
+ artifact_path=artifact_path,
+ ee_config_descriptor=ee_config_descriptor,
+ vnfr_id=vnfr_id,
+ nsr_id=nsr_id,
+ target_ip=rw_mgmt_ip,
+ )
+ if prometheus_jobs:
+ self.update_db_2(
+ "nsrs",
+ nsr_id,
+ {db_update_entry + "prometheus_jobs": prometheus_jobs},
+ )
+
+ for job in prometheus_jobs:
+ self.db.set_one(
+ "prometheus_jobs",
+ {"job_name": job["job_name"]},
+ job,
+ upsert=True,
+ fail_on_empty=False,
+ )
+
+ """
+ step = "instantiated at VCA"
+ self.logger.debug(logging_text + step)
+
+ self._write_configuration_status(
+ nsr_id=nsr_id, vca_index=vca_index, status="READY"
+ )
+
+ except Exception as e: # TODO not use Exception but N2VC exception
+ # self.update_db_2("nsrs", nsr_id, {db_update_entry + "instantiation": "FAILED"})
+ if not isinstance(
+ e, (DbException, N2VCException, LcmException, asyncio.CancelledError)
+ ):
+ self.logger.error(
+ "Exception while {} : {}".format(step, e), exc_info=True
+ )
+ self._write_configuration_status(
+ nsr_id=nsr_id, vca_index=vca_index, status="BROKEN"
+ )
+ raise LcmException("{} {}".format(step, e)) from e
+
+ async def _wait_heal_ro(
+ self,
+ nsr_id,
+ timeout=600,
+ ):
+ start_time = time()
+ while time() <= start_time + timeout:
+ db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
+ operational_status_ro = db_nsr["_admin"]["deployed"]["RO"]["operational-status"]
+ self.logger.debug("Wait Heal RO > {}".format(operational_status_ro))
+ if operational_status_ro != "healing":
+ break
+ await asyncio.sleep(15, loop=self.loop)
+ else: # timeout_ns_deploy
+ raise NgRoException("Timeout waiting ns to deploy")