From e308c71201537413caf28f8814051470f0418504 Mon Sep 17 00:00:00 2001 From: lloretgalleg Date: Wed, 2 Sep 2020 09:40:38 +0000 Subject: [PATCH] Syncronize with fs before action and reverse sync after action to allow high availability with mongofs Change-Id: I4b8b2411d605de8e1743d835aff994ad788ae997 Signed-off-by: lloretgalleg --- n2vc/k8s_helm_conn.py | 68 ++++++++++++++++++++++++++++++++++++++++++- 1 file changed, 67 insertions(+), 1 deletion(-) diff --git a/n2vc/k8s_helm_conn.py b/n2vc/k8s_helm_conn.py index ad9d9d0..65cbdac 100644 --- a/n2vc/k8s_helm_conn.py +++ b/n2vc/k8s_helm_conn.py @@ -195,6 +195,9 @@ class K8sHelmConnector(K8sConnector): else: self.log.info("Helm client already initialized") + # sync fs with local data + self.fs.reverse_sync(from_path=cluster_id) + self.log.info("Cluster {} initialized".format(cluster_id)) return cluster_uuid, n2vc_installed_sw @@ -211,6 +214,9 @@ class K8sHelmConnector(K8sConnector): cluster_name=cluster_id, create_if_not_exist=True ) + # sync local dir + self.fs.sync(from_path=cluster_id) + # helm repo update command = "{} --kubeconfig={} --home={} repo update".format( self._helm_command, config_filename, helm_dir @@ -225,6 +231,9 @@ class K8sHelmConnector(K8sConnector): self.log.debug("adding repo: {}".format(command)) await self._local_async_exec(command=command, raise_exception_on_error=True) + # sync fs + self.fs.reverse_sync(from_path=cluster_id) + async def repo_list(self, cluster_uuid: str) -> list: """ Get the list of registered repositories @@ -240,6 +249,9 @@ class K8sHelmConnector(K8sConnector): cluster_name=cluster_id, create_if_not_exist=True ) + # sync local dir + self.fs.sync(from_path=cluster_id) + command = "{} --kubeconfig={} --home={} repo list --output yaml".format( self._helm_command, config_filename, helm_dir ) @@ -252,6 +264,9 @@ class K8sHelmConnector(K8sConnector): else: return [] + # sync fs + self.fs.reverse_sync(from_path=cluster_id) + async def repo_remove(self, cluster_uuid: str, name: str): """ Remove a repository from OSM @@ -269,12 +284,18 @@ class K8sHelmConnector(K8sConnector): cluster_name=cluster_id, create_if_not_exist=True ) + # sync local dir + self.fs.sync(from_path=cluster_id) + command = "{} --kubeconfig={} --home={} repo remove {}".format( self._helm_command, config_filename, helm_dir, name ) await self._local_async_exec(command=command, raise_exception_on_error=True) + # sync fs + self.fs.reverse_sync(from_path=cluster_id) + async def reset( self, cluster_uuid: str, force: bool = False, uninstall_sw: bool = False ) -> bool: @@ -288,6 +309,9 @@ class K8sHelmConnector(K8sConnector): cluster_name=cluster_id, create_if_not_exist=False ) + # sync local dir + self.fs.sync(from_path=cluster_id) + # uninstall releases if needed. if uninstall_sw: releases = await self.instances_list(cluster_uuid=cluster_uuid) @@ -369,8 +393,10 @@ class K8sHelmConnector(K8sConnector): self.log.debug("namespace not found") # delete cluster directory + self.log.debug("Removing directory {}".format(cluster_id)) + self.fs.file_delete(cluster_id, ignore_non_exist=True) + # Remove also local directorio if still exist direct = self.fs.path + "/" + cluster_id - self.log.debug("Removing directory {}".format(direct)) shutil.rmtree(direct, ignore_errors=True) return True @@ -395,6 +421,10 @@ class K8sHelmConnector(K8sConnector): cluster_name=cluster_id, create_if_not_exist=True ) + # sync local dir + self.log.debug("sync cluster_id: {}".format(_cluster_dir)) + self.fs.sync(from_path=cluster_id) + # params to str # params_str = K8sHelmConnector._params_to_set_option(params) params_str, file_to_delete = self._params_to_file_option( @@ -508,6 +538,9 @@ class K8sHelmConnector(K8sConnector): self.log.error(msg) raise K8sException(msg) + # sync fs + self.fs.reverse_sync(from_path=cluster_id) + self.log.debug("Returning kdu_instance {}".format(kdu_instance)) return kdu_instance @@ -527,6 +560,9 @@ class K8sHelmConnector(K8sConnector): cluster_name=cluster_id, create_if_not_exist=True ) + # sync local dir + self.fs.sync(from_path=cluster_id) + command = "{} --kubeconfig={} --home={} list --output yaml".format( self._helm_command, config_filename, helm_dir ) @@ -559,6 +595,9 @@ class K8sHelmConnector(K8sConnector): cluster_name=cluster_id, create_if_not_exist=True ) + # sync local dir + self.fs.sync(from_path=cluster_id) + # params to str # params_str = K8sHelmConnector._params_to_set_option(params) params_str, file_to_delete = self._params_to_file_option( @@ -649,6 +688,9 @@ class K8sHelmConnector(K8sConnector): self.log.error(msg) raise K8sException(msg) + # sync fs + self.fs.reverse_sync(from_path=cluster_id) + # return new revision number instance = await self.get_instance_info( cluster_uuid=cluster_uuid, kdu_instance=kdu_instance @@ -676,6 +718,9 @@ class K8sHelmConnector(K8sConnector): cluster_name=cluster_id, create_if_not_exist=True ) + # sync local dir + self.fs.sync(from_path=cluster_id) + command = "{} rollback --kubeconfig={} --home={} {} {} --wait".format( self._helm_command, config_filename, helm_dir, kdu_instance, revision ) @@ -720,6 +765,9 @@ class K8sHelmConnector(K8sConnector): self.log.error(msg) raise K8sException(msg) + # sync fs + self.fs.reverse_sync(from_path=cluster_id) + # return new revision number instance = await self.get_instance_info( cluster_uuid=cluster_uuid, kdu_instance=kdu_instance @@ -754,6 +802,9 @@ class K8sHelmConnector(K8sConnector): cluster_name=cluster_id, create_if_not_exist=True ) + # sync local dir + self.fs.sync(from_path=cluster_id) + command = "{} --kubeconfig={} --home={} delete --purge {}".format( self._helm_command, config_filename, helm_dir, kdu_instance ) @@ -762,6 +813,9 @@ class K8sHelmConnector(K8sConnector): command=command, raise_exception_on_error=True ) + # sync fs + self.fs.reverse_sync(from_path=cluster_id) + return self._output_to_table(output) async def exec_primitive( @@ -844,6 +898,9 @@ class K8sHelmConnector(K8sConnector): ) ) + # sync local dir + self.fs.sync(from_path=cluster_id) + status = await self._status_kdu( cluster_id, kdu_instance, return_text=False ) @@ -854,6 +911,9 @@ class K8sHelmConnector(K8sConnector): service = await self.get_service(cluster_uuid, service, namespace) service_list.append(service) + # sync fs + self.fs.reverse_sync(from_path=cluster_id) + return service_list async def get_service(self, @@ -872,6 +932,9 @@ class K8sHelmConnector(K8sConnector): cluster_name=cluster_id, create_if_not_exist=True ) + # sync local dir + self.fs.sync(from_path=cluster_id) + command = "{} --kubeconfig={} --namespace={} get service {} -o=yaml".format( self.kubectl_command, config_filename, namespace, service_name ) @@ -893,6 +956,9 @@ class K8sHelmConnector(K8sConnector): ip_list = [elem["ip"] for elem in ip_map_list] service["external_ip"] = ip_list + # sync fs + self.fs.reverse_sync(from_path=cluster_id) + return service async def synchronize_repos(self, cluster_uuid: str): -- 2.17.1