)
from osm_lcm import ROclient
+from osm_lcm.data_utils.lcm_config import LcmCfg
from osm_lcm.data_utils.nsr import (
get_deployed_kdu,
get_deployed_vca,
check_juju_bundle_existence,
get_charm_artifact_path,
get_ee_id_parts,
+ vld_to_ro_ip_profile,
)
from osm_lcm.data_utils.nsd import (
get_ns_configuration_relation_list,
from time import time
from uuid import uuid4
-from random import randint
+from random import SystemRandom
__author__ = "Alfonso Tierno <alfonso.tiernosepulveda@telefonica.com>"
class NsLcm(LcmBase):
- timeout_scale_on_error = (
- 5 * 60
- ) # Time for charm from first time at blocked,error status to mark as failed
- timeout_scale_on_error_outer_factor = 1.05 # Factor in relation to timeout_scale_on_error related to the timeout to be applied within the asyncio.wait_for coroutine
- timeout_ns_deploy = 2 * 3600 # default global timeout for deployment a ns
- timeout_ns_terminate = 1800 # default global timeout for un deployment a ns
- timeout_ns_heal = 1800 # default global timeout for un deployment a ns
- timeout_charm_delete = 10 * 60
- timeout_primitive = 30 * 60 # Timeout for primitive execution
- timeout_primitive_outer_factor = 1.05 # Factor in relation to timeout_primitive related to the timeout to be applied within the asyncio.wait_for coroutine
- timeout_ns_update = 30 * 60 # timeout for ns update
- timeout_progress_primitive = (
- 10 * 60
- ) # timeout for some progress in a primitive execution
- timeout_migrate = 1800 # default global timeout for migrating vnfs
- timeout_operate = 1800 # default global timeout for migrating vnfs
- timeout_verticalscale = 1800 # default global timeout for Vertical Sclaing
SUBOPERATION_STATUS_NOT_FOUND = -1
SUBOPERATION_STATUS_NEW = -2
SUBOPERATION_STATUS_SKIP = -3
+ EE_TLS_NAME = "ee-tls"
task_name_deploy_vca = "Deploying VCA"
- def __init__(self, msg, lcm_tasks, config, loop):
+ def __init__(self, msg, lcm_tasks, config: LcmCfg):
"""
Init, Connect to database, filesystem storage, and messaging
:param config: two level dictionary with configuration. Top level should contain 'database', 'storage',
self.db = Database().instance.db
self.fs = Filesystem().instance.fs
- self.loop = loop
self.lcm_tasks = lcm_tasks
- self.timeout = config["timeout"]
- self.ro_config = config["ro_config"]
- self.ng_ro = config["ro_config"].get("ng")
- self.vca_config = config["VCA"].copy()
+ self.timeout = config.timeout
+ self.ro_config = config.RO
+ self.vca_config = config.VCA
# create N2VC connector
self.n2vc = N2VCJujuConnector(
log=self.logger,
- loop=self.loop,
on_update_db=self._on_update_n2vc_db,
fs=self.fs,
db=self.db,
self.conn_helm_ee = LCMHelmConn(
log=self.logger,
- loop=self.loop,
vca_config=self.vca_config,
on_update_db=self._on_update_n2vc_db,
)
self.k8sclusterhelm2 = K8sHelmConnector(
- kubectl_command=self.vca_config.get("kubectlpath"),
- helm_command=self.vca_config.get("helmpath"),
+ kubectl_command=self.vca_config.kubectlpath,
+ helm_command=self.vca_config.helmpath,
log=self.logger,
on_update_db=None,
fs=self.fs,
)
self.k8sclusterhelm3 = K8sHelm3Connector(
- kubectl_command=self.vca_config.get("kubectlpath"),
- helm_command=self.vca_config.get("helm3path"),
+ kubectl_command=self.vca_config.kubectlpath,
+ helm_command=self.vca_config.helm3path,
fs=self.fs,
log=self.logger,
db=self.db,
)
self.k8sclusterjuju = K8sJujuConnector(
- kubectl_command=self.vca_config.get("kubectlpath"),
- juju_command=self.vca_config.get("jujupath"),
+ kubectl_command=self.vca_config.kubectlpath,
+ juju_command=self.vca_config.jujupath,
log=self.logger,
- loop=self.loop,
on_update_db=self._on_update_k8s_db,
fs=self.fs,
db=self.db,
}
# create RO client
- self.RO = NgRoClient(self.loop, **self.ro_config)
+ self.RO = NgRoClient(**self.ro_config.to_dict())
self.op_status_map = {
"instantiation": self.RO.status,
return None
def _on_update_ro_db(self, nsrs_id, ro_descriptor):
-
# self.logger.debug('_on_update_ro_db(nsrs_id={}'.format(nsrs_id))
try:
)
async def _on_update_n2vc_db(self, table, filter, path, updated_data, vca_id=None):
-
# remove last dot from path (if exists)
if path.endswith("."):
path = path[:-1]
# self.logger.debug('_on_update_n2vc_db(table={}, filter={}, path={}, updated_data={}'
# .format(table, filter, path, updated_data))
try:
-
nsr_id = filter.get("_id")
# read ns record from database
return wim_account
def scale_vnfr(self, db_vnfr, vdu_create=None, vdu_delete=None, mark_delete=False):
-
db_vdu_push_list = []
template_vdur = []
db_update = {"_admin.modified": time()}
start_deploy,
timeout_ns_deploy,
):
-
db_vims = {}
def get_vim_account(vim_account_id):
target_vim, target_vld, vld_params, target_sdn
):
if vld_params.get("ip-profile"):
- target_vld["vim_info"][target_vim]["ip_profile"] = vld_params[
- "ip-profile"
- ]
+ target_vld["vim_info"][target_vim]["ip_profile"] = vld_to_ro_ip_profile(
+ vld_params["ip-profile"]
+ )
if vld_params.get("provider-network"):
target_vld["vim_info"][target_vim]["provider_network"] = vld_params[
"provider-network"
lambda v_vld: v_vld["name"] in (a_vld["name"], a_vld["id"]),
)
if target_vld:
-
if vnf_params.get("vimAccountId") not in a_vld.get(
"vim_info", {}
):
image["vim_info"] = {}
for flavor in target["flavor"]:
flavor["vim_info"] = {}
+ if db_nsr.get("shared-volumes"):
+ target["shared-volumes"] = deepcopy(db_nsr["shared-volumes"])
+ for shared_volumes in target["shared-volumes"]:
+ shared_volumes["vim_info"] = {}
if db_nsr.get("affinity-or-anti-affinity-group"):
target["affinity-or-anti-affinity-group"] = deepcopy(
db_nsr["affinity-or-anti-affinity-group"]
# check if this network needs SDN assist
if vld.get("pci-interfaces"):
db_vim = get_vim_account(ns_params["vimAccountId"])
- sdnc_id = db_vim["config"].get("sdn-controller")
- if sdnc_id:
- sdn_vld = "nsrs:{}:vld.{}".format(nsr_id, vld["id"])
- target_sdn = "sdn:{}".format(sdnc_id)
- target_vld["vim_info"][target_sdn] = {
- "sdn": True,
- "target_vim": target_vim,
- "vlds": [sdn_vld],
- "type": vld.get("type"),
- }
+ if vim_config := db_vim.get("config"):
+ if sdnc_id := vim_config.get("sdn-controller"):
+ sdn_vld = "nsrs:{}:vld.{}".format(nsr_id, vld["id"])
+ target_sdn = "sdn:{}".format(sdnc_id)
+ target_vld["vim_info"][target_sdn] = {
+ "sdn": True,
+ "target_vim": target_vim,
+ "vlds": [sdn_vld],
+ "type": vld.get("type"),
+ }
nsd_vnf_profiles = get_vnf_profiles(nsd)
for nsd_vnf_profile in nsd_vnf_profiles:
and nsd_vlp.get("virtual-link-protocol-data")
and nsd_vlp["virtual-link-protocol-data"].get("l3-protocol-data")
):
- ip_profile_source_data = nsd_vlp["virtual-link-protocol-data"][
+ vld_params["ip-profile"] = nsd_vlp["virtual-link-protocol-data"][
"l3-protocol-data"
]
- ip_profile_dest_data = {}
- if "ip-version" in ip_profile_source_data:
- ip_profile_dest_data["ip-version"] = ip_profile_source_data[
- "ip-version"
- ]
- if "cidr" in ip_profile_source_data:
- ip_profile_dest_data["subnet-address"] = ip_profile_source_data[
- "cidr"
- ]
- if "gateway-ip" in ip_profile_source_data:
- ip_profile_dest_data["gateway-address"] = ip_profile_source_data[
- "gateway-ip"
- ]
- if "dhcp-enabled" in ip_profile_source_data:
- ip_profile_dest_data["dhcp-params"] = {
- "enabled": ip_profile_source_data["dhcp-enabled"]
- }
- vld_params["ip-profile"] = ip_profile_dest_data
# update vld_params with instantiation params
vld_instantiation_params = find_in_list(
and vnfd_vlp.get("virtual-link-protocol-data")
and vnfd_vlp["virtual-link-protocol-data"].get("l3-protocol-data")
):
- ip_profile_source_data = vnfd_vlp["virtual-link-protocol-data"][
+ vld_params["ip-profile"] = vnfd_vlp["virtual-link-protocol-data"][
"l3-protocol-data"
]
- ip_profile_dest_data = {}
- if "ip-version" in ip_profile_source_data:
- ip_profile_dest_data["ip-version"] = ip_profile_source_data[
- "ip-version"
- ]
- if "cidr" in ip_profile_source_data:
- ip_profile_dest_data["subnet-address"] = ip_profile_source_data[
- "cidr"
- ]
- if "gateway-ip" in ip_profile_source_data:
- ip_profile_dest_data[
- "gateway-address"
- ] = ip_profile_source_data["gateway-ip"]
- if "dhcp-enabled" in ip_profile_source_data:
- ip_profile_dest_data["dhcp-params"] = {
- "enabled": ip_profile_source_data["dhcp-enabled"]
- }
-
- vld_params["ip-profile"] = ip_profile_dest_data
# update vld_params with instantiation params
if vnf_params:
vld_instantiation_params = find_in_list(
if target_vim not in ns_ags["vim_info"]:
ns_ags["vim_info"][target_vim] = {}
+ # shared-volumes
+ if vdur.get("shared-volumes-id"):
+ for sv_id in vdur["shared-volumes-id"]:
+ ns_sv = find_in_list(
+ target["shared-volumes"], lambda sv: sv_id in sv["id"]
+ )
+ if ns_sv:
+ ns_sv["vim_info"][target_vim] = {}
+
vdur["vim_info"] = {target_vim: {}}
# instantiation parameters
if vnf_params:
vdu_instantiation_params, vdud
)
vdur["additionalParams"]["OSM"]["vdu_volumes"] = vdu_volumes
+ vdur["additionalParams"]["OSM"][
+ "vim_flavor_id"
+ ] = vdu_instantiation_params.get("vim-flavor-id")
vdur_list.append(vdur)
target_vnf["vdur"] = vdur_list
target["vnf"].append(target_vnf)
db_nsr_update["detailed-status"] = " ".join(stage)
self.update_db_2("nsrs", nsr_id, db_nsr_update)
self._write_op_status(nslcmop_id, stage)
- await asyncio.sleep(15, loop=self.loop)
+ await asyncio.sleep(15)
else: # timeout_ns_deploy
raise NgRoException("Timeout waiting ns to deploy")
}
desc = await self.RO.deploy(nsr_id, target)
action_id = desc["action_id"]
- db_nsr_update["_admin.deployed.RO.nsr_delete_action_id"] = action_id
db_nsr_update["_admin.deployed.RO.nsr_status"] = "DELETING"
self.logger.debug(
logging_text
stage,
operation="termination",
)
-
- db_nsr_update["_admin.deployed.RO.nsr_delete_action_id"] = None
db_nsr_update["_admin.deployed.RO.nsr_status"] = "DELETED"
# delete all nsr
await self.RO.delete(nsr_id)
- except Exception as e:
- if isinstance(e, NgRoException) and e.http_code == 404: # not found
+ except NgRoException as e:
+ if e.http_code == 404: # not found
db_nsr_update["_admin.deployed.RO.nsr_id"] = None
db_nsr_update["_admin.deployed.RO.nsr_status"] = "DELETED"
- db_nsr_update["_admin.deployed.RO.nsr_delete_action_id"] = None
self.logger.debug(
logging_text + "RO_action_id={} already deleted".format(action_id)
)
- elif isinstance(e, NgRoException) and e.http_code == 409: # conflict
+ elif e.http_code == 409: # conflict
failed_detail.append("delete conflict: {}".format(e))
self.logger.debug(
logging_text
logging_text
+ "RO_action_id={} delete error: {}".format(action_id, e)
)
+ except Exception as e:
+ failed_detail.append("delete error: {}".format(e))
+ self.logger.error(
+ logging_text + "RO_action_id={} delete error: {}".format(action_id, e)
+ )
if failed_detail:
stage[2] = "Error deleting from VIM"
if ns_params and ns_params.get("timeout_ns_deploy"):
timeout_ns_deploy = ns_params["timeout_ns_deploy"]
else:
- timeout_ns_deploy = self.timeout.get(
- "ns_deploy", self.timeout_ns_deploy
- )
+ timeout_ns_deploy = self.timeout.ns_deploy
# Check for and optionally request placement optimization. Database will be updated if placement activated
stage[2] = "Waiting for Placement."
"target KDU={} is in error state".format(kdu_name)
)
- await asyncio.sleep(10, loop=self.loop)
+ await asyncio.sleep(10)
nb_tries += 1
raise LcmException("Timeout waiting KDU={} instantiated".format(kdu_name))
"""
self.logger.debug(logging_text + "Starting wait_vm_up_insert_key_ro")
- ro_nsr_id = None
ip_address = None
- nb_tries = 0
target_vdu_id = None
ro_retries = 0
while True:
-
ro_retries += 1
if ro_retries >= 360: # 1 hour
raise LcmException(
"Not found _admin.deployed.RO.nsr_id for nsr_id: {}".format(nsr_id)
)
- await asyncio.sleep(10, loop=self.loop)
+ await asyncio.sleep(10)
# get ip address
if not target_vdu_id:
self.logger.error(logging_text + "Cannot inject ssh-ky to a PDU")
return ip_address
try:
- ro_vm_id = "{}-{}".format(
- db_vnfr["member-vnf-index-ref"], target_vdu_id
- ) # TODO add vdu_index
- if self.ng_ro:
- target = {
- "action": {
- "action": "inject_ssh_key",
- "key": pub_key,
- "user": user,
- },
- "vnf": [{"_id": vnfr_id, "vdur": [{"id": vdur["id"]}]}],
- }
- desc = await self.RO.deploy(nsr_id, target)
- action_id = desc["action_id"]
- await self._wait_ng_ro(
- nsr_id, action_id, timeout=600, operation="instantiation"
- )
- break
- else:
- # wait until NS is deployed at RO
- if not ro_nsr_id:
- db_nsrs = self.db.get_one("nsrs", {"_id": nsr_id})
- ro_nsr_id = deep_get(
- db_nsrs, ("_admin", "deployed", "RO", "nsr_id")
- )
- if not ro_nsr_id:
- continue
- result_dict = await self.RO.create_action(
- item="ns",
- item_id_name=ro_nsr_id,
- descriptor={
- "add_public_key": pub_key,
- "vms": [ro_vm_id],
- "user": user,
- },
- )
- # result_dict contains the format {VM-id: {vim_result: 200, description: text}}
- if not result_dict or not isinstance(result_dict, dict):
- raise LcmException(
- "Unknown response from RO when injecting key"
- )
- for result in result_dict.values():
- if result.get("vim_result") == 200:
- break
- else:
- raise ROclient.ROClientException(
- "error injecting key: {}".format(
- result.get("description")
- )
- )
- break
+ target = {
+ "action": {
+ "action": "inject_ssh_key",
+ "key": pub_key,
+ "user": user,
+ },
+ "vnf": [{"_id": vnfr_id, "vdur": [{"id": vdur["id"]}]}],
+ }
+ desc = await self.RO.deploy(nsr_id, target)
+ action_id = desc["action_id"]
+ await self._wait_ng_ro(
+ nsr_id, action_id, timeout=600, operation="instantiation"
+ )
+ break
except NgRoException as e:
raise LcmException(
"Reaching max tries injecting key. Error: {}".format(e)
)
- except ROclient.ROClientException as e:
- if not nb_tries:
- self.logger.debug(
- logging_text
- + "error injecting key: {}. Retrying until {} seconds".format(
- e, 20 * 10
- )
- )
- nb_tries += 1
- if nb_tries >= 20:
- raise LcmException(
- "Reaching max tries injecting key. Error: {}".format(e)
- )
else:
break
vdu_id,
kdu_name,
vdu_index,
+ kdu_index,
config_descriptor,
deploy_params,
base_folder,
}
step = ""
try:
-
element_type = "NS"
element_under_configuration = nsr_id
vca_id = self.get_vca_id(db_vnfr, db_nsr)
# create or register execution environment in VCA
if vca_type in ("lxc_proxy_charm", "k8s_proxy_charm", "helm", "helm-v3"):
-
self._write_configuration_status(
nsr_id=nsr_id,
vca_index=vca_index,
ee_id, credentials = await self.vca_map[
vca_type
].create_execution_environment(
- namespace=namespace,
+ namespace=nsr_id,
reuse_ee_id=ee_id,
db_dict=db_dict,
config=osm_config,
)
# add relations for this VCA (wait for other peers related with this VCA)
- await self._add_vca_relations(
+ is_relation_added = await self._add_vca_relations(
logging_text=logging_text,
nsr_id=nsr_id,
vca_type=vca_type,
vca_index=vca_index,
)
+ if not is_relation_added:
+ raise LcmException("Relations could not be added to VCA.")
+
# if SSH access is required, then get execution environment SSH public
# if native charm we have waited already to VM be UP
if vca_type in ("k8s_proxy_charm", "lxc_proxy_charm", "helm", "helm-v3"):
vnfr_id=vnfr_id,
nsr_id=nsr_id,
target_ip=rw_mgmt_ip,
+ element_type=element_type,
+ vnf_member_index=db_vnfr.get("member-vnf-index-ref", ""),
+ vdu_id=vdu_id,
+ vdu_index=vdu_index,
+ kdu_name=kdu_name,
+ kdu_index=kdu_index,
)
if prometheus_jobs:
self.update_db_2(
self._write_configuration_status(
nsr_id=nsr_id, vca_index=vca_index, status="BROKEN"
)
- raise LcmException("{} {}".format(step, e)) from e
+ raise LcmException("{}. {}".format(step, e)) from e
def _write_ns_status(
self,
element_type: str = None,
other_update: dict = None,
):
-
# self.logger.debug('_write_configuration_status(): vca_index={}, status={}'
# .format(vca_index, status))
self.logger.debug(
logging_text + "Invoke and wait for placement optimization"
)
- await self.msg.aiowrite(
- "pla", "get_placement", {"nslcmopId": nslcmop_id}, loop=self.loop
- )
+ await self.msg.aiowrite("pla", "get_placement", {"nslcmopId": nslcmop_id})
db_poll_interval = 5
wait = db_poll_interval * 10
pla_result = None
vnfr["vim-account-id"] = pla_vnf["vimAccountId"]
return modified
+ def _gather_vnfr_healing_alerts(self, vnfr, vnfd):
+ alerts = []
+ nsr_id = vnfr["nsr-id-ref"]
+ df = vnfd.get("df", [{}])[0]
+ # Checking for auto-healing configuration
+ if "healing-aspect" in df:
+ healing_aspects = df["healing-aspect"]
+ for healing in healing_aspects:
+ for healing_policy in healing.get("healing-policy", ()):
+ vdu_id = healing_policy["vdu-id"]
+ vdur = next(
+ (vdur for vdur in vnfr["vdur"] if vdu_id == vdur["vdu-id-ref"]),
+ {},
+ )
+ if not vdur:
+ continue
+ metric_name = "vm_status"
+ vdu_name = vdur.get("name")
+ vnf_member_index = vnfr["member-vnf-index-ref"]
+ uuid = str(uuid4())
+ name = f"healing_{uuid}"
+ action = healing_policy
+ # action_on_recovery = healing.get("action-on-recovery")
+ # cooldown_time = healing.get("cooldown-time")
+ # day1 = healing.get("day1")
+ alert = {
+ "uuid": uuid,
+ "name": name,
+ "metric": metric_name,
+ "tags": {
+ "ns_id": nsr_id,
+ "vnf_member_index": vnf_member_index,
+ "vdu_name": vdu_name,
+ },
+ "alarm_status": "ok",
+ "action_type": "healing",
+ "action": action,
+ }
+ alerts.append(alert)
+ return alerts
+
+ def _gather_vnfr_scaling_alerts(self, vnfr, vnfd):
+ alerts = []
+ nsr_id = vnfr["nsr-id-ref"]
+ df = vnfd.get("df", [{}])[0]
+ # Checking for auto-scaling configuration
+ if "scaling-aspect" in df:
+ rel_operation_types = {
+ "GE": ">=",
+ "LE": "<=",
+ "GT": ">",
+ "LT": "<",
+ "EQ": "==",
+ "NE": "!=",
+ }
+ scaling_aspects = df["scaling-aspect"]
+ all_vnfd_monitoring_params = {}
+ for ivld in vnfd.get("int-virtual-link-desc", ()):
+ for mp in ivld.get("monitoring-parameters", ()):
+ all_vnfd_monitoring_params[mp.get("id")] = mp
+ for vdu in vnfd.get("vdu", ()):
+ for mp in vdu.get("monitoring-parameter", ()):
+ all_vnfd_monitoring_params[mp.get("id")] = mp
+ for df in vnfd.get("df", ()):
+ for mp in df.get("monitoring-parameter", ()):
+ all_vnfd_monitoring_params[mp.get("id")] = mp
+ for scaling_aspect in scaling_aspects:
+ scaling_group_name = scaling_aspect.get("name", "")
+ # Get monitored VDUs
+ all_monitored_vdus = set()
+ for delta in scaling_aspect.get("aspect-delta-details", {}).get(
+ "deltas", ()
+ ):
+ for vdu_delta in delta.get("vdu-delta", ()):
+ all_monitored_vdus.add(vdu_delta.get("id"))
+ monitored_vdurs = list(
+ filter(
+ lambda vdur: vdur["vdu-id-ref"] in all_monitored_vdus,
+ vnfr["vdur"],
+ )
+ )
+ if not monitored_vdurs:
+ self.logger.error(
+ "Scaling criteria is referring to a vnf-monitoring-param that does not contain a reference to a vdu or vnf metric"
+ )
+ continue
+ for scaling_policy in scaling_aspect.get("scaling-policy", ()):
+ if scaling_policy["scaling-type"] != "automatic":
+ continue
+ threshold_time = scaling_policy.get("threshold-time", "1")
+ cooldown_time = scaling_policy.get("cooldown-time", "0")
+ for scaling_criteria in scaling_policy["scaling-criteria"]:
+ monitoring_param_ref = scaling_criteria.get(
+ "vnf-monitoring-param-ref"
+ )
+ vnf_monitoring_param = all_vnfd_monitoring_params[
+ monitoring_param_ref
+ ]
+ for vdur in monitored_vdurs:
+ vdu_id = vdur["vdu-id-ref"]
+ metric_name = vnf_monitoring_param.get("performance-metric")
+ metric_name = f"osm_{metric_name}"
+ vnf_member_index = vnfr["member-vnf-index-ref"]
+ scalein_threshold = scaling_criteria.get(
+ "scale-in-threshold"
+ )
+ scaleout_threshold = scaling_criteria.get(
+ "scale-out-threshold"
+ )
+ # Looking for min/max-number-of-instances
+ instances_min_number = 1
+ instances_max_number = 1
+ vdu_profile = df["vdu-profile"]
+ if vdu_profile:
+ profile = next(
+ item for item in vdu_profile if item["id"] == vdu_id
+ )
+ instances_min_number = profile.get(
+ "min-number-of-instances", 1
+ )
+ instances_max_number = profile.get(
+ "max-number-of-instances", 1
+ )
+
+ if scalein_threshold:
+ uuid = str(uuid4())
+ name = f"scalein_{uuid}"
+ operation = scaling_criteria[
+ "scale-in-relational-operation"
+ ]
+ rel_operator = rel_operation_types.get(operation, "<=")
+ metric_selector = f'{metric_name}{{ns_id="{nsr_id}", vnf_member_index="{vnf_member_index}", vdu_id="{vdu_id}"}}'
+ expression = f"(count ({metric_selector}) > {instances_min_number}) and (avg({metric_selector}) {rel_operator} {scalein_threshold})"
+ labels = {
+ "ns_id": nsr_id,
+ "vnf_member_index": vnf_member_index,
+ "vdu_id": vdu_id,
+ }
+ prom_cfg = {
+ "alert": name,
+ "expr": expression,
+ "for": str(threshold_time) + "m",
+ "labels": labels,
+ }
+ action = scaling_policy
+ action = {
+ "scaling-group": scaling_group_name,
+ "cooldown-time": cooldown_time,
+ }
+ alert = {
+ "uuid": uuid,
+ "name": name,
+ "metric": metric_name,
+ "tags": {
+ "ns_id": nsr_id,
+ "vnf_member_index": vnf_member_index,
+ "vdu_id": vdu_id,
+ },
+ "alarm_status": "ok",
+ "action_type": "scale_in",
+ "action": action,
+ "prometheus_config": prom_cfg,
+ }
+ alerts.append(alert)
+
+ if scaleout_threshold:
+ uuid = str(uuid4())
+ name = f"scaleout_{uuid}"
+ operation = scaling_criteria[
+ "scale-out-relational-operation"
+ ]
+ rel_operator = rel_operation_types.get(operation, "<=")
+ metric_selector = f'{metric_name}{{ns_id="{nsr_id}", vnf_member_index="{vnf_member_index}", vdu_id="{vdu_id}"}}'
+ expression = f"(count ({metric_selector}) < {instances_max_number}) and (avg({metric_selector}) {rel_operator} {scaleout_threshold})"
+ labels = {
+ "ns_id": nsr_id,
+ "vnf_member_index": vnf_member_index,
+ "vdu_id": vdu_id,
+ }
+ prom_cfg = {
+ "alert": name,
+ "expr": expression,
+ "for": str(threshold_time) + "m",
+ "labels": labels,
+ }
+ action = scaling_policy
+ action = {
+ "scaling-group": scaling_group_name,
+ "cooldown-time": cooldown_time,
+ }
+ alert = {
+ "uuid": uuid,
+ "name": name,
+ "metric": metric_name,
+ "tags": {
+ "ns_id": nsr_id,
+ "vnf_member_index": vnf_member_index,
+ "vdu_id": vdu_id,
+ },
+ "alarm_status": "ok",
+ "action_type": "scale_out",
+ "action": action,
+ "prometheus_config": prom_cfg,
+ }
+ alerts.append(alert)
+ return alerts
+
def update_nsrs_with_pla_result(self, params):
try:
nslcmop_id = deep_get(params, ("placement", "nslcmopId"))
# update operation on nslcmops
db_nslcmop_update = {}
+ timeout_ns_deploy = self.timeout.ns_deploy
+
nslcmop_operation_state = None
db_vnfrs = {} # vnf's info indexed by member-index
# n2vc_info = {}
ns_params = db_nslcmop.get("operationParams")
if ns_params and ns_params.get("timeout_ns_deploy"):
timeout_ns_deploy = ns_params["timeout_ns_deploy"]
- else:
- timeout_ns_deploy = self.timeout.get(
- "ns_deploy", self.timeout_ns_deploy
- )
# read from db: ns
stage[1] = "Getting nsr={} from db.".format(nsr_id)
# feature 1429. Add n2vc public key to needed VMs
n2vc_key = self.n2vc.get_public_key()
n2vc_key_list = [n2vc_key]
- if self.vca_config.get("public_key"):
- n2vc_key_list.append(self.vca_config["public_key"])
+ if self.vca_config.public_key:
+ n2vc_key_list.append(self.vca_config.public_key)
stage[1] = "Deploying NS at VIM."
task_ro = asyncio.ensure_future(
# create namespace and certificate if any helm based EE is present in the NS
if check_helm_ee_in_ns(db_vnfds):
- # TODO: create EE namespace
+ await self.vca_map["helm-v3"].setup_ns_namespace(
+ name=nsr_id,
+ )
# create TLS certificates
await self.vca_map["helm-v3"].create_tls_certificate(
- secret_name="ee-tls-{}".format(nsr_id),
+ secret_name=self.EE_TLS_NAME,
dns_prefix="*",
nsr_id=nsr_id,
usage="server auth",
+ namespace=nsr_id,
)
nsi_id = None # TODO put nsi_id when this nsr belongs to a NSI
vdu_index = 0
vdu_name = None
kdu_name = None
+ kdu_index = None
# Get additional parameters
deploy_params = {"OSM": get_osm_params(db_vnfr)}
kdu_name=kdu_name,
member_vnf_index=member_vnf_index,
vdu_index=vdu_index,
+ kdu_index=kdu_index,
vdu_name=vdu_name,
deploy_params=deploy_params,
descriptor_config=descriptor_config,
if descriptor_config:
vdu_name = None
kdu_name = None
+ kdu_index = None
for vdu_index in range(vdud_count):
# TODO vnfr_params["rw_mgmt_ip"] = vdur["ip-address"]
self._deploy_n2vc(
vnfd_id=vnfd_id,
vdu_id=vdu_id,
kdu_name=kdu_name,
+ kdu_index=kdu_index,
member_vnf_index=member_vnf_index,
vdu_index=vdu_index,
vdu_name=vdu_name,
vdu_id = None
vdu_index = 0
vdu_name = None
- kdur = next(
- x for x in db_vnfr["kdur"] if x["kdu-name"] == kdu_name
+ kdu_index, kdur = next(
+ x
+ for x in enumerate(db_vnfr["kdur"])
+ if x[1]["kdu-name"] == kdu_name
)
deploy_params_kdu = {"OSM": get_osm_params(db_vnfr)}
if kdur.get("additionalParams"):
kdu_name=kdu_name,
member_vnf_index=member_vnf_index,
vdu_index=vdu_index,
+ kdu_index=kdu_index,
vdu_name=vdu_name,
deploy_params=deploy_params_kdu,
descriptor_config=descriptor_config,
stage=stage,
)
+ # Check if each vnf has exporter for metric collection if so update prometheus job records
+ if "exporters-endpoints" in vnfd.get("df")[0]:
+ exporter_config = vnfd.get("df")[0].get("exporters-endpoints")
+ self.logger.debug("exporter config :{}".format(exporter_config))
+ artifact_path = "{}/{}/{}".format(
+ base_folder["folder"],
+ base_folder["pkg-dir"],
+ "exporter-endpoint",
+ )
+ ee_id = None
+ ee_config_descriptor = exporter_config
+ vnfr_id = db_vnfr["id"]
+ rw_mgmt_ip = await self.wait_vm_up_insert_key_ro(
+ logging_text,
+ nsr_id,
+ vnfr_id,
+ vdu_id=None,
+ vdu_index=None,
+ user=None,
+ pub_key=None,
+ )
+ self.logger.debug("rw_mgmt_ip:{}".format(rw_mgmt_ip))
+ self.logger.debug("Artifact_path:{}".format(artifact_path))
+ db_vnfr = self.db.get_one("vnfrs", {"_id": vnfr_id})
+ vdu_id_for_prom = None
+ vdu_index_for_prom = None
+ for x in get_iterable(db_vnfr, "vdur"):
+ vdu_id_for_prom = x.get("vdu-id-ref")
+ vdu_index_for_prom = x.get("count-index")
+ prometheus_jobs = await self.extract_prometheus_scrape_jobs(
+ ee_id=ee_id,
+ artifact_path=artifact_path,
+ ee_config_descriptor=ee_config_descriptor,
+ vnfr_id=vnfr_id,
+ nsr_id=nsr_id,
+ target_ip=rw_mgmt_ip,
+ element_type="VDU",
+ vdu_id=vdu_id_for_prom,
+ vdu_index=vdu_index_for_prom,
+ )
+
+ self.logger.debug("Prometheus job:{}".format(prometheus_jobs))
+ if prometheus_jobs:
+ db_nsr_update["_admin.deployed.prometheus_jobs"] = prometheus_jobs
+ self.update_db_2(
+ "nsrs",
+ nsr_id,
+ db_nsr_update,
+ )
+
+ for job in prometheus_jobs:
+ self.db.set_one(
+ "prometheus_jobs",
+ {"job_name": job["job_name"]},
+ job,
+ upsert=True,
+ fail_on_empty=False,
+ )
+
# Check if this NS has a charm configuration
descriptor_config = nsd.get("ns-configuration")
if descriptor_config and descriptor_config.get("juju"):
member_vnf_index = None
vdu_id = None
kdu_name = None
+ kdu_index = None
vdu_index = 0
vdu_name = None
kdu_name=kdu_name,
member_vnf_index=member_vnf_index,
vdu_index=vdu_index,
+ kdu_index=kdu_index,
vdu_name=vdu_name,
deploy_params=deploy_params,
descriptor_config=descriptor_config,
db_nsr_update["detailed-status"] = "Done"
db_nslcmop_update["detailed-status"] = "Done"
nslcmop_operation_state = "COMPLETED"
+ # Gather auto-healing and auto-scaling alerts for each vnfr
+ healing_alerts = []
+ scaling_alerts = []
+ for vnfr in self.db.get_list("vnfrs", {"nsr-id-ref": nsr_id}):
+ vnfd = next(
+ (sub for sub in db_vnfds if sub["_id"] == vnfr["vnfd-id"]), None
+ )
+ healing_alerts = self._gather_vnfr_healing_alerts(vnfr, vnfd)
+ for alert in healing_alerts:
+ self.logger.info(f"Storing healing alert in MongoDB: {alert}")
+ self.db.create("alerts", alert)
+
+ scaling_alerts = self._gather_vnfr_scaling_alerts(vnfr, vnfd)
+ for alert in scaling_alerts:
+ self.logger.info(f"Storing scaling alert in MongoDB: {alert}")
+ self.db.create("alerts", alert)
if db_nsr:
self._write_ns_status(
"nslcmop_id": nslcmop_id,
"operationState": nslcmop_operation_state,
},
- loop=self.loop,
)
except Exception as e:
self.logger.error(
self.logger.debug(logging_text + "Exit")
self.lcm_tasks.remove("ns", nsr_id, nslcmop_id, "ns_instantiate")
- def _get_vnfd(self, vnfd_id: str, cached_vnfds: Dict[str, Any]):
+ def _get_vnfd(self, vnfd_id: str, projects_read: str, cached_vnfds: Dict[str, Any]):
if vnfd_id not in cached_vnfds:
- cached_vnfds[vnfd_id] = self.db.get_one("vnfds", {"id": vnfd_id})
+ cached_vnfds[vnfd_id] = self.db.get_one(
+ "vnfds", {"id": vnfd_id, "_admin.projects_read": projects_read}
+ )
return cached_vnfds[vnfd_id]
def _get_vnfr(self, nsr_id: str, vnf_profile_id: str, cached_vnfrs: Dict[str, Any]):
]:
vnf_profile = get_vnf_profile(nsd, ee_relation_data["vnf-profile-id"])
vnfd_id = vnf_profile["vnfd-id"]
- db_vnfd = self._get_vnfd(vnfd_id, cached_vnfds)
+ project = nsd["_admin"]["projects_read"][0]
+ db_vnfd = self._get_vnfd(vnfd_id, project, cached_vnfds)
entity_id = (
vnfd_id
if ee_relation_level == EELevel.VNF
cached_vnfds: Dict[str, Any],
) -> List[Relation]:
relations = []
+ if vca.target_element == "ns":
+ self.logger.debug("VCA is a NS charm, not a VNF.")
+ return relations
vnf_profile = get_vnf_profile(nsd, vca.vnf_profile_id)
vnf_profile_id = vnf_profile["id"]
vnfd_id = vnf_profile["vnfd-id"]
- db_vnfd = self._get_vnfd(vnfd_id, cached_vnfds)
+ project = nsd["_admin"]["projects_read"][0]
+ db_vnfd = self._get_vnfd(vnfd_id, project, cached_vnfds)
db_vnf_relations = get_relation_list(db_vnfd, vnfd_id)
for r in db_vnf_relations:
provider_dict = None
vnf_profiles,
lambda vnf_profile: vnf_profile["id"] == ee_relation.vnf_profile_id,
)["vnfd-id"]
- db_vnfd = self._get_vnfd(vnfd_id, cached_vnfds)
+ project = nsd["_admin"]["projects_read"][0]
+ db_vnfd = self._get_vnfd(vnfd_id, project, cached_vnfds)
kdu_resource_profile = get_kdu_resource_profile(
db_vnfd, ee_relation.kdu_resource_profile_id
)
requirer_vca_id,
relation.requirer.endpoint,
)
- await self.vca_map[vca_type].add_relation(
- provider=provider_relation_endpoint,
- requirer=requirer_relation_endpoint,
- )
- # remove entry from relations list
+ try:
+ await self.vca_map[vca_type].add_relation(
+ provider=provider_relation_endpoint,
+ requirer=requirer_relation_endpoint,
+ )
+ except N2VCException as exception:
+ self.logger.error(exception)
+ raise LcmException(exception)
return True
return False
vca_index: int,
timeout: int = 3600,
) -> bool:
-
# steps:
# 1. find all relations for this VCA
# 2. wait for other peers related
timeout: int = 600,
vca_id: str = None,
):
-
try:
k8sclustertype = k8s_instance_info["k8scluster-type"]
# Instantiate kdu
kdu_name,
member_vnf_index,
vdu_index,
+ kdu_index,
vdu_name,
deploy_params,
descriptor_config,
vdu_id=vdu_id,
kdu_name=kdu_name,
vdu_index=vdu_index,
+ kdu_index=kdu_index,
deploy_params=deploy_params,
config_descriptor=descriptor_config,
base_folder=base_folder,
# TODO vdu_index_count
for vca in vca_deployed_list:
if vca["member-vnf-index"] == vnf_index and vca["vdu_id"] == vdu_id:
- return vca["ee_id"]
+ return vca.get("ee_id")
async def destroy_N2VC(
self,
try:
await self.n2vc.delete_namespace(
namespace=namespace,
- total_timeout=self.timeout_charm_delete,
+ total_timeout=self.timeout.charm_delete,
vca_id=vca_id,
)
except N2VCNotFound: # already deleted. Skip
pass
self._write_all_config_status(db_nsr=db_nsr, status="DELETED")
- async def _terminate_RO(
- self, logging_text, nsr_deployed, nsr_id, nslcmop_id, stage
- ):
- """
- Terminates a deployment from RO
- :param logging_text:
- :param nsr_deployed: db_nsr._admin.deployed
- :param nsr_id:
- :param nslcmop_id:
- :param stage: list of string with the content to write on db_nslcmop.detailed-status.
- this method will update only the index 2, but it will write on database the concatenated content of the list
- :return:
- """
- db_nsr_update = {}
- failed_detail = []
- ro_nsr_id = ro_delete_action = None
- if nsr_deployed and nsr_deployed.get("RO"):
- ro_nsr_id = nsr_deployed["RO"].get("nsr_id")
- ro_delete_action = nsr_deployed["RO"].get("nsr_delete_action_id")
- try:
- if ro_nsr_id:
- stage[2] = "Deleting ns from VIM."
- db_nsr_update["detailed-status"] = " ".join(stage)
- self._write_op_status(nslcmop_id, stage)
- self.logger.debug(logging_text + stage[2])
- self.update_db_2("nsrs", nsr_id, db_nsr_update)
- self._write_op_status(nslcmop_id, stage)
- desc = await self.RO.delete("ns", ro_nsr_id)
- ro_delete_action = desc["action_id"]
- db_nsr_update[
- "_admin.deployed.RO.nsr_delete_action_id"
- ] = ro_delete_action
- db_nsr_update["_admin.deployed.RO.nsr_id"] = None
- db_nsr_update["_admin.deployed.RO.nsr_status"] = "DELETED"
- if ro_delete_action:
- # wait until NS is deleted from VIM
- stage[2] = "Waiting ns deleted from VIM."
- detailed_status_old = None
- self.logger.debug(
- logging_text
- + stage[2]
- + " RO_id={} ro_delete_action={}".format(
- ro_nsr_id, ro_delete_action
- )
- )
- self.update_db_2("nsrs", nsr_id, db_nsr_update)
- self._write_op_status(nslcmop_id, stage)
-
- delete_timeout = 20 * 60 # 20 minutes
- while delete_timeout > 0:
- desc = await self.RO.show(
- "ns",
- item_id_name=ro_nsr_id,
- extra_item="action",
- extra_item_id=ro_delete_action,
- )
-
- # deploymentStatus
- self._on_update_ro_db(nsrs_id=nsr_id, ro_descriptor=desc)
-
- ns_status, ns_status_info = self.RO.check_action_status(desc)
- if ns_status == "ERROR":
- raise ROclient.ROClientException(ns_status_info)
- elif ns_status == "BUILD":
- stage[2] = "Deleting from VIM {}".format(ns_status_info)
- elif ns_status == "ACTIVE":
- db_nsr_update["_admin.deployed.RO.nsr_delete_action_id"] = None
- db_nsr_update["_admin.deployed.RO.nsr_status"] = "DELETED"
- break
- else:
- assert (
- False
- ), "ROclient.check_action_status returns unknown {}".format(
- ns_status
- )
- if stage[2] != detailed_status_old:
- detailed_status_old = stage[2]
- db_nsr_update["detailed-status"] = " ".join(stage)
- self._write_op_status(nslcmop_id, stage)
- self.update_db_2("nsrs", nsr_id, db_nsr_update)
- await asyncio.sleep(5, loop=self.loop)
- delete_timeout -= 5
- else: # delete_timeout <= 0:
- raise ROclient.ROClientException(
- "Timeout waiting ns deleted from VIM"
- )
-
- except Exception as e:
- self.update_db_2("nsrs", nsr_id, db_nsr_update)
- if (
- isinstance(e, ROclient.ROClientException) and e.http_code == 404
- ): # not found
- db_nsr_update["_admin.deployed.RO.nsr_id"] = None
- db_nsr_update["_admin.deployed.RO.nsr_status"] = "DELETED"
- db_nsr_update["_admin.deployed.RO.nsr_delete_action_id"] = None
- self.logger.debug(
- logging_text + "RO_ns_id={} already deleted".format(ro_nsr_id)
- )
- elif (
- isinstance(e, ROclient.ROClientException) and e.http_code == 409
- ): # conflict
- failed_detail.append("delete conflict: {}".format(e))
- self.logger.debug(
- logging_text
- + "RO_ns_id={} delete conflict: {}".format(ro_nsr_id, e)
- )
- else:
- failed_detail.append("delete error: {}".format(e))
- self.logger.error(
- logging_text + "RO_ns_id={} delete error: {}".format(ro_nsr_id, e)
- )
-
- # Delete nsd
- if not failed_detail and deep_get(nsr_deployed, ("RO", "nsd_id")):
- ro_nsd_id = nsr_deployed["RO"]["nsd_id"]
- try:
- stage[2] = "Deleting nsd from RO."
- db_nsr_update["detailed-status"] = " ".join(stage)
- self.update_db_2("nsrs", nsr_id, db_nsr_update)
- self._write_op_status(nslcmop_id, stage)
- await self.RO.delete("nsd", ro_nsd_id)
- self.logger.debug(
- logging_text + "ro_nsd_id={} deleted".format(ro_nsd_id)
- )
- db_nsr_update["_admin.deployed.RO.nsd_id"] = None
- except Exception as e:
- if (
- isinstance(e, ROclient.ROClientException) and e.http_code == 404
- ): # not found
- db_nsr_update["_admin.deployed.RO.nsd_id"] = None
- self.logger.debug(
- logging_text + "ro_nsd_id={} already deleted".format(ro_nsd_id)
- )
- elif (
- isinstance(e, ROclient.ROClientException) and e.http_code == 409
- ): # conflict
- failed_detail.append(
- "ro_nsd_id={} delete conflict: {}".format(ro_nsd_id, e)
- )
- self.logger.debug(logging_text + failed_detail[-1])
- else:
- failed_detail.append(
- "ro_nsd_id={} delete error: {}".format(ro_nsd_id, e)
- )
- self.logger.error(logging_text + failed_detail[-1])
-
- if not failed_detail and deep_get(nsr_deployed, ("RO", "vnfd")):
- for index, vnf_deployed in enumerate(nsr_deployed["RO"]["vnfd"]):
- if not vnf_deployed or not vnf_deployed["id"]:
- continue
- try:
- ro_vnfd_id = vnf_deployed["id"]
- stage[
- 2
- ] = "Deleting member_vnf_index={} ro_vnfd_id={} from RO.".format(
- vnf_deployed["member-vnf-index"], ro_vnfd_id
- )
- db_nsr_update["detailed-status"] = " ".join(stage)
- self.update_db_2("nsrs", nsr_id, db_nsr_update)
- self._write_op_status(nslcmop_id, stage)
- await self.RO.delete("vnfd", ro_vnfd_id)
- self.logger.debug(
- logging_text + "ro_vnfd_id={} deleted".format(ro_vnfd_id)
- )
- db_nsr_update["_admin.deployed.RO.vnfd.{}.id".format(index)] = None
- except Exception as e:
- if (
- isinstance(e, ROclient.ROClientException) and e.http_code == 404
- ): # not found
- db_nsr_update[
- "_admin.deployed.RO.vnfd.{}.id".format(index)
- ] = None
- self.logger.debug(
- logging_text
- + "ro_vnfd_id={} already deleted ".format(ro_vnfd_id)
- )
- elif (
- isinstance(e, ROclient.ROClientException) and e.http_code == 409
- ): # conflict
- failed_detail.append(
- "ro_vnfd_id={} delete conflict: {}".format(ro_vnfd_id, e)
- )
- self.logger.debug(logging_text + failed_detail[-1])
- else:
- failed_detail.append(
- "ro_vnfd_id={} delete error: {}".format(ro_vnfd_id, e)
- )
- self.logger.error(logging_text + failed_detail[-1])
-
- if failed_detail:
- stage[2] = "Error deleting from VIM"
- else:
- stage[2] = "Deleted from VIM"
- db_nsr_update["detailed-status"] = " ".join(stage)
- self.update_db_2("nsrs", nsr_id, db_nsr_update)
- self._write_op_status(nslcmop_id, stage)
-
- if failed_detail:
- raise LcmException("; ".join(failed_detail))
-
async def terminate(self, nsr_id, nslcmop_id):
# Try to lock HA task here
task_is_locked_by_me = self.lcm_tasks.lock_HA("ns", "nslcmops", nslcmop_id)
logging_text = "Task ns={} terminate={} ".format(nsr_id, nslcmop_id)
self.logger.debug(logging_text + "Enter")
- timeout_ns_terminate = self.timeout_ns_terminate
+ timeout_ns_terminate = self.timeout.ns_terminate
db_nsr = None
db_nslcmop = None
operation_params = None
error_list = await self._wait_for_tasks(
logging_text,
tasks_dict_info,
- min(self.timeout_charm_delete, timeout_ns_terminate),
+ min(self.timeout.charm_delete, timeout_ns_terminate),
stage,
nslcmop_id,
)
task_delete_ee = asyncio.ensure_future(
asyncio.wait_for(
self._delete_all_N2VC(db_nsr=db_nsr, vca_id=vca_id),
- timeout=self.timeout_charm_delete,
+ timeout=self.timeout.charm_delete,
)
)
# task_delete_ee = asyncio.ensure_future(self.n2vc.delete_namespace(namespace="." + nsr_id))
# Delete Namespace and Certificates if necessary
if check_helm_ee_in_ns(list(db_vnfds_from_member_index.values())):
await self.vca_map["helm-v3"].delete_tls_certificate(
- certificate_name=db_nslcmop["nsInstanceId"],
+ namespace=db_nslcmop["nsInstanceId"],
+ certificate_name=self.EE_TLS_NAME,
+ )
+ await self.vca_map["helm-v3"].delete_namespace(
+ namespace=db_nslcmop["nsInstanceId"],
)
- # TODO: Delete namespace
# Delete from k8scluster
stage[1] = "Deleting KDUs."
# remove from RO
stage[1] = "Deleting ns from VIM."
- if self.ng_ro:
+ if self.ro_config.ng:
task_delete_ro = asyncio.ensure_future(
self._terminate_ng_ro(
logging_text, nsr_deployed, nsr_id, nslcmop_id, stage
)
)
- else:
- task_delete_ro = asyncio.ensure_future(
- self._terminate_RO(
- logging_text, nsr_deployed, nsr_id, nslcmop_id, stage
- )
- )
- tasks_dict_info[task_delete_ro] = "Removing deployment from VIM"
+ tasks_dict_info[task_delete_ro] = "Removing deployment from VIM"
# rest of staff will be done at finally
"operationState": nslcmop_operation_state,
"autoremove": autoremove,
},
- loop=self.loop,
)
except Exception as e:
self.logger.error(
logging_text + "kafka_write notification Exception {}".format(e)
)
+ self.logger.debug(f"Deleting alerts: ns_id={nsr_id}")
+ self.db.del_list("alerts", {"tags.ns_id": nsr_id})
self.logger.debug(logging_text + "Exit")
self.lcm_tasks.remove("ns", nsr_id, nslcmop_id, "ns_terminate")
ee_id=ee_id,
primitive_name=primitive,
params_dict=primitive_params,
- progress_timeout=self.timeout_progress_primitive,
- total_timeout=self.timeout_primitive,
+ progress_timeout=self.timeout.progress_primitive,
+ total_timeout=self.timeout.primitive,
db_dict=db_dict,
vca_id=vca_id,
vca_type=vca_type,
),
- timeout=timeout or self.timeout_primitive,
+ timeout=timeout or self.timeout.primitive,
)
# execution was OK
break
)
)
# wait and retry
- await asyncio.sleep(retries_interval, loop=self.loop)
+ await asyncio.sleep(retries_interval)
else:
if isinstance(e, asyncio.TimeoutError):
e = N2VCException(
nslcmop_operation_state = None
error_description_nslcmop = None
exc = None
+ step = ""
try:
# wait for any previous tasks in process
step = "Waiting for previous operations to terminate"
primitive = db_nslcmop["operationParams"]["primitive"]
primitive_params = db_nslcmop["operationParams"]["primitive_params"]
timeout_ns_action = db_nslcmop["operationParams"].get(
- "timeout_ns_action", self.timeout_primitive
+ "timeout_ns_action", self.timeout.primitive
)
if vnf_index:
del desc_params["kdu_model"]
else:
kdu_model = kdu.get("kdu-model")
- parts = kdu_model.split(sep=":")
- if len(parts) == 2:
- kdu_model = parts[0]
+ if kdu_model.count("/") < 2: # helm chart is not embedded
+ parts = kdu_model.split(sep=":")
+ if len(parts) == 2:
+ kdu_model = parts[0]
if desc_params.get("kdu_atomic_upgrade"):
atomic_upgrade = desc_params.get(
"kdu_atomic_upgrade"
"nslcmop_id": nslcmop_id,
"operationState": nslcmop_operation_state,
},
- loop=self.loop,
)
except Exception as e:
self.logger.error(
stage[2] = "Terminating VDUs"
if scaling_info.get("vdu-delete"):
# scale_process = "RO"
- if self.ro_config.get("ng"):
+ if self.ro_config.ng:
await self._scale_ng_ro(
logging_text,
db_nsr,
}
)
scaling_info["vdu-create"][vdud["id"]] = count_index
- if self.ro_config.get("ng"):
+ if self.ro_config.ng:
self.logger.debug(
"New Resources to be deployed: {}".format(scaling_info)
)
path=path,
charm_id=charm_id,
charm_type=charm_type,
- timeout=timeout or self.timeout_ns_update,
+ timeout=timeout or self.timeout.ns_update,
)
if output:
raise
except Exception as e:
-
self.logger.debug("Error upgrading charm {}".format(path))
return "FAILED", "Error upgrading charm {}: {}".format(path, e)
exc = None
change_type = "updated"
detailed_status = ""
+ member_vnf_index = None
try:
# wait for any previous tasks in process
nsr_deployed = db_nsr["_admin"].get("deployed")
if update_type == "CHANGE_VNFPKG":
-
# Get the input parameters given through update request
vnf_instance_id = db_nslcmop["operationParams"][
"changeVnfPackageData"
step = "Checking if revision has changed in VNFD"
if current_vnf_revision != latest_vnfd_revision:
-
change_type = "policy_updated"
# There is new revision of VNFD, update operation is required
ee_id = vca_deployed.get("ee_id")
step = "Getting descriptor config"
+ if current_vnfd.get("kdu"):
+ search_key = "kdu_name"
+ else:
+ search_key = "vnfd_id"
+
+ entity_id = vca_deployed.get(search_key)
+
descriptor_config = get_configuration(
- current_vnfd, current_vnfd["id"]
+ current_vnfd, entity_id
)
if "execution-environment-list" in descriptor_config:
# There could be several charm used in the same VNF
for ee_item in ee_list:
if ee_item.get("juju"):
-
step = "Getting charm name"
charm_name = ee_item["juju"].get("charm")
if find_software_version(current_vnfd) != find_software_version(
latest_vnfd
):
-
step = "Checking if existing VNF has charm"
for current_charm_path, target_charm_path in list(
charm_artifact_paths
current_charm_path, target_charm_path
)
):
-
step = "Checking whether VNF uses juju bundle"
if check_juju_bundle_existence(current_vnfd):
-
raise LcmException(
"Charm upgrade is not supported for the instance which"
" uses juju-bundle: {}".format(
"nslcmop_id": nslcmop_id,
"operationState": nslcmop_operation_state,
}
- if change_type in ("vnf_terminated", "policy_updated"):
+ if (
+ change_type in ("vnf_terminated", "policy_updated")
+ and member_vnf_index
+ ):
msg.update({"vnf_member_index": member_vnf_index})
- await self.msg.aiowrite("ns", change_type, msg, loop=self.loop)
+ await self.msg.aiowrite("ns", change_type, msg)
except Exception as e:
self.logger.error(
logging_text + "kafka_write notification Exception {}".format(e)
scaling_in=True,
vca_id=vca_id,
),
- timeout=self.timeout_charm_delete,
+ timeout=self.timeout.charm_delete,
)
)
tasks_dict_info[task] = "Terminating VCA {}".format(
logging_text,
tasks_dict_info,
min(
- self.timeout_charm_delete, self.timeout_ns_terminate
+ self.timeout.charm_delete, self.timeout.ns_terminate
),
stage,
nslcmop_id,
# SCALE RO - BEGIN
if scaling_info.get("vdu-create") or scaling_info.get("vdu-delete"):
scale_process = "RO"
- if self.ro_config.get("ng"):
+ if self.ro_config.ng:
await self._scale_ng_ro(
logging_text, db_nsr, db_nslcmop, db_vnfr, scaling_info, stage
)
vdu_id = None
vdu_name = None
kdu_name = None
+ kdu_index = None
self._deploy_n2vc(
logging_text=logging_text
+ "member_vnf_index={} ".format(member_vnf_index),
vnfd_id=vnfd_id,
vdu_id=vdu_id,
kdu_name=kdu_name,
+ kdu_index=kdu_index,
member_vnf_index=member_vnf_index,
vdu_index=vdu_index,
vdu_name=vdu_name,
if descriptor_config:
vdu_name = None
kdu_name = None
+ kdu_index = None
stage[
1
] = "Scaling member_vnf_index={}, vdu_id={}, vdu_index={} ".format(
kdu_name=kdu_name,
member_vnf_index=member_vnf_index,
vdu_index=vdu_index,
+ kdu_index=kdu_index,
vdu_name=vdu_name,
deploy_params=deploy_params_vdu,
descriptor_config=descriptor_config,
exc = await self._wait_for_tasks(
logging_text,
tasks_dict_info,
- self.timeout_ns_deploy,
+ self.timeout.ns_deploy,
stage,
nslcmop_id,
nsr_id=nsr_id,
"nslcmop_id": nslcmop_id,
"operationState": nslcmop_operation_state,
}
- await self.msg.aiowrite("ns", "scaled", msg, loop=self.loop)
+ await self.msg.aiowrite("ns", "scaled", msg)
except Exception as e:
self.logger.error(
logging_text + "kafka_write notification Exception {}".format(e)
primitive_name=terminate_config_primitive["name"],
params=primitive_params_,
db_dict=db_dict,
- total_timeout=self.timeout_primitive,
+ total_timeout=self.timeout.primitive,
vca_id=vca_id,
),
- timeout=self.timeout_primitive
- * self.timeout_primitive_outer_factor,
+ timeout=self.timeout.primitive
+ * self.timeout.primitive_outer_factor,
)
await asyncio.wait_for(
kdu_instance=kdu_instance,
scale=scale,
resource_name=kdu_scaling_info["resource-name"],
- total_timeout=self.timeout_scale_on_error,
+ total_timeout=self.timeout.scale_on_error,
vca_id=vca_id,
cluster_uuid=cluster_uuid,
kdu_model=kdu_model,
atomic=True,
db_dict=db_dict,
),
- timeout=self.timeout_scale_on_error
- * self.timeout_scale_on_error_outer_factor,
+ timeout=self.timeout.scale_on_error
+ * self.timeout.scale_on_error_outer_factor,
)
if kdu_scaling_info["type"] == "create":
n2vc_key_list,
stage=stage,
start_deploy=time(),
- timeout_ns_deploy=self.timeout_ns_deploy,
+ timeout_ns_deploy=self.timeout.ns_deploy,
)
if vdu_scaling_info.get("vdu-delete"):
self.scale_vnfr(
)
async def extract_prometheus_scrape_jobs(
- self, ee_id, artifact_path, ee_config_descriptor, vnfr_id, nsr_id, target_ip
- ):
+ self,
+ ee_id: str,
+ artifact_path: str,
+ ee_config_descriptor: dict,
+ vnfr_id: str,
+ nsr_id: str,
+ target_ip: str,
+ element_type: str,
+ vnf_member_index: str = "",
+ vdu_id: str = "",
+ vdu_index: int = None,
+ kdu_name: str = "",
+ kdu_index: int = None,
+ ) -> dict:
+ """Method to extract prometheus scrape jobs from EE's Prometheus template job file
+ This method will wait until the corresponding VDU or KDU is fully instantiated
+
+ Args:
+ ee_id (str): Execution Environment ID
+ artifact_path (str): Path where the EE's content is (including the Prometheus template file)
+ ee_config_descriptor (dict): Execution Environment's configuration descriptor
+ vnfr_id (str): VNFR ID where this EE applies
+ nsr_id (str): NSR ID where this EE applies
+ target_ip (str): VDU/KDU instance IP address
+ element_type (str): NS or VNF or VDU or KDU
+ vnf_member_index (str, optional): VNF index where this EE applies. Defaults to "".
+ vdu_id (str, optional): VDU ID where this EE applies. Defaults to "".
+ vdu_index (int, optional): VDU index where this EE applies. Defaults to None.
+ kdu_name (str, optional): KDU name where this EE applies. Defaults to "".
+ kdu_index (int, optional): KDU index where this EE applies. Defaults to None.
+
+ Raises:
+ LcmException: When the VDU or KDU instance was not found in an hour
+
+ Returns:
+ _type_: Prometheus jobs
+ """
+ # default the vdur and kdur names to an empty string, to avoid any later
+ # problem with Prometheus when the element type is not VDU or KDU
+ vdur_name = ""
+ kdur_name = ""
+
# look if exist a file called 'prometheus*.j2' and
artifact_content = self.fs.dir_ls(artifact_path)
job_file = next(
)
if not job_file:
return
+ self.logger.debug("Artifact path{}".format(artifact_path))
+ self.logger.debug("job file{}".format(job_file))
with self.fs.file_open((artifact_path, job_file), "r") as f:
job_data = f.read()
+ # obtain the VDUR or KDUR, if the element type is VDU or KDU
+ if element_type in ("VDU", "KDU"):
+ for _ in range(360):
+ db_vnfr = self.db.get_one("vnfrs", {"_id": vnfr_id})
+ if vdu_id and vdu_index is not None:
+ vdur = next(
+ (
+ x
+ for x in get_iterable(db_vnfr, "vdur")
+ if (
+ x.get("vdu-id-ref") == vdu_id
+ and x.get("count-index") == vdu_index
+ )
+ ),
+ {},
+ )
+ if vdur.get("name"):
+ vdur_name = vdur.get("name")
+ break
+ if kdu_name and kdu_index is not None:
+ kdur = next(
+ (
+ x
+ for x in get_iterable(db_vnfr, "kdur")
+ if (
+ x.get("kdu-name") == kdu_name
+ and x.get("count-index") == kdu_index
+ )
+ ),
+ {},
+ )
+ if kdur.get("name"):
+ kdur_name = kdur.get("name")
+ break
+
+ await asyncio.sleep(10)
+ else:
+ if vdu_id and vdu_index is not None:
+ raise LcmException(
+ f"Timeout waiting VDU with name={vdu_id} and index={vdu_index} to be intantiated"
+ )
+ if kdu_name and kdu_index is not None:
+ raise LcmException(
+ f"Timeout waiting KDU with name={kdu_name} and index={kdu_index} to be intantiated"
+ )
+
# TODO get_service
- _, _, service = ee_id.partition(".") # remove prefix "namespace."
- host_name = "{}-{}".format(service, ee_config_descriptor["metric-service"])
- host_port = "80"
- vnfr_id = vnfr_id.replace("-", "")
- variables = {
- "JOB_NAME": vnfr_id,
- "TARGET_IP": target_ip,
- "EXPORTER_POD_IP": host_name,
- "EXPORTER_POD_PORT": host_port,
- }
+ if ee_id is not None:
+ _, _, service = ee_id.partition(".") # remove prefix "namespace."
+ host_name = "{}-{}".format(service, ee_config_descriptor["metric-service"])
+ host_port = "80"
+ vnfr_id = vnfr_id.replace("-", "")
+ variables = {
+ "JOB_NAME": vnfr_id,
+ "TARGET_IP": target_ip,
+ "EXPORTER_POD_IP": host_name,
+ "EXPORTER_POD_PORT": host_port,
+ "NSR_ID": nsr_id,
+ "VNF_MEMBER_INDEX": vnf_member_index,
+ "VDUR_NAME": vdur_name,
+ "KDUR_NAME": kdur_name,
+ "ELEMENT_TYPE": element_type,
+ }
+ else:
+ metric_path = ee_config_descriptor["metric-path"]
+ target_port = ee_config_descriptor["metric-port"]
+ vnfr_id = vnfr_id.replace("-", "")
+ variables = {
+ "JOB_NAME": vnfr_id,
+ "TARGET_IP": target_ip,
+ "TARGET_PORT": target_port,
+ "METRIC_PATH": metric_path,
+ }
+
job_list = parse_job(job_data, variables)
# ensure job_name is using the vnfr_id. Adding the metadata nsr_id
for job in job_list:
not isinstance(job.get("job_name"), str)
or vnfr_id not in job["job_name"]
):
- job["job_name"] = vnfr_id + "_" + str(randint(1, 10000))
+ job["job_name"] = vnfr_id + "_" + str(SystemRandom().randint(1, 10000))
job["nsr_id"] = nsr_id
job["vnfr_id"] = vnfr_id
return job_list
action_id,
nslcmop_id,
start_deploy,
- self.timeout_operate,
+ self.timeout.operate,
None,
"start_stop_rebuild",
)
action_id,
nslcmop_id,
start_deploy,
- self.timeout_migrate,
+ self.timeout.migrate,
operation="migrate",
)
except (ROclient.ROClientException, DbException, LcmException) as e:
"nslcmop_id": nslcmop_id,
"operationState": nslcmop_operation_state,
}
- await self.msg.aiowrite("ns", "migrated", msg, loop=self.loop)
+ await self.msg.aiowrite("ns", "migrated", msg)
except Exception as e:
self.logger.error(
logging_text + "kafka_write notification Exception {}".format(e)
for target_vdu in target_vdu_list:
deploy_params_vdu = target_vdu
# Set run-day1 vnf level value if not vdu level value exists
- if not deploy_params_vdu.get("run-day1") and target_vnf[
- "additionalParams"
- ].get("run-day1"):
+ if not deploy_params_vdu.get("run-day1") and target_vnf.get(
+ "additionalParams", {}
+ ).get("run-day1"):
deploy_params_vdu["run-day1"] = target_vnf[
"additionalParams"
].get("run-day1")
exc = await self._wait_for_tasks(
logging_text,
tasks_dict_info,
- self.timeout_ns_deploy,
+ self.timeout.ns_deploy,
stage,
nslcmop_id,
nsr_id=nsr_id,
"nslcmop_id": nslcmop_id,
"operationState": nslcmop_operation_state,
}
- await self.msg.aiowrite("ns", "healed", msg, loop=self.loop)
+ await self.msg.aiowrite("ns", "healed", msg)
except Exception as e:
self.logger.error(
logging_text + "kafka_write notification Exception {}".format(e)
if ns_params and ns_params.get("timeout_ns_heal"):
timeout_ns_heal = ns_params["timeout_ns_heal"]
else:
- timeout_ns_heal = self.timeout.get("ns_heal", self.timeout_ns_heal)
+ timeout_ns_heal = self.timeout.ns_heal
db_vims = {}
}
step = ""
try:
-
element_type = "NS"
element_under_configuration = nsr_id
# n2vc_redesign STEP 5.1
# wait for RO (ip-address) Insert pub_key into VM
# IMPORTANT: We need do wait for RO to complete healing operation.
- await self._wait_heal_ro(nsr_id, self.timeout_ns_heal)
+ await self._wait_heal_ro(nsr_id, self.timeout.ns_heal)
if vnfr_id:
if kdu_name:
rw_mgmt_ip = await self.wait_kdu_up(
self.logger.debug("Wait Heal RO > {}".format(operational_status_ro))
if operational_status_ro != "healing":
break
- await asyncio.sleep(15, loop=self.loop)
+ await asyncio.sleep(15)
else: # timeout_ns_deploy
raise NgRoException("Timeout waiting ns to deploy")
action_id,
nslcmop_id,
start_deploy,
- self.timeout_verticalscale,
+ self.timeout.verticalscale,
operation="verticalscale",
)
except (ROclient.ROClientException, DbException, LcmException) as e:
"nslcmop_id": nslcmop_id,
"operationState": nslcmop_operation_state,
}
- await self.msg.aiowrite("ns", "verticalscaled", msg, loop=self.loop)
+ await self.msg.aiowrite("ns", "verticalscaled", msg)
except Exception as e:
self.logger.error(
logging_text + "kafka_write notification Exception {}".format(e)