X-Git-Url: https://osm.etsi.org/gitweb/?a=blobdiff_plain;f=osm_lcm%2Flcm_helm_conn.py;h=d1ad4c597b624e2a5453a13dfbc87dd61f2a8451;hb=87f5f03155d092c22f2bdf7303f10abf06f42531;hp=0bd5c0f2aec44df7006ebbd94be0dc12c7216fa4;hpb=e539a8d7d65be857fc64afa593893e6e6b0b52c0;p=osm%2FLCM.git

diff --git a/osm_lcm/lcm_helm_conn.py b/osm_lcm/lcm_helm_conn.py
index 0bd5c0f..d1ad4c5 100644
--- a/osm_lcm/lcm_helm_conn.py
+++ b/osm_lcm/lcm_helm_conn.py
@@ -18,12 +18,13 @@ import functools
 import yaml
 import asyncio
-import socket
 import uuid
 import os
+import ssl
 
 from grpclib.client import Channel
 
+from osm_lcm.data_utils.lcm_config import VcaConfig
 from osm_lcm.frontend_pb2 import PrimitiveRequest
 from osm_lcm.frontend_pb2 import SshKeyRequest, SshKeyReply
 from osm_lcm.frontend_grpc import FrontendExecutorStub
@@ -46,7 +47,7 @@ from osm_lcm.lcm_utils import deep_get
 
 def retryer(max_wait_time_var="_initial_retry_time", delay_time_var="_retry_delay"):
     def wrapper(func):
-        retry_exceptions = ConnectionRefusedError
+        retry_exceptions = (ConnectionRefusedError, TimeoutError)
 
         @functools.wraps(func)
         async def wrapped(*args, **kwargs):
@@ -77,23 +78,31 @@ def retryer(max_wait_time_var="_initial_retry_time", delay_time_var="_retry_dela
     return wrapper
 
 
-class LCMHelmConn(N2VCConnector, LcmBase):
-    _KUBECTL_OSM_NAMESPACE = "osm"
-    _KUBECTL_OSM_CLUSTER_NAME = "_system-osm-k8s"
-    _EE_SERVICE_PORT = 50050
+def create_secure_context(
+    trusted: str,
+) -> ssl.SSLContext:
+    ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
+    ctx.verify_mode = ssl.CERT_REQUIRED
+    ctx.check_hostname = True
+    ctx.minimum_version = ssl.TLSVersion.TLSv1_2
+    # TODO: client TLS
+    # ctx.load_cert_chain(str(client_cert), str(client_key))
+    ctx.load_verify_locations(trusted)
+    ctx.set_ciphers("ECDHE+AESGCM:ECDHE+CHACHA20:DHE+AESGCM:DHE+CHACHA20")
+    ctx.set_alpn_protocols(["h2"])
+    try:
+        ctx.set_npn_protocols(["h2"])
+    except NotImplementedError:
+        pass
+    return ctx
 
-    # Initial max retry time
-    _MAX_INITIAL_RETRY_TIME = 600
-    # Max retry time for normal operations
-    _MAX_RETRY_TIME = 30
-    # Time beetween retries, retry time after a connection error is raised
-    _EE_RETRY_DELAY = 10
 
+class LCMHelmConn(N2VCConnector, LcmBase):
     def __init__(
         self,
         log: object = None,
         loop: object = None,
-        vca_config: dict = None,
+        vca_config: VcaConfig = None,
         on_update_db=None,
     ):
         """
@@ -110,35 +119,20 @@ class LCMHelmConn(N2VCConnector, LcmBase):
         self.vca_config = vca_config
         self.log.debug("Initialize helm N2VC connector")
-        self.log.debug("initial vca_config: {}".format(vca_config))
+        self.log.debug("initial vca_config: {}".format(vca_config.to_dict()))
 
-        # TODO - Obtain data from configuration
-        self._ee_service_port = self._EE_SERVICE_PORT
+        self._retry_delay = self.vca_config.helm_ee_retry_delay
 
-        self._retry_delay = self._EE_RETRY_DELAY
+        self._initial_retry_time = self.vca_config.helm_max_initial_retry_time
+        self.log.debug("Initial retry time: {}".format(self._initial_retry_time))
 
-        if self.vca_config and self.vca_config.get("eegrpcinittimeout"):
-            self._initial_retry_time = self.vca_config.get("eegrpcinittimeout")
-            self.log.debug("Initial retry time: {}".format(self._initial_retry_time))
-        else:
-            self._initial_retry_time = self._MAX_INITIAL_RETRY_TIME
-            self.log.debug(
-                "Applied default retry time: {}".format(self._initial_retry_time)
-            )
-
-        if self.vca_config and self.vca_config.get("eegrpctimeout"):
-            self._max_retry_time = self.vca_config.get("eegrpctimeout")
-            self.log.debug("Retry time: {}".format(self._max_retry_time))
-        else:
-            self._max_retry_time = self._MAX_RETRY_TIME
-            self.log.debug(
-                "Applied default retry time: {}".format(self._max_retry_time)
-            )
+        self._max_retry_time = self.vca_config.helm_max_retry_time
+        self.log.debug("Retry time: {}".format(self._max_retry_time))
 
         # initialize helm connector for helmv2 and helmv3
         self._k8sclusterhelm2 = K8sHelmConnector(
-            kubectl_command=self.vca_config.get("kubectlpath"),
-            helm_command=self.vca_config.get("helmpath"),
+            kubectl_command=self.vca_config.kubectlpath,
+            helm_command=self.vca_config.helmpath,
             fs=self.fs,
             db=self.db,
             log=self.log,
@@ -146,8 +140,8 @@ class LCMHelmConn(N2VCConnector, LcmBase):
         )
 
         self._k8sclusterhelm3 = K8sHelm3Connector(
-            kubectl_command=self.vca_config.get("kubectlpath"),
-            helm_command=self.vca_config.get("helm3path"),
+            kubectl_command=self.vca_config.kubectlpath,
+            helm_command=self.vca_config.helm3path,
             fs=self.fs,
             log=self.log,
             db=self.db,
@@ -260,7 +254,7 @@ class LCMHelmConn(N2VCConnector, LcmBase):
                     system_cluster_uuid,
                     kdu_model=kdu_model,
                     kdu_instance=helm_id,
-                    namespace=self._KUBECTL_OSM_NAMESPACE,
+                    namespace=self.vca_config.kubectl_osm_namespace,
                     params=config,
                     db_dict=db_dict,
                     timeout=progress_timeout,
@@ -274,13 +268,15 @@ class LCMHelmConn(N2VCConnector, LcmBase):
                     system_cluster_uuid,
                     kdu_model=kdu_model,
                     kdu_instance=helm_id,
-                    namespace=self._KUBECTL_OSM_NAMESPACE,
+                    namespace=self.vca_config.kubectl_osm_namespace,
                     params=config,
                     db_dict=db_dict,
                     timeout=progress_timeout,
                 )
 
-            ee_id = "{}:{}.{}".format(vca_type, self._KUBECTL_OSM_NAMESPACE, helm_id)
+            ee_id = "{}:{}.{}".format(
+                vca_type, self.vca_config.kubectl_osm_namespace, helm_id
+            )
             return ee_id, None
         except N2VCException:
             raise
@@ -399,6 +395,39 @@ class LCMHelmConn(N2VCConnector, LcmBase):
             self.log.error("Error upgrading chart ee: {}".format(e), exc_info=True)
             raise N2VCException("Error upgrading chart ee: {}".format(e))
 
+    async def create_tls_certificate(
+        self,
+        nsr_id: str,
+        secret_name: str,
+        usage: str,
+        dns_prefix: str,
+        namespace: str = None,
+    ):
+        # Obtain system cluster id from database
+        system_cluster_uuid = await self._get_system_cluster_id()
+        # use helm-v3 as certificates don't depend on helm version
+        await self._k8sclusterhelm3.create_certificate(
+            cluster_uuid=system_cluster_uuid,
+            namespace=namespace or self.vca_config.kubectl_osm_namespace,
+            dns_prefix=dns_prefix,
+            name=nsr_id,
+            secret_name=secret_name,
+            usage=usage,
+        )
+
+    async def delete_tls_certificate(
+        self,
+        certificate_name: str = None,
+        namespace: str = None,
+    ):
+        # Obtain system cluster id from database
+        system_cluster_uuid = await self._get_system_cluster_id()
+        await self._k8sclusterhelm3.delete_certificate(
+            cluster_uuid=system_cluster_uuid,
+            namespace=namespace or self.vca_config.kubectl_osm_namespace,
+            certificate_name=certificate_name,
+        )
+
     async def register_execution_environment(
         self,
         namespace: str,
@@ -460,8 +489,7 @@ class LCMHelmConn(N2VCConnector, LcmBase):
         try:
             # Obtain ip_addr for the ee service, it is resolved by dns from the ee name by kubernetes
             version, namespace, helm_id = get_ee_id_parts(ee_id)
-            ip_addr = socket.gethostbyname(helm_id)
-
+            ip_addr = "{}.{}.svc".format(helm_id, namespace)
             # Obtain ssh_key from the ee, this method will implement retries to allow the ee
             # install libraries and start successfully
             ssh_key = await self._get_ssh_key(ip_addr)
@@ -545,7 +573,7 @@ class LCMHelmConn(N2VCConnector, LcmBase):
 
         try:
             version, namespace, helm_id = get_ee_id_parts(ee_id)
-            ip_addr = socket.gethostbyname(helm_id)
+            ip_addr = "{}.{}.svc".format(helm_id, namespace)
         except Exception as e:
             self.log.error("Error getting ee ip ee: {}".format(e))
             raise N2VCException("Error getting ee ip ee: {}".format(e))
@@ -646,7 +674,6 @@
         )
 
         try:
-
             # Obtain cluster_uuid
             system_cluster_uuid = await self._get_system_cluster_id()
 
@@ -690,14 +717,11 @@
 
     @retryer(max_wait_time_var="_initial_retry_time", delay_time_var="_retry_delay")
     async def _get_ssh_key(self, ip_addr):
-        channel = Channel(ip_addr, self._ee_service_port)
-        try:
-            stub = FrontendExecutorStub(channel)
-            self.log.debug("get ssh key, ip_addr: {}".format(ip_addr))
-            reply: SshKeyReply = await stub.GetSshKey(SshKeyRequest())
-            return reply.message
-        finally:
-            channel.close()
+        return await self._execute_primitive_internal(
+            ip_addr,
+            "_get_ssh_key",
+            None,
+        )
 
     @retryer(max_wait_time_var="_initial_retry_time", delay_time_var="_retry_delay")
     async def _execute_config_primitive(self, ip_addr, params, db_dict=None):
@@ -714,10 +738,13 @@
     async def _execute_primitive_internal(
         self, ip_addr, primitive_name, params, db_dict=None
     ):
-
-        channel = Channel(ip_addr, self._ee_service_port)
-        try:
+        async def execute():
             stub = FrontendExecutorStub(channel)
+            if primitive_name == "_get_ssh_key":
+                self.log.debug("get ssh key, ip_addr: {}".format(ip_addr))
+                reply: SshKeyReply = await stub.GetSshKey(SshKeyRequest())
+                return reply.message
+            # For any other primitives
             async with stub.RunPrimitive.open() as stream:
                 primitive_id = str(uuid.uuid1())
                 result = None
@@ -744,11 +771,33 @@
                         return reply.status, reply.detailed_message
                     else:
                         return "ERROR", "No result received"
+
+        ssl_context = create_secure_context(self.vca_config.ca_store)
+        channel = Channel(
+            ip_addr, self.vca_config.helm_ee_service_port, ssl=ssl_context
+        )
+        try:
+            return await execute()
+        except ssl.SSLError as ssl_error:  # fallback to insecure gRPC
+            if (
+                ssl_error.reason == "WRONG_VERSION_NUMBER"
+                and not self.vca_config.eegrpc_tls_enforce
+            ):
+                self.log.debug(
+                    "Execution environment doesn't support TLS, falling back to insecure gRPC"
+                )
+                channel = Channel(ip_addr, self.vca_config.helm_ee_service_port)
+                return await execute()
+            elif ssl_error.reason == "WRONG_VERSION_NUMBER":
+                raise N2VCException(
+                    "Execution environment doesn't support TLS, primitives cannot be executed"
+                )
+            else:
+                raise
         finally:
             channel.close()
 
     def _write_op_detailed_status(self, db_dict, status, detailed_message):
-
         # write ee_id to database: _admin.deployed.VCA.x
         try:
             the_table = db_dict["collection"]
@@ -769,7 +818,7 @@
     async def _get_system_cluster_id(self):
         if not self._system_cluster_id:
             db_k8cluster = self.db.get_one(
-                "k8sclusters", {"name": self._KUBECTL_OSM_CLUSTER_NAME}
+                "k8sclusters", {"name": self.vca_config.kubectl_osm_cluster_name}
             )
             k8s_hc_id = deep_get(db_k8cluster, ("_admin", "helm-chart-v3", "id"))
             if not k8s_hc_id:
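The heart of this change is the TLS-first connection logic in _execute_primitive_internal: the channel now targets the cluster-internal DNS name <helm_id>.<namespace>.svc instead of resolving an IP with socket.gethostbyname, it handshakes with a verified TLS 1.2+ context, and it retries once in plaintext only when the peer clearly answered without TLS (SSLError reason WRONG_VERSION_NUMBER) and eegrpc_tls_enforce is off. The sketch below isolates that pattern outside the class. It is a minimal illustration, not the patch itself: EE_HOST, EE_PORT, make_client_context, call_ee and do_rpc are hypothetical names, and only grpclib is assumed installed.

    import ssl

    from grpclib.client import Channel

    # Hypothetical endpoint; in the patch the host is "<helm_id>.<namespace>.svc"
    # and the port comes from vca_config.helm_ee_service_port.
    EE_HOST = "example-ee.osm.svc"
    EE_PORT = 50050


    def make_client_context(ca_bundle: str) -> ssl.SSLContext:
        # Mirrors create_secure_context(): TLS 1.2+, mandatory server-certificate
        # verification against the OSM CA store, HTTP/2 negotiated via ALPN.
        ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
        ctx.verify_mode = ssl.CERT_REQUIRED
        ctx.check_hostname = True
        ctx.minimum_version = ssl.TLSVersion.TLSv1_2
        ctx.load_verify_locations(ca_bundle)
        ctx.set_alpn_protocols(["h2"])
        return ctx


    async def call_ee(do_rpc, ca_bundle: str, tls_enforce: bool):
        # grpclib connects lazily, so a TLS handshake failure surfaces from the
        # first RPC (inside do_rpc), not from the Channel constructor.
        channel = Channel(EE_HOST, EE_PORT, ssl=make_client_context(ca_bundle))
        try:
            return await do_rpc(channel)
        except ssl.SSLError as err:
            # WRONG_VERSION_NUMBER means the peer answered in plaintext: an
            # execution environment whose gRPC server predates TLS support.
            if err.reason != "WRONG_VERSION_NUMBER" or tls_enforce:
                raise
            channel = Channel(EE_HOST, EE_PORT)  # retry once without TLS
            return await do_rpc(channel)
        finally:
            channel.close()

Here do_rpc stands for any coroutine that issues a call on the channel, for example one that wraps it in FrontendExecutorStub and awaits GetSshKey(SshKeyRequest()). The fallback keeps day-2 primitives working against execution environments built before the gRPC server gained TLS, while eegrpc_tls_enforce lets operators refuse plaintext entirely.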
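A second, smaller behavioral change is in the retryer decorator: retry_exceptions becomes the tuple (ConnectionRefusedError, TimeoutError), because a handshake against a not-yet-ready execution environment can now time out instead of being refused outright. The diff only shows the decorator's signature and that tuple, so the loop below is an illustrative reconstruction with fixed arguments; the real retryer reads its deadline and delay from the instance attributes named by max_wait_time_var and delay_time_var.

    import asyncio
    import functools


    def retryer(max_wait_time: float = 600, delay: float = 10):
        def wrapper(func):
            # The patch widens this from ConnectionRefusedError alone.
            retry_exceptions = (ConnectionRefusedError, TimeoutError)

            @functools.wraps(func)
            async def wrapped(*args, **kwargs):
                remaining = max_wait_time
                while True:
                    try:
                        return await func(*args, **kwargs)
                    except retry_exceptions:
                        if remaining <= 0:
                            raise  # budget exhausted, surface the error
                        await asyncio.sleep(delay)
                        remaining -= delay

            return wrapped

        return wrapper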