Code Coverage

Cobertura Coverage Report > n2vc >

kubectl.py

Trend

File Coverage summary

NameClassesLinesConditionals
kubectl.py
100%
1/1
79%
114/144
100%
0/0

Coverage Breakdown by Class

NameLinesConditionals
kubectl.py
79%
114/144
N/A

Source

n2vc/kubectl.py
1 # Copyright 2020 Canonical Ltd.
2 #
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
6 #
7 #     http://www.apache.org/licenses/LICENSE-2.0
8 #
9 #     Unless required by applicable law or agreed to in writing, software
10 #     distributed under the License is distributed on an "AS IS" BASIS,
11 #     WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 #     See the License for the specific language governing permissions and
13 #     limitations under the License.
14
15 1 import base64
16 1 import logging
17 1 from typing import Dict
18 1 import typing
19 1 import uuid
20 1 import json
21
22 1 from distutils.version import LooseVersion
23
24 1 from kubernetes import client, config
25 1 from kubernetes.client.api import VersionApi
26 1 from kubernetes.client.models import (
27     V1ClusterRole,
28     V1ObjectMeta,
29     V1PolicyRule,
30     V1ServiceAccount,
31     V1ClusterRoleBinding,
32     V1RoleRef,
33     V1Subject,
34     V1Secret,
35     V1SecretReference,
36 )
37 1 from kubernetes.client.rest import ApiException
38 1 from retrying_async import retry
39
40
41 1 SERVICE_ACCOUNT_TOKEN_KEY = "token"
42 1 SERVICE_ACCOUNT_ROOT_CA_KEY = "ca.crt"
43 # clients
44 1 CORE_CLIENT = "core_v1"
45 1 RBAC_CLIENT = "rbac_v1"
46 1 STORAGE_CLIENT = "storage_v1"
47 1 CUSTOM_OBJECT_CLIENT = "custom_object"
48
49
50 1 class Kubectl:
51 1     def __init__(self, config_file=None):
52 1         config.load_kube_config(config_file=config_file)
53 1         self._clients = {
54             CORE_CLIENT: client.CoreV1Api(),
55             RBAC_CLIENT: client.RbacAuthorizationV1Api(),
56             STORAGE_CLIENT: client.StorageV1Api(),
57             CUSTOM_OBJECT_CLIENT: client.CustomObjectsApi(),
58         }
59 1         self._configuration = config.kube_config.Configuration.get_default_copy()
60 1         self.logger = logging.getLogger("Kubectl")
61
62 1     @property
63 1     def configuration(self):
64 1         return self._configuration
65
66 1     @property
67 1     def clients(self):
68 1         return self._clients
69
70 1     def get_services(
71         self,
72         field_selector: str = None,
73         label_selector: str = None,
74     ) -> typing.List[typing.Dict]:
75         """
76         Get Service list from a namespace
77
78         :param: field_selector:     Kubernetes field selector for the namespace
79         :param: label_selector:     Kubernetes label selector for the namespace
80
81         :return: List of the services matching the selectors specified
82         """
83 1         kwargs = {}
84 1         if field_selector:
85 1             kwargs["field_selector"] = field_selector
86 1         if label_selector:
87 1             kwargs["label_selector"] = label_selector
88 1         try:
89 1             result = self.clients[CORE_CLIENT].list_service_for_all_namespaces(**kwargs)
90 1             return [
91                 {
92                     "name": i.metadata.name,
93                     "cluster_ip": i.spec.cluster_ip,
94                     "type": i.spec.type,
95                     "ports": [
96                         {
97                             "name": p.name,
98                             "node_port": p.node_port,
99                             "port": p.port,
100                             "protocol": p.protocol,
101                             "target_port": p.target_port,
102                         }
103                         for p in i.spec.ports
104                     ]
105                     if i.spec.ports
106                     else [],
107                     "external_ip": [i.ip for i in i.status.load_balancer.ingress]
108                     if i.status.load_balancer.ingress
109                     else None,
110                 }
111                 for i in result.items
112             ]
113 1         except ApiException as e:
114 1             self.logger.error("Error calling get services: {}".format(e))
115 1             raise e
116
117 1     def get_default_storage_class(self) -> str:
118         """
119         Default storage class
120
121         :return:    Returns the default storage class name, if exists.
122                     If not, it returns the first storage class.
123                     If there are not storage classes, returns None
124         """
125 1         storage_classes = self.clients[STORAGE_CLIENT].list_storage_class()
126 1         selected_sc = None
127 1         default_sc_annotations = {
128             "storageclass.kubernetes.io/is-default-class": "true",
129             # Older clusters still use the beta annotation.
130             "storageclass.beta.kubernetes.io/is-default-class": "true",
131         }
132 1         for sc in storage_classes.items:
133 1             if not selected_sc:
134                 # Select the first storage class in case there is no a default-class
135 1                 selected_sc = sc.metadata.name
136 1             annotations = sc.metadata.annotations or {}
137 1             if any(
138                 k in annotations and annotations[k] == v
139                 for k, v in default_sc_annotations.items()
140             ):
141                 # Default storage
142 1                 selected_sc = sc.metadata.name
143 1                 break
144 1         return selected_sc
145
146 1     def create_cluster_role(
147         self,
148         name: str,
149         labels: Dict[str, str],
150         namespace: str = "kube-system",
151     ):
152         """
153         Create a cluster role
154
155         :param: name:       Name of the cluster role
156         :param: labels:     Labels for cluster role metadata
157         :param: namespace:  Kubernetes namespace for cluster role metadata
158                             Default: kube-system
159         """
160 0         cluster_roles = self.clients[RBAC_CLIENT].list_cluster_role(
161             field_selector="metadata.name={}".format(name)
162         )
163
164 0         if len(cluster_roles.items) > 0:
165 0             raise Exception(
166                 "Cluster role with metadata.name={} already exists".format(name)
167             )
168
169 0         metadata = V1ObjectMeta(name=name, labels=labels, namespace=namespace)
170         # Cluster role
171 0         cluster_role = V1ClusterRole(
172             metadata=metadata,
173             rules=[
174                 V1PolicyRule(api_groups=["*"], resources=["*"], verbs=["*"]),
175                 V1PolicyRule(non_resource_ur_ls=["*"], verbs=["*"]),
176             ],
177         )
178
179 0         self.clients[RBAC_CLIENT].create_cluster_role(cluster_role)
180
181 1     def delete_cluster_role(self, name: str):
182         """
183         Delete a cluster role
184
185         :param: name:       Name of the cluster role
186         """
187 0         self.clients[RBAC_CLIENT].delete_cluster_role(name)
188
189 1     def _get_kubectl_version(self):
190 1         version = VersionApi().get_code()
191 1         return "{}.{}".format(version.major, version.minor)
192
193 1     def _need_to_create_new_secret(self):
194 1         min_k8s_version = "1.24"
195 1         current_k8s_version = self._get_kubectl_version()
196 1         return LooseVersion(min_k8s_version) <= LooseVersion(current_k8s_version)
197
198 1     def _get_secret_name(self, service_account_name: str):
199 1         random_alphanum = str(uuid.uuid4())[:5]
200 1         return "{}-token-{}".format(service_account_name, random_alphanum)
201
202 1     def _create_service_account_secret(
203         self, service_account_name: str, namespace: str, secret_name: str
204     ):
205         """
206         Create a secret for the service account. K8s version >= 1.24
207
208         :param: service_account_name: Name of the service account
209         :param: namespace:  Kubernetes namespace for service account metadata
210         :param: secret_name: Name of the secret
211         """
212 1         v1_core = self.clients[CORE_CLIENT]
213 1         secrets = v1_core.list_namespaced_secret(
214             namespace, field_selector="metadata.name={}".format(secret_name)
215         ).items
216
217 1         if len(secrets) > 0:
218 1             raise Exception(
219                 "Secret with metadata.name={} already exists".format(secret_name)
220             )
221
222 1         annotations = {"kubernetes.io/service-account.name": service_account_name}
223 1         metadata = V1ObjectMeta(
224             name=secret_name, namespace=namespace, annotations=annotations
225         )
226 1         type = "kubernetes.io/service-account-token"
227 1         secret = V1Secret(metadata=metadata, type=type)
228 1         v1_core.create_namespaced_secret(namespace, secret)
229
230 1     def _get_secret_reference_list(self, namespace: str, secret_name: str):
231         """
232         Return a secret reference list with one secret.
233         K8s version >= 1.24
234
235         :param: namespace:  Kubernetes namespace for service account metadata
236         :param: secret_name: Name of the secret
237         :rtype: list[V1SecretReference]
238         """
239 1         return [V1SecretReference(name=secret_name, namespace=namespace)]
240
241 1     def create_service_account(
242         self,
243         name: str,
244         labels: Dict[str, str],
245         namespace: str = "kube-system",
246     ):
247         """
248         Create a service account
249
250         :param: name:       Name of the service account
251         :param: labels:     Labels for service account metadata
252         :param: namespace:  Kubernetes namespace for service account metadata
253                             Default: kube-system
254         """
255 1         v1_core = self.clients[CORE_CLIENT]
256 1         service_accounts = v1_core.list_namespaced_service_account(
257             namespace, field_selector="metadata.name={}".format(name)
258         )
259 1         if len(service_accounts.items) > 0:
260 1             raise Exception(
261                 "Service account with metadata.name={} already exists".format(name)
262             )
263
264 1         metadata = V1ObjectMeta(name=name, labels=labels, namespace=namespace)
265
266 1         if self._need_to_create_new_secret():
267 1             secret_name = self._get_secret_name(name)
268 1             secrets = self._get_secret_reference_list(namespace, secret_name)
269 1             service_account = V1ServiceAccount(metadata=metadata, secrets=secrets)
270 1             v1_core.create_namespaced_service_account(namespace, service_account)
271 1             self._create_service_account_secret(name, namespace, secret_name)
272         else:
273 1             service_account = V1ServiceAccount(metadata=metadata)
274 1             v1_core.create_namespaced_service_account(namespace, service_account)
275
276 1     def delete_service_account(self, name: str, namespace: str = "kube-system"):
277         """
278         Delete a service account
279
280         :param: name:       Name of the service account
281         :param: namespace:  Kubernetes namespace for service account metadata
282                             Default: kube-system
283         """
284 0         self.clients[CORE_CLIENT].delete_namespaced_service_account(name, namespace)
285
286 1     def create_cluster_role_binding(
287         self, name: str, labels: Dict[str, str], namespace: str = "kube-system"
288     ):
289         """
290         Create a cluster role binding
291
292         :param: name:       Name of the cluster role
293         :param: labels:     Labels for cluster role binding metadata
294         :param: namespace:  Kubernetes namespace for cluster role binding metadata
295                             Default: kube-system
296         """
297 0         role_bindings = self.clients[RBAC_CLIENT].list_cluster_role_binding(
298             field_selector="metadata.name={}".format(name)
299         )
300 0         if len(role_bindings.items) > 0:
301 0             raise Exception("Generated rbac id already exists")
302
303 0         role_binding = V1ClusterRoleBinding(
304             metadata=V1ObjectMeta(name=name, labels=labels),
305             role_ref=V1RoleRef(kind="ClusterRole", name=name, api_group=""),
306             subjects=[V1Subject(kind="ServiceAccount", name=name, namespace=namespace)],
307         )
308 0         self.clients[RBAC_CLIENT].create_cluster_role_binding(role_binding)
309
310 1     def delete_cluster_role_binding(self, name: str):
311         """
312         Delete a cluster role binding
313
314         :param: name:       Name of the cluster role binding
315         """
316 0         self.clients[RBAC_CLIENT].delete_cluster_role_binding(name)
317
318 1     @retry(
319         attempts=10,
320         delay=1,
321         fallback=Exception("Failed getting the secret from service account"),
322     )
323 1     async def get_secret_data(
324         self, name: str, namespace: str = "kube-system"
325     ) -> (str, str):
326         """
327         Get secret data
328
329         :param: name:       Name of the secret data
330         :param: namespace:  Name of the namespace where the secret is stored
331
332         :return: Tuple with the token and client certificate
333         """
334 0         v1_core = self.clients[CORE_CLIENT]
335
336 0         secret_name = None
337
338 0         service_accounts = v1_core.list_namespaced_service_account(
339             namespace, field_selector="metadata.name={}".format(name)
340         )
341 0         if len(service_accounts.items) == 0:
342 0             raise Exception(
343                 "Service account not found with metadata.name={}".format(name)
344             )
345 0         service_account = service_accounts.items[0]
346 0         if service_account.secrets and len(service_account.secrets) > 0:
347 0             secret_name = service_account.secrets[0].name
348 0         if not secret_name:
349 0             raise Exception(
350                 "Failed getting the secret from service account {}".format(name)
351             )
352 0         secret = v1_core.list_namespaced_secret(
353             namespace, field_selector="metadata.name={}".format(secret_name)
354         ).items[0]
355
356 0         token = secret.data[SERVICE_ACCOUNT_TOKEN_KEY]
357 0         client_certificate_data = secret.data[SERVICE_ACCOUNT_ROOT_CA_KEY]
358
359 0         return (
360             base64.b64decode(token).decode("utf-8"),
361             base64.b64decode(client_certificate_data).decode("utf-8"),
362         )
363
364 1     async def create_certificate(
365         self,
366         namespace: str,
367         name: str,
368         dns_prefix: str,
369         secret_name: str,
370         usages: list,
371         issuer_name: str,
372     ):
373         """
374         Creates cert-manager certificate object
375
376         :param: namespace:       Name of the namespace where the certificate and secret is stored
377         :param: name:            Name of the certificate object
378         :param: dns_prefix:      Prefix for the dnsNames. They will be prefixed to the common k8s svc suffixes
379         :param: secret_name:     Name of the secret created by cert-manager
380         :param: usages:          List of X.509 key usages
381         :param: issuer_name:     Name of the cert-manager's Issuer or ClusterIssuer object
382
383         """
384 1         certificate_body = {
385             "apiVersion": "cert-manager.io/v1",
386             "kind": "Certificate",
387             "metadata": {"name": name, "namespace": namespace},
388             "spec": {
389                 "secretName": secret_name,
390                 "privateKey": {
391                     "rotationPolicy": "Always",
392                     "algorithm": "ECDSA",
393                     "size": 256,
394                 },
395                 "duration": "8760h",  # 1 Year
396                 "renewBefore": "2208h",  # 9 months
397                 "subject": {"organizations": ["osm"]},
398                 "commonName": "osm",
399                 "isCA": False,
400                 "usages": usages,
401                 "dnsNames": [
402                     "{}.{}".format(dns_prefix, namespace),
403                     "{}.{}.svc".format(dns_prefix, namespace),
404                     "{}.{}.svc.cluster".format(dns_prefix, namespace),
405                     "{}.{}.svc.cluster.local".format(dns_prefix, namespace),
406                 ],
407                 "issuerRef": {"name": issuer_name, "kind": "ClusterIssuer"},
408             },
409         }
410 1         client = self.clients[CUSTOM_OBJECT_CLIENT]
411 1         try:
412 1             client.create_namespaced_custom_object(
413                 group="cert-manager.io",
414                 plural="certificates",
415                 version="v1",
416                 body=certificate_body,
417                 namespace=namespace,
418             )
419 1         except ApiException as e:
420 1             info = json.loads(e.body)
421 1             if info.get("reason").lower() == "alreadyexists":
422 1                 self.logger.warning("Certificate already exists: {}".format(e))
423             else:
424 0                 raise e
425
426 1     async def delete_certificate(self, namespace, object_name):
427 1         client = self.clients[CUSTOM_OBJECT_CLIENT]
428 1         try:
429 1             client.delete_namespaced_custom_object(
430                 group="cert-manager.io",
431                 plural="certificates",
432                 version="v1",
433                 name=object_name,
434                 namespace=namespace,
435             )
436 1         except ApiException as e:
437 1             info = json.loads(e.body)
438 1             if info.get("reason").lower() == "notfound":
439 1                 self.logger.warning("Certificate already deleted: {}".format(e))
440             else:
441 0                 raise e