# For those usages not covered by the Apache License, Version 2.0 please
# contact with: nfvlabs@tid.es
##
-
-import paramiko
-import subprocess
-import os
-import shutil
import asyncio
-import time
+from typing import Union
+import os
import yaml
-from uuid import uuid4
-import random
-from n2vc.k8s_conn import K8sConnector
+from n2vc.k8s_helm_base_conn import K8sHelmBaseConnector
+from n2vc.exceptions import K8sException
-class K8sHelmConnector(K8sConnector):
+
+class K8sHelmConnector(K8sHelmBaseConnector):
"""
- ##################################################################################################
- ########################################## P U B L I C ###########################################
- ##################################################################################################
+ ####################################################################################
+ ################################### P U B L I C ####################################
+ ####################################################################################
"""
def __init__(
- self,
- fs: object,
- db: object,
- kubectl_command: str = '/usr/bin/kubectl',
- helm_command: str = '/usr/bin/helm',
- log: object = None,
- on_update_db=None
+ self,
+ fs: object,
+ db: object,
+ kubectl_command: str = "/usr/bin/kubectl",
+ helm_command: str = "/usr/bin/helm",
+ log: object = None,
+ on_update_db=None,
):
"""
+ Initializes helm connector for helm v2
:param fs: file system for kubernetes and helm configuration
:param db: database object to write current operation status
"""
# parent class
- K8sConnector.__init__(
+ K8sHelmBaseConnector.__init__(
self,
db=db,
log=log,
- on_update_db=on_update_db
+ fs=fs,
+ kubectl_command=kubectl_command,
+ helm_command=helm_command,
+ on_update_db=on_update_db,
)
- self.info('Initializing K8S Helm connector')
-
- # random numbers for release name generation
- random.seed(time.time())
-
- # the file system
- self.fs = fs
-
- # exception if kubectl is not installed
- self.kubectl_command = kubectl_command
- self._check_file_exists(filename=kubectl_command, exception_if_not_exists=True)
-
- # exception if helm is not installed
- self._helm_command = helm_command
- self._check_file_exists(filename=helm_command, exception_if_not_exists=True)
-
- self.info('K8S Helm connector initialized')
+ self.log.info("Initializing K8S Helm2 connector")
- async def init_env(
- self,
- k8s_creds: str,
- namespace: str = 'kube-system',
- reuse_cluster_uuid=None
- ) -> (str, bool):
-
- cluster_uuid = reuse_cluster_uuid
- if not cluster_uuid:
- cluster_uuid = str(uuid4())
-
- self.debug('Initializing K8S environment. namespace: {}'.format(namespace))
-
- # create config filename
- kube_dir, helm_dir, config_filename, cluster_dir = \
- self._get_paths(cluster_name=cluster_uuid, create_if_not_exist=True)
- f = open(config_filename, "w")
- f.write(k8s_creds)
- f.close()
-
- # check if tiller pod is up in cluster
- command = '{} --kubeconfig={} --namespace={} get deployments'\
- .format(self.kubectl_command, config_filename, namespace)
- output, rc = await self._local_async_exec(command=command, raise_exception_on_error=True)
-
- output_table = K8sHelmConnector._output_to_table(output=output)
-
- # find 'tiller' pod in all pods
- already_initialized = False
+ # initialize helm client-only
+ self.log.debug("Initializing helm client-only...")
+ command = "{} init --client-only {} ".format(
+ self._helm_command,
+ "--stable-repo-url {}".format(self._stable_repo_url)
+ if self._stable_repo_url
+ else "--skip-repos",
+ )
try:
- for row in output_table:
- if row[0].startswith('tiller-deploy'):
- already_initialized = True
- break
+ asyncio.ensure_future(
+ self._local_async_exec(command=command, raise_exception_on_error=False)
+ )
+ # loop = asyncio.get_event_loop()
+ # loop.run_until_complete(self._local_async_exec(command=command,
+ # raise_exception_on_error=False))
except Exception as e:
- pass
-
- # helm init
- n2vc_installed_sw = False
- if not already_initialized:
- self.info('Initializing helm in client and server: {}'.format(cluster_uuid))
- command = '{} --kubeconfig={} --tiller-namespace={} --home={} init'\
- .format(self._helm_command, config_filename, namespace, helm_dir)
- output, rc = await self._local_async_exec(command=command, raise_exception_on_error=True)
- n2vc_installed_sw = True
- else:
- # check client helm installation
- check_file = helm_dir + '/repository/repositories.yaml'
- if not self._check_file_exists(filename=check_file, exception_if_not_exists=False):
- self.info('Initializing helm in client: {}'.format(cluster_uuid))
- command = '{} --kubeconfig={} --tiller-namespace={} --home={} init --client-only'\
- .format(self._helm_command, config_filename, namespace, helm_dir)
- output, rc = await self._local_async_exec(command=command, raise_exception_on_error=True)
- else:
- self.info('Helm client already initialized')
-
- self.info('Cluster initialized {}'.format(cluster_uuid))
-
- return cluster_uuid, n2vc_installed_sw
-
- async def repo_add(
- self,
- cluster_uuid: str,
- name: str,
- url: str,
- repo_type: str = 'chart'
- ):
-
- self.debug('adding {} repository {}. URL: {}'.format(repo_type, name, url))
-
- # config filename
- kube_dir, helm_dir, config_filename, cluster_dir = \
- self._get_paths(cluster_name=cluster_uuid, create_if_not_exist=True)
-
- # helm repo update
- command = '{} --kubeconfig={} --home={} repo update'.format(self._helm_command, config_filename, helm_dir)
- self.debug('updating repo: {}'.format(command))
- await self._local_async_exec(command=command, raise_exception_on_error=False)
-
- # helm repo add name url
- command = '{} --kubeconfig={} --home={} repo add {} {}'\
- .format(self._helm_command, config_filename, helm_dir, name, url)
- self.debug('adding repo: {}'.format(command))
- await self._local_async_exec(command=command, raise_exception_on_error=True)
-
- async def repo_list(
- self,
- cluster_uuid: str
- ) -> list:
- """
- Get the list of registered repositories
-
- :return: list of registered repositories: [ (name, url) .... ]
- """
-
- self.debug('list repositories for cluster {}'.format(cluster_uuid))
-
- # config filename
- kube_dir, helm_dir, config_filename, cluster_dir = \
- self._get_paths(cluster_name=cluster_uuid, create_if_not_exist=True)
-
- command = '{} --kubeconfig={} --home={} repo list --output yaml'.format(self._helm_command, config_filename, helm_dir)
+ self.warning(
+ msg="helm init failed (it was already initialized): {}".format(e)
+ )
- output, rc = await self._local_async_exec(command=command, raise_exception_on_error=True)
- if output and len(output) > 0:
- return yaml.load(output, Loader=yaml.SafeLoader)
- else:
- return []
+ self.log.info("K8S Helm2 connector initialized")
- async def repo_remove(
- self,
- cluster_uuid: str,
- name: str
+ async def install(
+ self,
+ cluster_uuid: str,
+ kdu_model: str,
+ kdu_instance: str,
+ atomic: bool = True,
+ timeout: float = 300,
+ params: dict = None,
+ db_dict: dict = None,
+ kdu_name: str = None,
+ namespace: str = None,
+ **kwargs,
):
"""
- Remove a repository from OSM
+ Deploys of a new KDU instance. It would implicitly rely on the `install` call
+ to deploy the Chart/Bundle properly parametrized (in practice, this call would
+ happen before any _initial-config-primitive_of the VNF is called).
- :param cluster_uuid: the cluster
- :param name: repo name in OSM
+ :param cluster_uuid: UUID of a K8s cluster known by OSM
+ :param kdu_model: chart/ reference (string), which can be either
+ of these options:
+ - a name of chart available via the repos known by OSM
+ - a path to a packaged chart
+ - a path to an unpacked chart directory or a URL
+ :param kdu_instance: Kdu instance name
+ :param atomic: If set, installation process purges chart/bundle on fail, also
+ will wait until all the K8s objects are active
+ :param timeout: Time in seconds to wait for the install of the chart/bundle
+ (defaults to Helm default timeout: 300s)
+ :param params: dictionary of key-value pairs for instantiation parameters
+ (overriding default values)
+ :param dict db_dict: where to write into database when the status changes.
+ It contains a dict with {collection: <str>, filter: {},
+ path: <str>},
+ e.g. {collection: "nsrs", filter:
+ {_id: <nsd-id>, path: "_admin.deployed.K8S.3"}
+ :param kdu_name: Name of the KDU instance to be installed
+ :param namespace: K8s namespace to use for the KDU instance
+ :param kwargs: Additional parameters (None yet)
:return: True if successful
"""
+ self.log.debug("installing {} in cluster {}".format(kdu_model, cluster_uuid))
- self.debug('list repositories for cluster {}'.format(cluster_uuid))
-
- # config filename
- kube_dir, helm_dir, config_filename, cluster_dir = \
- self._get_paths(cluster_name=cluster_uuid, create_if_not_exist=True)
-
- command = '{} --kubeconfig={} --home={} repo remove {}'\
- .format(self._helm_command, config_filename, helm_dir, name)
+ # sync local dir
+ self.fs.sync(from_path=cluster_uuid)
- await self._local_async_exec(command=command, raise_exception_on_error=True)
-
- async def reset(
- self,
- cluster_uuid: str,
- force: bool = False,
- uninstall_sw: bool = False
- ) -> bool:
-
- self.debug('Resetting K8s environment. cluster uuid: {}'.format(cluster_uuid))
-
- # get kube and helm directories
- kube_dir, helm_dir, config_filename, cluster_dir = \
- self._get_paths(cluster_name=cluster_uuid, create_if_not_exist=False)
-
- # uninstall releases if needed
- releases = await self.instances_list(cluster_uuid=cluster_uuid)
- if len(releases) > 0:
- if force:
- for r in releases:
- try:
- kdu_instance = r.get('Name')
- chart = r.get('Chart')
- self.debug('Uninstalling {} -> {}'.format(chart, kdu_instance))
- await self.uninstall(cluster_uuid=cluster_uuid, kdu_instance=kdu_instance)
- except Exception as e:
- self.error('Error uninstalling release {}: {}'.format(kdu_instance, e))
- else:
- msg = 'Cluster has releases and not force. Cannot reset K8s environment. Cluster uuid: {}'\
- .format(cluster_uuid)
- self.error(msg)
- raise Exception(msg)
+ # init env, paths
+ paths, env = self._init_paths_env(
+ cluster_name=cluster_uuid, create_if_not_exist=True
+ )
- if uninstall_sw:
+ await self._install_impl(
+ cluster_uuid,
+ kdu_model,
+ paths,
+ env,
+ kdu_instance,
+ atomic=atomic,
+ timeout=timeout,
+ params=params,
+ db_dict=db_dict,
+ kdu_name=kdu_name,
+ namespace=namespace,
+ )
- self.debug('Uninstalling tiller from cluster {}'.format(cluster_uuid))
+ # sync fs
+ self.fs.reverse_sync(from_path=cluster_uuid)
- # find namespace for tiller pod
- command = '{} --kubeconfig={} get deployments --all-namespaces'\
- .format(self.kubectl_command, config_filename)
- output, rc = await self._local_async_exec(command=command, raise_exception_on_error=False)
- output_table = K8sHelmConnector._output_to_table(output=output)
- namespace = None
- for r in output_table:
- try:
- if 'tiller-deploy' in r[1]:
- namespace = r[0]
- break
- except Exception as e:
- pass
- else:
- msg = 'Tiller deployment not found in cluster {}'.format(cluster_uuid)
- self.error(msg)
- # raise Exception(msg)
-
- self.debug('namespace for tiller: {}'.format(namespace))
-
- force_str = '--force'
-
- if namespace:
- # delete tiller deployment
- self.debug('Deleting tiller deployment for cluster {}, namespace {}'.format(cluster_uuid, namespace))
- command = '{} --namespace {} --kubeconfig={} {} delete deployment tiller-deploy'\
- .format(self.kubectl_command, namespace, config_filename, force_str)
- await self._local_async_exec(command=command, raise_exception_on_error=False)
-
- # uninstall tiller from cluster
- self.debug('Uninstalling tiller from cluster {}'.format(cluster_uuid))
- command = '{} --kubeconfig={} --home={} reset'\
- .format(self._helm_command, config_filename, helm_dir)
- self.debug('resetting: {}'.format(command))
- output, rc = await self._local_async_exec(command=command, raise_exception_on_error=True)
- else:
- self.debug('namespace not found')
+ self.log.debug("Returning kdu_instance {}".format(kdu_instance))
+ return True
- # delete cluster directory
- dir = self.fs.path + '/' + cluster_uuid
- self.debug('Removing directory {}'.format(dir))
- shutil.rmtree(dir, ignore_errors=True)
+ async def inspect_kdu(self, kdu_model: str, repo_url: str = None) -> str:
- return True
+ self.log.debug(
+ "inspect kdu_model {} from (optional) repo: {}".format(kdu_model, repo_url)
+ )
- async def install(
- self,
- cluster_uuid: str,
- kdu_model: str,
- atomic: bool = True,
- timeout: float = 300,
- params: dict = None,
- db_dict: dict = None
- ):
+ return await self._exec_inspect_comand(
+ inspect_command="", kdu_model=kdu_model, repo_url=repo_url
+ )
- self.debug('installing {} in cluster {}'.format(kdu_model, cluster_uuid))
+ """
+ ####################################################################################
+ ################################### P R I V A T E ##################################
+ ####################################################################################
+ """
- start = time.time()
- end = start + timeout
+ def _init_paths_env(self, cluster_name: str, create_if_not_exist: bool = True):
+ """
+ Creates and returns base cluster and kube dirs and returns them.
+ Also created helm3 dirs according to new directory specification, paths are
+ returned and also environment variables that must be provided to execute commands
- # config filename
- kube_dir, helm_dir, config_filename, cluster_dir = \
- self._get_paths(cluster_name=cluster_uuid, create_if_not_exist=True)
+ Helm 2 directory specification uses helm_home dir:
- # params to str
- # params_str = K8sHelmConnector._params_to_set_option(params)
- params_str, file_to_delete = self._params_to_file_option(cluster_uuid=cluster_uuid, params=params)
+ The variables assigned for this paths are:
+ - Helm hone: $HELM_HOME
+ - helm kubeconfig: $KUBECONFIG
- timeout_str = ''
- if timeout:
- timeout_str = '--timeout {}'.format(timeout)
+ :param cluster_name: cluster_name
+ :return: Dictionary with config_paths and dictionary with helm environment variables
+ """
+ base = self.fs.path
+ if base.endswith("/") or base.endswith("\\"):
+ base = base[:-1]
- # atomic
- atomic_str = ''
- if atomic:
- atomic_str = '--atomic'
+ # base dir for cluster
+ cluster_dir = base + "/" + cluster_name
- # version
- version_str = ''
- if ':' in kdu_model:
- parts = kdu_model.split(sep=':')
- if len(parts) == 2:
- version_str = '--version {}'.format(parts[1])
- kdu_model = parts[0]
-
- # generate a name for the releas. Then, check if already exists
- kdu_instance = None
- while kdu_instance is None:
- kdu_instance = K8sHelmConnector._generate_release_name(kdu_model)
- try:
- result = await self._status_kdu(
- cluster_uuid=cluster_uuid,
- kdu_instance=kdu_instance,
- show_error_log=False
- )
- if result is not None:
- # instance already exists: generate a new one
- kdu_instance = None
- except:
- kdu_instance = None
-
- # helm repo install
- command = '{} install {} --output yaml --kubeconfig={} --home={} {} {} --name={} {} {}'\
- .format(self._helm_command, atomic_str, config_filename, helm_dir,
- params_str, timeout_str, kdu_instance, kdu_model, version_str)
- self.debug('installing: {}'.format(command))
+ # kube dir
+ kube_dir = cluster_dir + "/" + ".kube"
+ if create_if_not_exist and not os.path.exists(kube_dir):
+ self.log.debug("Creating dir {}".format(kube_dir))
+ os.makedirs(kube_dir)
- if atomic:
- # exec helm in a task
- exec_task = asyncio.ensure_future(
- coro_or_future=self._local_async_exec(command=command, raise_exception_on_error=False)
- )
- # write status in another task
- status_task = asyncio.ensure_future(
- coro_or_future=self._store_status(
- cluster_uuid=cluster_uuid,
- kdu_instance=kdu_instance,
- db_dict=db_dict,
- operation='install',
- run_once=False
- )
- )
+ # helm home dir
+ helm_dir = cluster_dir + "/" + ".helm"
+ if create_if_not_exist and not os.path.exists(helm_dir):
+ self.log.debug("Creating dir {}".format(helm_dir))
+ os.makedirs(helm_dir)
- # wait for execution task
- await asyncio.wait([exec_task])
+ config_filename = kube_dir + "/config"
- # cancel status task
- status_task.cancel()
+ # 2 - Prepare dictionary with paths
+ paths = {
+ "kube_dir": kube_dir,
+ "kube_config": config_filename,
+ "cluster_dir": cluster_dir,
+ "helm_dir": helm_dir,
+ }
- output, rc = exec_task.result()
+ for file_name, file in paths.items():
+ if "dir" in file_name and not os.path.exists(file):
+ err_msg = "{} dir does not exist".format(file)
+ self.log.error(err_msg)
+ raise K8sException(err_msg)
- else:
+ # 3 - Prepare environment variables
+ env = {"HELM_HOME": helm_dir, "KUBECONFIG": config_filename}
- output, rc = await self._local_async_exec(command=command, raise_exception_on_error=False)
+ return paths, env
- # remove temporal values yaml file
- if file_to_delete:
- os.remove(file_to_delete)
+ async def _get_services(self, cluster_id, kdu_instance, namespace, kubeconfig):
- # write final status
- await self._store_status(
- cluster_uuid=cluster_uuid,
- kdu_instance=kdu_instance,
- db_dict=db_dict,
- operation='install',
- run_once=True,
- check_every=0
+ # init config, env
+ paths, env = self._init_paths_env(
+ cluster_name=cluster_id, create_if_not_exist=True
)
- if rc != 0:
- msg = 'Error executing command: {}\nOutput: {}'.format(command, output)
- self.error(msg)
- raise Exception(msg)
+ command1 = "env KUBECONFIG={} {} get manifest {} ".format(
+ kubeconfig, self._helm_command, kdu_instance
+ )
+ command2 = "{} get --namespace={} -f -".format(self.kubectl_command, namespace)
+ output, _rc = await self._local_async_exec_pipe(
+ command1, command2, env=env, raise_exception_on_error=True
+ )
+ services = self._parse_services(output)
- self.debug('Returning kdu_instance {}'.format(kdu_instance))
- return kdu_instance
+ return services
- async def instances_list(
- self,
- cluster_uuid: str
- ) -> list:
+ async def _cluster_init(
+ self, cluster_id: str, namespace: str, paths: dict, env: dict
+ ):
"""
- returns a list of deployed releases in a cluster
-
- :param cluster_uuid: the cluster
- :return:
+ Implements the helm version dependent cluster initialization:
+ For helm2 it initialized tiller environment if needed
"""
- self.debug('list releases for cluster {}'.format(cluster_uuid))
-
- # config filename
- kube_dir, helm_dir, config_filename, cluster_dir = \
- self._get_paths(cluster_name=cluster_uuid, create_if_not_exist=True)
+ # check if tiller pod is up in cluster
+ command = "{} --kubeconfig={} --namespace={} get deployments".format(
+ self.kubectl_command, paths["kube_config"], namespace
+ )
+ output, _rc = await self._local_async_exec(
+ command=command, raise_exception_on_error=True, env=env
+ )
- command = '{} --kubeconfig={} --home={} list --output yaml'\
- .format(self._helm_command, config_filename, helm_dir)
+ output_table = self._output_to_table(output=output)
- output, rc = await self._local_async_exec(command=command, raise_exception_on_error=True)
+ # find 'tiller' pod in all pods
+ already_initialized = False
+ try:
+ for row in output_table:
+ if row[0].startswith("tiller-deploy"):
+ already_initialized = True
+ break
+ except Exception:
+ pass
- if output and len(output) > 0:
- return yaml.load(output, Loader=yaml.SafeLoader).get('Releases')
- else:
- return []
+ # helm init
+ n2vc_installed_sw = False
+ if not already_initialized:
+ self.log.info(
+ "Initializing helm in client and server: {}".format(cluster_id)
+ )
+ command = "{} --kubeconfig={} --namespace kube-system create serviceaccount {}".format(
+ self.kubectl_command, paths["kube_config"], self.service_account
+ )
+ _, _rc = await self._local_async_exec(
+ command=command, raise_exception_on_error=False, env=env
+ )
- async def upgrade(
- self,
- cluster_uuid: str,
- kdu_instance: str,
- kdu_model: str = None,
- atomic: bool = True,
- timeout: float = 300,
- params: dict = None,
- db_dict: dict = None
- ):
+ command = (
+ "{} --kubeconfig={} create clusterrolebinding osm-tiller-cluster-rule "
+ "--clusterrole=cluster-admin --serviceaccount=kube-system:{}"
+ ).format(self.kubectl_command, paths["kube_config"], self.service_account)
+ _, _rc = await self._local_async_exec(
+ command=command, raise_exception_on_error=False, env=env
+ )
- self.debug('upgrading {} in cluster {}'.format(kdu_model, cluster_uuid))
+ command = (
+ "{} --kubeconfig={} --tiller-namespace={} --home={} --service-account {} "
+ " {} init"
+ ).format(
+ self._helm_command,
+ paths["kube_config"],
+ namespace,
+ paths["helm_dir"],
+ self.service_account,
+ "--stable-repo-url {}".format(self._stable_repo_url)
+ if self._stable_repo_url
+ else "--skip-repos",
+ )
+ _, _rc = await self._local_async_exec(
+ command=command, raise_exception_on_error=True, env=env
+ )
+ n2vc_installed_sw = True
+ else:
+ # check client helm installation
+ check_file = paths["helm_dir"] + "/repository/repositories.yaml"
+ if not self._check_file_exists(
+ filename=check_file, exception_if_not_exists=False
+ ):
+ self.log.info("Initializing helm in client: {}".format(cluster_id))
+ command = (
+ "{} --kubeconfig={} --tiller-namespace={} "
+ "--home={} init --client-only {} "
+ ).format(
+ self._helm_command,
+ paths["kube_config"],
+ namespace,
+ paths["helm_dir"],
+ "--stable-repo-url {}".format(self._stable_repo_url)
+ if self._stable_repo_url
+ else "--skip-repos",
+ )
+ output, _rc = await self._local_async_exec(
+ command=command, raise_exception_on_error=True, env=env
+ )
+ else:
+ self.log.info("Helm client already initialized")
- start = time.time()
- end = start + timeout
+ repo_list = await self.repo_list(cluster_id)
+ for repo in repo_list:
+ if repo["name"] == "stable" and repo["url"] != self._stable_repo_url:
+ self.log.debug("Add new stable repo url: {}")
+ await self.repo_remove(cluster_id, "stable")
+ if self._stable_repo_url:
+ await self.repo_add(cluster_id, "stable", self._stable_repo_url)
+ break
- # config filename
- kube_dir, helm_dir, config_filename, cluster_dir = \
- self._get_paths(cluster_name=cluster_uuid, create_if_not_exist=True)
+ return n2vc_installed_sw
- # params to str
- # params_str = K8sHelmConnector._params_to_set_option(params)
- params_str, file_to_delete = self._params_to_file_option(cluster_uuid=cluster_uuid, params=params)
+ async def _uninstall_sw(self, cluster_id: str, namespace: str):
+ # uninstall Tiller if necessary
- timeout_str = ''
- if timeout:
- timeout_str = '--timeout {}'.format(timeout)
+ self.log.debug("Uninstalling tiller from cluster {}".format(cluster_id))
- # atomic
- atomic_str = ''
- if atomic:
- atomic_str = '--atomic'
+ # init paths, env
+ paths, env = self._init_paths_env(
+ cluster_name=cluster_id, create_if_not_exist=True
+ )
- # version
- version_str = ''
- if kdu_model and ':' in kdu_model:
- parts = kdu_model.split(sep=':')
- if len(parts) == 2:
- version_str = '--version {}'.format(parts[1])
- kdu_model = parts[0]
-
- # helm repo upgrade
- command = '{} upgrade {} --output yaml --kubeconfig={} --home={} {} {} {} {} {}'\
- .format(self._helm_command, atomic_str, config_filename, helm_dir,
- params_str, timeout_str, kdu_instance, kdu_model, version_str)
- self.debug('upgrading: {}'.format(command))
+ if not namespace:
+ # find namespace for tiller pod
+ command = "{} --kubeconfig={} get deployments --all-namespaces".format(
+ self.kubectl_command, paths["kube_config"]
+ )
+ output, _rc = await self._local_async_exec(
+ command=command, raise_exception_on_error=False, env=env
+ )
+ output_table = self._output_to_table(output=output)
+ namespace = None
+ for r in output_table:
+ try:
+ if "tiller-deploy" in r[1]:
+ namespace = r[0]
+ break
+ except Exception:
+ pass
+ else:
+ msg = "Tiller deployment not found in cluster {}".format(cluster_id)
+ self.log.error(msg)
- if atomic:
+ self.log.debug("namespace for tiller: {}".format(namespace))
- # exec helm in a task
- exec_task = asyncio.ensure_future(
- coro_or_future=self._local_async_exec(command=command, raise_exception_on_error=False)
+ if namespace:
+ # uninstall tiller from cluster
+ self.log.debug("Uninstalling tiller from cluster {}".format(cluster_id))
+ command = "{} --kubeconfig={} --home={} reset".format(
+ self._helm_command, paths["kube_config"], paths["helm_dir"]
+ )
+ self.log.debug("resetting: {}".format(command))
+ output, _rc = await self._local_async_exec(
+ command=command, raise_exception_on_error=True, env=env
)
- # write status in another task
- status_task = asyncio.ensure_future(
- coro_or_future=self._store_status(
- cluster_uuid=cluster_uuid,
- kdu_instance=kdu_instance,
- db_dict=db_dict,
- operation='upgrade',
- run_once=False
+ # Delete clusterrolebinding and serviceaccount.
+ # Ignore if errors for backward compatibility
+ command = (
+ "{} --kubeconfig={} delete clusterrolebinding.rbac.authorization.k8s."
+ "io/osm-tiller-cluster-rule"
+ ).format(self.kubectl_command, paths["kube_config"])
+ output, _rc = await self._local_async_exec(
+ command=command, raise_exception_on_error=False, env=env
+ )
+ command = (
+ "{} --kubeconfig={} --namespace {} delete serviceaccount/{}".format(
+ self.kubectl_command,
+ paths["kube_config"],
+ namespace,
+ self.service_account,
)
)
-
- # wait for execution task
- await asyncio.wait([ exec_task ])
-
- # cancel status task
- status_task.cancel()
- output, rc = exec_task.result()
+ output, _rc = await self._local_async_exec(
+ command=command, raise_exception_on_error=False, env=env
+ )
else:
+ self.log.debug("namespace not found")
- output, rc = await self._local_async_exec(command=command, raise_exception_on_error=False)
-
- # remove temporal values yaml file
- if file_to_delete:
- os.remove(file_to_delete)
+ async def _instances_list(self, cluster_id):
- # write final status
- await self._store_status(
- cluster_uuid=cluster_uuid,
- kdu_instance=kdu_instance,
- db_dict=db_dict,
- operation='upgrade',
- run_once=True,
- check_every=0
+ # init paths, env
+ paths, env = self._init_paths_env(
+ cluster_name=cluster_id, create_if_not_exist=True
)
- if rc != 0:
- msg = 'Error executing command: {}\nOutput: {}'.format(command, output)
- self.error(msg)
- raise Exception(msg)
-
- # return new revision number
- instance = await self.get_instance_info(cluster_uuid=cluster_uuid, kdu_instance=kdu_instance)
- if instance:
- revision = int(instance.get('Revision'))
- self.debug('New revision: {}'.format(revision))
- return revision
- else:
- return 0
-
- async def rollback(
- self,
- cluster_uuid: str,
- kdu_instance: str,
- revision=0,
- db_dict: dict = None
- ):
-
- self.debug('rollback kdu_instance {} to revision {} from cluster {}'
- .format(kdu_instance, revision, cluster_uuid))
-
- # config filename
- kube_dir, helm_dir, config_filename, cluster_dir = \
- self._get_paths(cluster_name=cluster_uuid, create_if_not_exist=True)
+ command = "{} list --output yaml".format(self._helm_command)
- command = '{} rollback --kubeconfig={} --home={} {} {} --wait'\
- .format(self._helm_command, config_filename, helm_dir, kdu_instance, revision)
-
- # exec helm in a task
- exec_task = asyncio.ensure_future(
- coro_or_future=self._local_async_exec(command=command, raise_exception_on_error=False)
- )
- # write status in another task
- status_task = asyncio.ensure_future(
- coro_or_future=self._store_status(
- cluster_uuid=cluster_uuid,
- kdu_instance=kdu_instance,
- db_dict=db_dict,
- operation='rollback',
- run_once=False
- )
- )
-
- # wait for execution task
- await asyncio.wait([exec_task])
-
- # cancel status task
- status_task.cancel()
-
- output, rc = exec_task.result()
-
- # write final status
- await self._store_status(
- cluster_uuid=cluster_uuid,
- kdu_instance=kdu_instance,
- db_dict=db_dict,
- operation='rollback',
- run_once=True,
- check_every=0
+ output, _rc = await self._local_async_exec(
+ command=command, raise_exception_on_error=True, env=env
)
- if rc != 0:
- msg = 'Error executing command: {}\nOutput: {}'.format(command, output)
- self.error(msg)
- raise Exception(msg)
-
- # return new revision number
- instance = await self.get_instance_info(cluster_uuid=cluster_uuid, kdu_instance=kdu_instance)
- if instance:
- revision = int(instance.get('Revision'))
- self.debug('New revision: {}'.format(revision))
- return revision
+ if output and len(output) > 0:
+ # parse yaml and update keys to lower case to unify with helm3
+ instances = yaml.load(output, Loader=yaml.SafeLoader).get("Releases")
+ new_instances = []
+ for instance in instances:
+ new_instance = dict((k.lower(), v) for k, v in instance.items())
+ new_instances.append(new_instance)
+ return new_instances
else:
- return 0
-
- async def uninstall(
- self,
- cluster_uuid: str,
- kdu_instance: str
- ):
- """
- Removes an existing KDU instance. It would implicitly use the `delete` call (this call would happen
- after all _terminate-config-primitive_ of the VNF are invoked).
-
- :param cluster_uuid: UUID of a K8s cluster known by OSM
- :param kdu_instance: unique name for the KDU instance to be deleted
- :return: True if successful
- """
-
- self.debug('uninstall kdu_instance {} from cluster {}'.format(kdu_instance, cluster_uuid))
-
- # config filename
- kube_dir, helm_dir, config_filename, cluster_dir = \
- self._get_paths(cluster_name=cluster_uuid, create_if_not_exist=True)
-
- command = '{} --kubeconfig={} --home={} delete --purge {}'\
- .format(self._helm_command, config_filename, helm_dir, kdu_instance)
-
- output, rc = await self._local_async_exec(command=command, raise_exception_on_error=True)
-
- return self._output_to_table(output)
-
- async def inspect_kdu(
- self,
- kdu_model: str
- ) -> str:
-
- self.debug('inspect kdu_model {}'.format(kdu_model))
-
- command = '{} inspect values {}'\
- .format(self._helm_command, kdu_model)
-
- output, rc = await self._local_async_exec(command=command)
-
- return output
-
- async def help_kdu(
- self,
- kdu_model: str
- ):
-
- self.debug('help kdu_model {}'.format(kdu_model))
-
- command = '{} inspect readme {}'\
- .format(self._helm_command, kdu_model)
-
- output, rc = await self._local_async_exec(command=command, raise_exception_on_error=True)
-
- return output
+ return []
- async def status_kdu(
- self,
- cluster_uuid: str,
- kdu_instance: str
+ def _get_inspect_command(
+ self, show_command: str, kdu_model: str, repo_str: str, version: str
):
-
- return await self._status_kdu(cluster_uuid=cluster_uuid, kdu_instance=kdu_instance, show_error_log=True)
-
-
- """
- ##################################################################################################
- ########################################## P R I V A T E #########################################
- ##################################################################################################
- """
+ inspect_command = "{} inspect {} {}{} {}".format(
+ self._helm_command, show_command, kdu_model, repo_str, version
+ )
+ return inspect_command
async def _status_kdu(
- self,
- cluster_uuid: str,
- kdu_instance: str,
- show_error_log: bool = False
- ):
-
- self.debug('status of kdu_instance {}'.format(kdu_instance))
-
- # config filename
- kube_dir, helm_dir, config_filename, cluster_dir = \
- self._get_paths(cluster_name=cluster_uuid, create_if_not_exist=True)
-
- command = '{} --kubeconfig={} --home={} status {} --output yaml'\
- .format(self._helm_command, config_filename, helm_dir, kdu_instance)
+ self,
+ cluster_id: str,
+ kdu_instance: str,
+ namespace: str = None,
+ yaml_format: bool = False,
+ show_error_log: bool = False,
+ ) -> Union[str, dict]:
+
+ self.log.debug(
+ "status of kdu_instance: {}, namespace: {} ".format(kdu_instance, namespace)
+ )
+ # init config, env
+ paths, env = self._init_paths_env(
+ cluster_name=cluster_id, create_if_not_exist=True
+ )
+ command = ("env KUBECONFIG={} {} status {} --output yaml").format(
+ paths["kube_config"], self._helm_command, kdu_instance
+ )
output, rc = await self._local_async_exec(
command=command,
raise_exception_on_error=True,
- show_error_log=show_error_log
+ show_error_log=show_error_log,
+ env=env,
)
+ if yaml_format:
+ return str(output)
+
if rc != 0:
return None
# remove field 'notes'
try:
- del data.get('info').get('status')['notes']
+ del data.get("info").get("status")["notes"]
except KeyError:
pass
+ # parse the manifest to a list of dictionaries
+ if "manifest" in data:
+ manifest_str = data.get("manifest")
+ manifest_docs = yaml.load_all(manifest_str, Loader=yaml.SafeLoader)
+
+ data["manifest"] = []
+ for doc in manifest_docs:
+ data["manifest"].append(doc)
+
# parse field 'resources'
try:
- resources = str(data.get('info').get('status').get('resources'))
+ resources = str(data.get("info").get("status").get("resources"))
resource_table = self._output_to_table(resources)
- data.get('info').get('status')['resources'] = resource_table
- except Exception as e:
+ data.get("info").get("status")["resources"] = resource_table
+ except Exception:
pass
- return data
-
- async def get_instance_info(
- self,
- cluster_uuid: str,
- kdu_instance: str
- ):
- instances = await self.instances_list(cluster_uuid=cluster_uuid)
- for instance in instances:
- if instance.get('Name') == kdu_instance:
- return instance
- self.debug('Instance {} not found'.format(kdu_instance))
- return None
-
- @staticmethod
- def _generate_release_name(
- chart_name: str
- ):
- name = ''
- for c in chart_name:
- if c.isalpha() or c.isnumeric():
- name += c
- else:
- name += '-'
- if len(name) > 35:
- name = name[0:35]
-
- # if does not start with alpha character, prefix 'a'
- if not name[0].isalpha():
- name = 'a' + name
-
- name += '-'
-
- def get_random_number():
- r = random.randrange(start=1, stop=99999999)
- s = str(r)
- s = s.rjust(width=10, fillchar=' ')
- return s
+ # set description to lowercase (unify with helm3)
+ try:
+ data.get("info")["description"] = data.get("info").pop("Description")
+ except KeyError:
+ pass
- name = name + get_random_number()
- return name.lower()
+ return data
- async def _store_status(
- self,
- cluster_uuid: str,
- operation: str,
- kdu_instance: str,
- check_every: float = 10,
- db_dict: dict = None,
- run_once: bool = False
- ):
- while True:
- try:
- await asyncio.sleep(check_every)
- detailed_status = await self.status_kdu(cluster_uuid=cluster_uuid, kdu_instance=kdu_instance)
- status = detailed_status.get('info').get('Description')
- print('=' * 60)
- self.debug('STATUS:\n{}'.format(status))
- self.debug('DETAILED STATUS:\n{}'.format(detailed_status))
- print('=' * 60)
- # write status to db
- result = await self.write_app_status_to_db(
- db_dict=db_dict,
- status=str(status),
- detailed_status=str(detailed_status),
- operation=operation)
- if not result:
- self.info('Error writing in database. Task exiting...')
- return
- except asyncio.CancelledError:
- self.debug('Task cancelled')
- return
- except Exception as e:
- pass
- finally:
- if run_once:
- return
+ def _get_helm_chart_repos_ids(self, cluster_uuid) -> list:
+ repo_ids = []
+ cluster_filter = {"_admin.helm-chart.id": cluster_uuid}
+ cluster = self.db.get_one("k8sclusters", cluster_filter)
+ if cluster:
+ repo_ids = cluster.get("_admin").get("helm_chart_repos") or []
+ return repo_ids
+ else:
+ raise K8sException(
+ "k8cluster with helm-id : {} not found".format(cluster_uuid)
+ )
- async def _is_install_completed(
- self,
- cluster_uuid: str,
- kdu_instance: str
- ) -> bool:
+ async def _is_install_completed(self, cluster_id: str, kdu_instance: str) -> bool:
+ # init config, env
+ paths, env = self._init_paths_env(
+ cluster_name=cluster_id, create_if_not_exist=True
+ )
- status = await self.status_kdu(cluster_uuid=cluster_uuid, kdu_instance=kdu_instance)
+ status = await self._status_kdu(
+ cluster_id=cluster_id, kdu_instance=kdu_instance, yaml_format=False
+ )
# extract info.status.resources-> str
# format:
# halting-horse-mongodb 0/1 1 0 0s
# halting-petit-mongodb 1/1 1 0 0s
# blank line
- resources = K8sHelmConnector._get_deep(status, ('info', 'status', 'resources'))
+ resources = K8sHelmBaseConnector._get_deep(
+ status, ("info", "status", "resources")
+ )
# convert to table
- resources = K8sHelmConnector._output_to_table(resources)
+ resources = K8sHelmBaseConnector._output_to_table(resources)
num_lines = len(resources)
index = 0
+ ready = True
while index < num_lines:
try:
line1 = resources[index]
index += 1
# find '==>' in column 0
- if line1[0] == '==>':
+ if line1[0] == "==>":
line2 = resources[index]
index += 1
# find READY in column 1
- if line2[1] == 'READY':
+ if line2[1] == "READY":
# read next lines
line3 = resources[index]
index += 1
while len(line3) > 1 and index < num_lines:
ready_value = line3[1]
- parts = ready_value.split(sep='/')
+ parts = ready_value.split(sep="/")
current = int(parts[0])
total = int(parts[1])
if current < total:
- self.debug('NOT READY:\n {}'.format(line3))
+ self.log.debug("NOT READY:\n {}".format(line3))
ready = False
line3 = resources[index]
index += 1
- except Exception as e:
+ except Exception:
pass
return ready
- @staticmethod
- def _get_deep(dictionary: dict, members: tuple):
- target = dictionary
- value = None
- try:
- for m in members:
- value = target.get(m)
- if not value:
- return None
- else:
- target = value
- except Exception as e:
- pass
- return value
-
- # find key:value in several lines
- @staticmethod
- def _find_in_lines(p_lines: list, p_key: str) -> str:
- for line in p_lines:
- try:
- if line.startswith(p_key + ':'):
- parts = line.split(':')
- the_value = parts[1].strip()
- return the_value
- except Exception as e:
- # ignore it
- pass
- return None
-
- # params for use in -f file
- # returns values file option and filename (in order to delete it at the end)
- def _params_to_file_option(self, cluster_uuid: str, params: dict) -> (str, str):
- params_str = ''
-
- if params and len(params) > 0:
- kube_dir, helm_dir, config_filename, cluster_dir = \
- self._get_paths(cluster_name=cluster_uuid, create_if_not_exist=True)
-
- def get_random_number():
- r = random.randrange(start=1, stop=99999999)
- s = str(r)
- while len(s) < 10:
- s = '0' + s
- return s
-
- params2 = dict()
- for key in params:
- value = params.get(key)
- if '!!yaml' in str(value):
- value = yaml.safe_load(value[7:])
- params2[key] = value
-
- values_file = get_random_number() + '.yaml'
- with open(values_file, 'w') as stream:
- yaml.dump(params2, stream, indent=4, default_flow_style=False)
-
- return '-f {}'.format(values_file), values_file
-
- return '', None
-
- # params for use in --set option
- @staticmethod
- def _params_to_set_option(params: dict) -> str:
- params_str = ''
- if params and len(params) > 0:
- start = True
- for key in params:
- value = params.get(key, None)
- if value is not None:
- if start:
- params_str += '--set '
- start = False
- else:
- params_str += ','
- params_str += '{}={}'.format(key, value)
- return params_str
-
- @staticmethod
- def _output_to_lines(output: str) -> list:
- output_lines = list()
- lines = output.splitlines(keepends=False)
- for line in lines:
- line = line.strip()
- if len(line) > 0:
- output_lines.append(line)
- return output_lines
-
- @staticmethod
- def _output_to_table(output: str) -> list:
- output_table = list()
- lines = output.splitlines(keepends=False)
- for line in lines:
- line = line.replace('\t', ' ')
- line_list = list()
- output_table.append(line_list)
- cells = line.split(sep=' ')
- for cell in cells:
- cell = cell.strip()
- if len(cell) > 0:
- line_list.append(cell)
- return output_table
-
- def _get_paths(self, cluster_name: str, create_if_not_exist: bool = False) -> (str, str, str, str):
- """
- Returns kube and helm directories
-
- :param cluster_name:
- :param create_if_not_exist:
- :return: kube, helm directories, config filename and cluster dir.
- Raises exception if not exist and cannot create
- """
-
- base = self.fs.path
- if base.endswith("/") or base.endswith("\\"):
- base = base[:-1]
-
- # base dir for cluster
- cluster_dir = base + '/' + cluster_name
- if create_if_not_exist and not os.path.exists(cluster_dir):
- self.debug('Creating dir {}'.format(cluster_dir))
- os.makedirs(cluster_dir)
- if not os.path.exists(cluster_dir):
- msg = 'Base cluster dir {} does not exist'.format(cluster_dir)
- self.error(msg)
- raise Exception(msg)
-
- # kube dir
- kube_dir = cluster_dir + '/' + '.kube'
- if create_if_not_exist and not os.path.exists(kube_dir):
- self.debug('Creating dir {}'.format(kube_dir))
- os.makedirs(kube_dir)
- if not os.path.exists(kube_dir):
- msg = 'Kube config dir {} does not exist'.format(kube_dir)
- self.error(msg)
- raise Exception(msg)
-
- # helm home dir
- helm_dir = cluster_dir + '/' + '.helm'
- if create_if_not_exist and not os.path.exists(helm_dir):
- self.debug('Creating dir {}'.format(helm_dir))
- os.makedirs(helm_dir)
- if not os.path.exists(helm_dir):
- msg = 'Helm config dir {} does not exist'.format(helm_dir)
- self.error(msg)
- raise Exception(msg)
-
- config_filename = kube_dir + '/config'
- return kube_dir, helm_dir, config_filename, cluster_dir
-
- @staticmethod
- def _remove_multiple_spaces(str):
- str = str.strip()
- while ' ' in str:
- str = str.replace(' ', ' ')
- return str
-
- def _local_exec(
- self,
- command: str
- ) -> (str, int):
- command = K8sHelmConnector._remove_multiple_spaces(command)
- self.debug('Executing sync local command: {}'.format(command))
- # raise exception if fails
- output = ''
- try:
- output = subprocess.check_output(command, shell=True, universal_newlines=True)
- return_code = 0
- self.debug(output)
- except Exception as e:
- return_code = 1
-
- return output, return_code
-
- async def _local_async_exec(
- self,
- command: str,
- raise_exception_on_error: bool = False,
- show_error_log: bool = True
- ) -> (str, int):
+ def _get_install_command(
+ self,
+ kdu_model,
+ kdu_instance,
+ namespace,
+ params_str,
+ version,
+ atomic,
+ timeout,
+ kubeconfig,
+ ) -> str:
- command = K8sHelmConnector._remove_multiple_spaces(command)
- self.debug('Executing async local command: {}'.format(command))
+ timeout_str = ""
+ if timeout:
+ timeout_str = "--timeout {}".format(timeout)
- # split command
- command = command.split(sep=' ')
+ # atomic
+ atomic_str = ""
+ if atomic:
+ atomic_str = "--atomic"
+ # namespace
+ namespace_str = ""
+ if namespace:
+ namespace_str = "--namespace {}".format(namespace)
- try:
- process = await asyncio.create_subprocess_exec(
- *command,
- stdout=asyncio.subprocess.PIPE,
- stderr=asyncio.subprocess.PIPE
+ # version
+ version_str = ""
+ if version:
+ version_str = version_str = "--version {}".format(version)
+
+ command = (
+ "env KUBECONFIG={kubeconfig} {helm} install {atomic} --output yaml "
+ "{params} {timeout} --name={name} {ns} {model} {ver}".format(
+ kubeconfig=kubeconfig,
+ helm=self._helm_command,
+ atomic=atomic_str,
+ params=params_str,
+ timeout=timeout_str,
+ name=kdu_instance,
+ ns=namespace_str,
+ model=kdu_model,
+ ver=version_str,
)
+ )
+ return command
+
+ def _get_upgrade_command(
+ self,
+ kdu_model,
+ kdu_instance,
+ namespace,
+ params_str,
+ version,
+ atomic,
+ timeout,
+ kubeconfig,
+ ) -> str:
- # wait for command terminate
- stdout, stderr = await process.communicate()
-
- return_code = process.returncode
-
- output = ''
- if stdout:
- output = stdout.decode('utf-8').strip()
- if stderr:
- output = stderr.decode('utf-8').strip()
-
- if return_code != 0 and show_error_log:
- self.debug('Return code (FAIL): {}\nOutput:\n{}'.format(return_code, output))
- else:
- self.debug('Return code: {}'.format(return_code))
-
- if raise_exception_on_error and return_code != 0:
- raise Exception(output)
-
- return output, return_code
-
- except Exception as e:
- msg = 'Exception executing command: {} -> {}'.format(command, e)
- if show_error_log:
- self.error(msg)
- return '', -1
+ timeout_str = ""
+ if timeout:
+ timeout_str = "--timeout {}".format(timeout)
- def _remote_exec(
- self,
- hostname: str,
- username: str,
- password: str,
- command: str,
- timeout: int = 10
- ) -> (str, int):
-
- command = K8sHelmConnector._remove_multiple_spaces(command)
- self.debug('Executing sync remote ssh command: {}'.format(command))
-
- ssh = paramiko.SSHClient()
- ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
- ssh.connect(hostname=hostname, username=username, password=password)
- ssh_stdin, ssh_stdout, ssh_stderr = ssh.exec_command(command=command, timeout=timeout)
- output = ssh_stdout.read().decode('utf-8')
- error = ssh_stderr.read().decode('utf-8')
- if error:
- self.error('ERROR: {}'.format(error))
- return_code = 1
- else:
- return_code = 0
- output = output.replace('\\n', '\n')
- self.debug('OUTPUT: {}'.format(output))
+ # atomic
+ atomic_str = ""
+ if atomic:
+ atomic_str = "--atomic"
- return output, return_code
+ # version
+ version_str = ""
+ if version:
+ version_str = "--version {}".format(version)
+
+ command = (
+ "env KUBECONFIG={kubeconfig} {helm} upgrade {atomic} --output yaml {params} {timeout} {name} {model} {ver}"
+ ).format(
+ kubeconfig=kubeconfig,
+ helm=self._helm_command,
+ atomic=atomic_str,
+ params=params_str,
+ timeout=timeout_str,
+ name=kdu_instance,
+ model=kdu_model,
+ ver=version_str,
+ )
+ return command
- def _check_file_exists(self, filename: str, exception_if_not_exists: bool = False):
- self.debug('Checking if file {} exists...'.format(filename))
- if os.path.exists(filename):
- return True
- else:
- msg = 'File {} does not exist'.format(filename)
- if exception_if_not_exists:
- self.error(msg)
- raise Exception(msg)
+ def _get_rollback_command(
+ self, kdu_instance, namespace, revision, kubeconfig
+ ) -> str:
+ return "env KUBECONFIG={} {} rollback {} {} --wait".format(
+ kubeconfig, self._helm_command, kdu_instance, revision
+ )
+ def _get_uninstall_command(
+ self, kdu_instance: str, namespace: str, kubeconfig: str
+ ) -> str:
+ return "env KUBECONFIG={} {} delete --purge {}".format(
+ kubeconfig, self._helm_command, kdu_instance
+ )