blob: 5fe8fc5b86a35235402ae43836341a7ccd0d56c4 [file] [log] [blame]
import logging
import ssl
from base64 import b64decode
from typing import Optional

from .util_kubernetes import get_secret_data
# Module-level logger, registered under the fixed name "osm_ee.util_grpc".
logger = logging.getLogger("osm_ee.util_grpc")

# Names of the secrets passed to get_secret_data(): the first is read for the
# "tls.crt" / "tls.key" entries, the second for the "ca.crt" entry.
SERVER_CERT_SECRET = "ee-tls"
CLIENT_CA_SECRET = "osm-ca"

# Paths where the decoded secret contents are written, and from which the
# SSL context later loads the server certificate chain and the client CA.
SERVER_CERT_FILE = "/etc/ssl/ee-tls.crt"
SERVER_KEY_FILE = "/etc/ssl/ee-tls.key"
CLIENT_CA_FILE = "/etc/ssl/osm-ca.crt"
def create_secure_context() -> Optional[ssl.SSLContext]:
    """Build the TLS server context used to secure the gRPC server.

    The certificate material is first fetched and written to disk by
    ``_retrieve_certs``. When that succeeds, a server-side context is
    created that presents ``SERVER_CERT_FILE`` / ``SERVER_KEY_FILE`` and
    requires every client to present a certificate verifiable against
    ``CLIENT_CA_FILE`` (mutual TLS).

    Returns:
        The configured ``ssl.SSLContext``, or ``None`` when the certificates
        could not be retrieved. In that case a warning is logged and the
        caller has to handle the ``None`` (per the log message, by starting
        the server without TLS).
    """
    # Retrieve certificates from secrets; without them no context can be built.
    if not _retrieve_certs():
        logger.warning("TLS Certificates not found, starting gRPC server in unsecure mode")
        return None

    # Create the SSL context: server side, client certificate mandatory.
    ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
    ctx.verify_mode = ssl.CERT_REQUIRED
    ctx.load_cert_chain(SERVER_CERT_FILE, SERVER_KEY_FILE)
    ctx.load_verify_locations(CLIENT_CA_FILE)

    # Restrict to forward-secret AEAD cipher suites.
    ctx.set_ciphers('ECDHE+AESGCM:ECDHE+CHACHA20:DHE+AESGCM:DHE+CHACHA20')

    # Advertise HTTP/2 ("h2") via ALPN.
    ctx.set_alpn_protocols(['h2'])

    # NPN is a best-effort extra: it is deprecated in favour of ALPN, may be
    # unsupported by the linked OpenSSL (NotImplementedError), and the method
    # itself may be absent on interpreters that drop it (AttributeError).
    try:
        ctx.set_npn_protocols(['h2'])
    except (NotImplementedError, AttributeError):
        pass

    return ctx
def _retrieve_certs() -> bool:
    """Fetch the TLS material from the secrets and write it to disk.

    Reads the ``tls.crt`` and ``tls.key`` entries of ``SERVER_CERT_SECRET``
    and the ``ca.crt`` entry of ``CLIENT_CA_SECRET``, base64-decodes them and
    writes the results to ``SERVER_CERT_FILE``, ``SERVER_KEY_FILE`` and
    ``CLIENT_CA_FILE`` respectively.

    Returns:
        ``True`` when all three files were written; ``False`` when either
        secret or any of the three entries is missing or empty. Nothing is
        written in the ``False`` case.

    Raises:
        binascii.Error / UnicodeDecodeError: if an entry is present but is
            not valid base64-encoded text. This is deliberately not turned
            into ``False`` so that malformed certificates are not mistaken
            for "certificates not found".
    """
    server_data = get_secret_data(SERVER_CERT_SECRET)
    ca_data = get_secret_data(CLIENT_CA_SECRET)
    # Check both secrets before calling .get() on either of them, so that a
    # missing secret is reported as "not found" instead of raising.
    if not (server_data and ca_data):
        return False

    encoded_by_path = (
        (SERVER_CERT_FILE, server_data.get("tls.crt")),
        (SERVER_KEY_FILE, server_data.get("tls.key")),
        (CLIENT_CA_FILE, ca_data.get("ca.crt")),
    )
    # A secret lacking one of the expected entries is treated like a missing
    # secret rather than letting b64decode(None) raise TypeError.
    if not all(encoded for _, encoded in encoded_by_path):
        return False

    # Decode everything up front so a malformed entry cannot leave a
    # half-written set of files behind.
    decoded_by_path = [
        (path, b64decode(encoded).decode())
        for path, encoded in encoded_by_path
    ]

    # NOTE(review): files (including the private key) are created with the
    # process default permissions — confirm this is acceptable for the
    # deployment.
    for path, text in decoded_by_path:
        with open(path, "w") as out_file:
            out_file.write(text)
    return True