X-Git-Url: https://osm.etsi.org/gitweb/?p=osm%2FN2VC.git;a=blobdiff_plain;f=n2vc%2Fk8s_helm_conn.py;h=d3fbed667d4abafce9429b8ad34cf1a22e91abb3;hp=88c94c5c49e535ef58fbf0b3d987a373d86fb181;hb=fc796cc98009d16e124dd47c0596c5541dc10f4d;hpb=26c78a4b25fcbfc92362d730403460d736031736 diff --git a/n2vc/k8s_helm_conn.py b/n2vc/k8s_helm_conn.py index 88c94c5..d3fbed6 100644 --- a/n2vc/k8s_helm_conn.py +++ b/n2vc/k8s_helm_conn.py @@ -20,17 +20,16 @@ # contact with: nfvlabs@tid.es ## -import paramiko import subprocess import os import shutil import asyncio -import uuid import time import yaml from uuid import uuid4 import random from n2vc.k8s_conn import K8sConnector +from n2vc.exceptions import K8sException class K8sHelmConnector(K8sConnector): @@ -68,7 +67,7 @@ class K8sHelmConnector(K8sConnector): on_update_db=on_update_db ) - self.info('Initializing K8S Helm connector') + self.log.info('Initializing K8S Helm connector') # random numbers for release name generation random.seed(time.time()) @@ -84,7 +83,17 @@ class K8sHelmConnector(K8sConnector): self._helm_command = helm_command self._check_file_exists(filename=helm_command, exception_if_not_exists=True) - self.info('K8S Helm connector initialized') + # initialize helm client-only + self.log.debug('Initializing helm client-only...') + command = '{} init --client-only'.format(self._helm_command) + try: + asyncio.ensure_future(self._local_async_exec(command=command, raise_exception_on_error=False)) + # loop = asyncio.get_event_loop() + # loop.run_until_complete(self._local_async_exec(command=command, raise_exception_on_error=False)) + except Exception as e: + self.warning(msg='helm init failed (it was already initialized): {}'.format(e)) + + self.log.info('K8S Helm connector initialized') async def init_env( self, @@ -92,15 +101,27 @@ class K8sHelmConnector(K8sConnector): namespace: str = 'kube-system', reuse_cluster_uuid=None ) -> (str, bool): + """ + It prepares a given K8s cluster environment to run Charts on both sides: + client (OSM) + server (Tiller) + + :param k8s_creds: credentials to access a given K8s cluster, i.e. a valid '.kube/config' + :param namespace: optional namespace to be used for helm. By default, 'kube-system' will be used + :param reuse_cluster_uuid: existing cluster uuid for reuse + :return: uuid of the K8s cluster and True if connector has installed some software in the cluster + (on error, an exception will be raised) + """ cluster_uuid = reuse_cluster_uuid if not cluster_uuid: cluster_uuid = str(uuid4()) - self.debug('Initializing K8S environment. namespace: {}'.format(namespace)) + self.log.debug('Initializing K8S environment. 
namespace: {}'.format(namespace)) # create config filename - kube_dir, helm_dir, config_filename = self._get_paths(cluster_name=cluster_uuid, create_if_not_exist=True) + kube_dir, helm_dir, config_filename, cluster_dir = \ + self._get_paths(cluster_name=cluster_uuid, create_if_not_exist=True) f = open(config_filename, "w") f.write(k8s_creds) f.close() @@ -125,7 +146,7 @@ class K8sHelmConnector(K8sConnector): # helm init n2vc_installed_sw = False if not already_initialized: - self.info('Initializing helm in client and server: {}'.format(cluster_uuid)) + self.log.info('Initializing helm in client and server: {}'.format(cluster_uuid)) command = '{} --kubeconfig={} --tiller-namespace={} --home={} init'\ .format(self._helm_command, config_filename, namespace, helm_dir) output, rc = await self._local_async_exec(command=command, raise_exception_on_error=True) @@ -134,14 +155,14 @@ class K8sHelmConnector(K8sConnector): # check client helm installation check_file = helm_dir + '/repository/repositories.yaml' if not self._check_file_exists(filename=check_file, exception_if_not_exists=False): - self.info('Initializing helm in client: {}'.format(cluster_uuid)) + self.log.info('Initializing helm in client: {}'.format(cluster_uuid)) command = '{} --kubeconfig={} --tiller-namespace={} --home={} init --client-only'\ .format(self._helm_command, config_filename, namespace, helm_dir) output, rc = await self._local_async_exec(command=command, raise_exception_on_error=True) else: - self.info('Helm client already initialized') + self.log.info('Helm client already initialized') - self.info('Cluster initialized {}'.format(cluster_uuid)) + self.log.info('Cluster initialized {}'.format(cluster_uuid)) return cluster_uuid, n2vc_installed_sw @@ -153,20 +174,21 @@ class K8sHelmConnector(K8sConnector): repo_type: str = 'chart' ): - self.debug('adding {} repository {}. URL: {}'.format(repo_type, name, url)) + self.log.debug('adding {} repository {}. URL: {}'.format(repo_type, name, url)) # config filename - kube_dir, helm_dir, config_filename = self._get_paths(cluster_name=cluster_uuid, create_if_not_exist=True) + kube_dir, helm_dir, config_filename, cluster_dir = \ + self._get_paths(cluster_name=cluster_uuid, create_if_not_exist=True) # helm repo update command = '{} --kubeconfig={} --home={} repo update'.format(self._helm_command, config_filename, helm_dir) - self.debug('updating repo: {}'.format(command)) + self.log.debug('updating repo: {}'.format(command)) await self._local_async_exec(command=command, raise_exception_on_error=False) # helm repo add name url command = '{} --kubeconfig={} --home={} repo add {} {}'\ .format(self._helm_command, config_filename, helm_dir, name, url) - self.debug('adding repo: {}'.format(command)) + self.log.debug('adding repo: {}'.format(command)) await self._local_async_exec(command=command, raise_exception_on_error=True) async def repo_list( @@ -179,12 +201,14 @@ class K8sHelmConnector(K8sConnector): :return: list of registered repositories: [ (name, url) .... 
] """ - self.debug('list repositories for cluster {}'.format(cluster_uuid)) + self.log.debug('list repositories for cluster {}'.format(cluster_uuid)) # config filename - kube_dir, helm_dir, config_filename = self._get_paths(cluster_name=cluster_uuid, create_if_not_exist=True) + kube_dir, helm_dir, config_filename, cluster_dir = \ + self._get_paths(cluster_name=cluster_uuid, create_if_not_exist=True) - command = '{} --kubeconfig={} --home={} repo list --output yaml'.format(self._helm_command, config_filename, helm_dir) + command = '{} --kubeconfig={} --home={} repo list --output yaml'\ + .format(self._helm_command, config_filename, helm_dir) output, rc = await self._local_async_exec(command=command, raise_exception_on_error=True) if output and len(output) > 0: @@ -205,10 +229,11 @@ class K8sHelmConnector(K8sConnector): :return: True if successful """ - self.debug('list repositories for cluster {}'.format(cluster_uuid)) + self.log.debug('list repositories for cluster {}'.format(cluster_uuid)) # config filename - kube_dir, helm_dir, config_filename = self._get_paths(cluster_name=cluster_uuid, create_if_not_exist=True) + kube_dir, helm_dir, config_filename, cluster_dir = \ + self._get_paths(cluster_name=cluster_uuid, create_if_not_exist=True) command = '{} --kubeconfig={} --home={} repo remove {}'\ .format(self._helm_command, config_filename, helm_dir, name) @@ -222,10 +247,11 @@ class K8sHelmConnector(K8sConnector): uninstall_sw: bool = False ) -> bool: - self.debug('Resetting K8s environment. cluster uuid: {}'.format(cluster_uuid)) + self.log.debug('Resetting K8s environment. cluster uuid: {}'.format(cluster_uuid)) # get kube and helm directories - kube_dir, helm_dir, config_filename = self._get_paths(cluster_name=cluster_uuid, create_if_not_exist=False) + kube_dir, helm_dir, config_filename, cluster_dir = \ + self._get_paths(cluster_name=cluster_uuid, create_if_not_exist=False) # uninstall releases if needed releases = await self.instances_list(cluster_uuid=cluster_uuid) @@ -235,19 +261,19 @@ class K8sHelmConnector(K8sConnector): try: kdu_instance = r.get('Name') chart = r.get('Chart') - self.debug('Uninstalling {} -> {}'.format(chart, kdu_instance)) + self.log.debug('Uninstalling {} -> {}'.format(chart, kdu_instance)) await self.uninstall(cluster_uuid=cluster_uuid, kdu_instance=kdu_instance) except Exception as e: - self.error('Error uninstalling release {}: {}'.format(kdu_instance, e)) + self.log.error('Error uninstalling release {}: {}'.format(kdu_instance, e)) else: msg = 'Cluster has releases and not force. Cannot reset K8s environment. 
Cluster uuid: {}'\ .format(cluster_uuid) - self.error(msg) - raise Exception(msg) + self.log.error(msg) + raise K8sException(msg) if uninstall_sw: - self.debug('Uninstalling tiller from cluster {}'.format(cluster_uuid)) + self.log.debug('Uninstalling tiller from cluster {}'.format(cluster_uuid)) # find namespace for tiller pod command = '{} --kubeconfig={} get deployments --all-namespaces'\ @@ -264,32 +290,31 @@ class K8sHelmConnector(K8sConnector): pass else: msg = 'Tiller deployment not found in cluster {}'.format(cluster_uuid) - self.error(msg) - # raise Exception(msg) + self.log.error(msg) - self.debug('namespace for tiller: {}'.format(namespace)) + self.log.debug('namespace for tiller: {}'.format(namespace)) force_str = '--force' if namespace: # delete tiller deployment - self.debug('Deleting tiller deployment for cluster {}, namespace {}'.format(cluster_uuid, namespace)) + self.log.debug('Deleting tiller deployment for cluster {}, namespace {}'.format(cluster_uuid, namespace)) command = '{} --namespace {} --kubeconfig={} {} delete deployment tiller-deploy'\ .format(self.kubectl_command, namespace, config_filename, force_str) await self._local_async_exec(command=command, raise_exception_on_error=False) # uninstall tiller from cluster - self.debug('Uninstalling tiller from cluster {}'.format(cluster_uuid)) + self.log.debug('Uninstalling tiller from cluster {}'.format(cluster_uuid)) command = '{} --kubeconfig={} --home={} reset'\ .format(self._helm_command, config_filename, helm_dir) - self.debug('resetting: {}'.format(command)) + self.log.debug('resetting: {}'.format(command)) output, rc = await self._local_async_exec(command=command, raise_exception_on_error=True) else: - self.debug('namespace not found') + self.log.debug('namespace not found') # delete cluster directory dir = self.fs.path + '/' + cluster_uuid - self.debug('Removing directory {}'.format(dir)) + self.log.debug('Removing directory {}'.format(dir)) shutil.rmtree(dir, ignore_errors=True) return True @@ -301,19 +326,20 @@ class K8sHelmConnector(K8sConnector): atomic: bool = True, timeout: float = 300, params: dict = None, - db_dict: dict = None + db_dict: dict = None, + kdu_name: str = None, + namespace: str = None ): - self.debug('installing {} in cluster {}'.format(kdu_model, cluster_uuid)) - - start = time.time() - end = start + timeout + self.log.debug('installing {} in cluster {}'.format(kdu_model, cluster_uuid)) # config filename - kube_dir, helm_dir, config_filename = self._get_paths(cluster_name=cluster_uuid, create_if_not_exist=True) + kube_dir, helm_dir, config_filename, cluster_dir = \ + self._get_paths(cluster_name=cluster_uuid, create_if_not_exist=True) # params to str - params_str = K8sHelmConnector._params_to_set_option(params) + # params_str = K8sHelmConnector._params_to_set_option(params) + params_str, file_to_delete = self._params_to_file_option(cluster_uuid=cluster_uuid, params=params) timeout_str = '' if timeout: @@ -323,6 +349,10 @@ class K8sHelmConnector(K8sConnector): atomic_str = '' if atomic: atomic_str = '--atomic' + # namespace + namespace_str = '' + if namespace: + namespace_str = "--namespace {}".format(namespace) # version version_str = '' @@ -332,7 +362,7 @@ class K8sHelmConnector(K8sConnector): version_str = '--version {}'.format(parts[1]) kdu_model = parts[0] - # generate a name for the releas. Then, check if already exists + # generate a name for the release. 
Then, check if already exists kdu_instance = None while kdu_instance is None: kdu_instance = K8sHelmConnector._generate_release_name(kdu_model) @@ -345,20 +375,23 @@ class K8sHelmConnector(K8sConnector): if result is not None: # instance already exists: generate a new one kdu_instance = None - except: - kdu_instance = None + except K8sException: + pass # helm repo install - command = '{} install {} --output yaml --kubeconfig={} --home={} {} {} --name={} {} {}'\ - .format(self._helm_command, atomic_str, config_filename, helm_dir, - params_str, timeout_str, kdu_instance, kdu_model, version_str) - self.debug('installing: {}'.format(command)) + command = '{helm} install {atomic} --output yaml --kubeconfig={config} --home={dir} {params} {timeout} ' \ + '--name={name} {ns} {model} {ver}'.format(helm=self._helm_command, atomic=atomic_str, + config=config_filename, dir=helm_dir, params=params_str, + timeout=timeout_str, name=kdu_instance, ns=namespace_str, + model=kdu_model, ver=version_str) + self.log.debug('installing: {}'.format(command)) if atomic: # exec helm in a task exec_task = asyncio.ensure_future( coro_or_future=self._local_async_exec(command=command, raise_exception_on_error=False) ) + # write status in another task status_task = asyncio.ensure_future( coro_or_future=self._store_status( @@ -382,6 +415,10 @@ class K8sHelmConnector(K8sConnector): output, rc = await self._local_async_exec(command=command, raise_exception_on_error=False) + # remove temporal values yaml file + if file_to_delete: + os.remove(file_to_delete) + # write final status await self._store_status( cluster_uuid=cluster_uuid, @@ -394,10 +431,10 @@ class K8sHelmConnector(K8sConnector): if rc != 0: msg = 'Error executing command: {}\nOutput: {}'.format(command, output) - self.error(msg) - raise Exception(msg) + self.log.error(msg) + raise K8sException(msg) - self.debug('Returning kdu_instance {}'.format(kdu_instance)) + self.log.debug('Returning kdu_instance {}'.format(kdu_instance)) return kdu_instance async def instances_list( @@ -411,10 +448,11 @@ class K8sHelmConnector(K8sConnector): :return: """ - self.debug('list releases for cluster {}'.format(cluster_uuid)) + self.log.debug('list releases for cluster {}'.format(cluster_uuid)) # config filename - kube_dir, helm_dir, config_filename = self._get_paths(cluster_name=cluster_uuid, create_if_not_exist=True) + kube_dir, helm_dir, config_filename, cluster_dir = \ + self._get_paths(cluster_name=cluster_uuid, create_if_not_exist=True) command = '{} --kubeconfig={} --home={} list --output yaml'\ .format(self._helm_command, config_filename, helm_dir) @@ -437,16 +475,15 @@ class K8sHelmConnector(K8sConnector): db_dict: dict = None ): - self.debug('upgrading {} in cluster {}'.format(kdu_model, cluster_uuid)) - - start = time.time() - end = start + timeout + self.log.debug('upgrading {} in cluster {}'.format(kdu_model, cluster_uuid)) # config filename - kube_dir, helm_dir, config_filename = self._get_paths(cluster_name=cluster_uuid, create_if_not_exist=True) + kube_dir, helm_dir, config_filename, cluster_dir = \ + self._get_paths(cluster_name=cluster_uuid, create_if_not_exist=True) # params to str - params_str = K8sHelmConnector._params_to_set_option(params) + # params_str = K8sHelmConnector._params_to_set_option(params) + params_str, file_to_delete = self._params_to_file_option(cluster_uuid=cluster_uuid, params=params) timeout_str = '' if timeout: @@ -459,7 +496,7 @@ class K8sHelmConnector(K8sConnector): # version version_str = '' - if ':' in kdu_model: + if kdu_model and ':' 
in kdu_model: parts = kdu_model.split(sep=':') if len(parts) == 2: version_str = '--version {}'.format(parts[1]) @@ -469,7 +506,7 @@ class K8sHelmConnector(K8sConnector): command = '{} upgrade {} --output yaml --kubeconfig={} --home={} {} {} {} {} {}'\ .format(self._helm_command, atomic_str, config_filename, helm_dir, params_str, timeout_str, kdu_instance, kdu_model, version_str) - self.debug('upgrading: {}'.format(command)) + self.log.debug('upgrading: {}'.format(command)) if atomic: @@ -489,7 +526,7 @@ class K8sHelmConnector(K8sConnector): ) # wait for execution task - await asyncio.wait([ exec_task ]) + await asyncio.wait([exec_task]) # cancel status task status_task.cancel() @@ -499,6 +536,10 @@ class K8sHelmConnector(K8sConnector): output, rc = await self._local_async_exec(command=command, raise_exception_on_error=False) + # remove temporal values yaml file + if file_to_delete: + os.remove(file_to_delete) + # write final status await self._store_status( cluster_uuid=cluster_uuid, @@ -511,14 +552,14 @@ class K8sHelmConnector(K8sConnector): if rc != 0: msg = 'Error executing command: {}\nOutput: {}'.format(command, output) - self.error(msg) - raise Exception(msg) + self.log.error(msg) + raise K8sException(msg) # return new revision number instance = await self.get_instance_info(cluster_uuid=cluster_uuid, kdu_instance=kdu_instance) if instance: revision = int(instance.get('Revision')) - self.debug('New revision: {}'.format(revision)) + self.log.debug('New revision: {}'.format(revision)) return revision else: return 0 @@ -531,11 +572,12 @@ class K8sHelmConnector(K8sConnector): db_dict: dict = None ): - self.debug('rollback kdu_instance {} to revision {} from cluster {}' + self.log.debug('rollback kdu_instance {} to revision {} from cluster {}' .format(kdu_instance, revision, cluster_uuid)) # config filename - kube_dir, helm_dir, config_filename = self._get_paths(cluster_name=cluster_uuid, create_if_not_exist=True) + kube_dir, helm_dir, config_filename, cluster_dir = \ + self._get_paths(cluster_name=cluster_uuid, create_if_not_exist=True) command = '{} rollback --kubeconfig={} --home={} {} {} --wait'\ .format(self._helm_command, config_filename, helm_dir, kdu_instance, revision) @@ -575,14 +617,14 @@ class K8sHelmConnector(K8sConnector): if rc != 0: msg = 'Error executing command: {}\nOutput: {}'.format(command, output) - self.error(msg) - raise Exception(msg) + self.log.error(msg) + raise K8sException(msg) # return new revision number instance = await self.get_instance_info(cluster_uuid=cluster_uuid, kdu_instance=kdu_instance) if instance: revision = int(instance.get('Revision')) - self.debug('New revision: {}'.format(revision)) + self.log.debug('New revision: {}'.format(revision)) return revision else: return 0 @@ -601,10 +643,11 @@ class K8sHelmConnector(K8sConnector): :return: True if successful """ - self.debug('uninstall kdu_instance {} from cluster {}'.format(kdu_instance, cluster_uuid)) + self.log.debug('uninstall kdu_instance {} from cluster {}'.format(kdu_instance, cluster_uuid)) # config filename - kube_dir, helm_dir, config_filename = self._get_paths(cluster_name=cluster_uuid, create_if_not_exist=True) + kube_dir, helm_dir, config_filename, cluster_dir = \ + self._get_paths(cluster_name=cluster_uuid, create_if_not_exist=True) command = '{} --kubeconfig={} --home={} delete --purge {}'\ .format(self._helm_command, config_filename, helm_dir, kdu_instance) @@ -613,42 +656,138 @@ class K8sHelmConnector(K8sConnector): return self._output_to_table(output) + async def exec_primitive( + 
self, + cluster_uuid: str = None, + kdu_instance: str = None, + primitive_name: str = None, + timeout: float = 300, + params: dict = None, + db_dict: dict = None, + ) -> str: + """Exec primitive (Juju action) + + :param cluster_uuid str: The UUID of the cluster + :param kdu_instance str: The unique name of the KDU instance + :param primitive_name: Name of action that will be executed + :param timeout: Timeout for action execution + :param params: Dictionary of all the parameters needed for the action + :db_dict: Dictionary for any additional data + + :return: Returns the output of the action + """ + raise K8sException("KDUs deployed with Helm don't support actions " + "different from rollback, upgrade and status") + async def inspect_kdu( self, - kdu_model: str + kdu_model: str, + repo_url: str = None ) -> str: - self.debug('inspect kdu_model {}'.format(kdu_model)) + self.log.debug('inspect kdu_model {} from (optional) repo: {}'.format(kdu_model, repo_url)) - command = '{} inspect values {}'\ - .format(self._helm_command, kdu_model) + return await self._exec_inspect_comand(inspect_command='', kdu_model=kdu_model, repo_url=repo_url) - output, rc = await self._local_async_exec(command=command) + async def values_kdu( + self, + kdu_model: str, + repo_url: str = None + ) -> str: - return output + self.log.debug('inspect kdu_model values {} from (optional) repo: {}'.format(kdu_model, repo_url)) + + return await self._exec_inspect_comand(inspect_command='values', kdu_model=kdu_model, repo_url=repo_url) async def help_kdu( self, - kdu_model: str - ): - - self.debug('help kdu_model {}'.format(kdu_model)) - - command = '{} inspect readme {}'\ - .format(self._helm_command, kdu_model) + kdu_model: str, + repo_url: str = None + ) -> str: - output, rc = await self._local_async_exec(command=command, raise_exception_on_error=True) + self.log.debug('inspect kdu_model {} readme.md from repo: {}'.format(kdu_model, repo_url)) - return output + return await self._exec_inspect_comand(inspect_command='readme', kdu_model=kdu_model, repo_url=repo_url) async def status_kdu( self, cluster_uuid: str, kdu_instance: str - ): + ) -> str: - return await self._status_kdu(cluster_uuid=cluster_uuid, kdu_instance=kdu_instance, show_error_log=True) + # call internal function + return await self._status_kdu( + cluster_uuid=cluster_uuid, + kdu_instance=kdu_instance, + show_error_log=True, + return_text=True + ) + async def synchronize_repos(self, cluster_uuid: str): + + self.log.debug("syncronize repos for cluster helm-id: {}",) + try: + update_repos_timeout = 300 # max timeout to sync a single repos, more than this is too much + db_k8scluster = self.db.get_one("k8sclusters", {"_admin.helm-chart.id": cluster_uuid}) + if db_k8scluster: + nbi_repo_list = db_k8scluster.get("_admin").get("helm_chart_repos") or [] + cluster_repo_dict = db_k8scluster.get("_admin").get("helm_charts_added") or {} + # elements that must be deleted + deleted_repo_list = [] + added_repo_dict = {} + self.log.debug("helm_chart_repos: {}".format(nbi_repo_list)) + self.log.debug("helm_charts_added: {}".format(cluster_repo_dict)) + + # obtain repos to add: registered by nbi but not added + repos_to_add = [repo for repo in nbi_repo_list if not cluster_repo_dict.get(repo)] + + # obtain repos to delete: added by cluster but not in nbi list + repos_to_delete = [repo for repo in cluster_repo_dict.keys() if repo not in nbi_repo_list] + + # delete repos: must delete first then add because there may be different repos with same name but + # different id and url + 
self.log.debug("repos to delete: {}".format(repos_to_delete)) + for repo_id in repos_to_delete: + # try to delete repos + try: + repo_delete_task = asyncio.ensure_future(self.repo_remove(cluster_uuid=cluster_uuid, + name=cluster_repo_dict[repo_id])) + await asyncio.wait_for(repo_delete_task, update_repos_timeout) + except Exception as e: + self.warning("Error deleting repo, id: {}, name: {}, err_msg: {}".format(repo_id, + cluster_repo_dict[repo_id], str(e))) + # always add to the list of to_delete if there is an error because if is not there deleting raises error + deleted_repo_list.append(repo_id) + + # add repos + self.log.debug("repos to add: {}".format(repos_to_add)) + add_task_list = [] + for repo_id in repos_to_add: + # obtain the repo data from the db + # if there is an error getting the repo in the database we will ignore this repo and continue + # because there is a possible race condition where the repo has been deleted while processing + db_repo = self.db.get_one("k8srepos", {"_id": repo_id}) + self.log.debug("obtained repo: id, {}, name: {}, url: {}".format(repo_id, db_repo["name"], db_repo["url"])) + try: + repo_add_task = asyncio.ensure_future(self.repo_add(cluster_uuid=cluster_uuid, + name=db_repo["name"], url=db_repo["url"], + repo_type="chart")) + await asyncio.wait_for(repo_add_task, update_repos_timeout) + added_repo_dict[repo_id] = db_repo["name"] + self.log.debug("added repo: id, {}, name: {}".format(repo_id, db_repo["name"])) + except Exception as e: + # deal with error adding repo, adding a repo that already exists does not raise any error + # will not raise error because a wrong repos added by anyone could prevent instantiating any ns + self.log.error("Error adding repo id: {}, err_msg: {} ".format(repo_id, repr(e))) + + return deleted_repo_list, added_repo_dict + + else: # else db_k8scluster does not exist + raise K8sException("k8cluster with helm-id : {} not found".format(cluster_uuid)) + + except Exception as e: + self.log.error("Error synchronizing repos: {}".format(str(e))) + raise K8sException("Error synchronizing repos") """ ################################################################################################## @@ -656,17 +795,39 @@ class K8sHelmConnector(K8sConnector): ################################################################################################## """ + async def _exec_inspect_comand( + self, + inspect_command: str, + kdu_model: str, + repo_url: str = None + ): + + repo_str = '' + if repo_url: + repo_str = ' --repo {}'.format(repo_url) + idx = kdu_model.find('/') + if idx >= 0: + idx += 1 + kdu_model = kdu_model[idx:] + + inspect_command = '{} inspect {} {}{}'.format(self._helm_command, inspect_command, kdu_model, repo_str) + output, rc = await self._local_async_exec(command=inspect_command, encode_utf8=True) + + return output + async def _status_kdu( self, cluster_uuid: str, kdu_instance: str, - show_error_log: bool = False + show_error_log: bool = False, + return_text: bool = False ): - self.debug('status of kdu_instance {}'.format(kdu_instance)) + self.log.debug('status of kdu_instance {}'.format(kdu_instance)) # config filename - kube_dir, helm_dir, config_filename = self._get_paths(cluster_name=cluster_uuid, create_if_not_exist=True) + kube_dir, helm_dir, config_filename, cluster_dir = \ + self._get_paths(cluster_name=cluster_uuid, create_if_not_exist=True) command = '{} --kubeconfig={} --home={} status {} --output yaml'\ .format(self._helm_command, config_filename, helm_dir, kdu_instance) @@ -677,6 +838,9 @@ class 
K8sHelmConnector(K8sConnector): show_error_log=show_error_log ) + if return_text: + return str(output) + if rc != 0: return None @@ -698,7 +862,6 @@ class K8sHelmConnector(K8sConnector): return data - async def get_instance_info( self, cluster_uuid: str, @@ -708,13 +871,22 @@ class K8sHelmConnector(K8sConnector): for instance in instances: if instance.get('Name') == kdu_instance: return instance - self.debug('Instance {} not found'.format(kdu_instance)) + self.log.debug('Instance {} not found'.format(kdu_instance)) return None @staticmethod def _generate_release_name( chart_name: str ): + # check embeded chart (file or dir) + if chart_name.startswith('/'): + # extract file or directory name + chart_name = chart_name[chart_name.rfind('/')+1:] + # check URL + elif '://' in chart_name: + # extract last portion of URL + chart_name = chart_name[chart_name.rfind('/')+1:] + name = '' for c in chart_name: if c.isalpha() or c.isnumeric(): @@ -733,8 +905,7 @@ class K8sHelmConnector(K8sConnector): def get_random_number(): r = random.randrange(start=1, stop=99999999) s = str(r) - while len(s) < 10: - s = '0' + s + s = s.rjust(10, '0') return s name = name + get_random_number() @@ -754,10 +925,8 @@ class K8sHelmConnector(K8sConnector): await asyncio.sleep(check_every) detailed_status = await self.status_kdu(cluster_uuid=cluster_uuid, kdu_instance=kdu_instance) status = detailed_status.get('info').get('Description') - print('=' * 60) - self.debug('STATUS:\n{}'.format(status)) - self.debug('DETAILED STATUS:\n{}'.format(detailed_status)) - print('=' * 60) + self.log.debug('STATUS:\n{}'.format(status)) + self.log.debug('DETAILED STATUS:\n{}'.format(detailed_status)) # write status to db result = await self.write_app_status_to_db( db_dict=db_dict, @@ -765,12 +934,13 @@ class K8sHelmConnector(K8sConnector): detailed_status=str(detailed_status), operation=operation) if not result: - self.info('Error writing in database. Task exiting...') + self.log.info('Error writing in database. 
Task exiting...') return except asyncio.CancelledError: - self.debug('Task cancelled') + self.log.debug('Task cancelled') return except Exception as e: + self.log.debug('_store_status exception: {}'.format(str(e))) pass finally: if run_once: @@ -782,7 +952,7 @@ class K8sHelmConnector(K8sConnector): kdu_instance: str ) -> bool: - status = await self.status_kdu(cluster_uuid=cluster_uuid, kdu_instance=kdu_instance) + status = await self._status_kdu(cluster_uuid=cluster_uuid, kdu_instance=kdu_instance, return_text=False) # extract info.status.resources-> str # format: @@ -817,7 +987,7 @@ class K8sHelmConnector(K8sConnector): current = int(parts[0]) total = int(parts[1]) if current < total: - self.debug('NOT READY:\n {}'.format(line3)) + self.log.debug('NOT READY:\n {}'.format(line3)) ready = False line3 = resources[index] index += 1 @@ -856,6 +1026,36 @@ class K8sHelmConnector(K8sConnector): pass return None + # params for use in -f file + # returns values file option and filename (in order to delete it at the end) + def _params_to_file_option(self, cluster_uuid: str, params: dict) -> (str, str): + + if params and len(params) > 0: + kube_dir, helm_dir, config_filename, cluster_dir = \ + self._get_paths(cluster_name=cluster_uuid, create_if_not_exist=True) + + def get_random_number(): + r = random.randrange(start=1, stop=99999999) + s = str(r) + while len(s) < 10: + s = '0' + s + return s + + params2 = dict() + for key in params: + value = params.get(key) + if '!!yaml' in str(value): + value = yaml.load(value[7:]) + params2[key] = value + + values_file = get_random_number() + '.yaml' + with open(values_file, 'w') as stream: + yaml.dump(params2, stream, indent=4, default_flow_style=False) + + return '-f {}'.format(values_file), values_file + + return '', None + # params for use in --set option @staticmethod def _params_to_set_option(params: dict) -> str: @@ -898,13 +1098,14 @@ class K8sHelmConnector(K8sConnector): line_list.append(cell) return output_table - def _get_paths(self, cluster_name: str, create_if_not_exist: bool = False) -> (str, str, str): + def _get_paths(self, cluster_name: str, create_if_not_exist: bool = False) -> (str, str, str, str): """ Returns kube and helm directories :param cluster_name: :param create_if_not_exist: - :return: kube, helm directories and config filename. Raises exception if not exist and cannot create + :return: kube, helm directories, config filename and cluster dir. 
+ Raises exception if not exist and cannot create """ base = self.fs.path @@ -914,35 +1115,35 @@ class K8sHelmConnector(K8sConnector): # base dir for cluster cluster_dir = base + '/' + cluster_name if create_if_not_exist and not os.path.exists(cluster_dir): - self.debug('Creating dir {}'.format(cluster_dir)) + self.log.debug('Creating dir {}'.format(cluster_dir)) os.makedirs(cluster_dir) if not os.path.exists(cluster_dir): msg = 'Base cluster dir {} does not exist'.format(cluster_dir) - self.error(msg) - raise Exception(msg) + self.log.error(msg) + raise K8sException(msg) # kube dir kube_dir = cluster_dir + '/' + '.kube' if create_if_not_exist and not os.path.exists(kube_dir): - self.debug('Creating dir {}'.format(kube_dir)) + self.log.debug('Creating dir {}'.format(kube_dir)) os.makedirs(kube_dir) if not os.path.exists(kube_dir): msg = 'Kube config dir {} does not exist'.format(kube_dir) - self.error(msg) - raise Exception(msg) + self.log.error(msg) + raise K8sException(msg) # helm home dir helm_dir = cluster_dir + '/' + '.helm' if create_if_not_exist and not os.path.exists(helm_dir): - self.debug('Creating dir {}'.format(helm_dir)) + self.log.debug('Creating dir {}'.format(helm_dir)) os.makedirs(helm_dir) if not os.path.exists(helm_dir): msg = 'Helm config dir {} does not exist'.format(helm_dir) - self.error(msg) - raise Exception(msg) + self.log.error(msg) + raise K8sException(msg) config_filename = kube_dir + '/config' - return kube_dir, helm_dir, config_filename + return kube_dir, helm_dir, config_filename, cluster_dir @staticmethod def _remove_multiple_spaces(str): @@ -956,13 +1157,13 @@ class K8sHelmConnector(K8sConnector): command: str ) -> (str, int): command = K8sHelmConnector._remove_multiple_spaces(command) - self.debug('Executing sync local command: {}'.format(command)) + self.log.debug('Executing sync local command: {}'.format(command)) # raise exception if fails output = '' try: output = subprocess.check_output(command, shell=True, universal_newlines=True) return_code = 0 - self.debug(output) + self.log.debug(output) except Exception as e: return_code = 1 @@ -972,11 +1173,12 @@ class K8sHelmConnector(K8sConnector): self, command: str, raise_exception_on_error: bool = False, - show_error_log: bool = False + show_error_log: bool = True, + encode_utf8: bool = False ) -> (str, int): command = K8sHelmConnector._remove_multiple_spaces(command) - self.debug('Executing async local command: {}'.format(command)) + self.log.debug('Executing async local command: {}'.format(command)) # split command command = command.split(sep=' ') @@ -996,60 +1198,45 @@ class K8sHelmConnector(K8sConnector): output = '' if stdout: output = stdout.decode('utf-8').strip() + # output = stdout.decode() if stderr: output = stderr.decode('utf-8').strip() + # output = stderr.decode() if return_code != 0 and show_error_log: - self.debug('Return code (FAIL): {}\nOutput:\n{}'.format(return_code, output)) + self.log.debug('Return code (FAIL): {}\nOutput:\n{}'.format(return_code, output)) else: - self.debug('Return code: {}'.format(return_code)) + self.log.debug('Return code: {}'.format(return_code)) if raise_exception_on_error and return_code != 0: - raise Exception(output) + raise K8sException(output) + + if encode_utf8: + output = output.encode('utf-8').strip() + output = str(output).replace('\\n', '\n') return output, return_code + except asyncio.CancelledError: + raise + except K8sException: + raise except Exception as e: msg = 'Exception executing command: {} -> {}'.format(command, e) - if show_error_log: - 
self.error(msg) - return '', -1 - - def _remote_exec( - self, - hostname: str, - username: str, - password: str, - command: str, - timeout: int = 10 - ) -> (str, int): - - command = K8sHelmConnector._remove_multiple_spaces(command) - self.debug('Executing sync remote ssh command: {}'.format(command)) - - ssh = paramiko.SSHClient() - ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy()) - ssh.connect(hostname=hostname, username=username, password=password) - ssh_stdin, ssh_stdout, ssh_stderr = ssh.exec_command(command=command, timeout=timeout) - output = ssh_stdout.read().decode('utf-8') - error = ssh_stderr.read().decode('utf-8') - if error: - self.error('ERROR: {}'.format(error)) - return_code = 1 - else: - return_code = 0 - output = output.replace('\\n', '\n') - self.debug('OUTPUT: {}'.format(output)) - - return output, return_code + self.log.error(msg) + if raise_exception_on_error: + raise K8sException(e) from e + else: + return '', -1 def _check_file_exists(self, filename: str, exception_if_not_exists: bool = False): - self.debug('Checking if file {} exists...'.format(filename)) + # self.log.debug('Checking if file {} exists...'.format(filename)) if os.path.exists(filename): return True else: msg = 'File {} does not exist'.format(filename) if exception_if_not_exists: - self.error(msg) - raise Exception(msg) + # self.log.error(msg) + raise K8sException(msg) +
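
Usage sketch (editor's addition, not part of the commit above): the example below shows how the connector API touched by this diff -- init_env, repo_add, install, status_kdu, uninstall and reset -- might be driven from asyncio code. The constructor keyword names, the fs/db helper objects, the kubeconfig contents, the repository URL and the chart name are assumptions for illustration; only the coroutine names and parameters visible in the diff are taken from the source.

# Illustrative sketch only -- not part of the commit above.
# `fs` and `db` are assumed OSM-style helper objects (fs must expose a .path attribute);
# the kubeconfig text, repository URL and chart name below are placeholders.
import asyncio

from n2vc.k8s_helm_conn import K8sHelmConnector


async def demo(fs, db, kubeconfig_text):
    k8s = K8sHelmConnector(
        fs=fs,
        db=db,
        kubectl_command='/usr/bin/kubectl',   # assumed binary locations
        helm_command='/usr/bin/helm',
        log=None,
        on_update_db=None,
    )

    # Prepare the cluster on both sides (helm client + Tiller); returns the
    # cluster uuid and whether the connector installed software in the cluster.
    cluster_uuid, installed_sw = await k8s.init_env(k8s_creds=kubeconfig_text)

    # Register a chart repository and deploy a KDU from it.
    await k8s.repo_add(cluster_uuid=cluster_uuid, name='example-repo',
                       url='https://charts.example.com')                # placeholder URL
    kdu_instance = await k8s.install(cluster_uuid=cluster_uuid,
                                     kdu_model='example-repo/mychart',  # placeholder chart
                                     atomic=True, timeout=300)

    # Inspect the release, then clean everything up.
    print(await k8s.status_kdu(cluster_uuid=cluster_uuid, kdu_instance=kdu_instance))
    await k8s.uninstall(cluster_uuid=cluster_uuid, kdu_instance=kdu_instance)
    await k8s.reset(cluster_uuid=cluster_uuid, force=True, uninstall_sw=installed_sw)

Note that after this change failures surface as K8sException (from n2vc.exceptions) rather than a bare Exception, so callers wrapping these coroutines can catch that type specifically.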