# contact with: nfvlabs@tid.es
##
-import paramiko
-import subprocess
+import asyncio
import os
+import random
import shutil
-import asyncio
+import subprocess
import time
-import yaml
from uuid import uuid4
-import random
+
+from n2vc.exceptions import K8sException
from n2vc.k8s_conn import K8sConnector
+import yaml
class K8sHelmConnector(K8sConnector):
"""
- ##################################################################################################
- ########################################## P U B L I C ###########################################
- ##################################################################################################
+ ####################################################################################
+ ################################### P U B L I C ####################################
+ ####################################################################################
"""
def __init__(
- self,
- fs: object,
- db: object,
- kubectl_command: str = '/usr/bin/kubectl',
- helm_command: str = '/usr/bin/helm',
- log: object = None,
- on_update_db=None
+ self,
+ fs: object,
+ db: object,
+ kubectl_command: str = "/usr/bin/kubectl",
+ helm_command: str = "/usr/bin/helm",
+ log: object = None,
+ on_update_db=None,
):
"""
"""
# parent class
- K8sConnector.__init__(
- self,
- db=db,
- log=log,
- on_update_db=on_update_db
- )
+ K8sConnector.__init__(self, db=db, log=log, on_update_db=on_update_db)
- self.info('Initializing K8S Helm connector')
+ self.log.info("Initializing K8S Helm connector")
# random numbers for release name generation
random.seed(time.time())
self._helm_command = helm_command
self._check_file_exists(filename=helm_command, exception_if_not_exists=True)
- self.info('K8S Helm connector initialized')
+ # initialize helm client-only
+ self.log.debug("Initializing helm client-only...")
+ command = "{} init --client-only".format(self._helm_command)
+ try:
+ asyncio.ensure_future(
+ self._local_async_exec(command=command, raise_exception_on_error=False)
+ )
+ # loop = asyncio.get_event_loop()
+ # loop.run_until_complete(self._local_async_exec(command=command,
+ # raise_exception_on_error=False))
+ except Exception as e:
+ self.warning(
+ msg="helm init failed (it was already initialized): {}".format(e)
+ )
+
+ self.log.info("K8S Helm connector initialized")
async def init_env(
- self,
- k8s_creds: str,
- namespace: str = 'kube-system',
- reuse_cluster_uuid=None
+ self, k8s_creds: str, namespace: str = "kube-system", reuse_cluster_uuid=None
) -> (str, bool):
+ """
+ It prepares a given K8s cluster environment to run Charts on both sides:
+ client (OSM)
+ server (Tiller)
+
+ :param k8s_creds: credentials to access a given K8s cluster, i.e. a valid
+ '.kube/config'
+ :param namespace: optional namespace to be used for helm. By default,
+ 'kube-system' will be used
+ :param reuse_cluster_uuid: existing cluster uuid for reuse
+ :return: uuid of the K8s cluster and True if connector has installed some
+ software in the cluster
+ (on error, an exception will be raised)
+ """
cluster_uuid = reuse_cluster_uuid
if not cluster_uuid:
cluster_uuid = str(uuid4())
- self.debug('Initializing K8S environment. namespace: {}'.format(namespace))
+ self.log.debug("Initializing K8S environment. namespace: {}".format(namespace))
# create config filename
- kube_dir, helm_dir, config_filename, cluster_dir = \
- self._get_paths(cluster_name=cluster_uuid, create_if_not_exist=True)
+ _kube_dir, helm_dir, config_filename, _cluster_dir = self._get_paths(
+ cluster_name=cluster_uuid, create_if_not_exist=True
+ )
f = open(config_filename, "w")
f.write(k8s_creds)
f.close()
# check if tiller pod is up in cluster
- command = '{} --kubeconfig={} --namespace={} get deployments'\
- .format(self.kubectl_command, config_filename, namespace)
- output, rc = await self._local_async_exec(command=command, raise_exception_on_error=True)
+ command = "{} --kubeconfig={} --namespace={} get deployments".format(
+ self.kubectl_command, config_filename, namespace
+ )
+ output, _rc = await self._local_async_exec(
+ command=command, raise_exception_on_error=True
+ )
output_table = K8sHelmConnector._output_to_table(output=output)
already_initialized = False
try:
for row in output_table:
- if row[0].startswith('tiller-deploy'):
+ if row[0].startswith("tiller-deploy"):
already_initialized = True
break
- except Exception as e:
+ except Exception:
pass
# helm init
n2vc_installed_sw = False
if not already_initialized:
- self.info('Initializing helm in client and server: {}'.format(cluster_uuid))
- command = '{} --kubeconfig={} --tiller-namespace={} --home={} init'\
- .format(self._helm_command, config_filename, namespace, helm_dir)
- output, rc = await self._local_async_exec(command=command, raise_exception_on_error=True)
+ self.log.info(
+ "Initializing helm in client and server: {}".format(cluster_uuid)
+ )
+ command = "{} --kubeconfig={} --tiller-namespace={} --home={} init".format(
+ self._helm_command, config_filename, namespace, helm_dir
+ )
+ output, _rc = await self._local_async_exec(
+ command=command, raise_exception_on_error=True
+ )
n2vc_installed_sw = True
else:
# check client helm installation
- check_file = helm_dir + '/repository/repositories.yaml'
- if not self._check_file_exists(filename=check_file, exception_if_not_exists=False):
- self.info('Initializing helm in client: {}'.format(cluster_uuid))
- command = '{} --kubeconfig={} --tiller-namespace={} --home={} init --client-only'\
- .format(self._helm_command, config_filename, namespace, helm_dir)
- output, rc = await self._local_async_exec(command=command, raise_exception_on_error=True)
+ check_file = helm_dir + "/repository/repositories.yaml"
+ if not self._check_file_exists(
+ filename=check_file, exception_if_not_exists=False
+ ):
+ self.log.info("Initializing helm in client: {}".format(cluster_uuid))
+ command = (
+ "{} --kubeconfig={} --tiller-namespace={} "
+ "--home={} init --client-only"
+ ).format(self._helm_command, config_filename, namespace, helm_dir)
+ output, _rc = await self._local_async_exec(
+ command=command, raise_exception_on_error=True
+ )
else:
- self.info('Helm client already initialized')
+ self.log.info("Helm client already initialized")
- self.info('Cluster initialized {}'.format(cluster_uuid))
+ self.log.info("Cluster initialized {}".format(cluster_uuid))
return cluster_uuid, n2vc_installed_sw
async def repo_add(
- self,
- cluster_uuid: str,
- name: str,
- url: str,
- repo_type: str = 'chart'
+ self, cluster_uuid: str, name: str, url: str, repo_type: str = "chart"
):
- self.debug('adding {} repository {}. URL: {}'.format(repo_type, name, url))
+ self.log.debug("adding {} repository {}. URL: {}".format(repo_type, name, url))
# config filename
- kube_dir, helm_dir, config_filename, cluster_dir = \
- self._get_paths(cluster_name=cluster_uuid, create_if_not_exist=True)
+ _kube_dir, helm_dir, config_filename, _cluster_dir = self._get_paths(
+ cluster_name=cluster_uuid, create_if_not_exist=True
+ )
# helm repo update
- command = '{} --kubeconfig={} --home={} repo update'.format(self._helm_command, config_filename, helm_dir)
- self.debug('updating repo: {}'.format(command))
+ command = "{} --kubeconfig={} --home={} repo update".format(
+ self._helm_command, config_filename, helm_dir
+ )
+ self.log.debug("updating repo: {}".format(command))
await self._local_async_exec(command=command, raise_exception_on_error=False)
# helm repo add name url
- command = '{} --kubeconfig={} --home={} repo add {} {}'\
- .format(self._helm_command, config_filename, helm_dir, name, url)
- self.debug('adding repo: {}'.format(command))
+ command = "{} --kubeconfig={} --home={} repo add {} {}".format(
+ self._helm_command, config_filename, helm_dir, name, url
+ )
+ self.log.debug("adding repo: {}".format(command))
await self._local_async_exec(command=command, raise_exception_on_error=True)
- async def repo_list(
- self,
- cluster_uuid: str
- ) -> list:
+ async def repo_list(self, cluster_uuid: str) -> list:
"""
Get the list of registered repositories
:return: list of registered repositories: [ (name, url) .... ]
"""
- self.debug('list repositories for cluster {}'.format(cluster_uuid))
+ self.log.debug("list repositories for cluster {}".format(cluster_uuid))
# config filename
- kube_dir, helm_dir, config_filename, cluster_dir = \
- self._get_paths(cluster_name=cluster_uuid, create_if_not_exist=True)
+ _kube_dir, helm_dir, config_filename, _cluster_dir = self._get_paths(
+ cluster_name=cluster_uuid, create_if_not_exist=True
+ )
- command = '{} --kubeconfig={} --home={} repo list --output yaml'.format(self._helm_command, config_filename, helm_dir)
+ command = "{} --kubeconfig={} --home={} repo list --output yaml".format(
+ self._helm_command, config_filename, helm_dir
+ )
- output, rc = await self._local_async_exec(command=command, raise_exception_on_error=True)
+ output, _rc = await self._local_async_exec(
+ command=command, raise_exception_on_error=True
+ )
if output and len(output) > 0:
return yaml.load(output, Loader=yaml.SafeLoader)
else:
return []
- async def repo_remove(
- self,
- cluster_uuid: str,
- name: str
- ):
+ async def repo_remove(self, cluster_uuid: str, name: str):
"""
Remove a repository from OSM
:return: True if successful
"""
- self.debug('list repositories for cluster {}'.format(cluster_uuid))
+ self.log.debug("list repositories for cluster {}".format(cluster_uuid))
# config filename
- kube_dir, helm_dir, config_filename, cluster_dir = \
- self._get_paths(cluster_name=cluster_uuid, create_if_not_exist=True)
+ _kube_dir, helm_dir, config_filename, _cluster_dir = self._get_paths(
+ cluster_name=cluster_uuid, create_if_not_exist=True
+ )
- command = '{} --kubeconfig={} --home={} repo remove {}'\
- .format(self._helm_command, config_filename, helm_dir, name)
+ command = "{} --kubeconfig={} --home={} repo remove {}".format(
+ self._helm_command, config_filename, helm_dir, name
+ )
await self._local_async_exec(command=command, raise_exception_on_error=True)
async def reset(
- self,
- cluster_uuid: str,
- force: bool = False,
- uninstall_sw: bool = False
+ self, cluster_uuid: str, force: bool = False, uninstall_sw: bool = False
) -> bool:
- self.debug('Resetting K8s environment. cluster uuid: {}'.format(cluster_uuid))
+ self.log.debug(
+ "Resetting K8s environment. cluster uuid: {}".format(cluster_uuid)
+ )
# get kube and helm directories
- kube_dir, helm_dir, config_filename, cluster_dir = \
- self._get_paths(cluster_name=cluster_uuid, create_if_not_exist=False)
+ _kube_dir, helm_dir, config_filename, _cluster_dir = self._get_paths(
+ cluster_name=cluster_uuid, create_if_not_exist=False
+ )
# uninstall releases if needed
releases = await self.instances_list(cluster_uuid=cluster_uuid)
if force:
for r in releases:
try:
- kdu_instance = r.get('Name')
- chart = r.get('Chart')
- self.debug('Uninstalling {} -> {}'.format(chart, kdu_instance))
- await self.uninstall(cluster_uuid=cluster_uuid, kdu_instance=kdu_instance)
+ kdu_instance = r.get("Name")
+ chart = r.get("Chart")
+ self.log.debug(
+ "Uninstalling {} -> {}".format(chart, kdu_instance)
+ )
+ await self.uninstall(
+ cluster_uuid=cluster_uuid, kdu_instance=kdu_instance
+ )
except Exception as e:
- self.error('Error uninstalling release {}: {}'.format(kdu_instance, e))
+ self.log.error(
+ "Error uninstalling release {}: {}".format(kdu_instance, e)
+ )
else:
- msg = 'Cluster has releases and not force. Cannot reset K8s environment. Cluster uuid: {}'\
- .format(cluster_uuid)
- self.error(msg)
- raise Exception(msg)
+ msg = (
+ "Cluster has releases and not force. Cannot reset K8s "
+ "environment. Cluster uuid: {}"
+ ).format(cluster_uuid)
+ self.log.error(msg)
+ raise K8sException(msg)
if uninstall_sw:
- self.debug('Uninstalling tiller from cluster {}'.format(cluster_uuid))
+ self.log.debug("Uninstalling tiller from cluster {}".format(cluster_uuid))
# find namespace for tiller pod
- command = '{} --kubeconfig={} get deployments --all-namespaces'\
- .format(self.kubectl_command, config_filename)
- output, rc = await self._local_async_exec(command=command, raise_exception_on_error=False)
+ command = "{} --kubeconfig={} get deployments --all-namespaces".format(
+ self.kubectl_command, config_filename
+ )
+ output, _rc = await self._local_async_exec(
+ command=command, raise_exception_on_error=False
+ )
output_table = K8sHelmConnector._output_to_table(output=output)
namespace = None
for r in output_table:
try:
- if 'tiller-deploy' in r[1]:
+ if "tiller-deploy" in r[1]:
namespace = r[0]
break
- except Exception as e:
+ except Exception:
pass
else:
- msg = 'Tiller deployment not found in cluster {}'.format(cluster_uuid)
- self.error(msg)
- # raise Exception(msg)
+ msg = "Tiller deployment not found in cluster {}".format(cluster_uuid)
+ self.log.error(msg)
- self.debug('namespace for tiller: {}'.format(namespace))
+ self.log.debug("namespace for tiller: {}".format(namespace))
- force_str = '--force'
+ force_str = "--force"
if namespace:
# delete tiller deployment
- self.debug('Deleting tiller deployment for cluster {}, namespace {}'.format(cluster_uuid, namespace))
- command = '{} --namespace {} --kubeconfig={} {} delete deployment tiller-deploy'\
- .format(self.kubectl_command, namespace, config_filename, force_str)
- await self._local_async_exec(command=command, raise_exception_on_error=False)
+ self.log.debug(
+ "Deleting tiller deployment for cluster {}, namespace {}".format(
+ cluster_uuid, namespace
+ )
+ )
+ command = (
+ "{} --namespace {} --kubeconfig={} {} delete deployment "
+ "tiller-deploy"
+ ).format(self.kubectl_command, namespace, config_filename, force_str)
+ await self._local_async_exec(
+ command=command, raise_exception_on_error=False
+ )
# uninstall tiller from cluster
- self.debug('Uninstalling tiller from cluster {}'.format(cluster_uuid))
- command = '{} --kubeconfig={} --home={} reset'\
- .format(self._helm_command, config_filename, helm_dir)
- self.debug('resetting: {}'.format(command))
- output, rc = await self._local_async_exec(command=command, raise_exception_on_error=True)
+ self.log.debug(
+ "Uninstalling tiller from cluster {}".format(cluster_uuid)
+ )
+ command = "{} --kubeconfig={} --home={} reset".format(
+ self._helm_command, config_filename, helm_dir
+ )
+ self.log.debug("resetting: {}".format(command))
+ output, _rc = await self._local_async_exec(
+ command=command, raise_exception_on_error=True
+ )
else:
- self.debug('namespace not found')
+ self.log.debug("namespace not found")
# delete cluster directory
- dir = self.fs.path + '/' + cluster_uuid
- self.debug('Removing directory {}'.format(dir))
- shutil.rmtree(dir, ignore_errors=True)
+ direct = self.fs.path + "/" + cluster_uuid
+ self.log.debug("Removing directory {}".format(direct))
+ shutil.rmtree(direct, ignore_errors=True)
return True
async def install(
- self,
- cluster_uuid: str,
- kdu_model: str,
- atomic: bool = True,
- timeout: float = 300,
- params: dict = None,
- db_dict: dict = None
+ self,
+ cluster_uuid: str,
+ kdu_model: str,
+ atomic: bool = True,
+ timeout: float = 300,
+ params: dict = None,
+ db_dict: dict = None,
+ kdu_name: str = None,
+ namespace: str = None,
):
- self.debug('installing {} in cluster {}'.format(kdu_model, cluster_uuid))
-
- start = time.time()
- end = start + timeout
+ self.log.debug("installing {} in cluster {}".format(kdu_model, cluster_uuid))
# config filename
- kube_dir, helm_dir, config_filename, cluster_dir = \
- self._get_paths(cluster_name=cluster_uuid, create_if_not_exist=True)
+ _kube_dir, helm_dir, config_filename, _cluster_dir = self._get_paths(
+ cluster_name=cluster_uuid, create_if_not_exist=True
+ )
# params to str
# params_str = K8sHelmConnector._params_to_set_option(params)
- params_str, file_to_delete = self._params_to_file_option(cluster_uuid=cluster_uuid, params=params)
+ params_str, file_to_delete = self._params_to_file_option(
+ cluster_uuid=cluster_uuid, params=params
+ )
- timeout_str = ''
+ timeout_str = ""
if timeout:
- timeout_str = '--timeout {}'.format(timeout)
+ timeout_str = "--timeout {}".format(timeout)
# atomic
- atomic_str = ''
+ atomic_str = ""
if atomic:
- atomic_str = '--atomic'
+ atomic_str = "--atomic"
+ # namespace
+ namespace_str = ""
+ if namespace:
+ namespace_str = "--namespace {}".format(namespace)
# version
- version_str = ''
- if ':' in kdu_model:
- parts = kdu_model.split(sep=':')
+ version_str = ""
+ if ":" in kdu_model:
+ parts = kdu_model.split(sep=":")
if len(parts) == 2:
- version_str = '--version {}'.format(parts[1])
+ version_str = "--version {}".format(parts[1])
kdu_model = parts[0]
- # generate a name for the releas. Then, check if already exists
+ # generate a name for the release. Then, check if already exists
kdu_instance = None
while kdu_instance is None:
kdu_instance = K8sHelmConnector._generate_release_name(kdu_model)
result = await self._status_kdu(
cluster_uuid=cluster_uuid,
kdu_instance=kdu_instance,
- show_error_log=False
+ show_error_log=False,
)
if result is not None:
# instance already exists: generate a new one
kdu_instance = None
- except:
- kdu_instance = None
+ except K8sException:
+ pass
# helm repo install
- command = '{} install {} --output yaml --kubeconfig={} --home={} {} {} --name={} {} {}'\
- .format(self._helm_command, atomic_str, config_filename, helm_dir,
- params_str, timeout_str, kdu_instance, kdu_model, version_str)
- self.debug('installing: {}'.format(command))
+ command = (
+ "{helm} install {atomic} --output yaml --kubeconfig={config} --home={dir} "
+ "{params} {timeout} --name={name} {ns} {model} {ver}".format(
+ helm=self._helm_command,
+ atomic=atomic_str,
+ config=config_filename,
+ dir=helm_dir,
+ params=params_str,
+ timeout=timeout_str,
+ name=kdu_instance,
+ ns=namespace_str,
+ model=kdu_model,
+ ver=version_str,
+ )
+ )
+ self.log.debug("installing: {}".format(command))
if atomic:
# exec helm in a task
exec_task = asyncio.ensure_future(
- coro_or_future=self._local_async_exec(command=command, raise_exception_on_error=False)
+ coro_or_future=self._local_async_exec(
+ command=command, raise_exception_on_error=False
+ )
)
+
# write status in another task
status_task = asyncio.ensure_future(
coro_or_future=self._store_status(
cluster_uuid=cluster_uuid,
kdu_instance=kdu_instance,
db_dict=db_dict,
- operation='install',
- run_once=False
+ operation="install",
+ run_once=False,
)
)
else:
- output, rc = await self._local_async_exec(command=command, raise_exception_on_error=False)
+ output, rc = await self._local_async_exec(
+ command=command, raise_exception_on_error=False
+ )
# remove temporal values yaml file
if file_to_delete:
cluster_uuid=cluster_uuid,
kdu_instance=kdu_instance,
db_dict=db_dict,
- operation='install',
+ operation="install",
run_once=True,
- check_every=0
+ check_every=0,
)
if rc != 0:
- msg = 'Error executing command: {}\nOutput: {}'.format(command, output)
- self.error(msg)
- raise Exception(msg)
+ msg = "Error executing command: {}\nOutput: {}".format(command, output)
+ self.log.error(msg)
+ raise K8sException(msg)
- self.debug('Returning kdu_instance {}'.format(kdu_instance))
+ self.log.debug("Returning kdu_instance {}".format(kdu_instance))
return kdu_instance
- async def instances_list(
- self,
- cluster_uuid: str
- ) -> list:
+ async def instances_list(self, cluster_uuid: str) -> list:
"""
returns a list of deployed releases in a cluster
:return:
"""
- self.debug('list releases for cluster {}'.format(cluster_uuid))
+ self.log.debug("list releases for cluster {}".format(cluster_uuid))
# config filename
- kube_dir, helm_dir, config_filename, cluster_dir = \
- self._get_paths(cluster_name=cluster_uuid, create_if_not_exist=True)
+ _kube_dir, helm_dir, config_filename, _cluster_dir = self._get_paths(
+ cluster_name=cluster_uuid, create_if_not_exist=True
+ )
- command = '{} --kubeconfig={} --home={} list --output yaml'\
- .format(self._helm_command, config_filename, helm_dir)
+ command = "{} --kubeconfig={} --home={} list --output yaml".format(
+ self._helm_command, config_filename, helm_dir
+ )
- output, rc = await self._local_async_exec(command=command, raise_exception_on_error=True)
+ output, _rc = await self._local_async_exec(
+ command=command, raise_exception_on_error=True
+ )
if output and len(output) > 0:
- return yaml.load(output, Loader=yaml.SafeLoader).get('Releases')
+ return yaml.load(output, Loader=yaml.SafeLoader).get("Releases")
else:
return []
async def upgrade(
- self,
- cluster_uuid: str,
- kdu_instance: str,
- kdu_model: str = None,
- atomic: bool = True,
- timeout: float = 300,
- params: dict = None,
- db_dict: dict = None
+ self,
+ cluster_uuid: str,
+ kdu_instance: str,
+ kdu_model: str = None,
+ atomic: bool = True,
+ timeout: float = 300,
+ params: dict = None,
+ db_dict: dict = None,
):
- self.debug('upgrading {} in cluster {}'.format(kdu_model, cluster_uuid))
-
- start = time.time()
- end = start + timeout
+ self.log.debug("upgrading {} in cluster {}".format(kdu_model, cluster_uuid))
# config filename
- kube_dir, helm_dir, config_filename, cluster_dir = \
- self._get_paths(cluster_name=cluster_uuid, create_if_not_exist=True)
+ _kube_dir, helm_dir, config_filename, _cluster_dir = self._get_paths(
+ cluster_name=cluster_uuid, create_if_not_exist=True
+ )
# params to str
# params_str = K8sHelmConnector._params_to_set_option(params)
- params_str, file_to_delete = self._params_to_file_option(cluster_uuid=cluster_uuid, params=params)
+ params_str, file_to_delete = self._params_to_file_option(
+ cluster_uuid=cluster_uuid, params=params
+ )
- timeout_str = ''
+ timeout_str = ""
if timeout:
- timeout_str = '--timeout {}'.format(timeout)
+ timeout_str = "--timeout {}".format(timeout)
# atomic
- atomic_str = ''
+ atomic_str = ""
if atomic:
- atomic_str = '--atomic'
+ atomic_str = "--atomic"
# version
- version_str = ''
- if kdu_model and ':' in kdu_model:
- parts = kdu_model.split(sep=':')
+ version_str = ""
+ if kdu_model and ":" in kdu_model:
+ parts = kdu_model.split(sep=":")
if len(parts) == 2:
- version_str = '--version {}'.format(parts[1])
+ version_str = "--version {}".format(parts[1])
kdu_model = parts[0]
# helm repo upgrade
- command = '{} upgrade {} --output yaml --kubeconfig={} --home={} {} {} {} {} {}'\
- .format(self._helm_command, atomic_str, config_filename, helm_dir,
- params_str, timeout_str, kdu_instance, kdu_model, version_str)
- self.debug('upgrading: {}'.format(command))
+ command = (
+ "{} upgrade {} --output yaml --kubeconfig={} " "--home={} {} {} {} {} {}"
+ ).format(
+ self._helm_command,
+ atomic_str,
+ config_filename,
+ helm_dir,
+ params_str,
+ timeout_str,
+ kdu_instance,
+ kdu_model,
+ version_str,
+ )
+ self.log.debug("upgrading: {}".format(command))
if atomic:
# exec helm in a task
exec_task = asyncio.ensure_future(
- coro_or_future=self._local_async_exec(command=command, raise_exception_on_error=False)
+ coro_or_future=self._local_async_exec(
+ command=command, raise_exception_on_error=False
+ )
)
# write status in another task
status_task = asyncio.ensure_future(
cluster_uuid=cluster_uuid,
kdu_instance=kdu_instance,
db_dict=db_dict,
- operation='upgrade',
- run_once=False
+ operation="upgrade",
+ run_once=False,
)
)
# wait for execution task
- await asyncio.wait([ exec_task ])
+ await asyncio.wait([exec_task])
# cancel status task
status_task.cancel()
else:
- output, rc = await self._local_async_exec(command=command, raise_exception_on_error=False)
+ output, rc = await self._local_async_exec(
+ command=command, raise_exception_on_error=False
+ )
# remove temporal values yaml file
if file_to_delete:
cluster_uuid=cluster_uuid,
kdu_instance=kdu_instance,
db_dict=db_dict,
- operation='upgrade',
+ operation="upgrade",
run_once=True,
- check_every=0
+ check_every=0,
)
if rc != 0:
- msg = 'Error executing command: {}\nOutput: {}'.format(command, output)
- self.error(msg)
- raise Exception(msg)
+ msg = "Error executing command: {}\nOutput: {}".format(command, output)
+ self.log.error(msg)
+ raise K8sException(msg)
# return new revision number
- instance = await self.get_instance_info(cluster_uuid=cluster_uuid, kdu_instance=kdu_instance)
+ instance = await self.get_instance_info(
+ cluster_uuid=cluster_uuid, kdu_instance=kdu_instance
+ )
if instance:
- revision = int(instance.get('Revision'))
- self.debug('New revision: {}'.format(revision))
+ revision = int(instance.get("Revision"))
+ self.log.debug("New revision: {}".format(revision))
return revision
else:
return 0
async def rollback(
- self,
- cluster_uuid: str,
- kdu_instance: str,
- revision=0,
- db_dict: dict = None
+ self, cluster_uuid: str, kdu_instance: str, revision=0, db_dict: dict = None
):
- self.debug('rollback kdu_instance {} to revision {} from cluster {}'
- .format(kdu_instance, revision, cluster_uuid))
+ self.log.debug(
+ "rollback kdu_instance {} to revision {} from cluster {}".format(
+ kdu_instance, revision, cluster_uuid
+ )
+ )
# config filename
- kube_dir, helm_dir, config_filename, cluster_dir = \
- self._get_paths(cluster_name=cluster_uuid, create_if_not_exist=True)
+ _kube_dir, helm_dir, config_filename, _cluster_dir = self._get_paths(
+ cluster_name=cluster_uuid, create_if_not_exist=True
+ )
- command = '{} rollback --kubeconfig={} --home={} {} {} --wait'\
- .format(self._helm_command, config_filename, helm_dir, kdu_instance, revision)
+ command = "{} rollback --kubeconfig={} --home={} {} {} --wait".format(
+ self._helm_command, config_filename, helm_dir, kdu_instance, revision
+ )
# exec helm in a task
exec_task = asyncio.ensure_future(
- coro_or_future=self._local_async_exec(command=command, raise_exception_on_error=False)
+ coro_or_future=self._local_async_exec(
+ command=command, raise_exception_on_error=False
+ )
)
# write status in another task
status_task = asyncio.ensure_future(
cluster_uuid=cluster_uuid,
kdu_instance=kdu_instance,
db_dict=db_dict,
- operation='rollback',
- run_once=False
+ operation="rollback",
+ run_once=False,
)
)
cluster_uuid=cluster_uuid,
kdu_instance=kdu_instance,
db_dict=db_dict,
- operation='rollback',
+ operation="rollback",
run_once=True,
- check_every=0
+ check_every=0,
)
if rc != 0:
- msg = 'Error executing command: {}\nOutput: {}'.format(command, output)
- self.error(msg)
- raise Exception(msg)
+ msg = "Error executing command: {}\nOutput: {}".format(command, output)
+ self.log.error(msg)
+ raise K8sException(msg)
# return new revision number
- instance = await self.get_instance_info(cluster_uuid=cluster_uuid, kdu_instance=kdu_instance)
+ instance = await self.get_instance_info(
+ cluster_uuid=cluster_uuid, kdu_instance=kdu_instance
+ )
if instance:
- revision = int(instance.get('Revision'))
- self.debug('New revision: {}'.format(revision))
+ revision = int(instance.get("Revision"))
+ self.log.debug("New revision: {}".format(revision))
return revision
else:
return 0
- async def uninstall(
- self,
- cluster_uuid: str,
- kdu_instance: str
- ):
+ async def uninstall(self, cluster_uuid: str, kdu_instance: str):
"""
- Removes an existing KDU instance. It would implicitly use the `delete` call (this call would happen
- after all _terminate-config-primitive_ of the VNF are invoked).
+ Removes an existing KDU instance. It would implicitly use the `delete` call
+ (this call would happen after all _terminate-config-primitive_ of the VNF
+ are invoked).
:param cluster_uuid: UUID of a K8s cluster known by OSM
:param kdu_instance: unique name for the KDU instance to be deleted
:return: True if successful
"""
- self.debug('uninstall kdu_instance {} from cluster {}'.format(kdu_instance, cluster_uuid))
+ self.log.debug(
+ "uninstall kdu_instance {} from cluster {}".format(
+ kdu_instance, cluster_uuid
+ )
+ )
# config filename
- kube_dir, helm_dir, config_filename, cluster_dir = \
- self._get_paths(cluster_name=cluster_uuid, create_if_not_exist=True)
+ _kube_dir, helm_dir, config_filename, _cluster_dir = self._get_paths(
+ cluster_name=cluster_uuid, create_if_not_exist=True
+ )
- command = '{} --kubeconfig={} --home={} delete --purge {}'\
- .format(self._helm_command, config_filename, helm_dir, kdu_instance)
+ command = "{} --kubeconfig={} --home={} delete --purge {}".format(
+ self._helm_command, config_filename, helm_dir, kdu_instance
+ )
- output, rc = await self._local_async_exec(command=command, raise_exception_on_error=True)
+ output, _rc = await self._local_async_exec(
+ command=command, raise_exception_on_error=True
+ )
return self._output_to_table(output)
- async def inspect_kdu(
- self,
- kdu_model: str
+ async def exec_primitive(
+ self,
+ cluster_uuid: str = None,
+ kdu_instance: str = None,
+ primitive_name: str = None,
+ timeout: float = 300,
+ params: dict = None,
+ db_dict: dict = None,
) -> str:
+ """Exec primitive (Juju action)
- self.debug('inspect kdu_model {}'.format(kdu_model))
+ :param cluster_uuid str: The UUID of the cluster
+ :param kdu_instance str: The unique name of the KDU instance
+ :param primitive_name: Name of action that will be executed
+ :param timeout: Timeout for action execution
+ :param params: Dictionary of all the parameters needed for the action
+ :db_dict: Dictionary for any additional data
- command = '{} inspect values {}'\
- .format(self._helm_command, kdu_model)
+ :return: Returns the output of the action
+ """
+ raise K8sException(
+ "KDUs deployed with Helm don't support actions "
+ "different from rollback, upgrade and status"
+ )
- output, rc = await self._local_async_exec(command=command)
+ async def inspect_kdu(self, kdu_model: str, repo_url: str = None) -> str:
- return output
+ self.log.debug(
+ "inspect kdu_model {} from (optional) repo: {}".format(kdu_model, repo_url)
+ )
- async def help_kdu(
- self,
- kdu_model: str
- ):
+ return await self._exec_inspect_comand(
+ inspect_command="", kdu_model=kdu_model, repo_url=repo_url
+ )
- self.debug('help kdu_model {}'.format(kdu_model))
+ async def values_kdu(self, kdu_model: str, repo_url: str = None) -> str:
- command = '{} inspect readme {}'\
- .format(self._helm_command, kdu_model)
+ self.log.debug(
+ "inspect kdu_model values {} from (optional) repo: {}".format(
+ kdu_model, repo_url
+ )
+ )
- output, rc = await self._local_async_exec(command=command, raise_exception_on_error=True)
+ return await self._exec_inspect_comand(
+ inspect_command="values", kdu_model=kdu_model, repo_url=repo_url
+ )
- return output
+ async def help_kdu(self, kdu_model: str, repo_url: str = None) -> str:
- async def status_kdu(
- self,
- cluster_uuid: str,
- kdu_instance: str
- ):
+ self.log.debug(
+ "inspect kdu_model {} readme.md from repo: {}".format(kdu_model, repo_url)
+ )
- return await self._status_kdu(cluster_uuid=cluster_uuid, kdu_instance=kdu_instance, show_error_log=True)
+ return await self._exec_inspect_comand(
+ inspect_command="readme", kdu_model=kdu_model, repo_url=repo_url
+ )
+ async def status_kdu(self, cluster_uuid: str, kdu_instance: str) -> str:
+
+ # call internal function
+ return await self._status_kdu(
+ cluster_uuid=cluster_uuid,
+ kdu_instance=kdu_instance,
+ show_error_log=True,
+ return_text=True,
+ )
+
+ async def synchronize_repos(self, cluster_uuid: str):
+
+ self.log.debug("syncronize repos for cluster helm-id: {}",)
+ try:
+ update_repos_timeout = (
+ 300 # max timeout to sync a single repos, more than this is too much
+ )
+ db_k8scluster = self.db.get_one(
+ "k8sclusters", {"_admin.helm-chart.id": cluster_uuid}
+ )
+ if db_k8scluster:
+ nbi_repo_list = (
+ db_k8scluster.get("_admin").get("helm_chart_repos") or []
+ )
+ cluster_repo_dict = (
+ db_k8scluster.get("_admin").get("helm_charts_added") or {}
+ )
+ # elements that must be deleted
+ deleted_repo_list = []
+ added_repo_dict = {}
+ self.log.debug("helm_chart_repos: {}".format(nbi_repo_list))
+ self.log.debug("helm_charts_added: {}".format(cluster_repo_dict))
+
+ # obtain repos to add: registered by nbi but not added
+ repos_to_add = [
+ repo for repo in nbi_repo_list if not cluster_repo_dict.get(repo)
+ ]
+
+ # obtain repos to delete: added by cluster but not in nbi list
+ repos_to_delete = [
+ repo
+ for repo in cluster_repo_dict.keys()
+ if repo not in nbi_repo_list
+ ]
+
+ # delete repos: must delete first then add because there may be
+ # different repos with same name but
+ # different id and url
+ self.log.debug("repos to delete: {}".format(repos_to_delete))
+ for repo_id in repos_to_delete:
+ # try to delete repos
+ try:
+ repo_delete_task = asyncio.ensure_future(
+ self.repo_remove(
+ cluster_uuid=cluster_uuid,
+ name=cluster_repo_dict[repo_id],
+ )
+ )
+ await asyncio.wait_for(repo_delete_task, update_repos_timeout)
+ except Exception as e:
+ self.warning(
+ "Error deleting repo, id: {}, name: {}, err_msg: {}".format(
+ repo_id, cluster_repo_dict[repo_id], str(e)
+ )
+ )
+ # always add to the list of to_delete if there is an error
+ # because if is not there
+ # deleting raises error
+ deleted_repo_list.append(repo_id)
+
+ # add repos
+ self.log.debug("repos to add: {}".format(repos_to_add))
+ for repo_id in repos_to_add:
+ # obtain the repo data from the db
+ # if there is an error getting the repo in the database we will
+ # ignore this repo and continue
+ # because there is a possible race condition where the repo has
+ # been deleted while processing
+ db_repo = self.db.get_one("k8srepos", {"_id": repo_id})
+ self.log.debug(
+ "obtained repo: id, {}, name: {}, url: {}".format(
+ repo_id, db_repo["name"], db_repo["url"]
+ )
+ )
+ try:
+ repo_add_task = asyncio.ensure_future(
+ self.repo_add(
+ cluster_uuid=cluster_uuid,
+ name=db_repo["name"],
+ url=db_repo["url"],
+ repo_type="chart",
+ )
+ )
+ await asyncio.wait_for(repo_add_task, update_repos_timeout)
+ added_repo_dict[repo_id] = db_repo["name"]
+ self.log.debug(
+ "added repo: id, {}, name: {}".format(
+ repo_id, db_repo["name"]
+ )
+ )
+ except Exception as e:
+ # deal with error adding repo, adding a repo that already
+ # exists does not raise any error
+ # will not raise error because a wrong repos added by
+ # anyone could prevent instantiating any ns
+ self.log.error(
+ "Error adding repo id: {}, err_msg: {} ".format(
+ repo_id, repr(e)
+ )
+ )
+
+ return deleted_repo_list, added_repo_dict
+
+ else: # else db_k8scluster does not exist
+ raise K8sException(
+ "k8cluster with helm-id : {} not found".format(cluster_uuid)
+ )
+
+ except Exception as e:
+ self.log.error("Error synchronizing repos: {}".format(str(e)))
+ raise K8sException("Error synchronizing repos")
"""
- ##################################################################################################
- ########################################## P R I V A T E #########################################
- ##################################################################################################
+ ####################################################################################
+ ################################### P R I V A T E ##################################
+ ####################################################################################
"""
+ async def _exec_inspect_comand(
+ self, inspect_command: str, kdu_model: str, repo_url: str = None
+ ):
+
+ repo_str = ""
+ if repo_url:
+ repo_str = " --repo {}".format(repo_url)
+ idx = kdu_model.find("/")
+ if idx >= 0:
+ idx += 1
+ kdu_model = kdu_model[idx:]
+
+ inspect_command = "{} inspect {} {}{}".format(
+ self._helm_command, inspect_command, kdu_model, repo_str
+ )
+ output, _rc = await self._local_async_exec(
+ command=inspect_command, encode_utf8=True
+ )
+
+ return output
+
async def _status_kdu(
- self,
- cluster_uuid: str,
- kdu_instance: str,
- show_error_log: bool = False
+ self,
+ cluster_uuid: str,
+ kdu_instance: str,
+ show_error_log: bool = False,
+ return_text: bool = False,
):
- self.debug('status of kdu_instance {}'.format(kdu_instance))
+ self.log.debug("status of kdu_instance {}".format(kdu_instance))
# config filename
- kube_dir, helm_dir, config_filename, cluster_dir = \
- self._get_paths(cluster_name=cluster_uuid, create_if_not_exist=True)
+ _kube_dir, helm_dir, config_filename, _cluster_dir = self._get_paths(
+ cluster_name=cluster_uuid, create_if_not_exist=True
+ )
- command = '{} --kubeconfig={} --home={} status {} --output yaml'\
- .format(self._helm_command, config_filename, helm_dir, kdu_instance)
+ command = "{} --kubeconfig={} --home={} status {} --output yaml".format(
+ self._helm_command, config_filename, helm_dir, kdu_instance
+ )
output, rc = await self._local_async_exec(
command=command,
raise_exception_on_error=True,
- show_error_log=show_error_log
+ show_error_log=show_error_log,
)
+ if return_text:
+ return str(output)
+
if rc != 0:
return None
# remove field 'notes'
try:
- del data.get('info').get('status')['notes']
+ del data.get("info").get("status")["notes"]
except KeyError:
pass
# parse field 'resources'
try:
- resources = str(data.get('info').get('status').get('resources'))
+ resources = str(data.get("info").get("status").get("resources"))
resource_table = self._output_to_table(resources)
- data.get('info').get('status')['resources'] = resource_table
- except Exception as e:
+ data.get("info").get("status")["resources"] = resource_table
+ except Exception:
pass
return data
- async def get_instance_info(
- self,
- cluster_uuid: str,
- kdu_instance: str
- ):
+ async def get_instance_info(self, cluster_uuid: str, kdu_instance: str):
instances = await self.instances_list(cluster_uuid=cluster_uuid)
for instance in instances:
- if instance.get('Name') == kdu_instance:
+ if instance.get("Name") == kdu_instance:
return instance
- self.debug('Instance {} not found'.format(kdu_instance))
+ self.log.debug("Instance {} not found".format(kdu_instance))
return None
@staticmethod
- def _generate_release_name(
- chart_name: str
- ):
- name = ''
+ def _generate_release_name(chart_name: str):
+ # check embedded chart (file or dir)
+ if chart_name.startswith("/"):
+ # extract file or directory name
+ chart_name = chart_name[chart_name.rfind("/") + 1 :]
+ # check URL
+ elif "://" in chart_name:
+ # extract last portion of URL
+ chart_name = chart_name[chart_name.rfind("/") + 1 :]
+
+ name = ""
for c in chart_name:
if c.isalpha() or c.isnumeric():
name += c
else:
- name += '-'
+ name += "-"
if len(name) > 35:
name = name[0:35]
# if does not start with alpha character, prefix 'a'
if not name[0].isalpha():
- name = 'a' + name
+ name = "a" + name
- name += '-'
+ name += "-"
def get_random_number():
r = random.randrange(start=1, stop=99999999)
s = str(r)
- s = s.rjust(width=10, fillchar=' ')
+ s = s.rjust(10, "0")
return s
name = name + get_random_number()
return name.lower()
async def _store_status(
- self,
- cluster_uuid: str,
- operation: str,
- kdu_instance: str,
- check_every: float = 10,
- db_dict: dict = None,
- run_once: bool = False
+ self,
+ cluster_uuid: str,
+ operation: str,
+ kdu_instance: str,
+ check_every: float = 10,
+ db_dict: dict = None,
+ run_once: bool = False,
):
while True:
try:
await asyncio.sleep(check_every)
- detailed_status = await self.status_kdu(cluster_uuid=cluster_uuid, kdu_instance=kdu_instance)
- status = detailed_status.get('info').get('Description')
- print('=' * 60)
- self.debug('STATUS:\n{}'.format(status))
- self.debug('DETAILED STATUS:\n{}'.format(detailed_status))
- print('=' * 60)
+ detailed_status = await self.status_kdu(
+ cluster_uuid=cluster_uuid, kdu_instance=kdu_instance
+ )
+ status = detailed_status.get("info").get("Description")
+ self.log.debug("STATUS:\n{}".format(status))
+ self.log.debug("DETAILED STATUS:\n{}".format(detailed_status))
# write status to db
result = await self.write_app_status_to_db(
db_dict=db_dict,
status=str(status),
detailed_status=str(detailed_status),
- operation=operation)
+ operation=operation,
+ )
if not result:
- self.info('Error writing in database. Task exiting...')
+ self.log.info("Error writing in database. Task exiting...")
return
except asyncio.CancelledError:
- self.debug('Task cancelled')
+ self.log.debug("Task cancelled")
return
except Exception as e:
+ self.log.debug("_store_status exception: {}".format(str(e)))
pass
finally:
if run_once:
return
- async def _is_install_completed(
- self,
- cluster_uuid: str,
- kdu_instance: str
- ) -> bool:
+ async def _is_install_completed(self, cluster_uuid: str, kdu_instance: str) -> bool:
- status = await self.status_kdu(cluster_uuid=cluster_uuid, kdu_instance=kdu_instance)
+ status = await self._status_kdu(
+ cluster_uuid=cluster_uuid, kdu_instance=kdu_instance, return_text=False
+ )
# extract info.status.resources-> str
# format:
# halting-horse-mongodb 0/1 1 0 0s
# halting-petit-mongodb 1/1 1 0 0s
# blank line
- resources = K8sHelmConnector._get_deep(status, ('info', 'status', 'resources'))
+ resources = K8sHelmConnector._get_deep(status, ("info", "status", "resources"))
# convert to table
resources = K8sHelmConnector._output_to_table(resources)
line1 = resources[index]
index += 1
# find '==>' in column 0
- if line1[0] == '==>':
+ if line1[0] == "==>":
line2 = resources[index]
index += 1
# find READY in column 1
- if line2[1] == 'READY':
+ if line2[1] == "READY":
# read next lines
line3 = resources[index]
index += 1
while len(line3) > 1 and index < num_lines:
ready_value = line3[1]
- parts = ready_value.split(sep='/')
+ parts = ready_value.split(sep="/")
current = int(parts[0])
total = int(parts[1])
if current < total:
- self.debug('NOT READY:\n {}'.format(line3))
+ self.log.debug("NOT READY:\n {}".format(line3))
ready = False
line3 = resources[index]
index += 1
- except Exception as e:
+ except Exception:
pass
return ready
return None
else:
target = value
- except Exception as e:
+ except Exception:
pass
return value
def _find_in_lines(p_lines: list, p_key: str) -> str:
for line in p_lines:
try:
- if line.startswith(p_key + ':'):
- parts = line.split(':')
+ if line.startswith(p_key + ":"):
+ parts = line.split(":")
the_value = parts[1].strip()
return the_value
- except Exception as e:
+ except Exception:
# ignore it
pass
return None
# params for use in -f file
# returns values file option and filename (in order to delete it at the end)
def _params_to_file_option(self, cluster_uuid: str, params: dict) -> (str, str):
- params_str = ''
if params and len(params) > 0:
- kube_dir, helm_dir, config_filename, cluster_dir = \
- self._get_paths(cluster_name=cluster_uuid, create_if_not_exist=True)
+ self._get_paths(cluster_name=cluster_uuid, create_if_not_exist=True)
def get_random_number():
r = random.randrange(start=1, stop=99999999)
s = str(r)
while len(s) < 10:
- s = '0' + s
+ s = "0" + s
return s
params2 = dict()
for key in params:
value = params.get(key)
- if '!!yaml' in str(value):
- value = yaml.load(value[7:], Loader=yaml.SafeLoader)
+ if "!!yaml" in str(value):
+ value = yaml.load(value[7:], Loader=yaml.SafeLoader)
params2[key] = value
- values_file = get_random_number() + '.yaml'
- with open(values_file, 'w') as stream:
+ values_file = get_random_number() + ".yaml"
+ with open(values_file, "w") as stream:
yaml.dump(params2, stream, indent=4, default_flow_style=False)
- return '-f {}'.format(values_file), values_file
+ return "-f {}".format(values_file), values_file
- return '', None
+ return "", None
# params for use in --set option
@staticmethod
def _params_to_set_option(params: dict) -> str:
- params_str = ''
+ params_str = ""
if params and len(params) > 0:
start = True
for key in params:
value = params.get(key, None)
if value is not None:
if start:
- params_str += '--set '
+ params_str += "--set "
start = False
else:
- params_str += ','
- params_str += '{}={}'.format(key, value)
+ params_str += ","
+ params_str += "{}={}".format(key, value)
return params_str
@staticmethod
output_table = list()
lines = output.splitlines(keepends=False)
for line in lines:
- line = line.replace('\t', ' ')
+ line = line.replace("\t", " ")
line_list = list()
output_table.append(line_list)
- cells = line.split(sep=' ')
+ cells = line.split(sep=" ")
for cell in cells:
cell = cell.strip()
if len(cell) > 0:
line_list.append(cell)
return output_table
- def _get_paths(self, cluster_name: str, create_if_not_exist: bool = False) -> (str, str, str, str):
+ def _get_paths(
+ self, cluster_name: str, create_if_not_exist: bool = False
+ ) -> (str, str, str, str):
"""
Returns kube and helm directories
base = base[:-1]
# base dir for cluster
- cluster_dir = base + '/' + cluster_name
+ cluster_dir = base + "/" + cluster_name
if create_if_not_exist and not os.path.exists(cluster_dir):
- self.debug('Creating dir {}'.format(cluster_dir))
+ self.log.debug("Creating dir {}".format(cluster_dir))
os.makedirs(cluster_dir)
if not os.path.exists(cluster_dir):
- msg = 'Base cluster dir {} does not exist'.format(cluster_dir)
- self.error(msg)
- raise Exception(msg)
+ msg = "Base cluster dir {} does not exist".format(cluster_dir)
+ self.log.error(msg)
+ raise K8sException(msg)
# kube dir
- kube_dir = cluster_dir + '/' + '.kube'
+ kube_dir = cluster_dir + "/" + ".kube"
if create_if_not_exist and not os.path.exists(kube_dir):
- self.debug('Creating dir {}'.format(kube_dir))
+ self.log.debug("Creating dir {}".format(kube_dir))
os.makedirs(kube_dir)
if not os.path.exists(kube_dir):
- msg = 'Kube config dir {} does not exist'.format(kube_dir)
- self.error(msg)
- raise Exception(msg)
+ msg = "Kube config dir {} does not exist".format(kube_dir)
+ self.log.error(msg)
+ raise K8sException(msg)
# helm home dir
- helm_dir = cluster_dir + '/' + '.helm'
+ helm_dir = cluster_dir + "/" + ".helm"
if create_if_not_exist and not os.path.exists(helm_dir):
- self.debug('Creating dir {}'.format(helm_dir))
+ self.log.debug("Creating dir {}".format(helm_dir))
os.makedirs(helm_dir)
if not os.path.exists(helm_dir):
- msg = 'Helm config dir {} does not exist'.format(helm_dir)
- self.error(msg)
- raise Exception(msg)
+ msg = "Helm config dir {} does not exist".format(helm_dir)
+ self.log.error(msg)
+ raise K8sException(msg)
- config_filename = kube_dir + '/config'
+ config_filename = kube_dir + "/config"
return kube_dir, helm_dir, config_filename, cluster_dir
@staticmethod
- def _remove_multiple_spaces(str):
- str = str.strip()
- while ' ' in str:
- str = str.replace(' ', ' ')
- return str
-
- def _local_exec(
- self,
- command: str
- ) -> (str, int):
+ def _remove_multiple_spaces(strobj):
+ strobj = strobj.strip()
+ while " " in strobj:
+ strobj = strobj.replace(" ", " ")
+ return strobj
+
+ def _local_exec(self, command: str) -> (str, int):
command = K8sHelmConnector._remove_multiple_spaces(command)
- self.debug('Executing sync local command: {}'.format(command))
+ self.log.debug("Executing sync local command: {}".format(command))
# raise exception if fails
- output = ''
+ output = ""
try:
- output = subprocess.check_output(command, shell=True, universal_newlines=True)
+ output = subprocess.check_output(
+ command, shell=True, universal_newlines=True
+ )
return_code = 0
- self.debug(output)
- except Exception as e:
+ self.log.debug(output)
+ except Exception:
return_code = 1
return output, return_code
async def _local_async_exec(
- self,
- command: str,
- raise_exception_on_error: bool = False,
- show_error_log: bool = True
+ self,
+ command: str,
+ raise_exception_on_error: bool = False,
+ show_error_log: bool = True,
+ encode_utf8: bool = False,
) -> (str, int):
command = K8sHelmConnector._remove_multiple_spaces(command)
- self.debug('Executing async local command: {}'.format(command))
+ self.log.debug("Executing async local command: {}".format(command))
# split command
- command = command.split(sep=' ')
+ command = command.split(sep=" ")
try:
process = await asyncio.create_subprocess_exec(
- *command,
- stdout=asyncio.subprocess.PIPE,
- stderr=asyncio.subprocess.PIPE
+ *command, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE
)
# wait for command terminate
return_code = process.returncode
- output = ''
+ output = ""
if stdout:
- output = stdout.decode('utf-8').strip()
+ output = stdout.decode("utf-8").strip()
+ # output = stdout.decode()
if stderr:
- output = stderr.decode('utf-8').strip()
+ output = stderr.decode("utf-8").strip()
+ # output = stderr.decode()
if return_code != 0 and show_error_log:
- self.debug('Return code (FAIL): {}\nOutput:\n{}'.format(return_code, output))
+ self.log.debug(
+ "Return code (FAIL): {}\nOutput:\n{}".format(return_code, output)
+ )
else:
- self.debug('Return code: {}'.format(return_code))
+ self.log.debug("Return code: {}".format(return_code))
if raise_exception_on_error and return_code != 0:
- raise Exception(output)
+ raise K8sException(output)
+
+ if encode_utf8:
+ output = output.encode("utf-8").strip()
+ output = str(output).replace("\\n", "\n")
return output, return_code
+ except asyncio.CancelledError:
+ raise
+ except K8sException:
+ raise
except Exception as e:
- msg = 'Exception executing command: {} -> {}'.format(command, e)
- if show_error_log:
- self.error(msg)
- return '', -1
-
- def _remote_exec(
- self,
- hostname: str,
- username: str,
- password: str,
- command: str,
- timeout: int = 10
- ) -> (str, int):
-
- command = K8sHelmConnector._remove_multiple_spaces(command)
- self.debug('Executing sync remote ssh command: {}'.format(command))
-
- ssh = paramiko.SSHClient()
- ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
- ssh.connect(hostname=hostname, username=username, password=password)
- ssh_stdin, ssh_stdout, ssh_stderr = ssh.exec_command(command=command, timeout=timeout)
- output = ssh_stdout.read().decode('utf-8')
- error = ssh_stderr.read().decode('utf-8')
- if error:
- self.error('ERROR: {}'.format(error))
- return_code = 1
- else:
- return_code = 0
- output = output.replace('\\n', '\n')
- self.debug('OUTPUT: {}'.format(output))
-
- return output, return_code
+ msg = "Exception executing command: {} -> {}".format(command, e)
+ self.log.error(msg)
+ if raise_exception_on_error:
+ raise K8sException(e) from e
+ else:
+ return "", -1
def _check_file_exists(self, filename: str, exception_if_not_exists: bool = False):
- self.debug('Checking if file {} exists...'.format(filename))
+ # self.log.debug('Checking if file {} exists...'.format(filename))
if os.path.exists(filename):
return True
else:
- msg = 'File {} does not exist'.format(filename)
+ msg = "File {} does not exist".format(filename)
if exception_if_not_exists:
- self.error(msg)
- raise Exception(msg)
-
+ # self.log.error(msg)
+ raise K8sException(msg)