-
async def synchronize_repos(self, cluster_uuid: str):
    """
    Synchronize the helm chart repositories registered in the database (by NBI)
    with the repositories actually added to the target cluster: repos present in
    the cluster but no longer registered are removed, repos registered but not
    yet added are added. Deletion runs first because a repo with the same name
    may reappear with a different id/url.

    :param cluster_uuid: helm-chart id of the k8s cluster
    :return: tuple (deleted_repo_list, added_repo_dict) where deleted_repo_list
        holds the repo ids removed and added_repo_dict maps repo id -> repo name
        for the repos successfully added
    :raises K8sException: if the cluster is not found or synchronization fails
    """
    self.log.debug("synchronize repos for cluster helm-id: {}".format(cluster_uuid))
    try:
        # max timeout (seconds) to sync a single repo, more than this is too much
        update_repos_timeout = 300
        db_k8scluster = self.db.get_one("k8sclusters", {"_admin.helm-chart.id": cluster_uuid})
        if not db_k8scluster:
            raise K8sException("k8cluster with helm-id : {} not found".format(cluster_uuid))

        nbi_repo_list = db_k8scluster.get("_admin").get("helm_chart_repos") or []
        cluster_repo_dict = db_k8scluster.get("_admin").get("helm_charts_added") or {}
        deleted_repo_list = []  # repo ids removed from the cluster
        added_repo_dict = {}  # repo id -> repo name, for repos added to the cluster
        self.log.debug("helm_chart_repos: {}".format(nbi_repo_list))
        self.log.debug("helm_charts_added: {}".format(cluster_repo_dict))

        # obtain repos to add: registered by nbi but not added
        repos_to_add = [repo for repo in nbi_repo_list if not cluster_repo_dict.get(repo)]

        # obtain repos to delete: added by cluster but not in nbi list
        repos_to_delete = [repo for repo in cluster_repo_dict.keys() if repo not in nbi_repo_list]

        # delete repos: must delete first then add because there may be different repos
        # with same name but different id and url
        self.log.debug("repos to delete: {}".format(repos_to_delete))
        for repo_id in repos_to_delete:
            try:
                repo_delete_task = asyncio.ensure_future(
                    self.repo_remove(cluster_uuid=cluster_uuid,
                                     name=cluster_repo_dict[repo_id]))
                await asyncio.wait_for(repo_delete_task, update_repos_timeout)
            except Exception as e:
                # fixed: was `self.warning(...)`, which raised AttributeError
                self.log.warning("Error deleting repo, id: {}, name: {}, err_msg: {}".format(
                    repo_id, cluster_repo_dict[repo_id], str(e)))
            # always record the deletion, even on error; otherwise a later delete of a
            # repo missing from the cluster raises an error
            deleted_repo_list.append(repo_id)

        # add repos
        self.log.debug("repos to add: {}".format(repos_to_add))
        for repo_id in repos_to_add:
            # obtain the repo data from the db; if there is an error getting the repo we
            # ignore it and continue, because there is a possible race condition where the
            # repo has been deleted while processing (fixed: lookup was unguarded)
            try:
                db_repo = self.db.get_one("k8srepos", {"_id": repo_id})
            except Exception as e:
                self.log.warning("Error getting repo from db, id: {}, err_msg: {}".format(
                    repo_id, str(e)))
                continue
            self.log.debug("obtained repo: id, {}, name: {}, url: {}".format(
                repo_id, db_repo["name"], db_repo["url"]))
            try:
                repo_add_task = asyncio.ensure_future(
                    self.repo_add(cluster_uuid=cluster_uuid,
                                  name=db_repo["name"], url=db_repo["url"],
                                  repo_type="chart"))
                await asyncio.wait_for(repo_add_task, update_repos_timeout)
                added_repo_dict[repo_id] = db_repo["name"]
                self.log.debug("added repo: id, {}, name: {}".format(repo_id, db_repo["name"]))
            except Exception as e:
                # adding a repo that already exists does not raise any error; do not
                # re-raise because a wrong repo added by anyone could prevent
                # instantiating any ns
                self.log.error("Error adding repo id: {}, err_msg: {} ".format(repo_id, repr(e)))

        return deleted_repo_list, added_repo_dict

    except K8sException:
        # keep the specific message (e.g. cluster not found) instead of wrapping it
        raise
    except Exception as e:
        self.log.error("Error synchronizing repos: {}".format(str(e)))
        raise K8sException("Error synchronizing repos")
-
- """
- ##################################################################################################
- ########################################## P R I V A T E #########################################
- ##################################################################################################
- """
-
async def _exec_inspect_comand(
        self,
        inspect_command: str,
        kdu_model: str,
        repo_url: str = None
):
    """
    Execute `helm inspect <inspect_command>` against a kdu model and return the
    raw command output.

    :param inspect_command: helm inspect sub-command (e.g. "values", "readme")
    :param kdu_model: chart reference; may carry a "<repo>/" prefix
    :param repo_url: optional explicit repository url; when given, it is passed
        via --repo and any "<repo>/" prefix is stripped from kdu_model
    :return: decoded output of the helm command
    """
    repo_option = ' --repo {}'.format(repo_url) if repo_url else ''
    if repo_url:
        # with an explicit repo url the repo-name prefix must not be sent to helm
        separator_pos = kdu_model.find('/')
        if separator_pos >= 0:
            kdu_model = kdu_model[separator_pos + 1:]

    full_command = '{} inspect {} {}{}'.format(
        self._helm_command, inspect_command, kdu_model, repo_option)
    output, _rc = await self._local_async_exec(command=full_command, encode_utf8=True)

    return output
-
- async def _status_kdu(
- self,
- cluster_uuid: str,
- kdu_instance: str,
- show_error_log: bool = False,
- return_text: bool = False
- ):
-
- self.log.debug('status of kdu_instance {}'.format(kdu_instance))
-
- # config filename
- kube_dir, helm_dir, config_filename, cluster_dir = \
- self._get_paths(cluster_name=cluster_uuid, create_if_not_exist=True)
-
- command = '{} --kubeconfig={} --home={} status {} --output yaml'\
- .format(self._helm_command, config_filename, helm_dir, kdu_instance)
-