1 |
|
# Copyright 2020 Canonical Ltd. |
2 |
|
# |
3 |
|
# Licensed under the Apache License, Version 2.0 (the "License"); |
4 |
|
# you may not use this file except in compliance with the License. |
5 |
|
# You may obtain a copy of the License at |
6 |
|
# |
7 |
|
# http://www.apache.org/licenses/LICENSE-2.0 |
8 |
|
# |
9 |
|
# Unless required by applicable law or agreed to in writing, software |
10 |
|
# distributed under the License is distributed on an "AS IS" BASIS, |
11 |
|
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
12 |
|
# See the License for the specific language governing permissions and |
13 |
|
# limitations under the License. |
14 |
|
|
15 |
1 |
import base64 |
16 |
1 |
import logging |
17 |
1 |
from typing import Dict |
18 |
1 |
import typing |
19 |
1 |
import uuid |
20 |
1 |
import json |
21 |
|
|
22 |
1 |
from distutils.version import LooseVersion |
23 |
|
|
24 |
1 |
from kubernetes import client, config |
25 |
1 |
from kubernetes.client.api import VersionApi |
26 |
1 |
from kubernetes.client.models import ( |
27 |
|
V1ClusterRole, |
28 |
|
V1Role, |
29 |
|
V1ObjectMeta, |
30 |
|
V1PolicyRule, |
31 |
|
V1ServiceAccount, |
32 |
|
V1ClusterRoleBinding, |
33 |
|
V1RoleBinding, |
34 |
|
V1RoleRef, |
35 |
|
V1Subject, |
36 |
|
V1Secret, |
37 |
|
V1SecretReference, |
38 |
|
V1Namespace, |
39 |
|
) |
40 |
1 |
from kubernetes.client.rest import ApiException |
41 |
1 |
from n2vc.libjuju import retry_callback |
42 |
1 |
from retrying_async import retry |
43 |
|
|
44 |
|
|
45 |
1 |
SERVICE_ACCOUNT_TOKEN_KEY = "token" |
46 |
1 |
SERVICE_ACCOUNT_ROOT_CA_KEY = "ca.crt" |
47 |
|
# clients |
48 |
1 |
CORE_CLIENT = "core_v1" |
49 |
1 |
RBAC_CLIENT = "rbac_v1" |
50 |
1 |
STORAGE_CLIENT = "storage_v1" |
51 |
1 |
CUSTOM_OBJECT_CLIENT = "custom_object" |
52 |
|
|
53 |
|
|
54 |
1 |
class Kubectl: |
55 |
1 |
def __init__(self, config_file=None): |
56 |
1 |
config.load_kube_config(config_file=config_file) |
57 |
1 |
self._clients = { |
58 |
|
CORE_CLIENT: client.CoreV1Api(), |
59 |
|
RBAC_CLIENT: client.RbacAuthorizationV1Api(), |
60 |
|
STORAGE_CLIENT: client.StorageV1Api(), |
61 |
|
CUSTOM_OBJECT_CLIENT: client.CustomObjectsApi(), |
62 |
|
} |
63 |
1 |
self._configuration = config.kube_config.Configuration.get_default_copy() |
64 |
1 |
self.logger = logging.getLogger("Kubectl") |
65 |
|
|
66 |
1 |
@property |
67 |
1 |
def configuration(self): |
68 |
1 |
return self._configuration |
69 |
|
|
70 |
1 |
@property |
71 |
1 |
def clients(self): |
72 |
1 |
return self._clients |
73 |
|
|
74 |
1 |
def get_services( |
75 |
|
self, |
76 |
|
field_selector: str = None, |
77 |
|
label_selector: str = None, |
78 |
|
) -> typing.List[typing.Dict]: |
79 |
|
""" |
80 |
|
Get Service list from a namespace |
81 |
|
|
82 |
|
:param: field_selector: Kubernetes field selector for the namespace |
83 |
|
:param: label_selector: Kubernetes label selector for the namespace |
84 |
|
|
85 |
|
:return: List of the services matching the selectors specified |
86 |
|
""" |
87 |
1 |
kwargs = {} |
88 |
1 |
if field_selector: |
89 |
1 |
kwargs["field_selector"] = field_selector |
90 |
1 |
if label_selector: |
91 |
1 |
kwargs["label_selector"] = label_selector |
92 |
1 |
try: |
93 |
1 |
result = self.clients[CORE_CLIENT].list_service_for_all_namespaces(**kwargs) |
94 |
1 |
return [ |
95 |
|
{ |
96 |
|
"name": i.metadata.name, |
97 |
|
"cluster_ip": i.spec.cluster_ip, |
98 |
|
"type": i.spec.type, |
99 |
|
"ports": [ |
100 |
|
{ |
101 |
|
"name": p.name, |
102 |
|
"node_port": p.node_port, |
103 |
|
"port": p.port, |
104 |
|
"protocol": p.protocol, |
105 |
|
"target_port": p.target_port, |
106 |
|
} |
107 |
|
for p in i.spec.ports |
108 |
|
] |
109 |
|
if i.spec.ports |
110 |
|
else [], |
111 |
|
"external_ip": [i.ip for i in i.status.load_balancer.ingress] |
112 |
|
if i.status.load_balancer.ingress |
113 |
|
else None, |
114 |
|
} |
115 |
|
for i in result.items |
116 |
|
] |
117 |
1 |
except ApiException as e: |
118 |
1 |
self.logger.error("Error calling get services: {}".format(e)) |
119 |
1 |
raise e |
120 |
|
|
121 |
1 |
def get_default_storage_class(self) -> str: |
122 |
|
""" |
123 |
|
Default storage class |
124 |
|
|
125 |
|
:return: Returns the default storage class name, if exists. |
126 |
|
If not, it returns the first storage class. |
127 |
|
If there are not storage classes, returns None |
128 |
|
""" |
129 |
1 |
storage_classes = self.clients[STORAGE_CLIENT].list_storage_class() |
130 |
1 |
selected_sc = None |
131 |
1 |
default_sc_annotations = { |
132 |
|
"storageclass.kubernetes.io/is-default-class": "true", |
133 |
|
# Older clusters still use the beta annotation. |
134 |
|
"storageclass.beta.kubernetes.io/is-default-class": "true", |
135 |
|
} |
136 |
1 |
for sc in storage_classes.items: |
137 |
1 |
if not selected_sc: |
138 |
|
# Select the first storage class in case there is no a default-class |
139 |
1 |
selected_sc = sc.metadata.name |
140 |
1 |
annotations = sc.metadata.annotations or {} |
141 |
1 |
if any( |
142 |
|
k in annotations and annotations[k] == v |
143 |
|
for k, v in default_sc_annotations.items() |
144 |
|
): |
145 |
|
# Default storage |
146 |
1 |
selected_sc = sc.metadata.name |
147 |
1 |
break |
148 |
1 |
return selected_sc |
149 |
|
|
150 |
1 |
def create_cluster_role( |
151 |
|
self, |
152 |
|
name: str, |
153 |
|
labels: Dict[str, str], |
154 |
|
namespace: str = "kube-system", |
155 |
|
): |
156 |
|
""" |
157 |
|
Create a cluster role |
158 |
|
|
159 |
|
:param: name: Name of the cluster role |
160 |
|
:param: labels: Labels for cluster role metadata |
161 |
|
:param: namespace: Kubernetes namespace for cluster role metadata |
162 |
|
Default: kube-system |
163 |
|
""" |
164 |
0 |
cluster_roles = self.clients[RBAC_CLIENT].list_cluster_role( |
165 |
|
field_selector="metadata.name={}".format(name) |
166 |
|
) |
167 |
|
|
168 |
0 |
if len(cluster_roles.items) > 0: |
169 |
0 |
raise Exception("Role with metadata.name={} already exists".format(name)) |
170 |
|
|
171 |
0 |
metadata = V1ObjectMeta(name=name, labels=labels, namespace=namespace) |
172 |
|
# Cluster role |
173 |
0 |
cluster_role = V1ClusterRole( |
174 |
|
metadata=metadata, |
175 |
|
rules=[ |
176 |
|
V1PolicyRule(api_groups=["*"], resources=["*"], verbs=["*"]), |
177 |
|
V1PolicyRule(non_resource_ur_ls=["*"], verbs=["*"]), |
178 |
|
], |
179 |
|
) |
180 |
|
|
181 |
0 |
self.clients[RBAC_CLIENT].create_cluster_role(cluster_role) |
182 |
|
|
183 |
1 |
async def create_role( |
184 |
|
self, |
185 |
|
name: str, |
186 |
|
labels: Dict[str, str], |
187 |
|
api_groups: list, |
188 |
|
resources: list, |
189 |
|
verbs: list, |
190 |
|
namespace: str, |
191 |
|
): |
192 |
|
""" |
193 |
|
Create a role with one PolicyRule |
194 |
|
|
195 |
|
:param: name: Name of the namespaced Role |
196 |
|
:param: labels: Labels for namespaced Role metadata |
197 |
|
:param: api_groups: List with api-groups allowed in the policy rule |
198 |
|
:param: resources: List with resources allowed in the policy rule |
199 |
|
:param: verbs: List with verbs allowed in the policy rule |
200 |
|
:param: namespace: Kubernetes namespace for Role metadata |
201 |
|
|
202 |
|
:return: None |
203 |
|
""" |
204 |
|
|
205 |
1 |
roles = self.clients[RBAC_CLIENT].list_namespaced_role( |
206 |
|
namespace, field_selector="metadata.name={}".format(name) |
207 |
|
) |
208 |
|
|
209 |
1 |
if len(roles.items) > 0: |
210 |
1 |
raise Exception("Role with metadata.name={} already exists".format(name)) |
211 |
|
|
212 |
0 |
metadata = V1ObjectMeta(name=name, labels=labels, namespace=namespace) |
213 |
|
|
214 |
0 |
role = V1Role( |
215 |
|
metadata=metadata, |
216 |
|
rules=[ |
217 |
|
V1PolicyRule(api_groups=api_groups, resources=resources, verbs=verbs), |
218 |
|
], |
219 |
|
) |
220 |
|
|
221 |
0 |
self.clients[RBAC_CLIENT].create_namespaced_role(namespace, role) |
222 |
|
|
223 |
1 |
def delete_cluster_role(self, name: str): |
224 |
|
""" |
225 |
|
Delete a cluster role |
226 |
|
|
227 |
|
:param: name: Name of the cluster role |
228 |
|
""" |
229 |
0 |
self.clients[RBAC_CLIENT].delete_cluster_role(name) |
230 |
|
|
231 |
1 |
def _get_kubectl_version(self): |
232 |
1 |
version = VersionApi().get_code() |
233 |
1 |
return "{}.{}".format(version.major, version.minor) |
234 |
|
|
235 |
1 |
def _need_to_create_new_secret(self): |
236 |
1 |
min_k8s_version = "1.24" |
237 |
1 |
current_k8s_version = self._get_kubectl_version() |
238 |
1 |
return LooseVersion(min_k8s_version) <= LooseVersion(current_k8s_version) |
239 |
|
|
240 |
1 |
def _get_secret_name(self, service_account_name: str): |
241 |
1 |
random_alphanum = str(uuid.uuid4())[:5] |
242 |
1 |
return "{}-token-{}".format(service_account_name, random_alphanum) |
243 |
|
|
244 |
1 |
def _create_service_account_secret( |
245 |
|
self, service_account_name: str, namespace: str, secret_name: str |
246 |
|
): |
247 |
|
""" |
248 |
|
Create a secret for the service account. K8s version >= 1.24 |
249 |
|
|
250 |
|
:param: service_account_name: Name of the service account |
251 |
|
:param: namespace: Kubernetes namespace for service account metadata |
252 |
|
:param: secret_name: Name of the secret |
253 |
|
""" |
254 |
1 |
v1_core = self.clients[CORE_CLIENT] |
255 |
1 |
secrets = v1_core.list_namespaced_secret( |
256 |
|
namespace, field_selector="metadata.name={}".format(secret_name) |
257 |
|
).items |
258 |
|
|
259 |
1 |
if len(secrets) > 0: |
260 |
1 |
raise Exception( |
261 |
|
"Secret with metadata.name={} already exists".format(secret_name) |
262 |
|
) |
263 |
|
|
264 |
1 |
annotations = {"kubernetes.io/service-account.name": service_account_name} |
265 |
1 |
metadata = V1ObjectMeta( |
266 |
|
name=secret_name, namespace=namespace, annotations=annotations |
267 |
|
) |
268 |
1 |
type = "kubernetes.io/service-account-token" |
269 |
1 |
secret = V1Secret(metadata=metadata, type=type) |
270 |
1 |
v1_core.create_namespaced_secret(namespace, secret) |
271 |
|
|
272 |
1 |
def _get_secret_reference_list(self, namespace: str, secret_name: str): |
273 |
|
""" |
274 |
|
Return a secret reference list with one secret. |
275 |
|
K8s version >= 1.24 |
276 |
|
|
277 |
|
:param: namespace: Kubernetes namespace for service account metadata |
278 |
|
:param: secret_name: Name of the secret |
279 |
|
:rtype: list[V1SecretReference] |
280 |
|
""" |
281 |
1 |
return [V1SecretReference(name=secret_name, namespace=namespace)] |
282 |
|
|
283 |
1 |
def create_service_account( |
284 |
|
self, |
285 |
|
name: str, |
286 |
|
labels: Dict[str, str], |
287 |
|
namespace: str = "kube-system", |
288 |
|
): |
289 |
|
""" |
290 |
|
Create a service account |
291 |
|
|
292 |
|
:param: name: Name of the service account |
293 |
|
:param: labels: Labels for service account metadata |
294 |
|
:param: namespace: Kubernetes namespace for service account metadata |
295 |
|
Default: kube-system |
296 |
|
""" |
297 |
1 |
v1_core = self.clients[CORE_CLIENT] |
298 |
1 |
service_accounts = v1_core.list_namespaced_service_account( |
299 |
|
namespace, field_selector="metadata.name={}".format(name) |
300 |
|
) |
301 |
1 |
if len(service_accounts.items) > 0: |
302 |
1 |
raise Exception( |
303 |
|
"Service account with metadata.name={} already exists".format(name) |
304 |
|
) |
305 |
|
|
306 |
1 |
metadata = V1ObjectMeta(name=name, labels=labels, namespace=namespace) |
307 |
|
|
308 |
1 |
if self._need_to_create_new_secret(): |
309 |
1 |
secret_name = self._get_secret_name(name) |
310 |
1 |
secrets = self._get_secret_reference_list(namespace, secret_name) |
311 |
1 |
service_account = V1ServiceAccount(metadata=metadata, secrets=secrets) |
312 |
1 |
v1_core.create_namespaced_service_account(namespace, service_account) |
313 |
1 |
self._create_service_account_secret(name, namespace, secret_name) |
314 |
|
else: |
315 |
1 |
service_account = V1ServiceAccount(metadata=metadata) |
316 |
1 |
v1_core.create_namespaced_service_account(namespace, service_account) |
317 |
|
|
318 |
1 |
def delete_service_account(self, name: str, namespace: str = "kube-system"): |
319 |
|
""" |
320 |
|
Delete a service account |
321 |
|
|
322 |
|
:param: name: Name of the service account |
323 |
|
:param: namespace: Kubernetes namespace for service account metadata |
324 |
|
Default: kube-system |
325 |
|
""" |
326 |
0 |
self.clients[CORE_CLIENT].delete_namespaced_service_account(name, namespace) |
327 |
|
|
328 |
1 |
def create_cluster_role_binding( |
329 |
|
self, name: str, labels: Dict[str, str], namespace: str = "kube-system" |
330 |
|
): |
331 |
|
""" |
332 |
|
Create a cluster role binding |
333 |
|
|
334 |
|
:param: name: Name of the cluster role |
335 |
|
:param: labels: Labels for cluster role binding metadata |
336 |
|
:param: namespace: Kubernetes namespace for cluster role binding metadata |
337 |
|
Default: kube-system |
338 |
|
""" |
339 |
0 |
role_bindings = self.clients[RBAC_CLIENT].list_cluster_role_binding( |
340 |
|
field_selector="metadata.name={}".format(name) |
341 |
|
) |
342 |
0 |
if len(role_bindings.items) > 0: |
343 |
0 |
raise Exception("Generated rbac id already exists") |
344 |
|
|
345 |
0 |
role_binding = V1ClusterRoleBinding( |
346 |
|
metadata=V1ObjectMeta(name=name, labels=labels), |
347 |
|
role_ref=V1RoleRef(kind="ClusterRole", name=name, api_group=""), |
348 |
|
subjects=[V1Subject(kind="ServiceAccount", name=name, namespace=namespace)], |
349 |
|
) |
350 |
0 |
self.clients[RBAC_CLIENT].create_cluster_role_binding(role_binding) |
351 |
|
|
352 |
1 |
async def create_role_binding( |
353 |
|
self, |
354 |
|
name: str, |
355 |
|
role_name: str, |
356 |
|
sa_name: str, |
357 |
|
labels: Dict[str, str], |
358 |
|
namespace: str, |
359 |
|
): |
360 |
|
""" |
361 |
|
Create a cluster role binding |
362 |
|
|
363 |
|
:param: name: Name of the namespaced Role Binding |
364 |
|
:param: role_name: Name of the namespaced Role to be bound |
365 |
|
:param: sa_name: Name of the Service Account to be bound |
366 |
|
:param: labels: Labels for Role Binding metadata |
367 |
|
:param: namespace: Kubernetes namespace for Role Binding metadata |
368 |
|
|
369 |
|
:return: None |
370 |
|
""" |
371 |
1 |
role_bindings = self.clients[RBAC_CLIENT].list_namespaced_role_binding( |
372 |
|
namespace, field_selector="metadata.name={}".format(name) |
373 |
|
) |
374 |
1 |
if len(role_bindings.items) > 0: |
375 |
1 |
raise Exception( |
376 |
|
"Role Binding with metadata.name={} already exists".format(name) |
377 |
|
) |
378 |
|
|
379 |
0 |
role_binding = V1RoleBinding( |
380 |
|
metadata=V1ObjectMeta(name=name, labels=labels), |
381 |
|
role_ref=V1RoleRef(kind="Role", name=role_name, api_group=""), |
382 |
|
subjects=[ |
383 |
|
V1Subject(kind="ServiceAccount", name=sa_name, namespace=namespace) |
384 |
|
], |
385 |
|
) |
386 |
0 |
self.clients[RBAC_CLIENT].create_namespaced_role_binding( |
387 |
|
namespace, role_binding |
388 |
|
) |
389 |
|
|
390 |
1 |
def delete_cluster_role_binding(self, name: str): |
391 |
|
""" |
392 |
|
Delete a cluster role binding |
393 |
|
|
394 |
|
:param: name: Name of the cluster role binding |
395 |
|
""" |
396 |
0 |
self.clients[RBAC_CLIENT].delete_cluster_role_binding(name) |
397 |
|
|
398 |
1 |
@retry( |
399 |
|
attempts=10, |
400 |
|
delay=1, |
401 |
|
fallback=Exception("Failed getting the secret from service account"), |
402 |
|
callback=retry_callback, |
403 |
|
) |
404 |
1 |
async def get_secret_data( |
405 |
|
self, name: str, namespace: str = "kube-system" |
406 |
|
) -> (str, str): |
407 |
|
""" |
408 |
|
Get secret data |
409 |
|
|
410 |
|
:param: name: Name of the secret data |
411 |
|
:param: namespace: Name of the namespace where the secret is stored |
412 |
|
|
413 |
|
:return: Tuple with the token and client certificate |
414 |
|
""" |
415 |
0 |
v1_core = self.clients[CORE_CLIENT] |
416 |
|
|
417 |
0 |
secret_name = None |
418 |
|
|
419 |
0 |
service_accounts = v1_core.list_namespaced_service_account( |
420 |
|
namespace, field_selector="metadata.name={}".format(name) |
421 |
|
) |
422 |
0 |
if len(service_accounts.items) == 0: |
423 |
0 |
raise Exception( |
424 |
|
"Service account not found with metadata.name={}".format(name) |
425 |
|
) |
426 |
0 |
service_account = service_accounts.items[0] |
427 |
0 |
if service_account.secrets and len(service_account.secrets) > 0: |
428 |
0 |
secret_name = service_account.secrets[0].name |
429 |
0 |
if not secret_name: |
430 |
0 |
raise Exception( |
431 |
|
"Failed getting the secret from service account {}".format(name) |
432 |
|
) |
433 |
|
# TODO: refactor to use get_secret_content |
434 |
0 |
secret = v1_core.list_namespaced_secret( |
435 |
|
namespace, field_selector="metadata.name={}".format(secret_name) |
436 |
|
).items[0] |
437 |
|
|
438 |
0 |
token = secret.data[SERVICE_ACCOUNT_TOKEN_KEY] |
439 |
0 |
client_certificate_data = secret.data[SERVICE_ACCOUNT_ROOT_CA_KEY] |
440 |
|
|
441 |
0 |
return ( |
442 |
|
base64.b64decode(token).decode("utf-8"), |
443 |
|
base64.b64decode(client_certificate_data).decode("utf-8"), |
444 |
|
) |
445 |
|
|
446 |
1 |
@retry( |
447 |
|
attempts=10, |
448 |
|
delay=1, |
449 |
|
fallback=Exception("Failed getting data from the secret"), |
450 |
|
) |
451 |
1 |
async def get_secret_content( |
452 |
|
self, |
453 |
|
name: str, |
454 |
|
namespace: str, |
455 |
|
) -> dict: |
456 |
|
""" |
457 |
|
Get secret data |
458 |
|
|
459 |
|
:param: name: Name of the secret |
460 |
|
:param: namespace: Name of the namespace where the secret is stored |
461 |
|
|
462 |
|
:return: Dictionary with secret's data |
463 |
|
""" |
464 |
1 |
v1_core = self.clients[CORE_CLIENT] |
465 |
|
|
466 |
1 |
secret = v1_core.read_namespaced_secret(name, namespace) |
467 |
|
|
468 |
1 |
return secret.data |
469 |
|
|
470 |
1 |
@retry( |
471 |
|
attempts=10, |
472 |
|
delay=1, |
473 |
|
fallback=Exception("Failed creating the secret"), |
474 |
|
) |
475 |
1 |
async def create_secret( |
476 |
|
self, name: str, data: dict, namespace: str, secret_type: str |
477 |
|
): |
478 |
|
""" |
479 |
|
Get secret data |
480 |
|
|
481 |
|
:param: name: Name of the secret |
482 |
|
:param: data: Dict with data content. Values must be already base64 encoded |
483 |
|
:param: namespace: Name of the namespace where the secret will be stored |
484 |
|
:param: secret_type: Type of the secret, e.g., Opaque, kubernetes.io/service-account-token, kubernetes.io/tls |
485 |
|
|
486 |
|
:return: None |
487 |
|
""" |
488 |
0 |
v1_core = self.clients[CORE_CLIENT] |
489 |
0 |
metadata = V1ObjectMeta(name=name, namespace=namespace) |
490 |
0 |
secret = V1Secret(metadata=metadata, data=data, type=secret_type) |
491 |
0 |
v1_core.create_namespaced_secret(namespace, secret) |
492 |
|
|
493 |
1 |
async def create_certificate( |
494 |
|
self, |
495 |
|
namespace: str, |
496 |
|
name: str, |
497 |
|
dns_prefix: str, |
498 |
|
secret_name: str, |
499 |
|
usages: list, |
500 |
|
issuer_name: str, |
501 |
|
): |
502 |
|
""" |
503 |
|
Creates cert-manager certificate object |
504 |
|
|
505 |
|
:param: namespace: Name of the namespace where the certificate and secret is stored |
506 |
|
:param: name: Name of the certificate object |
507 |
|
:param: dns_prefix: Prefix for the dnsNames. They will be prefixed to the common k8s svc suffixes |
508 |
|
:param: secret_name: Name of the secret created by cert-manager |
509 |
|
:param: usages: List of X.509 key usages |
510 |
|
:param: issuer_name: Name of the cert-manager's Issuer or ClusterIssuer object |
511 |
|
|
512 |
|
""" |
513 |
1 |
certificate_body = { |
514 |
|
"apiVersion": "cert-manager.io/v1", |
515 |
|
"kind": "Certificate", |
516 |
|
"metadata": {"name": name, "namespace": namespace}, |
517 |
|
"spec": { |
518 |
|
"secretName": secret_name, |
519 |
|
"privateKey": { |
520 |
|
"rotationPolicy": "Always", |
521 |
|
"algorithm": "ECDSA", |
522 |
|
"size": 256, |
523 |
|
}, |
524 |
|
"duration": "8760h", # 1 Year |
525 |
|
"renewBefore": "2208h", # 9 months |
526 |
|
"subject": {"organizations": ["osm"]}, |
527 |
|
"commonName": "osm", |
528 |
|
"isCA": False, |
529 |
|
"usages": usages, |
530 |
|
"dnsNames": [ |
531 |
|
"{}.{}".format(dns_prefix, namespace), |
532 |
|
"{}.{}.svc".format(dns_prefix, namespace), |
533 |
|
"{}.{}.svc.cluster".format(dns_prefix, namespace), |
534 |
|
"{}.{}.svc.cluster.local".format(dns_prefix, namespace), |
535 |
|
], |
536 |
|
"issuerRef": {"name": issuer_name, "kind": "ClusterIssuer"}, |
537 |
|
}, |
538 |
|
} |
539 |
1 |
client = self.clients[CUSTOM_OBJECT_CLIENT] |
540 |
1 |
try: |
541 |
1 |
client.create_namespaced_custom_object( |
542 |
|
group="cert-manager.io", |
543 |
|
plural="certificates", |
544 |
|
version="v1", |
545 |
|
body=certificate_body, |
546 |
|
namespace=namespace, |
547 |
|
) |
548 |
1 |
except ApiException as e: |
549 |
1 |
info = json.loads(e.body) |
550 |
1 |
if info.get("reason").lower() == "alreadyexists": |
551 |
1 |
self.logger.warning("Certificate already exists: {}".format(e)) |
552 |
|
else: |
553 |
0 |
raise e |
554 |
|
|
555 |
1 |
async def delete_certificate(self, namespace, object_name): |
556 |
1 |
client = self.clients[CUSTOM_OBJECT_CLIENT] |
557 |
1 |
try: |
558 |
1 |
client.delete_namespaced_custom_object( |
559 |
|
group="cert-manager.io", |
560 |
|
plural="certificates", |
561 |
|
version="v1", |
562 |
|
name=object_name, |
563 |
|
namespace=namespace, |
564 |
|
) |
565 |
1 |
except ApiException as e: |
566 |
1 |
info = json.loads(e.body) |
567 |
1 |
if info.get("reason").lower() == "notfound": |
568 |
1 |
self.logger.warning("Certificate already deleted: {}".format(e)) |
569 |
|
else: |
570 |
0 |
raise e |
571 |
|
|
572 |
1 |
@retry( |
573 |
|
attempts=10, |
574 |
|
delay=1, |
575 |
|
fallback=Exception("Failed creating the namespace"), |
576 |
|
) |
577 |
1 |
async def create_namespace(self, name: str, labels: dict = None): |
578 |
|
""" |
579 |
|
Create a namespace |
580 |
|
|
581 |
|
:param: name: Name of the namespace to be created |
582 |
|
:param: labels: Dictionary with labels for the new namespace |
583 |
|
|
584 |
|
""" |
585 |
1 |
v1_core = self.clients[CORE_CLIENT] |
586 |
1 |
metadata = V1ObjectMeta(name=name, labels=labels) |
587 |
1 |
namespace = V1Namespace( |
588 |
|
metadata=metadata, |
589 |
|
) |
590 |
|
|
591 |
1 |
try: |
592 |
1 |
v1_core.create_namespace(namespace) |
593 |
1 |
self.logger.debug("Namespace created: {}".format(name)) |
594 |
1 |
except ApiException as e: |
595 |
1 |
info = json.loads(e.body) |
596 |
1 |
if info.get("reason").lower() == "alreadyexists": |
597 |
1 |
self.logger.warning("Namespace already exists: {}".format(e)) |
598 |
|
else: |
599 |
0 |
raise e |
600 |
|
|
601 |
1 |
@retry( |
602 |
|
attempts=10, |
603 |
|
delay=1, |
604 |
|
fallback=Exception("Failed deleting the namespace"), |
605 |
|
) |
606 |
1 |
async def delete_namespace(self, name: str): |
607 |
|
""" |
608 |
|
Delete a namespace |
609 |
|
|
610 |
|
:param: name: Name of the namespace to be deleted |
611 |
|
|
612 |
|
""" |
613 |
1 |
try: |
614 |
1 |
self.clients[CORE_CLIENT].delete_namespace(name) |
615 |
1 |
except ApiException as e: |
616 |
1 |
if e.reason == "Not Found": |
617 |
0 |
self.logger.warning("Namespace already deleted: {}".format(e)) |