Coverity-CWE 330: Use of Insufficiently Random Values
[osm/N2VC.git] / n2vc / k8s_helm_base_conn.py
index f0a049b..a897e0e 100644 (file)
@@ -34,6 +34,7 @@ from uuid import uuid4
 from n2vc.config import EnvironConfig
 from n2vc.exceptions import K8sException
 from n2vc.k8s_conn import K8sConnector
+from n2vc.kubectl import Kubectl
 
 
 class K8sHelmBaseConnector(K8sConnector):
@@ -90,6 +91,9 @@ class K8sHelmBaseConnector(K8sConnector):
         if self._stable_repo_url == "None":
             self._stable_repo_url = None
 
+        # Lock to avoid concurrent execution of helm commands
+        self.cmd_lock = asyncio.Lock()
+
     def _get_namespace(self, cluster_uuid: str) -> str:
         """
         Obtains the namespace used by the cluster with the uuid passed by argument
@@ -401,9 +405,9 @@ class K8sHelmBaseConnector(K8sConnector):
         # version
         kdu_model, version = self._split_version(kdu_model)
 
-        repo = self._split_repo(kdu_model)
+        _, repo = self._split_repo(kdu_model)
         if repo:
-            self.repo_update(cluster_id, repo)
+            await self.repo_update(cluster_id, repo)
 
         command = self._get_install_command(
             kdu_model,
@@ -446,7 +450,6 @@ class K8sHelmBaseConnector(K8sConnector):
             output, rc = exec_task.result()
 
         else:
-
             output, rc = await self._local_async_exec(
                 command=command, raise_exception_on_error=False, env=env
             )
@@ -478,6 +481,8 @@ class K8sHelmBaseConnector(K8sConnector):
         timeout: float = 300,
         params: dict = None,
         db_dict: dict = None,
+        namespace: str = None,
+        force: bool = False,
     ):
         self.log.debug("upgrading {} in cluster {}".format(kdu_model, cluster_uuid))
 
@@ -485,9 +490,13 @@ class K8sHelmBaseConnector(K8sConnector):
         self.fs.sync(from_path=cluster_uuid)
 
         # look for instance to obtain namespace
-        instance_info = await self.get_instance_info(cluster_uuid, kdu_instance)
-        if not instance_info:
-            raise K8sException("kdu_instance {} not found".format(kdu_instance))
+
+        # set namespace
+        if not namespace:
+            instance_info = await self.get_instance_info(cluster_uuid, kdu_instance)
+            if not instance_info:
+                raise K8sException("kdu_instance {} not found".format(kdu_instance))
+            namespace = instance_info["namespace"]
 
         # init env, paths
         paths, env = self._init_paths_env(
@@ -505,25 +514,25 @@ class K8sHelmBaseConnector(K8sConnector):
         # version
         kdu_model, version = self._split_version(kdu_model)
 
-        repo = self._split_repo(kdu_model)
+        _, repo = self._split_repo(kdu_model)
         if repo:
-            self.repo_update(cluster_uuid, repo)
+            await self.repo_update(cluster_uuid, repo)
 
         command = self._get_upgrade_command(
             kdu_model,
             kdu_instance,
-            instance_info["namespace"],
+            namespace,
             params_str,
             version,
             atomic,
             timeout,
             paths["kube_config"],
+            force,
         )
 
         self.log.debug("upgrading: {}".format(command))
 
         if atomic:
-
             # exec helm in a task
             exec_task = asyncio.ensure_future(
                 coro_or_future=self._local_async_exec(
@@ -535,7 +544,7 @@ class K8sHelmBaseConnector(K8sConnector):
                 coro_or_future=self._store_status(
                     cluster_id=cluster_uuid,
                     kdu_instance=kdu_instance,
-                    namespace=instance_info["namespace"],
+                    namespace=namespace,
                     db_dict=db_dict,
                     operation="upgrade",
                 )
@@ -549,7 +558,6 @@ class K8sHelmBaseConnector(K8sConnector):
             output, rc = exec_task.result()
 
         else:
-
             output, rc = await self._local_async_exec(
                 command=command, raise_exception_on_error=False, env=env
             )
@@ -562,7 +570,7 @@ class K8sHelmBaseConnector(K8sConnector):
         await self._store_status(
             cluster_id=cluster_uuid,
             kdu_instance=kdu_instance,
-            namespace=instance_info["namespace"],
+            namespace=namespace,
             db_dict=db_dict,
             operation="upgrade",
         )
@@ -739,7 +747,7 @@ class K8sHelmBaseConnector(K8sConnector):
             raise K8sException("kdu_instance {} not found".format(kdu_instance))
 
         # init env, paths
-        paths, env = self._init_paths_env(
+        paths, _ = self._init_paths_env(
             cluster_name=cluster_uuid, create_if_not_exist=True
         )
 
@@ -750,8 +758,15 @@ class K8sHelmBaseConnector(K8sConnector):
             resource_name=resource_name,
         )
 
+        self.log.debug(
+            f"Number of replicas of the KDU instance {kdu_instance} and resource {resource_name} obtained: {replicas}"
+        )
+
         # Get default value if scale count is not found from provided values
-        if not replicas:
+        # Important note: this piece of code shall only be executed in the first scaling operation,
+        # since it is expected that the _get_replica_count_instance is able to obtain the number of
+        # replicas when a scale operation was already conducted previously for this KDU/resource!
+        if replicas is None:
             repo_url = await self._find_repo(
                 kdu_model=kdu_model, cluster_uuid=cluster_uuid
             )
@@ -759,10 +774,15 @@ class K8sHelmBaseConnector(K8sConnector):
                 kdu_model=kdu_model, repo_url=repo_url, resource_name=resource_name
             )
 
-        if not replicas:
-            msg = "Replica count not found. Cannot be scaled"
-            self.log.error(msg)
-            raise K8sException(msg)
+            self.log.debug(
+                f"Number of replicas of the Helm Chart package for KDU instance {kdu_instance} and resource "
+                f"{resource_name} obtained: {replicas}"
+            )
+
+            if replicas is None:
+                msg = "Replica count not found. Cannot be scaled"
+                self.log.error(msg)
+                raise K8sException(msg)
 
         return int(replicas)
 
@@ -1025,7 +1045,6 @@ class K8sHelmBaseConnector(K8sConnector):
     async def get_service(
         self, cluster_uuid: str, service_name: str, namespace: str
     ) -> object:
-
         self.log.debug(
             "get service, service_name: {}, namespace: {}, cluster_uuid: {}".format(
                 service_name, namespace, cluster_uuid
@@ -1108,7 +1127,6 @@ class K8sHelmBaseConnector(K8sConnector):
     async def get_values_kdu(
         self, kdu_instance: str, namespace: str, kubeconfig: str
     ) -> str:
-
         self.log.debug("get kdu_instance values {}".format(kdu_instance))
 
         return await self._exec_get_command(
@@ -1140,7 +1158,6 @@ class K8sHelmBaseConnector(K8sConnector):
         )
 
     async def help_kdu(self, kdu_model: str, repo_url: str = None) -> str:
-
         self.log.debug(
             "inspect kdu_model {} readme.md from repo: {}".format(kdu_model, repo_url)
         )
@@ -1150,7 +1167,6 @@ class K8sHelmBaseConnector(K8sConnector):
         )
 
     async def synchronize_repos(self, cluster_uuid: str):
-
         self.log.debug("synchronize repos for cluster helm-id: {}".format(cluster_uuid))
         try:
             db_repo_ids = self._get_helm_chart_repos_ids(cluster_uuid)
@@ -1310,7 +1326,24 @@ class K8sHelmBaseConnector(K8sConnector):
         resource_name,
         kubeconfig,
     ) -> str:
-        """Obtain command to be executed to upgrade the indicated instance."""
+        """Generates the command to scale a Helm Chart release
+
+        Args:
+            kdu_model (str): Kdu model name, corresponding to the Helm local location or repository
+            kdu_instance (str): KDU instance, corresponding to the Helm Chart release in question
+            namespace (str): Namespace where this KDU instance is deployed
+            scale (int): Scale count
+            version (str): Constraint with specific version of the Chart to use
+            atomic (bool): If set, upgrade process rolls back changes made in case of failed upgrade.
+                The --wait flag will be set automatically if --atomic is used
+            replica_str (str): The key under resource_name key where the scale count is stored
+            timeout (float): The time, in seconds, to wait
+            resource_name (str): The KDU's resource to scale
+            kubeconfig (str): Kubeconfig file path
+
+        Returns:
+            str: command to scale a Helm Chart release
+        """
 
     @abc.abstractmethod
     def _get_upgrade_command(
@@ -1323,9 +1356,23 @@ class K8sHelmBaseConnector(K8sConnector):
         atomic,
         timeout,
         kubeconfig,
+        force,
     ) -> str:
-        """
-        Obtain command to be executed to upgrade the indicated instance
+        """Generates the command to upgrade a Helm Chart release
+
+        Args:
+            kdu_model (str): Kdu model name, corresponding to the Helm local location or repository
+            kdu_instance (str): KDU instance, corresponding to the Helm Chart release in question
+            namespace (str): Namespace where this KDU instance is deployed
+            params_str (str): Params used to upgrade the Helm Chart release
+            version (str): Constraint with specific version of the Chart to use
+            atomic (bool): If set, upgrade process rolls back changes made in case of failed upgrade.
+                The --wait flag will be set automatically if --atomic is used
+            timeout (float): The time, in seconds, to wait
+            kubeconfig (str): Kubeconfig file path
+            force (bool): If set, helm forces resource updates through a replacement strategy. This may recreate pods.
+        Returns:
+            str: command to upgrade a Helm Chart release
         """
 
     @abc.abstractmethod
@@ -1492,7 +1539,6 @@ class K8sHelmBaseConnector(K8sConnector):
         encode_utf8: bool = False,
         env: dict = None,
     ) -> (str, int):
-
         command = K8sHelmBaseConnector._remove_multiple_spaces(command)
         self.log.debug(
             "Executing async local command: {}, env: {}".format(command, env)
@@ -1506,17 +1552,18 @@ class K8sHelmBaseConnector(K8sConnector):
             environ.update(env)
 
         try:
-            process = await asyncio.create_subprocess_exec(
-                *command,
-                stdout=asyncio.subprocess.PIPE,
-                stderr=asyncio.subprocess.PIPE,
-                env=environ,
-            )
+            async with self.cmd_lock:
+                process = await asyncio.create_subprocess_exec(
+                    *command,
+                    stdout=asyncio.subprocess.PIPE,
+                    stderr=asyncio.subprocess.PIPE,
+                    env=environ,
+                )
 
-            # wait for command terminate
-            stdout, stderr = await process.communicate()
+                # wait for command terminate
+                stdout, stderr = await process.communicate()
 
-            return_code = process.returncode
+                return_code = process.returncode
 
             output = ""
             if stdout:
@@ -1543,6 +1590,9 @@ class K8sHelmBaseConnector(K8sConnector):
             return output, return_code
 
         except asyncio.CancelledError:
+            # first, kill the process if it is still running
+            if process.returncode is None:
+                process.kill()
             raise
         except K8sException:
             raise
@@ -1563,7 +1613,6 @@ class K8sHelmBaseConnector(K8sConnector):
         encode_utf8: bool = False,
         env: dict = None,
     ):
-
         command1 = K8sHelmBaseConnector._remove_multiple_spaces(command1)
         command2 = K8sHelmBaseConnector._remove_multiple_spaces(command2)
         command = "{} | {}".format(command1, command2)
@@ -1580,16 +1629,19 @@ class K8sHelmBaseConnector(K8sConnector):
             environ.update(env)
 
         try:
-            read, write = os.pipe()
-            await asyncio.create_subprocess_exec(*command1, stdout=write, env=environ)
-            os.close(write)
-            process_2 = await asyncio.create_subprocess_exec(
-                *command2, stdin=read, stdout=asyncio.subprocess.PIPE, env=environ
-            )
-            os.close(read)
-            stdout, stderr = await process_2.communicate()
+            async with self.cmd_lock:
+                read, write = os.pipe()
+                process_1 = await asyncio.create_subprocess_exec(
+                    *command1, stdout=write, env=environ
+                )
+                os.close(write)
+                process_2 = await asyncio.create_subprocess_exec(
+                    *command2, stdin=read, stdout=asyncio.subprocess.PIPE, env=environ
+                )
+                os.close(read)
+                stdout, stderr = await process_2.communicate()
 
-            return_code = process_2.returncode
+                return_code = process_2.returncode
 
             output = ""
             if stdout:
@@ -1615,6 +1667,10 @@ class K8sHelmBaseConnector(K8sConnector):
 
             return output, return_code
         except asyncio.CancelledError:
+            # first, kill the processes if they are still running
+            for process in (process_1, process_2):
+                if process.returncode is None:
+                    process.kill()
             raise
         except K8sException:
             raise
@@ -1701,10 +1757,8 @@ class K8sHelmBaseConnector(K8sConnector):
         if repo_url:
             repo_str = " --repo {}".format(repo_url)
 
-        idx = kdu_model.find("/")
-        if idx >= 0:
-            idx += 1
-            kdu_model = kdu_model[idx:]
+            # Obtain the Chart's name and store it in the var kdu_model
+            kdu_model, _ = self._split_repo(kdu_model=kdu_model)
 
         kdu_model, version = self._split_version(kdu_model)
         if version:
@@ -1713,10 +1767,13 @@ class K8sHelmBaseConnector(K8sConnector):
             version_str = ""
 
         full_command = self._get_inspect_command(
-            inspect_command, kdu_model, repo_str, version_str
+            show_command=inspect_command,
+            kdu_model=kdu_model,
+            repo_str=repo_str,
+            version=version_str,
         )
 
-        output, _rc = await self._local_async_exec(command=full_command)
+        output, _ = await self._local_async_exec(command=full_command)
 
         return output
 
@@ -1725,7 +1782,7 @@ class K8sHelmBaseConnector(K8sConnector):
         kdu_model: str,
         repo_url: str = None,
         resource_name: str = None,
-    ):
+    ) -> (int, str):
         """Get the replica count value in the Helm Chart Values.
 
         Args:
@@ -1734,7 +1791,9 @@ class K8sHelmBaseConnector(K8sConnector):
             resource_name: Resource name
 
         Returns:
-            True if replicas, False replicaCount
+            A tuple with:
+            - The number of replicas of the specific instance; if not found, returns None; and
+            - The string corresponding to the replica count key in the Helm values
         """
 
         kdu_values = yaml.load(
@@ -1742,6 +1801,8 @@ class K8sHelmBaseConnector(K8sConnector):
             Loader=yaml.SafeLoader,
         )
 
+        self.log.debug(f"Obtained the Helm package values for the KDU: {kdu_values}")
+
         if not kdu_values:
             raise K8sException(
                 "kdu_values not found for kdu_model {}".format(kdu_model)
@@ -1762,10 +1823,10 @@ class K8sHelmBaseConnector(K8sConnector):
         replica_str = ""
         replicas = None
 
-        if kdu_values.get("replicaCount", None):
+        if kdu_values.get("replicaCount") is not None:
             replicas = kdu_values["replicaCount"]
             replica_str = "replicaCount"
-        elif kdu_values.get("replicas", None):
+        elif kdu_values.get("replicas") is not None:
             duplicate_check = True
             replicas = kdu_values["replicas"]
             replica_str = "replicas"
@@ -1804,7 +1865,7 @@ class K8sHelmBaseConnector(K8sConnector):
         namespace: str,
         kubeconfig: str,
         resource_name: str = None,
-    ):
+    ) -> int:
         """Get the replica count value in the instance.
 
         Args:
@@ -1814,7 +1875,7 @@ class K8sHelmBaseConnector(K8sConnector):
             resource_name: Resource name
 
         Returns:
-            True if replicas, False replicaCount
+            The number of replicas of the specific instance; if not found, returns None
         """
 
         kdu_values = yaml.load(
@@ -1822,23 +1883,23 @@ class K8sHelmBaseConnector(K8sConnector):
             Loader=yaml.SafeLoader,
         )
 
+        self.log.debug(f"Obtained the Helm values for the KDU instance: {kdu_values}")
+
         replicas = None
 
         if kdu_values:
             resource_values = (
                 kdu_values.get(resource_name, None) if resource_name else None
             )
-            replicas = (
-                (
-                    resource_values.get("replicaCount", None)
-                    or resource_values.get("replicas", None)
-                )
-                if resource_values
-                else (
-                    kdu_values.get("replicaCount", None)
-                    or kdu_values.get("replicas", None)
-                )
-            )
+
+            for replica_str in ("replicaCount", "replicas"):
+                if resource_values:
+                    replicas = resource_values.get(replica_str)
+                else:
+                    replicas = kdu_values.get(replica_str)
+
+                if replicas is not None:
+                    break
 
         return replicas
 
@@ -1897,12 +1958,11 @@ class K8sHelmBaseConnector(K8sConnector):
     # params for use in -f file
     # returns values file option and filename (in order to delete it at the end)
     def _params_to_file_option(self, cluster_id: str, params: dict) -> (str, str):
-
         if params and len(params) > 0:
             self._init_paths_env(cluster_name=cluster_id, create_if_not_exist=True)
 
             def get_random_number():
-                r = random.randrange(start=1, stop=99999999)
+                r = random.SystemRandom().randint(1, 99999999)
                 s = str(r)
                 while len(s) < 10:
                     s = "0" + s
@@ -1968,7 +2028,7 @@ class K8sHelmBaseConnector(K8sConnector):
         name += "-"
 
         def get_random_number():
-            r = random.randrange(start=1, stop=99999999)
+            r = random.SystemRandom().randint(1, 99999999)
             s = str(r)
             s = s.rjust(10, "0")
             return s
@@ -1985,12 +2045,27 @@ class K8sHelmBaseConnector(K8sConnector):
                 kdu_model = parts[0]
         return kdu_model, version
 
-    async def _split_repo(self, kdu_model: str) -> str:
+    def _split_repo(self, kdu_model: str) -> (str, str):
+        """Obtain the Helm Chart's repository and Chart's names from the KDU model
+
+        Args:
+            kdu_model (str): Associated KDU model
+
+        Returns:
+            (str, str): Tuple with the Chart name in index 0, and the repo name
+                        in index 2; if there was a problem finding them, return None
+                        for both
+        """
+
+        chart_name = None
         repo_name = None
+
         idx = kdu_model.find("/")
         if idx >= 0:
+            chart_name = kdu_model[idx + 1 :]
             repo_name = kdu_model[:idx]
-        return repo_name
+
+        return chart_name, repo_name
 
     async def _find_repo(self, kdu_model: str, cluster_uuid: str) -> str:
         """Obtain the Helm repository for an Helm Chart
@@ -2003,12 +2078,160 @@ class K8sHelmBaseConnector(K8sConnector):
             str: the repository URL; if Helm Chart is a local one, the function returns None
         """
 
+        _, repo_name = self._split_repo(kdu_model=kdu_model)
+
         repo_url = None
-        idx = kdu_model.find("/")
-        if idx >= 0:
-            repo_name = kdu_model[:idx]
+        if repo_name:
             # Find repository link
             local_repo_list = await self.repo_list(cluster_uuid)
             for repo in local_repo_list:
-                repo_url = repo["url"] if repo["name"] == repo_name else None
+                if repo["name"] == repo_name:
+                    repo_url = repo["url"]
+                    break  # it is not necessary to continue the loop if the repo link was found...
+
         return repo_url
+
+    async def create_certificate(
+        self, cluster_uuid, namespace, dns_prefix, name, secret_name, usage
+    ):
+        paths, env = self._init_paths_env(
+            cluster_name=cluster_uuid, create_if_not_exist=True
+        )
+        kubectl = Kubectl(config_file=paths["kube_config"])
+        await kubectl.create_certificate(
+            namespace=namespace,
+            name=name,
+            dns_prefix=dns_prefix,
+            secret_name=secret_name,
+            usages=[usage],
+            issuer_name="ca-issuer",
+        )
+
+    async def delete_certificate(self, cluster_uuid, namespace, certificate_name):
+        paths, env = self._init_paths_env(
+            cluster_name=cluster_uuid, create_if_not_exist=True
+        )
+        kubectl = Kubectl(config_file=paths["kube_config"])
+        await kubectl.delete_certificate(namespace, certificate_name)
+
+    async def create_namespace(
+        self,
+        namespace,
+        cluster_uuid,
+        labels,
+    ):
+        """
+        Create a namespace in a specific cluster
+
+        :param namespace:    Namespace to be created
+        :param cluster_uuid: K8s cluster uuid used to retrieve kubeconfig
+        :param labels:       Dictionary with labels for the new namespace
+        :returns: None
+        """
+        paths, env = self._init_paths_env(
+            cluster_name=cluster_uuid, create_if_not_exist=True
+        )
+        kubectl = Kubectl(config_file=paths["kube_config"])
+        await kubectl.create_namespace(
+            name=namespace,
+            labels=labels,
+        )
+
+    async def delete_namespace(
+        self,
+        namespace,
+        cluster_uuid,
+    ):
+        """
+        Delete a namespace in a specific cluster
+
+        :param namespace: namespace to be deleted
+        :param cluster_uuid: K8s cluster uuid used to retrieve kubeconfig
+        :returns: None
+        """
+        paths, env = self._init_paths_env(
+            cluster_name=cluster_uuid, create_if_not_exist=True
+        )
+        kubectl = Kubectl(config_file=paths["kube_config"])
+        await kubectl.delete_namespace(
+            name=namespace,
+        )
+
+    async def copy_secret_data(
+        self,
+        src_secret: str,
+        dst_secret: str,
+        cluster_uuid: str,
+        data_key: str,
+        src_namespace: str = "osm",
+        dst_namespace: str = "osm",
+    ):
+        """
+        Copy a single key and value from an existing secret to a new one
+
+        :param src_secret: name of the existing secret
+        :param dst_secret: name of the new secret
+        :param cluster_uuid: K8s cluster uuid used to retrieve kubeconfig
+        :param data_key: key of the existing secret to be copied
+        :param src_namespace: Namespace of the existing secret
+        :param dst_namespace: Namespace of the new secret
+        :returns: None
+        """
+        paths, env = self._init_paths_env(
+            cluster_name=cluster_uuid, create_if_not_exist=True
+        )
+        kubectl = Kubectl(config_file=paths["kube_config"])
+        secret_data = await kubectl.get_secret_content(
+            name=src_secret,
+            namespace=src_namespace,
+        )
+        # Only the corresponding data_key value needs to be copy
+        data = {data_key: secret_data.get(data_key)}
+        await kubectl.create_secret(
+            name=dst_secret,
+            data=data,
+            namespace=dst_namespace,
+            secret_type="Opaque",
+        )
+
+    async def setup_default_rbac(
+        self,
+        name,
+        namespace,
+        cluster_uuid,
+        api_groups,
+        resources,
+        verbs,
+        service_account,
+    ):
+        """
+        Create a basic RBAC for a new namespace.
+
+        :param name: name of both Role and Role Binding
+        :param namespace: K8s namespace
+        :param cluster_uuid: K8s cluster uuid used to retrieve kubeconfig
+        :param api_groups: Api groups to be allowed in Policy Rule
+        :param resources: Resources to be allowed in Policy Rule
+        :param verbs: Verbs to be allowed in Policy Rule
+        :param service_account: Service Account name used to bind the Role
+        :returns: None
+        """
+        paths, env = self._init_paths_env(
+            cluster_name=cluster_uuid, create_if_not_exist=True
+        )
+        kubectl = Kubectl(config_file=paths["kube_config"])
+        await kubectl.create_role(
+            name=name,
+            labels={},
+            namespace=namespace,
+            api_groups=api_groups,
+            resources=resources,
+            verbs=verbs,
+        )
+        await kubectl.create_role_binding(
+            name=name,
+            labels={},
+            namespace=namespace,
+            role_name=name,
+            sa_name=service_account,
+        )