blob: a56b6cdc2f98e39e0b7674e9d586a475ef956625 [file] [log] [blame]
David Garcia5d799392020-07-02 13:56:58 +02001# Copyright 2020 Canonical Ltd.
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7# http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14
David Garciad8d4b6e2021-06-24 18:47:22 +020015import base64
David Garciaf6e9b002020-11-27 15:32:02 +010016import logging
David Garciad8d4b6e2021-06-24 18:47:22 +020017from typing import Dict
18import typing
19
David Garciaf6e9b002020-11-27 15:32:02 +010020
David Garcia5d799392020-07-02 13:56:58 +020021from kubernetes import client, config
David Garciad8d4b6e2021-06-24 18:47:22 +020022from kubernetes.client.models import (
23 V1ClusterRole,
24 V1ObjectMeta,
25 V1PolicyRule,
26 V1ServiceAccount,
27 V1ClusterRoleBinding,
28 V1RoleRef,
29 V1Subject,
30)
David Garcia5d799392020-07-02 13:56:58 +020031from kubernetes.client.rest import ApiException
David Garciad8d4b6e2021-06-24 18:47:22 +020032from retrying_async import retry
David Garciaf6e9b002020-11-27 15:32:02 +010033
34
# Keys read from the ``data`` mapping of a service account secret.
SERVICE_ACCOUNT_TOKEN_KEY = "token"
SERVICE_ACCOUNT_ROOT_CA_KEY = "ca.crt"

# Keys of the ``Kubectl.clients`` dictionary, one per Kubernetes API client.
CORE_CLIENT = "core_v1"
RBAC_CLIENT = "rbac_v1"
STORAGE_CLIENT = "storage_v1"
David Garcia5d799392020-07-02 13:56:58 +020041
42
class Kubectl:
    """
    Convenience wrapper around the Kubernetes Python client.

    Holds one CoreV1Api, one RbacAuthorizationV1Api and one StorageV1Api
    client and offers helpers to list services, choose a storage class and
    manage cluster roles, service accounts, cluster role bindings and the
    secret attached to a service account.
    """

    def __init__(self, config_file=None):
        """
        Load the kube config and create the API clients

        :param: config_file: Value forwarded to config.load_kube_config as
                             its config_file argument. Default: None
        """
        config.load_kube_config(config_file=config_file)
        self._clients = {
            CORE_CLIENT: client.CoreV1Api(),
            RBAC_CLIENT: client.RbacAuthorizationV1Api(),
            STORAGE_CLIENT: client.StorageV1Api(),
        }
        self._configuration = config.kube_config.Configuration.get_default_copy()
        self.logger = logging.getLogger("Kubectl")

    @property
    def configuration(self):
        """Copy of the default client configuration taken in __init__"""
        return self._configuration

    @property
    def clients(self):
        """Dictionary of API clients keyed by CORE_CLIENT/RBAC_CLIENT/STORAGE_CLIENT"""
        return self._clients

    def get_services(
        self,
        field_selector: typing.Optional[str] = None,
        label_selector: typing.Optional[str] = None,
    ) -> typing.List[typing.Dict]:
        """
        Get Service list across all namespaces

        :param: field_selector: Kubernetes field selector used to filter services
        :param: label_selector: Kubernetes label selector used to filter services

        :return: List of the services matching the selectors specified.
                 Each item is a dictionary with the keys "name", "cluster_ip",
                 "type", "ports" (list, empty when the service has no ports)
                 and "external_ip" (list of load balancer ingress IPs, or None
                 when there is no ingress)
        :raises: ApiException: If the Kubernetes API call fails (it is logged
                 and re-raised)
        """
        # Only forward the selectors that were actually provided.
        kwargs = {}
        if field_selector:
            kwargs["field_selector"] = field_selector
        if label_selector:
            kwargs["label_selector"] = label_selector

        try:
            result = self.clients[CORE_CLIENT].list_service_for_all_namespaces(**kwargs)
        except ApiException as e:
            self.logger.error("Error calling get services: {}".format(e))
            raise

        return [
            {
                "name": service.metadata.name,
                "cluster_ip": service.spec.cluster_ip,
                "type": service.spec.type,
                "ports": [
                    {
                        "name": port.name,
                        "node_port": port.node_port,
                        "port": port.port,
                        "protocol": port.protocol,
                        "target_port": port.target_port,
                    }
                    for port in service.spec.ports or []
                ],
                "external_ip": [
                    ingress.ip for ingress in service.status.load_balancer.ingress
                ]
                if service.status.load_balancer.ingress
                else None,
            }
            for service in result.items
        ]

    def get_default_storage_class(self) -> typing.Optional[str]:
        """
        Default storage class

        :return: Returns the default storage class name, if exists.
                 If not, it returns the first storage class.
                 If there are no storage classes, returns None
        """
        storage_classes = self.clients[STORAGE_CLIENT].list_storage_class()
        selected_sc = None
        default_sc_annotations = {
            "storageclass.kubernetes.io/is-default-class": "true",
            # Older clusters still use the beta annotation.
            "storageclass.beta.kubernetes.io/is-default-class": "true",
        }
        for sc in storage_classes.items:
            if not selected_sc:
                # Keep the first storage class as fallback in case none of
                # them is annotated as the default class
                selected_sc = sc.metadata.name
            annotations = sc.metadata.annotations or {}
            if any(
                annotations.get(key) == value
                for key, value in default_sc_annotations.items()
            ):
                # Default storage class found: it wins over the fallback
                selected_sc = sc.metadata.name
                break
        return selected_sc

    def create_cluster_role(
        self,
        name: str,
        labels: Dict[str, str],
        namespace: str = "kube-system",
    ):
        """
        Create a cluster role

        The role is created with two rules: every verb on every resource of
        every API group, and every verb on every non-resource URL.

        :param: name: Name of the cluster role
        :param: labels: Labels for cluster role metadata
        :param: namespace: Kubernetes namespace for cluster role metadata
                           Default: kube-system

        :raises: Exception: If a cluster role with that name already exists
        """
        cluster_roles = self.clients[RBAC_CLIENT].list_cluster_role(
            field_selector="metadata.name={}".format(name)
        )

        if cluster_roles.items:
            raise Exception(
                "Cluster role with metadata.name={} already exists".format(name)
            )

        metadata = V1ObjectMeta(name=name, labels=labels, namespace=namespace)
        # Cluster role
        cluster_role = V1ClusterRole(
            metadata=metadata,
            rules=[
                V1PolicyRule(api_groups=["*"], resources=["*"], verbs=["*"]),
                # "non_resource_ur_ls" is the keyword used by V1PolicyRule
                V1PolicyRule(non_resource_ur_ls=["*"], verbs=["*"]),
            ],
        )

        self.clients[RBAC_CLIENT].create_cluster_role(cluster_role)

    def delete_cluster_role(self, name: str):
        """
        Delete a cluster role

        :param: name: Name of the cluster role
        """
        self.clients[RBAC_CLIENT].delete_cluster_role(name)

    def create_service_account(
        self,
        name: str,
        labels: Dict[str, str],
        namespace: str = "kube-system",
    ):
        """
        Create a service account

        :param: name: Name of the service account
        :param: labels: Labels for service account metadata
        :param: namespace: Kubernetes namespace for service account metadata
                           Default: kube-system

        :raises: Exception: If a service account with that name already
                 exists in the namespace
        """
        service_accounts = self.clients[CORE_CLIENT].list_namespaced_service_account(
            namespace, field_selector="metadata.name={}".format(name)
        )
        if service_accounts.items:
            raise Exception(
                "Service account with metadata.name={} already exists".format(name)
            )

        metadata = V1ObjectMeta(name=name, labels=labels, namespace=namespace)
        service_account = V1ServiceAccount(metadata=metadata)

        self.clients[CORE_CLIENT].create_namespaced_service_account(
            namespace, service_account
        )

    def delete_service_account(self, name: str, namespace: str = "kube-system"):
        """
        Delete a service account

        :param: name: Name of the service account
        :param: namespace: Kubernetes namespace for service account metadata
                           Default: kube-system
        """
        self.clients[CORE_CLIENT].delete_namespaced_service_account(name, namespace)

    def create_cluster_role_binding(
        self, name: str, labels: Dict[str, str], namespace: str = "kube-system"
    ):
        """
        Create a cluster role binding

        The same name is used for the binding itself, for the ClusterRole it
        references and for the ServiceAccount subject it binds.

        :param: name: Name of the cluster role binding, of the referenced
                      cluster role and of the bound service account
        :param: labels: Labels for cluster role binding metadata
        :param: namespace: Kubernetes namespace of the service account subject
                           Default: kube-system

        :raises: Exception: If a cluster role binding with that name already
                 exists
        """
        role_bindings = self.clients[RBAC_CLIENT].list_cluster_role_binding(
            field_selector="metadata.name={}".format(name)
        )
        if role_bindings.items:
            raise Exception("Generated rbac id already exists")

        role_binding = V1ClusterRoleBinding(
            metadata=V1ObjectMeta(name=name, labels=labels),
            # NOTE(review): api_group is sent empty; presumably the API server
            # defaults it to the RBAC group - confirm against the target cluster
            role_ref=V1RoleRef(kind="ClusterRole", name=name, api_group=""),
            subjects=[V1Subject(kind="ServiceAccount", name=name, namespace=namespace)],
        )
        self.clients[RBAC_CLIENT].create_cluster_role_binding(role_binding)

    def delete_cluster_role_binding(self, name: str):
        """
        Delete a cluster role binding

        :param: name: Name of the cluster role binding
        """
        self.clients[RBAC_CLIENT].delete_cluster_role_binding(name)

    @retry(
        attempts=10,
        delay=1,
        fallback=Exception("Failed getting the secret from service account"),
    )
    async def get_secret_data(
        self, name: str, namespace: str = "kube-system"
    ) -> typing.Tuple[str, str]:
        """
        Get the token and CA certificate of a service account

        Looks up the service account, takes the first secret it references
        and decodes the "token" and "ca.crt" entries of that secret. The
        method is wrapped with the retry decorator (attempts=10, delay=1).

        :param: name: Name of the service account owning the secret
        :param: namespace: Name of the namespace where the service account
                           and its secret are stored

        :return: Tuple with the token and the CA certificate, both base64
                 decoded and returned as UTF-8 strings
        :raises: Exception: If the service account does not exist, it has no
                 secret attached, or the referenced secret cannot be found
        """
        # NOTE: the client calls below are plain synchronous calls; nothing
        # is awaited inside this coroutine.
        v1_core = self.clients[CORE_CLIENT]

        service_accounts = v1_core.list_namespaced_service_account(
            namespace, field_selector="metadata.name={}".format(name)
        )
        if not service_accounts.items:
            raise Exception(
                "Service account not found with metadata.name={}".format(name)
            )

        service_account = service_accounts.items[0]
        secret_name = None
        if service_account.secrets:
            secret_name = service_account.secrets[0].name
        if not secret_name:
            raise Exception(
                "Failed getting the secret from service account {}".format(name)
            )

        secrets = v1_core.list_namespaced_secret(
            namespace, field_selector="metadata.name={}".format(secret_name)
        ).items
        if not secrets:
            # Explicit error instead of an IndexError on an empty result
            raise Exception(
                "Secret {} not found in namespace {}".format(secret_name, namespace)
            )
        secret = secrets[0]

        token = secret.data[SERVICE_ACCOUNT_TOKEN_KEY]
        client_certificate_data = secret.data[SERVICE_ACCOUNT_ROOT_CA_KEY]

        return (
            base64.b64decode(token).decode("utf-8"),
            base64.b64decode(client_certificate_data).decode("utf-8"),
        )