##
# Copyright 2019 Telefonica Investigacion y Desarrollo, S.A.U.
# This file is part of OSM
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# For those usages not covered by the Apache License, Version 2.0 please
# contact with: nfvlabs@tid.es
##
import abc
import asyncio
from typing import Union
from shlex import quote
import random
import time
import shlex
import shutil
import stat
import os
import yaml
from uuid import uuid4
from urllib.parse import urlparse

from n2vc.config import EnvironConfig
from n2vc.exceptions import K8sException
from n2vc.k8s_conn import K8sConnector
from n2vc.kubectl import Kubectl
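
# Typical lifecycle of this connector, as an illustrative sketch. A concrete
# helm-version-specific subclass must be used; "MyHelmConnector", "fs" and "db"
# below are hypothetical stand-ins for such a subclass and for OSM's
# file-system and database objects.
#
#   async def demo(fs, db, kubeconfig_text: str):
#       conn = MyHelmConnector(fs=fs, db=db)
#       # register the cluster and store its kubeconfig locally
#       cluster_id, installed_sw = await conn.init_env(kubeconfig_text)
#       # make a chart repository available to the cluster
#       await conn.repo_add(cluster_id, "bitnami", "https://charts.bitnami.com/bitnami")
#       print(await conn.repo_list(cluster_id))
#       # clean everything up again
#       await conn.reset(cluster_id, force=True, uninstall_sw=installed_sw)
#
#   # asyncio.run(demo(fs, db, open(".kube/config").read()))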


class K8sHelmBaseConnector(K8sConnector):

    """
    ####################################################################################
    ################################### P U B L I C ####################################
    ####################################################################################
    """

    service_account = "osm"

    def __init__(
        self,
        fs: object,
        db: object,
        kubectl_command: str = "/usr/bin/kubectl",
        helm_command: str = "/usr/bin/helm",
        log: object = None,
        on_update_db=None,
    ):
        """

        :param fs: file system for kubernetes and helm configuration
        :param db: database object to write current operation status
        :param kubectl_command: path to kubectl executable
        :param helm_command: path to helm executable
        :param log: logger
        :param on_update_db: callback called when k8s connector updates database
        """

        # parent class
        K8sConnector.__init__(self, db=db, log=log, on_update_db=on_update_db)

        self.log.info("Initializing K8S Helm connector")

        self.config = EnvironConfig()
        # random numbers for release name generation
        random.seed(time.time())

        # the file system
        self.fs = fs

        # exception if kubectl is not installed
        self.kubectl_command = kubectl_command
        self._check_file_exists(filename=kubectl_command, exception_if_not_exists=True)

        # exception if helm is not installed
        self._helm_command = helm_command
        self._check_file_exists(filename=helm_command, exception_if_not_exists=True)

        # obtain stable repo url from config or apply default
        self._stable_repo_url = self.config.get("stablerepourl")
        if self._stable_repo_url == "None":
            self._stable_repo_url = None

        # Lock to avoid concurrent execution of helm commands
        self.cmd_lock = asyncio.Lock()

    def _get_namespace(self, cluster_uuid: str) -> str:
        """
        Obtains the namespace used by the cluster with the uuid passed by argument

        :param cluster_uuid: cluster's uuid
        """

        # first, obtain the cluster corresponding to the uuid passed by argument
        k8scluster = self.db.get_one(
            "k8sclusters", q_filter={"_id": cluster_uuid}, fail_on_empty=False
        )
        return k8scluster.get("namespace")

    async def init_env(
        self,
        k8s_creds: str,
        namespace: str = "kube-system",
        reuse_cluster_uuid=None,
        **kwargs,
    ) -> tuple[str, bool]:
        """
        It prepares a given K8s cluster environment to run Charts

        :param k8s_creds: credentials to access a given K8s cluster, i.e. a valid
            '.kube/config'
        :param namespace: optional namespace to be used for helm. By default,
            'kube-system' will be used
        :param reuse_cluster_uuid: existing cluster uuid for reuse
        :param kwargs: Additional parameters (None yet)
        :return: uuid of the K8s cluster and True if connector has installed some
            software in the cluster
            (on error, an exception will be raised)
        """

        if reuse_cluster_uuid:
            cluster_id = reuse_cluster_uuid
        else:
            cluster_id = str(uuid4())

        self.log.debug(
            "Initializing K8S Cluster {}. namespace: {}".format(cluster_id, namespace)
        )

        paths, env = self._init_paths_env(
            cluster_name=cluster_id, create_if_not_exist=True
        )
        # create the kubeconfig restricted to the owner; the bare open() call
        # cannot take a permission mode, so use os.open + os.fdopen
        mode = stat.S_IRUSR | stat.S_IWUSR
        with os.fdopen(
            os.open(
                paths["kube_config"], os.O_WRONLY | os.O_CREAT | os.O_TRUNC, mode
            ),
            "w",
        ) as f:
            f.write(k8s_creds)
        os.chmod(paths["kube_config"], 0o600)

        # initialization code specific to the helm version
        n2vc_installed_sw = await self._cluster_init(cluster_id, namespace, paths, env)

        # sync fs with local data
        self.fs.reverse_sync(from_path=cluster_id)

        self.log.info("Cluster {} initialized".format(cluster_id))

        return cluster_id, n2vc_installed_sw

    async def repo_add(
        self,
        cluster_uuid: str,
        name: str,
        url: str,
        repo_type: str = "chart",
        cert: str = None,
        user: str = None,
        password: str = None,
        oci: bool = False,
    ):
        self.log.debug(
            "Cluster {}, adding {} repository {}. URL: {}".format(
                cluster_uuid, repo_type, name, url
            )
        )

        # init_env
        paths, env = self._init_paths_env(
            cluster_name=cluster_uuid, create_if_not_exist=True
        )

        # sync local dir
        self.fs.sync(from_path=cluster_uuid)

        if oci:
            if user and password:
                host_port = urlparse(url).netloc if url.startswith("oci://") else url
                # helm registry login url
                command = "env KUBECONFIG={} {} registry login {}".format(
                    paths["kube_config"], self._helm_command, quote(host_port)
                )
            else:
                self.log.debug(
                    "OCI registry login is not needed for repo: {}".format(name)
                )
                return
        else:
            # helm repo add name url
            command = "env KUBECONFIG={} {} repo add {} {}".format(
                paths["kube_config"], self._helm_command, quote(name), quote(url)
            )

        if cert:
            temp_cert_file = os.path.join(
                self.fs.path, "{}/helmcerts/".format(cluster_uuid), "temp.crt"
            )
            os.makedirs(os.path.dirname(temp_cert_file), exist_ok=True)
            with open(temp_cert_file, "w") as the_cert:
                the_cert.write(cert)
            command += " --ca-file {}".format(quote(temp_cert_file))

        if user:
            command += " --username={}".format(quote(user))

        if password:
            command += " --password={}".format(quote(password))

        self.log.debug("adding repo: {}".format(command))
        await self._local_async_exec(
            command=command, raise_exception_on_error=True, env=env
        )

        if not oci:
            # helm repo update
            command = "env KUBECONFIG={} {} repo update {}".format(
                paths["kube_config"], self._helm_command, quote(name)
            )
            self.log.debug("updating repo: {}".format(command))
            await self._local_async_exec(
                command=command, raise_exception_on_error=False, env=env
            )

        # sync fs
        self.fs.reverse_sync(from_path=cluster_uuid)
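
    # For reference, the command strings assembled above look roughly as follows
    # (paths and repository names are hypothetical; <kube_config> stands for the
    # paths["kube_config"] value returned by _init_paths_env):
    #
    #   env KUBECONFIG=<kube_config> /usr/bin/helm repo add bitnami \
    #       https://charts.bitnami.com/bitnami
    #
    # and, for an OCI registry with credentials, a registry login instead:
    #
    #   env KUBECONFIG=<kube_config> /usr/bin/helm registry login registry.example.com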

    async def repo_update(self, cluster_uuid: str, name: str, repo_type: str = "chart"):
        self.log.debug(
            "Cluster {}, updating {} repository {}".format(
                cluster_uuid, repo_type, name
            )
        )

        # init_env
        paths, env = self._init_paths_env(
            cluster_name=cluster_uuid, create_if_not_exist=True
        )

        # sync local dir
        self.fs.sync(from_path=cluster_uuid)

        # helm repo update
        command = "{} repo update {}".format(self._helm_command, quote(name))
        self.log.debug("updating repo: {}".format(command))
        await self._local_async_exec(
            command=command, raise_exception_on_error=False, env=env
        )

        # sync fs
        self.fs.reverse_sync(from_path=cluster_uuid)

    async def repo_list(self, cluster_uuid: str) -> list:
        """
        Get the list of registered repositories

        :return: list of registered repositories: [ {name: ..., url: ...}, ... ]
        """

        self.log.debug("list repositories for cluster {}".format(cluster_uuid))

        # config filename
        paths, env = self._init_paths_env(
            cluster_name=cluster_uuid, create_if_not_exist=True
        )

        # sync local dir
        self.fs.sync(from_path=cluster_uuid)

        command = "env KUBECONFIG={} {} repo list --output yaml".format(
            paths["kube_config"], self._helm_command
        )

        # Set raise_exception_on_error to False: if there are no repos, we just
        # want an empty list
        output, _rc = await self._local_async_exec(
            command=command, raise_exception_on_error=False, env=env
        )

        # sync fs
        self.fs.reverse_sync(from_path=cluster_uuid)

        if _rc == 0:
            if output and len(output) > 0:
                repos = yaml.load(output, Loader=yaml.SafeLoader)
                # unify format between helm2 and helm3 setting all keys lowercase
                return self._lower_keys_list(repos)
            else:
                return []
        else:
            return []
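
    # Illustrative shape of the list returned above once helm's YAML output has
    # been parsed and its keys lowered (names and URLs are hypothetical):
    #
    #   [
    #       {"name": "bitnami", "url": "https://charts.bitnami.com/bitnami"},
    #       {"name": "osm-repo", "url": "https://example.org/charts"},
    #   ]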

    async def repo_remove(self, cluster_uuid: str, name: str):
        self.log.debug(
            "remove {} repositories for cluster {}".format(name, cluster_uuid)
        )

        # init env, paths
        paths, env = self._init_paths_env(
            cluster_name=cluster_uuid, create_if_not_exist=True
        )

        # sync local dir
        self.fs.sync(from_path=cluster_uuid)

        command = "env KUBECONFIG={} {} repo remove {}".format(
            paths["kube_config"], self._helm_command, quote(name)
        )
        await self._local_async_exec(
            command=command, raise_exception_on_error=True, env=env
        )

        # sync fs
        self.fs.reverse_sync(from_path=cluster_uuid)

    async def reset(
        self,
        cluster_uuid: str,
        force: bool = False,
        uninstall_sw: bool = False,
        **kwargs,
    ) -> bool:
        """Reset a cluster

        Resets the Kubernetes cluster by removing the helm deployment that represents it.

        :param cluster_uuid: The UUID of the cluster to reset
        :param force: Boolean to force the reset even if releases are still deployed
        :param uninstall_sw: Boolean to uninstall the software installed by the connector
        :param kwargs: Additional parameters (None yet)
        :return: Returns True if successful or raises an exception.
        """
        namespace = self._get_namespace(cluster_uuid=cluster_uuid)
        self.log.debug(
            "Resetting K8s environment. cluster uuid: {} uninstall={}".format(
                cluster_uuid, uninstall_sw
            )
        )

        # sync local dir
        self.fs.sync(from_path=cluster_uuid)

        # uninstall releases if needed.
        if uninstall_sw:
            releases = await self.instances_list(cluster_uuid=cluster_uuid)
            if len(releases) > 0:
                if force:
                    for r in releases:
                        try:
                            kdu_instance = r.get("name")
                            chart = r.get("chart")
                            self.log.debug(
                                "Uninstalling {} -> {}".format(chart, kdu_instance)
                            )
                            await self.uninstall(
                                cluster_uuid=cluster_uuid, kdu_instance=kdu_instance
                            )
                        except Exception as e:
                            # will not raise exception as it was found
                            # that in some cases of previously installed helm releases it
                            # raised an error
                            self.log.warn(
                                "Error uninstalling release {}: {}".format(
                                    kdu_instance, e
                                )
                            )
                else:
                    msg = (
                        "Cluster uuid: {} has releases and not force. Leaving K8s helm environment"
                    ).format(cluster_uuid)
                    self.log.warn(msg)
                    uninstall_sw = (
                        False  # Allow to remove k8s cluster without removing Tiller
                    )

        if uninstall_sw:
            await self._uninstall_sw(cluster_id=cluster_uuid, namespace=namespace)

        # delete cluster directory
        self.log.debug("Removing directory {}".format(cluster_uuid))
        self.fs.file_delete(cluster_uuid, ignore_non_exist=True)
        # Remove also the local directory if it still exists
        direct = self.fs.path + "/" + cluster_uuid
        shutil.rmtree(direct, ignore_errors=True)

        return True

    def _is_helm_chart_a_file(self, chart_name: str):
        return chart_name.count("/") > 1

    @staticmethod
    def _is_helm_chart_a_url(chart_name: str):
        result = urlparse(chart_name)
        return all([result.scheme, result.netloc])
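
    # Illustrative behaviour of the two chart-reference helpers above
    # (the inputs are hypothetical):
    #
    #   _is_helm_chart_a_file("/app/storage/charts/mychart")  -> True   (local path)
    #   _is_helm_chart_a_file("stable/openldap")              -> False  (repo/chart)
    #   _is_helm_chart_a_url("https://example.org/chart.tgz") -> True
    #   _is_helm_chart_a_url("stable/openldap")               -> False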

    async def _install_impl(
        self,
        cluster_id: str,
        kdu_model: str,
        paths: dict,
        env: dict,
        kdu_instance: str,
        atomic: bool = True,
        timeout: float = 300,
        params: dict = None,
        db_dict: dict = None,
        kdu_name: str = None,
        namespace: str = None,
    ):
        # init env, paths
        paths, env = self._init_paths_env(
            cluster_name=cluster_id, create_if_not_exist=True
        )

        # params to str
        params_str, file_to_delete = self._params_to_file_option(
            cluster_id=cluster_id, params=params
        )

        kdu_model, version = await self._prepare_helm_chart(kdu_model, cluster_id)

        command = self._get_install_command(
            kdu_model,
            kdu_instance,
            namespace,
            params_str,
            version,
            atomic,
            timeout,
            paths["kube_config"],
        )

        self.log.debug("installing: {}".format(command))

        if atomic:
            # exec helm in a task
            exec_task = asyncio.ensure_future(
                coro_or_future=self._local_async_exec(
                    command=command, raise_exception_on_error=False, env=env
                )
            )

            # write status in another task
            status_task = asyncio.ensure_future(
                coro_or_future=self._store_status(
                    cluster_id=cluster_id,
                    kdu_instance=kdu_instance,
                    namespace=namespace,
                    db_dict=db_dict,
                    operation="install",
                )
            )

            # wait for execution task
            await asyncio.wait([exec_task])

            # cancel status task
            status_task.cancel()

            output, rc = exec_task.result()

        else:
            output, rc = await self._local_async_exec(
                command=command, raise_exception_on_error=False, env=env
            )

        # remove temporary values yaml file
        if file_to_delete:
            os.remove(file_to_delete)

        # write final status
        await self._store_status(
            cluster_id=cluster_id,
            kdu_instance=kdu_instance,
            namespace=namespace,
            db_dict=db_dict,
            operation="install",
        )

        if rc != 0:
            msg = "Error executing command: {}\nOutput: {}".format(command, output)
            self.log.error(msg)
            raise K8sException(msg)
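
    # The atomic branch above follows a pattern that upgrade(), scale() and
    # rollback() reuse: run the helm command and a status writer concurrently,
    # then cancel the writer once helm finishes. Distilled sketch (the two
    # coroutine names are hypothetical):
    #
    #   exec_task = asyncio.ensure_future(run_helm_command())
    #   status_task = asyncio.ensure_future(poll_and_store_status())
    #   await asyncio.wait([exec_task])  # wait only for the helm command
    #   status_task.cancel()             # stop the periodic status updates
    #   output, rc = exec_task.result()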

    async def upgrade(
        self,
        cluster_uuid: str,
        kdu_instance: str,
        kdu_model: str = None,
        atomic: bool = True,
        timeout: float = 300,
        params: dict = None,
        db_dict: dict = None,
        namespace: str = None,
        force: bool = False,
    ):
        self.log.debug("upgrading {} in cluster {}".format(kdu_model, cluster_uuid))

        # sync local dir
        self.fs.sync(from_path=cluster_uuid)

        # look for instance to obtain namespace

        # set namespace
        if not namespace:
            instance_info = await self.get_instance_info(cluster_uuid, kdu_instance)
            if not instance_info:
                raise K8sException("kdu_instance {} not found".format(kdu_instance))
            namespace = instance_info["namespace"]

        # init env, paths
        paths, env = self._init_paths_env(
            cluster_name=cluster_uuid, create_if_not_exist=True
        )

        # sync local dir
        self.fs.sync(from_path=cluster_uuid)

        # params to str
        params_str, file_to_delete = self._params_to_file_option(
            cluster_id=cluster_uuid, params=params
        )

        kdu_model, version = await self._prepare_helm_chart(kdu_model, cluster_uuid)

        command = self._get_upgrade_command(
            kdu_model,
            kdu_instance,
            namespace,
            params_str,
            version,
            atomic,
            timeout,
            paths["kube_config"],
            force,
        )

        self.log.debug("upgrading: {}".format(command))

        if atomic:
            # exec helm in a task
            exec_task = asyncio.ensure_future(
                coro_or_future=self._local_async_exec(
                    command=command, raise_exception_on_error=False, env=env
                )
            )
            # write status in another task
            status_task = asyncio.ensure_future(
                coro_or_future=self._store_status(
                    cluster_id=cluster_uuid,
                    kdu_instance=kdu_instance,
                    namespace=namespace,
                    db_dict=db_dict,
                    operation="upgrade",
                )
            )

            # wait for execution task
            await asyncio.wait([exec_task])

            # cancel status task
            status_task.cancel()
            output, rc = exec_task.result()

        else:
            output, rc = await self._local_async_exec(
                command=command, raise_exception_on_error=False, env=env
            )

        # remove temporary values yaml file
        if file_to_delete:
            os.remove(file_to_delete)

        # write final status
        await self._store_status(
            cluster_id=cluster_uuid,
            kdu_instance=kdu_instance,
            namespace=namespace,
            db_dict=db_dict,
            operation="upgrade",
        )

        if rc != 0:
            msg = "Error executing command: {}\nOutput: {}".format(command, output)
            self.log.error(msg)
            raise K8sException(msg)

        # sync fs
        self.fs.reverse_sync(from_path=cluster_uuid)

        # return new revision number
        instance = await self.get_instance_info(
            cluster_uuid=cluster_uuid, kdu_instance=kdu_instance
        )
        if instance:
            revision = int(instance.get("revision"))
            self.log.debug("New revision: {}".format(revision))
            return revision
        else:
            return 0

    async def scale(
        self,
        kdu_instance: str,
        scale: int,
        resource_name: str,
        total_timeout: float = 1800,
        cluster_uuid: str = None,
        kdu_model: str = None,
        atomic: bool = True,
        db_dict: dict = None,
        **kwargs,
    ):
        """Scale a resource in a Helm Chart.

        Args:
            kdu_instance: KDU instance name
            scale: Scale to which to set the resource
            resource_name: Resource name
            total_timeout: The time, in seconds, to wait
            cluster_uuid: The UUID of the cluster
            kdu_model: The chart reference
            atomic: if set, upgrade process rolls back changes made in case of failed upgrade.
                The --wait flag will be set automatically if --atomic is used
            db_dict: Dictionary for any additional data
            kwargs: Additional parameters

        Returns:
            True if successful, False otherwise
        """

        debug_msg = "scaling {} in cluster {}".format(kdu_model, cluster_uuid)
        if resource_name:
            debug_msg = "scaling resource {} in model {} (cluster {})".format(
                resource_name, kdu_model, cluster_uuid
            )

        self.log.debug(debug_msg)

        # look for instance to obtain namespace
        # get_instance_info function calls the sync command
        instance_info = await self.get_instance_info(cluster_uuid, kdu_instance)
        if not instance_info:
            raise K8sException("kdu_instance {} not found".format(kdu_instance))

        # init env, paths
        paths, env = self._init_paths_env(
            cluster_name=cluster_uuid, create_if_not_exist=True
        )

        # version
        kdu_model, version = await self._prepare_helm_chart(kdu_model, cluster_uuid)

        repo_url = await self._find_repo(kdu_model, cluster_uuid)

        _, replica_str = await self._get_replica_count_url(
            kdu_model, repo_url, resource_name
        )

        command = self._get_upgrade_scale_command(
            kdu_model,
            kdu_instance,
            instance_info["namespace"],
            scale,
            version,
            atomic,
            replica_str,
            total_timeout,
            resource_name,
            paths["kube_config"],
        )

        self.log.debug("scaling: {}".format(command))

        if atomic:
            # exec helm in a task
            exec_task = asyncio.ensure_future(
                coro_or_future=self._local_async_exec(
                    command=command, raise_exception_on_error=False, env=env
                )
            )
            # write status in another task
            status_task = asyncio.ensure_future(
                coro_or_future=self._store_status(
                    cluster_id=cluster_uuid,
                    kdu_instance=kdu_instance,
                    namespace=instance_info["namespace"],
                    db_dict=db_dict,
                    operation="scale",
                )
            )

            # wait for execution task
            await asyncio.wait([exec_task])

            # cancel status task
            status_task.cancel()
            output, rc = exec_task.result()

        else:
            output, rc = await self._local_async_exec(
                command=command, raise_exception_on_error=False, env=env
            )

        # write final status
        await self._store_status(
            cluster_id=cluster_uuid,
            kdu_instance=kdu_instance,
            namespace=instance_info["namespace"],
            db_dict=db_dict,
            operation="scale",
        )

        if rc != 0:
            msg = "Error executing command: {}\nOutput: {}".format(command, output)
            self.log.error(msg)
            raise K8sException(msg)

        # sync fs
        self.fs.reverse_sync(from_path=cluster_uuid)

        return True

    async def get_scale_count(
        self,
        resource_name: str,
        kdu_instance: str,
        cluster_uuid: str,
        kdu_model: str,
        **kwargs,
    ) -> int:
        """Get a resource scale count.

        Args:
            cluster_uuid: The UUID of the cluster
            resource_name: Resource name
            kdu_instance: KDU instance name
            kdu_model: The name or path of a Helm Chart
            kwargs: Additional parameters

        Returns:
            Resource instance count
        """

        self.log.debug(
            "getting scale count for {} in cluster {}".format(kdu_model, cluster_uuid)
        )

        # look for instance to obtain namespace
        instance_info = await self.get_instance_info(cluster_uuid, kdu_instance)
        if not instance_info:
            raise K8sException("kdu_instance {} not found".format(kdu_instance))

        # init env, paths
        paths, _ = self._init_paths_env(
            cluster_name=cluster_uuid, create_if_not_exist=True
        )

        replicas = await self._get_replica_count_instance(
            kdu_instance=kdu_instance,
            namespace=instance_info["namespace"],
            kubeconfig=paths["kube_config"],
            resource_name=resource_name,
        )

        self.log.debug(
            f"Number of replicas of the KDU instance {kdu_instance} and resource {resource_name} obtained: {replicas}"
        )

        # Get the default value if the scale count is not found in the provided values.
        # Important note: this piece of code shall only be executed in the first scaling operation,
        # since it is expected that _get_replica_count_instance is able to obtain the number of
        # replicas when a scale operation was already conducted previously for this KDU/resource!
        if replicas is None:
            repo_url = await self._find_repo(
                kdu_model=kdu_model, cluster_uuid=cluster_uuid
            )
            replicas, _ = await self._get_replica_count_url(
                kdu_model=kdu_model, repo_url=repo_url, resource_name=resource_name
            )

            self.log.debug(
                f"Number of replicas of the Helm Chart package for KDU instance {kdu_instance} and resource "
                f"{resource_name} obtained: {replicas}"
            )

            if replicas is None:
                msg = "Replica count not found. Cannot be scaled"
                self.log.error(msg)
                raise K8sException(msg)

        return int(replicas)

    async def rollback(
        self, cluster_uuid: str, kdu_instance: str, revision=0, db_dict: dict = None
    ):
        self.log.debug(
            "rollback kdu_instance {} to revision {} from cluster {}".format(
                kdu_instance, revision, cluster_uuid
            )
        )

        # sync local dir
        self.fs.sync(from_path=cluster_uuid)

        # look for instance to obtain namespace
        instance_info = await self.get_instance_info(cluster_uuid, kdu_instance)
        if not instance_info:
            raise K8sException("kdu_instance {} not found".format(kdu_instance))

        # init env, paths
        paths, env = self._init_paths_env(
            cluster_name=cluster_uuid, create_if_not_exist=True
        )

        # sync local dir
        self.fs.sync(from_path=cluster_uuid)

        command = self._get_rollback_command(
            kdu_instance, instance_info["namespace"], revision, paths["kube_config"]
        )

        self.log.debug("rolling_back: {}".format(command))

        # exec helm in a task
        exec_task = asyncio.ensure_future(
            coro_or_future=self._local_async_exec(
                command=command, raise_exception_on_error=False, env=env
            )
        )
        # write status in another task
        status_task = asyncio.ensure_future(
            coro_or_future=self._store_status(
                cluster_id=cluster_uuid,
                kdu_instance=kdu_instance,
                namespace=instance_info["namespace"],
                db_dict=db_dict,
                operation="rollback",
            )
        )

        # wait for execution task
        await asyncio.wait([exec_task])

        # cancel status task
        status_task.cancel()

        output, rc = exec_task.result()

        # write final status
        await self._store_status(
            cluster_id=cluster_uuid,
            kdu_instance=kdu_instance,
            namespace=instance_info["namespace"],
            db_dict=db_dict,
            operation="rollback",
        )

        if rc != 0:
            msg = "Error executing command: {}\nOutput: {}".format(command, output)
            self.log.error(msg)
            raise K8sException(msg)

        # sync fs
        self.fs.reverse_sync(from_path=cluster_uuid)

        # return new revision number
        instance = await self.get_instance_info(
            cluster_uuid=cluster_uuid, kdu_instance=kdu_instance
        )
        if instance:
            revision = int(instance.get("revision"))
            self.log.debug("New revision: {}".format(revision))
            return revision
        else:
            return 0

    async def uninstall(self, cluster_uuid: str, kdu_instance: str, **kwargs):
        """
        Removes an existing KDU instance. It would implicitly use the `delete` or
        `uninstall` call (this call should happen after all
        _terminate-config-primitive_ of the VNF are invoked).

        :param cluster_uuid: UUID of a K8s cluster known by OSM, or namespace:cluster_id
        :param kdu_instance: unique name for the KDU instance to be deleted
        :param kwargs: Additional parameters (None yet)
        :return: True if the instance was not found; otherwise, the parsed output
            of the uninstall command
        """

        self.log.debug(
            "uninstall kdu_instance {} from cluster {}".format(
                kdu_instance, cluster_uuid
            )
        )

        # sync local dir
        self.fs.sync(from_path=cluster_uuid)

        # look for instance to obtain namespace
        instance_info = await self.get_instance_info(cluster_uuid, kdu_instance)
        if not instance_info:
            self.log.warning(("kdu_instance {} not found".format(kdu_instance)))
            return True
        # init env, paths
        paths, env = self._init_paths_env(
            cluster_name=cluster_uuid, create_if_not_exist=True
        )

        # sync local dir
        self.fs.sync(from_path=cluster_uuid)

        command = self._get_uninstall_command(
            kdu_instance, instance_info["namespace"], paths["kube_config"]
        )
        output, _rc = await self._local_async_exec(
            command=command, raise_exception_on_error=True, env=env
        )

        # sync fs
        self.fs.reverse_sync(from_path=cluster_uuid)

        return self._output_to_table(output)

    async def instances_list(self, cluster_uuid: str) -> list:
        """
        returns a list of deployed releases in a cluster

        :param cluster_uuid: the 'cluster' or 'namespace:cluster'
        :return: list of deployed releases
        """

        self.log.debug("list releases for cluster {}".format(cluster_uuid))

        # sync local dir
        self.fs.sync(from_path=cluster_uuid)

        # execute internal command
        result = await self._instances_list(cluster_uuid)

        # sync fs
        self.fs.reverse_sync(from_path=cluster_uuid)

        return result

    async def get_instance_info(self, cluster_uuid: str, kdu_instance: str):
        instances = await self.instances_list(cluster_uuid=cluster_uuid)
        for instance in instances:
            if instance.get("name") == kdu_instance:
                return instance
        self.log.debug("Instance {} not found".format(kdu_instance))
        return None

    async def upgrade_charm(
        self,
        ee_id: str = None,
        path: str = None,
        charm_id: str = None,
        charm_type: str = None,
        timeout: float = None,
    ) -> str:
        """This method upgrades charms in VNFs

        Args:
            ee_id: Execution environment id
            path: Local path to the charm
            charm_id: charm-id
            charm_type: Charm type can be lxc-proxy-charm, native-charm or k8s-proxy-charm
            timeout: (Float) Timeout for the ns update operation

        Returns:
            The output of the update operation if status equals to "completed"
        """
        raise K8sException("KDUs deployed with Helm do not support charm upgrade")

    async def exec_primitive(
        self,
        cluster_uuid: str = None,
        kdu_instance: str = None,
        primitive_name: str = None,
        timeout: float = 300,
        params: dict = None,
        db_dict: dict = None,
        **kwargs,
    ) -> str:
        """Exec primitive (Juju action)

        :param cluster_uuid: The UUID of the cluster or namespace:cluster
        :param kdu_instance: The unique name of the KDU instance
        :param primitive_name: Name of action that will be executed
        :param timeout: Timeout for action execution
        :param params: Dictionary of all the parameters needed for the action
        :param db_dict: Dictionary for any additional data
        :param kwargs: Additional parameters (None yet)

        :return: Returns the output of the action
        """
        raise K8sException(
            "KDUs deployed with Helm don't support actions "
            "different from rollback, upgrade and status"
        )

    async def get_services(
        self, cluster_uuid: str, kdu_instance: str, namespace: str
    ) -> list:
        """
        Returns a list of services defined for the specified kdu instance.

        :param cluster_uuid: UUID of a K8s cluster known by OSM
        :param kdu_instance: unique name for the KDU instance
        :param namespace: K8s namespace used by the KDU instance
        :return: If successful, it will return a list of services. Each service
            can have the following data:
            - `name` of the service
            - `type` type of service in the k8s cluster
            - `ports` List of ports offered by the service, for each port includes at least
            name, port, protocol
            - `cluster_ip` Internal ip to be used inside k8s cluster
            - `external_ip` List of external ips (in case they are available)
        """

        self.log.debug(
            "get_services: cluster_uuid: {}, kdu_instance: {}".format(
                cluster_uuid, kdu_instance
            )
        )

        # init env, paths
        paths, env = self._init_paths_env(
            cluster_name=cluster_uuid, create_if_not_exist=True
        )

        # sync local dir
        self.fs.sync(from_path=cluster_uuid)

        # get list of services names for kdu
        service_names = await self._get_services(
            cluster_uuid, kdu_instance, namespace, paths["kube_config"]
        )

        service_list = []
        for service in service_names:
            service = await self._get_service(cluster_uuid, service, namespace)
            service_list.append(service)

        # sync fs
        self.fs.reverse_sync(from_path=cluster_uuid)

        return service_list

    async def get_service(
        self, cluster_uuid: str, service_name: str, namespace: str
    ) -> object:
        self.log.debug(
            "get service, service_name: {}, namespace: {}, cluster_uuid: {}".format(
                service_name, namespace, cluster_uuid
            )
        )

        # sync local dir
        self.fs.sync(from_path=cluster_uuid)

        service = await self._get_service(cluster_uuid, service_name, namespace)

        # sync fs
        self.fs.reverse_sync(from_path=cluster_uuid)

        return service

    async def status_kdu(
        self, cluster_uuid: str, kdu_instance: str, yaml_format: bool = False, **kwargs
    ) -> Union[str, dict]:
        """
        This call retrieves the current state of a given KDU instance. It allows
        retrieving the _composition_ (i.e. K8s objects) and the _specific values_
        of the configuration parameters applied to a given instance. This call is
        based on the `status` call.

        :param cluster_uuid: UUID of a K8s cluster known by OSM
        :param kdu_instance: unique name for the KDU instance
        :param kwargs: Additional parameters (None yet)
        :param yaml_format: if True, the result is returned as a YAML string;
            otherwise, as a dictionary
        :return: If successful, it will return the following vector of arguments:
            - K8s `namespace` in the cluster where the KDU lives
            - `state` of the KDU instance. It can be:
                - UNKNOWN
                - DEPLOYED
                - DELETED
                - SUPERSEDED
                - FAILED or
                - DELETING
            - List of `resources` (objects) that this release consists of, sorted by kind,
                and the status of those resources
            - Last `deployment_time`.

        """
        self.log.debug(
            "status_kdu: cluster_uuid: {}, kdu_instance: {}".format(
                cluster_uuid, kdu_instance
            )
        )

        # sync local dir
        self.fs.sync(from_path=cluster_uuid)

        # get instance: needed to obtain namespace
        instances = await self._instances_list(cluster_id=cluster_uuid)
        for instance in instances:
            if instance.get("name") == kdu_instance:
                break
        else:
            # instance does not exist
            raise K8sException(
                "Instance name: {} not found in cluster: {}".format(
                    kdu_instance, cluster_uuid
                )
            )

        status = await self._status_kdu(
            cluster_id=cluster_uuid,
            kdu_instance=kdu_instance,
            namespace=instance["namespace"],
            yaml_format=yaml_format,
            show_error_log=True,
        )

        # sync fs
        self.fs.reverse_sync(from_path=cluster_uuid)

        return status

    async def get_values_kdu(
        self, kdu_instance: str, namespace: str, kubeconfig: str
    ) -> str:
        self.log.debug("get kdu_instance values {}".format(kdu_instance))

        return await self._exec_get_command(
            get_command="values",
            kdu_instance=kdu_instance,
            namespace=namespace,
            kubeconfig=kubeconfig,
        )

    async def values_kdu(self, kdu_model: str, repo_url: str = None) -> str:
        """Method to obtain the Helm Chart package's values

        Args:
            kdu_model: The name or path of a Helm Chart
            repo_url: Helm Chart repository url

        Returns:
            str: the values of the Helm Chart package
        """

        self.log.debug(
            "inspect kdu_model values {} from (optional) repo: {}".format(
                kdu_model, repo_url
            )
        )

        return await self._exec_inspect_command(
            inspect_command="values", kdu_model=kdu_model, repo_url=repo_url
        )

    async def help_kdu(self, kdu_model: str, repo_url: str = None) -> str:
        self.log.debug(
            "inspect kdu_model {} readme.md from repo: {}".format(kdu_model, repo_url)
        )

        return await self._exec_inspect_command(
            inspect_command="readme", kdu_model=kdu_model, repo_url=repo_url
        )

    async def synchronize_repos(self, cluster_uuid: str):
        self.log.debug("synchronize repos for cluster helm-id: {}".format(cluster_uuid))
        try:
            db_repo_ids = self._get_helm_chart_repos_ids(cluster_uuid)
            db_repo_dict = self._get_db_repos_dict(db_repo_ids)

            local_repo_list = await self.repo_list(cluster_uuid)
            local_repo_dict = {repo["name"]: repo["url"] for repo in local_repo_list}

            deleted_repo_list = []
            added_repo_dict = {}

            # iterate over the list of repos in the database that should be
            # added if not present
            for repo_name, db_repo in db_repo_dict.items():
                try:
                    # check if it is already present
                    curr_repo_url = local_repo_dict.get(db_repo["name"])
                    repo_id = db_repo.get("_id")
                    if curr_repo_url != db_repo["url"]:
                        if curr_repo_url:
                            self.log.debug(
                                "repo {} url changed, delete and add again".format(
                                    db_repo["url"]
                                )
                            )
                            await self.repo_remove(cluster_uuid, db_repo["name"])
                            deleted_repo_list.append(repo_id)

                        # add repo
                        self.log.debug("add repo {}".format(db_repo["name"]))
                        await self.repo_add(
                            cluster_uuid,
                            db_repo["name"],
                            db_repo["url"],
                            cert=db_repo.get("ca_cert"),
                            user=db_repo.get("user"),
                            password=db_repo.get("password"),
                            oci=db_repo.get("oci", False),
                        )
                        added_repo_dict[repo_id] = db_repo["name"]
                except Exception as e:
                    raise K8sException(
                        "Error adding repo id: {}, err_msg: {} ".format(
                            repo_id, repr(e)
                        )
                    )

            # Delete repos that are present but not in nbi_list
            for repo_name in local_repo_dict:
                if not db_repo_dict.get(repo_name) and repo_name != "stable":
                    self.log.debug("delete repo {}".format(repo_name))
                    try:
                        await self.repo_remove(cluster_uuid, repo_name)
                        deleted_repo_list.append(repo_name)
                    except Exception as e:
                        self.log.warning(
                            "Error deleting repo, name: {}, err_msg: {}".format(
                                repo_name, str(e)
                            )
                        )

            return deleted_repo_list, added_repo_dict

        except K8sException:
            raise
        except Exception as e:
            # Log and re-wrap any unexpected error while synchronizing repos
            self.log.error("Error synchronizing repos: {}".format(e))
            raise Exception("Error synchronizing repos: {}".format(e))
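
    # Illustrative shape of the tuple returned by synchronize_repos above
    # (identifiers and names are hypothetical):
    #
    #   deleted_repo_list = ["stale-repo"]             # repos removed locally
    #   added_repo_dict = {"6389ba79e01a": "bitnami"}  # db _id -> repo name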

    def _get_db_repos_dict(self, repo_ids: list):
        db_repos_dict = {}
        for repo_id in repo_ids:
            db_repo = self.db.get_one("k8srepos", {"_id": repo_id})
            db_repos_dict[db_repo["name"]] = db_repo
        return db_repos_dict

    """
    ####################################################################################
    ################################### TO BE IMPLEMENTED SUBCLASSES ###################
    ####################################################################################
    """

    @abc.abstractmethod
    def _init_paths_env(self, cluster_name: str, create_if_not_exist: bool = True):
        """
        Creates the base cluster and kube dirs and returns them.
        Helm3 dirs are also created according to the new directory specification;
        their paths are not returned but assigned to helm environment variables.

        :param cluster_name: cluster_name
        :return: Dictionary with config_paths and dictionary with helm environment variables
        """

    @abc.abstractmethod
    async def _cluster_init(self, cluster_id, namespace, paths, env):
        """
        Implements the helm version dependent cluster initialization
        """

    @abc.abstractmethod
    async def _instances_list(self, cluster_id):
        """
        Implements the helm version dependent helm instances list
        """

    @abc.abstractmethod
    async def _get_services(self, cluster_id, kdu_instance, namespace, kubeconfig):
        """
        Implements the helm version dependent method to obtain services from a helm instance
        """

    @abc.abstractmethod
    async def _status_kdu(
        self,
        cluster_id: str,
        kdu_instance: str,
        namespace: str = None,
        yaml_format: bool = False,
        show_error_log: bool = False,
    ) -> Union[str, dict]:
        """
        Implements the helm version dependent method to obtain status of a helm instance
        """

    @abc.abstractmethod
    def _get_install_command(
        self,
        kdu_model,
        kdu_instance,
        namespace,
        params_str,
        version,
        atomic,
        timeout,
        kubeconfig,
    ) -> str:
        """
        Obtain command to be executed to install the indicated instance
        """

    @abc.abstractmethod
    def _get_upgrade_scale_command(
        self,
        kdu_model,
        kdu_instance,
        namespace,
        count,
        version,
        atomic,
        replicas,
        timeout,
        resource_name,
        kubeconfig,
    ) -> str:
        """Generates the command to scale a Helm Chart release

        Args:
            kdu_model (str): Kdu model name, corresponding to the Helm local location or repository
            kdu_instance (str): KDU instance, corresponding to the Helm Chart release in question
            namespace (str): Namespace where this KDU instance is deployed
            count (int): Scale count
            version (str): Constraint with specific version of the Chart to use
            atomic (bool): If set, upgrade process rolls back changes made in case of failed upgrade.
                The --wait flag will be set automatically if --atomic is used
            replicas (str): The key under resource_name key where the scale count is stored
            timeout (float): The time, in seconds, to wait
            resource_name (str): The KDU's resource to scale
            kubeconfig (str): Kubeconfig file path

        Returns:
            str: command to scale a Helm Chart release
        """

    @abc.abstractmethod
    def _get_upgrade_command(
        self,
        kdu_model,
        kdu_instance,
        namespace,
        params_str,
        version,
        atomic,
        timeout,
        kubeconfig,
        force,
    ) -> str:
        """Generates the command to upgrade a Helm Chart release

        Args:
            kdu_model (str): Kdu model name, corresponding to the Helm local location or repository
            kdu_instance (str): KDU instance, corresponding to the Helm Chart release in question
            namespace (str): Namespace where this KDU instance is deployed
            params_str (str): Params used to upgrade the Helm Chart release
            version (str): Constraint with specific version of the Chart to use
            atomic (bool): If set, upgrade process rolls back changes made in case of failed upgrade.
                The --wait flag will be set automatically if --atomic is used
            timeout (float): The time, in seconds, to wait
            kubeconfig (str): Kubeconfig file path
            force (bool): If set, helm forces resource updates through a replacement strategy. This may recreate pods.

        Returns:
            str: command to upgrade a Helm Chart release
        """

    @abc.abstractmethod
    def _get_rollback_command(
        self, kdu_instance, namespace, revision, kubeconfig
    ) -> str:
        """
        Obtain command to be executed to rollback the indicated instance
        """

    @abc.abstractmethod
    def _get_uninstall_command(
        self, kdu_instance: str, namespace: str, kubeconfig: str
    ) -> str:
        """
        Obtain command to be executed to delete the indicated instance
        """

    @abc.abstractmethod
    def _get_inspect_command(
        self, show_command: str, kdu_model: str, repo_str: str, version: str
    ):
        """Generates the command to obtain information about a Helm Chart package
        (`helm show ...` command)

        Args:
            show_command: the second part of the command (`helm show <show_command>`)
            kdu_model: The name or path of a Helm Chart
            repo_str: Helm Chart repository url
            version: constraint with specific version of the Chart to use

        Returns:
            str: the generated Helm Chart command
        """

    @abc.abstractmethod
    def _get_get_command(
        self, get_command: str, kdu_instance: str, namespace: str, kubeconfig: str
    ):
        """Obtain command to be executed to get information about the kdu instance."""

    @abc.abstractmethod
    async def _uninstall_sw(self, cluster_id: str, namespace: str):
        """
        Method called to uninstall cluster software for helm. This method depends
        on the helm version.
        For Helm v2 it will be called when Tiller must be uninstalled.
        For Helm v3 it does nothing and does not need to be called.
        """

    @abc.abstractmethod
    def _get_helm_chart_repos_ids(self, cluster_uuid) -> list:
        """
        Obtains the cluster repos identifiers
        """

    """
    ####################################################################################
    ################################### P R I V A T E ##################################
    ####################################################################################
    """

    @staticmethod
    def _check_file_exists(filename: str, exception_if_not_exists: bool = False):
        if os.path.exists(filename):
            return True
        else:
            msg = "File {} does not exist".format(filename)
            if exception_if_not_exists:
                raise K8sException(msg)
            return False

    @staticmethod
    def _remove_multiple_spaces(strobj):
        strobj = strobj.strip()
        while "  " in strobj:
            strobj = strobj.replace("  ", " ")
        return strobj

    @staticmethod
    def _output_to_lines(output: str) -> list:
        output_lines = list()
        lines = output.splitlines(keepends=False)
        for line in lines:
            line = line.strip()
            if len(line) > 0:
                output_lines.append(line)
        return output_lines

    @staticmethod
    def _output_to_table(output: str) -> list:
        output_table = list()
        lines = output.splitlines(keepends=False)
        for line in lines:
            line = line.replace("\t", "  ")
            line_list = list()
            output_table.append(line_list)
            cells = line.split(sep="  ")
            for cell in cells:
                cell = cell.strip()
                if len(cell) > 0:
                    line_list.append(cell)
        return output_table
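
    # Illustrative example of the table parsing above (hypothetical
    # tab-separated `helm list` style output):
    #
    #   _output_to_table("NAME\tREVISION\nldap\t1")
    #   -> [["NAME", "REVISION"], ["ldap", "1"]]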

    @staticmethod
    def _parse_services(output: str) -> list:
        lines = output.splitlines(keepends=False)
        services = []
        for line in lines:
            line = line.replace("\t", "  ")
            cells = line.split(sep="  ")
            if len(cells) > 0 and cells[0].startswith("service/"):
                elems = cells[0].split(sep="/")
                if len(elems) > 1:
                    services.append(elems[1])
        return services
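
    # Illustrative example of the parsing above (hypothetical `kubectl get all`
    # style output):
    #
    #   _parse_services("service/ldap-svc  ClusterIP\npod/ldap-0  Running")
    #   -> ["ldap-svc"]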

    @staticmethod
    def _get_deep(dictionary: dict, members: tuple):
        target = dictionary
        value = None
        try:
            for m in members:
                value = target.get(m)
                if not value:
                    return None
                else:
                    target = value
        except Exception:
            pass
        return value
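
    # Illustrative example of _get_deep (the dictionary is hypothetical):
    #
    #   _get_deep({"info": {"status": {"code": 1}}}, ("info", "status", "code"))
    #   -> 1
    #   _get_deep({"info": {}}, ("info", "missing"))
    #   -> None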

    # find key:value in several lines
    @staticmethod
    def _find_in_lines(p_lines: list, p_key: str) -> str:
        for line in p_lines:
            try:
                if line.startswith(p_key + ":"):
                    parts = line.split(":")
                    the_value = parts[1].strip()
                    return the_value
            except Exception:
                # ignore it
                pass
        return None

    @staticmethod
    def _lower_keys_list(input_list: list):
        """
        Transforms the keys in a list of dictionaries to lower case and returns a
        new list of dictionaries
        """
        new_list = []
        if input_list:
            for dictionary in input_list:
                new_dict = dict((k.lower(), v) for k, v in dictionary.items())
                new_list.append(new_dict)
        return new_list
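
    # Illustrative example of _lower_keys_list (the helm2-style keys are
    # hypothetical):
    #
    #   _lower_keys_list([{"Name": "ldap", "Status": "DEPLOYED"}])
    #   -> [{"name": "ldap", "status": "DEPLOYED"}]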
1541 |
|
|
1542 |
1 |
    async def _local_async_exec(
        self,
        command: str,
        raise_exception_on_error: bool = False,
        show_error_log: bool = True,
        encode_utf8: bool = False,
        env: dict = None,
    ) -> tuple[str, int]:
        command = K8sHelmBaseConnector._remove_multiple_spaces(command)
        self.log.debug(
            "Executing async local command: {}, env: {}".format(command, env)
        )

        # split command
        command = shlex.split(command)

        environ = os.environ.copy()
        if env:
            environ.update(env)

        try:
            async with self.cmd_lock:
                process = await asyncio.create_subprocess_exec(
                    *command,
                    stdout=asyncio.subprocess.PIPE,
                    stderr=asyncio.subprocess.PIPE,
                    env=environ,
                )

                # wait for the command to terminate
                stdout, stderr = await process.communicate()

                return_code = process.returncode

            output = ""
            if stdout:
                output = stdout.decode("utf-8").strip()
            if stderr:
                # if both streams have content, stderr takes precedence
                output = stderr.decode("utf-8").strip()

            if return_code != 0 and show_error_log:
                self.log.debug(
                    "Return code (FAIL): {}\nOutput:\n{}".format(return_code, output)
                )
            else:
                self.log.debug("Return code: {}".format(return_code))

            if raise_exception_on_error and return_code != 0:
                raise K8sException(output)

            if encode_utf8:
                output = output.encode("utf-8").strip()
                output = str(output).replace("\\n", "\n")

            return output, return_code

        except asyncio.CancelledError:
            # first, kill the process if it is still running
            if process.returncode is None:
                process.kill()
            raise
        except K8sException:
            raise
        except Exception as e:
            msg = "Exception executing command: {} -> {}".format(command, e)
            self.log.error(msg)
            if raise_exception_on_error:
                raise K8sException(e) from e
            else:
                return "", -1

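    # Illustrative sketch (hypothetical call site): run a command without a
    # shell and raise K8sException on a non-zero return code:
    #   output, rc = await self._local_async_exec(
    #       command="/usr/bin/helm version --short",
    #       raise_exception_on_error=True,
    #       env={"KUBECONFIG": "/path/to/kubeconfig"},
    #   )
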
    async def _local_async_exec_pipe(
        self,
        command1: str,
        command2: str,
        raise_exception_on_error: bool = True,
        show_error_log: bool = True,
        encode_utf8: bool = False,
        env: dict = None,
    ):
        command1 = K8sHelmBaseConnector._remove_multiple_spaces(command1)
        command2 = K8sHelmBaseConnector._remove_multiple_spaces(command2)
        command = "{} | {}".format(command1, command2)
        self.log.debug(
            "Executing async local command: {}, env: {}".format(command, env)
        )

        # split commands
        command1 = shlex.split(command1)
        command2 = shlex.split(command2)

        environ = os.environ.copy()
        if env:
            environ.update(env)

        try:
            async with self.cmd_lock:
                read, write = os.pipe()
                process_1 = await asyncio.create_subprocess_exec(
                    *command1, stdout=write, env=environ
                )
                os.close(write)
                process_2 = await asyncio.create_subprocess_exec(
                    *command2, stdin=read, stdout=asyncio.subprocess.PIPE, env=environ
                )
                os.close(read)
                stdout, stderr = await process_2.communicate()

                return_code = process_2.returncode

            output = ""
            if stdout:
                output = stdout.decode("utf-8").strip()
            if stderr:
                # if both streams have content, stderr takes precedence
                output = stderr.decode("utf-8").strip()

            if return_code != 0 and show_error_log:
                self.log.debug(
                    "Return code (FAIL): {}\nOutput:\n{}".format(return_code, output)
                )
            else:
                self.log.debug("Return code: {}".format(return_code))

            if raise_exception_on_error and return_code != 0:
                raise K8sException(output)

            if encode_utf8:
                output = output.encode("utf-8").strip()
                output = str(output).replace("\\n", "\n")

            return output, return_code
        except asyncio.CancelledError:
            # first, kill the processes if they are still running
            for process in (process_1, process_2):
                if process.returncode is None:
                    process.kill()
            raise
        except K8sException:
            raise
        except Exception as e:
            msg = "Exception executing command: {} -> {}".format(command, e)
            self.log.error(msg)
            if raise_exception_on_error:
                raise K8sException(e) from e
            else:
                return "", -1

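    # Illustrative sketch (hypothetical call site): emulate `cmd1 | cmd2`
    # without a shell, e.g. filtering helm output through grep:
    #   output, rc = await self._local_async_exec_pipe(
    #       "/usr/bin/helm list --all-namespaces", "/bin/grep deployed"
    #   )
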
    async def _get_service(self, cluster_id, service_name, namespace):
        """
        Obtains the data of the specified service in the K8s cluster.

        :param cluster_id: id of a K8s cluster known by OSM
        :param service_name: name of the K8s service in the specified namespace
        :param namespace: K8s namespace used by the KDU instance
        :return: If successful, it will return a service with the following data:
        - `name` of the service
        - `type` type of service in the K8s cluster
        - `ports` List of ports offered by the service, for each port includes at least
        name, port, protocol
        - `cluster_ip` Internal IP to be used inside the K8s cluster
        - `external_ip` List of external IPs (in case they are available)
        """

        # init config, env
        paths, env = self._init_paths_env(
            cluster_name=cluster_id, create_if_not_exist=True
        )

        command = "{} --kubeconfig={} --namespace={} get service {} -o=yaml".format(
            self.kubectl_command,
            paths["kube_config"],
            quote(namespace),
            quote(service_name),
        )

        output, _rc = await self._local_async_exec(
            command=command, raise_exception_on_error=True, env=env
        )

        data = yaml.load(output, Loader=yaml.SafeLoader)

        service = {
            "name": service_name,
            "type": self._get_deep(data, ("spec", "type")),
            "ports": self._get_deep(data, ("spec", "ports")),
            "cluster_ip": self._get_deep(data, ("spec", "clusterIP")),
        }
        if service["type"] == "LoadBalancer":
            ip_map_list = self._get_deep(data, ("status", "loadBalancer", "ingress"))
            ip_list = [elem["ip"] for elem in ip_map_list]
            service["external_ip"] = ip_list

        return service

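    # Illustrative sketch of the dictionary _get_service returns (all values
    # hypothetical):
    #   {
    #       "name": "myservice",
    #       "type": "LoadBalancer",
    #       "ports": [{"name": "http", "port": 80, "protocol": "TCP"}],
    #       "cluster_ip": "10.96.0.10",
    #       "external_ip": ["192.0.2.10"],
    #   }
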
    async def _exec_get_command(
        self, get_command: str, kdu_instance: str, namespace: str, kubeconfig: str
    ):
        """Obtains information about the kdu instance."""

        full_command = self._get_get_command(
            get_command, kdu_instance, namespace, kubeconfig
        )

        output, _rc = await self._local_async_exec(command=full_command)

        return output

    async def _exec_inspect_command(
        self, inspect_command: str, kdu_model: str, repo_url: str = None
    ):
        """Obtains information about a Helm Chart package (`helm show` command)

        Args:
            inspect_command: the Helm sub command (`helm show <inspect_command> ...`)
            kdu_model: The name or path of a Helm Chart
            repo_url: Helm Chart repository url

        Returns:
            str: the requested info about the Helm Chart package
        """

        repo_str = ""
        if repo_url:
            repo_str = " --repo {}".format(quote(repo_url))

        # Obtain the Chart's name and store it in the var kdu_model
        kdu_model, _ = self._split_repo(kdu_model=kdu_model)

        kdu_model, version = self._split_version(kdu_model)
        if version:
            version_str = "--version {}".format(quote(version))
        else:
            version_str = ""

        full_command = self._get_inspect_command(
            show_command=inspect_command,
            kdu_model=quote(kdu_model),
            repo_str=repo_str,
            version=version_str,
        )

        output, _ = await self._local_async_exec(command=full_command)

        return output

    async def _get_replica_count_url(
        self,
        kdu_model: str,
        repo_url: str = None,
        resource_name: str = None,
    ) -> tuple[int, str]:
        """Get the replica count value in the Helm Chart Values.

        Args:
            kdu_model: The name or path of a Helm Chart
            repo_url: Helm Chart repository url
            resource_name: Resource name

        Returns:
            A tuple with:
            - The number of replicas of the specific instance; if not found, returns None; and
            - The string corresponding to the replica count key in the Helm values
        """

        kdu_values = yaml.load(
            await self.values_kdu(kdu_model=kdu_model, repo_url=repo_url),
            Loader=yaml.SafeLoader,
        )

        self.log.debug(f"Obtained the Helm package values for the KDU: {kdu_values}")

        if not kdu_values:
            raise K8sException(
                "kdu_values not found for kdu_model {}".format(kdu_model)
            )

        if resource_name:
            kdu_values = kdu_values.get(resource_name, None)

            if not kdu_values:
                msg = "resource {} not found in the values in model {}".format(
                    resource_name, kdu_model
                )
                self.log.error(msg)
                raise K8sException(msg)

        duplicate_check = False

        replica_str = ""
        replicas = None

        if kdu_values.get("replicaCount") is not None:
            replicas = kdu_values["replicaCount"]
            replica_str = "replicaCount"
        elif kdu_values.get("replicas") is not None:
            duplicate_check = True
            replicas = kdu_values["replicas"]
            replica_str = "replicas"
        else:
            if resource_name:
                msg = (
                    "replicaCount or replicas not found in the resource "
                    "{} values in model {}. Cannot be scaled".format(
                        resource_name, kdu_model
                    )
                )
            else:
                msg = (
                    "replicaCount or replicas not found in the values "
                    "in model {}. Cannot be scaled".format(kdu_model)
                )
            self.log.error(msg)
            raise K8sException(msg)

        # Control if replicas and replicaCount exist at the same time
        msg = "replicaCount and replicas exist at the same time"
        if duplicate_check:
            if "replicaCount" in kdu_values:
                self.log.error(msg)
                raise K8sException(msg)
        else:
            if "replicas" in kdu_values:
                self.log.error(msg)
                raise K8sException(msg)

        return replicas, replica_str

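    # Illustrative sketch: Helm values {"replicaCount": 3} yield
    # (3, "replicaCount") and {"replicas": 2} yields (2, "replicas");
    # if both keys are present at the same level, K8sException is raised.
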
    async def _get_replica_count_instance(
        self,
        kdu_instance: str,
        namespace: str,
        kubeconfig: str,
        resource_name: str = None,
    ) -> int:
        """Get the replica count value in the instance.

        Args:
            kdu_instance: The name of the KDU instance
            namespace: KDU instance namespace
            kubeconfig: Path to the kubeconfig file of the target cluster
            resource_name: Resource name

        Returns:
            The number of replicas of the specific instance; if not found, returns None
        """

        kdu_values = yaml.load(
            await self.get_values_kdu(kdu_instance, namespace, kubeconfig),
            Loader=yaml.SafeLoader,
        )

        self.log.debug(f"Obtained the Helm values for the KDU instance: {kdu_values}")

        replicas = None

        if kdu_values:
            resource_values = (
                kdu_values.get(resource_name, None) if resource_name else None
            )

            for replica_str in ("replicaCount", "replicas"):
                if resource_values:
                    replicas = resource_values.get(replica_str)
                else:
                    replicas = kdu_values.get(replica_str)

                if replicas is not None:
                    break

        return replicas

    async def _store_status(
        self,
        cluster_id: str,
        operation: str,
        kdu_instance: str,
        namespace: str = None,
        db_dict: dict = None,
    ) -> None:
        """
        Obtains the status of the KDU instance based on Helm Charts, and stores it in the database.

        :param cluster_id (str): the cluster where the KDU instance is deployed
        :param operation (str): The operation related to the status to be updated (for instance, "install" or "upgrade")
        :param kdu_instance (str): The KDU instance in relation to which the status is obtained
        :param namespace (str): The Kubernetes namespace where the KDU instance was deployed. Defaults to None
        :param db_dict (dict): A dictionary with the database necessary information. It shall contain the
            values for the keys:
            - "collection": The Mongo DB collection to write to
            - "filter": The query filter to use in the update process
            - "path": The dot-separated keys which target the object to be updated
            Defaults to None.
        """

        try:
            detailed_status = await self._status_kdu(
                cluster_id=cluster_id,
                kdu_instance=kdu_instance,
                yaml_format=False,
                namespace=namespace,
            )

            status = detailed_status.get("info").get("description")
            self.log.debug(f"Status for KDU {kdu_instance} obtained: {status}.")

            # write status to db
            result = await self.write_app_status_to_db(
                db_dict=db_dict,
                status=str(status),
                detailed_status=str(detailed_status),
                operation=operation,
            )

            if not result:
                self.log.info("Error writing in database. Task exiting...")

        except asyncio.CancelledError as e:
            self.log.warning(
                f"Exception in method {self._store_status.__name__} (task cancelled): {e}"
            )
        except Exception as e:
            self.log.warning(f"Exception in method {self._store_status.__name__}: {e}")

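    # Illustrative sketch of a db_dict argument for _store_status (collection,
    # filter and path values are hypothetical):
    #   db_dict = {
    #       "collection": "nsrs",
    #       "filter": {"_id": "0000-1111"},
    #       "path": "_admin.deployed.K8s.0",
    #   }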
|
    # params for use in -f file
    # returns values file option and filename (in order to delete it at the end)
    def _params_to_file_option(self, cluster_id: str, params: dict) -> tuple[str, str]:
        if params and len(params) > 0:
            self._init_paths_env(cluster_name=cluster_id, create_if_not_exist=True)

            def get_random_number():
                r = random.SystemRandom().randint(1, 99999999)
                s = str(r)
                while len(s) < 10:
                    s = "0" + s
                return s

            params2 = dict()
            for key in params:
                value = params.get(key)
                if "!!yaml" in str(value):
                    value = yaml.safe_load(value[7:])
                params2[key] = value

            values_file = get_random_number() + ".yaml"
            with open(values_file, "w") as stream:
                yaml.dump(params2, stream, indent=4, default_flow_style=False)

            return "-f {}".format(values_file), values_file

        return "", None
|
    # params for use in --set option
    @staticmethod
    def _params_to_set_option(params: dict) -> str:
        pairs = [
            f"{quote(str(key))}={quote(str(value))}"
            for key, value in params.items()
            if value is not None
        ]
        if not pairs:
            return ""
        return "--set " + ",".join(pairs)

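    # Illustrative sketch: _params_to_set_option({"replicaCount": 2, "tag": "v1"})
    # -> "--set replicaCount=2,tag=v1" (keys and values are shell-quoted when
    # needed, and None values are skipped).
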
    @staticmethod
    def generate_kdu_instance_name(**kwargs):
        chart_name = kwargs["kdu_model"]
        # check embedded chart (file or dir)
        if chart_name.startswith("/"):
            # extract file or directory name
            chart_name = chart_name[chart_name.rfind("/") + 1 :]
        # check URL
        elif "://" in chart_name:
            # extract last portion of URL
            chart_name = chart_name[chart_name.rfind("/") + 1 :]

        name = ""
        for c in chart_name:
            if c.isalpha() or c.isnumeric():
                name += c
            else:
                name += "-"
        if len(name) > 35:
            name = name[0:35]

        # if it does not start with an alpha character, prefix 'a'
        if not name[0].isalpha():
            name = "a" + name

        name += "-"

        def get_random_number():
            r = random.SystemRandom().randint(1, 99999999)
            s = str(r)
            s = s.rjust(10, "0")
            return s

        name = name + get_random_number()
        return name.lower()

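    # Illustrative sketch: generate_kdu_instance_name(kdu_model="stable/openldap:1.2.3")
    # yields something like "stable-openldap-1-2-3-0012345678" (the 10-digit
    # suffix is random).
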
    def _split_version(self, kdu_model: str) -> tuple[str, str]:
        version = None
        if (
            not (
                self._is_helm_chart_a_file(kdu_model)
                or self._is_helm_chart_a_url(kdu_model)
            )
            and ":" in kdu_model
        ):
            parts = kdu_model.split(sep=":")
            if len(parts) == 2:
                version = str(parts[1])
                kdu_model = parts[0]
        return kdu_model, version

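    # Illustrative sketch: _split_version("stable/openldap:1.2.3")
    # -> ("stable/openldap", "1.2.3"); with no version tag it returns
    # (kdu_model, None).
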
    def _split_repo(self, kdu_model: str) -> tuple[str, str]:
        """Obtain the Helm Chart's repository and Chart's names from the KDU model

        Args:
            kdu_model (str): Associated KDU model

        Returns:
            (str, str): Tuple with the Chart name in index 0, and the repo name
            in index 1; if there was a problem finding them, return None
            for both
        """

        chart_name = None
        repo_name = None

        idx = kdu_model.find("/")
        if not self._is_helm_chart_a_url(kdu_model) and idx >= 0:
            chart_name = kdu_model[idx + 1 :]
            repo_name = kdu_model[:idx]

        return chart_name, repo_name

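    # Illustrative sketch: _split_repo("stable/openldap") -> ("openldap", "stable");
    # a plain chart name or a URL yields (None, None).
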
    async def _find_repo(self, kdu_model: str, cluster_uuid: str) -> str:
        """Obtain the Helm repository for a Helm Chart

        Args:
            kdu_model (str): the KDU model associated with the Helm Chart instantiation
            cluster_uuid (str): The cluster UUID associated with the Helm Chart instantiation

        Returns:
            str: the repository URL; if the Helm Chart is a local one, the function returns None
        """

        _, repo_name = self._split_repo(kdu_model=kdu_model)

        repo_url = None
        if repo_name:
            # Find repository link
            local_repo_list = await self.repo_list(cluster_uuid)
            for repo in local_repo_list:
                if repo["name"] == repo_name:
                    repo_url = repo["url"]
                    break  # it is not necessary to continue the loop if the repo link was found

        return repo_url

    def _repo_to_oci_url(self, repo):
        db_repo = self.db.get_one("k8srepos", {"name": repo}, fail_on_empty=False)
        if db_repo and "oci" in db_repo:
            return db_repo.get("url")

    async def _prepare_helm_chart(self, kdu_model, cluster_id):
        # e.g.: "stable/openldap", "1.0"
        kdu_model, version = self._split_version(kdu_model)
        # e.g.: "openldap, stable"
        chart_name, repo = self._split_repo(kdu_model)
        if repo and chart_name:  # repo/chart case
            oci_url = self._repo_to_oci_url(repo)
            if oci_url:  # oci does not require helm repo update
                kdu_model = f"{oci_url.rstrip('/')}/{chart_name.lstrip('/')}"  # urljoin doesn't work for oci schema
            else:
                await self.repo_update(cluster_id, repo)
        return kdu_model, version

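    # Illustrative sketch: for kdu_model "stable/openldap:1.2.3" backed by a
    # classic (non-OCI) repo, the method updates the repo and returns
    # ("stable/openldap", "1.2.3"); for an OCI repo it instead rewrites the
    # model to "<oci_url>/openldap".
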
    async def create_certificate(
        self, cluster_uuid, namespace, dns_prefix, name, secret_name, usage
    ):
        paths, env = self._init_paths_env(
            cluster_name=cluster_uuid, create_if_not_exist=True
        )
        kubectl = Kubectl(config_file=paths["kube_config"])
        await kubectl.create_certificate(
            namespace=namespace,
            name=name,
            dns_prefix=dns_prefix,
            secret_name=secret_name,
            usages=[usage],
            issuer_name="ca-issuer",
        )

    async def delete_certificate(self, cluster_uuid, namespace, certificate_name):
        paths, env = self._init_paths_env(
            cluster_name=cluster_uuid, create_if_not_exist=True
        )
        kubectl = Kubectl(config_file=paths["kube_config"])
        await kubectl.delete_certificate(namespace, certificate_name)

    async def create_namespace(
        self,
        namespace,
        cluster_uuid,
        labels,
    ):
        """
        Create a namespace in a specific cluster

        :param namespace: Namespace to be created
        :param cluster_uuid: K8s cluster uuid used to retrieve kubeconfig
        :param labels: Dictionary with labels for the new namespace
        :returns: None
        """
        paths, env = self._init_paths_env(
            cluster_name=cluster_uuid, create_if_not_exist=True
        )
        kubectl = Kubectl(config_file=paths["kube_config"])
        await kubectl.create_namespace(
            name=namespace,
            labels=labels,
        )

    async def delete_namespace(
        self,
        namespace,
        cluster_uuid,
    ):
        """
        Delete a namespace in a specific cluster

        :param namespace: namespace to be deleted
        :param cluster_uuid: K8s cluster uuid used to retrieve kubeconfig
        :returns: None
        """
        paths, env = self._init_paths_env(
            cluster_name=cluster_uuid, create_if_not_exist=True
        )
        kubectl = Kubectl(config_file=paths["kube_config"])
        await kubectl.delete_namespace(
            name=namespace,
        )

    async def copy_secret_data(
        self,
        src_secret: str,
        dst_secret: str,
        cluster_uuid: str,
        data_key: str,
        src_namespace: str = "osm",
        dst_namespace: str = "osm",
    ):
        """
        Copy a single key and value from an existing secret to a new one

        :param src_secret: name of the existing secret
        :param dst_secret: name of the new secret
        :param cluster_uuid: K8s cluster uuid used to retrieve kubeconfig
        :param data_key: key of the existing secret to be copied
        :param src_namespace: Namespace of the existing secret
        :param dst_namespace: Namespace of the new secret
        :returns: None
        """
        paths, env = self._init_paths_env(
            cluster_name=cluster_uuid, create_if_not_exist=True
        )
        kubectl = Kubectl(config_file=paths["kube_config"])
        secret_data = await kubectl.get_secret_content(
            name=src_secret,
            namespace=src_namespace,
        )
        # Only the corresponding data_key value needs to be copied
        data = {data_key: secret_data.get(data_key)}
        await kubectl.create_secret(
            name=dst_secret,
            data=data,
            namespace=dst_namespace,
            secret_type="Opaque",
        )

    async def setup_default_rbac(
        self,
        name,
        namespace,
        cluster_uuid,
        api_groups,
        resources,
        verbs,
        service_account,
    ):
        """
        Create a basic RBAC for a new namespace.

        :param name: name of both Role and Role Binding
        :param namespace: K8s namespace
        :param cluster_uuid: K8s cluster uuid used to retrieve kubeconfig
        :param api_groups: Api groups to be allowed in Policy Rule
        :param resources: Resources to be allowed in Policy Rule
        :param verbs: Verbs to be allowed in Policy Rule
        :param service_account: Service Account name used to bind the Role
        :returns: None
        """
        paths, env = self._init_paths_env(
            cluster_name=cluster_uuid, create_if_not_exist=True
        )
        kubectl = Kubectl(config_file=paths["kube_config"])
        await kubectl.create_role(
            name=name,
            labels={},
            namespace=namespace,
            api_groups=api_groups,
            resources=resources,
            verbs=verbs,
        )
        await kubectl.create_role_binding(
            name=name,
            labels={},
            namespace=namespace,
            role_name=name,
            sa_name=service_account,
        )