From c1fe90adf8ed0d671342c617fed7184629c7003e Mon Sep 17 00:00:00 2001 From: David Garcia Date: Wed, 31 Mar 2021 19:12:02 +0200 Subject: [PATCH] Feature 10239: Distributed VCA - Handle VCATopic: https://osm.etsi.org/gerrit/#/c/osm/NBI/+/10574/ - Pass vca_id to calls in N2VC, so N2VC can know to which VCA it needs to talk Depends on the following patch: https://osm.etsi.org/gerrit/#/c/osm/N2VC/+/10616/ Change-Id: I080c1aab94f70de83f2d33def74ccd03450dbdd6 Signed-off-by: David Garcia --- osm_lcm/lcm.py | 16 ++- osm_lcm/lcm_helm_conn.py | 6 +- osm_lcm/lcm_utils.py | 4 +- osm_lcm/ns.py | 243 ++++++++++++++++++++++++---------- osm_lcm/tests/test_vim_sdn.py | 160 ++++++++++++++++++++++ osm_lcm/vim_sdn.py | 150 ++++++++++++++++++++- requirements-dev.txt | 13 +- 7 files changed, 512 insertions(+), 80 deletions(-) create mode 100644 osm_lcm/tests/test_vim_sdn.py diff --git a/osm_lcm/lcm.py b/osm_lcm/lcm.py index 659a57c..ebfca7e 100644 --- a/osm_lcm/lcm.py +++ b/osm_lcm/lcm.py @@ -104,7 +104,7 @@ class Lcm: self.config["ro_config"]["uri"] = self.config["ro_config"]["uri"][index+1] self.loop = loop or asyncio.get_event_loop() - self.ns = self.netslice = self.vim = self.wim = self.sdn = self.k8scluster = self.k8srepo = None + self.ns = self.netslice = self.vim = self.wim = self.sdn = self.k8scluster = self.vca = self.k8srepo = None # logging log_format_simple = "%(asctime)s %(levelname)s %(name)s %(filename)s:%(lineno)s %(message)s" @@ -309,6 +309,17 @@ class Lcm: task = asyncio.ensure_future(self.k8scluster.delete(params, order_id)) self.lcm_tasks.register("k8scluster", k8scluster_id, order_id, "k8scluster_delete", task) return + elif topic == "vca": + if command == "create" or command == "created": + vca_id = params.get("_id") + task = asyncio.ensure_future(self.vca.create(params, order_id)) + self.lcm_tasks.register("vca", vca_id, order_id, "vca_create", task) + return + elif command == "delete" or command == "deleted": + vca_id = params.get("_id") + task = 
asyncio.ensure_future(self.vca.delete(params, order_id)) + self.lcm_tasks.register("vca", vca_id, order_id, "vca_delete", task) + return elif topic == "k8srepo": if command == "create" or command == "created": k8srepo_id = params.get("_id") @@ -486,7 +497,7 @@ class Lcm: self.first_start = True while self.consecutive_errors < 10: try: - topics = ("ns", "vim_account", "wim_account", "sdn", "nsi", "k8scluster", "k8srepo", "pla") + topics = ("ns", "vim_account", "wim_account", "sdn", "nsi", "k8scluster", "vca", "k8srepo", "pla") topics_admin = ("admin", ) await asyncio.gather( self.msg.aioread(topics, self.loop, self.kafka_read_callback, from_beginning=True), @@ -522,6 +533,7 @@ class Lcm: self.wim = vim_sdn.WimLcm(self.msg, self.lcm_tasks, self.config, self.loop) self.sdn = vim_sdn.SdnLcm(self.msg, self.lcm_tasks, self.config, self.loop) self.k8scluster = vim_sdn.K8sClusterLcm(self.msg, self.lcm_tasks, self.config, self.loop) + self.vca = vim_sdn.VcaLcm(self.msg, self.lcm_tasks, self.config, self.loop) self.k8srepo = vim_sdn.K8sRepoLcm(self.msg, self.lcm_tasks, self.config, self.loop) # configure tsdb prometheus diff --git a/osm_lcm/lcm_helm_conn.py b/osm_lcm/lcm_helm_conn.py index 34e4915..a13824d 100644 --- a/osm_lcm/lcm_helm_conn.py +++ b/osm_lcm/lcm_helm_conn.py @@ -88,8 +88,6 @@ class LCMHelmConn(N2VCConnector, LcmBase): def __init__(self, log: object = None, loop: object = None, - url: str = None, - username: str = None, vca_config: dict = None, on_update_db=None, ): """ @@ -104,14 +102,12 @@ class LCMHelmConn(N2VCConnector, LcmBase): self, log=log, loop=loop, - url=url, - username=username, - vca_config=vca_config, on_update_db=on_update_db, db=self.db, fs=self.fs ) + self.vca_config = vca_config self.log.debug("Initialize helm N2VC connector") self.log.debug("initial vca_config: {}".format(vca_config)) diff --git a/osm_lcm/lcm_utils.py b/osm_lcm/lcm_utils.py index a05e5ac..a1569c1 100644 --- a/osm_lcm/lcm_utils.py +++ b/osm_lcm/lcm_utils.py @@ -146,7 +146,7 
@@ class TaskRegistry(LcmBase): # NS/NSI: "services" VIM/WIM/SDN: "accounts" topic_service_list = ['ns', 'nsi'] - topic_account_list = ['vim', 'wim', 'sdn', 'k8scluster', 'k8srepo'] + topic_account_list = ['vim', 'wim', 'sdn', 'k8scluster', 'vca', 'k8srepo'] # Map topic to InstanceID topic2instid_dict = { @@ -161,6 +161,7 @@ class TaskRegistry(LcmBase): 'wim': 'wim_accounts', 'sdn': 'sdns', 'k8scluster': 'k8sclusters', + 'vca': 'vca', 'k8srepo': 'k8srepos'} def __init__(self, worker_id=None, logger=None): @@ -171,6 +172,7 @@ class TaskRegistry(LcmBase): "wim_account": {}, "sdn": {}, "k8scluster": {}, + "vca": {}, "k8srepo": {}, } self.worker_id = worker_id diff --git a/osm_lcm/ns.py b/osm_lcm/ns.py index 1bcf4c7..0f5c6ab 100644 --- a/osm_lcm/ns.py +++ b/osm_lcm/ns.py @@ -97,9 +97,6 @@ class NsLcm(LcmBase): self.n2vc = N2VCJujuConnector( log=self.logger, loop=self.loop, - url='{}:{}'.format(self.vca_config['host'], self.vca_config['port']), - username=self.vca_config.get('user', None), - vca_config=self.vca_config, on_update_db=self._on_update_n2vc_db, fs=self.fs, db=self.db @@ -108,8 +105,6 @@ class NsLcm(LcmBase): self.conn_helm_ee = LCMHelmConn( log=self.logger, loop=self.loop, - url=None, - username=None, vca_config=self.vca_config, on_update_db=self._on_update_n2vc_db ) @@ -138,7 +133,6 @@ class NsLcm(LcmBase): log=self.logger, loop=self.loop, on_update_db=self._on_update_k8s_db, - vca_config=self.vca_config, fs=self.fs, db=self.db ) @@ -200,7 +194,7 @@ class NsLcm(LcmBase): except Exception as e: self.logger.warn('Cannot write database RO deployment for ns={} -> {}'.format(nsrs_id, e)) - async def _on_update_n2vc_db(self, table, filter, path, updated_data): + async def _on_update_n2vc_db(self, table, filter, path, updated_data, vca_id=None): # remove last dot from path (if exists) if path.endswith('.'): @@ -217,12 +211,12 @@ class NsLcm(LcmBase): current_ns_status = nsr.get('nsState') # get vca status for NS - status_dict = await 
self.n2vc.get_status(namespace='.' + nsr_id, yaml_format=False) + status_dict = await self.n2vc.get_status(namespace='.' + nsr_id, yaml_format=False, vca_id=vca_id) # vcaStatus db_dict = dict() db_dict['vcaStatus'] = status_dict - await self.n2vc.update_vca_status(db_dict['vcaStatus']) + await self.n2vc.update_vca_status(db_dict['vcaStatus'], vca_id=vca_id) # update configurationStatus for this VCA try: @@ -289,7 +283,7 @@ class NsLcm(LcmBase): except Exception as e: self.logger.warn('Error updating NS state for ns={}: {}'.format(nsr_id, e)) - async def _on_update_k8s_db(self, cluster_uuid, kdu_instance, filter=None): + async def _on_update_k8s_db(self, cluster_uuid, kdu_instance, filter=None, vca_id=None): """ Updating vca status in NSR record :param cluster_uuid: UUID of a k8s cluster @@ -305,15 +299,22 @@ class NsLcm(LcmBase): nsr_id = filter.get('_id') # get vca status for NS - vca_status = await self.k8sclusterjuju.status_kdu(cluster_uuid, - kdu_instance, - complete_status=True, - yaml_format=False) + vca_status = await self.k8sclusterjuju.status_kdu( + cluster_uuid, + kdu_instance, + complete_status=True, + yaml_format=False, + vca_id=vca_id, + ) # vcaStatus db_dict = dict() db_dict['vcaStatus'] = {nsr_id: vca_status} - await self.k8sclusterjuju.update_vca_status(db_dict['vcaStatus'], kdu_instance) + await self.k8sclusterjuju.update_vca_status( + db_dict['vcaStatus'], + kdu_instance, + vca_id=vca_id, + ) # write to database self.update_db_2("nsrs", nsr_id, db_dict) @@ -1179,6 +1180,12 @@ class NsLcm(LcmBase): raise LcmException("Configuration aborted because dependent charm/s timeout") + def get_vca_id(self, db_vnfr: dict, db_nsr: dict): + return ( + deep_get(db_vnfr, ("vca-id",)) or + deep_get(db_nsr, ("instantiate_params", "vcaId")) + ) + async def instantiate_N2VC(self, logging_text, vca_index, nsi_id, db_nsr, db_vnfr, vdu_id, kdu_name, vdu_index, config_descriptor, deploy_params, base_folder, nslcmop_id, stage, vca_type, vca_name, ee_config_descriptor): 
@@ -1248,12 +1255,7 @@ class NsLcm(LcmBase): # find old ee_id if exists ee_id = vca_deployed.get("ee_id") - vim_account_id = ( - deep_get(db_vnfr, ("vim-account-id",)) or - deep_get(deploy_params, ("OSM", "vim_account_id")) - ) - vca_cloud, vca_cloud_credential = self.get_vca_cloud_and_credentials(vim_account_id) - vca_k8s_cloud, vca_k8s_cloud_credential = self.get_vca_k8s_cloud_and_credentials(vim_account_id) + vca_id = self.get_vca_id(db_vnfr, db_nsr) # create or register execution environment in VCA if vca_type in ("lxc_proxy_charm", "k8s_proxy_charm", "helm", "helm-v3"): @@ -1276,8 +1278,7 @@ class NsLcm(LcmBase): namespace=namespace, artifact_path=artifact_path, db_dict=db_dict, - cloud_name=vca_k8s_cloud, - credential_name=vca_k8s_cloud_credential, + vca_id=vca_id, ) elif vca_type == "helm" or vca_type == "helm-v3": ee_id, credentials = await self.vca_map[vca_type].create_execution_environment( @@ -1293,8 +1294,7 @@ class NsLcm(LcmBase): namespace=namespace, reuse_ee_id=ee_id, db_dict=db_dict, - cloud_name=vca_cloud, - credential_name=vca_cloud_credential, + vca_id=vca_id, ) elif vca_type == "native_charm": @@ -1333,8 +1333,7 @@ class NsLcm(LcmBase): credentials=credentials, namespace=namespace, db_dict=db_dict, - cloud_name=vca_cloud, - credential_name=vca_cloud_credential, + vca_id=vca_id, ) # for compatibility with MON/POL modules, the need model and application name at database @@ -1388,6 +1387,7 @@ class NsLcm(LcmBase): db_dict=db_dict, config=config, num_units=num_units, + vca_id=vca_id, ) # write in db flag of configuration_sw already installed @@ -1395,7 +1395,7 @@ class NsLcm(LcmBase): # add relations for this VCA (wait for other peers related with this VCA) await self._add_vca_relations(logging_text=logging_text, nsr_id=nsr_id, - vca_index=vca_index, vca_type=vca_type) + vca_index=vca_index, vca_id=vca_id, vca_type=vca_type) # if SSH access is required, then get execution environment SSH public # if native charm we have waited already to VM be UP @@ 
-1408,7 +1408,11 @@ class NsLcm(LcmBase): # Needed to inject a ssh key user = deep_get(config_descriptor, ("config-access", "ssh-access", "default-user")) step = "Install configuration Software, getting public ssh key" - pub_key = await self.vca_map[vca_type].get_ee_ssh_public__key(ee_id=ee_id, db_dict=db_dict) + pub_key = await self.vca_map[vca_type].get_ee_ssh_public__key( + ee_id=ee_id, + db_dict=db_dict, + vca_id=vca_id + ) step = "Insert public key into VM user={} ssh_key={}".format(user, pub_key) else: @@ -1476,7 +1480,8 @@ class NsLcm(LcmBase): ee_id=ee_id, primitive_name=initial_config_primitive["name"], params_dict=primitive_params_, - db_dict=db_dict + db_dict=db_dict, + vca_id=vca_id, ) # Once some primitive has been exec, check and write at db if it needs to exec terminated primitives if check_if_terminated_needed: @@ -2063,8 +2068,15 @@ class NsLcm(LcmBase): self.logger.debug(logging_text + "Exit") self.lcm_tasks.remove("ns", nsr_id, nslcmop_id, "ns_instantiate") - async def _add_vca_relations(self, logging_text, nsr_id, vca_index: int, - timeout: int = 3600, vca_type: str = None) -> bool: + async def _add_vca_relations( + self, + logging_text, + nsr_id, + vca_index: int, + timeout: int = 3600, + vca_type: str = None, + vca_id: str = None, + ) -> bool: # steps: # 1. 
find all relations for this VCA @@ -2147,7 +2159,9 @@ class NsLcm(LcmBase): ee_id_1=from_vca_ee_id, ee_id_2=to_vca_ee_id, endpoint_1=from_vca_endpoint, - endpoint_2=to_vca_endpoint) + endpoint_2=to_vca_endpoint, + vca_id=vca_id, + ) # remove entry from relations list ns_relations.remove(r) else: @@ -2193,7 +2207,9 @@ class NsLcm(LcmBase): ee_id_1=from_vca_ee_id, ee_id_2=to_vca_ee_id, endpoint_1=from_vca_endpoint, - endpoint_2=to_vca_endpoint) + endpoint_2=to_vca_endpoint, + vca_id=vca_id, + ) # remove entry from relations list vnf_relations.remove(r) else: @@ -2230,7 +2246,8 @@ class NsLcm(LcmBase): return False async def _install_kdu(self, nsr_id: str, nsr_db_path: str, vnfr_data: dict, kdu_index: int, kdud: dict, - vnfd: dict, k8s_instance_info: dict, k8params: dict = None, timeout: int = 600): + vnfd: dict, k8s_instance_info: dict, k8params: dict = None, timeout: int = 600, + vca_id: str = None): try: k8sclustertype = k8s_instance_info["k8scluster-type"] @@ -2255,6 +2272,7 @@ class NsLcm(LcmBase): kdu_name=k8s_instance_info["kdu-name"], namespace=k8s_instance_info["namespace"], kdu_instance=kdu_instance, + vca_id=vca_id, ) self.update_db_2("nsrs", nsr_id, {nsr_db_path + ".kdu-instance": kdu_instance}) @@ -2306,8 +2324,11 @@ class NsLcm(LcmBase): cluster_uuid=k8s_instance_info["k8scluster-uuid"], kdu_instance=kdu_instance, primitive_name=initial_config_primitive["name"], - params=primitive_params_, db_dict=db_dict_install), - timeout=timeout) + params=primitive_params_, db_dict=db_dict_install, + vca_id=vca_id, + ), + timeout=timeout + ) except Exception as e: # Prepare update db with error and raise exception @@ -2378,6 +2399,7 @@ class NsLcm(LcmBase): updated_v3_cluster_list = [] for vnfr_data in db_vnfrs.values(): + vca_id = self.get_vca_id(vnfr_data, {}) for kdu_index, kdur in enumerate(get_iterable(vnfr_data, "kdur")): # Step 0: Prepare and set parameters desc_params = parse_yaml_strings(kdur.get("additionalParams")) @@ -2455,7 +2477,7 @@ class 
NsLcm(LcmBase): vnfd_with_id = find_in_list(db_vnfds, lambda vnf: vnf["_id"] == vnfd_id) task = asyncio.ensure_future( self._install_kdu(nsr_id, db_path, vnfr_data, kdu_index, kdud, vnfd_with_id, - k8s_instance_info, k8params=desc_params, timeout=600)) + k8s_instance_info, k8params=desc_params, timeout=600, vca_id=vca_id)) self.lcm_tasks.register("ns", nsr_id, nslcmop_id, "instantiate_KDU-{}".format(index), task) task_instantiation_info[task] = "Deploying KDU {}".format(kdur["kdu-name"]) @@ -2788,8 +2810,18 @@ class NsLcm(LcmBase): if vca["member-vnf-index"] == vnf_index and vca["vdu_id"] == vdu_id: return vca["ee_id"] - async def destroy_N2VC(self, logging_text, db_nslcmop, vca_deployed, config_descriptor, - vca_index, destroy_ee=True, exec_primitives=True, scaling_in=False): + async def destroy_N2VC( + self, + logging_text, + db_nslcmop, + vca_deployed, + config_descriptor, + vca_index, + destroy_ee=True, + exec_primitives=True, + scaling_in=False, + vca_id: str = None, + ): """ Execute the terminate primitives and destroy the execution environment (if destroy_ee=False :param logging_text: @@ -2840,9 +2872,12 @@ class NsLcm(LcmBase): mapped_primitive_params) # Sub-operations: Call _ns_execute_primitive() instead of action() try: - result, result_detail = await self._ns_execute_primitive(vca_deployed["ee_id"], primitive, - mapped_primitive_params, - vca_type=vca_type) + result, result_detail = await self._ns_execute_primitive( + vca_deployed["ee_id"], primitive, + mapped_primitive_params, + vca_type=vca_type, + vca_id=vca_id, + ) except LcmException: # this happens when VCA is not deployed. 
In this case it is not needed to terminate continue @@ -2858,13 +2893,21 @@ class NsLcm(LcmBase): await self.prometheus.update(remove_jobs=vca_deployed["prometheus_jobs"]) if destroy_ee: - await self.vca_map[vca_type].delete_execution_environment(vca_deployed["ee_id"], scaling_in=scaling_in) + await self.vca_map[vca_type].delete_execution_environment( + vca_deployed["ee_id"], + scaling_in=scaling_in, + vca_id=vca_id, + ) - async def _delete_all_N2VC(self, db_nsr: dict): + async def _delete_all_N2VC(self, db_nsr: dict, vca_id: str = None): self._write_all_config_status(db_nsr=db_nsr, status='TERMINATING') namespace = "." + db_nsr["_id"] try: - await self.n2vc.delete_namespace(namespace=namespace, total_timeout=self.timeout_charm_delete) + await self.n2vc.delete_namespace( + namespace=namespace, + total_timeout=self.timeout_charm_delete, + vca_id=vca_id, + ) except N2VCNotFound: # already deleted. Skip pass self._write_all_config_status(db_nsr=db_nsr, status='DELETED') @@ -3064,6 +3107,7 @@ class NsLcm(LcmBase): stage[1] = "Getting vnf descriptors from db." 
db_vnfrs_list = self.db.get_list("vnfrs", {"nsr-id-ref": nsr_id}) + db_vnfrs_dict = {db_vnfr["member-vnf-index-ref"]: db_vnfr for db_vnfr in db_vnfrs_list} db_vnfds_from_id = {} db_vnfds_from_member_index = {} # Loop over VNFRs @@ -3086,6 +3130,8 @@ class NsLcm(LcmBase): for vca_index, vca in enumerate(get_iterable(nsr_deployed, "VCA")): config_descriptor = None + + vca_id = self.get_vca_id(db_vnfrs_dict[vca["member-vnf-index"]], db_nsr) if not vca or not vca.get("ee_id"): continue if not vca.get("member-vnf-index"): @@ -3109,8 +3155,17 @@ class NsLcm(LcmBase): # self.logger.debug(logging_text + "vca_index: {}, ee_id: {}, vca_type: {} destroy_ee: {}".format( # vca_index, vca.get("ee_id"), vca_type, destroy_ee)) task = asyncio.ensure_future( - self.destroy_N2VC(logging_text, db_nslcmop, vca, config_descriptor, vca_index, - destroy_ee, exec_terminate_primitives)) + self.destroy_N2VC( + logging_text, + db_nslcmop, + vca, + config_descriptor, + vca_index, + destroy_ee, + exec_terminate_primitives, + vca_id=vca_id, + ) + ) tasks_dict_info[task] = "Terminating VCA {}".format(vca.get("ee_id")) # wait for pending tasks of terminate primitives @@ -3129,8 +3184,13 @@ class NsLcm(LcmBase): if nsr_deployed.get("VCA"): stage[1] = "Deleting all execution environments." self.logger.debug(logging_text + stage[1]) - task_delete_ee = asyncio.ensure_future(asyncio.wait_for(self._delete_all_N2VC(db_nsr=db_nsr), - timeout=self.timeout_charm_delete)) + vca_id = self.get_vca_id({}, db_nsr) + task_delete_ee = asyncio.ensure_future( + asyncio.wait_for( + self._delete_all_N2VC(db_nsr=db_nsr, vca_id=vca_id), + timeout=self.timeout_charm_delete + ) + ) # task_delete_ee = asyncio.ensure_future(self.n2vc.delete_namespace(namespace="." 
+ nsr_id)) tasks_dict_info[task_delete_ee] = "Terminating all VCA" @@ -3143,10 +3203,15 @@ class NsLcm(LcmBase): continue kdu_instance = kdu.get("kdu-instance") if kdu.get("k8scluster-type") in self.k8scluster_map: + # TODO: Uninstall kdu instances taking into account they could be deployed in different VIMs + vca_id = self.get_vca_id({}, db_nsr) task_delete_kdu_instance = asyncio.ensure_future( self.k8scluster_map[kdu["k8scluster-type"]].uninstall( cluster_uuid=kdu.get("k8scluster-uuid"), - kdu_instance=kdu_instance)) + kdu_instance=kdu_instance, + vca_id=vca_id, + ) + ) else: self.logger.error(logging_text + "Unknown k8s deployment type {}". format(kdu.get("k8scluster-type"))) @@ -3378,8 +3443,18 @@ class NsLcm(LcmBase): .format(member_vnf_index, vdu_id, kdu_name, vdu_count_index)) return ee_id, vca_type - async def _ns_execute_primitive(self, ee_id, primitive, primitive_params, retries=0, retries_interval=30, - timeout=None, vca_type=None, db_dict=None) -> (str, str): + async def _ns_execute_primitive( + self, + ee_id, + primitive, + primitive_params, + retries=0, + retries_interval=30, + timeout=None, + vca_type=None, + db_dict=None, + vca_id: str = None, + ) -> (str, str): try: if primitive == "config": primitive_params = {"params": primitive_params} @@ -3395,7 +3470,9 @@ class NsLcm(LcmBase): params_dict=primitive_params, progress_timeout=self.timeout_progress_primitive, total_timeout=self.timeout_primitive, - db_dict=db_dict), + db_dict=db_dict, + vca_id=vca_id, + ), timeout=timeout or self.timeout_primitive) # execution was OK break @@ -3429,10 +3506,11 @@ class NsLcm(LcmBase): self.logger.debug("Task ns={} action={} Enter".format(nsr_id, nslcmop_id)) db_nsr = self.db.get_one("nsrs", {"_id": nsr_id}) + vca_id = self.get_vca_id({}, db_nsr) if db_nsr['_admin']['deployed']['K8s']: for k8s_index, k8s in enumerate(db_nsr['_admin']['deployed']['K8s']): cluster_uuid, kdu_instance = k8s["k8scluster-uuid"], k8s["kdu-instance"] - await 
self._on_update_k8s_db(cluster_uuid, kdu_instance, filter={'_id': nsr_id}) + await self._on_update_k8s_db(cluster_uuid, kdu_instance, filter={'_id': nsr_id}, vca_id=vca_id) else: for vca_index, _ in enumerate(db_nsr['_admin']['deployed']['VCA']): table, filter = "nsrs", {"_id": nsr_id} @@ -3492,6 +3570,7 @@ class NsLcm(LcmBase): step = "Getting nsd from database" db_nsd = self.db.get_one("nsds", {"_id": db_nsr["nsd-id"]}) + vca_id = self.get_vca_id(db_vnfr, db_nsr) # for backward compatibility if nsr_deployed and isinstance(nsr_deployed.get("VCA"), dict): nsr_deployed["VCA"] = list(nsr_deployed["VCA"].values()) @@ -3596,8 +3675,11 @@ class NsLcm(LcmBase): detailed_status = await asyncio.wait_for( self.k8scluster_map[kdu["k8scluster-type"]].status_kdu( cluster_uuid=kdu.get("k8scluster-uuid"), - kdu_instance=kdu.get("kdu-instance")), - timeout=timeout_ns_action) + kdu_instance=kdu.get("kdu-instance"), + vca_id=vca_id, + ), + timeout=timeout_ns_action + ) else: kdu_instance = kdu.get("kdu-instance") or "{}-{}".format(kdu["kdu-name"], nsr_id) params = self._map_primitive_params(config_primitive_desc, primitive_params, desc_params) @@ -3608,8 +3690,11 @@ class NsLcm(LcmBase): kdu_instance=kdu_instance, primitive_name=primitive_name, params=params, db_dict=db_dict, - timeout=timeout_ns_action), - timeout=timeout_ns_action) + timeout=timeout_ns_action, + vca_id=vca_id, + ), + timeout=timeout_ns_action + ) if detailed_status: nslcmop_operation_state = 'COMPLETED' @@ -3632,7 +3717,9 @@ class NsLcm(LcmBase): primitive_params=self._map_primitive_params(config_primitive_desc, primitive_params, desc_params), timeout=timeout_ns_action, vca_type=vca_type, - db_dict=db_dict) + db_dict=db_dict, + vca_id=vca_id, + ) db_nslcmop_update["detailed-status"] = detailed_status error_description_nslcmop = detailed_status if nslcmop_operation_state == "FAILED" else "" @@ -3744,6 +3831,8 @@ class NsLcm(LcmBase): step = "Getting vnfr from database" db_vnfr = self.db.get_one("vnfrs", 
{"member-vnf-index-ref": vnf_index, "nsr-id-ref": nsr_id}) + vca_id = self.get_vca_id(db_vnfr, db_nsr) + step = "Getting vnfd from database" db_vnfd = self.db.get_one("vnfds", {"_id": db_vnfr["vnfd-id"]}) @@ -3969,7 +4058,11 @@ class NsLcm(LcmBase): vdu_count_index=None, ee_descriptor_id=ee_descriptor_id) result, result_detail = await self._ns_execute_primitive( - ee_id, primitive_name, primitive_params, vca_type=vca_type) + ee_id, primitive_name, + primitive_params, + vca_type=vca_type, + vca_id=vca_id, + ) self.logger.debug(logging_text + "vnf_config_primitive={} Done with result {} {}".format( vnf_config_primitive, result, result_detail)) # Update operationState = COMPLETED | FAILED @@ -4017,11 +4110,22 @@ class NsLcm(LcmBase): operation_params = db_nslcmop.get("operationParams") or {} exec_terminate_primitives = (not operation_params.get("skip_terminate_primitives") and vca.get("needed_terminate")) - task = asyncio.ensure_future(asyncio.wait_for( - self.destroy_N2VC(logging_text, db_nslcmop, vca, config_descriptor, - vca_index, destroy_ee=True, - exec_primitives=exec_terminate_primitives, - scaling_in=True), timeout=self.timeout_charm_delete)) + task = asyncio.ensure_future( + asyncio.wait_for( + self.destroy_N2VC( + logging_text, + db_nslcmop, + vca, + config_descriptor, + vca_index, + destroy_ee=True, + exec_primitives=exec_terminate_primitives, + scaling_in=True, + vca_id=vca_id, + ), + timeout=self.timeout_charm_delete + ) + ) # wait before next removal await asyncio.sleep(30) tasks_dict_info[task] = "Terminating VCA {}".format(vca.get("ee_id")) @@ -4234,7 +4338,12 @@ class NsLcm(LcmBase): vdu_count_index=None, ee_descriptor_id=ee_descriptor_id) result, result_detail = await self._ns_execute_primitive( - ee_id, primitive_name, primitive_params, vca_type=vca_type) + ee_id, + primitive_name, + primitive_params, + vca_type=vca_type, + vca_id=vca_id, + ) self.logger.debug(logging_text + "vnf_config_primitive={} Done with result {} {}".format( 
vnf_config_primitive, result, result_detail)) # Update operationState = COMPLETED | FAILED diff --git a/osm_lcm/tests/test_vim_sdn.py b/osm_lcm/tests/test_vim_sdn.py new file mode 100644 index 0000000..f6b75e0 --- /dev/null +++ b/osm_lcm/tests/test_vim_sdn.py @@ -0,0 +1,160 @@ +# Copyright 2021 Canonical Ltd. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import asyncio +from unittest import TestCase +from unittest.mock import Mock, patch, MagicMock + + +from osm_common import msgbase +from osm_common.dbbase import DbException +from osm_lcm.vim_sdn import VcaLcm + + +class AsyncMock(MagicMock): + async def __call__(self, *args, **kwargs): + return super(AsyncMock, self).__call__(*args, **kwargs) + + +class TestVcaLcm(TestCase): + @patch("osm_lcm.lcm_utils.Database") + @patch("osm_lcm.lcm_utils.Filesystem") + def setUp(self, mock_filesystem, mock_database): + self.loop = asyncio.get_event_loop() + self.msg = Mock(msgbase.MsgBase()) + self.lcm_tasks = Mock() + self.config = {"database": {"driver": "mongo"}} + self.vca_lcm = VcaLcm(self.msg, self.lcm_tasks, self.config, self.loop) + self.vca_lcm.db = Mock() + self.vca_lcm.fs = Mock() + + def test_vca_lcm_create(self): + vca_content = {"op_id": "order-id", "_id": "id"} + db_vca = { + "_id": "vca-id", + "secret": "secret", + "cacert": "cacert", + "schema_version": "1.11", + } + order_id = "order-id" + self.lcm_tasks.lock_HA.return_value = True + self.vca_lcm.db.get_one.return_value = db_vca + 
self.vca_lcm.n2vc.validate_vca = AsyncMock() + self.vca_lcm.update_db_2 = Mock() + + self.loop.run_until_complete(self.vca_lcm.create(vca_content, order_id)) + + self.lcm_tasks.lock_HA.assert_called_with("vca", "create", "order-id") + self.vca_lcm.db.encrypt_decrypt_fields.assert_called_with( + db_vca, + "decrypt", + ["secret", "cacert"], + schema_version="1.11", + salt="vca-id", + ) + self.vca_lcm.update_db_2.assert_called_with( + "vca", + "id", + { + "_admin.operationalState": "ENABLED", + "_admin.detailed-status": "Connectivity: ok", + }, + ) + self.lcm_tasks.unlock_HA.assert_called_with( + "vca", + "create", + "order-id", + operationState="COMPLETED", + detailed_status="VCA validated", + ) + self.lcm_tasks.remove.assert_called_with("vca", "id", "order-id") + + def test_vca_lcm_create_exception(self): + vca_content = {"op_id": "order-id", "_id": "id"} + db_vca = { + "_id": "vca-id", + "secret": "secret", + "cacert": "cacert", + "schema_version": "1.11", + } + order_id = "order-id" + self.lcm_tasks.lock_HA.return_value = True + self.vca_lcm.db.get_one.return_value = db_vca + self.vca_lcm.n2vc.validate_vca = AsyncMock() + self.vca_lcm.n2vc.validate_vca.side_effect = Exception("failed") + self.vca_lcm.update_db_2 = Mock() + self.vca_lcm.update_db_2.side_effect = DbException("failed") + self.loop.run_until_complete(self.vca_lcm.create(vca_content, order_id)) + + self.lcm_tasks.lock_HA.assert_called_with("vca", "create", "order-id") + self.vca_lcm.db.encrypt_decrypt_fields.assert_called_with( + db_vca, + "decrypt", + ["secret", "cacert"], + schema_version="1.11", + salt="vca-id", + ) + self.vca_lcm.update_db_2.assert_called_with( + "vca", + "id", + { + "_admin.operationalState": "ERROR", + "_admin.detailed-status": "Failed with exception: failed", + }, + ) + self.lcm_tasks.unlock_HA.assert_not_called() + self.lcm_tasks.remove.assert_called_with("vca", "id", "order-id") + + def test_vca_lcm_delete(self): + vca_content = {"op_id": "order-id", "_id": "id"} + order_id = 
"order-id" + self.lcm_tasks.lock_HA.return_value = True + self.vca_lcm.update_db_2 = Mock() + + self.loop.run_until_complete(self.vca_lcm.delete(vca_content, order_id)) + + self.lcm_tasks.lock_HA.assert_called_with("vca", "delete", "order-id") + self.vca_lcm.db.del_one.assert_called_with("vca", {"_id": "id"}) + self.vca_lcm.update_db_2.assert_called_with("vca", "id", None) + self.lcm_tasks.unlock_HA.assert_called_with( + "vca", + "delete", + "order-id", + operationState="COMPLETED", + detailed_status="deleted", + ) + self.lcm_tasks.remove.assert_called_with("vca", "id", "order-id") + + def test_vca_lcm_delete_exception(self): + vca_content = {"op_id": "order-id", "_id": "id"} + order_id = "order-id" + self.lcm_tasks.lock_HA.return_value = True + self.vca_lcm.update_db_2 = Mock() + self.vca_lcm.db.del_one.side_effect = Exception("failed deleting") + self.vca_lcm.update_db_2.side_effect = DbException("failed") + + self.loop.run_until_complete(self.vca_lcm.delete(vca_content, order_id)) + + self.lcm_tasks.lock_HA.assert_called_with("vca", "delete", "order-id") + self.vca_lcm.db.del_one.assert_called_with("vca", {"_id": "id"}) + self.vca_lcm.update_db_2.assert_called_with( + "vca", + "id", + { + "_admin.operationalState": "ERROR", + "_admin.detailed-status": "Failed with exception: failed deleting", + }, + ) + self.lcm_tasks.unlock_HA.not_called() + self.lcm_tasks.remove.assert_called_with("vca", "id", "order-id") diff --git a/osm_lcm/vim_sdn.py b/osm_lcm/vim_sdn.py index 13b95c4..a1623ba 100644 --- a/osm_lcm/vim_sdn.py +++ b/osm_lcm/vim_sdn.py @@ -25,6 +25,7 @@ from osm_lcm.lcm_utils import LcmException, LcmBase, deep_get from n2vc.k8s_helm_conn import K8sHelmConnector from n2vc.k8s_helm3_conn import K8sHelm3Connector from n2vc.k8s_juju_conn import K8sJujuConnector +from n2vc.n2vc_juju_conn import N2VCJujuConnector from n2vc.exceptions import K8sException, N2VCException from osm_common.dbbase import DbException from copy import deepcopy @@ -937,7 +938,6 @@ class 
K8sClusterLcm(LcmBase): log=self.logger, loop=self.loop, on_update_db=None, - vca_config=self.vca_config, db=self.db, fs=self.fs ) @@ -975,8 +975,13 @@ class K8sClusterLcm(LcmBase): for task_name in ("helm-chart", "juju-bundle", "helm-chart-v3"): if init_target and task_name not in init_target: continue - task = asyncio.ensure_future(self.k8s_map[task_name].init_env(k8s_credentials, - reuse_cluster_uuid=k8scluster_id)) + task = asyncio.ensure_future( + self.k8s_map[task_name].init_env( + k8s_credentials, + reuse_cluster_uuid=k8scluster_id, + vca_id=db_k8scluster.get("vca_id"), + ) + ) pending_tasks.append(task) task2name[task] = task_name @@ -1089,7 +1094,11 @@ class K8sClusterLcm(LcmBase): if k8s_jb_id: # delete in reverse order of creation step = "Removing juju-bundle '{}'".format(k8s_jb_id) uninstall_sw = deep_get(db_k8scluster, ("_admin", "juju-bundle", "created")) or False - cluster_removed = await self.juju_k8scluster.reset(cluster_uuid=k8s_jb_id, uninstall_sw=uninstall_sw) + cluster_removed = await self.juju_k8scluster.reset( + cluster_uuid=k8s_jb_id, + uninstall_sw=uninstall_sw, + vca_id=db_k8scluster.get("vca_id"), + ) db_k8scluster_update["_admin.juju-bundle.id"] = None db_k8scluster_update["_admin.juju-bundle.operationalState"] = "DISABLED" @@ -1153,6 +1162,139 @@ class K8sClusterLcm(LcmBase): self.lcm_tasks.remove("k8scluster", k8scluster_id, order_id) +class VcaLcm(LcmBase): + timeout_create = 30 + + def __init__(self, msg, lcm_tasks, config, loop): + """ + Init, Connect to database, filesystem storage, and messaging + :param config: two level dictionary with configuration. 
Top level should contain 'database', 'storage', + :return: None + """ + + self.logger = logging.getLogger("lcm.vca") + self.loop = loop + self.lcm_tasks = lcm_tasks + + super().__init__(msg, self.logger) + + # create N2VC connector + self.n2vc = N2VCJujuConnector( + log=self.logger, + loop=self.loop, + fs=self.fs, + db=self.db + ) + + def _get_vca_by_id(self, vca_id: str) -> dict: + db_vca = self.db.get_one("vca", {"_id": vca_id}) + self.db.encrypt_decrypt_fields( + db_vca, + "decrypt", + ["secret", "cacert"], + schema_version=db_vca["schema_version"], salt=db_vca["_id"] + ) + return db_vca + + async def create(self, vca_content, order_id): + op_id = vca_content.pop("op_id", None) + if not self.lcm_tasks.lock_HA("vca", "create", op_id): + return + + vca_id = vca_content["_id"] + self.logger.debug("Task vca_create={} {}".format(vca_id, "Enter")) + + db_vca = None + db_vca_update = {} + + try: + self.logger.debug("Task vca_create={} {}".format(vca_id, "Getting vca from db")) + db_vca = self._get_vca_by_id(vca_id) + + task = asyncio.ensure_future( + asyncio.wait_for( + self.n2vc.validate_vca(db_vca["_id"]), + timeout=self.timeout_create, + ) + ) + + await asyncio.wait([task], return_when=asyncio.FIRST_COMPLETED) + if task.exception(): + raise task.exception() + self.logger.debug("Task vca_create={} {}".format(vca_id, "vca registered and validated successfully")) + db_vca_update["_admin.operationalState"] = "ENABLED" + db_vca_update["_admin.detailed-status"] = "Connectivity: ok" + operation_details = "VCA validated" + operation_state = "COMPLETED" + + self.logger.debug("Task vca_create={} {}".format(vca_id, "Done. 
Result: {}".format(operation_state))) + + except Exception as e: + error_msg = "Failed with exception: {}".format(e) + self.logger.error("Task vca_create={} {}".format(vca_id, error_msg)) + db_vca_update["_admin.operationalState"] = "ERROR" + db_vca_update["_admin.detailed-status"] = error_msg + operation_state = "FAILED" + operation_details = error_msg + finally: + try: + self.update_db_2("vca", vca_id, db_vca_update) + + # Register the operation and unlock + self.lcm_tasks.unlock_HA( + "vca", + "create", + op_id, + operationState=operation_state, + detailed_status=operation_details + ) + except DbException as e: + self.logger.error("Task vca_create={} {}".format(vca_id, "Cannot update database: {}".format(e))) + self.lcm_tasks.remove("vca", vca_id, order_id) + + async def delete(self, vca_content, order_id): + + # HA tasks and backward compatibility: + # If "vca_content" does not include "op_id", we are running a legacy NBI version. + # In such a case, HA is not supported by NBI, "op_id" is None, and lock_HA() will do nothing. + # Register "delete" task here for related future HA operations + op_id = vca_content.pop("op_id", None) + if not self.lcm_tasks.lock_HA("vca", "delete", op_id): + return + + db_vca_update = {} + vca_id = vca_content["_id"] + + try: + self.logger.debug("Task vca_delete={} {}".format(vca_id, "Deleting vca from db")) + self.db.del_one("vca", {"_id": vca_id}) + db_vca_update = None + operation_details = "deleted" + operation_state = "COMPLETED" + + self.logger.debug("Task vca_delete={} {}".format(vca_id, "Done. 
Result: {}".format(operation_state))) + except Exception as e: + error_msg = "Failed with exception: {}".format(e) + self.logger.error("Task vca_delete={} {}".format(vca_id, error_msg)) + db_vca_update["_admin.operationalState"] = "ERROR" + db_vca_update["_admin.detailed-status"] = error_msg + operation_state = "FAILED" + operation_details = error_msg + finally: + try: + self.update_db_2("vca", vca_id, db_vca_update) + self.lcm_tasks.unlock_HA( + "vca", + "delete", + op_id, + operationState=operation_state, + detailed_status=operation_details, + ) + except DbException as e: + self.logger.error("Task vca_delete={} {}".format(vca_id, "Cannot update database: {}".format(e))) + self.lcm_tasks.remove("vca", vca_id, order_id) + + class K8sRepoLcm(LcmBase): def __init__(self, msg, lcm_tasks, config, loop): diff --git a/requirements-dev.txt b/requirements-dev.txt index e9e1d38..a3f9edc 100644 --- a/requirements-dev.txt +++ b/requirements-dev.txt @@ -1,5 +1,9 @@ aiokafka==0.7.0 # via -r https://osm.etsi.org/gitweb/?p=osm/common.git;a=blob_plain;f=requirements.txt;hb=master +async-timeout==3.0.1 + # via + # -r https://osm.etsi.org/gitweb/?p=osm/N2VC.git;a=blob_plain;f=requirements.txt;hb=master + # retrying-async bcrypt==3.2.0 # via # -r https://osm.etsi.org/gitweb/?p=osm/N2VC.git;a=blob_plain;f=requirements.txt;hb=master @@ -54,6 +58,8 @@ macaroonbakery==1.3.1 # -r https://osm.etsi.org/gitweb/?p=osm/N2VC.git;a=blob_plain;f=requirements.txt;hb=master # juju # theblues +motor==1.3.1 + # via -r https://osm.etsi.org/gitweb/?p=osm/N2VC.git;a=blob_plain;f=requirements.txt;hb=master mypy-extensions==0.4.3 # via # -r https://osm.etsi.org/gitweb/?p=osm/N2VC.git;a=blob_plain;f=requirements.txt;hb=master @@ -95,7 +101,10 @@ pymacaroons==0.13.0 # -r https://osm.etsi.org/gitweb/?p=osm/N2VC.git;a=blob_plain;f=requirements.txt;hb=master # macaroonbakery pymongo==3.11.3 - # via -r https://osm.etsi.org/gitweb/?p=osm/common.git;a=blob_plain;f=requirements.txt;hb=master + # via + # -r 
https://osm.etsi.org/gitweb/?p=osm/N2VC.git;a=blob_plain;f=requirements.txt;hb=master + # -r https://osm.etsi.org/gitweb/?p=osm/common.git;a=blob_plain;f=requirements.txt;hb=master + # motor pynacl==1.4.0 # via # -r https://osm.etsi.org/gitweb/?p=osm/N2VC.git;a=blob_plain;f=requirements.txt;hb=master @@ -133,6 +142,8 @@ requests==2.25.1 # macaroonbakery # requests-oauthlib # theblues +retrying-async==1.2.0 + # via -r https://osm.etsi.org/gitweb/?p=osm/N2VC.git;a=blob_plain;f=requirements.txt;hb=master rsa==4.7.2 # via # -r https://osm.etsi.org/gitweb/?p=osm/N2VC.git;a=blob_plain;f=requirements.txt;hb=master -- 2.25.1