X-Git-Url: https://osm.etsi.org/gitweb/?p=osm%2FN2VC.git;a=blobdiff_plain;f=n2vc%2Fk8s_helm_conn.py;h=ff1f33112c1629fa2294853b9c3be92c01b52de5;hp=496e320d811a10ed6ecc3c00340072f3a33ba1e5;hb=a599018e3a1406c653bacf5ee636d5601d21dade;hpb=f00dcaeedafa3a57735abfd9c16515fe5986f41f

diff --git a/n2vc/k8s_helm_conn.py b/n2vc/k8s_helm_conn.py
index 496e320..ff1f331 100644
--- a/n2vc/k8s_helm_conn.py
+++ b/n2vc/k8s_helm_conn.py
@@ -67,7 +67,7 @@ class K8sHelmConnector(K8sConnector):
             on_update_db=on_update_db
         )
 
-        self.info('Initializing K8S Helm connector')
+        self.log.info('Initializing K8S Helm connector')
 
         # random numbers for release name generation
         random.seed(time.time())
@@ -84,7 +84,7 @@ class K8sHelmConnector(K8sConnector):
         self._check_file_exists(filename=helm_command, exception_if_not_exists=True)
 
         # initialize helm client-only
-        self.debug('Initializing helm client-only...')
+        self.log.debug('Initializing helm client-only...')
         command = '{} init --client-only'.format(self._helm_command)
         try:
             asyncio.ensure_future(self._local_async_exec(command=command, raise_exception_on_error=False))
@@ -93,7 +93,7 @@ class K8sHelmConnector(K8sConnector):
         except Exception as e:
             self.warning(msg='helm init failed (it was already initialized): {}'.format(e))
 
-        self.info('K8S Helm connector initialized')
+        self.log.info('K8S Helm connector initialized')
 
     async def init_env(
             self,
@@ -117,7 +117,7 @@ class K8sHelmConnector(K8sConnector):
         if not cluster_uuid:
             cluster_uuid = str(uuid4())
 
-        self.debug('Initializing K8S environment. namespace: {}'.format(namespace))
+        self.log.debug('Initializing K8S environment. namespace: {}'.format(namespace))
 
         # create config filename
         kube_dir, helm_dir, config_filename, cluster_dir = \
@@ -146,7 +146,7 @@ class K8sHelmConnector(K8sConnector):
         # helm init
         n2vc_installed_sw = False
         if not already_initialized:
-            self.info('Initializing helm in client and server: {}'.format(cluster_uuid))
+            self.log.info('Initializing helm in client and server: {}'.format(cluster_uuid))
             command = '{} --kubeconfig={} --tiller-namespace={} --home={} init'\
                 .format(self._helm_command, config_filename, namespace, helm_dir)
             output, rc = await self._local_async_exec(command=command, raise_exception_on_error=True)
@@ -155,14 +155,14 @@ class K8sHelmConnector(K8sConnector):
             # check client helm installation
             check_file = helm_dir + '/repository/repositories.yaml'
             if not self._check_file_exists(filename=check_file, exception_if_not_exists=False):
-                self.info('Initializing helm in client: {}'.format(cluster_uuid))
+                self.log.info('Initializing helm in client: {}'.format(cluster_uuid))
                 command = '{} --kubeconfig={} --tiller-namespace={} --home={} init --client-only'\
                     .format(self._helm_command, config_filename, namespace, helm_dir)
                 output, rc = await self._local_async_exec(command=command, raise_exception_on_error=True)
             else:
-                self.info('Helm client already initialized')
+                self.log.info('Helm client already initialized')
 
-        self.info('Cluster initialized {}'.format(cluster_uuid))
+        self.log.info('Cluster initialized {}'.format(cluster_uuid))
 
         return cluster_uuid, n2vc_installed_sw
 
@@ -174,7 +174,7 @@ class K8sHelmConnector(K8sConnector):
             repo_type: str = 'chart'
     ):
 
-        self.debug('adding {} repository {}. URL: {}'.format(repo_type, name, url))
+        self.log.debug('adding {} repository {}. URL: {}'.format(repo_type, name, url))
 
         # config filename
         kube_dir, helm_dir, config_filename, cluster_dir = \
@@ -182,13 +182,13 @@ class K8sHelmConnector(K8sConnector):
 
         # helm repo update
         command = '{} --kubeconfig={} --home={} repo update'.format(self._helm_command, config_filename, helm_dir)
-        self.debug('updating repo: {}'.format(command))
+        self.log.debug('updating repo: {}'.format(command))
         await self._local_async_exec(command=command, raise_exception_on_error=False)
 
         # helm repo add name url
         command = '{} --kubeconfig={} --home={} repo add {} {}'\
             .format(self._helm_command, config_filename, helm_dir, name, url)
-        self.debug('adding repo: {}'.format(command))
+        self.log.debug('adding repo: {}'.format(command))
         await self._local_async_exec(command=command, raise_exception_on_error=True)
 
     async def repo_list(
@@ -201,7 +201,7 @@ class K8sHelmConnector(K8sConnector):
         :return: list of registered repositories: [ (name, url) .... ]
         """
 
-        self.debug('list repositories for cluster {}'.format(cluster_uuid))
+        self.log.debug('list repositories for cluster {}'.format(cluster_uuid))
 
         # config filename
         kube_dir, helm_dir, config_filename, cluster_dir = \
@@ -229,7 +229,7 @@ class K8sHelmConnector(K8sConnector):
         :return: True if successful
         """
 
-        self.debug('list repositories for cluster {}'.format(cluster_uuid))
+        self.log.debug('list repositories for cluster {}'.format(cluster_uuid))
 
         # config filename
         kube_dir, helm_dir, config_filename, cluster_dir = \
@@ -247,7 +247,7 @@ class K8sHelmConnector(K8sConnector):
             uninstall_sw: bool = False
     ) -> bool:
 
-        self.debug('Resetting K8s environment. cluster uuid: {}'.format(cluster_uuid))
+        self.log.debug('Resetting K8s environment. cluster uuid: {}'.format(cluster_uuid))
 
         # get kube and helm directories
         kube_dir, helm_dir, config_filename, cluster_dir = \
@@ -261,19 +261,19 @@ class K8sHelmConnector(K8sConnector):
                     try:
                         kdu_instance = r.get('Name')
                         chart = r.get('Chart')
-                        self.debug('Uninstalling {} -> {}'.format(chart, kdu_instance))
+                        self.log.debug('Uninstalling {} -> {}'.format(chart, kdu_instance))
                         await self.uninstall(cluster_uuid=cluster_uuid, kdu_instance=kdu_instance)
                     except Exception as e:
-                        self.error('Error uninstalling release {}: {}'.format(kdu_instance, e))
+                        self.log.error('Error uninstalling release {}: {}'.format(kdu_instance, e))
             else:
                 msg = 'Cluster has releases and not force. Cannot reset K8s environment. Cluster uuid: {}'\
                     .format(cluster_uuid)
-                self.error(msg)
+                self.log.error(msg)
                 raise K8sException(msg)
 
         if uninstall_sw:
 
-            self.debug('Uninstalling tiller from cluster {}'.format(cluster_uuid))
+            self.log.debug('Uninstalling tiller from cluster {}'.format(cluster_uuid))
 
             # find namespace for tiller pod
             command = '{} --kubeconfig={} get deployments --all-namespaces'\
@@ -290,31 +290,31 @@ class K8sHelmConnector(K8sConnector):
                     pass
             else:
                 msg = 'Tiller deployment not found in cluster {}'.format(cluster_uuid)
-                self.error(msg)
+                self.log.error(msg)
 
-            self.debug('namespace for tiller: {}'.format(namespace))
+            self.log.debug('namespace for tiller: {}'.format(namespace))
 
             force_str = '--force'
 
             if namespace:
                 # delete tiller deployment
-                self.debug('Deleting tiller deployment for cluster {}, namespace {}'.format(cluster_uuid, namespace))
+                self.log.debug('Deleting tiller deployment for cluster {}, namespace {}'.format(cluster_uuid, namespace))
                 command = '{} --namespace {} --kubeconfig={} {} delete deployment tiller-deploy'\
                     .format(self.kubectl_command, namespace, config_filename, force_str)
                 await self._local_async_exec(command=command, raise_exception_on_error=False)
 
                 # uninstall tiller from cluster
-                self.debug('Uninstalling tiller from cluster {}'.format(cluster_uuid))
+                self.log.debug('Uninstalling tiller from cluster {}'.format(cluster_uuid))
                 command = '{} --kubeconfig={} --home={} reset'\
                     .format(self._helm_command, config_filename, helm_dir)
-                self.debug('resetting: {}'.format(command))
+                self.log.debug('resetting: {}'.format(command))
                 output, rc = await self._local_async_exec(command=command, raise_exception_on_error=True)
             else:
-                self.debug('namespace not found')
+                self.log.debug('namespace not found')
 
         # delete cluster directory
         dir = self.fs.path + '/' + cluster_uuid
-        self.debug('Removing directory {}'.format(dir))
+        self.log.debug('Removing directory {}'.format(dir))
         shutil.rmtree(dir, ignore_errors=True)
 
         return True
@@ -330,7 +330,7 @@ class K8sHelmConnector(K8sConnector):
             kdu_name: str = None
     ):
 
-        self.debug('installing {} in cluster {}'.format(kdu_model, cluster_uuid))
+        self.log.debug('installing {} in cluster {}'.format(kdu_model, cluster_uuid))
 
         # config filename
         kube_dir, helm_dir, config_filename, cluster_dir = \
@@ -377,7 +377,7 @@ class K8sHelmConnector(K8sConnector):
         command = '{} install {} --output yaml --kubeconfig={} --home={} {} {} --name={} {} {}'\
             .format(self._helm_command, atomic_str, config_filename, helm_dir, params_str, timeout_str,
                     kdu_instance, kdu_model, version_str)
-        self.debug('installing: {}'.format(command))
+        self.log.debug('installing: {}'.format(command))
 
         if atomic:
             # exec helm in a task
@@ -424,10 +424,10 @@ class K8sHelmConnector(K8sConnector):
 
         if rc != 0:
             msg = 'Error executing command: {}\nOutput: {}'.format(command, output)
-            self.error(msg)
+            self.log.error(msg)
             raise K8sException(msg)
 
-        self.debug('Returning kdu_instance {}'.format(kdu_instance))
+        self.log.debug('Returning kdu_instance {}'.format(kdu_instance))
         return kdu_instance
 
     async def instances_list(
@@ -441,7 +441,7 @@ class K8sHelmConnector(K8sConnector):
         :return:
         """
 
-        self.debug('list releases for cluster {}'.format(cluster_uuid))
+        self.log.debug('list releases for cluster {}'.format(cluster_uuid))
 
         # config filename
         kube_dir, helm_dir, config_filename, cluster_dir = \
@@ -468,7 +468,7 @@ class K8sHelmConnector(K8sConnector):
             db_dict: dict = None
     ):
 
-        self.debug('upgrading {} in cluster {}'.format(kdu_model, cluster_uuid))
+        self.log.debug('upgrading {} in cluster {}'.format(kdu_model, cluster_uuid))
 
         # config filename
         kube_dir, helm_dir, config_filename, cluster_dir = \
@@ -499,7 +499,7 @@ class K8sHelmConnector(K8sConnector):
         command = '{} upgrade {} --output yaml --kubeconfig={} --home={} {} {} {} {} {}'\
             .format(self._helm_command, atomic_str, config_filename, helm_dir, params_str, timeout_str,
                     kdu_instance, kdu_model, version_str)
-        self.debug('upgrading: {}'.format(command))
+        self.log.debug('upgrading: {}'.format(command))
 
         if atomic:
 
@@ -545,14 +545,14 @@ class K8sHelmConnector(K8sConnector):
 
         if rc != 0:
             msg = 'Error executing command: {}\nOutput: {}'.format(command, output)
-            self.error(msg)
+            self.log.error(msg)
             raise K8sException(msg)
 
         # return new revision number
         instance = await self.get_instance_info(cluster_uuid=cluster_uuid, kdu_instance=kdu_instance)
         if instance:
             revision = int(instance.get('Revision'))
-            self.debug('New revision: {}'.format(revision))
+            self.log.debug('New revision: {}'.format(revision))
             return revision
         else:
             return 0
@@ -565,7 +565,7 @@ class K8sHelmConnector(K8sConnector):
             db_dict: dict = None
    ):
 
-        self.debug('rollback kdu_instance {} to revision {} from cluster {}'
+        self.log.debug('rollback kdu_instance {} to revision {} from cluster {}'
                    .format(kdu_instance, revision, cluster_uuid))
         # config filename
         kube_dir, helm_dir, config_filename, cluster_dir = \
@@ -610,14 +610,14 @@ class K8sHelmConnector(K8sConnector):
 
         if rc != 0:
             msg = 'Error executing command: {}\nOutput: {}'.format(command, output)
-            self.error(msg)
+            self.log.error(msg)
             raise K8sException(msg)
 
         # return new revision number
         instance = await self.get_instance_info(cluster_uuid=cluster_uuid, kdu_instance=kdu_instance)
         if instance:
             revision = int(instance.get('Revision'))
-            self.debug('New revision: {}'.format(revision))
+            self.log.debug('New revision: {}'.format(revision))
             return revision
         else:
             return 0
@@ -636,7 +636,7 @@ class K8sHelmConnector(K8sConnector):
         :return: True if successful
         """
 
-        self.debug('uninstall kdu_instance {} from cluster {}'.format(kdu_instance, cluster_uuid))
+        self.log.debug('uninstall kdu_instance {} from cluster {}'.format(kdu_instance, cluster_uuid))
 
         # config filename
         kube_dir, helm_dir, config_filename, cluster_dir = \
@@ -655,7 +655,7 @@ class K8sHelmConnector(K8sConnector):
             repo_url: str = None
     ) -> str:
 
-        self.debug('inspect kdu_model {} from (optional) repo: {}'.format(kdu_model, repo_url))
+        self.log.debug('inspect kdu_model {} from (optional) repo: {}'.format(kdu_model, repo_url))
 
         return await self._exec_inspect_comand(inspect_command='', kdu_model=kdu_model, repo_url=repo_url)
 
@@ -665,7 +665,7 @@ class K8sHelmConnector(K8sConnector):
             repo_url: str = None
     ) -> str:
 
-        self.debug('inspect kdu_model values {} from (optional) repo: {}'.format(kdu_model, repo_url))
+        self.log.debug('inspect kdu_model values {} from (optional) repo: {}'.format(kdu_model, repo_url))
 
         return await self._exec_inspect_comand(inspect_command='values', kdu_model=kdu_model, repo_url=repo_url)
 
@@ -675,7 +675,7 @@ class K8sHelmConnector(K8sConnector):
             repo_url: str = None
     ) -> str:
 
-        self.debug('inspect kdu_model {} readme.md from repo: {}'.format(kdu_model, repo_url))
+        self.log.debug('inspect kdu_model {} readme.md from repo: {}'.format(kdu_model, repo_url))
 
         return await self._exec_inspect_comand(inspect_command='readme', kdu_model=kdu_model, repo_url=repo_url)
 
@@ -695,7 +695,7 @@ class K8sHelmConnector(K8sConnector):
 
     async def synchronize_repos(self, cluster_uuid: str):
 
-        self.debug("syncronize repos for cluster helm-id: {}",)
+        self.log.debug("syncronize repos for cluster helm-id: {}",)
         try:
             update_repos_timeout = 300  # max timeout to sync a single repos, more than this is too much
             db_k8scluster = self.db.get_one("k8sclusters", {"_admin.helm-chart.id": cluster_uuid})
@@ -705,8 +705,8 @@ class K8sHelmConnector(K8sConnector):
             # elements that must be deleted
             deleted_repo_list = []
             added_repo_dict = {}
-            self.debug("helm_chart_repos: {}".format(nbi_repo_list))
-            self.debug("helm_charts_added: {}".format(cluster_repo_dict))
+            self.log.debug("helm_chart_repos: {}".format(nbi_repo_list))
+            self.log.debug("helm_charts_added: {}".format(cluster_repo_dict))
 
             # obtain repos to add: registered by nbi but not added
             repos_to_add = [repo for repo in nbi_repo_list if not cluster_repo_dict.get(repo)]
@@ -716,7 +716,7 @@ class K8sHelmConnector(K8sConnector):
 
             # delete repos: must delete first then add because there may be different repos with same name but
            # different id and url
-            self.debug("repos to delete: {}".format(repos_to_delete))
+            self.log.debug("repos to delete: {}".format(repos_to_delete))
             for repo_id in repos_to_delete:
                 # try to delete repos
                 try:
@@ -730,25 +730,25 @@ class K8sHelmConnector(K8sConnector):
                 deleted_repo_list.append(repo_id)
 
             # add repos
-            self.debug("repos to add: {}".format(repos_to_add))
+            self.log.debug("repos to add: {}".format(repos_to_add))
             add_task_list = []
             for repo_id in repos_to_add:
                 # obtain the repo data from the db
                 # if there is an error getting the repo in the database we will ignore this repo and continue
                 # because there is a possible race condition where the repo has been deleted while processing
                 db_repo = self.db.get_one("k8srepos", {"_id": repo_id})
-                self.debug("obtained repo: id, {}, name: {}, url: {}".format(repo_id, db_repo["name"], db_repo["url"]))
+                self.log.debug("obtained repo: id, {}, name: {}, url: {}".format(repo_id, db_repo["name"], db_repo["url"]))
                 try:
                     repo_add_task = asyncio.ensure_future(self.repo_add(cluster_uuid=cluster_uuid,
                                                                         name=db_repo["name"], url=db_repo["url"],
                                                                         repo_type="chart"))
                     await asyncio.wait_for(repo_add_task, update_repos_timeout)
                     added_repo_dict[repo_id] = db_repo["name"]
-                    self.debug("added repo: id, {}, name: {}".format(repo_id, db_repo["name"]))
+                    self.log.debug("added repo: id, {}, name: {}".format(repo_id, db_repo["name"]))
                 except Exception as e:
                     # deal with error adding repo, adding a repo that already exists does not raise any error
                     # will not raise error because a wrong repos added by anyone could prevent instantiating any ns
-                    self.error("Error adding repo id: {}, err_msg: {} ".format(repo_id, repr(e)))
+                    self.log.error("Error adding repo id: {}, err_msg: {} ".format(repo_id, repr(e)))
 
             return deleted_repo_list, added_repo_dict
 
@@ -756,7 +756,7 @@ class K8sHelmConnector(K8sConnector):
                 raise K8sException("k8cluster with helm-id : {} not found".format(cluster_uuid))
 
         except Exception as e:
-            self.error("Error synchronizing repos: {}".format(str(e)))
+            self.log.error("Error synchronizing repos: {}".format(str(e)))
             raise K8sException("Error synchronizing repos")
 
     """
@@ -793,7 +793,7 @@ class K8sHelmConnector(K8sConnector):
             return_text: bool = False
    ):
 
-        self.debug('status of kdu_instance {}'.format(kdu_instance))
+        self.log.debug('status of kdu_instance {}'.format(kdu_instance))
 
         # config filename
         kube_dir, helm_dir, config_filename, cluster_dir = \
@@ -841,7 +841,7 @@ class K8sHelmConnector(K8sConnector):
         for instance in instances:
             if instance.get('Name') == kdu_instance:
                 return instance
-        self.debug('Instance {} not found'.format(kdu_instance))
+        self.log.debug('Instance {} not found'.format(kdu_instance))
         return None
 
     @staticmethod
@@ -895,8 +895,8 @@ class K8sHelmConnector(K8sConnector):
                 await asyncio.sleep(check_every)
                 detailed_status = await self.status_kdu(cluster_uuid=cluster_uuid, kdu_instance=kdu_instance)
                 status = detailed_status.get('info').get('Description')
-                self.debug('STATUS:\n{}'.format(status))
-                self.debug('DETAILED STATUS:\n{}'.format(detailed_status))
+                self.log.debug('STATUS:\n{}'.format(status))
+                self.log.debug('DETAILED STATUS:\n{}'.format(detailed_status))
                 # write status to db
                 result = await self.write_app_status_to_db(
                     db_dict=db_dict,
@@ -904,13 +904,13 @@ class K8sHelmConnector(K8sConnector):
                     detailed_status=str(detailed_status),
                     operation=operation)
                 if not result:
-                    self.info('Error writing in database. Task exiting...')
+                    self.log.info('Error writing in database. Task exiting...')
                     return
             except asyncio.CancelledError:
-                self.debug('Task cancelled')
+                self.log.debug('Task cancelled')
                 return
             except Exception as e:
-                self.debug('_store_status exception: {}'.format(str(e)))
+                self.log.debug('_store_status exception: {}'.format(str(e)))
                 pass
             finally:
                 if run_once:
@@ -957,7 +957,7 @@ class K8sHelmConnector(K8sConnector):
                     current = int(parts[0])
                     total = int(parts[1])
                     if current < total:
-                        self.debug('NOT READY:\n {}'.format(line3))
+                        self.log.debug('NOT READY:\n {}'.format(line3))
                         ready = False
                     line3 = resources[index]
                     index += 1
@@ -1085,31 +1085,31 @@ class K8sHelmConnector(K8sConnector):
         # base dir for cluster
         cluster_dir = base + '/' + cluster_name
         if create_if_not_exist and not os.path.exists(cluster_dir):
-            self.debug('Creating dir {}'.format(cluster_dir))
+            self.log.debug('Creating dir {}'.format(cluster_dir))
             os.makedirs(cluster_dir)
         if not os.path.exists(cluster_dir):
             msg = 'Base cluster dir {} does not exist'.format(cluster_dir)
-            self.error(msg)
+            self.log.error(msg)
             raise K8sException(msg)
 
         # kube dir
         kube_dir = cluster_dir + '/' + '.kube'
         if create_if_not_exist and not os.path.exists(kube_dir):
-            self.debug('Creating dir {}'.format(kube_dir))
+            self.log.debug('Creating dir {}'.format(kube_dir))
             os.makedirs(kube_dir)
         if not os.path.exists(kube_dir):
             msg = 'Kube config dir {} does not exist'.format(kube_dir)
-            self.error(msg)
+            self.log.error(msg)
             raise K8sException(msg)
 
         # helm home dir
         helm_dir = cluster_dir + '/' + '.helm'
         if create_if_not_exist and not os.path.exists(helm_dir):
-            self.debug('Creating dir {}'.format(helm_dir))
+            self.log.debug('Creating dir {}'.format(helm_dir))
             os.makedirs(helm_dir)
         if not os.path.exists(helm_dir):
             msg = 'Helm config dir {} does not exist'.format(helm_dir)
-            self.error(msg)
+            self.log.error(msg)
             raise K8sException(msg)
 
         config_filename = kube_dir + '/config'
@@ -1127,13 +1127,13 @@ class K8sHelmConnector(K8sConnector):
             command: str
     ) -> (str, int):
         command = K8sHelmConnector._remove_multiple_spaces(command)
-        self.debug('Executing sync local command: {}'.format(command))
+        self.log.debug('Executing sync local command: {}'.format(command))
         # raise exception if fails
         output = ''
         try:
             output = subprocess.check_output(command, shell=True, universal_newlines=True)
             return_code = 0
-            self.debug(output)
+            self.log.debug(output)
         except Exception as e:
             return_code = 1
 
@@ -1148,7 +1148,7 @@ class K8sHelmConnector(K8sConnector):
     ) -> (str, int):
 
         command = K8sHelmConnector._remove_multiple_spaces(command)
-        self.debug('Executing async local command: {}'.format(command))
+        self.log.debug('Executing async local command: {}'.format(command))
 
         # split command
         command = command.split(sep=' ')
@@ -1174,9 +1174,9 @@ class K8sHelmConnector(K8sConnector):
             # output = stderr.decode()
 
             if return_code != 0 and show_error_log:
-                self.debug('Return code (FAIL): {}\nOutput:\n{}'.format(return_code, output))
+                self.log.debug('Return code (FAIL): {}\nOutput:\n{}'.format(return_code, output))
             else:
-                self.debug('Return code: {}'.format(return_code))
+                self.log.debug('Return code: {}'.format(return_code))
 
             if raise_exception_on_error and return_code != 0:
                 raise K8sException(output)
@@ -1193,20 +1193,20 @@ class K8sHelmConnector(K8sConnector):
                 raise
         except Exception as e:
             msg = 'Exception executing command: {} -> {}'.format(command, e)
-            self.error(msg)
+            self.log.error(msg)
             if raise_exception_on_error:
                 raise K8sException(e) from e
             else:
                 return '', -1
 
     def _check_file_exists(self, filename: str, exception_if_not_exists: bool = False):
 
-        self.debug('Checking if file {} exists...'.format(filename))
+        self.log.debug('Checking if file {} exists...'.format(filename))
         if os.path.exists(filename):
             return True
         else:
             msg = 'File {} does not exist'.format(filename)
             if exception_if_not_exists:
-                self.error(msg)
+                self.log.error(msg)
                 raise K8sException(msg)