on_update_db=on_update_db
)
- self.info('Initializing K8S Helm connector')
+ self.log.info('Initializing K8S Helm connector')
# random numbers for release name generation
random.seed(time.time())
self._check_file_exists(filename=helm_command, exception_if_not_exists=True)
# initialize helm client-only
- self.debug('Initializing helm client-only...')
+ self.log.debug('Initializing helm client-only...')
command = '{} init --client-only'.format(self._helm_command)
try:
asyncio.ensure_future(self._local_async_exec(command=command, raise_exception_on_error=False))
except Exception as e:
self.warning(msg='helm init failed (it was already initialized): {}'.format(e))
- self.info('K8S Helm connector initialized')
+ self.log.info('K8S Helm connector initialized')
async def init_env(
self,
if not cluster_uuid:
cluster_uuid = str(uuid4())
- self.debug('Initializing K8S environment. namespace: {}'.format(namespace))
+ self.log.debug('Initializing K8S environment. namespace: {}'.format(namespace))
# create config filename
kube_dir, helm_dir, config_filename, cluster_dir = \
# helm init
n2vc_installed_sw = False
if not already_initialized:
- self.info('Initializing helm in client and server: {}'.format(cluster_uuid))
+ self.log.info('Initializing helm in client and server: {}'.format(cluster_uuid))
command = '{} --kubeconfig={} --tiller-namespace={} --home={} init'\
.format(self._helm_command, config_filename, namespace, helm_dir)
output, rc = await self._local_async_exec(command=command, raise_exception_on_error=True)
# check client helm installation
check_file = helm_dir + '/repository/repositories.yaml'
if not self._check_file_exists(filename=check_file, exception_if_not_exists=False):
- self.info('Initializing helm in client: {}'.format(cluster_uuid))
+ self.log.info('Initializing helm in client: {}'.format(cluster_uuid))
command = '{} --kubeconfig={} --tiller-namespace={} --home={} init --client-only'\
.format(self._helm_command, config_filename, namespace, helm_dir)
output, rc = await self._local_async_exec(command=command, raise_exception_on_error=True)
else:
- self.info('Helm client already initialized')
+ self.log.info('Helm client already initialized')
- self.info('Cluster initialized {}'.format(cluster_uuid))
+ self.log.info('Cluster initialized {}'.format(cluster_uuid))
return cluster_uuid, n2vc_installed_sw
repo_type: str = 'chart'
):
- self.debug('adding {} repository {}. URL: {}'.format(repo_type, name, url))
+ self.log.debug('adding {} repository {}. URL: {}'.format(repo_type, name, url))
# config filename
kube_dir, helm_dir, config_filename, cluster_dir = \
# helm repo update
command = '{} --kubeconfig={} --home={} repo update'.format(self._helm_command, config_filename, helm_dir)
- self.debug('updating repo: {}'.format(command))
+ self.log.debug('updating repo: {}'.format(command))
await self._local_async_exec(command=command, raise_exception_on_error=False)
# helm repo add name url
command = '{} --kubeconfig={} --home={} repo add {} {}'\
.format(self._helm_command, config_filename, helm_dir, name, url)
- self.debug('adding repo: {}'.format(command))
+ self.log.debug('adding repo: {}'.format(command))
await self._local_async_exec(command=command, raise_exception_on_error=True)
async def repo_list(
:return: list of registered repositories: [ (name, url) .... ]
"""
- self.debug('list repositories for cluster {}'.format(cluster_uuid))
+ self.log.debug('list repositories for cluster {}'.format(cluster_uuid))
# config filename
kube_dir, helm_dir, config_filename, cluster_dir = \
:return: True if successful
"""
- self.debug('list repositories for cluster {}'.format(cluster_uuid))
+ self.log.debug('list repositories for cluster {}'.format(cluster_uuid))
# config filename
kube_dir, helm_dir, config_filename, cluster_dir = \
uninstall_sw: bool = False
) -> bool:
- self.debug('Resetting K8s environment. cluster uuid: {}'.format(cluster_uuid))
+ self.log.debug('Resetting K8s environment. cluster uuid: {}'.format(cluster_uuid))
# get kube and helm directories
kube_dir, helm_dir, config_filename, cluster_dir = \
try:
kdu_instance = r.get('Name')
chart = r.get('Chart')
- self.debug('Uninstalling {} -> {}'.format(chart, kdu_instance))
+ self.log.debug('Uninstalling {} -> {}'.format(chart, kdu_instance))
await self.uninstall(cluster_uuid=cluster_uuid, kdu_instance=kdu_instance)
except Exception as e:
- self.error('Error uninstalling release {}: {}'.format(kdu_instance, e))
+ self.log.error('Error uninstalling release {}: {}'.format(kdu_instance, e))
else:
msg = 'Cluster has releases and not force. Cannot reset K8s environment. Cluster uuid: {}'\
.format(cluster_uuid)
- self.error(msg)
+ self.log.error(msg)
raise K8sException(msg)
if uninstall_sw:
- self.debug('Uninstalling tiller from cluster {}'.format(cluster_uuid))
+ self.log.debug('Uninstalling tiller from cluster {}'.format(cluster_uuid))
# find namespace for tiller pod
command = '{} --kubeconfig={} get deployments --all-namespaces'\
pass
else:
msg = 'Tiller deployment not found in cluster {}'.format(cluster_uuid)
- self.error(msg)
+ self.log.error(msg)
- self.debug('namespace for tiller: {}'.format(namespace))
+ self.log.debug('namespace for tiller: {}'.format(namespace))
force_str = '--force'
if namespace:
# delete tiller deployment
- self.debug('Deleting tiller deployment for cluster {}, namespace {}'.format(cluster_uuid, namespace))
+ self.log.debug('Deleting tiller deployment for cluster {}, namespace {}'.format(cluster_uuid, namespace))
command = '{} --namespace {} --kubeconfig={} {} delete deployment tiller-deploy'\
.format(self.kubectl_command, namespace, config_filename, force_str)
await self._local_async_exec(command=command, raise_exception_on_error=False)
# uninstall tiller from cluster
- self.debug('Uninstalling tiller from cluster {}'.format(cluster_uuid))
+ self.log.debug('Uninstalling tiller from cluster {}'.format(cluster_uuid))
command = '{} --kubeconfig={} --home={} reset'\
.format(self._helm_command, config_filename, helm_dir)
- self.debug('resetting: {}'.format(command))
+ self.log.debug('resetting: {}'.format(command))
output, rc = await self._local_async_exec(command=command, raise_exception_on_error=True)
else:
- self.debug('namespace not found')
+ self.log.debug('namespace not found')
# delete cluster directory
dir = self.fs.path + '/' + cluster_uuid
- self.debug('Removing directory {}'.format(dir))
+ self.log.debug('Removing directory {}'.format(dir))
shutil.rmtree(dir, ignore_errors=True)
return True
kdu_name: str = None
):
- self.debug('installing {} in cluster {}'.format(kdu_model, cluster_uuid))
+ self.log.debug('installing {} in cluster {}'.format(kdu_model, cluster_uuid))
# config filename
kube_dir, helm_dir, config_filename, cluster_dir = \
command = '{} install {} --output yaml --kubeconfig={} --home={} {} {} --name={} {} {}'\
.format(self._helm_command, atomic_str, config_filename, helm_dir,
params_str, timeout_str, kdu_instance, kdu_model, version_str)
- self.debug('installing: {}'.format(command))
+ self.log.debug('installing: {}'.format(command))
if atomic:
# exec helm in a task
exec_task = asyncio.ensure_future(
coro_or_future=self._local_async_exec(command=command, raise_exception_on_error=False)
)
+
# write status in another task
status_task = asyncio.ensure_future(
coro_or_future=self._store_status(
if rc != 0:
msg = 'Error executing command: {}\nOutput: {}'.format(command, output)
- self.error(msg)
+ self.log.error(msg)
raise K8sException(msg)
- self.debug('Returning kdu_instance {}'.format(kdu_instance))
+ self.log.debug('Returning kdu_instance {}'.format(kdu_instance))
return kdu_instance
async def instances_list(
:return:
"""
- self.debug('list releases for cluster {}'.format(cluster_uuid))
+ self.log.debug('list releases for cluster {}'.format(cluster_uuid))
# config filename
kube_dir, helm_dir, config_filename, cluster_dir = \
db_dict: dict = None
):
- self.debug('upgrading {} in cluster {}'.format(kdu_model, cluster_uuid))
+ self.log.debug('upgrading {} in cluster {}'.format(kdu_model, cluster_uuid))
# config filename
kube_dir, helm_dir, config_filename, cluster_dir = \
command = '{} upgrade {} --output yaml --kubeconfig={} --home={} {} {} {} {} {}'\
.format(self._helm_command, atomic_str, config_filename, helm_dir,
params_str, timeout_str, kdu_instance, kdu_model, version_str)
- self.debug('upgrading: {}'.format(command))
+ self.log.debug('upgrading: {}'.format(command))
if atomic:
if rc != 0:
msg = 'Error executing command: {}\nOutput: {}'.format(command, output)
- self.error(msg)
+ self.log.error(msg)
raise K8sException(msg)
# return new revision number
instance = await self.get_instance_info(cluster_uuid=cluster_uuid, kdu_instance=kdu_instance)
if instance:
revision = int(instance.get('Revision'))
- self.debug('New revision: {}'.format(revision))
+ self.log.debug('New revision: {}'.format(revision))
return revision
else:
return 0
db_dict: dict = None
):
- self.debug('rollback kdu_instance {} to revision {} from cluster {}'
+ self.log.debug('rollback kdu_instance {} to revision {} from cluster {}'
.format(kdu_instance, revision, cluster_uuid))
# config filename
if rc != 0:
msg = 'Error executing command: {}\nOutput: {}'.format(command, output)
- self.error(msg)
+ self.log.error(msg)
raise K8sException(msg)
# return new revision number
instance = await self.get_instance_info(cluster_uuid=cluster_uuid, kdu_instance=kdu_instance)
if instance:
revision = int(instance.get('Revision'))
- self.debug('New revision: {}'.format(revision))
+ self.log.debug('New revision: {}'.format(revision))
return revision
else:
return 0
:return: True if successful
"""
- self.debug('uninstall kdu_instance {} from cluster {}'.format(kdu_instance, cluster_uuid))
+ self.log.debug('uninstall kdu_instance {} from cluster {}'.format(kdu_instance, cluster_uuid))
# config filename
kube_dir, helm_dir, config_filename, cluster_dir = \
repo_url: str = None
) -> str:
- self.debug('inspect kdu_model {} from (optional) repo: {}'.format(kdu_model, repo_url))
+ self.log.debug('inspect kdu_model {} from (optional) repo: {}'.format(kdu_model, repo_url))
return await self._exec_inspect_comand(inspect_command='', kdu_model=kdu_model, repo_url=repo_url)
repo_url: str = None
) -> str:
- self.debug('inspect kdu_model values {} from (optional) repo: {}'.format(kdu_model, repo_url))
+ self.log.debug('inspect kdu_model values {} from (optional) repo: {}'.format(kdu_model, repo_url))
return await self._exec_inspect_comand(inspect_command='values', kdu_model=kdu_model, repo_url=repo_url)
repo_url: str = None
) -> str:
- self.debug('inspect kdu_model {} readme.md from repo: {}'.format(kdu_model, repo_url))
+ self.log.debug('inspect kdu_model {} readme.md from repo: {}'.format(kdu_model, repo_url))
return await self._exec_inspect_comand(inspect_command='readme', kdu_model=kdu_model, repo_url=repo_url)
return_text=True
)
+ async def synchronize_repos(self, cluster_uuid: str):
+
+ self.log.debug("syncronize repos for cluster helm-id: {}",)
+ try:
+ update_repos_timeout = 300 # max timeout to sync a single repos, more than this is too much
+ db_k8scluster = self.db.get_one("k8sclusters", {"_admin.helm-chart.id": cluster_uuid})
+ if db_k8scluster:
+ nbi_repo_list = db_k8scluster.get("_admin").get("helm_chart_repos") or []
+ cluster_repo_dict = db_k8scluster.get("_admin").get("helm_charts_added") or {}
+ # elements that must be deleted
+ deleted_repo_list = []
+ added_repo_dict = {}
+ self.log.debug("helm_chart_repos: {}".format(nbi_repo_list))
+ self.log.debug("helm_charts_added: {}".format(cluster_repo_dict))
+
+ # obtain repos to add: registered by nbi but not added
+ repos_to_add = [repo for repo in nbi_repo_list if not cluster_repo_dict.get(repo)]
+
+ # obtain repos to delete: added by cluster but not in nbi list
+ repos_to_delete = [repo for repo in cluster_repo_dict.keys() if repo not in nbi_repo_list]
+
+ # delete repos: must delete first then add because there may be different repos with same name but
+ # different id and url
+ self.log.debug("repos to delete: {}".format(repos_to_delete))
+ for repo_id in repos_to_delete:
+ # try to delete repos
+ try:
+ repo_delete_task = asyncio.ensure_future(self.repo_remove(cluster_uuid=cluster_uuid,
+ name=cluster_repo_dict[repo_id]))
+ await asyncio.wait_for(repo_delete_task, update_repos_timeout)
+ except Exception as e:
+ self.warning("Error deleting repo, id: {}, name: {}, err_msg: {}".format(repo_id,
+ cluster_repo_dict[repo_id], str(e)))
+ # always add to the list of to_delete if there is an error because if is not there deleting raises error
+ deleted_repo_list.append(repo_id)
+
+ # add repos
+ self.log.debug("repos to add: {}".format(repos_to_add))
+ add_task_list = []
+ for repo_id in repos_to_add:
+ # obtain the repo data from the db
+ # if there is an error getting the repo in the database we will ignore this repo and continue
+ # because there is a possible race condition where the repo has been deleted while processing
+ db_repo = self.db.get_one("k8srepos", {"_id": repo_id})
+ self.log.debug("obtained repo: id, {}, name: {}, url: {}".format(repo_id, db_repo["name"], db_repo["url"]))
+ try:
+ repo_add_task = asyncio.ensure_future(self.repo_add(cluster_uuid=cluster_uuid,
+ name=db_repo["name"], url=db_repo["url"],
+ repo_type="chart"))
+ await asyncio.wait_for(repo_add_task, update_repos_timeout)
+ added_repo_dict[repo_id] = db_repo["name"]
+ self.log.debug("added repo: id, {}, name: {}".format(repo_id, db_repo["name"]))
+ except Exception as e:
+ # deal with error adding repo, adding a repo that already exists does not raise any error
+ # will not raise error because a wrong repos added by anyone could prevent instantiating any ns
+ self.log.error("Error adding repo id: {}, err_msg: {} ".format(repo_id, repr(e)))
+
+ return deleted_repo_list, added_repo_dict
+
+ else: # else db_k8scluster does not exist
+ raise K8sException("k8cluster with helm-id : {} not found".format(cluster_uuid))
+
+ except Exception as e:
+ self.log.error("Error synchronizing repos: {}".format(str(e)))
+ raise K8sException("Error synchronizing repos")
+
"""
##################################################################################################
########################################## P R I V A T E #########################################
return_text: bool = False
):
- self.debug('status of kdu_instance {}'.format(kdu_instance))
+ self.log.debug('status of kdu_instance {}'.format(kdu_instance))
# config filename
kube_dir, helm_dir, config_filename, cluster_dir = \
for instance in instances:
if instance.get('Name') == kdu_instance:
return instance
- self.debug('Instance {} not found'.format(kdu_instance))
+ self.log.debug('Instance {} not found'.format(kdu_instance))
return None
@staticmethod
await asyncio.sleep(check_every)
detailed_status = await self.status_kdu(cluster_uuid=cluster_uuid, kdu_instance=kdu_instance)
status = detailed_status.get('info').get('Description')
- print('=' * 60)
- self.debug('STATUS:\n{}'.format(status))
- self.debug('DETAILED STATUS:\n{}'.format(detailed_status))
- print('=' * 60)
+ self.log.debug('STATUS:\n{}'.format(status))
+ self.log.debug('DETAILED STATUS:\n{}'.format(detailed_status))
# write status to db
result = await self.write_app_status_to_db(
db_dict=db_dict,
detailed_status=str(detailed_status),
operation=operation)
if not result:
- self.info('Error writing in database. Task exiting...')
+ self.log.info('Error writing in database. Task exiting...')
return
except asyncio.CancelledError:
- self.debug('Task cancelled')
+ self.log.debug('Task cancelled')
return
except Exception as e:
+ self.log.debug('_store_status exception: {}'.format(str(e)))
pass
finally:
if run_once:
current = int(parts[0])
total = int(parts[1])
if current < total:
- self.debug('NOT READY:\n {}'.format(line3))
+ self.log.debug('NOT READY:\n {}'.format(line3))
ready = False
line3 = resources[index]
index += 1
# base dir for cluster
cluster_dir = base + '/' + cluster_name
if create_if_not_exist and not os.path.exists(cluster_dir):
- self.debug('Creating dir {}'.format(cluster_dir))
+ self.log.debug('Creating dir {}'.format(cluster_dir))
os.makedirs(cluster_dir)
if not os.path.exists(cluster_dir):
msg = 'Base cluster dir {} does not exist'.format(cluster_dir)
- self.error(msg)
+ self.log.error(msg)
raise K8sException(msg)
# kube dir
kube_dir = cluster_dir + '/' + '.kube'
if create_if_not_exist and not os.path.exists(kube_dir):
- self.debug('Creating dir {}'.format(kube_dir))
+ self.log.debug('Creating dir {}'.format(kube_dir))
os.makedirs(kube_dir)
if not os.path.exists(kube_dir):
msg = 'Kube config dir {} does not exist'.format(kube_dir)
- self.error(msg)
+ self.log.error(msg)
raise K8sException(msg)
# helm home dir
helm_dir = cluster_dir + '/' + '.helm'
if create_if_not_exist and not os.path.exists(helm_dir):
- self.debug('Creating dir {}'.format(helm_dir))
+ self.log.debug('Creating dir {}'.format(helm_dir))
os.makedirs(helm_dir)
if not os.path.exists(helm_dir):
msg = 'Helm config dir {} does not exist'.format(helm_dir)
- self.error(msg)
+ self.log.error(msg)
raise K8sException(msg)
config_filename = kube_dir + '/config'
command: str
) -> (str, int):
command = K8sHelmConnector._remove_multiple_spaces(command)
- self.debug('Executing sync local command: {}'.format(command))
+ self.log.debug('Executing sync local command: {}'.format(command))
# raise exception if fails
output = ''
try:
output = subprocess.check_output(command, shell=True, universal_newlines=True)
return_code = 0
- self.debug(output)
+ self.log.debug(output)
except Exception as e:
return_code = 1
) -> (str, int):
command = K8sHelmConnector._remove_multiple_spaces(command)
- self.debug('Executing async local command: {}'.format(command))
+ self.log.debug('Executing async local command: {}'.format(command))
# split command
command = command.split(sep=' ')
# output = stderr.decode()
if return_code != 0 and show_error_log:
- self.debug('Return code (FAIL): {}\nOutput:\n{}'.format(return_code, output))
+ self.log.debug('Return code (FAIL): {}\nOutput:\n{}'.format(return_code, output))
else:
- self.debug('Return code: {}'.format(return_code))
+ self.log.debug('Return code: {}'.format(return_code))
if raise_exception_on_error and return_code != 0:
raise K8sException(output)
return output, return_code
+ except asyncio.CancelledError:
+ raise
except K8sException:
raise
except Exception as e:
msg = 'Exception executing command: {} -> {}'.format(command, e)
- self.error(msg)
+ self.log.error(msg)
if raise_exception_on_error:
raise K8sException(e) from e
else:
return '', -1
def _check_file_exists(self, filename: str, exception_if_not_exists: bool = False):
- self.debug('Checking if file {} exists...'.format(filename))
+ self.log.debug('Checking if file {} exists...'.format(filename))
if os.path.exists(filename):
return True
else:
msg = 'File {} does not exist'.format(filename)
if exception_if_not_exists:
- self.error(msg)
+ self.log.error(msg)
raise K8sException(msg)
+
+