Pin black version in tox.ini to 23.12.1
[osm/N2VC.git] / n2vc / kubectl.py
1 # Copyright 2020 Canonical Ltd.
2 #
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
6 #
7 # http://www.apache.org/licenses/LICENSE-2.0
8 #
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
14
15 import base64
16 import logging
17 from typing import Dict
18 import typing
19 import uuid
20 import json
21
22 from distutils.version import LooseVersion
23
24 from kubernetes import client, config
25 from kubernetes.client.api import VersionApi
26 from kubernetes.client.models import (
27 V1ClusterRole,
28 V1ObjectMeta,
29 V1PolicyRule,
30 V1ServiceAccount,
31 V1ClusterRoleBinding,
32 V1RoleRef,
33 V1Subject,
34 V1Secret,
35 V1SecretReference,
36 )
37 from kubernetes.client.rest import ApiException
38 from retrying_async import retry
39
40
# Names of the Kubernetes API clients; used as keys of the Kubectl.clients dict
CORE_CLIENT = "core_v1"
RBAC_CLIENT = "rbac_v1"
STORAGE_CLIENT = "storage_v1"
CUSTOM_OBJECT_CLIENT = "custom_object"
# Keys read from the data of a service account token secret
SERVICE_ACCOUNT_TOKEN_KEY = "token"
SERVICE_ACCOUNT_ROOT_CA_KEY = "ca.crt"
48
49
class Kubectl:
    """
    Helper around the Kubernetes Python client.

    It loads a kubeconfig, keeps one instance of each API client it needs
    (core, RBAC, storage and custom objects) and offers convenience methods
    to manage services, storage classes, RBAC objects, service account
    secrets and cert-manager certificates.
    """

    def __init__(self, config_file=None):
        """
        Load the kubeconfig and create the API clients

        :param: config_file: Kubeconfig file, passed as-is to
                             kubernetes.config.load_kube_config
        """
        # The configuration must be loaded before the clients are created
        config.load_kube_config(config_file=config_file)
        self._clients = {
            CORE_CLIENT: client.CoreV1Api(),
            RBAC_CLIENT: client.RbacAuthorizationV1Api(),
            STORAGE_CLIENT: client.StorageV1Api(),
            CUSTOM_OBJECT_CLIENT: client.CustomObjectsApi(),
        }
        self._configuration = config.kube_config.Configuration.get_default_copy()
        self.logger = logging.getLogger("Kubectl")

    @property
    def configuration(self):
        """Copy of the default client configuration taken at construction time"""
        return self._configuration

    @property
    def clients(self):
        """Dictionary with the API clients, indexed by the *_CLIENT constants"""
        return self._clients

    @staticmethod
    def _service_to_dict(service) -> typing.Dict:
        """
        Convert a service object returned by the API into a plain dictionary

        :param: service: Item of the list returned by
                         list_service_for_all_namespaces

        :return: Dictionary with the keys name, cluster_ip, type, ports and
                 external_ip. "ports" is an empty list when the service has no
                 ports; "external_ip" is None when the load balancer has no
                 ingress entries.
        """
        ports = [
            {
                "name": p.name,
                "node_port": p.node_port,
                "port": p.port,
                "protocol": p.protocol,
                "target_port": p.target_port,
            }
            for p in service.spec.ports or []
        ]
        ingress = service.status.load_balancer.ingress
        return {
            "name": service.metadata.name,
            "cluster_ip": service.spec.cluster_ip,
            "type": service.spec.type,
            "ports": ports,
            "external_ip": [entry.ip for entry in ingress] if ingress else None,
        }

    def get_services(
        self,
        field_selector: typing.Optional[str] = None,
        label_selector: typing.Optional[str] = None,
    ) -> typing.List[typing.Dict]:
        """
        Get the list of services from all the namespaces

        :param: field_selector: Kubernetes field selector to filter the services
        :param: label_selector: Kubernetes label selector to filter the services

        :return: List of the services matching the selectors specified.
                 Each service is a dictionary with the keys name, cluster_ip,
                 type, ports and external_ip.
        :raises: ApiException: if the call to the Kubernetes API fails
        """
        # Only forward the selectors that were actually provided
        kwargs = {}
        if field_selector:
            kwargs["field_selector"] = field_selector
        if label_selector:
            kwargs["label_selector"] = label_selector
        try:
            result = self.clients[CORE_CLIENT].list_service_for_all_namespaces(**kwargs)
        except ApiException as e:
            self.logger.error("Error calling get services: %s", e)
            raise
        return [self._service_to_dict(service) for service in result.items]

    def get_default_storage_class(self) -> typing.Optional[str]:
        """
        Default storage class

        :return: Returns the default storage class name, if exists.
                 If not, it returns the first storage class.
                 If there are not storage classes, returns None
        """
        storage_classes = self.clients[STORAGE_CLIENT].list_storage_class()
        default_sc_annotations = {
            "storageclass.kubernetes.io/is-default-class": "true",
            # Older clusters still use the beta annotation.
            "storageclass.beta.kubernetes.io/is-default-class": "true",
        }
        selected_sc = None
        for sc in storage_classes.items:
            if not selected_sc:
                # Keep the first storage class as fallback in case there is
                # no default class
                selected_sc = sc.metadata.name
            annotations = sc.metadata.annotations or {}
            if any(
                annotations.get(key) == value
                for key, value in default_sc_annotations.items()
            ):
                # Default storage class found: no need to look further
                selected_sc = sc.metadata.name
                break
        return selected_sc

    def create_cluster_role(
        self,
        name: str,
        labels: Dict[str, str],
        namespace: str = "kube-system",
    ):
        """
        Create a cluster role granting every verb on every resource and
        non-resource URL

        :param: name: Name of the cluster role
        :param: labels: Labels for cluster role metadata
        :param: namespace: Kubernetes namespace for cluster role metadata
                           Default: kube-system
        :raises: Exception: if a cluster role with that name already exists
        """
        rbac_api = self.clients[RBAC_CLIENT]
        cluster_roles = rbac_api.list_cluster_role(
            field_selector="metadata.name={}".format(name)
        )
        if cluster_roles.items:
            raise Exception(
                "Cluster role with metadata.name={} already exists".format(name)
            )

        metadata = V1ObjectMeta(name=name, labels=labels, namespace=namespace)
        cluster_role = V1ClusterRole(
            metadata=metadata,
            rules=[
                V1PolicyRule(api_groups=["*"], resources=["*"], verbs=["*"]),
                # "non_resource_ur_ls" is the attribute name used by the client
                V1PolicyRule(non_resource_ur_ls=["*"], verbs=["*"]),
            ],
        )
        rbac_api.create_cluster_role(cluster_role)

    def delete_cluster_role(self, name: str):
        """
        Delete a cluster role

        :param: name: Name of the cluster role
        """
        self.clients[RBAC_CLIENT].delete_cluster_role(name)

    def _get_kubectl_version(self):
        """
        Get the version reported by the Kubernetes version API

        :return: String with the format "<major>.<minor>"
        """
        version = VersionApi().get_code()
        return "{}.{}".format(version.major, version.minor)

    def _need_to_create_new_secret(self):
        """
        Check whether the service account secret has to be created explicitly

        :return: True if the version returned by _get_kubectl_version
                 is 1.24 or newer, False otherwise
        """
        min_k8s_version = "1.24"
        current_k8s_version = self._get_kubectl_version()
        return LooseVersion(min_k8s_version) <= LooseVersion(current_k8s_version)

    def _get_secret_name(self, service_account_name: str):
        """
        Generate a name for the secret of a service account

        :param: service_account_name: Name of the service account

        :return: "<service_account_name>-token-<suffix>", where the suffix is
                 the first 5 characters of a random UUID
        """
        random_alphanum = str(uuid.uuid4())[:5]
        return "{}-token-{}".format(service_account_name, random_alphanum)

    def _create_service_account_secret(
        self, service_account_name: str, namespace: str, secret_name: str
    ):
        """
        Create a secret for the service account. K8s version >= 1.24

        :param: service_account_name: Name of the service account
        :param: namespace: Kubernetes namespace for service account metadata
        :param: secret_name: Name of the secret
        :raises: Exception: if a secret with that name already exists in the
                            namespace
        """
        v1_core = self.clients[CORE_CLIENT]
        secrets = v1_core.list_namespaced_secret(
            namespace, field_selector="metadata.name={}".format(secret_name)
        ).items
        if secrets:
            raise Exception(
                "Secret with metadata.name={} already exists".format(secret_name)
            )

        # The annotation links the secret to the service account
        annotations = {"kubernetes.io/service-account.name": service_account_name}
        metadata = V1ObjectMeta(
            name=secret_name, namespace=namespace, annotations=annotations
        )
        secret_type = "kubernetes.io/service-account-token"
        secret = V1Secret(metadata=metadata, type=secret_type)
        v1_core.create_namespaced_secret(namespace, secret)

    def _get_secret_reference_list(self, namespace: str, secret_name: str):
        """
        Return a secret reference list with one secret.
        K8s version >= 1.24

        :param: namespace: Kubernetes namespace for service account metadata
        :param: secret_name: Name of the secret
        :rtype: list[V1SecretReference]
        """
        return [V1SecretReference(name=secret_name, namespace=namespace)]

    def create_service_account(
        self,
        name: str,
        labels: Dict[str, str],
        namespace: str = "kube-system",
    ):
        """
        Create a service account

        When _need_to_create_new_secret() is True, the service account is
        created referencing a new secret name and that secret is created
        right after it.

        :param: name: Name of the service account
        :param: labels: Labels for service account metadata
        :param: namespace: Kubernetes namespace for service account metadata
                           Default: kube-system
        :raises: Exception: if a service account with that name already exists
                            in the namespace
        """
        v1_core = self.clients[CORE_CLIENT]
        service_accounts = v1_core.list_namespaced_service_account(
            namespace, field_selector="metadata.name={}".format(name)
        )
        if service_accounts.items:
            raise Exception(
                "Service account with metadata.name={} already exists".format(name)
            )

        metadata = V1ObjectMeta(name=name, labels=labels, namespace=namespace)

        if self._need_to_create_new_secret():
            secret_name = self._get_secret_name(name)
            secrets = self._get_secret_reference_list(namespace, secret_name)
            service_account = V1ServiceAccount(metadata=metadata, secrets=secrets)
            v1_core.create_namespaced_service_account(namespace, service_account)
            self._create_service_account_secret(name, namespace, secret_name)
        else:
            service_account = V1ServiceAccount(metadata=metadata)
            v1_core.create_namespaced_service_account(namespace, service_account)

    def delete_service_account(self, name: str, namespace: str = "kube-system"):
        """
        Delete a service account

        :param: name: Name of the service account
        :param: namespace: Kubernetes namespace for service account metadata
                           Default: kube-system
        """
        self.clients[CORE_CLIENT].delete_namespaced_service_account(name, namespace)

    def create_cluster_role_binding(
        self, name: str, labels: Dict[str, str], namespace: str = "kube-system"
    ):
        """
        Create a cluster role binding

        The same name is used for the binding, for the cluster role it refers
        to and for the service account it is bound to.

        :param: name: Name of the cluster role binding, the cluster role and
                      the service account
        :param: labels: Labels for cluster role binding metadata
        :param: namespace: Kubernetes namespace of the service account subject
                           Default: kube-system
        :raises: Exception: if a cluster role binding with that name already
                            exists
        """
        rbac_api = self.clients[RBAC_CLIENT]
        role_bindings = rbac_api.list_cluster_role_binding(
            field_selector="metadata.name={}".format(name)
        )
        if role_bindings.items:
            raise Exception("Generated rbac id already exists")

        role_binding = V1ClusterRoleBinding(
            metadata=V1ObjectMeta(name=name, labels=labels),
            role_ref=V1RoleRef(kind="ClusterRole", name=name, api_group=""),
            subjects=[V1Subject(kind="ServiceAccount", name=name, namespace=namespace)],
        )
        rbac_api.create_cluster_role_binding(role_binding)

    def delete_cluster_role_binding(self, name: str):
        """
        Delete a cluster role binding

        :param: name: Name of the cluster role binding
        """
        self.clients[RBAC_CLIENT].delete_cluster_role_binding(name)

    @retry(
        attempts=10,
        delay=1,
        fallback=Exception("Failed getting the secret from service account"),
    )
    async def get_secret_data(
        self, name: str, namespace: str = "kube-system"
    ) -> typing.Tuple[str, str]:
        """
        Get the token and the root CA certificate of a service account

        The data is read from the first secret referenced by the service
        account. The secret may not be populated right away, so every failure
        is raised as an exception for the retry decorator to handle.

        :param: name: Name of the service account
        :param: namespace: Name of the namespace where the secret is stored

        :return: Tuple with the token and the root CA certificate ("ca.crt"),
                 both base64-decoded
        :raises: Exception: if the service account, its secret or the expected
                            keys of the secret are not found
        """
        v1_core = self.clients[CORE_CLIENT]

        service_accounts = v1_core.list_namespaced_service_account(
            namespace, field_selector="metadata.name={}".format(name)
        )
        if not service_accounts.items:
            raise Exception(
                "Service account not found with metadata.name={}".format(name)
            )

        service_account = service_accounts.items[0]
        secret_name = None
        if service_account.secrets:
            secret_name = service_account.secrets[0].name
        if not secret_name:
            raise Exception(
                "Failed getting the secret from service account {}".format(name)
            )

        secrets = v1_core.list_namespaced_secret(
            namespace, field_selector="metadata.name={}".format(secret_name)
        ).items
        if not secrets:
            raise Exception(
                "Secret {} not found in namespace {}".format(secret_name, namespace)
            )

        # The data of the secret can be missing or incomplete
        secret_data = secrets[0].data or {}
        missing_keys = [
            key
            for key in (SERVICE_ACCOUNT_TOKEN_KEY, SERVICE_ACCOUNT_ROOT_CA_KEY)
            if key not in secret_data
        ]
        if missing_keys:
            raise Exception(
                "Secret {} does not contain the keys: {}".format(
                    secret_name, ", ".join(missing_keys)
                )
            )

        token = secret_data[SERVICE_ACCOUNT_TOKEN_KEY]
        ca_certificate_data = secret_data[SERVICE_ACCOUNT_ROOT_CA_KEY]

        return (
            base64.b64decode(token).decode("utf-8"),
            base64.b64decode(ca_certificate_data).decode("utf-8"),
        )

    @staticmethod
    def _get_api_exception_reason(e: ApiException) -> str:
        """
        Extract the "reason" field from the JSON body of an ApiException

        :param: e: Exception raised by the Kubernetes client

        :return: Reason in lower case, or an empty string if the exception has
                 no body, the body is not a JSON object or it has no reason
        """
        try:
            info = json.loads(e.body)
        except (TypeError, ValueError):
            # No body (None) or a body that is not valid JSON
            return ""
        reason = info.get("reason") if isinstance(info, dict) else None
        return reason.lower() if isinstance(reason, str) else ""

    async def create_certificate(
        self,
        namespace: str,
        name: str,
        dns_prefix: str,
        secret_name: str,
        usages: list,
        issuer_name: str,
    ):
        """
        Creates cert-manager certificate object

        If the certificate already exists, a warning is logged and no error
        is raised.

        :param: namespace: Name of the namespace where the certificate and secret is stored
        :param: name: Name of the certificate object
        :param: dns_prefix: Prefix for the dnsNames. They will be prefixed to the common k8s svc suffixes
        :param: secret_name: Name of the secret created by cert-manager
        :param: usages: List of X.509 key usages
        :param: issuer_name: Name of the cert-manager's ClusterIssuer object
        :raises: ApiException: if the API call fails for a reason other than
                               the certificate already existing
        """
        certificate_body = {
            "apiVersion": "cert-manager.io/v1",
            "kind": "Certificate",
            "metadata": {"name": name, "namespace": namespace},
            "spec": {
                "secretName": secret_name,
                "privateKey": {
                    "rotationPolicy": "Always",
                    "algorithm": "ECDSA",
                    "size": 256,
                },
                "duration": "8760h",  # 365 days (1 year)
                # 92 days (~3 months) before expiry
                "renewBefore": "2208h",
                "subject": {"organizations": ["osm"]},
                "commonName": "osm",
                "isCA": False,
                "usages": usages,
                "dnsNames": [
                    "{}.{}".format(dns_prefix, namespace),
                    "{}.{}.svc".format(dns_prefix, namespace),
                    "{}.{}.svc.cluster".format(dns_prefix, namespace),
                    "{}.{}.svc.cluster.local".format(dns_prefix, namespace),
                ],
                "issuerRef": {"name": issuer_name, "kind": "ClusterIssuer"},
            },
        }
        custom_api = self.clients[CUSTOM_OBJECT_CLIENT]
        try:
            custom_api.create_namespaced_custom_object(
                group="cert-manager.io",
                plural="certificates",
                version="v1",
                body=certificate_body,
                namespace=namespace,
            )
        except ApiException as e:
            if self._get_api_exception_reason(e) != "alreadyexists":
                raise
            self.logger.warning("Certificate already exists: %s", e)

    async def delete_certificate(self, namespace, object_name):
        """
        Deletes cert-manager certificate object

        If the certificate is not found, a warning is logged and no error
        is raised.

        :param: namespace: Name of the namespace where the certificate is stored
        :param: object_name: Name of the certificate object
        :raises: ApiException: if the API call fails for a reason other than
                               the certificate not being found
        """
        custom_api = self.clients[CUSTOM_OBJECT_CLIENT]
        try:
            custom_api.delete_namespaced_custom_object(
                group="cert-manager.io",
                plural="certificates",
                version="v1",
                name=object_name,
                namespace=namespace,
            )
        except ApiException as e:
            if self._get_api_exception_reason(e) != "notfound":
                raise
            self.logger.warning("Certificate already deleted: %s", e)