Support for helm v3
[osm/N2VC.git] / n2vc / k8s_helm_base_conn.py
diff --git a/n2vc/k8s_helm_base_conn.py b/n2vc/k8s_helm_base_conn.py
new file mode 100644 (file)
index 0000000..842bbe3
--- /dev/null
@@ -0,0 +1,1423 @@
+##
+# Copyright 2019 Telefonica Investigacion y Desarrollo, S.A.U.
+# This file is part of OSM
+# All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+# implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+# For those usages not covered by the Apache License, Version 2.0 please
+# contact with: nfvlabs@tid.es
+##
+import abc
+import asyncio
+import random
+import time
+import shlex
+import shutil
+import stat
+import subprocess
+import os
+import yaml
+from uuid import uuid4
+
+from n2vc.exceptions import K8sException
+from n2vc.k8s_conn import K8sConnector
+
+
+class K8sHelmBaseConnector(K8sConnector):
+
+    """
+    ####################################################################################
+    ################################### P U B L I C ####################################
+    ####################################################################################
+    """
+    service_account = "osm"
+
+    def __init__(
+        self,
+        fs: object,
+        db: object,
+        kubectl_command: str = "/usr/bin/kubectl",
+        helm_command: str = "/usr/bin/helm",
+        log: object = None,
+        on_update_db=None,
+    ):
+        """
+
+        :param fs: file system for kubernetes and helm configuration
+        :param db: database object to write current operation status
+        :param kubectl_command: path to kubectl executable
+        :param helm_command: path to helm executable
+        :param log: logger
+        :param on_update_db: callback called when k8s connector updates database
+        """
+
+        # parent class
+        K8sConnector.__init__(self, db=db, log=log, on_update_db=on_update_db)
+
+        self.log.info("Initializing K8S Helm connector")
+
+        # random numbers for release name generation
+        random.seed(time.time())
+
+        # the file system
+        self.fs = fs
+
+        # exception if kubectl is not installed
+        self.kubectl_command = kubectl_command
+        self._check_file_exists(filename=kubectl_command, exception_if_not_exists=True)
+
+        # exception if helm is not installed
+        self._helm_command = helm_command
+        self._check_file_exists(filename=helm_command, exception_if_not_exists=True)
+
+    @staticmethod
+    def _get_namespace_cluster_id(cluster_uuid: str) -> (str, str):
+        """
+        Parses cluster_uuid stored at database that can be either 'namespace:cluster_id' or only
+        cluster_id for backward compatibility
+        """
+        namespace, _, cluster_id = cluster_uuid.rpartition(':')
+        return namespace, cluster_id
+
+    async def init_env(
+            self, k8s_creds: str, namespace: str = "kube-system", reuse_cluster_uuid=None
+    ) -> (str, bool):
+        """
+        It prepares a given K8s cluster environment to run Charts
+
+        :param k8s_creds: credentials to access a given K8s cluster, i.e. a valid
+            '.kube/config'
+        :param namespace: optional namespace to be used for helm. By default,
+            'kube-system' will be used
+        :param reuse_cluster_uuid: existing cluster uuid for reuse
+        :return: uuid of the K8s cluster and True if connector has installed some
+            software in the cluster
+        (on error, an exception will be raised)
+        """
+
+        if reuse_cluster_uuid:
+            namespace_, cluster_id = self._get_namespace_cluster_id(reuse_cluster_uuid)
+            namespace = namespace_ or namespace
+        else:
+            cluster_id = str(uuid4())
+        cluster_uuid = "{}:{}".format(namespace, cluster_id)
+
+        self.log.debug("Initializing K8S Cluster {}. namespace: {}".format(cluster_id, namespace))
+
+        paths, env = self._init_paths_env(
+            cluster_name=cluster_id, create_if_not_exist=True
+        )
+        mode = stat.S_IRUSR | stat.S_IWUSR
+        with open(paths["kube_config"], "w", mode) as f:
+            f.write(k8s_creds)
+        os.chmod(paths["kube_config"], 0o600)
+
+        # Code with initialization specific of helm version
+        n2vc_installed_sw = await self._cluster_init(cluster_id, namespace, paths, env)
+
+        # sync fs with local data
+        self.fs.reverse_sync(from_path=cluster_id)
+
+        self.log.info("Cluster {} initialized".format(cluster_id))
+
+        return cluster_uuid, n2vc_installed_sw
+
+    async def repo_add(
+            self, cluster_uuid: str, name: str, url: str, repo_type: str = "chart"
+    ):
+        _, cluster_id = self._get_namespace_cluster_id(cluster_uuid)
+        self.log.debug("Cluster {}, adding {} repository {}. URL: {}".format(
+            cluster_id, repo_type, name, url))
+
+        # sync local dir
+        self.fs.sync(from_path=cluster_id)
+
+        # init_env
+        paths, env = self._init_paths_env(
+            cluster_name=cluster_id, create_if_not_exist=True
+        )
+
+        # helm repo update
+        command = "{} repo update".format(
+            self._helm_command
+        )
+        self.log.debug("updating repo: {}".format(command))
+        await self._local_async_exec(command=command, raise_exception_on_error=False, env=env)
+
+        # helm repo add name url
+        command = "{} repo add {} {}".format(
+            self._helm_command, name, url
+        )
+        self.log.debug("adding repo: {}".format(command))
+        await self._local_async_exec(command=command, raise_exception_on_error=True, env=env)
+
+        # sync fs
+        self.fs.reverse_sync(from_path=cluster_id)
+
+    async def repo_list(self, cluster_uuid: str) -> list:
+        """
+        Get the list of registered repositories
+
+        :return: list of registered repositories: [ (name, url) .... ]
+        """
+
+        _, cluster_id = self._get_namespace_cluster_id(cluster_uuid)
+        self.log.debug("list repositories for cluster {}".format(cluster_id))
+
+        # sync local dir
+        self.fs.sync(from_path=cluster_id)
+
+        # config filename
+        paths, env = self._init_paths_env(
+            cluster_name=cluster_id, create_if_not_exist=True
+        )
+
+        command = "{} repo list --output yaml".format(
+            self._helm_command
+        )
+
+        # Set exception to false because if there are no repos just want an empty list
+        output, _rc = await self._local_async_exec(
+            command=command, raise_exception_on_error=False, env=env
+        )
+
+        # sync fs
+        self.fs.reverse_sync(from_path=cluster_id)
+
+        if _rc == 0:
+            if output and len(output) > 0:
+                repos = yaml.load(output, Loader=yaml.SafeLoader)
+                # unify format between helm2 and helm3 setting all keys lowercase
+                return self._lower_keys_list(repos)
+            else:
+                return []
+        else:
+            return []
+
+    async def repo_remove(self, cluster_uuid: str, name: str):
+
+        _, cluster_id = self._get_namespace_cluster_id(cluster_uuid)
+        self.log.debug("remove {} repositories for cluster {}".format(name, cluster_id))
+
+        # sync local dir
+        self.fs.sync(from_path=cluster_id)
+
+        # init env, paths
+        paths, env = self._init_paths_env(
+            cluster_name=cluster_id, create_if_not_exist=True
+        )
+
+        command = "{} repo remove {}".format(
+            self._helm_command, name
+        )
+        await self._local_async_exec(command=command, raise_exception_on_error=True, env=env)
+
+        # sync fs
+        self.fs.reverse_sync(from_path=cluster_id)
+
+    async def reset(
+            self, cluster_uuid: str, force: bool = False, uninstall_sw: bool = False
+    ) -> bool:
+
+        namespace, cluster_id = self._get_namespace_cluster_id(cluster_uuid)
+        self.log.debug("Resetting K8s environment. cluster uuid: {} uninstall={}"
+                       .format(cluster_id, uninstall_sw))
+
+        # sync local dir
+        self.fs.sync(from_path=cluster_id)
+
+        # uninstall releases if needed.
+        if uninstall_sw:
+            releases = await self.instances_list(cluster_uuid=cluster_uuid)
+            if len(releases) > 0:
+                if force:
+                    for r in releases:
+                        try:
+                            kdu_instance = r.get("name")
+                            chart = r.get("chart")
+                            self.log.debug(
+                                "Uninstalling {} -> {}".format(chart, kdu_instance)
+                            )
+                            await self.uninstall(
+                                cluster_uuid=cluster_uuid, kdu_instance=kdu_instance
+                            )
+                        except Exception as e:
+                            # will not raise exception as it was found
+                            # that in some cases of previously installed helm releases it
+                            # raised an error
+                            self.log.warn(
+                                "Error uninstalling release {}: {}".format(kdu_instance, e)
+                            )
+                else:
+                    msg = (
+                        "Cluster uuid: {} has releases and not force. Leaving K8s helm environment"
+                    ).format(cluster_id)
+                    self.log.warn(msg)
+                    uninstall_sw = False  # Allow to remove k8s cluster without removing Tiller
+
+        if uninstall_sw:
+            await self._uninstall_sw(cluster_id, namespace)
+
+        # delete cluster directory
+        self.log.debug("Removing directory {}".format(cluster_id))
+        self.fs.file_delete(cluster_id, ignore_non_exist=True)
+        # Remove also local directorio if still exist
+        direct = self.fs.path + "/" + cluster_id
+        shutil.rmtree(direct, ignore_errors=True)
+
+        return True
+
    async def install(
            self,
            cluster_uuid: str,
            kdu_model: str,
            atomic: bool = True,
            timeout: float = 300,
            params: dict = None,
            db_dict: dict = None,
            kdu_name: str = None,
            namespace: str = None,
    ):
        """
        Deploys a chart as a new release in the cluster.

        :param cluster_uuid: 'namespace:cluster_id' or plain cluster_id
        :param kdu_model: chart name, optionally suffixed with ':<version>'
        :param atomic: if True, progress is written to the database by a
            parallel _store_status task while the helm command runs
        :param timeout: timeout handed to the version-specific install command
        :param params: optional install parameters, written to a temporary
            values file by _params_to_file_option
        :param db_dict: dictionary with database path, used by _store_status to
            write the operation status
        :param kdu_name: accepted for interface compatibility; not used by this
            implementation
        :param namespace: optional namespace for the new release
        :return: generated release name (kdu_instance)
        :raises K8sException: if the helm command ends with a non-zero rc
        """
        _, cluster_id = self._get_namespace_cluster_id(cluster_uuid)
        self.log.debug("installing {} in cluster {}".format(kdu_model, cluster_id))

        # sync local dir
        self.fs.sync(from_path=cluster_id)

        # init env, paths
        paths, env = self._init_paths_env(
            cluster_name=cluster_id, create_if_not_exist=True
        )

        # params to str: writes a temporary values file (removed further below)
        params_str, file_to_delete = self._params_to_file_option(
            cluster_id=cluster_id, params=params
        )

        # split the optional ':<version>' suffix from the chart name
        version = None
        if ":" in kdu_model:
            parts = kdu_model.split(sep=":")
            if len(parts) == 2:
                version = str(parts[1])
                kdu_model = parts[0]

        # generate a name for the release. Then, check if already exists:
        # _status_kdu returning a result (or not raising) means the name is
        # taken, so loop until an unused name is produced
        kdu_instance = None
        while kdu_instance is None:
            kdu_instance = self._generate_release_name(kdu_model)
            try:
                result = await self._status_kdu(
                    cluster_id=cluster_id,
                    kdu_instance=kdu_instance,
                    namespace=namespace,
                    show_error_log=False,
                )
                if result is not None:
                    # instance already exists: generate a new one
                    kdu_instance = None
            except K8sException:
                pass

        command = self._get_install_command(kdu_model, kdu_instance, namespace,
                                            params_str, version, atomic, timeout)

        self.log.debug("installing: {}".format(command))

        if atomic:
            # exec helm in a task
            exec_task = asyncio.ensure_future(
                coro_or_future=self._local_async_exec(
                    command=command, raise_exception_on_error=False, env=env
                )
            )

            # write status in another task, concurrently with the helm command
            status_task = asyncio.ensure_future(
                coro_or_future=self._store_status(
                    cluster_id=cluster_id,
                    kdu_instance=kdu_instance,
                    namespace=namespace,
                    db_dict=db_dict,
                    operation="install",
                    run_once=False,
                )
            )

            # wait for execution task only; the status task loops until cancelled
            await asyncio.wait([exec_task])

            # cancel status task
            status_task.cancel()

            output, rc = exec_task.result()

        else:

            output, rc = await self._local_async_exec(
                command=command, raise_exception_on_error=False, env=env
            )

        # remove temporal values yaml file
        if file_to_delete:
            os.remove(file_to_delete)

        # write final status (single run, regardless of atomic)
        await self._store_status(
            cluster_id=cluster_id,
            kdu_instance=kdu_instance,
            namespace=namespace,
            db_dict=db_dict,
            operation="install",
            run_once=True,
            check_every=0,
        )

        if rc != 0:
            msg = "Error executing command: {}\nOutput: {}".format(command, output)
            self.log.error(msg)
            raise K8sException(msg)

        # sync fs
        self.fs.reverse_sync(from_path=cluster_id)

        self.log.debug("Returning kdu_instance {}".format(kdu_instance))
        return kdu_instance
+
    async def upgrade(
        self,
        cluster_uuid: str,
        kdu_instance: str,
        kdu_model: str = None,
        atomic: bool = True,
        timeout: float = 300,
        params: dict = None,
        db_dict: dict = None,
    ):
        """
        Upgrades an existing release to a new chart/version.

        :param cluster_uuid: 'namespace:cluster_id' or plain cluster_id
        :param kdu_instance: release name to upgrade
        :param kdu_model: chart name, optionally suffixed with ':<version>'
        :param atomic: if True, progress is written to the database by a
            parallel _store_status task while the helm command runs
        :param timeout: timeout handed to the version-specific upgrade command
        :param params: optional upgrade parameters, written to a temporary
            values file by _params_to_file_option
        :param db_dict: dictionary with database path, used by _store_status
        :return: new revision number of the release (0 if it cannot be read back)
        :raises K8sException: if the instance is not found or the helm command
            ends with a non-zero rc
        """
        _, cluster_id = self._get_namespace_cluster_id(cluster_uuid)
        self.log.debug("upgrading {} in cluster {}".format(kdu_model, cluster_id))

        # sync local dir
        self.fs.sync(from_path=cluster_id)

        # look for instance to obtain namespace
        instance_info = await self.get_instance_info(cluster_uuid, kdu_instance)
        if not instance_info:
            raise K8sException("kdu_instance {} not found".format(kdu_instance))

        # init env, paths
        paths, env = self._init_paths_env(
            cluster_name=cluster_id, create_if_not_exist=True
        )

        # params to str: writes a temporary values file (removed further below)
        params_str, file_to_delete = self._params_to_file_option(
            cluster_id=cluster_id, params=params
        )

        # split the optional ':<version>' suffix from the chart name
        version = None
        if ":" in kdu_model:
            parts = kdu_model.split(sep=":")
            if len(parts) == 2:
                version = str(parts[1])
                kdu_model = parts[0]

        command = self._get_upgrade_command(kdu_model, kdu_instance, instance_info["namespace"],
                                            params_str, version, atomic, timeout)

        self.log.debug("upgrading: {}".format(command))

        if atomic:

            # exec helm in a task
            exec_task = asyncio.ensure_future(
                coro_or_future=self._local_async_exec(
                    command=command, raise_exception_on_error=False, env=env
                )
            )
            # write status in another task, concurrently with the helm command
            status_task = asyncio.ensure_future(
                coro_or_future=self._store_status(
                    cluster_id=cluster_id,
                    kdu_instance=kdu_instance,
                    namespace=instance_info["namespace"],
                    db_dict=db_dict,
                    operation="upgrade",
                    run_once=False,
                )
            )

            # wait for execution task only; the status task loops until cancelled
            await asyncio.wait([exec_task])

            # cancel status task
            status_task.cancel()
            output, rc = exec_task.result()

        else:

            output, rc = await self._local_async_exec(
                command=command, raise_exception_on_error=False, env=env
            )

        # remove temporal values yaml file
        if file_to_delete:
            os.remove(file_to_delete)

        # write final status (single run, regardless of atomic)
        await self._store_status(
            cluster_id=cluster_id,
            kdu_instance=kdu_instance,
            namespace=instance_info["namespace"],
            db_dict=db_dict,
            operation="upgrade",
            run_once=True,
            check_every=0,
        )

        if rc != 0:
            msg = "Error executing command: {}\nOutput: {}".format(command, output)
            self.log.error(msg)
            raise K8sException(msg)

        # sync fs
        self.fs.reverse_sync(from_path=cluster_id)

        # return new revision number
        instance = await self.get_instance_info(
            cluster_uuid=cluster_uuid, kdu_instance=kdu_instance
        )
        if instance:
            revision = int(instance.get("revision"))
            self.log.debug("New revision: {}".format(revision))
            return revision
        else:
            return 0
+
    async def rollback(
        self, cluster_uuid: str, kdu_instance: str, revision=0, db_dict: dict = None
    ):
        """
        Rolls back a release to a given revision.

        :param cluster_uuid: 'namespace:cluster_id' or plain cluster_id
        :param kdu_instance: release name to roll back
        :param revision: target revision passed to the version-specific
            rollback command (presumably 0 means 'previous revision' as in
            helm — TODO confirm against _get_rollback_command)
        :param db_dict: dictionary with database path, used by _store_status
        :return: revision number after the rollback (0 if it cannot be read back)
        :raises K8sException: if the instance is not found or the helm command
            ends with a non-zero rc
        """
        _, cluster_id = self._get_namespace_cluster_id(cluster_uuid)
        self.log.debug(
            "rollback kdu_instance {} to revision {} from cluster {}".format(
                kdu_instance, revision, cluster_id
            )
        )

        # sync local dir
        self.fs.sync(from_path=cluster_id)

        # look for instance to obtain namespace
        instance_info = await self.get_instance_info(cluster_uuid, kdu_instance)
        if not instance_info:
            raise K8sException("kdu_instance {} not found".format(kdu_instance))

        # init env, paths
        paths, env = self._init_paths_env(
            cluster_name=cluster_id, create_if_not_exist=True
        )

        command = self._get_rollback_command(kdu_instance, instance_info["namespace"],
                                             revision)

        self.log.debug("rolling_back: {}".format(command))

        # exec helm in a task
        exec_task = asyncio.ensure_future(
            coro_or_future=self._local_async_exec(
                command=command, raise_exception_on_error=False, env=env
            )
        )
        # write status in another task, concurrently with the helm command
        status_task = asyncio.ensure_future(
            coro_or_future=self._store_status(
                cluster_id=cluster_id,
                kdu_instance=kdu_instance,
                namespace=instance_info["namespace"],
                db_dict=db_dict,
                operation="rollback",
                run_once=False,
            )
        )

        # wait for execution task only; the status task loops until cancelled
        await asyncio.wait([exec_task])

        # cancel status task
        status_task.cancel()

        output, rc = exec_task.result()

        # write final status (single run)
        await self._store_status(
            cluster_id=cluster_id,
            kdu_instance=kdu_instance,
            namespace=instance_info["namespace"],
            db_dict=db_dict,
            operation="rollback",
            run_once=True,
            check_every=0,
        )

        if rc != 0:
            msg = "Error executing command: {}\nOutput: {}".format(command, output)
            self.log.error(msg)
            raise K8sException(msg)

        # sync fs
        self.fs.reverse_sync(from_path=cluster_id)

        # return new revision number
        instance = await self.get_instance_info(
            cluster_uuid=cluster_uuid, kdu_instance=kdu_instance
        )
        if instance:
            revision = int(instance.get("revision"))
            self.log.debug("New revision: {}".format(revision))
            return revision
        else:
            return 0
+
+    async def uninstall(self, cluster_uuid: str, kdu_instance: str):
+        """
+        Removes an existing KDU instance. It would implicitly use the `delete` or 'uninstall' call
+        (this call should happen after all _terminate-config-primitive_ of the VNF
+        are invoked).
+
+        :param cluster_uuid: UUID of a K8s cluster known by OSM, or namespace:cluster_id
+        :param kdu_instance: unique name for the KDU instance to be deleted
+        :return: True if successful
+        """
+
+        _, cluster_id = self._get_namespace_cluster_id(cluster_uuid)
+        self.log.debug(
+            "uninstall kdu_instance {} from cluster {}".format(
+                kdu_instance, cluster_id
+            )
+        )
+
+        # sync local dir
+        self.fs.sync(from_path=cluster_id)
+
+        # look for instance to obtain namespace
+        instance_info = await self.get_instance_info(cluster_uuid, kdu_instance)
+        if not instance_info:
+            raise K8sException("kdu_instance {} not found".format(kdu_instance))
+
+        # init env, paths
+        paths, env = self._init_paths_env(
+            cluster_name=cluster_id, create_if_not_exist=True
+        )
+
+        command = self._get_uninstall_command(kdu_instance, instance_info["namespace"])
+        output, _rc = await self._local_async_exec(
+            command=command, raise_exception_on_error=True, env=env
+        )
+
+        # sync fs
+        self.fs.reverse_sync(from_path=cluster_id)
+
+        return self._output_to_table(output)
+
+    async def instances_list(self, cluster_uuid: str) -> list:
+        """
+        returns a list of deployed releases in a cluster
+
+        :param cluster_uuid: the 'cluster' or 'namespace:cluster'
+        :return:
+        """
+
+        _, cluster_id = self._get_namespace_cluster_id(cluster_uuid)
+        self.log.debug("list releases for cluster {}".format(cluster_id))
+
+        # sync local dir
+        self.fs.sync(from_path=cluster_id)
+
+        # execute internal command
+        result = await self._instances_list(cluster_id)
+
+        # sync fs
+        self.fs.reverse_sync(from_path=cluster_id)
+
+        return result
+
+    async def get_instance_info(self, cluster_uuid: str, kdu_instance: str):
+        instances = await self.instances_list(cluster_uuid=cluster_uuid)
+        for instance in instances:
+            if instance.get("name") == kdu_instance:
+                return instance
+        self.log.debug("Instance {} not found".format(kdu_instance))
+        return None
+
+    async def exec_primitive(
+        self,
+        cluster_uuid: str = None,
+        kdu_instance: str = None,
+        primitive_name: str = None,
+        timeout: float = 300,
+        params: dict = None,
+        db_dict: dict = None,
+    ) -> str:
+        """Exec primitive (Juju action)
+
+        :param cluster_uuid: The UUID of the cluster or namespace:cluster
+        :param kdu_instance: The unique name of the KDU instance
+        :param primitive_name: Name of action that will be executed
+        :param timeout: Timeout for action execution
+        :param params: Dictionary of all the parameters needed for the action
+        :db_dict: Dictionary for any additional data
+
+        :return: Returns the output of the action
+        """
+        raise K8sException(
+            "KDUs deployed with Helm don't support actions "
+            "different from rollback, upgrade and status"
+        )
+
+    async def get_services(self,
+                           cluster_uuid: str,
+                           kdu_instance: str,
+                           namespace: str) -> list:
+        """
+        Returns a list of services defined for the specified kdu instance.
+
+        :param cluster_uuid: UUID of a K8s cluster known by OSM
+        :param kdu_instance: unique name for the KDU instance
+        :param namespace: K8s namespace used by the KDU instance
+        :return: If successful, it will return a list of services, Each service
+        can have the following data:
+        - `name` of the service
+        - `type` type of service in the k8 cluster
+        - `ports` List of ports offered by the service, for each port includes at least
+        name, port, protocol
+        - `cluster_ip` Internal ip to be used inside k8s cluster
+        - `external_ip` List of external ips (in case they are available)
+        """
+
+        _, cluster_id = self._get_namespace_cluster_id(cluster_uuid)
+        self.log.debug(
+            "get_services: cluster_uuid: {}, kdu_instance: {}".format(
+                cluster_uuid, kdu_instance
+            )
+        )
+
+        # sync local dir
+        self.fs.sync(from_path=cluster_id)
+
+        # get list of services names for kdu
+        service_names = await self._get_services(cluster_id, kdu_instance, namespace)
+
+        service_list = []
+        for service in service_names:
+            service = await self._get_service(cluster_id, service, namespace)
+            service_list.append(service)
+
+        # sync fs
+        self.fs.reverse_sync(from_path=cluster_id)
+
+        return service_list
+
+    async def get_service(self,
+                          cluster_uuid: str,
+                          service_name: str,
+                          namespace: str) -> object:
+
+        self.log.debug(
+            "get service, service_name: {}, namespace: {}, cluster_uuid: {}".format(
+                service_name, namespace, cluster_uuid)
+        )
+
+        _, cluster_id = self._get_namespace_cluster_id(cluster_uuid)
+
+        # sync local dir
+        self.fs.sync(from_path=cluster_id)
+
+        service = await self._get_service(cluster_id, service_name, namespace)
+
+        # sync fs
+        self.fs.reverse_sync(from_path=cluster_id)
+
+        return service
+
+    async def status_kdu(self, cluster_uuid: str, kdu_instance: str) -> str:
+
+        self.log.debug(
+            "status_kdu: cluster_uuid: {}, kdu_instance: {}".format(
+                cluster_uuid, kdu_instance
+            )
+        )
+
+        _, cluster_id = self._get_namespace_cluster_id(cluster_uuid)
+
+        # sync local dir
+        self.fs.sync(from_path=cluster_id)
+
+        # get instance: needed to obtain namespace
+        instances = await self._instances_list(cluster_id=cluster_id)
+        for instance in instances:
+            if instance.get("name") == kdu_instance:
+                break
+        else:
+            # instance does not exist
+            raise K8sException("Instance name: {} not found in cluster: {}".format(
+                kdu_instance, cluster_id))
+
+        status = await self._status_kdu(
+            cluster_id=cluster_id,
+            kdu_instance=kdu_instance,
+            namespace=instance["namespace"],
+            show_error_log=True,
+            return_text=True,
+        )
+
+        # sync fs
+        self.fs.reverse_sync(from_path=cluster_id)
+
+        return status
+
+    async def values_kdu(self, kdu_model: str, repo_url: str = None) -> str:
+
+        self.log.debug(
+            "inspect kdu_model values {} from (optional) repo: {}".format(
+                kdu_model, repo_url
+            )
+        )
+
+        return await self._exec_inspect_comand(
+            inspect_command="values", kdu_model=kdu_model, repo_url=repo_url
+        )
+
+    async def help_kdu(self, kdu_model: str, repo_url: str = None) -> str:
+
+        self.log.debug(
+            "inspect kdu_model {} readme.md from repo: {}".format(kdu_model, repo_url)
+        )
+
+        return await self._exec_inspect_comand(
+            inspect_command="readme", kdu_model=kdu_model, repo_url=repo_url
+        )
+
+    async def synchronize_repos(self, cluster_uuid: str):
+
+        self.log.debug("synchronize repos for cluster helm-id: {}".format(cluster_uuid))
+        try:
+            db_repo_ids = self._get_helm_chart_repos_ids(cluster_uuid)
+            db_repo_dict = self._get_db_repos_dict(db_repo_ids)
+
+            local_repo_list = await self.repo_list(cluster_uuid)
+            local_repo_dict = {repo["name"]: repo["url"] for repo in local_repo_list}
+
+            deleted_repo_list = []
+            added_repo_dict = {}
+
+            # iterate over the list of repos in the database that should be
+            # added if not present
+            for repo_name, db_repo in db_repo_dict.items():
+                try:
+                    # check if it is already present
+                    curr_repo_url = local_repo_dict.get(db_repo["name"])
+                    repo_id = db_repo.get("_id")
+                    if curr_repo_url != db_repo["url"]:
+                        if curr_repo_url:
+                            self.log.debug("repo {} url changed, delete and and again".format(
+                                db_repo["url"]))
+                            await self.repo_remove(cluster_uuid, db_repo["name"])
+                            deleted_repo_list.append(repo_id)
+
+                        # add repo
+                        self.log.debug("add repo {}".format(db_repo["name"]))
+                        await  self.repo_add(cluster_uuid, db_repo["name"], db_repo["url"])
+                        added_repo_dict[repo_id] = db_repo["name"]
+                except Exception as e:
+                    raise K8sException(
+                        "Error adding repo id: {}, err_msg: {} ".format(
+                            repo_id, repr(e)
+                        )
+                    )
+
+            # Delete repos that are present but not in nbi_list
+            for repo_name in local_repo_dict:
+                if not db_repo_dict.get(repo_name) and repo_name != "stable":
+                    self.log.debug("delete repo {}".format(repo_name))
+                    try:
+                        await self.repo_remove(cluster_uuid, repo_name)
+                        deleted_repo_list.append(repo_name)
+                    except Exception as e:
+                        self.warning(
+                            "Error deleting repo, name: {}, err_msg: {}".format(
+                                repo_name, str(e)
+                            )
+                        )
+
+            return deleted_repo_list, added_repo_dict
+
+        except K8sException:
+            raise
+        except Exception as e:
+            # Do not raise errors synchronizing repos
+            self.log.error("Error synchronizing repos: {}".format(e))
+            raise Exception("Error synchronizing repos: {}".format(e))
+
+    def _get_helm_chart_repos_ids(self, cluster_uuid) -> list:
+        repo_ids = []
+        cluster_filter = {"_admin.helm-chart.id": cluster_uuid}
+        cluster = self.db.get_one("k8sclusters", cluster_filter)
+        if cluster:
+            repo_ids = cluster.get("_admin").get("helm_chart_repos") or []
+            return repo_ids
+        else:
+            raise K8sException(
+                "k8cluster with helm-id : {} not found".format(cluster_uuid)
+            )
+
+    def _get_db_repos_dict(self, repo_ids: list):
+        db_repos_dict = {}
+        for repo_id in repo_ids:
+            db_repo = self.db.get_one("k8srepos", {"_id": repo_id})
+            db_repos_dict[db_repo["name"]] = db_repo
+        return db_repos_dict
+
+    """
+    ####################################################################################
+    ################################### TO BE IMPLEMENTED SUBCLASSES ###################
+    ####################################################################################
+    """
+
    @abc.abstractmethod
    def _init_paths_env(self, cluster_name: str, create_if_not_exist: bool = True):
        """
        Create and return the base cluster and kube directories.

        Helm-version specific directories (e.g. helm3 layout) are also
        created; their paths are not returned but are exported through helm
        environment variables instead.

        :param cluster_name: cluster name
        :param create_if_not_exist: create the directories when missing
        :return: tuple (dictionary with config paths, dictionary with helm
            environment variables) — callers unpack it as ``paths, env``
        """
+
    @abc.abstractmethod
    async def _cluster_init(self, cluster_id, namespace, paths, env):
        """
        Implements the helm version dependent cluster initialization.

        :param cluster_id: cluster id
        :param namespace: namespace to use for the initialization
        :param paths: config paths as returned by _init_paths_env
        :param env: helm environment variables as returned by _init_paths_env
        """
+
    @abc.abstractmethod
    async def _instances_list(self, cluster_id):
        """
        Implements the helm version dependent listing of helm instances
        (releases) deployed in the given cluster.

        :param cluster_id: cluster id
        """
+
    @abc.abstractmethod
    async def _get_services(self, cluster_id, kdu_instance, namespace):
        """
        Implements the helm version dependent method to obtain the services
        created by a helm instance.

        :param cluster_id: cluster id
        :param kdu_instance: helm release name
        :param namespace: namespace where the instance is deployed
        """
+
    @abc.abstractmethod
    async def _status_kdu(self, cluster_id: str, kdu_instance: str, namespace: str = None,
                          show_error_log: bool = False, return_text: bool = False):
        """
        Implements the helm version dependent method to obtain the status of
        a helm instance.

        :param show_error_log: log errors raised while obtaining the status
        :param return_text: when True return the raw status text; when False
            return a parsed structure — callers (see _store_status) expect a
            dict exposing ``info.description``
        """
+
    @abc.abstractmethod
    def _get_install_command(self, kdu_model, kdu_instance, namespace,
                             params_str, version, atomic, timeout) -> str:
        """
        Obtain command to be executed to install the indicated instance
        """
+
    @abc.abstractmethod
    def _get_upgrade_command(self, kdu_model, kdu_instance, namespace,
                             params_str, version, atomic, timeout) -> str:
        """
        Obtain command to be executed to upgrade the indicated instance
        """
+
    @abc.abstractmethod
    def _get_rollback_command(self, kdu_instance, namespace, revision) -> str:
        """
        Obtain command to be executed to rollback the indicated instance
        to the given revision
        """
+
    @abc.abstractmethod
    def _get_uninstall_command(self, kdu_instance: str, namespace: str) -> str:
        """
        Obtain command to be executed to delete the indicated instance
        """
+
    @abc.abstractmethod
    def _get_inspect_command(self, show_command: str, kdu_model: str, repo_str: str,
                             version: str):
        """
        Obtain command to be executed to obtain information about the kdu

        :param show_command: helm inspect/show sub-command to run
        :param repo_str: pre-formatted ``--repo <url>`` option or ""
        :param version: pre-formatted ``--version <v>`` option or ""
        """
+
    @abc.abstractmethod
    async def _uninstall_sw(self, cluster_id: str, namespace: str):
        """
        Method call to uninstall cluster software for helm. This method is dependent
        of helm version
        For Helm v2 it will be called when Tiller must be uninstalled
        For Helm v3 it does nothing and does not need to be called
        """
+
+    """
+    ####################################################################################
+    ################################### P R I V A T E ##################################
+    ####################################################################################
+    """
+
+    @staticmethod
+    def _check_file_exists(filename: str, exception_if_not_exists: bool = False):
+        if os.path.exists(filename):
+            return True
+        else:
+            msg = "File {} does not exist".format(filename)
+            if exception_if_not_exists:
+                raise K8sException(msg)
+
+    @staticmethod
+    def _remove_multiple_spaces(strobj):
+        strobj = strobj.strip()
+        while "  " in strobj:
+            strobj = strobj.replace("  ", " ")
+        return strobj
+
+    @staticmethod
+    def _output_to_lines(output: str) -> list:
+        output_lines = list()
+        lines = output.splitlines(keepends=False)
+        for line in lines:
+            line = line.strip()
+            if len(line) > 0:
+                output_lines.append(line)
+        return output_lines
+
+    @staticmethod
+    def _output_to_table(output: str) -> list:
+        output_table = list()
+        lines = output.splitlines(keepends=False)
+        for line in lines:
+            line = line.replace("\t", " ")
+            line_list = list()
+            output_table.append(line_list)
+            cells = line.split(sep=" ")
+            for cell in cells:
+                cell = cell.strip()
+                if len(cell) > 0:
+                    line_list.append(cell)
+        return output_table
+
+    @staticmethod
+    def _parse_services(output: str) -> list:
+        lines = output.splitlines(keepends=False)
+        services = []
+        for line in lines:
+            line = line.replace("\t", " ")
+            cells = line.split(sep=" ")
+            if len(cells) > 0 and cells[0].startswith("service/"):
+                elems = cells[0].split(sep="/")
+                if len(elems) > 1:
+                    services.append(elems[1])
+        return services
+
+    @staticmethod
+    def _get_deep(dictionary: dict, members: tuple):
+        target = dictionary
+        value = None
+        try:
+            for m in members:
+                value = target.get(m)
+                if not value:
+                    return None
+                else:
+                    target = value
+        except Exception:
+            pass
+        return value
+
+    # find key:value in several lines
+    @staticmethod
+    def _find_in_lines(p_lines: list, p_key: str) -> str:
+        for line in p_lines:
+            try:
+                if line.startswith(p_key + ":"):
+                    parts = line.split(":")
+                    the_value = parts[1].strip()
+                    return the_value
+            except Exception:
+                # ignore it
+                pass
+        return None
+
+    @staticmethod
+    def _lower_keys_list(input_list: list):
+        """
+        Transform the keys in a list of dictionaries to lower case and returns a new list
+        of dictionaries
+        """
+        new_list = []
+        for dictionary in input_list:
+            new_dict = dict((k.lower(), v) for k, v in dictionary.items())
+            new_list.append(new_dict)
+        return new_list
+
+    def _local_exec(self, command: str) -> (str, int):
+        command = self._remove_multiple_spaces(command)
+        self.log.debug("Executing sync local command: {}".format(command))
+        # raise exception if fails
+        output = ""
+        try:
+            output = subprocess.check_output(
+                command, shell=True, universal_newlines=True
+            )
+            return_code = 0
+            self.log.debug(output)
+        except Exception:
+            return_code = 1
+
+        return output, return_code
+
    async def _local_async_exec(
        self,
        command: str,
        raise_exception_on_error: bool = False,
        show_error_log: bool = True,
        encode_utf8: bool = False,
        env: dict = None
    ) -> (str, int):
        """
        Execute a local command asynchronously.

        :param command: command line; split with shlex before execution
        :param raise_exception_on_error: raise K8sException instead of
            returning a non-zero code
        :param show_error_log: log output when the command fails
        :param encode_utf8: post-process the output through a utf-8
            encode/str round trip (normalizes embedded "\\n" sequences)
        :param env: extra environment variables merged over os.environ
        :return: tuple (output, return code); ("", -1) on unexpected errors
            when raise_exception_on_error is False
        :raises K8sException: on failure when raise_exception_on_error is True
        """

        command = K8sHelmBaseConnector._remove_multiple_spaces(command)
        self.log.debug("Executing async local command: {}, env: {}".format(command, env))

        # split command
        command = shlex.split(command)

        # merge caller-supplied variables over a copy of the current env
        environ = os.environ.copy()
        if env:
            environ.update(env)

        try:
            process = await asyncio.create_subprocess_exec(
                *command, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE,
                env=environ
            )

            # wait for command terminate
            stdout, stderr = await process.communicate()

            return_code = process.returncode

            # NOTE(review): when the command writes to both streams, stderr
            # REPLACES stdout here (stdout content is lost) — confirm this is
            # the intended precedence before changing it
            output = ""
            if stdout:
                output = stdout.decode("utf-8").strip()
                # output = stdout.decode()
            if stderr:
                output = stderr.decode("utf-8").strip()
                # output = stderr.decode()

            if return_code != 0 and show_error_log:
                self.log.debug(
                    "Return code (FAIL): {}\nOutput:\n{}".format(return_code, output)
                )
            else:
                self.log.debug("Return code: {}".format(return_code))

            if raise_exception_on_error and return_code != 0:
                raise K8sException(output)

            if encode_utf8:
                output = output.encode("utf-8").strip()
                output = str(output).replace("\\n", "\n")

            return output, return_code

        except asyncio.CancelledError:
            # let task cancellation propagate untouched
            raise
        except K8sException:
            raise
        except Exception as e:
            msg = "Exception executing command: {} -> {}".format(command, e)
            self.log.error(msg)
            if raise_exception_on_error:
                raise K8sException(e) from e
            else:
                return "", -1
+
    async def _local_async_exec_pipe(self,
                                     command1: str,
                                     command2: str,
                                     raise_exception_on_error: bool = True,
                                     show_error_log: bool = True,
                                     encode_utf8: bool = False,
                                     env: dict = None):
        """
        Execute two local commands asynchronously, piping the stdout of the
        first into the stdin of the second (``command1 | command2``).

        :param raise_exception_on_error: raise K8sException instead of
            returning a non-zero code
        :param show_error_log: log output when the pipeline fails
        :param encode_utf8: post-process the output through a utf-8
            encode/str round trip
        :param env: extra environment variables merged over os.environ
        :return: tuple (output of command2, return code of command2)
        :raises K8sException: on failure when raise_exception_on_error is True
        """

        command1 = K8sHelmBaseConnector._remove_multiple_spaces(command1)
        command2 = K8sHelmBaseConnector._remove_multiple_spaces(command2)
        command = "{} | {}".format(command1, command2)
        self.log.debug("Executing async local command: {}, env: {}".format(command, env))

        # split command
        command1 = shlex.split(command1)
        command2 = shlex.split(command2)

        environ = os.environ.copy()
        if env:
            environ.update(env)

        try:
            # plumb an OS-level pipe between the two processes
            # NOTE(review): the first process handle is discarded and never
            # awaited; its exit status is ignored — confirm intentional
            read, write = os.pipe()
            await asyncio.create_subprocess_exec(*command1, stdout=write, env=environ)
            os.close(write)
            process_2 = await asyncio.create_subprocess_exec(*command2, stdin=read,
                                                             stdout=asyncio.subprocess.PIPE,
                                                             env=environ)
            os.close(read)
            stdout, stderr = await process_2.communicate()

            return_code = process_2.returncode

            # NOTE(review): stderr is always None here because process_2 is
            # created without stderr=PIPE; the stderr branch below is dead code
            output = ""
            if stdout:
                output = stdout.decode("utf-8").strip()
                # output = stdout.decode()
            if stderr:
                output = stderr.decode("utf-8").strip()
                # output = stderr.decode()

            if return_code != 0 and show_error_log:
                self.log.debug(
                    "Return code (FAIL): {}\nOutput:\n{}".format(return_code, output)
                )
            else:
                self.log.debug("Return code: {}".format(return_code))

            if raise_exception_on_error and return_code != 0:
                raise K8sException(output)

            if encode_utf8:
                output = output.encode("utf-8").strip()
                output = str(output).replace("\\n", "\n")

            return output, return_code
        except asyncio.CancelledError:
            # let task cancellation propagate untouched
            raise
        except K8sException:
            raise
        except Exception as e:
            msg = "Exception executing command: {} -> {}".format(command, e)
            self.log.error(msg)
            if raise_exception_on_error:
                raise K8sException(e) from e
            else:
                return "", -1
+
    async def _get_service(self, cluster_id, service_name, namespace):
        """
        Obtains the data of the specified service in the k8cluster.

        :param cluster_id: id of a K8s cluster known by OSM
        :param service_name: name of the K8s service in the specified namespace
        :param namespace: K8s namespace used by the KDU instance
        :return: If successful, it will return a service with the following data:
        - `name` of the service
        - `type` type of service in the k8 cluster
        - `ports` List of ports offered by the service, for each port includes at least
        name, port, protocol
        - `cluster_ip` Internal ip to be used inside k8s cluster
        - `external_ip` List of external ips (in case they are available)
        :raises K8sException: if the kubectl command fails
        """

        # init config, env (cluster dirs and helm environment variables)
        paths, env = self._init_paths_env(
            cluster_name=cluster_id, create_if_not_exist=True
        )

        # query the service as yaml using the cluster's kubeconfig
        command = "{} --kubeconfig={} --namespace={} get service {} -o=yaml".format(
            self.kubectl_command, paths["kube_config"], namespace, service_name
        )

        output, _rc = await self._local_async_exec(
            command=command, raise_exception_on_error=True, env=env
        )

        data = yaml.load(output, Loader=yaml.SafeLoader)

        service = {
            "name": service_name,
            "type": self._get_deep(data, ("spec", "type")),
            "ports": self._get_deep(data, ("spec", "ports")),
            "cluster_ip": self._get_deep(data, ("spec", "clusterIP"))
        }
        if service["type"] == "LoadBalancer":
            # NOTE(review): assumes every ingress entry has an "ip" key;
            # hostname-only load balancers would raise KeyError — confirm
            ip_map_list = self._get_deep(data, ("status", "loadBalancer", "ingress"))
            ip_list = [elem["ip"] for elem in ip_map_list]
            service["external_ip"] = ip_list

        return service
+
+    async def _exec_inspect_comand(
+        self, inspect_command: str, kdu_model: str, repo_url: str = None
+    ):
+        """
+        Obtains information about a kdu, no cluster (no env)
+        """
+
+        repo_str = ""
+        if repo_url:
+            repo_str = " --repo {}".format(repo_url)
+
+        idx = kdu_model.find("/")
+        if idx >= 0:
+            idx += 1
+            kdu_model = kdu_model[idx:]
+
+        version = ""
+        if ":" in kdu_model:
+            parts = kdu_model.split(sep=":")
+            if len(parts) == 2:
+                version = "--version {}".format(str(parts[1]))
+                kdu_model = parts[0]
+
+        full_command = self._get_inspect_command(inspect_command, kdu_model, repo_str, version)
+        output, _rc = await self._local_async_exec(
+            command=full_command, encode_utf8=True
+        )
+
+        return output
+
    async def _store_status(
        self,
        cluster_id: str,
        operation: str,
        kdu_instance: str,
        namespace: str = None,
        check_every: float = 10,
        db_dict: dict = None,
        run_once: bool = False,
    ):
        """
        Periodically poll the status of a helm instance and persist it.

        Loops forever (sleeping ``check_every`` seconds between polls) until
        the task is cancelled, the database write fails, or — when
        ``run_once`` is True — after a single iteration.

        :param cluster_id: cluster where the instance runs
        :param operation: operation label stored with the status
        :param kdu_instance: helm release name to poll
        :param namespace: namespace of the instance (may be None)
        :param check_every: seconds between polls
        :param db_dict: descriptor of where to write the status in the db
        :param run_once: perform exactly one iteration then return
        """
        while True:
            try:
                await asyncio.sleep(check_every)
                detailed_status = await self._status_kdu(
                    cluster_id=cluster_id, kdu_instance=kdu_instance, namespace=namespace,
                    return_text=False
                )
                status = detailed_status.get("info").get("description")
                self.log.debug('KDU {} STATUS: {}.'.format(kdu_instance, status))
                # write status to db
                result = await self.write_app_status_to_db(
                    db_dict=db_dict,
                    status=str(status),
                    detailed_status=str(detailed_status),
                    operation=operation,
                )
                if not result:
                    # falsy write result means the db is unusable; stop polling
                    self.log.info("Error writing in database. Task exiting...")
                    return
            except asyncio.CancelledError:
                self.log.debug("Task cancelled")
                return
            except Exception as e:
                # transient poll/write errors are logged and retried next cycle
                self.log.debug("_store_status exception: {}".format(str(e)), exc_info=True)
                pass
            finally:
                # run_once exits here even when the body raised and was caught
                if run_once:
                    return
+
+    # params for use in -f file
+    # returns values file option and filename (in order to delete it at the end)
+    def _params_to_file_option(self, cluster_id: str, params: dict) -> (str, str):
+
+        if params and len(params) > 0:
+            self._init_paths_env(
+                cluster_name=cluster_id, create_if_not_exist=True
+            )
+
+            def get_random_number():
+                r = random.randrange(start=1, stop=99999999)
+                s = str(r)
+                while len(s) < 10:
+                    s = "0" + s
+                return s
+
+            params2 = dict()
+            for key in params:
+                value = params.get(key)
+                if "!!yaml" in str(value):
+                    value = yaml.load(value[7:])
+                params2[key] = value
+
+            values_file = get_random_number() + ".yaml"
+            with open(values_file, "w") as stream:
+                yaml.dump(params2, stream, indent=4, default_flow_style=False)
+
+            return "-f {}".format(values_file), values_file
+
+        return "", None
+
+    # params for use in --set option
+    @staticmethod
+    def _params_to_set_option(params: dict) -> str:
+        params_str = ""
+        if params and len(params) > 0:
+            start = True
+            for key in params:
+                value = params.get(key, None)
+                if value is not None:
+                    if start:
+                        params_str += "--set "
+                        start = False
+                    else:
+                        params_str += ","
+                    params_str += "{}={}".format(key, value)
+        return params_str
+
+    @staticmethod
+    def _generate_release_name(chart_name: str):
+        # check embeded chart (file or dir)
+        if chart_name.startswith("/"):
+            # extract file or directory name
+            chart_name = chart_name[chart_name.rfind("/") + 1:]
+        # check URL
+        elif "://" in chart_name:
+            # extract last portion of URL
+            chart_name = chart_name[chart_name.rfind("/") + 1:]
+
+        name = ""
+        for c in chart_name:
+            if c.isalpha() or c.isnumeric():
+                name += c
+            else:
+                name += "-"
+        if len(name) > 35:
+            name = name[0:35]
+
+        # if does not start with alpha character, prefix 'a'
+        if not name[0].isalpha():
+            name = "a" + name
+
+        name += "-"
+
+        def get_random_number():
+            r = random.randrange(start=1, stop=99999999)
+            s = str(r)
+            s = s.rjust(10, "0")
+            return s
+
+        name = name + get_random_number()
+        return name.lower()