@patch("n2vc.k8s_helm_base_conn.EnvironConfig")
async def setUp(self, mock_env):
@patch("n2vc.k8s_helm_base_conn.EnvironConfig")
async def setUp(self, mock_env):
self.db = Mock(DbMemory())
self.fs = asynctest.Mock(FsLocal())
self.fs.path = "./tmp/"
self.namespace = "testk8s"
self.cluster_id = "helm3_cluster_id"
self.db = Mock(DbMemory())
self.fs = asynctest.Mock(FsLocal())
self.fs.path = "./tmp/"
self.namespace = "testk8s"
self.cluster_id = "helm3_cluster_id"
# pass fake kubectl and helm commands to make sure it does not call actual commands
K8sHelm3Connector._check_file_exists = asynctest.Mock(return_value=True)
cluster_dir = self.fs.path + self.cluster_id
# pass fake kubectl and helm commands to make sure it does not call actual commands
K8sHelm3Connector._check_file_exists = asynctest.Mock(return_value=True)
cluster_dir = self.fs.path + self.cluster_id
)
self.helm_conn._get_namespaces.assert_called_once_with(self.cluster_id)
self.helm_conn._create_namespace.assert_called_once_with(
)
self.helm_conn._get_namespaces.assert_called_once_with(self.cluster_id)
self.helm_conn._create_namespace.assert_called_once_with(
- repo_update_command = "/usr/bin/helm3 repo update"
- repo_add_command = "/usr/bin/helm3 repo add {} {}".format(repo_name, repo_url)
+ repo_update_command = (
+ "env KUBECONFIG=./tmp/helm3_cluster_id/.kube/config /usr/bin/helm3 repo update {}"
+ ).format(repo_name)
+ repo_add_command = (
+ "env KUBECONFIG=./tmp/helm3_cluster_id/.kube/config /usr/bin/helm3 repo add {} {}"
+ ).format(repo_name, repo_url)
calls = self.helm_conn._local_async_exec.call_args_list
call0_kargs = calls[0][1]
self.assertEqual(
call0_kargs.get("command"),
calls = self.helm_conn._local_async_exec.call_args_list
call0_kargs = calls[0][1]
self.assertEqual(
call0_kargs.get("command"),
)
call1_kargs = calls[1][1]
self.assertEqual(
call1_kargs.get("command"),
)
call1_kargs = calls[1][1]
self.assertEqual(
call1_kargs.get("command"),
self.helm_conn._local_async_exec.assert_called_with(
command=command, env=self.env, raise_exception_on_error=False
)
self.helm_conn._local_async_exec.assert_called_with(
command=command, env=self.env, raise_exception_on_error=False
)
self.helm_conn._local_async_exec.assert_called_with(
command=command, env=self.env, raise_exception_on_error=True
)
self.helm_conn._local_async_exec.assert_called_with(
command=command, env=self.env, raise_exception_on_error=True
)
self.kdu_instance = "stable-openldap-0005399828"
self.helm_conn.generate_kdu_instance_name = Mock(return_value=self.kdu_instance)
self.helm_conn._get_namespaces = asynctest.CoroutineMock(return_value=[])
self.kdu_instance = "stable-openldap-0005399828"
self.helm_conn.generate_kdu_instance_name = Mock(return_value=self.kdu_instance)
self.helm_conn._get_namespaces = asynctest.CoroutineMock(return_value=[])
"--timeout 300s --namespace testk8s stable/openldap --version 1.2.2"
)
self.helm_conn._local_async_exec.assert_called_once_with(
"--timeout 300s --namespace testk8s stable/openldap --version 1.2.2"
)
self.helm_conn._local_async_exec.assert_called_once_with(
result = await self.helm_conn._namespace_exists(self.cluster_id, self.namespace)
self.helm_conn._get_namespaces.assert_called_once()
self.assertEqual(result, True)
self.helm_conn._get_namespaces.reset_mock()
result = await self.helm_conn._namespace_exists(self.cluster_id, self.namespace)
self.helm_conn._get_namespaces.assert_called_once()
self.assertEqual(result, True)
self.helm_conn._get_namespaces.reset_mock()
self.helm_conn._get_namespaces.assert_called_once()
self.assertEqual(result, False)
self.helm_conn._get_namespaces.assert_called_once()
self.assertEqual(result, False)
await self.helm_conn.upgrade(
self.cluster_uuid, kdu_instance, kdu_model, atomic=True, db_dict=db_dict
)
await self.helm_conn.upgrade(
self.cluster_uuid, kdu_instance, kdu_model, atomic=True, db_dict=db_dict
)
await self.helm_conn.rollback(
self.cluster_uuid, kdu_instance=kdu_instance, revision=1, db_dict=db_dict
)
await self.helm_conn.rollback(
self.cluster_uuid, kdu_instance=kdu_instance, revision=1, db_dict=db_dict
)
self.helm_conn._local_async_exec.assert_called_once_with(
command=command, env=self.env, raise_exception_on_error=False
)
self.helm_conn._local_async_exec.assert_called_once_with(
command=command, env=self.env, raise_exception_on_error=False
)
- command = "/usr/bin/helm3 uninstall {} --namespace={}".format(
- kdu_instance, self.namespace
- )
+ command = (
+ "env KUBECONFIG=./tmp/helm3_cluster_id/.kube/config /usr/bin/helm3 uninstall {} --namespace={}"
+ ).format(kdu_instance, self.namespace)
self.helm_conn._local_async_exec.assert_called_once_with(
command=command, env=self.env, raise_exception_on_error=True
)
self.helm_conn._local_async_exec.assert_called_once_with(
command=command, env=self.env, raise_exception_on_error=True
)
command2 = "/usr/bin/kubectl get --namespace={} -f -".format(self.namespace)
self.helm_conn._local_async_exec_pipe.assert_called_once_with(
command1, command2, env=self.env, raise_exception_on_error=True
command2 = "/usr/bin/kubectl get --namespace={} -f -".format(self.namespace)
self.helm_conn._local_async_exec_pipe.assert_called_once_with(
command1, command2, env=self.env, raise_exception_on_error=True
self.helm_conn._local_async_exec = asynctest.CoroutineMock(return_value=("", 0))
await self.helm_conn._status_kdu(
self.helm_conn._local_async_exec = asynctest.CoroutineMock(return_value=("", 0))
await self.helm_conn._status_kdu(
- self.cluster_id, kdu_instance, self.namespace, return_text=True
- )
- command = "/usr/bin/helm3 status {} --namespace={} --output yaml".format(
- kdu_instance, self.namespace
+ self.cluster_id, kdu_instance, self.namespace, yaml_format=True
cluster_id=self.cluster_id,
kdu_instance=kdu_instance,
namespace=self.namespace,
cluster_id=self.cluster_id,
kdu_instance=kdu_instance,
namespace=self.namespace,
self.helm_conn._uninstall_sw = asynctest.CoroutineMock()
self.helm_conn.instances_list = asynctest.CoroutineMock(return_value=instances)
self.helm_conn.uninstall = asynctest.CoroutineMock()
self.helm_conn._uninstall_sw = asynctest.CoroutineMock()
self.helm_conn.instances_list = asynctest.CoroutineMock(return_value=instances)
self.helm_conn.uninstall = asynctest.CoroutineMock()
self.helm_conn.fs.file_delete.assert_called_once_with(
self.cluster_id, ignore_non_exist=True
)
self.helm_conn.fs.file_delete.assert_called_once_with(
self.cluster_id, ignore_non_exist=True
)
cluster_uuid=self.cluster_uuid, kdu_instance=kdu_instance
)
self.helm_conn._uninstall_sw.assert_called_once_with(
cluster_uuid=self.cluster_uuid, kdu_instance=kdu_instance
)
self.helm_conn._uninstall_sw.assert_called_once_with(