1 |
|
## |
2 |
|
# Copyright 2019 Telefonica Investigacion y Desarrollo, S.A.U. |
3 |
|
# This file is part of OSM |
4 |
|
# All Rights Reserved. |
5 |
|
# |
6 |
|
# Licensed under the Apache License, Version 2.0 (the "License"); |
7 |
|
# you may not use this file except in compliance with the License. |
8 |
|
# You may obtain a copy of the License at |
9 |
|
# |
10 |
|
# http://www.apache.org/licenses/LICENSE-2.0 |
11 |
|
# |
12 |
|
# Unless required by applicable law or agreed to in writing, software |
13 |
|
# distributed under the License is distributed on an "AS IS" BASIS, |
14 |
|
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or |
15 |
|
# implied. |
16 |
|
# See the License for the specific language governing permissions and |
17 |
|
# limitations under the License. |
18 |
|
# |
19 |
|
# For those usages not covered by the Apache License, Version 2.0 please |
20 |
|
# contact with: nfvlabs@tid.es |
21 |
|
## |
22 |
1 |
import abc |
23 |
1 |
import asyncio |
24 |
1 |
import random |
25 |
1 |
import time |
26 |
1 |
import shlex |
27 |
1 |
import shutil |
28 |
1 |
import stat |
29 |
1 |
import subprocess |
30 |
1 |
import os |
31 |
1 |
import yaml |
32 |
1 |
from uuid import uuid4 |
33 |
|
|
34 |
1 |
from n2vc.exceptions import K8sException |
35 |
1 |
from n2vc.k8s_conn import K8sConnector |
36 |
|
|
37 |
|
|
38 |
1 |
class K8sHelmBaseConnector(K8sConnector): |
39 |
|
|
40 |
|
""" |
41 |
|
#################################################################################### |
42 |
|
################################### P U B L I C #################################### |
43 |
|
#################################################################################### |
44 |
|
""" |
45 |
1 |
service_account = "osm" |
46 |
1 |
_STABLE_REPO_URL = "https://charts.helm.sh/stable" |
47 |
|
|
48 |
1 |
def __init__( |
49 |
|
self, |
50 |
|
fs: object, |
51 |
|
db: object, |
52 |
|
kubectl_command: str = "/usr/bin/kubectl", |
53 |
|
helm_command: str = "/usr/bin/helm", |
54 |
|
log: object = None, |
55 |
|
on_update_db=None, |
56 |
|
vca_config: dict = None, |
57 |
|
): |
58 |
|
""" |
59 |
|
|
60 |
|
:param fs: file system for kubernetes and helm configuration |
61 |
|
:param db: database object to write current operation status |
62 |
|
:param kubectl_command: path to kubectl executable |
63 |
|
:param helm_command: path to helm executable |
64 |
|
:param log: logger |
65 |
|
:param on_update_db: callback called when k8s connector updates database |
66 |
|
""" |
67 |
|
|
68 |
|
# parent class |
69 |
1 |
K8sConnector.__init__(self, db=db, log=log, on_update_db=on_update_db) |
70 |
|
|
71 |
1 |
self.log.info("Initializing K8S Helm connector") |
72 |
|
|
73 |
|
# random numbers for release name generation |
74 |
1 |
random.seed(time.time()) |
75 |
|
|
76 |
|
# the file system |
77 |
1 |
self.fs = fs |
78 |
|
|
79 |
|
# exception if kubectl is not installed |
80 |
1 |
self.kubectl_command = kubectl_command |
81 |
1 |
self._check_file_exists(filename=kubectl_command, exception_if_not_exists=True) |
82 |
|
|
83 |
|
# exception if helm is not installed |
84 |
1 |
self._helm_command = helm_command |
85 |
1 |
self._check_file_exists(filename=helm_command, exception_if_not_exists=True) |
86 |
|
|
87 |
|
# obtain stable repo url from config or apply default |
88 |
1 |
if not vca_config or not vca_config.get("stablerepourl"): |
89 |
1 |
self._stable_repo_url = self._STABLE_REPO_URL |
90 |
|
else: |
91 |
0 |
self._stable_repo_url = vca_config.get("stablerepourl") |
92 |
|
|
93 |
1 |
@staticmethod |
94 |
1 |
def _get_namespace_cluster_id(cluster_uuid: str) -> (str, str): |
95 |
|
""" |
96 |
|
Parses cluster_uuid stored at database that can be either 'namespace:cluster_id' or only |
97 |
|
cluster_id for backward compatibility |
98 |
|
""" |
99 |
1 |
namespace, _, cluster_id = cluster_uuid.rpartition(':') |
100 |
1 |
return namespace, cluster_id |
101 |
|
|
102 |
1 |
async def init_env( |
103 |
|
self, k8s_creds: str, namespace: str = "kube-system", reuse_cluster_uuid=None |
104 |
|
) -> (str, bool): |
105 |
|
""" |
106 |
|
It prepares a given K8s cluster environment to run Charts |
107 |
|
|
108 |
|
:param k8s_creds: credentials to access a given K8s cluster, i.e. a valid |
109 |
|
'.kube/config' |
110 |
|
:param namespace: optional namespace to be used for helm. By default, |
111 |
|
'kube-system' will be used |
112 |
|
:param reuse_cluster_uuid: existing cluster uuid for reuse |
113 |
|
:return: uuid of the K8s cluster and True if connector has installed some |
114 |
|
software in the cluster |
115 |
|
(on error, an exception will be raised) |
116 |
|
""" |
117 |
|
|
118 |
1 |
if reuse_cluster_uuid: |
119 |
1 |
namespace_, cluster_id = self._get_namespace_cluster_id(reuse_cluster_uuid) |
120 |
1 |
namespace = namespace_ or namespace |
121 |
|
else: |
122 |
0 |
cluster_id = str(uuid4()) |
123 |
1 |
cluster_uuid = "{}:{}".format(namespace, cluster_id) |
124 |
|
|
125 |
1 |
self.log.debug("Initializing K8S Cluster {}. namespace: {}".format(cluster_id, namespace)) |
126 |
|
|
127 |
1 |
paths, env = self._init_paths_env( |
128 |
|
cluster_name=cluster_id, create_if_not_exist=True |
129 |
|
) |
130 |
1 |
mode = stat.S_IRUSR | stat.S_IWUSR |
131 |
1 |
with open(paths["kube_config"], "w", mode) as f: |
132 |
1 |
f.write(k8s_creds) |
133 |
1 |
os.chmod(paths["kube_config"], 0o600) |
134 |
|
|
135 |
|
# Code with initialization specific of helm version |
136 |
1 |
n2vc_installed_sw = await self._cluster_init(cluster_id, namespace, paths, env) |
137 |
|
|
138 |
|
# sync fs with local data |
139 |
1 |
self.fs.reverse_sync(from_path=cluster_id) |
140 |
|
|
141 |
1 |
self.log.info("Cluster {} initialized".format(cluster_id)) |
142 |
|
|
143 |
1 |
return cluster_uuid, n2vc_installed_sw |
144 |
|
|
145 |
1 |
async def repo_add( |
146 |
|
self, cluster_uuid: str, name: str, url: str, repo_type: str = "chart" |
147 |
|
): |
148 |
1 |
_, cluster_id = self._get_namespace_cluster_id(cluster_uuid) |
149 |
1 |
self.log.debug("Cluster {}, adding {} repository {}. URL: {}".format( |
150 |
|
cluster_id, repo_type, name, url)) |
151 |
|
|
152 |
|
# sync local dir |
153 |
1 |
self.fs.sync(from_path=cluster_id) |
154 |
|
|
155 |
|
# init_env |
156 |
1 |
paths, env = self._init_paths_env( |
157 |
|
cluster_name=cluster_id, create_if_not_exist=True |
158 |
|
) |
159 |
|
|
160 |
|
# helm repo update |
161 |
1 |
command = "{} repo update".format( |
162 |
|
self._helm_command |
163 |
|
) |
164 |
1 |
self.log.debug("updating repo: {}".format(command)) |
165 |
1 |
await self._local_async_exec(command=command, raise_exception_on_error=False, env=env) |
166 |
|
|
167 |
|
# helm repo add name url |
168 |
1 |
command = "{} repo add {} {}".format( |
169 |
|
self._helm_command, name, url |
170 |
|
) |
171 |
1 |
self.log.debug("adding repo: {}".format(command)) |
172 |
1 |
await self._local_async_exec(command=command, raise_exception_on_error=True, env=env) |
173 |
|
|
174 |
|
# sync fs |
175 |
1 |
self.fs.reverse_sync(from_path=cluster_id) |
176 |
|
|
177 |
1 |
async def repo_list(self, cluster_uuid: str) -> list: |
178 |
|
""" |
179 |
|
Get the list of registered repositories |
180 |
|
|
181 |
|
:return: list of registered repositories: [ (name, url) .... ] |
182 |
|
""" |
183 |
|
|
184 |
1 |
_, cluster_id = self._get_namespace_cluster_id(cluster_uuid) |
185 |
1 |
self.log.debug("list repositories for cluster {}".format(cluster_id)) |
186 |
|
|
187 |
|
# sync local dir |
188 |
1 |
self.fs.sync(from_path=cluster_id) |
189 |
|
|
190 |
|
# config filename |
191 |
1 |
paths, env = self._init_paths_env( |
192 |
|
cluster_name=cluster_id, create_if_not_exist=True |
193 |
|
) |
194 |
|
|
195 |
1 |
command = "{} repo list --output yaml".format( |
196 |
|
self._helm_command |
197 |
|
) |
198 |
|
|
199 |
|
# Set exception to false because if there are no repos just want an empty list |
200 |
1 |
output, _rc = await self._local_async_exec( |
201 |
|
command=command, raise_exception_on_error=False, env=env |
202 |
|
) |
203 |
|
|
204 |
|
# sync fs |
205 |
1 |
self.fs.reverse_sync(from_path=cluster_id) |
206 |
|
|
207 |
1 |
if _rc == 0: |
208 |
1 |
if output and len(output) > 0: |
209 |
0 |
repos = yaml.load(output, Loader=yaml.SafeLoader) |
210 |
|
# unify format between helm2 and helm3 setting all keys lowercase |
211 |
0 |
return self._lower_keys_list(repos) |
212 |
|
else: |
213 |
1 |
return [] |
214 |
|
else: |
215 |
0 |
return [] |
216 |
|
|
217 |
1 |
async def repo_remove(self, cluster_uuid: str, name: str): |
218 |
|
|
219 |
1 |
_, cluster_id = self._get_namespace_cluster_id(cluster_uuid) |
220 |
1 |
self.log.debug("remove {} repositories for cluster {}".format(name, cluster_id)) |
221 |
|
|
222 |
|
# sync local dir |
223 |
1 |
self.fs.sync(from_path=cluster_id) |
224 |
|
|
225 |
|
# init env, paths |
226 |
1 |
paths, env = self._init_paths_env( |
227 |
|
cluster_name=cluster_id, create_if_not_exist=True |
228 |
|
) |
229 |
|
|
230 |
1 |
command = "{} repo remove {}".format( |
231 |
|
self._helm_command, name |
232 |
|
) |
233 |
1 |
await self._local_async_exec(command=command, raise_exception_on_error=True, env=env) |
234 |
|
|
235 |
|
# sync fs |
236 |
1 |
self.fs.reverse_sync(from_path=cluster_id) |
237 |
|
|
238 |
1 |
async def reset( |
239 |
|
self, cluster_uuid: str, force: bool = False, uninstall_sw: bool = False |
240 |
|
) -> bool: |
241 |
|
|
242 |
1 |
namespace, cluster_id = self._get_namespace_cluster_id(cluster_uuid) |
243 |
1 |
self.log.debug("Resetting K8s environment. cluster uuid: {} uninstall={}" |
244 |
|
.format(cluster_id, uninstall_sw)) |
245 |
|
|
246 |
|
# sync local dir |
247 |
1 |
self.fs.sync(from_path=cluster_id) |
248 |
|
|
249 |
|
# uninstall releases if needed. |
250 |
1 |
if uninstall_sw: |
251 |
1 |
releases = await self.instances_list(cluster_uuid=cluster_uuid) |
252 |
1 |
if len(releases) > 0: |
253 |
1 |
if force: |
254 |
1 |
for r in releases: |
255 |
1 |
try: |
256 |
1 |
kdu_instance = r.get("name") |
257 |
1 |
chart = r.get("chart") |
258 |
1 |
self.log.debug( |
259 |
|
"Uninstalling {} -> {}".format(chart, kdu_instance) |
260 |
|
) |
261 |
1 |
await self.uninstall( |
262 |
|
cluster_uuid=cluster_uuid, kdu_instance=kdu_instance |
263 |
|
) |
264 |
0 |
except Exception as e: |
265 |
|
# will not raise exception as it was found |
266 |
|
# that in some cases of previously installed helm releases it |
267 |
|
# raised an error |
268 |
0 |
self.log.warn( |
269 |
|
"Error uninstalling release {}: {}".format(kdu_instance, e) |
270 |
|
) |
271 |
|
else: |
272 |
0 |
msg = ( |
273 |
|
"Cluster uuid: {} has releases and not force. Leaving K8s helm environment" |
274 |
|
).format(cluster_id) |
275 |
0 |
self.log.warn(msg) |
276 |
0 |
uninstall_sw = False # Allow to remove k8s cluster without removing Tiller |
277 |
|
|
278 |
1 |
if uninstall_sw: |
279 |
1 |
await self._uninstall_sw(cluster_id, namespace) |
280 |
|
|
281 |
|
# delete cluster directory |
282 |
1 |
self.log.debug("Removing directory {}".format(cluster_id)) |
283 |
1 |
self.fs.file_delete(cluster_id, ignore_non_exist=True) |
284 |
|
# Remove also local directorio if still exist |
285 |
1 |
direct = self.fs.path + "/" + cluster_id |
286 |
1 |
shutil.rmtree(direct, ignore_errors=True) |
287 |
|
|
288 |
1 |
return True |
289 |
|
|
290 |
1 |
async def _install_impl( |
291 |
|
self, |
292 |
|
cluster_id: str, |
293 |
|
kdu_model: str, |
294 |
|
paths: dict, |
295 |
|
env: dict, |
296 |
|
kdu_instance: str, |
297 |
|
atomic: bool = True, |
298 |
|
timeout: float = 300, |
299 |
|
params: dict = None, |
300 |
|
db_dict: dict = None, |
301 |
|
kdu_name: str = None, |
302 |
|
namespace: str = None, |
303 |
|
): |
304 |
|
# params to str |
305 |
1 |
params_str, file_to_delete = self._params_to_file_option( |
306 |
|
cluster_id=cluster_id, params=params |
307 |
|
) |
308 |
|
|
309 |
|
# version |
310 |
1 |
version = None |
311 |
1 |
if ":" in kdu_model: |
312 |
1 |
parts = kdu_model.split(sep=":") |
313 |
1 |
if len(parts) == 2: |
314 |
1 |
version = str(parts[1]) |
315 |
1 |
kdu_model = parts[0] |
316 |
|
|
317 |
1 |
command = self._get_install_command(kdu_model, kdu_instance, namespace, |
318 |
|
params_str, version, atomic, timeout) |
319 |
|
|
320 |
1 |
self.log.debug("installing: {}".format(command)) |
321 |
|
|
322 |
1 |
if atomic: |
323 |
|
# exec helm in a task |
324 |
1 |
exec_task = asyncio.ensure_future( |
325 |
|
coro_or_future=self._local_async_exec( |
326 |
|
command=command, raise_exception_on_error=False, env=env |
327 |
|
) |
328 |
|
) |
329 |
|
|
330 |
|
# write status in another task |
331 |
1 |
status_task = asyncio.ensure_future( |
332 |
|
coro_or_future=self._store_status( |
333 |
|
cluster_id=cluster_id, |
334 |
|
kdu_instance=kdu_instance, |
335 |
|
namespace=namespace, |
336 |
|
db_dict=db_dict, |
337 |
|
operation="install", |
338 |
|
run_once=False, |
339 |
|
) |
340 |
|
) |
341 |
|
|
342 |
|
# wait for execution task |
343 |
1 |
await asyncio.wait([exec_task]) |
344 |
|
|
345 |
|
# cancel status task |
346 |
1 |
status_task.cancel() |
347 |
|
|
348 |
1 |
output, rc = exec_task.result() |
349 |
|
|
350 |
|
else: |
351 |
|
|
352 |
0 |
output, rc = await self._local_async_exec( |
353 |
|
command=command, raise_exception_on_error=False, env=env |
354 |
|
) |
355 |
|
|
356 |
|
# remove temporal values yaml file |
357 |
1 |
if file_to_delete: |
358 |
0 |
os.remove(file_to_delete) |
359 |
|
|
360 |
|
# write final status |
361 |
1 |
await self._store_status( |
362 |
|
cluster_id=cluster_id, |
363 |
|
kdu_instance=kdu_instance, |
364 |
|
namespace=namespace, |
365 |
|
db_dict=db_dict, |
366 |
|
operation="install", |
367 |
|
run_once=True, |
368 |
|
check_every=0, |
369 |
|
) |
370 |
|
|
371 |
1 |
if rc != 0: |
372 |
0 |
msg = "Error executing command: {}\nOutput: {}".format(command, output) |
373 |
0 |
self.log.error(msg) |
374 |
0 |
raise K8sException(msg) |
375 |
|
|
376 |
1 |
async def upgrade( |
377 |
|
self, |
378 |
|
cluster_uuid: str, |
379 |
|
kdu_instance: str, |
380 |
|
kdu_model: str = None, |
381 |
|
atomic: bool = True, |
382 |
|
timeout: float = 300, |
383 |
|
params: dict = None, |
384 |
|
db_dict: dict = None, |
385 |
|
): |
386 |
1 |
_, cluster_id = self._get_namespace_cluster_id(cluster_uuid) |
387 |
1 |
self.log.debug("upgrading {} in cluster {}".format(kdu_model, cluster_id)) |
388 |
|
|
389 |
|
# sync local dir |
390 |
1 |
self.fs.sync(from_path=cluster_id) |
391 |
|
|
392 |
|
# look for instance to obtain namespace |
393 |
1 |
instance_info = await self.get_instance_info(cluster_uuid, kdu_instance) |
394 |
1 |
if not instance_info: |
395 |
0 |
raise K8sException("kdu_instance {} not found".format(kdu_instance)) |
396 |
|
|
397 |
|
# init env, paths |
398 |
1 |
paths, env = self._init_paths_env( |
399 |
|
cluster_name=cluster_id, create_if_not_exist=True |
400 |
|
) |
401 |
|
|
402 |
|
# params to str |
403 |
1 |
params_str, file_to_delete = self._params_to_file_option( |
404 |
|
cluster_id=cluster_id, params=params |
405 |
|
) |
406 |
|
|
407 |
|
# version |
408 |
1 |
version = None |
409 |
1 |
if ":" in kdu_model: |
410 |
1 |
parts = kdu_model.split(sep=":") |
411 |
1 |
if len(parts) == 2: |
412 |
1 |
version = str(parts[1]) |
413 |
1 |
kdu_model = parts[0] |
414 |
|
|
415 |
1 |
command = self._get_upgrade_command(kdu_model, kdu_instance, instance_info["namespace"], |
416 |
|
params_str, version, atomic, timeout) |
417 |
|
|
418 |
1 |
self.log.debug("upgrading: {}".format(command)) |
419 |
|
|
420 |
1 |
if atomic: |
421 |
|
|
422 |
|
# exec helm in a task |
423 |
1 |
exec_task = asyncio.ensure_future( |
424 |
|
coro_or_future=self._local_async_exec( |
425 |
|
command=command, raise_exception_on_error=False, env=env |
426 |
|
) |
427 |
|
) |
428 |
|
# write status in another task |
429 |
1 |
status_task = asyncio.ensure_future( |
430 |
|
coro_or_future=self._store_status( |
431 |
|
cluster_id=cluster_id, |
432 |
|
kdu_instance=kdu_instance, |
433 |
|
namespace=instance_info["namespace"], |
434 |
|
db_dict=db_dict, |
435 |
|
operation="upgrade", |
436 |
|
run_once=False, |
437 |
|
) |
438 |
|
) |
439 |
|
|
440 |
|
# wait for execution task |
441 |
1 |
await asyncio.wait([exec_task]) |
442 |
|
|
443 |
|
# cancel status task |
444 |
1 |
status_task.cancel() |
445 |
1 |
output, rc = exec_task.result() |
446 |
|
|
447 |
|
else: |
448 |
|
|
449 |
0 |
output, rc = await self._local_async_exec( |
450 |
|
command=command, raise_exception_on_error=False, env=env |
451 |
|
) |
452 |
|
|
453 |
|
# remove temporal values yaml file |
454 |
1 |
if file_to_delete: |
455 |
0 |
os.remove(file_to_delete) |
456 |
|
|
457 |
|
# write final status |
458 |
1 |
await self._store_status( |
459 |
|
cluster_id=cluster_id, |
460 |
|
kdu_instance=kdu_instance, |
461 |
|
namespace=instance_info["namespace"], |
462 |
|
db_dict=db_dict, |
463 |
|
operation="upgrade", |
464 |
|
run_once=True, |
465 |
|
check_every=0, |
466 |
|
) |
467 |
|
|
468 |
1 |
if rc != 0: |
469 |
0 |
msg = "Error executing command: {}\nOutput: {}".format(command, output) |
470 |
0 |
self.log.error(msg) |
471 |
0 |
raise K8sException(msg) |
472 |
|
|
473 |
|
# sync fs |
474 |
1 |
self.fs.reverse_sync(from_path=cluster_id) |
475 |
|
|
476 |
|
# return new revision number |
477 |
1 |
instance = await self.get_instance_info( |
478 |
|
cluster_uuid=cluster_uuid, kdu_instance=kdu_instance |
479 |
|
) |
480 |
1 |
if instance: |
481 |
1 |
revision = int(instance.get("revision")) |
482 |
1 |
self.log.debug("New revision: {}".format(revision)) |
483 |
1 |
return revision |
484 |
|
else: |
485 |
0 |
return 0 |
486 |
|
|
487 |
1 |
async def rollback( |
488 |
|
self, cluster_uuid: str, kdu_instance: str, revision=0, db_dict: dict = None |
489 |
|
): |
490 |
|
|
491 |
1 |
_, cluster_id = self._get_namespace_cluster_id(cluster_uuid) |
492 |
1 |
self.log.debug( |
493 |
|
"rollback kdu_instance {} to revision {} from cluster {}".format( |
494 |
|
kdu_instance, revision, cluster_id |
495 |
|
) |
496 |
|
) |
497 |
|
|
498 |
|
# sync local dir |
499 |
1 |
self.fs.sync(from_path=cluster_id) |
500 |
|
|
501 |
|
# look for instance to obtain namespace |
502 |
1 |
instance_info = await self.get_instance_info(cluster_uuid, kdu_instance) |
503 |
1 |
if not instance_info: |
504 |
0 |
raise K8sException("kdu_instance {} not found".format(kdu_instance)) |
505 |
|
|
506 |
|
# init env, paths |
507 |
1 |
paths, env = self._init_paths_env( |
508 |
|
cluster_name=cluster_id, create_if_not_exist=True |
509 |
|
) |
510 |
|
|
511 |
1 |
command = self._get_rollback_command(kdu_instance, instance_info["namespace"], |
512 |
|
revision) |
513 |
|
|
514 |
1 |
self.log.debug("rolling_back: {}".format(command)) |
515 |
|
|
516 |
|
# exec helm in a task |
517 |
1 |
exec_task = asyncio.ensure_future( |
518 |
|
coro_or_future=self._local_async_exec( |
519 |
|
command=command, raise_exception_on_error=False, env=env |
520 |
|
) |
521 |
|
) |
522 |
|
# write status in another task |
523 |
1 |
status_task = asyncio.ensure_future( |
524 |
|
coro_or_future=self._store_status( |
525 |
|
cluster_id=cluster_id, |
526 |
|
kdu_instance=kdu_instance, |
527 |
|
namespace=instance_info["namespace"], |
528 |
|
db_dict=db_dict, |
529 |
|
operation="rollback", |
530 |
|
run_once=False, |
531 |
|
) |
532 |
|
) |
533 |
|
|
534 |
|
# wait for execution task |
535 |
1 |
await asyncio.wait([exec_task]) |
536 |
|
|
537 |
|
# cancel status task |
538 |
1 |
status_task.cancel() |
539 |
|
|
540 |
1 |
output, rc = exec_task.result() |
541 |
|
|
542 |
|
# write final status |
543 |
1 |
await self._store_status( |
544 |
|
cluster_id=cluster_id, |
545 |
|
kdu_instance=kdu_instance, |
546 |
|
namespace=instance_info["namespace"], |
547 |
|
db_dict=db_dict, |
548 |
|
operation="rollback", |
549 |
|
run_once=True, |
550 |
|
check_every=0, |
551 |
|
) |
552 |
|
|
553 |
1 |
if rc != 0: |
554 |
0 |
msg = "Error executing command: {}\nOutput: {}".format(command, output) |
555 |
0 |
self.log.error(msg) |
556 |
0 |
raise K8sException(msg) |
557 |
|
|
558 |
|
# sync fs |
559 |
1 |
self.fs.reverse_sync(from_path=cluster_id) |
560 |
|
|
561 |
|
# return new revision number |
562 |
1 |
instance = await self.get_instance_info( |
563 |
|
cluster_uuid=cluster_uuid, kdu_instance=kdu_instance |
564 |
|
) |
565 |
1 |
if instance: |
566 |
1 |
revision = int(instance.get("revision")) |
567 |
1 |
self.log.debug("New revision: {}".format(revision)) |
568 |
1 |
return revision |
569 |
|
else: |
570 |
0 |
return 0 |
571 |
|
|
572 |
1 |
async def uninstall(self, cluster_uuid: str, kdu_instance: str): |
573 |
|
""" |
574 |
|
Removes an existing KDU instance. It would implicitly use the `delete` or 'uninstall' call |
575 |
|
(this call should happen after all _terminate-config-primitive_ of the VNF |
576 |
|
are invoked). |
577 |
|
|
578 |
|
:param cluster_uuid: UUID of a K8s cluster known by OSM, or namespace:cluster_id |
579 |
|
:param kdu_instance: unique name for the KDU instance to be deleted |
580 |
|
:return: True if successful |
581 |
|
""" |
582 |
|
|
583 |
1 |
_, cluster_id = self._get_namespace_cluster_id(cluster_uuid) |
584 |
1 |
self.log.debug( |
585 |
|
"uninstall kdu_instance {} from cluster {}".format( |
586 |
|
kdu_instance, cluster_id |
587 |
|
) |
588 |
|
) |
589 |
|
|
590 |
|
# sync local dir |
591 |
1 |
self.fs.sync(from_path=cluster_id) |
592 |
|
|
593 |
|
# look for instance to obtain namespace |
594 |
1 |
instance_info = await self.get_instance_info(cluster_uuid, kdu_instance) |
595 |
1 |
if not instance_info: |
596 |
0 |
raise K8sException("kdu_instance {} not found".format(kdu_instance)) |
597 |
|
|
598 |
|
# init env, paths |
599 |
1 |
paths, env = self._init_paths_env( |
600 |
|
cluster_name=cluster_id, create_if_not_exist=True |
601 |
|
) |
602 |
|
|
603 |
1 |
command = self._get_uninstall_command(kdu_instance, instance_info["namespace"]) |
604 |
1 |
output, _rc = await self._local_async_exec( |
605 |
|
command=command, raise_exception_on_error=True, env=env |
606 |
|
) |
607 |
|
|
608 |
|
# sync fs |
609 |
1 |
self.fs.reverse_sync(from_path=cluster_id) |
610 |
|
|
611 |
1 |
return self._output_to_table(output) |
612 |
|
|
613 |
1 |
async def instances_list(self, cluster_uuid: str) -> list: |
614 |
|
""" |
615 |
|
returns a list of deployed releases in a cluster |
616 |
|
|
617 |
|
:param cluster_uuid: the 'cluster' or 'namespace:cluster' |
618 |
|
:return: |
619 |
|
""" |
620 |
|
|
621 |
1 |
_, cluster_id = self._get_namespace_cluster_id(cluster_uuid) |
622 |
1 |
self.log.debug("list releases for cluster {}".format(cluster_id)) |
623 |
|
|
624 |
|
# sync local dir |
625 |
1 |
self.fs.sync(from_path=cluster_id) |
626 |
|
|
627 |
|
# execute internal command |
628 |
1 |
result = await self._instances_list(cluster_id) |
629 |
|
|
630 |
|
# sync fs |
631 |
1 |
self.fs.reverse_sync(from_path=cluster_id) |
632 |
|
|
633 |
1 |
return result |
634 |
|
|
635 |
1 |
async def get_instance_info(self, cluster_uuid: str, kdu_instance: str): |
636 |
0 |
instances = await self.instances_list(cluster_uuid=cluster_uuid) |
637 |
0 |
for instance in instances: |
638 |
0 |
if instance.get("name") == kdu_instance: |
639 |
0 |
return instance |
640 |
0 |
self.log.debug("Instance {} not found".format(kdu_instance)) |
641 |
0 |
return None |
642 |
|
|
643 |
1 |
async def exec_primitive( |
644 |
|
self, |
645 |
|
cluster_uuid: str = None, |
646 |
|
kdu_instance: str = None, |
647 |
|
primitive_name: str = None, |
648 |
|
timeout: float = 300, |
649 |
|
params: dict = None, |
650 |
|
db_dict: dict = None, |
651 |
|
) -> str: |
652 |
|
"""Exec primitive (Juju action) |
653 |
|
|
654 |
|
:param cluster_uuid: The UUID of the cluster or namespace:cluster |
655 |
|
:param kdu_instance: The unique name of the KDU instance |
656 |
|
:param primitive_name: Name of action that will be executed |
657 |
|
:param timeout: Timeout for action execution |
658 |
|
:param params: Dictionary of all the parameters needed for the action |
659 |
|
:db_dict: Dictionary for any additional data |
660 |
|
|
661 |
|
:return: Returns the output of the action |
662 |
|
""" |
663 |
0 |
raise K8sException( |
664 |
|
"KDUs deployed with Helm don't support actions " |
665 |
|
"different from rollback, upgrade and status" |
666 |
|
) |
667 |
|
|
668 |
1 |
async def get_services(self, |
669 |
|
cluster_uuid: str, |
670 |
|
kdu_instance: str, |
671 |
|
namespace: str) -> list: |
672 |
|
""" |
673 |
|
Returns a list of services defined for the specified kdu instance. |
674 |
|
|
675 |
|
:param cluster_uuid: UUID of a K8s cluster known by OSM |
676 |
|
:param kdu_instance: unique name for the KDU instance |
677 |
|
:param namespace: K8s namespace used by the KDU instance |
678 |
|
:return: If successful, it will return a list of services, Each service |
679 |
|
can have the following data: |
680 |
|
- `name` of the service |
681 |
|
- `type` type of service in the k8 cluster |
682 |
|
- `ports` List of ports offered by the service, for each port includes at least |
683 |
|
name, port, protocol |
684 |
|
- `cluster_ip` Internal ip to be used inside k8s cluster |
685 |
|
- `external_ip` List of external ips (in case they are available) |
686 |
|
""" |
687 |
|
|
688 |
1 |
_, cluster_id = self._get_namespace_cluster_id(cluster_uuid) |
689 |
1 |
self.log.debug( |
690 |
|
"get_services: cluster_uuid: {}, kdu_instance: {}".format( |
691 |
|
cluster_uuid, kdu_instance |
692 |
|
) |
693 |
|
) |
694 |
|
|
695 |
|
# sync local dir |
696 |
1 |
self.fs.sync(from_path=cluster_id) |
697 |
|
|
698 |
|
# get list of services names for kdu |
699 |
1 |
service_names = await self._get_services(cluster_id, kdu_instance, namespace) |
700 |
|
|
701 |
1 |
service_list = [] |
702 |
1 |
for service in service_names: |
703 |
1 |
service = await self._get_service(cluster_id, service, namespace) |
704 |
1 |
service_list.append(service) |
705 |
|
|
706 |
|
# sync fs |
707 |
1 |
self.fs.reverse_sync(from_path=cluster_id) |
708 |
|
|
709 |
1 |
return service_list |
710 |
|
|
711 |
1 |
async def get_service(self, |
712 |
|
cluster_uuid: str, |
713 |
|
service_name: str, |
714 |
|
namespace: str) -> object: |
715 |
|
|
716 |
1 |
self.log.debug( |
717 |
|
"get service, service_name: {}, namespace: {}, cluster_uuid: {}".format( |
718 |
|
service_name, namespace, cluster_uuid) |
719 |
|
) |
720 |
|
|
721 |
1 |
_, cluster_id = self._get_namespace_cluster_id(cluster_uuid) |
722 |
|
|
723 |
|
# sync local dir |
724 |
1 |
self.fs.sync(from_path=cluster_id) |
725 |
|
|
726 |
1 |
service = await self._get_service(cluster_id, service_name, namespace) |
727 |
|
|
728 |
|
# sync fs |
729 |
1 |
self.fs.reverse_sync(from_path=cluster_id) |
730 |
|
|
731 |
1 |
return service |
732 |
|
|
733 |
1 |
async def status_kdu(self, cluster_uuid: str, kdu_instance: str) -> str: |
734 |
|
|
735 |
0 |
self.log.debug( |
736 |
|
"status_kdu: cluster_uuid: {}, kdu_instance: {}".format( |
737 |
|
cluster_uuid, kdu_instance |
738 |
|
) |
739 |
|
) |
740 |
|
|
741 |
0 |
_, cluster_id = self._get_namespace_cluster_id(cluster_uuid) |
742 |
|
|
743 |
|
# sync local dir |
744 |
0 |
self.fs.sync(from_path=cluster_id) |
745 |
|
|
746 |
|
# get instance: needed to obtain namespace |
747 |
0 |
instances = await self._instances_list(cluster_id=cluster_id) |
748 |
0 |
for instance in instances: |
749 |
0 |
if instance.get("name") == kdu_instance: |
750 |
0 |
break |
751 |
|
else: |
752 |
|
# instance does not exist |
753 |
0 |
raise K8sException("Instance name: {} not found in cluster: {}".format( |
754 |
|
kdu_instance, cluster_id)) |
755 |
|
|
756 |
0 |
status = await self._status_kdu( |
757 |
|
cluster_id=cluster_id, |
758 |
|
kdu_instance=kdu_instance, |
759 |
|
namespace=instance["namespace"], |
760 |
|
show_error_log=True, |
761 |
|
return_text=True, |
762 |
|
) |
763 |
|
|
764 |
|
# sync fs |
765 |
0 |
self.fs.reverse_sync(from_path=cluster_id) |
766 |
|
|
767 |
0 |
return status |
768 |
|
|
769 |
1 |
async def values_kdu(self, kdu_model: str, repo_url: str = None) -> str: |
770 |
|
|
771 |
1 |
self.log.debug( |
772 |
|
"inspect kdu_model values {} from (optional) repo: {}".format( |
773 |
|
kdu_model, repo_url |
774 |
|
) |
775 |
|
) |
776 |
|
|
777 |
1 |
return await self._exec_inspect_comand( |
778 |
|
inspect_command="values", kdu_model=kdu_model, repo_url=repo_url |
779 |
|
) |
780 |
|
|
781 |
1 |
async def help_kdu(self, kdu_model: str, repo_url: str = None) -> str: |
782 |
|
|
783 |
1 |
self.log.debug( |
784 |
|
"inspect kdu_model {} readme.md from repo: {}".format(kdu_model, repo_url) |
785 |
|
) |
786 |
|
|
787 |
1 |
return await self._exec_inspect_comand( |
788 |
|
inspect_command="readme", kdu_model=kdu_model, repo_url=repo_url |
789 |
|
) |
790 |
|
|
791 |
1 |
async def synchronize_repos(self, cluster_uuid: str): |
792 |
|
|
793 |
1 |
self.log.debug("synchronize repos for cluster helm-id: {}".format(cluster_uuid)) |
794 |
1 |
try: |
795 |
1 |
db_repo_ids = self._get_helm_chart_repos_ids(cluster_uuid) |
796 |
1 |
db_repo_dict = self._get_db_repos_dict(db_repo_ids) |
797 |
|
|
798 |
1 |
local_repo_list = await self.repo_list(cluster_uuid) |
799 |
1 |
local_repo_dict = {repo["name"]: repo["url"] for repo in local_repo_list} |
800 |
|
|
801 |
1 |
deleted_repo_list = [] |
802 |
1 |
added_repo_dict = {} |
803 |
|
|
804 |
|
# iterate over the list of repos in the database that should be |
805 |
|
# added if not present |
806 |
1 |
for repo_name, db_repo in db_repo_dict.items(): |
807 |
1 |
try: |
808 |
|
# check if it is already present |
809 |
1 |
curr_repo_url = local_repo_dict.get(db_repo["name"]) |
810 |
1 |
repo_id = db_repo.get("_id") |
811 |
1 |
if curr_repo_url != db_repo["url"]: |
812 |
1 |
if curr_repo_url: |
813 |
0 |
self.log.debug("repo {} url changed, delete and and again".format( |
814 |
|
db_repo["url"])) |
815 |
0 |
await self.repo_remove(cluster_uuid, db_repo["name"]) |
816 |
0 |
deleted_repo_list.append(repo_id) |
817 |
|
|
818 |
|
# add repo |
819 |
1 |
self.log.debug("add repo {}".format(db_repo["name"])) |
820 |
1 |
await self.repo_add(cluster_uuid, db_repo["name"], db_repo["url"]) |
821 |
1 |
added_repo_dict[repo_id] = db_repo["name"] |
822 |
0 |
except Exception as e: |
823 |
0 |
raise K8sException( |
824 |
|
"Error adding repo id: {}, err_msg: {} ".format( |
825 |
|
repo_id, repr(e) |
826 |
|
) |
827 |
|
) |
828 |
|
|
829 |
|
# Delete repos that are present but not in nbi_list |
830 |
1 |
for repo_name in local_repo_dict: |
831 |
1 |
if not db_repo_dict.get(repo_name) and repo_name != "stable": |
832 |
1 |
self.log.debug("delete repo {}".format(repo_name)) |
833 |
1 |
try: |
834 |
1 |
await self.repo_remove(cluster_uuid, repo_name) |
835 |
1 |
deleted_repo_list.append(repo_name) |
836 |
0 |
except Exception as e: |
837 |
0 |
self.warning( |
838 |
|
"Error deleting repo, name: {}, err_msg: {}".format( |
839 |
|
repo_name, str(e) |
840 |
|
) |
841 |
|
) |
842 |
|
|
843 |
1 |
return deleted_repo_list, added_repo_dict |
844 |
|
|
845 |
0 |
except K8sException: |
846 |
0 |
raise |
847 |
0 |
except Exception as e: |
848 |
|
# Do not raise errors synchronizing repos |
849 |
0 |
self.log.error("Error synchronizing repos: {}".format(e)) |
850 |
0 |
raise Exception("Error synchronizing repos: {}".format(e)) |
851 |
|
|
852 |
1 |
def _get_db_repos_dict(self, repo_ids: list): |
853 |
1 |
db_repos_dict = {} |
854 |
1 |
for repo_id in repo_ids: |
855 |
1 |
db_repo = self.db.get_one("k8srepos", {"_id": repo_id}) |
856 |
1 |
db_repos_dict[db_repo["name"]] = db_repo |
857 |
1 |
return db_repos_dict |
858 |
|
|
859 |
|
""" |
860 |
|
#################################################################################### |
861 |
|
################################### TO BE IMPLEMENTED SUBCLASSES ################### |
862 |
|
#################################################################################### |
863 |
|
""" |
864 |
|
|
865 |
1 |
@abc.abstractmethod |
866 |
1 |
def _init_paths_env(self, cluster_name: str, create_if_not_exist: bool = True): |
867 |
|
""" |
868 |
|
Creates and returns base cluster and kube dirs and returns them. |
869 |
|
Also created helm3 dirs according to new directory specification, paths are |
870 |
|
not returned but assigned to helm environment variables |
871 |
|
|
872 |
|
:param cluster_name: cluster_name |
873 |
|
:return: Dictionary with config_paths and dictionary with helm environment variables |
874 |
|
""" |
875 |
|
|
876 |
1 |
@abc.abstractmethod |
877 |
|
async def _cluster_init(self, cluster_id, namespace, paths, env): |
878 |
|
""" |
879 |
|
Implements the helm version dependent cluster initialization |
880 |
|
""" |
881 |
|
|
882 |
1 |
@abc.abstractmethod |
883 |
|
async def _instances_list(self, cluster_id): |
884 |
|
""" |
885 |
|
Implements the helm version dependent helm instances list |
886 |
|
""" |
887 |
|
|
888 |
1 |
@abc.abstractmethod |
889 |
|
async def _get_services(self, cluster_id, kdu_instance, namespace): |
890 |
|
""" |
891 |
|
Implements the helm version dependent method to obtain services from a helm instance |
892 |
|
""" |
893 |
|
|
894 |
1 |
@abc.abstractmethod |
895 |
1 |
async def _status_kdu(self, cluster_id: str, kdu_instance: str, namespace: str = None, |
896 |
|
show_error_log: bool = False, return_text: bool = False): |
897 |
|
""" |
898 |
|
Implements the helm version dependent method to obtain status of a helm instance |
899 |
|
""" |
900 |
|
|
901 |
1 |
@abc.abstractmethod |
902 |
1 |
def _get_install_command(self, kdu_model, kdu_instance, namespace, |
903 |
|
params_str, version, atomic, timeout) -> str: |
904 |
|
""" |
905 |
|
Obtain command to be executed to delete the indicated instance |
906 |
|
""" |
907 |
|
|
908 |
1 |
@abc.abstractmethod |
909 |
1 |
def _get_upgrade_command(self, kdu_model, kdu_instance, namespace, |
910 |
|
params_str, version, atomic, timeout) -> str: |
911 |
|
""" |
912 |
|
Obtain command to be executed to upgrade the indicated instance |
913 |
|
""" |
914 |
|
|
915 |
1 |
@abc.abstractmethod |
916 |
1 |
def _get_rollback_command(self, kdu_instance, namespace, revision) -> str: |
917 |
|
""" |
918 |
|
Obtain command to be executed to rollback the indicated instance |
919 |
|
""" |
920 |
|
|
921 |
1 |
@abc.abstractmethod |
922 |
1 |
def _get_uninstall_command(self, kdu_instance: str, namespace: str) -> str: |
923 |
|
""" |
924 |
|
Obtain command to be executed to delete the indicated instance |
925 |
|
""" |
926 |
|
|
927 |
1 |
@abc.abstractmethod |
928 |
1 |
def _get_inspect_command(self, show_command: str, kdu_model: str, repo_str: str, |
929 |
|
version: str): |
930 |
|
""" |
931 |
|
Obtain command to be executed to obtain information about the kdu |
932 |
|
""" |
933 |
|
|
934 |
1 |
@abc.abstractmethod |
935 |
1 |
async def _uninstall_sw(self, cluster_id: str, namespace: str): |
936 |
|
""" |
937 |
|
Method call to uninstall cluster software for helm. This method is dependent |
938 |
|
of helm version |
939 |
|
For Helm v2 it will be called when Tiller must be uninstalled |
940 |
|
For Helm v3 it does nothing and does not need to be callled |
941 |
|
""" |
942 |
|
|
943 |
1 |
@abc.abstractmethod |
944 |
1 |
def _get_helm_chart_repos_ids(self, cluster_uuid) -> list: |
945 |
|
""" |
946 |
|
Obtains the cluster repos identifiers |
947 |
|
""" |
948 |
|
|
949 |
|
""" |
950 |
|
#################################################################################### |
951 |
|
################################### P R I V A T E ################################## |
952 |
|
#################################################################################### |
953 |
|
""" |
954 |
|
|
955 |
1 |
@staticmethod |
956 |
1 |
def _check_file_exists(filename: str, exception_if_not_exists: bool = False): |
957 |
0 |
if os.path.exists(filename): |
958 |
0 |
return True |
959 |
|
else: |
960 |
0 |
msg = "File {} does not exist".format(filename) |
961 |
0 |
if exception_if_not_exists: |
962 |
0 |
raise K8sException(msg) |
963 |
|
|
964 |
1 |
@staticmethod |
965 |
|
def _remove_multiple_spaces(strobj): |
966 |
0 |
strobj = strobj.strip() |
967 |
0 |
while " " in strobj: |
968 |
0 |
strobj = strobj.replace(" ", " ") |
969 |
0 |
return strobj |
970 |
|
|
971 |
1 |
@staticmethod |
972 |
1 |
def _output_to_lines(output: str) -> list: |
973 |
0 |
output_lines = list() |
974 |
0 |
lines = output.splitlines(keepends=False) |
975 |
0 |
for line in lines: |
976 |
0 |
line = line.strip() |
977 |
0 |
if len(line) > 0: |
978 |
0 |
output_lines.append(line) |
979 |
0 |
return output_lines |
980 |
|
|
981 |
1 |
@staticmethod |
982 |
1 |
def _output_to_table(output: str) -> list: |
983 |
1 |
output_table = list() |
984 |
1 |
lines = output.splitlines(keepends=False) |
985 |
1 |
for line in lines: |
986 |
0 |
line = line.replace("\t", " ") |
987 |
0 |
line_list = list() |
988 |
0 |
output_table.append(line_list) |
989 |
0 |
cells = line.split(sep=" ") |
990 |
0 |
for cell in cells: |
991 |
0 |
cell = cell.strip() |
992 |
0 |
if len(cell) > 0: |
993 |
0 |
line_list.append(cell) |
994 |
1 |
return output_table |
995 |
|
|
996 |
1 |
@staticmethod |
997 |
1 |
def _parse_services(output: str) -> list: |
998 |
0 |
lines = output.splitlines(keepends=False) |
999 |
0 |
services = [] |
1000 |
0 |
for line in lines: |
1001 |
0 |
line = line.replace("\t", " ") |
1002 |
0 |
cells = line.split(sep=" ") |
1003 |
0 |
if len(cells) > 0 and cells[0].startswith("service/"): |
1004 |
0 |
elems = cells[0].split(sep="/") |
1005 |
0 |
if len(elems) > 1: |
1006 |
0 |
services.append(elems[1]) |
1007 |
0 |
return services |
1008 |
|
|
1009 |
1 |
@staticmethod |
1010 |
1 |
def _get_deep(dictionary: dict, members: tuple): |
1011 |
1 |
target = dictionary |
1012 |
1 |
value = None |
1013 |
1 |
try: |
1014 |
1 |
for m in members: |
1015 |
1 |
value = target.get(m) |
1016 |
0 |
if not value: |
1017 |
0 |
return None |
1018 |
|
else: |
1019 |
0 |
target = value |
1020 |
1 |
except Exception: |
1021 |
1 |
pass |
1022 |
1 |
return value |
1023 |
|
|
1024 |
|
# find key:value in several lines |
1025 |
1 |
@staticmethod |
1026 |
1 |
def _find_in_lines(p_lines: list, p_key: str) -> str: |
1027 |
0 |
for line in p_lines: |
1028 |
0 |
try: |
1029 |
0 |
if line.startswith(p_key + ":"): |
1030 |
0 |
parts = line.split(":") |
1031 |
0 |
the_value = parts[1].strip() |
1032 |
0 |
return the_value |
1033 |
0 |
except Exception: |
1034 |
|
# ignore it |
1035 |
0 |
pass |
1036 |
0 |
return None |
1037 |
|
|
1038 |
1 |
@staticmethod |
1039 |
1 |
def _lower_keys_list(input_list: list): |
1040 |
|
""" |
1041 |
|
Transform the keys in a list of dictionaries to lower case and returns a new list |
1042 |
|
of dictionaries |
1043 |
|
""" |
1044 |
0 |
new_list = [] |
1045 |
0 |
for dictionary in input_list: |
1046 |
0 |
new_dict = dict((k.lower(), v) for k, v in dictionary.items()) |
1047 |
0 |
new_list.append(new_dict) |
1048 |
0 |
return new_list |
1049 |
|
|
1050 |
1 |
def _local_exec(self, command: str) -> (str, int): |
1051 |
0 |
command = self._remove_multiple_spaces(command) |
1052 |
0 |
self.log.debug("Executing sync local command: {}".format(command)) |
1053 |
|
# raise exception if fails |
1054 |
0 |
output = "" |
1055 |
0 |
try: |
1056 |
0 |
output = subprocess.check_output( |
1057 |
|
command, shell=True, universal_newlines=True |
1058 |
|
) |
1059 |
0 |
return_code = 0 |
1060 |
0 |
self.log.debug(output) |
1061 |
0 |
except Exception: |
1062 |
0 |
return_code = 1 |
1063 |
|
|
1064 |
0 |
return output, return_code |
1065 |
|
|
1066 |
1 |
async def _local_async_exec( |
1067 |
|
self, |
1068 |
|
command: str, |
1069 |
|
raise_exception_on_error: bool = False, |
1070 |
|
show_error_log: bool = True, |
1071 |
|
encode_utf8: bool = False, |
1072 |
|
env: dict = None |
1073 |
|
) -> (str, int): |
1074 |
|
|
1075 |
0 |
command = K8sHelmBaseConnector._remove_multiple_spaces(command) |
1076 |
0 |
self.log.debug("Executing async local command: {}, env: {}".format(command, env)) |
1077 |
|
|
1078 |
|
# split command |
1079 |
0 |
command = shlex.split(command) |
1080 |
|
|
1081 |
0 |
environ = os.environ.copy() |
1082 |
0 |
if env: |
1083 |
0 |
environ.update(env) |
1084 |
|
|
1085 |
0 |
try: |
1086 |
0 |
process = await asyncio.create_subprocess_exec( |
1087 |
|
*command, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE, |
1088 |
|
env=environ |
1089 |
|
) |
1090 |
|
|
1091 |
|
# wait for command terminate |
1092 |
0 |
stdout, stderr = await process.communicate() |
1093 |
|
|
1094 |
0 |
return_code = process.returncode |
1095 |
|
|
1096 |
0 |
output = "" |
1097 |
0 |
if stdout: |
1098 |
0 |
output = stdout.decode("utf-8").strip() |
1099 |
|
# output = stdout.decode() |
1100 |
0 |
if stderr: |
1101 |
0 |
output = stderr.decode("utf-8").strip() |
1102 |
|
# output = stderr.decode() |
1103 |
|
|
1104 |
0 |
if return_code != 0 and show_error_log: |
1105 |
0 |
self.log.debug( |
1106 |
|
"Return code (FAIL): {}\nOutput:\n{}".format(return_code, output) |
1107 |
|
) |
1108 |
|
else: |
1109 |
0 |
self.log.debug("Return code: {}".format(return_code)) |
1110 |
|
|
1111 |
0 |
if raise_exception_on_error and return_code != 0: |
1112 |
0 |
raise K8sException(output) |
1113 |
|
|
1114 |
0 |
if encode_utf8: |
1115 |
0 |
output = output.encode("utf-8").strip() |
1116 |
0 |
output = str(output).replace("\\n", "\n") |
1117 |
|
|
1118 |
0 |
return output, return_code |
1119 |
|
|
1120 |
0 |
except asyncio.CancelledError: |
1121 |
0 |
raise |
1122 |
0 |
except K8sException: |
1123 |
0 |
raise |
1124 |
0 |
except Exception as e: |
1125 |
0 |
msg = "Exception executing command: {} -> {}".format(command, e) |
1126 |
0 |
self.log.error(msg) |
1127 |
0 |
if raise_exception_on_error: |
1128 |
0 |
raise K8sException(e) from e |
1129 |
|
else: |
1130 |
0 |
return "", -1 |
1131 |
|
|
1132 |
1 |
async def _local_async_exec_pipe(self, |
1133 |
|
command1: str, |
1134 |
|
command2: str, |
1135 |
|
raise_exception_on_error: bool = True, |
1136 |
|
show_error_log: bool = True, |
1137 |
|
encode_utf8: bool = False, |
1138 |
|
env: dict = None): |
1139 |
|
|
1140 |
0 |
command1 = K8sHelmBaseConnector._remove_multiple_spaces(command1) |
1141 |
0 |
command2 = K8sHelmBaseConnector._remove_multiple_spaces(command2) |
1142 |
0 |
command = "{} | {}".format(command1, command2) |
1143 |
0 |
self.log.debug("Executing async local command: {}, env: {}".format(command, env)) |
1144 |
|
|
1145 |
|
# split command |
1146 |
0 |
command1 = shlex.split(command1) |
1147 |
0 |
command2 = shlex.split(command2) |
1148 |
|
|
1149 |
0 |
environ = os.environ.copy() |
1150 |
0 |
if env: |
1151 |
0 |
environ.update(env) |
1152 |
|
|
1153 |
0 |
try: |
1154 |
0 |
read, write = os.pipe() |
1155 |
0 |
await asyncio.create_subprocess_exec(*command1, stdout=write, env=environ) |
1156 |
0 |
os.close(write) |
1157 |
0 |
process_2 = await asyncio.create_subprocess_exec(*command2, stdin=read, |
1158 |
|
stdout=asyncio.subprocess.PIPE, |
1159 |
|
env=environ) |
1160 |
0 |
os.close(read) |
1161 |
0 |
stdout, stderr = await process_2.communicate() |
1162 |
|
|
1163 |
0 |
return_code = process_2.returncode |
1164 |
|
|
1165 |
0 |
output = "" |
1166 |
0 |
if stdout: |
1167 |
0 |
output = stdout.decode("utf-8").strip() |
1168 |
|
# output = stdout.decode() |
1169 |
0 |
if stderr: |
1170 |
0 |
output = stderr.decode("utf-8").strip() |
1171 |
|
# output = stderr.decode() |
1172 |
|
|
1173 |
0 |
if return_code != 0 and show_error_log: |
1174 |
0 |
self.log.debug( |
1175 |
|
"Return code (FAIL): {}\nOutput:\n{}".format(return_code, output) |
1176 |
|
) |
1177 |
|
else: |
1178 |
0 |
self.log.debug("Return code: {}".format(return_code)) |
1179 |
|
|
1180 |
0 |
if raise_exception_on_error and return_code != 0: |
1181 |
0 |
raise K8sException(output) |
1182 |
|
|
1183 |
0 |
if encode_utf8: |
1184 |
0 |
output = output.encode("utf-8").strip() |
1185 |
0 |
output = str(output).replace("\\n", "\n") |
1186 |
|
|
1187 |
0 |
return output, return_code |
1188 |
0 |
except asyncio.CancelledError: |
1189 |
0 |
raise |
1190 |
0 |
except K8sException: |
1191 |
0 |
raise |
1192 |
0 |
except Exception as e: |
1193 |
0 |
msg = "Exception executing command: {} -> {}".format(command, e) |
1194 |
0 |
self.log.error(msg) |
1195 |
0 |
if raise_exception_on_error: |
1196 |
0 |
raise K8sException(e) from e |
1197 |
|
else: |
1198 |
0 |
return "", -1 |
1199 |
|
|
1200 |
1 |
async def _get_service(self, cluster_id, service_name, namespace): |
1201 |
|
""" |
1202 |
|
Obtains the data of the specified service in the k8cluster. |
1203 |
|
|
1204 |
|
:param cluster_id: id of a K8s cluster known by OSM |
1205 |
|
:param service_name: name of the K8s service in the specified namespace |
1206 |
|
:param namespace: K8s namespace used by the KDU instance |
1207 |
|
:return: If successful, it will return a service with the following data: |
1208 |
|
- `name` of the service |
1209 |
|
- `type` type of service in the k8 cluster |
1210 |
|
- `ports` List of ports offered by the service, for each port includes at least |
1211 |
|
name, port, protocol |
1212 |
|
- `cluster_ip` Internal ip to be used inside k8s cluster |
1213 |
|
- `external_ip` List of external ips (in case they are available) |
1214 |
|
""" |
1215 |
|
|
1216 |
|
# init config, env |
1217 |
1 |
paths, env = self._init_paths_env( |
1218 |
|
cluster_name=cluster_id, create_if_not_exist=True |
1219 |
|
) |
1220 |
|
|
1221 |
1 |
command = "{} --kubeconfig={} --namespace={} get service {} -o=yaml".format( |
1222 |
|
self.kubectl_command, paths["kube_config"], namespace, service_name |
1223 |
|
) |
1224 |
|
|
1225 |
1 |
output, _rc = await self._local_async_exec( |
1226 |
|
command=command, raise_exception_on_error=True, env=env |
1227 |
|
) |
1228 |
|
|
1229 |
1 |
data = yaml.load(output, Loader=yaml.SafeLoader) |
1230 |
|
|
1231 |
1 |
service = { |
1232 |
|
"name": service_name, |
1233 |
|
"type": self._get_deep(data, ("spec", "type")), |
1234 |
|
"ports": self._get_deep(data, ("spec", "ports")), |
1235 |
|
"cluster_ip": self._get_deep(data, ("spec", "clusterIP")) |
1236 |
|
} |
1237 |
1 |
if service["type"] == "LoadBalancer": |
1238 |
0 |
ip_map_list = self._get_deep(data, ("status", "loadBalancer", "ingress")) |
1239 |
0 |
ip_list = [elem["ip"] for elem in ip_map_list] |
1240 |
0 |
service["external_ip"] = ip_list |
1241 |
|
|
1242 |
1 |
return service |
1243 |
|
|
1244 |
1 |
async def _exec_inspect_comand( |
1245 |
|
self, inspect_command: str, kdu_model: str, repo_url: str = None |
1246 |
|
): |
1247 |
|
""" |
1248 |
|
Obtains information about a kdu, no cluster (no env) |
1249 |
|
""" |
1250 |
|
|
1251 |
1 |
repo_str = "" |
1252 |
1 |
if repo_url: |
1253 |
1 |
repo_str = " --repo {}".format(repo_url) |
1254 |
|
|
1255 |
1 |
idx = kdu_model.find("/") |
1256 |
1 |
if idx >= 0: |
1257 |
1 |
idx += 1 |
1258 |
1 |
kdu_model = kdu_model[idx:] |
1259 |
|
|
1260 |
1 |
version = "" |
1261 |
1 |
if ":" in kdu_model: |
1262 |
1 |
parts = kdu_model.split(sep=":") |
1263 |
1 |
if len(parts) == 2: |
1264 |
1 |
version = "--version {}".format(str(parts[1])) |
1265 |
1 |
kdu_model = parts[0] |
1266 |
|
|
1267 |
1 |
full_command = self._get_inspect_command(inspect_command, kdu_model, repo_str, version) |
1268 |
1 |
output, _rc = await self._local_async_exec( |
1269 |
|
command=full_command, encode_utf8=True |
1270 |
|
) |
1271 |
|
|
1272 |
1 |
return output |
1273 |
|
|
1274 |
1 |
async def _store_status( |
1275 |
|
self, |
1276 |
|
cluster_id: str, |
1277 |
|
operation: str, |
1278 |
|
kdu_instance: str, |
1279 |
|
namespace: str = None, |
1280 |
|
check_every: float = 10, |
1281 |
|
db_dict: dict = None, |
1282 |
|
run_once: bool = False, |
1283 |
|
): |
1284 |
1 |
while True: |
1285 |
1 |
try: |
1286 |
1 |
await asyncio.sleep(check_every) |
1287 |
1 |
detailed_status = await self._status_kdu( |
1288 |
|
cluster_id=cluster_id, kdu_instance=kdu_instance, namespace=namespace, |
1289 |
|
return_text=False |
1290 |
|
) |
1291 |
1 |
status = detailed_status.get("info").get("description") |
1292 |
1 |
self.log.debug('KDU {} STATUS: {}.'.format(kdu_instance, status)) |
1293 |
|
# write status to db |
1294 |
1 |
result = await self.write_app_status_to_db( |
1295 |
|
db_dict=db_dict, |
1296 |
|
status=str(status), |
1297 |
|
detailed_status=str(detailed_status), |
1298 |
|
operation=operation, |
1299 |
|
) |
1300 |
1 |
if not result: |
1301 |
0 |
self.log.info("Error writing in database. Task exiting...") |
1302 |
0 |
return |
1303 |
0 |
except asyncio.CancelledError: |
1304 |
0 |
self.log.debug("Task cancelled") |
1305 |
0 |
return |
1306 |
0 |
except Exception as e: |
1307 |
0 |
self.log.debug("_store_status exception: {}".format(str(e)), exc_info=True) |
1308 |
0 |
pass |
1309 |
|
finally: |
1310 |
1 |
if run_once: |
1311 |
1 |
return |
1312 |
|
|
1313 |
|
# params for use in -f file |
1314 |
|
# returns values file option and filename (in order to delete it at the end) |
1315 |
1 |
def _params_to_file_option(self, cluster_id: str, params: dict) -> (str, str): |
1316 |
|
|
1317 |
1 |
if params and len(params) > 0: |
1318 |
0 |
self._init_paths_env( |
1319 |
|
cluster_name=cluster_id, create_if_not_exist=True |
1320 |
|
) |
1321 |
|
|
1322 |
0 |
def get_random_number(): |
1323 |
0 |
r = random.randrange(start=1, stop=99999999) |
1324 |
0 |
s = str(r) |
1325 |
0 |
while len(s) < 10: |
1326 |
0 |
s = "0" + s |
1327 |
0 |
return s |
1328 |
|
|
1329 |
0 |
params2 = dict() |
1330 |
0 |
for key in params: |
1331 |
0 |
value = params.get(key) |
1332 |
0 |
if "!!yaml" in str(value): |
1333 |
0 |
value = yaml.load(value[7:]) |
1334 |
0 |
params2[key] = value |
1335 |
|
|
1336 |
0 |
values_file = get_random_number() + ".yaml" |
1337 |
0 |
with open(values_file, "w") as stream: |
1338 |
0 |
yaml.dump(params2, stream, indent=4, default_flow_style=False) |
1339 |
|
|
1340 |
0 |
return "-f {}".format(values_file), values_file |
1341 |
|
|
1342 |
1 |
return "", None |
1343 |
|
|
1344 |
|
# params for use in --set option |
1345 |
1 |
@staticmethod |
1346 |
1 |
def _params_to_set_option(params: dict) -> str: |
1347 |
0 |
params_str = "" |
1348 |
0 |
if params and len(params) > 0: |
1349 |
0 |
start = True |
1350 |
0 |
for key in params: |
1351 |
0 |
value = params.get(key, None) |
1352 |
0 |
if value is not None: |
1353 |
0 |
if start: |
1354 |
0 |
params_str += "--set " |
1355 |
0 |
start = False |
1356 |
|
else: |
1357 |
0 |
params_str += "," |
1358 |
0 |
params_str += "{}={}".format(key, value) |
1359 |
0 |
return params_str |
1360 |
|
|
1361 |
1 |
@staticmethod |
1362 |
|
def generate_kdu_instance_name(**kwargs): |
1363 |
0 |
chart_name = kwargs["kdu_model"] |
1364 |
|
# check embeded chart (file or dir) |
1365 |
0 |
if chart_name.startswith("/"): |
1366 |
|
# extract file or directory name |
1367 |
0 |
chart_name = chart_name[chart_name.rfind("/") + 1:] |
1368 |
|
# check URL |
1369 |
0 |
elif "://" in chart_name: |
1370 |
|
# extract last portion of URL |
1371 |
0 |
chart_name = chart_name[chart_name.rfind("/") + 1:] |
1372 |
|
|
1373 |
0 |
name = "" |
1374 |
0 |
for c in chart_name: |
1375 |
0 |
if c.isalpha() or c.isnumeric(): |
1376 |
0 |
name += c |
1377 |
|
else: |
1378 |
0 |
name += "-" |
1379 |
0 |
if len(name) > 35: |
1380 |
0 |
name = name[0:35] |
1381 |
|
|
1382 |
|
# if does not start with alpha character, prefix 'a' |
1383 |
0 |
if not name[0].isalpha(): |
1384 |
0 |
name = "a" + name |
1385 |
|
|
1386 |
0 |
name += "-" |
1387 |
|
|
1388 |
0 |
def get_random_number(): |
1389 |
0 |
r = random.randrange(start=1, stop=99999999) |
1390 |
0 |
s = str(r) |
1391 |
0 |
s = s.rjust(10, "0") |
1392 |
0 |
return s |
1393 |
|
|
1394 |
0 |
name = name + get_random_number() |
1395 |
0 |
return name.lower() |