Enable lint, flake8 and unit tests
Cleans up non-PEP-compliant code.
Adds a simple unit test.
Formats according to black.
Tox automatically runs lint, flake8 and unit test suite
with coverage. To run each individually, execute:
tox -e pylint
tox -e black
tox -e flake8
tox -e cover
Note that these are all run for each patch via Jenkins. The full
tox suite should be run locally before any commit to ensure it
will not fail in Jenkins.
Change-Id: I2f87abe3d5086d6d65ac33a27780c498fc7b1cd3
Signed-off-by: beierlm <mark.beierl@canonical.com>
diff --git a/n2vc/k8s_helm_conn.py b/n2vc/k8s_helm_conn.py
index d3fbed6..fdfc443 100644
--- a/n2vc/k8s_helm_conn.py
+++ b/n2vc/k8s_helm_conn.py
@@ -20,34 +20,35 @@
# contact with: nfvlabs@tid.es
##
-import subprocess
-import os
-import shutil
import asyncio
-import time
-import yaml
-from uuid import uuid4
+import os
import random
-from n2vc.k8s_conn import K8sConnector
+import shutil
+import subprocess
+import time
+from uuid import uuid4
+
from n2vc.exceptions import K8sException
+from n2vc.k8s_conn import K8sConnector
+import yaml
class K8sHelmConnector(K8sConnector):
"""
- ##################################################################################################
- ########################################## P U B L I C ###########################################
- ##################################################################################################
+ ####################################################################################
+ ################################### P U B L I C ####################################
+ ####################################################################################
"""
def __init__(
- self,
- fs: object,
- db: object,
- kubectl_command: str = '/usr/bin/kubectl',
- helm_command: str = '/usr/bin/helm',
- log: object = None,
- on_update_db=None
+ self,
+ fs: object,
+ db: object,
+ kubectl_command: str = "/usr/bin/kubectl",
+ helm_command: str = "/usr/bin/helm",
+ log: object = None,
+ on_update_db=None,
):
"""
@@ -60,14 +61,9 @@
"""
# parent class
- K8sConnector.__init__(
- self,
- db=db,
- log=log,
- on_update_db=on_update_db
- )
+ K8sConnector.__init__(self, db=db, log=log, on_update_db=on_update_db)
- self.log.info('Initializing K8S Helm connector')
+ self.log.info("Initializing K8S Helm connector")
# random numbers for release name generation
random.seed(time.time())
@@ -84,32 +80,37 @@
self._check_file_exists(filename=helm_command, exception_if_not_exists=True)
# initialize helm client-only
- self.log.debug('Initializing helm client-only...')
- command = '{} init --client-only'.format(self._helm_command)
+ self.log.debug("Initializing helm client-only...")
+ command = "{} init --client-only".format(self._helm_command)
try:
- asyncio.ensure_future(self._local_async_exec(command=command, raise_exception_on_error=False))
+ asyncio.ensure_future(
+ self._local_async_exec(command=command, raise_exception_on_error=False)
+ )
# loop = asyncio.get_event_loop()
- # loop.run_until_complete(self._local_async_exec(command=command, raise_exception_on_error=False))
+ # loop.run_until_complete(self._local_async_exec(command=command,
+ # raise_exception_on_error=False))
except Exception as e:
- self.warning(msg='helm init failed (it was already initialized): {}'.format(e))
+ self.warning(
+ msg="helm init failed (it was already initialized): {}".format(e)
+ )
- self.log.info('K8S Helm connector initialized')
+ self.log.info("K8S Helm connector initialized")
async def init_env(
- self,
- k8s_creds: str,
- namespace: str = 'kube-system',
- reuse_cluster_uuid=None
+ self, k8s_creds: str, namespace: str = "kube-system", reuse_cluster_uuid=None
) -> (str, bool):
"""
It prepares a given K8s cluster environment to run Charts on both sides:
client (OSM)
server (Tiller)
- :param k8s_creds: credentials to access a given K8s cluster, i.e. a valid '.kube/config'
- :param namespace: optional namespace to be used for helm. By default, 'kube-system' will be used
+ :param k8s_creds: credentials to access a given K8s cluster, i.e. a valid
+ '.kube/config'
+ :param namespace: optional namespace to be used for helm. By default,
+ 'kube-system' will be used
:param reuse_cluster_uuid: existing cluster uuid for reuse
- :return: uuid of the K8s cluster and True if connector has installed some software in the cluster
+ :return: uuid of the K8s cluster and True if connector has installed some
+ software in the cluster
(on error, an exception will be raised)
"""
@@ -117,19 +118,23 @@
if not cluster_uuid:
cluster_uuid = str(uuid4())
- self.log.debug('Initializing K8S environment. namespace: {}'.format(namespace))
+ self.log.debug("Initializing K8S environment. namespace: {}".format(namespace))
# create config filename
- kube_dir, helm_dir, config_filename, cluster_dir = \
- self._get_paths(cluster_name=cluster_uuid, create_if_not_exist=True)
+ _kube_dir, helm_dir, config_filename, _cluster_dir = self._get_paths(
+ cluster_name=cluster_uuid, create_if_not_exist=True
+ )
f = open(config_filename, "w")
f.write(k8s_creds)
f.close()
# check if tiller pod is up in cluster
- command = '{} --kubeconfig={} --namespace={} get deployments'\
- .format(self.kubectl_command, config_filename, namespace)
- output, rc = await self._local_async_exec(command=command, raise_exception_on_error=True)
+ command = "{} --kubeconfig={} --namespace={} get deployments".format(
+ self.kubectl_command, config_filename, namespace
+ )
+ output, _rc = await self._local_async_exec(
+ command=command, raise_exception_on_error=True
+ )
output_table = K8sHelmConnector._output_to_table(output=output)
@@ -137,90 +142,98 @@
already_initialized = False
try:
for row in output_table:
- if row[0].startswith('tiller-deploy'):
+ if row[0].startswith("tiller-deploy"):
already_initialized = True
break
- except Exception as e:
+ except Exception:
pass
# helm init
n2vc_installed_sw = False
if not already_initialized:
- self.log.info('Initializing helm in client and server: {}'.format(cluster_uuid))
- command = '{} --kubeconfig={} --tiller-namespace={} --home={} init'\
- .format(self._helm_command, config_filename, namespace, helm_dir)
- output, rc = await self._local_async_exec(command=command, raise_exception_on_error=True)
+ self.log.info(
+ "Initializing helm in client and server: {}".format(cluster_uuid)
+ )
+ command = "{} --kubeconfig={} --tiller-namespace={} --home={} init".format(
+ self._helm_command, config_filename, namespace, helm_dir
+ )
+ output, _rc = await self._local_async_exec(
+ command=command, raise_exception_on_error=True
+ )
n2vc_installed_sw = True
else:
# check client helm installation
- check_file = helm_dir + '/repository/repositories.yaml'
- if not self._check_file_exists(filename=check_file, exception_if_not_exists=False):
- self.log.info('Initializing helm in client: {}'.format(cluster_uuid))
- command = '{} --kubeconfig={} --tiller-namespace={} --home={} init --client-only'\
- .format(self._helm_command, config_filename, namespace, helm_dir)
- output, rc = await self._local_async_exec(command=command, raise_exception_on_error=True)
+ check_file = helm_dir + "/repository/repositories.yaml"
+ if not self._check_file_exists(
+ filename=check_file, exception_if_not_exists=False
+ ):
+ self.log.info("Initializing helm in client: {}".format(cluster_uuid))
+ command = (
+ "{} --kubeconfig={} --tiller-namespace={} "
+ "--home={} init --client-only"
+ ).format(self._helm_command, config_filename, namespace, helm_dir)
+ output, _rc = await self._local_async_exec(
+ command=command, raise_exception_on_error=True
+ )
else:
- self.log.info('Helm client already initialized')
+ self.log.info("Helm client already initialized")
- self.log.info('Cluster initialized {}'.format(cluster_uuid))
+ self.log.info("Cluster initialized {}".format(cluster_uuid))
return cluster_uuid, n2vc_installed_sw
async def repo_add(
- self,
- cluster_uuid: str,
- name: str,
- url: str,
- repo_type: str = 'chart'
+ self, cluster_uuid: str, name: str, url: str, repo_type: str = "chart"
):
- self.log.debug('adding {} repository {}. URL: {}'.format(repo_type, name, url))
+ self.log.debug("adding {} repository {}. URL: {}".format(repo_type, name, url))
# config filename
- kube_dir, helm_dir, config_filename, cluster_dir = \
- self._get_paths(cluster_name=cluster_uuid, create_if_not_exist=True)
+ _kube_dir, helm_dir, config_filename, _cluster_dir = self._get_paths(
+ cluster_name=cluster_uuid, create_if_not_exist=True
+ )
# helm repo update
- command = '{} --kubeconfig={} --home={} repo update'.format(self._helm_command, config_filename, helm_dir)
- self.log.debug('updating repo: {}'.format(command))
+ command = "{} --kubeconfig={} --home={} repo update".format(
+ self._helm_command, config_filename, helm_dir
+ )
+ self.log.debug("updating repo: {}".format(command))
await self._local_async_exec(command=command, raise_exception_on_error=False)
# helm repo add name url
- command = '{} --kubeconfig={} --home={} repo add {} {}'\
- .format(self._helm_command, config_filename, helm_dir, name, url)
- self.log.debug('adding repo: {}'.format(command))
+ command = "{} --kubeconfig={} --home={} repo add {} {}".format(
+ self._helm_command, config_filename, helm_dir, name, url
+ )
+ self.log.debug("adding repo: {}".format(command))
await self._local_async_exec(command=command, raise_exception_on_error=True)
- async def repo_list(
- self,
- cluster_uuid: str
- ) -> list:
+ async def repo_list(self, cluster_uuid: str) -> list:
"""
Get the list of registered repositories
:return: list of registered repositories: [ (name, url) .... ]
"""
- self.log.debug('list repositories for cluster {}'.format(cluster_uuid))
+ self.log.debug("list repositories for cluster {}".format(cluster_uuid))
# config filename
- kube_dir, helm_dir, config_filename, cluster_dir = \
- self._get_paths(cluster_name=cluster_uuid, create_if_not_exist=True)
+ _kube_dir, helm_dir, config_filename, _cluster_dir = self._get_paths(
+ cluster_name=cluster_uuid, create_if_not_exist=True
+ )
- command = '{} --kubeconfig={} --home={} repo list --output yaml'\
- .format(self._helm_command, config_filename, helm_dir)
+ command = "{} --kubeconfig={} --home={} repo list --output yaml".format(
+ self._helm_command, config_filename, helm_dir
+ )
- output, rc = await self._local_async_exec(command=command, raise_exception_on_error=True)
+ output, _rc = await self._local_async_exec(
+ command=command, raise_exception_on_error=True
+ )
if output and len(output) > 0:
return yaml.load(output, Loader=yaml.SafeLoader)
else:
return []
- async def repo_remove(
- self,
- cluster_uuid: str,
- name: str
- ):
+ async def repo_remove(self, cluster_uuid: str, name: str):
"""
Remove a repository from OSM
@@ -229,29 +242,31 @@
:return: True if successful
"""
- self.log.debug('list repositories for cluster {}'.format(cluster_uuid))
+ self.log.debug("list repositories for cluster {}".format(cluster_uuid))
# config filename
- kube_dir, helm_dir, config_filename, cluster_dir = \
- self._get_paths(cluster_name=cluster_uuid, create_if_not_exist=True)
+ _kube_dir, helm_dir, config_filename, _cluster_dir = self._get_paths(
+ cluster_name=cluster_uuid, create_if_not_exist=True
+ )
- command = '{} --kubeconfig={} --home={} repo remove {}'\
- .format(self._helm_command, config_filename, helm_dir, name)
+ command = "{} --kubeconfig={} --home={} repo remove {}".format(
+ self._helm_command, config_filename, helm_dir, name
+ )
await self._local_async_exec(command=command, raise_exception_on_error=True)
async def reset(
- self,
- cluster_uuid: str,
- force: bool = False,
- uninstall_sw: bool = False
+ self, cluster_uuid: str, force: bool = False, uninstall_sw: bool = False
) -> bool:
- self.log.debug('Resetting K8s environment. cluster uuid: {}'.format(cluster_uuid))
+ self.log.debug(
+ "Resetting K8s environment. cluster uuid: {}".format(cluster_uuid)
+ )
# get kube and helm directories
- kube_dir, helm_dir, config_filename, cluster_dir = \
- self._get_paths(cluster_name=cluster_uuid, create_if_not_exist=False)
+ _kube_dir, helm_dir, config_filename, _cluster_dir = self._get_paths(
+ cluster_name=cluster_uuid, create_if_not_exist=False
+ )
# uninstall releases if needed
releases = await self.instances_list(cluster_uuid=cluster_uuid)
@@ -259,107 +274,134 @@
if force:
for r in releases:
try:
- kdu_instance = r.get('Name')
- chart = r.get('Chart')
- self.log.debug('Uninstalling {} -> {}'.format(chart, kdu_instance))
- await self.uninstall(cluster_uuid=cluster_uuid, kdu_instance=kdu_instance)
+ kdu_instance = r.get("Name")
+ chart = r.get("Chart")
+ self.log.debug(
+ "Uninstalling {} -> {}".format(chart, kdu_instance)
+ )
+ await self.uninstall(
+ cluster_uuid=cluster_uuid, kdu_instance=kdu_instance
+ )
except Exception as e:
- self.log.error('Error uninstalling release {}: {}'.format(kdu_instance, e))
+ self.log.error(
+ "Error uninstalling release {}: {}".format(kdu_instance, e)
+ )
else:
- msg = 'Cluster has releases and not force. Cannot reset K8s environment. Cluster uuid: {}'\
- .format(cluster_uuid)
+ msg = (
+ "Cluster has releases and not force. Cannot reset K8s "
+ "environment. Cluster uuid: {}"
+ ).format(cluster_uuid)
self.log.error(msg)
raise K8sException(msg)
if uninstall_sw:
- self.log.debug('Uninstalling tiller from cluster {}'.format(cluster_uuid))
+ self.log.debug("Uninstalling tiller from cluster {}".format(cluster_uuid))
# find namespace for tiller pod
- command = '{} --kubeconfig={} get deployments --all-namespaces'\
- .format(self.kubectl_command, config_filename)
- output, rc = await self._local_async_exec(command=command, raise_exception_on_error=False)
+ command = "{} --kubeconfig={} get deployments --all-namespaces".format(
+ self.kubectl_command, config_filename
+ )
+ output, _rc = await self._local_async_exec(
+ command=command, raise_exception_on_error=False
+ )
output_table = K8sHelmConnector._output_to_table(output=output)
namespace = None
for r in output_table:
try:
- if 'tiller-deploy' in r[1]:
+ if "tiller-deploy" in r[1]:
namespace = r[0]
break
- except Exception as e:
+ except Exception:
pass
else:
- msg = 'Tiller deployment not found in cluster {}'.format(cluster_uuid)
+ msg = "Tiller deployment not found in cluster {}".format(cluster_uuid)
self.log.error(msg)
- self.log.debug('namespace for tiller: {}'.format(namespace))
+ self.log.debug("namespace for tiller: {}".format(namespace))
- force_str = '--force'
+ force_str = "--force"
if namespace:
# delete tiller deployment
- self.log.debug('Deleting tiller deployment for cluster {}, namespace {}'.format(cluster_uuid, namespace))
- command = '{} --namespace {} --kubeconfig={} {} delete deployment tiller-deploy'\
- .format(self.kubectl_command, namespace, config_filename, force_str)
- await self._local_async_exec(command=command, raise_exception_on_error=False)
+ self.log.debug(
+ "Deleting tiller deployment for cluster {}, namespace {}".format(
+ cluster_uuid, namespace
+ )
+ )
+ command = (
+ "{} --namespace {} --kubeconfig={} {} delete deployment "
+ "tiller-deploy"
+ ).format(self.kubectl_command, namespace, config_filename, force_str)
+ await self._local_async_exec(
+ command=command, raise_exception_on_error=False
+ )
# uninstall tiller from cluster
- self.log.debug('Uninstalling tiller from cluster {}'.format(cluster_uuid))
- command = '{} --kubeconfig={} --home={} reset'\
- .format(self._helm_command, config_filename, helm_dir)
- self.log.debug('resetting: {}'.format(command))
- output, rc = await self._local_async_exec(command=command, raise_exception_on_error=True)
+ self.log.debug(
+ "Uninstalling tiller from cluster {}".format(cluster_uuid)
+ )
+ command = "{} --kubeconfig={} --home={} reset".format(
+ self._helm_command, config_filename, helm_dir
+ )
+ self.log.debug("resetting: {}".format(command))
+ output, _rc = await self._local_async_exec(
+ command=command, raise_exception_on_error=True
+ )
else:
- self.log.debug('namespace not found')
+ self.log.debug("namespace not found")
# delete cluster directory
- dir = self.fs.path + '/' + cluster_uuid
- self.log.debug('Removing directory {}'.format(dir))
- shutil.rmtree(dir, ignore_errors=True)
+ direct = self.fs.path + "/" + cluster_uuid
+ self.log.debug("Removing directory {}".format(direct))
+ shutil.rmtree(direct, ignore_errors=True)
return True
async def install(
- self,
- cluster_uuid: str,
- kdu_model: str,
- atomic: bool = True,
- timeout: float = 300,
- params: dict = None,
- db_dict: dict = None,
- kdu_name: str = None,
- namespace: str = None
+ self,
+ cluster_uuid: str,
+ kdu_model: str,
+ atomic: bool = True,
+ timeout: float = 300,
+ params: dict = None,
+ db_dict: dict = None,
+ kdu_name: str = None,
+ namespace: str = None,
):
- self.log.debug('installing {} in cluster {}'.format(kdu_model, cluster_uuid))
+ self.log.debug("installing {} in cluster {}".format(kdu_model, cluster_uuid))
# config filename
- kube_dir, helm_dir, config_filename, cluster_dir = \
- self._get_paths(cluster_name=cluster_uuid, create_if_not_exist=True)
+ _kube_dir, helm_dir, config_filename, _cluster_dir = self._get_paths(
+ cluster_name=cluster_uuid, create_if_not_exist=True
+ )
# params to str
# params_str = K8sHelmConnector._params_to_set_option(params)
- params_str, file_to_delete = self._params_to_file_option(cluster_uuid=cluster_uuid, params=params)
+ params_str, file_to_delete = self._params_to_file_option(
+ cluster_uuid=cluster_uuid, params=params
+ )
- timeout_str = ''
+ timeout_str = ""
if timeout:
- timeout_str = '--timeout {}'.format(timeout)
+ timeout_str = "--timeout {}".format(timeout)
# atomic
- atomic_str = ''
+ atomic_str = ""
if atomic:
- atomic_str = '--atomic'
+ atomic_str = "--atomic"
# namespace
- namespace_str = ''
+ namespace_str = ""
if namespace:
namespace_str = "--namespace {}".format(namespace)
# version
- version_str = ''
- if ':' in kdu_model:
- parts = kdu_model.split(sep=':')
+ version_str = ""
+ if ":" in kdu_model:
+ parts = kdu_model.split(sep=":")
if len(parts) == 2:
- version_str = '--version {}'.format(parts[1])
+ version_str = "--version {}".format(parts[1])
kdu_model = parts[0]
# generate a name for the release. Then, check if already exists
@@ -370,7 +412,7 @@
result = await self._status_kdu(
cluster_uuid=cluster_uuid,
kdu_instance=kdu_instance,
- show_error_log=False
+ show_error_log=False,
)
if result is not None:
# instance already exists: generate a new one
@@ -379,17 +421,29 @@
pass
# helm repo install
- command = '{helm} install {atomic} --output yaml --kubeconfig={config} --home={dir} {params} {timeout} ' \
- '--name={name} {ns} {model} {ver}'.format(helm=self._helm_command, atomic=atomic_str,
- config=config_filename, dir=helm_dir, params=params_str,
- timeout=timeout_str, name=kdu_instance, ns=namespace_str,
- model=kdu_model, ver=version_str)
- self.log.debug('installing: {}'.format(command))
+ command = (
+ "{helm} install {atomic} --output yaml --kubeconfig={config} --home={dir} "
+ "{params} {timeout} --name={name} {ns} {model} {ver}".format(
+ helm=self._helm_command,
+ atomic=atomic_str,
+ config=config_filename,
+ dir=helm_dir,
+ params=params_str,
+ timeout=timeout_str,
+ name=kdu_instance,
+ ns=namespace_str,
+ model=kdu_model,
+ ver=version_str,
+ )
+ )
+ self.log.debug("installing: {}".format(command))
if atomic:
# exec helm in a task
exec_task = asyncio.ensure_future(
- coro_or_future=self._local_async_exec(command=command, raise_exception_on_error=False)
+ coro_or_future=self._local_async_exec(
+ command=command, raise_exception_on_error=False
+ )
)
# write status in another task
@@ -398,8 +452,8 @@
cluster_uuid=cluster_uuid,
kdu_instance=kdu_instance,
db_dict=db_dict,
- operation='install',
- run_once=False
+ operation="install",
+ run_once=False,
)
)
@@ -413,7 +467,9 @@
else:
- output, rc = await self._local_async_exec(command=command, raise_exception_on_error=False)
+ output, rc = await self._local_async_exec(
+ command=command, raise_exception_on_error=False
+ )
# remove temporal values yaml file
if file_to_delete:
@@ -424,23 +480,20 @@
cluster_uuid=cluster_uuid,
kdu_instance=kdu_instance,
db_dict=db_dict,
- operation='install',
+ operation="install",
run_once=True,
- check_every=0
+ check_every=0,
)
if rc != 0:
- msg = 'Error executing command: {}\nOutput: {}'.format(command, output)
+ msg = "Error executing command: {}\nOutput: {}".format(command, output)
self.log.error(msg)
raise K8sException(msg)
- self.log.debug('Returning kdu_instance {}'.format(kdu_instance))
+ self.log.debug("Returning kdu_instance {}".format(kdu_instance))
return kdu_instance
- async def instances_list(
- self,
- cluster_uuid: str
- ) -> list:
+ async def instances_list(self, cluster_uuid: str) -> list:
"""
returns a list of deployed releases in a cluster
@@ -448,71 +501,90 @@
:return:
"""
- self.log.debug('list releases for cluster {}'.format(cluster_uuid))
+ self.log.debug("list releases for cluster {}".format(cluster_uuid))
# config filename
- kube_dir, helm_dir, config_filename, cluster_dir = \
- self._get_paths(cluster_name=cluster_uuid, create_if_not_exist=True)
+ _kube_dir, helm_dir, config_filename, _cluster_dir = self._get_paths(
+ cluster_name=cluster_uuid, create_if_not_exist=True
+ )
- command = '{} --kubeconfig={} --home={} list --output yaml'\
- .format(self._helm_command, config_filename, helm_dir)
+ command = "{} --kubeconfig={} --home={} list --output yaml".format(
+ self._helm_command, config_filename, helm_dir
+ )
- output, rc = await self._local_async_exec(command=command, raise_exception_on_error=True)
+ output, _rc = await self._local_async_exec(
+ command=command, raise_exception_on_error=True
+ )
if output and len(output) > 0:
- return yaml.load(output, Loader=yaml.SafeLoader).get('Releases')
+ return yaml.load(output, Loader=yaml.SafeLoader).get("Releases")
else:
return []
async def upgrade(
- self,
- cluster_uuid: str,
- kdu_instance: str,
- kdu_model: str = None,
- atomic: bool = True,
- timeout: float = 300,
- params: dict = None,
- db_dict: dict = None
+ self,
+ cluster_uuid: str,
+ kdu_instance: str,
+ kdu_model: str = None,
+ atomic: bool = True,
+ timeout: float = 300,
+ params: dict = None,
+ db_dict: dict = None,
):
- self.log.debug('upgrading {} in cluster {}'.format(kdu_model, cluster_uuid))
+ self.log.debug("upgrading {} in cluster {}".format(kdu_model, cluster_uuid))
# config filename
- kube_dir, helm_dir, config_filename, cluster_dir = \
- self._get_paths(cluster_name=cluster_uuid, create_if_not_exist=True)
+ _kube_dir, helm_dir, config_filename, _cluster_dir = self._get_paths(
+ cluster_name=cluster_uuid, create_if_not_exist=True
+ )
# params to str
# params_str = K8sHelmConnector._params_to_set_option(params)
- params_str, file_to_delete = self._params_to_file_option(cluster_uuid=cluster_uuid, params=params)
+ params_str, file_to_delete = self._params_to_file_option(
+ cluster_uuid=cluster_uuid, params=params
+ )
- timeout_str = ''
+ timeout_str = ""
if timeout:
- timeout_str = '--timeout {}'.format(timeout)
+ timeout_str = "--timeout {}".format(timeout)
# atomic
- atomic_str = ''
+ atomic_str = ""
if atomic:
- atomic_str = '--atomic'
+ atomic_str = "--atomic"
# version
- version_str = ''
- if kdu_model and ':' in kdu_model:
- parts = kdu_model.split(sep=':')
+ version_str = ""
+ if kdu_model and ":" in kdu_model:
+ parts = kdu_model.split(sep=":")
if len(parts) == 2:
- version_str = '--version {}'.format(parts[1])
+ version_str = "--version {}".format(parts[1])
kdu_model = parts[0]
# helm repo upgrade
- command = '{} upgrade {} --output yaml --kubeconfig={} --home={} {} {} {} {} {}'\
- .format(self._helm_command, atomic_str, config_filename, helm_dir,
- params_str, timeout_str, kdu_instance, kdu_model, version_str)
- self.log.debug('upgrading: {}'.format(command))
+ command = (
+ "{} upgrade {} --output yaml --kubeconfig={} " "--home={} {} {} {} {} {}"
+ ).format(
+ self._helm_command,
+ atomic_str,
+ config_filename,
+ helm_dir,
+ params_str,
+ timeout_str,
+ kdu_instance,
+ kdu_model,
+ version_str,
+ )
+ self.log.debug("upgrading: {}".format(command))
if atomic:
# exec helm in a task
exec_task = asyncio.ensure_future(
- coro_or_future=self._local_async_exec(command=command, raise_exception_on_error=False)
+ coro_or_future=self._local_async_exec(
+ command=command, raise_exception_on_error=False
+ )
)
# write status in another task
status_task = asyncio.ensure_future(
@@ -520,8 +592,8 @@
cluster_uuid=cluster_uuid,
kdu_instance=kdu_instance,
db_dict=db_dict,
- operation='upgrade',
- run_once=False
+ operation="upgrade",
+ run_once=False,
)
)
@@ -534,7 +606,9 @@
else:
- output, rc = await self._local_async_exec(command=command, raise_exception_on_error=False)
+ output, rc = await self._local_async_exec(
+ command=command, raise_exception_on_error=False
+ )
# remove temporal values yaml file
if file_to_delete:
@@ -545,46 +619,51 @@
cluster_uuid=cluster_uuid,
kdu_instance=kdu_instance,
db_dict=db_dict,
- operation='upgrade',
+ operation="upgrade",
run_once=True,
- check_every=0
+ check_every=0,
)
if rc != 0:
- msg = 'Error executing command: {}\nOutput: {}'.format(command, output)
+ msg = "Error executing command: {}\nOutput: {}".format(command, output)
self.log.error(msg)
raise K8sException(msg)
# return new revision number
- instance = await self.get_instance_info(cluster_uuid=cluster_uuid, kdu_instance=kdu_instance)
+ instance = await self.get_instance_info(
+ cluster_uuid=cluster_uuid, kdu_instance=kdu_instance
+ )
if instance:
- revision = int(instance.get('Revision'))
- self.log.debug('New revision: {}'.format(revision))
+ revision = int(instance.get("Revision"))
+ self.log.debug("New revision: {}".format(revision))
return revision
else:
return 0
async def rollback(
- self,
- cluster_uuid: str,
- kdu_instance: str,
- revision=0,
- db_dict: dict = None
+ self, cluster_uuid: str, kdu_instance: str, revision=0, db_dict: dict = None
):
- self.log.debug('rollback kdu_instance {} to revision {} from cluster {}'
- .format(kdu_instance, revision, cluster_uuid))
+ self.log.debug(
+ "rollback kdu_instance {} to revision {} from cluster {}".format(
+ kdu_instance, revision, cluster_uuid
+ )
+ )
# config filename
- kube_dir, helm_dir, config_filename, cluster_dir = \
- self._get_paths(cluster_name=cluster_uuid, create_if_not_exist=True)
+ _kube_dir, helm_dir, config_filename, _cluster_dir = self._get_paths(
+ cluster_name=cluster_uuid, create_if_not_exist=True
+ )
- command = '{} rollback --kubeconfig={} --home={} {} {} --wait'\
- .format(self._helm_command, config_filename, helm_dir, kdu_instance, revision)
+ command = "{} rollback --kubeconfig={} --home={} {} {} --wait".format(
+ self._helm_command, config_filename, helm_dir, kdu_instance, revision
+ )
# exec helm in a task
exec_task = asyncio.ensure_future(
- coro_or_future=self._local_async_exec(command=command, raise_exception_on_error=False)
+ coro_or_future=self._local_async_exec(
+ command=command, raise_exception_on_error=False
+ )
)
# write status in another task
status_task = asyncio.ensure_future(
@@ -592,8 +671,8 @@
cluster_uuid=cluster_uuid,
kdu_instance=kdu_instance,
db_dict=db_dict,
- operation='rollback',
- run_once=False
+ operation="rollback",
+ run_once=False,
)
)
@@ -610,49 +689,56 @@
cluster_uuid=cluster_uuid,
kdu_instance=kdu_instance,
db_dict=db_dict,
- operation='rollback',
+ operation="rollback",
run_once=True,
- check_every=0
+ check_every=0,
)
if rc != 0:
- msg = 'Error executing command: {}\nOutput: {}'.format(command, output)
+ msg = "Error executing command: {}\nOutput: {}".format(command, output)
self.log.error(msg)
raise K8sException(msg)
# return new revision number
- instance = await self.get_instance_info(cluster_uuid=cluster_uuid, kdu_instance=kdu_instance)
+ instance = await self.get_instance_info(
+ cluster_uuid=cluster_uuid, kdu_instance=kdu_instance
+ )
if instance:
- revision = int(instance.get('Revision'))
- self.log.debug('New revision: {}'.format(revision))
+ revision = int(instance.get("Revision"))
+ self.log.debug("New revision: {}".format(revision))
return revision
else:
return 0
- async def uninstall(
- self,
- cluster_uuid: str,
- kdu_instance: str
- ):
+ async def uninstall(self, cluster_uuid: str, kdu_instance: str):
"""
- Removes an existing KDU instance. It would implicitly use the `delete` call (this call would happen
- after all _terminate-config-primitive_ of the VNF are invoked).
+ Removes an existing KDU instance. It would implicitly use the `delete` call
+ (this call would happen after all _terminate-config-primitive_ of the VNF
+ are invoked).
:param cluster_uuid: UUID of a K8s cluster known by OSM
:param kdu_instance: unique name for the KDU instance to be deleted
:return: True if successful
"""
- self.log.debug('uninstall kdu_instance {} from cluster {}'.format(kdu_instance, cluster_uuid))
+ self.log.debug(
+ "uninstall kdu_instance {} from cluster {}".format(
+ kdu_instance, cluster_uuid
+ )
+ )
# config filename
- kube_dir, helm_dir, config_filename, cluster_dir = \
- self._get_paths(cluster_name=cluster_uuid, create_if_not_exist=True)
+ _kube_dir, helm_dir, config_filename, _cluster_dir = self._get_paths(
+ cluster_name=cluster_uuid, create_if_not_exist=True
+ )
- command = '{} --kubeconfig={} --home={} delete --purge {}'\
- .format(self._helm_command, config_filename, helm_dir, kdu_instance)
+ command = "{} --kubeconfig={} --home={} delete --purge {}".format(
+ self._helm_command, config_filename, helm_dir, kdu_instance
+ )
- output, rc = await self._local_async_exec(command=command, raise_exception_on_error=True)
+ output, _rc = await self._local_async_exec(
+ command=command, raise_exception_on_error=True
+ )
return self._output_to_table(output)
@@ -676,62 +762,70 @@
:return: Returns the output of the action
"""
- raise K8sException("KDUs deployed with Helm don't support actions "
- "different from rollback, upgrade and status")
+ raise K8sException(
+ "KDUs deployed with Helm don't support actions "
+ "different from rollback, upgrade and status"
+ )
- async def inspect_kdu(
- self,
- kdu_model: str,
- repo_url: str = None
- ) -> str:
+ async def inspect_kdu(self, kdu_model: str, repo_url: str = None) -> str:
- self.log.debug('inspect kdu_model {} from (optional) repo: {}'.format(kdu_model, repo_url))
+ self.log.debug(
+ "inspect kdu_model {} from (optional) repo: {}".format(kdu_model, repo_url)
+ )
- return await self._exec_inspect_comand(inspect_command='', kdu_model=kdu_model, repo_url=repo_url)
+ return await self._exec_inspect_comand(
+ inspect_command="", kdu_model=kdu_model, repo_url=repo_url
+ )
- async def values_kdu(
- self,
- kdu_model: str,
- repo_url: str = None
- ) -> str:
+ async def values_kdu(self, kdu_model: str, repo_url: str = None) -> str:
- self.log.debug('inspect kdu_model values {} from (optional) repo: {}'.format(kdu_model, repo_url))
+ self.log.debug(
+ "inspect kdu_model values {} from (optional) repo: {}".format(
+ kdu_model, repo_url
+ )
+ )
- return await self._exec_inspect_comand(inspect_command='values', kdu_model=kdu_model, repo_url=repo_url)
+ return await self._exec_inspect_comand(
+ inspect_command="values", kdu_model=kdu_model, repo_url=repo_url
+ )
- async def help_kdu(
- self,
- kdu_model: str,
- repo_url: str = None
- ) -> str:
+ async def help_kdu(self, kdu_model: str, repo_url: str = None) -> str:
- self.log.debug('inspect kdu_model {} readme.md from repo: {}'.format(kdu_model, repo_url))
+ self.log.debug(
+ "inspect kdu_model {} readme.md from repo: {}".format(kdu_model, repo_url)
+ )
- return await self._exec_inspect_comand(inspect_command='readme', kdu_model=kdu_model, repo_url=repo_url)
+ return await self._exec_inspect_comand(
+ inspect_command="readme", kdu_model=kdu_model, repo_url=repo_url
+ )
- async def status_kdu(
- self,
- cluster_uuid: str,
- kdu_instance: str
- ) -> str:
+ async def status_kdu(self, cluster_uuid: str, kdu_instance: str) -> str:
# call internal function
return await self._status_kdu(
cluster_uuid=cluster_uuid,
kdu_instance=kdu_instance,
show_error_log=True,
- return_text=True
+ return_text=True,
)
async def synchronize_repos(self, cluster_uuid: str):
self.log.debug("syncronize repos for cluster helm-id: {}",)
try:
- update_repos_timeout = 300 # max timeout to sync a single repos, more than this is too much
- db_k8scluster = self.db.get_one("k8sclusters", {"_admin.helm-chart.id": cluster_uuid})
+ update_repos_timeout = (
+ 300 # max timeout to sync a single repo, more than this is too much
+ )
+ db_k8scluster = self.db.get_one(
+ "k8sclusters", {"_admin.helm-chart.id": cluster_uuid}
+ )
if db_k8scluster:
- nbi_repo_list = db_k8scluster.get("_admin").get("helm_chart_repos") or []
- cluster_repo_dict = db_k8scluster.get("_admin").get("helm_charts_added") or {}
+ nbi_repo_list = (
+ db_k8scluster.get("_admin").get("helm_chart_repos") or []
+ )
+ cluster_repo_dict = (
+ db_k8scluster.get("_admin").get("helm_charts_added") or {}
+ )
# elements that must be deleted
deleted_repo_list = []
added_repo_dict = {}
@@ -739,103 +833,144 @@
self.log.debug("helm_charts_added: {}".format(cluster_repo_dict))
# obtain repos to add: registered by nbi but not added
- repos_to_add = [repo for repo in nbi_repo_list if not cluster_repo_dict.get(repo)]
+ repos_to_add = [
+ repo for repo in nbi_repo_list if not cluster_repo_dict.get(repo)
+ ]
# obtain repos to delete: added by cluster but not in nbi list
- repos_to_delete = [repo for repo in cluster_repo_dict.keys() if repo not in nbi_repo_list]
+ repos_to_delete = [
+ repo
+ for repo in cluster_repo_dict.keys()
+ if repo not in nbi_repo_list
+ ]
- # delete repos: must delete first then add because there may be different repos with same name but
+ # delete repos: must delete first then add because there may be
+ # different repos with same name but
# different id and url
self.log.debug("repos to delete: {}".format(repos_to_delete))
for repo_id in repos_to_delete:
# try to delete repos
try:
- repo_delete_task = asyncio.ensure_future(self.repo_remove(cluster_uuid=cluster_uuid,
- name=cluster_repo_dict[repo_id]))
+ repo_delete_task = asyncio.ensure_future(
+ self.repo_remove(
+ cluster_uuid=cluster_uuid,
+ name=cluster_repo_dict[repo_id],
+ )
+ )
await asyncio.wait_for(repo_delete_task, update_repos_timeout)
except Exception as e:
- self.warning("Error deleting repo, id: {}, name: {}, err_msg: {}".format(repo_id,
- cluster_repo_dict[repo_id], str(e)))
- # always add to the list of to_delete if there is an error because if is not there deleting raises error
+ self.log.warning(
+ "Error deleting repo, id: {}, name: {}, err_msg: {}".format(
+ repo_id, cluster_repo_dict[repo_id], str(e)
+ )
+ )
+ # always add to the list of to_delete if there is an error
+ # because if it is not there
+ # deleting raises error
deleted_repo_list.append(repo_id)
# add repos
self.log.debug("repos to add: {}".format(repos_to_add))
- add_task_list = []
for repo_id in repos_to_add:
# obtain the repo data from the db
- # if there is an error getting the repo in the database we will ignore this repo and continue
- # because there is a possible race condition where the repo has been deleted while processing
+ # if there is an error getting the repo in the database we will
+ # ignore this repo and continue
+ # because there is a possible race condition where the repo has
+ # been deleted while processing
db_repo = self.db.get_one("k8srepos", {"_id": repo_id})
- self.log.debug("obtained repo: id, {}, name: {}, url: {}".format(repo_id, db_repo["name"], db_repo["url"]))
+ self.log.debug(
+ "obtained repo: id, {}, name: {}, url: {}".format(
+ repo_id, db_repo["name"], db_repo["url"]
+ )
+ )
try:
- repo_add_task = asyncio.ensure_future(self.repo_add(cluster_uuid=cluster_uuid,
- name=db_repo["name"], url=db_repo["url"],
- repo_type="chart"))
+ repo_add_task = asyncio.ensure_future(
+ self.repo_add(
+ cluster_uuid=cluster_uuid,
+ name=db_repo["name"],
+ url=db_repo["url"],
+ repo_type="chart",
+ )
+ )
await asyncio.wait_for(repo_add_task, update_repos_timeout)
added_repo_dict[repo_id] = db_repo["name"]
- self.log.debug("added repo: id, {}, name: {}".format(repo_id, db_repo["name"]))
+ self.log.debug(
+ "added repo: id, {}, name: {}".format(
+ repo_id, db_repo["name"]
+ )
+ )
except Exception as e:
- # deal with error adding repo, adding a repo that already exists does not raise any error
- # will not raise error because a wrong repos added by anyone could prevent instantiating any ns
- self.log.error("Error adding repo id: {}, err_msg: {} ".format(repo_id, repr(e)))
+ # deal with error adding repo, adding a repo that already
+ # exists does not raise any error
+ # will not raise error because a wrong repo added by
+ # anyone could prevent instantiating any ns
+ self.log.error(
+ "Error adding repo id: {}, err_msg: {} ".format(
+ repo_id, repr(e)
+ )
+ )
return deleted_repo_list, added_repo_dict
- else: # else db_k8scluster does not exist
- raise K8sException("k8cluster with helm-id : {} not found".format(cluster_uuid))
+ else: # else db_k8scluster does not exist
+ raise K8sException(
+ "k8cluster with helm-id : {} not found".format(cluster_uuid)
+ )
except Exception as e:
self.log.error("Error synchronizing repos: {}".format(str(e)))
raise K8sException("Error synchronizing repos")
"""
- ##################################################################################################
- ########################################## P R I V A T E #########################################
- ##################################################################################################
+ ####################################################################################
+ ################################### P R I V A T E ##################################
+ ####################################################################################
"""
async def _exec_inspect_comand(
- self,
- inspect_command: str,
- kdu_model: str,
- repo_url: str = None
+ self, inspect_command: str, kdu_model: str, repo_url: str = None
):
- repo_str = ''
+ repo_str = ""
if repo_url:
- repo_str = ' --repo {}'.format(repo_url)
- idx = kdu_model.find('/')
+ repo_str = " --repo {}".format(repo_url)
+ idx = kdu_model.find("/")
if idx >= 0:
idx += 1
kdu_model = kdu_model[idx:]
- inspect_command = '{} inspect {} {}{}'.format(self._helm_command, inspect_command, kdu_model, repo_str)
- output, rc = await self._local_async_exec(command=inspect_command, encode_utf8=True)
+ inspect_command = "{} inspect {} {}{}".format(
+ self._helm_command, inspect_command, kdu_model, repo_str
+ )
+ output, _rc = await self._local_async_exec(
+ command=inspect_command, encode_utf8=True
+ )
return output
async def _status_kdu(
- self,
- cluster_uuid: str,
- kdu_instance: str,
- show_error_log: bool = False,
- return_text: bool = False
+ self,
+ cluster_uuid: str,
+ kdu_instance: str,
+ show_error_log: bool = False,
+ return_text: bool = False,
):
- self.log.debug('status of kdu_instance {}'.format(kdu_instance))
+ self.log.debug("status of kdu_instance {}".format(kdu_instance))
# config filename
- kube_dir, helm_dir, config_filename, cluster_dir = \
- self._get_paths(cluster_name=cluster_uuid, create_if_not_exist=True)
+ _kube_dir, helm_dir, config_filename, _cluster_dir = self._get_paths(
+ cluster_name=cluster_uuid, create_if_not_exist=True
+ )
- command = '{} --kubeconfig={} --home={} status {} --output yaml'\
- .format(self._helm_command, config_filename, helm_dir, kdu_instance)
+ command = "{} --kubeconfig={} --home={} status {} --output yaml".format(
+ self._helm_command, config_filename, helm_dir, kdu_instance
+ )
output, rc = await self._local_async_exec(
command=command,
raise_exception_on_error=True,
- show_error_log=show_error_log
+ show_error_log=show_error_log,
)
if return_text:
@@ -848,111 +983,106 @@
# remove field 'notes'
try:
- del data.get('info').get('status')['notes']
+ del data.get("info").get("status")["notes"]
except KeyError:
pass
# parse field 'resources'
try:
- resources = str(data.get('info').get('status').get('resources'))
+ resources = str(data.get("info").get("status").get("resources"))
resource_table = self._output_to_table(resources)
- data.get('info').get('status')['resources'] = resource_table
- except Exception as e:
+ data.get("info").get("status")["resources"] = resource_table
+ except Exception:
pass
return data
- async def get_instance_info(
- self,
- cluster_uuid: str,
- kdu_instance: str
- ):
+ async def get_instance_info(self, cluster_uuid: str, kdu_instance: str):
instances = await self.instances_list(cluster_uuid=cluster_uuid)
for instance in instances:
- if instance.get('Name') == kdu_instance:
+ if instance.get("Name") == kdu_instance:
return instance
- self.log.debug('Instance {} not found'.format(kdu_instance))
+ self.log.debug("Instance {} not found".format(kdu_instance))
return None
@staticmethod
- def _generate_release_name(
- chart_name: str
- ):
+ def _generate_release_name(chart_name: str):
# check embeded chart (file or dir)
- if chart_name.startswith('/'):
+ if chart_name.startswith("/"):
# extract file or directory name
- chart_name = chart_name[chart_name.rfind('/')+1:]
+ chart_name = chart_name[chart_name.rfind("/") + 1 :]
# check URL
- elif '://' in chart_name:
+ elif "://" in chart_name:
# extract last portion of URL
- chart_name = chart_name[chart_name.rfind('/')+1:]
+ chart_name = chart_name[chart_name.rfind("/") + 1 :]
- name = ''
+ name = ""
for c in chart_name:
if c.isalpha() or c.isnumeric():
name += c
else:
- name += '-'
+ name += "-"
if len(name) > 35:
name = name[0:35]
# if does not start with alpha character, prefix 'a'
if not name[0].isalpha():
- name = 'a' + name
+ name = "a" + name
- name += '-'
+ name += "-"
def get_random_number():
r = random.randrange(start=1, stop=99999999)
s = str(r)
- s = s.rjust(10, '0')
+ s = s.rjust(10, "0")
return s
name = name + get_random_number()
return name.lower()
async def _store_status(
- self,
- cluster_uuid: str,
- operation: str,
- kdu_instance: str,
- check_every: float = 10,
- db_dict: dict = None,
- run_once: bool = False
+ self,
+ cluster_uuid: str,
+ operation: str,
+ kdu_instance: str,
+ check_every: float = 10,
+ db_dict: dict = None,
+ run_once: bool = False,
):
while True:
try:
await asyncio.sleep(check_every)
- detailed_status = await self.status_kdu(cluster_uuid=cluster_uuid, kdu_instance=kdu_instance)
- status = detailed_status.get('info').get('Description')
- self.log.debug('STATUS:\n{}'.format(status))
- self.log.debug('DETAILED STATUS:\n{}'.format(detailed_status))
+ detailed_status = await self.status_kdu(
+ cluster_uuid=cluster_uuid, kdu_instance=kdu_instance
+ )
+ status = detailed_status.get("info").get("Description")
+ self.log.debug("STATUS:\n{}".format(status))
+ self.log.debug("DETAILED STATUS:\n{}".format(detailed_status))
# write status to db
result = await self.write_app_status_to_db(
db_dict=db_dict,
status=str(status),
detailed_status=str(detailed_status),
- operation=operation)
+ operation=operation,
+ )
if not result:
- self.log.info('Error writing in database. Task exiting...')
+ self.log.info("Error writing in database. Task exiting...")
return
except asyncio.CancelledError:
- self.log.debug('Task cancelled')
+ self.log.debug("Task cancelled")
return
except Exception as e:
- self.log.debug('_store_status exception: {}'.format(str(e)))
+ self.log.debug("_store_status exception: {}".format(str(e)))
pass
finally:
if run_once:
return
- async def _is_install_completed(
- self,
- cluster_uuid: str,
- kdu_instance: str
- ) -> bool:
+ async def _is_install_completed(self, cluster_uuid: str, kdu_instance: str) -> bool:
- status = await self._status_kdu(cluster_uuid=cluster_uuid, kdu_instance=kdu_instance, return_text=False)
+ status = await self._status_kdu(
+ cluster_uuid=cluster_uuid, kdu_instance=kdu_instance, return_text=False
+ )
# extract info.status.resources-> str
# format:
@@ -961,7 +1091,7 @@
# halting-horse-mongodb 0/1 1 0 0s
# halting-petit-mongodb 1/1 1 0 0s
# blank line
- resources = K8sHelmConnector._get_deep(status, ('info', 'status', 'resources'))
+ resources = K8sHelmConnector._get_deep(status, ("info", "status", "resources"))
# convert to table
resources = K8sHelmConnector._output_to_table(resources)
@@ -973,26 +1103,26 @@
line1 = resources[index]
index += 1
# find '==>' in column 0
- if line1[0] == '==>':
+ if line1[0] == "==>":
line2 = resources[index]
index += 1
# find READY in column 1
- if line2[1] == 'READY':
+ if line2[1] == "READY":
# read next lines
line3 = resources[index]
index += 1
while len(line3) > 1 and index < num_lines:
ready_value = line3[1]
- parts = ready_value.split(sep='/')
+ parts = ready_value.split(sep="/")
current = int(parts[0])
total = int(parts[1])
if current < total:
- self.log.debug('NOT READY:\n {}'.format(line3))
+ self.log.debug("NOT READY:\n {}".format(line3))
ready = False
line3 = resources[index]
index += 1
- except Exception as e:
+ except Exception:
pass
return ready
@@ -1008,7 +1138,7 @@
return None
else:
target = value
- except Exception as e:
+ except Exception:
pass
return value
@@ -1017,11 +1147,11 @@
def _find_in_lines(p_lines: list, p_key: str) -> str:
for line in p_lines:
try:
- if line.startswith(p_key + ':'):
- parts = line.split(':')
+ if line.startswith(p_key + ":"):
+ parts = line.split(":")
the_value = parts[1].strip()
return the_value
- except Exception as e:
+ except Exception:
# ignore it
pass
return None
@@ -1031,46 +1161,45 @@
def _params_to_file_option(self, cluster_uuid: str, params: dict) -> (str, str):
if params and len(params) > 0:
- kube_dir, helm_dir, config_filename, cluster_dir = \
- self._get_paths(cluster_name=cluster_uuid, create_if_not_exist=True)
+ self._get_paths(cluster_name=cluster_uuid, create_if_not_exist=True)
def get_random_number():
r = random.randrange(start=1, stop=99999999)
s = str(r)
while len(s) < 10:
- s = '0' + s
+ s = "0" + s
return s
params2 = dict()
for key in params:
value = params.get(key)
- if '!!yaml' in str(value):
+ if "!!yaml" in str(value):
value = yaml.load(value[7:])
params2[key] = value
- values_file = get_random_number() + '.yaml'
- with open(values_file, 'w') as stream:
+ values_file = get_random_number() + ".yaml"
+ with open(values_file, "w") as stream:
yaml.dump(params2, stream, indent=4, default_flow_style=False)
- return '-f {}'.format(values_file), values_file
+ return "-f {}".format(values_file), values_file
- return '', None
+ return "", None
# params for use in --set option
@staticmethod
def _params_to_set_option(params: dict) -> str:
- params_str = ''
+ params_str = ""
if params and len(params) > 0:
start = True
for key in params:
value = params.get(key, None)
if value is not None:
if start:
- params_str += '--set '
+ params_str += "--set "
start = False
else:
- params_str += ','
- params_str += '{}={}'.format(key, value)
+ params_str += ","
+ params_str += "{}={}".format(key, value)
return params_str
@staticmethod
@@ -1088,17 +1217,19 @@
output_table = list()
lines = output.splitlines(keepends=False)
for line in lines:
- line = line.replace('\t', ' ')
+ line = line.replace("\t", " ")
line_list = list()
output_table.append(line_list)
- cells = line.split(sep=' ')
+ cells = line.split(sep=" ")
for cell in cells:
cell = cell.strip()
if len(cell) > 0:
line_list.append(cell)
return output_table
- def _get_paths(self, cluster_name: str, create_if_not_exist: bool = False) -> (str, str, str, str):
+ def _get_paths(
+ self, cluster_name: str, create_if_not_exist: bool = False
+ ) -> (str, str, str, str):
"""
Returns kube and helm directories
@@ -1113,81 +1244,78 @@
base = base[:-1]
# base dir for cluster
- cluster_dir = base + '/' + cluster_name
+ cluster_dir = base + "/" + cluster_name
if create_if_not_exist and not os.path.exists(cluster_dir):
- self.log.debug('Creating dir {}'.format(cluster_dir))
+ self.log.debug("Creating dir {}".format(cluster_dir))
os.makedirs(cluster_dir)
if not os.path.exists(cluster_dir):
- msg = 'Base cluster dir {} does not exist'.format(cluster_dir)
+ msg = "Base cluster dir {} does not exist".format(cluster_dir)
self.log.error(msg)
raise K8sException(msg)
# kube dir
- kube_dir = cluster_dir + '/' + '.kube'
+ kube_dir = cluster_dir + "/" + ".kube"
if create_if_not_exist and not os.path.exists(kube_dir):
- self.log.debug('Creating dir {}'.format(kube_dir))
+ self.log.debug("Creating dir {}".format(kube_dir))
os.makedirs(kube_dir)
if not os.path.exists(kube_dir):
- msg = 'Kube config dir {} does not exist'.format(kube_dir)
+ msg = "Kube config dir {} does not exist".format(kube_dir)
self.log.error(msg)
raise K8sException(msg)
# helm home dir
- helm_dir = cluster_dir + '/' + '.helm'
+ helm_dir = cluster_dir + "/" + ".helm"
if create_if_not_exist and not os.path.exists(helm_dir):
- self.log.debug('Creating dir {}'.format(helm_dir))
+ self.log.debug("Creating dir {}".format(helm_dir))
os.makedirs(helm_dir)
if not os.path.exists(helm_dir):
- msg = 'Helm config dir {} does not exist'.format(helm_dir)
+ msg = "Helm config dir {} does not exist".format(helm_dir)
self.log.error(msg)
raise K8sException(msg)
- config_filename = kube_dir + '/config'
+ config_filename = kube_dir + "/config"
return kube_dir, helm_dir, config_filename, cluster_dir
@staticmethod
- def _remove_multiple_spaces(str):
- str = str.strip()
- while ' ' in str:
- str = str.replace(' ', ' ')
- return str
+ def _remove_multiple_spaces(strobj):
+ strobj = strobj.strip()
+ while " " in strobj:
+ strobj = strobj.replace(" ", " ")
+ return strobj
- def _local_exec(
- self,
- command: str
- ) -> (str, int):
+ def _local_exec(self, command: str) -> (str, int):
command = K8sHelmConnector._remove_multiple_spaces(command)
- self.log.debug('Executing sync local command: {}'.format(command))
+ self.log.debug("Executing sync local command: {}".format(command))
# raise exception if fails
- output = ''
+ output = ""
try:
- output = subprocess.check_output(command, shell=True, universal_newlines=True)
+ output = subprocess.check_output(
+ command, shell=True, universal_newlines=True
+ )
return_code = 0
self.log.debug(output)
- except Exception as e:
+ except Exception:
return_code = 1
return output, return_code
async def _local_async_exec(
- self,
- command: str,
- raise_exception_on_error: bool = False,
- show_error_log: bool = True,
- encode_utf8: bool = False
+ self,
+ command: str,
+ raise_exception_on_error: bool = False,
+ show_error_log: bool = True,
+ encode_utf8: bool = False,
) -> (str, int):
command = K8sHelmConnector._remove_multiple_spaces(command)
- self.log.debug('Executing async local command: {}'.format(command))
+ self.log.debug("Executing async local command: {}".format(command))
# split command
- command = command.split(sep=' ')
+ command = command.split(sep=" ")
try:
process = await asyncio.create_subprocess_exec(
- *command,
- stdout=asyncio.subprocess.PIPE,
- stderr=asyncio.subprocess.PIPE
+ *command, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE
)
# wait for command terminate
@@ -1195,25 +1323,27 @@
return_code = process.returncode
- output = ''
+ output = ""
if stdout:
- output = stdout.decode('utf-8').strip()
+ output = stdout.decode("utf-8").strip()
# output = stdout.decode()
if stderr:
- output = stderr.decode('utf-8').strip()
+ output = stderr.decode("utf-8").strip()
# output = stderr.decode()
if return_code != 0 and show_error_log:
- self.log.debug('Return code (FAIL): {}\nOutput:\n{}'.format(return_code, output))
+ self.log.debug(
+ "Return code (FAIL): {}\nOutput:\n{}".format(return_code, output)
+ )
else:
- self.log.debug('Return code: {}'.format(return_code))
+ self.log.debug("Return code: {}".format(return_code))
if raise_exception_on_error and return_code != 0:
raise K8sException(output)
if encode_utf8:
- output = output.encode('utf-8').strip()
- output = str(output).replace('\\n', '\n')
+ output = output.encode("utf-8").strip()
+ output = str(output).replace("\\n", "\n")
return output, return_code
@@ -1222,21 +1352,19 @@
except K8sException:
raise
except Exception as e:
- msg = 'Exception executing command: {} -> {}'.format(command, e)
+ msg = "Exception executing command: {} -> {}".format(command, e)
self.log.error(msg)
if raise_exception_on_error:
raise K8sException(e) from e
else:
- return '', -1
+ return "", -1
def _check_file_exists(self, filename: str, exception_if_not_exists: bool = False):
# self.log.debug('Checking if file {} exists...'.format(filename))
if os.path.exists(filename):
return True
else:
- msg = 'File {} does not exist'.format(filename)
+ msg = "File {} does not exist".format(filename)
if exception_if_not_exists:
# self.log.error(msg)
raise K8sException(msg)
-
-