From: lloretgalleg Date: Thu, 20 Feb 2020 11:01:17 +0000 (+0100) Subject: Synchronize helm repos on ns instantiation instead of creation X-Git-Tag: release-v8.0-start~28 X-Git-Url: https://osm.etsi.org/gitweb/?a=commitdiff_plain;h=65ddf85ef9d11aa4d4f9dc7cb2912ff7069f7c94;p=osm%2FN2VC.git Synchronize helm repos on ns instantiation instead of creation Change-Id: Ia4f35283399d2777ff3df1fb2824a68cbaec4804 Signed-off-by: lloretgalleg --- diff --git a/n2vc/k8s_conn.py b/n2vc/k8s_conn.py index f9aefa7..6ac8119 100644 --- a/n2vc/k8s_conn.py +++ b/n2vc/k8s_conn.py @@ -122,6 +122,20 @@ class K8sConnector(abc.ABC, Loggable): :return: True if successful """ + @abc.abstractmethod + async def synchronize_repos( + self, + cluster_uuid: str, + name: str + ): + """ + Synchronizes the list of repositories created in the cluster with + the repositories added by the NBI + + :param cluster_uuid: the cluster + :return: List of repositories deleted from the cluster and dictionary with repos added + """ + @abc.abstractmethod async def reset( self, diff --git a/n2vc/k8s_helm_conn.py b/n2vc/k8s_helm_conn.py index 9ce992f..496e320 100644 --- a/n2vc/k8s_helm_conn.py +++ b/n2vc/k8s_helm_conn.py @@ -384,6 +384,7 @@ class K8sHelmConnector(K8sConnector): exec_task = asyncio.ensure_future( coro_or_future=self._local_async_exec(command=command, raise_exception_on_error=False) ) + # write status in another task status_task = asyncio.ensure_future( coro_or_future=self._store_status( @@ -692,6 +693,72 @@ class K8sHelmConnector(K8sConnector): return_text=True ) + async def synchronize_repos(self, cluster_uuid: str): + + self.debug("syncronize repos for cluster helm-id: {}",) + try: + update_repos_timeout = 300 # max timeout to sync a single repos, more than this is too much + db_k8scluster = self.db.get_one("k8sclusters", {"_admin.helm-chart.id": cluster_uuid}) + if db_k8scluster: + nbi_repo_list = db_k8scluster.get("_admin").get("helm_chart_repos") or [] + cluster_repo_dict = db_k8scluster.get("_admin").get("helm_charts_added") or {} + # elements that must be deleted + deleted_repo_list = [] + added_repo_dict = {} + self.debug("helm_chart_repos: {}".format(nbi_repo_list)) + self.debug("helm_charts_added: {}".format(cluster_repo_dict)) + + # obtain repos to add: registered by nbi but not added + repos_to_add = [repo for repo in nbi_repo_list if not cluster_repo_dict.get(repo)] + + # obtain repos to delete: added by cluster but not in nbi list + repos_to_delete = [repo for repo in cluster_repo_dict.keys() if repo not in nbi_repo_list] + + # delete repos: must delete first then add because there may be different repos with same name but + # different id and url + self.debug("repos to delete: {}".format(repos_to_delete)) + for repo_id in repos_to_delete: + # try to delete repos + try: + repo_delete_task = asyncio.ensure_future(self.repo_remove(cluster_uuid=cluster_uuid, + name=cluster_repo_dict[repo_id])) + await asyncio.wait_for(repo_delete_task, update_repos_timeout) + except Exception as e: + self.warning("Error deleting repo, id: {}, name: {}, err_msg: {}".format(repo_id, + cluster_repo_dict[repo_id], str(e))) + # always add to the list of to_delete if there is an error because if is not there deleting raises error + deleted_repo_list.append(repo_id) + + # add repos + self.debug("repos to add: {}".format(repos_to_add)) + add_task_list = [] + for repo_id in repos_to_add: + # obtain the repo data from the db + # if there is an error getting the repo in the database we will ignore this repo and continue + # because there is a possible race condition where the repo has been deleted while processing + db_repo = self.db.get_one("k8srepos", {"_id": repo_id}) + self.debug("obtained repo: id, {}, name: {}, url: {}".format(repo_id, db_repo["name"], db_repo["url"])) + try: + repo_add_task = asyncio.ensure_future(self.repo_add(cluster_uuid=cluster_uuid, + name=db_repo["name"], url=db_repo["url"], + repo_type="chart")) + await asyncio.wait_for(repo_add_task, update_repos_timeout) + added_repo_dict[repo_id] = db_repo["name"] + self.debug("added repo: id, {}, name: {}".format(repo_id, db_repo["name"])) + except Exception as e: + # deal with error adding repo, adding a repo that already exists does not raise any error + # will not raise error because a wrong repos added by anyone could prevent instantiating any ns + self.error("Error adding repo id: {}, err_msg: {} ".format(repo_id, repr(e))) + + return deleted_repo_list, added_repo_dict + + else: # else db_k8scluster does not exist + raise K8sException("k8cluster with helm-id : {} not found".format(cluster_uuid)) + + except Exception as e: + self.error("Error synchronizing repos: {}".format(str(e))) + raise K8sException("Error synchronizing repos") + """ ################################################################################################## ########################################## P R I V A T E ######################################### @@ -828,10 +895,8 @@ class K8sHelmConnector(K8sConnector): await asyncio.sleep(check_every) detailed_status = await self.status_kdu(cluster_uuid=cluster_uuid, kdu_instance=kdu_instance) status = detailed_status.get('info').get('Description') - print('=' * 60) self.debug('STATUS:\n{}'.format(status)) self.debug('DETAILED STATUS:\n{}'.format(detailed_status)) - print('=' * 60) # write status to db result = await self.write_app_status_to_db( db_dict=db_dict, @@ -845,6 +910,7 @@ class K8sHelmConnector(K8sConnector): self.debug('Task cancelled') return except Exception as e: + self.debug('_store_status exception: {}'.format(str(e))) pass finally: if run_once: @@ -1142,3 +1208,5 @@ class K8sHelmConnector(K8sConnector): if exception_if_not_exists: self.error(msg) raise K8sException(msg) + + diff --git a/n2vc/k8s_juju_conn.py b/n2vc/k8s_juju_conn.py index 9bc5d40..147b599 100644 --- a/n2vc/k8s_juju_conn.py +++ b/n2vc/k8s_juju_conn.py @@ -248,6 +248,16 @@ class K8sJujuConnector(K8sConnector): ): raise NotImplemented() + async def synchronize_repos( + self, + cluster_uuid: str, + name: str + ): + """ + Returns None as currently add_repo is not implemented + """ + return None + """Reset""" async def reset( self,