blob: 1efacc447423c21b1592ab2123c210a42271f5e1 [file] [log] [blame]
quilesj26c78a42019-10-28 18:10:42 +01001##
2# Copyright 2019 Telefonica Investigacion y Desarrollo, S.A.U.
3# This file is part of OSM
4# All Rights Reserved.
5#
6# Licensed under the Apache License, Version 2.0 (the "License");
7# you may not use this file except in compliance with the License.
8# You may obtain a copy of the License at
9#
10# http://www.apache.org/licenses/LICENSE-2.0
11#
12# Unless required by applicable law or agreed to in writing, software
13# distributed under the License is distributed on an "AS IS" BASIS,
14# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
15# implied.
16# See the License for the specific language governing permissions and
17# limitations under the License.
18#
19# For those usages not covered by the Apache License, Version 2.0 please
20# contact with: nfvlabs@tid.es
21##
22
quilesj26c78a42019-10-28 18:10:42 +010023import subprocess
24import os
25import shutil
26import asyncio
quilesj26c78a42019-10-28 18:10:42 +010027import time
28import yaml
29from uuid import uuid4
30import random
31from n2vc.k8s_conn import K8sConnector
quilesja6748412019-12-04 07:51:26 +000032from n2vc.exceptions import K8sException
quilesj26c78a42019-10-28 18:10:42 +010033
34
35class K8sHelmConnector(K8sConnector):
36
37 """
38 ##################################################################################################
39 ########################################## P U B L I C ###########################################
40 ##################################################################################################
41 """
42
def __init__(
        self,
        fs: object,
        db: object,
        kubectl_command: str = '/usr/bin/kubectl',
        helm_command: str = '/usr/bin/helm',
        log: object = None,
        on_update_db=None
):
    """
    Build the Helm connector: initialise the parent connector, verify that the
    kubectl and helm executables exist and schedule a client-only `helm init`.

    :param fs: file system for kubernetes and helm configuration
    :param db: database object to write current operation status
    :param kubectl_command: path to kubectl executable
    :param helm_command: path to helm executable
    :param log: logger
    :param on_update_db: callback called when k8s connector updates database
    """

    # parent class (self.log is used right after this call, so it is presumably set by the parent)
    K8sConnector.__init__(
        self,
        db=db,
        log=log,
        on_update_db=on_update_db
    )

    self.log.info('Initializing K8S Helm connector')

    # random numbers for release name generation
    random.seed(time.time())

    # the file system
    self.fs = fs

    # exception if kubectl is not installed
    self.kubectl_command = kubectl_command
    self._check_file_exists(filename=kubectl_command, exception_if_not_exists=True)

    # exception if helm is not installed
    self._helm_command = helm_command
    self._check_file_exists(filename=helm_command, exception_if_not_exists=True)

    # initialize helm client-only. The command is only scheduled here, never awaited,
    # so the except branch can only see errors raised while scheduling the task.
    self.log.debug('Initializing helm client-only...')
    command = '{} init --client-only'.format(self._helm_command)
    try:
        asyncio.ensure_future(self._local_async_exec(command=command, raise_exception_on_error=False))
    except Exception as e:
        # every other message in this class goes through self.log
        self.log.warning('helm init failed (it was already initialized): {}'.format(e))

    self.log.info('K8S Helm connector initialized')
quilesj26c78a42019-10-28 18:10:42 +010097
async def init_env(
        self,
        k8s_creds: str,
        namespace: str = 'kube-system',
        reuse_cluster_uuid=None
) -> (str, bool):
    """
    It prepares a given K8s cluster environment to run Charts on both sides:
        client (OSM)
        server (Tiller)

    :param k8s_creds: credentials to access a given K8s cluster, i.e. a valid '.kube/config'
    :param namespace: optional namespace to be used for helm. By default, 'kube-system' will be used
    :param reuse_cluster_uuid: existing cluster uuid for reuse
    :return: uuid of the K8s cluster and True if connector has installed some software in the cluster
        (on error, an exception will be raised)
    """

    cluster_uuid = reuse_cluster_uuid
    if not cluster_uuid:
        cluster_uuid = str(uuid4())

    self.log.debug('Initializing K8S environment. namespace: {}'.format(namespace))

    # create config filename and store the credentials in it
    kube_dir, helm_dir, config_filename, cluster_dir = \
        self._get_paths(cluster_name=cluster_uuid, create_if_not_exist=True)
    # context manager: the handle is closed even when the write fails
    with open(config_filename, "w") as f:
        f.write(k8s_creds)

    # check if tiller pod is up in cluster
    command = '{} --kubeconfig={} --namespace={} get deployments'\
        .format(self.kubectl_command, config_filename, namespace)
    output, rc = await self._local_async_exec(command=command, raise_exception_on_error=True)

    output_table = K8sHelmConnector._output_to_table(output=output)

    # find 'tiller' deployment in the listing
    already_initialized = False
    try:
        for row in output_table:
            if row[0].startswith('tiller-deploy'):
                already_initialized = True
                break
    except Exception as e:
        # best effort: an unparsable listing is treated as "tiller not found"
        self.log.debug('Could not parse deployments listing: {}'.format(e))

    # helm init
    n2vc_installed_sw = False
    if not already_initialized:
        self.log.info('Initializing helm in client and server: {}'.format(cluster_uuid))
        command = '{} --kubeconfig={} --tiller-namespace={} --home={} init'\
            .format(self._helm_command, config_filename, namespace, helm_dir)
        await self._local_async_exec(command=command, raise_exception_on_error=True)
        n2vc_installed_sw = True
    else:
        # tiller is there: only the local helm home may still need initialising
        check_file = helm_dir + '/repository/repositories.yaml'
        if not self._check_file_exists(filename=check_file, exception_if_not_exists=False):
            self.log.info('Initializing helm in client: {}'.format(cluster_uuid))
            command = '{} --kubeconfig={} --tiller-namespace={} --home={} init --client-only'\
                .format(self._helm_command, config_filename, namespace, helm_dir)
            await self._local_async_exec(command=command, raise_exception_on_error=True)
        else:
            self.log.info('Helm client already initialized')

    self.log.info('Cluster initialized {}'.format(cluster_uuid))

    return cluster_uuid, n2vc_installed_sw
168
async def repo_add(
        self,
        cluster_uuid: str,
        name: str,
        url: str,
        repo_type: str = 'chart'
):
    """
    Register a helm repository in the helm home of a cluster.

    A `repo update` is run first (its failure is tolerated), then `repo add`
    (its failure raises).

    :param cluster_uuid: the cluster
    :param name: repository name
    :param url: repository URL
    :param repo_type: only used in the log message
    """

    self.log.debug('adding {} repository {}. URL: {}'.format(repo_type, name, url))

    # only the helm home and the kubeconfig file are needed here
    _, helm_home, kubeconfig, _ = self._get_paths(cluster_name=cluster_uuid, create_if_not_exist=True)

    # refresh the repositories that are already registered
    refresh_cmd = '{} --kubeconfig={} --home={} repo update'.format(self._helm_command, kubeconfig, helm_home)
    self.log.debug('updating repo: {}'.format(refresh_cmd))
    await self._local_async_exec(command=refresh_cmd, raise_exception_on_error=False)

    # register the new repository
    add_cmd = '{} --kubeconfig={} --home={} repo add {} {}'.format(
        self._helm_command, kubeconfig, helm_home, name, url
    )
    self.log.debug('adding repo: {}'.format(add_cmd))
    await self._local_async_exec(command=add_cmd, raise_exception_on_error=True)
193
async def repo_list(
        self,
        cluster_uuid: str
) -> list:
    """
    Get the list of registered repositories

    :param cluster_uuid: the cluster
    :return: the parsed YAML output of `helm repo list`, or an empty list when helm
        prints nothing or the output parses to an empty document
    """

    self.log.debug('list repositories for cluster {}'.format(cluster_uuid))

    # config filename
    kube_dir, helm_dir, config_filename, cluster_dir = \
        self._get_paths(cluster_name=cluster_uuid, create_if_not_exist=True)

    command = '{} --kubeconfig={} --home={} repo list --output yaml'\
        .format(self._helm_command, config_filename, helm_dir)

    output, rc = await self._local_async_exec(command=command, raise_exception_on_error=True)
    if not output:
        return []
    # yaml.load returns None for blank/comment-only documents: keep the list contract
    return yaml.load(output, Loader=yaml.SafeLoader) or []
218
async def repo_remove(
        self,
        cluster_uuid: str,
        name: str
):
    """
    Remove a repository from OSM

    :param cluster_uuid: the cluster
    :param name: repo name in OSM
    :return: True if successful (the helm command is run with raise_exception_on_error=True)
    """

    # the message used to be a copy of the one in repo_list
    self.log.debug('remove {} repositories for cluster {}'.format(name, cluster_uuid))

    # config filename
    kube_dir, helm_dir, config_filename, cluster_dir = \
        self._get_paths(cluster_name=cluster_uuid, create_if_not_exist=True)

    command = '{} --kubeconfig={} --home={} repo remove {}'\
        .format(self._helm_command, config_filename, helm_dir, name)

    await self._local_async_exec(command=command, raise_exception_on_error=True)

    # honour the documented return value
    return True
242
async def reset(
        self,
        cluster_uuid: str,
        force: bool = False,
        uninstall_sw: bool = False
) -> bool:
    """
    Reset the environment kept for a cluster: optionally uninstall its releases,
    optionally remove tiller, and finally delete the local cluster directory.

    :param cluster_uuid: the cluster
    :param force: uninstall the deployed releases instead of refusing to reset
    :param uninstall_sw: also delete the tiller deployment and run `helm reset`
    :return: True
    :raises K8sException: when the cluster has releases and force is False
    """

    self.log.debug('Resetting K8s environment. cluster uuid: {}'.format(cluster_uuid))

    # get kube and helm directories
    kube_dir, helm_dir, config_filename, cluster_dir = \
        self._get_paths(cluster_name=cluster_uuid, create_if_not_exist=False)

    # uninstall releases if needed. Truthiness test: a missing (None) list is handled
    # like an empty one instead of failing on len()
    releases = await self.instances_list(cluster_uuid=cluster_uuid)
    if releases:
        if not force:
            msg = 'Cluster has releases and not force. Cannot reset K8s environment. Cluster uuid: {}'\
                .format(cluster_uuid)
            self.log.error(msg)
            raise K8sException(msg)

        for r in releases:
            # bound before the try so the error message below can always be built
            kdu_instance = None
            try:
                kdu_instance = r.get('Name')
                chart = r.get('Chart')
                self.log.debug('Uninstalling {} -> {}'.format(chart, kdu_instance))
                await self.uninstall(cluster_uuid=cluster_uuid, kdu_instance=kdu_instance)
            except Exception as e:
                # best effort: keep going with the remaining releases
                self.log.error('Error uninstalling release {}: {}'.format(kdu_instance, e))

    if uninstall_sw:

        self.log.debug('Uninstalling tiller from cluster {}'.format(cluster_uuid))

        # find namespace for tiller pod
        command = '{} --kubeconfig={} get deployments --all-namespaces'\
            .format(self.kubectl_command, config_filename)
        output, rc = await self._local_async_exec(command=command, raise_exception_on_error=False)
        output_table = K8sHelmConnector._output_to_table(output=output)
        namespace = None
        for r in output_table:
            try:
                if 'tiller-deploy' in r[1]:
                    namespace = r[0]
                    break
            except Exception:
                # rows that cannot be inspected are skipped
                continue
        else:
            # loop finished without break: no tiller deployment in the listing
            msg = 'Tiller deployment not found in cluster {}'.format(cluster_uuid)
            self.log.error(msg)

        self.log.debug('namespace for tiller: {}'.format(namespace))

        force_str = '--force'

        if namespace:
            # delete tiller deployment
            self.log.debug('Deleting tiller deployment for cluster {}, namespace {}'.format(cluster_uuid, namespace))
            command = '{} --namespace {} --kubeconfig={} {} delete deployment tiller-deploy'\
                .format(self.kubectl_command, namespace, config_filename, force_str)
            await self._local_async_exec(command=command, raise_exception_on_error=False)

            # uninstall tiller from cluster
            self.log.debug('Uninstalling tiller from cluster {}'.format(cluster_uuid))
            command = '{} --kubeconfig={} --home={} reset'\
                .format(self._helm_command, config_filename, helm_dir)
            self.log.debug('resetting: {}'.format(command))
            await self._local_async_exec(command=command, raise_exception_on_error=True)
        else:
            self.log.debug('namespace not found')

    # delete cluster directory (local name no longer shadows the dir() builtin)
    cluster_path = self.fs.path + '/' + cluster_uuid
    self.log.debug('Removing directory {}'.format(cluster_path))
    shutil.rmtree(cluster_path, ignore_errors=True)

    return True
321
async def install(
        self,
        cluster_uuid: str,
        kdu_model: str,
        atomic: bool = True,
        timeout: float = 300,
        params: dict = None,
        db_dict: dict = None,
        kdu_name: str = None
):
    """
    Install a chart in a cluster under a freshly generated release name.

    :param cluster_uuid: the cluster
    :param kdu_model: chart reference; a single ':' separates an optional version
    :param atomic: add --atomic and keep writing the status while helm runs
    :param timeout: value for helm --timeout (omitted when falsy)
    :param params: values handed to _params_to_file_option
    :param db_dict: forwarded to _store_status
    :param kdu_name: accepted but not used by this method
    :return: the generated release name (kdu_instance)
    :raises K8sException: when helm exits with a non-zero return code
    """

    self.log.debug('installing {} in cluster {}'.format(kdu_model, cluster_uuid))

    # config filename
    kube_dir, helm_dir, config_filename, cluster_dir = \
        self._get_paths(cluster_name=cluster_uuid, create_if_not_exist=True)

    # params to str (may create a temporary values file that must be removed afterwards)
    params_str, file_to_delete = self._params_to_file_option(cluster_uuid=cluster_uuid, params=params)

    timeout_str = '--timeout {}'.format(timeout) if timeout else ''
    atomic_str = '--atomic' if atomic else ''

    # version: only "<chart>:<version>" (exactly one ':') is split
    version_str = ''
    if ':' in kdu_model:
        parts = kdu_model.split(sep=':')
        if len(parts) == 2:
            version_str = '--version {}'.format(parts[1])
            kdu_model = parts[0]

    try:
        # generate a name for the release. Then, check if already exists
        kdu_instance = None
        while kdu_instance is None:
            kdu_instance = K8sHelmConnector._generate_release_name(kdu_model)
            try:
                result = await self._status_kdu(
                    cluster_uuid=cluster_uuid,
                    kdu_instance=kdu_instance,
                    show_error_log=False
                )
                if result is not None:
                    # instance already exists: generate a new one
                    kdu_instance = None
            except K8sException:
                # a failing status query keeps the generated name
                pass

        # helm repo install
        command = '{} install {} --output yaml --kubeconfig={} --home={} {} {} --name={} {} {}'\
            .format(self._helm_command, atomic_str, config_filename, helm_dir,
                    params_str, timeout_str, kdu_instance, kdu_model, version_str)
        self.log.debug('installing: {}'.format(command))

        if atomic:
            # exec helm in a task
            exec_task = asyncio.ensure_future(
                coro_or_future=self._local_async_exec(command=command, raise_exception_on_error=False)
            )

            # write status in another task
            status_task = asyncio.ensure_future(
                coro_or_future=self._store_status(
                    cluster_uuid=cluster_uuid,
                    kdu_instance=kdu_instance,
                    db_dict=db_dict,
                    operation='install',
                    run_once=False
                )
            )

            try:
                # wait for execution task
                await asyncio.wait([exec_task])
            finally:
                # the status writer must not outlive this call, even if the wait is interrupted
                status_task.cancel()

            output, rc = exec_task.result()

        else:

            output, rc = await self._local_async_exec(command=command, raise_exception_on_error=False)

    finally:
        # remove temporal values yaml file, also when something above raised
        if file_to_delete:
            os.remove(file_to_delete)

    # write final status
    await self._store_status(
        cluster_uuid=cluster_uuid,
        kdu_instance=kdu_instance,
        db_dict=db_dict,
        operation='install',
        run_once=True,
        check_every=0
    )

    if rc != 0:
        msg = 'Error executing command: {}\nOutput: {}'.format(command, output)
        self.log.error(msg)
        raise K8sException(msg)

    self.log.debug('Returning kdu_instance {}'.format(kdu_instance))
    return kdu_instance
432
async def instances_list(
        self,
        cluster_uuid: str
) -> list:
    """
    returns a list of deployed releases in a cluster

    :param cluster_uuid: the cluster
    :return: the 'Releases' entry of the YAML printed by `helm list`, or an empty list
        when helm prints nothing or the entry is missing
    """

    self.log.debug('list releases for cluster {}'.format(cluster_uuid))

    # config filename
    kube_dir, helm_dir, config_filename, cluster_dir = \
        self._get_paths(cluster_name=cluster_uuid, create_if_not_exist=True)

    command = '{} --kubeconfig={} --home={} list --output yaml'\
        .format(self._helm_command, config_filename, helm_dir)

    output, rc = await self._local_async_exec(command=command, raise_exception_on_error=True)

    if not output:
        return []

    # both the document and its 'Releases' key may be absent: always hand back a list
    parsed = yaml.load(output, Loader=yaml.SafeLoader) or {}
    return parsed.get('Releases') or []
459
async def upgrade(
        self,
        cluster_uuid: str,
        kdu_instance: str,
        kdu_model: str = None,
        atomic: bool = True,
        timeout: float = 300,
        params: dict = None,
        db_dict: dict = None
):
    """
    Upgrade an existing release.

    :param cluster_uuid: the cluster
    :param kdu_instance: release name
    :param kdu_model: chart reference; a single ':' separates an optional version
    :param atomic: add --atomic and keep writing the status while helm runs
    :param timeout: value for helm --timeout (omitted when falsy)
    :param params: values handed to _params_to_file_option
    :param db_dict: forwarded to _store_status
    :return: revision reported by get_instance_info after the upgrade, 0 if no info is returned
    :raises K8sException: when helm exits with a non-zero return code
    """

    self.log.debug('upgrading {} in cluster {}'.format(kdu_model, cluster_uuid))

    # config filename
    kube_dir, helm_dir, config_filename, cluster_dir = \
        self._get_paths(cluster_name=cluster_uuid, create_if_not_exist=True)

    # params to str (may create a temporary values file that must be removed afterwards)
    params_str, file_to_delete = self._params_to_file_option(cluster_uuid=cluster_uuid, params=params)

    timeout_str = '--timeout {}'.format(timeout) if timeout else ''
    atomic_str = '--atomic' if atomic else ''

    # version: only "<chart>:<version>" (exactly one ':') is split
    version_str = ''
    if kdu_model and ':' in kdu_model:
        parts = kdu_model.split(sep=':')
        if len(parts) == 2:
            version_str = '--version {}'.format(parts[1])
            kdu_model = parts[0]

    # helm repo upgrade
    command = '{} upgrade {} --output yaml --kubeconfig={} --home={} {} {} {} {} {}'\
        .format(self._helm_command, atomic_str, config_filename, helm_dir,
                params_str, timeout_str, kdu_instance, kdu_model, version_str)
    self.log.debug('upgrading: {}'.format(command))

    try:
        if atomic:

            # exec helm in a task
            exec_task = asyncio.ensure_future(
                coro_or_future=self._local_async_exec(command=command, raise_exception_on_error=False)
            )
            # write status in another task
            status_task = asyncio.ensure_future(
                coro_or_future=self._store_status(
                    cluster_uuid=cluster_uuid,
                    kdu_instance=kdu_instance,
                    db_dict=db_dict,
                    operation='upgrade',
                    run_once=False
                )
            )

            try:
                # wait for execution task
                await asyncio.wait([exec_task])
            finally:
                # the status writer must not outlive this call, even if the wait is interrupted
                status_task.cancel()

            output, rc = exec_task.result()

        else:

            output, rc = await self._local_async_exec(command=command, raise_exception_on_error=False)

    finally:
        # remove temporal values yaml file, also when the execution raised
        if file_to_delete:
            os.remove(file_to_delete)

    # write final status
    await self._store_status(
        cluster_uuid=cluster_uuid,
        kdu_instance=kdu_instance,
        db_dict=db_dict,
        operation='upgrade',
        run_once=True,
        check_every=0
    )

    if rc != 0:
        msg = 'Error executing command: {}\nOutput: {}'.format(command, output)
        self.log.error(msg)
        raise K8sException(msg)

    # return new revision number
    instance = await self.get_instance_info(cluster_uuid=cluster_uuid, kdu_instance=kdu_instance)
    if instance:
        revision = int(instance.get('Revision'))
        self.log.debug('New revision: {}'.format(revision))
        return revision
    else:
        return 0
559
async def rollback(
        self,
        cluster_uuid: str,
        kdu_instance: str,
        revision=0,
        db_dict: dict = None
):
    """
    Roll a release back to a given revision, writing its status while helm runs.

    :param cluster_uuid: the cluster
    :param kdu_instance: release name
    :param revision: revision passed to `helm rollback`
    :param db_dict: forwarded to _store_status
    :return: revision reported by get_instance_info after the rollback, 0 if no info is returned
    :raises K8sException: when helm exits with a non-zero return code
    """

    self.log.debug('rollback kdu_instance {} to revision {} from cluster {}'
                   .format(kdu_instance, revision, cluster_uuid))

    # config filename
    kube_dir, helm_dir, config_filename, cluster_dir = \
        self._get_paths(cluster_name=cluster_uuid, create_if_not_exist=True)

    command = '{} rollback --kubeconfig={} --home={} {} {} --wait'\
        .format(self._helm_command, config_filename, helm_dir, kdu_instance, revision)

    # exec helm in a task
    exec_task = asyncio.ensure_future(
        coro_or_future=self._local_async_exec(command=command, raise_exception_on_error=False)
    )
    # write status in another task
    status_task = asyncio.ensure_future(
        coro_or_future=self._store_status(
            cluster_uuid=cluster_uuid,
            kdu_instance=kdu_instance,
            db_dict=db_dict,
            operation='rollback',
            run_once=False
        )
    )

    try:
        # wait for execution task
        await asyncio.wait([exec_task])
    finally:
        # the status writer must not outlive this call, even if the wait is interrupted
        status_task.cancel()

    output, rc = exec_task.result()

    # write final status
    await self._store_status(
        cluster_uuid=cluster_uuid,
        kdu_instance=kdu_instance,
        db_dict=db_dict,
        operation='rollback',
        run_once=True,
        check_every=0
    )

    if rc != 0:
        msg = 'Error executing command: {}\nOutput: {}'.format(command, output)
        self.log.error(msg)
        raise K8sException(msg)

    # return new revision number
    instance = await self.get_instance_info(cluster_uuid=cluster_uuid, kdu_instance=kdu_instance)
    if instance:
        revision = int(instance.get('Revision'))
        self.log.debug('New revision: {}'.format(revision))
        return revision
    else:
        return 0
624
async def uninstall(
        self,
        cluster_uuid: str,
        kdu_instance: str
):
    """
    Delete and purge an existing KDU instance (helm release) from a cluster.

    :param cluster_uuid: UUID of a K8s cluster known by OSM
    :param kdu_instance: unique name for the KDU instance to be deleted
    :return: the helm output converted by _output_to_table
    """

    self.log.debug('uninstall kdu_instance {} from cluster {}'.format(kdu_instance, cluster_uuid))

    # only the helm home and the kubeconfig file are needed here
    _, helm_home, kubeconfig, _ = self._get_paths(cluster_name=cluster_uuid, create_if_not_exist=True)

    purge_cmd = '{} --kubeconfig={} --home={} delete --purge {}'.format(
        self._helm_command, kubeconfig, helm_home, kdu_instance
    )

    # a failing helm command raises; the return code is not inspected here
    stdout, _ = await self._local_async_exec(command=purge_cmd, raise_exception_on_error=True)

    return self._output_to_table(stdout)
651
async def inspect_kdu(
        self,
        kdu_model: str,
        repo_url: str = None
) -> str:
    """
    Run `helm inspect` without a sub-command for a chart.

    :param kdu_model: chart reference
    :param repo_url: optional repository URL
    :return: whatever _exec_inspect_comand returns
    """

    self.log.debug('inspect kdu_model {} from (optional) repo: {}'.format(kdu_model, repo_url))

    # an empty sub-command selects the plain `helm inspect`
    chart_info = await self._exec_inspect_comand(
        inspect_command='',
        kdu_model=kdu_model,
        repo_url=repo_url
    )
    return chart_info
quilesj26c78a42019-10-28 18:10:42 +0100661
async def values_kdu(
        self,
        kdu_model: str,
        repo_url: str = None
) -> str:
    """
    Run `helm inspect values` for a chart.

    :param kdu_model: chart reference
    :param repo_url: optional repository URL
    :return: whatever _exec_inspect_comand returns
    """

    self.log.debug('inspect kdu_model values {} from (optional) repo: {}'.format(kdu_model, repo_url))

    chart_values = await self._exec_inspect_comand(
        inspect_command='values',
        kdu_model=kdu_model,
        repo_url=repo_url
    )
    return chart_values
quilesj26c78a42019-10-28 18:10:42 +0100671
async def help_kdu(
        self,
        kdu_model: str,
        repo_url: str = None
) -> str:
    """
    Run `helm inspect readme` for a chart.

    :param kdu_model: chart reference
    :param repo_url: optional repository URL
    :return: whatever _exec_inspect_comand returns
    """

    self.log.debug('inspect kdu_model {} readme.md from repo: {}'.format(kdu_model, repo_url))

    chart_readme = await self._exec_inspect_comand(
        inspect_command='readme',
        kdu_model=kdu_model,
        repo_url=repo_url
    )
    return chart_readme
quilesj26c78a42019-10-28 18:10:42 +0100681
async def status_kdu(
        self,
        cluster_uuid: str,
        kdu_instance: str
) -> str:
    """
    Public status query: delegates to _status_kdu with error logging and
    text output switched on.

    :param cluster_uuid: the cluster
    :param kdu_instance: release name
    :return: whatever _status_kdu returns
    """

    # fixed options of the public call
    options = {'show_error_log': True, 'return_text': True}
    status = await self._status_kdu(
        cluster_uuid=cluster_uuid,
        kdu_instance=kdu_instance,
        **options
    )
    return status
quilesj26c78a42019-10-28 18:10:42 +0100695
async def synchronize_repos(self, cluster_uuid: str):
    """
    Align the helm repositories of a cluster with the ones registered in the database.

    Repositories recorded as added ("_admin.helm_charts_added") but no longer registered
    ("_admin.helm_chart_repos") are removed; registered ones not yet added are added.
    Failures on a single repository are logged and do not abort the synchronization.

    :param cluster_uuid: value matched against "_admin.helm-chart.id" in "k8sclusters"
    :return: (list of repo ids handled as deleted, dict repo id -> name of repos added)
    :raises K8sException: when the cluster is not found or any unexpected error happens
    """

    self.log.debug("syncronize repos for cluster helm-id: {}".format(cluster_uuid))
    try:
        update_repos_timeout = 300  # max timeout to sync a single repos, more than this is too much
        db_k8scluster = self.db.get_one("k8sclusters", {"_admin.helm-chart.id": cluster_uuid})
        if not db_k8scluster:
            raise K8sException("k8cluster with helm-id : {} not found".format(cluster_uuid))

        # a missing "_admin" section is handled like an empty one
        admin = db_k8scluster.get("_admin") or {}
        nbi_repo_list = admin.get("helm_chart_repos") or []
        cluster_repo_dict = admin.get("helm_charts_added") or {}
        # elements that must be deleted
        deleted_repo_list = []
        added_repo_dict = {}
        self.log.debug("helm_chart_repos: {}".format(nbi_repo_list))
        self.log.debug("helm_charts_added: {}".format(cluster_repo_dict))

        # obtain repos to add: registered by nbi but not added
        repos_to_add = [repo for repo in nbi_repo_list if not cluster_repo_dict.get(repo)]

        # obtain repos to delete: added by cluster but not in nbi list
        repos_to_delete = [repo for repo in cluster_repo_dict if repo not in nbi_repo_list]

        # delete repos: must delete first then add because there may be different repos with same name but
        # different id and url
        self.log.debug("repos to delete: {}".format(repos_to_delete))
        for repo_id in repos_to_delete:
            # try to delete repos
            try:
                await asyncio.wait_for(
                    self.repo_remove(cluster_uuid=cluster_uuid, name=cluster_repo_dict[repo_id]),
                    update_repos_timeout
                )
            except Exception as e:
                self.log.warning("Error deleting repo, id: {}, name: {}, err_msg: {}".format(
                    repo_id, cluster_repo_dict[repo_id], str(e)))
            # always add to the list of to_delete if there is an error because if is not there deleting raises error
            deleted_repo_list.append(repo_id)

        # add repos
        self.log.debug("repos to add: {}".format(repos_to_add))
        for repo_id in repos_to_add:
            try:
                # obtain the repo data from the db
                # if there is an error getting the repo in the database we will ignore this repo and continue
                # because there is a possible race condition where the repo has been deleted while processing
                db_repo = self.db.get_one("k8srepos", {"_id": repo_id})
                self.log.debug("obtained repo: id, {}, name: {}, url: {}".format(
                    repo_id, db_repo["name"], db_repo["url"]))
                await asyncio.wait_for(
                    self.repo_add(cluster_uuid=cluster_uuid, name=db_repo["name"], url=db_repo["url"],
                                  repo_type="chart"),
                    update_repos_timeout
                )
                added_repo_dict[repo_id] = db_repo["name"]
                self.log.debug("added repo: id, {}, name: {}".format(repo_id, db_repo["name"]))
            except Exception as e:
                # deal with error adding repo, adding a repo that already exists does not raise any error
                # will not raise error because a wrong repos added by anyone could prevent instantiating any ns
                self.log.error("Error adding repo id: {}, err_msg: {} ".format(repo_id, repr(e)))

        return deleted_repo_list, added_repo_dict

    except Exception as e:
        self.log.error("Error synchronizing repos: {}".format(str(e)))
        raise K8sException("Error synchronizing repos") from e
761
quilesj26c78a42019-10-28 18:10:42 +0100762 """
763 ##################################################################################################
764 ########################################## P R I V A T E #########################################
765 ##################################################################################################
766 """
767
quilesj1be06302019-11-29 11:17:11 +0000768 async def _exec_inspect_comand(
769 self,
770 inspect_command: str,
771 kdu_model: str,
772 repo_url: str = None
773 ):
774
775 repo_str = ''
776 if repo_url:
777 repo_str = ' --repo {}'.format(repo_url)
778 idx = kdu_model.find('/')
779 if idx >= 0:
780 idx += 1
781 kdu_model = kdu_model[idx:]
782
783 inspect_command = '{} inspect {} {}{}'.format(self._helm_command, inspect_command, kdu_model, repo_str)
784 output, rc = await self._local_async_exec(command=inspect_command, encode_utf8=True)
785
786 return output
787
quilesj26c78a42019-10-28 18:10:42 +0100788 async def _status_kdu(
789 self,
790 cluster_uuid: str,
791 kdu_instance: str,
quilesj1be06302019-11-29 11:17:11 +0000792 show_error_log: bool = False,
793 return_text: bool = False
quilesj26c78a42019-10-28 18:10:42 +0100794 ):
795
Dominik Fleischmannbc269eb2020-02-27 10:04:34 +0100796 self.log.debug('status of kdu_instance {}'.format(kdu_instance))
quilesj26c78a42019-10-28 18:10:42 +0100797
798 # config filename
quilesjcda5f412019-11-18 11:32:12 +0100799 kube_dir, helm_dir, config_filename, cluster_dir = \
800 self._get_paths(cluster_name=cluster_uuid, create_if_not_exist=True)
quilesj26c78a42019-10-28 18:10:42 +0100801
802 command = '{} --kubeconfig={} --home={} status {} --output yaml'\
803 .format(self._helm_command, config_filename, helm_dir, kdu_instance)
804
805 output, rc = await self._local_async_exec(
806 command=command,
807 raise_exception_on_error=True,
808 show_error_log=show_error_log
809 )
810
quilesj1be06302019-11-29 11:17:11 +0000811 if return_text:
812 return str(output)
813
quilesj26c78a42019-10-28 18:10:42 +0100814 if rc != 0:
815 return None
816
817 data = yaml.load(output, Loader=yaml.SafeLoader)
818
819 # remove field 'notes'
820 try:
821 del data.get('info').get('status')['notes']
822 except KeyError:
823 pass
824
825 # parse field 'resources'
826 try:
827 resources = str(data.get('info').get('status').get('resources'))
828 resource_table = self._output_to_table(resources)
829 data.get('info').get('status')['resources'] = resource_table
830 except Exception as e:
831 pass
832
833 return data
834
async def get_instance_info(
        self,
        cluster_uuid: str,
        kdu_instance: str
):
    """
    Look up one release in the list of instances of a cluster.

    :param cluster_uuid: cluster whose instances are listed through instances_list
    :param kdu_instance: value compared against the 'Name' field of each instance
    :return: the first instance whose 'Name' equals kdu_instance, or None (after
        logging a debug message) when there is no such instance
    """
    all_instances = await self.instances_list(cluster_uuid=cluster_uuid)
    match = next(
        (candidate for candidate in all_instances if candidate.get('Name') == kdu_instance),
        None
    )
    if match is None:
        self.log.debug('Instance {} not found'.format(kdu_instance))
    return match
846
847 @staticmethod
848 def _generate_release_name(
849 chart_name: str
850 ):
quilesj8d780a92020-01-23 09:28:26 +0000851 # check embeded chart (file or dir)
852 if chart_name.startswith('/'):
853 # extract file or directory name
854 chart_name = chart_name[chart_name.rfind('/')+1:]
855 # check URL
856 elif '://' in chart_name:
857 # extract last portion of URL
858 chart_name = chart_name[chart_name.rfind('/')+1:]
859
quilesj26c78a42019-10-28 18:10:42 +0100860 name = ''
861 for c in chart_name:
862 if c.isalpha() or c.isnumeric():
863 name += c
864 else:
865 name += '-'
866 if len(name) > 35:
867 name = name[0:35]
868
869 # if does not start with alpha character, prefix 'a'
870 if not name[0].isalpha():
871 name = 'a' + name
872
873 name += '-'
874
875 def get_random_number():
876 r = random.randrange(start=1, stop=99999999)
877 s = str(r)
quilesja6748412019-12-04 07:51:26 +0000878 s = s.rjust(10, '0')
quilesj26c78a42019-10-28 18:10:42 +0100879 return s
880
881 name = name + get_random_number()
882 return name.lower()
883
884 async def _store_status(
885 self,
886 cluster_uuid: str,
887 operation: str,
888 kdu_instance: str,
889 check_every: float = 10,
890 db_dict: dict = None,
891 run_once: bool = False
892 ):
893 while True:
894 try:
895 await asyncio.sleep(check_every)
896 detailed_status = await self.status_kdu(cluster_uuid=cluster_uuid, kdu_instance=kdu_instance)
897 status = detailed_status.get('info').get('Description')
Dominik Fleischmannbc269eb2020-02-27 10:04:34 +0100898 self.log.debug('STATUS:\n{}'.format(status))
899 self.log.debug('DETAILED STATUS:\n{}'.format(detailed_status))
quilesj26c78a42019-10-28 18:10:42 +0100900 # write status to db
901 result = await self.write_app_status_to_db(
902 db_dict=db_dict,
903 status=str(status),
904 detailed_status=str(detailed_status),
905 operation=operation)
906 if not result:
Dominik Fleischmannbc269eb2020-02-27 10:04:34 +0100907 self.log.info('Error writing in database. Task exiting...')
quilesj26c78a42019-10-28 18:10:42 +0100908 return
909 except asyncio.CancelledError:
Dominik Fleischmannbc269eb2020-02-27 10:04:34 +0100910 self.log.debug('Task cancelled')
quilesj26c78a42019-10-28 18:10:42 +0100911 return
912 except Exception as e:
Dominik Fleischmannbc269eb2020-02-27 10:04:34 +0100913 self.log.debug('_store_status exception: {}'.format(str(e)))
quilesj26c78a42019-10-28 18:10:42 +0100914 pass
915 finally:
916 if run_once:
917 return
918
919 async def _is_install_completed(
920 self,
921 cluster_uuid: str,
922 kdu_instance: str
923 ) -> bool:
924
quilesj1be06302019-11-29 11:17:11 +0000925 status = await self._status_kdu(cluster_uuid=cluster_uuid, kdu_instance=kdu_instance, return_text=False)
quilesj26c78a42019-10-28 18:10:42 +0100926
927 # extract info.status.resources-> str
928 # format:
929 # ==> v1/Deployment
930 # NAME READY UP-TO-DATE AVAILABLE AGE
931 # halting-horse-mongodb 0/1 1 0 0s
932 # halting-petit-mongodb 1/1 1 0 0s
933 # blank line
934 resources = K8sHelmConnector._get_deep(status, ('info', 'status', 'resources'))
935
936 # convert to table
937 resources = K8sHelmConnector._output_to_table(resources)
938
939 num_lines = len(resources)
940 index = 0
941 while index < num_lines:
942 try:
943 line1 = resources[index]
944 index += 1
945 # find '==>' in column 0
946 if line1[0] == '==>':
947 line2 = resources[index]
948 index += 1
949 # find READY in column 1
950 if line2[1] == 'READY':
951 # read next lines
952 line3 = resources[index]
953 index += 1
954 while len(line3) > 1 and index < num_lines:
955 ready_value = line3[1]
956 parts = ready_value.split(sep='/')
957 current = int(parts[0])
958 total = int(parts[1])
959 if current < total:
Dominik Fleischmannbc269eb2020-02-27 10:04:34 +0100960 self.log.debug('NOT READY:\n {}'.format(line3))
quilesj26c78a42019-10-28 18:10:42 +0100961 ready = False
962 line3 = resources[index]
963 index += 1
964
965 except Exception as e:
966 pass
967
968 return ready
969
970 @staticmethod
971 def _get_deep(dictionary: dict, members: tuple):
972 target = dictionary
973 value = None
974 try:
975 for m in members:
976 value = target.get(m)
977 if not value:
978 return None
979 else:
980 target = value
981 except Exception as e:
982 pass
983 return value
984
985 # find key:value in several lines
986 @staticmethod
987 def _find_in_lines(p_lines: list, p_key: str) -> str:
988 for line in p_lines:
989 try:
990 if line.startswith(p_key + ':'):
991 parts = line.split(':')
992 the_value = parts[1].strip()
993 return the_value
994 except Exception as e:
995 # ignore it
996 pass
997 return None
998
quilesjcda5f412019-11-18 11:32:12 +0100999 # params for use in -f file
1000 # returns values file option and filename (in order to delete it at the end)
1001 def _params_to_file_option(self, cluster_uuid: str, params: dict) -> (str, str):
quilesjcda5f412019-11-18 11:32:12 +01001002
1003 if params and len(params) > 0:
1004 kube_dir, helm_dir, config_filename, cluster_dir = \
1005 self._get_paths(cluster_name=cluster_uuid, create_if_not_exist=True)
1006
1007 def get_random_number():
1008 r = random.randrange(start=1, stop=99999999)
1009 s = str(r)
1010 while len(s) < 10:
1011 s = '0' + s
1012 return s
1013
1014 params2 = dict()
1015 for key in params:
1016 value = params.get(key)
1017 if '!!yaml' in str(value):
quilesj1be06302019-11-29 11:17:11 +00001018 value = yaml.load(value[7:])
quilesjcda5f412019-11-18 11:32:12 +01001019 params2[key] = value
1020
1021 values_file = get_random_number() + '.yaml'
1022 with open(values_file, 'w') as stream:
1023 yaml.dump(params2, stream, indent=4, default_flow_style=False)
1024
1025 return '-f {}'.format(values_file), values_file
1026
1027 return '', None
1028
quilesj26c78a42019-10-28 18:10:42 +01001029 # params for use in --set option
1030 @staticmethod
1031 def _params_to_set_option(params: dict) -> str:
1032 params_str = ''
1033 if params and len(params) > 0:
1034 start = True
1035 for key in params:
1036 value = params.get(key, None)
1037 if value is not None:
1038 if start:
1039 params_str += '--set '
1040 start = False
1041 else:
1042 params_str += ','
1043 params_str += '{}={}'.format(key, value)
1044 return params_str
1045
1046 @staticmethod
1047 def _output_to_lines(output: str) -> list:
1048 output_lines = list()
1049 lines = output.splitlines(keepends=False)
1050 for line in lines:
1051 line = line.strip()
1052 if len(line) > 0:
1053 output_lines.append(line)
1054 return output_lines
1055
1056 @staticmethod
1057 def _output_to_table(output: str) -> list:
1058 output_table = list()
1059 lines = output.splitlines(keepends=False)
1060 for line in lines:
1061 line = line.replace('\t', ' ')
1062 line_list = list()
1063 output_table.append(line_list)
1064 cells = line.split(sep=' ')
1065 for cell in cells:
1066 cell = cell.strip()
1067 if len(cell) > 0:
1068 line_list.append(cell)
1069 return output_table
1070
quilesjcda5f412019-11-18 11:32:12 +01001071 def _get_paths(self, cluster_name: str, create_if_not_exist: bool = False) -> (str, str, str, str):
quilesj26c78a42019-10-28 18:10:42 +01001072 """
1073 Returns kube and helm directories
1074
1075 :param cluster_name:
1076 :param create_if_not_exist:
quilesjcda5f412019-11-18 11:32:12 +01001077 :return: kube, helm directories, config filename and cluster dir.
1078 Raises exception if not exist and cannot create
quilesj26c78a42019-10-28 18:10:42 +01001079 """
1080
1081 base = self.fs.path
1082 if base.endswith("/") or base.endswith("\\"):
1083 base = base[:-1]
1084
1085 # base dir for cluster
1086 cluster_dir = base + '/' + cluster_name
1087 if create_if_not_exist and not os.path.exists(cluster_dir):
Dominik Fleischmannbc269eb2020-02-27 10:04:34 +01001088 self.log.debug('Creating dir {}'.format(cluster_dir))
quilesj26c78a42019-10-28 18:10:42 +01001089 os.makedirs(cluster_dir)
1090 if not os.path.exists(cluster_dir):
1091 msg = 'Base cluster dir {} does not exist'.format(cluster_dir)
Dominik Fleischmannbc269eb2020-02-27 10:04:34 +01001092 self.log.error(msg)
quilesja6748412019-12-04 07:51:26 +00001093 raise K8sException(msg)
quilesj26c78a42019-10-28 18:10:42 +01001094
1095 # kube dir
1096 kube_dir = cluster_dir + '/' + '.kube'
1097 if create_if_not_exist and not os.path.exists(kube_dir):
Dominik Fleischmannbc269eb2020-02-27 10:04:34 +01001098 self.log.debug('Creating dir {}'.format(kube_dir))
quilesj26c78a42019-10-28 18:10:42 +01001099 os.makedirs(kube_dir)
1100 if not os.path.exists(kube_dir):
1101 msg = 'Kube config dir {} does not exist'.format(kube_dir)
Dominik Fleischmannbc269eb2020-02-27 10:04:34 +01001102 self.log.error(msg)
quilesja6748412019-12-04 07:51:26 +00001103 raise K8sException(msg)
quilesj26c78a42019-10-28 18:10:42 +01001104
1105 # helm home dir
1106 helm_dir = cluster_dir + '/' + '.helm'
1107 if create_if_not_exist and not os.path.exists(helm_dir):
Dominik Fleischmannbc269eb2020-02-27 10:04:34 +01001108 self.log.debug('Creating dir {}'.format(helm_dir))
quilesj26c78a42019-10-28 18:10:42 +01001109 os.makedirs(helm_dir)
1110 if not os.path.exists(helm_dir):
1111 msg = 'Helm config dir {} does not exist'.format(helm_dir)
Dominik Fleischmannbc269eb2020-02-27 10:04:34 +01001112 self.log.error(msg)
quilesja6748412019-12-04 07:51:26 +00001113 raise K8sException(msg)
quilesj26c78a42019-10-28 18:10:42 +01001114
1115 config_filename = kube_dir + '/config'
quilesjcda5f412019-11-18 11:32:12 +01001116 return kube_dir, helm_dir, config_filename, cluster_dir
quilesj26c78a42019-10-28 18:10:42 +01001117
1118 @staticmethod
1119 def _remove_multiple_spaces(str):
1120 str = str.strip()
1121 while ' ' in str:
1122 str = str.replace(' ', ' ')
1123 return str
1124
1125 def _local_exec(
1126 self,
1127 command: str
1128 ) -> (str, int):
1129 command = K8sHelmConnector._remove_multiple_spaces(command)
Dominik Fleischmannbc269eb2020-02-27 10:04:34 +01001130 self.log.debug('Executing sync local command: {}'.format(command))
quilesj26c78a42019-10-28 18:10:42 +01001131 # raise exception if fails
1132 output = ''
1133 try:
1134 output = subprocess.check_output(command, shell=True, universal_newlines=True)
1135 return_code = 0
Dominik Fleischmannbc269eb2020-02-27 10:04:34 +01001136 self.log.debug(output)
quilesj26c78a42019-10-28 18:10:42 +01001137 except Exception as e:
1138 return_code = 1
1139
1140 return output, return_code
1141
1142 async def _local_async_exec(
1143 self,
1144 command: str,
1145 raise_exception_on_error: bool = False,
quilesj1be06302019-11-29 11:17:11 +00001146 show_error_log: bool = True,
1147 encode_utf8: bool = False
quilesj26c78a42019-10-28 18:10:42 +01001148 ) -> (str, int):
1149
1150 command = K8sHelmConnector._remove_multiple_spaces(command)
Dominik Fleischmannbc269eb2020-02-27 10:04:34 +01001151 self.log.debug('Executing async local command: {}'.format(command))
quilesj26c78a42019-10-28 18:10:42 +01001152
1153 # split command
1154 command = command.split(sep=' ')
1155
1156 try:
1157 process = await asyncio.create_subprocess_exec(
1158 *command,
1159 stdout=asyncio.subprocess.PIPE,
1160 stderr=asyncio.subprocess.PIPE
1161 )
1162
1163 # wait for command terminate
1164 stdout, stderr = await process.communicate()
1165
1166 return_code = process.returncode
1167
1168 output = ''
1169 if stdout:
1170 output = stdout.decode('utf-8').strip()
quilesj1be06302019-11-29 11:17:11 +00001171 # output = stdout.decode()
quilesj26c78a42019-10-28 18:10:42 +01001172 if stderr:
1173 output = stderr.decode('utf-8').strip()
quilesj1be06302019-11-29 11:17:11 +00001174 # output = stderr.decode()
quilesj26c78a42019-10-28 18:10:42 +01001175
1176 if return_code != 0 and show_error_log:
Dominik Fleischmannbc269eb2020-02-27 10:04:34 +01001177 self.log.debug('Return code (FAIL): {}\nOutput:\n{}'.format(return_code, output))
quilesj26c78a42019-10-28 18:10:42 +01001178 else:
Dominik Fleischmannbc269eb2020-02-27 10:04:34 +01001179 self.log.debug('Return code: {}'.format(return_code))
quilesj26c78a42019-10-28 18:10:42 +01001180
1181 if raise_exception_on_error and return_code != 0:
tierno41d639e2020-02-04 15:26:25 +00001182 raise K8sException(output)
quilesj26c78a42019-10-28 18:10:42 +01001183
quilesj1be06302019-11-29 11:17:11 +00001184 if encode_utf8:
1185 output = output.encode('utf-8').strip()
1186 output = str(output).replace('\\n', '\n')
1187
quilesj26c78a42019-10-28 18:10:42 +01001188 return output, return_code
1189
lloretgallegb4b317c2020-02-26 10:00:16 +01001190 except asyncio.CancelledError:
1191 raise
tierno41d639e2020-02-04 15:26:25 +00001192 except K8sException:
1193 raise
quilesj26c78a42019-10-28 18:10:42 +01001194 except Exception as e:
1195 msg = 'Exception executing command: {} -> {}'.format(command, e)
Dominik Fleischmannbc269eb2020-02-27 10:04:34 +01001196 self.log.error(msg)
quilesj23451b82020-01-23 16:30:04 +00001197 if raise_exception_on_error:
tierno41d639e2020-02-04 15:26:25 +00001198 raise K8sException(e) from e
quilesj23451b82020-01-23 16:30:04 +00001199 else:
1200 return '', -1
quilesj26c78a42019-10-28 18:10:42 +01001201
quilesj26c78a42019-10-28 18:10:42 +01001202 def _check_file_exists(self, filename: str, exception_if_not_exists: bool = False):
tiernoe2bd3da2020-03-26 09:51:11 +00001203 # self.log.debug('Checking if file {} exists...'.format(filename))
quilesj26c78a42019-10-28 18:10:42 +01001204 if os.path.exists(filename):
1205 return True
1206 else:
1207 msg = 'File {} does not exist'.format(filename)
1208 if exception_if_not_exists:
tiernoe2bd3da2020-03-26 09:51:11 +00001209 # self.log.error(msg)
quilesja6748412019-12-04 07:51:26 +00001210 raise K8sException(msg)
lloretgallegf00dcae2020-02-20 12:01:17 +01001211
1212