##
import asyncio
+import shutil
+from typing import Any, Dict, List
import yaml
import logging
import logging.handlers
TemplateNotFound,
StrictUndefined,
UndefinedError,
+ select_autoescape,
)
from osm_lcm import ROclient
-from osm_lcm.data_utils.nsr import get_deployed_kdu
+from osm_lcm.data_utils.lcm_config import LcmCfg
+from osm_lcm.data_utils.nsr import (
+ get_deployed_kdu,
+ get_deployed_vca,
+ get_deployed_vca_list,
+ get_nsd,
+)
+from osm_lcm.data_utils.vca import (
+ DeployedComponent,
+ DeployedK8sResource,
+ DeployedVCA,
+ EELevel,
+ Relation,
+ EERelation,
+ safe_get_ee_relation,
+)
from osm_lcm.ng_ro import NgRoClient, NgRoException
from osm_lcm.lcm_utils import (
LcmException,
deep_get,
get_iterable,
populate_dict,
+ check_juju_bundle_existence,
+ get_charm_artifact_path,
+ get_ee_id_parts,
+ vld_to_ro_ip_profile,
+)
+from osm_lcm.data_utils.nsd import (
+ get_ns_configuration_relation_list,
+ get_vnf_profile,
+ get_vnf_profiles,
)
-from osm_lcm.data_utils.nsd import get_vnf_profiles
from osm_lcm.data_utils.vnfd import (
+ get_kdu,
+ get_kdu_services,
+ get_relation_list,
get_vdu_list,
get_vdu_profile,
get_ee_sorted_initial_config_primitive_list,
get_scaling_aspect,
get_number_of_instances,
get_juju_ee_ref,
- get_kdu_profile,
+ get_kdu_resource_profile,
+ find_software_version,
+ check_helm_ee_in_ns,
)
from osm_lcm.data_utils.list_utils import find_in_list
-from osm_lcm.data_utils.vnfr import get_osm_params, get_vdur_index, get_kdur
+from osm_lcm.data_utils.vnfr import (
+ get_osm_params,
+ get_vdur_index,
+ get_kdur,
+ get_volumes_from_instantiation_params,
+)
from osm_lcm.data_utils.dict_utils import parse_yaml_strings
from osm_lcm.data_utils.database.vim_account import VimAccountDB
+from n2vc.definitions import RelationEndpoint
from n2vc.k8s_helm_conn import K8sHelmConnector
from n2vc.k8s_helm3_conn import K8sHelm3Connector
from n2vc.k8s_juju_conn import K8sJujuConnector
from osm_lcm.data_utils.database.database import Database
from osm_lcm.data_utils.filesystem.filesystem import Filesystem
+from osm_lcm.data_utils.wim import (
+ get_sdn_ports,
+ get_target_wim_attrs,
+ select_feasible_wim_account,
+)
from n2vc.n2vc_juju_conn import N2VCJujuConnector
from n2vc.exceptions import N2VCException, N2VCNotFound, K8sException
from osm_lcm.lcm_helm_conn import LCMHelmConn
+from osm_lcm.osm_config import OsmConfigBuilder
+from osm_lcm.prometheus import parse_job
from copy import copy, deepcopy
from time import time
class NsLcm(LcmBase):
- timeout_vca_on_error = (
- 5 * 60
- ) # Time for charm from first time at blocked,error status to mark as failed
- timeout_ns_deploy = 2 * 3600 # default global timeout for deployment a ns
- timeout_ns_terminate = 1800 # default global timeout for un deployment a ns
- timeout_charm_delete = 10 * 60
- timeout_primitive = 30 * 60 # timeout for primitive execution
- timeout_progress_primitive = (
- 10 * 60
- ) # timeout for some progress in a primitive execution
-
SUBOPERATION_STATUS_NOT_FOUND = -1
SUBOPERATION_STATUS_NEW = -2
SUBOPERATION_STATUS_SKIP = -3
task_name_deploy_vca = "Deploying VCA"
- def __init__(self, msg, lcm_tasks, config, loop, prometheus=None):
+ def __init__(self, msg, lcm_tasks, config: LcmCfg, loop):
"""
Init, Connect to database, filesystem storage, and messaging
:param config: two level dictionary with configuration. Top level should contain 'database', 'storage',
self.fs = Filesystem().instance.fs
self.loop = loop
self.lcm_tasks = lcm_tasks
- self.timeout = config["timeout"]
- self.ro_config = config["ro_config"]
- self.ng_ro = config["ro_config"].get("ng")
- self.vca_config = config["VCA"].copy()
+ self.timeout = config.timeout
+ self.ro_config = config.RO
+ self.vca_config = config.VCA
# create N2VC connector
self.n2vc = N2VCJujuConnector(
)
self.k8sclusterhelm2 = K8sHelmConnector(
- kubectl_command=self.vca_config.get("kubectlpath"),
- helm_command=self.vca_config.get("helmpath"),
+ kubectl_command=self.vca_config.kubectlpath,
+ helm_command=self.vca_config.helmpath,
log=self.logger,
on_update_db=None,
fs=self.fs,
)
self.k8sclusterhelm3 = K8sHelm3Connector(
- kubectl_command=self.vca_config.get("kubectlpath"),
- helm_command=self.vca_config.get("helm3path"),
+ kubectl_command=self.vca_config.kubectlpath,
+ helm_command=self.vca_config.helm3path,
fs=self.fs,
log=self.logger,
db=self.db,
)
self.k8sclusterjuju = K8sJujuConnector(
- kubectl_command=self.vca_config.get("kubectlpath"),
- juju_command=self.vca_config.get("jujupath"),
+ kubectl_command=self.vca_config.kubectlpath,
+ juju_command=self.vca_config.jujupath,
log=self.logger,
loop=self.loop,
on_update_db=self._on_update_k8s_db,
"helm-v3": self.conn_helm_ee,
}
- self.prometheus = prometheus
-
# create RO client
- self.RO = NgRoClient(self.loop, **self.ro_config)
+ self.RO = NgRoClient(self.loop, **self.ro_config.to_dict())
+
+ self.op_status_map = {
+ "instantiation": self.RO.status,
+ "termination": self.RO.status,
+ "migrate": self.RO.status,
+ "healing": self.RO.recreate_status,
+ "verticalscale": self.RO.status,
+ "start_stop_rebuild": self.RO.status,
+ }
@staticmethod
def increment_ip_mac(ip_mac, vm_index=1):
return None
def _on_update_ro_db(self, nsrs_id, ro_descriptor):
-
# self.logger.debug('_on_update_ro_db(nsrs_id={}'.format(nsrs_id))
try:
)
async def _on_update_n2vc_db(self, table, filter, path, updated_data, vca_id=None):
-
# remove last dot from path (if exists)
if path.endswith("."):
path = path[:-1]
# self.logger.debug('_on_update_n2vc_db(table={}, filter={}, path={}, updated_data={}'
# .format(table, filter, path, updated_data))
try:
-
nsr_id = filter.get("_id")
# read ns record from database
# vcaStatus
db_dict = dict()
db_dict["vcaStatus"] = status_dict
- await self.n2vc.update_vca_status(db_dict["vcaStatus"], vca_id=vca_id)
# update configurationStatus for this VCA
try:
self.logger.warn("Error updating NS state for ns={}: {}".format(nsr_id, e))
async def _on_update_k8s_db(
- self, cluster_uuid, kdu_instance, filter=None, vca_id=None
+ self, cluster_uuid, kdu_instance, filter=None, vca_id=None, cluster_type="juju"
):
"""
Updating vca status in NSR record
:param cluster_uuid: UUID of a k8s cluster
:param kdu_instance: The unique name of the KDU instance
:param filter: To get nsr_id
+ :param cluster_type: The cluster type (juju, k8s)
:return: none
"""
# self.logger.debug("_on_update_k8s_db(cluster_uuid={}, kdu_instance={}, filter={}"
# .format(cluster_uuid, kdu_instance, filter))
+ nsr_id = filter.get("_id")
try:
- nsr_id = filter.get("_id")
-
- # get vca status for NS
- vca_status = await self.k8sclusterjuju.status_kdu(
- cluster_uuid,
- kdu_instance,
- complete_status=True,
+ vca_status = await self.k8scluster_map[cluster_type].status_kdu(
+ cluster_uuid=cluster_uuid,
+ kdu_instance=kdu_instance,
yaml_format=False,
+ complete_status=True,
vca_id=vca_id,
)
+
# vcaStatus
db_dict = dict()
db_dict["vcaStatus"] = {nsr_id: vca_status}
- await self.k8sclusterjuju.update_vca_status(
- db_dict["vcaStatus"],
- kdu_instance,
- vca_id=vca_id,
+ self.logger.debug(
+ f"Obtained VCA status for cluster type '{cluster_type}': {vca_status}"
)
# write to database
self.update_db_2("nsrs", nsr_id, db_dict)
-
except (asyncio.CancelledError, asyncio.TimeoutError):
raise
except Exception as e:
@staticmethod
def _parse_cloud_init(cloud_init_text, additional_params, vnfd_id, vdu_id):
try:
- env = Environment(undefined=StrictUndefined)
+ env = Environment(
+ undefined=StrictUndefined,
+ autoescape=select_autoescape(default_for_string=True, default=True),
+ )
template = env.from_string(cloud_init_text)
return template.render(additional_params or {})
except UndefinedError as e:
try:
if vdu.get("cloud-init-file"):
base_folder = vnfd["_admin"]["storage"]
- cloud_init_file = "{}/{}/cloud_init/{}".format(
- base_folder["folder"],
- base_folder["pkg-dir"],
- vdu["cloud-init-file"],
- )
+ if base_folder["pkg-dir"]:
+ cloud_init_file = "{}/{}/cloud_init/{}".format(
+ base_folder["folder"],
+ base_folder["pkg-dir"],
+ vdu["cloud-init-file"],
+ )
+ else:
+ cloud_init_file = "{}/Scripts/cloud_init/{}".format(
+ base_folder["folder"],
+ vdu["cloud-init-file"],
+ )
with self.fs.file_open(cloud_init_file, "r") as ci_file:
cloud_init_content = ci_file.read()
elif vdu.get("cloud-init"):
def _get_vdu_additional_params(self, db_vnfr, vdu_id):
vdur = next(
- vdur for vdur in db_vnfr.get("vdur") if vdu_id == vdur["vdu-id-ref"]
+ (vdur for vdur in db_vnfr.get("vdur") if vdu_id == vdur["vdu-id-ref"]), {}
)
additional_params = vdur.get("additionalParams")
return parse_yaml_strings(additional_params)
return wim_account
def scale_vnfr(self, db_vnfr, vdu_create=None, vdu_delete=None, mark_delete=False):
-
db_vdu_push_list = []
+ template_vdur = []
db_update = {"_admin.modified": time()}
if vdu_create:
for vdu_id, vdu_count in vdu_create.items():
None,
)
if not vdur:
- raise LcmException(
- "Error scaling OUT VNFR for {}. There is not any existing vnfr. Scaled to 0?".format(
- vdu_id
+ # Read the template saved in the db:
+ self.logger.debug(
+ "No vdur in the database. Using the vdur-template to scale"
+ )
+ vdur_template = db_vnfr.get("vdur-template")
+ if not vdur_template:
+ raise LcmException(
+ "Error scaling OUT VNFR for {}. No vnfr or template exists".format(
+ vdu_id
+ )
)
+ vdur = vdur_template[0]
+ # Delete a template from the database after using it
+ self.db.set_one(
+ "vnfrs",
+ {"_id": db_vnfr["_id"]},
+ None,
+ pull={"vdur-template": {"_id": vdur["_id"]}},
)
-
for count in range(vdu_count):
vdur_copy = deepcopy(vdur)
vdur_copy["status"] = "BUILD"
vdur_copy["status-detailed"] = None
- vdur_copy["ip-address"]: None
+ vdur_copy["ip-address"] = None
vdur_copy["_id"] = str(uuid4())
vdur_copy["count-index"] += count + 1
vdur_copy["id"] = "{}-{}".format(
)
else:
iface.pop("mac-address", None)
- iface.pop(
- "mgmt_vnf", None
- ) # only first vdu can be managment of vnf
+ if db_vnfr["vdur"]:
+ iface.pop(
+ "mgmt_vnf", None
+ ) # only first vdu can be management of vnf
db_vdu_push_list.append(vdur_copy)
# self.logger.debug("scale out, adding vdu={}".format(vdur_copy))
if vdu_delete:
+ if len(db_vnfr["vdur"]) == 1:
+ # The scale will move to 0 instances
+ self.logger.debug(
+ "Scaling to 0 !, creating the template with the last vdur"
+ )
+ template_vdur = [db_vnfr["vdur"][0]]
for vdu_id, vdu_count in vdu_delete.items():
if mark_delete:
indexes_to_delete = [
None,
pull={"vdur": {"_id": vdu["_id"]}},
)
- db_push = {"vdur": db_vdu_push_list} if db_vdu_push_list else None
+ db_push = {}
+ if db_vdu_push_list:
+ db_push["vdur"] = db_vdu_push_list
+ if template_vdur:
+ db_push["vdur-template"] = template_vdur
+ if not db_push:
+ db_push = None
+ db_vnfr["vdur-template"] = template_vdur
self.db.set_one("vnfrs", {"_id": db_vnfr["_id"]}, db_update, push_list=db_push)
# modify passed dictionary db_vnfr
db_vnfr_ = self.db.get_one("vnfrs", {"_id": db_vnfr["_id"]})
start_deploy,
timeout_ns_deploy,
):
-
db_vims = {}
def get_vim_account(vim_account_id):
target_vim, target_vld, vld_params, target_sdn
):
if vld_params.get("ip-profile"):
- target_vld["vim_info"][target_vim]["ip_profile"] = vld_params[
- "ip-profile"
- ]
+ target_vld["vim_info"][target_vim]["ip_profile"] = vld_to_ro_ip_profile(
+ vld_params["ip-profile"]
+ )
if vld_params.get("provider-network"):
target_vld["vim_info"][target_vim]["provider_network"] = vld_params[
"provider-network"
target_vld["vim_info"][target_sdn]["sdn-ports"] = vld_params[
"provider-network"
]["sdn-ports"]
- if vld_params.get("wimAccountId"):
- target_wim = "wim:{}".format(vld_params["wimAccountId"])
- target_vld["vim_info"][target_wim] = {}
+
+ # check if WIM is needed; if needed, choose a feasible WIM able to connect VIMs
+ # if wim_account_id is specified in vld_params, validate if it is feasible.
+ wim_account_id, db_wim = select_feasible_wim_account(
+ db_nsr, db_vnfrs, target_vld, vld_params, self.logger
+ )
+
+ if wim_account_id:
+ # WIM is needed and a feasible one was found, populate WIM target and SDN ports
+ self.logger.info("WIM selected: {:s}".format(str(wim_account_id)))
+ # update vld_params with correct WIM account Id
+ vld_params["wimAccountId"] = wim_account_id
+
+ target_wim = "wim:{}".format(wim_account_id)
+ target_wim_attrs = get_target_wim_attrs(nsr_id, target_vld, vld_params)
+ sdn_ports = get_sdn_ports(vld_params, db_wim)
+ if len(sdn_ports) > 0:
+ target_vld["vim_info"][target_wim] = target_wim_attrs
+ target_vld["vim_info"][target_wim]["sdn-ports"] = sdn_ports
+
+ self.logger.debug(
+ "Target VLD with WIM data: {:s}".format(str(target_vld))
+ )
+
for param in ("vim-network-name", "vim-network-id"):
if vld_params.get(param):
if isinstance(vld_params[param], dict):
if vld_params.get("common_id"):
target_vld["common_id"] = vld_params.get("common_id")
+ # modify target["ns"]["vld"] with instantiation parameters to override vnf vim-account
+ def update_ns_vld_target(target, ns_params):
+ for vnf_params in ns_params.get("vnf", ()):
+ if vnf_params.get("vimAccountId"):
+ target_vnf = next(
+ (
+ vnfr
+ for vnfr in db_vnfrs.values()
+ if vnf_params["member-vnf-index"]
+ == vnfr["member-vnf-index-ref"]
+ ),
+ None,
+ )
+ vdur = next((vdur for vdur in target_vnf.get("vdur", ())), None)
+ if not vdur:
+ return
+ for a_index, a_vld in enumerate(target["ns"]["vld"]):
+ target_vld = find_in_list(
+ get_iterable(vdur, "interfaces"),
+ lambda iface: iface.get("ns-vld-id") == a_vld["name"],
+ )
+
+ vld_params = find_in_list(
+ get_iterable(ns_params, "vld"),
+ lambda v_vld: v_vld["name"] in (a_vld["name"], a_vld["id"]),
+ )
+ if target_vld:
+ if vnf_params.get("vimAccountId") not in a_vld.get(
+ "vim_info", {}
+ ):
+ target_vim_network_list = [
+ v for _, v in a_vld.get("vim_info").items()
+ ]
+ target_vim_network_name = next(
+ (
+ item.get("vim_network_name", "")
+ for item in target_vim_network_list
+ ),
+ "",
+ )
+
+ target["ns"]["vld"][a_index].get("vim_info").update(
+ {
+ "vim:{}".format(vnf_params["vimAccountId"]): {
+ "vim_network_name": target_vim_network_name,
+ }
+ }
+ )
+
+ if vld_params:
+ for param in ("vim-network-name", "vim-network-id"):
+ if vld_params.get(param) and isinstance(
+ vld_params[param], dict
+ ):
+ for vim, vim_net in vld_params[
+ param
+ ].items():
+ other_target_vim = "vim:" + vim
+ populate_dict(
+ target["ns"]["vld"][a_index].get(
+ "vim_info"
+ ),
+ (
+ other_target_vim,
+ param.replace("-", "_"),
+ ),
+ vim_net,
+ )
+
nslcmop_id = db_nslcmop["_id"]
target = {
"name": db_nsr["name"],
image["vim_info"] = {}
for flavor in target["flavor"]:
flavor["vim_info"] = {}
+ if db_nsr.get("affinity-or-anti-affinity-group"):
+ target["affinity-or-anti-affinity-group"] = deepcopy(
+ db_nsr["affinity-or-anti-affinity-group"]
+ )
+ for affinity_or_anti_affinity_group in target[
+ "affinity-or-anti-affinity-group"
+ ]:
+ affinity_or_anti_affinity_group["vim_info"] = {}
if db_nslcmop.get("lcmOperationType") != "instantiate":
# get parameters of instantiation:
# check if this network needs SDN assist
if vld.get("pci-interfaces"):
db_vim = get_vim_account(ns_params["vimAccountId"])
- sdnc_id = db_vim["config"].get("sdn-controller")
- if sdnc_id:
- sdn_vld = "nsrs:{}:vld.{}".format(nsr_id, vld["id"])
- target_sdn = "sdn:{}".format(sdnc_id)
- target_vld["vim_info"][target_sdn] = {
- "sdn": True,
- "target_vim": target_vim,
- "vlds": [sdn_vld],
- "type": vld.get("type"),
- }
+ if vim_config := db_vim.get("config"):
+ if sdnc_id := vim_config.get("sdn-controller"):
+ sdn_vld = "nsrs:{}:vld.{}".format(nsr_id, vld["id"])
+ target_sdn = "sdn:{}".format(sdnc_id)
+ target_vld["vim_info"][target_sdn] = {
+ "sdn": True,
+ "target_vim": target_vim,
+ "vlds": [sdn_vld],
+ "type": vld.get("type"),
+ }
nsd_vnf_profiles = get_vnf_profiles(nsd)
for nsd_vnf_profile in nsd_vnf_profiles:
and nsd_vlp.get("virtual-link-protocol-data")
and nsd_vlp["virtual-link-protocol-data"].get("l3-protocol-data")
):
- ip_profile_source_data = nsd_vlp["virtual-link-protocol-data"][
+ vld_params["ip-profile"] = nsd_vlp["virtual-link-protocol-data"][
"l3-protocol-data"
]
- ip_profile_dest_data = {}
- if "ip-version" in ip_profile_source_data:
- ip_profile_dest_data["ip-version"] = ip_profile_source_data[
- "ip-version"
- ]
- if "cidr" in ip_profile_source_data:
- ip_profile_dest_data["subnet-address"] = ip_profile_source_data[
- "cidr"
- ]
- if "gateway-ip" in ip_profile_source_data:
- ip_profile_dest_data["gateway-address"] = ip_profile_source_data[
- "gateway-ip"
- ]
- if "dhcp-enabled" in ip_profile_source_data:
- ip_profile_dest_data["dhcp-params"] = {
- "enabled": ip_profile_source_data["dhcp-enabled"]
- }
- vld_params["ip-profile"] = ip_profile_dest_data
# update vld_params with instantiation params
vld_instantiation_params = find_in_list(
vld_params.update(vld_instantiation_params)
parse_vld_instantiation_params(target_vim, target_vld, vld_params, None)
target["ns"]["vld"].append(target_vld)
+ # Update the target ns_vld if vnf vim_account is overridden by instantiation params
+ update_ns_vld_target(target, ns_params)
for vnfr in db_vnfrs.values():
vnfd = find_in_list(
and vnfd_vlp.get("virtual-link-protocol-data")
and vnfd_vlp["virtual-link-protocol-data"].get("l3-protocol-data")
):
- ip_profile_source_data = vnfd_vlp["virtual-link-protocol-data"][
+ vld_params["ip-profile"] = vnfd_vlp["virtual-link-protocol-data"][
"l3-protocol-data"
]
- ip_profile_dest_data = {}
- if "ip-version" in ip_profile_source_data:
- ip_profile_dest_data["ip-version"] = ip_profile_source_data[
- "ip-version"
- ]
- if "cidr" in ip_profile_source_data:
- ip_profile_dest_data["subnet-address"] = ip_profile_source_data[
- "cidr"
- ]
- if "gateway-ip" in ip_profile_source_data:
- ip_profile_dest_data[
- "gateway-address"
- ] = ip_profile_source_data["gateway-ip"]
- if "dhcp-enabled" in ip_profile_source_data:
- ip_profile_dest_data["dhcp-params"] = {
- "enabled": ip_profile_source_data["dhcp-enabled"]
- }
-
- vld_params["ip-profile"] = ip_profile_dest_data
# update vld_params with instantiation params
if vnf_params:
vld_instantiation_params = find_in_list(
# read file and put content at target.cloul_init_content. Avoid ng_ro to use shared package system
if vdur["cloud-init"] not in target["cloud_init_content"]:
base_folder = vnfd["_admin"]["storage"]
- cloud_init_file = "{}/{}/cloud_init/{}".format(
- base_folder["folder"],
- base_folder["pkg-dir"],
- vdud.get("cloud-init-file"),
- )
+ if base_folder["pkg-dir"]:
+ cloud_init_file = "{}/{}/cloud_init/{}".format(
+ base_folder["folder"],
+ base_folder["pkg-dir"],
+ vdud.get("cloud-init-file"),
+ )
+ else:
+ cloud_init_file = "{}/Scripts/cloud_init/{}".format(
+ base_folder["folder"],
+ vdud.get("cloud-init-file"),
+ )
with self.fs.file_open(cloud_init_file, "r") as ci_file:
target["cloud_init_content"][
vdur["cloud-init"]
if target_vim not in ns_image["vim_info"]:
ns_image["vim_info"][target_vim] = {}
+ # Affinity groups
+ if vdur.get("affinity-or-anti-affinity-group-id"):
+ for ags_id in vdur["affinity-or-anti-affinity-group-id"]:
+ ns_ags = target["affinity-or-anti-affinity-group"][int(ags_id)]
+ if target_vim not in ns_ags["vim_info"]:
+ ns_ags["vim_info"][target_vim] = {}
+
vdur["vim_info"] = {target_vim: {}}
# instantiation parameters
- # if vnf_params:
- # vdu_instantiation_params = next((v for v in get_iterable(vnf_params, "vdu") if v["id"] ==
- # vdud["id"]), None)
+ if vnf_params:
+ vdu_instantiation_params = find_in_list(
+ get_iterable(vnf_params, "vdu"),
+ lambda i_vdu: i_vdu["id"] == vdud["id"],
+ )
+ if vdu_instantiation_params:
+ # Parse the vdu_volumes from the instantiation params
+ vdu_volumes = get_volumes_from_instantiation_params(
+ vdu_instantiation_params, vdud
+ )
+ vdur["additionalParams"]["OSM"]["vdu_volumes"] = vdu_volumes
vdur_list.append(vdur)
target_vnf["vdur"] = vdur_list
target["vnf"].append(target_vnf)
+ self.logger.debug("Send to RO > nsr_id={} target={}".format(nsr_id, target))
desc = await self.RO.deploy(nsr_id, target)
self.logger.debug("RO return > {}".format(desc))
action_id = desc["action_id"]
await self._wait_ng_ro(
- nsr_id, action_id, nslcmop_id, start_deploy, timeout_ns_deploy, stage
+ nsr_id,
+ action_id,
+ nslcmop_id,
+ start_deploy,
+ timeout_ns_deploy,
+ stage,
+ operation="instantiation",
)
# Updating NSR
start_time=None,
timeout=600,
stage=None,
+ operation=None,
):
detailed_status_old = None
db_nsr_update = {}
start_time = start_time or time()
while time() <= start_time + timeout:
- desc_status = await self.RO.status(nsr_id, action_id)
+ desc_status = await self.op_status_map[operation](nsr_id, action_id)
self.logger.debug("Wait NG RO > {}".format(desc_status))
if desc_status["status"] == "FAILED":
raise NgRoException(desc_status["details"])
}
desc = await self.RO.deploy(nsr_id, target)
action_id = desc["action_id"]
- db_nsr_update["_admin.deployed.RO.nsr_delete_action_id"] = action_id
db_nsr_update["_admin.deployed.RO.nsr_status"] = "DELETING"
self.logger.debug(
logging_text
# wait until done
delete_timeout = 20 * 60 # 20 minutes
await self._wait_ng_ro(
- nsr_id, action_id, nslcmop_id, start_deploy, delete_timeout, stage
+ nsr_id,
+ action_id,
+ nslcmop_id,
+ start_deploy,
+ delete_timeout,
+ stage,
+ operation="termination",
)
-
- db_nsr_update["_admin.deployed.RO.nsr_delete_action_id"] = None
db_nsr_update["_admin.deployed.RO.nsr_status"] = "DELETED"
# delete all nsr
await self.RO.delete(nsr_id)
- except Exception as e:
- if isinstance(e, NgRoException) and e.http_code == 404: # not found
+ except NgRoException as e:
+ if e.http_code == 404: # not found
db_nsr_update["_admin.deployed.RO.nsr_id"] = None
db_nsr_update["_admin.deployed.RO.nsr_status"] = "DELETED"
- db_nsr_update["_admin.deployed.RO.nsr_delete_action_id"] = None
self.logger.debug(
logging_text + "RO_action_id={} already deleted".format(action_id)
)
- elif isinstance(e, NgRoException) and e.http_code == 409: # conflict
+ elif e.http_code == 409: # conflict
failed_detail.append("delete conflict: {}".format(e))
self.logger.debug(
logging_text
logging_text
+ "RO_action_id={} delete error: {}".format(action_id, e)
)
+ except Exception as e:
+ failed_detail.append("delete error: {}".format(e))
+ self.logger.error(
+ logging_text + "RO_action_id={} delete error: {}".format(action_id, e)
+ )
if failed_detail:
stage[2] = "Error deleting from VIM"
if ns_params and ns_params.get("timeout_ns_deploy"):
timeout_ns_deploy = ns_params["timeout_ns_deploy"]
else:
- timeout_ns_deploy = self.timeout.get(
- "ns_deploy", self.timeout_ns_deploy
- )
+ timeout_ns_deploy = self.timeout.ns_deploy
# Check for and optionally request placement optimization. Database will be updated if placement activated
stage[2] = "Waiting for Placement."
:param nsr_id:
:param vnfr_id:
:param kdu_name:
- :return: IP address
+ :return: IP address, K8s services
"""
# self.logger.debug(logging_text + "Starting wait_kdu_up")
)
if kdur.get("status"):
if kdur["status"] in ("READY", "ENABLED"):
- return kdur.get("ip-address")
+ return kdur.get("ip-address"), kdur.get("services")
else:
raise LcmException(
"target KDU={} is in error state".format(kdu_name)
"""
self.logger.debug(logging_text + "Starting wait_vm_up_insert_key_ro")
- ro_nsr_id = None
ip_address = None
- nb_tries = 0
target_vdu_id = None
ro_retries = 0
while True:
-
ro_retries += 1
if ro_retries >= 360: # 1 hour
raise LcmException(
self.logger.error(logging_text + "Cannot inject ssh-ky to a PDU")
return ip_address
try:
- ro_vm_id = "{}-{}".format(
- db_vnfr["member-vnf-index-ref"], target_vdu_id
- ) # TODO add vdu_index
- if self.ng_ro:
- target = {
- "action": {
- "action": "inject_ssh_key",
- "key": pub_key,
- "user": user,
- },
- "vnf": [{"_id": vnfr_id, "vdur": [{"id": vdur["id"]}]}],
- }
- desc = await self.RO.deploy(nsr_id, target)
- action_id = desc["action_id"]
- await self._wait_ng_ro(nsr_id, action_id, timeout=600)
- break
- else:
- # wait until NS is deployed at RO
- if not ro_nsr_id:
- db_nsrs = self.db.get_one("nsrs", {"_id": nsr_id})
- ro_nsr_id = deep_get(
- db_nsrs, ("_admin", "deployed", "RO", "nsr_id")
- )
- if not ro_nsr_id:
- continue
- result_dict = await self.RO.create_action(
- item="ns",
- item_id_name=ro_nsr_id,
- descriptor={
- "add_public_key": pub_key,
- "vms": [ro_vm_id],
- "user": user,
- },
- )
- # result_dict contains the format {VM-id: {vim_result: 200, description: text}}
- if not result_dict or not isinstance(result_dict, dict):
- raise LcmException(
- "Unknown response from RO when injecting key"
- )
- for result in result_dict.values():
- if result.get("vim_result") == 200:
- break
- else:
- raise ROclient.ROClientException(
- "error injecting key: {}".format(
- result.get("description")
- )
- )
- break
+ target = {
+ "action": {
+ "action": "inject_ssh_key",
+ "key": pub_key,
+ "user": user,
+ },
+ "vnf": [{"_id": vnfr_id, "vdur": [{"id": vdur["id"]}]}],
+ }
+ desc = await self.RO.deploy(nsr_id, target)
+ action_id = desc["action_id"]
+ await self._wait_ng_ro(
+ nsr_id, action_id, timeout=600, operation="instantiation"
+ )
+ break
except NgRoException as e:
raise LcmException(
"Reaching max tries injecting key. Error: {}".format(e)
)
- except ROclient.ROClientException as e:
- if not nb_tries:
- self.logger.debug(
- logging_text
- + "error injecting key: {}. Retrying until {} seconds".format(
- e, 20 * 10
- )
- )
- nb_tries += 1
- if nb_tries >= 20:
- raise LcmException(
- "Reaching max tries injecting key. Error: {}".format(e)
- )
else:
break
raise LcmException("Configuration aborted because dependent charm/s timeout")
def get_vca_id(self, db_vnfr: dict, db_nsr: dict):
- return deep_get(db_vnfr, ("vca-id",)) or deep_get(
- db_nsr, ("instantiate_params", "vcaId")
- )
+ vca_id = None
+ if db_vnfr:
+ vca_id = deep_get(db_vnfr, ("vca-id",))
+ elif db_nsr:
+ vim_account_id = deep_get(db_nsr, ("instantiate_params", "vimAccountId"))
+ vca_id = VimAccountDB.get_vim_account_with_id(vim_account_id).get("vca")
+ return vca_id
async def instantiate_N2VC(
self,
vdu_id,
kdu_name,
vdu_index,
+ kdu_index,
config_descriptor,
deploy_params,
base_folder,
}
step = ""
try:
-
element_type = "NS"
element_under_configuration = nsr_id
namespace = "{nsi}.{ns}".format(nsi=nsi_id if nsi_id else "", ns=nsr_id)
+ if vca_type == "native_charm":
+ index_number = 0
+ else:
+ index_number = vdu_index or 0
+
if vnfr_id:
element_type = "VNF"
element_under_configuration = vnfr_id
- namespace += ".{}-{}".format(vnfr_id, vdu_index or 0)
+ namespace += ".{}-{}".format(vnfr_id, index_number)
if vdu_id:
- namespace += ".{}-{}".format(vdu_id, vdu_index or 0)
+ namespace += ".{}-{}".format(vdu_id, index_number)
element_type = "VDU"
- element_under_configuration = "{}-{}".format(vdu_id, vdu_index or 0)
+ element_under_configuration = "{}-{}".format(vdu_id, index_number)
osm_config["osm"]["vdu_id"] = vdu_id
elif kdu_name:
- namespace += ".{}.{}".format(kdu_name, vdu_index or 0)
+ namespace += ".{}".format(kdu_name)
element_type = "KDU"
element_under_configuration = kdu_name
osm_config["osm"]["kdu_name"] = kdu_name
# Get artifact path
- artifact_path = "{}/{}/{}/{}".format(
- base_folder["folder"],
- base_folder["pkg-dir"],
- "charms"
- if vca_type in ("native_charm", "lxc_proxy_charm", "k8s_proxy_charm")
- else "helm-charts",
- vca_name,
- )
+ if base_folder["pkg-dir"]:
+ artifact_path = "{}/{}/{}/{}".format(
+ base_folder["folder"],
+ base_folder["pkg-dir"],
+ "charms"
+ if vca_type
+ in ("native_charm", "lxc_proxy_charm", "k8s_proxy_charm")
+ else "helm-charts",
+ vca_name,
+ )
+ else:
+ artifact_path = "{}/Scripts/{}/{}/".format(
+ base_folder["folder"],
+ "charms"
+ if vca_type
+ in ("native_charm", "lxc_proxy_charm", "k8s_proxy_charm")
+ else "helm-charts",
+ vca_name,
+ )
self.logger.debug("Artifact path > {}".format(artifact_path))
vca_id = self.get_vca_id(db_vnfr, db_nsr)
# create or register execution environment in VCA
if vca_type in ("lxc_proxy_charm", "k8s_proxy_charm", "helm", "helm-v3"):
-
self._write_configuration_status(
nsr_id=nsr_id,
vca_index=vca_index,
db_dict=db_dict,
config=osm_config,
artifact_path=artifact_path,
+ chart_model=vca_name,
vca_type=vca_type,
)
else:
config=config,
num_units=num_units,
vca_id=vca_id,
+ vca_type=vca_type,
)
# write in db flag of configuration_sw already installed
)
# add relations for this VCA (wait for other peers related with this VCA)
- await self._add_vca_relations(
+ is_relation_added = await self._add_vca_relations(
logging_text=logging_text,
nsr_id=nsr_id,
- vca_index=vca_index,
- vca_id=vca_id,
vca_type=vca_type,
+ vca_index=vca_index,
)
+ if not is_relation_added:
+ raise LcmException("Relations could not be added to VCA.")
+
# if SSH access is required, then get execution environment SSH public
# if native charm we have waited already to VM be UP
if vca_type in ("k8s_proxy_charm", "lxc_proxy_charm", "helm", "helm-v3"):
step = "Waiting to VM being up and getting IP address"
self.logger.debug(logging_text + step)
+ # default rw_mgmt_ip to None, avoiding the non definition of the variable
+ rw_mgmt_ip = None
+
# n2vc_redesign STEP 5.1
# wait for RO (ip-address) Insert pub_key into VM
if vnfr_id:
if kdu_name:
- rw_mgmt_ip = await self.wait_kdu_up(
+ rw_mgmt_ip, services = await self.wait_kdu_up(
logging_text, nsr_id, vnfr_id, kdu_name
)
- else:
+ vnfd = self.db.get_one(
+ "vnfds_revisions",
+ {"_id": f'{db_vnfr["vnfd-id"]}:{db_vnfr["revision"]}'},
+ )
+ kdu = get_kdu(vnfd, kdu_name)
+ kdu_services = [
+ service["name"] for service in get_kdu_services(kdu)
+ ]
+ exposed_services = []
+ for service in services:
+ if any(s in service["name"] for s in kdu_services):
+ exposed_services.append(service)
+ await self.vca_map[vca_type].exec_primitive(
+ ee_id=ee_id,
+ primitive_name="config",
+ params_dict={
+ "osm-config": json.dumps(
+ OsmConfigBuilder(
+ k8s={"services": exposed_services}
+ ).build()
+ )
+ },
+ vca_id=vca_id,
+ )
+
+ # This check prevents attempting to inject a public key into a VM when
+ # the VNF is actually a KNF. In the edge case where the user creates a
+ # VCA for the KNF itself rather than for its KDUs, the previous
+ # condition evaluates to False and execution reaches this block, so we
+ # must verify that the VNF really has VDUs (i.e. it is a VNF, not a
+ # KNF) before injecting the key.
+ elif db_vnfr.get("vdur"):
rw_mgmt_ip = await self.wait_vm_up_insert_key_ro(
logging_text,
nsr_id,
user=user,
pub_key=pub_key,
)
- else:
- rw_mgmt_ip = None # This is for a NS configuration
self.logger.debug(logging_text + " VM_ip_address={}".format(rw_mgmt_ip))
params_dict=primitive_params_,
db_dict=db_dict,
vca_id=vca_id,
+ vca_type=vca_type,
)
# Once some primitive has been exec, check and write at db if it needs to exec terminated primitives
if check_if_terminated_needed:
# STEP 7 Configure metrics
if vca_type == "helm" or vca_type == "helm-v3":
- prometheus_jobs = await self.add_prometheus_metrics(
+ # TODO: review for those cases where the helm chart is a reference and
+ # is not part of the NF package
+ prometheus_jobs = await self.extract_prometheus_scrape_jobs(
ee_id=ee_id,
artifact_path=artifact_path,
ee_config_descriptor=ee_config_descriptor,
vnfr_id=vnfr_id,
nsr_id=nsr_id,
target_ip=rw_mgmt_ip,
+ element_type=element_type,
+ vnf_member_index=db_vnfr.get("member-vnf-index-ref", ""),
+ vdu_id=vdu_id,
+ vdu_index=vdu_index,
+ kdu_name=kdu_name,
+ kdu_index=kdu_index,
)
if prometheus_jobs:
self.update_db_2(
{db_update_entry + "prometheus_jobs": prometheus_jobs},
)
+ for job in prometheus_jobs:
+ self.db.set_one(
+ "prometheus_jobs",
+ {"job_name": job["job_name"]},
+ job,
+ upsert=True,
+ fail_on_empty=False,
+ )
+
step = "instantiated at VCA"
self.logger.debug(logging_text + step)
self._write_configuration_status(
nsr_id=nsr_id, vca_index=vca_index, status="BROKEN"
)
- raise LcmException("{} {}".format(step, e)) from e
+ raise LcmException("{}. {}".format(step, e)) from e
def _write_ns_status(
self,
element_type: str = None,
other_update: dict = None,
):
-
# self.logger.debug('_write_configuration_status(): vca_index={}, status={}'
# .format(vca_index, status))
# update operation on nslcmops
db_nslcmop_update = {}
+ timeout_ns_deploy = self.timeout.ns_deploy
+
nslcmop_operation_state = None
db_vnfrs = {} # vnf's info indexed by member-index
# n2vc_info = {}
# read from db: operation
stage[1] = "Getting nslcmop={} from db.".format(nslcmop_id)
db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})
+ if db_nslcmop["operationParams"].get("additionalParamsForVnf"):
+ db_nslcmop["operationParams"]["additionalParamsForVnf"] = json.loads(
+ db_nslcmop["operationParams"]["additionalParamsForVnf"]
+ )
ns_params = db_nslcmop.get("operationParams")
if ns_params and ns_params.get("timeout_ns_deploy"):
timeout_ns_deploy = ns_params["timeout_ns_deploy"]
- else:
- timeout_ns_deploy = self.timeout.get(
- "ns_deploy", self.timeout_ns_deploy
- )
# read from db: ns
stage[1] = "Getting nsr={} from db.".format(nsr_id)
+ self.logger.debug(logging_text + stage[1])
db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
stage[1] = "Getting nsd={} from db.".format(db_nsr["nsd-id"])
+ self.logger.debug(logging_text + stage[1])
nsd = self.db.get_one("nsds", {"_id": db_nsr["nsd-id"]})
self.fs.sync(db_nsr["nsd-id"])
db_nsr["nsd"] = nsd
# for each vnf in ns, read vnfd
for vnfr in db_vnfrs_list:
+ if vnfr.get("kdur"):
+ kdur_list = []
+ for kdur in vnfr["kdur"]:
+ if kdur.get("additionalParams"):
+ kdur["additionalParams"] = json.loads(
+ kdur["additionalParams"]
+ )
+ kdur_list.append(kdur)
+ vnfr["kdur"] = kdur_list
+
db_vnfrs[vnfr["member-vnf-index-ref"]] = vnfr
vnfd_id = vnfr["vnfd-id"]
vnfd_ref = vnfr["vnfd-ref"]
# feature 1429. Add n2vc public key to needed VMs
n2vc_key = self.n2vc.get_public_key()
n2vc_key_list = [n2vc_key]
- if self.vca_config.get("public_key"):
- n2vc_key_list.append(self.vca_config["public_key"])
+ if self.vca_config.public_key:
+ n2vc_key_list.append(self.vca_config.public_key)
stage[1] = "Deploying NS at VIM."
task_ro = asyncio.ensure_future(
stage[1] = "Deploying Execution Environments."
self.logger.debug(logging_text + stage[1])
+ # create namespace and certificate if any helm based EE is present in the NS
+ if check_helm_ee_in_ns(db_vnfds):
+ # TODO: create EE namespace
+ # create TLS certificates
+ await self.vca_map["helm-v3"].create_tls_certificate(
+ secret_name="ee-tls-{}".format(nsr_id),
+ dns_prefix="*",
+ nsr_id=nsr_id,
+ usage="server auth",
+ )
+
nsi_id = None # TODO put nsi_id when this nsr belongs to a NSI
for vnf_profile in get_vnf_profiles(nsd):
vnfd_id = vnf_profile["vnfd-id"]
vdu_index = 0
vdu_name = None
kdu_name = None
+ kdu_index = None
# Get additional parameters
deploy_params = {"OSM": get_osm_params(db_vnfr)}
kdu_name=kdu_name,
member_vnf_index=member_vnf_index,
vdu_index=vdu_index,
+ kdu_index=kdu_index,
vdu_name=vdu_name,
deploy_params=deploy_params,
descriptor_config=descriptor_config,
deploy_params_vdu["OSM"] = get_osm_params(
db_vnfr, vdu_id, vdu_count_index=0
)
- vdud_count = get_vdu_profile(vnfd, vdu_id).get(
- "max-number-of-instances", 1
- )
+ vdud_count = get_number_of_instances(vnfd, vdu_id)
self.logger.debug("VDUD > {}".format(vdud))
self.logger.debug(
if descriptor_config:
vdu_name = None
kdu_name = None
+ kdu_index = None
for vdu_index in range(vdud_count):
# TODO vnfr_params["rw_mgmt_ip"] = vdur["ip-address"]
self._deploy_n2vc(
vnfd_id=vnfd_id,
vdu_id=vdu_id,
kdu_name=kdu_name,
+ kdu_index=kdu_index,
member_vnf_index=member_vnf_index,
vdu_index=vdu_index,
vdu_name=vdu_name,
vdu_id = None
vdu_index = 0
vdu_name = None
- kdur = next(
- x for x in db_vnfr["kdur"] if x["kdu-name"] == kdu_name
+ kdu_index, kdur = next(
+ x
+ for x in enumerate(db_vnfr["kdur"])
+ if x[1]["kdu-name"] == kdu_name
)
deploy_params_kdu = {"OSM": get_osm_params(db_vnfr)}
if kdur.get("additionalParams"):
- deploy_params_kdu = parse_yaml_strings(
- kdur["additionalParams"]
+ deploy_params_kdu.update(
+ parse_yaml_strings(kdur["additionalParams"].copy())
)
self._deploy_n2vc(
kdu_name=kdu_name,
member_vnf_index=member_vnf_index,
vdu_index=vdu_index,
+ kdu_index=kdu_index,
vdu_name=vdu_name,
deploy_params=deploy_params_kdu,
descriptor_config=descriptor_config,
member_vnf_index = None
vdu_id = None
kdu_name = None
+ kdu_index = None
vdu_index = 0
vdu_name = None
kdu_name=kdu_name,
member_vnf_index=member_vnf_index,
vdu_index=vdu_index,
+ kdu_index=kdu_index,
vdu_name=vdu_name,
deploy_params=deploy_params,
descriptor_config=descriptor_config,
self.logger.debug(logging_text + "Exit")
self.lcm_tasks.remove("ns", nsr_id, nslcmop_id, "ns_instantiate")
+ def _get_vnfd(self, vnfd_id: str, projects_read: str, cached_vnfds: Dict[str, Any]):
+ if vnfd_id not in cached_vnfds:
+ cached_vnfds[vnfd_id] = self.db.get_one(
+ "vnfds", {"id": vnfd_id, "_admin.projects_read": projects_read}
+ )
+ return cached_vnfds[vnfd_id]
+
+ def _get_vnfr(self, nsr_id: str, vnf_profile_id: str, cached_vnfrs: Dict[str, Any]):
+ if vnf_profile_id not in cached_vnfrs:
+ cached_vnfrs[vnf_profile_id] = self.db.get_one(
+ "vnfrs",
+ {
+ "member-vnf-index-ref": vnf_profile_id,
+ "nsr-id-ref": nsr_id,
+ },
+ )
+ return cached_vnfrs[vnf_profile_id]
+
+    def _is_deployed_vca_in_relation(
+        self, vca: DeployedVCA, relation: Relation
+    ) -> bool:
+        """Return True if *vca* is one of the two endpoints of *relation*.
+
+        Endpoints that reference a KDU resource are skipped: a KDU resource is
+        not a deployed VCA, so it can never match *vca*.
+        """
+        found = False
+        for endpoint in (relation.provider, relation.requirer):
+            # NOTE(review): item access here vs attribute access below assumes
+            # EERelation supports both styles — confirm against data_utils.vca
+            if endpoint["kdu-resource-profile-id"]:
+                continue
+            # match on VNF profile, VDU profile and EE reference simultaneously
+            found = (
+                vca.vnf_profile_id == endpoint.vnf_profile_id
+                and vca.vdu_profile_id == endpoint.vdu_profile_id
+                and vca.execution_environment_ref == endpoint.execution_environment_ref
+            )
+            if found:
+                break
+        return found
+
+ def _update_ee_relation_data_with_implicit_data(
+ self, nsr_id, nsd, ee_relation_data, cached_vnfds, vnf_profile_id: str = None
+ ):
+ ee_relation_data = safe_get_ee_relation(
+ nsr_id, ee_relation_data, vnf_profile_id=vnf_profile_id
+ )
+ ee_relation_level = EELevel.get_level(ee_relation_data)
+ if (ee_relation_level in (EELevel.VNF, EELevel.VDU)) and not ee_relation_data[
+ "execution-environment-ref"
+ ]:
+ vnf_profile = get_vnf_profile(nsd, ee_relation_data["vnf-profile-id"])
+ vnfd_id = vnf_profile["vnfd-id"]
+ project = nsd["_admin"]["projects_read"][0]
+ db_vnfd = self._get_vnfd(vnfd_id, project, cached_vnfds)
+ entity_id = (
+ vnfd_id
+ if ee_relation_level == EELevel.VNF
+ else ee_relation_data["vdu-profile-id"]
+ )
+ ee = get_juju_ee_ref(db_vnfd, entity_id)
+ if not ee:
+ raise Exception(
+ f"not execution environments found for ee_relation {ee_relation_data}"
+ )
+ ee_relation_data["execution-environment-ref"] = ee["id"]
+ return ee_relation_data
+
+    def _get_ns_relations(
+        self,
+        nsr_id: str,
+        nsd: Dict[str, Any],
+        vca: DeployedVCA,
+        cached_vnfds: Dict[str, Any],
+    ) -> List[Relation]:
+        """Return the NS-level (ns-configuration) relations that involve *vca*.
+
+        Supports both descriptor syntaxes: the explicit "provider"/"requirer"
+        form and the legacy two-entry "entities" form, where an entity id equal
+        to the NSD id denotes the NS itself and any other id names a VNF
+        profile. Implicit endpoint data is resolved before building each
+        Relation.
+        """
+        relations = []
+        db_ns_relations = get_ns_configuration_relation_list(nsd)
+        for r in db_ns_relations:
+            provider_dict = None
+            requirer_dict = None
+            if all(key in r for key in ("provider", "requirer")):
+                provider_dict = r["provider"]
+                requirer_dict = r["requirer"]
+            elif "entities" in r:
+                # legacy syntax: entities[0] is the provider, entities[1] the requirer
+                provider_id = r["entities"][0]["id"]
+                provider_dict = {
+                    "nsr-id": nsr_id,
+                    "endpoint": r["entities"][0]["endpoint"],
+                }
+                if provider_id != nsd["id"]:
+                    provider_dict["vnf-profile-id"] = provider_id
+                requirer_id = r["entities"][1]["id"]
+                requirer_dict = {
+                    "nsr-id": nsr_id,
+                    "endpoint": r["entities"][1]["endpoint"],
+                }
+                if requirer_id != nsd["id"]:
+                    requirer_dict["vnf-profile-id"] = requirer_id
+            else:
+                raise Exception(
+                    "provider/requirer or entities must be included in the relation."
+                )
+            relation_provider = self._update_ee_relation_data_with_implicit_data(
+                nsr_id, nsd, provider_dict, cached_vnfds
+            )
+            relation_requirer = self._update_ee_relation_data_with_implicit_data(
+                nsr_id, nsd, requirer_dict, cached_vnfds
+            )
+            provider = EERelation(relation_provider)
+            requirer = EERelation(relation_requirer)
+            relation = Relation(r["name"], provider, requirer)
+            # only keep relations in which this VCA participates
+            vca_in_relation = self._is_deployed_vca_in_relation(vca, relation)
+            if vca_in_relation:
+                relations.append(relation)
+        return relations
+
+    def _get_vnf_relations(
+        self,
+        nsr_id: str,
+        nsd: Dict[str, Any],
+        vca: DeployedVCA,
+        cached_vnfds: Dict[str, Any],
+    ) -> List[Relation]:
+        """Return the VNF-level relations (from the VNFD) that involve *vca*.
+
+        NS charms have no VNF relations, so they short-circuit to an empty
+        list. Supports both descriptor syntaxes: explicit
+        "provider"/"requirer" and the legacy "entities" form, where an entity
+        id equal to the VNFD id denotes the VNF itself and any other id names
+        a VDU profile.
+        """
+        relations = []
+        if vca.target_element == "ns":
+            self.logger.debug("VCA is a NS charm, not a VNF.")
+            return relations
+        vnf_profile = get_vnf_profile(nsd, vca.vnf_profile_id)
+        vnf_profile_id = vnf_profile["id"]
+        vnfd_id = vnf_profile["vnfd-id"]
+        project = nsd["_admin"]["projects_read"][0]
+        db_vnfd = self._get_vnfd(vnfd_id, project, cached_vnfds)
+        db_vnf_relations = get_relation_list(db_vnfd, vnfd_id)
+        for r in db_vnf_relations:
+            provider_dict = None
+            requirer_dict = None
+            if all(key in r for key in ("provider", "requirer")):
+                provider_dict = r["provider"]
+                requirer_dict = r["requirer"]
+            elif "entities" in r:
+                # legacy syntax: entities[0] is the provider, entities[1] the requirer
+                provider_id = r["entities"][0]["id"]
+                provider_dict = {
+                    "nsr-id": nsr_id,
+                    "vnf-profile-id": vnf_profile_id,
+                    "endpoint": r["entities"][0]["endpoint"],
+                }
+                if provider_id != vnfd_id:
+                    provider_dict["vdu-profile-id"] = provider_id
+                requirer_id = r["entities"][1]["id"]
+                requirer_dict = {
+                    "nsr-id": nsr_id,
+                    "vnf-profile-id": vnf_profile_id,
+                    "endpoint": r["entities"][1]["endpoint"],
+                }
+                if requirer_id != vnfd_id:
+                    requirer_dict["vdu-profile-id"] = requirer_id
+            else:
+                raise Exception(
+                    "provider/requirer or entities must be included in the relation."
+                )
+            relation_provider = self._update_ee_relation_data_with_implicit_data(
+                nsr_id, nsd, provider_dict, cached_vnfds, vnf_profile_id=vnf_profile_id
+            )
+            relation_requirer = self._update_ee_relation_data_with_implicit_data(
+                nsr_id, nsd, requirer_dict, cached_vnfds, vnf_profile_id=vnf_profile_id
+            )
+            provider = EERelation(relation_provider)
+            requirer = EERelation(relation_requirer)
+            relation = Relation(r["name"], provider, requirer)
+            # only keep relations in which this VCA participates
+            vca_in_relation = self._is_deployed_vca_in_relation(vca, relation)
+            if vca_in_relation:
+                relations.append(relation)
+        return relations
+
+ def _get_kdu_resource_data(
+ self,
+ ee_relation: EERelation,
+ db_nsr: Dict[str, Any],
+ cached_vnfds: Dict[str, Any],
+ ) -> DeployedK8sResource:
+ nsd = get_nsd(db_nsr)
+ vnf_profiles = get_vnf_profiles(nsd)
+ vnfd_id = find_in_list(
+ vnf_profiles,
+ lambda vnf_profile: vnf_profile["id"] == ee_relation.vnf_profile_id,
+ )["vnfd-id"]
+ project = nsd["_admin"]["projects_read"][0]
+ db_vnfd = self._get_vnfd(vnfd_id, project, cached_vnfds)
+ kdu_resource_profile = get_kdu_resource_profile(
+ db_vnfd, ee_relation.kdu_resource_profile_id
+ )
+ kdu_name = kdu_resource_profile["kdu-name"]
+ deployed_kdu, _ = get_deployed_kdu(
+ db_nsr.get("_admin", ()).get("deployed", ()),
+ kdu_name,
+ ee_relation.vnf_profile_id,
+ )
+ deployed_kdu.update({"resource-name": kdu_resource_profile["resource-name"]})
+ return deployed_kdu
+
+    def _get_deployed_component(
+        self,
+        ee_relation: EERelation,
+        db_nsr: Dict[str, Any],
+        cached_vnfds: Dict[str, Any],
+    ) -> DeployedComponent:
+        """Find the deployed component that backs a relation endpoint.
+
+        Depending on the endpoint level, the component is the NS charm, a
+        VNF- or VDU-level VCA, or a deployed K8s (KDU) resource. Returns
+        None if nothing matching is deployed yet.
+        """
+        nsr_id = db_nsr["_id"]
+        deployed_component = None
+        ee_level = EELevel.get_level(ee_relation)
+        if ee_level == EELevel.NS:
+            # NS charm: the VCA with neither vdu_id nor member-vnf-index
+            vca = get_deployed_vca(db_nsr, {"vdu_id": None, "member-vnf-index": None})
+            if vca:
+                deployed_component = DeployedVCA(nsr_id, vca)
+        elif ee_level == EELevel.VNF:
+            vca = get_deployed_vca(
+                db_nsr,
+                {
+                    "vdu_id": None,
+                    "member-vnf-index": ee_relation.vnf_profile_id,
+                    "ee_descriptor_id": ee_relation.execution_environment_ref,
+                },
+            )
+            if vca:
+                deployed_component = DeployedVCA(nsr_id, vca)
+        elif ee_level == EELevel.VDU:
+            vca = get_deployed_vca(
+                db_nsr,
+                {
+                    "vdu_id": ee_relation.vdu_profile_id,
+                    "member-vnf-index": ee_relation.vnf_profile_id,
+                    "ee_descriptor_id": ee_relation.execution_environment_ref,
+                },
+            )
+            if vca:
+                deployed_component = DeployedVCA(nsr_id, vca)
+        elif ee_level == EELevel.KDU:
+            kdu_resource_data = self._get_kdu_resource_data(
+                ee_relation, db_nsr, cached_vnfds
+            )
+            if kdu_resource_data:
+                deployed_component = DeployedK8sResource(kdu_resource_data)
+        return deployed_component
+
+ async def _add_relation(
+ self,
+ relation: Relation,
+ vca_type: str,
+ db_nsr: Dict[str, Any],
+ cached_vnfds: Dict[str, Any],
+ cached_vnfrs: Dict[str, Any],
+ ) -> bool:
+ deployed_provider = self._get_deployed_component(
+ relation.provider, db_nsr, cached_vnfds
+ )
+ deployed_requirer = self._get_deployed_component(
+ relation.requirer, db_nsr, cached_vnfds
+ )
+ if (
+ deployed_provider
+ and deployed_requirer
+ and deployed_provider.config_sw_installed
+ and deployed_requirer.config_sw_installed
+ ):
+ provider_db_vnfr = (
+ self._get_vnfr(
+ relation.provider.nsr_id,
+ relation.provider.vnf_profile_id,
+ cached_vnfrs,
+ )
+ if relation.provider.vnf_profile_id
+ else None
+ )
+ requirer_db_vnfr = (
+ self._get_vnfr(
+ relation.requirer.nsr_id,
+ relation.requirer.vnf_profile_id,
+ cached_vnfrs,
+ )
+ if relation.requirer.vnf_profile_id
+ else None
+ )
+ provider_vca_id = self.get_vca_id(provider_db_vnfr, db_nsr)
+ requirer_vca_id = self.get_vca_id(requirer_db_vnfr, db_nsr)
+ provider_relation_endpoint = RelationEndpoint(
+ deployed_provider.ee_id,
+ provider_vca_id,
+ relation.provider.endpoint,
+ )
+ requirer_relation_endpoint = RelationEndpoint(
+ deployed_requirer.ee_id,
+ requirer_vca_id,
+ relation.requirer.endpoint,
+ )
+ try:
+ await self.vca_map[vca_type].add_relation(
+ provider=provider_relation_endpoint,
+ requirer=requirer_relation_endpoint,
+ )
+ except N2VCException as exception:
+ self.logger.error(exception)
+ raise LcmException(exception)
+ return True
+ return False
+
async def _add_vca_relations(
self,
logging_text,
nsr_id,
+ vca_type: str,
vca_index: int,
timeout: int = 3600,
- vca_type: str = None,
- vca_id: str = None,
) -> bool:
-
# steps:
# 1. find all relations for this VCA
# 2. wait for other peers related
# 3. add relations
try:
- vca_type = vca_type or "lxc_proxy_charm"
-
# STEP 1: find all relations for this VCA
# read nsr record
db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
- nsd = self.db.get_one("nsds", {"_id": db_nsr["nsd-id"]})
+ nsd = get_nsd(db_nsr)
# this VCA data
- my_vca = deep_get(db_nsr, ("_admin", "deployed", "VCA"))[vca_index]
-
- # read all ns-configuration relations
- ns_relations = list()
- db_ns_relations = deep_get(nsd, ("ns-configuration", "relation"))
- if db_ns_relations:
- for r in db_ns_relations:
- # check if this VCA is in the relation
- if my_vca.get("member-vnf-index") in (
- r.get("entities")[0].get("id"),
- r.get("entities")[1].get("id"),
- ):
- ns_relations.append(r)
-
- # read all vnf-configuration relations
- vnf_relations = list()
- db_vnfd_list = db_nsr.get("vnfd-id")
- if db_vnfd_list:
- for vnfd in db_vnfd_list:
- db_vnf_relations = None
- db_vnfd = self.db.get_one("vnfds", {"_id": vnfd})
- db_vnf_configuration = get_configuration(db_vnfd, db_vnfd["id"])
- if db_vnf_configuration:
- db_vnf_relations = db_vnf_configuration.get("relation", [])
- if db_vnf_relations:
- for r in db_vnf_relations:
- # check if this VCA is in the relation
- if my_vca.get("vdu_id") in (
- r.get("entities")[0].get("id"),
- r.get("entities")[1].get("id"),
- ):
- vnf_relations.append(r)
+ deployed_vca_dict = get_deployed_vca_list(db_nsr)[vca_index]
+ my_vca = DeployedVCA(nsr_id, deployed_vca_dict)
+
+ cached_vnfds = {}
+ cached_vnfrs = {}
+ relations = []
+ relations.extend(self._get_ns_relations(nsr_id, nsd, my_vca, cached_vnfds))
+ relations.extend(self._get_vnf_relations(nsr_id, nsd, my_vca, cached_vnfds))
# if no relations, terminate
- if not ns_relations and not vnf_relations:
+ if not relations:
self.logger.debug(logging_text + " No relations")
return True
- self.logger.debug(
- logging_text
- + " adding relations\n {}\n {}".format(
- ns_relations, vnf_relations
- )
- )
+ self.logger.debug(logging_text + " adding relations {}".format(relations))
# add all relations
start = time()
self.logger.error(logging_text + " : timeout adding relations")
return False
- # reload nsr from database (we need to update record: _admin.deloyed.VCA)
+ # reload nsr from database (we need to update record: _admin.deployed.VCA)
db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
- # for each defined NS relation, find the VCA's related
- for r in ns_relations.copy():
- from_vca_ee_id = None
- to_vca_ee_id = None
- from_vca_endpoint = None
- to_vca_endpoint = None
- vca_list = deep_get(db_nsr, ("_admin", "deployed", "VCA"))
- for vca in vca_list:
- if vca.get("member-vnf-index") == r.get("entities")[0].get(
- "id"
- ) and vca.get("config_sw_installed"):
- from_vca_ee_id = vca.get("ee_id")
- from_vca_endpoint = r.get("entities")[0].get("endpoint")
- if vca.get("member-vnf-index") == r.get("entities")[1].get(
- "id"
- ) and vca.get("config_sw_installed"):
- to_vca_ee_id = vca.get("ee_id")
- to_vca_endpoint = r.get("entities")[1].get("endpoint")
- if from_vca_ee_id and to_vca_ee_id:
- # add relation
- await self.vca_map[vca_type].add_relation(
- ee_id_1=from_vca_ee_id,
- ee_id_2=to_vca_ee_id,
- endpoint_1=from_vca_endpoint,
- endpoint_2=to_vca_endpoint,
- vca_id=vca_id,
- )
- # remove entry from relations list
- ns_relations.remove(r)
- else:
- # check failed peers
- try:
- vca_status_list = db_nsr.get("configurationStatus")
- if vca_status_list:
- for i in range(len(vca_list)):
- vca = vca_list[i]
- vca_status = vca_status_list[i]
- if vca.get("member-vnf-index") == r.get("entities")[
- 0
- ].get("id"):
- if vca_status.get("status") == "BROKEN":
- # peer broken: remove relation from list
- ns_relations.remove(r)
- if vca.get("member-vnf-index") == r.get("entities")[
- 1
- ].get("id"):
- if vca_status.get("status") == "BROKEN":
- # peer broken: remove relation from list
- ns_relations.remove(r)
- except Exception:
- # ignore
- pass
-
- # for each defined VNF relation, find the VCA's related
- for r in vnf_relations.copy():
- from_vca_ee_id = None
- to_vca_ee_id = None
- from_vca_endpoint = None
- to_vca_endpoint = None
- vca_list = deep_get(db_nsr, ("_admin", "deployed", "VCA"))
- for vca in vca_list:
- key_to_check = "vdu_id"
- if vca.get("vdu_id") is None:
- key_to_check = "vnfd_id"
- if vca.get(key_to_check) == r.get("entities")[0].get(
- "id"
- ) and vca.get("config_sw_installed"):
- from_vca_ee_id = vca.get("ee_id")
- from_vca_endpoint = r.get("entities")[0].get("endpoint")
- if vca.get(key_to_check) == r.get("entities")[1].get(
- "id"
- ) and vca.get("config_sw_installed"):
- to_vca_ee_id = vca.get("ee_id")
- to_vca_endpoint = r.get("entities")[1].get("endpoint")
- if from_vca_ee_id and to_vca_ee_id:
- # add relation
- await self.vca_map[vca_type].add_relation(
- ee_id_1=from_vca_ee_id,
- ee_id_2=to_vca_ee_id,
- endpoint_1=from_vca_endpoint,
- endpoint_2=to_vca_endpoint,
- vca_id=vca_id,
- )
- # remove entry from relations list
- vnf_relations.remove(r)
- else:
- # check failed peers
- try:
- vca_status_list = db_nsr.get("configurationStatus")
- if vca_status_list:
- for i in range(len(vca_list)):
- vca = vca_list[i]
- vca_status = vca_status_list[i]
- if vca.get("vdu_id") == r.get("entities")[0].get(
- "id"
- ):
- if vca_status.get("status") == "BROKEN":
- # peer broken: remove relation from list
- vnf_relations.remove(r)
- if vca.get("vdu_id") == r.get("entities")[1].get(
- "id"
- ):
- if vca_status.get("status") == "BROKEN":
- # peer broken: remove relation from list
- vnf_relations.remove(r)
- except Exception:
- # ignore
- pass
-
- # wait for next try
- await asyncio.sleep(5.0)
+ # for each relation, find the VCA's related
+ for relation in relations.copy():
+ added = await self._add_relation(
+ relation,
+ vca_type,
+ db_nsr,
+ cached_vnfds,
+ cached_vnfrs,
+ )
+ if added:
+ relations.remove(relation)
- if not ns_relations and not vnf_relations:
+ if not relations:
self.logger.debug("Relations added")
break
+ await asyncio.sleep(5.0)
return True
timeout: int = 600,
vca_id: str = None,
):
-
try:
k8sclustertype = k8s_instance_info["k8scluster-type"]
# Instantiate kdu
kdu_model=k8s_instance_info["kdu-model"],
kdu_name=k8s_instance_info["kdu-name"],
)
+
+ # Update the nsrs table with the kdu-instance value
self.update_db_2(
- "nsrs", nsr_id, {nsr_db_path + ".kdu-instance": kdu_instance}
+ item="nsrs",
+ _id=nsr_id,
+ _desc={nsr_db_path + ".kdu-instance": kdu_instance},
)
+
+ # Update the nsrs table with the actual namespace being used, if the k8scluster-type is `juju` or
+ # `juju-bundle`. This verification is needed because there is no standard/homogeneous namespace
+ # between Helm Chart-based and Juju Bundle-based KNFs. If we find a way of having a homogeneous
+ # namespace, this first verification could be removed, and the next step would be done for any kind
+ # of KNF.
+ # TODO -> find a way to have a homogeneous namespace between the Helm Chart-based and Juju Bundle-based
+ # KNFs (Bug 2027: https://osm.etsi.org/bugzilla/show_bug.cgi?id=2027)
+ if k8sclustertype in ("juju", "juju-bundle"):
+ # First, verify if the current namespace is present in the `_admin.projects_read` (if not, it means
+ # that the user passed a namespace which he wants its KDU to be deployed in)
+ if (
+ self.db.count(
+ table="nsrs",
+ q_filter={
+ "_id": nsr_id,
+ "_admin.projects_write": k8s_instance_info["namespace"],
+ "_admin.projects_read": k8s_instance_info["namespace"],
+ },
+ )
+ > 0
+ ):
+ self.logger.debug(
+ f"Updating namespace/model for Juju Bundle from {k8s_instance_info['namespace']} to {kdu_instance}"
+ )
+ self.update_db_2(
+ item="nsrs",
+ _id=nsr_id,
+ _desc={f"{nsr_db_path}.namespace": kdu_instance},
+ )
+ k8s_instance_info["namespace"] = kdu_instance
+
await self.k8scluster_map[k8sclustertype].install(
cluster_uuid=k8s_instance_info["k8scluster-uuid"],
kdu_model=k8s_instance_info["kdu-model"],
kdu_instance=kdu_instance,
vca_id=vca_id,
)
- self.update_db_2(
- "nsrs", nsr_id, {nsr_db_path + ".kdu-instance": kdu_instance}
- )
# Obtain services to obtain management service ip
services = await self.k8scluster_map[k8sclustertype].get_services(
db_vnfds, lambda vnfd: vnfd["_id"] == vnfd_id
)
storage = deep_get(vnfd_with_id, ("_admin", "storage"))
- if storage and storage.get(
- "pkg-dir"
- ): # may be not present if vnfd has not artifacts
+ if storage: # may be not present if vnfd has not artifacts
# path format: /vnfdid/pkkdir/helm-charts|juju-bundles/kdumodel
- filename = "{}/{}/{}s/{}".format(
- storage["folder"],
- storage["pkg-dir"],
- k8sclustertype,
- kdumodel,
- )
+ if storage["pkg-dir"]:
+ filename = "{}/{}/{}s/{}".format(
+ storage["folder"],
+ storage["pkg-dir"],
+ k8sclustertype,
+ kdumodel,
+ )
+ else:
+ filename = "{}/Scripts/{}s/{}".format(
+ storage["folder"],
+ k8sclustertype,
+ kdumodel,
+ )
if self.fs.file_exists(
filename, mode="file"
) or self.fs.file_exists(filename, mode="dir"):
vnfd_with_id,
k8s_instance_info,
k8params=desc_params,
- timeout=600,
+ timeout=1800,
vca_id=vca_id,
)
)
kdu_name,
member_vnf_index,
vdu_index,
+ kdu_index,
vdu_name,
deploy_params,
descriptor_config,
self.logger.debug(
logging_text + "_deploy_n2vc vnfd_id={}, vdu_id={}".format(vnfd_id, vdu_id)
)
+
+ charm_name = ""
+ get_charm_name = False
if "execution-environment-list" in descriptor_config:
ee_list = descriptor_config.get("execution-environment-list", [])
elif "juju" in descriptor_config:
ee_list = [descriptor_config] # ns charms
+ if "execution-environment-list" not in descriptor_config:
+ # charm name is only required for ns charms
+ get_charm_name = True
else: # other types as script are not supported
ee_list = []
ee_descriptor_id = ee_item.get("id")
if ee_item.get("juju"):
vca_name = ee_item["juju"].get("charm")
+ if get_charm_name:
+ charm_name = self.find_charm_name(db_nsr, str(vca_name))
vca_type = (
"lxc_proxy_charm"
if ee_item["juju"].get("charm") is not None
"vdu_name": vdu_name,
"type": vca_type,
"ee_descriptor_id": ee_descriptor_id,
+ "charm_name": charm_name,
}
vca_index += 1
vdu_id=vdu_id,
kdu_name=kdu_name,
vdu_index=vdu_index,
+ kdu_index=kdu_index,
deploy_params=deploy_params,
config_descriptor=descriptor_config,
base_folder=base_folder,
"nsrs", db_nslcmop["nsInstanceId"], {db_update_entry: False}
)
- if vca_deployed.get("prometheus_jobs") and self.prometheus:
- await self.prometheus.update(remove_jobs=vca_deployed["prometheus_jobs"])
+ # Delete Prometheus Jobs if any
+ # This uses NSR_ID, so it will destroy any jobs under this index
+ self.db.del_list("prometheus_jobs", {"nsr_id": db_nslcmop["nsInstanceId"]})
if destroy_ee:
await self.vca_map[vca_type].delete_execution_environment(
vca_deployed["ee_id"],
scaling_in=scaling_in,
+ vca_type=vca_type,
vca_id=vca_id,
)
try:
await self.n2vc.delete_namespace(
namespace=namespace,
- total_timeout=self.timeout_charm_delete,
+ total_timeout=self.timeout.charm_delete,
vca_id=vca_id,
)
except N2VCNotFound: # already deleted. Skip
pass
self._write_all_config_status(db_nsr=db_nsr, status="DELETED")
- async def _terminate_RO(
- self, logging_text, nsr_deployed, nsr_id, nslcmop_id, stage
- ):
- """
- Terminates a deployment from RO
- :param logging_text:
- :param nsr_deployed: db_nsr._admin.deployed
- :param nsr_id:
- :param nslcmop_id:
- :param stage: list of string with the content to write on db_nslcmop.detailed-status.
- this method will update only the index 2, but it will write on database the concatenated content of the list
- :return:
- """
- db_nsr_update = {}
- failed_detail = []
- ro_nsr_id = ro_delete_action = None
- if nsr_deployed and nsr_deployed.get("RO"):
- ro_nsr_id = nsr_deployed["RO"].get("nsr_id")
- ro_delete_action = nsr_deployed["RO"].get("nsr_delete_action_id")
- try:
- if ro_nsr_id:
- stage[2] = "Deleting ns from VIM."
- db_nsr_update["detailed-status"] = " ".join(stage)
- self._write_op_status(nslcmop_id, stage)
- self.logger.debug(logging_text + stage[2])
- self.update_db_2("nsrs", nsr_id, db_nsr_update)
- self._write_op_status(nslcmop_id, stage)
- desc = await self.RO.delete("ns", ro_nsr_id)
- ro_delete_action = desc["action_id"]
- db_nsr_update[
- "_admin.deployed.RO.nsr_delete_action_id"
- ] = ro_delete_action
- db_nsr_update["_admin.deployed.RO.nsr_id"] = None
- db_nsr_update["_admin.deployed.RO.nsr_status"] = "DELETED"
- if ro_delete_action:
- # wait until NS is deleted from VIM
- stage[2] = "Waiting ns deleted from VIM."
- detailed_status_old = None
- self.logger.debug(
- logging_text
- + stage[2]
- + " RO_id={} ro_delete_action={}".format(
- ro_nsr_id, ro_delete_action
- )
- )
- self.update_db_2("nsrs", nsr_id, db_nsr_update)
- self._write_op_status(nslcmop_id, stage)
-
- delete_timeout = 20 * 60 # 20 minutes
- while delete_timeout > 0:
- desc = await self.RO.show(
- "ns",
- item_id_name=ro_nsr_id,
- extra_item="action",
- extra_item_id=ro_delete_action,
- )
-
- # deploymentStatus
- self._on_update_ro_db(nsrs_id=nsr_id, ro_descriptor=desc)
-
- ns_status, ns_status_info = self.RO.check_action_status(desc)
- if ns_status == "ERROR":
- raise ROclient.ROClientException(ns_status_info)
- elif ns_status == "BUILD":
- stage[2] = "Deleting from VIM {}".format(ns_status_info)
- elif ns_status == "ACTIVE":
- db_nsr_update["_admin.deployed.RO.nsr_delete_action_id"] = None
- db_nsr_update["_admin.deployed.RO.nsr_status"] = "DELETED"
- break
- else:
- assert (
- False
- ), "ROclient.check_action_status returns unknown {}".format(
- ns_status
- )
- if stage[2] != detailed_status_old:
- detailed_status_old = stage[2]
- db_nsr_update["detailed-status"] = " ".join(stage)
- self._write_op_status(nslcmop_id, stage)
- self.update_db_2("nsrs", nsr_id, db_nsr_update)
- await asyncio.sleep(5, loop=self.loop)
- delete_timeout -= 5
- else: # delete_timeout <= 0:
- raise ROclient.ROClientException(
- "Timeout waiting ns deleted from VIM"
- )
-
- except Exception as e:
- self.update_db_2("nsrs", nsr_id, db_nsr_update)
- if (
- isinstance(e, ROclient.ROClientException) and e.http_code == 404
- ): # not found
- db_nsr_update["_admin.deployed.RO.nsr_id"] = None
- db_nsr_update["_admin.deployed.RO.nsr_status"] = "DELETED"
- db_nsr_update["_admin.deployed.RO.nsr_delete_action_id"] = None
- self.logger.debug(
- logging_text + "RO_ns_id={} already deleted".format(ro_nsr_id)
- )
- elif (
- isinstance(e, ROclient.ROClientException) and e.http_code == 409
- ): # conflict
- failed_detail.append("delete conflict: {}".format(e))
- self.logger.debug(
- logging_text
- + "RO_ns_id={} delete conflict: {}".format(ro_nsr_id, e)
- )
- else:
- failed_detail.append("delete error: {}".format(e))
- self.logger.error(
- logging_text + "RO_ns_id={} delete error: {}".format(ro_nsr_id, e)
- )
-
- # Delete nsd
- if not failed_detail and deep_get(nsr_deployed, ("RO", "nsd_id")):
- ro_nsd_id = nsr_deployed["RO"]["nsd_id"]
- try:
- stage[2] = "Deleting nsd from RO."
- db_nsr_update["detailed-status"] = " ".join(stage)
- self.update_db_2("nsrs", nsr_id, db_nsr_update)
- self._write_op_status(nslcmop_id, stage)
- await self.RO.delete("nsd", ro_nsd_id)
- self.logger.debug(
- logging_text + "ro_nsd_id={} deleted".format(ro_nsd_id)
- )
- db_nsr_update["_admin.deployed.RO.nsd_id"] = None
- except Exception as e:
- if (
- isinstance(e, ROclient.ROClientException) and e.http_code == 404
- ): # not found
- db_nsr_update["_admin.deployed.RO.nsd_id"] = None
- self.logger.debug(
- logging_text + "ro_nsd_id={} already deleted".format(ro_nsd_id)
- )
- elif (
- isinstance(e, ROclient.ROClientException) and e.http_code == 409
- ): # conflict
- failed_detail.append(
- "ro_nsd_id={} delete conflict: {}".format(ro_nsd_id, e)
- )
- self.logger.debug(logging_text + failed_detail[-1])
- else:
- failed_detail.append(
- "ro_nsd_id={} delete error: {}".format(ro_nsd_id, e)
- )
- self.logger.error(logging_text + failed_detail[-1])
-
- if not failed_detail and deep_get(nsr_deployed, ("RO", "vnfd")):
- for index, vnf_deployed in enumerate(nsr_deployed["RO"]["vnfd"]):
- if not vnf_deployed or not vnf_deployed["id"]:
- continue
- try:
- ro_vnfd_id = vnf_deployed["id"]
- stage[
- 2
- ] = "Deleting member_vnf_index={} ro_vnfd_id={} from RO.".format(
- vnf_deployed["member-vnf-index"], ro_vnfd_id
- )
- db_nsr_update["detailed-status"] = " ".join(stage)
- self.update_db_2("nsrs", nsr_id, db_nsr_update)
- self._write_op_status(nslcmop_id, stage)
- await self.RO.delete("vnfd", ro_vnfd_id)
- self.logger.debug(
- logging_text + "ro_vnfd_id={} deleted".format(ro_vnfd_id)
- )
- db_nsr_update["_admin.deployed.RO.vnfd.{}.id".format(index)] = None
- except Exception as e:
- if (
- isinstance(e, ROclient.ROClientException) and e.http_code == 404
- ): # not found
- db_nsr_update[
- "_admin.deployed.RO.vnfd.{}.id".format(index)
- ] = None
- self.logger.debug(
- logging_text
- + "ro_vnfd_id={} already deleted ".format(ro_vnfd_id)
- )
- elif (
- isinstance(e, ROclient.ROClientException) and e.http_code == 409
- ): # conflict
- failed_detail.append(
- "ro_vnfd_id={} delete conflict: {}".format(ro_vnfd_id, e)
- )
- self.logger.debug(logging_text + failed_detail[-1])
- else:
- failed_detail.append(
- "ro_vnfd_id={} delete error: {}".format(ro_vnfd_id, e)
- )
- self.logger.error(logging_text + failed_detail[-1])
-
- if failed_detail:
- stage[2] = "Error deleting from VIM"
- else:
- stage[2] = "Deleted from VIM"
- db_nsr_update["detailed-status"] = " ".join(stage)
- self.update_db_2("nsrs", nsr_id, db_nsr_update)
- self._write_op_status(nslcmop_id, stage)
-
- if failed_detail:
- raise LcmException("; ".join(failed_detail))
-
async def terminate(self, nsr_id, nslcmop_id):
# Try to lock HA task here
task_is_locked_by_me = self.lcm_tasks.lock_HA("ns", "nslcmops", nslcmop_id)
logging_text = "Task ns={} terminate={} ".format(nsr_id, nslcmop_id)
self.logger.debug(logging_text + "Enter")
- timeout_ns_terminate = self.timeout_ns_terminate
+ timeout_ns_terminate = self.timeout.ns_terminate
db_nsr = None
db_nslcmop = None
operation_params = None
error_list = await self._wait_for_tasks(
logging_text,
tasks_dict_info,
- min(self.timeout_charm_delete, timeout_ns_terminate),
+ min(self.timeout.charm_delete, timeout_ns_terminate),
stage,
nslcmop_id,
)
task_delete_ee = asyncio.ensure_future(
asyncio.wait_for(
self._delete_all_N2VC(db_nsr=db_nsr, vca_id=vca_id),
- timeout=self.timeout_charm_delete,
+ timeout=self.timeout.charm_delete,
)
)
# task_delete_ee = asyncio.ensure_future(self.n2vc.delete_namespace(namespace="." + nsr_id))
tasks_dict_info[task_delete_ee] = "Terminating all VCA"
+ # Delete Namespace and Certificates if necessary
+ if check_helm_ee_in_ns(list(db_vnfds_from_member_index.values())):
+ await self.vca_map["helm-v3"].delete_tls_certificate(
+ certificate_name=db_nslcmop["nsInstanceId"],
+ )
+ # TODO: Delete namespace
+
# Delete from k8scluster
stage[1] = "Deleting KDUs."
self.logger.debug(logging_text + stage[1])
cluster_uuid=kdu.get("k8scluster-uuid"),
kdu_instance=kdu_instance,
vca_id=vca_id,
+ namespace=kdu.get("namespace"),
)
)
else:
# remove from RO
stage[1] = "Deleting ns from VIM."
- if self.ng_ro:
+ if self.ro_config.ng:
task_delete_ro = asyncio.ensure_future(
self._terminate_ng_ro(
logging_text, nsr_deployed, nsr_id, nslcmop_id, stage
)
)
- else:
- task_delete_ro = asyncio.ensure_future(
- self._terminate_RO(
- logging_text, nsr_deployed, nsr_id, nslcmop_id, stage
- )
- )
- tasks_dict_info[task_delete_ro] = "Removing deployment from VIM"
+ tasks_dict_info[task_delete_ro] = "Removing deployment from VIM"
# rest of staff will be done at finally
ee_id=ee_id,
primitive_name=primitive,
params_dict=primitive_params,
- progress_timeout=self.timeout_progress_primitive,
- total_timeout=self.timeout_primitive,
+ progress_timeout=self.timeout.progress_primitive,
+ total_timeout=self.timeout.primitive,
db_dict=db_dict,
vca_id=vca_id,
+ vca_type=vca_type,
),
- timeout=timeout or self.timeout_primitive,
+ timeout=timeout or self.timeout.primitive,
)
# execution was OK
break
except asyncio.CancelledError:
raise
- except Exception as e: # asyncio.TimeoutError
- if isinstance(e, asyncio.TimeoutError):
- e = "Timeout"
+ except Exception as e:
retries -= 1
if retries >= 0:
self.logger.debug(
# wait and retry
await asyncio.sleep(retries_interval, loop=self.loop)
else:
- return "FAILED", str(e)
+ if isinstance(e, asyncio.TimeoutError):
+ e = N2VCException(
+ message="Timed out waiting for action to complete"
+ )
+ return "FAILED", getattr(e, "message", repr(e))
return "COMPLETED", output
db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
vca_id = self.get_vca_id({}, db_nsr)
if db_nsr["_admin"]["deployed"]["K8s"]:
- for k8s_index, k8s in enumerate(db_nsr["_admin"]["deployed"]["K8s"]):
- cluster_uuid, kdu_instance = k8s["k8scluster-uuid"], k8s["kdu-instance"]
+ for _, k8s in enumerate(db_nsr["_admin"]["deployed"]["K8s"]):
+ cluster_uuid, kdu_instance, cluster_type = (
+ k8s["k8scluster-uuid"],
+ k8s["kdu-instance"],
+ k8s["k8scluster-type"],
+ )
await self._on_update_k8s_db(
- cluster_uuid, kdu_instance, filter={"_id": nsr_id}, vca_id=vca_id
+ cluster_uuid=cluster_uuid,
+ kdu_instance=kdu_instance,
+ filter={"_id": nsr_id},
+ vca_id=vca_id,
+ cluster_type=cluster_type,
)
else:
for vca_index, _ in enumerate(db_nsr["_admin"]["deployed"]["VCA"]):
nslcmop_operation_state = None
error_description_nslcmop = None
exc = None
+ step = ""
try:
# wait for any previous tasks in process
step = "Waiting for previous operations to terminate"
step = "Getting information from database"
db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})
db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
+ if db_nslcmop["operationParams"].get("primitive_params"):
+ db_nslcmop["operationParams"]["primitive_params"] = json.loads(
+ db_nslcmop["operationParams"]["primitive_params"]
+ )
nsr_deployed = db_nsr["_admin"].get("deployed")
vnf_index = db_nslcmop["operationParams"].get("member_vnf_index")
primitive = db_nslcmop["operationParams"]["primitive"]
primitive_params = db_nslcmop["operationParams"]["primitive_params"]
timeout_ns_action = db_nslcmop["operationParams"].get(
- "timeout_ns_action", self.timeout_primitive
+ "timeout_ns_action", self.timeout.primitive
)
if vnf_index:
db_vnfr = self.db.get_one(
"vnfrs", {"member-vnf-index-ref": vnf_index, "nsr-id-ref": nsr_id}
)
+ if db_vnfr.get("kdur"):
+ kdur_list = []
+ for kdur in db_vnfr["kdur"]:
+ if kdur.get("additionalParams"):
+ kdur["additionalParams"] = json.loads(
+ kdur["additionalParams"]
+ )
+ kdur_list.append(kdur)
+ db_vnfr["kdur"] = kdur_list
step = "Getting vnfd from database"
db_vnfd = self.db.get_one("vnfds", {"_id": db_vnfr["vnfd-id"]})
+
+ # Sync filesystem before running a primitive
+ self.fs.sync(db_vnfr["vnfd-id"])
else:
step = "Getting nsd from database"
db_nsd = self.db.get_one("nsds", {"_id": db_nsr["nsd-id"]})
actions.add(primitive["name"])
for primitive in kdu_configuration.get("config-primitive", []):
actions.add(primitive["name"])
- kdu_action = True if primitive_name in actions else False
+ kdu = find_in_list(
+ nsr_deployed["K8s"],
+ lambda kdu: kdu_name == kdu["kdu-name"]
+ and kdu["member-vnf-index"] == vnf_index,
+ )
+ kdu_action = (
+ True
+ if primitive_name in actions
+ and kdu["k8scluster-type"] not in ("helm-chart", "helm-chart-v3")
+ else False
+ )
# TODO check if ns is in a proper status
if kdu_name and (
del desc_params["kdu_model"]
else:
kdu_model = kdu.get("kdu-model")
- parts = kdu_model.split(sep=":")
- if len(parts) == 2:
- kdu_model = parts[0]
+ if kdu_model.count("/") < 2: # helm chart is not embedded
+ parts = kdu_model.split(sep=":")
+ if len(parts) == 2:
+ kdu_model = parts[0]
+ if desc_params.get("kdu_atomic_upgrade"):
+ atomic_upgrade = desc_params.get(
+ "kdu_atomic_upgrade"
+ ).lower() in ("yes", "true", "1")
+ del desc_params["kdu_atomic_upgrade"]
+ else:
+ atomic_upgrade = True
detailed_status = await asyncio.wait_for(
self.k8scluster_map[kdu["k8scluster-type"]].upgrade(
cluster_uuid=kdu.get("k8scluster-uuid"),
kdu_instance=kdu.get("kdu-instance"),
- atomic=True,
+ atomic=atomic_upgrade,
kdu_model=kdu_model,
params=desc_params,
db_dict=db_dict,
)
self.logger.debug(
logging_text
- + " task Done with result {} {}".format(
+ + "Done with result {} {}".format(
nslcmop_operation_state, detailed_status
)
)
self.lcm_tasks.remove("ns", nsr_id, nslcmop_id, "ns_action")
return nslcmop_operation_state, detailed_status
- async def scale(self, nsr_id, nslcmop_id):
- # Try to lock HA task here
- task_is_locked_by_me = self.lcm_tasks.lock_HA("ns", "nslcmops", nslcmop_id)
- if not task_is_locked_by_me:
- return
+ async def terminate_vdus(
+ self, db_vnfr, member_vnf_index, db_nsr, update_db_nslcmops, stage, logging_text
+ ):
+ """This method terminates VDUs
- logging_text = "Task ns={} scale={} ".format(nsr_id, nslcmop_id)
- stage = ["", "", ""]
- tasks_dict_info = {}
- # ^ stage, step, VIM progress
- self.logger.debug(logging_text + "Enter")
- # get all needed from database
- db_nsr = None
- db_nslcmop_update = {}
- db_nsr_update = {}
- exc = None
- # in case of error, indicates what part of scale was failed to put nsr at error status
- scale_process = None
- old_operational_status = ""
- old_config_status = ""
- nsi_id = None
- try:
- # wait for any previous tasks in process
- step = "Waiting for previous operations to terminate"
- await self.lcm_tasks.waitfor_related_HA("ns", "nslcmops", nslcmop_id)
- self._write_ns_status(
- nsr_id=nsr_id,
- ns_state=None,
- current_operation="SCALING",
- current_operation_id=nslcmop_id,
+ Args:
+ db_vnfr: VNF instance record
+ member_vnf_index: VNF index to identify the VDUs to be removed
+ db_nsr: NS instance record
+ update_db_nslcmops: Nslcmop update record
+ stage: list of status strings, updated as termination progresses
+ logging_text: prefix for log messages
+ """
+ vca_scaling_info = []
+ scaling_info = {"scaling_group_name": "vdu_autoscale", "vdu": [], "kdu": []}
+ scaling_info["scaling_direction"] = "IN"
+ scaling_info["vdu-delete"] = {}
+ scaling_info["kdu-delete"] = {}
+ db_vdur = db_vnfr.get("vdur")
+ vdur_list = copy(db_vdur)
+ count_index = 0
+ for index, vdu in enumerate(vdur_list):
+ vca_scaling_info.append(
+ {
+ "osm_vdu_id": vdu["vdu-id-ref"],
+ "member-vnf-index": member_vnf_index,
+ "type": "delete",
+ "vdu_index": count_index,
+ }
)
-
- step = "Getting nslcmop from database"
- self.logger.debug(
- step + " after having waited for previous tasks to be completed"
+ scaling_info["vdu-delete"][vdu["vdu-id-ref"]] = count_index
+ scaling_info["vdu"].append(
+ {
+ "name": vdu.get("name") or vdu.get("vdu-name"),
+ "vdu_id": vdu["vdu-id-ref"],
+ "interface": [],
+ }
)
- db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})
+ for interface in vdu["interfaces"]:
+ scaling_info["vdu"][index]["interface"].append(
+ {
+ "name": interface["name"],
+ "ip_address": interface["ip-address"],
+ "mac_address": interface.get("mac-address"),
+ }
+ )
+ self.logger.info("NS update scaling info{}".format(scaling_info))
+ stage[2] = "Terminating VDUs"
+ if scaling_info.get("vdu-delete"):
+ # scale_process = "RO"
+ if self.ro_config.ng:
+ await self._scale_ng_ro(
+ logging_text,
+ db_nsr,
+ update_db_nslcmops,
+ db_vnfr,
+ scaling_info,
+ stage,
+ )
- step = "Getting nsr from database"
- db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
- old_operational_status = db_nsr["operational-status"]
- old_config_status = db_nsr["config-status"]
+ async def remove_vnf(self, nsr_id, nslcmop_id, vnf_instance_id):
+ """This method removes a VNF instance from the NS.
- step = "Parsing scaling parameters"
- db_nsr_update["operational-status"] = "scaling"
- self.update_db_2("nsrs", nsr_id, db_nsr_update)
- nsr_deployed = db_nsr["_admin"].get("deployed")
+ Args:
+ nsr_id: NS instance id
+ nslcmop_id: nslcmop id of update
+ vnf_instance_id: id of the VNF instance to be removed
- vnf_index = db_nslcmop["operationParams"]["scaleVnfData"][
- "scaleByStepData"
- ]["member-vnf-index"]
- scaling_group = db_nslcmop["operationParams"]["scaleVnfData"][
- "scaleByStepData"
- ]["scaling-group-descriptor"]
- scaling_type = db_nslcmop["operationParams"]["scaleVnfData"]["scaleVnfType"]
- # for backward compatibility
- if nsr_deployed and isinstance(nsr_deployed.get("VCA"), dict):
- nsr_deployed["VCA"] = list(nsr_deployed["VCA"].values())
- db_nsr_update["_admin.deployed.VCA"] = nsr_deployed["VCA"]
- self.update_db_2("nsrs", nsr_id, db_nsr_update)
+ Returns:
+ result: (str, str) COMPLETED/FAILED, details
+ """
+ try:
+ db_nsr_update = {}
+ logging_text = "Task ns={} update ".format(nsr_id)
+ check_vnfr_count = len(self.db.get_list("vnfrs", {"nsr-id-ref": nsr_id}))
+ self.logger.info("check_vnfr_count {}".format(check_vnfr_count))
+ if check_vnfr_count > 1:
+ stage = ["", "", ""]
+ step = "Getting nslcmop from database"
+ self.logger.debug(
+ step + " after having waited for previous tasks to be completed"
+ )
+ # db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})
+ db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
+ db_vnfr = self.db.get_one("vnfrs", {"_id": vnf_instance_id})
+ member_vnf_index = db_vnfr["member-vnf-index-ref"]
+ """ db_vnfr = self.db.get_one(
+ "vnfrs", {"member-vnf-index-ref": member_vnf_index, "nsr-id-ref": nsr_id}) """
+
+ update_db_nslcmops = self.db.get_one("nslcmops", {"_id": nslcmop_id})
+ await self.terminate_vdus(
+ db_vnfr,
+ member_vnf_index,
+ db_nsr,
+ update_db_nslcmops,
+ stage,
+ logging_text,
+ )
- step = "Getting vnfr from database"
- db_vnfr = self.db.get_one(
- "vnfrs", {"member-vnf-index-ref": vnf_index, "nsr-id-ref": nsr_id}
- )
+ constituent_vnfr = db_nsr.get("constituent-vnfr-ref")
+ constituent_vnfr.remove(db_vnfr.get("_id"))
+ db_nsr_update["constituent-vnfr-ref"] = db_nsr.get(
+ "constituent-vnfr-ref"
+ )
+ self.update_db_2("nsrs", nsr_id, db_nsr_update)
+ self.db.del_one("vnfrs", {"_id": db_vnfr.get("_id")})
+ self.update_db_2("nsrs", nsr_id, db_nsr_update)
+ return "COMPLETED", "Done"
+ else:
+ step = "Terminate VNF Failed with"
+ raise LcmException(
+ "{} Cannot terminate the last VNF in this NS.".format(
+ vnf_instance_id
+ )
+ )
+ except (LcmException, asyncio.CancelledError):
+ raise
+ except Exception as e:
+ self.logger.debug("Error removing VNF {}".format(e))
+ return "FAILED", "Error removing VNF {}".format(e)
- vca_id = self.get_vca_id(db_vnfr, db_nsr)
+ async def _ns_redeploy_vnf(
+ self,
+ nsr_id,
+ nslcmop_id,
+ db_vnfd,
+ db_vnfr,
+ db_nsr,
+ ):
+ """This method updates and redeploys VNF instances
- step = "Getting vnfd from database"
- db_vnfd = self.db.get_one("vnfds", {"_id": db_vnfr["vnfd-id"]})
+ Args:
+ nsr_id: NS instance id
+ nslcmop_id: nslcmop id
+ db_vnfd: VNF descriptor
+ db_vnfr: VNF instance record
+ db_nsr: NS instance record
- base_folder = db_vnfd["_admin"]["storage"]
+ Returns:
+ result: (str, str) COMPLETED/FAILED, details
+ """
+ try:
+ count_index = 0
+ stage = ["", "", ""]
+ logging_text = "Task ns={} update ".format(nsr_id)
+ latest_vnfd_revision = db_vnfd["_admin"].get("revision")
+ member_vnf_index = db_vnfr["member-vnf-index-ref"]
+
+ # Terminate old VNF resources
+ update_db_nslcmops = self.db.get_one("nslcmops", {"_id": nslcmop_id})
+ await self.terminate_vdus(
+ db_vnfr,
+ member_vnf_index,
+ db_nsr,
+ update_db_nslcmops,
+ stage,
+ logging_text,
+ )
- step = "Getting scaling-group-descriptor"
- scaling_descriptor = find_in_list(
- get_scaling_aspect(db_vnfd),
- lambda scale_desc: scale_desc["name"] == scaling_group,
+ # old_vnfd_id = db_vnfr["vnfd-id"]
+ # new_db_vnfd = self.db.get_one("vnfds", {"_id": vnfd_id})
+ new_db_vnfd = db_vnfd
+ # new_vnfd_ref = new_db_vnfd["id"]
+ # new_vnfd_id = vnfd_id
+
+ # Create VDUR
+ new_vnfr_cp = []
+ for cp in new_db_vnfd.get("ext-cpd", ()):
+ vnf_cp = {
+ "name": cp.get("id"),
+ "connection-point-id": cp.get("int-cpd", {}).get("cpd"),
+ "connection-point-vdu-id": cp.get("int-cpd", {}).get("vdu-id"),
+ "id": cp.get("id"),
+ }
+ new_vnfr_cp.append(vnf_cp)
+ new_vdur = update_db_nslcmops["operationParams"]["newVdur"]
+ # new_vdur = self._create_vdur_descriptor_from_vnfd(db_nsd, db_vnfd, old_db_vnfd, vnfd_id, db_nsr, member_vnf_index)
+ # new_vnfr_update = {"vnfd-ref": new_vnfd_ref, "vnfd-id": new_vnfd_id, "connection-point": new_vnfr_cp, "vdur": new_vdur, "ip-address": ""}
+ new_vnfr_update = {
+ "revision": latest_vnfd_revision,
+ "connection-point": new_vnfr_cp,
+ "vdur": new_vdur,
+ "ip-address": "",
+ }
+ self.update_db_2("vnfrs", db_vnfr["_id"], new_vnfr_update)
+ updated_db_vnfr = self.db.get_one(
+ "vnfrs",
+ {"member-vnf-index-ref": member_vnf_index, "nsr-id-ref": nsr_id},
)
- if not scaling_descriptor:
- raise LcmException(
- "input parameter 'scaleByStepData':'scaling-group-descriptor':'{}' is not present "
- "at vnfd:scaling-group-descriptor".format(scaling_group)
+
+ # Instantiate new VNF resources
+ # update_db_nslcmops = self.db.get_one("nslcmops", {"_id": nslcmop_id})
+ vca_scaling_info = []
+ scaling_info = {"scaling_group_name": "vdu_autoscale", "vdu": [], "kdu": []}
+ scaling_info["scaling_direction"] = "OUT"
+ scaling_info["vdu-create"] = {}
+ scaling_info["kdu-create"] = {}
+ vdud_instantiate_list = db_vnfd["vdu"]
+ for index, vdud in enumerate(vdud_instantiate_list):
+ cloud_init_text = self._get_vdu_cloud_init_content(vdud, db_vnfd)
+ if cloud_init_text:
+ additional_params = (
+ self._get_vdu_additional_params(updated_db_vnfr, vdud["id"])
+ or {}
+ )
+ cloud_init_list = []
+ if cloud_init_text:
+ # TODO Information of its own ip is not available because db_vnfr is not updated.
+ additional_params["OSM"] = get_osm_params(
+ updated_db_vnfr, vdud["id"], 1
+ )
+ cloud_init_list.append(
+ self._parse_cloud_init(
+ cloud_init_text,
+ additional_params,
+ db_vnfd["id"],
+ vdud["id"],
+ )
+ )
+ vca_scaling_info.append(
+ {
+ "osm_vdu_id": vdud["id"],
+ "member-vnf-index": member_vnf_index,
+ "type": "create",
+ "vdu_index": count_index,
+ }
+ )
+ scaling_info["vdu-create"][vdud["id"]] = count_index
+ if self.ro_config.ng:
+ self.logger.debug(
+ "New Resources to be deployed: {}".format(scaling_info)
+ )
+ await self._scale_ng_ro(
+ logging_text,
+ db_nsr,
+ update_db_nslcmops,
+ updated_db_vnfr,
+ scaling_info,
+ stage,
)
+ return "COMPLETED", "Done"
+ except (LcmException, asyncio.CancelledError):
+ raise
+ except Exception as e:
+ self.logger.debug("Error updating VNF {}".format(e))
+ return "FAILED", "Error updating VNF {}".format(e)
- step = "Sending scale order to VIM"
- # TODO check if ns is in a proper status
- nb_scale_op = 0
- if not db_nsr["_admin"].get("scaling-group"):
- self.update_db_2(
- "nsrs",
- nsr_id,
- {
- "_admin.scaling-group": [
- {"name": scaling_group, "nb-scale-op": 0}
- ]
- },
+    async def _ns_charm_upgrade(
+        self,
+        ee_id,
+        charm_id,
+        charm_type,
+        path,
+        timeout: float = None,
+    ) -> (str, str):
+        """This method upgrades charms in VNF instances
+
+        Args:
+            ee_id: Execution environment id
+            path: Local path to the charm
+            charm_id: charm-id
+            charm_type: Charm type can be lxc-proxy-charm, native-charm or k8s-proxy-charm
+            timeout: (Float) Timeout for the ns update operation
+
+        Returns:
+            result: (str, str) COMPLETED/FAILED, details
+        """
+        try:
+            charm_type = charm_type or "lxc_proxy_charm"
+            output = await self.vca_map[charm_type].upgrade_charm(
+                ee_id=ee_id,
+                path=path,
+                charm_id=charm_id,
+                charm_type=charm_type,
+                timeout=timeout or self.timeout.ns_update,
+            )
+
+            if output:
+                return "COMPLETED", output
+
+            # upgrade_charm produced no output: report the failure explicitly here.
+            # The original fell through to a return that referenced the exception
+            # variable, which is unbound on this path (NameError).
+            return "FAILED", "Error upgrading charm {}: no output".format(path)
+
+        except (LcmException, asyncio.CancelledError):
+            raise
+
+        except Exception as e:
+            self.logger.debug("Error upgrading charm {}".format(path))
+            # NOTE: the error return must be built inside the handler — in Python 3
+            # the `except ... as e` target is deleted when the handler exits, so a
+            # return after the try block would raise NameError instead of reporting.
+            return "FAILED", "Error upgrading charm {}: {}".format(path, e)
+
+ async def update(self, nsr_id, nslcmop_id):
+ """Update NS according to different update types
+
+ This method performs upgrade of VNF instances then updates the revision
+ number in VNF record
+
+ Args:
+ nsr_id: Network service will be updated
+ nslcmop_id: ns lcm operation id
+
+ Returns:
+ It may raise DbException, LcmException, N2VCException, K8sException
+
+ """
+ # Try to lock HA task here
+ task_is_locked_by_me = self.lcm_tasks.lock_HA("ns", "nslcmops", nslcmop_id)
+ if not task_is_locked_by_me:
+ return
+
+ logging_text = "Task ns={} update={} ".format(nsr_id, nslcmop_id)
+ self.logger.debug(logging_text + "Enter")
+
+ # Set the required variables to be filled up later
+ db_nsr = None
+ db_nslcmop_update = {}
+ vnfr_update = {}
+ nslcmop_operation_state = None
+ db_nsr_update = {}
+ error_description_nslcmop = ""
+ exc = None
+ change_type = "updated"
+ detailed_status = ""
+ member_vnf_index = None
+
+ try:
+ # wait for any previous tasks in process
+ step = "Waiting for previous operations to terminate"
+ await self.lcm_tasks.waitfor_related_HA("ns", "nslcmops", nslcmop_id)
+ self._write_ns_status(
+ nsr_id=nsr_id,
+ ns_state=None,
+ current_operation="UPDATING",
+ current_operation_id=nslcmop_id,
+ )
+
+ step = "Getting nslcmop from database"
+ db_nslcmop = self.db.get_one(
+ "nslcmops", {"_id": nslcmop_id}, fail_on_empty=False
+ )
+ update_type = db_nslcmop["operationParams"]["updateType"]
+
+ step = "Getting nsr from database"
+ db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
+ old_operational_status = db_nsr["operational-status"]
+ db_nsr_update["operational-status"] = "updating"
+ self.update_db_2("nsrs", nsr_id, db_nsr_update)
+ nsr_deployed = db_nsr["_admin"].get("deployed")
+
+ if update_type == "CHANGE_VNFPKG":
+ # Get the input parameters given through update request
+ vnf_instance_id = db_nslcmop["operationParams"][
+ "changeVnfPackageData"
+ ].get("vnfInstanceId")
+
+ vnfd_id = db_nslcmop["operationParams"]["changeVnfPackageData"].get(
+ "vnfdId"
)
- admin_scale_index = 0
- else:
- for admin_scale_index, admin_scale_info in enumerate(
- db_nsr["_admin"]["scaling-group"]
- ):
- if admin_scale_info["name"] == scaling_group:
- nb_scale_op = admin_scale_info.get("nb-scale-op", 0)
- break
- else: # not found, set index one plus last element and add new entry with the name
- admin_scale_index += 1
- db_nsr_update[
- "_admin.scaling-group.{}.name".format(admin_scale_index)
- ] = scaling_group
+ timeout_seconds = db_nslcmop["operationParams"].get("timeout_ns_update")
- vca_scaling_info = []
- scaling_info = {"scaling_group_name": scaling_group, "vdu": [], "kdu": []}
- if scaling_type == "SCALE_OUT":
- if "aspect-delta-details" not in scaling_descriptor:
- raise LcmException(
- "Aspect delta details not fount in scaling descriptor {}".format(
- scaling_descriptor["name"]
- )
+ step = "Getting vnfr from database"
+ db_vnfr = self.db.get_one(
+ "vnfrs", {"_id": vnf_instance_id}, fail_on_empty=False
+ )
+
+ step = "Getting vnfds from database"
+ # Latest VNFD
+ latest_vnfd = self.db.get_one(
+ "vnfds", {"_id": vnfd_id}, fail_on_empty=False
+ )
+ latest_vnfd_revision = latest_vnfd["_admin"].get("revision")
+
+ # Current VNFD
+ current_vnf_revision = db_vnfr.get("revision", 1)
+ current_vnfd = self.db.get_one(
+ "vnfds_revisions",
+ {"_id": vnfd_id + ":" + str(current_vnf_revision)},
+ fail_on_empty=False,
+ )
+ # Charm artifact paths will be filled up later
+ (
+ current_charm_artifact_path,
+ target_charm_artifact_path,
+ charm_artifact_paths,
+ helm_artifacts,
+ ) = ([], [], [], [])
+
+ step = "Checking if revision has changed in VNFD"
+ if current_vnf_revision != latest_vnfd_revision:
+ change_type = "policy_updated"
+
+ # There is new revision of VNFD, update operation is required
+ current_vnfd_path = vnfd_id + ":" + str(current_vnf_revision)
+ latest_vnfd_path = vnfd_id + ":" + str(latest_vnfd_revision)
+
+ step = "Removing the VNFD packages if they exist in the local path"
+ shutil.rmtree(self.fs.path + current_vnfd_path, ignore_errors=True)
+ shutil.rmtree(self.fs.path + latest_vnfd_path, ignore_errors=True)
+
+ step = "Get the VNFD packages from FSMongo"
+ self.fs.sync(from_path=latest_vnfd_path)
+ self.fs.sync(from_path=current_vnfd_path)
+
+ step = (
+ "Get the charm-type, charm-id, ee-id if there is deployed VCA"
)
- # count if max-instance-count is reached
- deltas = scaling_descriptor.get("aspect-delta-details")["deltas"]
+ current_base_folder = current_vnfd["_admin"]["storage"]
+ latest_base_folder = latest_vnfd["_admin"]["storage"]
- scaling_info["scaling_direction"] = "OUT"
- scaling_info["vdu-create"] = {}
- scaling_info["kdu-create"] = {}
- for delta in deltas:
- for vdu_delta in delta.get("vdu-delta", {}):
- vdud = get_vdu(db_vnfd, vdu_delta["id"])
- # vdu_index also provides the number of instance of the targeted vdu
- vdu_count = vdu_index = get_vdur_index(db_vnfr, vdu_delta)
- cloud_init_text = self._get_vdu_cloud_init_content(
- vdud, db_vnfd
- )
- if cloud_init_text:
- additional_params = (
- self._get_vdu_additional_params(db_vnfr, vdud["id"])
- or {}
- )
- cloud_init_list = []
+ for vca_index, vca_deployed in enumerate(
+ get_iterable(nsr_deployed, "VCA")
+ ):
+ vnf_index = db_vnfr.get("member-vnf-index-ref")
- vdu_profile = get_vdu_profile(db_vnfd, vdu_delta["id"])
- max_instance_count = 10
- if vdu_profile and "max-number-of-instances" in vdu_profile:
- max_instance_count = vdu_profile.get(
- "max-number-of-instances", 10
- )
+ # Getting charm-id and charm-type
+ if vca_deployed.get("member-vnf-index") == vnf_index:
+ vca_id = self.get_vca_id(db_vnfr, db_nsr)
+ vca_type = vca_deployed.get("type")
+ vdu_count_index = vca_deployed.get("vdu_count_index")
- default_instance_num = get_number_of_instances(
- db_vnfd, vdud["id"]
- )
- instances_number = vdu_delta.get("number-of-instances", 1)
- nb_scale_op += instances_number
+ # Getting ee-id
+ ee_id = vca_deployed.get("ee_id")
- new_instance_count = nb_scale_op + default_instance_num
- # Control if new count is over max and vdu count is less than max.
- # Then assign new instance count
- if new_instance_count > max_instance_count > vdu_count:
- instances_number = new_instance_count - max_instance_count
- else:
- instances_number = instances_number
+ step = "Getting descriptor config"
+ if current_vnfd.get("kdu"):
+ search_key = "kdu_name"
+ else:
+ search_key = "vnfd_id"
- if new_instance_count > max_instance_count:
- raise LcmException(
- "reached the limit of {} (max-instance-count) "
- "scaling-out operations for the "
- "scaling-group-descriptor '{}'".format(
- nb_scale_op, scaling_group
- )
+ entity_id = vca_deployed.get(search_key)
+
+ descriptor_config = get_configuration(
+ current_vnfd, entity_id
)
- for x in range(vdu_delta.get("number-of-instances", 1)):
- if cloud_init_text:
- # TODO Information of its own ip is not available because db_vnfr is not updated.
- additional_params["OSM"] = get_osm_params(
- db_vnfr, vdu_delta["id"], vdu_index + x
+
+ if "execution-environment-list" in descriptor_config:
+ ee_list = descriptor_config.get(
+ "execution-environment-list", []
)
- cloud_init_list.append(
- self._parse_cloud_init(
- cloud_init_text,
- additional_params,
- db_vnfd["id"],
- vdud["id"],
+ else:
+ ee_list = []
+
+ # There could be several charm used in the same VNF
+ for ee_item in ee_list:
+ if ee_item.get("juju"):
+ step = "Getting charm name"
+ charm_name = ee_item["juju"].get("charm")
+
+ step = "Setting Charm artifact paths"
+ current_charm_artifact_path.append(
+ get_charm_artifact_path(
+ current_base_folder,
+ charm_name,
+ vca_type,
+ current_vnf_revision,
+ )
+ )
+ target_charm_artifact_path.append(
+ get_charm_artifact_path(
+ latest_base_folder,
+ charm_name,
+ vca_type,
+ latest_vnfd_revision,
+ )
+ )
+ elif ee_item.get("helm-chart"):
+ # add chart to list and all parameters
+ step = "Getting helm chart name"
+ chart_name = ee_item.get("helm-chart")
+ if (
+ ee_item.get("helm-version")
+ and ee_item.get("helm-version") == "v2"
+ ):
+ vca_type = "helm"
+ else:
+ vca_type = "helm-v3"
+ step = "Setting Helm chart artifact paths"
+
+ helm_artifacts.append(
+ {
+ "current_artifact_path": get_charm_artifact_path(
+ current_base_folder,
+ chart_name,
+ vca_type,
+ current_vnf_revision,
+ ),
+ "target_artifact_path": get_charm_artifact_path(
+ latest_base_folder,
+ chart_name,
+ vca_type,
+ latest_vnfd_revision,
+ ),
+ "ee_id": ee_id,
+ "vca_index": vca_index,
+ "vdu_index": vdu_count_index,
+ }
+ )
+
+ charm_artifact_paths = zip(
+ current_charm_artifact_path, target_charm_artifact_path
+ )
+
+ step = "Checking if software version has changed in VNFD"
+ if find_software_version(current_vnfd) != find_software_version(
+ latest_vnfd
+ ):
+ step = "Checking if existing VNF has charm"
+ for current_charm_path, target_charm_path in list(
+ charm_artifact_paths
+ ):
+ if current_charm_path:
+ raise LcmException(
+ "Software version change is not supported as VNF instance {} has charm.".format(
+ vnf_instance_id
)
)
- vca_scaling_info.append(
- {
- "osm_vdu_id": vdu_delta["id"],
- "member-vnf-index": vnf_index,
- "type": "create",
- "vdu_index": vdu_index + x,
- }
- )
- scaling_info["vdu-create"][vdu_delta["id"]] = instances_number
- for kdu_delta in delta.get("kdu-resource-delta", {}):
- kdu_profile = get_kdu_profile(db_vnfd, kdu_delta["id"])
- kdu_name = kdu_profile["kdu-name"]
- resource_name = kdu_profile["resource-name"]
- # Might have different kdus in the same delta
- # Should have list for each kdu
- if not scaling_info["kdu-create"].get(kdu_name, None):
- scaling_info["kdu-create"][kdu_name] = []
+ # There is no change in the charm package, then redeploy the VNF
+ # based on new descriptor
+ step = "Redeploying VNF"
+ member_vnf_index = db_vnfr["member-vnf-index-ref"]
+ (result, detailed_status) = await self._ns_redeploy_vnf(
+ nsr_id, nslcmop_id, latest_vnfd, db_vnfr, db_nsr
+ )
+ if result == "FAILED":
+ nslcmop_operation_state = result
+ error_description_nslcmop = detailed_status
+ db_nslcmop_update["detailed-status"] = detailed_status
+ self.logger.debug(
+ logging_text
+ + " step {} Done with result {} {}".format(
+ step, nslcmop_operation_state, detailed_status
+ )
+ )
- kdur = get_kdur(db_vnfr, kdu_name)
- if kdur.get("helm-chart"):
- k8s_cluster_type = "helm-chart-v3"
- self.logger.debug("kdur: {}".format(kdur))
+ else:
+ step = "Checking if any charm package has changed or not"
+ for current_charm_path, target_charm_path in list(
+ charm_artifact_paths
+ ):
if (
- kdur.get("helm-version")
- and kdur.get("helm-version") == "v2"
+ current_charm_path
+ and target_charm_path
+ and self.check_charm_hash_changed(
+ current_charm_path, target_charm_path
+ )
):
- k8s_cluster_type = "helm-chart"
- raise NotImplementedError
- elif kdur.get("juju-bundle"):
- k8s_cluster_type = "juju-bundle"
- else:
- raise LcmException(
- "kdu type for kdu='{}.{}' is neither helm-chart nor "
- "juju-bundle. Maybe an old NBI version is running".format(
- db_vnfr["member-vnf-index-ref"], kdu_name
+ step = "Checking whether VNF uses juju bundle"
+ if check_juju_bundle_existence(current_vnfd):
+ raise LcmException(
+ "Charm upgrade is not supported for the instance which"
+ " uses juju-bundle: {}".format(
+ check_juju_bundle_existence(current_vnfd)
+ )
+ )
+
+ step = "Upgrading Charm"
+ (
+ result,
+ detailed_status,
+ ) = await self._ns_charm_upgrade(
+ ee_id=ee_id,
+ charm_id=vca_id,
+ charm_type=vca_type,
+ path=self.fs.path + target_charm_path,
+ timeout=timeout_seconds,
)
- )
- max_instance_count = 10
- if kdu_profile and "max-number-of-instances" in kdu_profile:
- max_instance_count = kdu_profile.get(
- "max-number-of-instances", 10
- )
+ if result == "FAILED":
+ nslcmop_operation_state = result
+ error_description_nslcmop = detailed_status
- nb_scale_op += kdu_delta.get("number-of-instances", 1)
- deployed_kdu, _ = get_deployed_kdu(
- nsr_deployed, kdu_name, vnf_index
- )
- if deployed_kdu is None:
- raise LcmException(
- "KDU '{}' for vnf '{}' not deployed".format(
- kdu_name, vnf_index
+ db_nslcmop_update["detailed-status"] = detailed_status
+ self.logger.debug(
+ logging_text
+ + " step {} Done with result {} {}".format(
+ step, nslcmop_operation_state, detailed_status
+ )
)
- )
- kdu_instance = deployed_kdu.get("kdu-instance")
- instance_num = await self.k8scluster_map[
- k8s_cluster_type
- ].get_scale_count(resource_name, kdu_instance, vca_id=vca_id)
- kdu_replica_count = instance_num + kdu_delta.get(
- "number-of-instances", 1
- )
- # Control if new count is over max and instance_num is less than max.
- # Then assign max instance number to kdu replica count
- if kdu_replica_count > max_instance_count > instance_num:
- kdu_replica_count = max_instance_count
- if kdu_replica_count > max_instance_count:
- raise LcmException(
- "reached the limit of {} (max-instance-count) "
- "scaling-out operations for the "
- "scaling-group-descriptor '{}'".format(
- instance_num, scaling_group
- )
- )
-
- for x in range(kdu_delta.get("number-of-instances", 1)):
- vca_scaling_info.append(
- {
- "osm_kdu_id": kdu_name,
- "member-vnf-index": vnf_index,
- "type": "create",
- "kdu_index": instance_num + x - 1,
- }
+ step = "Updating policies"
+ member_vnf_index = db_vnfr["member-vnf-index-ref"]
+ result = "COMPLETED"
+ detailed_status = "Done"
+ db_nslcmop_update["detailed-status"] = "Done"
+
+ # helm base EE
+ for item in helm_artifacts:
+ if not (
+ item["current_artifact_path"]
+ and item["target_artifact_path"]
+ and self.check_charm_hash_changed(
+ item["current_artifact_path"],
+ item["target_artifact_path"],
)
- scaling_info["kdu-create"][kdu_name].append(
- {
- "member-vnf-index": vnf_index,
- "type": "create",
- "k8s-cluster-type": k8s_cluster_type,
- "resource-name": resource_name,
- "scale": kdu_replica_count,
- }
+ ):
+ continue
+ db_update_entry = "_admin.deployed.VCA.{}.".format(
+ item["vca_index"]
)
- elif scaling_type == "SCALE_IN":
- deltas = scaling_descriptor.get("aspect-delta-details")["deltas"]
-
- scaling_info["scaling_direction"] = "IN"
- scaling_info["vdu-delete"] = {}
- scaling_info["kdu-delete"] = {}
-
- for delta in deltas:
- for vdu_delta in delta.get("vdu-delta", {}):
- vdu_count = vdu_index = get_vdur_index(db_vnfr, vdu_delta)
- min_instance_count = 0
- vdu_profile = get_vdu_profile(db_vnfd, vdu_delta["id"])
- if vdu_profile and "min-number-of-instances" in vdu_profile:
- min_instance_count = vdu_profile["min-number-of-instances"]
-
- default_instance_num = get_number_of_instances(
- db_vnfd, vdu_delta["id"]
+ vnfr_id = db_vnfr["_id"]
+ osm_config = {"osm": {"ns_id": nsr_id, "vnf_id": vnfr_id}}
+ db_dict = {
+ "collection": "nsrs",
+ "filter": {"_id": nsr_id},
+ "path": db_update_entry,
+ }
+ vca_type, namespace, helm_id = get_ee_id_parts(item["ee_id"])
+ await self.vca_map[vca_type].upgrade_execution_environment(
+ namespace=namespace,
+ helm_id=helm_id,
+ db_dict=db_dict,
+ config=osm_config,
+ artifact_path=item["target_artifact_path"],
+ vca_type=vca_type,
)
- instance_num = vdu_delta.get("number-of-instances", 1)
- nb_scale_op -= instance_num
-
- new_instance_count = nb_scale_op + default_instance_num
-
- if new_instance_count < min_instance_count < vdu_count:
- instances_number = min_instance_count - new_instance_count
- else:
- instances_number = instance_num
-
- if new_instance_count < min_instance_count:
- raise LcmException(
- "reached the limit of {} (min-instance-count) scaling-in operations for the "
- "scaling-group-descriptor '{}'".format(
- nb_scale_op, scaling_group
- )
+ vnf_id = db_vnfr.get("vnfd-ref")
+ config_descriptor = get_configuration(latest_vnfd, vnf_id)
+ self.logger.debug("get ssh key block")
+ rw_mgmt_ip = None
+ if deep_get(
+ config_descriptor,
+ ("config-access", "ssh-access", "required"),
+ ):
+ # Needed to inject a ssh key
+ user = deep_get(
+ config_descriptor,
+ ("config-access", "ssh-access", "default-user"),
)
- for x in range(vdu_delta.get("number-of-instances", 1)):
- vca_scaling_info.append(
- {
- "osm_vdu_id": vdu_delta["id"],
- "member-vnf-index": vnf_index,
- "type": "delete",
- "vdu_index": vdu_index - 1 - x,
- }
+ step = (
+ "Install configuration Software, getting public ssh key"
+ )
+ pub_key = await self.vca_map[
+ vca_type
+ ].get_ee_ssh_public__key(
+ ee_id=ee_id, db_dict=db_dict, vca_id=vca_id
)
- scaling_info["vdu-delete"][vdu_delta["id"]] = instances_number
- for kdu_delta in delta.get("kdu-resource-delta", {}):
- kdu_profile = get_kdu_profile(db_vnfd, kdu_delta["id"])
- kdu_name = kdu_profile["kdu-name"]
- resource_name = kdu_profile["resource-name"]
-
- if not scaling_info["kdu-delete"].get(kdu_name, None):
- scaling_info["kdu-delete"][kdu_name] = []
- kdur = get_kdur(db_vnfr, kdu_name)
- if kdur.get("helm-chart"):
- k8s_cluster_type = "helm-chart-v3"
- self.logger.debug("kdur: {}".format(kdur))
- if (
- kdur.get("helm-version")
- and kdur.get("helm-version") == "v2"
- ):
- k8s_cluster_type = "helm-chart"
- raise NotImplementedError
- elif kdur.get("juju-bundle"):
- k8s_cluster_type = "juju-bundle"
- else:
- raise LcmException(
- "kdu type for kdu='{}.{}' is neither helm-chart nor "
- "juju-bundle. Maybe an old NBI version is running".format(
- db_vnfr["member-vnf-index-ref"], kdur["kdu-name"]
+ step = (
+ "Insert public key into VM user={} ssh_key={}".format(
+ user, pub_key
)
)
+ self.logger.debug(logging_text + step)
- min_instance_count = 0
- if kdu_profile and "min-number-of-instances" in kdu_profile:
- min_instance_count = kdu_profile["min-number-of-instances"]
+ # wait for RO (ip-address) Insert pub_key into VM
+ rw_mgmt_ip = await self.wait_vm_up_insert_key_ro(
+ logging_text,
+ nsr_id,
+ vnfr_id,
+ None,
+ item["vdu_index"],
+ user=user,
+ pub_key=pub_key,
+ )
- nb_scale_op -= kdu_delta.get("number-of-instances", 1)
- deployed_kdu, _ = get_deployed_kdu(
- nsr_deployed, kdu_name, vnf_index
+ initial_config_primitive_list = config_descriptor.get(
+ "initial-config-primitive"
)
- if deployed_kdu is None:
- raise LcmException(
- "KDU '{}' for vnf '{}' not deployed".format(
- kdu_name, vnf_index
- )
- )
- kdu_instance = deployed_kdu.get("kdu-instance")
- instance_num = await self.k8scluster_map[
- k8s_cluster_type
- ].get_scale_count(resource_name, kdu_instance, vca_id=vca_id)
- kdu_replica_count = instance_num - kdu_delta.get(
- "number-of-instances", 1
+ config_primitive = next(
+ (
+ p
+ for p in initial_config_primitive_list
+ if p["name"] == "config"
+ ),
+ None,
)
+ if not config_primitive:
+ continue
- if kdu_replica_count < min_instance_count < instance_num:
- kdu_replica_count = min_instance_count
- if kdu_replica_count < min_instance_count:
- raise LcmException(
- "reached the limit of {} (min-instance-count) scaling-in operations for the "
- "scaling-group-descriptor '{}'".format(
- instance_num, scaling_group
+ deploy_params = {"OSM": get_osm_params(db_vnfr)}
+ if rw_mgmt_ip:
+ deploy_params["rw_mgmt_ip"] = rw_mgmt_ip
+ if db_vnfr.get("additionalParamsForVnf"):
+ deploy_params.update(
+ parse_yaml_strings(
+ db_vnfr["additionalParamsForVnf"].copy()
)
)
-
- for x in range(kdu_delta.get("number-of-instances", 1)):
- vca_scaling_info.append(
- {
- "osm_kdu_id": kdu_name,
- "member-vnf-index": vnf_index,
- "type": "delete",
- "kdu_index": instance_num - x - 1,
- }
- )
- scaling_info["kdu-delete"][kdu_name].append(
- {
- "member-vnf-index": vnf_index,
- "type": "delete",
- "k8s-cluster-type": k8s_cluster_type,
- "resource-name": resource_name,
- "scale": kdu_replica_count,
- }
+ primitive_params_ = self._map_primitive_params(
+ config_primitive, {}, deploy_params
)
- # update VDU_SCALING_INFO with the VDUs to delete ip_addresses
- vdu_delete = copy(scaling_info.get("vdu-delete"))
- if scaling_info["scaling_direction"] == "IN":
- for vdur in reversed(db_vnfr["vdur"]):
- if vdu_delete.get(vdur["vdu-id-ref"]):
- vdu_delete[vdur["vdu-id-ref"]] -= 1
- scaling_info["vdu"].append(
- {
- "name": vdur.get("name") or vdur.get("vdu-name"),
- "vdu_id": vdur["vdu-id-ref"],
- "interface": [],
- }
+ step = "execute primitive '{}' params '{}'".format(
+ config_primitive["name"], primitive_params_
)
- for interface in vdur["interfaces"]:
- scaling_info["vdu"][-1]["interface"].append(
- {
- "name": interface["name"],
- "ip_address": interface["ip-address"],
- "mac_address": interface.get("mac-address"),
- }
- )
- # vdu_delete = vdu_scaling_info.pop("vdu-delete")
-
- # PRE-SCALE BEGIN
- step = "Executing pre-scale vnf-config-primitive"
- if scaling_descriptor.get("scaling-config-action"):
- for scaling_config_action in scaling_descriptor[
- "scaling-config-action"
- ]:
- if (
- scaling_config_action.get("trigger") == "pre-scale-in"
- and scaling_type == "SCALE_IN"
- ) or (
- scaling_config_action.get("trigger") == "pre-scale-out"
- and scaling_type == "SCALE_OUT"
- ):
- vnf_config_primitive = scaling_config_action[
- "vnf-config-primitive-name-ref"
- ]
- step = db_nslcmop_update[
- "detailed-status"
- ] = "executing pre-scale scaling-config-action '{}'".format(
- vnf_config_primitive
+ self.logger.debug(logging_text + step)
+ await self.vca_map[vca_type].exec_primitive(
+ ee_id=ee_id,
+ primitive_name=config_primitive["name"],
+ params_dict=primitive_params_,
+ db_dict=db_dict,
+ vca_id=vca_id,
+ vca_type=vca_type,
)
- # look for primitive
- for config_primitive in (
- get_configuration(db_vnfd, db_vnfd["id"]) or {}
- ).get("config-primitive", ()):
- if config_primitive["name"] == vnf_config_primitive:
- break
- else:
- raise LcmException(
- "Invalid vnfd descriptor at scaling-group-descriptor[name='{}']:scaling-config-action"
- "[vnf-config-primitive-name-ref='{}'] does not match any vnf-configuration:config-"
- "primitive".format(scaling_group, vnf_config_primitive)
- )
+ step = "Updating policies"
+ member_vnf_index = db_vnfr["member-vnf-index-ref"]
+ detailed_status = "Done"
+ db_nslcmop_update["detailed-status"] = "Done"
- vnfr_params = {"VDU_SCALE_INFO": scaling_info}
- if db_vnfr.get("additionalParamsForVnf"):
- vnfr_params.update(db_vnfr["additionalParamsForVnf"])
+ # If nslcmop_operation_state is None, so any operation is not failed.
+ if not nslcmop_operation_state:
+ nslcmop_operation_state = "COMPLETED"
- scale_process = "VCA"
- db_nsr_update["config-status"] = "configuring pre-scaling"
- primitive_params = self._map_primitive_params(
- config_primitive, {}, vnfr_params
- )
+ # If update CHANGE_VNFPKG nslcmop_operation is successful
+ # vnf revision need to be updated
+ vnfr_update["revision"] = latest_vnfd_revision
+ self.update_db_2("vnfrs", db_vnfr["_id"], vnfr_update)
- # Pre-scale retry check: Check if this sub-operation has been executed before
- op_index = self._check_or_add_scale_suboperation(
- db_nslcmop,
- nslcmop_id,
- vnf_index,
- vnf_config_primitive,
- primitive_params,
- "PRE-SCALE",
+ self.logger.debug(
+ logging_text
+ + " task Done with result {} {}".format(
+ nslcmop_operation_state, detailed_status
)
- if op_index == self.SUBOPERATION_STATUS_SKIP:
- # Skip sub-operation
- result = "COMPLETED"
- result_detail = "Done"
- self.logger.debug(
- logging_text
- + "vnf_config_primitive={} Skipped sub-operation, result {} {}".format(
- vnf_config_primitive, result, result_detail
- )
- )
- else:
- if op_index == self.SUBOPERATION_STATUS_NEW:
- # New sub-operation: Get index of this sub-operation
- op_index = (
- len(db_nslcmop.get("_admin", {}).get("operations"))
- - 1
- )
- self.logger.debug(
- logging_text
- + "vnf_config_primitive={} New sub-operation".format(
- vnf_config_primitive
- )
- )
- else:
- # retry: Get registered params for this existing sub-operation
- op = db_nslcmop.get("_admin", {}).get("operations", [])[
- op_index
- ]
- vnf_index = op.get("member_vnf_index")
- vnf_config_primitive = op.get("primitive")
- primitive_params = op.get("primitive_params")
- self.logger.debug(
- logging_text
- + "vnf_config_primitive={} Sub-operation retry".format(
- vnf_config_primitive
- )
- )
- # Execute the primitive, either with new (first-time) or registered (reintent) args
- ee_descriptor_id = config_primitive.get(
- "execution-environment-ref"
- )
- primitive_name = config_primitive.get(
- "execution-environment-primitive", vnf_config_primitive
- )
- ee_id, vca_type = self._look_for_deployed_vca(
- nsr_deployed["VCA"],
- member_vnf_index=vnf_index,
- vdu_id=None,
- vdu_count_index=None,
- ee_descriptor_id=ee_descriptor_id,
- )
- result, result_detail = await self._ns_execute_primitive(
- ee_id,
- primitive_name,
- primitive_params,
- vca_type=vca_type,
- vca_id=vca_id,
- )
- self.logger.debug(
- logging_text
- + "vnf_config_primitive={} Done with result {} {}".format(
- vnf_config_primitive, result, result_detail
- )
- )
- # Update operationState = COMPLETED | FAILED
- self._update_suboperation_status(
- db_nslcmop, op_index, result, result_detail
- )
+ )
+ elif update_type == "REMOVE_VNF":
+ # This part is included in https://osm.etsi.org/gerrit/11876
+ vnf_instance_id = db_nslcmop["operationParams"]["removeVnfInstanceId"]
+ db_vnfr = self.db.get_one("vnfrs", {"_id": vnf_instance_id})
+ member_vnf_index = db_vnfr["member-vnf-index-ref"]
+ step = "Removing VNF"
+ (result, detailed_status) = await self.remove_vnf(
+ nsr_id, nslcmop_id, vnf_instance_id
+ )
+ if result == "FAILED":
+ nslcmop_operation_state = result
+ error_description_nslcmop = detailed_status
+ db_nslcmop_update["detailed-status"] = detailed_status
+ change_type = "vnf_terminated"
+ if not nslcmop_operation_state:
+ nslcmop_operation_state = "COMPLETED"
+ self.logger.debug(
+ logging_text
+ + " task Done with result {} {}".format(
+ nslcmop_operation_state, detailed_status
+ )
+ )
- if result == "FAILED":
- raise LcmException(result_detail)
- db_nsr_update["config-status"] = old_config_status
- scale_process = None
- # PRE-SCALE END
+ elif update_type == "OPERATE_VNF":
+ vnf_id = db_nslcmop["operationParams"]["operateVnfData"][
+ "vnfInstanceId"
+ ]
+ operation_type = db_nslcmop["operationParams"]["operateVnfData"][
+ "changeStateTo"
+ ]
+ additional_param = db_nslcmop["operationParams"]["operateVnfData"][
+ "additionalParam"
+ ]
+ (result, detailed_status) = await self.rebuild_start_stop(
+ nsr_id, nslcmop_id, vnf_id, additional_param, operation_type
+ )
+ if result == "FAILED":
+ nslcmop_operation_state = result
+ error_description_nslcmop = detailed_status
+ db_nslcmop_update["detailed-status"] = detailed_status
+ if not nslcmop_operation_state:
+ nslcmop_operation_state = "COMPLETED"
+ self.logger.debug(
+ logging_text
+ + " task Done with result {} {}".format(
+ nslcmop_operation_state, detailed_status
+ )
+ )
- db_nsr_update[
- "_admin.scaling-group.{}.nb-scale-op".format(admin_scale_index)
- ] = nb_scale_op
- db_nsr_update[
- "_admin.scaling-group.{}.time".format(admin_scale_index)
- ] = time()
+ # If nslcmop_operation_state is None, so any operation is not failed.
+ # All operations are executed in overall.
+ if not nslcmop_operation_state:
+ nslcmop_operation_state = "COMPLETED"
+ db_nsr_update["operational-status"] = old_operational_status
- # SCALE-IN VCA - BEGIN
- if vca_scaling_info:
- step = db_nslcmop_update[
+ except (DbException, LcmException, N2VCException, K8sException) as e:
+ self.logger.error(logging_text + "Exit Exception {}".format(e))
+ exc = e
+ except asyncio.CancelledError:
+ self.logger.error(
+ logging_text + "Cancelled Exception while '{}'".format(step)
+ )
+ exc = "Operation was cancelled"
+ except asyncio.TimeoutError:
+ self.logger.error(logging_text + "Timeout while '{}'".format(step))
+ exc = "Timeout"
+ except Exception as e:
+ exc = traceback.format_exc()
+ self.logger.critical(
+ logging_text + "Exit Exception {} {}".format(type(e).__name__, e),
+ exc_info=True,
+ )
+ finally:
+ if exc:
+ db_nslcmop_update[
"detailed-status"
- ] = "Deleting the execution environments"
- scale_process = "VCA"
- for vca_info in vca_scaling_info:
- if vca_info["type"] == "delete":
- member_vnf_index = str(vca_info["member-vnf-index"])
- self.logger.debug(
- logging_text + "vdu info: {}".format(vca_info)
+ ] = (
+ detailed_status
+ ) = error_description_nslcmop = "FAILED {}: {}".format(step, exc)
+ nslcmop_operation_state = "FAILED"
+ db_nsr_update["operational-status"] = old_operational_status
+ if db_nsr:
+ self._write_ns_status(
+ nsr_id=nsr_id,
+ ns_state=db_nsr["nsState"],
+ current_operation="IDLE",
+ current_operation_id=None,
+ other_update=db_nsr_update,
+ )
+
+ self._write_op_status(
+ op_id=nslcmop_id,
+ stage="",
+ error_message=error_description_nslcmop,
+ operation_state=nslcmop_operation_state,
+ other_update=db_nslcmop_update,
+ )
+
+ if nslcmop_operation_state:
+ try:
+ msg = {
+ "nsr_id": nsr_id,
+ "nslcmop_id": nslcmop_id,
+ "operationState": nslcmop_operation_state,
+ }
+ if (
+ change_type in ("vnf_terminated", "policy_updated")
+ and member_vnf_index
+ ):
+ msg.update({"vnf_member_index": member_vnf_index})
+ await self.msg.aiowrite("ns", change_type, msg, loop=self.loop)
+ except Exception as e:
+ self.logger.error(
+ logging_text + "kafka_write notification Exception {}".format(e)
+ )
+ self.logger.debug(logging_text + "Exit")
+ self.lcm_tasks.remove("ns", nsr_id, nslcmop_id, "ns_update")
+ return nslcmop_operation_state, detailed_status
+
+ async def scale(self, nsr_id, nslcmop_id):
+ # Try to lock HA task here
+ task_is_locked_by_me = self.lcm_tasks.lock_HA("ns", "nslcmops", nslcmop_id)
+ if not task_is_locked_by_me:
+ return
+
+ logging_text = "Task ns={} scale={} ".format(nsr_id, nslcmop_id)
+ stage = ["", "", ""]
+ tasks_dict_info = {}
+ # ^ stage, step, VIM progress
+ self.logger.debug(logging_text + "Enter")
+ # get all needed from database
+ db_nsr = None
+ db_nslcmop_update = {}
+ db_nsr_update = {}
+ exc = None
+ # in case of error, indicates what part of scale was failed to put nsr at error status
+ scale_process = None
+ old_operational_status = ""
+ old_config_status = ""
+ nsi_id = None
+ try:
+ # wait for any previous tasks in process
+ step = "Waiting for previous operations to terminate"
+ await self.lcm_tasks.waitfor_related_HA("ns", "nslcmops", nslcmop_id)
+ self._write_ns_status(
+ nsr_id=nsr_id,
+ ns_state=None,
+ current_operation="SCALING",
+ current_operation_id=nslcmop_id,
+ )
+
+ step = "Getting nslcmop from database"
+ self.logger.debug(
+ step + " after having waited for previous tasks to be completed"
+ )
+ db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})
+
+ step = "Getting nsr from database"
+ db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
+ old_operational_status = db_nsr["operational-status"]
+ old_config_status = db_nsr["config-status"]
+
+ step = "Parsing scaling parameters"
+ db_nsr_update["operational-status"] = "scaling"
+ self.update_db_2("nsrs", nsr_id, db_nsr_update)
+ nsr_deployed = db_nsr["_admin"].get("deployed")
+
+ vnf_index = db_nslcmop["operationParams"]["scaleVnfData"][
+ "scaleByStepData"
+ ]["member-vnf-index"]
+ scaling_group = db_nslcmop["operationParams"]["scaleVnfData"][
+ "scaleByStepData"
+ ]["scaling-group-descriptor"]
+ scaling_type = db_nslcmop["operationParams"]["scaleVnfData"]["scaleVnfType"]
+ # for backward compatibility
+ if nsr_deployed and isinstance(nsr_deployed.get("VCA"), dict):
+ nsr_deployed["VCA"] = list(nsr_deployed["VCA"].values())
+ db_nsr_update["_admin.deployed.VCA"] = nsr_deployed["VCA"]
+ self.update_db_2("nsrs", nsr_id, db_nsr_update)
+
+ step = "Getting vnfr from database"
+ db_vnfr = self.db.get_one(
+ "vnfrs", {"member-vnf-index-ref": vnf_index, "nsr-id-ref": nsr_id}
+ )
+
+ vca_id = self.get_vca_id(db_vnfr, db_nsr)
+
+ step = "Getting vnfd from database"
+ db_vnfd = self.db.get_one("vnfds", {"_id": db_vnfr["vnfd-id"]})
+
+ base_folder = db_vnfd["_admin"]["storage"]
+
+ step = "Getting scaling-group-descriptor"
+ scaling_descriptor = find_in_list(
+ get_scaling_aspect(db_vnfd),
+ lambda scale_desc: scale_desc["name"] == scaling_group,
+ )
+ if not scaling_descriptor:
+ raise LcmException(
+ "input parameter 'scaleByStepData':'scaling-group-descriptor':'{}' is not present "
+ "at vnfd:scaling-group-descriptor".format(scaling_group)
+ )
+
+ step = "Sending scale order to VIM"
+ # TODO check if ns is in a proper status
+ nb_scale_op = 0
+ if not db_nsr["_admin"].get("scaling-group"):
+ self.update_db_2(
+ "nsrs",
+ nsr_id,
+ {
+ "_admin.scaling-group": [
+ {"name": scaling_group, "nb-scale-op": 0}
+ ]
+ },
+ )
+ admin_scale_index = 0
+ else:
+ for admin_scale_index, admin_scale_info in enumerate(
+ db_nsr["_admin"]["scaling-group"]
+ ):
+ if admin_scale_info["name"] == scaling_group:
+ nb_scale_op = admin_scale_info.get("nb-scale-op", 0)
+ break
+ else: # not found, set index one plus last element and add new entry with the name
+ admin_scale_index += 1
+ db_nsr_update[
+ "_admin.scaling-group.{}.name".format(admin_scale_index)
+ ] = scaling_group
+
+ vca_scaling_info = []
+ scaling_info = {"scaling_group_name": scaling_group, "vdu": [], "kdu": []}
+ if scaling_type == "SCALE_OUT":
+ if "aspect-delta-details" not in scaling_descriptor:
+ raise LcmException(
+ "Aspect delta details not fount in scaling descriptor {}".format(
+ scaling_descriptor["name"]
)
- if vca_info.get("osm_vdu_id"):
- vdu_id = vca_info["osm_vdu_id"]
- vdu_index = int(vca_info["vdu_index"])
- stage[
- 1
- ] = "Scaling member_vnf_index={}, vdu_id={}, vdu_index={} ".format(
- member_vnf_index, vdu_id, vdu_index
+ )
+ # count if max-instance-count is reached
+ deltas = scaling_descriptor.get("aspect-delta-details")["deltas"]
+
+ scaling_info["scaling_direction"] = "OUT"
+ scaling_info["vdu-create"] = {}
+ scaling_info["kdu-create"] = {}
+ for delta in deltas:
+ for vdu_delta in delta.get("vdu-delta", {}):
+ vdud = get_vdu(db_vnfd, vdu_delta["id"])
+ # vdu_index also provides the number of instance of the targeted vdu
+ vdu_count = vdu_index = get_vdur_index(db_vnfr, vdu_delta)
+ cloud_init_text = self._get_vdu_cloud_init_content(
+ vdud, db_vnfd
+ )
+ if cloud_init_text:
+ additional_params = (
+ self._get_vdu_additional_params(db_vnfr, vdud["id"])
+ or {}
+ )
+ cloud_init_list = []
+
+ vdu_profile = get_vdu_profile(db_vnfd, vdu_delta["id"])
+ max_instance_count = 10
+ if vdu_profile and "max-number-of-instances" in vdu_profile:
+ max_instance_count = vdu_profile.get(
+ "max-number-of-instances", 10
)
+
+ default_instance_num = get_number_of_instances(
+ db_vnfd, vdud["id"]
+ )
+ instances_number = vdu_delta.get("number-of-instances", 1)
+ nb_scale_op += instances_number
+
+ new_instance_count = nb_scale_op + default_instance_num
+ # Control if new count is over max and vdu count is less than max.
+ # Then assign new instance count
+ if new_instance_count > max_instance_count > vdu_count:
+ instances_number = new_instance_count - max_instance_count
else:
- vdu_index = 0
- kdu_id = vca_info["osm_kdu_id"]
- stage[
- 1
- ] = "Scaling member_vnf_index={}, kdu_id={}, vdu_index={} ".format(
- member_vnf_index, kdu_id, vdu_index
+ instances_number = instances_number
+
+ if new_instance_count > max_instance_count:
+ raise LcmException(
+ "reached the limit of {} (max-instance-count) "
+ "scaling-out operations for the "
+ "scaling-group-descriptor '{}'".format(
+ nb_scale_op, scaling_group
+ )
)
- stage[2] = step = "Scaling in VCA"
- self._write_op_status(op_id=nslcmop_id, stage=stage)
- vca_update = db_nsr["_admin"]["deployed"]["VCA"]
- config_update = db_nsr["configurationStatus"]
- for vca_index, vca in enumerate(vca_update):
+ for x in range(vdu_delta.get("number-of-instances", 1)):
+ if cloud_init_text:
+ # TODO Information of its own ip is not available because db_vnfr is not updated.
+ additional_params["OSM"] = get_osm_params(
+ db_vnfr, vdu_delta["id"], vdu_index + x
+ )
+ cloud_init_list.append(
+ self._parse_cloud_init(
+ cloud_init_text,
+ additional_params,
+ db_vnfd["id"],
+ vdud["id"],
+ )
+ )
+ vca_scaling_info.append(
+ {
+ "osm_vdu_id": vdu_delta["id"],
+ "member-vnf-index": vnf_index,
+ "type": "create",
+ "vdu_index": vdu_index + x,
+ }
+ )
+ scaling_info["vdu-create"][vdu_delta["id"]] = instances_number
+ for kdu_delta in delta.get("kdu-resource-delta", {}):
+ kdu_profile = get_kdu_resource_profile(db_vnfd, kdu_delta["id"])
+ kdu_name = kdu_profile["kdu-name"]
+ resource_name = kdu_profile.get("resource-name", "")
+
+ # Might have different kdus in the same delta
+ # Should have list for each kdu
+ if not scaling_info["kdu-create"].get(kdu_name, None):
+ scaling_info["kdu-create"][kdu_name] = []
+
+ kdur = get_kdur(db_vnfr, kdu_name)
+ if kdur.get("helm-chart"):
+ k8s_cluster_type = "helm-chart-v3"
+ self.logger.debug("kdur: {}".format(kdur))
if (
- (vca or vca.get("ee_id"))
- and vca["member-vnf-index"] == member_vnf_index
- and vca["vdu_count_index"] == vdu_index
+ kdur.get("helm-version")
+ and kdur.get("helm-version") == "v2"
):
- if vca.get("vdu_id"):
- config_descriptor = get_configuration(
- db_vnfd, vca.get("vdu_id")
- )
- elif vca.get("kdu_name"):
- config_descriptor = get_configuration(
- db_vnfd, vca.get("kdu_name")
- )
- else:
- config_descriptor = get_configuration(
- db_vnfd, db_vnfd["id"]
- )
- operation_params = (
- db_nslcmop.get("operationParams") or {}
+ k8s_cluster_type = "helm-chart"
+ elif kdur.get("juju-bundle"):
+ k8s_cluster_type = "juju-bundle"
+ else:
+ raise LcmException(
+ "kdu type for kdu='{}.{}' is neither helm-chart nor "
+ "juju-bundle. Maybe an old NBI version is running".format(
+ db_vnfr["member-vnf-index-ref"], kdu_name
)
- exec_terminate_primitives = not operation_params.get(
- "skip_terminate_primitives"
- ) and vca.get("needed_terminate")
- task = asyncio.ensure_future(
- asyncio.wait_for(
- self.destroy_N2VC(
- logging_text,
- db_nslcmop,
- vca,
- config_descriptor,
- vca_index,
- destroy_ee=True,
- exec_primitives=exec_terminate_primitives,
- scaling_in=True,
- vca_id=vca_id,
- ),
- timeout=self.timeout_charm_delete,
- )
+ )
+
+ max_instance_count = 10
+ if kdu_profile and "max-number-of-instances" in kdu_profile:
+ max_instance_count = kdu_profile.get(
+ "max-number-of-instances", 10
+ )
+
+ nb_scale_op += kdu_delta.get("number-of-instances", 1)
+ deployed_kdu, _ = get_deployed_kdu(
+ nsr_deployed, kdu_name, vnf_index
+ )
+ if deployed_kdu is None:
+ raise LcmException(
+ "KDU '{}' for vnf '{}' not deployed".format(
+ kdu_name, vnf_index
)
- tasks_dict_info[task] = "Terminating VCA {}".format(
- vca.get("ee_id")
+ )
+ kdu_instance = deployed_kdu.get("kdu-instance")
+ instance_num = await self.k8scluster_map[
+ k8s_cluster_type
+ ].get_scale_count(
+ resource_name,
+ kdu_instance,
+ vca_id=vca_id,
+ cluster_uuid=deployed_kdu.get("k8scluster-uuid"),
+ kdu_model=deployed_kdu.get("kdu-model"),
+ )
+ kdu_replica_count = instance_num + kdu_delta.get(
+ "number-of-instances", 1
+ )
+
+ # Control if new count is over max and instance_num is less than max.
+ # Then assign max instance number to kdu replica count
+ if kdu_replica_count > max_instance_count > instance_num:
+ kdu_replica_count = max_instance_count
+ if kdu_replica_count > max_instance_count:
+ raise LcmException(
+ "reached the limit of {} (max-instance-count) "
+ "scaling-out operations for the "
+ "scaling-group-descriptor '{}'".format(
+ instance_num, scaling_group
)
- del vca_update[vca_index]
- del config_update[vca_index]
- # wait for pending tasks of terminate primitives
- if tasks_dict_info:
- self.logger.debug(
- logging_text
- + "Waiting for tasks {}".format(
- list(tasks_dict_info.keys())
+ )
+
+ for x in range(kdu_delta.get("number-of-instances", 1)):
+ vca_scaling_info.append(
+ {
+ "osm_kdu_id": kdu_name,
+ "member-vnf-index": vnf_index,
+ "type": "create",
+ "kdu_index": instance_num + x - 1,
+ }
+ )
+ scaling_info["kdu-create"][kdu_name].append(
+ {
+ "member-vnf-index": vnf_index,
+ "type": "create",
+ "k8s-cluster-type": k8s_cluster_type,
+ "resource-name": resource_name,
+ "scale": kdu_replica_count,
+ }
+ )
+ elif scaling_type == "SCALE_IN":
+ deltas = scaling_descriptor.get("aspect-delta-details")["deltas"]
+
+ scaling_info["scaling_direction"] = "IN"
+ scaling_info["vdu-delete"] = {}
+ scaling_info["kdu-delete"] = {}
+
+ for delta in deltas:
+ for vdu_delta in delta.get("vdu-delta", {}):
+ vdu_count = vdu_index = get_vdur_index(db_vnfr, vdu_delta)
+ min_instance_count = 0
+ vdu_profile = get_vdu_profile(db_vnfd, vdu_delta["id"])
+ if vdu_profile and "min-number-of-instances" in vdu_profile:
+ min_instance_count = vdu_profile["min-number-of-instances"]
+
+ default_instance_num = get_number_of_instances(
+ db_vnfd, vdu_delta["id"]
+ )
+ instance_num = vdu_delta.get("number-of-instances", 1)
+ nb_scale_op -= instance_num
+
+ new_instance_count = nb_scale_op + default_instance_num
+
+ if new_instance_count < min_instance_count < vdu_count:
+ instances_number = min_instance_count - new_instance_count
+ else:
+ instances_number = instance_num
+
+ if new_instance_count < min_instance_count:
+ raise LcmException(
+ "reached the limit of {} (min-instance-count) scaling-in operations for the "
+ "scaling-group-descriptor '{}'".format(
+ nb_scale_op, scaling_group
)
)
- error_list = await self._wait_for_tasks(
- logging_text,
- tasks_dict_info,
- min(
- self.timeout_charm_delete, self.timeout_ns_terminate
- ),
- stage,
- nslcmop_id,
+ for x in range(vdu_delta.get("number-of-instances", 1)):
+ vca_scaling_info.append(
+ {
+ "osm_vdu_id": vdu_delta["id"],
+ "member-vnf-index": vnf_index,
+ "type": "delete",
+ "vdu_index": vdu_index - 1 - x,
+ }
)
- tasks_dict_info.clear()
- if error_list:
- raise LcmException("; ".join(error_list))
+ scaling_info["vdu-delete"][vdu_delta["id"]] = instances_number
+ for kdu_delta in delta.get("kdu-resource-delta", {}):
+ kdu_profile = get_kdu_resource_profile(db_vnfd, kdu_delta["id"])
+ kdu_name = kdu_profile["kdu-name"]
+ resource_name = kdu_profile.get("resource-name", "")
+
+ if not scaling_info["kdu-delete"].get(kdu_name, None):
+ scaling_info["kdu-delete"][kdu_name] = []
+
+ kdur = get_kdur(db_vnfr, kdu_name)
+ if kdur.get("helm-chart"):
+ k8s_cluster_type = "helm-chart-v3"
+ self.logger.debug("kdur: {}".format(kdur))
+ if (
+ kdur.get("helm-version")
+ and kdur.get("helm-version") == "v2"
+ ):
+ k8s_cluster_type = "helm-chart"
+ elif kdur.get("juju-bundle"):
+ k8s_cluster_type = "juju-bundle"
+ else:
+ raise LcmException(
+ "kdu type for kdu='{}.{}' is neither helm-chart nor "
+ "juju-bundle. Maybe an old NBI version is running".format(
+ db_vnfr["member-vnf-index-ref"], kdur["kdu-name"]
+ )
+ )
+
+ min_instance_count = 0
+ if kdu_profile and "min-number-of-instances" in kdu_profile:
+ min_instance_count = kdu_profile["min-number-of-instances"]
+
+ nb_scale_op -= kdu_delta.get("number-of-instances", 1)
+ deployed_kdu, _ = get_deployed_kdu(
+ nsr_deployed, kdu_name, vnf_index
+ )
+ if deployed_kdu is None:
+ raise LcmException(
+ "KDU '{}' for vnf '{}' not deployed".format(
+ kdu_name, vnf_index
+ )
+ )
+ kdu_instance = deployed_kdu.get("kdu-instance")
+ instance_num = await self.k8scluster_map[
+ k8s_cluster_type
+ ].get_scale_count(
+ resource_name,
+ kdu_instance,
+ vca_id=vca_id,
+ cluster_uuid=deployed_kdu.get("k8scluster-uuid"),
+ kdu_model=deployed_kdu.get("kdu-model"),
+ )
+ kdu_replica_count = instance_num - kdu_delta.get(
+ "number-of-instances", 1
+ )
+
+ if kdu_replica_count < min_instance_count < instance_num:
+ kdu_replica_count = min_instance_count
+ if kdu_replica_count < min_instance_count:
+ raise LcmException(
+ "reached the limit of {} (min-instance-count) scaling-in operations for the "
+ "scaling-group-descriptor '{}'".format(
+ instance_num, scaling_group
+ )
+ )
+
+ for x in range(kdu_delta.get("number-of-instances", 1)):
+ vca_scaling_info.append(
+ {
+ "osm_kdu_id": kdu_name,
+ "member-vnf-index": vnf_index,
+ "type": "delete",
+ "kdu_index": instance_num - x - 1,
+ }
+ )
+ scaling_info["kdu-delete"][kdu_name].append(
+ {
+ "member-vnf-index": vnf_index,
+ "type": "delete",
+ "k8s-cluster-type": k8s_cluster_type,
+ "resource-name": resource_name,
+ "scale": kdu_replica_count,
+ }
+ )
+
+ # update VDU_SCALING_INFO with the VDUs to delete ip_addresses
+ vdu_delete = copy(scaling_info.get("vdu-delete"))
+ if scaling_info["scaling_direction"] == "IN":
+ for vdur in reversed(db_vnfr["vdur"]):
+ if vdu_delete.get(vdur["vdu-id-ref"]):
+ vdu_delete[vdur["vdu-id-ref"]] -= 1
+ scaling_info["vdu"].append(
+ {
+ "name": vdur.get("name") or vdur.get("vdu-name"),
+ "vdu_id": vdur["vdu-id-ref"],
+ "interface": [],
+ }
+ )
+ for interface in vdur["interfaces"]:
+ scaling_info["vdu"][-1]["interface"].append(
+ {
+ "name": interface["name"],
+ "ip_address": interface["ip-address"],
+ "mac_address": interface.get("mac-address"),
+ }
+ )
+ # vdu_delete = vdu_scaling_info.pop("vdu-delete")
+
+ # PRE-SCALE BEGIN
+ step = "Executing pre-scale vnf-config-primitive"
+ if scaling_descriptor.get("scaling-config-action"):
+ for scaling_config_action in scaling_descriptor[
+ "scaling-config-action"
+ ]:
+ if (
+ scaling_config_action.get("trigger") == "pre-scale-in"
+ and scaling_type == "SCALE_IN"
+ ) or (
+ scaling_config_action.get("trigger") == "pre-scale-out"
+ and scaling_type == "SCALE_OUT"
+ ):
+ vnf_config_primitive = scaling_config_action[
+ "vnf-config-primitive-name-ref"
+ ]
+ step = db_nslcmop_update[
+ "detailed-status"
+ ] = "executing pre-scale scaling-config-action '{}'".format(
+ vnf_config_primitive
+ )
+
+ # look for primitive
+ for config_primitive in (
+ get_configuration(db_vnfd, db_vnfd["id"]) or {}
+ ).get("config-primitive", ()):
+ if config_primitive["name"] == vnf_config_primitive:
+ break
+ else:
+ raise LcmException(
+ "Invalid vnfd descriptor at scaling-group-descriptor[name='{}']:scaling-config-action"
+ "[vnf-config-primitive-name-ref='{}'] does not match any vnf-configuration:config-"
+ "primitive".format(scaling_group, vnf_config_primitive)
+ )
+
+ vnfr_params = {"VDU_SCALE_INFO": scaling_info}
+ if db_vnfr.get("additionalParamsForVnf"):
+ vnfr_params.update(db_vnfr["additionalParamsForVnf"])
+
+ scale_process = "VCA"
+ db_nsr_update["config-status"] = "configuring pre-scaling"
+ primitive_params = self._map_primitive_params(
+ config_primitive, {}, vnfr_params
+ )
+
+ # Pre-scale retry check: Check if this sub-operation has been executed before
+ op_index = self._check_or_add_scale_suboperation(
+ db_nslcmop,
+ vnf_index,
+ vnf_config_primitive,
+ primitive_params,
+ "PRE-SCALE",
+ )
+ if op_index == self.SUBOPERATION_STATUS_SKIP:
+ # Skip sub-operation
+ result = "COMPLETED"
+ result_detail = "Done"
+ self.logger.debug(
+ logging_text
+ + "vnf_config_primitive={} Skipped sub-operation, result {} {}".format(
+ vnf_config_primitive, result, result_detail
+ )
+ )
+ else:
+ if op_index == self.SUBOPERATION_STATUS_NEW:
+ # New sub-operation: Get index of this sub-operation
+ op_index = (
+ len(db_nslcmop.get("_admin", {}).get("operations"))
+ - 1
+ )
+ self.logger.debug(
+ logging_text
+ + "vnf_config_primitive={} New sub-operation".format(
+ vnf_config_primitive
+ )
+ )
+ else:
+ # retry: Get registered params for this existing sub-operation
+ op = db_nslcmop.get("_admin", {}).get("operations", [])[
+ op_index
+ ]
+ vnf_index = op.get("member_vnf_index")
+ vnf_config_primitive = op.get("primitive")
+ primitive_params = op.get("primitive_params")
+ self.logger.debug(
+ logging_text
+ + "vnf_config_primitive={} Sub-operation retry".format(
+ vnf_config_primitive
+ )
+ )
+ # Execute the primitive, either with new (first-time) or registered (reintent) args
+ ee_descriptor_id = config_primitive.get(
+ "execution-environment-ref"
+ )
+ primitive_name = config_primitive.get(
+ "execution-environment-primitive", vnf_config_primitive
+ )
+ ee_id, vca_type = self._look_for_deployed_vca(
+ nsr_deployed["VCA"],
+ member_vnf_index=vnf_index,
+ vdu_id=None,
+ vdu_count_index=None,
+ ee_descriptor_id=ee_descriptor_id,
+ )
+ result, result_detail = await self._ns_execute_primitive(
+ ee_id,
+ primitive_name,
+ primitive_params,
+ vca_type=vca_type,
+ vca_id=vca_id,
+ )
+ self.logger.debug(
+ logging_text
+ + "vnf_config_primitive={} Done with result {} {}".format(
+ vnf_config_primitive, result, result_detail
+ )
+ )
+ # Update operationState = COMPLETED | FAILED
+ self._update_suboperation_status(
+ db_nslcmop, op_index, result, result_detail
+ )
+
+ if result == "FAILED":
+ raise LcmException(result_detail)
+ db_nsr_update["config-status"] = old_config_status
+ scale_process = None
+ # PRE-SCALE END
+
+ db_nsr_update[
+ "_admin.scaling-group.{}.nb-scale-op".format(admin_scale_index)
+ ] = nb_scale_op
+ db_nsr_update[
+ "_admin.scaling-group.{}.time".format(admin_scale_index)
+ ] = time()
+
+ # SCALE-IN VCA - BEGIN
+ if vca_scaling_info:
+ step = db_nslcmop_update[
+ "detailed-status"
+ ] = "Deleting the execution environments"
+ scale_process = "VCA"
+ for vca_info in vca_scaling_info:
+ if vca_info["type"] == "delete" and not vca_info.get("osm_kdu_id"):
+ member_vnf_index = str(vca_info["member-vnf-index"])
+ self.logger.debug(
+ logging_text + "vdu info: {}".format(vca_info)
+ )
+ if vca_info.get("osm_vdu_id"):
+ vdu_id = vca_info["osm_vdu_id"]
+ vdu_index = int(vca_info["vdu_index"])
+ stage[
+ 1
+ ] = "Scaling member_vnf_index={}, vdu_id={}, vdu_index={} ".format(
+ member_vnf_index, vdu_id, vdu_index
+ )
+ stage[2] = step = "Scaling in VCA"
+ self._write_op_status(op_id=nslcmop_id, stage=stage)
+ vca_update = db_nsr["_admin"]["deployed"]["VCA"]
+ config_update = db_nsr["configurationStatus"]
+ for vca_index, vca in enumerate(vca_update):
+ if (
+ (vca or vca.get("ee_id"))
+ and vca["member-vnf-index"] == member_vnf_index
+ and vca["vdu_count_index"] == vdu_index
+ ):
+ if vca.get("vdu_id"):
+ config_descriptor = get_configuration(
+ db_vnfd, vca.get("vdu_id")
+ )
+ elif vca.get("kdu_name"):
+ config_descriptor = get_configuration(
+ db_vnfd, vca.get("kdu_name")
+ )
+ else:
+ config_descriptor = get_configuration(
+ db_vnfd, db_vnfd["id"]
+ )
+ operation_params = (
+ db_nslcmop.get("operationParams") or {}
+ )
+ exec_terminate_primitives = not operation_params.get(
+ "skip_terminate_primitives"
+ ) and vca.get("needed_terminate")
+ task = asyncio.ensure_future(
+ asyncio.wait_for(
+ self.destroy_N2VC(
+ logging_text,
+ db_nslcmop,
+ vca,
+ config_descriptor,
+ vca_index,
+ destroy_ee=True,
+ exec_primitives=exec_terminate_primitives,
+ scaling_in=True,
+ vca_id=vca_id,
+ ),
+ timeout=self.timeout.charm_delete,
+ )
+ )
+ tasks_dict_info[task] = "Terminating VCA {}".format(
+ vca.get("ee_id")
+ )
+ del vca_update[vca_index]
+ del config_update[vca_index]
+ # wait for pending tasks of terminate primitives
+ if tasks_dict_info:
+ self.logger.debug(
+ logging_text
+ + "Waiting for tasks {}".format(
+ list(tasks_dict_info.keys())
+ )
+ )
+ error_list = await self._wait_for_tasks(
+ logging_text,
+ tasks_dict_info,
+ min(
+ self.timeout.charm_delete, self.timeout.ns_terminate
+ ),
+ stage,
+ nslcmop_id,
+ )
+ tasks_dict_info.clear()
+ if error_list:
+ raise LcmException("; ".join(error_list))
+
+ db_vca_and_config_update = {
+ "_admin.deployed.VCA": vca_update,
+ "configurationStatus": config_update,
+ }
+ self.update_db_2(
+ "nsrs", db_nsr["_id"], db_vca_and_config_update
+ )
+ scale_process = None
+ # SCALE-IN VCA - END
+
+ # SCALE RO - BEGIN
+ if scaling_info.get("vdu-create") or scaling_info.get("vdu-delete"):
+ scale_process = "RO"
+ if self.ro_config.ng:
+ await self._scale_ng_ro(
+ logging_text, db_nsr, db_nslcmop, db_vnfr, scaling_info, stage
+ )
+ scaling_info.pop("vdu-create", None)
+ scaling_info.pop("vdu-delete", None)
+
+ scale_process = None
+ # SCALE RO - END
+
+ # SCALE KDU - BEGIN
+ if scaling_info.get("kdu-create") or scaling_info.get("kdu-delete"):
+ scale_process = "KDU"
+ await self._scale_kdu(
+ logging_text, nsr_id, nsr_deployed, db_vnfd, vca_id, scaling_info
+ )
+ scaling_info.pop("kdu-create", None)
+ scaling_info.pop("kdu-delete", None)
+
+ scale_process = None
+ # SCALE KDU - END
+
+ if db_nsr_update:
+ self.update_db_2("nsrs", nsr_id, db_nsr_update)
+
+ # SCALE-UP VCA - BEGIN
+ if vca_scaling_info:
+ step = db_nslcmop_update[
+ "detailed-status"
+ ] = "Creating new execution environments"
+ scale_process = "VCA"
+ for vca_info in vca_scaling_info:
+ if vca_info["type"] == "create" and not vca_info.get("osm_kdu_id"):
+ member_vnf_index = str(vca_info["member-vnf-index"])
+ self.logger.debug(
+ logging_text + "vdu info: {}".format(vca_info)
+ )
+ vnfd_id = db_vnfr["vnfd-ref"]
+ if vca_info.get("osm_vdu_id"):
+ vdu_index = int(vca_info["vdu_index"])
+ deploy_params = {"OSM": get_osm_params(db_vnfr)}
+ if db_vnfr.get("additionalParamsForVnf"):
+ deploy_params.update(
+ parse_yaml_strings(
+ db_vnfr["additionalParamsForVnf"].copy()
+ )
+ )
+ descriptor_config = get_configuration(
+ db_vnfd, db_vnfd["id"]
+ )
+ if descriptor_config:
+ vdu_id = None
+ vdu_name = None
+ kdu_name = None
+ kdu_index = None
+ self._deploy_n2vc(
+ logging_text=logging_text
+ + "member_vnf_index={} ".format(member_vnf_index),
+ db_nsr=db_nsr,
+ db_vnfr=db_vnfr,
+ nslcmop_id=nslcmop_id,
+ nsr_id=nsr_id,
+ nsi_id=nsi_id,
+ vnfd_id=vnfd_id,
+ vdu_id=vdu_id,
+ kdu_name=kdu_name,
+ kdu_index=kdu_index,
+ member_vnf_index=member_vnf_index,
+ vdu_index=vdu_index,
+ vdu_name=vdu_name,
+ deploy_params=deploy_params,
+ descriptor_config=descriptor_config,
+ base_folder=base_folder,
+ task_instantiation_info=tasks_dict_info,
+ stage=stage,
+ )
+ vdu_id = vca_info["osm_vdu_id"]
+ vdur = find_in_list(
+ db_vnfr["vdur"], lambda vdu: vdu["vdu-id-ref"] == vdu_id
+ )
+ descriptor_config = get_configuration(db_vnfd, vdu_id)
+ if vdur.get("additionalParams"):
+ deploy_params_vdu = parse_yaml_strings(
+ vdur["additionalParams"]
+ )
+ else:
+ deploy_params_vdu = deploy_params
+ deploy_params_vdu["OSM"] = get_osm_params(
+ db_vnfr, vdu_id, vdu_count_index=vdu_index
+ )
+ if descriptor_config:
+ vdu_name = None
+ kdu_name = None
+ kdu_index = None
+ stage[
+ 1
+ ] = "Scaling member_vnf_index={}, vdu_id={}, vdu_index={} ".format(
+ member_vnf_index, vdu_id, vdu_index
+ )
+ stage[2] = step = "Scaling out VCA"
+ self._write_op_status(op_id=nslcmop_id, stage=stage)
+ self._deploy_n2vc(
+ logging_text=logging_text
+ + "member_vnf_index={}, vdu_id={}, vdu_index={} ".format(
+ member_vnf_index, vdu_id, vdu_index
+ ),
+ db_nsr=db_nsr,
+ db_vnfr=db_vnfr,
+ nslcmop_id=nslcmop_id,
+ nsr_id=nsr_id,
+ nsi_id=nsi_id,
+ vnfd_id=vnfd_id,
+ vdu_id=vdu_id,
+ kdu_name=kdu_name,
+ member_vnf_index=member_vnf_index,
+ vdu_index=vdu_index,
+ kdu_index=kdu_index,
+ vdu_name=vdu_name,
+ deploy_params=deploy_params_vdu,
+ descriptor_config=descriptor_config,
+ base_folder=base_folder,
+ task_instantiation_info=tasks_dict_info,
+ stage=stage,
+ )
+ # SCALE-UP VCA - END
+ scale_process = None
+
+ # POST-SCALE BEGIN
+ # execute primitive service POST-SCALING
+ step = "Executing post-scale vnf-config-primitive"
+ if scaling_descriptor.get("scaling-config-action"):
+ for scaling_config_action in scaling_descriptor[
+ "scaling-config-action"
+ ]:
+ if (
+ scaling_config_action.get("trigger") == "post-scale-in"
+ and scaling_type == "SCALE_IN"
+ ) or (
+ scaling_config_action.get("trigger") == "post-scale-out"
+ and scaling_type == "SCALE_OUT"
+ ):
+ vnf_config_primitive = scaling_config_action[
+ "vnf-config-primitive-name-ref"
+ ]
+ step = db_nslcmop_update[
+ "detailed-status"
+ ] = "executing post-scale scaling-config-action '{}'".format(
+ vnf_config_primitive
+ )
+
+ vnfr_params = {"VDU_SCALE_INFO": scaling_info}
+ if db_vnfr.get("additionalParamsForVnf"):
+ vnfr_params.update(db_vnfr["additionalParamsForVnf"])
+
+ # look for primitive
+ for config_primitive in (
+ get_configuration(db_vnfd, db_vnfd["id"]) or {}
+ ).get("config-primitive", ()):
+ if config_primitive["name"] == vnf_config_primitive:
+ break
+ else:
+ raise LcmException(
+ "Invalid vnfd descriptor at scaling-group-descriptor[name='{}']:scaling-config-"
+ "action[vnf-config-primitive-name-ref='{}'] does not match any vnf-configuration:"
+ "config-primitive".format(
+ scaling_group, vnf_config_primitive
+ )
+ )
+ scale_process = "VCA"
+ db_nsr_update["config-status"] = "configuring post-scaling"
+ primitive_params = self._map_primitive_params(
+ config_primitive, {}, vnfr_params
+ )
+
+ # Post-scale retry check: Check if this sub-operation has been executed before
+ op_index = self._check_or_add_scale_suboperation(
+ db_nslcmop,
+ vnf_index,
+ vnf_config_primitive,
+ primitive_params,
+ "POST-SCALE",
+ )
+ if op_index == self.SUBOPERATION_STATUS_SKIP:
+ # Skip sub-operation
+ result = "COMPLETED"
+ result_detail = "Done"
+ self.logger.debug(
+ logging_text
+ + "vnf_config_primitive={} Skipped sub-operation, result {} {}".format(
+ vnf_config_primitive, result, result_detail
+ )
+ )
+ else:
+ if op_index == self.SUBOPERATION_STATUS_NEW:
+ # New sub-operation: Get index of this sub-operation
+ op_index = (
+ len(db_nslcmop.get("_admin", {}).get("operations"))
+ - 1
+ )
+ self.logger.debug(
+ logging_text
+ + "vnf_config_primitive={} New sub-operation".format(
+ vnf_config_primitive
+ )
+ )
+ else:
+ # retry: Get registered params for this existing sub-operation
+ op = db_nslcmop.get("_admin", {}).get("operations", [])[
+ op_index
+ ]
+ vnf_index = op.get("member_vnf_index")
+ vnf_config_primitive = op.get("primitive")
+ primitive_params = op.get("primitive_params")
+ self.logger.debug(
+ logging_text
+ + "vnf_config_primitive={} Sub-operation retry".format(
+ vnf_config_primitive
+ )
+ )
+ # Execute the primitive, either with new (first-time) or registered (reintent) args
+ ee_descriptor_id = config_primitive.get(
+ "execution-environment-ref"
+ )
+ primitive_name = config_primitive.get(
+ "execution-environment-primitive", vnf_config_primitive
+ )
+ ee_id, vca_type = self._look_for_deployed_vca(
+ nsr_deployed["VCA"],
+ member_vnf_index=vnf_index,
+ vdu_id=None,
+ vdu_count_index=None,
+ ee_descriptor_id=ee_descriptor_id,
+ )
+ result, result_detail = await self._ns_execute_primitive(
+ ee_id,
+ primitive_name,
+ primitive_params,
+ vca_type=vca_type,
+ vca_id=vca_id,
+ )
+ self.logger.debug(
+ logging_text
+ + "vnf_config_primitive={} Done with result {} {}".format(
+ vnf_config_primitive, result, result_detail
+ )
+ )
+ # Update operationState = COMPLETED | FAILED
+ self._update_suboperation_status(
+ db_nslcmop, op_index, result, result_detail
+ )
+
+ if result == "FAILED":
+ raise LcmException(result_detail)
+ db_nsr_update["config-status"] = old_config_status
+ scale_process = None
+ # POST-SCALE END
+
+ db_nsr_update[
+ "detailed-status"
+ ] = "" # "scaled {} {}".format(scaling_group, scaling_type)
+ db_nsr_update["operational-status"] = (
+ "running"
+ if old_operational_status == "failed"
+ else old_operational_status
+ )
+ db_nsr_update["config-status"] = old_config_status
+ return
+ except (
+ ROclient.ROClientException,
+ DbException,
+ LcmException,
+ NgRoException,
+ ) as e:
+ self.logger.error(logging_text + "Exit Exception {}".format(e))
+ exc = e
+ except asyncio.CancelledError:
+ self.logger.error(
+ logging_text + "Cancelled Exception while '{}'".format(step)
+ )
+ exc = "Operation was cancelled"
+ except Exception as e:
+ exc = traceback.format_exc()
+ self.logger.critical(
+ logging_text + "Exit Exception {} {}".format(type(e).__name__, e),
+ exc_info=True,
+ )
+ finally:
+ self._write_ns_status(
+ nsr_id=nsr_id,
+ ns_state=None,
+ current_operation="IDLE",
+ current_operation_id=None,
+ )
+ if tasks_dict_info:
+ stage[1] = "Waiting for instantiate pending tasks."
+ self.logger.debug(logging_text + stage[1])
+ exc = await self._wait_for_tasks(
+ logging_text,
+ tasks_dict_info,
+ self.timeout.ns_deploy,
+ stage,
+ nslcmop_id,
+ nsr_id=nsr_id,
+ )
+ if exc:
+ db_nslcmop_update[
+ "detailed-status"
+ ] = error_description_nslcmop = "FAILED {}: {}".format(step, exc)
+ nslcmop_operation_state = "FAILED"
+ if db_nsr:
+ db_nsr_update["operational-status"] = old_operational_status
+ db_nsr_update["config-status"] = old_config_status
+ db_nsr_update["detailed-status"] = ""
+ if scale_process:
+ if "VCA" in scale_process:
+ db_nsr_update["config-status"] = "failed"
+ if "RO" in scale_process:
+ db_nsr_update["operational-status"] = "failed"
+ db_nsr_update[
+ "detailed-status"
+ ] = "FAILED scaling nslcmop={} {}: {}".format(
+ nslcmop_id, step, exc
+ )
+ else:
+ error_description_nslcmop = None
+ nslcmop_operation_state = "COMPLETED"
+ db_nslcmop_update["detailed-status"] = "Done"
+
+ self._write_op_status(
+ op_id=nslcmop_id,
+ stage="",
+ error_message=error_description_nslcmop,
+ operation_state=nslcmop_operation_state,
+ other_update=db_nslcmop_update,
+ )
+ if db_nsr:
+ self._write_ns_status(
+ nsr_id=nsr_id,
+ ns_state=None,
+ current_operation="IDLE",
+ current_operation_id=None,
+ other_update=db_nsr_update,
+ )
+
+ if nslcmop_operation_state:
+ try:
+ msg = {
+ "nsr_id": nsr_id,
+ "nslcmop_id": nslcmop_id,
+ "operationState": nslcmop_operation_state,
+ }
+ await self.msg.aiowrite("ns", "scaled", msg, loop=self.loop)
+ except Exception as e:
+ self.logger.error(
+ logging_text + "kafka_write notification Exception {}".format(e)
+ )
+ self.logger.debug(logging_text + "Exit")
+ self.lcm_tasks.remove("ns", nsr_id, nslcmop_id, "ns_scale")
+
    async def _scale_kdu(
        self, logging_text, nsr_id, nsr_deployed, db_vnfd, vca_id, scaling_info
    ):
        """Scale the KDU applications referenced in ``scaling_info``.

        For each KDU instance this runs, in order: the descriptor's
        terminate config primitives (scale-in only), the K8s connector's
        scale operation, and the descriptor's initial config primitives
        (scale-out only).

        :param logging_text: prefix for log messages
        :param nsr_id: NS record _id (used to address the "nsrs" document)
        :param nsr_deployed: "_admin.deployed" section of the NS record
        :param db_vnfd: VNF descriptor containing the KDUs
        :param vca_id: VCA id forwarded to the K8s connector calls
        :param scaling_info: dict with a "kdu-create" and/or "kdu-delete"
            map of kdu_name -> list of per-instance scaling info dicts
        """
        # NOTE(review): with "or", when both "kdu-create" and "kdu-delete"
        # are present only the create map is processed; callers appear to
        # pass one at a time -- confirm before relying on both together.
        _scaling_info = scaling_info.get("kdu-create") or scaling_info.get("kdu-delete")
        for kdu_name in _scaling_info:
            for kdu_scaling_info in _scaling_info[kdu_name]:
                # locate the deployed KDU record and its index inside
                # _admin.deployed.K8s for this member-vnf-index
                deployed_kdu, index = get_deployed_kdu(
                    nsr_deployed, kdu_name, kdu_scaling_info["member-vnf-index"]
                )
                cluster_uuid = deployed_kdu["k8scluster-uuid"]
                kdu_instance = deployed_kdu["kdu-instance"]
                kdu_model = deployed_kdu.get("kdu-model")
                scale = int(kdu_scaling_info["scale"])
                k8s_cluster_type = kdu_scaling_info["k8s-cluster-type"]

                # DB location where the connector reports operation progress
                db_dict = {
                    "collection": "nsrs",
                    "filter": {"_id": nsr_id},
                    "path": "_admin.deployed.K8s.{}".format(index),
                }

                step = "scaling application {}".format(
                    kdu_scaling_info["resource-name"]
                )
                self.logger.debug(logging_text + step)

                if kdu_scaling_info["type"] == "delete":
                    kdu_config = get_configuration(db_vnfd, kdu_name)
                    # run terminate primitives only if the descriptor defines
                    # them and the KDU has no juju execution environment
                    if (
                        kdu_config
                        and kdu_config.get("terminate-config-primitive")
                        and get_juju_ee_ref(db_vnfd, kdu_name) is None
                    ):
                        terminate_config_primitive_list = kdu_config.get(
                            "terminate-config-primitive"
                        )
                        # primitives must run in their declared "seq" order
                        terminate_config_primitive_list.sort(
                            key=lambda val: int(val["seq"])
                        )

                        for (
                            terminate_config_primitive
                        ) in terminate_config_primitive_list:
                            primitive_params_ = self._map_primitive_params(
                                terminate_config_primitive, {}, {}
                            )
                            step = "execute terminate config primitive"
                            self.logger.debug(logging_text + step)
                            # outer wait_for guards against a hung connector;
                            # it is larger than the connector's own timeout
                            await asyncio.wait_for(
                                self.k8scluster_map[k8s_cluster_type].exec_primitive(
                                    cluster_uuid=cluster_uuid,
                                    kdu_instance=kdu_instance,
                                    primitive_name=terminate_config_primitive["name"],
                                    params=primitive_params_,
                                    db_dict=db_dict,
                                    total_timeout=self.timeout.primitive,
                                    vca_id=vca_id,
                                ),
                                timeout=self.timeout.primitive
                                * self.timeout.primitive_outer_factor,
                            )

                # the scale operation itself runs for both create and delete
                await asyncio.wait_for(
                    self.k8scluster_map[k8s_cluster_type].scale(
                        kdu_instance=kdu_instance,
                        scale=scale,
                        resource_name=kdu_scaling_info["resource-name"],
                        total_timeout=self.timeout.scale_on_error,
                        vca_id=vca_id,
                        cluster_uuid=cluster_uuid,
                        kdu_model=kdu_model,
                        atomic=True,
                        db_dict=db_dict,
                    ),
                    timeout=self.timeout.scale_on_error
                    * self.timeout.scale_on_error_outer_factor,
                )

                if kdu_scaling_info["type"] == "create":
                    kdu_config = get_configuration(db_vnfd, kdu_name)
                    # run initial primitives only if the descriptor defines
                    # them and the KDU has no juju execution environment
                    if (
                        kdu_config
                        and kdu_config.get("initial-config-primitive")
                        and get_juju_ee_ref(db_vnfd, kdu_name) is None
                    ):
                        initial_config_primitive_list = kdu_config.get(
                            "initial-config-primitive"
                        )
                        initial_config_primitive_list.sort(
                            key=lambda val: int(val["seq"])
                        )

                        for initial_config_primitive in initial_config_primitive_list:
                            primitive_params_ = self._map_primitive_params(
                                initial_config_primitive, {}, {}
                            )
                            step = "execute initial config primitive"
                            self.logger.debug(logging_text + step)
                            await asyncio.wait_for(
                                self.k8scluster_map[k8s_cluster_type].exec_primitive(
                                    cluster_uuid=cluster_uuid,
                                    kdu_instance=kdu_instance,
                                    primitive_name=initial_config_primitive["name"],
                                    params=primitive_params_,
                                    db_dict=db_dict,
                                    vca_id=vca_id,
                                ),
                                timeout=600,
                            )
+
+ async def _scale_ng_ro(
+ self, logging_text, db_nsr, db_nslcmop, db_vnfr, vdu_scaling_info, stage
+ ):
+ nsr_id = db_nslcmop["nsInstanceId"]
+ db_nsd = self.db.get_one("nsds", {"_id": db_nsr["nsd-id"]})
+ db_vnfrs = {}
+
+ # read from db: vnfd's for every vnf
+ db_vnfds = []
+
+ # for each vnf in ns, read vnfd
+ for vnfr in self.db.get_list("vnfrs", {"nsr-id-ref": nsr_id}):
+ db_vnfrs[vnfr["member-vnf-index-ref"]] = vnfr
+ vnfd_id = vnfr["vnfd-id"] # vnfd uuid for this vnf
+ # if we haven't this vnfd, read it from db
+ if not find_in_list(db_vnfds, lambda a_vnfd: a_vnfd["id"] == vnfd_id):
+ # read from db
+ vnfd = self.db.get_one("vnfds", {"_id": vnfd_id})
+ db_vnfds.append(vnfd)
+ n2vc_key = self.n2vc.get_public_key()
+ n2vc_key_list = [n2vc_key]
+ self.scale_vnfr(
+ db_vnfr,
+ vdu_scaling_info.get("vdu-create"),
+ vdu_scaling_info.get("vdu-delete"),
+ mark_delete=True,
+ )
+ # db_vnfr has been updated, update db_vnfrs to use it
+ db_vnfrs[db_vnfr["member-vnf-index-ref"]] = db_vnfr
+ await self._instantiate_ng_ro(
+ logging_text,
+ nsr_id,
+ db_nsd,
+ db_nsr,
+ db_nslcmop,
+ db_vnfrs,
+ db_vnfds,
+ n2vc_key_list,
+ stage=stage,
+ start_deploy=time(),
+ timeout_ns_deploy=self.timeout.ns_deploy,
+ )
+ if vdu_scaling_info.get("vdu-delete"):
+ self.scale_vnfr(
+ db_vnfr, None, vdu_scaling_info["vdu-delete"], mark_delete=False
+ )
+
+ async def extract_prometheus_scrape_jobs(
+ self,
+ ee_id: str,
+ artifact_path: str,
+ ee_config_descriptor: dict,
+ vnfr_id: str,
+ nsr_id: str,
+ target_ip: str,
+ element_type: str,
+ vnf_member_index: str = "",
+ vdu_id: str = "",
+ vdu_index: int = None,
+ kdu_name: str = "",
+ kdu_index: int = None,
+ ) -> dict:
+ """Method to extract prometheus scrape jobs from EE's Prometheus template job file
+ This method will wait until the corresponding VDU or KDU is fully instantiated
+
+ Args:
+ ee_id (str): Execution Environment ID
+ artifact_path (str): Path where the EE's content is (including the Prometheus template file)
+ ee_config_descriptor (dict): Execution Environment's configuration descriptor
+ vnfr_id (str): VNFR ID where this EE applies
+ nsr_id (str): NSR ID where this EE applies
+ target_ip (str): VDU/KDU instance IP address
+ element_type (str): NS or VNF or VDU or KDU
+ vnf_member_index (str, optional): VNF index where this EE applies. Defaults to "".
+ vdu_id (str, optional): VDU ID where this EE applies. Defaults to "".
+ vdu_index (int, optional): VDU index where this EE applies. Defaults to None.
+ kdu_name (str, optional): KDU name where this EE applies. Defaults to "".
+ kdu_index (int, optional): KDU index where this EE applies. Defaults to None.
+
+ Raises:
+ LcmException: When the VDU or KDU instance was not found in an hour
+
+ Returns:
+ _type_: Prometheus jobs
+ """
+ # default the vdur and kdur names to an empty string, to avoid any later
+ # problem with Prometheus when the element type is not VDU or KDU
+ vdur_name = ""
+ kdur_name = ""
+
+ # look if exist a file called 'prometheus*.j2' and
+ artifact_content = self.fs.dir_ls(artifact_path)
+ job_file = next(
+ (
+ f
+ for f in artifact_content
+ if f.startswith("prometheus") and f.endswith(".j2")
+ ),
+ None,
+ )
+ if not job_file:
+ return
+ with self.fs.file_open((artifact_path, job_file), "r") as f:
+ job_data = f.read()
+
+ # obtain the VDUR or KDUR, if the element type is VDU or KDU
+ if element_type in ("VDU", "KDU"):
+ for _ in range(360):
+ db_vnfr = self.db.get_one("vnfrs", {"_id": vnfr_id})
+ if vdu_id and vdu_index is not None:
+ vdur = next(
+ (
+ x
+ for x in get_iterable(db_vnfr, "vdur")
+ if (
+ x.get("vdu-id-ref") == vdu_id
+ and x.get("count-index") == vdu_index
+ )
+ ),
+ {},
+ )
+ if vdur.get("name"):
+ vdur_name = vdur.get("name")
+ break
+ if kdu_name and kdu_index is not None:
+ kdur = next(
+ (
+ x
+ for x in get_iterable(db_vnfr, "kdur")
+ if (
+ x.get("kdu-name") == kdu_name
+ and x.get("count-index") == kdu_index
+ )
+ ),
+ {},
+ )
+ if kdur.get("name"):
+ kdur_name = kdur.get("name")
+ break
+
+ await asyncio.sleep(10, loop=self.loop)
+ else:
+ if vdu_id and vdu_index is not None:
+ raise LcmException(
+ f"Timeout waiting VDU with name={vdu_id} and index={vdu_index} to be intantiated"
+ )
+ if kdu_name and kdu_index is not None:
+ raise LcmException(
+ f"Timeout waiting KDU with name={kdu_name} and index={kdu_index} to be intantiated"
+ )
+
+ # TODO get_service
+ _, _, service = ee_id.partition(".") # remove prefix "namespace."
+ host_name = "{}-{}".format(service, ee_config_descriptor["metric-service"])
+ host_port = "80"
+ vnfr_id = vnfr_id.replace("-", "")
+ variables = {
+ "JOB_NAME": vnfr_id,
+ "TARGET_IP": target_ip,
+ "EXPORTER_POD_IP": host_name,
+ "EXPORTER_POD_PORT": host_port,
+ "NSR_ID": nsr_id,
+ "VNF_MEMBER_INDEX": vnf_member_index,
+ "VDUR_NAME": vdur_name,
+ "KDUR_NAME": kdur_name,
+ "ELEMENT_TYPE": element_type,
+ }
+ job_list = parse_job(job_data, variables)
+ # ensure job_name is using the vnfr_id. Adding the metadata nsr_id
+ for job in job_list:
+ if (
+ not isinstance(job.get("job_name"), str)
+ or vnfr_id not in job["job_name"]
+ ):
+ job["job_name"] = vnfr_id + "_" + str(randint(1, 10000))
+ job["nsr_id"] = nsr_id
+ job["vnfr_id"] = vnfr_id
+ return job_list
+
    async def rebuild_start_stop(
        self, nsr_id, nslcmop_id, vnf_id, additional_param, operation_type
    ):
        """Start, stop or rebuild one VDU instance through the RO.

        :param nsr_id: NS record _id
        :param nslcmop_id: _id of the ns lcm operation being executed
        :param vnf_id: vnfr _id that owns the target VDU
        :param additional_param: dict with "vdu_id" (vdu-id-ref) and
            "count-index" selecting the concrete VDU instance
        :param operation_type: RO operate action (e.g. "start", "stop",
            "rebuild"); also used as key of the RO request payload
        :return: ("COMPLETED", "Done") on success, or
            ("FAILED", "<error detail>") if any exception was raised
        """
        logging_text = "Task ns={} {}={} ".format(nsr_id, operation_type, nslcmop_id)
        self.logger.info(logging_text + "Enter")
        stage = ["Preparing the environment", ""]
        # database nsrs record
        db_nsr_update = {}
        vdu_vim_name = None
        vim_vm_id = None
        # in case of error, indicates what part of scale was failed to put nsr at error status
        start_deploy = time()
        try:
            # resolve the target VDU instance from the vnfr
            db_vnfr = self.db.get_one("vnfrs", {"_id": vnf_id})
            vim_account_id = db_vnfr.get("vim-account-id")
            vim_info_key = "vim:" + vim_account_id
            vdu_id = additional_param["vdu_id"]
            vdurs = [item for item in db_vnfr["vdur"] if item["vdu-id-ref"] == vdu_id]
            vdur = find_in_list(
                vdurs, lambda vdu: vdu["count-index"] == additional_param["count-index"]
            )
            if vdur:
                vdu_vim_name = vdur["name"]
                vim_vm_id = vdur["vim_info"][vim_info_key]["vim_id"]
                # take the first vim_info key as the target VIM
                target_vim, _ = next(k_v for k_v in vdur["vim_info"].items())
            else:
                raise LcmException("Target vdu is not found")
            self.logger.info("vdu_vim_name >> {} ".format(vdu_vim_name))
            # wait for any previous tasks in process
            stage[1] = "Waiting for previous operations to terminate"
            self.logger.info(stage[1])
            await self.lcm_tasks.waitfor_related_HA("ns", "nslcmops", nslcmop_id)

            stage[1] = "Reading from database."
            self.logger.info(stage[1])
            self._write_ns_status(
                nsr_id=nsr_id,
                ns_state=None,
                current_operation=operation_type.upper(),
                current_operation_id=nslcmop_id,
            )
            self._write_op_status(op_id=nslcmop_id, stage=stage, queuePosition=0)

            # read from db: ns
            stage[1] = "Getting nsr={} from db.".format(nsr_id)
            db_nsr_update["operational-status"] = operation_type
            self.update_db_2("nsrs", nsr_id, db_nsr_update)
            # Payload for RO
            desc = {
                operation_type: {
                    "vim_vm_id": vim_vm_id,
                    "vnf_id": vnf_id,
                    "vdu_index": additional_param["count-index"],
                    "vdu_id": vdur["id"],
                    "target_vim": target_vim,
                    "vim_account_id": vim_account_id,
                }
            }
            stage[1] = "Sending rebuild request to RO... {}".format(desc)
            self._write_op_status(op_id=nslcmop_id, stage=stage, queuePosition=0)
            self.logger.info("ro nsr id: {}".format(nsr_id))
            result_dict = await self.RO.operate(nsr_id, desc, operation_type)
            self.logger.info("response from RO: {}".format(result_dict))
            action_id = result_dict["action_id"]
            # block until RO reports the action finished (or the wait times out)
            await self._wait_ng_ro(
                nsr_id,
                action_id,
                nslcmop_id,
                start_deploy,
                self.timeout.operate,
                None,
                "start_stop_rebuild",
            )
            return "COMPLETED", "Done"
        except (ROclient.ROClientException, DbException, LcmException) as e:
            self.logger.error("Exit Exception {}".format(e))
            exc = e
        except asyncio.CancelledError:
            self.logger.error("Cancelled Exception while '{}'".format(stage))
            exc = "Operation was cancelled"
        except Exception as e:
            exc = traceback.format_exc()
            self.logger.critical(
                "Exit Exception {} {}".format(type(e).__name__, e), exc_info=True
            )
        # every exception branch falls through to a FAILED result
        return "FAILED", "Error in operate VNF {}".format(exc)
+
+ def get_vca_cloud_and_credentials(self, vim_account_id: str) -> (str, str):
+ """
+ Get VCA Cloud and VCA Cloud Credentials for the VIM account
+
+ :param: vim_account_id: VIM Account ID
+
+ :return: (cloud_name, cloud_credential)
+ """
+ config = VimAccountDB.get_vim_account_with_id(vim_account_id).get("config", {})
+ return config.get("vca_cloud"), config.get("vca_cloud_credential")
+
+ def get_vca_k8s_cloud_and_credentials(self, vim_account_id: str) -> (str, str):
+ """
+ Get VCA K8s Cloud and VCA K8s Cloud Credentials for the VIM account
+
+ :param: vim_account_id: VIM Account ID
+
+ :return: (cloud_name, cloud_credential)
+ """
+ config = VimAccountDB.get_vim_account_with_id(vim_account_id).get("config", {})
+ return config.get("vca_k8s_cloud"), config.get("vca_k8s_cloud_credential")
+
    async def migrate(self, nsr_id, nslcmop_id):
        """
        Migrate VNFs and VDUs instances in a NS

        :param: nsr_id: NS Instance ID
        :param: nslcmop_id: nslcmop ID of migrate

        """
        # Try to lock HA task here
        task_is_locked_by_me = self.lcm_tasks.lock_HA("ns", "nslcmops", nslcmop_id)
        if not task_is_locked_by_me:
            # another LCM instance already owns this operation
            return
        logging_text = "Task ns={} migrate ".format(nsr_id)
        self.logger.debug(logging_text + "Enter")
        # get all needed from database
        db_nslcmop = None
        db_nslcmop_update = {}
        nslcmop_operation_state = None
        db_nsr_update = {}
        target = {}
        exc = None
        # in case of error, indicates what part of scale was failed to put nsr at error status
        start_deploy = time()

        try:
            # wait for any previous tasks in process
            step = "Waiting for previous operations to terminate"
            await self.lcm_tasks.waitfor_related_HA("ns", "nslcmops", nslcmop_id)

            self._write_ns_status(
                nsr_id=nsr_id,
                ns_state=None,
                current_operation="MIGRATING",
                current_operation_id=nslcmop_id,
            )
            step = "Getting nslcmop from database"
            self.logger.debug(
                step + " after having waited for previous tasks to be completed"
            )
            db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})
            migrate_params = db_nslcmop.get("operationParams")

            # RO migrate target is built directly from the operation params
            target = {}
            target.update(migrate_params)
            desc = await self.RO.migrate(nsr_id, target)
            self.logger.debug("RO return > {}".format(desc))
            action_id = desc["action_id"]
            # block until RO reports the migration finished (or times out)
            await self._wait_ng_ro(
                nsr_id,
                action_id,
                nslcmop_id,
                start_deploy,
                self.timeout.migrate,
                operation="migrate",
            )
        except (ROclient.ROClientException, DbException, LcmException) as e:
            self.logger.error("Exit Exception {}".format(e))
            exc = e
        except asyncio.CancelledError:
            self.logger.error("Cancelled Exception while '{}'".format(step))
            exc = "Operation was cancelled"
        except Exception as e:
            exc = traceback.format_exc()
            self.logger.critical(
                "Exit Exception {} {}".format(type(e).__name__, e), exc_info=True
            )
        finally:
            # always release the NS "current operation" marker and record
            # the final operation state, whatever happened above
            self._write_ns_status(
                nsr_id=nsr_id,
                ns_state=None,
                current_operation="IDLE",
                current_operation_id=None,
            )
            if exc:
                db_nslcmop_update["detailed-status"] = "FAILED {}: {}".format(step, exc)
                nslcmop_operation_state = "FAILED"
            else:
                nslcmop_operation_state = "COMPLETED"
                db_nslcmop_update["detailed-status"] = "Done"
                db_nsr_update["detailed-status"] = "Done"

            self._write_op_status(
                op_id=nslcmop_id,
                stage="",
                error_message="",
                operation_state=nslcmop_operation_state,
                other_update=db_nslcmop_update,
            )
            if nslcmop_operation_state:
                try:
                    msg = {
                        "nsr_id": nsr_id,
                        "nslcmop_id": nslcmop_id,
                        "operationState": nslcmop_operation_state,
                    }
                    # NOTE(review): the "loop" kwarg belongs to the project's
                    # message-bus API; plain asyncio no longer accepts one --
                    # confirm against the msg client implementation
                    await self.msg.aiowrite("ns", "migrated", msg, loop=self.loop)
                except Exception as e:
                    # notification failure must not mask the operation result
                    self.logger.error(
                        logging_text + "kafka_write notification Exception {}".format(e)
                    )
            self.logger.debug(logging_text + "Exit")
            self.lcm_tasks.remove("ns", nsr_id, nslcmop_id, "ns_migrate")
+
+ async def heal(self, nsr_id, nslcmop_id):
+ """
+ Heal NS
+
+ :param nsr_id: ns instance to heal
+ :param nslcmop_id: operation to run
+ :return:
+ """
+
+ # Try to lock HA task here
+ task_is_locked_by_me = self.lcm_tasks.lock_HA("ns", "nslcmops", nslcmop_id)
+ if not task_is_locked_by_me:
+ return
+
+ logging_text = "Task ns={} heal={} ".format(nsr_id, nslcmop_id)
+ stage = ["", "", ""]
+ tasks_dict_info = {}
+ # ^ stage, step, VIM progress
+ self.logger.debug(logging_text + "Enter")
+ # get all needed from database
+ db_nsr = None
+ db_nslcmop_update = {}
+ db_nsr_update = {}
+ db_vnfrs = {} # vnf's info indexed by _id
+ exc = None
+ old_operational_status = ""
+ old_config_status = ""
+ nsi_id = None
+ try:
+ # wait for any previous tasks in process
+ step = "Waiting for previous operations to terminate"
+ await self.lcm_tasks.waitfor_related_HA("ns", "nslcmops", nslcmop_id)
+ self._write_ns_status(
+ nsr_id=nsr_id,
+ ns_state=None,
+ current_operation="HEALING",
+ current_operation_id=nslcmop_id,
+ )
+
+ step = "Getting nslcmop from database"
+ self.logger.debug(
+ step + " after having waited for previous tasks to be completed"
+ )
+ db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})
+
+ step = "Getting nsr from database"
+ db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
+ old_operational_status = db_nsr["operational-status"]
+ old_config_status = db_nsr["config-status"]
+
+ db_nsr_update = {
+ "_admin.deployed.RO.operational-status": "healing",
+ }
+ self.update_db_2("nsrs", nsr_id, db_nsr_update)
+
+ step = "Sending heal order to VIM"
+ await self.heal_RO(
+ logging_text=logging_text,
+ nsr_id=nsr_id,
+ db_nslcmop=db_nslcmop,
+ stage=stage,
+ )
+ # VCA tasks
+ # read from db: nsd
+ stage[1] = "Getting nsd={} from db.".format(db_nsr["nsd-id"])
+ self.logger.debug(logging_text + stage[1])
+ nsd = self.db.get_one("nsds", {"_id": db_nsr["nsd-id"]})
+ self.fs.sync(db_nsr["nsd-id"])
+ db_nsr["nsd"] = nsd
+ # read from db: vnfr's of this ns
+ step = "Getting vnfrs from db"
+ db_vnfrs_list = self.db.get_list("vnfrs", {"nsr-id-ref": nsr_id})
+ for vnfr in db_vnfrs_list:
+ db_vnfrs[vnfr["_id"]] = vnfr
+ self.logger.debug("ns.heal db_vnfrs={}".format(db_vnfrs))
+
+ # Check for each target VNF
+ target_list = db_nslcmop.get("operationParams", {}).get("healVnfData", {})
+ for target_vnf in target_list:
+ # Find this VNF in the list from DB
+ vnfr_id = target_vnf.get("vnfInstanceId", None)
+ if vnfr_id:
+ db_vnfr = db_vnfrs[vnfr_id]
+ vnfd_id = db_vnfr.get("vnfd-id")
+ vnfd_ref = db_vnfr.get("vnfd-ref")
+ vnfd = self.db.get_one("vnfds", {"_id": vnfd_id})
+ base_folder = vnfd["_admin"]["storage"]
+ vdu_id = None
+ vdu_index = 0
+ vdu_name = None
+ kdu_name = None
+ nsi_id = None # TODO put nsi_id when this nsr belongs to a NSI
+ member_vnf_index = db_vnfr.get("member-vnf-index-ref")
+
+ # Check each target VDU and deploy N2VC
+ target_vdu_list = target_vnf.get("additionalParams", {}).get(
+ "vdu", []
+ )
+ if not target_vdu_list:
+ # Codigo nuevo para crear diccionario
+ target_vdu_list = []
+ for existing_vdu in db_vnfr.get("vdur"):
+ vdu_name = existing_vdu.get("vdu-name", None)
+ vdu_index = existing_vdu.get("count-index", 0)
+ vdu_run_day1 = target_vnf.get("additionalParams", {}).get(
+ "run-day1", False
+ )
+ vdu_to_be_healed = {
+ "vdu-id": vdu_name,
+ "count-index": vdu_index,
+ "run-day1": vdu_run_day1,
+ }
+ target_vdu_list.append(vdu_to_be_healed)
+ for target_vdu in target_vdu_list:
+ deploy_params_vdu = target_vdu
+ # Set run-day1 vnf level value if not vdu level value exists
+ if not deploy_params_vdu.get("run-day1") and target_vnf[
+ "additionalParams"
+ ].get("run-day1"):
+ deploy_params_vdu["run-day1"] = target_vnf[
+ "additionalParams"
+ ].get("run-day1")
+ vdu_name = target_vdu.get("vdu-id", None)
+ # TODO: Get vdu_id from vdud.
+ vdu_id = vdu_name
+ # For multi instance VDU count-index is mandatory
+ # For single session VDU count-indes is 0
+ vdu_index = target_vdu.get("count-index", 0)
+
+ # n2vc_redesign STEP 3 to 6 Deploy N2VC
+ stage[1] = "Deploying Execution Environments."
+ self.logger.debug(logging_text + stage[1])
+
+ # VNF Level charm. Normal case when proxy charms.
+ # If target instance is management machine continue with actions: recreate EE for native charms or reinject juju key for proxy charms.
+ descriptor_config = get_configuration(vnfd, vnfd_ref)
+ if descriptor_config:
+ # Continue if healed machine is management machine
+ vnf_ip_address = db_vnfr.get("ip-address")
+ target_instance = None
+ for instance in db_vnfr.get("vdur", None):
+ if (
+ instance["vdu-name"] == vdu_name
+ and instance["count-index"] == vdu_index
+ ):
+ target_instance = instance
+ break
+ if vnf_ip_address == target_instance.get("ip-address"):
+ self._heal_n2vc(
+ logging_text=logging_text
+ + "member_vnf_index={}, vdu_name={}, vdu_index={} ".format(
+ member_vnf_index, vdu_name, vdu_index
+ ),
+ db_nsr=db_nsr,
+ db_vnfr=db_vnfr,
+ nslcmop_id=nslcmop_id,
+ nsr_id=nsr_id,
+ nsi_id=nsi_id,
+ vnfd_id=vnfd_ref,
+ vdu_id=None,
+ kdu_name=None,
+ member_vnf_index=member_vnf_index,
+ vdu_index=0,
+ vdu_name=None,
+ deploy_params=deploy_params_vdu,
+ descriptor_config=descriptor_config,
+ base_folder=base_folder,
+ task_instantiation_info=tasks_dict_info,
+ stage=stage,
+ )
+
+ # VDU Level charm. Normal case with native charms.
+ descriptor_config = get_configuration(vnfd, vdu_name)
+ if descriptor_config:
+ self._heal_n2vc(
+ logging_text=logging_text
+ + "member_vnf_index={}, vdu_name={}, vdu_index={} ".format(
+ member_vnf_index, vdu_name, vdu_index
+ ),
+ db_nsr=db_nsr,
+ db_vnfr=db_vnfr,
+ nslcmop_id=nslcmop_id,
+ nsr_id=nsr_id,
+ nsi_id=nsi_id,
+ vnfd_id=vnfd_ref,
+ vdu_id=vdu_id,
+ kdu_name=kdu_name,
+ member_vnf_index=member_vnf_index,
+ vdu_index=vdu_index,
+ vdu_name=vdu_name,
+ deploy_params=deploy_params_vdu,
+ descriptor_config=descriptor_config,
+ base_folder=base_folder,
+ task_instantiation_info=tasks_dict_info,
+ stage=stage,
+ )
+
+ except (
+ ROclient.ROClientException,
+ DbException,
+ LcmException,
+ NgRoException,
+ ) as e:
+ self.logger.error(logging_text + "Exit Exception {}".format(e))
+ exc = e
+ except asyncio.CancelledError:
+ self.logger.error(
+ logging_text + "Cancelled Exception while '{}'".format(step)
+ )
+ exc = "Operation was cancelled"
+ except Exception as e:
+ exc = traceback.format_exc()
+ self.logger.critical(
+ logging_text + "Exit Exception {} {}".format(type(e).__name__, e),
+ exc_info=True,
+ )
+ finally:
+ if tasks_dict_info:
+ stage[1] = "Waiting for healing pending tasks."
+ self.logger.debug(logging_text + stage[1])
+ exc = await self._wait_for_tasks(
+ logging_text,
+ tasks_dict_info,
+ self.timeout.ns_deploy,
+ stage,
+ nslcmop_id,
+ nsr_id=nsr_id,
+ )
+ if exc:
+ db_nslcmop_update[
+ "detailed-status"
+ ] = error_description_nslcmop = "FAILED {}: {}".format(step, exc)
+ nslcmop_operation_state = "FAILED"
+ if db_nsr:
+ db_nsr_update["operational-status"] = old_operational_status
+ db_nsr_update["config-status"] = old_config_status
+ db_nsr_update[
+ "detailed-status"
+ ] = "FAILED healing nslcmop={} {}: {}".format(nslcmop_id, step, exc)
+ for task, task_name in tasks_dict_info.items():
+ if not task.done() or task.cancelled() or task.exception():
+ if task_name.startswith(self.task_name_deploy_vca):
+ # A N2VC task is pending
+ db_nsr_update["config-status"] = "failed"
+ else:
+ # RO task is pending
+ db_nsr_update["operational-status"] = "failed"
+ else:
+ error_description_nslcmop = None
+ nslcmop_operation_state = "COMPLETED"
+ db_nslcmop_update["detailed-status"] = "Done"
+ db_nsr_update["detailed-status"] = "Done"
+ db_nsr_update["operational-status"] = "running"
+ db_nsr_update["config-status"] = "configured"
+
+ self._write_op_status(
+ op_id=nslcmop_id,
+ stage="",
+ error_message=error_description_nslcmop,
+ operation_state=nslcmop_operation_state,
+ other_update=db_nslcmop_update,
+ )
+ if db_nsr:
+ self._write_ns_status(
+ nsr_id=nsr_id,
+ ns_state=None,
+ current_operation="IDLE",
+ current_operation_id=None,
+ other_update=db_nsr_update,
+ )
+
+ if nslcmop_operation_state:
+ try:
+ msg = {
+ "nsr_id": nsr_id,
+ "nslcmop_id": nslcmop_id,
+ "operationState": nslcmop_operation_state,
+ }
+ await self.msg.aiowrite("ns", "healed", msg, loop=self.loop)
+ except Exception as e:
+ self.logger.error(
+ logging_text + "kafka_write notification Exception {}".format(e)
+ )
+ self.logger.debug(logging_text + "Exit")
+ self.lcm_tasks.remove("ns", nsr_id, nslcmop_id, "ns_heal")
+
    async def heal_RO(
        self,
        logging_text,
        nsr_id,
        db_nslcmop,
        stage,
    ):
        """
        Heal at RO
        :param logging_text: preffix text to use at logging
        :param nsr_id: nsr identity
        :param db_nslcmop: database content of ns operation, in this case, 'instantiate'
        :param stage: list with 3 items: [general stage, tasks, vim_specific]. This task will write over vim_specific
        :return: None or exception
        """

        def get_vim_account(vim_account_id):
            # Memoized VIM-account lookup: caches each record in the
            # enclosing db_vims dict to avoid repeated DB round-trips.
            nonlocal db_vims
            if vim_account_id in db_vims:
                return db_vims[vim_account_id]
            db_vim = self.db.get_one("vim_accounts", {"_id": vim_account_id})
            db_vims[vim_account_id] = db_vim
            return db_vim

        try:
            start_heal = time()
            ns_params = db_nslcmop.get("operationParams")
            # Per-operation timeout (if given) overrides the global LCM default.
            if ns_params and ns_params.get("timeout_ns_heal"):
                timeout_ns_heal = ns_params["timeout_ns_heal"]
            else:
                timeout_ns_heal = self.timeout.ns_heal

            db_vims = {}

            nslcmop_id = db_nslcmop["_id"]
            # RO "target" payload: the action id plus every operation parameter.
            target = {
                "action_id": nslcmop_id,
            }
            self.logger.warning(
                "db_nslcmop={} and timeout_ns_heal={}".format(
                    db_nslcmop, timeout_ns_heal
                )
            )
            target.update(db_nslcmop.get("operationParams", {}))

            self.logger.debug("Send to RO > nsr_id={} target={}".format(nsr_id, target))
            desc = await self.RO.recreate(nsr_id, target)
            self.logger.debug("RO return > {}".format(desc))
            action_id = desc["action_id"]
            # waits for RO to complete because Reinjecting juju key at ro can find VM in state Deleted
            await self._wait_ng_ro(
                nsr_id,
                action_id,
                nslcmop_id,
                start_heal,
                timeout_ns_heal,
                stage,
                operation="healing",
            )

            # Updating NSR
            db_nsr_update = {
                "_admin.deployed.RO.operational-status": "running",
                "detailed-status": " ".join(stage),
            }
            self.update_db_2("nsrs", nsr_id, db_nsr_update)
            self._write_op_status(nslcmop_id, stage)
            self.logger.debug(
                logging_text + "ns healed at RO. RO_id={}".format(action_id)
            )

        except Exception as e:
            stage[2] = "ERROR healing at VIM"
            # self.set_vnfr_at_error(db_vnfrs, str(e))
            # Log a full traceback only for unexpected exception types; the
            # known client/LCM exceptions are logged without a stack trace.
            self.logger.error(
                "Error healing at VIM {}".format(e),
                exc_info=not isinstance(
                    e,
                    (
                        ROclient.ROClientException,
                        LcmException,
                        DbException,
                        NgRoException,
                    ),
                ),
            )
            raise
+
    def _heal_n2vc(
        self,
        logging_text,
        db_nsr,
        db_vnfr,
        nslcmop_id,
        nsr_id,
        nsi_id,
        vnfd_id,
        vdu_id,
        kdu_name,
        member_vnf_index,
        vdu_index,
        vdu_name,
        deploy_params,
        descriptor_config,
        base_folder,
        task_instantiation_info,
        stage,
    ):
        """Launch heal_N2VC as an asyncio task for each execution environment
        of this element and register the task for tracking.

        Looks up the charm information at <nsrs>._admin.deployed.VCA; if no
        matching entry exists, a new one is created and written to the DB.
        """
        # launch instantiate_N2VC in a asyncio task and register task object
        # Look where information of this charm is at database <nsrs>._admin.deployed.VCA
        # if not found, create one entry and update database
        # fill db_nsr._admin.deployed.VCA.<index>

        self.logger.debug(
            logging_text + "_deploy_n2vc vnfd_id={}, vdu_id={}".format(vnfd_id, vdu_id)
        )

        charm_name = ""
        get_charm_name = False
        # Normalize the descriptor into a list of execution environments.
        if "execution-environment-list" in descriptor_config:
            ee_list = descriptor_config.get("execution-environment-list", [])
        elif "juju" in descriptor_config:
            ee_list = [descriptor_config]  # ns charms
            if "execution-environment-list" not in descriptor_config:
                # charm name is only required for ns charms
                get_charm_name = True
        else:  # other types as script are not supported
            ee_list = []

        for ee_item in ee_list:
            self.logger.debug(
                logging_text
                + "_deploy_n2vc ee_item juju={}, helm={}".format(
                    ee_item.get("juju"), ee_item.get("helm-chart")
                )
            )
            ee_descriptor_id = ee_item.get("id")
            # Derive the VCA type from the EE descriptor: a juju charm
            # (lxc proxy / native / k8s proxy) or a helm chart (v2 / v3).
            if ee_item.get("juju"):
                vca_name = ee_item["juju"].get("charm")
                if get_charm_name:
                    charm_name = self.find_charm_name(db_nsr, str(vca_name))
                vca_type = (
                    "lxc_proxy_charm"
                    if ee_item["juju"].get("charm") is not None
                    else "native_charm"
                )
                if ee_item["juju"].get("cloud") == "k8s":
                    vca_type = "k8s_proxy_charm"
                elif ee_item["juju"].get("proxy") is False:
                    vca_type = "native_charm"
            elif ee_item.get("helm-chart"):
                vca_name = ee_item["helm-chart"]
                if ee_item.get("helm-version") and ee_item.get("helm-version") == "v2":
                    vca_type = "helm"
                else:
                    vca_type = "helm-v3"
            else:
                self.logger.debug(
                    logging_text + "skipping non juju neither charm configuration"
                )
                continue

            vca_index = -1
            # Search for an already-deployed VCA entry matching this element;
            # the for/else falls through to entry creation when none matches.
            for vca_index, vca_deployed in enumerate(
                db_nsr["_admin"]["deployed"]["VCA"]
            ):
                if not vca_deployed:
                    continue
                if (
                    vca_deployed.get("member-vnf-index") == member_vnf_index
                    and vca_deployed.get("vdu_id") == vdu_id
                    and vca_deployed.get("kdu_name") == kdu_name
                    and vca_deployed.get("vdu_count_index", 0) == vdu_index
                    and vca_deployed.get("ee_descriptor_id") == ee_descriptor_id
                ):
                    break
            else:
                # not found, create one.
                target = (
                    "ns" if not member_vnf_index else "vnf/{}".format(member_vnf_index)
                )
                if vdu_id:
                    target += "/vdu/{}/{}".format(vdu_id, vdu_index or 0)
                elif kdu_name:
                    target += "/kdu/{}".format(kdu_name)
                vca_deployed = {
                    "target_element": target,
                    # ^ target_element will replace member-vnf-index, kdu_name, vdu_id ... in a single string
                    "member-vnf-index": member_vnf_index,
                    "vdu_id": vdu_id,
                    "kdu_name": kdu_name,
                    "vdu_count_index": vdu_index,
                    "operational-status": "init",  # TODO revise
                    "detailed-status": "",  # TODO revise
                    "step": "initial-deploy",  # TODO revise
                    "vnfd_id": vnfd_id,
                    "vdu_name": vdu_name,
                    "type": vca_type,
                    "ee_descriptor_id": ee_descriptor_id,
                    "charm_name": charm_name,
                }
                vca_index += 1

                # create VCA and configurationStatus in db
                db_dict = {
                    "_admin.deployed.VCA.{}".format(vca_index): vca_deployed,
                    "configurationStatus.{}".format(vca_index): dict(),
                }
                self.update_db_2("nsrs", nsr_id, db_dict)

                db_nsr["_admin"]["deployed"]["VCA"].append(vca_deployed)

            self.logger.debug("N2VC > NSR_ID > {}".format(nsr_id))
            self.logger.debug("N2VC > DB_NSR > {}".format(db_nsr))
            self.logger.debug("N2VC > VCA_DEPLOYED > {}".format(vca_deployed))

            # Launch task
            task_n2vc = asyncio.ensure_future(
                self.heal_N2VC(
                    logging_text=logging_text,
                    vca_index=vca_index,
                    nsi_id=nsi_id,
                    db_nsr=db_nsr,
                    db_vnfr=db_vnfr,
                    vdu_id=vdu_id,
                    kdu_name=kdu_name,
                    vdu_index=vdu_index,
                    deploy_params=deploy_params,
                    config_descriptor=descriptor_config,
                    base_folder=base_folder,
                    nslcmop_id=nslcmop_id,
                    stage=stage,
                    vca_type=vca_type,
                    vca_name=vca_name,
                    ee_config_descriptor=ee_item,
                )
            )
            self.lcm_tasks.register(
                "ns",
                nsr_id,
                nslcmop_id,
                "instantiate_N2VC-{}".format(vca_index),
                task_n2vc,
            )
            task_instantiation_info[
                task_n2vc
            ] = self.task_name_deploy_vca + " {}.{}".format(
                member_vnf_index or "", vdu_id or ""
            )
+
    async def heal_N2VC(
        self,
        logging_text,
        vca_index,
        nsi_id,
        db_nsr,
        db_vnfr,
        vdu_id,
        kdu_name,
        vdu_index,
        config_descriptor,
        deploy_params,
        base_folder,
        nslcmop_id,
        stage,
        vca_type,
        vca_name,
        ee_config_descriptor,
    ):
        """Re-attach the VCA execution environment of a healed element and,
        when requested, re-run its day-1 configuration primitives.

        Runs as an asyncio task launched by _heal_n2vc. Progress is recorded
        in <nsrs>._admin.deployed.VCA.<vca_index> and configurationStatus.
        For native charms the execution environment is re-registered against
        the healed VM; for proxy/helm types the SSH key is re-injected after
        RO finishes healing. Raises LcmException on failure.
        """
        nsr_id = db_nsr["_id"]
        db_update_entry = "_admin.deployed.VCA.{}.".format(vca_index)
        vca_deployed_list = db_nsr["_admin"]["deployed"]["VCA"]
        vca_deployed = db_nsr["_admin"]["deployed"]["VCA"][vca_index]
        osm_config = {"osm": {"ns_id": db_nsr["_id"]}}
        db_dict = {
            "collection": "nsrs",
            "filter": {"_id": nsr_id},
            "path": db_update_entry,
        }
        step = ""
        try:
            element_type = "NS"
            element_under_configuration = nsr_id

            vnfr_id = None
            if db_vnfr:
                vnfr_id = db_vnfr["_id"]
                osm_config["osm"]["vnf_id"] = vnfr_id

            namespace = "{nsi}.{ns}".format(nsi=nsi_id if nsi_id else "", ns=nsr_id)

            if vca_type == "native_charm":
                index_number = 0
            else:
                index_number = vdu_index or 0

            # Refine element type/namespace: NS -> VNF -> VDU/KDU.
            if vnfr_id:
                element_type = "VNF"
                element_under_configuration = vnfr_id
                namespace += ".{}-{}".format(vnfr_id, index_number)
                if vdu_id:
                    namespace += ".{}-{}".format(vdu_id, index_number)
                    element_type = "VDU"
                    element_under_configuration = "{}-{}".format(vdu_id, index_number)
                    osm_config["osm"]["vdu_id"] = vdu_id
                elif kdu_name:
                    namespace += ".{}".format(kdu_name)
                    element_type = "KDU"
                    element_under_configuration = kdu_name
                    osm_config["osm"]["kdu_name"] = kdu_name

            # Get artifact path
            if base_folder["pkg-dir"]:
                artifact_path = "{}/{}/{}/{}".format(
                    base_folder["folder"],
                    base_folder["pkg-dir"],
                    "charms"
                    if vca_type
                    in ("native_charm", "lxc_proxy_charm", "k8s_proxy_charm")
                    else "helm-charts",
                    vca_name,
                )
            else:
                artifact_path = "{}/Scripts/{}/{}/".format(
                    base_folder["folder"],
                    "charms"
                    if vca_type
                    in ("native_charm", "lxc_proxy_charm", "k8s_proxy_charm")
                    else "helm-charts",
                    vca_name,
                )

            self.logger.debug("Artifact path > {}".format(artifact_path))

            # get initial_config_primitive_list that applies to this element
            initial_config_primitive_list = config_descriptor.get(
                "initial-config-primitive"
            )

            self.logger.debug(
                "Initial config primitive list > {}".format(
                    initial_config_primitive_list
                )
            )

            # add config if not present for NS charm
            ee_descriptor_id = ee_config_descriptor.get("id")
            self.logger.debug("EE Descriptor > {}".format(ee_descriptor_id))
            initial_config_primitive_list = get_ee_sorted_initial_config_primitive_list(
                initial_config_primitive_list, vca_deployed, ee_descriptor_id
            )

            self.logger.debug(
                "Initial config primitive list #2 > {}".format(
                    initial_config_primitive_list
                )
            )
            # n2vc_redesign STEP 3.1
            # find old ee_id if exists
            ee_id = vca_deployed.get("ee_id")

            vca_id = self.get_vca_id(db_vnfr, db_nsr)
            # create or register execution environment in VCA. Only for native charms when healing
            if vca_type == "native_charm":
                step = "Waiting to VM being up and getting IP address"
                self.logger.debug(logging_text + step)
                rw_mgmt_ip = await self.wait_vm_up_insert_key_ro(
                    logging_text,
                    nsr_id,
                    vnfr_id,
                    vdu_id,
                    vdu_index,
                    user=None,
                    pub_key=None,
                )
                credentials = {"hostname": rw_mgmt_ip}
                # get username
                username = deep_get(
                    config_descriptor, ("config-access", "ssh-access", "default-user")
                )
                # TODO remove this when changes on IM regarding config-access:ssh-access:default-user were
                # merged. Meanwhile let's get username from initial-config-primitive
                if not username and initial_config_primitive_list:
                    for config_primitive in initial_config_primitive_list:
                        for param in config_primitive.get("parameter", ()):
                            if param["name"] == "ssh-username":
                                username = param["value"]
                                break
                if not username:
                    raise LcmException(
                        "Cannot determine the username neither with 'initial-config-primitive' nor with "
                        "'config-access.ssh-access.default-user'"
                    )
                credentials["username"] = username

                # n2vc_redesign STEP 3.2
                # TODO: Before healing at RO it is needed to destroy native charm units to be deleted.
                self._write_configuration_status(
                    nsr_id=nsr_id,
                    vca_index=vca_index,
                    status="REGISTERING",
                    element_under_configuration=element_under_configuration,
                    element_type=element_type,
                )

                step = "register execution environment {}".format(credentials)
                self.logger.debug(logging_text + step)
                ee_id = await self.vca_map[vca_type].register_execution_environment(
                    credentials=credentials,
                    namespace=namespace,
                    db_dict=db_dict,
                    vca_id=vca_id,
                )

                # update ee_id en db
                db_dict_ee_id = {
                    "_admin.deployed.VCA.{}.ee_id".format(vca_index): ee_id,
                }
                self.update_db_2("nsrs", nsr_id, db_dict_ee_id)

            # for compatibility with MON/POL modules, the need model and application name at database
            # TODO ask MON/POL if needed to not assuming anymore the format "model_name.application_name"
            # Not sure if this need to be done when healing
            """
            ee_id_parts = ee_id.split(".")
            db_nsr_update = {db_update_entry + "ee_id": ee_id}
            if len(ee_id_parts) >= 2:
                model_name = ee_id_parts[0]
                application_name = ee_id_parts[1]
                db_nsr_update[db_update_entry + "model"] = model_name
                db_nsr_update[db_update_entry + "application"] = application_name
            """

            # n2vc_redesign STEP 3.3
            # Install configuration software. Only for native charms.
            step = "Install configuration Software"

            self._write_configuration_status(
                nsr_id=nsr_id,
                vca_index=vca_index,
                status="INSTALLING SW",
                element_under_configuration=element_under_configuration,
                element_type=element_type,
                # other_update=db_nsr_update,
                other_update=None,
            )

            # TODO check if already done
            self.logger.debug(logging_text + step)
            config = None
            if vca_type == "native_charm":
                # A primitive literally named "config" provides the charm
                # configuration values, mapped against deploy_params.
                config_primitive = next(
                    (p for p in initial_config_primitive_list if p["name"] == "config"),
                    None,
                )
                if config_primitive:
                    config = self._map_primitive_params(
                        config_primitive, {}, deploy_params
                    )
                await self.vca_map[vca_type].install_configuration_sw(
                    ee_id=ee_id,
                    artifact_path=artifact_path,
                    db_dict=db_dict,
                    config=config,
                    num_units=1,
                    vca_id=vca_id,
                    vca_type=vca_type,
                )
                # write in db flag of configuration_sw already installed
                self.update_db_2(
                    "nsrs", nsr_id, {db_update_entry + "config_sw_installed": True}
                )

            # Not sure if this need to be done when healing
            """
            # add relations for this VCA (wait for other peers related with this VCA)
            await self._add_vca_relations(
                logging_text=logging_text,
                nsr_id=nsr_id,
                vca_type=vca_type,
                vca_index=vca_index,
            )
            """

            # if SSH access is required, then get execution environment SSH public
            # if native charm we have waited already to VM be UP
            if vca_type in ("k8s_proxy_charm", "lxc_proxy_charm", "helm", "helm-v3"):
                pub_key = None
                user = None
                # self.logger.debug("get ssh key block")
                if deep_get(
                    config_descriptor, ("config-access", "ssh-access", "required")
                ):
                    # self.logger.debug("ssh key needed")
                    # Needed to inject a ssh key
                    user = deep_get(
                        config_descriptor,
                        ("config-access", "ssh-access", "default-user"),
                    )
                    step = "Install configuration Software, getting public ssh key"
                    pub_key = await self.vca_map[vca_type].get_ee_ssh_public__key(
                        ee_id=ee_id, db_dict=db_dict, vca_id=vca_id
                    )

                    step = "Insert public key into VM user={} ssh_key={}".format(
                        user, pub_key
                    )
                else:
                    # self.logger.debug("no need to get ssh key")
                    step = "Waiting to VM being up and getting IP address"
                self.logger.debug(logging_text + step)

                # n2vc_redesign STEP 5.1
                # wait for RO (ip-address) Insert pub_key into VM
                # IMPORTANT: We need do wait for RO to complete healing operation.
                await self._wait_heal_ro(nsr_id, self.timeout.ns_heal)
                if vnfr_id:
                    if kdu_name:
                        rw_mgmt_ip = await self.wait_kdu_up(
                            logging_text, nsr_id, vnfr_id, kdu_name
                        )
                    else:
                        rw_mgmt_ip = await self.wait_vm_up_insert_key_ro(
                            logging_text,
                            nsr_id,
                            vnfr_id,
                            vdu_id,
                            vdu_index,
                            user=user,
                            pub_key=pub_key,
                        )
                else:
                    rw_mgmt_ip = None  # This is for a NS configuration

                self.logger.debug(logging_text + " VM_ip_address={}".format(rw_mgmt_ip))

            # store rw_mgmt_ip in deploy params for later replacement
            deploy_params["rw_mgmt_ip"] = rw_mgmt_ip

            # Day1 operations.
            # get run-day1 operation parameter
            runDay1 = deploy_params.get("run-day1", False)
            self.logger.debug(
                "Healing vnf={}, vdu={}, runDay1 ={}".format(vnfr_id, vdu_id, runDay1)
            )
            if runDay1:
                # n2vc_redesign STEP 6  Execute initial config primitive
                step = "execute initial config primitive"

                # wait for dependent primitives execution (NS -> VNF -> VDU)
                if initial_config_primitive_list:
                    await self._wait_dependent_n2vc(
                        nsr_id, vca_deployed_list, vca_index
                    )

                # stage, in function of element type: vdu, kdu, vnf or ns
                my_vca = vca_deployed_list[vca_index]
                if my_vca.get("vdu_id") or my_vca.get("kdu_name"):
                    # VDU or KDU
                    stage[0] = "Stage 3/5: running Day-1 primitives for VDU."
                elif my_vca.get("member-vnf-index"):
                    # VNF
                    stage[0] = "Stage 4/5: running Day-1 primitives for VNF."
                else:
                    # NS
                    stage[0] = "Stage 5/5: running Day-1 primitives for NS."

                self._write_configuration_status(
                    nsr_id=nsr_id, vca_index=vca_index, status="EXECUTING PRIMITIVE"
                )

                self._write_op_status(op_id=nslcmop_id, stage=stage)

                check_if_terminated_needed = True
                for initial_config_primitive in initial_config_primitive_list:
                    # adding information on the vca_deployed if it is a NS execution environment
                    if not vca_deployed["member-vnf-index"]:
                        deploy_params["ns_config_info"] = json.dumps(
                            self._get_ns_config_info(nsr_id)
                        )
                    # TODO check if already done
                    primitive_params_ = self._map_primitive_params(
                        initial_config_primitive, {}, deploy_params
                    )

                    step = "execute primitive '{}' params '{}'".format(
                        initial_config_primitive["name"], primitive_params_
                    )
                    self.logger.debug(logging_text + step)
                    await self.vca_map[vca_type].exec_primitive(
                        ee_id=ee_id,
                        primitive_name=initial_config_primitive["name"],
                        params_dict=primitive_params_,
                        db_dict=db_dict,
                        vca_id=vca_id,
                        vca_type=vca_type,
                    )
                    # Once some primitive has been exec, check and write at db if it needs to exec terminated primitives
                    if check_if_terminated_needed:
                        if config_descriptor.get("terminate-config-primitive"):
                            self.update_db_2(
                                "nsrs",
                                nsr_id,
                                {db_update_entry + "needed_terminate": True},
                            )
                        check_if_terminated_needed = False

                    # TODO register in database that primitive is done

            # STEP 7 Configure metrics
            # Not sure if this need to be done when healing
            """
            if vca_type == "helm" or vca_type == "helm-v3":
                prometheus_jobs = await self.extract_prometheus_scrape_jobs(
                    ee_id=ee_id,
                    artifact_path=artifact_path,
                    ee_config_descriptor=ee_config_descriptor,
                    vnfr_id=vnfr_id,
                    nsr_id=nsr_id,
                    target_ip=rw_mgmt_ip,
                )
                if prometheus_jobs:
                    self.update_db_2(
                        "nsrs",
                        nsr_id,
                        {db_update_entry + "prometheus_jobs": prometheus_jobs},
                    )

                    for job in prometheus_jobs:
                        self.db.set_one(
                            "prometheus_jobs",
                            {"job_name": job["job_name"]},
                            job,
                            upsert=True,
                            fail_on_empty=False,
                        )

            """
            step = "instantiated at VCA"
            self.logger.debug(logging_text + step)

            self._write_configuration_status(
                nsr_id=nsr_id, vca_index=vca_index, status="READY"
            )

        except Exception as e:  # TODO not use Exception but N2VC exception
            # self.update_db_2("nsrs", nsr_id, {db_update_entry + "instantiation": "FAILED"})
            if not isinstance(
                e, (DbException, N2VCException, LcmException, asyncio.CancelledError)
            ):
                self.logger.error(
                    "Exception while {} : {}".format(step, e), exc_info=True
                )
            self._write_configuration_status(
                nsr_id=nsr_id, vca_index=vca_index, status="BROKEN"
            )
            raise LcmException("{} {}".format(step, e)) from e
+
+ async def _wait_heal_ro(
+ self,
+ nsr_id,
+ timeout=600,
+ ):
+ start_time = time()
+ while time() <= start_time + timeout:
+ db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
+ operational_status_ro = db_nsr["_admin"]["deployed"]["RO"][
+ "operational-status"
+ ]
+ self.logger.debug("Wait Heal RO > {}".format(operational_status_ro))
+ if operational_status_ro != "healing":
+ break
+ await asyncio.sleep(15, loop=self.loop)
+ else: # timeout_ns_deploy
+ raise NgRoException("Timeout waiting ns to deploy")
+
+ async def vertical_scale(self, nsr_id, nslcmop_id):
+ """
+ Vertical Scale the VDUs in a NS
+
+ :param: nsr_id: NS Instance ID
+ :param: nslcmop_id: nslcmop ID of vertical scale operation
+
+ """
+ # Try to lock HA task here
+ task_is_locked_by_me = self.lcm_tasks.lock_HA("ns", "nslcmops", nslcmop_id)
+ if not task_is_locked_by_me:
return
- except (
- ROclient.ROClientException,
- DbException,
- LcmException,
- NgRoException,
- ) as e:
- self.logger.error(logging_text + "Exit Exception {}".format(e))
+ logging_text = "Task ns={} vertical scale ".format(nsr_id)
+ self.logger.debug(logging_text + "Enter")
+ # get all needed from database
+ db_nslcmop = None
+ db_nslcmop_update = {}
+ nslcmop_operation_state = None
+ db_nsr_update = {}
+ target = {}
+ exc = None
+ # in case of error, indicates what part of scale was failed to put nsr at error status
+ start_deploy = time()
+
+ try:
+ # wait for any previous tasks in process
+ step = "Waiting for previous operations to terminate"
+ await self.lcm_tasks.waitfor_related_HA("ns", "nslcmops", nslcmop_id)
+
+ self._write_ns_status(
+ nsr_id=nsr_id,
+ ns_state=None,
+ current_operation="VerticalScale",
+ current_operation_id=nslcmop_id,
+ )
+ step = "Getting nslcmop from database"
+ self.logger.debug(
+ step + " after having waited for previous tasks to be completed"
+ )
+ db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})
+ operationParams = db_nslcmop.get("operationParams")
+ target = {}
+ target.update(operationParams)
+ desc = await self.RO.vertical_scale(nsr_id, target)
+ self.logger.debug("RO return > {}".format(desc))
+ action_id = desc["action_id"]
+ await self._wait_ng_ro(
+ nsr_id,
+ action_id,
+ nslcmop_id,
+ start_deploy,
+ self.timeout.verticalscale,
+ operation="verticalscale",
+ )
+ except (ROclient.ROClientException, DbException, LcmException) as e:
+ self.logger.error("Exit Exception {}".format(e))
exc = e
except asyncio.CancelledError:
- self.logger.error(
- logging_text + "Cancelled Exception while '{}'".format(step)
- )
+ self.logger.error("Cancelled Exception while '{}'".format(step))
exc = "Operation was cancelled"
except Exception as e:
exc = traceback.format_exc()
self.logger.critical(
- logging_text + "Exit Exception {} {}".format(type(e).__name__, e),
- exc_info=True,
+ "Exit Exception {} {}".format(type(e).__name__, e), exc_info=True
)
finally:
self._write_ns_status(
current_operation="IDLE",
current_operation_id=None,
)
- if tasks_dict_info:
- stage[1] = "Waiting for instantiate pending tasks."
- self.logger.debug(logging_text + stage[1])
- exc = await self._wait_for_tasks(
- logging_text,
- tasks_dict_info,
- self.timeout_ns_deploy,
- stage,
- nslcmop_id,
- nsr_id=nsr_id,
- )
if exc:
- db_nslcmop_update[
- "detailed-status"
- ] = error_description_nslcmop = "FAILED {}: {}".format(step, exc)
+ db_nslcmop_update["detailed-status"] = "FAILED {}: {}".format(step, exc)
nslcmop_operation_state = "FAILED"
- if db_nsr:
- db_nsr_update["operational-status"] = old_operational_status
- db_nsr_update["config-status"] = old_config_status
- db_nsr_update["detailed-status"] = ""
- if scale_process:
- if "VCA" in scale_process:
- db_nsr_update["config-status"] = "failed"
- if "RO" in scale_process:
- db_nsr_update["operational-status"] = "failed"
- db_nsr_update[
- "detailed-status"
- ] = "FAILED scaling nslcmop={} {}: {}".format(
- nslcmop_id, step, exc
- )
else:
- error_description_nslcmop = None
nslcmop_operation_state = "COMPLETED"
db_nslcmop_update["detailed-status"] = "Done"
+ db_nsr_update["detailed-status"] = "Done"
self._write_op_status(
op_id=nslcmop_id,
stage="",
- error_message=error_description_nslcmop,
+ error_message="",
operation_state=nslcmop_operation_state,
other_update=db_nslcmop_update,
)
- if db_nsr:
- self._write_ns_status(
- nsr_id=nsr_id,
- ns_state=None,
- current_operation="IDLE",
- current_operation_id=None,
- other_update=db_nsr_update,
- )
-
if nslcmop_operation_state:
try:
msg = {
"nslcmop_id": nslcmop_id,
"operationState": nslcmop_operation_state,
}
- await self.msg.aiowrite("ns", "scaled", msg, loop=self.loop)
+ await self.msg.aiowrite("ns", "verticalscaled", msg, loop=self.loop)
except Exception as e:
self.logger.error(
logging_text + "kafka_write notification Exception {}".format(e)
)
self.logger.debug(logging_text + "Exit")
- self.lcm_tasks.remove("ns", nsr_id, nslcmop_id, "ns_scale")
-
- async def _scale_kdu(
- self, logging_text, nsr_id, nsr_deployed, db_vnfd, vca_id, scaling_info
- ):
- _scaling_info = scaling_info.get("kdu-create") or scaling_info.get("kdu-delete")
- for kdu_name in _scaling_info:
- for kdu_scaling_info in _scaling_info[kdu_name]:
- deployed_kdu, index = get_deployed_kdu(
- nsr_deployed, kdu_name, kdu_scaling_info["member-vnf-index"]
- )
- cluster_uuid = deployed_kdu["k8scluster-uuid"]
- kdu_instance = deployed_kdu["kdu-instance"]
- scale = int(kdu_scaling_info["scale"])
- k8s_cluster_type = kdu_scaling_info["k8s-cluster-type"]
-
- db_dict = {
- "collection": "nsrs",
- "filter": {"_id": nsr_id},
- "path": "_admin.deployed.K8s.{}".format(index),
- }
-
- step = "scaling application {}".format(
- kdu_scaling_info["resource-name"]
- )
- self.logger.debug(logging_text + step)
-
- if kdu_scaling_info["type"] == "delete":
- kdu_config = get_configuration(db_vnfd, kdu_name)
- if (
- kdu_config
- and kdu_config.get("terminate-config-primitive")
- and get_juju_ee_ref(db_vnfd, kdu_name) is None
- ):
- terminate_config_primitive_list = kdu_config.get(
- "terminate-config-primitive"
- )
- terminate_config_primitive_list.sort(
- key=lambda val: int(val["seq"])
- )
-
- for (
- terminate_config_primitive
- ) in terminate_config_primitive_list:
- primitive_params_ = self._map_primitive_params(
- terminate_config_primitive, {}, {}
- )
- step = "execute terminate config primitive"
- self.logger.debug(logging_text + step)
- await asyncio.wait_for(
- self.k8scluster_map[k8s_cluster_type].exec_primitive(
- cluster_uuid=cluster_uuid,
- kdu_instance=kdu_instance,
- primitive_name=terminate_config_primitive["name"],
- params=primitive_params_,
- db_dict=db_dict,
- vca_id=vca_id,
- ),
- timeout=600,
- )
-
- await asyncio.wait_for(
- self.k8scluster_map[k8s_cluster_type].scale(
- kdu_instance,
- scale,
- kdu_scaling_info["resource-name"],
- vca_id=vca_id,
- ),
- timeout=self.timeout_vca_on_error,
- )
-
- if kdu_scaling_info["type"] == "create":
- kdu_config = get_configuration(db_vnfd, kdu_name)
- if (
- kdu_config
- and kdu_config.get("initial-config-primitive")
- and get_juju_ee_ref(db_vnfd, kdu_name) is None
- ):
- initial_config_primitive_list = kdu_config.get(
- "initial-config-primitive"
- )
- initial_config_primitive_list.sort(
- key=lambda val: int(val["seq"])
- )
-
- for initial_config_primitive in initial_config_primitive_list:
- primitive_params_ = self._map_primitive_params(
- initial_config_primitive, {}, {}
- )
- step = "execute initial config primitive"
- self.logger.debug(logging_text + step)
- await asyncio.wait_for(
- self.k8scluster_map[k8s_cluster_type].exec_primitive(
- cluster_uuid=cluster_uuid,
- kdu_instance=kdu_instance,
- primitive_name=initial_config_primitive["name"],
- params=primitive_params_,
- db_dict=db_dict,
- vca_id=vca_id,
- ),
- timeout=600,
- )
-
- async def _scale_ng_ro(
- self, logging_text, db_nsr, db_nslcmop, db_vnfr, vdu_scaling_info, stage
- ):
- nsr_id = db_nslcmop["nsInstanceId"]
- db_nsd = self.db.get_one("nsds", {"_id": db_nsr["nsd-id"]})
- db_vnfrs = {}
-
- # read from db: vnfd's for every vnf
- db_vnfds = []
-
- # for each vnf in ns, read vnfd
- for vnfr in self.db.get_list("vnfrs", {"nsr-id-ref": nsr_id}):
- db_vnfrs[vnfr["member-vnf-index-ref"]] = vnfr
- vnfd_id = vnfr["vnfd-id"] # vnfd uuid for this vnf
- # if we haven't this vnfd, read it from db
- if not find_in_list(db_vnfds, lambda a_vnfd: a_vnfd["id"] == vnfd_id):
- # read from db
- vnfd = self.db.get_one("vnfds", {"_id": vnfd_id})
- db_vnfds.append(vnfd)
- n2vc_key = self.n2vc.get_public_key()
- n2vc_key_list = [n2vc_key]
- self.scale_vnfr(
- db_vnfr,
- vdu_scaling_info.get("vdu-create"),
- vdu_scaling_info.get("vdu-delete"),
- mark_delete=True,
- )
- # db_vnfr has been updated, update db_vnfrs to use it
- db_vnfrs[db_vnfr["member-vnf-index-ref"]] = db_vnfr
- await self._instantiate_ng_ro(
- logging_text,
- nsr_id,
- db_nsd,
- db_nsr,
- db_nslcmop,
- db_vnfrs,
- db_vnfds,
- n2vc_key_list,
- stage=stage,
- start_deploy=time(),
- timeout_ns_deploy=self.timeout_ns_deploy,
- )
- if vdu_scaling_info.get("vdu-delete"):
- self.scale_vnfr(
- db_vnfr, None, vdu_scaling_info["vdu-delete"], mark_delete=False
- )
-
- async def add_prometheus_metrics(
- self, ee_id, artifact_path, ee_config_descriptor, vnfr_id, nsr_id, target_ip
- ):
- if not self.prometheus:
- return
- # look if exist a file called 'prometheus*.j2' and
- artifact_content = self.fs.dir_ls(artifact_path)
- job_file = next(
- (
- f
- for f in artifact_content
- if f.startswith("prometheus") and f.endswith(".j2")
- ),
- None,
- )
- if not job_file:
- return
- with self.fs.file_open((artifact_path, job_file), "r") as f:
- job_data = f.read()
-
- # TODO get_service
- _, _, service = ee_id.partition(".") # remove prefix "namespace."
- host_name = "{}-{}".format(service, ee_config_descriptor["metric-service"])
- host_port = "80"
- vnfr_id = vnfr_id.replace("-", "")
- variables = {
- "JOB_NAME": vnfr_id,
- "TARGET_IP": target_ip,
- "EXPORTER_POD_IP": host_name,
- "EXPORTER_POD_PORT": host_port,
- }
- job_list = self.prometheus.parse_job(job_data, variables)
- # ensure job_name is using the vnfr_id. Adding the metadata nsr_id
- for job in job_list:
- if (
- not isinstance(job.get("job_name"), str)
- or vnfr_id not in job["job_name"]
- ):
- job["job_name"] = vnfr_id + "_" + str(randint(1, 10000))
- job["nsr_id"] = nsr_id
- job_dict = {jl["job_name"]: jl for jl in job_list}
- if await self.prometheus.update(job_dict):
- return list(job_dict.keys())
-
- def get_vca_cloud_and_credentials(self, vim_account_id: str) -> (str, str):
- """
- Get VCA Cloud and VCA Cloud Credentials for the VIM account
-
- :param: vim_account_id: VIM Account ID
-
- :return: (cloud_name, cloud_credential)
- """
- config = VimAccountDB.get_vim_account_with_id(vim_account_id).get("config", {})
- return config.get("vca_cloud"), config.get("vca_cloud_credential")
-
- def get_vca_k8s_cloud_and_credentials(self, vim_account_id: str) -> (str, str):
- """
- Get VCA K8s Cloud and VCA K8s Cloud Credentials for the VIM account
-
- :param: vim_account_id: VIM Account ID
-
- :return: (cloud_name, cloud_credential)
- """
- config = VimAccountDB.get_vim_account_with_id(vim_account_id).get("config", {})
- return config.get("vca_k8s_cloud"), config.get("vca_k8s_cloud_credential")
+ self.lcm_tasks.remove("ns", nsr_id, nslcmop_id, "ns_verticalscale")