+#######################################################################################
# Copyright 2020 Canonical Ltd.
+# Copyright ETSI Contributors and Others.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
+#######################################################################################
import base64
import logging
import typing
import uuid
import json
+import tarfile
+import io
from distutils.version import LooseVersion
V1Secret,
V1SecretReference,
V1Namespace,
+ V1PersistentVolumeClaim,
+ V1PersistentVolumeClaimSpec,
+ V1PersistentVolumeClaimVolumeSource,
+ V1ResourceRequirements,
+ V1Pod,
+ V1PodSpec,
+ V1Volume,
+ V1VolumeMount,
+ V1Container,
)
from kubernetes.client.rest import ApiException
+from kubernetes.stream import stream
from n2vc.libjuju import retry_callback
from retrying_async import retry
-
SERVICE_ACCOUNT_TOKEN_KEY = "token"
SERVICE_ACCOUNT_ROOT_CA_KEY = "ca.crt"
# clients
CUSTOM_OBJECT_CLIENT: client.CustomObjectsApi(),
}
self._configuration = config.kube_config.Configuration.get_default_copy()
- self.logger = logging.getLogger("Kubectl")
+ self.logger = logging.getLogger("lcm.kubectl")
@property
def configuration(self):
"name": i.metadata.name,
"cluster_ip": i.spec.cluster_ip,
"type": i.spec.type,
- "ports": [
- {
- "name": p.name,
- "node_port": p.node_port,
- "port": p.port,
- "protocol": p.protocol,
- "target_port": p.target_port,
- }
- for p in i.spec.ports
- ]
- if i.spec.ports
- else [],
+ "ports": (
+ [
+ {
+ "name": p.name,
+ "node_port": p.node_port,
+ "port": p.port,
+ "protocol": p.protocol,
+ "target_port": p.target_port,
+ }
+ for p in i.spec.ports
+ ]
+ if i.spec.ports
+ else []
+ ),
"external_ip": [i.ip for i in i.status.load_balancer.ingress]
if i.status.load_balancer.ingress
else None,
return "{}-token-{}".format(service_account_name, random_alphanum)
def _create_service_account_secret(
- self, service_account_name: str, namespace: str, secret_name: str
+ self,
+ service_account_name: str,
+ namespace: str,
+ secret_name: str,
):
"""
Create a secret for the service account. K8s version >= 1.24
service_account = V1ServiceAccount(metadata=metadata)
v1_core.create_namespaced_service_account(namespace, service_account)
+ def delete_secret(self, name: str, namespace: str = "kube-system"):
+ """
+ Delete a secret
+
+ :param: name: Name of the secret
+ :param: namespace: Kubernetes namespace
+ Default: kube-system
+ """
+ self.clients[CORE_CLIENT].delete_namespaced_secret(name, namespace)
+
def delete_service_account(self, name: str, namespace: str = "kube-system"):
"""
Delete a service account
self, name: str, data: dict, namespace: str, secret_type: str
):
"""
- Get secret data
+ Create secret with data
:param: name: Name of the secret
:param: data: Dict with data content. Values must be already base64 encoded
:return: None
"""
+ self.logger.info("Enter create_secret function")
v1_core = self.clients[CORE_CLIENT]
+ self.logger.info(f"v1_core: {v1_core}")
metadata = V1ObjectMeta(name=name, namespace=namespace)
+ self.logger.info(f"metadata: {metadata}")
secret = V1Secret(metadata=metadata, data=data, type=secret_type)
+ self.logger.info(f"secret: {secret}")
v1_core.create_namespaced_secret(namespace, secret)
+ self.logger.info("Namespaced secret was created")
async def create_certificate(
self,
except ApiException as e:
if e.reason == "Not Found":
self.logger.warning("Namespace already deleted: {}".format(e))
+
+ def get_secrets(
+ self,
+ namespace: str,
+ field_selector: str = None,
+ ) -> typing.List[typing.Dict]:
+ """
+ Get Secret list from a namespace
+
+ :param: namespace: Kubernetes namespace
+ :param: field_selector: Kubernetes field selector
+
+ :return: List of the secrets matching the selectors specified
+ """
+ try:
+ v1_core = self.clients[CORE_CLIENT]
+ secrets = v1_core.list_namespaced_secret(
+ namespace=namespace,
+ field_selector=field_selector,
+ ).items
+ return secrets
+ except ApiException as e:
+ self.logger.error("Error calling get secrets: {}".format(e))
+ raise e
+
+ async def create_generic_object(
+ self,
+ api_group: str,
+ api_plural: str,
+ api_version: str,
+ namespace: str,
+ manifest_dict: dict,
+ ):
+ """
+ Creates generic object
+
+ :param: api_group: API Group
+ :param: api_plural: API Plural
+ :param: api_version: API Version
+ :param: namespace: Namespace
+ :param: manifest_dict: Dictionary with the content of the Kubernetes manifest
+
+ """
+ client = self.clients[CUSTOM_OBJECT_CLIENT]
+ try:
+ client.create_namespaced_custom_object(
+ group=api_group,
+ plural=api_plural,
+ version=api_version,
+ body=manifest_dict,
+ namespace=namespace,
+ )
+ except ApiException as e:
+ info = json.loads(e.body)
+ if info.get("reason").lower() == "alreadyexists":
+ self.logger.warning("Object already exists: {}".format(e))
+ else:
+ raise e
+
+ async def delete_generic_object(
+ self,
+ api_group: str,
+ api_plural: str,
+ api_version: str,
+ namespace: str,
+ name: str,
+ ):
+ """
+ Deletes generic object
+
+ :param: api_group: API Group
+ :param: api_plural: API Plural
+ :param: api_version: API Version
+ :param: namespace: Namespace
+ :param: name: Name of the object
+
+ """
+ client = self.clients[CUSTOM_OBJECT_CLIENT]
+ try:
+ client.delete_namespaced_custom_object(
+ group=api_group,
+ plural=api_plural,
+ version=api_version,
+ name=name,
+ namespace=namespace,
+ )
+ except ApiException as e:
+ info = json.loads(e.body)
+ if info.get("reason").lower() == "notfound":
+ self.logger.warning("Object already deleted: {}".format(e))
+ else:
+ raise e
+
+ async def get_generic_object(
+ self,
+ api_group: str,
+ api_plural: str,
+ api_version: str,
+ namespace: str,
+ name: str,
+ ):
+ """
+ Gets generic object
+
+ :param: api_group: API Group
+ :param: api_plural: API Plural
+ :param: api_version: API Version
+ :param: namespace: Namespace
+ :param: name: Name of the object
+
+ """
+ client = self.clients[CUSTOM_OBJECT_CLIENT]
+ try:
+ object_dict = client.list_namespaced_custom_object(
+ group=api_group,
+ plural=api_plural,
+ version=api_version,
+ namespace=namespace,
+ field_selector=f"metadata.name={name}",
+ )
+ if len(object_dict.get("items")) == 0:
+ return None
+ return object_dict.get("items")[0]
+ except ApiException as e:
+ info = json.loads(e.body)
+ if info.get("reason").lower() == "notfound":
+ self.logger.warning("Cannot get custom object: {}".format(e))
+ else:
+ raise e
+
+ async def list_generic_object(
+ self,
+ api_group: str,
+ api_plural: str,
+ api_version: str,
+ namespace: str,
+ ):
+ """
+ Lists all generic objects of the requested API group
+
+ :param: api_group: API Group
+ :param: api_plural: API Plural
+ :param: api_version: API Version
+ :param: namespace: Namespace
+
+ """
+ client = self.clients[CUSTOM_OBJECT_CLIENT]
+ try:
+ object_dict = client.list_namespaced_custom_object(
+ group=api_group,
+ plural=api_plural,
+ version=api_version,
+ namespace=namespace,
+ )
+ self.logger.debug(f"Object-list: {object_dict.get('items')}")
+ return object_dict.get("items")
+ except ApiException as e:
+ info = json.loads(e.body)
+ if info.get("reason").lower() == "notfound":
+ self.logger.warning(
+ "Cannot retrieve list of custom objects: {}".format(e)
+ )
+ else:
+ raise e
+
+ @retry(
+ attempts=10,
+ delay=1,
+ fallback=Exception("Failed creating the secret"),
+ )
+ async def create_secret_string(
+ self, name: str, string_data: str, namespace: str, secret_type: str
+ ):
+ """
+ Create secret with data
+
+ :param: name: Name of the secret
+ :param: string_data: String with data content
+ :param: namespace: Name of the namespace where the secret will be stored
+ :param: secret_type: Type of the secret, e.g., Opaque, kubernetes.io/service-account-token, kubernetes.io/tls
+
+ :return: None
+ """
+ v1_core = self.clients[CORE_CLIENT]
+ metadata = V1ObjectMeta(name=name, namespace=namespace)
+ secret = V1Secret(metadata=metadata, string_data=string_data, type=secret_type)
+ v1_core.create_namespaced_secret(namespace, secret)
+
+ @retry(
+ attempts=10,
+ delay=1,
+ fallback=Exception("Failed creating the pvc"),
+ )
+ async def create_pvc(self, name: str, namespace: str):
+ """
+ Create a namespace
+
+ :param: name: Name of the pvc to be created
+ :param: namespace: Name of the namespace where the pvc will be stored
+
+ """
+ try:
+ pvc = V1PersistentVolumeClaim(
+ api_version="v1",
+ kind="PersistentVolumeClaim",
+ metadata=V1ObjectMeta(name=name),
+ spec=V1PersistentVolumeClaimSpec(
+ access_modes=["ReadWriteOnce"],
+ resources=V1ResourceRequirements(requests={"storage": "100Mi"}),
+ ),
+ )
+ self.clients[CORE_CLIENT].create_namespaced_persistent_volume_claim(
+ namespace=namespace, body=pvc
+ )
+ except ApiException as e:
+ info = json.loads(e.body)
+ if info.get("reason").lower() == "alreadyexists":
+ self.logger.warning("PVC already exists: {}".format(e))
+ else:
+ raise e
+
+ @retry(
+ attempts=10,
+ delay=1,
+ fallback=Exception("Failed deleting the pvc"),
+ )
+ async def delete_pvc(self, name: str, namespace: str):
+ """
+ Create a namespace
+
+ :param: name: Name of the pvc to be deleted
+ :param: namespace: Namespace
+
+ """
+ self.clients[CORE_CLIENT].delete_namespaced_persistent_volume_claim(
+ name, namespace
+ )
+
+ def copy_file_to_pod(
+ self, namespace, pod_name, container_name, src_file, dest_path
+ ):
+ # Create an in-memory tar file containing the source file
+ tar_buffer = io.BytesIO()
+ with tarfile.open(fileobj=tar_buffer, mode="w") as tar:
+ tar.add(src_file, arcname=dest_path.split("/")[-1])
+
+ tar_buffer.seek(0)
+
+ # Define the command to extract the tar file in the pod
+ exec_command = ["tar", "xvf", "-", "-C", dest_path.rsplit("/", 1)[0]]
+
+ # Execute the command
+ resp = stream(
+ self.clients[CORE_CLIENT].connect_get_namespaced_pod_exec,
+ pod_name,
+ namespace,
+ command=exec_command,
+ container=container_name,
+ stdin=True,
+ stderr=True,
+ stdout=True,
+ tty=False,
+ _preload_content=False,
+ )
+
+ # Write the tar data to the pod
+ resp.write_stdin(tar_buffer.read())
+ resp.close()
+
+ @retry(
+ attempts=10,
+ delay=1,
+ fallback=Exception("Failed creating the pvc"),
+ )
+ async def create_pvc_with_content(
+ self, name: str, namespace: str, src_folder: str, filename: str
+ ):
+ """
+ Create a PVC with content
+
+ :param: name: Name of the pvc to be created
+ :param: namespace: Name of the namespace where the pvc will be stored
+ :param: src_folder: Folder where the file to be copied is located
+ :param: filename: Name of the file to be copied
+ """
+ pod_name = f"copy-pod-{name}"
+ await self.create_pvc(name=name, namespace=namespace)
+ await self.create_copy_pod(name=pod_name, namespace=namespace, pvc_name=name)
+ self.copy_file_to_pod(
+ namespace=namespace,
+ pod_name=pod_name,
+ container_name="copy-container",
+ src_file=f"{src_folder}/{filename}",
+ dest_path=f"/mnt/data/{filename}",
+ )
+
+ @retry(
+ attempts=10,
+ delay=1,
+ fallback=Exception("Failed creating the pvc"),
+ )
+ async def create_copy_pod(self, name: str, namespace: str, pvc_name: str):
+ """
+ Create a pod to copy content into a PVC
+
+ :param: name: Name of the pod to be created
+ :param: namespace: Name of the namespace where the pod will be stored
+ :param: pvc_name: Name of the PVC that the pod will mount as a volume
+
+ """
+ pod = V1Pod(
+ api_version="v1",
+ kind="Pod",
+ metadata=client.V1ObjectMeta(name=name),
+ spec=V1PodSpec(
+ containers=[
+ V1Container(
+ name="copy-container",
+ image="busybox", # Imagen ligera para copiar archivos
+ command=["sleep", "3600"], # Mantén el contenedor en ejecución
+ volume_mounts=[
+ V1VolumeMount(mount_path="/mnt/data", name="my-storage")
+ ],
+ )
+ ],
+ volumes=[
+ V1Volume(
+ name="my-storage",
+ persistent_volume_claim=V1PersistentVolumeClaimVolumeSource(
+ claim_name=pvc_name
+ ),
+ )
+ ],
+ ),
+ )
+ # Create the pod
+ self.clients[CORE_CLIENT].create_namespaced_pod(namespace=namespace, body=pod)
+
+ @retry(
+ attempts=10,
+ delay=1,
+ fallback=Exception("Failed deleting the pod"),
+ )
+ async def delete_pod(self, name: str, namespace: str):
+ """
+ Create a namespace
+
+ :param: name: Name of the pod to be deleted
+ :param: namespace: Namespace
+
+ """
+ self.clients[CORE_CLIENT].delete_namespaced_pod(name, namespace)