X-Git-Url: https://osm.etsi.org/gitweb/?p=osm%2FLCM.git;a=blobdiff_plain;f=osm_lcm%2Flcm_helm_conn.py;h=34e49156e03a4e5779c5685212271ad65afe7547;hp=904b7d9a1aa0132513fada30379c693f3f3381e6;hb=c43253de0f53c4bbd3f4b67c6d57c6efc437cd7a;hpb=aae391fc83f3b20e058769ff3356ca4d9965a3b8 diff --git a/osm_lcm/lcm_helm_conn.py b/osm_lcm/lcm_helm_conn.py index 904b7d9..34e4915 100644 --- a/osm_lcm/lcm_helm_conn.py +++ b/osm_lcm/lcm_helm_conn.py @@ -29,6 +29,9 @@ from osm_lcm.frontend_pb2 import SshKeyRequest, SshKeyReply from osm_lcm.frontend_grpc import FrontendExecutorStub from osm_lcm.lcm_utils import LcmBase +from osm_lcm.data_utils.database.database import Database +from osm_lcm.data_utils.filesystem.filesystem import Filesystem + from n2vc.n2vc_conn import N2VCConnector from n2vc.k8s_helm_conn import K8sHelmConnector from n2vc.k8s_helm3_conn import K8sHelm3Connector @@ -37,7 +40,7 @@ from n2vc.exceptions import N2VCBadArgumentsException, N2VCException, N2VCExecut from osm_lcm.lcm_utils import deep_get -def retryer(max_wait_time=60, delay_time=10): +def retryer(max_wait_time_var="_initial_retry_time", delay_time_var="_retry_delay"): def wrapper(func): retry_exceptions = ( ConnectionRefusedError @@ -45,6 +48,17 @@ def retryer(max_wait_time=60, delay_time=10): @functools.wraps(func) async def wrapped(*args, **kwargs): + # default values for wait time and delay_time + delay_time = 10 + max_wait_time = 300 + + # obtain arguments from variable names + self = args[0] + if self.__dict__.get(max_wait_time_var): + max_wait_time = self.__dict__.get(max_wait_time_var) + if self.__dict__.get(delay_time_var): + delay_time = self.__dict__.get(delay_time_var) + wait_time = max_wait_time while wait_time > 0: try: @@ -64,16 +78,14 @@ class LCMHelmConn(N2VCConnector, LcmBase): _KUBECTL_OSM_CLUSTER_NAME = "_system-osm-k8s" _EE_SERVICE_PORT = 50050 - # Time beetween retries - _EE_RETRY_DELAY = 10 # Initial max retry time - _MAX_INITIAL_RETRY_TIME = 300 - # Other retry time + _MAX_INITIAL_RETRY_TIME = 600 + # Max retry time for normal operations _MAX_RETRY_TIME = 30 + # Time beetween retries, retry time after a connection error is raised + _EE_RETRY_DELAY = 10 def __init__(self, - db: object, - fs: object, log: object = None, loop: object = None, url: str = None, @@ -84,35 +96,51 @@ class LCMHelmConn(N2VCConnector, LcmBase): Initialize EE helm connector. """ + self.db = Database().instance.db + self.fs = Filesystem().instance.fs + # parent class constructor N2VCConnector.__init__( self, - db=db, - fs=fs, log=log, loop=loop, url=url, username=username, vca_config=vca_config, on_update_db=on_update_db, + db=self.db, + fs=self.fs ) self.log.debug("Initialize helm N2VC connector") + self.log.debug("initial vca_config: {}".format(vca_config)) # TODO - Obtain data from configuration self._ee_service_port = self._EE_SERVICE_PORT self._retry_delay = self._EE_RETRY_DELAY - self._max_retry_time = self._MAX_RETRY_TIME - self._initial_retry_time = self._MAX_INITIAL_RETRY_TIME + + if self.vca_config and self.vca_config.get("eegrpcinittimeout"): + self._initial_retry_time = self.vca_config.get("eegrpcinittimeout") + self.log.debug("Initial retry time: {}".format(self._initial_retry_time)) + else: + self._initial_retry_time = self._MAX_INITIAL_RETRY_TIME + self.log.debug("Applied default retry time: {}".format(self._initial_retry_time)) + + if self.vca_config and self.vca_config.get("eegrpctimeout"): + self._max_retry_time = self.vca_config.get("eegrpctimeout") + self.log.debug("Retry time: {}".format(self._max_retry_time)) + else: + self._max_retry_time = self._MAX_RETRY_TIME + self.log.debug("Applied default retry time: {}".format(self._max_retry_time)) # initialize helm connector for helmv2 and helmv3 self._k8sclusterhelm2 = K8sHelmConnector( kubectl_command=self.vca_config.get("kubectlpath"), helm_command=self.vca_config.get("helmpath"), fs=self.fs, - log=self.log, db=self.db, + log=self.log, on_update_db=None, ) @@ -190,6 +218,9 @@ class LCMHelmConn(N2VCConnector, LcmBase): else: full_path = self.fs.path + "/" + helm_chart_path + while full_path.find("//") >= 0: + full_path = full_path.replace("//", "/") + try: # Call helm conn install # Obtain system cluster id from database @@ -202,17 +233,27 @@ class LCMHelmConn(N2VCConnector, LcmBase): self.log.debug("install helm chart: {}".format(full_path)) if vca_type == "helm": - helm_id = await self._k8sclusterhelm2.install(system_cluster_uuid, kdu_model=full_path, - namespace=self._KUBECTL_OSM_NAMESPACE, - params=config, - db_dict=db_dict, - timeout=progress_timeout) + helm_id = self._k8sclusterhelm2.generate_kdu_instance_name( + db_dict=db_dict, + kdu_model=full_path, + ) + await self._k8sclusterhelm2.install(system_cluster_uuid, kdu_model=full_path, + kdu_instance=helm_id, + namespace=self._KUBECTL_OSM_NAMESPACE, + params=config, + db_dict=db_dict, + timeout=progress_timeout) else: - helm_id = await self._k8sclusterhelm3.install(system_cluster_uuid, kdu_model=full_path, - namespace=self._KUBECTL_OSM_NAMESPACE, - params=config, - db_dict=db_dict, - timeout=progress_timeout) + helm_id = self._k8sclusterhelm2.generate_kdu_instance_name( + db_dict=db_dict, + kdu_model=full_path, + ) + await self._k8sclusterhelm3.install(system_cluster_uuid, kdu_model=full_path, + kdu_instance=helm_id, + namespace=self._KUBECTL_OSM_NAMESPACE, + params=config, + db_dict=db_dict, + timeout=progress_timeout) ee_id = "{}:{}.{}".format(vca_type, self._KUBECTL_OSM_NAMESPACE, helm_id) return ee_id, None @@ -393,7 +434,13 @@ class LCMHelmConn(N2VCConnector, LcmBase): # nothing to be done pass - async def delete_execution_environment(self, ee_id: str, db_dict: dict = None, total_timeout: float = None): + async def delete_execution_environment( + self, + ee_id: str, + db_dict: dict = None, + total_timeout: float = None, + **kwargs, + ): """ Delete an execution environment :param str ee_id: id of the execution environment to delete, included namespace.helm_id @@ -424,7 +471,7 @@ class LCMHelmConn(N2VCConnector, LcmBase): # Uninstall chart, for backward compatibility we must assume that if there is no # version it is helm-v2 if version == "helm-v3": - await self._k8sclusterhelm3.uninstall(system_cluster_uuid, helm_id) + await self._k8sclusterhelm3.uninstall(system_cluster_uuid, helm_id) else: await self._k8sclusterhelm2.uninstall(system_cluster_uuid, helm_id) self.log.info("ee_id: {} deleted".format(ee_id)) @@ -451,7 +498,7 @@ class LCMHelmConn(N2VCConnector, LcmBase): ) -> str: pass - @retryer(max_wait_time=_MAX_INITIAL_RETRY_TIME, delay_time=_EE_RETRY_DELAY) + @retryer(max_wait_time_var="_initial_retry_time", delay_time_var="_retry_delay") async def _get_ssh_key(self, ip_addr): channel = Channel(ip_addr, self._ee_service_port) try: @@ -462,13 +509,13 @@ class LCMHelmConn(N2VCConnector, LcmBase): finally: channel.close() - @retryer(max_wait_time=_MAX_INITIAL_RETRY_TIME, delay_time=_EE_RETRY_DELAY) + @retryer(max_wait_time_var="_initial_retry_time", delay_time_var="_retry_delay") async def _execute_config_primitive(self, ip_addr, params, db_dict=None): return await self._execute_primitive_internal(ip_addr, "config", params, db_dict=db_dict) - @retryer(max_wait_time=_MAX_RETRY_TIME, delay_time=_EE_RETRY_DELAY) + @retryer(max_wait_time_var="_max_retry_time", delay_time_var="_retry_delay") async def _execute_primitive(self, ip_addr, primitive_name, params, db_dict=None): - return await self._execute_primitive_internal(ip_addr, primitive_name, params, db_dict=db_dict) + return await self._execute_primitive_internal(ip_addr, primitive_name, params, db_dict=db_dict) async def _execute_primitive_internal(self, ip_addr, primitive_name, params, db_dict=None):