import shlex
import shutil
import stat
-import subprocess
import os
import yaml
from uuid import uuid4
)
)
- # sync local dir
- self.fs.sync(from_path=cluster_id)
-
# init_env
paths, env = self._init_paths_env(
cluster_name=cluster_id, create_if_not_exist=True
)
+ # sync local dir
+ self.fs.sync(from_path=cluster_id)
+
# helm repo update
- command = "{} repo update".format(self._helm_command)
+ command = "env KUBECONFIG={} {} repo update".format(
+ paths["kube_config"], self._helm_command
+ )
self.log.debug("updating repo: {}".format(command))
await self._local_async_exec(
command=command, raise_exception_on_error=False, env=env
)
# helm repo add name url
- command = "{} repo add {} {}".format(self._helm_command, name, url)
+ command = "env KUBECONFIG={} {} repo add {} {}".format(
+ paths["kube_config"], self._helm_command, name, url
+ )
self.log.debug("adding repo: {}".format(command))
await self._local_async_exec(
command=command, raise_exception_on_error=True, env=env
_, cluster_id = self._get_namespace_cluster_id(cluster_uuid)
self.log.debug("list repositories for cluster {}".format(cluster_id))
- # sync local dir
- self.fs.sync(from_path=cluster_id)
-
# config filename
paths, env = self._init_paths_env(
cluster_name=cluster_id, create_if_not_exist=True
)
- command = "{} repo list --output yaml".format(self._helm_command)
+ # sync local dir
+ self.fs.sync(from_path=cluster_id)
+
+ command = "env KUBECONFIG={} {} repo list --output yaml".format(
+ paths["kube_config"], self._helm_command
+ )
# Set exception to false because if there are no repos just want an empty list
output, _rc = await self._local_async_exec(
_, cluster_id = self._get_namespace_cluster_id(cluster_uuid)
self.log.debug("remove {} repositories for cluster {}".format(name, cluster_id))
- # sync local dir
- self.fs.sync(from_path=cluster_id)
-
# init env, paths
paths, env = self._init_paths_env(
cluster_name=cluster_id, create_if_not_exist=True
)
- command = "{} repo remove {}".format(self._helm_command, name)
+ # sync local dir
+ self.fs.sync(from_path=cluster_id)
+
+ command = "env KUBECONFIG={} {} repo remove {}".format(
+ paths["kube_config"], self._helm_command, name
+ )
await self._local_async_exec(
command=command, raise_exception_on_error=True, env=env
)
kdu_name: str = None,
namespace: str = None,
):
+ # init env, paths
+ paths, env = self._init_paths_env(
+ cluster_name=cluster_id, create_if_not_exist=True
+ )
+
# params to str
params_str, file_to_delete = self._params_to_file_option(
cluster_id=cluster_id, params=params
kdu_model = parts[0]
command = self._get_install_command(
- kdu_model, kdu_instance, namespace, params_str, version, atomic, timeout
+ kdu_model,
+ kdu_instance,
+ namespace,
+ params_str,
+ version,
+ atomic,
+ timeout,
+ paths["kube_config"],
)
self.log.debug("installing: {}".format(command))
cluster_name=cluster_id, create_if_not_exist=True
)
+ # sync local dir
+ self.fs.sync(from_path=cluster_id)
+
# params to str
params_str, file_to_delete = self._params_to_file_option(
cluster_id=cluster_id, params=params
version,
atomic,
timeout,
+ paths["kube_config"],
)
self.log.debug("upgrading: {}".format(command))
cluster_name=cluster_id, create_if_not_exist=True
)
+ # sync local dir
+ self.fs.sync(from_path=cluster_id)
+
command = self._get_rollback_command(
- kdu_instance, instance_info["namespace"], revision
+ kdu_instance, instance_info["namespace"], revision, paths["kube_config"]
)
self.log.debug("rolling_back: {}".format(command))
# look for instance to obtain namespace
instance_info = await self.get_instance_info(cluster_uuid, kdu_instance)
if not instance_info:
- raise K8sException("kdu_instance {} not found".format(kdu_instance))
-
+ self.log.warning(("kdu_instance {} not found".format(kdu_instance)))
+ return True
# init env, paths
paths, env = self._init_paths_env(
cluster_name=cluster_id, create_if_not_exist=True
)
- command = self._get_uninstall_command(kdu_instance, instance_info["namespace"])
+ # sync local dir
+ self.fs.sync(from_path=cluster_id)
+
+ command = self._get_uninstall_command(
+ kdu_instance, instance_info["namespace"], paths["kube_config"]
+ )
output, _rc = await self._local_async_exec(
command=command, raise_exception_on_error=True, env=env
)
)
)
+ # init env, paths
+ paths, env = self._init_paths_env(
+ cluster_name=cluster_id, create_if_not_exist=True
+ )
+
# sync local dir
self.fs.sync(from_path=cluster_id)
# get list of services names for kdu
- service_names = await self._get_services(cluster_id, kdu_instance, namespace)
+ service_names = await self._get_services(
+ cluster_id, kdu_instance, namespace, paths["kube_config"]
+ )
service_list = []
for service in service_names:
"""
@abc.abstractmethod
- async def _get_services(self, cluster_id, kdu_instance, namespace):
+ async def _get_services(self, cluster_id, kdu_instance, namespace, kubeconfig):
"""
Implements the helm version dependent method to obtain services from a helm instance
"""
@abc.abstractmethod
def _get_install_command(
- self, kdu_model, kdu_instance, namespace, params_str, version, atomic, timeout
+ self,
+ kdu_model,
+ kdu_instance,
+ namespace,
+ params_str,
+ version,
+ atomic,
+ timeout,
+ kubeconfig,
) -> str:
"""
Obtain command to be executed to delete the indicated instance
@abc.abstractmethod
def _get_upgrade_command(
- self, kdu_model, kdu_instance, namespace, params_str, version, atomic, timeout
+ self,
+ kdu_model,
+ kdu_instance,
+ namespace,
+ params_str,
+ version,
+ atomic,
+ timeout,
+ kubeconfig,
) -> str:
"""
Obtain command to be executed to upgrade the indicated instance
"""
@abc.abstractmethod
- def _get_rollback_command(self, kdu_instance, namespace, revision) -> str:
+ def _get_rollback_command(
+ self, kdu_instance, namespace, revision, kubeconfig
+ ) -> str:
"""
Obtain command to be executed to rollback the indicated instance
"""
@abc.abstractmethod
- def _get_uninstall_command(self, kdu_instance: str, namespace: str) -> str:
+ def _get_uninstall_command(
+ self, kdu_instance: str, namespace: str, kubeconfig: str
+ ) -> str:
"""
Obtain command to be executed to delete the indicated instance
"""
new_list.append(new_dict)
return new_list
- def _local_exec(self, command: str) -> (str, int):
- command = self._remove_multiple_spaces(command)
- self.log.debug("Executing sync local command: {}".format(command))
- # raise exception if fails
- output = ""
- try:
- output = subprocess.check_output(
- command, shell=True, universal_newlines=True
- )
- return_code = 0
- self.log.debug(output)
- except Exception:
- return_code = 1
-
- return output, return_code
-
async def _local_async_exec(
self,
command: str,