From: quilesj Date: Mon, 28 Oct 2019 17:10:42 +0000 (+0100) Subject: K8s helm connector X-Git-Tag: v7.0.0rc1~17^2~2 X-Git-Url: https://osm.etsi.org/gitweb/?p=osm%2FN2VC.git;a=commitdiff_plain;h=26c78a4b25fcbfc92362d730403460d736031736;hp=a049b7ce5d1606440447a88a98dd70548a1a0c74 K8s helm connector Change-Id: I5332ec8e053643bc582f7979af98b46a3f9f31ee Signed-off-by: quilesj --- diff --git a/n2vc/k8s_helm_conn.py b/n2vc/k8s_helm_conn.py new file mode 100644 index 0000000..88c94c5 --- /dev/null +++ b/n2vc/k8s_helm_conn.py @@ -0,0 +1,1055 @@ +## +# Copyright 2019 Telefonica Investigacion y Desarrollo, S.A.U. +# This file is part of OSM +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# For those usages not covered by the Apache License, Version 2.0 please +# contact with: nfvlabs@tid.es +## + +import paramiko +import subprocess +import os +import shutil +import asyncio +import uuid +import time +import yaml +from uuid import uuid4 +import random +from n2vc.k8s_conn import K8sConnector + + +class K8sHelmConnector(K8sConnector): + + """ + ################################################################################################## + ########################################## P U B L I C ########################################### + ################################################################################################## + """ + + def __init__( + self, + fs: object, + db: object, + kubectl_command: str = '/usr/bin/kubectl', + helm_command: str = '/usr/bin/helm', + log: object = None, + on_update_db=None + ): + """ + + :param fs: file system for kubernetes and helm configuration + :param db: database object to write current operation status + :param kubectl_command: path to kubectl executable + :param helm_command: path to helm executable + :param log: logger + :param on_update_db: callback called when k8s connector updates database + """ + + # parent class + K8sConnector.__init__( + self, + db=db, + log=log, + on_update_db=on_update_db + ) + + self.info('Initializing K8S Helm connector') + + # random numbers for release name generation + random.seed(time.time()) + + # the file system + self.fs = fs + + # exception if kubectl is not installed + self.kubectl_command = kubectl_command + self._check_file_exists(filename=kubectl_command, exception_if_not_exists=True) + + # exception if helm is not installed + self._helm_command = helm_command + self._check_file_exists(filename=helm_command, exception_if_not_exists=True) + + self.info('K8S Helm connector initialized') + + async def init_env( + self, + k8s_creds: str, + namespace: str = 'kube-system', + reuse_cluster_uuid=None + ) -> (str, bool): + + cluster_uuid = reuse_cluster_uuid + if not cluster_uuid: + cluster_uuid = str(uuid4()) + + self.debug('Initializing K8S environment. namespace: {}'.format(namespace)) + + # create config filename + kube_dir, helm_dir, config_filename = self._get_paths(cluster_name=cluster_uuid, create_if_not_exist=True) + f = open(config_filename, "w") + f.write(k8s_creds) + f.close() + + # check if tiller pod is up in cluster + command = '{} --kubeconfig={} --namespace={} get deployments'\ + .format(self.kubectl_command, config_filename, namespace) + output, rc = await self._local_async_exec(command=command, raise_exception_on_error=True) + + output_table = K8sHelmConnector._output_to_table(output=output) + + # find 'tiller' pod in all pods + already_initialized = False + try: + for row in output_table: + if row[0].startswith('tiller-deploy'): + already_initialized = True + break + except Exception as e: + pass + + # helm init + n2vc_installed_sw = False + if not already_initialized: + self.info('Initializing helm in client and server: {}'.format(cluster_uuid)) + command = '{} --kubeconfig={} --tiller-namespace={} --home={} init'\ + .format(self._helm_command, config_filename, namespace, helm_dir) + output, rc = await self._local_async_exec(command=command, raise_exception_on_error=True) + n2vc_installed_sw = True + else: + # check client helm installation + check_file = helm_dir + '/repository/repositories.yaml' + if not self._check_file_exists(filename=check_file, exception_if_not_exists=False): + self.info('Initializing helm in client: {}'.format(cluster_uuid)) + command = '{} --kubeconfig={} --tiller-namespace={} --home={} init --client-only'\ + .format(self._helm_command, config_filename, namespace, helm_dir) + output, rc = await self._local_async_exec(command=command, raise_exception_on_error=True) + else: + self.info('Helm client already initialized') + + self.info('Cluster initialized {}'.format(cluster_uuid)) + + return cluster_uuid, n2vc_installed_sw + + async def repo_add( + self, + cluster_uuid: str, + name: str, + url: str, + repo_type: str = 'chart' + ): + + self.debug('adding {} repository {}. URL: {}'.format(repo_type, name, url)) + + # config filename + kube_dir, helm_dir, config_filename = self._get_paths(cluster_name=cluster_uuid, create_if_not_exist=True) + + # helm repo update + command = '{} --kubeconfig={} --home={} repo update'.format(self._helm_command, config_filename, helm_dir) + self.debug('updating repo: {}'.format(command)) + await self._local_async_exec(command=command, raise_exception_on_error=False) + + # helm repo add name url + command = '{} --kubeconfig={} --home={} repo add {} {}'\ + .format(self._helm_command, config_filename, helm_dir, name, url) + self.debug('adding repo: {}'.format(command)) + await self._local_async_exec(command=command, raise_exception_on_error=True) + + async def repo_list( + self, + cluster_uuid: str + ) -> list: + """ + Get the list of registered repositories + + :return: list of registered repositories: [ (name, url) .... ] + """ + + self.debug('list repositories for cluster {}'.format(cluster_uuid)) + + # config filename + kube_dir, helm_dir, config_filename = self._get_paths(cluster_name=cluster_uuid, create_if_not_exist=True) + + command = '{} --kubeconfig={} --home={} repo list --output yaml'.format(self._helm_command, config_filename, helm_dir) + + output, rc = await self._local_async_exec(command=command, raise_exception_on_error=True) + if output and len(output) > 0: + return yaml.load(output, Loader=yaml.SafeLoader) + else: + return [] + + async def repo_remove( + self, + cluster_uuid: str, + name: str + ): + """ + Remove a repository from OSM + + :param cluster_uuid: the cluster + :param name: repo name in OSM + :return: True if successful + """ + + self.debug('list repositories for cluster {}'.format(cluster_uuid)) + + # config filename + kube_dir, helm_dir, config_filename = self._get_paths(cluster_name=cluster_uuid, create_if_not_exist=True) + + command = '{} --kubeconfig={} --home={} repo remove {}'\ + .format(self._helm_command, config_filename, helm_dir, name) + + await self._local_async_exec(command=command, raise_exception_on_error=True) + + async def reset( + self, + cluster_uuid: str, + force: bool = False, + uninstall_sw: bool = False + ) -> bool: + + self.debug('Resetting K8s environment. cluster uuid: {}'.format(cluster_uuid)) + + # get kube and helm directories + kube_dir, helm_dir, config_filename = self._get_paths(cluster_name=cluster_uuid, create_if_not_exist=False) + + # uninstall releases if needed + releases = await self.instances_list(cluster_uuid=cluster_uuid) + if len(releases) > 0: + if force: + for r in releases: + try: + kdu_instance = r.get('Name') + chart = r.get('Chart') + self.debug('Uninstalling {} -> {}'.format(chart, kdu_instance)) + await self.uninstall(cluster_uuid=cluster_uuid, kdu_instance=kdu_instance) + except Exception as e: + self.error('Error uninstalling release {}: {}'.format(kdu_instance, e)) + else: + msg = 'Cluster has releases and not force. Cannot reset K8s environment. Cluster uuid: {}'\ + .format(cluster_uuid) + self.error(msg) + raise Exception(msg) + + if uninstall_sw: + + self.debug('Uninstalling tiller from cluster {}'.format(cluster_uuid)) + + # find namespace for tiller pod + command = '{} --kubeconfig={} get deployments --all-namespaces'\ + .format(self.kubectl_command, config_filename) + output, rc = await self._local_async_exec(command=command, raise_exception_on_error=False) + output_table = K8sHelmConnector._output_to_table(output=output) + namespace = None + for r in output_table: + try: + if 'tiller-deploy' in r[1]: + namespace = r[0] + break + except Exception as e: + pass + else: + msg = 'Tiller deployment not found in cluster {}'.format(cluster_uuid) + self.error(msg) + # raise Exception(msg) + + self.debug('namespace for tiller: {}'.format(namespace)) + + force_str = '--force' + + if namespace: + # delete tiller deployment + self.debug('Deleting tiller deployment for cluster {}, namespace {}'.format(cluster_uuid, namespace)) + command = '{} --namespace {} --kubeconfig={} {} delete deployment tiller-deploy'\ + .format(self.kubectl_command, namespace, config_filename, force_str) + await self._local_async_exec(command=command, raise_exception_on_error=False) + + # uninstall tiller from cluster + self.debug('Uninstalling tiller from cluster {}'.format(cluster_uuid)) + command = '{} --kubeconfig={} --home={} reset'\ + .format(self._helm_command, config_filename, helm_dir) + self.debug('resetting: {}'.format(command)) + output, rc = await self._local_async_exec(command=command, raise_exception_on_error=True) + else: + self.debug('namespace not found') + + # delete cluster directory + dir = self.fs.path + '/' + cluster_uuid + self.debug('Removing directory {}'.format(dir)) + shutil.rmtree(dir, ignore_errors=True) + + return True + + async def install( + self, + cluster_uuid: str, + kdu_model: str, + atomic: bool = True, + timeout: float = 300, + params: dict = None, + db_dict: dict = None + ): + + self.debug('installing {} in cluster {}'.format(kdu_model, cluster_uuid)) + + start = time.time() + end = start + timeout + + # config filename + kube_dir, helm_dir, config_filename = self._get_paths(cluster_name=cluster_uuid, create_if_not_exist=True) + + # params to str + params_str = K8sHelmConnector._params_to_set_option(params) + + timeout_str = '' + if timeout: + timeout_str = '--timeout {}'.format(timeout) + + # atomic + atomic_str = '' + if atomic: + atomic_str = '--atomic' + + # version + version_str = '' + if ':' in kdu_model: + parts = kdu_model.split(sep=':') + if len(parts) == 2: + version_str = '--version {}'.format(parts[1]) + kdu_model = parts[0] + + # generate a name for the releas. Then, check if already exists + kdu_instance = None + while kdu_instance is None: + kdu_instance = K8sHelmConnector._generate_release_name(kdu_model) + try: + result = await self._status_kdu( + cluster_uuid=cluster_uuid, + kdu_instance=kdu_instance, + show_error_log=False + ) + if result is not None: + # instance already exists: generate a new one + kdu_instance = None + except: + kdu_instance = None + + # helm repo install + command = '{} install {} --output yaml --kubeconfig={} --home={} {} {} --name={} {} {}'\ + .format(self._helm_command, atomic_str, config_filename, helm_dir, + params_str, timeout_str, kdu_instance, kdu_model, version_str) + self.debug('installing: {}'.format(command)) + + if atomic: + # exec helm in a task + exec_task = asyncio.ensure_future( + coro_or_future=self._local_async_exec(command=command, raise_exception_on_error=False) + ) + # write status in another task + status_task = asyncio.ensure_future( + coro_or_future=self._store_status( + cluster_uuid=cluster_uuid, + kdu_instance=kdu_instance, + db_dict=db_dict, + operation='install', + run_once=False + ) + ) + + # wait for execution task + await asyncio.wait([exec_task]) + + # cancel status task + status_task.cancel() + + output, rc = exec_task.result() + + else: + + output, rc = await self._local_async_exec(command=command, raise_exception_on_error=False) + + # write final status + await self._store_status( + cluster_uuid=cluster_uuid, + kdu_instance=kdu_instance, + db_dict=db_dict, + operation='install', + run_once=True, + check_every=0 + ) + + if rc != 0: + msg = 'Error executing command: {}\nOutput: {}'.format(command, output) + self.error(msg) + raise Exception(msg) + + self.debug('Returning kdu_instance {}'.format(kdu_instance)) + return kdu_instance + + async def instances_list( + self, + cluster_uuid: str + ) -> list: + """ + returns a list of deployed releases in a cluster + + :param cluster_uuid: the cluster + :return: + """ + + self.debug('list releases for cluster {}'.format(cluster_uuid)) + + # config filename + kube_dir, helm_dir, config_filename = self._get_paths(cluster_name=cluster_uuid, create_if_not_exist=True) + + command = '{} --kubeconfig={} --home={} list --output yaml'\ + .format(self._helm_command, config_filename, helm_dir) + + output, rc = await self._local_async_exec(command=command, raise_exception_on_error=True) + + if output and len(output) > 0: + return yaml.load(output, Loader=yaml.SafeLoader).get('Releases') + else: + return [] + + async def upgrade( + self, + cluster_uuid: str, + kdu_instance: str, + kdu_model: str = None, + atomic: bool = True, + timeout: float = 300, + params: dict = None, + db_dict: dict = None + ): + + self.debug('upgrading {} in cluster {}'.format(kdu_model, cluster_uuid)) + + start = time.time() + end = start + timeout + + # config filename + kube_dir, helm_dir, config_filename = self._get_paths(cluster_name=cluster_uuid, create_if_not_exist=True) + + # params to str + params_str = K8sHelmConnector._params_to_set_option(params) + + timeout_str = '' + if timeout: + timeout_str = '--timeout {}'.format(timeout) + + # atomic + atomic_str = '' + if atomic: + atomic_str = '--atomic' + + # version + version_str = '' + if ':' in kdu_model: + parts = kdu_model.split(sep=':') + if len(parts) == 2: + version_str = '--version {}'.format(parts[1]) + kdu_model = parts[0] + + # helm repo upgrade + command = '{} upgrade {} --output yaml --kubeconfig={} --home={} {} {} {} {} {}'\ + .format(self._helm_command, atomic_str, config_filename, helm_dir, + params_str, timeout_str, kdu_instance, kdu_model, version_str) + self.debug('upgrading: {}'.format(command)) + + if atomic: + + # exec helm in a task + exec_task = asyncio.ensure_future( + coro_or_future=self._local_async_exec(command=command, raise_exception_on_error=False) + ) + # write status in another task + status_task = asyncio.ensure_future( + coro_or_future=self._store_status( + cluster_uuid=cluster_uuid, + kdu_instance=kdu_instance, + db_dict=db_dict, + operation='upgrade', + run_once=False + ) + ) + + # wait for execution task + await asyncio.wait([ exec_task ]) + + # cancel status task + status_task.cancel() + output, rc = exec_task.result() + + else: + + output, rc = await self._local_async_exec(command=command, raise_exception_on_error=False) + + # write final status + await self._store_status( + cluster_uuid=cluster_uuid, + kdu_instance=kdu_instance, + db_dict=db_dict, + operation='upgrade', + run_once=True, + check_every=0 + ) + + if rc != 0: + msg = 'Error executing command: {}\nOutput: {}'.format(command, output) + self.error(msg) + raise Exception(msg) + + # return new revision number + instance = await self.get_instance_info(cluster_uuid=cluster_uuid, kdu_instance=kdu_instance) + if instance: + revision = int(instance.get('Revision')) + self.debug('New revision: {}'.format(revision)) + return revision + else: + return 0 + + async def rollback( + self, + cluster_uuid: str, + kdu_instance: str, + revision=0, + db_dict: dict = None + ): + + self.debug('rollback kdu_instance {} to revision {} from cluster {}' + .format(kdu_instance, revision, cluster_uuid)) + + # config filename + kube_dir, helm_dir, config_filename = self._get_paths(cluster_name=cluster_uuid, create_if_not_exist=True) + + command = '{} rollback --kubeconfig={} --home={} {} {} --wait'\ + .format(self._helm_command, config_filename, helm_dir, kdu_instance, revision) + + # exec helm in a task + exec_task = asyncio.ensure_future( + coro_or_future=self._local_async_exec(command=command, raise_exception_on_error=False) + ) + # write status in another task + status_task = asyncio.ensure_future( + coro_or_future=self._store_status( + cluster_uuid=cluster_uuid, + kdu_instance=kdu_instance, + db_dict=db_dict, + operation='rollback', + run_once=False + ) + ) + + # wait for execution task + await asyncio.wait([exec_task]) + + # cancel status task + status_task.cancel() + + output, rc = exec_task.result() + + # write final status + await self._store_status( + cluster_uuid=cluster_uuid, + kdu_instance=kdu_instance, + db_dict=db_dict, + operation='rollback', + run_once=True, + check_every=0 + ) + + if rc != 0: + msg = 'Error executing command: {}\nOutput: {}'.format(command, output) + self.error(msg) + raise Exception(msg) + + # return new revision number + instance = await self.get_instance_info(cluster_uuid=cluster_uuid, kdu_instance=kdu_instance) + if instance: + revision = int(instance.get('Revision')) + self.debug('New revision: {}'.format(revision)) + return revision + else: + return 0 + + async def uninstall( + self, + cluster_uuid: str, + kdu_instance: str + ): + """ + Removes an existing KDU instance. It would implicitly use the `delete` call (this call would happen + after all _terminate-config-primitive_ of the VNF are invoked). + + :param cluster_uuid: UUID of a K8s cluster known by OSM + :param kdu_instance: unique name for the KDU instance to be deleted + :return: True if successful + """ + + self.debug('uninstall kdu_instance {} from cluster {}'.format(kdu_instance, cluster_uuid)) + + # config filename + kube_dir, helm_dir, config_filename = self._get_paths(cluster_name=cluster_uuid, create_if_not_exist=True) + + command = '{} --kubeconfig={} --home={} delete --purge {}'\ + .format(self._helm_command, config_filename, helm_dir, kdu_instance) + + output, rc = await self._local_async_exec(command=command, raise_exception_on_error=True) + + return self._output_to_table(output) + + async def inspect_kdu( + self, + kdu_model: str + ) -> str: + + self.debug('inspect kdu_model {}'.format(kdu_model)) + + command = '{} inspect values {}'\ + .format(self._helm_command, kdu_model) + + output, rc = await self._local_async_exec(command=command) + + return output + + async def help_kdu( + self, + kdu_model: str + ): + + self.debug('help kdu_model {}'.format(kdu_model)) + + command = '{} inspect readme {}'\ + .format(self._helm_command, kdu_model) + + output, rc = await self._local_async_exec(command=command, raise_exception_on_error=True) + + return output + + async def status_kdu( + self, + cluster_uuid: str, + kdu_instance: str + ): + + return await self._status_kdu(cluster_uuid=cluster_uuid, kdu_instance=kdu_instance, show_error_log=True) + + + """ + ################################################################################################## + ########################################## P R I V A T E ######################################### + ################################################################################################## + """ + + async def _status_kdu( + self, + cluster_uuid: str, + kdu_instance: str, + show_error_log: bool = False + ): + + self.debug('status of kdu_instance {}'.format(kdu_instance)) + + # config filename + kube_dir, helm_dir, config_filename = self._get_paths(cluster_name=cluster_uuid, create_if_not_exist=True) + + command = '{} --kubeconfig={} --home={} status {} --output yaml'\ + .format(self._helm_command, config_filename, helm_dir, kdu_instance) + + output, rc = await self._local_async_exec( + command=command, + raise_exception_on_error=True, + show_error_log=show_error_log + ) + + if rc != 0: + return None + + data = yaml.load(output, Loader=yaml.SafeLoader) + + # remove field 'notes' + try: + del data.get('info').get('status')['notes'] + except KeyError: + pass + + # parse field 'resources' + try: + resources = str(data.get('info').get('status').get('resources')) + resource_table = self._output_to_table(resources) + data.get('info').get('status')['resources'] = resource_table + except Exception as e: + pass + + return data + + + async def get_instance_info( + self, + cluster_uuid: str, + kdu_instance: str + ): + instances = await self.instances_list(cluster_uuid=cluster_uuid) + for instance in instances: + if instance.get('Name') == kdu_instance: + return instance + self.debug('Instance {} not found'.format(kdu_instance)) + return None + + @staticmethod + def _generate_release_name( + chart_name: str + ): + name = '' + for c in chart_name: + if c.isalpha() or c.isnumeric(): + name += c + else: + name += '-' + if len(name) > 35: + name = name[0:35] + + # if does not start with alpha character, prefix 'a' + if not name[0].isalpha(): + name = 'a' + name + + name += '-' + + def get_random_number(): + r = random.randrange(start=1, stop=99999999) + s = str(r) + while len(s) < 10: + s = '0' + s + return s + + name = name + get_random_number() + return name.lower() + + async def _store_status( + self, + cluster_uuid: str, + operation: str, + kdu_instance: str, + check_every: float = 10, + db_dict: dict = None, + run_once: bool = False + ): + while True: + try: + await asyncio.sleep(check_every) + detailed_status = await self.status_kdu(cluster_uuid=cluster_uuid, kdu_instance=kdu_instance) + status = detailed_status.get('info').get('Description') + print('=' * 60) + self.debug('STATUS:\n{}'.format(status)) + self.debug('DETAILED STATUS:\n{}'.format(detailed_status)) + print('=' * 60) + # write status to db + result = await self.write_app_status_to_db( + db_dict=db_dict, + status=str(status), + detailed_status=str(detailed_status), + operation=operation) + if not result: + self.info('Error writing in database. Task exiting...') + return + except asyncio.CancelledError: + self.debug('Task cancelled') + return + except Exception as e: + pass + finally: + if run_once: + return + + async def _is_install_completed( + self, + cluster_uuid: str, + kdu_instance: str + ) -> bool: + + status = await self.status_kdu(cluster_uuid=cluster_uuid, kdu_instance=kdu_instance) + + # extract info.status.resources-> str + # format: + # ==> v1/Deployment + # NAME READY UP-TO-DATE AVAILABLE AGE + # halting-horse-mongodb 0/1 1 0 0s + # halting-petit-mongodb 1/1 1 0 0s + # blank line + resources = K8sHelmConnector._get_deep(status, ('info', 'status', 'resources')) + + # convert to table + resources = K8sHelmConnector._output_to_table(resources) + + num_lines = len(resources) + index = 0 + while index < num_lines: + try: + line1 = resources[index] + index += 1 + # find '==>' in column 0 + if line1[0] == '==>': + line2 = resources[index] + index += 1 + # find READY in column 1 + if line2[1] == 'READY': + # read next lines + line3 = resources[index] + index += 1 + while len(line3) > 1 and index < num_lines: + ready_value = line3[1] + parts = ready_value.split(sep='/') + current = int(parts[0]) + total = int(parts[1]) + if current < total: + self.debug('NOT READY:\n {}'.format(line3)) + ready = False + line3 = resources[index] + index += 1 + + except Exception as e: + pass + + return ready + + @staticmethod + def _get_deep(dictionary: dict, members: tuple): + target = dictionary + value = None + try: + for m in members: + value = target.get(m) + if not value: + return None + else: + target = value + except Exception as e: + pass + return value + + # find key:value in several lines + @staticmethod + def _find_in_lines(p_lines: list, p_key: str) -> str: + for line in p_lines: + try: + if line.startswith(p_key + ':'): + parts = line.split(':') + the_value = parts[1].strip() + return the_value + except Exception as e: + # ignore it + pass + return None + + # params for use in --set option + @staticmethod + def _params_to_set_option(params: dict) -> str: + params_str = '' + if params and len(params) > 0: + start = True + for key in params: + value = params.get(key, None) + if value is not None: + if start: + params_str += '--set ' + start = False + else: + params_str += ',' + params_str += '{}={}'.format(key, value) + return params_str + + @staticmethod + def _output_to_lines(output: str) -> list: + output_lines = list() + lines = output.splitlines(keepends=False) + for line in lines: + line = line.strip() + if len(line) > 0: + output_lines.append(line) + return output_lines + + @staticmethod + def _output_to_table(output: str) -> list: + output_table = list() + lines = output.splitlines(keepends=False) + for line in lines: + line = line.replace('\t', ' ') + line_list = list() + output_table.append(line_list) + cells = line.split(sep=' ') + for cell in cells: + cell = cell.strip() + if len(cell) > 0: + line_list.append(cell) + return output_table + + def _get_paths(self, cluster_name: str, create_if_not_exist: bool = False) -> (str, str, str): + """ + Returns kube and helm directories + + :param cluster_name: + :param create_if_not_exist: + :return: kube, helm directories and config filename. Raises exception if not exist and cannot create + """ + + base = self.fs.path + if base.endswith("/") or base.endswith("\\"): + base = base[:-1] + + # base dir for cluster + cluster_dir = base + '/' + cluster_name + if create_if_not_exist and not os.path.exists(cluster_dir): + self.debug('Creating dir {}'.format(cluster_dir)) + os.makedirs(cluster_dir) + if not os.path.exists(cluster_dir): + msg = 'Base cluster dir {} does not exist'.format(cluster_dir) + self.error(msg) + raise Exception(msg) + + # kube dir + kube_dir = cluster_dir + '/' + '.kube' + if create_if_not_exist and not os.path.exists(kube_dir): + self.debug('Creating dir {}'.format(kube_dir)) + os.makedirs(kube_dir) + if not os.path.exists(kube_dir): + msg = 'Kube config dir {} does not exist'.format(kube_dir) + self.error(msg) + raise Exception(msg) + + # helm home dir + helm_dir = cluster_dir + '/' + '.helm' + if create_if_not_exist and not os.path.exists(helm_dir): + self.debug('Creating dir {}'.format(helm_dir)) + os.makedirs(helm_dir) + if not os.path.exists(helm_dir): + msg = 'Helm config dir {} does not exist'.format(helm_dir) + self.error(msg) + raise Exception(msg) + + config_filename = kube_dir + '/config' + return kube_dir, helm_dir, config_filename + + @staticmethod + def _remove_multiple_spaces(str): + str = str.strip() + while ' ' in str: + str = str.replace(' ', ' ') + return str + + def _local_exec( + self, + command: str + ) -> (str, int): + command = K8sHelmConnector._remove_multiple_spaces(command) + self.debug('Executing sync local command: {}'.format(command)) + # raise exception if fails + output = '' + try: + output = subprocess.check_output(command, shell=True, universal_newlines=True) + return_code = 0 + self.debug(output) + except Exception as e: + return_code = 1 + + return output, return_code + + async def _local_async_exec( + self, + command: str, + raise_exception_on_error: bool = False, + show_error_log: bool = False + ) -> (str, int): + + command = K8sHelmConnector._remove_multiple_spaces(command) + self.debug('Executing async local command: {}'.format(command)) + + # split command + command = command.split(sep=' ') + + try: + process = await asyncio.create_subprocess_exec( + *command, + stdout=asyncio.subprocess.PIPE, + stderr=asyncio.subprocess.PIPE + ) + + # wait for command terminate + stdout, stderr = await process.communicate() + + return_code = process.returncode + + output = '' + if stdout: + output = stdout.decode('utf-8').strip() + if stderr: + output = stderr.decode('utf-8').strip() + + if return_code != 0 and show_error_log: + self.debug('Return code (FAIL): {}\nOutput:\n{}'.format(return_code, output)) + else: + self.debug('Return code: {}'.format(return_code)) + + if raise_exception_on_error and return_code != 0: + raise Exception(output) + + return output, return_code + + except Exception as e: + msg = 'Exception executing command: {} -> {}'.format(command, e) + if show_error_log: + self.error(msg) + return '', -1 + + def _remote_exec( + self, + hostname: str, + username: str, + password: str, + command: str, + timeout: int = 10 + ) -> (str, int): + + command = K8sHelmConnector._remove_multiple_spaces(command) + self.debug('Executing sync remote ssh command: {}'.format(command)) + + ssh = paramiko.SSHClient() + ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy()) + ssh.connect(hostname=hostname, username=username, password=password) + ssh_stdin, ssh_stdout, ssh_stderr = ssh.exec_command(command=command, timeout=timeout) + output = ssh_stdout.read().decode('utf-8') + error = ssh_stderr.read().decode('utf-8') + if error: + self.error('ERROR: {}'.format(error)) + return_code = 1 + else: + return_code = 0 + output = output.replace('\\n', '\n') + self.debug('OUTPUT: {}'.format(output)) + + return output, return_code + + def _check_file_exists(self, filename: str, exception_if_not_exists: bool = False): + self.debug('Checking if file {} exists...'.format(filename)) + if os.path.exists(filename): + return True + else: + msg = 'File {} does not exist'.format(filename) + if exception_if_not_exists: + self.error(msg) + raise Exception(msg) +