+
+ @retry(
+ attempts=10,
+ delay=1,
+ fallback=Exception("Failed getting data from the secret"),
+ )
+ async def get_secret_content(
+ self,
+ name: str,
+ namespace: str,
+ ) -> dict:
+ """
+ Get secret data
+
+ :param: name: Name of the secret
+ :param: namespace: Name of the namespace where the secret is stored
+
+ :return: Dictionary with secret's data
+ """
+ v1_core = self.clients[CORE_CLIENT]
+
+ secret = v1_core.read_namespaced_secret(name, namespace)
+
+ return secret.data
+
+ @retry(
+ attempts=10,
+ delay=1,
+ fallback=Exception("Failed creating the secret"),
+ )
+ async def create_secret(
+ self, name: str, data: dict, namespace: str, secret_type: str
+ ):
+ """
+ Get secret data
+
+ :param: name: Name of the secret
+ :param: data: Dict with data content. Values must be already base64 encoded
+ :param: namespace: Name of the namespace where the secret will be stored
+ :param: secret_type: Type of the secret, e.g., Opaque, kubernetes.io/service-account-token, kubernetes.io/tls
+
+ :return: None
+ """
+ v1_core = self.clients[CORE_CLIENT]
+ metadata = V1ObjectMeta(name=name, namespace=namespace)
+ secret = V1Secret(metadata=metadata, data=data, type=secret_type)
+ v1_core.create_namespaced_secret(namespace, secret)
+
+ async def create_certificate(
+ self,
+ namespace: str,
+ name: str,
+ dns_prefix: str,
+ secret_name: str,
+ usages: list,
+ issuer_name: str,
+ ):
+ """
+ Creates cert-manager certificate object
+
+ :param: namespace: Name of the namespace where the certificate and secret is stored
+ :param: name: Name of the certificate object
+ :param: dns_prefix: Prefix for the dnsNames. They will be prefixed to the common k8s svc suffixes
+ :param: secret_name: Name of the secret created by cert-manager
+ :param: usages: List of X.509 key usages
+ :param: issuer_name: Name of the cert-manager's Issuer or ClusterIssuer object
+
+ """
+ certificate_body = {
+ "apiVersion": "cert-manager.io/v1",
+ "kind": "Certificate",
+ "metadata": {"name": name, "namespace": namespace},
+ "spec": {
+ "secretName": secret_name,
+ "privateKey": {
+ "rotationPolicy": "Always",
+ "algorithm": "ECDSA",
+ "size": 256,
+ },
+ "duration": "8760h", # 1 Year
+ "renewBefore": "2208h", # 9 months
+ "subject": {"organizations": ["osm"]},
+ "commonName": "osm",
+ "isCA": False,
+ "usages": usages,
+ "dnsNames": [
+ "{}.{}".format(dns_prefix, namespace),
+ "{}.{}.svc".format(dns_prefix, namespace),
+ "{}.{}.svc.cluster".format(dns_prefix, namespace),
+ "{}.{}.svc.cluster.local".format(dns_prefix, namespace),
+ ],
+ "issuerRef": {"name": issuer_name, "kind": "ClusterIssuer"},
+ },
+ }
+ client = self.clients[CUSTOM_OBJECT_CLIENT]
+ try:
+ client.create_namespaced_custom_object(
+ group="cert-manager.io",
+ plural="certificates",
+ version="v1",
+ body=certificate_body,
+ namespace=namespace,
+ )
+ except ApiException as e:
+ info = json.loads(e.body)
+ if info.get("reason").lower() == "alreadyexists":
+ self.logger.warning("Certificate already exists: {}".format(e))
+ else:
+ raise e
+
+ async def delete_certificate(self, namespace, object_name):
+ client = self.clients[CUSTOM_OBJECT_CLIENT]
+ try:
+ client.delete_namespaced_custom_object(
+ group="cert-manager.io",
+ plural="certificates",
+ version="v1",
+ name=object_name,
+ namespace=namespace,
+ )
+ except ApiException as e:
+ info = json.loads(e.body)
+ if info.get("reason").lower() == "notfound":
+ self.logger.warning("Certificate already deleted: {}".format(e))
+ else:
+ raise e
+
+ @retry(
+ attempts=10,
+ delay=1,
+ fallback=Exception("Failed creating the namespace"),
+ )
+ async def create_namespace(self, name: str, labels: dict = None):
+ """
+ Create a namespace
+
+ :param: name: Name of the namespace to be created
+ :param: labels: Dictionary with labels for the new namespace
+
+ """
+ v1_core = self.clients[CORE_CLIENT]
+ metadata = V1ObjectMeta(name=name, labels=labels)
+ namespace = V1Namespace(
+ metadata=metadata,
+ )
+
+ try:
+ v1_core.create_namespace(namespace)
+ self.logger.debug("Namespace created: {}".format(name))
+ except ApiException as e:
+ info = json.loads(e.body)
+ if info.get("reason").lower() == "alreadyexists":
+ self.logger.warning("Namespace already exists: {}".format(e))
+ else:
+ raise e
+
+ @retry(
+ attempts=10,
+ delay=1,
+ fallback=Exception("Failed deleting the namespace"),
+ )
+ async def delete_namespace(self, name: str):
+ """
+ Delete a namespace
+
+ :param: name: Name of the namespace to be deleted
+
+ """
+ try:
+ self.clients[CORE_CLIENT].delete_namespace(name)
+ except ApiException as e:
+ if e.reason == "Not Found":
+ self.logger.warning("Namespace already deleted: {}".format(e))