Synchronize with fs before action and reverse sync after action to allow high availability with mongofs
Change-Id: I4b8b2411d605de8e1743d835aff994ad788ae997
Signed-off-by: lloretgalleg <illoret@indra.es>
diff --git a/n2vc/k8s_helm_conn.py b/n2vc/k8s_helm_conn.py
index ad9d9d0..65cbdac 100644
--- a/n2vc/k8s_helm_conn.py
+++ b/n2vc/k8s_helm_conn.py
@@ -195,6 +195,9 @@
else:
self.log.info("Helm client already initialized")
+ # sync fs with local data
+ self.fs.reverse_sync(from_path=cluster_id)
+
self.log.info("Cluster {} initialized".format(cluster_id))
return cluster_uuid, n2vc_installed_sw
@@ -211,6 +214,9 @@
cluster_name=cluster_id, create_if_not_exist=True
)
+ # sync local dir
+ self.fs.sync(from_path=cluster_id)
+
# helm repo update
command = "{} --kubeconfig={} --home={} repo update".format(
self._helm_command, config_filename, helm_dir
@@ -225,6 +231,9 @@
self.log.debug("adding repo: {}".format(command))
await self._local_async_exec(command=command, raise_exception_on_error=True)
+ # sync fs
+ self.fs.reverse_sync(from_path=cluster_id)
+
async def repo_list(self, cluster_uuid: str) -> list:
"""
Get the list of registered repositories
@@ -240,6 +249,9 @@
cluster_name=cluster_id, create_if_not_exist=True
)
+ # sync local dir
+ self.fs.sync(from_path=cluster_id)
+
command = "{} --kubeconfig={} --home={} repo list --output yaml".format(
self._helm_command, config_filename, helm_dir
)
@@ -252,6 +264,9 @@
else:
return []
+ # sync fs
+ self.fs.reverse_sync(from_path=cluster_id)
+
async def repo_remove(self, cluster_uuid: str, name: str):
"""
Remove a repository from OSM
@@ -269,12 +284,18 @@
cluster_name=cluster_id, create_if_not_exist=True
)
+ # sync local dir
+ self.fs.sync(from_path=cluster_id)
+
command = "{} --kubeconfig={} --home={} repo remove {}".format(
self._helm_command, config_filename, helm_dir, name
)
await self._local_async_exec(command=command, raise_exception_on_error=True)
+ # sync fs
+ self.fs.reverse_sync(from_path=cluster_id)
+
async def reset(
self, cluster_uuid: str, force: bool = False, uninstall_sw: bool = False
) -> bool:
@@ -288,6 +309,9 @@
cluster_name=cluster_id, create_if_not_exist=False
)
+ # sync local dir
+ self.fs.sync(from_path=cluster_id)
+
# uninstall releases if needed.
if uninstall_sw:
releases = await self.instances_list(cluster_uuid=cluster_uuid)
@@ -369,8 +393,10 @@
self.log.debug("namespace not found")
# delete cluster directory
+ self.log.debug("Removing directory {}".format(cluster_id))
+ self.fs.file_delete(cluster_id, ignore_non_exist=True)
+ # Also remove the local directory if it still exists
direct = self.fs.path + "/" + cluster_id
- self.log.debug("Removing directory {}".format(direct))
shutil.rmtree(direct, ignore_errors=True)
return True
@@ -395,6 +421,10 @@
cluster_name=cluster_id, create_if_not_exist=True
)
+ # sync local dir
+ self.log.debug("sync cluster_id: {}".format(_cluster_dir))
+ self.fs.sync(from_path=cluster_id)
+
# params to str
# params_str = K8sHelmConnector._params_to_set_option(params)
params_str, file_to_delete = self._params_to_file_option(
@@ -508,6 +538,9 @@
self.log.error(msg)
raise K8sException(msg)
+ # sync fs
+ self.fs.reverse_sync(from_path=cluster_id)
+
self.log.debug("Returning kdu_instance {}".format(kdu_instance))
return kdu_instance
@@ -527,6 +560,9 @@
cluster_name=cluster_id, create_if_not_exist=True
)
+ # sync local dir
+ self.fs.sync(from_path=cluster_id)
+
command = "{} --kubeconfig={} --home={} list --output yaml".format(
self._helm_command, config_filename, helm_dir
)
@@ -559,6 +595,9 @@
cluster_name=cluster_id, create_if_not_exist=True
)
+ # sync local dir
+ self.fs.sync(from_path=cluster_id)
+
# params to str
# params_str = K8sHelmConnector._params_to_set_option(params)
params_str, file_to_delete = self._params_to_file_option(
@@ -649,6 +688,9 @@
self.log.error(msg)
raise K8sException(msg)
+ # sync fs
+ self.fs.reverse_sync(from_path=cluster_id)
+
# return new revision number
instance = await self.get_instance_info(
cluster_uuid=cluster_uuid, kdu_instance=kdu_instance
@@ -676,6 +718,9 @@
cluster_name=cluster_id, create_if_not_exist=True
)
+ # sync local dir
+ self.fs.sync(from_path=cluster_id)
+
command = "{} rollback --kubeconfig={} --home={} {} {} --wait".format(
self._helm_command, config_filename, helm_dir, kdu_instance, revision
)
@@ -720,6 +765,9 @@
self.log.error(msg)
raise K8sException(msg)
+ # sync fs
+ self.fs.reverse_sync(from_path=cluster_id)
+
# return new revision number
instance = await self.get_instance_info(
cluster_uuid=cluster_uuid, kdu_instance=kdu_instance
@@ -754,6 +802,9 @@
cluster_name=cluster_id, create_if_not_exist=True
)
+ # sync local dir
+ self.fs.sync(from_path=cluster_id)
+
command = "{} --kubeconfig={} --home={} delete --purge {}".format(
self._helm_command, config_filename, helm_dir, kdu_instance
)
@@ -762,6 +813,9 @@
command=command, raise_exception_on_error=True
)
+ # sync fs
+ self.fs.reverse_sync(from_path=cluster_id)
+
return self._output_to_table(output)
async def exec_primitive(
@@ -844,6 +898,9 @@
)
)
+ # sync local dir
+ self.fs.sync(from_path=cluster_id)
+
status = await self._status_kdu(
cluster_id, kdu_instance, return_text=False
)
@@ -854,6 +911,9 @@
service = await self.get_service(cluster_uuid, service, namespace)
service_list.append(service)
+ # sync fs
+ self.fs.reverse_sync(from_path=cluster_id)
+
return service_list
async def get_service(self,
@@ -872,6 +932,9 @@
cluster_name=cluster_id, create_if_not_exist=True
)
+ # sync local dir
+ self.fs.sync(from_path=cluster_id)
+
command = "{} --kubeconfig={} --namespace={} get service {} -o=yaml".format(
self.kubectl_command, config_filename, namespace, service_name
)
@@ -893,6 +956,9 @@
ip_list = [elem["ip"] for elem in ip_map_list]
service["external_ip"] = ip_list
+ # sync fs
+ self.fs.reverse_sync(from_path=cluster_id)
+
return service
async def synchronize_repos(self, cluster_uuid: str):