+ my_vca = vca_deployed_list[vca_index]
+ if my_vca.get("vdu_id") or my_vca.get("kdu_name"):
+ # vdu or kdu: no dependencies
+ return
+ timeout = 300
+ while timeout >= 0:
+ db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
+ vca_deployed_list = db_nsr["_admin"]["deployed"]["VCA"]
+ configuration_status_list = db_nsr["configurationStatus"]
+ for index, vca_deployed in enumerate(configuration_status_list):
+ if index == vca_index:
+ # myself
+ continue
+ if not my_vca.get("member-vnf-index") or \
+ (vca_deployed.get("member-vnf-index") == my_vca.get("member-vnf-index")):
+ internal_status = configuration_status_list[index].get("status")
+ if internal_status == 'READY':
+ continue
+ elif internal_status == 'BROKEN':
+ raise LcmException("Configuration aborted because dependent charm/s has failed")
+ else:
+ break
+ else:
+ # no dependencies, return
+ return
+ await asyncio.sleep(10)
+ timeout -= 1
+
+ raise LcmException("Configuration aborted because dependent charm/s timeout")
+
+ async def instantiate_N2VC(self, logging_text, vca_index, nsi_id, db_nsr, db_vnfr, vdu_id, kdu_name, vdu_index,
+ config_descriptor, deploy_params, base_folder, nslcmop_id, stage, vca_type, vca_name,
+ ee_config_descriptor):
+ nsr_id = db_nsr["_id"]
+ db_update_entry = "_admin.deployed.VCA.{}.".format(vca_index)
+ vca_deployed_list = db_nsr["_admin"]["deployed"]["VCA"]
+ vca_deployed = db_nsr["_admin"]["deployed"]["VCA"][vca_index]
+ osm_config = {"osm": {"ns_id": db_nsr["_id"]}}
+ db_dict = {
+ 'collection': 'nsrs',
+ 'filter': {'_id': nsr_id},
+ 'path': db_update_entry
+ }
+ step = ""
+ try:
+
+ element_type = 'NS'
+ element_under_configuration = nsr_id
+
+ vnfr_id = None
+ if db_vnfr:
+ vnfr_id = db_vnfr["_id"]
+ osm_config["osm"]["vnf_id"] = vnfr_id
+
+ namespace = "{nsi}.{ns}".format(
+ nsi=nsi_id if nsi_id else "",
+ ns=nsr_id)
+
+ if vnfr_id:
+ element_type = 'VNF'
+ element_under_configuration = vnfr_id
+ namespace += ".{}".format(vnfr_id)
+ if vdu_id:
+ namespace += ".{}-{}".format(vdu_id, vdu_index or 0)
+ element_type = 'VDU'
+ element_under_configuration = "{}-{}".format(vdu_id, vdu_index or 0)
+ osm_config["osm"]["vdu_id"] = vdu_id
+ elif kdu_name:
+ namespace += ".{}".format(kdu_name)
+ element_type = 'KDU'
+ element_under_configuration = kdu_name
+ osm_config["osm"]["kdu_name"] = kdu_name
+
+ # Get artifact path
+ artifact_path = "{}/{}/{}/{}".format(
+ base_folder["folder"],
+ base_folder["pkg-dir"],
+ "charms" if vca_type in ("native_charm", "lxc_proxy_charm", "k8s_proxy_charm") else "helm-charts",
+ vca_name
+ )
+ # get initial_config_primitive_list that applies to this element
+ initial_config_primitive_list = config_descriptor.get('initial-config-primitive')
+
+ # add config if not present for NS charm
+ ee_descriptor_id = ee_config_descriptor.get("id")
+ initial_config_primitive_list = self._get_initial_config_primitive_list(initial_config_primitive_list,
+ vca_deployed, ee_descriptor_id)
+
+ # n2vc_redesign STEP 3.1
+ # find old ee_id if exists
+ ee_id = vca_deployed.get("ee_id")
+
+ # create or register execution environment in VCA
+ if vca_type in ("lxc_proxy_charm", "k8s_proxy_charm", "helm"):
+
+ self._write_configuration_status(
+ nsr_id=nsr_id,
+ vca_index=vca_index,
+ status='CREATING',
+ element_under_configuration=element_under_configuration,
+ element_type=element_type
+ )
+
+ step = "create execution environment"
+ self.logger.debug(logging_text + step)
+ ee_id, credentials = await self.vca_map[vca_type].create_execution_environment(
+ namespace=namespace,
+ reuse_ee_id=ee_id,
+ db_dict=db_dict,
+ config=osm_config,
+ artifact_path=artifact_path,
+ vca_type=vca_type)
+
+ elif vca_type == "native_charm":
+ step = "Waiting to VM being up and getting IP address"
+ self.logger.debug(logging_text + step)
+ rw_mgmt_ip = await self.wait_vm_up_insert_key_ro(logging_text, nsr_id, vnfr_id, vdu_id, vdu_index,
+ user=None, pub_key=None)
+ credentials = {"hostname": rw_mgmt_ip}
+ # get username
+ username = deep_get(config_descriptor, ("config-access", "ssh-access", "default-user"))
+ # TODO remove this when changes on IM regarding config-access:ssh-access:default-user were
+ # merged. Meanwhile let's get username from initial-config-primitive
+ if not username and initial_config_primitive_list:
+ for config_primitive in initial_config_primitive_list:
+ for param in config_primitive.get("parameter", ()):
+ if param["name"] == "ssh-username":
+ username = param["value"]
+ break
+ if not username:
+ raise LcmException("Cannot determine the username neither with 'initial-config-primitive' nor with "
+ "'config-access.ssh-access.default-user'")
+ credentials["username"] = username
+ # n2vc_redesign STEP 3.2
+
+ self._write_configuration_status(
+ nsr_id=nsr_id,
+ vca_index=vca_index,
+ status='REGISTERING',
+ element_under_configuration=element_under_configuration,
+ element_type=element_type
+ )
+
+ step = "register execution environment {}".format(credentials)
+ self.logger.debug(logging_text + step)
+ ee_id = await self.vca_map[vca_type].register_execution_environment(
+ credentials=credentials, namespace=namespace, db_dict=db_dict)
+
+ # for compatibility with MON/POL modules, the need model and application name at database
+ # TODO ask MON/POL if needed to not assuming anymore the format "model_name.application_name"
+ ee_id_parts = ee_id.split('.')
+ db_nsr_update = {db_update_entry + "ee_id": ee_id}
+ if len(ee_id_parts) >= 2:
+ model_name = ee_id_parts[0]
+ application_name = ee_id_parts[1]
+ db_nsr_update[db_update_entry + "model"] = model_name
+ db_nsr_update[db_update_entry + "application"] = application_name
+
+ # n2vc_redesign STEP 3.3
+ step = "Install configuration Software"
+
+ self._write_configuration_status(
+ nsr_id=nsr_id,
+ vca_index=vca_index,
+ status='INSTALLING SW',
+ element_under_configuration=element_under_configuration,
+ element_type=element_type,
+ other_update=db_nsr_update
+ )
+
+ # TODO check if already done
+ self.logger.debug(logging_text + step)
+ config = None
+ if vca_type == "native_charm":
+ config_primitive = next((p for p in initial_config_primitive_list if p["name"] == "config"), None)
+ if config_primitive:
+ config = self._map_primitive_params(
+ config_primitive,
+ {},
+ deploy_params
+ )
+ num_units = 1
+ if vca_type == "lxc_proxy_charm":
+ if element_type == "NS":
+ num_units = db_nsr.get("config-units") or 1
+ elif element_type == "VNF":
+ num_units = db_vnfr.get("config-units") or 1
+ elif element_type == "VDU":
+ for v in db_vnfr["vdur"]:
+ if vdu_id == v["vdu-id-ref"]:
+ num_units = v.get("config-units") or 1
+ break
+
+ await self.vca_map[vca_type].install_configuration_sw(
+ ee_id=ee_id,
+ artifact_path=artifact_path,
+ db_dict=db_dict,
+ config=config,
+ num_units=num_units,
+ vca_type=vca_type
+ )
+
+ # write in db flag of configuration_sw already installed
+ self.update_db_2("nsrs", nsr_id, {db_update_entry + "config_sw_installed": True})
+
+ # add relations for this VCA (wait for other peers related with this VCA)
+ await self._add_vca_relations(logging_text=logging_text, nsr_id=nsr_id,
+ vca_index=vca_index, vca_type=vca_type)
+
+ # if SSH access is required, then get execution environment SSH public
+ # if native charm we have waited already to VM be UP
+ if vca_type in ("k8s_proxy_charm", "lxc_proxy_charm", "helm"):
+ pub_key = None
+ user = None
+ # self.logger.debug("get ssh key block")
+ if deep_get(config_descriptor, ("config-access", "ssh-access", "required")):
+ # self.logger.debug("ssh key needed")
+ # Needed to inject a ssh key
+ user = deep_get(config_descriptor, ("config-access", "ssh-access", "default-user"))
+ step = "Install configuration Software, getting public ssh key"
+ pub_key = await self.vca_map[vca_type].get_ee_ssh_public__key(ee_id=ee_id, db_dict=db_dict)
+
+ step = "Insert public key into VM user={} ssh_key={}".format(user, pub_key)
+ else:
+ # self.logger.debug("no need to get ssh key")
+ step = "Waiting to VM being up and getting IP address"
+ self.logger.debug(logging_text + step)
+
+ # n2vc_redesign STEP 5.1
+ # wait for RO (ip-address) Insert pub_key into VM
+ if vnfr_id:
+ rw_mgmt_ip = await self.wait_vm_up_insert_key_ro(logging_text, nsr_id, vnfr_id, vdu_id, vdu_index,
+ user=user, pub_key=pub_key)
+ else:
+ rw_mgmt_ip = None # This is for a NS configuration
+
+ self.logger.debug(logging_text + ' VM_ip_address={}'.format(rw_mgmt_ip))
+
+ # store rw_mgmt_ip in deploy params for later replacement
+ deploy_params["rw_mgmt_ip"] = rw_mgmt_ip
+
+ # n2vc_redesign STEP 6 Execute initial config primitive
+ step = 'execute initial config primitive'
+
+ # wait for dependent primitives execution (NS -> VNF -> VDU)
+ if initial_config_primitive_list:
+ await self._wait_dependent_n2vc(nsr_id, vca_deployed_list, vca_index)
+
+ # stage, in function of element type: vdu, kdu, vnf or ns
+ my_vca = vca_deployed_list[vca_index]
+ if my_vca.get("vdu_id") or my_vca.get("kdu_name"):
+ # VDU or KDU
+ stage[0] = 'Stage 3/5: running Day-1 primitives for VDU.'
+ elif my_vca.get("member-vnf-index"):
+ # VNF
+ stage[0] = 'Stage 4/5: running Day-1 primitives for VNF.'
+ else:
+ # NS
+ stage[0] = 'Stage 5/5: running Day-1 primitives for NS.'
+
+ self._write_configuration_status(
+ nsr_id=nsr_id,
+ vca_index=vca_index,
+ status='EXECUTING PRIMITIVE'
+ )
+
+ self._write_op_status(
+ op_id=nslcmop_id,
+ stage=stage
+ )
+
+ check_if_terminated_needed = True
+ for initial_config_primitive in initial_config_primitive_list:
+ # adding information on the vca_deployed if it is a NS execution environment
+ if not vca_deployed["member-vnf-index"]:
+ deploy_params["ns_config_info"] = json.dumps(self._get_ns_config_info(nsr_id))
+ # TODO check if already done
+ primitive_params_ = self._map_primitive_params(initial_config_primitive, {}, deploy_params)
+
+ step = "execute primitive '{}' params '{}'".format(initial_config_primitive["name"], primitive_params_)
+ self.logger.debug(logging_text + step)
+ await self.vca_map[vca_type].exec_primitive(
+ ee_id=ee_id,
+ primitive_name=initial_config_primitive["name"],
+ params_dict=primitive_params_,
+ db_dict=db_dict
+ )
+ # Once some primitive has been exec, check and write at db if it needs to exec terminated primitives
+ if check_if_terminated_needed:
+ if config_descriptor.get('terminate-config-primitive'):
+ self.update_db_2("nsrs", nsr_id, {db_update_entry + "needed_terminate": True})
+ check_if_terminated_needed = False
+
+ # TODO register in database that primitive is done
+
+ # STEP 7 Configure metrics
+ if vca_type == "helm":
+ prometheus_jobs = await self.add_prometheus_metrics(
+ ee_id=ee_id,
+ artifact_path=artifact_path,
+ ee_config_descriptor=ee_config_descriptor,
+ vnfr_id=vnfr_id,
+ nsr_id=nsr_id,
+ target_ip=rw_mgmt_ip,
+ )
+ if prometheus_jobs:
+ self.update_db_2("nsrs", nsr_id, {db_update_entry + "prometheus_jobs": prometheus_jobs})
+
+ step = "instantiated at VCA"
+ self.logger.debug(logging_text + step)
+
+ self._write_configuration_status(
+ nsr_id=nsr_id,
+ vca_index=vca_index,
+ status='READY'
+ )
+
+ except Exception as e: # TODO not use Exception but N2VC exception
+ # self.update_db_2("nsrs", nsr_id, {db_update_entry + "instantiation": "FAILED"})
+ if not isinstance(e, (DbException, N2VCException, LcmException, asyncio.CancelledError)):
+ self.logger.error("Exception while {} : {}".format(step, e), exc_info=True)
+ self._write_configuration_status(
+ nsr_id=nsr_id,
+ vca_index=vca_index,
+ status='BROKEN'
+ )
+ raise LcmException("{} {}".format(step, e)) from e
+
+ def _write_ns_status(self, nsr_id: str, ns_state: str, current_operation: str, current_operation_id: str,
+ error_description: str = None, error_detail: str = None, other_update: dict = None):
+ """
+ Update db_nsr fields.
+ :param nsr_id:
+ :param ns_state:
+ :param current_operation:
+ :param current_operation_id:
+ :param error_description:
+ :param error_detail:
+ :param other_update: Other required changes at database if provided, will be cleared
+ :return:
+ """
+ try:
+ db_dict = other_update or {}
+ db_dict["_admin.nslcmop"] = current_operation_id # for backward compatibility
+ db_dict["_admin.current-operation"] = current_operation_id
+ db_dict["_admin.operation-type"] = current_operation if current_operation != "IDLE" else None
+ db_dict["currentOperation"] = current_operation
+ db_dict["currentOperationID"] = current_operation_id
+ db_dict["errorDescription"] = error_description
+ db_dict["errorDetail"] = error_detail
+
+ if ns_state:
+ db_dict["nsState"] = ns_state
+ self.update_db_2("nsrs", nsr_id, db_dict)
+ except DbException as e:
+ self.logger.warn('Error writing NS status, ns={}: {}'.format(nsr_id, e))
+
+ def _write_op_status(self, op_id: str, stage: list = None, error_message: str = None, queuePosition: int = 0,
+ operation_state: str = None, other_update: dict = None):
+ try:
+ db_dict = other_update or {}
+ db_dict['queuePosition'] = queuePosition
+ if isinstance(stage, list):
+ db_dict['stage'] = stage[0]
+ db_dict['detailed-status'] = " ".join(stage)
+ elif stage is not None:
+ db_dict['stage'] = str(stage)
+
+ if error_message is not None:
+ db_dict['errorMessage'] = error_message
+ if operation_state is not None:
+ db_dict['operationState'] = operation_state
+ db_dict["statusEnteredTime"] = time()
+ self.update_db_2("nslcmops", op_id, db_dict)
+ except DbException as e:
+ self.logger.warn('Error writing OPERATION status for op_id: {} -> {}'.format(op_id, e))
+
+ def _write_all_config_status(self, db_nsr: dict, status: str):
+ try:
+ nsr_id = db_nsr["_id"]
+ # configurationStatus
+ config_status = db_nsr.get('configurationStatus')
+ if config_status:
+ db_nsr_update = {"configurationStatus.{}.status".format(index): status for index, v in
+ enumerate(config_status) if v}
+ # update status
+ self.update_db_2("nsrs", nsr_id, db_nsr_update)
+
+ except DbException as e:
+ self.logger.warn('Error writing all configuration status, ns={}: {}'.format(nsr_id, e))
+
+ def _write_configuration_status(self, nsr_id: str, vca_index: int, status: str = None,
+ element_under_configuration: str = None, element_type: str = None,
+ other_update: dict = None):
+
+ # self.logger.debug('_write_configuration_status(): vca_index={}, status={}'
+ # .format(vca_index, status))
+
+ try:
+ db_path = 'configurationStatus.{}.'.format(vca_index)
+ db_dict = other_update or {}
+ if status:
+ db_dict[db_path + 'status'] = status
+ if element_under_configuration:
+ db_dict[db_path + 'elementUnderConfiguration'] = element_under_configuration
+ if element_type:
+ db_dict[db_path + 'elementType'] = element_type
+ self.update_db_2("nsrs", nsr_id, db_dict)
+ except DbException as e:
+ self.logger.warn('Error writing configuration status={}, ns={}, vca_index={}: {}'
+ .format(status, nsr_id, vca_index, e))
+
+ async def _do_placement(self, logging_text, db_nslcmop, db_vnfrs):
+ """
+ Check and computes the placement, (vim account where to deploy). If it is decided by an external tool, it
+ sends the request via kafka and wait until the result is wrote at database (nslcmops _admin.plca).
+ Database is used because the result can be obtained from a different LCM worker in case of HA.
+ :param logging_text: contains the prefix for logging, with the ns and nslcmop identifiers
+ :param db_nslcmop: database content of nslcmop
+ :param db_vnfrs: database content of vnfrs, indexed by member-vnf-index.
+ :return: True if some modification is done. Modifies database vnfrs and parameter db_vnfr with the
+ computed 'vim-account-id'
+ """
+ modified = False
+ nslcmop_id = db_nslcmop['_id']
+ placement_engine = deep_get(db_nslcmop, ('operationParams', 'placement-engine'))
+ if placement_engine == "PLA":
+ self.logger.debug(logging_text + "Invoke and wait for placement optimization")
+ await self.msg.aiowrite("pla", "get_placement", {'nslcmopId': nslcmop_id}, loop=self.loop)
+ db_poll_interval = 5
+ wait = db_poll_interval * 10
+ pla_result = None
+ while not pla_result and wait >= 0:
+ await asyncio.sleep(db_poll_interval)
+ wait -= db_poll_interval
+ db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})
+ pla_result = deep_get(db_nslcmop, ('_admin', 'pla'))
+
+ if not pla_result:
+ raise LcmException("Placement timeout for nslcmopId={}".format(nslcmop_id))
+
+ for pla_vnf in pla_result['vnf']:
+ vnfr = db_vnfrs.get(pla_vnf['member-vnf-index'])
+ if not pla_vnf.get('vimAccountId') or not vnfr:
+ continue
+ modified = True
+ self.db.set_one("vnfrs", {"_id": vnfr["_id"]}, {"vim-account-id": pla_vnf['vimAccountId']})
+ # Modifies db_vnfrs
+ vnfr["vim-account-id"] = pla_vnf['vimAccountId']
+ return modified
+
+ def update_nsrs_with_pla_result(self, params):
+ try:
+ nslcmop_id = deep_get(params, ('placement', 'nslcmopId'))
+ self.update_db_2("nslcmops", nslcmop_id, {"_admin.pla": params.get('placement')})
+ except Exception as e:
+ self.logger.warn('Update failed for nslcmop_id={}:{}'.format(nslcmop_id, e))
+
+ async def instantiate(self, nsr_id, nslcmop_id):
+ """
+
+ :param nsr_id: ns instance to deploy
+ :param nslcmop_id: operation to run
+ :return:
+ """
+
+ # Try to lock HA task here
+ task_is_locked_by_me = self.lcm_tasks.lock_HA('ns', 'nslcmops', nslcmop_id)
+ if not task_is_locked_by_me:
+ self.logger.debug('instantiate() task is not locked by me, ns={}'.format(nsr_id))
+ return
+
+ logging_text = "Task ns={} instantiate={} ".format(nsr_id, nslcmop_id)
+ self.logger.debug(logging_text + "Enter")
+
+ # get all needed from database
+
+ # database nsrs record
+ db_nsr = None
+
+ # database nslcmops record
+ db_nslcmop = None
+
+ # update operation on nsrs
+ db_nsr_update = {}
+ # update operation on nslcmops
+ db_nslcmop_update = {}
+
+ nslcmop_operation_state = None
+ db_vnfrs = {} # vnf's info indexed by member-index
+ # n2vc_info = {}
+ tasks_dict_info = {} # from task to info text
+ exc = None
+ error_list = []
+ stage = ['Stage 1/5: preparation of the environment.', "Waiting for previous operations to terminate.", ""]
+ # ^ stage, step, VIM progress
+ try:
+ # wait for any previous tasks in process
+ await self.lcm_tasks.waitfor_related_HA('ns', 'nslcmops', nslcmop_id)
+
+ stage[1] = "Sync filesystem from database"
+ self.fs.sync() # TODO, make use of partial sync, only for the needed packages
+
+ # STEP 0: Reading database (nslcmops, nsrs, nsds, vnfrs, vnfds)
+ stage[1] = "Reading from database"
+ # nsState="BUILDING", currentOperation="INSTANTIATING", currentOperationID=nslcmop_id
+ db_nsr_update["detailed-status"] = "creating"
+ db_nsr_update["operational-status"] = "init"
+ self._write_ns_status(
+ nsr_id=nsr_id,
+ ns_state="BUILDING",
+ current_operation="INSTANTIATING",
+ current_operation_id=nslcmop_id,
+ other_update=db_nsr_update
+ )
+ self._write_op_status(
+ op_id=nslcmop_id,
+ stage=stage,
+ queuePosition=0
+ )
+
+ # read from db: operation
+ stage[1] = "Getting nslcmop={} from db".format(nslcmop_id)
+ db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})
+ ns_params = db_nslcmop.get("operationParams")
+ if ns_params and ns_params.get("timeout_ns_deploy"):
+ timeout_ns_deploy = ns_params["timeout_ns_deploy"]
+ else:
+ timeout_ns_deploy = self.timeout.get("ns_deploy", self.timeout_ns_deploy)
+
+ # read from db: ns
+ stage[1] = "Getting nsr={} from db".format(nsr_id)
+ db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
+ stage[1] = "Getting nsd={} from db".format(db_nsr["nsd-id"])
+ nsd = self.db.get_one("nsds", {"_id": db_nsr["nsd-id"]})
+ db_nsr["nsd"] = nsd
+ # nsr_name = db_nsr["name"] # TODO short-name??
+
+ # read from db: vnf's of this ns
+ stage[1] = "Getting vnfrs from db"
+ self.logger.debug(logging_text + stage[1])
+ db_vnfrs_list = self.db.get_list("vnfrs", {"nsr-id-ref": nsr_id})
+
+ # read from db: vnfd's for every vnf
+ db_vnfds_ref = {} # every vnfd data indexed by vnf name
+ db_vnfds = {} # every vnfd data indexed by vnf id
+ db_vnfds_index = {} # every vnfd data indexed by vnf member-index
+
+ # for each vnf in ns, read vnfd
+ for vnfr in db_vnfrs_list:
+ db_vnfrs[vnfr["member-vnf-index-ref"]] = vnfr # vnf's dict indexed by member-index: '1', '2', etc
+ vnfd_id = vnfr["vnfd-id"] # vnfd uuid for this vnf
+ vnfd_ref = vnfr["vnfd-ref"] # vnfd name for this vnf
+ # if we haven't this vnfd, read it from db
+ if vnfd_id not in db_vnfds:
+ # read from db
+ stage[1] = "Getting vnfd={} id='{}' from db".format(vnfd_id, vnfd_ref)
+ self.logger.debug(logging_text + stage[1])
+ vnfd = self.db.get_one("vnfds", {"_id": vnfd_id})
+
+ # store vnfd
+ db_vnfds_ref[vnfd_ref] = vnfd # vnfd's indexed by name
+ db_vnfds[vnfd_id] = vnfd # vnfd's indexed by id
+ db_vnfds_index[vnfr["member-vnf-index-ref"]] = db_vnfds[vnfd_id] # vnfd's indexed by member-index
+
+ # Get or generates the _admin.deployed.VCA list
+ vca_deployed_list = None
+ if db_nsr["_admin"].get("deployed"):
+ vca_deployed_list = db_nsr["_admin"]["deployed"].get("VCA")
+ if vca_deployed_list is None:
+ vca_deployed_list = []
+ configuration_status_list = []
+ db_nsr_update["_admin.deployed.VCA"] = vca_deployed_list
+ db_nsr_update["configurationStatus"] = configuration_status_list
+ # add _admin.deployed.VCA to db_nsr dictionary, value=vca_deployed_list
+ populate_dict(db_nsr, ("_admin", "deployed", "VCA"), vca_deployed_list)
+ elif isinstance(vca_deployed_list, dict):
+ # maintain backward compatibility. Change a dict to list at database
+ vca_deployed_list = list(vca_deployed_list.values())
+ db_nsr_update["_admin.deployed.VCA"] = vca_deployed_list
+ populate_dict(db_nsr, ("_admin", "deployed", "VCA"), vca_deployed_list)
+
+ if not isinstance(deep_get(db_nsr, ("_admin", "deployed", "RO", "vnfd")), list):
+ populate_dict(db_nsr, ("_admin", "deployed", "RO", "vnfd"), [])
+ db_nsr_update["_admin.deployed.RO.vnfd"] = []
+
+ # set state to INSTANTIATED. When instantiated NBI will not delete directly
+ db_nsr_update["_admin.nsState"] = "INSTANTIATED"
+ self.update_db_2("nsrs", nsr_id, db_nsr_update)
+
+ # n2vc_redesign STEP 2 Deploy Network Scenario
+ stage[0] = 'Stage 2/5: deployment of KDUs, VMs and execution environments.'
+ self._write_op_status(
+ op_id=nslcmop_id,
+ stage=stage
+ )
+
+ stage[1] = "Deploying KDUs,"
+ # self.logger.debug(logging_text + "Before deploy_kdus")
+ # Call to deploy_kdus in case exists the "vdu:kdu" param
+ await self.deploy_kdus(
+ logging_text=logging_text,
+ nsr_id=nsr_id,
+ nslcmop_id=nslcmop_id,
+ db_vnfrs=db_vnfrs,
+ db_vnfds=db_vnfds,
+ task_instantiation_info=tasks_dict_info,
+ )
+
+ stage[1] = "Getting VCA public key."
+ # n2vc_redesign STEP 1 Get VCA public ssh-key
+ # feature 1429. Add n2vc public key to needed VMs
+ n2vc_key = self.n2vc.get_public_key()
+ n2vc_key_list = [n2vc_key]
+ if self.vca_config.get("public_key"):
+ n2vc_key_list.append(self.vca_config["public_key"])
+
+ stage[1] = "Deploying NS at VIM."
+ task_ro = asyncio.ensure_future(
+ self.instantiate_RO(
+ logging_text=logging_text,
+ nsr_id=nsr_id,
+ nsd=nsd,
+ db_nsr=db_nsr,
+ db_nslcmop=db_nslcmop,
+ db_vnfrs=db_vnfrs,
+ db_vnfds_ref=db_vnfds_ref,
+ n2vc_key_list=n2vc_key_list,
+ stage=stage
+ )
+ )
+ self.lcm_tasks.register("ns", nsr_id, nslcmop_id, "instantiate_RO", task_ro)
+ tasks_dict_info[task_ro] = "Deploying at VIM"
+
+ # n2vc_redesign STEP 3 to 6 Deploy N2VC
+ stage[1] = "Deploying Execution Environments."
+ self.logger.debug(logging_text + stage[1])
+
+ nsi_id = None # TODO put nsi_id when this nsr belongs to a NSI
+ # get_iterable() returns a value from a dict or empty tuple if key does not exist
+ for c_vnf in get_iterable(nsd, "constituent-vnfd"):
+ vnfd_id = c_vnf["vnfd-id-ref"]
+ vnfd = db_vnfds_ref[vnfd_id]
+ member_vnf_index = str(c_vnf["member-vnf-index"])
+ db_vnfr = db_vnfrs[member_vnf_index]
+ base_folder = vnfd["_admin"]["storage"]
+ vdu_id = None
+ vdu_index = 0
+ vdu_name = None
+ kdu_name = None
+
+ # Get additional parameters
+ deploy_params = {}
+ if db_vnfr.get("additionalParamsForVnf"):
+ deploy_params = self._format_additional_params(db_vnfr["additionalParamsForVnf"].copy())
+
+ descriptor_config = vnfd.get("vnf-configuration")
+ if descriptor_config:
+ self._deploy_n2vc(
+ logging_text=logging_text + "member_vnf_index={} ".format(member_vnf_index),
+ db_nsr=db_nsr,
+ db_vnfr=db_vnfr,
+ nslcmop_id=nslcmop_id,
+ nsr_id=nsr_id,
+ nsi_id=nsi_id,
+ vnfd_id=vnfd_id,
+ vdu_id=vdu_id,
+ kdu_name=kdu_name,
+ member_vnf_index=member_vnf_index,
+ vdu_index=vdu_index,
+ vdu_name=vdu_name,
+ deploy_params=deploy_params,
+ descriptor_config=descriptor_config,
+ base_folder=base_folder,
+ task_instantiation_info=tasks_dict_info,
+ stage=stage
+ )
+
+ # Deploy charms for each VDU that supports one.
+ for vdud in get_iterable(vnfd, 'vdu'):
+ vdu_id = vdud["id"]
+ descriptor_config = vdud.get('vdu-configuration')
+ vdur = next((x for x in db_vnfr["vdur"] if x["vdu-id-ref"] == vdu_id), None)
+ if vdur.get("additionalParams"):
+ deploy_params_vdu = self._format_additional_params(vdur["additionalParams"])
+ else:
+ deploy_params_vdu = deploy_params
+ if descriptor_config:
+ # look for vdu index in the db_vnfr["vdu"] section
+ # for vdur_index, vdur in enumerate(db_vnfr["vdur"]):
+ # if vdur["vdu-id-ref"] == vdu_id:
+ # break
+ # else:
+ # raise LcmException("Mismatch vdu_id={} not found in the vnfr['vdur'] list for "
+ # "member_vnf_index={}".format(vdu_id, member_vnf_index))
+ # vdu_name = vdur.get("name")
+ vdu_name = None
+ kdu_name = None
+ for vdu_index in range(int(vdud.get("count", 1))):
+ # TODO vnfr_params["rw_mgmt_ip"] = vdur["ip-address"]
+ self._deploy_n2vc(
+ logging_text=logging_text + "member_vnf_index={}, vdu_id={}, vdu_index={} ".format(
+ member_vnf_index, vdu_id, vdu_index),
+ db_nsr=db_nsr,
+ db_vnfr=db_vnfr,
+ nslcmop_id=nslcmop_id,
+ nsr_id=nsr_id,
+ nsi_id=nsi_id,
+ vnfd_id=vnfd_id,
+ vdu_id=vdu_id,
+ kdu_name=kdu_name,
+ member_vnf_index=member_vnf_index,
+ vdu_index=vdu_index,
+ vdu_name=vdu_name,
+ deploy_params=deploy_params_vdu,
+ descriptor_config=descriptor_config,
+ base_folder=base_folder,
+ task_instantiation_info=tasks_dict_info,
+ stage=stage
+ )
+ for kdud in get_iterable(vnfd, 'kdu'):
+ kdu_name = kdud["name"]
+ descriptor_config = kdud.get('kdu-configuration')
+ if descriptor_config:
+ vdu_id = None
+ vdu_index = 0
+ vdu_name = None
+ # look for vdu index in the db_vnfr["vdu"] section
+ # for vdur_index, vdur in enumerate(db_vnfr["vdur"]):
+ # if vdur["vdu-id-ref"] == vdu_id:
+ # break
+ # else:
+ # raise LcmException("Mismatch vdu_id={} not found in the vnfr['vdur'] list for "
+ # "member_vnf_index={}".format(vdu_id, member_vnf_index))
+ # vdu_name = vdur.get("name")
+ # vdu_name = None
+
+ self._deploy_n2vc(
+ logging_text=logging_text,
+ db_nsr=db_nsr,
+ db_vnfr=db_vnfr,
+ nslcmop_id=nslcmop_id,
+ nsr_id=nsr_id,
+ nsi_id=nsi_id,
+ vnfd_id=vnfd_id,
+ vdu_id=vdu_id,
+ kdu_name=kdu_name,
+ member_vnf_index=member_vnf_index,
+ vdu_index=vdu_index,
+ vdu_name=vdu_name,
+ deploy_params=deploy_params,
+ descriptor_config=descriptor_config,
+ base_folder=base_folder,
+ task_instantiation_info=tasks_dict_info,
+ stage=stage
+ )
+
+ # Check if this NS has a charm configuration
+ descriptor_config = nsd.get("ns-configuration")
+ if descriptor_config and descriptor_config.get("juju"):
+ vnfd_id = None
+ db_vnfr = None
+ member_vnf_index = None
+ vdu_id = None
+ kdu_name = None
+ vdu_index = 0
+ vdu_name = None
+
+ # Get additional parameters
+ deploy_params = {}
+ if db_nsr.get("additionalParamsForNs"):
+ deploy_params = self._format_additional_params(db_nsr["additionalParamsForNs"].copy())
+ base_folder = nsd["_admin"]["storage"]
+ self._deploy_n2vc(
+ logging_text=logging_text,
+ db_nsr=db_nsr,
+ db_vnfr=db_vnfr,
+ nslcmop_id=nslcmop_id,
+ nsr_id=nsr_id,
+ nsi_id=nsi_id,
+ vnfd_id=vnfd_id,
+ vdu_id=vdu_id,
+ kdu_name=kdu_name,
+ member_vnf_index=member_vnf_index,
+ vdu_index=vdu_index,
+ vdu_name=vdu_name,
+ deploy_params=deploy_params,
+ descriptor_config=descriptor_config,
+ base_folder=base_folder,
+ task_instantiation_info=tasks_dict_info,
+ stage=stage
+ )
+
+ # rest of staff will be done at finally
+
+ except (ROclient.ROClientException, DbException, LcmException, N2VCException) as e:
+ self.logger.error(logging_text + "Exit Exception while '{}': {}".format(stage[1], e))
+ exc = e
+ except asyncio.CancelledError:
+ self.logger.error(logging_text + "Cancelled Exception while '{}'".format(stage[1]))
+ exc = "Operation was cancelled"
+ except Exception as e:
+ exc = traceback.format_exc()
+ self.logger.critical(logging_text + "Exit Exception while '{}': {}".format(stage[1], e), exc_info=True)
+ finally:
+ if exc:
+ error_list.append(str(exc))
+ try:
+ # wait for pending tasks
+ if tasks_dict_info:
+ stage[1] = "Waiting for instantiate pending tasks."
+ self.logger.debug(logging_text + stage[1])
+ error_list += await self._wait_for_tasks(logging_text, tasks_dict_info, timeout_ns_deploy,
+ stage, nslcmop_id, nsr_id=nsr_id)
+ stage[1] = stage[2] = ""
+ except asyncio.CancelledError:
+ error_list.append("Cancelled")
+ # TODO cancel all tasks
+ except Exception as exc:
+ error_list.append(str(exc))
+
+ # update operation-status
+ db_nsr_update["operational-status"] = "running"
+ # let's begin with VCA 'configured' status (later we can change it)
+ db_nsr_update["config-status"] = "configured"
+ for task, task_name in tasks_dict_info.items():
+ if not task.done() or task.cancelled() or task.exception():
+ if task_name.startswith(self.task_name_deploy_vca):
+ # A N2VC task is pending
+ db_nsr_update["config-status"] = "failed"
+ else:
+ # RO or KDU task is pending
+ db_nsr_update["operational-status"] = "failed"
+
+ # update status at database
+ if error_list:
+ error_detail = ". ".join(error_list)
+ self.logger.error(logging_text + error_detail)
+ error_description_nslcmop = 'Stage: {}. Detail: {}'.format(stage[0], error_detail)
+ error_description_nsr = 'Operation: INSTANTIATING.{}, Stage {}'.format(nslcmop_id, stage[0])
+
+ db_nsr_update["detailed-status"] = error_description_nsr + " Detail: " + error_detail
+ db_nslcmop_update["detailed-status"] = error_detail
+ nslcmop_operation_state = "FAILED"
+ ns_state = "BROKEN"
+ else:
+ error_detail = None
+ error_description_nsr = error_description_nslcmop = None
+ ns_state = "READY"
+ db_nsr_update["detailed-status"] = "Done"
+ db_nslcmop_update["detailed-status"] = "Done"
+ nslcmop_operation_state = "COMPLETED"
+
+ if db_nsr:
+ self._write_ns_status(
+ nsr_id=nsr_id,
+ ns_state=ns_state,
+ current_operation="IDLE",
+ current_operation_id=None,
+ error_description=error_description_nsr,
+ error_detail=error_detail,
+ other_update=db_nsr_update
+ )
+ self._write_op_status(
+ op_id=nslcmop_id,
+ stage="",
+ error_message=error_description_nslcmop,
+ operation_state=nslcmop_operation_state,
+ other_update=db_nslcmop_update,
+ )
+
+ if nslcmop_operation_state:
+ try:
+ await self.msg.aiowrite("ns", "instantiated", {"nsr_id": nsr_id, "nslcmop_id": nslcmop_id,
+ "operationState": nslcmop_operation_state},
+ loop=self.loop)
+ except Exception as e:
+ self.logger.error(logging_text + "kafka_write notification Exception {}".format(e))
+
+ self.logger.debug(logging_text + "Exit")
+ self.lcm_tasks.remove("ns", nsr_id, nslcmop_id, "ns_instantiate")
+
+ async def _add_vca_relations(self, logging_text, nsr_id, vca_index: int,
+ timeout: int = 3600, vca_type: str = None) -> bool:
+
+ # steps:
+ # 1. find all relations for this VCA
+ # 2. wait for other peers related
+ # 3. add relations
+
+ try:
+ vca_type = vca_type or "lxc_proxy_charm"
+
+ # STEP 1: find all relations for this VCA
+
+ # read nsr record
+ db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
+ nsd = self.db.get_one("nsds", {"_id": db_nsr["nsd-id"]})
+
+ # this VCA data
+ my_vca = deep_get(db_nsr, ('_admin', 'deployed', 'VCA'))[vca_index]
+
+ # read all ns-configuration relations
+ ns_relations = list()
+ db_ns_relations = deep_get(nsd, ('ns-configuration', 'relation'))
+ if db_ns_relations:
+ for r in db_ns_relations:
+ # check if this VCA is in the relation
+ if my_vca.get('member-vnf-index') in\
+ (r.get('entities')[0].get('id'), r.get('entities')[1].get('id')):
+ ns_relations.append(r)
+
+ # read all vnf-configuration relations
+ vnf_relations = list()
+ db_vnfd_list = db_nsr.get('vnfd-id')
+ if db_vnfd_list:
+ for vnfd in db_vnfd_list:
+ db_vnfd = self.db.get_one("vnfds", {"_id": vnfd})
+ db_vnf_relations = deep_get(db_vnfd, ('vnf-configuration', 'relation'))
+ if db_vnf_relations:
+ for r in db_vnf_relations:
+ # check if this VCA is in the relation
+ if my_vca.get('vdu_id') in (r.get('entities')[0].get('id'), r.get('entities')[1].get('id')):
+ vnf_relations.append(r)
+
+ # if no relations, terminate
+ if not ns_relations and not vnf_relations:
+ self.logger.debug(logging_text + ' No relations')
+ return True
+
+ self.logger.debug(logging_text + ' adding relations\n {}\n {}'.format(ns_relations, vnf_relations))
+
+ # add all relations
+ start = time()
+ while True:
+ # check timeout
+ now = time()
+ if now - start >= timeout:
+ self.logger.error(logging_text + ' : timeout adding relations')
+ return False
+
+ # reload nsr from database (we need to update record: _admin.deloyed.VCA)
+ db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
+
+ # for each defined NS relation, find the VCA's related
+ for r in ns_relations:
+ from_vca_ee_id = None
+ to_vca_ee_id = None
+ from_vca_endpoint = None
+ to_vca_endpoint = None
+ vca_list = deep_get(db_nsr, ('_admin', 'deployed', 'VCA'))
+ for vca in vca_list:
+ if vca.get('member-vnf-index') == r.get('entities')[0].get('id') \
+ and vca.get('config_sw_installed'):
+ from_vca_ee_id = vca.get('ee_id')
+ from_vca_endpoint = r.get('entities')[0].get('endpoint')
+ if vca.get('member-vnf-index') == r.get('entities')[1].get('id') \
+ and vca.get('config_sw_installed'):
+ to_vca_ee_id = vca.get('ee_id')
+ to_vca_endpoint = r.get('entities')[1].get('endpoint')
+ if from_vca_ee_id and to_vca_ee_id:
+ # add relation
+ await self.vca_map[vca_type].add_relation(
+ ee_id_1=from_vca_ee_id,
+ ee_id_2=to_vca_ee_id,
+ endpoint_1=from_vca_endpoint,
+ endpoint_2=to_vca_endpoint)
+ # remove entry from relations list
+ ns_relations.remove(r)
+ else:
+ # check failed peers
+ try:
+ vca_status_list = db_nsr.get('configurationStatus')
+ if vca_status_list:
+ for i in range(len(vca_list)):
+ vca = vca_list[i]
+ vca_status = vca_status_list[i]
+ if vca.get('member-vnf-index') == r.get('entities')[0].get('id'):
+ if vca_status.get('status') == 'BROKEN':
+ # peer broken: remove relation from list
+ ns_relations.remove(r)
+ if vca.get('member-vnf-index') == r.get('entities')[1].get('id'):
+ if vca_status.get('status') == 'BROKEN':
+ # peer broken: remove relation from list
+ ns_relations.remove(r)
+ except Exception:
+ # ignore
+ pass
+
+ # for each defined VNF relation, find the VCA's related
+ for r in vnf_relations:
+ from_vca_ee_id = None
+ to_vca_ee_id = None
+ from_vca_endpoint = None
+ to_vca_endpoint = None
+ vca_list = deep_get(db_nsr, ('_admin', 'deployed', 'VCA'))
+ for vca in vca_list:
+ if vca.get('vdu_id') == r.get('entities')[0].get('id') and vca.get('config_sw_installed'):
+ from_vca_ee_id = vca.get('ee_id')
+ from_vca_endpoint = r.get('entities')[0].get('endpoint')
+ if vca.get('vdu_id') == r.get('entities')[1].get('id') and vca.get('config_sw_installed'):
+ to_vca_ee_id = vca.get('ee_id')
+ to_vca_endpoint = r.get('entities')[1].get('endpoint')
+ if from_vca_ee_id and to_vca_ee_id:
+ # add relation
+ await self.vca_map[vca_type].add_relation(
+ ee_id_1=from_vca_ee_id,
+ ee_id_2=to_vca_ee_id,
+ endpoint_1=from_vca_endpoint,
+ endpoint_2=to_vca_endpoint)
+ # remove entry from relations list
+ vnf_relations.remove(r)
+ else:
+ # check failed peers
+ try:
+ vca_status_list = db_nsr.get('configurationStatus')
+ if vca_status_list:
+ for i in range(len(vca_list)):
+ vca = vca_list[i]
+ vca_status = vca_status_list[i]
+ if vca.get('vdu_id') == r.get('entities')[0].get('id'):
+ if vca_status.get('status') == 'BROKEN':
+ # peer broken: remove relation from list
+ ns_relations.remove(r)
+ if vca.get('vdu_id') == r.get('entities')[1].get('id'):
+ if vca_status.get('status') == 'BROKEN':
+ # peer broken: remove relation from list
+ ns_relations.remove(r)
+ except Exception:
+ # ignore
+ pass
+
+ # wait for next try
+ await asyncio.sleep(5.0)
+
+ if not ns_relations and not vnf_relations:
+ self.logger.debug('Relations added')
+ break
+
+ return True
+
+ except Exception as e:
+ self.logger.warn(logging_text + ' ERROR adding relations: {}'.format(e))
+ return False
+
+ async def _install_kdu(self, nsr_id: str, nsr_db_path: str, vnfr_data: dict, kdu_index: int, kdur: dict, kdud: dict,
+ vnfd: dict, k8s_instance_info: dict, k8params: dict = None, timeout: int = 600):
+
+ try:
+ k8sclustertype = k8s_instance_info["k8scluster-type"]
+ # Instantiate kdu
+ db_dict_install = {"collection": "nsrs",
+ "filter": {"_id": nsr_id},
+ "path": nsr_db_path}
+
+ kdu_instance = await self.k8scluster_map[k8sclustertype].install(
+ cluster_uuid=k8s_instance_info["k8scluster-uuid"],
+ kdu_model=k8s_instance_info["kdu-model"],
+ atomic=True,
+ params=k8params,
+ db_dict=db_dict_install,
+ timeout=timeout,
+ kdu_name=k8s_instance_info["kdu-name"],
+ namespace=k8s_instance_info["namespace"])
+ self.update_db_2("nsrs", nsr_id, {nsr_db_path + ".kdu-instance": kdu_instance})
+
+ # Obtain services to obtain management service ip
+ services = await self.k8scluster_map[k8sclustertype].get_services(
+ cluster_uuid=k8s_instance_info["k8scluster-uuid"],
+ kdu_instance=kdu_instance,
+ namespace=k8s_instance_info["namespace"])
+
+ # Obtain management service info (if exists)
+ if services:
+ vnfr_update_dict = {"kdur.{}.services".format(kdu_index): services}
+ mgmt_services = [service for service in kdud.get("service", []) if service.get("mgmt-service")]
+ for mgmt_service in mgmt_services:
+ for service in services:
+ if service["name"].startswith(mgmt_service["name"]):
+ # Mgmt service found, Obtain service ip
+ ip = service.get("external_ip", service.get("cluster_ip"))
+ if isinstance(ip, list) and len(ip) == 1:
+ ip = ip[0]
+
+ vnfr_update_dict["kdur.{}.ip-address".format(kdu_index)] = ip
+
+ # Check if must update also mgmt ip at the vnf
+ service_external_cp = mgmt_service.get("external-connection-point-ref")
+ if service_external_cp:
+ if deep_get(vnfd, ("mgmt-interface", "cp")) == service_external_cp:
+ vnfr_update_dict["ip-address"] = ip
+
+ break
+ else:
+ self.logger.warn("Mgmt service name: {} not found".format(mgmt_service["name"]))
+
+ self.update_db_2("vnfrs", vnfr_data.get("_id"), vnfr_update_dict)
+
+ except Exception as e:
+ # Prepare update db with error and raise exception
+ try:
+ self.update_db_2("nsrs", nsr_id, {nsr_db_path + ".detailed-status": str(e)})
+ except Exception:
+ # ignore to keep original exception
+ pass
+ # reraise original error
+ raise
+
+ return kdu_instance
+
+ async def deploy_kdus(self, logging_text, nsr_id, nslcmop_id, db_vnfrs, db_vnfds, task_instantiation_info):
+ # Launch kdus if present in the descriptor
+
+ k8scluster_id_2_uuic = {"helm-chart": {}, "juju-bundle": {}}
+
+ def _get_cluster_id(cluster_id, cluster_type):
+ nonlocal k8scluster_id_2_uuic
+ if cluster_id in k8scluster_id_2_uuic[cluster_type]:
+ return k8scluster_id_2_uuic[cluster_type][cluster_id]
+
+ db_k8scluster = self.db.get_one("k8sclusters", {"_id": cluster_id}, fail_on_empty=False)
+ if not db_k8scluster:
+ raise LcmException("K8s cluster {} cannot be found".format(cluster_id))
+ k8s_id = deep_get(db_k8scluster, ("_admin", cluster_type, "id"))
+ if not k8s_id:
+ raise LcmException("K8s cluster '{}' has not been initialized for '{}'".format(cluster_id,
+ cluster_type))
+ k8scluster_id_2_uuic[cluster_type][cluster_id] = k8s_id
+ return k8s_id
+
+ logging_text += "Deploy kdus: "
+ step = ""
+ try:
+ db_nsr_update = {"_admin.deployed.K8s": []}
+ self.update_db_2("nsrs", nsr_id, db_nsr_update)
+
+ index = 0
+ updated_cluster_list = []
+
+ for vnfr_data in db_vnfrs.values():
+ for kdu_index, kdur in enumerate(get_iterable(vnfr_data, "kdur")):
+ # Step 0: Prepare and set parameters
+ desc_params = self._format_additional_params(kdur.get("additionalParams"))
+ vnfd_id = vnfr_data.get('vnfd-id')
+ kdud = next(kdud for kdud in db_vnfds[vnfd_id]["kdu"] if kdud["name"] == kdur["kdu-name"])
+ namespace = kdur.get("k8s-namespace")
+ if kdur.get("helm-chart"):
+ kdumodel = kdur["helm-chart"]
+ k8sclustertype = "helm-chart"
+ elif kdur.get("juju-bundle"):
+ kdumodel = kdur["juju-bundle"]
+ k8sclustertype = "juju-bundle"
+ else:
+ raise LcmException("kdu type for kdu='{}.{}' is neither helm-chart nor "
+ "juju-bundle. Maybe an old NBI version is running".
+ format(vnfr_data["member-vnf-index-ref"], kdur["kdu-name"]))
+ # check if kdumodel is a file and exists
+ try:
+ storage = deep_get(db_vnfds.get(vnfd_id), ('_admin', 'storage'))
+ if storage and storage.get('pkg-dir'): # may be not present if vnfd has not artifacts
+ # path format: /vnfdid/pkkdir/helm-charts|juju-bundles/kdumodel
+ filename = '{}/{}/{}s/{}'.format(storage["folder"], storage["pkg-dir"], k8sclustertype,
+ kdumodel)
+ if self.fs.file_exists(filename, mode='file') or self.fs.file_exists(filename, mode='dir'):
+ kdumodel = self.fs.path + filename
+ except (asyncio.TimeoutError, asyncio.CancelledError):
+ raise
+ except Exception: # it is not a file
+ pass
+
+ k8s_cluster_id = kdur["k8s-cluster"]["id"]
+ step = "Synchronize repos for k8s cluster '{}'".format(k8s_cluster_id)
+ cluster_uuid = _get_cluster_id(k8s_cluster_id, k8sclustertype)
+
+ # Synchronize repos
+ if k8sclustertype == "helm-chart" and cluster_uuid not in updated_cluster_list:
+ del_repo_list, added_repo_dict = await asyncio.ensure_future(
+ self.k8sclusterhelm.synchronize_repos(cluster_uuid=cluster_uuid))
+ if del_repo_list or added_repo_dict:
+ unset = {'_admin.helm_charts_added.' + item: None for item in del_repo_list}
+ updated = {'_admin.helm_charts_added.' +
+ item: name for item, name in added_repo_dict.items()}
+ self.logger.debug(logging_text + "repos synchronized on k8s cluster '{}' to_delete: {}, "
+ "to_add: {}".format(k8s_cluster_id, del_repo_list,
+ added_repo_dict))
+ self.db.set_one("k8sclusters", {"_id": k8s_cluster_id}, updated, unset=unset)
+ updated_cluster_list.append(cluster_uuid)
+
+ # Instantiate kdu
+ step = "Instantiating KDU {}.{} in k8s cluster {}".format(vnfr_data["member-vnf-index-ref"],
+ kdur["kdu-name"], k8s_cluster_id)
+ k8s_instance_info = {"kdu-instance": None,
+ "k8scluster-uuid": cluster_uuid,
+ "k8scluster-type": k8sclustertype,
+ "member-vnf-index": vnfr_data["member-vnf-index-ref"],
+ "kdu-name": kdur["kdu-name"],
+ "kdu-model": kdumodel,
+ "namespace": namespace}
+ db_path = "_admin.deployed.K8s.{}".format(index)
+ db_nsr_update[db_path] = k8s_instance_info
+ self.update_db_2("nsrs", nsr_id, db_nsr_update)
+
+ task = asyncio.ensure_future(
+ self._install_kdu(nsr_id, db_path, vnfr_data, kdu_index, kdur, kdud, db_vnfds[vnfd_id],
+ k8s_instance_info, k8params=desc_params, timeout=600))
+ self.lcm_tasks.register("ns", nsr_id, nslcmop_id, "instantiate_KDU-{}".format(index), task)
+ task_instantiation_info[task] = "Deploying KDU {}".format(kdur["kdu-name"])
+
+ index += 1
+
+ except (LcmException, asyncio.CancelledError):
+ raise
+ except Exception as e:
+ msg = "Exception {} while {}: {}".format(type(e).__name__, step, e)
+ if isinstance(e, (N2VCException, DbException)):
+ self.logger.error(logging_text + msg)
+ else:
+ self.logger.critical(logging_text + msg, exc_info=True)
+ raise LcmException(msg)
+ finally:
+ if db_nsr_update:
+ self.update_db_2("nsrs", nsr_id, db_nsr_update)
+
+ def _deploy_n2vc(self, logging_text, db_nsr, db_vnfr, nslcmop_id, nsr_id, nsi_id, vnfd_id, vdu_id,
+ kdu_name, member_vnf_index, vdu_index, vdu_name, deploy_params, descriptor_config,
+ base_folder, task_instantiation_info, stage):
+ # launch instantiate_N2VC in a asyncio task and register task object
+ # Look where information of this charm is at database <nsrs>._admin.deployed.VCA
+ # if not found, create one entry and update database
+ # fill db_nsr._admin.deployed.VCA.<index>
+
+ self.logger.debug(logging_text + "_deploy_n2vc vnfd_id={}, vdu_id={}".format(vnfd_id, vdu_id))
+ if descriptor_config.get("juju"): # There is one execution envioronment of type juju
+ ee_list = [descriptor_config]
+ elif descriptor_config.get("execution-environment-list"):
+ ee_list = descriptor_config.get("execution-environment-list")
+ else: # other types as script are not supported
+ ee_list = []
+
+ for ee_item in ee_list:
+ self.logger.debug(logging_text + "_deploy_n2vc ee_item juju={}, helm={}".format(ee_item.get('juju'),
+ ee_item.get("helm-chart")))
+ ee_descriptor_id = ee_item.get("id")
+ if ee_item.get("juju"):
+ vca_name = ee_item['juju'].get('charm')
+ vca_type = "lxc_proxy_charm" if ee_item['juju'].get('charm') is not None else "native_charm"
+ if ee_item['juju'].get('cloud') == "k8s":
+ vca_type = "k8s_proxy_charm"
+ elif ee_item['juju'].get('proxy') is False:
+ vca_type = "native_charm"
+ elif ee_item.get("helm-chart"):
+ vca_name = ee_item['helm-chart']
+ vca_type = "helm"
+ else:
+ self.logger.debug(logging_text + "skipping non juju neither charm configuration")
+ continue
+
+ vca_index = -1
+ for vca_index, vca_deployed in enumerate(db_nsr["_admin"]["deployed"]["VCA"]):
+ if not vca_deployed:
+ continue
+ if vca_deployed.get("member-vnf-index") == member_vnf_index and \
+ vca_deployed.get("vdu_id") == vdu_id and \
+ vca_deployed.get("kdu_name") == kdu_name and \
+ vca_deployed.get("vdu_count_index", 0) == vdu_index and \
+ vca_deployed.get("ee_descriptor_id") == ee_descriptor_id:
+ break
+ else:
+ # not found, create one.
+ target = "ns" if not member_vnf_index else "vnf/{}".format(member_vnf_index)
+ if vdu_id:
+ target += "/vdu/{}/{}".format(vdu_id, vdu_index or 0)
+ elif kdu_name:
+ target += "/kdu/{}".format(kdu_name)
+ vca_deployed = {
+ "target_element": target,
+ # ^ target_element will replace member-vnf-index, kdu_name, vdu_id ... in a single string
+ "member-vnf-index": member_vnf_index,
+ "vdu_id": vdu_id,
+ "kdu_name": kdu_name,
+ "vdu_count_index": vdu_index,
+ "operational-status": "init", # TODO revise
+ "detailed-status": "", # TODO revise
+ "step": "initial-deploy", # TODO revise
+ "vnfd_id": vnfd_id,
+ "vdu_name": vdu_name,
+ "type": vca_type,
+ "ee_descriptor_id": ee_descriptor_id
+ }
+ vca_index += 1
+
+ # create VCA and configurationStatus in db
+ db_dict = {
+ "_admin.deployed.VCA.{}".format(vca_index): vca_deployed,
+ "configurationStatus.{}".format(vca_index): dict()
+ }
+ self.update_db_2("nsrs", nsr_id, db_dict)
+
+ db_nsr["_admin"]["deployed"]["VCA"].append(vca_deployed)
+
+ # Launch task
+ task_n2vc = asyncio.ensure_future(
+ self.instantiate_N2VC(
+ logging_text=logging_text,
+ vca_index=vca_index,
+ nsi_id=nsi_id,
+ db_nsr=db_nsr,
+ db_vnfr=db_vnfr,
+ vdu_id=vdu_id,
+ kdu_name=kdu_name,
+ vdu_index=vdu_index,
+ deploy_params=deploy_params,
+ config_descriptor=descriptor_config,
+ base_folder=base_folder,
+ nslcmop_id=nslcmop_id,
+ stage=stage,
+ vca_type=vca_type,
+ vca_name=vca_name,
+ ee_config_descriptor=ee_item
+ )
+ )
+ self.lcm_tasks.register("ns", nsr_id, nslcmop_id, "instantiate_N2VC-{}".format(vca_index), task_n2vc)
+ task_instantiation_info[task_n2vc] = self.task_name_deploy_vca + " {}.{}".format(
+ member_vnf_index or "", vdu_id or "")
+
+ @staticmethod
+ def _get_terminate_config_primitive(primitive_list, vca_deployed):
+ """ Get a sorted terminate config primitive list. In case ee_descriptor_id is present at vca_deployed,
+ it get only those primitives for this execution envirom"""
+
+ primitive_list = primitive_list or []
+ # filter primitives by ee_descriptor_id
+ ee_descriptor_id = vca_deployed.get("ee_descriptor_id")
+ primitive_list = [p for p in primitive_list if p.get("execution-environment-ref") == ee_descriptor_id]
+
+ if primitive_list:
+ primitive_list.sort(key=lambda val: int(val['seq']))
+
+ return primitive_list
+
+ @staticmethod
+ def _create_nslcmop(nsr_id, operation, params):
+ """
+ Creates a ns-lcm-opp content to be stored at database.
+ :param nsr_id: internal id of the instance
+ :param operation: instantiate, terminate, scale, action, ...
+ :param params: user parameters for the operation
+ :return: dictionary following SOL005 format
+ """
+ # Raise exception if invalid arguments
+ if not (nsr_id and operation and params):
+ raise LcmException(
+ "Parameters 'nsr_id', 'operation' and 'params' needed to create primitive not provided")
+ now = time()
+ _id = str(uuid4())
+ nslcmop = {
+ "id": _id,
+ "_id": _id,
+ # COMPLETED,PARTIALLY_COMPLETED,FAILED_TEMP,FAILED,ROLLING_BACK,ROLLED_BACK
+ "operationState": "PROCESSING",
+ "statusEnteredTime": now,
+ "nsInstanceId": nsr_id,
+ "lcmOperationType": operation,
+ "startTime": now,
+ "isAutomaticInvocation": False,
+ "operationParams": params,
+ "isCancelPending": False,
+ "links": {
+ "self": "/osm/nslcm/v1/ns_lcm_op_occs/" + _id,
+ "nsInstance": "/osm/nslcm/v1/ns_instances/" + nsr_id,
+ }
+ }
+ return nslcmop
+
+ def _format_additional_params(self, params):
+ params = params or {}
+ for key, value in params.items():
+ if str(value).startswith("!!yaml "):
+ params[key] = yaml.safe_load(value[7:])
+ return params
+
+ def _get_terminate_primitive_params(self, seq, vnf_index):
+ primitive = seq.get('name')
+ primitive_params = {}
+ params = {
+ "member_vnf_index": vnf_index,
+ "primitive": primitive,
+ "primitive_params": primitive_params,
+ }
+ desc_params = {}
+ return self._map_primitive_params(seq, params, desc_params)
+
+ # sub-operations
+
+ def _retry_or_skip_suboperation(self, db_nslcmop, op_index):
+ op = deep_get(db_nslcmop, ('_admin', 'operations'), [])[op_index]
+ if op.get('operationState') == 'COMPLETED':
+ # b. Skip sub-operation
+ # _ns_execute_primitive() or RO.create_action() will NOT be executed
+ return self.SUBOPERATION_STATUS_SKIP
+ else:
+ # c. retry executing sub-operation
+ # The sub-operation exists, and operationState != 'COMPLETED'
+ # Update operationState = 'PROCESSING' to indicate a retry.
+ operationState = 'PROCESSING'
+ detailed_status = 'In progress'
+ self._update_suboperation_status(
+ db_nslcmop, op_index, operationState, detailed_status)
+ # Return the sub-operation index
+ # _ns_execute_primitive() or RO.create_action() will be called from scale()
+ # with arguments extracted from the sub-operation
+ return op_index
+
+ # Find a sub-operation where all keys in a matching dictionary must match
+ # Returns the index of the matching sub-operation, or SUBOPERATION_STATUS_NOT_FOUND if no match
+ def _find_suboperation(self, db_nslcmop, match):
+ if db_nslcmop and match:
+ op_list = db_nslcmop.get('_admin', {}).get('operations', [])
+ for i, op in enumerate(op_list):
+ if all(op.get(k) == match[k] for k in match):
+ return i
+ return self.SUBOPERATION_STATUS_NOT_FOUND
+
+ # Update status for a sub-operation given its index
+ def _update_suboperation_status(self, db_nslcmop, op_index, operationState, detailed_status):
+ # Update DB for HA tasks
+ q_filter = {'_id': db_nslcmop['_id']}
+ update_dict = {'_admin.operations.{}.operationState'.format(op_index): operationState,
+ '_admin.operations.{}.detailed-status'.format(op_index): detailed_status}
+ self.db.set_one("nslcmops",
+ q_filter=q_filter,
+ update_dict=update_dict,
+ fail_on_empty=False)
+
+ # Add sub-operation, return the index of the added sub-operation
+ # Optionally, set operationState, detailed-status, and operationType
+ # Status and type are currently set for 'scale' sub-operations:
+ # 'operationState' : 'PROCESSING' | 'COMPLETED' | 'FAILED'
+ # 'detailed-status' : status message
+ # 'operationType': may be any type, in the case of scaling: 'PRE-SCALE' | 'POST-SCALE'
+ # Status and operation type are currently only used for 'scale', but NOT for 'terminate' sub-operations.
+ def _add_suboperation(self, db_nslcmop, vnf_index, vdu_id, vdu_count_index, vdu_name, primitive,
+ mapped_primitive_params, operationState=None, detailed_status=None, operationType=None,
+ RO_nsr_id=None, RO_scaling_info=None):
+ if not db_nslcmop:
+ return self.SUBOPERATION_STATUS_NOT_FOUND
+ # Get the "_admin.operations" list, if it exists
+ db_nslcmop_admin = db_nslcmop.get('_admin', {})
+ op_list = db_nslcmop_admin.get('operations')
+ # Create or append to the "_admin.operations" list
+ new_op = {'member_vnf_index': vnf_index,
+ 'vdu_id': vdu_id,
+ 'vdu_count_index': vdu_count_index,
+ 'primitive': primitive,
+ 'primitive_params': mapped_primitive_params}
+ if operationState:
+ new_op['operationState'] = operationState
+ if detailed_status:
+ new_op['detailed-status'] = detailed_status
+ if operationType:
+ new_op['lcmOperationType'] = operationType
+ if RO_nsr_id:
+ new_op['RO_nsr_id'] = RO_nsr_id
+ if RO_scaling_info:
+ new_op['RO_scaling_info'] = RO_scaling_info
+ if not op_list:
+ # No existing operations, create key 'operations' with current operation as first list element
+ db_nslcmop_admin.update({'operations': [new_op]})
+ op_list = db_nslcmop_admin.get('operations')
+ else:
+ # Existing operations, append operation to list
+ op_list.append(new_op)
+
+ db_nslcmop_update = {'_admin.operations': op_list}
+ self.update_db_2("nslcmops", db_nslcmop['_id'], db_nslcmop_update)
+ op_index = len(op_list) - 1
+ return op_index
+
+ # Helper methods for scale() sub-operations
+
+ # pre-scale/post-scale:
+ # Check for 3 different cases:
+ # a. New: First time execution, return SUBOPERATION_STATUS_NEW
+ # b. Skip: Existing sub-operation exists, operationState == 'COMPLETED', return SUBOPERATION_STATUS_SKIP
+ # c. retry: Existing sub-operation exists, operationState != 'COMPLETED', return op_index to re-execute
+ def _check_or_add_scale_suboperation(self, db_nslcmop, vnf_index, vnf_config_primitive, primitive_params,
+ operationType, RO_nsr_id=None, RO_scaling_info=None):
+ # Find this sub-operation
+ if RO_nsr_id and RO_scaling_info:
+ operationType = 'SCALE-RO'
+ match = {
+ 'member_vnf_index': vnf_index,
+ 'RO_nsr_id': RO_nsr_id,
+ 'RO_scaling_info': RO_scaling_info,
+ }
+ else:
+ match = {
+ 'member_vnf_index': vnf_index,
+ 'primitive': vnf_config_primitive,
+ 'primitive_params': primitive_params,
+ 'lcmOperationType': operationType
+ }
+ op_index = self._find_suboperation(db_nslcmop, match)
+ if op_index == self.SUBOPERATION_STATUS_NOT_FOUND:
+ # a. New sub-operation
+ # The sub-operation does not exist, add it.
+ # _ns_execute_primitive() will be called from scale() as usual, with non-modified arguments
+ # The following parameters are set to None for all kind of scaling:
+ vdu_id = None
+ vdu_count_index = None
+ vdu_name = None
+ if RO_nsr_id and RO_scaling_info:
+ vnf_config_primitive = None
+ primitive_params = None
+ else:
+ RO_nsr_id = None
+ RO_scaling_info = None
+ # Initial status for sub-operation
+ operationState = 'PROCESSING'
+ detailed_status = 'In progress'
+ # Add sub-operation for pre/post-scaling (zero or more operations)
+ self._add_suboperation(db_nslcmop,
+ vnf_index,
+ vdu_id,
+ vdu_count_index,
+ vdu_name,
+ vnf_config_primitive,
+ primitive_params,
+ operationState,
+ detailed_status,
+ operationType,
+ RO_nsr_id,
+ RO_scaling_info)
+ return self.SUBOPERATION_STATUS_NEW
+ else:
+ # Return either SUBOPERATION_STATUS_SKIP (operationState == 'COMPLETED'),
+ # or op_index (operationState != 'COMPLETED')
+ return self._retry_or_skip_suboperation(db_nslcmop, op_index)
+
+ # Function to return execution_environment id
+
+ def _get_ee_id(self, vnf_index, vdu_id, vca_deployed_list):
+ # TODO vdu_index_count
+ for vca in vca_deployed_list:
+ if vca["member-vnf-index"] == vnf_index and vca["vdu_id"] == vdu_id:
+ return vca["ee_id"]
+
+ async def destroy_N2VC(self, logging_text, db_nslcmop, vca_deployed, config_descriptor,
+ vca_index, destroy_ee=True, exec_primitives=True):
+ """
+ Execute the terminate primitives and destroy the execution environment (if destroy_ee=False
+ :param logging_text:
+ :param db_nslcmop:
+ :param vca_deployed: Dictionary of deployment info at db_nsr._admin.depoloyed.VCA.<INDEX>
+ :param config_descriptor: Configuration descriptor of the NSD, VNFD, VNFD.vdu or VNFD.kdu
+ :param vca_index: index in the database _admin.deployed.VCA
+ :param destroy_ee: False to do not destroy, because it will be destroyed all of then at once
+ :param exec_primitives: False to do not execute terminate primitives, because the config is not completed or has
+ not executed properly
+ :return: None or exception
+ """
+
+ self.logger.debug(
+ logging_text + " vca_index: {}, vca_deployed: {}, config_descriptor: {}, destroy_ee: {}".format(
+ vca_index, vca_deployed, config_descriptor, destroy_ee
+ )
+ )
+
+ vca_type = vca_deployed.get("type", "lxc_proxy_charm")
+
+ # execute terminate_primitives
+ if exec_primitives:
+ terminate_primitives = self._get_terminate_config_primitive(
+ config_descriptor.get("terminate-config-primitive"), vca_deployed)
+ vdu_id = vca_deployed.get("vdu_id")
+ vdu_count_index = vca_deployed.get("vdu_count_index")
+ vdu_name = vca_deployed.get("vdu_name")
+ vnf_index = vca_deployed.get("member-vnf-index")
+ if terminate_primitives and vca_deployed.get("needed_terminate"):
+ for seq in terminate_primitives:
+ # For each sequence in list, get primitive and call _ns_execute_primitive()
+ step = "Calling terminate action for vnf_member_index={} primitive={}".format(
+ vnf_index, seq.get("name"))
+ self.logger.debug(logging_text + step)
+ # Create the primitive for each sequence, i.e. "primitive": "touch"
+ primitive = seq.get('name')
+ mapped_primitive_params = self._get_terminate_primitive_params(seq, vnf_index)
+
+ # Add sub-operation
+ self._add_suboperation(db_nslcmop,
+ vnf_index,
+ vdu_id,
+ vdu_count_index,
+ vdu_name,
+ primitive,
+ mapped_primitive_params)
+ # Sub-operations: Call _ns_execute_primitive() instead of action()
+ try:
+ result, result_detail = await self._ns_execute_primitive(vca_deployed["ee_id"], primitive,
+ mapped_primitive_params,
+ vca_type=vca_type)
+ except LcmException:
+ # this happens when VCA is not deployed. In this case it is not needed to terminate
+ continue
+ result_ok = ['COMPLETED', 'PARTIALLY_COMPLETED']
+ if result not in result_ok:
+ raise LcmException("terminate_primitive {} for vnf_member_index={} fails with "
+ "error {}".format(seq.get("name"), vnf_index, result_detail))
+ # set that this VCA do not need terminated
+ db_update_entry = "_admin.deployed.VCA.{}.needed_terminate".format(vca_index)
+ self.update_db_2("nsrs", db_nslcmop["nsInstanceId"], {db_update_entry: False})
+
+ if vca_deployed.get("prometheus_jobs") and self.prometheus:
+ await self.prometheus.update(remove_jobs=vca_deployed["prometheus_jobs"])
+
+ if destroy_ee:
+ await self.vca_map[vca_type].delete_execution_environment(vca_deployed["ee_id"])
+
+ async def _delete_all_N2VC(self, db_nsr: dict):
+ self._write_all_config_status(db_nsr=db_nsr, status='TERMINATING')
+ namespace = "." + db_nsr["_id"]
+ try:
+ await self.n2vc.delete_namespace(namespace=namespace, total_timeout=self.timeout_charm_delete)
+ except N2VCNotFound: # already deleted. Skip
+ pass
+ self._write_all_config_status(db_nsr=db_nsr, status='DELETED')
+
+ async def _terminate_RO(self, logging_text, nsr_deployed, nsr_id, nslcmop_id, stage):
+ """
+ Terminates a deployment from RO
+ :param logging_text:
+ :param nsr_deployed: db_nsr._admin.deployed
+ :param nsr_id:
+ :param nslcmop_id:
+ :param stage: list of string with the content to write on db_nslcmop.detailed-status.
+ this method will update only the index 2, but it will write on database the concatenated content of the list
+ :return:
+ """
+ db_nsr_update = {}
+ failed_detail = []
+ ro_nsr_id = ro_delete_action = None
+ if nsr_deployed and nsr_deployed.get("RO"):
+ ro_nsr_id = nsr_deployed["RO"].get("nsr_id")
+ ro_delete_action = nsr_deployed["RO"].get("nsr_delete_action_id")
+ try:
+ if ro_nsr_id:
+ stage[2] = "Deleting ns from VIM."
+ db_nsr_update["detailed-status"] = " ".join(stage)
+ self._write_op_status(nslcmop_id, stage)
+ self.logger.debug(logging_text + stage[2])
+ self.update_db_2("nsrs", nsr_id, db_nsr_update)
+ self._write_op_status(nslcmop_id, stage)
+ desc = await self.RO.delete("ns", ro_nsr_id)
+ ro_delete_action = desc["action_id"]
+ db_nsr_update["_admin.deployed.RO.nsr_delete_action_id"] = ro_delete_action
+ db_nsr_update["_admin.deployed.RO.nsr_id"] = None
+ db_nsr_update["_admin.deployed.RO.nsr_status"] = "DELETED"
+ if ro_delete_action:
+ # wait until NS is deleted from VIM
+ stage[2] = "Waiting ns deleted from VIM."
+ detailed_status_old = None
+ self.logger.debug(logging_text + stage[2] + " RO_id={} ro_delete_action={}".format(ro_nsr_id,
+ ro_delete_action))
+ self.update_db_2("nsrs", nsr_id, db_nsr_update)
+ self._write_op_status(nslcmop_id, stage)
+
+ delete_timeout = 20 * 60 # 20 minutes
+ while delete_timeout > 0:
+ desc = await self.RO.show(
+ "ns",
+ item_id_name=ro_nsr_id,
+ extra_item="action",
+ extra_item_id=ro_delete_action)
+
+ # deploymentStatus
+ self._on_update_ro_db(nsrs_id=nsr_id, ro_descriptor=desc)
+
+ ns_status, ns_status_info = self.RO.check_action_status(desc)
+ if ns_status == "ERROR":
+ raise ROclient.ROClientException(ns_status_info)
+ elif ns_status == "BUILD":
+ stage[2] = "Deleting from VIM {}".format(ns_status_info)
+ elif ns_status == "ACTIVE":
+ db_nsr_update["_admin.deployed.RO.nsr_delete_action_id"] = None
+ db_nsr_update["_admin.deployed.RO.nsr_status"] = "DELETED"
+ break
+ else:
+ assert False, "ROclient.check_action_status returns unknown {}".format(ns_status)
+ if stage[2] != detailed_status_old:
+ detailed_status_old = stage[2]
+ db_nsr_update["detailed-status"] = " ".join(stage)
+ self._write_op_status(nslcmop_id, stage)
+ self.update_db_2("nsrs", nsr_id, db_nsr_update)
+ await asyncio.sleep(5, loop=self.loop)
+ delete_timeout -= 5
+ else: # delete_timeout <= 0:
+ raise ROclient.ROClientException("Timeout waiting ns deleted from VIM")
+
+ except Exception as e:
+ self.update_db_2("nsrs", nsr_id, db_nsr_update)
+ if isinstance(e, ROclient.ROClientException) and e.http_code == 404: # not found
+ db_nsr_update["_admin.deployed.RO.nsr_id"] = None
+ db_nsr_update["_admin.deployed.RO.nsr_status"] = "DELETED"
+ db_nsr_update["_admin.deployed.RO.nsr_delete_action_id"] = None
+ self.logger.debug(logging_text + "RO_ns_id={} already deleted".format(ro_nsr_id))
+ elif isinstance(e, ROclient.ROClientException) and e.http_code == 409: # conflict
+ failed_detail.append("delete conflict: {}".format(e))
+ self.logger.debug(logging_text + "RO_ns_id={} delete conflict: {}".format(ro_nsr_id, e))
+ else:
+ failed_detail.append("delete error: {}".format(e))
+ self.logger.error(logging_text + "RO_ns_id={} delete error: {}".format(ro_nsr_id, e))