Code Coverage

Cobertura Coverage Report > n2vc >

kubectl.py

Trend

File Coverage summary

NameClassesLinesConditionals
kubectl.py
100%
1/1
78%
148/189
100%
0/0

Coverage Breakdown by Class

NameLinesConditionals
kubectl.py
78%
148/189
N/A

Source

n2vc/kubectl.py
1 # Copyright 2020 Canonical Ltd.
2 #
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
6 #
7 #     http://www.apache.org/licenses/LICENSE-2.0
8 #
9 #     Unless required by applicable law or agreed to in writing, software
10 #     distributed under the License is distributed on an "AS IS" BASIS,
11 #     WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 #     See the License for the specific language governing permissions and
13 #     limitations under the License.
14
15 1 import base64
16 1 import logging
17 1 from typing import Dict
18 1 import typing
19 1 import uuid
20 1 import json
21
22 1 from distutils.version import LooseVersion
23
24 1 from kubernetes import client, config
25 1 from kubernetes.client.api import VersionApi
26 1 from kubernetes.client.models import (
27     V1ClusterRole,
28     V1Role,
29     V1ObjectMeta,
30     V1PolicyRule,
31     V1ServiceAccount,
32     V1ClusterRoleBinding,
33     V1RoleBinding,
34     V1RoleRef,
35     V1Subject,
36     V1Secret,
37     V1SecretReference,
38     V1Namespace,
39 )
40 1 from kubernetes.client.rest import ApiException
41 1 from n2vc.libjuju import retry_callback
42 1 from retrying_async import retry
43
44
45 1 SERVICE_ACCOUNT_TOKEN_KEY = "token"
46 1 SERVICE_ACCOUNT_ROOT_CA_KEY = "ca.crt"
47 # clients
48 1 CORE_CLIENT = "core_v1"
49 1 RBAC_CLIENT = "rbac_v1"
50 1 STORAGE_CLIENT = "storage_v1"
51 1 CUSTOM_OBJECT_CLIENT = "custom_object"
52
53
54 1 class Kubectl:
55 1     def __init__(self, config_file=None):
56 1         config.load_kube_config(config_file=config_file)
57 1         self._clients = {
58             CORE_CLIENT: client.CoreV1Api(),
59             RBAC_CLIENT: client.RbacAuthorizationV1Api(),
60             STORAGE_CLIENT: client.StorageV1Api(),
61             CUSTOM_OBJECT_CLIENT: client.CustomObjectsApi(),
62         }
63 1         self._configuration = config.kube_config.Configuration.get_default_copy()
64 1         self.logger = logging.getLogger("Kubectl")
65
66 1     @property
67 1     def configuration(self):
68 1         return self._configuration
69
70 1     @property
71 1     def clients(self):
72 1         return self._clients
73
74 1     def get_services(
75         self,
76         field_selector: str = None,
77         label_selector: str = None,
78     ) -> typing.List[typing.Dict]:
79         """
80         Get Service list from a namespace
81
82         :param: field_selector:     Kubernetes field selector for the namespace
83         :param: label_selector:     Kubernetes label selector for the namespace
84
85         :return: List of the services matching the selectors specified
86         """
87 1         kwargs = {}
88 1         if field_selector:
89 1             kwargs["field_selector"] = field_selector
90 1         if label_selector:
91 1             kwargs["label_selector"] = label_selector
92 1         try:
93 1             result = self.clients[CORE_CLIENT].list_service_for_all_namespaces(**kwargs)
94 1             return [
95                 {
96                     "name": i.metadata.name,
97                     "cluster_ip": i.spec.cluster_ip,
98                     "type": i.spec.type,
99                     "ports": [
100                         {
101                             "name": p.name,
102                             "node_port": p.node_port,
103                             "port": p.port,
104                             "protocol": p.protocol,
105                             "target_port": p.target_port,
106                         }
107                         for p in i.spec.ports
108                     ]
109                     if i.spec.ports
110                     else [],
111                     "external_ip": [i.ip for i in i.status.load_balancer.ingress]
112                     if i.status.load_balancer.ingress
113                     else None,
114                 }
115                 for i in result.items
116             ]
117 1         except ApiException as e:
118 1             self.logger.error("Error calling get services: {}".format(e))
119 1             raise e
120
121 1     def get_default_storage_class(self) -> str:
122         """
123         Default storage class
124
125         :return:    Returns the default storage class name, if exists.
126                     If not, it returns the first storage class.
127                     If there are not storage classes, returns None
128         """
129 1         storage_classes = self.clients[STORAGE_CLIENT].list_storage_class()
130 1         selected_sc = None
131 1         default_sc_annotations = {
132             "storageclass.kubernetes.io/is-default-class": "true",
133             # Older clusters still use the beta annotation.
134             "storageclass.beta.kubernetes.io/is-default-class": "true",
135         }
136 1         for sc in storage_classes.items:
137 1             if not selected_sc:
138                 # Select the first storage class in case there is no a default-class
139 1                 selected_sc = sc.metadata.name
140 1             annotations = sc.metadata.annotations or {}
141 1             if any(
142                 k in annotations and annotations[k] == v
143                 for k, v in default_sc_annotations.items()
144             ):
145                 # Default storage
146 1                 selected_sc = sc.metadata.name
147 1                 break
148 1         return selected_sc
149
150 1     def create_cluster_role(
151         self,
152         name: str,
153         labels: Dict[str, str],
154         namespace: str = "kube-system",
155     ):
156         """
157         Create a cluster role
158
159         :param: name:       Name of the cluster role
160         :param: labels:     Labels for cluster role metadata
161         :param: namespace:  Kubernetes namespace for cluster role metadata
162                             Default: kube-system
163         """
164 0         cluster_roles = self.clients[RBAC_CLIENT].list_cluster_role(
165             field_selector="metadata.name={}".format(name)
166         )
167
168 0         if len(cluster_roles.items) > 0:
169 0             raise Exception("Role with metadata.name={} already exists".format(name))
170
171 0         metadata = V1ObjectMeta(name=name, labels=labels, namespace=namespace)
172         # Cluster role
173 0         cluster_role = V1ClusterRole(
174             metadata=metadata,
175             rules=[
176                 V1PolicyRule(api_groups=["*"], resources=["*"], verbs=["*"]),
177                 V1PolicyRule(non_resource_ur_ls=["*"], verbs=["*"]),
178             ],
179         )
180
181 0         self.clients[RBAC_CLIENT].create_cluster_role(cluster_role)
182
183 1     async def create_role(
184         self,
185         name: str,
186         labels: Dict[str, str],
187         api_groups: list,
188         resources: list,
189         verbs: list,
190         namespace: str,
191     ):
192         """
193         Create a role with one PolicyRule
194
195         :param: name:       Name of the namespaced Role
196         :param: labels:     Labels for namespaced Role metadata
197         :param: api_groups: List with api-groups allowed in the policy rule
198         :param: resources:  List with resources allowed in the policy rule
199         :param: verbs:      List with verbs allowed in the policy rule
200         :param: namespace:  Kubernetes namespace for Role metadata
201
202         :return: None
203         """
204
205 1         roles = self.clients[RBAC_CLIENT].list_namespaced_role(
206             namespace, field_selector="metadata.name={}".format(name)
207         )
208
209 1         if len(roles.items) > 0:
210 1             raise Exception("Role with metadata.name={} already exists".format(name))
211
212 0         metadata = V1ObjectMeta(name=name, labels=labels, namespace=namespace)
213
214 0         role = V1Role(
215             metadata=metadata,
216             rules=[
217                 V1PolicyRule(api_groups=api_groups, resources=resources, verbs=verbs),
218             ],
219         )
220
221 0         self.clients[RBAC_CLIENT].create_namespaced_role(namespace, role)
222
223 1     def delete_cluster_role(self, name: str):
224         """
225         Delete a cluster role
226
227         :param: name:       Name of the cluster role
228         """
229 0         self.clients[RBAC_CLIENT].delete_cluster_role(name)
230
231 1     def _get_kubectl_version(self):
232 1         version = VersionApi().get_code()
233 1         return "{}.{}".format(version.major, version.minor)
234
235 1     def _need_to_create_new_secret(self):
236 1         min_k8s_version = "1.24"
237 1         current_k8s_version = self._get_kubectl_version()
238 1         return LooseVersion(min_k8s_version) <= LooseVersion(current_k8s_version)
239
240 1     def _get_secret_name(self, service_account_name: str):
241 1         random_alphanum = str(uuid.uuid4())[:5]
242 1         return "{}-token-{}".format(service_account_name, random_alphanum)
243
244 1     def _create_service_account_secret(
245         self, service_account_name: str, namespace: str, secret_name: str
246     ):
247         """
248         Create a secret for the service account. K8s version >= 1.24
249
250         :param: service_account_name: Name of the service account
251         :param: namespace:  Kubernetes namespace for service account metadata
252         :param: secret_name: Name of the secret
253         """
254 1         v1_core = self.clients[CORE_CLIENT]
255 1         secrets = v1_core.list_namespaced_secret(
256             namespace, field_selector="metadata.name={}".format(secret_name)
257         ).items
258
259 1         if len(secrets) > 0:
260 1             raise Exception(
261                 "Secret with metadata.name={} already exists".format(secret_name)
262             )
263
264 1         annotations = {"kubernetes.io/service-account.name": service_account_name}
265 1         metadata = V1ObjectMeta(
266             name=secret_name, namespace=namespace, annotations=annotations
267         )
268 1         type = "kubernetes.io/service-account-token"
269 1         secret = V1Secret(metadata=metadata, type=type)
270 1         v1_core.create_namespaced_secret(namespace, secret)
271
272 1     def _get_secret_reference_list(self, namespace: str, secret_name: str):
273         """
274         Return a secret reference list with one secret.
275         K8s version >= 1.24
276
277         :param: namespace:  Kubernetes namespace for service account metadata
278         :param: secret_name: Name of the secret
279         :rtype: list[V1SecretReference]
280         """
281 1         return [V1SecretReference(name=secret_name, namespace=namespace)]
282
283 1     def create_service_account(
284         self,
285         name: str,
286         labels: Dict[str, str],
287         namespace: str = "kube-system",
288     ):
289         """
290         Create a service account
291
292         :param: name:       Name of the service account
293         :param: labels:     Labels for service account metadata
294         :param: namespace:  Kubernetes namespace for service account metadata
295                             Default: kube-system
296         """
297 1         v1_core = self.clients[CORE_CLIENT]
298 1         service_accounts = v1_core.list_namespaced_service_account(
299             namespace, field_selector="metadata.name={}".format(name)
300         )
301 1         if len(service_accounts.items) > 0:
302 1             raise Exception(
303                 "Service account with metadata.name={} already exists".format(name)
304             )
305
306 1         metadata = V1ObjectMeta(name=name, labels=labels, namespace=namespace)
307
308 1         if self._need_to_create_new_secret():
309 1             secret_name = self._get_secret_name(name)
310 1             secrets = self._get_secret_reference_list(namespace, secret_name)
311 1             service_account = V1ServiceAccount(metadata=metadata, secrets=secrets)
312 1             v1_core.create_namespaced_service_account(namespace, service_account)
313 1             self._create_service_account_secret(name, namespace, secret_name)
314         else:
315 1             service_account = V1ServiceAccount(metadata=metadata)
316 1             v1_core.create_namespaced_service_account(namespace, service_account)
317
318 1     def delete_service_account(self, name: str, namespace: str = "kube-system"):
319         """
320         Delete a service account
321
322         :param: name:       Name of the service account
323         :param: namespace:  Kubernetes namespace for service account metadata
324                             Default: kube-system
325         """
326 0         self.clients[CORE_CLIENT].delete_namespaced_service_account(name, namespace)
327
328 1     def create_cluster_role_binding(
329         self, name: str, labels: Dict[str, str], namespace: str = "kube-system"
330     ):
331         """
332         Create a cluster role binding
333
334         :param: name:       Name of the cluster role
335         :param: labels:     Labels for cluster role binding metadata
336         :param: namespace:  Kubernetes namespace for cluster role binding metadata
337                             Default: kube-system
338         """
339 0         role_bindings = self.clients[RBAC_CLIENT].list_cluster_role_binding(
340             field_selector="metadata.name={}".format(name)
341         )
342 0         if len(role_bindings.items) > 0:
343 0             raise Exception("Generated rbac id already exists")
344
345 0         role_binding = V1ClusterRoleBinding(
346             metadata=V1ObjectMeta(name=name, labels=labels),
347             role_ref=V1RoleRef(kind="ClusterRole", name=name, api_group=""),
348             subjects=[V1Subject(kind="ServiceAccount", name=name, namespace=namespace)],
349         )
350 0         self.clients[RBAC_CLIENT].create_cluster_role_binding(role_binding)
351
352 1     async def create_role_binding(
353         self,
354         name: str,
355         role_name: str,
356         sa_name: str,
357         labels: Dict[str, str],
358         namespace: str,
359     ):
360         """
361         Create a cluster role binding
362
363         :param: name:       Name of the namespaced Role Binding
364         :param: role_name:  Name of the namespaced Role to be bound
365         :param: sa_name:    Name of the Service Account to be bound
366         :param: labels:     Labels for Role Binding metadata
367         :param: namespace:  Kubernetes namespace for Role Binding metadata
368
369         :return: None
370         """
371 1         role_bindings = self.clients[RBAC_CLIENT].list_namespaced_role_binding(
372             namespace, field_selector="metadata.name={}".format(name)
373         )
374 1         if len(role_bindings.items) > 0:
375 1             raise Exception(
376                 "Role Binding with metadata.name={} already exists".format(name)
377             )
378
379 0         role_binding = V1RoleBinding(
380             metadata=V1ObjectMeta(name=name, labels=labels),
381             role_ref=V1RoleRef(kind="Role", name=role_name, api_group=""),
382             subjects=[
383                 V1Subject(kind="ServiceAccount", name=sa_name, namespace=namespace)
384             ],
385         )
386 0         self.clients[RBAC_CLIENT].create_namespaced_role_binding(
387             namespace, role_binding
388         )
389
390 1     def delete_cluster_role_binding(self, name: str):
391         """
392         Delete a cluster role binding
393
394         :param: name:       Name of the cluster role binding
395         """
396 0         self.clients[RBAC_CLIENT].delete_cluster_role_binding(name)
397
398 1     @retry(
399         attempts=10,
400         delay=1,
401         fallback=Exception("Failed getting the secret from service account"),
402         callback=retry_callback,
403     )
404 1     async def get_secret_data(
405         self, name: str, namespace: str = "kube-system"
406     ) -> (str, str):
407         """
408         Get secret data
409
410         :param: name:       Name of the secret data
411         :param: namespace:  Name of the namespace where the secret is stored
412
413         :return: Tuple with the token and client certificate
414         """
415 0         v1_core = self.clients[CORE_CLIENT]
416
417 0         secret_name = None
418
419 0         service_accounts = v1_core.list_namespaced_service_account(
420             namespace, field_selector="metadata.name={}".format(name)
421         )
422 0         if len(service_accounts.items) == 0:
423 0             raise Exception(
424                 "Service account not found with metadata.name={}".format(name)
425             )
426 0         service_account = service_accounts.items[0]
427 0         if service_account.secrets and len(service_account.secrets) > 0:
428 0             secret_name = service_account.secrets[0].name
429 0         if not secret_name:
430 0             raise Exception(
431                 "Failed getting the secret from service account {}".format(name)
432             )
433         # TODO: refactor to use get_secret_content
434 0         secret = v1_core.list_namespaced_secret(
435             namespace, field_selector="metadata.name={}".format(secret_name)
436         ).items[0]
437
438 0         token = secret.data[SERVICE_ACCOUNT_TOKEN_KEY]
439 0         client_certificate_data = secret.data[SERVICE_ACCOUNT_ROOT_CA_KEY]
440
441 0         return (
442             base64.b64decode(token).decode("utf-8"),
443             base64.b64decode(client_certificate_data).decode("utf-8"),
444         )
445
446 1     @retry(
447         attempts=10,
448         delay=1,
449         fallback=Exception("Failed getting data from the secret"),
450     )
451 1     async def get_secret_content(
452         self,
453         name: str,
454         namespace: str,
455     ) -> dict:
456         """
457         Get secret data
458
459         :param: name:       Name of the secret
460         :param: namespace:  Name of the namespace where the secret is stored
461
462         :return: Dictionary with secret's data
463         """
464 1         v1_core = self.clients[CORE_CLIENT]
465
466 1         secret = v1_core.read_namespaced_secret(name, namespace)
467
468 1         return secret.data
469
470 1     @retry(
471         attempts=10,
472         delay=1,
473         fallback=Exception("Failed creating the secret"),
474     )
475 1     async def create_secret(
476         self, name: str, data: dict, namespace: str, secret_type: str
477     ):
478         """
479         Get secret data
480
481         :param: name:        Name of the secret
482         :param: data:        Dict with data content. Values must be already base64 encoded
483         :param: namespace:   Name of the namespace where the secret will be stored
484         :param: secret_type: Type of the secret, e.g., Opaque, kubernetes.io/service-account-token, kubernetes.io/tls
485
486         :return: None
487         """
488 0         v1_core = self.clients[CORE_CLIENT]
489 0         metadata = V1ObjectMeta(name=name, namespace=namespace)
490 0         secret = V1Secret(metadata=metadata, data=data, type=secret_type)
491 0         v1_core.create_namespaced_secret(namespace, secret)
492
493 1     async def create_certificate(
494         self,
495         namespace: str,
496         name: str,
497         dns_prefix: str,
498         secret_name: str,
499         usages: list,
500         issuer_name: str,
501     ):
502         """
503         Creates cert-manager certificate object
504
505         :param: namespace:       Name of the namespace where the certificate and secret is stored
506         :param: name:            Name of the certificate object
507         :param: dns_prefix:      Prefix for the dnsNames. They will be prefixed to the common k8s svc suffixes
508         :param: secret_name:     Name of the secret created by cert-manager
509         :param: usages:          List of X.509 key usages
510         :param: issuer_name:     Name of the cert-manager's Issuer or ClusterIssuer object
511
512         """
513 1         certificate_body = {
514             "apiVersion": "cert-manager.io/v1",
515             "kind": "Certificate",
516             "metadata": {"name": name, "namespace": namespace},
517             "spec": {
518                 "secretName": secret_name,
519                 "privateKey": {
520                     "rotationPolicy": "Always",
521                     "algorithm": "ECDSA",
522                     "size": 256,
523                 },
524                 "duration": "8760h",  # 1 Year
525                 "renewBefore": "2208h",  # 9 months
526                 "subject": {"organizations": ["osm"]},
527                 "commonName": "osm",
528                 "isCA": False,
529                 "usages": usages,
530                 "dnsNames": [
531                     "{}.{}".format(dns_prefix, namespace),
532                     "{}.{}.svc".format(dns_prefix, namespace),
533                     "{}.{}.svc.cluster".format(dns_prefix, namespace),
534                     "{}.{}.svc.cluster.local".format(dns_prefix, namespace),
535                 ],
536                 "issuerRef": {"name": issuer_name, "kind": "ClusterIssuer"},
537             },
538         }
539 1         client = self.clients[CUSTOM_OBJECT_CLIENT]
540 1         try:
541 1             client.create_namespaced_custom_object(
542                 group="cert-manager.io",
543                 plural="certificates",
544                 version="v1",
545                 body=certificate_body,
546                 namespace=namespace,
547             )
548 1         except ApiException as e:
549 1             info = json.loads(e.body)
550 1             if info.get("reason").lower() == "alreadyexists":
551 1                 self.logger.warning("Certificate already exists: {}".format(e))
552             else:
553 0                 raise e
554
555 1     async def delete_certificate(self, namespace, object_name):
556 1         client = self.clients[CUSTOM_OBJECT_CLIENT]
557 1         try:
558 1             client.delete_namespaced_custom_object(
559                 group="cert-manager.io",
560                 plural="certificates",
561                 version="v1",
562                 name=object_name,
563                 namespace=namespace,
564             )
565 1         except ApiException as e:
566 1             info = json.loads(e.body)
567 1             if info.get("reason").lower() == "notfound":
568 1                 self.logger.warning("Certificate already deleted: {}".format(e))
569             else:
570 0                 raise e
571
572 1     @retry(
573         attempts=10,
574         delay=1,
575         fallback=Exception("Failed creating the namespace"),
576     )
577 1     async def create_namespace(self, name: str, labels: dict = None):
578         """
579         Create a namespace
580
581         :param: name:       Name of the namespace to be created
582         :param: labels:     Dictionary with labels for the new namespace
583
584         """
585 1         v1_core = self.clients[CORE_CLIENT]
586 1         metadata = V1ObjectMeta(name=name, labels=labels)
587 1         namespace = V1Namespace(
588             metadata=metadata,
589         )
590
591 1         try:
592 1             v1_core.create_namespace(namespace)
593 1             self.logger.debug("Namespace created: {}".format(name))
594 1         except ApiException as e:
595 1             info = json.loads(e.body)
596 1             if info.get("reason").lower() == "alreadyexists":
597 1                 self.logger.warning("Namespace already exists: {}".format(e))
598             else:
599 0                 raise e
600
601 1     @retry(
602         attempts=10,
603         delay=1,
604         fallback=Exception("Failed deleting the namespace"),
605     )
606 1     async def delete_namespace(self, name: str):
607         """
608         Delete a namespace
609
610         :param: name:       Name of the namespace to be deleted
611
612         """
613 1         try:
614 1             self.clients[CORE_CLIENT].delete_namespace(name)
615 1         except ApiException as e:
616 1             if e.reason == "Not Found":
617 0                 self.logger.warning("Namespace already deleted: {}".format(e))