blob: 2c1a5b060953cf0a7ebdc778b76f1f537634c2a0 [file] [log] [blame]
almagiacdd20ae2024-12-13 09:45:45 +01001#######################################################################################
2# Copyright 2020 Canonical Ltd.
3# Copyright ETSI Contributors and Others.
4#
5# Licensed under the Apache License, Version 2.0 (the "License");
6# you may not use this file except in compliance with the License.
7# You may obtain a copy of the License at
8#
9# http://www.apache.org/licenses/LICENSE-2.0
10#
11# Unless required by applicable law or agreed to in writing, software
12# distributed under the License is distributed on an "AS IS" BASIS,
13# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14# See the License for the specific language governing permissions and
15# limitations under the License.
16#######################################################################################
17
18import base64
19import logging
almagiacdd20ae2024-12-13 09:45:45 +010020import typing
21import uuid
22import json
23import tarfile
24import io
25from time import sleep
26
27from distutils.version import LooseVersion
28
garciadeblas6d8acf32025-02-06 13:34:37 +010029from kubernetes import client as kclient, config as kconfig
almagiacdd20ae2024-12-13 09:45:45 +010030from kubernetes.client.api import VersionApi
31from kubernetes.client.models import (
32 V1ClusterRole,
33 V1Role,
34 V1ObjectMeta,
35 V1PolicyRule,
36 V1ServiceAccount,
37 V1ClusterRoleBinding,
38 V1RoleBinding,
39 V1RoleRef,
40 RbacV1Subject,
41 V1Secret,
42 V1SecretReference,
43 V1Namespace,
44 V1PersistentVolumeClaim,
45 V1PersistentVolumeClaimSpec,
46 V1PersistentVolumeClaimVolumeSource,
47 V1ResourceRequirements,
48 V1Pod,
49 V1PodSpec,
50 V1Volume,
51 V1VolumeMount,
52 V1Container,
rshrif8911b92025-06-11 18:19:07 +000053 V1ConfigMap,
almagiacdd20ae2024-12-13 09:45:45 +010054)
55from kubernetes.client.rest import ApiException
56from kubernetes.stream import stream
57from osm_lcm.n2vc.libjuju import retry_callback
58from retrying_async import retry
59
60SERVICE_ACCOUNT_TOKEN_KEY = "token"
61SERVICE_ACCOUNT_ROOT_CA_KEY = "ca.crt"
62# clients
63CORE_CLIENT = "core_v1"
64RBAC_CLIENT = "rbac_v1"
65STORAGE_CLIENT = "storage_v1"
66CUSTOM_OBJECT_CLIENT = "custom_object"
67
68
69class Kubectl:
70 def __init__(self, config_file=None):
almagiacdd20ae2024-12-13 09:45:45 +010071 self.logger = logging.getLogger("lcm.kubectl")
garciadeblas6d8acf32025-02-06 13:34:37 +010072 self._config_file = config_file
73 self.logger.info(f"Kubectl cfg file: {config_file}")
74 # kconfig.load_kube_config(config_file=config_file)
75 self._api_client = kconfig.new_client_from_config(config_file=config_file)
76 self._clients = {
77 CORE_CLIENT: kclient.CoreV1Api(api_client=self._api_client),
78 RBAC_CLIENT: kclient.RbacAuthorizationV1Api(api_client=self._api_client),
79 STORAGE_CLIENT: kclient.StorageV1Api(api_client=self._api_client),
80 CUSTOM_OBJECT_CLIENT: kclient.CustomObjectsApi(api_client=self._api_client),
81 }
82 self._configuration = self._api_client.configuration.get_default_copy()
83 self.logger.info(f"Kubectl cfg file: {config_file}")
almagiacdd20ae2024-12-13 09:45:45 +010084
85 @property
86 def configuration(self):
87 return self._configuration
88
89 @property
90 def clients(self):
91 return self._clients
92
93 def get_services(
94 self,
95 field_selector: str = None,
96 label_selector: str = None,
97 ) -> typing.List[typing.Dict]:
98 """
99 Get Service list from a namespace
100
101 :param: field_selector: Kubernetes field selector for the namespace
102 :param: label_selector: Kubernetes label selector for the namespace
103
104 :return: List of the services matching the selectors specified
105 """
106 kwargs = {}
107 if field_selector:
108 kwargs["field_selector"] = field_selector
109 if label_selector:
110 kwargs["label_selector"] = label_selector
111 try:
112 result = self.clients[CORE_CLIENT].list_service_for_all_namespaces(**kwargs)
113 return [
114 {
115 "name": i.metadata.name,
116 "cluster_ip": i.spec.cluster_ip,
117 "type": i.spec.type,
118 "ports": (
119 [
120 {
121 "name": p.name,
122 "node_port": p.node_port,
123 "port": p.port,
124 "protocol": p.protocol,
125 "target_port": p.target_port,
126 }
127 for p in i.spec.ports
128 ]
129 if i.spec.ports
130 else []
131 ),
132 "external_ip": [i.ip for i in i.status.load_balancer.ingress]
133 if i.status.load_balancer.ingress
134 else None,
135 }
136 for i in result.items
137 ]
138 except ApiException as e:
139 self.logger.error("Error calling get services: {}".format(e))
140 raise e
141
142 def get_default_storage_class(self) -> str:
143 """
144 Default storage class
145
146 :return: Returns the default storage class name, if exists.
147 If not, it returns the first storage class.
148 If there are not storage classes, returns None
149 """
150 storage_classes = self.clients[STORAGE_CLIENT].list_storage_class()
151 selected_sc = None
152 default_sc_annotations = {
153 "storageclass.kubernetes.io/is-default-class": "true",
154 # Older clusters still use the beta annotation.
155 "storageclass.beta.kubernetes.io/is-default-class": "true",
156 }
157 for sc in storage_classes.items:
158 if not selected_sc:
159 # Select the first storage class in case there is no a default-class
160 selected_sc = sc.metadata.name
161 annotations = sc.metadata.annotations or {}
162 if any(
163 k in annotations and annotations[k] == v
164 for k, v in default_sc_annotations.items()
165 ):
166 # Default storage
167 selected_sc = sc.metadata.name
168 break
169 return selected_sc
170
171 def create_cluster_role(
172 self,
173 name: str,
garciadeblas6bbe55e2025-02-08 11:30:39 +0100174 labels: typing.Dict[str, str],
almagiacdd20ae2024-12-13 09:45:45 +0100175 namespace: str = "kube-system",
176 ):
177 """
178 Create a cluster role
179
180 :param: name: Name of the cluster role
181 :param: labels: Labels for cluster role metadata
182 :param: namespace: Kubernetes namespace for cluster role metadata
183 Default: kube-system
184 """
185 cluster_roles = self.clients[RBAC_CLIENT].list_cluster_role(
186 field_selector="metadata.name={}".format(name)
187 )
188
189 if len(cluster_roles.items) > 0:
190 raise Exception("Role with metadata.name={} already exists".format(name))
191
192 metadata = V1ObjectMeta(name=name, labels=labels, namespace=namespace)
193 # Cluster role
194 cluster_role = V1ClusterRole(
195 metadata=metadata,
196 rules=[
197 V1PolicyRule(api_groups=["*"], resources=["*"], verbs=["*"]),
198 V1PolicyRule(non_resource_ur_ls=["*"], verbs=["*"]),
199 ],
200 )
201
202 self.clients[RBAC_CLIENT].create_cluster_role(cluster_role)
203
204 async def create_role(
205 self,
206 name: str,
garciadeblas6bbe55e2025-02-08 11:30:39 +0100207 labels: typing.Dict[str, str],
almagiacdd20ae2024-12-13 09:45:45 +0100208 api_groups: list,
209 resources: list,
210 verbs: list,
211 namespace: str,
212 ):
213 """
214 Create a role with one PolicyRule
215
216 :param: name: Name of the namespaced Role
217 :param: labels: Labels for namespaced Role metadata
218 :param: api_groups: List with api-groups allowed in the policy rule
219 :param: resources: List with resources allowed in the policy rule
220 :param: verbs: List with verbs allowed in the policy rule
221 :param: namespace: Kubernetes namespace for Role metadata
222
223 :return: None
224 """
225
226 roles = self.clients[RBAC_CLIENT].list_namespaced_role(
227 namespace, field_selector="metadata.name={}".format(name)
228 )
229
230 if len(roles.items) > 0:
231 raise Exception("Role with metadata.name={} already exists".format(name))
232
233 metadata = V1ObjectMeta(name=name, labels=labels, namespace=namespace)
234
235 role = V1Role(
236 metadata=metadata,
237 rules=[
238 V1PolicyRule(api_groups=api_groups, resources=resources, verbs=verbs),
239 ],
240 )
241
242 self.clients[RBAC_CLIENT].create_namespaced_role(namespace, role)
243
244 def delete_cluster_role(self, name: str):
245 """
246 Delete a cluster role
247
248 :param: name: Name of the cluster role
249 """
250 self.clients[RBAC_CLIENT].delete_cluster_role(name)
251
252 def _get_kubectl_version(self):
garciadeblas6d8acf32025-02-06 13:34:37 +0100253 self.logger.debug("Enter _get_kubectl_version function")
254 version = VersionApi(api_client=self._api_client).get_code()
almagiacdd20ae2024-12-13 09:45:45 +0100255 return "{}.{}".format(version.major, version.minor)
256
257 def _need_to_create_new_secret(self):
258 min_k8s_version = "1.24"
259 current_k8s_version = self._get_kubectl_version()
260 return LooseVersion(min_k8s_version) <= LooseVersion(current_k8s_version)
261
262 def _get_secret_name(self, service_account_name: str):
263 random_alphanum = str(uuid.uuid4())[:5]
264 return "{}-token-{}".format(service_account_name, random_alphanum)
265
266 def _create_service_account_secret(
267 self,
268 service_account_name: str,
269 namespace: str,
270 secret_name: str,
271 ):
272 """
273 Create a secret for the service account. K8s version >= 1.24
274
275 :param: service_account_name: Name of the service account
276 :param: namespace: Kubernetes namespace for service account metadata
277 :param: secret_name: Name of the secret
278 """
279 v1_core = self.clients[CORE_CLIENT]
280 secrets = v1_core.list_namespaced_secret(
281 namespace, field_selector="metadata.name={}".format(secret_name)
282 ).items
283
284 if len(secrets) > 0:
285 raise Exception(
286 "Secret with metadata.name={} already exists".format(secret_name)
287 )
288
289 annotations = {"kubernetes.io/service-account.name": service_account_name}
290 metadata = V1ObjectMeta(
291 name=secret_name, namespace=namespace, annotations=annotations
292 )
293 type = "kubernetes.io/service-account-token"
294 secret = V1Secret(metadata=metadata, type=type)
295 v1_core.create_namespaced_secret(namespace, secret)
296
297 def _get_secret_reference_list(self, namespace: str, secret_name: str):
298 """
299 Return a secret reference list with one secret.
300 K8s version >= 1.24
301
302 :param: namespace: Kubernetes namespace for service account metadata
303 :param: secret_name: Name of the secret
304 :rtype: list[V1SecretReference]
305 """
306 return [V1SecretReference(name=secret_name, namespace=namespace)]
307
308 def create_service_account(
309 self,
310 name: str,
garciadeblas6bbe55e2025-02-08 11:30:39 +0100311 labels: typing.Dict[str, str],
almagiacdd20ae2024-12-13 09:45:45 +0100312 namespace: str = "kube-system",
313 ):
314 """
315 Create a service account
316
317 :param: name: Name of the service account
318 :param: labels: Labels for service account metadata
319 :param: namespace: Kubernetes namespace for service account metadata
320 Default: kube-system
321 """
322 v1_core = self.clients[CORE_CLIENT]
323 service_accounts = v1_core.list_namespaced_service_account(
324 namespace, field_selector="metadata.name={}".format(name)
325 )
326 if len(service_accounts.items) > 0:
327 raise Exception(
328 "Service account with metadata.name={} already exists".format(name)
329 )
330
331 metadata = V1ObjectMeta(name=name, labels=labels, namespace=namespace)
332
333 if self._need_to_create_new_secret():
334 secret_name = self._get_secret_name(name)
335 secrets = self._get_secret_reference_list(namespace, secret_name)
336 service_account = V1ServiceAccount(metadata=metadata, secrets=secrets)
337 v1_core.create_namespaced_service_account(namespace, service_account)
338 self._create_service_account_secret(name, namespace, secret_name)
339 else:
340 service_account = V1ServiceAccount(metadata=metadata)
341 v1_core.create_namespaced_service_account(namespace, service_account)
342
343 def delete_secret(self, name: str, namespace: str = "kube-system"):
344 """
345 Delete a secret
346
347 :param: name: Name of the secret
348 :param: namespace: Kubernetes namespace
349 Default: kube-system
350 """
garciadeblas6d8acf32025-02-06 13:34:37 +0100351 self.logger.debug(f"Kubectl cfg file: {self._config_file}")
almagiacdd20ae2024-12-13 09:45:45 +0100352 self.clients[CORE_CLIENT].delete_namespaced_secret(name, namespace)
353
354 def delete_service_account(self, name: str, namespace: str = "kube-system"):
355 """
356 Delete a service account
357
358 :param: name: Name of the service account
359 :param: namespace: Kubernetes namespace for service account metadata
360 Default: kube-system
361 """
362 self.clients[CORE_CLIENT].delete_namespaced_service_account(name, namespace)
363
364 def create_cluster_role_binding(
garciadeblas6bbe55e2025-02-08 11:30:39 +0100365 self, name: str, labels: typing.Dict[str, str], namespace: str = "kube-system"
almagiacdd20ae2024-12-13 09:45:45 +0100366 ):
367 """
368 Create a cluster role binding
369
370 :param: name: Name of the cluster role
371 :param: labels: Labels for cluster role binding metadata
372 :param: namespace: Kubernetes namespace for cluster role binding metadata
373 Default: kube-system
374 """
375 role_bindings = self.clients[RBAC_CLIENT].list_cluster_role_binding(
376 field_selector="metadata.name={}".format(name)
377 )
378 if len(role_bindings.items) > 0:
379 raise Exception("Generated rbac id already exists")
380
381 role_binding = V1ClusterRoleBinding(
382 metadata=V1ObjectMeta(name=name, labels=labels),
383 role_ref=V1RoleRef(kind="ClusterRole", name=name, api_group=""),
384 subjects=[
385 RbacV1Subject(kind="ServiceAccount", name=name, namespace=namespace)
386 ],
387 )
388 self.clients[RBAC_CLIENT].create_cluster_role_binding(role_binding)
389
390 async def create_role_binding(
391 self,
392 name: str,
393 role_name: str,
394 sa_name: str,
garciadeblas6bbe55e2025-02-08 11:30:39 +0100395 labels: typing.Dict[str, str],
almagiacdd20ae2024-12-13 09:45:45 +0100396 namespace: str,
397 ):
398 """
399 Create a cluster role binding
400
401 :param: name: Name of the namespaced Role Binding
402 :param: role_name: Name of the namespaced Role to be bound
403 :param: sa_name: Name of the Service Account to be bound
404 :param: labels: Labels for Role Binding metadata
405 :param: namespace: Kubernetes namespace for Role Binding metadata
406
407 :return: None
408 """
409 role_bindings = self.clients[RBAC_CLIENT].list_namespaced_role_binding(
410 namespace, field_selector="metadata.name={}".format(name)
411 )
412 if len(role_bindings.items) > 0:
413 raise Exception(
414 "Role Binding with metadata.name={} already exists".format(name)
415 )
416
417 role_binding = V1RoleBinding(
418 metadata=V1ObjectMeta(name=name, labels=labels),
419 role_ref=V1RoleRef(kind="Role", name=role_name, api_group=""),
420 subjects=[
421 RbacV1Subject(kind="ServiceAccount", name=sa_name, namespace=namespace)
422 ],
423 )
424 self.clients[RBAC_CLIENT].create_namespaced_role_binding(
425 namespace, role_binding
426 )
427
428 def delete_cluster_role_binding(self, name: str):
429 """
430 Delete a cluster role binding
431
432 :param: name: Name of the cluster role binding
433 """
434 self.clients[RBAC_CLIENT].delete_cluster_role_binding(name)
435
436 @retry(
437 attempts=10,
438 delay=1,
439 fallback=Exception("Failed getting the secret from service account"),
440 callback=retry_callback,
441 )
442 async def get_secret_data(
443 self, name: str, namespace: str = "kube-system"
garciadeblas6d8acf32025-02-06 13:34:37 +0100444 ) -> typing.Tuple[str, str]:
almagiacdd20ae2024-12-13 09:45:45 +0100445 """
446 Get secret data
447
448 :param: name: Name of the secret data
449 :param: namespace: Name of the namespace where the secret is stored
450
451 :return: Tuple with the token and client certificate
452 """
453 v1_core = self.clients[CORE_CLIENT]
454
455 secret_name = None
456
457 service_accounts = v1_core.list_namespaced_service_account(
458 namespace, field_selector="metadata.name={}".format(name)
459 )
460 if len(service_accounts.items) == 0:
461 raise Exception(
462 "Service account not found with metadata.name={}".format(name)
463 )
464 service_account = service_accounts.items[0]
465 if service_account.secrets and len(service_account.secrets) > 0:
466 secret_name = service_account.secrets[0].name
467 if not secret_name:
468 raise Exception(
469 "Failed getting the secret from service account {}".format(name)
470 )
471 # TODO: refactor to use get_secret_content
472 secret = v1_core.list_namespaced_secret(
473 namespace, field_selector="metadata.name={}".format(secret_name)
474 ).items[0]
475
476 token = secret.data[SERVICE_ACCOUNT_TOKEN_KEY]
477 client_certificate_data = secret.data[SERVICE_ACCOUNT_ROOT_CA_KEY]
478
479 return (
480 base64.b64decode(token).decode("utf-8"),
481 base64.b64decode(client_certificate_data).decode("utf-8"),
482 )
483
484 @retry(
485 attempts=10,
486 delay=1,
487 fallback=Exception("Failed getting data from the secret"),
488 )
489 async def get_secret_content(
490 self,
491 name: str,
492 namespace: str,
garciadeblas6bbe55e2025-02-08 11:30:39 +0100493 ) -> typing.Dict:
almagiacdd20ae2024-12-13 09:45:45 +0100494 """
495 Get secret data
496
497 :param: name: Name of the secret
498 :param: namespace: Name of the namespace where the secret is stored
499
500 :return: Dictionary with secret's data
501 """
garciadeblas6d8acf32025-02-06 13:34:37 +0100502 self.logger.debug(f"Kubectl cfg file: {self._config_file}")
almagiacdd20ae2024-12-13 09:45:45 +0100503 v1_core = self.clients[CORE_CLIENT]
504
505 secret = v1_core.read_namespaced_secret(name, namespace)
506
507 return secret.data
508
509 @retry(
510 attempts=10,
511 delay=1,
512 fallback=Exception("Failed creating the secret"),
513 )
514 async def create_secret(
515 self, name: str, data: dict, namespace: str, secret_type: str
516 ):
517 """
518 Create secret with data
519
520 :param: name: Name of the secret
521 :param: data: Dict with data content. Values must be already base64 encoded
522 :param: namespace: Name of the namespace where the secret will be stored
523 :param: secret_type: Type of the secret, e.g., Opaque, kubernetes.io/service-account-token, kubernetes.io/tls
524
525 :return: None
526 """
garciadeblas6d8acf32025-02-06 13:34:37 +0100527 self.logger.debug(f"Kubectl cfg file: {self._config_file}")
garciadeblas6d8acf32025-02-06 13:34:37 +0100528 self.logger.debug("Enter create_secret function")
almagiacdd20ae2024-12-13 09:45:45 +0100529 v1_core = self.clients[CORE_CLIENT]
garciadeblas6d8acf32025-02-06 13:34:37 +0100530 self.logger.debug(f"v1_core: {v1_core}")
almagiacdd20ae2024-12-13 09:45:45 +0100531 metadata = V1ObjectMeta(name=name, namespace=namespace)
garciadeblas6d8acf32025-02-06 13:34:37 +0100532 self.logger.debug(f"metadata: {metadata}")
almagiacdd20ae2024-12-13 09:45:45 +0100533 secret = V1Secret(metadata=metadata, data=data, type=secret_type)
garciadeblas6d8acf32025-02-06 13:34:37 +0100534 self.logger.debug(f"secret: {secret}")
garciadeblasc58e9962025-06-17 18:16:20 +0200535 try:
536 v1_core.create_namespaced_secret(namespace, secret)
537 self.logger.info("Namespaced secret was created")
538 except ApiException as e:
539 self.logger.error(f"Failed to create namespaced secret: {e}")
540 raise
almagiacdd20ae2024-12-13 09:45:45 +0100541
rshrif8911b92025-06-11 18:19:07 +0000542 async def create_configmap(self, name: str, data: dict, namespace: str):
543 """
544 Create secret with data
545
546 :param: name: Name of the configmap
547 :param: data: Dict with data content.
548 :param: namespace: Name of the namespace where the configmap will be stored
549
550 :return: None
551 """
552 self.logger.debug(f"Kubectl cfg file: {self._config_file}")
553 self.logger.debug("Enter create_configmap function")
554 v1_core = self.clients[CORE_CLIENT]
555 self.logger.debug(f"v1_core: {v1_core}")
556 config_map = V1ConfigMap(
557 metadata=V1ObjectMeta(name=name, namespace=namespace),
558 data=data,
559 )
560 self.logger.debug(f"config_map: {config_map}")
561 try:
562 v1_core.create_namespaced_config_map(namespace, config_map)
563 self.logger.info("Namespaced configmap was created")
564 except ApiException as e:
565 self.logger.error(f"Failed to create namespaced configmap: {e}")
566 raise
567
568 def delete_configmap(self, name: str, namespace: str):
569 """
570 Delete a configmap
571
572 :param: name: Name of the configmap
573 :param: namespace: Kubernetes namespace
574 """
575 self.logger.debug(f"Kubectl cfg file: {self._config_file}")
576 self.clients[CORE_CLIENT].delete_namespaced_config_map(name, namespace)
577
almagiacdd20ae2024-12-13 09:45:45 +0100578 async def create_certificate(
579 self,
580 namespace: str,
581 name: str,
582 dns_prefix: str,
583 secret_name: str,
584 usages: list,
585 issuer_name: str,
586 ):
587 """
588 Creates cert-manager certificate object
589
590 :param: namespace: Name of the namespace where the certificate and secret is stored
591 :param: name: Name of the certificate object
592 :param: dns_prefix: Prefix for the dnsNames. They will be prefixed to the common k8s svc suffixes
593 :param: secret_name: Name of the secret created by cert-manager
594 :param: usages: List of X.509 key usages
595 :param: issuer_name: Name of the cert-manager's Issuer or ClusterIssuer object
596
597 """
598 certificate_body = {
599 "apiVersion": "cert-manager.io/v1",
600 "kind": "Certificate",
601 "metadata": {"name": name, "namespace": namespace},
602 "spec": {
603 "secretName": secret_name,
604 "privateKey": {
605 "rotationPolicy": "Always",
606 "algorithm": "ECDSA",
607 "size": 256,
608 },
609 "duration": "8760h", # 1 Year
610 "renewBefore": "2208h", # 9 months
611 "subject": {"organizations": ["osm"]},
612 "commonName": "osm",
613 "isCA": False,
614 "usages": usages,
615 "dnsNames": [
616 "{}.{}".format(dns_prefix, namespace),
617 "{}.{}.svc".format(dns_prefix, namespace),
618 "{}.{}.svc.cluster".format(dns_prefix, namespace),
619 "{}.{}.svc.cluster.local".format(dns_prefix, namespace),
620 ],
621 "issuerRef": {"name": issuer_name, "kind": "ClusterIssuer"},
622 },
623 }
624 client = self.clients[CUSTOM_OBJECT_CLIENT]
625 try:
626 client.create_namespaced_custom_object(
627 group="cert-manager.io",
628 plural="certificates",
629 version="v1",
630 body=certificate_body,
631 namespace=namespace,
632 )
633 except ApiException as e:
634 info = json.loads(e.body)
635 if info.get("reason").lower() == "alreadyexists":
636 self.logger.warning("Certificate already exists: {}".format(e))
637 else:
638 raise e
639
640 async def delete_certificate(self, namespace, object_name):
641 client = self.clients[CUSTOM_OBJECT_CLIENT]
642 try:
643 client.delete_namespaced_custom_object(
644 group="cert-manager.io",
645 plural="certificates",
646 version="v1",
647 name=object_name,
648 namespace=namespace,
649 )
650 except ApiException as e:
651 info = json.loads(e.body)
652 if info.get("reason").lower() == "notfound":
653 self.logger.warning("Certificate already deleted: {}".format(e))
654 else:
655 raise e
656
657 @retry(
658 attempts=10,
659 delay=1,
660 fallback=Exception("Failed creating the namespace"),
661 )
662 async def create_namespace(self, name: str, labels: dict = None):
663 """
664 Create a namespace
665
666 :param: name: Name of the namespace to be created
667 :param: labels: Dictionary with labels for the new namespace
668
669 """
670 v1_core = self.clients[CORE_CLIENT]
671 metadata = V1ObjectMeta(name=name, labels=labels)
672 namespace = V1Namespace(
673 metadata=metadata,
674 )
675
676 try:
677 v1_core.create_namespace(namespace)
678 self.logger.debug("Namespace created: {}".format(name))
679 except ApiException as e:
680 info = json.loads(e.body)
681 if info.get("reason").lower() == "alreadyexists":
682 self.logger.warning("Namespace already exists: {}".format(e))
683 else:
684 raise e
685
686 @retry(
687 attempts=10,
688 delay=1,
689 fallback=Exception("Failed deleting the namespace"),
690 )
691 async def delete_namespace(self, name: str):
692 """
693 Delete a namespace
694
695 :param: name: Name of the namespace to be deleted
696
697 """
698 try:
699 self.clients[CORE_CLIENT].delete_namespace(name)
700 except ApiException as e:
701 if e.reason == "Not Found":
702 self.logger.warning("Namespace already deleted: {}".format(e))
703
704 def get_secrets(
705 self,
706 namespace: str,
707 field_selector: str = None,
708 ) -> typing.List[typing.Dict]:
709 """
710 Get Secret list from a namespace
711
712 :param: namespace: Kubernetes namespace
713 :param: field_selector: Kubernetes field selector
714
715 :return: List of the secrets matching the selectors specified
716 """
717 try:
718 v1_core = self.clients[CORE_CLIENT]
719 secrets = v1_core.list_namespaced_secret(
720 namespace=namespace,
721 field_selector=field_selector,
722 ).items
723 return secrets
724 except ApiException as e:
725 self.logger.error("Error calling get secrets: {}".format(e))
726 raise e
727
728 def create_generic_object(
729 self,
730 api_group: str,
731 api_plural: str,
732 api_version: str,
733 namespace: str,
734 manifest_dict: dict,
735 ):
736 """
737 Creates generic object
738
739 :param: api_group: API Group
740 :param: api_plural: API Plural
741 :param: api_version: API Version
742 :param: namespace: Namespace
743 :param: manifest_dict: Dictionary with the content of the Kubernetes manifest
744
745 """
garciadeblas6d8acf32025-02-06 13:34:37 +0100746 self.logger.debug(f"Kubectl cfg file: {self._config_file}")
almagiacdd20ae2024-12-13 09:45:45 +0100747 client = self.clients[CUSTOM_OBJECT_CLIENT]
748 try:
749 if namespace:
750 client.create_namespaced_custom_object(
751 group=api_group,
752 plural=api_plural,
753 version=api_version,
754 body=manifest_dict,
755 namespace=namespace,
756 )
757 else:
758 client.create_cluster_custom_object(
759 group=api_group,
760 plural=api_plural,
761 version=api_version,
762 body=manifest_dict,
763 )
764 except ApiException as e:
765 info = json.loads(e.body)
766 if info.get("reason").lower() == "alreadyexists":
767 self.logger.warning("Object already exists: {}".format(e))
768 else:
769 raise e
770
771 def delete_generic_object(
772 self,
773 api_group: str,
774 api_plural: str,
775 api_version: str,
776 namespace: str,
777 name: str,
778 ):
779 """
780 Deletes generic object
781
782 :param: api_group: API Group
783 :param: api_plural: API Plural
784 :param: api_version: API Version
785 :param: namespace: Namespace
786 :param: name: Name of the object
787
788 """
garciadeblas6d8acf32025-02-06 13:34:37 +0100789 self.logger.debug(f"Kubectl cfg file: {self._config_file}")
almagiacdd20ae2024-12-13 09:45:45 +0100790 client = self.clients[CUSTOM_OBJECT_CLIENT]
791 try:
792 if namespace:
793 client.delete_namespaced_custom_object(
794 group=api_group,
795 plural=api_plural,
796 version=api_version,
797 name=name,
798 namespace=namespace,
799 )
800 else:
801 client.delete_cluster_custom_object(
802 group=api_group,
803 plural=api_plural,
804 version=api_version,
805 name=name,
806 )
807 except ApiException as e:
808 info = json.loads(e.body)
809 if info.get("reason").lower() == "notfound":
810 self.logger.warning("Object already deleted: {}".format(e))
811 else:
812 raise e
813
814 async def get_generic_object(
815 self,
816 api_group: str,
817 api_plural: str,
818 api_version: str,
819 namespace: str,
820 name: str,
821 ):
822 """
823 Gets generic object
824
825 :param: api_group: API Group
826 :param: api_plural: API Plural
827 :param: api_version: API Version
828 :param: namespace: Namespace
829 :param: name: Name of the object
830
831 """
garciadeblas6d8acf32025-02-06 13:34:37 +0100832 self.logger.debug(f"Kubectl cfg file: {self._config_file}")
almagiacdd20ae2024-12-13 09:45:45 +0100833 client = self.clients[CUSTOM_OBJECT_CLIENT]
834 try:
835 if namespace:
836 object_dict = client.list_namespaced_custom_object(
837 group=api_group,
838 plural=api_plural,
839 version=api_version,
840 namespace=namespace,
841 field_selector=f"metadata.name={name}",
842 )
843 else:
844 object_dict = client.list_cluster_custom_object(
845 group=api_group,
846 plural=api_plural,
847 version=api_version,
848 field_selector=f"metadata.name={name}",
849 )
850 if len(object_dict.get("items")) == 0:
851 return None
852 return object_dict.get("items")[0]
853 except ApiException as e:
854 self.logger.debug(f"Exception: {e}")
855 info = json.loads(e.body)
garciadeblasad6d1ba2025-01-22 16:02:18 +0100856 raise f"Api Exception: {e}. Reason: {info.get('reason')}"
almagiacdd20ae2024-12-13 09:45:45 +0100857
858 async def list_generic_object(
859 self,
860 api_group: str,
861 api_plural: str,
862 api_version: str,
863 namespace: str,
864 ):
865 """
866 Lists all generic objects of the requested API group
867
868 :param: api_group: API Group
869 :param: api_plural: API Plural
870 :param: api_version: API Version
871 :param: namespace: Namespace
872
873 """
garciadeblas6d8acf32025-02-06 13:34:37 +0100874 self.logger.debug(f"Kubectl cfg file: {self._config_file}")
almagiacdd20ae2024-12-13 09:45:45 +0100875 client = self.clients[CUSTOM_OBJECT_CLIENT]
876 try:
877 if namespace:
878 object_dict = client.list_namespaced_custom_object(
879 group=api_group,
880 plural=api_plural,
881 version=api_version,
882 namespace=namespace,
883 )
884 else:
885 object_dict = client.list_cluster_custom_object(
886 group=api_group,
887 plural=api_plural,
888 version=api_version,
889 )
890 self.logger.debug(f"Object-list: {object_dict.get('items')}")
891 return object_dict.get("items")
892 except ApiException as e:
893 self.logger.debug(f"Exception: {e}")
894 info = json.loads(e.body)
895 if info.get("reason").lower() == "notfound":
896 self.logger.warning(
897 "Cannot find specified custom objects: {}".format(e)
898 )
899 return []
900 else:
901 raise e
902
903 @retry(
904 attempts=10,
905 delay=1,
906 fallback=Exception("Failed creating the secret"),
907 )
908 async def create_secret_string(
909 self, name: str, string_data: str, namespace: str, secret_type: str
910 ):
911 """
912 Create secret with data
913
914 :param: name: Name of the secret
915 :param: string_data: String with data content
916 :param: namespace: Name of the namespace where the secret will be stored
917 :param: secret_type: Type of the secret, e.g., Opaque, kubernetes.io/service-account-token, kubernetes.io/tls
918
919 :return: None
920 """
921 v1_core = self.clients[CORE_CLIENT]
922 metadata = V1ObjectMeta(name=name, namespace=namespace)
923 secret = V1Secret(metadata=metadata, string_data=string_data, type=secret_type)
924 v1_core.create_namespaced_secret(namespace, secret)
925
926 @retry(
927 attempts=10,
928 delay=1,
929 fallback=Exception("Failed creating the pvc"),
930 )
931 async def create_pvc(self, name: str, namespace: str):
932 """
933 Create a namespace
934
935 :param: name: Name of the pvc to be created
936 :param: namespace: Name of the namespace where the pvc will be stored
937
938 """
939 try:
940 pvc = V1PersistentVolumeClaim(
941 api_version="v1",
942 kind="PersistentVolumeClaim",
943 metadata=V1ObjectMeta(name=name),
944 spec=V1PersistentVolumeClaimSpec(
945 access_modes=["ReadWriteOnce"],
946 resources=V1ResourceRequirements(requests={"storage": "100Mi"}),
947 ),
948 )
949 self.clients[CORE_CLIENT].create_namespaced_persistent_volume_claim(
950 namespace=namespace, body=pvc
951 )
952 except ApiException as e:
953 info = json.loads(e.body)
954 if info.get("reason").lower() == "alreadyexists":
955 self.logger.warning("PVC already exists: {}".format(e))
956 else:
957 raise e
958
959 @retry(
960 attempts=10,
961 delay=1,
962 fallback=Exception("Failed deleting the pvc"),
963 )
964 async def delete_pvc(self, name: str, namespace: str):
965 """
966 Create a namespace
967
968 :param: name: Name of the pvc to be deleted
969 :param: namespace: Namespace
970
971 """
972 self.clients[CORE_CLIENT].delete_namespaced_persistent_volume_claim(
973 name, namespace
974 )
975
976 def copy_file_to_pod(
977 self, namespace, pod_name, container_name, src_file, dest_path
978 ):
979 # Create an in-memory tar file containing the source file
980 tar_buffer = io.BytesIO()
981 with tarfile.open(fileobj=tar_buffer, mode="w") as tar:
982 tar.add(src_file, arcname=dest_path.split("/")[-1])
983
984 tar_buffer.seek(0)
985
986 # Define the command to extract the tar file in the pod
987 exec_command = ["tar", "xvf", "-", "-C", dest_path.rsplit("/", 1)[0]]
988
989 # Execute the command
990 resp = stream(
991 self.clients[CORE_CLIENT].connect_get_namespaced_pod_exec,
992 pod_name,
993 namespace,
994 command=exec_command,
995 container=container_name,
996 stdin=True,
997 stderr=True,
998 stdout=True,
999 tty=False,
1000 _preload_content=False,
1001 )
1002
1003 # Write the tar data to the pod
1004 resp.write_stdin(tar_buffer.read())
1005 resp.close()
1006
1007 @retry(
1008 attempts=10,
1009 delay=1,
1010 fallback=Exception("Failed creating the pvc"),
1011 )
1012 async def create_pvc_with_content(
1013 self, name: str, namespace: str, src_file: str, dest_filename: str
1014 ):
1015 """
1016 Create a PVC with content
1017
1018 :param: name: Name of the pvc to be created
1019 :param: namespace: Name of the namespace where the pvc will be stored
1020 :param: src_file: File to be copied
1021 :param: filename: Name of the file in the destination folder
1022 """
1023 pod_name = f"copy-pod-{name}"
1024 self.logger.debug(f"Creating pvc {name}")
1025 await self.create_pvc(name=name, namespace=namespace)
1026 self.logger.debug("Sleeping")
1027 sleep(40)
1028 self.logger.debug(f"Creating pod {pod_name}")
1029 await self.create_copy_pod(name=pod_name, namespace=namespace, pvc_name=name)
1030 self.logger.debug("Sleeping")
1031 sleep(40)
1032 self.logger.debug(f"Copying files to pod {pod_name}")
1033 self.copy_file_to_pod(
1034 namespace=namespace,
1035 pod_name=pod_name,
1036 container_name="copy-container",
1037 src_file=src_file,
1038 dest_path=f"/mnt/data/{dest_filename}",
1039 )
1040
1041 @retry(
1042 attempts=10,
1043 delay=1,
1044 fallback=Exception("Failed creating the pvc"),
1045 )
1046 async def create_copy_pod(self, name: str, namespace: str, pvc_name: str):
1047 """
1048 Create a pod to copy content into a PVC
1049
1050 :param: name: Name of the pod to be created
1051 :param: namespace: Name of the namespace where the pod will be stored
1052 :param: pvc_name: Name of the PVC that the pod will mount as a volume
1053
1054 """
1055 pod = V1Pod(
1056 api_version="v1",
1057 kind="Pod",
garciadeblas6d8acf32025-02-06 13:34:37 +01001058 metadata=kclient.V1ObjectMeta(name=name),
almagiacdd20ae2024-12-13 09:45:45 +01001059 spec=V1PodSpec(
1060 containers=[
1061 V1Container(
1062 name="copy-container",
1063 image="busybox", # Imagen ligera para copiar archivos
1064 command=["sleep", "3600"], # Mantén el contenedor en ejecución
1065 volume_mounts=[
1066 V1VolumeMount(mount_path="/mnt/data", name="my-storage")
1067 ],
1068 )
1069 ],
1070 volumes=[
1071 V1Volume(
1072 name="my-storage",
1073 persistent_volume_claim=V1PersistentVolumeClaimVolumeSource(
1074 claim_name=pvc_name
1075 ),
1076 )
1077 ],
1078 ),
1079 )
1080 # Create the pod
1081 self.clients[CORE_CLIENT].create_namespaced_pod(namespace=namespace, body=pod)
1082
1083 @retry(
1084 attempts=10,
1085 delay=1,
1086 fallback=Exception("Failed deleting the pod"),
1087 )
1088 async def delete_pod(self, name: str, namespace: str):
1089 """
1090 Create a namespace
1091
1092 :param: name: Name of the pod to be deleted
1093 :param: namespace: Namespace
1094
1095 """
1096 self.clients[CORE_CLIENT].delete_namespaced_pod(name, namespace)