Fix machine deletion when delete execution environment
[osm/N2VC.git] / n2vc / k8s_helm_conn.py
index 2b15d76..496e320 100644 (file)
@@ -20,7 +20,6 @@
 # contact with: nfvlabs@tid.es
 ##
 
-import paramiko
 import subprocess
 import os
 import shutil
@@ -30,6 +29,7 @@ import yaml
 from uuid import uuid4
 import random
 from n2vc.k8s_conn import K8sConnector
+from n2vc.exceptions import K8sException
 
 
 class K8sHelmConnector(K8sConnector):
@@ -101,6 +101,17 @@ class K8sHelmConnector(K8sConnector):
             namespace: str = 'kube-system',
             reuse_cluster_uuid=None
     ) -> (str, bool):
+        """
+        It prepares a given K8s cluster environment to run Charts on both sides:
+            client (OSM)
+            server (Tiller)
+
+        :param k8s_creds: credentials to access a given K8s cluster, i.e. a valid '.kube/config'
+        :param namespace: optional namespace to be used for helm. By default, 'kube-system' will be used
+        :param reuse_cluster_uuid: existing cluster uuid for reuse
+        :return: uuid of the K8s cluster and True if connector has installed some software in the cluster
+        (on error, an exception will be raised)
+        """
 
         cluster_uuid = reuse_cluster_uuid
         if not cluster_uuid:
@@ -258,7 +269,7 @@ class K8sHelmConnector(K8sConnector):
                 msg = 'Cluster has releases and not force. Cannot reset K8s environment. Cluster uuid: {}'\
                     .format(cluster_uuid)
                 self.error(msg)
-                raise Exception(msg)
+                raise K8sException(msg)
 
         if uninstall_sw:
 
@@ -280,7 +291,6 @@ class K8sHelmConnector(K8sConnector):
             else:
                 msg = 'Tiller deployment not found in cluster {}'.format(cluster_uuid)
                 self.error(msg)
-                # raise Exception(msg)
 
             self.debug('namespace for tiller: {}'.format(namespace))
 
@@ -316,7 +326,8 @@ class K8sHelmConnector(K8sConnector):
             atomic: bool = True,
             timeout: float = 300,
             params: dict = None,
-            db_dict: dict = None
+            db_dict: dict = None,
+            kdu_name: str = None
     ):
 
         self.debug('installing {} in cluster {}'.format(kdu_model, cluster_uuid))
@@ -346,7 +357,7 @@ class K8sHelmConnector(K8sConnector):
                 version_str = '--version {}'.format(parts[1])
                 kdu_model = parts[0]
 
-        # generate a name for the releas. Then, check if already exists
+        # generate a name for the release. Then, check if already exists
         kdu_instance = None
         while kdu_instance is None:
             kdu_instance = K8sHelmConnector._generate_release_name(kdu_model)
@@ -359,8 +370,8 @@ class K8sHelmConnector(K8sConnector):
                 if result is not None:
                     # instance already exists: generate a new one
                     kdu_instance = None
-            except Exception as e:
-                kdu_instance = None
+            except K8sException:
+                pass
 
         # helm repo install
         command = '{} install {} --output yaml --kubeconfig={} --home={} {} {} --name={} {} {}'\
@@ -373,6 +384,7 @@ class K8sHelmConnector(K8sConnector):
             exec_task = asyncio.ensure_future(
                 coro_or_future=self._local_async_exec(command=command, raise_exception_on_error=False)
             )
+
             # write status in another task
             status_task = asyncio.ensure_future(
                 coro_or_future=self._store_status(
@@ -413,7 +425,7 @@ class K8sHelmConnector(K8sConnector):
         if rc != 0:
             msg = 'Error executing command: {}\nOutput: {}'.format(command, output)
             self.error(msg)
-            raise Exception(msg)
+            raise K8sException(msg)
 
         self.debug('Returning kdu_instance {}'.format(kdu_instance))
         return kdu_instance
@@ -534,7 +546,7 @@ class K8sHelmConnector(K8sConnector):
         if rc != 0:
             msg = 'Error executing command: {}\nOutput: {}'.format(command, output)
             self.error(msg)
-            raise Exception(msg)
+            raise K8sException(msg)
 
         # return new revision number
         instance = await self.get_instance_info(cluster_uuid=cluster_uuid, kdu_instance=kdu_instance)
@@ -599,7 +611,7 @@ class K8sHelmConnector(K8sConnector):
         if rc != 0:
             msg = 'Error executing command: {}\nOutput: {}'.format(command, output)
             self.error(msg)
-            raise Exception(msg)
+            raise K8sException(msg)
 
         # return new revision number
         instance = await self.get_instance_info(cluster_uuid=cluster_uuid, kdu_instance=kdu_instance)
@@ -681,6 +693,72 @@ class K8sHelmConnector(K8sConnector):
             return_text=True
         )
 
+    async def synchronize_repos(self, cluster_uuid: str):
+
+        self.debug("syncronize repos for cluster helm-id: {}",)
+        try:
+            update_repos_timeout = 300 # max timeout to sync a single repos, more than this is too much
+            db_k8scluster = self.db.get_one("k8sclusters", {"_admin.helm-chart.id": cluster_uuid})
+            if db_k8scluster:
+                nbi_repo_list = db_k8scluster.get("_admin").get("helm_chart_repos") or []
+                cluster_repo_dict = db_k8scluster.get("_admin").get("helm_charts_added") or {}
+                # elements that must be deleted
+                deleted_repo_list = []
+                added_repo_dict = {}
+                self.debug("helm_chart_repos: {}".format(nbi_repo_list))
+                self.debug("helm_charts_added: {}".format(cluster_repo_dict))
+
+                # obtain repos to add: registered by nbi but not added
+                repos_to_add = [repo for repo in nbi_repo_list if not cluster_repo_dict.get(repo)]
+
+                # obtain repos to delete: added by cluster but not in nbi list
+                repos_to_delete = [repo for repo in cluster_repo_dict.keys() if repo not in nbi_repo_list]
+
+                # delete repos: must delete first then add because there may be different repos with same name but
+                # different id and url
+                self.debug("repos to delete: {}".format(repos_to_delete))
+                for repo_id in repos_to_delete:
+                    # try to delete repos
+                    try:
+                        repo_delete_task = asyncio.ensure_future(self.repo_remove(cluster_uuid=cluster_uuid,
+                                                                                  name=cluster_repo_dict[repo_id]))
+                        await asyncio.wait_for(repo_delete_task, update_repos_timeout)
+                    except Exception as e:
+                        self.warning("Error deleting repo, id: {}, name: {}, err_msg: {}".format(repo_id,
+                                cluster_repo_dict[repo_id], str(e)))
+                    # always add to the list of to_delete if there is an error because if is not there deleting raises error
+                    deleted_repo_list.append(repo_id)
+
+                # add repos
+                self.debug("repos to add: {}".format(repos_to_add))
+                add_task_list = []
+                for repo_id in repos_to_add:
+                    # obtain the repo data from the db
+                    # if there is an error getting the repo in the database we will ignore this repo and continue
+                    # because there is a possible race condition where the repo has been deleted while processing
+                    db_repo = self.db.get_one("k8srepos", {"_id": repo_id})
+                    self.debug("obtained repo: id, {}, name: {}, url: {}".format(repo_id, db_repo["name"], db_repo["url"]))
+                    try:
+                        repo_add_task = asyncio.ensure_future(self.repo_add(cluster_uuid=cluster_uuid,
+                                                         name=db_repo["name"], url=db_repo["url"],
+                                                         repo_type="chart"))
+                        await asyncio.wait_for(repo_add_task, update_repos_timeout)
+                        added_repo_dict[repo_id] = db_repo["name"]
+                        self.debug("added repo: id, {}, name: {}".format(repo_id, db_repo["name"]))
+                    except Exception as e:
+                        # deal with error adding repo, adding a repo that already exists does not raise any error
+                        # will not raise error because a wrong repos added by anyone could prevent instantiating any ns
+                        self.error("Error adding repo id: {}, err_msg: {} ".format(repo_id, repr(e)))
+
+                return deleted_repo_list, added_repo_dict
+
+            else: # else db_k8scluster does not exist
+                raise K8sException("k8cluster with helm-id : {} not found".format(cluster_uuid))
+
+        except Exception as e:
+            self.error("Error synchronizing repos: {}".format(str(e)))
+            raise K8sException("Error synchronizing repos")
+
     """
     ##################################################################################################
     ########################################## P R I V A T E #########################################
@@ -770,6 +848,15 @@ class K8sHelmConnector(K8sConnector):
     def _generate_release_name(
             chart_name: str
     ):
+        # check embeded chart (file or dir)
+        if chart_name.startswith('/'):
+            # extract file or directory name
+            chart_name = chart_name[chart_name.rfind('/')+1:]
+        # check URL
+        elif '://' in chart_name:
+            # extract last portion of URL
+            chart_name = chart_name[chart_name.rfind('/')+1:]
+
         name = ''
         for c in chart_name:
             if c.isalpha() or c.isnumeric():
@@ -788,7 +875,7 @@ class K8sHelmConnector(K8sConnector):
         def get_random_number():
             r = random.randrange(start=1, stop=99999999)
             s = str(r)
-            s = s.rjust(width=10, fillchar=' ')
+            s = s.rjust(10, '0')
             return s
 
         name = name + get_random_number()
@@ -808,10 +895,8 @@ class K8sHelmConnector(K8sConnector):
                 await asyncio.sleep(check_every)
                 detailed_status = await self.status_kdu(cluster_uuid=cluster_uuid, kdu_instance=kdu_instance)
                 status = detailed_status.get('info').get('Description')
-                print('=' * 60)
                 self.debug('STATUS:\n{}'.format(status))
                 self.debug('DETAILED STATUS:\n{}'.format(detailed_status))
-                print('=' * 60)
                 # write status to db
                 result = await self.write_app_status_to_db(
                     db_dict=db_dict,
@@ -825,6 +910,7 @@ class K8sHelmConnector(K8sConnector):
                 self.debug('Task cancelled')
                 return
             except Exception as e:
+                self.debug('_store_status exception: {}'.format(str(e)))
                 pass
             finally:
                 if run_once:
@@ -1004,7 +1090,7 @@ class K8sHelmConnector(K8sConnector):
         if not os.path.exists(cluster_dir):
             msg = 'Base cluster dir {} does not exist'.format(cluster_dir)
             self.error(msg)
-            raise Exception(msg)
+            raise K8sException(msg)
 
         # kube dir
         kube_dir = cluster_dir + '/' + '.kube'
@@ -1014,7 +1100,7 @@ class K8sHelmConnector(K8sConnector):
         if not os.path.exists(kube_dir):
             msg = 'Kube config dir {} does not exist'.format(kube_dir)
             self.error(msg)
-            raise Exception(msg)
+            raise K8sException(msg)
 
         # helm home dir
         helm_dir = cluster_dir + '/' + '.helm'
@@ -1024,7 +1110,7 @@ class K8sHelmConnector(K8sConnector):
         if not os.path.exists(helm_dir):
             msg = 'Helm config dir {} does not exist'.format(helm_dir)
             self.error(msg)
-            raise Exception(msg)
+            raise K8sException(msg)
 
         config_filename = kube_dir + '/config'
         return kube_dir, helm_dir, config_filename, cluster_dir
@@ -1093,7 +1179,7 @@ class K8sHelmConnector(K8sConnector):
                 self.debug('Return code: {}'.format(return_code))
 
             if raise_exception_on_error and return_code != 0:
-                raise Exception(output)
+                raise K8sException(output)
 
             if encode_utf8:
                 output = output.encode('utf-8').strip()
@@ -1101,39 +1187,17 @@ class K8sHelmConnector(K8sConnector):
 
             return output, return_code
 
+        except asyncio.CancelledError:
+            raise
+        except K8sException:
+            raise
         except Exception as e:
             msg = 'Exception executing command: {} -> {}'.format(command, e)
-            if show_error_log:
-                self.error(msg)
-            return '', -1
-
-    def _remote_exec(
-            self,
-            hostname: str,
-            username: str,
-            password: str,
-            command: str,
-            timeout: int = 10
-    ) -> (str, int):
-
-        command = K8sHelmConnector._remove_multiple_spaces(command)
-        self.debug('Executing sync remote ssh command: {}'.format(command))
-
-        ssh = paramiko.SSHClient()
-        ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
-        ssh.connect(hostname=hostname, username=username, password=password)
-        ssh_stdin, ssh_stdout, ssh_stderr = ssh.exec_command(command=command, timeout=timeout)
-        output = ssh_stdout.read().decode('utf-8')
-        error = ssh_stderr.read().decode('utf-8')
-        if error:
-            self.error('ERROR: {}'.format(error))
-            return_code = 1
-        else:
-            return_code = 0
-        output = output.replace('\\n', '\n')
-        self.debug('OUTPUT: {}'.format(output))
-
-        return output, return_code
+            self.error(msg)
+            if raise_exception_on_error:
+                raise K8sException(e) from e
+            else:
+                return '', -1
 
     def _check_file_exists(self, filename: str, exception_if_not_exists: bool = False):
         self.debug('Checking if file {} exists...'.format(filename))
@@ -1143,4 +1207,6 @@ class K8sHelmConnector(K8sConnector):
             msg = 'File {} does not exist'.format(filename)
             if exception_if_not_exists:
                 self.error(msg)
-                raise Exception(msg)
+                raise K8sException(msg)
+
+