Bug 1875 - Scale of KDUs fails when the KDU is instantiated with additional params
[osm/LCM.git] / osm_lcm / ns.py
index 2184ce6..8ec39ad 100644 (file)
@@ -96,6 +96,7 @@ from n2vc.n2vc_juju_conn import N2VCJujuConnector
 from n2vc.exceptions import N2VCException, N2VCNotFound, K8sException
 
 from osm_lcm.lcm_helm_conn import LCMHelmConn
+from osm_lcm.prometheus import parse_job
 
 from copy import copy, deepcopy
 from time import time
@@ -123,7 +124,7 @@ class NsLcm(LcmBase):
     SUBOPERATION_STATUS_SKIP = -3
     task_name_deploy_vca = "Deploying VCA"
 
-    def __init__(self, msg, lcm_tasks, config, loop, prometheus=None):
+    def __init__(self, msg, lcm_tasks, config, loop):
         """
         Init, Connect to database, filesystem storage, and messaging
         :param config: two level dictionary with configuration. Top level should contain 'database', 'storage',
@@ -200,8 +201,6 @@ class NsLcm(LcmBase):
             "helm-v3": self.conn_helm_ee,
         }
 
-        self.prometheus = prometheus
-
         # create RO client
         self.RO = NgRoClient(self.loop, **self.ro_config)
 
@@ -540,7 +539,7 @@ class NsLcm(LcmBase):
                     vdur_copy = deepcopy(vdur)
                     vdur_copy["status"] = "BUILD"
                     vdur_copy["status-detailed"] = None
-                    vdur_copy["ip-address"]: None
+                    vdur_copy["ip-address"] = None
                     vdur_copy["_id"] = str(uuid4())
                     vdur_copy["count-index"] += count + 1
                     vdur_copy["id"] = "{}-{}".format(
@@ -1987,7 +1986,7 @@ class NsLcm(LcmBase):
 
             # STEP 7 Configure metrics
             if vca_type == "helm" or vca_type == "helm-v3":
-                prometheus_jobs = await self.add_prometheus_metrics(
+                prometheus_jobs = await self.extract_prometheus_scrape_jobs(
                     ee_id=ee_id,
                     artifact_path=artifact_path,
                     ee_config_descriptor=ee_config_descriptor,
@@ -2002,6 +2001,17 @@ class NsLcm(LcmBase):
                         {db_update_entry + "prometheus_jobs": prometheus_jobs},
                     )
 
+                    for job in prometheus_jobs:
+                        self.db.set_one(
+                            "prometheus_jobs",
+                            {
+                                "job_name": job["job_name"]
+                            },
+                            job,
+                            upsert=True,
+                            fail_on_empty=False
+                        )
+
             step = "instantiated at VCA"
             self.logger.debug(logging_text + step)
 
@@ -2266,6 +2276,10 @@ class NsLcm(LcmBase):
             # read from db: operation
             stage[1] = "Getting nslcmop={} from db.".format(nslcmop_id)
             db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})
+            if db_nslcmop["operationParams"].get("additionalParamsForVnf"):
+                db_nslcmop["operationParams"]["additionalParamsForVnf"] = json.loads(
+                    db_nslcmop["operationParams"]["additionalParamsForVnf"]
+                )
             ns_params = db_nslcmop.get("operationParams")
             if ns_params and ns_params.get("timeout_ns_deploy"):
                 timeout_ns_deploy = ns_params["timeout_ns_deploy"]
@@ -2276,8 +2290,10 @@ class NsLcm(LcmBase):
 
             # read from db: ns
             stage[1] = "Getting nsr={} from db.".format(nsr_id)
+            self.logger.debug(logging_text + stage[1])
             db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
             stage[1] = "Getting nsd={} from db.".format(db_nsr["nsd-id"])
+            self.logger.debug(logging_text + stage[1])
             nsd = self.db.get_one("nsds", {"_id": db_nsr["nsd-id"]})
             self.fs.sync(db_nsr["nsd-id"])
             db_nsr["nsd"] = nsd
@@ -2293,6 +2309,14 @@ class NsLcm(LcmBase):
 
             # for each vnf in ns, read vnfd
             for vnfr in db_vnfrs_list:
+                if vnfr.get("kdur"):
+                    kdur_list = []
+                    for kdur in vnfr["kdur"]:
+                        if kdur.get("additionalParams"):
+                            kdur["additionalParams"] = json.loads(kdur["additionalParams"])
+                        kdur_list.append(kdur)
+                    vnfr["kdur"] = kdur_list
+
                 db_vnfrs[vnfr["member-vnf-index-ref"]] = vnfr
                 vnfd_id = vnfr["vnfd-id"]
                 vnfd_ref = vnfr["vnfd-ref"]
@@ -3946,8 +3970,9 @@ class NsLcm(LcmBase):
                     "nsrs", db_nslcmop["nsInstanceId"], {db_update_entry: False}
                 )
 
-        if vca_deployed.get("prometheus_jobs") and self.prometheus:
-            await self.prometheus.update(remove_jobs=vca_deployed["prometheus_jobs"])
+        # Delete Prometheus Jobs if any
+        # This uses NSR_ID, so it will destroy any jobs under this index
+        self.db.del_list("prometheus_jobs", {"nsr_id": db_nslcmop["nsInstanceId"]})
 
         if destroy_ee:
             await self.vca_map[vca_type].delete_execution_environment(
@@ -4814,6 +4839,10 @@ class NsLcm(LcmBase):
             step = "Getting information from database"
             db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})
             db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
+            if db_nslcmop["operationParams"].get("primitive_params"):
+                db_nslcmop["operationParams"]["primitive_params"] = json.loads(
+                    db_nslcmop["operationParams"]["primitive_params"]
+                )
 
             nsr_deployed = db_nsr["_admin"].get("deployed")
             vnf_index = db_nslcmop["operationParams"].get("member_vnf_index")
@@ -4831,6 +4860,13 @@ class NsLcm(LcmBase):
                 db_vnfr = self.db.get_one(
                     "vnfrs", {"member-vnf-index-ref": vnf_index, "nsr-id-ref": nsr_id}
                 )
+                if db_vnfr.get("kdur"):
+                    kdur_list = []
+                    for kdur in db_vnfr["kdur"]:
+                        if kdur.get("additionalParams"):
+                            kdur["additionalParams"] = json.loads(kdur["additionalParams"])
+                        kdur_list.append(kdur)
+                    db_vnfr["kdur"] = kdur_list
                 step = "Getting vnfd from database"
                 db_vnfd = self.db.get_one("vnfds", {"_id": db_vnfr["vnfd-id"]})
 
@@ -5320,7 +5356,7 @@ class NsLcm(LcmBase):
                     for kdu_delta in delta.get("kdu-resource-delta", {}):
                         kdu_profile = get_kdu_resource_profile(db_vnfd, kdu_delta["id"])
                         kdu_name = kdu_profile["kdu-name"]
-                        resource_name = kdu_profile["resource-name"]
+                        resource_name = kdu_profile.get("resource-name", "")
 
                         # Might have different kdus in the same delta
                         # Should have list for each kdu
@@ -5336,7 +5372,6 @@ class NsLcm(LcmBase):
                                 and kdur.get("helm-version") == "v2"
                             ):
                                 k8s_cluster_type = "helm-chart"
-                            raise NotImplementedError
                         elif kdur.get("juju-bundle"):
                             k8s_cluster_type = "juju-bundle"
                         else:
@@ -5366,7 +5401,13 @@ class NsLcm(LcmBase):
                         kdu_instance = deployed_kdu.get("kdu-instance")
                         instance_num = await self.k8scluster_map[
                             k8s_cluster_type
-                        ].get_scale_count(resource_name, kdu_instance, vca_id=vca_id)
+                        ].get_scale_count(
+                            resource_name,
+                            kdu_instance,
+                            vca_id=vca_id,
+                            cluster_uuid=deployed_kdu.get("k8scluster-uuid"),
+                            kdu_model=deployed_kdu.get("kdu-model"),
+                        )
                         kdu_replica_count = instance_num + kdu_delta.get(
                             "number-of-instances", 1
                         )
@@ -5450,7 +5491,7 @@ class NsLcm(LcmBase):
                     for kdu_delta in delta.get("kdu-resource-delta", {}):
                         kdu_profile = get_kdu_resource_profile(db_vnfd, kdu_delta["id"])
                         kdu_name = kdu_profile["kdu-name"]
-                        resource_name = kdu_profile["resource-name"]
+                        resource_name = kdu_profile.get("resource-name", "")
 
                         if not scaling_info["kdu-delete"].get(kdu_name, None):
                             scaling_info["kdu-delete"][kdu_name] = []
@@ -5464,7 +5505,6 @@ class NsLcm(LcmBase):
                                 and kdur.get("helm-version") == "v2"
                             ):
                                 k8s_cluster_type = "helm-chart"
-                            raise NotImplementedError
                         elif kdur.get("juju-bundle"):
                             k8s_cluster_type = "juju-bundle"
                         else:
@@ -5492,7 +5532,13 @@ class NsLcm(LcmBase):
                         kdu_instance = deployed_kdu.get("kdu-instance")
                         instance_num = await self.k8scluster_map[
                             k8s_cluster_type
-                        ].get_scale_count(resource_name, kdu_instance, vca_id=vca_id)
+                        ].get_scale_count(
+                            resource_name,
+                            kdu_instance,
+                            vca_id=vca_id,
+                            cluster_uuid=deployed_kdu.get("k8scluster-uuid"),
+                            kdu_model=deployed_kdu.get("kdu-model"),
+                        )
                         kdu_replica_count = instance_num - kdu_delta.get(
                             "number-of-instances", 1
                         )
@@ -5691,7 +5737,7 @@ class NsLcm(LcmBase):
                 ] = "Deleting the execution environments"
                 scale_process = "VCA"
                 for vca_info in vca_scaling_info:
-                    if vca_info["type"] == "delete":
+                    if vca_info["type"] == "delete" and not vca_info.get("osm_kdu_id"):
                         member_vnf_index = str(vca_info["member-vnf-index"])
                         self.logger.debug(
                             logging_text + "vdu info: {}".format(vca_info)
@@ -5704,14 +5750,6 @@ class NsLcm(LcmBase):
                             ] = "Scaling member_vnf_index={}, vdu_id={}, vdu_index={} ".format(
                                 member_vnf_index, vdu_id, vdu_index
                             )
-                        else:
-                            vdu_index = 0
-                            kdu_id = vca_info["osm_kdu_id"]
-                            stage[
-                                1
-                            ] = "Scaling member_vnf_index={}, kdu_id={}, vdu_index={} ".format(
-                                member_vnf_index, kdu_id, vdu_index
-                            )
                         stage[2] = step = "Scaling in VCA"
                         self._write_op_status(op_id=nslcmop_id, stage=stage)
                         vca_update = db_nsr["_admin"]["deployed"]["VCA"]
@@ -5827,7 +5865,7 @@ class NsLcm(LcmBase):
                 ] = "Creating new execution environments"
                 scale_process = "VCA"
                 for vca_info in vca_scaling_info:
-                    if vca_info["type"] == "create":
+                    if vca_info["type"] == "create" and not vca_info.get("osm_kdu_id"):
                         member_vnf_index = str(vca_info["member-vnf-index"])
                         self.logger.debug(
                             logging_text + "vdu info: {}".format(vca_info)
@@ -5915,43 +5953,6 @@ class NsLcm(LcmBase):
                                     task_instantiation_info=tasks_dict_info,
                                     stage=stage,
                                 )
-                        else:
-                            kdu_name = vca_info["osm_kdu_id"]
-                            descriptor_config = get_configuration(db_vnfd, kdu_name)
-                            if descriptor_config:
-                                vdu_id = None
-                                kdu_index = int(vca_info["kdu_index"])
-                                vdu_name = None
-                                kdur = next(
-                                    x
-                                    for x in db_vnfr["kdur"]
-                                    if x["kdu-name"] == kdu_name
-                                )
-                                deploy_params_kdu = {"OSM": get_osm_params(db_vnfr)}
-                                if kdur.get("additionalParams"):
-                                    deploy_params_kdu = parse_yaml_strings(
-                                        kdur["additionalParams"]
-                                    )
-
-                                self._deploy_n2vc(
-                                    logging_text=logging_text,
-                                    db_nsr=db_nsr,
-                                    db_vnfr=db_vnfr,
-                                    nslcmop_id=nslcmop_id,
-                                    nsr_id=nsr_id,
-                                    nsi_id=nsi_id,
-                                    vnfd_id=vnfd_id,
-                                    vdu_id=vdu_id,
-                                    kdu_name=kdu_name,
-                                    member_vnf_index=member_vnf_index,
-                                    vdu_index=kdu_index,
-                                    vdu_name=vdu_name,
-                                    deploy_params=deploy_params_kdu,
-                                    descriptor_config=descriptor_config,
-                                    base_folder=base_folder,
-                                    task_instantiation_info=tasks_dict_info,
-                                    stage=stage,
-                                )
             # SCALE-UP VCA - END
             scale_process = None
 
@@ -6198,6 +6199,7 @@ class NsLcm(LcmBase):
                 )
                 cluster_uuid = deployed_kdu["k8scluster-uuid"]
                 kdu_instance = deployed_kdu["kdu-instance"]
+                kdu_model = deployed_kdu.get("kdu-model")
                 scale = int(kdu_scaling_info["scale"])
                 k8s_cluster_type = kdu_scaling_info["k8s-cluster-type"]
 
@@ -6252,6 +6254,10 @@ class NsLcm(LcmBase):
                         scale,
                         kdu_scaling_info["resource-name"],
                         vca_id=vca_id,
+                        cluster_uuid=cluster_uuid,
+                        kdu_model=kdu_model,
+                        atomic=True,
+                        db_dict=db_dict,
                     ),
                     timeout=self.timeout_vca_on_error,
                 )
@@ -6335,11 +6341,15 @@ class NsLcm(LcmBase):
                 db_vnfr, None, vdu_scaling_info["vdu-delete"], mark_delete=False
             )
 
-    async def add_prometheus_metrics(
-        self, ee_id, artifact_path, ee_config_descriptor, vnfr_id, nsr_id, target_ip
+    async def extract_prometheus_scrape_jobs(
+        self,
+        ee_id,
+        artifact_path,
+        ee_config_descriptor,
+        vnfr_id,
+        nsr_id,
+        target_ip
     ):
-        if not self.prometheus:
-            return
         # look if exist a file called 'prometheus*.j2' and
         artifact_content = self.fs.dir_ls(artifact_path)
         job_file = next(
@@ -6366,7 +6376,7 @@ class NsLcm(LcmBase):
             "EXPORTER_POD_IP": host_name,
             "EXPORTER_POD_PORT": host_port,
         }
-        job_list = self.prometheus.parse_job(job_data, variables)
+        job_list = parse_job(job_data, variables)
         # ensure job_name is using the vnfr_id. Adding the metadata nsr_id
         for job in job_list:
             if (
@@ -6375,9 +6385,8 @@ class NsLcm(LcmBase):
             ):
                 job["job_name"] = vnfr_id + "_" + str(randint(1, 10000))
             job["nsr_id"] = nsr_id
-        job_dict = {jl["job_name"]: jl for jl in job_list}
-        if await self.prometheus.update(job_dict):
-            return list(job_dict.keys())
+            job["vnfr_id"] = vnfr_id
+        return job_list
 
     def get_vca_cloud_and_credentials(self, vim_account_id: str) -> (str, str):
         """