find_software_version,
)
from osm_lcm.data_utils.list_utils import find_in_list
-from osm_lcm.data_utils.vnfr import get_osm_params, get_vdur_index, get_kdur
+from osm_lcm.data_utils.vnfr import (
+ get_osm_params,
+ get_vdur_index,
+ get_kdur,
+ get_volumes_from_instantiation_params,
+)
from osm_lcm.data_utils.dict_utils import parse_yaml_strings
from osm_lcm.data_utils.database.vim_account import VimAccountDB
from n2vc.definitions import RelationEndpoint
def _get_vdu_additional_params(self, db_vnfr, vdu_id):
vdur = next(
- (vdur for vdur in db_vnfr.get("vdur") if vdu_id == vdur["vdu-id-ref"]),
- {}
+ (vdur for vdur in db_vnfr.get("vdur") if vdu_id == vdur["vdu-id-ref"]), {}
)
additional_params = vdur.get("additionalParams")
return parse_yaml_strings(additional_params)
)
if not vdur:
# Read the template saved in the db:
- self.logger.debug(f"No vdur in the database. Using the vdur-template to scale")
+ self.logger.debug(
+ "No vdur in the database. Using the vdur-template to scale"
+ )
vdur_template = db_vnfr.get("vdur-template")
if not vdur_template:
raise LcmException(
- "Error scaling OUT VNFR for {}. No vnfr or template exists".format(
- vdu_id
+ "Error scaling OUT VNFR for {}. No vnfr or template exists".format(
+ vdu_id
)
)
vdur = vdur_template[0]
- #Delete a template from the database after using it
- self.db.set_one("vnfrs",
- {"_id": db_vnfr["_id"]},
- None,
- pull={"vdur-template": {"_id": vdur['_id']}}
- )
+ # Delete a template from the database after using it
+ self.db.set_one(
+ "vnfrs",
+ {"_id": db_vnfr["_id"]},
+ None,
+ pull={"vdur-template": {"_id": vdur["_id"]}},
+ )
for count in range(vdu_count):
vdur_copy = deepcopy(vdur)
vdur_copy["status"] = "BUILD"
if vdu_delete:
if len(db_vnfr["vdur"]) == 1:
# The scale will move to 0 instances
- self.logger.debug(f"Scaling to 0 !, creating the template with the last vdur")
+ self.logger.debug(
+ "Scaling to 0 !, creating the template with the last vdur"
+ )
template_vdur = [db_vnfr["vdur"][0]]
for vdu_id, vdu_count in vdu_delete.items():
if mark_delete:
vdur["vim_info"] = {target_vim: {}}
# instantiation parameters
- # if vnf_params:
- # vdu_instantiation_params = next((v for v in get_iterable(vnf_params, "vdu") if v["id"] ==
- # vdud["id"]), None)
+ if vnf_params:
+ vdu_instantiation_params = find_in_list(
+ get_iterable(vnf_params, "vdu"),
+ lambda i_vdu: i_vdu["id"] == vdud["id"],
+ )
+ if vdu_instantiation_params:
+ # Parse the vdu_volumes from the instantiation params
+ vdu_volumes = get_volumes_from_instantiation_params(
+ vdu_instantiation_params, vdud
+ )
+ vdur["additionalParams"]["OSM"]["vdu_volumes"] = vdu_volumes
vdur_list.append(vdur)
target_vnf["vdur"] = vdur_list
target["vnf"].append(target_vnf)
step = "Waiting to VM being up and getting IP address"
self.logger.debug(logging_text + step)
+ # default rw_mgmt_ip to None, avoiding the non definition of the variable
+ rw_mgmt_ip = None
+
# n2vc_redesign STEP 5.1
# wait for RO (ip-address) Insert pub_key into VM
if vnfr_id:
},
vca_id=vca_id,
)
- else:
+
+ # This verification is needed in order to avoid trying to add a public key
+ # to a VM, when the VNF is a KNF (in the edge case where the user creates a VCA
+ # for a KNF and not for its KDUs, the previous verification gives False, and the code
+ # jumps to this block, meaning that there is the need to verify if the VNF is actually a VNF
+ # or it is a KNF)
+ elif db_vnfr.get('vdur'):
rw_mgmt_ip = await self.wait_vm_up_insert_key_ro(
logging_text,
nsr_id,
pub_key=pub_key,
)
- else:
- rw_mgmt_ip = None # This is for a NS configuration
-
self.logger.debug(logging_text + " VM_ip_address={}".format(rw_mgmt_ip))
# store rw_mgmt_ip in deploy params for later replacement
kdu_model=k8s_instance_info["kdu-model"],
kdu_name=k8s_instance_info["kdu-name"],
)
+
+ # Update the nsrs table with the kdu-instance value
self.update_db_2(
- "nsrs", nsr_id, {nsr_db_path + ".kdu-instance": kdu_instance}
+ item="nsrs",
+ _id=nsr_id,
+ _desc={nsr_db_path + ".kdu-instance": kdu_instance},
)
+
+ # Update the nsrs table with the actual namespace being used, if the k8scluster-type is `juju` or
+ # `juju-bundle`. This verification is needed because there is not a standard/homogeneous namespace
+ # between the Helm Charts and Juju Bundles-based KNFs. If we found a way of having an homogeneous
+ # namespace, this first verification could be removed, and the next step would be done for any kind
+ # of KNF.
+ # TODO -> find a way to have an homogeneous namespace between the Helm Charts and Juju Bundles-based
+ # KNFs (Bug 2027: https://osm.etsi.org/bugzilla/show_bug.cgi?id=2027)
+ if k8sclustertype in ("juju", "juju-bundle"):
+ # First, verify if the current namespace is present in the `_admin.projects_read` (if not, it means
+ # that the user passed a namespace which he wants its KDU to be deployed in)
+ if (
+ self.db.count(
+ table="nsrs",
+ q_filter={
+ "_id": nsr_id,
+ "_admin.projects_write": k8s_instance_info["namespace"],
+ "_admin.projects_read": k8s_instance_info["namespace"],
+ },
+ )
+ > 0
+ ):
+ self.logger.debug(
+ f"Updating namespace/model for Juju Bundle from {k8s_instance_info['namespace']} to {kdu_instance}"
+ )
+ self.update_db_2(
+ item="nsrs",
+ _id=nsr_id,
+ _desc={f"{nsr_db_path}.namespace": kdu_instance},
+ )
+ k8s_instance_info["namespace"] = kdu_instance
+
await self.k8scluster_map[k8sclustertype].install(
cluster_uuid=k8s_instance_info["k8scluster-uuid"],
kdu_model=k8s_instance_info["kdu-model"],
kdu_instance=kdu_instance,
vca_id=vca_id,
)
- self.update_db_2(
- "nsrs", nsr_id, {nsr_db_path + ".kdu-instance": kdu_instance}
- )
# Obtain services to obtain management service ip
services = await self.k8scluster_map[k8sclustertype].get_services(
vnfd_with_id,
k8s_instance_info,
k8params=desc_params,
- timeout=600,
+ timeout=1800,
vca_id=vca_id,
)
)
cluster_uuid=kdu.get("k8scluster-uuid"),
kdu_instance=kdu_instance,
vca_id=vca_id,
+ namespace=kdu.get("namespace"),
)
)
else:
self.lcm_tasks.remove("ns", nsr_id, nslcmop_id, "ns_action")
return nslcmop_operation_state, detailed_status
+ async def terminate_vdus(
+ self, db_vnfr, member_vnf_index, db_nsr, update_db_nslcmops, stage, logging_text
+ ):
+ """This method terminates VDUs
+
+ Args:
+ db_vnfr: VNF instance record
+ member_vnf_index: VNF index to identify the VDUs to be removed
+ db_nsr: NS instance record
+ update_db_nslcmops: Nslcmop update record
+ """
+ vca_scaling_info = []
+ scaling_info = {"scaling_group_name": "vdu_autoscale", "vdu": [], "kdu": []}
+ scaling_info["scaling_direction"] = "IN"
+ scaling_info["vdu-delete"] = {}
+ scaling_info["kdu-delete"] = {}
+ db_vdur = db_vnfr.get("vdur")
+ vdur_list = copy(db_vdur)
+ count_index = 0
+ for index, vdu in enumerate(vdur_list):
+ vca_scaling_info.append(
+ {
+ "osm_vdu_id": vdu["vdu-id-ref"],
+ "member-vnf-index": member_vnf_index,
+ "type": "delete",
+ "vdu_index": count_index,
+ })
+ scaling_info["vdu-delete"][vdu["vdu-id-ref"]] = count_index
+ scaling_info["vdu"].append(
+ {
+ "name": vdu.get("name") or vdu.get("vdu-name"),
+ "vdu_id": vdu["vdu-id-ref"],
+ "interface": [],
+ })
+ for interface in vdu["interfaces"]:
+ scaling_info["vdu"][index]["interface"].append(
+ {
+ "name": interface["name"],
+ "ip_address": interface["ip-address"],
+ "mac_address": interface.get("mac-address"),
+ })
+ self.logger.info("NS update scaling info{}".format(scaling_info))
+ stage[2] = "Terminating VDUs"
+ if scaling_info.get("vdu-delete"):
+ # scale_process = "RO"
+ if self.ro_config.get("ng"):
+ await self._scale_ng_ro(
+ logging_text, db_nsr, update_db_nslcmops, db_vnfr, scaling_info, stage
+ )
+
+ async def remove_vnf(
+ self, nsr_id, nslcmop_id, vnf_instance_id
+ ):
+ """This method is to Remove VNF instances from NS.
+
+ Args:
+ nsr_id: NS instance id
+ nslcmop_id: nslcmop id of update
+ vnf_instance_id: id of the VNF instance to be removed
+
+ Returns:
+ result: (str, str) COMPLETED/FAILED, details
+ """
+ try:
+ db_nsr_update = {}
+ logging_text = "Task ns={} update ".format(nsr_id)
+ check_vnfr_count = len(self.db.get_list("vnfrs", {"nsr-id-ref": nsr_id}))
+ self.logger.info("check_vnfr_count {}".format(check_vnfr_count))
+ if check_vnfr_count > 1:
+ stage = ["", "", ""]
+ step = "Getting nslcmop from database"
+ self.logger.debug(step + " after having waited for previous tasks to be completed")
+ # db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})
+ db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
+ db_vnfr = self.db.get_one("vnfrs", {"_id": vnf_instance_id})
+ member_vnf_index = db_vnfr["member-vnf-index-ref"]
+ """ db_vnfr = self.db.get_one(
+ "vnfrs", {"member-vnf-index-ref": member_vnf_index, "nsr-id-ref": nsr_id}) """
+
+ update_db_nslcmops = self.db.get_one("nslcmops", {"_id": nslcmop_id})
+ await self.terminate_vdus(db_vnfr, member_vnf_index, db_nsr, update_db_nslcmops, stage, logging_text)
+
+ constituent_vnfr = db_nsr.get("constituent-vnfr-ref")
+ constituent_vnfr.remove(db_vnfr.get("_id"))
+ db_nsr_update["constituent-vnfr-ref"] = db_nsr.get("constituent-vnfr-ref")
+ self.update_db_2("nsrs", nsr_id, db_nsr_update)
+ self.db.del_one("vnfrs", {"_id": db_vnfr.get("_id")})
+ self.update_db_2("nsrs", nsr_id, db_nsr_update)
+ return "COMPLETED", "Done"
+ else:
+ step = "Terminate VNF Failed with"
+ raise LcmException("{} Cannot terminate the last VNF in this NS.".format(
+ vnf_instance_id))
+ except (LcmException, asyncio.CancelledError):
+ raise
+ except Exception as e:
+ self.logger.debug("Error removing VNF {}".format(e))
+ return "FAILED", "Error removing VNF {}".format(e)
+
+ async def _ns_redeploy_vnf(
+ self, nsr_id, nslcmop_id, db_vnfd, db_vnfr, db_nsr,
+ ):
+ """This method updates and redeploys VNF instances
+
+ Args:
+ nsr_id: NS instance id
+ nslcmop_id: nslcmop id
+ db_vnfd: VNF descriptor
+ db_vnfr: VNF instance record
+ db_nsr: NS instance record
+
+ Returns:
+ result: (str, str) COMPLETED/FAILED, details
+ """
+ try:
+ count_index = 0
+ stage = ["", "", ""]
+ logging_text = "Task ns={} update ".format(nsr_id)
+ latest_vnfd_revision = db_vnfd["_admin"].get("revision")
+ member_vnf_index = db_vnfr["member-vnf-index-ref"]
+
+ # Terminate old VNF resources
+ update_db_nslcmops = self.db.get_one("nslcmops", {"_id": nslcmop_id})
+ await self.terminate_vdus(db_vnfr, member_vnf_index, db_nsr, update_db_nslcmops, stage, logging_text)
+
+ # old_vnfd_id = db_vnfr["vnfd-id"]
+ # new_db_vnfd = self.db.get_one("vnfds", {"_id": vnfd_id})
+ new_db_vnfd = db_vnfd
+ # new_vnfd_ref = new_db_vnfd["id"]
+ # new_vnfd_id = vnfd_id
+
+ # Create VDUR
+ new_vnfr_cp = []
+ for cp in new_db_vnfd.get("ext-cpd", ()):
+ vnf_cp = {
+ "name": cp.get("id"),
+ "connection-point-id": cp.get("int-cpd", {}).get("cpd"),
+ "connection-point-vdu-id": cp.get("int-cpd", {}).get("vdu-id"),
+ "id": cp.get("id"),
+ }
+ new_vnfr_cp.append(vnf_cp)
+ new_vdur = update_db_nslcmops["operationParams"]["newVdur"]
+ # new_vdur = self._create_vdur_descriptor_from_vnfd(db_nsd, db_vnfd, old_db_vnfd, vnfd_id, db_nsr, member_vnf_index)
+ # new_vnfr_update = {"vnfd-ref": new_vnfd_ref, "vnfd-id": new_vnfd_id, "connection-point": new_vnfr_cp, "vdur": new_vdur, "ip-address": ""}
+ new_vnfr_update = {"revision": latest_vnfd_revision, "connection-point": new_vnfr_cp, "vdur": new_vdur, "ip-address": ""}
+ self.update_db_2("vnfrs", db_vnfr["_id"], new_vnfr_update)
+ updated_db_vnfr = self.db.get_one(
+ "vnfrs", {"member-vnf-index-ref": member_vnf_index, "nsr-id-ref": nsr_id}
+ )
+
+ # Instantiate new VNF resources
+ # update_db_nslcmops = self.db.get_one("nslcmops", {"_id": nslcmop_id})
+ vca_scaling_info = []
+ scaling_info = {"scaling_group_name": "vdu_autoscale", "vdu": [], "kdu": []}
+ scaling_info["scaling_direction"] = "OUT"
+ scaling_info["vdu-create"] = {}
+ scaling_info["kdu-create"] = {}
+ vdud_instantiate_list = db_vnfd["vdu"]
+ for index, vdud in enumerate(vdud_instantiate_list):
+ cloud_init_text = self._get_vdu_cloud_init_content(
+ vdud, db_vnfd
+ )
+ if cloud_init_text:
+ additional_params = (
+ self._get_vdu_additional_params(updated_db_vnfr, vdud["id"])
+ or {}
+ )
+ cloud_init_list = []
+ if cloud_init_text:
+ # TODO Information of its own ip is not available because db_vnfr is not updated.
+ additional_params["OSM"] = get_osm_params(
+ updated_db_vnfr, vdud["id"], 1
+ )
+ cloud_init_list.append(
+ self._parse_cloud_init(
+ cloud_init_text,
+ additional_params,
+ db_vnfd["id"],
+ vdud["id"],
+ )
+ )
+ vca_scaling_info.append(
+ {
+ "osm_vdu_id": vdud["id"],
+ "member-vnf-index": member_vnf_index,
+ "type": "create",
+ "vdu_index": count_index,
+ }
+ )
+ scaling_info["vdu-create"][vdud["id"]] = count_index
+ if self.ro_config.get("ng"):
+ self.logger.debug(
+ "New Resources to be deployed: {}".format(scaling_info))
+ await self._scale_ng_ro(
+ logging_text, db_nsr, update_db_nslcmops, updated_db_vnfr, scaling_info, stage
+ )
+ return "COMPLETED", "Done"
+ except (LcmException, asyncio.CancelledError):
+ raise
+ except Exception as e:
+ self.logger.debug("Error updating VNF {}".format(e))
+ return "FAILED", "Error updating VNF {}".format(e)
+
async def _ns_charm_upgrade(
self,
ee_id,
db_nsr_update = {}
error_description_nslcmop = ""
exc = None
- change_type = ""
+ change_type = "updated"
detailed_status = ""
try:
step = "Checking if revision has changed in VNFD"
if current_vnf_revision != latest_vnfd_revision:
+ change_type = "policy_updated"
+
# There is new revision of VNFD, update operation is required
current_vnfd_path = vnfd_id + ":" + str(current_vnf_revision)
- latest_vnfd_path = vnfd_id
+ latest_vnfd_path = vnfd_id + ":" + str(latest_vnfd_revision)
step = "Removing the VNFD packages if they exist in the local path"
shutil.rmtree(self.fs.path + current_vnfd_path, ignore_errors=True)
base_folder,
charm_name,
charm_type,
+ latest_vnfd_revision,
)
)
# There is no change in the charm package, then redeploy the VNF
# based on new descriptor
step = "Redeploying VNF"
- # This part is in https://osm.etsi.org/gerrit/11943
+ member_vnf_index = db_vnfr["member-vnf-index-ref"]
+ (
+ result,
+ detailed_status
+ ) = await self._ns_redeploy_vnf(
+ nsr_id,
+ nslcmop_id,
+ latest_vnfd,
+ db_vnfr,
+ db_nsr
+ )
+ if result == "FAILED":
+ nslcmop_operation_state = result
+ error_description_nslcmop = detailed_status
+ db_nslcmop_update["detailed-status"] = detailed_status
+ self.logger.debug(
+ logging_text
+ + " step {} Done with result {} {}".format(
+ step, nslcmop_operation_state, detailed_status
+ )
+ )
else:
step = "Checking if any charm package has changed or not"
)
step = "Updating policies"
- # This part is in https://osm.etsi.org/gerrit/11943
+ member_vnf_index = db_vnfr["member-vnf-index-ref"]
+ result = "COMPLETED"
+ detailed_status = "Done"
+ db_nslcmop_update["detailed-status"] = "Done"
# If nslcmop_operation_state is None, so any operation is not failed.
if not nslcmop_operation_state:
)
elif update_type == "REMOVE_VNF":
# This part is included in https://osm.etsi.org/gerrit/11876
- pass
+ vnf_instance_id = db_nslcmop["operationParams"]["removeVnfInstanceId"]
+ db_vnfr = self.db.get_one("vnfrs", {"_id": vnf_instance_id})
+ member_vnf_index = db_vnfr["member-vnf-index-ref"]
+ step = "Removing VNF"
+ (result, detailed_status) = await self.remove_vnf(nsr_id, nslcmop_id, vnf_instance_id)
+ if result == "FAILED":
+ nslcmop_operation_state = result
+ error_description_nslcmop = detailed_status
+ db_nslcmop_update["detailed-status"] = detailed_status
+ change_type = "vnf_terminated"
+ if not nslcmop_operation_state:
+ nslcmop_operation_state = "COMPLETED"
+ self.logger.debug(
+ logging_text
+ + " task Done with result {} {}".format(
+ nslcmop_operation_state, detailed_status
+ )
+ )
# If nslcmop_operation_state is None, so any operation is not failed.
# All operations are executed in overall.
if nslcmop_operation_state:
try:
- await self.msg.aiowrite(
- "ns",
- "updated",
- {
- "nsr_id": nsr_id,
- "nslcmop_id": nslcmop_id,
- "operationState": nslcmop_operation_state,
- },
- loop=self.loop,
- )
+ msg = {
+ "nsr_id": nsr_id,
+ "nslcmop_id": nslcmop_id,
+ "operationState": nslcmop_operation_state,
+ }
+ if change_type in ("vnf_terminated", "policy_updated"):
+ msg.update({"vnf_member_index": member_vnf_index})
+ await self.msg.aiowrite("ns", change_type, msg, loop=self.loop)
except Exception as e:
self.logger.error(
logging_text + "kafka_write notification Exception {}".format(e)
try:
# wait for any previous tasks in process
step = "Waiting for previous operations to terminate"
- await self.lcm_tasks.waitfor_related_HA('ns', 'nslcmops', nslcmop_id)
+ await self.lcm_tasks.waitfor_related_HA("ns", "nslcmops", nslcmop_id)
self._write_ns_status(
nsr_id=nsr_id,
ns_state=None,
current_operation="MIGRATING",
- current_operation_id=nslcmop_id
+ current_operation_id=nslcmop_id,
)
step = "Getting nslcmop from database"
- self.logger.debug(step + " after having waited for previous tasks to be completed")
+ self.logger.debug(
+ step + " after having waited for previous tasks to be completed"
+ )
db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})
migrate_params = db_nslcmop.get("operationParams")
exc = "Operation was cancelled"
except Exception as e:
exc = traceback.format_exc()
- self.logger.critical("Exit Exception {} {}".format(type(e).__name__, e), exc_info=True)
+ self.logger.critical(
+ "Exit Exception {} {}".format(type(e).__name__, e), exc_info=True
+ )
finally:
self._write_ns_status(
nsr_id=nsr_id,
current_operation_id=None,
)
if exc:
- db_nslcmop_update[
- "detailed-status"
- ] = "FAILED {}: {}".format(step, exc)
+ db_nslcmop_update["detailed-status"] = "FAILED {}: {}".format(step, exc)
nslcmop_operation_state = "FAILED"
else:
nslcmop_operation_state = "COMPLETED"