# Copyright 2020 Canonical Ltd.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import base64
from distutils.version import LooseVersion
import logging
import re
import typing
from typing import Dict
import uuid

from kubernetes import client, config
from kubernetes.client.api import VersionApi
from kubernetes.client.models import (
    V1ClusterRole,
    V1ObjectMeta,
    V1PolicyRule,
    V1ServiceAccount,
    V1ClusterRoleBinding,
    V1RoleRef,
    V1Subject,
    V1Secret,
    V1SecretReference,
)
from kubernetes.client.rest import ApiException
from retrying_async import retry


# Keys of the service-account token secret's ``data`` mapping that
# Kubectl.get_secret_data reads.
SERVICE_ACCOUNT_TOKEN_KEY = "token"
SERVICE_ACCOUNT_ROOT_CA_KEY = "ca.crt"
# Keys of the Kubectl.clients dictionary, one per Kubernetes API client.
CORE_CLIENT = "core_v1"
RBAC_CLIENT = "rbac_v1"
STORAGE_CLIENT = "storage_v1"


class Kubectl:
    """
    Convenience wrapper around the Kubernetes Python client.

    On construction it loads a kubeconfig and instantiates the core, RBAC and
    storage API clients, which the methods below use to list services and
    storage classes and to manage cluster roles, cluster role bindings,
    service accounts and their token secrets.
    """

    def __init__(self, config_file=None):
        """
        Load the kubeconfig and build the API clients

        :param: config_file: Path of the kubeconfig file, passed as is to
                             kubernetes.config.load_kube_config
        """
        config.load_kube_config(config_file=config_file)
        self._clients = {
            CORE_CLIENT: client.CoreV1Api(),
            RBAC_CLIENT: client.RbacAuthorizationV1Api(),
            STORAGE_CLIENT: client.StorageV1Api(),
        }
        self._configuration = config.kube_config.Configuration.get_default_copy()
        self.logger = logging.getLogger("Kubectl")

    @property
    def configuration(self):
        """Copy of the default client configuration taken at construction time"""
        return self._configuration

    @property
    def clients(self):
        """Dictionary of API clients, keyed by CORE_CLIENT/RBAC_CLIENT/STORAGE_CLIENT"""
        return self._clients

    @staticmethod
    def _service_to_dict(service) -> typing.Dict:
        """
        Convert a Service object returned by the API into a plain dictionary

        :param: service: Service item, as found in the list returned by
                         list_service_for_all_namespaces

        :return: Dictionary with the keys name, cluster_ip, type, ports and
                 external_ip. "ports" is an empty list when the service has
                 no ports; "external_ip" is None when the load balancer has
                 no ingress entries.
        """
        ports = service.spec.ports or []
        ingress = service.status.load_balancer.ingress
        return {
            "name": service.metadata.name,
            "cluster_ip": service.spec.cluster_ip,
            "type": service.spec.type,
            "ports": [
                {
                    "name": port.name,
                    "node_port": port.node_port,
                    "port": port.port,
                    "protocol": port.protocol,
                    "target_port": port.target_port,
                }
                for port in ports
            ],
            "external_ip": [entry.ip for entry in ingress] if ingress else None,
        }

    def get_services(
        self,
        field_selector: typing.Optional[str] = None,
        label_selector: typing.Optional[str] = None,
    ) -> typing.List[typing.Dict]:
        """
        Get the Service list from all namespaces

        :param: field_selector: Kubernetes field selector used to filter the services
        :param: label_selector: Kubernetes label selector used to filter the services

        :return: List of the services matching the selectors specified

        :raises: ApiException: if the Kubernetes API call fails (logged first)
        """
        # Only forward the selectors that were actually provided.
        kwargs = {}
        if field_selector:
            kwargs["field_selector"] = field_selector
        if label_selector:
            kwargs["label_selector"] = label_selector
        try:
            result = self.clients[CORE_CLIENT].list_service_for_all_namespaces(**kwargs)
        except ApiException as e:
            self.logger.error("Error calling get services: {}".format(e))
            raise
        return [self._service_to_dict(service) for service in result.items]

    def get_default_storage_class(self) -> typing.Optional[str]:
        """
        Default storage class

        :return: Returns the default storage class name, if exists.
                 If not, it returns the first storage class.
                 If there are not storage classes, returns None
        """
        storage_classes = self.clients[STORAGE_CLIENT].list_storage_class()
        selected_sc = None
        default_sc_annotations = {
            "storageclass.kubernetes.io/is-default-class": "true",
            # Older clusters still use the beta annotation.
            "storageclass.beta.kubernetes.io/is-default-class": "true",
        }
        for sc in storage_classes.items:
            if not selected_sc:
                # Select the first storage class in case there is no default class
                selected_sc = sc.metadata.name
            annotations = sc.metadata.annotations or {}
            if any(
                annotations.get(key) == value
                for key, value in default_sc_annotations.items()
            ):
                # Default storage class found: it wins over the first one
                selected_sc = sc.metadata.name
                break
        return selected_sc

    def create_cluster_role(
        self,
        name: str,
        labels: Dict[str, str],
        namespace: str = "kube-system",
    ):
        """
        Create a cluster role that allows every verb on every resource and
        on every non-resource URL

        :param: name: Name of the cluster role
        :param: labels: Labels for cluster role metadata
        :param: namespace: Kubernetes namespace for cluster role metadata
                           Default: kube-system

        :raises: Exception: if a cluster role with that name already exists
        """
        cluster_roles = self.clients[RBAC_CLIENT].list_cluster_role(
            field_selector="metadata.name={}".format(name)
        )

        if cluster_roles.items:
            raise Exception(
                "Cluster role with metadata.name={} already exists".format(name)
            )

        metadata = V1ObjectMeta(name=name, labels=labels, namespace=namespace)
        # Cluster role
        cluster_role = V1ClusterRole(
            metadata=metadata,
            rules=[
                V1PolicyRule(api_groups=["*"], resources=["*"], verbs=["*"]),
                V1PolicyRule(non_resource_ur_ls=["*"], verbs=["*"]),
            ],
        )

        self.clients[RBAC_CLIENT].create_cluster_role(cluster_role)

    def delete_cluster_role(self, name: str):
        """
        Delete a cluster role

        :param: name: Name of the cluster role
        """
        self.clients[RBAC_CLIENT].delete_cluster_role(name)

    def _get_kubectl_version(self) -> str:
        """
        Get the version reported by the Kubernetes version API

        :return: String with the format "<major>.<minor>"
        """
        version = VersionApi().get_code()
        return "{}.{}".format(version.major, version.minor)

    @staticmethod
    def _parse_version(version: str) -> typing.Tuple[int, ...]:
        """
        Extract the numeric components of a version string

        :param: version: Version string, e.g. "1.24" or "1.24+"

        :return: Tuple with every run of digits as an integer, e.g. (1, 24).
                 Non-numeric suffixes are ignored; an empty tuple is returned
                 when the string contains no digits.
        """
        return tuple(int(number) for number in re.findall(r"\d+", version))

    def _need_to_create_new_secret(self) -> bool:
        """
        Check whether the service account secret has to be created explicitly

        :return: True if the cluster version is 1.24 or newer
        """
        # Integer tuples compare numerically, so "1.9" < "1.24" as expected,
        # and this avoids distutils.version, which PEP 632 deprecated.
        min_k8s_version = (1, 24)
        current_k8s_version = self._parse_version(self._get_kubectl_version())
        return current_k8s_version >= min_k8s_version

    def _get_secret_name(self, service_account_name: str) -> str:
        """
        Generate a secret name for a service account

        :param: service_account_name: Name of the service account

        :return: "<service_account_name>-token-<suffix>", where the suffix is
                 the first five characters of a random UUID
        """
        random_alphanum = str(uuid.uuid4())[:5]
        return "{}-token-{}".format(service_account_name, random_alphanum)

    def _create_service_account_secret(
        self, service_account_name: str, namespace: str, secret_name: str
    ):
        """
        Create a secret for the service account. K8s version >= 1.24

        :param: service_account_name: Name of the service account
        :param: namespace: Kubernetes namespace for service account metadata
        :param: secret_name: Name of the secret

        :raises: Exception: if a secret with that name already exists
        """
        v1_core = self.clients[CORE_CLIENT]
        secrets = v1_core.list_namespaced_secret(
            namespace, field_selector="metadata.name={}".format(secret_name)
        ).items

        if secrets:
            raise Exception(
                "Secret with metadata.name={} already exists".format(secret_name)
            )

        # The annotation ties the token secret to its service account.
        annotations = {"kubernetes.io/service-account.name": service_account_name}
        metadata = V1ObjectMeta(
            name=secret_name, namespace=namespace, annotations=annotations
        )
        secret_type = "kubernetes.io/service-account-token"
        secret = V1Secret(metadata=metadata, type=secret_type)
        v1_core.create_namespaced_secret(namespace, secret)

    def _get_secret_reference_list(self, namespace: str, secret_name: str):
        """
        Return a secret reference list with one secret.
        K8s version >= 1.24

        :param: namespace: Kubernetes namespace for service account metadata
        :param: secret_name: Name of the secret
        :rtype: list[V1SecretReference]
        """
        return [V1SecretReference(name=secret_name, namespace=namespace)]

    def create_service_account(
        self,
        name: str,
        labels: Dict[str, str],
        namespace: str = "kube-system",
    ):
        """
        Create a service account

        When the cluster version is 1.24 or newer, the service account is
        created referencing a newly named secret, and that token secret is
        created right after it.

        :param: name: Name of the service account
        :param: labels: Labels for service account metadata
        :param: namespace: Kubernetes namespace for service account metadata
                           Default: kube-system

        :raises: Exception: if a service account with that name already exists
        """
        v1_core = self.clients[CORE_CLIENT]
        service_accounts = v1_core.list_namespaced_service_account(
            namespace, field_selector="metadata.name={}".format(name)
        )
        if service_accounts.items:
            raise Exception(
                "Service account with metadata.name={} already exists".format(name)
            )

        metadata = V1ObjectMeta(name=name, labels=labels, namespace=namespace)

        if self._need_to_create_new_secret():
            secret_name = self._get_secret_name(name)
            secrets = self._get_secret_reference_list(namespace, secret_name)
            service_account = V1ServiceAccount(metadata=metadata, secrets=secrets)
            v1_core.create_namespaced_service_account(namespace, service_account)
            # The account must exist before the secret annotated with its name.
            self._create_service_account_secret(name, namespace, secret_name)
        else:
            service_account = V1ServiceAccount(metadata=metadata)
            v1_core.create_namespaced_service_account(namespace, service_account)

    def delete_service_account(self, name: str, namespace: str = "kube-system"):
        """
        Delete a service account

        :param: name: Name of the service account
        :param: namespace: Kubernetes namespace for service account metadata
                           Default: kube-system
        """
        self.clients[CORE_CLIENT].delete_namespaced_service_account(name, namespace)

    def create_cluster_role_binding(
        self, name: str, labels: Dict[str, str], namespace: str = "kube-system"
    ):
        """
        Create a cluster role binding

        The same name is used for the binding, for the cluster role it refers
        to and for the service account it binds.

        :param: name: Name of the cluster role
        :param: labels: Labels for cluster role binding metadata
        :param: namespace: Kubernetes namespace of the bound service account
                           Default: kube-system

        :raises: Exception: if a cluster role binding with that name already exists
        """
        role_bindings = self.clients[RBAC_CLIENT].list_cluster_role_binding(
            field_selector="metadata.name={}".format(name)
        )
        if role_bindings.items:
            raise Exception("Generated rbac id already exists")

        role_binding = V1ClusterRoleBinding(
            metadata=V1ObjectMeta(name=name, labels=labels),
            role_ref=V1RoleRef(kind="ClusterRole", name=name, api_group=""),
            subjects=[V1Subject(kind="ServiceAccount", name=name, namespace=namespace)],
        )
        self.clients[RBAC_CLIENT].create_cluster_role_binding(role_binding)

    def delete_cluster_role_binding(self, name: str):
        """
        Delete a cluster role binding

        :param: name: Name of the cluster role binding
        """
        self.clients[RBAC_CLIENT].delete_cluster_role_binding(name)

    @retry(
        attempts=10,
        delay=1,
        fallback=Exception("Failed getting the secret from service account"),
    )
    async def get_secret_data(
        self, name: str, namespace: str = "kube-system"
    ) -> typing.Tuple[str, str]:
        """
        Get the token and the root CA certificate of a service account

        The call is wrapped by the retry decorator (attempts=10, delay=1), so
        the exceptions raised here are presumably retried until the token
        secret has been populated.

        :param: name: Name of the service account owning the secret
        :param: namespace: Name of the namespace where the secret is stored

        :return: Tuple with the token and the "ca.crt" certificate, both
                 base64-decoded into UTF-8 strings

        :raises: Exception: if the service account, its secret reference, the
                            secret or the secret data cannot be found
        """
        v1_core = self.clients[CORE_CLIENT]

        service_accounts = v1_core.list_namespaced_service_account(
            namespace, field_selector="metadata.name={}".format(name)
        )
        if not service_accounts.items:
            raise Exception(
                "Service account not found with metadata.name={}".format(name)
            )
        service_account = service_accounts.items[0]

        secret_name = None
        if service_account.secrets:
            secret_name = service_account.secrets[0].name
        if not secret_name:
            raise Exception(
                "Failed getting the secret from service account {}".format(name)
            )

        secrets = v1_core.list_namespaced_secret(
            namespace, field_selector="metadata.name={}".format(secret_name)
        ).items
        if not secrets:
            # Explicit error instead of an opaque IndexError on items[0]
            raise Exception(
                "Secret not found with metadata.name={}".format(secret_name)
            )

        secret_data = secrets[0].data or {}
        if (
            SERVICE_ACCOUNT_TOKEN_KEY not in secret_data
            or SERVICE_ACCOUNT_ROOT_CA_KEY not in secret_data
        ):
            # The secret exists but does not hold the token data (yet)
            raise Exception(
                "Secret {} does not contain the service account token data".format(
                    secret_name
                )
            )

        token = secret_data[SERVICE_ACCOUNT_TOKEN_KEY]
        client_certificate_data = secret_data[SERVICE_ACCOUNT_ROOT_CA_KEY]

        return (
            base64.b64decode(token).decode("utf-8"),
            base64.b64decode(client_certificate_data).decode("utf-8"),
        )