Coverage for n2vc/kubectl.py: 78%
189 statements
« prev ^ index » next coverage.py v7.3.1, created at 2024-06-29 09:03 +0000
« prev ^ index » next coverage.py v7.3.1, created at 2024-06-29 09:03 +0000
1# Copyright 2020 Canonical Ltd.
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7# http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
15import base64
16import logging
17from typing import Dict
18import typing
19import uuid
20import json
22from distutils.version import LooseVersion
24from kubernetes import client, config
25from kubernetes.client.api import VersionApi
26from kubernetes.client.models import (
27 V1ClusterRole,
28 V1Role,
29 V1ObjectMeta,
30 V1PolicyRule,
31 V1ServiceAccount,
32 V1ClusterRoleBinding,
33 V1RoleBinding,
34 V1RoleRef,
35 V1Subject,
36 V1Secret,
37 V1SecretReference,
38 V1Namespace,
39)
40from kubernetes.client.rest import ApiException
41from n2vc.libjuju import retry_callback
42from retrying_async import retry
# Keys looked up in the ``data`` mapping of a service account token secret.
SERVICE_ACCOUNT_TOKEN_KEY = "token"
SERVICE_ACCOUNT_ROOT_CA_KEY = "ca.crt"
# clients
# Keys of the ``Kubectl.clients`` dictionary, one per Kubernetes API client.
CORE_CLIENT = "core_v1"
RBAC_CLIENT = "rbac_v1"
STORAGE_CLIENT = "storage_v1"
CUSTOM_OBJECT_CLIENT = "custom_object"
class Kubectl:
    """
    Wrapper around the Kubernetes Python client.

    On construction it loads a kubeconfig and instantiates one API client per
    API group. The methods manage services, storage classes, RBAC objects,
    service accounts, secrets, cert-manager certificates and namespaces.

    NOTE: the ``async`` methods call the synchronous Kubernetes client, so
    they block while the request is in flight.
    """

    def __init__(self, config_file=None):
        """
        Load the kubeconfig and build the API clients

        :param: config_file: kubeconfig file forwarded to
                             ``config.load_kube_config``
        """
        config.load_kube_config(config_file=config_file)
        self._clients = {
            CORE_CLIENT: client.CoreV1Api(),
            RBAC_CLIENT: client.RbacAuthorizationV1Api(),
            STORAGE_CLIENT: client.StorageV1Api(),
            CUSTOM_OBJECT_CLIENT: client.CustomObjectsApi(),
        }
        self._configuration = config.kube_config.Configuration.get_default_copy()
        self.logger = logging.getLogger("Kubectl")

    @property
    def configuration(self):
        """Copy of the default client configuration taken at construction."""
        return self._configuration

    @property
    def clients(self):
        """Dictionary of API clients indexed by the ``*_CLIENT`` constants."""
        return self._clients

    @staticmethod
    def _service_to_dict(service) -> typing.Dict:
        """
        Convert a service object returned by the API into a plain dictionary

        :param: service: Service item as returned by the core client

        :return: Dictionary with name, cluster_ip, type, ports and external_ip
        """
        ports = [
            {
                "name": port.name,
                "node_port": port.node_port,
                "port": port.port,
                "protocol": port.protocol,
                "target_port": port.target_port,
            }
            # A service without ports yields an empty list
            for port in service.spec.ports or []
        ]
        ingress = service.status.load_balancer.ingress
        return {
            "name": service.metadata.name,
            "cluster_ip": service.spec.cluster_ip,
            "type": service.spec.type,
            "ports": ports,
            # None (not an empty list) when there is no load balancer ingress
            "external_ip": [entry.ip for entry in ingress] if ingress else None,
        }

    @staticmethod
    def _api_error_reason(error) -> str:
        """
        Extract the lower-cased ``reason`` field from an ApiException body

        :param: error: ApiException raised by the Kubernetes client

        :return: The reason in lower case, or an empty string when the body
                 is missing, is not a JSON object or has no reason. It never
                 raises, so callers can always re-raise the original error.
        """
        try:
            info = json.loads(error.body)
        except (TypeError, ValueError):
            # body is None or not valid JSON
            return ""
        if not isinstance(info, dict):
            return ""
        return str(info.get("reason") or "").lower()

    def get_services(
        self,
        field_selector: typing.Optional[str] = None,
        label_selector: typing.Optional[str] = None,
    ) -> typing.List[typing.Dict]:
        """
        Get Service list from all namespaces

        :param: field_selector: Kubernetes field selector for the services
        :param: label_selector: Kubernetes label selector for the services

        :return: List of the services matching the selectors specified
        :raises: ApiException: if the list call fails (logged, then re-raised)
        """
        kwargs = {}
        if field_selector:
            kwargs["field_selector"] = field_selector
        if label_selector:
            kwargs["label_selector"] = label_selector
        try:
            result = self.clients[CORE_CLIENT].list_service_for_all_namespaces(**kwargs)
        except ApiException as e:
            self.logger.error("Error calling get services: {}".format(e))
            raise
        return [self._service_to_dict(service) for service in result.items]

    def get_default_storage_class(self) -> typing.Optional[str]:
        """
        Default storage class

        :return: Returns the default storage class name, if exists.
                 If not, it returns the first storage class.
                 If there are not storage classes, returns None
        """
        storage_classes = self.clients[STORAGE_CLIENT].list_storage_class()
        default_sc_annotations = {
            "storageclass.kubernetes.io/is-default-class": "true",
            # Older clusters still use the beta annotation.
            "storageclass.beta.kubernetes.io/is-default-class": "true",
        }
        selected_sc = None
        for sc in storage_classes.items:
            if not selected_sc:
                # Fallback: first storage class, in case none is the default
                selected_sc = sc.metadata.name
            annotations = sc.metadata.annotations or {}
            if any(
                annotations.get(key) == value
                for key, value in default_sc_annotations.items()
            ):
                # Default storage class found: it wins over the fallback
                selected_sc = sc.metadata.name
                break
        return selected_sc

    def create_cluster_role(
        self,
        name: str,
        labels: Dict[str, str],
        namespace: str = "kube-system",
    ):
        """
        Create a cluster role allowing every verb on every resource

        :param: name:       Name of the cluster role
        :param: labels:     Labels for cluster role metadata
        :param: namespace:  Kubernetes namespace for cluster role metadata
                            Default: kube-system

        :raises: Exception: if a cluster role with that name already exists
        """
        rbac = self.clients[RBAC_CLIENT]
        cluster_roles = rbac.list_cluster_role(
            field_selector="metadata.name={}".format(name)
        )
        if cluster_roles.items:
            raise Exception("Role with metadata.name={} already exists".format(name))

        metadata = V1ObjectMeta(name=name, labels=labels, namespace=namespace)
        cluster_role = V1ClusterRole(
            metadata=metadata,
            rules=[
                V1PolicyRule(api_groups=["*"], resources=["*"], verbs=["*"]),
                V1PolicyRule(non_resource_ur_ls=["*"], verbs=["*"]),
            ],
        )
        rbac.create_cluster_role(cluster_role)

    async def create_role(
        self,
        name: str,
        labels: Dict[str, str],
        api_groups: list,
        resources: list,
        verbs: list,
        namespace: str,
    ):
        """
        Create a role with one PolicyRule

        :param: name:       Name of the namespaced Role
        :param: labels:     Labels for namespaced Role metadata
        :param: api_groups: List with api-groups allowed in the policy rule
        :param: resources:  List with resources allowed in the policy rule
        :param: verbs:      List with verbs allowed in the policy rule
        :param: namespace:  Kubernetes namespace for Role metadata

        :return: None
        :raises: Exception: if a role with that name already exists
        """
        rbac = self.clients[RBAC_CLIENT]
        roles = rbac.list_namespaced_role(
            namespace, field_selector="metadata.name={}".format(name)
        )
        if roles.items:
            raise Exception("Role with metadata.name={} already exists".format(name))

        metadata = V1ObjectMeta(name=name, labels=labels, namespace=namespace)
        role = V1Role(
            metadata=metadata,
            rules=[
                V1PolicyRule(api_groups=api_groups, resources=resources, verbs=verbs),
            ],
        )
        rbac.create_namespaced_role(namespace, role)

    def delete_cluster_role(self, name: str):
        """
        Delete a cluster role

        :param: name:       Name of the cluster role
        """
        self.clients[RBAC_CLIENT].delete_cluster_role(name)

    def _get_kubectl_version(self):
        """
        Return the version reported by ``VersionApi().get_code()``

        :return: String with the format "<major>.<minor>"
        """
        version = VersionApi().get_code()
        return "{}.{}".format(version.major, version.minor)

    def _need_to_create_new_secret(self):
        """
        Tell whether the service account token secret must be created here

        :return: True when the version from ``_get_kubectl_version`` is 1.24
                 or newer, False otherwise
        """
        # NOTE(review): LooseVersion comes from distutils, which is deprecated
        # (PEP 632); consider replacing it together with the file-level import.
        min_k8s_version = "1.24"
        current_k8s_version = self._get_kubectl_version()
        return LooseVersion(min_k8s_version) <= LooseVersion(current_k8s_version)

    def _get_secret_name(self, service_account_name: str):
        """
        Build a secret name for a service account

        :param: service_account_name: Name of the service account

        :return: "<service_account_name>-token-<5 random characters>"
        """
        random_alphanum = str(uuid.uuid4())[:5]
        return "{}-token-{}".format(service_account_name, random_alphanum)

    def _create_service_account_secret(
        self, service_account_name: str, namespace: str, secret_name: str
    ):
        """
        Create a secret for the service account. K8s version >= 1.24

        :param: service_account_name: Name of the service account
        :param: namespace:  Kubernetes namespace for service account metadata
        :param: secret_name: Name of the secret

        :raises: Exception: if a secret with that name already exists
        """
        v1_core = self.clients[CORE_CLIENT]
        secrets = v1_core.list_namespaced_secret(
            namespace, field_selector="metadata.name={}".format(secret_name)
        ).items
        if secrets:
            raise Exception(
                "Secret with metadata.name={} already exists".format(secret_name)
            )

        # The annotation ties the token secret to its service account
        annotations = {"kubernetes.io/service-account.name": service_account_name}
        metadata = V1ObjectMeta(
            name=secret_name, namespace=namespace, annotations=annotations
        )
        secret_type = "kubernetes.io/service-account-token"
        secret = V1Secret(metadata=metadata, type=secret_type)
        v1_core.create_namespaced_secret(namespace, secret)

    def _get_secret_reference_list(self, namespace: str, secret_name: str):
        """
        Return a secret reference list with one secret.
        K8s version >= 1.24

        :param: namespace:  Kubernetes namespace for service account metadata
        :param: secret_name: Name of the secret
        :rtype: list[V1SecretReference]
        """
        return [V1SecretReference(name=secret_name, namespace=namespace)]

    def create_service_account(
        self,
        name: str,
        labels: Dict[str, str],
        namespace: str = "kube-system",
    ):
        """
        Create a service account

        When ``_need_to_create_new_secret`` is true (K8s version >= 1.24), a
        token secret is also created and referenced from the service account.

        :param: name:       Name of the service account
        :param: labels:     Labels for service account metadata
        :param: namespace:  Kubernetes namespace for service account metadata
                            Default: kube-system

        :raises: Exception: if the service account already exists
        """
        v1_core = self.clients[CORE_CLIENT]
        service_accounts = v1_core.list_namespaced_service_account(
            namespace, field_selector="metadata.name={}".format(name)
        )
        if service_accounts.items:
            raise Exception(
                "Service account with metadata.name={} already exists".format(name)
            )

        metadata = V1ObjectMeta(name=name, labels=labels, namespace=namespace)

        if self._need_to_create_new_secret():
            secret_name = self._get_secret_name(name)
            secrets = self._get_secret_reference_list(namespace, secret_name)
            service_account = V1ServiceAccount(metadata=metadata, secrets=secrets)
            # The account is created first, then the secret that refers to it
            v1_core.create_namespaced_service_account(namespace, service_account)
            self._create_service_account_secret(name, namespace, secret_name)
        else:
            service_account = V1ServiceAccount(metadata=metadata)
            v1_core.create_namespaced_service_account(namespace, service_account)

    def delete_service_account(self, name: str, namespace: str = "kube-system"):
        """
        Delete a service account

        :param: name:       Name of the service account
        :param: namespace:  Kubernetes namespace for service account metadata
                            Default: kube-system
        """
        self.clients[CORE_CLIENT].delete_namespaced_service_account(name, namespace)

    def create_cluster_role_binding(
        self, name: str, labels: Dict[str, str], namespace: str = "kube-system"
    ):
        """
        Create a cluster role binding

        The same name is used for the binding, the cluster role it refers to
        and the service account it is bound to.

        :param: name:       Name of the cluster role
        :param: labels:     Labels for cluster role binding metadata
        :param: namespace:  Kubernetes namespace of the bound service account
                            Default: kube-system

        :raises: Exception: if a cluster role binding with that name exists
        """
        rbac = self.clients[RBAC_CLIENT]
        role_bindings = rbac.list_cluster_role_binding(
            field_selector="metadata.name={}".format(name)
        )
        if role_bindings.items:
            raise Exception("Generated rbac id already exists")

        role_binding = V1ClusterRoleBinding(
            metadata=V1ObjectMeta(name=name, labels=labels),
            role_ref=V1RoleRef(kind="ClusterRole", name=name, api_group=""),
            subjects=[V1Subject(kind="ServiceAccount", name=name, namespace=namespace)],
        )
        rbac.create_cluster_role_binding(role_binding)

    async def create_role_binding(
        self,
        name: str,
        role_name: str,
        sa_name: str,
        labels: Dict[str, str],
        namespace: str,
    ):
        """
        Create a namespaced role binding

        :param: name:       Name of the namespaced Role Binding
        :param: role_name:  Name of the namespaced Role to be bound
        :param: sa_name:    Name of the Service Account to be bound
        :param: labels:     Labels for Role Binding metadata
        :param: namespace:  Kubernetes namespace for Role Binding metadata

        :return: None
        :raises: Exception: if a role binding with that name already exists
        """
        rbac = self.clients[RBAC_CLIENT]
        role_bindings = rbac.list_namespaced_role_binding(
            namespace, field_selector="metadata.name={}".format(name)
        )
        if role_bindings.items:
            raise Exception(
                "Role Binding with metadata.name={} already exists".format(name)
            )

        role_binding = V1RoleBinding(
            metadata=V1ObjectMeta(name=name, labels=labels),
            role_ref=V1RoleRef(kind="Role", name=role_name, api_group=""),
            subjects=[
                V1Subject(kind="ServiceAccount", name=sa_name, namespace=namespace)
            ],
        )
        rbac.create_namespaced_role_binding(namespace, role_binding)

    def delete_cluster_role_binding(self, name: str):
        """
        Delete a cluster role binding

        :param: name:       Name of the cluster role binding
        """
        self.clients[RBAC_CLIENT].delete_cluster_role_binding(name)

    @retry(
        attempts=10,
        delay=1,
        fallback=Exception("Failed getting the secret from service account"),
        callback=retry_callback,
    )
    async def get_secret_data(
        self, name: str, namespace: str = "kube-system"
    ) -> typing.Tuple[str, str]:
        """
        Get the token and CA certificate of a service account

        :param: name:       Name of the service account owning the secret
        :param: namespace:  Name of the namespace where the secret is stored

        :return: Tuple with the token and client certificate, both decoded
                 from base64 into utf-8 strings
        :raises: Exception: if the service account does not exist or has no
                            secret attached
        """
        v1_core = self.clients[CORE_CLIENT]

        service_accounts = v1_core.list_namespaced_service_account(
            namespace, field_selector="metadata.name={}".format(name)
        )
        if not service_accounts.items:
            raise Exception(
                "Service account not found with metadata.name={}".format(name)
            )

        service_account = service_accounts.items[0]
        secret_name = None
        if service_account.secrets:
            secret_name = service_account.secrets[0].name
        if not secret_name:
            raise Exception(
                "Failed getting the secret from service account {}".format(name)
            )

        # TODO: refactor to use get_secret_content
        secret = v1_core.list_namespaced_secret(
            namespace, field_selector="metadata.name={}".format(secret_name)
        ).items[0]

        token = secret.data[SERVICE_ACCOUNT_TOKEN_KEY]
        client_certificate_data = secret.data[SERVICE_ACCOUNT_ROOT_CA_KEY]

        return (
            base64.b64decode(token).decode("utf-8"),
            base64.b64decode(client_certificate_data).decode("utf-8"),
        )

    @retry(
        attempts=10,
        delay=1,
        fallback=Exception("Failed getting data from the secret"),
    )
    async def get_secret_content(
        self,
        name: str,
        namespace: str,
    ) -> dict:
        """
        Get secret data

        :param: name:       Name of the secret
        :param: namespace:  Name of the namespace where the secret is stored

        :return: Dictionary with secret's data
        """
        v1_core = self.clients[CORE_CLIENT]
        secret = v1_core.read_namespaced_secret(name, namespace)
        return secret.data

    @retry(
        attempts=10,
        delay=1,
        fallback=Exception("Failed creating the secret"),
    )
    async def create_secret(
        self, name: str, data: dict, namespace: str, secret_type: str
    ):
        """
        Create a secret

        :param: name:        Name of the secret
        :param: data:        Dict with data content. Values must be already
                             base64 encoded
        :param: namespace:   Name of the namespace where the secret will be stored
        :param: secret_type: Type of the secret, e.g., Opaque,
                             kubernetes.io/service-account-token, kubernetes.io/tls

        :return: None
        """
        v1_core = self.clients[CORE_CLIENT]
        metadata = V1ObjectMeta(name=name, namespace=namespace)
        secret = V1Secret(metadata=metadata, data=data, type=secret_type)
        v1_core.create_namespaced_secret(namespace, secret)

    async def create_certificate(
        self,
        namespace: str,
        name: str,
        dns_prefix: str,
        secret_name: str,
        usages: list,
        issuer_name: str,
    ):
        """
        Creates cert-manager certificate object

        An already existing certificate is not an error: a warning is logged.

        :param: namespace:   Name of the namespace where the certificate and
                             secret is stored
        :param: name:        Name of the certificate object
        :param: dns_prefix:  Prefix for the dnsNames. They will be prefixed to
                             the common k8s svc suffixes
        :param: secret_name: Name of the secret created by cert-manager
        :param: usages:      List of X.509 key usages
        :param: issuer_name: Name of the cert-manager issuer; the issuerRef
                             kind is always set to ClusterIssuer

        :raises: ApiException: on any API error other than "AlreadyExists"
        """
        certificate_body = {
            "apiVersion": "cert-manager.io/v1",
            "kind": "Certificate",
            "metadata": {"name": name, "namespace": namespace},
            "spec": {
                "secretName": secret_name,
                "privateKey": {
                    "rotationPolicy": "Always",
                    "algorithm": "ECDSA",
                    "size": 256,
                },
                "duration": "8760h",  # 365 days
                "renewBefore": "2208h",  # 92 days before the end of duration
                "subject": {"organizations": ["osm"]},
                "commonName": "osm",
                "isCA": False,
                "usages": usages,
                "dnsNames": [
                    "{}.{}".format(dns_prefix, namespace),
                    "{}.{}.svc".format(dns_prefix, namespace),
                    "{}.{}.svc.cluster".format(dns_prefix, namespace),
                    "{}.{}.svc.cluster.local".format(dns_prefix, namespace),
                ],
                "issuerRef": {"name": issuer_name, "kind": "ClusterIssuer"},
            },
        }
        custom_objects = self.clients[CUSTOM_OBJECT_CLIENT]
        try:
            custom_objects.create_namespaced_custom_object(
                group="cert-manager.io",
                plural="certificates",
                version="v1",
                body=certificate_body,
                namespace=namespace,
            )
        except ApiException as e:
            if self._api_error_reason(e) == "alreadyexists":
                self.logger.warning("Certificate already exists: {}".format(e))
            else:
                raise

    async def delete_certificate(self, namespace, object_name):
        """
        Deletes cert-manager certificate object

        A missing certificate is not an error: a warning is logged.

        :param: namespace:   Name of the namespace where the certificate is stored
        :param: object_name: Name of the certificate object

        :raises: ApiException: on any API error other than "NotFound"
        """
        custom_objects = self.clients[CUSTOM_OBJECT_CLIENT]
        try:
            custom_objects.delete_namespaced_custom_object(
                group="cert-manager.io",
                plural="certificates",
                version="v1",
                name=object_name,
                namespace=namespace,
            )
        except ApiException as e:
            if self._api_error_reason(e) == "notfound":
                self.logger.warning("Certificate already deleted: {}".format(e))
            else:
                raise

    @retry(
        attempts=10,
        delay=1,
        fallback=Exception("Failed creating the namespace"),
    )
    async def create_namespace(self, name: str, labels: dict = None):
        """
        Create a namespace

        An already existing namespace is not an error: a warning is logged.

        :param: name:       Name of the namespace to be created
        :param: labels:     Dictionary with labels for the new namespace

        :raises: ApiException: on any API error other than "AlreadyExists"
        """
        v1_core = self.clients[CORE_CLIENT]
        metadata = V1ObjectMeta(name=name, labels=labels)
        namespace = V1Namespace(
            metadata=metadata,
        )

        try:
            v1_core.create_namespace(namespace)
            self.logger.debug("Namespace created: {}".format(name))
        except ApiException as e:
            if self._api_error_reason(e) == "alreadyexists":
                self.logger.warning("Namespace already exists: {}".format(e))
            else:
                raise

    @retry(
        attempts=10,
        delay=1,
        fallback=Exception("Failed deleting the namespace"),
    )
    async def delete_namespace(self, name: str):
        """
        Delete a namespace

        A missing namespace is not an error: a warning is logged.

        :param: name:       Name of the namespace to be deleted

        :raises: ApiException: on any API error other than "Not Found"
        """
        try:
            self.clients[CORE_CLIENT].delete_namespace(name)
        except ApiException as e:
            if e.reason == "Not Found":
                self.logger.warning("Namespace already deleted: {}".format(e))
            else:
                # Any other failure must not be hidden from the caller
                raise