+
    async def _scale_kdu(
        self, logging_text, nsr_id, nsr_deployed, db_vnfd, vca_id, scaling_info
    ):
        """Scale KDU instances (in or out), running config primitives when defined.

        For every KDU listed in the scaling plan: locate its deployment record,
        optionally execute terminate/initial config primitives (only when the KDU
        has no juju execution environment), and call the K8s connector ``scale``.

        :param logging_text: prefix prepended to every log message of this task
        :param nsr_id: NS record id, used to build the DB status path
        :param nsr_deployed: ``_admin.deployed`` section of the NS record
        :param db_vnfd: VNF descriptor owning the KDUs being scaled
        :param vca_id: VCA id forwarded to the K8s connector calls
        :param scaling_info: dict holding a "kdu-create" or "kdu-delete" plan,
            mapping kdu_name -> list of per-instance scaling dicts
        """
        # NOTE(review): if both "kdu-create" and "kdu-delete" are present, only
        # "kdu-create" is processed — confirm callers never send both at once.
        _scaling_info = scaling_info.get("kdu-create") or scaling_info.get("kdu-delete")
        for kdu_name in _scaling_info:
            for kdu_scaling_info in _scaling_info[kdu_name]:
                # Resolve the deployed KDU entry and its index inside _admin.deployed.K8s
                deployed_kdu, index = get_deployed_kdu(
                    nsr_deployed, kdu_name, kdu_scaling_info["member-vnf-index"]
                )
                cluster_uuid = deployed_kdu["k8scluster-uuid"]
                kdu_instance = deployed_kdu["kdu-instance"]
                kdu_model = deployed_kdu.get("kdu-model")
                scale = int(kdu_scaling_info["scale"])
                k8s_cluster_type = kdu_scaling_info["k8s-cluster-type"]

                # DB location where the connector reports operation status
                db_dict = {
                    "collection": "nsrs",
                    "filter": {"_id": nsr_id},
                    "path": "_admin.deployed.K8s.{}".format(index),
                }

                step = "scaling application {}".format(
                    kdu_scaling_info["resource-name"]
                )
                self.logger.debug(logging_text + step)

                if kdu_scaling_info["type"] == "delete":
                    kdu_config = get_configuration(db_vnfd, kdu_name)
                    # Run terminate primitives only when there is no juju EE
                    # that would handle them instead.
                    if (
                        kdu_config
                        and kdu_config.get("terminate-config-primitive")
                        and get_juju_ee_ref(db_vnfd, kdu_name) is None
                    ):
                        terminate_config_primitive_list = kdu_config.get(
                            "terminate-config-primitive"
                        )
                        # Primitives must run in ascending "seq" order
                        terminate_config_primitive_list.sort(
                            key=lambda val: int(val["seq"])
                        )

                        for (
                            terminate_config_primitive
                        ) in terminate_config_primitive_list:
                            primitive_params_ = self._map_primitive_params(
                                terminate_config_primitive, {}, {}
                            )
                            step = "execute terminate config primitive"
                            self.logger.debug(logging_text + step)
                            # 600 s hard timeout per primitive execution
                            await asyncio.wait_for(
                                self.k8scluster_map[k8s_cluster_type].exec_primitive(
                                    cluster_uuid=cluster_uuid,
                                    kdu_instance=kdu_instance,
                                    primitive_name=terminate_config_primitive["name"],
                                    params=primitive_params_,
                                    db_dict=db_dict,
                                    vca_id=vca_id,
                                ),
                                timeout=600,
                            )

                # Actual scale call (runs for both create and delete plans)
                await asyncio.wait_for(
                    self.k8scluster_map[k8s_cluster_type].scale(
                        kdu_instance,
                        scale,
                        kdu_scaling_info["resource-name"],
                        vca_id=vca_id,
                        cluster_uuid=cluster_uuid,
                        kdu_model=kdu_model,
                        atomic=True,
                        db_dict=db_dict,
                    ),
                    timeout=self.timeout_vca_on_error,
                )

                if kdu_scaling_info["type"] == "create":
                    kdu_config = get_configuration(db_vnfd, kdu_name)
                    # Run initial primitives only when there is no juju EE
                    # that would handle them instead.
                    if (
                        kdu_config
                        and kdu_config.get("initial-config-primitive")
                        and get_juju_ee_ref(db_vnfd, kdu_name) is None
                    ):
                        initial_config_primitive_list = kdu_config.get(
                            "initial-config-primitive"
                        )
                        # Primitives must run in ascending "seq" order
                        initial_config_primitive_list.sort(
                            key=lambda val: int(val["seq"])
                        )

                        for initial_config_primitive in initial_config_primitive_list:
                            primitive_params_ = self._map_primitive_params(
                                initial_config_primitive, {}, {}
                            )
                            step = "execute initial config primitive"
                            self.logger.debug(logging_text + step)
                            # 600 s hard timeout per primitive execution
                            await asyncio.wait_for(
                                self.k8scluster_map[k8s_cluster_type].exec_primitive(
                                    cluster_uuid=cluster_uuid,
                                    kdu_instance=kdu_instance,
                                    primitive_name=initial_config_primitive["name"],
                                    params=primitive_params_,
                                    db_dict=db_dict,
                                    vca_id=vca_id,
                                ),
                                timeout=600,
                            )
+
+ async def _scale_ng_ro(
+ self, logging_text, db_nsr, db_nslcmop, db_vnfr, vdu_scaling_info, stage
+ ):
+ nsr_id = db_nslcmop["nsInstanceId"]
+ db_nsd = self.db.get_one("nsds", {"_id": db_nsr["nsd-id"]})
+ db_vnfrs = {}
+
+ # read from db: vnfd's for every vnf
+ db_vnfds = []
+
+ # for each vnf in ns, read vnfd
+ for vnfr in self.db.get_list("vnfrs", {"nsr-id-ref": nsr_id}):
+ db_vnfrs[vnfr["member-vnf-index-ref"]] = vnfr
+ vnfd_id = vnfr["vnfd-id"] # vnfd uuid for this vnf
+ # if we haven't this vnfd, read it from db
+ if not find_in_list(db_vnfds, lambda a_vnfd: a_vnfd["id"] == vnfd_id):
+ # read from db
+ vnfd = self.db.get_one("vnfds", {"_id": vnfd_id})
+ db_vnfds.append(vnfd)
+ n2vc_key = self.n2vc.get_public_key()
+ n2vc_key_list = [n2vc_key]
+ self.scale_vnfr(
+ db_vnfr,
+ vdu_scaling_info.get("vdu-create"),
+ vdu_scaling_info.get("vdu-delete"),
+ mark_delete=True,
+ )
+ # db_vnfr has been updated, update db_vnfrs to use it
+ db_vnfrs[db_vnfr["member-vnf-index-ref"]] = db_vnfr
+ await self._instantiate_ng_ro(
+ logging_text,
+ nsr_id,
+ db_nsd,
+ db_nsr,
+ db_nslcmop,
+ db_vnfrs,
+ db_vnfds,
+ n2vc_key_list,
+ stage=stage,
+ start_deploy=time(),
+ timeout_ns_deploy=self.timeout_ns_deploy,
+ )
+ if vdu_scaling_info.get("vdu-delete"):
+ self.scale_vnfr(
+ db_vnfr, None, vdu_scaling_info["vdu-delete"], mark_delete=False
+ )
+
+ async def extract_prometheus_scrape_jobs(
+ self, ee_id, artifact_path, ee_config_descriptor, vnfr_id, nsr_id, target_ip
+ ):
+ # look if exist a file called 'prometheus*.j2' and
+ artifact_content = self.fs.dir_ls(artifact_path)
+ job_file = next(
+ (
+ f
+ for f in artifact_content
+ if f.startswith("prometheus") and f.endswith(".j2")
+ ),
+ None,
+ )
+ if not job_file:
+ return
+ with self.fs.file_open((artifact_path, job_file), "r") as f:
+ job_data = f.read()
+
+ # TODO get_service
+ _, _, service = ee_id.partition(".") # remove prefix "namespace."
+ host_name = "{}-{}".format(service, ee_config_descriptor["metric-service"])
+ host_port = "80"
+ vnfr_id = vnfr_id.replace("-", "")
+ variables = {
+ "JOB_NAME": vnfr_id,
+ "TARGET_IP": target_ip,
+ "EXPORTER_POD_IP": host_name,
+ "EXPORTER_POD_PORT": host_port,
+ }
+ job_list = parse_job(job_data, variables)
+ # ensure job_name is using the vnfr_id. Adding the metadata nsr_id
+ for job in job_list:
+ if (
+ not isinstance(job.get("job_name"), str)
+ or vnfr_id not in job["job_name"]
+ ):
+ job["job_name"] = vnfr_id + "_" + str(randint(1, 10000))
+ job["nsr_id"] = nsr_id
+ job["vnfr_id"] = vnfr_id
+ return job_list
+
+ def get_vca_cloud_and_credentials(self, vim_account_id: str) -> (str, str):
+ """
+ Get VCA Cloud and VCA Cloud Credentials for the VIM account
+
+ :param: vim_account_id: VIM Account ID
+
+ :return: (cloud_name, cloud_credential)
+ """
+ config = VimAccountDB.get_vim_account_with_id(vim_account_id).get("config", {})
+ return config.get("vca_cloud"), config.get("vca_cloud_credential")
+
+ def get_vca_k8s_cloud_and_credentials(self, vim_account_id: str) -> (str, str):
+ """
+ Get VCA K8s Cloud and VCA K8s Cloud Credentials for the VIM account
+
+ :param: vim_account_id: VIM Account ID
+
+ :return: (cloud_name, cloud_credential)
+ """
+ config = VimAccountDB.get_vim_account_with_id(vim_account_id).get("config", {})
+ return config.get("vca_k8s_cloud"), config.get("vca_k8s_cloud_credential")
+
    async def migrate(self, nsr_id, nslcmop_id):
        """
        Migrate VNFs and VDUs instances in a NS

        Locks the HA task, waits for previous operations, forwards the
        operationParams to the NG-RO migrate endpoint and waits for the RO
        action to finish; always restores the NS to IDLE, records the
        operation result and publishes a "migrated" kafka message.

        :param: nsr_id: NS Instance ID
        :param: nslcmop_id: nslcmop ID of migrate

        """
        # Try to lock HA task here; if another LCM instance owns it, do nothing.
        task_is_locked_by_me = self.lcm_tasks.lock_HA("ns", "nslcmops", nslcmop_id)
        if not task_is_locked_by_me:
            return
        logging_text = "Task ns={} migrate ".format(nsr_id)
        self.logger.debug(logging_text + "Enter")
        # get all needed from database
        db_nslcmop = None
        db_nslcmop_update = {}
        nslcmop_operation_state = None
        db_nsr_update = {}
        target = {}
        exc = None
        # in case of error, indicates what part of scale was failed to put nsr at error status
        start_deploy = time()

        try:
            # wait for any previous tasks in process
            step = "Waiting for previous operations to terminate"
            await self.lcm_tasks.waitfor_related_HA('ns', 'nslcmops', nslcmop_id)

            self._write_ns_status(
                nsr_id=nsr_id,
                ns_state=None,
                current_operation="MIGRATING",
                current_operation_id=nslcmop_id
            )
            step = "Getting nslcmop from database"
            self.logger.debug(step + " after having waited for previous tasks to be completed")
            db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})
            migrate_params = db_nslcmop.get("operationParams")

            # NOTE(review): target was already initialized above; this second
            # "target = {}" is redundant — confirm and drop one of them.
            target = {}
            target.update(migrate_params)
            desc = await self.RO.migrate(nsr_id, target)
            self.logger.debug("RO return > {}".format(desc))
            action_id = desc["action_id"]
            # Poll the RO until the migrate action completes or times out.
            await self._wait_ng_ro(
                nsr_id, action_id, nslcmop_id, start_deploy, self.timeout_migrate
            )
        except (ROclient.ROClientException, DbException, LcmException) as e:
            self.logger.error("Exit Exception {}".format(e))
            exc = e
        except asyncio.CancelledError:
            self.logger.error("Cancelled Exception while '{}'".format(step))
            exc = "Operation was cancelled"
        except Exception as e:
            exc = traceback.format_exc()
            self.logger.critical("Exit Exception {} {}".format(type(e).__name__, e), exc_info=True)
        finally:
            # Always return the NS to IDLE, whatever the outcome above.
            self._write_ns_status(
                nsr_id=nsr_id,
                ns_state=None,
                current_operation="IDLE",
                current_operation_id=None,
            )
            if exc:
                # "step" still names the stage that was running when it failed.
                db_nslcmop_update[
                    "detailed-status"
                ] = "FAILED {}: {}".format(step, exc)
                nslcmop_operation_state = "FAILED"
            else:
                nslcmop_operation_state = "COMPLETED"
                db_nslcmop_update["detailed-status"] = "Done"
                db_nsr_update["detailed-status"] = "Done"

            self._write_op_status(
                op_id=nslcmop_id,
                stage="",
                error_message="",
                operation_state=nslcmop_operation_state,
                other_update=db_nslcmop_update,
            )
            if nslcmop_operation_state:
                # Best-effort kafka notification; a publish failure is only logged.
                try:
                    msg = {
                        "nsr_id": nsr_id,
                        "nslcmop_id": nslcmop_id,
                        "operationState": nslcmop_operation_state,
                    }
                    await self.msg.aiowrite("ns", "migrated", msg, loop=self.loop)
                except Exception as e:
                    self.logger.error(
                        logging_text + "kafka_write notification Exception {}".format(e)
                    )
            self.logger.debug(logging_text + "Exit")
            self.lcm_tasks.remove("ns", nsr_id, nslcmop_id, "ns_migrate")