# contact with: nfvlabs@tid.es
##
-import paramiko
import subprocess
import os
import shutil
from uuid import uuid4
import random
from n2vc.k8s_conn import K8sConnector
+from n2vc.exceptions import K8sException
class K8sHelmConnector(K8sConnector):
namespace: str = 'kube-system',
reuse_cluster_uuid=None
) -> (str, bool):
+ """
+ It prepares a given K8s cluster environment to run Charts on both sides:
+ client (OSM)
+ server (Tiller)
+
+ :param k8s_creds: credentials to access a given K8s cluster, i.e. a valid '.kube/config'
+ :param namespace: optional namespace to be used for helm. By default, 'kube-system' will be used
+ :param reuse_cluster_uuid: existing cluster uuid for reuse
+ :return: uuid of the K8s cluster and True if connector has installed some software in the cluster
+ (on error, an exception will be raised)
+ """
cluster_uuid = reuse_cluster_uuid
if not cluster_uuid:
msg = 'Cluster has releases and not force. Cannot reset K8s environment. Cluster uuid: {}'\
.format(cluster_uuid)
self.error(msg)
- raise Exception(msg)
+ raise K8sException(msg)
if uninstall_sw:
else:
msg = 'Tiller deployment not found in cluster {}'.format(cluster_uuid)
self.error(msg)
- # raise Exception(msg)
self.debug('namespace for tiller: {}'.format(namespace))
atomic: bool = True,
timeout: float = 300,
params: dict = None,
- db_dict: dict = None
+ db_dict: dict = None,
+ kdu_name: str = None
):
self.debug('installing {} in cluster {}'.format(kdu_model, cluster_uuid))
version_str = '--version {}'.format(parts[1])
kdu_model = parts[0]
- # generate a name for the releas. Then, check if already exists
+ # generate a name for the release. Then, check if already exists
kdu_instance = None
while kdu_instance is None:
kdu_instance = K8sHelmConnector._generate_release_name(kdu_model)
if result is not None:
# instance already exists: generate a new one
kdu_instance = None
- except Exception as e:
- kdu_instance = None
+ except K8sException:
+ pass
# helm repo install
command = '{} install {} --output yaml --kubeconfig={} --home={} {} {} --name={} {} {}'\
exec_task = asyncio.ensure_future(
coro_or_future=self._local_async_exec(command=command, raise_exception_on_error=False)
)
+
# write status in another task
status_task = asyncio.ensure_future(
coro_or_future=self._store_status(
if rc != 0:
msg = 'Error executing command: {}\nOutput: {}'.format(command, output)
self.error(msg)
- raise Exception(msg)
+ raise K8sException(msg)
self.debug('Returning kdu_instance {}'.format(kdu_instance))
return kdu_instance
if rc != 0:
msg = 'Error executing command: {}\nOutput: {}'.format(command, output)
self.error(msg)
- raise Exception(msg)
+ raise K8sException(msg)
# return new revision number
instance = await self.get_instance_info(cluster_uuid=cluster_uuid, kdu_instance=kdu_instance)
if rc != 0:
msg = 'Error executing command: {}\nOutput: {}'.format(command, output)
self.error(msg)
- raise Exception(msg)
+ raise K8sException(msg)
# return new revision number
instance = await self.get_instance_info(cluster_uuid=cluster_uuid, kdu_instance=kdu_instance)
return_text=True
)
async def synchronize_repos(self, cluster_uuid: str):
    """
    Synchronize the helm repos registered in the database with the repos actually
    added to the given cluster: first deletes the repos added to the cluster that
    are no longer registered, then adds the registered repos still missing.

    :param cluster_uuid: helm-chart id of the cluster ('_admin.helm-chart.id')
    :return: tuple (deleted_repo_list, added_repo_dict) where deleted_repo_list
        contains the ids of the repos removed from the cluster and added_repo_dict
        maps repo id -> repo name for the repos added
        (on error, a K8sException is raised)
    """

    self.debug("synchronize repos for cluster helm-id: {}".format(cluster_uuid))
    try:
        update_repos_timeout = 300  # max timeout to sync a single repo, more than this is too much
        db_k8scluster = self.db.get_one("k8sclusters", {"_admin.helm-chart.id": cluster_uuid})
        if not db_k8scluster:
            raise K8sException("k8scluster with helm-id : {} not found".format(cluster_uuid))

        nbi_repo_list = db_k8scluster.get("_admin").get("helm_chart_repos") or []
        cluster_repo_dict = db_k8scluster.get("_admin").get("helm_charts_added") or {}
        # elements that must be deleted
        deleted_repo_list = []
        added_repo_dict = {}
        self.debug("helm_chart_repos: {}".format(nbi_repo_list))
        self.debug("helm_charts_added: {}".format(cluster_repo_dict))

        # obtain repos to add: registered by nbi but not added
        repos_to_add = [repo for repo in nbi_repo_list if not cluster_repo_dict.get(repo)]

        # obtain repos to delete: added to the cluster but not in nbi list
        repos_to_delete = [repo for repo in cluster_repo_dict.keys() if repo not in nbi_repo_list]

        # delete repos: must delete first then add, because there may be different repos
        # with the same name but different id and url
        self.debug("repos to delete: {}".format(repos_to_delete))
        for repo_id in repos_to_delete:
            # try to delete repos
            try:
                repo_delete_task = asyncio.ensure_future(
                    self.repo_remove(cluster_uuid=cluster_uuid, name=cluster_repo_dict[repo_id]))
                await asyncio.wait_for(repo_delete_task, update_repos_timeout)
            except Exception as e:
                self.warning("Error deleting repo, id: {}, name: {}, err_msg: {}".format(
                    repo_id, cluster_repo_dict[repo_id], str(e)))
            # always add to the to_delete list even on error, because if it is not there
            # a later delete attempt raises an error
            deleted_repo_list.append(repo_id)

        # add repos
        self.debug("repos to add: {}".format(repos_to_add))
        for repo_id in repos_to_add:
            # obtain the repo data from the db
            # if there is an error getting the repo from the database, ignore this repo and
            # continue: there is a possible race condition where the repo has been deleted
            # while processing
            try:
                db_repo = self.db.get_one("k8srepos", {"_id": repo_id})
            except Exception as e:
                self.warning("Error getting repo from db, id: {}, err_msg: {}".format(
                    repo_id, str(e)))
                continue
            self.debug("obtained repo: id, {}, name: {}, url: {}".format(
                repo_id, db_repo["name"], db_repo["url"]))
            try:
                repo_add_task = asyncio.ensure_future(self.repo_add(
                    cluster_uuid=cluster_uuid, name=db_repo["name"], url=db_repo["url"],
                    repo_type="chart"))
                await asyncio.wait_for(repo_add_task, update_repos_timeout)
                added_repo_dict[repo_id] = db_repo["name"]
                self.debug("added repo: id, {}, name: {}".format(repo_id, db_repo["name"]))
            except Exception as e:
                # deal with error adding repo; adding a repo that already exists does not
                # raise any error. Do not raise here, because a wrong repo added by anyone
                # could prevent instantiating any ns
                self.error("Error adding repo id: {}, err_msg: {} ".format(repo_id, repr(e)))

        return deleted_repo_list, added_repo_dict

    except K8sException:
        # propagate our own exceptions unchanged, keeping the specific message
        raise
    except Exception as e:
        self.error("Error synchronizing repos: {}".format(str(e)))
        raise K8sException("Error synchronizing repos") from e
+
"""
##################################################################################################
########################################## P R I V A T E #########################################
def _generate_release_name(
chart_name: str
):
+ # check embedded chart (file or dir)
+ if chart_name.startswith('/'):
+ # extract file or directory name
+ chart_name = chart_name[chart_name.rfind('/')+1:]
+ # check URL
+ elif '://' in chart_name:
+ # extract last portion of URL
+ chart_name = chart_name[chart_name.rfind('/')+1:]
+
name = ''
for c in chart_name:
if c.isalpha() or c.isnumeric():
def get_random_number():
    # Produce a zero-padded, 10-character numeric suffix drawn from [1, 99999998].
    value = random.randrange(1, 99999999)
    return str(value).zfill(10)
name = name + get_random_number()
await asyncio.sleep(check_every)
detailed_status = await self.status_kdu(cluster_uuid=cluster_uuid, kdu_instance=kdu_instance)
status = detailed_status.get('info').get('Description')
- print('=' * 60)
self.debug('STATUS:\n{}'.format(status))
self.debug('DETAILED STATUS:\n{}'.format(detailed_status))
- print('=' * 60)
# write status to db
result = await self.write_app_status_to_db(
db_dict=db_dict,
self.debug('Task cancelled')
return
except Exception as e:
+ self.debug('_store_status exception: {}'.format(str(e)))
pass
finally:
if run_once:
if not os.path.exists(cluster_dir):
msg = 'Base cluster dir {} does not exist'.format(cluster_dir)
self.error(msg)
- raise Exception(msg)
+ raise K8sException(msg)
# kube dir
kube_dir = cluster_dir + '/' + '.kube'
if not os.path.exists(kube_dir):
msg = 'Kube config dir {} does not exist'.format(kube_dir)
self.error(msg)
- raise Exception(msg)
+ raise K8sException(msg)
# helm home dir
helm_dir = cluster_dir + '/' + '.helm'
if not os.path.exists(helm_dir):
msg = 'Helm config dir {} does not exist'.format(helm_dir)
self.error(msg)
- raise Exception(msg)
+ raise K8sException(msg)
config_filename = kube_dir + '/config'
return kube_dir, helm_dir, config_filename, cluster_dir
self.debug('Return code: {}'.format(return_code))
if raise_exception_on_error and return_code != 0:
- raise Exception(output)
+ raise K8sException(output)
if encode_utf8:
output = output.encode('utf-8').strip()
return output, return_code
+ except asyncio.CancelledError:
+ raise
+ except K8sException:
+ raise
except Exception as e:
msg = 'Exception executing command: {} -> {}'.format(command, e)
- if show_error_log:
- self.error(msg)
- return '', -1
-
- def _remote_exec(
- self,
- hostname: str,
- username: str,
- password: str,
- command: str,
- timeout: int = 10
- ) -> (str, int):
-
- command = K8sHelmConnector._remove_multiple_spaces(command)
- self.debug('Executing sync remote ssh command: {}'.format(command))
-
- ssh = paramiko.SSHClient()
- ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
- ssh.connect(hostname=hostname, username=username, password=password)
- ssh_stdin, ssh_stdout, ssh_stderr = ssh.exec_command(command=command, timeout=timeout)
- output = ssh_stdout.read().decode('utf-8')
- error = ssh_stderr.read().decode('utf-8')
- if error:
- self.error('ERROR: {}'.format(error))
- return_code = 1
- else:
- return_code = 0
- output = output.replace('\\n', '\n')
- self.debug('OUTPUT: {}'.format(output))
-
- return output, return_code
+ self.error(msg)
+ if raise_exception_on_error:
+ raise K8sException(e) from e
+ else:
+ return '', -1
def _check_file_exists(self, filename: str, exception_if_not_exists: bool = False):
self.debug('Checking if file {} exists...'.format(filename))
msg = 'File {} does not exist'.format(filename)
if exception_if_not_exists:
self.error(msg)
- raise Exception(msg)
+ raise K8sException(msg)
+
+