blob: 98c1240eaa830872e8c3944605e504d48a24fbf1 [file] [log] [blame]
almagiacdd20ae2024-12-13 09:45:45 +01001#######################################################################################
2# Copyright 2020 Canonical Ltd.
3# Copyright ETSI Contributors and Others.
4#
5# Licensed under the Apache License, Version 2.0 (the "License");
6# you may not use this file except in compliance with the License.
7# You may obtain a copy of the License at
8#
9# http://www.apache.org/licenses/LICENSE-2.0
10#
11# Unless required by applicable law or agreed to in writing, software
12# distributed under the License is distributed on an "AS IS" BASIS,
13# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14# See the License for the specific language governing permissions and
15# limitations under the License.
16#######################################################################################
17
18import base64
19import logging
almagiacdd20ae2024-12-13 09:45:45 +010020import typing
21import uuid
22import json
garciadeblas53222a92025-09-11 11:38:02 +020023import yaml
almagiacdd20ae2024-12-13 09:45:45 +010024import tarfile
25import io
garciadeblas61a4c692025-07-17 13:04:13 +020026import os
almagiacdd20ae2024-12-13 09:45:45 +010027from time import sleep
28
29from distutils.version import LooseVersion
30
garciadeblas6d8acf32025-02-06 13:34:37 +010031from kubernetes import client as kclient, config as kconfig
almagiacdd20ae2024-12-13 09:45:45 +010032from kubernetes.client.api import VersionApi
33from kubernetes.client.models import (
34 V1ClusterRole,
35 V1Role,
36 V1ObjectMeta,
37 V1PolicyRule,
38 V1ServiceAccount,
39 V1ClusterRoleBinding,
40 V1RoleBinding,
41 V1RoleRef,
42 RbacV1Subject,
43 V1Secret,
44 V1SecretReference,
45 V1Namespace,
46 V1PersistentVolumeClaim,
47 V1PersistentVolumeClaimSpec,
48 V1PersistentVolumeClaimVolumeSource,
49 V1ResourceRequirements,
50 V1Pod,
51 V1PodSpec,
52 V1Volume,
53 V1VolumeMount,
54 V1Container,
rshrif8911b92025-06-11 18:19:07 +000055 V1ConfigMap,
almagiacdd20ae2024-12-13 09:45:45 +010056)
57from kubernetes.client.rest import ApiException
58from kubernetes.stream import stream
59from osm_lcm.n2vc.libjuju import retry_callback
60from retrying_async import retry
61
# Keys read from a service-account token secret's ``data`` mapping
SERVICE_ACCOUNT_TOKEN_KEY = "token"
SERVICE_ACCOUNT_ROOT_CA_KEY = "ca.crt"

# Keys of the ``Kubectl.clients`` dictionary, one per Kubernetes API client
CORE_CLIENT = "core_v1"
RBAC_CLIENT = "rbac_v1"
STORAGE_CLIENT = "storage_v1"
CUSTOM_OBJECT_CLIENT = "custom_object"
69
70
71class Kubectl:
def __init__(self, config_file=None):
    """
    Build the Kubernetes API clients from a kubeconfig file

    :param: config_file: Path to the kubeconfig file. It is passed as-is to
                         ``kconfig.new_client_from_config``. When provided, the
                         ``proxy-url`` of the first cluster entry (if any) is
                         set as the proxy of the client configuration.
    """
    self.logger = logging.getLogger("lcm.kubectl")
    self._config_file = config_file
    self.logger.info(f"Kubectl cfg file: {config_file}")

    # Create default configuration for API client
    self._configuration = kclient.Configuration()

    # Get proxy_url from the first cluster of the kubeconfig, if present.
    # An empty file, or a file without clusters, simply yields no proxy; the
    # kubeconfig loader below is the one in charge of rejecting invalid files.
    proxy_url = None
    if config_file:
        with open(config_file, "r", encoding="utf-8") as f:
            kubeconfig_dict = yaml.safe_load(f.read())
        if isinstance(kubeconfig_dict, dict):
            clusters = kubeconfig_dict.get("clusters") or []
            first_cluster = (clusters[0] if clusters else None) or {}
            proxy_url = (first_cluster.get("cluster") or {}).get("proxy-url")

    # If kubeconfig has proxy configured, use it
    if proxy_url:
        self._configuration.proxy = proxy_url
        self.logger.info(f"Using proxy for kubernetes: {proxy_url}")

    # Create API client
    self._api_client = kconfig.new_client_from_config(
        config_file=config_file,
        client_configuration=self._configuration,
    )

    # All the API clients share the same underlying API client
    self._clients = {
        CORE_CLIENT: kclient.CoreV1Api(api_client=self._api_client),
        RBAC_CLIENT: kclient.RbacAuthorizationV1Api(api_client=self._api_client),
        STORAGE_CLIENT: kclient.StorageV1Api(api_client=self._api_client),
        CUSTOM_OBJECT_CLIENT: kclient.CustomObjectsApi(api_client=self._api_client),
    }
almagiacdd20ae2024-12-13 09:45:45 +0100115
@property
def configuration(self):
    """Client configuration object created in the constructor"""
    current_configuration = self._configuration
    return current_configuration
119
@property
def clients(self):
    """Dictionary with the Kubernetes API clients, indexed by client key"""
    available_clients = self._clients
    return available_clients
123
def get_services(
    self,
    field_selector: str = None,
    label_selector: str = None,
) -> typing.List[typing.Dict]:
    """
    Get the Service list across all namespaces

    :param: field_selector: Kubernetes field selector used to filter the services
    :param: label_selector: Kubernetes label selector used to filter the services

    :return: List of dictionaries (name, cluster_ip, type, ports, external_ip),
             one per service matching the selectors specified
    """
    # Only the selectors that were actually provided are sent to the API
    selectors = {
        key: value
        for key, value in (
            ("field_selector", field_selector),
            ("label_selector", label_selector),
        )
        if value
    }
    try:
        listing = self.clients[CORE_CLIENT].list_service_for_all_namespaces(
            **selectors
        )
        services = []
        for svc in listing.items:
            ports = []
            if svc.spec.ports:
                for port in svc.spec.ports:
                    ports.append(
                        {
                            "name": port.name,
                            "node_port": port.node_port,
                            "port": port.port,
                            "protocol": port.protocol,
                            "target_port": port.target_port,
                        }
                    )
            # external_ip is None (not an empty list) when there is no ingress
            ingress = svc.status.load_balancer.ingress
            external_ip = [entry.ip for entry in ingress] if ingress else None
            services.append(
                {
                    "name": svc.metadata.name,
                    "cluster_ip": svc.spec.cluster_ip,
                    "type": svc.spec.type,
                    "ports": ports,
                    "external_ip": external_ip,
                }
            )
        return services
    except ApiException as e:
        self.logger.error("Error calling get services: {}".format(e))
        raise e
172
def get_default_storage_class(self) -> str:
    """
    Default storage class

    :return: Name of the first storage class annotated as default-class.
             If none is annotated, name of the first storage class listed.
             If there are no storage classes, None
    """
    # Both the current and the beta (older clusters) annotations are accepted
    default_annotation_keys = (
        "storageclass.kubernetes.io/is-default-class",
        "storageclass.beta.kubernetes.io/is-default-class",
    )
    fallback_name = None
    for storage_class in self.clients[STORAGE_CLIENT].list_storage_class().items:
        sc_name = storage_class.metadata.name
        sc_annotations = storage_class.metadata.annotations or {}
        for annotation_key in default_annotation_keys:
            if sc_annotations.get(annotation_key) == "true":
                # Default storage class found: no need to keep looking
                return sc_name
        if not fallback_name:
            # Remember the first storage class in case there is no default-class
            fallback_name = sc_name
    return fallback_name
201
def create_cluster_role(
    self,
    name: str,
    labels: typing.Dict[str, str],
    namespace: str = "kube-system",
):
    """
    Create a cluster role allowing every verb on every resource and
    on every non-resource URL

    :param: name: Name of the cluster role
    :param: labels: Labels for cluster role metadata
    :param: namespace: Kubernetes namespace for cluster role metadata
                       Default: kube-system

    :raises: Exception if a cluster role with that name already exists
    """
    rbac_api = self.clients[RBAC_CLIENT]

    name_selector = "metadata.name={}".format(name)
    existing = rbac_api.list_cluster_role(field_selector=name_selector)
    if len(existing.items) > 0:
        raise Exception("Role with metadata.name={} already exists".format(name))

    # Full-access rules: all API resources plus all non-resource URLs
    all_resources_rule = V1PolicyRule(api_groups=["*"], resources=["*"], verbs=["*"])
    all_urls_rule = V1PolicyRule(non_resource_ur_ls=["*"], verbs=["*"])

    rbac_api.create_cluster_role(
        V1ClusterRole(
            metadata=V1ObjectMeta(name=name, labels=labels, namespace=namespace),
            rules=[all_resources_rule, all_urls_rule],
        )
    )
234
async def create_role(
    self,
    name: str,
    labels: typing.Dict[str, str],
    api_groups: list,
    resources: list,
    verbs: list,
    namespace: str,
):
    """
    Create a namespaced Role with a single PolicyRule

    :param: name: Name of the namespaced Role
    :param: labels: Labels for namespaced Role metadata
    :param: api_groups: List with api-groups allowed in the policy rule
    :param: resources: List with resources allowed in the policy rule
    :param: verbs: List with verbs allowed in the policy rule
    :param: namespace: Kubernetes namespace for Role metadata

    :return: None
    :raises: Exception if a Role with that name already exists in the namespace
    """
    rbac_api = self.clients[RBAC_CLIENT]

    name_selector = "metadata.name={}".format(name)
    existing = rbac_api.list_namespaced_role(namespace, field_selector=name_selector)
    if len(existing.items) > 0:
        raise Exception("Role with metadata.name={} already exists".format(name))

    single_rule = V1PolicyRule(api_groups=api_groups, resources=resources, verbs=verbs)
    new_role = V1Role(
        metadata=V1ObjectMeta(name=name, labels=labels, namespace=namespace),
        rules=[single_rule],
    )
    rbac_api.create_namespaced_role(namespace, new_role)
274
def delete_cluster_role(self, name: str):
    """
    Delete a cluster role through the RBAC API client

    :param: name: Name of the cluster role
    """
    rbac_api = self.clients[RBAC_CLIENT]
    rbac_api.delete_cluster_role(name)
282
def _get_kubectl_version(self):
    """
    Query the version API with the shared API client

    :return: String "<major>.<minor>" built from the returned version info
    """
    self.logger.debug("Enter _get_kubectl_version function")
    version_info = VersionApi(api_client=self._api_client).get_code()
    return f"{version_info.major}.{version_info.minor}"
287
288 def _need_to_create_new_secret(self):
289 min_k8s_version = "1.24"
290 current_k8s_version = self._get_kubectl_version()
291 return LooseVersion(min_k8s_version) <= LooseVersion(current_k8s_version)
292
293 def _get_secret_name(self, service_account_name: str):
294 random_alphanum = str(uuid.uuid4())[:5]
295 return "{}-token-{}".format(service_account_name, random_alphanum)
296
def _create_service_account_secret(
    self,
    service_account_name: str,
    namespace: str,
    secret_name: str,
):
    """
    Create a service-account-token secret bound to a service account.
    K8s version >= 1.24

    :param: service_account_name: Name of the service account
    :param: namespace: Kubernetes namespace for service account metadata
    :param: secret_name: Name of the secret

    :raises: Exception if a secret with that name already exists
    """
    core_api = self.clients[CORE_CLIENT]

    name_selector = "metadata.name={}".format(secret_name)
    existing = core_api.list_namespaced_secret(
        namespace, field_selector=name_selector
    ).items
    if len(existing) > 0:
        raise Exception(
            "Secret with metadata.name={} already exists".format(secret_name)
        )

    # The annotation is what links the secret to the service account
    secret_metadata = V1ObjectMeta(
        name=secret_name,
        namespace=namespace,
        annotations={"kubernetes.io/service-account.name": service_account_name},
    )
    token_secret = V1Secret(
        metadata=secret_metadata, type="kubernetes.io/service-account-token"
    )
    core_api.create_namespaced_secret(namespace, token_secret)
327
def _get_secret_reference_list(self, namespace: str, secret_name: str):
    """
    Build the list of secret references for a service account.
    It always contains exactly one reference.
    K8s version >= 1.24

    :param: namespace: Kubernetes namespace for service account metadata
    :param: secret_name: Name of the secret
    :rtype: list[V1SecretReference]
    """
    reference = V1SecretReference(name=secret_name, namespace=namespace)
    return [reference]
338
def create_service_account(
    self,
    name: str,
    labels: typing.Dict[str, str],
    namespace: str = "kube-system",
):
    """
    Create a service account. When _need_to_create_new_secret() is true,
    a token secret is also created and referenced by the service account.

    :param: name: Name of the service account
    :param: labels: Labels for service account metadata
    :param: namespace: Kubernetes namespace for service account metadata
                       Default: kube-system

    :raises: Exception if a service account with that name already exists
    """
    core_api = self.clients[CORE_CLIENT]

    name_selector = "metadata.name={}".format(name)
    existing = core_api.list_namespaced_service_account(
        namespace, field_selector=name_selector
    )
    if len(existing.items) > 0:
        raise Exception(
            "Service account with metadata.name={} already exists".format(name)
        )

    sa_metadata = V1ObjectMeta(name=name, labels=labels, namespace=namespace)

    if not self._need_to_create_new_secret():
        # No explicit token secret is created in this case
        core_api.create_namespaced_service_account(
            namespace, V1ServiceAccount(metadata=sa_metadata)
        )
        return

    # The service account is created first, referencing the secret by name;
    # the secret itself is created right afterwards
    secret_name = self._get_secret_name(name)
    secret_refs = self._get_secret_reference_list(namespace, secret_name)
    core_api.create_namespaced_service_account(
        namespace, V1ServiceAccount(metadata=sa_metadata, secrets=secret_refs)
    )
    self._create_service_account_secret(name, namespace, secret_name)
373
def delete_secret(self, name: str, namespace: str = "kube-system"):
    """
    Delete a secret through the core API client

    :param: name: Name of the secret
    :param: namespace: Kubernetes namespace
                       Default: kube-system
    """
    self.logger.debug(f"Kubectl cfg file: {self._config_file}")
    core_api = self.clients[CORE_CLIENT]
    core_api.delete_namespaced_secret(name, namespace)
384
def delete_service_account(self, name: str, namespace: str = "kube-system"):
    """
    Delete a service account through the core API client

    :param: name: Name of the service account
    :param: namespace: Kubernetes namespace for service account metadata
                       Default: kube-system
    """
    core_api = self.clients[CORE_CLIENT]
    core_api.delete_namespaced_service_account(name, namespace)
394
def create_cluster_role_binding(
    self, name: str, labels: typing.Dict[str, str], namespace: str = "kube-system"
):
    """
    Create a cluster role binding. The same name is used for the binding,
    for the ClusterRole it refers to and for the ServiceAccount subject.

    :param: name: Name of the cluster role binding (also cluster role
                  and service account name)
    :param: labels: Labels for cluster role binding metadata
    :param: namespace: Kubernetes namespace of the service account subject
                       Default: kube-system

    :raises: Exception if a cluster role binding with that name already exists
    """
    rbac_api = self.clients[RBAC_CLIENT]

    name_selector = "metadata.name={}".format(name)
    existing = rbac_api.list_cluster_role_binding(field_selector=name_selector)
    if len(existing.items) > 0:
        raise Exception("Generated rbac id already exists")

    bound_role = V1RoleRef(kind="ClusterRole", name=name, api_group="")
    bound_account = RbacV1Subject(
        kind="ServiceAccount", name=name, namespace=namespace
    )
    rbac_api.create_cluster_role_binding(
        V1ClusterRoleBinding(
            metadata=V1ObjectMeta(name=name, labels=labels),
            role_ref=bound_role,
            subjects=[bound_account],
        )
    )
420
async def create_role_binding(
    self,
    name: str,
    role_name: str,
    sa_name: str,
    labels: typing.Dict[str, str],
    namespace: str,
):
    """
    Create a namespaced role binding between a Role and a Service Account

    :param: name: Name of the namespaced Role Binding
    :param: role_name: Name of the namespaced Role to be bound
    :param: sa_name: Name of the Service Account to be bound
    :param: labels: Labels for Role Binding metadata
    :param: namespace: Kubernetes namespace where the Role Binding is created;
                       also used as namespace of the Service Account subject

    :return: None
    :raises: Exception if a Role Binding with that name already exists
    """
    rbac_api = self.clients[RBAC_CLIENT]

    name_selector = "metadata.name={}".format(name)
    existing = rbac_api.list_namespaced_role_binding(
        namespace, field_selector=name_selector
    )
    if len(existing.items) > 0:
        raise Exception(
            "Role Binding with metadata.name={} already exists".format(name)
        )

    bound_role = V1RoleRef(kind="Role", name=role_name, api_group="")
    bound_account = RbacV1Subject(
        kind="ServiceAccount", name=sa_name, namespace=namespace
    )
    new_binding = V1RoleBinding(
        metadata=V1ObjectMeta(name=name, labels=labels),
        role_ref=bound_role,
        subjects=[bound_account],
    )
    rbac_api.create_namespaced_role_binding(namespace, new_binding)
458
def delete_cluster_role_binding(self, name: str):
    """
    Delete a cluster role binding through the RBAC API client

    :param: name: Name of the cluster role binding
    """
    rbac_api = self.clients[RBAC_CLIENT]
    rbac_api.delete_cluster_role_binding(name)
466
@retry(
    attempts=10,
    delay=1,
    fallback=Exception("Failed getting the secret from service account"),
    callback=retry_callback,
)
async def get_secret_data(
    self, name: str, namespace: str = "kube-system"
) -> typing.Tuple[str, str]:
    """
    Get the token and the root CA of the first secret referenced by
    a service account

    :param: name: Name of the service account
    :param: namespace: Name of the namespace where the service account
                       and its secret are stored

    :return: Tuple with the base64-decoded values stored in the secret under
             the "token" and "ca.crt" keys, in that order
    :raises: Exception if the service account does not exist or it does not
             reference any secret
    """
    core_api = self.clients[CORE_CLIENT]

    accounts = core_api.list_namespaced_service_account(
        namespace, field_selector="metadata.name={}".format(name)
    )
    if len(accounts.items) == 0:
        raise Exception(
            "Service account not found with metadata.name={}".format(name)
        )

    # Only the first secret referenced by the service account is considered
    referenced_secrets = accounts.items[0].secrets
    secret_name = (
        referenced_secrets[0].name
        if referenced_secrets and len(referenced_secrets) > 0
        else None
    )
    if not secret_name:
        raise Exception(
            "Failed getting the secret from service account {}".format(name)
        )

    # TODO: refactor to use get_secret_content
    matching_secrets = core_api.list_namespaced_secret(
        namespace, field_selector="metadata.name={}".format(secret_name)
    )
    secret = matching_secrets.items[0]

    encoded_token = secret.data[SERVICE_ACCOUNT_TOKEN_KEY]
    encoded_root_ca = secret.data[SERVICE_ACCOUNT_ROOT_CA_KEY]

    decoded_token = base64.b64decode(encoded_token).decode("utf-8")
    decoded_root_ca = base64.b64decode(encoded_root_ca).decode("utf-8")
    return (decoded_token, decoded_root_ca)
514
@retry(
    attempts=10,
    delay=1,
    fallback=Exception("Failed getting data from the secret"),
)
async def get_secret_content(
    self,
    name: str,
    namespace: str,
) -> typing.Dict:
    """
    Read a secret and return its data

    :param: name: Name of the secret
    :param: namespace: Name of the namespace where the secret is stored

    :return: Content of the ``data`` attribute of the secret, as returned by
             the API client (values are not decoded here)
    """
    self.logger.debug(f"Kubectl cfg file: {self._config_file}")
    core_api = self.clients[CORE_CLIENT]
    return core_api.read_namespaced_secret(name, namespace).data
539
@retry(
    attempts=10,
    delay=1,
    fallback=Exception("Failed creating the secret"),
)
async def create_secret(
    self, name: str, data: dict, namespace: str, secret_type: str
):
    """
    Create secret with data

    :param: name: Name of the secret
    :param: data: Dict with data content. Values must be already base64 encoded
    :param: namespace: Name of the namespace where the secret will be stored
    :param: secret_type: Type of the secret, e.g., Opaque, kubernetes.io/service-account-token, kubernetes.io/tls

    :return: None
    :raises: ApiException if the API call fails (it is logged and re-raised)
    """
    self.logger.debug(f"Kubectl cfg file: {self._config_file}")
    self.logger.debug("Enter create_secret function")
    v1_core = self.clients[CORE_CLIENT]
    self.logger.debug(f"v1_core: {v1_core}")
    metadata = V1ObjectMeta(name=name, namespace=namespace)
    self.logger.debug(f"metadata: {metadata}")
    secret = V1Secret(metadata=metadata, data=data, type=secret_type)
    # Never dump the secret object itself: base64 is an encoding, not
    # encryption, so its payload must not end up in the logs
    self.logger.debug(
        f"secret: name={name}, namespace={namespace}, type={secret_type}"
    )
    try:
        v1_core.create_namespaced_secret(namespace, secret)
        self.logger.info("Namespaced secret was created")
    except ApiException as e:
        self.logger.error(f"Failed to create namespaced secret: {e}")
        raise
almagiacdd20ae2024-12-13 09:45:45 +0100572
async def create_configmap(self, name: str, data: dict, namespace: str):
    """
    Create configmap with data

    :param: name: Name of the configmap
    :param: data: Dict with data content.
    :param: namespace: Name of the namespace where the configmap will be stored

    :return: None
    :raises: ApiException if the API call fails (it is logged and re-raised)
    """
    self.logger.debug(f"Kubectl cfg file: {self._config_file}")
    self.logger.debug("Enter create_configmap function")
    core_api = self.clients[CORE_CLIENT]
    self.logger.debug(f"v1_core: {core_api}")
    configmap_metadata = V1ObjectMeta(name=name, namespace=namespace)
    new_configmap = V1ConfigMap(metadata=configmap_metadata, data=data)
    self.logger.debug(f"config_map: {new_configmap}")
    try:
        core_api.create_namespaced_config_map(namespace, new_configmap)
    except ApiException as e:
        self.logger.error(f"Failed to create namespaced configmap: {e}")
        raise
    else:
        self.logger.info("Namespaced configmap was created")
598
def delete_configmap(self, name: str, namespace: str):
    """
    Delete a configmap through the core API client

    :param: name: Name of the configmap
    :param: namespace: Kubernetes namespace
    """
    self.logger.debug(f"Kubectl cfg file: {self._config_file}")
    core_api = self.clients[CORE_CLIENT]
    core_api.delete_namespaced_config_map(name, namespace)
608
async def create_certificate(
    self,
    namespace: str,
    name: str,
    dns_prefix: str,
    secret_name: str,
    usages: list,
    issuer_name: str,
):
    """
    Creates cert-manager certificate object

    :param: namespace: Name of the namespace where the certificate and secret is stored
    :param: name: Name of the certificate object
    :param: dns_prefix: Prefix for the dnsNames. They will be prefixed to the common k8s svc suffixes
    :param: secret_name: Name of the secret created by cert-manager
    :param: usages: List of X.509 key usages
    :param: issuer_name: Name of the cert-manager's ClusterIssuer object

    :raises: ApiException if the API call fails for any reason other than
             the certificate already existing (that case is only logged)
    """
    certificate_body = {
        "apiVersion": "cert-manager.io/v1",
        "kind": "Certificate",
        "metadata": {"name": name, "namespace": namespace},
        "spec": {
            "secretName": secret_name,
            "privateKey": {
                "rotationPolicy": "Always",
                "algorithm": "ECDSA",
                "size": 256,
            },
            "duration": "8760h",  # 365 days
            "renewBefore": "2208h",  # 92 days before expiry, i.e. after ~9 months
            "subject": {"organizations": ["osm"]},
            "commonName": "osm",
            "isCA": False,
            "usages": usages,
            "dnsNames": [
                "{}.{}".format(dns_prefix, namespace),
                "{}.{}.svc".format(dns_prefix, namespace),
                "{}.{}.svc.cluster".format(dns_prefix, namespace),
                "{}.{}.svc.cluster.local".format(dns_prefix, namespace),
            ],
            "issuerRef": {"name": issuer_name, "kind": "ClusterIssuer"},
        },
    }
    client = self.clients[CUSTOM_OBJECT_CLIENT]
    try:
        client.create_namespaced_custom_object(
            group="cert-manager.io",
            plural="certificates",
            version="v1",
            body=certificate_body,
            namespace=namespace,
        )
    except ApiException as e:
        # The error body may be missing or may not be a JSON object (e.g. an
        # error page from a proxy): in that case the original ApiException
        # must be raised, not a parsing error
        try:
            reason = (json.loads(e.body).get("reason") or "").lower()
        except (TypeError, ValueError, AttributeError):
            reason = ""
        if reason == "alreadyexists":
            self.logger.warning("Certificate already exists: {}".format(e))
        else:
            raise e
670
async def delete_certificate(self, namespace, object_name):
    """
    Deletes cert-manager certificate object

    :param: namespace: Name of the namespace where the certificate is stored
    :param: object_name: Name of the certificate object

    :raises: ApiException if the API call fails for any reason other than
             the certificate not being found (that case is only logged)
    """
    client = self.clients[CUSTOM_OBJECT_CLIENT]
    try:
        client.delete_namespaced_custom_object(
            group="cert-manager.io",
            plural="certificates",
            version="v1",
            name=object_name,
            namespace=namespace,
        )
    except ApiException as e:
        # The error body may be missing or may not be a JSON object: in that
        # case the original ApiException must be raised, not a parsing error
        try:
            reason = (json.loads(e.body).get("reason") or "").lower()
        except (TypeError, ValueError, AttributeError):
            reason = ""
        if reason == "notfound":
            self.logger.warning("Certificate already deleted: {}".format(e))
        else:
            raise e
687
@retry(
    attempts=10,
    delay=1,
    fallback=Exception("Failed creating the namespace"),
)
async def create_namespace(self, name: str, labels: dict = None):
    """
    Create a namespace

    :param: name: Name of the namespace to be created
    :param: labels: Dictionary with labels for the new namespace

    :raises: ApiException if the API call fails for any reason other than
             the namespace already existing (that case is only logged)
    """
    v1_core = self.clients[CORE_CLIENT]
    metadata = V1ObjectMeta(name=name, labels=labels)
    namespace = V1Namespace(
        metadata=metadata,
    )

    try:
        v1_core.create_namespace(namespace)
        self.logger.debug("Namespace created: {}".format(name))
    except ApiException as e:
        # The error body may be missing or may not be a JSON object: in that
        # case the original ApiException must be raised, not a parsing error
        try:
            reason = (json.loads(e.body).get("reason") or "").lower()
        except (TypeError, ValueError, AttributeError):
            reason = ""
        if reason == "alreadyexists":
            self.logger.warning("Namespace already exists: {}".format(e))
        else:
            raise e
716
@retry(
    attempts=10,
    delay=1,
    fallback=Exception("Failed deleting the namespace"),
)
async def delete_namespace(self, name: str):
    """
    Delete a namespace

    :param: name: Name of the namespace to be deleted

    :raises: ApiException if the API call fails for any reason other than
             the namespace not being found (that case is only logged)
    """
    try:
        self.clients[CORE_CLIENT].delete_namespace(name)
    except ApiException as e:
        if e.status == 404 or e.reason == "Not Found":
            self.logger.warning("Namespace already deleted: {}".format(e))
        else:
            # Any other failure must not be silently swallowed: otherwise the
            # retry decorator never fires and the caller assumes success
            self.logger.error("Error deleting namespace {}: {}".format(name, e))
            raise
734
def get_secrets(
    self,
    namespace: str,
    field_selector: str = None,
) -> typing.List[typing.Dict]:
    """
    Get Secret list from a namespace

    :param: namespace: Kubernetes namespace
    :param: field_selector: Kubernetes field selector

    :return: ``items`` of the secret list returned by the API client for the
             namespace and field selector specified
    :raises: ApiException if the API call fails (it is logged and re-raised)
    """
    try:
        listing = self.clients[CORE_CLIENT].list_namespaced_secret(
            namespace=namespace,
            field_selector=field_selector,
        )
        return listing.items
    except ApiException as e:
        self.logger.error("Error calling get secrets: {}".format(e))
        raise e
758
def create_generic_object(
    self,
    api_group: str,
    api_plural: str,
    api_version: str,
    namespace: str,
    manifest_dict: dict,
):
    """
    Creates generic object

    :param: api_group: API Group
    :param: api_plural: API Plural
    :param: api_version: API Version
    :param: namespace: Namespace. If empty, the object is created as a
                       cluster-scoped custom object
    :param: manifest_dict: Dictionary with the content of the Kubernetes manifest

    :raises: ApiException if the API call fails for any reason other than
             the object already existing (that case is only logged)
    """
    self.logger.debug(f"Kubectl cfg file: {self._config_file}")
    client = self.clients[CUSTOM_OBJECT_CLIENT]
    try:
        if namespace:
            client.create_namespaced_custom_object(
                group=api_group,
                plural=api_plural,
                version=api_version,
                body=manifest_dict,
                namespace=namespace,
            )
        else:
            client.create_cluster_custom_object(
                group=api_group,
                plural=api_plural,
                version=api_version,
                body=manifest_dict,
            )
    except ApiException as e:
        # The error body may be missing or may not be a JSON object: in that
        # case the original ApiException must be raised, not a parsing error
        try:
            reason = (json.loads(e.body).get("reason") or "").lower()
        except (TypeError, ValueError, AttributeError):
            reason = ""
        if reason == "alreadyexists":
            self.logger.warning("Object already exists: {}".format(e))
        else:
            raise e
801
def delete_generic_object(
    self,
    api_group: str,
    api_plural: str,
    api_version: str,
    namespace: str,
    name: str,
):
    """
    Deletes generic object

    :param: api_group: API Group
    :param: api_plural: API Plural
    :param: api_version: API Version
    :param: namespace: Namespace. If empty, the object is deleted as a
                       cluster-scoped custom object
    :param: name: Name of the object

    :raises: ApiException if the API call fails for any reason other than
             the object not being found (that case is only logged)
    """
    self.logger.debug(f"Kubectl cfg file: {self._config_file}")
    client = self.clients[CUSTOM_OBJECT_CLIENT]
    try:
        if namespace:
            client.delete_namespaced_custom_object(
                group=api_group,
                plural=api_plural,
                version=api_version,
                name=name,
                namespace=namespace,
            )
        else:
            client.delete_cluster_custom_object(
                group=api_group,
                plural=api_plural,
                version=api_version,
                name=name,
            )
    except ApiException as e:
        # The error body may be missing or may not be a JSON object: in that
        # case the original ApiException must be raised, not a parsing error
        try:
            reason = (json.loads(e.body).get("reason") or "").lower()
        except (TypeError, ValueError, AttributeError):
            reason = ""
        if reason == "notfound":
            self.logger.warning("Object already deleted: {}".format(e))
        else:
            raise e
844
async def get_generic_object(
    self,
    api_group: str,
    api_plural: str,
    api_version: str,
    namespace: str,
    name: str,
):
    """
    Gets generic object

    :param: api_group: API Group
    :param: api_plural: API Plural
    :param: api_version: API Version
    :param: namespace: Namespace. If empty, cluster-scoped custom objects
                       are searched instead
    :param: name: Name of the object

    :return: First object whose metadata.name matches, or None if there is
             no match or the API call fails with an ApiException
    """
    self.logger.debug(f"Kubectl cfg file: {self._config_file}")
    client = self.clients[CUSTOM_OBJECT_CLIENT]
    try:
        if namespace:
            object_dict = client.list_namespaced_custom_object(
                group=api_group,
                plural=api_plural,
                version=api_version,
                namespace=namespace,
                field_selector=f"metadata.name={name}",
            )
        else:
            object_dict = client.list_cluster_custom_object(
                group=api_group,
                plural=api_plural,
                version=api_version,
                field_selector=f"metadata.name={name}",
            )
        items = object_dict.get("items")
        # A missing or empty item list means that the object does not exist
        if not items:
            return None
        return items[0]
    except ApiException as e:
        self.logger.debug(f"Exception: {e}")
        # The reason is only used for logging: a missing or non-JSON error
        # body must not prevent this method from returning None
        try:
            reason = json.loads(e.body).get("reason")
        except (TypeError, ValueError, AttributeError):
            reason = None
        self.logger.debug(f"Api Exception: {e}. Reason: {reason}")
        return None
almagiacdd20ae2024-12-13 09:45:45 +0100889
async def list_generic_object(
    self,
    api_group: str,
    api_plural: str,
    api_version: str,
    namespace: str,
):
    """
    Lists all generic objects of the requested API group

    :param: api_group: API Group
    :param: api_plural: API Plural
    :param: api_version: API Version
    :param: namespace: Namespace. If empty, cluster-scoped custom objects
                       are listed instead

    :return: ``items`` of the returned object list, or an empty list if the
             API reports that the requested custom objects cannot be found
    :raises: ApiException if the API call fails for any other reason
    """
    self.logger.debug(f"Kubectl cfg file: {self._config_file}")
    client = self.clients[CUSTOM_OBJECT_CLIENT]
    try:
        if namespace:
            object_dict = client.list_namespaced_custom_object(
                group=api_group,
                plural=api_plural,
                version=api_version,
                namespace=namespace,
            )
        else:
            object_dict = client.list_cluster_custom_object(
                group=api_group,
                plural=api_plural,
                version=api_version,
            )
        self.logger.debug(f"Object-list: {object_dict.get('items')}")
        return object_dict.get("items")
    except ApiException as e:
        self.logger.debug(f"Exception: {e}")
        # The error body may be missing or may not be a JSON object: in that
        # case the original ApiException must be raised, not a parsing error
        try:
            reason = (json.loads(e.body).get("reason") or "").lower()
        except (TypeError, ValueError, AttributeError):
            reason = ""
        if reason == "notfound":
            self.logger.warning(
                "Cannot find specified custom objects: {}".format(e)
            )
            return []
        else:
            raise e
934
@retry(
    attempts=10,
    delay=1,
    fallback=Exception("Failed creating the secret"),
)
async def create_secret_string(
    self, name: str, string_data: str, namespace: str, secret_type: str
):
    """
    Create secret from non-encoded string data

    :param: name: Name of the secret
    :param: string_data: Data content, passed as ``string_data`` of the secret
                         NOTE(review): the Kubernetes stringData field is
                         presumably a key/value mapping - confirm with callers
    :param: namespace: Name of the namespace where the secret will be stored
    :param: secret_type: Type of the secret, e.g., Opaque, kubernetes.io/service-account-token, kubernetes.io/tls

    :return: None
    """
    core_api = self.clients[CORE_CLIENT]
    new_secret = V1Secret(
        metadata=V1ObjectMeta(name=name, namespace=namespace),
        string_data=string_data,
        type=secret_type,
    )
    core_api.create_namespaced_secret(namespace, new_secret)
957
@retry(
    attempts=10,
    delay=1,
    fallback=Exception("Failed creating the pvc"),
)
async def create_pvc(self, name: str, namespace: str):
    """
    Create a persistent volume claim (ReadWriteOnce, 100Mi requested)

    :param: name: Name of the pvc to be created
    :param: namespace: Name of the namespace where the pvc will be stored

    :raises: ApiException if the API call fails for any reason other than
             the pvc already existing (that case is only logged)
    """
    try:
        pvc = V1PersistentVolumeClaim(
            api_version="v1",
            kind="PersistentVolumeClaim",
            metadata=V1ObjectMeta(name=name),
            spec=V1PersistentVolumeClaimSpec(
                access_modes=["ReadWriteOnce"],
                resources=V1ResourceRequirements(requests={"storage": "100Mi"}),
            ),
        )
        self.clients[CORE_CLIENT].create_namespaced_persistent_volume_claim(
            namespace=namespace, body=pvc
        )
    except ApiException as e:
        # The error body may be missing or may not be a JSON object: in that
        # case the original ApiException must be raised, not a parsing error
        try:
            reason = (json.loads(e.body).get("reason") or "").lower()
        except (TypeError, ValueError, AttributeError):
            reason = ""
        if reason == "alreadyexists":
            self.logger.warning("PVC already exists: {}".format(e))
        else:
            raise e
990
@retry(
    attempts=10,
    delay=1,
    fallback=Exception("Failed deleting the pvc"),
)
async def delete_pvc(self, name: str, namespace: str):
    """
    Delete a PersistentVolumeClaim

    :param: name: Name of the pvc to be deleted
    :param: namespace: Name of the namespace where the pvc is stored

    :return: None
    """
    core_api = self.clients[CORE_CLIENT]
    core_api.delete_namespaced_persistent_volume_claim(name, namespace)
1007
def copy_file_to_pod(
    self, namespace, pod_name, container_name, src_file, dest_path
) -> bool:
    """
    Copy a local file into a container of a pod

    The destination directory is created with ``mkdir -p`` and the file is
    then sent as an in-memory tar archive to a ``tar xvf -`` process that is
    executed inside the container.

    :param: namespace: Namespace of the pod
    :param: pod_name: Name of the pod
    :param: container_name: Name of the container inside the pod
    :param: src_file: Path of the local file to be copied
    :param: dest_path: Full path (directory and file name) in the container

    :return: True if all steps ran without raising, False otherwise
        (the error is logged and not propagated)
    """
    try:
        dest_dir = os.path.dirname(dest_path)
        dest_name = os.path.basename(dest_path)

        # Create the destination directory in the pod
        if dest_dir:
            self.logger.debug(f"Creating directory {dest_dir} in pod {pod_name}")
            mkdir_command = ["mkdir", "-p", dest_dir]

            resp = stream(
                self.clients[CORE_CLIENT].connect_get_namespaced_pod_exec,
                pod_name,
                namespace,
                command=mkdir_command,
                container=container_name,
                stdin=False,
                stderr=True,
                stdout=True,
                tty=False,
                _preload_content=False,
            )
            try:
                # Wait (bounded) for mkdir to finish before closing the stream,
                # so that the directory exists when tar is started.
                resp.run_forever(timeout=10)
            finally:
                resp.close()
            self.logger.debug(f"Directory {dest_dir} created")

        # Create an in-memory tar file containing the source file
        self.logger.debug(f"Creating in-memory tar of {src_file} as {dest_name}")
        tar_buffer = io.BytesIO()
        with tarfile.open(fileobj=tar_buffer, mode="w") as tar:
            tar.add(src_file, arcname=dest_name)
        data = tar_buffer.getvalue()
        self.logger.debug(f"Tar buffer created, size={len(data)} bytes")

        # Define the command to extract the tar file in the pod.
        # An empty directory would make "tar -C" fail, so the container
        # working directory is used when dest_path has no directory part.
        exec_command = ["tar", "xvf", "-", "-C", dest_dir or "."]
        self.logger.debug(f"Exec command prepared: {exec_command}")

        # Execute the command
        resp = stream(
            self.clients[CORE_CLIENT].connect_get_namespaced_pod_exec,
            pod_name,
            namespace,
            command=exec_command,
            container=container_name,
            stdin=True,
            stderr=True,
            stdout=True,
            tty=False,
            _preload_content=False,
        )
        try:
            self.logger.debug(
                f"Started exec stream to pod {pod_name} (ns={namespace}, container={container_name})"
            )

            # Write the tar data to the pod
            self.logger.debug(f"Writing {len(data)} bytes to pod stdin")
            resp.write_stdin(data)
            self.logger.debug("Data written to pod stdin")
        finally:
            # Always release the stream, even if writing fails
            resp.close()
            self.logger.debug("Exec stream closed")
        return True
    except Exception as e:
        self.logger.error(f"Failed to copy file {src_file} to pod: {e}")
        return False
almagiacdd20ae2024-12-13 09:45:45 +01001078
@retry(
    attempts=10,
    delay=1,
    fallback=Exception("Failed creating the pvc"),
)
async def create_pvc_with_content(
    self, name: str, namespace: str, src_files: typing.List, dest_files: typing.List
):
    """
    Create a PVC with content

    A PVC is created, a temporary pod mounting it under /mnt/data is started,
    the files are copied into the pod and the pod is finally deleted.

    :param: name: Name of the pvc to be created
    :param: namespace: Name of the namespace where the pvc will be stored
    :param: src_files: List of source files to be copied
    :param: dest_files: List of destination filenames (paired with src_files)

    :return: None
    :raises Exception: if any of the files cannot be copied
    """
    # Local import: keeps this method self-contained regardless of the
    # imports available at module level.
    import asyncio

    src_list = list(src_files)
    dest_list = list(dest_files)
    if len(src_list) != len(dest_list):
        # zip() stops at the shortest list; make the truncation visible
        self.logger.warning(
            f"src_files ({len(src_list)}) and dest_files ({len(dest_list)}) "
            "differ in length; unpaired entries will be ignored"
        )

    pod_name = f"copy-pod-{name}"
    self.logger.debug(f"Creating pvc {name}")
    await self.create_pvc(name=name, namespace=namespace)
    self.logger.debug("Sleeping")
    # Non-blocking wait: time.sleep() would stall the whole event loop
    await asyncio.sleep(40)
    self.logger.debug(f"Creating pod {pod_name}")
    await self.create_copy_pod(name=pod_name, namespace=namespace, pvc_name=name)
    try:
        self.logger.debug("Sleeping")
        await asyncio.sleep(40)
        self.logger.debug(f"Copying files to pod {pod_name}")
        for src_f, dest_f in zip(src_list, dest_list):
            dest_path = f"/mnt/data/{dest_f}"
            self.logger.debug(f"Copying file {src_f} to {dest_path} in pod {pod_name}")
            result = self.copy_file_to_pod(
                namespace=namespace,
                pod_name=pod_name,
                container_name="copy-container",
                src_file=src_f,
                dest_path=dest_path,
            )
            if not result:
                raise Exception(
                    f"Failed copying file {src_f} to {dest_path} in pod {pod_name}"
                )
            self.logger.debug(
                f"Successfully copied file {src_f} to {dest_path} in pod {pod_name}"
            )
    finally:
        # The helper pod is removed on failure too, so that it is not leaked
        # and a retry does not find a pod with the same name.
        self.logger.debug(f"Deleting pod {pod_name}")
        await self.delete_pod(pod_name, namespace)
almagiacdd20ae2024-12-13 09:45:45 +01001125
@retry(
    attempts=10,
    delay=1,
    fallback=Exception("Failed creating the pod"),
)
async def create_copy_pod(self, name: str, namespace: str, pvc_name: str):
    """
    Create a pod to copy content into a PVC

    The pod runs a single busybox container named "copy-container" that
    sleeps for 3600 seconds and mounts the PVC under /mnt/data.

    :param: name: Name of the pod to be created
    :param: namespace: Name of the namespace where the pod will be stored
    :param: pvc_name: Name of the PVC that the pod will mount as a volume

    :return: None
    """
    pod = V1Pod(
        api_version="v1",
        kind="Pod",
        metadata=V1ObjectMeta(name=name),
        spec=V1PodSpec(
            containers=[
                V1Container(
                    name="copy-container",
                    image="busybox",  # Lightweight image used to copy files
                    command=["sleep", "3600"],  # Keep the container running
                    volume_mounts=[
                        V1VolumeMount(mount_path="/mnt/data", name="my-storage")
                    ],
                )
            ],
            volumes=[
                V1Volume(
                    name="my-storage",
                    persistent_volume_claim=V1PersistentVolumeClaimVolumeSource(
                        claim_name=pvc_name
                    ),
                )
            ],
        ),
    )
    # Create the pod
    self.clients[CORE_CLIENT].create_namespaced_pod(namespace=namespace, body=pod)
1167
@retry(
    attempts=10,
    delay=1,
    fallback=Exception("Failed deleting the pod"),
)
async def delete_pod(self, name: str, namespace: str):
    """
    Delete a pod

    :param: name: Name of the pod to be deleted
    :param: namespace: Name of the namespace where the pod is running

    :return: None
    """
    core_api = self.clients[CORE_CLIENT]
    core_api.delete_namespaced_pod(name, namespace)