Bug 585 Fix for scaling 11/10311/9
authoraktas <emin.aktas@ulakhaberlesme.com.tr>
Fri, 12 Feb 2021 19:19:10 +0000 (22:19 +0300)
committeraktas <emin.aktas@ulakhaberlesme.com.tr>
Thu, 25 Feb 2021 14:49:13 +0000 (17:49 +0300)
This fix should be merged with this
https://osm.etsi.org/gerrit/c/osm/N2VC/+/10364

Change-Id: I43fb4e5c81dbbaed07f01ba1a3ba399f7425b347
Signed-off-by: aktas <emin.aktas@ulakhaberlesme.com.tr>
osm_lcm/data_utils/vnfd.py
osm_lcm/data_utils/vnfr.py
osm_lcm/ns.py

index f816a8d..1b45b53 100644 (file)
@@ -96,7 +96,7 @@ def get_configuration(vnfd, entity_id):
     if not ops_vnf:
         return None
     day12ops = ops_vnf.get("day1-2", [])
-    list_utils.find_in_list(
+    return list_utils.find_in_list(
         day12ops,
         lambda configuration: configuration["id"] == entity_id)
 
@@ -138,7 +138,7 @@ def get_number_of_instances(vnfd, vdu_id):
             ()
         ),
         lambda a_vdu: a_vdu["vdu-id"] == vdu_id
-    )["number-of-instances"]
+    ).get("number-of-instances", 1)
 
 
 def get_juju_ee_ref(vnfd, entity_id):
index 042788e..9c0b148 100644 (file)
@@ -23,6 +23,7 @@
 ##
 
 from osm_lcm.data_utils import list_utils
+from osm_lcm.lcm_utils import get_iterable
 
 
 def find_VNFR_by_VDU_ID(vnfr, vdu_id):
@@ -57,3 +58,11 @@ def get_osm_params(db_vnfr, vdu_id=None, vdu_count_index=0):
             osm_params["vdu_id"] = vdu_id
             osm_params["count_index"] = vdu_count_index
     return osm_params
+
+
+def get_vdur_index(db_vnfr, vdu_delta):
+    vdur_list = get_iterable(db_vnfr, "vdur")
+    if vdur_list:
+        return len([x for x in vdur_list if x.get("vdu-id-ref") == vdu_delta["id"]])
+    else:
+        return 0
index 4077d01..e5b1060 100644 (file)
@@ -33,7 +33,7 @@ from osm_lcm.data_utils.vnfd import get_vdu_list, get_vdu_profile, \
     get_kdu_list, get_virtual_link_profiles, get_vdu, get_configuration, \
     get_vdu_index, get_scaling_aspect, get_number_of_instances, get_juju_ee_ref
 from osm_lcm.data_utils.list_utils import find_in_list
-from osm_lcm.data_utils.vnfr import get_osm_params
+from osm_lcm.data_utils.vnfr import get_osm_params, get_vdur_index
 from osm_lcm.data_utils.dict_utils import parse_yaml_strings
 from osm_lcm.data_utils.database.vim_account import VimAccountDB
 from n2vc.k8s_helm_conn import K8sHelmConnector
@@ -1163,14 +1163,14 @@ class NsLcm(LcmBase):
             if vnfr_id:
                 element_type = 'VNF'
                 element_under_configuration = vnfr_id
-                namespace += ".{}".format(vnfr_id)
+                namespace += ".{}-{}".format(vnfr_id, vdu_index or 0)
                 if vdu_id:
                     namespace += ".{}-{}".format(vdu_id, vdu_index or 0)
                     element_type = 'VDU'
                     element_under_configuration = "{}-{}".format(vdu_id, vdu_index or 0)
                     osm_config["osm"]["vdu_id"] = vdu_id
                 elif kdu_name:
-                    namespace += ".{}".format(kdu_name)
+                    namespace += ".{}.{}".format(kdu_name, vdu_index or 0)
                     element_type = 'KDU'
                     element_under_configuration = kdu_name
                     osm_config["osm"]["kdu_name"] = kdu_name
@@ -2734,7 +2734,7 @@ class NsLcm(LcmBase):
                 return vca["ee_id"]
 
     async def destroy_N2VC(self, logging_text, db_nslcmop, vca_deployed, config_descriptor,
-                           vca_index, destroy_ee=True, exec_primitives=True):
+                           vca_index, destroy_ee=True, exec_primitives=True, scaling_in=False):
         """
         Execute the terminate primitives and destroy the execution environment (if destroy_ee=False
         :param logging_text:
@@ -2745,6 +2745,7 @@ class NsLcm(LcmBase):
         :param destroy_ee: False to do not destroy, because it will be destroyed all of then at once
         :param exec_primitives: False to do not execute terminate primitives, because the config is not completed or has
                             not executed properly
+        :param scaling_in: True destroys the application, False destroys the model
         :return: None or exception
         """
 
@@ -2802,7 +2803,7 @@ class NsLcm(LcmBase):
             await self.prometheus.update(remove_jobs=vca_deployed["prometheus_jobs"])
 
         if destroy_ee:
-            await self.vca_map[vca_type].delete_execution_environment(vca_deployed["ee_id"])
+            await self.vca_map[vca_type].delete_execution_environment(vca_deployed["ee_id"], scaling_in=scaling_in)
 
     async def _delete_all_N2VC(self, db_nsr: dict):
         self._write_all_config_status(db_nsr=db_nsr, status='TERMINATING')
@@ -3043,7 +3044,7 @@ class NsLcm(LcmBase):
                     config_descriptor = get_configuration(db_vnfd, vca.get("kdu_name"))
                 else:
                     db_vnfd = db_vnfds_from_member_index[vca["member-vnf-index"]]
-                    config_descriptor = get_configuration(db_vnfd, db_vnfd["id"]) 
+                    config_descriptor = get_configuration(db_vnfd, db_vnfd["id"])
                 vca_type = vca.get("type")
                 exec_terminate_primitives = (not operation_params.get("skip_terminate_primitives") and
                                              vca.get("needed_terminate"))
@@ -3608,6 +3609,7 @@ class NsLcm(LcmBase):
 
         logging_text = "Task ns={} scale={} ".format(nsr_id, nslcmop_id)
         stage = ['', '', '']
+        tasks_dict_info = {}
         # ^ stage, step, VIM progress
         self.logger.debug(logging_text + "Enter")
         # get all needed from database
@@ -3619,6 +3621,7 @@ class NsLcm(LcmBase):
         scale_process = None
         old_operational_status = ""
         old_config_status = ""
+        nsi_id = None
         try:
             # wait for any previous tasks in process
             step = "Waiting for previous operations to terminate"
@@ -3663,6 +3666,8 @@ class NsLcm(LcmBase):
             step = "Getting vnfd from database"
             db_vnfd = self.db.get_one("vnfds", {"_id": db_vnfr["vnfd-id"]})
 
+            base_folder = db_vnfd["_admin"]["storage"]
+
             step = "Getting scaling-group-descriptor"
             scaling_descriptor = find_in_list(
                 get_scaling_aspect(
@@ -3689,6 +3694,7 @@ class NsLcm(LcmBase):
                     admin_scale_index += 1
                     db_nsr_update["_admin.scaling-group.{}.name".format(admin_scale_index)] = scaling_group
             RO_scaling_info = []
+            VCA_scaling_info = []
             vdu_scaling_info = {"scaling_group_name": scaling_group, "vdu": []}
             if scaling_type == "SCALE_OUT":
                 if "aspect-delta-details" not in scaling_descriptor:
@@ -3705,7 +3711,7 @@ class NsLcm(LcmBase):
                 for delta in deltas:
                     for vdu_delta in delta["vdu-delta"]:
                         vdud = get_vdu(db_vnfd, vdu_delta["id"])
-                        vdu_index = get_vdu_index(db_vnfr, vdu_delta["id"])
+                        vdu_index = get_vdur_index(db_vnfr, vdu_delta)
                         cloud_init_text = self._get_vdu_cloud_init_content(vdud, db_vnfd)
                         if cloud_init_text:
                             additional_params = self._get_vdu_additional_params(db_vnfr, vdud["id"]) or {}
@@ -3716,11 +3722,11 @@ class NsLcm(LcmBase):
                         if vdu_profile and "max-number-of-instances" in vdu_profile:
                             max_instance_count = vdu_profile.get("max-number-of-instances", 10)
                         
-                        deafult_instance_num = get_number_of_instances(db_vnfd, vdud["id"])
+                        default_instance_num = get_number_of_instances(db_vnfd, vdud["id"])
 
                         nb_scale_op += vdu_delta.get("number-of-instances", 1)
 
-                        if nb_scale_op + deafult_instance_num > max_instance_count:
+                        if nb_scale_op + default_instance_num > max_instance_count:
                             raise LcmException(
                                 "reached the limit of {} (max-instance-count) "
                                 "scaling-out operations for the "
@@ -3742,6 +3748,14 @@ class NsLcm(LcmBase):
                                         vdud["id"]
                                     )
                                 )
+                                VCA_scaling_info.append(
+                                    {
+                                        "osm_vdu_id": vdu_delta["id"],
+                                        "member-vnf-index": vnf_index,
+                                        "type": "create",
+                                        "vdu_index": vdu_index + x
+                                    }
+                                )
                         RO_scaling_info.append(
                             {
                                 "osm_vdu_id": vdu_delta["id"],
@@ -3763,21 +3777,32 @@ class NsLcm(LcmBase):
                 deltas = scaling_descriptor.get("aspect-delta-details")["deltas"]
                 for delta in deltas:
                     for vdu_delta in delta["vdu-delta"]:
+                        vdu_index = get_vdur_index(db_vnfr, vdu_delta)
                         min_instance_count = 0
                         vdu_profile = get_vdu_profile(db_vnfd, vdu_delta["id"])
                         if vdu_profile and "min-number-of-instances" in vdu_profile:
                             min_instance_count = vdu_profile["min-number-of-instances"]
 
-                        deafult_instance_num = get_number_of_instances(db_vnfd, vdu_delta["id"])
+                        default_instance_num = get_number_of_instances(db_vnfd, vdu_delta["id"])
 
                         nb_scale_op -= vdu_delta.get("number-of-instances", 1)
-                        if nb_scale_op + deafult_instance_num < min_instance_count:
+                        if nb_scale_op + default_instance_num < min_instance_count:
                             raise LcmException(
                                 "reached the limit of {} (min-instance-count) scaling-in operations for the "
                                 "scaling-group-descriptor '{}'".format(nb_scale_op, scaling_group)
                             )
                         RO_scaling_info.append({"osm_vdu_id": vdu_delta["id"], "member-vnf-index": vnf_index,
-                                                "type": "delete", "count": vdu_delta.get("number-of-instances", 1)})
+                                                "type": "delete", "count": vdu_delta.get("number-of-instances", 1),
+                                                "vdu_index": vdu_index - 1})
+                        for x in range(vdu_delta.get("number-of-instances", 1)):
+                            VCA_scaling_info.append(
+                                {
+                                    "osm_vdu_id": vdu_delta["id"],
+                                    "member-vnf-index": vnf_index,
+                                    "type": "delete",
+                                    "vdu_index": vdu_index - 1 - x
+                                }
+                            )
                         vdu_scaling_info["vdu-delete"][vdu_delta["id"]] = vdu_delta.get("number-of-instances", 1)
 
             # update VDU_SCALING_INFO with the VDUs to delete ip_addresses
@@ -3879,6 +3904,68 @@ class NsLcm(LcmBase):
             db_nsr_update["_admin.scaling-group.{}.nb-scale-op".format(admin_scale_index)] = nb_scale_op
             db_nsr_update["_admin.scaling-group.{}.time".format(admin_scale_index)] = time()
 
+            # SCALE-IN VCA - BEGIN
+            if VCA_scaling_info:
+                step = db_nslcmop_update["detailed-status"] = \
+                    "Deleting the execution environments"
+                scale_process = "VCA"
+                for vdu_info in VCA_scaling_info:
+                    if vdu_info["type"] == "delete":
+                        member_vnf_index = str(vdu_info["member-vnf-index"])
+                        self.logger.debug(logging_text + "vdu info: {}".format(vdu_info))
+                        vdu_id = vdu_info["osm_vdu_id"]
+                        vdu_index = int(vdu_info["vdu_index"])
+                        stage[1] = "Scaling member_vnf_index={}, vdu_id={}, vdu_index={} ".format(
+                            member_vnf_index, vdu_id, vdu_index)
+                        stage[2] = step = "Scaling in VCA"
+                        self._write_op_status(
+                            op_id=nslcmop_id,
+                            stage=stage
+                        )
+                        vca_update = db_nsr["_admin"]["deployed"]["VCA"]
+                        config_update = db_nsr["configurationStatus"]
+                        for vca_index, vca in enumerate(vca_update):
+                            if (vca or vca.get("ee_id")) and vca["member-vnf-index"] == member_vnf_index and \
+                                    vca["vdu_count_index"] == vdu_index:
+                                if vca.get("vdu_id"):
+                                    config_descriptor = get_configuration(db_vnfd, vca.get("vdu_id"))
+                                elif vca.get("kdu_name"):
+                                    config_descriptor = get_configuration(db_vnfd, vca.get("kdu_name"))
+                                else:
+                                    config_descriptor = get_configuration(db_vnfd, db_vnfd["id"])
+                                operation_params = db_nslcmop.get("operationParams") or {}
+                                exec_terminate_primitives = (not operation_params.get("skip_terminate_primitives") and
+                                                             vca.get("needed_terminate"))
+                                task = asyncio.ensure_future(asyncio.wait_for(
+                                    self.destroy_N2VC(logging_text, db_nslcmop, vca, config_descriptor,
+                                                      vca_index, destroy_ee=True,
+                                                      exec_primitives=exec_terminate_primitives,
+                                                      scaling_in=True), timeout=self.timeout_charm_delete))
+                                # wait before next removal
+                                await asyncio.sleep(30)
+                                tasks_dict_info[task] = "Terminating VCA {}".format(vca.get("ee_id"))
+                                del vca_update[vca_index]
+                                del config_update[vca_index]
+                        # wait for pending tasks of terminate primitives
+                        if tasks_dict_info:
+                            self.logger.debug(logging_text +
+                                              'Waiting for tasks {}'.format(list(tasks_dict_info.keys())))
+                            error_list = await self._wait_for_tasks(logging_text, tasks_dict_info,
+                                                                    min(self.timeout_charm_delete,
+                                                                        self.timeout_ns_terminate),
+                                                                    stage, nslcmop_id)
+                            tasks_dict_info.clear()
+                            if error_list:
+                                raise LcmException("; ".join(error_list))
+
+                        db_vca_and_config_update = {
+                            "_admin.deployed.VCA": vca_update,
+                            "configurationStatus": config_update
+                        }
+                        self.update_db_2("nsrs", db_nsr["_id"], db_vca_and_config_update)
+            scale_process = None
+            # SCALE-IN VCA - END
+
             # SCALE RO - BEGIN
             if RO_scaling_info:
                 scale_process = "RO"
@@ -3890,6 +3977,117 @@ class NsLcm(LcmBase):
             scale_process = None
             if db_nsr_update:
                 self.update_db_2("nsrs", nsr_id, db_nsr_update)
+            # SCALE RO - END
+
+            # SCALE-UP VCA - BEGIN
+            if VCA_scaling_info:
+                step = db_nslcmop_update["detailed-status"] = \
+                    "Creating new execution environments"
+                scale_process = "VCA"
+                for vdu_info in VCA_scaling_info:
+                    if vdu_info["type"] == "create":
+                        member_vnf_index = str(vdu_info["member-vnf-index"])
+                        self.logger.debug(logging_text + "vdu info: {}".format(vdu_info))
+                        vnfd_id = db_vnfr["vnfd-ref"]
+                        vdu_index = int(vdu_info["vdu_index"])
+                        deploy_params = {"OSM": get_osm_params(db_vnfr)}
+                        if db_vnfr.get("additionalParamsForVnf"):
+                            deploy_params.update(parse_yaml_strings(db_vnfr["additionalParamsForVnf"].copy()))
+                        descriptor_config = get_configuration(db_vnfd, db_vnfd["id"])
+                        if descriptor_config:
+                            vdu_id = None
+                            vdu_name = None
+                            kdu_name = None
+                            self._deploy_n2vc(
+                                logging_text=logging_text + "member_vnf_index={} ".format(member_vnf_index),
+                                db_nsr=db_nsr,
+                                db_vnfr=db_vnfr,
+                                nslcmop_id=nslcmop_id,
+                                nsr_id=nsr_id,
+                                nsi_id=nsi_id,
+                                vnfd_id=vnfd_id,
+                                vdu_id=vdu_id,
+                                kdu_name=kdu_name,
+                                member_vnf_index=member_vnf_index,
+                                vdu_index=vdu_index,
+                                vdu_name=vdu_name,
+                                deploy_params=deploy_params,
+                                descriptor_config=descriptor_config,
+                                base_folder=base_folder,
+                                task_instantiation_info=tasks_dict_info,
+                                stage=stage
+                            )
+                        vdu_id = vdu_info["osm_vdu_id"]
+                        vdur = find_in_list(db_vnfr["vdur"], lambda vdu: vdu["vdu-id-ref"] == vdu_id)
+                        descriptor_config = get_configuration(db_vnfd, vdu_id)
+                        if vdur.get("additionalParams"):
+                            deploy_params_vdu = parse_yaml_strings(vdur["additionalParams"])
+                        else:
+                            deploy_params_vdu = deploy_params
+                        deploy_params_vdu["OSM"] = get_osm_params(db_vnfr, vdu_id, vdu_count_index=vdu_index)
+                        if descriptor_config:
+                            vdu_name = None
+                            kdu_name = None
+                            stage[1] = "Scaling member_vnf_index={}, vdu_id={}, vdu_index={} ".format(
+                                member_vnf_index, vdu_id, vdu_index)
+                            stage[2] = step = "Scaling out VCA"
+                            self._write_op_status(
+                                op_id=nslcmop_id,
+                                stage=stage
+                            )
+                            self._deploy_n2vc(
+                                logging_text=logging_text + "member_vnf_index={}, vdu_id={}, vdu_index={} ".format(
+                                    member_vnf_index, vdu_id, vdu_index),
+                                db_nsr=db_nsr,
+                                db_vnfr=db_vnfr,
+                                nslcmop_id=nslcmop_id,
+                                nsr_id=nsr_id,
+                                nsi_id=nsi_id,
+                                vnfd_id=vnfd_id,
+                                vdu_id=vdu_id,
+                                kdu_name=kdu_name,
+                                member_vnf_index=member_vnf_index,
+                                vdu_index=vdu_index,
+                                vdu_name=vdu_name,
+                                deploy_params=deploy_params_vdu,
+                                descriptor_config=descriptor_config,
+                                base_folder=base_folder,
+                                task_instantiation_info=tasks_dict_info,
+                                stage=stage
+                            )
+                        # TODO: scaling for kdu is not implemented yet.
+                        kdu_name = vdu_info["osm_vdu_id"]
+                        descriptor_config = get_configuration(db_vnfd, kdu_name)
+                        if descriptor_config:
+                            vdu_id = None
+                            vdu_index = vdu_index
+                            vdu_name = None
+                            kdur = next(x for x in db_vnfr["kdur"] if x["kdu-name"] == kdu_name)
+                            deploy_params_kdu = {"OSM": get_osm_params(db_vnfr)}
+                            if kdur.get("additionalParams"):
+                                deploy_params_kdu = parse_yaml_strings(kdur["additionalParams"])
+
+                            self._deploy_n2vc(
+                                logging_text=logging_text,
+                                db_nsr=db_nsr,
+                                db_vnfr=db_vnfr,
+                                nslcmop_id=nslcmop_id,
+                                nsr_id=nsr_id,
+                                nsi_id=nsi_id,
+                                vnfd_id=vnfd_id,
+                                vdu_id=vdu_id,
+                                kdu_name=kdu_name,
+                                member_vnf_index=member_vnf_index,
+                                vdu_index=vdu_index,
+                                vdu_name=vdu_name,
+                                deploy_params=deploy_params_kdu,
+                                descriptor_config=descriptor_config,
+                                base_folder=base_folder,
+                                task_instantiation_info=tasks_dict_info,
+                                stage=stage
+                            )
+            # SCALE-UP VCA - END
+            scale_process = None
 
             # POST-SCALE BEGIN
             # execute primitive service POST-SCALING
@@ -3984,6 +4182,11 @@ class NsLcm(LcmBase):
             self.logger.critical(logging_text + "Exit Exception {} {}".format(type(e).__name__, e), exc_info=True)
         finally:
             self._write_ns_status(nsr_id=nsr_id, ns_state=None, current_operation="IDLE", current_operation_id=None)
+            if tasks_dict_info:
+                stage[1] = "Waiting for instantiate pending tasks."
+                self.logger.debug(logging_text + stage[1])
+                exc = await self._wait_for_tasks(logging_text, tasks_dict_info, self.timeout_ns_deploy,
+                                                 stage, nslcmop_id, nsr_id=nsr_id)
             if exc:
                 db_nslcmop_update["detailed-status"] = error_description_nslcmop = "FAILED {}: {}".format(step, exc)
                 nslcmop_operation_state = "FAILED"