X-Git-Url: https://osm.etsi.org/gitweb/?a=blobdiff_plain;f=osm_lcm%2Flcm_helm_conn.py;h=930ec602d8e57d52ead167078231c096ca009dcb;hb=e11384e1797ea0a5f8cd084d6f336948170bc640;hp=887c212a8b01cb4f2b95d66c861529b97311cf02;hpb=1411a004e64e080851cfd072d88edbf7cccd78d0;p=osm%2FLCM.git diff --git a/osm_lcm/lcm_helm_conn.py b/osm_lcm/lcm_helm_conn.py index 887c212..930ec60 100644 --- a/osm_lcm/lcm_helm_conn.py +++ b/osm_lcm/lcm_helm_conn.py @@ -24,6 +24,7 @@ import ssl from grpclib.client import Channel +from osm_lcm.data_utils.lcm_config import VcaConfig from osm_lcm.frontend_pb2 import PrimitiveRequest from osm_lcm.frontend_pb2 import SshKeyRequest, SshKeyReply from osm_lcm.frontend_grpc import FrontendExecutorStub @@ -33,7 +34,6 @@ from osm_lcm.data_utils.database.database import Database from osm_lcm.data_utils.filesystem.filesystem import Filesystem from n2vc.n2vc_conn import N2VCConnector -from n2vc.k8s_helm_conn import K8sHelmConnector from n2vc.k8s_helm3_conn import K8sHelm3Connector from n2vc.exceptions import ( N2VCBadArgumentsException, @@ -43,12 +43,10 @@ from n2vc.exceptions import ( from osm_lcm.lcm_utils import deep_get -CA_STORE = "/etc/ssl/certs/osm-ca.crt" - def retryer(max_wait_time_var="_initial_retry_time", delay_time_var="_retry_delay"): def wrapper(func): - retry_exceptions = ConnectionRefusedError + retry_exceptions = (ConnectionRefusedError, TimeoutError) @functools.wraps(func) async def wrapped(*args, **kwargs): @@ -80,41 +78,24 @@ def retryer(max_wait_time_var="_initial_retry_time", delay_time_var="_retry_dela def create_secure_context( - trusted: str, + trusted: str, client_cert_path: str, client_key_path: str ) -> ssl.SSLContext: ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT) ctx.verify_mode = ssl.CERT_REQUIRED ctx.check_hostname = True ctx.minimum_version = ssl.TLSVersion.TLSv1_2 - # TODO: client TLS - # ctx.load_cert_chain(str(client_cert), str(client_key)) + ctx.load_cert_chain(client_cert_path, client_key_path) ctx.load_verify_locations(trusted) ctx.set_ciphers("ECDHE+AESGCM:ECDHE+CHACHA20:DHE+AESGCM:DHE+CHACHA20") ctx.set_alpn_protocols(["h2"]) - try: - ctx.set_npn_protocols(["h2"]) - except NotImplementedError: - pass return ctx class LCMHelmConn(N2VCConnector, LcmBase): - _KUBECTL_OSM_NAMESPACE = "osm" - _KUBECTL_OSM_CLUSTER_NAME = "_system-osm-k8s" - _EE_SERVICE_PORT = 50050 - - # Initial max retry time - _MAX_INITIAL_RETRY_TIME = 600 - # Max retry time for normal operations - _MAX_RETRY_TIME = 30 - # Time beetween retries, retry time after a connection error is raised - _EE_RETRY_DELAY = 10 - def __init__( self, log: object = None, - loop: object = None, - vca_config: dict = None, + vca_config: VcaConfig = None, on_update_db=None, ): """ @@ -126,57 +107,25 @@ class LCMHelmConn(N2VCConnector, LcmBase): # parent class constructor N2VCConnector.__init__( - self, log=log, loop=loop, on_update_db=on_update_db, db=self.db, fs=self.fs + self, log=log, on_update_db=on_update_db, db=self.db, fs=self.fs ) self.vca_config = vca_config self.log.debug("Initialize helm N2VC connector") - self.log.debug("initial vca_config: {}".format(vca_config)) + self.log.debug("initial vca_config: {}".format(vca_config.to_dict())) - # TODO - Obtain data from configuration - self._ee_service_port = self._EE_SERVICE_PORT - - self._retry_delay = self._EE_RETRY_DELAY - - if self.vca_config and self.vca_config.get("eegrpcinittimeout"): - self._initial_retry_time = self.vca_config.get("eegrpcinittimeout") - self.log.debug("Initial retry time: {}".format(self._initial_retry_time)) - else: - self._initial_retry_time = self._MAX_INITIAL_RETRY_TIME - self.log.debug( - "Applied default retry time: {}".format(self._initial_retry_time) - ) + self._retry_delay = self.vca_config.helm_ee_retry_delay - if self.vca_config and self.vca_config.get("eegrpctimeout"): - self._max_retry_time = self.vca_config.get("eegrpctimeout") - self.log.debug("Retry time: {}".format(self._max_retry_time)) - else: - self._max_retry_time = self._MAX_RETRY_TIME - self.log.debug( - "Applied default retry time: {}".format(self._max_retry_time) - ) + self._initial_retry_time = self.vca_config.helm_max_initial_retry_time + self.log.debug("Initial retry time: {}".format(self._initial_retry_time)) - if self.vca_config and self.vca_config.get("eegrpc_tls_enforce"): - self._tls_enforce = str( - self.vca_config.get("eegrpc_tls_enforce") - ).lower() in ("true", "1", "yes") - else: - self._tls_enforce = False - self.log.debug("TLS enforce enabled: {}".format(self._tls_enforce)) - - # initialize helm connector for helmv2 and helmv3 - self._k8sclusterhelm2 = K8sHelmConnector( - kubectl_command=self.vca_config.get("kubectlpath"), - helm_command=self.vca_config.get("helmpath"), - fs=self.fs, - db=self.db, - log=self.log, - on_update_db=None, - ) + self._max_retry_time = self.vca_config.helm_max_retry_time + self.log.debug("Retry time: {}".format(self._max_retry_time)) + # initialize helm connector for helmv3 self._k8sclusterhelm3 = K8sHelm3Connector( - kubectl_command=self.vca_config.get("kubectlpath"), - helm_command=self.vca_config.get("helm3path"), + kubectl_command=self.vca_config.kubectlpath, + helm_command=self.vca_config.helm3path, fs=self.fs, log=self.log, db=self.db, @@ -221,11 +170,14 @@ class LCMHelmConn(N2VCConnector, LcmBase): (e.g. stable/openldap, stable/openldap:1.2.4) - a path to a packaged chart (e.g. mychart.tgz) - a path to an unpacked chart directory or a URL (e.g. mychart) - :param str vca_type: Type of vca, must be type helm or helm-v3 + :param str vca_type: Type of vca, must be type helm-v3 :returns str, dict: id of the new execution environment including namespace.helm_id and credentials object set to None as all credentials should be osm kubernetes .kubeconfig """ + if not namespace: + namespace = self.vca_config.kubectl_osm_namespace + self.log.info( "create_execution_environment: namespace: {}, artifact_path: {}, " "chart_model: {}, db_dict: {}, reuse_ee_id: {}".format( @@ -280,36 +232,21 @@ class LCMHelmConn(N2VCConnector, LcmBase): config["global"]["osm"] = config.get("osm") self.log.debug("install helm chart: {}".format(full_path)) - if vca_type == "helm": - helm_id = self._k8sclusterhelm2.generate_kdu_instance_name( - db_dict=db_dict, - kdu_model=kdu_model, - ) - await self._k8sclusterhelm2.install( - system_cluster_uuid, - kdu_model=kdu_model, - kdu_instance=helm_id, - namespace=self._KUBECTL_OSM_NAMESPACE, - params=config, - db_dict=db_dict, - timeout=progress_timeout, - ) - else: - helm_id = self._k8sclusterhelm2.generate_kdu_instance_name( - db_dict=db_dict, - kdu_model=kdu_model, - ) - await self._k8sclusterhelm3.install( - system_cluster_uuid, - kdu_model=kdu_model, - kdu_instance=helm_id, - namespace=self._KUBECTL_OSM_NAMESPACE, - params=config, - db_dict=db_dict, - timeout=progress_timeout, - ) + helm_id = self._k8sclusterhelm3.generate_kdu_instance_name( + db_dict=db_dict, + kdu_model=kdu_model, + ) + await self._k8sclusterhelm3.install( + system_cluster_uuid, + kdu_model=kdu_model, + kdu_instance=helm_id, + namespace=namespace, + params=config, + db_dict=db_dict, + timeout=progress_timeout, + ) - ee_id = "{}:{}.{}".format(vca_type, self._KUBECTL_OSM_NAMESPACE, helm_id) + ee_id = "{}:{}.{}".format(vca_type, namespace, helm_id) return ee_id, None except N2VCException: raise @@ -344,7 +281,7 @@ class LCMHelmConn(N2VCConnector, LcmBase): :param float total_timeout: :param dict config: General variables to instantiate KDU :param str artifact_path: path of package content - :param str vca_type: Type of vca, must be type helm or helm-v3 + :param str vca_type: Type of vca, must be type helm-v3 :returns str, dict: id of the new execution environment including namespace.helm_id and credentials object set to None as all credentials should be osm kubernetes .kubeconfig """ @@ -399,28 +336,16 @@ class LCMHelmConn(N2VCConnector, LcmBase): config["global"]["osm"] = config.get("osm") self.log.debug("Ugrade helm chart: {}".format(full_path)) - if vca_type == "helm": - await self._k8sclusterhelm2.upgrade( - system_cluster_uuid, - kdu_model=full_path, - kdu_instance=helm_id, - namespace=namespace, - params=config, - db_dict=db_dict, - timeout=progress_timeout, - force=True, - ) - else: - await self._k8sclusterhelm3.upgrade( - system_cluster_uuid, - kdu_model=full_path, - kdu_instance=helm_id, - namespace=namespace, - params=config, - db_dict=db_dict, - timeout=progress_timeout, - force=True, - ) + await self._k8sclusterhelm3.upgrade( + system_cluster_uuid, + kdu_model=full_path, + kdu_instance=helm_id, + namespace=namespace, + params=config, + db_dict=db_dict, + timeout=progress_timeout, + force=True, + ) except N2VCException: raise @@ -434,14 +359,14 @@ class LCMHelmConn(N2VCConnector, LcmBase): secret_name: str, usage: str, dns_prefix: str, - namespace: str = _KUBECTL_OSM_NAMESPACE, + namespace: str = None, ): # Obtain system cluster id from database system_cluster_uuid = await self._get_system_cluster_id() # use helm-v3 as certificates don't depend on helm version await self._k8sclusterhelm3.create_certificate( cluster_uuid=system_cluster_uuid, - namespace=namespace, + namespace=namespace or self.vca_config.kubectl_osm_namespace, dns_prefix=dns_prefix, name=nsr_id, secret_name=secret_name, @@ -451,16 +376,47 @@ class LCMHelmConn(N2VCConnector, LcmBase): async def delete_tls_certificate( self, certificate_name: str = None, - namespace: str = _KUBECTL_OSM_NAMESPACE, + namespace: str = None, ): # Obtain system cluster id from database system_cluster_uuid = await self._get_system_cluster_id() await self._k8sclusterhelm3.delete_certificate( cluster_uuid=system_cluster_uuid, - namespace=namespace, + namespace=namespace or self.vca_config.kubectl_osm_namespace, certificate_name=certificate_name, ) + async def setup_ns_namespace( + self, + name: str, + ): + # Obtain system cluster id from database + system_cluster_uuid = await self._get_system_cluster_id() + await self._k8sclusterhelm3.create_namespace( + namespace=name, + cluster_uuid=system_cluster_uuid, + labels={ + "pod-security.kubernetes.io/enforce": self.vca_config.eegrpc_pod_admission_policy + }, + ) + await self._k8sclusterhelm3.setup_default_rbac( + name="ee-role", + namespace=name, + api_groups=[""], + resources=["secrets"], + verbs=["get"], + service_account="default", + cluster_uuid=system_cluster_uuid, + ) + await self._k8sclusterhelm3.copy_secret_data( + src_secret="osm-ca", + dst_secret="osm-ca", + src_namespace=self.vca_config.kubectl_osm_namespace, + dst_namespace=name, + cluster_uuid=system_cluster_uuid, + data_key="ca.crt", + ) + async def register_execution_environment( self, namespace: str, @@ -707,19 +663,13 @@ class LCMHelmConn(N2VCConnector, LcmBase): ) try: - # Obtain cluster_uuid system_cluster_uuid = await self._get_system_cluster_id() # Get helm_id version, namespace, helm_id = get_ee_id_parts(ee_id) - # Uninstall chart, for backward compatibility we must assume that if there is no - # version it is helm-v2 - if version == "helm-v3": - await self._k8sclusterhelm3.uninstall(system_cluster_uuid, helm_id) - else: - await self._k8sclusterhelm2.uninstall(system_cluster_uuid, helm_id) + await self._k8sclusterhelm3.uninstall(system_cluster_uuid, helm_id) self.log.info("ee_id: {} deleted".format(ee_id)) except N2VCException: raise @@ -732,8 +682,12 @@ class LCMHelmConn(N2VCConnector, LcmBase): async def delete_namespace( self, namespace: str, db_dict: dict = None, total_timeout: float = None ): - # method not implemented for this connector, execution environments must be deleted individually - pass + # Obtain system cluster id from database + system_cluster_uuid = await self._get_system_cluster_id() + await self._k8sclusterhelm3.delete_namespace( + namespace=namespace, + cluster_uuid=system_cluster_uuid, + ) async def install_k8s_proxy_charm( self, @@ -806,16 +760,25 @@ class LCMHelmConn(N2VCConnector, LcmBase): else: return "ERROR", "No result received" - ssl_context = create_secure_context(CA_STORE) - channel = Channel(ip_addr, self._ee_service_port, ssl=ssl_context) + ssl_context = create_secure_context( + self.vca_config.ca_store, + self.vca_config.client_cert_path, + self.vca_config.client_key_path, + ) + channel = Channel( + ip_addr, self.vca_config.helm_ee_service_port, ssl=ssl_context + ) try: return await execute() except ssl.SSLError as ssl_error: # fallback to insecure gRPC - if ssl_error.reason == "WRONG_VERSION_NUMBER" and not self._tls_enforce: + if ( + ssl_error.reason == "WRONG_VERSION_NUMBER" + and not self.vca_config.eegrpc_tls_enforce + ): self.log.debug( "Execution environment doesn't support TLS, falling back to unsecure gRPC" ) - channel = Channel(ip_addr, self._ee_service_port) + channel = Channel(ip_addr, self.vca_config.helm_ee_service_port) return await execute() elif ssl_error.reason == "WRONG_VERSION_NUMBER": raise N2VCException( @@ -827,7 +790,6 @@ class LCMHelmConn(N2VCConnector, LcmBase): channel.close() def _write_op_detailed_status(self, db_dict, status, detailed_message): - # write ee_id to database: _admin.deployed.VCA.x try: the_table = db_dict["collection"] @@ -848,7 +810,7 @@ class LCMHelmConn(N2VCConnector, LcmBase): async def _get_system_cluster_id(self): if not self._system_cluster_id: db_k8cluster = self.db.get_one( - "k8sclusters", {"name": self._KUBECTL_OSM_CLUSTER_NAME} + "k8sclusters", {"name": self.vca_config.kubectl_osm_cluster_name} ) k8s_hc_id = deep_get(db_k8cluster, ("_admin", "helm-chart-v3", "id")) if not k8s_hc_id: