# -*- coding: utf-8 -*-
##
# Copyright 2018 Telefonica S.A.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
##
import asyncio
import yaml
import logging
import logging.handlers
import traceback
import json
from jinja2 import Environment, TemplateError, TemplateNotFound, StrictUndefined, UndefinedError
from osm_lcm import ROclient
from osm_lcm.ng_ro import NgRoClient, NgRoException
from osm_lcm.lcm_utils import LcmException, LcmExceptionNoMgmtIP, LcmBase, deep_get, get_iterable, populate_dict
from osm_lcm.data_utils.nsd import get_vnf_profiles
from osm_lcm.data_utils.vnfd import get_vdu_list, get_vdu_profile, \
get_ee_sorted_initial_config_primitive_list, get_ee_sorted_terminate_config_primitive_list, \
get_kdu_list, get_virtual_link_profiles, get_vdu, get_configuration, \
get_vdu_index, get_scaling_aspect, get_number_of_instances, get_juju_ee_ref
from osm_lcm.data_utils.list_utils import find_in_list
from osm_lcm.data_utils.vnfr import get_osm_params, get_vdur_index
from osm_lcm.data_utils.dict_utils import parse_yaml_strings
from osm_lcm.data_utils.database.vim_account import VimAccountDB
from n2vc.k8s_helm_conn import K8sHelmConnector
from n2vc.k8s_helm3_conn import K8sHelm3Connector
from n2vc.k8s_juju_conn import K8sJujuConnector
from osm_common.dbbase import DbException
from osm_common.fsbase import FsException
from osm_lcm.data_utils.database.database import Database
from osm_lcm.data_utils.filesystem.filesystem import Filesystem
from n2vc.n2vc_juju_conn import N2VCJujuConnector
from n2vc.exceptions import N2VCException, N2VCNotFound, K8sException
from osm_lcm.lcm_helm_conn import LCMHelmConn
from copy import copy, deepcopy
from time import time
from uuid import uuid4
from random import randint
__author__ = "Alfonso Tierno <alfonso.tiernosepulveda@telefonica.com>"
class NsLcm(LcmBase):
timeout_vca_on_error = 5 * 60 # Time from when a charm first reaches blocked or error status until it is marked as failed
timeout_ns_deploy = 2 * 3600 # default global timeout for deploying an ns
timeout_ns_terminate = 1800 # default global timeout for undeploying an ns
timeout_charm_delete = 10 * 60
timeout_primitive = 30 * 60 # timeout for primitive execution
timeout_progress_primitive = 10 * 60 # timeout for some progress in a primitive execution
SUBOPERATION_STATUS_NOT_FOUND = -1
SUBOPERATION_STATUS_NEW = -2
SUBOPERATION_STATUS_SKIP = -3
task_name_deploy_vca = "Deploying VCA"
def __init__(self, msg, lcm_tasks, config, loop, prometheus=None):
"""
Init: connect to database, filesystem storage and messaging
:param config: two-level dictionary with configuration. Top level should contain 'database', 'storage', ...
:return: None
"""
super().__init__(
msg=msg,
logger=logging.getLogger('lcm.ns')
)
self.db = Database().instance.db
self.fs = Filesystem().instance.fs
self.loop = loop
self.lcm_tasks = lcm_tasks
self.timeout = config["timeout"]
self.ro_config = config["ro_config"]
self.ng_ro = config["ro_config"].get("ng")
self.vca_config = config["VCA"].copy()
# create N2VC connector
self.n2vc = N2VCJujuConnector(
log=self.logger,
loop=self.loop,
url='{}:{}'.format(self.vca_config['host'], self.vca_config['port']),
username=self.vca_config.get('user', None),
vca_config=self.vca_config,
on_update_db=self._on_update_n2vc_db,
fs=self.fs,
db=self.db
)
self.conn_helm_ee = LCMHelmConn(
log=self.logger,
loop=self.loop,
url=None,
username=None,
vca_config=self.vca_config,
on_update_db=self._on_update_n2vc_db
)
self.k8sclusterhelm2 = K8sHelmConnector(
kubectl_command=self.vca_config.get("kubectlpath"),
helm_command=self.vca_config.get("helmpath"),
log=self.logger,
on_update_db=None,
fs=self.fs,
db=self.db
)
self.k8sclusterhelm3 = K8sHelm3Connector(
kubectl_command=self.vca_config.get("kubectlpath"),
helm_command=self.vca_config.get("helm3path"),
fs=self.fs,
log=self.logger,
db=self.db,
on_update_db=None,
)
self.k8sclusterjuju = K8sJujuConnector(
kubectl_command=self.vca_config.get("kubectlpath"),
juju_command=self.vca_config.get("jujupath"),
log=self.logger,
loop=self.loop,
on_update_db=self._on_update_k8s_db,
vca_config=self.vca_config,
fs=self.fs,
db=self.db
)
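# map each supported KDU deployment type string to the k8s connector that handles it;
# "chart" and "juju" are accepted as short forms of "helm-chart-v3" and "juju-bundle"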
self.k8scluster_map = {
"helm-chart": self.k8sclusterhelm2,
"helm-chart-v3": self.k8sclusterhelm3,
"chart": self.k8sclusterhelm3,
"juju-bundle": self.k8sclusterjuju,
"juju": self.k8sclusterjuju,
}
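# map each VCA/execution-environment type to the connector used to drive it: charms (proxy or
# native) go through N2VC/juju, helm-based execution environments through the LCM helm connector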
self.vca_map = {
"lxc_proxy_charm": self.n2vc,
"native_charm": self.n2vc,
"k8s_proxy_charm": self.n2vc,
"helm": self.conn_helm_ee,
"helm-v3": self.conn_helm_ee
}
self.prometheus = prometheus
# create RO client
self.RO = NgRoClient(self.loop, **self.ro_config)
@staticmethod
def increment_ip_mac(ip_mac, vm_index=1):
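"""
Add vm_index to the last group of an IPv4 or IPv6 address, or of a MAC address.
Returns the input unchanged if it is not a string, and None if it cannot be incremented.
Illustrative examples (hypothetical values):
increment_ip_mac("10.0.0.4", vm_index=2) -> "10.0.0.6"
increment_ip_mac("52:54:00:00:00:0a") -> "52:54:00:00:00:0b"
"""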
if not isinstance(ip_mac, str):
return ip_mac
try:
# try with IPv4: look for the last dot
i = ip_mac.rfind(".")
if i > 0:
i += 1
return "{}{}".format(ip_mac[:i], int(ip_mac[i:]) + vm_index)
# try with IPv6 or MAC: look for the last colon and operate in hex
i = ip_mac.rfind(":")
if i > 0:
i += 1
# format in hex, len can be 2 for mac or 4 for ipv6
return ("{}{:0" + str(len(ip_mac) - i) + "x}").format(ip_mac[:i], int(ip_mac[i:], 16) + vm_index)
except Exception:
pass
return None
def _on_update_ro_db(self, nsrs_id, ro_descriptor):
# self.logger.debug('_on_update_ro_db(nsrs_id={}'.format(nsrs_id))
try:
# TODO filter RO descriptor fields...
# write to database
db_dict = dict()
# db_dict['deploymentStatus'] = yaml.dump(ro_descriptor, default_flow_style=False, indent=2)
db_dict['deploymentStatus'] = ro_descriptor
self.update_db_2("nsrs", nsrs_id, db_dict)
except Exception as e:
self.logger.warn('Cannot write database RO deployment for ns={} -> {}'.format(nsrs_id, e))
async def _on_update_n2vc_db(self, table, filter, path, updated_data):
# remove last dot from path (if exists)
if path.endswith('.'):
path = path[:-1]
# self.logger.debug('_on_update_n2vc_db(table={}, filter={}, path={}, updated_data={}'
# .format(table, filter, path, updated_data))
try:
nsr_id = filter.get('_id')
# read ns record from database
nsr = self.db.get_one(table='nsrs', q_filter=filter)
current_ns_status = nsr.get('nsState')
# get vca status for NS
status_dict = await self.n2vc.get_status(namespace='.' + nsr_id, yaml_format=False)
# vcaStatus
db_dict = dict()
db_dict['vcaStatus'] = status_dict
await self.n2vc.update_vca_status(db_dict['vcaStatus'])
# update configurationStatus for this VCA
try:
vca_index = int(path[path.rfind(".")+1:])
vca_list = deep_get(target_dict=nsr, key_list=('_admin', 'deployed', 'VCA'))
vca_status = vca_list[vca_index].get('status')
configuration_status_list = nsr.get('configurationStatus')
config_status = configuration_status_list[vca_index].get('status')
if config_status == 'BROKEN' and vca_status != 'failed':
db_dict['configurationStatus'][vca_index] = 'READY'
elif config_status != 'BROKEN' and vca_status == 'failed':
db_dict['configurationStatus'][vca_index] = 'BROKEN'
except Exception as e:
# do not update configurationStatus
self.logger.debug('Error updating vca_index (ignore): {}'.format(e))
# if nsState = 'READY' check if juju is reporting some error => nsState = 'DEGRADED'
# if nsState = 'DEGRADED' check if all is OK
is_degraded = False
if current_ns_status in ('READY', 'DEGRADED'):
error_description = ''
# check machines
if status_dict.get('machines'):
for machine_id in status_dict.get('machines'):
machine = status_dict.get('machines').get(machine_id)
# check machine agent-status
if machine.get('agent-status'):
s = machine.get('agent-status').get('status')
if s != 'started':
is_degraded = True
error_description += 'machine {} agent-status={} ; '.format(machine_id, s)
# check machine instance status
if machine.get('instance-status'):
s = machine.get('instance-status').get('status')
if s != 'running':
is_degraded = True
error_description += 'machine {} instance-status={} ; '.format(machine_id, s)
# check applications
if status_dict.get('applications'):
for app_id in status_dict.get('applications'):
app = status_dict.get('applications').get(app_id)
# check application status
if app.get('status'):
s = app.get('status').get('status')
if s != 'active':
is_degraded = True
error_description += 'application {} status={} ; '.format(app_id, s)
if error_description:
db_dict['errorDescription'] = error_description
if current_ns_status == 'READY' and is_degraded:
db_dict['nsState'] = 'DEGRADED'
if current_ns_status == 'DEGRADED' and not is_degraded:
db_dict['nsState'] = 'READY'
# write to database
self.update_db_2("nsrs", nsr_id, db_dict)
except (asyncio.CancelledError, asyncio.TimeoutError):
raise
except Exception as e:
self.logger.warn('Error updating NS state for ns={}: {}'.format(nsr_id, e))
async def _on_update_k8s_db(self, cluster_uuid, kdu_instance, filter=None):
"""
Updating vca status in NSR record
:param cluster_uuid: UUID of a k8s cluster
:param kdu_instance: The unique name of the KDU instance
:param filter: To get nsr_id
:return: none
"""
# self.logger.debug("_on_update_k8s_db(cluster_uuid={}, kdu_instance={}, filter={}"
# .format(cluster_uuid, kdu_instance, filter))
try:
nsr_id = filter.get('_id')
# get vca status for NS
vca_status = await self.k8sclusterjuju.status_kdu(cluster_uuid,
kdu_instance,
complete_status=True,
yaml_format=False)
# vcaStatus
db_dict = dict()
db_dict['vcaStatus'] = {nsr_id: vca_status}
await self.k8sclusterjuju.update_vca_status(db_dict['vcaStatus'], kdu_instance)
# write to database
self.update_db_2("nsrs", nsr_id, db_dict)
except (asyncio.CancelledError, asyncio.TimeoutError):
raise
except Exception as e:
self.logger.warn('Error updating NS state for ns={}: {}'.format(nsr_id, e))
@staticmethod
def _parse_cloud_init(cloud_init_text, additional_params, vnfd_id, vdu_id):
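# Renders the cloud-init text as a Jinja2 template using the VNF/VDU additional params.
# Illustrative example (hypothetical values):
#   cloud_init_text = "#cloud-config\npassword: {{ password }}"
#   additional_params = {"password": "osm4u"}
#   result -> "#cloud-config\npassword: osm4u"
# With StrictUndefined, any variable missing from additional_params raises UndefinedError,
# which is reported below as an LcmException.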
try:
env = Environment(undefined=StrictUndefined)
template = env.from_string(cloud_init_text)
return template.render(additional_params or {})
except UndefinedError as e:
raise LcmException("Variable {} at vnfd[id={}]:vdu[id={}]:cloud-init/cloud-init-"
"file, must be provided in the instantiation parameters inside the "
"'additionalParamsForVnf/Vdu' block".format(e, vnfd_id, vdu_id))
except (TemplateError, TemplateNotFound) as e:
raise LcmException("Error parsing Jinja2 to cloud-init content at vnfd[id={}]:vdu[id={}]: {}".
format(vnfd_id, vdu_id, e))
def _get_vdu_cloud_init_content(self, vdu, vnfd):
cloud_init_content = cloud_init_file = None
try:
if vdu.get("cloud-init-file"):
base_folder = vnfd["_admin"]["storage"]
cloud_init_file = "{}/{}/cloud_init/{}".format(base_folder["folder"], base_folder["pkg-dir"],
vdu["cloud-init-file"])
with self.fs.file_open(cloud_init_file, "r") as ci_file:
cloud_init_content = ci_file.read()
elif vdu.get("cloud-init"):
cloud_init_content = vdu["cloud-init"]
return cloud_init_content
except FsException as e:
raise LcmException("Error reading vnfd[id={}]:vdu[id={}]:cloud-init-file={}: {}".
format(vnfd["id"], vdu["id"], cloud_init_file, e))
def _get_vdu_additional_params(self, db_vnfr, vdu_id):
vdur = next(vdur for vdur in db_vnfr.get("vdur") if vdu_id == vdur["vdu-id-ref"])
additional_params = vdur.get("additionalParams")
return parse_yaml_strings(additional_params)
def vnfd2RO(self, vnfd, new_id=None, additionalParams=None, nsrId=None):
"""
Creates a new vnfd descriptor for RO based on the input OSM IM vnfd
:param vnfd: input vnfd
:param new_id: overrides vnf id if provided
:param additionalParams: Instantiation params for VNFs provided
:param nsrId: Id of the NSR
:return: copy of vnfd
"""
vnfd_RO = deepcopy(vnfd)
# remove configuration, monitoring, scaling and internal keys not used by RO
vnfd_RO.pop("_id", None)
vnfd_RO.pop("_admin", None)
vnfd_RO.pop("monitoring-param", None)
vnfd_RO.pop("scaling-group-descriptor", None)
vnfd_RO.pop("kdu", None)
vnfd_RO.pop("k8s-cluster", None)
if new_id:
vnfd_RO["id"] = new_id
# remove cloud-init and cloud-init-file from the vdus; they are not passed to RO
for vdu in get_iterable(vnfd_RO, "vdu"):
vdu.pop("cloud-init-file", None)
vdu.pop("cloud-init", None)
return vnfd_RO
@staticmethod
def ip_profile_2_RO(ip_profile):
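# Translates an OSM IM ip-profile into the key names used by RO.
# Illustrative example (hypothetical values):
#   {"ip-version": "ipv4", "dns-server": [{"address": "8.8.8.8"}], "dhcp-params": {"enabled": True}}
#   -> {"ip-version": "IPv4", "dns-address": ["8.8.8.8"], "dhcp": {"enabled": True}}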
RO_ip_profile = deepcopy(ip_profile)
if "dns-server" in RO_ip_profile:
if isinstance(RO_ip_profile["dns-server"], list):
RO_ip_profile["dns-address"] = []
for ds in RO_ip_profile.pop("dns-server"):
RO_ip_profile["dns-address"].append(ds['address'])
else:
RO_ip_profile["dns-address"] = RO_ip_profile.pop("dns-server")
if RO_ip_profile.get("ip-version") == "ipv4":
RO_ip_profile["ip-version"] = "IPv4"
if RO_ip_profile.get("ip-version") == "ipv6":
RO_ip_profile["ip-version"] = "IPv6"
if "dhcp-params" in RO_ip_profile:
RO_ip_profile["dhcp"] = RO_ip_profile.pop("dhcp-params")
return RO_ip_profile
def _get_ro_vim_id_for_vim_account(self, vim_account):
db_vim = self.db.get_one("vim_accounts", {"_id": vim_account})
if db_vim["_admin"]["operationalState"] != "ENABLED":
raise LcmException("VIM={} is not available. operationalState={}".format(
vim_account, db_vim["_admin"]["operationalState"]))
RO_vim_id = db_vim["_admin"]["deployed"]["RO"]
return RO_vim_id
def get_ro_wim_id_for_wim_account(self, wim_account):
if isinstance(wim_account, str):
db_wim = self.db.get_one("wim_accounts", {"_id": wim_account})
if db_wim["_admin"]["operationalState"] != "ENABLED":
raise LcmException("WIM={} is not available. operationalState={}".format(
wim_account, db_wim["_admin"]["operationalState"]))
RO_wim_id = db_wim["_admin"]["deployed"]["RO-account"]
return RO_wim_id
else:
return wim_account
def scale_vnfr(self, db_vnfr, vdu_create=None, vdu_delete=None, mark_delete=False):
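"""
Adds and/or removes vdur entries of a vnfr at the database to reflect a scaling operation.
:param db_vnfr: vnfr database content; its "vdur" list is refreshed from the database before returning
:param vdu_create: dict with vdu-id as key and the number of new instances to add as value
:param vdu_delete: dict with vdu-id as key and the number of instances to remove as value
:param mark_delete: if True, vdur entries are only marked with status DELETING instead of being removed
:return: None
"""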
db_vdu_push_list = []
db_update = {"_admin.modified": time()}
if vdu_create:
for vdu_id, vdu_count in vdu_create.items():
vdur = next((vdur for vdur in reversed(db_vnfr["vdur"]) if vdur["vdu-id-ref"] == vdu_id), None)
if not vdur:
raise LcmException("Error scaling OUT VNFR for {}. There is not any existing vnfr. Scaled to 0?".
format(vdu_id))
for count in range(vdu_count):
vdur_copy = deepcopy(vdur)
vdur_copy["status"] = "BUILD"
vdur_copy["status-detailed"] = None
vdur_copy["ip-address"]: None
vdur_copy["_id"] = str(uuid4())
vdur_copy["count-index"] += count + 1
vdur_copy["id"] = "{}-{}".format(vdur_copy["vdu-id-ref"], vdur_copy["count-index"])
vdur_copy.pop("vim_info", None)
for iface in vdur_copy["interfaces"]:
if iface.get("fixed-ip"):
iface["ip-address"] = self.increment_ip_mac(iface["ip-address"], count+1)
else:
iface.pop("ip-address", None)
if iface.get("fixed-mac"):
iface["mac-address"] = self.increment_ip_mac(iface["mac-address"], count+1)
else:
iface.pop("mac-address", None)
iface.pop("mgmt_vnf", None) # only first vdu can be managment of vnf
db_vdu_push_list.append(vdur_copy)
# self.logger.debug("scale out, adding vdu={}".format(vdur_copy))
if vdu_delete:
for vdu_id, vdu_count in vdu_delete.items():
if mark_delete:
indexes_to_delete = [iv[0] for iv in enumerate(db_vnfr["vdur"]) if iv[1]["vdu-id-ref"] == vdu_id]
db_update.update({"vdur.{}.status".format(i): "DELETING" for i in indexes_to_delete[-vdu_count:]})
else:
# it must be deleted one by one because common.db does not allow otherwise
vdus_to_delete = [v for v in reversed(db_vnfr["vdur"]) if v["vdu-id-ref"] == vdu_id]
for vdu in vdus_to_delete[:vdu_count]:
self.db.set_one("vnfrs", {"_id": db_vnfr["_id"]}, None, pull={"vdur": {"_id": vdu["_id"]}})
db_push = {"vdur": db_vdu_push_list} if db_vdu_push_list else None
self.db.set_one("vnfrs", {"_id": db_vnfr["_id"]}, db_update, push_list=db_push)
# modify passed dictionary db_vnfr
db_vnfr_ = self.db.get_one("vnfrs", {"_id": db_vnfr["_id"]})
db_vnfr["vdur"] = db_vnfr_["vdur"]
def ns_update_nsr(self, ns_update_nsr, db_nsr, nsr_desc_RO):
"""
Updates database nsr with the RO info for the created vld
:param ns_update_nsr: dictionary to be filled with the updated info
:param db_nsr: content of db_nsr. This is also modified
:param nsr_desc_RO: nsr descriptor from RO
:return: Nothing, LcmException is raised on errors
"""
for vld_index, vld in enumerate(get_iterable(db_nsr, "vld")):
for net_RO in get_iterable(nsr_desc_RO, "nets"):
if vld["id"] != net_RO.get("ns_net_osm_id"):
continue
vld["vim-id"] = net_RO.get("vim_net_id")
vld["name"] = net_RO.get("vim_name")
vld["status"] = net_RO.get("status")
vld["status-detailed"] = net_RO.get("error_msg")
ns_update_nsr["vld.{}".format(vld_index)] = vld
break
else:
raise LcmException("ns_update_nsr: Not found vld={} at RO info".format(vld["id"]))
def set_vnfr_at_error(self, db_vnfrs, error_text):
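"""
Sets the vnfr status to ERROR at the database and marks every vdur that does not yet have a status as
ERROR, both in the passed dictionaries and at the database, storing error_text as the detailed status
when provided.
"""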
try:
for db_vnfr in db_vnfrs.values():
vnfr_update = {"status": "ERROR"}
for vdu_index, vdur in enumerate(get_iterable(db_vnfr, "vdur")):
if "status" not in vdur:
vdur["status"] = "ERROR"
vnfr_update["vdur.{}.status".format(vdu_index)] = "ERROR"
if error_text:
vdur["status-detailed"] = str(error_text)
vnfr_update["vdur.{}.status-detailed".format(vdu_index)] = "ERROR"
self.update_db_2("vnfrs", db_vnfr["_id"], vnfr_update)
except DbException as e:
self.logger.error("Cannot update vnf. {}".format(e))
def ns_update_vnfr(self, db_vnfrs, nsr_desc_RO):
"""
Updates database vnfr with the RO info, e.g. ip_address, vim_id... Descriptor db_vnfrs is also updated
:param db_vnfrs: dictionary with member-vnf-index: vnfr-content
:param nsr_desc_RO: nsr descriptor from RO
:return: Nothing, LcmException is raised on errors
"""
for vnf_index, db_vnfr in db_vnfrs.items():
for vnf_RO in nsr_desc_RO["vnfs"]:
if vnf_RO["member_vnf_index"] != vnf_index:
continue
vnfr_update = {}
if vnf_RO.get("ip_address"):
db_vnfr["ip-address"] = vnfr_update["ip-address"] = vnf_RO["ip_address"].split(";")[0]
elif not db_vnfr.get("ip-address"):
if db_vnfr.get("vdur"): # if not VDUs, there is not ip_address
raise LcmExceptionNoMgmtIP("ns member_vnf_index '{}' has no IP address".format(vnf_index))
for vdu_index, vdur in enumerate(get_iterable(db_vnfr, "vdur")):
vdur_RO_count_index = 0
if vdur.get("pdu-type"):
continue
for vdur_RO in get_iterable(vnf_RO, "vms"):
if vdur["vdu-id-ref"] != vdur_RO["vdu_osm_id"]:
continue
if vdur["count-index"] != vdur_RO_count_index:
vdur_RO_count_index += 1
continue
vdur["vim-id"] = vdur_RO.get("vim_vm_id")
if vdur_RO.get("ip_address"):
vdur["ip-address"] = vdur_RO["ip_address"].split(";")[0]
else:
vdur["ip-address"] = None
vdur["vdu-id-ref"] = vdur_RO.get("vdu_osm_id")
vdur["name"] = vdur_RO.get("vim_name")
vdur["status"] = vdur_RO.get("status")
vdur["status-detailed"] = vdur_RO.get("error_msg")
for ifacer in get_iterable(vdur, "interfaces"):
for interface_RO in get_iterable(vdur_RO, "interfaces"):
if ifacer["name"] == interface_RO.get("internal_name"):
ifacer["ip-address"] = interface_RO.get("ip_address")
ifacer["mac-address"] = interface_RO.get("mac_address")
break
else:
raise LcmException("ns_update_vnfr: Not found member_vnf_index={} vdur={} interface={} "
"from VIM info"
.format(vnf_index, vdur["vdu-id-ref"], ifacer["name"]))
vnfr_update["vdur.{}".format(vdu_index)] = vdur
break
else:
raise LcmException("ns_update_vnfr: Not found member_vnf_index={} vdur={} count_index={} from "
"VIM info".format(vnf_index, vdur["vdu-id-ref"], vdur["count-index"]))
for vld_index, vld in enumerate(get_iterable(db_vnfr, "vld")):
for net_RO in get_iterable(nsr_desc_RO, "nets"):
if vld["id"] != net_RO.get("vnf_net_osm_id"):
continue
vld["vim-id"] = net_RO.get("vim_net_id")
vld["name"] = net_RO.get("vim_name")
vld["status"] = net_RO.get("status")
vld["status-detailed"] = net_RO.get("error_msg")
vnfr_update["vld.{}".format(vld_index)] = vld
break
else:
raise LcmException("ns_update_vnfr: Not found member_vnf_index={} vld={} from VIM info".format(
vnf_index, vld["id"]))
self.update_db_2("vnfrs", db_vnfr["_id"], vnfr_update)
break
else:
raise LcmException("ns_update_vnfr: Not found member_vnf_index={} from VIM info".format(vnf_index))
def _get_ns_config_info(self, nsr_id):
"""
Generates a mapping between vnf,vdu elements and the N2VC id
:param nsr_id: id of nsr to get last database _admin.deployed.VCA that contains this list
:return: a dictionary with {osm-config-mapping: {}} where its element contains:
"<member-vnf-index>": <N2VC-id> for a vnf configuration, or
"<member-vnf-index>.<vdu.id>.<vdu replica(0, 1,..)>": <N2VC-id> for a vdu configuration
"""
db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
vca_deployed_list = db_nsr["_admin"]["deployed"]["VCA"]
mapping = {}
ns_config_info = {"osm-config-mapping": mapping}
for vca in vca_deployed_list:
if not vca["member-vnf-index"]:
continue
if not vca["vdu_id"]:
mapping[vca["member-vnf-index"]] = vca["application"]
else:
mapping["{}.{}.{}".format(vca["member-vnf-index"], vca["vdu_id"], vca["vdu_count_index"])] =\
vca["application"]
return ns_config_info
async def _instantiate_ng_ro(self, logging_text, nsr_id, nsd, db_nsr, db_nslcmop, db_vnfrs, db_vnfds,
n2vc_key_list, stage, start_deploy, timeout_ns_deploy):
db_vims = {}
def get_vim_account(vim_account_id):
nonlocal db_vims
if vim_account_id in db_vims:
return db_vims[vim_account_id]
db_vim = self.db.get_one("vim_accounts", {"_id": vim_account_id})
db_vims[vim_account_id] = db_vim
return db_vim
# modify target_vld info with instantiation parameters
def parse_vld_instantiation_params(target_vim, target_vld, vld_params, target_sdn):
if vld_params.get("ip-profile"):
target_vld["vim_info"][target_vim]["ip_profile"] = vld_params["ip-profile"]
if vld_params.get("provider-network"):
target_vld["vim_info"][target_vim]["provider_network"] = vld_params["provider-network"]
if "sdn-ports" in vld_params["provider-network"] and target_sdn:
target_vld["vim_info"][target_sdn]["sdn-ports"] = vld_params["provider-network"]["sdn-ports"]
if vld_params.get("wimAccountId"):
target_wim = "wim:{}".format(vld_params["wimAccountId"])
target_vld["vim_info"][target_wim] = {}
for param in ("vim-network-name", "vim-network-id"):
if vld_params.get(param):
if isinstance(vld_params[param], dict):
for vim, vim_net in vld_params[param].items():
other_target_vim = "vim:" + vim
populate_dict(target_vld["vim_info"], (other_target_vim, param.replace("-", "_")), vim_net)
else: # isinstance str
target_vld["vim_info"][target_vim][param.replace("-", "_")] = vld_params[param]
if vld_params.get("common_id"):
target_vld["common_id"] = vld_params.get("common_id")
nslcmop_id = db_nslcmop["_id"]
target = {
"name": db_nsr["name"],
"ns": {"vld": []},
"vnf": [],
"image": deepcopy(db_nsr["image"]),
"flavor": deepcopy(db_nsr["flavor"]),
"action_id": nslcmop_id,
"cloud_init_content": {},
}
for image in target["image"]:
image["vim_info"] = {}
for flavor in target["flavor"]:
flavor["vim_info"] = {}
if db_nslcmop.get("lcmOperationType") != "instantiate":
# get parameters of instantiation:
db_nslcmop_instantiate = self.db.get_list("nslcmops", {"nsInstanceId": db_nslcmop["nsInstanceId"],
"lcmOperationType": "instantiate"})[-1]
ns_params = db_nslcmop_instantiate.get("operationParams")
else:
ns_params = db_nslcmop.get("operationParams")
ssh_keys_instantiation = ns_params.get("ssh_keys") or []
ssh_keys_all = ssh_keys_instantiation + (n2vc_key_list or [])
cp2target = {}
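# cp2target maps each ns-level connection point ("member_vnf:<member-index>.<cpd-id>") to the target
# record of the ns vld it is attached to ("nsrs:<nsr_id>:vld.<vld_index>"); it is filled while
# processing the ns vlds below and consumed when processing the vlds of each vnf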
for vld_index, vld in enumerate(db_nsr.get("vld")):
target_vim = "vim:{}".format(ns_params["vimAccountId"])
target_vld = {
"id": vld["id"],
"name": vld["name"],
"mgmt-network": vld.get("mgmt-network", False),
"type": vld.get("type"),
"vim_info": {
target_vim: {
"vim_network_name": vld.get("vim-network-name"),
"vim_account_id": ns_params["vimAccountId"]
}
}
}
# check if this network needs SDN assist
if vld.get("pci-interfaces"):
db_vim = get_vim_account(ns_params["vimAccountId"])
sdnc_id = db_vim["config"].get("sdn-controller")
if sdnc_id:
sdn_vld = "nsrs:{}:vld.{}".format(nsr_id, vld["id"])
target_sdn = "sdn:{}".format(sdnc_id)
target_vld["vim_info"][target_sdn] = {
"sdn": True, "target_vim": target_vim, "vlds": [sdn_vld], "type": vld.get("type")}
nsd_vnf_profiles = get_vnf_profiles(nsd)
for nsd_vnf_profile in nsd_vnf_profiles:
for cp in nsd_vnf_profile["virtual-link-connectivity"]:
if cp["virtual-link-profile-id"] == vld["id"]:
cp2target["member_vnf:{}.{}".format(
cp["constituent-cpd-id"][0]["constituent-base-element-id"],
cp["constituent-cpd-id"][0]["constituent-cpd-id"]
)] = "nsrs:{}:vld.{}".format(nsr_id, vld_index)
# check at nsd descriptor, if there is an ip-profile
vld_params = {}
virtual_link_profiles = get_virtual_link_profiles(nsd)
for vlp in virtual_link_profiles:
ip_profile = find_in_list(nsd["ip-profiles"],
lambda profile: profile["name"] == vlp["ip-profile-ref"])
vld_params["ip-profile"] = ip_profile["ip-profile-params"]
# update vld_params with instantiation params
vld_instantiation_params = find_in_list(get_iterable(ns_params, "vld"),
lambda a_vld: a_vld["name"] in (vld["name"], vld["id"]))
if vld_instantiation_params:
vld_params.update(vld_instantiation_params)
parse_vld_instantiation_params(target_vim, target_vld, vld_params, None)
target["ns"]["vld"].append(target_vld)
for vnfr in db_vnfrs.values():
vnfd = find_in_list(db_vnfds, lambda db_vnf: db_vnf["id"] == vnfr["vnfd-ref"])
vnf_params = find_in_list(get_iterable(ns_params, "vnf"),
lambda a_vnf: a_vnf["member-vnf-index"] == vnfr["member-vnf-index-ref"])
target_vnf = deepcopy(vnfr)
target_vim = "vim:{}".format(vnfr["vim-account-id"])
for vld in target_vnf.get("vld", ()):
# check if connected to a ns.vld, to fill target
vnf_cp = find_in_list(vnfd.get("int-virtual-link-desc", ()),
lambda cpd: cpd.get("id") == vld["id"])
if vnf_cp:
ns_cp = "member_vnf:{}.{}".format(vnfr["member-vnf-index-ref"], vnf_cp["id"])
if cp2target.get(ns_cp):
vld["target"] = cp2target[ns_cp]
vld["vim_info"] = {target_vim: {"vim_network_name": vld.get("vim-network-name")}}
# check if this network needs SDN assist
target_sdn = None
if vld.get("pci-interfaces"):
db_vim = get_vim_account(vnfr["vim-account-id"])
sdnc_id = db_vim["config"].get("sdn-controller")
if sdnc_id:
sdn_vld = "vnfrs:{}:vld.{}".format(target_vnf["_id"], vld["id"])
target_sdn = "sdn:{}".format(sdnc_id)
vld["vim_info"][target_sdn] = {
"sdn": True, "target_vim": target_vim, "vlds": [sdn_vld], "type": vld.get("type")}
# check at vnfd descriptor, if there is an ip-profile
vld_params = {}
vnfd_vlp = find_in_list(
get_virtual_link_profiles(vnfd),
lambda a_link_profile: a_link_profile["id"] == vld["id"]
)
if vnfd_vlp and vnfd_vlp.get("virtual-link-protocol-data") and \
vnfd_vlp["virtual-link-protocol-data"].get("l3-protocol-data"):
ip_profile_source_data = vnfd_vlp["virtual-link-protocol-data"]["l3-protocol-data"]
ip_profile_dest_data = {}
if "ip-version" in ip_profile_source_data:
ip_profile_dest_data["ip-version"] = ip_profile_source_data["ip-version"]
if "cidr" in ip_profile_source_data:
ip_profile_dest_data["subnet-address"] = ip_profile_source_data["cidr"]
if "gateway-ip" in ip_profile_source_data:
ip_profile_dest_data["gateway-address"] = ip_profile_source_data["gateway-ip"]
if "dhcp-enabled" in ip_profile_source_data:
ip_profile_dest_data["dhcp-params"] = {
"enabled": ip_profile_source_data["dhcp-enabled"]
}
vld_params["ip-profile"] = ip_profile_dest_data
# update vld_params with instantiation params
if vnf_params:
vld_instantiation_params = find_in_list(get_iterable(vnf_params, "internal-vld"),
lambda i_vld: i_vld["name"] == vld["id"])
if vld_instantiation_params:
vld_params.update(vld_instantiation_params)
parse_vld_instantiation_params(target_vim, vld, vld_params, target_sdn)
vdur_list = []
for vdur in target_vnf.get("vdur", ()):
if vdur.get("status") == "DELETING" or vdur.get("pdu-type"):
continue # This vdu must not be created
vdur["vim_info"] = {"vim_account_id": vnfr["vim-account-id"]}
self.logger.debug("NS > ssh_keys > {}".format(ssh_keys_all))
if ssh_keys_all:
vdu_configuration = get_configuration(vnfd, vdur["vdu-id-ref"])
vnf_configuration = get_configuration(vnfd, vnfd["id"])
if vdu_configuration and vdu_configuration.get("config-access") and \
vdu_configuration.get("config-access").get("ssh-access"):
vdur["ssh-keys"] = ssh_keys_all
vdur["ssh-access-required"] = vdu_configuration["config-access"]["ssh-access"]["required"]
elif vnf_configuration and vnf_configuration.get("config-access") and \
vnf_configuration.get("config-access").get("ssh-access") and \
any(iface.get("mgmt-vnf") for iface in vdur["interfaces"]):
vdur["ssh-keys"] = ssh_keys_all
vdur["ssh-access-required"] = vnf_configuration["config-access"]["ssh-access"]["required"]
elif ssh_keys_instantiation and \
find_in_list(vdur["interfaces"], lambda iface: iface.get("mgmt-vnf")):
vdur["ssh-keys"] = ssh_keys_instantiation
self.logger.debug("NS > vdur > {}".format(vdur))
vdud = get_vdu(vnfd, vdur["vdu-id-ref"])
# cloud-init
if vdud.get("cloud-init-file"):
vdur["cloud-init"] = "{}:file:{}".format(vnfd["_id"], vdud.get("cloud-init-file"))
# read the file and put its content at target.cloud_init_content, so ng_ro does not need the shared package filesystem
if vdur["cloud-init"] not in target["cloud_init_content"]:
base_folder = vnfd["_admin"]["storage"]
cloud_init_file = "{}/{}/cloud_init/{}".format(base_folder["folder"], base_folder["pkg-dir"],
vdud.get("cloud-init-file"))
with self.fs.file_open(cloud_init_file, "r") as ci_file:
target["cloud_init_content"][vdur["cloud-init"]] = ci_file.read()
elif vdud.get("cloud-init"):
vdur["cloud-init"] = "{}:vdu:{}".format(vnfd["_id"], get_vdu_index(vnfd, vdur["vdu-id-ref"]))
# put the content at target.cloud_init_content, so ng_ro does not need to read the vnfd descriptor
target["cloud_init_content"][vdur["cloud-init"]] = vdud["cloud-init"]
vdur["additionalParams"] = vdur.get("additionalParams") or {}
deploy_params_vdu = self._format_additional_params(vdur.get("additionalParams") or {})
deploy_params_vdu["OSM"] = get_osm_params(vnfr, vdur["vdu-id-ref"], vdur["count-index"])
vdur["additionalParams"] = deploy_params_vdu
# flavor
ns_flavor = target["flavor"][int(vdur["ns-flavor-id"])]
if target_vim not in ns_flavor["vim_info"]:
ns_flavor["vim_info"][target_vim] = {}
# deal with images
# in case alternative images are provided, check whether one matches the vim_type
# of the target VIM and, if so, use that alternative image instead
ns_image_id = int(vdur["ns-image-id"])
if vdur.get("alt-image-ids"):
db_vim = get_vim_account(vnfr["vim-account-id"])
vim_type = db_vim["vim_type"]
for alt_image_id in vdur.get("alt-image-ids"):
ns_alt_image = target["image"][int(alt_image_id)]
if vim_type == ns_alt_image.get("vim-type"):
# must use alternative image
self.logger.debug("use alternative image id: {}".format(alt_image_id))
ns_image_id = alt_image_id
vdur["ns-image-id"] = ns_image_id
break
ns_image = target["image"][int(ns_image_id)]
if target_vim not in ns_image["vim_info"]:
ns_image["vim_info"][target_vim] = {}
vdur["vim_info"] = {target_vim: {}}
# instantiation parameters
# if vnf_params:
# vdu_instantiation_params = next((v for v in get_iterable(vnf_params, "vdu") if v["id"] ==
# vdud["id"]), None)
vdur_list.append(vdur)
target_vnf["vdur"] = vdur_list
target["vnf"].append(target_vnf)
desc = await self.RO.deploy(nsr_id, target)
self.logger.debug("RO return > {}".format(desc))
action_id = desc["action_id"]
await self._wait_ng_ro(nsr_id, action_id, nslcmop_id, start_deploy, timeout_ns_deploy, stage)
# Updating NSR
db_nsr_update = {
"_admin.deployed.RO.operational-status": "running",
"detailed-status": " ".join(stage)
}
# db_nsr["_admin.deployed.RO.detailed-status"] = "Deployed at VIM"
self.update_db_2("nsrs", nsr_id, db_nsr_update)
self._write_op_status(nslcmop_id, stage)
self.logger.debug(logging_text + "ns deployed at RO. RO_id={}".format(action_id))
return
async def _wait_ng_ro(self, nsr_id, action_id, nslcmop_id=None, start_time=None, timeout=600, stage=None):
detailed_status_old = None
db_nsr_update = {}
start_time = start_time or time()
while time() <= start_time + timeout:
desc_status = await self.RO.status(nsr_id, action_id)
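# expected answer, based on the handling below: {"status": "FAILED"|"BUILD"|"DONE", "details": <text>}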
self.logger.debug("Wait NG RO > {}".format(desc_status))
if desc_status["status"] == "FAILED":
raise NgRoException(desc_status["details"])
elif desc_status["status"] == "BUILD":
if stage:
stage[2] = "VIM: ({})".format(desc_status["details"])
elif desc_status["status"] == "DONE":
if stage:
stage[2] = "Deployed at VIM"
break
else:
assert False, "RO.status returns unknown {}".format(desc_status["status"])
if stage and nslcmop_id and stage[2] != detailed_status_old:
detailed_status_old = stage[2]
db_nsr_update["detailed-status"] = " ".join(stage)
self.update_db_2("nsrs", nsr_id, db_nsr_update)
self._write_op_status(nslcmop_id, stage)
await asyncio.sleep(15, loop=self.loop)
else: # timeout_ns_deploy
raise NgRoException("Timeout waiting ns to deploy")
async def _terminate_ng_ro(self, logging_text, nsr_deployed, nsr_id, nslcmop_id, stage):
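"""
Terminates the NS at NG-RO: deploys an empty target (no vlds, vnfs, images or flavors) so that RO removes
the existing resources, waits for that action to finish and finally deletes the nsr at RO.
"""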
db_nsr_update = {}
failed_detail = []
action_id = None
start_deploy = time()
try:
target = {
"ns": {"vld": []},
"vnf": [],
"image": [],
"flavor": [],
"action_id": nslcmop_id
}
desc = await self.RO.deploy(nsr_id, target)
action_id = desc["action_id"]
db_nsr_update["_admin.deployed.RO.nsr_delete_action_id"] = action_id
db_nsr_update["_admin.deployed.RO.nsr_status"] = "DELETING"
self.logger.debug(logging_text + "ns terminate action at RO. action_id={}".format(action_id))
# wait until done
delete_timeout = 20 * 60 # 20 minutes
await self._wait_ng_ro(nsr_id, action_id, nslcmop_id, start_deploy, delete_timeout, stage)
db_nsr_update["_admin.deployed.RO.nsr_delete_action_id"] = None
db_nsr_update["_admin.deployed.RO.nsr_status"] = "DELETED"
# delete all nsr
await self.RO.delete(nsr_id)
except Exception as e:
if isinstance(e, NgRoException) and e.http_code == 404: # not found
db_nsr_update["_admin.deployed.RO.nsr_id"] = None
db_nsr_update["_admin.deployed.RO.nsr_status"] = "DELETED"
db_nsr_update["_admin.deployed.RO.nsr_delete_action_id"] = None
self.logger.debug(logging_text + "RO_action_id={} already deleted".format(action_id))
elif isinstance(e, NgRoException) and e.http_code == 409: # conflict
failed_detail.append("delete conflict: {}".format(e))
self.logger.debug(logging_text + "RO_action_id={} delete conflict: {}".format(action_id, e))
else:
failed_detail.append("delete error: {}".format(e))
self.logger.error(logging_text + "RO_action_id={} delete error: {}".format(action_id, e))
if failed_detail:
stage[2] = "Error deleting from VIM"
else:
stage[2] = "Deleted from VIM"
db_nsr_update["detailed-status"] = " ".join(stage)
self.update_db_2("nsrs", nsr_id, db_nsr_update)
self._write_op_status(nslcmop_id, stage)
if failed_detail:
raise LcmException("; ".join(failed_detail))
return
async def instantiate_RO(self, logging_text, nsr_id, nsd, db_nsr, db_nslcmop, db_vnfrs, db_vnfds,
n2vc_key_list, stage):
"""
Instantiate at RO
:param logging_text: prefix text to use at logging
:param nsr_id: nsr identity
:param nsd: database content of ns descriptor
:param db_nsr: database content of ns record
:param db_nslcmop: database content of ns operation, in this case, 'instantiate'
:param db_vnfrs:
:param db_vnfds: database content of vnfds, indexed by id (not _id). {id: {vnfd_object}, ...}
:param n2vc_key_list: ssh-public-key list to be inserted to management vdus via cloud-init
:param stage: list with 3 items: [general stage, tasks, vim_specific]. This task will write over vim_specific
:return: None or exception
"""
try:
start_deploy = time()
ns_params = db_nslcmop.get("operationParams")
if ns_params and ns_params.get("timeout_ns_deploy"):
timeout_ns_deploy = ns_params["timeout_ns_deploy"]
else:
timeout_ns_deploy = self.timeout.get("ns_deploy", self.timeout_ns_deploy)
# Check for and optionally request placement optimization. Database will be updated if placement activated
stage[2] = "Waiting for Placement."
if await self._do_placement(logging_text, db_nslcmop, db_vnfrs):
# in case of placement, change ns_params["vimAccountId"] if it is not present at any vnfr
for vnfr in db_vnfrs.values():
if ns_params["vimAccountId"] == vnfr["vim-account-id"]:
break
else:
ns_params["vimAccountId"] == vnfr["vim-account-id"]
return await self._instantiate_ng_ro(logging_text, nsr_id, nsd, db_nsr, db_nslcmop, db_vnfrs,
db_vnfds, n2vc_key_list, stage, start_deploy, timeout_ns_deploy)
except Exception as e:
stage[2] = "ERROR deploying at VIM"
self.set_vnfr_at_error(db_vnfrs, str(e))
self.logger.error("Error deploying at VIM {}".format(e),
exc_info=not isinstance(e, (ROclient.ROClientException, LcmException, DbException,
NgRoException)))
raise
async def wait_kdu_up(self, logging_text, nsr_id, vnfr_id, kdu_name):
"""
Wait for kdu to be up, get ip address
:param logging_text: prefix use for logging
:param nsr_id:
:param vnfr_id:
:param kdu_name:
:return: IP address
"""
# self.logger.debug(logging_text + "Starting wait_kdu_up")
nb_tries = 0
while nb_tries < 360:
db_vnfr = self.db.get_one("vnfrs", {"_id": vnfr_id})
kdur = next((x for x in get_iterable(db_vnfr, "kdur") if x.get("kdu-name") == kdu_name), None)
if not kdur:
raise LcmException("Not found vnfr_id={}, kdu_name={}".format(vnfr_id, kdu_name))
if kdur.get("status"):
if kdur["status"] in ("READY", "ENABLED"):
return kdur.get("ip-address")
else:
raise LcmException("target KDU={} is in error state".format(kdu_name))
await asyncio.sleep(10, loop=self.loop)
nb_tries += 1
raise LcmException("Timeout waiting KDU={} instantiated".format(kdu_name))
async def wait_vm_up_insert_key_ro(self, logging_text, nsr_id, vnfr_id, vdu_id, vdu_index, pub_key=None, user=None):
"""
Wait for ip address at RO, and optionally, insert public key in virtual machine
:param logging_text: prefix use for logging
:param nsr_id:
:param vnfr_id:
:param vdu_id:
:param vdu_index:
:param pub_key: public ssh key to inject, None to skip
:param user: user to apply the public ssh key
:return: IP address
"""
self.logger.debug(logging_text + "Starting wait_vm_up_insert_key_ro")
ro_nsr_id = None
ip_address = None
nb_tries = 0
target_vdu_id = None
ro_retries = 0
while True:
ro_retries += 1
if ro_retries >= 360: # 1 hour
raise LcmException("Not found _admin.deployed.RO.nsr_id for nsr_id: {}".format(nsr_id))
await asyncio.sleep(10, loop=self.loop)
# get ip address
if not target_vdu_id:
db_vnfr = self.db.get_one("vnfrs", {"_id": vnfr_id})
if not vdu_id: # for the VNF case
if db_vnfr.get("status") == "ERROR":
raise LcmException("Cannot inject ssh-key because target VNF is in error state")
ip_address = db_vnfr.get("ip-address")
if not ip_address:
continue
vdur = next((x for x in get_iterable(db_vnfr, "vdur") if x.get("ip-address") == ip_address), None)
else: # VDU case
vdur = next((x for x in get_iterable(db_vnfr, "vdur")
if x.get("vdu-id-ref") == vdu_id and x.get("count-index") == vdu_index), None)
if not vdur and len(db_vnfr.get("vdur", ())) == 1: # If only one, this should be the target vdu
vdur = db_vnfr["vdur"][0]
if not vdur:
raise LcmException("Not found vnfr_id={}, vdu_id={}, vdu_index={}".format(vnfr_id, vdu_id,
vdu_index))
# New generation RO stores information at "vim_info"
ng_ro_status = None
target_vim = None
if vdur.get("vim_info"):
target_vim = next(t for t in vdur["vim_info"]) # there should be only one key
ng_ro_status = vdur["vim_info"][target_vim].get("vim_status")
if vdur.get("pdu-type") or vdur.get("status") == "ACTIVE" or ng_ro_status == "ACTIVE":
ip_address = vdur.get("ip-address")
if not ip_address:
continue
target_vdu_id = vdur["vdu-id-ref"]
elif vdur.get("status") == "ERROR" or ng_ro_status == "ERROR":
raise LcmException("Cannot inject ssh-key because target VM is in error state")
if not target_vdu_id:
continue
# inject public key into machine
if pub_key and user:
self.logger.debug(logging_text + "Inserting RO key")
self.logger.debug("SSH > PubKey > {}".format(pub_key))
if vdur.get("pdu-type"):
self.logger.error(logging_text + "Cannot inject ssh-key into a PDU")
return ip_address
try:
ro_vm_id = "{}-{}".format(db_vnfr["member-vnf-index-ref"], target_vdu_id) # TODO add vdu_index
if self.ng_ro:
target = {"action": {"action": "inject_ssh_key", "key": pub_key, "user": user},
"vnf": [{"_id": vnfr_id, "vdur": [{"id": vdur["id"]}]}],
}
desc = await self.RO.deploy(nsr_id, target)
action_id = desc["action_id"]
await self._wait_ng_ro(nsr_id, action_id, timeout=600)
break
else:
# wait until NS is deployed at RO
if not ro_nsr_id:
db_nsrs = self.db.get_one("nsrs", {"_id": nsr_id})
ro_nsr_id = deep_get(db_nsrs, ("_admin", "deployed", "RO", "nsr_id"))
if not ro_nsr_id:
continue
result_dict = await self.RO.create_action(
item="ns",
item_id_name=ro_nsr_id,
descriptor={"add_public_key": pub_key, "vms": [ro_vm_id], "user": user}
)
# result_dict contains the format {VM-id: {vim_result: 200, description: text}}
if not result_dict or not isinstance(result_dict, dict):
raise LcmException("Unknown response from RO when injecting key")
for result in result_dict.values():
if result.get("vim_result") == 200:
break
else:
raise ROclient.ROClientException("error injecting key: {}".format(
result.get("description")))
break
except NgRoException as e:
raise LcmException("Reaching max tries injecting key. Error: {}".format(e))
except ROclient.ROClientException as e:
if not nb_tries:
self.logger.debug(logging_text + "error injecting key: {}. Retrying until {} seconds".
format(e, 20*10))
nb_tries += 1
if nb_tries >= 20:
raise LcmException("Reaching max tries injecting key. Error: {}".format(e))
else:
break
return ip_address
async def _wait_dependent_n2vc(self, nsr_id, vca_deployed_list, vca_index):
"""
Wait until dependent VCA deployments have finished. NS waits for VNFs and VDUs; VNFs wait for VDUs
"""
my_vca = vca_deployed_list[vca_index]
if my_vca.get("vdu_id") or my_vca.get("kdu_name"):
# vdu or kdu: no dependencies
return
timeout = 300
while timeout >= 0:
db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
vca_deployed_list = db_nsr["_admin"]["deployed"]["VCA"]
configuration_status_list = db_nsr["configurationStatus"]
for index, vca_deployed in enumerate(configuration_status_list):
if index == vca_index:
# myself
continue
if not my_vca.get("member-vnf-index") or \
(vca_deployed.get("member-vnf-index") == my_vca.get("member-vnf-index")):
internal_status = configuration_status_list[index].get("status")
if internal_status == 'READY':
continue
elif internal_status == 'BROKEN':
raise LcmException("Configuration aborted because dependent charm/s has failed")
else:
break
else:
# no dependencies, return
return
await asyncio.sleep(10)
timeout -= 1
raise LcmException("Configuration aborted because dependent charm/s timeout")
async def instantiate_N2VC(self, logging_text, vca_index, nsi_id, db_nsr, db_vnfr, vdu_id, kdu_name, vdu_index,
config_descriptor, deploy_params, base_folder, nslcmop_id, stage, vca_type, vca_name,
ee_config_descriptor):
nsr_id = db_nsr["_id"]
db_update_entry = "_admin.deployed.VCA.{}.".format(vca_index)
vca_deployed_list = db_nsr["_admin"]["deployed"]["VCA"]
vca_deployed = db_nsr["_admin"]["deployed"]["VCA"][vca_index]
osm_config = {"osm": {"ns_id": db_nsr["_id"]}}
db_dict = {
'collection': 'nsrs',
'filter': {'_id': nsr_id},
'path': db_update_entry
}
step = ""
try:
element_type = 'NS'
element_under_configuration = nsr_id
vnfr_id = None
if db_vnfr:
vnfr_id = db_vnfr["_id"]
osm_config["osm"]["vnf_id"] = vnfr_id
namespace = "{nsi}.{ns}".format(
nsi=nsi_id if nsi_id else "",
ns=nsr_id)
if vnfr_id:
element_type = 'VNF'
element_under_configuration = vnfr_id
namespace += ".{}-{}".format(vnfr_id, vdu_index or 0)
if vdu_id:
namespace += ".{}-{}".format(vdu_id, vdu_index or 0)
element_type = 'VDU'
element_under_configuration = "{}-{}".format(vdu_id, vdu_index or 0)
osm_config["osm"]["vdu_id"] = vdu_id
elif kdu_name:
namespace += ".{}.{}".format(kdu_name, vdu_index or 0)
element_type = 'KDU'
element_under_configuration = kdu_name
osm_config["osm"]["kdu_name"] = kdu_name
# Get artifact path
artifact_path = "{}/{}/{}/{}".format(
base_folder["folder"],
base_folder["pkg-dir"],
"charms" if vca_type in ("native_charm", "lxc_proxy_charm", "k8s_proxy_charm") else "helm-charts",
vca_name
)
self.logger.debug("Artifact path > {}".format(artifact_path))
# get initial_config_primitive_list that applies to this element
initial_config_primitive_list = config_descriptor.get('initial-config-primitive')
self.logger.debug("Initial config primitive list > {}".format(initial_config_primitive_list))
# add config if not present for NS charm
ee_descriptor_id = ee_config_descriptor.get("id")
self.logger.debug("EE Descriptor > {}".format(ee_descriptor_id))
initial_config_primitive_list = get_ee_sorted_initial_config_primitive_list(initial_config_primitive_list,
vca_deployed, ee_descriptor_id)
self.logger.debug("Initial config primitive list #2 > {}".format(initial_config_primitive_list))
# n2vc_redesign STEP 3.1
# find old ee_id if exists
ee_id = vca_deployed.get("ee_id")
vim_account_id = (
deep_get(db_vnfr, ("vim-account-id",)) or
deep_get(deploy_params, ("OSM", "vim_account_id"))
)
vca_cloud, vca_cloud_credential = self.get_vca_cloud_and_credentials(vim_account_id)
vca_k8s_cloud, vca_k8s_cloud_credential = self.get_vca_k8s_cloud_and_credentials(vim_account_id)
# create or register execution environment in VCA
if vca_type in ("lxc_proxy_charm", "k8s_proxy_charm", "helm", "helm-v3"):
self._write_configuration_status(
nsr_id=nsr_id,
vca_index=vca_index,
status='CREATING',
element_under_configuration=element_under_configuration,
element_type=element_type
)
step = "create execution environment"
self.logger.debug(logging_text + step)
ee_id = None
credentials = None
if vca_type == "k8s_proxy_charm":
ee_id = await self.vca_map[vca_type].install_k8s_proxy_charm(
charm_name=artifact_path[artifact_path.rfind("/") + 1:],
namespace=namespace,
artifact_path=artifact_path,
db_dict=db_dict,
cloud_name=vca_k8s_cloud,
credential_name=vca_k8s_cloud_credential,
)
elif vca_type == "helm" or vca_type == "helm-v3":
ee_id, credentials = await self.vca_map[vca_type].create_execution_environment(
namespace=namespace,
reuse_ee_id=ee_id,
db_dict=db_dict,
config=osm_config,
artifact_path=artifact_path,
vca_type=vca_type
)
else:
ee_id, credentials = await self.vca_map[vca_type].create_execution_environment(
namespace=namespace,
reuse_ee_id=ee_id,
db_dict=db_dict,
cloud_name=vca_cloud,
credential_name=vca_cloud_credential,
)
elif vca_type == "native_charm":
step = "Waiting to VM being up and getting IP address"
self.logger.debug(logging_text + step)
rw_mgmt_ip = await self.wait_vm_up_insert_key_ro(logging_text, nsr_id, vnfr_id, vdu_id, vdu_index,
user=None, pub_key=None)
credentials = {"hostname": rw_mgmt_ip}
# get username
username = deep_get(config_descriptor, ("config-access", "ssh-access", "default-user"))
# TODO remove this when the IM changes regarding config-access:ssh-access:default-user are
# merged. Meanwhile, get the username from the initial-config-primitive
if not username and initial_config_primitive_list:
for config_primitive in initial_config_primitive_list:
for param in config_primitive.get("parameter", ()):
if param["name"] == "ssh-username":
username = param["value"]
break
if not username:
raise LcmException("Cannot determine the username neither with 'initial-config-primitive' nor with "
"'config-access.ssh-access.default-user'")
credentials["username"] = username
# n2vc_redesign STEP 3.2
self._write_configuration_status(
nsr_id=nsr_id,
vca_index=vca_index,
status='REGISTERING',
element_under_configuration=element_under_configuration,
element_type=element_type
)
step = "register execution environment {}".format(credentials)
self.logger.debug(logging_text + step)
ee_id = await self.vca_map[vca_type].register_execution_environment(
credentials=credentials,
namespace=namespace,
db_dict=db_dict,
cloud_name=vca_cloud,
credential_name=vca_cloud_credential,
)
# for compatibility with MON/POL modules, they need the model and application name in the database
# TODO ask MON/POL whether they still need to assume the format "model_name.application_name"
ee_id_parts = ee_id.split('.')
db_nsr_update = {db_update_entry + "ee_id": ee_id}
if len(ee_id_parts) >= 2:
model_name = ee_id_parts[0]
application_name = ee_id_parts[1]
db_nsr_update[db_update_entry + "model"] = model_name
db_nsr_update[db_update_entry + "application"] = application_name
# n2vc_redesign STEP 3.3
step = "Install configuration Software"
self._write_configuration_status(
nsr_id=nsr_id,
vca_index=vca_index,
status='INSTALLING SW',
element_under_configuration=element_under_configuration,
element_type=element_type,
other_update=db_nsr_update
)
# TODO check if already done
self.logger.debug(logging_text + step)
config = None
if vca_type == "native_charm":
config_primitive = next((p for p in initial_config_primitive_list if p["name"] == "config"), None)
if config_primitive:
config = self._map_primitive_params(
config_primitive,
{},
deploy_params
)
num_units = 1
if vca_type == "lxc_proxy_charm":
if element_type == "NS":
num_units = db_nsr.get("config-units") or 1
elif element_type == "VNF":
num_units = db_vnfr.get("config-units") or 1
elif element_type == "VDU":
for v in db_vnfr["vdur"]:
if vdu_id == v["vdu-id-ref"]:
num_units = v.get("config-units") or 1
break
if vca_type != "k8s_proxy_charm":
await self.vca_map[vca_type].install_configuration_sw(
ee_id=ee_id,
artifact_path=artifact_path,
db_dict=db_dict,
config=config,
num_units=num_units,
)
# write in db flag of configuration_sw already installed
self.update_db_2("nsrs", nsr_id, {db_update_entry + "config_sw_installed": True})
# add relations for this VCA (wait for other peers related with this VCA)
await self._add_vca_relations(logging_text=logging_text, nsr_id=nsr_id,
vca_index=vca_index, vca_type=vca_type)
# if SSH access is required, then get the execution environment SSH public key
# if native charm, we have already waited for the VM to be UP
if vca_type in ("k8s_proxy_charm", "lxc_proxy_charm", "helm", "helm-v3"):
pub_key = None
user = None
# self.logger.debug("get ssh key block")
if deep_get(config_descriptor, ("config-access", "ssh-access", "required")):
# self.logger.debug("ssh key needed")
# Needed to inject a ssh key
user = deep_get(config_descriptor, ("config-access", "ssh-access", "default-user"))
step = "Install configuration Software, getting public ssh key"
pub_key = await self.vca_map[vca_type].get_ee_ssh_public__key(ee_id=ee_id, db_dict=db_dict)
step = "Insert public key into VM user={} ssh_key={}".format(user, pub_key)
else:
# self.logger.debug("no need to get ssh key")
step = "Waiting to VM being up and getting IP address"
self.logger.debug(logging_text + step)
# n2vc_redesign STEP 5.1
# wait for RO (ip-address) and insert pub_key into the VM
if vnfr_id:
if kdu_name:
rw_mgmt_ip = await self.wait_kdu_up(logging_text, nsr_id, vnfr_id, kdu_name)
else:
rw_mgmt_ip = await self.wait_vm_up_insert_key_ro(logging_text, nsr_id, vnfr_id, vdu_id,
vdu_index, user=user, pub_key=pub_key)
else:
rw_mgmt_ip = None # This is for a NS configuration
self.logger.debug(logging_text + ' VM_ip_address={}'.format(rw_mgmt_ip))
# store rw_mgmt_ip in deploy params for later replacement
deploy_params["rw_mgmt_ip"] = rw_mgmt_ip
# n2vc_redesign STEP 6 Execute initial config primitive
step = 'execute initial config primitive'
# wait for dependent primitives execution (NS -> VNF -> VDU)
if initial_config_primitive_list:
await self._wait_dependent_n2vc(nsr_id, vca_deployed_list, vca_index)
# stage, depending on the element type: vdu, kdu, vnf or ns
my_vca = vca_deployed_list[vca_index]
if my_vca.get("vdu_id") or my_vca.get("kdu_name"):
# VDU or KDU
stage[0] = 'Stage 3/5: running Day-1 primitives for VDU.'
elif my_vca.get("member-vnf-index"):
# VNF
stage[0] = 'Stage 4/5: running Day-1 primitives for VNF.'
else:
# NS
stage[0] = 'Stage 5/5: running Day-1 primitives for NS.'
self._write_configuration_status(
nsr_id=nsr_id,
vca_index=vca_index,
status='EXECUTING PRIMITIVE'
)
self._write_op_status(
op_id=nslcmop_id,
stage=stage
)
check_if_terminated_needed = True
for initial_config_primitive in initial_config_primitive_list:
# adding information on the vca_deployed if it is a NS execution environment
if not vca_deployed["member-vnf-index"]:
deploy_params["ns_config_info"] = json.dumps(self._get_ns_config_info(nsr_id))
# TODO check if already done
primitive_params_ = self._map_primitive_params(initial_config_primitive, {}, deploy_params)
step = "execute primitive '{}' params '{}'".format(initial_config_primitive["name"], primitive_params_)
self.logger.debug(logging_text + step)
await self.vca_map[vca_type].exec_primitive(
ee_id=ee_id,
primitive_name=initial_config_primitive["name"],
params_dict=primitive_params_,
db_dict=db_dict
)
# Once some primitive has been executed, check and write at db whether terminate primitives need to be executed
if check_if_terminated_needed:
if config_descriptor.get('terminate-config-primitive'):
self.update_db_2("nsrs", nsr_id, {db_update_entry + "needed_terminate": True})
check_if_terminated_needed = False
# TODO register in database that primitive is done
# STEP 7 Configure metrics
if vca_type == "helm" or vca_type == "helm-v3":
prometheus_jobs = await self.add_prometheus_metrics(
ee_id=ee_id,
artifact_path=artifact_path,
ee_config_descriptor=ee_config_descriptor,
vnfr_id=vnfr_id,
nsr_id=nsr_id,
target_ip=rw_mgmt_ip,
)
if prometheus_jobs:
self.update_db_2("nsrs", nsr_id, {db_update_entry + "prometheus_jobs": prometheus_jobs})
step = "instantiated at VCA"
self.logger.debug(logging_text + step)
self._write_configuration_status(
nsr_id=nsr_id,
vca_index=vca_index,
status='READY'
)
except Exception as e: # TODO not use Exception but N2VC exception
# self.update_db_2("nsrs", nsr_id, {db_update_entry + "instantiation": "FAILED"})
if not isinstance(e, (DbException, N2VCException, LcmException, asyncio.CancelledError)):
self.logger.error("Exception while {} : {}".format(step, e), exc_info=True)
self._write_configuration_status(
nsr_id=nsr_id,
vca_index=vca_index,
status='BROKEN'
)
raise LcmException("{} {}".format(step, e)) from e
def _write_ns_status(self, nsr_id: str, ns_state: str, current_operation: str, current_operation_id: str,
error_description: str = None, error_detail: str = None, other_update: dict = None):
"""
Update db_nsr fields.
:param nsr_id:
:param ns_state:
:param current_operation:
:param current_operation_id:
:param error_description:
:param error_detail:
:param other_update: Other required changes at database if provided, will be cleared
:return:
"""
try:
db_dict = other_update or {}
db_dict["_admin.nslcmop"] = current_operation_id # for backward compatibility
db_dict["_admin.current-operation"] = current_operation_id
db_dict["_admin.operation-type"] = current_operation if current_operation != "IDLE" else None
db_dict["currentOperation"] = current_operation
db_dict["currentOperationID"] = current_operation_id
db_dict["errorDescription"] = error_description
db_dict["errorDetail"] = error_detail
if ns_state:
db_dict["nsState"] = ns_state
self.update_db_2("nsrs", nsr_id, db_dict)
except DbException as e:
self.logger.warn('Error writing NS status, ns={}: {}'.format(nsr_id, e))
def _write_op_status(self, op_id: str, stage: list = None, error_message: str = None, queuePosition: int = 0,
operation_state: str = None, other_update: dict = None):
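"""
Updates the nslcmops record with the current stage, detailed status, queue position, error message and/or
operation state. Database errors are logged as warnings and otherwise ignored.
"""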
try:
db_dict = other_update or {}
db_dict['queuePosition'] = queuePosition
if isinstance(stage, list):
db_dict['stage'] = stage[0]
db_dict['detailed-status'] = " ".join(stage)
elif stage is not None:
db_dict['stage'] = str(stage)
if error_message is not None:
db_dict['errorMessage'] = error_message
if operation_state is not None:
db_dict['operationState'] = operation_state
db_dict["statusEnteredTime"] = time()
self.update_db_2("nslcmops", op_id, db_dict)
except DbException as e:
self.logger.warn('Error writing OPERATION status for op_id: {} -> {}'.format(op_id, e))
def _write_all_config_status(self, db_nsr: dict, status: str):
try:
nsr_id = db_nsr["_id"]
# configurationStatus
config_status = db_nsr.get('configurationStatus')
if config_status:
db_nsr_update = {"configurationStatus.{}.status".format(index): status for index, v in
enumerate(config_status) if v}
# update status
self.update_db_2("nsrs", nsr_id, db_nsr_update)
except DbException as e:
self.logger.warn('Error writing all configuration status, ns={}: {}'.format(nsr_id, e))
def _write_configuration_status(self, nsr_id: str, vca_index: int, status: str = None,
element_under_configuration: str = None, element_type: str = None,
other_update: dict = None):
# self.logger.debug('_write_configuration_status(): vca_index={}, status={}'
# .format(vca_index, status))
try:
db_path = 'configurationStatus.{}.'.format(vca_index)
db_dict = other_update or {}
if status:
db_dict[db_path + 'status'] = status
if element_under_configuration:
db_dict[db_path + 'elementUnderConfiguration'] = element_under_configuration
if element_type:
db_dict[db_path + 'elementType'] = element_type
self.update_db_2("nsrs", nsr_id, db_dict)
except DbException as e:
self.logger.warn('Error writing configuration status={}, ns={}, vca_index={}: {}'
.format(status, nsr_id, vca_index, e))
async def _do_placement(self, logging_text, db_nslcmop, db_vnfrs):
"""
Checks and computes the placement (the VIM account where to deploy). If it is decided by an external tool, it
sends the request via kafka and waits until the result is written to the database (nslcmops _admin.pla).
The database is used because, in an HA setup, the result may be written by a different LCM worker.
:param logging_text: contains the prefix for logging, with the ns and nslcmop identifiers
:param db_nslcmop: database content of the nslcmop
:param db_vnfrs: database content of the vnfrs, indexed by member-vnf-index.
:return: True if some modification is done. Updates the vnfrs in the database and the db_vnfrs parameter with
the computed 'vim-account-id'
"""
modified = False
nslcmop_id = db_nslcmop['_id']
placement_engine = deep_get(db_nslcmop, ('operationParams', 'placement-engine'))
if placement_engine == "PLA":
self.logger.debug(logging_text + "Invoke and wait for placement optimization")
await self.msg.aiowrite("pla", "get_placement", {'nslcmopId': nslcmop_id}, loop=self.loop)
db_poll_interval = 5
wait = db_poll_interval * 10
pla_result = None
while not pla_result and wait >= 0:
await asyncio.sleep(db_poll_interval)
wait -= db_poll_interval
db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})
pla_result = deep_get(db_nslcmop, ('_admin', 'pla'))
if not pla_result:
raise LcmException("Placement timeout for nslcmopId={}".format(nslcmop_id))
for pla_vnf in pla_result['vnf']:
vnfr = db_vnfrs.get(pla_vnf['member-vnf-index'])
if not pla_vnf.get('vimAccountId') or not vnfr:
continue
modified = True
self.db.set_one("vnfrs", {"_id": vnfr["_id"]}, {"vim-account-id": pla_vnf['vimAccountId']})
# Modifies db_vnfrs
vnfr["vim-account-id"] = pla_vnf['vimAccountId']
return modified
def update_nsrs_with_pla_result(self, params):
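# Store the placement result received from the placement engine at nslcmops._admin.pla, where
# _do_placement() (possibly running in a different LCM worker) polls for it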
try:
nslcmop_id = deep_get(params, ('placement', 'nslcmopId'))
self.update_db_2("nslcmops", nslcmop_id, {"_admin.pla": params.get('placement')})
except Exception as e:
self.logger.warn('Update failed for nslcmop_id={}:{}'.format(nslcmop_id, e))
async def instantiate(self, nsr_id, nslcmop_id):
"""
:param nsr_id: ns instance to deploy
:param nslcmop_id: operation to run
:return:
"""
# Try to lock HA task here
task_is_locked_by_me = self.lcm_tasks.lock_HA('ns', 'nslcmops', nslcmop_id)
if not task_is_locked_by_me:
self.logger.debug('instantiate() task is not locked by me, ns={}'.format(nsr_id))
return
logging_text = "Task ns={} instantiate={} ".format(nsr_id, nslcmop_id)
self.logger.debug(logging_text + "Enter")
# get all needed from database
# database nsrs record
db_nsr = None
# database nslcmops record
db_nslcmop = None
# update operation on nsrs
db_nsr_update = {}
# update operation on nslcmops
db_nslcmop_update = {}
nslcmop_operation_state = None
db_vnfrs = {} # vnf's info indexed by member-index
# n2vc_info = {}
tasks_dict_info = {} # from task to info text
exc = None
error_list = []
stage = ['Stage 1/5: preparation of the environment.', "Waiting for previous operations to terminate.", ""]
# ^ stage, step, VIM progress
try:
# wait for any previous tasks in process
await self.lcm_tasks.waitfor_related_HA('ns', 'nslcmops', nslcmop_id)
stage[1] = "Sync filesystem from database."
self.fs.sync() # TODO, make use of partial sync, only for the needed packages
# STEP 0: Reading database (nslcmops, nsrs, nsds, vnfrs, vnfds)
stage[1] = "Reading from database."
# nsState="BUILDING", currentOperation="INSTANTIATING", currentOperationID=nslcmop_id
db_nsr_update["detailed-status"] = "creating"
db_nsr_update["operational-status"] = "init"
self._write_ns_status(
nsr_id=nsr_id,
ns_state="BUILDING",
current_operation="INSTANTIATING",
current_operation_id=nslcmop_id,
other_update=db_nsr_update
)
self._write_op_status(
op_id=nslcmop_id,
stage=stage,
queuePosition=0
)
# read from db: operation
stage[1] = "Getting nslcmop={} from db.".format(nslcmop_id)
db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})
ns_params = db_nslcmop.get("operationParams")
if ns_params and ns_params.get("timeout_ns_deploy"):
timeout_ns_deploy = ns_params["timeout_ns_deploy"]
else:
timeout_ns_deploy = self.timeout.get("ns_deploy", self.timeout_ns_deploy)
# read from db: ns
stage[1] = "Getting nsr={} from db.".format(nsr_id)
db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
stage[1] = "Getting nsd={} from db.".format(db_nsr["nsd-id"])
nsd = self.db.get_one("nsds", {"_id": db_nsr["nsd-id"]})
db_nsr["nsd"] = nsd
# nsr_name = db_nsr["name"] # TODO short-name??
# read from db: vnf's of this ns
stage[1] = "Getting vnfrs from db."
self.logger.debug(logging_text + stage[1])
db_vnfrs_list = self.db.get_list("vnfrs", {"nsr-id-ref": nsr_id})
# read from db: vnfd's for every vnf
db_vnfds = [] # every vnfd data
# for each vnf in ns, read vnfd
for vnfr in db_vnfrs_list:
db_vnfrs[vnfr["member-vnf-index-ref"]] = vnfr
vnfd_id = vnfr["vnfd-id"]
vnfd_ref = vnfr["vnfd-ref"]
# if this vnfd has not been read yet, fetch it from db
if not find_in_list(db_vnfds, lambda a_vnfd: a_vnfd["_id"] == vnfd_id):
# read from db
stage[1] = "Getting vnfd={} id='{}' from db.".format(vnfd_id, vnfd_ref)
self.logger.debug(logging_text + stage[1])
vnfd = self.db.get_one("vnfds", {"_id": vnfd_id})
# store vnfd
db_vnfds.append(vnfd)
# Get or generate the _admin.deployed.VCA list
vca_deployed_list = None
if db_nsr["_admin"].get("deployed"):
vca_deployed_list = db_nsr["_admin"]["deployed"].get("VCA")
if vca_deployed_list is None:
vca_deployed_list = []
configuration_status_list = []
db_nsr_update["_admin.deployed.VCA"] = vca_deployed_list
db_nsr_update["configurationStatus"] = configuration_status_list
# add _admin.deployed.VCA to db_nsr dictionary, value=vca_deployed_list
populate_dict(db_nsr, ("_admin", "deployed", "VCA"), vca_deployed_list)
elif isinstance(vca_deployed_list, dict):
# maintain backward compatibility. Change a dict to list at database
vca_deployed_list = list(vca_deployed_list.values())
db_nsr_update["_admin.deployed.VCA"] = vca_deployed_list
populate_dict(db_nsr, ("_admin", "deployed", "VCA"), vca_deployed_list)
if not isinstance(deep_get(db_nsr, ("_admin", "deployed", "RO", "vnfd")), list):
populate_dict(db_nsr, ("_admin", "deployed", "RO", "vnfd"), [])
db_nsr_update["_admin.deployed.RO.vnfd"] = []
# set state to INSTANTIATED. When instantiated NBI will not delete directly
db_nsr_update["_admin.nsState"] = "INSTANTIATED"
self.update_db_2("nsrs", nsr_id, db_nsr_update)
self.db.set_list("vnfrs", {"nsr-id-ref": nsr_id}, {"_admin.nsState": "INSTANTIATED"})
# n2vc_redesign STEP 2 Deploy Network Scenario
stage[0] = 'Stage 2/5: deployment of KDUs, VMs and execution environments.'
self._write_op_status(
op_id=nslcmop_id,
stage=stage
)
stage[1] = "Deploying KDUs."
# self.logger.debug(logging_text + "Before deploy_kdus")
# Call deploy_kdus in case the "vdu:kdu" param exists in the descriptors
await self.deploy_kdus(
logging_text=logging_text,
nsr_id=nsr_id,
nslcmop_id=nslcmop_id,
db_vnfrs=db_vnfrs,
db_vnfds=db_vnfds,
task_instantiation_info=tasks_dict_info,
)
stage[1] = "Getting VCA public key."
# n2vc_redesign STEP 1 Get VCA public ssh-key
# feature 1429. Add n2vc public key to needed VMs
n2vc_key = self.n2vc.get_public_key()
n2vc_key_list = [n2vc_key]
if self.vca_config.get("public_key"):
n2vc_key_list.append(self.vca_config["public_key"])
stage[1] = "Deploying NS at VIM."
task_ro = asyncio.ensure_future(
self.instantiate_RO(
logging_text=logging_text,
nsr_id=nsr_id,
nsd=nsd,
db_nsr=db_nsr,
db_nslcmop=db_nslcmop,
db_vnfrs=db_vnfrs,
db_vnfds=db_vnfds,
n2vc_key_list=n2vc_key_list,
stage=stage
)
)
self.lcm_tasks.register("ns", nsr_id, nslcmop_id, "instantiate_RO", task_ro)
tasks_dict_info[task_ro] = "Deploying at VIM"
# n2vc_redesign STEP 3 to 6 Deploy N2VC
stage[1] = "Deploying Execution Environments."
self.logger.debug(logging_text + stage[1])
nsi_id = None # TODO put nsi_id when this nsr belongs to a NSI
for vnf_profile in get_vnf_profiles(nsd):
vnfd_id = vnf_profile["vnfd-id"]
vnfd = find_in_list(db_vnfds, lambda a_vnf: a_vnf["id"] == vnfd_id)
member_vnf_index = str(vnf_profile["id"])
db_vnfr = db_vnfrs[member_vnf_index]
base_folder = vnfd["_admin"]["storage"]
vdu_id = None
vdu_index = 0
vdu_name = None
kdu_name = None
# Get additional parameters
deploy_params = {"OSM": get_osm_params(db_vnfr)}
if db_vnfr.get("additionalParamsForVnf"):
deploy_params.update(parse_yaml_strings(db_vnfr["additionalParamsForVnf"].copy()))
descriptor_config = get_configuration(vnfd, vnfd["id"])
if descriptor_config:
self._deploy_n2vc(
logging_text=logging_text + "member_vnf_index={} ".format(member_vnf_index),
db_nsr=db_nsr,
db_vnfr=db_vnfr,
nslcmop_id=nslcmop_id,
nsr_id=nsr_id,
nsi_id=nsi_id,
vnfd_id=vnfd_id,
vdu_id=vdu_id,
kdu_name=kdu_name,
member_vnf_index=member_vnf_index,
vdu_index=vdu_index,
vdu_name=vdu_name,
deploy_params=deploy_params,
descriptor_config=descriptor_config,
base_folder=base_folder,
task_instantiation_info=tasks_dict_info,
stage=stage
)
# Deploy charms for each VDU that supports one.
for vdud in get_vdu_list(vnfd):
vdu_id = vdud["id"]
descriptor_config = get_configuration(vnfd, vdu_id)
vdur = find_in_list(db_vnfr["vdur"], lambda vdu: vdu["vdu-id-ref"] == vdu_id)
if vdur.get("additionalParams"):
deploy_params_vdu = parse_yaml_strings(vdur["additionalParams"])
else:
deploy_params_vdu = deploy_params
deploy_params_vdu["OSM"] = get_osm_params(db_vnfr, vdu_id, vdu_count_index=0)
vdud_count = get_vdu_profile(vnfd, vdu_id).get("max-number-of-instances", 1)
self.logger.debug("VDUD > {}".format(vdud))
self.logger.debug("Descriptor config > {}".format(descriptor_config))
if descriptor_config:
vdu_name = None
kdu_name = None
for vdu_index in range(vdud_count):
# TODO vnfr_params["rw_mgmt_ip"] = vdur["ip-address"]
self._deploy_n2vc(
logging_text=logging_text + "member_vnf_index={}, vdu_id={}, vdu_index={} ".format(
member_vnf_index, vdu_id, vdu_index),
db_nsr=db_nsr,
db_vnfr=db_vnfr,
nslcmop_id=nslcmop_id,
nsr_id=nsr_id,
nsi_id=nsi_id,
vnfd_id=vnfd_id,
vdu_id=vdu_id,
kdu_name=kdu_name,
member_vnf_index=member_vnf_index,
vdu_index=vdu_index,
vdu_name=vdu_name,
deploy_params=deploy_params_vdu,
descriptor_config=descriptor_config,
base_folder=base_folder,
task_instantiation_info=tasks_dict_info,
stage=stage
)
for kdud in get_kdu_list(vnfd):
kdu_name = kdud["name"]
descriptor_config = get_configuration(vnfd, kdu_name)
if descriptor_config:
vdu_id = None
vdu_index = 0
vdu_name = None
kdur = next(x for x in db_vnfr["kdur"] if x["kdu-name"] == kdu_name)
deploy_params_kdu = {"OSM": get_osm_params(db_vnfr)}
if kdur.get("additionalParams"):
deploy_params_kdu = parse_yaml_strings(kdur["additionalParams"])
self._deploy_n2vc(
logging_text=logging_text,
db_nsr=db_nsr,
db_vnfr=db_vnfr,
nslcmop_id=nslcmop_id,
nsr_id=nsr_id,
nsi_id=nsi_id,
vnfd_id=vnfd_id,
vdu_id=vdu_id,
kdu_name=kdu_name,
member_vnf_index=member_vnf_index,
vdu_index=vdu_index,
vdu_name=vdu_name,
deploy_params=deploy_params_kdu,
descriptor_config=descriptor_config,
base_folder=base_folder,
task_instantiation_info=tasks_dict_info,
stage=stage
)
# Check if this NS has a charm configuration
descriptor_config = nsd.get("ns-configuration")
if descriptor_config and descriptor_config.get("juju"):
vnfd_id = None
db_vnfr = None
member_vnf_index = None
vdu_id = None
kdu_name = None
vdu_index = 0
vdu_name = None
# Get additional parameters
deploy_params = {"OSM": {"vim_account_id": ns_params["vimAccountId"]}}
if db_nsr.get("additionalParamsForNs"):
deploy_params.update(parse_yaml_strings(db_nsr["additionalParamsForNs"].copy()))
base_folder = nsd["_admin"]["storage"]
self._deploy_n2vc(
logging_text=logging_text,
db_nsr=db_nsr,
db_vnfr=db_vnfr,
nslcmop_id=nslcmop_id,
nsr_id=nsr_id,
nsi_id=nsi_id,
vnfd_id=vnfd_id,
vdu_id=vdu_id,
kdu_name=kdu_name,
member_vnf_index=member_vnf_index,
vdu_index=vdu_index,
vdu_name=vdu_name,
deploy_params=deploy_params,
descriptor_config=descriptor_config,
base_folder=base_folder,
task_instantiation_info=tasks_dict_info,
stage=stage
)
# the rest of the steps will be done in the finally block
except (ROclient.ROClientException, DbException, LcmException, N2VCException) as e:
self.logger.error(logging_text + "Exit Exception while '{}': {}".format(stage[1], e))
exc = e
except asyncio.CancelledError:
self.logger.error(logging_text + "Cancelled Exception while '{}'".format(stage[1]))
exc = "Operation was cancelled"
except Exception as e:
exc = traceback.format_exc()
self.logger.critical(logging_text + "Exit Exception while '{}': {}".format(stage[1], e), exc_info=True)
finally:
if exc:
error_list.append(str(exc))
try:
# wait for pending tasks
if tasks_dict_info:
stage[1] = "Waiting for instantiate pending tasks."
self.logger.debug(logging_text + stage[1])
error_list += await self._wait_for_tasks(logging_text, tasks_dict_info, timeout_ns_deploy,
stage, nslcmop_id, nsr_id=nsr_id)
stage[1] = stage[2] = ""
except asyncio.CancelledError:
error_list.append("Cancelled")
# TODO cancel all tasks
except Exception as exc:
error_list.append(str(exc))
# update operation-status
db_nsr_update["operational-status"] = "running"
# let's begin with VCA 'configured' status (later we can change it)
db_nsr_update["config-status"] = "configured"
for task, task_name in tasks_dict_info.items():
if not task.done() or task.cancelled() or task.exception():
if task_name.startswith(self.task_name_deploy_vca):
# An N2VC task is pending
db_nsr_update["config-status"] = "failed"
else:
# RO or KDU task is pending
db_nsr_update["operational-status"] = "failed"
# update status at database
if error_list:
error_detail = ". ".join(error_list)
self.logger.error(logging_text + error_detail)
error_description_nslcmop = '{} Detail: {}'.format(stage[0], error_detail)
error_description_nsr = 'Operation: INSTANTIATING.{}, {}'.format(nslcmop_id, stage[0])
db_nsr_update["detailed-status"] = error_description_nsr + " Detail: " + error_detail
db_nslcmop_update["detailed-status"] = error_detail
nslcmop_operation_state = "FAILED"
ns_state = "BROKEN"
else:
error_detail = None
error_description_nsr = error_description_nslcmop = None
ns_state = "READY"
db_nsr_update["detailed-status"] = "Done"
db_nslcmop_update["detailed-status"] = "Done"
nslcmop_operation_state = "COMPLETED"
if db_nsr:
self._write_ns_status(
nsr_id=nsr_id,
ns_state=ns_state,
current_operation="IDLE",
current_operation_id=None,
error_description=error_description_nsr,
error_detail=error_detail,
other_update=db_nsr_update
)
self._write_op_status(
op_id=nslcmop_id,
stage="",
error_message=error_description_nslcmop,
operation_state=nslcmop_operation_state,
other_update=db_nslcmop_update,
)
if nslcmop_operation_state:
try:
await self.msg.aiowrite("ns", "instantiated", {"nsr_id": nsr_id, "nslcmop_id": nslcmop_id,
"operationState": nslcmop_operation_state},
loop=self.loop)
except Exception as e:
self.logger.error(logging_text + "kafka_write notification Exception {}".format(e))
self.logger.debug(logging_text + "Exit")
self.lcm_tasks.remove("ns", nsr_id, nslcmop_id, "ns_instantiate")
async def _add_vca_relations(self, logging_text, nsr_id, vca_index: int,
timeout: int = 3600, vca_type: str = None) -> bool:
# steps:
# 1. find all relations for this VCA
# 2. wait for other peers related
# 3. add relations
try:
vca_type = vca_type or "lxc_proxy_charm"
# STEP 1: find all relations for this VCA
# read nsr record
db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
nsd = self.db.get_one("nsds", {"_id": db_nsr["nsd-id"]})
# this VCA data
my_vca = deep_get(db_nsr, ('_admin', 'deployed', 'VCA'))[vca_index]
# read all ns-configuration relations
ns_relations = list()
db_ns_relations = deep_get(nsd, ('ns-configuration', 'relation'))
if db_ns_relations:
for r in db_ns_relations:
# check if this VCA is in the relation
if my_vca.get('member-vnf-index') in\
(r.get('entities')[0].get('id'), r.get('entities')[1].get('id')):
ns_relations.append(r)
# read all vnf-configuration relations
vnf_relations = list()
db_vnfd_list = db_nsr.get('vnfd-id')
if db_vnfd_list:
for vnfd in db_vnfd_list:
db_vnfd = self.db.get_one("vnfds", {"_id": vnfd})
db_vnf_relations = get_configuration(db_vnfd, db_vnfd["id"]).get("relation", [])
if db_vnf_relations:
for r in db_vnf_relations:
# check if this VCA is in the relation
if my_vca.get('vdu_id') in (r.get('entities')[0].get('id'), r.get('entities')[1].get('id')):
vnf_relations.append(r)
# if no relations, terminate
if not ns_relations and not vnf_relations:
self.logger.debug(logging_text + ' No relations')
return True
self.logger.debug(logging_text + ' adding relations\n {}\n {}'.format(ns_relations, vnf_relations))
# add all relations
start = time()
while True:
# check timeout
now = time()
if now - start >= timeout:
self.logger.error(logging_text + ' : timeout adding relations')
return False
# reload nsr from database (we need the updated record: _admin.deployed.VCA)
db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
# for each defined NS relation, find the VCA's related
for r in ns_relations.copy():
from_vca_ee_id = None
to_vca_ee_id = None
from_vca_endpoint = None
to_vca_endpoint = None
vca_list = deep_get(db_nsr, ('_admin', 'deployed', 'VCA'))
for vca in vca_list:
if vca.get('member-vnf-index') == r.get('entities')[0].get('id') \
and vca.get('config_sw_installed'):
from_vca_ee_id = vca.get('ee_id')
from_vca_endpoint = r.get('entities')[0].get('endpoint')
if vca.get('member-vnf-index') == r.get('entities')[1].get('id') \
and vca.get('config_sw_installed'):
to_vca_ee_id = vca.get('ee_id')
to_vca_endpoint = r.get('entities')[1].get('endpoint')
if from_vca_ee_id and to_vca_ee_id:
# add relation
await self.vca_map[vca_type].add_relation(
ee_id_1=from_vca_ee_id,
ee_id_2=to_vca_ee_id,
endpoint_1=from_vca_endpoint,
endpoint_2=to_vca_endpoint)
# remove entry from relations list
ns_relations.remove(r)
else:
# check failed peers
try:
vca_status_list = db_nsr.get('configurationStatus')
if vca_status_list:
for i in range(len(vca_list)):
vca = vca_list[i]
vca_status = vca_status_list[i]
if vca.get('member-vnf-index') == r.get('entities')[0].get('id'):
if vca_status.get('status') == 'BROKEN':
# peer broken: remove relation from list
ns_relations.remove(r)
if vca.get('member-vnf-index') == r.get('entities')[1].get('id'):
if vca_status.get('status') == 'BROKEN':
# peer broken: remove relation from list
ns_relations.remove(r)
except Exception:
# ignore
pass
# for each defined VNF relation, find the VCA's related
for r in vnf_relations.copy():
from_vca_ee_id = None
to_vca_ee_id = None
from_vca_endpoint = None
to_vca_endpoint = None
vca_list = deep_get(db_nsr, ('_admin', 'deployed', 'VCA'))
for vca in vca_list:
key_to_check = "vdu_id"
if vca.get("vdu_id") is None:
key_to_check = "vnfd_id"
if vca.get(key_to_check) == r.get('entities')[0].get('id') and vca.get('config_sw_installed'):
from_vca_ee_id = vca.get('ee_id')
from_vca_endpoint = r.get('entities')[0].get('endpoint')
if vca.get(key_to_check) == r.get('entities')[1].get('id') and vca.get('config_sw_installed'):
to_vca_ee_id = vca.get('ee_id')
to_vca_endpoint = r.get('entities')[1].get('endpoint')
if from_vca_ee_id and to_vca_ee_id:
# add relation
await self.vca_map[vca_type].add_relation(
ee_id_1=from_vca_ee_id,
ee_id_2=to_vca_ee_id,
endpoint_1=from_vca_endpoint,
endpoint_2=to_vca_endpoint)
# remove entry from relations list
vnf_relations.remove(r)
else:
# check failed peers
try:
vca_status_list = db_nsr.get('configurationStatus')
if vca_status_list:
for i in range(len(vca_list)):
vca = vca_list[i]
vca_status = vca_status_list[i]
if vca.get('vdu_id') == r.get('entities')[0].get('id'):
if vca_status.get('status') == 'BROKEN':
# peer broken: remove relation from list
vnf_relations.remove(r)
if vca.get('vdu_id') == r.get('entities')[1].get('id'):
if vca_status.get('status') == 'BROKEN':
# peer broken: remove relation from list
vnf_relations.remove(r)
except Exception:
# ignore
pass
# wait for next try
await asyncio.sleep(5.0)
if not ns_relations and not vnf_relations:
self.logger.debug('Relations added')
break
return True
except Exception as e:
self.logger.warn(logging_text + ' ERROR adding relations: {}'.format(e))
return False
async def _install_kdu(self, nsr_id: str, nsr_db_path: str, vnfr_data: dict, kdu_index: int, kdud: dict,
vnfd: dict, k8s_instance_info: dict, k8params: dict = None, timeout: int = 600):
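# Install one KDU: generate the kdu instance name, install the helm chart / juju bundle on the target
# k8s cluster, annotate the deployed services and mgmt IP at the vnfr, and run the KDU
# initial-config-primitives when the KDU has no juju execution environment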
try:
k8sclustertype = k8s_instance_info["k8scluster-type"]
# Instantiate kdu
db_dict_install = {"collection": "nsrs",
"filter": {"_id": nsr_id},
"path": nsr_db_path}
kdu_instance = self.k8scluster_map[k8sclustertype].generate_kdu_instance_name(
db_dict=db_dict_install,
kdu_model=k8s_instance_info["kdu-model"],
kdu_name=k8s_instance_info["kdu-name"],
)
self.update_db_2("nsrs", nsr_id, {nsr_db_path + ".kdu-instance": kdu_instance})
await self.k8scluster_map[k8sclustertype].install(
cluster_uuid=k8s_instance_info["k8scluster-uuid"],
kdu_model=k8s_instance_info["kdu-model"],
atomic=True,
params=k8params,
db_dict=db_dict_install,
timeout=timeout,
kdu_name=k8s_instance_info["kdu-name"],
namespace=k8s_instance_info["namespace"],
kdu_instance=kdu_instance,
)
self.update_db_2("nsrs", nsr_id, {nsr_db_path + ".kdu-instance": kdu_instance})
# Obtain the deployed services in order to get the management service IP
services = await self.k8scluster_map[k8sclustertype].get_services(
cluster_uuid=k8s_instance_info["k8scluster-uuid"],
kdu_instance=kdu_instance,
namespace=k8s_instance_info["namespace"])
# Obtain management service info (if exists)
vnfr_update_dict = {}
if services:
vnfr_update_dict["kdur.{}.services".format(kdu_index)] = services
mgmt_services = [service for service in kdud.get("service", []) if service.get("mgmt-service")]
for mgmt_service in mgmt_services:
for service in services:
if service["name"].startswith(mgmt_service["name"]):
# Mgmt service found, obtain its service IP
ip = service.get("external_ip", service.get("cluster_ip"))
if isinstance(ip, list) and len(ip) == 1:
ip = ip[0]
vnfr_update_dict["kdur.{}.ip-address".format(kdu_index)] = ip
# Check whether the mgmt IP must also be updated at the VNF level
service_external_cp = mgmt_service.get("external-connection-point-ref")
if service_external_cp:
if deep_get(vnfd, ("mgmt-interface", "cp")) == service_external_cp:
vnfr_update_dict["ip-address"] = ip
break
else:
self.logger.warn("Mgmt service name: {} not found".format(mgmt_service["name"]))
vnfr_update_dict["kdur.{}.status".format(kdu_index)] = "READY"
self.update_db_2("vnfrs", vnfr_data.get("_id"), vnfr_update_dict)
kdu_config = get_configuration(vnfd, k8s_instance_info["kdu-name"])
if kdu_config and kdu_config.get("initial-config-primitive") and \
get_juju_ee_ref(vnfd, k8s_instance_info["kdu-name"]) is None:
initial_config_primitive_list = kdu_config.get("initial-config-primitive")
initial_config_primitive_list.sort(key=lambda val: int(val["seq"]))
for initial_config_primitive in initial_config_primitive_list:
primitive_params_ = self._map_primitive_params(initial_config_primitive, {}, {})
await asyncio.wait_for(
self.k8scluster_map[k8sclustertype].exec_primitive(
cluster_uuid=k8s_instance_info["k8scluster-uuid"],
kdu_instance=kdu_instance,
primitive_name=initial_config_primitive["name"],
params=primitive_params_, db_dict=db_dict_install),
timeout=timeout)
except Exception as e:
# Prepare update db with error and raise exception
try:
self.update_db_2("nsrs", nsr_id, {nsr_db_path + ".detailed-status": str(e)})
self.update_db_2("vnfrs", vnfr_data.get("_id"), {"kdur.{}.status".format(kdu_index): "ERROR"})
except Exception:
# ignore to keep original exception
pass
# reraise original error
raise
return kdu_instance
async def deploy_kdus(self, logging_text, nsr_id, nslcmop_id, db_vnfrs, db_vnfds, task_instantiation_info):
# Launch kdus if present in the descriptor
k8scluster_id_2_uuic = {"helm-chart-v3": {}, "helm-chart": {}, "juju-bundle": {}}
async def _get_cluster_id(cluster_id, cluster_type):
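# Resolve (and cache) the internal k8s cluster id for the given cluster and cluster type; for
# helm-chart-v3, initialize the cluster environment if it has not been initialized yet (backward compatibility)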
nonlocal k8scluster_id_2_uuic
if cluster_id in k8scluster_id_2_uuic[cluster_type]:
return k8scluster_id_2_uuic[cluster_type][cluster_id]
# check if the K8s cluster is still being created; wait for any related pending tasks to finish
task_name, task_dependency = self.lcm_tasks.lookfor_related("k8scluster", cluster_id)
if task_dependency:
text = "Waiting for related tasks '{}' on k8scluster {} to be completed".format(task_name, cluster_id)
self.logger.debug(logging_text + text)
await asyncio.wait(task_dependency, timeout=3600)
db_k8scluster = self.db.get_one("k8sclusters", {"_id": cluster_id}, fail_on_empty=False)
if not db_k8scluster:
raise LcmException("K8s cluster {} cannot be found".format(cluster_id))
k8s_id = deep_get(db_k8scluster, ("_admin", cluster_type, "id"))
if not k8s_id:
if cluster_type == "helm-chart-v3":
try:
# backward compatibility for existing clusters that have not been initialized for helm v3
k8s_credentials = yaml.safe_dump(db_k8scluster.get("credentials"))
k8s_id, uninstall_sw = await self.k8sclusterhelm3.init_env(k8s_credentials,
reuse_cluster_uuid=cluster_id)
db_k8scluster_update = {}
db_k8scluster_update["_admin.helm-chart-v3.error_msg"] = None
db_k8scluster_update["_admin.helm-chart-v3.id"] = k8s_id
db_k8scluster_update["_admin.helm-chart-v3.created"] = uninstall_sw
db_k8scluster_update["_admin.helm-chart-v3.operationalState"] = "ENABLED"
self.update_db_2("k8sclusters", cluster_id, db_k8scluster_update)
except Exception as e:
self.logger.error(logging_text + "error initializing helm-v3 cluster: {}".format(str(e)))
raise LcmException("K8s cluster '{}' has not been initialized for '{}'".format(cluster_id,
cluster_type))
else:
raise LcmException("K8s cluster '{}' has not been initialized for '{}'".
format(cluster_id, cluster_type))
k8scluster_id_2_uuic[cluster_type][cluster_id] = k8s_id
return k8s_id
logging_text += "Deploy kdus: "
step = ""
try:
db_nsr_update = {"_admin.deployed.K8s": []}
self.update_db_2("nsrs", nsr_id, db_nsr_update)
index = 0
updated_cluster_list = []
updated_v3_cluster_list = []
for vnfr_data in db_vnfrs.values():
for kdu_index, kdur in enumerate(get_iterable(vnfr_data, "kdur")):
# Step 0: Prepare and set parameters
desc_params = parse_yaml_strings(kdur.get("additionalParams"))
vnfd_id = vnfr_data.get('vnfd-id')
vnfd_with_id = find_in_list(db_vnfds, lambda vnfd: vnfd["_id"] == vnfd_id)
kdud = next(kdud for kdud in vnfd_with_id["kdu"] if kdud["name"] == kdur["kdu-name"])
namespace = kdur.get("k8s-namespace")
if kdur.get("helm-chart"):
kdumodel = kdur["helm-chart"]
# Default version: helm3, if helm-version is v2 assign v2
k8sclustertype = "helm-chart-v3"
self.logger.debug("kdur: {}".format(kdur))
if kdur.get("helm-version") and kdur.get("helm-version") == "v2":
k8sclustertype = "helm-chart"
elif kdur.get("juju-bundle"):
kdumodel = kdur["juju-bundle"]
k8sclustertype = "juju-bundle"
else:
raise LcmException("kdu type for kdu='{}.{}' is neither helm-chart nor "
"juju-bundle. Maybe an old NBI version is running".
format(vnfr_data["member-vnf-index-ref"], kdur["kdu-name"]))
# check if kdumodel is a file and exists
try:
vnfd_with_id = find_in_list(db_vnfds, lambda vnfd: vnfd["_id"] == vnfd_id)
storage = deep_get(vnfd_with_id, ('_admin', 'storage'))
if storage and storage.get('pkg-dir'): # may not be present if the vnfd has no artifacts
# path format: <storage-folder>/<pkg-dir>/helm-charts|juju-bundles/<kdu-model>
filename = '{}/{}/{}s/{}'.format(storage["folder"], storage["pkg-dir"], k8sclustertype,
kdumodel)
if self.fs.file_exists(filename, mode='file') or self.fs.file_exists(filename, mode='dir'):
kdumodel = self.fs.path + filename
except (asyncio.TimeoutError, asyncio.CancelledError):
raise
except Exception: # it is not a file
pass
k8s_cluster_id = kdur["k8s-cluster"]["id"]
step = "Synchronize repos for k8s cluster '{}'".format(k8s_cluster_id)
cluster_uuid = await _get_cluster_id(k8s_cluster_id, k8sclustertype)
# Synchronize repos
if (k8sclustertype == "helm-chart" and cluster_uuid not in updated_cluster_list)\
or (k8sclustertype == "helm-chart-v3" and cluster_uuid not in updated_v3_cluster_list):
del_repo_list, added_repo_dict = await asyncio.ensure_future(
self.k8scluster_map[k8sclustertype].synchronize_repos(cluster_uuid=cluster_uuid))
if del_repo_list or added_repo_dict:
if k8sclustertype == "helm-chart":
unset = {'_admin.helm_charts_added.' + item: None for item in del_repo_list}
updated = {'_admin.helm_charts_added.' +
item: name for item, name in added_repo_dict.items()}
updated_cluster_list.append(cluster_uuid)
elif k8sclustertype == "helm-chart-v3":
unset = {'_admin.helm_charts_v3_added.' + item: None for item in del_repo_list}
updated = {'_admin.helm_charts_v3_added.' +
item: name for item, name in added_repo_dict.items()}
updated_v3_cluster_list.append(cluster_uuid)
self.logger.debug(logging_text + "repos synchronized on k8s cluster "
"'{}' to_delete: {}, to_add: {}".
format(k8s_cluster_id, del_repo_list, added_repo_dict))
self.db.set_one("k8sclusters", {"_id": k8s_cluster_id}, updated, unset=unset)
# Instantiate kdu
step = "Instantiating KDU {}.{} in k8s cluster {}".format(vnfr_data["member-vnf-index-ref"],
kdur["kdu-name"], k8s_cluster_id)
k8s_instance_info = {"kdu-instance": None,
"k8scluster-uuid": cluster_uuid,
"k8scluster-type": k8sclustertype,
"member-vnf-index": vnfr_data["member-vnf-index-ref"],
"kdu-name": kdur["kdu-name"],
"kdu-model": kdumodel,
"namespace": namespace}
db_path = "_admin.deployed.K8s.{}".format(index)
db_nsr_update[db_path] = k8s_instance_info
self.update_db_2("nsrs", nsr_id, db_nsr_update)
vnfd_with_id = find_in_list(db_vnfds, lambda vnf: vnf["_id"] == vnfd_id)
task = asyncio.ensure_future(
self._install_kdu(nsr_id, db_path, vnfr_data, kdu_index, kdud, vnfd_with_id,
k8s_instance_info, k8params=desc_params, timeout=600))
self.lcm_tasks.register("ns", nsr_id, nslcmop_id, "instantiate_KDU-{}".format(index), task)
task_instantiation_info[task] = "Deploying KDU {}".format(kdur["kdu-name"])
index += 1
except (LcmException, asyncio.CancelledError):
raise
except Exception as e:
msg = "Exception {} while {}: {}".format(type(e).__name__, step, e)
if isinstance(e, (N2VCException, DbException)):
self.logger.error(logging_text + msg)
else:
self.logger.critical(logging_text + msg, exc_info=True)
raise LcmException(msg)
finally:
if db_nsr_update:
self.update_db_2("nsrs", nsr_id, db_nsr_update)
def _deploy_n2vc(self, logging_text, db_nsr, db_vnfr, nslcmop_id, nsr_id, nsi_id, vnfd_id, vdu_id,
kdu_name, member_vnf_index, vdu_index, vdu_name, deploy_params, descriptor_config,
base_folder, task_instantiation_info, stage):
# launch instantiate_N2VC in an asyncio task and register the task object
# Look up where the information of this charm is stored in the database: <nsrs>._admin.deployed.VCA
# if not found, create one entry and update database
# fill db_nsr._admin.deployed.VCA.<index>
self.logger.debug(logging_text + "_deploy_n2vc vnfd_id={}, vdu_id={}".format(vnfd_id, vdu_id))
if "execution-environment-list" in descriptor_config:
ee_list = descriptor_config.get("execution-environment-list", [])
else: # other types as script are not supported
ee_list = []
for ee_item in ee_list:
self.logger.debug(logging_text + "_deploy_n2vc ee_item juju={}, helm={}".format(ee_item.get('juju'),
ee_item.get("helm-chart")))
ee_descriptor_id = ee_item.get("id")
if ee_item.get("juju"):
vca_name = ee_item['juju'].get('charm')
vca_type = "lxc_proxy_charm" if ee_item['juju'].get('charm') is not None else "native_charm"
if ee_item['juju'].get('cloud') == "k8s":
vca_type = "k8s_proxy_charm"
elif ee_item['juju'].get('proxy') is False:
vca_type = "native_charm"
elif ee_item.get("helm-chart"):
vca_name = ee_item['helm-chart']
if ee_item.get("helm-version") and ee_item.get("helm-version") == "v2":
vca_type = "helm"
else:
vca_type = "helm-v3"
else:
self.logger.debug(logging_text + "skipping non juju neither charm configuration")
continue
vca_index = -1
for vca_index, vca_deployed in enumerate(db_nsr["_admin"]["deployed"]["VCA"]):
if not vca_deployed:
continue
if vca_deployed.get("member-vnf-index") == member_vnf_index and \
vca_deployed.get("vdu_id") == vdu_id and \
vca_deployed.get("kdu_name") == kdu_name and \
vca_deployed.get("vdu_count_index", 0) == vdu_index and \
vca_deployed.get("ee_descriptor_id") == ee_descriptor_id:
break
else:
# not found, create one.
target = "ns" if not member_vnf_index else "vnf/{}".format(member_vnf_index)
if vdu_id:
target += "/vdu/{}/{}".format(vdu_id, vdu_index or 0)
elif kdu_name:
target += "/kdu/{}".format(kdu_name)
vca_deployed = {
"target_element": target,
# ^ target_element will replace member-vnf-index, kdu_name, vdu_id ... in a single string
"member-vnf-index": member_vnf_index,
"vdu_id": vdu_id,
"kdu_name": kdu_name,
"vdu_count_index": vdu_index,
"operational-status": "init", # TODO revise
"detailed-status": "", # TODO revise
"step": "initial-deploy", # TODO revise
"vnfd_id": vnfd_id,
"vdu_name": vdu_name,
"type": vca_type,
"ee_descriptor_id": ee_descriptor_id
}
vca_index += 1
# create VCA and configurationStatus in db
db_dict = {
"_admin.deployed.VCA.{}".format(vca_index): vca_deployed,
"configurationStatus.{}".format(vca_index): dict()
}
self.update_db_2("nsrs", nsr_id, db_dict)
db_nsr["_admin"]["deployed"]["VCA"].append(vca_deployed)
self.logger.debug("N2VC > NSR_ID > {}".format(nsr_id))
self.logger.debug("N2VC > DB_NSR > {}".format(db_nsr))
self.logger.debug("N2VC > VCA_DEPLOYED > {}".format(vca_deployed))
# Launch task
task_n2vc = asyncio.ensure_future(
self.instantiate_N2VC(
logging_text=logging_text,
vca_index=vca_index,
nsi_id=nsi_id,
db_nsr=db_nsr,
db_vnfr=db_vnfr,
vdu_id=vdu_id,
kdu_name=kdu_name,
vdu_index=vdu_index,
deploy_params=deploy_params,
config_descriptor=descriptor_config,
base_folder=base_folder,
nslcmop_id=nslcmop_id,
stage=stage,
vca_type=vca_type,
vca_name=vca_name,
ee_config_descriptor=ee_item
)
)
self.lcm_tasks.register("ns", nsr_id, nslcmop_id, "instantiate_N2VC-{}".format(vca_index), task_n2vc)
task_instantiation_info[task_n2vc] = self.task_name_deploy_vca + " {}.{}".format(
member_vnf_index or "", vdu_id or "")
@staticmethod
def _create_nslcmop(nsr_id, operation, params):
"""
Creates an nslcmop content to be stored in the database.
:param nsr_id: internal id of the instance
:param operation: instantiate, terminate, scale, action, ...
:param params: user parameters for the operation
:return: dictionary following SOL005 format
"""
# Raise exception if invalid arguments
if not (nsr_id and operation and params):
raise LcmException(
"Parameters 'nsr_id', 'operation' and 'params' needed to create primitive not provided")
now = time()
_id = str(uuid4())
nslcmop = {
"id": _id,
"_id": _id,
# COMPLETED,PARTIALLY_COMPLETED,FAILED_TEMP,FAILED,ROLLING_BACK,ROLLED_BACK
"operationState": "PROCESSING",
"statusEnteredTime": now,
"nsInstanceId": nsr_id,
"lcmOperationType": operation,
"startTime": now,
"isAutomaticInvocation": False,
"operationParams": params,
"isCancelPending": False,
"links": {
"self": "/osm/nslcm/v1/ns_lcm_op_occs/" + _id,
"nsInstance": "/osm/nslcm/v1/ns_instances/" + nsr_id,
}
}
return nslcmop
def _format_additional_params(self, params):
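# Values prefixed with "!!yaml " are parsed as YAML, e.g. {"limits": "!!yaml {cpu: 2}"} becomes {"limits": {"cpu": 2}}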
params = params or {}
for key, value in params.items():
if str(value).startswith("!!yaml "):
params[key] = yaml.safe_load(value[7:])
return params
def _get_terminate_primitive_params(self, seq, vnf_index):
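# Build the parameter dictionary for a terminate primitive sequence, resolved through _map_primitive_params()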
primitive = seq.get('name')
primitive_params = {}
params = {
"member_vnf_index": vnf_index,
"primitive": primitive,
"primitive_params": primitive_params,
}
desc_params = {}
return self._map_primitive_params(seq, params, desc_params)
# sub-operations
def _retry_or_skip_suboperation(self, db_nslcmop, op_index):
op = deep_get(db_nslcmop, ('_admin', 'operations'), [])[op_index]
if op.get('operationState') == 'COMPLETED':
# b. Skip sub-operation
# _ns_execute_primitive() or RO.create_action() will NOT be executed
return self.SUBOPERATION_STATUS_SKIP
else:
# c. retry executing sub-operation
# The sub-operation exists, and operationState != 'COMPLETED'
# Update operationState = 'PROCESSING' to indicate a retry.
operationState = 'PROCESSING'
detailed_status = 'In progress'
self._update_suboperation_status(
db_nslcmop, op_index, operationState, detailed_status)
# Return the sub-operation index
# _ns_execute_primitive() or RO.create_action() will be called from scale()
# with arguments extracted from the sub-operation
return op_index
# Find a sub-operation where all keys in a matching dictionary must match
# Returns the index of the matching sub-operation, or SUBOPERATION_STATUS_NOT_FOUND if no match
def _find_suboperation(self, db_nslcmop, match):
if db_nslcmop and match:
op_list = db_nslcmop.get('_admin', {}).get('operations', [])
for i, op in enumerate(op_list):
if all(op.get(k) == match[k] for k in match):
return i
return self.SUBOPERATION_STATUS_NOT_FOUND
# Update status for a sub-operation given its index
def _update_suboperation_status(self, db_nslcmop, op_index, operationState, detailed_status):
# Update DB for HA tasks
q_filter = {'_id': db_nslcmop['_id']}
update_dict = {'_admin.operations.{}.operationState'.format(op_index): operationState,
'_admin.operations.{}.detailed-status'.format(op_index): detailed_status}
self.db.set_one("nslcmops",
q_filter=q_filter,
update_dict=update_dict,
fail_on_empty=False)
# Add sub-operation, return the index of the added sub-operation
# Optionally, set operationState, detailed-status, and operationType
# Status and type are currently set for 'scale' sub-operations:
# 'operationState' : 'PROCESSING' | 'COMPLETED' | 'FAILED'
# 'detailed-status' : status message
# 'operationType': may be any type, in the case of scaling: 'PRE-SCALE' | 'POST-SCALE'
# Status and operation type are currently only used for 'scale', but NOT for 'terminate' sub-operations.
def _add_suboperation(self, db_nslcmop, vnf_index, vdu_id, vdu_count_index, vdu_name, primitive,
mapped_primitive_params, operationState=None, detailed_status=None, operationType=None,
RO_nsr_id=None, RO_scaling_info=None):
if not db_nslcmop:
return self.SUBOPERATION_STATUS_NOT_FOUND
# Get the "_admin.operations" list, if it exists
db_nslcmop_admin = db_nslcmop.get('_admin', {})
op_list = db_nslcmop_admin.get('operations')
# Create or append to the "_admin.operations" list
new_op = {'member_vnf_index': vnf_index,
'vdu_id': vdu_id,
'vdu_count_index': vdu_count_index,
'primitive': primitive,
'primitive_params': mapped_primitive_params}
if operationState:
new_op['operationState'] = operationState
if detailed_status:
new_op['detailed-status'] = detailed_status
if operationType:
new_op['lcmOperationType'] = operationType
if RO_nsr_id:
new_op['RO_nsr_id'] = RO_nsr_id
if RO_scaling_info:
new_op['RO_scaling_info'] = RO_scaling_info
if not op_list:
# No existing operations, create key 'operations' with current operation as first list element
db_nslcmop_admin.update({'operations': [new_op]})
op_list = db_nslcmop_admin.get('operations')
else:
# Existing operations, append operation to list
op_list.append(new_op)
db_nslcmop_update = {'_admin.operations': op_list}
self.update_db_2("nslcmops", db_nslcmop['_id'], db_nslcmop_update)
op_index = len(op_list) - 1
return op_index
# Helper methods for scale() sub-operations
# pre-scale/post-scale:
# Check for 3 different cases:
# a. New: First time execution, return SUBOPERATION_STATUS_NEW
# b. Skip: Existing sub-operation exists, operationState == 'COMPLETED', return SUBOPERATION_STATUS_SKIP
# c. retry: Existing sub-operation exists, operationState != 'COMPLETED', return op_index to re-execute
def _check_or_add_scale_suboperation(self, db_nslcmop, vnf_index, vnf_config_primitive, primitive_params,
operationType, RO_nsr_id=None, RO_scaling_info=None):
# Find this sub-operation
if RO_nsr_id and RO_scaling_info:
operationType = 'SCALE-RO'
match = {
'member_vnf_index': vnf_index,
'RO_nsr_id': RO_nsr_id,
'RO_scaling_info': RO_scaling_info,
}
else:
match = {
'member_vnf_index': vnf_index,
'primitive': vnf_config_primitive,
'primitive_params': primitive_params,
'lcmOperationType': operationType
}
op_index = self._find_suboperation(db_nslcmop, match)
if op_index == self.SUBOPERATION_STATUS_NOT_FOUND:
# a. New sub-operation
# The sub-operation does not exist, add it.
# _ns_execute_primitive() will be called from scale() as usual, with non-modified arguments
# The following parameters are set to None for all kind of scaling:
vdu_id = None
vdu_count_index = None
vdu_name = None
if RO_nsr_id and RO_scaling_info:
vnf_config_primitive = None
primitive_params = None
else:
RO_nsr_id = None
RO_scaling_info = None
# Initial status for sub-operation
operationState = 'PROCESSING'
detailed_status = 'In progress'
# Add sub-operation for pre/post-scaling (zero or more operations)
self._add_suboperation(db_nslcmop,
vnf_index,
vdu_id,
vdu_count_index,
vdu_name,
vnf_config_primitive,
primitive_params,
operationState,
detailed_status,
operationType,
RO_nsr_id,
RO_scaling_info)
return self.SUBOPERATION_STATUS_NEW
else:
# Return either SUBOPERATION_STATUS_SKIP (operationState == 'COMPLETED'),
# or op_index (operationState != 'COMPLETED')
return self._retry_or_skip_suboperation(db_nslcmop, op_index)
# Function to return execution_environment id
def _get_ee_id(self, vnf_index, vdu_id, vca_deployed_list):
# TODO vdu_index_count
for vca in vca_deployed_list:
if vca["member-vnf-index"] == vnf_index and vca["vdu_id"] == vdu_id:
return vca["ee_id"]
async def destroy_N2VC(self, logging_text, db_nslcmop, vca_deployed, config_descriptor,
vca_index, destroy_ee=True, exec_primitives=True, scaling_in=False):
"""
Execute the terminate primitives and destroy the execution environment (if destroy_ee=True)
:param logging_text:
:param db_nslcmop:
:param vca_deployed: Dictionary of deployment info at db_nsr._admin.deployed.VCA.<INDEX>
:param config_descriptor: Configuration descriptor of the NSD, VNFD, VNFD.vdu or VNFD.kdu
:param vca_index: index in the database _admin.deployed.VCA
:param destroy_ee: False to skip destroying the execution environment here, because all of them will be
destroyed at once later
:param exec_primitives: False to skip the terminate primitives, because the configuration did not complete or
did not execute properly
:param scaling_in: True destroys the application, False destroys the model
:return: None or exception
"""
self.logger.debug(
logging_text + " vca_index: {}, vca_deployed: {}, config_descriptor: {}, destroy_ee: {}".format(
vca_index, vca_deployed, config_descriptor, destroy_ee
)
)
vca_type = vca_deployed.get("type", "lxc_proxy_charm")
# execute terminate_primitives
if exec_primitives:
terminate_primitives = get_ee_sorted_terminate_config_primitive_list(
config_descriptor.get("terminate-config-primitive"), vca_deployed.get("ee_descriptor_id"))
vdu_id = vca_deployed.get("vdu_id")
vdu_count_index = vca_deployed.get("vdu_count_index")
vdu_name = vca_deployed.get("vdu_name")
vnf_index = vca_deployed.get("member-vnf-index")
if terminate_primitives and vca_deployed.get("needed_terminate"):
for seq in terminate_primitives:
# For each sequence in list, get primitive and call _ns_execute_primitive()
step = "Calling terminate action for vnf_member_index={} primitive={}".format(
vnf_index, seq.get("name"))
self.logger.debug(logging_text + step)
# Create the primitive for each sequence, i.e. "primitive": "touch"
primitive = seq.get('name')
mapped_primitive_params = self._get_terminate_primitive_params(seq, vnf_index)
# Add sub-operation
self._add_suboperation(db_nslcmop,
vnf_index,
vdu_id,
vdu_count_index,
vdu_name,
primitive,
mapped_primitive_params)
# Sub-operations: Call _ns_execute_primitive() instead of action()
try:
result, result_detail = await self._ns_execute_primitive(vca_deployed["ee_id"], primitive,
mapped_primitive_params,
vca_type=vca_type)
except LcmException:
# this happens when the VCA is not deployed; in that case there is nothing to terminate
continue
result_ok = ['COMPLETED', 'PARTIALLY_COMPLETED']
if result not in result_ok:
raise LcmException("terminate_primitive {} for vnf_member_index={} fails with "
"error {}".format(seq.get("name"), vnf_index, result_detail))
# mark that this VCA no longer needs to be terminated
db_update_entry = "_admin.deployed.VCA.{}.needed_terminate".format(vca_index)
self.update_db_2("nsrs", db_nslcmop["nsInstanceId"], {db_update_entry: False})
if vca_deployed.get("prometheus_jobs") and self.prometheus:
await self.prometheus.update(remove_jobs=vca_deployed["prometheus_jobs"])
if destroy_ee:
await self.vca_map[vca_type].delete_execution_environment(vca_deployed["ee_id"], scaling_in=scaling_in)
async def _delete_all_N2VC(self, db_nsr: dict):
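# Delete the whole N2VC namespace ("." + nsr_id), which holds every execution environment of this NS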
self._write_all_config_status(db_nsr=db_nsr, status='TERMINATING')
namespace = "." + db_nsr["_id"]
try:
await self.n2vc.delete_namespace(namespace=namespace, total_timeout=self.timeout_charm_delete)
except N2VCNotFound: # already deleted. Skip
pass
self._write_all_config_status(db_nsr=db_nsr, status='DELETED')
async def _terminate_RO(self, logging_text, nsr_deployed, nsr_id, nslcmop_id, stage):
"""
Terminates a deployment from RO
:param logging_text:
:param nsr_deployed: db_nsr._admin.deployed
:param nsr_id:
:param nslcmop_id:
:param stage: list of strings with the content to write to db_nslcmop.detailed-status.
This method updates only index 2, but it writes the concatenated content of the whole list to the database
:return:
"""
db_nsr_update = {}
failed_detail = []
ro_nsr_id = ro_delete_action = None
if nsr_deployed and nsr_deployed.get("RO"):
ro_nsr_id = nsr_deployed["RO"].get("nsr_id")
ro_delete_action = nsr_deployed["RO"].get("nsr_delete_action_id")
try:
if ro_nsr_id:
stage[2] = "Deleting ns from VIM."
db_nsr_update["detailed-status"] = " ".join(stage)
self._write_op_status(nslcmop_id, stage)
self.logger.debug(logging_text + stage[2])
self.update_db_2("nsrs", nsr_id, db_nsr_update)
self._write_op_status(nslcmop_id, stage)
desc = await self.RO.delete("ns", ro_nsr_id)
ro_delete_action = desc["action_id"]
db_nsr_update["_admin.deployed.RO.nsr_delete_action_id"] = ro_delete_action
db_nsr_update["_admin.deployed.RO.nsr_id"] = None
db_nsr_update["_admin.deployed.RO.nsr_status"] = "DELETED"
if ro_delete_action:
# wait until NS is deleted from VIM
stage[2] = "Waiting ns deleted from VIM."
detailed_status_old = None
self.logger.debug(logging_text + stage[2] + " RO_id={} ro_delete_action={}".format(ro_nsr_id,
ro_delete_action))
self.update_db_2("nsrs", nsr_id, db_nsr_update)
self._write_op_status(nslcmop_id, stage)
delete_timeout = 20 * 60 # 20 minutes
while delete_timeout > 0:
desc = await self.RO.show(
"ns",
item_id_name=ro_nsr_id,
extra_item="action",
extra_item_id=ro_delete_action)
# deploymentStatus
self._on_update_ro_db(nsrs_id=nsr_id, ro_descriptor=desc)
ns_status, ns_status_info = self.RO.check_action_status(desc)
if ns_status == "ERROR":
raise ROclient.ROClientException(ns_status_info)
elif ns_status == "BUILD":
stage[2] = "Deleting from VIM {}".format(ns_status_info)
elif ns_status == "ACTIVE":
db_nsr_update["_admin.deployed.RO.nsr_delete_action_id"] = None
db_nsr_update["_admin.deployed.RO.nsr_status"] = "DELETED"
break
else:
assert False, "ROclient.check_action_status returns unknown {}".format(ns_status)
if stage[2] != detailed_status_old:
detailed_status_old = stage[2]
db_nsr_update["detailed-status"] = " ".join(stage)
self._write_op_status(nslcmop_id, stage)
self.update_db_2("nsrs", nsr_id, db_nsr_update)
await asyncio.sleep(5, loop=self.loop)
delete_timeout -= 5
else: # delete_timeout <= 0:
raise ROclient.ROClientException("Timeout waiting ns deleted from VIM")
except Exception as e:
self.update_db_2("nsrs", nsr_id, db_nsr_update)
if isinstance(e, ROclient.ROClientException) and e.http_code == 404: # not found
db_nsr_update["_admin.deployed.RO.nsr_id"] = None
db_nsr_update["_admin.deployed.RO.nsr_status"] = "DELETED"
db_nsr_update["_admin.deployed.RO.nsr_delete_action_id"] = None
self.logger.debug(logging_text + "RO_ns_id={} already deleted".format(ro_nsr_id))
elif isinstance(e, ROclient.ROClientException) and e.http_code == 409: # conflict
failed_detail.append("delete conflict: {}".format(e))
self.logger.debug(logging_text + "RO_ns_id={} delete conflict: {}".format(ro_nsr_id, e))
else:
failed_detail.append("delete error: {}".format(e))
self.logger.error(logging_text + "RO_ns_id={} delete error: {}".format(ro_nsr_id, e))
# Delete nsd
if not failed_detail and deep_get(nsr_deployed, ("RO", "nsd_id")):
ro_nsd_id = nsr_deployed["RO"]["nsd_id"]
try:
stage[2] = "Deleting nsd from RO."
db_nsr_update["detailed-status"] = " ".join(stage)
self.update_db_2("nsrs", nsr_id, db_nsr_update)
self._write_op_status(nslcmop_id, stage)
await self.RO.delete("nsd", ro_nsd_id)
self.logger.debug(logging_text + "ro_nsd_id={} deleted".format(ro_nsd_id))
db_nsr_update["_admin.deployed.RO.nsd_id"] = None
except Exception as e:
if isinstance(e, ROclient.ROClientException) and e.http_code == 404: # not found
db_nsr_update["_admin.deployed.RO.nsd_id"] = None
self.logger.debug(logging_text + "ro_nsd_id={} already deleted".format(ro_nsd_id))
elif isinstance(e, ROclient.ROClientException) and e.http_code == 409: # conflict
failed_detail.append("ro_nsd_id={} delete conflict: {}".format(ro_nsd_id, e))
self.logger.debug(logging_text + failed_detail[-1])
else:
failed_detail.append("ro_nsd_id={} delete error: {}".format(ro_nsd_id, e))
self.logger.error(logging_text + failed_detail[-1])
if not failed_detail and deep_get(nsr_deployed, ("RO", "vnfd")):
for index, vnf_deployed in enumerate(nsr_deployed["RO"]["vnfd"]):
if not vnf_deployed or not vnf_deployed["id"]:
continue
try:
ro_vnfd_id = vnf_deployed["id"]
stage[2] = "Deleting member_vnf_index={} ro_vnfd_id={} from RO.".format(
vnf_deployed["member-vnf-index"], ro_vnfd_id)
db_nsr_update["detailed-status"] = " ".join(stage)
self.update_db_2("nsrs", nsr_id, db_nsr_update)
self._write_op_status(nslcmop_id, stage)
await self.RO.delete("vnfd", ro_vnfd_id)
self.logger.debug(logging_text + "ro_vnfd_id={} deleted".format(ro_vnfd_id))
db_nsr_update["_admin.deployed.RO.vnfd.{}.id".format(index)] = None
except Exception as e:
if isinstance(e, ROclient.ROClientException) and e.http_code == 404: # not found
db_nsr_update["_admin.deployed.RO.vnfd.{}.id".format(index)] = None
self.logger.debug(logging_text + "ro_vnfd_id={} already deleted ".format(ro_vnfd_id))
elif isinstance(e, ROclient.ROClientException) and e.http_code == 409: # conflict
failed_detail.append("ro_vnfd_id={} delete conflict: {}".format(ro_vnfd_id, e))
self.logger.debug(logging_text + failed_detail[-1])
else:
failed_detail.append("ro_vnfd_id={} delete error: {}".format(ro_vnfd_id, e))
self.logger.error(logging_text + failed_detail[-1])
if failed_detail:
stage[2] = "Error deleting from VIM"
else:
stage[2] = "Deleted from VIM"
db_nsr_update["detailed-status"] = " ".join(stage)
self.update_db_2("nsrs", nsr_id, db_nsr_update)
self._write_op_status(nslcmop_id, stage)
if failed_detail:
raise LcmException("; ".join(failed_detail))
async def terminate(self, nsr_id, nslcmop_id):
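# Terminate the NS: execute the needed terminate primitives, delete all execution environments,
# uninstall the KDUs and remove the deployment from the VIM (through RO or NG-RO)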
# Try to lock HA task here
task_is_locked_by_me = self.lcm_tasks.lock_HA('ns', 'nslcmops', nslcmop_id)
if not task_is_locked_by_me:
return
logging_text = "Task ns={} terminate={} ".format(nsr_id, nslcmop_id)
self.logger.debug(logging_text + "Enter")
timeout_ns_terminate = self.timeout_ns_terminate
db_nsr = None
db_nslcmop = None
operation_params = None
exc = None
error_list = [] # annotates all failed error messages
db_nslcmop_update = {}
autoremove = False # autoremove after terminated
tasks_dict_info = {}
db_nsr_update = {}
stage = ["Stage 1/3: Preparing task.", "Waiting for previous operations to terminate.", ""]
# ^ contains [stage, step, VIM-status]
try:
# wait for any previous tasks in process
await self.lcm_tasks.waitfor_related_HA("ns", 'nslcmops', nslcmop_id)
stage[1] = "Getting nslcmop={} from db.".format(nslcmop_id)
db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})
operation_params = db_nslcmop.get("operationParams") or {}
if operation_params.get("timeout_ns_terminate"):
timeout_ns_terminate = operation_params["timeout_ns_terminate"]
stage[1] = "Getting nsr={} from db.".format(nsr_id)
db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
db_nsr_update["operational-status"] = "terminating"
db_nsr_update["config-status"] = "terminating"
self._write_ns_status(
nsr_id=nsr_id,
ns_state="TERMINATING",
current_operation="TERMINATING",
current_operation_id=nslcmop_id,
other_update=db_nsr_update
)
self._write_op_status(
op_id=nslcmop_id,
queuePosition=0,
stage=stage
)
nsr_deployed = deepcopy(db_nsr["_admin"].get("deployed")) or {}
if db_nsr["_admin"]["nsState"] == "NOT_INSTANTIATED":
return
stage[1] = "Getting vnf descriptors from db."
db_vnfrs_list = self.db.get_list("vnfrs", {"nsr-id-ref": nsr_id})
db_vnfds_from_id = {}
db_vnfds_from_member_index = {}
# Loop over VNFRs
for vnfr in db_vnfrs_list:
vnfd_id = vnfr["vnfd-id"]
if vnfd_id not in db_vnfds_from_id:
vnfd = self.db.get_one("vnfds", {"_id": vnfd_id})
db_vnfds_from_id[vnfd_id] = vnfd
db_vnfds_from_member_index[vnfr["member-vnf-index-ref"]] = db_vnfds_from_id[vnfd_id]
# Destroy individual execution environments when there are terminating primitives.
# The rest of the EEs will be deleted at once
# TODO - check before calling _destroy_N2VC
# if not operation_params.get("skip_terminate_primitives"):#
# or not vca.get("needed_terminate"):
stage[0] = "Stage 2/3 execute terminating primitives."
self.logger.debug(logging_text + stage[0])
stage[1] = "Looking execution environment that needs terminate."
self.logger.debug(logging_text + stage[1])
for vca_index, vca in enumerate(get_iterable(nsr_deployed, "VCA")):
config_descriptor = None
if not vca or not vca.get("ee_id"):
continue
if not vca.get("member-vnf-index"):
# ns
config_descriptor = db_nsr.get("ns-configuration")
elif vca.get("vdu_id"):
db_vnfd = db_vnfds_from_member_index[vca["member-vnf-index"]]
config_descriptor = get_configuration(db_vnfd, vca.get("vdu_id"))
elif vca.get("kdu_name"):
db_vnfd = db_vnfds_from_member_index[vca["member-vnf-index"]]
config_descriptor = get_configuration(db_vnfd, vca.get("kdu_name"))
else:
db_vnfd = db_vnfds_from_member_index[vca["member-vnf-index"]]
config_descriptor = get_configuration(db_vnfd, db_vnfd["id"])
vca_type = vca.get("type")
exec_terminate_primitives = (not operation_params.get("skip_terminate_primitives") and
vca.get("needed_terminate"))
# For helm we must destroy_ee. Also for native_charm, as juju_model cannot be deleted if there are
# pending native charms
destroy_ee = True if vca_type in ("helm", "helm-v3", "native_charm") else False
# self.logger.debug(logging_text + "vca_index: {}, ee_id: {}, vca_type: {} destroy_ee: {}".format(
# vca_index, vca.get("ee_id"), vca_type, destroy_ee))
task = asyncio.ensure_future(
self.destroy_N2VC(logging_text, db_nslcmop, vca, config_descriptor, vca_index,
destroy_ee, exec_terminate_primitives))
tasks_dict_info[task] = "Terminating VCA {}".format(vca.get("ee_id"))
# wait for pending tasks of terminate primitives
if tasks_dict_info:
self.logger.debug(logging_text + 'Waiting for tasks {}'.format(list(tasks_dict_info.keys())))
error_list = await self._wait_for_tasks(logging_text, tasks_dict_info,
min(self.timeout_charm_delete, timeout_ns_terminate),
stage, nslcmop_id)
tasks_dict_info.clear()
if error_list:
return # raise LcmException("; ".join(error_list))
# remove All execution environments at once
stage[0] = "Stage 3/3 delete all."
if nsr_deployed.get("VCA"):
stage[1] = "Deleting all execution environments."
self.logger.debug(logging_text + stage[1])
task_delete_ee = asyncio.ensure_future(asyncio.wait_for(self._delete_all_N2VC(db_nsr=db_nsr),
timeout=self.timeout_charm_delete))
# task_delete_ee = asyncio.ensure_future(self.n2vc.delete_namespace(namespace="." + nsr_id))
tasks_dict_info[task_delete_ee] = "Terminating all VCA"
# Delete from k8scluster
stage[1] = "Deleting KDUs."
self.logger.debug(logging_text + stage[1])
# print(nsr_deployed)
for kdu in get_iterable(nsr_deployed, "K8s"):
if not kdu or not kdu.get("kdu-instance"):
continue
kdu_instance = kdu.get("kdu-instance")
if kdu.get("k8scluster-type") in self.k8scluster_map:
task_delete_kdu_instance = asyncio.ensure_future(
self.k8scluster_map[kdu["k8scluster-type"]].uninstall(
cluster_uuid=kdu.get("k8scluster-uuid"),
kdu_instance=kdu_instance))
else:
self.logger.error(logging_text + "Unknown k8s deployment type {}".
format(kdu.get("k8scluster-type")))
continue
tasks_dict_info[task_delete_kdu_instance] = "Terminating KDU '{}'".format(kdu.get("kdu-name"))
# remove from RO
stage[1] = "Deleting ns from VIM."
if self.ng_ro:
task_delete_ro = asyncio.ensure_future(
self._terminate_ng_ro(logging_text, nsr_deployed, nsr_id, nslcmop_id, stage))
else:
task_delete_ro = asyncio.ensure_future(
self._terminate_RO(logging_text, nsr_deployed, nsr_id, nslcmop_id, stage))
tasks_dict_info[task_delete_ro] = "Removing deployment from VIM"
# the rest of the steps will be done in the finally block
except (ROclient.ROClientException, DbException, LcmException, N2VCException) as e:
self.logger.error(logging_text + "Exit Exception {}".format(e))
exc = e
except asyncio.CancelledError:
self.logger.error(logging_text + "Cancelled Exception while '{}'".format(stage[1]))
exc = "Operation was cancelled"
except Exception as e:
exc = traceback.format_exc()
self.logger.critical(logging_text + "Exit Exception while '{}': {}".format(stage[1], e), exc_info=True)
finally:
if exc:
error_list.append(str(exc))
try:
# wait for pending tasks
if tasks_dict_info:
stage[1] = "Waiting for terminate pending tasks."
self.logger.debug(logging_text + stage[1])
error_list += await self._wait_for_tasks(logging_text, tasks_dict_info, timeout_ns_terminate,
stage, nslcmop_id)
stage[1] = stage[2] = ""
except asyncio.CancelledError:
error_list.append("Cancelled")
# TODO cancel all tasks
except Exception as exc:
error_list.append(str(exc))
# update status at database
if error_list:
error_detail = "; ".join(error_list)
# self.logger.error(logging_text + error_detail)
error_description_nslcmop = '{} Detail: {}'.format(stage[0], error_detail)
error_description_nsr = 'Operation: TERMINATING.{}, {}.'.format(nslcmop_id, stage[0])
db_nsr_update["operational-status"] = "failed"
db_nsr_update["detailed-status"] = error_description_nsr + " Detail: " + error_detail
db_nslcmop_update["detailed-status"] = error_detail
nslcmop_operation_state = "FAILED"
ns_state = "BROKEN"
else:
error_detail = None
error_description_nsr = error_description_nslcmop = None
ns_state = "NOT_INSTANTIATED"
db_nsr_update["operational-status"] = "terminated"
db_nsr_update["detailed-status"] = "Done"
db_nsr_update["_admin.nsState"] = "NOT_INSTANTIATED"
db_nslcmop_update["detailed-status"] = "Done"
nslcmop_operation_state = "COMPLETED"
if db_nsr:
self._write_ns_status(
nsr_id=nsr_id,
ns_state=ns_state,
current_operation="IDLE",
current_operation_id=None,
error_description=error_description_nsr,
error_detail=error_detail,
other_update=db_nsr_update
)
self._write_op_status(
op_id=nslcmop_id,
stage="",
error_message=error_description_nslcmop,
operation_state=nslcmop_operation_state,
other_update=db_nslcmop_update,
)
if ns_state == "NOT_INSTANTIATED":
try:
self.db.set_list("vnfrs", {"nsr-id-ref": nsr_id}, {"_admin.nsState": "NOT_INSTANTIATED"})
except DbException as e:
self.logger.warn(logging_text + 'Error writing VNFR status for nsr-id-ref: {} -> {}'.
format(nsr_id, e))
if operation_params:
autoremove = operation_params.get("autoremove", False)
if nslcmop_operation_state:
try:
await self.msg.aiowrite("ns", "terminated", {"nsr_id": nsr_id, "nslcmop_id": nslcmop_id,
"operationState": nslcmop_operation_state,
"autoremove": autoremove},
loop=self.loop)
except Exception as e:
self.logger.error(logging_text + "kafka_write notification Exception {}".format(e))
self.logger.debug(logging_text + "Exit")
self.lcm_tasks.remove("ns", nsr_id, nslcmop_id, "ns_terminate")
async def _wait_for_tasks(self, logging_text, created_tasks_info, timeout, stage, nslcmop_id, nsr_id=None):
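"""
Wait for the asyncio tasks registered in created_tasks_info to finish, updating the operation stage as they
complete. Timed-out, cancelled or failed tasks are collected as errors; if nsr_id is given, the nsr
errorDescription/errorDetail fields are also updated.
:param created_tasks_info: dict mapping each asyncio task to a human readable description
:return: list of error detail strings (empty if all tasks succeeded)
"""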
time_start = time()
error_detail_list = []
error_list = []
pending_tasks = list(created_tasks_info.keys())
num_tasks = len(pending_tasks)
num_done = 0
stage[1] = "{}/{}.".format(num_done, num_tasks)
self._write_op_status(nslcmop_id, stage)
while pending_tasks:
new_error = None
_timeout = timeout + time_start - time()
done, pending_tasks = await asyncio.wait(pending_tasks, timeout=_timeout,
return_when=asyncio.FIRST_COMPLETED)
num_done += len(done)
if not done: # Timeout
for task in pending_tasks:
new_error = created_tasks_info[task] + ": Timeout"
error_detail_list.append(new_error)
error_list.append(new_error)
break
for task in done:
if task.cancelled():
exc = "Cancelled"
else:
exc = task.exception()
if exc:
if isinstance(exc, asyncio.TimeoutError):
exc = "Timeout"
new_error = created_tasks_info[task] + ": {}".format(exc)
error_list.append(created_tasks_info[task])
error_detail_list.append(new_error)
if isinstance(exc, (str, DbException, N2VCException, ROclient.ROClientException, LcmException,
K8sException, NgRoException)):
self.logger.error(logging_text + new_error)
else:
exc_traceback = "".join(traceback.format_exception(None, exc, exc.__traceback__))
self.logger.error(logging_text + created_tasks_info[task] + " " + exc_traceback)
else:
self.logger.debug(logging_text + created_tasks_info[task] + ": Done")
stage[1] = "{}/{}.".format(num_done, num_tasks)
if new_error:
stage[1] += " Errors: " + ". ".join(error_detail_list) + "."
if nsr_id: # update also nsr
self.update_db_2("nsrs", nsr_id, {"errorDescription": "Error at: " + ", ".join(error_list),
"errorDetail": ". ".join(error_detail_list)})
self._write_op_status(nslcmop_id, stage)
return error_detail_list
@staticmethod
def _map_primitive_params(primitive_desc, params, instantiation_params):
"""
Generates the params to be provided to the charm before executing a primitive. If the user does not provide a
parameter, the default-value is used. If the value is enclosed between < >, it is looked up in instantiation_params.
:param primitive_desc: portion of VNFD/NSD that describes primitive
:param params: Params provided by user
:param instantiation_params: Instantiation params provided by user
:return: a dictionary with the calculated params
"""
calculated_params = {}
for parameter in primitive_desc.get("parameter", ()):
param_name = parameter["name"]
if param_name in params:
calculated_params[param_name] = params[param_name]
elif "default-value" in parameter or "value" in parameter:
if "value" in parameter:
calculated_params[param_name] = parameter["value"]
else:
calculated_params[param_name] = parameter["default-value"]
if isinstance(calculated_params[param_name], str) and calculated_params[param_name].startswith("<") \
and calculated_params[param_name].endswith(">"):
if calculated_params[param_name][1:-1] in instantiation_params:
calculated_params[param_name] = instantiation_params[calculated_params[param_name][1:-1]]
else:
raise LcmException("Parameter {} needed to execute primitive {} not provided".
format(calculated_params[param_name], primitive_desc["name"]))
else:
raise LcmException("Parameter {} needed to execute primitive {} not provided".
format(param_name, primitive_desc["name"]))
if isinstance(calculated_params[param_name], (dict, list, tuple)):
calculated_params[param_name] = yaml.safe_dump(calculated_params[param_name],
default_flow_style=True, width=256)
elif isinstance(calculated_params[param_name], str) and calculated_params[param_name].startswith("!!yaml "):
calculated_params[param_name] = calculated_params[param_name][7:]
if parameter.get("data-type") == "INTEGER":
try:
calculated_params[param_name] = int(calculated_params[param_name])
except ValueError: # error converting string to int
raise LcmException(
"Parameter {} of primitive {} must be integer".format(param_name, primitive_desc["name"]))
elif parameter.get("data-type") == "BOOLEAN":
calculated_params[param_name] = not ((str(calculated_params[param_name])).lower() == 'false')
# always add ns_config_info if the primitive name is config
if primitive_desc["name"] == "config":
if "ns_config_info" in instantiation_params:
calculated_params["ns_config_info"] = instantiation_params["ns_config_info"]
return calculated_params
def _look_for_deployed_vca(self, deployed_vca, member_vnf_index, vdu_id, vdu_count_index, kdu_name=None,
ee_descriptor_id=None):
# find the vca_deployed record for this action. Raise LcmException if not found or it has no ee_id.
for vca in deployed_vca:
if not vca:
continue
if member_vnf_index != vca["member-vnf-index"] or vdu_id != vca["vdu_id"]:
continue
if vdu_count_index is not None and vdu_count_index != vca["vdu_count_index"]:
continue
if kdu_name and kdu_name != vca["kdu_name"]:
continue
if ee_descriptor_id and ee_descriptor_id != vca["ee_descriptor_id"]:
continue
break
else:
# vca_deployed not found
raise LcmException("charm for member_vnf_index={} vdu_id={}.{} kdu_name={} execution-environment-list.id={}"
" is not deployed".format(member_vnf_index, vdu_id, vdu_count_index, kdu_name,
ee_descriptor_id))
# get ee_id
ee_id = vca.get("ee_id")
vca_type = vca.get("type", "lxc_proxy_charm") # default value for backward compatibility - proxy charm
if not ee_id:
raise LcmException("charm for member_vnf_index={} vdu_id={} kdu_name={} vdu_count_index={} has not "
"execution environment"
.format(member_vnf_index, vdu_id, kdu_name, vdu_count_index))
return ee_id, vca_type
async def _ns_execute_primitive(self, ee_id, primitive, primitive_params, retries=0, retries_interval=30,
timeout=None, vca_type=None, db_dict=None) -> (str, str):
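"""
Execute a primitive on the execution environment identified by ee_id, retrying up to 'retries' times with
'retries_interval' seconds between attempts.
:return: ('COMPLETED', output) on success, ('FAILED', error) when retries are exhausted, or
('FAIL', error) on an unexpected exception.
"""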
try:
if primitive == "config":
primitive_params = {"params": primitive_params}
vca_type = vca_type or "lxc_proxy_charm"
while retries >= 0:
try:
output = await asyncio.wait_for(
self.vca_map[vca_type].exec_primitive(
ee_id=ee_id,
primitive_name=primitive,
params_dict=primitive_params,
progress_timeout=self.timeout_progress_primitive,
total_timeout=self.timeout_primitive,
db_dict=db_dict),
timeout=timeout or self.timeout_primitive)
# execution was OK
break
except asyncio.CancelledError:
raise
except Exception as e: # asyncio.TimeoutError
if isinstance(e, asyncio.TimeoutError):
e = "Timeout"
retries -= 1
if retries >= 0:
self.logger.debug('Error executing action {} on {} -> {}'.format(primitive, ee_id, e))
# wait and retry
await asyncio.sleep(retries_interval, loop=self.loop)
else:
return 'FAILED', str(e)
return 'COMPLETED', output
except (LcmException, asyncio.CancelledError):
raise
except Exception as e:
return 'FAIL', 'Error executing action {}: {}'.format(primitive, e)
async def vca_status_refresh(self, nsr_id, nslcmop_id):
"""
Update the vca status in the nsrs record with the latest juju information
:param: nsr_id: Id of the nsr
:param: nslcmop_id: Id of the nslcmop
:return: None
"""
self.logger.debug("Task ns={} action={} Enter".format(nsr_id, nslcmop_id))
db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
if db_nsr['_admin']['deployed']['K8s']:
for k8s_index, k8s in enumerate(db_nsr['_admin']['deployed']['K8s']):
cluster_uuid, kdu_instance = k8s["k8scluster-uuid"], k8s["kdu-instance"]
await self._on_update_k8s_db(cluster_uuid, kdu_instance, filter={'_id': nsr_id})
else:
for vca_index, _ in enumerate(db_nsr['_admin']['deployed']['VCA']):
table, filter = "nsrs", {"_id": nsr_id}
path = "_admin.deployed.VCA.{}.".format(vca_index)
await self._on_update_n2vc_db(table, filter, path, {})
self.logger.debug("Task ns={} action={} Exit".format(nsr_id, nslcmop_id))
self.lcm_tasks.remove("ns", nsr_id, nslcmop_id, "ns_vca_status_refresh")
async def action(self, nsr_id, nslcmop_id):
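"""
Run an NS action (nslcmop of type 'action'): resolve the target primitive from the NS/VNF/VDU/KDU
configuration, execute it either on the k8s cluster (KDU primitives such as upgrade/rollback/status) or on
the deployed VCA, and update the nslcmop and nsr records with the result.
:param nsr_id: nsr id
:param nslcmop_id: nslcmop id of the action operation
:return: (nslcmop_operation_state, detailed_status)
"""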
# Try to lock HA task here
task_is_locked_by_me = self.lcm_tasks.lock_HA('ns', 'nslcmops', nslcmop_id)
if not task_is_locked_by_me:
return
logging_text = "Task ns={} action={} ".format(nsr_id, nslcmop_id)
self.logger.debug(logging_text + "Enter")
# get all needed from database
db_nsr = None
db_nslcmop = None
db_nsr_update = {}
db_nslcmop_update = {}
nslcmop_operation_state = None
error_description_nslcmop = None
exc = None
try:
# wait for any previous tasks in process
step = "Waiting for previous operations to terminate"
await self.lcm_tasks.waitfor_related_HA('ns', 'nslcmops', nslcmop_id)
self._write_ns_status(
nsr_id=nsr_id,
ns_state=None,
current_operation="RUNNING ACTION",
current_operation_id=nslcmop_id
)
step = "Getting information from database"
db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})
db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
nsr_deployed = db_nsr["_admin"].get("deployed")
vnf_index = db_nslcmop["operationParams"].get("member_vnf_index")
vdu_id = db_nslcmop["operationParams"].get("vdu_id")
kdu_name = db_nslcmop["operationParams"].get("kdu_name")
vdu_count_index = db_nslcmop["operationParams"].get("vdu_count_index")
primitive = db_nslcmop["operationParams"]["primitive"]
primitive_params = db_nslcmop["operationParams"]["primitive_params"]
timeout_ns_action = db_nslcmop["operationParams"].get("timeout_ns_action", self.timeout_primitive)
if vnf_index:
step = "Getting vnfr from database"
db_vnfr = self.db.get_one("vnfrs", {"member-vnf-index-ref": vnf_index, "nsr-id-ref": nsr_id})
step = "Getting vnfd from database"
db_vnfd = self.db.get_one("vnfds", {"_id": db_vnfr["vnfd-id"]})
else:
step = "Getting nsd from database"
db_nsd = self.db.get_one("nsds", {"_id": db_nsr["nsd-id"]})
# for backward compatibility
if nsr_deployed and isinstance(nsr_deployed.get("VCA"), dict):
nsr_deployed["VCA"] = list(nsr_deployed["VCA"].values())
db_nsr_update["_admin.deployed.VCA"] = nsr_deployed["VCA"]
self.update_db_2("nsrs", nsr_id, db_nsr_update)
# look for primitive
config_primitive_desc = descriptor_configuration = None
if vdu_id:
descriptor_configuration = get_configuration(db_vnfd, vdu_id)
elif kdu_name:
descriptor_configuration = get_configuration(db_vnfd, kdu_name)
elif vnf_index:
descriptor_configuration = get_configuration(db_vnfd, db_vnfd["id"])
else:
descriptor_configuration = db_nsd.get("ns-configuration")
if descriptor_configuration and descriptor_configuration.get("config-primitive"):
for config_primitive in descriptor_configuration["config-primitive"]:
if config_primitive["name"] == primitive:
config_primitive_desc = config_primitive
break
if not config_primitive_desc:
if not (kdu_name and primitive in ("upgrade", "rollback", "status")):
raise LcmException("Primitive {} not found at [ns|vnf|vdu]-configuration:config-primitive ".
format(primitive))
primitive_name = primitive
ee_descriptor_id = None
else:
primitive_name = config_primitive_desc.get("execution-environment-primitive", primitive)
ee_descriptor_id = config_primitive_desc.get("execution-environment-ref")
if vnf_index:
if vdu_id:
vdur = next((x for x in db_vnfr["vdur"] if x["vdu-id-ref"] == vdu_id), None)
desc_params = parse_yaml_strings(vdur.get("additionalParams"))
elif kdu_name:
kdur = next((x for x in db_vnfr["kdur"] if x["kdu-name"] == kdu_name), None)
desc_params = parse_yaml_strings(kdur.get("additionalParams"))
else:
desc_params = parse_yaml_strings(db_vnfr.get("additionalParamsForVnf"))
else:
desc_params = parse_yaml_strings(db_nsr.get("additionalParamsForNs"))
if kdu_name and get_configuration(db_vnfd, kdu_name):
kdu_configuration = get_configuration(db_vnfd, kdu_name)
actions = set()
for primitive in kdu_configuration.get("initial-config-primitive", []):
actions.add(primitive["name"])
for primitive in kdu_configuration.get("config-primitive", []):
actions.add(primitive["name"])
kdu_action = primitive_name in actions
# TODO check if ns is in a proper status
if kdu_name and (primitive_name in ("upgrade", "rollback", "status") or kdu_action):
# kdur and desc_params already set from before
if primitive_params:
desc_params.update(primitive_params)
# TODO Check if we will need something at vnf level
for index, kdu in enumerate(get_iterable(nsr_deployed, "K8s")):
if kdu_name == kdu["kdu-name"] and kdu["member-vnf-index"] == vnf_index:
break
else:
raise LcmException("KDU '{}' for vnf '{}' not deployed".format(kdu_name, vnf_index))
if kdu.get("k8scluster-type") not in self.k8scluster_map:
msg = "unknown k8scluster-type '{}'".format(kdu.get("k8scluster-type"))
raise LcmException(msg)
db_dict = {"collection": "nsrs",
"filter": {"_id": nsr_id},
"path": "_admin.deployed.K8s.{}".format(index)}
self.logger.debug(logging_text + "Exec k8s {} on {}.{}".format(primitive_name, vnf_index, kdu_name))
step = "Executing kdu {}".format(primitive_name)
if primitive_name == "upgrade":
if desc_params.get("kdu_model"):
kdu_model = desc_params.get("kdu_model")
del desc_params["kdu_model"]
else:
kdu_model = kdu.get("kdu-model")
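# a kdu-model reference of the form "<name>:<version>" keeps only the name part before being
# passed to the k8s connector upgrade call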
parts = kdu_model.split(sep=":")
if len(parts) == 2:
kdu_model = parts[0]
detailed_status = await asyncio.wait_for(
self.k8scluster_map[kdu["k8scluster-type"]].upgrade(
cluster_uuid=kdu.get("k8scluster-uuid"),
kdu_instance=kdu.get("kdu-instance"),
atomic=True, kdu_model=kdu_model,
params=desc_params, db_dict=db_dict,
timeout=timeout_ns_action),
timeout=timeout_ns_action + 10)
self.logger.debug(logging_text + " Upgrade of kdu {} done".format(detailed_status))
elif primitive_name == "rollback":
detailed_status = await asyncio.wait_for(
self.k8scluster_map[kdu["k8scluster-type"]].rollback(
cluster_uuid=kdu.get("k8scluster-uuid"),
kdu_instance=kdu.get("kdu-instance"),
db_dict=db_dict),
timeout=timeout_ns_action)
elif primitive_name == "status":
detailed_status = await asyncio.wait_for(
self.k8scluster_map[kdu["k8scluster-type"]].status_kdu(
cluster_uuid=kdu.get("k8scluster-uuid"),
kdu_instance=kdu.get("kdu-instance")),
timeout=timeout_ns_action)
else:
kdu_instance = kdu.get("kdu-instance") or "{}-{}".format(kdu["kdu-name"], nsr_id)
params = self._map_primitive_params(config_primitive_desc, primitive_params, desc_params)
detailed_status = await asyncio.wait_for(
self.k8scluster_map[kdu["k8scluster-type"]].exec_primitive(
cluster_uuid=kdu.get("k8scluster-uuid"),
kdu_instance=kdu_instance,
primitive_name=primitive_name,
params=params, db_dict=db_dict,
timeout=timeout_ns_action),
timeout=timeout_ns_action)
if detailed_status:
nslcmop_operation_state = 'COMPLETED'
else:
detailed_status = ''
nslcmop_operation_state = 'FAILED'
else:
ee_id, vca_type = self._look_for_deployed_vca(nsr_deployed["VCA"], member_vnf_index=vnf_index,
vdu_id=vdu_id, vdu_count_index=vdu_count_index,
ee_descriptor_id=ee_descriptor_id)
for vca_index, vca_deployed in enumerate(db_nsr['_admin']['deployed']['VCA']):
if vca_deployed.get("member-vnf-index") == vnf_index:
db_dict = {"collection": "nsrs",
"filter": {"_id": nsr_id},
"path": "_admin.deployed.VCA.{}.".format(vca_index)}
break
nslcmop_operation_state, detailed_status = await self._ns_execute_primitive(
ee_id,
primitive=primitive_name,
primitive_params=self._map_primitive_params(config_primitive_desc, primitive_params, desc_params),
timeout=timeout_ns_action,
vca_type=vca_type,
db_dict=db_dict)
db_nslcmop_update["detailed-status"] = detailed_status
error_description_nslcmop = detailed_status if nslcmop_operation_state == "FAILED" else ""
self.logger.debug(logging_text + " task Done with result {} {}".format(nslcmop_operation_state,
detailed_status))
return # database update is called inside finally
except (DbException, LcmException, N2VCException, K8sException) as e:
self.logger.error(logging_text + "Exit Exception {}".format(e))
exc = e
except asyncio.CancelledError:
self.logger.error(logging_text + "Cancelled Exception while '{}'".format(step))
exc = "Operation was cancelled"
except asyncio.TimeoutError:
self.logger.error(logging_text + "Timeout while '{}'".format(step))
exc = "Timeout"
except Exception as e:
exc = traceback.format_exc()
self.logger.critical(logging_text + "Exit Exception {} {}".format(type(e).__name__, e), exc_info=True)
finally:
if exc:
db_nslcmop_update["detailed-status"] = detailed_status = error_description_nslcmop = \
"FAILED {}: {}".format(step, exc)
nslcmop_operation_state = "FAILED"
if db_nsr:
self._write_ns_status(
nsr_id=nsr_id,
ns_state=db_nsr["nsState"], # TODO check if degraded. For the moment use previous status
current_operation="IDLE",
current_operation_id=None,
# error_description=error_description_nsr,
# error_detail=error_detail,
other_update=db_nsr_update
)
self._write_op_status(op_id=nslcmop_id, stage="", error_message=error_description_nslcmop,
operation_state=nslcmop_operation_state, other_update=db_nslcmop_update)
if nslcmop_operation_state:
try:
await self.msg.aiowrite("ns", "actioned", {"nsr_id": nsr_id, "nslcmop_id": nslcmop_id,
"operationState": nslcmop_operation_state},
loop=self.loop)
except Exception as e:
self.logger.error(logging_text + "kafka_write notification Exception {}".format(e))
self.logger.debug(logging_text + "Exit")
self.lcm_tasks.remove("ns", nsr_id, nslcmop_id, "ns_action")
return nslcmop_operation_state, detailed_status
async def scale(self, nsr_id, nslcmop_id):
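"""
Scale a VNF of the NS in or out (nslcmop of type 'scale'). The flow is: run the pre-scale config
primitives, remove VCA execution environments (SCALE_IN), send the scaling order to RO, deploy new VCA
execution environments (SCALE_OUT), and finally run the post-scale config primitives, updating the
nslcmop/nsr status along the way.
:param nsr_id: nsr id
:param nslcmop_id: nslcmop id of the scale operation
"""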
# Try to lock HA task here
task_is_locked_by_me = self.lcm_tasks.lock_HA('ns', 'nslcmops', nslcmop_id)
if not task_is_locked_by_me:
return
logging_text = "Task ns={} scale={} ".format(nsr_id, nslcmop_id)
stage = ['', '', '']
tasks_dict_info = {}
# ^ stage, step, VIM progress
self.logger.debug(logging_text + "Enter")
# get all needed from database
db_nsr = None
db_nslcmop_update = {}
db_nsr_update = {}
exc = None
# in case of error, indicates which part of the scale operation failed, so the nsr can be set to the proper error status
scale_process = None
old_operational_status = ""
old_config_status = ""
nsi_id = None
try:
# wait for any previous tasks in process
step = "Waiting for previous operations to terminate"
await self.lcm_tasks.waitfor_related_HA('ns', 'nslcmops', nslcmop_id)
self._write_ns_status(nsr_id=nsr_id, ns_state=None,
current_operation="SCALING", current_operation_id=nslcmop_id)
step = "Getting nslcmop from database"
self.logger.debug(step + " after having waited for previous tasks to be completed")
db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})
step = "Getting nsr from database"
db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
old_operational_status = db_nsr["operational-status"]
old_config_status = db_nsr["config-status"]
step = "Parsing scaling parameters"
db_nsr_update["operational-status"] = "scaling"
self.update_db_2("nsrs", nsr_id, db_nsr_update)
nsr_deployed = db_nsr["_admin"].get("deployed")
vnf_index = db_nslcmop["operationParams"]["scaleVnfData"]["scaleByStepData"]["member-vnf-index"]
scaling_group = db_nslcmop["operationParams"]["scaleVnfData"]["scaleByStepData"]["scaling-group-descriptor"]
scaling_type = db_nslcmop["operationParams"]["scaleVnfData"]["scaleVnfType"]
# for backward compatibility
if nsr_deployed and isinstance(nsr_deployed.get("VCA"), dict):
nsr_deployed["VCA"] = list(nsr_deployed["VCA"].values())
db_nsr_update["_admin.deployed.VCA"] = nsr_deployed["VCA"]
self.update_db_2("nsrs", nsr_id, db_nsr_update)
step = "Getting vnfr from database"
db_vnfr = self.db.get_one("vnfrs", {"member-vnf-index-ref": vnf_index, "nsr-id-ref": nsr_id})
step = "Getting vnfd from database"
db_vnfd = self.db.get_one("vnfds", {"_id": db_vnfr["vnfd-id"]})
base_folder = db_vnfd["_admin"]["storage"]
step = "Getting scaling-group-descriptor"
scaling_descriptor = find_in_list(
get_scaling_aspect(
db_vnfd
),
lambda scale_desc: scale_desc["name"] == scaling_group
)
if not scaling_descriptor:
raise LcmException("input parameter 'scaleByStepData':'scaling-group-descriptor':'{}' is not present "
"at vnfd:scaling-group-descriptor".format(scaling_group))
step = "Sending scale order to VIM"
# TODO check if ns is in a proper status
nb_scale_op = 0
if not db_nsr["_admin"].get("scaling-group"):
self.update_db_2("nsrs", nsr_id, {"_admin.scaling-group": [{"name": scaling_group, "nb-scale-op": 0}]})
admin_scale_index = 0
else:
for admin_scale_index, admin_scale_info in enumerate(db_nsr["_admin"]["scaling-group"]):
if admin_scale_info["name"] == scaling_group:
nb_scale_op = admin_scale_info.get("nb-scale-op", 0)
break
else: # not found: use the next index (one past the last element) and add a new entry with this name
admin_scale_index += 1
db_nsr_update["_admin.scaling-group.{}.name".format(admin_scale_index)] = scaling_group
RO_scaling_info = []
VCA_scaling_info = []
vdu_scaling_info = {"scaling_group_name": scaling_group, "vdu": []}
if scaling_type == "SCALE_OUT":
if "aspect-delta-details" not in scaling_descriptor:
raise LcmException(
"Aspect delta details not fount in scaling descriptor {}".format(
scaling_descriptor["name"]
)
)
# check whether max-instance-count would be reached
deltas = scaling_descriptor.get("aspect-delta-details")["deltas"]
vdu_scaling_info["scaling_direction"] = "OUT"
vdu_scaling_info["vdu-create"] = {}
for delta in deltas:
for vdu_delta in delta["vdu-delta"]:
vdud = get_vdu(db_vnfd, vdu_delta["id"])
vdu_index = get_vdur_index(db_vnfr, vdu_delta)
cloud_init_text = self._get_vdu_cloud_init_content(vdud, db_vnfd)
if cloud_init_text:
additional_params = self._get_vdu_additional_params(db_vnfr, vdud["id"]) or {}
cloud_init_list = []
vdu_profile = get_vdu_profile(db_vnfd, vdu_delta["id"])
max_instance_count = 10
if vdu_profile and "max-number-of-instances" in vdu_profile:
max_instance_count = vdu_profile.get("max-number-of-instances", 10)
default_instance_num = get_number_of_instances(db_vnfd, vdud["id"])
nb_scale_op += vdu_delta.get("number-of-instances", 1)
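# the scale-out is rejected if the resulting number of instances (initial instances plus the
# accumulated scale operations) would exceed max-number-of-instances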
if nb_scale_op + default_instance_num > max_instance_count:
raise LcmException(
"reached the limit of {} (max-instance-count) "
"scaling-out operations for the "
"scaling-group-descriptor '{}'".format(nb_scale_op, scaling_group)
)
for x in range(vdu_delta.get("number-of-instances", 1)):
if cloud_init_text:
# TODO Information about its own IP is not available because db_vnfr is not updated.
additional_params["OSM"] = get_osm_params(
db_vnfr,
vdu_delta["id"],
vdu_index + x
)
cloud_init_list.append(
self._parse_cloud_init(
cloud_init_text,
additional_params,
db_vnfd["id"],
vdud["id"]
)
)
VCA_scaling_info.append(
{
"osm_vdu_id": vdu_delta["id"],
"member-vnf-index": vnf_index,
"type": "create",
"vdu_index": vdu_index + x
}
)
RO_scaling_info.append(
{
"osm_vdu_id": vdu_delta["id"],
"member-vnf-index": vnf_index,
"type": "create",
"count": vdu_delta.get("number-of-instances", 1)
}
)
if cloud_init_list:
RO_scaling_info[-1]["cloud_init"] = cloud_init_list
vdu_scaling_info["vdu-create"][vdu_delta["id"]] = vdu_delta.get("number-of-instances", 1)
elif scaling_type == "SCALE_IN":
if "min-instance-count" in scaling_descriptor and scaling_descriptor["min-instance-count"] is not None:
min_instance_count = int(scaling_descriptor["min-instance-count"])
vdu_scaling_info["scaling_direction"] = "IN"
vdu_scaling_info["vdu-delete"] = {}
deltas = scaling_descriptor.get("aspect-delta-details")["deltas"]
for delta in deltas:
for vdu_delta in delta["vdu-delta"]:
vdu_index = get_vdur_index(db_vnfr, vdu_delta)
min_instance_count = 0
vdu_profile = get_vdu_profile(db_vnfd, vdu_delta["id"])
if vdu_profile and "min-number-of-instances" in vdu_profile:
min_instance_count = vdu_profile["min-number-of-instances"]
default_instance_num = get_number_of_instances(db_vnfd, vdu_delta["id"])
nb_scale_op -= vdu_delta.get("number-of-instances", 1)
if nb_scale_op + default_instance_num < min_instance_count:
raise LcmException(
"reached the limit of {} (min-instance-count) scaling-in operations for the "
"scaling-group-descriptor '{}'".format(nb_scale_op, scaling_group)
)
RO_scaling_info.append({"osm_vdu_id": vdu_delta["id"], "member-vnf-index": vnf_index,
"type": "delete", "count": vdu_delta.get("number-of-instances", 1),
"vdu_index": vdu_index - 1})
for x in range(vdu_delta.get("number-of-instances", 1)):
VCA_scaling_info.append(
{
"osm_vdu_id": vdu_delta["id"],
"member-vnf-index": vnf_index,
"type": "delete",
"vdu_index": vdu_index - 1 - x
}
)
vdu_scaling_info["vdu-delete"][vdu_delta["id"]] = vdu_delta.get("number-of-instances", 1)
# update vdu_scaling_info with the IP addresses of the VDUs to be deleted
vdu_delete = copy(vdu_scaling_info.get("vdu-delete"))
if vdu_scaling_info["scaling_direction"] == "IN":
for vdur in reversed(db_vnfr["vdur"]):
if vdu_delete.get(vdur["vdu-id-ref"]):
vdu_delete[vdur["vdu-id-ref"]] -= 1
vdu_scaling_info["vdu"].append({
"name": vdur.get("name") or vdur.get("vdu-name"),
"vdu_id": vdur["vdu-id-ref"],
"interface": []
})
for interface in vdur["interfaces"]:
vdu_scaling_info["vdu"][-1]["interface"].append({
"name": interface["name"],
"ip_address": interface["ip-address"],
"mac_address": interface.get("mac-address"),
})
# vdu_delete = vdu_scaling_info.pop("vdu-delete")
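# At this point vdu_scaling_info holds, for a scale-in, something like (illustrative values):
#   {"scaling_group_name": "...", "scaling_direction": "IN", "vdu-delete": {"<vdu-id>": 1},
#    "vdu": [{"name": ..., "vdu_id": ..., "interface": [{"name": ..., "ip_address": ...}]}]}
# It is passed to the pre/post-scale config primitives as VDU_SCALE_INFO.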
# PRE-SCALE BEGIN
step = "Executing pre-scale vnf-config-primitive"
if scaling_descriptor.get("scaling-config-action"):
for scaling_config_action in scaling_descriptor["scaling-config-action"]:
if (scaling_config_action.get("trigger") == "pre-scale-in" and scaling_type == "SCALE_IN") \
or (scaling_config_action.get("trigger") == "pre-scale-out" and scaling_type == "SCALE_OUT"):
vnf_config_primitive = scaling_config_action["vnf-config-primitive-name-ref"]
step = db_nslcmop_update["detailed-status"] = \
"executing pre-scale scaling-config-action '{}'".format(vnf_config_primitive)
# look for primitive
for config_primitive in (get_configuration(
db_vnfd, db_vnfd["id"]
) or {}).get("config-primitive", ()):
if config_primitive["name"] == vnf_config_primitive:
break
else:
raise LcmException(
"Invalid vnfd descriptor at scaling-group-descriptor[name='{}']:scaling-config-action"
"[vnf-config-primitive-name-ref='{}'] does not match any vnf-configuration:config-"
"primitive".format(scaling_group, vnf_config_primitive))
vnfr_params = {"VDU_SCALE_INFO": vdu_scaling_info}
if db_vnfr.get("additionalParamsForVnf"):
vnfr_params.update(db_vnfr["additionalParamsForVnf"])
scale_process = "VCA"
db_nsr_update["config-status"] = "configuring pre-scaling"
primitive_params = self._map_primitive_params(config_primitive, {}, vnfr_params)
# Pre-scale retry check: Check if this sub-operation has been executed before
op_index = self._check_or_add_scale_suboperation(
db_nslcmop, nslcmop_id, vnf_index, vnf_config_primitive, primitive_params, 'PRE-SCALE')
if op_index == self.SUBOPERATION_STATUS_SKIP:
# Skip sub-operation
result = 'COMPLETED'
result_detail = 'Done'
self.logger.debug(logging_text +
"vnf_config_primitive={} Skipped sub-operation, result {} {}".format(
vnf_config_primitive, result, result_detail))
else:
if op_index == self.SUBOPERATION_STATUS_NEW:
# New sub-operation: Get index of this sub-operation
op_index = len(db_nslcmop.get('_admin', {}).get('operations')) - 1
self.logger.debug(logging_text + "vnf_config_primitive={} New sub-operation".
format(vnf_config_primitive))
else:
# retry: Get registered params for this existing sub-operation
op = db_nslcmop.get('_admin', {}).get('operations', [])[op_index]
vnf_index = op.get('member_vnf_index')
vnf_config_primitive = op.get('primitive')
primitive_params = op.get('primitive_params')
self.logger.debug(logging_text + "vnf_config_primitive={} Sub-operation retry".
format(vnf_config_primitive))
# Execute the primitive, either with new (first-time) or registered (retry) args
ee_descriptor_id = config_primitive.get("execution-environment-ref")
primitive_name = config_primitive.get("execution-environment-primitive",
vnf_config_primitive)
ee_id, vca_type = self._look_for_deployed_vca(nsr_deployed["VCA"],
member_vnf_index=vnf_index,
vdu_id=None,
vdu_count_index=None,
ee_descriptor_id=ee_descriptor_id)
result, result_detail = await self._ns_execute_primitive(
ee_id, primitive_name, primitive_params, vca_type=vca_type)
self.logger.debug(logging_text + "vnf_config_primitive={} Done with result {} {}".format(
vnf_config_primitive, result, result_detail))
# Update operationState = COMPLETED | FAILED
self._update_suboperation_status(
db_nslcmop, op_index, result, result_detail)
if result == "FAILED":
raise LcmException(result_detail)
db_nsr_update["config-status"] = old_config_status
scale_process = None
# PRE-SCALE END
db_nsr_update["_admin.scaling-group.{}.nb-scale-op".format(admin_scale_index)] = nb_scale_op
db_nsr_update["_admin.scaling-group.{}.time".format(admin_scale_index)] = time()
# SCALE-IN VCA - BEGIN
if VCA_scaling_info:
step = db_nslcmop_update["detailed-status"] = \
"Deleting the execution environments"
scale_process = "VCA"
for vdu_info in VCA_scaling_info:
if vdu_info["type"] == "delete":
member_vnf_index = str(vdu_info["member-vnf-index"])
self.logger.debug(logging_text + "vdu info: {}".format(vdu_info))
vdu_id = vdu_info["osm_vdu_id"]
vdu_index = int(vdu_info["vdu_index"])
stage[1] = "Scaling member_vnf_index={}, vdu_id={}, vdu_index={} ".format(
member_vnf_index, vdu_id, vdu_index)
stage[2] = step = "Scaling in VCA"
self._write_op_status(
op_id=nslcmop_id,
stage=stage
)
vca_update = db_nsr["_admin"]["deployed"]["VCA"]
config_update = db_nsr["configurationStatus"]
for vca_index, vca in enumerate(vca_update):
if vca and vca.get("ee_id") and vca["member-vnf-index"] == member_vnf_index and \
vca["vdu_count_index"] == vdu_index:
if vca.get("vdu_id"):
config_descriptor = get_configuration(db_vnfd, vca.get("vdu_id"))
elif vca.get("kdu_name"):
config_descriptor = get_configuration(db_vnfd, vca.get("kdu_name"))
else:
config_descriptor = get_configuration(db_vnfd, db_vnfd["id"])
operation_params = db_nslcmop.get("operationParams") or {}
exec_terminate_primitives = (not operation_params.get("skip_terminate_primitives") and
vca.get("needed_terminate"))
task = asyncio.ensure_future(asyncio.wait_for(
self.destroy_N2VC(logging_text, db_nslcmop, vca, config_descriptor,
vca_index, destroy_ee=True,
exec_primitives=exec_terminate_primitives,
scaling_in=True), timeout=self.timeout_charm_delete))
# wait before next removal
await asyncio.sleep(30)
tasks_dict_info[task] = "Terminating VCA {}".format(vca.get("ee_id"))
del vca_update[vca_index]
del config_update[vca_index]
# wait for pending tasks of terminate primitives
if tasks_dict_info:
self.logger.debug(logging_text +
'Waiting for tasks {}'.format(list(tasks_dict_info.keys())))
error_list = await self._wait_for_tasks(logging_text, tasks_dict_info,
min(self.timeout_charm_delete,
self.timeout_ns_terminate),
stage, nslcmop_id)
tasks_dict_info.clear()
if error_list:
raise LcmException("; ".join(error_list))
db_vca_and_config_update = {
"_admin.deployed.VCA": vca_update,
"configurationStatus": config_update
}
self.update_db_2("nsrs", db_nsr["_id"], db_vca_and_config_update)
scale_process = None
# SCALE-IN VCA - END
# SCALE RO - BEGIN
if RO_scaling_info:
scale_process = "RO"
if self.ro_config.get("ng"):
await self._scale_ng_ro(logging_text, db_nsr, db_nslcmop, db_vnfr, vdu_scaling_info, stage)
vdu_scaling_info.pop("vdu-create", None)
vdu_scaling_info.pop("vdu-delete", None)
scale_process = None
if db_nsr_update:
self.update_db_2("nsrs", nsr_id, db_nsr_update)
# SCALE RO - END
# SCALE-UP VCA - BEGIN
if VCA_scaling_info:
step = db_nslcmop_update["detailed-status"] = \
"Creating new execution environments"
scale_process = "VCA"
for vdu_info in VCA_scaling_info:
if vdu_info["type"] == "create":
member_vnf_index = str(vdu_info["member-vnf-index"])
self.logger.debug(logging_text + "vdu info: {}".format(vdu_info))
vnfd_id = db_vnfr["vnfd-ref"]
vdu_index = int(vdu_info["vdu_index"])
deploy_params = {"OSM": get_osm_params(db_vnfr)}
if db_vnfr.get("additionalParamsForVnf"):
deploy_params.update(parse_yaml_strings(db_vnfr["additionalParamsForVnf"].copy()))
descriptor_config = get_configuration(db_vnfd, db_vnfd["id"])
if descriptor_config:
vdu_id = None
vdu_name = None
kdu_name = None
self._deploy_n2vc(
logging_text=logging_text + "member_vnf_index={} ".format(member_vnf_index),
db_nsr=db_nsr,
db_vnfr=db_vnfr,
nslcmop_id=nslcmop_id,
nsr_id=nsr_id,
nsi_id=nsi_id,
vnfd_id=vnfd_id,
vdu_id=vdu_id,
kdu_name=kdu_name,
member_vnf_index=member_vnf_index,
vdu_index=vdu_index,
vdu_name=vdu_name,
deploy_params=deploy_params,
descriptor_config=descriptor_config,
base_folder=base_folder,
task_instantiation_info=tasks_dict_info,
stage=stage
)
vdu_id = vdu_info["osm_vdu_id"]
vdur = find_in_list(db_vnfr["vdur"], lambda vdu: vdu["vdu-id-ref"] == vdu_id)
descriptor_config = get_configuration(db_vnfd, vdu_id)
if vdur.get("additionalParams"):
deploy_params_vdu = parse_yaml_strings(vdur["additionalParams"])
else:
deploy_params_vdu = deploy_params
deploy_params_vdu["OSM"] = get_osm_params(db_vnfr, vdu_id, vdu_count_index=vdu_index)
if descriptor_config:
vdu_name = None
kdu_name = None
stage[1] = "Scaling member_vnf_index={}, vdu_id={}, vdu_index={} ".format(
member_vnf_index, vdu_id, vdu_index)
stage[2] = step = "Scaling out VCA"
self._write_op_status(
op_id=nslcmop_id,
stage=stage
)
self._deploy_n2vc(
logging_text=logging_text + "member_vnf_index={}, vdu_id={}, vdu_index={} ".format(
member_vnf_index, vdu_id, vdu_index),
db_nsr=db_nsr,
db_vnfr=db_vnfr,
nslcmop_id=nslcmop_id,
nsr_id=nsr_id,
nsi_id=nsi_id,
vnfd_id=vnfd_id,
vdu_id=vdu_id,
kdu_name=kdu_name,
member_vnf_index=member_vnf_index,
vdu_index=vdu_index,
vdu_name=vdu_name,
deploy_params=deploy_params_vdu,
descriptor_config=descriptor_config,
base_folder=base_folder,
task_instantiation_info=tasks_dict_info,
stage=stage
)
# TODO: scaling for kdu is not implemented yet.
kdu_name = vdu_info["osm_vdu_id"]
descriptor_config = get_configuration(db_vnfd, kdu_name)
if descriptor_config:
vdu_id = None
vdu_name = None
kdur = next(x for x in db_vnfr["kdur"] if x["kdu-name"] == kdu_name)
deploy_params_kdu = {"OSM": get_osm_params(db_vnfr)}
if kdur.get("additionalParams"):
deploy_params_kdu = parse_yaml_strings(kdur["additionalParams"])
self._deploy_n2vc(
logging_text=logging_text,
db_nsr=db_nsr,
db_vnfr=db_vnfr,
nslcmop_id=nslcmop_id,
nsr_id=nsr_id,
nsi_id=nsi_id,
vnfd_id=vnfd_id,
vdu_id=vdu_id,
kdu_name=kdu_name,
member_vnf_index=member_vnf_index,
vdu_index=vdu_index,
vdu_name=vdu_name,
deploy_params=deploy_params_kdu,
descriptor_config=descriptor_config,
base_folder=base_folder,
task_instantiation_info=tasks_dict_info,
stage=stage
)
# SCALE-UP VCA - END
scale_process = None
# POST-SCALE BEGIN
# execute primitive service POST-SCALING
step = "Executing post-scale vnf-config-primitive"
if scaling_descriptor.get("scaling-config-action"):
for scaling_config_action in scaling_descriptor["scaling-config-action"]:
if (scaling_config_action.get("trigger") == "post-scale-in" and scaling_type == "SCALE_IN") \
or (scaling_config_action.get("trigger") == "post-scale-out" and scaling_type == "SCALE_OUT"):
vnf_config_primitive = scaling_config_action["vnf-config-primitive-name-ref"]
step = db_nslcmop_update["detailed-status"] = \
"executing post-scale scaling-config-action '{}'".format(vnf_config_primitive)
vnfr_params = {"VDU_SCALE_INFO": vdu_scaling_info}
if db_vnfr.get("additionalParamsForVnf"):
vnfr_params.update(db_vnfr["additionalParamsForVnf"])
# look for primitive
for config_primitive in (
get_configuration(db_vnfd, db_vnfd["id"]) or {}
).get("config-primitive", ()):
if config_primitive["name"] == vnf_config_primitive:
break
else:
raise LcmException(
"Invalid vnfd descriptor at scaling-group-descriptor[name='{}']:scaling-config-"
"action[vnf-config-primitive-name-ref='{}'] does not match any vnf-configuration:"
"config-primitive".format(scaling_group, vnf_config_primitive))
scale_process = "VCA"
db_nsr_update["config-status"] = "configuring post-scaling"
primitive_params = self._map_primitive_params(config_primitive, {}, vnfr_params)
# Post-scale retry check: Check if this sub-operation has been executed before
op_index = self._check_or_add_scale_suboperation(
db_nslcmop, nslcmop_id, vnf_index, vnf_config_primitive, primitive_params, 'POST-SCALE')
if op_index == self.SUBOPERATION_STATUS_SKIP:
# Skip sub-operation
result = 'COMPLETED'
result_detail = 'Done'
self.logger.debug(logging_text +
"vnf_config_primitive={} Skipped sub-operation, result {} {}".
format(vnf_config_primitive, result, result_detail))
else:
if op_index == self.SUBOPERATION_STATUS_NEW:
# New sub-operation: Get index of this sub-operation
op_index = len(db_nslcmop.get('_admin', {}).get('operations')) - 1
self.logger.debug(logging_text + "vnf_config_primitive={} New sub-operation".
format(vnf_config_primitive))
else:
# retry: Get registered params for this existing sub-operation
op = db_nslcmop.get('_admin', {}).get('operations', [])[op_index]
vnf_index = op.get('member_vnf_index')
vnf_config_primitive = op.get('primitive')
primitive_params = op.get('primitive_params')
self.logger.debug(logging_text + "vnf_config_primitive={} Sub-operation retry".
format(vnf_config_primitive))
# Execute the primitive, either with new (first-time) or registered (retry) args
ee_descriptor_id = config_primitive.get("execution-environment-ref")
primitive_name = config_primitive.get("execution-environment-primitive",
vnf_config_primitive)
ee_id, vca_type = self._look_for_deployed_vca(nsr_deployed["VCA"],
member_vnf_index=vnf_index,
vdu_id=None,
vdu_count_index=None,
ee_descriptor_id=ee_descriptor_id)
result, result_detail = await self._ns_execute_primitive(
ee_id, primitive_name, primitive_params, vca_type=vca_type)
self.logger.debug(logging_text + "vnf_config_primitive={} Done with result {} {}".format(
vnf_config_primitive, result, result_detail))
# Update operationState = COMPLETED | FAILED
self._update_suboperation_status(
db_nslcmop, op_index, result, result_detail)
if result == "FAILED":
raise LcmException(result_detail)
db_nsr_update["config-status"] = old_config_status
scale_process = None
# POST-SCALE END
db_nsr_update["detailed-status"] = "" # "scaled {} {}".format(scaling_group, scaling_type)
db_nsr_update["operational-status"] = "running" if old_operational_status == "failed" \
else old_operational_status
db_nsr_update["config-status"] = old_config_status
return
except (ROclient.ROClientException, DbException, LcmException, NgRoException) as e:
self.logger.error(logging_text + "Exit Exception {}".format(e))
exc = e
except asyncio.CancelledError:
self.logger.error(logging_text + "Cancelled Exception while '{}'".format(step))
exc = "Operation was cancelled"
except Exception as e:
exc = traceback.format_exc()
self.logger.critical(logging_text + "Exit Exception {} {}".format(type(e).__name__, e), exc_info=True)
finally:
self._write_ns_status(nsr_id=nsr_id, ns_state=None, current_operation="IDLE", current_operation_id=None)
if tasks_dict_info:
stage[1] = "Waiting for instantiate pending tasks."
self.logger.debug(logging_text + stage[1])
exc = await self._wait_for_tasks(logging_text, tasks_dict_info, self.timeout_ns_deploy,
stage, nslcmop_id, nsr_id=nsr_id)
if exc:
db_nslcmop_update["detailed-status"] = error_description_nslcmop = "FAILED {}: {}".format(step, exc)
nslcmop_operation_state = "FAILED"
if db_nsr:
db_nsr_update["operational-status"] = old_operational_status
db_nsr_update["config-status"] = old_config_status
db_nsr_update["detailed-status"] = ""
if scale_process:
if "VCA" in scale_process:
db_nsr_update["config-status"] = "failed"
if "RO" in scale_process:
db_nsr_update["operational-status"] = "failed"
db_nsr_update["detailed-status"] = "FAILED scaling nslcmop={} {}: {}".format(nslcmop_id, step,
exc)
else:
error_description_nslcmop = None
nslcmop_operation_state = "COMPLETED"
db_nslcmop_update["detailed-status"] = "Done"
self._write_op_status(op_id=nslcmop_id, stage="", error_message=error_description_nslcmop,
operation_state=nslcmop_operation_state, other_update=db_nslcmop_update)
if db_nsr:
self._write_ns_status(nsr_id=nsr_id, ns_state=None, current_operation="IDLE",
current_operation_id=None, other_update=db_nsr_update)
if nslcmop_operation_state:
try:
msg = {"nsr_id": nsr_id, "nslcmop_id": nslcmop_id, "operationState": nslcmop_operation_state}
await self.msg.aiowrite("ns", "scaled", msg, loop=self.loop)
except Exception as e:
self.logger.error(logging_text + "kafka_write notification Exception {}".format(e))
self.logger.debug(logging_text + "Exit")
self.lcm_tasks.remove("ns", nsr_id, nslcmop_id, "ns_scale")
async def _scale_ng_ro(self, logging_text, db_nsr, db_nslcmop, db_vnfr, vdu_scaling_info, stage):
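"""
Send the scaling order to the next generation RO: the VDUs to create/delete are first marked in the vnfr
(scale_vnfr with mark_delete=True), _instantiate_ng_ro is reused to apply the change at the VIM, and the
vnfr is finally updated for the deleted VDUs.
"""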
nsr_id = db_nslcmop["nsInstanceId"]
db_nsd = self.db.get_one("nsds", {"_id": db_nsr["nsd-id"]})
db_vnfrs = {}
# read from db: vnfd's for every vnf
db_vnfds = []
# for each vnf in ns, read vnfd
for vnfr in self.db.get_list("vnfrs", {"nsr-id-ref": nsr_id}):
db_vnfrs[vnfr["member-vnf-index-ref"]] = vnfr
vnfd_id = vnfr["vnfd-id"] # vnfd uuid for this vnf
# if we don't have this vnfd yet, read it from db
if not find_in_list(db_vnfds, lambda a_vnfd: a_vnfd["id"] == vnfd_id):
# read from db
vnfd = self.db.get_one("vnfds", {"_id": vnfd_id})
db_vnfds.append(vnfd)
n2vc_key = self.n2vc.get_public_key()
n2vc_key_list = [n2vc_key]
self.scale_vnfr(db_vnfr, vdu_scaling_info.get("vdu-create"), vdu_scaling_info.get("vdu-delete"),
mark_delete=True)
# db_vnfr has been updated, update db_vnfrs to use it
db_vnfrs[db_vnfr["member-vnf-index-ref"]] = db_vnfr
await self._instantiate_ng_ro(logging_text, nsr_id, db_nsd, db_nsr, db_nslcmop, db_vnfrs,
db_vnfds, n2vc_key_list, stage=stage, start_deploy=time(),
timeout_ns_deploy=self.timeout_ns_deploy)
if vdu_scaling_info.get("vdu-delete"):
self.scale_vnfr(db_vnfr, None, vdu_scaling_info["vdu-delete"], mark_delete=False)
async def add_prometheus_metrics(self, ee_id, artifact_path, ee_config_descriptor, vnfr_id, nsr_id, target_ip):
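"""
If the package artifacts contain a 'prometheus*.j2' job template, render it with the execution environment
data and register the resulting job(s) in Prometheus.
:return: list of created job names, or None if there is nothing to do or the update fails
"""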
if not self.prometheus:
return
# look for a file called 'prometheus*.j2'; if none exists, there is nothing to do
artifact_content = self.fs.dir_ls(artifact_path)
job_file = next((f for f in artifact_content if f.startswith("prometheus") and f.endswith(".j2")), None)
if not job_file:
return
with self.fs.file_open((artifact_path, job_file), "r") as f:
job_data = f.read()
# TODO get_service
_, _, service = ee_id.partition(".") # remove prefix "namespace."
host_name = "{}-{}".format(service, ee_config_descriptor["metric-service"])
host_port = "80"
vnfr_id = vnfr_id.replace("-", "")
variables = {
"JOB_NAME": vnfr_id,
"TARGET_IP": target_ip,
"EXPORTER_POD_IP": host_name,
"EXPORTER_POD_PORT": host_port,
}
job_list = self.prometheus.parse_job(job_data, variables)
# ensure job_name uses the vnfr_id and add nsr_id as metadata
for job in job_list:
if not isinstance(job.get("job_name"), str) or vnfr_id not in job["job_name"]:
job["job_name"] = vnfr_id + "_" + str(randint(1, 10000))
job["nsr_id"] = nsr_id
job_dict = {jl["job_name"]: jl for jl in job_list}
if await self.prometheus.update(job_dict):
return list(job_dict.keys())
def get_vca_cloud_and_credentials(self, vim_account_id: str) -> (str, str):
"""
Get VCA Cloud and VCA Cloud Credentials for the VIM account
:param: vim_account_id: VIM Account ID
:return: (cloud_name, cloud_credential)
"""
config = VimAccountDB.get_vim_account_with_id(vim_account_id).get("config", {})
return config.get("vca_cloud"), config.get("vca_cloud_credential")
def get_vca_k8s_cloud_and_credentials(self, vim_account_id: str) -> (str, str):
"""
Get VCA K8s Cloud and VCA K8s Cloud Credentials for the VIM account
:param: vim_account_id: VIM Account ID
:return: (cloud_name, cloud_credential)
"""
config = VimAccountDB.get_vim_account_with_id(vim_account_id).get("config", {})
return config.get("vca_k8s_cloud"), config.get("vca_k8s_cloud_credential")