Pin black version in tox.ini to 23.12.1
[osm/N2VC.git] / n2vc / k8s_helm_base_conn.py
index c20b55d..5f004b3 100644 (file)
@@ -22,6 +22,7 @@
 import abc
 import asyncio
 from typing import Union
+from shlex import quote
 import random
 import time
 import shlex
@@ -30,10 +31,12 @@ import stat
 import os
 import yaml
 from uuid import uuid4
+from urllib.parse import urlparse
 
 from n2vc.config import EnvironConfig
 from n2vc.exceptions import K8sException
 from n2vc.k8s_conn import K8sConnector
+from n2vc.kubectl import Kubectl
 
 
 class K8sHelmBaseConnector(K8sConnector):
@@ -112,7 +115,7 @@ class K8sHelmBaseConnector(K8sConnector):
         namespace: str = "kube-system",
         reuse_cluster_uuid=None,
         **kwargs,
-    ) -> (str, bool):
+    ) -> tuple[str, bool]:
         """
         It prepares a given K8s cluster environment to run Charts
 
@@ -163,6 +166,7 @@ class K8sHelmBaseConnector(K8sConnector):
         cert: str = None,
         user: str = None,
         password: str = None,
+        oci: bool = False,
     ):
         self.log.debug(
             "Cluster {}, adding {} repository {}. URL: {}".format(
@@ -178,10 +182,23 @@ class K8sHelmBaseConnector(K8sConnector):
         # sync local dir
         self.fs.sync(from_path=cluster_uuid)
 
-        # helm repo add name url
-        command = ("env KUBECONFIG={} {} repo add {} {}").format(
-            paths["kube_config"], self._helm_command, name, url
-        )
+        if oci:
+            if user and password:
+                host_port = urlparse(url).netloc if url.startswith("oci://") else url
+                # helm registry login url
+                command = "env KUBECONFIG={} {} registry login {}".format(
+                    paths["kube_config"], self._helm_command, quote(host_port)
+                )
+            else:
+                self.log.debug(
+                    "OCI registry login is not needed for repo: {}".format(name)
+                )
+                return
+        else:
+            # helm repo add name url
+            command = "env KUBECONFIG={} {} repo add {} {}".format(
+                paths["kube_config"], self._helm_command, quote(name), quote(url)
+            )
 
         if cert:
             temp_cert_file = os.path.join(
@@ -190,27 +207,28 @@ class K8sHelmBaseConnector(K8sConnector):
             os.makedirs(os.path.dirname(temp_cert_file), exist_ok=True)
             with open(temp_cert_file, "w") as the_cert:
                 the_cert.write(cert)
-            command += " --ca-file {}".format(temp_cert_file)
+            command += " --ca-file {}".format(quote(temp_cert_file))
 
         if user:
-            command += " --username={}".format(user)
+            command += " --username={}".format(quote(user))
 
         if password:
-            command += " --password={}".format(password)
+            command += " --password={}".format(quote(password))
 
         self.log.debug("adding repo: {}".format(command))
         await self._local_async_exec(
             command=command, raise_exception_on_error=True, env=env
         )
 
-        # helm repo update
-        command = "env KUBECONFIG={} {} repo update {}".format(
-            paths["kube_config"], self._helm_command, name
-        )
-        self.log.debug("updating repo: {}".format(command))
-        await self._local_async_exec(
-            command=command, raise_exception_on_error=False, env=env
-        )
+        if not oci:
+            # helm repo update
+            command = "env KUBECONFIG={} {} repo update {}".format(
+                paths["kube_config"], self._helm_command, quote(name)
+            )
+            self.log.debug("updating repo: {}".format(command))
+            await self._local_async_exec(
+                command=command, raise_exception_on_error=False, env=env
+            )
 
         # sync fs
         self.fs.reverse_sync(from_path=cluster_uuid)
@@ -231,7 +249,7 @@ class K8sHelmBaseConnector(K8sConnector):
         self.fs.sync(from_path=cluster_uuid)
 
         # helm repo update
-        command = "{} repo update {}".format(self._helm_command, name)
+        command = "{} repo update {}".format(self._helm_command, quote(name))
         self.log.debug("updating repo: {}".format(command))
         await self._local_async_exec(
             command=command, raise_exception_on_error=False, env=env
@@ -293,7 +311,7 @@ class K8sHelmBaseConnector(K8sConnector):
         self.fs.sync(from_path=cluster_uuid)
 
         command = "env KUBECONFIG={} {} repo remove {}".format(
-            paths["kube_config"], self._helm_command, name
+            paths["kube_config"], self._helm_command, quote(name)
         )
         await self._local_async_exec(
             command=command, raise_exception_on_error=True, env=env
@@ -377,6 +395,11 @@ class K8sHelmBaseConnector(K8sConnector):
     def _is_helm_chart_a_file(self, chart_name: str):
         return chart_name.count("/") > 1
 
+    @staticmethod
+    def _is_helm_chart_a_url(chart_name: str):
+        result = urlparse(chart_name)
+        return all([result.scheme, result.netloc])
+
     async def _install_impl(
         self,
         cluster_id: str,
@@ -401,12 +424,7 @@ class K8sHelmBaseConnector(K8sConnector):
             cluster_id=cluster_id, params=params
         )
 
-        # version
-        kdu_model, version = self._split_version(kdu_model)
-
-        repo = self._split_repo(kdu_model)
-        if repo:
-            await self.repo_update(cluster_id, repo)
+        kdu_model, version = await self._prepare_helm_chart(kdu_model, cluster_id)
 
         command = self._get_install_command(
             kdu_model,
@@ -449,7 +467,6 @@ class K8sHelmBaseConnector(K8sConnector):
             output, rc = exec_task.result()
 
         else:
-
             output, rc = await self._local_async_exec(
                 command=command, raise_exception_on_error=False, env=env
             )
@@ -481,6 +498,8 @@ class K8sHelmBaseConnector(K8sConnector):
         timeout: float = 300,
         params: dict = None,
         db_dict: dict = None,
+        namespace: str = None,
+        force: bool = False,
     ):
         self.log.debug("upgrading {} in cluster {}".format(kdu_model, cluster_uuid))
 
@@ -488,9 +507,13 @@ class K8sHelmBaseConnector(K8sConnector):
         self.fs.sync(from_path=cluster_uuid)
 
         # look for instance to obtain namespace
-        instance_info = await self.get_instance_info(cluster_uuid, kdu_instance)
-        if not instance_info:
-            raise K8sException("kdu_instance {} not found".format(kdu_instance))
+
+        # set namespace
+        if not namespace:
+            instance_info = await self.get_instance_info(cluster_uuid, kdu_instance)
+            if not instance_info:
+                raise K8sException("kdu_instance {} not found".format(kdu_instance))
+            namespace = instance_info["namespace"]
 
         # init env, paths
         paths, env = self._init_paths_env(
@@ -505,28 +528,23 @@ class K8sHelmBaseConnector(K8sConnector):
             cluster_id=cluster_uuid, params=params
         )
 
-        # version
-        kdu_model, version = self._split_version(kdu_model)
-
-        repo = self._split_repo(kdu_model)
-        if repo:
-            await self.repo_update(cluster_uuid, repo)
+        kdu_model, version = await self._prepare_helm_chart(kdu_model, cluster_uuid)
 
         command = self._get_upgrade_command(
             kdu_model,
             kdu_instance,
-            instance_info["namespace"],
+            namespace,
             params_str,
             version,
             atomic,
             timeout,
             paths["kube_config"],
+            force,
         )
 
         self.log.debug("upgrading: {}".format(command))
 
         if atomic:
-
             # exec helm in a task
             exec_task = asyncio.ensure_future(
                 coro_or_future=self._local_async_exec(
@@ -538,7 +556,7 @@ class K8sHelmBaseConnector(K8sConnector):
                 coro_or_future=self._store_status(
                     cluster_id=cluster_uuid,
                     kdu_instance=kdu_instance,
-                    namespace=instance_info["namespace"],
+                    namespace=namespace,
                     db_dict=db_dict,
                     operation="upgrade",
                 )
@@ -552,7 +570,6 @@ class K8sHelmBaseConnector(K8sConnector):
             output, rc = exec_task.result()
 
         else:
-
             output, rc = await self._local_async_exec(
                 command=command, raise_exception_on_error=False, env=env
             )
@@ -565,7 +582,7 @@ class K8sHelmBaseConnector(K8sConnector):
         await self._store_status(
             cluster_id=cluster_uuid,
             kdu_instance=kdu_instance,
-            namespace=instance_info["namespace"],
+            namespace=namespace,
             db_dict=db_dict,
             operation="upgrade",
         )
@@ -639,7 +656,7 @@ class K8sHelmBaseConnector(K8sConnector):
         )
 
         # version
-        kdu_model, version = self._split_version(kdu_model)
+        kdu_model, version = await self._prepare_helm_chart(kdu_model, cluster_uuid)
 
         repo_url = await self._find_repo(kdu_model, cluster_uuid)
 
@@ -742,7 +759,7 @@ class K8sHelmBaseConnector(K8sConnector):
             raise K8sException("kdu_instance {} not found".format(kdu_instance))
 
         # init env, paths
-        paths, env = self._init_paths_env(
+        paths, _ = self._init_paths_env(
             cluster_name=cluster_uuid, create_if_not_exist=True
         )
 
@@ -753,8 +770,15 @@ class K8sHelmBaseConnector(K8sConnector):
             resource_name=resource_name,
         )
 
+        self.log.debug(
+            f"Number of replicas of the KDU instance {kdu_instance} and resource {resource_name} obtained: {replicas}"
+        )
+
         # Get default value if scale count is not found from provided values
-        if not replicas:
+        # Important note: this piece of code shall only be executed in the first scaling operation,
+        # since it is expected that the _get_replica_count_instance is able to obtain the number of
+        # replicas when a scale operation was already conducted previously for this KDU/resource!
+        if replicas is None:
             repo_url = await self._find_repo(
                 kdu_model=kdu_model, cluster_uuid=cluster_uuid
             )
@@ -762,10 +786,15 @@ class K8sHelmBaseConnector(K8sConnector):
                 kdu_model=kdu_model, repo_url=repo_url, resource_name=resource_name
             )
 
-        if not replicas:
-            msg = "Replica count not found. Cannot be scaled"
-            self.log.error(msg)
-            raise K8sException(msg)
+            self.log.debug(
+                f"Number of replicas of the Helm Chart package for KDU instance {kdu_instance} and resource "
+                f"{resource_name} obtained: {replicas}"
+            )
+
+            if replicas is None:
+                msg = "Replica count not found. Cannot be scaled"
+                self.log.error(msg)
+                raise K8sException(msg)
 
         return int(replicas)
 
@@ -1028,7 +1057,6 @@ class K8sHelmBaseConnector(K8sConnector):
     async def get_service(
         self, cluster_uuid: str, service_name: str, namespace: str
     ) -> object:
-
         self.log.debug(
             "get service, service_name: {}, namespace: {}, cluster_uuid: {}".format(
                 service_name, namespace, cluster_uuid
@@ -1111,7 +1139,6 @@ class K8sHelmBaseConnector(K8sConnector):
     async def get_values_kdu(
         self, kdu_instance: str, namespace: str, kubeconfig: str
     ) -> str:
-
         self.log.debug("get kdu_instance values {}".format(kdu_instance))
 
         return await self._exec_get_command(
@@ -1143,7 +1170,6 @@ class K8sHelmBaseConnector(K8sConnector):
         )
 
     async def help_kdu(self, kdu_model: str, repo_url: str = None) -> str:
-
         self.log.debug(
             "inspect kdu_model {} readme.md from repo: {}".format(kdu_model, repo_url)
         )
@@ -1153,7 +1179,6 @@ class K8sHelmBaseConnector(K8sConnector):
         )
 
     async def synchronize_repos(self, cluster_uuid: str):
-
         self.log.debug("synchronize repos for cluster helm-id: {}".format(cluster_uuid))
         try:
             db_repo_ids = self._get_helm_chart_repos_ids(cluster_uuid)
@@ -1184,19 +1209,15 @@ class K8sHelmBaseConnector(K8sConnector):
 
                         # add repo
                         self.log.debug("add repo {}".format(db_repo["name"]))
-                        if "ca_cert" in db_repo:
-                            await self.repo_add(
-                                cluster_uuid,
-                                db_repo["name"],
-                                db_repo["url"],
-                                cert=db_repo["ca_cert"],
-                            )
-                        else:
-                            await self.repo_add(
-                                cluster_uuid,
-                                db_repo["name"],
-                                db_repo["url"],
-                            )
+                        await self.repo_add(
+                            cluster_uuid,
+                            db_repo["name"],
+                            db_repo["url"],
+                            cert=db_repo.get("ca_cert"),
+                            user=db_repo.get("user"),
+                            password=db_repo.get("password"),
+                            oci=db_repo.get("oci", False),
+                        )
                         added_repo_dict[repo_id] = db_repo["name"]
                 except Exception as e:
                     raise K8sException(
@@ -1313,7 +1334,24 @@ class K8sHelmBaseConnector(K8sConnector):
         resource_name,
         kubeconfig,
     ) -> str:
-        """Obtain command to be executed to upgrade the indicated instance."""
+        """Generates the command to scale a Helm Chart release
+
+        Args:
+            kdu_model (str): Kdu model name, corresponding to the Helm local location or repository
+            kdu_instance (str): KDU instance, corresponding to the Helm Chart release in question
+            namespace (str): Namespace where this KDU instance is deployed
+            scale (int): Scale count
+            version (str): Constraint with specific version of the Chart to use
+            atomic (bool): If set, upgrade process rolls back changes made in case of failed upgrade.
+                The --wait flag will be set automatically if --atomic is used
+            replica_str (str): The key under resource_name key where the scale count is stored
+            timeout (float): The time, in seconds, to wait
+            resource_name (str): The KDU's resource to scale
+            kubeconfig (str): Kubeconfig file path
+
+        Returns:
+            str: command to scale a Helm Chart release
+        """
 
     @abc.abstractmethod
     def _get_upgrade_command(
@@ -1326,9 +1364,23 @@ class K8sHelmBaseConnector(K8sConnector):
         atomic,
         timeout,
         kubeconfig,
+        force,
     ) -> str:
-        """
-        Obtain command to be executed to upgrade the indicated instance
+        """Generates the command to upgrade a Helm Chart release
+
+        Args:
+            kdu_model (str): Kdu model name, corresponding to the Helm local location or repository
+            kdu_instance (str): KDU instance, corresponding to the Helm Chart release in question
+            namespace (str): Namespace where this KDU instance is deployed
+            params_str (str): Params used to upgrade the Helm Chart release
+            version (str): Constraint with specific version of the Chart to use
+            atomic (bool): If set, upgrade process rolls back changes made in case of failed upgrade.
+                The --wait flag will be set automatically if --atomic is used
+            timeout (float): The time, in seconds, to wait
+            kubeconfig (str): Kubeconfig file path
+            force (bool): If set, helm forces resource updates through a replacement strategy. This may recreate pods.
+        Returns:
+            str: command to upgrade a Helm Chart release
         """
 
     @abc.abstractmethod
@@ -1494,8 +1546,7 @@ class K8sHelmBaseConnector(K8sConnector):
         show_error_log: bool = True,
         encode_utf8: bool = False,
         env: dict = None,
-    ) -> (str, int):
-
+    ) -> tuple[str, int]:
         command = K8sHelmBaseConnector._remove_multiple_spaces(command)
         self.log.debug(
             "Executing async local command: {}, env: {}".format(command, env)
@@ -1547,6 +1598,9 @@ class K8sHelmBaseConnector(K8sConnector):
             return output, return_code
 
         except asyncio.CancelledError:
+            # first, kill the process if it is still running
+            if process.returncode is None:
+                process.kill()
             raise
         except K8sException:
             raise
@@ -1567,7 +1621,6 @@ class K8sHelmBaseConnector(K8sConnector):
         encode_utf8: bool = False,
         env: dict = None,
     ):
-
         command1 = K8sHelmBaseConnector._remove_multiple_spaces(command1)
         command2 = K8sHelmBaseConnector._remove_multiple_spaces(command2)
         command = "{} | {}".format(command1, command2)
@@ -1586,7 +1639,7 @@ class K8sHelmBaseConnector(K8sConnector):
         try:
             async with self.cmd_lock:
                 read, write = os.pipe()
-                await asyncio.create_subprocess_exec(
+                process_1 = await asyncio.create_subprocess_exec(
                     *command1, stdout=write, env=environ
                 )
                 os.close(write)
@@ -1622,6 +1675,10 @@ class K8sHelmBaseConnector(K8sConnector):
 
             return output, return_code
         except asyncio.CancelledError:
+            # first, kill the processes if they are still running
+            for process in (process_1, process_2):
+                if process.returncode is None:
+                    process.kill()
             raise
         except K8sException:
             raise
@@ -1655,7 +1712,10 @@ class K8sHelmBaseConnector(K8sConnector):
         )
 
         command = "{} --kubeconfig={} --namespace={} get service {} -o=yaml".format(
-            self.kubectl_command, paths["kube_config"], namespace, service_name
+            self.kubectl_command,
+            paths["kube_config"],
+            quote(namespace),
+            quote(service_name),
         )
 
         output, _rc = await self._local_async_exec(
@@ -1706,24 +1766,25 @@ class K8sHelmBaseConnector(K8sConnector):
 
         repo_str = ""
         if repo_url:
-            repo_str = " --repo {}".format(repo_url)
+            repo_str = " --repo {}".format(quote(repo_url))
 
-        idx = kdu_model.find("/")
-        if idx >= 0:
-            idx += 1
-            kdu_model = kdu_model[idx:]
+            # Obtain the Chart's name and store it in the var kdu_model
+            kdu_model, _ = self._split_repo(kdu_model=kdu_model)
 
         kdu_model, version = self._split_version(kdu_model)
         if version:
-            version_str = "--version {}".format(version)
+            version_str = "--version {}".format(quote(version))
         else:
             version_str = ""
 
         full_command = self._get_inspect_command(
-            inspect_command, kdu_model, repo_str, version_str
+            show_command=inspect_command,
+            kdu_model=quote(kdu_model),
+            repo_str=repo_str,
+            version=version_str,
         )
 
-        output, _rc = await self._local_async_exec(command=full_command)
+        output, _ = await self._local_async_exec(command=full_command)
 
         return output
 
@@ -1732,7 +1793,7 @@ class K8sHelmBaseConnector(K8sConnector):
         kdu_model: str,
         repo_url: str = None,
         resource_name: str = None,
-    ):
+    ) -> tuple[int, str]:
         """Get the replica count value in the Helm Chart Values.
 
         Args:
@@ -1741,7 +1802,9 @@ class K8sHelmBaseConnector(K8sConnector):
             resource_name: Resource name
 
         Returns:
-            True if replicas, False replicaCount
+            A tuple with:
+            - The number of replicas of the specific instance; if not found, returns None; and
+            - The string corresponding to the replica count key in the Helm values
         """
 
         kdu_values = yaml.load(
@@ -1749,6 +1812,8 @@ class K8sHelmBaseConnector(K8sConnector):
             Loader=yaml.SafeLoader,
         )
 
+        self.log.debug(f"Obtained the Helm package values for the KDU: {kdu_values}")
+
         if not kdu_values:
             raise K8sException(
                 "kdu_values not found for kdu_model {}".format(kdu_model)
@@ -1769,10 +1834,10 @@ class K8sHelmBaseConnector(K8sConnector):
         replica_str = ""
         replicas = None
 
-        if kdu_values.get("replicaCount", None):
+        if kdu_values.get("replicaCount") is not None:
             replicas = kdu_values["replicaCount"]
             replica_str = "replicaCount"
-        elif kdu_values.get("replicas", None):
+        elif kdu_values.get("replicas") is not None:
             duplicate_check = True
             replicas = kdu_values["replicas"]
             replica_str = "replicas"
@@ -1811,7 +1876,7 @@ class K8sHelmBaseConnector(K8sConnector):
         namespace: str,
         kubeconfig: str,
         resource_name: str = None,
-    ):
+    ) -> int:
         """Get the replica count value in the instance.
 
         Args:
@@ -1821,7 +1886,7 @@ class K8sHelmBaseConnector(K8sConnector):
             resource_name: Resource name
 
         Returns:
-            True if replicas, False replicaCount
+            The number of replicas of the specific instance; if not found, returns None
         """
 
         kdu_values = yaml.load(
@@ -1829,23 +1894,23 @@ class K8sHelmBaseConnector(K8sConnector):
             Loader=yaml.SafeLoader,
         )
 
+        self.log.debug(f"Obtained the Helm values for the KDU instance: {kdu_values}")
+
         replicas = None
 
         if kdu_values:
             resource_values = (
                 kdu_values.get(resource_name, None) if resource_name else None
             )
-            replicas = (
-                (
-                    resource_values.get("replicaCount", None)
-                    or resource_values.get("replicas", None)
-                )
-                if resource_values
-                else (
-                    kdu_values.get("replicaCount", None)
-                    or kdu_values.get("replicas", None)
-                )
-            )
+
+            for replica_str in ("replicaCount", "replicas"):
+                if resource_values:
+                    replicas = resource_values.get(replica_str)
+                else:
+                    replicas = kdu_values.get(replica_str)
+
+                if replicas is not None:
+                    break
 
         return replicas
 
@@ -1903,13 +1968,12 @@ class K8sHelmBaseConnector(K8sConnector):
 
     # params for use in -f file
     # returns values file option and filename (in order to delete it at the end)
-    def _params_to_file_option(self, cluster_id: str, params: dict) -> (str, str):
-
+    def _params_to_file_option(self, cluster_id: str, params: dict) -> tuple[str, str]:
         if params and len(params) > 0:
             self._init_paths_env(cluster_name=cluster_id, create_if_not_exist=True)
 
             def get_random_number():
-                r = random.randrange(start=1, stop=99999999)
+                r = random.SystemRandom().randint(1, 99999999)
                 s = str(r)
                 while len(s) < 10:
                     s = "0" + s
@@ -1933,19 +1997,14 @@ class K8sHelmBaseConnector(K8sConnector):
     # params for use in --set option
     @staticmethod
     def _params_to_set_option(params: dict) -> str:
-        params_str = ""
-        if params and len(params) > 0:
-            start = True
-            for key in params:
-                value = params.get(key, None)
-                if value is not None:
-                    if start:
-                        params_str += "--set "
-                        start = False
-                    else:
-                        params_str += ","
-                    params_str += "{}={}".format(key, value)
-        return params_str
+        pairs = [
+            f"{quote(str(key))}={quote(str(value))}"
+            for key, value in params.items()
+            if value is not None
+        ]
+        if not pairs:
+            return ""
+        return "--set " + ",".join(pairs)
 
     @staticmethod
     def generate_kdu_instance_name(**kwargs):
@@ -1975,7 +2034,7 @@ class K8sHelmBaseConnector(K8sConnector):
         name += "-"
 
         def get_random_number():
-            r = random.randrange(start=1, stop=99999999)
+            r = random.SystemRandom().randint(1, 99999999)
             s = str(r)
             s = s.rjust(10, "0")
             return s
@@ -1983,21 +2042,42 @@ class K8sHelmBaseConnector(K8sConnector):
         name = name + get_random_number()
         return name.lower()
 
-    def _split_version(self, kdu_model: str) -> (str, str):
+    def _split_version(self, kdu_model: str) -> tuple[str, str]:
         version = None
-        if not self._is_helm_chart_a_file(kdu_model) and ":" in kdu_model:
+        if (
+            not (
+                self._is_helm_chart_a_file(kdu_model)
+                or self._is_helm_chart_a_url(kdu_model)
+            )
+            and ":" in kdu_model
+        ):
             parts = kdu_model.split(sep=":")
             if len(parts) == 2:
                 version = str(parts[1])
                 kdu_model = parts[0]
         return kdu_model, version
 
-    def _split_repo(self, kdu_model: str) -> str:
+    def _split_repo(self, kdu_model: str) -> tuple[str, str]:
+        """Obtain the Helm Chart's repository and Chart's names from the KDU model
+
+        Args:
+            kdu_model (str): Associated KDU model
+
+        Returns:
+            (str, str): Tuple with the Chart name in index 0, and the repo name
+                        in index 2; if there was a problem finding them, return None
+                        for both
+        """
+
+        chart_name = None
         repo_name = None
+
         idx = kdu_model.find("/")
-        if idx >= 0:
+        if not self._is_helm_chart_a_url(kdu_model) and idx >= 0:
+            chart_name = kdu_model[idx + 1 :]
             repo_name = kdu_model[:idx]
-        return repo_name
+
+        return chart_name, repo_name
 
     async def _find_repo(self, kdu_model: str, cluster_uuid: str) -> str:
         """Obtain the Helm repository for an Helm Chart
@@ -2010,12 +2090,178 @@ class K8sHelmBaseConnector(K8sConnector):
             str: the repository URL; if Helm Chart is a local one, the function returns None
         """
 
+        _, repo_name = self._split_repo(kdu_model=kdu_model)
+
         repo_url = None
-        idx = kdu_model.find("/")
-        if idx >= 0:
-            repo_name = kdu_model[:idx]
+        if repo_name:
             # Find repository link
             local_repo_list = await self.repo_list(cluster_uuid)
             for repo in local_repo_list:
-                repo_url = repo["url"] if repo["name"] == repo_name else None
+                if repo["name"] == repo_name:
+                    repo_url = repo["url"]
+                    break  # it is not necessary to continue the loop if the repo link was found...
+
         return repo_url
+
+    def _repo_to_oci_url(self, repo):
+        db_repo = self.db.get_one("k8srepos", {"name": repo}, fail_on_empty=False)
+        if db_repo and "oci" in db_repo:
+            return db_repo.get("url")
+
+    async def _prepare_helm_chart(self, kdu_model, cluster_id):
+        # e.g.: "stable/openldap", "1.0"
+        kdu_model, version = self._split_version(kdu_model)
+        # e.g.: "openldap, stable"
+        chart_name, repo = self._split_repo(kdu_model)
+        if repo and chart_name:  # repo/chart case
+            oci_url = self._repo_to_oci_url(repo)
+            if oci_url:  # oci does not require helm repo update
+                kdu_model = f"{oci_url.rstrip('/')}/{chart_name.lstrip('/')}"  # urljoin doesn't work for oci schema
+            else:
+                await self.repo_update(cluster_id, repo)
+        return kdu_model, version
+
+    async def create_certificate(
+        self, cluster_uuid, namespace, dns_prefix, name, secret_name, usage
+    ):
+        paths, env = self._init_paths_env(
+            cluster_name=cluster_uuid, create_if_not_exist=True
+        )
+        kubectl = Kubectl(config_file=paths["kube_config"])
+        await kubectl.create_certificate(
+            namespace=namespace,
+            name=name,
+            dns_prefix=dns_prefix,
+            secret_name=secret_name,
+            usages=[usage],
+            issuer_name="ca-issuer",
+        )
+
+    async def delete_certificate(self, cluster_uuid, namespace, certificate_name):
+        paths, env = self._init_paths_env(
+            cluster_name=cluster_uuid, create_if_not_exist=True
+        )
+        kubectl = Kubectl(config_file=paths["kube_config"])
+        await kubectl.delete_certificate(namespace, certificate_name)
+
+    async def create_namespace(
+        self,
+        namespace,
+        cluster_uuid,
+        labels,
+    ):
+        """
+        Create a namespace in a specific cluster
+
+        :param namespace:    Namespace to be created
+        :param cluster_uuid: K8s cluster uuid used to retrieve kubeconfig
+        :param labels:       Dictionary with labels for the new namespace
+        :returns: None
+        """
+        paths, env = self._init_paths_env(
+            cluster_name=cluster_uuid, create_if_not_exist=True
+        )
+        kubectl = Kubectl(config_file=paths["kube_config"])
+        await kubectl.create_namespace(
+            name=namespace,
+            labels=labels,
+        )
+
+    async def delete_namespace(
+        self,
+        namespace,
+        cluster_uuid,
+    ):
+        """
+        Delete a namespace in a specific cluster
+
+        :param namespace: namespace to be deleted
+        :param cluster_uuid: K8s cluster uuid used to retrieve kubeconfig
+        :returns: None
+        """
+        paths, env = self._init_paths_env(
+            cluster_name=cluster_uuid, create_if_not_exist=True
+        )
+        kubectl = Kubectl(config_file=paths["kube_config"])
+        await kubectl.delete_namespace(
+            name=namespace,
+        )
+
+    async def copy_secret_data(
+        self,
+        src_secret: str,
+        dst_secret: str,
+        cluster_uuid: str,
+        data_key: str,
+        src_namespace: str = "osm",
+        dst_namespace: str = "osm",
+    ):
+        """
+        Copy a single key and value from an existing secret to a new one
+
+        :param src_secret: name of the existing secret
+        :param dst_secret: name of the new secret
+        :param cluster_uuid: K8s cluster uuid used to retrieve kubeconfig
+        :param data_key: key of the existing secret to be copied
+        :param src_namespace: Namespace of the existing secret
+        :param dst_namespace: Namespace of the new secret
+        :returns: None
+        """
+        paths, env = self._init_paths_env(
+            cluster_name=cluster_uuid, create_if_not_exist=True
+        )
+        kubectl = Kubectl(config_file=paths["kube_config"])
+        secret_data = await kubectl.get_secret_content(
+            name=src_secret,
+            namespace=src_namespace,
+        )
+        # Only the corresponding data_key value needs to be copy
+        data = {data_key: secret_data.get(data_key)}
+        await kubectl.create_secret(
+            name=dst_secret,
+            data=data,
+            namespace=dst_namespace,
+            secret_type="Opaque",
+        )
+
+    async def setup_default_rbac(
+        self,
+        name,
+        namespace,
+        cluster_uuid,
+        api_groups,
+        resources,
+        verbs,
+        service_account,
+    ):
+        """
+        Create a basic RBAC for a new namespace.
+
+        :param name: name of both Role and Role Binding
+        :param namespace: K8s namespace
+        :param cluster_uuid: K8s cluster uuid used to retrieve kubeconfig
+        :param api_groups: Api groups to be allowed in Policy Rule
+        :param resources: Resources to be allowed in Policy Rule
+        :param verbs: Verbs to be allowed in Policy Rule
+        :param service_account: Service Account name used to bind the Role
+        :returns: None
+        """
+        paths, env = self._init_paths_env(
+            cluster_name=cluster_uuid, create_if_not_exist=True
+        )
+        kubectl = Kubectl(config_file=paths["kube_config"])
+        await kubectl.create_role(
+            name=name,
+            labels={},
+            namespace=namespace,
+            api_groups=api_groups,
+            resources=resources,
+            verbs=verbs,
+        )
+        await kubectl.create_role_binding(
+            name=name,
+            labels={},
+            namespace=namespace,
+            role_name=name,
+            sa_name=service_account,
+        )