+ step = "Parsing scaling parameters"
+ db_nsr_update["operational-status"] = "scaling"
+ self.update_db_2("nsrs", nsr_id, db_nsr_update)
+ nsr_deployed = db_nsr["_admin"].get("deployed")
+
+ vnf_index = db_nslcmop["operationParams"]["scaleVnfData"][
+ "scaleByStepData"
+ ]["member-vnf-index"]
+ scaling_group = db_nslcmop["operationParams"]["scaleVnfData"][
+ "scaleByStepData"
+ ]["scaling-group-descriptor"]
+ scaling_type = db_nslcmop["operationParams"]["scaleVnfData"]["scaleVnfType"]
+ # for backward compatibility
+ if nsr_deployed and isinstance(nsr_deployed.get("VCA"), dict):
+ nsr_deployed["VCA"] = list(nsr_deployed["VCA"].values())
+ db_nsr_update["_admin.deployed.VCA"] = nsr_deployed["VCA"]
+ self.update_db_2("nsrs", nsr_id, db_nsr_update)
+
+ step = "Getting vnfr from database"
+ db_vnfr = self.db.get_one(
+ "vnfrs", {"member-vnf-index-ref": vnf_index, "nsr-id-ref": nsr_id}
+ )
+
+ vca_id = self.get_vca_id(db_vnfr, db_nsr)
+
+ step = "Getting vnfd from database"
+ db_vnfd = self.db.get_one("vnfds", {"_id": db_vnfr["vnfd-id"]})
+
+ base_folder = db_vnfd["_admin"]["storage"]
+
+ step = "Getting scaling-group-descriptor"
+ scaling_descriptor = find_in_list(
+ get_scaling_aspect(db_vnfd),
+ lambda scale_desc: scale_desc["name"] == scaling_group,
+ )
+ if not scaling_descriptor:
+ raise LcmException(
+ "input parameter 'scaleByStepData':'scaling-group-descriptor':'{}' is not present "
+ "at vnfd:scaling-group-descriptor".format(scaling_group)
+ )
+
+ step = "Sending scale order to VIM"
+ # TODO check if ns is in a proper status
+ nb_scale_op = 0
+ if not db_nsr["_admin"].get("scaling-group"):
+ self.update_db_2(
+ "nsrs",
+ nsr_id,
+ {
+ "_admin.scaling-group": [
+ {"name": scaling_group, "nb-scale-op": 0}
+ ]
+ },
+ )
+ admin_scale_index = 0
+ else:
+ for admin_scale_index, admin_scale_info in enumerate(
+ db_nsr["_admin"]["scaling-group"]
+ ):
+ if admin_scale_info["name"] == scaling_group:
+ nb_scale_op = admin_scale_info.get("nb-scale-op", 0)
+ break
+ else: # not found, set index one plus last element and add new entry with the name
+ admin_scale_index += 1
+ db_nsr_update[
+ "_admin.scaling-group.{}.name".format(admin_scale_index)
+ ] = scaling_group
+
+ vca_scaling_info = []
+ scaling_info = {"scaling_group_name": scaling_group, "vdu": [], "kdu": []}
+ if scaling_type == "SCALE_OUT":
+ if "aspect-delta-details" not in scaling_descriptor:
+ raise LcmException(
+ "Aspect delta details not fount in scaling descriptor {}".format(
+ scaling_descriptor["name"]
+ )
+ )
+ # count if max-instance-count is reached
+ deltas = scaling_descriptor.get("aspect-delta-details")["deltas"]
+
+ scaling_info["scaling_direction"] = "OUT"
+ scaling_info["vdu-create"] = {}
+ scaling_info["kdu-create"] = {}
+ for delta in deltas:
+ for vdu_delta in delta.get("vdu-delta", {}):
+ vdud = get_vdu(db_vnfd, vdu_delta["id"])
+ # vdu_index also provides the number of instance of the targeted vdu
+ vdu_count = vdu_index = get_vdur_index(db_vnfr, vdu_delta)
+ cloud_init_text = self._get_vdu_cloud_init_content(
+ vdud, db_vnfd
+ )
+ if cloud_init_text:
+ additional_params = (
+ self._get_vdu_additional_params(db_vnfr, vdud["id"])
+ or {}
+ )
+ cloud_init_list = []
+
+ vdu_profile = get_vdu_profile(db_vnfd, vdu_delta["id"])
+ max_instance_count = 10
+ if vdu_profile and "max-number-of-instances" in vdu_profile:
+ max_instance_count = vdu_profile.get(
+ "max-number-of-instances", 10
+ )
+
+ default_instance_num = get_number_of_instances(
+ db_vnfd, vdud["id"]
+ )
+ instances_number = vdu_delta.get("number-of-instances", 1)
+ nb_scale_op += instances_number
+
+ new_instance_count = nb_scale_op + default_instance_num
+ # Control if new count is over max and vdu count is less than max.
+ # Then assign new instance count
+ if new_instance_count > max_instance_count > vdu_count:
+ instances_number = new_instance_count - max_instance_count
+ else:
+ instances_number = instances_number
+
+ if new_instance_count > max_instance_count:
+ raise LcmException(
+ "reached the limit of {} (max-instance-count) "
+ "scaling-out operations for the "
+ "scaling-group-descriptor '{}'".format(
+ nb_scale_op, scaling_group
+ )
+ )
+ for x in range(vdu_delta.get("number-of-instances", 1)):
+ if cloud_init_text:
+ # TODO Information of its own ip is not available because db_vnfr is not updated.
+ additional_params["OSM"] = get_osm_params(
+ db_vnfr, vdu_delta["id"], vdu_index + x
+ )
+ cloud_init_list.append(
+ self._parse_cloud_init(
+ cloud_init_text,
+ additional_params,
+ db_vnfd["id"],
+ vdud["id"],
+ )
+ )
+ vca_scaling_info.append(
+ {
+ "osm_vdu_id": vdu_delta["id"],
+ "member-vnf-index": vnf_index,
+ "type": "create",
+ "vdu_index": vdu_index + x,
+ }
+ )
+ scaling_info["vdu-create"][vdu_delta["id"]] = instances_number
+ for kdu_delta in delta.get("kdu-resource-delta", {}):
+ kdu_profile = get_kdu_resource_profile(db_vnfd, kdu_delta["id"])
+ kdu_name = kdu_profile["kdu-name"]
+ resource_name = kdu_profile.get("resource-name", "")
+
+ # Might have different kdus in the same delta
+ # Should have list for each kdu
+ if not scaling_info["kdu-create"].get(kdu_name, None):
+ scaling_info["kdu-create"][kdu_name] = []
+
+ kdur = get_kdur(db_vnfr, kdu_name)
+ if kdur.get("helm-chart"):
+ k8s_cluster_type = "helm-chart-v3"
+ self.logger.debug("kdur: {}".format(kdur))
+ if (
+ kdur.get("helm-version")
+ and kdur.get("helm-version") == "v2"
+ ):
+ k8s_cluster_type = "helm-chart"
+ elif kdur.get("juju-bundle"):
+ k8s_cluster_type = "juju-bundle"
+ else:
+ raise LcmException(
+ "kdu type for kdu='{}.{}' is neither helm-chart nor "
+ "juju-bundle. Maybe an old NBI version is running".format(
+ db_vnfr["member-vnf-index-ref"], kdu_name
+ )
+ )
+
+ max_instance_count = 10
+ if kdu_profile and "max-number-of-instances" in kdu_profile:
+ max_instance_count = kdu_profile.get(
+ "max-number-of-instances", 10
+ )
+
+ nb_scale_op += kdu_delta.get("number-of-instances", 1)
+ deployed_kdu, _ = get_deployed_kdu(
+ nsr_deployed, kdu_name, vnf_index
+ )
+ if deployed_kdu is None:
+ raise LcmException(
+ "KDU '{}' for vnf '{}' not deployed".format(
+ kdu_name, vnf_index
+ )
+ )
+ kdu_instance = deployed_kdu.get("kdu-instance")
+ instance_num = await self.k8scluster_map[
+ k8s_cluster_type
+ ].get_scale_count(
+ resource_name,
+ kdu_instance,
+ vca_id=vca_id,
+ cluster_uuid=deployed_kdu.get("k8scluster-uuid"),
+ kdu_model=deployed_kdu.get("kdu-model"),
+ )
+ kdu_replica_count = instance_num + kdu_delta.get(
+ "number-of-instances", 1
+ )
+
+ # Control if new count is over max and instance_num is less than max.
+ # Then assign max instance number to kdu replica count
+ if kdu_replica_count > max_instance_count > instance_num:
+ kdu_replica_count = max_instance_count
+ if kdu_replica_count > max_instance_count:
+ raise LcmException(
+ "reached the limit of {} (max-instance-count) "
+ "scaling-out operations for the "
+ "scaling-group-descriptor '{}'".format(
+ instance_num, scaling_group
+ )
+ )
+
+ for x in range(kdu_delta.get("number-of-instances", 1)):
+ vca_scaling_info.append(
+ {
+ "osm_kdu_id": kdu_name,
+ "member-vnf-index": vnf_index,
+ "type": "create",
+ "kdu_index": instance_num + x - 1,
+ }
+ )
+ scaling_info["kdu-create"][kdu_name].append(
+ {
+ "member-vnf-index": vnf_index,
+ "type": "create",
+ "k8s-cluster-type": k8s_cluster_type,
+ "resource-name": resource_name,
+ "scale": kdu_replica_count,
+ }
+ )
+ elif scaling_type == "SCALE_IN":
+ deltas = scaling_descriptor.get("aspect-delta-details")["deltas"]
+
+ scaling_info["scaling_direction"] = "IN"
+ scaling_info["vdu-delete"] = {}
+ scaling_info["kdu-delete"] = {}
+
+ for delta in deltas:
+ for vdu_delta in delta.get("vdu-delta", {}):
+ vdu_count = vdu_index = get_vdur_index(db_vnfr, vdu_delta)
+ min_instance_count = 0
+ vdu_profile = get_vdu_profile(db_vnfd, vdu_delta["id"])
+ if vdu_profile and "min-number-of-instances" in vdu_profile:
+ min_instance_count = vdu_profile["min-number-of-instances"]
+
+ default_instance_num = get_number_of_instances(
+ db_vnfd, vdu_delta["id"]
+ )
+ instance_num = vdu_delta.get("number-of-instances", 1)
+ nb_scale_op -= instance_num
+
+ new_instance_count = nb_scale_op + default_instance_num
+
+ if new_instance_count < min_instance_count < vdu_count:
+ instances_number = min_instance_count - new_instance_count
+ else:
+ instances_number = instance_num
+
+ if new_instance_count < min_instance_count:
+ raise LcmException(
+ "reached the limit of {} (min-instance-count) scaling-in operations for the "
+ "scaling-group-descriptor '{}'".format(
+ nb_scale_op, scaling_group
+ )
+ )
+ for x in range(vdu_delta.get("number-of-instances", 1)):
+ vca_scaling_info.append(
+ {
+ "osm_vdu_id": vdu_delta["id"],
+ "member-vnf-index": vnf_index,
+ "type": "delete",
+ "vdu_index": vdu_index - 1 - x,
+ }
+ )
+ scaling_info["vdu-delete"][vdu_delta["id"]] = instances_number
+ for kdu_delta in delta.get("kdu-resource-delta", {}):
+ kdu_profile = get_kdu_resource_profile(db_vnfd, kdu_delta["id"])
+ kdu_name = kdu_profile["kdu-name"]
+ resource_name = kdu_profile.get("resource-name", "")
+
+ if not scaling_info["kdu-delete"].get(kdu_name, None):
+ scaling_info["kdu-delete"][kdu_name] = []
+
+ kdur = get_kdur(db_vnfr, kdu_name)
+ if kdur.get("helm-chart"):
+ k8s_cluster_type = "helm-chart-v3"
+ self.logger.debug("kdur: {}".format(kdur))
+ if (
+ kdur.get("helm-version")
+ and kdur.get("helm-version") == "v2"
+ ):
+ k8s_cluster_type = "helm-chart"
+ elif kdur.get("juju-bundle"):
+ k8s_cluster_type = "juju-bundle"
+ else:
+ raise LcmException(
+ "kdu type for kdu='{}.{}' is neither helm-chart nor "
+ "juju-bundle. Maybe an old NBI version is running".format(
+ db_vnfr["member-vnf-index-ref"], kdur["kdu-name"]
+ )
+ )
+
+ min_instance_count = 0
+ if kdu_profile and "min-number-of-instances" in kdu_profile:
+ min_instance_count = kdu_profile["min-number-of-instances"]
+
+ nb_scale_op -= kdu_delta.get("number-of-instances", 1)
+ deployed_kdu, _ = get_deployed_kdu(
+ nsr_deployed, kdu_name, vnf_index
+ )
+ if deployed_kdu is None:
+ raise LcmException(
+ "KDU '{}' for vnf '{}' not deployed".format(
+ kdu_name, vnf_index
+ )
+ )
+ kdu_instance = deployed_kdu.get("kdu-instance")
+ instance_num = await self.k8scluster_map[
+ k8s_cluster_type
+ ].get_scale_count(
+ resource_name,
+ kdu_instance,
+ vca_id=vca_id,
+ cluster_uuid=deployed_kdu.get("k8scluster-uuid"),
+ kdu_model=deployed_kdu.get("kdu-model"),
+ )
+ kdu_replica_count = instance_num - kdu_delta.get(
+ "number-of-instances", 1
+ )
+
+ if kdu_replica_count < min_instance_count < instance_num:
+ kdu_replica_count = min_instance_count
+ if kdu_replica_count < min_instance_count:
+ raise LcmException(
+ "reached the limit of {} (min-instance-count) scaling-in operations for the "
+ "scaling-group-descriptor '{}'".format(
+ instance_num, scaling_group
+ )
+ )
+
+ for x in range(kdu_delta.get("number-of-instances", 1)):
+ vca_scaling_info.append(
+ {
+ "osm_kdu_id": kdu_name,
+ "member-vnf-index": vnf_index,
+ "type": "delete",
+ "kdu_index": instance_num - x - 1,
+ }
+ )
+ scaling_info["kdu-delete"][kdu_name].append(
+ {
+ "member-vnf-index": vnf_index,
+ "type": "delete",
+ "k8s-cluster-type": k8s_cluster_type,
+ "resource-name": resource_name,
+ "scale": kdu_replica_count,
+ }
+ )
+
+ # update VDU_SCALING_INFO with the VDUs to delete ip_addresses
+ vdu_delete = copy(scaling_info.get("vdu-delete"))
+ if scaling_info["scaling_direction"] == "IN":
+ for vdur in reversed(db_vnfr["vdur"]):
+ if vdu_delete.get(vdur["vdu-id-ref"]):
+ vdu_delete[vdur["vdu-id-ref"]] -= 1
+ scaling_info["vdu"].append(
+ {
+ "name": vdur.get("name") or vdur.get("vdu-name"),
+ "vdu_id": vdur["vdu-id-ref"],
+ "interface": [],
+ }
+ )
+ for interface in vdur["interfaces"]:
+ scaling_info["vdu"][-1]["interface"].append(
+ {
+ "name": interface["name"],
+ "ip_address": interface["ip-address"],
+ "mac_address": interface.get("mac-address"),
+ }
+ )
+ # vdu_delete = vdu_scaling_info.pop("vdu-delete")
+
+ # PRE-SCALE BEGIN
+ step = "Executing pre-scale vnf-config-primitive"
+ if scaling_descriptor.get("scaling-config-action"):
+ for scaling_config_action in scaling_descriptor[
+ "scaling-config-action"
+ ]:
+ if (
+ scaling_config_action.get("trigger") == "pre-scale-in"
+ and scaling_type == "SCALE_IN"
+ ) or (
+ scaling_config_action.get("trigger") == "pre-scale-out"
+ and scaling_type == "SCALE_OUT"
+ ):
+ vnf_config_primitive = scaling_config_action[
+ "vnf-config-primitive-name-ref"
+ ]
+ step = db_nslcmop_update[
+ "detailed-status"
+ ] = "executing pre-scale scaling-config-action '{}'".format(
+ vnf_config_primitive
+ )
+
+ # look for primitive
+ for config_primitive in (
+ get_configuration(db_vnfd, db_vnfd["id"]) or {}
+ ).get("config-primitive", ()):
+ if config_primitive["name"] == vnf_config_primitive:
+ break
+ else:
+ raise LcmException(
+ "Invalid vnfd descriptor at scaling-group-descriptor[name='{}']:scaling-config-action"
+ "[vnf-config-primitive-name-ref='{}'] does not match any vnf-configuration:config-"
+ "primitive".format(scaling_group, vnf_config_primitive)
+ )
+
+ vnfr_params = {"VDU_SCALE_INFO": scaling_info}
+ if db_vnfr.get("additionalParamsForVnf"):
+ vnfr_params.update(db_vnfr["additionalParamsForVnf"])
+
+ scale_process = "VCA"
+ db_nsr_update["config-status"] = "configuring pre-scaling"
+ primitive_params = self._map_primitive_params(
+ config_primitive, {}, vnfr_params
+ )
+
+ # Pre-scale retry check: Check if this sub-operation has been executed before
+ op_index = self._check_or_add_scale_suboperation(
+ db_nslcmop,
+ vnf_index,
+ vnf_config_primitive,
+ primitive_params,
+ "PRE-SCALE",
+ )
+ if op_index == self.SUBOPERATION_STATUS_SKIP:
+ # Skip sub-operation
+ result = "COMPLETED"
+ result_detail = "Done"
+ self.logger.debug(
+ logging_text
+ + "vnf_config_primitive={} Skipped sub-operation, result {} {}".format(
+ vnf_config_primitive, result, result_detail
+ )
+ )
+ else:
+ if op_index == self.SUBOPERATION_STATUS_NEW:
+ # New sub-operation: Get index of this sub-operation
+ op_index = (
+ len(db_nslcmop.get("_admin", {}).get("operations"))
+ - 1
+ )
+ self.logger.debug(
+ logging_text
+ + "vnf_config_primitive={} New sub-operation".format(
+ vnf_config_primitive
+ )
+ )
+ else:
+ # retry: Get registered params for this existing sub-operation
+ op = db_nslcmop.get("_admin", {}).get("operations", [])[
+ op_index
+ ]
+ vnf_index = op.get("member_vnf_index")
+ vnf_config_primitive = op.get("primitive")
+ primitive_params = op.get("primitive_params")
+ self.logger.debug(
+ logging_text
+ + "vnf_config_primitive={} Sub-operation retry".format(
+ vnf_config_primitive
+ )
+ )
+ # Execute the primitive, either with new (first-time) or registered (reintent) args
+ ee_descriptor_id = config_primitive.get(
+ "execution-environment-ref"
+ )
+ primitive_name = config_primitive.get(
+ "execution-environment-primitive", vnf_config_primitive
+ )
+ ee_id, vca_type = self._look_for_deployed_vca(
+ nsr_deployed["VCA"],
+ member_vnf_index=vnf_index,
+ vdu_id=None,
+ vdu_count_index=None,
+ ee_descriptor_id=ee_descriptor_id,
+ )
+ result, result_detail = await self._ns_execute_primitive(
+ ee_id,
+ primitive_name,
+ primitive_params,
+ vca_type=vca_type,
+ vca_id=vca_id,
+ )
+ self.logger.debug(
+ logging_text
+ + "vnf_config_primitive={} Done with result {} {}".format(
+ vnf_config_primitive, result, result_detail
+ )
+ )
+ # Update operationState = COMPLETED | FAILED
+ self._update_suboperation_status(
+ db_nslcmop, op_index, result, result_detail
+ )
+
+ if result == "FAILED":
+ raise LcmException(result_detail)
+ db_nsr_update["config-status"] = old_config_status
+ scale_process = None
+ # PRE-SCALE END
+
+ db_nsr_update[
+ "_admin.scaling-group.{}.nb-scale-op".format(admin_scale_index)
+ ] = nb_scale_op
+ db_nsr_update[
+ "_admin.scaling-group.{}.time".format(admin_scale_index)
+ ] = time()
+
+ # SCALE-IN VCA - BEGIN
+ if vca_scaling_info:
+ step = db_nslcmop_update[
+ "detailed-status"
+ ] = "Deleting the execution environments"
+ scale_process = "VCA"
+ for vca_info in vca_scaling_info:
+ if vca_info["type"] == "delete" and not vca_info.get("osm_kdu_id"):
+ member_vnf_index = str(vca_info["member-vnf-index"])
+ self.logger.debug(
+ logging_text + "vdu info: {}".format(vca_info)
+ )
+ if vca_info.get("osm_vdu_id"):
+ vdu_id = vca_info["osm_vdu_id"]
+ vdu_index = int(vca_info["vdu_index"])
+ stage[
+ 1
+ ] = "Scaling member_vnf_index={}, vdu_id={}, vdu_index={} ".format(
+ member_vnf_index, vdu_id, vdu_index
+ )
+ stage[2] = step = "Scaling in VCA"
+ self._write_op_status(op_id=nslcmop_id, stage=stage)
+ vca_update = db_nsr["_admin"]["deployed"]["VCA"]
+ config_update = db_nsr["configurationStatus"]
+ for vca_index, vca in enumerate(vca_update):
+ if (
+ (vca or vca.get("ee_id"))
+ and vca["member-vnf-index"] == member_vnf_index
+ and vca["vdu_count_index"] == vdu_index
+ ):
+ if vca.get("vdu_id"):
+ config_descriptor = get_configuration(
+ db_vnfd, vca.get("vdu_id")
+ )
+ elif vca.get("kdu_name"):
+ config_descriptor = get_configuration(
+ db_vnfd, vca.get("kdu_name")
+ )
+ else:
+ config_descriptor = get_configuration(
+ db_vnfd, db_vnfd["id"]
+ )
+ operation_params = (
+ db_nslcmop.get("operationParams") or {}
+ )
+ exec_terminate_primitives = not operation_params.get(
+ "skip_terminate_primitives"
+ ) and vca.get("needed_terminate")
+ task = asyncio.ensure_future(
+ asyncio.wait_for(
+ self.destroy_N2VC(
+ logging_text,
+ db_nslcmop,
+ vca,
+ config_descriptor,
+ vca_index,
+ destroy_ee=True,
+ exec_primitives=exec_terminate_primitives,
+ scaling_in=True,
+ vca_id=vca_id,
+ ),
+ timeout=self.timeout.charm_delete,
+ )
+ )
+ tasks_dict_info[task] = "Terminating VCA {}".format(
+ vca.get("ee_id")
+ )
+ del vca_update[vca_index]
+ del config_update[vca_index]
+ # wait for pending tasks of terminate primitives
+ if tasks_dict_info:
+ self.logger.debug(
+ logging_text
+ + "Waiting for tasks {}".format(
+ list(tasks_dict_info.keys())
+ )
+ )
+ error_list = await self._wait_for_tasks(
+ logging_text,
+ tasks_dict_info,
+ min(
+ self.timeout.charm_delete, self.timeout.ns_terminate
+ ),
+ stage,
+ nslcmop_id,
+ )
+ tasks_dict_info.clear()
+ if error_list:
+ raise LcmException("; ".join(error_list))
+
+ db_vca_and_config_update = {
+ "_admin.deployed.VCA": vca_update,
+ "configurationStatus": config_update,
+ }
+ self.update_db_2(
+ "nsrs", db_nsr["_id"], db_vca_and_config_update
+ )
+ scale_process = None
+ # SCALE-IN VCA - END
+
+ # SCALE RO - BEGIN
+ if scaling_info.get("vdu-create") or scaling_info.get("vdu-delete"):
+ scale_process = "RO"
+ if self.ro_config.ng:
+ await self._scale_ng_ro(
+ logging_text, db_nsr, db_nslcmop, db_vnfr, scaling_info, stage
+ )
+ scaling_info.pop("vdu-create", None)
+ scaling_info.pop("vdu-delete", None)
+
+ scale_process = None
+ # SCALE RO - END
+
+ # SCALE KDU - BEGIN
+ if scaling_info.get("kdu-create") or scaling_info.get("kdu-delete"):
+ scale_process = "KDU"
+ await self._scale_kdu(
+ logging_text, nsr_id, nsr_deployed, db_vnfd, vca_id, scaling_info
+ )
+ scaling_info.pop("kdu-create", None)
+ scaling_info.pop("kdu-delete", None)
+
+ scale_process = None
+ # SCALE KDU - END
+
+ if db_nsr_update:
+ self.update_db_2("nsrs", nsr_id, db_nsr_update)
+
+ # SCALE-UP VCA - BEGIN
+ if vca_scaling_info:
+ step = db_nslcmop_update[
+ "detailed-status"
+ ] = "Creating new execution environments"
+ scale_process = "VCA"
+ for vca_info in vca_scaling_info:
+ if vca_info["type"] == "create" and not vca_info.get("osm_kdu_id"):
+ member_vnf_index = str(vca_info["member-vnf-index"])
+ self.logger.debug(
+ logging_text + "vdu info: {}".format(vca_info)
+ )
+ vnfd_id = db_vnfr["vnfd-ref"]
+ if vca_info.get("osm_vdu_id"):
+ vdu_index = int(vca_info["vdu_index"])
+ deploy_params = {"OSM": get_osm_params(db_vnfr)}
+ if db_vnfr.get("additionalParamsForVnf"):
+ deploy_params.update(
+ parse_yaml_strings(
+ db_vnfr["additionalParamsForVnf"].copy()
+ )
+ )
+ descriptor_config = get_configuration(
+ db_vnfd, db_vnfd["id"]
+ )
+ if descriptor_config:
+ vdu_id = None
+ vdu_name = None
+ kdu_name = None
+ kdu_index = None
+ self._deploy_n2vc(
+ logging_text=logging_text
+ + "member_vnf_index={} ".format(member_vnf_index),
+ db_nsr=db_nsr,
+ db_vnfr=db_vnfr,
+ nslcmop_id=nslcmop_id,
+ nsr_id=nsr_id,
+ nsi_id=nsi_id,
+ vnfd_id=vnfd_id,
+ vdu_id=vdu_id,
+ kdu_name=kdu_name,
+ kdu_index=kdu_index,
+ member_vnf_index=member_vnf_index,
+ vdu_index=vdu_index,
+ vdu_name=vdu_name,
+ deploy_params=deploy_params,
+ descriptor_config=descriptor_config,
+ base_folder=base_folder,
+ task_instantiation_info=tasks_dict_info,
+ stage=stage,
+ )
+ vdu_id = vca_info["osm_vdu_id"]
+ vdur = find_in_list(
+ db_vnfr["vdur"], lambda vdu: vdu["vdu-id-ref"] == vdu_id
+ )
+ descriptor_config = get_configuration(db_vnfd, vdu_id)
+ if vdur.get("additionalParams"):
+ deploy_params_vdu = parse_yaml_strings(
+ vdur["additionalParams"]
+ )
+ else:
+ deploy_params_vdu = deploy_params
+ deploy_params_vdu["OSM"] = get_osm_params(
+ db_vnfr, vdu_id, vdu_count_index=vdu_index
+ )
+ if descriptor_config:
+ vdu_name = None
+ kdu_name = None
+ kdu_index = None
+ stage[
+ 1
+ ] = "Scaling member_vnf_index={}, vdu_id={}, vdu_index={} ".format(
+ member_vnf_index, vdu_id, vdu_index
+ )
+ stage[2] = step = "Scaling out VCA"
+ self._write_op_status(op_id=nslcmop_id, stage=stage)
+ self._deploy_n2vc(
+ logging_text=logging_text
+ + "member_vnf_index={}, vdu_id={}, vdu_index={} ".format(
+ member_vnf_index, vdu_id, vdu_index
+ ),
+ db_nsr=db_nsr,
+ db_vnfr=db_vnfr,
+ nslcmop_id=nslcmop_id,
+ nsr_id=nsr_id,
+ nsi_id=nsi_id,
+ vnfd_id=vnfd_id,
+ vdu_id=vdu_id,
+ kdu_name=kdu_name,
+ member_vnf_index=member_vnf_index,
+ vdu_index=vdu_index,
+ kdu_index=kdu_index,
+ vdu_name=vdu_name,
+ deploy_params=deploy_params_vdu,
+ descriptor_config=descriptor_config,
+ base_folder=base_folder,
+ task_instantiation_info=tasks_dict_info,
+ stage=stage,
+ )
+ # SCALE-UP VCA - END
+ scale_process = None
+
+ # POST-SCALE BEGIN
+ # execute primitive service POST-SCALING
+ step = "Executing post-scale vnf-config-primitive"
+ if scaling_descriptor.get("scaling-config-action"):
+ for scaling_config_action in scaling_descriptor[
+ "scaling-config-action"
+ ]:
+ if (
+ scaling_config_action.get("trigger") == "post-scale-in"
+ and scaling_type == "SCALE_IN"
+ ) or (
+ scaling_config_action.get("trigger") == "post-scale-out"
+ and scaling_type == "SCALE_OUT"
+ ):
+ vnf_config_primitive = scaling_config_action[
+ "vnf-config-primitive-name-ref"
+ ]
+ step = db_nslcmop_update[
+ "detailed-status"
+ ] = "executing post-scale scaling-config-action '{}'".format(
+ vnf_config_primitive
+ )
+
+ vnfr_params = {"VDU_SCALE_INFO": scaling_info}
+ if db_vnfr.get("additionalParamsForVnf"):
+ vnfr_params.update(db_vnfr["additionalParamsForVnf"])
+
+ # look for primitive
+ for config_primitive in (
+ get_configuration(db_vnfd, db_vnfd["id"]) or {}
+ ).get("config-primitive", ()):
+ if config_primitive["name"] == vnf_config_primitive:
+ break
+ else:
+ raise LcmException(
+ "Invalid vnfd descriptor at scaling-group-descriptor[name='{}']:scaling-config-"
+ "action[vnf-config-primitive-name-ref='{}'] does not match any vnf-configuration:"
+ "config-primitive".format(
+ scaling_group, vnf_config_primitive
+ )
+ )
+ scale_process = "VCA"
+ db_nsr_update["config-status"] = "configuring post-scaling"
+ primitive_params = self._map_primitive_params(
+ config_primitive, {}, vnfr_params
+ )
+
+ # Post-scale retry check: Check if this sub-operation has been executed before
+ op_index = self._check_or_add_scale_suboperation(
+ db_nslcmop,
+ vnf_index,
+ vnf_config_primitive,
+ primitive_params,
+ "POST-SCALE",
+ )
+ if op_index == self.SUBOPERATION_STATUS_SKIP:
+ # Skip sub-operation
+ result = "COMPLETED"
+ result_detail = "Done"
+ self.logger.debug(
+ logging_text
+ + "vnf_config_primitive={} Skipped sub-operation, result {} {}".format(
+ vnf_config_primitive, result, result_detail
+ )
+ )
+ else:
+ if op_index == self.SUBOPERATION_STATUS_NEW:
+ # New sub-operation: Get index of this sub-operation
+ op_index = (
+ len(db_nslcmop.get("_admin", {}).get("operations"))
+ - 1
+ )
+ self.logger.debug(
+ logging_text
+ + "vnf_config_primitive={} New sub-operation".format(
+ vnf_config_primitive
+ )
+ )
+ else:
+ # retry: Get registered params for this existing sub-operation
+ op = db_nslcmop.get("_admin", {}).get("operations", [])[
+ op_index
+ ]
+ vnf_index = op.get("member_vnf_index")
+ vnf_config_primitive = op.get("primitive")
+ primitive_params = op.get("primitive_params")
+ self.logger.debug(
+ logging_text
+ + "vnf_config_primitive={} Sub-operation retry".format(
+ vnf_config_primitive
+ )
+ )
+ # Execute the primitive, either with new (first-time) or registered (reintent) args
+ ee_descriptor_id = config_primitive.get(
+ "execution-environment-ref"
+ )
+ primitive_name = config_primitive.get(
+ "execution-environment-primitive", vnf_config_primitive
+ )
+ ee_id, vca_type = self._look_for_deployed_vca(
+ nsr_deployed["VCA"],
+ member_vnf_index=vnf_index,
+ vdu_id=None,
+ vdu_count_index=None,
+ ee_descriptor_id=ee_descriptor_id,
+ )
+ result, result_detail = await self._ns_execute_primitive(
+ ee_id,
+ primitive_name,
+ primitive_params,
+ vca_type=vca_type,
+ vca_id=vca_id,
+ )
+ self.logger.debug(
+ logging_text
+ + "vnf_config_primitive={} Done with result {} {}".format(
+ vnf_config_primitive, result, result_detail
+ )
+ )
+ # Update operationState = COMPLETED | FAILED
+ self._update_suboperation_status(
+ db_nslcmop, op_index, result, result_detail
+ )
+
+ if result == "FAILED":
+ raise LcmException(result_detail)
+ db_nsr_update["config-status"] = old_config_status
+ scale_process = None
+ # POST-SCALE END
+
+ db_nsr_update[
+ "detailed-status"
+ ] = "" # "scaled {} {}".format(scaling_group, scaling_type)
+ db_nsr_update["operational-status"] = (
+ "running"
+ if old_operational_status == "failed"
+ else old_operational_status
+ )
+ db_nsr_update["config-status"] = old_config_status
+ return
+ except (
+ ROclient.ROClientException,
+ DbException,
+ LcmException,
+ NgRoException,
+ ) as e:
+ self.logger.error(logging_text + "Exit Exception {}".format(e))
+ exc = e
+ except asyncio.CancelledError:
+ self.logger.error(
+ logging_text + "Cancelled Exception while '{}'".format(step)
+ )
+ exc = "Operation was cancelled"
+ except Exception as e:
+ exc = traceback.format_exc()
+ self.logger.critical(
+ logging_text + "Exit Exception {} {}".format(type(e).__name__, e),
+ exc_info=True,
+ )
+ finally:
+ self._write_ns_status(
+ nsr_id=nsr_id,
+ ns_state=None,
+ current_operation="IDLE",
+ current_operation_id=None,
+ )
+ if tasks_dict_info:
+ stage[1] = "Waiting for instantiate pending tasks."
+ self.logger.debug(logging_text + stage[1])
+ exc = await self._wait_for_tasks(
+ logging_text,
+ tasks_dict_info,
+ self.timeout.ns_deploy,
+ stage,
+ nslcmop_id,
+ nsr_id=nsr_id,
+ )
+ if exc:
+ db_nslcmop_update[
+ "detailed-status"
+ ] = error_description_nslcmop = "FAILED {}: {}".format(step, exc)
+ nslcmop_operation_state = "FAILED"
+ if db_nsr:
+ db_nsr_update["operational-status"] = old_operational_status
+ db_nsr_update["config-status"] = old_config_status
+ db_nsr_update["detailed-status"] = ""
+ if scale_process:
+ if "VCA" in scale_process:
+ db_nsr_update["config-status"] = "failed"
+ if "RO" in scale_process:
+ db_nsr_update["operational-status"] = "failed"
+ db_nsr_update[
+ "detailed-status"
+ ] = "FAILED scaling nslcmop={} {}: {}".format(
+ nslcmop_id, step, exc
+ )
+ else:
+ error_description_nslcmop = None
+ nslcmop_operation_state = "COMPLETED"
+ db_nslcmop_update["detailed-status"] = "Done"
+
+ self._write_op_status(
+ op_id=nslcmop_id,
+ stage="",
+ error_message=error_description_nslcmop,
+ operation_state=nslcmop_operation_state,
+ other_update=db_nslcmop_update,
+ )
+ if db_nsr:
+ self._write_ns_status(
+ nsr_id=nsr_id,
+ ns_state=None,
+ current_operation="IDLE",
+ current_operation_id=None,
+ other_update=db_nsr_update,
+ )
+
+ if nslcmop_operation_state:
+ try:
+ msg = {
+ "nsr_id": nsr_id,
+ "nslcmop_id": nslcmop_id,
+ "operationState": nslcmop_operation_state,
+ }
+ await self.msg.aiowrite("ns", "scaled", msg)
+ except Exception as e:
+ self.logger.error(
+ logging_text + "kafka_write notification Exception {}".format(e)
+ )
+ self.logger.debug(logging_text + "Exit")
+ self.lcm_tasks.remove("ns", nsr_id, nslcmop_id, "ns_scale")
+
+ async def _scale_kdu(
+ self, logging_text, nsr_id, nsr_deployed, db_vnfd, vca_id, scaling_info
+ ):
+ _scaling_info = scaling_info.get("kdu-create") or scaling_info.get("kdu-delete")
+ for kdu_name in _scaling_info:
+ for kdu_scaling_info in _scaling_info[kdu_name]:
+ deployed_kdu, index = get_deployed_kdu(
+ nsr_deployed, kdu_name, kdu_scaling_info["member-vnf-index"]
+ )
+ cluster_uuid = deployed_kdu["k8scluster-uuid"]
+ kdu_instance = deployed_kdu["kdu-instance"]
+ kdu_model = deployed_kdu.get("kdu-model")
+ scale = int(kdu_scaling_info["scale"])
+ k8s_cluster_type = kdu_scaling_info["k8s-cluster-type"]
+
+ db_dict = {
+ "collection": "nsrs",
+ "filter": {"_id": nsr_id},
+ "path": "_admin.deployed.K8s.{}".format(index),
+ }
+
+ step = "scaling application {}".format(
+ kdu_scaling_info["resource-name"]
+ )
+ self.logger.debug(logging_text + step)
+
+ if kdu_scaling_info["type"] == "delete":
+ kdu_config = get_configuration(db_vnfd, kdu_name)
+ if (
+ kdu_config
+ and kdu_config.get("terminate-config-primitive")
+ and get_juju_ee_ref(db_vnfd, kdu_name) is None
+ ):
+ terminate_config_primitive_list = kdu_config.get(
+ "terminate-config-primitive"
+ )
+ terminate_config_primitive_list.sort(
+ key=lambda val: int(val["seq"])
+ )
+
+ for (
+ terminate_config_primitive
+ ) in terminate_config_primitive_list:
+ primitive_params_ = self._map_primitive_params(
+ terminate_config_primitive, {}, {}
+ )
+ step = "execute terminate config primitive"
+ self.logger.debug(logging_text + step)
+ await asyncio.wait_for(
+ self.k8scluster_map[k8s_cluster_type].exec_primitive(
+ cluster_uuid=cluster_uuid,
+ kdu_instance=kdu_instance,
+ primitive_name=terminate_config_primitive["name"],
+ params=primitive_params_,
+ db_dict=db_dict,
+ total_timeout=self.timeout.primitive,
+ vca_id=vca_id,
+ ),
+ timeout=self.timeout.primitive
+ * self.timeout.primitive_outer_factor,
+ )
+
+ await asyncio.wait_for(
+ self.k8scluster_map[k8s_cluster_type].scale(
+ kdu_instance=kdu_instance,
+ scale=scale,
+ resource_name=kdu_scaling_info["resource-name"],
+ total_timeout=self.timeout.scale_on_error,
+ vca_id=vca_id,
+ cluster_uuid=cluster_uuid,
+ kdu_model=kdu_model,
+ atomic=True,
+ db_dict=db_dict,
+ ),
+ timeout=self.timeout.scale_on_error
+ * self.timeout.scale_on_error_outer_factor,
+ )
+
+ if kdu_scaling_info["type"] == "create":
+ kdu_config = get_configuration(db_vnfd, kdu_name)
+ if (
+ kdu_config
+ and kdu_config.get("initial-config-primitive")
+ and get_juju_ee_ref(db_vnfd, kdu_name) is None
+ ):
+ initial_config_primitive_list = kdu_config.get(
+ "initial-config-primitive"
+ )
+ initial_config_primitive_list.sort(
+ key=lambda val: int(val["seq"])
+ )
+
+ for initial_config_primitive in initial_config_primitive_list:
+ primitive_params_ = self._map_primitive_params(
+ initial_config_primitive, {}, {}
+ )
+ step = "execute initial config primitive"
+ self.logger.debug(logging_text + step)
+ await asyncio.wait_for(
+ self.k8scluster_map[k8s_cluster_type].exec_primitive(
+ cluster_uuid=cluster_uuid,
+ kdu_instance=kdu_instance,
+ primitive_name=initial_config_primitive["name"],
+ params=primitive_params_,
+ db_dict=db_dict,
+ vca_id=vca_id,
+ ),
+ timeout=600,
+ )
+
+ async def _scale_ng_ro(
+ self, logging_text, db_nsr, db_nslcmop, db_vnfr, vdu_scaling_info, stage
+ ):
+ nsr_id = db_nslcmop["nsInstanceId"]
+ db_nsd = self.db.get_one("nsds", {"_id": db_nsr["nsd-id"]})
+ db_vnfrs = {}
+
+ # read from db: vnfd's for every vnf
+ db_vnfds = []
+
+ # for each vnf in ns, read vnfd
+ for vnfr in self.db.get_list("vnfrs", {"nsr-id-ref": nsr_id}):
+ db_vnfrs[vnfr["member-vnf-index-ref"]] = vnfr
+ vnfd_id = vnfr["vnfd-id"] # vnfd uuid for this vnf
+ # if we haven't this vnfd, read it from db
+ if not find_in_list(db_vnfds, lambda a_vnfd: a_vnfd["id"] == vnfd_id):
+ # read from db
+ vnfd = self.db.get_one("vnfds", {"_id": vnfd_id})
+ db_vnfds.append(vnfd)
+ n2vc_key = self.n2vc.get_public_key()
+ n2vc_key_list = [n2vc_key]
+ self.scale_vnfr(
+ db_vnfr,
+ vdu_scaling_info.get("vdu-create"),
+ vdu_scaling_info.get("vdu-delete"),
+ mark_delete=True,
+ )
+ # db_vnfr has been updated, update db_vnfrs to use it
+ db_vnfrs[db_vnfr["member-vnf-index-ref"]] = db_vnfr
+ await self._instantiate_ng_ro(
+ logging_text,
+ nsr_id,
+ db_nsd,
+ db_nsr,
+ db_nslcmop,
+ db_vnfrs,
+ db_vnfds,
+ n2vc_key_list,
+ stage=stage,
+ start_deploy=time(),
+ timeout_ns_deploy=self.timeout.ns_deploy,
+ )
+ if vdu_scaling_info.get("vdu-delete"):
+ self.scale_vnfr(
+ db_vnfr, None, vdu_scaling_info["vdu-delete"], mark_delete=False
+ )
+
+ async def extract_prometheus_scrape_jobs(
+ self,
+ ee_id: str,
+ artifact_path: str,
+ ee_config_descriptor: dict,
+ vnfr_id: str,
+ nsr_id: str,
+ target_ip: str,
+ element_type: str,
+ vnf_member_index: str = "",
+ vdu_id: str = "",
+ vdu_index: int = None,
+ kdu_name: str = "",
+ kdu_index: int = None,
+ ) -> dict:
+ """Method to extract prometheus scrape jobs from EE's Prometheus template job file
+ This method will wait until the corresponding VDU or KDU is fully instantiated
+
+ Args:
+ ee_id (str): Execution Environment ID
+ artifact_path (str): Path where the EE's content is (including the Prometheus template file)
+ ee_config_descriptor (dict): Execution Environment's configuration descriptor
+ vnfr_id (str): VNFR ID where this EE applies
+ nsr_id (str): NSR ID where this EE applies
+ target_ip (str): VDU/KDU instance IP address
+ element_type (str): NS or VNF or VDU or KDU
+ vnf_member_index (str, optional): VNF index where this EE applies. Defaults to "".
+ vdu_id (str, optional): VDU ID where this EE applies. Defaults to "".
+ vdu_index (int, optional): VDU index where this EE applies. Defaults to None.
+ kdu_name (str, optional): KDU name where this EE applies. Defaults to "".
+ kdu_index (int, optional): KDU index where this EE applies. Defaults to None.
+
+ Raises:
+ LcmException: When the VDU or KDU instance was not found in an hour
+
+ Returns:
+ _type_: Prometheus jobs
+ """
+ # default the vdur and kdur names to an empty string, to avoid any later
+ # problem with Prometheus when the element type is not VDU or KDU
+ vdur_name = ""
+ kdur_name = ""
+
+ # look if exist a file called 'prometheus*.j2' and
+ artifact_content = self.fs.dir_ls(artifact_path)
+ job_file = next(
+ (
+ f
+ for f in artifact_content
+ if f.startswith("prometheus") and f.endswith(".j2")
+ ),
+ None,
+ )
+ if not job_file:
+ return
+ self.logger.debug("Artifact path{}".format(artifact_path))
+ self.logger.debug("job file{}".format(job_file))
+ with self.fs.file_open((artifact_path, job_file), "r") as f:
+ job_data = f.read()
+
+ # obtain the VDUR or KDUR, if the element type is VDU or KDU
+ if element_type in ("VDU", "KDU"):
+ for _ in range(360):
+ db_vnfr = self.db.get_one("vnfrs", {"_id": vnfr_id})
+ if vdu_id and vdu_index is not None:
+ vdur = next(
+ (
+ x
+ for x in get_iterable(db_vnfr, "vdur")
+ if (
+ x.get("vdu-id-ref") == vdu_id
+ and x.get("count-index") == vdu_index
+ )
+ ),
+ {},
+ )
+ if vdur.get("name"):
+ vdur_name = vdur.get("name")
+ break
+ if kdu_name and kdu_index is not None:
+ kdur = next(
+ (
+ x
+ for x in get_iterable(db_vnfr, "kdur")
+ if (
+ x.get("kdu-name") == kdu_name
+ and x.get("count-index") == kdu_index
+ )
+ ),
+ {},
+ )
+ if kdur.get("name"):
+ kdur_name = kdur.get("name")
+ break
+
+ await asyncio.sleep(10)
+ else:
+ if vdu_id and vdu_index is not None:
+ raise LcmException(
+ f"Timeout waiting VDU with name={vdu_id} and index={vdu_index} to be intantiated"
+ )
+ if kdu_name and kdu_index is not None:
+ raise LcmException(
+ f"Timeout waiting KDU with name={kdu_name} and index={kdu_index} to be intantiated"
+ )
+
+ if ee_id is not None:
+ _, namespace, helm_id = get_ee_id_parts(
+ ee_id
+ ) # get namespace and EE gRPC service name
+ host_name = f'{helm_id}-{ee_config_descriptor["metric-service"]}.{namespace}.svc' # svc_name.namespace.svc
+ host_port = "80"
+ vnfr_id = vnfr_id.replace("-", "")
+ variables = {
+ "JOB_NAME": vnfr_id,
+ "TARGET_IP": target_ip,
+ "EXPORTER_POD_IP": host_name,
+ "EXPORTER_POD_PORT": host_port,
+ "NSR_ID": nsr_id,
+ "VNF_MEMBER_INDEX": vnf_member_index,
+ "VDUR_NAME": vdur_name,
+ "KDUR_NAME": kdur_name,
+ "ELEMENT_TYPE": element_type,
+ }
+ else:
+ metric_path = ee_config_descriptor["metric-path"]
+ target_port = ee_config_descriptor["metric-port"]
+ vnfr_id = vnfr_id.replace("-", "")
+ variables = {
+ "JOB_NAME": vnfr_id,
+ "TARGET_IP": target_ip,
+ "TARGET_PORT": target_port,
+ "METRIC_PATH": metric_path,
+ }
+
+ job_list = parse_job(job_data, variables)
+ # ensure job_name is using the vnfr_id. Adding the metadata nsr_id