X-Git-Url: https://osm.etsi.org/gitweb/?a=blobdiff_plain;f=osm_lcm%2Flcm_helm_conn.py;h=40624ade166b867440da791cf82e9d3c5cdb1720;hb=a0c6bafeb1998041e171aa87b2134f69d650a9c0;hp=555613e1030370049ed72a23333c50717e3a7f13;hpb=843adbc77237767f62b198e9cbb5ffb4c7caf17d;p=osm%2FLCM.git

diff --git a/osm_lcm/lcm_helm_conn.py b/osm_lcm/lcm_helm_conn.py
index 555613e..40624ad 100644
--- a/osm_lcm/lcm_helm_conn.py
+++ b/osm_lcm/lcm_helm_conn.py
@@ -20,28 +20,47 @@ import yaml
 import asyncio
 import socket
 import uuid
+import os
 from grpclib.client import Channel
 
 from osm_lcm.frontend_pb2 import PrimitiveRequest
 from osm_lcm.frontend_pb2 import SshKeyRequest, SshKeyReply
 from osm_lcm.frontend_grpc import FrontendExecutorStub
+from osm_lcm.lcm_utils import LcmBase
+
+from osm_lcm.data_utils.database.database import Database
+from osm_lcm.data_utils.filesystem.filesystem import Filesystem
 
 from n2vc.n2vc_conn import N2VCConnector
 from n2vc.k8s_helm_conn import K8sHelmConnector
-from n2vc.exceptions import N2VCBadArgumentsException, N2VCException, N2VCExecutionException
+from n2vc.k8s_helm3_conn import K8sHelm3Connector
+from n2vc.exceptions import (
+    N2VCBadArgumentsException,
+    N2VCException,
+    N2VCExecutionException,
+)
 
 from osm_lcm.lcm_utils import deep_get
 
 
-def retryer(max_wait_time=60, delay_time=10):
+def retryer(max_wait_time_var="_initial_retry_time", delay_time_var="_retry_delay"):
     def wrapper(func):
-        retry_exceptions = (
-            ConnectionRefusedError
-        )
+        retry_exceptions = ConnectionRefusedError
 
         @functools.wraps(func)
         async def wrapped(*args, **kwargs):
+            # default values for wait time and delay time
+            delay_time = 10
+            max_wait_time = 300
+
+            # obtain values from the instance attributes with the given names
+            self = args[0]
+            if self.__dict__.get(max_wait_time_var):
+                max_wait_time = self.__dict__.get(max_wait_time_var)
+            if self.__dict__.get(delay_time_var):
+                delay_time = self.__dict__.get(delay_time_var)
+
             wait_time = max_wait_time
             while wait_time > 0:
                 try:
@@ -52,62 +71,84 @@ def retryer(max_wait_time=60, delay_time=10):
                     continue
                 else:
                     return ConnectionRefusedError
+
        return wrapped
+
    return wrapper
 
 
-class LCMHelmConn(N2VCConnector):
+class LCMHelmConn(N2VCConnector, LcmBase):
     _KUBECTL_OSM_NAMESPACE = "osm"
     _KUBECTL_OSM_CLUSTER_NAME = "_system-osm-k8s"
     _EE_SERVICE_PORT = 50050
 
-    # Time beetween retries
-    _EE_RETRY_DELAY = 10
     # Initial max retry time
-    _MAX_INITIAL_RETRY_TIME = 300
-    # Other retry time
+    _MAX_INITIAL_RETRY_TIME = 600
+    # Max retry time for normal operations
     _MAX_RETRY_TIME = 30
+    # Time between retries: delay applied after a connection error is raised
+    _EE_RETRY_DELAY = 10
 
-    def __init__(self,
-                 db: object,
-                 fs: object,
-                 log: object = None,
-                 loop: object = None,
-                 url: str = None,
-                 username: str = None,
-                 vca_config: dict = None,
-                 on_update_db=None, ):
+    def __init__(
+        self,
+        log: object = None,
+        loop: object = None,
+        vca_config: dict = None,
+        on_update_db=None,
+    ):
         """
         Initialize EE helm connector.
 
""" + self.db = Database().instance.db + self.fs = Filesystem().instance.fs + # parent class constructor N2VCConnector.__init__( - self, - db=db, - fs=fs, - log=log, - loop=loop, - url=url, - username=username, - vca_config=vca_config, - on_update_db=on_update_db, + self, log=log, loop=loop, on_update_db=on_update_db, db=self.db, fs=self.fs ) + self.vca_config = vca_config self.log.debug("Initialize helm N2VC connector") + self.log.debug("initial vca_config: {}".format(vca_config)) # TODO - Obtain data from configuration self._ee_service_port = self._EE_SERVICE_PORT self._retry_delay = self._EE_RETRY_DELAY - self._max_retry_time = self._MAX_RETRY_TIME - self._initial_retry_time = self._MAX_INITIAL_RETRY_TIME - # initialize helm connector - self._k8sclusterhelm = K8sHelmConnector( + if self.vca_config and self.vca_config.get("eegrpcinittimeout"): + self._initial_retry_time = self.vca_config.get("eegrpcinittimeout") + self.log.debug("Initial retry time: {}".format(self._initial_retry_time)) + else: + self._initial_retry_time = self._MAX_INITIAL_RETRY_TIME + self.log.debug( + "Applied default retry time: {}".format(self._initial_retry_time) + ) + + if self.vca_config and self.vca_config.get("eegrpctimeout"): + self._max_retry_time = self.vca_config.get("eegrpctimeout") + self.log.debug("Retry time: {}".format(self._max_retry_time)) + else: + self._max_retry_time = self._MAX_RETRY_TIME + self.log.debug( + "Applied default retry time: {}".format(self._max_retry_time) + ) + + # initialize helm connector for helmv2 and helmv3 + self._k8sclusterhelm2 = K8sHelmConnector( kubectl_command=self.vca_config.get("kubectlpath"), helm_command=self.vca_config.get("helmpath"), fs=self.fs, + db=self.db, + log=self.log, + on_update_db=None, + ) + + self._k8sclusterhelm3 = K8sHelm3Connector( + kubectl_command=self.vca_config.get("kubectlpath"), + helm_command=self.vca_config.get("helm3path"), + fs=self.fs, log=self.log, db=self.db, on_update_db=None, @@ -117,15 +158,19 @@ class LCMHelmConn(N2VCConnector): self.log.info("Helm N2VC connector initialized") # TODO - ¿reuse_ee_id? 
-    async def create_execution_environment(self,
-                                           namespace: str,
-                                           db_dict: dict,
-                                           reuse_ee_id: str = None,
-                                           progress_timeout: float = None,
-                                           total_timeout: float = None,
-                                           config: dict = None,
-                                           artifact_path: str = None,
-                                           vca_type: str = None) -> (str, dict):
+    async def create_execution_environment(
+        self,
+        namespace: str,
+        db_dict: dict,
+        reuse_ee_id: str = None,
+        progress_timeout: float = None,
+        total_timeout: float = None,
+        config: dict = None,
+        artifact_path: str = None,
+        vca_type: str = None,
+        *kargs,
+        **kwargs,
+    ) -> (str, dict):
         """
         Creates a new helm execution environment deploying the helm-chart indicated in the
         artifact_path
@@ -140,15 +185,14 @@ class LCMHelmConn(N2VCConnector):
         :param float total_timeout:
         :param dict config: General variables to instantiate KDU
         :param str artifact_path: path of package content
-        :param str vca_type: Type of vca, not used as assumed of type helm
+        :param str vca_type: Type of vca, must be either helm or helm-v3
-        :returns str, dict: id of the new execution environment including namespace.helm_id
+        :returns str, dict: id of the new execution environment in the form vca_type:namespace.helm_id
         and credentials object set to None as all credentials should be osm kubernetes .kubeconfig
         """
 
         self.log.info(
             "create_execution_environment: namespace: {}, artifact_path: {}, db_dict: {}, "
-            "reuse_ee_id: {}".format(
-                namespace, artifact_path, db_dict, reuse_ee_id)
+            "reuse_ee_id: {}".format(namespace, artifact_path, db_dict, reuse_ee_id)
         )
 
         # Validate artifact-path is provided
@@ -157,7 +201,9 @@ class LCMHelmConn(N2VCConnector):
                 message="artifact_path is mandatory", bad_args=["artifact_path"]
             )
 
-        # Validate artifact-path exists
+        # Validate artifact-path exists and sync path
+        from_path = os.path.split(artifact_path)[0]
+        self.fs.sync(from_path)
 
         # remove / in charm path
         while artifact_path.find("//") >= 0:
@@ -175,10 +221,13 @@ class LCMHelmConn(N2VCConnector):
         else:
             full_path = self.fs.path + "/" + helm_chart_path
 
+        while full_path.find("//") >= 0:
+            full_path = full_path.replace("//", "/")
+
         try:
             # Call helm conn install
             # Obtain system cluster id from database
-            system_cluster_uuid = self._get_system_cluster_id()
+            system_cluster_uuid = await self._get_system_cluster_id()
             # Add parameter osm if exist to global
             if config and config.get("osm"):
                 if not config.get("global"):
@@ -186,13 +235,36 @@ class LCMHelmConn(N2VCConnector):
                 config["global"]["osm"] = config.get("osm")
 
             self.log.debug("install helm chart: {}".format(full_path))
-            helm_id = await self._k8sclusterhelm.install(system_cluster_uuid, kdu_model=full_path,
-                                                         namespace=self._KUBECTL_OSM_NAMESPACE,
-                                                         params=config,
-                                                         db_dict=db_dict,
-                                                         timeout=progress_timeout)
+            if vca_type == "helm":
+                helm_id = self._k8sclusterhelm2.generate_kdu_instance_name(
+                    db_dict=db_dict,
+                    kdu_model=full_path,
+                )
+                await self._k8sclusterhelm2.install(
+                    system_cluster_uuid,
+                    kdu_model=full_path,
+                    kdu_instance=helm_id,
+                    namespace=self._KUBECTL_OSM_NAMESPACE,
+                    params=config,
+                    db_dict=db_dict,
+                    timeout=progress_timeout,
+                )
+            else:
+                helm_id = self._k8sclusterhelm3.generate_kdu_instance_name(
+                    db_dict=db_dict,
+                    kdu_model=full_path,
+                )
+                await self._k8sclusterhelm3.install(
+                    system_cluster_uuid,
+                    kdu_model=full_path,
+                    kdu_instance=helm_id,
+                    namespace=self._KUBECTL_OSM_NAMESPACE,
+                    params=config,
+                    db_dict=db_dict,
+                    timeout=progress_timeout,
+                )
 
-            ee_id = "{}.{}".format(self._KUBECTL_OSM_NAMESPACE, helm_id)
+            ee_id = "{}:{}.{}".format(vca_type, self._KUBECTL_OSM_NAMESPACE, helm_id)
             return ee_id, None
         except N2VCException:
             raise
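The versioned ee_id built above is what _get_ee_id_parts (near the end of this patch) decodes again. A short round-trip sketch of the new format, using illustrative values:

    # Build the id the way create_execution_environment now does (values illustrative)
    vca_type, namespace, helm_id = "helm-v3", "osm", "eechart-0001"
    ee_id = "{}:{}.{}".format(vca_type, namespace, helm_id)  # 'helm-v3:osm.eechart-0001'

    # Decode it the way _get_ee_id_parts does
    version, _, part_id = ee_id.rpartition(":")
    namespace, _, helm_id = part_id.rpartition(".")
    assert (version, namespace, helm_id) == ("helm-v3", "osm", "eechart-0001")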
self.log.error("Error deploying chart ee: {}".format(e), exc_info=True) raise N2VCException("Error deploying chart ee: {}".format(e)) - async def register_execution_environment(self, namespace: str, credentials: dict, db_dict: dict, - progress_timeout: float = None, total_timeout: float = None) -> str: + async def register_execution_environment( + self, + namespace: str, + credentials: dict, + db_dict: dict, + progress_timeout: float = None, + total_timeout: float = None, + *kargs, + **kwargs, + ) -> str: # nothing to do pass - async def install_configuration_sw(self, - ee_id: str, - artifact_path: str, - db_dict: dict, - progress_timeout: float = None, - total_timeout: float = None, - config: dict = None, - num_units: int = 1, - vca_type: str = None - ): + async def install_configuration_sw(self, *args, **kwargs): # nothing to do pass - async def add_relation(self, ee_id_1: str, ee_id_2: str, endpoint_1: str, endpoint_2: str): + async def add_relation(self, *args, **kwargs): # nothing to do pass @@ -226,12 +297,18 @@ class LCMHelmConn(N2VCConnector): # nothing to to pass - async def get_status(self, namespace: str, yaml_format: bool = True): + async def get_status(self, *args, **kwargs): # not used for this connector pass - async def get_ee_ssh_public__key(self, ee_id: str, db_dict: dict, progress_timeout: float = None, - total_timeout: float = None) -> str: + async def get_ee_ssh_public__key( + self, + ee_id: str, + db_dict: dict, + progress_timeout: float = None, + total_timeout: float = None, + **kwargs, + ) -> str: """ Obtains ssh-public key from ee executing GetSShKey method from the ee. @@ -244,8 +321,7 @@ class LCMHelmConn(N2VCConnector): """ self.log.info( - "get_ee_ssh_public_key: ee_id: {}, db_dict: {}".format( - ee_id, db_dict) + "get_ee_ssh_public_key: ee_id: {}, db_dict: {}".format(ee_id, db_dict) ) # check arguments @@ -256,7 +332,7 @@ class LCMHelmConn(N2VCConnector): try: # Obtain ip_addr for the ee service, it is resolved by dns from the ee name by kubernetes - namespace, helm_id = self._get_ee_id_parts(ee_id) + version, namespace, helm_id = self._get_ee_id_parts(ee_id) ip_addr = socket.gethostbyname(helm_id) # Obtain ssh_key from the ee, this method will implement retries to allow the ee @@ -267,8 +343,16 @@ class LCMHelmConn(N2VCConnector): self.log.error("Error obtaining ee ssh_key: {}".format(e), exc_info=True) raise N2VCException("Error obtaining ee ssh_ke: {}".format(e)) - async def exec_primitive(self, ee_id: str, primitive_name: str, params_dict: dict, db_dict: dict = None, - progress_timeout: float = None, total_timeout: float = None) -> str: + async def exec_primitive( + self, + ee_id: str, + primitive_name: str, + params_dict: dict, + db_dict: dict = None, + progress_timeout: float = None, + total_timeout: float = None, + **kwargs, + ) -> str: """ Execute a primitive in the execution environment @@ -289,9 +373,11 @@ class LCMHelmConn(N2VCConnector): :returns str: primitive result, if ok. 
-    async def exec_primitive(self, ee_id: str, primitive_name: str, params_dict: dict, db_dict: dict = None,
-                             progress_timeout: float = None, total_timeout: float = None) -> str:
+    async def exec_primitive(
+        self,
+        ee_id: str,
+        primitive_name: str,
+        params_dict: dict,
+        db_dict: dict = None,
+        progress_timeout: float = None,
+        total_timeout: float = None,
+        **kwargs,
+    ) -> str:
         """
         Execute a primitive in the execution environment
 
@@ -289,9 +373,11 @@ class LCMHelmConn(N2VCConnector):
         :returns str: primitive result, if ok.
         It raises exceptions in case of failure
         """
 
-        self.log.info("exec primitive for ee_id : {}, primitive_name: {}, params_dict: {}, db_dict: {}".format(
-            ee_id, primitive_name, params_dict, db_dict
-        ))
+        self.log.info(
+            "exec primitive for ee_id: {}, primitive_name: {}, params_dict: {}, db_dict: {}".format(
+                ee_id, primitive_name, params_dict, db_dict
+            )
+        )
 
         # check arguments
         if ee_id is None or len(ee_id) == 0:
@@ -306,7 +392,7 @@ class LCMHelmConn(N2VCConnector):
             params_dict = dict()
 
         try:
-            namespace, helm_id = self._get_ee_id_parts(ee_id)
+            version, namespace, helm_id = self._get_ee_id_parts(ee_id)
             ip_addr = socket.gethostbyname(helm_id)
         except Exception as e:
             self.log.error("Error getting ee ip: {}".format(e))
@@ -315,12 +401,20 @@ class LCMHelmConn(N2VCConnector):
         if primitive_name == "config":
             try:
                 # Execute config primitive, higher timeout to cover the case where the ee is still starting
-                status, detailed_message = await self._execute_config_primitive(ip_addr, params_dict, db_dict=db_dict)
-                self.log.debug("Executed config primitive ee_id_ {}, status: {}, message: {}".format(
-                    ee_id, status, detailed_message))
+                status, detailed_message = await self._execute_config_primitive(
+                    ip_addr, params_dict, db_dict=db_dict
+                )
+                self.log.debug(
+                    "Executed config primitive ee_id: {}, status: {}, message: {}".format(
+                        ee_id, status, detailed_message
+                    )
+                )
                 if status != "OK":
-                    self.log.error("Error configuring helm ee, status: {}, message: {}".format(
-                        status, detailed_message))
+                    self.log.error(
+                        "Error configuring helm ee, status: {}, message: {}".format(
+                            status, detailed_message
+                        )
+                    )
                     raise N2VCExecutionException(
                         message="Error configuring helm ee_id: {}, status: {}, message: {}: ".format(
                             ee_id, status, detailed_message
@@ -330,23 +424,26 @@ class LCMHelmConn(N2VCConnector):
             except Exception as e:
                 self.log.error("Error configuring helm ee: {}".format(e))
                 raise N2VCExecutionException(
-                    message="Error configuring helm ee_id: {}, {}".format(
-                        ee_id, e
-                    ),
+                    message="Error configuring helm ee_id: {}, {}".format(ee_id, e),
                     primitive_name=primitive_name,
                 )
             return "CONFIG OK"
         else:
             try:
                 # Execute primitive
-                status, detailed_message = await self._execute_primitive(ip_addr, primitive_name,
-                                                                         params_dict, db_dict=db_dict)
-                self.log.debug("Executed primitive {} ee_id_ {}, status: {}, message: {}".format(
-                    primitive_name, ee_id, status, detailed_message))
+                status, detailed_message = await self._execute_primitive(
+                    ip_addr, primitive_name, params_dict, db_dict=db_dict
+                )
+                self.log.debug(
+                    "Executed primitive {} ee_id: {}, status: {}, message: {}".format(
+                        primitive_name, ee_id, status, detailed_message
+                    )
+                )
                 if status != "OK" and status != "PROCESSING":
                     self.log.error(
                         "Execute primitive {} returned not ok status: {}, message: {}".format(
-                            primitive_name, status, detailed_message)
+                            primitive_name, status, detailed_message
+                        )
                     )
                     raise N2VCExecutionException(
                         message="Execute primitive {} returned not ok status: {}, message: {}".format(
@@ -370,7 +467,13 @@ class LCMHelmConn(N2VCConnector):
         # nothing to be done
         pass
 
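From a caller's perspective, exec_primitive above returns normally only when the EE reports OK (or PROCESSING for non-config primitives) and raises N2VCExecutionException otherwise. A hedged usage sketch; the primitive name and parameters are illustrative, and the return value for non-config primitives is assumed to be the EE's detailed message:

    from n2vc.exceptions import N2VCExecutionException

    async def run_day2_action(conn, ee_id):
        # illustrative primitive name and params; real values come from the VNF descriptor
        try:
            detail = await conn.exec_primitive(
                ee_id=ee_id,
                primitive_name="touch",
                params_dict={"file-path": "/tmp/osm-test"},
            )
        except N2VCExecutionException:
            raise  # EE reported a not-ok status, or the gRPC call itself failed
        return detail  # assumed: the EE's detailed message on OK / PROCESSING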
-    async def delete_execution_environment(self, ee_id: str, db_dict: dict = None, total_timeout: float = None):
+    async def delete_execution_environment(
+        self,
+        ee_id: str,
+        db_dict: dict = None,
+        total_timeout: float = None,
+        **kwargs,
+    ):
         """
         Delete an execution environment
         :param str ee_id: id of the execution environment to delete, including namespace.helm_id
@@ -393,21 +496,29 @@ class LCMHelmConn(N2VCConnector):
 
         try:
             # Obtain cluster_uuid
-            system_cluster_uuid = self._get_system_cluster_id()
+            system_cluster_uuid = await self._get_system_cluster_id()
 
             # Get helm_id
-            namespace, helm_id = self._get_ee_id_parts(ee_id)
+            version, namespace, helm_id = self._get_ee_id_parts(ee_id)
 
-            # Uninstall chart
-            await self._k8sclusterhelm.uninstall(system_cluster_uuid, helm_id)
+            # Uninstall chart, for backward compatibility we must assume that if there is no
+            # version it is helm-v2
+            if version == "helm-v3":
+                await self._k8sclusterhelm3.uninstall(system_cluster_uuid, helm_id)
+            else:
+                await self._k8sclusterhelm2.uninstall(system_cluster_uuid, helm_id)
             self.log.info("ee_id: {} deleted".format(ee_id))
         except N2VCException:
             raise
         except Exception as e:
-            self.log.error("Error deleting ee id: {}: {}".format(ee_id, e), exc_info=True)
+            self.log.error(
+                "Error deleting ee id: {}: {}".format(ee_id, e), exc_info=True
+            )
             raise N2VCException("Error deleting ee id {}: {}".format(ee_id, e))
 
-    async def delete_namespace(self, namespace: str, db_dict: dict = None, total_timeout: float = None):
+    async def delete_namespace(
+        self, namespace: str, db_dict: dict = None, total_timeout: float = None
+    ):
         # method not implemented for this connector, execution environments must be deleted individually
         pass
 
@@ -420,10 +531,12 @@ class LCMHelmConn(N2VCConnector):
         progress_timeout: float = None,
         total_timeout: float = None,
         config: dict = None,
+        *kargs,
+        **kwargs,
     ) -> str:
         pass
 
-    @retryer(max_wait_time=_MAX_INITIAL_RETRY_TIME, delay_time=_EE_RETRY_DELAY)
+    @retryer(max_wait_time_var="_initial_retry_time", delay_time_var="_retry_delay")
     async def _get_ssh_key(self, ip_addr):
         channel = Channel(ip_addr, self._ee_service_port)
         try:
@@ -434,15 +547,21 @@ class LCMHelmConn(N2VCConnector):
         finally:
             channel.close()
 
-    @retryer(max_wait_time=_MAX_INITIAL_RETRY_TIME, delay_time=_EE_RETRY_DELAY)
+    @retryer(max_wait_time_var="_initial_retry_time", delay_time_var="_retry_delay")
     async def _execute_config_primitive(self, ip_addr, params, db_dict=None):
-        return await self._execute_primitive_internal(ip_addr, "config", params, db_dict=db_dict)
+        return await self._execute_primitive_internal(
+            ip_addr, "config", params, db_dict=db_dict
+        )
 
-    @retryer(max_wait_time=_MAX_RETRY_TIME, delay_time=_EE_RETRY_DELAY)
+    @retryer(max_wait_time_var="_max_retry_time", delay_time_var="_retry_delay")
     async def _execute_primitive(self, ip_addr, primitive_name, params, db_dict=None):
-        return await self._execute_primitive_internal(ip_addr, primitive_name, params, db_dict=db_dict)
+        return await self._execute_primitive_internal(
+            ip_addr, primitive_name, params, db_dict=db_dict
+        )
 
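delete_execution_environment above keys the uninstall on the version prefix parsed out of ee_id; ids written by releases prior to this patch carry no prefix and deliberately fall through to the helm-v2 connector. A tiny sketch of that legacy fallback, with an illustrative id:

    legacy_ee_id = "osm.eechart-0002"  # illustrative id written before this change
    version, _, part_id = legacy_ee_id.rpartition(":")
    namespace, _, helm_id = part_id.rpartition(".")
    # empty version selects the helm-v2 path in delete_execution_environment
    assert (version, namespace, helm_id) == ("", "osm", "eechart-0002")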
-    async def _execute_primitive_internal(self, ip_addr, primitive_name, params, db_dict=None):
+    async def _execute_primitive_internal(
+        self, ip_addr, primitive_name, params, db_dict=None
+    ):
 
         channel = Channel(ip_addr, self._ee_service_port)
         try:
@@ -450,16 +569,25 @@ class LCMHelmConn(N2VCConnector):
             async with stub.RunPrimitive.open() as stream:
                 primitive_id = str(uuid.uuid1())
                 result = None
-                self.log.debug("Execute primitive internal: id:{}, name:{}, params: {}".
-                               format(primitive_id, primitive_name, params))
+                self.log.debug(
+                    "Execute primitive internal: id:{}, name:{}, params: {}".format(
+                        primitive_id, primitive_name, params
+                    )
+                )
                 await stream.send_message(
-                    PrimitiveRequest(id=primitive_id, name=primitive_name, params=yaml.dump(params)), end=True)
+                    PrimitiveRequest(
+                        id=primitive_id, name=primitive_name, params=yaml.dump(params)
+                    ),
+                    end=True,
+                )
                 async for reply in stream:
                     self.log.debug("Received reply: {}".format(reply))
                     result = reply
                     # If db_dict provided write notifs in database
                     if db_dict:
-                        self._write_op_detailed_status(db_dict, reply.status, reply.detailed_message)
+                        self._write_op_detailed_status(
+                            db_dict, reply.status, reply.detailed_message
+                        )
                 if result:
                     return reply.status, reply.detailed_message
                 else:
@@ -486,17 +614,45 @@ class LCMHelmConn(N2VCConnector):
         except Exception as e:
             self.log.error("Error writing detailedStatus to database: {}".format(e))
 
-    def _get_system_cluster_id(self):
+    async def _get_system_cluster_id(self):
         if not self._system_cluster_id:
-            db_k8cluster = self.db.get_one("k8sclusters", {"name": self._KUBECTL_OSM_CLUSTER_NAME})
-            k8s_hc_id = deep_get(db_k8cluster, ("_admin", "helm-chart", "id"))
+            db_k8cluster = self.db.get_one(
+                "k8sclusters", {"name": self._KUBECTL_OSM_CLUSTER_NAME}
+            )
+            k8s_hc_id = deep_get(db_k8cluster, ("_admin", "helm-chart-v3", "id"))
             if not k8s_hc_id:
-                self.log.error("osm system cluster has not been properly initialized for helm connector, "
-                               "helm-chart id is not defined")
-                raise N2VCException("osm system cluster has not been properly initialized for helm connector")
+                try:
+                    # backward compatibility for existing clusters that have not been initialized for helm v3
+                    cluster_id = db_k8cluster.get("_id")
+                    k8s_credentials = yaml.safe_dump(db_k8cluster.get("credentials"))
+                    k8s_hc_id, uninstall_sw = await self._k8sclusterhelm3.init_env(
+                        k8s_credentials, reuse_cluster_uuid=cluster_id
+                    )
+                    db_k8scluster_update = {
+                        "_admin.helm-chart-v3.error_msg": None,
+                        "_admin.helm-chart-v3.id": k8s_hc_id,
+                        "_admin.helm-chart-v3.created": uninstall_sw,
+                        "_admin.helm-chart-v3.operationalState": "ENABLED",
+                    }
+                    self.update_db_2("k8sclusters", cluster_id, db_k8scluster_update)
+                except Exception as e:
+                    self.log.error(
+                        "error initializing helm-v3 cluster: {}".format(str(e))
+                    )
+                    raise N2VCException(
+                        "K8s system cluster '{}' has not been initialized for helm-chart-v3".format(
+                            cluster_id
+                        )
+                    )
            self._system_cluster_id = k8s_hc_id
        return self._system_cluster_id

    def _get_ee_id_parts(self, ee_id):
-        namespace, _, helm_id = ee_id.partition('.')
-        return namespace, helm_id
+        """
+        Parses the ee_id stored in the database, which can be either
+        'version:namespace.helm_id' or, for backward compatibility, only 'namespace.helm_id'.
+        If present, the version can be helm-v3 or helm (the old helm-v2).
+        """
+        version, _, part_id = ee_id.rpartition(":")
+        namespace, _, helm_id = part_id.rpartition(".")
+        return version, namespace, helm_id
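Finally, the helm-chart-v3 lookup in _get_system_cluster_id relies on deep_get returning None for missing nested keys, which is what routes pre-existing system clusters into the init_env() backward-compatibility branch. A minimal sketch of that access pattern against an illustrative k8scluster record:

    from osm_lcm.lcm_utils import deep_get

    # illustrative shape of a k8scluster record initialized only for helm-v2
    db_k8cluster = {
        "_id": "0a1b2c3d",  # hypothetical cluster id
        "_admin": {"helm-chart": {"id": "legacy-hc-id"}},
    }

    k8s_hc_id = deep_get(db_k8cluster, ("_admin", "helm-chart-v3", "id"))
    assert k8s_hc_id is None  # triggers the init_env() fallback above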