X-Git-Url: https://osm.etsi.org/gitweb/?a=blobdiff_plain;f=osm_lcm%2Flcm_helm_conn.py;h=d1ad4c597b624e2a5453a13dfbc87dd61f2a8451;hb=87f5f03155d092c22f2bdf7303f10abf06f42531;hp=887c212a8b01cb4f2b95d66c861529b97311cf02;hpb=1411a004e64e080851cfd072d88edbf7cccd78d0;p=osm%2FLCM.git diff --git a/osm_lcm/lcm_helm_conn.py b/osm_lcm/lcm_helm_conn.py index 887c212..d1ad4c5 100644 --- a/osm_lcm/lcm_helm_conn.py +++ b/osm_lcm/lcm_helm_conn.py @@ -24,6 +24,7 @@ import ssl from grpclib.client import Channel +from osm_lcm.data_utils.lcm_config import VcaConfig from osm_lcm.frontend_pb2 import PrimitiveRequest from osm_lcm.frontend_pb2 import SshKeyRequest, SshKeyReply from osm_lcm.frontend_grpc import FrontendExecutorStub @@ -43,12 +44,10 @@ from n2vc.exceptions import ( from osm_lcm.lcm_utils import deep_get -CA_STORE = "/etc/ssl/certs/osm-ca.crt" - def retryer(max_wait_time_var="_initial_retry_time", delay_time_var="_retry_delay"): def wrapper(func): - retry_exceptions = ConnectionRefusedError + retry_exceptions = (ConnectionRefusedError, TimeoutError) @functools.wraps(func) async def wrapped(*args, **kwargs): @@ -99,22 +98,11 @@ def create_secure_context( class LCMHelmConn(N2VCConnector, LcmBase): - _KUBECTL_OSM_NAMESPACE = "osm" - _KUBECTL_OSM_CLUSTER_NAME = "_system-osm-k8s" - _EE_SERVICE_PORT = 50050 - - # Initial max retry time - _MAX_INITIAL_RETRY_TIME = 600 - # Max retry time for normal operations - _MAX_RETRY_TIME = 30 - # Time beetween retries, retry time after a connection error is raised - _EE_RETRY_DELAY = 10 - def __init__( self, log: object = None, loop: object = None, - vca_config: dict = None, + vca_config: VcaConfig = None, on_update_db=None, ): """ @@ -131,43 +119,20 @@ class LCMHelmConn(N2VCConnector, LcmBase): self.vca_config = vca_config self.log.debug("Initialize helm N2VC connector") - self.log.debug("initial vca_config: {}".format(vca_config)) - - # TODO - Obtain data from configuration - self._ee_service_port = self._EE_SERVICE_PORT + self.log.debug("initial vca_config: {}".format(vca_config.to_dict())) - self._retry_delay = self._EE_RETRY_DELAY - - if self.vca_config and self.vca_config.get("eegrpcinittimeout"): - self._initial_retry_time = self.vca_config.get("eegrpcinittimeout") - self.log.debug("Initial retry time: {}".format(self._initial_retry_time)) - else: - self._initial_retry_time = self._MAX_INITIAL_RETRY_TIME - self.log.debug( - "Applied default retry time: {}".format(self._initial_retry_time) - ) + self._retry_delay = self.vca_config.helm_ee_retry_delay - if self.vca_config and self.vca_config.get("eegrpctimeout"): - self._max_retry_time = self.vca_config.get("eegrpctimeout") - self.log.debug("Retry time: {}".format(self._max_retry_time)) - else: - self._max_retry_time = self._MAX_RETRY_TIME - self.log.debug( - "Applied default retry time: {}".format(self._max_retry_time) - ) + self._initial_retry_time = self.vca_config.helm_max_initial_retry_time + self.log.debug("Initial retry time: {}".format(self._initial_retry_time)) - if self.vca_config and self.vca_config.get("eegrpc_tls_enforce"): - self._tls_enforce = str( - self.vca_config.get("eegrpc_tls_enforce") - ).lower() in ("true", "1", "yes") - else: - self._tls_enforce = False - self.log.debug("TLS enforce enabled: {}".format(self._tls_enforce)) + self._max_retry_time = self.vca_config.helm_max_retry_time + self.log.debug("Retry time: {}".format(self._max_retry_time)) # initialize helm connector for helmv2 and helmv3 self._k8sclusterhelm2 = K8sHelmConnector( - kubectl_command=self.vca_config.get("kubectlpath"), - helm_command=self.vca_config.get("helmpath"), + kubectl_command=self.vca_config.kubectlpath, + helm_command=self.vca_config.helmpath, fs=self.fs, db=self.db, log=self.log, @@ -175,8 +140,8 @@ class LCMHelmConn(N2VCConnector, LcmBase): ) self._k8sclusterhelm3 = K8sHelm3Connector( - kubectl_command=self.vca_config.get("kubectlpath"), - helm_command=self.vca_config.get("helm3path"), + kubectl_command=self.vca_config.kubectlpath, + helm_command=self.vca_config.helm3path, fs=self.fs, log=self.log, db=self.db, @@ -289,7 +254,7 @@ class LCMHelmConn(N2VCConnector, LcmBase): system_cluster_uuid, kdu_model=kdu_model, kdu_instance=helm_id, - namespace=self._KUBECTL_OSM_NAMESPACE, + namespace=self.vca_config.kubectl_osm_namespace, params=config, db_dict=db_dict, timeout=progress_timeout, @@ -303,13 +268,15 @@ class LCMHelmConn(N2VCConnector, LcmBase): system_cluster_uuid, kdu_model=kdu_model, kdu_instance=helm_id, - namespace=self._KUBECTL_OSM_NAMESPACE, + namespace=self.vca_config.kubectl_osm_namespace, params=config, db_dict=db_dict, timeout=progress_timeout, ) - ee_id = "{}:{}.{}".format(vca_type, self._KUBECTL_OSM_NAMESPACE, helm_id) + ee_id = "{}:{}.{}".format( + vca_type, self.vca_config.kubectl_osm_namespace, helm_id + ) return ee_id, None except N2VCException: raise @@ -434,14 +401,14 @@ class LCMHelmConn(N2VCConnector, LcmBase): secret_name: str, usage: str, dns_prefix: str, - namespace: str = _KUBECTL_OSM_NAMESPACE, + namespace: str = None, ): # Obtain system cluster id from database system_cluster_uuid = await self._get_system_cluster_id() # use helm-v3 as certificates don't depend on helm version await self._k8sclusterhelm3.create_certificate( cluster_uuid=system_cluster_uuid, - namespace=namespace, + namespace=namespace or self.vca_config.kubectl_osm_namespace, dns_prefix=dns_prefix, name=nsr_id, secret_name=secret_name, @@ -451,13 +418,13 @@ class LCMHelmConn(N2VCConnector, LcmBase): async def delete_tls_certificate( self, certificate_name: str = None, - namespace: str = _KUBECTL_OSM_NAMESPACE, + namespace: str = None, ): # Obtain system cluster id from database system_cluster_uuid = await self._get_system_cluster_id() await self._k8sclusterhelm3.delete_certificate( cluster_uuid=system_cluster_uuid, - namespace=namespace, + namespace=namespace or self.vca_config.kubectl_osm_namespace, certificate_name=certificate_name, ) @@ -707,7 +674,6 @@ class LCMHelmConn(N2VCConnector, LcmBase): ) try: - # Obtain cluster_uuid system_cluster_uuid = await self._get_system_cluster_id() @@ -806,16 +772,21 @@ class LCMHelmConn(N2VCConnector, LcmBase): else: return "ERROR", "No result received" - ssl_context = create_secure_context(CA_STORE) - channel = Channel(ip_addr, self._ee_service_port, ssl=ssl_context) + ssl_context = create_secure_context(self.vca_config.ca_store) + channel = Channel( + ip_addr, self.vca_config.helm_ee_service_port, ssl=ssl_context + ) try: return await execute() except ssl.SSLError as ssl_error: # fallback to insecure gRPC - if ssl_error.reason == "WRONG_VERSION_NUMBER" and not self._tls_enforce: + if ( + ssl_error.reason == "WRONG_VERSION_NUMBER" + and not self.vca_config.eegrpc_tls_enforce + ): self.log.debug( "Execution environment doesn't support TLS, falling back to unsecure gRPC" ) - channel = Channel(ip_addr, self._ee_service_port) + channel = Channel(ip_addr, self.vca_config.helm_ee_service_port) return await execute() elif ssl_error.reason == "WRONG_VERSION_NUMBER": raise N2VCException( @@ -827,7 +798,6 @@ class LCMHelmConn(N2VCConnector, LcmBase): channel.close() def _write_op_detailed_status(self, db_dict, status, detailed_message): - # write ee_id to database: _admin.deployed.VCA.x try: the_table = db_dict["collection"] @@ -848,7 +818,7 @@ class LCMHelmConn(N2VCConnector, LcmBase): async def _get_system_cluster_id(self): if not self._system_cluster_id: db_k8cluster = self.db.get_one( - "k8sclusters", {"name": self._KUBECTL_OSM_CLUSTER_NAME} + "k8sclusters", {"name": self.vca_config.kubectl_osm_cluster_name} ) k8s_hc_id = deep_get(db_k8cluster, ("_admin", "helm-chart-v3", "id")) if not k8s_hc_id: