Pin black version in tox.ini to 23.12.1
[osm/N2VC.git] / n2vc / kubectl.py
1 # Copyright 2020 Canonical Ltd.
2 #
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
6 #
7 # http://www.apache.org/licenses/LICENSE-2.0
8 #
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
14
15 import base64
16 import logging
17 from typing import Dict
18 import typing
19 import uuid
20 import json
21
22 from distutils.version import LooseVersion
23
24 from kubernetes import client, config
25 from kubernetes.client.api import VersionApi
26 from kubernetes.client.models import (
27 V1ClusterRole,
28 V1Role,
29 V1ObjectMeta,
30 V1PolicyRule,
31 V1ServiceAccount,
32 V1ClusterRoleBinding,
33 V1RoleBinding,
34 V1RoleRef,
35 V1Subject,
36 V1Secret,
37 V1SecretReference,
38 V1Namespace,
39 )
40 from kubernetes.client.rest import ApiException
41 from n2vc.libjuju import retry_callback
42 from retrying_async import retry
43
44
45 SERVICE_ACCOUNT_TOKEN_KEY = "token"
46 SERVICE_ACCOUNT_ROOT_CA_KEY = "ca.crt"
47 # clients
48 CORE_CLIENT = "core_v1"
49 RBAC_CLIENT = "rbac_v1"
50 STORAGE_CLIENT = "storage_v1"
51 CUSTOM_OBJECT_CLIENT = "custom_object"
52
53
54 class Kubectl:
55 def __init__(self, config_file=None):
56 config.load_kube_config(config_file=config_file)
57 self._clients = {
58 CORE_CLIENT: client.CoreV1Api(),
59 RBAC_CLIENT: client.RbacAuthorizationV1Api(),
60 STORAGE_CLIENT: client.StorageV1Api(),
61 CUSTOM_OBJECT_CLIENT: client.CustomObjectsApi(),
62 }
63 self._configuration = config.kube_config.Configuration.get_default_copy()
64 self.logger = logging.getLogger("Kubectl")
65
66 @property
67 def configuration(self):
68 return self._configuration
69
70 @property
71 def clients(self):
72 return self._clients
73
74 def get_services(
75 self,
76 field_selector: str = None,
77 label_selector: str = None,
78 ) -> typing.List[typing.Dict]:
79 """
80 Get Service list from a namespace
81
82 :param: field_selector: Kubernetes field selector for the namespace
83 :param: label_selector: Kubernetes label selector for the namespace
84
85 :return: List of the services matching the selectors specified
86 """
87 kwargs = {}
88 if field_selector:
89 kwargs["field_selector"] = field_selector
90 if label_selector:
91 kwargs["label_selector"] = label_selector
92 try:
93 result = self.clients[CORE_CLIENT].list_service_for_all_namespaces(**kwargs)
94 return [
95 {
96 "name": i.metadata.name,
97 "cluster_ip": i.spec.cluster_ip,
98 "type": i.spec.type,
99 "ports": [
100 {
101 "name": p.name,
102 "node_port": p.node_port,
103 "port": p.port,
104 "protocol": p.protocol,
105 "target_port": p.target_port,
106 }
107 for p in i.spec.ports
108 ]
109 if i.spec.ports
110 else [],
111 "external_ip": [i.ip for i in i.status.load_balancer.ingress]
112 if i.status.load_balancer.ingress
113 else None,
114 }
115 for i in result.items
116 ]
117 except ApiException as e:
118 self.logger.error("Error calling get services: {}".format(e))
119 raise e
120
121 def get_default_storage_class(self) -> str:
122 """
123 Default storage class
124
125 :return: Returns the default storage class name, if exists.
126 If not, it returns the first storage class.
127 If there are not storage classes, returns None
128 """
129 storage_classes = self.clients[STORAGE_CLIENT].list_storage_class()
130 selected_sc = None
131 default_sc_annotations = {
132 "storageclass.kubernetes.io/is-default-class": "true",
133 # Older clusters still use the beta annotation.
134 "storageclass.beta.kubernetes.io/is-default-class": "true",
135 }
136 for sc in storage_classes.items:
137 if not selected_sc:
138 # Select the first storage class in case there is no a default-class
139 selected_sc = sc.metadata.name
140 annotations = sc.metadata.annotations or {}
141 if any(
142 k in annotations and annotations[k] == v
143 for k, v in default_sc_annotations.items()
144 ):
145 # Default storage
146 selected_sc = sc.metadata.name
147 break
148 return selected_sc
149
150 def create_cluster_role(
151 self,
152 name: str,
153 labels: Dict[str, str],
154 namespace: str = "kube-system",
155 ):
156 """
157 Create a cluster role
158
159 :param: name: Name of the cluster role
160 :param: labels: Labels for cluster role metadata
161 :param: namespace: Kubernetes namespace for cluster role metadata
162 Default: kube-system
163 """
164 cluster_roles = self.clients[RBAC_CLIENT].list_cluster_role(
165 field_selector="metadata.name={}".format(name)
166 )
167
168 if len(cluster_roles.items) > 0:
169 raise Exception("Role with metadata.name={} already exists".format(name))
170
171 metadata = V1ObjectMeta(name=name, labels=labels, namespace=namespace)
172 # Cluster role
173 cluster_role = V1ClusterRole(
174 metadata=metadata,
175 rules=[
176 V1PolicyRule(api_groups=["*"], resources=["*"], verbs=["*"]),
177 V1PolicyRule(non_resource_ur_ls=["*"], verbs=["*"]),
178 ],
179 )
180
181 self.clients[RBAC_CLIENT].create_cluster_role(cluster_role)
182
183 async def create_role(
184 self,
185 name: str,
186 labels: Dict[str, str],
187 api_groups: list,
188 resources: list,
189 verbs: list,
190 namespace: str,
191 ):
192 """
193 Create a role with one PolicyRule
194
195 :param: name: Name of the namespaced Role
196 :param: labels: Labels for namespaced Role metadata
197 :param: api_groups: List with api-groups allowed in the policy rule
198 :param: resources: List with resources allowed in the policy rule
199 :param: verbs: List with verbs allowed in the policy rule
200 :param: namespace: Kubernetes namespace for Role metadata
201
202 :return: None
203 """
204
205 roles = self.clients[RBAC_CLIENT].list_namespaced_role(
206 namespace, field_selector="metadata.name={}".format(name)
207 )
208
209 if len(roles.items) > 0:
210 raise Exception("Role with metadata.name={} already exists".format(name))
211
212 metadata = V1ObjectMeta(name=name, labels=labels, namespace=namespace)
213
214 role = V1Role(
215 metadata=metadata,
216 rules=[
217 V1PolicyRule(api_groups=api_groups, resources=resources, verbs=verbs),
218 ],
219 )
220
221 self.clients[RBAC_CLIENT].create_namespaced_role(namespace, role)
222
223 def delete_cluster_role(self, name: str):
224 """
225 Delete a cluster role
226
227 :param: name: Name of the cluster role
228 """
229 self.clients[RBAC_CLIENT].delete_cluster_role(name)
230
231 def _get_kubectl_version(self):
232 version = VersionApi().get_code()
233 return "{}.{}".format(version.major, version.minor)
234
235 def _need_to_create_new_secret(self):
236 min_k8s_version = "1.24"
237 current_k8s_version = self._get_kubectl_version()
238 return LooseVersion(min_k8s_version) <= LooseVersion(current_k8s_version)
239
240 def _get_secret_name(self, service_account_name: str):
241 random_alphanum = str(uuid.uuid4())[:5]
242 return "{}-token-{}".format(service_account_name, random_alphanum)
243
244 def _create_service_account_secret(
245 self, service_account_name: str, namespace: str, secret_name: str
246 ):
247 """
248 Create a secret for the service account. K8s version >= 1.24
249
250 :param: service_account_name: Name of the service account
251 :param: namespace: Kubernetes namespace for service account metadata
252 :param: secret_name: Name of the secret
253 """
254 v1_core = self.clients[CORE_CLIENT]
255 secrets = v1_core.list_namespaced_secret(
256 namespace, field_selector="metadata.name={}".format(secret_name)
257 ).items
258
259 if len(secrets) > 0:
260 raise Exception(
261 "Secret with metadata.name={} already exists".format(secret_name)
262 )
263
264 annotations = {"kubernetes.io/service-account.name": service_account_name}
265 metadata = V1ObjectMeta(
266 name=secret_name, namespace=namespace, annotations=annotations
267 )
268 type = "kubernetes.io/service-account-token"
269 secret = V1Secret(metadata=metadata, type=type)
270 v1_core.create_namespaced_secret(namespace, secret)
271
272 def _get_secret_reference_list(self, namespace: str, secret_name: str):
273 """
274 Return a secret reference list with one secret.
275 K8s version >= 1.24
276
277 :param: namespace: Kubernetes namespace for service account metadata
278 :param: secret_name: Name of the secret
279 :rtype: list[V1SecretReference]
280 """
281 return [V1SecretReference(name=secret_name, namespace=namespace)]
282
283 def create_service_account(
284 self,
285 name: str,
286 labels: Dict[str, str],
287 namespace: str = "kube-system",
288 ):
289 """
290 Create a service account
291
292 :param: name: Name of the service account
293 :param: labels: Labels for service account metadata
294 :param: namespace: Kubernetes namespace for service account metadata
295 Default: kube-system
296 """
297 v1_core = self.clients[CORE_CLIENT]
298 service_accounts = v1_core.list_namespaced_service_account(
299 namespace, field_selector="metadata.name={}".format(name)
300 )
301 if len(service_accounts.items) > 0:
302 raise Exception(
303 "Service account with metadata.name={} already exists".format(name)
304 )
305
306 metadata = V1ObjectMeta(name=name, labels=labels, namespace=namespace)
307
308 if self._need_to_create_new_secret():
309 secret_name = self._get_secret_name(name)
310 secrets = self._get_secret_reference_list(namespace, secret_name)
311 service_account = V1ServiceAccount(metadata=metadata, secrets=secrets)
312 v1_core.create_namespaced_service_account(namespace, service_account)
313 self._create_service_account_secret(name, namespace, secret_name)
314 else:
315 service_account = V1ServiceAccount(metadata=metadata)
316 v1_core.create_namespaced_service_account(namespace, service_account)
317
318 def delete_service_account(self, name: str, namespace: str = "kube-system"):
319 """
320 Delete a service account
321
322 :param: name: Name of the service account
323 :param: namespace: Kubernetes namespace for service account metadata
324 Default: kube-system
325 """
326 self.clients[CORE_CLIENT].delete_namespaced_service_account(name, namespace)
327
328 def create_cluster_role_binding(
329 self, name: str, labels: Dict[str, str], namespace: str = "kube-system"
330 ):
331 """
332 Create a cluster role binding
333
334 :param: name: Name of the cluster role
335 :param: labels: Labels for cluster role binding metadata
336 :param: namespace: Kubernetes namespace for cluster role binding metadata
337 Default: kube-system
338 """
339 role_bindings = self.clients[RBAC_CLIENT].list_cluster_role_binding(
340 field_selector="metadata.name={}".format(name)
341 )
342 if len(role_bindings.items) > 0:
343 raise Exception("Generated rbac id already exists")
344
345 role_binding = V1ClusterRoleBinding(
346 metadata=V1ObjectMeta(name=name, labels=labels),
347 role_ref=V1RoleRef(kind="ClusterRole", name=name, api_group=""),
348 subjects=[V1Subject(kind="ServiceAccount", name=name, namespace=namespace)],
349 )
350 self.clients[RBAC_CLIENT].create_cluster_role_binding(role_binding)
351
352 async def create_role_binding(
353 self,
354 name: str,
355 role_name: str,
356 sa_name: str,
357 labels: Dict[str, str],
358 namespace: str,
359 ):
360 """
361 Create a cluster role binding
362
363 :param: name: Name of the namespaced Role Binding
364 :param: role_name: Name of the namespaced Role to be bound
365 :param: sa_name: Name of the Service Account to be bound
366 :param: labels: Labels for Role Binding metadata
367 :param: namespace: Kubernetes namespace for Role Binding metadata
368
369 :return: None
370 """
371 role_bindings = self.clients[RBAC_CLIENT].list_namespaced_role_binding(
372 namespace, field_selector="metadata.name={}".format(name)
373 )
374 if len(role_bindings.items) > 0:
375 raise Exception(
376 "Role Binding with metadata.name={} already exists".format(name)
377 )
378
379 role_binding = V1RoleBinding(
380 metadata=V1ObjectMeta(name=name, labels=labels),
381 role_ref=V1RoleRef(kind="Role", name=role_name, api_group=""),
382 subjects=[
383 V1Subject(kind="ServiceAccount", name=sa_name, namespace=namespace)
384 ],
385 )
386 self.clients[RBAC_CLIENT].create_namespaced_role_binding(
387 namespace, role_binding
388 )
389
390 def delete_cluster_role_binding(self, name: str):
391 """
392 Delete a cluster role binding
393
394 :param: name: Name of the cluster role binding
395 """
396 self.clients[RBAC_CLIENT].delete_cluster_role_binding(name)
397
398 @retry(
399 attempts=10,
400 delay=1,
401 fallback=Exception("Failed getting the secret from service account"),
402 callback=retry_callback,
403 )
404 async def get_secret_data(
405 self, name: str, namespace: str = "kube-system"
406 ) -> (str, str):
407 """
408 Get secret data
409
410 :param: name: Name of the secret data
411 :param: namespace: Name of the namespace where the secret is stored
412
413 :return: Tuple with the token and client certificate
414 """
415 v1_core = self.clients[CORE_CLIENT]
416
417 secret_name = None
418
419 service_accounts = v1_core.list_namespaced_service_account(
420 namespace, field_selector="metadata.name={}".format(name)
421 )
422 if len(service_accounts.items) == 0:
423 raise Exception(
424 "Service account not found with metadata.name={}".format(name)
425 )
426 service_account = service_accounts.items[0]
427 if service_account.secrets and len(service_account.secrets) > 0:
428 secret_name = service_account.secrets[0].name
429 if not secret_name:
430 raise Exception(
431 "Failed getting the secret from service account {}".format(name)
432 )
433 # TODO: refactor to use get_secret_content
434 secret = v1_core.list_namespaced_secret(
435 namespace, field_selector="metadata.name={}".format(secret_name)
436 ).items[0]
437
438 token = secret.data[SERVICE_ACCOUNT_TOKEN_KEY]
439 client_certificate_data = secret.data[SERVICE_ACCOUNT_ROOT_CA_KEY]
440
441 return (
442 base64.b64decode(token).decode("utf-8"),
443 base64.b64decode(client_certificate_data).decode("utf-8"),
444 )
445
446 @retry(
447 attempts=10,
448 delay=1,
449 fallback=Exception("Failed getting data from the secret"),
450 )
451 async def get_secret_content(
452 self,
453 name: str,
454 namespace: str,
455 ) -> dict:
456 """
457 Get secret data
458
459 :param: name: Name of the secret
460 :param: namespace: Name of the namespace where the secret is stored
461
462 :return: Dictionary with secret's data
463 """
464 v1_core = self.clients[CORE_CLIENT]
465
466 secret = v1_core.read_namespaced_secret(name, namespace)
467
468 return secret.data
469
470 @retry(
471 attempts=10,
472 delay=1,
473 fallback=Exception("Failed creating the secret"),
474 )
475 async def create_secret(
476 self, name: str, data: dict, namespace: str, secret_type: str
477 ):
478 """
479 Get secret data
480
481 :param: name: Name of the secret
482 :param: data: Dict with data content. Values must be already base64 encoded
483 :param: namespace: Name of the namespace where the secret will be stored
484 :param: secret_type: Type of the secret, e.g., Opaque, kubernetes.io/service-account-token, kubernetes.io/tls
485
486 :return: None
487 """
488 v1_core = self.clients[CORE_CLIENT]
489 metadata = V1ObjectMeta(name=name, namespace=namespace)
490 secret = V1Secret(metadata=metadata, data=data, type=secret_type)
491 v1_core.create_namespaced_secret(namespace, secret)
492
493 async def create_certificate(
494 self,
495 namespace: str,
496 name: str,
497 dns_prefix: str,
498 secret_name: str,
499 usages: list,
500 issuer_name: str,
501 ):
502 """
503 Creates cert-manager certificate object
504
505 :param: namespace: Name of the namespace where the certificate and secret is stored
506 :param: name: Name of the certificate object
507 :param: dns_prefix: Prefix for the dnsNames. They will be prefixed to the common k8s svc suffixes
508 :param: secret_name: Name of the secret created by cert-manager
509 :param: usages: List of X.509 key usages
510 :param: issuer_name: Name of the cert-manager's Issuer or ClusterIssuer object
511
512 """
513 certificate_body = {
514 "apiVersion": "cert-manager.io/v1",
515 "kind": "Certificate",
516 "metadata": {"name": name, "namespace": namespace},
517 "spec": {
518 "secretName": secret_name,
519 "privateKey": {
520 "rotationPolicy": "Always",
521 "algorithm": "ECDSA",
522 "size": 256,
523 },
524 "duration": "8760h", # 1 Year
525 "renewBefore": "2208h", # 9 months
526 "subject": {"organizations": ["osm"]},
527 "commonName": "osm",
528 "isCA": False,
529 "usages": usages,
530 "dnsNames": [
531 "{}.{}".format(dns_prefix, namespace),
532 "{}.{}.svc".format(dns_prefix, namespace),
533 "{}.{}.svc.cluster".format(dns_prefix, namespace),
534 "{}.{}.svc.cluster.local".format(dns_prefix, namespace),
535 ],
536 "issuerRef": {"name": issuer_name, "kind": "ClusterIssuer"},
537 },
538 }
539 client = self.clients[CUSTOM_OBJECT_CLIENT]
540 try:
541 client.create_namespaced_custom_object(
542 group="cert-manager.io",
543 plural="certificates",
544 version="v1",
545 body=certificate_body,
546 namespace=namespace,
547 )
548 except ApiException as e:
549 info = json.loads(e.body)
550 if info.get("reason").lower() == "alreadyexists":
551 self.logger.warning("Certificate already exists: {}".format(e))
552 else:
553 raise e
554
555 async def delete_certificate(self, namespace, object_name):
556 client = self.clients[CUSTOM_OBJECT_CLIENT]
557 try:
558 client.delete_namespaced_custom_object(
559 group="cert-manager.io",
560 plural="certificates",
561 version="v1",
562 name=object_name,
563 namespace=namespace,
564 )
565 except ApiException as e:
566 info = json.loads(e.body)
567 if info.get("reason").lower() == "notfound":
568 self.logger.warning("Certificate already deleted: {}".format(e))
569 else:
570 raise e
571
572 @retry(
573 attempts=10,
574 delay=1,
575 fallback=Exception("Failed creating the namespace"),
576 )
577 async def create_namespace(self, name: str, labels: dict = None):
578 """
579 Create a namespace
580
581 :param: name: Name of the namespace to be created
582 :param: labels: Dictionary with labels for the new namespace
583
584 """
585 v1_core = self.clients[CORE_CLIENT]
586 metadata = V1ObjectMeta(name=name, labels=labels)
587 namespace = V1Namespace(
588 metadata=metadata,
589 )
590
591 try:
592 v1_core.create_namespace(namespace)
593 self.logger.debug("Namespace created: {}".format(name))
594 except ApiException as e:
595 info = json.loads(e.body)
596 if info.get("reason").lower() == "alreadyexists":
597 self.logger.warning("Namespace already exists: {}".format(e))
598 else:
599 raise e
600
601 @retry(
602 attempts=10,
603 delay=1,
604 fallback=Exception("Failed deleting the namespace"),
605 )
606 async def delete_namespace(self, name: str):
607 """
608 Delete a namespace
609
610 :param: name: Name of the namespace to be deleted
611
612 """
613 try:
614 self.clients[CORE_CLIENT].delete_namespace(name)
615 except ApiException as e:
616 if e.reason == "Not Found":
617 self.logger.warning("Namespace already deleted: {}".format(e))