Synchronize with fs before action and reverse sync after action to allow high availabi...
diff --git a/n2vc/k8s_helm_conn.py b/n2vc/k8s_helm_conn.py
index 2e74423..65cbdac 100644
--- a/n2vc/k8s_helm_conn.py
+++ b/n2vc/k8s_helm_conn.py
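The pattern this commit applies around every helm action: pull the cluster files from common storage before running helm, then push them back afterwards so other OSM instances see the result. A minimal sketch of that flow (the wrapper itself is hypothetical; sync()/reverse_sync() are the calls used throughout the diff):

    async def _run_with_fs_sync(self, cluster_id, action):
        # hypothetical helper illustrating the pattern, not part of the patch
        self.fs.sync(from_path=cluster_id)           # bring the local dir up to date
        result = await action()                      # run the helm/kubectl action
        self.fs.reverse_sync(from_path=cluster_id)   # publish local changes back
        return result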
@@ -195,6 +195,9 @@ class K8sHelmConnector(K8sConnector):
             else:
                 self.log.info("Helm client already initialized")
 
+        # sync fs with local data
+        self.fs.reverse_sync(from_path=cluster_id)
+
         self.log.info("Cluster {} initialized".format(cluster_id))
 
         return cluster_uuid, n2vc_installed_sw
@@ -211,6 +214,9 @@ class K8sHelmConnector(K8sConnector):
             cluster_name=cluster_id, create_if_not_exist=True
         )
 
+        # sync local dir
+        self.fs.sync(from_path=cluster_id)
+
         # helm repo update
         command = "{} --kubeconfig={} --home={} repo update".format(
             self._helm_command, config_filename, helm_dir
@@ -225,6 +231,9 @@ class K8sHelmConnector(K8sConnector):
         self.log.debug("adding repo: {}".format(command))
         await self._local_async_exec(command=command, raise_exception_on_error=True)
 
+        # sync fs
+        self.fs.reverse_sync(from_path=cluster_id)
+
     async def repo_list(self, cluster_uuid: str) -> list:
         """
         Get the list of registered repositories
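With the per-cluster storage layout assumed here (paths illustrative, not taken from this diff), the commands composed above expand to plain helm invocations, e.g.:

    # hypothetical expansion of the repo-update command built above
    command = "{} --kubeconfig={} --home={} repo update".format(
        "helm",                                    # self._helm_command
        "/app/storage/<cluster_id>/.kube/config",  # config_filename
        "/app/storage/<cluster_id>/.helm",         # helm_dir
    )
    # -> "helm --kubeconfig=... --home=.../.helm repo update"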
@@ -240,6 +249,9 @@ class K8sHelmConnector(K8sConnector):
             cluster_name=cluster_id, create_if_not_exist=True
         )
 
+        # sync local dir
+        self.fs.sync(from_path=cluster_id)
+
         command = "{} --kubeconfig={} --home={} repo list --output yaml".format(
             self._helm_command, config_filename, helm_dir
         )
@@ -252,6 +264,9 @@ class K8sHelmConnector(K8sConnector):
         else:
             return []
 
+        # sync fs
+        self.fs.reverse_sync(from_path=cluster_id)
+
     async def repo_remove(self, cluster_uuid: str, name: str):
         """
         Remove a repository from OSM
@@ -269,51 +284,59 @@ class K8sHelmConnector(K8sConnector):
             cluster_name=cluster_id, create_if_not_exist=True
         )
 
+        # sync local dir
+        self.fs.sync(from_path=cluster_id)
+
         command = "{} --kubeconfig={} --home={} repo remove {}".format(
             self._helm_command, config_filename, helm_dir, name
         )
 
         await self._local_async_exec(command=command, raise_exception_on_error=True)
 
+        # sync fs
+        self.fs.reverse_sync(from_path=cluster_id)
+
     async def reset(
         self, cluster_uuid: str, force: bool = False, uninstall_sw: bool = False
     ) -> bool:
 
         namespace, cluster_id = self._get_namespace_cluster_id(cluster_uuid)
-        self.log.debug(
-            "Resetting K8s environment. cluster uuid: {}".format(cluster_id)
-        )
+        self.log.debug("Resetting K8s environment. cluster uuid: {} uninstall={}"
+                       .format(cluster_id, uninstall_sw))
 
         # get kube and helm directories
         _kube_dir, helm_dir, config_filename, _cluster_dir = self._get_paths(
             cluster_name=cluster_id, create_if_not_exist=False
         )
 
-        # uninstall releases if needed
-        releases = await self.instances_list(cluster_uuid=cluster_uuid)
-        if len(releases) > 0:
-            if force:
-                for r in releases:
-                    try:
-                        kdu_instance = r.get("Name")
-                        chart = r.get("Chart")
-                        self.log.debug(
-                            "Uninstalling {} -> {}".format(chart, kdu_instance)
-                        )
-                        await self.uninstall(
-                            cluster_uuid=cluster_uuid, kdu_instance=kdu_instance
-                        )
-                    except Exception as e:
-                        self.log.error(
-                            "Error uninstalling release {}: {}".format(kdu_instance, e)
-                        )
-            else:
-                msg = (
-                    "Cluster has releases and not force. Cannot reset K8s "
-                    "environment. Cluster uuid: {}"
-                ).format(cluster_id)
-                self.log.error(msg)
-                raise K8sException(msg)
+        # sync local dir
+        self.fs.sync(from_path=cluster_id)
+
+        # uninstall releases if needed.
+        if uninstall_sw:
+            releases = await self.instances_list(cluster_uuid=cluster_uuid)
+            if len(releases) > 0:
+                if force:
+                    for r in releases:
+                        try:
+                            kdu_instance = r.get("Name")
+                            chart = r.get("Chart")
+                            self.log.debug(
+                                "Uninstalling {} -> {}".format(chart, kdu_instance)
+                            )
+                            await self.uninstall(
+                                cluster_uuid=cluster_uuid, kdu_instance=kdu_instance
+                            )
+                        except Exception as e:
+                            self.log.error(
+                                "Error uninstalling release {}: {}".format(kdu_instance, e)
+                            )
+                else:
+                    msg = (
+                        "Cluster uuid: {} has releases and force is not set. Leaving K8s helm environment"
+                    ).format(cluster_id)
+                    self.log.warn(msg)
+                    uninstall_sw = False  # Allow removing the k8s cluster without removing Tiller
 
         if uninstall_sw:
 
@@ -370,8 +393,10 @@ class K8sHelmConnector(K8sConnector):
                 self.log.debug("namespace not found")
 
         # delete cluster directory
+        self.log.debug("Removing directory {}".format(cluster_id))
+        self.fs.file_delete(cluster_id, ignore_non_exist=True)
+        # Also remove the local directory if it still exists
         direct = self.fs.path + "/" + cluster_id
-        self.log.debug("Removing directory {}".format(direct))
         shutil.rmtree(direct, ignore_errors=True)
 
         return True
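The reworked reset() now separates removing the cluster entry from uninstalling the software. A hedged usage sketch (the call site is hypothetical; the flag behaviour follows the hunk above):

    # uninstall_sw=False: leave releases and Tiller alone, only remove
    #     cluster metadata and directories
    # uninstall_sw=True, force=True: uninstall every release first
    # uninstall_sw=True, force=False with releases present: log a warning
    #     and keep Tiller (uninstall_sw is cleared)
    removed = await k8s_helm_conn.reset(
        cluster_uuid, force=True, uninstall_sw=True
    )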
@@ -396,6 +421,10 @@ class K8sHelmConnector(K8sConnector):
             cluster_name=cluster_id, create_if_not_exist=True
         )
 
+        # sync local dir
+        self.log.debug("syncing cluster directory: {}".format(_cluster_dir))
+        self.fs.sync(from_path=cluster_id)
+
         # params to str
         # params_str = K8sHelmConnector._params_to_set_option(params)
         params_str, file_to_delete = self._params_to_file_option(
@@ -509,6 +538,9 @@ class K8sHelmConnector(K8sConnector):
             self.log.error(msg)
             raise K8sException(msg)
 
+        # sync fs
+        self.fs.reverse_sync(from_path=cluster_id)
+
         self.log.debug("Returning kdu_instance {}".format(kdu_instance))
         return kdu_instance
 
@@ -528,6 +560,9 @@ class K8sHelmConnector(K8sConnector):
             cluster_name=cluster_id, create_if_not_exist=True
         )
 
+        # sync local dir
+        self.fs.sync(from_path=cluster_id)
+
         command = "{} --kubeconfig={} --home={} list --output yaml".format(
             self._helm_command, config_filename, helm_dir
         )
@@ -560,6 +595,9 @@ class K8sHelmConnector(K8sConnector):
             cluster_name=cluster_id, create_if_not_exist=True
         )
 
+        # sync local dir
+        self.fs.sync(from_path=cluster_id)
+
         # params to str
         # params_str = K8sHelmConnector._params_to_set_option(params)
         params_str, file_to_delete = self._params_to_file_option(
@@ -650,6 +688,9 @@ class K8sHelmConnector(K8sConnector):
             self.log.error(msg)
             raise K8sException(msg)
 
+        # sync fs
+        self.fs.reverse_sync(from_path=cluster_id)
+
         # return new revision number
         instance = await self.get_instance_info(
             cluster_uuid=cluster_uuid, kdu_instance=kdu_instance
@@ -677,6 +718,9 @@ class K8sHelmConnector(K8sConnector):
             cluster_name=cluster_id, create_if_not_exist=True
         )
 
+        # sync local dir
+        self.fs.sync(from_path=cluster_id)
+
         command = "{} rollback --kubeconfig={} --home={} {} {} --wait".format(
             self._helm_command, config_filename, helm_dir, kdu_instance, revision
         )
@@ -721,6 +765,9 @@ class K8sHelmConnector(K8sConnector):
             self.log.error(msg)
             raise K8sException(msg)
 
+        # sync fs
+        self.fs.reverse_sync(from_path=cluster_id)
+
         # return new revision number
         instance = await self.get_instance_info(
             cluster_uuid=cluster_uuid, kdu_instance=kdu_instance
@@ -755,6 +802,9 @@ class K8sHelmConnector(K8sConnector):
             cluster_name=cluster_id, create_if_not_exist=True
         )
 
+        # sync local dir
+        self.fs.sync(from_path=cluster_id)
+
         command = "{} --kubeconfig={} --home={} delete --purge {}".format(
             self._helm_command, config_filename, helm_dir, kdu_instance
         )
@@ -763,6 +813,9 @@ class K8sHelmConnector(K8sConnector):
             command=command, raise_exception_on_error=True
         )
 
+        # sync fs
+        self.fs.reverse_sync(from_path=cluster_id)
+
         return self._output_to_table(output)
 
     async def exec_primitive(
@@ -833,6 +886,81 @@ class K8sHelmConnector(K8sConnector):
             return_text=True,
         )
 
+    async def get_services(self,
+                           cluster_uuid: str,
+                           kdu_instance: str,
+                           namespace: str) -> list:
+
+        _, cluster_id = self._get_namespace_cluster_id(cluster_uuid)
+        self.log.debug(
+            "get_services: cluster_uuid: {}, kdu_instance: {}".format(
+                cluster_uuid, kdu_instance
+            )
+        )
+
+        # sync local dir
+        self.fs.sync(from_path=cluster_id)
+
+        status = await self._status_kdu(
+            cluster_id, kdu_instance, return_text=False
+        )
+
+        service_names = self._parse_helm_status_service_info(status)
+        service_list = []
+        for service in service_names:
+            service = await self.get_service(cluster_uuid, service, namespace)
+            service_list.append(service)
+
+        # sync fs
+        self.fs.reverse_sync(from_path=cluster_id)
+
+        return service_list
+
+    async def get_service(self,
+                          cluster_uuid: str,
+                          service_name: str,
+                          namespace: str) -> object:
+
+        self.log.debug(
+            "get service, service_name: {}, namespace: {}, cluster_uuid: {}".format(
+                service_name, namespace, cluster_uuid)
+        )
+
+        # get paths
+        _, cluster_id = self._get_namespace_cluster_id(cluster_uuid)
+        _kube_dir, helm_dir, config_filename, _cluster_dir = self._get_paths(
+            cluster_name=cluster_id, create_if_not_exist=True
+        )
+
+        # sync local dir
+        self.fs.sync(from_path=cluster_id)
+
+        command = "{} --kubeconfig={} --namespace={} get service {} -o=yaml".format(
+            self.kubectl_command, config_filename, namespace, service_name
+        )
+
+        output, _rc = await self._local_async_exec(
+            command=command, raise_exception_on_error=True
+        )
+
+        data = yaml.load(output, Loader=yaml.SafeLoader)
+
+        service = {
+            "name": service_name,
+            "type": self._get_deep(data, ("spec", "type")),
+            "ports": self._get_deep(data, ("spec", "ports")),
+            "cluster_ip": self._get_deep(data, ("spec", "clusterIP"))
+        }
+        if service["type"] == "LoadBalancer":
+            ip_map_list = self._get_deep(data, ("status", "loadBalancer", "ingress"))
+            ip_list = [elem["ip"] for elem in ip_map_list]
+            service["external_ip"] = ip_list
+
+        # sync fs
+        self.fs.reverse_sync(from_path=cluster_id)
+
+        return service
+
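For reference, the dict returned by get_service() for a LoadBalancer service has this shape (keys taken from the code above, values illustrative):

    service = {
        "name": "myapp-mongodb",
        "type": "LoadBalancer",
        "ports": [{"name": "mongodb", "port": 27017, "protocol": "TCP"}],
        "cluster_ip": "10.96.0.42",
        "external_ip": ["192.168.1.20"],  # only set for LoadBalancer services
    }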
     async def synchronize_repos(self, cluster_uuid: str):
 
         _, cluster_id = self._get_namespace_cluster_id(cluster_uuid)
@@ -1097,7 +1225,7 @@ class K8sHelmConnector(K8sConnector):
                 self.log.debug("Task cancelled")
                 return
             except Exception as e:
-                self.log.debug("_store_status exception: {}".format(str(e)))
+                self.log.debug("_store_status exception: {}".format(str(e)), exc_info=True)
                 pass
             finally:
                 if run_once:
@@ -1152,6 +1280,37 @@ class K8sHelmConnector(K8sConnector):
 
         return ready
 
+    def _parse_helm_status_service_info(self, status):
+
+        # extract info.status.resources -> str
+        # format:
+        #       ==> v1/Deployment
+        #       NAME                    READY   UP-TO-DATE   AVAILABLE   AGE
+        #       halting-horse-mongodb   0/1     1            0           0s
+        #       halting-petit-mongodb   1/1     1            0           0s
+        # blank line
+        resources = K8sHelmConnector._get_deep(status, ("info", "status", "resources"))
+
+        service_list = []
+        first_line_skipped = service_found = False
+        for line in resources:
+            if not service_found:
+                if len(line) >= 2 and line[0] == "==>" and line[1] == "v1/Service":
+                    service_found = True
+                    continue
+            else:
+                if len(line) >= 2 and line[0] == "==>":
+                    service_found = first_line_skipped = False
+                    continue
+                if not line:
+                    continue
+                if not first_line_skipped:
+                    first_line_skipped = True
+                    continue
+                service_list.append(line[0])
+
+        return service_list
+
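An illustrative input/output pair for this parser, assuming each entry of resources is a list of whitespace-split tokens (which is what the line[0] == "==>" checks above require):

    status = {"info": {"status": {"resources": [
        ["==>", "v1/Service"],
        ["NAME", "TYPE", "CLUSTER-IP", "EXTERNAL-IP", "PORT(S)", "AGE"],
        ["halting-horse-mongodb", "ClusterIP", "10.96.0.42", "<none>", "27017/TCP", "0s"],
        [],
        ["==>", "v1/Deployment"],
    ]}}}
    # _parse_helm_status_service_info(status) -> ["halting-horse-mongodb"]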
     @staticmethod
     def _get_deep(dictionary: dict, members: tuple):
         target = dictionary
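_get_deep() is truncated by this hunk; a plausible completion consistent with how it is called above (a safe nested lookup that bails out when a key is missing; this is an assumption, not the committed body):

    @staticmethod
    def _get_deep(dictionary: dict, members: tuple):
        target = dictionary
        for m in members:
            if not isinstance(target, dict) or m not in target:
                return None
            target = target[m]
        return target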