X-Git-Url: https://osm.etsi.org/gitweb/?a=blobdiff_plain;f=osm_lcm%2Flcm_helm_conn.py;h=27c330f2a385e2c2e8e3f3a2a6ce8065a39f31e2;hb=a5ae90b046fc9760c542832e4543d4c4790f869a;hp=c7ea476a8f9679dec0397249f7fdbd7e6120da73;hpb=1a3a4c95298f6f77e2b60a9f66a8cb43a0823d8f;p=osm%2FLCM.git diff --git a/osm_lcm/lcm_helm_conn.py b/osm_lcm/lcm_helm_conn.py index c7ea476..27c330f 100644 --- a/osm_lcm/lcm_helm_conn.py +++ b/osm_lcm/lcm_helm_conn.py @@ -20,21 +20,27 @@ import yaml import asyncio import socket import uuid +import os from grpclib.client import Channel from osm_lcm.frontend_pb2 import PrimitiveRequest from osm_lcm.frontend_pb2 import SshKeyRequest, SshKeyReply from osm_lcm.frontend_grpc import FrontendExecutorStub +from osm_lcm.lcm_utils import LcmBase + +from osm_lcm.data_utils.database.database import Database +from osm_lcm.data_utils.filesystem.filesystem import Filesystem from n2vc.n2vc_conn import N2VCConnector from n2vc.k8s_helm_conn import K8sHelmConnector +from n2vc.k8s_helm3_conn import K8sHelm3Connector from n2vc.exceptions import N2VCBadArgumentsException, N2VCException, N2VCExecutionException from osm_lcm.lcm_utils import deep_get -def retryer(max_wait_time=60, delay_time=10): +def retryer(max_wait_time_var="_initial_retry_time", delay_time_var="_retry_delay"): def wrapper(func): retry_exceptions = ( ConnectionRefusedError @@ -42,6 +48,17 @@ def retryer(max_wait_time=60, delay_time=10): @functools.wraps(func) async def wrapped(*args, **kwargs): + # default values for wait time and delay_time + delay_time = 10 + max_wait_time = 300 + + # obtain arguments from variable names + self = args[0] + if self.__dict__.get(max_wait_time_var): + max_wait_time = self.__dict__.get(max_wait_time_var) + if self.__dict__.get(delay_time_var): + delay_time = self.__dict__.get(delay_time_var) + wait_time = max_wait_time while wait_time > 0: try: @@ -56,21 +73,19 @@ def retryer(max_wait_time=60, delay_time=10): return wrapper -class LCMHelmConn(N2VCConnector): +class LCMHelmConn(N2VCConnector, LcmBase): _KUBECTL_OSM_NAMESPACE = "osm" _KUBECTL_OSM_CLUSTER_NAME = "_system-osm-k8s" _EE_SERVICE_PORT = 50050 - # Time beetween retries - _EE_RETRY_DELAY = 10 # Initial max retry time - _MAX_INITIAL_RETRY_TIME = 300 - # Other retry time + _MAX_INITIAL_RETRY_TIME = 600 + # Max retry time for normal operations _MAX_RETRY_TIME = 30 + # Time beetween retries, retry time after a connection error is raised + _EE_RETRY_DELAY = 10 def __init__(self, - db: object, - fs: object, log: object = None, loop: object = None, url: str = None, @@ -81,33 +96,58 @@ class LCMHelmConn(N2VCConnector): Initialize EE helm connector. """ + self.db = Database().instance.db + self.fs = Filesystem().instance.fs + # parent class constructor N2VCConnector.__init__( self, - db=db, - fs=fs, log=log, loop=loop, url=url, username=username, vca_config=vca_config, on_update_db=on_update_db, + db=self.db, + fs=self.fs ) self.log.debug("Initialize helm N2VC connector") + self.log.debug("initial vca_config: {}".format(vca_config)) # TODO - Obtain data from configuration self._ee_service_port = self._EE_SERVICE_PORT self._retry_delay = self._EE_RETRY_DELAY - self._max_retry_time = self._MAX_RETRY_TIME - self._initial_retry_time = self._MAX_INITIAL_RETRY_TIME - # initialize helm connector - self._k8sclusterhelm = K8sHelmConnector( + if self.vca_config and self.vca_config.get("eegrpcinittimeout"): + self._initial_retry_time = self.vca_config.get("eegrpcinittimeout") + self.log.debug("Initial retry time: {}".format(self._initial_retry_time)) + else: + self._initial_retry_time = self._MAX_INITIAL_RETRY_TIME + self.log.debug("Applied default retry time: {}".format(self._initial_retry_time)) + + if self.vca_config and self.vca_config.get("eegrpctimeout"): + self._max_retry_time = self.vca_config.get("eegrpctimeout") + self.log.debug("Retry time: {}".format(self._max_retry_time)) + else: + self._max_retry_time = self._MAX_RETRY_TIME + self.log.debug("Applied default retry time: {}".format(self._max_retry_time)) + + # initialize helm connector for helmv2 and helmv3 + self._k8sclusterhelm2 = K8sHelmConnector( kubectl_command=self.vca_config.get("kubectlpath"), helm_command=self.vca_config.get("helmpath"), fs=self.fs, + db=self.db, + log=self.log, + on_update_db=None, + ) + + self._k8sclusterhelm3 = K8sHelm3Connector( + kubectl_command=self.vca_config.get("kubectlpath"), + helm_command=self.vca_config.get("helm3path"), + fs=self.fs, log=self.log, db=self.db, on_update_db=None, @@ -123,8 +163,10 @@ class LCMHelmConn(N2VCConnector): reuse_ee_id: str = None, progress_timeout: float = None, total_timeout: float = None, + config: dict = None, artifact_path: str = None, - vca_type: str = None) -> (str, dict): + vca_type: str = None, + *kargs, **kwargs) -> (str, dict): """ Creates a new helm execution environment deploying the helm-chat indicated in the attifact_path @@ -137,8 +179,9 @@ class LCMHelmConn(N2VCConnector): :param str reuse_ee_id: ee id from an older execution. TODO - right now this params is not used :param float progress_timeout: :param float total_timeout: - :param str artifact_path path of package content - :param str vca_type Type of vca, not used as assumed of type helm + :param dict config: General variables to instantiate KDU + :param str artifact_path: path of package content + :param str vca_type: Type of vca, must be type helm or helm-v3 :returns str, dict: id of the new execution environment including namespace.helm_id and credentials object set to None as all credentials should be osm kubernetes .kubeconfig """ @@ -155,7 +198,9 @@ class LCMHelmConn(N2VCConnector): message="artifact_path is mandatory", bad_args=["artifact_path"] ) - # Validate artifact-path exists + # Validate artifact-path exists and sync path + from_path = os.path.split(artifact_path)[0] + self.fs.sync(from_path) # remove / in charm path while artifact_path.find("//") >= 0: @@ -173,25 +218,44 @@ class LCMHelmConn(N2VCConnector): else: full_path = self.fs.path + "/" + helm_chart_path + while full_path.find("//") >= 0: + full_path = full_path.replace("//", "/") + try: # Call helm conn install # Obtain system cluster id from database - system_cluster_uuid = self._get_system_cluster_id() + system_cluster_uuid = await self._get_system_cluster_id() + # Add parameter osm if exist to global + if config and config.get("osm"): + if not config.get("global"): + config["global"] = {} + config["global"]["osm"] = config.get("osm") self.log.debug("install helm chart: {}".format(full_path)) - helm_id = await self._k8sclusterhelm.install(system_cluster_uuid, kdu_model=full_path, - namespace=self._KUBECTL_OSM_NAMESPACE, - db_dict=db_dict, - timeout=progress_timeout) + if vca_type == "helm": + helm_id = await self._k8sclusterhelm2.install(system_cluster_uuid, kdu_model=full_path, + namespace=self._KUBECTL_OSM_NAMESPACE, + params=config, + db_dict=db_dict, + timeout=progress_timeout) + else: + helm_id = await self._k8sclusterhelm3.install(system_cluster_uuid, kdu_model=full_path, + namespace=self._KUBECTL_OSM_NAMESPACE, + params=config, + db_dict=db_dict, + timeout=progress_timeout) - ee_id = "{}.{}".format(self._KUBECTL_OSM_NAMESPACE, helm_id) + ee_id = "{}:{}.{}".format(vca_type, self._KUBECTL_OSM_NAMESPACE, helm_id) return ee_id, None + except N2VCException: + raise except Exception as e: self.log.error("Error deploying chart ee: {}".format(e), exc_info=True) raise N2VCException("Error deploying chart ee: {}".format(e)) async def register_execution_environment(self, namespace: str, credentials: dict, db_dict: dict, - progress_timeout: float = None, total_timeout: float = None) -> str: + progress_timeout: float = None, total_timeout: float = None, + *kargs, **kwargs) -> str: # nothing to do pass @@ -202,6 +266,8 @@ class LCMHelmConn(N2VCConnector): progress_timeout: float = None, total_timeout: float = None, config: dict = None, + num_units: int = 1, + vca_type: str = None ): # nothing to do pass @@ -244,7 +310,7 @@ class LCMHelmConn(N2VCConnector): try: # Obtain ip_addr for the ee service, it is resolved by dns from the ee name by kubernetes - namespace, helm_id = self._get_ee_id_parts(ee_id) + version, namespace, helm_id = self._get_ee_id_parts(ee_id) ip_addr = socket.gethostbyname(helm_id) # Obtain ssh_key from the ee, this method will implement retries to allow the ee @@ -294,7 +360,7 @@ class LCMHelmConn(N2VCConnector): params_dict = dict() try: - namespace, helm_id = self._get_ee_id_parts(ee_id) + version, namespace, helm_id = self._get_ee_id_parts(ee_id) ip_addr = socket.gethostbyname(helm_id) except Exception as e: self.log.error("Error getting ee ip ee: {}".format(e)) @@ -381,14 +447,20 @@ class LCMHelmConn(N2VCConnector): try: # Obtain cluster_uuid - system_cluster_uuid = self._get_system_cluster_id() + system_cluster_uuid = await self._get_system_cluster_id() # Get helm_id - namespace, helm_id = self._get_ee_id_parts(ee_id) + version, namespace, helm_id = self._get_ee_id_parts(ee_id) - # Uninstall chart - await self._k8sclusterhelm.uninstall(system_cluster_uuid, helm_id) + # Uninstall chart, for backward compatibility we must assume that if there is no + # version it is helm-v2 + if version == "helm-v3": + await self._k8sclusterhelm3.uninstall(system_cluster_uuid, helm_id) + else: + await self._k8sclusterhelm2.uninstall(system_cluster_uuid, helm_id) self.log.info("ee_id: {} deleted".format(ee_id)) + except N2VCException: + raise except Exception as e: self.log.error("Error deleting ee id: {}: {}".format(ee_id, e), exc_info=True) raise N2VCException("Error deleting ee id {}: {}".format(ee_id, e)) @@ -406,10 +478,11 @@ class LCMHelmConn(N2VCConnector): progress_timeout: float = None, total_timeout: float = None, config: dict = None, + *kargs, **kwargs ) -> str: pass - @retryer(max_wait_time=_MAX_INITIAL_RETRY_TIME, delay_time=_EE_RETRY_DELAY) + @retryer(max_wait_time_var="_initial_retry_time", delay_time_var="_retry_delay") async def _get_ssh_key(self, ip_addr): channel = Channel(ip_addr, self._ee_service_port) try: @@ -420,13 +493,13 @@ class LCMHelmConn(N2VCConnector): finally: channel.close() - @retryer(max_wait_time=_MAX_INITIAL_RETRY_TIME, delay_time=_EE_RETRY_DELAY) + @retryer(max_wait_time_var="_initial_retry_time", delay_time_var="_retry_delay") async def _execute_config_primitive(self, ip_addr, params, db_dict=None): return await self._execute_primitive_internal(ip_addr, "config", params, db_dict=db_dict) - @retryer(max_wait_time=_MAX_RETRY_TIME, delay_time=_EE_RETRY_DELAY) + @retryer(max_wait_time_var="_max_retry_time", delay_time_var="_retry_delay") async def _execute_primitive(self, ip_addr, primitive_name, params, db_dict=None): - return await self._execute_primitive_internal(ip_addr, primitive_name, params, db_dict=db_dict) + return await self._execute_primitive_internal(ip_addr, primitive_name, params, db_dict=db_dict) async def _execute_primitive_internal(self, ip_addr, primitive_name, params, db_dict=None): @@ -472,13 +545,35 @@ class LCMHelmConn(N2VCConnector): except Exception as e: self.log.error("Error writing detailedStatus to database: {}".format(e)) - def _get_system_cluster_id(self): + async def _get_system_cluster_id(self): if not self._system_cluster_id: db_k8cluster = self.db.get_one("k8sclusters", {"name": self._KUBECTL_OSM_CLUSTER_NAME}) - k8s_hc_id = deep_get(db_k8cluster, ("_admin", "helm-chart", "id")) + k8s_hc_id = deep_get(db_k8cluster, ("_admin", "helm-chart-v3", "id")) + if not k8s_hc_id: + try: + # backward compatibility for existing clusters that have not been initialized for helm v3 + cluster_id = db_k8cluster.get("_id") + k8s_credentials = yaml.safe_dump(db_k8cluster.get("credentials")) + k8s_hc_id, uninstall_sw = await self._k8sclusterhelm3.init_env(k8s_credentials, + reuse_cluster_uuid=cluster_id) + db_k8scluster_update = {"_admin.helm-chart-v3.error_msg": None, + "_admin.helm-chart-v3.id": k8s_hc_id, + "_admin.helm-chart-v3}.created": uninstall_sw, + "_admin.helm-chart-v3.operationalState": "ENABLED"} + self.update_db_2("k8sclusters", cluster_id, db_k8scluster_update) + except Exception as e: + self.log.error("error initializing helm-v3 cluster: {}".format(str(e))) + raise N2VCException("K8s system cluster '{}' has not been initialized for helm-chart-v3".format( + cluster_id)) self._system_cluster_id = k8s_hc_id return self._system_cluster_id def _get_ee_id_parts(self, ee_id): - namespace, _, helm_id = ee_id.partition('.') - return namespace, helm_id + """ + Parses ee_id stored at database that can be either 'version:namespace.helm_id' or only + namespace.helm_id for backward compatibility + If exists helm version can be helm-v3 or helm (helm-v2 old version) + """ + version, _, part_id = ee_id.rpartition(':') + namespace, _, helm_id = part_id.rpartition('.') + return version, namespace, helm_id