+ async def scale(
+ self,
+ kdu_instance: str,
+ scale: int,
+ resource_name: str,
+ total_timeout: float = 1800,
+ cluster_uuid: str = None,
+ kdu_model: str = None,
+ atomic: bool = True,
+ db_dict: dict = None,
+ **kwargs,
+ ):
+ """Scale a resource in a Helm Chart.
+
+ Args:
+ kdu_instance: KDU instance name
+ scale: Scale to which to set the resource
+ resource_name: Resource name
+ total_timeout: The time, in seconds, to wait
+ cluster_uuid: The UUID of the cluster
+ kdu_model: The chart reference
+ atomic: if set, upgrade process rolls back changes made in case of failed upgrade.
+ The --wait flag will be set automatically if --atomic is used
+ db_dict: Dictionary for any additional data
+ kwargs: Additional parameters
+
+ Returns:
+ True if successful, False otherwise
+ """
+
+ debug_mgs = "scaling {} in cluster {}".format(kdu_model, cluster_uuid)
+ if resource_name:
+ debug_mgs = "scaling resource {} in model {} (cluster {})".format(
+ resource_name, kdu_model, cluster_uuid
+ )
+
+ self.log.debug(debug_mgs)
+
+ # look for instance to obtain namespace
+ # get_instance_info function calls the sync command
+ instance_info = await self.get_instance_info(cluster_uuid, kdu_instance)
+ if not instance_info:
+ raise K8sException("kdu_instance {} not found".format(kdu_instance))
+
+ # init env, paths
+ paths, env = self._init_paths_env(
+ cluster_name=cluster_uuid, create_if_not_exist=True
+ )
+
+ # version
+ kdu_model, version = self._split_version(kdu_model)
+
+ repo_url = await self._find_repo(kdu_model, cluster_uuid)
+ if not repo_url:
+ raise K8sException(
+ "Repository not found for kdu_model {}".format(kdu_model)
+ )
+
+ _, replica_str = await self._get_replica_count_url(
+ kdu_model, repo_url, resource_name
+ )
+
+ command = self._get_upgrade_scale_command(
+ kdu_model,
+ kdu_instance,
+ instance_info["namespace"],
+ scale,
+ version,
+ atomic,
+ replica_str,
+ total_timeout,
+ resource_name,
+ paths["kube_config"],
+ )
+
+ self.log.debug("scaling: {}".format(command))
+
+ if atomic:
+ # exec helm in a task
+ exec_task = asyncio.ensure_future(
+ coro_or_future=self._local_async_exec(
+ command=command, raise_exception_on_error=False, env=env
+ )
+ )
+ # write status in another task
+ status_task = asyncio.ensure_future(
+ coro_or_future=self._store_status(
+ cluster_id=cluster_uuid,
+ kdu_instance=kdu_instance,
+ namespace=instance_info["namespace"],
+ db_dict=db_dict,
+ operation="scale",
+ run_once=False,
+ )
+ )
+
+ # wait for execution task
+ await asyncio.wait([exec_task])
+
+ # cancel status task
+ status_task.cancel()
+ output, rc = exec_task.result()
+
+ else:
+ output, rc = await self._local_async_exec(
+ command=command, raise_exception_on_error=False, env=env
+ )
+
+ # write final status
+ await self._store_status(
+ cluster_id=cluster_uuid,
+ kdu_instance=kdu_instance,
+ namespace=instance_info["namespace"],
+ db_dict=db_dict,
+ operation="scale",
+ run_once=True,
+ check_every=0,
+ )
+
+ if rc != 0:
+ msg = "Error executing command: {}\nOutput: {}".format(command, output)
+ self.log.error(msg)
+ raise K8sException(msg)
+
+ # sync fs
+ self.fs.reverse_sync(from_path=cluster_uuid)
+
+ return True
+
+ async def get_scale_count(
+ self,
+ resource_name: str,
+ kdu_instance: str,
+ cluster_uuid: str,
+ kdu_model: str,
+ **kwargs,
+ ) -> int:
+ """Get a resource scale count.
+
+ Args:
+ cluster_uuid: The UUID of the cluster
+ resource_name: Resource name
+ kdu_instance: KDU instance name
+ kdu_model: The name or path of a bundle
+ kwargs: Additional parameters
+
+ Returns:
+ Resource instance count
+ """
+
+ self.log.debug(
+ "getting scale count for {} in cluster {}".format(kdu_model, cluster_uuid)
+ )
+
+ # look for instance to obtain namespace
+ instance_info = await self.get_instance_info(cluster_uuid, kdu_instance)
+ if not instance_info:
+ raise K8sException("kdu_instance {} not found".format(kdu_instance))
+
+ # init env, paths
+ paths, env = self._init_paths_env(
+ cluster_name=cluster_uuid, create_if_not_exist=True
+ )
+
+ replicas = await self._get_replica_count_instance(
+ kdu_instance, instance_info["namespace"], paths["kube_config"]
+ )
+
+ # Get default value if scale count is not found from provided values
+ if not replicas:
+ repo_url = await self._find_repo(kdu_model, cluster_uuid)
+ if not repo_url:
+ raise K8sException(
+ "Repository not found for kdu_model {}".format(kdu_model)
+ )
+
+ replicas, _ = await self._get_replica_count_url(
+ kdu_model, repo_url, resource_name
+ )
+
+ if not replicas:
+ msg = "Replica count not found. Cannot be scaled"
+ self.log.error(msg)
+ raise K8sException(msg)
+
+ return int(replicas)
+