blob: 4402f1138e3fd567d784dea3226aa0bc66764bba [file] [log] [blame]
almagiacdd20ae2024-12-13 09:45:45 +01001#######################################################################################
2# Copyright 2020 Canonical Ltd.
3# Copyright ETSI Contributors and Others.
4#
5# Licensed under the Apache License, Version 2.0 (the "License");
6# you may not use this file except in compliance with the License.
7# You may obtain a copy of the License at
8#
9# http://www.apache.org/licenses/LICENSE-2.0
10#
11# Unless required by applicable law or agreed to in writing, software
12# distributed under the License is distributed on an "AS IS" BASIS,
13# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14# See the License for the specific language governing permissions and
15# limitations under the License.
16#######################################################################################
17
18import base64
19import logging
almagiacdd20ae2024-12-13 09:45:45 +010020import typing
21import uuid
22import json
23import tarfile
24import io
25from time import sleep
26
27from distutils.version import LooseVersion
28
29from kubernetes import client, config
30from kubernetes.client.api import VersionApi
31from kubernetes.client.models import (
32 V1ClusterRole,
33 V1Role,
34 V1ObjectMeta,
35 V1PolicyRule,
36 V1ServiceAccount,
37 V1ClusterRoleBinding,
38 V1RoleBinding,
39 V1RoleRef,
40 RbacV1Subject,
41 V1Secret,
42 V1SecretReference,
43 V1Namespace,
44 V1PersistentVolumeClaim,
45 V1PersistentVolumeClaimSpec,
46 V1PersistentVolumeClaimVolumeSource,
47 V1ResourceRequirements,
48 V1Pod,
49 V1PodSpec,
50 V1Volume,
51 V1VolumeMount,
52 V1Container,
53)
54from kubernetes.client.rest import ApiException
55from kubernetes.stream import stream
56from osm_lcm.n2vc.libjuju import retry_callback
57from retrying_async import retry
58
59SERVICE_ACCOUNT_TOKEN_KEY = "token"
60SERVICE_ACCOUNT_ROOT_CA_KEY = "ca.crt"
61# clients
62CORE_CLIENT = "core_v1"
63RBAC_CLIENT = "rbac_v1"
64STORAGE_CLIENT = "storage_v1"
65CUSTOM_OBJECT_CLIENT = "custom_object"
66
67
68class Kubectl:
69 def __init__(self, config_file=None):
70 config.load_kube_config(config_file=config_file)
71 self._clients = {
72 CORE_CLIENT: client.CoreV1Api(),
73 RBAC_CLIENT: client.RbacAuthorizationV1Api(),
74 STORAGE_CLIENT: client.StorageV1Api(),
75 CUSTOM_OBJECT_CLIENT: client.CustomObjectsApi(),
76 }
77 self._configuration = config.kube_config.Configuration.get_default_copy()
78 self.logger = logging.getLogger("lcm.kubectl")
79
80 @property
81 def configuration(self):
82 return self._configuration
83
84 @property
85 def clients(self):
86 return self._clients
87
88 def get_services(
89 self,
90 field_selector: str = None,
91 label_selector: str = None,
92 ) -> typing.List[typing.Dict]:
93 """
94 Get Service list from a namespace
95
96 :param: field_selector: Kubernetes field selector for the namespace
97 :param: label_selector: Kubernetes label selector for the namespace
98
99 :return: List of the services matching the selectors specified
100 """
101 kwargs = {}
102 if field_selector:
103 kwargs["field_selector"] = field_selector
104 if label_selector:
105 kwargs["label_selector"] = label_selector
106 try:
107 result = self.clients[CORE_CLIENT].list_service_for_all_namespaces(**kwargs)
108 return [
109 {
110 "name": i.metadata.name,
111 "cluster_ip": i.spec.cluster_ip,
112 "type": i.spec.type,
113 "ports": (
114 [
115 {
116 "name": p.name,
117 "node_port": p.node_port,
118 "port": p.port,
119 "protocol": p.protocol,
120 "target_port": p.target_port,
121 }
122 for p in i.spec.ports
123 ]
124 if i.spec.ports
125 else []
126 ),
127 "external_ip": [i.ip for i in i.status.load_balancer.ingress]
128 if i.status.load_balancer.ingress
129 else None,
130 }
131 for i in result.items
132 ]
133 except ApiException as e:
134 self.logger.error("Error calling get services: {}".format(e))
135 raise e
136
137 def get_default_storage_class(self) -> str:
138 """
139 Default storage class
140
141 :return: Returns the default storage class name, if exists.
142 If not, it returns the first storage class.
143 If there are not storage classes, returns None
144 """
145 storage_classes = self.clients[STORAGE_CLIENT].list_storage_class()
146 selected_sc = None
147 default_sc_annotations = {
148 "storageclass.kubernetes.io/is-default-class": "true",
149 # Older clusters still use the beta annotation.
150 "storageclass.beta.kubernetes.io/is-default-class": "true",
151 }
152 for sc in storage_classes.items:
153 if not selected_sc:
154 # Select the first storage class in case there is no a default-class
155 selected_sc = sc.metadata.name
156 annotations = sc.metadata.annotations or {}
157 if any(
158 k in annotations and annotations[k] == v
159 for k, v in default_sc_annotations.items()
160 ):
161 # Default storage
162 selected_sc = sc.metadata.name
163 break
164 return selected_sc
165
166 def create_cluster_role(
167 self,
168 name: str,
garciadeblas6bbe55e2025-02-08 11:30:39 +0100169 labels: typing.Dict[str, str],
almagiacdd20ae2024-12-13 09:45:45 +0100170 namespace: str = "kube-system",
171 ):
172 """
173 Create a cluster role
174
175 :param: name: Name of the cluster role
176 :param: labels: Labels for cluster role metadata
177 :param: namespace: Kubernetes namespace for cluster role metadata
178 Default: kube-system
179 """
180 cluster_roles = self.clients[RBAC_CLIENT].list_cluster_role(
181 field_selector="metadata.name={}".format(name)
182 )
183
184 if len(cluster_roles.items) > 0:
185 raise Exception("Role with metadata.name={} already exists".format(name))
186
187 metadata = V1ObjectMeta(name=name, labels=labels, namespace=namespace)
188 # Cluster role
189 cluster_role = V1ClusterRole(
190 metadata=metadata,
191 rules=[
192 V1PolicyRule(api_groups=["*"], resources=["*"], verbs=["*"]),
193 V1PolicyRule(non_resource_ur_ls=["*"], verbs=["*"]),
194 ],
195 )
196
197 self.clients[RBAC_CLIENT].create_cluster_role(cluster_role)
198
199 async def create_role(
200 self,
201 name: str,
garciadeblas6bbe55e2025-02-08 11:30:39 +0100202 labels: typing.Dict[str, str],
almagiacdd20ae2024-12-13 09:45:45 +0100203 api_groups: list,
204 resources: list,
205 verbs: list,
206 namespace: str,
207 ):
208 """
209 Create a role with one PolicyRule
210
211 :param: name: Name of the namespaced Role
212 :param: labels: Labels for namespaced Role metadata
213 :param: api_groups: List with api-groups allowed in the policy rule
214 :param: resources: List with resources allowed in the policy rule
215 :param: verbs: List with verbs allowed in the policy rule
216 :param: namespace: Kubernetes namespace for Role metadata
217
218 :return: None
219 """
220
221 roles = self.clients[RBAC_CLIENT].list_namespaced_role(
222 namespace, field_selector="metadata.name={}".format(name)
223 )
224
225 if len(roles.items) > 0:
226 raise Exception("Role with metadata.name={} already exists".format(name))
227
228 metadata = V1ObjectMeta(name=name, labels=labels, namespace=namespace)
229
230 role = V1Role(
231 metadata=metadata,
232 rules=[
233 V1PolicyRule(api_groups=api_groups, resources=resources, verbs=verbs),
234 ],
235 )
236
237 self.clients[RBAC_CLIENT].create_namespaced_role(namespace, role)
238
239 def delete_cluster_role(self, name: str):
240 """
241 Delete a cluster role
242
243 :param: name: Name of the cluster role
244 """
245 self.clients[RBAC_CLIENT].delete_cluster_role(name)
246
247 def _get_kubectl_version(self):
248 version = VersionApi().get_code()
249 return "{}.{}".format(version.major, version.minor)
250
251 def _need_to_create_new_secret(self):
252 min_k8s_version = "1.24"
253 current_k8s_version = self._get_kubectl_version()
254 return LooseVersion(min_k8s_version) <= LooseVersion(current_k8s_version)
255
256 def _get_secret_name(self, service_account_name: str):
257 random_alphanum = str(uuid.uuid4())[:5]
258 return "{}-token-{}".format(service_account_name, random_alphanum)
259
260 def _create_service_account_secret(
261 self,
262 service_account_name: str,
263 namespace: str,
264 secret_name: str,
265 ):
266 """
267 Create a secret for the service account. K8s version >= 1.24
268
269 :param: service_account_name: Name of the service account
270 :param: namespace: Kubernetes namespace for service account metadata
271 :param: secret_name: Name of the secret
272 """
273 v1_core = self.clients[CORE_CLIENT]
274 secrets = v1_core.list_namespaced_secret(
275 namespace, field_selector="metadata.name={}".format(secret_name)
276 ).items
277
278 if len(secrets) > 0:
279 raise Exception(
280 "Secret with metadata.name={} already exists".format(secret_name)
281 )
282
283 annotations = {"kubernetes.io/service-account.name": service_account_name}
284 metadata = V1ObjectMeta(
285 name=secret_name, namespace=namespace, annotations=annotations
286 )
287 type = "kubernetes.io/service-account-token"
288 secret = V1Secret(metadata=metadata, type=type)
289 v1_core.create_namespaced_secret(namespace, secret)
290
291 def _get_secret_reference_list(self, namespace: str, secret_name: str):
292 """
293 Return a secret reference list with one secret.
294 K8s version >= 1.24
295
296 :param: namespace: Kubernetes namespace for service account metadata
297 :param: secret_name: Name of the secret
298 :rtype: list[V1SecretReference]
299 """
300 return [V1SecretReference(name=secret_name, namespace=namespace)]
301
302 def create_service_account(
303 self,
304 name: str,
garciadeblas6bbe55e2025-02-08 11:30:39 +0100305 labels: typing.Dict[str, str],
almagiacdd20ae2024-12-13 09:45:45 +0100306 namespace: str = "kube-system",
307 ):
308 """
309 Create a service account
310
311 :param: name: Name of the service account
312 :param: labels: Labels for service account metadata
313 :param: namespace: Kubernetes namespace for service account metadata
314 Default: kube-system
315 """
316 v1_core = self.clients[CORE_CLIENT]
317 service_accounts = v1_core.list_namespaced_service_account(
318 namespace, field_selector="metadata.name={}".format(name)
319 )
320 if len(service_accounts.items) > 0:
321 raise Exception(
322 "Service account with metadata.name={} already exists".format(name)
323 )
324
325 metadata = V1ObjectMeta(name=name, labels=labels, namespace=namespace)
326
327 if self._need_to_create_new_secret():
328 secret_name = self._get_secret_name(name)
329 secrets = self._get_secret_reference_list(namespace, secret_name)
330 service_account = V1ServiceAccount(metadata=metadata, secrets=secrets)
331 v1_core.create_namespaced_service_account(namespace, service_account)
332 self._create_service_account_secret(name, namespace, secret_name)
333 else:
334 service_account = V1ServiceAccount(metadata=metadata)
335 v1_core.create_namespaced_service_account(namespace, service_account)
336
337 def delete_secret(self, name: str, namespace: str = "kube-system"):
338 """
339 Delete a secret
340
341 :param: name: Name of the secret
342 :param: namespace: Kubernetes namespace
343 Default: kube-system
344 """
345 self.clients[CORE_CLIENT].delete_namespaced_secret(name, namespace)
346
347 def delete_service_account(self, name: str, namespace: str = "kube-system"):
348 """
349 Delete a service account
350
351 :param: name: Name of the service account
352 :param: namespace: Kubernetes namespace for service account metadata
353 Default: kube-system
354 """
355 self.clients[CORE_CLIENT].delete_namespaced_service_account(name, namespace)
356
357 def create_cluster_role_binding(
garciadeblas6bbe55e2025-02-08 11:30:39 +0100358 self, name: str, labels: typing.Dict[str, str], namespace: str = "kube-system"
almagiacdd20ae2024-12-13 09:45:45 +0100359 ):
360 """
361 Create a cluster role binding
362
363 :param: name: Name of the cluster role
364 :param: labels: Labels for cluster role binding metadata
365 :param: namespace: Kubernetes namespace for cluster role binding metadata
366 Default: kube-system
367 """
368 role_bindings = self.clients[RBAC_CLIENT].list_cluster_role_binding(
369 field_selector="metadata.name={}".format(name)
370 )
371 if len(role_bindings.items) > 0:
372 raise Exception("Generated rbac id already exists")
373
374 role_binding = V1ClusterRoleBinding(
375 metadata=V1ObjectMeta(name=name, labels=labels),
376 role_ref=V1RoleRef(kind="ClusterRole", name=name, api_group=""),
377 subjects=[
378 RbacV1Subject(kind="ServiceAccount", name=name, namespace=namespace)
379 ],
380 )
381 self.clients[RBAC_CLIENT].create_cluster_role_binding(role_binding)
382
383 async def create_role_binding(
384 self,
385 name: str,
386 role_name: str,
387 sa_name: str,
garciadeblas6bbe55e2025-02-08 11:30:39 +0100388 labels: typing.Dict[str, str],
almagiacdd20ae2024-12-13 09:45:45 +0100389 namespace: str,
390 ):
391 """
392 Create a cluster role binding
393
394 :param: name: Name of the namespaced Role Binding
395 :param: role_name: Name of the namespaced Role to be bound
396 :param: sa_name: Name of the Service Account to be bound
397 :param: labels: Labels for Role Binding metadata
398 :param: namespace: Kubernetes namespace for Role Binding metadata
399
400 :return: None
401 """
402 role_bindings = self.clients[RBAC_CLIENT].list_namespaced_role_binding(
403 namespace, field_selector="metadata.name={}".format(name)
404 )
405 if len(role_bindings.items) > 0:
406 raise Exception(
407 "Role Binding with metadata.name={} already exists".format(name)
408 )
409
410 role_binding = V1RoleBinding(
411 metadata=V1ObjectMeta(name=name, labels=labels),
412 role_ref=V1RoleRef(kind="Role", name=role_name, api_group=""),
413 subjects=[
414 RbacV1Subject(kind="ServiceAccount", name=sa_name, namespace=namespace)
415 ],
416 )
417 self.clients[RBAC_CLIENT].create_namespaced_role_binding(
418 namespace, role_binding
419 )
420
421 def delete_cluster_role_binding(self, name: str):
422 """
423 Delete a cluster role binding
424
425 :param: name: Name of the cluster role binding
426 """
427 self.clients[RBAC_CLIENT].delete_cluster_role_binding(name)
428
429 @retry(
430 attempts=10,
431 delay=1,
432 fallback=Exception("Failed getting the secret from service account"),
433 callback=retry_callback,
434 )
435 async def get_secret_data(
436 self, name: str, namespace: str = "kube-system"
437 ) -> (str, str):
438 """
439 Get secret data
440
441 :param: name: Name of the secret data
442 :param: namespace: Name of the namespace where the secret is stored
443
444 :return: Tuple with the token and client certificate
445 """
446 v1_core = self.clients[CORE_CLIENT]
447
448 secret_name = None
449
450 service_accounts = v1_core.list_namespaced_service_account(
451 namespace, field_selector="metadata.name={}".format(name)
452 )
453 if len(service_accounts.items) == 0:
454 raise Exception(
455 "Service account not found with metadata.name={}".format(name)
456 )
457 service_account = service_accounts.items[0]
458 if service_account.secrets and len(service_account.secrets) > 0:
459 secret_name = service_account.secrets[0].name
460 if not secret_name:
461 raise Exception(
462 "Failed getting the secret from service account {}".format(name)
463 )
464 # TODO: refactor to use get_secret_content
465 secret = v1_core.list_namespaced_secret(
466 namespace, field_selector="metadata.name={}".format(secret_name)
467 ).items[0]
468
469 token = secret.data[SERVICE_ACCOUNT_TOKEN_KEY]
470 client_certificate_data = secret.data[SERVICE_ACCOUNT_ROOT_CA_KEY]
471
472 return (
473 base64.b64decode(token).decode("utf-8"),
474 base64.b64decode(client_certificate_data).decode("utf-8"),
475 )
476
477 @retry(
478 attempts=10,
479 delay=1,
480 fallback=Exception("Failed getting data from the secret"),
481 )
482 async def get_secret_content(
483 self,
484 name: str,
485 namespace: str,
garciadeblas6bbe55e2025-02-08 11:30:39 +0100486 ) -> typing.Dict:
almagiacdd20ae2024-12-13 09:45:45 +0100487 """
488 Get secret data
489
490 :param: name: Name of the secret
491 :param: namespace: Name of the namespace where the secret is stored
492
493 :return: Dictionary with secret's data
494 """
495 v1_core = self.clients[CORE_CLIENT]
496
497 secret = v1_core.read_namespaced_secret(name, namespace)
498
499 return secret.data
500
501 @retry(
502 attempts=10,
503 delay=1,
504 fallback=Exception("Failed creating the secret"),
505 )
506 async def create_secret(
507 self, name: str, data: dict, namespace: str, secret_type: str
508 ):
509 """
510 Create secret with data
511
512 :param: name: Name of the secret
513 :param: data: Dict with data content. Values must be already base64 encoded
514 :param: namespace: Name of the namespace where the secret will be stored
515 :param: secret_type: Type of the secret, e.g., Opaque, kubernetes.io/service-account-token, kubernetes.io/tls
516
517 :return: None
518 """
519 self.logger.info("Enter create_secret function")
520 v1_core = self.clients[CORE_CLIENT]
521 self.logger.info(f"v1_core: {v1_core}")
522 metadata = V1ObjectMeta(name=name, namespace=namespace)
523 self.logger.info(f"metadata: {metadata}")
524 secret = V1Secret(metadata=metadata, data=data, type=secret_type)
525 self.logger.info(f"secret: {secret}")
526 v1_core.create_namespaced_secret(namespace, secret)
527 self.logger.info("Namespaced secret was created")
528
529 async def create_certificate(
530 self,
531 namespace: str,
532 name: str,
533 dns_prefix: str,
534 secret_name: str,
535 usages: list,
536 issuer_name: str,
537 ):
538 """
539 Creates cert-manager certificate object
540
541 :param: namespace: Name of the namespace where the certificate and secret is stored
542 :param: name: Name of the certificate object
543 :param: dns_prefix: Prefix for the dnsNames. They will be prefixed to the common k8s svc suffixes
544 :param: secret_name: Name of the secret created by cert-manager
545 :param: usages: List of X.509 key usages
546 :param: issuer_name: Name of the cert-manager's Issuer or ClusterIssuer object
547
548 """
549 certificate_body = {
550 "apiVersion": "cert-manager.io/v1",
551 "kind": "Certificate",
552 "metadata": {"name": name, "namespace": namespace},
553 "spec": {
554 "secretName": secret_name,
555 "privateKey": {
556 "rotationPolicy": "Always",
557 "algorithm": "ECDSA",
558 "size": 256,
559 },
560 "duration": "8760h", # 1 Year
561 "renewBefore": "2208h", # 9 months
562 "subject": {"organizations": ["osm"]},
563 "commonName": "osm",
564 "isCA": False,
565 "usages": usages,
566 "dnsNames": [
567 "{}.{}".format(dns_prefix, namespace),
568 "{}.{}.svc".format(dns_prefix, namespace),
569 "{}.{}.svc.cluster".format(dns_prefix, namespace),
570 "{}.{}.svc.cluster.local".format(dns_prefix, namespace),
571 ],
572 "issuerRef": {"name": issuer_name, "kind": "ClusterIssuer"},
573 },
574 }
575 client = self.clients[CUSTOM_OBJECT_CLIENT]
576 try:
577 client.create_namespaced_custom_object(
578 group="cert-manager.io",
579 plural="certificates",
580 version="v1",
581 body=certificate_body,
582 namespace=namespace,
583 )
584 except ApiException as e:
585 info = json.loads(e.body)
586 if info.get("reason").lower() == "alreadyexists":
587 self.logger.warning("Certificate already exists: {}".format(e))
588 else:
589 raise e
590
591 async def delete_certificate(self, namespace, object_name):
592 client = self.clients[CUSTOM_OBJECT_CLIENT]
593 try:
594 client.delete_namespaced_custom_object(
595 group="cert-manager.io",
596 plural="certificates",
597 version="v1",
598 name=object_name,
599 namespace=namespace,
600 )
601 except ApiException as e:
602 info = json.loads(e.body)
603 if info.get("reason").lower() == "notfound":
604 self.logger.warning("Certificate already deleted: {}".format(e))
605 else:
606 raise e
607
608 @retry(
609 attempts=10,
610 delay=1,
611 fallback=Exception("Failed creating the namespace"),
612 )
613 async def create_namespace(self, name: str, labels: dict = None):
614 """
615 Create a namespace
616
617 :param: name: Name of the namespace to be created
618 :param: labels: Dictionary with labels for the new namespace
619
620 """
621 v1_core = self.clients[CORE_CLIENT]
622 metadata = V1ObjectMeta(name=name, labels=labels)
623 namespace = V1Namespace(
624 metadata=metadata,
625 )
626
627 try:
628 v1_core.create_namespace(namespace)
629 self.logger.debug("Namespace created: {}".format(name))
630 except ApiException as e:
631 info = json.loads(e.body)
632 if info.get("reason").lower() == "alreadyexists":
633 self.logger.warning("Namespace already exists: {}".format(e))
634 else:
635 raise e
636
637 @retry(
638 attempts=10,
639 delay=1,
640 fallback=Exception("Failed deleting the namespace"),
641 )
642 async def delete_namespace(self, name: str):
643 """
644 Delete a namespace
645
646 :param: name: Name of the namespace to be deleted
647
648 """
649 try:
650 self.clients[CORE_CLIENT].delete_namespace(name)
651 except ApiException as e:
652 if e.reason == "Not Found":
653 self.logger.warning("Namespace already deleted: {}".format(e))
654
655 def get_secrets(
656 self,
657 namespace: str,
658 field_selector: str = None,
659 ) -> typing.List[typing.Dict]:
660 """
661 Get Secret list from a namespace
662
663 :param: namespace: Kubernetes namespace
664 :param: field_selector: Kubernetes field selector
665
666 :return: List of the secrets matching the selectors specified
667 """
668 try:
669 v1_core = self.clients[CORE_CLIENT]
670 secrets = v1_core.list_namespaced_secret(
671 namespace=namespace,
672 field_selector=field_selector,
673 ).items
674 return secrets
675 except ApiException as e:
676 self.logger.error("Error calling get secrets: {}".format(e))
677 raise e
678
679 def create_generic_object(
680 self,
681 api_group: str,
682 api_plural: str,
683 api_version: str,
684 namespace: str,
685 manifest_dict: dict,
686 ):
687 """
688 Creates generic object
689
690 :param: api_group: API Group
691 :param: api_plural: API Plural
692 :param: api_version: API Version
693 :param: namespace: Namespace
694 :param: manifest_dict: Dictionary with the content of the Kubernetes manifest
695
696 """
697 client = self.clients[CUSTOM_OBJECT_CLIENT]
698 try:
699 if namespace:
700 client.create_namespaced_custom_object(
701 group=api_group,
702 plural=api_plural,
703 version=api_version,
704 body=manifest_dict,
705 namespace=namespace,
706 )
707 else:
708 client.create_cluster_custom_object(
709 group=api_group,
710 plural=api_plural,
711 version=api_version,
712 body=manifest_dict,
713 )
714 except ApiException as e:
715 info = json.loads(e.body)
716 if info.get("reason").lower() == "alreadyexists":
717 self.logger.warning("Object already exists: {}".format(e))
718 else:
719 raise e
720
721 def delete_generic_object(
722 self,
723 api_group: str,
724 api_plural: str,
725 api_version: str,
726 namespace: str,
727 name: str,
728 ):
729 """
730 Deletes generic object
731
732 :param: api_group: API Group
733 :param: api_plural: API Plural
734 :param: api_version: API Version
735 :param: namespace: Namespace
736 :param: name: Name of the object
737
738 """
739 client = self.clients[CUSTOM_OBJECT_CLIENT]
740 try:
741 if namespace:
742 client.delete_namespaced_custom_object(
743 group=api_group,
744 plural=api_plural,
745 version=api_version,
746 name=name,
747 namespace=namespace,
748 )
749 else:
750 client.delete_cluster_custom_object(
751 group=api_group,
752 plural=api_plural,
753 version=api_version,
754 name=name,
755 )
756 except ApiException as e:
757 info = json.loads(e.body)
758 if info.get("reason").lower() == "notfound":
759 self.logger.warning("Object already deleted: {}".format(e))
760 else:
761 raise e
762
763 async def get_generic_object(
764 self,
765 api_group: str,
766 api_plural: str,
767 api_version: str,
768 namespace: str,
769 name: str,
770 ):
771 """
772 Gets generic object
773
774 :param: api_group: API Group
775 :param: api_plural: API Plural
776 :param: api_version: API Version
777 :param: namespace: Namespace
778 :param: name: Name of the object
779
780 """
781 client = self.clients[CUSTOM_OBJECT_CLIENT]
782 try:
783 if namespace:
784 object_dict = client.list_namespaced_custom_object(
785 group=api_group,
786 plural=api_plural,
787 version=api_version,
788 namespace=namespace,
789 field_selector=f"metadata.name={name}",
790 )
791 else:
792 object_dict = client.list_cluster_custom_object(
793 group=api_group,
794 plural=api_plural,
795 version=api_version,
796 field_selector=f"metadata.name={name}",
797 )
798 if len(object_dict.get("items")) == 0:
799 return None
800 return object_dict.get("items")[0]
801 except ApiException as e:
802 self.logger.debug(f"Exception: {e}")
803 info = json.loads(e.body)
garciadeblasad6d1ba2025-01-22 16:02:18 +0100804 raise f"Api Exception: {e}. Reason: {info.get('reason')}"
almagiacdd20ae2024-12-13 09:45:45 +0100805
806 async def list_generic_object(
807 self,
808 api_group: str,
809 api_plural: str,
810 api_version: str,
811 namespace: str,
812 ):
813 """
814 Lists all generic objects of the requested API group
815
816 :param: api_group: API Group
817 :param: api_plural: API Plural
818 :param: api_version: API Version
819 :param: namespace: Namespace
820
821 """
822 client = self.clients[CUSTOM_OBJECT_CLIENT]
823 try:
824 if namespace:
825 object_dict = client.list_namespaced_custom_object(
826 group=api_group,
827 plural=api_plural,
828 version=api_version,
829 namespace=namespace,
830 )
831 else:
832 object_dict = client.list_cluster_custom_object(
833 group=api_group,
834 plural=api_plural,
835 version=api_version,
836 )
837 self.logger.debug(f"Object-list: {object_dict.get('items')}")
838 return object_dict.get("items")
839 except ApiException as e:
840 self.logger.debug(f"Exception: {e}")
841 info = json.loads(e.body)
842 if info.get("reason").lower() == "notfound":
843 self.logger.warning(
844 "Cannot find specified custom objects: {}".format(e)
845 )
846 return []
847 else:
848 raise e
849
850 @retry(
851 attempts=10,
852 delay=1,
853 fallback=Exception("Failed creating the secret"),
854 )
855 async def create_secret_string(
856 self, name: str, string_data: str, namespace: str, secret_type: str
857 ):
858 """
859 Create secret with data
860
861 :param: name: Name of the secret
862 :param: string_data: String with data content
863 :param: namespace: Name of the namespace where the secret will be stored
864 :param: secret_type: Type of the secret, e.g., Opaque, kubernetes.io/service-account-token, kubernetes.io/tls
865
866 :return: None
867 """
868 v1_core = self.clients[CORE_CLIENT]
869 metadata = V1ObjectMeta(name=name, namespace=namespace)
870 secret = V1Secret(metadata=metadata, string_data=string_data, type=secret_type)
871 v1_core.create_namespaced_secret(namespace, secret)
872
873 @retry(
874 attempts=10,
875 delay=1,
876 fallback=Exception("Failed creating the pvc"),
877 )
878 async def create_pvc(self, name: str, namespace: str):
879 """
880 Create a namespace
881
882 :param: name: Name of the pvc to be created
883 :param: namespace: Name of the namespace where the pvc will be stored
884
885 """
886 try:
887 pvc = V1PersistentVolumeClaim(
888 api_version="v1",
889 kind="PersistentVolumeClaim",
890 metadata=V1ObjectMeta(name=name),
891 spec=V1PersistentVolumeClaimSpec(
892 access_modes=["ReadWriteOnce"],
893 resources=V1ResourceRequirements(requests={"storage": "100Mi"}),
894 ),
895 )
896 self.clients[CORE_CLIENT].create_namespaced_persistent_volume_claim(
897 namespace=namespace, body=pvc
898 )
899 except ApiException as e:
900 info = json.loads(e.body)
901 if info.get("reason").lower() == "alreadyexists":
902 self.logger.warning("PVC already exists: {}".format(e))
903 else:
904 raise e
905
906 @retry(
907 attempts=10,
908 delay=1,
909 fallback=Exception("Failed deleting the pvc"),
910 )
911 async def delete_pvc(self, name: str, namespace: str):
912 """
913 Create a namespace
914
915 :param: name: Name of the pvc to be deleted
916 :param: namespace: Namespace
917
918 """
919 self.clients[CORE_CLIENT].delete_namespaced_persistent_volume_claim(
920 name, namespace
921 )
922
923 def copy_file_to_pod(
924 self, namespace, pod_name, container_name, src_file, dest_path
925 ):
926 # Create an in-memory tar file containing the source file
927 tar_buffer = io.BytesIO()
928 with tarfile.open(fileobj=tar_buffer, mode="w") as tar:
929 tar.add(src_file, arcname=dest_path.split("/")[-1])
930
931 tar_buffer.seek(0)
932
933 # Define the command to extract the tar file in the pod
934 exec_command = ["tar", "xvf", "-", "-C", dest_path.rsplit("/", 1)[0]]
935
936 # Execute the command
937 resp = stream(
938 self.clients[CORE_CLIENT].connect_get_namespaced_pod_exec,
939 pod_name,
940 namespace,
941 command=exec_command,
942 container=container_name,
943 stdin=True,
944 stderr=True,
945 stdout=True,
946 tty=False,
947 _preload_content=False,
948 )
949
950 # Write the tar data to the pod
951 resp.write_stdin(tar_buffer.read())
952 resp.close()
953
954 @retry(
955 attempts=10,
956 delay=1,
957 fallback=Exception("Failed creating the pvc"),
958 )
959 async def create_pvc_with_content(
960 self, name: str, namespace: str, src_file: str, dest_filename: str
961 ):
962 """
963 Create a PVC with content
964
965 :param: name: Name of the pvc to be created
966 :param: namespace: Name of the namespace where the pvc will be stored
967 :param: src_file: File to be copied
968 :param: filename: Name of the file in the destination folder
969 """
970 pod_name = f"copy-pod-{name}"
971 self.logger.debug(f"Creating pvc {name}")
972 await self.create_pvc(name=name, namespace=namespace)
973 self.logger.debug("Sleeping")
974 sleep(40)
975 self.logger.debug(f"Creating pod {pod_name}")
976 await self.create_copy_pod(name=pod_name, namespace=namespace, pvc_name=name)
977 self.logger.debug("Sleeping")
978 sleep(40)
979 self.logger.debug(f"Copying files to pod {pod_name}")
980 self.copy_file_to_pod(
981 namespace=namespace,
982 pod_name=pod_name,
983 container_name="copy-container",
984 src_file=src_file,
985 dest_path=f"/mnt/data/{dest_filename}",
986 )
987
988 @retry(
989 attempts=10,
990 delay=1,
991 fallback=Exception("Failed creating the pvc"),
992 )
993 async def create_copy_pod(self, name: str, namespace: str, pvc_name: str):
994 """
995 Create a pod to copy content into a PVC
996
997 :param: name: Name of the pod to be created
998 :param: namespace: Name of the namespace where the pod will be stored
999 :param: pvc_name: Name of the PVC that the pod will mount as a volume
1000
1001 """
1002 pod = V1Pod(
1003 api_version="v1",
1004 kind="Pod",
1005 metadata=client.V1ObjectMeta(name=name),
1006 spec=V1PodSpec(
1007 containers=[
1008 V1Container(
1009 name="copy-container",
1010 image="busybox", # Imagen ligera para copiar archivos
1011 command=["sleep", "3600"], # Mantén el contenedor en ejecución
1012 volume_mounts=[
1013 V1VolumeMount(mount_path="/mnt/data", name="my-storage")
1014 ],
1015 )
1016 ],
1017 volumes=[
1018 V1Volume(
1019 name="my-storage",
1020 persistent_volume_claim=V1PersistentVolumeClaimVolumeSource(
1021 claim_name=pvc_name
1022 ),
1023 )
1024 ],
1025 ),
1026 )
1027 # Create the pod
1028 self.clients[CORE_CLIENT].create_namespaced_pod(namespace=namespace, body=pod)
1029
1030 @retry(
1031 attempts=10,
1032 delay=1,
1033 fallback=Exception("Failed deleting the pod"),
1034 )
1035 async def delete_pod(self, name: str, namespace: str):
1036 """
1037 Create a namespace
1038
1039 :param: name: Name of the pod to be deleted
1040 :param: namespace: Namespace
1041
1042 """
1043 self.clients[CORE_CLIENT].delete_namespaced_pod(name, namespace)