From a0deb2d8f1d4d858fd8208f7403d2e55da80fa43 Mon Sep 17 00:00:00 2001 From: sousaedu Date: Tue, 21 Apr 2020 12:08:14 +0100 Subject: [PATCH 01/16] Fix bug 1053 This fixes bug 1053 by making the sync call before any of the other functions requires the package. Change-Id: I95f61fd88cfa84fbb29e05b40f95c82f84e7cf84 Signed-off-by: sousaedu --- osm_lcm/ns.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/osm_lcm/ns.py b/osm_lcm/ns.py index dc53b98..55bebbe 100644 --- a/osm_lcm/ns.py +++ b/osm_lcm/ns.py @@ -1168,7 +1168,6 @@ class NsLcm(LcmBase): element_under_configuration = kdu_name # Get artifact path - self.fs.sync() # Sync from FSMongo artifact_path = "{}/{}/charms/{}".format( base_folder["folder"], base_folder["pkg-dir"], @@ -1548,6 +1547,9 @@ class NsLcm(LcmBase): logging_text = "Task ns={} instantiate={} ".format(nsr_id, nslcmop_id) self.logger.debug(logging_text + "Enter") + # Sync from FSMongo + self.fs.sync() + # get all needed from database # database nsrs record -- 2.25.1 From 7c4e24ccf1e11dead0d23bece32f83f04ad64c0f Mon Sep 17 00:00:00 2001 From: tierno Date: Wed, 13 May 2020 08:41:35 +0000 Subject: [PATCH 02/16] minor flake8 fixes and spelling Change-Id: I3d8ba97fcc36b472762ebc0279ee432fee5c58db Signed-off-by: tierno --- osm_lcm/ns.py | 40 ++++++++++++++++++++-------------------- 1 file changed, 20 insertions(+), 20 deletions(-) diff --git a/osm_lcm/ns.py b/osm_lcm/ns.py index 55bebbe..6eb0c17 100644 --- a/osm_lcm/ns.py +++ b/osm_lcm/ns.py @@ -2393,9 +2393,9 @@ class NsLcm(LcmBase): # _ns_execute_primitive() or RO.create_action() will NOT be executed return self.SUBOPERATION_STATUS_SKIP else: - # c. Reintent executing sub-operation + # c. retry executing sub-operation # The sub-operation exists, and operationState != 'COMPLETED' - # Update operationState = 'PROCESSING' to indicate a reintent. + # Update operationState = 'PROCESSING' to indicate a retry. 
operationState = 'PROCESSING' detailed_status = 'In progress' self._update_suboperation_status( @@ -2408,7 +2408,7 @@ class NsLcm(LcmBase): # Find a sub-operation where all keys in a matching dictionary must match # Returns the index of the matching sub-operation, or SUBOPERATION_STATUS_NOT_FOUND if no match def _find_suboperation(self, db_nslcmop, match): - if (db_nslcmop and match): + if db_nslcmop and match: op_list = db_nslcmop.get('_admin', {}).get('operations', []) for i, op in enumerate(op_list): if all(op.get(k) == match[k] for k in match): @@ -2476,11 +2476,11 @@ class NsLcm(LcmBase): # Check for 3 different cases: # a. New: First time execution, return SUBOPERATION_STATUS_NEW # b. Skip: Existing sub-operation exists, operationState == 'COMPLETED', return SUBOPERATION_STATUS_SKIP - # c. Reintent: Existing sub-operation exists, operationState != 'COMPLETED', return op_index to re-execute + # c. retry: Existing sub-operation exists, operationState != 'COMPLETED', return op_index to re-execute def _check_or_add_scale_suboperation(self, db_nslcmop, vnf_index, vnf_config_primitive, primitive_params, operationType, RO_nsr_id=None, RO_scaling_info=None): # Find this sub-operation - if (RO_nsr_id and RO_scaling_info): + if RO_nsr_id and RO_scaling_info: operationType = 'SCALE-RO' match = { 'member_vnf_index': vnf_index, @@ -3542,10 +3542,10 @@ class NsLcm(LcmBase): db_nsr_update["config-status"] = "configuring pre-scaling" primitive_params = self._map_primitive_params(config_primitive, {}, vnfr_params) - # Pre-scale reintent check: Check if this sub-operation has been executed before + # Pre-scale retry check: Check if this sub-operation has been executed before op_index = self._check_or_add_scale_suboperation( db_nslcmop, nslcmop_id, vnf_index, vnf_config_primitive, primitive_params, 'PRE-SCALE') - if (op_index == self.SUBOPERATION_STATUS_SKIP): + if op_index == self.SUBOPERATION_STATUS_SKIP: # Skip sub-operation result = 'COMPLETED' result_detail = 'Done' @@ 
-3553,20 +3553,20 @@ class NsLcm(LcmBase): "vnf_config_primitive={} Skipped sub-operation, result {} {}".format( vnf_config_primitive, result, result_detail)) else: - if (op_index == self.SUBOPERATION_STATUS_NEW): + if op_index == self.SUBOPERATION_STATUS_NEW: # New sub-operation: Get index of this sub-operation op_index = len(db_nslcmop.get('_admin', {}).get('operations')) - 1 self.logger.debug(logging_text + "vnf_config_primitive={} New sub-operation". format(vnf_config_primitive)) else: - # Reintent: Get registered params for this existing sub-operation + # retry: Get registered params for this existing sub-operation op = db_nslcmop.get('_admin', {}).get('operations', [])[op_index] vnf_index = op.get('member_vnf_index') vnf_config_primitive = op.get('primitive') primitive_params = op.get('primitive_params') - self.logger.debug(logging_text + "vnf_config_primitive={} Sub-operation reintent". + self.logger.debug(logging_text + "vnf_config_primitive={} Sub-operation retry". format(vnf_config_primitive)) - # Execute the primitive, either with new (first-time) or registered (reintent) args + # Execute the primitive, either with new (first-time) or registered (retry) args result, result_detail = await self._ns_execute_primitive( self._look_for_deployed_vca(nsr_deployed["VCA"], member_vnf_index=vnf_index, @@ -3590,26 +3590,26 @@ class NsLcm(LcmBase): # if (RO_nsr_id and RO_scaling_info): if RO_scaling_info: scale_process = "RO" - # Scale RO reintent check: Check if this sub-operation has been executed before + # Scale RO retry check: Check if this sub-operation has been executed before op_index = self._check_or_add_scale_suboperation( db_nslcmop, vnf_index, None, None, 'SCALE-RO', RO_nsr_id, RO_scaling_info) - if (op_index == self.SUBOPERATION_STATUS_SKIP): + if op_index == self.SUBOPERATION_STATUS_SKIP: # Skip sub-operation result = 'COMPLETED' result_detail = 'Done' self.logger.debug(logging_text + "Skipped sub-operation RO, result {} {}".format( result, 
result_detail)) else: - if (op_index == self.SUBOPERATION_STATUS_NEW): + if op_index == self.SUBOPERATION_STATUS_NEW: # New sub-operation: Get index of this sub-operation op_index = len(db_nslcmop.get('_admin', {}).get('operations')) - 1 self.logger.debug(logging_text + "New sub-operation RO") else: - # Reintent: Get registered params for this existing sub-operation + # retry: Get registered params for this existing sub-operation op = db_nslcmop.get('_admin', {}).get('operations', [])[op_index] RO_nsr_id = op.get('RO_nsr_id') RO_scaling_info = op.get('RO_scaling_info') - self.logger.debug(logging_text + "Sub-operation RO reintent".format( + self.logger.debug(logging_text + "Sub-operation RO retry for primitive {}".format( vnf_config_primitive)) RO_desc = await self.RO.create_action("ns", RO_nsr_id, {"vdu-scaling": RO_scaling_info}) @@ -3735,7 +3735,7 @@ class NsLcm(LcmBase): db_nsr_update["config-status"] = "configuring post-scaling" primitive_params = self._map_primitive_params(config_primitive, {}, vnfr_params) - # Post-scale reintent check: Check if this sub-operation has been executed before + # Post-scale retry check: Check if this sub-operation has been executed before op_index = self._check_or_add_scale_suboperation( db_nslcmop, nslcmop_id, vnf_index, vnf_config_primitive, primitive_params, 'POST-SCALE') if op_index == self.SUBOPERATION_STATUS_SKIP: @@ -3752,14 +3752,14 @@ class NsLcm(LcmBase): self.logger.debug(logging_text + "vnf_config_primitive={} New sub-operation". format(vnf_config_primitive)) else: - # Reintent: Get registered params for this existing sub-operation + # retry: Get registered params for this existing sub-operation op = db_nslcmop.get('_admin', {}).get('operations', [])[op_index] vnf_index = op.get('member_vnf_index') vnf_config_primitive = op.get('primitive') primitive_params = op.get('primitive_params') - self.logger.debug(logging_text + "vnf_config_primitive={} Sub-operation reintent". 
+ self.logger.debug(logging_text + "vnf_config_primitive={} Sub-operation retry". format(vnf_config_primitive)) - # Execute the primitive, either with new (first-time) or registered (reintent) args + # Execute the primitive, either with new (first-time) or registered (retry) args result, result_detail = await self._ns_execute_primitive( self._look_for_deployed_vca(nsr_deployed["VCA"], member_vnf_index=vnf_index, -- 2.25.1 From 010c0e76ee06b8659a404bc85ecacee698b0fb7e Mon Sep 17 00:00:00 2001 From: Dominik Fleischmann Date: Mon, 18 May 2020 15:19:11 +0200 Subject: [PATCH 03/16] Fix typo This typo was introduced in Change 8755. Change-Id: I009d5a99dfdfb8d745f2f28fec824ddd11860abc Signed-off-by: Dominik Fleischmann --- osm_lcm/ns.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/osm_lcm/ns.py b/osm_lcm/ns.py index 6eb0c17..5d780d9 100644 --- a/osm_lcm/ns.py +++ b/osm_lcm/ns.py @@ -2180,7 +2180,7 @@ class NsLcm(LcmBase): storage = deep_get(db_vnfds.get(vnfd_id), ('_admin', 'storage')) if storage and storage.get('pkg-dir'): # may be not present if vnfd has not artifacts # path format: /vnfdid/pkkdir/helm-charts|juju-bundles/kdumodel - filename = '{}/{}/{}s/{}'.format(storage["folder"], storage["'pkg-dir"], k8sclustertype, + filename = '{}/{}/{}s/{}'.format(storage["folder"], storage["pkg-dir"], k8sclustertype, kdumodel) if self.fs.file_exists(filename, mode='file') or self.fs.file_exists(filename, mode='dir'): kdumodel = self.fs.path + filename -- 2.25.1 From d58995a1aa43588713121922beed94b990b37cdf Mon Sep 17 00:00:00 2001 From: tierno Date: Wed, 20 May 2020 14:35:19 +0000 Subject: [PATCH 04/16] fix 1070 k8scluster on error status deletion Change-Id: Ice7a07e909d7c2a99ba3e31aa04c474d5f85b231 Signed-off-by: tierno --- osm_lcm/vim_sdn.py | 13 +++++++------ 1 file changed, 7 insertions(+), 6 deletions(-) diff --git a/osm_lcm/vim_sdn.py b/osm_lcm/vim_sdn.py index 6593b46..4cd5a6c 100644 --- a/osm_lcm/vim_sdn.py +++ b/osm_lcm/vim_sdn.py @@ -1074,12 
+1074,16 @@ class K8sClusterLcm(LcmBase): uninstall_sw = deep_get(db_k8scluster, ("_admin", "helm-chart", "created")) cluster_removed = True if k8s_hc_id: + step = "Removing helm-chart '{}'".format(k8s_hc_id) uninstall_sw = uninstall_sw or False cluster_removed = await self.helm_k8scluster.reset(cluster_uuid=k8s_hc_id, uninstall_sw=uninstall_sw) + db_k8scluster_update["_admin.helm-chart.id"] = None if k8s_jb_id: + step = "Removing juju-bundle '{}'".format(k8s_jb_id) uninstall_sw = uninstall_sw or False cluster_removed = await self.juju_k8scluster.reset(cluster_uuid=k8s_jb_id, uninstall_sw=uninstall_sw) + db_k8scluster_update["_admin.juju-bundle.id"] = None # Try to remove from cluster_inserted to clean old versions if k8s_hc_id and cluster_removed: @@ -1093,14 +1097,11 @@ class K8sClusterLcm(LcmBase): self.update_db_2("k8srepos", k8srepo["_id"], {"_admin.cluster-inserted": cluster_list}) except Exception as e: self.logger.error("{}: {}".format(step, e)) - self.db.del_one("k8sclusters", {"_id": k8scluster_id}) - else: - raise LcmException("An error happened during the reset of the k8s cluster '{}'".format(k8scluster_id)) - # if not cluster_removed: - # raise Exception("K8scluster was not properly removed") + self.db.del_one("k8sclusters", {"_id": k8scluster_id}) + db_k8scluster_update = {} except Exception as e: - if isinstance(e, (LcmException, DbException)): + if isinstance(e, (LcmException, DbException, K8sException, N2VCException)): self.logger.error(logging_text + "Exit Exception {}".format(e)) else: self.logger.critical(logging_text + "Exit Exception {}".format(e), exc_info=True) -- 2.25.1 From 584465c22911043fe9e44ca97e1186c71f849d6d Mon Sep 17 00:00:00 2001 From: Dominik Fleischmann Date: Thu, 21 May 2020 14:00:43 +0200 Subject: [PATCH 05/16] Changing Juju version in Docker The juju that is integrated in the LCM container, contains a pinned version and needs to be updated for new functionality. 
Change-Id: I63a8926e745962494224aa804b6ce04f78a7f7b0 Signed-off-by: Dominik Fleischmann --- Dockerfile.local | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/Dockerfile.local b/Dockerfile.local index 25775d6..a64bf4c 100644 --- a/Dockerfile.local +++ b/Dockerfile.local @@ -29,8 +29,8 @@ RUN curl https://get.helm.sh/helm-v2.15.2-linux-amd64.tar.gz --output helm-v2.15 && mv linux-amd64/helm /usr/local/bin/helm \ && rm -r linux-amd64/ -RUN curl -L https://launchpad.net/juju/2.7/2.7.0/+download/juju-2.7.0-k8s.tar.xz --output juju-2.7.0-k8s.tar.xz \ - && tar -xvf juju-2.7.0-k8s.tar.xz \ +RUN curl -L https://launchpad.net/juju/2.7/2.7.6/+download/juju-2.7.6-k8s.tar.xz --output juju-2.7.6-k8s.tar.xz \ + && tar -xvf juju-2.7.6-k8s.tar.xz \ && mv juju /usr/local/bin/juju RUN apt-get update && apt-get install -y git tox python3 \ -- 2.25.1 From d732fb8be7345fc3e9381750359a5b027929a8e2 Mon Sep 17 00:00:00 2001 From: tierno Date: Thu, 21 May 2020 13:18:23 +0000 Subject: [PATCH 06/16] fix 1073 obtain nsd from database, not from nsr[nsd] Change-Id: I2846a215d6144743ec7e52b81921ea1b557cc0fa Signed-off-by: tierno --- osm_lcm/ns.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/osm_lcm/ns.py b/osm_lcm/ns.py index 5d780d9..0a70367 100644 --- a/osm_lcm/ns.py +++ b/osm_lcm/ns.py @@ -1605,8 +1605,9 @@ class NsLcm(LcmBase): # read from db: ns stage[1] = "Getting nsr={} from db".format(nsr_id) db_nsr = self.db.get_one("nsrs", {"_id": nsr_id}) - # nsd is replicated into ns (no db read) - nsd = db_nsr["nsd"] + stage[1] = "Getting nsd={} from db".format(db_nsr["nsd-id"]) + nsd = self.db.get_one("nsds", {"_id": db_nsr["nsd-id"]}) + db_nsr["nsd"] = nsd # nsr_name = db_nsr["name"] # TODO short-name?? 
# read from db: vnf's of this ns -- 2.25.1 From 171f35490b6e2c806b28de91e970921714f29346 Mon Sep 17 00:00:00 2001 From: David Garcia Date: Thu, 21 May 2020 16:41:07 +0200 Subject: [PATCH 07/16] Fix bug 1074: Get NS Configuration from database Change-Id: I10d77e47ebab0669d8d2b43699c5224654a902b9 Signed-off-by: David Garcia --- osm_lcm/ns.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/osm_lcm/ns.py b/osm_lcm/ns.py index 0a70367..dddf99f 100644 --- a/osm_lcm/ns.py +++ b/osm_lcm/ns.py @@ -1967,13 +1967,14 @@ class NsLcm(LcmBase): # read nsr record db_nsr = self.db.get_one("nsrs", {"_id": nsr_id}) + nsd = self.db.get_one("nsds", {"_id": db_nsr["nsd-id"]}) # this VCA data my_vca = deep_get(db_nsr, ('_admin', 'deployed', 'VCA'))[vca_index] # read all ns-configuration relations ns_relations = list() - db_ns_relations = deep_get(db_nsr, ('nsd', 'ns-configuration', 'relation')) + db_ns_relations = deep_get(nsd, ('ns-configuration', 'relation')) if db_ns_relations: for r in db_ns_relations: # check if this VCA is in the relation -- 2.25.1 From f691984c50667cd45a01b555ad63ef8179575cb7 Mon Sep 17 00:00:00 2001 From: David Garcia Date: Thu, 21 May 2020 16:41:07 +0200 Subject: [PATCH 08/16] Bug 1042: increase nslcmop action default timeout Change-Id: I46836f64a504a85b07e2803874cbf35f58c405c1 Signed-off-by: tierno --- osm_lcm/ns.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/osm_lcm/ns.py b/osm_lcm/ns.py index dddf99f..489c436 100644 --- a/osm_lcm/ns.py +++ b/osm_lcm/ns.py @@ -49,8 +49,8 @@ class NsLcm(LcmBase): timeout_ns_deploy = 2 * 3600 # default global timeout for deployment a ns timeout_ns_terminate = 1800 # default global timeout for un deployment a ns timeout_charm_delete = 10 * 60 - timeout_primitive = 10 * 60 # timeout for primitive execution - timeout_progress_primitive = 2 * 60 # timeout for some progress in a primitive execution + timeout_primitive = 30 * 60 # timeout for primitive execution + 
timeout_progress_primitive = 10 * 60 # timeout for some progress in a primitive execution SUBOPERATION_STATUS_NOT_FOUND = -1 SUBOPERATION_STATUS_NEW = -2 -- 2.25.1 From e972cbc4071e1b838ce1640a50dbd85532d43679 Mon Sep 17 00:00:00 2001 From: Felipe Vicens Date: Fri, 22 May 2020 15:14:31 +0200 Subject: [PATCH 09/16] Fix #1076 slice instantiation without CP in NSD fail Change-Id: I7a62b7d73fc57fa5c3630475043eb6dd39fc2ef5 Signed-off-by: Felipe Vicens --- osm_lcm/netslice.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/osm_lcm/netslice.py b/osm_lcm/netslice.py index fe8a7b9..4d731af 100644 --- a/osm_lcm/netslice.py +++ b/osm_lcm/netslice.py @@ -248,7 +248,7 @@ class NetsliceLcm(LcmBase): if nss_cp_item["nss-ref"] == nss["nss-id"]: db_nsds = self.db.get_one("nsds", {"_id": nss["nsdId"]}) # Go for nsd, and search the CP that match with nst:CP to get vld-id-ref - for cp_nsd in db_nsds["connection-point"]: + for cp_nsd in db_nsds.get("connection-point", ()): if cp_nsd["name"] == nss_cp_item["nsd-connection-point-ref"]: if nslcmop.get("operationParams"): if nslcmop["operationParams"].get("nsName") == nss["nsName"]: -- 2.25.1 From 06a11f2e11c0f4e68f31dc47b90b78a66c03b60c Mon Sep 17 00:00:00 2001 From: David Garcia Date: Wed, 25 Mar 2020 18:21:37 +0100 Subject: [PATCH 10/16] Feature 8720: Add scale support for charms Change-Id: Ibbd561bd9ed14d9e3869b5aa369371cc11dab7b2 Signed-off-by: David Garcia --- osm_lcm/ns.py | 15 ++++++++++++++- 1 file changed, 14 insertions(+), 1 deletion(-) diff --git a/osm_lcm/ns.py b/osm_lcm/ns.py index 489c436..9514768 100644 --- a/osm_lcm/ns.py +++ b/osm_lcm/ns.py @@ -1271,11 +1271,24 @@ class NsLcm(LcmBase): deploy_params ) break + num_units = 1 + if is_proxy_charm: + if element_type == "NS": + num_units = db_nsr.get("config-units") or 1 + elif element_type == "VNF": + num_units = db_vnfr.get("config-units") or 1 + elif element_type == "VDU": + for v in db_vnfr["vdur"]: + if vdu_id == v["vdu-id-ref"]: + num_units = 
v.get("config-units") or 1 + break + await self.n2vc.install_configuration_sw( ee_id=ee_id, artifact_path=artifact_path, db_dict=db_dict, - config=config + config=config, + num_units=num_units ) # write in db flag of configuration_sw already installed -- 2.25.1 From 69f0d388812e3befeba6c6115803ebdfb76b7294 Mon Sep 17 00:00:00 2001 From: tierno Date: Thu, 7 May 2020 13:08:09 +0000 Subject: [PATCH 11/16] Feature 7184 New generation RO Allow using the new version of RO by the config parameter 'RO.ng=True' or the env 'OSMLCM_RO_NG=True' Change-Id: I17dfe0326ac26b93c74cbf1b2540a5e383de6456 Signed-off-by: tierno --- osm_lcm/ROclient.py | 42 +++---- osm_lcm/lcm.py | 26 +++-- osm_lcm/ng_ro.py | 189 +++++++++++++++++++++++++++++++ osm_lcm/ns.py | 234 +++++++++++++++++++++++++++++++++++---- osm_lcm/tests/test_ns.py | 4 +- 5 files changed, 444 insertions(+), 51 deletions(-) create mode 100644 osm_lcm/ng_ro.py diff --git a/osm_lcm/ROclient.py b/osm_lcm/ROclient.py index b0c16f9..0ca00a5 100644 --- a/osm_lcm/ROclient.py +++ b/osm_lcm/ROclient.py @@ -117,9 +117,9 @@ class ROClient: timeout_large = 120 timeout_short = 30 - def __init__(self, loop, endpoint_url, **kwargs): + def __init__(self, loop, uri, **kwargs): self.loop = loop - self.endpoint_url = endpoint_url + self.uri = uri self.username = kwargs.get("username") self.password = kwargs.get("password") @@ -127,7 +127,7 @@ class ROClient: self.tenant = None self.datacenter_id_name = kwargs.get("datacenter") self.datacenter = None - logger_name = kwargs.get('logger_name', 'ROClient') + logger_name = kwargs.get('logger_name', 'lcm.ro') self.logger = logging.getLogger(logger_name) if kwargs.get("loglevel"): self.logger.setLevel(kwargs["loglevel"]) @@ -143,8 +143,8 @@ class ROClient: return self.username elif index == 'password': return self.password - elif index == 'endpoint_url': - return self.endpoint_url + elif index == 'uri': + return self.uri else: raise KeyError("Invalid key '{}'".format(index)) @@ -157,8 +157,8 @@ class 
ROClient: self.username = value elif index == 'password': self.password = value - elif index == 'endpoint_url': - self.endpoint_url = value + elif index == 'uri': + self.uri = value else: raise KeyError("Invalid key '{}'".format(index)) self.tenant = None # force to reload tenant with different credentials @@ -434,7 +434,7 @@ class ROClient: tenant_text = "/" + self.tenant item_id = 0 - url = "{}{}/{}".format(self.endpoint_url, tenant_text, item) + url = "{}{}/{}".format(self.uri, tenant_text, item) if self.check_if_uuid(item_id_name): item_id = item_id_name url += "/" + item_id_name @@ -485,7 +485,7 @@ class ROClient: # check that exist uuid = await self._get_item_uuid(session, item, item_id_name, all_tenants) - url = "{}{}/{}/{}".format(self.endpoint_url, tenant_text, item, uuid) + url = "{}{}/{}/{}".format(self.uri, tenant_text, item, uuid) if extra_item: url += "/" + extra_item if extra_item_id: @@ -547,7 +547,7 @@ class ROClient: else: action = "/{}".format(action) - url = "{}{apiver}{tenant}/{item}{id}{action}".format(self.endpoint_url, apiver=api_version_text, + url = "{}{apiver}{tenant}/{item}{id}{action}".format(self.uri, apiver=api_version_text, tenant=tenant_text, item=item, id=uuid, action=action) self.logger.debug("RO POST %s %s", url, payload_req) # timeout = aiohttp.ClientTimeout(total=self.timeout_large) @@ -577,7 +577,7 @@ class ROClient: else: uuid = item_id_name - url = "{}{}/{}/{}".format(self.endpoint_url, tenant_text, item, uuid) + url = "{}{}/{}/{}".format(self.uri, tenant_text, item, uuid) self.logger.debug("DELETE %s", url) # timeout = aiohttp.ClientTimeout(total=self.timeout_short) async with session.delete(url, headers=self.headers_req) as response: @@ -598,7 +598,7 @@ class ROClient: await self._get_tenant(session) tenant_text = "/" + self.tenant - url = "{}{}/{}".format(self.endpoint_url, tenant_text, item) + url = "{}{}/{}".format(self.uri, tenant_text, item) separator = "?" 
if filter_dict: for k in filter_dict: @@ -627,7 +627,7 @@ class ROClient: payload_req = yaml.safe_dump(descriptor) # print payload_req - url = "{}{}/{}/{}".format(self.endpoint_url, tenant_text, item, item_id) + url = "{}{}/{}/{}".format(self.uri, tenant_text, item, item_id) self.logger.debug("RO PUT %s %s", url, payload_req) # timeout = aiohttp.ClientTimeout(total=self.timeout_large) async with session.put(url, headers=self.headers_req, data=payload_req) as response: @@ -646,7 +646,7 @@ class ROClient: try: response_text = "" async with aiohttp.ClientSession(loop=self.loop) as session: - url = "{}/version".format(self.endpoint_url) + url = "{}/version".format(self.uri) self.logger.debug("RO GET %s", url) # timeout = aiohttp.ClientTimeout(total=self.timeout_short) async with session.get(url, headers=self.headers_req) as response: @@ -943,7 +943,7 @@ class ROClient: item_id = await self._get_item_uuid(session, self.client_to_RO[item], item_id_name, all_tenants=True) await self._get_tenant(session) - url = "{}/{tenant}/{item}/{item_id}".format(self.endpoint_url, tenant=self.tenant, + url = "{}/{tenant}/{item}/{item_id}".format(self.uri, tenant=self.tenant, item=self.client_to_RO[item], item_id=item_id) self.logger.debug("RO POST %s %s", url, payload_req) # timeout = aiohttp.ClientTimeout(total=self.timeout_large) @@ -969,7 +969,7 @@ class ROClient: item_id = await self._get_item_uuid(session, self.client_to_RO[item], item_id_name, all_tenants=False) tenant = await self._get_tenant(session) - url = "{}/{tenant}/{item}/{datacenter}".format(self.endpoint_url, tenant=tenant, + url = "{}/{tenant}/{item}/{datacenter}".format(self.uri, tenant=tenant, item=self.client_to_RO[item], datacenter=item_id) self.logger.debug("RO DELETE %s", url) @@ -1097,7 +1097,7 @@ class ROClient: datacenter = self.get_datacenter(session) if action == "list": - url = "{}{}/vim/{}/{}".format(self.endpoint_url, tenant_text, datacenter, item) + url = "{}{}/vim/{}/{}".format(self.uri, tenant_text, 
datacenter, item) self.logger.debug("GET %s", url) mano_response = requests.get(url, headers=self.headers_req) self.logger.debug("RO response: %s", mano_response.text) @@ -1107,7 +1107,7 @@ class ROClient: else: raise ROClientException(str(content), http_code=mano_response.status) elif action == "get" or action == "show": - url = "{}{}/vim/{}/{}/{}".format(self.endpoint_url, tenant_text, datacenter, item, uuid) + url = "{}{}/vim/{}/{}/{}".format(self.uri, tenant_text, datacenter, item, uuid) self.logger.debug("GET %s", url) mano_response = requests.get(url, headers=self.headers_req) self.logger.debug("RO response: %s", mano_response.text) @@ -1117,7 +1117,7 @@ class ROClient: else: raise ROClientException(str(content), http_code=mano_response.status) elif action == "delete": - url = "{}{}/vim/{}/{}/{}".format(self.endpoint_url, tenant_text, datacenter, item, uuid) + url = "{}{}/vim/{}/{}/{}".format(self.uri, tenant_text, datacenter, item, uuid) self.logger.debug("DELETE %s", url) mano_response = requests.delete(url, headers=self.headers_req) self.logger.debug("RO response: %s", mano_response.text) @@ -1145,7 +1145,7 @@ class ROClient: descriptor[item[:-1]]['description'] = kwargs["description"] payload_req = yaml.safe_dump(descriptor) # print payload_req - url = "{}{}/vim/{}/{}".format(self.endpoint_url, tenant_text, datacenter, item) + url = "{}{}/vim/{}/{}".format(self.uri, tenant_text, datacenter, item) self.logger.debug("RO POST %s %s", url, payload_req) mano_response = requests.post(url, headers=self.headers_req, data=payload_req) self.logger.debug("RO response: %s", mano_response.text) @@ -1177,7 +1177,7 @@ if __name__ == '__main__': tenant_id = None vim_id = False loop = asyncio.get_event_loop() - myClient = ROClient(endpoint_url=RO_URL, loop=loop, loglevel="DEBUG") + myClient = ROClient(uri=RO_URL, loop=loop, loglevel="DEBUG") try: # test tenant content = loop.run_until_complete(myClient.get_list("tenant")) diff --git a/osm_lcm/lcm.py b/osm_lcm/lcm.py index 
b306720..5d27277 100644 --- a/osm_lcm/lcm.py +++ b/osm_lcm/lcm.py @@ -32,7 +32,8 @@ import sys from osm_lcm import ns from osm_lcm import vim_sdn from osm_lcm import netslice -from osm_lcm import ROclient +from osm_lcm.ng_ro import NgRoException, NgRoClient +from osm_lcm.ROclient import ROClient, ROClientException from time import time from osm_lcm.lcm_utils import versiontuple, LcmException, TaskRegistry, LcmExceptionExit @@ -86,11 +87,18 @@ class Lcm: config = self.read_config_file(config_file) self.config = config self.config["ro_config"] = { - "endpoint_url": "http://{}:{}/openmano".format(config["RO"]["host"], config["RO"]["port"]), + "ng": config["RO"].get("ng", False), + "uri": config["RO"].get("uri"), "tenant": config.get("tenant", "osm"), - "logger_name": "lcm.ROclient", - "loglevel": "ERROR", + "logger_name": "lcm.roclient", + "loglevel": config["RO"].get("loglevel", "ERROR"), } + if not self.config["ro_config"]["uri"]: + if not self.config["ro_config"]["ng"]: + self.config["ro_config"]["uri"] = "http://{}:{}/openmano".format(config["RO"]["host"], + config["RO"]["port"]) + else: + self.config["ro_config"]["uri"] = "http://{}:{}/ro".format(config["RO"]["host"], config["RO"]["port"]) self.loop = loop or asyncio.get_event_loop() @@ -197,17 +205,19 @@ class Lcm: last_error = None while True: try: - ro_server = ROclient.ROClient(self.loop, **self.config["ro_config"]) + if self.config["ro_config"].get("ng"): + ro_server = NgRoClient(self.loop, **self.config["ro_config"]) + else: + ro_server = ROClient(self.loop, **self.config["ro_config"]) ro_version = await ro_server.get_version() if versiontuple(ro_version) < versiontuple(min_RO_version): raise LcmException("Not compatible osm/RO version '{}'. 
Needed '{}' or higher".format( ro_version, min_RO_version)) self.logger.info("Connected to RO version {}".format(ro_version)) return - except ROclient.ROClientException as e: + except (ROClientException, NgRoException) as e: tries -= 1 - error_text = "Error while connecting to RO on {}: {}".format(self.config["ro_config"]["endpoint_url"], - e) + error_text = "Error while connecting to RO on {}: {}".format(self.config["ro_config"]["uri"], e) if tries <= 0: self.logger.critical(error_text) raise LcmException(error_text) diff --git a/osm_lcm/ng_ro.py b/osm_lcm/ng_ro.py new file mode 100644 index 0000000..6e9f683 --- /dev/null +++ b/osm_lcm/ng_ro.py @@ -0,0 +1,189 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- + +## +# Copyright 2020 Telefónica Investigación y Desarrollo, S.A.U. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. 
+# +## + +""" +asyncio RO python client to interact with New Generation RO server +""" + +import asyncio +import aiohttp +import yaml +import logging + +__author__ = "Alfonso Tierno = 300: + raise NgRoException(response_text, http_code=response.status) + return self._parse_yaml(response_text, response=True) + except (aiohttp.ClientOSError, aiohttp.ClientError) as e: + raise NgRoException(e, http_code=504) + except asyncio.TimeoutError: + raise NgRoException("Timeout", http_code=504) + + async def status(self, nsr_id, action_id): + try: + url = "{}/ns/v1/deploy/{nsr_id}/{action_id}".format(self.endpoint_url, nsr_id=nsr_id, action_id=action_id) + async with aiohttp.ClientSession(loop=self.loop) as session: + self.logger.debug("GET %s", url) + # timeout = aiohttp.ClientTimeout(total=self.timeout_short) + async with session.get(url, headers=self.headers_req) as response: + response_text = await response.read() + self.logger.debug("GET {} [{}] {}".format(url, response.status, response_text[:100])) + if response.status >= 300: + raise NgRoException(response_text, http_code=response.status) + return self._parse_yaml(response_text, response=True) + + except (aiohttp.ClientOSError, aiohttp.ClientError) as e: + raise NgRoException(e, http_code=504) + except asyncio.TimeoutError: + raise NgRoException("Timeout", http_code=504) + + async def delete(self, nsr_id): + try: + url = "{}/ns/v1/deploy/{nsr_id}".format(self.endpoint_url, nsr_id=nsr_id) + async with aiohttp.ClientSession(loop=self.loop) as session: + self.logger.debug("DELETE %s", url) + # timeout = aiohttp.ClientTimeout(total=self.timeout_short) + async with session.delete(url, headers=self.headers_req) as response: + self.logger.debug("DELETE {} [{}]".format(url, response.status)) + if response.status >= 300: + raise NgRoException("Delete {}".format(nsr_id), http_code=response.status) + return + + except (aiohttp.ClientOSError, aiohttp.ClientError) as e: + raise NgRoException(e, http_code=504) + except 
asyncio.TimeoutError: + raise NgRoException("Timeout", http_code=504) + + async def get_version(self): + """ + Obtain RO server version. + :return: a list with integers ["major", "minor", "release"]. Raises NgRoException on Error, + """ + try: + response_text = "" + async with aiohttp.ClientSession(loop=self.loop) as session: + url = "{}/version".format(self.endpoint_url) + self.logger.debug("RO GET %s", url) + # timeout = aiohttp.ClientTimeout(total=self.timeout_short) + async with session.get(url, headers=self.headers_req) as response: + response_text = await response.read() + self.logger.debug("GET {} [{}] {}".format(url, response.status, response_text[:100])) + if response.status >= 300: + raise NgRoException(response_text, http_code=response.status) + + for word in str(response_text).split(" "): + if "." in word: + version_text, _, _ = word.partition("-") + return version_text + raise NgRoException("Got invalid version text: '{}'".format(response_text), http_code=500) + except (aiohttp.ClientOSError, aiohttp.ClientError) as e: + raise NgRoException(e, http_code=504) + except asyncio.TimeoutError: + raise NgRoException("Timeout", http_code=504) + except Exception as e: + raise NgRoException("Got invalid version text: '{}'; causing exception {}".format(response_text, e), + http_code=500) + + @staticmethod + def _parse_yaml(descriptor, response=False): + try: + return yaml.safe_load(descriptor) + except yaml.YAMLError as exc: + error_pos = "" + if hasattr(exc, 'problem_mark'): + mark = exc.problem_mark + error_pos = " at line:{} column:{}s".format(mark.line + 1, mark.column + 1) + error_text = "yaml format error" + error_pos + if response: + raise NgRoException("reponse with " + error_text) + raise NgRoException(error_text) diff --git a/osm_lcm/ns.py b/osm_lcm/ns.py index 9514768..1fbe845 100644 --- a/osm_lcm/ns.py +++ b/osm_lcm/ns.py @@ -25,6 +25,7 @@ import json from jinja2 import Environment, Template, meta, TemplateError, TemplateNotFound, TemplateSyntaxError 
from osm_lcm import ROclient +from osm_lcm.ng_ro import NgRoClient, NgRoException from osm_lcm.lcm_utils import LcmException, LcmExceptionNoMgmtIP, LcmBase, deep_get, get_iterable, populate_dict from n2vc.k8s_helm_conn import K8sHelmConnector from n2vc.k8s_juju_conn import K8sJujuConnector @@ -41,7 +42,7 @@ from time import time from uuid import uuid4 from functools import partial -__author__ = "Alfonso Tierno" +__author__ = "Alfonso Tierno " class NsLcm(LcmBase): @@ -74,6 +75,7 @@ class NsLcm(LcmBase): self.lcm_tasks = lcm_tasks self.timeout = config["timeout"] self.ro_config = config["ro_config"] + self.ng_ro = config["ro_config"].get("ng") self.vca_config = config["VCA"].copy() # create N2VC connector @@ -113,7 +115,10 @@ class NsLcm(LcmBase): "juju": self.k8sclusterjuju, } # create RO client - self.RO = ROclient.ROClient(self.loop, **self.ro_config) + if self.ng_ro: + self.RO = NgRoClient(self.loop, **self.ro_config) + else: + self.RO = ROclient.ROClient(self.loop, **self.ro_config) def _on_update_ro_db(self, nsrs_id, ro_descriptor): @@ -756,6 +761,179 @@ class NsLcm(LcmBase): primitive_list.insert(config_position + 1, {"name": "verify-ssh-credentials", "parameter": []}) return primitive_list + async def _instantiate_ng_ro(self, logging_text, nsr_id, nsd, db_nsr, db_nslcmop, db_vnfrs, db_vnfds_ref, + n2vc_key_list, stage, start_deploy, timeout_ns_deploy): + nslcmop_id = db_nslcmop["_id"] + target = { + "name": db_nsr["name"], + "ns": {"vld": []}, + "vnf": [], + "image": deepcopy(db_nsr["image"]), + "flavor": deepcopy(db_nsr["flavor"]), + "action_id": nslcmop_id, + } + for image in target["image"]: + image["vim_info"] = [] + for flavor in target["flavor"]: + flavor["vim_info"] = [] + + ns_params = db_nslcmop.get("operationParams") + ssh_keys = [] + if ns_params.get("ssh_keys"): + ssh_keys += ns_params.get("ssh_keys") + if n2vc_key_list: + ssh_keys += n2vc_key_list + + cp2target = {} + for vld_index, vld in enumerate(nsd.get("vld")): + target_vld = {"id": 
vld["id"], + "name": vld["name"], + "mgmt-network": vld.get("mgmt-network", False), + "type": vld.get("type"), + "vim_info": [{"vim-network-name": vld.get("vim-network-name"), + "vim_account_id": ns_params["vimAccountId"]}], + } + for cp in vld["vnfd-connection-point-ref"]: + cp2target["member_vnf:{}.{}".format(cp["member-vnf-index-ref"], cp["vnfd-connection-point-ref"])] = \ + "nsrs:{}:vld.{}".format(nsr_id, vld_index) + target["ns"]["vld"].append(target_vld) + for vnfr in db_vnfrs.values(): + vnfd = db_vnfds_ref[vnfr["vnfd-ref"]] + target_vnf = deepcopy(vnfr) + for vld in target_vnf.get("vld", ()): + # check if connected to a ns.vld + vnf_cp = next((cp for cp in vnfd.get("connection-point", ()) if + cp.get("internal-vld-ref") == vld["id"]), None) + if vnf_cp: + ns_cp = "member_vnf:{}.{}".format(vnfr["member-vnf-index-ref"], vnf_cp["id"]) + if cp2target.get(ns_cp): + vld["target"] = cp2target[ns_cp] + vld["vim_info"] = [{"vim-network-name": vld.get("vim-network-name"), + "vim_account_id": vnfr["vim-account-id"]}] + + for vdur in target_vnf.get("vdur", ()): + vdur["vim_info"] = [{"vim_account_id": vnfr["vim-account-id"]}] + vdud_index, vdud = next(k for k in enumerate(vnfd["vdu"]) if k[1]["id"] == vdur["vdu-id-ref"]) + # vdur["additionalParams"] = vnfr.get("additionalParamsForVnf") # TODO additional params for VDU + + if ssh_keys: + if deep_get(vdud, ("vdu-configuration", "config-access", "ssh-access", "required")): + vdur["ssh-keys"] = ssh_keys + vdur["ssh-access-required"] = True + elif deep_get(vnfd, ("vnf-configuration", "config-access", "ssh-access", "required")) and \ + any(iface.get("mgmt-vnf") for iface in vdur["interfaces"]): + vdur["ssh-keys"] = ssh_keys + vdur["ssh-access-required"] = True + + # cloud-init + if vdud.get("cloud-init-file"): + vdur["cloud-init"] = "{}:file:{}".format(vnfd["_id"], vdud.get("cloud-init-file")) + elif vdud.get("cloud-init"): + vdur["cloud-init"] = "{}:vdu:{}".format(vnfd["_id"], vdud_index) + + # flavor + ns_flavor = 
target["flavor"][int(vdur["ns-flavor-id"])] + if not next((vi for vi in ns_flavor["vim_info"] if + vi and vi.get("vim_account_id") == vnfr["vim-account-id"]), None): + ns_flavor["vim_info"].append({"vim_account_id": vnfr["vim-account-id"]}) + # image + ns_image = target["image"][int(vdur["ns-image-id"])] + if not next((vi for vi in ns_image["vim_info"] if + vi and vi.get("vim_account_id") == vnfr["vim-account-id"]), None): + ns_image["vim_info"].append({"vim_account_id": vnfr["vim-account-id"]}) + + vdur["vim_info"] = [{"vim_account_id": vnfr["vim-account-id"]}] + target["vnf"].append(target_vnf) + + desc = await self.RO.deploy(nsr_id, target) + action_id = desc["action_id"] + await self._wait_ng_ro(self, nsr_id, action_id, nslcmop_id, start_deploy, timeout_ns_deploy, stage) + + # Updating NSR + db_nsr_update = { + "_admin.deployed.RO.operational-status": "running", + "detailed-status": " ".join(stage) + } + # db_nsr["_admin.deployed.RO.detailed-status"] = "Deployed at VIM" + self.update_db_2("nsrs", nsr_id, db_nsr_update) + self._write_op_status(nslcmop_id, stage) + self.logger.debug(logging_text + "ns deployed at RO. 
RO_id={}".format(action_id)) + return + + async def _wait_ng_ro(self, nsr_id, action_id, nslcmop_id, start_time, timeout, stage): + detailed_status_old = None + db_nsr_update = {} + while time() <= start_time + timeout: + desc_status = await self.RO.status(nsr_id, action_id) + if desc_status["status"] == "FAILED": + raise NgRoException(desc_status["details"]) + elif desc_status["status"] == "BUILD": + stage[2] = "VIM: ({})".format(desc_status["details"]) + elif desc_status["status"] == "DONE": + stage[2] = "Deployed at VIM" + break + else: + assert False, "ROclient.check_ns_status returns unknown {}".format(desc_status["status"]) + if stage[2] != detailed_status_old: + detailed_status_old = stage[2] + db_nsr_update["detailed-status"] = " ".join(stage) + self.update_db_2("nsrs", nsr_id, db_nsr_update) + self._write_op_status(nslcmop_id, stage) + await asyncio.sleep(5, loop=self.loop) + else: # timeout_ns_deploy + raise NgRoException("Timeout waiting ns to deploy") + + async def _terminate_ng_ro(self, logging_text, nsr_deployed, nsr_id, nslcmop_id, stage): + db_nsr_update = {} + failed_detail = [] + action_id = None + start_deploy = time() + try: + target = { + "ns": {"vld": []}, + "vnf": [], + "image": [], + "flavor": [], + } + desc = await self.RO.deploy(nsr_id, target) + action_id = desc["action_id"] + db_nsr_update["_admin.deployed.RO.nsr_delete_action_id"] = action_id + db_nsr_update["_admin.deployed.RO.nsr_status"] = "DELETING" + self.logger.debug(logging_text + "ns terminate action at RO. 
action_id={}".format(action_id)) + + # wait until done + delete_timeout = 20 * 60 # 20 minutes + await self._wait_ng_ro(self, nsr_id, action_id, nslcmop_id, start_deploy, delete_timeout, stage) + + db_nsr_update["_admin.deployed.RO.nsr_delete_action_id"] = None + db_nsr_update["_admin.deployed.RO.nsr_status"] = "DELETED" + # delete all nsr + await self.RO.delete(nsr_id) + except Exception as e: + if isinstance(e, NgRoException) and e.http_code == 404: # not found + db_nsr_update["_admin.deployed.RO.nsr_id"] = None + db_nsr_update["_admin.deployed.RO.nsr_status"] = "DELETED" + db_nsr_update["_admin.deployed.RO.nsr_delete_action_id"] = None + self.logger.debug(logging_text + "RO_action_id={} already deleted".format(action_id)) + elif isinstance(e, NgRoException) and e.http_code == 409: # conflict + failed_detail.append("delete conflict: {}".format(e)) + self.logger.debug(logging_text + "RO_action_id={} delete conflict: {}".format(action_id, e)) + else: + failed_detail.append("delete error: {}".format(e)) + self.logger.error(logging_text + "RO_action_id={} delete error: {}".format(action_id, e)) + + if failed_detail: + stage[2] = "Error deleting from VIM" + else: + stage[2] = "Deleted from VIM" + db_nsr_update["detailed-status"] = " ".join(stage) + self.update_db_2("nsrs", nsr_id, db_nsr_update) + self._write_op_status(nslcmop_id, stage) + + if failed_detail: + raise LcmException("; ".join(failed_detail)) + return + async def instantiate_RO(self, logging_text, nsr_id, nsd, db_nsr, db_nslcmop, db_vnfrs, db_vnfds_ref, n2vc_key_list, stage): """ @@ -793,6 +971,10 @@ class NsLcm(LcmBase): else: ns_params["vimAccountId"] == vnfr["vim-account-id"] + if self.ng_ro: + return await self._instantiate_ng_ro(logging_text, nsr_id, nsd, db_nsr, db_nslcmop, db_vnfrs, + db_vnfds_ref, n2vc_key_list, stage, start_deploy, + timeout_ns_deploy) # deploy RO # get vnfds, instantiate at RO for c_vnf in nsd.get("constituent-vnfd", ()): @@ -988,7 +1170,7 @@ class NsLcm(LcmBase): 
self._write_op_status(nslcmop_id, stage) # await self._on_update_n2vc_db("nsrs", {"_id": nsr_id}, "_admin.deployed", db_nsr_update) # self.logger.debug(logging_text + "Deployed at VIM") - except (ROclient.ROClientException, LcmException, DbException) as e: + except (ROclient.ROClientException, LcmException, DbException, NgRoException) as e: stage[2] = "ERROR deploying at VIM" self.set_vnfr_at_error(db_vnfrs, str(e)) raise @@ -1068,21 +1250,29 @@ class NsLcm(LcmBase): return ip_address try: ro_vm_id = "{}-{}".format(db_vnfr["member-vnf-index-ref"], target_vdu_id) # TODO add vdu_index - result_dict = await self.RO.create_action( - item="ns", - item_id_name=ro_nsr_id, - descriptor={"add_public_key": pub_key, "vms": [ro_vm_id], "user": user} - ) - # result_dict contains the format {VM-id: {vim_result: 200, description: text}} - if not result_dict or not isinstance(result_dict, dict): - raise LcmException("Unknown response from RO when injecting key") - for result in result_dict.values(): - if result.get("vim_result") == 200: - break - else: - raise ROclient.ROClientException("error injecting key: {}".format( - result.get("description"))) - break + if self.ng_ro: + target = {"action": "inject_ssh_key", "key": pub_key, "user": user, + "vnf": [{"_id": vnfr_id, "vdur": [{"id": vdu_id}]}], + } + await self.RO.deploy(nsr_id, target) + else: + result_dict = await self.RO.create_action( + item="ns", + item_id_name=ro_nsr_id, + descriptor={"add_public_key": pub_key, "vms": [ro_vm_id], "user": user} + ) + # result_dict contains the format {VM-id: {vim_result: 200, description: text}} + if not result_dict or not isinstance(result_dict, dict): + raise LcmException("Unknown response from RO when injecting key") + for result in result_dict.values(): + if result.get("vim_result") == 200: + break + else: + raise ROclient.ROClientException("error injecting key: {}".format( + result.get("description"))) + break + except NgRoException as e: + raise LcmException("Reaching max tries 
injecting key. Error: {}".format(e)) except ROclient.ROClientException as e: if not nb_tries: self.logger.debug(logging_text + "error injecting key: {}. Retrying until {} seconds". @@ -2896,8 +3086,12 @@ class NsLcm(LcmBase): # remove from RO stage[1] = "Deleting ns from VIM." - task_delete_ro = asyncio.ensure_future( - self._terminate_RO(logging_text, nsr_deployed, nsr_id, nslcmop_id, stage)) + if self.ng_ro: + task_delete_ro = asyncio.ensure_future( + self._terminate_ng_ro(logging_text, nsr_deployed, nsr_id, nslcmop_id, stage)) + else: + task_delete_ro = asyncio.ensure_future( + self._terminate_RO(logging_text, nsr_deployed, nsr_id, nslcmop_id, stage)) tasks_dict_info[task_delete_ro] = "Removing deployment from VIM" # rest of staff will be done at finally diff --git a/osm_lcm/tests/test_ns.py b/osm_lcm/tests/test_ns.py index e35afc6..21338f2 100644 --- a/osm_lcm/tests/test_ns.py +++ b/osm_lcm/tests/test_ns.py @@ -60,8 +60,8 @@ lcm_config = { 'ca_cert': getenv("OSMLCM_VCA_CACERT", None) }, "ro_config": { - "endpoint_url": "http://{}:{}/openmano".format(getenv("OSMLCM_RO_HOST", "ro"), - getenv("OSMLCM_RO_PORT", "9090")), + "uri": "http://{}:{}/openmano".format(getenv("OSMLCM_RO_HOST", "ro"), + getenv("OSMLCM_RO_PORT", "9090")), "tenant": getenv("OSMLCM_RO_TENANT", "osm"), "logger_name": "lcm.ROclient", "loglevel": "DEBUG", -- 2.25.1 From eccc21010cd4aedd32763acee92fd63cf061cfd1 Mon Sep 17 00:00:00 2001 From: Dominik Fleischmann Date: Tue, 9 Jun 2020 11:55:08 +0200 Subject: [PATCH 12/16] 8716 - K8s Proxy Charms This commit enables the possibility to deploy Proxy Charms on Kubernetes. It also adds the k8s_cloud variable which will state the name of the k8s that VCA will use to deploy them. 
Change-Id: I78763a67bdd51db60f7883fe01e04ac21170dfd6 Signed-off-by: Dominik Fleischmann --- osm_lcm/lcm.cfg | 1 + osm_lcm/ns.py | 162 +++++++++++++++++++++++++----------------------- 2 files changed, 86 insertions(+), 77 deletions(-) diff --git a/osm_lcm/lcm.cfg b/osm_lcm/lcm.cfg index fa72b7f..8f141f1 100644 --- a/osm_lcm/lcm.cfg +++ b/osm_lcm/lcm.cfg @@ -42,6 +42,7 @@ VCA: user: admin secret: secret cloud: localhost + k8s_cloud: k8scloud helmpath: /usr/local/bin/helm kubectlpath: /usr/bin/kubectl jujupath: /usr/local/bin/juju diff --git a/osm_lcm/ns.py b/osm_lcm/ns.py index 1fbe845..3951ae5 100644 --- a/osm_lcm/ns.py +++ b/osm_lcm/ns.py @@ -1365,74 +1365,72 @@ class NsLcm(LcmBase): ) is_proxy_charm = deep_get(config_descriptor, ('juju', 'charm')) is not None + is_k8s_proxy_charm = False + if deep_get(config_descriptor, ('juju', 'proxy')) is False: is_proxy_charm = False - # n2vc_redesign STEP 3.1 + if deep_get(config_descriptor, ('juju', 'k8s')) is True and is_proxy_charm: + is_k8s_proxy_charm = True - # find old ee_id if exists - ee_id = vca_deployed.get("ee_id") + if not is_k8s_proxy_charm: + # n2vc_redesign STEP 3.1 - # create or register execution environment in VCA - if is_proxy_charm: + # find old ee_id if exists + ee_id = vca_deployed.get("ee_id") - self._write_configuration_status( - nsr_id=nsr_id, - vca_index=vca_index, - status='CREATING', - element_under_configuration=element_under_configuration, - element_type=element_type - ) + # create or register execution environment in VCA + if is_proxy_charm: - step = "create execution environment" - self.logger.debug(logging_text + step) - ee_id, credentials = await self.n2vc.create_execution_environment(namespace=namespace, - reuse_ee_id=ee_id, - db_dict=db_dict) + self._write_configuration_status( + nsr_id=nsr_id, + vca_index=vca_index, + status='CREATING', + element_under_configuration=element_under_configuration, + element_type=element_type + ) - else: - step = "Waiting to VM being up and getting IP 
address" - self.logger.debug(logging_text + step) - rw_mgmt_ip = await self.wait_vm_up_insert_key_ro(logging_text, nsr_id, vnfr_id, vdu_id, vdu_index, - user=None, pub_key=None) - credentials = {"hostname": rw_mgmt_ip} - # get username - username = deep_get(config_descriptor, ("config-access", "ssh-access", "default-user")) - # TODO remove this when changes on IM regarding config-access:ssh-access:default-user were - # merged. Meanwhile let's get username from initial-config-primitive - if not username and config_descriptor.get("initial-config-primitive"): - for config_primitive in config_descriptor["initial-config-primitive"]: - for param in config_primitive.get("parameter", ()): - if param["name"] == "ssh-username": - username = param["value"] - break - if not username: - raise LcmException("Cannot determine the username neither with 'initial-config-promitive' nor with " - "'config-access.ssh-access.default-user'") - credentials["username"] = username - # n2vc_redesign STEP 3.2 + step = "create execution environment" + self.logger.debug(logging_text + step) + ee_id, credentials = await self.n2vc.create_execution_environment(namespace=namespace, + reuse_ee_id=ee_id, + db_dict=db_dict) - self._write_configuration_status( - nsr_id=nsr_id, - vca_index=vca_index, - status='REGISTERING', - element_under_configuration=element_under_configuration, - element_type=element_type - ) + else: + step = "Waiting to VM being up and getting IP address" + self.logger.debug(logging_text + step) + rw_mgmt_ip = await self.wait_vm_up_insert_key_ro(logging_text, nsr_id, vnfr_id, vdu_id, vdu_index, + user=None, pub_key=None) + credentials = {"hostname": rw_mgmt_ip} + # get username + username = deep_get(config_descriptor, ("config-access", "ssh-access", "default-user")) + # TODO remove this when changes on IM regarding config-access:ssh-access:default-user were + # merged. 
Meanwhile let's get username from initial-config-primitive + if not username and config_descriptor.get("initial-config-primitive"): + for config_primitive in config_descriptor["initial-config-primitive"]: + for param in config_primitive.get("parameter", ()): + if param["name"] == "ssh-username": + username = param["value"] + break + if not username: + raise LcmException("Cannot determine the username neither with" + "'initial-config-promitive' nor with " + "'config-access.ssh-access.default-user'") + credentials["username"] = username + # n2vc_redesign STEP 3.2 + + self._write_configuration_status( + nsr_id=nsr_id, + vca_index=vca_index, + status='REGISTERING', + element_under_configuration=element_under_configuration, + element_type=element_type + ) - step = "register execution environment {}".format(credentials) - self.logger.debug(logging_text + step) - ee_id = await self.n2vc.register_execution_environment(credentials=credentials, namespace=namespace, - db_dict=db_dict) - - # for compatibility with MON/POL modules, the need model and application name at database - # TODO ask to N2VC instead of assuming the format "model_name.application_name" - ee_id_parts = ee_id.split('.') - model_name = ee_id_parts[0] - application_name = ee_id_parts[1] - db_nsr_update = {db_update_entry + "model": model_name, - db_update_entry + "application": application_name, - db_update_entry + "ee_id": ee_id} + step = "register execution environment {}".format(credentials) + self.logger.debug(logging_text + step) + ee_id = await self.n2vc.register_execution_environment(credentials=credentials, namespace=namespace, + db_dict=db_dict) # n2vc_redesign STEP 3.3 @@ -1444,7 +1442,6 @@ class NsLcm(LcmBase): status='INSTALLING SW', element_under_configuration=element_under_configuration, element_type=element_type, - other_update=db_nsr_update ) # TODO check if already done @@ -1461,25 +1458,36 @@ class NsLcm(LcmBase): deploy_params ) break - num_units = 1 - if is_proxy_charm: - if element_type 
== "NS": - num_units = db_nsr.get("config-units") or 1 - elif element_type == "VNF": - num_units = db_vnfr.get("config-units") or 1 - elif element_type == "VDU": - for v in db_vnfr["vdur"]: - if vdu_id == v["vdu-id-ref"]: - num_units = v.get("config-units") or 1 - break + if is_k8s_proxy_charm: + charm_name = deep_get(config_descriptor, ('juju', 'charm')) + self.logger.debug("Installing K8s Proxy Charm: {}".format(charm_name)) + + ee_id = await self.n2vc.install_k8s_proxy_charm( + charm_name=charm_name, + namespace=namespace, + artifact_path=artifact_path, + db_dict=db_dict + ) + else: + num_units = 1 + if is_proxy_charm: + if element_type == "NS": + num_units = db_nsr.get("config-units") or 1 + elif element_type == "VNF": + num_units = db_vnfr.get("config-units") or 1 + elif element_type == "VDU": + for v in db_vnfr["vdur"]: + if vdu_id == v["vdu-id-ref"]: + num_units = v.get("config-units") or 1 + break - await self.n2vc.install_configuration_sw( - ee_id=ee_id, - artifact_path=artifact_path, - db_dict=db_dict, - config=config, - num_units=num_units - ) + await self.n2vc.install_configuration_sw( + ee_id=ee_id, + artifact_path=artifact_path, + db_dict=db_dict, + config=config, + num_units=num_units + ) # write in db flag of configuration_sw already installed self.update_db_2("nsrs", nsr_id, {db_update_entry + "config_sw_installed": True}) -- 2.25.1 From 62b2ee49c25be96be8cb6ef97224f598ae7094be Mon Sep 17 00:00:00 2001 From: lloretgalleg Date: Tue, 30 Jun 2020 11:26:48 +0000 Subject: [PATCH 13/16] Adding grpc library Change-Id: I8577bf19b895f543518ff1eacb28aa1c757391b1 Signed-off-by: lloretgalleg --- Dockerfile.local | 2 + debian/python3-osm-lcm.postinst | 1 + osm_lcm/frontend_grpc.py | 74 ++++++++++ osm_lcm/frontend_pb2.py | 250 ++++++++++++++++++++++++++++++++ requirements.txt | 3 + setup.py | 2 + stdeb.cfg | 19 ++- tox.ini | 3 +- 8 files changed, 352 insertions(+), 2 deletions(-) create mode 100644 osm_lcm/frontend_grpc.py create mode 100644 
osm_lcm/frontend_pb2.py diff --git a/Dockerfile.local b/Dockerfile.local index a64bf4c..d02aee2 100644 --- a/Dockerfile.local +++ b/Dockerfile.local @@ -49,6 +49,8 @@ RUN git -C /app clone https://osm.etsi.org/gerrit/osm/common.git \ && python3 -m pip install -e /app/common # python3-pymongo python3-yaml pycrypto aiokafka +RUN python3 -m pip install grpcio-tools grpclib + RUN mkdir -p /app/storage/kafka && mkdir -p /app/log diff --git a/debian/python3-osm-lcm.postinst b/debian/python3-osm-lcm.postinst index f66671f..4ee0a2e 100755 --- a/debian/python3-osm-lcm.postinst +++ b/debian/python3-osm-lcm.postinst @@ -22,6 +22,7 @@ echo "POST INSTALL OSM-LCM" # echo "Installing python dependencies via pip..." # python3 -m pip install -U pip # python3 -m pip install --user aiokafka +python3 -m pip install grpcio-tools grpclib #Creation of log folder mkdir -p /var/log/osm diff --git a/osm_lcm/frontend_grpc.py b/osm_lcm/frontend_grpc.py new file mode 100644 index 0000000..c1dd60d --- /dev/null +++ b/osm_lcm/frontend_grpc.py @@ -0,0 +1,74 @@ +#!/usr/bin/python3 +# -*- coding: utf-8 -*- + +## +# Copyright 2018 Telefonica S.A. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +## +# Generated by the Protocol Buffers compiler. DO NOT EDIT! 
+# source: osm_lcm/frontend.proto +# plugin: grpclib.plugin.main +import abc +import typing + +import grpclib.const +import grpclib.client +if typing.TYPE_CHECKING: + import grpclib.server + +import osm_lcm.frontend_pb2 + + +class FrontendExecutorBase(abc.ABC): + + @abc.abstractmethod + async def RunPrimitive(self, stream: 'grpclib.server.Stream[osm_lcm.frontend_pb2.PrimitiveRequest, osm_lcm.frontend_pb2.PrimitiveReply]') -> None: + pass + + @abc.abstractmethod + async def GetSshKey(self, stream: 'grpclib.server.Stream[osm_lcm.frontend_pb2.SshKeyRequest, osm_lcm.frontend_pb2.SshKeyReply]') -> None: + pass + + def __mapping__(self) -> typing.Dict[str, grpclib.const.Handler]: + return { + '/osm_lcm.FrontendExecutor/RunPrimitive': grpclib.const.Handler( + self.RunPrimitive, + grpclib.const.Cardinality.UNARY_STREAM, + osm_lcm.frontend_pb2.PrimitiveRequest, + osm_lcm.frontend_pb2.PrimitiveReply, + ), + '/osm_lcm.FrontendExecutor/GetSshKey': grpclib.const.Handler( + self.GetSshKey, + grpclib.const.Cardinality.UNARY_UNARY, + osm_lcm.frontend_pb2.SshKeyRequest, + osm_lcm.frontend_pb2.SshKeyReply, + ), + } + + +class FrontendExecutorStub: + + def __init__(self, channel: grpclib.client.Channel) -> None: + self.RunPrimitive = grpclib.client.UnaryStreamMethod( + channel, + '/osm_lcm.FrontendExecutor/RunPrimitive', + osm_lcm.frontend_pb2.PrimitiveRequest, + osm_lcm.frontend_pb2.PrimitiveReply, + ) + self.GetSshKey = grpclib.client.UnaryUnaryMethod( + channel, + '/osm_lcm.FrontendExecutor/GetSshKey', + osm_lcm.frontend_pb2.SshKeyRequest, + osm_lcm.frontend_pb2.SshKeyReply, + ) diff --git a/osm_lcm/frontend_pb2.py b/osm_lcm/frontend_pb2.py new file mode 100644 index 0000000..0c24c73 --- /dev/null +++ b/osm_lcm/frontend_pb2.py @@ -0,0 +1,250 @@ +# -*- coding: utf-8 -*- +## +# Copyright 2020 Telefonica Investigacion y Desarrollo, S.A.U. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. 
+# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +## +# Generated by the protocol buffer compiler. DO NOT EDIT! +# source: osm_lcm/frontend.proto + +from google.protobuf import descriptor as _descriptor +from google.protobuf import message as _message +from google.protobuf import reflection as _reflection +from google.protobuf import symbol_database as _symbol_database +# @@protoc_insertion_point(imports) + +_sym_db = _symbol_database.Default() + + + + +DESCRIPTOR = _descriptor.FileDescriptor( + name='osm_lcm/frontend.proto', + package='osm_lcm', + syntax='proto3', + serialized_options=b'\n\027com.etsi.osm.lcm.osm_lcmB\014GrpcExecutorP\001\242\002\003OEE', + serialized_pb=b'\n\x15osm_lcm/frontend.proto\x12\x06osm_lcm\"<\n\x10PrimitiveRequest\x12\n\n\x02id\x18\x01 \x01(\t\x12\x0c\n\x04name\x18\x02 \x01(\t\x12\x0e\n\x06params\x18\x03 \x01(\t\":\n\x0ePrimitiveReply\x12\x0e\n\x06status\x18\x01 \x01(\t\x12\x18\n\x10\x64\x65tailed_message\x18\x02 \x01(\t\"\x0f\n\rSshKeyRequest\"\x1e\n\x0bSshKeyReply\x12\x0f\n\x07message\x18\x01 \x01(\t2\x93\x01\n\x10\x46rontendExecutor\x12\x44\n\x0cRunPrimitive\x12\x18.osm_lcm.PrimitiveRequest\x1a\x16.osm_lcm.PrimitiveReply\"\x00\x30\x01\x12\x39\n\tGetSshKey\x12\x15.osm_lcm.SshKeyRequest\x1a\x13.osm_lcm.SshKeyReply\"\x00\x42/\n\x17\x63om.etsi.osm.lcm.osm_lcmB\x0cGrpcExecutorP\x01\xa2\x02\x03OEEb\x06proto3' +) + + + + +_PRIMITIVEREQUEST = _descriptor.Descriptor( + name='PrimitiveRequest', + full_name='osm_lcm.PrimitiveRequest', + filename=None, + file=DESCRIPTOR, + containing_type=None, + fields=[ + _descriptor.FieldDescriptor( + name='id', 
full_name='osm_lcm.PrimitiveRequest.id', index=0, + number=1, type=9, cpp_type=9, label=1, + has_default_value=False, default_value=b"".decode('utf-8'), + message_type=None, enum_type=None, containing_type=None, + is_extension=False, extension_scope=None, + serialized_options=None, file=DESCRIPTOR), + _descriptor.FieldDescriptor( + name='name', full_name='osm_lcm.PrimitiveRequest.name', index=1, + number=2, type=9, cpp_type=9, label=1, + has_default_value=False, default_value=b"".decode('utf-8'), + message_type=None, enum_type=None, containing_type=None, + is_extension=False, extension_scope=None, + serialized_options=None, file=DESCRIPTOR), + _descriptor.FieldDescriptor( + name='params', full_name='osm_lcm.PrimitiveRequest.params', index=2, + number=3, type=9, cpp_type=9, label=1, + has_default_value=False, default_value=b"".decode('utf-8'), + message_type=None, enum_type=None, containing_type=None, + is_extension=False, extension_scope=None, + serialized_options=None, file=DESCRIPTOR), + ], + extensions=[ + ], + nested_types=[], + enum_types=[ + ], + serialized_options=None, + is_extendable=False, + syntax='proto3', + extension_ranges=[], + oneofs=[ + ], + serialized_start=33, + serialized_end=93, +) + + +_PRIMITIVEREPLY = _descriptor.Descriptor( + name='PrimitiveReply', + full_name='osm_lcm.PrimitiveReply', + filename=None, + file=DESCRIPTOR, + containing_type=None, + fields=[ + _descriptor.FieldDescriptor( + name='status', full_name='osm_lcm.PrimitiveReply.status', index=0, + number=1, type=9, cpp_type=9, label=1, + has_default_value=False, default_value=b"".decode('utf-8'), + message_type=None, enum_type=None, containing_type=None, + is_extension=False, extension_scope=None, + serialized_options=None, file=DESCRIPTOR), + _descriptor.FieldDescriptor( + name='detailed_message', full_name='osm_lcm.PrimitiveReply.detailed_message', index=1, + number=2, type=9, cpp_type=9, label=1, + has_default_value=False, default_value=b"".decode('utf-8'), + message_type=None, 
enum_type=None, containing_type=None, + is_extension=False, extension_scope=None, + serialized_options=None, file=DESCRIPTOR), + ], + extensions=[ + ], + nested_types=[], + enum_types=[ + ], + serialized_options=None, + is_extendable=False, + syntax='proto3', + extension_ranges=[], + oneofs=[ + ], + serialized_start=95, + serialized_end=153, +) + + +_SSHKEYREQUEST = _descriptor.Descriptor( + name='SshKeyRequest', + full_name='osm_lcm.SshKeyRequest', + filename=None, + file=DESCRIPTOR, + containing_type=None, + fields=[ + ], + extensions=[ + ], + nested_types=[], + enum_types=[ + ], + serialized_options=None, + is_extendable=False, + syntax='proto3', + extension_ranges=[], + oneofs=[ + ], + serialized_start=155, + serialized_end=170, +) + + +_SSHKEYREPLY = _descriptor.Descriptor( + name='SshKeyReply', + full_name='osm_lcm.SshKeyReply', + filename=None, + file=DESCRIPTOR, + containing_type=None, + fields=[ + _descriptor.FieldDescriptor( + name='message', full_name='osm_lcm.SshKeyReply.message', index=0, + number=1, type=9, cpp_type=9, label=1, + has_default_value=False, default_value=b"".decode('utf-8'), + message_type=None, enum_type=None, containing_type=None, + is_extension=False, extension_scope=None, + serialized_options=None, file=DESCRIPTOR), + ], + extensions=[ + ], + nested_types=[], + enum_types=[ + ], + serialized_options=None, + is_extendable=False, + syntax='proto3', + extension_ranges=[], + oneofs=[ + ], + serialized_start=172, + serialized_end=202, +) + +DESCRIPTOR.message_types_by_name['PrimitiveRequest'] = _PRIMITIVEREQUEST +DESCRIPTOR.message_types_by_name['PrimitiveReply'] = _PRIMITIVEREPLY +DESCRIPTOR.message_types_by_name['SshKeyRequest'] = _SSHKEYREQUEST +DESCRIPTOR.message_types_by_name['SshKeyReply'] = _SSHKEYREPLY +_sym_db.RegisterFileDescriptor(DESCRIPTOR) + +PrimitiveRequest = _reflection.GeneratedProtocolMessageType('PrimitiveRequest', (_message.Message,), { + 'DESCRIPTOR' : _PRIMITIVEREQUEST, + '__module__' : 'osm_lcm.frontend_pb2' + # 
@@protoc_insertion_point(class_scope:osm_lcm.PrimitiveRequest) + }) +_sym_db.RegisterMessage(PrimitiveRequest) + +PrimitiveReply = _reflection.GeneratedProtocolMessageType('PrimitiveReply', (_message.Message,), { + 'DESCRIPTOR' : _PRIMITIVEREPLY, + '__module__' : 'osm_lcm.frontend_pb2' + # @@protoc_insertion_point(class_scope:osm_lcm.PrimitiveReply) + }) +_sym_db.RegisterMessage(PrimitiveReply) + +SshKeyRequest = _reflection.GeneratedProtocolMessageType('SshKeyRequest', (_message.Message,), { + 'DESCRIPTOR' : _SSHKEYREQUEST, + '__module__' : 'osm_lcm.frontend_pb2' + # @@protoc_insertion_point(class_scope:osm_lcm.SshKeyRequest) + }) +_sym_db.RegisterMessage(SshKeyRequest) + +SshKeyReply = _reflection.GeneratedProtocolMessageType('SshKeyReply', (_message.Message,), { + 'DESCRIPTOR' : _SSHKEYREPLY, + '__module__' : 'osm_lcm.frontend_pb2' + # @@protoc_insertion_point(class_scope:osm_lcm.SshKeyReply) + }) +_sym_db.RegisterMessage(SshKeyReply) + + +DESCRIPTOR._options = None + +_FRONTENDEXECUTOR = _descriptor.ServiceDescriptor( + name='FrontendExecutor', + full_name='osm_lcm.FrontendExecutor', + file=DESCRIPTOR, + index=0, + serialized_options=None, + serialized_start=205, + serialized_end=352, + methods=[ + _descriptor.MethodDescriptor( + name='RunPrimitive', + full_name='osm_lcm.FrontendExecutor.RunPrimitive', + index=0, + containing_service=None, + input_type=_PRIMITIVEREQUEST, + output_type=_PRIMITIVEREPLY, + serialized_options=None, + ), + _descriptor.MethodDescriptor( + name='GetSshKey', + full_name='osm_lcm.FrontendExecutor.GetSshKey', + index=1, + containing_service=None, + input_type=_SSHKEYREQUEST, + output_type=_SSHKEYREPLY, + serialized_options=None, + ), +]) +_sym_db.RegisterServiceDescriptor(_FRONTENDEXECUTOR) + +DESCRIPTOR.services_by_name['FrontendExecutor'] = _FRONTENDEXECUTOR + +# @@protoc_insertion_point(module_scope) diff --git a/requirements.txt b/requirements.txt index a1b035b..32a09a9 100644 --- a/requirements.txt +++ b/requirements.txt @@ -15,5 
+15,8 @@ pyyaml aiohttp>=2.3.10 jinja2 +grpcio-tools +grpclib git+https://osm.etsi.org/gerrit/osm/common.git#egg=osm-common git+https://osm.etsi.org/gerrit/osm/N2VC.git#egg=n2vc + diff --git a/setup.py b/setup.py index 9ba333b..aa83fd6 100644 --- a/setup.py +++ b/setup.py @@ -56,6 +56,8 @@ setup( 'osm-common', 'n2vc', 'jinja2', + 'grpcio-tools', + 'grpclib', # TODO this is version installed by 'apt python3-aiohttp' on Ubuntu Sserver 14.04 # version installed by pip 3.3.2 is not compatible. Code should be migrated to this version and use pip3 ], diff --git a/stdeb.cfg b/stdeb.cfg index 36d1fb0..3e222e5 100644 --- a/stdeb.cfg +++ b/stdeb.cfg @@ -1,3 +1,20 @@ +## +# Copyright 2020 Telefonica Investigacion y Desarrollo, S.A.U. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+# +## [DEFAULT] X-Python3-Version : >= 3.5 -Depends3: python3-osm-common, python3-n2vc, python3-yaml, python3-aiohttp, python3-jinja2 +Depends3: python3-osm-common, python3-n2vc, python3-yaml, python3-aiohttp, python3-jinja2, python3-pip diff --git a/tox.ini b/tox.ini index de33e96..24b770f 100644 --- a/tox.ini +++ b/tox.ini @@ -27,7 +27,8 @@ commands=python3 -m unittest discover -v basepython = python3 deps = flake8 commands = flake8 osm_lcm --max-line-length 120 \ - --exclude .svn,CVS,.gz,.git,__pycache__,.tox,local,temp --ignore W291,W293,E226,W504 + --exclude .svn,CVS,.gz,.git,__pycache__,.tox,local,temp,frontend_grpc.py,frontend_pb2.py \ + --ignore W291,W293,E226,W504 [testenv:unittest] basepython = python3 -- 2.25.1 From c7c6ba88814f41706792cc5070b8a5abe4dc1c78 Mon Sep 17 00:00:00 2001 From: tierno Date: Wed, 1 Jul 2020 07:18:37 +0000 Subject: [PATCH 14/16] change Dockerfile used for devops-stages to ubuntu 18.04 Change-Id: I32393a8056c5b69f07f058425ed5dcd6747ef2ce Signed-off-by: tierno --- Dockerfile | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Dockerfile b/Dockerfile index b0e81bb..79c1a50 100644 --- a/Dockerfile +++ b/Dockerfile @@ -18,7 +18,7 @@ # Use Dockerfile.local for running osm/LCM in a docker container from source -FROM ubuntu:16.04 +FROM ubuntu:18.04 RUN apt-get update && \ DEBIAN_FRONTEND=noninteractive apt-get --yes install git tox make debhelper wget \ -- 2.25.1 From 302266e50bdeec9f1bf3a5d865993b1aab4ee61f Mon Sep 17 00:00:00 2001 From: lloretgalleg Date: Wed, 1 Jul 2020 08:52:35 +0000 Subject: [PATCH 15/16] Modified grpc client, revised package name Change-Id: I345c66b6251a8f2f70b7cc8b72c434027095e438 Signed-off-by: lloretgalleg --- osm_lcm/frontend_grpc.py | 31 ++++++++++----------- osm_lcm/frontend_pb2.py | 60 ++++++++++++++++++++-------------------- 2 files changed, 45 insertions(+), 46 deletions(-) diff --git a/osm_lcm/frontend_grpc.py b/osm_lcm/frontend_grpc.py index c1dd60d..88308e1 100644 --- 
a/osm_lcm/frontend_grpc.py +++ b/osm_lcm/frontend_grpc.py @@ -1,20 +1,19 @@ -#!/usr/bin/python3 -# -*- coding: utf-8 -*- - ## -# Copyright 2018 Telefonica S.A. +# Copyright 2020 Telefonica Investigacion y Desarrollo, S.A.U. # -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at # -# http://www.apache.org/licenses/LICENSE-2.0 +# http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# ## # Generated by the Protocol Buffers compiler. DO NOT EDIT! 
# source: osm_lcm/frontend.proto @@ -42,13 +41,13 @@ class FrontendExecutorBase(abc.ABC): def __mapping__(self) -> typing.Dict[str, grpclib.const.Handler]: return { - '/osm_lcm.FrontendExecutor/RunPrimitive': grpclib.const.Handler( + '/osm_ee.FrontendExecutor/RunPrimitive': grpclib.const.Handler( self.RunPrimitive, grpclib.const.Cardinality.UNARY_STREAM, osm_lcm.frontend_pb2.PrimitiveRequest, osm_lcm.frontend_pb2.PrimitiveReply, ), - '/osm_lcm.FrontendExecutor/GetSshKey': grpclib.const.Handler( + '/osm_ee.FrontendExecutor/GetSshKey': grpclib.const.Handler( self.GetSshKey, grpclib.const.Cardinality.UNARY_UNARY, osm_lcm.frontend_pb2.SshKeyRequest, @@ -62,13 +61,13 @@ class FrontendExecutorStub: def __init__(self, channel: grpclib.client.Channel) -> None: self.RunPrimitive = grpclib.client.UnaryStreamMethod( channel, - '/osm_lcm.FrontendExecutor/RunPrimitive', + '/osm_ee.FrontendExecutor/RunPrimitive', osm_lcm.frontend_pb2.PrimitiveRequest, osm_lcm.frontend_pb2.PrimitiveReply, ) self.GetSshKey = grpclib.client.UnaryUnaryMethod( channel, - '/osm_lcm.FrontendExecutor/GetSshKey', + '/osm_ee.FrontendExecutor/GetSshKey', osm_lcm.frontend_pb2.SshKeyRequest, osm_lcm.frontend_pb2.SshKeyReply, ) diff --git a/osm_lcm/frontend_pb2.py b/osm_lcm/frontend_pb2.py index 0c24c73..3407e73 100644 --- a/osm_lcm/frontend_pb2.py +++ b/osm_lcm/frontend_pb2.py @@ -32,10 +32,10 @@ _sym_db = _symbol_database.Default() DESCRIPTOR = _descriptor.FileDescriptor( name='osm_lcm/frontend.proto', - package='osm_lcm', + package='osm_ee', syntax='proto3', - serialized_options=b'\n\027com.etsi.osm.lcm.osm_lcmB\014GrpcExecutorP\001\242\002\003OEE', - serialized_pb=b'\n\x15osm_lcm/frontend.proto\x12\x06osm_lcm\"<\n\x10PrimitiveRequest\x12\n\n\x02id\x18\x01 \x01(\t\x12\x0c\n\x04name\x18\x02 \x01(\t\x12\x0e\n\x06params\x18\x03 \x01(\t\":\n\x0ePrimitiveReply\x12\x0e\n\x06status\x18\x01 \x01(\t\x12\x18\n\x10\x64\x65tailed_message\x18\x02 
\x01(\t\"\x0f\n\rSshKeyRequest\"\x1e\n\x0bSshKeyReply\x12\x0f\n\x07message\x18\x01 \x01(\t2\x93\x01\n\x10\x46rontendExecutor\x12\x44\n\x0cRunPrimitive\x12\x18.osm_lcm.PrimitiveRequest\x1a\x16.osm_lcm.PrimitiveReply\"\x00\x30\x01\x12\x39\n\tGetSshKey\x12\x15.osm_lcm.SshKeyRequest\x1a\x13.osm_lcm.SshKeyReply\"\x00\x42/\n\x17\x63om.etsi.osm.lcm.osm_lcmB\x0cGrpcExecutorP\x01\xa2\x02\x03OEEb\x06proto3' + serialized_options=b'\n\027com.etsi.osm.lcm.osm_eeB\014GrpcExecutorP\001\242\002\003OEE', + serialized_pb=b'\n\x16osm_lcm/frontend.proto\x12\x06osm_ee\"<\n\x10PrimitiveRequest\x12\n\n\x02id\x18\x01 \x01(\t\x12\x0c\n\x04name\x18\x02 \x01(\t\x12\x0e\n\x06params\x18\x03 \x01(\t\":\n\x0ePrimitiveReply\x12\x0e\n\x06status\x18\x01 \x01(\t\x12\x18\n\x10\x64\x65tailed_message\x18\x02 \x01(\t\"\x0f\n\rSshKeyRequest\"\x1e\n\x0bSshKeyReply\x12\x0f\n\x07message\x18\x01 \x01(\t2\x93\x01\n\x10\x46rontendExecutor\x12\x44\n\x0cRunPrimitive\x12\x18.osm_ee.PrimitiveRequest\x1a\x16.osm_ee.PrimitiveReply\"\x00\x30\x01\x12\x39\n\tGetSshKey\x12\x15.osm_ee.SshKeyRequest\x1a\x13.osm_ee.SshKeyReply\"\x00\x42/\n\x17\x63om.etsi.osm.lcm.osm_eeB\x0cGrpcExecutorP\x01\xa2\x02\x03OEEb\x06proto3' ) @@ -43,27 +43,27 @@ DESCRIPTOR = _descriptor.FileDescriptor( _PRIMITIVEREQUEST = _descriptor.Descriptor( name='PrimitiveRequest', - full_name='osm_lcm.PrimitiveRequest', + full_name='osm_ee.PrimitiveRequest', filename=None, file=DESCRIPTOR, containing_type=None, fields=[ _descriptor.FieldDescriptor( - name='id', full_name='osm_lcm.PrimitiveRequest.id', index=0, + name='id', full_name='osm_ee.PrimitiveRequest.id', index=0, number=1, type=9, cpp_type=9, label=1, has_default_value=False, default_value=b"".decode('utf-8'), message_type=None, enum_type=None, containing_type=None, is_extension=False, extension_scope=None, serialized_options=None, file=DESCRIPTOR), _descriptor.FieldDescriptor( - name='name', full_name='osm_lcm.PrimitiveRequest.name', index=1, + name='name', full_name='osm_ee.PrimitiveRequest.name', 
index=1, number=2, type=9, cpp_type=9, label=1, has_default_value=False, default_value=b"".decode('utf-8'), message_type=None, enum_type=None, containing_type=None, is_extension=False, extension_scope=None, serialized_options=None, file=DESCRIPTOR), _descriptor.FieldDescriptor( - name='params', full_name='osm_lcm.PrimitiveRequest.params', index=2, + name='params', full_name='osm_ee.PrimitiveRequest.params', index=2, number=3, type=9, cpp_type=9, label=1, has_default_value=False, default_value=b"".decode('utf-8'), message_type=None, enum_type=None, containing_type=None, @@ -81,27 +81,27 @@ _PRIMITIVEREQUEST = _descriptor.Descriptor( extension_ranges=[], oneofs=[ ], - serialized_start=33, - serialized_end=93, + serialized_start=34, + serialized_end=94, ) _PRIMITIVEREPLY = _descriptor.Descriptor( name='PrimitiveReply', - full_name='osm_lcm.PrimitiveReply', + full_name='osm_ee.PrimitiveReply', filename=None, file=DESCRIPTOR, containing_type=None, fields=[ _descriptor.FieldDescriptor( - name='status', full_name='osm_lcm.PrimitiveReply.status', index=0, + name='status', full_name='osm_ee.PrimitiveReply.status', index=0, number=1, type=9, cpp_type=9, label=1, has_default_value=False, default_value=b"".decode('utf-8'), message_type=None, enum_type=None, containing_type=None, is_extension=False, extension_scope=None, serialized_options=None, file=DESCRIPTOR), _descriptor.FieldDescriptor( - name='detailed_message', full_name='osm_lcm.PrimitiveReply.detailed_message', index=1, + name='detailed_message', full_name='osm_ee.PrimitiveReply.detailed_message', index=1, number=2, type=9, cpp_type=9, label=1, has_default_value=False, default_value=b"".decode('utf-8'), message_type=None, enum_type=None, containing_type=None, @@ -119,14 +119,14 @@ _PRIMITIVEREPLY = _descriptor.Descriptor( extension_ranges=[], oneofs=[ ], - serialized_start=95, - serialized_end=153, + serialized_start=96, + serialized_end=154, ) _SSHKEYREQUEST = _descriptor.Descriptor( name='SshKeyRequest', - 
full_name='osm_lcm.SshKeyRequest', + full_name='osm_ee.SshKeyRequest', filename=None, file=DESCRIPTOR, containing_type=None, @@ -143,20 +143,20 @@ _SSHKEYREQUEST = _descriptor.Descriptor( extension_ranges=[], oneofs=[ ], - serialized_start=155, - serialized_end=170, + serialized_start=156, + serialized_end=171, ) _SSHKEYREPLY = _descriptor.Descriptor( name='SshKeyReply', - full_name='osm_lcm.SshKeyReply', + full_name='osm_ee.SshKeyReply', filename=None, file=DESCRIPTOR, containing_type=None, fields=[ _descriptor.FieldDescriptor( - name='message', full_name='osm_lcm.SshKeyReply.message', index=0, + name='message', full_name='osm_ee.SshKeyReply.message', index=0, number=1, type=9, cpp_type=9, label=1, has_default_value=False, default_value=b"".decode('utf-8'), message_type=None, enum_type=None, containing_type=None, @@ -174,8 +174,8 @@ _SSHKEYREPLY = _descriptor.Descriptor( extension_ranges=[], oneofs=[ ], - serialized_start=172, - serialized_end=202, + serialized_start=173, + serialized_end=203, ) DESCRIPTOR.message_types_by_name['PrimitiveRequest'] = _PRIMITIVEREQUEST @@ -187,28 +187,28 @@ _sym_db.RegisterFileDescriptor(DESCRIPTOR) PrimitiveRequest = _reflection.GeneratedProtocolMessageType('PrimitiveRequest', (_message.Message,), { 'DESCRIPTOR' : _PRIMITIVEREQUEST, '__module__' : 'osm_lcm.frontend_pb2' - # @@protoc_insertion_point(class_scope:osm_lcm.PrimitiveRequest) + # @@protoc_insertion_point(class_scope:osm_ee.PrimitiveRequest) }) _sym_db.RegisterMessage(PrimitiveRequest) PrimitiveReply = _reflection.GeneratedProtocolMessageType('PrimitiveReply', (_message.Message,), { 'DESCRIPTOR' : _PRIMITIVEREPLY, '__module__' : 'osm_lcm.frontend_pb2' - # @@protoc_insertion_point(class_scope:osm_lcm.PrimitiveReply) + # @@protoc_insertion_point(class_scope:osm_ee.PrimitiveReply) }) _sym_db.RegisterMessage(PrimitiveReply) SshKeyRequest = _reflection.GeneratedProtocolMessageType('SshKeyRequest', (_message.Message,), { 'DESCRIPTOR' : _SSHKEYREQUEST, '__module__' : 
'osm_lcm.frontend_pb2' - # @@protoc_insertion_point(class_scope:osm_lcm.SshKeyRequest) + # @@protoc_insertion_point(class_scope:osm_ee.SshKeyRequest) }) _sym_db.RegisterMessage(SshKeyRequest) SshKeyReply = _reflection.GeneratedProtocolMessageType('SshKeyReply', (_message.Message,), { 'DESCRIPTOR' : _SSHKEYREPLY, '__module__' : 'osm_lcm.frontend_pb2' - # @@protoc_insertion_point(class_scope:osm_lcm.SshKeyReply) + # @@protoc_insertion_point(class_scope:osm_ee.SshKeyReply) }) _sym_db.RegisterMessage(SshKeyReply) @@ -217,16 +217,16 @@ DESCRIPTOR._options = None _FRONTENDEXECUTOR = _descriptor.ServiceDescriptor( name='FrontendExecutor', - full_name='osm_lcm.FrontendExecutor', + full_name='osm_ee.FrontendExecutor', file=DESCRIPTOR, index=0, serialized_options=None, - serialized_start=205, - serialized_end=352, + serialized_start=206, + serialized_end=353, methods=[ _descriptor.MethodDescriptor( name='RunPrimitive', - full_name='osm_lcm.FrontendExecutor.RunPrimitive', + full_name='osm_ee.FrontendExecutor.RunPrimitive', index=0, containing_service=None, input_type=_PRIMITIVEREQUEST, @@ -235,7 +235,7 @@ _FRONTENDEXECUTOR = _descriptor.ServiceDescriptor( ), _descriptor.MethodDescriptor( name='GetSshKey', - full_name='osm_lcm.FrontendExecutor.GetSshKey', + full_name='osm_ee.FrontendExecutor.GetSshKey', index=1, containing_service=None, input_type=_SSHKEYREQUEST, -- 2.25.1 From 1a3a4c95298f6f77e2b60a9f66a8cb43a0823d8f Mon Sep 17 00:00:00 2001 From: lloretgalleg Date: Tue, 30 Jun 2020 13:46:28 +0000 Subject: [PATCH 16/16] Added new helm grpc connector Change-Id: I15b4a8fe28c679130017fc4982066f9a2de243d6 Signed-off-by: lloretgalleg --- osm_lcm/lcm_helm_conn.py | 484 +++++++++++++++++++++++++++++++++++++++ stdeb.cfg | 2 +- tox.ini | 2 +- 3 files changed, 486 insertions(+), 2 deletions(-) create mode 100644 osm_lcm/lcm_helm_conn.py diff --git a/osm_lcm/lcm_helm_conn.py b/osm_lcm/lcm_helm_conn.py new file mode 100644 index 0000000..c7ea476 --- /dev/null +++ 
b/osm_lcm/lcm_helm_conn.py @@ -0,0 +1,484 @@ +## +# Copyright 2020 Telefonica Investigacion y Desarrollo, S.A.U. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +## +import functools +import yaml +import asyncio +import socket +import uuid + +from grpclib.client import Channel + +from osm_lcm.frontend_pb2 import PrimitiveRequest +from osm_lcm.frontend_pb2 import SshKeyRequest, SshKeyReply +from osm_lcm.frontend_grpc import FrontendExecutorStub + +from n2vc.n2vc_conn import N2VCConnector +from n2vc.k8s_helm_conn import K8sHelmConnector +from n2vc.exceptions import N2VCBadArgumentsException, N2VCException, N2VCExecutionException + +from osm_lcm.lcm_utils import deep_get + + +def retryer(max_wait_time=60, delay_time=10): + def wrapper(func): + retry_exceptions = ( + ConnectionRefusedError + ) + + @functools.wraps(func) + async def wrapped(*args, **kwargs): + wait_time = max_wait_time + while wait_time > 0: + try: + return await func(*args, **kwargs) + except retry_exceptions: + wait_time = wait_time - delay_time + await asyncio.sleep(delay_time) + continue + else: + return ConnectionRefusedError + return wrapped + return wrapper + + +class LCMHelmConn(N2VCConnector): + _KUBECTL_OSM_NAMESPACE = "osm" + _KUBECTL_OSM_CLUSTER_NAME = "_system-osm-k8s" + _EE_SERVICE_PORT = 50050 + + # Time between retries + _EE_RETRY_DELAY = 10 + # Initial max retry time + _MAX_INITIAL_RETRY_TIME = 300 + # Other retry time + _MAX_RETRY_TIME = 30 + + def __init__(self, + db: object, + fs:
object, + log: object = None, + loop: object = None, + url: str = None, + username: str = None, + vca_config: dict = None, + on_update_db=None, ): + """ + Initialize EE helm connector. + """ + + # parent class constructor + N2VCConnector.__init__( + self, + db=db, + fs=fs, + log=log, + loop=loop, + url=url, + username=username, + vca_config=vca_config, + on_update_db=on_update_db, + ) + + self.log.debug("Initialize helm N2VC connector") + + # TODO - Obtain data from configuration + self._ee_service_port = self._EE_SERVICE_PORT + + self._retry_delay = self._EE_RETRY_DELAY + self._max_retry_time = self._MAX_RETRY_TIME + self._initial_retry_time = self._MAX_INITIAL_RETRY_TIME + + # initialize helm connector + self._k8sclusterhelm = K8sHelmConnector( + kubectl_command=self.vca_config.get("kubectlpath"), + helm_command=self.vca_config.get("helmpath"), + fs=self.fs, + log=self.log, + db=self.db, + on_update_db=None, + ) + + self._system_cluster_id = None + self.log.info("Helm N2VC connector initialized") + + # TODO - ¿reuse_ee_id? + async def create_execution_environment(self, + namespace: str, + db_dict: dict, + reuse_ee_id: str = None, + progress_timeout: float = None, + total_timeout: float = None, + artifact_path: str = None, + vca_type: str = None) -> (str, dict): + """ + Creates a new helm execution environment deploying the helm-chart indicated in the + artifact_path + :param str namespace: This param is not used, all helm charts are deployed in the osm + system namespace + :param dict db_dict: where to write to database when the status changes. + It contains a dictionary with {collection: str, filter: {}, path: str}, + e.g. {collection: "nsrs", filter: {_id: , path: + "_admin.deployed.VCA.3"} + :param str reuse_ee_id: ee id from an older execution.
TODO - right now this params is not used + :param float progress_timeout: + :param float total_timeout: + :param str artifact_path path of package content + :param str vca_type Type of vca, not used as assumed of type helm + :returns str, dict: id of the new execution environment including namespace.helm_id + and credentials object set to None as all credentials should be osm kubernetes .kubeconfig + """ + + self.log.info( + "create_execution_environment: namespace: {}, artifact_path: {}, db_dict: {}, " + "reuse_ee_id: {}".format( + namespace, artifact_path, db_dict, reuse_ee_id) + ) + + # Validate artifact-path is provided + if artifact_path is None or len(artifact_path) == 0: + raise N2VCBadArgumentsException( + message="artifact_path is mandatory", bad_args=["artifact_path"] + ) + + # Validate artifact-path exists + + # remove / in charm path + while artifact_path.find("//") >= 0: + artifact_path = artifact_path.replace("//", "/") + + # check charm path + if self.fs.file_exists(artifact_path): + helm_chart_path = artifact_path + else: + msg = "artifact path does not exist: {}".format(artifact_path) + raise N2VCBadArgumentsException(message=msg, bad_args=["artifact_path"]) + + if artifact_path.startswith("/"): + full_path = self.fs.path + helm_chart_path + else: + full_path = self.fs.path + "/" + helm_chart_path + + try: + # Call helm conn install + # Obtain system cluster id from database + system_cluster_uuid = self._get_system_cluster_id() + + self.log.debug("install helm chart: {}".format(full_path)) + helm_id = await self._k8sclusterhelm.install(system_cluster_uuid, kdu_model=full_path, + namespace=self._KUBECTL_OSM_NAMESPACE, + db_dict=db_dict, + timeout=progress_timeout) + + ee_id = "{}.{}".format(self._KUBECTL_OSM_NAMESPACE, helm_id) + return ee_id, None + except Exception as e: + self.log.error("Error deploying chart ee: {}".format(e), exc_info=True) + raise N2VCException("Error deploying chart ee: {}".format(e)) + + async def 
register_execution_environment(self, namespace: str, credentials: dict, db_dict: dict, + progress_timeout: float = None, total_timeout: float = None) -> str: + # nothing to do + pass + + async def install_configuration_sw(self, + ee_id: str, + artifact_path: str, + db_dict: dict, + progress_timeout: float = None, + total_timeout: float = None, + config: dict = None, + ): + # nothing to do + pass + + async def add_relation(self, ee_id_1: str, ee_id_2: str, endpoint_1: str, endpoint_2: str): + # nothing to do + pass + + async def remove_relation(self): + # nothing to do + pass + + async def get_status(self, namespace: str, yaml_format: bool = True): + # not used for this connector + pass + + async def get_ee_ssh_public__key(self, ee_id: str, db_dict: dict, progress_timeout: float = None, + total_timeout: float = None) -> str: + """ + Obtains ssh-public key from ee executing GetSshKey method from the ee. + + :param str ee_id: the id of the execution environment returned by + create_execution_environment or register_execution_environment + :param dict db_dict: + :param float progress_timeout: + :param float total_timeout: + :returns: public key of the execution environment + """ + + self.log.info( + "get_ee_ssh_public_key: ee_id: {}, db_dict: {}".format( + ee_id, db_dict) + ) + + # check arguments + if ee_id is None or len(ee_id) == 0: + raise N2VCBadArgumentsException( + message="ee_id is mandatory", bad_args=["ee_id"] + ) + + try: + # Obtain ip_addr for the ee service, it is resolved by dns from the ee name by kubernetes + namespace, helm_id = self._get_ee_id_parts(ee_id) + ip_addr = socket.gethostbyname(helm_id) + + # Obtain ssh_key from the ee, this method will implement retries to allow the ee + # install libraries and start successfully + ssh_key = await self._get_ssh_key(ip_addr) + return ssh_key + except Exception as e: + self.log.error("Error obtaining ee ssh_key: {}".format(e), exc_info=True) + raise N2VCException("Error obtaining ee ssh_ke: {}".format(e)) + 
+ async def exec_primitive(self, ee_id: str, primitive_name: str, params_dict: dict, db_dict: dict = None, + progress_timeout: float = None, total_timeout: float = None) -> str: + """ + Execute a primitive in the execution environment + + :param str ee_id: the one returned by create_execution_environment or + register_execution_environment with the format namespace.helm_id + :param str primitive_name: must be one defined in the software. There is one + called 'config', where, for the proxy case, the 'credentials' of VM are + provided + :param dict params_dict: parameters of the action + :param dict db_dict: where to write into database when the status changes. + It contains a dict with + {collection: , filter: {}, path: }, + e.g. {collection: "nslcmops", filter: + {_id: , path: "_admin.VCA"} + It will be used to store information about intermediate notifications + :param float progress_timeout: + :param float total_timeout: + :returns str: primitive result, if ok. It raises exceptions in case of fail + """ + + self.log.info("exec primitive for ee_id : {}, primitive_name: {}, params_dict: {}, db_dict: {}".format( + ee_id, primitive_name, params_dict, db_dict + )) + + # check arguments + if ee_id is None or len(ee_id) == 0: + raise N2VCBadArgumentsException( + message="ee_id is mandatory", bad_args=["ee_id"] + ) + if primitive_name is None or len(primitive_name) == 0: + raise N2VCBadArgumentsException( + message="action_name is mandatory", bad_args=["action_name"] + ) + if params_dict is None: + params_dict = dict() + + try: + namespace, helm_id = self._get_ee_id_parts(ee_id) + ip_addr = socket.gethostbyname(helm_id) + except Exception as e: + self.log.error("Error getting ee ip ee: {}".format(e)) + raise N2VCException("Error getting ee ip ee: {}".format(e)) + + if primitive_name == "config": + try: + # Execute config primitive, higher timeout to check the case ee is starting + status, detailed_message = await self._execute_config_primitive(ip_addr, params_dict, 
db_dict=db_dict) + self.log.debug("Executed config primitive ee_id_ {}, status: {}, message: {}".format( + ee_id, status, detailed_message)) + if status != "OK": + self.log.error("Error configuring helm ee, status: {}, message: {}".format( + status, detailed_message)) + raise N2VCExecutionException( + message="Error configuring helm ee_id: {}, status: {}, message: {}: ".format( + ee_id, status, detailed_message + ), + primitive_name=primitive_name, + ) + except Exception as e: + self.log.error("Error configuring helm ee: {}".format(e)) + raise N2VCExecutionException( + message="Error configuring helm ee_id: {}, {}".format( + ee_id, e + ), + primitive_name=primitive_name, + ) + return "CONFIG OK" + else: + try: + # Execute primitive + status, detailed_message = await self._execute_primitive(ip_addr, primitive_name, + params_dict, db_dict=db_dict) + self.log.debug("Executed primitive {} ee_id_ {}, status: {}, message: {}".format( + primitive_name, ee_id, status, detailed_message)) + if status != "OK" and status != "PROCESSING": + self.log.error( + "Execute primitive {} returned not ok status: {}, message: {}".format( + primitive_name, status, detailed_message) + ) + raise N2VCExecutionException( + message="Execute primitive {} returned not ok status: {}, message: {}".format( + primitive_name, status, detailed_message + ), + primitive_name=primitive_name, + ) + except Exception as e: + self.log.error( + "Error executing primitive {}: {}".format(primitive_name, e) + ) + raise N2VCExecutionException( + message="Error executing primitive {} into ee={} : {}".format( + primitive_name, ee_id, e + ), + primitive_name=primitive_name, + ) + return detailed_message + + async def deregister_execution_environments(self): + # nothing to be done + pass + + async def delete_execution_environment(self, ee_id: str, db_dict: dict = None, total_timeout: float = None): + """ + Delete an execution environment + :param str ee_id: id of the execution environment to delete, included 
namespace.helm_id + :param dict db_dict: where to write into database when the status changes. + It contains a dict with + {collection: , filter: {}, path: }, + e.g. {collection: "nsrs", filter: + {_id: , path: "_admin.deployed.VCA.3"} + :param float total_timeout: + """ + + self.log.info("ee_id: {}".format(ee_id)) + + # check arguments + if ee_id is None: + raise N2VCBadArgumentsException( + message="ee_id is mandatory", bad_args=["ee_id"] + ) + + try: + + # Obtain cluster_uuid + system_cluster_uuid = self._get_system_cluster_id() + + # Get helm_id + namespace, helm_id = self._get_ee_id_parts(ee_id) + + # Uninstall chart + await self._k8sclusterhelm.uninstall(system_cluster_uuid, helm_id) + self.log.info("ee_id: {} deleted".format(ee_id)) + except Exception as e: + self.log.error("Error deleting ee id: {}: {}".format(ee_id, e), exc_info=True) + raise N2VCException("Error deleting ee id {}: {}".format(ee_id, e)) + + async def delete_namespace(self, namespace: str, db_dict: dict = None, total_timeout: float = None): + # method not implemented for this connector, execution environments must be deleted individually + pass + + async def install_k8s_proxy_charm( + self, + charm_name: str, + namespace: str, + artifact_path: str, + db_dict: dict, + progress_timeout: float = None, + total_timeout: float = None, + config: dict = None, + ) -> str: + pass + + @retryer(max_wait_time=_MAX_INITIAL_RETRY_TIME, delay_time=_EE_RETRY_DELAY) + async def _get_ssh_key(self, ip_addr): + channel = Channel(ip_addr, self._ee_service_port) + try: + stub = FrontendExecutorStub(channel) + self.log.debug("get ssh key, ip_addr: {}".format(ip_addr)) + reply: SshKeyReply = await stub.GetSshKey(SshKeyRequest()) + return reply.message + finally: + channel.close() + + @retryer(max_wait_time=_MAX_INITIAL_RETRY_TIME, delay_time=_EE_RETRY_DELAY) + async def _execute_config_primitive(self, ip_addr, params, db_dict=None): + return await self._execute_primitive_internal(ip_addr, "config", params, 
db_dict=db_dict) + + @retryer(max_wait_time=_MAX_RETRY_TIME, delay_time=_EE_RETRY_DELAY) + async def _execute_primitive(self, ip_addr, primitive_name, params, db_dict=None): + return await self._execute_primitive_internal(ip_addr, primitive_name, params, db_dict=db_dict) + + async def _execute_primitive_internal(self, ip_addr, primitive_name, params, db_dict=None): + + channel = Channel(ip_addr, self._ee_service_port) + try: + stub = FrontendExecutorStub(channel) + async with stub.RunPrimitive.open() as stream: + primitive_id = str(uuid.uuid1()) + result = None + self.log.debug("Execute primitive internal: id:{}, name:{}, params: {}". + format(primitive_id, primitive_name, params)) + await stream.send_message( + PrimitiveRequest(id=primitive_id, name=primitive_name, params=yaml.dump(params)), end=True) + async for reply in stream: + self.log.debug("Received reply: {}".format(reply)) + result = reply + # If db_dict provided write notifs in database + if db_dict: + self._write_op_detailed_status(db_dict, reply.status, reply.detailed_message) + if result: + return reply.status, reply.detailed_message + else: + return "ERROR", "No result received" + finally: + channel.close() + + def _write_op_detailed_status(self, db_dict, status, detailed_message): + + # write ee_id to database: _admin.deployed.VCA.x + try: + the_table = db_dict["collection"] + the_filter = db_dict["filter"] + update_dict = {"detailed-status": "{}: {}".format(status, detailed_message)} + # self.log.debug('Writing ee_id to database: {}'.format(the_path)) + self.db.set_one( + table=the_table, + q_filter=the_filter, + update_dict=update_dict, + fail_on_empty=True, + ) + except asyncio.CancelledError: + raise + except Exception as e: + self.log.error("Error writing detailedStatus to database: {}".format(e)) + + def _get_system_cluster_id(self): + if not self._system_cluster_id: + db_k8cluster = self.db.get_one("k8sclusters", {"name": self._KUBECTL_OSM_CLUSTER_NAME}) + k8s_hc_id = deep_get(db_k8cluster, 
("_admin", "helm-chart", "id")) + self._system_cluster_id = k8s_hc_id + return self._system_cluster_id + + def _get_ee_id_parts(self, ee_id): + namespace, _, helm_id = ee_id.partition('.') + return namespace, helm_id diff --git a/stdeb.cfg b/stdeb.cfg index 3e222e5..2794a1f 100644 --- a/stdeb.cfg +++ b/stdeb.cfg @@ -16,5 +16,5 @@ # ## [DEFAULT] -X-Python3-Version : >= 3.5 +X-Python3-Version : >= 3.6 Depends3: python3-osm-common, python3-n2vc, python3-yaml, python3-aiohttp, python3-jinja2, python3-pip diff --git a/tox.ini b/tox.ini index 24b770f..65a62e5 100644 --- a/tox.ini +++ b/tox.ini @@ -24,7 +24,7 @@ deps = -r{toxinidir}/test-requirements.txt commands=python3 -m unittest discover -v [testenv:flake8] -basepython = python3 +basepython = python3.6 deps = flake8 commands = flake8 osm_lcm --max-line-length 120 \ --exclude .svn,CVS,.gz,.git,__pycache__,.tox,local,temp,frontend_grpc.py,frontend_pb2.py \ -- 2.25.1