Feature 10886 manual scaling for KNF (helm chart) deployment 60/11460/6
authoraktas <emin.aktas@ulakhaberlesme.com.tr>
Tue, 19 Oct 2021 15:26:13 +0000 (18:26 +0300)
committeraktas <emin.aktas@ulakhaberlesme.com.tr>
Tue, 4 Jan 2022 15:35:52 +0000 (18:35 +0300)
Change-Id: Ibdcabd4e3f077bf86749f49e8aeb75aec2935ea2
Signed-off-by: aktas <emin.aktas@ulakhaberlesme.com.tr>
n2vc/k8s_conn.py
n2vc/k8s_helm3_conn.py
n2vc/k8s_helm_base_conn.py
n2vc/k8s_helm_conn.py
n2vc/k8s_juju_conn.py
n2vc/tests/unit/test_k8s_helm3_conn.py
n2vc/tests/unit/test_k8s_helm_conn.py

index 89805b3..55b340a 100644 (file)
@@ -215,19 +215,30 @@ class K8sConnector(abc.ABC, Loggable):
         scale: int,
         resource_name: str,
         total_timeout: float = 1800,
+        cluster_uuid: str = None,
+        kdu_model: str = None,
+        atomic: bool = True,
+        db_dict: dict = None,
         **kwargs,
     ) -> bool:
-        """
-        Scales an application in KDU instance.
-
-        :param: kdu_instance str:        KDU instance name
-        :param: scale int:               Scale to which to set this application
-        :param: resource_name str:       Resource name (Application name)
-        :param: timeout float:           The time, in seconds, to wait for the install
-                                         to finish
-        :param kwargs:                   Additional parameters
-
-        :return: If successful, returns True
+        """Scale a resource in a KDU instance.
+
+        Args:
+            kdu_instance: KDU instance name
+            scale: Scale to which to set the resource
+            resource_name: Resource name
+            total_timeout: The time, in seconds, to wait for the install
+                to finish
+            cluster_uuid: The UUID of the cluster
+            kdu_model: The chart/bundle reference
+            atomic: if set, upgrade process rolls back changes made in case of failed upgrade.
+                The --wait flag will be set automatically if --atomic is used
+            db_dict: Dictionary for any additional data
+            kwargs: Additional parameters
+                vca_id (str): VCA ID
+
+        Returns:
+            True if successful, False otherwise
         """
 
     @abc.abstractmethod
@@ -235,16 +246,23 @@ class K8sConnector(abc.ABC, Loggable):
         self,
         resource_name: str,
         kdu_instance: str,
+        cluster_uuid: str,
+        kdu_model: str,
+        timeout: float = 300,
         **kwargs,
     ) -> int:
-        """
-        Get an application scale count.
-
-        :param: resource_name str:       Resource name (Application name)
-        :param: kdu_instance str:        KDU instance name
-        :param kwargs:                   Additional parameters
-
-        :return: Return application instance count
+        """Get a resource scale count in a KDU instance.
+
+        Args:
+            resource_name: Resource name
+            kdu_instance: KDU instance name
+            cluster_uuid: The UUID of the cluster
+            kdu_model:    chart/bundle reference
+            timeout:  The time, in seconds, to wait
+            kwargs: Additional parameters
+
+        Returns:
+            Resource instance count
         """
 
     @abc.abstractmethod
index 5544e3c..4d8df93 100644 (file)
@@ -148,7 +148,7 @@ class K8sHelm3Connector(K8sHelmBaseConnector):
             "inspect kdu_model {} from (optional) repo: {}".format(kdu_model, repo_url)
         )
 
-        return await self._exec_inspect_comand(
+        return await self._exec_inspect_command(
             inspect_command="all", kdu_model=kdu_model, repo_url=repo_url
         )
 
@@ -354,6 +354,16 @@ class K8sHelm3Connector(K8sHelmBaseConnector):
         )
         return inspect_command
 
+    def _get_get_command(
+        self, get_command: str, kdu_instance: str, namespace: str, kubeconfig: str
+    ):
+        get_command = (
+            "env KUBECONFIG={} {} get {} {} --namespace={} --output yaml".format(
+                kubeconfig, self._helm_command, get_command, kdu_instance, namespace
+            )
+        )
+        return get_command
+
     async def _status_kdu(
         self,
         cluster_id: str,
@@ -449,6 +459,63 @@ class K8sHelm3Connector(K8sHelmBaseConnector):
         )
         return command
 
+    def _get_upgrade_scale_command(
+        self,
+        kdu_model: str,
+        kdu_instance: str,
+        namespace: str,
+        scale: int,
+        version: str,
+        atomic: bool,
+        replica_str: str,
+        timeout: float,
+        resource_name: str,
+        kubeconfig: str,
+    ) -> str:
+
+        timeout_str = ""
+        if timeout:
+            timeout_str = "--timeout {}s".format(timeout)
+
+        # atomic
+        atomic_str = ""
+        if atomic:
+            atomic_str = "--atomic"
+
+        # version
+        version_str = ""
+        if version:
+            version_str = "--version {}".format(version)
+
+        # namespace
+        namespace_str = ""
+        if namespace:
+            namespace_str = "--namespace {}".format(namespace)
+
+        # scale
+        if resource_name:
+            scale_dict = {"{}.{}".format(resource_name, replica_str): scale}
+        else:
+            scale_dict = {replica_str: scale}
+
+        scale_str = self._params_to_set_option(scale_dict)
+
+        command = (
+            "env KUBECONFIG={kubeconfig} {helm} upgrade {name} {model} {namespace} {atomic} --output yaml {scale} "
+            "{timeout} {ver}"
+        ).format(
+            helm=self._helm_command,
+            name=kdu_instance,
+            namespace=namespace_str,
+            atomic=atomic_str,
+            scale=scale_str,
+            timeout=timeout_str,
+            model=kdu_model,
+            ver=version_str,
+            kubeconfig=kubeconfig,
+        )
+        return command
+
     def _get_upgrade_command(
         self,
         kdu_model: str,
index 20fa337..273b206 100644 (file)
@@ -347,12 +347,7 @@ class K8sHelmBaseConnector(K8sConnector):
         )
 
         # version
-        version = None
-        if ":" in kdu_model:
-            parts = kdu_model.split(sep=":")
-            if len(parts) == 2:
-                version = str(parts[1])
-                kdu_model = parts[0]
+        kdu_model, version = self._split_version(kdu_model)
 
         command = self._get_install_command(
             kdu_model,
@@ -456,12 +451,7 @@ class K8sHelmBaseConnector(K8sConnector):
         )
 
         # version
-        version = None
-        if ":" in kdu_model:
-            parts = kdu_model.split(sep=":")
-            if len(parts) == 2:
-                version = str(parts[1])
-                kdu_model = parts[0]
+        kdu_model, version = self._split_version(kdu_model)
 
         command = self._get_upgrade_command(
             kdu_model,
@@ -549,17 +539,189 @@ class K8sHelmBaseConnector(K8sConnector):
         scale: int,
         resource_name: str,
         total_timeout: float = 1800,
+        cluster_uuid: str = None,
+        kdu_model: str = None,
+        atomic: bool = True,
+        db_dict: dict = None,
         **kwargs,
     ):
-        raise NotImplementedError("Method not implemented")
+        """Scale a resource in a Helm Chart.
+
+        Args:
+            kdu_instance: KDU instance name
+            scale: Scale to which to set the resource
+            resource_name: Resource name
+            total_timeout: The time, in seconds, to wait
+            cluster_uuid: The UUID of the cluster
+            kdu_model: The chart reference
+            atomic: if set, upgrade process rolls back changes made in case of failed upgrade.
+                The --wait flag will be set automatically if --atomic is used
+            db_dict: Dictionary for any additional data
+            kwargs: Additional parameters
+
+        Returns:
+            True if successful, False otherwise
+        """
+
+        _, cluster_id = self._get_namespace_cluster_id(cluster_uuid)
+
+        debug_mgs = "scaling {} in cluster {}".format(kdu_model, cluster_id)
+        if resource_name:
+            debug_mgs = "scaling resource {} in model {} (cluster {})".format(
+                resource_name, kdu_model, cluster_id
+            )
+
+        self.log.debug(debug_mgs)
+
+        # look for instance to obtain namespace
+        # get_instance_info function calls the sync command
+        instance_info = await self.get_instance_info(cluster_uuid, kdu_instance)
+        if not instance_info:
+            raise K8sException("kdu_instance {} not found".format(kdu_instance))
+
+        # init env, paths
+        paths, env = self._init_paths_env(
+            cluster_name=cluster_id, create_if_not_exist=True
+        )
+
+        # version
+        kdu_model, version = self._split_version(kdu_model)
+
+        repo_url = await self._find_repo(kdu_model, cluster_uuid)
+        if not repo_url:
+            raise K8sException(
+                "Repository not found for kdu_model {}".format(kdu_model)
+            )
+
+        _, replica_str = await self._get_replica_count_url(
+            kdu_model, repo_url, resource_name
+        )
+
+        command = self._get_upgrade_scale_command(
+            kdu_model,
+            kdu_instance,
+            instance_info["namespace"],
+            scale,
+            version,
+            atomic,
+            replica_str,
+            total_timeout,
+            resource_name,
+            paths["kube_config"],
+        )
+
+        self.log.debug("scaling: {}".format(command))
+
+        if atomic:
+            # exec helm in a task
+            exec_task = asyncio.ensure_future(
+                coro_or_future=self._local_async_exec(
+                    command=command, raise_exception_on_error=False, env=env
+                )
+            )
+            # write status in another task
+            status_task = asyncio.ensure_future(
+                coro_or_future=self._store_status(
+                    cluster_id=cluster_id,
+                    kdu_instance=kdu_instance,
+                    namespace=instance_info["namespace"],
+                    db_dict=db_dict,
+                    operation="scale",
+                    run_once=False,
+                )
+            )
+
+            # wait for execution task
+            await asyncio.wait([exec_task])
+
+            # cancel status task
+            status_task.cancel()
+            output, rc = exec_task.result()
+
+        else:
+            output, rc = await self._local_async_exec(
+                command=command, raise_exception_on_error=False, env=env
+            )
+
+        # write final status
+        await self._store_status(
+            cluster_id=cluster_id,
+            kdu_instance=kdu_instance,
+            namespace=instance_info["namespace"],
+            db_dict=db_dict,
+            operation="scale",
+            run_once=True,
+            check_every=0,
+        )
+
+        if rc != 0:
+            msg = "Error executing command: {}\nOutput: {}".format(command, output)
+            self.log.error(msg)
+            raise K8sException(msg)
+
+        # sync fs
+        self.fs.reverse_sync(from_path=cluster_id)
+
+        return True
 
     async def get_scale_count(
         self,
         resource_name: str,
         kdu_instance: str,
+        cluster_uuid: str,
+        kdu_model: str,
         **kwargs,
-    ):
-        raise NotImplementedError("Method not implemented")
+    ) -> int:
+        """Get a resource scale count.
+
+        Args:
+            cluster_uuid: The UUID of the cluster
+            resource_name: Resource name
+            kdu_instance: KDU instance name
+            kdu_model: The name or path of a bundle
+            kwargs: Additional parameters
+
+        Returns:
+            Resource instance count
+        """
+
+        _, cluster_id = self._get_namespace_cluster_id(cluster_uuid)
+        self.log.debug(
+            "getting scale count for {} in cluster {}".format(kdu_model, cluster_id)
+        )
+
+        # look for instance to obtain namespace
+        instance_info = await self.get_instance_info(cluster_uuid, kdu_instance)
+        if not instance_info:
+            raise K8sException("kdu_instance {} not found".format(kdu_instance))
+
+        # init env, paths
+        paths, env = self._init_paths_env(
+            cluster_name=cluster_id, create_if_not_exist=True
+        )
+
+        replicas = await self._get_replica_count_instance(
+            kdu_instance, instance_info["namespace"], paths["kube_config"]
+        )
+
+        # Get default value if scale count is not found from provided values
+        if not replicas:
+            repo_url = await self._find_repo(kdu_model, cluster_uuid)
+            if not repo_url:
+                raise K8sException(
+                    "Repository not found for kdu_model {}".format(kdu_model)
+                )
+
+            replicas, _ = await self._get_replica_count_url(
+                kdu_model, repo_url, resource_name
+            )
+
+        if not replicas:
+            msg = "Replica count not found. Cannot be scaled"
+            self.log.error(msg)
+            raise K8sException(msg)
+
+        return int(replicas)
 
     async def rollback(
         self, cluster_uuid: str, kdu_instance: str, revision=0, db_dict: dict = None
@@ -884,6 +1046,19 @@ class K8sHelmBaseConnector(K8sConnector):
 
         return status
 
+    async def get_values_kdu(
+        self, kdu_instance: str, namespace: str, kubeconfig: str
+    ) -> str:
+
+        self.log.debug("get kdu_instance values {}".format(kdu_instance))
+
+        return await self._exec_get_command(
+            get_command="values",
+            kdu_instance=kdu_instance,
+            namespace=namespace,
+            kubeconfig=kubeconfig,
+        )
+
     async def values_kdu(self, kdu_model: str, repo_url: str = None) -> str:
 
         self.log.debug(
@@ -892,7 +1067,7 @@ class K8sHelmBaseConnector(K8sConnector):
             )
         )
 
-        return await self._exec_inspect_comand(
+        return await self._exec_inspect_command(
             inspect_command="values", kdu_model=kdu_model, repo_url=repo_url
         )
 
@@ -902,7 +1077,7 @@ class K8sHelmBaseConnector(K8sConnector):
             "inspect kdu_model {} readme.md from repo: {}".format(kdu_model, repo_url)
         )
 
-        return await self._exec_inspect_comand(
+        return await self._exec_inspect_command(
             inspect_command="readme", kdu_model=kdu_model, repo_url=repo_url
         )
 
@@ -1043,6 +1218,22 @@ class K8sHelmBaseConnector(K8sConnector):
         Obtain command to be executed to delete the indicated instance
         """
 
+    @abc.abstractmethod
+    def _get_upgrade_scale_command(
+        self,
+        kdu_model,
+        kdu_instance,
+        namespace,
+        count,
+        version,
+        atomic,
+        replicas,
+        timeout,
+        resource_name,
+        kubeconfig,
+    ) -> str:
+        """Obtain command to be executed to upgrade the indicated instance."""
+
     @abc.abstractmethod
     def _get_upgrade_command(
         self,
@@ -1083,6 +1274,12 @@ class K8sHelmBaseConnector(K8sConnector):
         Obtain command to be executed to obtain information about the kdu
         """
 
+    @abc.abstractmethod
+    def _get_get_command(
+        self, get_command: str, kdu_instance: str, namespace: str, kubeconfig: str
+    ):
+        """Obtain command to be executed to get information about the kdu instance."""
+
     @abc.abstractmethod
     async def _uninstall_sw(self, cluster_id: str, namespace: str):
         """
@@ -1386,12 +1583,23 @@ class K8sHelmBaseConnector(K8sConnector):
 
         return service
 
-    async def _exec_inspect_comand(
+    async def _exec_get_command(
+        self, get_command: str, kdu_instance: str, namespace: str, kubeconfig: str
+    ):
+        """Obtains information about the kdu instance."""
+
+        full_command = self._get_get_command(
+            get_command, kdu_instance, namespace, kubeconfig
+        )
+
+        output, _rc = await self._local_async_exec(command=full_command)
+
+        return output
+
+    async def _exec_inspect_command(
         self, inspect_command: str, kdu_model: str, repo_url: str = None
     ):
-        """
-        Obtains information about a kdu, no cluster (no env)
-        """
+        """Obtains information about a kdu, no cluster (no env)."""
 
         repo_str = ""
         if repo_url:
@@ -1402,22 +1610,141 @@ class K8sHelmBaseConnector(K8sConnector):
             idx += 1
             kdu_model = kdu_model[idx:]
 
-        version = ""
-        if ":" in kdu_model:
-            parts = kdu_model.split(sep=":")
-            if len(parts) == 2:
-                version = "--version {}".format(str(parts[1]))
-                kdu_model = parts[0]
+        kdu_model, version = self._split_version(kdu_model)
+        if version:
+            version_str = "--version {}".format(version)
+        else:
+            version_str = ""
 
         full_command = self._get_inspect_command(
-            inspect_command, kdu_model, repo_str, version
-        )
-        output, _rc = await self._local_async_exec(
-            command=full_command, encode_utf8=True
+            inspect_command, kdu_model, repo_str, version_str
         )
 
+        output, _rc = await self._local_async_exec(command=full_command)
+
         return output
 
+    async def _get_replica_count_url(
+        self,
+        kdu_model: str,
+        repo_url: str,
+        resource_name: str = None,
+    ):
+        """Get the replica count value in the Helm Chart Values.
+
+        Args:
+            kdu_model: The name or path of a bundle
+            repo_url: Helm Chart repository url
+            resource_name: Resource name
+
+        Returns:
+            True if replicas, False replicaCount
+        """
+
+        kdu_values = yaml.load(
+            await self.values_kdu(kdu_model, repo_url), Loader=yaml.SafeLoader
+        )
+
+        if not kdu_values:
+            raise K8sException(
+                "kdu_values not found for kdu_model {}".format(kdu_model)
+            )
+
+        if resource_name:
+            kdu_values = kdu_values.get(resource_name, None)
+
+        if not kdu_values:
+            msg = "resource {} not found in the values in model {}".format(
+                resource_name, kdu_model
+            )
+            self.log.error(msg)
+            raise K8sException(msg)
+
+        duplicate_check = False
+
+        replica_str = ""
+        replicas = None
+
+        if kdu_values.get("replicaCount", None):
+            replicas = kdu_values["replicaCount"]
+            replica_str = "replicaCount"
+        elif kdu_values.get("replicas", None):
+            duplicate_check = True
+            replicas = kdu_values["replicas"]
+            replica_str = "replicas"
+        else:
+            if resource_name:
+                msg = (
+                    "replicaCount or replicas not found in the resource"
+                    "{} values in model {}. Cannot be scaled".format(
+                        resource_name, kdu_model
+                    )
+                )
+            else:
+                msg = (
+                    "replicaCount or replicas not found in the values"
+                    "in model {}. Cannot be scaled".format(kdu_model)
+                )
+            self.log.error(msg)
+            raise K8sException(msg)
+
+        # Control if replicas and replicaCount exists at the same time
+        msg = "replicaCount and replicas are exists at the same time"
+        if duplicate_check:
+            if "replicaCount" in kdu_values:
+                self.log.error(msg)
+                raise K8sException(msg)
+        else:
+            if "replicas" in kdu_values:
+                self.log.error(msg)
+                raise K8sException(msg)
+
+        return replicas, replica_str
+
+    async def _get_replica_count_instance(
+        self,
+        kdu_instance: str,
+        namespace: str,
+        kubeconfig: str,
+        resource_name: str = None,
+    ):
+        """Get the replica count value in the instance.
+
+        Args:
+            kdu_instance: The name of the KDU instance
+            namespace: KDU instance namespace
+            kubeconfig:
+            resource_name: Resource name
+
+        Returns:
+            True if replicas, False replicaCount
+        """
+
+        kdu_values = yaml.load(
+            await self.get_values_kdu(kdu_instance, namespace, kubeconfig),
+            Loader=yaml.SafeLoader,
+        )
+
+        replicas = None
+
+        if kdu_values:
+            resource_values = (
+                kdu_values.get(resource_name, None) if resource_name else None
+            )
+            replicas = (
+                (
+                    resource_values.get("replicaCount", None)
+                    or resource_values.get("replicas", None)
+                )
+                if resource_values
+                else (
+                    kdu_values.get("replicaCount", None)
+                    or kdu_values.get("replicas", None)
+                )
+            )
+
+        return replicas
+
     async def _store_status(
         self,
         cluster_id: str,
@@ -1542,3 +1869,23 @@ class K8sHelmBaseConnector(K8sConnector):
 
         name = name + get_random_number()
         return name.lower()
+
+    def _split_version(self, kdu_model: str) -> (str, str):
+        version = None
+        if ":" in kdu_model:
+            parts = kdu_model.split(sep=":")
+            if len(parts) == 2:
+                version = str(parts[1])
+                kdu_model = parts[0]
+        return kdu_model, version
+
+    async def _find_repo(self, kdu_model: str, cluster_uuid: str) -> str:
+        repo_url = None
+        idx = kdu_model.find("/")
+        if idx >= 0:
+            repo_name = kdu_model[:idx]
+            # Find repository link
+            local_repo_list = await self.repo_list(cluster_uuid)
+            for repo in local_repo_list:
+                repo_url = repo["url"] if repo["name"] == repo_name else None
+        return repo_url
index 13a3114..5a788e9 100644 (file)
@@ -168,7 +168,7 @@ class K8sHelmConnector(K8sHelmBaseConnector):
             "inspect kdu_model {} from (optional) repo: {}".format(kdu_model, repo_url)
         )
 
-        return await self._exec_inspect_comand(
+        return await self._exec_inspect_command(
             inspect_command="", kdu_model=kdu_model, repo_url=repo_url
         )
 
@@ -449,6 +449,14 @@ class K8sHelmConnector(K8sHelmBaseConnector):
         )
         return inspect_command
 
+    def _get_get_command(
+        self, get_command: str, kdu_instance: str, namespace: str, kubeconfig: str
+    ):
+        get_command = "env KUBECONFIG={} {} get {} {} --output yaml".format(
+            kubeconfig, self._helm_command, get_command, kdu_instance
+        )
+        return get_command
+
     async def _status_kdu(
         self,
         cluster_id: str,
@@ -602,7 +610,7 @@ class K8sHelmConnector(K8sHelmBaseConnector):
         # version
         version_str = ""
         if version:
-            version_str = version_str = "--version {}".format(version)
+            version_str = "--version {}".format(version)
 
         command = (
             "env KUBECONFIG={kubeconfig} {helm} install {atomic} --output yaml  "
@@ -620,6 +628,56 @@ class K8sHelmConnector(K8sHelmBaseConnector):
         )
         return command
 
+    def _get_upgrade_scale_command(
+        self,
+        kdu_model: str,
+        kdu_instance: str,
+        namespace: str,
+        scale: int,
+        version: str,
+        atomic: bool,
+        replica_str: str,
+        timeout: float,
+        resource_name: str,
+        kubeconfig: str,
+    ) -> str:
+
+        timeout_str = ""
+        if timeout:
+            timeout_str = "--timeout {}s".format(timeout)
+
+        # atomic
+        atomic_str = ""
+        if atomic:
+            atomic_str = "--atomic"
+
+        # version
+        version_str = ""
+        if version:
+            version_str = "--version {}".format(version)
+
+        # scale
+        if resource_name:
+            scale_dict = {"{}.{}".format(resource_name, replica_str): scale}
+        else:
+            scale_dict = {replica_str: scale}
+
+        scale_str = self._params_to_set_option(scale_dict)
+
+        command = (
+            "env KUBECONFIG={kubeconfig} {helm} upgrade {atomic} --output yaml {scale} {timeout} {name} {model} {ver}"
+        ).format(
+            helm=self._helm_command,
+            name=kdu_instance,
+            atomic=atomic_str,
+            scale=scale_str,
+            timeout=timeout_str,
+            model=kdu_model,
+            ver=version_str,
+            kubeconfig=kubeconfig,
+        )
+        return command
+
     def _get_upgrade_command(
         self,
         kdu_model,
index f8ed0e0..1f217f8 100644 (file)
@@ -360,8 +360,8 @@ class K8sJujuConnector(K8sConnector):
         """Scale an application in a model
 
         :param: kdu_instance str:        KDU instance name
-        :param: scale int:               Scale to which to set this application
-        :param: resource_name str:       Resource name (Application name)
+        :param: scale int:               Scale to which to set the application
+        :param: resource_name str:       The application name in the Juju Bundle
         :param: timeout float:           The time, in seconds, to wait for the install
                                          to finish
         :param kwargs:                   Additional parameters
@@ -394,12 +394,13 @@ class K8sJujuConnector(K8sConnector):
     ) -> int:
         """Get an application scale count
 
-        :param: resource_name str:       Resource name (Application name)
+        :param: resource_name str:       The application name in the Juju Bundle
         :param: kdu_instance str:        KDU instance name
         :param kwargs:                   Additional parameters
                                             vca_id (str): VCA ID
         :return: Return application instance count
         """
+
         try:
             libjuju = await self._get_libjuju(kwargs.get("vca_id"))
             status = await libjuju.get_model_status(kdu_instance)
index e3c7707..4f5247b 100644 (file)
@@ -290,6 +290,91 @@ class TestK8sHelm3Conn(asynctest.TestCase):
             command=command, env=self.env, raise_exception_on_error=False
         )
 
+    @asynctest.fail_on(active_handles=True)
+    async def test_scale(self):
+        kdu_model = "stable/openldap:1.2.3"
+        kdu_instance = "stable-openldap-0005399828"
+        db_dict = {}
+        instance_info = {
+            "chart": "openldap-1.2.3",
+            "name": kdu_instance,
+            "namespace": self.namespace,
+            "revision": 1,
+            "status": "DEPLOYED",
+        }
+        repo_list = [
+            {
+                "name": "stable",
+                "url": "https://kubernetes-charts.storage.googleapis.com/",
+            }
+        ]
+        kdu_values = """
+            # Default values for openldap.
+            # This is a YAML-formatted file.
+            # Declare variables to be passed into your templates.
+
+            replicaCount: 1
+            dummy-app:
+              replicas: 2
+        """
+
+        self.helm_conn.repo_list = asynctest.CoroutineMock(return_value=repo_list)
+        self.helm_conn.values_kdu = asynctest.CoroutineMock(return_value=kdu_values)
+        self.helm_conn._local_async_exec = asynctest.CoroutineMock(return_value=("", 0))
+        self.helm_conn._store_status = asynctest.CoroutineMock()
+        self.helm_conn.get_instance_info = asynctest.CoroutineMock(
+            return_value=instance_info
+        )
+
+        # TEST-1
+        await self.helm_conn.scale(
+            kdu_instance,
+            2,
+            "",
+            kdu_model=kdu_model,
+            cluster_uuid=self.cluster_uuid,
+            atomic=True,
+            db_dict=db_dict,
+        )
+        command = (
+            "env KUBECONFIG=./tmp/helm3_cluster_id/.kube/config "
+            "/usr/bin/helm3 upgrade stable-openldap-0005399828 stable/openldap "
+            "--namespace testk8s --atomic --output yaml --set replicaCount=2 --timeout 1800s "
+            "--version 1.2.3"
+        )
+        self.helm_conn._local_async_exec.assert_called_once_with(
+            command=command, env=self.env, raise_exception_on_error=False
+        )
+        # TEST-2
+        await self.helm_conn.scale(
+            kdu_instance,
+            3,
+            "dummy-app",
+            kdu_model=kdu_model,
+            cluster_uuid=self.cluster_uuid,
+            atomic=True,
+            db_dict=db_dict,
+        )
+        command = (
+            "env KUBECONFIG=./tmp/helm3_cluster_id/.kube/config "
+            "/usr/bin/helm3 upgrade stable-openldap-0005399828 stable/openldap "
+            "--namespace testk8s --atomic --output yaml --set dummy-app.replicas=3 --timeout 1800s "
+            "--version 1.2.3"
+        )
+        self.helm_conn._local_async_exec.assert_called_with(
+            command=command, env=self.env, raise_exception_on_error=False
+        )
+        self.helm_conn.fs.reverse_sync.assert_called_with(from_path=self.cluster_id)
+        self.helm_conn._store_status.assert_called_with(
+            cluster_id=self.cluster_id,
+            kdu_instance=kdu_instance,
+            namespace=self.namespace,
+            db_dict=db_dict,
+            operation="scale",
+            run_once=True,
+            check_every=0,
+        )
+
     @asynctest.fail_on(active_handles=True)
     async def test_rollback(self):
         kdu_instance = "stable-openldap-0005399828"
@@ -422,9 +507,7 @@ class TestK8sHelm3Conn(asynctest.TestCase):
             "https://kubernetes-charts.storage.googleapis.com/ "
             "--version 1.2.4"
         )
-        self.helm_conn._local_async_exec.assert_called_with(
-            command=command, encode_utf8=True
-        )
+        self.helm_conn._local_async_exec.assert_called_with(command=command)
 
     @asynctest.fail_on(active_handles=True)
     async def test_help_kdu(self):
@@ -439,9 +522,7 @@ class TestK8sHelm3Conn(asynctest.TestCase):
             "https://kubernetes-charts.storage.googleapis.com/ "
             "--version 1.2.4"
         )
-        self.helm_conn._local_async_exec.assert_called_with(
-            command=command, encode_utf8=True
-        )
+        self.helm_conn._local_async_exec.assert_called_with(command=command)
 
     @asynctest.fail_on(active_handles=True)
     async def test_values_kdu(self):
@@ -456,9 +537,22 @@ class TestK8sHelm3Conn(asynctest.TestCase):
             "https://kubernetes-charts.storage.googleapis.com/ "
             "--version 1.2.4"
         )
-        self.helm_conn._local_async_exec.assert_called_with(
-            command=command, encode_utf8=True
+        self.helm_conn._local_async_exec.assert_called_with(command=command)
+
+    @asynctest.fail_on(active_handles=True)
+    async def test_get_values_kdu(self):
+        self.helm_conn._local_async_exec = asynctest.CoroutineMock(return_value=("", 0))
+
+        kdu_instance = "stable-openldap-0005399828"
+        await self.helm_conn.get_values_kdu(
+            kdu_instance, self.namespace, self.env["KUBECONFIG"]
+        )
+
+        command = (
+            "env KUBECONFIG=./tmp/helm3_cluster_id/.kube/config /usr/bin/helm3 get values "
+            "stable-openldap-0005399828 --namespace=testk8s --output yaml"
         )
+        self.helm_conn._local_async_exec.assert_called_with(command=command)
 
     @asynctest.fail_on(active_handles=True)
     async def test_instances_list(self):
index afc8ca7..8e58740 100644 (file)
@@ -223,6 +223,92 @@ class TestK8sHelmConn(asynctest.TestCase):
             command=command, env=self.env, raise_exception_on_error=False
         )
 
+    @asynctest.fail_on(active_handles=True)
+    async def test_scale(self):
+        kdu_model = "stable/openldap:1.2.3"
+        kdu_instance = "stable-openldap-0005399828"
+        db_dict = {}
+        instance_info = {
+            "chart": "openldap-1.2.3",
+            "name": kdu_instance,
+            "namespace": self.namespace,
+            "revision": 1,
+            "status": "DEPLOYED",
+        }
+        repo_list = [
+            {
+                "name": "stable",
+                "url": "https://kubernetes-charts.storage.googleapis.com/",
+            }
+        ]
+        kdu_values = """
+            # Default values for openldap.
+            # This is a YAML-formatted file.
+            # Declare variables to be passed into your templates.
+
+            replicaCount: 1
+            dummy-app:
+              replicas: 2
+        """
+
+        self.helm_conn.repo_list = asynctest.CoroutineMock(return_value=repo_list)
+        self.helm_conn.values_kdu = asynctest.CoroutineMock(return_value=kdu_values)
+        self.helm_conn._local_async_exec = asynctest.CoroutineMock(return_value=("", 0))
+        self.helm_conn._store_status = asynctest.CoroutineMock()
+        self.helm_conn.get_instance_info = asynctest.CoroutineMock(
+            return_value=instance_info
+        )
+
+        # TEST-1
+        await self.helm_conn.scale(
+            kdu_instance,
+            2,
+            "",
+            kdu_model=kdu_model,
+            cluster_uuid=self.cluster_uuid,
+            atomic=True,
+            db_dict=db_dict,
+        )
+        command = (
+            "env KUBECONFIG=./tmp/helm_cluster_id/.kube/config "
+            "/usr/bin/helm upgrade --atomic --output yaml --set replicaCount=2 "
+            "--timeout 1800s stable-openldap-0005399828 stable/openldap "
+            "--version 1.2.3"
+        )
+        self.helm_conn._local_async_exec.assert_called_once_with(
+            command=command, env=self.env, raise_exception_on_error=False
+        )
+
+        # TEST-2
+        await self.helm_conn.scale(
+            kdu_instance,
+            3,
+            "dummy-app",
+            kdu_model=kdu_model,
+            cluster_uuid=self.cluster_uuid,
+            atomic=True,
+            db_dict=db_dict,
+        )
+        command = (
+            "env KUBECONFIG=./tmp/helm_cluster_id/.kube/config "
+            "/usr/bin/helm upgrade --atomic --output yaml --set dummy-app.replicas=3 "
+            "--timeout 1800s stable-openldap-0005399828 stable/openldap "
+            "--version 1.2.3"
+        )
+        self.helm_conn._local_async_exec.assert_called_with(
+            command=command, env=self.env, raise_exception_on_error=False
+        )
+        self.helm_conn.fs.reverse_sync.assert_called_with(from_path=self.cluster_id)
+        self.helm_conn._store_status.assert_called_with(
+            cluster_id=self.cluster_id,
+            kdu_instance=kdu_instance,
+            namespace=self.namespace,
+            db_dict=db_dict,
+            operation="scale",
+            run_once=True,
+            check_every=0,
+        )
+
     @asynctest.fail_on(active_handles=True)
     async def test_rollback(self):
         kdu_instance = "stable-openldap-0005399828"
@@ -355,9 +441,7 @@ class TestK8sHelmConn(asynctest.TestCase):
             "https://kubernetes-charts.storage.googleapis.com/ "
             "--version 1.2.4"
         )
-        self.helm_conn._local_async_exec.assert_called_with(
-            command=command, encode_utf8=True
-        )
+        self.helm_conn._local_async_exec.assert_called_with(command=command)
 
     @asynctest.fail_on(active_handles=True)
     async def test_help_kdu(self):
@@ -372,9 +456,7 @@ class TestK8sHelmConn(asynctest.TestCase):
             "https://kubernetes-charts.storage.googleapis.com/ "
             "--version 1.2.4"
         )
-        self.helm_conn._local_async_exec.assert_called_with(
-            command=command, encode_utf8=True
-        )
+        self.helm_conn._local_async_exec.assert_called_with(command=command)
 
     @asynctest.fail_on(active_handles=True)
     async def test_values_kdu(self):
@@ -389,9 +471,22 @@ class TestK8sHelmConn(asynctest.TestCase):
             "https://kubernetes-charts.storage.googleapis.com/ "
             "--version 1.2.4"
         )
-        self.helm_conn._local_async_exec.assert_called_with(
-            command=command, encode_utf8=True
+        self.helm_conn._local_async_exec.assert_called_with(command=command)
+
+    @asynctest.fail_on(active_handles=True)
+    async def test_get_values_kdu(self):
+        self.helm_conn._local_async_exec = asynctest.CoroutineMock(return_value=("", 0))
+
+        kdu_instance = "stable-openldap-0005399828"
+        await self.helm_conn.get_values_kdu(
+            kdu_instance, self.namespace, self.env["KUBECONFIG"]
+        )
+
+        command = (
+            "env KUBECONFIG=./tmp/helm_cluster_id/.kube/config /usr/bin/helm get values "
+            "stable-openldap-0005399828 --output yaml"
         )
+        self.helm_conn._local_async_exec.assert_called_with(command=command)
 
     @asynctest.fail_on(active_handles=True)
     async def test_instances_list(self):