blob: 67a404cde4201a03b0d3b0a7ca94afb8712374db [file] [log] [blame]
almagiacdd20ae2024-12-13 09:45:45 +01001#######################################################################################
2# Copyright 2020 Canonical Ltd.
3# Copyright ETSI Contributors and Others.
4#
5# Licensed under the Apache License, Version 2.0 (the "License");
6# you may not use this file except in compliance with the License.
7# You may obtain a copy of the License at
8#
9# http://www.apache.org/licenses/LICENSE-2.0
10#
11# Unless required by applicable law or agreed to in writing, software
12# distributed under the License is distributed on an "AS IS" BASIS,
13# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14# See the License for the specific language governing permissions and
15# limitations under the License.
16#######################################################################################
17
18import base64
19import logging
almagiacdd20ae2024-12-13 09:45:45 +010020import typing
21import uuid
22import json
23import tarfile
24import io
25from time import sleep
26
27from distutils.version import LooseVersion
28
garciadeblas6d8acf32025-02-06 13:34:37 +010029from kubernetes import client as kclient, config as kconfig
almagiacdd20ae2024-12-13 09:45:45 +010030from kubernetes.client.api import VersionApi
31from kubernetes.client.models import (
32 V1ClusterRole,
33 V1Role,
34 V1ObjectMeta,
35 V1PolicyRule,
36 V1ServiceAccount,
37 V1ClusterRoleBinding,
38 V1RoleBinding,
39 V1RoleRef,
40 RbacV1Subject,
41 V1Secret,
42 V1SecretReference,
43 V1Namespace,
44 V1PersistentVolumeClaim,
45 V1PersistentVolumeClaimSpec,
46 V1PersistentVolumeClaimVolumeSource,
47 V1ResourceRequirements,
48 V1Pod,
49 V1PodSpec,
50 V1Volume,
51 V1VolumeMount,
52 V1Container,
53)
54from kubernetes.client.rest import ApiException
55from kubernetes.stream import stream
56from osm_lcm.n2vc.libjuju import retry_callback
57from retrying_async import retry
58
# Keys read from the data of a service account token secret
SERVICE_ACCOUNT_TOKEN_KEY = "token"
SERVICE_ACCOUNT_ROOT_CA_KEY = "ca.crt"

# Names under which Kubectl registers each Kubernetes API client
CORE_CLIENT = "core_v1"
RBAC_CLIENT = "rbac_v1"
STORAGE_CLIENT = "storage_v1"
CUSTOM_OBJECT_CLIENT = "custom_object"
66
67
68class Kubectl:
def __init__(self, config_file=None):
    """
    Build the Kubernetes API clients from a kubeconfig file

    :param: config_file: Path of the kubeconfig file handed to
                         kubernetes.config.new_client_from_config
    """
    self.logger = logging.getLogger("lcm.kubectl")
    self._config_file = config_file
    # Logged once (the same message used to be emitted twice)
    self.logger.info(f"Kubectl cfg file: {config_file}")
    # Every API object below shares this single ApiClient
    self._api_client = kconfig.new_client_from_config(config_file=config_file)
    self._clients = {
        CORE_CLIENT: kclient.CoreV1Api(api_client=self._api_client),
        RBAC_CLIENT: kclient.RbacAuthorizationV1Api(api_client=self._api_client),
        STORAGE_CLIENT: kclient.StorageV1Api(api_client=self._api_client),
        CUSTOM_OBJECT_CLIENT: kclient.CustomObjectsApi(api_client=self._api_client),
    }
    # NOTE(review): get_default_copy() looks like a class-level helper of the
    # kubernetes Configuration; confirm it returns the settings loaded from
    # config_file and not the library defaults
    self._configuration = self._api_client.configuration.get_default_copy()
almagiacdd20ae2024-12-13 09:45:45 +010083
@property
def configuration(self):
    """Configuration object stored by the constructor"""
    stored_configuration = self._configuration
    return stored_configuration
87
@property
def clients(self):
    """Dictionary of Kubernetes API clients, indexed by the *_CLIENT constants"""
    registry = self._clients
    return registry
91
def get_services(
    self,
    field_selector: str = None,
    label_selector: str = None,
) -> typing.List[typing.Dict]:
    """
    List the Services of all the namespaces of the cluster

    :param: field_selector: Kubernetes field selector used to filter the services
    :param: label_selector: Kubernetes label selector used to filter the services

    :return: List with one dictionary per matching service, holding its
             name, cluster_ip, type, ports and external_ip
    """
    # Only selectors with a value are forwarded to the API call
    selectors = {
        key: value
        for key, value in (
            ("field_selector", field_selector),
            ("label_selector", label_selector),
        )
        if value
    }
    try:
        listing = self.clients[CORE_CLIENT].list_service_for_all_namespaces(
            **selectors
        )
        services = []
        for svc in listing.items:
            ports = []
            if svc.spec.ports:
                for port in svc.spec.ports:
                    ports.append(
                        {
                            "name": port.name,
                            "node_port": port.node_port,
                            "port": port.port,
                            "protocol": port.protocol,
                            "target_port": port.target_port,
                        }
                    )
            ingress = svc.status.load_balancer.ingress
            # None (not an empty list) when there is no ingress entry
            external_ip = [entry.ip for entry in ingress] if ingress else None
            services.append(
                {
                    "name": svc.metadata.name,
                    "cluster_ip": svc.spec.cluster_ip,
                    "type": svc.spec.type,
                    "ports": ports,
                    "external_ip": external_ip,
                }
            )
        return services
    except ApiException as e:
        self.logger.error("Error calling get services: {}".format(e))
        raise e
140
def get_default_storage_class(self) -> str:
    """
    Name of the storage class to be used by default

    :return: Name of the first storage class annotated as default class.
             If none is annotated, name of the first storage class listed.
             If there are no storage classes, None
    """
    # Current annotation first, then the beta one still used by older clusters
    default_markers = (
        "storageclass.kubernetes.io/is-default-class",
        "storageclass.beta.kubernetes.io/is-default-class",
    )
    fallback = None
    for storage_class in self.clients[STORAGE_CLIENT].list_storage_class().items:
        sc_name = storage_class.metadata.name
        annotations = storage_class.metadata.annotations or {}
        if any(annotations.get(marker) == "true" for marker in default_markers):
            # An explicit default class wins over the fallback
            return sc_name
        if not fallback:
            # Remember the first storage class in case no default is found
            fallback = sc_name
    return fallback
169
def create_cluster_role(
    self,
    name: str,
    labels: typing.Dict[str, str],
    namespace: str = "kube-system",
):
    """
    Create a cluster role allowing every verb on every resource and
    non-resource URL

    :param: name: Name of the cluster role
    :param: labels: Labels for cluster role metadata
    :param: namespace: Kubernetes namespace for cluster role metadata
                       Default: kube-system

    :raises Exception: if a cluster role with that name already exists
    """
    rbac_api = self.clients[RBAC_CLIENT]
    existing = rbac_api.list_cluster_role(field_selector=f"metadata.name={name}")
    if len(existing.items) > 0:
        raise Exception("Role with metadata.name={} already exists".format(name))

    # Unrestricted rules: one for API resources, one for non-resource URLs
    allow_all_resources = V1PolicyRule(api_groups=["*"], resources=["*"], verbs=["*"])
    allow_all_urls = V1PolicyRule(non_resource_ur_ls=["*"], verbs=["*"])
    body = V1ClusterRole(
        metadata=V1ObjectMeta(name=name, labels=labels, namespace=namespace),
        rules=[allow_all_resources, allow_all_urls],
    )
    rbac_api.create_cluster_role(body)
202
async def create_role(
    self,
    name: str,
    labels: typing.Dict[str, str],
    api_groups: list,
    resources: list,
    verbs: list,
    namespace: str,
):
    """
    Create a namespaced Role holding a single PolicyRule

    :param: name: Name of the namespaced Role
    :param: labels: Labels for namespaced Role metadata
    :param: api_groups: List with api-groups allowed in the policy rule
    :param: resources: List with resources allowed in the policy rule
    :param: verbs: List with verbs allowed in the policy rule
    :param: namespace: Kubernetes namespace for Role metadata

    :return: None
    :raises Exception: if a Role with that name already exists in the namespace
    """
    rbac_api = self.clients[RBAC_CLIENT]
    existing = rbac_api.list_namespaced_role(
        namespace, field_selector=f"metadata.name={name}"
    )
    if len(existing.items) > 0:
        raise Exception("Role with metadata.name={} already exists".format(name))

    rule = V1PolicyRule(api_groups=api_groups, resources=resources, verbs=verbs)
    body = V1Role(
        metadata=V1ObjectMeta(name=name, labels=labels, namespace=namespace),
        rules=[rule],
    )
    rbac_api.create_namespaced_role(namespace, body)
242
def delete_cluster_role(self, name: str):
    """
    Remove a cluster role

    :param: name: Name of the cluster role to be deleted
    """
    rbac_api = self.clients[RBAC_CLIENT]
    rbac_api.delete_cluster_role(name)
250
def _get_kubectl_version(self):
    """
    Query the version reported by the cluster

    :return: String with the format "<major>.<minor>"
    """
    self.logger.debug("Enter _get_kubectl_version function")
    version_info = VersionApi(api_client=self._api_client).get_code()
    return f"{version_info.major}.{version_info.minor}"
255
def _need_to_create_new_secret(self):
    """
    Tell whether the secret of a service account has to be created explicitly

    create_service_account creates the token secret itself when this
    method returns True (K8s version >= 1.24).

    :return: True if the version returned by _get_kubectl_version is 1.24
             or newer, False otherwise (also when it cannot be parsed)
    """
    import re

    min_k8s_version = (1, 24)
    current_k8s_version = self._get_kubectl_version()
    # Compare plain integers instead of relying on distutils' LooseVersion,
    # which is deprecated. Only the leading digits of the major and minor
    # components are used, so a minor such as "24+" counts as 24.
    numbers = []
    for component in str(current_k8s_version).split(".")[:2]:
        digits = re.match(r"\d+", component.strip())
        if digits is None:
            break
        numbers.append(int(digits.group()))
    return min_k8s_version <= tuple(numbers)
260
def _get_secret_name(self, service_account_name: str):
    """
    Generate a secret name for a service account

    :param: service_account_name: Name of the service account

    :return: "<service_account_name>-token-" followed by the first five
             characters of a random UUID
    """
    # The first hyphen of a UUID string is at position 8, so the first five
    # characters are always hexadecimal digits
    suffix = uuid.uuid4().hex[:5]
    return f"{service_account_name}-token-{suffix}"
264
def _create_service_account_secret(
    self,
    service_account_name: str,
    namespace: str,
    secret_name: str,
):
    """
    Create the token secret of a service account. K8s version >= 1.24

    :param: service_account_name: Name of the service account
    :param: namespace: Kubernetes namespace for service account metadata
    :param: secret_name: Name of the secret

    :raises Exception: if a secret with that name already exists in the namespace
    """
    core_api = self.clients[CORE_CLIENT]
    existing = core_api.list_namespaced_secret(
        namespace, field_selector=f"metadata.name={secret_name}"
    ).items
    if len(existing) > 0:
        raise Exception(
            "Secret with metadata.name={} already exists".format(secret_name)
        )

    # The annotation ties the secret to the service account
    body = V1Secret(
        metadata=V1ObjectMeta(
            name=secret_name,
            namespace=namespace,
            annotations={"kubernetes.io/service-account.name": service_account_name},
        ),
        type="kubernetes.io/service-account-token",
    )
    core_api.create_namespaced_secret(namespace, body)
295
def _get_secret_reference_list(self, namespace: str, secret_name: str):
    """
    Build a list with a single secret reference.
    K8s version >= 1.24

    :param: namespace: Kubernetes namespace for service account metadata
    :param: secret_name: Name of the secret
    :rtype: list[V1SecretReference]
    """
    reference = V1SecretReference(name=secret_name, namespace=namespace)
    return [reference]
306
def create_service_account(
    self,
    name: str,
    labels: typing.Dict[str, str],
    namespace: str = "kube-system",
):
    """
    Create a service account and, when _need_to_create_new_secret says so,
    also its token secret

    :param: name: Name of the service account
    :param: labels: Labels for service account metadata
    :param: namespace: Kubernetes namespace for service account metadata
                       Default: kube-system

    :raises Exception: if the service account already exists in the namespace
    """
    core_api = self.clients[CORE_CLIENT]
    existing = core_api.list_namespaced_service_account(
        namespace, field_selector=f"metadata.name={name}"
    )
    if len(existing.items) > 0:
        raise Exception(
            "Service account with metadata.name={} already exists".format(name)
        )

    metadata = V1ObjectMeta(name=name, labels=labels, namespace=namespace)

    if not self._need_to_create_new_secret():
        # No explicit secret is created in this case
        core_api.create_namespaced_service_account(
            namespace, V1ServiceAccount(metadata=metadata)
        )
        return

    # The service account references the secret, which is created right after
    secret_name = self._get_secret_name(name)
    account = V1ServiceAccount(
        metadata=metadata,
        secrets=self._get_secret_reference_list(namespace, secret_name),
    )
    core_api.create_namespaced_service_account(namespace, account)
    self._create_service_account_secret(name, namespace, secret_name)
341
def delete_secret(self, name: str, namespace: str = "kube-system"):
    """
    Remove a secret

    :param: name: Name of the secret to be deleted
    :param: namespace: Kubernetes namespace of the secret
                       Default: kube-system
    """
    self.logger.debug(f"Kubectl cfg file: {self._config_file}")
    core_api = self.clients[CORE_CLIENT]
    core_api.delete_namespaced_secret(name, namespace)
352
def delete_service_account(self, name: str, namespace: str = "kube-system"):
    """
    Remove a service account

    :param: name: Name of the service account to be deleted
    :param: namespace: Kubernetes namespace of the service account
                       Default: kube-system
    """
    core_api = self.clients[CORE_CLIENT]
    core_api.delete_namespaced_service_account(name, namespace)
362
def create_cluster_role_binding(
    self, name: str, labels: typing.Dict[str, str], namespace: str = "kube-system"
):
    """
    Bind the cluster role and the service account sharing the same name

    :param: name: Name used for the binding, the cluster role and the
                  service account
    :param: labels: Labels for cluster role binding metadata
    :param: namespace: Kubernetes namespace of the bound service account
                       Default: kube-system

    :raises Exception: if a cluster role binding with that name already exists
    """
    rbac_api = self.clients[RBAC_CLIENT]
    existing = rbac_api.list_cluster_role_binding(
        field_selector=f"metadata.name={name}"
    )
    if len(existing.items) > 0:
        raise Exception("Generated rbac id already exists")

    subject = RbacV1Subject(kind="ServiceAccount", name=name, namespace=namespace)
    body = V1ClusterRoleBinding(
        metadata=V1ObjectMeta(name=name, labels=labels),
        role_ref=V1RoleRef(kind="ClusterRole", name=name, api_group=""),
        subjects=[subject],
    )
    rbac_api.create_cluster_role_binding(body)
388
async def create_role_binding(
    self,
    name: str,
    role_name: str,
    sa_name: str,
    labels: typing.Dict[str, str],
    namespace: str,
):
    """
    Create a namespaced role binding between a Role and a Service Account

    :param: name: Name of the namespaced Role Binding
    :param: role_name: Name of the namespaced Role to be bound
    :param: sa_name: Name of the Service Account to be bound
    :param: labels: Labels for Role Binding metadata
    :param: namespace: Kubernetes namespace for Role Binding metadata

    :return: None
    :raises Exception: if a Role Binding with that name already exists in
                       the namespace
    """
    rbac_api = self.clients[RBAC_CLIENT]
    existing = rbac_api.list_namespaced_role_binding(
        namespace, field_selector=f"metadata.name={name}"
    )
    if len(existing.items) > 0:
        raise Exception(
            "Role Binding with metadata.name={} already exists".format(name)
        )

    subject = RbacV1Subject(kind="ServiceAccount", name=sa_name, namespace=namespace)
    body = V1RoleBinding(
        metadata=V1ObjectMeta(name=name, labels=labels),
        role_ref=V1RoleRef(kind="Role", name=role_name, api_group=""),
        subjects=[subject],
    )
    rbac_api.create_namespaced_role_binding(namespace, body)
426
def delete_cluster_role_binding(self, name: str):
    """
    Remove a cluster role binding

    :param: name: Name of the cluster role binding to be deleted
    """
    rbac_api = self.clients[RBAC_CLIENT]
    rbac_api.delete_cluster_role_binding(name)
434
@retry(
    attempts=10,
    delay=1,
    fallback=Exception("Failed getting the secret from service account"),
    callback=retry_callback,
)
async def get_secret_data(
    self, name: str, namespace: str = "kube-system"
) -> typing.Tuple[str, str]:
    """
    Get the token and CA certificate of the first secret referenced by a
    service account

    :param: name: Name of the service account
    :param: namespace: Name of the namespace where the service account and
                       its secret are stored

    :return: Tuple with the decoded values stored under the "token" and
             "ca.crt" keys of the secret
    :raises Exception: if the service account does not exist or does not
                       reference any secret
    """
    core_api = self.clients[CORE_CLIENT]

    accounts = core_api.list_namespaced_service_account(
        namespace, field_selector=f"metadata.name={name}"
    )
    if len(accounts.items) == 0:
        raise Exception(
            "Service account not found with metadata.name={}".format(name)
        )

    referenced = accounts.items[0].secrets
    secret_name = (
        referenced[0].name if referenced and len(referenced) > 0 else None
    )
    if not secret_name:
        raise Exception(
            "Failed getting the secret from service account {}".format(name)
        )

    # TODO: refactor to use get_secret_content
    matching_secrets = core_api.list_namespaced_secret(
        namespace, field_selector=f"metadata.name={secret_name}"
    ).items
    secret_data = matching_secrets[0].data

    encoded_token = secret_data[SERVICE_ACCOUNT_TOKEN_KEY]
    encoded_ca = secret_data[SERVICE_ACCOUNT_ROOT_CA_KEY]

    # Secret values are stored base64 encoded
    decoded_token = base64.b64decode(encoded_token).decode("utf-8")
    decoded_ca = base64.b64decode(encoded_ca).decode("utf-8")
    return (decoded_token, decoded_ca)
482
@retry(
    attempts=10,
    delay=1,
    fallback=Exception("Failed getting data from the secret"),
)
async def get_secret_content(
    self,
    name: str,
    namespace: str,
) -> typing.Dict:
    """
    Read the data of a secret

    :param: name: Name of the secret
    :param: namespace: Name of the namespace where the secret is stored

    :return: Dictionary with the data of the secret, as returned by the API
    """
    self.logger.debug(f"Kubectl cfg file: {self._config_file}")
    core_api = self.clients[CORE_CLIENT]
    return core_api.read_namespaced_secret(name, namespace).data
507
@retry(
    attempts=10,
    delay=1,
    fallback=Exception("Failed creating the secret"),
)
async def create_secret(
    self, name: str, data: dict, namespace: str, secret_type: str
):
    """
    Create secret with data

    :param: name: Name of the secret
    :param: data: Dict with data content. Values must be already base64 encoded
    :param: namespace: Name of the namespace where the secret will be stored
    :param: secret_type: Type of the secret, e.g., Opaque, kubernetes.io/service-account-token, kubernetes.io/tls

    :return: None
    :raises ApiException: re-raised after being logged when the API call fails
    """
    self.logger.debug(f"Kubectl cfg file: {self._config_file}")
    self.logger.debug("Enter create_secret function")
    v1_core = self.clients[CORE_CLIENT]
    metadata = V1ObjectMeta(name=name, namespace=namespace)
    secret = V1Secret(metadata=metadata, data=data, type=secret_type)
    # Only identifying fields are logged: dumping the V1Secret object would
    # write the secret payload to the log
    self.logger.debug(
        f"Creating secret {name} of type {secret_type} in namespace {namespace}"
    )
    try:
        v1_core.create_namespaced_secret(namespace, secret)
        self.logger.info("Namespaced secret was created")
    except ApiException as e:
        self.logger.error(f"Failed to create namespaced secret: {e}")
        raise
almagiacdd20ae2024-12-13 09:45:45 +0100540
async def create_certificate(
    self,
    namespace: str,
    name: str,
    dns_prefix: str,
    secret_name: str,
    usages: list,
    issuer_name: str,
):
    """
    Create a cert-manager Certificate object. If it already exists, a
    warning is logged and nothing else is done

    :param: namespace: Name of the namespace where the certificate and secret is stored
    :param: name: Name of the certificate object
    :param: dns_prefix: Prefix for the dnsNames. They will be prefixed to the common k8s svc suffixes
    :param: secret_name: Name of the secret created by cert-manager
    :param: usages: List of X.509 key usages
    :param: issuer_name: Name of the cert-manager's ClusterIssuer object

    """
    # <prefix>.<namespace> plus each of the usual k8s service suffixes
    base_name = f"{dns_prefix}.{namespace}"
    dns_names = [
        base_name + suffix
        for suffix in ("", ".svc", ".svc.cluster", ".svc.cluster.local")
    ]
    private_key = {
        "rotationPolicy": "Always",
        "algorithm": "ECDSA",
        "size": 256,
    }
    spec = {
        "secretName": secret_name,
        "privateKey": private_key,
        "duration": "8760h",  # 365 days
        "renewBefore": "2208h",  # 92 days
        "subject": {"organizations": ["osm"]},
        "commonName": "osm",
        "isCA": False,
        "usages": usages,
        "dnsNames": dns_names,
        "issuerRef": {"name": issuer_name, "kind": "ClusterIssuer"},
    }
    certificate_body = {
        "apiVersion": "cert-manager.io/v1",
        "kind": "Certificate",
        "metadata": {"name": name, "namespace": namespace},
        "spec": spec,
    }
    custom_api = self.clients[CUSTOM_OBJECT_CLIENT]
    try:
        custom_api.create_namespaced_custom_object(
            group="cert-manager.io",
            plural="certificates",
            version="v1",
            body=certificate_body,
            namespace=namespace,
        )
    except ApiException as e:
        reason = json.loads(e.body).get("reason")
        if reason.lower() != "alreadyexists":
            raise e
        self.logger.warning("Certificate already exists: {}".format(e))
602
async def delete_certificate(self, namespace, object_name):
    """
    Delete a cert-manager Certificate object. If it does not exist, a
    warning is logged and nothing else is done

    :param: namespace: Name of the namespace where the certificate is stored
    :param: object_name: Name of the certificate object
    """
    custom_api = self.clients[CUSTOM_OBJECT_CLIENT]
    try:
        custom_api.delete_namespaced_custom_object(
            group="cert-manager.io",
            plural="certificates",
            version="v1",
            name=object_name,
            namespace=namespace,
        )
    except ApiException as e:
        reason = json.loads(e.body).get("reason")
        if reason.lower() != "notfound":
            raise e
        self.logger.warning("Certificate already deleted: {}".format(e))
619
@retry(
    attempts=10,
    delay=1,
    fallback=Exception("Failed creating the namespace"),
)
async def create_namespace(self, name: str, labels: dict = None):
    """
    Create a namespace. If it already exists, a warning is logged and
    nothing else is done

    :param: name: Name of the namespace to be created
    :param: labels: Dictionary with labels for the new namespace

    """
    core_api = self.clients[CORE_CLIENT]
    body = V1Namespace(metadata=V1ObjectMeta(name=name, labels=labels))

    try:
        core_api.create_namespace(body)
        self.logger.debug("Namespace created: {}".format(name))
    except ApiException as e:
        reason = json.loads(e.body).get("reason")
        if reason.lower() != "alreadyexists":
            raise e
        self.logger.warning("Namespace already exists: {}".format(e))
648
@retry(
    attempts=10,
    delay=1,
    fallback=Exception("Failed deleting the namespace"),
)
async def delete_namespace(self, name: str):
    """
    Delete a namespace. If it does not exist, a warning is logged and
    nothing else is done

    :param: name: Name of the namespace to be deleted

    :raises ApiException: re-raised for any API error other than "Not Found"
    """
    try:
        self.clients[CORE_CLIENT].delete_namespace(name)
    except ApiException as e:
        if e.reason == "Not Found":
            self.logger.warning("Namespace already deleted: {}".format(e))
        else:
            # Any other API error used to be dropped silently, so a failed
            # deletion looked successful and was never retried
            self.logger.error("Error deleting namespace {}: {}".format(name, e))
            raise
666
def get_secrets(
    self,
    namespace: str,
    field_selector: str = None,
) -> typing.List[typing.Dict]:
    """
    List the Secrets of a namespace

    :param: namespace: Kubernetes namespace
    :param: field_selector: Kubernetes field selector

    :return: Items returned by the API for the secrets matching the selector
    :raises ApiException: re-raised after being logged when the API call fails
    """
    try:
        core_api = self.clients[CORE_CLIENT]
        listing = core_api.list_namespaced_secret(
            namespace=namespace,
            field_selector=field_selector,
        )
        return listing.items
    except ApiException as e:
        self.logger.error("Error calling get secrets: {}".format(e))
        raise e
690
def create_generic_object(
    self,
    api_group: str,
    api_plural: str,
    api_version: str,
    namespace: str,
    manifest_dict: dict,
):
    """
    Create a custom object, namespaced or cluster-wide. If it already
    exists, a warning is logged and nothing else is done

    :param: api_group: API Group
    :param: api_plural: API Plural
    :param: api_version: API Version
    :param: namespace: Namespace. When empty, a cluster-wide object is created
    :param: manifest_dict: Dictionary with the content of the Kubernetes manifest

    """
    self.logger.debug(f"Kubectl cfg file: {self._config_file}")
    custom_api = self.clients[CUSTOM_OBJECT_CLIENT]
    # Arguments shared by the namespaced and the cluster-wide calls
    request = dict(
        group=api_group,
        plural=api_plural,
        version=api_version,
        body=manifest_dict,
    )
    try:
        if namespace:
            custom_api.create_namespaced_custom_object(
                namespace=namespace, **request
            )
        else:
            custom_api.create_cluster_custom_object(**request)
    except ApiException as e:
        reason = json.loads(e.body).get("reason")
        if reason.lower() != "alreadyexists":
            raise e
        self.logger.warning("Object already exists: {}".format(e))
733
def delete_generic_object(
    self,
    api_group: str,
    api_plural: str,
    api_version: str,
    namespace: str,
    name: str,
):
    """
    Delete a custom object, namespaced or cluster-wide. If it does not
    exist, a warning is logged and nothing else is done

    :param: api_group: API Group
    :param: api_plural: API Plural
    :param: api_version: API Version
    :param: namespace: Namespace. When empty, a cluster-wide object is deleted
    :param: name: Name of the object

    """
    self.logger.debug(f"Kubectl cfg file: {self._config_file}")
    custom_api = self.clients[CUSTOM_OBJECT_CLIENT]
    # Arguments shared by the namespaced and the cluster-wide calls
    request = dict(
        group=api_group,
        plural=api_plural,
        version=api_version,
        name=name,
    )
    try:
        if namespace:
            custom_api.delete_namespaced_custom_object(
                namespace=namespace, **request
            )
        else:
            custom_api.delete_cluster_custom_object(**request)
    except ApiException as e:
        reason = json.loads(e.body).get("reason")
        if reason.lower() != "notfound":
            raise e
        self.logger.warning("Object already deleted: {}".format(e))
776
async def get_generic_object(
    self,
    api_group: str,
    api_plural: str,
    api_version: str,
    namespace: str,
    name: str,
):
    """
    Gets generic object

    :param: api_group: API Group
    :param: api_plural: API Plural
    :param: api_version: API Version
    :param: namespace: Namespace. When empty, cluster-wide objects are searched
    :param: name: Name of the object

    :return: First item whose metadata.name matches, or None if there is none
    :raises Exception: if the Kubernetes API call fails; the ApiException is
                       chained as the cause
    """
    self.logger.debug(f"Kubectl cfg file: {self._config_file}")
    client = self.clients[CUSTOM_OBJECT_CLIENT]
    try:
        if namespace:
            object_dict = client.list_namespaced_custom_object(
                group=api_group,
                plural=api_plural,
                version=api_version,
                namespace=namespace,
                field_selector=f"metadata.name={name}",
            )
        else:
            object_dict = client.list_cluster_custom_object(
                group=api_group,
                plural=api_plural,
                version=api_version,
                field_selector=f"metadata.name={name}",
            )
        items = object_dict.get("items")
        if not items:
            return None
        return items[0]
    except ApiException as e:
        self.logger.debug(f"Exception: {e}")
        # The body may be missing or not JSON; fall back to the HTTP reason
        try:
            reason = json.loads(e.body).get("reason")
        except (TypeError, ValueError, AttributeError):
            reason = getattr(e, "reason", None)
        # A string cannot be raised: wrap the message in a real exception
        raise Exception(f"Api Exception: {e}. Reason: {reason}") from e
almagiacdd20ae2024-12-13 09:45:45 +0100820
async def list_generic_object(
    self,
    api_group: str,
    api_plural: str,
    api_version: str,
    namespace: str,
):
    """
    List the custom objects of the requested API group, version and plural

    :param: api_group: API Group
    :param: api_plural: API Plural
    :param: api_version: API Version
    :param: namespace: Namespace. When empty, cluster-wide objects are listed

    :return: Value of "items" in the API response, or an empty list when
             the API answers that the objects cannot be found
    """
    self.logger.debug(f"Kubectl cfg file: {self._config_file}")
    custom_api = self.clients[CUSTOM_OBJECT_CLIENT]
    # Arguments shared by the namespaced and the cluster-wide calls
    query = dict(group=api_group, plural=api_plural, version=api_version)
    try:
        if namespace:
            listing = custom_api.list_namespaced_custom_object(
                namespace=namespace, **query
            )
        else:
            listing = custom_api.list_cluster_custom_object(**query)
        self.logger.debug(f"Object-list: {listing.get('items')}")
        return listing.get("items")
    except ApiException as e:
        self.logger.debug(f"Exception: {e}")
        reason = json.loads(e.body).get("reason")
        if reason.lower() != "notfound":
            raise e
        self.logger.warning("Cannot find specified custom objects: {}".format(e))
        return []
865
@retry(
    attempts=10,
    delay=1,
    fallback=Exception("Failed creating the secret"),
)
async def create_secret_string(
    self, name: str, string_data: str, namespace: str, secret_type: str
):
    """
    Create a secret from non-encoded content

    :param: name: Name of the secret
    :param: string_data: Content passed as string_data of the secret
    :param: namespace: Name of the namespace where the secret will be stored
    :param: secret_type: Type of the secret, e.g., Opaque, kubernetes.io/service-account-token, kubernetes.io/tls

    :return: None
    """
    core_api = self.clients[CORE_CLIENT]
    body = V1Secret(
        metadata=V1ObjectMeta(name=name, namespace=namespace),
        string_data=string_data,
        type=secret_type,
    )
    core_api.create_namespaced_secret(namespace, body)
888
@retry(
    attempts=10,
    delay=1,
    fallback=Exception("Failed creating the pvc"),
)
async def create_pvc(self, name: str, namespace: str):
    """
    Create a ReadWriteOnce PVC requesting 100Mi of storage. If it already
    exists, a warning is logged and nothing else is done

    :param: name: Name of the pvc to be created
    :param: namespace: Name of the namespace where the pvc will be stored

    """
    try:
        claim_spec = V1PersistentVolumeClaimSpec(
            access_modes=["ReadWriteOnce"],
            resources=V1ResourceRequirements(requests={"storage": "100Mi"}),
        )
        claim = V1PersistentVolumeClaim(
            api_version="v1",
            kind="PersistentVolumeClaim",
            metadata=V1ObjectMeta(name=name),
            spec=claim_spec,
        )
        core_api = self.clients[CORE_CLIENT]
        core_api.create_namespaced_persistent_volume_claim(
            namespace=namespace, body=claim
        )
    except ApiException as e:
        reason = json.loads(e.body).get("reason")
        if reason.lower() != "alreadyexists":
            raise e
        self.logger.warning("PVC already exists: {}".format(e))
921
@retry(
    attempts=10,
    delay=1,
    fallback=Exception("Failed deleting the pvc"),
)
async def delete_pvc(self, name: str, namespace: str):
    """
    Delete a PVC

    :param: name: Name of the pvc to be deleted
    :param: namespace: Namespace

    """
    core_api = self.clients[CORE_CLIENT]
    core_api.delete_namespaced_persistent_volume_claim(name, namespace)
938
def copy_file_to_pod(
    self, namespace, pod_name, container_name, src_file, dest_path
):
    """
    Copy a local file into a container by streaming a tar archive to a
    "tar" process executed in the pod

    :param: namespace: Namespace of the pod
    :param: pod_name: Name of the pod
    :param: container_name: Name of the container where "tar" is executed
    :param: src_file: Path of the local file to be copied
    :param: dest_path: Path of the file inside the container. If it ends
                       with "/", the name of src_file is kept
    """
    import os
    import posixpath

    # Paths inside the pod are split with POSIX rules. Slicing the string by
    # hand gave "" for "/file" and the file name itself for "file" as the
    # directory handed to "tar -C"
    dest_dir, dest_name = posixpath.split(dest_path)
    dest_dir = dest_dir or "."
    dest_name = dest_name or os.path.basename(src_file)

    # Create an in-memory tar file containing the source file
    tar_buffer = io.BytesIO()
    with tarfile.open(fileobj=tar_buffer, mode="w") as tar:
        tar.add(src_file, arcname=dest_name)

    # Define the command to extract the tar file in the pod
    exec_command = ["tar", "xvf", "-", "-C", dest_dir]

    # Execute the command
    resp = stream(
        self.clients[CORE_CLIENT].connect_get_namespaced_pod_exec,
        pod_name,
        namespace,
        command=exec_command,
        container=container_name,
        stdin=True,
        stderr=True,
        stdout=True,
        tty=False,
        _preload_content=False,
    )

    # Write the tar data to the pod; the stream is closed even if that fails
    try:
        resp.write_stdin(tar_buffer.getvalue())
    finally:
        resp.close()
969
@retry(
    attempts=10,
    delay=1,
    fallback=Exception("Failed creating the pvc"),
)
async def create_pvc_with_content(
    self, name: str, namespace: str, src_file: str, dest_filename: str
):
    """
    Create a PVC with content

    The PVC is created, then a pod named "copy-pod-<name>" mounting it, and
    finally the file is copied into the pod under /mnt/data.

    :param: name: Name of the pvc to be created
    :param: namespace: Name of the namespace where the pvc will be stored
    :param: src_file: File to be copied
    :param: dest_filename: Name of the file in the destination folder
    """
    import asyncio

    pod_name = f"copy-pod-{name}"
    self.logger.debug(f"Creating pvc {name}")
    await self.create_pvc(name=name, namespace=namespace)
    self.logger.debug("Sleeping")
    # asyncio.sleep keeps the event loop free while waiting; time.sleep
    # would stall every other task for the whole pause.
    # NOTE(review): fixed pause, the readiness of the PVC is not checked
    await asyncio.sleep(40)
    self.logger.debug(f"Creating pod {pod_name}")
    await self.create_copy_pod(name=pod_name, namespace=namespace, pvc_name=name)
    self.logger.debug("Sleeping")
    # NOTE(review): fixed pause, the readiness of the pod is not checked
    await asyncio.sleep(40)
    self.logger.debug(f"Copying files to pod {pod_name}")
    self.copy_file_to_pod(
        namespace=namespace,
        pod_name=pod_name,
        container_name="copy-container",
        src_file=src_file,
        dest_path=f"/mnt/data/{dest_filename}",
    )
1003
@retry(
    attempts=10,
    delay=1,
    # The message used to say "pvc", although this method creates a pod
    fallback=Exception("Failed creating the pod"),
)
async def create_copy_pod(self, name: str, namespace: str, pvc_name: str):
    """
    Create a pod to copy content into a PVC

    The pod runs a single "copy-container" with the PVC mounted at /mnt/data.

    :param: name: Name of the pod to be created
    :param: namespace: Name of the namespace where the pod will be stored
    :param: pvc_name: Name of the PVC that the pod will mount as a volume

    """
    pod = V1Pod(
        api_version="v1",
        kind="Pod",
        metadata=V1ObjectMeta(name=name),
        spec=V1PodSpec(
            containers=[
                V1Container(
                    name="copy-container",
                    image="busybox",  # Lightweight image, enough to copy files
                    command=["sleep", "3600"],  # Keeps the container running
                    volume_mounts=[
                        V1VolumeMount(mount_path="/mnt/data", name="my-storage")
                    ],
                )
            ],
            volumes=[
                V1Volume(
                    name="my-storage",
                    persistent_volume_claim=V1PersistentVolumeClaimVolumeSource(
                        claim_name=pvc_name
                    ),
                )
            ],
        ),
    )
    # Create the pod
    self.clients[CORE_CLIENT].create_namespaced_pod(namespace=namespace, body=pod)
1045
@retry(
    attempts=10,
    delay=1,
    fallback=Exception("Failed deleting the pod"),
)
async def delete_pod(self, name: str, namespace: str):
    """
    Delete a pod

    :param: name: Name of the pod to be deleted
    :param: namespace: Namespace

    """
    core_api = self.clients[CORE_CLIENT]
    core_api.delete_namespaced_pod(name, namespace)