1 |
|
## |
2 |
|
# Copyright 2019 Telefonica Investigacion y Desarrollo, S.A.U. |
3 |
|
# This file is part of OSM |
4 |
|
# All Rights Reserved. |
5 |
|
# |
6 |
|
# Licensed under the Apache License, Version 2.0 (the "License"); |
7 |
|
# you may not use this file except in compliance with the License. |
8 |
|
# You may obtain a copy of the License at |
9 |
|
# |
10 |
|
# http://www.apache.org/licenses/LICENSE-2.0 |
11 |
|
# |
12 |
|
# Unless required by applicable law or agreed to in writing, software |
13 |
|
# distributed under the License is distributed on an "AS IS" BASIS, |
14 |
|
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or |
15 |
|
# implied. |
16 |
|
# See the License for the specific language governing permissions and |
17 |
|
# limitations under the License. |
18 |
|
# |
19 |
|
# For those usages not covered by the Apache License, Version 2.0 please |
20 |
|
# contact with: nfvlabs@tid.es |
21 |
|
## |
22 |
|
|
23 |
0 |
import asyncio |
24 |
0 |
import os |
25 |
0 |
import random |
26 |
0 |
import shutil |
27 |
0 |
import subprocess |
28 |
0 |
import time |
29 |
0 |
from uuid import uuid4 |
30 |
|
|
31 |
0 |
from n2vc.exceptions import K8sException |
32 |
0 |
from n2vc.k8s_conn import K8sConnector |
33 |
0 |
import yaml |
34 |
|
|
35 |
|
|
36 |
0 |
class K8sHelmConnector(K8sConnector): |
37 |
|
|
38 |
|
""" |
39 |
|
#################################################################################### |
40 |
|
################################### P U B L I C #################################### |
41 |
|
#################################################################################### |
42 |
|
""" |
43 |
|
|
44 |
0 |
def __init__( |
45 |
|
self, |
46 |
|
fs: object, |
47 |
|
db: object, |
48 |
|
kubectl_command: str = "/usr/bin/kubectl", |
49 |
|
helm_command: str = "/usr/bin/helm", |
50 |
|
log: object = None, |
51 |
|
on_update_db=None, |
52 |
|
): |
53 |
|
""" |
54 |
|
|
55 |
|
:param fs: file system for kubernetes and helm configuration |
56 |
|
:param db: database object to write current operation status |
57 |
|
:param kubectl_command: path to kubectl executable |
58 |
|
:param helm_command: path to helm executable |
59 |
|
:param log: logger |
60 |
|
:param on_update_db: callback called when k8s connector updates database |
61 |
|
""" |
62 |
|
|
63 |
|
# parent class |
64 |
0 |
K8sConnector.__init__(self, db=db, log=log, on_update_db=on_update_db) |
65 |
|
|
66 |
0 |
self.log.info("Initializing K8S Helm connector") |
67 |
|
|
68 |
|
# random numbers for release name generation |
69 |
0 |
random.seed(time.time()) |
70 |
|
|
71 |
|
# the file system |
72 |
0 |
self.fs = fs |
73 |
|
|
74 |
|
# exception if kubectl is not installed |
75 |
0 |
self.kubectl_command = kubectl_command |
76 |
0 |
self._check_file_exists(filename=kubectl_command, exception_if_not_exists=True) |
77 |
|
|
78 |
|
# exception if helm is not installed |
79 |
0 |
self._helm_command = helm_command |
80 |
0 |
self._check_file_exists(filename=helm_command, exception_if_not_exists=True) |
81 |
|
|
82 |
|
# initialize helm client-only |
83 |
0 |
self.log.debug("Initializing helm client-only...") |
84 |
0 |
command = "{} init --client-only".format(self._helm_command) |
85 |
0 |
try: |
86 |
0 |
asyncio.ensure_future( |
87 |
|
self._local_async_exec(command=command, raise_exception_on_error=False) |
88 |
|
) |
89 |
|
# loop = asyncio.get_event_loop() |
90 |
|
# loop.run_until_complete(self._local_async_exec(command=command, |
91 |
|
# raise_exception_on_error=False)) |
92 |
0 |
except Exception as e: |
93 |
0 |
self.warning( |
94 |
|
msg="helm init failed (it was already initialized): {}".format(e) |
95 |
|
) |
96 |
|
|
97 |
0 |
self.log.info("K8S Helm connector initialized") |
98 |
|
|
99 |
0 |
async def init_env( |
100 |
|
self, k8s_creds: str, namespace: str = "kube-system", reuse_cluster_uuid=None |
101 |
|
) -> (str, bool): |
102 |
|
""" |
103 |
|
It prepares a given K8s cluster environment to run Charts on both sides: |
104 |
|
client (OSM) |
105 |
|
server (Tiller) |
106 |
|
|
107 |
|
:param k8s_creds: credentials to access a given K8s cluster, i.e. a valid |
108 |
|
'.kube/config' |
109 |
|
:param namespace: optional namespace to be used for helm. By default, |
110 |
|
'kube-system' will be used |
111 |
|
:param reuse_cluster_uuid: existing cluster uuid for reuse |
112 |
|
:return: uuid of the K8s cluster and True if connector has installed some |
113 |
|
software in the cluster |
114 |
|
(on error, an exception will be raised) |
115 |
|
""" |
116 |
|
|
117 |
0 |
cluster_uuid = reuse_cluster_uuid |
118 |
0 |
if not cluster_uuid: |
119 |
0 |
cluster_uuid = str(uuid4()) |
120 |
|
|
121 |
0 |
self.log.debug("Initializing K8S environment. namespace: {}".format(namespace)) |
122 |
|
|
123 |
|
# create config filename |
124 |
0 |
_kube_dir, helm_dir, config_filename, _cluster_dir = self._get_paths( |
125 |
|
cluster_name=cluster_uuid, create_if_not_exist=True |
126 |
|
) |
127 |
0 |
f = open(config_filename, "w") |
128 |
0 |
f.write(k8s_creds) |
129 |
0 |
f.close() |
130 |
|
|
131 |
|
# check if tiller pod is up in cluster |
132 |
0 |
command = "{} --kubeconfig={} --namespace={} get deployments".format( |
133 |
|
self.kubectl_command, config_filename, namespace |
134 |
|
) |
135 |
0 |
output, _rc = await self._local_async_exec( |
136 |
|
command=command, raise_exception_on_error=True |
137 |
|
) |
138 |
|
|
139 |
0 |
output_table = K8sHelmConnector._output_to_table(output=output) |
140 |
|
|
141 |
|
# find 'tiller' pod in all pods |
142 |
0 |
already_initialized = False |
143 |
0 |
try: |
144 |
0 |
for row in output_table: |
145 |
0 |
if row[0].startswith("tiller-deploy"): |
146 |
0 |
already_initialized = True |
147 |
0 |
break |
148 |
0 |
except Exception: |
149 |
0 |
pass |
150 |
|
|
151 |
|
# helm init |
152 |
0 |
n2vc_installed_sw = False |
153 |
0 |
if not already_initialized: |
154 |
0 |
self.log.info( |
155 |
|
"Initializing helm in client and server: {}".format(cluster_uuid) |
156 |
|
) |
157 |
0 |
command = "{} --kubeconfig={} --tiller-namespace={} --home={} init".format( |
158 |
|
self._helm_command, config_filename, namespace, helm_dir |
159 |
|
) |
160 |
0 |
output, _rc = await self._local_async_exec( |
161 |
|
command=command, raise_exception_on_error=True |
162 |
|
) |
163 |
0 |
n2vc_installed_sw = True |
164 |
|
else: |
165 |
|
# check client helm installation |
166 |
0 |
check_file = helm_dir + "/repository/repositories.yaml" |
167 |
0 |
if not self._check_file_exists( |
168 |
|
filename=check_file, exception_if_not_exists=False |
169 |
|
): |
170 |
0 |
self.log.info("Initializing helm in client: {}".format(cluster_uuid)) |
171 |
0 |
command = ( |
172 |
|
"{} --kubeconfig={} --tiller-namespace={} " |
173 |
|
"--home={} init --client-only" |
174 |
|
).format(self._helm_command, config_filename, namespace, helm_dir) |
175 |
0 |
output, _rc = await self._local_async_exec( |
176 |
|
command=command, raise_exception_on_error=True |
177 |
|
) |
178 |
|
else: |
179 |
0 |
self.log.info("Helm client already initialized") |
180 |
|
|
181 |
0 |
self.log.info("Cluster initialized {}".format(cluster_uuid)) |
182 |
|
|
183 |
0 |
return cluster_uuid, n2vc_installed_sw |
184 |
|
|
185 |
0 |
async def repo_add( |
186 |
|
self, cluster_uuid: str, name: str, url: str, repo_type: str = "chart" |
187 |
|
): |
188 |
|
|
189 |
0 |
self.log.debug("adding {} repository {}. URL: {}".format(repo_type, name, url)) |
190 |
|
|
191 |
|
# config filename |
192 |
0 |
_kube_dir, helm_dir, config_filename, _cluster_dir = self._get_paths( |
193 |
|
cluster_name=cluster_uuid, create_if_not_exist=True |
194 |
|
) |
195 |
|
|
196 |
|
# helm repo update |
197 |
0 |
command = "{} --kubeconfig={} --home={} repo update".format( |
198 |
|
self._helm_command, config_filename, helm_dir |
199 |
|
) |
200 |
0 |
self.log.debug("updating repo: {}".format(command)) |
201 |
0 |
await self._local_async_exec(command=command, raise_exception_on_error=False) |
202 |
|
|
203 |
|
# helm repo add name url |
204 |
0 |
command = "{} --kubeconfig={} --home={} repo add {} {}".format( |
205 |
|
self._helm_command, config_filename, helm_dir, name, url |
206 |
|
) |
207 |
0 |
self.log.debug("adding repo: {}".format(command)) |
208 |
0 |
await self._local_async_exec(command=command, raise_exception_on_error=True) |
209 |
|
|
210 |
0 |
async def repo_list(self, cluster_uuid: str) -> list: |
211 |
|
""" |
212 |
|
Get the list of registered repositories |
213 |
|
|
214 |
|
:return: list of registered repositories: [ (name, url) .... ] |
215 |
|
""" |
216 |
|
|
217 |
0 |
self.log.debug("list repositories for cluster {}".format(cluster_uuid)) |
218 |
|
|
219 |
|
# config filename |
220 |
0 |
_kube_dir, helm_dir, config_filename, _cluster_dir = self._get_paths( |
221 |
|
cluster_name=cluster_uuid, create_if_not_exist=True |
222 |
|
) |
223 |
|
|
224 |
0 |
command = "{} --kubeconfig={} --home={} repo list --output yaml".format( |
225 |
|
self._helm_command, config_filename, helm_dir |
226 |
|
) |
227 |
|
|
228 |
0 |
output, _rc = await self._local_async_exec( |
229 |
|
command=command, raise_exception_on_error=True |
230 |
|
) |
231 |
0 |
if output and len(output) > 0: |
232 |
0 |
return yaml.load(output, Loader=yaml.SafeLoader) |
233 |
|
else: |
234 |
0 |
return [] |
235 |
|
|
236 |
0 |
async def repo_remove(self, cluster_uuid: str, name: str): |
237 |
|
""" |
238 |
|
Remove a repository from OSM |
239 |
|
|
240 |
|
:param cluster_uuid: the cluster |
241 |
|
:param name: repo name in OSM |
242 |
|
:return: True if successful |
243 |
|
""" |
244 |
|
|
245 |
0 |
self.log.debug("list repositories for cluster {}".format(cluster_uuid)) |
246 |
|
|
247 |
|
# config filename |
248 |
0 |
_kube_dir, helm_dir, config_filename, _cluster_dir = self._get_paths( |
249 |
|
cluster_name=cluster_uuid, create_if_not_exist=True |
250 |
|
) |
251 |
|
|
252 |
0 |
command = "{} --kubeconfig={} --home={} repo remove {}".format( |
253 |
|
self._helm_command, config_filename, helm_dir, name |
254 |
|
) |
255 |
|
|
256 |
0 |
await self._local_async_exec(command=command, raise_exception_on_error=True) |
257 |
|
|
258 |
0 |
async def reset( |
259 |
|
self, cluster_uuid: str, force: bool = False, uninstall_sw: bool = False |
260 |
|
) -> bool: |
261 |
|
|
262 |
0 |
self.log.debug( |
263 |
|
"Resetting K8s environment. cluster uuid: {}".format(cluster_uuid) |
264 |
|
) |
265 |
|
|
266 |
|
# get kube and helm directories |
267 |
0 |
_kube_dir, helm_dir, config_filename, _cluster_dir = self._get_paths( |
268 |
|
cluster_name=cluster_uuid, create_if_not_exist=False |
269 |
|
) |
270 |
|
|
271 |
|
# uninstall releases if needed |
272 |
0 |
releases = await self.instances_list(cluster_uuid=cluster_uuid) |
273 |
0 |
if len(releases) > 0: |
274 |
0 |
if force: |
275 |
0 |
for r in releases: |
276 |
0 |
try: |
277 |
0 |
kdu_instance = r.get("Name") |
278 |
0 |
chart = r.get("Chart") |
279 |
0 |
self.log.debug( |
280 |
|
"Uninstalling {} -> {}".format(chart, kdu_instance) |
281 |
|
) |
282 |
0 |
await self.uninstall( |
283 |
|
cluster_uuid=cluster_uuid, kdu_instance=kdu_instance |
284 |
|
) |
285 |
0 |
except Exception as e: |
286 |
0 |
self.log.error( |
287 |
|
"Error uninstalling release {}: {}".format(kdu_instance, e) |
288 |
|
) |
289 |
|
else: |
290 |
0 |
msg = ( |
291 |
|
"Cluster has releases and not force. Cannot reset K8s " |
292 |
|
"environment. Cluster uuid: {}" |
293 |
|
).format(cluster_uuid) |
294 |
0 |
self.log.error(msg) |
295 |
0 |
raise K8sException(msg) |
296 |
|
|
297 |
0 |
if uninstall_sw: |
298 |
|
|
299 |
0 |
self.log.debug("Uninstalling tiller from cluster {}".format(cluster_uuid)) |
300 |
|
|
301 |
|
# find namespace for tiller pod |
302 |
0 |
command = "{} --kubeconfig={} get deployments --all-namespaces".format( |
303 |
|
self.kubectl_command, config_filename |
304 |
|
) |
305 |
0 |
output, _rc = await self._local_async_exec( |
306 |
|
command=command, raise_exception_on_error=False |
307 |
|
) |
308 |
0 |
output_table = K8sHelmConnector._output_to_table(output=output) |
309 |
0 |
namespace = None |
310 |
0 |
for r in output_table: |
311 |
0 |
try: |
312 |
0 |
if "tiller-deploy" in r[1]: |
313 |
0 |
namespace = r[0] |
314 |
0 |
break |
315 |
0 |
except Exception: |
316 |
0 |
pass |
317 |
|
else: |
318 |
0 |
msg = "Tiller deployment not found in cluster {}".format(cluster_uuid) |
319 |
0 |
self.log.error(msg) |
320 |
|
|
321 |
0 |
self.log.debug("namespace for tiller: {}".format(namespace)) |
322 |
|
|
323 |
0 |
force_str = "--force" |
324 |
|
|
325 |
0 |
if namespace: |
326 |
|
# delete tiller deployment |
327 |
0 |
self.log.debug( |
328 |
|
"Deleting tiller deployment for cluster {}, namespace {}".format( |
329 |
|
cluster_uuid, namespace |
330 |
|
) |
331 |
|
) |
332 |
0 |
command = ( |
333 |
|
"{} --namespace {} --kubeconfig={} {} delete deployment " |
334 |
|
"tiller-deploy" |
335 |
|
).format(self.kubectl_command, namespace, config_filename, force_str) |
336 |
0 |
await self._local_async_exec( |
337 |
|
command=command, raise_exception_on_error=False |
338 |
|
) |
339 |
|
|
340 |
|
# uninstall tiller from cluster |
341 |
0 |
self.log.debug( |
342 |
|
"Uninstalling tiller from cluster {}".format(cluster_uuid) |
343 |
|
) |
344 |
0 |
command = "{} --kubeconfig={} --home={} reset".format( |
345 |
|
self._helm_command, config_filename, helm_dir |
346 |
|
) |
347 |
0 |
self.log.debug("resetting: {}".format(command)) |
348 |
0 |
output, _rc = await self._local_async_exec( |
349 |
|
command=command, raise_exception_on_error=True |
350 |
|
) |
351 |
|
else: |
352 |
0 |
self.log.debug("namespace not found") |
353 |
|
|
354 |
|
# delete cluster directory |
355 |
0 |
direct = self.fs.path + "/" + cluster_uuid |
356 |
0 |
self.log.debug("Removing directory {}".format(direct)) |
357 |
0 |
shutil.rmtree(direct, ignore_errors=True) |
358 |
|
|
359 |
0 |
return True |
360 |
|
|
361 |
0 |
async def install( |
362 |
|
self, |
363 |
|
cluster_uuid: str, |
364 |
|
kdu_model: str, |
365 |
|
atomic: bool = True, |
366 |
|
timeout: float = 300, |
367 |
|
params: dict = None, |
368 |
|
db_dict: dict = None, |
369 |
|
kdu_name: str = None, |
370 |
|
namespace: str = None, |
371 |
|
): |
372 |
|
|
373 |
0 |
self.log.debug("installing {} in cluster {}".format(kdu_model, cluster_uuid)) |
374 |
|
|
375 |
|
# config filename |
376 |
0 |
_kube_dir, helm_dir, config_filename, _cluster_dir = self._get_paths( |
377 |
|
cluster_name=cluster_uuid, create_if_not_exist=True |
378 |
|
) |
379 |
|
|
380 |
|
# params to str |
381 |
|
# params_str = K8sHelmConnector._params_to_set_option(params) |
382 |
0 |
params_str, file_to_delete = self._params_to_file_option( |
383 |
|
cluster_uuid=cluster_uuid, params=params |
384 |
|
) |
385 |
|
|
386 |
0 |
timeout_str = "" |
387 |
0 |
if timeout: |
388 |
0 |
timeout_str = "--timeout {}".format(timeout) |
389 |
|
|
390 |
|
# atomic |
391 |
0 |
atomic_str = "" |
392 |
0 |
if atomic: |
393 |
0 |
atomic_str = "--atomic" |
394 |
|
# namespace |
395 |
0 |
namespace_str = "" |
396 |
0 |
if namespace: |
397 |
0 |
namespace_str = "--namespace {}".format(namespace) |
398 |
|
|
399 |
|
# version |
400 |
0 |
version_str = "" |
401 |
0 |
if ":" in kdu_model: |
402 |
0 |
parts = kdu_model.split(sep=":") |
403 |
0 |
if len(parts) == 2: |
404 |
0 |
version_str = "--version {}".format(parts[1]) |
405 |
0 |
kdu_model = parts[0] |
406 |
|
|
407 |
|
# generate a name for the release. Then, check if already exists |
408 |
0 |
kdu_instance = None |
409 |
0 |
while kdu_instance is None: |
410 |
0 |
kdu_instance = K8sHelmConnector._generate_release_name(kdu_model) |
411 |
0 |
try: |
412 |
0 |
result = await self._status_kdu( |
413 |
|
cluster_uuid=cluster_uuid, |
414 |
|
kdu_instance=kdu_instance, |
415 |
|
show_error_log=False, |
416 |
|
) |
417 |
0 |
if result is not None: |
418 |
|
# instance already exists: generate a new one |
419 |
0 |
kdu_instance = None |
420 |
0 |
except K8sException: |
421 |
0 |
pass |
422 |
|
|
423 |
|
# helm repo install |
424 |
0 |
command = ( |
425 |
|
"{helm} install {atomic} --output yaml --kubeconfig={config} --home={dir} " |
426 |
|
"{params} {timeout} --name={name} {ns} {model} {ver}".format( |
427 |
|
helm=self._helm_command, |
428 |
|
atomic=atomic_str, |
429 |
|
config=config_filename, |
430 |
|
dir=helm_dir, |
431 |
|
params=params_str, |
432 |
|
timeout=timeout_str, |
433 |
|
name=kdu_instance, |
434 |
|
ns=namespace_str, |
435 |
|
model=kdu_model, |
436 |
|
ver=version_str, |
437 |
|
) |
438 |
|
) |
439 |
0 |
self.log.debug("installing: {}".format(command)) |
440 |
|
|
441 |
0 |
if atomic: |
442 |
|
# exec helm in a task |
443 |
0 |
exec_task = asyncio.ensure_future( |
444 |
|
coro_or_future=self._local_async_exec( |
445 |
|
command=command, raise_exception_on_error=False |
446 |
|
) |
447 |
|
) |
448 |
|
|
449 |
|
# write status in another task |
450 |
0 |
status_task = asyncio.ensure_future( |
451 |
|
coro_or_future=self._store_status( |
452 |
|
cluster_uuid=cluster_uuid, |
453 |
|
kdu_instance=kdu_instance, |
454 |
|
db_dict=db_dict, |
455 |
|
operation="install", |
456 |
|
run_once=False, |
457 |
|
) |
458 |
|
) |
459 |
|
|
460 |
|
# wait for execution task |
461 |
0 |
await asyncio.wait([exec_task]) |
462 |
|
|
463 |
|
# cancel status task |
464 |
0 |
status_task.cancel() |
465 |
|
|
466 |
0 |
output, rc = exec_task.result() |
467 |
|
|
468 |
|
else: |
469 |
|
|
470 |
0 |
output, rc = await self._local_async_exec( |
471 |
|
command=command, raise_exception_on_error=False |
472 |
|
) |
473 |
|
|
474 |
|
# remove temporal values yaml file |
475 |
0 |
if file_to_delete: |
476 |
0 |
os.remove(file_to_delete) |
477 |
|
|
478 |
|
# write final status |
479 |
0 |
await self._store_status( |
480 |
|
cluster_uuid=cluster_uuid, |
481 |
|
kdu_instance=kdu_instance, |
482 |
|
db_dict=db_dict, |
483 |
|
operation="install", |
484 |
|
run_once=True, |
485 |
|
check_every=0, |
486 |
|
) |
487 |
|
|
488 |
0 |
if rc != 0: |
489 |
0 |
msg = "Error executing command: {}\nOutput: {}".format(command, output) |
490 |
0 |
self.log.error(msg) |
491 |
0 |
raise K8sException(msg) |
492 |
|
|
493 |
0 |
self.log.debug("Returning kdu_instance {}".format(kdu_instance)) |
494 |
0 |
return kdu_instance |
495 |
|
|
496 |
0 |
async def instances_list(self, cluster_uuid: str) -> list: |
497 |
|
""" |
498 |
|
returns a list of deployed releases in a cluster |
499 |
|
|
500 |
|
:param cluster_uuid: the cluster |
501 |
|
:return: |
502 |
|
""" |
503 |
|
|
504 |
0 |
self.log.debug("list releases for cluster {}".format(cluster_uuid)) |
505 |
|
|
506 |
|
# config filename |
507 |
0 |
_kube_dir, helm_dir, config_filename, _cluster_dir = self._get_paths( |
508 |
|
cluster_name=cluster_uuid, create_if_not_exist=True |
509 |
|
) |
510 |
|
|
511 |
0 |
command = "{} --kubeconfig={} --home={} list --output yaml".format( |
512 |
|
self._helm_command, config_filename, helm_dir |
513 |
|
) |
514 |
|
|
515 |
0 |
output, _rc = await self._local_async_exec( |
516 |
|
command=command, raise_exception_on_error=True |
517 |
|
) |
518 |
|
|
519 |
0 |
if output and len(output) > 0: |
520 |
0 |
return yaml.load(output, Loader=yaml.SafeLoader).get("Releases") |
521 |
|
else: |
522 |
0 |
return [] |
523 |
|
|
524 |
0 |
async def upgrade( |
525 |
|
self, |
526 |
|
cluster_uuid: str, |
527 |
|
kdu_instance: str, |
528 |
|
kdu_model: str = None, |
529 |
|
atomic: bool = True, |
530 |
|
timeout: float = 300, |
531 |
|
params: dict = None, |
532 |
|
db_dict: dict = None, |
533 |
|
): |
534 |
|
|
535 |
0 |
self.log.debug("upgrading {} in cluster {}".format(kdu_model, cluster_uuid)) |
536 |
|
|
537 |
|
# config filename |
538 |
0 |
_kube_dir, helm_dir, config_filename, _cluster_dir = self._get_paths( |
539 |
|
cluster_name=cluster_uuid, create_if_not_exist=True |
540 |
|
) |
541 |
|
|
542 |
|
# params to str |
543 |
|
# params_str = K8sHelmConnector._params_to_set_option(params) |
544 |
0 |
params_str, file_to_delete = self._params_to_file_option( |
545 |
|
cluster_uuid=cluster_uuid, params=params |
546 |
|
) |
547 |
|
|
548 |
0 |
timeout_str = "" |
549 |
0 |
if timeout: |
550 |
0 |
timeout_str = "--timeout {}".format(timeout) |
551 |
|
|
552 |
|
# atomic |
553 |
0 |
atomic_str = "" |
554 |
0 |
if atomic: |
555 |
0 |
atomic_str = "--atomic" |
556 |
|
|
557 |
|
# version |
558 |
0 |
version_str = "" |
559 |
0 |
if kdu_model and ":" in kdu_model: |
560 |
0 |
parts = kdu_model.split(sep=":") |
561 |
0 |
if len(parts) == 2: |
562 |
0 |
version_str = "--version {}".format(parts[1]) |
563 |
0 |
kdu_model = parts[0] |
564 |
|
|
565 |
|
# helm repo upgrade |
566 |
0 |
command = ( |
567 |
|
"{} upgrade {} --output yaml --kubeconfig={} " "--home={} {} {} {} {} {}" |
568 |
|
).format( |
569 |
|
self._helm_command, |
570 |
|
atomic_str, |
571 |
|
config_filename, |
572 |
|
helm_dir, |
573 |
|
params_str, |
574 |
|
timeout_str, |
575 |
|
kdu_instance, |
576 |
|
kdu_model, |
577 |
|
version_str, |
578 |
|
) |
579 |
0 |
self.log.debug("upgrading: {}".format(command)) |
580 |
|
|
581 |
0 |
if atomic: |
582 |
|
|
583 |
|
# exec helm in a task |
584 |
0 |
exec_task = asyncio.ensure_future( |
585 |
|
coro_or_future=self._local_async_exec( |
586 |
|
command=command, raise_exception_on_error=False |
587 |
|
) |
588 |
|
) |
589 |
|
# write status in another task |
590 |
0 |
status_task = asyncio.ensure_future( |
591 |
|
coro_or_future=self._store_status( |
592 |
|
cluster_uuid=cluster_uuid, |
593 |
|
kdu_instance=kdu_instance, |
594 |
|
db_dict=db_dict, |
595 |
|
operation="upgrade", |
596 |
|
run_once=False, |
597 |
|
) |
598 |
|
) |
599 |
|
|
600 |
|
# wait for execution task |
601 |
0 |
await asyncio.wait([exec_task]) |
602 |
|
|
603 |
|
# cancel status task |
604 |
0 |
status_task.cancel() |
605 |
0 |
output, rc = exec_task.result() |
606 |
|
|
607 |
|
else: |
608 |
|
|
609 |
0 |
output, rc = await self._local_async_exec( |
610 |
|
command=command, raise_exception_on_error=False |
611 |
|
) |
612 |
|
|
613 |
|
# remove temporal values yaml file |
614 |
0 |
if file_to_delete: |
615 |
0 |
os.remove(file_to_delete) |
616 |
|
|
617 |
|
# write final status |
618 |
0 |
await self._store_status( |
619 |
|
cluster_uuid=cluster_uuid, |
620 |
|
kdu_instance=kdu_instance, |
621 |
|
db_dict=db_dict, |
622 |
|
operation="upgrade", |
623 |
|
run_once=True, |
624 |
|
check_every=0, |
625 |
|
) |
626 |
|
|
627 |
0 |
if rc != 0: |
628 |
0 |
msg = "Error executing command: {}\nOutput: {}".format(command, output) |
629 |
0 |
self.log.error(msg) |
630 |
0 |
raise K8sException(msg) |
631 |
|
|
632 |
|
# return new revision number |
633 |
0 |
instance = await self.get_instance_info( |
634 |
|
cluster_uuid=cluster_uuid, kdu_instance=kdu_instance |
635 |
|
) |
636 |
0 |
if instance: |
637 |
0 |
revision = int(instance.get("Revision")) |
638 |
0 |
self.log.debug("New revision: {}".format(revision)) |
639 |
0 |
return revision |
640 |
|
else: |
641 |
0 |
return 0 |
642 |
|
|
643 |
0 |
async def rollback( |
644 |
|
self, cluster_uuid: str, kdu_instance: str, revision=0, db_dict: dict = None |
645 |
|
): |
646 |
|
|
647 |
0 |
self.log.debug( |
648 |
|
"rollback kdu_instance {} to revision {} from cluster {}".format( |
649 |
|
kdu_instance, revision, cluster_uuid |
650 |
|
) |
651 |
|
) |
652 |
|
|
653 |
|
# config filename |
654 |
0 |
_kube_dir, helm_dir, config_filename, _cluster_dir = self._get_paths( |
655 |
|
cluster_name=cluster_uuid, create_if_not_exist=True |
656 |
|
) |
657 |
|
|
658 |
0 |
command = "{} rollback --kubeconfig={} --home={} {} {} --wait".format( |
659 |
|
self._helm_command, config_filename, helm_dir, kdu_instance, revision |
660 |
|
) |
661 |
|
|
662 |
|
# exec helm in a task |
663 |
0 |
exec_task = asyncio.ensure_future( |
664 |
|
coro_or_future=self._local_async_exec( |
665 |
|
command=command, raise_exception_on_error=False |
666 |
|
) |
667 |
|
) |
668 |
|
# write status in another task |
669 |
0 |
status_task = asyncio.ensure_future( |
670 |
|
coro_or_future=self._store_status( |
671 |
|
cluster_uuid=cluster_uuid, |
672 |
|
kdu_instance=kdu_instance, |
673 |
|
db_dict=db_dict, |
674 |
|
operation="rollback", |
675 |
|
run_once=False, |
676 |
|
) |
677 |
|
) |
678 |
|
|
679 |
|
# wait for execution task |
680 |
0 |
await asyncio.wait([exec_task]) |
681 |
|
|
682 |
|
# cancel status task |
683 |
0 |
status_task.cancel() |
684 |
|
|
685 |
0 |
output, rc = exec_task.result() |
686 |
|
|
687 |
|
# write final status |
688 |
0 |
await self._store_status( |
689 |
|
cluster_uuid=cluster_uuid, |
690 |
|
kdu_instance=kdu_instance, |
691 |
|
db_dict=db_dict, |
692 |
|
operation="rollback", |
693 |
|
run_once=True, |
694 |
|
check_every=0, |
695 |
|
) |
696 |
|
|
697 |
0 |
if rc != 0: |
698 |
0 |
msg = "Error executing command: {}\nOutput: {}".format(command, output) |
699 |
0 |
self.log.error(msg) |
700 |
0 |
raise K8sException(msg) |
701 |
|
|
702 |
|
# return new revision number |
703 |
0 |
instance = await self.get_instance_info( |
704 |
|
cluster_uuid=cluster_uuid, kdu_instance=kdu_instance |
705 |
|
) |
706 |
0 |
if instance: |
707 |
0 |
revision = int(instance.get("Revision")) |
708 |
0 |
self.log.debug("New revision: {}".format(revision)) |
709 |
0 |
return revision |
710 |
|
else: |
711 |
0 |
return 0 |
712 |
|
|
713 |
0 |
async def uninstall(self, cluster_uuid: str, kdu_instance: str): |
714 |
|
""" |
715 |
|
Removes an existing KDU instance. It would implicitly use the `delete` call |
716 |
|
(this call would happen after all _terminate-config-primitive_ of the VNF |
717 |
|
are invoked). |
718 |
|
|
719 |
|
:param cluster_uuid: UUID of a K8s cluster known by OSM |
720 |
|
:param kdu_instance: unique name for the KDU instance to be deleted |
721 |
|
:return: True if successful |
722 |
|
""" |
723 |
|
|
724 |
0 |
self.log.debug( |
725 |
|
"uninstall kdu_instance {} from cluster {}".format( |
726 |
|
kdu_instance, cluster_uuid |
727 |
|
) |
728 |
|
) |
729 |
|
|
730 |
|
# config filename |
731 |
0 |
_kube_dir, helm_dir, config_filename, _cluster_dir = self._get_paths( |
732 |
|
cluster_name=cluster_uuid, create_if_not_exist=True |
733 |
|
) |
734 |
|
|
735 |
0 |
command = "{} --kubeconfig={} --home={} delete --purge {}".format( |
736 |
|
self._helm_command, config_filename, helm_dir, kdu_instance |
737 |
|
) |
738 |
|
|
739 |
0 |
output, _rc = await self._local_async_exec( |
740 |
|
command=command, raise_exception_on_error=True |
741 |
|
) |
742 |
|
|
743 |
0 |
return self._output_to_table(output) |
744 |
|
|
745 |
0 |
async def exec_primitive( |
746 |
|
self, |
747 |
|
cluster_uuid: str = None, |
748 |
|
kdu_instance: str = None, |
749 |
|
primitive_name: str = None, |
750 |
|
timeout: float = 300, |
751 |
|
params: dict = None, |
752 |
|
db_dict: dict = None, |
753 |
|
) -> str: |
754 |
|
"""Exec primitive (Juju action) |
755 |
|
|
756 |
|
:param cluster_uuid str: The UUID of the cluster |
757 |
|
:param kdu_instance str: The unique name of the KDU instance |
758 |
|
:param primitive_name: Name of action that will be executed |
759 |
|
:param timeout: Timeout for action execution |
760 |
|
:param params: Dictionary of all the parameters needed for the action |
761 |
|
:db_dict: Dictionary for any additional data |
762 |
|
|
763 |
|
:return: Returns the output of the action |
764 |
|
""" |
765 |
0 |
raise K8sException( |
766 |
|
"KDUs deployed with Helm don't support actions " |
767 |
|
"different from rollback, upgrade and status" |
768 |
|
) |
769 |
|
|
770 |
0 |
async def inspect_kdu(self, kdu_model: str, repo_url: str = None) -> str: |
771 |
|
|
772 |
0 |
self.log.debug( |
773 |
|
"inspect kdu_model {} from (optional) repo: {}".format(kdu_model, repo_url) |
774 |
|
) |
775 |
|
|
776 |
0 |
return await self._exec_inspect_comand( |
777 |
|
inspect_command="", kdu_model=kdu_model, repo_url=repo_url |
778 |
|
) |
779 |
|
|
780 |
0 |
async def values_kdu(self, kdu_model: str, repo_url: str = None) -> str: |
781 |
|
|
782 |
0 |
self.log.debug( |
783 |
|
"inspect kdu_model values {} from (optional) repo: {}".format( |
784 |
|
kdu_model, repo_url |
785 |
|
) |
786 |
|
) |
787 |
|
|
788 |
0 |
return await self._exec_inspect_comand( |
789 |
|
inspect_command="values", kdu_model=kdu_model, repo_url=repo_url |
790 |
|
) |
791 |
|
|
792 |
0 |
async def help_kdu(self, kdu_model: str, repo_url: str = None) -> str: |
793 |
|
|
794 |
0 |
self.log.debug( |
795 |
|
"inspect kdu_model {} readme.md from repo: {}".format(kdu_model, repo_url) |
796 |
|
) |
797 |
|
|
798 |
0 |
return await self._exec_inspect_comand( |
799 |
|
inspect_command="readme", kdu_model=kdu_model, repo_url=repo_url |
800 |
|
) |
801 |
|
|
802 |
0 |
async def status_kdu(self, cluster_uuid: str, kdu_instance: str) -> str: |
803 |
|
|
804 |
|
# call internal function |
805 |
0 |
return await self._status_kdu( |
806 |
|
cluster_uuid=cluster_uuid, |
807 |
|
kdu_instance=kdu_instance, |
808 |
|
show_error_log=True, |
809 |
|
return_text=True, |
810 |
|
) |
811 |
|
|
812 |
0 |
async def synchronize_repos(self, cluster_uuid: str): |
813 |
|
|
814 |
0 |
self.log.debug("syncronize repos for cluster helm-id: {}",) |
815 |
0 |
try: |
816 |
0 |
update_repos_timeout = ( |
817 |
|
300 # max timeout to sync a single repos, more than this is too much |
818 |
|
) |
819 |
0 |
db_k8scluster = self.db.get_one( |
820 |
|
"k8sclusters", {"_admin.helm-chart.id": cluster_uuid} |
821 |
|
) |
822 |
0 |
if db_k8scluster: |
823 |
0 |
nbi_repo_list = ( |
824 |
|
db_k8scluster.get("_admin").get("helm_chart_repos") or [] |
825 |
|
) |
826 |
0 |
cluster_repo_dict = ( |
827 |
|
db_k8scluster.get("_admin").get("helm_charts_added") or {} |
828 |
|
) |
829 |
|
# elements that must be deleted |
830 |
0 |
deleted_repo_list = [] |
831 |
0 |
added_repo_dict = {} |
832 |
0 |
self.log.debug("helm_chart_repos: {}".format(nbi_repo_list)) |
833 |
0 |
self.log.debug("helm_charts_added: {}".format(cluster_repo_dict)) |
834 |
|
|
835 |
|
# obtain repos to add: registered by nbi but not added |
836 |
0 |
repos_to_add = [ |
837 |
|
repo for repo in nbi_repo_list if not cluster_repo_dict.get(repo) |
838 |
|
] |
839 |
|
|
840 |
|
# obtain repos to delete: added by cluster but not in nbi list |
841 |
0 |
repos_to_delete = [ |
842 |
|
repo |
843 |
|
for repo in cluster_repo_dict.keys() |
844 |
|
if repo not in nbi_repo_list |
845 |
|
] |
846 |
|
|
847 |
|
# delete repos: must delete first then add because there may be |
848 |
|
# different repos with same name but |
849 |
|
# different id and url |
850 |
0 |
self.log.debug("repos to delete: {}".format(repos_to_delete)) |
851 |
0 |
for repo_id in repos_to_delete: |
852 |
|
# try to delete repos |
853 |
0 |
try: |
854 |
0 |
repo_delete_task = asyncio.ensure_future( |
855 |
|
self.repo_remove( |
856 |
|
cluster_uuid=cluster_uuid, |
857 |
|
name=cluster_repo_dict[repo_id], |
858 |
|
) |
859 |
|
) |
860 |
0 |
await asyncio.wait_for(repo_delete_task, update_repos_timeout) |
861 |
0 |
except Exception as e: |
862 |
0 |
self.warning( |
863 |
|
"Error deleting repo, id: {}, name: {}, err_msg: {}".format( |
864 |
|
repo_id, cluster_repo_dict[repo_id], str(e) |
865 |
|
) |
866 |
|
) |
867 |
|
# always add to the list of to_delete if there is an error |
868 |
|
# because if is not there |
869 |
|
# deleting raises error |
870 |
0 |
deleted_repo_list.append(repo_id) |
871 |
|
|
872 |
|
# add repos |
873 |
0 |
self.log.debug("repos to add: {}".format(repos_to_add)) |
874 |
0 |
for repo_id in repos_to_add: |
875 |
|
# obtain the repo data from the db |
876 |
|
# if there is an error getting the repo in the database we will |
877 |
|
# ignore this repo and continue |
878 |
|
# because there is a possible race condition where the repo has |
879 |
|
# been deleted while processing |
880 |
0 |
db_repo = self.db.get_one("k8srepos", {"_id": repo_id}) |
881 |
0 |
self.log.debug( |
882 |
|
"obtained repo: id, {}, name: {}, url: {}".format( |
883 |
|
repo_id, db_repo["name"], db_repo["url"] |
884 |
|
) |
885 |
|
) |
886 |
0 |
try: |
887 |
0 |
repo_add_task = asyncio.ensure_future( |
888 |
|
self.repo_add( |
889 |
|
cluster_uuid=cluster_uuid, |
890 |
|
name=db_repo["name"], |
891 |
|
url=db_repo["url"], |
892 |
|
repo_type="chart", |
893 |
|
) |
894 |
|
) |
895 |
0 |
await asyncio.wait_for(repo_add_task, update_repos_timeout) |
896 |
0 |
added_repo_dict[repo_id] = db_repo["name"] |
897 |
0 |
self.log.debug( |
898 |
|
"added repo: id, {}, name: {}".format( |
899 |
|
repo_id, db_repo["name"] |
900 |
|
) |
901 |
|
) |
902 |
0 |
except Exception as e: |
903 |
|
# deal with error adding repo, adding a repo that already |
904 |
|
# exists does not raise any error |
905 |
|
# will not raise error because a wrong repos added by |
906 |
|
# anyone could prevent instantiating any ns |
907 |
0 |
self.log.error( |
908 |
|
"Error adding repo id: {}, err_msg: {} ".format( |
909 |
|
repo_id, repr(e) |
910 |
|
) |
911 |
|
) |
912 |
|
|
913 |
0 |
return deleted_repo_list, added_repo_dict |
914 |
|
|
915 |
|
else: # else db_k8scluster does not exist |
916 |
0 |
raise K8sException( |
917 |
|
"k8cluster with helm-id : {} not found".format(cluster_uuid) |
918 |
|
) |
919 |
|
|
920 |
0 |
except Exception as e: |
921 |
0 |
self.log.error("Error synchronizing repos: {}".format(str(e))) |
922 |
0 |
raise K8sException("Error synchronizing repos") |
923 |
|
|
924 |
|
""" |
925 |
|
#################################################################################### |
926 |
|
################################### P R I V A T E ################################## |
927 |
|
#################################################################################### |
928 |
|
""" |
929 |
|
|
930 |
0 |
async def _exec_inspect_comand( |
931 |
|
self, inspect_command: str, kdu_model: str, repo_url: str = None |
932 |
|
): |
933 |
|
|
934 |
0 |
repo_str = "" |
935 |
0 |
if repo_url: |
936 |
0 |
repo_str = " --repo {}".format(repo_url) |
937 |
0 |
idx = kdu_model.find("/") |
938 |
0 |
if idx >= 0: |
939 |
0 |
idx += 1 |
940 |
0 |
kdu_model = kdu_model[idx:] |
941 |
|
|
942 |
0 |
inspect_command = "{} inspect {} {}{}".format( |
943 |
|
self._helm_command, inspect_command, kdu_model, repo_str |
944 |
|
) |
945 |
0 |
output, _rc = await self._local_async_exec( |
946 |
|
command=inspect_command, encode_utf8=True |
947 |
|
) |
948 |
|
|
949 |
0 |
return output |
950 |
|
|
951 |
0 |
async def _status_kdu( |
952 |
|
self, |
953 |
|
cluster_uuid: str, |
954 |
|
kdu_instance: str, |
955 |
|
show_error_log: bool = False, |
956 |
|
return_text: bool = False, |
957 |
|
): |
958 |
|
|
959 |
0 |
self.log.debug("status of kdu_instance {}".format(kdu_instance)) |
960 |
|
|
961 |
|
# config filename |
962 |
0 |
_kube_dir, helm_dir, config_filename, _cluster_dir = self._get_paths( |
963 |
|
cluster_name=cluster_uuid, create_if_not_exist=True |
964 |
|
) |
965 |
|
|
966 |
0 |
command = "{} --kubeconfig={} --home={} status {} --output yaml".format( |
967 |
|
self._helm_command, config_filename, helm_dir, kdu_instance |
968 |
|
) |
969 |
|
|
970 |
0 |
output, rc = await self._local_async_exec( |
971 |
|
command=command, |
972 |
|
raise_exception_on_error=True, |
973 |
|
show_error_log=show_error_log, |
974 |
|
) |
975 |
|
|
976 |
0 |
if return_text: |
977 |
0 |
return str(output) |
978 |
|
|
979 |
0 |
if rc != 0: |
980 |
0 |
return None |
981 |
|
|
982 |
0 |
data = yaml.load(output, Loader=yaml.SafeLoader) |
983 |
|
|
984 |
|
# remove field 'notes' |
985 |
0 |
try: |
986 |
0 |
del data.get("info").get("status")["notes"] |
987 |
0 |
except KeyError: |
988 |
0 |
pass |
989 |
|
|
990 |
|
# parse field 'resources' |
991 |
0 |
try: |
992 |
0 |
resources = str(data.get("info").get("status").get("resources")) |
993 |
0 |
resource_table = self._output_to_table(resources) |
994 |
0 |
data.get("info").get("status")["resources"] = resource_table |
995 |
0 |
except Exception: |
996 |
0 |
pass |
997 |
|
|
998 |
0 |
return data |
999 |
|
|
1000 |
0 |
async def get_instance_info(self, cluster_uuid: str, kdu_instance: str): |
1001 |
0 |
instances = await self.instances_list(cluster_uuid=cluster_uuid) |
1002 |
0 |
for instance in instances: |
1003 |
0 |
if instance.get("Name") == kdu_instance: |
1004 |
0 |
return instance |
1005 |
0 |
self.log.debug("Instance {} not found".format(kdu_instance)) |
1006 |
0 |
return None |
1007 |
|
|
1008 |
0 |
@staticmethod |
1009 |
0 |
def _generate_release_name(chart_name: str): |
1010 |
|
# check embeded chart (file or dir) |
1011 |
0 |
if chart_name.startswith("/"): |
1012 |
|
# extract file or directory name |
1013 |
0 |
chart_name = chart_name[chart_name.rfind("/") + 1 :] |
1014 |
|
# check URL |
1015 |
0 |
elif "://" in chart_name: |
1016 |
|
# extract last portion of URL |
1017 |
0 |
chart_name = chart_name[chart_name.rfind("/") + 1 :] |
1018 |
|
|
1019 |
0 |
name = "" |
1020 |
0 |
for c in chart_name: |
1021 |
0 |
if c.isalpha() or c.isnumeric(): |
1022 |
0 |
name += c |
1023 |
|
else: |
1024 |
0 |
name += "-" |
1025 |
0 |
if len(name) > 35: |
1026 |
0 |
name = name[0:35] |
1027 |
|
|
1028 |
|
# if does not start with alpha character, prefix 'a' |
1029 |
0 |
if not name[0].isalpha(): |
1030 |
0 |
name = "a" + name |
1031 |
|
|
1032 |
0 |
name += "-" |
1033 |
|
|
1034 |
0 |
def get_random_number(): |
1035 |
0 |
r = random.randrange(start=1, stop=99999999) |
1036 |
0 |
s = str(r) |
1037 |
0 |
s = s.rjust(10, "0") |
1038 |
0 |
return s |
1039 |
|
|
1040 |
0 |
name = name + get_random_number() |
1041 |
0 |
return name.lower() |
1042 |
|
|
1043 |
0 |
async def _store_status( |
1044 |
|
self, |
1045 |
|
cluster_uuid: str, |
1046 |
|
operation: str, |
1047 |
|
kdu_instance: str, |
1048 |
|
check_every: float = 10, |
1049 |
|
db_dict: dict = None, |
1050 |
|
run_once: bool = False, |
1051 |
|
): |
1052 |
0 |
while True: |
1053 |
0 |
try: |
1054 |
0 |
await asyncio.sleep(check_every) |
1055 |
0 |
detailed_status = await self.status_kdu( |
1056 |
|
cluster_uuid=cluster_uuid, kdu_instance=kdu_instance |
1057 |
|
) |
1058 |
0 |
status = detailed_status.get("info").get("Description") |
1059 |
0 |
self.log.debug("STATUS:\n{}".format(status)) |
1060 |
0 |
self.log.debug("DETAILED STATUS:\n{}".format(detailed_status)) |
1061 |
|
# write status to db |
1062 |
0 |
result = await self.write_app_status_to_db( |
1063 |
|
db_dict=db_dict, |
1064 |
|
status=str(status), |
1065 |
|
detailed_status=str(detailed_status), |
1066 |
|
operation=operation, |
1067 |
|
) |
1068 |
0 |
if not result: |
1069 |
0 |
self.log.info("Error writing in database. Task exiting...") |
1070 |
0 |
return |
1071 |
0 |
except asyncio.CancelledError: |
1072 |
0 |
self.log.debug("Task cancelled") |
1073 |
0 |
return |
1074 |
0 |
except Exception as e: |
1075 |
0 |
self.log.debug("_store_status exception: {}".format(str(e))) |
1076 |
0 |
pass |
1077 |
|
finally: |
1078 |
0 |
if run_once: |
1079 |
0 |
return |
1080 |
|
|
1081 |
0 |
async def _is_install_completed(self, cluster_uuid: str, kdu_instance: str) -> bool: |
1082 |
|
|
1083 |
0 |
status = await self._status_kdu( |
1084 |
|
cluster_uuid=cluster_uuid, kdu_instance=kdu_instance, return_text=False |
1085 |
|
) |
1086 |
|
|
1087 |
|
# extract info.status.resources-> str |
1088 |
|
# format: |
1089 |
|
# ==> v1/Deployment |
1090 |
|
# NAME READY UP-TO-DATE AVAILABLE AGE |
1091 |
|
# halting-horse-mongodb 0/1 1 0 0s |
1092 |
|
# halting-petit-mongodb 1/1 1 0 0s |
1093 |
|
# blank line |
1094 |
0 |
resources = K8sHelmConnector._get_deep(status, ("info", "status", "resources")) |
1095 |
|
|
1096 |
|
# convert to table |
1097 |
0 |
resources = K8sHelmConnector._output_to_table(resources) |
1098 |
|
|
1099 |
0 |
num_lines = len(resources) |
1100 |
0 |
index = 0 |
1101 |
0 |
while index < num_lines: |
1102 |
0 |
try: |
1103 |
0 |
line1 = resources[index] |
1104 |
0 |
index += 1 |
1105 |
|
# find '==>' in column 0 |
1106 |
0 |
if line1[0] == "==>": |
1107 |
0 |
line2 = resources[index] |
1108 |
0 |
index += 1 |
1109 |
|
# find READY in column 1 |
1110 |
0 |
if line2[1] == "READY": |
1111 |
|
# read next lines |
1112 |
0 |
line3 = resources[index] |
1113 |
0 |
index += 1 |
1114 |
0 |
while len(line3) > 1 and index < num_lines: |
1115 |
0 |
ready_value = line3[1] |
1116 |
0 |
parts = ready_value.split(sep="/") |
1117 |
0 |
current = int(parts[0]) |
1118 |
0 |
total = int(parts[1]) |
1119 |
0 |
if current < total: |
1120 |
0 |
self.log.debug("NOT READY:\n {}".format(line3)) |
1121 |
0 |
ready = False |
1122 |
0 |
line3 = resources[index] |
1123 |
0 |
index += 1 |
1124 |
|
|
1125 |
0 |
except Exception: |
1126 |
0 |
pass |
1127 |
|
|
1128 |
0 |
return ready |
1129 |
|
|
1130 |
0 |
@staticmethod |
1131 |
0 |
def _get_deep(dictionary: dict, members: tuple): |
1132 |
0 |
target = dictionary |
1133 |
0 |
value = None |
1134 |
0 |
try: |
1135 |
0 |
for m in members: |
1136 |
0 |
value = target.get(m) |
1137 |
0 |
if not value: |
1138 |
0 |
return None |
1139 |
|
else: |
1140 |
0 |
target = value |
1141 |
0 |
except Exception: |
1142 |
0 |
pass |
1143 |
0 |
return value |
1144 |
|
|
1145 |
|
# find key:value in several lines |
1146 |
0 |
@staticmethod |
1147 |
0 |
def _find_in_lines(p_lines: list, p_key: str) -> str: |
1148 |
0 |
for line in p_lines: |
1149 |
0 |
try: |
1150 |
0 |
if line.startswith(p_key + ":"): |
1151 |
0 |
parts = line.split(":") |
1152 |
0 |
the_value = parts[1].strip() |
1153 |
0 |
return the_value |
1154 |
0 |
except Exception: |
1155 |
|
# ignore it |
1156 |
0 |
pass |
1157 |
0 |
return None |
1158 |
|
|
1159 |
|
# params for use in -f file |
1160 |
|
# returns values file option and filename (in order to delete it at the end) |
1161 |
0 |
def _params_to_file_option(self, cluster_uuid: str, params: dict) -> (str, str): |
1162 |
|
|
1163 |
0 |
if params and len(params) > 0: |
1164 |
0 |
self._get_paths(cluster_name=cluster_uuid, create_if_not_exist=True) |
1165 |
|
|
1166 |
0 |
def get_random_number(): |
1167 |
0 |
r = random.randrange(start=1, stop=99999999) |
1168 |
0 |
s = str(r) |
1169 |
0 |
while len(s) < 10: |
1170 |
0 |
s = "0" + s |
1171 |
0 |
return s |
1172 |
|
|
1173 |
0 |
params2 = dict() |
1174 |
0 |
for key in params: |
1175 |
0 |
value = params.get(key) |
1176 |
0 |
if "!!yaml" in str(value): |
1177 |
0 |
value = yaml.load(value[7:]) |
1178 |
0 |
params2[key] = value |
1179 |
|
|
1180 |
0 |
values_file = get_random_number() + ".yaml" |
1181 |
0 |
with open(values_file, "w") as stream: |
1182 |
0 |
yaml.dump(params2, stream, indent=4, default_flow_style=False) |
1183 |
|
|
1184 |
0 |
return "-f {}".format(values_file), values_file |
1185 |
|
|
1186 |
0 |
return "", None |
1187 |
|
|
1188 |
|
# params for use in --set option |
1189 |
0 |
@staticmethod |
1190 |
0 |
def _params_to_set_option(params: dict) -> str: |
1191 |
0 |
params_str = "" |
1192 |
0 |
if params and len(params) > 0: |
1193 |
0 |
start = True |
1194 |
0 |
for key in params: |
1195 |
0 |
value = params.get(key, None) |
1196 |
0 |
if value is not None: |
1197 |
0 |
if start: |
1198 |
0 |
params_str += "--set " |
1199 |
0 |
start = False |
1200 |
|
else: |
1201 |
0 |
params_str += "," |
1202 |
0 |
params_str += "{}={}".format(key, value) |
1203 |
0 |
return params_str |
1204 |
|
|
1205 |
0 |
@staticmethod |
1206 |
0 |
def _output_to_lines(output: str) -> list: |
1207 |
0 |
output_lines = list() |
1208 |
0 |
lines = output.splitlines(keepends=False) |
1209 |
0 |
for line in lines: |
1210 |
0 |
line = line.strip() |
1211 |
0 |
if len(line) > 0: |
1212 |
0 |
output_lines.append(line) |
1213 |
0 |
return output_lines |
1214 |
|
|
1215 |
0 |
@staticmethod |
1216 |
0 |
def _output_to_table(output: str) -> list: |
1217 |
0 |
output_table = list() |
1218 |
0 |
lines = output.splitlines(keepends=False) |
1219 |
0 |
for line in lines: |
1220 |
0 |
line = line.replace("\t", " ") |
1221 |
0 |
line_list = list() |
1222 |
0 |
output_table.append(line_list) |
1223 |
0 |
cells = line.split(sep=" ") |
1224 |
0 |
for cell in cells: |
1225 |
0 |
cell = cell.strip() |
1226 |
0 |
if len(cell) > 0: |
1227 |
0 |
line_list.append(cell) |
1228 |
0 |
return output_table |
1229 |
|
|
1230 |
0 |
def _get_paths( |
1231 |
|
self, cluster_name: str, create_if_not_exist: bool = False |
1232 |
|
) -> (str, str, str, str): |
1233 |
|
""" |
1234 |
|
Returns kube and helm directories |
1235 |
|
|
1236 |
|
:param cluster_name: |
1237 |
|
:param create_if_not_exist: |
1238 |
|
:return: kube, helm directories, config filename and cluster dir. |
1239 |
|
Raises exception if not exist and cannot create |
1240 |
|
""" |
1241 |
|
|
1242 |
0 |
base = self.fs.path |
1243 |
0 |
if base.endswith("/") or base.endswith("\\"): |
1244 |
0 |
base = base[:-1] |
1245 |
|
|
1246 |
|
# base dir for cluster |
1247 |
0 |
cluster_dir = base + "/" + cluster_name |
1248 |
0 |
if create_if_not_exist and not os.path.exists(cluster_dir): |
1249 |
0 |
self.log.debug("Creating dir {}".format(cluster_dir)) |
1250 |
0 |
os.makedirs(cluster_dir) |
1251 |
0 |
if not os.path.exists(cluster_dir): |
1252 |
0 |
msg = "Base cluster dir {} does not exist".format(cluster_dir) |
1253 |
0 |
self.log.error(msg) |
1254 |
0 |
raise K8sException(msg) |
1255 |
|
|
1256 |
|
# kube dir |
1257 |
0 |
kube_dir = cluster_dir + "/" + ".kube" |
1258 |
0 |
if create_if_not_exist and not os.path.exists(kube_dir): |
1259 |
0 |
self.log.debug("Creating dir {}".format(kube_dir)) |
1260 |
0 |
os.makedirs(kube_dir) |
1261 |
0 |
if not os.path.exists(kube_dir): |
1262 |
0 |
msg = "Kube config dir {} does not exist".format(kube_dir) |
1263 |
0 |
self.log.error(msg) |
1264 |
0 |
raise K8sException(msg) |
1265 |
|
|
1266 |
|
# helm home dir |
1267 |
0 |
helm_dir = cluster_dir + "/" + ".helm" |
1268 |
0 |
if create_if_not_exist and not os.path.exists(helm_dir): |
1269 |
0 |
self.log.debug("Creating dir {}".format(helm_dir)) |
1270 |
0 |
os.makedirs(helm_dir) |
1271 |
0 |
if not os.path.exists(helm_dir): |
1272 |
0 |
msg = "Helm config dir {} does not exist".format(helm_dir) |
1273 |
0 |
self.log.error(msg) |
1274 |
0 |
raise K8sException(msg) |
1275 |
|
|
1276 |
0 |
config_filename = kube_dir + "/config" |
1277 |
0 |
return kube_dir, helm_dir, config_filename, cluster_dir |
1278 |
|
|
1279 |
0 |
@staticmethod |
1280 |
|
def _remove_multiple_spaces(strobj): |
1281 |
0 |
strobj = strobj.strip() |
1282 |
0 |
while " " in strobj: |
1283 |
0 |
strobj = strobj.replace(" ", " ") |
1284 |
0 |
return strobj |
1285 |
|
|
1286 |
0 |
def _local_exec(self, command: str) -> (str, int): |
1287 |
0 |
command = K8sHelmConnector._remove_multiple_spaces(command) |
1288 |
0 |
self.log.debug("Executing sync local command: {}".format(command)) |
1289 |
|
# raise exception if fails |
1290 |
0 |
output = "" |
1291 |
0 |
try: |
1292 |
0 |
output = subprocess.check_output( |
1293 |
|
command, shell=True, universal_newlines=True |
1294 |
|
) |
1295 |
0 |
return_code = 0 |
1296 |
0 |
self.log.debug(output) |
1297 |
0 |
except Exception: |
1298 |
0 |
return_code = 1 |
1299 |
|
|
1300 |
0 |
return output, return_code |
1301 |
|
|
1302 |
0 |
async def _local_async_exec( |
1303 |
|
self, |
1304 |
|
command: str, |
1305 |
|
raise_exception_on_error: bool = False, |
1306 |
|
show_error_log: bool = True, |
1307 |
|
encode_utf8: bool = False, |
1308 |
|
) -> (str, int): |
1309 |
|
|
1310 |
0 |
command = K8sHelmConnector._remove_multiple_spaces(command) |
1311 |
0 |
self.log.debug("Executing async local command: {}".format(command)) |
1312 |
|
|
1313 |
|
# split command |
1314 |
0 |
command = command.split(sep=" ") |
1315 |
|
|
1316 |
0 |
try: |
1317 |
0 |
process = await asyncio.create_subprocess_exec( |
1318 |
|
*command, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE |
1319 |
|
) |
1320 |
|
|
1321 |
|
# wait for command terminate |
1322 |
0 |
stdout, stderr = await process.communicate() |
1323 |
|
|
1324 |
0 |
return_code = process.returncode |
1325 |
|
|
1326 |
0 |
output = "" |
1327 |
0 |
if stdout: |
1328 |
0 |
output = stdout.decode("utf-8").strip() |
1329 |
|
# output = stdout.decode() |
1330 |
0 |
if stderr: |
1331 |
0 |
output = stderr.decode("utf-8").strip() |
1332 |
|
# output = stderr.decode() |
1333 |
|
|
1334 |
0 |
if return_code != 0 and show_error_log: |
1335 |
0 |
self.log.debug( |
1336 |
|
"Return code (FAIL): {}\nOutput:\n{}".format(return_code, output) |
1337 |
|
) |
1338 |
|
else: |
1339 |
0 |
self.log.debug("Return code: {}".format(return_code)) |
1340 |
|
|
1341 |
0 |
if raise_exception_on_error and return_code != 0: |
1342 |
0 |
raise K8sException(output) |
1343 |
|
|
1344 |
0 |
if encode_utf8: |
1345 |
0 |
output = output.encode("utf-8").strip() |
1346 |
0 |
output = str(output).replace("\\n", "\n") |
1347 |
|
|
1348 |
0 |
return output, return_code |
1349 |
|
|
1350 |
0 |
except asyncio.CancelledError: |
1351 |
0 |
raise |
1352 |
0 |
except K8sException: |
1353 |
0 |
raise |
1354 |
0 |
except Exception as e: |
1355 |
0 |
msg = "Exception executing command: {} -> {}".format(command, e) |
1356 |
0 |
self.log.error(msg) |
1357 |
0 |
if raise_exception_on_error: |
1358 |
0 |
raise K8sException(e) from e |
1359 |
|
else: |
1360 |
0 |
return "", -1 |
1361 |
|
|
1362 |
0 |
def _check_file_exists(self, filename: str, exception_if_not_exists: bool = False): |
1363 |
|
# self.log.debug('Checking if file {} exists...'.format(filename)) |
1364 |
0 |
if os.path.exists(filename): |
1365 |
0 |
return True |
1366 |
|
else: |
1367 |
0 |
msg = "File {} does not exist".format(filename) |
1368 |
0 |
if exception_if_not_exists: |
1369 |
|
# self.log.error(msg) |
1370 |
0 |
raise K8sException(msg) |