Unit tests improvements
[osm/N2VC.git] / n2vc / kubectl.py
1 # Copyright 2020 Canonical Ltd.
2 #
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
6 #
7 # http://www.apache.org/licenses/LICENSE-2.0
8 #
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
14
15 import base64
16 import logging
17 from typing import Dict
18 import typing
19
20
21 from kubernetes import client, config
22 from kubernetes.client.models import (
23 V1ClusterRole,
24 V1ObjectMeta,
25 V1PolicyRule,
26 V1ServiceAccount,
27 V1ClusterRoleBinding,
28 V1RoleRef,
29 V1Subject,
30 )
31 from kubernetes.client.rest import ApiException
32 from retrying_async import retry
33
34
35 SERVICE_ACCOUNT_TOKEN_KEY = "token"
36 SERVICE_ACCOUNT_ROOT_CA_KEY = "ca.crt"
37 # clients
38 CORE_CLIENT = "core_v1"
39 RBAC_CLIENT = "rbac_v1"
40 STORAGE_CLIENT = "storage_v1"
41
42
43 class Kubectl:
44 def __init__(self, config_file=None):
45 config.load_kube_config(config_file=config_file)
46 self._clients = {
47 CORE_CLIENT: client.CoreV1Api(),
48 RBAC_CLIENT: client.RbacAuthorizationV1Api(),
49 STORAGE_CLIENT: client.StorageV1Api(),
50 }
51 self._configuration = config.kube_config.Configuration.get_default_copy()
52 self.logger = logging.getLogger("Kubectl")
53
54 @property
55 def configuration(self):
56 return self._configuration
57
58 @property
59 def clients(self):
60 return self._clients
61
62 def get_services(
63 self,
64 field_selector: str = None,
65 label_selector: str = None,
66 ) -> typing.List[typing.Dict]:
67 """
68 Get Service list from a namespace
69
70 :param: field_selector: Kubernetes field selector for the namespace
71 :param: label_selector: Kubernetes label selector for the namespace
72
73 :return: List of the services matching the selectors specified
74 """
75 kwargs = {}
76 if field_selector:
77 kwargs["field_selector"] = field_selector
78 if label_selector:
79 kwargs["label_selector"] = label_selector
80 try:
81 result = self.clients[CORE_CLIENT].list_service_for_all_namespaces(**kwargs)
82 return [
83 {
84 "name": i.metadata.name,
85 "cluster_ip": i.spec.cluster_ip,
86 "type": i.spec.type,
87 "ports": [
88 {
89 "name": p.name,
90 "node_port": p.node_port,
91 "port": p.port,
92 "protocol": p.protocol,
93 "target_port": p.target_port,
94 }
95 for p in i.spec.ports
96 ]
97 if i.spec.ports
98 else [],
99 "external_ip": [i.ip for i in i.status.load_balancer.ingress]
100 if i.status.load_balancer.ingress
101 else None,
102 }
103 for i in result.items
104 ]
105 except ApiException as e:
106 self.logger.error("Error calling get services: {}".format(e))
107 raise e
108
109 def get_default_storage_class(self) -> str:
110 """
111 Default storage class
112
113 :return: Returns the default storage class name, if exists.
114 If not, it returns the first storage class.
115 If there are not storage classes, returns None
116 """
117 storage_classes = self.clients[STORAGE_CLIENT].list_storage_class()
118 selected_sc = None
119 default_sc_annotations = {
120 "storageclass.kubernetes.io/is-default-class": "true",
121 # Older clusters still use the beta annotation.
122 "storageclass.beta.kubernetes.io/is-default-class": "true",
123 }
124 for sc in storage_classes.items:
125 if not selected_sc:
126 # Select the first storage class in case there is no a default-class
127 selected_sc = sc.metadata.name
128 annotations = sc.metadata.annotations or {}
129 if any(
130 k in annotations and annotations[k] == v
131 for k, v in default_sc_annotations.items()
132 ):
133 # Default storage
134 selected_sc = sc.metadata.name
135 break
136 return selected_sc
137
138 def create_cluster_role(
139 self,
140 name: str,
141 labels: Dict[str, str],
142 namespace: str = "kube-system",
143 ):
144 """
145 Create a cluster role
146
147 :param: name: Name of the cluster role
148 :param: labels: Labels for cluster role metadata
149 :param: namespace: Kubernetes namespace for cluster role metadata
150 Default: kube-system
151 """
152 cluster_roles = self.clients[RBAC_CLIENT].list_cluster_role(
153 field_selector="metadata.name={}".format(name)
154 )
155
156 if len(cluster_roles.items) > 0:
157 raise Exception(
158 "Cluster role with metadata.name={} already exists".format(name)
159 )
160
161 metadata = V1ObjectMeta(name=name, labels=labels, namespace=namespace)
162 # Cluster role
163 cluster_role = V1ClusterRole(
164 metadata=metadata,
165 rules=[
166 V1PolicyRule(api_groups=["*"], resources=["*"], verbs=["*"]),
167 V1PolicyRule(non_resource_ur_ls=["*"], verbs=["*"]),
168 ],
169 )
170
171 self.clients[RBAC_CLIENT].create_cluster_role(cluster_role)
172
173 def delete_cluster_role(self, name: str):
174 """
175 Delete a cluster role
176
177 :param: name: Name of the cluster role
178 """
179 self.clients[RBAC_CLIENT].delete_cluster_role(name)
180
181 def create_service_account(
182 self,
183 name: str,
184 labels: Dict[str, str],
185 namespace: str = "kube-system",
186 ):
187 """
188 Create a service account
189
190 :param: name: Name of the service account
191 :param: labels: Labels for service account metadata
192 :param: namespace: Kubernetes namespace for service account metadata
193 Default: kube-system
194 """
195 service_accounts = self.clients[CORE_CLIENT].list_namespaced_service_account(
196 namespace, field_selector="metadata.name={}".format(name)
197 )
198 if len(service_accounts.items) > 0:
199 raise Exception(
200 "Service account with metadata.name={} already exists".format(name)
201 )
202
203 metadata = V1ObjectMeta(name=name, labels=labels, namespace=namespace)
204 service_account = V1ServiceAccount(metadata=metadata)
205
206 self.clients[CORE_CLIENT].create_namespaced_service_account(
207 namespace, service_account
208 )
209
210 def delete_service_account(self, name: str, namespace: str = "kube-system"):
211 """
212 Delete a service account
213
214 :param: name: Name of the service account
215 :param: namespace: Kubernetes namespace for service account metadata
216 Default: kube-system
217 """
218 self.clients[CORE_CLIENT].delete_namespaced_service_account(name, namespace)
219
220 def create_cluster_role_binding(
221 self, name: str, labels: Dict[str, str], namespace: str = "kube-system"
222 ):
223 """
224 Create a cluster role binding
225
226 :param: name: Name of the cluster role
227 :param: labels: Labels for cluster role binding metadata
228 :param: namespace: Kubernetes namespace for cluster role binding metadata
229 Default: kube-system
230 """
231 role_bindings = self.clients[RBAC_CLIENT].list_cluster_role_binding(
232 field_selector="metadata.name={}".format(name)
233 )
234 if len(role_bindings.items) > 0:
235 raise Exception("Generated rbac id already exists")
236
237 role_binding = V1ClusterRoleBinding(
238 metadata=V1ObjectMeta(name=name, labels=labels),
239 role_ref=V1RoleRef(kind="ClusterRole", name=name, api_group=""),
240 subjects=[V1Subject(kind="ServiceAccount", name=name, namespace=namespace)],
241 )
242 self.clients[RBAC_CLIENT].create_cluster_role_binding(role_binding)
243
244 def delete_cluster_role_binding(self, name: str):
245 """
246 Delete a cluster role binding
247
248 :param: name: Name of the cluster role binding
249 """
250 self.clients[RBAC_CLIENT].delete_cluster_role_binding(name)
251
252 @retry(
253 attempts=10,
254 delay=1,
255 fallback=Exception("Failed getting the secret from service account"),
256 )
257 async def get_secret_data(
258 self, name: str, namespace: str = "kube-system"
259 ) -> (str, str):
260 """
261 Get secret data
262
263 :param: name: Name of the secret data
264 :param: namespace: Name of the namespace where the secret is stored
265
266 :return: Tuple with the token and client certificate
267 """
268 v1_core = self.clients[CORE_CLIENT]
269
270 secret_name = None
271
272 service_accounts = v1_core.list_namespaced_service_account(
273 namespace, field_selector="metadata.name={}".format(name)
274 )
275 if len(service_accounts.items) == 0:
276 raise Exception(
277 "Service account not found with metadata.name={}".format(name)
278 )
279 service_account = service_accounts.items[0]
280 if service_account.secrets and len(service_account.secrets) > 0:
281 secret_name = service_account.secrets[0].name
282 if not secret_name:
283 raise Exception(
284 "Failed getting the secret from service account {}".format(name)
285 )
286 secret = v1_core.list_namespaced_secret(
287 namespace, field_selector="metadata.name={}".format(secret_name)
288 ).items[0]
289
290 token = secret.data[SERVICE_ACCOUNT_TOKEN_KEY]
291 client_certificate_data = secret.data[SERVICE_ACCOUNT_ROOT_CA_KEY]
292
293 return (
294 base64.b64decode(token).decode("utf-8"),
295 base64.b64decode(client_certificate_data).decode("utf-8"),
296 )