8b8008efa1bf18ef8ea723113efcf8ce9fe19593
[osm/N2VC.git] / n2vc / kubectl.py
1 # Copyright 2020 Canonical Ltd.
2 #
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
6 #
7 # http://www.apache.org/licenses/LICENSE-2.0
8 #
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
14
15 import base64
16 import logging
17 from typing import Dict
18 import typing
19 import uuid
20
21 from distutils.version import LooseVersion
22
23 from kubernetes import client, config
24 from kubernetes.client.api import VersionApi
25 from kubernetes.client.models import (
26 V1ClusterRole,
27 V1ObjectMeta,
28 V1PolicyRule,
29 V1ServiceAccount,
30 V1ClusterRoleBinding,
31 V1RoleRef,
32 V1Subject,
33 V1Secret,
34 V1SecretReference,
35 )
36 from kubernetes.client.rest import ApiException
37 from retrying_async import retry
38
39
# Keys read from the "data" mapping of a service account token secret
SERVICE_ACCOUNT_TOKEN_KEY = "token"
SERVICE_ACCOUNT_ROOT_CA_KEY = "ca.crt"

# Names under which the Kubernetes API clients are stored in Kubectl.clients
CORE_CLIENT = "core_v1"
RBAC_CLIENT = "rbac_v1"
STORAGE_CLIENT = "storage_v1"
46
47
class Kubectl:
    """
    Convenience wrapper around the Kubernetes Python client.

    On construction it loads a kubeconfig and creates the CoreV1, RBAC v1 and
    Storage v1 API clients. Those clients are then used to list services and
    storage classes, and to manage the cluster role, service account, cluster
    role binding and service account token secret objects.
    """

    def __init__(self, config_file=None):
        """
        Load the kubeconfig and build the API clients

        :param: config_file: Forwarded unchanged as the config_file argument
                             of kubernetes.config.load_kube_config
        """
        config.load_kube_config(config_file=config_file)
        self._clients = {
            CORE_CLIENT: client.CoreV1Api(),
            RBAC_CLIENT: client.RbacAuthorizationV1Api(),
            STORAGE_CLIENT: client.StorageV1Api(),
        }
        self._configuration = config.kube_config.Configuration.get_default_copy()
        self.logger = logging.getLogger("Kubectl")

    @property
    def configuration(self):
        """Copy of the default client configuration taken at construction time"""
        return self._configuration

    @property
    def clients(self):
        """Dictionary of API clients, keyed by CORE_CLIENT, RBAC_CLIENT, STORAGE_CLIENT"""
        return self._clients

    def get_services(
        self,
        field_selector: str = None,
        label_selector: str = None,
    ) -> typing.List[typing.Dict]:
        """
        Get the list of services across all namespaces

        :param: field_selector: Kubernetes field selector used to filter the services
        :param: label_selector: Kubernetes label selector used to filter the services

        :return: List of dictionaries, one per service matching the selectors
                 specified, with the keys: name, cluster_ip, type, ports and
                 external_ip
        :raises: ApiException: if the Kubernetes API call fails (it is logged
                 and re-raised)
        """
        # Only forward the selectors that were actually provided
        kwargs = {}
        if field_selector:
            kwargs["field_selector"] = field_selector
        if label_selector:
            kwargs["label_selector"] = label_selector
        try:
            result = self.clients[CORE_CLIENT].list_service_for_all_namespaces(**kwargs)
        except ApiException as e:
            self.logger.error("Error calling get services: {}".format(e))
            # Bare raise keeps the original traceback
            raise
        return [self._service_to_dict(service) for service in result.items]

    @staticmethod
    def _service_to_dict(service) -> typing.Dict:
        """
        Flatten a service object returned by the API into a plain dictionary

        :param: service: Item of the list returned by list_service_for_all_namespaces

        :return: Dictionary with the keys: name, cluster_ip, type, ports
                 (empty list if the service has no ports) and external_ip
                 (None if the load balancer has no ingress entries)
        """
        ports = [
            {
                "name": port.name,
                "node_port": port.node_port,
                "port": port.port,
                "protocol": port.protocol,
                "target_port": port.target_port,
            }
            for port in service.spec.ports or []
        ]
        ingress = service.status.load_balancer.ingress
        return {
            "name": service.metadata.name,
            "cluster_ip": service.spec.cluster_ip,
            "type": service.spec.type,
            "ports": ports,
            "external_ip": [entry.ip for entry in ingress] if ingress else None,
        }

    def get_default_storage_class(self) -> typing.Optional[str]:
        """
        Default storage class

        :return: Returns the default storage class name, if exists.
                 If not, it returns the first storage class.
                 If there are no storage classes, returns None
        """
        storage_classes = self.clients[STORAGE_CLIENT].list_storage_class()
        default_sc_annotations = {
            "storageclass.kubernetes.io/is-default-class": "true",
            # Older clusters still use the beta annotation.
            "storageclass.beta.kubernetes.io/is-default-class": "true",
        }
        selected_sc = None
        for sc in storage_classes.items:
            if not selected_sc:
                # Keep the first storage class as a fallback in case none of
                # them is annotated as the default class
                selected_sc = sc.metadata.name
            annotations = sc.metadata.annotations or {}
            if any(
                annotations.get(key) == value
                for key, value in default_sc_annotations.items()
            ):
                # Default storage class found: it wins over the fallback
                selected_sc = sc.metadata.name
                break
        return selected_sc

    def create_cluster_role(
        self,
        name: str,
        labels: Dict[str, str],
        namespace: str = "kube-system",
    ):
        """
        Create a cluster role allowing every verb on every resource and on
        every non-resource URL

        :param: name: Name of the cluster role
        :param: labels: Labels for cluster role metadata
        :param: namespace: Kubernetes namespace for cluster role metadata
                           Default: kube-system
        :raises: Exception: if a cluster role with that name already exists
        """
        cluster_roles = self.clients[RBAC_CLIENT].list_cluster_role(
            field_selector="metadata.name={}".format(name)
        )
        if cluster_roles.items:
            raise Exception(
                "Cluster role with metadata.name={} already exists".format(name)
            )

        metadata = V1ObjectMeta(name=name, labels=labels, namespace=namespace)
        cluster_role = V1ClusterRole(
            metadata=metadata,
            rules=[
                V1PolicyRule(api_groups=["*"], resources=["*"], verbs=["*"]),
                V1PolicyRule(non_resource_ur_ls=["*"], verbs=["*"]),
            ],
        )
        self.clients[RBAC_CLIENT].create_cluster_role(cluster_role)

    def delete_cluster_role(self, name: str):
        """
        Delete a cluster role

        :param: name: Name of the cluster role
        """
        self.clients[RBAC_CLIENT].delete_cluster_role(name)

    def _get_kubectl_version(self):
        """
        Get the version reported by the Kubernetes version API

        :return: String with the format "<major>.<minor>"
        """
        version = VersionApi().get_code()
        return "{}.{}".format(version.major, version.minor)

    def _need_to_create_new_secret(self):
        """
        Check whether the service account token secret has to be created
        explicitly, which is the case for K8s version >= 1.24

        :return: True if the cluster version is 1.24 or newer
        """
        # NOTE: LooseVersion comes from distutils, which is deprecated and no
        # longer shipped with Python 3.12+; the file-level import needs a
        # replacement before moving to that interpreter version.
        min_k8s_version = "1.24"
        current_k8s_version = self._get_kubectl_version()
        return LooseVersion(min_k8s_version) <= LooseVersion(current_k8s_version)

    def _get_secret_name(self, service_account_name: str):
        """
        Generate a secret name for a service account

        :param: service_account_name: Name of the service account

        :return: "<service_account_name>-token-<suffix>", where the suffix is
                 the first five characters of a random UUID
        """
        random_alphanum = str(uuid.uuid4())[:5]
        return "{}-token-{}".format(service_account_name, random_alphanum)

    def _create_service_account_secret(
        self, service_account_name: str, namespace: str, secret_name: str
    ):
        """
        Create a secret for the service account. K8s version >= 1.24

        :param: service_account_name: Name of the service account
        :param: namespace: Kubernetes namespace for service account metadata
        :param: secret_name: Name of the secret
        :raises: Exception: if a secret with that name already exists in the namespace
        """
        v1_core = self.clients[CORE_CLIENT]
        secrets = v1_core.list_namespaced_secret(
            namespace, field_selector="metadata.name={}".format(secret_name)
        ).items
        if secrets:
            raise Exception(
                "Secret with metadata.name={} already exists".format(secret_name)
            )

        # The annotation links the token secret to its service account
        annotations = {"kubernetes.io/service-account.name": service_account_name}
        metadata = V1ObjectMeta(
            name=secret_name, namespace=namespace, annotations=annotations
        )
        secret_type = "kubernetes.io/service-account-token"
        secret = V1Secret(metadata=metadata, type=secret_type)
        v1_core.create_namespaced_secret(namespace, secret)

    def _get_secret_reference_list(self, namespace: str, secret_name: str):
        """
        Return a secret reference list with one secret.
        K8s version >= 1.24

        :param: namespace: Kubernetes namespace for service account metadata
        :param: secret_name: Name of the secret
        :rtype: list[V1SecretReference]
        """
        return [V1SecretReference(name=secret_name, namespace=namespace)]

    def create_service_account(
        self,
        name: str,
        labels: Dict[str, str],
        namespace: str = "kube-system",
    ):
        """
        Create a service account

        For K8s version >= 1.24 the service account is created referencing a
        newly named token secret, and that secret is created right after it.

        :param: name: Name of the service account
        :param: labels: Labels for service account metadata
        :param: namespace: Kubernetes namespace for service account metadata
                           Default: kube-system
        :raises: Exception: if a service account with that name already
                 exists in the namespace
        """
        v1_core = self.clients[CORE_CLIENT]
        service_accounts = v1_core.list_namespaced_service_account(
            namespace, field_selector="metadata.name={}".format(name)
        )
        if service_accounts.items:
            raise Exception(
                "Service account with metadata.name={} already exists".format(name)
            )

        metadata = V1ObjectMeta(name=name, labels=labels, namespace=namespace)

        if not self._need_to_create_new_secret():
            service_account = V1ServiceAccount(metadata=metadata)
            v1_core.create_namespaced_service_account(namespace, service_account)
            return

        secret_name = self._get_secret_name(name)
        secrets = self._get_secret_reference_list(namespace, secret_name)
        service_account = V1ServiceAccount(metadata=metadata, secrets=secrets)
        v1_core.create_namespaced_service_account(namespace, service_account)
        # The secret is created after the service account that it references
        self._create_service_account_secret(name, namespace, secret_name)

    def delete_service_account(self, name: str, namespace: str = "kube-system"):
        """
        Delete a service account

        :param: name: Name of the service account
        :param: namespace: Kubernetes namespace for service account metadata
                           Default: kube-system
        """
        self.clients[CORE_CLIENT].delete_namespaced_service_account(name, namespace)

    def create_cluster_role_binding(
        self, name: str, labels: Dict[str, str], namespace: str = "kube-system"
    ):
        """
        Create a cluster role binding

        :param: name: Name used for the cluster role binding, and also the
                      name of the cluster role and of the service account
                      that are bound together
        :param: labels: Labels for cluster role binding metadata
        :param: namespace: Kubernetes namespace of the service account subject
                           Default: kube-system
        :raises: Exception: if a cluster role binding with that name already exists
        """
        role_bindings = self.clients[RBAC_CLIENT].list_cluster_role_binding(
            field_selector="metadata.name={}".format(name)
        )
        if role_bindings.items:
            raise Exception("Generated rbac id already exists")

        role_binding = V1ClusterRoleBinding(
            metadata=V1ObjectMeta(name=name, labels=labels),
            role_ref=V1RoleRef(kind="ClusterRole", name=name, api_group=""),
            subjects=[V1Subject(kind="ServiceAccount", name=name, namespace=namespace)],
        )
        self.clients[RBAC_CLIENT].create_cluster_role_binding(role_binding)

    def delete_cluster_role_binding(self, name: str):
        """
        Delete a cluster role binding

        :param: name: Name of the cluster role binding
        """
        self.clients[RBAC_CLIENT].delete_cluster_role_binding(name)

    @retry(
        attempts=10,
        delay=1,
        fallback=Exception("Failed getting the secret from service account"),
    )
    async def get_secret_data(
        self, name: str, namespace: str = "kube-system"
    ) -> typing.Tuple[str, str]:
        """
        Get the token and the root CA certificate of a service account

        The data is read from the first secret referenced by the service
        account. The retry decorator is configured with 10 attempts, a delay
        of 1 and a fallback exception.

        :param: name: Name of the service account
        :param: namespace: Name of the namespace where the service account
                           and its secret are stored

        :return: Tuple with the token and the root CA certificate, both
                 base64-decoded into str
        :raises: Exception: if the service account, its secret, or the
                 expected keys of the secret data are not found
        """
        v1_core = self.clients[CORE_CLIENT]

        service_accounts = v1_core.list_namespaced_service_account(
            namespace, field_selector="metadata.name={}".format(name)
        )
        if not service_accounts.items:
            raise Exception(
                "Service account not found with metadata.name={}".format(name)
            )

        service_account = service_accounts.items[0]
        secret_name = None
        if service_account.secrets:
            secret_name = service_account.secrets[0].name
        if not secret_name:
            raise Exception(
                "Failed getting the secret from service account {}".format(name)
            )

        secrets = v1_core.list_namespaced_secret(
            namespace, field_selector="metadata.name={}".format(secret_name)
        ).items
        if not secrets:
            raise Exception(
                "Secret not found with metadata.name={}".format(secret_name)
            )

        # The data may still be empty right after the secret is created, so
        # fail with a clear message and let the retry decorator try again
        secret_data = secrets[0].data or {}
        missing_keys = [
            key
            for key in (SERVICE_ACCOUNT_TOKEN_KEY, SERVICE_ACCOUNT_ROOT_CA_KEY)
            if key not in secret_data
        ]
        if missing_keys:
            raise Exception(
                "Secret {} does not contain the keys: {}".format(
                    secret_name, ", ".join(missing_keys)
                )
            )

        token = secret_data[SERVICE_ACCOUNT_TOKEN_KEY]
        root_ca_data = secret_data[SERVICE_ACCOUNT_ROOT_CA_KEY]

        return (
            base64.b64decode(token).decode("utf-8"),
            base64.b64decode(root_ca_data).decode("utf-8"),
        )