@asynctest.fail_on(active_handles=True)
async def test_repo_list(self):
-
self.helm_conn._local_async_exec = asynctest.CoroutineMock(return_value=("", 0))
await self.helm_conn.repo_list(self.cluster_uuid)
@asynctest.fail_on(active_handles=True)
async def test_repo_remove(self):
-
self.helm_conn._local_async_exec = asynctest.CoroutineMock(return_value=("", 0))
repo_name = "bitnami"
await self.helm_conn.repo_remove(self.cluster_uuid, repo_name)
self.helm_conn.get_instance_info = asynctest.CoroutineMock(
return_value=instance_info
)
+ # TEST-1 (--force true)
+ await self.helm_conn.upgrade(
+ self.cluster_uuid,
+ kdu_instance,
+ kdu_model,
+ atomic=True,
+ db_dict=db_dict,
+ force=True,
+ )
+ self.helm_conn.fs.sync.assert_called_with(from_path=self.cluster_id)
+ self.helm_conn.fs.reverse_sync.assert_has_calls(
+ [
+ asynctest.call(from_path=self.cluster_id),
+ asynctest.call(from_path=self.cluster_id),
+ ]
+ )
+ self.helm_conn._store_status.assert_called_with(
+ cluster_id=self.cluster_id,
+ kdu_instance=kdu_instance,
+ namespace=self.namespace,
+ db_dict=db_dict,
+ operation="upgrade",
+ )
+ command = (
+ "env KUBECONFIG=./tmp/helm3_cluster_id/.kube/config "
+ "/usr/bin/helm3 upgrade stable-openldap-0005399828 stable/openldap "
+ "--namespace testk8s --atomic --force --output yaml --timeout 300s "
+ "--reuse-values --version 1.2.3"
+ )
+ self.helm_conn._local_async_exec.assert_called_with(
+ command=command, env=self.env, raise_exception_on_error=False
+ )
+ # TEST-2 (--force false)
await self.helm_conn.upgrade(
- self.cluster_uuid, kdu_instance, kdu_model, atomic=True, db_dict=db_dict
+ self.cluster_uuid,
+ kdu_instance,
+ kdu_model,
+ atomic=True,
+ db_dict=db_dict,
)
self.helm_conn.fs.sync.assert_called_with(from_path=self.cluster_id)
self.helm_conn.fs.reverse_sync.assert_has_calls(
command=command, env=self.env, raise_exception_on_error=False
)
+ @asynctest.fail_on(active_handles=True)
+ async def test_upgrade_namespace(self):
+ kdu_model = "stable/openldap:1.2.3"
+ kdu_instance = "stable-openldap-0005399828"
+ db_dict = {}
+ instance_info = {
+ "chart": "openldap-1.2.2",
+ "name": kdu_instance,
+ "namespace": self.namespace,
+ "revision": 1,
+ "status": "DEPLOYED",
+ }
+ self.helm_conn._local_async_exec = asynctest.CoroutineMock(return_value=("", 0))
+ self.helm_conn._store_status = asynctest.CoroutineMock()
+ self.helm_conn.get_instance_info = asynctest.CoroutineMock(
+ return_value=instance_info
+ )
+
+ await self.helm_conn.upgrade(
+ self.cluster_uuid,
+ kdu_instance,
+ kdu_model,
+ atomic=True,
+ db_dict=db_dict,
+ namespace="default",
+ )
+ self.helm_conn.fs.sync.assert_called_with(from_path=self.cluster_id)
+ self.helm_conn.fs.reverse_sync.assert_has_calls(
+ [
+ asynctest.call(from_path=self.cluster_id),
+ asynctest.call(from_path=self.cluster_id),
+ ]
+ )
+ self.helm_conn._store_status.assert_called_with(
+ cluster_id=self.cluster_id,
+ kdu_instance=kdu_instance,
+ namespace="default",
+ db_dict=db_dict,
+ operation="upgrade",
+ )
+ command = (
+ "env KUBECONFIG=./tmp/helm3_cluster_id/.kube/config "
+ "/usr/bin/helm3 upgrade stable-openldap-0005399828 stable/openldap "
+ "--namespace default --atomic --output yaml --timeout 300s "
+ "--reuse-values --version 1.2.3"
+ )
+ self.helm_conn._local_async_exec.assert_called_with(
+ command=command, env=self.env, raise_exception_on_error=False
+ )
+
@asynctest.fail_on(active_handles=True)
async def test_scale(self):
kdu_model = "stable/openldap:1.2.3"