Feature 10239: Distributed VCA 75/10575/9
authorDavid Garcia <david.garcia@canonical.com>
Wed, 31 Mar 2021 17:12:02 +0000 (19:12 +0200)
committerDavid Garcia <david.garcia@canonical.com>
Mon, 3 May 2021 14:45:41 +0000 (16:45 +0200)
- Handle VCATopic: https://osm.etsi.org/gerrit/#/c/osm/NBI/+/10574/
- Pass vca_id to calls in N2VC, so N2VC can know to which VCA it needs
to talk

Depends on the following patch: https://osm.etsi.org/gerrit/#/c/osm/N2VC/+/10616/

Change-Id: I080c1aab94f70de83f2d33def74ccd03450dbdd6
Signed-off-by: David Garcia <david.garcia@canonical.com>
osm_lcm/lcm.py
osm_lcm/lcm_helm_conn.py
osm_lcm/lcm_utils.py
osm_lcm/ns.py
osm_lcm/tests/test_vim_sdn.py [new file with mode: 0644]
osm_lcm/vim_sdn.py
requirements-dev.txt

index 659a57c..ebfca7e 100644 (file)
@@ -104,7 +104,7 @@ class Lcm:
             self.config["ro_config"]["uri"] = self.config["ro_config"]["uri"][index+1]
 
         self.loop = loop or asyncio.get_event_loop()
-        self.ns = self.netslice = self.vim = self.wim = self.sdn = self.k8scluster = self.k8srepo = None
+        self.ns = self.netslice = self.vim = self.wim = self.sdn = self.k8scluster = self.vca = self.k8srepo = None
 
         # logging
         log_format_simple = "%(asctime)s %(levelname)s %(name)s %(filename)s:%(lineno)s %(message)s"
@@ -309,6 +309,17 @@ class Lcm:
                 task = asyncio.ensure_future(self.k8scluster.delete(params, order_id))
                 self.lcm_tasks.register("k8scluster", k8scluster_id, order_id, "k8scluster_delete", task)
                 return
+        elif topic == "vca":
+            if command == "create" or command == "created":
+                vca_id = params.get("_id")
+                task = asyncio.ensure_future(self.vca.create(params, order_id))
+                self.lcm_tasks.register("vca", vca_id, order_id, "vca_create", task)
+                return
+            elif command == "delete" or command == "deleted":
+                vca_id = params.get("_id")
+                task = asyncio.ensure_future(self.vca.delete(params, order_id))
+                self.lcm_tasks.register("vca", vca_id, order_id, "vca_delete", task)
+                return
         elif topic == "k8srepo":
             if command == "create" or command == "created":
                 k8srepo_id = params.get("_id")
@@ -486,7 +497,7 @@ class Lcm:
         self.first_start = True
         while self.consecutive_errors < 10:
             try:
-                topics = ("ns", "vim_account", "wim_account", "sdn", "nsi", "k8scluster", "k8srepo", "pla")
+                topics = ("ns", "vim_account", "wim_account", "sdn", "nsi", "k8scluster", "vca", "k8srepo", "pla")
                 topics_admin = ("admin", )
                 await asyncio.gather(
                     self.msg.aioread(topics, self.loop, self.kafka_read_callback, from_beginning=True),
@@ -522,6 +533,7 @@ class Lcm:
         self.wim = vim_sdn.WimLcm(self.msg, self.lcm_tasks, self.config, self.loop)
         self.sdn = vim_sdn.SdnLcm(self.msg, self.lcm_tasks, self.config, self.loop)
         self.k8scluster = vim_sdn.K8sClusterLcm(self.msg, self.lcm_tasks, self.config, self.loop)
+        self.vca = vim_sdn.VcaLcm(self.msg, self.lcm_tasks, self.config, self.loop)
         self.k8srepo = vim_sdn.K8sRepoLcm(self.msg, self.lcm_tasks, self.config, self.loop)
 
         # configure tsdb prometheus
index 34e4915..a13824d 100644 (file)
@@ -88,8 +88,6 @@ class LCMHelmConn(N2VCConnector, LcmBase):
     def __init__(self,
                  log: object = None,
                  loop: object = None,
-                 url: str = None,
-                 username: str = None,
                  vca_config: dict = None,
                  on_update_db=None, ):
         """
@@ -104,14 +102,12 @@ class LCMHelmConn(N2VCConnector, LcmBase):
             self,
             log=log,
             loop=loop,
-            url=url,
-            username=username,
-            vca_config=vca_config,
             on_update_db=on_update_db,
             db=self.db,
             fs=self.fs
         )
 
+        self.vca_config = vca_config
         self.log.debug("Initialize helm N2VC connector")
         self.log.debug("initial vca_config: {}".format(vca_config))
 
index a05e5ac..a1569c1 100644 (file)
@@ -146,7 +146,7 @@ class TaskRegistry(LcmBase):
 
     # NS/NSI: "services" VIM/WIM/SDN: "accounts"
     topic_service_list = ['ns', 'nsi']
-    topic_account_list = ['vim', 'wim', 'sdn', 'k8scluster', 'k8srepo']
+    topic_account_list = ['vim', 'wim', 'sdn', 'k8scluster', 'vca', 'k8srepo']
 
     # Map topic to InstanceID
     topic2instid_dict = {
@@ -161,6 +161,7 @@ class TaskRegistry(LcmBase):
         'wim': 'wim_accounts',
         'sdn': 'sdns',
         'k8scluster': 'k8sclusters',
+        'vca': 'vca',
         'k8srepo': 'k8srepos'}
 
     def __init__(self, worker_id=None, logger=None):
@@ -171,6 +172,7 @@ class TaskRegistry(LcmBase):
             "wim_account": {},
             "sdn": {},
             "k8scluster": {},
+            "vca": {},
             "k8srepo": {},
         }
         self.worker_id = worker_id
index 1bcf4c7..0f5c6ab 100644 (file)
@@ -97,9 +97,6 @@ class NsLcm(LcmBase):
         self.n2vc = N2VCJujuConnector(
             log=self.logger,
             loop=self.loop,
-            url='{}:{}'.format(self.vca_config['host'], self.vca_config['port']),
-            username=self.vca_config.get('user', None),
-            vca_config=self.vca_config,
             on_update_db=self._on_update_n2vc_db,
             fs=self.fs,
             db=self.db
@@ -108,8 +105,6 @@ class NsLcm(LcmBase):
         self.conn_helm_ee = LCMHelmConn(
             log=self.logger,
             loop=self.loop,
-            url=None,
-            username=None,
             vca_config=self.vca_config,
             on_update_db=self._on_update_n2vc_db
         )
@@ -138,7 +133,6 @@ class NsLcm(LcmBase):
             log=self.logger,
             loop=self.loop,
             on_update_db=self._on_update_k8s_db,
-            vca_config=self.vca_config,
             fs=self.fs,
             db=self.db
         )
@@ -200,7 +194,7 @@ class NsLcm(LcmBase):
         except Exception as e:
             self.logger.warn('Cannot write database RO deployment for ns={} -> {}'.format(nsrs_id, e))
 
-    async def _on_update_n2vc_db(self, table, filter, path, updated_data):
+    async def _on_update_n2vc_db(self, table, filter, path, updated_data, vca_id=None):
 
         # remove last dot from path (if exists)
         if path.endswith('.'):
@@ -217,12 +211,12 @@ class NsLcm(LcmBase):
             current_ns_status = nsr.get('nsState')
 
             # get vca status for NS
-            status_dict = await self.n2vc.get_status(namespace='.' + nsr_id, yaml_format=False)
+            status_dict = await self.n2vc.get_status(namespace='.' + nsr_id, yaml_format=False, vca_id=vca_id)
 
             # vcaStatus
             db_dict = dict()
             db_dict['vcaStatus'] = status_dict
-            await self.n2vc.update_vca_status(db_dict['vcaStatus'])
+            await self.n2vc.update_vca_status(db_dict['vcaStatus'], vca_id=vca_id)
 
             # update configurationStatus for this VCA
             try:
@@ -289,7 +283,7 @@ class NsLcm(LcmBase):
         except Exception as e:
             self.logger.warn('Error updating NS state for ns={}: {}'.format(nsr_id, e))
 
-    async def _on_update_k8s_db(self, cluster_uuid, kdu_instance, filter=None):
+    async def _on_update_k8s_db(self, cluster_uuid, kdu_instance, filter=None, vca_id=None):
         """
         Updating vca status in NSR record
         :param cluster_uuid: UUID of a k8s cluster
@@ -305,15 +299,22 @@ class NsLcm(LcmBase):
             nsr_id = filter.get('_id')
 
             # get vca status for NS
-            vca_status = await self.k8sclusterjuju.status_kdu(cluster_uuid,
-                                                              kdu_instance,
-                                                              complete_status=True,
-                                                              yaml_format=False)
+            vca_status = await self.k8sclusterjuju.status_kdu(
+                cluster_uuid,
+                kdu_instance,
+                complete_status=True,
+                yaml_format=False,
+                vca_id=vca_id,
+            )
             # vcaStatus
             db_dict = dict()
             db_dict['vcaStatus'] = {nsr_id: vca_status}
 
-            await self.k8sclusterjuju.update_vca_status(db_dict['vcaStatus'], kdu_instance)
+            await self.k8sclusterjuju.update_vca_status(
+                db_dict['vcaStatus'],
+                kdu_instance,
+                vca_id=vca_id,
+            )
 
             # write to database
             self.update_db_2("nsrs", nsr_id, db_dict)
@@ -1179,6 +1180,12 @@ class NsLcm(LcmBase):
 
         raise LcmException("Configuration aborted because dependent charm/s timeout")
 
+    def get_vca_id(self, db_vnfr: dict, db_nsr: dict):
+        return (
+            deep_get(db_vnfr, ("vca-id",)) or
+            deep_get(db_nsr, ("instantiate_params", "vcaId"))
+        )
+
     async def instantiate_N2VC(self, logging_text, vca_index, nsi_id, db_nsr, db_vnfr, vdu_id, kdu_name, vdu_index,
                                config_descriptor, deploy_params, base_folder, nslcmop_id, stage, vca_type, vca_name,
                                ee_config_descriptor):
@@ -1248,12 +1255,7 @@ class NsLcm(LcmBase):
             # find old ee_id if exists
             ee_id = vca_deployed.get("ee_id")
 
-            vim_account_id = (
-                deep_get(db_vnfr, ("vim-account-id",)) or
-                deep_get(deploy_params, ("OSM", "vim_account_id"))
-            )
-            vca_cloud, vca_cloud_credential = self.get_vca_cloud_and_credentials(vim_account_id)
-            vca_k8s_cloud, vca_k8s_cloud_credential = self.get_vca_k8s_cloud_and_credentials(vim_account_id)
+            vca_id = self.get_vca_id(db_vnfr, db_nsr)
             # create or register execution environment in VCA
             if vca_type in ("lxc_proxy_charm", "k8s_proxy_charm", "helm", "helm-v3"):
 
@@ -1276,8 +1278,7 @@ class NsLcm(LcmBase):
                         namespace=namespace,
                         artifact_path=artifact_path,
                         db_dict=db_dict,
-                        cloud_name=vca_k8s_cloud,
-                        credential_name=vca_k8s_cloud_credential,
+                        vca_id=vca_id,
                     )
                 elif vca_type == "helm" or vca_type == "helm-v3":     
                     ee_id, credentials = await self.vca_map[vca_type].create_execution_environment(
@@ -1293,8 +1294,7 @@ class NsLcm(LcmBase):
                         namespace=namespace,
                         reuse_ee_id=ee_id,
                         db_dict=db_dict,
-                        cloud_name=vca_cloud,
-                        credential_name=vca_cloud_credential,
+                        vca_id=vca_id,
                     )
 
             elif vca_type == "native_charm":
@@ -1333,8 +1333,7 @@ class NsLcm(LcmBase):
                     credentials=credentials,
                     namespace=namespace,
                     db_dict=db_dict,
-                    cloud_name=vca_cloud,
-                    credential_name=vca_cloud_credential,
+                    vca_id=vca_id,
                 )
 
             # for compatibility with MON/POL modules, the need model and application name at database
@@ -1388,6 +1387,7 @@ class NsLcm(LcmBase):
                     db_dict=db_dict,
                     config=config,
                     num_units=num_units,
+                    vca_id=vca_id,
                 )
 
             # write in db flag of configuration_sw already installed
@@ -1395,7 +1395,7 @@ class NsLcm(LcmBase):
 
             # add relations for this VCA (wait for other peers related with this VCA)
             await self._add_vca_relations(logging_text=logging_text, nsr_id=nsr_id,
-                                          vca_index=vca_index, vca_type=vca_type)
+                                          vca_index=vca_index, vca_id=vca_id, vca_type=vca_type)
 
             # if SSH access is required, then get execution environment SSH public
             # if native charm we have waited already to VM be UP
@@ -1408,7 +1408,11 @@ class NsLcm(LcmBase):
                     # Needed to inject a ssh key
                     user = deep_get(config_descriptor, ("config-access", "ssh-access", "default-user"))
                     step = "Install configuration Software, getting public ssh key"
-                    pub_key = await self.vca_map[vca_type].get_ee_ssh_public__key(ee_id=ee_id, db_dict=db_dict)
+                    pub_key = await self.vca_map[vca_type].get_ee_ssh_public__key(
+                        ee_id=ee_id,
+                        db_dict=db_dict,
+                        vca_id=vca_id
+                    )
 
                     step = "Insert public key into VM user={} ssh_key={}".format(user, pub_key)
                 else:
@@ -1476,7 +1480,8 @@ class NsLcm(LcmBase):
                     ee_id=ee_id,
                     primitive_name=initial_config_primitive["name"],
                     params_dict=primitive_params_,
-                    db_dict=db_dict
+                    db_dict=db_dict,
+                    vca_id=vca_id,
                 )
                 # Once some primitive has been exec, check and write at db if it needs to exec terminated primitives
                 if check_if_terminated_needed:
@@ -2063,8 +2068,15 @@ class NsLcm(LcmBase):
             self.logger.debug(logging_text + "Exit")
             self.lcm_tasks.remove("ns", nsr_id, nslcmop_id, "ns_instantiate")
 
-    async def _add_vca_relations(self, logging_text, nsr_id, vca_index: int,
-                                 timeout: int = 3600, vca_type: str = None) -> bool:
+    async def _add_vca_relations(
+        self,
+        logging_text,
+        nsr_id,
+        vca_index: int,
+        timeout: int = 3600,
+        vca_type: str = None,
+        vca_id: str = None,
+    ) -> bool:
 
         # steps:
         # 1. find all relations for this VCA
@@ -2147,7 +2159,9 @@ class NsLcm(LcmBase):
                             ee_id_1=from_vca_ee_id,
                             ee_id_2=to_vca_ee_id,
                             endpoint_1=from_vca_endpoint,
-                            endpoint_2=to_vca_endpoint)
+                            endpoint_2=to_vca_endpoint,
+                            vca_id=vca_id,
+                        )
                         # remove entry from relations list
                         ns_relations.remove(r)
                     else:
@@ -2193,7 +2207,9 @@ class NsLcm(LcmBase):
                             ee_id_1=from_vca_ee_id,
                             ee_id_2=to_vca_ee_id,
                             endpoint_1=from_vca_endpoint,
-                            endpoint_2=to_vca_endpoint)
+                            endpoint_2=to_vca_endpoint,
+                            vca_id=vca_id,
+                        )
                         # remove entry from relations list
                         vnf_relations.remove(r)
                     else:
@@ -2230,7 +2246,8 @@ class NsLcm(LcmBase):
             return False
 
     async def _install_kdu(self, nsr_id: str, nsr_db_path: str, vnfr_data: dict, kdu_index: int, kdud: dict,
-                           vnfd: dict, k8s_instance_info: dict, k8params: dict = None, timeout: int = 600):
+                           vnfd: dict, k8s_instance_info: dict, k8params: dict = None, timeout: int = 600,
+                           vca_id: str = None):
 
         try:
             k8sclustertype = k8s_instance_info["k8scluster-type"]
@@ -2255,6 +2272,7 @@ class NsLcm(LcmBase):
                 kdu_name=k8s_instance_info["kdu-name"],
                 namespace=k8s_instance_info["namespace"],
                 kdu_instance=kdu_instance,
+                vca_id=vca_id,
             )
             self.update_db_2("nsrs", nsr_id, {nsr_db_path + ".kdu-instance": kdu_instance})
 
@@ -2306,8 +2324,11 @@ class NsLcm(LcmBase):
                             cluster_uuid=k8s_instance_info["k8scluster-uuid"],
                             kdu_instance=kdu_instance,
                             primitive_name=initial_config_primitive["name"],
-                            params=primitive_params_, db_dict=db_dict_install),
-                        timeout=timeout)
+                            params=primitive_params_, db_dict=db_dict_install,
+                            vca_id=vca_id,
+                        ),
+                        timeout=timeout
+                    )
 
         except Exception as e:
             # Prepare update db with error and raise exception
@@ -2378,6 +2399,7 @@ class NsLcm(LcmBase):
             updated_v3_cluster_list = []
 
             for vnfr_data in db_vnfrs.values():
+                vca_id = self.get_vca_id(vnfr_data, {})
                 for kdu_index, kdur in enumerate(get_iterable(vnfr_data, "kdur")):
                     # Step 0: Prepare and set parameters
                     desc_params = parse_yaml_strings(kdur.get("additionalParams"))
@@ -2455,7 +2477,7 @@ class NsLcm(LcmBase):
                     vnfd_with_id = find_in_list(db_vnfds, lambda vnf: vnf["_id"] == vnfd_id)
                     task = asyncio.ensure_future(
                         self._install_kdu(nsr_id, db_path, vnfr_data, kdu_index, kdud, vnfd_with_id,
-                                          k8s_instance_info, k8params=desc_params, timeout=600))
+                                          k8s_instance_info, k8params=desc_params, timeout=600, vca_id=vca_id))
                     self.lcm_tasks.register("ns", nsr_id, nslcmop_id, "instantiate_KDU-{}".format(index), task)
                     task_instantiation_info[task] = "Deploying KDU {}".format(kdur["kdu-name"])
 
@@ -2788,8 +2810,18 @@ class NsLcm(LcmBase):
             if vca["member-vnf-index"] == vnf_index and vca["vdu_id"] == vdu_id:
                 return vca["ee_id"]
 
-    async def destroy_N2VC(self, logging_text, db_nslcmop, vca_deployed, config_descriptor,
-                           vca_index, destroy_ee=True, exec_primitives=True, scaling_in=False):
+    async def destroy_N2VC(
+        self,
+        logging_text,
+        db_nslcmop,
+        vca_deployed,
+        config_descriptor,
+        vca_index,
+        destroy_ee=True,
+        exec_primitives=True,
+        scaling_in=False,
+        vca_id: str = None,
+    ):
         """
         Execute the terminate primitives and destroy the execution environment (if destroy_ee=False
         :param logging_text:
@@ -2840,9 +2872,12 @@ class NsLcm(LcmBase):
                                            mapped_primitive_params)
                     # Sub-operations: Call _ns_execute_primitive() instead of action()
                     try:
-                        result, result_detail = await self._ns_execute_primitive(vca_deployed["ee_id"], primitive,
-                                                                                 mapped_primitive_params,
-                                                                                 vca_type=vca_type)
+                        result, result_detail = await self._ns_execute_primitive(
+                            vca_deployed["ee_id"], primitive,
+                            mapped_primitive_params,
+                            vca_type=vca_type,
+                            vca_id=vca_id,
+                        )
                     except LcmException:
                         # this happens when VCA is not deployed. In this case it is not needed to terminate
                         continue
@@ -2858,13 +2893,21 @@ class NsLcm(LcmBase):
             await self.prometheus.update(remove_jobs=vca_deployed["prometheus_jobs"])
 
         if destroy_ee:
-            await self.vca_map[vca_type].delete_execution_environment(vca_deployed["ee_id"], scaling_in=scaling_in)
+            await self.vca_map[vca_type].delete_execution_environment(
+                vca_deployed["ee_id"],
+                scaling_in=scaling_in,
+                vca_id=vca_id,
+            )
 
-    async def _delete_all_N2VC(self, db_nsr: dict):
+    async def _delete_all_N2VC(self, db_nsr: dict, vca_id: str = None):
         self._write_all_config_status(db_nsr=db_nsr, status='TERMINATING')
         namespace = "." + db_nsr["_id"]
         try:
-            await self.n2vc.delete_namespace(namespace=namespace, total_timeout=self.timeout_charm_delete)
+            await self.n2vc.delete_namespace(
+                namespace=namespace,
+                total_timeout=self.timeout_charm_delete,
+                vca_id=vca_id,
+            )
         except N2VCNotFound:  # already deleted. Skip
             pass
         self._write_all_config_status(db_nsr=db_nsr, status='DELETED')
@@ -3064,6 +3107,7 @@ class NsLcm(LcmBase):
 
             stage[1] = "Getting vnf descriptors from db."
             db_vnfrs_list = self.db.get_list("vnfrs", {"nsr-id-ref": nsr_id})
+            db_vnfrs_dict = {db_vnfr["member-vnf-index-ref"]: db_vnfr for db_vnfr in db_vnfrs_list}
             db_vnfds_from_id = {}
             db_vnfds_from_member_index = {}
             # Loop over VNFRs
@@ -3086,6 +3130,8 @@ class NsLcm(LcmBase):
 
             for vca_index, vca in enumerate(get_iterable(nsr_deployed, "VCA")):
                 config_descriptor = None
+
+                vca_id = self.get_vca_id(db_vnfrs_dict[vca["member-vnf-index"]], db_nsr)
                 if not vca or not vca.get("ee_id"):
                     continue
                 if not vca.get("member-vnf-index"):
@@ -3109,8 +3155,17 @@ class NsLcm(LcmBase):
                 # self.logger.debug(logging_text + "vca_index: {}, ee_id: {}, vca_type: {} destroy_ee: {}".format(
                 #     vca_index, vca.get("ee_id"), vca_type, destroy_ee))
                 task = asyncio.ensure_future(
-                    self.destroy_N2VC(logging_text, db_nslcmop, vca, config_descriptor, vca_index,
-                                      destroy_ee, exec_terminate_primitives))
+                    self.destroy_N2VC(
+                        logging_text,
+                        db_nslcmop,
+                        vca,
+                        config_descriptor,
+                        vca_index,
+                        destroy_ee,
+                        exec_terminate_primitives,
+                        vca_id=vca_id,
+                    )
+                )
                 tasks_dict_info[task] = "Terminating VCA {}".format(vca.get("ee_id"))
 
             # wait for pending tasks of terminate primitives
@@ -3129,8 +3184,13 @@ class NsLcm(LcmBase):
             if nsr_deployed.get("VCA"):
                 stage[1] = "Deleting all execution environments."
                 self.logger.debug(logging_text + stage[1])
-                task_delete_ee = asyncio.ensure_future(asyncio.wait_for(self._delete_all_N2VC(db_nsr=db_nsr),
-                                                                        timeout=self.timeout_charm_delete))
+                vca_id = self.get_vca_id({}, db_nsr)
+                task_delete_ee = asyncio.ensure_future(
+                    asyncio.wait_for(
+                        self._delete_all_N2VC(db_nsr=db_nsr, vca_id=vca_id),
+                        timeout=self.timeout_charm_delete
+                    )
+                )
                 # task_delete_ee = asyncio.ensure_future(self.n2vc.delete_namespace(namespace="." + nsr_id))
                 tasks_dict_info[task_delete_ee] = "Terminating all VCA"
 
@@ -3143,10 +3203,15 @@ class NsLcm(LcmBase):
                     continue
                 kdu_instance = kdu.get("kdu-instance")
                 if kdu.get("k8scluster-type") in self.k8scluster_map:
+                    # TODO: Uninstall kdu instances taking into account they could be deployed in different VIMs
+                    vca_id = self.get_vca_id({}, db_nsr)
                     task_delete_kdu_instance = asyncio.ensure_future(
                         self.k8scluster_map[kdu["k8scluster-type"]].uninstall(
                             cluster_uuid=kdu.get("k8scluster-uuid"),
-                            kdu_instance=kdu_instance))
+                            kdu_instance=kdu_instance,
+                            vca_id=vca_id,
+                        )
+                    )
                 else:
                     self.logger.error(logging_text + "Unknown k8s deployment type {}".
                                       format(kdu.get("k8scluster-type")))
@@ -3378,8 +3443,18 @@ class NsLcm(LcmBase):
                                .format(member_vnf_index, vdu_id, kdu_name, vdu_count_index))
         return ee_id, vca_type
 
-    async def _ns_execute_primitive(self, ee_id, primitive, primitive_params, retries=0, retries_interval=30,
-                                    timeout=None, vca_type=None, db_dict=None) -> (str, str):
+    async def _ns_execute_primitive(
+        self,
+        ee_id,
+        primitive,
+        primitive_params,
+        retries=0,
+        retries_interval=30,
+        timeout=None,
+        vca_type=None,
+        db_dict=None,
+        vca_id: str = None,
+    ) -> (str, str):
         try:
             if primitive == "config":
                 primitive_params = {"params": primitive_params}
@@ -3395,7 +3470,9 @@ class NsLcm(LcmBase):
                             params_dict=primitive_params,
                             progress_timeout=self.timeout_progress_primitive,
                             total_timeout=self.timeout_primitive,
-                            db_dict=db_dict),
+                            db_dict=db_dict,
+                            vca_id=vca_id,
+                        ),
                         timeout=timeout or self.timeout_primitive)
                     # execution was OK
                     break
@@ -3429,10 +3506,11 @@ class NsLcm(LcmBase):
 
         self.logger.debug("Task ns={} action={} Enter".format(nsr_id, nslcmop_id))
         db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
+        vca_id = self.get_vca_id({}, db_nsr)
         if db_nsr['_admin']['deployed']['K8s']:
             for k8s_index, k8s in enumerate(db_nsr['_admin']['deployed']['K8s']):
                 cluster_uuid, kdu_instance = k8s["k8scluster-uuid"], k8s["kdu-instance"]
-                await self._on_update_k8s_db(cluster_uuid, kdu_instance, filter={'_id': nsr_id})
+                await self._on_update_k8s_db(cluster_uuid, kdu_instance, filter={'_id': nsr_id}, vca_id=vca_id)
         else:
             for vca_index, _ in enumerate(db_nsr['_admin']['deployed']['VCA']):
                 table, filter = "nsrs", {"_id": nsr_id}
@@ -3492,6 +3570,7 @@ class NsLcm(LcmBase):
                 step = "Getting nsd from database"
                 db_nsd = self.db.get_one("nsds", {"_id": db_nsr["nsd-id"]})
 
+            vca_id = self.get_vca_id(db_vnfr, db_nsr)
             # for backward compatibility
             if nsr_deployed and isinstance(nsr_deployed.get("VCA"), dict):
                 nsr_deployed["VCA"] = list(nsr_deployed["VCA"].values())
@@ -3596,8 +3675,11 @@ class NsLcm(LcmBase):
                     detailed_status = await asyncio.wait_for(
                         self.k8scluster_map[kdu["k8scluster-type"]].status_kdu(
                             cluster_uuid=kdu.get("k8scluster-uuid"),
-                            kdu_instance=kdu.get("kdu-instance")),
-                        timeout=timeout_ns_action)
+                            kdu_instance=kdu.get("kdu-instance"),
+                            vca_id=vca_id,
+                        ),
+                        timeout=timeout_ns_action
+                    )
                 else:
                     kdu_instance = kdu.get("kdu-instance") or "{}-{}".format(kdu["kdu-name"], nsr_id)
                     params = self._map_primitive_params(config_primitive_desc, primitive_params, desc_params)
@@ -3608,8 +3690,11 @@ class NsLcm(LcmBase):
                             kdu_instance=kdu_instance,
                             primitive_name=primitive_name,
                             params=params, db_dict=db_dict,
-                            timeout=timeout_ns_action),
-                        timeout=timeout_ns_action)
+                            timeout=timeout_ns_action,
+                            vca_id=vca_id,
+                        ),
+                        timeout=timeout_ns_action
+                    )
 
                 if detailed_status:
                     nslcmop_operation_state = 'COMPLETED'
@@ -3632,7 +3717,9 @@ class NsLcm(LcmBase):
                     primitive_params=self._map_primitive_params(config_primitive_desc, primitive_params, desc_params),
                     timeout=timeout_ns_action,
                     vca_type=vca_type,
-                    db_dict=db_dict)
+                    db_dict=db_dict,
+                    vca_id=vca_id,
+                )
 
             db_nslcmop_update["detailed-status"] = detailed_status
             error_description_nslcmop = detailed_status if nslcmop_operation_state == "FAILED" else ""
@@ -3744,6 +3831,8 @@ class NsLcm(LcmBase):
             step = "Getting vnfr from database"
             db_vnfr = self.db.get_one("vnfrs", {"member-vnf-index-ref": vnf_index, "nsr-id-ref": nsr_id})
 
+            vca_id = self.get_vca_id(db_vnfr, db_nsr)
+
             step = "Getting vnfd from database"
             db_vnfd = self.db.get_one("vnfds", {"_id": db_vnfr["vnfd-id"]})
 
@@ -3969,7 +4058,11 @@ class NsLcm(LcmBase):
                                                                           vdu_count_index=None,
                                                                           ee_descriptor_id=ee_descriptor_id)
                             result, result_detail = await self._ns_execute_primitive(
-                                ee_id, primitive_name, primitive_params, vca_type=vca_type)
+                                ee_id, primitive_name,
+                                primitive_params,
+                                vca_type=vca_type,
+                                vca_id=vca_id,
+                            )
                             self.logger.debug(logging_text + "vnf_config_primitive={} Done with result {} {}".format(
                                 vnf_config_primitive, result, result_detail))
                             # Update operationState = COMPLETED | FAILED
@@ -4017,11 +4110,22 @@ class NsLcm(LcmBase):
                                 operation_params = db_nslcmop.get("operationParams") or {}
                                 exec_terminate_primitives = (not operation_params.get("skip_terminate_primitives") and
                                                              vca.get("needed_terminate"))
-                                task = asyncio.ensure_future(asyncio.wait_for(
-                                    self.destroy_N2VC(logging_text, db_nslcmop, vca, config_descriptor,
-                                                      vca_index, destroy_ee=True,
-                                                      exec_primitives=exec_terminate_primitives,
-                                                      scaling_in=True), timeout=self.timeout_charm_delete))
+                                task = asyncio.ensure_future(
+                                    asyncio.wait_for(
+                                        self.destroy_N2VC(
+                                            logging_text,
+                                            db_nslcmop,
+                                            vca,
+                                            config_descriptor,
+                                            vca_index,
+                                            destroy_ee=True,
+                                            exec_primitives=exec_terminate_primitives,
+                                            scaling_in=True,
+                                            vca_id=vca_id,
+                                        ),
+                                        timeout=self.timeout_charm_delete
+                                    )
+                                )
                                 # wait before next removal
                                 await asyncio.sleep(30)
                                 tasks_dict_info[task] = "Terminating VCA {}".format(vca.get("ee_id"))
@@ -4234,7 +4338,12 @@ class NsLcm(LcmBase):
                                                                           vdu_count_index=None,
                                                                           ee_descriptor_id=ee_descriptor_id)
                             result, result_detail = await self._ns_execute_primitive(
-                                ee_id, primitive_name, primitive_params, vca_type=vca_type)
+                                ee_id,
+                                primitive_name,
+                                primitive_params,
+                                vca_type=vca_type,
+                                vca_id=vca_id,
+                            )
                             self.logger.debug(logging_text + "vnf_config_primitive={} Done with result {} {}".format(
                                 vnf_config_primitive, result, result_detail))
                             # Update operationState = COMPLETED | FAILED
diff --git a/osm_lcm/tests/test_vim_sdn.py b/osm_lcm/tests/test_vim_sdn.py
new file mode 100644 (file)
index 0000000..f6b75e0
--- /dev/null
@@ -0,0 +1,160 @@
+# Copyright 2021 Canonical Ltd.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+#     Unless required by applicable law or agreed to in writing, software
+#     distributed under the License is distributed on an "AS IS" BASIS,
+#     WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#     See the License for the specific language governing permissions and
+#     limitations under the License.
+
+import asyncio
+from unittest import TestCase
+from unittest.mock import Mock, patch, MagicMock
+
+
+from osm_common import msgbase
+from osm_common.dbbase import DbException
+from osm_lcm.vim_sdn import VcaLcm
+
+
+class AsyncMock(MagicMock):
+    async def __call__(self, *args, **kwargs):
+        return super(AsyncMock, self).__call__(*args, **kwargs)
+
+
+class TestVcaLcm(TestCase):
+    @patch("osm_lcm.lcm_utils.Database")
+    @patch("osm_lcm.lcm_utils.Filesystem")
+    def setUp(self, mock_filesystem, mock_database):
+        self.loop = asyncio.get_event_loop()
+        self.msg = Mock(msgbase.MsgBase())
+        self.lcm_tasks = Mock()
+        self.config = {"database": {"driver": "mongo"}}
+        self.vca_lcm = VcaLcm(self.msg, self.lcm_tasks, self.config, self.loop)
+        self.vca_lcm.db = Mock()
+        self.vca_lcm.fs = Mock()
+
+    def test_vca_lcm_create(self):
+        vca_content = {"op_id": "order-id", "_id": "id"}
+        db_vca = {
+            "_id": "vca-id",
+            "secret": "secret",
+            "cacert": "cacert",
+            "schema_version": "1.11",
+        }
+        order_id = "order-id"
+        self.lcm_tasks.lock_HA.return_value = True
+        self.vca_lcm.db.get_one.return_value = db_vca
+        self.vca_lcm.n2vc.validate_vca = AsyncMock()
+        self.vca_lcm.update_db_2 = Mock()
+
+        self.loop.run_until_complete(self.vca_lcm.create(vca_content, order_id))
+
+        self.lcm_tasks.lock_HA.assert_called_with("vca", "create", "order-id")
+        self.vca_lcm.db.encrypt_decrypt_fields.assert_called_with(
+            db_vca,
+            "decrypt",
+            ["secret", "cacert"],
+            schema_version="1.11",
+            salt="vca-id",
+        )
+        self.vca_lcm.update_db_2.assert_called_with(
+            "vca",
+            "id",
+            {
+                "_admin.operationalState": "ENABLED",
+                "_admin.detailed-status": "Connectivity: ok",
+            },
+        )
+        self.lcm_tasks.unlock_HA.assert_called_with(
+            "vca",
+            "create",
+            "order-id",
+            operationState="COMPLETED",
+            detailed_status="VCA validated",
+        )
+        self.lcm_tasks.remove.assert_called_with("vca", "id", "order-id")
+
+    def test_vca_lcm_create_exception(self):
+        vca_content = {"op_id": "order-id", "_id": "id"}
+        db_vca = {
+            "_id": "vca-id",
+            "secret": "secret",
+            "cacert": "cacert",
+            "schema_version": "1.11",
+        }
+        order_id = "order-id"
+        self.lcm_tasks.lock_HA.return_value = True
+        self.vca_lcm.db.get_one.return_value = db_vca
+        self.vca_lcm.n2vc.validate_vca = AsyncMock()
+        self.vca_lcm.n2vc.validate_vca.side_effect = Exception("failed")
+        self.vca_lcm.update_db_2 = Mock()
+        self.vca_lcm.update_db_2.side_effect = DbException("failed")
+        self.loop.run_until_complete(self.vca_lcm.create(vca_content, order_id))
+
+        self.lcm_tasks.lock_HA.assert_called_with("vca", "create", "order-id")
+        self.vca_lcm.db.encrypt_decrypt_fields.assert_called_with(
+            db_vca,
+            "decrypt",
+            ["secret", "cacert"],
+            schema_version="1.11",
+            salt="vca-id",
+        )
+        self.vca_lcm.update_db_2.assert_called_with(
+            "vca",
+            "id",
+            {
+                "_admin.operationalState": "ERROR",
+                "_admin.detailed-status": "Failed with exception: failed",
+            },
+        )
+        self.lcm_tasks.unlock_HA.assert_not_called()
+        self.lcm_tasks.remove.assert_called_with("vca", "id", "order-id")
+
+    def test_vca_lcm_delete(self):
+        vca_content = {"op_id": "order-id", "_id": "id"}
+        order_id = "order-id"
+        self.lcm_tasks.lock_HA.return_value = True
+        self.vca_lcm.update_db_2 = Mock()
+
+        self.loop.run_until_complete(self.vca_lcm.delete(vca_content, order_id))
+
+        self.lcm_tasks.lock_HA.assert_called_with("vca", "delete", "order-id")
+        self.vca_lcm.db.del_one.assert_called_with("vca", {"_id": "id"})
+        self.vca_lcm.update_db_2.assert_called_with("vca", "id", None)
+        self.lcm_tasks.unlock_HA.assert_called_with(
+            "vca",
+            "delete",
+            "order-id",
+            operationState="COMPLETED",
+            detailed_status="deleted",
+        )
+        self.lcm_tasks.remove.assert_called_with("vca", "id", "order-id")
+
+    def test_vca_lcm_delete_exception(self):
+        vca_content = {"op_id": "order-id", "_id": "id"}
+        order_id = "order-id"
+        self.lcm_tasks.lock_HA.return_value = True
+        self.vca_lcm.update_db_2 = Mock()
+        self.vca_lcm.db.del_one.side_effect = Exception("failed deleting")
+        self.vca_lcm.update_db_2.side_effect = DbException("failed")
+
+        self.loop.run_until_complete(self.vca_lcm.delete(vca_content, order_id))
+
+        self.lcm_tasks.lock_HA.assert_called_with("vca", "delete", "order-id")
+        self.vca_lcm.db.del_one.assert_called_with("vca", {"_id": "id"})
+        self.vca_lcm.update_db_2.assert_called_with(
+            "vca",
+            "id",
+            {
+                "_admin.operationalState": "ERROR",
+                "_admin.detailed-status": "Failed with exception: failed deleting",
+            },
+        )
+        self.lcm_tasks.unlock_HA.not_called()
+        self.lcm_tasks.remove.assert_called_with("vca", "id", "order-id")
index 13b95c4..a1623ba 100644 (file)
@@ -25,6 +25,7 @@ from osm_lcm.lcm_utils import LcmException, LcmBase, deep_get
 from n2vc.k8s_helm_conn import K8sHelmConnector
 from n2vc.k8s_helm3_conn import K8sHelm3Connector
 from n2vc.k8s_juju_conn import K8sJujuConnector
+from n2vc.n2vc_juju_conn import N2VCJujuConnector
 from n2vc.exceptions import K8sException, N2VCException
 from osm_common.dbbase import DbException
 from copy import deepcopy
@@ -937,7 +938,6 @@ class K8sClusterLcm(LcmBase):
             log=self.logger,
             loop=self.loop,
             on_update_db=None,
-            vca_config=self.vca_config,
             db=self.db,
             fs=self.fs
         )
@@ -975,8 +975,13 @@ class K8sClusterLcm(LcmBase):
             for task_name in ("helm-chart", "juju-bundle", "helm-chart-v3"):
                 if init_target and task_name not in init_target:
                     continue
-                task = asyncio.ensure_future(self.k8s_map[task_name].init_env(k8s_credentials,
-                                                                              reuse_cluster_uuid=k8scluster_id))
+                task = asyncio.ensure_future(
+                    self.k8s_map[task_name].init_env(
+                        k8s_credentials,
+                        reuse_cluster_uuid=k8scluster_id,
+                        vca_id=db_k8scluster.get("vca_id"),
+                    )
+                )
                 pending_tasks.append(task)
                 task2name[task] = task_name
 
@@ -1089,7 +1094,11 @@ class K8sClusterLcm(LcmBase):
             if k8s_jb_id:  # delete in reverse order of creation
                 step = "Removing juju-bundle '{}'".format(k8s_jb_id)
                 uninstall_sw = deep_get(db_k8scluster, ("_admin", "juju-bundle", "created")) or False
-                cluster_removed = await self.juju_k8scluster.reset(cluster_uuid=k8s_jb_id, uninstall_sw=uninstall_sw)
+                cluster_removed = await self.juju_k8scluster.reset(
+                    cluster_uuid=k8s_jb_id,
+                    uninstall_sw=uninstall_sw,
+                    vca_id=db_k8scluster.get("vca_id"),
+                )
                 db_k8scluster_update["_admin.juju-bundle.id"] = None
                 db_k8scluster_update["_admin.juju-bundle.operationalState"] = "DISABLED"
 
@@ -1153,6 +1162,139 @@ class K8sClusterLcm(LcmBase):
             self.lcm_tasks.remove("k8scluster", k8scluster_id, order_id)
 
 
+class VcaLcm(LcmBase):
+    timeout_create = 30
+
+    def __init__(self, msg, lcm_tasks, config, loop):
+        """
+        Init, Connect to database, filesystem storage, and messaging
+        :param config: two level dictionary with configuration. Top level should contain 'database', 'storage',
+        :return: None
+        """
+
+        self.logger = logging.getLogger("lcm.vca")
+        self.loop = loop
+        self.lcm_tasks = lcm_tasks
+
+        super().__init__(msg, self.logger)
+
+        # create N2VC connector
+        self.n2vc = N2VCJujuConnector(
+            log=self.logger,
+            loop=self.loop,
+            fs=self.fs,
+            db=self.db
+        )
+
+    def _get_vca_by_id(self, vca_id: str) -> dict:
+        db_vca = self.db.get_one("vca", {"_id": vca_id})
+        self.db.encrypt_decrypt_fields(
+            db_vca,
+            "decrypt",
+            ["secret", "cacert"],
+            schema_version=db_vca["schema_version"], salt=db_vca["_id"]
+        )
+        return db_vca
+
+    async def create(self, vca_content, order_id):
+        op_id = vca_content.pop("op_id", None)
+        if not self.lcm_tasks.lock_HA("vca", "create", op_id):
+            return
+
+        vca_id = vca_content["_id"]
+        self.logger.debug("Task vca_create={} {}".format(vca_id, "Enter"))
+
+        db_vca = None
+        db_vca_update = {}
+
+        try:
+            self.logger.debug("Task vca_create={} {}".format(vca_id, "Getting vca from db"))
+            db_vca = self._get_vca_by_id(vca_id)
+
+            task = asyncio.ensure_future(
+                asyncio.wait_for(
+                    self.n2vc.validate_vca(db_vca["_id"]),
+                    timeout=self.timeout_create,
+                )
+            )
+
+            await asyncio.wait([task], return_when=asyncio.FIRST_COMPLETED)
+            if task.exception():
+                raise task.exception()
+            self.logger.debug("Task vca_create={} {}".format(vca_id, "vca registered and validated successfully"))
+            db_vca_update["_admin.operationalState"] = "ENABLED"
+            db_vca_update["_admin.detailed-status"] = "Connectivity: ok"
+            operation_details = "VCA validated"
+            operation_state = "COMPLETED"
+
+            self.logger.debug("Task vca_create={} {}".format(vca_id, "Done. Result: {}".format(operation_state)))
+
+        except Exception as e:
+            error_msg = "Failed with exception: {}".format(e)
+            self.logger.error("Task vca_create={} {}".format(vca_id, error_msg))
+            db_vca_update["_admin.operationalState"] = "ERROR"
+            db_vca_update["_admin.detailed-status"] = error_msg
+            operation_state = "FAILED"
+            operation_details = error_msg
+        finally:
+            try:
+                self.update_db_2("vca", vca_id, db_vca_update)
+
+                # Register the operation and unlock
+                self.lcm_tasks.unlock_HA(
+                    "vca",
+                    "create",
+                    op_id,
+                    operationState=operation_state,
+                    detailed_status=operation_details
+                )
+            except DbException as e:
+                self.logger.error("Task vca_create={} {}".format(vca_id, "Cannot update database: {}".format(e)))
+            self.lcm_tasks.remove("vca", vca_id, order_id)
+
+    async def delete(self, vca_content, order_id):
+
+        # HA tasks and backward compatibility:
+        # If "vim_content" does not include "op_id", we a running a legacy NBI version.
+        # In such a case, HA is not supported by NBI, "op_id" is None, and lock_HA() will do nothing.
+        # Register "delete" task here for related future HA operations
+        op_id = vca_content.pop("op_id", None)
+        if not self.lcm_tasks.lock_HA("vca", "delete", op_id):
+            return
+
+        db_vca_update = {}
+        vca_id = vca_content["_id"]
+
+        try:
+            self.logger.debug("Task vca_delete={} {}".format(vca_id, "Deleting vca from db"))
+            self.db.del_one("vca", {"_id": vca_id})
+            db_vca_update = None
+            operation_details = "deleted"
+            operation_state = "COMPLETED"
+
+            self.logger.debug("Task vca_delete={} {}".format(vca_id, "Done. Result: {}".format(operation_state)))
+        except Exception as e:
+            error_msg = "Failed with exception: {}".format(e)
+            self.logger.error("Task vca_delete={} {}".format(vca_id, error_msg))
+            db_vca_update["_admin.operationalState"] = "ERROR"
+            db_vca_update["_admin.detailed-status"] = error_msg
+            operation_state = "FAILED"
+            operation_details = error_msg
+        finally:
+            try:
+                self.update_db_2("vca", vca_id, db_vca_update)
+                self.lcm_tasks.unlock_HA(
+                    "vca",
+                    "delete",
+                    op_id,
+                    operationState=operation_state,
+                    detailed_status=operation_details,
+                )
+            except DbException as e:
+                self.logger.error("Task vca_delete={} {}".format(vca_id, "Cannot update database: {}".format(e)))
+            self.lcm_tasks.remove("vca", vca_id, order_id)
+
+
 class K8sRepoLcm(LcmBase):
 
     def __init__(self, msg, lcm_tasks, config, loop):
index e9e1d38..a3f9edc 100644 (file)
@@ -1,5 +1,9 @@
 aiokafka==0.7.0
     # via -r https://osm.etsi.org/gitweb/?p=osm/common.git;a=blob_plain;f=requirements.txt;hb=master
+async-timeout==3.0.1
+    # via
+    #   -r https://osm.etsi.org/gitweb/?p=osm/N2VC.git;a=blob_plain;f=requirements.txt;hb=master
+    #   retrying-async
 bcrypt==3.2.0
     # via
     #   -r https://osm.etsi.org/gitweb/?p=osm/N2VC.git;a=blob_plain;f=requirements.txt;hb=master
@@ -54,6 +58,8 @@ macaroonbakery==1.3.1
     #   -r https://osm.etsi.org/gitweb/?p=osm/N2VC.git;a=blob_plain;f=requirements.txt;hb=master
     #   juju
     #   theblues
+motor==1.3.1
+    # via -r https://osm.etsi.org/gitweb/?p=osm/N2VC.git;a=blob_plain;f=requirements.txt;hb=master
 mypy-extensions==0.4.3
     # via
     #   -r https://osm.etsi.org/gitweb/?p=osm/N2VC.git;a=blob_plain;f=requirements.txt;hb=master
@@ -95,7 +101,10 @@ pymacaroons==0.13.0
     #   -r https://osm.etsi.org/gitweb/?p=osm/N2VC.git;a=blob_plain;f=requirements.txt;hb=master
     #   macaroonbakery
 pymongo==3.11.3
-    # via -r https://osm.etsi.org/gitweb/?p=osm/common.git;a=blob_plain;f=requirements.txt;hb=master
+    # via
+    #   -r https://osm.etsi.org/gitweb/?p=osm/N2VC.git;a=blob_plain;f=requirements.txt;hb=master
+    #   -r https://osm.etsi.org/gitweb/?p=osm/common.git;a=blob_plain;f=requirements.txt;hb=master
+    #   motor
 pynacl==1.4.0
     # via
     #   -r https://osm.etsi.org/gitweb/?p=osm/N2VC.git;a=blob_plain;f=requirements.txt;hb=master
@@ -133,6 +142,8 @@ requests==2.25.1
     #   macaroonbakery
     #   requests-oauthlib
     #   theblues
+retrying-async==1.2.0
+    # via -r https://osm.etsi.org/gitweb/?p=osm/N2VC.git;a=blob_plain;f=requirements.txt;hb=master
 rsa==4.7.2
     # via
     #   -r https://osm.etsi.org/gitweb/?p=osm/N2VC.git;a=blob_plain;f=requirements.txt;hb=master