X-Git-Url: https://osm.etsi.org/gitweb/?p=osm%2FN2VC.git;a=blobdiff_plain;f=n2vc%2Fk8s_helm_conn.py;h=417604ad98f3f52e8c1138ae30e34eb75fc885c3;hp=88c94c5c49e535ef58fbf0b3d987a373d86fb181;hb=8d780a9e0414686f0c91be6dc460275fb0155f89;hpb=3957ba32d3a898b7ccbb7ec1bd0c7df8fd88f367 diff --git a/n2vc/k8s_helm_conn.py b/n2vc/k8s_helm_conn.py index 88c94c5..417604a 100644 --- a/n2vc/k8s_helm_conn.py +++ b/n2vc/k8s_helm_conn.py @@ -25,12 +25,12 @@ import subprocess import os import shutil import asyncio -import uuid import time import yaml from uuid import uuid4 import random from n2vc.k8s_conn import K8sConnector +from n2vc.exceptions import K8sException class K8sHelmConnector(K8sConnector): @@ -84,6 +84,16 @@ class K8sHelmConnector(K8sConnector): self._helm_command = helm_command self._check_file_exists(filename=helm_command, exception_if_not_exists=True) + # initialize helm client-only + self.debug('Initializing helm client-only...') + command = '{} init --client-only'.format(self._helm_command) + try: + asyncio.ensure_future(self._local_async_exec(command=command, raise_exception_on_error=False)) + # loop = asyncio.get_event_loop() + # loop.run_until_complete(self._local_async_exec(command=command, raise_exception_on_error=False)) + except Exception as e: + self.warning(msg='helm init failed (it was already initialized): {}'.format(e)) + self.info('K8S Helm connector initialized') async def init_env( @@ -100,7 +110,8 @@ class K8sHelmConnector(K8sConnector): self.debug('Initializing K8S environment. namespace: {}'.format(namespace)) # create config filename - kube_dir, helm_dir, config_filename = self._get_paths(cluster_name=cluster_uuid, create_if_not_exist=True) + kube_dir, helm_dir, config_filename, cluster_dir = \ + self._get_paths(cluster_name=cluster_uuid, create_if_not_exist=True) f = open(config_filename, "w") f.write(k8s_creds) f.close() @@ -156,7 +167,8 @@ class K8sHelmConnector(K8sConnector): self.debug('adding {} repository {}. URL: {}'.format(repo_type, name, url)) # config filename - kube_dir, helm_dir, config_filename = self._get_paths(cluster_name=cluster_uuid, create_if_not_exist=True) + kube_dir, helm_dir, config_filename, cluster_dir = \ + self._get_paths(cluster_name=cluster_uuid, create_if_not_exist=True) # helm repo update command = '{} --kubeconfig={} --home={} repo update'.format(self._helm_command, config_filename, helm_dir) @@ -182,9 +194,11 @@ class K8sHelmConnector(K8sConnector): self.debug('list repositories for cluster {}'.format(cluster_uuid)) # config filename - kube_dir, helm_dir, config_filename = self._get_paths(cluster_name=cluster_uuid, create_if_not_exist=True) + kube_dir, helm_dir, config_filename, cluster_dir = \ + self._get_paths(cluster_name=cluster_uuid, create_if_not_exist=True) - command = '{} --kubeconfig={} --home={} repo list --output yaml'.format(self._helm_command, config_filename, helm_dir) + command = '{} --kubeconfig={} --home={} repo list --output yaml'\ + .format(self._helm_command, config_filename, helm_dir) output, rc = await self._local_async_exec(command=command, raise_exception_on_error=True) if output and len(output) > 0: @@ -208,7 +222,8 @@ class K8sHelmConnector(K8sConnector): self.debug('list repositories for cluster {}'.format(cluster_uuid)) # config filename - kube_dir, helm_dir, config_filename = self._get_paths(cluster_name=cluster_uuid, create_if_not_exist=True) + kube_dir, helm_dir, config_filename, cluster_dir = \ + self._get_paths(cluster_name=cluster_uuid, create_if_not_exist=True) command = '{} --kubeconfig={} --home={} repo remove {}'\ .format(self._helm_command, config_filename, helm_dir, name) @@ -225,7 +240,8 @@ class K8sHelmConnector(K8sConnector): self.debug('Resetting K8s environment. cluster uuid: {}'.format(cluster_uuid)) # get kube and helm directories - kube_dir, helm_dir, config_filename = self._get_paths(cluster_name=cluster_uuid, create_if_not_exist=False) + kube_dir, helm_dir, config_filename, cluster_dir = \ + self._get_paths(cluster_name=cluster_uuid, create_if_not_exist=False) # uninstall releases if needed releases = await self.instances_list(cluster_uuid=cluster_uuid) @@ -243,7 +259,7 @@ class K8sHelmConnector(K8sConnector): msg = 'Cluster has releases and not force. Cannot reset K8s environment. Cluster uuid: {}'\ .format(cluster_uuid) self.error(msg) - raise Exception(msg) + raise K8sException(msg) if uninstall_sw: @@ -265,7 +281,6 @@ class K8sHelmConnector(K8sConnector): else: msg = 'Tiller deployment not found in cluster {}'.format(cluster_uuid) self.error(msg) - # raise Exception(msg) self.debug('namespace for tiller: {}'.format(namespace)) @@ -306,14 +321,13 @@ class K8sHelmConnector(K8sConnector): self.debug('installing {} in cluster {}'.format(kdu_model, cluster_uuid)) - start = time.time() - end = start + timeout - # config filename - kube_dir, helm_dir, config_filename = self._get_paths(cluster_name=cluster_uuid, create_if_not_exist=True) + kube_dir, helm_dir, config_filename, cluster_dir = \ + self._get_paths(cluster_name=cluster_uuid, create_if_not_exist=True) # params to str - params_str = K8sHelmConnector._params_to_set_option(params) + # params_str = K8sHelmConnector._params_to_set_option(params) + params_str, file_to_delete = self._params_to_file_option(cluster_uuid=cluster_uuid, params=params) timeout_str = '' if timeout: @@ -332,7 +346,7 @@ class K8sHelmConnector(K8sConnector): version_str = '--version {}'.format(parts[1]) kdu_model = parts[0] - # generate a name for the releas. Then, check if already exists + # generate a name for the release. Then, check if already exists kdu_instance = None while kdu_instance is None: kdu_instance = K8sHelmConnector._generate_release_name(kdu_model) @@ -345,7 +359,7 @@ class K8sHelmConnector(K8sConnector): if result is not None: # instance already exists: generate a new one kdu_instance = None - except: + except Exception as e: kdu_instance = None # helm repo install @@ -382,6 +396,10 @@ class K8sHelmConnector(K8sConnector): output, rc = await self._local_async_exec(command=command, raise_exception_on_error=False) + # remove temporal values yaml file + if file_to_delete: + os.remove(file_to_delete) + # write final status await self._store_status( cluster_uuid=cluster_uuid, @@ -395,7 +413,7 @@ class K8sHelmConnector(K8sConnector): if rc != 0: msg = 'Error executing command: {}\nOutput: {}'.format(command, output) self.error(msg) - raise Exception(msg) + raise K8sException(msg) self.debug('Returning kdu_instance {}'.format(kdu_instance)) return kdu_instance @@ -414,7 +432,8 @@ class K8sHelmConnector(K8sConnector): self.debug('list releases for cluster {}'.format(cluster_uuid)) # config filename - kube_dir, helm_dir, config_filename = self._get_paths(cluster_name=cluster_uuid, create_if_not_exist=True) + kube_dir, helm_dir, config_filename, cluster_dir = \ + self._get_paths(cluster_name=cluster_uuid, create_if_not_exist=True) command = '{} --kubeconfig={} --home={} list --output yaml'\ .format(self._helm_command, config_filename, helm_dir) @@ -439,14 +458,13 @@ class K8sHelmConnector(K8sConnector): self.debug('upgrading {} in cluster {}'.format(kdu_model, cluster_uuid)) - start = time.time() - end = start + timeout - # config filename - kube_dir, helm_dir, config_filename = self._get_paths(cluster_name=cluster_uuid, create_if_not_exist=True) + kube_dir, helm_dir, config_filename, cluster_dir = \ + self._get_paths(cluster_name=cluster_uuid, create_if_not_exist=True) # params to str - params_str = K8sHelmConnector._params_to_set_option(params) + # params_str = K8sHelmConnector._params_to_set_option(params) + params_str, file_to_delete = self._params_to_file_option(cluster_uuid=cluster_uuid, params=params) timeout_str = '' if timeout: @@ -459,7 +477,7 @@ class K8sHelmConnector(K8sConnector): # version version_str = '' - if ':' in kdu_model: + if kdu_model and ':' in kdu_model: parts = kdu_model.split(sep=':') if len(parts) == 2: version_str = '--version {}'.format(parts[1]) @@ -489,7 +507,7 @@ class K8sHelmConnector(K8sConnector): ) # wait for execution task - await asyncio.wait([ exec_task ]) + await asyncio.wait([exec_task]) # cancel status task status_task.cancel() @@ -499,6 +517,10 @@ class K8sHelmConnector(K8sConnector): output, rc = await self._local_async_exec(command=command, raise_exception_on_error=False) + # remove temporal values yaml file + if file_to_delete: + os.remove(file_to_delete) + # write final status await self._store_status( cluster_uuid=cluster_uuid, @@ -512,7 +534,7 @@ class K8sHelmConnector(K8sConnector): if rc != 0: msg = 'Error executing command: {}\nOutput: {}'.format(command, output) self.error(msg) - raise Exception(msg) + raise K8sException(msg) # return new revision number instance = await self.get_instance_info(cluster_uuid=cluster_uuid, kdu_instance=kdu_instance) @@ -535,7 +557,8 @@ class K8sHelmConnector(K8sConnector): .format(kdu_instance, revision, cluster_uuid)) # config filename - kube_dir, helm_dir, config_filename = self._get_paths(cluster_name=cluster_uuid, create_if_not_exist=True) + kube_dir, helm_dir, config_filename, cluster_dir = \ + self._get_paths(cluster_name=cluster_uuid, create_if_not_exist=True) command = '{} rollback --kubeconfig={} --home={} {} {} --wait'\ .format(self._helm_command, config_filename, helm_dir, kdu_instance, revision) @@ -576,7 +599,7 @@ class K8sHelmConnector(K8sConnector): if rc != 0: msg = 'Error executing command: {}\nOutput: {}'.format(command, output) self.error(msg) - raise Exception(msg) + raise K8sException(msg) # return new revision number instance = await self.get_instance_info(cluster_uuid=cluster_uuid, kdu_instance=kdu_instance) @@ -604,7 +627,8 @@ class K8sHelmConnector(K8sConnector): self.debug('uninstall kdu_instance {} from cluster {}'.format(kdu_instance, cluster_uuid)) # config filename - kube_dir, helm_dir, config_filename = self._get_paths(cluster_name=cluster_uuid, create_if_not_exist=True) + kube_dir, helm_dir, config_filename, cluster_dir = \ + self._get_paths(cluster_name=cluster_uuid, create_if_not_exist=True) command = '{} --kubeconfig={} --home={} delete --purge {}'\ .format(self._helm_command, config_filename, helm_dir, kdu_instance) @@ -615,40 +639,47 @@ class K8sHelmConnector(K8sConnector): async def inspect_kdu( self, - kdu_model: str + kdu_model: str, + repo_url: str = None ) -> str: - self.debug('inspect kdu_model {}'.format(kdu_model)) + self.debug('inspect kdu_model {} from (optional) repo: {}'.format(kdu_model, repo_url)) - command = '{} inspect values {}'\ - .format(self._helm_command, kdu_model) + return await self._exec_inspect_comand(inspect_command='', kdu_model=kdu_model, repo_url=repo_url) - output, rc = await self._local_async_exec(command=command) + async def values_kdu( + self, + kdu_model: str, + repo_url: str = None + ) -> str: - return output + self.debug('inspect kdu_model values {} from (optional) repo: {}'.format(kdu_model, repo_url)) + + return await self._exec_inspect_comand(inspect_command='values', kdu_model=kdu_model, repo_url=repo_url) async def help_kdu( self, - kdu_model: str - ): - - self.debug('help kdu_model {}'.format(kdu_model)) - - command = '{} inspect readme {}'\ - .format(self._helm_command, kdu_model) + kdu_model: str, + repo_url: str = None + ) -> str: - output, rc = await self._local_async_exec(command=command, raise_exception_on_error=True) + self.debug('inspect kdu_model {} readme.md from repo: {}'.format(kdu_model, repo_url)) - return output + return await self._exec_inspect_comand(inspect_command='readme', kdu_model=kdu_model, repo_url=repo_url) async def status_kdu( self, cluster_uuid: str, kdu_instance: str - ): - - return await self._status_kdu(cluster_uuid=cluster_uuid, kdu_instance=kdu_instance, show_error_log=True) + ) -> str: + # call internal function + return await self._status_kdu( + cluster_uuid=cluster_uuid, + kdu_instance=kdu_instance, + show_error_log=True, + return_text=True + ) """ ################################################################################################## @@ -656,17 +687,39 @@ class K8sHelmConnector(K8sConnector): ################################################################################################## """ + async def _exec_inspect_comand( + self, + inspect_command: str, + kdu_model: str, + repo_url: str = None + ): + + repo_str = '' + if repo_url: + repo_str = ' --repo {}'.format(repo_url) + idx = kdu_model.find('/') + if idx >= 0: + idx += 1 + kdu_model = kdu_model[idx:] + + inspect_command = '{} inspect {} {}{}'.format(self._helm_command, inspect_command, kdu_model, repo_str) + output, rc = await self._local_async_exec(command=inspect_command, encode_utf8=True) + + return output + async def _status_kdu( self, cluster_uuid: str, kdu_instance: str, - show_error_log: bool = False + show_error_log: bool = False, + return_text: bool = False ): self.debug('status of kdu_instance {}'.format(kdu_instance)) # config filename - kube_dir, helm_dir, config_filename = self._get_paths(cluster_name=cluster_uuid, create_if_not_exist=True) + kube_dir, helm_dir, config_filename, cluster_dir = \ + self._get_paths(cluster_name=cluster_uuid, create_if_not_exist=True) command = '{} --kubeconfig={} --home={} status {} --output yaml'\ .format(self._helm_command, config_filename, helm_dir, kdu_instance) @@ -677,6 +730,9 @@ class K8sHelmConnector(K8sConnector): show_error_log=show_error_log ) + if return_text: + return str(output) + if rc != 0: return None @@ -698,7 +754,6 @@ class K8sHelmConnector(K8sConnector): return data - async def get_instance_info( self, cluster_uuid: str, @@ -715,6 +770,15 @@ class K8sHelmConnector(K8sConnector): def _generate_release_name( chart_name: str ): + # check embeded chart (file or dir) + if chart_name.startswith('/'): + # extract file or directory name + chart_name = chart_name[chart_name.rfind('/')+1:] + # check URL + elif '://' in chart_name: + # extract last portion of URL + chart_name = chart_name[chart_name.rfind('/')+1:] + name = '' for c in chart_name: if c.isalpha() or c.isnumeric(): @@ -733,8 +797,7 @@ class K8sHelmConnector(K8sConnector): def get_random_number(): r = random.randrange(start=1, stop=99999999) s = str(r) - while len(s) < 10: - s = '0' + s + s = s.rjust(10, '0') return s name = name + get_random_number() @@ -782,7 +845,7 @@ class K8sHelmConnector(K8sConnector): kdu_instance: str ) -> bool: - status = await self.status_kdu(cluster_uuid=cluster_uuid, kdu_instance=kdu_instance) + status = await self._status_kdu(cluster_uuid=cluster_uuid, kdu_instance=kdu_instance, return_text=False) # extract info.status.resources-> str # format: @@ -856,6 +919,36 @@ class K8sHelmConnector(K8sConnector): pass return None + # params for use in -f file + # returns values file option and filename (in order to delete it at the end) + def _params_to_file_option(self, cluster_uuid: str, params: dict) -> (str, str): + + if params and len(params) > 0: + kube_dir, helm_dir, config_filename, cluster_dir = \ + self._get_paths(cluster_name=cluster_uuid, create_if_not_exist=True) + + def get_random_number(): + r = random.randrange(start=1, stop=99999999) + s = str(r) + while len(s) < 10: + s = '0' + s + return s + + params2 = dict() + for key in params: + value = params.get(key) + if '!!yaml' in str(value): + value = yaml.load(value[7:]) + params2[key] = value + + values_file = get_random_number() + '.yaml' + with open(values_file, 'w') as stream: + yaml.dump(params2, stream, indent=4, default_flow_style=False) + + return '-f {}'.format(values_file), values_file + + return '', None + # params for use in --set option @staticmethod def _params_to_set_option(params: dict) -> str: @@ -898,13 +991,14 @@ class K8sHelmConnector(K8sConnector): line_list.append(cell) return output_table - def _get_paths(self, cluster_name: str, create_if_not_exist: bool = False) -> (str, str, str): + def _get_paths(self, cluster_name: str, create_if_not_exist: bool = False) -> (str, str, str, str): """ Returns kube and helm directories :param cluster_name: :param create_if_not_exist: - :return: kube, helm directories and config filename. Raises exception if not exist and cannot create + :return: kube, helm directories, config filename and cluster dir. + Raises exception if not exist and cannot create """ base = self.fs.path @@ -919,7 +1013,7 @@ class K8sHelmConnector(K8sConnector): if not os.path.exists(cluster_dir): msg = 'Base cluster dir {} does not exist'.format(cluster_dir) self.error(msg) - raise Exception(msg) + raise K8sException(msg) # kube dir kube_dir = cluster_dir + '/' + '.kube' @@ -929,7 +1023,7 @@ class K8sHelmConnector(K8sConnector): if not os.path.exists(kube_dir): msg = 'Kube config dir {} does not exist'.format(kube_dir) self.error(msg) - raise Exception(msg) + raise K8sException(msg) # helm home dir helm_dir = cluster_dir + '/' + '.helm' @@ -939,10 +1033,10 @@ class K8sHelmConnector(K8sConnector): if not os.path.exists(helm_dir): msg = 'Helm config dir {} does not exist'.format(helm_dir) self.error(msg) - raise Exception(msg) + raise K8sException(msg) config_filename = kube_dir + '/config' - return kube_dir, helm_dir, config_filename + return kube_dir, helm_dir, config_filename, cluster_dir @staticmethod def _remove_multiple_spaces(str): @@ -972,7 +1066,8 @@ class K8sHelmConnector(K8sConnector): self, command: str, raise_exception_on_error: bool = False, - show_error_log: bool = False + show_error_log: bool = True, + encode_utf8: bool = False ) -> (str, int): command = K8sHelmConnector._remove_multiple_spaces(command) @@ -996,8 +1091,10 @@ class K8sHelmConnector(K8sConnector): output = '' if stdout: output = stdout.decode('utf-8').strip() + # output = stdout.decode() if stderr: output = stderr.decode('utf-8').strip() + # output = stderr.decode() if return_code != 0 and show_error_log: self.debug('Return code (FAIL): {}\nOutput:\n{}'.format(return_code, output)) @@ -1007,13 +1104,20 @@ class K8sHelmConnector(K8sConnector): if raise_exception_on_error and return_code != 0: raise Exception(output) + if encode_utf8: + output = output.encode('utf-8').strip() + output = str(output).replace('\\n', '\n') + return output, return_code except Exception as e: msg = 'Exception executing command: {} -> {}'.format(command, e) if show_error_log: self.error(msg) - return '', -1 + if raise_exception_on_error: + raise e + else: + return '', -1 def _remote_exec( self, @@ -1051,5 +1155,4 @@ class K8sHelmConnector(K8sConnector): msg = 'File {} does not exist'.format(filename) if exception_if_not_exists: self.error(msg) - raise Exception(msg) - + raise K8sException(msg)