Revert "Revert "Feature 11002: Deprecate helmv2""
[osm/N2VC.git] / n2vc / k8s_helm_base_conn.py
index 5588c3d..383ce7d 100644 (file)
@@ -22,6 +22,7 @@
 import abc
 import asyncio
 from typing import Union
+from shlex import quote
 import random
 import time
 import shlex
@@ -113,7 +114,7 @@ class K8sHelmBaseConnector(K8sConnector):
         namespace: str = "kube-system",
         reuse_cluster_uuid=None,
         **kwargs,
-    ) -> (str, bool):
+    ) -> tuple[str, bool]:
         """
         It prepares a given K8s cluster environment to run Charts
 
@@ -181,7 +182,7 @@ class K8sHelmBaseConnector(K8sConnector):
 
         # helm repo add name url
         command = ("env KUBECONFIG={} {} repo add {} {}").format(
-            paths["kube_config"], self._helm_command, name, url
+            paths["kube_config"], self._helm_command, quote(name), quote(url)
         )
 
         if cert:
@@ -191,13 +192,13 @@ class K8sHelmBaseConnector(K8sConnector):
             os.makedirs(os.path.dirname(temp_cert_file), exist_ok=True)
             with open(temp_cert_file, "w") as the_cert:
                 the_cert.write(cert)
-            command += " --ca-file {}".format(temp_cert_file)
+            command += " --ca-file {}".format(quote(temp_cert_file))
 
         if user:
-            command += " --username={}".format(user)
+            command += " --username={}".format(quote(user))
 
         if password:
-            command += " --password={}".format(password)
+            command += " --password={}".format(quote(password))
 
         self.log.debug("adding repo: {}".format(command))
         await self._local_async_exec(
@@ -206,7 +207,7 @@ class K8sHelmBaseConnector(K8sConnector):
 
         # helm repo update
         command = "env KUBECONFIG={} {} repo update {}".format(
-            paths["kube_config"], self._helm_command, name
+            paths["kube_config"], self._helm_command, quote(name)
         )
         self.log.debug("updating repo: {}".format(command))
         await self._local_async_exec(
@@ -232,7 +233,7 @@ class K8sHelmBaseConnector(K8sConnector):
         self.fs.sync(from_path=cluster_uuid)
 
         # helm repo update
-        command = "{} repo update {}".format(self._helm_command, name)
+        command = "{} repo update {}".format(self._helm_command, quote(name))
         self.log.debug("updating repo: {}".format(command))
         await self._local_async_exec(
             command=command, raise_exception_on_error=False, env=env
@@ -294,7 +295,7 @@ class K8sHelmBaseConnector(K8sConnector):
         self.fs.sync(from_path=cluster_uuid)
 
         command = "env KUBECONFIG={} {} repo remove {}".format(
-            paths["kube_config"], self._helm_command, name
+            paths["kube_config"], self._helm_command, quote(name)
         )
         await self._local_async_exec(
             command=command, raise_exception_on_error=True, env=env
@@ -1538,7 +1539,7 @@ class K8sHelmBaseConnector(K8sConnector):
         show_error_log: bool = True,
         encode_utf8: bool = False,
         env: dict = None,
-    ) -> (str, int):
+    ) -> tuple[str, int]:
         command = K8sHelmBaseConnector._remove_multiple_spaces(command)
         self.log.debug(
             "Executing async local command: {}, env: {}".format(command, env)
@@ -1704,7 +1705,10 @@ class K8sHelmBaseConnector(K8sConnector):
         )
 
         command = "{} --kubeconfig={} --namespace={} get service {} -o=yaml".format(
-            self.kubectl_command, paths["kube_config"], namespace, service_name
+            self.kubectl_command,
+            paths["kube_config"],
+            quote(namespace),
+            quote(service_name),
         )
 
         output, _rc = await self._local_async_exec(
@@ -1755,20 +1759,20 @@ class K8sHelmBaseConnector(K8sConnector):
 
         repo_str = ""
         if repo_url:
-            repo_str = " --repo {}".format(repo_url)
+            repo_str = " --repo {}".format(quote(repo_url))
 
             # Obtain the Chart's name and store it in the var kdu_model
             kdu_model, _ = self._split_repo(kdu_model=kdu_model)
 
         kdu_model, version = self._split_version(kdu_model)
         if version:
-            version_str = "--version {}".format(version)
+            version_str = "--version {}".format(quote(version))
         else:
             version_str = ""
 
         full_command = self._get_inspect_command(
             show_command=inspect_command,
-            kdu_model=kdu_model,
+            kdu_model=quote(kdu_model),
             repo_str=repo_str,
             version=version_str,
         )
@@ -1782,7 +1786,7 @@ class K8sHelmBaseConnector(K8sConnector):
         kdu_model: str,
         repo_url: str = None,
         resource_name: str = None,
-    ) -> (int, str):
+    ) -> tuple[int, str]:
         """Get the replica count value in the Helm Chart Values.
 
         Args:
@@ -1957,12 +1961,12 @@ class K8sHelmBaseConnector(K8sConnector):
 
     # params for use in -f file
     # returns values file option and filename (in order to delete it at the end)
-    def _params_to_file_option(self, cluster_id: str, params: dict) -> (str, str):
+    def _params_to_file_option(self, cluster_id: str, params: dict) -> tuple[str, str]:
         if params and len(params) > 0:
             self._init_paths_env(cluster_name=cluster_id, create_if_not_exist=True)
 
             def get_random_number():
-                r = random.randrange(start=1, stop=99999999)
+                r = random.SystemRandom().randint(1, 99999999)
                 s = str(r)
                 while len(s) < 10:
                     s = "0" + s
@@ -1986,19 +1990,14 @@ class K8sHelmBaseConnector(K8sConnector):
     # params for use in --set option
     @staticmethod
     def _params_to_set_option(params: dict) -> str:
-        params_str = ""
-        if params and len(params) > 0:
-            start = True
-            for key in params:
-                value = params.get(key, None)
-                if value is not None:
-                    if start:
-                        params_str += "--set "
-                        start = False
-                    else:
-                        params_str += ","
-                    params_str += "{}={}".format(key, value)
-        return params_str
+        pairs = [
+            f"{quote(str(key))}={quote(str(value))}"
+            for key, value in params.items()
+            if value is not None
+        ]
+        if not pairs:
+            return ""
+        return "--set " + ",".join(pairs)
 
     @staticmethod
     def generate_kdu_instance_name(**kwargs):
@@ -2028,7 +2027,7 @@ class K8sHelmBaseConnector(K8sConnector):
         name += "-"
 
         def get_random_number():
-            r = random.randrange(start=1, stop=99999999)
+            r = random.SystemRandom().randint(1, 99999999)
             s = str(r)
             s = s.rjust(10, "0")
             return s
@@ -2036,7 +2035,7 @@ class K8sHelmBaseConnector(K8sConnector):
         name = name + get_random_number()
         return name.lower()
 
-    def _split_version(self, kdu_model: str) -> (str, str):
+    def _split_version(self, kdu_model: str) -> tuple[str, str]:
         version = None
         if not self._is_helm_chart_a_file(kdu_model) and ":" in kdu_model:
             parts = kdu_model.split(sep=":")
@@ -2045,7 +2044,7 @@ class K8sHelmBaseConnector(K8sConnector):
                 kdu_model = parts[0]
         return kdu_model, version
 
-    def _split_repo(self, kdu_model: str) -> (str, str):
+    def _split_repo(self, kdu_model: str) -> tuple[str, str]:
         """Obtain the Helm Chart's repository and Chart's names from the KDU model
 
         Args:
@@ -2113,3 +2112,125 @@ class K8sHelmBaseConnector(K8sConnector):
         )
         kubectl = Kubectl(config_file=paths["kube_config"])
         await kubectl.delete_certificate(namespace, certificate_name)
+
+    async def create_namespace(
+        self,
+        namespace,
+        cluster_uuid,
+        labels,
+    ):
+        """
+        Create a namespace in a specific cluster
+
+        :param namespace:    Namespace to be created
+        :param cluster_uuid: K8s cluster uuid used to retrieve kubeconfig
+        :param labels:       Dictionary with labels for the new namespace
+        :returns: None
+        """
+        paths, env = self._init_paths_env(
+            cluster_name=cluster_uuid, create_if_not_exist=True
+        )
+        kubectl = Kubectl(config_file=paths["kube_config"])
+        await kubectl.create_namespace(
+            name=namespace,
+            labels=labels,
+        )
+
+    async def delete_namespace(
+        self,
+        namespace,
+        cluster_uuid,
+    ):
+        """
+        Delete a namespace in a specific cluster
+
+        :param namespace: namespace to be deleted
+        :param cluster_uuid: K8s cluster uuid used to retrieve kubeconfig
+        :returns: None
+        """
+        paths, env = self._init_paths_env(
+            cluster_name=cluster_uuid, create_if_not_exist=True
+        )
+        kubectl = Kubectl(config_file=paths["kube_config"])
+        await kubectl.delete_namespace(
+            name=namespace,
+        )
+
+    async def copy_secret_data(
+        self,
+        src_secret: str,
+        dst_secret: str,
+        cluster_uuid: str,
+        data_key: str,
+        src_namespace: str = "osm",
+        dst_namespace: str = "osm",
+    ):
+        """
+        Copy a single key and value from an existing secret to a new one
+
+        :param src_secret: name of the existing secret
+        :param dst_secret: name of the new secret
+        :param cluster_uuid: K8s cluster uuid used to retrieve kubeconfig
+        :param data_key: key of the existing secret to be copied
+        :param src_namespace: Namespace of the existing secret
+        :param dst_namespace: Namespace of the new secret
+        :returns: None
+        """
+        paths, env = self._init_paths_env(
+            cluster_name=cluster_uuid, create_if_not_exist=True
+        )
+        kubectl = Kubectl(config_file=paths["kube_config"])
+        secret_data = await kubectl.get_secret_content(
+            name=src_secret,
+            namespace=src_namespace,
+        )
+        # Only the corresponding data_key value needs to be copy
+        data = {data_key: secret_data.get(data_key)}
+        await kubectl.create_secret(
+            name=dst_secret,
+            data=data,
+            namespace=dst_namespace,
+            secret_type="Opaque",
+        )
+
+    async def setup_default_rbac(
+        self,
+        name,
+        namespace,
+        cluster_uuid,
+        api_groups,
+        resources,
+        verbs,
+        service_account,
+    ):
+        """
+        Create a basic RBAC for a new namespace.
+
+        :param name: name of both Role and Role Binding
+        :param namespace: K8s namespace
+        :param cluster_uuid: K8s cluster uuid used to retrieve kubeconfig
+        :param api_groups: Api groups to be allowed in Policy Rule
+        :param resources: Resources to be allowed in Policy Rule
+        :param verbs: Verbs to be allowed in Policy Rule
+        :param service_account: Service Account name used to bind the Role
+        :returns: None
+        """
+        paths, env = self._init_paths_env(
+            cluster_name=cluster_uuid, create_if_not_exist=True
+        )
+        kubectl = Kubectl(config_file=paths["kube_config"])
+        await kubectl.create_role(
+            name=name,
+            labels={},
+            namespace=namespace,
+            api_groups=api_groups,
+            resources=resources,
+            verbs=verbs,
+        )
+        await kubectl.create_role_binding(
+            name=name,
+            labels={},
+            namespace=namespace,
+            role_name=name,
+            sa_name=service_account,
+        )