Bug 1980 fixed
[osm/N2VC.git] / n2vc / k8s_helm_conn.py
index f7ff853..8c526f5 100644 (file)
 # For those usages not covered by the Apache License, Version 2.0 please
 # contact with: nfvlabs@tid.es
 ##
-
-import paramiko
-import subprocess
-import os
-import shutil
 import asyncio
-import time
+from typing import Union
+import os
 import yaml
-from uuid import uuid4
-import random
-from n2vc.k8s_conn import K8sConnector
+
+from n2vc.k8s_helm_base_conn import K8sHelmBaseConnector
 from n2vc.exceptions import K8sException
 
 
-class K8sHelmConnector(K8sConnector):
+class K8sHelmConnector(K8sHelmBaseConnector):
 
     """
 
     """
-    ##################################################################################################
-    ########################################## P U B L I C ###########################################
-    ##################################################################################################
+    ####################################################################################
+    ################################### P U B L I C ####################################
+    ####################################################################################
     """
 
     def __init__(
     """
 
     def __init__(
-            self,
-            fs: object,
-            db: object,
-            kubectl_command: str = '/usr/bin/kubectl',
-            helm_command: str = '/usr/bin/helm',
-            log: object = None,
-            on_update_db=None
+        self,
+        fs: object,
+        db: object,
+        kubectl_command: str = "/usr/bin/kubectl",
+        helm_command: str = "/usr/bin/helm",
+        log: object = None,
+        on_update_db=None,
     ):
         """
+        Initializes helm connector for helm v2
 
         :param fs: file system for kubernetes and helm configuration
         :param db: database object to write current operation status
@@ -61,676 +57,437 @@ class K8sHelmConnector(K8sConnector):
         """
 
         # parent class
         """
 
         # parent class
-        K8sConnector.__init__(
+        K8sHelmBaseConnector.__init__(
             self,
             db=db,
             log=log,
-            on_update_db=on_update_db
+            fs=fs,
+            kubectl_command=kubectl_command,
+            helm_command=helm_command,
+            on_update_db=on_update_db,
         )
 
-        self.info('Initializing K8S Helm connector')
-
-        # random numbers for release name generation
-        random.seed(time.time())
-
-        # the file system
-        self.fs = fs
-
-        # exception if kubectl is not installed
-        self.kubectl_command = kubectl_command
-        self._check_file_exists(filename=kubectl_command, exception_if_not_exists=True)
-
-        # exception if helm is not installed
-        self._helm_command = helm_command
-        self._check_file_exists(filename=helm_command, exception_if_not_exists=True)
+        self.log.info("Initializing K8S Helm2 connector")
 
         # initialize helm client-only
-        self.debug('Initializing helm client-only...')
-        command = '{} init --client-only'.format(self._helm_command)
+        self.log.debug("Initializing helm client-only...")
+        command = "{} init --client-only {} ".format(
+            self._helm_command,
+            "--stable-repo-url {}".format(self._stable_repo_url)
+            if self._stable_repo_url
+            else "--skip-repos",
+        )
         try:
-            asyncio.ensure_future(self._local_async_exec(command=command, raise_exception_on_error=False))
+            asyncio.ensure_future(
+                self._local_async_exec(command=command, raise_exception_on_error=False)
+            )
             # loop = asyncio.get_event_loop()
-            # loop.run_until_complete(self._local_async_exec(command=command, raise_exception_on_error=False))
-        except Exception as e:
-            self.warning(msg='helm init failed (it was already initialized): {}'.format(e))
-
-        self.info('K8S Helm connector initialized')
-
-    async def init_env(
-            self,
-            k8s_creds: str,
-            namespace: str = 'kube-system',
-            reuse_cluster_uuid=None
-    ) -> (str, bool):
-
-        cluster_uuid = reuse_cluster_uuid
-        if not cluster_uuid:
-            cluster_uuid = str(uuid4())
-
-        self.debug('Initializing K8S environment. namespace: {}'.format(namespace))
-
-        # create config filename
-        kube_dir, helm_dir, config_filename, cluster_dir = \
-            self._get_paths(cluster_name=cluster_uuid, create_if_not_exist=True)
-        f = open(config_filename, "w")
-        f.write(k8s_creds)
-        f.close()
-
-        # check if tiller pod is up in cluster
-        command = '{} --kubeconfig={} --namespace={} get deployments'\
-            .format(self.kubectl_command, config_filename, namespace)
-        output, rc = await self._local_async_exec(command=command, raise_exception_on_error=True)
-
-        output_table = K8sHelmConnector._output_to_table(output=output)
-
-        # find 'tiller' pod in all pods
-        already_initialized = False
-        try:
-            for row in output_table:
-                if row[0].startswith('tiller-deploy'):
-                    already_initialized = True
-                    break
+            # loop.run_until_complete(self._local_async_exec(command=command,
+            # raise_exception_on_error=False))
         except Exception as e:
-            pass
-
-        # helm init
-        n2vc_installed_sw = False
-        if not already_initialized:
-            self.info('Initializing helm in client and server: {}'.format(cluster_uuid))
-            command = '{} --kubeconfig={} --tiller-namespace={} --home={} init'\
-                .format(self._helm_command, config_filename, namespace, helm_dir)
-            output, rc = await self._local_async_exec(command=command, raise_exception_on_error=True)
-            n2vc_installed_sw = True
-        else:
-            # check client helm installation
-            check_file = helm_dir + '/repository/repositories.yaml'
-            if not self._check_file_exists(filename=check_file, exception_if_not_exists=False):
-                self.info('Initializing helm in client: {}'.format(cluster_uuid))
-                command = '{} --kubeconfig={} --tiller-namespace={} --home={} init --client-only'\
-                    .format(self._helm_command, config_filename, namespace, helm_dir)
-                output, rc = await self._local_async_exec(command=command, raise_exception_on_error=True)
-            else:
-                self.info('Helm client already initialized')
-
-        self.info('Cluster initialized {}'.format(cluster_uuid))
-
-        return cluster_uuid, n2vc_installed_sw
-
-    async def repo_add(
-            self,
-            cluster_uuid: str,
-            name: str,
-            url: str,
-            repo_type: str = 'chart'
-    ):
-
-        self.debug('adding {} repository {}. URL: {}'.format(repo_type, name, url))
-
-        # config filename
-        kube_dir, helm_dir, config_filename, cluster_dir = \
-            self._get_paths(cluster_name=cluster_uuid, create_if_not_exist=True)
-
-        # helm repo update
-        command = '{} --kubeconfig={} --home={} repo update'.format(self._helm_command, config_filename, helm_dir)
-        self.debug('updating repo: {}'.format(command))
-        await self._local_async_exec(command=command, raise_exception_on_error=False)
-
-        # helm repo add name url
-        command = '{} --kubeconfig={} --home={} repo add {} {}'\
-            .format(self._helm_command, config_filename, helm_dir, name, url)
-        self.debug('adding repo: {}'.format(command))
-        await self._local_async_exec(command=command, raise_exception_on_error=True)
-
-    async def repo_list(
-            self,
-            cluster_uuid: str
-    ) -> list:
-        """
-        Get the list of registered repositories
-
-        :return: list of registered repositories: [ (name, url) .... ]
-        """
-
-        self.debug('list repositories for cluster {}'.format(cluster_uuid))
-
-        # config filename
-        kube_dir, helm_dir, config_filename, cluster_dir = \
-            self._get_paths(cluster_name=cluster_uuid, create_if_not_exist=True)
-
-        command = '{} --kubeconfig={} --home={} repo list --output yaml'\
-            .format(self._helm_command, config_filename, helm_dir)
+            self.warning(
+                msg="helm init failed (it was already initialized): {}".format(e)
+            )
 
 
-        output, rc = await self._local_async_exec(command=command, raise_exception_on_error=True)
-        if output and len(output) > 0:
-            return yaml.load(output, Loader=yaml.SafeLoader)
-        else:
-            return []
+        self.log.info("K8S Helm2 connector initialized")
 
 
-    async def repo_remove(
-            self,
-            cluster_uuid: str,
-            name: str
+    async def install(
+        self,
+        cluster_uuid: str,
+        kdu_model: str,
+        kdu_instance: str,
+        atomic: bool = True,
+        timeout: float = 300,
+        params: dict = None,
+        db_dict: dict = None,
+        kdu_name: str = None,
+        namespace: str = None,
+        **kwargs,
     ):
         """
-        Remove a repository from OSM
+        Deploys a new KDU instance. It would implicitly rely on the `install` call
+        to deploy the Chart/Bundle properly parametrized (in practice, this call would
+        happen before any _initial-config-primitive_ of the VNF is called).
 
 
-        :param cluster_uuid: the cluster
-        :param name: repo name in OSM
+        :param cluster_uuid: UUID of a K8s cluster known by OSM
+        :param kdu_model: chart/bundle reference (string), which can be either
+            of these options:
+            - the name of a chart available via the repos known by OSM
+            - a path to a packaged chart
+            - a path to an unpacked chart directory or a URL
+        :param kdu_instance: Kdu instance name
+        :param atomic: If set, installation process purges chart/bundle on fail, also
+            will wait until all the K8s objects are active
+        :param timeout: Time in seconds to wait for the install of the chart/bundle
+            (defaults to Helm default timeout: 300s)
+        :param params: dictionary of key-value pairs for instantiation parameters
+            (overriding default values)
+        :param dict db_dict: where to write into database when the status changes.
+                        It contains a dict with {collection: <str>, filter: {},
+                        path: <str>},
+                            e.g. {collection: "nsrs", filter: {_id: <nsd-id>},
+                            path: "_admin.deployed.K8S.3"}
+        :param kdu_name: Name of the KDU instance to be installed
+        :param namespace: K8s namespace to use for the KDU instance
+        :param kwargs: Additional parameters (None yet)
         :return: True if successful
         """
+        self.log.debug("installing {} in cluster {}".format(kdu_model, cluster_uuid))
 
 
-        self.debug('list repositories for cluster {}'.format(cluster_uuid))
-
-        # config filename
-        kube_dir, helm_dir, config_filename, cluster_dir = \
-            self._get_paths(cluster_name=cluster_uuid, create_if_not_exist=True)
-
-        command = '{} --kubeconfig={} --home={} repo remove {}'\
-            .format(self._helm_command, config_filename, helm_dir, name)
+        # sync local dir
+        self.fs.sync(from_path=cluster_uuid)
 
 
-        await self._local_async_exec(command=command, raise_exception_on_error=True)
+        # init env, paths
+        paths, env = self._init_paths_env(
+            cluster_name=cluster_uuid, create_if_not_exist=True
+        )
 
 
-    async def reset(
-            self,
-            cluster_uuid: str,
-            force: bool = False,
-            uninstall_sw: bool = False
-    ) -> bool:
-
-        self.debug('Resetting K8s environment. cluster uuid: {}'.format(cluster_uuid))
-
-        # get kube and helm directories
-        kube_dir, helm_dir, config_filename, cluster_dir = \
-            self._get_paths(cluster_name=cluster_uuid, create_if_not_exist=False)
-
-        # uninstall releases if needed
-        releases = await self.instances_list(cluster_uuid=cluster_uuid)
-        if len(releases) > 0:
-            if force:
-                for r in releases:
-                    try:
-                        kdu_instance = r.get('Name')
-                        chart = r.get('Chart')
-                        self.debug('Uninstalling {} -> {}'.format(chart, kdu_instance))
-                        await self.uninstall(cluster_uuid=cluster_uuid, kdu_instance=kdu_instance)
-                    except Exception as e:
-                        self.error('Error uninstalling release {}: {}'.format(kdu_instance, e))
-            else:
-                msg = 'Cluster has releases and not force. Cannot reset K8s environment. Cluster uuid: {}'\
-                    .format(cluster_uuid)
-                self.error(msg)
-                raise K8sException(msg)
+        await self._install_impl(
+            cluster_uuid,
+            kdu_model,
+            paths,
+            env,
+            kdu_instance,
+            atomic=atomic,
+            timeout=timeout,
+            params=params,
+            db_dict=db_dict,
+            kdu_name=kdu_name,
+            namespace=namespace,
+        )
 
 
-        if uninstall_sw:
+        # sync fs
+        self.fs.reverse_sync(from_path=cluster_uuid)
 
 
-            self.debug('Uninstalling tiller from cluster {}'.format(cluster_uuid))
+        self.log.debug("Returning kdu_instance {}".format(kdu_instance))
+        return True
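
A minimal caller-side sketch of the new install() signature (the release name is now generated by the caller and passed in as kdu_instance); the connector instance, cluster id, chart reference, params and db_dict values below are hypothetical and shown only for illustration:

    # hypothetical usage from an async caller; helm_conn is an already-built K8sHelmConnector
    cluster_uuid = "e0d74f4f-..."   # placeholder: id previously returned by init_env()
    nsr_id = "a3b1c2d4-..."         # placeholder: network-service record id
    await helm_conn.install(
        cluster_uuid=cluster_uuid,
        kdu_model="stable/openldap:1.2.2",           # chart[:version] reference
        kdu_instance="stable-openldap-0005399828",   # release name chosen by the caller
        atomic=True,
        timeout=300,
        params={"replicaCount": 2},
        db_dict={"collection": "nsrs",
                 "filter": {"_id": nsr_id},
                 "path": "_admin.deployed.K8S.0"},
    )
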
 
 
-            # find namespace for tiller pod
-            command = '{} --kubeconfig={} get deployments --all-namespaces'\
-                .format(self.kubectl_command, config_filename)
-            output, rc = await self._local_async_exec(command=command, raise_exception_on_error=False)
-            output_table = K8sHelmConnector._output_to_table(output=output)
-            namespace = None
-            for r in output_table:
-                try:
-                    if 'tiller-deploy' in r[1]:
-                        namespace = r[0]
-                        break
-                except Exception as e:
-                    pass
-            else:
-                msg = 'Tiller deployment not found in cluster {}'.format(cluster_uuid)
-                self.error(msg)
-
-            self.debug('namespace for tiller: {}'.format(namespace))
-
-            force_str = '--force'
-
-            if namespace:
-                # delete tiller deployment
-                self.debug('Deleting tiller deployment for cluster {}, namespace {}'.format(cluster_uuid, namespace))
-                command = '{} --namespace {} --kubeconfig={} {} delete deployment tiller-deploy'\
-                    .format(self.kubectl_command, namespace, config_filename, force_str)
-                await self._local_async_exec(command=command, raise_exception_on_error=False)
-
-                # uninstall tiller from cluster
-                self.debug('Uninstalling tiller from cluster {}'.format(cluster_uuid))
-                command = '{} --kubeconfig={} --home={} reset'\
-                    .format(self._helm_command, config_filename, helm_dir)
-                self.debug('resetting: {}'.format(command))
-                output, rc = await self._local_async_exec(command=command, raise_exception_on_error=True)
-            else:
-                self.debug('namespace not found')
+    async def inspect_kdu(self, kdu_model: str, repo_url: str = None) -> str:
 
 
-        # delete cluster directory
-        dir = self.fs.path + '/' + cluster_uuid
-        self.debug('Removing directory {}'.format(dir))
-        shutil.rmtree(dir, ignore_errors=True)
+        self.log.debug(
+            "inspect kdu_model {} from (optional) repo: {}".format(kdu_model, repo_url)
+        )
 
 
-        return True
+        return await self._exec_inspect_command(
+            inspect_command="", kdu_model=kdu_model, repo_url=repo_url
+        )
 
 
-    async def install(
-            self,
-            cluster_uuid: str,
-            kdu_model: str,
-            atomic: bool = True,
-            timeout: float = 300,
-            params: dict = None,
-            db_dict: dict = None
-    ):
+    """
+    ####################################################################################
+    ################################### P R I V A T E ##################################
+    ####################################################################################
+    """
 
 
-        self.debug('installing {} in cluster {}'.format(kdu_model, cluster_uuid))
+    def _init_paths_env(self, cluster_name: str, create_if_not_exist: bool = True):
+        """
+        Creates the base cluster and kube dirs and returns them.
+        Also creates the helm dirs according to the directory specification; the paths
+        are returned along with the environment variables that must be provided to execute commands
 
 
-        # config filename
-        kube_dir, helm_dir, config_filename, cluster_dir = \
-            self._get_paths(cluster_name=cluster_uuid, create_if_not_exist=True)
+        Helm 2 directory specification uses helm_home dir:
 
 
-        # params to str
-        # params_str = K8sHelmConnector._params_to_set_option(params)
-        params_str, file_to_delete = self._params_to_file_option(cluster_uuid=cluster_uuid, params=params)
+        The environment variables assigned for these paths are:
+        - Helm home: $HELM_HOME
+        - Helm kubeconfig: $KUBECONFIG
 
 
-        timeout_str = ''
-        if timeout:
-            timeout_str = '--timeout {}'.format(timeout)
+        :param cluster_name: cluster name
+        :return: Dictionary with config paths and dictionary with helm environment variables
+        """
+        base = self.fs.path
+        if base.endswith("/") or base.endswith("\\"):
+            base = base[:-1]
 
 
-        # atomic
-        atomic_str = ''
-        if atomic:
-            atomic_str = '--atomic'
+        # base dir for cluster
+        cluster_dir = base + "/" + cluster_name
 
 
-        # version
-        version_str = ''
-        if ':' in kdu_model:
-            parts = kdu_model.split(sep=':')
-            if len(parts) == 2:
-                version_str = '--version {}'.format(parts[1])
-                kdu_model = parts[0]
-
-        # generate a name for the release. Then, check if already exists
-        kdu_instance = None
-        while kdu_instance is None:
-            kdu_instance = K8sHelmConnector._generate_release_name(kdu_model)
-            try:
-                result = await self._status_kdu(
-                    cluster_uuid=cluster_uuid,
-                    kdu_instance=kdu_instance,
-                    show_error_log=False
-                )
-                if result is not None:
-                    # instance already exists: generate a new one
-                    kdu_instance = None
-            except Exception as e:
-                kdu_instance = None
-
-        # helm repo install
-        command = '{} install {} --output yaml --kubeconfig={} --home={} {} {} --name={} {} {}'\
-            .format(self._helm_command, atomic_str, config_filename, helm_dir,
-                    params_str, timeout_str, kdu_instance, kdu_model, version_str)
-        self.debug('installing: {}'.format(command))
+        # kube dir
+        kube_dir = cluster_dir + "/" + ".kube"
+        if create_if_not_exist and not os.path.exists(kube_dir):
+            self.log.debug("Creating dir {}".format(kube_dir))
+            os.makedirs(kube_dir)
 
 
-        if atomic:
-            # exec helm in a task
-            exec_task = asyncio.ensure_future(
-                coro_or_future=self._local_async_exec(command=command, raise_exception_on_error=False)
-            )
-            # write status in another task
-            status_task = asyncio.ensure_future(
-                coro_or_future=self._store_status(
-                    cluster_uuid=cluster_uuid,
-                    kdu_instance=kdu_instance,
-                    db_dict=db_dict,
-                    operation='install',
-                    run_once=False
-                )
-            )
+        # helm home dir
+        helm_dir = cluster_dir + "/" + ".helm"
+        if create_if_not_exist and not os.path.exists(helm_dir):
+            self.log.debug("Creating dir {}".format(helm_dir))
+            os.makedirs(helm_dir)
 
 
-            # wait for execution task
-            await asyncio.wait([exec_task])
+        config_filename = kube_dir + "/config"
 
 
-            # cancel status task
-            status_task.cancel()
+        # 2 - Prepare dictionary with paths
+        paths = {
+            "kube_dir": kube_dir,
+            "kube_config": config_filename,
+            "cluster_dir": cluster_dir,
+            "helm_dir": helm_dir,
+        }
 
 
-            output, rc = exec_task.result()
+        for file_name, file in paths.items():
+            if "dir" in file_name and not os.path.exists(file):
+                err_msg = "{} dir does not exist".format(file)
+                self.log.error(err_msg)
+                raise K8sException(err_msg)
 
 
-        else:
+        # 3 - Prepare environment variables
+        env = {"HELM_HOME": helm_dir, "KUBECONFIG": config_filename}
 
 
-            output, rc = await self._local_async_exec(command=command, raise_exception_on_error=False)
+        return paths, env
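
For orientation, a sketch of what this helper yields for a hypothetical cluster id under the Helm 2 layout described above (the id and base path are made up):

    # paths, env = self._init_paths_env(cluster_name="cluster-1")
    # paths == {
    #     "kube_dir":    "<fs.path>/cluster-1/.kube",
    #     "kube_config": "<fs.path>/cluster-1/.kube/config",
    #     "cluster_dir": "<fs.path>/cluster-1",
    #     "helm_dir":    "<fs.path>/cluster-1/.helm",
    # }
    # env == {"HELM_HOME": "<fs.path>/cluster-1/.helm",
    #         "KUBECONFIG": "<fs.path>/cluster-1/.kube/config"}
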
 
 
-        # remove temporal values yaml file
-        if file_to_delete:
-            os.remove(file_to_delete)
+    async def _get_services(self, cluster_id, kdu_instance, namespace, kubeconfig):
 
 
-        # write final status
-        await self._store_status(
-            cluster_uuid=cluster_uuid,
-            kdu_instance=kdu_instance,
-            db_dict=db_dict,
-            operation='install',
-            run_once=True,
-            check_every=0
+        # init config, env
+        paths, env = self._init_paths_env(
+            cluster_name=cluster_id, create_if_not_exist=True
         )
 
-        if rc != 0:
-            msg = 'Error executing command: {}\nOutput: {}'.format(command, output)
-            self.error(msg)
-            raise K8sException(msg)
+        command1 = "env KUBECONFIG={} {} get manifest {} ".format(
+            kubeconfig, self._helm_command, kdu_instance
+        )
+        command2 = "{} get --namespace={} -f -".format(self.kubectl_command, namespace)
+        output, _rc = await self._local_async_exec_pipe(
+            command1, command2, env=env, raise_exception_on_error=True
+        )
+        services = self._parse_services(output)
 
 
-        self.debug('Returning kdu_instance {}'.format(kdu_instance))
-        return kdu_instance
+        return services
 
 
-    async def instances_list(
-            self,
-            cluster_uuid: str
-    ) -> list:
+    async def _cluster_init(
+        self, cluster_id: str, namespace: str, paths: dict, env: dict
+    ):
         """
         """
-        returns a list of deployed releases in a cluster
-
-        :param cluster_uuid: the cluster
-        :return:
+        Implements the helm version dependent cluster initialization:
+        For helm2 it initializes the tiller environment if needed
         """
 
         """
 
-        self.debug('list releases for cluster {}'.format(cluster_uuid))
+        # check if tiller pod is up in cluster
+        command = "{} --kubeconfig={} --namespace={} get deployments".format(
+            self.kubectl_command, paths["kube_config"], namespace
+        )
+        output, _rc = await self._local_async_exec(
+            command=command, raise_exception_on_error=True, env=env
+        )
+
+        output_table = self._output_to_table(output=output)
 
 
-        # config filename
-        kube_dir, helm_dir, config_filename, cluster_dir = \
-            self._get_paths(cluster_name=cluster_uuid, create_if_not_exist=True)
+        # find 'tiller' pod in all pods
+        already_initialized = False
+        try:
+            for row in output_table:
+                if row[0].startswith("tiller-deploy"):
+                    already_initialized = True
+                    break
+        except Exception:
+            pass
 
 
-        command = '{} --kubeconfig={} --home={} list --output yaml'\
-            .format(self._helm_command, config_filename, helm_dir)
+        # helm init
+        n2vc_installed_sw = False
+        if not already_initialized:
+            self.log.info(
+                "Initializing helm in client and server: {}".format(cluster_id)
+            )
+            command = "{} --kubeconfig={} --namespace kube-system create serviceaccount {}".format(
+                self.kubectl_command, paths["kube_config"], self.service_account
+            )
+            _, _rc = await self._local_async_exec(
+                command=command, raise_exception_on_error=False, env=env
+            )
 
 
-        output, rc = await self._local_async_exec(command=command, raise_exception_on_error=True)
+            command = (
+                "{} --kubeconfig={} create clusterrolebinding osm-tiller-cluster-rule "
+                "--clusterrole=cluster-admin --serviceaccount=kube-system:{}"
+            ).format(self.kubectl_command, paths["kube_config"], self.service_account)
+            _, _rc = await self._local_async_exec(
+                command=command, raise_exception_on_error=False, env=env
+            )
 
 
-        if output and len(output) > 0:
-            return yaml.load(output, Loader=yaml.SafeLoader).get('Releases')
+            command = (
+                "{} --kubeconfig={} --tiller-namespace={} --home={} --service-account {} "
+                " {} init"
+            ).format(
+                self._helm_command,
+                paths["kube_config"],
+                namespace,
+                paths["helm_dir"],
+                self.service_account,
+                "--stable-repo-url {}".format(self._stable_repo_url)
+                if self._stable_repo_url
+                else "--skip-repos",
+            )
+            _, _rc = await self._local_async_exec(
+                command=command, raise_exception_on_error=True, env=env
+            )
+            n2vc_installed_sw = True
         else:
-            return []
-
-    async def upgrade(
-            self,
-            cluster_uuid: str,
-            kdu_instance: str,
-            kdu_model: str = None,
-            atomic: bool = True,
-            timeout: float = 300,
-            params: dict = None,
-            db_dict: dict = None
-    ):
+            # check client helm installation
+            check_file = paths["helm_dir"] + "/repository/repositories.yaml"
+            if not self._check_file_exists(
+                filename=check_file, exception_if_not_exists=False
+            ):
+                self.log.info("Initializing helm in client: {}".format(cluster_id))
+                command = (
+                    "{} --kubeconfig={} --tiller-namespace={} "
+                    "--home={} init --client-only {} "
+                ).format(
+                    self._helm_command,
+                    paths["kube_config"],
+                    namespace,
+                    paths["helm_dir"],
+                    "--stable-repo-url {}".format(self._stable_repo_url)
+                    if self._stable_repo_url
+                    else "--skip-repos",
+                )
+                output, _rc = await self._local_async_exec(
+                    command=command, raise_exception_on_error=True, env=env
+                )
+            else:
+                self.log.info("Helm client already initialized")
 
 
-        self.debug('upgrading {} in cluster {}'.format(kdu_model, cluster_uuid))
+        repo_list = await self.repo_list(cluster_id)
+        for repo in repo_list:
+            if repo["name"] == "stable" and repo["url"] != self._stable_repo_url:
+                self.log.debug("Add new stable repo url: {}".format(self._stable_repo_url))
+                await self.repo_remove(cluster_id, "stable")
+                if self._stable_repo_url:
+                    await self.repo_add(cluster_id, "stable", self._stable_repo_url)
+                break
 
 
-        # config filename
-        kube_dir, helm_dir, config_filename, cluster_dir = \
-            self._get_paths(cluster_name=cluster_uuid, create_if_not_exist=True)
+        return n2vc_installed_sw
 
 
-        # params to str
-        # params_str = K8sHelmConnector._params_to_set_option(params)
-        params_str, file_to_delete = self._params_to_file_option(cluster_uuid=cluster_uuid, params=params)
+    async def _uninstall_sw(self, cluster_id: str, namespace: str):
+        # uninstall Tiller if necessary
 
 
-        timeout_str = ''
-        if timeout:
-            timeout_str = '--timeout {}'.format(timeout)
+        self.log.debug("Uninstalling tiller from cluster {}".format(cluster_id))
 
 
-        # atomic
-        atomic_str = ''
-        if atomic:
-            atomic_str = '--atomic'
+        # init paths, env
+        paths, env = self._init_paths_env(
+            cluster_name=cluster_id, create_if_not_exist=True
+        )
 
 
-        # version
-        version_str = ''
-        if kdu_model and ':' in kdu_model:
-            parts = kdu_model.split(sep=':')
-            if len(parts) == 2:
-                version_str = '--version {}'.format(parts[1])
-                kdu_model = parts[0]
-
-        # helm repo upgrade
-        command = '{} upgrade {} --output yaml --kubeconfig={} --home={} {} {} {} {} {}'\
-            .format(self._helm_command, atomic_str, config_filename, helm_dir,
-                    params_str, timeout_str, kdu_instance, kdu_model, version_str)
-        self.debug('upgrading: {}'.format(command))
+        if not namespace:
+            # find namespace for tiller pod
+            command = "{} --kubeconfig={} get deployments --all-namespaces".format(
+                self.kubectl_command, paths["kube_config"]
+            )
+            output, _rc = await self._local_async_exec(
+                command=command, raise_exception_on_error=False, env=env
+            )
+            output_table = self._output_to_table(output=output)
+            namespace = None
+            for r in output_table:
+                try:
+                    if "tiller-deploy" in r[1]:
+                        namespace = r[0]
+                        break
+                except Exception:
+                    pass
+            else:
+                msg = "Tiller deployment not found in cluster {}".format(cluster_id)
+                self.log.error(msg)
 
 
-        if atomic:
+            self.log.debug("namespace for tiller: {}".format(namespace))
 
 
-            # exec helm in a task
-            exec_task = asyncio.ensure_future(
-                coro_or_future=self._local_async_exec(command=command, raise_exception_on_error=False)
+        if namespace:
+            # uninstall tiller from cluster
+            self.log.debug("Uninstalling tiller from cluster {}".format(cluster_id))
+            command = "{} --kubeconfig={} --home={} reset".format(
+                self._helm_command, paths["kube_config"], paths["helm_dir"]
+            )
+            self.log.debug("resetting: {}".format(command))
+            output, _rc = await self._local_async_exec(
+                command=command, raise_exception_on_error=True, env=env
             )
-            # write status in another task
-            status_task = asyncio.ensure_future(
-                coro_or_future=self._store_status(
-                    cluster_uuid=cluster_uuid,
-                    kdu_instance=kdu_instance,
-                    db_dict=db_dict,
-                    operation='upgrade',
-                    run_once=False
+            # Delete clusterrolebinding and serviceaccount.
+            # Ignore if errors for backward compatibility
+            command = (
+                "{} --kubeconfig={} delete clusterrolebinding.rbac.authorization.k8s."
+                "io/osm-tiller-cluster-rule"
+            ).format(self.kubectl_command, paths["kube_config"])
+            output, _rc = await self._local_async_exec(
+                command=command, raise_exception_on_error=False, env=env
+            )
+            command = (
+                "{} --kubeconfig={} --namespace {} delete serviceaccount/{}".format(
+                    self.kubectl_command,
+                    paths["kube_config"],
+                    namespace,
+                    self.service_account,
                 )
             )
+            output, _rc = await self._local_async_exec(
+                command=command, raise_exception_on_error=False, env=env
+            )
 
 
-            # wait for execution task
-            await asyncio.wait([exec_task])
-
-            # cancel status task
-            status_task.cancel()
-            output, rc = exec_task.result()
-
-        else:
-
-            output, rc = await self._local_async_exec(command=command, raise_exception_on_error=False)
-
-        # remove temporal values yaml file
-        if file_to_delete:
-            os.remove(file_to_delete)
-
-        # write final status
-        await self._store_status(
-            cluster_uuid=cluster_uuid,
-            kdu_instance=kdu_instance,
-            db_dict=db_dict,
-            operation='upgrade',
-            run_once=True,
-            check_every=0
-        )
-
-        if rc != 0:
-            msg = 'Error executing command: {}\nOutput: {}'.format(command, output)
-            self.error(msg)
-            raise K8sException(msg)
-
-        # return new revision number
-        instance = await self.get_instance_info(cluster_uuid=cluster_uuid, kdu_instance=kdu_instance)
-        if instance:
-            revision = int(instance.get('Revision'))
-            self.debug('New revision: {}'.format(revision))
-            return revision
         else:
-            return 0
-
-    async def rollback(
-            self,
-            cluster_uuid: str,
-            kdu_instance: str,
-            revision=0,
-            db_dict: dict = None
-    ):
-
-        self.debug('rollback kdu_instance {} to revision {} from cluster {}'
-                   .format(kdu_instance, revision, cluster_uuid))
-
-        # config filename
-        kube_dir, helm_dir, config_filename, cluster_dir = \
-            self._get_paths(cluster_name=cluster_uuid, create_if_not_exist=True)
+            self.log.debug("namespace not found")
 
 
-        command = '{} rollback --kubeconfig={} --home={} {} {} --wait'\
-            .format(self._helm_command, config_filename, helm_dir, kdu_instance, revision)
+    async def _instances_list(self, cluster_id):
 
 
-        # exec helm in a task
-        exec_task = asyncio.ensure_future(
-            coro_or_future=self._local_async_exec(command=command, raise_exception_on_error=False)
+        # init paths, env
+        paths, env = self._init_paths_env(
+            cluster_name=cluster_id, create_if_not_exist=True
         )
-        # write status in another task
-        status_task = asyncio.ensure_future(
-            coro_or_future=self._store_status(
-                cluster_uuid=cluster_uuid,
-                kdu_instance=kdu_instance,
-                db_dict=db_dict,
-                operation='rollback',
-                run_once=False
-            )
-        )
-
-        # wait for execution task
-        await asyncio.wait([exec_task])
 
 
-        # cancel status task
-        status_task.cancel()
+        command = "{} list --output yaml".format(self._helm_command)
 
 
-        output, rc = exec_task.result()
-
-        # write final status
-        await self._store_status(
-            cluster_uuid=cluster_uuid,
-            kdu_instance=kdu_instance,
-            db_dict=db_dict,
-            operation='rollback',
-            run_once=True,
-            check_every=0
+        output, _rc = await self._local_async_exec(
+            command=command, raise_exception_on_error=True, env=env
         )
 
-        if rc != 0:
-            msg = 'Error executing command: {}\nOutput: {}'.format(command, output)
-            self.error(msg)
-            raise K8sException(msg)
-
-        # return new revision number
-        instance = await self.get_instance_info(cluster_uuid=cluster_uuid, kdu_instance=kdu_instance)
-        if instance:
-            revision = int(instance.get('Revision'))
-            self.debug('New revision: {}'.format(revision))
-            return revision
+        if output and len(output) > 0:
+            # parse yaml and update keys to lower case to unify with helm3
+            instances = yaml.load(output, Loader=yaml.SafeLoader).get("Releases")
+            new_instances = []
+            for instance in instances:
+                new_instance = dict((k.lower(), v) for k, v in instance.items())
+                new_instances.append(new_instance)
+            return new_instances
         else:
-            return 0
+            return []
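
A small illustration of the normalization above: helm2 `list --output yaml` returns capitalized keys, which are lowered so callers see the same shape as with helm3 (the release data is made up):

    # helm2 entry:    {"Name": "stable-openldap-0005399828", "Revision": 1,
    #                  "Status": "DEPLOYED", "Chart": "openldap-1.2.2"}
    # after lowering: {"name": "stable-openldap-0005399828", "revision": 1,
    #                  "status": "DEPLOYED", "chart": "openldap-1.2.2"}
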
 
 
-    async def uninstall(
-            self,
-            cluster_uuid: str,
-            kdu_instance: str
+    def _get_inspect_command(
+        self, show_command: str, kdu_model: str, repo_str: str, version: str
     ):
-        """
-        Removes an existing KDU instance. It would implicitly use the `delete` call (this call would happen
-        after all _terminate-config-primitive_ of the VNF are invoked).
-
-        :param cluster_uuid: UUID of a K8s cluster known by OSM
-        :param kdu_instance: unique name for the KDU instance to be deleted
-        :return: True if successful
-        """
-
-        self.debug('uninstall kdu_instance {} from cluster {}'.format(kdu_instance, cluster_uuid))
-
-        # config filename
-        kube_dir, helm_dir, config_filename, cluster_dir = \
-            self._get_paths(cluster_name=cluster_uuid, create_if_not_exist=True)
-
-        command = '{} --kubeconfig={} --home={} delete --purge {}'\
-            .format(self._helm_command, config_filename, helm_dir, kdu_instance)
-
-        output, rc = await self._local_async_exec(command=command, raise_exception_on_error=True)
-
-        return self._output_to_table(output)
-
-    async def inspect_kdu(
-            self,
-            kdu_model: str,
-            repo_url: str = None
-    ) -> str:
-
-        self.debug('inspect kdu_model {} from (optional) repo: {}'.format(kdu_model, repo_url))
-
-        return await self._exec_inspect_comand(inspect_command='', kdu_model=kdu_model, repo_url=repo_url)
-
-    async def values_kdu(
-            self,
-            kdu_model: str,
-            repo_url: str = None
-    ) -> str:
-
-        self.debug('inspect kdu_model values {} from (optional) repo: {}'.format(kdu_model, repo_url))
-
-        return await self._exec_inspect_comand(inspect_command='values', kdu_model=kdu_model, repo_url=repo_url)
-
-    async def help_kdu(
-            self,
-            kdu_model: str,
-            repo_url: str = None
-    ) -> str:
-
-        self.debug('inspect kdu_model {} readme.md from repo: {}'.format(kdu_model, repo_url))
-
-        return await self._exec_inspect_comand(inspect_command='readme', kdu_model=kdu_model, repo_url=repo_url)
-
-    async def status_kdu(
-            self,
-            cluster_uuid: str,
-            kdu_instance: str
-    ) -> str:
-
-        # call internal function
-        return await self._status_kdu(
-            cluster_uuid=cluster_uuid,
-            kdu_instance=kdu_instance,
-            show_error_log=True,
-            return_text=True
+        inspect_command = "{} inspect {} {}{} {}".format(
+            self._helm_command, show_command, kdu_model, repo_str, version
         )
+        return inspect_command
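
With made-up arguments, the helper above would assemble a command along these lines (the repo URL and version are illustrative, assuming the caller passes the version already rendered as a --version flag and the default helm binary path):

    # _get_inspect_command("readme", "openldap", " --repo https://charts.example.com", "--version 1.2.2")
    # -> "/usr/bin/helm inspect readme openldap --repo https://charts.example.com --version 1.2.2"
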
 
 
-    """
-    ##################################################################################################
-    ########################################## P R I V A T E #########################################
-    ##################################################################################################
-    """
-
-    async def _exec_inspect_comand(
-            self,
-            inspect_command: str,
-            kdu_model: str,
-            repo_url: str = None
+    def _get_get_command(
+        self, get_command: str, kdu_instance: str, namespace: str, kubeconfig: str
     ):
-
-        repo_str = ''
-        if repo_url:
-            repo_str = ' --repo {}'.format(repo_url)
-            idx = kdu_model.find('/')
-            if idx >= 0:
-                idx += 1
-                kdu_model = kdu_model[idx:]
-
-        inspect_command = '{} inspect {} {}{}'.format(self._helm_command, inspect_command, kdu_model, repo_str)
-        output, rc = await self._local_async_exec(command=inspect_command, encode_utf8=True)
-
-        return output
+        get_command = "env KUBECONFIG={} {} get {} {} --output yaml".format(
+            kubeconfig, self._helm_command, get_command, kdu_instance
+        )
+        return get_command
 
     async def _status_kdu(
-            self,
-            cluster_uuid: str,
-            kdu_instance: str,
-            show_error_log: bool = False,
-            return_text: bool = False
-    ):
-
-        self.debug('status of kdu_instance {}'.format(kdu_instance))
-
-        # config filename
-        kube_dir, helm_dir, config_filename, cluster_dir = \
-            self._get_paths(cluster_name=cluster_uuid, create_if_not_exist=True)
-
-        command = '{} --kubeconfig={} --home={} status {} --output yaml'\
-            .format(self._helm_command, config_filename, helm_dir, kdu_instance)
+        self,
+        cluster_id: str,
+        kdu_instance: str,
+        namespace: str = None,
+        yaml_format: bool = False,
+        show_error_log: bool = False,
+    ) -> Union[str, dict]:
+
+        self.log.debug(
+            "status of kdu_instance: {}, namespace: {} ".format(kdu_instance, namespace)
+        )
 
 
+        # init config, env
+        paths, env = self._init_paths_env(
+            cluster_name=cluster_id, create_if_not_exist=True
+        )
+        command = ("env KUBECONFIG={} {} status {} --output yaml").format(
+            paths["kube_config"], self._helm_command, kdu_instance
+        )
         output, rc = await self._local_async_exec(
             command=command,
             raise_exception_on_error=True,
-            show_error_log=show_error_log
+            show_error_log=show_error_log,
+            env=env,
         )
 
-        if return_text:
+        if yaml_format:
             return str(output)
 
         if rc != 0:
@@ -740,103 +497,56 @@ class K8sHelmConnector(K8sConnector):
 
         # remove field 'notes'
         try:
 
         # remove field 'notes'
         try:
-            del data.get('info').get('status')['notes']
+            del data.get("info").get("status")["notes"]
         except KeyError:
             pass
 
+        # parse the manifest to a list of dictionaries
+        if "manifest" in data:
+            manifest_str = data.get("manifest")
+            manifest_docs = yaml.load_all(manifest_str, Loader=yaml.SafeLoader)
+
+            data["manifest"] = []
+            for doc in manifest_docs:
+                data["manifest"].append(doc)
+
         # parse field 'resources'
         try:
-            resources = str(data.get('info').get('status').get('resources'))
+            resources = str(data.get("info").get("status").get("resources"))
             resource_table = self._output_to_table(resources)
-            data.get('info').get('status')['resources'] = resource_table
-        except Exception as e:
+            data.get("info").get("status")["resources"] = resource_table
+        except Exception:
             pass
 
-        return data
-
-    async def get_instance_info(
-            self,
-            cluster_uuid: str,
-            kdu_instance: str
-    ):
-        instances = await self.instances_list(cluster_uuid=cluster_uuid)
-        for instance in instances:
-            if instance.get('Name') == kdu_instance:
-                return instance
-        self.debug('Instance {} not found'.format(kdu_instance))
-        return None
-
-    @staticmethod
-    def _generate_release_name(
-            chart_name: str
-    ):
-        name = ''
-        for c in chart_name:
-            if c.isalpha() or c.isnumeric():
-                name += c
-            else:
-                name += '-'
-        if len(name) > 35:
-            name = name[0:35]
-
-        # if does not start with alpha character, prefix 'a'
-        if not name[0].isalpha():
-            name = 'a' + name
-
-        name += '-'
-
-        def get_random_number():
-            r = random.randrange(start=1, stop=99999999)
-            s = str(r)
-            s = s.rjust(10, '0')
-            return s
+        # set the Description key to lowercase (unify with helm3)
+        try:
+            data.get("info")["description"] = data.get("info").pop("Description")
+        except KeyError:
+            pass
 
 
-        name = name + get_random_number()
-        return name.lower()
+        return data
 
 
-    async def _store_status(
-            self,
-            cluster_uuid: str,
-            operation: str,
-            kdu_instance: str,
-            check_every: float = 10,
-            db_dict: dict = None,
-            run_once: bool = False
-    ):
-        while True:
-            try:
-                await asyncio.sleep(check_every)
-                detailed_status = await self.status_kdu(cluster_uuid=cluster_uuid, kdu_instance=kdu_instance)
-                status = detailed_status.get('info').get('Description')
-                print('=' * 60)
-                self.debug('STATUS:\n{}'.format(status))
-                self.debug('DETAILED STATUS:\n{}'.format(detailed_status))
-                print('=' * 60)
-                # write status to db
-                result = await self.write_app_status_to_db(
-                    db_dict=db_dict,
-                    status=str(status),
-                    detailed_status=str(detailed_status),
-                    operation=operation)
-                if not result:
-                    self.info('Error writing in database. Task exiting...')
-                    return
-            except asyncio.CancelledError:
-                self.debug('Task cancelled')
-                return
-            except Exception as e:
-                pass
-            finally:
-                if run_once:
-                    return
+    def _get_helm_chart_repos_ids(self, cluster_uuid) -> list:
+        repo_ids = []
+        cluster_filter = {"_admin.helm-chart.id": cluster_uuid}
+        cluster = self.db.get_one("k8sclusters", cluster_filter)
+        if cluster:
+            repo_ids = cluster.get("_admin").get("helm_chart_repos") or []
+            return repo_ids
+        else:
+            raise K8sException(
+                "k8cluster with helm-id : {} not found".format(cluster_uuid)
+            )
 
 
-    async def _is_install_completed(
-            self,
-            cluster_uuid: str,
-            kdu_instance: str
-    ) -> bool:
+    async def _is_install_completed(self, cluster_id: str, kdu_instance: str) -> bool:
+        # init config, env
+        paths, env = self._init_paths_env(
+            cluster_name=cluster_id, create_if_not_exist=True
+        )
 
 
-        status = await self._status_kdu(cluster_uuid=cluster_uuid, kdu_instance=kdu_instance, return_text=False)
+        status = await self._status_kdu(
+            cluster_id=cluster_id, kdu_instance=kdu_instance, yaml_format=False
+        )
 
         # extract info.status.resources-> str
         # format:
@@ -845,302 +555,191 @@ class K8sHelmConnector(K8sConnector):
         #       halting-horse-mongodb   0/1     1            0           0s
         #       halting-petit-mongodb   1/1     1            0           0s
         # blank line
-        resources = K8sHelmConnector._get_deep(status, ('info', 'status', 'resources'))
+        resources = K8sHelmBaseConnector._get_deep(
+            status, ("info", "status", "resources")
+        )
 
         # convert to table
-        resources = K8sHelmConnector._output_to_table(resources)
+        resources = K8sHelmBaseConnector._output_to_table(resources)
 
         num_lines = len(resources)
         index = 0
+        ready = True
         while index < num_lines:
             try:
                 line1 = resources[index]
                 index += 1
                 # find '==>' in column 0
-                if line1[0] == '==>':
+                if line1[0] == "==>":
                     line2 = resources[index]
                     index += 1
                     # find READY in column 1
-                    if line2[1] == 'READY':
+                    if line2[1] == "READY":
                         # read next lines
                         line3 = resources[index]
                         index += 1
                         while len(line3) > 1 and index < num_lines:
                             ready_value = line3[1]
-                            parts = ready_value.split(sep='/')
+                            parts = ready_value.split(sep="/")
                             current = int(parts[0])
                             total = int(parts[1])
                             if current < total:
-                                self.debug('NOT READY:\n    {}'.format(line3))
+                                self.log.debug("NOT READY:\n    {}".format(line3))
                                 ready = False
                             line3 = resources[index]
                             index += 1
 
-            except Exception as e:
+            except Exception:
                 pass
 
         return ready
 
-    @staticmethod
-    def _get_deep(dictionary: dict, members: tuple):
-        target = dictionary
-        value = None
-        try:
-            for m in members:
-                value = target.get(m)
-                if not value:
-                    return None
-                else:
-                    target = value
-        except Exception as e:
-            pass
-        return value
-
-    # find key:value in several lines
-    @staticmethod
-    def _find_in_lines(p_lines: list, p_key: str) -> str:
-        for line in p_lines:
-            try:
-                if line.startswith(p_key + ':'):
-                    parts = line.split(':')
-                    the_value = parts[1].strip()
-                    return the_value
-            except Exception as e:
-                # ignore it
-                pass
-        return None
-
-    # params for use in -f file
-    # returns values file option and filename (in order to delete it at the end)
-    def _params_to_file_option(self, cluster_uuid: str, params: dict) -> (str, str):
-
-        if params and len(params) > 0:
-            kube_dir, helm_dir, config_filename, cluster_dir = \
-                self._get_paths(cluster_name=cluster_uuid, create_if_not_exist=True)
-
-            def get_random_number():
-                r = random.randrange(start=1, stop=99999999)
-                s = str(r)
-                while len(s) < 10:
-                    s = '0' + s
-                return s
-
-            params2 = dict()
-            for key in params:
-                value = params.get(key)
-                if '!!yaml' in str(value):
-                    value = yaml.load(value[7:])
-                params2[key] = value
-
-            values_file = get_random_number() + '.yaml'
-            with open(values_file, 'w') as stream:
-                yaml.dump(params2, stream, indent=4, default_flow_style=False)
-
-            return '-f {}'.format(values_file), values_file
-
-        return '', None
-
-    # params for use in --set option
-    @staticmethod
-    def _params_to_set_option(params: dict) -> str:
-        params_str = ''
-        if params and len(params) > 0:
-            start = True
-            for key in params:
-                value = params.get(key, None)
-                if value is not None:
-                    if start:
-                        params_str += '--set '
-                        start = False
-                    else:
-                        params_str += ','
-                    params_str += '{}={}'.format(key, value)
-        return params_str
-
-    @staticmethod
-    def _output_to_lines(output: str) -> list:
-        output_lines = list()
-        lines = output.splitlines(keepends=False)
-        for line in lines:
-            line = line.strip()
-            if len(line) > 0:
-                output_lines.append(line)
-        return output_lines
-
-    @staticmethod
-    def _output_to_table(output: str) -> list:
-        output_table = list()
-        lines = output.splitlines(keepends=False)
-        for line in lines:
-            line = line.replace('\t', ' ')
-            line_list = list()
-            output_table.append(line_list)
-            cells = line.split(sep=' ')
-            for cell in cells:
-                cell = cell.strip()
-                if len(cell) > 0:
-                    line_list.append(cell)
-        return output_table
-
-    def _get_paths(self, cluster_name: str, create_if_not_exist: bool = False) -> (str, str, str, str):
-        """
-        Returns kube and helm directories
-
-        :param cluster_name:
-        :param create_if_not_exist:
-        :return: kube, helm directories, config filename and cluster dir.
-                Raises exception if not exist and cannot create
-        """
-
-        base = self.fs.path
-        if base.endswith("/") or base.endswith("\\"):
-            base = base[:-1]
-
-        # base dir for cluster
-        cluster_dir = base + '/' + cluster_name
-        if create_if_not_exist and not os.path.exists(cluster_dir):
-            self.debug('Creating dir {}'.format(cluster_dir))
-            os.makedirs(cluster_dir)
-        if not os.path.exists(cluster_dir):
-            msg = 'Base cluster dir {} does not exist'.format(cluster_dir)
-            self.error(msg)
-            raise K8sException(msg)
-
-        # kube dir
-        kube_dir = cluster_dir + '/' + '.kube'
-        if create_if_not_exist and not os.path.exists(kube_dir):
-            self.debug('Creating dir {}'.format(kube_dir))
-            os.makedirs(kube_dir)
-        if not os.path.exists(kube_dir):
-            msg = 'Kube config dir {} does not exist'.format(kube_dir)
-            self.error(msg)
-            raise K8sException(msg)
-
-        # helm home dir
-        helm_dir = cluster_dir + '/' + '.helm'
-        if create_if_not_exist and not os.path.exists(helm_dir):
-            self.debug('Creating dir {}'.format(helm_dir))
-            os.makedirs(helm_dir)
-        if not os.path.exists(helm_dir):
-            msg = 'Helm config dir {} does not exist'.format(helm_dir)
-            self.error(msg)
-            raise K8sException(msg)
-
-        config_filename = kube_dir + '/config'
-        return kube_dir, helm_dir, config_filename, cluster_dir
-
-    @staticmethod
-    def _remove_multiple_spaces(str):
-        str = str.strip()
-        while '  ' in str:
-            str = str.replace('  ', ' ')
-        return str
-
-    def _local_exec(
-            self,
-            command: str
-    ) -> (str, int):
-        command = K8sHelmConnector._remove_multiple_spaces(command)
-        self.debug('Executing sync local command: {}'.format(command))
-        # raise exception if fails
-        output = ''
-        try:
-            output = subprocess.check_output(command, shell=True, universal_newlines=True)
-            return_code = 0
-            self.debug(output)
-        except Exception as e:
-            return_code = 1
-
-        return output, return_code
-
-    async def _local_async_exec(
-            self,
-            command: str,
-            raise_exception_on_error: bool = False,
-            show_error_log: bool = True,
-            encode_utf8: bool = False
-    ) -> (str, int):
+    def _get_install_command(
+        self,
+        kdu_model: str,
+        kdu_instance: str,
+        namespace: str,
+        params_str: str,
+        version: str,
+        atomic: bool,
+        timeout: float,
+        kubeconfig: str,
+    ) -> str:
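+        """Generates the helm v2 'install' command for the given chart.
+
+        Optional flags (--atomic, --namespace, --timeout, --version) are only
+        appended when the corresponding argument is set. Hypothetical example
+        of the resulting string (illustrative values only):
+
+            env KUBECONFIG=/app/.kube/config /usr/bin/helm install --atomic
+            --output yaml --timeout 300 --name=ldap-1 --namespace osm
+            stable/openldap --version 1.2.3
+        """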
 
 
-        command = K8sHelmConnector._remove_multiple_spaces(command)
-        self.debug('Executing async local command: {}'.format(command))
+        timeout_str = ""
+        if timeout:
+            timeout_str = "--timeout {}".format(timeout)
 
 
-        # split command
-        command = command.split(sep=' ')
+        # atomic
+        atomic_str = ""
+        if atomic:
+            atomic_str = "--atomic"
+        # namespace
+        namespace_str = ""
+        if namespace:
+            namespace_str = "--namespace {}".format(namespace)
 
 
-        try:
-            process = await asyncio.create_subprocess_exec(
-                *command,
-                stdout=asyncio.subprocess.PIPE,
-                stderr=asyncio.subprocess.PIPE
+        # version
+        version_str = ""
+        if version:
+            version_str = "--version {}".format(version)
+
+        command = (
+            "env KUBECONFIG={kubeconfig} {helm} install {atomic} --output yaml  "
+            "{params} {timeout} --name={name} {ns} {model} {ver}".format(
+                kubeconfig=kubeconfig,
+                helm=self._helm_command,
+                atomic=atomic_str,
+                params=params_str,
+                timeout=timeout_str,
+                name=kdu_instance,
+                ns=namespace_str,
+                model=kdu_model,
+                ver=version_str,
             )
             )
+        )
+        return command
+
+    def _get_upgrade_scale_command(
+        self,
+        kdu_model: str,
+        kdu_instance: str,
+        namespace: str,
+        scale: int,
+        version: str,
+        atomic: bool,
+        replica_str: str,
+        timeout: float,
+        resource_name: str,
+        kubeconfig: str,
+    ) -> str:
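+        """Generates the helm v2 'upgrade' command used to scale a resource.
+
+        The replica count is passed as a --set override built from
+        resource_name and replica_str (see _params_to_set_option).
+        """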
 
 
-            # wait for command terminate
-            stdout, stderr = await process.communicate()
-
-            return_code = process.returncode
-
-            output = ''
-            if stdout:
-                output = stdout.decode('utf-8').strip()
-                # output = stdout.decode()
-            if stderr:
-                output = stderr.decode('utf-8').strip()
-                # output = stderr.decode()
+        timeout_str = ""
+        if timeout:
+            timeout_str = "--timeout {}s".format(timeout)
 
 
-            if return_code != 0 and show_error_log:
-                self.debug('Return code (FAIL): {}\nOutput:\n{}'.format(return_code, output))
-            else:
-                self.debug('Return code: {}'.format(return_code))
+        # atomic
+        atomic_str = ""
+        if atomic:
+            atomic_str = "--atomic"
 
 
-            if raise_exception_on_error and return_code != 0:
-                raise Exception(output)
+        # version
+        version_str = ""
+        if version:
+            version_str = "--version {}".format(version)
 
 
-            if encode_utf8:
-                output = output.encode('utf-8').strip()
-                output = str(output).replace('\\n', '\n')
+        # scale
+        if resource_name:
+            scale_dict = {"{}.{}".format(resource_name, replica_str): scale}
+        else:
+            scale_dict = {replica_str: scale}
+
+        scale_str = self._params_to_set_option(scale_dict)
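+        # Illustrative (hypothetical) values: resource_name="backend",
+        # replica_str="replicaCount", scale=3 produce
+        # scale_str == "--set backend.replicaCount=3"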
+
+        command = (
+            "env KUBECONFIG={kubeconfig} {helm} upgrade {atomic} --output yaml {scale} {timeout} {name} {model} {ver}"
+        ).format(
+            helm=self._helm_command,
+            name=kdu_instance,
+            atomic=atomic_str,
+            scale=scale_str,
+            timeout=timeout_str,
+            model=kdu_model,
+            ver=version_str,
+            kubeconfig=kubeconfig,
+        )
+        return command
+
+    def _get_upgrade_command(
+        self,
+        kdu_model: str,
+        kdu_instance: str,
+        namespace: str,
+        params_str: str,
+        version: str,
+        atomic: bool,
+        timeout: float,
+        kubeconfig: str,
+    ) -> str:
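+        """Generates the helm v2 'upgrade' command for the given release.
+
+        Optional flags (--atomic, --timeout, --version) are only appended when
+        the corresponding argument is set; params_str carries any user-supplied
+        value overrides already rendered as command-line options.
+        """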
 
 
-            return output, return_code
+        timeout_str = ""
+        if timeout:
+            timeout_str = "--timeout {}".format(timeout)
 
 
-        except Exception as e:
-            msg = 'Exception executing command: {} -> {}'.format(command, e)
-            if show_error_log:
-                self.error(msg)
-            return '', -1
+        # atomic
+        atomic_str = ""
+        if atomic:
+            atomic_str = "--atomic"
 
 
-    def _remote_exec(
-            self,
-            hostname: str,
-            username: str,
-            password: str,
-            command: str,
-            timeout: int = 10
-    ) -> (str, int):
-
-        command = K8sHelmConnector._remove_multiple_spaces(command)
-        self.debug('Executing sync remote ssh command: {}'.format(command))
-
-        ssh = paramiko.SSHClient()
-        ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
-        ssh.connect(hostname=hostname, username=username, password=password)
-        ssh_stdin, ssh_stdout, ssh_stderr = ssh.exec_command(command=command, timeout=timeout)
-        output = ssh_stdout.read().decode('utf-8')
-        error = ssh_stderr.read().decode('utf-8')
-        if error:
-            self.error('ERROR: {}'.format(error))
-            return_code = 1
-        else:
-            return_code = 0
-        output = output.replace('\\n', '\n')
-        self.debug('OUTPUT: {}'.format(output))
+        # version
+        version_str = ""
+        if version:
+            version_str = "--version {}".format(version)
+
+        command = (
+            "env KUBECONFIG={kubeconfig} {helm} upgrade {atomic} --output yaml {params} {timeout} {name} {model} {ver}"
+        ).format(
+            kubeconfig=kubeconfig,
+            helm=self._helm_command,
+            atomic=atomic_str,
+            params=params_str,
+            timeout=timeout_str,
+            name=kdu_instance,
+            model=kdu_model,
+            ver=version_str,
+        )
+        return command
 
 
-        return output, return_code
+    def _get_rollback_command(
+        self, kdu_instance, namespace, revision, kubeconfig
+    ) -> str:
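+        """Generates the helm v2 'rollback' command; --wait blocks until the
+        rolled-back resources are ready."""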
+        return "env KUBECONFIG={} {} rollback {} {} --wait".format(
+            kubeconfig, self._helm_command, kdu_instance, revision
+        )
 
 
-    def _check_file_exists(self, filename: str, exception_if_not_exists: bool = False):
-        self.debug('Checking if file {} exists...'.format(filename))
-        if os.path.exists(filename):
-            return True
-        else:
-            msg = 'File {} does not exist'.format(filename)
-            if exception_if_not_exists:
-                self.error(msg)
-                raise K8sException(msg)
+    def _get_uninstall_command(
+        self, kdu_instance: str, namespace: str, kubeconfig: str
+    ) -> str:
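+        """Generates the helm v2 'delete --purge' command, which removes the
+        release together with its stored history."""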
+        return "env KUBECONFIG={} {} delete --purge  {}".format(
+            kubeconfig, self._helm_command, kdu_instance
+        )