X-Git-Url: https://osm.etsi.org/gitweb/?a=blobdiff_plain;f=osm_lcm%2Flcm_helm_conn.py;h=904b7d9a1aa0132513fada30379c693f3f3381e6;hb=aae391fc83f3b20e058769ff3356ca4d9965a3b8;hp=146da9d6db28a914f4f76bf76f3be8b2b13d96d5;hpb=588547c4feb0d220d8ab648304da60aac9a98dec;p=osm%2FLCM.git diff --git a/osm_lcm/lcm_helm_conn.py b/osm_lcm/lcm_helm_conn.py index 146da9d..904b7d9 100644 --- a/osm_lcm/lcm_helm_conn.py +++ b/osm_lcm/lcm_helm_conn.py @@ -20,15 +20,18 @@ import yaml import asyncio import socket import uuid +import os from grpclib.client import Channel from osm_lcm.frontend_pb2 import PrimitiveRequest from osm_lcm.frontend_pb2 import SshKeyRequest, SshKeyReply from osm_lcm.frontend_grpc import FrontendExecutorStub +from osm_lcm.lcm_utils import LcmBase from n2vc.n2vc_conn import N2VCConnector from n2vc.k8s_helm_conn import K8sHelmConnector +from n2vc.k8s_helm3_conn import K8sHelm3Connector from n2vc.exceptions import N2VCBadArgumentsException, N2VCException, N2VCExecutionException from osm_lcm.lcm_utils import deep_get @@ -56,7 +59,7 @@ def retryer(max_wait_time=60, delay_time=10): return wrapper -class LCMHelmConn(N2VCConnector): +class LCMHelmConn(N2VCConnector, LcmBase): _KUBECTL_OSM_NAMESPACE = "osm" _KUBECTL_OSM_CLUSTER_NAME = "_system-osm-k8s" _EE_SERVICE_PORT = 50050 @@ -103,8 +106,8 @@ class LCMHelmConn(N2VCConnector): self._max_retry_time = self._MAX_RETRY_TIME self._initial_retry_time = self._MAX_INITIAL_RETRY_TIME - # initialize helm connector - self._k8sclusterhelm = K8sHelmConnector( + # initialize helm connector for helmv2 and helmv3 + self._k8sclusterhelm2 = K8sHelmConnector( kubectl_command=self.vca_config.get("kubectlpath"), helm_command=self.vca_config.get("helmpath"), fs=self.fs, @@ -113,6 +116,15 @@ class LCMHelmConn(N2VCConnector): on_update_db=None, ) + self._k8sclusterhelm3 = K8sHelm3Connector( + kubectl_command=self.vca_config.get("kubectlpath"), + helm_command=self.vca_config.get("helm3path"), + fs=self.fs, + log=self.log, + db=self.db, + on_update_db=None, + ) + self._system_cluster_id = None self.log.info("Helm N2VC connector initialized") @@ -123,8 +135,10 @@ class LCMHelmConn(N2VCConnector): reuse_ee_id: str = None, progress_timeout: float = None, total_timeout: float = None, + config: dict = None, artifact_path: str = None, - vca_type: str = None) -> (str, dict): + vca_type: str = None, + *kargs, **kwargs) -> (str, dict): """ Creates a new helm execution environment deploying the helm-chat indicated in the attifact_path @@ -137,8 +151,9 @@ class LCMHelmConn(N2VCConnector): :param str reuse_ee_id: ee id from an older execution. TODO - right now this params is not used :param float progress_timeout: :param float total_timeout: - :param str artifact_path path of package content - :param str vca_type Type of vca, not used as assumed of type helm + :param dict config: General variables to instantiate KDU + :param str artifact_path: path of package content + :param str vca_type: Type of vca, must be type helm or helm-v3 :returns str, dict: id of the new execution environment including namespace.helm_id and credentials object set to None as all credentials should be osm kubernetes .kubeconfig """ @@ -155,7 +170,9 @@ class LCMHelmConn(N2VCConnector): message="artifact_path is mandatory", bad_args=["artifact_path"] ) - # Validate artifact-path exists + # Validate artifact-path exists and sync path + from_path = os.path.split(artifact_path)[0] + self.fs.sync(from_path) # remove / in charm path while artifact_path.find("//") >= 0: @@ -176,22 +193,38 @@ class LCMHelmConn(N2VCConnector): try: # Call helm conn install # Obtain system cluster id from database - system_cluster_uuid = self._get_system_cluster_id() + system_cluster_uuid = await self._get_system_cluster_id() + # Add parameter osm if exist to global + if config and config.get("osm"): + if not config.get("global"): + config["global"] = {} + config["global"]["osm"] = config.get("osm") self.log.debug("install helm chart: {}".format(full_path)) - helm_id = await self._k8sclusterhelm.install(system_cluster_uuid, kdu_model=full_path, - namespace=self._KUBECTL_OSM_NAMESPACE, - db_dict=db_dict, - timeout=progress_timeout) + if vca_type == "helm": + helm_id = await self._k8sclusterhelm2.install(system_cluster_uuid, kdu_model=full_path, + namespace=self._KUBECTL_OSM_NAMESPACE, + params=config, + db_dict=db_dict, + timeout=progress_timeout) + else: + helm_id = await self._k8sclusterhelm3.install(system_cluster_uuid, kdu_model=full_path, + namespace=self._KUBECTL_OSM_NAMESPACE, + params=config, + db_dict=db_dict, + timeout=progress_timeout) - ee_id = "{}.{}".format(self._KUBECTL_OSM_NAMESPACE, helm_id) + ee_id = "{}:{}.{}".format(vca_type, self._KUBECTL_OSM_NAMESPACE, helm_id) return ee_id, None + except N2VCException: + raise except Exception as e: self.log.error("Error deploying chart ee: {}".format(e), exc_info=True) raise N2VCException("Error deploying chart ee: {}".format(e)) async def register_execution_environment(self, namespace: str, credentials: dict, db_dict: dict, - progress_timeout: float = None, total_timeout: float = None) -> str: + progress_timeout: float = None, total_timeout: float = None, + *kargs, **kwargs) -> str: # nothing to do pass @@ -246,7 +279,7 @@ class LCMHelmConn(N2VCConnector): try: # Obtain ip_addr for the ee service, it is resolved by dns from the ee name by kubernetes - namespace, helm_id = self._get_ee_id_parts(ee_id) + version, namespace, helm_id = self._get_ee_id_parts(ee_id) ip_addr = socket.gethostbyname(helm_id) # Obtain ssh_key from the ee, this method will implement retries to allow the ee @@ -296,7 +329,7 @@ class LCMHelmConn(N2VCConnector): params_dict = dict() try: - namespace, helm_id = self._get_ee_id_parts(ee_id) + version, namespace, helm_id = self._get_ee_id_parts(ee_id) ip_addr = socket.gethostbyname(helm_id) except Exception as e: self.log.error("Error getting ee ip ee: {}".format(e)) @@ -383,14 +416,20 @@ class LCMHelmConn(N2VCConnector): try: # Obtain cluster_uuid - system_cluster_uuid = self._get_system_cluster_id() + system_cluster_uuid = await self._get_system_cluster_id() # Get helm_id - namespace, helm_id = self._get_ee_id_parts(ee_id) + version, namespace, helm_id = self._get_ee_id_parts(ee_id) - # Uninstall chart - await self._k8sclusterhelm.uninstall(system_cluster_uuid, helm_id) + # Uninstall chart, for backward compatibility we must assume that if there is no + # version it is helm-v2 + if version == "helm-v3": + await self._k8sclusterhelm3.uninstall(system_cluster_uuid, helm_id) + else: + await self._k8sclusterhelm2.uninstall(system_cluster_uuid, helm_id) self.log.info("ee_id: {} deleted".format(ee_id)) + except N2VCException: + raise except Exception as e: self.log.error("Error deleting ee id: {}: {}".format(ee_id, e), exc_info=True) raise N2VCException("Error deleting ee id {}: {}".format(ee_id, e)) @@ -408,6 +447,7 @@ class LCMHelmConn(N2VCConnector): progress_timeout: float = None, total_timeout: float = None, config: dict = None, + *kargs, **kwargs ) -> str: pass @@ -474,13 +514,35 @@ class LCMHelmConn(N2VCConnector): except Exception as e: self.log.error("Error writing detailedStatus to database: {}".format(e)) - def _get_system_cluster_id(self): + async def _get_system_cluster_id(self): if not self._system_cluster_id: db_k8cluster = self.db.get_one("k8sclusters", {"name": self._KUBECTL_OSM_CLUSTER_NAME}) - k8s_hc_id = deep_get(db_k8cluster, ("_admin", "helm-chart", "id")) + k8s_hc_id = deep_get(db_k8cluster, ("_admin", "helm-chart-v3", "id")) + if not k8s_hc_id: + try: + # backward compatibility for existing clusters that have not been initialized for helm v3 + cluster_id = db_k8cluster.get("_id") + k8s_credentials = yaml.safe_dump(db_k8cluster.get("credentials")) + k8s_hc_id, uninstall_sw = await self._k8sclusterhelm3.init_env(k8s_credentials, + reuse_cluster_uuid=cluster_id) + db_k8scluster_update = {"_admin.helm-chart-v3.error_msg": None, + "_admin.helm-chart-v3.id": k8s_hc_id, + "_admin.helm-chart-v3}.created": uninstall_sw, + "_admin.helm-chart-v3.operationalState": "ENABLED"} + self.update_db_2("k8sclusters", cluster_id, db_k8scluster_update) + except Exception as e: + self.log.error("error initializing helm-v3 cluster: {}".format(str(e))) + raise N2VCException("K8s system cluster '{}' has not been initialized for helm-chart-v3".format( + cluster_id)) self._system_cluster_id = k8s_hc_id return self._system_cluster_id def _get_ee_id_parts(self, ee_id): - namespace, _, helm_id = ee_id.partition('.') - return namespace, helm_id + """ + Parses ee_id stored at database that can be either 'version:namespace.helm_id' or only + namespace.helm_id for backward compatibility + If exists helm version can be helm-v3 or helm (helm-v2 old version) + """ + version, _, part_id = ee_id.rpartition(':') + namespace, _, helm_id = part_id.rpartition('.') + return version, namespace, helm_id