# See the License for the specific language governing permissions and
# limitations under the License.
-OUTPUT=$(TOX_PARALLEL_NO_SPINNER=1 tox --parallel=auto)
-printf "$OUTPUT"
+tox --parallel=auto
if not await self._namespace_exists(cluster_id, namespace):
err_msg = (
"namespace {} does not exist in cluster_id {} "
- "error message: ".format(
- namespace, e
- )
+ "error message: ".format(namespace, e)
)
self.log.error(err_msg)
raise K8sException(err_msg)
return _rc
- async def _get_services(self, cluster_id: str, kdu_instance: str, namespace: str):
+ async def _get_services(
+ self, cluster_id: str, kdu_instance: str, namespace: str, kubeconfig: str
+ ):
# init config, env
paths, env = self._init_paths_env(
cluster_name=cluster_id, create_if_not_exist=True
)
- command1 = "{} get manifest {} --namespace={}".format(
- self._helm_command, kdu_instance, namespace
+ command1 = "env KUBECONFIG={} {} get manifest {} --namespace={}".format(
+ kubeconfig, self._helm_command, kdu_instance, namespace
)
command2 = "{} get --namespace={} -f -".format(self.kubectl_command, namespace)
output, _rc = await self._local_async_exec_pipe(
paths, env = self._init_paths_env(
cluster_name=cluster_id, create_if_not_exist=True
)
- command = "{} status {} --namespace={} --output yaml".format(
- self._helm_command, kdu_instance, namespace
+ command = "env KUBECONFIG={} {} status {} --namespace={} --output yaml".format(
+ paths["kube_config"], self._helm_command, kdu_instance, namespace
)
output, rc = await self._local_async_exec(
version: str,
atomic: bool,
timeout: float,
+ kubeconfig: str,
) -> str:
timeout_str = ""
version_str = "--version {}".format(version)
command = (
- "{helm} install {name} {atomic} --output yaml "
+ "env KUBECONFIG={kubeconfig} {helm} install {name} {atomic} --output yaml "
"{params} {timeout} {ns} {model} {ver}".format(
+ kubeconfig=kubeconfig,
helm=self._helm_command,
name=kdu_instance,
atomic=atomic_str,
version: str,
atomic: bool,
timeout: float,
+ kubeconfig: str,
) -> str:
timeout_str = ""
namespace_str = "--namespace {}".format(namespace)
command = (
- "{helm} upgrade {name} {model} {namespace} {atomic} --output yaml {params} "
- "{timeout} {ver}".format(
- helm=self._helm_command,
- name=kdu_instance,
- namespace=namespace_str,
- atomic=atomic_str,
- params=params_str,
- timeout=timeout_str,
- model=kdu_model,
- ver=version_str,
- )
+ "env KUBECONFIG={kubeconfig} {helm} upgrade {name} {model} {namespace} {atomic} "
+ "--output yaml {params} {timeout} {ver}"
+ ).format(
+ kubeconfig=kubeconfig,
+ helm=self._helm_command,
+ name=kdu_instance,
+ namespace=namespace_str,
+ atomic=atomic_str,
+ params=params_str,
+ timeout=timeout_str,
+ model=kdu_model,
+ ver=version_str,
)
return command
def _get_rollback_command(
- self, kdu_instance: str, namespace: str, revision: float
+ self, kdu_instance: str, namespace: str, revision: float, kubeconfig: str
) -> str:
- return "{} rollback {} {} --namespace={} --wait".format(
- self._helm_command, kdu_instance, revision, namespace
+ return "env KUBECONFIG={} {} rollback {} {} --namespace={} --wait".format(
+ kubeconfig, self._helm_command, kdu_instance, revision, namespace
)
- def _get_uninstall_command(self, kdu_instance: str, namespace: str) -> str:
+ def _get_uninstall_command(
+ self, kdu_instance: str, namespace: str, kubeconfig: str
+ ) -> str:
- return "{} uninstall {} --namespace={}".format(
- self._helm_command, kdu_instance, namespace
+ return "env KUBECONFIG={} {} uninstall {} --namespace={}".format(
+ kubeconfig, self._helm_command, kdu_instance, namespace
)
def _get_helm_chart_repos_ids(self, cluster_uuid) -> list:
import shlex
import shutil
import stat
-import subprocess
import os
import yaml
from uuid import uuid4
)
)
- # sync local dir
- self.fs.sync(from_path=cluster_id)
-
# init_env
paths, env = self._init_paths_env(
cluster_name=cluster_id, create_if_not_exist=True
)
+ # sync local dir
+ self.fs.sync(from_path=cluster_id)
+
# helm repo update
- command = "{} repo update".format(self._helm_command)
+ command = "env KUBECONFIG={} {} repo update".format(
+ paths["kube_config"], self._helm_command
+ )
self.log.debug("updating repo: {}".format(command))
await self._local_async_exec(
command=command, raise_exception_on_error=False, env=env
)
# helm repo add name url
- command = "{} repo add {} {}".format(self._helm_command, name, url)
+ command = "env KUBECONFIG={} {} repo add {} {}".format(
+ paths["kube_config"], self._helm_command, name, url
+ )
self.log.debug("adding repo: {}".format(command))
await self._local_async_exec(
command=command, raise_exception_on_error=True, env=env
_, cluster_id = self._get_namespace_cluster_id(cluster_uuid)
self.log.debug("list repositories for cluster {}".format(cluster_id))
- # sync local dir
- self.fs.sync(from_path=cluster_id)
-
# config filename
paths, env = self._init_paths_env(
cluster_name=cluster_id, create_if_not_exist=True
)
- command = "{} repo list --output yaml".format(self._helm_command)
+ # sync local dir
+ self.fs.sync(from_path=cluster_id)
+
+ command = "env KUBECONFIG={} {} repo list --output yaml".format(
+ paths["kube_config"], self._helm_command
+ )
# Set exception to false: if there are no repos we just want an empty list
output, _rc = await self._local_async_exec(
_, cluster_id = self._get_namespace_cluster_id(cluster_uuid)
self.log.debug("remove {} repositories for cluster {}".format(name, cluster_id))
- # sync local dir
- self.fs.sync(from_path=cluster_id)
-
# init env, paths
paths, env = self._init_paths_env(
cluster_name=cluster_id, create_if_not_exist=True
)
- command = "{} repo remove {}".format(self._helm_command, name)
+ # sync local dir
+ self.fs.sync(from_path=cluster_id)
+
+ command = "env KUBECONFIG={} {} repo remove {}".format(
+ paths["kube_config"], self._helm_command, name
+ )
await self._local_async_exec(
command=command, raise_exception_on_error=True, env=env
)
kdu_name: str = None,
namespace: str = None,
):
+ # init env, paths
+ paths, env = self._init_paths_env(
+ cluster_name=cluster_id, create_if_not_exist=True
+ )
+
# params to str
params_str, file_to_delete = self._params_to_file_option(
cluster_id=cluster_id, params=params
kdu_model = parts[0]
command = self._get_install_command(
- kdu_model, kdu_instance, namespace, params_str, version, atomic, timeout
+ kdu_model,
+ kdu_instance,
+ namespace,
+ params_str,
+ version,
+ atomic,
+ timeout,
+ paths["kube_config"],
)
self.log.debug("installing: {}".format(command))
cluster_name=cluster_id, create_if_not_exist=True
)
+ # sync local dir
+ self.fs.sync(from_path=cluster_id)
+
# params to str
params_str, file_to_delete = self._params_to_file_option(
cluster_id=cluster_id, params=params
version,
atomic,
timeout,
+ paths["kube_config"],
)
self.log.debug("upgrading: {}".format(command))
cluster_name=cluster_id, create_if_not_exist=True
)
+ # sync local dir
+ self.fs.sync(from_path=cluster_id)
+
command = self._get_rollback_command(
- kdu_instance, instance_info["namespace"], revision
+ kdu_instance, instance_info["namespace"], revision, paths["kube_config"]
)
self.log.debug("rolling_back: {}".format(command))
cluster_name=cluster_id, create_if_not_exist=True
)
- command = self._get_uninstall_command(kdu_instance, instance_info["namespace"])
+ # sync local dir
+ self.fs.sync(from_path=cluster_id)
+
+ command = self._get_uninstall_command(
+ kdu_instance, instance_info["namespace"], paths["kube_config"]
+ )
output, _rc = await self._local_async_exec(
command=command, raise_exception_on_error=True, env=env
)
)
)
+ # init env, paths
+ paths, env = self._init_paths_env(
+ cluster_name=cluster_id, create_if_not_exist=True
+ )
+
# sync local dir
self.fs.sync(from_path=cluster_id)
# get list of service names for kdu
- service_names = await self._get_services(cluster_id, kdu_instance, namespace)
+ service_names = await self._get_services(
+ cluster_id, kdu_instance, namespace, paths["kube_config"]
+ )
service_list = []
for service in service_names:
"""
@abc.abstractmethod
- async def _get_services(self, cluster_id, kdu_instance, namespace):
+ async def _get_services(self, cluster_id, kdu_instance, namespace, kubeconfig):
"""
Implements the helm version dependent method to obtain services from a helm instance
"""
@abc.abstractmethod
def _get_install_command(
- self, kdu_model, kdu_instance, namespace, params_str, version, atomic, timeout
+ self,
+ kdu_model,
+ kdu_instance,
+ namespace,
+ params_str,
+ version,
+ atomic,
+ timeout,
+ kubeconfig,
) -> str:
"""
Obtain command to be executed to install the indicated instance
@abc.abstractmethod
def _get_upgrade_command(
- self, kdu_model, kdu_instance, namespace, params_str, version, atomic, timeout
+ self,
+ kdu_model,
+ kdu_instance,
+ namespace,
+ params_str,
+ version,
+ atomic,
+ timeout,
+ kubeconfig,
) -> str:
"""
Obtain command to be executed to upgrade the indicated instance
"""
@abc.abstractmethod
- def _get_rollback_command(self, kdu_instance, namespace, revision) -> str:
+ def _get_rollback_command(
+ self, kdu_instance, namespace, revision, kubeconfig
+ ) -> str:
"""
Obtain command to be executed to rollback the indicated instance
"""
@abc.abstractmethod
- def _get_uninstall_command(self, kdu_instance: str, namespace: str) -> str:
+ def _get_uninstall_command(
+ self, kdu_instance: str, namespace: str, kubeconfig: str
+ ) -> str:
"""
Obtain command to be executed to delete the indicated instance
"""
new_list.append(new_dict)
return new_list
- def _local_exec(self, command: str) -> (str, int):
- command = self._remove_multiple_spaces(command)
- self.log.debug("Executing sync local command: {}".format(command))
- # raise exception if fails
- output = ""
- try:
- output = subprocess.check_output(
- command, shell=True, universal_newlines=True
- )
- return_code = 0
- self.log.debug(output)
- except Exception:
- return_code = 1
-
- return output, return_code
-
async def _local_async_exec(
self,
command: str,
# check embedded chart (file or dir)
if chart_name.startswith("/"):
# extract file or directory name
- chart_name = chart_name[chart_name.rfind("/") + 1:]
+ chart_name = chart_name[chart_name.rfind("/") + 1 :]
# check URL
elif "://" in chart_name:
# extract last portion of URL
- chart_name = chart_name[chart_name.rfind("/") + 1:]
+ chart_name = chart_name[chart_name.rfind("/") + 1 :]
name = ""
for c in chart_name:
return paths, env
- async def _get_services(self, cluster_id, kdu_instance, namespace):
+ async def _get_services(self, cluster_id, kdu_instance, namespace, kubeconfig):
# init config, env
paths, env = self._init_paths_env(
cluster_name=cluster_id, create_if_not_exist=True
)
- command1 = "{} get manifest {} ".format(self._helm_command, kdu_instance)
+ command1 = "env KUBECONFIG={} {} get manifest {} ".format(
+ kubeconfig, self._helm_command, kdu_instance
+ )
command2 = "{} get --namespace={} -f -".format(self.kubectl_command, namespace)
output, _rc = await self._local_async_exec_pipe(
command1, command2, env=env, raise_exception_on_error=True
paths, env = self._init_paths_env(
cluster_name=cluster_id, create_if_not_exist=True
)
- command = "{} status {} --output yaml".format(self._helm_command, kdu_instance)
+ command = ("env KUBECONFIG={} {} status {} --output yaml").format(
+ paths["kube_config"], self._helm_command, kdu_instance
+ )
output, rc = await self._local_async_exec(
command=command,
raise_exception_on_error=True,
)
async def _is_install_completed(self, cluster_id: str, kdu_instance: str) -> bool:
+ # init config, env
+ paths, env = self._init_paths_env(
+ cluster_name=cluster_id, create_if_not_exist=True
+ )
status = await self._status_kdu(
cluster_id=cluster_id, kdu_instance=kdu_instance, return_text=False
return ready
def _get_install_command(
- self, kdu_model, kdu_instance, namespace, params_str, version, atomic, timeout
+ self,
+ kdu_model,
+ kdu_instance,
+ namespace,
+ params_str,
+ version,
+ atomic,
+ timeout,
+ kubeconfig,
) -> str:
timeout_str = ""
version_str = "--version {}".format(version)
command = (
- "{helm} install {atomic} --output yaml "
+ "env KUBECONFIG={kubeconfig} {helm} install {atomic} --output yaml "
"{params} {timeout} --name={name} {ns} {model} {ver}".format(
+ kubeconfig=kubeconfig,
helm=self._helm_command,
atomic=atomic_str,
params=params_str,
return command
def _get_upgrade_command(
- self, kdu_model, kdu_instance, namespace, params_str, version, atomic, timeout
+ self,
+ kdu_model,
+ kdu_instance,
+ namespace,
+ params_str,
+ version,
+ atomic,
+ timeout,
+ kubeconfig,
) -> str:
timeout_str = ""
if version:
version_str = "--version {}".format(version)
- command = "{helm} upgrade {atomic} --output yaml {params} {timeout} {name} {model} {ver}".format(
+ command = (
+ "env KUBECONFIG={kubeconfig} {helm} upgrade {atomic} --output yaml {params} {timeout} {name} {model} {ver}"
+ ).format(
+ kubeconfig=kubeconfig,
helm=self._helm_command,
atomic=atomic_str,
params=params_str,
)
return command
- def _get_rollback_command(self, kdu_instance, namespace, revision) -> str:
- return "{} rollback {} {} --wait".format(
- self._helm_command, kdu_instance, revision
+ def _get_rollback_command(
+ self, kdu_instance, namespace, revision, kubeconfig
+ ) -> str:
+ return "env KUBECONFIG={} {} rollback {} {} --wait".format(
+ kubeconfig, self._helm_command, kdu_instance, revision
)
- def _get_uninstall_command(self, kdu_instance: str, namespace: str) -> str:
- return "{} delete --purge {}".format(self._helm_command, kdu_instance)
+ def _get_uninstall_command(
+ self, kdu_instance: str, namespace: str, kubeconfig: str
+ ) -> str:
+ return "env KUBECONFIG={} {} delete --purge {}".format(
+ kubeconfig, self._helm_command, kdu_instance
+ )
import yaml
import tempfile
import binascii
-import base64
from n2vc.config import EnvironConfig
from n2vc.exceptions import K8sException
from n2vc.k8s_conn import K8sConnector
-from n2vc.kubectl import Kubectl, CORE_CLIENT, RBAC_CLIENT
+from n2vc.kubectl import Kubectl
from .exceptions import MethodNotImplemented
from n2vc.libjuju import Libjuju
from n2vc.utils import obj_to_dict, obj_to_yaml
from n2vc.store import MotorStore
from n2vc.vca.cloud import Cloud
from n2vc.vca.connection import get_connection
-from kubernetes.client.models import (
- V1ClusterRole,
- V1ObjectMeta,
- V1PolicyRule,
- V1ServiceAccount,
- V1ClusterRoleBinding,
- V1RoleRef,
- V1Subject,
-)
-
-from typing import Dict
-
-SERVICE_ACCOUNT_TOKEN_KEY = "token"
-SERVICE_ACCOUNT_ROOT_CA_KEY = "ca.crt"
-RBAC_LABEL_KEY_NAME = "rbac-id"
-ADMIN_NAMESPACE = "kube-system"
+
+RBAC_LABEL_KEY_NAME = "rbac-id"
RBAC_STACK_PREFIX = "juju-credential"
libjuju = await self._get_libjuju(kwargs.get("vca_id"))
cluster_uuid = reuse_cluster_uuid or str(uuid.uuid4())
-
- kubecfg = tempfile.NamedTemporaryFile()
- with open(kubecfg.name, "w") as kubecfg_file:
- kubecfg_file.write(k8s_creds)
- kubectl = Kubectl(config_file=kubecfg.name)
+ kubectl = self._get_kubectl(k8s_creds)
# CREATING RESOURCES IN K8S
rbac_id = generate_rbac_id()
# if it fails in the middle of the process
cleanup_data = []
try:
- self._create_cluster_role(
- kubectl,
+ kubectl.create_cluster_role(
name=metadata_name,
labels=labels,
)
cleanup_data.append(
{
- "delete": self._delete_cluster_role,
- "args": (kubectl, metadata_name),
+ "delete": kubectl.delete_cluster_role,
+ "args": (metadata_name),
}
)
- self._create_service_account(
- kubectl,
+ kubectl.create_service_account(
name=metadata_name,
labels=labels,
)
cleanup_data.append(
{
- "delete": self._delete_service_account,
- "args": (kubectl, metadata_name),
+ "delete": kubectl.delete_service_account,
+ "args": (metadata_name),
}
)
- self._create_cluster_role_binding(
- kubectl,
+ kubectl.create_cluster_role_binding(
name=metadata_name,
labels=labels,
)
cleanup_data.append(
{
- "delete": self._delete_service_account,
- "args": (kubectl, metadata_name),
+ "delete": kubectl.delete_service_account,
+ "args": (metadata_name),
}
)
- token, client_cert_data = await self._get_secret_data(
- kubectl,
+ token, client_cert_data = await kubectl.get_secret_data(
metadata_name,
)
await libjuju.remove_cloud(cluster_uuid)
- kubecfg = self.get_credentials(cluster_uuid=cluster_uuid)
+ credentials = self.get_credentials(cluster_uuid=cluster_uuid)
- kubecfg_file = tempfile.NamedTemporaryFile()
- with open(kubecfg_file.name, "w") as f:
- f.write(kubecfg)
- kubectl = Kubectl(config_file=kubecfg_file.name)
+ kubectl = self._get_kubectl(credentials)
delete_functions = [
- self._delete_cluster_role_binding,
- self._delete_service_account,
- self._delete_cluster_role,
+ kubectl.delete_cluster_role_binding,
+ kubectl.delete_service_account,
+ kubectl.delete_cluster_role,
]
credential_attrs = cloud_creds[0].result["attrs"]
if RBAC_LABEL_KEY_NAME in credential_attrs:
rbac_id = credential_attrs[RBAC_LABEL_KEY_NAME]
metadata_name = "{}-{}".format(RBAC_STACK_PREFIX, rbac_id)
- delete_args = (kubectl, metadata_name)
for delete_func in delete_functions:
try:
- delete_func(*delete_args)
+ delete_func(metadata_name)
except Exception as e:
self.log.warning("Cannot remove resource in K8s {}".format(e))
"""Return a list of services of a kdu_instance"""
credentials = self.get_credentials(cluster_uuid=cluster_uuid)
-
- kubecfg = tempfile.NamedTemporaryFile()
- with open(kubecfg.name, "w") as kubecfg_file:
- kubecfg_file.write(credentials)
- kubectl = Kubectl(config_file=kubecfg.name)
-
+ kubectl = self._get_kubectl(credentials)
return kubectl.get_services(
field_selector="metadata.namespace={}".format(kdu_instance)
)
"""Return data for a specific service inside a namespace"""
credentials = self.get_credentials(cluster_uuid=cluster_uuid)
-
- kubecfg = tempfile.NamedTemporaryFile()
- with open(kubecfg.name, "w") as kubecfg_file:
- kubecfg_file.write(credentials)
- kubectl = Kubectl(config_file=kubecfg.name)
-
+ kubectl = self._get_kubectl(credentials)
return kubectl.get_services(
field_selector="metadata.name={},metadata.namespace={}".format(
service_name, namespace
"""
pass
- def _create_cluster_role(
- self,
- kubectl: Kubectl,
- name: str,
- labels: Dict[str, str],
- ):
- cluster_roles = kubectl.clients[RBAC_CLIENT].list_cluster_role(
- field_selector="metadata.name={}".format(name)
- )
-
- if len(cluster_roles.items) > 0:
- raise Exception(
- "Cluster role with metadata.name={} already exists".format(name)
- )
-
- metadata = V1ObjectMeta(name=name, labels=labels, namespace=ADMIN_NAMESPACE)
- # Cluster role
- cluster_role = V1ClusterRole(
- metadata=metadata,
- rules=[
- V1PolicyRule(api_groups=["*"], resources=["*"], verbs=["*"]),
- V1PolicyRule(non_resource_ur_ls=["*"], verbs=["*"]),
- ],
- )
-
- kubectl.clients[RBAC_CLIENT].create_cluster_role(cluster_role)
-
- def _delete_cluster_role(self, kubectl: Kubectl, name: str):
- kubectl.clients[RBAC_CLIENT].delete_cluster_role(name)
-
- def _create_service_account(
- self,
- kubectl: Kubectl,
- name: str,
- labels: Dict[str, str],
- ):
- service_accounts = kubectl.clients[CORE_CLIENT].list_namespaced_service_account(
- ADMIN_NAMESPACE, field_selector="metadata.name={}".format(name)
- )
- if len(service_accounts.items) > 0:
- raise Exception(
- "Service account with metadata.name={} already exists".format(name)
- )
-
- metadata = V1ObjectMeta(name=name, labels=labels, namespace=ADMIN_NAMESPACE)
- service_account = V1ServiceAccount(metadata=metadata)
-
- kubectl.clients[CORE_CLIENT].create_namespaced_service_account(
- ADMIN_NAMESPACE, service_account
- )
-
- def _delete_service_account(self, kubectl: Kubectl, name: str):
- kubectl.clients[CORE_CLIENT].delete_namespaced_service_account(
- name, ADMIN_NAMESPACE
- )
-
- def _create_cluster_role_binding(
- self,
- kubectl: Kubectl,
- name: str,
- labels: Dict[str, str],
- ):
- role_bindings = kubectl.clients[RBAC_CLIENT].list_cluster_role_binding(
- field_selector="metadata.name={}".format(name)
- )
- if len(role_bindings.items) > 0:
- raise Exception("Generated rbac id already exists")
-
- role_binding = V1ClusterRoleBinding(
- metadata=V1ObjectMeta(name=name, labels=labels),
- role_ref=V1RoleRef(kind="ClusterRole", name=name, api_group=""),
- subjects=[
- V1Subject(kind="ServiceAccount", name=name, namespace=ADMIN_NAMESPACE)
- ],
- )
- kubectl.clients[RBAC_CLIENT].create_cluster_role_binding(role_binding)
-
- def _delete_cluster_role_binding(self, kubectl: Kubectl, name: str):
- kubectl.clients[RBAC_CLIENT].delete_cluster_role_binding(name)
-
- async def _get_secret_data(self, kubectl: Kubectl, name: str) -> (str, str):
- v1_core = kubectl.clients[CORE_CLIENT]
-
- retries_limit = 10
- secret_name = None
- while True:
- retries_limit -= 1
- service_accounts = v1_core.list_namespaced_service_account(
- ADMIN_NAMESPACE, field_selector="metadata.name={}".format(name)
- )
- if len(service_accounts.items) == 0:
- raise Exception(
- "Service account not found with metadata.name={}".format(name)
- )
- service_account = service_accounts.items[0]
- if service_account.secrets and len(service_account.secrets) > 0:
- secret_name = service_account.secrets[0].name
- if secret_name is not None or not retries_limit:
- break
- if not secret_name:
- raise Exception(
- "Failed getting the secret from service account {}".format(name)
- )
- secret = v1_core.list_namespaced_secret(
- ADMIN_NAMESPACE,
- field_selector="metadata.name={}".format(secret_name),
- ).items[0]
-
- token = secret.data[SERVICE_ACCOUNT_TOKEN_KEY]
- client_certificate_data = secret.data[SERVICE_ACCOUNT_ROOT_CA_KEY]
-
- return (
- base64.b64decode(token).decode("utf-8"),
- base64.b64decode(client_certificate_data).decode("utf-8"),
- )
-
@staticmethod
def generate_kdu_instance_name(**kwargs):
db_dict = kwargs.get("db_dict")
log=self.log,
n2vc=self,
)
+
+ def _get_kubectl(self, credentials: str) -> Kubectl:
+ """
+ Get Kubectl object
+
+ :param: credentials: Kubeconfig credentials
+ """
+ kubecfg = tempfile.NamedTemporaryFile()
+ with open(kubecfg.name, "w") as kubecfg_file:
+ kubecfg_file.write(credentials)
+ return Kubectl(config_file=kubecfg.name)
# See the License for the specific language governing permissions and
# limitations under the License.
+import base64
import logging
+from typing import Dict
+import typing
+
from kubernetes import client, config
+from kubernetes.client.models import (
+ V1ClusterRole,
+ V1ObjectMeta,
+ V1PolicyRule,
+ V1ServiceAccount,
+ V1ClusterRoleBinding,
+ V1RoleRef,
+ V1Subject,
+)
from kubernetes.client.rest import ApiException
+from retrying_async import retry
+SERVICE_ACCOUNT_TOKEN_KEY = "token"
+SERVICE_ACCOUNT_ROOT_CA_KEY = "ca.crt"
+# clients
CORE_CLIENT = "core_v1"
-STORAGE_CLIENT = "storage_v1"
RBAC_CLIENT = "rbac_v1"
+STORAGE_CLIENT = "storage_v1"
class Kubectl:
def __init__(self, config_file=None):
config.load_kube_config(config_file=config_file)
self._clients = {
- "core_v1": client.CoreV1Api(),
- "storage_v1": client.StorageV1Api(),
- "rbac_v1": client.RbacAuthorizationV1Api(),
+ CORE_CLIENT: client.CoreV1Api(),
+ RBAC_CLIENT: client.RbacAuthorizationV1Api(),
+ STORAGE_CLIENT: client.StorageV1Api(),
}
- self._configuration = config.kube_config.Configuration()
+ self._configuration = config.kube_config.Configuration.get_default_copy()
self.logger = logging.getLogger("Kubectl")
@property
def clients(self):
return self._clients
- def get_services(self, field_selector=None, label_selector=None):
+ def get_services(
+ self,
+ field_selector: str = None,
+ label_selector: str = None,
+ ) -> typing.List[typing.Dict]:
+ """
+ Get Service list from a namespace
+
+ :param: field_selector: Kubernetes field selector to filter the services
+ :param: label_selector: Kubernetes label selector to filter the services
+
+ :return: List of the services matching the selectors specified
+ """
kwargs = {}
if field_selector:
kwargs["field_selector"] = field_selector
selected_sc = sc.metadata.name
break
return selected_sc
+
+ def create_cluster_role(
+ self,
+ name: str,
+ labels: Dict[str, str],
+ namespace: str = "kube-system",
+ ):
+ """
+ Create a cluster role
+
+ :param: name: Name of the cluster role
+ :param: labels: Labels for cluster role metadata
+ :param: namespace: Kubernetes namespace for cluster role metadata
+ Default: kube-system
+ """
+ cluster_roles = self.clients[RBAC_CLIENT].list_cluster_role(
+ field_selector="metadata.name={}".format(name)
+ )
+
+ if len(cluster_roles.items) > 0:
+ raise Exception(
+ "Cluster role with metadata.name={} already exists".format(name)
+ )
+
+ metadata = V1ObjectMeta(name=name, labels=labels, namespace=namespace)
+ # Cluster role
+ cluster_role = V1ClusterRole(
+ metadata=metadata,
+ rules=[
+ V1PolicyRule(api_groups=["*"], resources=["*"], verbs=["*"]),
+ V1PolicyRule(non_resource_ur_ls=["*"], verbs=["*"]),
+ ],
+ )
+
+ self.clients[RBAC_CLIENT].create_cluster_role(cluster_role)
+
+ def delete_cluster_role(self, name: str):
+ """
+ Delete a cluster role
+
+ :param: name: Name of the cluster role
+ """
+ self.clients[RBAC_CLIENT].delete_cluster_role(name)
+
+ def create_service_account(
+ self,
+ name: str,
+ labels: Dict[str, str],
+ namespace: str = "kube-system",
+ ):
+ """
+ Create a service account
+
+ :param: name: Name of the service account
+ :param: labels: Labels for service account metadata
+ :param: namespace: Kubernetes namespace for service account metadata
+ Default: kube-system
+ """
+ service_accounts = self.clients[CORE_CLIENT].list_namespaced_service_account(
+ namespace, field_selector="metadata.name={}".format(name)
+ )
+ if len(service_accounts.items) > 0:
+ raise Exception(
+ "Service account with metadata.name={} already exists".format(name)
+ )
+
+ metadata = V1ObjectMeta(name=name, labels=labels, namespace=namespace)
+ service_account = V1ServiceAccount(metadata=metadata)
+
+ self.clients[CORE_CLIENT].create_namespaced_service_account(
+ namespace, service_account
+ )
+
+ def delete_service_account(self, name: str, namespace: str = "kube-system"):
+ """
+ Delete a service account
+
+ :param: name: Name of the service account
+ :param: namespace: Kubernetes namespace for service account metadata
+ Default: kube-system
+ """
+ self.clients[CORE_CLIENT].delete_namespaced_service_account(name, namespace)
+
+ def create_cluster_role_binding(
+ self, name: str, labels: Dict[str, str], namespace: str = "kube-system"
+ ):
+ """
+ Create a cluster role binding
+
+ :param: name: Name of the cluster role binding
+ :param: labels: Labels for cluster role binding metadata
+ :param: namespace: Kubernetes namespace for cluster role binding metadata
+ Default: kube-system
+ """
+ role_bindings = self.clients[RBAC_CLIENT].list_cluster_role_binding(
+ field_selector="metadata.name={}".format(name)
+ )
+ if len(role_bindings.items) > 0:
+ raise Exception("Generated rbac id already exists")
+
+ role_binding = V1ClusterRoleBinding(
+ metadata=V1ObjectMeta(name=name, labels=labels),
+ role_ref=V1RoleRef(kind="ClusterRole", name=name, api_group=""),
+ subjects=[V1Subject(kind="ServiceAccount", name=name, namespace=namespace)],
+ )
+ self.clients[RBAC_CLIENT].create_cluster_role_binding(role_binding)
+
+ def delete_cluster_role_binding(self, name: str):
+ """
+ Delete a cluster role binding
+
+ :param: name: Name of the cluster role binding
+ """
+ self.clients[RBAC_CLIENT].delete_cluster_role_binding(name)
+
+ @retry(
+ attempts=10,
+ delay=1,
+ fallback=Exception("Failed getting the secret from service account"),
+ )
+ async def get_secret_data(
+ self, name: str, namespace: str = "kube-system"
+ ) -> (str, str):
+ """
+ Get secret data
+
+ :param: name: Name of the service account that holds the secret
+ :param: namespace: Name of the namespace where the secret is stored
+
+ :return: Tuple with the token and client certificate
+ """
+ v1_core = self.clients[CORE_CLIENT]
+
+ secret_name = None
+
+ service_accounts = v1_core.list_namespaced_service_account(
+ namespace, field_selector="metadata.name={}".format(name)
+ )
+ if len(service_accounts.items) == 0:
+ raise Exception(
+ "Service account not found with metadata.name={}".format(name)
+ )
+ service_account = service_accounts.items[0]
+ if service_account.secrets and len(service_account.secrets) > 0:
+ secret_name = service_account.secrets[0].name
+ if not secret_name:
+ raise Exception(
+ "Failed getting the secret from service account {}".format(name)
+ )
+ secret = v1_core.list_namespaced_secret(
+ namespace, field_selector="metadata.name={}".format(secret_name)
+ ).items[0]
+
+ token = secret.data[SERVICE_ACCOUNT_TOKEN_KEY]
+ client_certificate_data = secret.data[SERVICE_ACCOUNT_ROOT_CA_KEY]
+
+ return (
+ base64.b64decode(token).decode("utf-8"),
+ base64.b64decode(client_certificate_data).decode("utf-8"),
+ )
db_dict: dict = None,
progress_timeout: float = None,
total_timeout: float = None,
- series: str = "bionic",
+ series: str = "focal",
wait: bool = True,
) -> (Machine, bool):
"""
controller = await self.get_controller()
model = await self.get_model(controller, model_name)
try:
- await model.deploy(uri)
+ await model.deploy(uri, trust=True)
if wait:
await JujuModelWatcher.wait_for_model(model, timeout=timeout)
self.log.debug("All units active in model {}".format(model_name))
await self.disconnect_model(model)
await self.disconnect_controller(controller)
- async def destroy_model(self, model_name: str, total_timeout: float):
+ async def destroy_model(self, model_name: str, total_timeout: float = 1800):
"""
Destroy model
if not await self.model_exists(model_name, controller=controller):
return
- model = await self.get_model(controller, model_name)
self.log.debug("Destroying model {}".format(model_name))
- uuid = model.info.uuid
+ model = await self.get_model(controller, model_name)
# Destroy machines that are manually provisioned
# and still are in pending state
await self._destroy_pending_machines(model, only_manual=True)
-
- # Disconnect model
await self.disconnect_model(model)
- await controller.destroy_model(uuid, force=True, max_wait=0)
+ await self._destroy_model(
+ model_name,
+ controller,
+ timeout=total_timeout,
+ )
+ finally:
+ if model:
+ await self.disconnect_model(model)
+ await self.disconnect_controller(controller)
- # Wait until model is destroyed
- self.log.debug("Waiting for model {} to be destroyed...".format(model_name))
+ async def _destroy_model(
+ self, model_name: str, controller: Controller, timeout: float = 1800
+ ):
+ """
+ Destroy model from controller
- if total_timeout is None:
- total_timeout = 3600
- end = time.time() + total_timeout
- while time.time() < end:
- models = await controller.list_models()
- if model_name not in models:
- self.log.debug(
- "The model {} ({}) was destroyed".format(model_name, uuid)
- )
- return
+ :param: model_name: Model name to be removed
+ :param: controller: Controller object
+ :param: timeout: Timeout in seconds
+ """
+
+ async def _destroy_model_loop(model_name: str, controller: Controller):
+ while await self.model_exists(model_name, controller=controller):
+ await controller.destroy_model(
+ model_name, destroy_storage=True, force=True, max_wait=0
+ )
await asyncio.sleep(5)
+
+ try:
+ await asyncio.wait_for(
+ _destroy_model_loop(model_name, controller), timeout=timeout
+ )
+ except asyncio.TimeoutError:
raise Exception(
"Timeout waiting for model {} to be destroyed".format(model_name)
)
- except Exception as e:
- if model:
- await self.disconnect_model(model)
- raise e
- finally:
- await self.disconnect_controller(controller)
async def destroy_application(
self, model_name: str, application_name: str, total_timeout: float
if not include_path:
i = filename.rfind("/")
if i > 0:
- filename = filename[i + 1:]
+ filename = filename[i + 1 :]
# datetime
dt = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S.%f")
artifact_path = artifact_path.replace("//", "/")
# check charm path
- if not self.fs.file_exists(artifact_path, mode="dir"):
+ if not self.fs.file_exists(artifact_path):
msg = "artifact path does not exist: {}".format(artifact_path)
raise N2VCBadArgumentsException(message=msg, bad_args=["artifact_path"])
artifact_path = artifact_path.replace("//", "/")
# check charm path
- if not self.fs.file_exists(artifact_path, mode="dir"):
+ if not self.fs.file_exists(artifact_path):
msg = "artifact path does not exist: {}".format(artifact_path)
raise N2VCBadArgumentsException(message=msg, bad_args=["artifact_path"])
@patch("n2vc.k8s_helm_base_conn.EnvironConfig")
async def setUp(self, mock_env):
- mock_env.return_value = {
- "stablerepourl": "https://charts.helm.sh/stable"
- }
+ mock_env.return_value = {"stablerepourl": "https://charts.helm.sh/stable"}
self.db = Mock(DbMemory())
self.fs = asynctest.Mock(FsLocal())
self.fs.path = "./tmp/"
),
)
- repo_update_command = "/usr/bin/helm3 repo update"
- repo_add_command = "/usr/bin/helm3 repo add {} {}".format(repo_name, repo_url)
+ repo_update_command = "env KUBECONFIG=./tmp/helm3_cluster_id/.kube/config /usr/bin/helm3 repo update"
+ repo_add_command = (
+ "env KUBECONFIG=./tmp/helm3_cluster_id/.kube/config /usr/bin/helm3 repo add {} {}"
+ ).format(repo_name, repo_url)
calls = self.helm_conn._local_async_exec.call_args_list
call0_kargs = calls[0][1]
self.assertEqual(
self.helm_conn.fs.reverse_sync.assert_called_once_with(
from_path=self.cluster_id
)
- command = "/usr/bin/helm3 repo list --output yaml"
+ command = "env KUBECONFIG=./tmp/helm3_cluster_id/.kube/config /usr/bin/helm3 repo list --output yaml"
self.helm_conn._local_async_exec.assert_called_with(
command=command, env=self.env, raise_exception_on_error=False
)
self.helm_conn.fs.reverse_sync.assert_called_once_with(
from_path=self.cluster_id
)
- command = "/usr/bin/helm3 repo remove {}".format(repo_name)
+ command = "env KUBECONFIG=./tmp/helm3_cluster_id/.kube/config /usr/bin/helm3 repo remove {}".format(
+ repo_name
+ )
self.helm_conn._local_async_exec.assert_called_with(
command=command, env=self.env, raise_exception_on_error=True
)
self.kdu_instance = "stable-openldap-0005399828"
self.helm_conn.generate_kdu_instance_name = Mock(return_value=self.kdu_instance)
self.helm_conn._get_namespaces = asynctest.CoroutineMock(return_value=[])
- self.helm_conn._namespace_exists = asynctest.CoroutineMock(side_effect=self.helm_conn._namespace_exists)
+ self.helm_conn._namespace_exists = asynctest.CoroutineMock(
+ side_effect=self.helm_conn._namespace_exists
+ )
self.helm_conn._create_namespace = asynctest.CoroutineMock()
await self.helm_conn.install(
check_every=0,
)
command = (
- "/usr/bin/helm3 install stable-openldap-0005399828 --atomic --output yaml "
+ "env KUBECONFIG=./tmp/helm3_cluster_id/.kube/config /usr/bin/helm3 "
+ "install stable-openldap-0005399828 --atomic --output yaml "
"--timeout 300s --namespace testk8s stable/openldap --version 1.2.2"
)
self.helm_conn._local_async_exec.assert_called_once_with(
async def test_namespace_exists(self):
self.helm_conn._get_namespaces = asynctest.CoroutineMock()
- self.helm_conn._get_namespaces.return_value = ['testk8s', 'kube-system']
+ self.helm_conn._get_namespaces.return_value = ["testk8s", "kube-system"]
result = await self.helm_conn._namespace_exists(self.cluster_id, self.namespace)
self.helm_conn._get_namespaces.assert_called_once()
self.assertEqual(result, True)
self.helm_conn._get_namespaces.reset_mock()
- result = await self.helm_conn._namespace_exists(self.cluster_id, 'none-exists-namespace')
+ result = await self.helm_conn._namespace_exists(
+ self.cluster_id, "none-exists-namespace"
+ )
self.helm_conn._get_namespaces.assert_called_once()
self.assertEqual(result, False)
await self.helm_conn.upgrade(
self.cluster_uuid, kdu_instance, kdu_model, atomic=True, db_dict=db_dict
)
- self.helm_conn.fs.sync.assert_called_once_with(from_path=self.cluster_id)
+ self.helm_conn.fs.sync.assert_called_with(from_path=self.cluster_id)
self.helm_conn.fs.reverse_sync.assert_called_once_with(
from_path=self.cluster_id
)
check_every=0,
)
command = (
+ "env KUBECONFIG=./tmp/helm3_cluster_id/.kube/config "
"/usr/bin/helm3 upgrade stable-openldap-0005399828 stable/openldap "
- "--namespace testk8s --atomic --output yaml --timeout 300s "
+ "--namespace testk8s --atomic --output yaml --timeout 300s "
"--version 1.2.3"
)
self.helm_conn._local_async_exec.assert_called_once_with(
await self.helm_conn.rollback(
self.cluster_uuid, kdu_instance=kdu_instance, revision=1, db_dict=db_dict
)
- self.helm_conn.fs.sync.assert_called_once_with(from_path=self.cluster_id)
+ self.helm_conn.fs.sync.assert_called_with(from_path=self.cluster_id)
self.helm_conn.fs.reverse_sync.assert_called_once_with(
from_path=self.cluster_id
)
run_once=True,
check_every=0,
)
- command = "/usr/bin/helm3 rollback stable-openldap-0005399828 1 --namespace=testk8s --wait"
+ command = (
+ "env KUBECONFIG=./tmp/helm3_cluster_id/.kube/config /usr/bin/helm3 "
+ "rollback stable-openldap-0005399828 1 --namespace=testk8s --wait"
+ )
self.helm_conn._local_async_exec.assert_called_once_with(
command=command, env=self.env, raise_exception_on_error=False
)
)
await self.helm_conn.uninstall(self.cluster_uuid, kdu_instance)
- self.helm_conn.fs.sync.assert_called_once_with(from_path=self.cluster_id)
+ self.helm_conn.fs.sync.assert_called_with(from_path=self.cluster_id)
self.helm_conn.fs.reverse_sync.assert_called_once_with(
from_path=self.cluster_id
)
- command = "/usr/bin/helm3 uninstall {} --namespace={}".format(
- kdu_instance, self.namespace
- )
+ command = (
+ "env KUBECONFIG=./tmp/helm3_cluster_id/.kube/config /usr/bin/helm3 uninstall {} --namespace={}"
+ ).format(kdu_instance, self.namespace)
self.helm_conn._local_async_exec.assert_called_once_with(
command=command, env=self.env, raise_exception_on_error=True
)
from_path=self.cluster_id
)
self.helm_conn._parse_services.assert_called_once()
- command1 = "/usr/bin/helm3 get manifest {} --namespace=testk8s".format(
- kdu_instance
- )
+ command1 = (
+ "env KUBECONFIG=./tmp/helm3_cluster_id/.kube/config /usr/bin/helm3 get manifest {} --namespace=testk8s"
+ ).format(kdu_instance)
command2 = "/usr/bin/kubectl get --namespace={} -f -".format(self.namespace)
self.helm_conn._local_async_exec_pipe.assert_called_once_with(
command1, command2, env=self.env, raise_exception_on_error=True
await self.helm_conn._status_kdu(
self.cluster_id, kdu_instance, self.namespace, return_text=True
)
- command = "/usr/bin/helm3 status {} --namespace={} --output yaml".format(
- kdu_instance, self.namespace
- )
+ command = (
+ "env KUBECONFIG=./tmp/helm3_cluster_id/.kube/config /usr/bin/helm3 status {} --namespace={} --output yaml"
+ ).format(kdu_instance, self.namespace)
self.helm_conn._local_async_exec.assert_called_once_with(
command=command,
env=self.env,
),
)
- repo_update_command = "/usr/bin/helm repo update"
- repo_add_command = "/usr/bin/helm repo add {} {}".format(repo_name, repo_url)
+ repo_update_command = "env KUBECONFIG=./tmp/helm_cluster_id/.kube/config /usr/bin/helm repo update"
+ repo_add_command = (
+ "env KUBECONFIG=./tmp/helm_cluster_id/.kube/config /usr/bin/helm repo add {} {}"
+ ).format(repo_name, repo_url)
calls = self.helm_conn._local_async_exec.call_args_list
call0_kargs = calls[0][1]
self.assertEqual(
self.helm_conn.fs.reverse_sync.assert_called_once_with(
from_path=self.cluster_id
)
- command = "/usr/bin/helm repo list --output yaml"
+ command = "env KUBECONFIG=./tmp/helm_cluster_id/.kube/config /usr/bin/helm repo list --output yaml"
self.helm_conn._local_async_exec.assert_called_with(
command=command, env=self.env, raise_exception_on_error=False
)
self.helm_conn.fs.reverse_sync.assert_called_once_with(
from_path=self.cluster_id
)
- command = "/usr/bin/helm repo remove {}".format(repo_name)
+ command = "env KUBECONFIG=./tmp/helm_cluster_id/.kube/config /usr/bin/helm repo remove {}".format(
+ repo_name
+ )
self.helm_conn._local_async_exec.assert_called_once_with(
command=command, env=self.env, raise_exception_on_error=True
)
check_every=0,
)
command = (
- "/usr/bin/helm install --atomic --output yaml --timeout 300 "
+ "env KUBECONFIG=./tmp/helm_cluster_id/.kube/config /usr/bin/helm install "
+ "--atomic --output yaml --timeout 300 "
"--name=stable-openldap-0005399828 --namespace testk8s stable/openldap "
"--version 1.2.2"
)
await self.helm_conn.upgrade(
self.cluster_uuid, kdu_instance, kdu_model, atomic=True, db_dict=db_dict
)
- self.helm_conn.fs.sync.assert_called_once_with(from_path=self.cluster_id)
+ self.helm_conn.fs.sync.assert_called_with(from_path=self.cluster_id)
self.helm_conn.fs.reverse_sync.assert_called_once_with(
from_path=self.cluster_id
)
check_every=0,
)
command = (
- "/usr/bin/helm upgrade --atomic --output yaml --timeout 300 "
- "stable-openldap-0005399828 stable/openldap --version 1.2.3"
+ "env KUBECONFIG=./tmp/helm_cluster_id/.kube/config /usr/bin/helm upgrade "
+ "--atomic --output yaml --timeout 300 stable-openldap-0005399828 stable/openldap --version 1.2.3"
)
self.helm_conn._local_async_exec.assert_called_once_with(
command=command, env=self.env, raise_exception_on_error=False
await self.helm_conn.rollback(
self.cluster_uuid, kdu_instance=kdu_instance, revision=1, db_dict=db_dict
)
- self.helm_conn.fs.sync.assert_called_once_with(from_path=self.cluster_id)
+ self.helm_conn.fs.sync.assert_called_with(from_path=self.cluster_id)
self.helm_conn.fs.reverse_sync.assert_called_once_with(
from_path=self.cluster_id
)
run_once=True,
check_every=0,
)
- command = "/usr/bin/helm rollback stable-openldap-0005399828 1 --wait"
+ command = (
+ "env KUBECONFIG=./tmp/helm_cluster_id/.kube/config "
+ "/usr/bin/helm rollback stable-openldap-0005399828 1 --wait"
+ )
self.helm_conn._local_async_exec.assert_called_once_with(
command=command, env=self.env, raise_exception_on_error=False
)
)
await self.helm_conn.uninstall(self.cluster_uuid, kdu_instance)
- self.helm_conn.fs.sync.assert_called_once_with(from_path=self.cluster_id)
+ self.helm_conn.fs.sync.assert_called_with(from_path=self.cluster_id)
self.helm_conn.fs.reverse_sync.assert_called_once_with(
from_path=self.cluster_id
)
- command = "/usr/bin/helm delete --purge {}".format(kdu_instance)
+ command = "env KUBECONFIG=./tmp/helm_cluster_id/.kube/config /usr/bin/helm delete --purge {}".format(
+ kdu_instance
+ )
self.helm_conn._local_async_exec.assert_called_once_with(
command=command, env=self.env, raise_exception_on_error=True
)
from_path=self.cluster_id
)
self.helm_conn._parse_services.assert_called_once()
- command1 = "/usr/bin/helm get manifest {} ".format(kdu_instance)
+ command1 = "env KUBECONFIG=./tmp/helm_cluster_id/.kube/config /usr/bin/helm get manifest {} ".format(
+ kdu_instance
+ )
command2 = "/usr/bin/kubectl get --namespace={} -f -".format(self.namespace)
self.helm_conn._local_async_exec_pipe.assert_called_once_with(
command1, command2, env=self.env, raise_exception_on_error=True
await self.helm_conn._status_kdu(
self.cluster_id, kdu_instance, self.namespace, return_text=True
)
- command = "/usr/bin/helm status {} --output yaml".format(kdu_instance)
+ command = (
+ "env KUBECONFIG=./tmp/helm_cluster_id/.kube/config /usr/bin/helm status {} --output yaml"
+ ).format(kdu_instance)
self.helm_conn._local_async_exec.assert_called_once_with(
command=command,
env=self.env,
)
self.k8s_juju_conn._store.get_vca_id.return_value = None
self.k8s_juju_conn.libjuju = Mock()
+ # Mock Kubectl
+ self.kubectl = Mock()
+ self.kubectl.get_secret_data = AsyncMock()
+ self.kubectl.get_secret_data.return_value = ("token", "cacert")
+ self.kubectl.get_services.return_value = [{}]
+ self.k8s_juju_conn._get_kubectl = Mock()
+ self.k8s_juju_conn._get_kubectl.return_value = self.kubectl
-@asynctest.mock.patch("n2vc.kubectl.Kubectl.get_default_storage_class")
class InitEnvTest(K8sJujuConnTestCase):
def setUp(self):
super(InitEnvTest, self).setUp()
self.k8s_juju_conn.libjuju.add_k8s = AsyncMock()
- self.k8s_juju_conn._create_cluster_role = Mock()
- self.k8s_juju_conn._create_service_account = Mock()
- self.k8s_juju_conn._create_cluster_role_binding = Mock()
- self.k8s_juju_conn._delete_cluster_role = Mock()
- self.k8s_juju_conn._delete_service_account = Mock()
- self.k8s_juju_conn._delete_cluster_role_binding = Mock()
- self.k8s_juju_conn._get_secret_data = AsyncMock()
- self.k8s_juju_conn._get_secret_data.return_value = ("token", "cacert")
def test_with_cluster_uuid(
self,
- mock_get_default_storage_class,
):
reuse_cluster_uuid = "uuid"
uuid, created = self.loop.run_until_complete(
self.assertTrue(created)
self.assertEqual(uuid, reuse_cluster_uuid)
- mock_get_default_storage_class.assert_called_once()
+ self.kubectl.get_default_storage_class.assert_called_once()
self.k8s_juju_conn.libjuju.add_k8s.assert_called_once()
def test_with_no_cluster_uuid(
self,
- mock_get_default_storage_class,
):
uuid, created = self.loop.run_until_complete(
self.k8s_juju_conn.init_env(k8s_creds=kubeconfig)
self.assertTrue(created)
self.assertTrue(isinstance(uuid, str))
- mock_get_default_storage_class.assert_called_once()
+ self.kubectl.get_default_storage_class.assert_called_once()
self.k8s_juju_conn.libjuju.add_k8s.assert_called_once()
def test_init_env_exception(
self,
- mock_get_default_storage_class,
):
self.k8s_juju_conn.libjuju.add_k8s.side_effect = Exception()
created = None
self.assertIsNone(created)
self.assertIsNone(uuid)
- mock_get_default_storage_class.assert_called_once()
self.k8s_juju_conn.libjuju.add_k8s.assert_called_once()
cloud_creds = Mock()
cloud_creds.result = {"attrs": {RBAC_LABEL_KEY_NAME: "asd"}}
self.k8s_juju_conn.libjuju.get_cloud_credentials.return_value = [cloud_creds]
- self.k8s_juju_conn._delete_cluster_role_binding = Mock()
- self.k8s_juju_conn._delete_service_account = Mock()
- self.k8s_juju_conn._delete_cluster_role = Mock()
self.k8s_juju_conn.get_credentials = Mock()
self.k8s_juju_conn.get_credentials.return_value = kubeconfig
def setUp(self):
super(GetServicesTest, self).setUp()
- @asynctest.mock.patch("n2vc.kubectl.Kubectl.get_services")
@asynctest.mock.patch("n2vc.k8s_juju_conn.K8sJujuConnector.get_credentials")
- def test_success(self, mock_get_credentials, mock_get_services):
+ def test_success(self, mock_get_credentials):
mock_get_credentials.return_value = kubeconfig
self.loop.run_until_complete(self.k8s_juju_conn.get_services("", "", ""))
mock_get_credentials.assert_called_once()
- mock_get_services.assert_called_once()
+ self.kubectl.get_services.assert_called_once()
class GetServiceTest(K8sJujuConnTestCase):
def setUp(self):
super(GetServiceTest, self).setUp()
- @asynctest.mock.patch("n2vc.kubectl.Kubectl.get_services")
@asynctest.mock.patch("n2vc.k8s_juju_conn.K8sJujuConnector.get_credentials")
- def test_success(self, mock_get_credentials, mock_get_services):
+ def test_success(self, mock_get_credentials):
mock_get_credentials.return_value = kubeconfig
self.loop.run_until_complete(self.k8s_juju_conn.get_service("", "", ""))
mock_get_credentials.assert_called_once()
- mock_get_services.assert_called_once()
+ self.kubectl.get_services.assert_called_once()
class GetCredentialsTest(K8sJujuConnTestCase):
self.kubectl.get_services()
-@mock.patch("kubernetes.config.kube_config.Configuration")
+@mock.patch("n2vc.kubectl.client")
+@mock.patch("n2vc.kubectl.config.kube_config.Configuration.get_default_copy")
@mock.patch("n2vc.kubectl.config.load_kube_config")
class GetConfiguration(KubectlTestCase):
def setUp(self):
super(GetConfiguration, self).setUp()
- def test_get_configuration(self, mock_load_kube_config, mock_configuration):
+ def test_get_configuration(
+ self,
+ mock_load_kube_config,
+ mock_configuration,
+ mock_client,
+ ):
kubectl = Kubectl()
kubectl.configuration
mock_configuration.assert_called_once()
+ mock_load_kube_config.assert_called_once()
+ mock_client.CoreV1Api.assert_called_once()
+ mock_client.RbacAuthorizationV1Api.assert_called_once()
+ mock_client.StorageV1Api.assert_called_once()
@mock.patch("kubernetes.client.StorageV1Api.list_storage_class")
"user": envs["user"],
"secret": envs["secret"],
"cacert": envs["cacert"],
- "pubkey": envs["pubkey"],
- "lxd-cloud": envs["cloud"],
- "lxd-credentials": envs.get("credentials", envs["cloud"]),
- "k8s-cloud": envs["k8s_cloud"],
- "k8s-credentials": envs.get("k8s_credentials", envs["k8s_cloud"]),
+ "pubkey": envs.get("pubkey"),
+ "lxd-cloud": envs.get("cloud"),
+ "lxd-credentials": envs.get("credentials", envs.get("cloud")),
+ "k8s-cloud": envs.get("k8s_cloud"),
+ "k8s-credentials": envs.get("k8s_credentials", envs.get("k8s_cloud")),
"model-config": ModelConfig(envs),
"api-proxy": envs.get("api_proxy", None),
}
deps = black
skip_install = true
commands =
- - black --check --diff n2vc/
- - black --check --diff setup.py
+ black --check --diff n2vc/
+ black --check --diff setup.py
#######################################################################################
deps = {[testenv]deps}
-r{toxinidir}/requirements-dev.txt
-r{toxinidir}/requirements-test.txt
- pylint
+ pylint==2.10.2
commands =
pylint -E n2vc
W503,
E123,
E125,
+ E203,
E226,
E241
exclude =