&& python3 -m pip install --upgrade pip \
&& python3 -m pip install -U grpcio-tools \
&& python3 -m pip install -U grpclib \
- && python3 -m pip install -U PyYAML
+ && python3 -m pip install -U PyYAML \
+ && python3 -m pip install -U kubernetes
# Libraries used by the vnf: asyncssh, ansible
#RUN apt-get update && apt-get install software-properties-common \
+from base64 import b64decode
import logging
import ssl
+from .util_kubernetes import get_secret_data
logger = logging.getLogger("osm_ee.util_grpc")
-
-SERVER_CERT = "/etc/ssl/grpc-tls/tls.crt"
-SERVER_KEY = "/etc/ssl/grpc-tls/tls.key"
+SERVER_CERT_SECRET = "ee-tls"
+CLIENT_CA_SECRET = "osm-ca"
+SERVER_CERT_FILE = "/etc/ssl/ee-tls.crt"
+SERVER_KEY_FILE = "/etc/ssl/ee-tls.key"
+CLIENT_CA_FILE = "/etc/ssl/osm-ca.crt"
def create_secure_context() -> ssl.SSLContext:
- ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
- # ctx.verify_mode = ssl.CERT_REQUIRED
- try:
- ctx.load_cert_chain(str(SERVER_CERT), str(SERVER_KEY))
- except FileNotFoundError:
- logger.warning("TLS Certificate not found, starting gRPC server in unsecure mode")
+ # retrieve certificates from secrets
+ if not _retrieve_certs():
+ logger.warning("TLS Certificates not found, starting gRPC server in unsecure mode")
return None
- # TODO: client TLS
- # ctx.load_verify_locations(str(trusted))
+ # create SSL context
+ ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
+ ctx.verify_mode = ssl.CERT_REQUIRED
+ ctx.load_cert_chain(SERVER_CERT_FILE, SERVER_KEY_FILE)
+ ctx.load_verify_locations(CLIENT_CA_FILE)
ctx.set_ciphers('ECDHE+AESGCM:ECDHE+CHACHA20:DHE+AESGCM:DHE+CHACHA20')
ctx.set_alpn_protocols(['h2'])
try:
ctx.set_npn_protocols(['h2'])
except NotImplementedError:
pass
- return ctx
\ No newline at end of file
+ return ctx
+
+
+def _retrieve_certs():
+ _server_data = get_secret_data(SERVER_CERT_SECRET)
+ lcm_ca = get_secret_data(CLIENT_CA_SECRET).get("ca.crt")
+ if not (_server_data and lcm_ca):
+ return False
+ server_cert = _server_data.get("tls.crt")
+ with open(SERVER_CERT_FILE, "w") as server_cert_file:
+ server_cert_file.write(b64decode(server_cert).decode())
+ server_key = _server_data.get("tls.key")
+ with open(SERVER_KEY_FILE, "w") as server_key_file:
+ server_key_file.write(b64decode(server_key).decode())
+ with open(CLIENT_CA_FILE, "w") as client_ca_file:
+ client_ca_file.write(b64decode(lcm_ca).decode())
+ return True
--- /dev/null
+from kubernetes import client, config
+from kubernetes.client.rest import ApiException
+
+
+def get_secret_data(name) -> dict:
+ # assume that we are executing in a kubernetes pod
+ try:
+ config.load_incluster_config()
+ except config.ConfigException:
+ # we are not running in kubernetes
+ return {}
+ # Read the namespace from the service account
+ current_namespace = open("/var/run/secrets/kubernetes.io/serviceaccount/namespace").read()
+
+ v1 = client.CoreV1Api()
+ try:
+ secret = v1.read_namespaced_secret(name, current_namespace)
+ except ApiException as e:
+ if e.reason == 'Not Found': # Backwards compatibility: we run in k8s but certs don't exist
+ return {}
+ else:
+ raise
+ return secret.data