K8s action support
[osm/N2VC.git] / n2vc / k8s_helm_conn.py
index 88c94c5..d3fbed6 100644 (file)
 # contact with: nfvlabs@tid.es
 ##
 
 # contact with: nfvlabs@tid.es
 ##
 
-import paramiko
 import subprocess
 import os
 import shutil
 import asyncio
 import subprocess
 import os
 import shutil
 import asyncio
-import uuid
 import time
 import yaml
 from uuid import uuid4
 import random
 from n2vc.k8s_conn import K8sConnector
 import time
 import yaml
 from uuid import uuid4
 import random
 from n2vc.k8s_conn import K8sConnector
+from n2vc.exceptions import K8sException
 
 
 class K8sHelmConnector(K8sConnector):
 
 
 class K8sHelmConnector(K8sConnector):
@@ -68,7 +67,7 @@ class K8sHelmConnector(K8sConnector):
             on_update_db=on_update_db
         )
 
             on_update_db=on_update_db
         )
 
-        self.info('Initializing K8S Helm connector')
+        self.log.info('Initializing K8S Helm connector')
 
         # random numbers for release name generation
         random.seed(time.time())
 
         # random numbers for release name generation
         random.seed(time.time())
@@ -84,7 +83,17 @@ class K8sHelmConnector(K8sConnector):
         self._helm_command = helm_command
         self._check_file_exists(filename=helm_command, exception_if_not_exists=True)
 
         self._helm_command = helm_command
         self._check_file_exists(filename=helm_command, exception_if_not_exists=True)
 
-        self.info('K8S Helm connector initialized')
+        # initialize helm client-only
+        self.log.debug('Initializing helm client-only...')
+        command = '{} init --client-only'.format(self._helm_command)
+        try:
+            asyncio.ensure_future(self._local_async_exec(command=command, raise_exception_on_error=False))
+            # loop = asyncio.get_event_loop()
+            # loop.run_until_complete(self._local_async_exec(command=command, raise_exception_on_error=False))
+        except Exception as e:
+            self.warning(msg='helm init failed (it was already initialized): {}'.format(e))
+
+        self.log.info('K8S Helm connector initialized')
 
     async def init_env(
             self,
 
     async def init_env(
             self,
@@ -92,15 +101,27 @@ class K8sHelmConnector(K8sConnector):
             namespace: str = 'kube-system',
             reuse_cluster_uuid=None
     ) -> (str, bool):
             namespace: str = 'kube-system',
             reuse_cluster_uuid=None
     ) -> (str, bool):
+        """
+        It prepares a given K8s cluster environment to run Charts on both sides:
+            client (OSM)
+            server (Tiller)
+
+        :param k8s_creds: credentials to access a given K8s cluster, i.e. a valid '.kube/config'
+        :param namespace: optional namespace to be used for helm. By default, 'kube-system' will be used
+        :param reuse_cluster_uuid: existing cluster uuid for reuse
+        :return: uuid of the K8s cluster and True if connector has installed some software in the cluster
+        (on error, an exception will be raised)
+        """
 
         cluster_uuid = reuse_cluster_uuid
         if not cluster_uuid:
             cluster_uuid = str(uuid4())
 
 
         cluster_uuid = reuse_cluster_uuid
         if not cluster_uuid:
             cluster_uuid = str(uuid4())
 
-        self.debug('Initializing K8S environment. namespace: {}'.format(namespace))
+        self.log.debug('Initializing K8S environment. namespace: {}'.format(namespace))
 
         # create config filename
 
         # create config filename
-        kube_dir, helm_dir, config_filename = self._get_paths(cluster_name=cluster_uuid, create_if_not_exist=True)
+        kube_dir, helm_dir, config_filename, cluster_dir = \
+            self._get_paths(cluster_name=cluster_uuid, create_if_not_exist=True)
         f = open(config_filename, "w")
         f.write(k8s_creds)
         f.close()
         f = open(config_filename, "w")
         f.write(k8s_creds)
         f.close()
@@ -125,7 +146,7 @@ class K8sHelmConnector(K8sConnector):
         # helm init
         n2vc_installed_sw = False
         if not already_initialized:
         # helm init
         n2vc_installed_sw = False
         if not already_initialized:
-            self.info('Initializing helm in client and server: {}'.format(cluster_uuid))
+            self.log.info('Initializing helm in client and server: {}'.format(cluster_uuid))
             command = '{} --kubeconfig={} --tiller-namespace={} --home={} init'\
                 .format(self._helm_command, config_filename, namespace, helm_dir)
             output, rc = await self._local_async_exec(command=command, raise_exception_on_error=True)
             command = '{} --kubeconfig={} --tiller-namespace={} --home={} init'\
                 .format(self._helm_command, config_filename, namespace, helm_dir)
             output, rc = await self._local_async_exec(command=command, raise_exception_on_error=True)
@@ -134,14 +155,14 @@ class K8sHelmConnector(K8sConnector):
             # check client helm installation
             check_file = helm_dir + '/repository/repositories.yaml'
             if not self._check_file_exists(filename=check_file, exception_if_not_exists=False):
             # check client helm installation
             check_file = helm_dir + '/repository/repositories.yaml'
             if not self._check_file_exists(filename=check_file, exception_if_not_exists=False):
-                self.info('Initializing helm in client: {}'.format(cluster_uuid))
+                self.log.info('Initializing helm in client: {}'.format(cluster_uuid))
                 command = '{} --kubeconfig={} --tiller-namespace={} --home={} init --client-only'\
                     .format(self._helm_command, config_filename, namespace, helm_dir)
                 output, rc = await self._local_async_exec(command=command, raise_exception_on_error=True)
             else:
                 command = '{} --kubeconfig={} --tiller-namespace={} --home={} init --client-only'\
                     .format(self._helm_command, config_filename, namespace, helm_dir)
                 output, rc = await self._local_async_exec(command=command, raise_exception_on_error=True)
             else:
-                self.info('Helm client already initialized')
+                self.log.info('Helm client already initialized')
 
 
-        self.info('Cluster initialized {}'.format(cluster_uuid))
+        self.log.info('Cluster initialized {}'.format(cluster_uuid))
 
         return cluster_uuid, n2vc_installed_sw
 
 
         return cluster_uuid, n2vc_installed_sw
 
@@ -153,20 +174,21 @@ class K8sHelmConnector(K8sConnector):
             repo_type: str = 'chart'
     ):
 
             repo_type: str = 'chart'
     ):
 
-        self.debug('adding {} repository {}. URL: {}'.format(repo_type, name, url))
+        self.log.debug('adding {} repository {}. URL: {}'.format(repo_type, name, url))
 
         # config filename
 
         # config filename
-        kube_dir, helm_dir, config_filename = self._get_paths(cluster_name=cluster_uuid, create_if_not_exist=True)
+        kube_dir, helm_dir, config_filename, cluster_dir = \
+            self._get_paths(cluster_name=cluster_uuid, create_if_not_exist=True)
 
         # helm repo update
         command = '{} --kubeconfig={} --home={} repo update'.format(self._helm_command, config_filename, helm_dir)
 
         # helm repo update
         command = '{} --kubeconfig={} --home={} repo update'.format(self._helm_command, config_filename, helm_dir)
-        self.debug('updating repo: {}'.format(command))
+        self.log.debug('updating repo: {}'.format(command))
         await self._local_async_exec(command=command, raise_exception_on_error=False)
 
         # helm repo add name url
         command = '{} --kubeconfig={} --home={} repo add {} {}'\
             .format(self._helm_command, config_filename, helm_dir, name, url)
         await self._local_async_exec(command=command, raise_exception_on_error=False)
 
         # helm repo add name url
         command = '{} --kubeconfig={} --home={} repo add {} {}'\
             .format(self._helm_command, config_filename, helm_dir, name, url)
-        self.debug('adding repo: {}'.format(command))
+        self.log.debug('adding repo: {}'.format(command))
         await self._local_async_exec(command=command, raise_exception_on_error=True)
 
     async def repo_list(
         await self._local_async_exec(command=command, raise_exception_on_error=True)
 
     async def repo_list(
@@ -179,12 +201,14 @@ class K8sHelmConnector(K8sConnector):
         :return: list of registered repositories: [ (name, url) .... ]
         """
 
         :return: list of registered repositories: [ (name, url) .... ]
         """
 
-        self.debug('list repositories for cluster {}'.format(cluster_uuid))
+        self.log.debug('list repositories for cluster {}'.format(cluster_uuid))
 
         # config filename
 
         # config filename
-        kube_dir, helm_dir, config_filename = self._get_paths(cluster_name=cluster_uuid, create_if_not_exist=True)
+        kube_dir, helm_dir, config_filename, cluster_dir = \
+            self._get_paths(cluster_name=cluster_uuid, create_if_not_exist=True)
 
 
-        command = '{} --kubeconfig={} --home={} repo list --output yaml'.format(self._helm_command, config_filename, helm_dir)
+        command = '{} --kubeconfig={} --home={} repo list --output yaml'\
+            .format(self._helm_command, config_filename, helm_dir)
 
         output, rc = await self._local_async_exec(command=command, raise_exception_on_error=True)
         if output and len(output) > 0:
 
         output, rc = await self._local_async_exec(command=command, raise_exception_on_error=True)
         if output and len(output) > 0:
@@ -205,10 +229,11 @@ class K8sHelmConnector(K8sConnector):
         :return: True if successful
         """
 
         :return: True if successful
         """
 
-        self.debug('list repositories for cluster {}'.format(cluster_uuid))
+        self.log.debug('list repositories for cluster {}'.format(cluster_uuid))
 
         # config filename
 
         # config filename
-        kube_dir, helm_dir, config_filename = self._get_paths(cluster_name=cluster_uuid, create_if_not_exist=True)
+        kube_dir, helm_dir, config_filename, cluster_dir = \
+            self._get_paths(cluster_name=cluster_uuid, create_if_not_exist=True)
 
         command = '{} --kubeconfig={} --home={} repo remove {}'\
             .format(self._helm_command, config_filename, helm_dir, name)
 
         command = '{} --kubeconfig={} --home={} repo remove {}'\
             .format(self._helm_command, config_filename, helm_dir, name)
@@ -222,10 +247,11 @@ class K8sHelmConnector(K8sConnector):
             uninstall_sw: bool = False
     ) -> bool:
 
             uninstall_sw: bool = False
     ) -> bool:
 
-        self.debug('Resetting K8s environment. cluster uuid: {}'.format(cluster_uuid))
+        self.log.debug('Resetting K8s environment. cluster uuid: {}'.format(cluster_uuid))
 
         # get kube and helm directories
 
         # get kube and helm directories
-        kube_dir, helm_dir, config_filename = self._get_paths(cluster_name=cluster_uuid, create_if_not_exist=False)
+        kube_dir, helm_dir, config_filename, cluster_dir = \
+            self._get_paths(cluster_name=cluster_uuid, create_if_not_exist=False)
 
         # uninstall releases if needed
         releases = await self.instances_list(cluster_uuid=cluster_uuid)
 
         # uninstall releases if needed
         releases = await self.instances_list(cluster_uuid=cluster_uuid)
@@ -235,19 +261,19 @@ class K8sHelmConnector(K8sConnector):
                     try:
                         kdu_instance = r.get('Name')
                         chart = r.get('Chart')
                     try:
                         kdu_instance = r.get('Name')
                         chart = r.get('Chart')
-                        self.debug('Uninstalling {} -> {}'.format(chart, kdu_instance))
+                        self.log.debug('Uninstalling {} -> {}'.format(chart, kdu_instance))
                         await self.uninstall(cluster_uuid=cluster_uuid, kdu_instance=kdu_instance)
                     except Exception as e:
                         await self.uninstall(cluster_uuid=cluster_uuid, kdu_instance=kdu_instance)
                     except Exception as e:
-                        self.error('Error uninstalling release {}: {}'.format(kdu_instance, e))
+                        self.log.error('Error uninstalling release {}: {}'.format(kdu_instance, e))
             else:
                 msg = 'Cluster has releases and not force. Cannot reset K8s environment. Cluster uuid: {}'\
                     .format(cluster_uuid)
             else:
                 msg = 'Cluster has releases and not force. Cannot reset K8s environment. Cluster uuid: {}'\
                     .format(cluster_uuid)
-                self.error(msg)
-                raise Exception(msg)
+                self.log.error(msg)
+                raise K8sException(msg)
 
         if uninstall_sw:
 
 
         if uninstall_sw:
 
-            self.debug('Uninstalling tiller from cluster {}'.format(cluster_uuid))
+            self.log.debug('Uninstalling tiller from cluster {}'.format(cluster_uuid))
 
             # find namespace for tiller pod
             command = '{} --kubeconfig={} get deployments --all-namespaces'\
 
             # find namespace for tiller pod
             command = '{} --kubeconfig={} get deployments --all-namespaces'\
@@ -264,32 +290,31 @@ class K8sHelmConnector(K8sConnector):
                     pass
             else:
                 msg = 'Tiller deployment not found in cluster {}'.format(cluster_uuid)
                     pass
             else:
                 msg = 'Tiller deployment not found in cluster {}'.format(cluster_uuid)
-                self.error(msg)
-                # raise Exception(msg)
+                self.log.error(msg)
 
 
-            self.debug('namespace for tiller: {}'.format(namespace))
+            self.log.debug('namespace for tiller: {}'.format(namespace))
 
             force_str = '--force'
 
             if namespace:
                 # delete tiller deployment
 
             force_str = '--force'
 
             if namespace:
                 # delete tiller deployment
-                self.debug('Deleting tiller deployment for cluster {}, namespace {}'.format(cluster_uuid, namespace))
+                self.log.debug('Deleting tiller deployment for cluster {}, namespace {}'.format(cluster_uuid, namespace))
                 command = '{} --namespace {} --kubeconfig={} {} delete deployment tiller-deploy'\
                     .format(self.kubectl_command, namespace, config_filename, force_str)
                 await self._local_async_exec(command=command, raise_exception_on_error=False)
 
                 # uninstall tiller from cluster
                 command = '{} --namespace {} --kubeconfig={} {} delete deployment tiller-deploy'\
                     .format(self.kubectl_command, namespace, config_filename, force_str)
                 await self._local_async_exec(command=command, raise_exception_on_error=False)
 
                 # uninstall tiller from cluster
-                self.debug('Uninstalling tiller from cluster {}'.format(cluster_uuid))
+                self.log.debug('Uninstalling tiller from cluster {}'.format(cluster_uuid))
                 command = '{} --kubeconfig={} --home={} reset'\
                     .format(self._helm_command, config_filename, helm_dir)
                 command = '{} --kubeconfig={} --home={} reset'\
                     .format(self._helm_command, config_filename, helm_dir)
-                self.debug('resetting: {}'.format(command))
+                self.log.debug('resetting: {}'.format(command))
                 output, rc = await self._local_async_exec(command=command, raise_exception_on_error=True)
             else:
                 output, rc = await self._local_async_exec(command=command, raise_exception_on_error=True)
             else:
-                self.debug('namespace not found')
+                self.log.debug('namespace not found')
 
         # delete cluster directory
         dir = self.fs.path + '/' + cluster_uuid
 
         # delete cluster directory
         dir = self.fs.path + '/' + cluster_uuid
-        self.debug('Removing directory {}'.format(dir))
+        self.log.debug('Removing directory {}'.format(dir))
         shutil.rmtree(dir, ignore_errors=True)
 
         return True
         shutil.rmtree(dir, ignore_errors=True)
 
         return True
@@ -301,19 +326,20 @@ class K8sHelmConnector(K8sConnector):
             atomic: bool = True,
             timeout: float = 300,
             params: dict = None,
             atomic: bool = True,
             timeout: float = 300,
             params: dict = None,
-            db_dict: dict = None
+            db_dict: dict = None,
+            kdu_name: str = None,
+            namespace: str = None
     ):
 
     ):
 
-        self.debug('installing {} in cluster {}'.format(kdu_model, cluster_uuid))
-
-        start = time.time()
-        end = start + timeout
+        self.log.debug('installing {} in cluster {}'.format(kdu_model, cluster_uuid))
 
         # config filename
 
         # config filename
-        kube_dir, helm_dir, config_filename = self._get_paths(cluster_name=cluster_uuid, create_if_not_exist=True)
+        kube_dir, helm_dir, config_filename, cluster_dir = \
+            self._get_paths(cluster_name=cluster_uuid, create_if_not_exist=True)
 
         # params to str
 
         # params to str
-        params_str = K8sHelmConnector._params_to_set_option(params)
+        # params_str = K8sHelmConnector._params_to_set_option(params)
+        params_str, file_to_delete = self._params_to_file_option(cluster_uuid=cluster_uuid, params=params)
 
         timeout_str = ''
         if timeout:
 
         timeout_str = ''
         if timeout:
@@ -323,6 +349,10 @@ class K8sHelmConnector(K8sConnector):
         atomic_str = ''
         if atomic:
             atomic_str = '--atomic'
         atomic_str = ''
         if atomic:
             atomic_str = '--atomic'
+        # namespace
+        namespace_str = ''
+        if namespace:
+            namespace_str = "--namespace {}".format(namespace)
 
         # version
         version_str = ''
 
         # version
         version_str = ''
@@ -332,7 +362,7 @@ class K8sHelmConnector(K8sConnector):
                 version_str = '--version {}'.format(parts[1])
                 kdu_model = parts[0]
 
                 version_str = '--version {}'.format(parts[1])
                 kdu_model = parts[0]
 
-        # generate a name for the releas. Then, check if already exists
+        # generate a name for the release. Then, check if already exists
         kdu_instance = None
         while kdu_instance is None:
             kdu_instance = K8sHelmConnector._generate_release_name(kdu_model)
         kdu_instance = None
         while kdu_instance is None:
             kdu_instance = K8sHelmConnector._generate_release_name(kdu_model)
@@ -345,20 +375,23 @@ class K8sHelmConnector(K8sConnector):
                 if result is not None:
                     # instance already exists: generate a new one
                     kdu_instance = None
                 if result is not None:
                     # instance already exists: generate a new one
                     kdu_instance = None
-            except:
-                kdu_instance = None
+            except K8sException:
+                pass
 
         # helm repo install
 
         # helm repo install
-        command = '{} install {} --output yaml --kubeconfig={} --home={} {} {} --name={} {} {}'\
-            .format(self._helm_command, atomic_str, config_filename, helm_dir,
-                    params_str, timeout_str, kdu_instance, kdu_model, version_str)
-        self.debug('installing: {}'.format(command))
+        command = '{helm} install {atomic} --output yaml --kubeconfig={config} --home={dir} {params} {timeout} ' \
+                  '--name={name} {ns} {model} {ver}'.format(helm=self._helm_command, atomic=atomic_str,
+                                                            config=config_filename, dir=helm_dir, params=params_str,
+                                                            timeout=timeout_str, name=kdu_instance, ns=namespace_str,
+                                                            model=kdu_model, ver=version_str)
+        self.log.debug('installing: {}'.format(command))
 
         if atomic:
             # exec helm in a task
             exec_task = asyncio.ensure_future(
                 coro_or_future=self._local_async_exec(command=command, raise_exception_on_error=False)
             )
 
         if atomic:
             # exec helm in a task
             exec_task = asyncio.ensure_future(
                 coro_or_future=self._local_async_exec(command=command, raise_exception_on_error=False)
             )
+
             # write status in another task
             status_task = asyncio.ensure_future(
                 coro_or_future=self._store_status(
             # write status in another task
             status_task = asyncio.ensure_future(
                 coro_or_future=self._store_status(
@@ -382,6 +415,10 @@ class K8sHelmConnector(K8sConnector):
 
             output, rc = await self._local_async_exec(command=command, raise_exception_on_error=False)
 
 
             output, rc = await self._local_async_exec(command=command, raise_exception_on_error=False)
 
+        # remove temporal values yaml file
+        if file_to_delete:
+            os.remove(file_to_delete)
+
         # write final status
         await self._store_status(
             cluster_uuid=cluster_uuid,
         # write final status
         await self._store_status(
             cluster_uuid=cluster_uuid,
@@ -394,10 +431,10 @@ class K8sHelmConnector(K8sConnector):
 
         if rc != 0:
             msg = 'Error executing command: {}\nOutput: {}'.format(command, output)
 
         if rc != 0:
             msg = 'Error executing command: {}\nOutput: {}'.format(command, output)
-            self.error(msg)
-            raise Exception(msg)
+            self.log.error(msg)
+            raise K8sException(msg)
 
 
-        self.debug('Returning kdu_instance {}'.format(kdu_instance))
+        self.log.debug('Returning kdu_instance {}'.format(kdu_instance))
         return kdu_instance
 
     async def instances_list(
         return kdu_instance
 
     async def instances_list(
@@ -411,10 +448,11 @@ class K8sHelmConnector(K8sConnector):
         :return:
         """
 
         :return:
         """
 
-        self.debug('list releases for cluster {}'.format(cluster_uuid))
+        self.log.debug('list releases for cluster {}'.format(cluster_uuid))
 
         # config filename
 
         # config filename
-        kube_dir, helm_dir, config_filename = self._get_paths(cluster_name=cluster_uuid, create_if_not_exist=True)
+        kube_dir, helm_dir, config_filename, cluster_dir = \
+            self._get_paths(cluster_name=cluster_uuid, create_if_not_exist=True)
 
         command = '{} --kubeconfig={} --home={} list --output yaml'\
             .format(self._helm_command, config_filename, helm_dir)
 
         command = '{} --kubeconfig={} --home={} list --output yaml'\
             .format(self._helm_command, config_filename, helm_dir)
@@ -437,16 +475,15 @@ class K8sHelmConnector(K8sConnector):
             db_dict: dict = None
     ):
 
             db_dict: dict = None
     ):
 
-        self.debug('upgrading {} in cluster {}'.format(kdu_model, cluster_uuid))
-
-        start = time.time()
-        end = start + timeout
+        self.log.debug('upgrading {} in cluster {}'.format(kdu_model, cluster_uuid))
 
         # config filename
 
         # config filename
-        kube_dir, helm_dir, config_filename = self._get_paths(cluster_name=cluster_uuid, create_if_not_exist=True)
+        kube_dir, helm_dir, config_filename, cluster_dir = \
+            self._get_paths(cluster_name=cluster_uuid, create_if_not_exist=True)
 
         # params to str
 
         # params to str
-        params_str = K8sHelmConnector._params_to_set_option(params)
+        # params_str = K8sHelmConnector._params_to_set_option(params)
+        params_str, file_to_delete = self._params_to_file_option(cluster_uuid=cluster_uuid, params=params)
 
         timeout_str = ''
         if timeout:
 
         timeout_str = ''
         if timeout:
@@ -459,7 +496,7 @@ class K8sHelmConnector(K8sConnector):
 
         # version
         version_str = ''
 
         # version
         version_str = ''
-        if ':' in kdu_model:
+        if kdu_model and ':' in kdu_model:
             parts = kdu_model.split(sep=':')
             if len(parts) == 2:
                 version_str = '--version {}'.format(parts[1])
             parts = kdu_model.split(sep=':')
             if len(parts) == 2:
                 version_str = '--version {}'.format(parts[1])
@@ -469,7 +506,7 @@ class K8sHelmConnector(K8sConnector):
         command = '{} upgrade {} --output yaml --kubeconfig={} --home={} {} {} {} {} {}'\
             .format(self._helm_command, atomic_str, config_filename, helm_dir,
                     params_str, timeout_str, kdu_instance, kdu_model, version_str)
         command = '{} upgrade {} --output yaml --kubeconfig={} --home={} {} {} {} {} {}'\
             .format(self._helm_command, atomic_str, config_filename, helm_dir,
                     params_str, timeout_str, kdu_instance, kdu_model, version_str)
-        self.debug('upgrading: {}'.format(command))
+        self.log.debug('upgrading: {}'.format(command))
 
         if atomic:
 
 
         if atomic:
 
@@ -489,7 +526,7 @@ class K8sHelmConnector(K8sConnector):
             )
 
             # wait for execution task
             )
 
             # wait for execution task
-            await asyncio.wait([ exec_task ])
+            await asyncio.wait([exec_task])
 
             # cancel status task
             status_task.cancel()
 
             # cancel status task
             status_task.cancel()
@@ -499,6 +536,10 @@ class K8sHelmConnector(K8sConnector):
 
             output, rc = await self._local_async_exec(command=command, raise_exception_on_error=False)
 
 
             output, rc = await self._local_async_exec(command=command, raise_exception_on_error=False)
 
+        # remove temporal values yaml file
+        if file_to_delete:
+            os.remove(file_to_delete)
+
         # write final status
         await self._store_status(
             cluster_uuid=cluster_uuid,
         # write final status
         await self._store_status(
             cluster_uuid=cluster_uuid,
@@ -511,14 +552,14 @@ class K8sHelmConnector(K8sConnector):
 
         if rc != 0:
             msg = 'Error executing command: {}\nOutput: {}'.format(command, output)
 
         if rc != 0:
             msg = 'Error executing command: {}\nOutput: {}'.format(command, output)
-            self.error(msg)
-            raise Exception(msg)
+            self.log.error(msg)
+            raise K8sException(msg)
 
         # return new revision number
         instance = await self.get_instance_info(cluster_uuid=cluster_uuid, kdu_instance=kdu_instance)
         if instance:
             revision = int(instance.get('Revision'))
 
         # return new revision number
         instance = await self.get_instance_info(cluster_uuid=cluster_uuid, kdu_instance=kdu_instance)
         if instance:
             revision = int(instance.get('Revision'))
-            self.debug('New revision: {}'.format(revision))
+            self.log.debug('New revision: {}'.format(revision))
             return revision
         else:
             return 0
             return revision
         else:
             return 0
@@ -531,11 +572,12 @@ class K8sHelmConnector(K8sConnector):
             db_dict: dict = None
     ):
 
             db_dict: dict = None
     ):
 
-        self.debug('rollback kdu_instance {} to revision {} from cluster {}'
+        self.log.debug('rollback kdu_instance {} to revision {} from cluster {}'
                    .format(kdu_instance, revision, cluster_uuid))
 
         # config filename
                    .format(kdu_instance, revision, cluster_uuid))
 
         # config filename
-        kube_dir, helm_dir, config_filename = self._get_paths(cluster_name=cluster_uuid, create_if_not_exist=True)
+        kube_dir, helm_dir, config_filename, cluster_dir = \
+            self._get_paths(cluster_name=cluster_uuid, create_if_not_exist=True)
 
         command = '{} rollback --kubeconfig={} --home={} {} {} --wait'\
             .format(self._helm_command, config_filename, helm_dir, kdu_instance, revision)
 
         command = '{} rollback --kubeconfig={} --home={} {} {} --wait'\
             .format(self._helm_command, config_filename, helm_dir, kdu_instance, revision)
@@ -575,14 +617,14 @@ class K8sHelmConnector(K8sConnector):
 
         if rc != 0:
             msg = 'Error executing command: {}\nOutput: {}'.format(command, output)
 
         if rc != 0:
             msg = 'Error executing command: {}\nOutput: {}'.format(command, output)
-            self.error(msg)
-            raise Exception(msg)
+            self.log.error(msg)
+            raise K8sException(msg)
 
         # return new revision number
         instance = await self.get_instance_info(cluster_uuid=cluster_uuid, kdu_instance=kdu_instance)
         if instance:
             revision = int(instance.get('Revision'))
 
         # return new revision number
         instance = await self.get_instance_info(cluster_uuid=cluster_uuid, kdu_instance=kdu_instance)
         if instance:
             revision = int(instance.get('Revision'))
-            self.debug('New revision: {}'.format(revision))
+            self.log.debug('New revision: {}'.format(revision))
             return revision
         else:
             return 0
             return revision
         else:
             return 0
@@ -601,10 +643,11 @@ class K8sHelmConnector(K8sConnector):
         :return: True if successful
         """
 
         :return: True if successful
         """
 
-        self.debug('uninstall kdu_instance {} from cluster {}'.format(kdu_instance, cluster_uuid))
+        self.log.debug('uninstall kdu_instance {} from cluster {}'.format(kdu_instance, cluster_uuid))
 
         # config filename
 
         # config filename
-        kube_dir, helm_dir, config_filename = self._get_paths(cluster_name=cluster_uuid, create_if_not_exist=True)
+        kube_dir, helm_dir, config_filename, cluster_dir = \
+            self._get_paths(cluster_name=cluster_uuid, create_if_not_exist=True)
 
         command = '{} --kubeconfig={} --home={} delete --purge {}'\
             .format(self._helm_command, config_filename, helm_dir, kdu_instance)
 
         command = '{} --kubeconfig={} --home={} delete --purge {}'\
             .format(self._helm_command, config_filename, helm_dir, kdu_instance)
@@ -613,42 +656,138 @@ class K8sHelmConnector(K8sConnector):
 
         return self._output_to_table(output)
 
 
         return self._output_to_table(output)
 
+    async def exec_primitive(
+        self,
+        cluster_uuid: str = None,
+        kdu_instance: str = None,
+        primitive_name: str = None,
+        timeout: float = 300,
+        params: dict = None,
+        db_dict: dict = None,
+    ) -> str:
+        """Exec primitive (Juju action)
+
+        :param cluster_uuid str: The UUID of the cluster
+        :param kdu_instance str: The unique name of the KDU instance
+        :param primitive_name: Name of action that will be executed
+        :param timeout: Timeout for action execution
+        :param params: Dictionary of all the parameters needed for the action
+        :db_dict: Dictionary for any additional data
+
+        :return: Returns the output of the action
+        """
+        raise K8sException("KDUs deployed with Helm don't support actions "
+                           "different from rollback, upgrade and status")
+
     async def inspect_kdu(
             self,
     async def inspect_kdu(
             self,
-            kdu_model: str
+            kdu_model: str,
+            repo_url: str = None
     ) -> str:
 
     ) -> str:
 
-        self.debug('inspect kdu_model {}'.format(kdu_model))
+        self.log.debug('inspect kdu_model {} from (optional) repo: {}'.format(kdu_model, repo_url))
 
 
-        command = '{} inspect values {}'\
-            .format(self._helm_command, kdu_model)
+        return await self._exec_inspect_comand(inspect_command='', kdu_model=kdu_model, repo_url=repo_url)
 
 
-        output, rc = await self._local_async_exec(command=command)
+    async def values_kdu(
+            self,
+            kdu_model: str,
+            repo_url: str = None
+    ) -> str:
 
 
-        return output
+        self.log.debug('inspect kdu_model values {} from (optional) repo: {}'.format(kdu_model, repo_url))
+
+        return await self._exec_inspect_comand(inspect_command='values', kdu_model=kdu_model, repo_url=repo_url)
 
     async def help_kdu(
             self,
 
     async def help_kdu(
             self,
-            kdu_model: str
-    ):
-
-        self.debug('help kdu_model {}'.format(kdu_model))
-
-        command = '{} inspect readme {}'\
-            .format(self._helm_command, kdu_model)
+            kdu_model: str,
+            repo_url: str = None
+    ) -> str:
 
 
-        output, rc = await self._local_async_exec(command=command, raise_exception_on_error=True)
+        self.log.debug('inspect kdu_model {} readme.md from repo: {}'.format(kdu_model, repo_url))
 
 
-        return output
+        return await self._exec_inspect_comand(inspect_command='readme', kdu_model=kdu_model, repo_url=repo_url)
 
     async def status_kdu(
             self,
             cluster_uuid: str,
             kdu_instance: str
 
     async def status_kdu(
             self,
             cluster_uuid: str,
             kdu_instance: str
-    ):
+    ) -> str:
 
 
-        return await self._status_kdu(cluster_uuid=cluster_uuid, kdu_instance=kdu_instance, show_error_log=True)
+        # call internal function
+        return await self._status_kdu(
+            cluster_uuid=cluster_uuid,
+            kdu_instance=kdu_instance,
+            show_error_log=True,
+            return_text=True
+        )
 
 
+    async def synchronize_repos(self, cluster_uuid: str):
+
+        self.log.debug("syncronize repos for cluster helm-id: {}",)
+        try:
+            update_repos_timeout = 300 # max timeout to sync a single repos, more than this is too much
+            db_k8scluster = self.db.get_one("k8sclusters", {"_admin.helm-chart.id": cluster_uuid})
+            if db_k8scluster:
+                nbi_repo_list = db_k8scluster.get("_admin").get("helm_chart_repos") or []
+                cluster_repo_dict = db_k8scluster.get("_admin").get("helm_charts_added") or {}
+                # elements that must be deleted
+                deleted_repo_list = []
+                added_repo_dict = {}
+                self.log.debug("helm_chart_repos: {}".format(nbi_repo_list))
+                self.log.debug("helm_charts_added: {}".format(cluster_repo_dict))
+
+                # obtain repos to add: registered by nbi but not added
+                repos_to_add = [repo for repo in nbi_repo_list if not cluster_repo_dict.get(repo)]
+
+                # obtain repos to delete: added by cluster but not in nbi list
+                repos_to_delete = [repo for repo in cluster_repo_dict.keys() if repo not in nbi_repo_list]
+
+                # delete repos: must delete first then add because there may be different repos with same name but
+                # different id and url
+                self.log.debug("repos to delete: {}".format(repos_to_delete))
+                for repo_id in repos_to_delete:
+                    # try to delete repos
+                    try:
+                        repo_delete_task = asyncio.ensure_future(self.repo_remove(cluster_uuid=cluster_uuid,
+                                                                                  name=cluster_repo_dict[repo_id]))
+                        await asyncio.wait_for(repo_delete_task, update_repos_timeout)
+                    except Exception as e:
+                        self.warning("Error deleting repo, id: {}, name: {}, err_msg: {}".format(repo_id,
+                                cluster_repo_dict[repo_id], str(e)))
+                    # always add to the list of to_delete if there is an error because if is not there deleting raises error
+                    deleted_repo_list.append(repo_id)
+
+                # add repos
+                self.log.debug("repos to add: {}".format(repos_to_add))
+                add_task_list = []
+                for repo_id in repos_to_add:
+                    # obtain the repo data from the db
+                    # if there is an error getting the repo in the database we will ignore this repo and continue
+                    # because there is a possible race condition where the repo has been deleted while processing
+                    db_repo = self.db.get_one("k8srepos", {"_id": repo_id})
+                    self.log.debug("obtained repo: id, {}, name: {}, url: {}".format(repo_id, db_repo["name"], db_repo["url"]))
+                    try:
+                        repo_add_task = asyncio.ensure_future(self.repo_add(cluster_uuid=cluster_uuid,
+                                                         name=db_repo["name"], url=db_repo["url"],
+                                                         repo_type="chart"))
+                        await asyncio.wait_for(repo_add_task, update_repos_timeout)
+                        added_repo_dict[repo_id] = db_repo["name"]
+                        self.log.debug("added repo: id, {}, name: {}".format(repo_id, db_repo["name"]))
+                    except Exception as e:
+                        # deal with error adding repo, adding a repo that already exists does not raise any error
+                        # will not raise error because a wrong repos added by anyone could prevent instantiating any ns
+                        self.log.error("Error adding repo id: {}, err_msg: {} ".format(repo_id, repr(e)))
+
+                return deleted_repo_list, added_repo_dict
+
+            else: # else db_k8scluster does not exist
+                raise K8sException("k8cluster with helm-id : {} not found".format(cluster_uuid))
+
+        except Exception as e:
+            self.log.error("Error synchronizing repos: {}".format(str(e)))
+            raise K8sException("Error synchronizing repos")
 
     """
     ##################################################################################################
 
     """
     ##################################################################################################
@@ -656,17 +795,39 @@ class K8sHelmConnector(K8sConnector):
     ##################################################################################################
     """
 
     ##################################################################################################
     """
 
+    async def _exec_inspect_comand(
+            self,
+            inspect_command: str,
+            kdu_model: str,
+            repo_url: str = None
+    ):
+
+        repo_str = ''
+        if repo_url:
+            repo_str = ' --repo {}'.format(repo_url)
+            idx = kdu_model.find('/')
+            if idx >= 0:
+                idx += 1
+                kdu_model = kdu_model[idx:]
+
+        inspect_command = '{} inspect {} {}{}'.format(self._helm_command, inspect_command, kdu_model, repo_str)
+        output, rc = await self._local_async_exec(command=inspect_command, encode_utf8=True)
+
+        return output
+
     async def _status_kdu(
             self,
             cluster_uuid: str,
             kdu_instance: str,
     async def _status_kdu(
             self,
             cluster_uuid: str,
             kdu_instance: str,
-            show_error_log: bool = False
+            show_error_log: bool = False,
+            return_text: bool = False
     ):
 
     ):
 
-        self.debug('status of kdu_instance {}'.format(kdu_instance))
+        self.log.debug('status of kdu_instance {}'.format(kdu_instance))
 
         # config filename
 
         # config filename
-        kube_dir, helm_dir, config_filename = self._get_paths(cluster_name=cluster_uuid, create_if_not_exist=True)
+        kube_dir, helm_dir, config_filename, cluster_dir = \
+            self._get_paths(cluster_name=cluster_uuid, create_if_not_exist=True)
 
         command = '{} --kubeconfig={} --home={} status {} --output yaml'\
             .format(self._helm_command, config_filename, helm_dir, kdu_instance)
 
         command = '{} --kubeconfig={} --home={} status {} --output yaml'\
             .format(self._helm_command, config_filename, helm_dir, kdu_instance)
@@ -677,6 +838,9 @@ class K8sHelmConnector(K8sConnector):
             show_error_log=show_error_log
         )
 
             show_error_log=show_error_log
         )
 
+        if return_text:
+            return str(output)
+
         if rc != 0:
             return None
 
         if rc != 0:
             return None
 
@@ -698,7 +862,6 @@ class K8sHelmConnector(K8sConnector):
 
         return data
 
 
         return data
 
-
     async def get_instance_info(
             self,
             cluster_uuid: str,
     async def get_instance_info(
             self,
             cluster_uuid: str,
@@ -708,13 +871,22 @@ class K8sHelmConnector(K8sConnector):
         for instance in instances:
             if instance.get('Name') == kdu_instance:
                 return instance
         for instance in instances:
             if instance.get('Name') == kdu_instance:
                 return instance
-        self.debug('Instance {} not found'.format(kdu_instance))
+        self.log.debug('Instance {} not found'.format(kdu_instance))
         return None
 
     @staticmethod
     def _generate_release_name(
             chart_name: str
     ):
         return None
 
     @staticmethod
     def _generate_release_name(
             chart_name: str
     ):
+        # check embeded chart (file or dir)
+        if chart_name.startswith('/'):
+            # extract file or directory name
+            chart_name = chart_name[chart_name.rfind('/')+1:]
+        # check URL
+        elif '://' in chart_name:
+            # extract last portion of URL
+            chart_name = chart_name[chart_name.rfind('/')+1:]
+
         name = ''
         for c in chart_name:
             if c.isalpha() or c.isnumeric():
         name = ''
         for c in chart_name:
             if c.isalpha() or c.isnumeric():
@@ -733,8 +905,7 @@ class K8sHelmConnector(K8sConnector):
         def get_random_number():
             r = random.randrange(start=1, stop=99999999)
             s = str(r)
         def get_random_number():
             r = random.randrange(start=1, stop=99999999)
             s = str(r)
-            while len(s) < 10:
-                s = '0' + s
+            s = s.rjust(10, '0')
             return s
 
         name = name + get_random_number()
             return s
 
         name = name + get_random_number()
@@ -754,10 +925,8 @@ class K8sHelmConnector(K8sConnector):
                 await asyncio.sleep(check_every)
                 detailed_status = await self.status_kdu(cluster_uuid=cluster_uuid, kdu_instance=kdu_instance)
                 status = detailed_status.get('info').get('Description')
                 await asyncio.sleep(check_every)
                 detailed_status = await self.status_kdu(cluster_uuid=cluster_uuid, kdu_instance=kdu_instance)
                 status = detailed_status.get('info').get('Description')
-                print('=' * 60)
-                self.debug('STATUS:\n{}'.format(status))
-                self.debug('DETAILED STATUS:\n{}'.format(detailed_status))
-                print('=' * 60)
+                self.log.debug('STATUS:\n{}'.format(status))
+                self.log.debug('DETAILED STATUS:\n{}'.format(detailed_status))
                 # write status to db
                 result = await self.write_app_status_to_db(
                     db_dict=db_dict,
                 # write status to db
                 result = await self.write_app_status_to_db(
                     db_dict=db_dict,
@@ -765,12 +934,13 @@ class K8sHelmConnector(K8sConnector):
                     detailed_status=str(detailed_status),
                     operation=operation)
                 if not result:
                     detailed_status=str(detailed_status),
                     operation=operation)
                 if not result:
-                    self.info('Error writing in database. Task exiting...')
+                    self.log.info('Error writing in database. Task exiting...')
                     return
             except asyncio.CancelledError:
                     return
             except asyncio.CancelledError:
-                self.debug('Task cancelled')
+                self.log.debug('Task cancelled')
                 return
             except Exception as e:
                 return
             except Exception as e:
+                self.log.debug('_store_status exception: {}'.format(str(e)))
                 pass
             finally:
                 if run_once:
                 pass
             finally:
                 if run_once:
@@ -782,7 +952,7 @@ class K8sHelmConnector(K8sConnector):
             kdu_instance: str
     ) -> bool:
 
             kdu_instance: str
     ) -> bool:
 
-        status = await self.status_kdu(cluster_uuid=cluster_uuid, kdu_instance=kdu_instance)
+        status = await self._status_kdu(cluster_uuid=cluster_uuid, kdu_instance=kdu_instance, return_text=False)
 
         # extract info.status.resources-> str
         # format:
 
         # extract info.status.resources-> str
         # format:
@@ -817,7 +987,7 @@ class K8sHelmConnector(K8sConnector):
                             current = int(parts[0])
                             total = int(parts[1])
                             if current < total:
                             current = int(parts[0])
                             total = int(parts[1])
                             if current < total:
-                                self.debug('NOT READY:\n    {}'.format(line3))
+                                self.log.debug('NOT READY:\n    {}'.format(line3))
                                 ready = False
                             line3 = resources[index]
                             index += 1
                                 ready = False
                             line3 = resources[index]
                             index += 1
@@ -856,6 +1026,36 @@ class K8sHelmConnector(K8sConnector):
                 pass
         return None
 
                 pass
         return None
 
+    # params for use in -f file
+    # returns values file option and filename (in order to delete it at the end)
+    def _params_to_file_option(self, cluster_uuid: str, params: dict) -> (str, str):
+
+        if params and len(params) > 0:
+            kube_dir, helm_dir, config_filename, cluster_dir = \
+                self._get_paths(cluster_name=cluster_uuid, create_if_not_exist=True)
+
+            def get_random_number():
+                r = random.randrange(start=1, stop=99999999)
+                s = str(r)
+                while len(s) < 10:
+                    s = '0' + s
+                return s
+
+            params2 = dict()
+            for key in params:
+                value = params.get(key)
+                if '!!yaml' in str(value):
+                    value = yaml.load(value[7:])
+                params2[key] = value
+
+            values_file = get_random_number() + '.yaml'
+            with open(values_file, 'w') as stream:
+                yaml.dump(params2, stream, indent=4, default_flow_style=False)
+
+            return '-f {}'.format(values_file), values_file
+
+        return '', None
+
     # params for use in --set option
     @staticmethod
     def _params_to_set_option(params: dict) -> str:
     # params for use in --set option
     @staticmethod
     def _params_to_set_option(params: dict) -> str:
@@ -898,13 +1098,14 @@ class K8sHelmConnector(K8sConnector):
                     line_list.append(cell)
         return output_table
 
                     line_list.append(cell)
         return output_table
 
-    def _get_paths(self, cluster_name: str, create_if_not_exist: bool = False) -> (str, str, str):
+    def _get_paths(self, cluster_name: str, create_if_not_exist: bool = False) -> (str, str, str, str):
         """
         Returns kube and helm directories
 
         :param cluster_name:
         :param create_if_not_exist:
         """
         Returns kube and helm directories
 
         :param cluster_name:
         :param create_if_not_exist:
-        :return: kube, helm directories and config filename. Raises exception if not exist and cannot create
+        :return: kube, helm directories, config filename and cluster dir.
+                Raises exception if not exist and cannot create
         """
 
         base = self.fs.path
         """
 
         base = self.fs.path
@@ -914,35 +1115,35 @@ class K8sHelmConnector(K8sConnector):
         # base dir for cluster
         cluster_dir = base + '/' + cluster_name
         if create_if_not_exist and not os.path.exists(cluster_dir):
         # base dir for cluster
         cluster_dir = base + '/' + cluster_name
         if create_if_not_exist and not os.path.exists(cluster_dir):
-            self.debug('Creating dir {}'.format(cluster_dir))
+            self.log.debug('Creating dir {}'.format(cluster_dir))
             os.makedirs(cluster_dir)
         if not os.path.exists(cluster_dir):
             msg = 'Base cluster dir {} does not exist'.format(cluster_dir)
             os.makedirs(cluster_dir)
         if not os.path.exists(cluster_dir):
             msg = 'Base cluster dir {} does not exist'.format(cluster_dir)
-            self.error(msg)
-            raise Exception(msg)
+            self.log.error(msg)
+            raise K8sException(msg)
 
         # kube dir
         kube_dir = cluster_dir + '/' + '.kube'
         if create_if_not_exist and not os.path.exists(kube_dir):
 
         # kube dir
         kube_dir = cluster_dir + '/' + '.kube'
         if create_if_not_exist and not os.path.exists(kube_dir):
-            self.debug('Creating dir {}'.format(kube_dir))
+            self.log.debug('Creating dir {}'.format(kube_dir))
             os.makedirs(kube_dir)
         if not os.path.exists(kube_dir):
             msg = 'Kube config dir {} does not exist'.format(kube_dir)
             os.makedirs(kube_dir)
         if not os.path.exists(kube_dir):
             msg = 'Kube config dir {} does not exist'.format(kube_dir)
-            self.error(msg)
-            raise Exception(msg)
+            self.log.error(msg)
+            raise K8sException(msg)
 
         # helm home dir
         helm_dir = cluster_dir + '/' + '.helm'
         if create_if_not_exist and not os.path.exists(helm_dir):
 
         # helm home dir
         helm_dir = cluster_dir + '/' + '.helm'
         if create_if_not_exist and not os.path.exists(helm_dir):
-            self.debug('Creating dir {}'.format(helm_dir))
+            self.log.debug('Creating dir {}'.format(helm_dir))
             os.makedirs(helm_dir)
         if not os.path.exists(helm_dir):
             msg = 'Helm config dir {} does not exist'.format(helm_dir)
             os.makedirs(helm_dir)
         if not os.path.exists(helm_dir):
             msg = 'Helm config dir {} does not exist'.format(helm_dir)
-            self.error(msg)
-            raise Exception(msg)
+            self.log.error(msg)
+            raise K8sException(msg)
 
         config_filename = kube_dir + '/config'
 
         config_filename = kube_dir + '/config'
-        return kube_dir, helm_dir, config_filename
+        return kube_dir, helm_dir, config_filename, cluster_dir
 
     @staticmethod
     def _remove_multiple_spaces(str):
 
     @staticmethod
     def _remove_multiple_spaces(str):
@@ -956,13 +1157,13 @@ class K8sHelmConnector(K8sConnector):
             command: str
     ) -> (str, int):
         command = K8sHelmConnector._remove_multiple_spaces(command)
             command: str
     ) -> (str, int):
         command = K8sHelmConnector._remove_multiple_spaces(command)
-        self.debug('Executing sync local command: {}'.format(command))
+        self.log.debug('Executing sync local command: {}'.format(command))
         # raise exception if fails
         output = ''
         try:
             output = subprocess.check_output(command, shell=True, universal_newlines=True)
             return_code = 0
         # raise exception if fails
         output = ''
         try:
             output = subprocess.check_output(command, shell=True, universal_newlines=True)
             return_code = 0
-            self.debug(output)
+            self.log.debug(output)
         except Exception as e:
             return_code = 1
 
         except Exception as e:
             return_code = 1
 
@@ -972,11 +1173,12 @@ class K8sHelmConnector(K8sConnector):
             self,
             command: str,
             raise_exception_on_error: bool = False,
             self,
             command: str,
             raise_exception_on_error: bool = False,
-            show_error_log: bool = False
+            show_error_log: bool = True,
+            encode_utf8: bool = False
     ) -> (str, int):
 
         command = K8sHelmConnector._remove_multiple_spaces(command)
     ) -> (str, int):
 
         command = K8sHelmConnector._remove_multiple_spaces(command)
-        self.debug('Executing async local command: {}'.format(command))
+        self.log.debug('Executing async local command: {}'.format(command))
 
         # split command
         command = command.split(sep=' ')
 
         # split command
         command = command.split(sep=' ')
@@ -996,60 +1198,45 @@ class K8sHelmConnector(K8sConnector):
             output = ''
             if stdout:
                 output = stdout.decode('utf-8').strip()
             output = ''
             if stdout:
                 output = stdout.decode('utf-8').strip()
+                # output = stdout.decode()
             if stderr:
                 output = stderr.decode('utf-8').strip()
             if stderr:
                 output = stderr.decode('utf-8').strip()
+                # output = stderr.decode()
 
             if return_code != 0 and show_error_log:
 
             if return_code != 0 and show_error_log:
-                self.debug('Return code (FAIL): {}\nOutput:\n{}'.format(return_code, output))
+                self.log.debug('Return code (FAIL): {}\nOutput:\n{}'.format(return_code, output))
             else:
             else:
-                self.debug('Return code: {}'.format(return_code))
+                self.log.debug('Return code: {}'.format(return_code))
 
             if raise_exception_on_error and return_code != 0:
 
             if raise_exception_on_error and return_code != 0:
-                raise Exception(output)
+                raise K8sException(output)
+
+            if encode_utf8:
+                output = output.encode('utf-8').strip()
+                output = str(output).replace('\\n', '\n')
 
             return output, return_code
 
 
             return output, return_code
 
+        except asyncio.CancelledError:
+            raise
+        except K8sException:
+            raise
         except Exception as e:
             msg = 'Exception executing command: {} -> {}'.format(command, e)
         except Exception as e:
             msg = 'Exception executing command: {} -> {}'.format(command, e)
-            if show_error_log:
-                self.error(msg)
-            return '', -1
-
-    def _remote_exec(
-            self,
-            hostname: str,
-            username: str,
-            password: str,
-            command: str,
-            timeout: int = 10
-    ) -> (str, int):
-
-        command = K8sHelmConnector._remove_multiple_spaces(command)
-        self.debug('Executing sync remote ssh command: {}'.format(command))
-
-        ssh = paramiko.SSHClient()
-        ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
-        ssh.connect(hostname=hostname, username=username, password=password)
-        ssh_stdin, ssh_stdout, ssh_stderr = ssh.exec_command(command=command, timeout=timeout)
-        output = ssh_stdout.read().decode('utf-8')
-        error = ssh_stderr.read().decode('utf-8')
-        if error:
-            self.error('ERROR: {}'.format(error))
-            return_code = 1
-        else:
-            return_code = 0
-        output = output.replace('\\n', '\n')
-        self.debug('OUTPUT: {}'.format(output))
-
-        return output, return_code
+            self.log.error(msg)
+            if raise_exception_on_error:
+                raise K8sException(e) from e
+            else:
+                return '', -1
 
     def _check_file_exists(self, filename: str, exception_if_not_exists: bool = False):
 
     def _check_file_exists(self, filename: str, exception_if_not_exists: bool = False):
-        self.debug('Checking if file {} exists...'.format(filename))
+        # self.log.debug('Checking if file {} exists...'.format(filename))
         if os.path.exists(filename):
             return True
         else:
             msg = 'File {} does not exist'.format(filename)
             if exception_if_not_exists:
         if os.path.exists(filename):
             return True
         else:
             msg = 'File {} does not exist'.format(filename)
             if exception_if_not_exists:
-                self.error(msg)
-                raise Exception(msg)
+                # self.log.error(msg)
+                raise K8sException(msg)
+