blob: 14de5569b8de8e96b6417351c744acc5de8d481f [file] [log] [blame]
almagiacdd20ae2024-12-13 09:45:45 +01001#######################################################################################
2# Copyright 2020 Canonical Ltd.
3# Copyright ETSI Contributors and Others.
4#
5# Licensed under the Apache License, Version 2.0 (the "License");
6# you may not use this file except in compliance with the License.
7# You may obtain a copy of the License at
8#
9# http://www.apache.org/licenses/LICENSE-2.0
10#
11# Unless required by applicable law or agreed to in writing, software
12# distributed under the License is distributed on an "AS IS" BASIS,
13# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14# See the License for the specific language governing permissions and
15# limitations under the License.
16#######################################################################################
17
18import base64
19import logging
almagiacdd20ae2024-12-13 09:45:45 +010020import typing
21import uuid
22import json
23import tarfile
24import io
25from time import sleep
26
27from distutils.version import LooseVersion
28
garciadeblas619a0a32025-02-06 13:34:37 +010029from kubernetes import client as kclient, config as kconfig
almagiacdd20ae2024-12-13 09:45:45 +010030from kubernetes.client.api import VersionApi
31from kubernetes.client.models import (
32 V1ClusterRole,
33 V1Role,
34 V1ObjectMeta,
35 V1PolicyRule,
36 V1ServiceAccount,
37 V1ClusterRoleBinding,
38 V1RoleBinding,
39 V1RoleRef,
40 RbacV1Subject,
41 V1Secret,
42 V1SecretReference,
43 V1Namespace,
44 V1PersistentVolumeClaim,
45 V1PersistentVolumeClaimSpec,
46 V1PersistentVolumeClaimVolumeSource,
47 V1ResourceRequirements,
48 V1Pod,
49 V1PodSpec,
50 V1Volume,
51 V1VolumeMount,
52 V1Container,
53)
54from kubernetes.client.rest import ApiException
55from kubernetes.stream import stream
56from osm_lcm.n2vc.libjuju import retry_callback
57from retrying_async import retry
58
# Keys read from the data of a service-account token secret
SERVICE_ACCOUNT_TOKEN_KEY = "token"
SERVICE_ACCOUNT_ROOT_CA_KEY = "ca.crt"

# Names under which the Kubernetes API clients are indexed in Kubectl.clients
CORE_CLIENT = "core_v1"
RBAC_CLIENT = "rbac_v1"
STORAGE_CLIENT = "storage_v1"
CUSTOM_OBJECT_CLIENT = "custom_object"
66
67
68class Kubectl:
69 def __init__(self, config_file=None):
almagiacdd20ae2024-12-13 09:45:45 +010070 self.logger = logging.getLogger("lcm.kubectl")
garciadeblas619a0a32025-02-06 13:34:37 +010071 self._config_file = config_file
72 self.logger.info(f"Kubectl cfg file: {config_file}")
73 # kconfig.load_kube_config(config_file=config_file)
74 self._api_client = kconfig.new_client_from_config(config_file=config_file)
75 self._clients = {
76 CORE_CLIENT: kclient.CoreV1Api(api_client=self._api_client),
77 RBAC_CLIENT: kclient.RbacAuthorizationV1Api(api_client=self._api_client),
78 STORAGE_CLIENT: kclient.StorageV1Api(api_client=self._api_client),
79 CUSTOM_OBJECT_CLIENT: kclient.CustomObjectsApi(api_client=self._api_client),
80 }
81 self._configuration = self._api_client.configuration.get_default_copy()
82 self.logger.info(f"Kubectl cfg file: {config_file}")
83 self.logger.info(f"Kubectl self configuration Host: {self._configuration.host}")
almagiacdd20ae2024-12-13 09:45:45 +010084
85 @property
86 def configuration(self):
87 return self._configuration
88
89 @property
90 def clients(self):
91 return self._clients
92
93 def get_services(
94 self,
95 field_selector: str = None,
96 label_selector: str = None,
97 ) -> typing.List[typing.Dict]:
98 """
99 Get Service list from a namespace
100
101 :param: field_selector: Kubernetes field selector for the namespace
102 :param: label_selector: Kubernetes label selector for the namespace
103
104 :return: List of the services matching the selectors specified
105 """
106 kwargs = {}
107 if field_selector:
108 kwargs["field_selector"] = field_selector
109 if label_selector:
110 kwargs["label_selector"] = label_selector
111 try:
112 result = self.clients[CORE_CLIENT].list_service_for_all_namespaces(**kwargs)
113 return [
114 {
115 "name": i.metadata.name,
116 "cluster_ip": i.spec.cluster_ip,
117 "type": i.spec.type,
118 "ports": (
119 [
120 {
121 "name": p.name,
122 "node_port": p.node_port,
123 "port": p.port,
124 "protocol": p.protocol,
125 "target_port": p.target_port,
126 }
127 for p in i.spec.ports
128 ]
129 if i.spec.ports
130 else []
131 ),
132 "external_ip": [i.ip for i in i.status.load_balancer.ingress]
133 if i.status.load_balancer.ingress
134 else None,
135 }
136 for i in result.items
137 ]
138 except ApiException as e:
139 self.logger.error("Error calling get services: {}".format(e))
140 raise e
141
142 def get_default_storage_class(self) -> str:
143 """
144 Default storage class
145
146 :return: Returns the default storage class name, if exists.
147 If not, it returns the first storage class.
148 If there are not storage classes, returns None
149 """
150 storage_classes = self.clients[STORAGE_CLIENT].list_storage_class()
151 selected_sc = None
152 default_sc_annotations = {
153 "storageclass.kubernetes.io/is-default-class": "true",
154 # Older clusters still use the beta annotation.
155 "storageclass.beta.kubernetes.io/is-default-class": "true",
156 }
157 for sc in storage_classes.items:
158 if not selected_sc:
159 # Select the first storage class in case there is no a default-class
160 selected_sc = sc.metadata.name
161 annotations = sc.metadata.annotations or {}
162 if any(
163 k in annotations and annotations[k] == v
164 for k, v in default_sc_annotations.items()
165 ):
166 # Default storage
167 selected_sc = sc.metadata.name
168 break
169 return selected_sc
170
171 def create_cluster_role(
172 self,
173 name: str,
garciadeblasb282ed02025-02-08 11:30:39 +0100174 labels: typing.Dict[str, str],
almagiacdd20ae2024-12-13 09:45:45 +0100175 namespace: str = "kube-system",
176 ):
177 """
178 Create a cluster role
179
180 :param: name: Name of the cluster role
181 :param: labels: Labels for cluster role metadata
182 :param: namespace: Kubernetes namespace for cluster role metadata
183 Default: kube-system
184 """
185 cluster_roles = self.clients[RBAC_CLIENT].list_cluster_role(
186 field_selector="metadata.name={}".format(name)
187 )
188
189 if len(cluster_roles.items) > 0:
190 raise Exception("Role with metadata.name={} already exists".format(name))
191
192 metadata = V1ObjectMeta(name=name, labels=labels, namespace=namespace)
193 # Cluster role
194 cluster_role = V1ClusterRole(
195 metadata=metadata,
196 rules=[
197 V1PolicyRule(api_groups=["*"], resources=["*"], verbs=["*"]),
198 V1PolicyRule(non_resource_ur_ls=["*"], verbs=["*"]),
199 ],
200 )
201
202 self.clients[RBAC_CLIENT].create_cluster_role(cluster_role)
203
204 async def create_role(
205 self,
206 name: str,
garciadeblasb282ed02025-02-08 11:30:39 +0100207 labels: typing.Dict[str, str],
almagiacdd20ae2024-12-13 09:45:45 +0100208 api_groups: list,
209 resources: list,
210 verbs: list,
211 namespace: str,
212 ):
213 """
214 Create a role with one PolicyRule
215
216 :param: name: Name of the namespaced Role
217 :param: labels: Labels for namespaced Role metadata
218 :param: api_groups: List with api-groups allowed in the policy rule
219 :param: resources: List with resources allowed in the policy rule
220 :param: verbs: List with verbs allowed in the policy rule
221 :param: namespace: Kubernetes namespace for Role metadata
222
223 :return: None
224 """
225
226 roles = self.clients[RBAC_CLIENT].list_namespaced_role(
227 namespace, field_selector="metadata.name={}".format(name)
228 )
229
230 if len(roles.items) > 0:
231 raise Exception("Role with metadata.name={} already exists".format(name))
232
233 metadata = V1ObjectMeta(name=name, labels=labels, namespace=namespace)
234
235 role = V1Role(
236 metadata=metadata,
237 rules=[
238 V1PolicyRule(api_groups=api_groups, resources=resources, verbs=verbs),
239 ],
240 )
241
242 self.clients[RBAC_CLIENT].create_namespaced_role(namespace, role)
243
244 def delete_cluster_role(self, name: str):
245 """
246 Delete a cluster role
247
248 :param: name: Name of the cluster role
249 """
250 self.clients[RBAC_CLIENT].delete_cluster_role(name)
251
252 def _get_kubectl_version(self):
garciadeblas619a0a32025-02-06 13:34:37 +0100253 self.logger.debug("Enter _get_kubectl_version function")
254 version = VersionApi(api_client=self._api_client).get_code()
almagiacdd20ae2024-12-13 09:45:45 +0100255 return "{}.{}".format(version.major, version.minor)
256
257 def _need_to_create_new_secret(self):
258 min_k8s_version = "1.24"
259 current_k8s_version = self._get_kubectl_version()
260 return LooseVersion(min_k8s_version) <= LooseVersion(current_k8s_version)
261
262 def _get_secret_name(self, service_account_name: str):
263 random_alphanum = str(uuid.uuid4())[:5]
264 return "{}-token-{}".format(service_account_name, random_alphanum)
265
266 def _create_service_account_secret(
267 self,
268 service_account_name: str,
269 namespace: str,
270 secret_name: str,
271 ):
272 """
273 Create a secret for the service account. K8s version >= 1.24
274
275 :param: service_account_name: Name of the service account
276 :param: namespace: Kubernetes namespace for service account metadata
277 :param: secret_name: Name of the secret
278 """
279 v1_core = self.clients[CORE_CLIENT]
280 secrets = v1_core.list_namespaced_secret(
281 namespace, field_selector="metadata.name={}".format(secret_name)
282 ).items
283
284 if len(secrets) > 0:
285 raise Exception(
286 "Secret with metadata.name={} already exists".format(secret_name)
287 )
288
289 annotations = {"kubernetes.io/service-account.name": service_account_name}
290 metadata = V1ObjectMeta(
291 name=secret_name, namespace=namespace, annotations=annotations
292 )
293 type = "kubernetes.io/service-account-token"
294 secret = V1Secret(metadata=metadata, type=type)
295 v1_core.create_namespaced_secret(namespace, secret)
296
297 def _get_secret_reference_list(self, namespace: str, secret_name: str):
298 """
299 Return a secret reference list with one secret.
300 K8s version >= 1.24
301
302 :param: namespace: Kubernetes namespace for service account metadata
303 :param: secret_name: Name of the secret
304 :rtype: list[V1SecretReference]
305 """
306 return [V1SecretReference(name=secret_name, namespace=namespace)]
307
308 def create_service_account(
309 self,
310 name: str,
garciadeblasb282ed02025-02-08 11:30:39 +0100311 labels: typing.Dict[str, str],
almagiacdd20ae2024-12-13 09:45:45 +0100312 namespace: str = "kube-system",
313 ):
314 """
315 Create a service account
316
317 :param: name: Name of the service account
318 :param: labels: Labels for service account metadata
319 :param: namespace: Kubernetes namespace for service account metadata
320 Default: kube-system
321 """
322 v1_core = self.clients[CORE_CLIENT]
323 service_accounts = v1_core.list_namespaced_service_account(
324 namespace, field_selector="metadata.name={}".format(name)
325 )
326 if len(service_accounts.items) > 0:
327 raise Exception(
328 "Service account with metadata.name={} already exists".format(name)
329 )
330
331 metadata = V1ObjectMeta(name=name, labels=labels, namespace=namespace)
332
333 if self._need_to_create_new_secret():
334 secret_name = self._get_secret_name(name)
335 secrets = self._get_secret_reference_list(namespace, secret_name)
336 service_account = V1ServiceAccount(metadata=metadata, secrets=secrets)
337 v1_core.create_namespaced_service_account(namespace, service_account)
338 self._create_service_account_secret(name, namespace, secret_name)
339 else:
340 service_account = V1ServiceAccount(metadata=metadata)
341 v1_core.create_namespaced_service_account(namespace, service_account)
342
343 def delete_secret(self, name: str, namespace: str = "kube-system"):
344 """
345 Delete a secret
346
347 :param: name: Name of the secret
348 :param: namespace: Kubernetes namespace
349 Default: kube-system
350 """
garciadeblas619a0a32025-02-06 13:34:37 +0100351 self.logger.debug(f"Kubectl cfg file: {self._config_file}")
352 self.logger.debug(
353 f"Kubectl self configuration Host: {self._configuration.host}"
354 )
almagiacdd20ae2024-12-13 09:45:45 +0100355 self.clients[CORE_CLIENT].delete_namespaced_secret(name, namespace)
356
357 def delete_service_account(self, name: str, namespace: str = "kube-system"):
358 """
359 Delete a service account
360
361 :param: name: Name of the service account
362 :param: namespace: Kubernetes namespace for service account metadata
363 Default: kube-system
364 """
365 self.clients[CORE_CLIENT].delete_namespaced_service_account(name, namespace)
366
367 def create_cluster_role_binding(
garciadeblasb282ed02025-02-08 11:30:39 +0100368 self, name: str, labels: typing.Dict[str, str], namespace: str = "kube-system"
almagiacdd20ae2024-12-13 09:45:45 +0100369 ):
370 """
371 Create a cluster role binding
372
373 :param: name: Name of the cluster role
374 :param: labels: Labels for cluster role binding metadata
375 :param: namespace: Kubernetes namespace for cluster role binding metadata
376 Default: kube-system
377 """
378 role_bindings = self.clients[RBAC_CLIENT].list_cluster_role_binding(
379 field_selector="metadata.name={}".format(name)
380 )
381 if len(role_bindings.items) > 0:
382 raise Exception("Generated rbac id already exists")
383
384 role_binding = V1ClusterRoleBinding(
385 metadata=V1ObjectMeta(name=name, labels=labels),
386 role_ref=V1RoleRef(kind="ClusterRole", name=name, api_group=""),
387 subjects=[
388 RbacV1Subject(kind="ServiceAccount", name=name, namespace=namespace)
389 ],
390 )
391 self.clients[RBAC_CLIENT].create_cluster_role_binding(role_binding)
392
393 async def create_role_binding(
394 self,
395 name: str,
396 role_name: str,
397 sa_name: str,
garciadeblasb282ed02025-02-08 11:30:39 +0100398 labels: typing.Dict[str, str],
almagiacdd20ae2024-12-13 09:45:45 +0100399 namespace: str,
400 ):
401 """
402 Create a cluster role binding
403
404 :param: name: Name of the namespaced Role Binding
405 :param: role_name: Name of the namespaced Role to be bound
406 :param: sa_name: Name of the Service Account to be bound
407 :param: labels: Labels for Role Binding metadata
408 :param: namespace: Kubernetes namespace for Role Binding metadata
409
410 :return: None
411 """
412 role_bindings = self.clients[RBAC_CLIENT].list_namespaced_role_binding(
413 namespace, field_selector="metadata.name={}".format(name)
414 )
415 if len(role_bindings.items) > 0:
416 raise Exception(
417 "Role Binding with metadata.name={} already exists".format(name)
418 )
419
420 role_binding = V1RoleBinding(
421 metadata=V1ObjectMeta(name=name, labels=labels),
422 role_ref=V1RoleRef(kind="Role", name=role_name, api_group=""),
423 subjects=[
424 RbacV1Subject(kind="ServiceAccount", name=sa_name, namespace=namespace)
425 ],
426 )
427 self.clients[RBAC_CLIENT].create_namespaced_role_binding(
428 namespace, role_binding
429 )
430
431 def delete_cluster_role_binding(self, name: str):
432 """
433 Delete a cluster role binding
434
435 :param: name: Name of the cluster role binding
436 """
437 self.clients[RBAC_CLIENT].delete_cluster_role_binding(name)
438
439 @retry(
440 attempts=10,
441 delay=1,
442 fallback=Exception("Failed getting the secret from service account"),
443 callback=retry_callback,
444 )
445 async def get_secret_data(
446 self, name: str, namespace: str = "kube-system"
garciadeblas619a0a32025-02-06 13:34:37 +0100447 ) -> typing.Tuple[str, str]:
almagiacdd20ae2024-12-13 09:45:45 +0100448 """
449 Get secret data
450
451 :param: name: Name of the secret data
452 :param: namespace: Name of the namespace where the secret is stored
453
454 :return: Tuple with the token and client certificate
455 """
456 v1_core = self.clients[CORE_CLIENT]
457
458 secret_name = None
459
460 service_accounts = v1_core.list_namespaced_service_account(
461 namespace, field_selector="metadata.name={}".format(name)
462 )
463 if len(service_accounts.items) == 0:
464 raise Exception(
465 "Service account not found with metadata.name={}".format(name)
466 )
467 service_account = service_accounts.items[0]
468 if service_account.secrets and len(service_account.secrets) > 0:
469 secret_name = service_account.secrets[0].name
470 if not secret_name:
471 raise Exception(
472 "Failed getting the secret from service account {}".format(name)
473 )
474 # TODO: refactor to use get_secret_content
475 secret = v1_core.list_namespaced_secret(
476 namespace, field_selector="metadata.name={}".format(secret_name)
477 ).items[0]
478
479 token = secret.data[SERVICE_ACCOUNT_TOKEN_KEY]
480 client_certificate_data = secret.data[SERVICE_ACCOUNT_ROOT_CA_KEY]
481
482 return (
483 base64.b64decode(token).decode("utf-8"),
484 base64.b64decode(client_certificate_data).decode("utf-8"),
485 )
486
487 @retry(
488 attempts=10,
489 delay=1,
490 fallback=Exception("Failed getting data from the secret"),
491 )
492 async def get_secret_content(
493 self,
494 name: str,
495 namespace: str,
garciadeblasb282ed02025-02-08 11:30:39 +0100496 ) -> typing.Dict:
almagiacdd20ae2024-12-13 09:45:45 +0100497 """
498 Get secret data
499
500 :param: name: Name of the secret
501 :param: namespace: Name of the namespace where the secret is stored
502
503 :return: Dictionary with secret's data
504 """
garciadeblas619a0a32025-02-06 13:34:37 +0100505 self.logger.debug(f"Kubectl cfg file: {self._config_file}")
506 self.logger.debug(
507 f"Kubectl self configuration Host: {self._configuration.host}"
508 )
almagiacdd20ae2024-12-13 09:45:45 +0100509 v1_core = self.clients[CORE_CLIENT]
510
511 secret = v1_core.read_namespaced_secret(name, namespace)
512
513 return secret.data
514
515 @retry(
516 attempts=10,
517 delay=1,
518 fallback=Exception("Failed creating the secret"),
519 )
520 async def create_secret(
521 self, name: str, data: dict, namespace: str, secret_type: str
522 ):
523 """
524 Create secret with data
525
526 :param: name: Name of the secret
527 :param: data: Dict with data content. Values must be already base64 encoded
528 :param: namespace: Name of the namespace where the secret will be stored
529 :param: secret_type: Type of the secret, e.g., Opaque, kubernetes.io/service-account-token, kubernetes.io/tls
530
531 :return: None
532 """
garciadeblas619a0a32025-02-06 13:34:37 +0100533 self.logger.debug(f"Kubectl cfg file: {self._config_file}")
534 self.logger.debug(
535 f"Kubectl self configuration Host: {self._configuration.host}"
536 )
537 self.logger.debug("Enter create_secret function")
almagiacdd20ae2024-12-13 09:45:45 +0100538 v1_core = self.clients[CORE_CLIENT]
garciadeblas619a0a32025-02-06 13:34:37 +0100539 self.logger.debug(f"v1_core: {v1_core}")
almagiacdd20ae2024-12-13 09:45:45 +0100540 metadata = V1ObjectMeta(name=name, namespace=namespace)
garciadeblas619a0a32025-02-06 13:34:37 +0100541 self.logger.debug(f"metadata: {metadata}")
almagiacdd20ae2024-12-13 09:45:45 +0100542 secret = V1Secret(metadata=metadata, data=data, type=secret_type)
garciadeblas619a0a32025-02-06 13:34:37 +0100543 self.logger.debug(f"secret: {secret}")
almagiacdd20ae2024-12-13 09:45:45 +0100544 v1_core.create_namespaced_secret(namespace, secret)
545 self.logger.info("Namespaced secret was created")
546
547 async def create_certificate(
548 self,
549 namespace: str,
550 name: str,
551 dns_prefix: str,
552 secret_name: str,
553 usages: list,
554 issuer_name: str,
555 ):
556 """
557 Creates cert-manager certificate object
558
559 :param: namespace: Name of the namespace where the certificate and secret is stored
560 :param: name: Name of the certificate object
561 :param: dns_prefix: Prefix for the dnsNames. They will be prefixed to the common k8s svc suffixes
562 :param: secret_name: Name of the secret created by cert-manager
563 :param: usages: List of X.509 key usages
564 :param: issuer_name: Name of the cert-manager's Issuer or ClusterIssuer object
565
566 """
567 certificate_body = {
568 "apiVersion": "cert-manager.io/v1",
569 "kind": "Certificate",
570 "metadata": {"name": name, "namespace": namespace},
571 "spec": {
572 "secretName": secret_name,
573 "privateKey": {
574 "rotationPolicy": "Always",
575 "algorithm": "ECDSA",
576 "size": 256,
577 },
578 "duration": "8760h", # 1 Year
579 "renewBefore": "2208h", # 9 months
580 "subject": {"organizations": ["osm"]},
581 "commonName": "osm",
582 "isCA": False,
583 "usages": usages,
584 "dnsNames": [
585 "{}.{}".format(dns_prefix, namespace),
586 "{}.{}.svc".format(dns_prefix, namespace),
587 "{}.{}.svc.cluster".format(dns_prefix, namespace),
588 "{}.{}.svc.cluster.local".format(dns_prefix, namespace),
589 ],
590 "issuerRef": {"name": issuer_name, "kind": "ClusterIssuer"},
591 },
592 }
593 client = self.clients[CUSTOM_OBJECT_CLIENT]
594 try:
595 client.create_namespaced_custom_object(
596 group="cert-manager.io",
597 plural="certificates",
598 version="v1",
599 body=certificate_body,
600 namespace=namespace,
601 )
602 except ApiException as e:
603 info = json.loads(e.body)
604 if info.get("reason").lower() == "alreadyexists":
605 self.logger.warning("Certificate already exists: {}".format(e))
606 else:
607 raise e
608
609 async def delete_certificate(self, namespace, object_name):
610 client = self.clients[CUSTOM_OBJECT_CLIENT]
611 try:
612 client.delete_namespaced_custom_object(
613 group="cert-manager.io",
614 plural="certificates",
615 version="v1",
616 name=object_name,
617 namespace=namespace,
618 )
619 except ApiException as e:
620 info = json.loads(e.body)
621 if info.get("reason").lower() == "notfound":
622 self.logger.warning("Certificate already deleted: {}".format(e))
623 else:
624 raise e
625
626 @retry(
627 attempts=10,
628 delay=1,
629 fallback=Exception("Failed creating the namespace"),
630 )
631 async def create_namespace(self, name: str, labels: dict = None):
632 """
633 Create a namespace
634
635 :param: name: Name of the namespace to be created
636 :param: labels: Dictionary with labels for the new namespace
637
638 """
639 v1_core = self.clients[CORE_CLIENT]
640 metadata = V1ObjectMeta(name=name, labels=labels)
641 namespace = V1Namespace(
642 metadata=metadata,
643 )
644
645 try:
646 v1_core.create_namespace(namespace)
647 self.logger.debug("Namespace created: {}".format(name))
648 except ApiException as e:
649 info = json.loads(e.body)
650 if info.get("reason").lower() == "alreadyexists":
651 self.logger.warning("Namespace already exists: {}".format(e))
652 else:
653 raise e
654
655 @retry(
656 attempts=10,
657 delay=1,
658 fallback=Exception("Failed deleting the namespace"),
659 )
660 async def delete_namespace(self, name: str):
661 """
662 Delete a namespace
663
664 :param: name: Name of the namespace to be deleted
665
666 """
667 try:
668 self.clients[CORE_CLIENT].delete_namespace(name)
669 except ApiException as e:
670 if e.reason == "Not Found":
671 self.logger.warning("Namespace already deleted: {}".format(e))
672
673 def get_secrets(
674 self,
675 namespace: str,
676 field_selector: str = None,
677 ) -> typing.List[typing.Dict]:
678 """
679 Get Secret list from a namespace
680
681 :param: namespace: Kubernetes namespace
682 :param: field_selector: Kubernetes field selector
683
684 :return: List of the secrets matching the selectors specified
685 """
686 try:
687 v1_core = self.clients[CORE_CLIENT]
688 secrets = v1_core.list_namespaced_secret(
689 namespace=namespace,
690 field_selector=field_selector,
691 ).items
692 return secrets
693 except ApiException as e:
694 self.logger.error("Error calling get secrets: {}".format(e))
695 raise e
696
697 def create_generic_object(
698 self,
699 api_group: str,
700 api_plural: str,
701 api_version: str,
702 namespace: str,
703 manifest_dict: dict,
704 ):
705 """
706 Creates generic object
707
708 :param: api_group: API Group
709 :param: api_plural: API Plural
710 :param: api_version: API Version
711 :param: namespace: Namespace
712 :param: manifest_dict: Dictionary with the content of the Kubernetes manifest
713
714 """
garciadeblas619a0a32025-02-06 13:34:37 +0100715 self.logger.debug(f"Kubectl cfg file: {self._config_file}")
716 self.logger.debug(
717 f"Kubectl self configuration Host: {self._configuration.host}"
718 )
almagiacdd20ae2024-12-13 09:45:45 +0100719 client = self.clients[CUSTOM_OBJECT_CLIENT]
720 try:
721 if namespace:
722 client.create_namespaced_custom_object(
723 group=api_group,
724 plural=api_plural,
725 version=api_version,
726 body=manifest_dict,
727 namespace=namespace,
728 )
729 else:
730 client.create_cluster_custom_object(
731 group=api_group,
732 plural=api_plural,
733 version=api_version,
734 body=manifest_dict,
735 )
736 except ApiException as e:
737 info = json.loads(e.body)
738 if info.get("reason").lower() == "alreadyexists":
739 self.logger.warning("Object already exists: {}".format(e))
740 else:
741 raise e
742
743 def delete_generic_object(
744 self,
745 api_group: str,
746 api_plural: str,
747 api_version: str,
748 namespace: str,
749 name: str,
750 ):
751 """
752 Deletes generic object
753
754 :param: api_group: API Group
755 :param: api_plural: API Plural
756 :param: api_version: API Version
757 :param: namespace: Namespace
758 :param: name: Name of the object
759
760 """
garciadeblas619a0a32025-02-06 13:34:37 +0100761 self.logger.debug(f"Kubectl cfg file: {self._config_file}")
762 self.logger.debug(
763 f"Kubectl self configuration Host: {self._configuration.host}"
764 )
almagiacdd20ae2024-12-13 09:45:45 +0100765 client = self.clients[CUSTOM_OBJECT_CLIENT]
766 try:
767 if namespace:
768 client.delete_namespaced_custom_object(
769 group=api_group,
770 plural=api_plural,
771 version=api_version,
772 name=name,
773 namespace=namespace,
774 )
775 else:
776 client.delete_cluster_custom_object(
777 group=api_group,
778 plural=api_plural,
779 version=api_version,
780 name=name,
781 )
782 except ApiException as e:
783 info = json.loads(e.body)
784 if info.get("reason").lower() == "notfound":
785 self.logger.warning("Object already deleted: {}".format(e))
786 else:
787 raise e
788
789 async def get_generic_object(
790 self,
791 api_group: str,
792 api_plural: str,
793 api_version: str,
794 namespace: str,
795 name: str,
796 ):
797 """
798 Gets generic object
799
800 :param: api_group: API Group
801 :param: api_plural: API Plural
802 :param: api_version: API Version
803 :param: namespace: Namespace
804 :param: name: Name of the object
805
806 """
garciadeblas619a0a32025-02-06 13:34:37 +0100807 self.logger.debug(f"Kubectl cfg file: {self._config_file}")
808 self.logger.debug(
809 f"Kubectl self configuration Host: {self._configuration.host}"
810 )
almagiacdd20ae2024-12-13 09:45:45 +0100811 client = self.clients[CUSTOM_OBJECT_CLIENT]
812 try:
813 if namespace:
814 object_dict = client.list_namespaced_custom_object(
815 group=api_group,
816 plural=api_plural,
817 version=api_version,
818 namespace=namespace,
819 field_selector=f"metadata.name={name}",
820 )
821 else:
822 object_dict = client.list_cluster_custom_object(
823 group=api_group,
824 plural=api_plural,
825 version=api_version,
826 field_selector=f"metadata.name={name}",
827 )
828 if len(object_dict.get("items")) == 0:
829 return None
830 return object_dict.get("items")[0]
831 except ApiException as e:
832 self.logger.debug(f"Exception: {e}")
833 info = json.loads(e.body)
garciadeblasbc96f382025-01-22 16:02:18 +0100834 raise f"Api Exception: {e}. Reason: {info.get('reason')}"
almagiacdd20ae2024-12-13 09:45:45 +0100835
836 async def list_generic_object(
837 self,
838 api_group: str,
839 api_plural: str,
840 api_version: str,
841 namespace: str,
842 ):
843 """
844 Lists all generic objects of the requested API group
845
846 :param: api_group: API Group
847 :param: api_plural: API Plural
848 :param: api_version: API Version
849 :param: namespace: Namespace
850
851 """
garciadeblas619a0a32025-02-06 13:34:37 +0100852 self.logger.debug(f"Kubectl cfg file: {self._config_file}")
853 self.logger.debug(
854 f"Kubectl self configuration Host: {self._configuration.host}"
855 )
almagiacdd20ae2024-12-13 09:45:45 +0100856 client = self.clients[CUSTOM_OBJECT_CLIENT]
857 try:
858 if namespace:
859 object_dict = client.list_namespaced_custom_object(
860 group=api_group,
861 plural=api_plural,
862 version=api_version,
863 namespace=namespace,
864 )
865 else:
866 object_dict = client.list_cluster_custom_object(
867 group=api_group,
868 plural=api_plural,
869 version=api_version,
870 )
871 self.logger.debug(f"Object-list: {object_dict.get('items')}")
872 return object_dict.get("items")
873 except ApiException as e:
874 self.logger.debug(f"Exception: {e}")
875 info = json.loads(e.body)
876 if info.get("reason").lower() == "notfound":
877 self.logger.warning(
878 "Cannot find specified custom objects: {}".format(e)
879 )
880 return []
881 else:
882 raise e
883
884 @retry(
885 attempts=10,
886 delay=1,
887 fallback=Exception("Failed creating the secret"),
888 )
889 async def create_secret_string(
890 self, name: str, string_data: str, namespace: str, secret_type: str
891 ):
892 """
893 Create secret with data
894
895 :param: name: Name of the secret
896 :param: string_data: String with data content
897 :param: namespace: Name of the namespace where the secret will be stored
898 :param: secret_type: Type of the secret, e.g., Opaque, kubernetes.io/service-account-token, kubernetes.io/tls
899
900 :return: None
901 """
902 v1_core = self.clients[CORE_CLIENT]
903 metadata = V1ObjectMeta(name=name, namespace=namespace)
904 secret = V1Secret(metadata=metadata, string_data=string_data, type=secret_type)
905 v1_core.create_namespaced_secret(namespace, secret)
906
907 @retry(
908 attempts=10,
909 delay=1,
910 fallback=Exception("Failed creating the pvc"),
911 )
912 async def create_pvc(self, name: str, namespace: str):
913 """
914 Create a namespace
915
916 :param: name: Name of the pvc to be created
917 :param: namespace: Name of the namespace where the pvc will be stored
918
919 """
920 try:
921 pvc = V1PersistentVolumeClaim(
922 api_version="v1",
923 kind="PersistentVolumeClaim",
924 metadata=V1ObjectMeta(name=name),
925 spec=V1PersistentVolumeClaimSpec(
926 access_modes=["ReadWriteOnce"],
927 resources=V1ResourceRequirements(requests={"storage": "100Mi"}),
928 ),
929 )
930 self.clients[CORE_CLIENT].create_namespaced_persistent_volume_claim(
931 namespace=namespace, body=pvc
932 )
933 except ApiException as e:
934 info = json.loads(e.body)
935 if info.get("reason").lower() == "alreadyexists":
936 self.logger.warning("PVC already exists: {}".format(e))
937 else:
938 raise e
939
940 @retry(
941 attempts=10,
942 delay=1,
943 fallback=Exception("Failed deleting the pvc"),
944 )
945 async def delete_pvc(self, name: str, namespace: str):
946 """
947 Create a namespace
948
949 :param: name: Name of the pvc to be deleted
950 :param: namespace: Namespace
951
952 """
953 self.clients[CORE_CLIENT].delete_namespaced_persistent_volume_claim(
954 name, namespace
955 )
956
957 def copy_file_to_pod(
958 self, namespace, pod_name, container_name, src_file, dest_path
959 ):
960 # Create an in-memory tar file containing the source file
961 tar_buffer = io.BytesIO()
962 with tarfile.open(fileobj=tar_buffer, mode="w") as tar:
963 tar.add(src_file, arcname=dest_path.split("/")[-1])
964
965 tar_buffer.seek(0)
966
967 # Define the command to extract the tar file in the pod
968 exec_command = ["tar", "xvf", "-", "-C", dest_path.rsplit("/", 1)[0]]
969
970 # Execute the command
971 resp = stream(
972 self.clients[CORE_CLIENT].connect_get_namespaced_pod_exec,
973 pod_name,
974 namespace,
975 command=exec_command,
976 container=container_name,
977 stdin=True,
978 stderr=True,
979 stdout=True,
980 tty=False,
981 _preload_content=False,
982 )
983
984 # Write the tar data to the pod
985 resp.write_stdin(tar_buffer.read())
986 resp.close()
987
988 @retry(
989 attempts=10,
990 delay=1,
991 fallback=Exception("Failed creating the pvc"),
992 )
993 async def create_pvc_with_content(
994 self, name: str, namespace: str, src_file: str, dest_filename: str
995 ):
996 """
997 Create a PVC with content
998
999 :param: name: Name of the pvc to be created
1000 :param: namespace: Name of the namespace where the pvc will be stored
1001 :param: src_file: File to be copied
1002 :param: filename: Name of the file in the destination folder
1003 """
1004 pod_name = f"copy-pod-{name}"
1005 self.logger.debug(f"Creating pvc {name}")
1006 await self.create_pvc(name=name, namespace=namespace)
1007 self.logger.debug("Sleeping")
1008 sleep(40)
1009 self.logger.debug(f"Creating pod {pod_name}")
1010 await self.create_copy_pod(name=pod_name, namespace=namespace, pvc_name=name)
1011 self.logger.debug("Sleeping")
1012 sleep(40)
1013 self.logger.debug(f"Copying files to pod {pod_name}")
1014 self.copy_file_to_pod(
1015 namespace=namespace,
1016 pod_name=pod_name,
1017 container_name="copy-container",
1018 src_file=src_file,
1019 dest_path=f"/mnt/data/{dest_filename}",
1020 )
1021
1022 @retry(
1023 attempts=10,
1024 delay=1,
1025 fallback=Exception("Failed creating the pvc"),
1026 )
1027 async def create_copy_pod(self, name: str, namespace: str, pvc_name: str):
1028 """
1029 Create a pod to copy content into a PVC
1030
1031 :param: name: Name of the pod to be created
1032 :param: namespace: Name of the namespace where the pod will be stored
1033 :param: pvc_name: Name of the PVC that the pod will mount as a volume
1034
1035 """
1036 pod = V1Pod(
1037 api_version="v1",
1038 kind="Pod",
garciadeblas619a0a32025-02-06 13:34:37 +01001039 metadata=kclient.V1ObjectMeta(name=name),
almagiacdd20ae2024-12-13 09:45:45 +01001040 spec=V1PodSpec(
1041 containers=[
1042 V1Container(
1043 name="copy-container",
1044 image="busybox", # Imagen ligera para copiar archivos
1045 command=["sleep", "3600"], # Mantén el contenedor en ejecución
1046 volume_mounts=[
1047 V1VolumeMount(mount_path="/mnt/data", name="my-storage")
1048 ],
1049 )
1050 ],
1051 volumes=[
1052 V1Volume(
1053 name="my-storage",
1054 persistent_volume_claim=V1PersistentVolumeClaimVolumeSource(
1055 claim_name=pvc_name
1056 ),
1057 )
1058 ],
1059 ),
1060 )
1061 # Create the pod
1062 self.clients[CORE_CLIENT].create_namespaced_pod(namespace=namespace, body=pod)
1063
1064 @retry(
1065 attempts=10,
1066 delay=1,
1067 fallback=Exception("Failed deleting the pod"),
1068 )
1069 async def delete_pod(self, name: str, namespace: str):
1070 """
1071 Create a namespace
1072
1073 :param: name: Name of the pod to be deleted
1074 :param: namespace: Namespace
1075
1076 """
1077 self.clients[CORE_CLIENT].delete_namespaced_pod(name, namespace)