#######################################################################################
2# Copyright 2020 Canonical Ltd.
3# Copyright ETSI Contributors and Others.
4#
5# Licensed under the Apache License, Version 2.0 (the "License");
6# you may not use this file except in compliance with the License.
7# You may obtain a copy of the License at
8#
9# http://www.apache.org/licenses/LICENSE-2.0
10#
11# Unless required by applicable law or agreed to in writing, software
12# distributed under the License is distributed on an "AS IS" BASIS,
13# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14# See the License for the specific language governing permissions and
15# limitations under the License.
16#######################################################################################
17
import asyncio
import base64
import io
import json
import logging
import tarfile
import typing
import uuid
from distutils.version import LooseVersion
from time import sleep
from typing import Dict

from kubernetes import client, config
from kubernetes.client.api import VersionApi
from kubernetes.client.models import (
    V1ClusterRole,
    V1Role,
    V1ObjectMeta,
    V1PolicyRule,
    V1ServiceAccount,
    V1ClusterRoleBinding,
    V1RoleBinding,
    V1RoleRef,
    RbacV1Subject,
    V1Secret,
    V1SecretReference,
    V1Namespace,
    V1PersistentVolumeClaim,
    V1PersistentVolumeClaimSpec,
    V1PersistentVolumeClaimVolumeSource,
    V1ResourceRequirements,
    V1Pod,
    V1PodSpec,
    V1Volume,
    V1VolumeMount,
    V1Container,
)
from kubernetes.client.rest import ApiException
from kubernetes.stream import stream
from retrying_async import retry

from osm_lcm.n2vc.libjuju import retry_callback
59
# Keys read from the ``data`` dictionary of a service-account token secret
# (see Kubectl.get_secret_data).
SERVICE_ACCOUNT_TOKEN_KEY = "token"
SERVICE_ACCOUNT_ROOT_CA_KEY = "ca.crt"
# Keys of the Kubectl.clients dictionary, one per Kubernetes API client object.
CORE_CLIENT = "core_v1"
RBAC_CLIENT = "rbac_v1"
STORAGE_CLIENT = "storage_v1"
CUSTOM_OBJECT_CLIENT = "custom_object"
67
68
class Kubectl:
    """
    Convenience wrapper around the Kubernetes Python client.

    The kubeconfig is loaded once at construction time; the resulting API
    clients are kept in a dictionary indexed by the module-level ``*_CLIENT``
    constants and used by every helper of this class.
    """

    # Seconds create_pvc_with_content waits after creating the PVC and again
    # after creating the copy pod, before going on with the next step.
    _COPY_WAIT_SECONDS = 40

    def __init__(self, config_file=None):
        """
        Load the kubeconfig and instantiate the API clients

        :param: config_file: Path of the kubeconfig file, passed as-is to
                             ``config.load_kube_config``
        """
        config.load_kube_config(config_file=config_file)
        self._clients = {
            CORE_CLIENT: client.CoreV1Api(),
            RBAC_CLIENT: client.RbacAuthorizationV1Api(),
            STORAGE_CLIENT: client.StorageV1Api(),
            CUSTOM_OBJECT_CLIENT: client.CustomObjectsApi(),
        }
        self._configuration = config.kube_config.Configuration.get_default_copy()
        self.logger = logging.getLogger("lcm.kubectl")

    @property
    def configuration(self):
        """Copy of the client configuration taken when the object was built"""
        return self._configuration

    @property
    def clients(self):
        """Dictionary with the Kubernetes API clients, indexed by ``*_CLIENT``"""
        return self._clients

    @staticmethod
    def _api_error_reason(error) -> str:
        """
        Extract the lower-cased ``reason`` field from the body of an ApiException

        :param: error: ApiException raised by the Kubernetes client

        :return: The reason in lower case, or an empty string when the body is
                 missing, is not a JSON object or carries no reason. The empty
                 string never matches a handled reason, so callers end up
                 re-raising the original exception instead of a parsing error.
        """
        try:
            info = json.loads(error.body)
        except (TypeError, ValueError):
            # body is None (TypeError) or is not valid JSON (ValueError)
            return ""
        if not isinstance(info, dict):
            return ""
        return str(info.get("reason") or "").lower()

    @staticmethod
    def _service_to_dict(service) -> typing.Dict:
        """
        Summarize a service object returned by the API into a plain dictionary

        :param: service: Item of the list returned by the core API

        :return: Dictionary with name, cluster_ip, type, ports and external_ip
        """
        load_balancer = service.status.load_balancer if service.status else None
        ingress = load_balancer.ingress if load_balancer else None
        return {
            "name": service.metadata.name,
            "cluster_ip": service.spec.cluster_ip,
            "type": service.spec.type,
            "ports": [
                {
                    "name": port.name,
                    "node_port": port.node_port,
                    "port": port.port,
                    "protocol": port.protocol,
                    "target_port": port.target_port,
                }
                for port in (service.spec.ports or [])
            ],
            "external_ip": [entry.ip for entry in ingress] if ingress else None,
        }

    def get_services(
        self,
        field_selector: str = None,
        label_selector: str = None,
    ) -> typing.List[typing.Dict]:
        """
        Get the Service list from all namespaces

        :param: field_selector: Kubernetes field selector to filter the services
        :param: label_selector: Kubernetes label selector to filter the services

        :return: List of the services matching the selectors specified. Each
                 item is a dictionary with the keys name, cluster_ip, type,
                 ports and external_ip (None when there is no ingress)
        :raises: ApiException if the call to the Kubernetes API fails
        """
        kwargs = {}
        if field_selector:
            kwargs["field_selector"] = field_selector
        if label_selector:
            kwargs["label_selector"] = label_selector
        try:
            result = self.clients[CORE_CLIENT].list_service_for_all_namespaces(**kwargs)
        except ApiException as e:
            self.logger.error("Error calling get services: {}".format(e))
            raise
        return [self._service_to_dict(service) for service in result.items]

    def get_default_storage_class(self) -> typing.Optional[str]:
        """
        Default storage class

        :return: Returns the default storage class name, if exists.
                 If not, it returns the first storage class.
                 If there are not storage classes, returns None
        """
        storage_classes = self.clients[STORAGE_CLIENT].list_storage_class()
        default_sc_annotations = {
            "storageclass.kubernetes.io/is-default-class": "true",
            # Older clusters still use the beta annotation.
            "storageclass.beta.kubernetes.io/is-default-class": "true",
        }
        selected_sc = None
        for sc in storage_classes.items:
            if not selected_sc:
                # Keep the first storage class as candidate in case none of
                # them is annotated as the default one
                selected_sc = sc.metadata.name
            annotations = sc.metadata.annotations or {}
            if any(
                annotations.get(key) == value
                for key, value in default_sc_annotations.items()
            ):
                # Default storage class found: no need to look further
                selected_sc = sc.metadata.name
                break
        return selected_sc

    def create_cluster_role(
        self,
        name: str,
        labels: Dict[str, str],
        namespace: str = "kube-system",
    ):
        """
        Create a cluster role granting every verb on every resource and
        non-resource URL

        :param: name: Name of the cluster role
        :param: labels: Labels for cluster role metadata
        :param: namespace: Kubernetes namespace for cluster role metadata
                           Default: kube-system

        :raises: Exception if a cluster role with that name already exists
        """
        cluster_roles = self.clients[RBAC_CLIENT].list_cluster_role(
            field_selector="metadata.name={}".format(name)
        )
        if len(cluster_roles.items) > 0:
            raise Exception("Role with metadata.name={} already exists".format(name))

        cluster_role = V1ClusterRole(
            metadata=V1ObjectMeta(name=name, labels=labels, namespace=namespace),
            rules=[
                V1PolicyRule(api_groups=["*"], resources=["*"], verbs=["*"]),
                V1PolicyRule(non_resource_ur_ls=["*"], verbs=["*"]),
            ],
        )
        self.clients[RBAC_CLIENT].create_cluster_role(cluster_role)

    async def create_role(
        self,
        name: str,
        labels: Dict[str, str],
        api_groups: list,
        resources: list,
        verbs: list,
        namespace: str,
    ):
        """
        Create a role with one PolicyRule

        :param: name: Name of the namespaced Role
        :param: labels: Labels for namespaced Role metadata
        :param: api_groups: List with api-groups allowed in the policy rule
        :param: resources: List with resources allowed in the policy rule
        :param: verbs: List with verbs allowed in the policy rule
        :param: namespace: Kubernetes namespace for Role metadata

        :return: None
        :raises: Exception if a role with that name already exists in the namespace
        """
        roles = self.clients[RBAC_CLIENT].list_namespaced_role(
            namespace, field_selector="metadata.name={}".format(name)
        )
        if len(roles.items) > 0:
            raise Exception("Role with metadata.name={} already exists".format(name))

        role = V1Role(
            metadata=V1ObjectMeta(name=name, labels=labels, namespace=namespace),
            rules=[
                V1PolicyRule(api_groups=api_groups, resources=resources, verbs=verbs),
            ],
        )
        self.clients[RBAC_CLIENT].create_namespaced_role(namespace, role)

    def delete_cluster_role(self, name: str):
        """
        Delete a cluster role

        :param: name: Name of the cluster role
        """
        self.clients[RBAC_CLIENT].delete_cluster_role(name)

    def _get_kubectl_version(self):
        """
        Get the version reported by the cluster

        :return: String "<major>.<minor>" built from the version API response
        """
        version = VersionApi().get_code()
        return "{}.{}".format(version.major, version.minor)

    def _need_to_create_new_secret(self):
        """
        Tell whether the token secret of a service account has to be created
        explicitly, which is the case from K8s version 1.24 onwards

        :return: True if the cluster version is 1.24 or newer
        """
        min_k8s_version = "1.24"
        current_k8s_version = self._get_kubectl_version()
        return LooseVersion(min_k8s_version) <= LooseVersion(current_k8s_version)

    def _get_secret_name(self, service_account_name: str):
        """
        Generate a secret name for a service account

        :param: service_account_name: Name of the service account

        :return: "<service_account_name>-token-<suffix>", where the suffix is
                 the first five characters of a random UUID
        """
        random_alphanum = str(uuid.uuid4())[:5]
        return "{}-token-{}".format(service_account_name, random_alphanum)

    def _create_service_account_secret(
        self,
        service_account_name: str,
        namespace: str,
        secret_name: str,
    ):
        """
        Create a secret for the service account. K8s version >= 1.24

        :param: service_account_name: Name of the service account
        :param: namespace: Kubernetes namespace for service account metadata
        :param: secret_name: Name of the secret

        :raises: Exception if a secret with that name already exists
        """
        v1_core = self.clients[CORE_CLIENT]
        secrets = v1_core.list_namespaced_secret(
            namespace, field_selector="metadata.name={}".format(secret_name)
        ).items
        if len(secrets) > 0:
            raise Exception(
                "Secret with metadata.name={} already exists".format(secret_name)
            )

        # The annotation links the token secret to its service account
        annotations = {"kubernetes.io/service-account.name": service_account_name}
        metadata = V1ObjectMeta(
            name=secret_name, namespace=namespace, annotations=annotations
        )
        secret = V1Secret(metadata=metadata, type="kubernetes.io/service-account-token")
        v1_core.create_namespaced_secret(namespace, secret)

    def _get_secret_reference_list(self, namespace: str, secret_name: str):
        """
        Return a secret reference list with one secret.
        K8s version >= 1.24

        :param: namespace: Kubernetes namespace for service account metadata
        :param: secret_name: Name of the secret
        :rtype: list[V1SecretReference]
        """
        return [V1SecretReference(name=secret_name, namespace=namespace)]

    def create_service_account(
        self,
        name: str,
        labels: Dict[str, str],
        namespace: str = "kube-system",
    ):
        """
        Create a service account. When the cluster requires it (see
        _need_to_create_new_secret), a token secret is also created and
        referenced from the service account

        :param: name: Name of the service account
        :param: labels: Labels for service account metadata
        :param: namespace: Kubernetes namespace for service account metadata
                           Default: kube-system

        :raises: Exception if the service account already exists
        """
        v1_core = self.clients[CORE_CLIENT]
        service_accounts = v1_core.list_namespaced_service_account(
            namespace, field_selector="metadata.name={}".format(name)
        )
        if len(service_accounts.items) > 0:
            raise Exception(
                "Service account with metadata.name={} already exists".format(name)
            )

        metadata = V1ObjectMeta(name=name, labels=labels, namespace=namespace)

        if not self._need_to_create_new_secret():
            service_account = V1ServiceAccount(metadata=metadata)
            v1_core.create_namespaced_service_account(namespace, service_account)
            return

        secret_name = self._get_secret_name(name)
        secrets = self._get_secret_reference_list(namespace, secret_name)
        service_account = V1ServiceAccount(metadata=metadata, secrets=secrets)
        v1_core.create_namespaced_service_account(namespace, service_account)
        # The service account is created first because the secret refers to it
        self._create_service_account_secret(name, namespace, secret_name)

    def delete_secret(self, name: str, namespace: str = "kube-system"):
        """
        Delete a secret

        :param: name: Name of the secret
        :param: namespace: Kubernetes namespace
                           Default: kube-system
        """
        self.clients[CORE_CLIENT].delete_namespaced_secret(name, namespace)

    def delete_service_account(self, name: str, namespace: str = "kube-system"):
        """
        Delete a service account

        :param: name: Name of the service account
        :param: namespace: Kubernetes namespace for service account metadata
                           Default: kube-system
        """
        self.clients[CORE_CLIENT].delete_namespaced_service_account(name, namespace)

    def create_cluster_role_binding(
        self, name: str, labels: Dict[str, str], namespace: str = "kube-system"
    ):
        """
        Create a cluster role binding between the cluster role and the service
        account that share the given name

        :param: name: Name of the cluster role binding, of the cluster role and
                      of the service account
        :param: labels: Labels for cluster role binding metadata
        :param: namespace: Kubernetes namespace of the service account
                           Default: kube-system

        :raises: Exception if a cluster role binding with that name already exists
        """
        role_bindings = self.clients[RBAC_CLIENT].list_cluster_role_binding(
            field_selector="metadata.name={}".format(name)
        )
        if len(role_bindings.items) > 0:
            raise Exception("Generated rbac id already exists")

        role_binding = V1ClusterRoleBinding(
            metadata=V1ObjectMeta(name=name, labels=labels),
            role_ref=V1RoleRef(kind="ClusterRole", name=name, api_group=""),
            subjects=[
                RbacV1Subject(kind="ServiceAccount", name=name, namespace=namespace)
            ],
        )
        self.clients[RBAC_CLIENT].create_cluster_role_binding(role_binding)

    async def create_role_binding(
        self,
        name: str,
        role_name: str,
        sa_name: str,
        labels: Dict[str, str],
        namespace: str,
    ):
        """
        Create a namespaced role binding

        :param: name: Name of the namespaced Role Binding
        :param: role_name: Name of the namespaced Role to be bound
        :param: sa_name: Name of the Service Account to be bound
        :param: labels: Labels for Role Binding metadata
        :param: namespace: Kubernetes namespace for Role Binding metadata

        :return: None
        :raises: Exception if a role binding with that name already exists
        """
        role_bindings = self.clients[RBAC_CLIENT].list_namespaced_role_binding(
            namespace, field_selector="metadata.name={}".format(name)
        )
        if len(role_bindings.items) > 0:
            raise Exception(
                "Role Binding with metadata.name={} already exists".format(name)
            )

        role_binding = V1RoleBinding(
            metadata=V1ObjectMeta(name=name, labels=labels),
            role_ref=V1RoleRef(kind="Role", name=role_name, api_group=""),
            subjects=[
                RbacV1Subject(kind="ServiceAccount", name=sa_name, namespace=namespace)
            ],
        )
        self.clients[RBAC_CLIENT].create_namespaced_role_binding(
            namespace, role_binding
        )

    def delete_cluster_role_binding(self, name: str):
        """
        Delete a cluster role binding

        :param: name: Name of the cluster role binding
        """
        self.clients[RBAC_CLIENT].delete_cluster_role_binding(name)

    @retry(
        attempts=10,
        delay=1,
        fallback=Exception("Failed getting the secret from service account"),
        callback=retry_callback,
    )
    async def get_secret_data(
        self, name: str, namespace: str = "kube-system"
    ) -> typing.Tuple[str, str]:
        """
        Get the token and CA certificate stored in the first secret referenced
        by a service account

        :param: name: Name of the service account
        :param: namespace: Name of the namespace where the secret is stored

        :return: Tuple with the token and client certificate, both base64-decoded
        :raises: Exception if the service account does not exist or does not
                 reference any secret
        """
        v1_core = self.clients[CORE_CLIENT]

        service_accounts = v1_core.list_namespaced_service_account(
            namespace, field_selector="metadata.name={}".format(name)
        )
        if len(service_accounts.items) == 0:
            raise Exception(
                "Service account not found with metadata.name={}".format(name)
            )

        service_account = service_accounts.items[0]
        secret_name = None
        if service_account.secrets and len(service_account.secrets) > 0:
            secret_name = service_account.secrets[0].name
        if not secret_name:
            raise Exception(
                "Failed getting the secret from service account {}".format(name)
            )

        # TODO: refactor to use get_secret_content
        secret = v1_core.list_namespaced_secret(
            namespace, field_selector="metadata.name={}".format(secret_name)
        ).items[0]

        token = secret.data[SERVICE_ACCOUNT_TOKEN_KEY]
        client_certificate_data = secret.data[SERVICE_ACCOUNT_ROOT_CA_KEY]

        return (
            base64.b64decode(token).decode("utf-8"),
            base64.b64decode(client_certificate_data).decode("utf-8"),
        )

    @retry(
        attempts=10,
        delay=1,
        fallback=Exception("Failed getting data from the secret"),
    )
    async def get_secret_content(
        self,
        name: str,
        namespace: str,
    ) -> dict:
        """
        Get secret data

        :param: name: Name of the secret
        :param: namespace: Name of the namespace where the secret is stored

        :return: Dictionary with secret's data
        """
        secret = self.clients[CORE_CLIENT].read_namespaced_secret(name, namespace)
        return secret.data

    @retry(
        attempts=10,
        delay=1,
        fallback=Exception("Failed creating the secret"),
    )
    async def create_secret(
        self, name: str, data: dict, namespace: str, secret_type: str
    ):
        """
        Create secret with data

        :param: name: Name of the secret
        :param: data: Dict with data content. Values must be already base64 encoded
        :param: namespace: Name of the namespace where the secret will be stored
        :param: secret_type: Type of the secret, e.g., Opaque, kubernetes.io/service-account-token, kubernetes.io/tls

        :return: None
        """
        self.logger.info("Enter create_secret function")
        v1_core = self.clients[CORE_CLIENT]
        metadata = V1ObjectMeta(name=name, namespace=namespace)
        secret = V1Secret(metadata=metadata, data=data, type=secret_type)
        # Only identifying fields are logged: the secret payload must never
        # end up in the logs
        self.logger.debug(
            "Creating secret {} of type {} in namespace {}".format(
                name, secret_type, namespace
            )
        )
        v1_core.create_namespaced_secret(namespace, secret)
        self.logger.info("Namespaced secret was created")

    async def create_certificate(
        self,
        namespace: str,
        name: str,
        dns_prefix: str,
        secret_name: str,
        usages: list,
        issuer_name: str,
    ):
        """
        Creates cert-manager certificate object. If the certificate already
        exists, a warning is logged and nothing else is done

        :param: namespace: Name of the namespace where the certificate and secret is stored
        :param: name: Name of the certificate object
        :param: dns_prefix: Prefix for the dnsNames. They will be prefixed to the common k8s svc suffixes
        :param: secret_name: Name of the secret created by cert-manager
        :param: usages: List of X.509 key usages
        :param: issuer_name: Name of the cert-manager's ClusterIssuer object

        :raises: ApiException for any API error other than AlreadyExists
        """
        certificate_body = {
            "apiVersion": "cert-manager.io/v1",
            "kind": "Certificate",
            "metadata": {"name": name, "namespace": namespace},
            "spec": {
                "secretName": secret_name,
                "privateKey": {
                    "rotationPolicy": "Always",
                    "algorithm": "ECDSA",
                    "size": 256,
                },
                "duration": "8760h",  # 1 year
                # 2208h = 92 days: renewal starts ~3 months before expiry,
                # i.e. ~9 months after the certificate is issued
                "renewBefore": "2208h",
                "subject": {"organizations": ["osm"]},
                "commonName": "osm",
                "isCA": False,
                "usages": usages,
                "dnsNames": [
                    "{}.{}".format(dns_prefix, namespace),
                    "{}.{}.svc".format(dns_prefix, namespace),
                    "{}.{}.svc.cluster".format(dns_prefix, namespace),
                    "{}.{}.svc.cluster.local".format(dns_prefix, namespace),
                ],
                "issuerRef": {"name": issuer_name, "kind": "ClusterIssuer"},
            },
        }
        custom_api = self.clients[CUSTOM_OBJECT_CLIENT]
        try:
            custom_api.create_namespaced_custom_object(
                group="cert-manager.io",
                plural="certificates",
                version="v1",
                body=certificate_body,
                namespace=namespace,
            )
        except ApiException as e:
            if self._api_error_reason(e) == "alreadyexists":
                self.logger.warning("Certificate already exists: {}".format(e))
            else:
                raise

    async def delete_certificate(self, namespace, object_name):
        """
        Deletes cert-manager certificate object. If the certificate does not
        exist, a warning is logged and nothing else is done

        :param: namespace: Name of the namespace where the certificate is stored
        :param: object_name: Name of the certificate object

        :raises: ApiException for any API error other than NotFound
        """
        custom_api = self.clients[CUSTOM_OBJECT_CLIENT]
        try:
            custom_api.delete_namespaced_custom_object(
                group="cert-manager.io",
                plural="certificates",
                version="v1",
                name=object_name,
                namespace=namespace,
            )
        except ApiException as e:
            if self._api_error_reason(e) == "notfound":
                self.logger.warning("Certificate already deleted: {}".format(e))
            else:
                raise

    @retry(
        attempts=10,
        delay=1,
        fallback=Exception("Failed creating the namespace"),
    )
    async def create_namespace(self, name: str, labels: dict = None):
        """
        Create a namespace. If the namespace already exists, a warning is
        logged and nothing else is done

        :param: name: Name of the namespace to be created
        :param: labels: Dictionary with labels for the new namespace

        :raises: ApiException for any API error other than AlreadyExists
        """
        namespace = V1Namespace(metadata=V1ObjectMeta(name=name, labels=labels))
        try:
            self.clients[CORE_CLIENT].create_namespace(namespace)
            self.logger.debug("Namespace created: {}".format(name))
        except ApiException as e:
            if self._api_error_reason(e) == "alreadyexists":
                self.logger.warning("Namespace already exists: {}".format(e))
            else:
                raise

    @retry(
        attempts=10,
        delay=1,
        fallback=Exception("Failed deleting the namespace"),
    )
    async def delete_namespace(self, name: str):
        """
        Delete a namespace. If the namespace does not exist, a warning is
        logged and nothing else is done

        :param: name: Name of the namespace to be deleted

        :raises: ApiException for any API error other than Not Found
        """
        try:
            self.clients[CORE_CLIENT].delete_namespace(name)
        except ApiException as e:
            if e.status == 404 or e.reason == "Not Found":
                self.logger.warning("Namespace already deleted: {}".format(e))
            else:
                # Any other failure must reach the retry decorator instead of
                # being reported as a successful deletion
                raise

    def get_secrets(
        self,
        namespace: str,
        field_selector: str = None,
    ) -> typing.List[V1Secret]:
        """
        Get Secret list from a namespace

        :param: namespace: Kubernetes namespace
        :param: field_selector: Kubernetes field selector

        :return: List of the secret objects matching the selectors specified
        :raises: ApiException if the call to the Kubernetes API fails
        """
        try:
            return (
                self.clients[CORE_CLIENT]
                .list_namespaced_secret(
                    namespace=namespace,
                    field_selector=field_selector,
                )
                .items
            )
        except ApiException as e:
            self.logger.error("Error calling get secrets: {}".format(e))
            raise

    def create_generic_object(
        self,
        api_group: str,
        api_plural: str,
        api_version: str,
        namespace: str,
        manifest_dict: dict,
    ):
        """
        Creates generic object. If the object already exists, a warning is
        logged and nothing else is done

        :param: api_group: API Group
        :param: api_plural: API Plural
        :param: api_version: API Version
        :param: namespace: Namespace. If empty, a cluster-scoped object is created
        :param: manifest_dict: Dictionary with the content of the Kubernetes manifest

        :raises: ApiException for any API error other than AlreadyExists
        """
        custom_api = self.clients[CUSTOM_OBJECT_CLIENT]
        try:
            if namespace:
                custom_api.create_namespaced_custom_object(
                    group=api_group,
                    plural=api_plural,
                    version=api_version,
                    body=manifest_dict,
                    namespace=namespace,
                )
            else:
                custom_api.create_cluster_custom_object(
                    group=api_group,
                    plural=api_plural,
                    version=api_version,
                    body=manifest_dict,
                )
        except ApiException as e:
            if self._api_error_reason(e) == "alreadyexists":
                self.logger.warning("Object already exists: {}".format(e))
            else:
                raise

    def delete_generic_object(
        self,
        api_group: str,
        api_plural: str,
        api_version: str,
        namespace: str,
        name: str,
    ):
        """
        Deletes generic object. If the object does not exist, a warning is
        logged and nothing else is done

        :param: api_group: API Group
        :param: api_plural: API Plural
        :param: api_version: API Version
        :param: namespace: Namespace. If empty, a cluster-scoped object is deleted
        :param: name: Name of the object

        :raises: ApiException for any API error other than NotFound
        """
        custom_api = self.clients[CUSTOM_OBJECT_CLIENT]
        try:
            if namespace:
                custom_api.delete_namespaced_custom_object(
                    group=api_group,
                    plural=api_plural,
                    version=api_version,
                    name=name,
                    namespace=namespace,
                )
            else:
                custom_api.delete_cluster_custom_object(
                    group=api_group,
                    plural=api_plural,
                    version=api_version,
                    name=name,
                )
        except ApiException as e:
            if self._api_error_reason(e) == "notfound":
                self.logger.warning("Object already deleted: {}".format(e))
            else:
                raise

    async def get_generic_object(
        self,
        api_group: str,
        api_plural: str,
        api_version: str,
        namespace: str,
        name: str,
    ):
        """
        Gets generic object

        :param: api_group: API Group
        :param: api_plural: API Plural
        :param: api_version: API Version
        :param: namespace: Namespace. If empty, cluster-scoped objects are searched
        :param: name: Name of the object

        :return: Dictionary with the first object whose metadata.name matches,
                 or None if there is no such object
        :raises: ApiException for any API error other than NotFound
        """
        custom_api = self.clients[CUSTOM_OBJECT_CLIENT]
        try:
            if namespace:
                object_dict = custom_api.list_namespaced_custom_object(
                    group=api_group,
                    plural=api_plural,
                    version=api_version,
                    namespace=namespace,
                    field_selector=f"metadata.name={name}",
                )
            else:
                object_dict = custom_api.list_cluster_custom_object(
                    group=api_group,
                    plural=api_plural,
                    version=api_version,
                    field_selector=f"metadata.name={name}",
                )
        except ApiException as e:
            self.logger.debug(f"Exception: {e}")
            if self._api_error_reason(e) == "notfound":
                self.logger.warning("Cannot get custom object: {}".format(e))
                return None
            raise
        items = object_dict.get("items")
        if not items:
            return None
        return items[0]

    async def list_generic_object(
        self,
        api_group: str,
        api_plural: str,
        api_version: str,
        namespace: str,
    ):
        """
        Lists all generic objects of the requested API group

        :param: api_group: API Group
        :param: api_plural: API Plural
        :param: api_version: API Version
        :param: namespace: Namespace. If empty, cluster-scoped objects are listed

        :return: Value of the "items" key of the API response, or an empty
                 list if the API answers NotFound
        :raises: ApiException for any API error other than NotFound
        """
        custom_api = self.clients[CUSTOM_OBJECT_CLIENT]
        try:
            if namespace:
                object_dict = custom_api.list_namespaced_custom_object(
                    group=api_group,
                    plural=api_plural,
                    version=api_version,
                    namespace=namespace,
                )
            else:
                object_dict = custom_api.list_cluster_custom_object(
                    group=api_group,
                    plural=api_plural,
                    version=api_version,
                )
        except ApiException as e:
            self.logger.debug(f"Exception: {e}")
            if self._api_error_reason(e) == "notfound":
                self.logger.warning(
                    "Cannot find specified custom objects: {}".format(e)
                )
                return []
            raise
        self.logger.debug(f"Object-list: {object_dict.get('items')}")
        return object_dict.get("items")

    @retry(
        attempts=10,
        delay=1,
        fallback=Exception("Failed creating the secret"),
    )
    async def create_secret_string(
        self, name: str, string_data: str, namespace: str, secret_type: str
    ):
        """
        Create secret with non-encoded data

        :param: name: Name of the secret
        :param: string_data: Data content, passed as-is as ``string_data`` of the secret.
                             NOTE(review): the Kubernetes client model declares this
                             field as a dict of strings - confirm what callers pass
        :param: namespace: Name of the namespace where the secret will be stored
        :param: secret_type: Type of the secret, e.g., Opaque, kubernetes.io/service-account-token, kubernetes.io/tls

        :return: None
        """
        metadata = V1ObjectMeta(name=name, namespace=namespace)
        secret = V1Secret(metadata=metadata, string_data=string_data, type=secret_type)
        self.clients[CORE_CLIENT].create_namespaced_secret(namespace, secret)

    @retry(
        attempts=10,
        delay=1,
        fallback=Exception("Failed creating the pvc"),
    )
    async def create_pvc(self, name: str, namespace: str):
        """
        Create a PVC of 100Mi with access mode ReadWriteOnce. If the PVC
        already exists, a warning is logged and nothing else is done

        :param: name: Name of the pvc to be created
        :param: namespace: Name of the namespace where the pvc will be stored

        :raises: ApiException for any API error other than AlreadyExists
        """
        pvc = V1PersistentVolumeClaim(
            api_version="v1",
            kind="PersistentVolumeClaim",
            metadata=V1ObjectMeta(name=name),
            spec=V1PersistentVolumeClaimSpec(
                access_modes=["ReadWriteOnce"],
                resources=V1ResourceRequirements(requests={"storage": "100Mi"}),
            ),
        )
        try:
            self.clients[CORE_CLIENT].create_namespaced_persistent_volume_claim(
                namespace=namespace, body=pvc
            )
        except ApiException as e:
            if self._api_error_reason(e) == "alreadyexists":
                self.logger.warning("PVC already exists: {}".format(e))
            else:
                raise

    @retry(
        attempts=10,
        delay=1,
        fallback=Exception("Failed deleting the pvc"),
    )
    async def delete_pvc(self, name: str, namespace: str):
        """
        Delete a PVC

        :param: name: Name of the pvc to be deleted
        :param: namespace: Namespace

        """
        self.clients[CORE_CLIENT].delete_namespaced_persistent_volume_claim(
            name, namespace
        )

    def copy_file_to_pod(
        self, namespace, pod_name, container_name, src_file, dest_path
    ):
        """
        Copy a local file into a container by streaming a tar archive to a
        ``tar`` command executed in the pod

        :param: namespace: Namespace of the pod
        :param: pod_name: Name of the pod
        :param: container_name: Name of the container where tar is executed
        :param: src_file: Path of the local file to be copied
        :param: dest_path: Path of the file inside the container, with "/" as separator
        """
        # Split the destination in directory and file name. A path without
        # directory is extracted in "." and a file directly under the root in "/"
        dest_dir, separator, file_name = dest_path.rpartition("/")
        if not dest_dir:
            dest_dir = "/" if separator else "."

        # Create an in-memory tar file containing the source file
        tar_buffer = io.BytesIO()
        with tarfile.open(fileobj=tar_buffer, mode="w") as tar:
            tar.add(src_file, arcname=file_name)

        # Define the command to extract the tar file in the pod
        exec_command = ["tar", "xvf", "-", "-C", dest_dir]

        # Execute the command
        resp = stream(
            self.clients[CORE_CLIENT].connect_get_namespaced_pod_exec,
            pod_name,
            namespace,
            command=exec_command,
            container=container_name,
            stdin=True,
            stderr=True,
            stdout=True,
            tty=False,
            _preload_content=False,
        )

        try:
            # Write the tar data to the pod
            resp.write_stdin(tar_buffer.getvalue())
        finally:
            # The stream is released even if the write fails
            resp.close()

    @retry(
        attempts=10,
        delay=1,
        fallback=Exception("Failed creating the pvc"),
    )
    async def create_pvc_with_content(
        self, name: str, namespace: str, src_file: str, dest_filename: str
    ):
        """
        Create a PVC with content. A helper pod named "copy-pod-<name>" mounts
        the PVC and the file is copied through it into /mnt/data

        :param: name: Name of the pvc to be created
        :param: namespace: Name of the namespace where the pvc will be stored
        :param: src_file: File to be copied
        :param: dest_filename: Name of the file in the destination folder
        """
        pod_name = f"copy-pod-{name}"
        self.logger.debug(f"Creating pvc {name}")
        await self.create_pvc(name=name, namespace=namespace)
        self.logger.debug("Sleeping")
        # asyncio.sleep yields to the event loop while waiting; time.sleep
        # would freeze every other coroutine for the whole wait
        await asyncio.sleep(self._COPY_WAIT_SECONDS)
        self.logger.debug(f"Creating pod {pod_name}")
        await self.create_copy_pod(name=pod_name, namespace=namespace, pvc_name=name)
        self.logger.debug("Sleeping")
        await asyncio.sleep(self._COPY_WAIT_SECONDS)
        self.logger.debug(f"Copying files to pod {pod_name}")
        self.copy_file_to_pod(
            namespace=namespace,
            pod_name=pod_name,
            container_name="copy-container",
            src_file=src_file,
            dest_path=f"/mnt/data/{dest_filename}",
        )

    @retry(
        attempts=10,
        delay=1,
        fallback=Exception("Failed creating the pod"),
    )
    async def create_copy_pod(self, name: str, namespace: str, pvc_name: str):
        """
        Create a pod to copy content into a PVC. If the pod already exists, a
        warning is logged and nothing else is done

        :param: name: Name of the pod to be created
        :param: namespace: Name of the namespace where the pod will be stored
        :param: pvc_name: Name of the PVC that the pod will mount as a volume

        :raises: ApiException for any API error other than AlreadyExists
        """
        pod = V1Pod(
            api_version="v1",
            kind="Pod",
            metadata=V1ObjectMeta(name=name),
            spec=V1PodSpec(
                containers=[
                    V1Container(
                        name="copy-container",
                        image="busybox",  # Lightweight image to copy files
                        command=["sleep", "3600"],  # Keep the container running
                        volume_mounts=[
                            V1VolumeMount(mount_path="/mnt/data", name="my-storage")
                        ],
                    )
                ],
                volumes=[
                    V1Volume(
                        name="my-storage",
                        persistent_volume_claim=V1PersistentVolumeClaimVolumeSource(
                            claim_name=pvc_name
                        ),
                    )
                ],
            ),
        )
        try:
            # Create the pod
            self.clients[CORE_CLIENT].create_namespaced_pod(
                namespace=namespace, body=pod
            )
        except ApiException as e:
            # Same tolerance as create_pvc, so that a retry of
            # create_pvc_with_content does not fail on the pod it already created
            if self._api_error_reason(e) == "alreadyexists":
                self.logger.warning("Pod already exists: {}".format(e))
            else:
                raise

    @retry(
        attempts=10,
        delay=1,
        fallback=Exception("Failed deleting the pod"),
    )
    async def delete_pod(self, name: str, namespace: str):
        """
        Delete a pod

        :param: name: Name of the pod to be deleted
        :param: namespace: Namespace

        """
        self.clients[CORE_CLIENT].delete_namespaced_pod(name, namespace)