##
import asyncio
+import shutil
from typing import Any, Dict, List
import yaml
import logging
deep_get,
get_iterable,
populate_dict,
+ check_juju_bundle_existence,
+ get_charm_artifact_path,
)
from osm_lcm.data_utils.nsd import (
get_ns_configuration_relation_list,
get_vnf_profiles,
)
from osm_lcm.data_utils.vnfd import (
+ get_kdu,
+ get_kdu_services,
get_relation_list,
get_vdu_list,
get_vdu_profile,
get_number_of_instances,
get_juju_ee_ref,
get_kdu_resource_profile,
+ find_software_version,
)
from osm_lcm.data_utils.list_utils import find_in_list
from osm_lcm.data_utils.vnfr import get_osm_params, get_vdur_index, get_kdur
from n2vc.exceptions import N2VCException, N2VCNotFound, K8sException
from osm_lcm.lcm_helm_conn import LCMHelmConn
+from osm_lcm.osm_config import OsmConfigBuilder
from osm_lcm.prometheus import parse_job
from copy import copy, deepcopy
timeout_ns_terminate = 1800 # default global timeout for un deployment a ns
timeout_charm_delete = 10 * 60
timeout_primitive = 30 * 60 # timeout for primitive execution
+ timeout_ns_update = 30 * 60 # timeout for ns update
timeout_progress_primitive = (
10 * 60
) # timeout for some progress in a primitive execution
+ timeout_migrate = 1800 # default global timeout for migrating vnfs
SUBOPERATION_STATUS_NOT_FOUND = -1
SUBOPERATION_STATUS_NEW = -2
self.logger.warn("Error updating NS state for ns={}: {}".format(nsr_id, e))
async def _on_update_k8s_db(
- self, cluster_uuid, kdu_instance, filter=None, vca_id=None
+ self, cluster_uuid, kdu_instance, filter=None, vca_id=None, cluster_type="juju"
):
"""
Updating vca status in NSR record
:param cluster_uuid: UUID of a k8s cluster
:param kdu_instance: The unique name of the KDU instance
:param filter: To get nsr_id
+ :cluster_type: The cluster type (juju, k8s)
:return: none
"""
# self.logger.debug("_on_update_k8s_db(cluster_uuid={}, kdu_instance={}, filter={}"
# .format(cluster_uuid, kdu_instance, filter))
+ nsr_id = filter.get("_id")
try:
- nsr_id = filter.get("_id")
-
- # get vca status for NS
- vca_status = await self.k8sclusterjuju.status_kdu(
- cluster_uuid,
- kdu_instance,
- complete_status=True,
+ vca_status = await self.k8scluster_map[cluster_type].status_kdu(
+ cluster_uuid=cluster_uuid,
+ kdu_instance=kdu_instance,
yaml_format=False,
+ complete_status=True,
vca_id=vca_id,
)
+
# vcaStatus
db_dict = dict()
db_dict["vcaStatus"] = {nsr_id: vca_status}
- await self.k8sclusterjuju.update_vca_status(
- db_dict["vcaStatus"],
- kdu_instance,
- vca_id=vca_id,
+ if cluster_type in ("juju-bundle", "juju"):
+ # TODO -> this should be done in a more uniform way, I think in N2VC, in order to update the K8s VCA
+ # status in a similar way between Juju Bundles and Helm Charts on this side
+ await self.k8sclusterjuju.update_vca_status(
+ db_dict["vcaStatus"],
+ kdu_instance,
+ vca_id=vca_id,
+ )
+
+ self.logger.debug(
+ f"Obtained VCA status for cluster type '{cluster_type}': {vca_status}"
)
# write to database
self.update_db_2("nsrs", nsr_id, db_dict)
-
except (asyncio.CancelledError, asyncio.TimeoutError):
raise
except Exception as e:
def _get_vdu_additional_params(self, db_vnfr, vdu_id):
vdur = next(
- vdur for vdur in db_vnfr.get("vdur") if vdu_id == vdur["vdu-id-ref"]
+ (vdur for vdur in db_vnfr.get("vdur") if vdu_id == vdur["vdu-id-ref"]),
+ {}
)
additional_params = vdur.get("additionalParams")
return parse_yaml_strings(additional_params)
def scale_vnfr(self, db_vnfr, vdu_create=None, vdu_delete=None, mark_delete=False):
db_vdu_push_list = []
+ template_vdur = []
db_update = {"_admin.modified": time()}
if vdu_create:
for vdu_id, vdu_count in vdu_create.items():
None,
)
if not vdur:
- raise LcmException(
- "Error scaling OUT VNFR for {}. There is not any existing vnfr. Scaled to 0?".format(
+ # Read the template saved in the db:
+ self.logger.debug(f"No vdur in the database. Using the vdur-template to scale")
+ vdur_template = db_vnfr.get("vdur-template")
+ if not vdur_template:
+ raise LcmException(
+ "Error scaling OUT VNFR for {}. No vnfr or template exists".format(
vdu_id
+ )
)
- )
-
+ vdur = vdur_template[0]
+ #Delete a template from the database after using it
+ self.db.set_one("vnfrs",
+ {"_id": db_vnfr["_id"]},
+ None,
+ pull={"vdur-template": {"_id": vdur['_id']}}
+ )
for count in range(vdu_count):
vdur_copy = deepcopy(vdur)
vdur_copy["status"] = "BUILD"
)
else:
iface.pop("mac-address", None)
- iface.pop(
- "mgmt_vnf", None
- ) # only first vdu can be managment of vnf
+ if db_vnfr["vdur"]:
+ iface.pop(
+ "mgmt_vnf", None
+ ) # only first vdu can be managment of vnf
db_vdu_push_list.append(vdur_copy)
# self.logger.debug("scale out, adding vdu={}".format(vdur_copy))
if vdu_delete:
+ if len(db_vnfr["vdur"]) == 1:
+ # The scale will move to 0 instances
+ self.logger.debug(f"Scaling to 0 !, creating the template with the last vdur")
+ template_vdur = [db_vnfr["vdur"][0]]
for vdu_id, vdu_count in vdu_delete.items():
if mark_delete:
indexes_to_delete = [
None,
pull={"vdur": {"_id": vdu["_id"]}},
)
- db_push = {"vdur": db_vdu_push_list} if db_vdu_push_list else None
+ db_push = {}
+ if db_vdu_push_list:
+ db_push["vdur"] = db_vdu_push_list
+ if template_vdur:
+ db_push["vdur-template"] = template_vdur
+ if not db_push:
+ db_push = None
+ db_vnfr["vdur-template"] = template_vdur
self.db.set_one("vnfrs", {"_id": db_vnfr["_id"]}, db_update, push_list=db_push)
# modify passed dictionary db_vnfr
db_vnfr_ = self.db.get_one("vnfrs", {"_id": db_vnfr["_id"]})
if vld_params.get("common_id"):
target_vld["common_id"] = vld_params.get("common_id")
+ # modify target["ns"]["vld"] with instantiation parameters to override vnf vim-account
+ def update_ns_vld_target(target, ns_params):
+ for vnf_params in ns_params.get("vnf", ()):
+ if vnf_params.get("vimAccountId"):
+ target_vnf = next(
+ (
+ vnfr
+ for vnfr in db_vnfrs.values()
+ if vnf_params["member-vnf-index"]
+ == vnfr["member-vnf-index-ref"]
+ ),
+ None,
+ )
+ vdur = next((vdur for vdur in target_vnf.get("vdur", ())), None)
+ for a_index, a_vld in enumerate(target["ns"]["vld"]):
+ target_vld = find_in_list(
+ get_iterable(vdur, "interfaces"),
+ lambda iface: iface.get("ns-vld-id") == a_vld["name"],
+ )
+ if target_vld:
+ if vnf_params.get("vimAccountId") not in a_vld.get(
+ "vim_info", {}
+ ):
+ target["ns"]["vld"][a_index].get("vim_info").update(
+ {
+ "vim:{}".format(vnf_params["vimAccountId"]): {
+ "vim_network_name": ""
+ }
+ }
+ )
+
nslcmop_id = db_nslcmop["_id"]
target = {
"name": db_nsr["name"],
image["vim_info"] = {}
for flavor in target["flavor"]:
flavor["vim_info"] = {}
+ if db_nsr.get("affinity-or-anti-affinity-group"):
+ target["affinity-or-anti-affinity-group"] = deepcopy(
+ db_nsr["affinity-or-anti-affinity-group"]
+ )
+ for affinity_or_anti_affinity_group in target[
+ "affinity-or-anti-affinity-group"
+ ]:
+ affinity_or_anti_affinity_group["vim_info"] = {}
if db_nslcmop.get("lcmOperationType") != "instantiate":
# get parameters of instantiation:
vld_params.update(vld_instantiation_params)
parse_vld_instantiation_params(target_vim, target_vld, vld_params, None)
target["ns"]["vld"].append(target_vld)
+ # Update the target ns_vld if vnf vim_account is overriden by instantiation params
+ update_ns_vld_target(target, ns_params)
for vnfr in db_vnfrs.values():
vnfd = find_in_list(
if target_vim not in ns_image["vim_info"]:
ns_image["vim_info"][target_vim] = {}
+ # Affinity groups
+ if vdur.get("affinity-or-anti-affinity-group-id"):
+ for ags_id in vdur["affinity-or-anti-affinity-group-id"]:
+ ns_ags = target["affinity-or-anti-affinity-group"][int(ags_id)]
+ if target_vim not in ns_ags["vim_info"]:
+ ns_ags["vim_info"][target_vim] = {}
+
vdur["vim_info"] = {target_vim: {}}
# instantiation parameters
# if vnf_params:
:param nsr_id:
:param vnfr_id:
:param kdu_name:
- :return: IP address
+ :return: IP address, K8s services
"""
# self.logger.debug(logging_text + "Starting wait_kdu_up")
)
if kdur.get("status"):
if kdur["status"] in ("READY", "ENABLED"):
- return kdur.get("ip-address")
+ return kdur.get("ip-address"), kdur.get("services")
else:
raise LcmException(
"target KDU={} is in error state".format(kdu_name)
base_folder["folder"],
base_folder["pkg-dir"],
"charms"
- if vca_type in ("native_charm", "lxc_proxy_charm", "k8s_proxy_charm")
+ if vca_type
+ in ("native_charm", "lxc_proxy_charm", "k8s_proxy_charm")
else "helm-charts",
vca_name,
)
artifact_path = "{}/Scripts/{}/{}/".format(
base_folder["folder"],
"charms"
- if vca_type in ("native_charm", "lxc_proxy_charm", "k8s_proxy_charm")
+ if vca_type
+ in ("native_charm", "lxc_proxy_charm", "k8s_proxy_charm")
else "helm-charts",
vca_name,
)
# wait for RO (ip-address) Insert pub_key into VM
if vnfr_id:
if kdu_name:
- rw_mgmt_ip = await self.wait_kdu_up(
+ rw_mgmt_ip, services = await self.wait_kdu_up(
logging_text, nsr_id, vnfr_id, kdu_name
)
+ vnfd = self.db.get_one(
+ "vnfds_revisions",
+ {"_id": f'{db_vnfr["vnfd-id"]}:{db_vnfr["revision"]}'},
+ )
+ kdu = get_kdu(vnfd, kdu_name)
+ kdu_services = [
+ service["name"] for service in get_kdu_services(kdu)
+ ]
+ exposed_services = []
+ for service in services:
+ if any(s in service["name"] for s in kdu_services):
+ exposed_services.append(service)
+ await self.vca_map[vca_type].exec_primitive(
+ ee_id=ee_id,
+ primitive_name="config",
+ params_dict={
+ "osm-config": json.dumps(
+ OsmConfigBuilder(
+ k8s={"services": exposed_services}
+ ).build()
+ )
+ },
+ vca_id=vca_id,
+ )
else:
rw_mgmt_ip = await self.wait_vm_up_insert_key_ro(
logging_text,
user=user,
pub_key=pub_key,
)
+
else:
rw_mgmt_ip = None # This is for a NS configuration
for job in prometheus_jobs:
self.db.set_one(
"prometheus_jobs",
- {
- "job_name": job["job_name"]
- },
+ {"job_name": job["job_name"]},
job,
upsert=True,
- fail_on_empty=False
+ fail_on_empty=False,
)
step = "instantiated at VCA"
kdur_list = []
for kdur in vnfr["kdur"]:
if kdur.get("additionalParams"):
- kdur["additionalParams"] = json.loads(kdur["additionalParams"])
+ kdur["additionalParams"] = json.loads(
+ kdur["additionalParams"]
+ )
kdur_list.append(kdur)
vnfr["kdur"] = kdur_list
)
deploy_params_kdu = {"OSM": get_osm_params(db_vnfr)}
if kdur.get("additionalParams"):
- deploy_params_kdu = parse_yaml_strings(
- kdur["additionalParams"]
+ deploy_params_kdu.update(
+ parse_yaml_strings(kdur["additionalParams"].copy())
)
self._deploy_n2vc(
if requirer_id != nsd["id"]:
requirer_dict["vnf-profile-id"] = requirer_id
else:
- raise Exception("provider/requirer or entities must be included in the relation.")
+ raise Exception(
+ "provider/requirer or entities must be included in the relation."
+ )
relation_provider = self._update_ee_relation_data_with_implicit_data(
nsr_id, nsd, provider_dict, cached_vnfds
)
if requirer_id != vnfd_id:
requirer_dict["vdu-profile-id"] = requirer_id
else:
- raise Exception("provider/requirer or entities must be included in the relation.")
+ raise Exception(
+ "provider/requirer or entities must be included in the relation."
+ )
relation_provider = self._update_ee_relation_data_with_implicit_data(
nsr_id, nsd, provider_dict, cached_vnfds, vnf_profile_id=vnf_profile_id
)
cluster_uuid=kdu.get("k8scluster-uuid"),
kdu_instance=kdu_instance,
vca_id=vca_id,
+ namespace=kdu.get("namespace"),
)
)
else:
db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
vca_id = self.get_vca_id({}, db_nsr)
if db_nsr["_admin"]["deployed"]["K8s"]:
- for k8s_index, k8s in enumerate(db_nsr["_admin"]["deployed"]["K8s"]):
- cluster_uuid, kdu_instance = k8s["k8scluster-uuid"], k8s["kdu-instance"]
+ for _, k8s in enumerate(db_nsr["_admin"]["deployed"]["K8s"]):
+ cluster_uuid, kdu_instance, cluster_type = (
+ k8s["k8scluster-uuid"],
+ k8s["kdu-instance"],
+ k8s["k8scluster-type"],
+ )
await self._on_update_k8s_db(
- cluster_uuid, kdu_instance, filter={"_id": nsr_id}, vca_id=vca_id
+ cluster_uuid=cluster_uuid,
+ kdu_instance=kdu_instance,
+ filter={"_id": nsr_id},
+ vca_id=vca_id,
+ cluster_type=cluster_type,
)
else:
for vca_index, _ in enumerate(db_nsr["_admin"]["deployed"]["VCA"]):
db_vnfr = self.db.get_one(
"vnfrs", {"member-vnf-index-ref": vnf_index, "nsr-id-ref": nsr_id}
)
+ if db_vnfr.get("kdur"):
+ kdur_list = []
+ for kdur in db_vnfr["kdur"]:
+ if kdur.get("additionalParams"):
+ kdur["additionalParams"] = json.loads(
+ kdur["additionalParams"]
+ )
+ kdur_list.append(kdur)
+ db_vnfr["kdur"] = kdur_list
step = "Getting vnfd from database"
db_vnfd = self.db.get_one("vnfds", {"_id": db_vnfr["vnfd-id"]})
actions.add(primitive["name"])
for primitive in kdu_configuration.get("config-primitive", []):
actions.add(primitive["name"])
- kdu_action = True if primitive_name in actions else False
+ kdu = find_in_list(
+ nsr_deployed["K8s"],
+ lambda kdu: kdu_name == kdu["kdu-name"]
+ and kdu["member-vnf-index"] == vnf_index,
+ )
+ kdu_action = (
+ True
+ if primitive_name in actions
+ and kdu["k8scluster-type"] not in ("helm-chart", "helm-chart-v3")
+ else False
+ )
# TODO check if ns is in a proper status
if kdu_name and (
self.lcm_tasks.remove("ns", nsr_id, nslcmop_id, "ns_action")
return nslcmop_operation_state, detailed_status
+ async def terminate_vdus(
+ self, db_vnfr, member_vnf_index, db_nsr, update_db_nslcmops, stage, logging_text
+ ):
+ """This method terminates VDUs
+
+ Args:
+ db_vnfr: VNF instance record
+ member_vnf_index: VNF index to identify the VDUs to be removed
+ db_nsr: NS instance record
+ update_db_nslcmops: Nslcmop update record
+ """
+ vca_scaling_info = []
+ scaling_info = {"scaling_group_name": "vdu_autoscale", "vdu": [], "kdu": []}
+ scaling_info["scaling_direction"] = "IN"
+ scaling_info["vdu-delete"] = {}
+ scaling_info["kdu-delete"] = {}
+ db_vdur = db_vnfr.get("vdur")
+ vdur_list = copy(db_vdur)
+ count_index = 0
+ for index, vdu in enumerate(vdur_list):
+ vca_scaling_info.append(
+ {
+ "osm_vdu_id": vdu["vdu-id-ref"],
+ "member-vnf-index": member_vnf_index,
+ "type": "delete",
+ "vdu_index": count_index,
+ })
+ scaling_info["vdu-delete"][vdu["vdu-id-ref"]] = count_index
+ scaling_info["vdu"].append(
+ {
+ "name": vdu.get("name") or vdu.get("vdu-name"),
+ "vdu_id": vdu["vdu-id-ref"],
+ "interface": [],
+ })
+ for interface in vdu["interfaces"]:
+ scaling_info["vdu"][index]["interface"].append(
+ {
+ "name": interface["name"],
+ "ip_address": interface["ip-address"],
+ "mac_address": interface.get("mac-address"),
+ })
+ self.logger.info("NS update scaling info{}".format(scaling_info))
+ stage[2] = "Terminating VDUs"
+ if scaling_info.get("vdu-delete"):
+ # scale_process = "RO"
+ if self.ro_config.get("ng"):
+ await self._scale_ng_ro(
+ logging_text, db_nsr, update_db_nslcmops, db_vnfr, scaling_info, stage
+ )
+
+ async def remove_vnf(
+ self, nsr_id, nslcmop_id, vnf_instance_id
+ ):
+ """This method is to Remove VNF instances from NS.
+
+ Args:
+ nsr_id: NS instance id
+ nslcmop_id: nslcmop id of update
+ vnf_instance_id: id of the VNF instance to be removed
+
+ Returns:
+ result: (str, str) COMPLETED/FAILED, details
+ """
+ try:
+ db_nsr_update = {}
+ logging_text = "Task ns={} update ".format(nsr_id)
+ check_vnfr_count = len(self.db.get_list("vnfrs", {"nsr-id-ref": nsr_id}))
+ self.logger.info("check_vnfr_count {}".format(check_vnfr_count))
+ if check_vnfr_count > 1:
+ stage = ["", "", ""]
+ step = "Getting nslcmop from database"
+ self.logger.debug(step + " after having waited for previous tasks to be completed")
+ # db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})
+ db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
+ db_vnfr = self.db.get_one("vnfrs", {"_id": vnf_instance_id})
+ member_vnf_index = db_vnfr["member-vnf-index-ref"]
+ """ db_vnfr = self.db.get_one(
+ "vnfrs", {"member-vnf-index-ref": member_vnf_index, "nsr-id-ref": nsr_id}) """
+
+ update_db_nslcmops = self.db.get_one("nslcmops", {"_id": nslcmop_id})
+ await self.terminate_vdus(db_vnfr, member_vnf_index, db_nsr, update_db_nslcmops, stage, logging_text)
+
+ constituent_vnfr = db_nsr.get("constituent-vnfr-ref")
+ constituent_vnfr.remove(db_vnfr.get("_id"))
+ db_nsr_update["constituent-vnfr-ref"] = db_nsr.get("constituent-vnfr-ref")
+ self.update_db_2("nsrs", nsr_id, db_nsr_update)
+ self.db.del_one("vnfrs", {"_id": db_vnfr.get("_id")})
+ self.update_db_2("nsrs", nsr_id, db_nsr_update)
+ return "COMPLETED", "Done"
+ else:
+ step = "Terminate VNF Failed with"
+ raise LcmException("{} Cannot terminate the last VNF in this NS.".format(
+ vnf_instance_id))
+ except (LcmException, asyncio.CancelledError):
+ raise
+ except Exception as e:
+ self.logger.debug("Error removing VNF {}".format(e))
+ return "FAILED", "Error removing VNF {}".format(e)
+
+ async def _ns_redeploy_vnf(
+ self, nsr_id, nslcmop_id, db_vnfd, db_vnfr, db_nsr,
+ ):
+ """This method updates and redeploys VNF instances
+
+ Args:
+ nsr_id: NS instance id
+ nslcmop_id: nslcmop id
+ db_vnfd: VNF descriptor
+ db_vnfr: VNF instance record
+ db_nsr: NS instance record
+
+ Returns:
+ result: (str, str) COMPLETED/FAILED, details
+ """
+ try:
+ count_index = 0
+ stage = ["", "", ""]
+ logging_text = "Task ns={} update ".format(nsr_id)
+ latest_vnfd_revision = db_vnfd["_admin"].get("revision")
+ member_vnf_index = db_vnfr["member-vnf-index-ref"]
+
+ # Terminate old VNF resources
+ update_db_nslcmops = self.db.get_one("nslcmops", {"_id": nslcmop_id})
+ await self.terminate_vdus(db_vnfr, member_vnf_index, db_nsr, update_db_nslcmops, stage, logging_text)
+
+ # old_vnfd_id = db_vnfr["vnfd-id"]
+ # new_db_vnfd = self.db.get_one("vnfds", {"_id": vnfd_id})
+ new_db_vnfd = db_vnfd
+ # new_vnfd_ref = new_db_vnfd["id"]
+ # new_vnfd_id = vnfd_id
+
+ # Create VDUR
+ new_vnfr_cp = []
+ for cp in new_db_vnfd.get("ext-cpd", ()):
+ vnf_cp = {
+ "name": cp.get("id"),
+ "connection-point-id": cp.get("int-cpd", {}).get("cpd"),
+ "connection-point-vdu-id": cp.get("int-cpd", {}).get("vdu-id"),
+ "id": cp.get("id"),
+ }
+ new_vnfr_cp.append(vnf_cp)
+ new_vdur = update_db_nslcmops["operationParams"]["newVdur"]
+ # new_vdur = self._create_vdur_descriptor_from_vnfd(db_nsd, db_vnfd, old_db_vnfd, vnfd_id, db_nsr, member_vnf_index)
+ # new_vnfr_update = {"vnfd-ref": new_vnfd_ref, "vnfd-id": new_vnfd_id, "connection-point": new_vnfr_cp, "vdur": new_vdur, "ip-address": ""}
+ new_vnfr_update = {"revision": latest_vnfd_revision, "connection-point": new_vnfr_cp, "vdur": new_vdur, "ip-address": ""}
+ self.update_db_2("vnfrs", db_vnfr["_id"], new_vnfr_update)
+ updated_db_vnfr = self.db.get_one(
+ "vnfrs", {"member-vnf-index-ref": member_vnf_index, "nsr-id-ref": nsr_id}
+ )
+
+ # Instantiate new VNF resources
+ # update_db_nslcmops = self.db.get_one("nslcmops", {"_id": nslcmop_id})
+ vca_scaling_info = []
+ scaling_info = {"scaling_group_name": "vdu_autoscale", "vdu": [], "kdu": []}
+ scaling_info["scaling_direction"] = "OUT"
+ scaling_info["vdu-create"] = {}
+ scaling_info["kdu-create"] = {}
+ vdud_instantiate_list = db_vnfd["vdu"]
+ for index, vdud in enumerate(vdud_instantiate_list):
+ cloud_init_text = self._get_vdu_cloud_init_content(
+ vdud, db_vnfd
+ )
+ if cloud_init_text:
+ additional_params = (
+ self._get_vdu_additional_params(updated_db_vnfr, vdud["id"])
+ or {}
+ )
+ cloud_init_list = []
+ if cloud_init_text:
+ # TODO Information of its own ip is not available because db_vnfr is not updated.
+ additional_params["OSM"] = get_osm_params(
+ updated_db_vnfr, vdud["id"], 1
+ )
+ cloud_init_list.append(
+ self._parse_cloud_init(
+ cloud_init_text,
+ additional_params,
+ db_vnfd["id"],
+ vdud["id"],
+ )
+ )
+ vca_scaling_info.append(
+ {
+ "osm_vdu_id": vdud["id"],
+ "member-vnf-index": member_vnf_index,
+ "type": "create",
+ "vdu_index": count_index,
+ }
+ )
+ scaling_info["vdu-create"][vdud["id"]] = count_index
+ if self.ro_config.get("ng"):
+ self.logger.debug(
+ "New Resources to be deployed: {}".format(scaling_info))
+ await self._scale_ng_ro(
+ logging_text, db_nsr, update_db_nslcmops, updated_db_vnfr, scaling_info, stage
+ )
+ return "COMPLETED", "Done"
+ except (LcmException, asyncio.CancelledError):
+ raise
+ except Exception as e:
+ self.logger.debug("Error updating VNF {}".format(e))
+ return "FAILED", "Error updating VNF {}".format(e)
+
+ async def _ns_charm_upgrade(
+ self,
+ ee_id,
+ charm_id,
+ charm_type,
+ path,
+ timeout: float = None,
+ ) -> (str, str):
+ """This method upgrade charms in VNF instances
+
+ Args:
+ ee_id: Execution environment id
+ path: Local path to the charm
+ charm_id: charm-id
+ charm_type: Charm type can be lxc-proxy-charm, native-charm or k8s-proxy-charm
+ timeout: (Float) Timeout for the ns update operation
+
+ Returns:
+ result: (str, str) COMPLETED/FAILED, details
+ """
+ try:
+ charm_type = charm_type or "lxc_proxy_charm"
+ output = await self.vca_map[charm_type].upgrade_charm(
+ ee_id=ee_id,
+ path=path,
+ charm_id=charm_id,
+ charm_type=charm_type,
+ timeout=timeout or self.timeout_ns_update,
+ )
+
+ if output:
+ return "COMPLETED", output
+
+ except (LcmException, asyncio.CancelledError):
+ raise
+
+ except Exception as e:
+
+ self.logger.debug("Error upgrading charm {}".format(path))
+
+ return "FAILED", "Error upgrading charm {}: {}".format(path, e)
+
+ async def update(self, nsr_id, nslcmop_id):
+ """Update NS according to different update types
+
+ This method performs upgrade of VNF instances then updates the revision
+ number in VNF record
+
+ Args:
+ nsr_id: Network service will be updated
+ nslcmop_id: ns lcm operation id
+
+ Returns:
+ It may raise DbException, LcmException, N2VCException, K8sException
+
+ """
+ # Try to lock HA task here
+ task_is_locked_by_me = self.lcm_tasks.lock_HA("ns", "nslcmops", nslcmop_id)
+ if not task_is_locked_by_me:
+ return
+
+ logging_text = "Task ns={} update={} ".format(nsr_id, nslcmop_id)
+ self.logger.debug(logging_text + "Enter")
+
+ # Set the required variables to be filled up later
+ db_nsr = None
+ db_nslcmop_update = {}
+ vnfr_update = {}
+ nslcmop_operation_state = None
+ db_nsr_update = {}
+ error_description_nslcmop = ""
+ exc = None
+ change_type = "updated"
+ detailed_status = ""
+
+ try:
+ # wait for any previous tasks in process
+ step = "Waiting for previous operations to terminate"
+ await self.lcm_tasks.waitfor_related_HA("ns", "nslcmops", nslcmop_id)
+ self._write_ns_status(
+ nsr_id=nsr_id,
+ ns_state=None,
+ current_operation="UPDATING",
+ current_operation_id=nslcmop_id,
+ )
+
+ step = "Getting nslcmop from database"
+ db_nslcmop = self.db.get_one(
+ "nslcmops", {"_id": nslcmop_id}, fail_on_empty=False
+ )
+ update_type = db_nslcmop["operationParams"]["updateType"]
+
+ step = "Getting nsr from database"
+ db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
+ old_operational_status = db_nsr["operational-status"]
+ db_nsr_update["operational-status"] = "updating"
+ self.update_db_2("nsrs", nsr_id, db_nsr_update)
+ nsr_deployed = db_nsr["_admin"].get("deployed")
+
+ if update_type == "CHANGE_VNFPKG":
+
+ # Get the input parameters given through update request
+ vnf_instance_id = db_nslcmop["operationParams"][
+ "changeVnfPackageData"
+ ].get("vnfInstanceId")
+
+ vnfd_id = db_nslcmop["operationParams"]["changeVnfPackageData"].get(
+ "vnfdId"
+ )
+ timeout_seconds = db_nslcmop["operationParams"].get("timeout_ns_update")
+
+ step = "Getting vnfr from database"
+ db_vnfr = self.db.get_one(
+ "vnfrs", {"_id": vnf_instance_id}, fail_on_empty=False
+ )
+
+ step = "Getting vnfds from database"
+ # Latest VNFD
+ latest_vnfd = self.db.get_one(
+ "vnfds", {"_id": vnfd_id}, fail_on_empty=False
+ )
+ latest_vnfd_revision = latest_vnfd["_admin"].get("revision")
+
+ # Current VNFD
+ current_vnf_revision = db_vnfr.get("revision", 1)
+ current_vnfd = self.db.get_one(
+ "vnfds_revisions",
+ {"_id": vnfd_id + ":" + str(current_vnf_revision)},
+ fail_on_empty=False,
+ )
+ # Charm artifact paths will be filled up later
+ (
+ current_charm_artifact_path,
+ target_charm_artifact_path,
+ charm_artifact_paths,
+ ) = ([], [], [])
+
+ step = "Checking if revision has changed in VNFD"
+ if current_vnf_revision != latest_vnfd_revision:
+
+ change_type = "policy_updated"
+
+ # There is new revision of VNFD, update operation is required
+ current_vnfd_path = vnfd_id + ":" + str(current_vnf_revision)
+ latest_vnfd_path = vnfd_id
+
+ step = "Removing the VNFD packages if they exist in the local path"
+ shutil.rmtree(self.fs.path + current_vnfd_path, ignore_errors=True)
+ shutil.rmtree(self.fs.path + latest_vnfd_path, ignore_errors=True)
+
+ step = "Get the VNFD packages from FSMongo"
+ self.fs.sync(from_path=latest_vnfd_path)
+ self.fs.sync(from_path=current_vnfd_path)
+
+ step = (
+ "Get the charm-type, charm-id, ee-id if there is deployed VCA"
+ )
+ base_folder = latest_vnfd["_admin"]["storage"]
+
+ for charm_index, charm_deployed in enumerate(
+ get_iterable(nsr_deployed, "VCA")
+ ):
+ vnf_index = db_vnfr.get("member-vnf-index-ref")
+
+ # Getting charm-id and charm-type
+ if charm_deployed.get("member-vnf-index") == vnf_index:
+ charm_id = self.get_vca_id(db_vnfr, db_nsr)
+ charm_type = charm_deployed.get("type")
+
+ # Getting ee-id
+ ee_id = charm_deployed.get("ee_id")
+
+ step = "Getting descriptor config"
+ descriptor_config = get_configuration(
+ current_vnfd, current_vnfd["id"]
+ )
+
+ if "execution-environment-list" in descriptor_config:
+ ee_list = descriptor_config.get(
+ "execution-environment-list", []
+ )
+ else:
+ ee_list = []
+
+ # There could be several charm used in the same VNF
+ for ee_item in ee_list:
+ if ee_item.get("juju"):
+
+ step = "Getting charm name"
+ charm_name = ee_item["juju"].get("charm")
+
+ step = "Setting Charm artifact paths"
+ current_charm_artifact_path.append(
+ get_charm_artifact_path(
+ base_folder,
+ charm_name,
+ charm_type,
+ current_vnf_revision,
+ )
+ )
+ target_charm_artifact_path.append(
+ get_charm_artifact_path(
+ base_folder,
+ charm_name,
+ charm_type,
+ )
+ )
+
+ charm_artifact_paths = zip(
+ current_charm_artifact_path, target_charm_artifact_path
+ )
+
+ step = "Checking if software version has changed in VNFD"
+ if find_software_version(current_vnfd) != find_software_version(
+ latest_vnfd
+ ):
+
+ step = "Checking if existing VNF has charm"
+ for current_charm_path, target_charm_path in list(
+ charm_artifact_paths
+ ):
+ if current_charm_path:
+ raise LcmException(
+ "Software version change is not supported as VNF instance {} has charm.".format(
+ vnf_instance_id
+ )
+ )
+
+ # There is no change in the charm package, then redeploy the VNF
+ # based on new descriptor
+ step = "Redeploying VNF"
+ member_vnf_index = db_vnfr["member-vnf-index-ref"]
+ (
+ result,
+ detailed_status
+ ) = await self._ns_redeploy_vnf(
+ nsr_id,
+ nslcmop_id,
+ latest_vnfd,
+ db_vnfr,
+ db_nsr
+ )
+ if result == "FAILED":
+ nslcmop_operation_state = result
+ error_description_nslcmop = detailed_status
+ db_nslcmop_update["detailed-status"] = detailed_status
+ self.logger.debug(
+ logging_text
+ + " step {} Done with result {} {}".format(
+ step, nslcmop_operation_state, detailed_status
+ )
+ )
+
+ else:
+ step = "Checking if any charm package has changed or not"
+ for current_charm_path, target_charm_path in list(
+ charm_artifact_paths
+ ):
+ if (
+ current_charm_path
+ and target_charm_path
+ and self.check_charm_hash_changed(
+ current_charm_path, target_charm_path
+ )
+ ):
+
+ step = "Checking whether VNF uses juju bundle"
+ if check_juju_bundle_existence(current_vnfd):
+
+ raise LcmException(
+ "Charm upgrade is not supported for the instance which"
+ " uses juju-bundle: {}".format(
+ check_juju_bundle_existence(current_vnfd)
+ )
+ )
+
+ step = "Upgrading Charm"
+ (
+ result,
+ detailed_status,
+ ) = await self._ns_charm_upgrade(
+ ee_id=ee_id,
+ charm_id=charm_id,
+ charm_type=charm_type,
+ path=self.fs.path + target_charm_path,
+ timeout=timeout_seconds,
+ )
+
+ if result == "FAILED":
+ nslcmop_operation_state = result
+ error_description_nslcmop = detailed_status
+
+ db_nslcmop_update["detailed-status"] = detailed_status
+ self.logger.debug(
+ logging_text
+ + " step {} Done with result {} {}".format(
+ step, nslcmop_operation_state, detailed_status
+ )
+ )
+
+ step = "Updating policies"
+ member_vnf_index = db_vnfr["member-vnf-index-ref"]
+ result = "COMPLETED"
+ detailed_status = "Done"
+ db_nslcmop_update["detailed-status"] = "Done"
+
+ # If nslcmop_operation_state is None, so any operation is not failed.
+ if not nslcmop_operation_state:
+ nslcmop_operation_state = "COMPLETED"
+
+ # If update CHANGE_VNFPKG nslcmop_operation is successful
+ # vnf revision need to be updated
+ vnfr_update["revision"] = latest_vnfd_revision
+ self.update_db_2("vnfrs", db_vnfr["_id"], vnfr_update)
+
+ self.logger.debug(
+ logging_text
+ + " task Done with result {} {}".format(
+ nslcmop_operation_state, detailed_status
+ )
+ )
+ elif update_type == "REMOVE_VNF":
+ # This part is included in https://osm.etsi.org/gerrit/11876
+ vnf_instance_id = db_nslcmop["operationParams"]["removeVnfInstanceId"]
+ db_vnfr = self.db.get_one("vnfrs", {"_id": vnf_instance_id})
+ member_vnf_index = db_vnfr["member-vnf-index-ref"]
+ step = "Removing VNF"
+ (result, detailed_status) = await self.remove_vnf(nsr_id, nslcmop_id, vnf_instance_id)
+ if result == "FAILED":
+ nslcmop_operation_state = result
+ error_description_nslcmop = detailed_status
+ db_nslcmop_update["detailed-status"] = detailed_status
+ change_type = "vnf_terminated"
+ if not nslcmop_operation_state:
+ nslcmop_operation_state = "COMPLETED"
+ self.logger.debug(
+ logging_text
+ + " task Done with result {} {}".format(
+ nslcmop_operation_state, detailed_status
+ )
+ )
+
+ # If nslcmop_operation_state is None, so any operation is not failed.
+ # All operations are executed in overall.
+ if not nslcmop_operation_state:
+ nslcmop_operation_state = "COMPLETED"
+ db_nsr_update["operational-status"] = old_operational_status
+
+ except (DbException, LcmException, N2VCException, K8sException) as e:
+ self.logger.error(logging_text + "Exit Exception {}".format(e))
+ exc = e
+ except asyncio.CancelledError:
+ self.logger.error(
+ logging_text + "Cancelled Exception while '{}'".format(step)
+ )
+ exc = "Operation was cancelled"
+ except asyncio.TimeoutError:
+ self.logger.error(logging_text + "Timeout while '{}'".format(step))
+ exc = "Timeout"
+ except Exception as e:
+ exc = traceback.format_exc()
+ self.logger.critical(
+ logging_text + "Exit Exception {} {}".format(type(e).__name__, e),
+ exc_info=True,
+ )
+ finally:
+ if exc:
+ db_nslcmop_update[
+ "detailed-status"
+ ] = (
+ detailed_status
+ ) = error_description_nslcmop = "FAILED {}: {}".format(step, exc)
+ nslcmop_operation_state = "FAILED"
+ db_nsr_update["operational-status"] = old_operational_status
+ if db_nsr:
+ self._write_ns_status(
+ nsr_id=nsr_id,
+ ns_state=db_nsr["nsState"],
+ current_operation="IDLE",
+ current_operation_id=None,
+ other_update=db_nsr_update,
+ )
+
+ self._write_op_status(
+ op_id=nslcmop_id,
+ stage="",
+ error_message=error_description_nslcmop,
+ operation_state=nslcmop_operation_state,
+ other_update=db_nslcmop_update,
+ )
+
+ if nslcmop_operation_state:
+ try:
+ msg = {
+ "nsr_id": nsr_id,
+ "nslcmop_id": nslcmop_id,
+ "operationState": nslcmop_operation_state,
+ }
+ if change_type in ("vnf_terminated", "policy_updated"):
+ msg.update({"vnf_member_index": member_vnf_index})
+ await self.msg.aiowrite("ns", change_type, msg, loop=self.loop)
+ except Exception as e:
+ self.logger.error(
+ logging_text + "kafka_write notification Exception {}".format(e)
+ )
+ self.logger.debug(logging_text + "Exit")
+ self.lcm_tasks.remove("ns", nsr_id, nslcmop_id, "ns_update")
+ return nslcmop_operation_state, detailed_status
+
async def scale(self, nsr_id, nslcmop_id):
# Try to lock HA task here
task_is_locked_by_me = self.lcm_tasks.lock_HA("ns", "nslcmops", nslcmop_id)
] = "Deleting the execution environments"
scale_process = "VCA"
for vca_info in vca_scaling_info:
- if vca_info["type"] == "delete":
+ if vca_info["type"] == "delete" and not vca_info.get("osm_kdu_id"):
member_vnf_index = str(vca_info["member-vnf-index"])
self.logger.debug(
logging_text + "vdu info: {}".format(vca_info)
] = "Scaling member_vnf_index={}, vdu_id={}, vdu_index={} ".format(
member_vnf_index, vdu_id, vdu_index
)
- else:
- vdu_index = 0
- kdu_id = vca_info["osm_kdu_id"]
- stage[
- 1
- ] = "Scaling member_vnf_index={}, kdu_id={}, vdu_index={} ".format(
- member_vnf_index, kdu_id, vdu_index
- )
stage[2] = step = "Scaling in VCA"
self._write_op_status(op_id=nslcmop_id, stage=stage)
vca_update = db_nsr["_admin"]["deployed"]["VCA"]
] = "Creating new execution environments"
scale_process = "VCA"
for vca_info in vca_scaling_info:
- if vca_info["type"] == "create":
+ if vca_info["type"] == "create" and not vca_info.get("osm_kdu_id"):
member_vnf_index = str(vca_info["member-vnf-index"])
self.logger.debug(
logging_text + "vdu info: {}".format(vca_info)
task_instantiation_info=tasks_dict_info,
stage=stage,
)
- else:
- kdu_name = vca_info["osm_kdu_id"]
- descriptor_config = get_configuration(db_vnfd, kdu_name)
- if descriptor_config:
- vdu_id = None
- kdu_index = int(vca_info["kdu_index"])
- vdu_name = None
- kdur = next(
- x
- for x in db_vnfr["kdur"]
- if x["kdu-name"] == kdu_name
- )
- deploy_params_kdu = {"OSM": get_osm_params(db_vnfr)}
- if kdur.get("additionalParams"):
- deploy_params_kdu = parse_yaml_strings(
- kdur["additionalParams"]
- )
-
- self._deploy_n2vc(
- logging_text=logging_text,
- db_nsr=db_nsr,
- db_vnfr=db_vnfr,
- nslcmop_id=nslcmop_id,
- nsr_id=nsr_id,
- nsi_id=nsi_id,
- vnfd_id=vnfd_id,
- vdu_id=vdu_id,
- kdu_name=kdu_name,
- member_vnf_index=member_vnf_index,
- vdu_index=kdu_index,
- vdu_name=vdu_name,
- deploy_params=deploy_params_kdu,
- descriptor_config=descriptor_config,
- base_folder=base_folder,
- task_instantiation_info=tasks_dict_info,
- stage=stage,
- )
# SCALE-UP VCA - END
scale_process = None
)
async def extract_prometheus_scrape_jobs(
- self,
- ee_id,
- artifact_path,
- ee_config_descriptor,
- vnfr_id,
- nsr_id,
- target_ip
+ self, ee_id, artifact_path, ee_config_descriptor, vnfr_id, nsr_id, target_ip
):
# look if exist a file called 'prometheus*.j2' and
artifact_content = self.fs.dir_ls(artifact_path)
"""
config = VimAccountDB.get_vim_account_with_id(vim_account_id).get("config", {})
return config.get("vca_k8s_cloud"), config.get("vca_k8s_cloud_credential")
+
+ async def migrate(self, nsr_id, nslcmop_id):
+ """
+ Migrate VNFs and VDUs instances in a NS
+
+ :param: nsr_id: NS Instance ID
+ :param: nslcmop_id: nslcmop ID of migrate
+
+ """
+ # Try to lock HA task here
+ task_is_locked_by_me = self.lcm_tasks.lock_HA("ns", "nslcmops", nslcmop_id)
+ if not task_is_locked_by_me:
+ return
+ logging_text = "Task ns={} migrate ".format(nsr_id)
+ self.logger.debug(logging_text + "Enter")
+ # get all needed from database
+ db_nslcmop = None
+ db_nslcmop_update = {}
+ nslcmop_operation_state = None
+ db_nsr_update = {}
+ target = {}
+ exc = None
+ # in case of error, indicates what part of scale was failed to put nsr at error status
+ start_deploy = time()
+
+ try:
+ # wait for any previous tasks in process
+ step = "Waiting for previous operations to terminate"
+ await self.lcm_tasks.waitfor_related_HA('ns', 'nslcmops', nslcmop_id)
+
+ self._write_ns_status(
+ nsr_id=nsr_id,
+ ns_state=None,
+ current_operation="MIGRATING",
+ current_operation_id=nslcmop_id
+ )
+ step = "Getting nslcmop from database"
+ self.logger.debug(step + " after having waited for previous tasks to be completed")
+ db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})
+ migrate_params = db_nslcmop.get("operationParams")
+
+ target = {}
+ target.update(migrate_params)
+ desc = await self.RO.migrate(nsr_id, target)
+ self.logger.debug("RO return > {}".format(desc))
+ action_id = desc["action_id"]
+ await self._wait_ng_ro(
+ nsr_id, action_id, nslcmop_id, start_deploy, self.timeout_migrate
+ )
+ except (ROclient.ROClientException, DbException, LcmException) as e:
+ self.logger.error("Exit Exception {}".format(e))
+ exc = e
+ except asyncio.CancelledError:
+ self.logger.error("Cancelled Exception while '{}'".format(step))
+ exc = "Operation was cancelled"
+ except Exception as e:
+ exc = traceback.format_exc()
+ self.logger.critical("Exit Exception {} {}".format(type(e).__name__, e), exc_info=True)
+ finally:
+ self._write_ns_status(
+ nsr_id=nsr_id,
+ ns_state=None,
+ current_operation="IDLE",
+ current_operation_id=None,
+ )
+ if exc:
+ db_nslcmop_update[
+ "detailed-status"
+ ] = "FAILED {}: {}".format(step, exc)
+ nslcmop_operation_state = "FAILED"
+ else:
+ nslcmop_operation_state = "COMPLETED"
+ db_nslcmop_update["detailed-status"] = "Done"
+ db_nsr_update["detailed-status"] = "Done"
+
+ self._write_op_status(
+ op_id=nslcmop_id,
+ stage="",
+ error_message="",
+ operation_state=nslcmop_operation_state,
+ other_update=db_nslcmop_update,
+ )
+ if nslcmop_operation_state:
+ try:
+ msg = {
+ "nsr_id": nsr_id,
+ "nslcmop_id": nslcmop_id,
+ "operationState": nslcmop_operation_state,
+ }
+ await self.msg.aiowrite("ns", "migrated", msg, loop=self.loop)
+ except Exception as e:
+ self.logger.error(
+ logging_text + "kafka_write notification Exception {}".format(e)
+ )
+ self.logger.debug(logging_text + "Exit")
+ self.lcm_tasks.remove("ns", nsr_id, nslcmop_id, "ns_migrate")