Wrapping Retry for Py3.10
[osm/N2VC.git] / n2vc / kubectl.py
1 # Copyright 2020 Canonical Ltd.
2 #
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
6 #
7 # http://www.apache.org/licenses/LICENSE-2.0
8 #
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
14
15 import base64
16 import logging
17 from typing import Dict
18 import typing
19 import uuid
20 import json
21
22 from distutils.version import LooseVersion
23
24 from kubernetes import client, config
25 from kubernetes.client.api import VersionApi
26 from kubernetes.client.models import (
27 V1ClusterRole,
28 V1ObjectMeta,
29 V1PolicyRule,
30 V1ServiceAccount,
31 V1ClusterRoleBinding,
32 V1RoleRef,
33 V1Subject,
34 V1Secret,
35 V1SecretReference,
36 )
37 from kubernetes.client.rest import ApiException
38 from n2vc.libjuju import retry_callback
39 from retrying_async import retry
40
41
# Keys looked up in a secret's ``data`` mapping by Kubectl.get_secret_data
SERVICE_ACCOUNT_TOKEN_KEY = "token"
SERVICE_ACCOUNT_ROOT_CA_KEY = "ca.crt"
# clients
# Keys of the dictionary exposed through Kubectl.clients
CORE_CLIENT = "core_v1"
RBAC_CLIENT = "rbac_v1"
STORAGE_CLIENT = "storage_v1"
CUSTOM_OBJECT_CLIENT = "custom_object"
49
50
51 class Kubectl:
52 def __init__(self, config_file=None):
53 config.load_kube_config(config_file=config_file)
54 self._clients = {
55 CORE_CLIENT: client.CoreV1Api(),
56 RBAC_CLIENT: client.RbacAuthorizationV1Api(),
57 STORAGE_CLIENT: client.StorageV1Api(),
58 CUSTOM_OBJECT_CLIENT: client.CustomObjectsApi(),
59 }
60 self._configuration = config.kube_config.Configuration.get_default_copy()
61 self.logger = logging.getLogger("Kubectl")
62
63 @property
64 def configuration(self):
65 return self._configuration
66
67 @property
68 def clients(self):
69 return self._clients
70
71 def get_services(
72 self,
73 field_selector: str = None,
74 label_selector: str = None,
75 ) -> typing.List[typing.Dict]:
76 """
77 Get Service list from a namespace
78
79 :param: field_selector: Kubernetes field selector for the namespace
80 :param: label_selector: Kubernetes label selector for the namespace
81
82 :return: List of the services matching the selectors specified
83 """
84 kwargs = {}
85 if field_selector:
86 kwargs["field_selector"] = field_selector
87 if label_selector:
88 kwargs["label_selector"] = label_selector
89 try:
90 result = self.clients[CORE_CLIENT].list_service_for_all_namespaces(**kwargs)
91 return [
92 {
93 "name": i.metadata.name,
94 "cluster_ip": i.spec.cluster_ip,
95 "type": i.spec.type,
96 "ports": [
97 {
98 "name": p.name,
99 "node_port": p.node_port,
100 "port": p.port,
101 "protocol": p.protocol,
102 "target_port": p.target_port,
103 }
104 for p in i.spec.ports
105 ]
106 if i.spec.ports
107 else [],
108 "external_ip": [i.ip for i in i.status.load_balancer.ingress]
109 if i.status.load_balancer.ingress
110 else None,
111 }
112 for i in result.items
113 ]
114 except ApiException as e:
115 self.logger.error("Error calling get services: {}".format(e))
116 raise e
117
118 def get_default_storage_class(self) -> str:
119 """
120 Default storage class
121
122 :return: Returns the default storage class name, if exists.
123 If not, it returns the first storage class.
124 If there are not storage classes, returns None
125 """
126 storage_classes = self.clients[STORAGE_CLIENT].list_storage_class()
127 selected_sc = None
128 default_sc_annotations = {
129 "storageclass.kubernetes.io/is-default-class": "true",
130 # Older clusters still use the beta annotation.
131 "storageclass.beta.kubernetes.io/is-default-class": "true",
132 }
133 for sc in storage_classes.items:
134 if not selected_sc:
135 # Select the first storage class in case there is no a default-class
136 selected_sc = sc.metadata.name
137 annotations = sc.metadata.annotations or {}
138 if any(
139 k in annotations and annotations[k] == v
140 for k, v in default_sc_annotations.items()
141 ):
142 # Default storage
143 selected_sc = sc.metadata.name
144 break
145 return selected_sc
146
147 def create_cluster_role(
148 self,
149 name: str,
150 labels: Dict[str, str],
151 namespace: str = "kube-system",
152 ):
153 """
154 Create a cluster role
155
156 :param: name: Name of the cluster role
157 :param: labels: Labels for cluster role metadata
158 :param: namespace: Kubernetes namespace for cluster role metadata
159 Default: kube-system
160 """
161 cluster_roles = self.clients[RBAC_CLIENT].list_cluster_role(
162 field_selector="metadata.name={}".format(name)
163 )
164
165 if len(cluster_roles.items) > 0:
166 raise Exception(
167 "Cluster role with metadata.name={} already exists".format(name)
168 )
169
170 metadata = V1ObjectMeta(name=name, labels=labels, namespace=namespace)
171 # Cluster role
172 cluster_role = V1ClusterRole(
173 metadata=metadata,
174 rules=[
175 V1PolicyRule(api_groups=["*"], resources=["*"], verbs=["*"]),
176 V1PolicyRule(non_resource_ur_ls=["*"], verbs=["*"]),
177 ],
178 )
179
180 self.clients[RBAC_CLIENT].create_cluster_role(cluster_role)
181
182 def delete_cluster_role(self, name: str):
183 """
184 Delete a cluster role
185
186 :param: name: Name of the cluster role
187 """
188 self.clients[RBAC_CLIENT].delete_cluster_role(name)
189
190 def _get_kubectl_version(self):
191 version = VersionApi().get_code()
192 return "{}.{}".format(version.major, version.minor)
193
194 def _need_to_create_new_secret(self):
195 min_k8s_version = "1.24"
196 current_k8s_version = self._get_kubectl_version()
197 return LooseVersion(min_k8s_version) <= LooseVersion(current_k8s_version)
198
199 def _get_secret_name(self, service_account_name: str):
200 random_alphanum = str(uuid.uuid4())[:5]
201 return "{}-token-{}".format(service_account_name, random_alphanum)
202
203 def _create_service_account_secret(
204 self, service_account_name: str, namespace: str, secret_name: str
205 ):
206 """
207 Create a secret for the service account. K8s version >= 1.24
208
209 :param: service_account_name: Name of the service account
210 :param: namespace: Kubernetes namespace for service account metadata
211 :param: secret_name: Name of the secret
212 """
213 v1_core = self.clients[CORE_CLIENT]
214 secrets = v1_core.list_namespaced_secret(
215 namespace, field_selector="metadata.name={}".format(secret_name)
216 ).items
217
218 if len(secrets) > 0:
219 raise Exception(
220 "Secret with metadata.name={} already exists".format(secret_name)
221 )
222
223 annotations = {"kubernetes.io/service-account.name": service_account_name}
224 metadata = V1ObjectMeta(
225 name=secret_name, namespace=namespace, annotations=annotations
226 )
227 type = "kubernetes.io/service-account-token"
228 secret = V1Secret(metadata=metadata, type=type)
229 v1_core.create_namespaced_secret(namespace, secret)
230
231 def _get_secret_reference_list(self, namespace: str, secret_name: str):
232 """
233 Return a secret reference list with one secret.
234 K8s version >= 1.24
235
236 :param: namespace: Kubernetes namespace for service account metadata
237 :param: secret_name: Name of the secret
238 :rtype: list[V1SecretReference]
239 """
240 return [V1SecretReference(name=secret_name, namespace=namespace)]
241
242 def create_service_account(
243 self,
244 name: str,
245 labels: Dict[str, str],
246 namespace: str = "kube-system",
247 ):
248 """
249 Create a service account
250
251 :param: name: Name of the service account
252 :param: labels: Labels for service account metadata
253 :param: namespace: Kubernetes namespace for service account metadata
254 Default: kube-system
255 """
256 v1_core = self.clients[CORE_CLIENT]
257 service_accounts = v1_core.list_namespaced_service_account(
258 namespace, field_selector="metadata.name={}".format(name)
259 )
260 if len(service_accounts.items) > 0:
261 raise Exception(
262 "Service account with metadata.name={} already exists".format(name)
263 )
264
265 metadata = V1ObjectMeta(name=name, labels=labels, namespace=namespace)
266
267 if self._need_to_create_new_secret():
268 secret_name = self._get_secret_name(name)
269 secrets = self._get_secret_reference_list(namespace, secret_name)
270 service_account = V1ServiceAccount(metadata=metadata, secrets=secrets)
271 v1_core.create_namespaced_service_account(namespace, service_account)
272 self._create_service_account_secret(name, namespace, secret_name)
273 else:
274 service_account = V1ServiceAccount(metadata=metadata)
275 v1_core.create_namespaced_service_account(namespace, service_account)
276
277 def delete_service_account(self, name: str, namespace: str = "kube-system"):
278 """
279 Delete a service account
280
281 :param: name: Name of the service account
282 :param: namespace: Kubernetes namespace for service account metadata
283 Default: kube-system
284 """
285 self.clients[CORE_CLIENT].delete_namespaced_service_account(name, namespace)
286
287 def create_cluster_role_binding(
288 self, name: str, labels: Dict[str, str], namespace: str = "kube-system"
289 ):
290 """
291 Create a cluster role binding
292
293 :param: name: Name of the cluster role
294 :param: labels: Labels for cluster role binding metadata
295 :param: namespace: Kubernetes namespace for cluster role binding metadata
296 Default: kube-system
297 """
298 role_bindings = self.clients[RBAC_CLIENT].list_cluster_role_binding(
299 field_selector="metadata.name={}".format(name)
300 )
301 if len(role_bindings.items) > 0:
302 raise Exception("Generated rbac id already exists")
303
304 role_binding = V1ClusterRoleBinding(
305 metadata=V1ObjectMeta(name=name, labels=labels),
306 role_ref=V1RoleRef(kind="ClusterRole", name=name, api_group=""),
307 subjects=[V1Subject(kind="ServiceAccount", name=name, namespace=namespace)],
308 )
309 self.clients[RBAC_CLIENT].create_cluster_role_binding(role_binding)
310
311 def delete_cluster_role_binding(self, name: str):
312 """
313 Delete a cluster role binding
314
315 :param: name: Name of the cluster role binding
316 """
317 self.clients[RBAC_CLIENT].delete_cluster_role_binding(name)
318
319 @retry(
320 attempts=10,
321 delay=1,
322 fallback=Exception("Failed getting the secret from service account"),
323 callback=retry_callback,
324 )
325 async def get_secret_data(
326 self, name: str, namespace: str = "kube-system"
327 ) -> (str, str):
328 """
329 Get secret data
330
331 :param: name: Name of the secret data
332 :param: namespace: Name of the namespace where the secret is stored
333
334 :return: Tuple with the token and client certificate
335 """
336 v1_core = self.clients[CORE_CLIENT]
337
338 secret_name = None
339
340 service_accounts = v1_core.list_namespaced_service_account(
341 namespace, field_selector="metadata.name={}".format(name)
342 )
343 if len(service_accounts.items) == 0:
344 raise Exception(
345 "Service account not found with metadata.name={}".format(name)
346 )
347 service_account = service_accounts.items[0]
348 if service_account.secrets and len(service_account.secrets) > 0:
349 secret_name = service_account.secrets[0].name
350 if not secret_name:
351 raise Exception(
352 "Failed getting the secret from service account {}".format(name)
353 )
354 secret = v1_core.list_namespaced_secret(
355 namespace, field_selector="metadata.name={}".format(secret_name)
356 ).items[0]
357
358 token = secret.data[SERVICE_ACCOUNT_TOKEN_KEY]
359 client_certificate_data = secret.data[SERVICE_ACCOUNT_ROOT_CA_KEY]
360
361 return (
362 base64.b64decode(token).decode("utf-8"),
363 base64.b64decode(client_certificate_data).decode("utf-8"),
364 )
365
366 async def create_certificate(
367 self,
368 namespace: str,
369 name: str,
370 dns_prefix: str,
371 secret_name: str,
372 usages: list,
373 issuer_name: str,
374 ):
375 """
376 Creates cert-manager certificate object
377
378 :param: namespace: Name of the namespace where the certificate and secret is stored
379 :param: name: Name of the certificate object
380 :param: dns_prefix: Prefix for the dnsNames. They will be prefixed to the common k8s svc suffixes
381 :param: secret_name: Name of the secret created by cert-manager
382 :param: usages: List of X.509 key usages
383 :param: issuer_name: Name of the cert-manager's Issuer or ClusterIssuer object
384
385 """
386 certificate_body = {
387 "apiVersion": "cert-manager.io/v1",
388 "kind": "Certificate",
389 "metadata": {"name": name, "namespace": namespace},
390 "spec": {
391 "secretName": secret_name,
392 "privateKey": {
393 "rotationPolicy": "Always",
394 "algorithm": "ECDSA",
395 "size": 256,
396 },
397 "duration": "8760h", # 1 Year
398 "renewBefore": "2208h", # 9 months
399 "subject": {"organizations": ["osm"]},
400 "commonName": "osm",
401 "isCA": False,
402 "usages": usages,
403 "dnsNames": [
404 "{}.{}".format(dns_prefix, namespace),
405 "{}.{}.svc".format(dns_prefix, namespace),
406 "{}.{}.svc.cluster".format(dns_prefix, namespace),
407 "{}.{}.svc.cluster.local".format(dns_prefix, namespace),
408 ],
409 "issuerRef": {"name": issuer_name, "kind": "ClusterIssuer"},
410 },
411 }
412 client = self.clients[CUSTOM_OBJECT_CLIENT]
413 try:
414 client.create_namespaced_custom_object(
415 group="cert-manager.io",
416 plural="certificates",
417 version="v1",
418 body=certificate_body,
419 namespace=namespace,
420 )
421 except ApiException as e:
422 info = json.loads(e.body)
423 if info.get("reason").lower() == "alreadyexists":
424 self.logger.warning("Certificate already exists: {}".format(e))
425 else:
426 raise e
427
428 async def delete_certificate(self, namespace, object_name):
429 client = self.clients[CUSTOM_OBJECT_CLIENT]
430 try:
431 client.delete_namespaced_custom_object(
432 group="cert-manager.io",
433 plural="certificates",
434 version="v1",
435 name=object_name,
436 namespace=namespace,
437 )
438 except ApiException as e:
439 info = json.loads(e.body)
440 if info.get("reason").lower() == "notfound":
441 self.logger.warning("Certificate already deleted: {}".format(e))
442 else:
443 raise e