)
from osm_lcm import ROclient
+from osm_lcm.data_utils.lcm_config import LcmCfg
from osm_lcm.data_utils.nsr import (
get_deployed_kdu,
get_deployed_vca,
populate_dict,
check_juju_bundle_existence,
get_charm_artifact_path,
+ get_ee_id_parts,
)
from osm_lcm.data_utils.nsd import (
get_ns_configuration_relation_list,
get_juju_ee_ref,
get_kdu_resource_profile,
find_software_version,
+ check_helm_ee_in_ns,
)
from osm_lcm.data_utils.list_utils import find_in_list
from osm_lcm.data_utils.vnfr import (
from osm_lcm.data_utils.database.database import Database
from osm_lcm.data_utils.filesystem.filesystem import Filesystem
+from osm_lcm.data_utils.wim import (
+ get_sdn_ports,
+ get_target_wim_attrs,
+ select_feasible_wim_account,
+)
from n2vc.n2vc_juju_conn import N2VCJujuConnector
from n2vc.exceptions import N2VCException, N2VCNotFound, K8sException
class NsLcm(LcmBase):
- timeout_vca_on_error = (
- 5 * 60
- ) # Time for charm from first time at blocked,error status to mark as failed
- timeout_ns_deploy = 2 * 3600 # default global timeout for deployment a ns
- timeout_ns_terminate = 1800 # default global timeout for un deployment a ns
- timeout_ns_heal = 1800 # default global timeout for un deployment a ns
- timeout_charm_delete = 10 * 60
- timeout_primitive = 30 * 60 # timeout for primitive execution
- timeout_ns_update = 30 * 60 # timeout for ns update
- timeout_progress_primitive = (
- 10 * 60
- ) # timeout for some progress in a primitive execution
- timeout_migrate = 1800 # default global timeout for migrating vnfs
- timeout_operate = 1800 # default global timeout for migrating vnfs
- timeout_verticalscale = 1800 # default global timeout for Vertical Sclaing
SUBOPERATION_STATUS_NOT_FOUND = -1
SUBOPERATION_STATUS_NEW = -2
SUBOPERATION_STATUS_SKIP = -3
task_name_deploy_vca = "Deploying VCA"
- def __init__(self, msg, lcm_tasks, config, loop):
+ def __init__(self, msg, lcm_tasks, config: LcmCfg, loop):
"""
Init, Connect to database, filesystem storage, and messaging
:param config: two level dictionary with configuration. Top level should contain 'database', 'storage',
self.fs = Filesystem().instance.fs
self.loop = loop
self.lcm_tasks = lcm_tasks
- self.timeout = config["timeout"]
- self.ro_config = config["ro_config"]
- self.ng_ro = config["ro_config"].get("ng")
- self.vca_config = config["VCA"].copy()
+ self.timeout = config.timeout
+ self.ro_config = config.RO
+ self.vca_config = config.VCA
# create N2VC connector
self.n2vc = N2VCJujuConnector(
)
self.k8sclusterhelm2 = K8sHelmConnector(
- kubectl_command=self.vca_config.get("kubectlpath"),
- helm_command=self.vca_config.get("helmpath"),
+ kubectl_command=self.vca_config.kubectlpath,
+ helm_command=self.vca_config.helmpath,
log=self.logger,
on_update_db=None,
fs=self.fs,
)
self.k8sclusterhelm3 = K8sHelm3Connector(
- kubectl_command=self.vca_config.get("kubectlpath"),
- helm_command=self.vca_config.get("helm3path"),
+ kubectl_command=self.vca_config.kubectlpath,
+ helm_command=self.vca_config.helm3path,
fs=self.fs,
log=self.logger,
db=self.db,
)
self.k8sclusterjuju = K8sJujuConnector(
- kubectl_command=self.vca_config.get("kubectlpath"),
- juju_command=self.vca_config.get("jujupath"),
+ kubectl_command=self.vca_config.kubectlpath,
+ juju_command=self.vca_config.jujupath,
log=self.logger,
loop=self.loop,
on_update_db=self._on_update_k8s_db,
}
# create RO client
- self.RO = NgRoClient(self.loop, **self.ro_config)
+ self.RO = NgRoClient(self.loop, **self.ro_config.to_dict())
self.op_status_map = {
"instantiation": self.RO.status,
target_vld["vim_info"][target_sdn]["sdn-ports"] = vld_params[
"provider-network"
]["sdn-ports"]
- if vld_params.get("wimAccountId"):
- target_wim = "wim:{}".format(vld_params["wimAccountId"])
- target_vld["vim_info"][target_wim] = {}
+
+ # check if WIM is needed; if needed, choose a feasible WIM able to connect VIMs
+            # if "wimAccountId" is specified in vld_params, validate that it is feasible.
+ wim_account_id, db_wim = select_feasible_wim_account(
+ db_nsr, db_vnfrs, target_vld, vld_params, self.logger
+ )
+
+ if wim_account_id:
+                # WIM is needed and a feasible one was found; populate WIM target and SDN ports
+ self.logger.info("WIM selected: {:s}".format(str(wim_account_id)))
+ # update vld_params with correct WIM account Id
+ vld_params["wimAccountId"] = wim_account_id
+
+ target_wim = "wim:{}".format(wim_account_id)
+ target_wim_attrs = get_target_wim_attrs(nsr_id, target_vld, vld_params)
+ sdn_ports = get_sdn_ports(vld_params, db_wim)
+ if len(sdn_ports) > 0:
+ target_vld["vim_info"][target_wim] = target_wim_attrs
+ target_vld["vim_info"][target_wim]["sdn-ports"] = sdn_ports
+
+ self.logger.debug(
+ "Target VLD with WIM data: {:s}".format(str(target_vld))
+ )
+
for param in ("vim-network-name", "vim-network-id"):
if vld_params.get(param):
if isinstance(vld_params[param], dict):
None,
)
vdur = next((vdur for vdur in target_vnf.get("vdur", ())), None)
+ if not vdur:
+ return
for a_index, a_vld in enumerate(target["ns"]["vld"]):
target_vld = find_in_list(
get_iterable(vdur, "interfaces"),
if ns_params and ns_params.get("timeout_ns_deploy"):
timeout_ns_deploy = ns_params["timeout_ns_deploy"]
else:
- timeout_ns_deploy = self.timeout.get(
- "ns_deploy", self.timeout_ns_deploy
- )
+ timeout_ns_deploy = self.timeout.ns_deploy
# Check for and optionally request placement optimization. Database will be updated if placement activated
stage[2] = "Waiting for Placement."
ro_vm_id = "{}-{}".format(
db_vnfr["member-vnf-index-ref"], target_vdu_id
) # TODO add vdu_index
- if self.ng_ro:
+ if self.ro_config.ng:
target = {
"action": {
"action": "inject_ssh_key",
db_dict=db_dict,
config=osm_config,
artifact_path=artifact_path,
+ chart_model=vca_name,
vca_type=vca_type,
)
else:
# STEP 7 Configure metrics
if vca_type == "helm" or vca_type == "helm-v3":
+ # TODO: review for those cases where the helm chart is a reference and
+ # is not part of the NF package
prometheus_jobs = await self.extract_prometheus_scrape_jobs(
ee_id=ee_id,
artifact_path=artifact_path,
if ns_params and ns_params.get("timeout_ns_deploy"):
timeout_ns_deploy = ns_params["timeout_ns_deploy"]
else:
- timeout_ns_deploy = self.timeout.get(
- "ns_deploy", self.timeout_ns_deploy
- )
+ timeout_ns_deploy = self.timeout.ns_deploy
# read from db: ns
stage[1] = "Getting nsr={} from db.".format(nsr_id)
# feature 1429. Add n2vc public key to needed VMs
n2vc_key = self.n2vc.get_public_key()
n2vc_key_list = [n2vc_key]
- if self.vca_config.get("public_key"):
- n2vc_key_list.append(self.vca_config["public_key"])
+ if self.vca_config.public_key:
+ n2vc_key_list.append(self.vca_config.public_key)
stage[1] = "Deploying NS at VIM."
task_ro = asyncio.ensure_future(
stage[1] = "Deploying Execution Environments."
self.logger.debug(logging_text + stage[1])
+ # create namespace and certificate if any helm based EE is present in the NS
+ if check_helm_ee_in_ns(db_vnfds):
+ # TODO: create EE namespace
+ # create TLS certificates
+ await self.vca_map["helm-v3"].create_tls_certificate(
+ secret_name="ee-tls-{}".format(nsr_id),
+ dns_prefix="*",
+ nsr_id=nsr_id,
+ usage="server auth",
+ )
+
nsi_id = None # TODO put nsi_id when this nsr belongs to a NSI
for vnf_profile in get_vnf_profiles(nsd):
vnfd_id = vnf_profile["vnfd-id"]
try:
await self.n2vc.delete_namespace(
namespace=namespace,
- total_timeout=self.timeout_charm_delete,
+ total_timeout=self.timeout.charm_delete,
vca_id=vca_id,
)
except N2VCNotFound: # already deleted. Skip
logging_text = "Task ns={} terminate={} ".format(nsr_id, nslcmop_id)
self.logger.debug(logging_text + "Enter")
- timeout_ns_terminate = self.timeout_ns_terminate
+ timeout_ns_terminate = self.timeout.ns_terminate
db_nsr = None
db_nslcmop = None
operation_params = None
error_list = await self._wait_for_tasks(
logging_text,
tasks_dict_info,
- min(self.timeout_charm_delete, timeout_ns_terminate),
+ min(self.timeout.charm_delete, timeout_ns_terminate),
stage,
nslcmop_id,
)
task_delete_ee = asyncio.ensure_future(
asyncio.wait_for(
self._delete_all_N2VC(db_nsr=db_nsr, vca_id=vca_id),
- timeout=self.timeout_charm_delete,
+ timeout=self.timeout.charm_delete,
)
)
# task_delete_ee = asyncio.ensure_future(self.n2vc.delete_namespace(namespace="." + nsr_id))
tasks_dict_info[task_delete_ee] = "Terminating all VCA"
+ # Delete Namespace and Certificates if necessary
+ if check_helm_ee_in_ns(list(db_vnfds_from_member_index.values())):
+ await self.vca_map["helm-v3"].delete_tls_certificate(
+ certificate_name=db_nslcmop["nsInstanceId"],
+ )
+ # TODO: Delete namespace
+
# Delete from k8scluster
stage[1] = "Deleting KDUs."
self.logger.debug(logging_text + stage[1])
# remove from RO
stage[1] = "Deleting ns from VIM."
- if self.ng_ro:
+ if self.ro_config.ng:
task_delete_ro = asyncio.ensure_future(
self._terminate_ng_ro(
logging_text, nsr_deployed, nsr_id, nslcmop_id, stage
ee_id=ee_id,
primitive_name=primitive,
params_dict=primitive_params,
- progress_timeout=self.timeout_progress_primitive,
- total_timeout=self.timeout_primitive,
+ progress_timeout=self.timeout.progress_primitive,
+ total_timeout=self.timeout.primitive,
db_dict=db_dict,
vca_id=vca_id,
vca_type=vca_type,
),
- timeout=timeout or self.timeout_primitive,
+ timeout=timeout or self.timeout.primitive,
)
# execution was OK
break
primitive = db_nslcmop["operationParams"]["primitive"]
primitive_params = db_nslcmop["operationParams"]["primitive_params"]
timeout_ns_action = db_nslcmop["operationParams"].get(
- "timeout_ns_action", self.timeout_primitive
+ "timeout_ns_action", self.timeout.primitive
)
if vnf_index:
parts = kdu_model.split(sep=":")
if len(parts) == 2:
kdu_model = parts[0]
+ if desc_params.get("kdu_atomic_upgrade"):
+ atomic_upgrade = desc_params.get(
+ "kdu_atomic_upgrade"
+ ).lower() in ("yes", "true", "1")
+ del desc_params["kdu_atomic_upgrade"]
+ else:
+ atomic_upgrade = True
detailed_status = await asyncio.wait_for(
self.k8scluster_map[kdu["k8scluster-type"]].upgrade(
cluster_uuid=kdu.get("k8scluster-uuid"),
kdu_instance=kdu.get("kdu-instance"),
- atomic=True,
+ atomic=atomic_upgrade,
kdu_model=kdu_model,
params=desc_params,
db_dict=db_dict,
stage[2] = "Terminating VDUs"
if scaling_info.get("vdu-delete"):
# scale_process = "RO"
- if self.ro_config.get("ng"):
+ if self.ro_config.ng:
await self._scale_ng_ro(
logging_text,
db_nsr,
}
)
scaling_info["vdu-create"][vdud["id"]] = count_index
- if self.ro_config.get("ng"):
+ if self.ro_config.ng:
self.logger.debug(
"New Resources to be deployed: {}".format(scaling_info)
)
path=path,
charm_id=charm_id,
charm_type=charm_type,
- timeout=timeout or self.timeout_ns_update,
+ timeout=timeout or self.timeout.ns_update,
)
if output:
current_charm_artifact_path,
target_charm_artifact_path,
charm_artifact_paths,
- ) = ([], [], [])
+ helm_artifacts,
+ ) = ([], [], [], [])
step = "Checking if revision has changed in VNFD"
if current_vnf_revision != latest_vnfd_revision:
step = (
"Get the charm-type, charm-id, ee-id if there is deployed VCA"
)
- base_folder = latest_vnfd["_admin"]["storage"]
+ current_base_folder = current_vnfd["_admin"]["storage"]
+ latest_base_folder = latest_vnfd["_admin"]["storage"]
- for charm_index, charm_deployed in enumerate(
+ for vca_index, vca_deployed in enumerate(
get_iterable(nsr_deployed, "VCA")
):
vnf_index = db_vnfr.get("member-vnf-index-ref")
# Getting charm-id and charm-type
- if charm_deployed.get("member-vnf-index") == vnf_index:
- charm_id = self.get_vca_id(db_vnfr, db_nsr)
- charm_type = charm_deployed.get("type")
+ if vca_deployed.get("member-vnf-index") == vnf_index:
+ vca_id = self.get_vca_id(db_vnfr, db_nsr)
+ vca_type = vca_deployed.get("type")
+ vdu_count_index = vca_deployed.get("vdu_count_index")
# Getting ee-id
- ee_id = charm_deployed.get("ee_id")
+ ee_id = vca_deployed.get("ee_id")
step = "Getting descriptor config"
+ if current_vnfd.get("kdu"):
+
+ search_key = "kdu_name"
+ else:
+ search_key = "vnfd_id"
+
+ entity_id = vca_deployed.get(search_key)
+
descriptor_config = get_configuration(
- current_vnfd, current_vnfd["id"]
+ current_vnfd, entity_id
)
if "execution-environment-list" in descriptor_config:
step = "Setting Charm artifact paths"
current_charm_artifact_path.append(
get_charm_artifact_path(
- base_folder,
+ current_base_folder,
charm_name,
- charm_type,
+ vca_type,
current_vnf_revision,
)
)
target_charm_artifact_path.append(
get_charm_artifact_path(
- base_folder,
+ latest_base_folder,
charm_name,
- charm_type,
+ vca_type,
latest_vnfd_revision,
)
)
+ elif ee_item.get("helm-chart"):
+ # add chart to list and all parameters
+ step = "Getting helm chart name"
+ chart_name = ee_item.get("helm-chart")
+ if (
+ ee_item.get("helm-version")
+ and ee_item.get("helm-version") == "v2"
+ ):
+ vca_type = "helm"
+ else:
+ vca_type = "helm-v3"
+ step = "Setting Helm chart artifact paths"
+
+ helm_artifacts.append(
+ {
+ "current_artifact_path": get_charm_artifact_path(
+ current_base_folder,
+ chart_name,
+ vca_type,
+ current_vnf_revision,
+ ),
+ "target_artifact_path": get_charm_artifact_path(
+ latest_base_folder,
+ chart_name,
+ vca_type,
+ latest_vnfd_revision,
+ ),
+ "ee_id": ee_id,
+ "vca_index": vca_index,
+ "vdu_index": vdu_count_index,
+ }
+ )
charm_artifact_paths = zip(
current_charm_artifact_path, target_charm_artifact_path
detailed_status,
) = await self._ns_charm_upgrade(
ee_id=ee_id,
- charm_id=charm_id,
- charm_type=charm_type,
+ charm_id=vca_id,
+ charm_type=vca_type,
path=self.fs.path + target_charm_path,
timeout=timeout_seconds,
)
detailed_status = "Done"
db_nslcmop_update["detailed-status"] = "Done"
+ # helm base EE
+ for item in helm_artifacts:
+ if not (
+ item["current_artifact_path"]
+ and item["target_artifact_path"]
+ and self.check_charm_hash_changed(
+ item["current_artifact_path"],
+ item["target_artifact_path"],
+ )
+ ):
+ continue
+ db_update_entry = "_admin.deployed.VCA.{}.".format(
+ item["vca_index"]
+ )
+ vnfr_id = db_vnfr["_id"]
+ osm_config = {"osm": {"ns_id": nsr_id, "vnf_id": vnfr_id}}
+ db_dict = {
+ "collection": "nsrs",
+ "filter": {"_id": nsr_id},
+ "path": db_update_entry,
+ }
+ vca_type, namespace, helm_id = get_ee_id_parts(item["ee_id"])
+ await self.vca_map[vca_type].upgrade_execution_environment(
+ namespace=namespace,
+ helm_id=helm_id,
+ db_dict=db_dict,
+ config=osm_config,
+ artifact_path=item["target_artifact_path"],
+ vca_type=vca_type,
+ )
+ vnf_id = db_vnfr.get("vnfd-ref")
+ config_descriptor = get_configuration(latest_vnfd, vnf_id)
+ self.logger.debug("get ssh key block")
+ rw_mgmt_ip = None
+ if deep_get(
+ config_descriptor,
+ ("config-access", "ssh-access", "required"),
+ ):
+ # Needed to inject a ssh key
+ user = deep_get(
+ config_descriptor,
+ ("config-access", "ssh-access", "default-user"),
+ )
+ step = (
+ "Install configuration Software, getting public ssh key"
+ )
+ pub_key = await self.vca_map[
+ vca_type
+ ].get_ee_ssh_public__key(
+ ee_id=ee_id, db_dict=db_dict, vca_id=vca_id
+ )
+
+ step = (
+ "Insert public key into VM user={} ssh_key={}".format(
+ user, pub_key
+ )
+ )
+ self.logger.debug(logging_text + step)
+
+ # wait for RO (ip-address) Insert pub_key into VM
+ rw_mgmt_ip = await self.wait_vm_up_insert_key_ro(
+ logging_text,
+ nsr_id,
+ vnfr_id,
+ None,
+ item["vdu_index"],
+ user=user,
+ pub_key=pub_key,
+ )
+
+ initial_config_primitive_list = config_descriptor.get(
+ "initial-config-primitive"
+ )
+ config_primitive = next(
+ (
+ p
+ for p in initial_config_primitive_list
+ if p["name"] == "config"
+ ),
+ None,
+ )
+ if not config_primitive:
+ continue
+
+ deploy_params = {"OSM": get_osm_params(db_vnfr)}
+ if rw_mgmt_ip:
+ deploy_params["rw_mgmt_ip"] = rw_mgmt_ip
+ if db_vnfr.get("additionalParamsForVnf"):
+ deploy_params.update(
+ parse_yaml_strings(
+ db_vnfr["additionalParamsForVnf"].copy()
+ )
+ )
+ primitive_params_ = self._map_primitive_params(
+ config_primitive, {}, deploy_params
+ )
+
+ step = "execute primitive '{}' params '{}'".format(
+ config_primitive["name"], primitive_params_
+ )
+ self.logger.debug(logging_text + step)
+ await self.vca_map[vca_type].exec_primitive(
+ ee_id=ee_id,
+ primitive_name=config_primitive["name"],
+ params_dict=primitive_params_,
+ db_dict=db_dict,
+ vca_id=vca_id,
+ vca_type=vca_type,
+ )
+
+ step = "Updating policies"
+ member_vnf_index = db_vnfr["member-vnf-index-ref"]
+ detailed_status = "Done"
+ db_nslcmop_update["detailed-status"] = "Done"
+
# If nslcmop_operation_state is None, so any operation is not failed.
if not nslcmop_operation_state:
nslcmop_operation_state = "COMPLETED"
scaling_in=True,
vca_id=vca_id,
),
- timeout=self.timeout_charm_delete,
+ timeout=self.timeout.charm_delete,
)
)
tasks_dict_info[task] = "Terminating VCA {}".format(
logging_text,
tasks_dict_info,
min(
- self.timeout_charm_delete, self.timeout_ns_terminate
+ self.timeout.charm_delete, self.timeout.ns_terminate
),
stage,
nslcmop_id,
# SCALE RO - BEGIN
if scaling_info.get("vdu-create") or scaling_info.get("vdu-delete"):
scale_process = "RO"
- if self.ro_config.get("ng"):
+ if self.ro_config.ng:
await self._scale_ng_ro(
logging_text, db_nsr, db_nslcmop, db_vnfr, scaling_info, stage
)
exc = await self._wait_for_tasks(
logging_text,
tasks_dict_info,
- self.timeout_ns_deploy,
+ self.timeout.ns_deploy,
stage,
nslcmop_id,
nsr_id=nsr_id,
primitive_name=terminate_config_primitive["name"],
params=primitive_params_,
db_dict=db_dict,
+ total_timeout=self.timeout.primitive,
vca_id=vca_id,
),
- timeout=600,
+ timeout=self.timeout.primitive
+ * self.timeout.primitive_outer_factor,
)
await asyncio.wait_for(
self.k8scluster_map[k8s_cluster_type].scale(
- kdu_instance,
- scale,
- kdu_scaling_info["resource-name"],
+ kdu_instance=kdu_instance,
+ scale=scale,
+ resource_name=kdu_scaling_info["resource-name"],
+ total_timeout=self.timeout.scale_on_error,
vca_id=vca_id,
cluster_uuid=cluster_uuid,
kdu_model=kdu_model,
atomic=True,
db_dict=db_dict,
),
- timeout=self.timeout_vca_on_error,
+ timeout=self.timeout.scale_on_error
+ * self.timeout.scale_on_error_outer_factor,
)
if kdu_scaling_info["type"] == "create":
n2vc_key_list,
stage=stage,
start_deploy=time(),
- timeout_ns_deploy=self.timeout_ns_deploy,
+ timeout_ns_deploy=self.timeout.ns_deploy,
)
if vdu_scaling_info.get("vdu-delete"):
self.scale_vnfr(
action_id,
nslcmop_id,
start_deploy,
- self.timeout_operate,
+ self.timeout.operate,
None,
"start_stop_rebuild",
)
action_id,
nslcmop_id,
start_deploy,
- self.timeout_migrate,
+ self.timeout.migrate,
operation="migrate",
)
except (ROclient.ROClientException, DbException, LcmException) as e:
self.update_db_2("nsrs", nsr_id, db_nsr_update)
step = "Sending heal order to VIM"
- task_ro = asyncio.ensure_future(
- self.heal_RO(
- logging_text=logging_text,
- nsr_id=nsr_id,
- db_nslcmop=db_nslcmop,
- stage=stage,
- )
+ await self.heal_RO(
+ logging_text=logging_text,
+ nsr_id=nsr_id,
+ db_nslcmop=db_nslcmop,
+ stage=stage,
)
- self.lcm_tasks.register("ns", nsr_id, nslcmop_id, "heal_RO", task_ro)
- tasks_dict_info[task_ro] = "Healing at VIM"
-
# VCA tasks
# read from db: nsd
stage[1] = "Getting nsd={} from db.".format(db_nsr["nsd-id"])
exc = await self._wait_for_tasks(
logging_text,
tasks_dict_info,
- self.timeout_ns_deploy,
+ self.timeout.ns_deploy,
stage,
nslcmop_id,
nsr_id=nsr_id,
if ns_params and ns_params.get("timeout_ns_heal"):
timeout_ns_heal = ns_params["timeout_ns_heal"]
else:
- timeout_ns_heal = self.timeout.get("ns_heal", self.timeout_ns_heal)
+ timeout_ns_heal = self.timeout.ns_heal
db_vims = {}
# n2vc_redesign STEP 5.1
# wait for RO (ip-address) Insert pub_key into VM
# IMPORTANT: We need do wait for RO to complete healing operation.
- await self._wait_heal_ro(nsr_id, self.timeout_ns_heal)
+ await self._wait_heal_ro(nsr_id, self.timeout.ns_heal)
if vnfr_id:
if kdu_name:
rw_mgmt_ip = await self.wait_kdu_up(
action_id,
nslcmop_id,
start_deploy,
- self.timeout_verticalscale,
+ self.timeout.verticalscale,
operation="verticalscale",
)
except (ROclient.ROClientException, DbException, LcmException) as e: