blob: 9427fa05e131179cf80877fdad57c819db8ad7eb [file] [log] [blame]
# -*- coding: utf-8 -*-
##
# Copyright 2018 Telefonica S.A.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
##
import asyncio
import shutil
from typing import Any, Dict, List
import yaml
import logging
import logging.handlers
import traceback
import ipaddress
import json
from jinja2 import (
Environment,
TemplateError,
TemplateNotFound,
StrictUndefined,
UndefinedError,
select_autoescape,
)
from osm_lcm import ROclient
from osm_lcm.data_utils.lcm_config import LcmCfg
from osm_lcm.data_utils.nsr import (
get_deployed_kdu,
get_deployed_vca,
get_deployed_vca_list,
get_nsd,
)
from osm_lcm.data_utils.vca import (
DeployedComponent,
DeployedK8sResource,
DeployedVCA,
EELevel,
Relation,
EERelation,
safe_get_ee_relation,
)
from osm_lcm.ng_ro import NgRoClient, NgRoException
from osm_lcm.lcm_utils import (
LcmException,
LcmBase,
deep_get,
get_iterable,
populate_dict,
check_juju_bundle_existence,
get_charm_artifact_path,
get_ee_id_parts,
vld_to_ro_ip_profile,
)
from osm_lcm.data_utils.nsd import (
get_ns_configuration_relation_list,
get_vnf_profile,
get_vnf_profiles,
)
from osm_lcm.data_utils.vnfd import (
get_kdu,
get_kdu_services,
get_relation_list,
get_vdu_list,
get_vdu_profile,
get_ee_sorted_initial_config_primitive_list,
get_ee_sorted_terminate_config_primitive_list,
get_kdu_list,
get_virtual_link_profiles,
get_vdu,
get_configuration,
get_vdu_index,
get_scaling_aspect,
get_number_of_instances,
get_juju_ee_ref,
get_helm_ee_ref,
get_kdu_resource_profile,
find_software_version,
check_helm_ee_in_ns,
)
from osm_lcm.data_utils.list_utils import find_in_list
from osm_lcm.data_utils.vnfr import (
get_osm_params,
get_vdur_index,
get_kdur,
get_volumes_from_instantiation_params,
)
from osm_lcm.data_utils.dict_utils import parse_yaml_strings
from osm_lcm.data_utils.database.vim_account import VimAccountDB
from osm_lcm.n2vc.definitions import RelationEndpoint
from osm_lcm.n2vc.k8s_helm3_conn import K8sHelm3Connector
from osm_lcm.n2vc.k8s_juju_conn import K8sJujuConnector
from osm_common.dbbase import DbException
from osm_common.fsbase import FsException
from osm_lcm.data_utils.database.database import Database
from osm_lcm.data_utils.filesystem.filesystem import Filesystem
from osm_lcm.data_utils.wim import (
get_sdn_ports,
get_target_wim_attrs,
select_feasible_wim_account,
)
from osm_lcm.n2vc.n2vc_juju_conn import N2VCJujuConnector
from osm_lcm.n2vc.exceptions import N2VCException, N2VCNotFound, K8sException
from osm_lcm.lcm_helm_conn import LCMHelmConn
from osm_lcm.osm_config import OsmConfigBuilder
from osm_lcm.prometheus import parse_job
from copy import copy, deepcopy
from time import time
from uuid import uuid4
from random import SystemRandom
# Module author metadata (kept for provenance).
__author__ = "Alfonso Tierno <alfonso.tiernosepulveda@telefonica.com>"
class NsLcm(LcmBase):
    """Network Service lifecycle manager.

    Drives NS operations by coordinating the NG-RO client, the VCA
    connectors (Juju / helm execution environments) and the K8s cluster
    connectors, while keeping the "nsrs"/"vnfrs" database records updated.
    """

    # Result codes used when looking up a sub-operation in an nslcmop record.
    SUBOPERATION_STATUS_NOT_FOUND = -1
    SUBOPERATION_STATUS_NEW = -2
    SUBOPERATION_STATUS_SKIP = -3
    # Identifier for the execution-environment TLS element (usage not shown in this chunk).
    EE_TLS_NAME = "ee-tls"
    task_name_deploy_vca = "Deploying VCA"
    # Relation operators as written in descriptors -> Python comparison operators.
    rel_operation_types = {
        "GE": ">=",
        "LE": "<=",
        "GT": ">",
        "LT": "<",
        "EQ": "==",
        "NE": "!=",
    }
    def __init__(self, msg, lcm_tasks, config: LcmCfg):
        """
        Init, Connect to database, filesystem storage, and messaging
        :param msg: message bus handler, passed through to LcmBase
        :param lcm_tasks: registry of running LCM tasks
        :param config: LcmCfg object; its timeout/RO/VCA/servicekpi sections are used here
        :return: None
        """
        super().__init__(msg=msg, logger=logging.getLogger("lcm.ns"))

        self.db = Database().instance.db
        self.fs = Filesystem().instance.fs
        self.lcm_tasks = lcm_tasks
        self.timeout = config.timeout
        self.ro_config = config.RO
        self.vca_config = config.VCA
        self.service_kpi = config.servicekpi

        # create N2VC connector
        self.n2vc = N2VCJujuConnector(
            log=self.logger,
            on_update_db=self._on_update_n2vc_db,
            fs=self.fs,
            db=self.db,
        )

        # Helm-based execution environments are driven through this connector.
        self.conn_helm_ee = LCMHelmConn(
            log=self.logger,
            vca_config=self.vca_config,
            on_update_db=self._on_update_n2vc_db,
        )

        self.k8sclusterhelm3 = K8sHelm3Connector(
            kubectl_command=self.vca_config.kubectlpath,
            helm_command=self.vca_config.helm3path,
            fs=self.fs,
            log=self.logger,
            db=self.db,
            on_update_db=None,
        )

        self.k8sclusterjuju = K8sJujuConnector(
            kubectl_command=self.vca_config.kubectlpath,
            juju_command=self.vca_config.jujupath,
            log=self.logger,
            on_update_db=self._on_update_k8s_db,
            fs=self.fs,
            db=self.db,
        )

        # Dispatch tables: KDU/VCA type string (as stored in descriptors/DB)
        # -> connector instance that manages it.
        self.k8scluster_map = {
            "helm-chart-v3": self.k8sclusterhelm3,
            "chart": self.k8sclusterhelm3,
            "juju-bundle": self.k8sclusterjuju,
            "juju": self.k8sclusterjuju,
        }

        self.vca_map = {
            "lxc_proxy_charm": self.n2vc,
            "native_charm": self.n2vc,
            "k8s_proxy_charm": self.n2vc,
            "helm": self.conn_helm_ee,
            "helm-v3": self.conn_helm_ee,
        }

        # create RO client
        self.RO = NgRoClient(**self.ro_config.to_dict())

        # RO status polling entry point per LCM operation type (see _wait_ng_ro).
        self.op_status_map = {
            "instantiation": self.RO.status,
            "termination": self.RO.status,
            "migrate": self.RO.status,
            "healing": self.RO.recreate_status,
            "verticalscale": self.RO.status,
            "start_stop_rebuild": self.RO.status,
        }
@staticmethod
def increment_ip_mac(ip_mac, vm_index=1):
if not isinstance(ip_mac, str):
return ip_mac
try:
next_ipv6 = None
next_ipv4 = None
dual_ip = ip_mac.split(";")
if len(dual_ip) == 2:
for ip in dual_ip:
if ipaddress.ip_address(ip).version == 6:
ipv6 = ipaddress.IPv6Address(ip)
next_ipv6 = str(ipaddress.IPv6Address(int(ipv6) + 1))
elif ipaddress.ip_address(ip).version == 4:
ipv4 = ipaddress.IPv4Address(ip)
next_ipv4 = str(ipaddress.IPv4Address(int(ipv4) + 1))
return [next_ipv4, next_ipv6]
# try with ipv4 look for last dot
i = ip_mac.rfind(".")
if i > 0:
i += 1
return "{}{}".format(ip_mac[:i], int(ip_mac[i:]) + vm_index)
# try with ipv6 or mac look for last colon. Operate in hex
i = ip_mac.rfind(":")
if i > 0:
i += 1
# format in hex, len can be 2 for mac or 4 for ipv6
return ("{}{:0" + str(len(ip_mac) - i) + "x}").format(
ip_mac[:i], int(ip_mac[i:], 16) + vm_index
)
except Exception:
pass
return None
    async def _on_update_n2vc_db(self, table, filter, path, updated_data, vca_id=None):
        """Callback run by N2VC when VCA data changes; refresh NS status in the DB.

        Reads the nsr record, refreshes its vcaStatus from the VCA, adjusts the
        configurationStatus of the VCA addressed by *path*, and derives
        READY <-> DEGRADED transitions from machine/application statuses.

        :param table: database table the event refers to
        :param filter: database filter; its "_id" is used as the nsr_id
        :param path: dotted path of the updated element; its last component is the VCA index
        :param updated_data: changed data (not used directly here)
        :param vca_id: optional VCA identifier
        """
        # remove last dot from path (if exists)
        if path.endswith("."):
            path = path[:-1]

        # self.logger.debug('_on_update_n2vc_db(table={}, filter={}, path={}, updated_data={}'
        #                   .format(table, filter, path, updated_data))

        try:
            nsr_id = filter.get("_id")

            # read ns record from database
            nsr = self.db.get_one(table="nsrs", q_filter=filter)
            current_ns_status = nsr.get("nsState")

            # First, we need to verify if the current vcaStatus is null, because if that is the case,
            # MongoDB will not be able to create the fields used within the update key in the database
            if not nsr.get("vcaStatus"):
                # Write an empty dictionary to the vcaStatus field, if its value is null
                self.update_db_2("nsrs", nsr_id, {"vcaStatus": dict()})

            # Get vca status for NS
            status_dict = await self.n2vc.get_status(
                namespace="." + nsr_id, yaml_format=False, vca_id=vca_id
            )

            # Update the vcaStatus
            db_key = f"vcaStatus.{nsr_id}.VNF"
            db_dict = dict()
            db_dict[db_key] = status_dict[nsr_id]
            await self.n2vc.update_vca_status(db_dict[db_key], vca_id=vca_id)

            # update configurationStatus for this VCA
            try:
                # the VCA index is the last component of the dotted path
                vca_index = int(path[path.rfind(".") + 1 :])

                vca_list = deep_get(
                    target_dict=nsr, key_list=("_admin", "deployed", "VCA")
                )
                vca_status = vca_list[vca_index].get("status")

                configuration_status_list = nsr.get("configurationStatus")
                config_status = configuration_status_list[vca_index].get("status")

                if config_status == "BROKEN" and vca_status != "failed":
                    db_dict["configurationStatus"][vca_index] = "READY"
                elif config_status != "BROKEN" and vca_status == "failed":
                    db_dict["configurationStatus"][vca_index] = "BROKEN"
            except Exception as e:
                # not update configurationStatus
                self.logger.debug("Error updating vca_index (ignore): {}".format(e))

            # if nsState = 'READY' check if juju is reporting some error => nsState = 'DEGRADED'
            # if nsState = 'DEGRADED' check if all is OK
            is_degraded = False
            if current_ns_status in ("READY", "DEGRADED"):
                error_description = ""
                # check machines
                if status_dict.get("machines"):
                    for machine_id in status_dict.get("machines"):
                        machine = status_dict.get("machines").get(machine_id)
                        # check machine agent-status
                        if machine.get("agent-status"):
                            s = machine.get("agent-status").get("status")
                            if s != "started":
                                is_degraded = True
                                error_description += (
                                    "machine {} agent-status={} ; ".format(
                                        machine_id, s
                                    )
                                )
                        # check machine instance status
                        if machine.get("instance-status"):
                            s = machine.get("instance-status").get("status")
                            if s != "running":
                                is_degraded = True
                                error_description += (
                                    "machine {} instance-status={} ; ".format(
                                        machine_id, s
                                    )
                                )
                # check applications
                if status_dict.get("applications"):
                    for app_id in status_dict.get("applications"):
                        app = status_dict.get("applications").get(app_id)
                        # check application status
                        if app.get("status"):
                            s = app.get("status").get("status")
                            if s != "active":
                                is_degraded = True
                                error_description += (
                                    "application {} status={} ; ".format(app_id, s)
                                )

                if error_description:
                    db_dict["errorDescription"] = error_description
                if current_ns_status == "READY" and is_degraded:
                    db_dict["nsState"] = "DEGRADED"
                if current_ns_status == "DEGRADED" and not is_degraded:
                    db_dict["nsState"] = "READY"

            # write to database
            self.update_db_2("nsrs", nsr_id, db_dict)

        except (asyncio.CancelledError, asyncio.TimeoutError):
            # cancellation/timeouts must propagate to the caller
            raise
        except Exception as e:
            self.logger.warn("Error updating NS state for ns={}: {}".format(nsr_id, e))
    async def _on_update_k8s_db(
        self, cluster_uuid, kdu_instance, filter=None, vca_id=None, cluster_type="juju"
    ):
        """
        Updating vca status in NSR record
        :param cluster_uuid: UUID of a k8s cluster
        :param kdu_instance: The unique name of the KDU instance
        :param filter: To get nsr_id
        :param vca_id: optional VCA identifier
        :cluster_type: The cluster type (juju, k8s)
        :return: none
        """
        # self.logger.debug("_on_update_k8s_db(cluster_uuid={}, kdu_instance={}, filter={}"
        #                   .format(cluster_uuid, kdu_instance, filter))

        nsr_id = filter.get("_id")
        try:
            # ask the connector that manages this cluster type for the KDU status
            vca_status = await self.k8scluster_map[cluster_type].status_kdu(
                cluster_uuid=cluster_uuid,
                kdu_instance=kdu_instance,
                yaml_format=False,
                complete_status=True,
                vca_id=vca_id,
            )

            # First, we need to verify if the current vcaStatus is null, because if that is the case,
            # MongoDB will not be able to create the fields used within the update key in the database
            nsr = self.db.get_one(table="nsrs", q_filter=filter)
            if not nsr.get("vcaStatus"):
                # Write an empty dictionary to the vcaStatus field, if its value is null
                self.update_db_2("nsrs", nsr_id, {"vcaStatus": dict()})

            # Update the vcaStatus
            db_key = f"vcaStatus.{nsr_id}.KNF"
            db_dict = dict()
            db_dict[db_key] = vca_status

            if cluster_type in ("juju-bundle", "juju"):
                # TODO -> this should be done in a more uniform way, I think in N2VC, in order to update the K8s VCA
                # status in a similar way between Juju Bundles and Helm Charts on this side
                await self.k8sclusterjuju.update_vca_status(
                    db_dict[db_key],
                    kdu_instance,
                    vca_id=vca_id,
                )

            self.logger.debug(
                f"Obtained VCA status for cluster type '{cluster_type}': {vca_status}"
            )

            # write to database
            self.update_db_2("nsrs", nsr_id, db_dict)

        except (asyncio.CancelledError, asyncio.TimeoutError):
            # cancellation/timeouts must propagate to the caller
            raise
        except Exception as e:
            self.logger.warn("Error updating NS state for ns={}: {}".format(nsr_id, e))
@staticmethod
def _parse_cloud_init(cloud_init_text, additional_params, vnfd_id, vdu_id):
try:
env = Environment(
undefined=StrictUndefined,
autoescape=select_autoescape(default_for_string=True, default=True),
)
template = env.from_string(cloud_init_text)
return template.render(additional_params or {})
except UndefinedError as e:
raise LcmException(
"Variable {} at vnfd[id={}]:vdu[id={}]:cloud-init/cloud-init-"
"file, must be provided in the instantiation parameters inside the "
"'additionalParamsForVnf/Vdu' block".format(e, vnfd_id, vdu_id)
)
except (TemplateError, TemplateNotFound) as e:
raise LcmException(
"Error parsing Jinja2 to cloud-init content at vnfd[id={}]:vdu[id={}]: {}".format(
vnfd_id, vdu_id, e
)
)
def _get_vdu_cloud_init_content(self, vdu, vnfd):
cloud_init_content = cloud_init_file = None
try:
if vdu.get("cloud-init-file"):
base_folder = vnfd["_admin"]["storage"]
if base_folder["pkg-dir"]:
cloud_init_file = "{}/{}/cloud_init/{}".format(
base_folder["folder"],
base_folder["pkg-dir"],
vdu["cloud-init-file"],
)
else:
cloud_init_file = "{}/Scripts/cloud_init/{}".format(
base_folder["folder"],
vdu["cloud-init-file"],
)
with self.fs.file_open(cloud_init_file, "r") as ci_file:
cloud_init_content = ci_file.read()
elif vdu.get("cloud-init"):
cloud_init_content = vdu["cloud-init"]
return cloud_init_content
except FsException as e:
raise LcmException(
"Error reading vnfd[id={}]:vdu[id={}]:cloud-init-file={}: {}".format(
vnfd["id"], vdu["id"], cloud_init_file, e
)
)
def _get_vdu_additional_params(self, db_vnfr, vdu_id):
vdur = next(
(vdur for vdur in db_vnfr.get("vdur") if vdu_id == vdur["vdu-id-ref"]), {}
)
additional_params = vdur.get("additionalParams")
return parse_yaml_strings(additional_params)
@staticmethod
def ip_profile_2_RO(ip_profile):
RO_ip_profile = deepcopy(ip_profile)
if "dns-server" in RO_ip_profile:
if isinstance(RO_ip_profile["dns-server"], list):
RO_ip_profile["dns-address"] = []
for ds in RO_ip_profile.pop("dns-server"):
RO_ip_profile["dns-address"].append(ds["address"])
else:
RO_ip_profile["dns-address"] = RO_ip_profile.pop("dns-server")
if RO_ip_profile.get("ip-version") == "ipv4":
RO_ip_profile["ip-version"] = "IPv4"
if RO_ip_profile.get("ip-version") == "ipv6":
RO_ip_profile["ip-version"] = "IPv6"
if "dhcp-params" in RO_ip_profile:
RO_ip_profile["dhcp"] = RO_ip_profile.pop("dhcp-params")
return RO_ip_profile
    def scale_vnfr(self, db_vnfr, vdu_create=None, vdu_delete=None, mark_delete=False):
        """Scale the vdur list of a VNFR in the database (and in db_vnfr in place).

        :param db_vnfr: vnfr record content; its "vdur" list is refreshed from the
            database at the end of the operation
        :param vdu_create: dict {vdu_id: number_of_new_instances} for scale-out
        :param vdu_delete: dict {vdu_id: number_of_instances_to_remove} for scale-in
        :param mark_delete: when True, the affected vdurs are only marked with
            status "DELETING" instead of being pulled from the database
        """
        db_vdu_push_list = []
        template_vdur = []
        db_update = {"_admin.modified": time()}
        if vdu_create:
            for vdu_id, vdu_count in vdu_create.items():
                # use the last existing vdur of this vdu as a template for the copies
                vdur = next(
                    (
                        vdur
                        for vdur in reversed(db_vnfr["vdur"])
                        if vdur["vdu-id-ref"] == vdu_id
                    ),
                    None,
                )
                if not vdur:
                    # Read the template saved in the db:
                    self.logger.debug(
                        "No vdur in the database. Using the vdur-template to scale"
                    )
                    vdur_template = db_vnfr.get("vdur-template")
                    if not vdur_template:
                        raise LcmException(
                            "Error scaling OUT VNFR for {}. No vnfr or template exists".format(
                                vdu_id
                            )
                        )
                    vdur = vdur_template[0]
                    # Delete a template from the database after using it
                    self.db.set_one(
                        "vnfrs",
                        {"_id": db_vnfr["_id"]},
                        None,
                        pull={"vdur-template": {"_id": vdur["_id"]}},
                    )
                for count in range(vdu_count):
                    vdur_copy = deepcopy(vdur)
                    vdur_copy["status"] = "BUILD"
                    vdur_copy["status-detailed"] = None
                    vdur_copy["ip-address"] = None
                    vdur_copy["_id"] = str(uuid4())
                    vdur_copy["count-index"] += count + 1
                    vdur_copy["id"] = "{}-{}".format(
                        vdur_copy["vdu-id-ref"], vdur_copy["count-index"]
                    )
                    vdur_copy.pop("vim_info", None)
                    for iface in vdur_copy["interfaces"]:
                        # fixed addresses are incremented per replica; dynamic ones cleared
                        if iface.get("fixed-ip"):
                            iface["ip-address"] = self.increment_ip_mac(
                                iface["ip-address"], count + 1
                            )
                        else:
                            iface.pop("ip-address", None)
                        if iface.get("fixed-mac"):
                            iface["mac-address"] = self.increment_ip_mac(
                                iface["mac-address"], count + 1
                            )
                        else:
                            iface.pop("mac-address", None)
                        if db_vnfr["vdur"]:
                            iface.pop(
                                "mgmt_vnf", None
                            )  # only first vdu can be management of vnf
                    db_vdu_push_list.append(vdur_copy)
                    # self.logger.debug("scale out, adding vdu={}".format(vdur_copy))
        if vdu_delete:
            if len(db_vnfr["vdur"]) == 1:
                # The scale will move to 0 instances
                self.logger.debug(
                    "Scaling to 0 !, creating the template with the last vdur"
                )
                template_vdur = [db_vnfr["vdur"][0]]
            for vdu_id, vdu_count in vdu_delete.items():
                if mark_delete:
                    indexes_to_delete = [
                        iv[0]
                        for iv in enumerate(db_vnfr["vdur"])
                        if iv[1]["vdu-id-ref"] == vdu_id
                    ]
                    db_update.update(
                        {
                            "vdur.{}.status".format(i): "DELETING"
                            for i in indexes_to_delete[-vdu_count:]
                        }
                    )
                else:
                    # it must be deleted one by one because common.db does not allow otherwise
                    vdus_to_delete = [
                        v
                        for v in reversed(db_vnfr["vdur"])
                        if v["vdu-id-ref"] == vdu_id
                    ]
                    for vdu in vdus_to_delete[:vdu_count]:
                        self.db.set_one(
                            "vnfrs",
                            {"_id": db_vnfr["_id"]},
                            None,
                            pull={"vdur": {"_id": vdu["_id"]}},
                        )
        db_push = {}
        if db_vdu_push_list:
            db_push["vdur"] = db_vdu_push_list
        if template_vdur:
            db_push["vdur-template"] = template_vdur
        if not db_push:
            db_push = None
        db_vnfr["vdur-template"] = template_vdur
        self.db.set_one("vnfrs", {"_id": db_vnfr["_id"]}, db_update, push_list=db_push)
        # modify passed dictionary db_vnfr
        db_vnfr_ = self.db.get_one("vnfrs", {"_id": db_vnfr["_id"]})
        db_vnfr["vdur"] = db_vnfr_["vdur"]
def ns_update_nsr(self, ns_update_nsr, db_nsr, nsr_desc_RO):
"""
Updates database nsr with the RO info for the created vld
:param ns_update_nsr: dictionary to be filled with the updated info
:param db_nsr: content of db_nsr. This is also modified
:param nsr_desc_RO: nsr descriptor from RO
:return: Nothing, LcmException is raised on errors
"""
for vld_index, vld in enumerate(get_iterable(db_nsr, "vld")):
for net_RO in get_iterable(nsr_desc_RO, "nets"):
if vld["id"] != net_RO.get("ns_net_osm_id"):
continue
vld["vim-id"] = net_RO.get("vim_net_id")
vld["name"] = net_RO.get("vim_name")
vld["status"] = net_RO.get("status")
vld["status-detailed"] = net_RO.get("error_msg")
ns_update_nsr["vld.{}".format(vld_index)] = vld
break
else:
raise LcmException(
"ns_update_nsr: Not found vld={} at RO info".format(vld["id"])
)
def set_vnfr_at_error(self, db_vnfrs, error_text):
try:
for db_vnfr in db_vnfrs.values():
vnfr_update = {"status": "ERROR"}
for vdu_index, vdur in enumerate(get_iterable(db_vnfr, "vdur")):
if "status" not in vdur:
vdur["status"] = "ERROR"
vnfr_update["vdur.{}.status".format(vdu_index)] = "ERROR"
if error_text:
vdur["status-detailed"] = str(error_text)
vnfr_update[
"vdur.{}.status-detailed".format(vdu_index)
] = "ERROR"
self.update_db_2("vnfrs", db_vnfr["_id"], vnfr_update)
except DbException as e:
self.logger.error("Cannot update vnf. {}".format(e))
def _get_ns_config_info(self, nsr_id):
"""
Generates a mapping between vnf,vdu elements and the N2VC id
:param nsr_id: id of nsr to get last database _admin.deployed.VCA that contains this list
:return: a dictionary with {osm-config-mapping: {}} where its element contains:
"<member-vnf-index>": <N2VC-id> for a vnf configuration, or
"<member-vnf-index>.<vdu.id>.<vdu replica(0, 1,..)>": <N2VC-id> for a vdu configuration
"""
db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
vca_deployed_list = db_nsr["_admin"]["deployed"]["VCA"]
mapping = {}
ns_config_info = {"osm-config-mapping": mapping}
for vca in vca_deployed_list:
if not vca["member-vnf-index"]:
continue
if not vca["vdu_id"]:
mapping[vca["member-vnf-index"]] = vca["application"]
else:
mapping[
"{}.{}.{}".format(
vca["member-vnf-index"], vca["vdu_id"], vca["vdu_count_index"]
)
] = vca["application"]
return ns_config_info
    async def _instantiate_ng_ro(
        self,
        logging_text,
        nsr_id,
        nsd,
        db_nsr,
        db_nslcmop,
        db_vnfrs,
        db_vnfds,
        n2vc_key_list,
        stage,
        start_deploy,
        timeout_ns_deploy,
    ):
        """Build the NG-RO deployment "target" from the NS/VNF records and deploy it.

        Assembles a target dict (ns vlds, per-vnf vdur lists, images, flavors,
        cloud-init content, affinity groups, shared volumes), applying the
        instantiation parameters on top of the descriptors, then calls
        self.RO.deploy and waits for the RO action to finish.

        :param logging_text: prefix used in log messages
        :param nsr_id: NS record id
        :param nsd: NS descriptor
        :param db_nsr: nsr record content
        :param db_nslcmop: current nslcmop record (instantiation params are taken
            from it, or from the original instantiate operation for other op types)
        :param db_vnfrs: dict of vnfr records
        :param db_vnfds: list of vnfd records
        :param n2vc_key_list: public ssh keys to inject in addition to the
            instantiation-provided ones
        :param stage: 3-element status list, updated while waiting for RO
        :param start_deploy: epoch seconds when the deployment started
        :param timeout_ns_deploy: max seconds to wait for RO
        :raises NgRoException: propagated from the RO interaction
        """
        db_vims = {}

        def get_vim_account(vim_account_id):
            # per-call cache of vim_accounts records: one DB read per VIM
            nonlocal db_vims
            if vim_account_id in db_vims:
                return db_vims[vim_account_id]
            db_vim = self.db.get_one("vim_accounts", {"_id": vim_account_id})
            db_vims[vim_account_id] = db_vim
            return db_vim

        # modify target_vld info with instantiation parameters
        def parse_vld_instantiation_params(
            target_vim, target_vld, vld_params, target_sdn
        ):
            if vld_params.get("ip-profile"):
                target_vld["vim_info"][target_vim]["ip_profile"] = vld_to_ro_ip_profile(
                    vld_params["ip-profile"]
                )
            if vld_params.get("provider-network"):
                target_vld["vim_info"][target_vim]["provider_network"] = vld_params[
                    "provider-network"
                ]
                if "sdn-ports" in vld_params["provider-network"] and target_sdn:
                    target_vld["vim_info"][target_sdn]["sdn-ports"] = vld_params[
                        "provider-network"
                    ]["sdn-ports"]

            # check if WIM is needed; if needed, choose a feasible WIM able to connect VIMs
            # if wim_account_id is specified in vld_params, validate if it is feasible.
            wim_account_id, db_wim = select_feasible_wim_account(
                db_nsr, db_vnfrs, target_vld, vld_params, self.logger
            )

            if wim_account_id:
                # WIM is needed and a feasible one was found, populate WIM target and SDN ports
                self.logger.info("WIM selected: {:s}".format(str(wim_account_id)))
                # update vld_params with correct WIM account Id
                vld_params["wimAccountId"] = wim_account_id

                target_wim = "wim:{}".format(wim_account_id)
                target_wim_attrs = get_target_wim_attrs(nsr_id, target_vld, vld_params)
                sdn_ports = get_sdn_ports(vld_params, db_wim)
                if len(sdn_ports) > 0:
                    target_vld["vim_info"][target_wim] = target_wim_attrs
                    target_vld["vim_info"][target_wim]["sdn-ports"] = sdn_ports

                self.logger.debug(
                    "Target VLD with WIM data: {:s}".format(str(target_vld))
                )
            # vim-network-name/-id may be a per-vim dict or a plain string
            for param in ("vim-network-name", "vim-network-id"):
                if vld_params.get(param):
                    if isinstance(vld_params[param], dict):
                        for vim, vim_net in vld_params[param].items():
                            other_target_vim = "vim:" + vim
                            populate_dict(
                                target_vld["vim_info"],
                                (other_target_vim, param.replace("-", "_")),
                                vim_net,
                            )
                    else:  # isinstance str
                        target_vld["vim_info"][target_vim][
                            param.replace("-", "_")
                        ] = vld_params[param]
            if vld_params.get("common_id"):
                target_vld["common_id"] = vld_params.get("common_id")

        # modify target["ns"]["vld"] with instantiation parameters to override vnf vim-account
        def update_ns_vld_target(target, ns_params):
            for vnf_params in ns_params.get("vnf", ()):
                if vnf_params.get("vimAccountId"):
                    target_vnf = next(
                        (
                            vnfr
                            for vnfr in db_vnfrs.values()
                            if vnf_params["member-vnf-index"]
                            == vnfr["member-vnf-index-ref"]
                        ),
                        None,
                    )
                    vdur = next((vdur for vdur in target_vnf.get("vdur", ())), None)
                    if not vdur:
                        continue
                    for a_index, a_vld in enumerate(target["ns"]["vld"]):
                        target_vld = find_in_list(
                            get_iterable(vdur, "interfaces"),
                            lambda iface: iface.get("ns-vld-id") == a_vld["name"],
                        )

                        vld_params = find_in_list(
                            get_iterable(ns_params, "vld"),
                            lambda v_vld: v_vld["name"] in (a_vld["name"], a_vld["id"]),
                        )
                        if target_vld:
                            if vnf_params.get("vimAccountId") not in a_vld.get(
                                "vim_info", {}
                            ):
                                target_vim_network_list = [
                                    v for _, v in a_vld.get("vim_info").items()
                                ]
                                target_vim_network_name = next(
                                    (
                                        item.get("vim_network_name", "")
                                        for item in target_vim_network_list
                                    ),
                                    "",
                                )

                                target["ns"]["vld"][a_index].get("vim_info").update(
                                    {
                                        "vim:{}".format(vnf_params["vimAccountId"]): {
                                            "vim_network_name": target_vim_network_name,
                                        }
                                    }
                                )

                                if vld_params:
                                    for param in ("vim-network-name", "vim-network-id"):
                                        if vld_params.get(param) and isinstance(
                                            vld_params[param], dict
                                        ):
                                            for vim, vim_net in vld_params[
                                                param
                                            ].items():
                                                other_target_vim = "vim:" + vim
                                                populate_dict(
                                                    target["ns"]["vld"][a_index].get(
                                                        "vim_info"
                                                    ),
                                                    (
                                                        other_target_vim,
                                                        param.replace("-", "_"),
                                                    ),
                                                    vim_net,
                                                )

        nslcmop_id = db_nslcmop["_id"]
        target = {
            "name": db_nsr["name"],
            "ns": {"vld": []},
            "vnf": [],
            "image": deepcopy(db_nsr["image"]),
            "flavor": deepcopy(db_nsr["flavor"]),
            "action_id": nslcmop_id,
            "cloud_init_content": {},
        }
        for image in target["image"]:
            image["vim_info"] = {}
        for flavor in target["flavor"]:
            flavor["vim_info"] = {}
        if db_nsr.get("shared-volumes"):
            target["shared-volumes"] = deepcopy(db_nsr["shared-volumes"])
            for shared_volumes in target["shared-volumes"]:
                shared_volumes["vim_info"] = {}
        if db_nsr.get("affinity-or-anti-affinity-group"):
            target["affinity-or-anti-affinity-group"] = deepcopy(
                db_nsr["affinity-or-anti-affinity-group"]
            )
            for affinity_or_anti_affinity_group in target[
                "affinity-or-anti-affinity-group"
            ]:
                affinity_or_anti_affinity_group["vim_info"] = {}

        if db_nslcmop.get("lcmOperationType") != "instantiate":
            # get parameters of instantiation:
            db_nslcmop_instantiate = self.db.get_list(
                "nslcmops",
                {
                    "nsInstanceId": db_nslcmop["nsInstanceId"],
                    "lcmOperationType": "instantiate",
                },
            )[-1]
            ns_params = db_nslcmop_instantiate.get("operationParams")
        else:
            ns_params = db_nslcmop.get("operationParams")
        ssh_keys_instantiation = ns_params.get("ssh_keys") or []
        ssh_keys_all = ssh_keys_instantiation + (n2vc_key_list or [])

        cp2target = {}
        for vld_index, vld in enumerate(db_nsr.get("vld")):
            target_vim = "vim:{}".format(ns_params["vimAccountId"])
            target_vld = {
                "id": vld["id"],
                "name": vld["name"],
                "mgmt-network": vld.get("mgmt-network", False),
                "type": vld.get("type"),
                "vim_info": {
                    target_vim: {
                        "vim_network_name": vld.get("vim-network-name"),
                        "vim_account_id": ns_params["vimAccountId"],
                    }
                },
            }
            # check if this network needs SDN assist
            if vld.get("pci-interfaces"):
                db_vim = get_vim_account(ns_params["vimAccountId"])
                if vim_config := db_vim.get("config"):
                    if sdnc_id := vim_config.get("sdn-controller"):
                        sdn_vld = "nsrs:{}:vld.{}".format(nsr_id, vld["id"])
                        target_sdn = "sdn:{}".format(sdnc_id)
                        target_vld["vim_info"][target_sdn] = {
                            "sdn": True,
                            "target_vim": target_vim,
                            "vlds": [sdn_vld],
                            "type": vld.get("type"),
                        }

            # map member_vnf connection points of this vld to the ns vld target
            nsd_vnf_profiles = get_vnf_profiles(nsd)
            for nsd_vnf_profile in nsd_vnf_profiles:
                for cp in nsd_vnf_profile["virtual-link-connectivity"]:
                    if cp["virtual-link-profile-id"] == vld["id"]:
                        cp2target[
                            "member_vnf:{}.{}".format(
                                cp["constituent-cpd-id"][0][
                                    "constituent-base-element-id"
                                ],
                                cp["constituent-cpd-id"][0]["constituent-cpd-id"],
                            )
                        ] = "nsrs:{}:vld.{}".format(nsr_id, vld_index)

            # check at nsd descriptor, if there is an ip-profile
            vld_params = {}
            nsd_vlp = find_in_list(
                get_virtual_link_profiles(nsd),
                lambda a_link_profile: a_link_profile["virtual-link-desc-id"]
                == vld["id"],
            )
            if (
                nsd_vlp
                and nsd_vlp.get("virtual-link-protocol-data")
                and nsd_vlp["virtual-link-protocol-data"].get("l3-protocol-data")
            ):
                vld_params["ip-profile"] = nsd_vlp["virtual-link-protocol-data"][
                    "l3-protocol-data"
                ]

            # update vld_params with instantiation params
            vld_instantiation_params = find_in_list(
                get_iterable(ns_params, "vld"),
                lambda a_vld: a_vld["name"] in (vld["name"], vld["id"]),
            )
            if vld_instantiation_params:
                vld_params.update(vld_instantiation_params)
            parse_vld_instantiation_params(target_vim, target_vld, vld_params, None)
            target["ns"]["vld"].append(target_vld)
        # Update the target ns_vld if vnf vim_account is overriden by instantiation params
        update_ns_vld_target(target, ns_params)

        for vnfr in db_vnfrs.values():
            vnfd = find_in_list(
                db_vnfds, lambda db_vnf: db_vnf["id"] == vnfr["vnfd-ref"]
            )
            vnf_params = find_in_list(
                get_iterable(ns_params, "vnf"),
                lambda a_vnf: a_vnf["member-vnf-index"] == vnfr["member-vnf-index-ref"],
            )
            target_vnf = deepcopy(vnfr)
            target_vim = "vim:{}".format(vnfr["vim-account-id"])
            for vld in target_vnf.get("vld", ()):
                # check if connected to a ns.vld, to fill target'
                vnf_cp = find_in_list(
                    vnfd.get("int-virtual-link-desc", ()),
                    lambda cpd: cpd.get("id") == vld["id"],
                )
                if vnf_cp:
                    ns_cp = "member_vnf:{}.{}".format(
                        vnfr["member-vnf-index-ref"], vnf_cp["id"]
                    )
                    if cp2target.get(ns_cp):
                        vld["target"] = cp2target[ns_cp]

                vld["vim_info"] = {
                    target_vim: {"vim_network_name": vld.get("vim-network-name")}
                }
                # check if this network needs SDN assist
                target_sdn = None
                if vld.get("pci-interfaces"):
                    db_vim = get_vim_account(vnfr["vim-account-id"])
                    sdnc_id = db_vim["config"].get("sdn-controller")
                    if sdnc_id:
                        sdn_vld = "vnfrs:{}:vld.{}".format(target_vnf["_id"], vld["id"])
                        target_sdn = "sdn:{}".format(sdnc_id)
                        vld["vim_info"][target_sdn] = {
                            "sdn": True,
                            "target_vim": target_vim,
                            "vlds": [sdn_vld],
                            "type": vld.get("type"),
                        }

                # check at vnfd descriptor, if there is an ip-profile
                vld_params = {}
                vnfd_vlp = find_in_list(
                    get_virtual_link_profiles(vnfd),
                    lambda a_link_profile: a_link_profile["id"] == vld["id"],
                )
                if (
                    vnfd_vlp
                    and vnfd_vlp.get("virtual-link-protocol-data")
                    and vnfd_vlp["virtual-link-protocol-data"].get("l3-protocol-data")
                ):
                    vld_params["ip-profile"] = vnfd_vlp["virtual-link-protocol-data"][
                        "l3-protocol-data"
                    ]
                # update vld_params with instantiation params
                if vnf_params:
                    vld_instantiation_params = find_in_list(
                        get_iterable(vnf_params, "internal-vld"),
                        lambda i_vld: i_vld["name"] == vld["id"],
                    )
                    if vld_instantiation_params:
                        vld_params.update(vld_instantiation_params)
                parse_vld_instantiation_params(target_vim, vld, vld_params, target_sdn)

            vdur_list = []
            for vdur in target_vnf.get("vdur", ()):
                if vdur.get("status") == "DELETING" or vdur.get("pdu-type"):
                    continue  # This vdu must not be created
                vdur["vim_info"] = {"vim_account_id": vnfr["vim-account-id"]}

                self.logger.debug("NS > ssh_keys > {}".format(ssh_keys_all))

                # decide which ssh keys (if any) must be injected in this vdu,
                # based on vdu-level or vnf-level config-access declarations
                if ssh_keys_all:
                    vdu_configuration = get_configuration(vnfd, vdur["vdu-id-ref"])
                    vnf_configuration = get_configuration(vnfd, vnfd["id"])
                    if (
                        vdu_configuration
                        and vdu_configuration.get("config-access")
                        and vdu_configuration.get("config-access").get("ssh-access")
                    ):
                        vdur["ssh-keys"] = ssh_keys_all
                        vdur["ssh-access-required"] = vdu_configuration[
                            "config-access"
                        ]["ssh-access"]["required"]
                    elif (
                        vnf_configuration
                        and vnf_configuration.get("config-access")
                        and vnf_configuration.get("config-access").get("ssh-access")
                        and any(iface.get("mgmt-vnf") for iface in vdur["interfaces"])
                    ):
                        vdur["ssh-keys"] = ssh_keys_all
                        vdur["ssh-access-required"] = vnf_configuration[
                            "config-access"
                        ]["ssh-access"]["required"]
                    elif ssh_keys_instantiation and find_in_list(
                        vdur["interfaces"], lambda iface: iface.get("mgmt-vnf")
                    ):
                        vdur["ssh-keys"] = ssh_keys_instantiation

                self.logger.debug("NS > vdur > {}".format(vdur))

                vdud = get_vdu(vnfd, vdur["vdu-id-ref"])
                # cloud-init
                if vdud.get("cloud-init-file"):
                    vdur["cloud-init"] = "{}:file:{}".format(
                        vnfd["_id"], vdud.get("cloud-init-file")
                    )
                    # read file and put content at target.cloud_init_content. Avoid ng_ro to use shared package system
                    if vdur["cloud-init"] not in target["cloud_init_content"]:
                        base_folder = vnfd["_admin"]["storage"]
                        if base_folder["pkg-dir"]:
                            cloud_init_file = "{}/{}/cloud_init/{}".format(
                                base_folder["folder"],
                                base_folder["pkg-dir"],
                                vdud.get("cloud-init-file"),
                            )
                        else:
                            cloud_init_file = "{}/Scripts/cloud_init/{}".format(
                                base_folder["folder"],
                                vdud.get("cloud-init-file"),
                            )
                        with self.fs.file_open(cloud_init_file, "r") as ci_file:
                            target["cloud_init_content"][
                                vdur["cloud-init"]
                            ] = ci_file.read()
                elif vdud.get("cloud-init"):
                    vdur["cloud-init"] = "{}:vdu:{}".format(
                        vnfd["_id"], get_vdu_index(vnfd, vdur["vdu-id-ref"])
                    )
                    # put content at target.cloud_init_content. Avoid ng_ro read vnfd descriptor
                    target["cloud_init_content"][vdur["cloud-init"]] = vdud[
                        "cloud-init"
                    ]

                vdur["additionalParams"] = vdur.get("additionalParams") or {}
                deploy_params_vdu = self._format_additional_params(
                    vdur.get("additionalParams") or {}
                )
                deploy_params_vdu["OSM"] = get_osm_params(
                    vnfr, vdur["vdu-id-ref"], vdur["count-index"]
                )
                vdur["additionalParams"] = deploy_params_vdu

                # flavor
                ns_flavor = target["flavor"][int(vdur["ns-flavor-id"])]
                if target_vim not in ns_flavor["vim_info"]:
                    ns_flavor["vim_info"][target_vim] = {}

                # deal with images
                # in case alternative images are provided we must check if they should be applied
                # for the vim_type, modify the vim_type taking into account
                ns_image_id = int(vdur["ns-image-id"])
                if vdur.get("alt-image-ids"):
                    db_vim = get_vim_account(vnfr["vim-account-id"])
                    vim_type = db_vim["vim_type"]
                    for alt_image_id in vdur.get("alt-image-ids"):
                        ns_alt_image = target["image"][int(alt_image_id)]
                        if vim_type == ns_alt_image.get("vim-type"):
                            # must use alternative image
                            self.logger.debug(
                                "use alternative image id: {}".format(alt_image_id)
                            )
                            ns_image_id = alt_image_id
                            vdur["ns-image-id"] = ns_image_id
                            break
                ns_image = target["image"][int(ns_image_id)]
                if target_vim not in ns_image["vim_info"]:
                    ns_image["vim_info"][target_vim] = {}

                # Affinity groups
                if vdur.get("affinity-or-anti-affinity-group-id"):
                    for ags_id in vdur["affinity-or-anti-affinity-group-id"]:
                        ns_ags = target["affinity-or-anti-affinity-group"][int(ags_id)]
                        if target_vim not in ns_ags["vim_info"]:
                            ns_ags["vim_info"][target_vim] = {}

                # shared-volumes
                if vdur.get("shared-volumes-id"):
                    for sv_id in vdur["shared-volumes-id"]:
                        ns_sv = find_in_list(
                            target["shared-volumes"], lambda sv: sv_id in sv["id"]
                        )
                        if ns_sv:
                            ns_sv["vim_info"][target_vim] = {}

                vdur["vim_info"] = {target_vim: {}}
                # instantiation parameters
                if vnf_params:
                    vdu_instantiation_params = find_in_list(
                        get_iterable(vnf_params, "vdu"),
                        lambda i_vdu: i_vdu["id"] == vdud["id"],
                    )
                    if vdu_instantiation_params:
                        # Parse the vdu_volumes from the instantiation params
                        vdu_volumes = get_volumes_from_instantiation_params(
                            vdu_instantiation_params, vdud
                        )
                        vdur["additionalParams"]["OSM"]["vdu_volumes"] = vdu_volumes
                        vdur["additionalParams"]["OSM"][
                            "vim_flavor_id"
                        ] = vdu_instantiation_params.get("vim-flavor-id")
                        vdur["additionalParams"]["OSM"][
                            "vim_flavor_name"
                        ] = vdu_instantiation_params.get("vim-flavor-name")
                        vdur["additionalParams"]["OSM"][
                            "instance_name"
                        ] = vdu_instantiation_params.get("instance_name")
                        vdur["additionalParams"]["OSM"][
                            "security-group-name"
                        ] = vdu_instantiation_params.get("security-group-name")
                vdur_list.append(vdur)
            target_vnf["vdur"] = vdur_list
            target["vnf"].append(target_vnf)

        self.logger.debug("Send to RO > nsr_id={} target={}".format(nsr_id, target))
        desc = await self.RO.deploy(nsr_id, target)
        self.logger.debug("RO return > {}".format(desc))
        action_id = desc["action_id"]
        await self._wait_ng_ro(
            nsr_id,
            action_id,
            nslcmop_id,
            start_deploy,
            timeout_ns_deploy,
            stage,
            operation="instantiation",
        )

        # Updating NSR
        db_nsr_update = {
            "_admin.deployed.RO.operational-status": "running",
            "detailed-status": " ".join(stage),
        }
        # db_nsr["_admin.deployed.RO.detailed-status"] = "Deployed at VIM"
        self.update_db_2("nsrs", nsr_id, db_nsr_update)
        self._write_op_status(nslcmop_id, stage)
        self.logger.debug(
            logging_text + "ns deployed at RO. RO_id={}".format(action_id)
        )
        return
async def _wait_ng_ro(
self,
nsr_id,
action_id,
nslcmop_id=None,
start_time=None,
timeout=600,
stage=None,
operation=None,
):
detailed_status_old = None
db_nsr_update = {}
start_time = start_time or time()
while time() <= start_time + timeout:
desc_status = await self.op_status_map[operation](nsr_id, action_id)
self.logger.debug("Wait NG RO > {}".format(desc_status))
if desc_status["status"] == "FAILED":
raise NgRoException(desc_status["details"])
elif desc_status["status"] == "BUILD":
if stage:
stage[2] = "VIM: ({})".format(desc_status["details"])
elif desc_status["status"] == "DONE":
if stage:
stage[2] = "Deployed at VIM"
break
else:
assert False, "ROclient.check_ns_status returns unknown {}".format(
desc_status["status"]
)
if stage and nslcmop_id and stage[2] != detailed_status_old:
detailed_status_old = stage[2]
db_nsr_update["detailed-status"] = " ".join(stage)
self.update_db_2("nsrs", nsr_id, db_nsr_update)
self._write_op_status(nslcmop_id, stage)
await asyncio.sleep(15)
else: # timeout_ns_deploy
raise NgRoException("Timeout waiting ns to deploy")
    async def _terminate_ng_ro(
        self, logging_text, nsr_deployed, nsr_id, nslcmop_id, stage
    ):
        """
        Terminate an NS at NG-RO: request an empty deployment target (which removes all
        existing resources), wait for the action to finish, then delete the nsr at RO.

        :param logging_text: prefix to use at logging
        :param nsr_deployed: deployed info from the nsr record (not referenced in this body;
            kept for signature compatibility)
        :param nsr_id: ns record identity
        :param nslcmop_id: ns operation identity; also used as RO action_id for the request
        :param stage: 3-item progress list; stage[2] is overwritten with the VIM result
        :raises LcmException: if any deletion step failed (other than RO 404 "already deleted")
        """
        db_nsr_update = {}
        failed_detail = []
        action_id = None
        start_deploy = time()
        try:
            # an empty target makes NG-RO remove every deployed item of this ns
            target = {
                "ns": {"vld": []},
                "vnf": [],
                "image": [],
                "flavor": [],
                "action_id": nslcmop_id,
            }
            desc = await self.RO.deploy(nsr_id, target)
            action_id = desc["action_id"]
            db_nsr_update["_admin.deployed.RO.nsr_status"] = "DELETING"
            self.logger.debug(
                logging_text
                + "ns terminate action at RO. action_id={}".format(action_id)
            )
            # wait until done
            delete_timeout = 20 * 60  # 20 minutes
            await self._wait_ng_ro(
                nsr_id,
                action_id,
                nslcmop_id,
                start_deploy,
                delete_timeout,
                stage,
                operation="termination",
            )
            db_nsr_update["_admin.deployed.RO.nsr_status"] = "DELETED"
            # delete all nsr
            await self.RO.delete(nsr_id)
        except NgRoException as e:
            if e.http_code == 404:  # not found
                # nothing to delete at RO; treat as success
                db_nsr_update["_admin.deployed.RO.nsr_id"] = None
                db_nsr_update["_admin.deployed.RO.nsr_status"] = "DELETED"
                self.logger.debug(
                    logging_text + "RO_action_id={} already deleted".format(action_id)
                )
            elif e.http_code == 409:  # conflict
                failed_detail.append("delete conflict: {}".format(e))
                self.logger.debug(
                    logging_text
                    + "RO_action_id={} delete conflict: {}".format(action_id, e)
                )
            else:
                failed_detail.append("delete error: {}".format(e))
                self.logger.error(
                    logging_text
                    + "RO_action_id={} delete error: {}".format(action_id, e)
                )
        except Exception as e:
            failed_detail.append("delete error: {}".format(e))
            self.logger.error(
                logging_text + "RO_action_id={} delete error: {}".format(action_id, e)
            )
        # always reflect the final result at the nsr and operation records
        if failed_detail:
            stage[2] = "Error deleting from VIM"
        else:
            stage[2] = "Deleted from VIM"
        db_nsr_update["detailed-status"] = " ".join(stage)
        self.update_db_2("nsrs", nsr_id, db_nsr_update)
        self._write_op_status(nslcmop_id, stage)
        if failed_detail:
            raise LcmException("; ".join(failed_detail))
        return
async def instantiate_RO(
self,
logging_text,
nsr_id,
nsd,
db_nsr,
db_nslcmop,
db_vnfrs,
db_vnfds,
n2vc_key_list,
stage,
):
"""
Instantiate at RO
:param logging_text: preffix text to use at logging
:param nsr_id: nsr identity
:param nsd: database content of ns descriptor
:param db_nsr: database content of ns record
:param db_nslcmop: database content of ns operation, in this case, 'instantiate'
:param db_vnfrs:
:param db_vnfds: database content of vnfds, indexed by id (not _id). {id: {vnfd_object}, ...}
:param n2vc_key_list: ssh-public-key list to be inserted to management vdus via cloud-init
:param stage: list with 3 items: [general stage, tasks, vim_specific]. This task will write over vim_specific
:return: None or exception
"""
try:
start_deploy = time()
ns_params = db_nslcmop.get("operationParams")
if ns_params and ns_params.get("timeout_ns_deploy"):
timeout_ns_deploy = ns_params["timeout_ns_deploy"]
else:
timeout_ns_deploy = self.timeout.ns_deploy
# Check for and optionally request placement optimization. Database will be updated if placement activated
stage[2] = "Waiting for Placement."
if await self._do_placement(logging_text, db_nslcmop, db_vnfrs):
# in case of placement change ns_params[vimAcountId) if not present at any vnfrs
for vnfr in db_vnfrs.values():
if ns_params["vimAccountId"] == vnfr["vim-account-id"]:
break
else:
ns_params["vimAccountId"] == vnfr["vim-account-id"]
return await self._instantiate_ng_ro(
logging_text,
nsr_id,
nsd,
db_nsr,
db_nslcmop,
db_vnfrs,
db_vnfds,
n2vc_key_list,
stage,
start_deploy,
timeout_ns_deploy,
)
except Exception as e:
stage[2] = "ERROR deploying at VIM"
self.set_vnfr_at_error(db_vnfrs, str(e))
self.logger.error(
"Error deploying at VIM {}".format(e),
exc_info=not isinstance(
e,
(
ROclient.ROClientException,
LcmException,
DbException,
NgRoException,
),
),
)
raise
async def wait_kdu_up(self, logging_text, nsr_id, vnfr_id, kdu_name):
"""
Wait for kdu to be up, get ip address
:param logging_text: prefix use for logging
:param nsr_id:
:param vnfr_id:
:param kdu_name:
:return: IP address, K8s services
"""
# self.logger.debug(logging_text + "Starting wait_kdu_up")
nb_tries = 0
while nb_tries < 360:
db_vnfr = self.db.get_one("vnfrs", {"_id": vnfr_id})
kdur = next(
(
x
for x in get_iterable(db_vnfr, "kdur")
if x.get("kdu-name") == kdu_name
),
None,
)
if not kdur:
raise LcmException(
"Not found vnfr_id={}, kdu_name={}".format(vnfr_id, kdu_name)
)
if kdur.get("status"):
if kdur["status"] in ("READY", "ENABLED"):
return kdur.get("ip-address"), kdur.get("services")
else:
raise LcmException(
"target KDU={} is in error state".format(kdu_name)
)
await asyncio.sleep(10)
nb_tries += 1
raise LcmException("Timeout waiting KDU={} instantiated".format(kdu_name))
async def wait_vm_up_insert_key_ro(
self, logging_text, nsr_id, vnfr_id, vdu_id, vdu_index, pub_key=None, user=None
):
"""
Wait for ip addres at RO, and optionally, insert public key in virtual machine
:param logging_text: prefix use for logging
:param nsr_id:
:param vnfr_id:
:param vdu_id:
:param vdu_index:
:param pub_key: public ssh key to inject, None to skip
:param user: user to apply the public ssh key
:return: IP address
"""
self.logger.debug(logging_text + "Starting wait_vm_up_insert_key_ro")
ip_address = None
target_vdu_id = None
ro_retries = 0
while True:
ro_retries += 1
if ro_retries >= 360: # 1 hour
raise LcmException(
"Not found _admin.deployed.RO.nsr_id for nsr_id: {}".format(nsr_id)
)
await asyncio.sleep(10)
# get ip address
if not target_vdu_id:
db_vnfr = self.db.get_one("vnfrs", {"_id": vnfr_id})
if not vdu_id: # for the VNF case
if db_vnfr.get("status") == "ERROR":
raise LcmException(
"Cannot inject ssh-key because target VNF is in error state"
)
ip_address = db_vnfr.get("ip-address")
if not ip_address:
continue
vdur = next(
(
x
for x in get_iterable(db_vnfr, "vdur")
if x.get("ip-address") == ip_address
),
None,
)
else: # VDU case
vdur = next(
(
x
for x in get_iterable(db_vnfr, "vdur")
if x.get("vdu-id-ref") == vdu_id
and x.get("count-index") == vdu_index
),
None,
)
if (
not vdur and len(db_vnfr.get("vdur", ())) == 1
): # If only one, this should be the target vdu
vdur = db_vnfr["vdur"][0]
if not vdur:
raise LcmException(
"Not found vnfr_id={}, vdu_id={}, vdu_index={}".format(
vnfr_id, vdu_id, vdu_index
)
)
# New generation RO stores information at "vim_info"
ng_ro_status = None
target_vim = None
if vdur.get("vim_info"):
target_vim = next(
t for t in vdur["vim_info"]
) # there should be only one key
ng_ro_status = vdur["vim_info"][target_vim].get("vim_status")
if (
vdur.get("pdu-type")
or vdur.get("status") == "ACTIVE"
or ng_ro_status == "ACTIVE"
):
ip_address = vdur.get("ip-address")
if not ip_address:
continue
target_vdu_id = vdur["vdu-id-ref"]
elif vdur.get("status") == "ERROR" or ng_ro_status == "ERROR":
raise LcmException(
"Cannot inject ssh-key because target VM is in error state"
)
if not target_vdu_id:
continue
# inject public key into machine
if pub_key and user:
self.logger.debug(logging_text + "Inserting RO key")
self.logger.debug("SSH > PubKey > {}".format(pub_key))
if vdur.get("pdu-type"):
self.logger.error(logging_text + "Cannot inject ssh-ky to a PDU")
return ip_address
try:
target = {
"action": {
"action": "inject_ssh_key",
"key": pub_key,
"user": user,
},
"vnf": [{"_id": vnfr_id, "vdur": [{"id": vdur["id"]}]}],
}
desc = await self.RO.deploy(nsr_id, target)
action_id = desc["action_id"]
await self._wait_ng_ro(
nsr_id, action_id, timeout=600, operation="instantiation"
)
break
except NgRoException as e:
raise LcmException(
"Reaching max tries injecting key. Error: {}".format(e)
)
else:
break
return ip_address
    async def _wait_dependent_n2vc(self, nsr_id, vca_deployed_list, vca_index):
        """
        Wait until dependent VCA deployments have been finished. NS wait for VNFs and VDUs. VNFs for VDUs

        :param nsr_id: ns record identity, re-read from db on every poll
        :param vca_deployed_list: deployed VCA list; entry at vca_index identifies "myself"
        :param vca_index: index of the VCA whose dependencies are awaited
        :raises LcmException: if a dependency is BROKEN or the wait times out
        """
        my_vca = vca_deployed_list[vca_index]
        if my_vca.get("vdu_id") or my_vca.get("kdu_name"):
            # vdu or kdu: no dependencies
            return
        # NOTE(review): counter is decremented by 1 per 10-second sleep, so this
        # allows up to ~300 polls (~50 minutes), not 300 seconds — confirm intent
        timeout = 300
        while timeout >= 0:
            db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
            vca_deployed_list = db_nsr["_admin"]["deployed"]["VCA"]
            configuration_status_list = db_nsr["configurationStatus"]
            for index, vca_deployed in enumerate(configuration_status_list):
                if index == vca_index:
                    # myself
                    continue
                # NS-level VCA (no member-vnf-index) depends on everything;
                # a VNF-level VCA depends only on entries of the same VNF
                if not my_vca.get("member-vnf-index") or (
                    vca_deployed.get("member-vnf-index")
                    == my_vca.get("member-vnf-index")
                ):
                    internal_status = configuration_status_list[index].get("status")
                    if internal_status == "READY":
                        continue
                    elif internal_status == "BROKEN":
                        raise LcmException(
                            "Configuration aborted because dependent charm/s has failed"
                        )
                    else:
                        # still pending: stop scanning and poll again
                        break
            else:
                # no dependencies, return
                return
            await asyncio.sleep(10)
            timeout -= 1
        raise LcmException("Configuration aborted because dependent charm/s timeout")
def get_vca_id(self, db_vnfr: dict, db_nsr: dict):
vca_id = None
if db_vnfr:
vca_id = deep_get(db_vnfr, ("vca-id",))
elif db_nsr:
vim_account_id = deep_get(db_nsr, ("instantiate_params", "vimAccountId"))
vca_id = VimAccountDB.get_vim_account_with_id(vim_account_id).get("vca")
return vca_id
    async def instantiate_N2VC(
        self,
        logging_text,
        vca_index,
        nsi_id,
        db_nsr,
        db_vnfr,
        vdu_id,
        kdu_name,
        vdu_index,
        kdu_index,
        config_descriptor,
        deploy_params,
        base_folder,
        nslcmop_id,
        stage,
        vca_type,
        vca_name,
        ee_config_descriptor,
    ):
        """
        Create (or register) one VCA execution environment and run its Day-1
        configuration over the target element (NS, VNF, VDU or KDU).

        Main steps: compute namespace and artifact path; create/register the
        execution environment depending on vca_type; install the configuration
        software; add relations; optionally inject an ssh key into the target
        VM (or wait for the KDU); then execute the initial config primitives
        and, for helm-v3, register Prometheus scrape jobs.

        :param logging_text: prefix for logging
        :param vca_index: index of this VCA inside _admin.deployed.VCA
        :param nsi_id: network-slice instance id or None
        :param db_nsr: database content of the ns record
        :param db_vnfr: database content of the vnf record, or None for NS-level VCA
        :param vdu_id: target vdu id, or None
        :param kdu_name: target kdu name, or None
        :param vdu_index: count-index of the vdu instance
        :param kdu_index: count-index of the kdu instance
        :param config_descriptor: configuration section of the descriptor
        :param deploy_params: parameters for primitives; rw_mgmt_ip is added here
        :param base_folder: dict with "folder" and "pkg-dir" to locate artifacts
        :param nslcmop_id: ns operation identity
        :param stage: 3-item progress list; stage[0] is updated per element type
        :param vca_type: one of native_charm/lxc_proxy_charm/k8s_proxy_charm/helm-v3
        :param vca_name: charm or chart name
        :param ee_config_descriptor: execution-environment descriptor entry
        :raises LcmException: wrapping any failure, after marking the VCA BROKEN
        """
        nsr_id = db_nsr["_id"]
        db_update_entry = "_admin.deployed.VCA.{}.".format(vca_index)
        vca_deployed_list = db_nsr["_admin"]["deployed"]["VCA"]
        vca_deployed = db_nsr["_admin"]["deployed"]["VCA"][vca_index]
        osm_config = {"osm": {"ns_id": db_nsr["_id"]}}
        db_dict = {
            "collection": "nsrs",
            "filter": {"_id": nsr_id},
            "path": db_update_entry,
        }
        step = ""
        try:
            element_type = "NS"
            element_under_configuration = nsr_id
            vnfr_id = None
            if db_vnfr:
                vnfr_id = db_vnfr["_id"]
                osm_config["osm"]["vnf_id"] = vnfr_id
            namespace = "{nsi}.{ns}".format(nsi=nsi_id if nsi_id else "", ns=nsr_id)
            if vca_type == "native_charm":
                index_number = 0
            else:
                index_number = vdu_index or 0
            if vnfr_id:
                element_type = "VNF"
                element_under_configuration = vnfr_id
                namespace += ".{}-{}".format(vnfr_id, index_number)
                if vdu_id:
                    namespace += ".{}-{}".format(vdu_id, index_number)
                    element_type = "VDU"
                    element_under_configuration = "{}-{}".format(vdu_id, index_number)
                    osm_config["osm"]["vdu_id"] = vdu_id
                elif kdu_name:
                    namespace += ".{}".format(kdu_name)
                    element_type = "KDU"
                    element_under_configuration = kdu_name
                    osm_config["osm"]["kdu_name"] = kdu_name
            # Get artifact path
            if base_folder["pkg-dir"]:
                artifact_path = "{}/{}/{}/{}".format(
                    base_folder["folder"],
                    base_folder["pkg-dir"],
                    "charms"
                    if vca_type
                    in ("native_charm", "lxc_proxy_charm", "k8s_proxy_charm")
                    else "helm-charts",
                    vca_name,
                )
            else:
                artifact_path = "{}/Scripts/{}/{}/".format(
                    base_folder["folder"],
                    "charms"
                    if vca_type
                    in ("native_charm", "lxc_proxy_charm", "k8s_proxy_charm")
                    else "helm-charts",
                    vca_name,
                )
            self.logger.debug("Artifact path > {}".format(artifact_path))
            # get initial_config_primitive_list that applies to this element
            initial_config_primitive_list = config_descriptor.get(
                "initial-config-primitive"
            )
            self.logger.debug(
                "Initial config primitive list > {}".format(
                    initial_config_primitive_list
                )
            )
            # add config if not present for NS charm
            ee_descriptor_id = ee_config_descriptor.get("id")
            self.logger.debug("EE Descriptor > {}".format(ee_descriptor_id))
            initial_config_primitive_list = get_ee_sorted_initial_config_primitive_list(
                initial_config_primitive_list, vca_deployed, ee_descriptor_id
            )
            self.logger.debug(
                "Initial config primitive list #2 > {}".format(
                    initial_config_primitive_list
                )
            )
            # n2vc_redesign STEP 3.1
            # find old ee_id if exists
            ee_id = vca_deployed.get("ee_id")
            vca_id = self.get_vca_id(db_vnfr, db_nsr)
            # create or register execution environment in VCA
            if vca_type in ("lxc_proxy_charm", "k8s_proxy_charm", "helm-v3"):
                self._write_configuration_status(
                    nsr_id=nsr_id,
                    vca_index=vca_index,
                    status="CREATING",
                    element_under_configuration=element_under_configuration,
                    element_type=element_type,
                )
                step = "create execution environment"
                self.logger.debug(logging_text + step)
                ee_id = None
                credentials = None
                if vca_type == "k8s_proxy_charm":
                    ee_id = await self.vca_map[vca_type].install_k8s_proxy_charm(
                        charm_name=artifact_path[artifact_path.rfind("/") + 1 :],
                        namespace=namespace,
                        artifact_path=artifact_path,
                        db_dict=db_dict,
                        vca_id=vca_id,
                    )
                elif vca_type == "helm-v3":
                    ee_id, credentials = await self.vca_map[
                        vca_type
                    ].create_execution_environment(
                        namespace=nsr_id,
                        reuse_ee_id=ee_id,
                        db_dict=db_dict,
                        config=osm_config,
                        artifact_path=artifact_path,
                        chart_model=vca_name,
                        vca_type=vca_type,
                    )
                else:
                    ee_id, credentials = await self.vca_map[
                        vca_type
                    ].create_execution_environment(
                        namespace=namespace,
                        reuse_ee_id=ee_id,
                        db_dict=db_dict,
                        vca_id=vca_id,
                    )
            elif vca_type == "native_charm":
                step = "Waiting to VM being up and getting IP address"
                self.logger.debug(logging_text + step)
                rw_mgmt_ip = await self.wait_vm_up_insert_key_ro(
                    logging_text,
                    nsr_id,
                    vnfr_id,
                    vdu_id,
                    vdu_index,
                    user=None,
                    pub_key=None,
                )
                credentials = {"hostname": rw_mgmt_ip}
                # get username
                username = deep_get(
                    config_descriptor, ("config-access", "ssh-access", "default-user")
                )
                # TODO remove this when changes on IM regarding config-access:ssh-access:default-user were
                # merged. Meanwhile let's get username from initial-config-primitive
                if not username and initial_config_primitive_list:
                    for config_primitive in initial_config_primitive_list:
                        for param in config_primitive.get("parameter", ()):
                            if param["name"] == "ssh-username":
                                username = param["value"]
                                break
                if not username:
                    raise LcmException(
                        "Cannot determine the username neither with 'initial-config-primitive' nor with "
                        "'config-access.ssh-access.default-user'"
                    )
                credentials["username"] = username
                # n2vc_redesign STEP 3.2
                self._write_configuration_status(
                    nsr_id=nsr_id,
                    vca_index=vca_index,
                    status="REGISTERING",
                    element_under_configuration=element_under_configuration,
                    element_type=element_type,
                )
                step = "register execution environment {}".format(credentials)
                self.logger.debug(logging_text + step)
                ee_id = await self.vca_map[vca_type].register_execution_environment(
                    credentials=credentials,
                    namespace=namespace,
                    db_dict=db_dict,
                    vca_id=vca_id,
                )
            # for compatibility with MON/POL modules, the need model and application name at database
            # TODO ask MON/POL if needed to not assuming anymore the format "model_name.application_name"
            ee_id_parts = ee_id.split(".")
            db_nsr_update = {db_update_entry + "ee_id": ee_id}
            if len(ee_id_parts) >= 2:
                model_name = ee_id_parts[0]
                application_name = ee_id_parts[1]
                db_nsr_update[db_update_entry + "model"] = model_name
                db_nsr_update[db_update_entry + "application"] = application_name
            # n2vc_redesign STEP 3.3
            step = "Install configuration Software"
            self._write_configuration_status(
                nsr_id=nsr_id,
                vca_index=vca_index,
                status="INSTALLING SW",
                element_under_configuration=element_under_configuration,
                element_type=element_type,
                other_update=db_nsr_update,
            )
            # TODO check if already done
            self.logger.debug(logging_text + step)
            config = None
            if vca_type == "native_charm":
                config_primitive = next(
                    (p for p in initial_config_primitive_list if p["name"] == "config"),
                    None,
                )
                if config_primitive:
                    config = self._map_primitive_params(
                        config_primitive, {}, deploy_params
                    )
            num_units = 1
            if vca_type == "lxc_proxy_charm":
                if element_type == "NS":
                    num_units = db_nsr.get("config-units") or 1
                elif element_type == "VNF":
                    num_units = db_vnfr.get("config-units") or 1
                elif element_type == "VDU":
                    for v in db_vnfr["vdur"]:
                        if vdu_id == v["vdu-id-ref"]:
                            num_units = v.get("config-units") or 1
                            break
            if vca_type != "k8s_proxy_charm":
                await self.vca_map[vca_type].install_configuration_sw(
                    ee_id=ee_id,
                    artifact_path=artifact_path,
                    db_dict=db_dict,
                    config=config,
                    num_units=num_units,
                    vca_id=vca_id,
                    vca_type=vca_type,
                )
            # write in db flag of configuration_sw already installed
            self.update_db_2(
                "nsrs", nsr_id, {db_update_entry + "config_sw_installed": True}
            )
            # add relations for this VCA (wait for other peers related with this VCA)
            is_relation_added = await self._add_vca_relations(
                logging_text=logging_text,
                nsr_id=nsr_id,
                vca_type=vca_type,
                vca_index=vca_index,
            )
            if not is_relation_added:
                raise LcmException("Relations could not be added to VCA.")
            # if SSH access is required, then get execution environment SSH public
            # if native charm we have waited already to VM be UP
            if vca_type in ("k8s_proxy_charm", "lxc_proxy_charm", "helm-v3"):
                pub_key = None
                user = None
                # self.logger.debug("get ssh key block")
                if deep_get(
                    config_descriptor, ("config-access", "ssh-access", "required")
                ):
                    # self.logger.debug("ssh key needed")
                    # Needed to inject a ssh key
                    user = deep_get(
                        config_descriptor,
                        ("config-access", "ssh-access", "default-user"),
                    )
                    step = "Install configuration Software, getting public ssh key"
                    pub_key = await self.vca_map[vca_type].get_ee_ssh_public__key(
                        ee_id=ee_id, db_dict=db_dict, vca_id=vca_id
                    )
                    step = "Insert public key into VM user={} ssh_key={}".format(
                        user, pub_key
                    )
                else:
                    # self.logger.debug("no need to get ssh key")
                    step = "Waiting to VM being up and getting IP address"
                self.logger.debug(logging_text + step)
                # default rw_mgmt_ip to None, avoiding the non definition of the variable
                rw_mgmt_ip = None
                # n2vc_redesign STEP 5.1
                # wait for RO (ip-address) Insert pub_key into VM
                if vnfr_id:
                    if kdu_name:
                        rw_mgmt_ip, services = await self.wait_kdu_up(
                            logging_text, nsr_id, vnfr_id, kdu_name
                        )
                        vnfd = self.db.get_one(
                            "vnfds_revisions",
                            {"_id": f'{db_vnfr["vnfd-id"]}:{db_vnfr["revision"]}'},
                        )
                        kdu = get_kdu(vnfd, kdu_name)
                        kdu_services = [
                            service["name"] for service in get_kdu_services(kdu)
                        ]
                        exposed_services = []
                        for service in services:
                            if any(s in service["name"] for s in kdu_services):
                                exposed_services.append(service)
                        await self.vca_map[vca_type].exec_primitive(
                            ee_id=ee_id,
                            primitive_name="config",
                            params_dict={
                                "osm-config": json.dumps(
                                    OsmConfigBuilder(
                                        k8s={"services": exposed_services}
                                    ).build()
                                )
                            },
                            vca_id=vca_id,
                        )
                    # This verification is needed in order to avoid trying to add a public key
                    # to a VM, when the VNF is a KNF (in the edge case where the user creates a VCA
                    # for a KNF and not for its KDUs, the previous verification gives False, and the code
                    # jumps to this block, meaning that there is the need to verify if the VNF is actually a VNF
                    # or it is a KNF)
                    elif db_vnfr.get("vdur"):
                        rw_mgmt_ip = await self.wait_vm_up_insert_key_ro(
                            logging_text,
                            nsr_id,
                            vnfr_id,
                            vdu_id,
                            vdu_index,
                            user=user,
                            pub_key=pub_key,
                        )
                self.logger.debug(logging_text + " VM_ip_address={}".format(rw_mgmt_ip))
                # store rw_mgmt_ip in deploy params for later replacement
                deploy_params["rw_mgmt_ip"] = rw_mgmt_ip
            # n2vc_redesign STEP 6  Execute initial config primitive
            step = "execute initial config primitive"
            # wait for dependent primitives execution (NS -> VNF -> VDU)
            if initial_config_primitive_list:
                await self._wait_dependent_n2vc(nsr_id, vca_deployed_list, vca_index)
            # stage, in function of element type: vdu, kdu, vnf or ns
            my_vca = vca_deployed_list[vca_index]
            if my_vca.get("vdu_id") or my_vca.get("kdu_name"):
                # VDU or KDU
                stage[0] = "Stage 3/5: running Day-1 primitives for VDU."
            elif my_vca.get("member-vnf-index"):
                # VNF
                stage[0] = "Stage 4/5: running Day-1 primitives for VNF."
            else:
                # NS
                stage[0] = "Stage 5/5: running Day-1 primitives for NS."
            self._write_configuration_status(
                nsr_id=nsr_id, vca_index=vca_index, status="EXECUTING PRIMITIVE"
            )
            self._write_op_status(op_id=nslcmop_id, stage=stage)
            check_if_terminated_needed = True
            for initial_config_primitive in initial_config_primitive_list:
                # adding information on the vca_deployed if it is a NS execution environment
                if not vca_deployed["member-vnf-index"]:
                    deploy_params["ns_config_info"] = json.dumps(
                        self._get_ns_config_info(nsr_id)
                    )
                # TODO check if already done
                primitive_params_ = self._map_primitive_params(
                    initial_config_primitive, {}, deploy_params
                )
                step = "execute primitive '{}' params '{}'".format(
                    initial_config_primitive["name"], primitive_params_
                )
                self.logger.debug(logging_text + step)
                await self.vca_map[vca_type].exec_primitive(
                    ee_id=ee_id,
                    primitive_name=initial_config_primitive["name"],
                    params_dict=primitive_params_,
                    db_dict=db_dict,
                    vca_id=vca_id,
                    vca_type=vca_type,
                )
                # Once some primitive has been exec, check and write at db if it needs to exec terminated primitives
                if check_if_terminated_needed:
                    if config_descriptor.get("terminate-config-primitive"):
                        self.update_db_2(
                            "nsrs", nsr_id, {db_update_entry + "needed_terminate": True}
                        )
                    check_if_terminated_needed = False
                # TODO register in database that primitive is done
            # STEP 7 Configure metrics
            if vca_type == "helm-v3":
                # TODO: review for those cases where the helm chart is a reference and
                # is not part of the NF package
                prometheus_jobs = await self.extract_prometheus_scrape_jobs(
                    ee_id=ee_id,
                    artifact_path=artifact_path,
                    ee_config_descriptor=ee_config_descriptor,
                    vnfr_id=vnfr_id,
                    nsr_id=nsr_id,
                    target_ip=rw_mgmt_ip,
                    element_type=element_type,
                    vnf_member_index=db_vnfr.get("member-vnf-index-ref", ""),
                    vdu_id=vdu_id,
                    vdu_index=vdu_index,
                    kdu_name=kdu_name,
                    kdu_index=kdu_index,
                )
                if prometheus_jobs:
                    self.update_db_2(
                        "nsrs",
                        nsr_id,
                        {db_update_entry + "prometheus_jobs": prometheus_jobs},
                    )
                    for job in prometheus_jobs:
                        self.db.set_one(
                            "prometheus_jobs",
                            {"job_name": job["job_name"]},
                            job,
                            upsert=True,
                            fail_on_empty=False,
                        )
            step = "instantiated at VCA"
            self.logger.debug(logging_text + step)
            self._write_configuration_status(
                nsr_id=nsr_id, vca_index=vca_index, status="READY"
            )
        except Exception as e:  # TODO not use Exception but N2VC exception
            # self.update_db_2("nsrs", nsr_id, {db_update_entry + "instantiation": "FAILED"})
            if not isinstance(
                e, (DbException, N2VCException, LcmException, asyncio.CancelledError)
            ):
                self.logger.error(
                    "Exception while {} : {}".format(step, e), exc_info=True
                )
            self._write_configuration_status(
                nsr_id=nsr_id, vca_index=vca_index, status="BROKEN"
            )
            raise LcmException("{}. {}".format(step, e)) from e
def _write_ns_status(
self,
nsr_id: str,
ns_state: str,
current_operation: str,
current_operation_id: str,
error_description: str = None,
error_detail: str = None,
other_update: dict = None,
):
"""
Update db_nsr fields.
:param nsr_id:
:param ns_state:
:param current_operation:
:param current_operation_id:
:param error_description:
:param error_detail:
:param other_update: Other required changes at database if provided, will be cleared
:return:
"""
try:
db_dict = other_update or {}
db_dict[
"_admin.nslcmop"
] = current_operation_id # for backward compatibility
db_dict["_admin.current-operation"] = current_operation_id
db_dict["_admin.operation-type"] = (
current_operation if current_operation != "IDLE" else None
)
db_dict["currentOperation"] = current_operation
db_dict["currentOperationID"] = current_operation_id
db_dict["errorDescription"] = error_description
db_dict["errorDetail"] = error_detail
if ns_state:
db_dict["nsState"] = ns_state
self.update_db_2("nsrs", nsr_id, db_dict)
except DbException as e:
self.logger.warn("Error writing NS status, ns={}: {}".format(nsr_id, e))
def _write_op_status(
self,
op_id: str,
stage: list = None,
error_message: str = None,
queuePosition: int = 0,
operation_state: str = None,
other_update: dict = None,
):
try:
db_dict = other_update or {}
db_dict["queuePosition"] = queuePosition
if isinstance(stage, list):
db_dict["stage"] = stage[0]
db_dict["detailed-status"] = " ".join(stage)
elif stage is not None:
db_dict["stage"] = str(stage)
if error_message is not None:
db_dict["errorMessage"] = error_message
if operation_state is not None:
db_dict["operationState"] = operation_state
db_dict["statusEnteredTime"] = time()
self.update_db_2("nslcmops", op_id, db_dict)
except DbException as e:
self.logger.warn(
"Error writing OPERATION status for op_id: {} -> {}".format(op_id, e)
)
def _write_all_config_status(self, db_nsr: dict, status: str):
try:
nsr_id = db_nsr["_id"]
# configurationStatus
config_status = db_nsr.get("configurationStatus")
if config_status:
db_nsr_update = {
"configurationStatus.{}.status".format(index): status
for index, v in enumerate(config_status)
if v
}
# update status
self.update_db_2("nsrs", nsr_id, db_nsr_update)
except DbException as e:
self.logger.warn(
"Error writing all configuration status, ns={}: {}".format(nsr_id, e)
)
def _write_configuration_status(
self,
nsr_id: str,
vca_index: int,
status: str = None,
element_under_configuration: str = None,
element_type: str = None,
other_update: dict = None,
):
# self.logger.debug('_write_configuration_status(): vca_index={}, status={}'
# .format(vca_index, status))
try:
db_path = "configurationStatus.{}.".format(vca_index)
db_dict = other_update or {}
if status:
db_dict[db_path + "status"] = status
if element_under_configuration:
db_dict[
db_path + "elementUnderConfiguration"
] = element_under_configuration
if element_type:
db_dict[db_path + "elementType"] = element_type
self.update_db_2("nsrs", nsr_id, db_dict)
except DbException as e:
self.logger.warn(
"Error writing configuration status={}, ns={}, vca_index={}: {}".format(
status, nsr_id, vca_index, e
)
)
async def _do_placement(self, logging_text, db_nslcmop, db_vnfrs):
"""
Check and computes the placement, (vim account where to deploy). If it is decided by an external tool, it
sends the request via kafka and wait until the result is wrote at database (nslcmops _admin.plca).
Database is used because the result can be obtained from a different LCM worker in case of HA.
:param logging_text: contains the prefix for logging, with the ns and nslcmop identifiers
:param db_nslcmop: database content of nslcmop
:param db_vnfrs: database content of vnfrs, indexed by member-vnf-index.
:return: True if some modification is done. Modifies database vnfrs and parameter db_vnfr with the
computed 'vim-account-id'
"""
modified = False
nslcmop_id = db_nslcmop["_id"]
placement_engine = deep_get(db_nslcmop, ("operationParams", "placement-engine"))
if placement_engine == "PLA":
self.logger.debug(
logging_text + "Invoke and wait for placement optimization"
)
await self.msg.aiowrite("pla", "get_placement", {"nslcmopId": nslcmop_id})
db_poll_interval = 5
wait = db_poll_interval * 10
pla_result = None
while not pla_result and wait >= 0:
await asyncio.sleep(db_poll_interval)
wait -= db_poll_interval
db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})
pla_result = deep_get(db_nslcmop, ("_admin", "pla"))
if not pla_result:
raise LcmException(
"Placement timeout for nslcmopId={}".format(nslcmop_id)
)
for pla_vnf in pla_result["vnf"]:
vnfr = db_vnfrs.get(pla_vnf["member-vnf-index"])
if not pla_vnf.get("vimAccountId") or not vnfr:
continue
modified = True
self.db.set_one(
"vnfrs",
{"_id": vnfr["_id"]},
{"vim-account-id": pla_vnf["vimAccountId"]},
)
# Modifies db_vnfrs
vnfr["vim-account-id"] = pla_vnf["vimAccountId"]
return modified
def _gather_vnfr_healing_alerts(self, vnfr, vnfd):
alerts = []
nsr_id = vnfr["nsr-id-ref"]
df = vnfd.get("df", [{}])[0]
# Checking for auto-healing configuration
if "healing-aspect" in df:
healing_aspects = df["healing-aspect"]
for healing in healing_aspects:
for healing_policy in healing.get("healing-policy", ()):
vdu_id = healing_policy["vdu-id"]
vdur = next(
(vdur for vdur in vnfr["vdur"] if vdu_id == vdur["vdu-id-ref"]),
{},
)
if not vdur:
continue
metric_name = "vm_status"
vdu_name = vdur.get("name")
vnf_member_index = vnfr["member-vnf-index-ref"]
uuid = str(uuid4())
name = f"healing_{uuid}"
action = healing_policy
# action_on_recovery = healing.get("action-on-recovery")
# cooldown_time = healing.get("cooldown-time")
# day1 = healing.get("day1")
alert = {
"uuid": uuid,
"name": name,
"metric": metric_name,
"tags": {
"ns_id": nsr_id,
"vnf_member_index": vnf_member_index,
"vdu_name": vdu_name,
},
"alarm_status": "ok",
"action_type": "healing",
"action": action,
}
alerts.append(alert)
return alerts
    def _gather_vnfr_scaling_alerts(self, vnfr, vnfd):
        """Build Prometheus-backed scale-in/scale-out alert definitions for a vnfr.

        For every automatic scaling-policy of every scaling-aspect in the first df,
        and for every monitored vdur, one alert is produced per configured
        scale-in-threshold and scale-out-threshold, including the PromQL expression
        and the action (scaling-group plus cooldown-time) that MON/POL will use.

        :param vnfr: vnf record (reads nsr-id-ref, member-vnf-index-ref, vdur)
        :param vnfd: vnf descriptor (reads df, vdu/ivld monitoring parameters)
        :return: list of alert dicts
        """
        alerts = []
        nsr_id = vnfr["nsr-id-ref"]
        df = vnfd.get("df", [{}])[0]
        # Checking for auto-scaling configuration
        if "scaling-aspect" in df:
            scaling_aspects = df["scaling-aspect"]
            # index every monitoring parameter declared anywhere in the vnfd by id
            all_vnfd_monitoring_params = {}
            for ivld in vnfd.get("int-virtual-link-desc", ()):
                for mp in ivld.get("monitoring-parameters", ()):
                    all_vnfd_monitoring_params[mp.get("id")] = mp
            for vdu in vnfd.get("vdu", ()):
                for mp in vdu.get("monitoring-parameter", ()):
                    all_vnfd_monitoring_params[mp.get("id")] = mp
            # NOTE(review): this loop rebinds "df"; after it, "df" is the LAST df of
            # the vnfd, and the later "exporters-endpoints"/"vdu-profile" lookups use
            # that leaked loop variable rather than df[0] — confirm this is intended
            for df in vnfd.get("df", ()):
                for mp in df.get("monitoring-parameter", ()):
                    all_vnfd_monitoring_params[mp.get("id")] = mp
            for scaling_aspect in scaling_aspects:
                scaling_group_name = scaling_aspect.get("name", "")
                # Get monitored VDUs
                all_monitored_vdus = set()
                for delta in scaling_aspect.get("aspect-delta-details", {}).get(
                    "deltas", ()
                ):
                    for vdu_delta in delta.get("vdu-delta", ()):
                        all_monitored_vdus.add(vdu_delta.get("id"))
                monitored_vdurs = list(
                    filter(
                        lambda vdur: vdur["vdu-id-ref"] in all_monitored_vdus,
                        vnfr["vdur"],
                    )
                )
                if not monitored_vdurs:
                    self.logger.error(
                        "Scaling criteria is referring to a vnf-monitoring-param that does not contain a reference to a vdu or vnf metric"
                    )
                    continue
                for scaling_policy in scaling_aspect.get("scaling-policy", ()):
                    if scaling_policy["scaling-type"] != "automatic":
                        continue
                    threshold_time = scaling_policy.get("threshold-time", "1")
                    cooldown_time = scaling_policy.get("cooldown-time", "0")
                    for scaling_criteria in scaling_policy["scaling-criteria"]:
                        monitoring_param_ref = scaling_criteria.get(
                            "vnf-monitoring-param-ref"
                        )
                        # KeyError here means the descriptor references an unknown
                        # monitoring parameter
                        vnf_monitoring_param = all_vnfd_monitoring_params[
                            monitoring_param_ref
                        ]
                        for vdur in monitored_vdurs:
                            vdu_id = vdur["vdu-id-ref"]
                            metric_name = vnf_monitoring_param.get("performance-metric")
                            # metrics collected by OSM itself carry the "osm_" prefix
                            if "exporters-endpoints" not in df:
                                metric_name = f"osm_{metric_name}"
                            vnf_member_index = vnfr["member-vnf-index-ref"]
                            scalein_threshold = scaling_criteria.get(
                                "scale-in-threshold"
                            )
                            scaleout_threshold = scaling_criteria.get(
                                "scale-out-threshold"
                            )
                            # Looking for min/max-number-of-instances
                            instances_min_number = 1
                            instances_max_number = 1
                            vdu_profile = df["vdu-profile"]
                            if vdu_profile:
                                profile = next(
                                    item for item in vdu_profile if item["id"] == vdu_id
                                )
                                instances_min_number = profile.get(
                                    "min-number-of-instances", 1
                                )
                                instances_max_number = profile.get(
                                    "max-number-of-instances", 1
                                )
                            if scalein_threshold:
                                uuid = str(uuid4())
                                name = f"scalein_{uuid}"
                                operation = scaling_criteria[
                                    "scale-in-relational-operation"
                                ]
                                rel_operator = self.rel_operation_types.get(
                                    operation, "<="
                                )
                                metric_selector = f'{metric_name}{{ns_id="{nsr_id}", vnf_member_index="{vnf_member_index}", vdu_id="{vdu_id}"}}'
                                expression = f"(count ({metric_selector}) > {instances_min_number}) and (avg({metric_selector}) {rel_operator} {scalein_threshold})"
                                # kpi_* metrics scraped from NF exporters select by
                                # instance ip:port instead of OSM labels
                                if (
                                    "exporters-endpoints" in df
                                    and metric_name.startswith("kpi_")
                                ):
                                    new_metric_name = (
                                        f'osm_{metric_name.replace("kpi_", "").strip()}'
                                    )
                                    metric_port = df["exporters-endpoints"].get(
                                        "metric-port", 9100
                                    )
                                    vdu_ip = vdur["ip-address"]
                                    ip_port = str(vdu_ip) + ":" + str(metric_port)
                                    metric_selector = (
                                        f'{new_metric_name}{{instance="{ip_port}"}}'
                                    )
                                    expression = f"({metric_selector} {rel_operator} {scalein_threshold})"
                                labels = {
                                    "ns_id": nsr_id,
                                    "vnf_member_index": vnf_member_index,
                                    "vdu_id": vdu_id,
                                }
                                prom_cfg = {
                                    "alert": name,
                                    "expr": expression,
                                    "for": str(threshold_time) + "m",
                                    "labels": labels,
                                }
                                # NOTE(review): this first assignment is dead code —
                                # it is immediately overwritten by the dict below
                                action = scaling_policy
                                action = {
                                    "scaling-group": scaling_group_name,
                                    "cooldown-time": cooldown_time,
                                }
                                alert = {
                                    "uuid": uuid,
                                    "name": name,
                                    "metric": metric_name,
                                    "tags": {
                                        "ns_id": nsr_id,
                                        "vnf_member_index": vnf_member_index,
                                        "vdu_id": vdu_id,
                                    },
                                    "alarm_status": "ok",
                                    "action_type": "scale_in",
                                    "action": action,
                                    "prometheus_config": prom_cfg,
                                }
                                alerts.append(alert)
                            if scaleout_threshold:
                                uuid = str(uuid4())
                                name = f"scaleout_{uuid}"
                                operation = scaling_criteria[
                                    "scale-out-relational-operation"
                                ]
                                rel_operator = self.rel_operation_types.get(
                                    operation, "<="
                                )
                                metric_selector = f'{metric_name}{{ns_id="{nsr_id}", vnf_member_index="{vnf_member_index}", vdu_id="{vdu_id}"}}'
                                expression = f"(count ({metric_selector}) < {instances_max_number}) and (avg({metric_selector}) {rel_operator} {scaleout_threshold})"
                                if (
                                    "exporters-endpoints" in df
                                    and metric_name.startswith("kpi_")
                                ):
                                    new_metric_name = (
                                        f'osm_{metric_name.replace("kpi_", "").strip()}'
                                    )
                                    metric_port = df["exporters-endpoints"].get(
                                        "metric-port", 9100
                                    )
                                    vdu_ip = vdur["ip-address"]
                                    ip_port = str(vdu_ip) + ":" + str(metric_port)
                                    metric_selector = (
                                        f'{new_metric_name}{{instance="{ip_port}"}}'
                                    )
                                    expression = f"({metric_selector} {rel_operator} {scaleout_threshold})"
                                labels = {
                                    "ns_id": nsr_id,
                                    "vnf_member_index": vnf_member_index,
                                    "vdu_id": vdu_id,
                                }
                                prom_cfg = {
                                    "alert": name,
                                    "expr": expression,
                                    "for": str(threshold_time) + "m",
                                    "labels": labels,
                                }
                                # NOTE(review): dead assignment, same as scale-in above
                                action = scaling_policy
                                action = {
                                    "scaling-group": scaling_group_name,
                                    "cooldown-time": cooldown_time,
                                }
                                alert = {
                                    "uuid": uuid,
                                    "name": name,
                                    "metric": metric_name,
                                    "tags": {
                                        "ns_id": nsr_id,
                                        "vnf_member_index": vnf_member_index,
                                        "vdu_id": vdu_id,
                                    },
                                    "alarm_status": "ok",
                                    "action_type": "scale_out",
                                    "action": action,
                                    "prometheus_config": prom_cfg,
                                }
                                alerts.append(alert)
        return alerts
def _gather_vnfr_alarm_alerts(self, vnfr, vnfd):
alerts = []
nsr_id = vnfr["nsr-id-ref"]
vnf_member_index = vnfr["member-vnf-index-ref"]
# Checking for VNF alarm configuration
for vdur in vnfr["vdur"]:
vdu_id = vdur["vdu-id-ref"]
vdu = next(filter(lambda vdu: vdu["id"] == vdu_id, vnfd["vdu"]))
if "alarm" in vdu:
# Get VDU monitoring params, since alerts are based on them
vdu_monitoring_params = {}
for mp in vdu.get("monitoring-parameter", []):
vdu_monitoring_params[mp.get("id")] = mp
if not vdu_monitoring_params:
self.logger.error(
"VDU alarm refers to a VDU monitoring param, but there are no VDU monitoring params in the VDU"
)
continue
# Get alarms in the VDU
alarm_descriptors = vdu["alarm"]
# Create VDU alarms for each alarm in the VDU
for alarm_descriptor in alarm_descriptors:
# Check that the VDU alarm refers to a proper monitoring param
alarm_monitoring_param = alarm_descriptor.get(
"vnf-monitoring-param-ref", ""
)
vdu_specific_monitoring_param = vdu_monitoring_params.get(
alarm_monitoring_param, {}
)
if not vdu_specific_monitoring_param:
self.logger.error(
"VDU alarm refers to a VDU monitoring param not present in the VDU"
)
continue
metric_name = vdu_specific_monitoring_param.get(
"performance-metric"
)
if not metric_name:
self.logger.error(
"VDU alarm refers to a VDU monitoring param that has no associated performance-metric"
)
continue
# Set params of the alarm to be created in Prometheus
metric_name = f"osm_{metric_name}"
metric_threshold = alarm_descriptor.get("value")
uuid = str(uuid4())
alert_name = f"vdu_alarm_{uuid}"
operation = alarm_descriptor["operation"]
rel_operator = self.rel_operation_types.get(operation, "<=")
metric_selector = f'{metric_name}{{ns_id="{nsr_id}", vnf_member_index="{vnf_member_index}", vdu_id="{vdu_id}"}}'
expression = f"{metric_selector} {rel_operator} {metric_threshold}"
labels = {
"ns_id": nsr_id,
"vnf_member_index": vnf_member_index,
"vdu_id": vdu_id,
"vdu_name": "{{ $labels.vdu_name }}",
}
prom_cfg = {
"alert": alert_name,
"expr": expression,
"for": "1m", # default value. Ideally, this should be related to an IM param, but there is not such param
"labels": labels,
}
alarm_action = dict()
for action_type in ["ok", "insufficient-data", "alarm"]:
if (
"actions" in alarm_descriptor
and action_type in alarm_descriptor["actions"]
):
alarm_action[action_type] = alarm_descriptor["actions"][
action_type
]
alert = {
"uuid": uuid,
"name": alert_name,
"metric": metric_name,
"tags": {
"ns_id": nsr_id,
"vnf_member_index": vnf_member_index,
"vdu_id": vdu_id,
},
"alarm_status": "ok",
"action_type": "vdu_alarm",
"action": alarm_action,
"prometheus_config": prom_cfg,
}
alerts.append(alert)
return alerts
def update_nsrs_with_pla_result(self, params):
    """
    Store the PLA (placement) result of an operation in its nslcmop record.

    :param params: message content; expects params["placement"]["nslcmopId"] to
        identify the operation. The whole "placement" dict is stored under the
        operation's "_admin.pla" key.
    :return: None. Failures are logged (best-effort update), never raised.
    """
    # Pre-bind so the except handler can always format the message: in the
    # original code, if deep_get() raised before the assignment, the handler
    # itself crashed with UnboundLocalError on nslcmop_id.
    nslcmop_id = None
    try:
        nslcmop_id = deep_get(params, ("placement", "nslcmopId"))
        self.update_db_2(
            "nslcmops", nslcmop_id, {"_admin.pla": params.get("placement")}
        )
    except Exception as e:
        self.logger.warn("Update failed for nslcmop_id={}:{}".format(nslcmop_id, e))
async def instantiate(self, nsr_id, nslcmop_id):
    """
    Instantiate a Network Service: deploy its KDUs at K8s clusters, its
    VMs/networks at the VIM (through RO, as a background task) and its
    execution environments/charms (through N2VC), updating the "nsrs" and
    "nslcmops" database records as the deployment progresses. The final
    result is reported via DB status fields and a kafka "instantiated"
    message; nothing is returned or raised to the caller.

    :param nsr_id: ns instance to deploy
    :param nslcmop_id: operation to run
    :return: None
    """
    # Try to lock HA task here; only one LCM instance may run this operation
    task_is_locked_by_me = self.lcm_tasks.lock_HA("ns", "nslcmops", nslcmop_id)
    if not task_is_locked_by_me:
        self.logger.debug(
            "instantiate() task is not locked by me, ns={}".format(nsr_id)
        )
        return
    logging_text = "Task ns={} instantiate={} ".format(nsr_id, nslcmop_id)
    self.logger.debug(logging_text + "Enter")
    # get all needed from database
    # database nsrs record
    db_nsr = None
    # database nslcmops record
    db_nslcmop = None
    # update operation on nsrs
    db_nsr_update = {}
    # update operation on nslcmops
    db_nslcmop_update = {}
    timeout_ns_deploy = self.timeout.ns_deploy
    nslcmop_operation_state = None
    db_vnfrs = {}  # vnf's info indexed by member-index
    # n2vc_info = {}
    tasks_dict_info = {}  # from task to info text
    exc = None
    error_list = []
    stage = [
        "Stage 1/5: preparation of the environment.",
        "Waiting for previous operations to terminate.",
        "",
    ]
    # ^ stage, step, VIM progress
    try:
        # wait for any previous tasks in process
        await self.lcm_tasks.waitfor_related_HA("ns", "nslcmops", nslcmop_id)
        # STEP 0: Reading database (nslcmops, nsrs, nsds, vnfrs, vnfds)
        stage[1] = "Reading from database."
        # nsState="BUILDING", currentOperation="INSTANTIATING", currentOperationID=nslcmop_id
        db_nsr_update["detailed-status"] = "creating"
        db_nsr_update["operational-status"] = "init"
        self._write_ns_status(
            nsr_id=nsr_id,
            ns_state="BUILDING",
            current_operation="INSTANTIATING",
            current_operation_id=nslcmop_id,
            other_update=db_nsr_update,
        )
        self._write_op_status(op_id=nslcmop_id, stage=stage, queuePosition=0)
        # read from db: operation
        stage[1] = "Getting nslcmop={} from db.".format(nslcmop_id)
        db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})
        # additionalParamsForVnf is stored as a JSON string; decode it in place
        if db_nslcmop["operationParams"].get("additionalParamsForVnf"):
            db_nslcmop["operationParams"]["additionalParamsForVnf"] = json.loads(
                db_nslcmop["operationParams"]["additionalParamsForVnf"]
            )
        ns_params = db_nslcmop.get("operationParams")
        if ns_params and ns_params.get("timeout_ns_deploy"):
            timeout_ns_deploy = ns_params["timeout_ns_deploy"]
        # read from db: ns
        stage[1] = "Getting nsr={} from db.".format(nsr_id)
        self.logger.debug(logging_text + stage[1])
        db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
        stage[1] = "Getting nsd={} from db.".format(db_nsr["nsd-id"])
        self.logger.debug(logging_text + stage[1])
        nsd = self.db.get_one("nsds", {"_id": db_nsr["nsd-id"]})
        self.fs.sync(db_nsr["nsd-id"])
        db_nsr["nsd"] = nsd
        # nsr_name = db_nsr["name"]   # TODO short-name??
        # read from db: vnf's of this ns
        stage[1] = "Getting vnfrs from db."
        self.logger.debug(logging_text + stage[1])
        db_vnfrs_list = self.db.get_list("vnfrs", {"nsr-id-ref": nsr_id})
        # read from db: vnfd's for every vnf
        db_vnfds = []  # every vnfd data
        # for each vnf in ns, read vnfd
        for vnfr in db_vnfrs_list:
            # kdur additionalParams are stored as JSON strings; decode in place
            if vnfr.get("kdur"):
                kdur_list = []
                for kdur in vnfr["kdur"]:
                    if kdur.get("additionalParams"):
                        kdur["additionalParams"] = json.loads(
                            kdur["additionalParams"]
                        )
                    kdur_list.append(kdur)
                vnfr["kdur"] = kdur_list
            db_vnfrs[vnfr["member-vnf-index-ref"]] = vnfr
            vnfd_id = vnfr["vnfd-id"]
            vnfd_ref = vnfr["vnfd-ref"]
            self.fs.sync(vnfd_id)
            # if we haven't this vnfd, read it from db
            # NOTE(review): membership test compares a vnfd_id string against a
            # list of vnfd dicts, so it is always True and the vnfd is fetched
            # per vnfr — confirm whether de-duplication was intended here.
            if vnfd_id not in db_vnfds:
                # read from db
                stage[1] = "Getting vnfd={} id='{}' from db.".format(
                    vnfd_id, vnfd_ref
                )
                self.logger.debug(logging_text + stage[1])
                vnfd = self.db.get_one("vnfds", {"_id": vnfd_id})
                # store vnfd
                db_vnfds.append(vnfd)
        # Get or generates the _admin.deployed.VCA list
        vca_deployed_list = None
        if db_nsr["_admin"].get("deployed"):
            vca_deployed_list = db_nsr["_admin"]["deployed"].get("VCA")
        if vca_deployed_list is None:
            vca_deployed_list = []
            configuration_status_list = []
            db_nsr_update["_admin.deployed.VCA"] = vca_deployed_list
            db_nsr_update["configurationStatus"] = configuration_status_list
            # add _admin.deployed.VCA to db_nsr dictionary, value=vca_deployed_list
            populate_dict(db_nsr, ("_admin", "deployed", "VCA"), vca_deployed_list)
        elif isinstance(vca_deployed_list, dict):
            # maintain backward compatibility. Change a dict to list at database
            vca_deployed_list = list(vca_deployed_list.values())
            db_nsr_update["_admin.deployed.VCA"] = vca_deployed_list
            populate_dict(db_nsr, ("_admin", "deployed", "VCA"), vca_deployed_list)
        if not isinstance(
            deep_get(db_nsr, ("_admin", "deployed", "RO", "vnfd")), list
        ):
            populate_dict(db_nsr, ("_admin", "deployed", "RO", "vnfd"), [])
            db_nsr_update["_admin.deployed.RO.vnfd"] = []
        # set state to INSTANTIATED. When instantiated NBI will not delete directly
        db_nsr_update["_admin.nsState"] = "INSTANTIATED"
        self.update_db_2("nsrs", nsr_id, db_nsr_update)
        self.db.set_list(
            "vnfrs", {"nsr-id-ref": nsr_id}, {"_admin.nsState": "INSTANTIATED"}
        )
        # n2vc_redesign STEP 2 Deploy Network Scenario
        stage[0] = "Stage 2/5: deployment of KDUs, VMs and execution environments."
        self._write_op_status(op_id=nslcmop_id, stage=stage)
        stage[1] = "Deploying KDUs."
        # self.logger.debug(logging_text + "Before deploy_kdus")
        # Call to deploy_kdus in case exists the "vdu:kdu" param
        await self.deploy_kdus(
            logging_text=logging_text,
            nsr_id=nsr_id,
            nslcmop_id=nslcmop_id,
            db_vnfrs=db_vnfrs,
            db_vnfds=db_vnfds,
            task_instantiation_info=tasks_dict_info,
        )
        stage[1] = "Getting VCA public key."
        # n2vc_redesign STEP 1 Get VCA public ssh-key
        # feature 1429. Add n2vc public key to needed VMs
        n2vc_key = self.n2vc.get_public_key()
        n2vc_key_list = [n2vc_key]
        if self.vca_config.public_key:
            n2vc_key_list.append(self.vca_config.public_key)
        stage[1] = "Deploying NS at VIM."
        # RO deployment runs concurrently with the EE/charm deployment below;
        # completion is awaited in the finally block via _wait_for_tasks
        task_ro = asyncio.ensure_future(
            self.instantiate_RO(
                logging_text=logging_text,
                nsr_id=nsr_id,
                nsd=nsd,
                db_nsr=db_nsr,
                db_nslcmop=db_nslcmop,
                db_vnfrs=db_vnfrs,
                db_vnfds=db_vnfds,
                n2vc_key_list=n2vc_key_list,
                stage=stage,
            )
        )
        self.lcm_tasks.register("ns", nsr_id, nslcmop_id, "instantiate_RO", task_ro)
        tasks_dict_info[task_ro] = "Deploying at VIM"
        # n2vc_redesign STEP 3 to 6 Deploy N2VC
        stage[1] = "Deploying Execution Environments."
        self.logger.debug(logging_text + stage[1])
        # create namespace and certificate if any helm based EE is present in the NS
        if check_helm_ee_in_ns(db_vnfds):
            await self.vca_map["helm-v3"].setup_ns_namespace(
                name=nsr_id,
            )
            # create TLS certificates
            await self.vca_map["helm-v3"].create_tls_certificate(
                secret_name=self.EE_TLS_NAME,
                dns_prefix="*",
                nsr_id=nsr_id,
                usage="server auth",
                namespace=nsr_id,
            )
        nsi_id = None  # TODO put nsi_id when this nsr belongs to a NSI
        # Deploy the VNF-level, VDU-level and KDU-level charms of every VNF
        for vnf_profile in get_vnf_profiles(nsd):
            vnfd_id = vnf_profile["vnfd-id"]
            vnfd = find_in_list(db_vnfds, lambda a_vnf: a_vnf["id"] == vnfd_id)
            member_vnf_index = str(vnf_profile["id"])
            db_vnfr = db_vnfrs[member_vnf_index]
            base_folder = vnfd["_admin"]["storage"]
            vdu_id = None
            vdu_index = 0
            vdu_name = None
            kdu_name = None
            kdu_index = None
            # Get additional parameters
            deploy_params = {"OSM": get_osm_params(db_vnfr)}
            if db_vnfr.get("additionalParamsForVnf"):
                deploy_params.update(
                    parse_yaml_strings(db_vnfr["additionalParamsForVnf"].copy())
                )
            descriptor_config = get_configuration(vnfd, vnfd["id"])
            if descriptor_config:
                self._deploy_n2vc(
                    logging_text=logging_text
                    + "member_vnf_index={} ".format(member_vnf_index),
                    db_nsr=db_nsr,
                    db_vnfr=db_vnfr,
                    nslcmop_id=nslcmop_id,
                    nsr_id=nsr_id,
                    nsi_id=nsi_id,
                    vnfd_id=vnfd_id,
                    vdu_id=vdu_id,
                    kdu_name=kdu_name,
                    member_vnf_index=member_vnf_index,
                    vdu_index=vdu_index,
                    kdu_index=kdu_index,
                    vdu_name=vdu_name,
                    deploy_params=deploy_params,
                    descriptor_config=descriptor_config,
                    base_folder=base_folder,
                    task_instantiation_info=tasks_dict_info,
                    stage=stage,
                )
            # Deploy charms for each VDU that supports one.
            for vdud in get_vdu_list(vnfd):
                vdu_id = vdud["id"]
                descriptor_config = get_configuration(vnfd, vdu_id)
                vdur = find_in_list(
                    db_vnfr["vdur"], lambda vdu: vdu["vdu-id-ref"] == vdu_id
                )
                if vdur.get("additionalParams"):
                    deploy_params_vdu = parse_yaml_strings(vdur["additionalParams"])
                else:
                    deploy_params_vdu = deploy_params
                deploy_params_vdu["OSM"] = get_osm_params(
                    db_vnfr, vdu_id, vdu_count_index=0
                )
                vdud_count = get_number_of_instances(vnfd, vdu_id)
                self.logger.debug("VDUD > {}".format(vdud))
                self.logger.debug(
                    "Descriptor config > {}".format(descriptor_config)
                )
                if descriptor_config:
                    vdu_name = None
                    kdu_name = None
                    kdu_index = None
                    # one charm per VDU instance
                    for vdu_index in range(vdud_count):
                        # TODO vnfr_params["rw_mgmt_ip"] = vdur["ip-address"]
                        self._deploy_n2vc(
                            logging_text=logging_text
                            + "member_vnf_index={}, vdu_id={}, vdu_index={} ".format(
                                member_vnf_index, vdu_id, vdu_index
                            ),
                            db_nsr=db_nsr,
                            db_vnfr=db_vnfr,
                            nslcmop_id=nslcmop_id,
                            nsr_id=nsr_id,
                            nsi_id=nsi_id,
                            vnfd_id=vnfd_id,
                            vdu_id=vdu_id,
                            kdu_name=kdu_name,
                            kdu_index=kdu_index,
                            member_vnf_index=member_vnf_index,
                            vdu_index=vdu_index,
                            vdu_name=vdu_name,
                            deploy_params=deploy_params_vdu,
                            descriptor_config=descriptor_config,
                            base_folder=base_folder,
                            task_instantiation_info=tasks_dict_info,
                            stage=stage,
                        )
            for kdud in get_kdu_list(vnfd):
                kdu_name = kdud["name"]
                descriptor_config = get_configuration(vnfd, kdu_name)
                if descriptor_config:
                    vdu_id = None
                    vdu_index = 0
                    vdu_name = None
                    kdu_index, kdur = next(
                        x
                        for x in enumerate(db_vnfr["kdur"])
                        if x[1]["kdu-name"] == kdu_name
                    )
                    deploy_params_kdu = {"OSM": get_osm_params(db_vnfr)}
                    if kdur.get("additionalParams"):
                        deploy_params_kdu.update(
                            parse_yaml_strings(kdur["additionalParams"].copy())
                        )
                    self._deploy_n2vc(
                        logging_text=logging_text,
                        db_nsr=db_nsr,
                        db_vnfr=db_vnfr,
                        nslcmop_id=nslcmop_id,
                        nsr_id=nsr_id,
                        nsi_id=nsi_id,
                        vnfd_id=vnfd_id,
                        vdu_id=vdu_id,
                        kdu_name=kdu_name,
                        member_vnf_index=member_vnf_index,
                        vdu_index=vdu_index,
                        kdu_index=kdu_index,
                        vdu_name=vdu_name,
                        deploy_params=deploy_params_kdu,
                        descriptor_config=descriptor_config,
                        base_folder=base_folder,
                        task_instantiation_info=tasks_dict_info,
                        stage=stage,
                    )
            # Check if each vnf has exporter for metric collection if so update prometheus job records
            if "exporters-endpoints" in vnfd.get("df")[0]:
                exporter_config = vnfd.get("df")[0].get("exporters-endpoints")
                self.logger.debug("exporter config :{}".format(exporter_config))
                artifact_path = "{}/{}/{}".format(
                    base_folder["folder"],
                    base_folder["pkg-dir"],
                    "exporter-endpoint",
                )
                ee_id = None
                ee_config_descriptor = exporter_config
                vnfr_id = db_vnfr["id"]
                # wait for the VM to be reachable so scraping targets its mgmt IP
                rw_mgmt_ip = await self.wait_vm_up_insert_key_ro(
                    logging_text,
                    nsr_id,
                    vnfr_id,
                    vdu_id=None,
                    vdu_index=None,
                    user=None,
                    pub_key=None,
                )
                self.logger.debug("rw_mgmt_ip:{}".format(rw_mgmt_ip))
                self.logger.debug("Artifact_path:{}".format(artifact_path))
                db_vnfr = self.db.get_one("vnfrs", {"_id": vnfr_id})
                vdu_id_for_prom = None
                vdu_index_for_prom = None
                # NOTE(review): only the LAST vdur's id/count-index survive this
                # loop — confirm the single-VDU assumption for exporter VNFs.
                for x in get_iterable(db_vnfr, "vdur"):
                    vdu_id_for_prom = x.get("vdu-id-ref")
                    vdu_index_for_prom = x.get("count-index")
                prometheus_jobs = await self.extract_prometheus_scrape_jobs(
                    ee_id=ee_id,
                    artifact_path=artifact_path,
                    ee_config_descriptor=ee_config_descriptor,
                    vnfr_id=vnfr_id,
                    nsr_id=nsr_id,
                    target_ip=rw_mgmt_ip,
                    element_type="VDU",
                    vdu_id=vdu_id_for_prom,
                    vdu_index=vdu_index_for_prom,
                )
                self.logger.debug("Prometheus job:{}".format(prometheus_jobs))
                if prometheus_jobs:
                    db_nsr_update["_admin.deployed.prometheus_jobs"] = prometheus_jobs
                    self.update_db_2(
                        "nsrs",
                        nsr_id,
                        db_nsr_update,
                    )
                    for job in prometheus_jobs:
                        self.db.set_one(
                            "prometheus_jobs",
                            {"job_name": job["job_name"]},
                            job,
                            upsert=True,
                            fail_on_empty=False,
                        )
        # Check if this NS has a charm configuration
        descriptor_config = nsd.get("ns-configuration")
        if descriptor_config and descriptor_config.get("juju"):
            vnfd_id = None
            db_vnfr = None
            member_vnf_index = None
            vdu_id = None
            kdu_name = None
            kdu_index = None
            vdu_index = 0
            vdu_name = None
            # Get additional parameters
            deploy_params = {"OSM": {"vim_account_id": ns_params["vimAccountId"]}}
            if db_nsr.get("additionalParamsForNs"):
                deploy_params.update(
                    parse_yaml_strings(db_nsr["additionalParamsForNs"].copy())
                )
            base_folder = nsd["_admin"]["storage"]
            self._deploy_n2vc(
                logging_text=logging_text,
                db_nsr=db_nsr,
                db_vnfr=db_vnfr,
                nslcmop_id=nslcmop_id,
                nsr_id=nsr_id,
                nsi_id=nsi_id,
                vnfd_id=vnfd_id,
                vdu_id=vdu_id,
                kdu_name=kdu_name,
                member_vnf_index=member_vnf_index,
                vdu_index=vdu_index,
                kdu_index=kdu_index,
                vdu_name=vdu_name,
                deploy_params=deploy_params,
                descriptor_config=descriptor_config,
                base_folder=base_folder,
                task_instantiation_info=tasks_dict_info,
                stage=stage,
            )
        # rest of staff will be done at finally
    except (
        ROclient.ROClientException,
        DbException,
        LcmException,
        N2VCException,
    ) as e:
        self.logger.error(
            logging_text + "Exit Exception while '{}': {}".format(stage[1], e)
        )
        exc = e
    except asyncio.CancelledError:
        self.logger.error(
            logging_text + "Cancelled Exception while '{}'".format(stage[1])
        )
        exc = "Operation was cancelled"
    except Exception as e:
        exc = traceback.format_exc()
        self.logger.critical(
            logging_text + "Exit Exception while '{}': {}".format(stage[1], e),
            exc_info=True,
        )
    finally:
        # Wrap-up: wait for pending tasks, compute final statuses, store
        # auto-healing/scaling/alarm alerts and notify over kafka.
        if exc:
            error_list.append(str(exc))
        try:
            # wait for pending tasks
            if tasks_dict_info:
                stage[1] = "Waiting for instantiate pending tasks."
                self.logger.debug(logging_text + stage[1])
                error_list += await self._wait_for_tasks(
                    logging_text,
                    tasks_dict_info,
                    timeout_ns_deploy,
                    stage,
                    nslcmop_id,
                    nsr_id=nsr_id,
                )
            stage[1] = stage[2] = ""
        except asyncio.CancelledError:
            error_list.append("Cancelled")
            await self._cancel_pending_tasks(logging_text, tasks_dict_info)
            await self._wait_for_tasks(
                logging_text,
                tasks_dict_info,
                timeout_ns_deploy,
                stage,
                nslcmop_id,
                nsr_id=nsr_id,
            )
        # NOTE: this local exc shadows the outer one; only its message is kept
        except Exception as exc:
            error_list.append(str(exc))
        # update operation-status
        db_nsr_update["operational-status"] = "running"
        # let's begin with VCA 'configured' status (later we can change it)
        db_nsr_update["config-status"] = "configured"
        # downgrade statuses depending on which kind of task is still pending/failed
        for task, task_name in tasks_dict_info.items():
            if not task.done() or task.cancelled() or task.exception():
                if task_name.startswith(self.task_name_deploy_vca):
                    # A N2VC task is pending
                    db_nsr_update["config-status"] = "failed"
                else:
                    # RO or KDU task is pending
                    db_nsr_update["operational-status"] = "failed"
        # update status at database
        if error_list:
            error_detail = ". ".join(error_list)
            self.logger.error(logging_text + error_detail)
            error_description_nslcmop = "{} Detail: {}".format(
                stage[0], error_detail
            )
            error_description_nsr = "Operation: INSTANTIATING.{}, {}".format(
                nslcmop_id, stage[0]
            )
            db_nsr_update["detailed-status"] = (
                error_description_nsr + " Detail: " + error_detail
            )
            db_nslcmop_update["detailed-status"] = error_detail
            nslcmop_operation_state = "FAILED"
            ns_state = "BROKEN"
        else:
            error_detail = None
            error_description_nsr = error_description_nslcmop = None
            ns_state = "READY"
            db_nsr_update["detailed-status"] = "Done"
            db_nslcmop_update["detailed-status"] = "Done"
            nslcmop_operation_state = "COMPLETED"
        # Gather auto-healing and auto-scaling alerts for each vnfr
        healing_alerts = []
        scaling_alerts = []
        for vnfr in self.db.get_list("vnfrs", {"nsr-id-ref": nsr_id}):
            vnfd = next(
                (sub for sub in db_vnfds if sub["_id"] == vnfr["vnfd-id"]), None
            )
            healing_alerts = self._gather_vnfr_healing_alerts(vnfr, vnfd)
            for alert in healing_alerts:
                self.logger.info(f"Storing healing alert in MongoDB: {alert}")
                self.db.create("alerts", alert)
            scaling_alerts = self._gather_vnfr_scaling_alerts(vnfr, vnfd)
            for alert in scaling_alerts:
                self.logger.info(f"Storing scaling alert in MongoDB: {alert}")
                self.db.create("alerts", alert)
            alarm_alerts = self._gather_vnfr_alarm_alerts(vnfr, vnfd)
            for alert in alarm_alerts:
                self.logger.info(f"Storing VNF alarm alert in MongoDB: {alert}")
                self.db.create("alerts", alert)
        if db_nsr:
            self._write_ns_status(
                nsr_id=nsr_id,
                ns_state=ns_state,
                current_operation="IDLE",
                current_operation_id=None,
                error_description=error_description_nsr,
                error_detail=error_detail,
                other_update=db_nsr_update,
            )
        self._write_op_status(
            op_id=nslcmop_id,
            stage="",
            error_message=error_description_nslcmop,
            operation_state=nslcmop_operation_state,
            other_update=db_nslcmop_update,
        )
        # notify the result through kafka so NBI/subscribers can react
        if nslcmop_operation_state:
            try:
                await self.msg.aiowrite(
                    "ns",
                    "instantiated",
                    {
                        "nsr_id": nsr_id,
                        "nslcmop_id": nslcmop_id,
                        "operationState": nslcmop_operation_state,
                        "startTime": db_nslcmop["startTime"],
                        "links": db_nslcmop["links"],
                        "operationParams": {
                            "nsInstanceId": nsr_id,
                            "nsdId": db_nsr["nsd-id"],
                        },
                    },
                )
            except Exception as e:
                self.logger.error(
                    logging_text + "kafka_write notification Exception {}".format(e)
                )
        self.logger.debug(logging_text + "Exit")
        self.lcm_tasks.remove("ns", nsr_id, nslcmop_id, "ns_instantiate")
def _get_vnfd(self, vnfd_id: str, projects_read: str, cached_vnfds: Dict[str, Any]):
if vnfd_id not in cached_vnfds:
cached_vnfds[vnfd_id] = self.db.get_one(
"vnfds", {"id": vnfd_id, "_admin.projects_read": projects_read}
)
return cached_vnfds[vnfd_id]
def _get_vnfr(self, nsr_id: str, vnf_profile_id: str, cached_vnfrs: Dict[str, Any]):
if vnf_profile_id not in cached_vnfrs:
cached_vnfrs[vnf_profile_id] = self.db.get_one(
"vnfrs",
{
"member-vnf-index-ref": vnf_profile_id,
"nsr-id-ref": nsr_id,
},
)
return cached_vnfrs[vnf_profile_id]
def _is_deployed_vca_in_relation(
self, vca: DeployedVCA, relation: Relation
) -> bool:
found = False
for endpoint in (relation.provider, relation.requirer):
if endpoint["kdu-resource-profile-id"]:
continue
found = (
vca.vnf_profile_id == endpoint.vnf_profile_id
and vca.vdu_profile_id == endpoint.vdu_profile_id
and vca.execution_environment_ref == endpoint.execution_environment_ref
)
if found:
break
return found
def _update_ee_relation_data_with_implicit_data(
    self, nsr_id, nsd, ee_relation_data, cached_vnfds, vnf_profile_id: str = None
):
    """Normalize a relation endpoint and fill in its implicit execution-environment-ref.

    The endpoint is first normalized through ``safe_get_ee_relation``. For VNF
    and VDU level endpoints missing an execution-environment-ref, the juju EE
    declared in the VNFD for the corresponding entity is looked up and set.

    :raises Exception: when no juju execution environment is found for the entity
    """
    ee_relation_data = safe_get_ee_relation(
        nsr_id, ee_relation_data, vnf_profile_id=vnf_profile_id
    )
    ee_relation_level = EELevel.get_level(ee_relation_data)
    # only VNF/VDU endpoints may carry an implicit EE reference
    if ee_relation_level not in (EELevel.VNF, EELevel.VDU):
        return ee_relation_data
    if ee_relation_data["execution-environment-ref"]:
        return ee_relation_data
    # resolve the VNFD that declares the execution environment
    vnf_profile = get_vnf_profile(nsd, ee_relation_data["vnf-profile-id"])
    vnfd_id = vnf_profile["vnfd-id"]
    project = nsd["_admin"]["projects_read"][0]
    db_vnfd = self._get_vnfd(vnfd_id, project, cached_vnfds)
    if ee_relation_level == EELevel.VNF:
        entity_id = vnfd_id
    else:
        entity_id = ee_relation_data["vdu-profile-id"]
    ee = get_juju_ee_ref(db_vnfd, entity_id)
    if not ee:
        raise Exception(
            f"not execution environments found for ee_relation {ee_relation_data}"
        )
    ee_relation_data["execution-environment-ref"] = ee["id"]
    return ee_relation_data
def _get_ns_relations(
    self,
    nsr_id: str,
    nsd: Dict[str, Any],
    vca: DeployedVCA,
    cached_vnfds: Dict[str, Any],
) -> List[Relation]:
    """Return the NS-level relations (from ns-configuration) that involve this VCA."""
    relations = []
    for db_relation in get_ns_configuration_relation_list(nsd):
        if all(key in db_relation for key in ("provider", "requirer")):
            # new-style relation: endpoints given explicitly
            provider_dict = db_relation["provider"]
            requirer_dict = db_relation["requirer"]
        elif "entities" in db_relation:
            # old-style relation: two entities; an id equal to the NSD id
            # means the NS itself (no vnf-profile-id)
            entities = db_relation["entities"]
            provider_dict = {
                "nsr-id": nsr_id,
                "endpoint": entities[0]["endpoint"],
            }
            if entities[0]["id"] != nsd["id"]:
                provider_dict["vnf-profile-id"] = entities[0]["id"]
            requirer_dict = {
                "nsr-id": nsr_id,
                "endpoint": entities[1]["endpoint"],
            }
            if entities[1]["id"] != nsd["id"]:
                requirer_dict["vnf-profile-id"] = entities[1]["id"]
        else:
            raise Exception(
                "provider/requirer or entities must be included in the relation."
            )
        provider = EERelation(
            self._update_ee_relation_data_with_implicit_data(
                nsr_id, nsd, provider_dict, cached_vnfds
            )
        )
        requirer = EERelation(
            self._update_ee_relation_data_with_implicit_data(
                nsr_id, nsd, requirer_dict, cached_vnfds
            )
        )
        relation = Relation(db_relation["name"], provider, requirer)
        # keep only the relations in which this VCA participates
        if self._is_deployed_vca_in_relation(vca, relation):
            relations.append(relation)
    return relations
def _get_vnf_relations(
    self,
    nsr_id: str,
    nsd: Dict[str, Any],
    vca: DeployedVCA,
    cached_vnfds: Dict[str, Any],
) -> List[Relation]:
    """Return the VNF-level relations (declared in the VNFD) that involve this VCA."""
    relations = []
    if vca.target_element == "ns":
        # NS charms have no VNFD, hence no VNF-level relations
        self.logger.debug("VCA is a NS charm, not a VNF.")
        return relations
    vnf_profile = get_vnf_profile(nsd, vca.vnf_profile_id)
    vnf_profile_id = vnf_profile["id"]
    vnfd_id = vnf_profile["vnfd-id"]
    project = nsd["_admin"]["projects_read"][0]
    db_vnfd = self._get_vnfd(vnfd_id, project, cached_vnfds)
    for db_relation in get_relation_list(db_vnfd, vnfd_id):
        if all(key in db_relation for key in ("provider", "requirer")):
            # new-style relation: endpoints given explicitly
            provider_dict = db_relation["provider"]
            requirer_dict = db_relation["requirer"]
        elif "entities" in db_relation:
            # old-style relation: an entity id equal to the VNFD id means
            # the VNF itself (no vdu-profile-id)
            entities = db_relation["entities"]
            provider_dict = {
                "nsr-id": nsr_id,
                "vnf-profile-id": vnf_profile_id,
                "endpoint": entities[0]["endpoint"],
            }
            if entities[0]["id"] != vnfd_id:
                provider_dict["vdu-profile-id"] = entities[0]["id"]
            requirer_dict = {
                "nsr-id": nsr_id,
                "vnf-profile-id": vnf_profile_id,
                "endpoint": entities[1]["endpoint"],
            }
            if entities[1]["id"] != vnfd_id:
                requirer_dict["vdu-profile-id"] = entities[1]["id"]
        else:
            raise Exception(
                "provider/requirer or entities must be included in the relation."
            )
        provider = EERelation(
            self._update_ee_relation_data_with_implicit_data(
                nsr_id, nsd, provider_dict, cached_vnfds, vnf_profile_id=vnf_profile_id
            )
        )
        requirer = EERelation(
            self._update_ee_relation_data_with_implicit_data(
                nsr_id, nsd, requirer_dict, cached_vnfds, vnf_profile_id=vnf_profile_id
            )
        )
        relation = Relation(db_relation["name"], provider, requirer)
        # keep only the relations in which this VCA participates
        if self._is_deployed_vca_in_relation(vca, relation):
            relations.append(relation)
    return relations
def _get_kdu_resource_data(
    self,
    ee_relation: EERelation,
    db_nsr: Dict[str, Any],
    cached_vnfds: Dict[str, Any],
) -> DeployedK8sResource:
    """Resolve the deployed KDU resource referenced by a KDU-level relation endpoint.

    Looks up the KDU resource profile in the corresponding VNFD and returns the
    matching deployed KDU record, annotated with its "resource-name".
    """
    nsd = get_nsd(db_nsr)
    vnf_profile = find_in_list(
        get_vnf_profiles(nsd),
        lambda profile: profile["id"] == ee_relation.vnf_profile_id,
    )
    vnfd_id = vnf_profile["vnfd-id"]
    project = nsd["_admin"]["projects_read"][0]
    db_vnfd = self._get_vnfd(vnfd_id, project, cached_vnfds)
    kdu_resource_profile = get_kdu_resource_profile(
        db_vnfd, ee_relation.kdu_resource_profile_id
    )
    deployed_kdu, _ = get_deployed_kdu(
        db_nsr.get("_admin", ()).get("deployed", ()),
        kdu_resource_profile["kdu-name"],
        ee_relation.vnf_profile_id,
    )
    deployed_kdu.update({"resource-name": kdu_resource_profile["resource-name"]})
    return deployed_kdu
def _get_deployed_component(
    self,
    ee_relation: EERelation,
    db_nsr: Dict[str, Any],
    cached_vnfds: Dict[str, Any],
) -> DeployedComponent:
    """Locate the deployed component (VCA or K8s resource) matching a relation endpoint.

    Returns None when nothing matching the endpoint has been deployed yet.
    """
    nsr_id = db_nsr["_id"]
    ee_level = EELevel.get_level(ee_relation)
    if ee_level == EELevel.KDU:
        # KDU endpoints map to a deployed K8s resource, not a VCA
        kdu_resource_data = self._get_kdu_resource_data(
            ee_relation, db_nsr, cached_vnfds
        )
        return DeployedK8sResource(kdu_resource_data) if kdu_resource_data else None
    # build the VCA search filter according to the endpoint level
    if ee_level == EELevel.NS:
        vca_filter = {"vdu_id": None, "member-vnf-index": None}
    elif ee_level == EELevel.VNF:
        vca_filter = {
            "vdu_id": None,
            "member-vnf-index": ee_relation.vnf_profile_id,
            "ee_descriptor_id": ee_relation.execution_environment_ref,
        }
    elif ee_level == EELevel.VDU:
        vca_filter = {
            "vdu_id": ee_relation.vdu_profile_id,
            "member-vnf-index": ee_relation.vnf_profile_id,
            "ee_descriptor_id": ee_relation.execution_environment_ref,
        }
    else:
        return None
    vca = get_deployed_vca(db_nsr, vca_filter)
    return DeployedVCA(nsr_id, vca) if vca else None
async def _add_relation(
    self,
    relation: Relation,
    vca_type: str,
    db_nsr: Dict[str, Any],
    cached_vnfds: Dict[str, Any],
    cached_vnfrs: Dict[str, Any],
) -> bool:
    """
    Create one juju relation between the two endpoints of ``relation``, when
    both peers are already deployed and their configuration software installed.

    :param relation: provider/requirer endpoint pair to connect
    :param vca_type: key into self.vca_map selecting the N2VC connector
    :param db_nsr: nsrs database record, used to locate the deployed components
    :param cached_vnfds: memoization dict passed to self._get_vnfd
    :param cached_vnfrs: memoization dict passed to self._get_vnfr
    :return: True if the relation was added; False when either peer is not
        ready yet (the caller is expected to retry later)
    :raises LcmException: when N2VC fails adding the relation
    """
    deployed_provider = self._get_deployed_component(
        relation.provider, db_nsr, cached_vnfds
    )
    deployed_requirer = self._get_deployed_component(
        relation.requirer, db_nsr, cached_vnfds
    )
    # both peers must exist and have their config software installed
    if (
        deployed_provider
        and deployed_requirer
        and deployed_provider.config_sw_installed
        and deployed_requirer.config_sw_installed
    ):
        # NS-level endpoints have no vnf_profile_id, hence no vnfr to fetch
        provider_db_vnfr = (
            self._get_vnfr(
                relation.provider.nsr_id,
                relation.provider.vnf_profile_id,
                cached_vnfrs,
            )
            if relation.provider.vnf_profile_id
            else None
        )
        requirer_db_vnfr = (
            self._get_vnfr(
                relation.requirer.nsr_id,
                relation.requirer.vnf_profile_id,
                cached_vnfrs,
            )
            if relation.requirer.vnf_profile_id
            else None
        )
        provider_vca_id = self.get_vca_id(provider_db_vnfr, db_nsr)
        requirer_vca_id = self.get_vca_id(requirer_db_vnfr, db_nsr)
        provider_relation_endpoint = RelationEndpoint(
            deployed_provider.ee_id,
            provider_vca_id,
            relation.provider.endpoint,
        )
        requirer_relation_endpoint = RelationEndpoint(
            deployed_requirer.ee_id,
            requirer_vca_id,
            relation.requirer.endpoint,
        )
        try:
            await self.vca_map[vca_type].add_relation(
                provider=provider_relation_endpoint,
                requirer=requirer_relation_endpoint,
            )
        except N2VCException as exception:
            # surface N2VC errors as LCM errors so callers fail the operation
            self.logger.error(exception)
            raise LcmException(exception)
        return True
    return False
async def _add_vca_relations(
    self,
    logging_text,
    nsr_id,
    vca_type: str,
    vca_index: int,
    timeout: int = 3600,
) -> bool:
    """
    Find all NS-level and VNF-level relations this VCA participates in and add
    them, polling every 5 seconds (up to ``timeout`` seconds) until every peer
    component is deployed and each relation could be created.

    :param logging_text: prefix for log messages
    :param nsr_id: NS instance id
    :param vca_type: key into self.vca_map selecting the N2VC connector
    :param vca_index: index of this VCA within _admin.deployed.VCA
    :param timeout: maximum seconds to wait for peers before giving up
    :return: True on success or when there is nothing to do; False on timeout
        or on any error (errors are logged, not raised)
    """
    # steps:
    # 1. find all relations for this VCA
    # 2. wait for other peers related
    # 3. add relations
    try:
        # STEP 1: find all relations for this VCA
        # read nsr record
        db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
        nsd = get_nsd(db_nsr)
        # this VCA data
        deployed_vca_dict = get_deployed_vca_list(db_nsr)[vca_index]
        my_vca = DeployedVCA(nsr_id, deployed_vca_dict)
        # caches shared by all lookups within this call
        cached_vnfds = {}
        cached_vnfrs = {}
        relations = []
        relations.extend(self._get_ns_relations(nsr_id, nsd, my_vca, cached_vnfds))
        relations.extend(self._get_vnf_relations(nsr_id, nsd, my_vca, cached_vnfds))
        # if no relations, terminate
        if not relations:
            self.logger.debug(logging_text + " No relations")
            return True
        self.logger.debug(logging_text + " adding relations {}".format(relations))
        # add all relations
        start = time()
        while True:
            # check timeout
            now = time()
            if now - start >= timeout:
                self.logger.error(logging_text + " : timeout adding relations")
                return False
            # reload nsr from database (we need to update record: _admin.deployed.VCA)
            db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
            # for each relation, find the VCA's related
            # iterate over a copy: successfully added relations are removed
            # from the pending list while looping
            for relation in relations.copy():
                added = await self._add_relation(
                    relation,
                    vca_type,
                    db_nsr,
                    cached_vnfds,
                    cached_vnfrs,
                )
                if added:
                    relations.remove(relation)
            if not relations:
                self.logger.debug("Relations added")
                break
            # some peers are not ready yet; retry after a short pause
            await asyncio.sleep(5.0)
        return True
    except Exception as e:
        self.logger.warn(logging_text + " ERROR adding relations: {}".format(e))
        return False
async def _install_kdu(
    self,
    nsr_id: str,
    nsr_db_path: str,
    vnfr_data: dict,
    kdu_index: int,
    kdud: dict,
    vnfd: dict,
    k8s_instance_info: dict,
    k8params: dict = None,
    timeout: int = 600,
    vca_id: str = None,
):
    """Install one KDU on its k8s cluster and record the outcome in the DB.

    Installs the helm chart / juju bundle described by *k8s_instance_info*,
    stores the kdu-instance (and, for juju, the namespace) under
    *nsr_db_path* in the nsrs record, copies the exposed services and the
    management IP into the vnfrs record, and finally runs the KDU's
    initial-config-primitives (only when no execution environment is
    declared for them).

    :param nsr_id: NS record id
    :param nsr_db_path: dot path inside the nsr record for this KDU
        (e.g. "_admin.deployed.K8s.<n>")
    :param vnfr_data: vnfr record of the owning VNF
    :param kdu_index: index of this KDU inside vnfr "kdur" list
    :param kdud: KDU descriptor (vnfd "kdu" list entry)
    :param vnfd: full VNF descriptor
    :param k8s_instance_info: dict with k8scluster-type/-uuid, kdu-model,
        kdu-name, namespace, kdu-deployment-name
    :param k8params: parameters passed to the install, default None
    :param timeout: seconds for install and for each primitive, default 600
    :param vca_id: VCA id, default None
    :return: the kdu_instance name
    :raises: re-raises any failure after marking the KDU as ERROR in the DB
    """
    try:
        k8sclustertype = k8s_instance_info["k8scluster-type"]
        # Instantiate kdu
        db_dict_install = {
            "collection": "nsrs",
            "filter": {"_id": nsr_id},
            "path": nsr_db_path,
        }

        # Use the user-provided deployment name when present; otherwise ask
        # the k8s connector to generate a unique instance name.
        if k8s_instance_info.get("kdu-deployment-name"):
            kdu_instance = k8s_instance_info.get("kdu-deployment-name")
        else:
            kdu_instance = self.k8scluster_map[
                k8sclustertype
            ].generate_kdu_instance_name(
                db_dict=db_dict_install,
                kdu_model=k8s_instance_info["kdu-model"],
                kdu_name=k8s_instance_info["kdu-name"],
            )

        # Update the nsrs table with the kdu-instance value
        self.update_db_2(
            item="nsrs",
            _id=nsr_id,
            _desc={nsr_db_path + ".kdu-instance": kdu_instance},
        )

        # Update the nsrs table with the actual namespace being used, if the k8scluster-type is `juju` or
        # `juju-bundle`. This verification is needed because there is not a standard/homogeneous namespace
        # between the Helm Charts and Juju Bundles-based KNFs. If we found a way of having an homogeneous
        # namespace, this first verification could be removed, and the next step would be done for any kind
        # of KNF.
        # TODO -> find a way to have an homogeneous namespace between the Helm Charts and Juju Bundles-based
        # KNFs (Bug 2027: https://osm.etsi.org/bugzilla/show_bug.cgi?id=2027)
        if k8sclustertype in ("juju", "juju-bundle"):
            # First, verify if the current namespace is present in the `_admin.projects_read` (if not, it means
            # that the user passed a namespace which he wants its KDU to be deployed in)
            if (
                self.db.count(
                    table="nsrs",
                    q_filter={
                        "_id": nsr_id,
                        "_admin.projects_write": k8s_instance_info["namespace"],
                        "_admin.projects_read": k8s_instance_info["namespace"],
                    },
                )
                > 0
            ):
                self.logger.debug(
                    f"Updating namespace/model for Juju Bundle from {k8s_instance_info['namespace']} to {kdu_instance}"
                )
                self.update_db_2(
                    item="nsrs",
                    _id=nsr_id,
                    _desc={f"{nsr_db_path}.namespace": kdu_instance},
                )
                k8s_instance_info["namespace"] = kdu_instance

        # Atomic install through the k8s connector; progress is written to
        # the nsr record via db_dict_install.
        await self.k8scluster_map[k8sclustertype].install(
            cluster_uuid=k8s_instance_info["k8scluster-uuid"],
            kdu_model=k8s_instance_info["kdu-model"],
            atomic=True,
            params=k8params,
            db_dict=db_dict_install,
            timeout=timeout,
            kdu_name=k8s_instance_info["kdu-name"],
            namespace=k8s_instance_info["namespace"],
            kdu_instance=kdu_instance,
            vca_id=vca_id,
        )

        # Obtain services to obtain management service ip
        services = await self.k8scluster_map[k8sclustertype].get_services(
            cluster_uuid=k8s_instance_info["k8scluster-uuid"],
            kdu_instance=kdu_instance,
            namespace=k8s_instance_info["namespace"],
        )

        # Obtain management service info (if exists)
        vnfr_update_dict = {}
        kdu_config = get_configuration(vnfd, kdud["name"])
        if kdu_config:
            target_ee_list = kdu_config.get("execution-environment-list", [])
        else:
            target_ee_list = []

        if services:
            vnfr_update_dict["kdur.{}.services".format(kdu_index)] = services
            mgmt_services = [
                service
                for service in kdud.get("service", [])
                if service.get("mgmt-service")
            ]
            for mgmt_service in mgmt_services:
                for service in services:
                    # Deployed service names are matched by prefix against
                    # the descriptor's mgmt-service name.
                    if service["name"].startswith(mgmt_service["name"]):
                        # Mgmt service found, Obtain service ip
                        ip = service.get("external_ip", service.get("cluster_ip"))
                        if isinstance(ip, list) and len(ip) == 1:
                            ip = ip[0]

                        vnfr_update_dict[
                            "kdur.{}.ip-address".format(kdu_index)
                        ] = ip

                        # Check if must update also mgmt ip at the vnf
                        service_external_cp = mgmt_service.get(
                            "external-connection-point-ref"
                        )
                        if service_external_cp:
                            if (
                                deep_get(vnfd, ("mgmt-interface", "cp"))
                                == service_external_cp
                            ):
                                vnfr_update_dict["ip-address"] = ip

                            if find_in_list(
                                target_ee_list,
                                lambda ee: ee.get(
                                    "external-connection-point-ref", ""
                                )
                                == service_external_cp,
                            ):
                                vnfr_update_dict[
                                    "kdur.{}.ip-address".format(kdu_index)
                                ] = ip
                        break
                else:
                    # for/else: no deployed service matched this mgmt-service
                    self.logger.warn(
                        "Mgmt service name: {} not found".format(
                            mgmt_service["name"]
                        )
                    )

        # Mark the KDU as ready in the vnfr record.
        vnfr_update_dict["kdur.{}.status".format(kdu_index)] = "READY"
        self.update_db_2("vnfrs", vnfr_data.get("_id"), vnfr_update_dict)

        kdu_config = get_configuration(vnfd, k8s_instance_info["kdu-name"])
        # Run initial-config-primitives directly through the k8s connector
        # only when the KDU declares no juju/helm execution environment for
        # them (otherwise the EE handles configuration).
        if (
            kdu_config
            and kdu_config.get("initial-config-primitive")
            and get_juju_ee_ref(vnfd, k8s_instance_info["kdu-name"]) is None
            and get_helm_ee_ref(vnfd, k8s_instance_info["kdu-name"]) is None
        ):
            initial_config_primitive_list = kdu_config.get(
                "initial-config-primitive"
            )
            # Primitives must be executed in ascending "seq" order.
            initial_config_primitive_list.sort(key=lambda val: int(val["seq"]))

            for initial_config_primitive in initial_config_primitive_list:
                primitive_params_ = self._map_primitive_params(
                    initial_config_primitive, {}, {}
                )

                await asyncio.wait_for(
                    self.k8scluster_map[k8sclustertype].exec_primitive(
                        cluster_uuid=k8s_instance_info["k8scluster-uuid"],
                        kdu_instance=kdu_instance,
                        primitive_name=initial_config_primitive["name"],
                        params=primitive_params_,
                        db_dict=db_dict_install,
                        vca_id=vca_id,
                    ),
                    timeout=timeout,
                )

    except Exception as e:
        # Prepare update db with error and raise exception
        try:
            self.update_db_2(
                "nsrs", nsr_id, {nsr_db_path + ".detailed-status": str(e)}
            )
            self.update_db_2(
                "vnfrs",
                vnfr_data.get("_id"),
                {"kdur.{}.status".format(kdu_index): "ERROR"},
            )
        except Exception as error:
            # ignore to keep original exception
            self.logger.warning(
                f"An exception occurred while updating DB: {str(error)}"
            )
        # reraise original error
        raise

    return kdu_instance
async def deploy_kdus(
    self,
    logging_text,
    nsr_id,
    nslcmop_id,
    db_vnfrs,
    db_vnfds,
    task_instantiation_info,
):
    """Launch an _install_kdu task for every KDU declared in the VNFRs.

    For each "kdur" entry of each vnfr: resolves its k8s cluster (waiting
    for any in-progress cluster creation and lazily initializing helm v3
    for old clusters), synchronizes the cluster's helm repos once per
    cluster, writes the KDU entry under _admin.deployed.K8s in the nsr
    record and schedules the install as an asyncio task registered in
    lcm_tasks and *task_instantiation_info*.

    :param logging_text: prefix for log messages
    :param nsr_id: NS record id
    :param nslcmop_id: operation id, used for task registration
    :param db_vnfrs: dict member-vnf-index -> vnfr record
    :param db_vnfds: list of VNF descriptors
    :param task_instantiation_info: dict task -> human description, filled here
    :raises LcmException: on unknown kdu type, missing/uninitialized cluster,
        or any unexpected failure (wrapped)
    """
    # Launch kdus if present in the descriptor

    # cache: cluster_type -> {cluster_id -> cluster internal id}
    k8scluster_id_2_uuic = {
        "helm-chart-v3": {},
        "juju-bundle": {},
    }

    async def _get_cluster_id(cluster_id, cluster_type):
        # Resolve (and memoize) the connector-internal id of a k8s cluster,
        # initializing helm v3 on demand for pre-existing clusters.
        nonlocal k8scluster_id_2_uuic
        if cluster_id in k8scluster_id_2_uuic[cluster_type]:
            return k8scluster_id_2_uuic[cluster_type][cluster_id]

        # check if K8scluster is creating and wait look if previous tasks in process
        task_name, task_dependency = self.lcm_tasks.lookfor_related(
            "k8scluster", cluster_id
        )
        if task_dependency:
            text = "Waiting for related tasks '{}' on k8scluster {} to be completed".format(
                task_name, cluster_id
            )
            self.logger.debug(logging_text + text)
            await asyncio.wait(task_dependency, timeout=3600)

        db_k8scluster = self.db.get_one(
            "k8sclusters", {"_id": cluster_id}, fail_on_empty=False
        )
        if not db_k8scluster:
            raise LcmException("K8s cluster {} cannot be found".format(cluster_id))

        k8s_id = deep_get(db_k8scluster, ("_admin", cluster_type, "id"))
        if not k8s_id:
            if cluster_type == "helm-chart-v3":
                try:
                    # backward compatibility for existing clusters that have not been initialized for helm v3
                    k8s_credentials = yaml.safe_dump(
                        db_k8scluster.get("credentials")
                    )
                    k8s_id, uninstall_sw = await self.k8sclusterhelm3.init_env(
                        k8s_credentials, reuse_cluster_uuid=cluster_id
                    )
                    db_k8scluster_update = {}
                    db_k8scluster_update["_admin.helm-chart-v3.error_msg"] = None
                    db_k8scluster_update["_admin.helm-chart-v3.id"] = k8s_id
                    db_k8scluster_update[
                        "_admin.helm-chart-v3.created"
                    ] = uninstall_sw
                    db_k8scluster_update[
                        "_admin.helm-chart-v3.operationalState"
                    ] = "ENABLED"
                    self.update_db_2(
                        "k8sclusters", cluster_id, db_k8scluster_update
                    )
                except Exception as e:
                    self.logger.error(
                        logging_text
                        + "error initializing helm-v3 cluster: {}".format(str(e))
                    )
                    raise LcmException(
                        "K8s cluster '{}' has not been initialized for '{}'".format(
                            cluster_id, cluster_type
                        )
                    )
            else:
                raise LcmException(
                    "K8s cluster '{}' has not been initialized for '{}'".format(
                        cluster_id, cluster_type
                    )
                )
        k8scluster_id_2_uuic[cluster_type][cluster_id] = k8s_id
        return k8s_id

    logging_text += "Deploy kdus: "
    step = ""
    try:
        db_nsr_update = {"_admin.deployed.K8s": []}
        self.update_db_2("nsrs", nsr_id, db_nsr_update)

        index = 0
        updated_cluster_list = []
        updated_v3_cluster_list = []

        for vnfr_data in db_vnfrs.values():
            vca_id = self.get_vca_id(vnfr_data, {})
            for kdu_index, kdur in enumerate(get_iterable(vnfr_data, "kdur")):
                # Step 0: Prepare and set parameters
                desc_params = parse_yaml_strings(kdur.get("additionalParams"))
                vnfd_id = vnfr_data.get("vnfd-id")
                vnfd_with_id = find_in_list(
                    db_vnfds, lambda vnfd: vnfd["_id"] == vnfd_id
                )
                kdud = next(
                    kdud
                    for kdud in vnfd_with_id["kdu"]
                    if kdud["name"] == kdur["kdu-name"]
                )
                namespace = kdur.get("k8s-namespace")
                kdu_deployment_name = kdur.get("kdu-deployment-name")
                if kdur.get("helm-chart"):
                    kdumodel = kdur["helm-chart"]
                    # Default version: helm3, if helm-version is v2 assign v2
                    k8sclustertype = "helm-chart-v3"
                    self.logger.debug("kdur: {}".format(kdur))
                elif kdur.get("juju-bundle"):
                    kdumodel = kdur["juju-bundle"]
                    k8sclustertype = "juju-bundle"
                else:
                    raise LcmException(
                        "kdu type for kdu='{}.{}' is neither helm-chart nor "
                        "juju-bundle. Maybe an old NBI version is running".format(
                            vnfr_data["member-vnf-index-ref"], kdur["kdu-name"]
                        )
                    )
                # check if kdumodel is a file and exists
                try:
                    vnfd_with_id = find_in_list(
                        db_vnfds, lambda vnfd: vnfd["_id"] == vnfd_id
                    )
                    storage = deep_get(vnfd_with_id, ("_admin", "storage"))
                    if storage:  # may be not present if vnfd has not artifacts
                        # path format: /vnfdid/pkkdir/helm-charts|juju-bundles/kdumodel
                        if storage["pkg-dir"]:
                            filename = "{}/{}/{}s/{}".format(
                                storage["folder"],
                                storage["pkg-dir"],
                                k8sclustertype,
                                kdumodel,
                            )
                        else:
                            filename = "{}/Scripts/{}s/{}".format(
                                storage["folder"],
                                k8sclustertype,
                                kdumodel,
                            )
                        if self.fs.file_exists(
                            filename, mode="file"
                        ) or self.fs.file_exists(filename, mode="dir"):
                            kdumodel = self.fs.path + filename
                except (asyncio.TimeoutError, asyncio.CancelledError):
                    raise
                except Exception as e:  # it is not a file
                    self.logger.warning(f"An exception occurred: {str(e)}")

                k8s_cluster_id = kdur["k8s-cluster"]["id"]
                step = "Synchronize repos for k8s cluster '{}'".format(
                    k8s_cluster_id
                )
                cluster_uuid = await _get_cluster_id(k8s_cluster_id, k8sclustertype)

                # Synchronize repos
                # NOTE(review): k8sclustertype is only ever set to
                # "helm-chart-v3" or "juju-bundle" above, so the plain
                # "helm-chart" branches below look unreachable — confirm
                # before removing.
                if (
                    k8sclustertype == "helm-chart"
                    and cluster_uuid not in updated_cluster_list
                ) or (
                    k8sclustertype == "helm-chart-v3"
                    and cluster_uuid not in updated_v3_cluster_list
                ):
                    del_repo_list, added_repo_dict = await asyncio.ensure_future(
                        self.k8scluster_map[k8sclustertype].synchronize_repos(
                            cluster_uuid=cluster_uuid
                        )
                    )
                    if del_repo_list or added_repo_dict:
                        if k8sclustertype == "helm-chart":
                            unset = {
                                "_admin.helm_charts_added." + item: None
                                for item in del_repo_list
                            }
                            updated = {
                                "_admin.helm_charts_added." + item: name
                                for item, name in added_repo_dict.items()
                            }
                            updated_cluster_list.append(cluster_uuid)
                        elif k8sclustertype == "helm-chart-v3":
                            unset = {
                                "_admin.helm_charts_v3_added." + item: None
                                for item in del_repo_list
                            }
                            updated = {
                                "_admin.helm_charts_v3_added." + item: name
                                for item, name in added_repo_dict.items()
                            }
                            updated_v3_cluster_list.append(cluster_uuid)
                        self.logger.debug(
                            logging_text + "repos synchronized on k8s cluster "
                            "'{}' to_delete: {}, to_add: {}".format(
                                k8s_cluster_id, del_repo_list, added_repo_dict
                            )
                        )
                        self.db.set_one(
                            "k8sclusters",
                            {"_id": k8s_cluster_id},
                            updated,
                            unset=unset,
                        )

                # Instantiate kdu
                step = "Instantiating KDU {}.{} in k8s cluster {}".format(
                    vnfr_data["member-vnf-index-ref"],
                    kdur["kdu-name"],
                    k8s_cluster_id,
                )
                k8s_instance_info = {
                    "kdu-instance": None,
                    "k8scluster-uuid": cluster_uuid,
                    "k8scluster-type": k8sclustertype,
                    "member-vnf-index": vnfr_data["member-vnf-index-ref"],
                    "kdu-name": kdur["kdu-name"],
                    "kdu-model": kdumodel,
                    "namespace": namespace,
                    "kdu-deployment-name": kdu_deployment_name,
                }
                db_path = "_admin.deployed.K8s.{}".format(index)
                db_nsr_update[db_path] = k8s_instance_info
                self.update_db_2("nsrs", nsr_id, db_nsr_update)
                vnfd_with_id = find_in_list(
                    db_vnfds, lambda vnf: vnf["_id"] == vnfd_id
                )
                # Install runs concurrently; completion is tracked through
                # task_instantiation_info by the caller.
                task = asyncio.ensure_future(
                    self._install_kdu(
                        nsr_id,
                        db_path,
                        vnfr_data,
                        kdu_index,
                        kdud,
                        vnfd_with_id,
                        k8s_instance_info,
                        k8params=desc_params,
                        timeout=1800,
                        vca_id=vca_id,
                    )
                )
                self.lcm_tasks.register(
                    "ns",
                    nsr_id,
                    nslcmop_id,
                    "instantiate_KDU-{}".format(index),
                    task,
                )
                task_instantiation_info[task] = "Deploying KDU {}".format(
                    kdur["kdu-name"]
                )

                index += 1

    except (LcmException, asyncio.CancelledError):
        raise
    except Exception as e:
        msg = "Exception {} while {}: {}".format(type(e).__name__, step, e)
        if isinstance(e, (N2VCException, DbException)):
            self.logger.error(logging_text + msg)
        else:
            self.logger.critical(logging_text + msg, exc_info=True)
        raise LcmException(msg)
    finally:
        if db_nsr_update:
            self.update_db_2("nsrs", nsr_id, db_nsr_update)
def _deploy_n2vc(
    self,
    logging_text,
    db_nsr,
    db_vnfr,
    nslcmop_id,
    nsr_id,
    nsi_id,
    vnfd_id,
    vdu_id,
    kdu_name,
    member_vnf_index,
    vdu_index,
    kdu_index,
    vdu_name,
    deploy_params,
    descriptor_config,
    base_folder,
    task_instantiation_info,
    stage,
):
    """Schedule an instantiate_N2VC task for each execution environment.

    For every juju/helm execution-environment item in *descriptor_config*,
    reuse the matching entry of db_nsr._admin.deployed.VCA if one exists,
    otherwise create and persist a new one, then launch instantiate_N2VC
    as an asyncio task registered in lcm_tasks and
    *task_instantiation_info*. Items that are neither juju, charm nor helm
    are skipped.
    """
    # launch instantiate_N2VC in a asyncio task and register task object
    # Look where information of this charm is at database <nsrs>._admin.deployed.VCA
    # if not found, create one entry and update database
    # fill db_nsr._admin.deployed.VCA.<index>

    self.logger.debug(
        logging_text + "_deploy_n2vc vnfd_id={}, vdu_id={}".format(vnfd_id, vdu_id)
    )

    charm_name = ""
    get_charm_name = False
    # Normalize the descriptor into a list of execution-environment items.
    if "execution-environment-list" in descriptor_config:
        ee_list = descriptor_config.get("execution-environment-list", [])
    elif "juju" in descriptor_config:
        ee_list = [descriptor_config]  # ns charms
        if "execution-environment-list" not in descriptor_config:
            # charm name is only required for ns charms
            get_charm_name = True
    else:  # other types as script are not supported
        ee_list = []

    for ee_item in ee_list:
        self.logger.debug(
            logging_text
            + "_deploy_n2vc ee_item juju={}, helm={}".format(
                ee_item.get("juju"), ee_item.get("helm-chart")
            )
        )
        ee_descriptor_id = ee_item.get("id")
        vca_name, charm_name, vca_type = self.get_vca_info(
            ee_item, db_nsr, get_charm_name
        )
        if not vca_type:
            self.logger.debug(
                logging_text + "skipping, non juju/charm/helm configuration"
            )
            continue

        # Look for an already-deployed VCA entry matching this target;
        # the for/else below creates one when no match is found.
        vca_index = -1
        for vca_index, vca_deployed in enumerate(
            db_nsr["_admin"]["deployed"]["VCA"]
        ):
            if not vca_deployed:
                continue
            if (
                vca_deployed.get("member-vnf-index") == member_vnf_index
                and vca_deployed.get("vdu_id") == vdu_id
                and vca_deployed.get("kdu_name") == kdu_name
                and vca_deployed.get("vdu_count_index", 0) == vdu_index
                and vca_deployed.get("ee_descriptor_id") == ee_descriptor_id
            ):
                break
        else:
            # not found, create one.
            target = (
                "ns" if not member_vnf_index else "vnf/{}".format(member_vnf_index)
            )
            if vdu_id:
                target += "/vdu/{}/{}".format(vdu_id, vdu_index or 0)
            elif kdu_name:
                target += "/kdu/{}".format(kdu_name)
            vca_deployed = {
                "target_element": target,
                # ^ target_element will replace member-vnf-index, kdu_name, vdu_id ... in a single string
                "member-vnf-index": member_vnf_index,
                "vdu_id": vdu_id,
                "kdu_name": kdu_name,
                "vdu_count_index": vdu_index,
                "operational-status": "init",  # TODO revise
                "detailed-status": "",  # TODO revise
                "step": "initial-deploy",  # TODO revise
                "vnfd_id": vnfd_id,
                "vdu_name": vdu_name,
                "type": vca_type,
                "ee_descriptor_id": ee_descriptor_id,
                "charm_name": charm_name,
            }
            vca_index += 1

            # create VCA and configurationStatus in db
            db_dict = {
                "_admin.deployed.VCA.{}".format(vca_index): vca_deployed,
                "configurationStatus.{}".format(vca_index): dict(),
            }
            self.update_db_2("nsrs", nsr_id, db_dict)

            db_nsr["_admin"]["deployed"]["VCA"].append(vca_deployed)

        self.logger.debug("N2VC > NSR_ID > {}".format(nsr_id))
        self.logger.debug("N2VC > DB_NSR > {}".format(db_nsr))
        self.logger.debug("N2VC > VCA_DEPLOYED > {}".format(vca_deployed))

        # Launch task
        task_n2vc = asyncio.ensure_future(
            self.instantiate_N2VC(
                logging_text=logging_text,
                vca_index=vca_index,
                nsi_id=nsi_id,
                db_nsr=db_nsr,
                db_vnfr=db_vnfr,
                vdu_id=vdu_id,
                kdu_name=kdu_name,
                vdu_index=vdu_index,
                kdu_index=kdu_index,
                deploy_params=deploy_params,
                config_descriptor=descriptor_config,
                base_folder=base_folder,
                nslcmop_id=nslcmop_id,
                stage=stage,
                vca_type=vca_type,
                vca_name=vca_name,
                ee_config_descriptor=ee_item,
            )
        )
        self.lcm_tasks.register(
            "ns",
            nsr_id,
            nslcmop_id,
            "instantiate_N2VC-{}".format(vca_index),
            task_n2vc,
        )
        task_instantiation_info[
            task_n2vc
        ] = self.task_name_deploy_vca + " {}.{}".format(
            member_vnf_index or "", vdu_id or ""
        )
def _format_additional_params(self, params):
params = params or {}
for key, value in params.items():
if str(value).startswith("!!yaml "):
params[key] = yaml.safe_load(value[7:])
return params
def _get_terminate_primitive_params(self, seq, vnf_index):
    """Map the parameters of one terminate-config-primitive sequence.

    Builds the standard parameter dict (member_vnf_index, primitive name,
    empty primitive_params) and delegates the actual mapping to
    _map_primitive_params with no descriptor parameters.
    """
    params = {
        "member_vnf_index": vnf_index,
        "primitive": seq.get("name"),
        "primitive_params": {},
    }
    return self._map_primitive_params(seq, params, {})
# sub-operations
def _retry_or_skip_suboperation(self, db_nslcmop, op_index):
    """Decide whether an existing sub-operation must be skipped or retried.

    Returns SUBOPERATION_STATUS_SKIP when the sub-operation at *op_index*
    already COMPLETED (caller must not re-execute it); otherwise marks it
    as PROCESSING in the DB and returns *op_index* so the caller retries
    it with the stored arguments.
    """
    operation = deep_get(db_nslcmop, ("_admin", "operations"), [])[op_index]
    if operation.get("operationState") == "COMPLETED":
        # b. Already done: _ns_execute_primitive()/RO.create_action()
        # will NOT be executed again.
        return self.SUBOPERATION_STATUS_SKIP
    # c. Not completed: flag a retry by setting operationState=PROCESSING.
    self._update_suboperation_status(
        db_nslcmop, op_index, "PROCESSING", "In progress"
    )
    # scale() will re-run the sub-operation using its stored arguments.
    return op_index
# Find a sub-operation where all keys in a matching dictionary must match
# Returns the index of the matching sub-operation, or SUBOPERATION_STATUS_NOT_FOUND if no match
def _find_suboperation(self, db_nslcmop, match):
    """Return the index of the first sub-operation matching *match*.

    Every key/value pair in *match* must equal the corresponding field of
    the sub-operation. Returns SUBOPERATION_STATUS_NOT_FOUND when nothing
    matches or either argument is falsy.
    """
    if db_nslcmop and match:
        operations = db_nslcmop.get("_admin", {}).get("operations", [])
        for index, operation in enumerate(operations):
            if all(operation.get(key) == value for key, value in match.items()):
                return index
    return self.SUBOPERATION_STATUS_NOT_FOUND
# Update status for a sub-operation given its index
def _update_suboperation_status(
    self, db_nslcmop, op_index, operationState, detailed_status
):
    """Persist state and detail of one sub-operation (HA bookkeeping).

    Writes operationState and detailed-status of the sub-operation at
    *op_index* directly into the nslcmops record; does not fail when the
    record is missing.
    """
    field_prefix = "_admin.operations.{}".format(op_index)
    self.db.set_one(
        "nslcmops",
        q_filter={"_id": db_nslcmop["_id"]},
        update_dict={
            field_prefix + ".operationState": operationState,
            field_prefix + ".detailed-status": detailed_status,
        },
        fail_on_empty=False,
    )
# Add sub-operation, return the index of the added sub-operation
# Optionally, set operationState, detailed-status, and operationType
# Status and type are currently set for 'scale' sub-operations:
# 'operationState' : 'PROCESSING' | 'COMPLETED' | 'FAILED'
# 'detailed-status' : status message
# 'operationType': may be any type, in the case of scaling: 'PRE-SCALE' | 'POST-SCALE'
# Status and operation type are currently only used for 'scale', but NOT for 'terminate' sub-operations.
def _add_suboperation(
    self,
    db_nslcmop,
    vnf_index,
    vdu_id,
    vdu_count_index,
    vdu_name,
    primitive,
    mapped_primitive_params,
    operationState=None,
    detailed_status=None,
    operationType=None,
    RO_nsr_id=None,
    RO_scaling_info=None,
):
    """Append a sub-operation to db_nslcmop's _admin.operations list.

    Mandatory fields are always stored; the optional state/status/type and
    RO fields are stored only when truthy. The updated list is persisted
    to the nslcmops record.

    :return: index of the new sub-operation, or
        SUBOPERATION_STATUS_NOT_FOUND when *db_nslcmop* is falsy
    """
    if not db_nslcmop:
        return self.SUBOPERATION_STATUS_NOT_FOUND

    new_op = {
        "member_vnf_index": vnf_index,
        "vdu_id": vdu_id,
        "vdu_count_index": vdu_count_index,
        "primitive": primitive,
        "primitive_params": mapped_primitive_params,
    }
    # Store only the optional fields that were actually provided.
    for field_name, field_value in (
        ("operationState", operationState),
        ("detailed-status", detailed_status),
        ("lcmOperationType", operationType),
        ("RO_nsr_id", RO_nsr_id),
        ("RO_scaling_info", RO_scaling_info),
    ):
        if field_value:
            new_op[field_name] = field_value

    # Create the "_admin.operations" list on first use, else append.
    db_nslcmop_admin = db_nslcmop.get("_admin", {})
    op_list = db_nslcmop_admin.get("operations")
    if op_list:
        op_list.append(new_op)
    else:
        db_nslcmop_admin["operations"] = [new_op]
        op_list = db_nslcmop_admin.get("operations")

    self.update_db_2("nslcmops", db_nslcmop["_id"], {"_admin.operations": op_list})
    return len(op_list) - 1
# Helper methods for scale() sub-operations
# pre-scale/post-scale:
# Check for 3 different cases:
# a. New: First time execution, return SUBOPERATION_STATUS_NEW
# b. Skip: Existing sub-operation exists, operationState == 'COMPLETED', return SUBOPERATION_STATUS_SKIP
# c. retry: Existing sub-operation exists, operationState != 'COMPLETED', return op_index to re-execute
def _check_or_add_scale_suboperation(
    self,
    db_nslcmop,
    vnf_index,
    vnf_config_primitive,
    primitive_params,
    operationType,
    RO_nsr_id=None,
    RO_scaling_info=None,
):
    """Look up a scale sub-operation, creating it when it does not exist.

    Three outcomes:
    a. New: no matching sub-operation — add one (state PROCESSING) and
       return SUBOPERATION_STATUS_NEW so the caller executes it normally.
    b. Skip: existing sub-operation already COMPLETED — returns
       SUBOPERATION_STATUS_SKIP.
    c. Retry: existing sub-operation not COMPLETED — returns its index.
    When both RO_nsr_id and RO_scaling_info are given, the sub-operation
    is an RO scaling one (operationType forced to "SCALE-RO").
    """
    is_ro_scaling = bool(RO_nsr_id and RO_scaling_info)
    if is_ro_scaling:
        operationType = "SCALE-RO"
        match = {
            "member_vnf_index": vnf_index,
            "RO_nsr_id": RO_nsr_id,
            "RO_scaling_info": RO_scaling_info,
        }
    else:
        match = {
            "member_vnf_index": vnf_index,
            "primitive": vnf_config_primitive,
            "primitive_params": primitive_params,
            "lcmOperationType": operationType,
        }
    op_index = self._find_suboperation(db_nslcmop, match)
    if op_index != self.SUBOPERATION_STATUS_NOT_FOUND:
        # Existing sub-operation: skip when COMPLETED, else retry.
        return self._retry_or_skip_suboperation(db_nslcmop, op_index)

    # New sub-operation. Null out the fields that do not apply to this
    # kind of scaling so only the relevant ones are stored.
    if is_ro_scaling:
        vnf_config_primitive = None
        primitive_params = None
    else:
        RO_nsr_id = None
        RO_scaling_info = None
    # vdu-related fields are always None for scaling sub-operations.
    self._add_suboperation(
        db_nslcmop,
        vnf_index,
        None,  # vdu_id
        None,  # vdu_count_index
        None,  # vdu_name
        vnf_config_primitive,
        primitive_params,
        "PROCESSING",
        "In progress",
        operationType,
        RO_nsr_id,
        RO_scaling_info,
    )
    return self.SUBOPERATION_STATUS_NEW
# Function to return execution_environment id
async def destroy_N2VC(
    self,
    logging_text,
    db_nslcmop,
    vca_deployed,
    config_descriptor,
    vca_index,
    destroy_ee=True,
    exec_primitives=True,
    scaling_in=False,
    vca_id: str = None,
):
    """
    Execute the terminate primitives and destroy the execution environment (if destroy_ee=True)
    :param logging_text:
    :param db_nslcmop:
    :param vca_deployed: Dictionary of deployment info at db_nsr._admin.deployed.VCA.<INDEX>
    :param config_descriptor: Configuration descriptor of the NSD, VNFD, VNFD.vdu or VNFD.kdu
    :param vca_index: index in the database _admin.deployed.VCA
    :param destroy_ee: False to do not destroy, because it will be destroyed all of then at once
    :param exec_primitives: False to do not execute terminate primitives, because the config is not completed or has
        not executed properly
    :param scaling_in: True destroys the application, False destroys the model
    :param vca_id: VCA id, default None
    :return: None or exception
    """
    self.logger.debug(
        logging_text
        + " vca_index: {}, vca_deployed: {}, config_descriptor: {}, destroy_ee: {}".format(
            vca_index, vca_deployed, config_descriptor, destroy_ee
        )
    )

    vca_type = vca_deployed.get("type", "lxc_proxy_charm")

    # execute terminate_primitives
    if exec_primitives:
        terminate_primitives = get_ee_sorted_terminate_config_primitive_list(
            config_descriptor.get("terminate-config-primitive"),
            vca_deployed.get("ee_descriptor_id"),
        )
        vdu_id = vca_deployed.get("vdu_id")
        vdu_count_index = vca_deployed.get("vdu_count_index")
        vdu_name = vca_deployed.get("vdu_name")
        vnf_index = vca_deployed.get("member-vnf-index")
        # Primitives run only when this VCA was flagged as needing them.
        if terminate_primitives and vca_deployed.get("needed_terminate"):
            for seq in terminate_primitives:
                # For each sequence in list, get primitive and call _ns_execute_primitive()
                step = "Calling terminate action for vnf_member_index={} primitive={}".format(
                    vnf_index, seq.get("name")
                )
                self.logger.debug(logging_text + step)
                # Create the primitive for each sequence, i.e. "primitive": "touch"
                primitive = seq.get("name")
                mapped_primitive_params = self._get_terminate_primitive_params(
                    seq, vnf_index
                )

                # Add sub-operation
                self._add_suboperation(
                    db_nslcmop,
                    vnf_index,
                    vdu_id,
                    vdu_count_index,
                    vdu_name,
                    primitive,
                    mapped_primitive_params,
                )
                # Sub-operations: Call _ns_execute_primitive() instead of action()
                try:
                    result, result_detail = await self._ns_execute_primitive(
                        vca_deployed["ee_id"],
                        primitive,
                        mapped_primitive_params,
                        vca_type=vca_type,
                        vca_id=vca_id,
                    )
                except LcmException:
                    # this happens when VCA is not deployed. In this case it is not needed to terminate
                    continue
                result_ok = ["COMPLETED", "PARTIALLY_COMPLETED"]
                if result not in result_ok:
                    raise LcmException(
                        "terminate_primitive {} for vnf_member_index={} fails with "
                        "error {}".format(seq.get("name"), vnf_index, result_detail)
                    )
            # set that this VCA do not need terminated
            db_update_entry = "_admin.deployed.VCA.{}.needed_terminate".format(
                vca_index
            )
            self.update_db_2(
                "nsrs", db_nslcmop["nsInstanceId"], {db_update_entry: False}
            )

    # Delete Prometheus Jobs if any
    # This uses NSR_ID, so it will destroy any jobs under this index
    self.db.del_list("prometheus_jobs", {"nsr_id": db_nslcmop["nsInstanceId"]})

    if destroy_ee:
        await self.vca_map[vca_type].delete_execution_environment(
            vca_deployed["ee_id"],
            scaling_in=scaling_in,
            vca_type=vca_type,
            vca_id=vca_id,
        )
async def _delete_all_N2VC(self, db_nsr: dict, vca_id: str = None):
    """Delete the N2VC namespace of this NS at once.

    Marks every configurationStatus as TERMINATING, asks n2vc to delete
    the "."-prefixed namespace derived from the nsr id (tolerating an
    already-deleted namespace), then marks all statuses as DELETED.
    """
    self._write_all_config_status(db_nsr=db_nsr, status="TERMINATING")
    try:
        await self.n2vc.delete_namespace(
            namespace="." + db_nsr["_id"],
            total_timeout=self.timeout.charm_delete,
            vca_id=vca_id,
        )
    except N2VCNotFound:
        # already deleted: nothing left to remove
        pass
    self._write_all_config_status(db_nsr=db_nsr, status="DELETED")
async def terminate(self, nsr_id, nslcmop_id):
    """Terminate a network service.

    Three stages: (1) prepare the task and mark the NS as TERMINATING,
    (2) run per-VCA terminate primitives, (3) delete all execution
    environments, KDUs and the RO/VIM deployment. Progress, errors and
    the final NS/operation state are persisted to the nsrs and nslcmops
    records in the finally block, and a "terminated" event is published
    on the kafka bus.

    :param nsr_id: NS record id
    :param nslcmop_id: operation record id
    """
    # Try to lock HA task here
    task_is_locked_by_me = self.lcm_tasks.lock_HA("ns", "nslcmops", nslcmop_id)
    if not task_is_locked_by_me:
        # another LCM instance owns this operation
        return

    logging_text = "Task ns={} terminate={} ".format(nsr_id, nslcmop_id)
    self.logger.debug(logging_text + "Enter")
    timeout_ns_terminate = self.timeout.ns_terminate
    db_nsr = None
    db_nslcmop = None
    operation_params = None
    exc = None
    error_list = []  # annotates all failed error messages
    db_nslcmop_update = {}
    autoremove = False  # autoremove after terminated
    tasks_dict_info = {}
    db_nsr_update = {}
    stage = [
        "Stage 1/3: Preparing task.",
        "Waiting for previous operations to terminate.",
        "",
    ]
    # ^ contains [stage, step, VIM-status]
    try:
        # wait for any previous tasks in process
        await self.lcm_tasks.waitfor_related_HA("ns", "nslcmops", nslcmop_id)

        stage[1] = "Getting nslcmop={} from db.".format(nslcmop_id)
        db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})
        operation_params = db_nslcmop.get("operationParams") or {}
        if operation_params.get("timeout_ns_terminate"):
            timeout_ns_terminate = operation_params["timeout_ns_terminate"]
        stage[1] = "Getting nsr={} from db.".format(nsr_id)
        db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})

        db_nsr_update["operational-status"] = "terminating"
        db_nsr_update["config-status"] = "terminating"
        self._write_ns_status(
            nsr_id=nsr_id,
            ns_state="TERMINATING",
            current_operation="TERMINATING",
            current_operation_id=nslcmop_id,
            other_update=db_nsr_update,
        )
        self._write_op_status(op_id=nslcmop_id, queuePosition=0, stage=stage)
        # deepcopy: the deployed info is inspected below while the DB copy
        # may keep being updated by the terminate tasks
        nsr_deployed = deepcopy(db_nsr["_admin"].get("deployed")) or {}
        if db_nsr["_admin"]["nsState"] == "NOT_INSTANTIATED":
            # nothing was ever deployed; finally block still runs
            return

        stage[1] = "Getting vnf descriptors from db."
        db_vnfrs_list = self.db.get_list("vnfrs", {"nsr-id-ref": nsr_id})
        db_vnfrs_dict = {
            db_vnfr["member-vnf-index-ref"]: db_vnfr for db_vnfr in db_vnfrs_list
        }
        db_vnfds_from_id = {}
        db_vnfds_from_member_index = {}
        # Loop over VNFRs
        for vnfr in db_vnfrs_list:
            vnfd_id = vnfr["vnfd-id"]
            if vnfd_id not in db_vnfds_from_id:
                vnfd = self.db.get_one("vnfds", {"_id": vnfd_id})
                db_vnfds_from_id[vnfd_id] = vnfd
            db_vnfds_from_member_index[
                vnfr["member-vnf-index-ref"]
            ] = db_vnfds_from_id[vnfd_id]

        # Destroy individual execution environments when there are terminating primitives.
        # Rest of EE will be deleted at once
        # TODO - check before calling _destroy_N2VC
        # if not operation_params.get("skip_terminate_primitives"):#
        # or not vca.get("needed_terminate"):
        stage[0] = "Stage 2/3 execute terminating primitives."
        self.logger.debug(logging_text + stage[0])
        stage[1] = "Looking execution environment that needs terminate."
        self.logger.debug(logging_text + stage[1])

        for vca_index, vca in enumerate(get_iterable(nsr_deployed, "VCA")):
            config_descriptor = None
            vca_member_vnf_index = vca.get("member-vnf-index")
            vca_id = self.get_vca_id(
                db_vnfrs_dict.get(vca_member_vnf_index)
                if vca_member_vnf_index
                else None,
                db_nsr,
            )
            if not vca or not vca.get("ee_id"):
                continue
            # Pick the config descriptor at the right level: ns, vdu, kdu
            # or vnf.
            if not vca.get("member-vnf-index"):
                # ns
                config_descriptor = db_nsr.get("ns-configuration")
            elif vca.get("vdu_id"):
                db_vnfd = db_vnfds_from_member_index[vca["member-vnf-index"]]
                config_descriptor = get_configuration(db_vnfd, vca.get("vdu_id"))
            elif vca.get("kdu_name"):
                db_vnfd = db_vnfds_from_member_index[vca["member-vnf-index"]]
                config_descriptor = get_configuration(db_vnfd, vca.get("kdu_name"))
            else:
                db_vnfd = db_vnfds_from_member_index[vca["member-vnf-index"]]
                config_descriptor = get_configuration(db_vnfd, db_vnfd["id"])
            vca_type = vca.get("type")
            exec_terminate_primitives = not operation_params.get(
                "skip_terminate_primitives"
            ) and vca.get("needed_terminate")
            # For helm we must destroy_ee. Also for native_charm, as juju_model cannot be deleted if there are
            # pending native charms
            destroy_ee = True if vca_type in ("helm-v3", "native_charm") else False
            # self.logger.debug(logging_text + "vca_index: {}, ee_id: {}, vca_type: {} destroy_ee: {}".format(
            #     vca_index, vca.get("ee_id"), vca_type, destroy_ee))
            task = asyncio.ensure_future(
                self.destroy_N2VC(
                    logging_text,
                    db_nslcmop,
                    vca,
                    config_descriptor,
                    vca_index,
                    destroy_ee,
                    exec_terminate_primitives,
                    vca_id=vca_id,
                )
            )
            tasks_dict_info[task] = "Terminating VCA {}".format(vca.get("ee_id"))

        # wait for pending tasks of terminate primitives
        if tasks_dict_info:
            self.logger.debug(
                logging_text
                + "Waiting for tasks {}".format(list(tasks_dict_info.keys()))
            )
            error_list = await self._wait_for_tasks(
                logging_text,
                tasks_dict_info,
                min(self.timeout.charm_delete, timeout_ns_terminate),
                stage,
                nslcmop_id,
            )
            tasks_dict_info.clear()
            if error_list:
                # stop here; the finally block will report the failure
                return  # raise LcmException("; ".join(error_list))

        # remove All execution environments at once
        stage[0] = "Stage 3/3 delete all."

        if nsr_deployed.get("VCA"):
            stage[1] = "Deleting all execution environments."
            self.logger.debug(logging_text + stage[1])
            helm_vca_list = get_deployed_vca(db_nsr, {"type": "helm-v3"})
            if helm_vca_list:
                # Delete Namespace and Certificates
                await self.vca_map["helm-v3"].delete_tls_certificate(
                    namespace=db_nslcmop["nsInstanceId"],
                    certificate_name=self.EE_TLS_NAME,
                )
                await self.vca_map["helm-v3"].delete_namespace(
                    namespace=db_nslcmop["nsInstanceId"],
                )
            else:
                vca_id = self.get_vca_id({}, db_nsr)
                task_delete_ee = asyncio.ensure_future(
                    asyncio.wait_for(
                        self._delete_all_N2VC(db_nsr=db_nsr, vca_id=vca_id),
                        timeout=self.timeout.charm_delete,
                    )
                )
                tasks_dict_info[task_delete_ee] = "Terminating all VCA"

        # Delete from k8scluster
        stage[1] = "Deleting KDUs."
        self.logger.debug(logging_text + stage[1])
        # print(nsr_deployed)
        for kdu in get_iterable(nsr_deployed, "K8s"):
            if not kdu or not kdu.get("kdu-instance"):
                continue
            kdu_instance = kdu.get("kdu-instance")
            if kdu.get("k8scluster-type") in self.k8scluster_map:
                # TODO: Uninstall kdu instances taking into account they could be deployed in different VIMs
                vca_id = self.get_vca_id({}, db_nsr)
                task_delete_kdu_instance = asyncio.ensure_future(
                    self.k8scluster_map[kdu["k8scluster-type"]].uninstall(
                        cluster_uuid=kdu.get("k8scluster-uuid"),
                        kdu_instance=kdu_instance,
                        vca_id=vca_id,
                        namespace=kdu.get("namespace"),
                    )
                )
            else:
                self.logger.error(
                    logging_text
                    + "Unknown k8s deployment type {}".format(
                        kdu.get("k8scluster-type")
                    )
                )
                continue
            tasks_dict_info[
                task_delete_kdu_instance
            ] = "Terminating KDU '{}'".format(kdu.get("kdu-name"))

        # remove from RO
        stage[1] = "Deleting ns from VIM."
        if self.ro_config.ng:
            task_delete_ro = asyncio.ensure_future(
                self._terminate_ng_ro(
                    logging_text, nsr_deployed, nsr_id, nslcmop_id, stage
                )
            )
            tasks_dict_info[task_delete_ro] = "Removing deployment from VIM"

        # rest of staff will be done at finally

    except (
        ROclient.ROClientException,
        DbException,
        LcmException,
        N2VCException,
    ) as e:
        self.logger.error(logging_text + "Exit Exception {}".format(e))
        exc = e
    except asyncio.CancelledError:
        self.logger.error(
            logging_text + "Cancelled Exception while '{}'".format(stage[1])
        )
        exc = "Operation was cancelled"
    except Exception as e:
        exc = traceback.format_exc()
        self.logger.critical(
            logging_text + "Exit Exception while '{}': {}".format(stage[1], e),
            exc_info=True,
        )
    finally:
        if exc:
            error_list.append(str(exc))
        try:
            # wait for pending tasks
            if tasks_dict_info:
                stage[1] = "Waiting for terminate pending tasks."
                self.logger.debug(logging_text + stage[1])
                error_list += await self._wait_for_tasks(
                    logging_text,
                    tasks_dict_info,
                    timeout_ns_terminate,
                    stage,
                    nslcmop_id,
                )
            stage[1] = stage[2] = ""
        except asyncio.CancelledError:
            error_list.append("Cancelled")
            # cancel children and wait for them to finish cancelling
            await self._cancel_pending_tasks(logging_text, tasks_dict_info)
            await self._wait_for_tasks(
                logging_text,
                tasks_dict_info,
                timeout_ns_terminate,
                stage,
                nslcmop_id,
            )
        except Exception as exc:
            error_list.append(str(exc))

        # update status at database
        if error_list:
            error_detail = "; ".join(error_list)
            # self.logger.error(logging_text + error_detail)
            error_description_nslcmop = "{} Detail: {}".format(
                stage[0], error_detail
            )
            error_description_nsr = "Operation: TERMINATING.{}, {}.".format(
                nslcmop_id, stage[0]
            )

            db_nsr_update["operational-status"] = "failed"
            db_nsr_update["detailed-status"] = (
                error_description_nsr + " Detail: " + error_detail
            )
            db_nslcmop_update["detailed-status"] = error_detail
            nslcmop_operation_state = "FAILED"
            ns_state = "BROKEN"
        else:
            error_detail = None
            error_description_nsr = error_description_nslcmop = None
            ns_state = "NOT_INSTANTIATED"
            db_nsr_update["operational-status"] = "terminated"
            db_nsr_update["detailed-status"] = "Done"
            db_nsr_update["_admin.nsState"] = "NOT_INSTANTIATED"
            db_nslcmop_update["detailed-status"] = "Done"
            nslcmop_operation_state = "COMPLETED"

        if db_nsr:
            self._write_ns_status(
                nsr_id=nsr_id,
                ns_state=ns_state,
                current_operation="IDLE",
                current_operation_id=None,
                error_description=error_description_nsr,
                error_detail=error_detail,
                other_update=db_nsr_update,
            )
        self._write_op_status(
            op_id=nslcmop_id,
            stage="",
            error_message=error_description_nslcmop,
            operation_state=nslcmop_operation_state,
            other_update=db_nslcmop_update,
        )
        if nslcmop_operation_state == "COMPLETED":
            self.db.del_list("prometheus_jobs", {"nsr_id": nsr_id})
        if ns_state == "NOT_INSTANTIATED":
            try:
                self.db.set_list(
                    "vnfrs",
                    {"nsr-id-ref": nsr_id},
                    {"_admin.nsState": "NOT_INSTANTIATED"},
                )
            except DbException as e:
                self.logger.warn(
                    logging_text
                    + "Error writing VNFR status for nsr-id-ref: {} -> {}".format(
                        nsr_id, e
                    )
                )
        if operation_params:
            autoremove = operation_params.get("autoremove", False)
        if nslcmop_operation_state:
            try:
                # notify subscribers (e.g. NBI) that the NS terminated
                await self.msg.aiowrite(
                    "ns",
                    "terminated",
                    {
                        "nsr_id": nsr_id,
                        "nslcmop_id": nslcmop_id,
                        "operationState": nslcmop_operation_state,
                        "autoremove": autoremove,
                    },
                )
            except Exception as e:
                self.logger.error(
                    logging_text + "kafka_write notification Exception {}".format(e)
                )
        self.logger.debug(f"Deleting alerts: ns_id={nsr_id}")
        self.db.del_list("alerts", {"tags.ns_id": nsr_id})

        self.logger.debug(logging_text + "Exit")
        self.lcm_tasks.remove("ns", nsr_id, nslcmop_id, "ns_terminate")
    async def _wait_for_tasks(
        self, logging_text, created_tasks_info, timeout, stage, nslcmop_id, nsr_id=None
    ):
        """Wait for a group of asyncio tasks, reporting progress and collecting errors.

        :param logging_text: prefix for every log message
        :param created_tasks_info: dict mapping each asyncio task to a short
            human-readable description used in progress and error messages
        :param timeout: global timeout in seconds for the whole group of tasks
        :param stage: 3-item list; stage[1] is rewritten in place with
            "done/total" progress (plus errors) and persisted via _write_op_status
        :param nslcmop_id: operation id whose status record receives the stage
        :param nsr_id: if provided, the nsr record is also updated with
            errorDescription/errorDetail when any task fails
        :return: list of error detail strings (empty if every task succeeded)
        """
        time_start = time()
        error_detail_list = []
        error_list = []
        pending_tasks = list(created_tasks_info.keys())
        num_tasks = len(pending_tasks)
        num_done = 0
        stage[1] = "{}/{}.".format(num_done, num_tasks)
        self._write_op_status(nslcmop_id, stage)
        while pending_tasks:
            new_error = None
            # remaining share of the global timeout for this wait round
            _timeout = timeout + time_start - time()
            done, pending_tasks = await asyncio.wait(
                pending_tasks, timeout=_timeout, return_when=asyncio.FIRST_COMPLETED
            )
            num_done += len(done)
            if not done:  # Timeout
                # global timeout expired: flag every still-pending task and stop waiting
                for task in pending_tasks:
                    new_error = created_tasks_info[task] + ": Timeout"
                    error_detail_list.append(new_error)
                    error_list.append(new_error)
                break
            for task in done:
                if task.cancelled():
                    exc = "Cancelled"
                else:
                    exc = task.exception()
                if exc:
                    if isinstance(exc, asyncio.TimeoutError):
                        exc = "Timeout"
                    new_error = created_tasks_info[task] + ": {}".format(exc)
                    error_list.append(created_tasks_info[task])
                    error_detail_list.append(new_error)
                    # known/expected failure types are logged without traceback;
                    # anything else gets the fully formatted traceback
                    if isinstance(
                        exc,
                        (
                            str,
                            DbException,
                            N2VCException,
                            ROclient.ROClientException,
                            LcmException,
                            K8sException,
                            NgRoException,
                        ),
                    ):
                        self.logger.error(logging_text + new_error)
                    else:
                        exc_traceback = "".join(
                            traceback.format_exception(None, exc, exc.__traceback__)
                        )
                        self.logger.error(
                            logging_text
                            + created_tasks_info[task]
                            + " "
                            + exc_traceback
                        )
                else:
                    self.logger.debug(
                        logging_text + created_tasks_info[task] + ": Done"
                    )
            stage[1] = "{}/{}.".format(num_done, num_tasks)
            if new_error:
                stage[1] += " Errors: " + ". ".join(error_detail_list) + "."
                if nsr_id:  # update also nsr
                    self.update_db_2(
                        "nsrs",
                        nsr_id,
                        {
                            "errorDescription": "Error at: " + ", ".join(error_list),
                            "errorDetail": ". ".join(error_detail_list),
                        },
                    )
            self._write_op_status(nslcmop_id, stage)
        return error_detail_list
async def _cancel_pending_tasks(self, logging_text, created_tasks_info):
for task, name in created_tasks_info.items():
self.logger.debug(logging_text + "Cancelling task: " + name)
task.cancel()
@staticmethod
def _map_primitive_params(primitive_desc, params, instantiation_params):
"""
Generates the params to be provided to charm before executing primitive. If user does not provide a parameter,
The default-value is used. If it is between < > it look for a value at instantiation_params
:param primitive_desc: portion of VNFD/NSD that describes primitive
:param params: Params provided by user
:param instantiation_params: Instantiation params provided by user
:return: a dictionary with the calculated params
"""
calculated_params = {}
for parameter in primitive_desc.get("parameter", ()):
param_name = parameter["name"]
if param_name in params:
calculated_params[param_name] = params[param_name]
elif "default-value" in parameter or "value" in parameter:
if "value" in parameter:
calculated_params[param_name] = parameter["value"]
else:
calculated_params[param_name] = parameter["default-value"]
if (
isinstance(calculated_params[param_name], str)
and calculated_params[param_name].startswith("<")
and calculated_params[param_name].endswith(">")
):
if calculated_params[param_name][1:-1] in instantiation_params:
calculated_params[param_name] = instantiation_params[
calculated_params[param_name][1:-1]
]
else:
raise LcmException(
"Parameter {} needed to execute primitive {} not provided".format(
calculated_params[param_name], primitive_desc["name"]
)
)
else:
raise LcmException(
"Parameter {} needed to execute primitive {} not provided".format(
param_name, primitive_desc["name"]
)
)
if isinstance(calculated_params[param_name], (dict, list, tuple)):
calculated_params[param_name] = yaml.safe_dump(
calculated_params[param_name], default_flow_style=True, width=256
)
elif isinstance(calculated_params[param_name], str) and calculated_params[
param_name
].startswith("!!yaml "):
calculated_params[param_name] = calculated_params[param_name][7:]
if parameter.get("data-type") == "INTEGER":
try:
calculated_params[param_name] = int(calculated_params[param_name])
except ValueError: # error converting string to int
raise LcmException(
"Parameter {} of primitive {} must be integer".format(
param_name, primitive_desc["name"]
)
)
elif parameter.get("data-type") == "BOOLEAN":
calculated_params[param_name] = not (
(str(calculated_params[param_name])).lower() == "false"
)
# add always ns_config_info if primitive name is config
if primitive_desc["name"] == "config":
if "ns_config_info" in instantiation_params:
calculated_params["ns_config_info"] = instantiation_params[
"ns_config_info"
]
return calculated_params
def _look_for_deployed_vca(
self,
deployed_vca,
member_vnf_index,
vdu_id,
vdu_count_index,
kdu_name=None,
ee_descriptor_id=None,
):
# find vca_deployed record for this action. Raise LcmException if not found or there is not any id.
for vca in deployed_vca:
if not vca:
continue
if member_vnf_index != vca["member-vnf-index"] or vdu_id != vca["vdu_id"]:
continue
if (
vdu_count_index is not None
and vdu_count_index != vca["vdu_count_index"]
):
continue
if kdu_name and kdu_name != vca["kdu_name"]:
continue
if ee_descriptor_id and ee_descriptor_id != vca["ee_descriptor_id"]:
continue
break
else:
# vca_deployed not found
raise LcmException(
"charm for member_vnf_index={} vdu_id={}.{} kdu_name={} execution-environment-list.id={}"
" is not deployed".format(
member_vnf_index,
vdu_id,
vdu_count_index,
kdu_name,
ee_descriptor_id,
)
)
# get ee_id
ee_id = vca.get("ee_id")
vca_type = vca.get(
"type", "lxc_proxy_charm"
) # default value for backward compatibility - proxy charm
if not ee_id:
raise LcmException(
"charm for member_vnf_index={} vdu_id={} kdu_name={} vdu_count_index={} has not "
"execution environment".format(
member_vnf_index, vdu_id, kdu_name, vdu_count_index
)
)
return ee_id, vca_type
    async def _ns_execute_primitive(
        self,
        ee_id,
        primitive,
        primitive_params,
        retries=0,
        retries_interval=30,
        timeout=None,
        vca_type=None,
        db_dict=None,
        vca_id: str = None,
    ) -> (str, str):
        """Execute a primitive on an execution environment, optionally retrying.

        :param ee_id: execution environment id where the primitive runs
        :param primitive: primitive name; "config" wraps the params in
            {"params": ...} before sending
        :param primitive_params: parameters for the primitive
        :param retries: number of additional attempts after a failure
        :param retries_interval: seconds slept between retries
        :param timeout: per-attempt timeout; defaults to self.timeout.primitive
        :param vca_type: connector key in self.vca_map; defaults to
            "lxc_proxy_charm" for backward compatibility
        :param db_dict: db location passed through to the connector
        :param vca_id: VCA id passed through to the connector
        :return: tuple (operation_state, detailed_status)
        """
        try:
            if primitive == "config":
                primitive_params = {"params": primitive_params}
            vca_type = vca_type or "lxc_proxy_charm"
            while retries >= 0:
                try:
                    output = await asyncio.wait_for(
                        self.vca_map[vca_type].exec_primitive(
                            ee_id=ee_id,
                            primitive_name=primitive,
                            params_dict=primitive_params,
                            progress_timeout=self.timeout.progress_primitive,
                            total_timeout=self.timeout.primitive,
                            db_dict=db_dict,
                            vca_id=vca_id,
                            vca_type=vca_type,
                        ),
                        timeout=timeout or self.timeout.primitive,
                    )
                    # execution was OK
                    break
                except asyncio.CancelledError:
                    raise
                except Exception as e:
                    retries -= 1
                    if retries >= 0:
                        self.logger.debug(
                            "Error executing action {} on {} -> {}".format(
                                primitive, ee_id, e
                            )
                        )
                        # wait and retry
                        await asyncio.sleep(retries_interval)
                    else:
                        # retries exhausted: map a timeout to an N2VC error so the
                        # returned detail carries a readable message
                        if isinstance(e, asyncio.TimeoutError):
                            e = N2VCException(
                                message="Timed out waiting for action to complete"
                            )
                        return "FAILED", getattr(e, "message", repr(e))
            return "COMPLETED", output
        except (LcmException, asyncio.CancelledError):
            raise
        except Exception as e:
            # NOTE(review): this branch returns "FAIL" while the retry branch
            # returns "FAILED" — looks inconsistent; confirm against callers
            # before unifying.
            return "FAIL", "Error executing action {}: {}".format(primitive, e)
async def vca_status_refresh(self, nsr_id, nslcmop_id):
"""
Updating the vca_status with latest juju information in nsrs record
:param: nsr_id: Id of the nsr
:param: nslcmop_id: Id of the nslcmop
:return: None
"""
self.logger.debug("Task ns={} action={} Enter".format(nsr_id, nslcmop_id))
db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
vca_id = self.get_vca_id({}, db_nsr)
if db_nsr["_admin"]["deployed"]["K8s"]:
for _, k8s in enumerate(db_nsr["_admin"]["deployed"]["K8s"]):
cluster_uuid, kdu_instance, cluster_type = (
k8s["k8scluster-uuid"],
k8s["kdu-instance"],
k8s["k8scluster-type"],
)
await self._on_update_k8s_db(
cluster_uuid=cluster_uuid,
kdu_instance=kdu_instance,
filter={"_id": nsr_id},
vca_id=vca_id,
cluster_type=cluster_type,
)
if db_nsr["_admin"]["deployed"]["VCA"]:
for vca_index, _ in enumerate(db_nsr["_admin"]["deployed"]["VCA"]):
table, filter = "nsrs", {"_id": nsr_id}
path = "_admin.deployed.VCA.{}.".format(vca_index)
await self._on_update_n2vc_db(table, filter, path, {})
self.logger.debug("Task ns={} action={} Exit".format(nsr_id, nslcmop_id))
self.lcm_tasks.remove("ns", nsr_id, nslcmop_id, "ns_vca_status_refresh")
    async def action(self, nsr_id, nslcmop_id):
        """Run a primitive (action) on an NS, VNF, VDU or KDU.

        Resolves the primitive from the descriptor configuration, executes it
        either through the K8s connector (KDU actions/upgrade/rollback/status)
        or through the VCA execution environment, and persists the resulting
        operation state in the nslcmop/nsr records.

        :param nsr_id: id of the nsr where the action runs
        :param nslcmop_id: id of the nslcmop describing the action
        :return: tuple (nslcmop_operation_state, detailed_status)
        """
        # Try to lock HA task here
        task_is_locked_by_me = self.lcm_tasks.lock_HA("ns", "nslcmops", nslcmop_id)
        if not task_is_locked_by_me:
            return
        logging_text = "Task ns={} action={} ".format(nsr_id, nslcmop_id)
        self.logger.debug(logging_text + "Enter")
        # get all needed from database
        db_nsr = None
        db_nslcmop = None
        db_nsr_update = {}
        db_nslcmop_update = {}
        nslcmop_operation_state = None
        error_description_nslcmop = None
        exc = None
        step = ""
        try:
            # wait for any previous tasks in process
            step = "Waiting for previous operations to terminate"
            await self.lcm_tasks.waitfor_related_HA("ns", "nslcmops", nslcmop_id)
            self._write_ns_status(
                nsr_id=nsr_id,
                ns_state=None,
                current_operation="RUNNING ACTION",
                current_operation_id=nslcmop_id,
            )
            step = "Getting information from database"
            db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})
            db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
            # primitive_params may arrive JSON-encoded; decode in place
            if db_nslcmop["operationParams"].get("primitive_params"):
                db_nslcmop["operationParams"]["primitive_params"] = json.loads(
                    db_nslcmop["operationParams"]["primitive_params"]
                )
            nsr_deployed = db_nsr["_admin"].get("deployed")
            vnf_index = db_nslcmop["operationParams"].get("member_vnf_index")
            vdu_id = db_nslcmop["operationParams"].get("vdu_id")
            kdu_name = db_nslcmop["operationParams"].get("kdu_name")
            vdu_count_index = db_nslcmop["operationParams"].get("vdu_count_index")
            primitive = db_nslcmop["operationParams"]["primitive"]
            primitive_params = db_nslcmop["operationParams"]["primitive_params"]
            timeout_ns_action = db_nslcmop["operationParams"].get(
                "timeout_ns_action", self.timeout.primitive
            )
            if vnf_index:
                step = "Getting vnfr from database"
                db_vnfr = self.db.get_one(
                    "vnfrs", {"member-vnf-index-ref": vnf_index, "nsr-id-ref": nsr_id}
                )
                # kdur additionalParams are stored JSON-encoded; decode them
                if db_vnfr.get("kdur"):
                    kdur_list = []
                    for kdur in db_vnfr["kdur"]:
                        if kdur.get("additionalParams"):
                            kdur["additionalParams"] = json.loads(
                                kdur["additionalParams"]
                            )
                        kdur_list.append(kdur)
                    db_vnfr["kdur"] = kdur_list
                step = "Getting vnfd from database"
                db_vnfd = self.db.get_one("vnfds", {"_id": db_vnfr["vnfd-id"]})
                # Sync filesystem before running a primitive
                self.fs.sync(db_vnfr["vnfd-id"])
            else:
                step = "Getting nsd from database"
                db_nsd = self.db.get_one("nsds", {"_id": db_nsr["nsd-id"]})
            # NOTE(review): db_vnfr is only assigned in the vnf_index branch
            # above; for an NS-level action this reference looks unbound —
            # confirm the intended behavior.
            vca_id = self.get_vca_id(db_vnfr, db_nsr)
            # for backward compatibility
            if nsr_deployed and isinstance(nsr_deployed.get("VCA"), dict):
                nsr_deployed["VCA"] = list(nsr_deployed["VCA"].values())
                db_nsr_update["_admin.deployed.VCA"] = nsr_deployed["VCA"]
                self.update_db_2("nsrs", nsr_id, db_nsr_update)
            # look for primitive
            config_primitive_desc = descriptor_configuration = None
            if vdu_id:
                descriptor_configuration = get_configuration(db_vnfd, vdu_id)
            elif kdu_name:
                descriptor_configuration = get_configuration(db_vnfd, kdu_name)
            elif vnf_index:
                descriptor_configuration = get_configuration(db_vnfd, db_vnfd["id"])
            else:
                descriptor_configuration = db_nsd.get("ns-configuration")
            if descriptor_configuration and descriptor_configuration.get(
                "config-primitive"
            ):
                for config_primitive in descriptor_configuration["config-primitive"]:
                    if config_primitive["name"] == primitive:
                        config_primitive_desc = config_primitive
                        break
            if not config_primitive_desc:
                # KDU built-in operations don't need a descriptor entry
                if not (kdu_name and primitive in ("upgrade", "rollback", "status")):
                    raise LcmException(
                        "Primitive {} not found at [ns|vnf|vdu]-configuration:config-primitive ".format(
                            primitive
                        )
                    )
                primitive_name = primitive
                ee_descriptor_id = None
            else:
                primitive_name = config_primitive_desc.get(
                    "execution-environment-primitive", primitive
                )
                ee_descriptor_id = config_primitive_desc.get(
                    "execution-environment-ref"
                )
            # gather the additional params at the proper level (vdu/kdu/vnf/ns)
            if vnf_index:
                if vdu_id:
                    vdur = next(
                        (x for x in db_vnfr["vdur"] if x["vdu-id-ref"] == vdu_id), None
                    )
                    desc_params = parse_yaml_strings(vdur.get("additionalParams"))
                elif kdu_name:
                    kdur = next(
                        (x for x in db_vnfr["kdur"] if x["kdu-name"] == kdu_name), None
                    )
                    desc_params = parse_yaml_strings(kdur.get("additionalParams"))
                else:
                    desc_params = parse_yaml_strings(
                        db_vnfr.get("additionalParamsForVnf")
                    )
            else:
                desc_params = parse_yaml_strings(db_nsr.get("additionalParamsForNs"))
            if kdu_name and get_configuration(db_vnfd, kdu_name):
                kdu_configuration = get_configuration(db_vnfd, kdu_name)
                actions = set()
                # NOTE(review): these loops rebind the outer variable
                # `primitive` (the primitive name) to descriptor dicts; later
                # code only uses primitive_name, but the shadowing is fragile.
                for primitive in kdu_configuration.get("initial-config-primitive", []):
                    actions.add(primitive["name"])
                for primitive in kdu_configuration.get("config-primitive", []):
                    actions.add(primitive["name"])
                kdu = find_in_list(
                    nsr_deployed["K8s"],
                    lambda kdu: kdu_name == kdu["kdu-name"]
                    and kdu["member-vnf-index"] == vnf_index,
                )
                kdu_action = (
                    True
                    if primitive_name in actions
                    and kdu["k8scluster-type"] != "helm-chart-v3"
                    else False
                )
            # TODO check if ns is in a proper status
            if kdu_name and (
                primitive_name in ("upgrade", "rollback", "status") or kdu_action
            ):
                # TODO Check if we will need something at vnf level
                for index, kdu in enumerate(get_iterable(nsr_deployed, "K8s")):
                    if (
                        kdu_name == kdu["kdu-name"]
                        and kdu["member-vnf-index"] == vnf_index
                    ):
                        break
                else:
                    raise LcmException(
                        "KDU '{}' for vnf '{}' not deployed".format(kdu_name, vnf_index)
                    )
                if kdu.get("k8scluster-type") not in self.k8scluster_map:
                    msg = "unknown k8scluster-type '{}'".format(
                        kdu.get("k8scluster-type")
                    )
                    raise LcmException(msg)
                db_dict = {
                    "collection": "nsrs",
                    "filter": {"_id": nsr_id},
                    "path": "_admin.deployed.K8s.{}".format(index),
                }
                self.logger.debug(
                    logging_text
                    + "Exec k8s {} on {}.{}".format(primitive_name, vnf_index, kdu_name)
                )
                step = "Executing kdu {}".format(primitive_name)
                if primitive_name == "upgrade" and primitive_params:
                    if primitive_params.get("kdu_model"):
                        kdu_model = primitive_params.pop("kdu_model")
                    else:
                        kdu_model = kdu.get("kdu-model")
                        if kdu_model.count("/") < 2:  # helm chart is not embedded
                            parts = kdu_model.split(sep=":")
                            if len(parts) == 2:
                                kdu_model = parts[0]
                    if primitive_params.get("kdu_atomic_upgrade"):
                        atomic_upgrade = primitive_params.get(
                            "kdu_atomic_upgrade"
                        ).lower() in ("yes", "true", "1")
                        del primitive_params["kdu_atomic_upgrade"]
                    else:
                        atomic_upgrade = True
                    # Type of upgrade: reset, reuse, reset_then_reuse
                    reset_values = False
                    reuse_values = False
                    reset_then_reuse_values = False
                    # If no option is specified, default behaviour is reuse_values
                    # Otherwise, options will be parsed and used
                    if (
                        ("kdu_reset_values" not in primitive_params)
                        and ("kdu_reuse_values" not in primitive_params)
                        and ("kdu_reset_then_reuse_values" not in primitive_params)
                    ):
                        reuse_values = True
                    else:
                        if primitive_params.get("kdu_reset_values"):
                            reset_values = primitive_params.pop(
                                "kdu_reset_values"
                            ).lower() in ("yes", "true", "1")
                        if primitive_params.get("kdu_reuse_values"):
                            reuse_values = primitive_params.pop(
                                "kdu_reuse_values"
                            ).lower() in ("yes", "true", "1")
                        if primitive_params.get("kdu_reset_then_reuse_values"):
                            reset_then_reuse_values = primitive_params.get(
                                "kdu_reset_then_reuse_values"
                            ).lower() in ("yes", "true", "1")
                    # Two true options are not possible
                    if (
                        sum([reset_values, reuse_values, reset_then_reuse_values])
                        >= 2
                    ):
                        raise LcmException(
                            "Cannot upgrade the KDU simultaneously with two true options to handle values"
                        )
                    # kdur and desc_params already set from before
                    if reset_values:
                        desc_params = primitive_params
                    else:
                        desc_params.update(primitive_params)
                    detailed_status = await asyncio.wait_for(
                        self.k8scluster_map[kdu["k8scluster-type"]].upgrade(
                            cluster_uuid=kdu.get("k8scluster-uuid"),
                            kdu_instance=kdu.get("kdu-instance"),
                            atomic=atomic_upgrade,
                            reset_values=reset_values,
                            reuse_values=reuse_values,
                            reset_then_reuse_values=reset_then_reuse_values,
                            kdu_model=kdu_model,
                            params=desc_params,
                            db_dict=db_dict,
                            timeout=timeout_ns_action,
                        ),
                        # small margin over the connector's own timeout
                        timeout=timeout_ns_action + 10,
                    )
                    self.logger.debug(
                        logging_text + " Upgrade of kdu {} done".format(detailed_status)
                    )
                elif primitive_name == "rollback":
                    detailed_status = await asyncio.wait_for(
                        self.k8scluster_map[kdu["k8scluster-type"]].rollback(
                            cluster_uuid=kdu.get("k8scluster-uuid"),
                            kdu_instance=kdu.get("kdu-instance"),
                            db_dict=db_dict,
                        ),
                        timeout=timeout_ns_action,
                    )
                elif primitive_name == "status":
                    detailed_status = await asyncio.wait_for(
                        self.k8scluster_map[kdu["k8scluster-type"]].status_kdu(
                            cluster_uuid=kdu.get("k8scluster-uuid"),
                            kdu_instance=kdu.get("kdu-instance"),
                            vca_id=vca_id,
                        ),
                        timeout=timeout_ns_action,
                    )
                else:
                    # generic KDU primitive execution
                    kdu_instance = kdu.get("kdu-instance") or "{}-{}".format(
                        kdu["kdu-name"], nsr_id
                    )
                    params = self._map_primitive_params(
                        config_primitive_desc, primitive_params, desc_params
                    )
                    detailed_status = await asyncio.wait_for(
                        self.k8scluster_map[kdu["k8scluster-type"]].exec_primitive(
                            cluster_uuid=kdu.get("k8scluster-uuid"),
                            kdu_instance=kdu_instance,
                            primitive_name=primitive_name,
                            params=params,
                            db_dict=db_dict,
                            timeout=timeout_ns_action,
                            vca_id=vca_id,
                        ),
                        timeout=timeout_ns_action,
                    )
                if detailed_status:
                    nslcmop_operation_state = "COMPLETED"
                else:
                    detailed_status = ""
                    nslcmop_operation_state = "FAILED"
            else:
                # non-KDU primitive: run it through the VCA execution environment
                ee_id, vca_type = self._look_for_deployed_vca(
                    nsr_deployed["VCA"],
                    member_vnf_index=vnf_index,
                    vdu_id=vdu_id,
                    vdu_count_index=vdu_count_index,
                    ee_descriptor_id=ee_descriptor_id,
                )
                for vca_index, vca_deployed in enumerate(
                    db_nsr["_admin"]["deployed"]["VCA"]
                ):
                    if vca_deployed.get("member-vnf-index") == vnf_index:
                        db_dict = {
                            "collection": "nsrs",
                            "filter": {"_id": nsr_id},
                            "path": "_admin.deployed.VCA.{}.".format(vca_index),
                        }
                        break
                (
                    nslcmop_operation_state,
                    detailed_status,
                ) = await self._ns_execute_primitive(
                    ee_id,
                    primitive=primitive_name,
                    primitive_params=self._map_primitive_params(
                        config_primitive_desc, primitive_params, desc_params
                    ),
                    timeout=timeout_ns_action,
                    vca_type=vca_type,
                    db_dict=db_dict,
                    vca_id=vca_id,
                )
            db_nslcmop_update["detailed-status"] = detailed_status
            error_description_nslcmop = (
                detailed_status if nslcmop_operation_state == "FAILED" else ""
            )
            self.logger.debug(
                logging_text
                + "Done with result {} {}".format(
                    nslcmop_operation_state, detailed_status
                )
            )
            return  # database update is called inside finally
        except (DbException, LcmException, N2VCException, K8sException) as e:
            self.logger.error(logging_text + "Exit Exception {}".format(e))
            exc = e
        except asyncio.CancelledError:
            self.logger.error(
                logging_text + "Cancelled Exception while '{}'".format(step)
            )
            exc = "Operation was cancelled"
        except asyncio.TimeoutError:
            self.logger.error(logging_text + "Timeout while '{}'".format(step))
            exc = "Timeout"
        except Exception as e:
            exc = traceback.format_exc()
            self.logger.critical(
                logging_text + "Exit Exception {} {}".format(type(e).__name__, e),
                exc_info=True,
            )
        finally:
            # persist the operation result and notify via kafka, whatever happened
            if exc:
                db_nslcmop_update[
                    "detailed-status"
                ] = (
                    detailed_status
                ) = error_description_nslcmop = "FAILED {}: {}".format(step, exc)
                nslcmop_operation_state = "FAILED"
            if db_nsr:
                self._write_ns_status(
                    nsr_id=nsr_id,
                    ns_state=db_nsr[
                        "nsState"
                    ],  # TODO check if degraded. For the moment use previous status
                    current_operation="IDLE",
                    current_operation_id=None,
                    # error_description=error_description_nsr,
                    # error_detail=error_detail,
                    other_update=db_nsr_update,
                )
            self._write_op_status(
                op_id=nslcmop_id,
                stage="",
                error_message=error_description_nslcmop,
                operation_state=nslcmop_operation_state,
                other_update=db_nslcmop_update,
            )
            if nslcmop_operation_state:
                try:
                    await self.msg.aiowrite(
                        "ns",
                        "actioned",
                        {
                            "nsr_id": nsr_id,
                            "nslcmop_id": nslcmop_id,
                            "operationState": nslcmop_operation_state,
                        },
                    )
                except Exception as e:
                    self.logger.error(
                        logging_text + "kafka_write notification Exception {}".format(e)
                    )
            self.logger.debug(logging_text + "Exit")
            self.lcm_tasks.remove("ns", nsr_id, nslcmop_id, "ns_action")
            # NOTE(review): returning inside `finally` suppresses any pending
            # exception; this matches the existing control flow, keep in mind
            # when refactoring.
            return nslcmop_operation_state, detailed_status
async def terminate_vdus(
self, db_vnfr, member_vnf_index, db_nsr, update_db_nslcmops, stage, logging_text
):
"""This method terminates VDUs
Args:
db_vnfr: VNF instance record
member_vnf_index: VNF index to identify the VDUs to be removed
db_nsr: NS instance record
update_db_nslcmops: Nslcmop update record
"""
vca_scaling_info = []
scaling_info = {"scaling_group_name": "vdu_autoscale", "vdu": [], "kdu": []}
scaling_info["scaling_direction"] = "IN"
scaling_info["vdu-delete"] = {}
scaling_info["kdu-delete"] = {}
db_vdur = db_vnfr.get("vdur")
vdur_list = copy(db_vdur)
count_index = 0
for index, vdu in enumerate(vdur_list):
vca_scaling_info.append(
{
"osm_vdu_id": vdu["vdu-id-ref"],
"member-vnf-index": member_vnf_index,
"type": "delete",
"vdu_index": count_index,
}
)
scaling_info["vdu-delete"][vdu["vdu-id-ref"]] = count_index
scaling_info["vdu"].append(
{
"name": vdu.get("name") or vdu.get("vdu-name"),
"vdu_id": vdu["vdu-id-ref"],
"interface": [],
}
)
for interface in vdu["interfaces"]:
scaling_info["vdu"][index]["interface"].append(
{
"name": interface["name"],
"ip_address": interface["ip-address"],
"mac_address": interface.get("mac-address"),
}
)
self.logger.info("NS update scaling info{}".format(scaling_info))
stage[2] = "Terminating VDUs"
if scaling_info.get("vdu-delete"):
# scale_process = "RO"
if self.ro_config.ng:
await self._scale_ng_ro(
logging_text,
db_nsr,
update_db_nslcmops,
db_vnfr,
scaling_info,
stage,
)
    async def remove_vnf(self, nsr_id, nslcmop_id, vnf_instance_id):
        """This method is to Remove VNF instances from NS.

        Args:
            nsr_id: NS instance id
            nslcmop_id: nslcmop id of update
            vnf_instance_id: id of the VNF instance to be removed

        Returns:
            result: (str, str) COMPLETED/FAILED, details
        """
        try:
            db_nsr_update = {}
            logging_text = "Task ns={} update ".format(nsr_id)
            check_vnfr_count = len(self.db.get_list("vnfrs", {"nsr-id-ref": nsr_id}))
            self.logger.info("check_vnfr_count {}".format(check_vnfr_count))
            # refuse to remove the last VNF of the NS (handled in the else branch)
            if check_vnfr_count > 1:
                stage = ["", "", ""]
                step = "Getting nslcmop from database"
                self.logger.debug(
                    step + " after having waited for previous tasks to be completed"
                )
                # db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})
                db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
                db_vnfr = self.db.get_one("vnfrs", {"_id": vnf_instance_id})
                member_vnf_index = db_vnfr["member-vnf-index-ref"]
                """ db_vnfr = self.db.get_one(
                    "vnfrs", {"member-vnf-index-ref": member_vnf_index, "nsr-id-ref": nsr_id}) """
                update_db_nslcmops = self.db.get_one("nslcmops", {"_id": nslcmop_id})
                # first tear down all VDUs of this VNF through NG-RO
                await self.terminate_vdus(
                    db_vnfr,
                    member_vnf_index,
                    db_nsr,
                    update_db_nslcmops,
                    stage,
                    logging_text,
                )
                # detach the VNFR from the NS record, then delete the VNFR itself
                constituent_vnfr = db_nsr.get("constituent-vnfr-ref")
                constituent_vnfr.remove(db_vnfr.get("_id"))
                db_nsr_update["constituent-vnfr-ref"] = db_nsr.get(
                    "constituent-vnfr-ref"
                )
                self.update_db_2("nsrs", nsr_id, db_nsr_update)
                self.db.del_one("vnfrs", {"_id": db_vnfr.get("_id")})
                # NOTE(review): this repeats the identical nsr update performed
                # just above; it looks redundant — confirm before removing.
                self.update_db_2("nsrs", nsr_id, db_nsr_update)
                return "COMPLETED", "Done"
            else:
                step = "Terminate VNF Failed with"
                raise LcmException(
                    "{} Cannot terminate the last VNF in this NS.".format(
                        vnf_instance_id
                    )
                )
        except (LcmException, asyncio.CancelledError):
            raise
        except Exception as e:
            self.logger.debug("Error removing VNF {}".format(e))
            return "FAILED", "Error removing VNF {}".format(e)
    async def _ns_redeploy_vnf(
        self,
        nsr_id,
        nslcmop_id,
        db_vnfd,
        db_vnfr,
        db_nsr,
    ):
        """This method updates and redeploys VNF instances

        Terminates the current VDUs of the VNF, rewrites the VNFR from the new
        descriptor revision (connection points, vdur, revision), and then
        instantiates the new VDU resources through NG-RO.

        Args:
            nsr_id: NS instance id
            nslcmop_id: nslcmop id
            db_vnfd: VNF descriptor
            db_vnfr: VNF instance record
            db_nsr: NS instance record

        Returns:
            result: (str, str) COMPLETED/FAILED, details
        """
        try:
            count_index = 0
            stage = ["", "", ""]
            logging_text = "Task ns={} update ".format(nsr_id)
            latest_vnfd_revision = db_vnfd["_admin"].get("revision")
            member_vnf_index = db_vnfr["member-vnf-index-ref"]
            # Terminate old VNF resources
            update_db_nslcmops = self.db.get_one("nslcmops", {"_id": nslcmop_id})
            await self.terminate_vdus(
                db_vnfr,
                member_vnf_index,
                db_nsr,
                update_db_nslcmops,
                stage,
                logging_text,
            )
            # old_vnfd_id = db_vnfr["vnfd-id"]
            # new_db_vnfd = self.db.get_one("vnfds", {"_id": vnfd_id})
            new_db_vnfd = db_vnfd
            # new_vnfd_ref = new_db_vnfd["id"]
            # new_vnfd_id = vnfd_id
            # Create VDUR
            # rebuild the VNFR connection points from the descriptor ext-cpd list
            new_vnfr_cp = []
            for cp in new_db_vnfd.get("ext-cpd", ()):
                vnf_cp = {
                    "name": cp.get("id"),
                    "connection-point-id": cp.get("int-cpd", {}).get("cpd"),
                    "connection-point-vdu-id": cp.get("int-cpd", {}).get("vdu-id"),
                    "id": cp.get("id"),
                }
                new_vnfr_cp.append(vnf_cp)
            # the new vdur list is precomputed and carried in the operation params
            new_vdur = update_db_nslcmops["operationParams"]["newVdur"]
            # new_vdur = self._create_vdur_descriptor_from_vnfd(db_nsd, db_vnfd, old_db_vnfd, vnfd_id, db_nsr, member_vnf_index)
            # new_vnfr_update = {"vnfd-ref": new_vnfd_ref, "vnfd-id": new_vnfd_id, "connection-point": new_vnfr_cp, "vdur": new_vdur, "ip-address": ""}
            new_vnfr_update = {
                "revision": latest_vnfd_revision,
                "connection-point": new_vnfr_cp,
                "vdur": new_vdur,
                "ip-address": "",
            }
            self.update_db_2("vnfrs", db_vnfr["_id"], new_vnfr_update)
            updated_db_vnfr = self.db.get_one(
                "vnfrs",
                {"member-vnf-index-ref": member_vnf_index, "nsr-id-ref": nsr_id},
            )
            # Instantiate new VNF resources
            # update_db_nslcmops = self.db.get_one("nslcmops", {"_id": nslcmop_id})
            vca_scaling_info = []
            scaling_info = {"scaling_group_name": "vdu_autoscale", "vdu": [], "kdu": []}
            scaling_info["scaling_direction"] = "OUT"
            scaling_info["vdu-create"] = {}
            scaling_info["kdu-create"] = {}
            vdud_instantiate_list = db_vnfd["vdu"]
            for index, vdud in enumerate(vdud_instantiate_list):
                cloud_init_text = self._get_vdu_cloud_init_content(vdud, db_vnfd)
                if cloud_init_text:
                    additional_params = (
                        self._get_vdu_additional_params(updated_db_vnfr, vdud["id"])
                        or {}
                    )
                cloud_init_list = []
                if cloud_init_text:
                    # TODO Information of its own ip is not available because db_vnfr is not updated.
                    additional_params["OSM"] = get_osm_params(
                        updated_db_vnfr, vdud["id"], 1
                    )
                    cloud_init_list.append(
                        self._parse_cloud_init(
                            cloud_init_text,
                            additional_params,
                            db_vnfd["id"],
                            vdud["id"],
                        )
                    )
                vca_scaling_info.append(
                    {
                        "osm_vdu_id": vdud["id"],
                        "member-vnf-index": member_vnf_index,
                        "type": "create",
                        "vdu_index": count_index,
                    }
                )
                scaling_info["vdu-create"][vdud["id"]] = count_index
            if self.ro_config.ng:
                self.logger.debug(
                    "New Resources to be deployed: {}".format(scaling_info)
                )
                await self._scale_ng_ro(
                    logging_text,
                    db_nsr,
                    update_db_nslcmops,
                    updated_db_vnfr,
                    scaling_info,
                    stage,
                )
            return "COMPLETED", "Done"
        except (LcmException, asyncio.CancelledError):
            raise
        except Exception as e:
            self.logger.debug("Error updating VNF {}".format(e))
            return "FAILED", "Error updating VNF {}".format(e)
async def _ns_charm_upgrade(
self,
ee_id,
charm_id,
charm_type,
path,
timeout: float = None,
) -> (str, str):
"""This method upgrade charms in VNF instances
Args:
ee_id: Execution environment id
path: Local path to the charm
charm_id: charm-id
charm_type: Charm type can be lxc-proxy-charm, native-charm or k8s-proxy-charm
timeout: (Float) Timeout for the ns update operation
Returns:
result: (str, str) COMPLETED/FAILED, details
"""
try:
charm_type = charm_type or "lxc_proxy_charm"
output = await self.vca_map[charm_type].upgrade_charm(
ee_id=ee_id,
path=path,
charm_id=charm_id,
charm_type=charm_type,
timeout=timeout or self.timeout.ns_update,
)
if output:
return "COMPLETED", output
except (LcmException, asyncio.CancelledError):
raise
except Exception as e:
self.logger.debug("Error upgrading charm {}".format(path))
return "FAILED", "Error upgrading charm {}: {}".format(path, e)
async def update(self, nsr_id, nslcmop_id):
    """Update NS according to different update types.

    This method performs upgrade of VNF instances then updates the revision
    number in VNF record.

    Supported update types (from operationParams["updateType"]):
      - CHANGE_VNFPKG: upgrade a VNF to a newer VNFD revision (redeploy or
        charm/helm EE upgrade, depending on what changed).
      - REMOVE_VNF: remove one VNF instance from the NS.
      - OPERATE_VNF: start/stop/rebuild a VNF instance.
      - VERTICAL_SCALE: change the flavor of a VNF instance.

    Args:
        nsr_id: Network service will be updated
        nslcmop_id: ns lcm operation id

    Returns:
        (nslcmop_operation_state, detailed_status) tuple.
        It may raise DbException, LcmException, N2VCException, K8sException
    """
    # Try to lock HA task here
    task_is_locked_by_me = self.lcm_tasks.lock_HA("ns", "nslcmops", nslcmop_id)
    if not task_is_locked_by_me:
        # Another LCM worker already owns this operation.
        return
    logging_text = "Task ns={} update={} ".format(nsr_id, nslcmop_id)
    self.logger.debug(logging_text + "Enter")
    # Set the required variables to be filled up later
    db_nsr = None
    db_nslcmop_update = {}
    vnfr_update = {}
    nslcmop_operation_state = None
    db_nsr_update = {}
    error_description_nslcmop = ""
    exc = None
    # change_type drives the kafka notification topic payload in `finally`.
    change_type = "updated"
    detailed_status = ""
    member_vnf_index = None
    try:
        # wait for any previous tasks in process
        step = "Waiting for previous operations to terminate"
        await self.lcm_tasks.waitfor_related_HA("ns", "nslcmops", nslcmop_id)
        self._write_ns_status(
            nsr_id=nsr_id,
            ns_state=None,
            current_operation="UPDATING",
            current_operation_id=nslcmop_id,
        )
        step = "Getting nslcmop from database"
        db_nslcmop = self.db.get_one(
            "nslcmops", {"_id": nslcmop_id}, fail_on_empty=False
        )
        update_type = db_nslcmop["operationParams"]["updateType"]
        step = "Getting nsr from database"
        db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
        # Saved so it can be restored once the update finishes (or fails).
        old_operational_status = db_nsr["operational-status"]
        db_nsr_update["operational-status"] = "updating"
        self.update_db_2("nsrs", nsr_id, db_nsr_update)
        nsr_deployed = db_nsr["_admin"].get("deployed")
        if update_type == "CHANGE_VNFPKG":
            # Get the input parameters given through update request
            vnf_instance_id = db_nslcmop["operationParams"][
                "changeVnfPackageData"
            ].get("vnfInstanceId")
            vnfd_id = db_nslcmop["operationParams"]["changeVnfPackageData"].get(
                "vnfdId"
            )
            timeout_seconds = db_nslcmop["operationParams"].get("timeout_ns_update")
            step = "Getting vnfr from database"
            db_vnfr = self.db.get_one(
                "vnfrs", {"_id": vnf_instance_id}, fail_on_empty=False
            )
            step = "Getting vnfds from database"
            # Latest VNFD
            latest_vnfd = self.db.get_one(
                "vnfds", {"_id": vnfd_id}, fail_on_empty=False
            )
            latest_vnfd_revision = latest_vnfd["_admin"].get("revision")
            # Current VNFD
            current_vnf_revision = db_vnfr.get("revision", 1)
            current_vnfd = self.db.get_one(
                "vnfds_revisions",
                {"_id": vnfd_id + ":" + str(current_vnf_revision)},
                fail_on_empty=False,
            )
            # Charm artifact paths will be filled up later
            (
                current_charm_artifact_path,
                target_charm_artifact_path,
                charm_artifact_paths,
                helm_artifacts,
            ) = ([], [], [], [])
            step = "Checking if revision has changed in VNFD"
            if current_vnf_revision != latest_vnfd_revision:
                change_type = "policy_updated"
                # There is new revision of VNFD, update operation is required
                current_vnfd_path = vnfd_id + ":" + str(current_vnf_revision)
                latest_vnfd_path = vnfd_id + ":" + str(latest_vnfd_revision)
                step = "Removing the VNFD packages if they exist in the local path"
                # ignore_errors=True: missing local copies are not a failure.
                shutil.rmtree(self.fs.path + current_vnfd_path, ignore_errors=True)
                shutil.rmtree(self.fs.path + latest_vnfd_path, ignore_errors=True)
                step = "Get the VNFD packages from FSMongo"
                self.fs.sync(from_path=latest_vnfd_path)
                self.fs.sync(from_path=current_vnfd_path)
                step = (
                    "Get the charm-type, charm-id, ee-id if there is deployed VCA"
                )
                current_base_folder = current_vnfd["_admin"]["storage"]
                latest_base_folder = latest_vnfd["_admin"]["storage"]
                for vca_index, vca_deployed in enumerate(
                    get_iterable(nsr_deployed, "VCA")
                ):
                    vnf_index = db_vnfr.get("member-vnf-index-ref")
                    # Getting charm-id and charm-type
                    if vca_deployed.get("member-vnf-index") == vnf_index:
                        vca_id = self.get_vca_id(db_vnfr, db_nsr)
                        vca_type = vca_deployed.get("type")
                        vdu_count_index = vca_deployed.get("vdu_count_index")
                        # Getting ee-id
                        ee_id = vca_deployed.get("ee_id")
                        step = "Getting descriptor config"
                        if current_vnfd.get("kdu"):
                            search_key = "kdu_name"
                        else:
                            search_key = "vnfd_id"
                        entity_id = vca_deployed.get(search_key)
                        descriptor_config = get_configuration(
                            current_vnfd, entity_id
                        )
                        if "execution-environment-list" in descriptor_config:
                            ee_list = descriptor_config.get(
                                "execution-environment-list", []
                            )
                        else:
                            ee_list = []
                        # There could be several charm used in the same VNF
                        for ee_item in ee_list:
                            if ee_item.get("juju"):
                                step = "Getting charm name"
                                charm_name = ee_item["juju"].get("charm")
                                step = "Setting Charm artifact paths"
                                current_charm_artifact_path.append(
                                    get_charm_artifact_path(
                                        current_base_folder,
                                        charm_name,
                                        vca_type,
                                        current_vnf_revision,
                                    )
                                )
                                target_charm_artifact_path.append(
                                    get_charm_artifact_path(
                                        latest_base_folder,
                                        charm_name,
                                        vca_type,
                                        latest_vnfd_revision,
                                    )
                                )
                            elif ee_item.get("helm-chart"):
                                # add chart to list and all parameters
                                step = "Getting helm chart name"
                                chart_name = ee_item.get("helm-chart")
                                vca_type = "helm-v3"
                                step = "Setting Helm chart artifact paths"
                                helm_artifacts.append(
                                    {
                                        "current_artifact_path": get_charm_artifact_path(
                                            current_base_folder,
                                            chart_name,
                                            vca_type,
                                            current_vnf_revision,
                                        ),
                                        "target_artifact_path": get_charm_artifact_path(
                                            latest_base_folder,
                                            chart_name,
                                            vca_type,
                                            latest_vnfd_revision,
                                        ),
                                        "ee_id": ee_id,
                                        "vca_index": vca_index,
                                        "vdu_index": vdu_count_index,
                                    }
                                )
                # NOTE: zip() yields a single-use iterator; exactly one of the
                # two branches below consumes it via list().
                charm_artifact_paths = zip(
                    current_charm_artifact_path, target_charm_artifact_path
                )
                step = "Checking if software version has changed in VNFD"
                if find_software_version(current_vnfd) != find_software_version(
                    latest_vnfd
                ):
                    step = "Checking if existing VNF has charm"
                    for current_charm_path, target_charm_path in list(
                        charm_artifact_paths
                    ):
                        if current_charm_path:
                            raise LcmException(
                                "Software version change is not supported as VNF instance {} has charm.".format(
                                    vnf_instance_id
                                )
                            )
                    step = "Checking whether the descriptor has SFC"
                    if db_nsr.get("nsd", {}).get("vnffgd"):
                        raise LcmException(
                            "Ns update is not allowed for NS with SFC"
                        )
                    # There is no change in the charm package, then redeploy the VNF
                    # based on new descriptor
                    step = "Redeploying VNF"
                    member_vnf_index = db_vnfr["member-vnf-index-ref"]
                    (result, detailed_status) = await self._ns_redeploy_vnf(
                        nsr_id, nslcmop_id, latest_vnfd, db_vnfr, db_nsr
                    )
                    if result == "FAILED":
                        nslcmop_operation_state = result
                        error_description_nslcmop = detailed_status
                        old_operational_status = "failed"
                    db_nslcmop_update["detailed-status"] = detailed_status
                    db_nsr_update["detailed-status"] = detailed_status
                    scaling_aspect = get_scaling_aspect(latest_vnfd)
                    scaling_group_desc = db_nsr.get("_admin").get(
                        "scaling-group", None
                    )
                    if scaling_group_desc:
                        # Reset scale-op counters for aspects present in the
                        # NSR scaling-group bookkeeping.
                        for aspect in scaling_aspect:
                            scaling_group_id = aspect.get("id")
                            for scale_index, scaling_group in enumerate(
                                scaling_group_desc
                            ):
                                if scaling_group.get("name") == scaling_group_id:
                                    db_nsr_update[
                                        "_admin.scaling-group.{}.nb-scale-op".format(
                                            scale_index
                                        )
                                    ] = 0
                    self.logger.debug(
                        logging_text
                        + " step {} Done with result {} {}".format(
                            step, nslcmop_operation_state, detailed_status
                        )
                    )
                else:
                    step = "Checking if any charm package has changed or not"
                    for current_charm_path, target_charm_path in list(
                        charm_artifact_paths
                    ):
                        if (
                            current_charm_path
                            and target_charm_path
                            and self.check_charm_hash_changed(
                                current_charm_path, target_charm_path
                            )
                        ):
                            step = "Checking whether VNF uses juju bundle"
                            if check_juju_bundle_existence(current_vnfd):
                                raise LcmException(
                                    "Charm upgrade is not supported for the instance which"
                                    " uses juju-bundle: {}".format(
                                        check_juju_bundle_existence(current_vnfd)
                                    )
                                )
                            step = "Upgrading Charm"
                            (
                                result,
                                detailed_status,
                            ) = await self._ns_charm_upgrade(
                                ee_id=ee_id,
                                charm_id=vca_id,
                                charm_type=vca_type,
                                path=self.fs.path + target_charm_path,
                                timeout=timeout_seconds,
                            )
                            if result == "FAILED":
                                nslcmop_operation_state = result
                                error_description_nslcmop = detailed_status
                            db_nslcmop_update["detailed-status"] = detailed_status
                            self.logger.debug(
                                logging_text
                                + " step {} Done with result {} {}".format(
                                    step, nslcmop_operation_state, detailed_status
                                )
                            )
                    step = "Updating policies"
                    member_vnf_index = db_vnfr["member-vnf-index-ref"]
                    result = "COMPLETED"
                    detailed_status = "Done"
                    db_nslcmop_update["detailed-status"] = "Done"
                # helm base EE
                for item in helm_artifacts:
                    if not (
                        item["current_artifact_path"]
                        and item["target_artifact_path"]
                        and self.check_charm_hash_changed(
                            item["current_artifact_path"],
                            item["target_artifact_path"],
                        )
                    ):
                        # Chart unchanged (or path missing): nothing to upgrade.
                        continue
                    db_update_entry = "_admin.deployed.VCA.{}.".format(
                        item["vca_index"]
                    )
                    vnfr_id = db_vnfr["_id"]
                    osm_config = {"osm": {"ns_id": nsr_id, "vnf_id": vnfr_id}}
                    db_dict = {
                        "collection": "nsrs",
                        "filter": {"_id": nsr_id},
                        "path": db_update_entry,
                    }
                    vca_type, namespace, helm_id = get_ee_id_parts(item["ee_id"])
                    await self.vca_map[vca_type].upgrade_execution_environment(
                        namespace=namespace,
                        helm_id=helm_id,
                        db_dict=db_dict,
                        config=osm_config,
                        artifact_path=item["target_artifact_path"],
                        vca_type=vca_type,
                    )
                    vnf_id = db_vnfr.get("vnfd-ref")
                    config_descriptor = get_configuration(latest_vnfd, vnf_id)
                    self.logger.debug("get ssh key block")
                    rw_mgmt_ip = None
                    if deep_get(
                        config_descriptor,
                        ("config-access", "ssh-access", "required"),
                    ):
                        # Needed to inject a ssh key
                        user = deep_get(
                            config_descriptor,
                            ("config-access", "ssh-access", "default-user"),
                        )
                        step = (
                            "Install configuration Software, getting public ssh key"
                        )
                        pub_key = await self.vca_map[
                            vca_type
                        ].get_ee_ssh_public__key(
                            ee_id=ee_id, db_dict=db_dict, vca_id=vca_id
                        )
                        step = (
                            "Insert public key into VM user={} ssh_key={}".format(
                                user, pub_key
                            )
                        )
                        self.logger.debug(logging_text + step)
                        # wait for RO (ip-address) Insert pub_key into VM
                        rw_mgmt_ip = await self.wait_vm_up_insert_key_ro(
                            logging_text,
                            nsr_id,
                            vnfr_id,
                            None,
                            item["vdu_index"],
                            user=user,
                            pub_key=pub_key,
                        )
                    initial_config_primitive_list = config_descriptor.get(
                        "initial-config-primitive"
                    )
                    # Only the primitive literally named "config" is re-run
                    # after the EE upgrade.
                    config_primitive = next(
                        (
                            p
                            for p in initial_config_primitive_list
                            if p["name"] == "config"
                        ),
                        None,
                    )
                    if not config_primitive:
                        continue
                    deploy_params = {"OSM": get_osm_params(db_vnfr)}
                    if rw_mgmt_ip:
                        deploy_params["rw_mgmt_ip"] = rw_mgmt_ip
                    if db_vnfr.get("additionalParamsForVnf"):
                        deploy_params.update(
                            parse_yaml_strings(
                                db_vnfr["additionalParamsForVnf"].copy()
                            )
                        )
                    primitive_params_ = self._map_primitive_params(
                        config_primitive, {}, deploy_params
                    )
                    step = "execute primitive '{}' params '{}'".format(
                        config_primitive["name"], primitive_params_
                    )
                    self.logger.debug(logging_text + step)
                    await self.vca_map[vca_type].exec_primitive(
                        ee_id=ee_id,
                        primitive_name=config_primitive["name"],
                        params_dict=primitive_params_,
                        db_dict=db_dict,
                        vca_id=vca_id,
                        vca_type=vca_type,
                    )
                step = "Updating policies"
                member_vnf_index = db_vnfr["member-vnf-index-ref"]
                detailed_status = "Done"
                db_nslcmop_update["detailed-status"] = "Done"
            # If nslcmop_operation_state is None, so any operation is not failed.
            if not nslcmop_operation_state:
                nslcmop_operation_state = "COMPLETED"
                # If update CHANGE_VNFPKG nslcmop_operation is successful
                # vnf revision need to be updated
                vnfr_update["revision"] = latest_vnfd_revision
                self.update_db_2("vnfrs", db_vnfr["_id"], vnfr_update)
            self.logger.debug(
                logging_text
                + " task Done with result {} {}".format(
                    nslcmop_operation_state, detailed_status
                )
            )
        elif update_type == "REMOVE_VNF":
            # This part is included in https://osm.etsi.org/gerrit/11876
            vnf_instance_id = db_nslcmop["operationParams"]["removeVnfInstanceId"]
            db_vnfr = self.db.get_one("vnfrs", {"_id": vnf_instance_id})
            member_vnf_index = db_vnfr["member-vnf-index-ref"]
            step = "Removing VNF"
            (result, detailed_status) = await self.remove_vnf(
                nsr_id, nslcmop_id, vnf_instance_id
            )
            if result == "FAILED":
                nslcmop_operation_state = result
                error_description_nslcmop = detailed_status
            db_nslcmop_update["detailed-status"] = detailed_status
            change_type = "vnf_terminated"
            if not nslcmop_operation_state:
                nslcmop_operation_state = "COMPLETED"
            self.logger.debug(
                logging_text
                + " task Done with result {} {}".format(
                    nslcmop_operation_state, detailed_status
                )
            )
        elif update_type == "OPERATE_VNF":
            vnf_id = db_nslcmop["operationParams"]["operateVnfData"][
                "vnfInstanceId"
            ]
            operation_type = db_nslcmop["operationParams"]["operateVnfData"][
                "changeStateTo"
            ]
            additional_param = db_nslcmop["operationParams"]["operateVnfData"][
                "additionalParam"
            ]
            (result, detailed_status) = await self.rebuild_start_stop(
                nsr_id, nslcmop_id, vnf_id, additional_param, operation_type
            )
            if result == "FAILED":
                nslcmop_operation_state = result
                error_description_nslcmop = detailed_status
            db_nslcmop_update["detailed-status"] = detailed_status
            if not nslcmop_operation_state:
                nslcmop_operation_state = "COMPLETED"
            self.logger.debug(
                logging_text
                + " task Done with result {} {}".format(
                    nslcmop_operation_state, detailed_status
                )
            )
        elif update_type == "VERTICAL_SCALE":
            self.logger.debug(
                "Prepare for VERTICAL_SCALE update operation {}".format(db_nslcmop)
            )
            # Get the input parameters given through update request
            vnf_instance_id = db_nslcmop["operationParams"]["verticalScaleVnf"].get(
                "vnfInstanceId"
            )
            vnfd_id = db_nslcmop["operationParams"]["verticalScaleVnf"].get(
                "vnfdId"
            )
            step = "Getting vnfr from database"
            db_vnfr = self.db.get_one(
                "vnfrs", {"_id": vnf_instance_id}, fail_on_empty=False
            )
            self.logger.debug(step)
            step = "Getting vnfds from database"
            self.logger.debug("Start" + step)
            # Latest VNFD
            latest_vnfd = self.db.get_one(
                "vnfds", {"_id": vnfd_id}, fail_on_empty=False
            )
            latest_vnfd_revision = latest_vnfd["_admin"].get("revision")
            # Current VNFD
            current_vnf_revision = db_vnfr.get("revision", 1)
            current_vnfd = self.db.get_one(
                "vnfds_revisions",
                {"_id": vnfd_id + ":" + str(current_vnf_revision)},
                fail_on_empty=False,
            )
            self.logger.debug("End" + step)
            # verify flavor changes
            step = "Checking for flavor change"
            if find_software_version(current_vnfd) != find_software_version(
                latest_vnfd
            ):
                self.logger.debug("Start" + step)
                # Software version changed but compute/storage descriptors
                # identical -> there is nothing to vertically scale.
                if current_vnfd.get("virtual-compute-desc") == latest_vnfd.get(
                    "virtual-compute-desc"
                ) and current_vnfd.get("virtual-storage-desc") == latest_vnfd.get(
                    "virtual-storage-desc"
                ):
                    raise LcmException(
                        "No change in flavor check vnfd {}".format(vnfd_id)
                    )
            else:
                raise LcmException(
                    "No change in software_version of vnfd {}".format(vnfd_id)
                )
            self.logger.debug("End" + step)
            (result, detailed_status) = await self.vertical_scale(
                nsr_id, nslcmop_id
            )
            self.logger.debug(
                "vertical_scale result: {} detailed_status :{}".format(
                    result, detailed_status
                )
            )
            if result == "FAILED":
                nslcmop_operation_state = result
                error_description_nslcmop = detailed_status
            db_nslcmop_update["detailed-status"] = detailed_status
            if not nslcmop_operation_state:
                nslcmop_operation_state = "COMPLETED"
            self.logger.debug(
                logging_text
                + " task Done with result {} {}".format(
                    nslcmop_operation_state, detailed_status
                )
            )
        # If nslcmop_operation_state is None, so any operation is not failed.
        # All operations are executed in overall.
        if not nslcmop_operation_state:
            nslcmop_operation_state = "COMPLETED"
            db_nsr_update["operational-status"] = old_operational_status
    except (DbException, LcmException, N2VCException, K8sException) as e:
        self.logger.error(logging_text + "Exit Exception {}".format(e))
        exc = e
    except asyncio.CancelledError:
        self.logger.error(
            logging_text + "Cancelled Exception while '{}'".format(step)
        )
        exc = "Operation was cancelled"
    except asyncio.TimeoutError:
        self.logger.error(logging_text + "Timeout while '{}'".format(step))
        exc = "Timeout"
    except Exception as e:
        exc = traceback.format_exc()
        self.logger.critical(
            logging_text + "Exit Exception {} {}".format(type(e).__name__, e),
            exc_info=True,
        )
    finally:
        if exc:
            # Chained assignment: the same FAILED message is stored in the
            # nslcmop update, the returned detailed_status and the kafka
            # error description.
            db_nslcmop_update[
                "detailed-status"
            ] = (
                detailed_status
            ) = error_description_nslcmop = "FAILED {}: {}".format(step, exc)
            nslcmop_operation_state = "FAILED"
            db_nsr_update["operational-status"] = old_operational_status
        if db_nsr:
            self._write_ns_status(
                nsr_id=nsr_id,
                ns_state=db_nsr["nsState"],
                current_operation="IDLE",
                current_operation_id=None,
                other_update=db_nsr_update,
            )
        self._write_op_status(
            op_id=nslcmop_id,
            stage="",
            error_message=error_description_nslcmop,
            operation_state=nslcmop_operation_state,
            other_update=db_nslcmop_update,
        )
        if nslcmop_operation_state:
            # Notify other OSM components about the finished operation;
            # a notification failure must not fail the whole update.
            try:
                msg = {
                    "nsr_id": nsr_id,
                    "nslcmop_id": nslcmop_id,
                    "operationState": nslcmop_operation_state,
                }
                if (
                    change_type in ("vnf_terminated", "policy_updated")
                    and member_vnf_index
                ):
                    msg.update({"vnf_member_index": member_vnf_index})
                await self.msg.aiowrite("ns", change_type, msg)
            except Exception as e:
                self.logger.error(
                    logging_text + "kafka_write notification Exception {}".format(e)
                )
        self.logger.debug(logging_text + "Exit")
        self.lcm_tasks.remove("ns", nsr_id, nslcmop_id, "ns_update")
    return nslcmop_operation_state, detailed_status
async def scale(self, nsr_id, nslcmop_id):
# Try to lock HA task here
task_is_locked_by_me = self.lcm_tasks.lock_HA("ns", "nslcmops", nslcmop_id)
if not task_is_locked_by_me:
return
logging_text = "Task ns={} scale={} ".format(nsr_id, nslcmop_id)
stage = ["", "", ""]
tasks_dict_info = {}
# ^ stage, step, VIM progress
self.logger.debug(logging_text + "Enter")
# get all needed from database
db_nsr = None
db_nslcmop_update = {}
db_nsr_update = {}
exc = None
# in case of error, indicates what part of scale was failed to put nsr at error status
scale_process = None
old_operational_status = ""
old_config_status = ""
nsi_id = None
prom_job_name = ""
try:
# wait for any previous tasks in process
step = "Waiting for previous operations to terminate"
await self.lcm_tasks.waitfor_related_HA("ns", "nslcmops", nslcmop_id)
self._write_ns_status(
nsr_id=nsr_id,
ns_state=None,
current_operation="SCALING",
current_operation_id=nslcmop_id,
)
step = "Getting nslcmop from database"
self.logger.debug(
step + " after having waited for previous tasks to be completed"
)
db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})
step = "Getting nsr from database"
db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
old_operational_status = db_nsr["operational-status"]
old_config_status = db_nsr["config-status"]
step = "Checking whether the descriptor has SFC"
if db_nsr.get("nsd", {}).get("vnffgd"):
raise LcmException("Scaling is not allowed for NS with SFC")
step = "Parsing scaling parameters"
db_nsr_update["operational-status"] = "scaling"
self.update_db_2("nsrs", nsr_id, db_nsr_update)
nsr_deployed = db_nsr["_admin"].get("deployed")
vnf_index = db_nslcmop["operationParams"]["scaleVnfData"][
"scaleByStepData"
]["member-vnf-index"]
scaling_group = db_nslcmop["operationParams"]["scaleVnfData"][
"scaleByStepData"
]["scaling-group-descriptor"]
scaling_type = db_nslcmop["operationParams"]["scaleVnfData"]["scaleVnfType"]
# for backward compatibility
if nsr_deployed and isinstance(nsr_deployed.get("VCA"), dict):
nsr_deployed["VCA"] = list(nsr_deployed["VCA"].values())
db_nsr_update["_admin.deployed.VCA"] = nsr_deployed["VCA"]
self.update_db_2("nsrs", nsr_id, db_nsr_update)
step = "Getting vnfr from database"
db_vnfr = self.db.get_one(
"vnfrs", {"member-vnf-index-ref": vnf_index, "nsr-id-ref": nsr_id}
)
vca_id = self.get_vca_id(db_vnfr, db_nsr)
step = "Getting vnfd from database"
db_vnfd = self.db.get_one("vnfds", {"_id": db_vnfr["vnfd-id"]})
base_folder = db_vnfd["_admin"]["storage"]
step = "Getting scaling-group-descriptor"
scaling_descriptor = find_in_list(
get_scaling_aspect(db_vnfd),
lambda scale_desc: scale_desc["name"] == scaling_group,
)
if not scaling_descriptor:
raise LcmException(
"input parameter 'scaleByStepData':'scaling-group-descriptor':'{}' is not present "
"at vnfd:scaling-group-descriptor".format(scaling_group)
)
step = "Sending scale order to VIM"
# TODO check if ns is in a proper status
nb_scale_op = 0
if not db_nsr["_admin"].get("scaling-group"):
self.update_db_2(
"nsrs",
nsr_id,
{
"_admin.scaling-group": [
{
"name": scaling_group,
"vnf_index": vnf_index,
"nb-scale-op": 0,
}
]
},
)
admin_scale_index = 0
else:
for admin_scale_index, admin_scale_info in enumerate(
db_nsr["_admin"]["scaling-group"]
):
if (
admin_scale_info["name"] == scaling_group
and admin_scale_info["vnf_index"] == vnf_index
):
nb_scale_op = admin_scale_info.get("nb-scale-op", 0)
break
else: # not found, set index one plus last element and add new entry with the name
admin_scale_index += 1
db_nsr_update[
"_admin.scaling-group.{}.name".format(admin_scale_index)
] = scaling_group
db_nsr_update[
"_admin.scaling-group.{}.vnf_index".format(admin_scale_index)
] = vnf_index
vca_scaling_info = []
scaling_info = {"scaling_group_name": scaling_group, "vdu": [], "kdu": []}
if scaling_type == "SCALE_OUT":
if "aspect-delta-details" not in scaling_descriptor:
raise LcmException(
"Aspect delta details not fount in scaling descriptor {}".format(
scaling_descriptor["name"]
)
)
# count if max-instance-count is reached
deltas = scaling_descriptor.get("aspect-delta-details")["deltas"]
scaling_info["scaling_direction"] = "OUT"
scaling_info["vdu-create"] = {}
scaling_info["kdu-create"] = {}
for delta in deltas:
for vdu_delta in delta.get("vdu-delta", {}):
vdud = get_vdu(db_vnfd, vdu_delta["id"])
# vdu_index also provides the number of instance of the targeted vdu
vdu_count = vdu_index = get_vdur_index(db_vnfr, vdu_delta)
if vdu_index <= len(db_vnfr["vdur"]):
vdu_name_id = db_vnfr["vdur"][vdu_index - 1]["vdu-name"]
prom_job_name = (
db_vnfr["_id"] + vdu_name_id + str(vdu_index - 1)
)
prom_job_name = prom_job_name.replace("_", "")
prom_job_name = prom_job_name.replace("-", "")
else:
prom_job_name = None
cloud_init_text = self._get_vdu_cloud_init_content(
vdud, db_vnfd
)
if cloud_init_text:
additional_params = (
self._get_vdu_additional_params(db_vnfr, vdud["id"])
or {}
)
cloud_init_list = []
vdu_profile = get_vdu_profile(db_vnfd, vdu_delta["id"])
max_instance_count = 10
if vdu_profile and "max-number-of-instances" in vdu_profile:
max_instance_count = vdu_profile.get(
"max-number-of-instances", 10
)
default_instance_num = get_number_of_instances(
db_vnfd, vdud["id"]
)
instances_number = vdu_delta.get("number-of-instances", 1)
nb_scale_op += instances_number
new_instance_count = nb_scale_op + default_instance_num
# Control if new count is over max and vdu count is less than max.
# Then assign new instance count
if new_instance_count > max_instance_count > vdu_count:
instances_number = new_instance_count - max_instance_count
else:
instances_number = instances_number
if new_instance_count > max_instance_count:
raise LcmException(
"reached the limit of {} (max-instance-count) "
"scaling-out operations for the "
"scaling-group-descriptor '{}'".format(
nb_scale_op, scaling_group
)
)
for x in range(vdu_delta.get("number-of-instances", 1)):
if cloud_init_text:
# TODO Information of its own ip is not available because db_vnfr is not updated.
additional_params["OSM"] = get_osm_params(
db_vnfr, vdu_delta["id"], vdu_index + x
)
cloud_init_list.append(
self._parse_cloud_init(
cloud_init_text,
additional_params,
db_vnfd["id"],
vdud["id"],
)
)
vca_scaling_info.append(
{
"osm_vdu_id": vdu_delta["id"],
"member-vnf-index": vnf_index,
"type": "create",
"vdu_index": vdu_index + x,
}
)
scaling_info["vdu-create"][vdu_delta["id"]] = instances_number
for kdu_delta in delta.get("kdu-resource-delta", {}):
kdu_profile = get_kdu_resource_profile(db_vnfd, kdu_delta["id"])
kdu_name = kdu_profile["kdu-name"]
resource_name = kdu_profile.get("resource-name", "")
# Might have different kdus in the same delta
# Should have list for each kdu
if not scaling_info["kdu-create"].get(kdu_name, None):
scaling_info["kdu-create"][kdu_name] = []
kdur = get_kdur(db_vnfr, kdu_name)
if kdur.get("helm-chart"):
k8s_cluster_type = "helm-chart-v3"
self.logger.debug("kdur: {}".format(kdur))
elif kdur.get("juju-bundle"):
k8s_cluster_type = "juju-bundle"
else:
raise LcmException(
"kdu type for kdu='{}.{}' is neither helm-chart nor "
"juju-bundle. Maybe an old NBI version is running".format(
db_vnfr["member-vnf-index-ref"], kdu_name
)
)
max_instance_count = 10
if kdu_profile and "max-number-of-instances" in kdu_profile:
max_instance_count = kdu_profile.get(
"max-number-of-instances", 10
)
nb_scale_op += kdu_delta.get("number-of-instances", 1)
deployed_kdu, _ = get_deployed_kdu(
nsr_deployed, kdu_name, vnf_index
)
if deployed_kdu is None:
raise LcmException(
"KDU '{}' for vnf '{}' not deployed".format(
kdu_name, vnf_index
)
)
kdu_instance = deployed_kdu.get("kdu-instance")
instance_num = await self.k8scluster_map[
k8s_cluster_type
].get_scale_count(
resource_name,
kdu_instance,
vca_id=vca_id,
cluster_uuid=deployed_kdu.get("k8scluster-uuid"),
kdu_model=deployed_kdu.get("kdu-model"),
)
kdu_replica_count = instance_num + kdu_delta.get(
"number-of-instances", 1
)
# Control if new count is over max and instance_num is less than max.
# Then assign max instance number to kdu replica count
if kdu_replica_count > max_instance_count > instance_num:
kdu_replica_count = max_instance_count
if kdu_replica_count > max_instance_count:
raise LcmException(
"reached the limit of {} (max-instance-count) "
"scaling-out operations for the "
"scaling-group-descriptor '{}'".format(
instance_num, scaling_group
)
)
for x in range(kdu_delta.get("number-of-instances", 1)):
vca_scaling_info.append(
{
"osm_kdu_id": kdu_name,
"member-vnf-index": vnf_index,
"type": "create",
"kdu_index": instance_num + x - 1,
}
)
scaling_info["kdu-create"][kdu_name].append(
{
"member-vnf-index": vnf_index,
"type": "create",
"k8s-cluster-type": k8s_cluster_type,
"resource-name": resource_name,
"scale": kdu_replica_count,
}
)
elif scaling_type == "SCALE_IN":
deltas = scaling_descriptor.get("aspect-delta-details")["deltas"]
scaling_info["scaling_direction"] = "IN"
scaling_info["vdu-delete"] = {}
scaling_info["kdu-delete"] = {}
for delta in deltas:
for vdu_delta in delta.get("vdu-delta", {}):
vdu_count = vdu_index = get_vdur_index(db_vnfr, vdu_delta)
min_instance_count = 0
vdu_profile = get_vdu_profile(db_vnfd, vdu_delta["id"])
if vdu_profile and "min-number-of-instances" in vdu_profile:
min_instance_count = vdu_profile["min-number-of-instances"]
default_instance_num = get_number_of_instances(
db_vnfd, vdu_delta["id"]
)
instance_num = vdu_delta.get("number-of-instances", 1)
nb_scale_op -= instance_num
new_instance_count = nb_scale_op + default_instance_num
if new_instance_count < min_instance_count < vdu_count:
instances_number = min_instance_count - new_instance_count
else:
instances_number = instance_num
if new_instance_count < min_instance_count:
raise LcmException(
"reached the limit of {} (min-instance-count) scaling-in operations for the "
"scaling-group-descriptor '{}'".format(
nb_scale_op, scaling_group
)
)
for x in range(vdu_delta.get("number-of-instances", 1)):
vca_scaling_info.append(
{
"osm_vdu_id": vdu_delta["id"],
"member-vnf-index": vnf_index,
"type": "delete",
"vdu_index": vdu_index - 1 - x,
}
)
scaling_info["vdu-delete"][vdu_delta["id"]] = instances_number
for kdu_delta in delta.get("kdu-resource-delta", {}):
kdu_profile = get_kdu_resource_profile(db_vnfd, kdu_delta["id"])
kdu_name = kdu_profile["kdu-name"]
resource_name = kdu_profile.get("resource-name", "")
if not scaling_info["kdu-delete"].get(kdu_name, None):
scaling_info["kdu-delete"][kdu_name] = []
kdur = get_kdur(db_vnfr, kdu_name)
if kdur.get("helm-chart"):
k8s_cluster_type = "helm-chart-v3"
self.logger.debug("kdur: {}".format(kdur))
elif kdur.get("juju-bundle"):
k8s_cluster_type = "juju-bundle"
else:
raise LcmException(
"kdu type for kdu='{}.{}' is neither helm-chart nor "
"juju-bundle. Maybe an old NBI version is running".format(
db_vnfr["member-vnf-index-ref"], kdur["kdu-name"]
)
)
min_instance_count = 0
if kdu_profile and "min-number-of-instances" in kdu_profile:
min_instance_count = kdu_profile["min-number-of-instances"]
nb_scale_op -= kdu_delta.get("number-of-instances", 1)
deployed_kdu, _ = get_deployed_kdu(
nsr_deployed, kdu_name, vnf_index
)
if deployed_kdu is None:
raise LcmException(
"KDU '{}' for vnf '{}' not deployed".format(
kdu_name, vnf_index
)
)
kdu_instance = deployed_kdu.get("kdu-instance")
instance_num = await self.k8scluster_map[
k8s_cluster_type
].get_scale_count(
resource_name,
kdu_instance,
vca_id=vca_id,
cluster_uuid=deployed_kdu.get("k8scluster-uuid"),
kdu_model=deployed_kdu.get("kdu-model"),
)
kdu_replica_count = instance_num - kdu_delta.get(
"number-of-instances", 1
)
if kdu_replica_count < min_instance_count < instance_num:
kdu_replica_count = min_instance_count
if kdu_replica_count < min_instance_count:
raise LcmException(
"reached the limit of {} (min-instance-count) scaling-in operations for the "
"scaling-group-descriptor '{}'".format(
instance_num, scaling_group
)
)
for x in range(kdu_delta.get("number-of-instances", 1)):
vca_scaling_info.append(
{
"osm_kdu_id": kdu_name,
"member-vnf-index": vnf_index,
"type": "delete",
"kdu_index": instance_num - x - 1,
}
)
scaling_info["kdu-delete"][kdu_name].append(
{
"member-vnf-index": vnf_index,
"type": "delete",
"k8s-cluster-type": k8s_cluster_type,
"resource-name": resource_name,
"scale": kdu_replica_count,
}
)
# update VDU_SCALING_INFO with the VDUs to delete ip_addresses
vdu_delete = copy(scaling_info.get("vdu-delete"))
if scaling_info["scaling_direction"] == "IN":
for vdur in reversed(db_vnfr["vdur"]):
if vdu_delete.get(vdur["vdu-id-ref"]):
vdu_delete[vdur["vdu-id-ref"]] -= 1
scaling_info["vdu"].append(
{
"name": vdur.get("name") or vdur.get("vdu-name"),
"vdu_id": vdur["vdu-id-ref"],
"interface": [],
}
)
for interface in vdur["interfaces"]:
scaling_info["vdu"][-1]["interface"].append(
{
"name": interface["name"],
"ip_address": interface["ip-address"],
"mac_address": interface.get("mac-address"),
}
)
# vdu_delete = vdu_scaling_info.pop("vdu-delete")
# PRE-SCALE BEGIN
step = "Executing pre-scale vnf-config-primitive"
if scaling_descriptor.get("scaling-config-action"):
for scaling_config_action in scaling_descriptor[
"scaling-config-action"
]:
if (
scaling_config_action.get("trigger") == "pre-scale-in"
and scaling_type == "SCALE_IN"
) or (
scaling_config_action.get("trigger") == "pre-scale-out"
and scaling_type == "SCALE_OUT"
):
vnf_config_primitive = scaling_config_action[
"vnf-config-primitive-name-ref"
]
step = db_nslcmop_update[
"detailed-status"
] = "executing pre-scale scaling-config-action '{}'".format(
vnf_config_primitive
)
# look for primitive
for config_primitive in (
get_configuration(db_vnfd, db_vnfd["id"]) or {}
).get("config-primitive", ()):
if config_primitive["name"] == vnf_config_primitive:
break
else:
raise LcmException(
"Invalid vnfd descriptor at scaling-group-descriptor[name='{}']:scaling-config-action"
"[vnf-config-primitive-name-ref='{}'] does not match any vnf-configuration:config-"
"primitive".format(scaling_group, vnf_config_primitive)
)
vnfr_params = {"VDU_SCALE_INFO": scaling_info}
if db_vnfr.get("additionalParamsForVnf"):
vnfr_params.update(db_vnfr["additionalParamsForVnf"])
scale_process = "VCA"
db_nsr_update["config-status"] = "configuring pre-scaling"
primitive_params = self._map_primitive_params(
config_primitive, {}, vnfr_params
)
# Pre-scale retry check: Check if this sub-operation has been executed before
op_index = self._check_or_add_scale_suboperation(
db_nslcmop,
vnf_index,
vnf_config_primitive,
primitive_params,
"PRE-SCALE",
)
if op_index == self.SUBOPERATION_STATUS_SKIP:
# Skip sub-operation
result = "COMPLETED"
result_detail = "Done"
self.logger.debug(
logging_text
+ "vnf_config_primitive={} Skipped sub-operation, result {} {}".format(
vnf_config_primitive, result, result_detail
)
)
else:
if op_index == self.SUBOPERATION_STATUS_NEW:
# New sub-operation: Get index of this sub-operation
op_index = (
len(db_nslcmop.get("_admin", {}).get("operations"))
- 1
)
self.logger.debug(
logging_text
+ "vnf_config_primitive={} New sub-operation".format(
vnf_config_primitive
)
)
else:
# retry: Get registered params for this existing sub-operation
op = db_nslcmop.get("_admin", {}).get("operations", [])[
op_index
]
vnf_index = op.get("member_vnf_index")
vnf_config_primitive = op.get("primitive")
primitive_params = op.get("primitive_params")
self.logger.debug(
logging_text
+ "vnf_config_primitive={} Sub-operation retry".format(
vnf_config_primitive
)
)
# Execute the primitive, either with new (first-time) or registered (reintent) args
ee_descriptor_id = config_primitive.get(
"execution-environment-ref"
)
primitive_name = config_primitive.get(
"execution-environment-primitive", vnf_config_primitive
)
ee_id, vca_type = self._look_for_deployed_vca(
nsr_deployed["VCA"],
member_vnf_index=vnf_index,
vdu_id=None,
vdu_count_index=None,
ee_descriptor_id=ee_descriptor_id,
)
result, result_detail = await self._ns_execute_primitive(
ee_id,
primitive_name,
primitive_params,
vca_type=vca_type,
vca_id=vca_id,
)
self.logger.debug(
logging_text
+ "vnf_config_primitive={} Done with result {} {}".format(
vnf_config_primitive, result, result_detail
)
)
# Update operationState = COMPLETED | FAILED
self._update_suboperation_status(
db_nslcmop, op_index, result, result_detail
)
if result == "FAILED":
raise LcmException(result_detail)
db_nsr_update["config-status"] = old_config_status
scale_process = None
# PRE-SCALE END
db_nsr_update[
"_admin.scaling-group.{}.nb-scale-op".format(admin_scale_index)
] = nb_scale_op
db_nsr_update[
"_admin.scaling-group.{}.time".format(admin_scale_index)
] = time()
# SCALE-IN VCA - BEGIN
if vca_scaling_info:
step = db_nslcmop_update[
"detailed-status"
] = "Deleting the execution environments"
scale_process = "VCA"
for vca_info in vca_scaling_info:
if vca_info["type"] == "delete" and not vca_info.get("osm_kdu_id"):
member_vnf_index = str(vca_info["member-vnf-index"])
self.logger.debug(
logging_text + "vdu info: {}".format(vca_info)
)
if vca_info.get("osm_vdu_id"):
vdu_id = vca_info["osm_vdu_id"]
vdu_index = int(vca_info["vdu_index"])
stage[
1
] = "Scaling member_vnf_index={}, vdu_id={}, vdu_index={} ".format(
member_vnf_index, vdu_id, vdu_index
)
stage[2] = step = "Scaling in VCA"
self._write_op_status(op_id=nslcmop_id, stage=stage)
vca_update = db_nsr["_admin"]["deployed"]["VCA"]
config_update = db_nsr["configurationStatus"]
for vca_index, vca in enumerate(vca_update):
if (
(vca or vca.get("ee_id"))
and vca["member-vnf-index"] == member_vnf_index
and vca["vdu_count_index"] == vdu_index
):
if vca.get("vdu_id"):
config_descriptor = get_configuration(
db_vnfd, vca.get("vdu_id")
)
elif vca.get("kdu_name"):
config_descriptor = get_configuration(
db_vnfd, vca.get("kdu_name")
)
else:
config_descriptor = get_configuration(
db_vnfd, db_vnfd["id"]
)
operation_params = (
db_nslcmop.get("operationParams") or {}
)
exec_terminate_primitives = not operation_params.get(
"skip_terminate_primitives"
) and vca.get("needed_terminate")
task = asyncio.ensure_future(
asyncio.wait_for(
self.destroy_N2VC(
logging_text,
db_nslcmop,
vca,
config_descriptor,
vca_index,
destroy_ee=True,
exec_primitives=exec_terminate_primitives,
scaling_in=True,
vca_id=vca_id,
),
timeout=self.timeout.charm_delete,
)
)
tasks_dict_info[task] = "Terminating VCA {}".format(
vca.get("ee_id")
)
del vca_update[vca_index]
del config_update[vca_index]
# wait for pending tasks of terminate primitives
if tasks_dict_info:
self.logger.debug(
logging_text
+ "Waiting for tasks {}".format(
list(tasks_dict_info.keys())
)
)
error_list = await self._wait_for_tasks(
logging_text,
tasks_dict_info,
min(
self.timeout.charm_delete, self.timeout.ns_terminate
),
stage,
nslcmop_id,
)
tasks_dict_info.clear()
if error_list:
raise LcmException("; ".join(error_list))
db_vca_and_config_update = {
"_admin.deployed.VCA": vca_update,
"configurationStatus": config_update,
}
self.update_db_2(
"nsrs", db_nsr["_id"], db_vca_and_config_update
)
scale_process = None
# SCALE-IN VCA - END
# SCALE RO - BEGIN
if scaling_info.get("vdu-create") or scaling_info.get("vdu-delete"):
scale_process = "RO"
if self.ro_config.ng:
await self._scale_ng_ro(
logging_text, db_nsr, db_nslcmop, db_vnfr, scaling_info, stage
)
scaling_info.pop("vdu-create", None)
scaling_info.pop("vdu-delete", None)
scale_process = None
# SCALE RO - END
# SCALE KDU - BEGIN
if scaling_info.get("kdu-create") or scaling_info.get("kdu-delete"):
scale_process = "KDU"
await self._scale_kdu(
logging_text, nsr_id, nsr_deployed, db_vnfd, vca_id, scaling_info
)
scaling_info.pop("kdu-create", None)
scaling_info.pop("kdu-delete", None)
scale_process = None
# SCALE KDU - END
if db_nsr_update:
self.update_db_2("nsrs", nsr_id, db_nsr_update)
# SCALE-UP VCA - BEGIN
if vca_scaling_info:
step = db_nslcmop_update[
"detailed-status"
] = "Creating new execution environments"
scale_process = "VCA"
for vca_info in vca_scaling_info:
if vca_info["type"] == "create" and not vca_info.get("osm_kdu_id"):
member_vnf_index = str(vca_info["member-vnf-index"])
self.logger.debug(
logging_text + "vdu info: {}".format(vca_info)
)
vnfd_id = db_vnfr["vnfd-ref"]
if vca_info.get("osm_vdu_id"):
vdu_index = int(vca_info["vdu_index"])
deploy_params = {"OSM": get_osm_params(db_vnfr)}
if db_vnfr.get("additionalParamsForVnf"):
deploy_params.update(
parse_yaml_strings(
db_vnfr["additionalParamsForVnf"].copy()
)
)
descriptor_config = get_configuration(
db_vnfd, db_vnfd["id"]
)
if descriptor_config:
vdu_id = None
vdu_name = None
kdu_name = None
kdu_index = None
self._deploy_n2vc(
logging_text=logging_text
+ "member_vnf_index={} ".format(member_vnf_index),
db_nsr=db_nsr,
db_vnfr=db_vnfr,
nslcmop_id=nslcmop_id,
nsr_id=nsr_id,
nsi_id=nsi_id,
vnfd_id=vnfd_id,
vdu_id=vdu_id,
kdu_name=kdu_name,
kdu_index=kdu_index,
member_vnf_index=member_vnf_index,
vdu_index=vdu_index,
vdu_name=vdu_name,
deploy_params=deploy_params,
descriptor_config=descriptor_config,
base_folder=base_folder,
task_instantiation_info=tasks_dict_info,
stage=stage,
)
vdu_id = vca_info["osm_vdu_id"]
vdur = find_in_list(
db_vnfr["vdur"], lambda vdu: vdu["vdu-id-ref"] == vdu_id
)
descriptor_config = get_configuration(db_vnfd, vdu_id)
if vdur.get("additionalParams"):
deploy_params_vdu = parse_yaml_strings(
vdur["additionalParams"]
)
else:
deploy_params_vdu = deploy_params
deploy_params_vdu["OSM"] = get_osm_params(
db_vnfr, vdu_id, vdu_count_index=vdu_index
)
if descriptor_config:
vdu_name = None
kdu_name = None
kdu_index = None
stage[
1
] = "Scaling member_vnf_index={}, vdu_id={}, vdu_index={} ".format(
member_vnf_index, vdu_id, vdu_index
)
stage[2] = step = "Scaling out VCA"
self._write_op_status(op_id=nslcmop_id, stage=stage)
self._deploy_n2vc(
logging_text=logging_text
+ "member_vnf_index={}, vdu_id={}, vdu_index={} ".format(
member_vnf_index, vdu_id, vdu_index
),
db_nsr=db_nsr,
db_vnfr=db_vnfr,
nslcmop_id=nslcmop_id,
nsr_id=nsr_id,
nsi_id=nsi_id,
vnfd_id=vnfd_id,
vdu_id=vdu_id,
kdu_name=kdu_name,
member_vnf_index=member_vnf_index,
vdu_index=vdu_index,
kdu_index=kdu_index,
vdu_name=vdu_name,
deploy_params=deploy_params_vdu,
descriptor_config=descriptor_config,
base_folder=base_folder,
task_instantiation_info=tasks_dict_info,
stage=stage,
)
# SCALE-UP VCA - END
scale_process = None
# POST-SCALE BEGIN
# execute primitive service POST-SCALING
step = "Executing post-scale vnf-config-primitive"
if scaling_descriptor.get("scaling-config-action"):
for scaling_config_action in scaling_descriptor[
"scaling-config-action"
]:
if (
scaling_config_action.get("trigger") == "post-scale-in"
and scaling_type == "SCALE_IN"
) or (
scaling_config_action.get("trigger") == "post-scale-out"
and scaling_type == "SCALE_OUT"
):
vnf_config_primitive = scaling_config_action[
"vnf-config-primitive-name-ref"
]
step = db_nslcmop_update[
"detailed-status"
] = "executing post-scale scaling-config-action '{}'".format(
vnf_config_primitive
)
vnfr_params = {"VDU_SCALE_INFO": scaling_info}
if db_vnfr.get("additionalParamsForVnf"):
vnfr_params.update(db_vnfr["additionalParamsForVnf"])
# look for primitive
for config_primitive in (
get_configuration(db_vnfd, db_vnfd["id"]) or {}
).get("config-primitive", ()):
if config_primitive["name"] == vnf_config_primitive:
break
else:
raise LcmException(
"Invalid vnfd descriptor at scaling-group-descriptor[name='{}']:scaling-config-"
"action[vnf-config-primitive-name-ref='{}'] does not match any vnf-configuration:"
"config-primitive".format(
scaling_group, vnf_config_primitive
)
)
scale_process = "VCA"
db_nsr_update["config-status"] = "configuring post-scaling"
primitive_params = self._map_primitive_params(
config_primitive, {}, vnfr_params
)
# Post-scale retry check: Check if this sub-operation has been executed before
op_index = self._check_or_add_scale_suboperation(
db_nslcmop,
vnf_index,
vnf_config_primitive,
primitive_params,
"POST-SCALE",
)
if op_index == self.SUBOPERATION_STATUS_SKIP:
# Skip sub-operation
result = "COMPLETED"
result_detail = "Done"
self.logger.debug(
logging_text
+ "vnf_config_primitive={} Skipped sub-operation, result {} {}".format(
vnf_config_primitive, result, result_detail
)
)
else:
if op_index == self.SUBOPERATION_STATUS_NEW:
# New sub-operation: Get index of this sub-operation
op_index = (
len(db_nslcmop.get("_admin", {}).get("operations"))
- 1
)
self.logger.debug(
logging_text
+ "vnf_config_primitive={} New sub-operation".format(
vnf_config_primitive
)
)
else:
# retry: Get registered params for this existing sub-operation
op = db_nslcmop.get("_admin", {}).get("operations", [])[
op_index
]
vnf_index = op.get("member_vnf_index")
vnf_config_primitive = op.get("primitive")
primitive_params = op.get("primitive_params")
self.logger.debug(
logging_text
+ "vnf_config_primitive={} Sub-operation retry".format(
vnf_config_primitive
)
)
# Execute the primitive, either with new (first-time) or registered (reintent) args
ee_descriptor_id = config_primitive.get(
"execution-environment-ref"
)
primitive_name = config_primitive.get(
"execution-environment-primitive", vnf_config_primitive
)
ee_id, vca_type = self._look_for_deployed_vca(
nsr_deployed["VCA"],
member_vnf_index=vnf_index,
vdu_id=None,
vdu_count_index=None,
ee_descriptor_id=ee_descriptor_id,
)
result, result_detail = await self._ns_execute_primitive(
ee_id,
primitive_name,
primitive_params,
vca_type=vca_type,
vca_id=vca_id,
)
self.logger.debug(
logging_text
+ "vnf_config_primitive={} Done with result {} {}".format(
vnf_config_primitive, result, result_detail
)
)
# Update operationState = COMPLETED | FAILED
self._update_suboperation_status(
db_nslcmop, op_index, result, result_detail
)
if result == "FAILED":
raise LcmException(result_detail)
db_nsr_update["config-status"] = old_config_status
scale_process = None
# POST-SCALE END
# Check if each vnf has exporter for metric collection if so update prometheus job records
if scaling_type == "SCALE_OUT" and bool(self.service_kpi.old_sa):
if "exporters-endpoints" in db_vnfd.get("df")[0]:
vnfr_id = db_vnfr["id"]
db_vnfr = self.db.get_one("vnfrs", {"_id": vnfr_id})
exporter_config = db_vnfd.get("df")[0].get("exporters-endpoints")
self.logger.debug("exporter config :{}".format(exporter_config))
artifact_path = "{}/{}/{}".format(
base_folder["folder"],
base_folder["pkg-dir"],
"exporter-endpoint",
)
ee_id = None
ee_config_descriptor = exporter_config
rw_mgmt_ip = await self.wait_vm_up_insert_key_ro(
logging_text,
nsr_id,
vnfr_id,
vdu_id=db_vnfr["vdur"][-1]["vdu-id-ref"],
vdu_index=db_vnfr["vdur"][-1]["count-index"],
user=None,
pub_key=None,
)
self.logger.debug("rw_mgmt_ip:{}".format(rw_mgmt_ip))
self.logger.debug("Artifact_path:{}".format(artifact_path))
vdu_id_for_prom = None
vdu_index_for_prom = None
for x in get_iterable(db_vnfr, "vdur"):
vdu_id_for_prom = x.get("vdu-id-ref")
vdu_index_for_prom = x.get("count-index")
vnfr_id = vnfr_id + vdu_id + str(vdu_index)
vnfr_id = vnfr_id.replace("_", "")
prometheus_jobs = await self.extract_prometheus_scrape_jobs(
ee_id=ee_id,
artifact_path=artifact_path,
ee_config_descriptor=ee_config_descriptor,
vnfr_id=vnfr_id,
nsr_id=nsr_id,
target_ip=rw_mgmt_ip,
element_type="VDU",
vdu_id=vdu_id_for_prom,
vdu_index=vdu_index_for_prom,
)
self.logger.debug("Prometheus job:{}".format(prometheus_jobs))
if prometheus_jobs:
db_nsr_update[
"_admin.deployed.prometheus_jobs"
] = prometheus_jobs
self.update_db_2(
"nsrs",
nsr_id,
db_nsr_update,
)
for job in prometheus_jobs:
self.db.set_one(
"prometheus_jobs",
{"job_name": ""},
job,
upsert=True,
fail_on_empty=False,
)
db_nsr_update[
"detailed-status"
] = "" # "scaled {} {}".format(scaling_group, scaling_type)
db_nsr_update["operational-status"] = (
"running"
if old_operational_status == "failed"
else old_operational_status
)
db_nsr_update["config-status"] = old_config_status
return
except (
ROclient.ROClientException,
DbException,
LcmException,
NgRoException,
) as e:
self.logger.error(logging_text + "Exit Exception {}".format(e))
exc = e
except asyncio.CancelledError:
self.logger.error(
logging_text + "Cancelled Exception while '{}'".format(step)
)
exc = "Operation was cancelled"
except Exception as e:
exc = traceback.format_exc()
self.logger.critical(
logging_text + "Exit Exception {} {}".format(type(e).__name__, e),
exc_info=True,
)
finally:
error_list = list()
if exc:
error_list.append(str(exc))
self._write_ns_status(
nsr_id=nsr_id,
ns_state=None,
current_operation="IDLE",
current_operation_id=None,
)
try:
if tasks_dict_info:
stage[1] = "Waiting for instantiate pending tasks."
self.logger.debug(logging_text + stage[1])
exc = await self._wait_for_tasks(
logging_text,
tasks_dict_info,
self.timeout.ns_deploy,
stage,
nslcmop_id,
nsr_id=nsr_id,
)
except asyncio.CancelledError:
error_list.append("Cancelled")
await self._cancel_pending_tasks(logging_text, tasks_dict_info)
await self._wait_for_tasks(
logging_text,
tasks_dict_info,
self.timeout.ns_deploy,
stage,
nslcmop_id,
nsr_id=nsr_id,
)
if error_list:
error_detail = "; ".join(error_list)
db_nslcmop_update[
"detailed-status"
] = error_description_nslcmop = "FAILED {}: {}".format(
step, error_detail
)
nslcmop_operation_state = "FAILED"
if db_nsr:
db_nsr_update["operational-status"] = old_operational_status
db_nsr_update["config-status"] = old_config_status
db_nsr_update["detailed-status"] = ""
if scale_process:
if "VCA" in scale_process:
db_nsr_update["config-status"] = "failed"
if "RO" in scale_process:
db_nsr_update["operational-status"] = "failed"
db_nsr_update[
"detailed-status"
] = "FAILED scaling nslcmop={} {}: {}".format(
nslcmop_id, step, error_detail
)
else:
error_description_nslcmop = None
nslcmop_operation_state = "COMPLETED"
db_nslcmop_update["detailed-status"] = "Done"
if scaling_type == "SCALE_IN" and prom_job_name is not None:
self.db.del_one(
"prometheus_jobs",
{"job_name": prom_job_name},
fail_on_empty=False,
)
self._write_op_status(
op_id=nslcmop_id,
stage="",
error_message=error_description_nslcmop,
operation_state=nslcmop_operation_state,
other_update=db_nslcmop_update,
)
if db_nsr:
self._write_ns_status(
nsr_id=nsr_id,
ns_state=None,
current_operation="IDLE",
current_operation_id=None,
other_update=db_nsr_update,
)
if nslcmop_operation_state:
try:
msg = {
"nsr_id": nsr_id,
"nslcmop_id": nslcmop_id,
"operationState": nslcmop_operation_state,
}
await self.msg.aiowrite("ns", "scaled", msg)
except Exception as e:
self.logger.error(
logging_text + "kafka_write notification Exception {}".format(e)
)
self.logger.debug(logging_text + "Exit")
self.lcm_tasks.remove("ns", nsr_id, nslcmop_id, "ns_scale")
async def _scale_kdu(
    self, logging_text, nsr_id, nsr_deployed, db_vnfd, vca_id, scaling_info
):
    """Scale the KDUs (K8s applications) of a VNF up or down.

    :param logging_text: prefix for every log line of this task
    :param nsr_id: NS record _id, used to address the "nsrs" document
    :param nsr_deployed: "_admin.deployed" section of the NS record
    :param db_vnfd: VNF descriptor owning the KDUs to scale
    :param vca_id: VCA id passed through to the K8s connector calls
    :param scaling_info: dict holding a "kdu-create" or "kdu-delete" entry
        that maps each kdu_name to a list of per-KDU scaling requests
    :raises asyncio.TimeoutError: when a config primitive or the scale
        operation itself exceeds its outer timeout
    """
    # Only one of the two keys is processed per call; if both were present,
    # "kdu-create" would win because of the short-circuit "or".
    _scaling_info = scaling_info.get("kdu-create") or scaling_info.get("kdu-delete")
    for kdu_name in _scaling_info:
        for kdu_scaling_info in _scaling_info[kdu_name]:
            # locate the deployed KDU record (and its index inside
            # _admin.deployed.K8s) for this member-vnf-index
            deployed_kdu, index = get_deployed_kdu(
                nsr_deployed, kdu_name, kdu_scaling_info["member-vnf-index"]
            )
            cluster_uuid = deployed_kdu["k8scluster-uuid"]
            kdu_instance = deployed_kdu["kdu-instance"]
            kdu_model = deployed_kdu.get("kdu-model")
            scale = int(kdu_scaling_info["scale"])
            k8s_cluster_type = kdu_scaling_info["k8s-cluster-type"]
            # database path where the K8s connector writes detailed status
            db_dict = {
                "collection": "nsrs",
                "filter": {"_id": nsr_id},
                "path": "_admin.deployed.K8s.{}".format(index),
            }
            step = "scaling application {}".format(
                kdu_scaling_info["resource-name"]
            )
            self.logger.debug(logging_text + step)
            if kdu_scaling_info["type"] == "delete":
                kdu_config = get_configuration(db_vnfd, kdu_name)
                # Run terminate config primitives BEFORE scaling in, but only
                # when they are not delegated to a juju execution environment.
                if (
                    kdu_config
                    and kdu_config.get("terminate-config-primitive")
                    and get_juju_ee_ref(db_vnfd, kdu_name) is None
                ):
                    terminate_config_primitive_list = kdu_config.get(
                        "terminate-config-primitive"
                    )
                    # primitives are executed in ascending "seq" order
                    terminate_config_primitive_list.sort(
                        key=lambda val: int(val["seq"])
                    )
                    for (
                        terminate_config_primitive
                    ) in terminate_config_primitive_list:
                        primitive_params_ = self._map_primitive_params(
                            terminate_config_primitive, {}, {}
                        )
                        step = "execute terminate config primitive"
                        self.logger.debug(logging_text + step)
                        # outer wait_for guards against a hung connector call
                        await asyncio.wait_for(
                            self.k8scluster_map[k8s_cluster_type].exec_primitive(
                                cluster_uuid=cluster_uuid,
                                kdu_instance=kdu_instance,
                                primitive_name=terminate_config_primitive["name"],
                                params=primitive_params_,
                                db_dict=db_dict,
                                total_timeout=self.timeout.primitive,
                                vca_id=vca_id,
                            ),
                            timeout=self.timeout.primitive
                            * self.timeout.primitive_outer_factor,
                        )
            # Perform the scale itself; this runs for both "create" and
            # "delete" requests.
            await asyncio.wait_for(
                self.k8scluster_map[k8s_cluster_type].scale(
                    kdu_instance=kdu_instance,
                    scale=scale,
                    resource_name=kdu_scaling_info["resource-name"],
                    total_timeout=self.timeout.scale_on_error,
                    vca_id=vca_id,
                    cluster_uuid=cluster_uuid,
                    kdu_model=kdu_model,
                    atomic=True,
                    db_dict=db_dict,
                ),
                timeout=self.timeout.scale_on_error
                * self.timeout.scale_on_error_outer_factor,
            )
            if kdu_scaling_info["type"] == "create":
                kdu_config = get_configuration(db_vnfd, kdu_name)
                # Run initial config primitives AFTER scaling out, again only
                # when not handled by a juju execution environment.
                if (
                    kdu_config
                    and kdu_config.get("initial-config-primitive")
                    and get_juju_ee_ref(db_vnfd, kdu_name) is None
                ):
                    initial_config_primitive_list = kdu_config.get(
                        "initial-config-primitive"
                    )
                    # primitives are executed in ascending "seq" order
                    initial_config_primitive_list.sort(
                        key=lambda val: int(val["seq"])
                    )
                    for initial_config_primitive in initial_config_primitive_list:
                        primitive_params_ = self._map_primitive_params(
                            initial_config_primitive, {}, {}
                        )
                        step = "execute initial config primitive"
                        self.logger.debug(logging_text + step)
                        await asyncio.wait_for(
                            self.k8scluster_map[k8s_cluster_type].exec_primitive(
                                cluster_uuid=cluster_uuid,
                                kdu_instance=kdu_instance,
                                primitive_name=initial_config_primitive["name"],
                                params=primitive_params_,
                                db_dict=db_dict,
                                vca_id=vca_id,
                            ),
                            # NOTE(review): hard-coded 600 s here, unlike the
                            # configurable timeout used for terminate — confirm
                            timeout=600,
                        )
async def _scale_ng_ro(
    self, logging_text, db_nsr, db_nslcmop, db_vnfr, vdu_scaling_info, stage
):
    """Carry out a VDU scaling request through NG-RO.

    Marks the VDU records to create/delete in db_vnfr, asks NG-RO to
    converge the NS to the new deployment, and finally removes the VDU
    records that were marked for deletion.
    """
    nsr_id = db_nslcmop["nsInstanceId"]
    db_nsd = self.db.get_one("nsds", {"_id": db_nsr["nsd-id"]})
    # Gather every vnfr belonging to this NS, plus each distinct vnfd that
    # those vnfrs reference.
    db_vnfrs = {}
    db_vnfds = []
    for vnfr in self.db.get_list("vnfrs", {"nsr-id-ref": nsr_id}):
        db_vnfrs[vnfr["member-vnf-index-ref"]] = vnfr
        vnfd_id = vnfr["vnfd-id"]  # vnfd uuid for this vnf
        # NOTE(review): dedup compares the descriptor "id" field against the
        # vnfd-id reference used for the "_id" lookup — confirm both carry
        # the same identifier.
        already_loaded = find_in_list(
            db_vnfds, lambda a_vnfd: a_vnfd["id"] == vnfd_id
        )
        if not already_loaded:
            db_vnfds.append(self.db.get_one("vnfds", {"_id": vnfd_id}))
    n2vc_key_list = [self.n2vc.get_public_key()]
    # Flag the vdur records to be created/deleted inside the vnfr.
    self.scale_vnfr(
        db_vnfr,
        vdu_scaling_info.get("vdu-create"),
        vdu_scaling_info.get("vdu-delete"),
        mark_delete=True,
    )
    # db_vnfr has just been mutated; make the map point at this fresh copy.
    db_vnfrs[db_vnfr["member-vnf-index-ref"]] = db_vnfr
    await self._instantiate_ng_ro(
        logging_text,
        nsr_id,
        db_nsd,
        db_nsr,
        db_nslcmop,
        db_vnfrs,
        db_vnfds,
        n2vc_key_list,
        stage=stage,
        start_deploy=time(),
        timeout_ns_deploy=self.timeout.ns_deploy,
    )
    if vdu_scaling_info.get("vdu-delete"):
        # RO is done: actually drop the vdur records marked above.
        self.scale_vnfr(
            db_vnfr, None, vdu_scaling_info["vdu-delete"], mark_delete=False
        )
async def extract_prometheus_scrape_jobs(
    self,
    ee_id: str,
    artifact_path: str,
    ee_config_descriptor: dict,
    vnfr_id: str,
    nsr_id: str,
    target_ip: str,
    element_type: str,
    vnf_member_index: str = "",
    vdu_id: str = "",
    vdu_index: int = None,
    kdu_name: str = "",
    kdu_index: int = None,
) -> dict:
    """Method to extract prometheus scrape jobs from EE's Prometheus template job file

    This method will wait until the corresponding VDU or KDU is fully instantiated.

    Args:
        ee_id (str): Execution Environment ID
        artifact_path (str): Path where the EE's content is (including the Prometheus template file)
        ee_config_descriptor (dict): Execution Environment's configuration descriptor
        vnfr_id (str): VNFR ID where this EE applies
        nsr_id (str): NSR ID where this EE applies
        target_ip (str): VDU/KDU instance IP address
        element_type (str): NS or VNF or VDU or KDU
        vnf_member_index (str, optional): VNF index where this EE applies. Defaults to "".
        vdu_id (str, optional): VDU ID where this EE applies. Defaults to "".
        vdu_index (int, optional): VDU index where this EE applies. Defaults to None.
        kdu_name (str, optional): KDU name where this EE applies. Defaults to "".
        kdu_index (int, optional): KDU index where this EE applies. Defaults to None.

    Raises:
        LcmException: When the VDU or KDU instance was not found in an hour

    Returns:
        _type_: Prometheus jobs (a list of job dicts), or None when the EE
        artifacts contain no Prometheus job template
    """
    # default the vdur and kdur names to an empty string, to avoid any later
    # problem with Prometheus when the element type is not VDU or KDU
    vdur_name = ""
    kdur_name = ""
    # look if exist a file called 'prometheus*.j2' among the EE artifacts;
    # if none is found there is nothing to extract
    artifact_content = self.fs.dir_ls(artifact_path)
    job_file = next(
        (
            f
            for f in artifact_content
            if f.startswith("prometheus") and f.endswith(".j2")
        ),
        None,
    )
    if not job_file:
        return
    self.logger.debug("Artifact path{}".format(artifact_path))
    self.logger.debug("job file{}".format(job_file))
    # read the Jinja2 job template as plain text
    with self.fs.file_open((artifact_path, job_file), "r") as f:
        job_data = f.read()
    # obtain the VDUR or KDUR, if the element type is VDU or KDU:
    # poll the VNFR up to 360 times with a 10 s sleep (~1 hour) until the
    # target instance gets a name, i.e. is actually instantiated
    if element_type in ("VDU", "KDU"):
        for _ in range(360):
            db_vnfr = self.db.get_one("vnfrs", {"_id": vnfr_id})
            if vdu_id and vdu_index is not None:
                vdur = next(
                    (
                        x
                        for x in get_iterable(db_vnfr, "vdur")
                        if (
                            x.get("vdu-id-ref") == vdu_id
                            and x.get("count-index") == vdu_index
                        )
                    ),
                    {},
                )
                if vdur.get("name"):
                    vdur_name = vdur.get("name")
                    break
            if kdu_name and kdu_index is not None:
                kdur = next(
                    (
                        x
                        for x in get_iterable(db_vnfr, "kdur")
                        if (
                            x.get("kdu-name") == kdu_name
                            and x.get("count-index") == kdu_index
                        )
                    ),
                    {},
                )
                if kdur.get("name"):
                    kdur_name = kdur.get("name")
                    break
            await asyncio.sleep(10)
        else:
            # for-else: the loop exhausted all retries without a break,
            # meaning the instance never showed up
            if vdu_id and vdu_index is not None:
                raise LcmException(
                    f"Timeout waiting VDU with name={vdu_id} and index={vdu_index} to be intantiated"
                )
            if kdu_name and kdu_index is not None:
                raise LcmException(
                    f"Timeout waiting KDU with name={kdu_name} and index={kdu_index} to be intantiated"
                )
    # With an EE id, metrics are scraped from the EE's exporter service;
    # otherwise they are scraped directly from the instance IP using the
    # metric-path/metric-port declared in the EE configuration descriptor.
    if ee_id is not None:
        _, namespace, helm_id = get_ee_id_parts(
            ee_id
        )  # get namespace and EE gRPC service name
        # assumes the descriptor carries a "metric-service" entry — the
        # resulting name follows the k8s svc_name.namespace.svc pattern
        host_name = f'{helm_id}-{ee_config_descriptor["metric-service"]}.{namespace}.svc'  # svc_name.namespace.svc
        host_port = "80"
        vnfr_id = vnfr_id.replace("-", "")
        variables = {
            "JOB_NAME": vnfr_id,
            "TARGET_IP": target_ip,
            "EXPORTER_POD_IP": host_name,
            "EXPORTER_POD_PORT": host_port,
            "NSR_ID": nsr_id,
            "VNF_MEMBER_INDEX": vnf_member_index,
            "VDUR_NAME": vdur_name,
            "KDUR_NAME": kdur_name,
            "ELEMENT_TYPE": element_type,
        }
    else:
        metric_path = ee_config_descriptor["metric-path"]
        target_port = ee_config_descriptor["metric-port"]
        vnfr_id = vnfr_id.replace("-", "")
        variables = {
            "JOB_NAME": vnfr_id,
            "TARGET_IP": target_ip,
            "TARGET_PORT": target_port,
            "METRIC_PATH": metric_path,
        }
    # render the template into a list of Prometheus job dicts
    job_list = parse_job(job_data, variables)
    # ensure job_name is using the vnfr_id. Adding the metadata nsr_id
    for job in job_list:
        if (
            not isinstance(job.get("job_name"), str)
            or vnfr_id not in job["job_name"]
        ):
            # random suffix keeps job names unique across repeated scrapes
            job["job_name"] = vnfr_id + "_" + str(SystemRandom().randint(1, 10000))
        job["nsr_id"] = nsr_id
        job["vnfr_id"] = vnfr_id
    return job_list
async def rebuild_start_stop(
    self, nsr_id, nslcmop_id, vnf_id, additional_param, operation_type
):
    """Run a start/stop/rebuild operation on a single VDU through RO.

    Looks up the target VDU record in the VNFR, builds the RO payload
    keyed by operation_type, sends it via self.RO.operate and waits for
    the resulting RO action to finish.

    Returns a ("COMPLETED", "Done") tuple on success, or
    ("FAILED", <detail>) when any step raised.
    """
    logging_text = "Task ns={} {}={} ".format(nsr_id, operation_type, nslcmop_id)
    self.logger.info(logging_text + "Enter")
    stage = ["Preparing the environment", ""]
    db_nsr_update = {}
    vdu_vim_name = None
    vim_vm_id = None
    # reference instant for the RO polling timeout
    start_deploy = time()
    try:
        # locate the exact VDU record (by vdu id and count-index)
        db_vnfr = self.db.get_one("vnfrs", {"_id": vnf_id})
        vim_account_id = db_vnfr.get("vim-account-id")
        vim_info_key = "vim:" + vim_account_id
        vdu_id = additional_param["vdu_id"]
        count_index = additional_param["count-index"]
        candidates = [
            entry for entry in db_vnfr["vdur"] if entry["vdu-id-ref"] == vdu_id
        ]
        vdur = find_in_list(
            candidates, lambda entry: entry["count-index"] == count_index
        )
        if not vdur:
            raise LcmException("Target vdu is not found")
        vdu_vim_name = vdur["name"]
        vim_vm_id = vdur["vim_info"][vim_info_key]["vim_id"]
        # first key of vim_info identifies the target VIM
        target_vim = next(iter(vdur["vim_info"]))
        self.logger.info("vdu_vim_name >> {} ".format(vdu_vim_name))
        # wait for any previous tasks in process
        stage[1] = "Waiting for previous operations to terminate"
        self.logger.info(stage[1])
        await self.lcm_tasks.waitfor_related_HA("ns", "nslcmops", nslcmop_id)
        stage[1] = "Reading from database."
        self.logger.info(stage[1])
        self._write_ns_status(
            nsr_id=nsr_id,
            ns_state=None,
            current_operation=operation_type.upper(),
            current_operation_id=nslcmop_id,
        )
        self._write_op_status(op_id=nslcmop_id, stage=stage, queuePosition=0)
        # reflect the ongoing operation in the NS record
        stage[1] = "Getting nsr={} from db.".format(nsr_id)
        db_nsr_update["operational-status"] = operation_type
        self.update_db_2("nsrs", nsr_id, db_nsr_update)
        # RO payload, keyed by the operation being requested
        desc = {
            operation_type: {
                "vim_vm_id": vim_vm_id,
                "vnf_id": vnf_id,
                "vdu_index": count_index,
                "vdu_id": vdur["id"],
                "target_vim": target_vim,
                "vim_account_id": vim_account_id,
            }
        }
        stage[1] = "Sending rebuild request to RO... {}".format(desc)
        self._write_op_status(op_id=nslcmop_id, stage=stage, queuePosition=0)
        self.logger.info("ro nsr id: {}".format(nsr_id))
        result_dict = await self.RO.operate(nsr_id, desc, operation_type)
        self.logger.info("response from RO: {}".format(result_dict))
        await self._wait_ng_ro(
            nsr_id,
            result_dict["action_id"],
            nslcmop_id,
            start_deploy,
            self.timeout.operate,
            None,
            "start_stop_rebuild",
        )
        return "COMPLETED", "Done"
    except (ROclient.ROClientException, DbException, LcmException) as e:
        self.logger.error("Exit Exception {}".format(e))
        exc = e
    except asyncio.CancelledError:
        self.logger.error("Cancelled Exception while '{}'".format(stage))
        exc = "Operation was cancelled"
    except Exception as e:
        exc = traceback.format_exc()
        self.logger.critical(
            "Exit Exception {} {}".format(type(e).__name__, e), exc_info=True
        )
    # reached only when one of the handlers above captured an error
    return "FAILED", "Error in operate VNF {}".format(exc)
async def migrate(self, nsr_id, nslcmop_id):
    """
    Migrate VNFs and VDUs instances in a NS

    :param: nsr_id: NS Instance ID
    :param: nslcmop_id: nslcmop ID of migrate

    The migration target comes from the operation's "operationParams":
    a "migrateToHost" entry is delegated to RO (VM migration), while a
    "targetHostK8sLabels" entry is delegated to the helm3 K8s connector.
    """
    # Try to lock HA task here
    task_is_locked_by_me = self.lcm_tasks.lock_HA("ns", "nslcmops", nslcmop_id)
    if not task_is_locked_by_me:
        return
    logging_text = "Task ns={} migrate ".format(nsr_id)
    self.logger.debug(logging_text + "Enter")
    # get all needed from database
    db_nslcmop = None
    db_nslcmop_update = {}
    nslcmop_operation_state = None
    db_nsr_update = {}
    target = {}
    exc = None
    # reference instant for the RO polling timeout
    start_deploy = time()
    try:
        # wait for any previous tasks in process
        step = "Waiting for previous operations to terminate"
        await self.lcm_tasks.waitfor_related_HA("ns", "nslcmops", nslcmop_id)
        self._write_ns_status(
            nsr_id=nsr_id,
            ns_state=None,
            current_operation="MIGRATING",
            current_operation_id=nslcmop_id,
        )
        step = "Getting nslcmop from database"
        self.logger.debug(
            step + " after having waited for previous tasks to be completed"
        )
        db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})
        # operationParams may be missing/None; default to an empty target
        # instead of failing with TypeError on target.update(None)
        migrate_params = db_nslcmop.get("operationParams") or {}
        target.update(migrate_params)
        if "migrateToHost" in target:
            # VM migration is handled by RO; wait for the RO action to end
            desc = await self.RO.migrate(nsr_id, target)
            self.logger.debug("RO return > {}".format(desc))
            action_id = desc["action_id"]
            await self._wait_ng_ro(
                nsr_id,
                action_id,
                nslcmop_id,
                start_deploy,
                self.timeout.migrate,
                operation="migrate",
            )
        elif "targetHostK8sLabels" in target:
            # KDU migration is handled by the helm3 K8s connector
            await self.k8sclusterhelm3.migrate(nsr_id, target)
    except (ROclient.ROClientException, DbException, LcmException) as e:
        self.logger.error("Exit Exception {}".format(e))
        exc = e
    except asyncio.CancelledError:
        self.logger.error("Cancelled Exception while '{}'".format(step))
        exc = "Operation was cancelled"
    except Exception as e:
        exc = traceback.format_exc()
        self.logger.critical(
            "Exit Exception {} {}".format(type(e).__name__, e), exc_info=True
        )
    finally:
        self._write_ns_status(
            nsr_id=nsr_id,
            ns_state=None,
            current_operation="IDLE",
            current_operation_id=None,
        )
        if exc:
            db_nslcmop_update["detailed-status"] = "FAILED {}: {}".format(step, exc)
            nslcmop_operation_state = "FAILED"
        else:
            nslcmop_operation_state = "COMPLETED"
            db_nslcmop_update["detailed-status"] = "Done"
            db_nsr_update["detailed-status"] = "Done"
        # Persist the NSR status update: db_nsr_update used to be populated
        # here but never written to the database (dead store)
        if db_nsr_update:
            self.update_db_2("nsrs", nsr_id, db_nsr_update)
        self._write_op_status(
            op_id=nslcmop_id,
            stage="",
            error_message="",
            operation_state=nslcmop_operation_state,
            other_update=db_nslcmop_update,
        )
        if nslcmop_operation_state:
            try:
                # notify subscribers (e.g. NBI) about the finished operation
                msg = {
                    "nsr_id": nsr_id,
                    "nslcmop_id": nslcmop_id,
                    "operationState": nslcmop_operation_state,
                }
                await self.msg.aiowrite("ns", "migrated", msg)
            except Exception as e:
                self.logger.error(
                    logging_text + "kafka_write notification Exception {}".format(e)
                )
        self.logger.debug(logging_text + "Exit")
        self.lcm_tasks.remove("ns", nsr_id, nslcmop_id, "ns_migrate")
async def heal(self, nsr_id, nslcmop_id):
    """
    Heal a NS instance: re-create the failed VDUs at the VIM through RO and
    then re-deploy/re-configure the affected execution environments (charms).

    :param nsr_id: ns instance to heal
    :param nslcmop_id: operation to run
    :return: None (result is written to the nslcmop/nsr records and kafka)
    """
    # Try to lock HA task here
    task_is_locked_by_me = self.lcm_tasks.lock_HA("ns", "nslcmops", nslcmop_id)
    if not task_is_locked_by_me:
        return
    logging_text = "Task ns={} heal={} ".format(nsr_id, nslcmop_id)
    stage = ["", "", ""]
    tasks_dict_info = {}
    # ^ stage, step, VIM progress
    self.logger.debug(logging_text + "Enter")
    # get all needed from database
    db_nsr = None
    db_nslcmop_update = {}
    db_nsr_update = {}
    db_vnfrs = {}  # vnf's info indexed by _id
    # Fix: initialize these before the try block; the finally clause reads
    # them, and an early exception (e.g. while waiting for previous
    # operations) used to raise NameError there.
    db_vnfrs_list = []
    target_list = []
    exc = None
    old_operational_status = ""
    old_config_status = ""
    nsi_id = None
    try:
        # wait for any previous tasks in process
        step = "Waiting for previous operations to terminate"
        await self.lcm_tasks.waitfor_related_HA("ns", "nslcmops", nslcmop_id)
        self._write_ns_status(
            nsr_id=nsr_id,
            ns_state=None,
            current_operation="HEALING",
            current_operation_id=nslcmop_id,
        )
        step = "Getting nslcmop from database"
        self.logger.debug(
            step + " after having waited for previous tasks to be completed"
        )
        db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})
        step = "Getting nsr from database"
        db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
        old_operational_status = db_nsr["operational-status"]
        old_config_status = db_nsr["config-status"]
        db_nsr_update = {
            "operational-status": "healing",
            "_admin.deployed.RO.operational-status": "healing",
        }
        self.update_db_2("nsrs", nsr_id, db_nsr_update)
        step = "Sending heal order to VIM"
        await self.heal_RO(
            logging_text=logging_text,
            nsr_id=nsr_id,
            db_nslcmop=db_nslcmop,
            stage=stage,
        )
        # VCA tasks
        # read from db: nsd
        stage[1] = "Getting nsd={} from db.".format(db_nsr["nsd-id"])
        self.logger.debug(logging_text + stage[1])
        nsd = self.db.get_one("nsds", {"_id": db_nsr["nsd-id"]})
        self.fs.sync(db_nsr["nsd-id"])
        db_nsr["nsd"] = nsd
        # read from db: vnfr's of this ns
        step = "Getting vnfrs from db"
        db_vnfrs_list = self.db.get_list("vnfrs", {"nsr-id-ref": nsr_id})
        for vnfr in db_vnfrs_list:
            db_vnfrs[vnfr["_id"]] = vnfr
        self.logger.debug("ns.heal db_vnfrs={}".format(db_vnfrs))
        # Check for each target VNF
        target_list = db_nslcmop.get("operationParams", {}).get("healVnfData", {})
        for target_vnf in target_list:
            # Find this VNF in the list from DB
            vnfr_id = target_vnf.get("vnfInstanceId", None)
            if vnfr_id:
                db_vnfr = db_vnfrs[vnfr_id]
                vnfd_id = db_vnfr.get("vnfd-id")
                vnfd_ref = db_vnfr.get("vnfd-ref")
                vnfd = self.db.get_one("vnfds", {"_id": vnfd_id})
                base_folder = vnfd["_admin"]["storage"]
                vdu_id = None
                vdu_index = 0
                vdu_name = None
                kdu_name = None
                nsi_id = None  # TODO put nsi_id when this nsr belongs to a NSI
                member_vnf_index = db_vnfr.get("member-vnf-index-ref")
                # Check each target VDU and deploy N2VC
                target_vdu_list = target_vnf.get("additionalParams", {}).get(
                    "vdu", []
                )
                if not target_vdu_list:
                    # No VDUs requested explicitly: build the target list from
                    # every existing VDU of this VNF
                    target_vdu_list = []
                    for existing_vdu in db_vnfr.get("vdur"):
                        vdu_name = existing_vdu.get("vdu-name", None)
                        vdu_index = existing_vdu.get("count-index", 0)
                        vdu_run_day1 = target_vnf.get("additionalParams", {}).get(
                            "run-day1", False
                        )
                        vdu_to_be_healed = {
                            "vdu-id": vdu_name,
                            "count-index": vdu_index,
                            "run-day1": vdu_run_day1,
                        }
                        target_vdu_list.append(vdu_to_be_healed)
                for target_vdu in target_vdu_list:
                    deploy_params_vdu = target_vdu
                    # Set run-day1 vnf level value if not vdu level value exists
                    if not deploy_params_vdu.get("run-day1") and target_vnf.get(
                        "additionalParams", {}
                    ).get("run-day1"):
                        deploy_params_vdu["run-day1"] = target_vnf[
                            "additionalParams"
                        ].get("run-day1")
                    vdu_name = target_vdu.get("vdu-id", None)
                    # TODO: Get vdu_id from vdud.
                    vdu_id = vdu_name
                    # For multi instance VDU count-index is mandatory
                    # For single session VDU count-index is 0
                    vdu_index = target_vdu.get("count-index", 0)
                    # n2vc_redesign STEP 3 to 6 Deploy N2VC
                    stage[1] = "Deploying Execution Environments."
                    self.logger.debug(logging_text + stage[1])
                    # VNF Level charm. Normal case when proxy charms.
                    # If target instance is management machine continue with actions: recreate EE for native charms or reinject juju key for proxy charms.
                    descriptor_config = get_configuration(vnfd, vnfd_ref)
                    if descriptor_config:
                        # Continue if healed machine is management machine
                        vnf_ip_address = db_vnfr.get("ip-address")
                        target_instance = None
                        for instance in db_vnfr.get("vdur", None):
                            if (
                                instance["vdu-name"] == vdu_name
                                and instance["count-index"] == vdu_index
                            ):
                                target_instance = instance
                                break
                        # Fix: guard against no matching vdur entry, which
                        # used to raise AttributeError on None
                        if target_instance and vnf_ip_address == target_instance.get(
                            "ip-address"
                        ):
                            self._heal_n2vc(
                                logging_text=logging_text
                                + "member_vnf_index={}, vdu_name={}, vdu_index={} ".format(
                                    member_vnf_index, vdu_name, vdu_index
                                ),
                                db_nsr=db_nsr,
                                db_vnfr=db_vnfr,
                                nslcmop_id=nslcmop_id,
                                nsr_id=nsr_id,
                                nsi_id=nsi_id,
                                vnfd_id=vnfd_ref,
                                vdu_id=None,
                                kdu_name=None,
                                member_vnf_index=member_vnf_index,
                                vdu_index=0,
                                vdu_name=None,
                                deploy_params=deploy_params_vdu,
                                descriptor_config=descriptor_config,
                                base_folder=base_folder,
                                task_instantiation_info=tasks_dict_info,
                                stage=stage,
                            )
                    # VDU Level charm. Normal case with native charms.
                    descriptor_config = get_configuration(vnfd, vdu_name)
                    if descriptor_config:
                        self._heal_n2vc(
                            logging_text=logging_text
                            + "member_vnf_index={}, vdu_name={}, vdu_index={} ".format(
                                member_vnf_index, vdu_name, vdu_index
                            ),
                            db_nsr=db_nsr,
                            db_vnfr=db_vnfr,
                            nslcmop_id=nslcmop_id,
                            nsr_id=nsr_id,
                            nsi_id=nsi_id,
                            vnfd_id=vnfd_ref,
                            vdu_id=vdu_id,
                            kdu_name=kdu_name,
                            member_vnf_index=member_vnf_index,
                            vdu_index=vdu_index,
                            vdu_name=vdu_name,
                            deploy_params=deploy_params_vdu,
                            descriptor_config=descriptor_config,
                            base_folder=base_folder,
                            task_instantiation_info=tasks_dict_info,
                            stage=stage,
                        )
    except (
        ROclient.ROClientException,
        DbException,
        LcmException,
        NgRoException,
    ) as e:
        self.logger.error(logging_text + "Exit Exception {}".format(e))
        exc = e
    except asyncio.CancelledError:
        self.logger.error(
            logging_text + "Cancelled Exception while '{}'".format(step)
        )
        exc = "Operation was cancelled"
    except Exception as e:
        exc = traceback.format_exc()
        self.logger.critical(
            logging_text + "Exit Exception {} {}".format(type(e).__name__, e),
            exc_info=True,
        )
    finally:
        error_list = list()
        # touch _admin.modified of every healed vnfr
        if db_vnfrs_list and target_list:
            for vnfrs in db_vnfrs_list:
                for vnf_instance in target_list:
                    if vnfrs["_id"] == vnf_instance.get("vnfInstanceId"):
                        self.db.set_list(
                            "vnfrs",
                            {"_id": vnfrs["_id"]},
                            {"_admin.modified": time()},
                        )
        if exc:
            error_list.append(str(exc))
        try:
            if tasks_dict_info:
                stage[1] = "Waiting for healing pending tasks."
                self.logger.debug(logging_text + stage[1])
                exc = await self._wait_for_tasks(
                    logging_text,
                    tasks_dict_info,
                    self.timeout.ns_deploy,
                    stage,
                    nslcmop_id,
                    nsr_id=nsr_id,
                )
        except asyncio.CancelledError:
            error_list.append("Cancelled")
            await self._cancel_pending_tasks(logging_text, tasks_dict_info)
            await self._wait_for_tasks(
                logging_text,
                tasks_dict_info,
                self.timeout.ns_deploy,
                stage,
                nslcmop_id,
                nsr_id=nsr_id,
            )
        if error_list:
            error_detail = "; ".join(error_list)
            db_nslcmop_update[
                "detailed-status"
            ] = error_description_nslcmop = "FAILED {}: {}".format(
                step, error_detail
            )
            nslcmop_operation_state = "FAILED"
            if db_nsr:
                # restore the pre-heal statuses on failure
                db_nsr_update["operational-status"] = old_operational_status
                db_nsr_update["config-status"] = old_config_status
                db_nsr_update[
                    "detailed-status"
                ] = "FAILED healing nslcmop={} {}: {}".format(
                    nslcmop_id, step, error_detail
                )
                for task, task_name in tasks_dict_info.items():
                    if not task.done() or task.cancelled() or task.exception():
                        if task_name.startswith(self.task_name_deploy_vca):
                            # A N2VC task is pending
                            db_nsr_update["config-status"] = "failed"
                        else:
                            # RO task is pending
                            db_nsr_update["operational-status"] = "failed"
        else:
            error_description_nslcmop = None
            nslcmop_operation_state = "COMPLETED"
            db_nslcmop_update["detailed-status"] = "Done"
            db_nsr_update["detailed-status"] = "Done"
            db_nsr_update["operational-status"] = "running"
            db_nsr_update["config-status"] = "configured"
        self._write_op_status(
            op_id=nslcmop_id,
            stage="",
            error_message=error_description_nslcmop,
            operation_state=nslcmop_operation_state,
            other_update=db_nslcmop_update,
        )
        if db_nsr:
            self._write_ns_status(
                nsr_id=nsr_id,
                ns_state=None,
                current_operation="IDLE",
                current_operation_id=None,
                other_update=db_nsr_update,
            )
        if nslcmop_operation_state:
            try:
                msg = {
                    "nsr_id": nsr_id,
                    "nslcmop_id": nslcmop_id,
                    "operationState": nslcmop_operation_state,
                }
                await self.msg.aiowrite("ns", "healed", msg)
            except Exception as e:
                self.logger.error(
                    logging_text + "kafka_write notification Exception {}".format(e)
                )
        self.logger.debug(logging_text + "Exit")
        self.lcm_tasks.remove("ns", nsr_id, nslcmop_id, "ns_heal")
async def heal_RO(
    self,
    logging_text,
    nsr_id,
    db_nslcmop,
    stage,
):
    """
    Heal at RO: ask RO to recreate the failed resources and wait for completion.

    :param logging_text: prefix text to use at logging
    :param nsr_id: nsr identity
    :param db_nslcmop: database content of ns operation, in this case, 'instantiate'
    :param stage: list with 3 items: [general stage, tasks, vim_specific]. This task will write over vim_specific
    :return: None or exception
    """

    def get_vim_account(vim_account_id):
        # Small cache over vim_accounts: fetch from db only on first access.
        nonlocal db_vims
        if vim_account_id in db_vims:
            return db_vims[vim_account_id]
        db_vim = self.db.get_one("vim_accounts", {"_id": vim_account_id})
        db_vims[vim_account_id] = db_vim
        return db_vim

    try:
        heal_start_time = time()
        op_params = db_nslcmop.get("operationParams")
        # operation-level timeout wins over the configured default
        timeout_ns_heal = (
            op_params["timeout_ns_heal"]
            if op_params and op_params.get("timeout_ns_heal")
            else self.timeout.ns_heal
        )

        db_vims = {}

        nslcmop_id = db_nslcmop["_id"]
        target = {"action_id": nslcmop_id}
        self.logger.warning(
            "db_nslcmop={} and timeout_ns_heal={}".format(
                db_nslcmop, timeout_ns_heal
            )
        )
        target.update(db_nslcmop.get("operationParams", {}))

        self.logger.debug("Send to RO > nsr_id={} target={}".format(nsr_id, target))
        desc = await self.RO.recreate(nsr_id, target)
        self.logger.debug("RO return > {}".format(desc))
        action_id = desc["action_id"]
        # RO must complete first: reinjecting the juju key earlier can find
        # the VM still in state Deleted.
        await self._wait_ng_ro(
            nsr_id,
            action_id,
            nslcmop_id,
            heal_start_time,
            timeout_ns_heal,
            stage,
            operation="healing",
        )

        # Updating NSR
        self.update_db_2(
            "nsrs",
            nsr_id,
            {
                "_admin.deployed.RO.operational-status": "running",
                "detailed-status": " ".join(stage),
            },
        )
        self._write_op_status(nslcmop_id, stage)
        self.logger.debug(
            logging_text + "ns healed at RO. RO_id={}".format(action_id)
        )
    except Exception as e:
        stage[2] = "ERROR healing at VIM"
        # self.set_vnfr_at_error(db_vnfrs, str(e))
        known_exceptions = (
            ROclient.ROClientException,
            LcmException,
            DbException,
            NgRoException,
        )
        # only dump a traceback for unexpected exception types
        self.logger.error(
            "Error healing at VIM {}".format(e),
            exc_info=not isinstance(e, known_exceptions),
        )
        raise
def _heal_n2vc(
self,
logging_text,
db_nsr,
db_vnfr,
nslcmop_id,
nsr_id,
nsi_id,
vnfd_id,
vdu_id,
kdu_name,
member_vnf_index,
vdu_index,
vdu_name,
deploy_params,
descriptor_config,
base_folder,
task_instantiation_info,
stage,
):
# launch instantiate_N2VC in a asyncio task and register task object
# Look where information of this charm is at database <nsrs>._admin.deployed.VCA
# if not found, create one entry and update database
# fill db_nsr._admin.deployed.VCA.<index>
self.logger.debug(
logging_text + "_deploy_n2vc vnfd_id={}, vdu_id={}".format(vnfd_id, vdu_id)
)
charm_name = ""
get_charm_name = False
if "execution-environment-list" in descriptor_config:
ee_list = descriptor_config.get("execution-environment-list", [])
elif "juju" in descriptor_config:
ee_list = [descriptor_config] # ns charms
if "execution-environment-list" not in descriptor_config:
# charm name is only required for ns charms
get_charm_name = True
else: # other types as script are not supported
ee_list = []
for ee_item in ee_list:
self.logger.debug(
logging_text
+ "_deploy_n2vc ee_item juju={}, helm={}".format(
ee_item.get("juju"), ee_item.get("helm-chart")
)
)
ee_descriptor_id = ee_item.get("id")
vca_name, charm_name, vca_type = self.get_vca_info(
ee_item, db_nsr, get_charm_name
)
if not vca_type:
self.logger.debug(
logging_text + "skipping, non juju/charm/helm configuration"
)
continue
vca_index = -1
for vca_index, vca_deployed in enumerate(
db_nsr["_admin"]["deployed"]["VCA"]
):
if not vca_deployed:
continue
if (
vca_deployed.get("member-vnf-index") == member_vnf_index
and vca_deployed.get("vdu_id") == vdu_id
and vca_deployed.get("kdu_name") == kdu_name
and vca_deployed.get("vdu_count_index", 0) == vdu_index
and vca_deployed.get("ee_descriptor_id") == ee_descriptor_id
):
break
else:
# not found, create one.
target = (
"ns" if not member_vnf_index else "vnf/{}".format(member_vnf_index)
)
if vdu_id:
target += "/vdu/{}/{}".format(vdu_id, vdu_index or 0)
elif kdu_name:
target += "/kdu/{}".format(kdu_name)
vca_deployed = {
"target_element": target,
# ^ target_element will replace member-vnf-index, kdu_name, vdu_id ... in a single string
"member-vnf-index": member_vnf_index,
"vdu_id": vdu_id,
"kdu_name": kdu_name,
"vdu_count_index": vdu_index,
"operational-status": "init", # TODO revise
"detailed-status": "", # TODO revise
"step": "initial-deploy", # TODO revise
"vnfd_id": vnfd_id,
"vdu_name": vdu_name,
"type": vca_type,
"ee_descriptor_id": ee_descriptor_id,
"charm_name": charm_name,
}
vca_index += 1
# create VCA and configurationStatus in db
db_dict = {
"_admin.deployed.VCA.{}".format(vca_index): vca_deployed,
"configurationStatus.{}".format(vca_index): dict(),
}
self.update_db_2("nsrs", nsr_id, db_dict)
db_nsr["_admin"]["deployed"]["VCA"].append(vca_deployed)
self.logger.debug("N2VC > NSR_ID > {}".format(nsr_id))
self.logger.debug("N2VC > DB_NSR > {}".format(db_nsr))
self.logger.debug("N2VC > VCA_DEPLOYED > {}".format(vca_deployed))
# Launch task
task_n2vc = asyncio.ensure_future(
self.heal_N2VC(
logging_text=logging_text,
vca_index=vca_index,
nsi_id=nsi_id,
db_nsr=db_nsr,
db_vnfr=db_vnfr,
vdu_id=vdu_id,
kdu_name=kdu_name,
vdu_index=vdu_index,
deploy_params=deploy_params,
config_descriptor=descriptor_config,
base_folder=base_folder,
nslcmop_id=nslcmop_id,
stage=stage,
vca_type=vca_type,
vca_name=vca_name,
ee_config_descriptor=ee_item,
)
)
self.lcm_tasks.register(
"ns",
nsr_id,
nslcmop_id,
"instantiate_N2VC-{}".format(vca_index),
task_n2vc,
)
task_instantiation_info[
task_n2vc
] = self.task_name_deploy_vca + " {}.{}".format(
member_vnf_index or "", vdu_id or ""
)
async def heal_N2VC(
    self,
    logging_text,
    vca_index,
    nsi_id,
    db_nsr,
    db_vnfr,
    vdu_id,
    kdu_name,
    vdu_index,
    config_descriptor,
    deploy_params,
    base_folder,
    nslcmop_id,
    stage,
    vca_type,
    vca_name,
    ee_config_descriptor,
):
    """
    Heal one deployed execution environment (VCA entry) of the NS.

    For native charms it re-registers the execution environment and
    re-installs the configuration software; for proxy/helm charms it waits
    for RO to finish healing and re-injects the ssh key into the healed VM.
    When deploy_params["run-day1"] is set, the Day-1 (initial-config)
    primitives are executed again.

    :param logging_text: prefix for log messages
    :param vca_index: index of this VCA inside <nsrs>._admin.deployed.VCA
    :param nsi_id: network slice instance id, or None
    :param db_nsr: nsr database record
    :param db_vnfr: vnfr database record, or None for a NS-level charm
    :param vdu_id: target vdu id, or None
    :param kdu_name: target kdu name, or None
    :param vdu_index: count-index of the target vdu instance
    :param config_descriptor: configuration section of the descriptor
    :param deploy_params: parameters for primitive execution; updated here
        with rw_mgmt_ip (and ns_config_info for NS charms)
    :param base_folder: package storage info used to locate the charm artifact
    :param nslcmop_id: current operation id
    :param stage: 3-item stage list written to the operation status
    :param vca_type: e.g. native_charm / lxc_proxy_charm / k8s_proxy_charm / helm-v3
    :param vca_name: name of the charm/chart artifact
    :param ee_config_descriptor: execution-environment item of the descriptor
    :raises LcmException: wrapping any failure, after marking the VCA BROKEN
    """
    nsr_id = db_nsr["_id"]
    # dotted db path of this VCA entry, e.g. "_admin.deployed.VCA.3."
    db_update_entry = "_admin.deployed.VCA.{}.".format(vca_index)
    vca_deployed_list = db_nsr["_admin"]["deployed"]["VCA"]
    vca_deployed = db_nsr["_admin"]["deployed"]["VCA"][vca_index]
    osm_config = {"osm": {"ns_id": db_nsr["_id"]}}
    db_dict = {
        "collection": "nsrs",
        "filter": {"_id": nsr_id},
        "path": db_update_entry,
    }

    step = ""
    try:
        element_type = "NS"
        element_under_configuration = nsr_id

        vnfr_id = None
        if db_vnfr:
            vnfr_id = db_vnfr["_id"]
            osm_config["osm"]["vnf_id"] = vnfr_id

        namespace = "{nsi}.{ns}".format(nsi=nsi_id if nsi_id else "", ns=nsr_id)

        # native charms always use machine index 0; others use the vdu index
        if vca_type == "native_charm":
            index_number = 0
        else:
            index_number = vdu_index or 0

        if vnfr_id:
            element_type = "VNF"
            element_under_configuration = vnfr_id
            namespace += ".{}-{}".format(vnfr_id, index_number)
            if vdu_id:
                namespace += ".{}-{}".format(vdu_id, index_number)
                element_type = "VDU"
                element_under_configuration = "{}-{}".format(vdu_id, index_number)
                osm_config["osm"]["vdu_id"] = vdu_id
            elif kdu_name:
                namespace += ".{}".format(kdu_name)
                element_type = "KDU"
                element_under_configuration = kdu_name
                osm_config["osm"]["kdu_name"] = kdu_name

        # Get artifact path
        if base_folder["pkg-dir"]:
            artifact_path = "{}/{}/{}/{}".format(
                base_folder["folder"],
                base_folder["pkg-dir"],
                "charms"
                if vca_type
                in ("native_charm", "lxc_proxy_charm", "k8s_proxy_charm")
                else "helm-charts",
                vca_name,
            )
        else:
            artifact_path = "{}/Scripts/{}/{}/".format(
                base_folder["folder"],
                "charms"
                if vca_type
                in ("native_charm", "lxc_proxy_charm", "k8s_proxy_charm")
                else "helm-charts",
                vca_name,
            )

        self.logger.debug("Artifact path > {}".format(artifact_path))

        # get initial_config_primitive_list that applies to this element
        initial_config_primitive_list = config_descriptor.get(
            "initial-config-primitive"
        )

        self.logger.debug(
            "Initial config primitive list > {}".format(
                initial_config_primitive_list
            )
        )

        # add config if not present for NS charm
        ee_descriptor_id = ee_config_descriptor.get("id")
        self.logger.debug("EE Descriptor > {}".format(ee_descriptor_id))
        initial_config_primitive_list = get_ee_sorted_initial_config_primitive_list(
            initial_config_primitive_list, vca_deployed, ee_descriptor_id
        )

        self.logger.debug(
            "Initial config primitive list #2 > {}".format(
                initial_config_primitive_list
            )
        )
        # n2vc_redesign STEP 3.1
        # find old ee_id if exists
        ee_id = vca_deployed.get("ee_id")

        vca_id = self.get_vca_id(db_vnfr, db_nsr)
        # create or register execution environment in VCA. Only for native charms when healing
        if vca_type == "native_charm":
            step = "Waiting to VM being up and getting IP address"
            self.logger.debug(logging_text + step)
            rw_mgmt_ip = await self.wait_vm_up_insert_key_ro(
                logging_text,
                nsr_id,
                vnfr_id,
                vdu_id,
                vdu_index,
                user=None,
                pub_key=None,
            )
            credentials = {"hostname": rw_mgmt_ip}
            # get username
            username = deep_get(
                config_descriptor, ("config-access", "ssh-access", "default-user")
            )
            # TODO remove this when changes on IM regarding config-access:ssh-access:default-user were
            # merged. Meanwhile let's get username from initial-config-primitive
            if not username and initial_config_primitive_list:
                for config_primitive in initial_config_primitive_list:
                    for param in config_primitive.get("parameter", ()):
                        if param["name"] == "ssh-username":
                            username = param["value"]
                            break
            if not username:
                raise LcmException(
                    "Cannot determine the username neither with 'initial-config-primitive' nor with "
                    "'config-access.ssh-access.default-user'"
                )
            credentials["username"] = username

            # n2vc_redesign STEP 3.2
            # TODO: Before healing at RO it is needed to destroy native charm units to be deleted.
            self._write_configuration_status(
                nsr_id=nsr_id,
                vca_index=vca_index,
                status="REGISTERING",
                element_under_configuration=element_under_configuration,
                element_type=element_type,
            )

            step = "register execution environment {}".format(credentials)
            self.logger.debug(logging_text + step)
            ee_id = await self.vca_map[vca_type].register_execution_environment(
                credentials=credentials,
                namespace=namespace,
                db_dict=db_dict,
                vca_id=vca_id,
            )
            # update ee_id in db
            db_dict_ee_id = {
                "_admin.deployed.VCA.{}.ee_id".format(vca_index): ee_id,
            }
            self.update_db_2("nsrs", nsr_id, db_dict_ee_id)

            # for compatibility with MON/POL modules, they need the model and application name at database
            # TODO ask MON/POL if needed to not assuming anymore the format "model_name.application_name"
            # Not sure if this need to be done when healing
            """
            ee_id_parts = ee_id.split(".")
            db_nsr_update = {db_update_entry + "ee_id": ee_id}
            if len(ee_id_parts) >= 2:
                model_name = ee_id_parts[0]
                application_name = ee_id_parts[1]
                db_nsr_update[db_update_entry + "model"] = model_name
                db_nsr_update[db_update_entry + "application"] = application_name
            """

        # n2vc_redesign STEP 3.3
        # Install configuration software. Only for native charms.
        step = "Install configuration Software"

        self._write_configuration_status(
            nsr_id=nsr_id,
            vca_index=vca_index,
            status="INSTALLING SW",
            element_under_configuration=element_under_configuration,
            element_type=element_type,
            # other_update=db_nsr_update,
            other_update=None,
        )

        # TODO check if already done
        self.logger.debug(logging_text + step)
        config = None
        if vca_type == "native_charm":
            # the "config" primitive, when present, provides the initial charm config
            config_primitive = next(
                (p for p in initial_config_primitive_list if p["name"] == "config"),
                None,
            )
            if config_primitive:
                config = self._map_primitive_params(
                    config_primitive, {}, deploy_params
                )
            await self.vca_map[vca_type].install_configuration_sw(
                ee_id=ee_id,
                artifact_path=artifact_path,
                db_dict=db_dict,
                config=config,
                num_units=1,
                vca_id=vca_id,
                vca_type=vca_type,
            )

            # write in db flag of configuration_sw already installed
            self.update_db_2(
                "nsrs", nsr_id, {db_update_entry + "config_sw_installed": True}
            )

        # Not sure if this need to be done when healing
        """
        # add relations for this VCA (wait for other peers related with this VCA)
        await self._add_vca_relations(
            logging_text=logging_text,
            nsr_id=nsr_id,
            vca_type=vca_type,
            vca_index=vca_index,
        )
        """

        # if SSH access is required, then get execution environment SSH public
        # if native charm we have waited already to VM be UP
        if vca_type in ("k8s_proxy_charm", "lxc_proxy_charm", "helm-v3"):
            pub_key = None
            user = None
            # self.logger.debug("get ssh key block")
            if deep_get(
                config_descriptor, ("config-access", "ssh-access", "required")
            ):
                # self.logger.debug("ssh key needed")
                # Needed to inject a ssh key
                user = deep_get(
                    config_descriptor,
                    ("config-access", "ssh-access", "default-user"),
                )
                step = "Install configuration Software, getting public ssh key"
                pub_key = await self.vca_map[vca_type].get_ee_ssh_public__key(
                    ee_id=ee_id, db_dict=db_dict, vca_id=vca_id
                )

                step = "Insert public key into VM user={} ssh_key={}".format(
                    user, pub_key
                )
            else:
                # self.logger.debug("no need to get ssh key")
                step = "Waiting to VM being up and getting IP address"
            self.logger.debug(logging_text + step)

            # n2vc_redesign STEP 5.1
            # wait for RO (ip-address) Insert pub_key into VM
            # IMPORTANT: We need do wait for RO to complete healing operation.
            await self._wait_heal_ro(nsr_id, self.timeout.ns_heal)
            if vnfr_id:
                if kdu_name:
                    rw_mgmt_ip = await self.wait_kdu_up(
                        logging_text, nsr_id, vnfr_id, kdu_name
                    )
                else:
                    rw_mgmt_ip = await self.wait_vm_up_insert_key_ro(
                        logging_text,
                        nsr_id,
                        vnfr_id,
                        vdu_id,
                        vdu_index,
                        user=user,
                        pub_key=pub_key,
                    )
            else:
                rw_mgmt_ip = None  # This is for a NS configuration

            self.logger.debug(logging_text + " VM_ip_address={}".format(rw_mgmt_ip))

        # store rw_mgmt_ip in deploy params for later replacement
        deploy_params["rw_mgmt_ip"] = rw_mgmt_ip

        # Day1 operations.
        # get run-day1 operation parameter
        runDay1 = deploy_params.get("run-day1", False)
        self.logger.debug(
            "Healing vnf={}, vdu={}, runDay1 ={}".format(vnfr_id, vdu_id, runDay1)
        )
        if runDay1:
            # n2vc_redesign STEP 6  Execute initial config primitive
            step = "execute initial config primitive"

            # wait for dependent primitives execution (NS -> VNF -> VDU)
            if initial_config_primitive_list:
                await self._wait_dependent_n2vc(
                    nsr_id, vca_deployed_list, vca_index
                )

            # stage, in function of element type: vdu, kdu, vnf or ns
            my_vca = vca_deployed_list[vca_index]
            if my_vca.get("vdu_id") or my_vca.get("kdu_name"):
                # VDU or KDU
                stage[0] = "Stage 3/5: running Day-1 primitives for VDU."
            elif my_vca.get("member-vnf-index"):
                # VNF
                stage[0] = "Stage 4/5: running Day-1 primitives for VNF."
            else:
                # NS
                stage[0] = "Stage 5/5: running Day-1 primitives for NS."

            self._write_configuration_status(
                nsr_id=nsr_id, vca_index=vca_index, status="EXECUTING PRIMITIVE"
            )

            self._write_op_status(op_id=nslcmop_id, stage=stage)

            check_if_terminated_needed = True
            for initial_config_primitive in initial_config_primitive_list:
                # adding information on the vca_deployed if it is a NS execution environment
                if not vca_deployed["member-vnf-index"]:
                    deploy_params["ns_config_info"] = json.dumps(
                        self._get_ns_config_info(nsr_id)
                    )
                # TODO check if already done
                primitive_params_ = self._map_primitive_params(
                    initial_config_primitive, {}, deploy_params
                )

                step = "execute primitive '{}' params '{}'".format(
                    initial_config_primitive["name"], primitive_params_
                )
                self.logger.debug(logging_text + step)
                await self.vca_map[vca_type].exec_primitive(
                    ee_id=ee_id,
                    primitive_name=initial_config_primitive["name"],
                    params_dict=primitive_params_,
                    db_dict=db_dict,
                    vca_id=vca_id,
                    vca_type=vca_type,
                )
                # Once some primitive has been exec, check and write at db if it needs to exec terminated primitives
                if check_if_terminated_needed:
                    if config_descriptor.get("terminate-config-primitive"):
                        self.update_db_2(
                            "nsrs",
                            nsr_id,
                            {db_update_entry + "needed_terminate": True},
                        )
                    check_if_terminated_needed = False

                # TODO register in database that primitive is done

        # STEP 7 Configure metrics
        # Not sure if this need to be done when healing
        """
        if vca_type == "helm" or vca_type == "helm-v3":
            prometheus_jobs = await self.extract_prometheus_scrape_jobs(
                ee_id=ee_id,
                artifact_path=artifact_path,
                ee_config_descriptor=ee_config_descriptor,
                vnfr_id=vnfr_id,
                nsr_id=nsr_id,
                target_ip=rw_mgmt_ip,
            )
            if prometheus_jobs:
                self.update_db_2(
                    "nsrs",
                    nsr_id,
                    {db_update_entry + "prometheus_jobs": prometheus_jobs},
                )

                for job in prometheus_jobs:
                    self.db.set_one(
                        "prometheus_jobs",
                        {"job_name": job["job_name"]},
                        job,
                        upsert=True,
                        fail_on_empty=False,
                    )
        """

        step = "instantiated at VCA"
        self.logger.debug(logging_text + step)

        self._write_configuration_status(
            nsr_id=nsr_id, vca_index=vca_index, status="READY"
        )

    except Exception as e:  # TODO not use Exception but N2VC exception
        # self.update_db_2("nsrs", nsr_id, {db_update_entry + "instantiation": "FAILED"})
        if not isinstance(
            e, (DbException, N2VCException, LcmException, asyncio.CancelledError)
        ):
            self.logger.error(
                "Exception while {} : {}".format(step, e), exc_info=True
            )
        self._write_configuration_status(
            nsr_id=nsr_id, vca_index=vca_index, status="BROKEN"
        )
        raise LcmException("{} {}".format(step, e)) from e
async def _wait_heal_ro(
self,
nsr_id,
timeout=600,
):
start_time = time()
while time() <= start_time + timeout:
db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
operational_status_ro = db_nsr["_admin"]["deployed"]["RO"][
"operational-status"
]
self.logger.debug("Wait Heal RO > {}".format(operational_status_ro))
if operational_status_ro != "healing":
break
await asyncio.sleep(15)
else: # timeout_ns_deploy
raise NgRoException("Timeout waiting ns to deploy")
async def vertical_scale(self, nsr_id, nslcmop_id):
    """
    Vertical Scale the VDUs in a NS
    :param: nsr_id: NS Instance ID
    :param: nslcmop_id: nslcmop ID of migrate
    :return: ("COMPLETED", "Done") on success, ("FAILED", <error text>) otherwise
    """
    logging_text = "Task ns={} vertical scale ".format(nsr_id)
    self.logger.info(logging_text + "Enter")
    stage = ["Preparing the environment", ""]
    # get all needed from database
    db_nslcmop = None
    db_nsr_update = {}
    target = {}
    exc = None
    # Fix: initialize so the finally clause can tell whether the new flavor
    # entry was ever computed; previously an early failure (e.g. while reading
    # the nslcmop/nsr records) made the cleanup path raise NameError.
    db_flavor_index = None
    # in case of error, indicates what part of scale was failed to put nsr at error status
    start_deploy = time()
    try:
        db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})
        operationParams = db_nslcmop.get("operationParams")
        vertical_scale_data = operationParams["verticalScaleVnf"]
        vnfd_id = vertical_scale_data["vnfdId"]
        count_index = vertical_scale_data["countIndex"]
        vdu_id_ref = vertical_scale_data["vduId"]
        vnfr_id = vertical_scale_data["vnfInstanceId"]
        db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
        db_flavor = db_nsr.get("flavor")
        db_flavor_index = str(len(db_flavor))

        def set_flavor_reference_to_vdur(diff=0):
            """
            Utility function to add and remove the
            ref to new ns-flavor-id to vdurs
            :param: diff: default 0
            """
            q_filter = {}
            db_vnfr = self.db.get_one("vnfrs", {"_id": vnfr_id})
            for vdu_index, vdur in enumerate(db_vnfr.get("vdur", ())):
                if (
                    vdur.get("count-index") == count_index
                    and vdur.get("vdu-id-ref") == vdu_id_ref
                ):
                    filter_text = {
                        "_id": vnfr_id,
                        "vdur.count-index": count_index,
                        "vdur.vdu-id-ref": vdu_id_ref,
                    }
                    q_filter.update(filter_text)
                    db_update = {}
                    db_update["vdur.{}.ns-flavor-id".format(vdu_index)] = str(
                        int(db_flavor_index) - diff
                    )
                    self.db.set_one(
                        "vnfrs",
                        q_filter=q_filter,
                        update_dict=db_update,
                        fail_on_empty=True,
                    )

        # wait for any previous tasks in process
        stage[1] = "Waiting for previous operations to terminate"
        await self.lcm_tasks.waitfor_related_HA("ns", "nslcmops", nslcmop_id)
        self._write_ns_status(
            nsr_id=nsr_id,
            ns_state=None,
            current_operation="VERTICALSCALE",
            current_operation_id=nslcmop_id,
        )
        self._write_op_status(op_id=nslcmop_id, stage=stage, queuePosition=0)
        self.logger.debug(
            stage[1] + " after having waited for previous tasks to be completed"
        )
        self.update_db_2("nsrs", nsr_id, db_nsr_update)
        # Build the new flavor from the VNFD virtual-compute/storage descriptors
        vnfd = self.db.get_one("vnfds", {"_id": vnfd_id})
        virtual_compute = vnfd["virtual-compute-desc"][0]
        # descriptor memory is in GB; flavor memory is expressed in MB
        virtual_memory = round(
            float(virtual_compute["virtual-memory"]["size"]) * 1024
        )
        virtual_cpu = virtual_compute["virtual-cpu"]["num-virtual-cpu"]
        virtual_storage = vnfd["virtual-storage-desc"][0]["size-of-storage"]
        flavor_dict_update = {
            "id": db_flavor_index,
            "memory-mb": virtual_memory,
            "name": f"{vdu_id_ref}-{count_index}-flv",
            "storage-gb": str(virtual_storage),
            "vcpu-count": virtual_cpu,
        }
        db_flavor.append(flavor_dict_update)
        db_update = {}
        db_update["flavor"] = db_flavor
        q_filter = {
            "_id": nsr_id,
        }
        # Update the VNFRS and NSRS with the requested flavour detail, So that ro tasks can function properly
        self.db.set_one(
            "nsrs",
            q_filter=q_filter,
            update_dict=db_update,
            fail_on_empty=True,
        )
        set_flavor_reference_to_vdur()
        target = {}
        new_operationParams = {
            "lcmOperationType": "verticalscale",
            "verticalScale": "CHANGE_VNFFLAVOR",
            "nsInstanceId": nsr_id,
            "changeVnfFlavorData": {
                "vnfInstanceId": vnfr_id,
                "additionalParams": {
                    "vduid": vdu_id_ref,
                    "vduCountIndex": count_index,
                    "virtualMemory": virtual_memory,
                    "numVirtualCpu": int(virtual_cpu),
                    "sizeOfStorage": int(virtual_storage),
                },
            },
        }
        target.update(new_operationParams)
        stage[1] = "Sending vertical scale request to RO... {}".format(target)
        self._write_op_status(op_id=nslcmop_id, stage=stage, queuePosition=0)
        self.logger.info("RO target > {}".format(target))
        desc = await self.RO.vertical_scale(nsr_id, target)
        self.logger.info("RO.vertical_scale return value - {}".format(desc))
        action_id = desc["action_id"]
        await self._wait_ng_ro(
            nsr_id,
            action_id,
            nslcmop_id,
            start_deploy,
            self.timeout.verticalscale,
            operation="verticalscale",
        )
    except (
        NgRoException,
        ROclient.ROClientException,
        DbException,
        LcmException,
    ) as e:
        self.logger.error("Exit Exception {}".format(e))
        exc = e
    except asyncio.CancelledError:
        self.logger.error("Cancelled Exception while '{}'".format(stage))
        exc = "Operation was cancelled"
    except Exception as e:
        exc = traceback.format_exc()
        self.logger.critical(
            "Exit Exception {} {}".format(type(e).__name__, e), exc_info=True
        )
    finally:
        if exc:
            self.logger.critical(
                "Vertical-Scale operation Failed, cleaning up nsrs and vnfrs flavor detail"
            )
            # Fix: only roll back when the flavor entry was actually computed;
            # an early failure has nothing to clean up and would otherwise hit
            # a NameError here.
            if db_flavor_index is not None:
                self.db.set_one(
                    "nsrs",
                    {"_id": nsr_id},
                    None,
                    pull={"flavor": {"id": db_flavor_index}},
                )
                set_flavor_reference_to_vdur(diff=1)
            return "FAILED", "Error in verticalscale VNF {}".format(exc)
        else:
            return "COMPLETED", "Done"