blob: eee63dccaa5407187890b5b44bb9bc24da415c6c [file] [log] [blame]
almagiacdd20ae2024-12-13 09:45:45 +01001#######################################################################################
2# Copyright 2020 Canonical Ltd.
3# Copyright ETSI Contributors and Others.
4#
5# Licensed under the Apache License, Version 2.0 (the "License");
6# you may not use this file except in compliance with the License.
7# You may obtain a copy of the License at
8#
9# http://www.apache.org/licenses/LICENSE-2.0
10#
11# Unless required by applicable law or agreed to in writing, software
12# distributed under the License is distributed on an "AS IS" BASIS,
13# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14# See the License for the specific language governing permissions and
15# limitations under the License.
16#######################################################################################
17
18import base64
19import logging
20from typing import Dict
21import typing
22import uuid
23import json
24import tarfile
25import io
26from time import sleep
27
28from distutils.version import LooseVersion
29
30from kubernetes import client, config
31from kubernetes.client.api import VersionApi
32from kubernetes.client.models import (
33 V1ClusterRole,
34 V1Role,
35 V1ObjectMeta,
36 V1PolicyRule,
37 V1ServiceAccount,
38 V1ClusterRoleBinding,
39 V1RoleBinding,
40 V1RoleRef,
41 RbacV1Subject,
42 V1Secret,
43 V1SecretReference,
44 V1Namespace,
45 V1PersistentVolumeClaim,
46 V1PersistentVolumeClaimSpec,
47 V1PersistentVolumeClaimVolumeSource,
48 V1ResourceRequirements,
49 V1Pod,
50 V1PodSpec,
51 V1Volume,
52 V1VolumeMount,
53 V1Container,
54)
55from kubernetes.client.rest import ApiException
56from kubernetes.stream import stream
57from osm_lcm.n2vc.libjuju import retry_callback
58from retrying_async import retry
59
60SERVICE_ACCOUNT_TOKEN_KEY = "token"
61SERVICE_ACCOUNT_ROOT_CA_KEY = "ca.crt"
62# clients
63CORE_CLIENT = "core_v1"
64RBAC_CLIENT = "rbac_v1"
65STORAGE_CLIENT = "storage_v1"
66CUSTOM_OBJECT_CLIENT = "custom_object"
67
68
69class Kubectl:
70 def __init__(self, config_file=None):
71 config.load_kube_config(config_file=config_file)
72 self._clients = {
73 CORE_CLIENT: client.CoreV1Api(),
74 RBAC_CLIENT: client.RbacAuthorizationV1Api(),
75 STORAGE_CLIENT: client.StorageV1Api(),
76 CUSTOM_OBJECT_CLIENT: client.CustomObjectsApi(),
77 }
78 self._configuration = config.kube_config.Configuration.get_default_copy()
79 self.logger = logging.getLogger("lcm.kubectl")
80
81 @property
82 def configuration(self):
83 return self._configuration
84
85 @property
86 def clients(self):
87 return self._clients
88
89 def get_services(
90 self,
91 field_selector: str = None,
92 label_selector: str = None,
93 ) -> typing.List[typing.Dict]:
94 """
95 Get Service list from a namespace
96
97 :param: field_selector: Kubernetes field selector for the namespace
98 :param: label_selector: Kubernetes label selector for the namespace
99
100 :return: List of the services matching the selectors specified
101 """
102 kwargs = {}
103 if field_selector:
104 kwargs["field_selector"] = field_selector
105 if label_selector:
106 kwargs["label_selector"] = label_selector
107 try:
108 result = self.clients[CORE_CLIENT].list_service_for_all_namespaces(**kwargs)
109 return [
110 {
111 "name": i.metadata.name,
112 "cluster_ip": i.spec.cluster_ip,
113 "type": i.spec.type,
114 "ports": (
115 [
116 {
117 "name": p.name,
118 "node_port": p.node_port,
119 "port": p.port,
120 "protocol": p.protocol,
121 "target_port": p.target_port,
122 }
123 for p in i.spec.ports
124 ]
125 if i.spec.ports
126 else []
127 ),
128 "external_ip": [i.ip for i in i.status.load_balancer.ingress]
129 if i.status.load_balancer.ingress
130 else None,
131 }
132 for i in result.items
133 ]
134 except ApiException as e:
135 self.logger.error("Error calling get services: {}".format(e))
136 raise e
137
138 def get_default_storage_class(self) -> str:
139 """
140 Default storage class
141
142 :return: Returns the default storage class name, if exists.
143 If not, it returns the first storage class.
144 If there are not storage classes, returns None
145 """
146 storage_classes = self.clients[STORAGE_CLIENT].list_storage_class()
147 selected_sc = None
148 default_sc_annotations = {
149 "storageclass.kubernetes.io/is-default-class": "true",
150 # Older clusters still use the beta annotation.
151 "storageclass.beta.kubernetes.io/is-default-class": "true",
152 }
153 for sc in storage_classes.items:
154 if not selected_sc:
155 # Select the first storage class in case there is no a default-class
156 selected_sc = sc.metadata.name
157 annotations = sc.metadata.annotations or {}
158 if any(
159 k in annotations and annotations[k] == v
160 for k, v in default_sc_annotations.items()
161 ):
162 # Default storage
163 selected_sc = sc.metadata.name
164 break
165 return selected_sc
166
167 def create_cluster_role(
168 self,
169 name: str,
170 labels: Dict[str, str],
171 namespace: str = "kube-system",
172 ):
173 """
174 Create a cluster role
175
176 :param: name: Name of the cluster role
177 :param: labels: Labels for cluster role metadata
178 :param: namespace: Kubernetes namespace for cluster role metadata
179 Default: kube-system
180 """
181 cluster_roles = self.clients[RBAC_CLIENT].list_cluster_role(
182 field_selector="metadata.name={}".format(name)
183 )
184
185 if len(cluster_roles.items) > 0:
186 raise Exception("Role with metadata.name={} already exists".format(name))
187
188 metadata = V1ObjectMeta(name=name, labels=labels, namespace=namespace)
189 # Cluster role
190 cluster_role = V1ClusterRole(
191 metadata=metadata,
192 rules=[
193 V1PolicyRule(api_groups=["*"], resources=["*"], verbs=["*"]),
194 V1PolicyRule(non_resource_ur_ls=["*"], verbs=["*"]),
195 ],
196 )
197
198 self.clients[RBAC_CLIENT].create_cluster_role(cluster_role)
199
200 async def create_role(
201 self,
202 name: str,
203 labels: Dict[str, str],
204 api_groups: list,
205 resources: list,
206 verbs: list,
207 namespace: str,
208 ):
209 """
210 Create a role with one PolicyRule
211
212 :param: name: Name of the namespaced Role
213 :param: labels: Labels for namespaced Role metadata
214 :param: api_groups: List with api-groups allowed in the policy rule
215 :param: resources: List with resources allowed in the policy rule
216 :param: verbs: List with verbs allowed in the policy rule
217 :param: namespace: Kubernetes namespace for Role metadata
218
219 :return: None
220 """
221
222 roles = self.clients[RBAC_CLIENT].list_namespaced_role(
223 namespace, field_selector="metadata.name={}".format(name)
224 )
225
226 if len(roles.items) > 0:
227 raise Exception("Role with metadata.name={} already exists".format(name))
228
229 metadata = V1ObjectMeta(name=name, labels=labels, namespace=namespace)
230
231 role = V1Role(
232 metadata=metadata,
233 rules=[
234 V1PolicyRule(api_groups=api_groups, resources=resources, verbs=verbs),
235 ],
236 )
237
238 self.clients[RBAC_CLIENT].create_namespaced_role(namespace, role)
239
240 def delete_cluster_role(self, name: str):
241 """
242 Delete a cluster role
243
244 :param: name: Name of the cluster role
245 """
246 self.clients[RBAC_CLIENT].delete_cluster_role(name)
247
248 def _get_kubectl_version(self):
249 version = VersionApi().get_code()
250 return "{}.{}".format(version.major, version.minor)
251
252 def _need_to_create_new_secret(self):
253 min_k8s_version = "1.24"
254 current_k8s_version = self._get_kubectl_version()
255 return LooseVersion(min_k8s_version) <= LooseVersion(current_k8s_version)
256
257 def _get_secret_name(self, service_account_name: str):
258 random_alphanum = str(uuid.uuid4())[:5]
259 return "{}-token-{}".format(service_account_name, random_alphanum)
260
261 def _create_service_account_secret(
262 self,
263 service_account_name: str,
264 namespace: str,
265 secret_name: str,
266 ):
267 """
268 Create a secret for the service account. K8s version >= 1.24
269
270 :param: service_account_name: Name of the service account
271 :param: namespace: Kubernetes namespace for service account metadata
272 :param: secret_name: Name of the secret
273 """
274 v1_core = self.clients[CORE_CLIENT]
275 secrets = v1_core.list_namespaced_secret(
276 namespace, field_selector="metadata.name={}".format(secret_name)
277 ).items
278
279 if len(secrets) > 0:
280 raise Exception(
281 "Secret with metadata.name={} already exists".format(secret_name)
282 )
283
284 annotations = {"kubernetes.io/service-account.name": service_account_name}
285 metadata = V1ObjectMeta(
286 name=secret_name, namespace=namespace, annotations=annotations
287 )
288 type = "kubernetes.io/service-account-token"
289 secret = V1Secret(metadata=metadata, type=type)
290 v1_core.create_namespaced_secret(namespace, secret)
291
292 def _get_secret_reference_list(self, namespace: str, secret_name: str):
293 """
294 Return a secret reference list with one secret.
295 K8s version >= 1.24
296
297 :param: namespace: Kubernetes namespace for service account metadata
298 :param: secret_name: Name of the secret
299 :rtype: list[V1SecretReference]
300 """
301 return [V1SecretReference(name=secret_name, namespace=namespace)]
302
303 def create_service_account(
304 self,
305 name: str,
306 labels: Dict[str, str],
307 namespace: str = "kube-system",
308 ):
309 """
310 Create a service account
311
312 :param: name: Name of the service account
313 :param: labels: Labels for service account metadata
314 :param: namespace: Kubernetes namespace for service account metadata
315 Default: kube-system
316 """
317 v1_core = self.clients[CORE_CLIENT]
318 service_accounts = v1_core.list_namespaced_service_account(
319 namespace, field_selector="metadata.name={}".format(name)
320 )
321 if len(service_accounts.items) > 0:
322 raise Exception(
323 "Service account with metadata.name={} already exists".format(name)
324 )
325
326 metadata = V1ObjectMeta(name=name, labels=labels, namespace=namespace)
327
328 if self._need_to_create_new_secret():
329 secret_name = self._get_secret_name(name)
330 secrets = self._get_secret_reference_list(namespace, secret_name)
331 service_account = V1ServiceAccount(metadata=metadata, secrets=secrets)
332 v1_core.create_namespaced_service_account(namespace, service_account)
333 self._create_service_account_secret(name, namespace, secret_name)
334 else:
335 service_account = V1ServiceAccount(metadata=metadata)
336 v1_core.create_namespaced_service_account(namespace, service_account)
337
338 def delete_secret(self, name: str, namespace: str = "kube-system"):
339 """
340 Delete a secret
341
342 :param: name: Name of the secret
343 :param: namespace: Kubernetes namespace
344 Default: kube-system
345 """
346 self.clients[CORE_CLIENT].delete_namespaced_secret(name, namespace)
347
348 def delete_service_account(self, name: str, namespace: str = "kube-system"):
349 """
350 Delete a service account
351
352 :param: name: Name of the service account
353 :param: namespace: Kubernetes namespace for service account metadata
354 Default: kube-system
355 """
356 self.clients[CORE_CLIENT].delete_namespaced_service_account(name, namespace)
357
358 def create_cluster_role_binding(
359 self, name: str, labels: Dict[str, str], namespace: str = "kube-system"
360 ):
361 """
362 Create a cluster role binding
363
364 :param: name: Name of the cluster role
365 :param: labels: Labels for cluster role binding metadata
366 :param: namespace: Kubernetes namespace for cluster role binding metadata
367 Default: kube-system
368 """
369 role_bindings = self.clients[RBAC_CLIENT].list_cluster_role_binding(
370 field_selector="metadata.name={}".format(name)
371 )
372 if len(role_bindings.items) > 0:
373 raise Exception("Generated rbac id already exists")
374
375 role_binding = V1ClusterRoleBinding(
376 metadata=V1ObjectMeta(name=name, labels=labels),
377 role_ref=V1RoleRef(kind="ClusterRole", name=name, api_group=""),
378 subjects=[
379 RbacV1Subject(kind="ServiceAccount", name=name, namespace=namespace)
380 ],
381 )
382 self.clients[RBAC_CLIENT].create_cluster_role_binding(role_binding)
383
384 async def create_role_binding(
385 self,
386 name: str,
387 role_name: str,
388 sa_name: str,
389 labels: Dict[str, str],
390 namespace: str,
391 ):
392 """
393 Create a cluster role binding
394
395 :param: name: Name of the namespaced Role Binding
396 :param: role_name: Name of the namespaced Role to be bound
397 :param: sa_name: Name of the Service Account to be bound
398 :param: labels: Labels for Role Binding metadata
399 :param: namespace: Kubernetes namespace for Role Binding metadata
400
401 :return: None
402 """
403 role_bindings = self.clients[RBAC_CLIENT].list_namespaced_role_binding(
404 namespace, field_selector="metadata.name={}".format(name)
405 )
406 if len(role_bindings.items) > 0:
407 raise Exception(
408 "Role Binding with metadata.name={} already exists".format(name)
409 )
410
411 role_binding = V1RoleBinding(
412 metadata=V1ObjectMeta(name=name, labels=labels),
413 role_ref=V1RoleRef(kind="Role", name=role_name, api_group=""),
414 subjects=[
415 RbacV1Subject(kind="ServiceAccount", name=sa_name, namespace=namespace)
416 ],
417 )
418 self.clients[RBAC_CLIENT].create_namespaced_role_binding(
419 namespace, role_binding
420 )
421
422 def delete_cluster_role_binding(self, name: str):
423 """
424 Delete a cluster role binding
425
426 :param: name: Name of the cluster role binding
427 """
428 self.clients[RBAC_CLIENT].delete_cluster_role_binding(name)
429
430 @retry(
431 attempts=10,
432 delay=1,
433 fallback=Exception("Failed getting the secret from service account"),
434 callback=retry_callback,
435 )
436 async def get_secret_data(
437 self, name: str, namespace: str = "kube-system"
438 ) -> (str, str):
439 """
440 Get secret data
441
442 :param: name: Name of the secret data
443 :param: namespace: Name of the namespace where the secret is stored
444
445 :return: Tuple with the token and client certificate
446 """
447 v1_core = self.clients[CORE_CLIENT]
448
449 secret_name = None
450
451 service_accounts = v1_core.list_namespaced_service_account(
452 namespace, field_selector="metadata.name={}".format(name)
453 )
454 if len(service_accounts.items) == 0:
455 raise Exception(
456 "Service account not found with metadata.name={}".format(name)
457 )
458 service_account = service_accounts.items[0]
459 if service_account.secrets and len(service_account.secrets) > 0:
460 secret_name = service_account.secrets[0].name
461 if not secret_name:
462 raise Exception(
463 "Failed getting the secret from service account {}".format(name)
464 )
465 # TODO: refactor to use get_secret_content
466 secret = v1_core.list_namespaced_secret(
467 namespace, field_selector="metadata.name={}".format(secret_name)
468 ).items[0]
469
470 token = secret.data[SERVICE_ACCOUNT_TOKEN_KEY]
471 client_certificate_data = secret.data[SERVICE_ACCOUNT_ROOT_CA_KEY]
472
473 return (
474 base64.b64decode(token).decode("utf-8"),
475 base64.b64decode(client_certificate_data).decode("utf-8"),
476 )
477
478 @retry(
479 attempts=10,
480 delay=1,
481 fallback=Exception("Failed getting data from the secret"),
482 )
483 async def get_secret_content(
484 self,
485 name: str,
486 namespace: str,
487 ) -> dict:
488 """
489 Get secret data
490
491 :param: name: Name of the secret
492 :param: namespace: Name of the namespace where the secret is stored
493
494 :return: Dictionary with secret's data
495 """
496 v1_core = self.clients[CORE_CLIENT]
497
498 secret = v1_core.read_namespaced_secret(name, namespace)
499
500 return secret.data
501
502 @retry(
503 attempts=10,
504 delay=1,
505 fallback=Exception("Failed creating the secret"),
506 )
507 async def create_secret(
508 self, name: str, data: dict, namespace: str, secret_type: str
509 ):
510 """
511 Create secret with data
512
513 :param: name: Name of the secret
514 :param: data: Dict with data content. Values must be already base64 encoded
515 :param: namespace: Name of the namespace where the secret will be stored
516 :param: secret_type: Type of the secret, e.g., Opaque, kubernetes.io/service-account-token, kubernetes.io/tls
517
518 :return: None
519 """
520 self.logger.info("Enter create_secret function")
521 v1_core = self.clients[CORE_CLIENT]
522 self.logger.info(f"v1_core: {v1_core}")
523 metadata = V1ObjectMeta(name=name, namespace=namespace)
524 self.logger.info(f"metadata: {metadata}")
525 secret = V1Secret(metadata=metadata, data=data, type=secret_type)
526 self.logger.info(f"secret: {secret}")
527 v1_core.create_namespaced_secret(namespace, secret)
528 self.logger.info("Namespaced secret was created")
529
530 async def create_certificate(
531 self,
532 namespace: str,
533 name: str,
534 dns_prefix: str,
535 secret_name: str,
536 usages: list,
537 issuer_name: str,
538 ):
539 """
540 Creates cert-manager certificate object
541
542 :param: namespace: Name of the namespace where the certificate and secret is stored
543 :param: name: Name of the certificate object
544 :param: dns_prefix: Prefix for the dnsNames. They will be prefixed to the common k8s svc suffixes
545 :param: secret_name: Name of the secret created by cert-manager
546 :param: usages: List of X.509 key usages
547 :param: issuer_name: Name of the cert-manager's Issuer or ClusterIssuer object
548
549 """
550 certificate_body = {
551 "apiVersion": "cert-manager.io/v1",
552 "kind": "Certificate",
553 "metadata": {"name": name, "namespace": namespace},
554 "spec": {
555 "secretName": secret_name,
556 "privateKey": {
557 "rotationPolicy": "Always",
558 "algorithm": "ECDSA",
559 "size": 256,
560 },
561 "duration": "8760h", # 1 Year
562 "renewBefore": "2208h", # 9 months
563 "subject": {"organizations": ["osm"]},
564 "commonName": "osm",
565 "isCA": False,
566 "usages": usages,
567 "dnsNames": [
568 "{}.{}".format(dns_prefix, namespace),
569 "{}.{}.svc".format(dns_prefix, namespace),
570 "{}.{}.svc.cluster".format(dns_prefix, namespace),
571 "{}.{}.svc.cluster.local".format(dns_prefix, namespace),
572 ],
573 "issuerRef": {"name": issuer_name, "kind": "ClusterIssuer"},
574 },
575 }
576 client = self.clients[CUSTOM_OBJECT_CLIENT]
577 try:
578 client.create_namespaced_custom_object(
579 group="cert-manager.io",
580 plural="certificates",
581 version="v1",
582 body=certificate_body,
583 namespace=namespace,
584 )
585 except ApiException as e:
586 info = json.loads(e.body)
587 if info.get("reason").lower() == "alreadyexists":
588 self.logger.warning("Certificate already exists: {}".format(e))
589 else:
590 raise e
591
592 async def delete_certificate(self, namespace, object_name):
593 client = self.clients[CUSTOM_OBJECT_CLIENT]
594 try:
595 client.delete_namespaced_custom_object(
596 group="cert-manager.io",
597 plural="certificates",
598 version="v1",
599 name=object_name,
600 namespace=namespace,
601 )
602 except ApiException as e:
603 info = json.loads(e.body)
604 if info.get("reason").lower() == "notfound":
605 self.logger.warning("Certificate already deleted: {}".format(e))
606 else:
607 raise e
608
609 @retry(
610 attempts=10,
611 delay=1,
612 fallback=Exception("Failed creating the namespace"),
613 )
614 async def create_namespace(self, name: str, labels: dict = None):
615 """
616 Create a namespace
617
618 :param: name: Name of the namespace to be created
619 :param: labels: Dictionary with labels for the new namespace
620
621 """
622 v1_core = self.clients[CORE_CLIENT]
623 metadata = V1ObjectMeta(name=name, labels=labels)
624 namespace = V1Namespace(
625 metadata=metadata,
626 )
627
628 try:
629 v1_core.create_namespace(namespace)
630 self.logger.debug("Namespace created: {}".format(name))
631 except ApiException as e:
632 info = json.loads(e.body)
633 if info.get("reason").lower() == "alreadyexists":
634 self.logger.warning("Namespace already exists: {}".format(e))
635 else:
636 raise e
637
638 @retry(
639 attempts=10,
640 delay=1,
641 fallback=Exception("Failed deleting the namespace"),
642 )
643 async def delete_namespace(self, name: str):
644 """
645 Delete a namespace
646
647 :param: name: Name of the namespace to be deleted
648
649 """
650 try:
651 self.clients[CORE_CLIENT].delete_namespace(name)
652 except ApiException as e:
653 if e.reason == "Not Found":
654 self.logger.warning("Namespace already deleted: {}".format(e))
655
656 def get_secrets(
657 self,
658 namespace: str,
659 field_selector: str = None,
660 ) -> typing.List[typing.Dict]:
661 """
662 Get Secret list from a namespace
663
664 :param: namespace: Kubernetes namespace
665 :param: field_selector: Kubernetes field selector
666
667 :return: List of the secrets matching the selectors specified
668 """
669 try:
670 v1_core = self.clients[CORE_CLIENT]
671 secrets = v1_core.list_namespaced_secret(
672 namespace=namespace,
673 field_selector=field_selector,
674 ).items
675 return secrets
676 except ApiException as e:
677 self.logger.error("Error calling get secrets: {}".format(e))
678 raise e
679
680 def create_generic_object(
681 self,
682 api_group: str,
683 api_plural: str,
684 api_version: str,
685 namespace: str,
686 manifest_dict: dict,
687 ):
688 """
689 Creates generic object
690
691 :param: api_group: API Group
692 :param: api_plural: API Plural
693 :param: api_version: API Version
694 :param: namespace: Namespace
695 :param: manifest_dict: Dictionary with the content of the Kubernetes manifest
696
697 """
698 client = self.clients[CUSTOM_OBJECT_CLIENT]
699 try:
700 if namespace:
701 client.create_namespaced_custom_object(
702 group=api_group,
703 plural=api_plural,
704 version=api_version,
705 body=manifest_dict,
706 namespace=namespace,
707 )
708 else:
709 client.create_cluster_custom_object(
710 group=api_group,
711 plural=api_plural,
712 version=api_version,
713 body=manifest_dict,
714 )
715 except ApiException as e:
716 info = json.loads(e.body)
717 if info.get("reason").lower() == "alreadyexists":
718 self.logger.warning("Object already exists: {}".format(e))
719 else:
720 raise e
721
722 def delete_generic_object(
723 self,
724 api_group: str,
725 api_plural: str,
726 api_version: str,
727 namespace: str,
728 name: str,
729 ):
730 """
731 Deletes generic object
732
733 :param: api_group: API Group
734 :param: api_plural: API Plural
735 :param: api_version: API Version
736 :param: namespace: Namespace
737 :param: name: Name of the object
738
739 """
740 client = self.clients[CUSTOM_OBJECT_CLIENT]
741 try:
742 if namespace:
743 client.delete_namespaced_custom_object(
744 group=api_group,
745 plural=api_plural,
746 version=api_version,
747 name=name,
748 namespace=namespace,
749 )
750 else:
751 client.delete_cluster_custom_object(
752 group=api_group,
753 plural=api_plural,
754 version=api_version,
755 name=name,
756 )
757 except ApiException as e:
758 info = json.loads(e.body)
759 if info.get("reason").lower() == "notfound":
760 self.logger.warning("Object already deleted: {}".format(e))
761 else:
762 raise e
763
764 async def get_generic_object(
765 self,
766 api_group: str,
767 api_plural: str,
768 api_version: str,
769 namespace: str,
770 name: str,
771 ):
772 """
773 Gets generic object
774
775 :param: api_group: API Group
776 :param: api_plural: API Plural
777 :param: api_version: API Version
778 :param: namespace: Namespace
779 :param: name: Name of the object
780
781 """
782 client = self.clients[CUSTOM_OBJECT_CLIENT]
783 try:
784 if namespace:
785 object_dict = client.list_namespaced_custom_object(
786 group=api_group,
787 plural=api_plural,
788 version=api_version,
789 namespace=namespace,
790 field_selector=f"metadata.name={name}",
791 )
792 else:
793 object_dict = client.list_cluster_custom_object(
794 group=api_group,
795 plural=api_plural,
796 version=api_version,
797 field_selector=f"metadata.name={name}",
798 )
799 if len(object_dict.get("items")) == 0:
800 return None
801 return object_dict.get("items")[0]
802 except ApiException as e:
803 self.logger.debug(f"Exception: {e}")
804 info = json.loads(e.body)
garciadeblasbc96f382025-01-22 16:02:18 +0100805 raise f"Api Exception: {e}. Reason: {info.get('reason')}"
almagiacdd20ae2024-12-13 09:45:45 +0100806
807 async def list_generic_object(
808 self,
809 api_group: str,
810 api_plural: str,
811 api_version: str,
812 namespace: str,
813 ):
814 """
815 Lists all generic objects of the requested API group
816
817 :param: api_group: API Group
818 :param: api_plural: API Plural
819 :param: api_version: API Version
820 :param: namespace: Namespace
821
822 """
823 client = self.clients[CUSTOM_OBJECT_CLIENT]
824 try:
825 if namespace:
826 object_dict = client.list_namespaced_custom_object(
827 group=api_group,
828 plural=api_plural,
829 version=api_version,
830 namespace=namespace,
831 )
832 else:
833 object_dict = client.list_cluster_custom_object(
834 group=api_group,
835 plural=api_plural,
836 version=api_version,
837 )
838 self.logger.debug(f"Object-list: {object_dict.get('items')}")
839 return object_dict.get("items")
840 except ApiException as e:
841 self.logger.debug(f"Exception: {e}")
842 info = json.loads(e.body)
843 if info.get("reason").lower() == "notfound":
844 self.logger.warning(
845 "Cannot find specified custom objects: {}".format(e)
846 )
847 return []
848 else:
849 raise e
850
851 @retry(
852 attempts=10,
853 delay=1,
854 fallback=Exception("Failed creating the secret"),
855 )
856 async def create_secret_string(
857 self, name: str, string_data: str, namespace: str, secret_type: str
858 ):
859 """
860 Create secret with data
861
862 :param: name: Name of the secret
863 :param: string_data: String with data content
864 :param: namespace: Name of the namespace where the secret will be stored
865 :param: secret_type: Type of the secret, e.g., Opaque, kubernetes.io/service-account-token, kubernetes.io/tls
866
867 :return: None
868 """
869 v1_core = self.clients[CORE_CLIENT]
870 metadata = V1ObjectMeta(name=name, namespace=namespace)
871 secret = V1Secret(metadata=metadata, string_data=string_data, type=secret_type)
872 v1_core.create_namespaced_secret(namespace, secret)
873
874 @retry(
875 attempts=10,
876 delay=1,
877 fallback=Exception("Failed creating the pvc"),
878 )
879 async def create_pvc(self, name: str, namespace: str):
880 """
881 Create a namespace
882
883 :param: name: Name of the pvc to be created
884 :param: namespace: Name of the namespace where the pvc will be stored
885
886 """
887 try:
888 pvc = V1PersistentVolumeClaim(
889 api_version="v1",
890 kind="PersistentVolumeClaim",
891 metadata=V1ObjectMeta(name=name),
892 spec=V1PersistentVolumeClaimSpec(
893 access_modes=["ReadWriteOnce"],
894 resources=V1ResourceRequirements(requests={"storage": "100Mi"}),
895 ),
896 )
897 self.clients[CORE_CLIENT].create_namespaced_persistent_volume_claim(
898 namespace=namespace, body=pvc
899 )
900 except ApiException as e:
901 info = json.loads(e.body)
902 if info.get("reason").lower() == "alreadyexists":
903 self.logger.warning("PVC already exists: {}".format(e))
904 else:
905 raise e
906
907 @retry(
908 attempts=10,
909 delay=1,
910 fallback=Exception("Failed deleting the pvc"),
911 )
912 async def delete_pvc(self, name: str, namespace: str):
913 """
914 Create a namespace
915
916 :param: name: Name of the pvc to be deleted
917 :param: namespace: Namespace
918
919 """
920 self.clients[CORE_CLIENT].delete_namespaced_persistent_volume_claim(
921 name, namespace
922 )
923
924 def copy_file_to_pod(
925 self, namespace, pod_name, container_name, src_file, dest_path
926 ):
927 # Create an in-memory tar file containing the source file
928 tar_buffer = io.BytesIO()
929 with tarfile.open(fileobj=tar_buffer, mode="w") as tar:
930 tar.add(src_file, arcname=dest_path.split("/")[-1])
931
932 tar_buffer.seek(0)
933
934 # Define the command to extract the tar file in the pod
935 exec_command = ["tar", "xvf", "-", "-C", dest_path.rsplit("/", 1)[0]]
936
937 # Execute the command
938 resp = stream(
939 self.clients[CORE_CLIENT].connect_get_namespaced_pod_exec,
940 pod_name,
941 namespace,
942 command=exec_command,
943 container=container_name,
944 stdin=True,
945 stderr=True,
946 stdout=True,
947 tty=False,
948 _preload_content=False,
949 )
950
951 # Write the tar data to the pod
952 resp.write_stdin(tar_buffer.read())
953 resp.close()
954
955 @retry(
956 attempts=10,
957 delay=1,
958 fallback=Exception("Failed creating the pvc"),
959 )
960 async def create_pvc_with_content(
961 self, name: str, namespace: str, src_file: str, dest_filename: str
962 ):
963 """
964 Create a PVC with content
965
966 :param: name: Name of the pvc to be created
967 :param: namespace: Name of the namespace where the pvc will be stored
968 :param: src_file: File to be copied
969 :param: filename: Name of the file in the destination folder
970 """
971 pod_name = f"copy-pod-{name}"
972 self.logger.debug(f"Creating pvc {name}")
973 await self.create_pvc(name=name, namespace=namespace)
974 self.logger.debug("Sleeping")
975 sleep(40)
976 self.logger.debug(f"Creating pod {pod_name}")
977 await self.create_copy_pod(name=pod_name, namespace=namespace, pvc_name=name)
978 self.logger.debug("Sleeping")
979 sleep(40)
980 self.logger.debug(f"Copying files to pod {pod_name}")
981 self.copy_file_to_pod(
982 namespace=namespace,
983 pod_name=pod_name,
984 container_name="copy-container",
985 src_file=src_file,
986 dest_path=f"/mnt/data/{dest_filename}",
987 )
988
989 @retry(
990 attempts=10,
991 delay=1,
992 fallback=Exception("Failed creating the pvc"),
993 )
994 async def create_copy_pod(self, name: str, namespace: str, pvc_name: str):
995 """
996 Create a pod to copy content into a PVC
997
998 :param: name: Name of the pod to be created
999 :param: namespace: Name of the namespace where the pod will be stored
1000 :param: pvc_name: Name of the PVC that the pod will mount as a volume
1001
1002 """
1003 pod = V1Pod(
1004 api_version="v1",
1005 kind="Pod",
1006 metadata=client.V1ObjectMeta(name=name),
1007 spec=V1PodSpec(
1008 containers=[
1009 V1Container(
1010 name="copy-container",
1011 image="busybox", # Imagen ligera para copiar archivos
1012 command=["sleep", "3600"], # Mantén el contenedor en ejecución
1013 volume_mounts=[
1014 V1VolumeMount(mount_path="/mnt/data", name="my-storage")
1015 ],
1016 )
1017 ],
1018 volumes=[
1019 V1Volume(
1020 name="my-storage",
1021 persistent_volume_claim=V1PersistentVolumeClaimVolumeSource(
1022 claim_name=pvc_name
1023 ),
1024 )
1025 ],
1026 ),
1027 )
1028 # Create the pod
1029 self.clients[CORE_CLIENT].create_namespaced_pod(namespace=namespace, body=pod)
1030
1031 @retry(
1032 attempts=10,
1033 delay=1,
1034 fallback=Exception("Failed deleting the pod"),
1035 )
1036 async def delete_pod(self, name: str, namespace: str):
1037 """
1038 Create a namespace
1039
1040 :param: name: Name of the pod to be deleted
1041 :param: namespace: Namespace
1042
1043 """
1044 self.clients[CORE_CLIENT].delete_namespaced_pod(name, namespace)