blob: eac857a336f677ab5a1c4ab2df4c330f900784ad [file] [log] [blame]
almagiacdd20ae2024-12-13 09:45:45 +01001#######################################################################################
2# Copyright 2020 Canonical Ltd.
3# Copyright ETSI Contributors and Others.
4#
5# Licensed under the Apache License, Version 2.0 (the "License");
6# you may not use this file except in compliance with the License.
7# You may obtain a copy of the License at
8#
9# http://www.apache.org/licenses/LICENSE-2.0
10#
11# Unless required by applicable law or agreed to in writing, software
12# distributed under the License is distributed on an "AS IS" BASIS,
13# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14# See the License for the specific language governing permissions and
15# limitations under the License.
16#######################################################################################
17
18import base64
19import logging
almagiacdd20ae2024-12-13 09:45:45 +010020import typing
21import uuid
22import json
garciadeblas53222a92025-09-11 11:38:02 +020023import yaml
almagiacdd20ae2024-12-13 09:45:45 +010024import tarfile
25import io
26from time import sleep
27
28from distutils.version import LooseVersion
29
garciadeblas6d8acf32025-02-06 13:34:37 +010030from kubernetes import client as kclient, config as kconfig
almagiacdd20ae2024-12-13 09:45:45 +010031from kubernetes.client.api import VersionApi
32from kubernetes.client.models import (
33 V1ClusterRole,
34 V1Role,
35 V1ObjectMeta,
36 V1PolicyRule,
37 V1ServiceAccount,
38 V1ClusterRoleBinding,
39 V1RoleBinding,
40 V1RoleRef,
41 RbacV1Subject,
42 V1Secret,
43 V1SecretReference,
44 V1Namespace,
45 V1PersistentVolumeClaim,
46 V1PersistentVolumeClaimSpec,
47 V1PersistentVolumeClaimVolumeSource,
48 V1ResourceRequirements,
49 V1Pod,
50 V1PodSpec,
51 V1Volume,
52 V1VolumeMount,
53 V1Container,
rshrif8911b92025-06-11 18:19:07 +000054 V1ConfigMap,
almagiacdd20ae2024-12-13 09:45:45 +010055)
56from kubernetes.client.rest import ApiException
57from kubernetes.stream import stream
58from osm_lcm.n2vc.libjuju import retry_callback
59from retrying_async import retry
60
61SERVICE_ACCOUNT_TOKEN_KEY = "token"
62SERVICE_ACCOUNT_ROOT_CA_KEY = "ca.crt"
63# clients
64CORE_CLIENT = "core_v1"
65RBAC_CLIENT = "rbac_v1"
66STORAGE_CLIENT = "storage_v1"
67CUSTOM_OBJECT_CLIENT = "custom_object"
68
69
70class Kubectl:
71 def __init__(self, config_file=None):
almagiacdd20ae2024-12-13 09:45:45 +010072 self.logger = logging.getLogger("lcm.kubectl")
garciadeblas6d8acf32025-02-06 13:34:37 +010073 self._config_file = config_file
74 self.logger.info(f"Kubectl cfg file: {config_file}")
garciadeblas53222a92025-09-11 11:38:02 +020075
76 # Create default configuration for API client
77 self._configuration = kclient.Configuration()
78
79 # Get proxy_url
80 proxy_url = None
81 if config_file:
82 with open(config_file, "r", encoding="utf-8") as f:
83 kubeconfig_yaml = f.read()
84 try:
85 kubeconfig_dict = yaml.safe_load(kubeconfig_yaml)
86 except yaml.YAMLError as e:
87 raise e
88 proxy_url = (
89 kubeconfig_dict.get("clusters", [])[0]
90 .get("cluster", {})
91 .get("proxy-url")
92 )
93
94 # If kubeconfig has proxy configured, use it
95 if proxy_url:
96 self._configuration.proxy = proxy_url
97 self.logger.info(f"Using proxy for kubernetes: {proxy_url}")
98
99 # Create API client
100 self._api_client = kconfig.new_client_from_config(
101 config_file=config_file,
102 client_configuration=self._configuration,
103 )
104 # self._configuration = self._api_client.configuration.get_default_copy()
105
106 # Carga la config base
garciadeblas6d8acf32025-02-06 13:34:37 +0100107 self._clients = {
108 CORE_CLIENT: kclient.CoreV1Api(api_client=self._api_client),
109 RBAC_CLIENT: kclient.RbacAuthorizationV1Api(api_client=self._api_client),
110 STORAGE_CLIENT: kclient.StorageV1Api(api_client=self._api_client),
111 CUSTOM_OBJECT_CLIENT: kclient.CustomObjectsApi(api_client=self._api_client),
112 }
garciadeblas6d8acf32025-02-06 13:34:37 +0100113 self.logger.info(f"Kubectl cfg file: {config_file}")
almagiacdd20ae2024-12-13 09:45:45 +0100114
115 @property
116 def configuration(self):
117 return self._configuration
118
119 @property
120 def clients(self):
121 return self._clients
122
123 def get_services(
124 self,
125 field_selector: str = None,
126 label_selector: str = None,
127 ) -> typing.List[typing.Dict]:
128 """
129 Get Service list from a namespace
130
131 :param: field_selector: Kubernetes field selector for the namespace
132 :param: label_selector: Kubernetes label selector for the namespace
133
134 :return: List of the services matching the selectors specified
135 """
136 kwargs = {}
137 if field_selector:
138 kwargs["field_selector"] = field_selector
139 if label_selector:
140 kwargs["label_selector"] = label_selector
141 try:
142 result = self.clients[CORE_CLIENT].list_service_for_all_namespaces(**kwargs)
143 return [
144 {
145 "name": i.metadata.name,
146 "cluster_ip": i.spec.cluster_ip,
147 "type": i.spec.type,
148 "ports": (
149 [
150 {
151 "name": p.name,
152 "node_port": p.node_port,
153 "port": p.port,
154 "protocol": p.protocol,
155 "target_port": p.target_port,
156 }
157 for p in i.spec.ports
158 ]
159 if i.spec.ports
160 else []
161 ),
162 "external_ip": [i.ip for i in i.status.load_balancer.ingress]
163 if i.status.load_balancer.ingress
164 else None,
165 }
166 for i in result.items
167 ]
168 except ApiException as e:
169 self.logger.error("Error calling get services: {}".format(e))
170 raise e
171
172 def get_default_storage_class(self) -> str:
173 """
174 Default storage class
175
176 :return: Returns the default storage class name, if exists.
177 If not, it returns the first storage class.
178 If there are not storage classes, returns None
179 """
180 storage_classes = self.clients[STORAGE_CLIENT].list_storage_class()
181 selected_sc = None
182 default_sc_annotations = {
183 "storageclass.kubernetes.io/is-default-class": "true",
184 # Older clusters still use the beta annotation.
185 "storageclass.beta.kubernetes.io/is-default-class": "true",
186 }
187 for sc in storage_classes.items:
188 if not selected_sc:
189 # Select the first storage class in case there is no a default-class
190 selected_sc = sc.metadata.name
191 annotations = sc.metadata.annotations or {}
192 if any(
193 k in annotations and annotations[k] == v
194 for k, v in default_sc_annotations.items()
195 ):
196 # Default storage
197 selected_sc = sc.metadata.name
198 break
199 return selected_sc
200
201 def create_cluster_role(
202 self,
203 name: str,
garciadeblas6bbe55e2025-02-08 11:30:39 +0100204 labels: typing.Dict[str, str],
almagiacdd20ae2024-12-13 09:45:45 +0100205 namespace: str = "kube-system",
206 ):
207 """
208 Create a cluster role
209
210 :param: name: Name of the cluster role
211 :param: labels: Labels for cluster role metadata
212 :param: namespace: Kubernetes namespace for cluster role metadata
213 Default: kube-system
214 """
215 cluster_roles = self.clients[RBAC_CLIENT].list_cluster_role(
216 field_selector="metadata.name={}".format(name)
217 )
218
219 if len(cluster_roles.items) > 0:
220 raise Exception("Role with metadata.name={} already exists".format(name))
221
222 metadata = V1ObjectMeta(name=name, labels=labels, namespace=namespace)
223 # Cluster role
224 cluster_role = V1ClusterRole(
225 metadata=metadata,
226 rules=[
227 V1PolicyRule(api_groups=["*"], resources=["*"], verbs=["*"]),
228 V1PolicyRule(non_resource_ur_ls=["*"], verbs=["*"]),
229 ],
230 )
231
232 self.clients[RBAC_CLIENT].create_cluster_role(cluster_role)
233
234 async def create_role(
235 self,
236 name: str,
garciadeblas6bbe55e2025-02-08 11:30:39 +0100237 labels: typing.Dict[str, str],
almagiacdd20ae2024-12-13 09:45:45 +0100238 api_groups: list,
239 resources: list,
240 verbs: list,
241 namespace: str,
242 ):
243 """
244 Create a role with one PolicyRule
245
246 :param: name: Name of the namespaced Role
247 :param: labels: Labels for namespaced Role metadata
248 :param: api_groups: List with api-groups allowed in the policy rule
249 :param: resources: List with resources allowed in the policy rule
250 :param: verbs: List with verbs allowed in the policy rule
251 :param: namespace: Kubernetes namespace for Role metadata
252
253 :return: None
254 """
255
256 roles = self.clients[RBAC_CLIENT].list_namespaced_role(
257 namespace, field_selector="metadata.name={}".format(name)
258 )
259
260 if len(roles.items) > 0:
261 raise Exception("Role with metadata.name={} already exists".format(name))
262
263 metadata = V1ObjectMeta(name=name, labels=labels, namespace=namespace)
264
265 role = V1Role(
266 metadata=metadata,
267 rules=[
268 V1PolicyRule(api_groups=api_groups, resources=resources, verbs=verbs),
269 ],
270 )
271
272 self.clients[RBAC_CLIENT].create_namespaced_role(namespace, role)
273
274 def delete_cluster_role(self, name: str):
275 """
276 Delete a cluster role
277
278 :param: name: Name of the cluster role
279 """
280 self.clients[RBAC_CLIENT].delete_cluster_role(name)
281
282 def _get_kubectl_version(self):
garciadeblas6d8acf32025-02-06 13:34:37 +0100283 self.logger.debug("Enter _get_kubectl_version function")
284 version = VersionApi(api_client=self._api_client).get_code()
almagiacdd20ae2024-12-13 09:45:45 +0100285 return "{}.{}".format(version.major, version.minor)
286
287 def _need_to_create_new_secret(self):
288 min_k8s_version = "1.24"
289 current_k8s_version = self._get_kubectl_version()
290 return LooseVersion(min_k8s_version) <= LooseVersion(current_k8s_version)
291
292 def _get_secret_name(self, service_account_name: str):
293 random_alphanum = str(uuid.uuid4())[:5]
294 return "{}-token-{}".format(service_account_name, random_alphanum)
295
296 def _create_service_account_secret(
297 self,
298 service_account_name: str,
299 namespace: str,
300 secret_name: str,
301 ):
302 """
303 Create a secret for the service account. K8s version >= 1.24
304
305 :param: service_account_name: Name of the service account
306 :param: namespace: Kubernetes namespace for service account metadata
307 :param: secret_name: Name of the secret
308 """
309 v1_core = self.clients[CORE_CLIENT]
310 secrets = v1_core.list_namespaced_secret(
311 namespace, field_selector="metadata.name={}".format(secret_name)
312 ).items
313
314 if len(secrets) > 0:
315 raise Exception(
316 "Secret with metadata.name={} already exists".format(secret_name)
317 )
318
319 annotations = {"kubernetes.io/service-account.name": service_account_name}
320 metadata = V1ObjectMeta(
321 name=secret_name, namespace=namespace, annotations=annotations
322 )
323 type = "kubernetes.io/service-account-token"
324 secret = V1Secret(metadata=metadata, type=type)
325 v1_core.create_namespaced_secret(namespace, secret)
326
327 def _get_secret_reference_list(self, namespace: str, secret_name: str):
328 """
329 Return a secret reference list with one secret.
330 K8s version >= 1.24
331
332 :param: namespace: Kubernetes namespace for service account metadata
333 :param: secret_name: Name of the secret
334 :rtype: list[V1SecretReference]
335 """
336 return [V1SecretReference(name=secret_name, namespace=namespace)]
337
338 def create_service_account(
339 self,
340 name: str,
garciadeblas6bbe55e2025-02-08 11:30:39 +0100341 labels: typing.Dict[str, str],
almagiacdd20ae2024-12-13 09:45:45 +0100342 namespace: str = "kube-system",
343 ):
344 """
345 Create a service account
346
347 :param: name: Name of the service account
348 :param: labels: Labels for service account metadata
349 :param: namespace: Kubernetes namespace for service account metadata
350 Default: kube-system
351 """
352 v1_core = self.clients[CORE_CLIENT]
353 service_accounts = v1_core.list_namespaced_service_account(
354 namespace, field_selector="metadata.name={}".format(name)
355 )
356 if len(service_accounts.items) > 0:
357 raise Exception(
358 "Service account with metadata.name={} already exists".format(name)
359 )
360
361 metadata = V1ObjectMeta(name=name, labels=labels, namespace=namespace)
362
363 if self._need_to_create_new_secret():
364 secret_name = self._get_secret_name(name)
365 secrets = self._get_secret_reference_list(namespace, secret_name)
366 service_account = V1ServiceAccount(metadata=metadata, secrets=secrets)
367 v1_core.create_namespaced_service_account(namespace, service_account)
368 self._create_service_account_secret(name, namespace, secret_name)
369 else:
370 service_account = V1ServiceAccount(metadata=metadata)
371 v1_core.create_namespaced_service_account(namespace, service_account)
372
373 def delete_secret(self, name: str, namespace: str = "kube-system"):
374 """
375 Delete a secret
376
377 :param: name: Name of the secret
378 :param: namespace: Kubernetes namespace
379 Default: kube-system
380 """
garciadeblas6d8acf32025-02-06 13:34:37 +0100381 self.logger.debug(f"Kubectl cfg file: {self._config_file}")
almagiacdd20ae2024-12-13 09:45:45 +0100382 self.clients[CORE_CLIENT].delete_namespaced_secret(name, namespace)
383
384 def delete_service_account(self, name: str, namespace: str = "kube-system"):
385 """
386 Delete a service account
387
388 :param: name: Name of the service account
389 :param: namespace: Kubernetes namespace for service account metadata
390 Default: kube-system
391 """
392 self.clients[CORE_CLIENT].delete_namespaced_service_account(name, namespace)
393
394 def create_cluster_role_binding(
garciadeblas6bbe55e2025-02-08 11:30:39 +0100395 self, name: str, labels: typing.Dict[str, str], namespace: str = "kube-system"
almagiacdd20ae2024-12-13 09:45:45 +0100396 ):
397 """
398 Create a cluster role binding
399
400 :param: name: Name of the cluster role
401 :param: labels: Labels for cluster role binding metadata
402 :param: namespace: Kubernetes namespace for cluster role binding metadata
403 Default: kube-system
404 """
405 role_bindings = self.clients[RBAC_CLIENT].list_cluster_role_binding(
406 field_selector="metadata.name={}".format(name)
407 )
408 if len(role_bindings.items) > 0:
409 raise Exception("Generated rbac id already exists")
410
411 role_binding = V1ClusterRoleBinding(
412 metadata=V1ObjectMeta(name=name, labels=labels),
413 role_ref=V1RoleRef(kind="ClusterRole", name=name, api_group=""),
414 subjects=[
415 RbacV1Subject(kind="ServiceAccount", name=name, namespace=namespace)
416 ],
417 )
418 self.clients[RBAC_CLIENT].create_cluster_role_binding(role_binding)
419
420 async def create_role_binding(
421 self,
422 name: str,
423 role_name: str,
424 sa_name: str,
garciadeblas6bbe55e2025-02-08 11:30:39 +0100425 labels: typing.Dict[str, str],
almagiacdd20ae2024-12-13 09:45:45 +0100426 namespace: str,
427 ):
428 """
429 Create a cluster role binding
430
431 :param: name: Name of the namespaced Role Binding
432 :param: role_name: Name of the namespaced Role to be bound
433 :param: sa_name: Name of the Service Account to be bound
434 :param: labels: Labels for Role Binding metadata
435 :param: namespace: Kubernetes namespace for Role Binding metadata
436
437 :return: None
438 """
439 role_bindings = self.clients[RBAC_CLIENT].list_namespaced_role_binding(
440 namespace, field_selector="metadata.name={}".format(name)
441 )
442 if len(role_bindings.items) > 0:
443 raise Exception(
444 "Role Binding with metadata.name={} already exists".format(name)
445 )
446
447 role_binding = V1RoleBinding(
448 metadata=V1ObjectMeta(name=name, labels=labels),
449 role_ref=V1RoleRef(kind="Role", name=role_name, api_group=""),
450 subjects=[
451 RbacV1Subject(kind="ServiceAccount", name=sa_name, namespace=namespace)
452 ],
453 )
454 self.clients[RBAC_CLIENT].create_namespaced_role_binding(
455 namespace, role_binding
456 )
457
458 def delete_cluster_role_binding(self, name: str):
459 """
460 Delete a cluster role binding
461
462 :param: name: Name of the cluster role binding
463 """
464 self.clients[RBAC_CLIENT].delete_cluster_role_binding(name)
465
466 @retry(
467 attempts=10,
468 delay=1,
469 fallback=Exception("Failed getting the secret from service account"),
470 callback=retry_callback,
471 )
472 async def get_secret_data(
473 self, name: str, namespace: str = "kube-system"
garciadeblas6d8acf32025-02-06 13:34:37 +0100474 ) -> typing.Tuple[str, str]:
almagiacdd20ae2024-12-13 09:45:45 +0100475 """
476 Get secret data
477
478 :param: name: Name of the secret data
479 :param: namespace: Name of the namespace where the secret is stored
480
481 :return: Tuple with the token and client certificate
482 """
483 v1_core = self.clients[CORE_CLIENT]
484
485 secret_name = None
486
487 service_accounts = v1_core.list_namespaced_service_account(
488 namespace, field_selector="metadata.name={}".format(name)
489 )
490 if len(service_accounts.items) == 0:
491 raise Exception(
492 "Service account not found with metadata.name={}".format(name)
493 )
494 service_account = service_accounts.items[0]
495 if service_account.secrets and len(service_account.secrets) > 0:
496 secret_name = service_account.secrets[0].name
497 if not secret_name:
498 raise Exception(
499 "Failed getting the secret from service account {}".format(name)
500 )
501 # TODO: refactor to use get_secret_content
502 secret = v1_core.list_namespaced_secret(
503 namespace, field_selector="metadata.name={}".format(secret_name)
504 ).items[0]
505
506 token = secret.data[SERVICE_ACCOUNT_TOKEN_KEY]
507 client_certificate_data = secret.data[SERVICE_ACCOUNT_ROOT_CA_KEY]
508
509 return (
510 base64.b64decode(token).decode("utf-8"),
511 base64.b64decode(client_certificate_data).decode("utf-8"),
512 )
513
514 @retry(
515 attempts=10,
516 delay=1,
517 fallback=Exception("Failed getting data from the secret"),
518 )
519 async def get_secret_content(
520 self,
521 name: str,
522 namespace: str,
garciadeblas6bbe55e2025-02-08 11:30:39 +0100523 ) -> typing.Dict:
almagiacdd20ae2024-12-13 09:45:45 +0100524 """
525 Get secret data
526
527 :param: name: Name of the secret
528 :param: namespace: Name of the namespace where the secret is stored
529
530 :return: Dictionary with secret's data
531 """
garciadeblas6d8acf32025-02-06 13:34:37 +0100532 self.logger.debug(f"Kubectl cfg file: {self._config_file}")
almagiacdd20ae2024-12-13 09:45:45 +0100533 v1_core = self.clients[CORE_CLIENT]
534
535 secret = v1_core.read_namespaced_secret(name, namespace)
536
537 return secret.data
538
539 @retry(
540 attempts=10,
541 delay=1,
542 fallback=Exception("Failed creating the secret"),
543 )
544 async def create_secret(
545 self, name: str, data: dict, namespace: str, secret_type: str
546 ):
547 """
548 Create secret with data
549
550 :param: name: Name of the secret
551 :param: data: Dict with data content. Values must be already base64 encoded
552 :param: namespace: Name of the namespace where the secret will be stored
553 :param: secret_type: Type of the secret, e.g., Opaque, kubernetes.io/service-account-token, kubernetes.io/tls
554
555 :return: None
556 """
garciadeblas6d8acf32025-02-06 13:34:37 +0100557 self.logger.debug(f"Kubectl cfg file: {self._config_file}")
garciadeblas6d8acf32025-02-06 13:34:37 +0100558 self.logger.debug("Enter create_secret function")
almagiacdd20ae2024-12-13 09:45:45 +0100559 v1_core = self.clients[CORE_CLIENT]
garciadeblas6d8acf32025-02-06 13:34:37 +0100560 self.logger.debug(f"v1_core: {v1_core}")
almagiacdd20ae2024-12-13 09:45:45 +0100561 metadata = V1ObjectMeta(name=name, namespace=namespace)
garciadeblas6d8acf32025-02-06 13:34:37 +0100562 self.logger.debug(f"metadata: {metadata}")
almagiacdd20ae2024-12-13 09:45:45 +0100563 secret = V1Secret(metadata=metadata, data=data, type=secret_type)
garciadeblas6d8acf32025-02-06 13:34:37 +0100564 self.logger.debug(f"secret: {secret}")
garciadeblasc58e9962025-06-17 18:16:20 +0200565 try:
566 v1_core.create_namespaced_secret(namespace, secret)
567 self.logger.info("Namespaced secret was created")
568 except ApiException as e:
569 self.logger.error(f"Failed to create namespaced secret: {e}")
570 raise
almagiacdd20ae2024-12-13 09:45:45 +0100571
rshrif8911b92025-06-11 18:19:07 +0000572 async def create_configmap(self, name: str, data: dict, namespace: str):
573 """
574 Create secret with data
575
576 :param: name: Name of the configmap
577 :param: data: Dict with data content.
578 :param: namespace: Name of the namespace where the configmap will be stored
579
580 :return: None
581 """
582 self.logger.debug(f"Kubectl cfg file: {self._config_file}")
583 self.logger.debug("Enter create_configmap function")
584 v1_core = self.clients[CORE_CLIENT]
585 self.logger.debug(f"v1_core: {v1_core}")
586 config_map = V1ConfigMap(
587 metadata=V1ObjectMeta(name=name, namespace=namespace),
588 data=data,
589 )
590 self.logger.debug(f"config_map: {config_map}")
591 try:
592 v1_core.create_namespaced_config_map(namespace, config_map)
593 self.logger.info("Namespaced configmap was created")
594 except ApiException as e:
595 self.logger.error(f"Failed to create namespaced configmap: {e}")
596 raise
597
598 def delete_configmap(self, name: str, namespace: str):
599 """
600 Delete a configmap
601
602 :param: name: Name of the configmap
603 :param: namespace: Kubernetes namespace
604 """
605 self.logger.debug(f"Kubectl cfg file: {self._config_file}")
606 self.clients[CORE_CLIENT].delete_namespaced_config_map(name, namespace)
607
almagiacdd20ae2024-12-13 09:45:45 +0100608 async def create_certificate(
609 self,
610 namespace: str,
611 name: str,
612 dns_prefix: str,
613 secret_name: str,
614 usages: list,
615 issuer_name: str,
616 ):
617 """
618 Creates cert-manager certificate object
619
620 :param: namespace: Name of the namespace where the certificate and secret is stored
621 :param: name: Name of the certificate object
622 :param: dns_prefix: Prefix for the dnsNames. They will be prefixed to the common k8s svc suffixes
623 :param: secret_name: Name of the secret created by cert-manager
624 :param: usages: List of X.509 key usages
625 :param: issuer_name: Name of the cert-manager's Issuer or ClusterIssuer object
626
627 """
628 certificate_body = {
629 "apiVersion": "cert-manager.io/v1",
630 "kind": "Certificate",
631 "metadata": {"name": name, "namespace": namespace},
632 "spec": {
633 "secretName": secret_name,
634 "privateKey": {
635 "rotationPolicy": "Always",
636 "algorithm": "ECDSA",
637 "size": 256,
638 },
639 "duration": "8760h", # 1 Year
640 "renewBefore": "2208h", # 9 months
641 "subject": {"organizations": ["osm"]},
642 "commonName": "osm",
643 "isCA": False,
644 "usages": usages,
645 "dnsNames": [
646 "{}.{}".format(dns_prefix, namespace),
647 "{}.{}.svc".format(dns_prefix, namespace),
648 "{}.{}.svc.cluster".format(dns_prefix, namespace),
649 "{}.{}.svc.cluster.local".format(dns_prefix, namespace),
650 ],
651 "issuerRef": {"name": issuer_name, "kind": "ClusterIssuer"},
652 },
653 }
654 client = self.clients[CUSTOM_OBJECT_CLIENT]
655 try:
656 client.create_namespaced_custom_object(
657 group="cert-manager.io",
658 plural="certificates",
659 version="v1",
660 body=certificate_body,
661 namespace=namespace,
662 )
663 except ApiException as e:
664 info = json.loads(e.body)
665 if info.get("reason").lower() == "alreadyexists":
666 self.logger.warning("Certificate already exists: {}".format(e))
667 else:
668 raise e
669
670 async def delete_certificate(self, namespace, object_name):
671 client = self.clients[CUSTOM_OBJECT_CLIENT]
672 try:
673 client.delete_namespaced_custom_object(
674 group="cert-manager.io",
675 plural="certificates",
676 version="v1",
677 name=object_name,
678 namespace=namespace,
679 )
680 except ApiException as e:
681 info = json.loads(e.body)
682 if info.get("reason").lower() == "notfound":
683 self.logger.warning("Certificate already deleted: {}".format(e))
684 else:
685 raise e
686
687 @retry(
688 attempts=10,
689 delay=1,
690 fallback=Exception("Failed creating the namespace"),
691 )
692 async def create_namespace(self, name: str, labels: dict = None):
693 """
694 Create a namespace
695
696 :param: name: Name of the namespace to be created
697 :param: labels: Dictionary with labels for the new namespace
698
699 """
700 v1_core = self.clients[CORE_CLIENT]
701 metadata = V1ObjectMeta(name=name, labels=labels)
702 namespace = V1Namespace(
703 metadata=metadata,
704 )
705
706 try:
707 v1_core.create_namespace(namespace)
708 self.logger.debug("Namespace created: {}".format(name))
709 except ApiException as e:
710 info = json.loads(e.body)
711 if info.get("reason").lower() == "alreadyexists":
712 self.logger.warning("Namespace already exists: {}".format(e))
713 else:
714 raise e
715
716 @retry(
717 attempts=10,
718 delay=1,
719 fallback=Exception("Failed deleting the namespace"),
720 )
721 async def delete_namespace(self, name: str):
722 """
723 Delete a namespace
724
725 :param: name: Name of the namespace to be deleted
726
727 """
728 try:
729 self.clients[CORE_CLIENT].delete_namespace(name)
730 except ApiException as e:
731 if e.reason == "Not Found":
732 self.logger.warning("Namespace already deleted: {}".format(e))
733
734 def get_secrets(
735 self,
736 namespace: str,
737 field_selector: str = None,
738 ) -> typing.List[typing.Dict]:
739 """
740 Get Secret list from a namespace
741
742 :param: namespace: Kubernetes namespace
743 :param: field_selector: Kubernetes field selector
744
745 :return: List of the secrets matching the selectors specified
746 """
747 try:
748 v1_core = self.clients[CORE_CLIENT]
749 secrets = v1_core.list_namespaced_secret(
750 namespace=namespace,
751 field_selector=field_selector,
752 ).items
753 return secrets
754 except ApiException as e:
755 self.logger.error("Error calling get secrets: {}".format(e))
756 raise e
757
758 def create_generic_object(
759 self,
760 api_group: str,
761 api_plural: str,
762 api_version: str,
763 namespace: str,
764 manifest_dict: dict,
765 ):
766 """
767 Creates generic object
768
769 :param: api_group: API Group
770 :param: api_plural: API Plural
771 :param: api_version: API Version
772 :param: namespace: Namespace
773 :param: manifest_dict: Dictionary with the content of the Kubernetes manifest
774
775 """
garciadeblas6d8acf32025-02-06 13:34:37 +0100776 self.logger.debug(f"Kubectl cfg file: {self._config_file}")
almagiacdd20ae2024-12-13 09:45:45 +0100777 client = self.clients[CUSTOM_OBJECT_CLIENT]
778 try:
779 if namespace:
780 client.create_namespaced_custom_object(
781 group=api_group,
782 plural=api_plural,
783 version=api_version,
784 body=manifest_dict,
785 namespace=namespace,
786 )
787 else:
788 client.create_cluster_custom_object(
789 group=api_group,
790 plural=api_plural,
791 version=api_version,
792 body=manifest_dict,
793 )
794 except ApiException as e:
795 info = json.loads(e.body)
796 if info.get("reason").lower() == "alreadyexists":
797 self.logger.warning("Object already exists: {}".format(e))
798 else:
799 raise e
800
801 def delete_generic_object(
802 self,
803 api_group: str,
804 api_plural: str,
805 api_version: str,
806 namespace: str,
807 name: str,
808 ):
809 """
810 Deletes generic object
811
812 :param: api_group: API Group
813 :param: api_plural: API Plural
814 :param: api_version: API Version
815 :param: namespace: Namespace
816 :param: name: Name of the object
817
818 """
garciadeblas6d8acf32025-02-06 13:34:37 +0100819 self.logger.debug(f"Kubectl cfg file: {self._config_file}")
almagiacdd20ae2024-12-13 09:45:45 +0100820 client = self.clients[CUSTOM_OBJECT_CLIENT]
821 try:
822 if namespace:
823 client.delete_namespaced_custom_object(
824 group=api_group,
825 plural=api_plural,
826 version=api_version,
827 name=name,
828 namespace=namespace,
829 )
830 else:
831 client.delete_cluster_custom_object(
832 group=api_group,
833 plural=api_plural,
834 version=api_version,
835 name=name,
836 )
837 except ApiException as e:
838 info = json.loads(e.body)
839 if info.get("reason").lower() == "notfound":
840 self.logger.warning("Object already deleted: {}".format(e))
841 else:
842 raise e
843
844 async def get_generic_object(
845 self,
846 api_group: str,
847 api_plural: str,
848 api_version: str,
849 namespace: str,
850 name: str,
851 ):
852 """
853 Gets generic object
854
855 :param: api_group: API Group
856 :param: api_plural: API Plural
857 :param: api_version: API Version
858 :param: namespace: Namespace
859 :param: name: Name of the object
860
861 """
garciadeblas6d8acf32025-02-06 13:34:37 +0100862 self.logger.debug(f"Kubectl cfg file: {self._config_file}")
almagiacdd20ae2024-12-13 09:45:45 +0100863 client = self.clients[CUSTOM_OBJECT_CLIENT]
864 try:
865 if namespace:
866 object_dict = client.list_namespaced_custom_object(
867 group=api_group,
868 plural=api_plural,
869 version=api_version,
870 namespace=namespace,
871 field_selector=f"metadata.name={name}",
872 )
873 else:
874 object_dict = client.list_cluster_custom_object(
875 group=api_group,
876 plural=api_plural,
877 version=api_version,
878 field_selector=f"metadata.name={name}",
879 )
880 if len(object_dict.get("items")) == 0:
881 return None
882 return object_dict.get("items")[0]
883 except ApiException as e:
884 self.logger.debug(f"Exception: {e}")
885 info = json.loads(e.body)
garciadeblas3a461df2025-12-17 08:20:58 +0100886 self.logger.debug(f"Api Exception: {e}. Reason: {info.get('reason')}")
887 return None
almagiacdd20ae2024-12-13 09:45:45 +0100888
889 async def list_generic_object(
890 self,
891 api_group: str,
892 api_plural: str,
893 api_version: str,
894 namespace: str,
895 ):
896 """
897 Lists all generic objects of the requested API group
898
899 :param: api_group: API Group
900 :param: api_plural: API Plural
901 :param: api_version: API Version
902 :param: namespace: Namespace
903
904 """
garciadeblas6d8acf32025-02-06 13:34:37 +0100905 self.logger.debug(f"Kubectl cfg file: {self._config_file}")
almagiacdd20ae2024-12-13 09:45:45 +0100906 client = self.clients[CUSTOM_OBJECT_CLIENT]
907 try:
908 if namespace:
909 object_dict = client.list_namespaced_custom_object(
910 group=api_group,
911 plural=api_plural,
912 version=api_version,
913 namespace=namespace,
914 )
915 else:
916 object_dict = client.list_cluster_custom_object(
917 group=api_group,
918 plural=api_plural,
919 version=api_version,
920 )
921 self.logger.debug(f"Object-list: {object_dict.get('items')}")
922 return object_dict.get("items")
923 except ApiException as e:
924 self.logger.debug(f"Exception: {e}")
925 info = json.loads(e.body)
926 if info.get("reason").lower() == "notfound":
927 self.logger.warning(
928 "Cannot find specified custom objects: {}".format(e)
929 )
930 return []
931 else:
932 raise e
933
934 @retry(
935 attempts=10,
936 delay=1,
937 fallback=Exception("Failed creating the secret"),
938 )
939 async def create_secret_string(
940 self, name: str, string_data: str, namespace: str, secret_type: str
941 ):
942 """
943 Create secret with data
944
945 :param: name: Name of the secret
946 :param: string_data: String with data content
947 :param: namespace: Name of the namespace where the secret will be stored
948 :param: secret_type: Type of the secret, e.g., Opaque, kubernetes.io/service-account-token, kubernetes.io/tls
949
950 :return: None
951 """
952 v1_core = self.clients[CORE_CLIENT]
953 metadata = V1ObjectMeta(name=name, namespace=namespace)
954 secret = V1Secret(metadata=metadata, string_data=string_data, type=secret_type)
955 v1_core.create_namespaced_secret(namespace, secret)
956
957 @retry(
958 attempts=10,
959 delay=1,
960 fallback=Exception("Failed creating the pvc"),
961 )
962 async def create_pvc(self, name: str, namespace: str):
963 """
964 Create a namespace
965
966 :param: name: Name of the pvc to be created
967 :param: namespace: Name of the namespace where the pvc will be stored
968
969 """
970 try:
971 pvc = V1PersistentVolumeClaim(
972 api_version="v1",
973 kind="PersistentVolumeClaim",
974 metadata=V1ObjectMeta(name=name),
975 spec=V1PersistentVolumeClaimSpec(
976 access_modes=["ReadWriteOnce"],
977 resources=V1ResourceRequirements(requests={"storage": "100Mi"}),
978 ),
979 )
980 self.clients[CORE_CLIENT].create_namespaced_persistent_volume_claim(
981 namespace=namespace, body=pvc
982 )
983 except ApiException as e:
984 info = json.loads(e.body)
985 if info.get("reason").lower() == "alreadyexists":
986 self.logger.warning("PVC already exists: {}".format(e))
987 else:
988 raise e
989
990 @retry(
991 attempts=10,
992 delay=1,
993 fallback=Exception("Failed deleting the pvc"),
994 )
995 async def delete_pvc(self, name: str, namespace: str):
996 """
997 Create a namespace
998
999 :param: name: Name of the pvc to be deleted
1000 :param: namespace: Namespace
1001
1002 """
1003 self.clients[CORE_CLIENT].delete_namespaced_persistent_volume_claim(
1004 name, namespace
1005 )
1006
1007 def copy_file_to_pod(
1008 self, namespace, pod_name, container_name, src_file, dest_path
1009 ):
1010 # Create an in-memory tar file containing the source file
1011 tar_buffer = io.BytesIO()
1012 with tarfile.open(fileobj=tar_buffer, mode="w") as tar:
1013 tar.add(src_file, arcname=dest_path.split("/")[-1])
1014
1015 tar_buffer.seek(0)
1016
1017 # Define the command to extract the tar file in the pod
1018 exec_command = ["tar", "xvf", "-", "-C", dest_path.rsplit("/", 1)[0]]
1019
1020 # Execute the command
1021 resp = stream(
1022 self.clients[CORE_CLIENT].connect_get_namespaced_pod_exec,
1023 pod_name,
1024 namespace,
1025 command=exec_command,
1026 container=container_name,
1027 stdin=True,
1028 stderr=True,
1029 stdout=True,
1030 tty=False,
1031 _preload_content=False,
1032 )
1033
1034 # Write the tar data to the pod
1035 resp.write_stdin(tar_buffer.read())
1036 resp.close()
1037
1038 @retry(
1039 attempts=10,
1040 delay=1,
1041 fallback=Exception("Failed creating the pvc"),
1042 )
1043 async def create_pvc_with_content(
1044 self, name: str, namespace: str, src_file: str, dest_filename: str
1045 ):
1046 """
1047 Create a PVC with content
1048
1049 :param: name: Name of the pvc to be created
1050 :param: namespace: Name of the namespace where the pvc will be stored
1051 :param: src_file: File to be copied
1052 :param: filename: Name of the file in the destination folder
1053 """
1054 pod_name = f"copy-pod-{name}"
1055 self.logger.debug(f"Creating pvc {name}")
1056 await self.create_pvc(name=name, namespace=namespace)
1057 self.logger.debug("Sleeping")
1058 sleep(40)
1059 self.logger.debug(f"Creating pod {pod_name}")
1060 await self.create_copy_pod(name=pod_name, namespace=namespace, pvc_name=name)
1061 self.logger.debug("Sleeping")
1062 sleep(40)
1063 self.logger.debug(f"Copying files to pod {pod_name}")
1064 self.copy_file_to_pod(
1065 namespace=namespace,
1066 pod_name=pod_name,
1067 container_name="copy-container",
1068 src_file=src_file,
1069 dest_path=f"/mnt/data/{dest_filename}",
1070 )
garciadeblasaed10692025-09-26 14:07:00 +02001071 self.logger.debug(f"Deleting pod {pod_name}")
1072 await self.delete_pod(pod_name, namespace)
almagiacdd20ae2024-12-13 09:45:45 +01001073
1074 @retry(
1075 attempts=10,
1076 delay=1,
1077 fallback=Exception("Failed creating the pvc"),
1078 )
1079 async def create_copy_pod(self, name: str, namespace: str, pvc_name: str):
1080 """
1081 Create a pod to copy content into a PVC
1082
1083 :param: name: Name of the pod to be created
1084 :param: namespace: Name of the namespace where the pod will be stored
1085 :param: pvc_name: Name of the PVC that the pod will mount as a volume
1086
1087 """
1088 pod = V1Pod(
1089 api_version="v1",
1090 kind="Pod",
garciadeblas6d8acf32025-02-06 13:34:37 +01001091 metadata=kclient.V1ObjectMeta(name=name),
almagiacdd20ae2024-12-13 09:45:45 +01001092 spec=V1PodSpec(
1093 containers=[
1094 V1Container(
1095 name="copy-container",
1096 image="busybox", # Imagen ligera para copiar archivos
1097 command=["sleep", "3600"], # Mantén el contenedor en ejecución
1098 volume_mounts=[
1099 V1VolumeMount(mount_path="/mnt/data", name="my-storage")
1100 ],
1101 )
1102 ],
1103 volumes=[
1104 V1Volume(
1105 name="my-storage",
1106 persistent_volume_claim=V1PersistentVolumeClaimVolumeSource(
1107 claim_name=pvc_name
1108 ),
1109 )
1110 ],
1111 ),
1112 )
1113 # Create the pod
1114 self.clients[CORE_CLIENT].create_namespaced_pod(namespace=namespace, body=pod)
1115
1116 @retry(
1117 attempts=10,
1118 delay=1,
1119 fallback=Exception("Failed deleting the pod"),
1120 )
1121 async def delete_pod(self, name: str, namespace: str):
1122 """
1123 Create a namespace
1124
1125 :param: name: Name of the pod to be deleted
1126 :param: namespace: Namespace
1127
1128 """
1129 self.clients[CORE_CLIENT].delete_namespaced_pod(name, namespace)