Bug 1980 fixed
[osm/N2VC.git] / n2vc / k8s_helm_base_conn.py
index a79f038..5079dbe 100644 (file)
 ##
 import abc
 import asyncio
+from typing import Union
 import random
 import time
 import shlex
 import shutil
 import stat
-import subprocess
 import os
 import yaml
 from uuid import uuid4
 
+from n2vc.config import EnvironConfig
 from n2vc.exceptions import K8sException
 from n2vc.k8s_conn import K8sConnector
 
@@ -42,8 +43,8 @@ class K8sHelmBaseConnector(K8sConnector):
     ################################### P U B L I C ####################################
     ####################################################################################
     """
+
     service_account = "osm"
-    _STABLE_REPO_URL = "https://charts.helm.sh/stable"
 
     def __init__(
         self,
@@ -53,7 +54,6 @@ class K8sHelmBaseConnector(K8sConnector):
         helm_command: str = "/usr/bin/helm",
         log: object = None,
         on_update_db=None,
-        vca_config: dict = None,
     ):
         """
 
@@ -70,6 +70,7 @@ class K8sHelmBaseConnector(K8sConnector):
 
         self.log.info("Initializing K8S Helm connector")
 
+        self.config = EnvironConfig()
         # random numbers for release name generation
         random.seed(time.time())
 
@@ -85,22 +86,29 @@ class K8sHelmBaseConnector(K8sConnector):
         self._check_file_exists(filename=helm_command, exception_if_not_exists=True)
 
         # obtain stable repo url from config or apply default
-        if not vca_config or not vca_config.get("stablerepourl"):
-            self._stable_repo_url = self._STABLE_REPO_URL
-        else:
-            self._stable_repo_url = vca_config.get("stablerepourl")
+        self._stable_repo_url = self.config.get("stablerepourl")
+        if self._stable_repo_url == "None":
+            self._stable_repo_url = None
 
-    @staticmethod
-    def _get_namespace_cluster_id(cluster_uuid: str) -> (str, str):
+    def _get_namespace(self, cluster_uuid: str) -> str:
         """
-        Parses cluster_uuid stored at database that can be either 'namespace:cluster_id' or only
-        cluster_id for backward compatibility
+        Obtains the namespace used by the cluster with the uuid passed by argument
+
+        param: cluster_uuid: cluster's uuid
         """
-        namespace, _, cluster_id = cluster_uuid.rpartition(':')
-        return namespace, cluster_id
+
+        # first, obtain the cluster corresponding to the uuid passed by argument
+        k8scluster = self.db.get_one(
+            "k8sclusters", q_filter={"_id": cluster_uuid}, fail_on_empty=False
+        )
+        return k8scluster.get("namespace")
 
     async def init_env(
-            self, k8s_creds: str, namespace: str = "kube-system", reuse_cluster_uuid=None
+        self,
+        k8s_creds: str,
+        namespace: str = "kube-system",
+        reuse_cluster_uuid=None,
+        **kwargs,
     ) -> (str, bool):
         """
         It prepares a given K8s cluster environment to run Charts
@@ -110,19 +118,20 @@ class K8sHelmBaseConnector(K8sConnector):
         :param namespace: optional namespace to be used for helm. By default,
             'kube-system' will be used
         :param reuse_cluster_uuid: existing cluster uuid for reuse
+        :param kwargs: Additional parameters (None yet)
         :return: uuid of the K8s cluster and True if connector has installed some
             software in the cluster
         (on error, an exception will be raised)
         """
 
         if reuse_cluster_uuid:
-            namespace_, cluster_id = self._get_namespace_cluster_id(reuse_cluster_uuid)
-            namespace = namespace_ or namespace
+            cluster_id = reuse_cluster_uuid
         else:
             cluster_id = str(uuid4())
-        cluster_uuid = "{}:{}".format(namespace, cluster_id)
 
-        self.log.debug("Initializing K8S Cluster {}. namespace: {}".format(cluster_id, namespace))
+        self.log.debug(
+            "Initializing K8S Cluster {}. namespace: {}".format(cluster_id, namespace)
+        )
 
         paths, env = self._init_paths_env(
             cluster_name=cluster_id, create_if_not_exist=True
@@ -140,39 +149,68 @@ class K8sHelmBaseConnector(K8sConnector):
 
         self.log.info("Cluster {} initialized".format(cluster_id))
 
-        return cluster_uuid, n2vc_installed_sw
+        return cluster_id, n2vc_installed_sw
 
     async def repo_add(
-            self, cluster_uuid: str, name: str, url: str, repo_type: str = "chart"
+        self,
+        cluster_uuid: str,
+        name: str,
+        url: str,
+        repo_type: str = "chart",
+        cert: str = None,
+        user: str = None,
+        password: str = None,
     ):
-        _, cluster_id = self._get_namespace_cluster_id(cluster_uuid)
-        self.log.debug("Cluster {}, adding {} repository {}. URL: {}".format(
-            cluster_id, repo_type, name, url))
-
-        # sync local dir
-        self.fs.sync(from_path=cluster_id)
+        self.log.debug(
+            "Cluster {}, adding {} repository {}. URL: {}".format(
+                cluster_uuid, repo_type, name, url
+            )
+        )
 
         # init_env
         paths, env = self._init_paths_env(
-            cluster_name=cluster_id, create_if_not_exist=True
+            cluster_name=cluster_uuid, create_if_not_exist=True
         )
 
+        # sync local dir
+        self.fs.sync(from_path=cluster_uuid)
+
         # helm repo update
-        command = "{} repo update".format(
-            self._helm_command
+        command = "env KUBECONFIG={} {} repo update".format(
+            paths["kube_config"], self._helm_command
         )
         self.log.debug("updating repo: {}".format(command))
-        await self._local_async_exec(command=command, raise_exception_on_error=False, env=env)
+        await self._local_async_exec(
+            command=command, raise_exception_on_error=False, env=env
+        )
 
         # helm repo add name url
-        command = "{} repo add {} {}".format(
-            self._helm_command, name, url
+        command = ("env KUBECONFIG={} {} repo add {} {}").format(
+            paths["kube_config"], self._helm_command, name, url
         )
+
+        if cert:
+            temp_cert_file = os.path.join(
+                self.fs.path, "{}/helmcerts/".format(cluster_id), "temp.crt"
+            )
+            os.makedirs(os.path.dirname(temp_cert_file), exist_ok=True)
+            with open(temp_cert_file, "w") as the_cert:
+                the_cert.write(cert)
+            command += " --ca-file {}".format(temp_cert_file)
+
+        if user:
+            command += " --username={}".format(user)
+
+        if password:
+            command += " --password={}".format(password)
+
         self.log.debug("adding repo: {}".format(command))
-        await self._local_async_exec(command=command, raise_exception_on_error=True, env=env)
+        await self._local_async_exec(
+            command=command, raise_exception_on_error=True, env=env
+        )
 
         # sync fs
-        self.fs.reverse_sync(from_path=cluster_id)
+        self.fs.reverse_sync(from_path=cluster_uuid)
 
     async def repo_list(self, cluster_uuid: str) -> list:
         """
@@ -181,19 +219,18 @@ class K8sHelmBaseConnector(K8sConnector):
         :return: list of registered repositories: [ (name, url) .... ]
         """
 
-        _, cluster_id = self._get_namespace_cluster_id(cluster_uuid)
-        self.log.debug("list repositories for cluster {}".format(cluster_id))
-
-        # sync local dir
-        self.fs.sync(from_path=cluster_id)
+        self.log.debug("list repositories for cluster {}".format(cluster_uuid))
 
         # config filename
         paths, env = self._init_paths_env(
-            cluster_name=cluster_id, create_if_not_exist=True
+            cluster_name=cluster_uuid, create_if_not_exist=True
         )
 
-        command = "{} repo list --output yaml".format(
-            self._helm_command
+        # sync local dir
+        self.fs.sync(from_path=cluster_uuid)
+
+        command = "env KUBECONFIG={} {} repo list --output yaml".format(
+            paths["kube_config"], self._helm_command
         )
 
         # Set exception to false because if there are no repos just want an empty list
@@ -202,7 +239,7 @@ class K8sHelmBaseConnector(K8sConnector):
         )
 
         # sync fs
-        self.fs.reverse_sync(from_path=cluster_id)
+        self.fs.reverse_sync(from_path=cluster_uuid)
 
         if _rc == 0:
             if output and len(output) > 0:
@@ -215,36 +252,54 @@ class K8sHelmBaseConnector(K8sConnector):
             return []
 
     async def repo_remove(self, cluster_uuid: str, name: str):
-
-        _, cluster_id = self._get_namespace_cluster_id(cluster_uuid)
-        self.log.debug("remove {} repositories for cluster {}".format(name, cluster_id))
-
-        # sync local dir
-        self.fs.sync(from_path=cluster_id)
+        self.log.debug(
+            "remove {} repositories for cluster {}".format(name, cluster_uuid)
+        )
 
         # init env, paths
         paths, env = self._init_paths_env(
-            cluster_name=cluster_id, create_if_not_exist=True
+            cluster_name=cluster_uuid, create_if_not_exist=True
         )
 
-        command = "{} repo remove {}".format(
-            self._helm_command, name
+        # sync local dir
+        self.fs.sync(from_path=cluster_uuid)
+
+        command = "env KUBECONFIG={} {} repo remove {}".format(
+            paths["kube_config"], self._helm_command, name
+        )
+        await self._local_async_exec(
+            command=command, raise_exception_on_error=True, env=env
         )
-        await self._local_async_exec(command=command, raise_exception_on_error=True, env=env)
 
         # sync fs
-        self.fs.reverse_sync(from_path=cluster_id)
+        self.fs.reverse_sync(from_path=cluster_uuid)
 
     async def reset(
-            self, cluster_uuid: str, force: bool = False, uninstall_sw: bool = False
+        self,
+        cluster_uuid: str,
+        force: bool = False,
+        uninstall_sw: bool = False,
+        **kwargs,
     ) -> bool:
+        """Reset a cluster
 
-        namespace, cluster_id = self._get_namespace_cluster_id(cluster_uuid)
-        self.log.debug("Resetting K8s environment. cluster uuid: {} uninstall={}"
-                       .format(cluster_id, uninstall_sw))
+        Resets the Kubernetes cluster by removing the helm deployment that represents it.
+
+        :param cluster_uuid: The UUID of the cluster to reset
+        :param force: Boolean to force the reset
+        :param uninstall_sw: Boolean to force the reset
+        :param kwargs: Additional parameters (None yet)
+        :return: Returns True if successful or raises an exception.
+        """
+        namespace = self._get_namespace(cluster_uuid=cluster_uuid)
+        self.log.debug(
+            "Resetting K8s environment. cluster uuid: {} uninstall={}".format(
+                cluster_uuid, uninstall_sw
+            )
+        )
 
         # sync local dir
-        self.fs.sync(from_path=cluster_id)
+        self.fs.sync(from_path=cluster_uuid)
 
         # uninstall releases if needed.
         if uninstall_sw:
@@ -266,56 +321,68 @@ class K8sHelmBaseConnector(K8sConnector):
                             # that in some cases of previously installed helm releases it
                             # raised an error
                             self.log.warn(
-                                "Error uninstalling release {}: {}".format(kdu_instance, e)
+                                "Error uninstalling release {}: {}".format(
+                                    kdu_instance, e
+                                )
                             )
                 else:
                     msg = (
                         "Cluster uuid: {} has releases and not force. Leaving K8s helm environment"
-                    ).format(cluster_id)
+                    ).format(cluster_uuid)
                     self.log.warn(msg)
-                    uninstall_sw = False  # Allow to remove k8s cluster without removing Tiller
+                    uninstall_sw = (
+                        False  # Allow to remove k8s cluster without removing Tiller
+                    )
 
         if uninstall_sw:
-            await self._uninstall_sw(cluster_idnamespace)
+            await self._uninstall_sw(cluster_id=cluster_uuid, namespace=namespace)
 
         # delete cluster directory
-        self.log.debug("Removing directory {}".format(cluster_id))
-        self.fs.file_delete(cluster_id, ignore_non_exist=True)
+        self.log.debug("Removing directory {}".format(cluster_uuid))
+        self.fs.file_delete(cluster_uuid, ignore_non_exist=True)
         # Remove also local directorio if still exist
-        direct = self.fs.path + "/" + cluster_id
+        direct = self.fs.path + "/" + cluster_uuid
         shutil.rmtree(direct, ignore_errors=True)
 
         return True
 
     async def _install_impl(
-            self,
-            cluster_id: str,
-            kdu_model: str,
-            paths: dict,
-            env: dict,
-            kdu_instance: str,
-            atomic: bool = True,
-            timeout: float = 300,
-            params: dict = None,
-            db_dict: dict = None,
-            kdu_name: str = None,
-            namespace: str = None,
+        self,
+        cluster_id: str,
+        kdu_model: str,
+        paths: dict,
+        env: dict,
+        kdu_instance: str,
+        atomic: bool = True,
+        timeout: float = 300,
+        params: dict = None,
+        db_dict: dict = None,
+        kdu_name: str = None,
+        namespace: str = None,
     ):
+        # init env, paths
+        paths, env = self._init_paths_env(
+            cluster_name=cluster_id, create_if_not_exist=True
+        )
+
         # params to str
         params_str, file_to_delete = self._params_to_file_option(
             cluster_id=cluster_id, params=params
         )
 
         # version
-        version = None
-        if ":" in kdu_model:
-            parts = kdu_model.split(sep=":")
-            if len(parts) == 2:
-                version = str(parts[1])
-                kdu_model = parts[0]
-
-        command = self._get_install_command(kdu_model, kdu_instance, namespace,
-                                            params_str, version, atomic, timeout)
+        kdu_model, version = self._split_version(kdu_model)
+
+        command = self._get_install_command(
+            kdu_model,
+            kdu_instance,
+            namespace,
+            params_str,
+            version,
+            atomic,
+            timeout,
+            paths["kube_config"],
+        )
 
         self.log.debug("installing: {}".format(command))
 
@@ -383,11 +450,10 @@ class K8sHelmBaseConnector(K8sConnector):
         params: dict = None,
         db_dict: dict = None,
     ):
-        _, cluster_id = self._get_namespace_cluster_id(cluster_uuid)
-        self.log.debug("upgrading {} in cluster {}".format(kdu_model, cluster_id))
+        self.log.debug("upgrading {} in cluster {}".format(kdu_model, cluster_uuid))
 
         # sync local dir
-        self.fs.sync(from_path=cluster_id)
+        self.fs.sync(from_path=cluster_uuid)
 
         # look for instance to obtain namespace
         instance_info = await self.get_instance_info(cluster_uuid, kdu_instance)
@@ -396,24 +462,30 @@ class K8sHelmBaseConnector(K8sConnector):
 
         # init env, paths
         paths, env = self._init_paths_env(
-            cluster_name=cluster_id, create_if_not_exist=True
+            cluster_name=cluster_uuid, create_if_not_exist=True
         )
 
+        # sync local dir
+        self.fs.sync(from_path=cluster_uuid)
+
         # params to str
         params_str, file_to_delete = self._params_to_file_option(
-            cluster_id=cluster_id, params=params
+            cluster_id=cluster_uuid, params=params
         )
 
         # version
-        version = None
-        if ":" in kdu_model:
-            parts = kdu_model.split(sep=":")
-            if len(parts) == 2:
-                version = str(parts[1])
-                kdu_model = parts[0]
-
-        command = self._get_upgrade_command(kdu_model, kdu_instance, instance_info["namespace"],
-                                            params_str, version, atomic, timeout)
+        kdu_model, version = self._split_version(kdu_model)
+
+        command = self._get_upgrade_command(
+            kdu_model,
+            kdu_instance,
+            instance_info["namespace"],
+            params_str,
+            version,
+            atomic,
+            timeout,
+            paths["kube_config"],
+        )
 
         self.log.debug("upgrading: {}".format(command))
 
@@ -428,7 +500,7 @@ class K8sHelmBaseConnector(K8sConnector):
             # write status in another task
             status_task = asyncio.ensure_future(
                 coro_or_future=self._store_status(
-                    cluster_id=cluster_id,
+                    cluster_id=cluster_uuid,
                     kdu_instance=kdu_instance,
                     namespace=instance_info["namespace"],
                     db_dict=db_dict,
@@ -456,7 +528,7 @@ class K8sHelmBaseConnector(K8sConnector):
 
         # write final status
         await self._store_status(
-            cluster_id=cluster_id,
+            cluster_id=cluster_uuid,
             kdu_instance=kdu_instance,
             namespace=instance_info["namespace"],
             db_dict=db_dict,
@@ -471,7 +543,7 @@ class K8sHelmBaseConnector(K8sConnector):
             raise K8sException(msg)
 
         # sync fs
-        self.fs.reverse_sync(from_path=cluster_id)
+        self.fs.reverse_sync(from_path=cluster_uuid)
 
         # return new revision number
         instance = await self.get_instance_info(
@@ -484,19 +556,204 @@ class K8sHelmBaseConnector(K8sConnector):
         else:
             return 0
 
+    async def scale(
+        self,
+        kdu_instance: str,
+        scale: int,
+        resource_name: str,
+        total_timeout: float = 1800,
+        cluster_uuid: str = None,
+        kdu_model: str = None,
+        atomic: bool = True,
+        db_dict: dict = None,
+        **kwargs,
+    ):
+        """Scale a resource in a Helm Chart.
+
+        Args:
+            kdu_instance: KDU instance name
+            scale: Scale to which to set the resource
+            resource_name: Resource name
+            total_timeout: The time, in seconds, to wait
+            cluster_uuid: The UUID of the cluster
+            kdu_model: The chart reference
+            atomic: if set, upgrade process rolls back changes made in case of failed upgrade.
+                The --wait flag will be set automatically if --atomic is used
+            db_dict: Dictionary for any additional data
+            kwargs: Additional parameters
+
+        Returns:
+            True if successful, False otherwise
+        """
+
+        debug_mgs = "scaling {} in cluster {}".format(kdu_model, cluster_uuid)
+        if resource_name:
+            debug_mgs = "scaling resource {} in model {} (cluster {})".format(
+                resource_name, kdu_model, cluster_uuid
+            )
+
+        self.log.debug(debug_mgs)
+
+        # look for instance to obtain namespace
+        # get_instance_info function calls the sync command
+        instance_info = await self.get_instance_info(cluster_uuid, kdu_instance)
+        if not instance_info:
+            raise K8sException("kdu_instance {} not found".format(kdu_instance))
+
+        # init env, paths
+        paths, env = self._init_paths_env(
+            cluster_name=cluster_uuid, create_if_not_exist=True
+        )
+
+        # version
+        kdu_model, version = self._split_version(kdu_model)
+
+        repo_url = await self._find_repo(kdu_model, cluster_uuid)
+        if not repo_url:
+            raise K8sException(
+                "Repository not found for kdu_model {}".format(kdu_model)
+            )
+
+        _, replica_str = await self._get_replica_count_url(
+            kdu_model, repo_url, resource_name
+        )
+
+        command = self._get_upgrade_scale_command(
+            kdu_model,
+            kdu_instance,
+            instance_info["namespace"],
+            scale,
+            version,
+            atomic,
+            replica_str,
+            total_timeout,
+            resource_name,
+            paths["kube_config"],
+        )
+
+        self.log.debug("scaling: {}".format(command))
+
+        if atomic:
+            # exec helm in a task
+            exec_task = asyncio.ensure_future(
+                coro_or_future=self._local_async_exec(
+                    command=command, raise_exception_on_error=False, env=env
+                )
+            )
+            # write status in another task
+            status_task = asyncio.ensure_future(
+                coro_or_future=self._store_status(
+                    cluster_id=cluster_uuid,
+                    kdu_instance=kdu_instance,
+                    namespace=instance_info["namespace"],
+                    db_dict=db_dict,
+                    operation="scale",
+                    run_once=False,
+                )
+            )
+
+            # wait for execution task
+            await asyncio.wait([exec_task])
+
+            # cancel status task
+            status_task.cancel()
+            output, rc = exec_task.result()
+
+        else:
+            output, rc = await self._local_async_exec(
+                command=command, raise_exception_on_error=False, env=env
+            )
+
+        # write final status
+        await self._store_status(
+            cluster_id=cluster_uuid,
+            kdu_instance=kdu_instance,
+            namespace=instance_info["namespace"],
+            db_dict=db_dict,
+            operation="scale",
+            run_once=True,
+            check_every=0,
+        )
+
+        if rc != 0:
+            msg = "Error executing command: {}\nOutput: {}".format(command, output)
+            self.log.error(msg)
+            raise K8sException(msg)
+
+        # sync fs
+        self.fs.reverse_sync(from_path=cluster_uuid)
+
+        return True
+
+    async def get_scale_count(
+        self,
+        resource_name: str,
+        kdu_instance: str,
+        cluster_uuid: str,
+        kdu_model: str,
+        **kwargs,
+    ) -> int:
+        """Get a resource scale count.
+
+        Args:
+            cluster_uuid: The UUID of the cluster
+            resource_name: Resource name
+            kdu_instance: KDU instance name
+            kdu_model: The name or path of a bundle
+            kwargs: Additional parameters
+
+        Returns:
+            Resource instance count
+        """
+
+        self.log.debug(
+            "getting scale count for {} in cluster {}".format(kdu_model, cluster_uuid)
+        )
+
+        # look for instance to obtain namespace
+        instance_info = await self.get_instance_info(cluster_uuid, kdu_instance)
+        if not instance_info:
+            raise K8sException("kdu_instance {} not found".format(kdu_instance))
+
+        # init env, paths
+        paths, env = self._init_paths_env(
+            cluster_name=cluster_uuid, create_if_not_exist=True
+        )
+
+        replicas = await self._get_replica_count_instance(
+            kdu_instance, instance_info["namespace"], paths["kube_config"]
+        )
+
+        # Get default value if scale count is not found from provided values
+        if not replicas:
+            repo_url = await self._find_repo(kdu_model, cluster_uuid)
+            if not repo_url:
+                raise K8sException(
+                    "Repository not found for kdu_model {}".format(kdu_model)
+                )
+
+            replicas, _ = await self._get_replica_count_url(
+                kdu_model, repo_url, resource_name
+            )
+
+        if not replicas:
+            msg = "Replica count not found. Cannot be scaled"
+            self.log.error(msg)
+            raise K8sException(msg)
+
+        return int(replicas)
+
     async def rollback(
         self, cluster_uuid: str, kdu_instance: str, revision=0, db_dict: dict = None
     ):
-
-        _, cluster_id = self._get_namespace_cluster_id(cluster_uuid)
         self.log.debug(
             "rollback kdu_instance {} to revision {} from cluster {}".format(
-                kdu_instance, revision, cluster_id
+                kdu_instance, revision, cluster_uuid
             )
         )
 
         # sync local dir
-        self.fs.sync(from_path=cluster_id)
+        self.fs.sync(from_path=cluster_uuid)
 
         # look for instance to obtain namespace
         instance_info = await self.get_instance_info(cluster_uuid, kdu_instance)
@@ -505,11 +762,15 @@ class K8sHelmBaseConnector(K8sConnector):
 
         # init env, paths
         paths, env = self._init_paths_env(
-            cluster_name=cluster_id, create_if_not_exist=True
+            cluster_name=cluster_uuid, create_if_not_exist=True
         )
 
-        command = self._get_rollback_command(kdu_instance, instance_info["namespace"],
-                                             revision)
+        # sync local dir
+        self.fs.sync(from_path=cluster_uuid)
+
+        command = self._get_rollback_command(
+            kdu_instance, instance_info["namespace"], revision, paths["kube_config"]
+        )
 
         self.log.debug("rolling_back: {}".format(command))
 
@@ -522,7 +783,7 @@ class K8sHelmBaseConnector(K8sConnector):
         # write status in another task
         status_task = asyncio.ensure_future(
             coro_or_future=self._store_status(
-                cluster_id=cluster_id,
+                cluster_id=cluster_uuid,
                 kdu_instance=kdu_instance,
                 namespace=instance_info["namespace"],
                 db_dict=db_dict,
@@ -541,7 +802,7 @@ class K8sHelmBaseConnector(K8sConnector):
 
         # write final status
         await self._store_status(
-            cluster_id=cluster_id,
+            cluster_id=cluster_uuid,
             kdu_instance=kdu_instance,
             namespace=instance_info["namespace"],
             db_dict=db_dict,
@@ -556,7 +817,7 @@ class K8sHelmBaseConnector(K8sConnector):
             raise K8sException(msg)
 
         # sync fs
-        self.fs.reverse_sync(from_path=cluster_id)
+        self.fs.reverse_sync(from_path=cluster_uuid)
 
         # return new revision number
         instance = await self.get_instance_info(
@@ -569,7 +830,7 @@ class K8sHelmBaseConnector(K8sConnector):
         else:
             return 0
 
-    async def uninstall(self, cluster_uuid: str, kdu_instance: str):
+    async def uninstall(self, cluster_uuid: str, kdu_instance: str, **kwargs):
         """
         Removes an existing KDU instance. It would implicitly use the `delete` or 'uninstall' call
         (this call should happen after all _terminate-config-primitive_ of the VNF
@@ -577,36 +838,41 @@ class K8sHelmBaseConnector(K8sConnector):
 
         :param cluster_uuid: UUID of a K8s cluster known by OSM, or namespace:cluster_id
         :param kdu_instance: unique name for the KDU instance to be deleted
+        :param kwargs: Additional parameters (None yet)
         :return: True if successful
         """
 
-        _, cluster_id = self._get_namespace_cluster_id(cluster_uuid)
         self.log.debug(
             "uninstall kdu_instance {} from cluster {}".format(
-                kdu_instance, cluster_id
+                kdu_instance, cluster_uuid
             )
         )
 
         # sync local dir
-        self.fs.sync(from_path=cluster_id)
+        self.fs.sync(from_path=cluster_uuid)
 
         # look for instance to obtain namespace
         instance_info = await self.get_instance_info(cluster_uuid, kdu_instance)
         if not instance_info:
-            raise K8sException("kdu_instance {} not found".format(kdu_instance))
-
+            self.log.warning(("kdu_instance {} not found".format(kdu_instance)))
+            return True
         # init env, paths
         paths, env = self._init_paths_env(
-            cluster_name=cluster_id, create_if_not_exist=True
+            cluster_name=cluster_uuid, create_if_not_exist=True
         )
 
-        command = self._get_uninstall_command(kdu_instance, instance_info["namespace"])
+        # sync local dir
+        self.fs.sync(from_path=cluster_uuid)
+
+        command = self._get_uninstall_command(
+            kdu_instance, instance_info["namespace"], paths["kube_config"]
+        )
         output, _rc = await self._local_async_exec(
             command=command, raise_exception_on_error=True, env=env
         )
 
         # sync fs
-        self.fs.reverse_sync(from_path=cluster_id)
+        self.fs.reverse_sync(from_path=cluster_uuid)
 
         return self._output_to_table(output)
 
@@ -618,17 +884,16 @@ class K8sHelmBaseConnector(K8sConnector):
         :return:
         """
 
-        _, cluster_id = self._get_namespace_cluster_id(cluster_uuid)
-        self.log.debug("list releases for cluster {}".format(cluster_id))
+        self.log.debug("list releases for cluster {}".format(cluster_uuid))
 
         # sync local dir
-        self.fs.sync(from_path=cluster_id)
+        self.fs.sync(from_path=cluster_uuid)
 
         # execute internal command
-        result = await self._instances_list(cluster_id)
+        result = await self._instances_list(cluster_uuid)
 
         # sync fs
-        self.fs.reverse_sync(from_path=cluster_id)
+        self.fs.reverse_sync(from_path=cluster_uuid)
 
         return result
 
@@ -648,6 +913,7 @@ class K8sHelmBaseConnector(K8sConnector):
         timeout: float = 300,
         params: dict = None,
         db_dict: dict = None,
+        **kwargs,
     ) -> str:
         """Exec primitive (Juju action)
 
@@ -657,6 +923,7 @@ class K8sHelmBaseConnector(K8sConnector):
         :param timeout: Timeout for action execution
         :param params: Dictionary of all the parameters needed for the action
         :db_dict: Dictionary for any additional data
+        :param kwargs: Additional parameters (None yet)
 
         :return: Returns the output of the action
         """
@@ -665,10 +932,9 @@ class K8sHelmBaseConnector(K8sConnector):
             "different from rollback, upgrade and status"
         )
 
-    async def get_services(self,
-                           cluster_uuid: str,
-                           kdu_instance: str,
-                           namespace: str) -> list:
+    async def get_services(
+        self, cluster_uuid: str, kdu_instance: str, namespace: str
+    ) -> list:
         """
         Returns a list of services defined for the specified kdu instance.
 
@@ -685,87 +951,131 @@ class K8sHelmBaseConnector(K8sConnector):
         - `external_ip` List of external ips (in case they are available)
         """
 
-        _, cluster_id = self._get_namespace_cluster_id(cluster_uuid)
         self.log.debug(
             "get_services: cluster_uuid: {}, kdu_instance: {}".format(
                 cluster_uuid, kdu_instance
             )
         )
 
+        # init env, paths
+        paths, env = self._init_paths_env(
+            cluster_name=cluster_uuid, create_if_not_exist=True
+        )
+
         # sync local dir
-        self.fs.sync(from_path=cluster_id)
+        self.fs.sync(from_path=cluster_uuid)
 
         # get list of services names for kdu
-        service_names = await self._get_services(cluster_id, kdu_instance, namespace)
+        service_names = await self._get_services(
+            cluster_uuid, kdu_instance, namespace, paths["kube_config"]
+        )
 
         service_list = []
         for service in service_names:
-            service = await self._get_service(cluster_id, service, namespace)
+            service = await self._get_service(cluster_uuid, service, namespace)
             service_list.append(service)
 
         # sync fs
-        self.fs.reverse_sync(from_path=cluster_id)
+        self.fs.reverse_sync(from_path=cluster_uuid)
 
         return service_list
 
-    async def get_service(self,
-                          cluster_uuid: str,
-                          service_name: str,
-                          namespace: str) -> object:
+    async def get_service(
+        self, cluster_uuid: str, service_name: str, namespace: str
+    ) -> object:
 
         self.log.debug(
             "get service, service_name: {}, namespace: {}, cluster_uuid: {}".format(
-                service_name, namespace, cluster_uuid)
+                service_name, namespace, cluster_uuid
+            )
         )
 
-        _, cluster_id = self._get_namespace_cluster_id(cluster_uuid)
-
         # sync local dir
-        self.fs.sync(from_path=cluster_id)
+        self.fs.sync(from_path=cluster_uuid)
 
-        service = await self._get_service(cluster_id, service_name, namespace)
+        service = await self._get_service(cluster_uuid, service_name, namespace)
 
         # sync fs
-        self.fs.reverse_sync(from_path=cluster_id)
+        self.fs.reverse_sync(from_path=cluster_uuid)
 
         return service
 
-    async def status_kdu(self, cluster_uuid: str, kdu_instance: str) -> str:
+    async def status_kdu(
+        self, cluster_uuid: str, kdu_instance: str, yaml_format: str = False, **kwargs
+    ) -> Union[str, dict]:
+        """
+        This call would retrieve tha current state of a given KDU instance. It would be
+        would allow to retrieve the _composition_ (i.e. K8s objects) and _specific
+        values_ of the configuration parameters applied to a given instance. This call
+        would be based on the `status` call.
+
+        :param cluster_uuid: UUID of a K8s cluster known by OSM
+        :param kdu_instance: unique name for the KDU instance
+        :param kwargs: Additional parameters (None yet)
+        :param yaml_format: if the return shall be returned as an YAML string or as a
+                                dictionary
+        :return: If successful, it will return the following vector of arguments:
+        - K8s `namespace` in the cluster where the KDU lives
+        - `state` of the KDU instance. It can be:
+              - UNKNOWN
+              - DEPLOYED
+              - DELETED
+              - SUPERSEDED
+              - FAILED or
+              - DELETING
+        - List of `resources` (objects) that this release consists of, sorted by kind,
+          and the status of those resources
+        - Last `deployment_time`.
 
+        """
         self.log.debug(
             "status_kdu: cluster_uuid: {}, kdu_instance: {}".format(
                 cluster_uuid, kdu_instance
             )
         )
 
-        _, cluster_id = self._get_namespace_cluster_id(cluster_uuid)
-
         # sync local dir
-        self.fs.sync(from_path=cluster_id)
+        self.fs.sync(from_path=cluster_uuid)
 
         # get instance: needed to obtain namespace
-        instances = await self._instances_list(cluster_id=cluster_id)
+        instances = await self._instances_list(cluster_id=cluster_uuid)
         for instance in instances:
             if instance.get("name") == kdu_instance:
                 break
         else:
             # instance does not exist
-            raise K8sException("Instance name: {} not found in cluster: {}".format(
-                kdu_instance, cluster_id))
+            raise K8sException(
+                "Instance name: {} not found in cluster: {}".format(
+                    kdu_instance, cluster_uuid
+                )
+            )
 
         status = await self._status_kdu(
-            cluster_id=cluster_id,
+            cluster_id=cluster_uuid,
             kdu_instance=kdu_instance,
             namespace=instance["namespace"],
+            yaml_format=yaml_format,
             show_error_log=True,
-            return_text=True,
         )
 
         # sync fs
-        self.fs.reverse_sync(from_path=cluster_id)
+        self.fs.reverse_sync(from_path=cluster_uuid)
 
         return status
 
+    async def get_values_kdu(
+        self, kdu_instance: str, namespace: str, kubeconfig: str
+    ) -> str:
+
+        self.log.debug("get kdu_instance values {}".format(kdu_instance))
+
+        return await self._exec_get_command(
+            get_command="values",
+            kdu_instance=kdu_instance,
+            namespace=namespace,
+            kubeconfig=kubeconfig,
+        )
+
     async def values_kdu(self, kdu_model: str, repo_url: str = None) -> str:
 
         self.log.debug(
@@ -774,7 +1084,7 @@ class K8sHelmBaseConnector(K8sConnector):
             )
         )
 
-        return await self._exec_inspect_comand(
+        return await self._exec_inspect_command(
             inspect_command="values", kdu_model=kdu_model, repo_url=repo_url
         )
 
@@ -784,7 +1094,7 @@ class K8sHelmBaseConnector(K8sConnector):
             "inspect kdu_model {} readme.md from repo: {}".format(kdu_model, repo_url)
         )
 
-        return await self._exec_inspect_comand(
+        return await self._exec_inspect_command(
             inspect_command="readme", kdu_model=kdu_model, repo_url=repo_url
         )
 
@@ -810,14 +1120,29 @@ class K8sHelmBaseConnector(K8sConnector):
                     repo_id = db_repo.get("_id")
                     if curr_repo_url != db_repo["url"]:
                         if curr_repo_url:
-                            self.log.debug("repo {} url changed, delete and and again".format(
-                                db_repo["url"]))
+                            self.log.debug(
+                                "repo {} url changed, delete and and again".format(
+                                    db_repo["url"]
+                                )
+                            )
                             await self.repo_remove(cluster_uuid, db_repo["name"])
                             deleted_repo_list.append(repo_id)
 
                         # add repo
                         self.log.debug("add repo {}".format(db_repo["name"]))
-                        await self.repo_add(cluster_uuid, db_repo["name"], db_repo["url"])
+                        if "ca_cert" in db_repo:
+                            await self.repo_add(
+                                cluster_uuid,
+                                db_repo["name"],
+                                db_repo["url"],
+                                cert=db_repo["ca_cert"],
+                            )
+                        else:
+                            await self.repo_add(
+                                cluster_uuid,
+                                db_repo["name"],
+                                db_repo["url"],
+                            )
                         added_repo_dict[repo_id] = db_repo["name"]
                 except Exception as e:
                     raise K8sException(
@@ -886,51 +1211,102 @@ class K8sHelmBaseConnector(K8sConnector):
         """
 
     @abc.abstractmethod
-    async def _get_services(self, cluster_id, kdu_instance, namespace):
+    async def _get_services(self, cluster_id, kdu_instance, namespace, kubeconfig):
         """
         Implements the helm version dependent method to obtain services from a helm instance
         """
 
     @abc.abstractmethod
-    async def _status_kdu(self, cluster_id: str, kdu_instance: str, namespace: str = None,
-                          show_error_log: bool = False, return_text: bool = False):
+    async def _status_kdu(
+        self,
+        cluster_id: str,
+        kdu_instance: str,
+        namespace: str = None,
+        yaml_format: bool = False,
+        show_error_log: bool = False,
+    ) -> Union[str, dict]:
         """
         Implements the helm version dependent method to obtain status of a helm instance
         """
 
     @abc.abstractmethod
-    def _get_install_command(self, kdu_model, kdu_instance, namespace,
-                             params_str, version, atomic, timeout) -> str:
+    def _get_install_command(
+        self,
+        kdu_model,
+        kdu_instance,
+        namespace,
+        params_str,
+        version,
+        atomic,
+        timeout,
+        kubeconfig,
+    ) -> str:
         """
         Obtain command to be executed to delete the indicated instance
         """
 
     @abc.abstractmethod
-    def _get_upgrade_command(self, kdu_model, kdu_instance, namespace,
-                             params_str, version, atomic, timeout) -> str:
+    def _get_upgrade_scale_command(
+        self,
+        kdu_model,
+        kdu_instance,
+        namespace,
+        count,
+        version,
+        atomic,
+        replicas,
+        timeout,
+        resource_name,
+        kubeconfig,
+    ) -> str:
+        """Obtain command to be executed to upgrade the indicated instance."""
+
+    @abc.abstractmethod
+    def _get_upgrade_command(
+        self,
+        kdu_model,
+        kdu_instance,
+        namespace,
+        params_str,
+        version,
+        atomic,
+        timeout,
+        kubeconfig,
+    ) -> str:
         """
         Obtain command to be executed to upgrade the indicated instance
         """
 
     @abc.abstractmethod
-    def _get_rollback_command(self, kdu_instance, namespace, revision) -> str:
+    def _get_rollback_command(
+        self, kdu_instance, namespace, revision, kubeconfig
+    ) -> str:
         """
         Obtain command to be executed to rollback the indicated instance
         """
 
     @abc.abstractmethod
-    def _get_uninstall_command(self, kdu_instance: str, namespace: str) -> str:
+    def _get_uninstall_command(
+        self, kdu_instance: str, namespace: str, kubeconfig: str
+    ) -> str:
         """
         Obtain command to be executed to delete the indicated instance
         """
 
     @abc.abstractmethod
-    def _get_inspect_command(self, show_command: str, kdu_model: str, repo_str: str,
-                             version: str):
+    def _get_inspect_command(
+        self, show_command: str, kdu_model: str, repo_str: str, version: str
+    ):
         """
         Obtain command to be executed to obtain information about the kdu
         """
 
+    @abc.abstractmethod
+    def _get_get_command(
+        self, get_command: str, kdu_instance: str, namespace: str, kubeconfig: str
+    ):
+        """Obtain command to be executed to get information about the kdu instance."""
+
     @abc.abstractmethod
     async def _uninstall_sw(self, cluster_id: str, namespace: str):
         """
@@ -1042,38 +1418,25 @@ class K8sHelmBaseConnector(K8sConnector):
         of dictionaries
         """
         new_list = []
-        for dictionary in input_list:
-            new_dict = dict((k.lower(), v) for k, v in dictionary.items())
-            new_list.append(new_dict)
+        if input_list:
+            for dictionary in input_list:
+                new_dict = dict((k.lower(), v) for k, v in dictionary.items())
+                new_list.append(new_dict)
         return new_list
 
-    def _local_exec(self, command: str) -> (str, int):
-        command = self._remove_multiple_spaces(command)
-        self.log.debug("Executing sync local command: {}".format(command))
-        # raise exception if fails
-        output = ""
-        try:
-            output = subprocess.check_output(
-                command, shell=True, universal_newlines=True
-            )
-            return_code = 0
-            self.log.debug(output)
-        except Exception:
-            return_code = 1
-
-        return output, return_code
-
     async def _local_async_exec(
         self,
         command: str,
         raise_exception_on_error: bool = False,
         show_error_log: bool = True,
         encode_utf8: bool = False,
-        env: dict = None
+        env: dict = None,
     ) -> (str, int):
 
         command = K8sHelmBaseConnector._remove_multiple_spaces(command)
-        self.log.debug("Executing async local command: {}, env: {}".format(command, env))
+        self.log.debug(
+            "Executing async local command: {}, env: {}".format(command, env)
+        )
 
         # split command
         command = shlex.split(command)
@@ -1084,8 +1447,10 @@ class K8sHelmBaseConnector(K8sConnector):
 
         try:
             process = await asyncio.create_subprocess_exec(
-                *command, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE,
-                env=environ
+                *command,
+                stdout=asyncio.subprocess.PIPE,
+                stderr=asyncio.subprocess.PIPE,
+                env=environ,
             )
 
             # wait for command terminate
@@ -1129,18 +1494,22 @@ class K8sHelmBaseConnector(K8sConnector):
             else:
                 return "", -1
 
-    async def _local_async_exec_pipe(self,
-                                     command1: str,
-                                     command2: str,
-                                     raise_exception_on_error: bool = True,
-                                     show_error_log: bool = True,
-                                     encode_utf8: bool = False,
-                                     env: dict = None):
+    async def _local_async_exec_pipe(
+        self,
+        command1: str,
+        command2: str,
+        raise_exception_on_error: bool = True,
+        show_error_log: bool = True,
+        encode_utf8: bool = False,
+        env: dict = None,
+    ):
 
         command1 = K8sHelmBaseConnector._remove_multiple_spaces(command1)
         command2 = K8sHelmBaseConnector._remove_multiple_spaces(command2)
         command = "{} | {}".format(command1, command2)
-        self.log.debug("Executing async local command: {}, env: {}".format(command, env))
+        self.log.debug(
+            "Executing async local command: {}, env: {}".format(command, env)
+        )
 
         # split command
         command1 = shlex.split(command1)
@@ -1154,9 +1523,9 @@ class K8sHelmBaseConnector(K8sConnector):
             read, write = os.pipe()
             await asyncio.create_subprocess_exec(*command1, stdout=write, env=environ)
             os.close(write)
-            process_2 = await asyncio.create_subprocess_exec(*command2, stdin=read,
-                                                             stdout=asyncio.subprocess.PIPE,
-                                                             env=environ)
+            process_2 = await asyncio.create_subprocess_exec(
+                *command2, stdin=read, stdout=asyncio.subprocess.PIPE, env=environ
+            )
             os.close(read)
             stdout, stderr = await process_2.communicate()
 
@@ -1232,7 +1601,7 @@ class K8sHelmBaseConnector(K8sConnector):
             "name": service_name,
             "type": self._get_deep(data, ("spec", "type")),
             "ports": self._get_deep(data, ("spec", "ports")),
-            "cluster_ip": self._get_deep(data, ("spec", "clusterIP"))
+            "cluster_ip": self._get_deep(data, ("spec", "clusterIP")),
         }
         if service["type"] == "LoadBalancer":
             ip_map_list = self._get_deep(data, ("status", "loadBalancer", "ingress"))
@@ -1241,12 +1610,23 @@ class K8sHelmBaseConnector(K8sConnector):
 
         return service
 
-    async def _exec_inspect_comand(
+    async def _exec_get_command(
+        self, get_command: str, kdu_instance: str, namespace: str, kubeconfig: str
+    ):
+        """Obtains information about the kdu instance."""
+
+        full_command = self._get_get_command(
+            get_command, kdu_instance, namespace, kubeconfig
+        )
+
+        output, _rc = await self._local_async_exec(command=full_command)
+
+        return output
+
+    async def _exec_inspect_command(
         self, inspect_command: str, kdu_model: str, repo_url: str = None
     ):
-        """
-        Obtains information about a kdu, no cluster (no env)
-        """
+        """Obtains information about a kdu, no cluster (no env)."""
 
         repo_str = ""
         if repo_url:
@@ -1257,20 +1637,141 @@ class K8sHelmBaseConnector(K8sConnector):
             idx += 1
             kdu_model = kdu_model[idx:]
 
-        version = ""
-        if ":" in kdu_model:
-            parts = kdu_model.split(sep=":")
-            if len(parts) == 2:
-                version = "--version {}".format(str(parts[1]))
-                kdu_model = parts[0]
+        kdu_model, version = self._split_version(kdu_model)
+        if version:
+            version_str = "--version {}".format(version)
+        else:
+            version_str = ""
 
-        full_command = self._get_inspect_command(inspect_command, kdu_model, repo_str, version)
-        output, _rc = await self._local_async_exec(
-            command=full_command, encode_utf8=True
+        full_command = self._get_inspect_command(
+            inspect_command, kdu_model, repo_str, version_str
         )
 
+        output, _rc = await self._local_async_exec(command=full_command)
+
         return output
 
+    async def _get_replica_count_url(
+        self,
+        kdu_model: str,
+        repo_url: str,
+        resource_name: str = None,
+    ):
+        """Get the replica count value in the Helm Chart Values.
+
+        Args:
+            kdu_model: The name or path of a bundle
+            repo_url: Helm Chart repository url
+            resource_name: Resource name
+
+        Returns:
+            True if replicas, False replicaCount
+        """
+
+        kdu_values = yaml.load(
+            await self.values_kdu(kdu_model, repo_url), Loader=yaml.SafeLoader
+        )
+
+        if not kdu_values:
+            raise K8sException(
+                "kdu_values not found for kdu_model {}".format(kdu_model)
+            )
+
+        if resource_name:
+            kdu_values = kdu_values.get(resource_name, None)
+
+        if not kdu_values:
+            msg = "resource {} not found in the values in model {}".format(
+                resource_name, kdu_model
+            )
+            self.log.error(msg)
+            raise K8sException(msg)
+
+        duplicate_check = False
+
+        replica_str = ""
+        replicas = None
+
+        if kdu_values.get("replicaCount", None):
+            replicas = kdu_values["replicaCount"]
+            replica_str = "replicaCount"
+        elif kdu_values.get("replicas", None):
+            duplicate_check = True
+            replicas = kdu_values["replicas"]
+            replica_str = "replicas"
+        else:
+            if resource_name:
+                msg = (
+                    "replicaCount or replicas not found in the resource"
+                    "{} values in model {}. Cannot be scaled".format(
+                        resource_name, kdu_model
+                    )
+                )
+            else:
+                msg = (
+                    "replicaCount or replicas not found in the values"
+                    "in model {}. Cannot be scaled".format(kdu_model)
+                )
+            self.log.error(msg)
+            raise K8sException(msg)
+
+        # Control if replicas and replicaCount exists at the same time
+        msg = "replicaCount and replicas are exists at the same time"
+        if duplicate_check:
+            if "replicaCount" in kdu_values:
+                self.log.error(msg)
+                raise K8sException(msg)
+        else:
+            if "replicas" in kdu_values:
+                self.log.error(msg)
+                raise K8sException(msg)
+
+        return replicas, replica_str
+
+    async def _get_replica_count_instance(
+        self,
+        kdu_instance: str,
+        namespace: str,
+        kubeconfig: str,
+        resource_name: str = None,
+    ):
+        """Get the replica count value in the instance.
+
+        Args:
+            kdu_instance: The name of the KDU instance
+            namespace: KDU instance namespace
+            kubeconfig:
+            resource_name: Resource name
+
+        Returns:
+            True if replicas, False replicaCount
+        """
+
+        kdu_values = yaml.load(
+            await self.get_values_kdu(kdu_instance, namespace, kubeconfig),
+            Loader=yaml.SafeLoader,
+        )
+
+        replicas = None
+
+        if kdu_values:
+            resource_values = (
+                kdu_values.get(resource_name, None) if resource_name else None
+            )
+            replicas = (
+                (
+                    resource_values.get("replicaCount", None)
+                    or resource_values.get("replicas", None)
+                )
+                if resource_values
+                else (
+                    kdu_values.get("replicaCount", None)
+                    or kdu_values.get("replicas", None)
+                )
+            )
+
+        return replicas
+
     async def _store_status(
         self,
         cluster_id: str,
@@ -1285,11 +1786,13 @@ class K8sHelmBaseConnector(K8sConnector):
             try:
                 await asyncio.sleep(check_every)
                 detailed_status = await self._status_kdu(
-                    cluster_id=cluster_id, kdu_instance=kdu_instance, namespace=namespace,
-                    return_text=False
+                    cluster_id=cluster_id,
+                    kdu_instance=kdu_instance,
+                    yaml_format=False,
+                    namespace=namespace,
                 )
                 status = detailed_status.get("info").get("description")
-                self.log.debug('KDU {} STATUS: {}.'.format(kdu_instance, status))
+                self.log.debug("KDU {} STATUS: {}.".format(kdu_instance, status))
                 # write status to db
                 result = await self.write_app_status_to_db(
                     db_dict=db_dict,
@@ -1304,7 +1807,9 @@ class K8sHelmBaseConnector(K8sConnector):
                 self.log.debug("Task cancelled")
                 return
             except Exception as e:
-                self.log.debug("_store_status exception: {}".format(str(e)), exc_info=True)
+                self.log.debug(
+                    "_store_status exception: {}".format(str(e)), exc_info=True
+                )
                 pass
             finally:
                 if run_once:
@@ -1315,9 +1820,7 @@ class K8sHelmBaseConnector(K8sConnector):
     def _params_to_file_option(self, cluster_id: str, params: dict) -> (str, str):
 
         if params and len(params) > 0:
-            self._init_paths_env(
-                cluster_name=cluster_id, create_if_not_exist=True
-            )
+            self._init_paths_env(cluster_name=cluster_id, create_if_not_exist=True)
 
             def get_random_number():
                 r = random.randrange(start=1, stop=99999999)
@@ -1364,11 +1867,11 @@ class K8sHelmBaseConnector(K8sConnector):
         # check embeded chart (file or dir)
         if chart_name.startswith("/"):
             # extract file or directory name
-            chart_name = chart_name[chart_name.rfind("/") + 1:]
+            chart_name = chart_name[chart_name.rfind("/") + 1 :]
         # check URL
         elif "://" in chart_name:
             # extract last portion of URL
-            chart_name = chart_name[chart_name.rfind("/") + 1:]
+            chart_name = chart_name[chart_name.rfind("/") + 1 :]
 
         name = ""
         for c in chart_name:
@@ -1393,3 +1896,23 @@ class K8sHelmBaseConnector(K8sConnector):
 
         name = name + get_random_number()
         return name.lower()
+
+    def _split_version(self, kdu_model: str) -> (str, str):
+        version = None
+        if ":" in kdu_model:
+            parts = kdu_model.split(sep=":")
+            if len(parts) == 2:
+                version = str(parts[1])
+                kdu_model = parts[0]
+        return kdu_model, version
+
+    async def _find_repo(self, kdu_model: str, cluster_uuid: str) -> str:
+        repo_url = None
+        idx = kdu_model.find("/")
+        if idx >= 0:
+            repo_name = kdu_model[:idx]
+            # Find repository link
+            local_repo_list = await self.repo_list(cluster_uuid)
+            for repo in local_repo_list:
+                repo_url = repo["url"] if repo["name"] == repo_name else None
+        return repo_url