+ return ip_address
+
+ async def _wait_dependent_n2vc(self, nsr_id, vca_deployed_list, vca_index):
+ """
+ Wait until dependent VCA deployments have been finished. NS wait for VNFs and VDUs. VNFs for VDUs
+ """
+ my_vca = vca_deployed_list[vca_index]
+ if my_vca.get("vdu_id") or my_vca.get("kdu_name"):
+ # vdu or kdu: no dependencies
+ return
+ timeout = 300
+ while timeout >= 0:
+ db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
+ vca_deployed_list = db_nsr["_admin"]["deployed"]["VCA"]
+ configuration_status_list = db_nsr["configurationStatus"]
+ for index, vca_deployed in enumerate(configuration_status_list):
+ if index == vca_index:
+ # myself
+ continue
+ if not my_vca.get("member-vnf-index") or \
+ (vca_deployed.get("member-vnf-index") == my_vca.get("member-vnf-index")):
+ internal_status = configuration_status_list[index].get("status")
+ if internal_status == 'READY':
+ continue
+ elif internal_status == 'BROKEN':
+ raise LcmException("Configuration aborted because dependent charm/s has failed")
+ else:
+ break
+ else:
+ # no dependencies, return
+ return
+ await asyncio.sleep(10)
+ timeout -= 1
+
+ raise LcmException("Configuration aborted because dependent charm/s timeout")
+
+ async def instantiate_N2VC(self, logging_text, vca_index, nsi_id, db_nsr, db_vnfr, vdu_id, kdu_name, vdu_index,
+ config_descriptor, deploy_params, base_folder, nslcmop_id, stage, vca_type, vca_name,
+ ee_config_descriptor):
+ nsr_id = db_nsr["_id"]
+ db_update_entry = "_admin.deployed.VCA.{}.".format(vca_index)
+ vca_deployed_list = db_nsr["_admin"]["deployed"]["VCA"]
+ vca_deployed = db_nsr["_admin"]["deployed"]["VCA"][vca_index]
+ osm_config = {"osm": {"ns_id": db_nsr["_id"]}}
+ db_dict = {
+ 'collection': 'nsrs',
+ 'filter': {'_id': nsr_id},
+ 'path': db_update_entry
+ }
+ step = ""
+ try:
+
+ element_type = 'NS'
+ element_under_configuration = nsr_id
+
+ vnfr_id = None
+ if db_vnfr:
+ vnfr_id = db_vnfr["_id"]
+ osm_config["osm"]["vnf_id"] = vnfr_id
+
+ namespace = "{nsi}.{ns}".format(
+ nsi=nsi_id if nsi_id else "",
+ ns=nsr_id)
+
+ if vnfr_id:
+ element_type = 'VNF'
+ element_under_configuration = vnfr_id
+ namespace += ".{}".format(vnfr_id)
+ if vdu_id:
+ namespace += ".{}-{}".format(vdu_id, vdu_index or 0)
+ element_type = 'VDU'
+ element_under_configuration = "{}-{}".format(vdu_id, vdu_index or 0)
+ osm_config["osm"]["vdu_id"] = vdu_id
+ elif kdu_name:
+ namespace += ".{}".format(kdu_name)
+ element_type = 'KDU'
+ element_under_configuration = kdu_name
+ osm_config["osm"]["kdu_name"] = kdu_name
+
+ # Get artifact path
+ artifact_path = "{}/{}/{}/{}".format(
+ base_folder["folder"],
+ base_folder["pkg-dir"],
+ "charms" if vca_type in ("native_charm", "lxc_proxy_charm", "k8s_proxy_charm") else "helm-charts",
+ vca_name
+ )
+
+ self.logger.debug("Artifact path > {}".format(artifact_path))
+
+ # get initial_config_primitive_list that applies to this element
+ initial_config_primitive_list = config_descriptor.get('initial-config-primitive')
+
+ self.logger.debug("Initial config primitive list > {}".format(initial_config_primitive_list))
+
+ # add config if not present for NS charm
+ ee_descriptor_id = ee_config_descriptor.get("id")
+ self.logger.debug("EE Descriptor > {}".format(ee_descriptor_id))
+ initial_config_primitive_list = get_ee_sorted_initial_config_primitive_list(initial_config_primitive_list,
+ vca_deployed, ee_descriptor_id)
+
+ self.logger.debug("Initial config primitive list #2 > {}".format(initial_config_primitive_list))
+ # n2vc_redesign STEP 3.1
+ # find old ee_id if exists
+ ee_id = vca_deployed.get("ee_id")
+
+ vim_account_id = (
+ deep_get(db_vnfr, ("vim-account-id",)) or
+ deep_get(deploy_params, ("OSM", "vim_account_id"))
+ )
+ vca_cloud, vca_cloud_credential = self.get_vca_cloud_and_credentials(vim_account_id)
+ vca_k8s_cloud, vca_k8s_cloud_credential = self.get_vca_k8s_cloud_and_credentials(vim_account_id)
+ # create or register execution environment in VCA
+ if vca_type in ("lxc_proxy_charm", "k8s_proxy_charm", "helm", "helm-v3"):
+
+ self._write_configuration_status(
+ nsr_id=nsr_id,
+ vca_index=vca_index,
+ status='CREATING',
+ element_under_configuration=element_under_configuration,
+ element_type=element_type
+ )
+
+ step = "create execution environment"
+ self.logger.debug(logging_text + step)
+
+ ee_id = None
+ credentials = None
+ if vca_type == "k8s_proxy_charm":
+ ee_id = await self.vca_map[vca_type].install_k8s_proxy_charm(
+ charm_name=artifact_path[artifact_path.rfind("/") + 1:],
+ namespace=namespace,
+ artifact_path=artifact_path,
+ db_dict=db_dict,
+ cloud_name=vca_k8s_cloud,
+ credential_name=vca_k8s_cloud_credential,
+ )
+ elif vca_type == "helm" or vca_type == "helm-v3":
+ ee_id, credentials = await self.vca_map[vca_type].create_execution_environment(
+ namespace=namespace,
+ reuse_ee_id=ee_id,
+ db_dict=db_dict,
+ config=osm_config,
+ artifact_path=artifact_path,
+ vca_type=vca_type
+ )
+ else:
+ ee_id, credentials = await self.vca_map[vca_type].create_execution_environment(
+ namespace=namespace,
+ reuse_ee_id=ee_id,
+ db_dict=db_dict,
+ cloud_name=vca_cloud,
+ credential_name=vca_cloud_credential,
+ )
+
+ elif vca_type == "native_charm":
+ step = "Waiting to VM being up and getting IP address"
+ self.logger.debug(logging_text + step)
+ rw_mgmt_ip = await self.wait_vm_up_insert_key_ro(logging_text, nsr_id, vnfr_id, vdu_id, vdu_index,
+ user=None, pub_key=None)
+ credentials = {"hostname": rw_mgmt_ip}
+ # get username
+ username = deep_get(config_descriptor, ("config-access", "ssh-access", "default-user"))
+ # TODO remove this when changes on IM regarding config-access:ssh-access:default-user were
+ # merged. Meanwhile let's get username from initial-config-primitive
+ if not username and initial_config_primitive_list:
+ for config_primitive in initial_config_primitive_list:
+ for param in config_primitive.get("parameter", ()):
+ if param["name"] == "ssh-username":
+ username = param["value"]
+ break
+ if not username:
+ raise LcmException("Cannot determine the username neither with 'initial-config-primitive' nor with "
+ "'config-access.ssh-access.default-user'")
+ credentials["username"] = username
+ # n2vc_redesign STEP 3.2
+
+ self._write_configuration_status(
+ nsr_id=nsr_id,
+ vca_index=vca_index,
+ status='REGISTERING',
+ element_under_configuration=element_under_configuration,
+ element_type=element_type
+ )
+
+ step = "register execution environment {}".format(credentials)
+ self.logger.debug(logging_text + step)
+ ee_id = await self.vca_map[vca_type].register_execution_environment(
+ credentials=credentials,
+ namespace=namespace,
+ db_dict=db_dict,
+ cloud_name=vca_cloud,
+ credential_name=vca_cloud_credential,
+ )
+
+ # for compatibility with MON/POL modules, the need model and application name at database
+ # TODO ask MON/POL if needed to not assuming anymore the format "model_name.application_name"
+ ee_id_parts = ee_id.split('.')
+ db_nsr_update = {db_update_entry + "ee_id": ee_id}
+ if len(ee_id_parts) >= 2:
+ model_name = ee_id_parts[0]
+ application_name = ee_id_parts[1]
+ db_nsr_update[db_update_entry + "model"] = model_name
+ db_nsr_update[db_update_entry + "application"] = application_name
+
+ # n2vc_redesign STEP 3.3
+ step = "Install configuration Software"
+
+ self._write_configuration_status(
+ nsr_id=nsr_id,
+ vca_index=vca_index,
+ status='INSTALLING SW',
+ element_under_configuration=element_under_configuration,
+ element_type=element_type,
+ other_update=db_nsr_update
+ )
+
+ # TODO check if already done
+ self.logger.debug(logging_text + step)
+ config = None
+ if vca_type == "native_charm":
+ config_primitive = next((p for p in initial_config_primitive_list if p["name"] == "config"), None)
+ if config_primitive:
+ config = self._map_primitive_params(
+ config_primitive,
+ {},
+ deploy_params
+ )
+ num_units = 1
+ if vca_type == "lxc_proxy_charm":
+ if element_type == "NS":
+ num_units = db_nsr.get("config-units") or 1
+ elif element_type == "VNF":
+ num_units = db_vnfr.get("config-units") or 1
+ elif element_type == "VDU":
+ for v in db_vnfr["vdur"]:
+ if vdu_id == v["vdu-id-ref"]:
+ num_units = v.get("config-units") or 1
+ break
+ if vca_type != "k8s_proxy_charm":
+ await self.vca_map[vca_type].install_configuration_sw(
+ ee_id=ee_id,
+ artifact_path=artifact_path,
+ db_dict=db_dict,
+ config=config,
+ num_units=num_units,
+ )
+
+ # write in db flag of configuration_sw already installed
+ self.update_db_2("nsrs", nsr_id, {db_update_entry + "config_sw_installed": True})
+
+ # add relations for this VCA (wait for other peers related with this VCA)
+ await self._add_vca_relations(logging_text=logging_text, nsr_id=nsr_id,
+ vca_index=vca_index, vca_type=vca_type)
+
+ # if SSH access is required, then get execution environment SSH public
+ # if native charm we have waited already to VM be UP
+ if vca_type in ("k8s_proxy_charm", "lxc_proxy_charm", "helm", "helm-v3"):
+ pub_key = None
+ user = None
+ # self.logger.debug("get ssh key block")
+ if deep_get(config_descriptor, ("config-access", "ssh-access", "required")):
+ # self.logger.debug("ssh key needed")
+ # Needed to inject a ssh key
+ user = deep_get(config_descriptor, ("config-access", "ssh-access", "default-user"))
+ step = "Install configuration Software, getting public ssh key"
+ pub_key = await self.vca_map[vca_type].get_ee_ssh_public__key(ee_id=ee_id, db_dict=db_dict)
+
+ step = "Insert public key into VM user={} ssh_key={}".format(user, pub_key)
+ else:
+ # self.logger.debug("no need to get ssh key")
+ step = "Waiting to VM being up and getting IP address"
+ self.logger.debug(logging_text + step)
+
+ # n2vc_redesign STEP 5.1
+ # wait for RO (ip-address) Insert pub_key into VM
+ if vnfr_id:
+ if kdu_name:
+ rw_mgmt_ip = await self.wait_kdu_up(logging_text, nsr_id, vnfr_id, kdu_name)
+ else:
+ rw_mgmt_ip = await self.wait_vm_up_insert_key_ro(logging_text, nsr_id, vnfr_id, vdu_id,
+ vdu_index, user=user, pub_key=pub_key)
+ else:
+ rw_mgmt_ip = None # This is for a NS configuration
+
+ self.logger.debug(logging_text + ' VM_ip_address={}'.format(rw_mgmt_ip))
+
+ # store rw_mgmt_ip in deploy params for later replacement
+ deploy_params["rw_mgmt_ip"] = rw_mgmt_ip
+
+ # n2vc_redesign STEP 6 Execute initial config primitive
+ step = 'execute initial config primitive'
+
+ # wait for dependent primitives execution (NS -> VNF -> VDU)
+ if initial_config_primitive_list:
+ await self._wait_dependent_n2vc(nsr_id, vca_deployed_list, vca_index)
+
+ # stage, in function of element type: vdu, kdu, vnf or ns
+ my_vca = vca_deployed_list[vca_index]
+ if my_vca.get("vdu_id") or my_vca.get("kdu_name"):
+ # VDU or KDU
+ stage[0] = 'Stage 3/5: running Day-1 primitives for VDU.'
+ elif my_vca.get("member-vnf-index"):
+ # VNF
+ stage[0] = 'Stage 4/5: running Day-1 primitives for VNF.'
+ else:
+ # NS
+ stage[0] = 'Stage 5/5: running Day-1 primitives for NS.'
+
+ self._write_configuration_status(
+ nsr_id=nsr_id,
+ vca_index=vca_index,
+ status='EXECUTING PRIMITIVE'
+ )
+
+ self._write_op_status(
+ op_id=nslcmop_id,
+ stage=stage
+ )
+
+ check_if_terminated_needed = True
+ for initial_config_primitive in initial_config_primitive_list:
+ # adding information on the vca_deployed if it is a NS execution environment
+ if not vca_deployed["member-vnf-index"]:
+ deploy_params["ns_config_info"] = json.dumps(self._get_ns_config_info(nsr_id))
+ # TODO check if already done
+ primitive_params_ = self._map_primitive_params(initial_config_primitive, {}, deploy_params)
+
+ step = "execute primitive '{}' params '{}'".format(initial_config_primitive["name"], primitive_params_)
+ self.logger.debug(logging_text + step)
+ await self.vca_map[vca_type].exec_primitive(
+ ee_id=ee_id,
+ primitive_name=initial_config_primitive["name"],
+ params_dict=primitive_params_,
+ db_dict=db_dict
+ )
+ # Once some primitive has been exec, check and write at db if it needs to exec terminated primitives
+ if check_if_terminated_needed:
+ if config_descriptor.get('terminate-config-primitive'):
+ self.update_db_2("nsrs", nsr_id, {db_update_entry + "needed_terminate": True})
+ check_if_terminated_needed = False
+
+ # TODO register in database that primitive is done
+
+ # STEP 7 Configure metrics
+ if vca_type == "helm" or vca_type == "helm-v3":
+ prometheus_jobs = await self.add_prometheus_metrics(
+ ee_id=ee_id,
+ artifact_path=artifact_path,
+ ee_config_descriptor=ee_config_descriptor,
+ vnfr_id=vnfr_id,
+ nsr_id=nsr_id,
+ target_ip=rw_mgmt_ip,
+ )
+ if prometheus_jobs:
+ self.update_db_2("nsrs", nsr_id, {db_update_entry + "prometheus_jobs": prometheus_jobs})
+
+ step = "instantiated at VCA"
+ self.logger.debug(logging_text + step)
+
+ self._write_configuration_status(
+ nsr_id=nsr_id,
+ vca_index=vca_index,
+ status='READY'
+ )
+
+ except Exception as e: # TODO not use Exception but N2VC exception
+ # self.update_db_2("nsrs", nsr_id, {db_update_entry + "instantiation": "FAILED"})
+ if not isinstance(e, (DbException, N2VCException, LcmException, asyncio.CancelledError)):
+ self.logger.error("Exception while {} : {}".format(step, e), exc_info=True)
+ self._write_configuration_status(
+ nsr_id=nsr_id,
+ vca_index=vca_index,
+ status='BROKEN'
+ )
+ raise LcmException("{} {}".format(step, e)) from e
+
+ def _write_ns_status(self, nsr_id: str, ns_state: str, current_operation: str, current_operation_id: str,
+ error_description: str = None, error_detail: str = None, other_update: dict = None):
+ """
+ Update db_nsr fields.
+ :param nsr_id:
+ :param ns_state:
+ :param current_operation:
+ :param current_operation_id:
+ :param error_description:
+ :param error_detail:
+ :param other_update: Other required changes at database if provided, will be cleared
+ :return:
+ """
+ try:
+ db_dict = other_update or {}
+ db_dict["_admin.nslcmop"] = current_operation_id # for backward compatibility
+ db_dict["_admin.current-operation"] = current_operation_id
+ db_dict["_admin.operation-type"] = current_operation if current_operation != "IDLE" else None
+ db_dict["currentOperation"] = current_operation
+ db_dict["currentOperationID"] = current_operation_id
+ db_dict["errorDescription"] = error_description
+ db_dict["errorDetail"] = error_detail
+
+ if ns_state:
+ db_dict["nsState"] = ns_state
+ self.update_db_2("nsrs", nsr_id, db_dict)
+ except DbException as e:
+ self.logger.warn('Error writing NS status, ns={}: {}'.format(nsr_id, e))
+
+ def _write_op_status(self, op_id: str, stage: list = None, error_message: str = None, queuePosition: int = 0,
+ operation_state: str = None, other_update: dict = None):
+ try:
+ db_dict = other_update or {}
+ db_dict['queuePosition'] = queuePosition
+ if isinstance(stage, list):
+ db_dict['stage'] = stage[0]
+ db_dict['detailed-status'] = " ".join(stage)
+ elif stage is not None:
+ db_dict['stage'] = str(stage)
+
+ if error_message is not None:
+ db_dict['errorMessage'] = error_message
+ if operation_state is not None:
+ db_dict['operationState'] = operation_state
+ db_dict["statusEnteredTime"] = time()
+ self.update_db_2("nslcmops", op_id, db_dict)
+ except DbException as e:
+ self.logger.warn('Error writing OPERATION status for op_id: {} -> {}'.format(op_id, e))
+
+ def _write_all_config_status(self, db_nsr: dict, status: str):
+ try:
+ nsr_id = db_nsr["_id"]
+ # configurationStatus
+ config_status = db_nsr.get('configurationStatus')
+ if config_status:
+ db_nsr_update = {"configurationStatus.{}.status".format(index): status for index, v in
+ enumerate(config_status) if v}
+ # update status
+ self.update_db_2("nsrs", nsr_id, db_nsr_update)
+
+ except DbException as e:
+ self.logger.warn('Error writing all configuration status, ns={}: {}'.format(nsr_id, e))
+
+ def _write_configuration_status(self, nsr_id: str, vca_index: int, status: str = None,
+ element_under_configuration: str = None, element_type: str = None,
+ other_update: dict = None):
+
+ # self.logger.debug('_write_configuration_status(): vca_index={}, status={}'
+ # .format(vca_index, status))
+
+ try:
+ db_path = 'configurationStatus.{}.'.format(vca_index)
+ db_dict = other_update or {}
+ if status:
+ db_dict[db_path + 'status'] = status
+ if element_under_configuration:
+ db_dict[db_path + 'elementUnderConfiguration'] = element_under_configuration
+ if element_type:
+ db_dict[db_path + 'elementType'] = element_type
+ self.update_db_2("nsrs", nsr_id, db_dict)
+ except DbException as e:
+ self.logger.warn('Error writing configuration status={}, ns={}, vca_index={}: {}'
+ .format(status, nsr_id, vca_index, e))
+
+ async def _do_placement(self, logging_text, db_nslcmop, db_vnfrs):
+ """
+ Check and computes the placement, (vim account where to deploy). If it is decided by an external tool, it
+ sends the request via kafka and wait until the result is wrote at database (nslcmops _admin.plca).
+ Database is used because the result can be obtained from a different LCM worker in case of HA.
+ :param logging_text: contains the prefix for logging, with the ns and nslcmop identifiers
+ :param db_nslcmop: database content of nslcmop
+ :param db_vnfrs: database content of vnfrs, indexed by member-vnf-index.
+ :return: True if some modification is done. Modifies database vnfrs and parameter db_vnfr with the
+ computed 'vim-account-id'
+ """
+ modified = False
+ nslcmop_id = db_nslcmop['_id']
+ placement_engine = deep_get(db_nslcmop, ('operationParams', 'placement-engine'))
+ if placement_engine == "PLA":
+ self.logger.debug(logging_text + "Invoke and wait for placement optimization")
+ await self.msg.aiowrite("pla", "get_placement", {'nslcmopId': nslcmop_id}, loop=self.loop)
+ db_poll_interval = 5
+ wait = db_poll_interval * 10
+ pla_result = None
+ while not pla_result and wait >= 0:
+ await asyncio.sleep(db_poll_interval)
+ wait -= db_poll_interval
+ db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})
+ pla_result = deep_get(db_nslcmop, ('_admin', 'pla'))
+
+ if not pla_result:
+ raise LcmException("Placement timeout for nslcmopId={}".format(nslcmop_id))
+
+ for pla_vnf in pla_result['vnf']:
+ vnfr = db_vnfrs.get(pla_vnf['member-vnf-index'])
+ if not pla_vnf.get('vimAccountId') or not vnfr:
+ continue
+ modified = True
+ self.db.set_one("vnfrs", {"_id": vnfr["_id"]}, {"vim-account-id": pla_vnf['vimAccountId']})
+ # Modifies db_vnfrs
+ vnfr["vim-account-id"] = pla_vnf['vimAccountId']
+ return modified
+
+ def update_nsrs_with_pla_result(self, params):
+ try:
+ nslcmop_id = deep_get(params, ('placement', 'nslcmopId'))
+ self.update_db_2("nslcmops", nslcmop_id, {"_admin.pla": params.get('placement')})
+ except Exception as e:
+ self.logger.warn('Update failed for nslcmop_id={}:{}'.format(nslcmop_id, e))