+ output, _rc = await self._local_async_exec(
+ command=command, raise_exception_on_error=True
+ )
+
+ data = yaml.load(output, Loader=yaml.SafeLoader)
+
+ service = {
+ "name": service_name,
+ "type": self._get_deep(data, ("spec", "type")),
+ "ports": self._get_deep(data, ("spec", "ports")),
+ "cluster_ip": self._get_deep(data, ("spec", "clusterIP"))
+ }
+ if service["type"] == "LoadBalancer":
+ ip_map_list = self._get_deep(data, ("status", "loadBalancer", "ingress"))
+ ip_list = [elem["ip"] for elem in ip_map_list]
+ service["external_ip"] = ip_list
+
+ return service
+
async def synchronize_repos(self, cluster_uuid: str):
    """Synchronize the helm chart repos added to a cluster with the list registered in the db.

    Repos registered by nbi (``_admin.helm_chart_repos``) but not yet added to the
    cluster are added; repos already added (``_admin.helm_charts_added``) but no
    longer registered are removed.  Removal runs before addition because a repo can
    be re-registered under the same name with a different id/url.

    :param cluster_uuid: cluster uuid (matched against ``_admin.helm-chart.id``)
    :return: tuple ``(deleted_repo_list, added_repo_dict)`` with the changes
        actually applied, so the caller can update the db record accordingly
    :raises K8sException: if the cluster is not found or synchronization fails
    """
    _, cluster_id = self._get_namespace_cluster_id(cluster_uuid)
    self.log.debug("syncronize repos for cluster helm-id: {}".format(cluster_id))
    try:
        update_repos_timeout = (
            300  # max timeout to sync a single repos, more than this is too much
        )
        db_k8scluster = self.db.get_one(
            "k8sclusters", {"_admin.helm-chart.id": cluster_uuid}
        )
        if db_k8scluster:
            nbi_repo_list = (
                db_k8scluster.get("_admin").get("helm_chart_repos") or []
            )
            cluster_repo_dict = (
                db_k8scluster.get("_admin").get("helm_charts_added") or {}
            )
            # elements that must be deleted
            deleted_repo_list = []
            added_repo_dict = {}

            # obtain repos to add: registered by nbi but not added
            repos_to_add = [
                repo for repo in nbi_repo_list if not cluster_repo_dict.get(repo)
            ]

            # obtain repos to delete: added by cluster but not in nbi list
            repos_to_delete = [
                repo
                for repo in cluster_repo_dict.keys()
                if repo not in nbi_repo_list
            ]

            # delete repos: must delete first then add because there may be
            # different repos with same name but
            # different id and url
            if repos_to_delete:
                self.log.debug("repos to delete: {}".format(repos_to_delete))
            for repo_id in repos_to_delete:
                # try to delete repos
                try:
                    repo_delete_task = asyncio.ensure_future(
                        self.repo_remove(
                            cluster_uuid=cluster_uuid,
                            name=cluster_repo_dict[repo_id],
                        )
                    )
                    await asyncio.wait_for(repo_delete_task, update_repos_timeout)
                except Exception as e:
                    # fix: was `self.warning(...)`, which raised AttributeError
                    # and aborted the whole sync instead of logging best-effort
                    self.log.warning(
                        "Error deleting repo, id: {}, name: {}, err_msg: {}".format(
                            repo_id, cluster_repo_dict[repo_id], str(e)
                        )
                    )
                # always add to the list of to_delete even if there is an error,
                # because if it is not there deleting raises error
                deleted_repo_list.append(repo_id)

            # add repos
            if repos_to_add:
                self.log.debug("repos to add: {}".format(repos_to_add))
            for repo_id in repos_to_add:
                # obtain the repo data from the db
                # if there is an error getting the repo in the database we will
                # ignore this repo and continue
                # because there is a possible race condition where the repo has
                # been deleted while processing
                try:
                    db_repo = self.db.get_one("k8srepos", {"_id": repo_id})
                except Exception as e:
                    # fix: the lookup was outside the try, so a repo deleted by
                    # a race condition aborted the whole sync instead of being
                    # skipped as the comment above intends
                    self.log.warning(
                        "Error getting repo from db, id: {}, err_msg: {}".format(
                            repo_id, repr(e)
                        )
                    )
                    continue
                self.log.debug(
                    "obtained repo: id, {}, name: {}, url: {}".format(
                        repo_id, db_repo["name"], db_repo["url"]
                    )
                )
                try:
                    repo_add_task = asyncio.ensure_future(
                        self.repo_add(
                            cluster_uuid=cluster_uuid,
                            name=db_repo["name"],
                            url=db_repo["url"],
                            repo_type="chart",
                        )
                    )
                    await asyncio.wait_for(repo_add_task, update_repos_timeout)
                    added_repo_dict[repo_id] = db_repo["name"]
                    self.log.debug(
                        "added repo: id, {}, name: {}".format(
                            repo_id, db_repo["name"]
                        )
                    )
                except Exception as e:
                    # deal with error adding repo, adding a repo that already
                    # exists does not raise any error
                    # will not raise error because a wrong repos added by
                    # anyone could prevent instantiating any ns
                    self.log.error(
                        "Error adding repo id: {}, err_msg: {} ".format(
                            repo_id, repr(e)
                        )
                    )

            return deleted_repo_list, added_repo_dict

        else:  # else db_k8scluster does not exist
            raise K8sException(
                "k8cluster with helm-id : {} not found".format(cluster_uuid)
            )

    except Exception as e:
        self.log.error("Error synchronizing repos: {}".format(str(e)))
        raise K8sException("Error synchronizing repos")