+ # TODO register in database that primitive is done
+
+ step = "instantiated at VCA"
+ self.logger.debug(logging_text + step)
+
+ self._write_configuration_status(
+ nsr_id=nsr_id,
+ vca_index=vca_index,
+ status='READY'
+ )
+
+ except Exception as e: # TODO not use Exception but N2VC exception
+ # self.update_db_2("nsrs", nsr_id, {db_update_entry + "instantiation": "FAILED"})
+ self._write_configuration_status(
+ nsr_id=nsr_id,
+ vca_index=vca_index,
+ status='BROKEN'
+ )
+ raise Exception("{} {}".format(step, e)) from e
+ # TODO raise N2VC exception with 'step' extra information
+
+ def _write_ns_status(self, nsr_id: str, ns_state: str, current_operation: str, current_operation_id: str,
+ error_description: str = None):
+ try:
+ db_dict = dict()
+ if ns_state:
+ db_dict["nsState"] = ns_state
+ db_dict["currentOperation"] = current_operation
+ db_dict["currentOperationID"] = current_operation_id
+ db_dict["errorDescription"] = error_description
+ self.update_db_2("nsrs", nsr_id, db_dict)
+ except Exception as e:
+ self.logger.warn('Error writing NS status, ns={}: {}'.format(nsr_id, e))
+
+ def _write_op_status(self, op_id: str, stage: str = None, error_message: str = None, queuePosition: int = 0):
+ try:
+ db_dict = dict()
+ db_dict['queuePosition'] = queuePosition
+ db_dict['stage'] = stage
+ if error_message:
+ db_dict['errorMessage'] = error_message
+ self.update_db_2("nslcmops", op_id, db_dict)
+ except Exception as e:
+ self.logger.warn('Error writing OPERATION status for op_id: {} -> {}'.format(op_id, e))
+
+ def _write_all_config_status(self, nsr_id: str, status: str):
+ try:
+ # nsrs record
+ db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
+ # configurationStatus
+ config_status = db_nsr.get('configurationStatus')
+ if config_status:
+ # update status
+ db_dict = dict()
+ db_dict['configurationStatus'] = list()
+ for c in config_status:
+ c['status'] = status
+ db_dict['configurationStatus'].append(c)
+ self.update_db_2("nsrs", nsr_id, db_dict)
+
+ except Exception as e:
+ self.logger.warn('Error writing all configuration status, ns={}: {}'.format(nsr_id, e))
+
+ def _write_configuration_status(self, nsr_id: str, vca_index: int, status: str = None,
+ element_under_configuration: str = None, element_type: str = None):
+
+ # self.logger.debug('_write_configuration_status(): vca_index={}, status={}'
+ # .format(vca_index, status))
+
+ try:
+ db_path = 'configurationStatus.{}.'.format(vca_index)
+ db_dict = dict()
+ if status:
+ db_dict[db_path + 'status'] = status
+ if element_under_configuration:
+ db_dict[db_path + 'elementUnderConfiguration'] = element_under_configuration
+ if element_type:
+ db_dict[db_path + 'elementType'] = element_type
+ self.update_db_2("nsrs", nsr_id, db_dict)
+ except Exception as e:
+ self.logger.warn('Error writing configuration status={}, ns={}, vca_index={}: {}'
+ .format(status, nsr_id, vca_index, e))
+
async def instantiate(self, nsr_id, nslcmop_id):
    """Instantiate a NS: deploy KDUs and VMs at VIM/K8s and the charms (VCA) that configure them.

    Runs the deployment sub-tasks (KDUs, RO, N2VC charms) concurrently, waits for
    all of them, and writes the final operation/NS status to the database. Results
    are persisted in "nsrs"/"nslcmops" and notified through kafka; nothing is returned.

    :param nsr_id: ns instance to deploy
    :param nslcmop_id: operation to run
    :return: None
    """

    # Try to lock HA task here
    task_is_locked_by_me = self.lcm_tasks.lock_HA('ns', 'nslcmops', nslcmop_id)
    if not task_is_locked_by_me:
        self.logger.debug('instantiate() task is not locked by me, ns={}'.format(nsr_id))
        return

    logging_text = "Task ns={} instantiate={} ".format(nsr_id, nslcmop_id)
    self.logger.debug(logging_text + "Enter")

    # get all needed from database

    # database nsrs record
    db_nsr = None

    # database nslcmops record
    db_nslcmop = None

    # update operation on nsrs
    db_nsr_update = {"_admin.nslcmop": nslcmop_id,
                     "_admin.current-operation": nslcmop_id,
                     "_admin.operation-type": "instantiate"}
    self.update_db_2("nsrs", nsr_id, db_nsr_update)

    # update operation on nslcmops
    db_nslcmop_update = {}

    nslcmop_operation_state = None
    db_vnfrs = {}  # vnf's info indexed by member-index
    task_instantiation_list = []
    task_instantiation_info = {}  # from task to info text
    # BUGFIX: initialized before the try block. The finally clause below reads it;
    # previously an exception raised before the waiting phase caused a NameError in
    # finally that masked the real error.
    instantiated_ok = False
    exc = None
    try:
        # wait for any previous tasks in process
        step = "Waiting for previous operations to terminate"
        await self.lcm_tasks.waitfor_related_HA('ns', 'nslcmops', nslcmop_id)

        # STEP 0: Reading database (nslcmops, nsrs, nsds, vnfrs, vnfds)

        # nsState="BUILDING", currentOperation="INSTANTIATING", currentOperationID=nslcmop_id
        self._write_ns_status(
            nsr_id=nsr_id,
            ns_state="BUILDING",
            current_operation="INSTANTIATING",
            current_operation_id=nslcmop_id
        )

        # read from db: operation
        step = "Getting nslcmop={} from db".format(nslcmop_id)
        db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})
        ns_params = db_nslcmop.get("operationParams")
        # operation parameter overrides the configured/default deployment timeout
        if ns_params and ns_params.get("timeout_ns_deploy"):
            timeout_ns_deploy = ns_params["timeout_ns_deploy"]
        else:
            timeout_ns_deploy = self.timeout.get("ns_deploy", self.timeout_ns_deploy)

        # read from db: ns
        step = "Getting nsr={} from db".format(nsr_id)
        db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
        # nsd is replicated into ns (no db read)
        nsd = db_nsr["nsd"]

        # read from db: vnf's of this ns
        step = "Getting vnfrs from db"
        self.logger.debug(logging_text + step)
        db_vnfrs_list = self.db.get_list("vnfrs", {"nsr-id-ref": nsr_id})

        # read from db: vnfd's for every vnf
        db_vnfds_ref = {}    # every vnfd data indexed by vnf name
        db_vnfds = {}        # every vnfd data indexed by vnf id
        db_vnfds_index = {}  # every vnfd data indexed by vnf member-index

        self._write_op_status(
            op_id=nslcmop_id,
            stage='Stage 1/5: preparation of the environment',
            queuePosition=0
        )

        # for each vnf in ns, read vnfd
        for vnfr in db_vnfrs_list:
            db_vnfrs[vnfr["member-vnf-index-ref"]] = vnfr  # vnf's dict indexed by member-index: '1', '2', etc
            vnfd_id = vnfr["vnfd-id"]     # vnfd uuid for this vnf
            vnfd_ref = vnfr["vnfd-ref"]   # vnfd name for this vnf
            # if we haven't this vnfd, read it from db
            if vnfd_id not in db_vnfds:
                # read from db
                step = "Getting vnfd={} id='{}' from db".format(vnfd_id, vnfd_ref)
                self.logger.debug(logging_text + step)
                vnfd = self.db.get_one("vnfds", {"_id": vnfd_id})

                # store vnfd
                db_vnfds_ref[vnfd_ref] = vnfd   # vnfd's indexed by name
                db_vnfds[vnfd_id] = vnfd        # vnfd's indexed by id
            db_vnfds_index[vnfr["member-vnf-index-ref"]] = db_vnfds[vnfd_id]  # vnfd's indexed by member-index

        # Get or generates the _admin.deployed.VCA list
        vca_deployed_list = None
        if db_nsr["_admin"].get("deployed"):
            vca_deployed_list = db_nsr["_admin"]["deployed"].get("VCA")
        if vca_deployed_list is None:
            vca_deployed_list = []
            configuration_status_list = []
            db_nsr_update["_admin.deployed.VCA"] = vca_deployed_list
            db_nsr_update["configurationStatus"] = configuration_status_list
            # add _admin.deployed.VCA to db_nsr dictionary, value=vca_deployed_list
            populate_dict(db_nsr, ("_admin", "deployed", "VCA"), vca_deployed_list)
        elif isinstance(vca_deployed_list, dict):
            # maintain backward compatibility. Change a dict to list at database
            vca_deployed_list = list(vca_deployed_list.values())
            db_nsr_update["_admin.deployed.VCA"] = vca_deployed_list
            populate_dict(db_nsr, ("_admin", "deployed", "VCA"), vca_deployed_list)

        db_nsr_update["detailed-status"] = "creating"
        db_nsr_update["operational-status"] = "init"

        if not isinstance(deep_get(db_nsr, ("_admin", "deployed", "RO", "vnfd")), list):
            populate_dict(db_nsr, ("_admin", "deployed", "RO", "vnfd"), [])
            db_nsr_update["_admin.deployed.RO.vnfd"] = []

        # set state to INSTANTIATED. When instantiated NBI will not delete directly
        db_nsr_update["_admin.nsState"] = "INSTANTIATED"
        self.update_db_2("nsrs", nsr_id, db_nsr_update)

        # n2vc_redesign STEP 2 Deploy Network Scenario

        self._write_op_status(
            op_id=nslcmop_id,
            stage='Stage 2/5: deployment of VMs and execution environments'
        )

        self.logger.debug(logging_text + "Before deploy_kdus")
        # Call to deploy_kdus in case exists the "vdu:kdu" param
        task_kdu = asyncio.ensure_future(
            self.deploy_kdus(
                logging_text=logging_text,
                nsr_id=nsr_id,
                db_nsr=db_nsr,
                db_vnfrs=db_vnfrs,
            )
        )
        self.lcm_tasks.register("ns", nsr_id, nslcmop_id, "instantiate_KDUs", task_kdu)
        task_instantiation_info[task_kdu] = "Deploy KDUs"
        task_instantiation_list.append(task_kdu)

        # n2vc_redesign STEP 1 Get VCA public ssh-key
        # feature 1429. Add n2vc public key to needed VMs
        n2vc_key = self.n2vc.get_public_key()
        n2vc_key_list = [n2vc_key]
        if self.vca_config.get("public_key"):
            n2vc_key_list.append(self.vca_config["public_key"])

        task_ro = asyncio.ensure_future(
            self.instantiate_RO(
                logging_text=logging_text,
                nsr_id=nsr_id,
                nsd=nsd,
                db_nsr=db_nsr,
                db_nslcmop=db_nslcmop,
                db_vnfrs=db_vnfrs,
                db_vnfds_ref=db_vnfds_ref,
                n2vc_key_list=n2vc_key_list
            )
        )
        self.lcm_tasks.register("ns", nsr_id, nslcmop_id, "instantiate_RO", task_ro)
        task_instantiation_info[task_ro] = "Deploy at VIM"
        task_instantiation_list.append(task_ro)

        # n2vc_redesign STEP 3 to 6 Deploy N2VC
        step = "Deploying proxy and native charms"
        self.logger.debug(logging_text + step)

        nsi_id = None  # TODO put nsi_id when this nsr belongs to a NSI
        # get_iterable() returns a value from a dict or empty tuple if key does not exist
        for c_vnf in get_iterable(nsd, "constituent-vnfd"):
            vnfd_id = c_vnf["vnfd-id-ref"]
            vnfd = db_vnfds_ref[vnfd_id]
            member_vnf_index = str(c_vnf["member-vnf-index"])
            db_vnfr = db_vnfrs[member_vnf_index]
            base_folder = vnfd["_admin"]["storage"]
            vdu_id = None
            vdu_index = 0
            vdu_name = None
            kdu_name = None

            # Get additional parameters
            deploy_params = {}
            if db_vnfr.get("additionalParamsForVnf"):
                deploy_params = self._format_additional_params(db_vnfr["additionalParamsForVnf"].copy())

            # charm at VNF level
            descriptor_config = vnfd.get("vnf-configuration")
            if descriptor_config and descriptor_config.get("juju"):
                self._deploy_n2vc(
                    logging_text=logging_text + "member_vnf_index={} ".format(member_vnf_index),
                    db_nsr=db_nsr,
                    db_vnfr=db_vnfr,
                    nslcmop_id=nslcmop_id,
                    nsr_id=nsr_id,
                    nsi_id=nsi_id,
                    vnfd_id=vnfd_id,
                    vdu_id=vdu_id,
                    kdu_name=kdu_name,
                    member_vnf_index=member_vnf_index,
                    vdu_index=vdu_index,
                    vdu_name=vdu_name,
                    deploy_params=deploy_params,
                    descriptor_config=descriptor_config,
                    base_folder=base_folder,
                    task_instantiation_list=task_instantiation_list,
                    task_instantiation_info=task_instantiation_info
                )

            # Deploy charms for each VDU that supports one.
            for vdud in get_iterable(vnfd, 'vdu'):
                vdu_id = vdud["id"]
                descriptor_config = vdud.get('vdu-configuration')
                vdur = next((x for x in db_vnfr["vdur"] if x["vdu-id-ref"] == vdu_id), None)
                # NOTE(review): vdur can be None when the vnfr has no matching vdur entry;
                # the .get() below would then raise AttributeError (caught by the generic
                # except, failing the operation) — confirm whether a guard is wanted here
                if vdur.get("additionalParams"):
                    deploy_params_vdu = self._format_additional_params(vdur["additionalParams"])
                else:
                    deploy_params_vdu = deploy_params
                if descriptor_config and descriptor_config.get("juju"):
                    vdu_name = None
                    kdu_name = None
                    # one charm per vdu replica
                    for vdu_index in range(int(vdud.get("count", 1))):
                        # TODO vnfr_params["rw_mgmt_ip"] = vdur["ip-address"]
                        self._deploy_n2vc(
                            logging_text=logging_text + "member_vnf_index={}, vdu_id={}, vdu_index={} ".format(
                                member_vnf_index, vdu_id, vdu_index),
                            db_nsr=db_nsr,
                            db_vnfr=db_vnfr,
                            nslcmop_id=nslcmop_id,
                            nsr_id=nsr_id,
                            nsi_id=nsi_id,
                            vnfd_id=vnfd_id,
                            vdu_id=vdu_id,
                            kdu_name=kdu_name,
                            member_vnf_index=member_vnf_index,
                            vdu_index=vdu_index,
                            vdu_name=vdu_name,
                            deploy_params=deploy_params_vdu,
                            descriptor_config=descriptor_config,
                            base_folder=base_folder,
                            task_instantiation_list=task_instantiation_list,
                            task_instantiation_info=task_instantiation_info
                        )
            # charms attached to KDUs
            for kdud in get_iterable(vnfd, 'kdu'):
                kdu_name = kdud["name"]
                descriptor_config = kdud.get('kdu-configuration')
                if descriptor_config and descriptor_config.get("juju"):
                    vdu_id = None
                    vdu_index = 0
                    vdu_name = None

                    self._deploy_n2vc(
                        logging_text=logging_text,
                        db_nsr=db_nsr,
                        db_vnfr=db_vnfr,
                        nslcmop_id=nslcmop_id,
                        nsr_id=nsr_id,
                        nsi_id=nsi_id,
                        vnfd_id=vnfd_id,
                        vdu_id=vdu_id,
                        kdu_name=kdu_name,
                        member_vnf_index=member_vnf_index,
                        vdu_index=vdu_index,
                        vdu_name=vdu_name,
                        deploy_params=deploy_params,
                        descriptor_config=descriptor_config,
                        base_folder=base_folder,
                        task_instantiation_list=task_instantiation_list,
                        task_instantiation_info=task_instantiation_info
                    )

        # Check if this NS has a charm configuration
        descriptor_config = nsd.get("ns-configuration")
        if descriptor_config and descriptor_config.get("juju"):
            vnfd_id = None
            db_vnfr = None
            member_vnf_index = None
            vdu_id = None
            kdu_name = None
            vdu_index = 0
            vdu_name = None

            # Get additional parameters
            deploy_params = {}
            if db_nsr.get("additionalParamsForNs"):
                deploy_params = self._format_additional_params(db_nsr["additionalParamsForNs"].copy())
            base_folder = nsd["_admin"]["storage"]
            self._deploy_n2vc(
                logging_text=logging_text,
                db_nsr=db_nsr,
                db_vnfr=db_vnfr,
                nslcmop_id=nslcmop_id,
                nsr_id=nsr_id,
                nsi_id=nsi_id,
                vnfd_id=vnfd_id,
                vdu_id=vdu_id,
                kdu_name=kdu_name,
                member_vnf_index=member_vnf_index,
                vdu_index=vdu_index,
                vdu_name=vdu_name,
                deploy_params=deploy_params,
                descriptor_config=descriptor_config,
                base_folder=base_folder,
                task_instantiation_list=task_instantiation_list,
                task_instantiation_info=task_instantiation_info
            )

        # Wait until all tasks of "task_instantiation_list" have been finished

        error_text_list = []

        # let's begin with all OK (set back to True now that all tasks were launched)
        instantiated_ok = True
        # let's begin with RO 'running' status (later we can change it)
        db_nsr_update["operational-status"] = "running"
        # let's begin with VCA 'configured' status (later we can change it)
        db_nsr_update["config-status"] = "configured"

        step = "Waiting for tasks to be finished"
        if task_instantiation_list:
            # wait for all tasks completion
            done, pending = await asyncio.wait(task_instantiation_list, timeout=timeout_ns_deploy)

            for task in pending:
                instantiated_ok = False
                if task in (task_ro, task_kdu):
                    # RO or KDU task is pending
                    db_nsr_update["operational-status"] = "failed"
                else:
                    # A N2VC task is pending
                    db_nsr_update["config-status"] = "failed"
                self.logger.error(logging_text + task_instantiation_info[task] + ": Timeout")
                error_text_list.append(task_instantiation_info[task] + ": Timeout")
            for task in done:
                if task.cancelled():
                    instantiated_ok = False
                    if task in (task_ro, task_kdu):
                        # RO or KDU task was cancelled
                        db_nsr_update["operational-status"] = "failed"
                    else:
                        # A N2VC was cancelled
                        db_nsr_update["config-status"] = "failed"
                    self.logger.warn(logging_text + task_instantiation_info[task] + ": Cancelled")
                    error_text_list.append(task_instantiation_info[task] + ": Cancelled")
                else:
                    exc = task.exception()
                    if exc:
                        instantiated_ok = False
                        if task in (task_ro, task_kdu):
                            # RO or KDU task raised an exception
                            db_nsr_update["operational-status"] = "failed"
                        else:
                            # A N2VC task raised an exception
                            db_nsr_update["config-status"] = "failed"
                        self.logger.error(logging_text + task_instantiation_info[task] + ": Failed")

                        # known exception types carry a readable message; anything else gets a traceback
                        if isinstance(exc, (N2VCException, ROclient.ROClientException)):
                            error_text_list.append(task_instantiation_info[task] + ": {}".format(exc))
                        else:
                            exc_traceback = "".join(traceback.format_exception(None, exc, exc.__traceback__))
                            self.logger.error(logging_text + task_instantiation_info[task] + exc_traceback)
                            error_text_list.append(task_instantiation_info[task] + ": " + exc_traceback)
                    else:
                        self.logger.debug(logging_text + task_instantiation_info[task] + ": Done")

        if error_text_list:
            error_text = "\n".join(error_text_list)
            db_nsr_update["detailed-status"] = error_text
            db_nslcmop_update["detailed-status"] = error_text
            db_nslcmop_update["operationState"] = nslcmop_operation_state = "FAILED"
            db_nslcmop_update["statusEnteredTime"] = time()
        else:
            # all is done
            db_nsr_update["detailed-status"] = "done"
            db_nslcmop_update["detailed-status"] = "done"
            db_nslcmop_update["operationState"] = nslcmop_operation_state = "COMPLETED"
            db_nslcmop_update["statusEnteredTime"] = time()

    except (ROclient.ROClientException, DbException, LcmException) as e:
        self.logger.error(logging_text + "Exit Exception while '{}': {}".format(step, e))
        exc = e
    except asyncio.CancelledError:
        self.logger.error(logging_text + "Cancelled Exception while '{}'".format(step))
        exc = "Operation was cancelled"
    except Exception as e:
        exc = traceback.format_exc()
        self.logger.critical(logging_text + "Exit Exception {} while '{}': {}".format(type(e).__name__, step, e),
                             exc_info=True)
    finally:
        if exc:
            if db_nsr:
                db_nsr_update["detailed-status"] = "ERROR {}: {}".format(step, exc)
                db_nsr_update["operational-status"] = "failed"
                db_nsr_update["config-status"] = "failed"
            if db_nslcmop:
                db_nslcmop_update["detailed-status"] = "FAILED {}: {}".format(step, exc)
                db_nslcmop_update["operationState"] = nslcmop_operation_state = "FAILED"
                db_nslcmop_update["statusEnteredTime"] = time()
        try:
            if db_nsr:
                # release the operation ownership fields on the nsr record
                db_nsr_update["_admin.nslcmop"] = None
                db_nsr_update["_admin.current-operation"] = None
                db_nsr_update["_admin.operation-type"] = None
                self.update_db_2("nsrs", nsr_id, db_nsr_update)

            # nsState="READY/BROKEN", currentOperation="IDLE", currentOperationID=None
            ns_state = None
            error_description = None
            if instantiated_ok:
                ns_state = "READY"
            else:
                ns_state = "BROKEN"
                error_description = 'Operation: INSTANTIATING.{}, step: {}'.format(nslcmop_id, step)

            self._write_ns_status(
                nsr_id=nsr_id,
                ns_state=ns_state,
                current_operation="IDLE",
                current_operation_id=None,
                error_description=error_description
            )

            self._write_op_status(
                op_id=nslcmop_id,
                error_message=error_description
            )

            if db_nslcmop_update:
                self.update_db_2("nslcmops", nslcmop_id, db_nslcmop_update)

            self.logger.debug(logging_text + 'End of instantiation: {}'.format(instantiated_ok))

        except DbException as e:
            self.logger.error(logging_text + "Cannot update database: {}".format(e))

        if nslcmop_operation_state:
            try:
                await self.msg.aiowrite("ns", "instantiated", {"nsr_id": nsr_id, "nslcmop_id": nslcmop_id,
                                                               "operationState": nslcmop_operation_state},
                                        loop=self.loop)
            except Exception as e:
                self.logger.error(logging_text + "kafka_write notification Exception {}".format(e))

        self.logger.debug(logging_text + "Exit")
        self.lcm_tasks.remove("ns", nsr_id, nslcmop_id, "ns_instantiate")
+
async def _add_vca_relations(self, logging_text, nsr_id, vca_index: int, timeout: int = 3600) -> bool:
    """Add the juju relations this VCA participates in, waiting for its peers to be ready.

    :param logging_text: prefix for all log messages
    :param nsr_id: ns instance id
    :param vca_index: index of this VCA inside _admin.deployed.VCA
    :param timeout: maximum seconds to wait for the peers before giving up
    :return: True on success (or nothing to do), False on timeout or error
    """

    # steps:
    # 1. find all relations for this VCA
    # 2. wait for other peers related
    # 3. add relations

    try:

        # STEP 1: find all relations for this VCA

        # read nsr record
        db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})

        # this VCA data
        my_vca = deep_get(db_nsr, ('_admin', 'deployed', 'VCA'))[vca_index]

        # read all ns-configuration relations
        ns_relations = list()
        db_ns_relations = deep_get(db_nsr, ('nsd', 'ns-configuration', 'relation'))
        if db_ns_relations:
            for r in db_ns_relations:
                # check if this VCA is in the relation
                if my_vca.get('member-vnf-index') in \
                        (r.get('entities')[0].get('id'), r.get('entities')[1].get('id')):
                    ns_relations.append(r)

        # read all vnf-configuration relations
        vnf_relations = list()
        db_vnfd_list = db_nsr.get('vnfd-id')
        if db_vnfd_list:
            for vnfd in db_vnfd_list:
                db_vnfd = self.db.get_one("vnfds", {"_id": vnfd})
                db_vnf_relations = deep_get(db_vnfd, ('vnf-configuration', 'relation'))
                if db_vnf_relations:
                    for r in db_vnf_relations:
                        # check if this VCA is in the relation
                        if my_vca.get('vdu_id') in (r.get('entities')[0].get('id'), r.get('entities')[1].get('id')):
                            vnf_relations.append(r)

        # if no relations, terminate
        if not ns_relations and not vnf_relations:
            self.logger.debug(logging_text + ' No relations')
            return True

        self.logger.debug(logging_text + ' adding relations\n {}\n {}'.format(ns_relations, vnf_relations))

        # add all relations
        start = time()
        while True:
            # check timeout
            now = time()
            if now - start >= timeout:
                self.logger.error(logging_text + ' : timeout adding relations')
                return False

            # reload nsr from database (we need to update record: _admin.deployed.VCA)
            db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})

            # for each defined NS relation, find the VCA's related.
            # BUGFIX: iterate over a copy — entries are removed from the list inside
            # the loop, which previously skipped the following relation.
            for r in list(ns_relations):
                from_vca_ee_id = None
                to_vca_ee_id = None
                from_vca_endpoint = None
                to_vca_endpoint = None
                vca_list = deep_get(db_nsr, ('_admin', 'deployed', 'VCA'))
                for vca in vca_list:
                    if vca.get('member-vnf-index') == r.get('entities')[0].get('id') \
                            and vca.get('config_sw_installed'):
                        from_vca_ee_id = vca.get('ee_id')
                        from_vca_endpoint = r.get('entities')[0].get('endpoint')
                    if vca.get('member-vnf-index') == r.get('entities')[1].get('id') \
                            and vca.get('config_sw_installed'):
                        to_vca_ee_id = vca.get('ee_id')
                        to_vca_endpoint = r.get('entities')[1].get('endpoint')
                if from_vca_ee_id and to_vca_ee_id:
                    # add relation
                    await self.n2vc.add_relation(
                        ee_id_1=from_vca_ee_id,
                        ee_id_2=to_vca_ee_id,
                        endpoint_1=from_vca_endpoint,
                        endpoint_2=to_vca_endpoint)
                    # remove entry from relations list
                    ns_relations.remove(r)
                else:
                    # check failed peers
                    try:
                        vca_status_list = db_nsr.get('configurationStatus')
                        if vca_status_list:
                            for i in range(len(vca_list)):
                                vca = vca_list[i]
                                vca_status = vca_status_list[i]
                                if vca.get('member-vnf-index') == r.get('entities')[0].get('id'):
                                    if vca_status.get('status') == 'BROKEN':
                                        # peer broken: remove relation from list
                                        ns_relations.remove(r)
                                if vca.get('member-vnf-index') == r.get('entities')[1].get('id'):
                                    if vca_status.get('status') == 'BROKEN':
                                        # peer broken: remove relation from list
                                        ns_relations.remove(r)
                    except Exception:
                        # ignore (e.g. relation already removed because both peers are broken)
                        pass

            # for each defined VNF relation, find the VCA's related (copy: see above)
            for r in list(vnf_relations):
                from_vca_ee_id = None
                to_vca_ee_id = None
                from_vca_endpoint = None
                to_vca_endpoint = None
                vca_list = deep_get(db_nsr, ('_admin', 'deployed', 'VCA'))
                for vca in vca_list:
                    if vca.get('vdu_id') == r.get('entities')[0].get('id') and vca.get('config_sw_installed'):
                        from_vca_ee_id = vca.get('ee_id')
                        from_vca_endpoint = r.get('entities')[0].get('endpoint')
                    if vca.get('vdu_id') == r.get('entities')[1].get('id') and vca.get('config_sw_installed'):
                        to_vca_ee_id = vca.get('ee_id')
                        to_vca_endpoint = r.get('entities')[1].get('endpoint')
                if from_vca_ee_id and to_vca_ee_id:
                    # add relation
                    await self.n2vc.add_relation(
                        ee_id_1=from_vca_ee_id,
                        ee_id_2=to_vca_ee_id,
                        endpoint_1=from_vca_endpoint,
                        endpoint_2=to_vca_endpoint)
                    # remove entry from relations list
                    vnf_relations.remove(r)
                else:
                    # check failed peers
                    try:
                        vca_status_list = db_nsr.get('configurationStatus')
                        if vca_status_list:
                            for i in range(len(vca_list)):
                                vca = vca_list[i]
                                vca_status = vca_status_list[i]
                                # BUGFIX: broken VNF peers must be removed from vnf_relations;
                                # the original removed from ns_relations (copy-paste), so the
                                # loop never converged for broken VNF peers.
                                if vca.get('vdu_id') == r.get('entities')[0].get('id'):
                                    if vca_status.get('status') == 'BROKEN':
                                        # peer broken: remove relation from list
                                        vnf_relations.remove(r)
                                if vca.get('vdu_id') == r.get('entities')[1].get('id'):
                                    if vca_status.get('status') == 'BROKEN':
                                        # peer broken: remove relation from list
                                        vnf_relations.remove(r)
                    except Exception:
                        # ignore (e.g. relation already removed because both peers are broken)
                        pass

            # finish as soon as everything is added: check BEFORE sleeping so the last
            # successful pass does not pay an extra 5 second delay
            if not ns_relations and not vnf_relations:
                self.logger.debug('Relations added')
                break

            # wait for next try
            await asyncio.sleep(5.0)

        return True

    except Exception as e:
        self.logger.warn(logging_text + ' ERROR adding relations: {}'.format(e))
        return False
+
async def deploy_kdus(self, logging_text, nsr_id, db_nsr, db_vnfrs):
    """Deploy every KDU (helm chart or juju bundle) declared in the vnfrs of this NS.

    Each KDU is installed asynchronously on its k8s cluster; results (instance id or
    error text) are written to nsrs._admin.deployed.K8s.<index>.

    :raises LcmException: if any KDU cannot be deployed
    """
    # Launch kdus if present in the descriptor

    deployed_ok = True

    # cache: descriptor cluster id -> internal cluster id, per cluster type
    k8scluster_id_2_uuic = {"helm-chart": {}, "juju-bundle": {}}

    def _get_cluster_id(cluster_id, cluster_type):
        # resolve (and memoize) the internal k8s cluster id for the given type
        nonlocal k8scluster_id_2_uuic
        if cluster_id in k8scluster_id_2_uuic[cluster_type]:
            return k8scluster_id_2_uuic[cluster_type][cluster_id]

        db_k8scluster = self.db.get_one("k8sclusters", {"_id": cluster_id}, fail_on_empty=False)
        if not db_k8scluster:
            raise LcmException("K8s cluster {} cannot be found".format(cluster_id))
        k8s_id = deep_get(db_k8scluster, ("_admin", cluster_type, "id"))
        if not k8s_id:
            raise LcmException("K8s cluster '{}' has not been initilized for '{}'".format(cluster_id, cluster_type))
        k8scluster_id_2_uuic[cluster_type][cluster_id] = k8s_id
        return k8s_id

    logging_text += "Deploy kdus: "
    # BUGFIX: 'step' is read by the exception handler below; initialize it before the
    # try block so an early failure (e.g. first DB write) cannot raise NameError.
    step = "Getting the list of KDUs to deploy"
    try:
        db_nsr_update = {"_admin.deployed.K8s": []}
        self.update_db_2("nsrs", nsr_id, db_nsr_update)

        # Look for all vnfds
        pending_tasks = {}  # task -> db path prefix ("_admin.deployed.K8s.<i>.")
        index = 0
        for vnfr_data in db_vnfrs.values():
            for kdur in get_iterable(vnfr_data, "kdur"):
                desc_params = self._format_additional_params(kdur.get("additionalParams"))
                kdumodel = None
                k8sclustertype = None
                error_text = None
                cluster_uuid = None
                if kdur.get("helm-chart"):
                    kdumodel = kdur["helm-chart"]
                    k8sclustertype = "chart"
                    k8sclustertype_full = "helm-chart"
                elif kdur.get("juju-bundle"):
                    kdumodel = kdur["juju-bundle"]
                    k8sclustertype = "juju"
                    k8sclustertype_full = "juju-bundle"
                else:
                    error_text = "kdu type is neither helm-chart nor juju-bundle. Maybe an old NBI version is" \
                                 " running"
                try:
                    if not error_text:
                        cluster_uuid = _get_cluster_id(kdur["k8s-cluster"]["id"], k8sclustertype_full)
                except LcmException as e:
                    error_text = str(e)
                    deployed_ok = False

                step = "Instantiate KDU {} in k8s cluster {}".format(kdur["kdu-name"], cluster_uuid)

                k8s_instance_info = {"kdu-instance": None, "k8scluster-uuid": cluster_uuid,
                                     "k8scluster-type": k8sclustertype,
                                     "kdu-name": kdur["kdu-name"], "kdu-model": kdumodel}
                if error_text:
                    k8s_instance_info["detailed-status"] = error_text
                db_nsr_update["_admin.deployed.K8s.{}".format(index)] = k8s_instance_info
                self.update_db_2("nsrs", nsr_id, db_nsr_update)
                if error_text:
                    # BUGFIX: the index must advance even for failed KDUs, otherwise the
                    # next KDU overwrites this error record in the database
                    index += 1
                    continue

                db_dict = {"collection": "nsrs", "filter": {"_id": nsr_id},
                           "path": "_admin.deployed.K8s.{}".format(index)}
                if k8sclustertype == "chart":
                    task = asyncio.ensure_future(
                        self.k8sclusterhelm.install(cluster_uuid=cluster_uuid, kdu_model=kdumodel, atomic=True,
                                                    params=desc_params, db_dict=db_dict, timeout=3600)
                    )
                else:
                    task = asyncio.ensure_future(
                        self.k8sclusterjuju.install(cluster_uuid=cluster_uuid, kdu_model=kdumodel,
                                                    atomic=True, params=desc_params,
                                                    db_dict=db_dict, timeout=600)
                    )

                pending_tasks[task] = "_admin.deployed.K8s.{}.".format(index)
                index += 1
        if not pending_tasks:
            return
        self.logger.debug(logging_text + 'Waiting for terminate pending tasks...')
        pending_list = list(pending_tasks.keys())
        while pending_list:
            done_list, pending_list = await asyncio.wait(pending_list, timeout=30 * 60,
                                                         return_when=asyncio.FIRST_COMPLETED)
            if not done_list:  # timeout
                for task in pending_list:
                    # BUGFIX: was pending_tasks(task) — a dict is not callable, so the
                    # timeout path raised TypeError instead of recording the timeout
                    db_nsr_update[pending_tasks[task] + "detailed-status"] = "Timeout"
                deployed_ok = False
                break
            for task in done_list:
                exc = task.exception()
                if exc:
                    db_nsr_update[pending_tasks[task] + "detailed-status"] = "{}".format(exc)
                    deployed_ok = False
                else:
                    db_nsr_update[pending_tasks[task] + "kdu-instance"] = task.result()

        if not deployed_ok:
            raise LcmException('Cannot deploy KDUs')

    except Exception as e:
        self.logger.critical(logging_text + "Exit Exception {} while '{}': {}".format(type(e).__name__, step, e))
        raise LcmException("{} Exit Exception {} while '{}': {}".format(logging_text, type(e).__name__, step, e))
    finally:
        # always flush the accumulated K8s deployment info to the database
        if db_nsr_update:
            self.update_db_2("nsrs", nsr_id, db_nsr_update)
+
def _deploy_n2vc(self, logging_text, db_nsr, db_vnfr, nslcmop_id, nsr_id, nsi_id, vnfd_id, vdu_id,
                 kdu_name, member_vnf_index, vdu_index, vdu_name, deploy_params, descriptor_config,
                 base_folder, task_instantiation_list, task_instantiation_info):
    """Schedule the deployment of one charm (VCA) as an asyncio task.

    Finds — or creates and persists — the entry for this charm inside
    <nsrs>._admin.deployed.VCA, then launches instantiate_N2VC as a task, registers it
    in lcm_tasks and appends it to task_instantiation_list / task_instantiation_info
    (both mutated in place) so the caller can await and report on it.
    """
    # launch instantiate_N2VC in a asyncio task and register task object
    # Look where information of this charm is at database <nsrs>._admin.deployed.VCA
    # if not found, create one entry and update database

    # fill db_nsr._admin.deployed.VCA.<index>
    vca_index = -1
    for vca_index, vca_deployed in enumerate(db_nsr["_admin"]["deployed"]["VCA"]):
        if not vca_deployed:
            continue
        # an existing entry matches when all four identity fields coincide
        if vca_deployed.get("member-vnf-index") == member_vnf_index and \
                vca_deployed.get("vdu_id") == vdu_id and \
                vca_deployed.get("kdu_name") == kdu_name and \
                vca_deployed.get("vdu_count_index", 0) == vdu_index:
            break
    else:
        # for/else: loop completed without a break, so no matching entry exists
        # (vca_index is -1 for an empty list, otherwise the last enumerated index).
        # not found, create one.
        vca_deployed = {
            "member-vnf-index": member_vnf_index,
            "vdu_id": vdu_id,
            "kdu_name": kdu_name,
            "vdu_count_index": vdu_index,
            "operational-status": "init",  # TODO revise
            "detailed-status": "",  # TODO revise
            "step": "initial-deploy",  # TODO revise
            "vnfd_id": vnfd_id,
            "vdu_name": vdu_name,
        }
        # +1 turns the last enumerated index into the append position of the new entry
        vca_index += 1

        # create VCA and configurationStatus in db
        db_dict = {
            "_admin.deployed.VCA.{}".format(vca_index): vca_deployed,
            "configurationStatus.{}".format(vca_index): dict()
        }
        self.update_db_2("nsrs", nsr_id, db_dict)

        # keep the in-memory record aligned with what was just persisted
        db_nsr["_admin"]["deployed"]["VCA"].append(vca_deployed)

    # Launch task
    task_n2vc = asyncio.ensure_future(
        self.instantiate_N2VC(
            logging_text=logging_text,
            vca_index=vca_index,
            nsi_id=nsi_id,
            db_nsr=db_nsr,
            db_vnfr=db_vnfr,
            vdu_id=vdu_id,
            kdu_name=kdu_name,
            vdu_index=vdu_index,
            deploy_params=deploy_params,
            config_descriptor=descriptor_config,
            base_folder=base_folder,
            nslcmop_id=nslcmop_id
        )
    )
    self.lcm_tasks.register("ns", nsr_id, nslcmop_id, "instantiate_N2VC-{}".format(vca_index), task_n2vc)
    task_instantiation_info[task_n2vc] = "Deploy VCA {}.{}".format(member_vnf_index or "", vdu_id or "")
    task_instantiation_list.append(task_n2vc)
+
+ # Check if this VNFD has a configured terminate action
+ def _has_terminate_config_primitive(self, vnfd):
+ vnf_config = vnfd.get("vnf-configuration")
+ if vnf_config and vnf_config.get("terminate-config-primitive"):
+ return True
+ else:
+ return False
+
+ @staticmethod
+ def _get_terminate_config_primitive_seq_list(vnfd):
+ """ Get a numerically sorted list of the sequences for this VNFD's terminate action """
+ # No need to check for existing primitive twice, already done before
+ vnf_config = vnfd.get("vnf-configuration")
+ seq_list = vnf_config.get("terminate-config-primitive")
+ # Get all 'seq' tags in seq_list, order sequences numerically, ascending.
+ seq_list_sorted = sorted(seq_list, key=lambda x: int(x['seq']))
+ return seq_list_sorted
+
+ @staticmethod
+ def _create_nslcmop(nsr_id, operation, params):
+ """
+ Creates a ns-lcm-opp content to be stored at database.
+ :param nsr_id: internal id of the instance
+ :param operation: instantiate, terminate, scale, action, ...
+ :param params: user parameters for the operation
+ :return: dictionary following SOL005 format
+ """
+ # Raise exception if invalid arguments
+ if not (nsr_id and operation and params):
+ raise LcmException(
+ "Parameters 'nsr_id', 'operation' and 'params' needed to create primitive not provided")
+ now = time()
+ _id = str(uuid4())
+ nslcmop = {
+ "id": _id,
+ "_id": _id,
+ # COMPLETED,PARTIALLY_COMPLETED,FAILED_TEMP,FAILED,ROLLING_BACK,ROLLED_BACK
+ "operationState": "PROCESSING",
+ "statusEnteredTime": now,
+ "nsInstanceId": nsr_id,
+ "lcmOperationType": operation,
+ "startTime": now,
+ "isAutomaticInvocation": False,
+ "operationParams": params,
+ "isCancelPending": False,
+ "links": {
+ "self": "/osm/nslcm/v1/ns_lcm_op_occs/" + _id,
+ "nsInstance": "/osm/nslcm/v1/ns_instances/" + nsr_id,
+ }
+ }
+ return nslcmop
+
+ def _format_additional_params(self, params):
+ params = params or {}
+ for key, value in params.items():
+ if str(value).startswith("!!yaml "):
+ params[key] = yaml.safe_load(value[7:])
+ return params
+
+ def _get_terminate_primitive_params(self, seq, vnf_index):
+ primitive = seq.get('name')
+ primitive_params = {}
+ params = {
+ "member_vnf_index": vnf_index,
+ "primitive": primitive,
+ "primitive_params": primitive_params,
+ }
+ desc_params = {}
+ return self._map_primitive_params(seq, params, desc_params)
+
+ # sub-operations
+
+ def _reintent_or_skip_suboperation(self, db_nslcmop, op_index):
+ op = db_nslcmop.get('_admin', {}).get('operations', [])[op_index]
+ if (op.get('operationState') == 'COMPLETED'):
+ # b. Skip sub-operation
+ # _ns_execute_primitive() or RO.create_action() will NOT be executed
+ return self.SUBOPERATION_STATUS_SKIP
+ else:
+ # c. Reintent executing sub-operation
+ # The sub-operation exists, and operationState != 'COMPLETED'
+ # Update operationState = 'PROCESSING' to indicate a reintent.
+ operationState = 'PROCESSING'
+ detailed_status = 'In progress'
+ self._update_suboperation_status(
+ db_nslcmop, op_index, operationState, detailed_status)
+ # Return the sub-operation index
+ # _ns_execute_primitive() or RO.create_action() will be called from scale()
+ # with arguments extracted from the sub-operation
+ return op_index
+
+ # Find a sub-operation where all keys in a matching dictionary must match
+ # Returns the index of the matching sub-operation, or SUBOPERATION_STATUS_NOT_FOUND if no match
+ def _find_suboperation(self, db_nslcmop, match):
+ if (db_nslcmop and match):
+ op_list = db_nslcmop.get('_admin', {}).get('operations', [])
+ for i, op in enumerate(op_list):
+ if all(op.get(k) == match[k] for k in match):
+ return i
+ return self.SUBOPERATION_STATUS_NOT_FOUND
+
+ # Update status for a sub-operation given its index
+ def _update_suboperation_status(self, db_nslcmop, op_index, operationState, detailed_status):
+ # Update DB for HA tasks
+ q_filter = {'_id': db_nslcmop['_id']}
+ update_dict = {'_admin.operations.{}.operationState'.format(op_index): operationState,
+ '_admin.operations.{}.detailed-status'.format(op_index): detailed_status}
+ self.db.set_one("nslcmops",
+ q_filter=q_filter,
+ update_dict=update_dict,
+ fail_on_empty=False)
+
+ # Add sub-operation, return the index of the added sub-operation
+ # Optionally, set operationState, detailed-status, and operationType
+ # Status and type are currently set for 'scale' sub-operations:
+ # 'operationState' : 'PROCESSING' | 'COMPLETED' | 'FAILED'
+ # 'detailed-status' : status message
+ # 'operationType': may be any type, in the case of scaling: 'PRE-SCALE' | 'POST-SCALE'
+ # Status and operation type are currently only used for 'scale', but NOT for 'terminate' sub-operations.
+ def _add_suboperation(self, db_nslcmop, vnf_index, vdu_id, vdu_count_index, vdu_name, primitive,
+ mapped_primitive_params, operationState=None, detailed_status=None, operationType=None,
+ RO_nsr_id=None, RO_scaling_info=None):
+ if not (db_nslcmop):
+ return self.SUBOPERATION_STATUS_NOT_FOUND
+ # Get the "_admin.operations" list, if it exists
+ db_nslcmop_admin = db_nslcmop.get('_admin', {})
+ op_list = db_nslcmop_admin.get('operations')
+ # Create or append to the "_admin.operations" list
+ new_op = {'member_vnf_index': vnf_index,
+ 'vdu_id': vdu_id,
+ 'vdu_count_index': vdu_count_index,
+ 'primitive': primitive,
+ 'primitive_params': mapped_primitive_params}
+ if operationState:
+ new_op['operationState'] = operationState
+ if detailed_status:
+ new_op['detailed-status'] = detailed_status
+ if operationType:
+ new_op['lcmOperationType'] = operationType
+ if RO_nsr_id:
+ new_op['RO_nsr_id'] = RO_nsr_id
+ if RO_scaling_info:
+ new_op['RO_scaling_info'] = RO_scaling_info
+ if not op_list:
+ # No existing operations, create key 'operations' with current operation as first list element
+ db_nslcmop_admin.update({'operations': [new_op]})
+ op_list = db_nslcmop_admin.get('operations')
+ else:
+ # Existing operations, append operation to list
+ op_list.append(new_op)
+
+ db_nslcmop_update = {'_admin.operations': op_list}
+ self.update_db_2("nslcmops", db_nslcmop['_id'], db_nslcmop_update)
+ op_index = len(op_list) - 1
+ return op_index
+
+ # Helper methods for scale() sub-operations
+
+ # pre-scale/post-scale:
+ # Check for 3 different cases:
+ # a. New: First time execution, return SUBOPERATION_STATUS_NEW
+ # b. Skip: Existing sub-operation exists, operationState == 'COMPLETED', return SUBOPERATION_STATUS_SKIP
+ # c. Reintent: Existing sub-operation exists, operationState != 'COMPLETED', return op_index to re-execute
+ def _check_or_add_scale_suboperation(self, db_nslcmop, vnf_index, vnf_config_primitive, primitive_params,
+ operationType, RO_nsr_id=None, RO_scaling_info=None):
+ # Find this sub-operation
+ if (RO_nsr_id and RO_scaling_info):
+ operationType = 'SCALE-RO'
+ match = {
+ 'member_vnf_index': vnf_index,
+ 'RO_nsr_id': RO_nsr_id,
+ 'RO_scaling_info': RO_scaling_info,
+ }
+ else:
+ match = {
+ 'member_vnf_index': vnf_index,
+ 'primitive': vnf_config_primitive,
+ 'primitive_params': primitive_params,
+ 'lcmOperationType': operationType
+ }
+ op_index = self._find_suboperation(db_nslcmop, match)
+ if (op_index == self.SUBOPERATION_STATUS_NOT_FOUND):
+ # a. New sub-operation
+ # The sub-operation does not exist, add it.
+ # _ns_execute_primitive() will be called from scale() as usual, with non-modified arguments
+ # The following parameters are set to None for all kind of scaling:
+ vdu_id = None
+ vdu_count_index = None
+ vdu_name = None
+ if (RO_nsr_id and RO_scaling_info):
+ vnf_config_primitive = None
+ primitive_params = None
+ else:
+ RO_nsr_id = None
+ RO_scaling_info = None
+ # Initial status for sub-operation
+ operationState = 'PROCESSING'
+ detailed_status = 'In progress'
+ # Add sub-operation for pre/post-scaling (zero or more operations)
+ self._add_suboperation(db_nslcmop,
+ vnf_index,
+ vdu_id,
+ vdu_count_index,
+ vdu_name,
+ vnf_config_primitive,
+ primitive_params,
+ operationState,
+ detailed_status,
+ operationType,
+ RO_nsr_id,
+ RO_scaling_info)
+ return self.SUBOPERATION_STATUS_NEW
+ else:
+ # Return either SUBOPERATION_STATUS_SKIP (operationState == 'COMPLETED'),
+ # or op_index (operationState != 'COMPLETED')
+ return self._reintent_or_skip_suboperation(db_nslcmop, op_index)
+
+ # Function to return execution_environment id
+
+ def _get_ee_id(self, vnf_index, vdu_id, vca_deployed_list):
+ for vca in vca_deployed_list:
+ if vca["member-vnf-index"] == vnf_index and vca["vdu_id"] == vdu_id:
+ return vca["ee_id"]
+
+ # Helper methods for terminate()
+
+ async def _terminate_action(self, db_nslcmop, nslcmop_id, nsr_id):
+ """ Create a primitive with params from VNFD
+ Called from terminate() before deleting instance
+ Calls action() to execute the primitive """
+ logging_text = "Task ns={} _terminate_action={} ".format(nsr_id, nslcmop_id)
+ db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
+ vca_deployed_list = db_nsr["_admin"]["deployed"]["VCA"]
+ db_vnfrs_list = self.db.get_list("vnfrs", {"nsr-id-ref": nsr_id})
+ db_vnfds = {}
+ # Loop over VNFRs
+ for vnfr in db_vnfrs_list:
+ vnfd_id = vnfr["vnfd-id"]
+ vnf_index = vnfr["member-vnf-index-ref"]
+ if vnfd_id not in db_vnfds:
+ step = "Getting vnfd={} id='{}' from db".format(vnfd_id, vnfd_id)
+ vnfd = self.db.get_one("vnfds", {"_id": vnfd_id})
+ db_vnfds[vnfd_id] = vnfd
+ vnfd = db_vnfds[vnfd_id]
+ if not self._has_terminate_config_primitive(vnfd):
+ continue
+ # Get the primitive's sorted sequence list
+ seq_list = self._get_terminate_config_primitive_seq_list(vnfd)
+ for seq in seq_list:
+ # For each sequence in list, get primitive and call _ns_execute_primitive()
+ step = "Calling terminate action for vnf_member_index={} primitive={}".format(
+ vnf_index, seq.get("name"))
+ self.logger.debug(logging_text + step)
+ # Create the primitive for each sequence, i.e. "primitive": "touch"
+ primitive = seq.get('name')
+ mapped_primitive_params = self._get_terminate_primitive_params(seq, vnf_index)
+ # The following 3 parameters are currently set to None for 'terminate':
+ # vdu_id, vdu_count_index, vdu_name
+ vdu_id = db_nslcmop["operationParams"].get("vdu_id")
+ vdu_count_index = db_nslcmop["operationParams"].get("vdu_count_index")
+ vdu_name = db_nslcmop["operationParams"].get("vdu_name")
+ # Add sub-operation
+ self._add_suboperation(db_nslcmop,
+ nslcmop_id,
+ vnf_index,
+ vdu_id,
+ vdu_count_index,
+ vdu_name,
+ primitive,
+ mapped_primitive_params)
+ # Sub-operations: Call _ns_execute_primitive() instead of action()
+ # db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
+ # nsr_deployed = db_nsr["_admin"]["deployed"]
+
+ # nslcmop_operation_state, nslcmop_operation_state_detail = await self.action(
+ # nsr_id, nslcmop_terminate_action_id)
+ # Launch Exception if action() returns other than ['COMPLETED', 'PARTIALLY_COMPLETED']
+ # result_ok = ['COMPLETED', 'PARTIALLY_COMPLETED']
+ # if result not in result_ok:
+ # raise LcmException(
+ # "terminate_primitive_action for vnf_member_index={}",
+ # " primitive={} fails with error {}".format(
+ # vnf_index, seq.get("name"), result_detail))
+
+ ee_id = self._get_ee_id(vnf_index, vdu_id, vca_deployed_list)
+ try:
+ await self.n2vc.exec_primitive(
+ ee_id=ee_id,
+ primitive_name=primitive,
+ params_dict=mapped_primitive_params
+ )
+ except Exception as e:
+ self.logger.error('Error executing primitive {}: {}'.format(primitive, e))
+ raise LcmException(
+ "terminate_primitive_action for vnf_member_index={}, primitive={} fails with error {}"
+ .format(vnf_index, seq.get("name"), e),
+ )
+
+ async def _delete_N2VC(self, nsr_id: str):
+ self._write_all_config_status(nsr_id=nsr_id, status='TERMINATING')
+ namespace = "." + nsr_id
+ await self.n2vc.delete_namespace(namespace=namespace)
+ self._write_all_config_status(nsr_id=nsr_id, status='DELETED')
+
+ async def terminate(self, nsr_id, nslcmop_id):
+
+ # Try to lock HA task here
+ task_is_locked_by_me = self.lcm_tasks.lock_HA('ns', 'nslcmops', nslcmop_id)
+ if not task_is_locked_by_me:
+ return