Feature 9751: Centralized VCA for KNFs
- Use libjuju.py for the communication with VCA
- Add the k8s_cloud as an external cloud to the VCA
- Add unit tests
Change-Id: Id55bfada3957f35e13cef7b4bfcc7acb72452df0
Signed-off-by: David Garcia <david.garcia@canonical.com>
diff --git a/n2vc/tests/unit/utils.py b/n2vc/tests/unit/utils.py
index d960c70..ac86cdd 100644
--- a/n2vc/tests/unit/utils.py
+++ b/n2vc/tests/unit/utils.py
@@ -19,6 +19,50 @@
from unittest.mock import MagicMock
+kubeconfig = """apiVersion: v1
+clusters:
+- cluster:
+ certificate-authority-data: LS0tLS1CRUdJTiBDRVJUSUZJQ0FURS0tLS0tCk1\
+ JSURBVENDQWVtZ0F3SUJBZ0lKQUxjMk9xVUpwcnVCTUEwR0NTcUdTSWIzRFFFQk\
+ N3VUFNQmN4RlRBVEJnTlYKQkFNTURERXdMakUxTWk0eE9ETXVNVEFlRncweU1EQ\
+ TVNVEV4TkRJeU16VmFGdzB6TURBNU1Ea3hOREl5TXpWYQpNQmN4RlRBVEJnTlZC\
+ QU1NRERFd0xqRTFNaTR4T0RNdU1UQ0NBU0l3RFFZSktvWklodmNOQVFFQkJRQUR\
+ nZ0VQCkFEQ0NBUW9DZ2dFQkFNV0tyQkdxWlJRT0VONDExR2RESmY2ckZWRDcvMU\
+ xHNlZMWjNhd1BRdHBhRTRxdVdyNisKWjExTWwra2kwVEU1cGZFV3dKenVUZXlCU\
+ XVkUEpnYm1QTjF1VWROdGRiNlpocHEzeC9oT0hCMVJLNC9iSlNFUgpiZ0dITmN6\
+ MzR6SHRaZ1dwb2NPTXpPOW9oRUdhMTZUaDhmQWVxYU1CQTJRaklmeUFlaVp3VHJ\
+ nZ3BrY2dBMUlOCjBvQkdqSURnSGVoSU5tbGZOOURkQ3hNN1FNTmtSbzRXdE13bF\
+ JSRWZ4QnFiVkNpZGFjbVhhb1VPUjJPeFVmQWEKN1orSUU1TmN5ZFQ1TGovazdwd\
+ XZCVkdIa0JQWnE0TmlBa3R4aXd5NVB5R29GTk9mT0NrV2I2VnBzVzNhTlNJeAo4\
+ aXBITkc3enV3elc1TGQ5TkhQYWpRckZwdFZBSHpJNWNhRUNBd0VBQWFOUU1FNHd\
+ IUVlEVlIwT0JCWUVGQ1dVCkFaTXNaeE13L1k1OGlXMGZJWVAzcDdTYk1COEdBMV\
+ VkSXdRWU1CYUFGQ1dVQVpNc1p4TXcvWTU4aVcwZklZUDMKcDdTYk1Bd0dBMVVkR\
+ XdRRk1BTUJBZjh3RFFZSktvWklodmNOQVFFTEJRQURnZ0VCQUJaMlYxMWowRzhh\
+ Z1Z6Twp2YWtKTGt4UGZ0UE1NMFFOaVRzZmV6RzlicnBkdEVLSjFyalFCblNXYTN\
+ WbThWRGZTYkhLQUNXaGh0OEhzcXhtCmNzdVQyOWUyaGZBNHVIOUxMdy9MVG5EdE\
+ tJSjZ6aWFzaTM5RGh3UGwwaExuamJRMjk4VVo5TGovVlpnZGlqemIKWnVPdHlpT\
+ nVOS0E2Nmd0dGxXcWZRQ2hkbnJ5MlZUbjBjblR5dU9UalByYWdOdXJMdlVwL3Nl\
+ eURhZmsxNXJ4egozcmlYZldiQnRhUUk1dnM0ekFKU2xneUg2RnpiZStoTUhlUzF\
+ mM2ppb3dJV0lRR2NNbHpGT1RpMm1xWFRybEJYCnh1WmpLZlpOcndjQVNGbk9qYV\
+ BWeFQ1ODJ4WWhtTm8wR3J2MlZEck51bDlSYkgvK3lNS2J5NEhkOFRvVThMU2kKY\
+ 3Uxajh3cz0KLS0tLS1FTkQgQ0VSVElGSUNBVEUtLS0tLQo=
+ server: https://192.168.0.22:16443
+ name: microk8s-cluster
+contexts:
+- context:
+ cluster: microk8s-cluster
+ user: admin
+ name: microk8s
+current-context: microk8s
+kind: Config
+preferences: {}
+users:
+- name: admin
+ user:
+ token: clhkRExRem5Xd1dCdnFEVXdvRGtDRGE5b1F3WnNrZk5qeHFCOU10bHBZRT0K
+"""
+
+
async def AsyncMockFunc():
await asyncio.sleep(1)
@@ -91,6 +135,15 @@
status = "ready"
+class FakeModel:
+ def __init__(self, applications: dict = {}):
+ self._applications = applications
+
+ @property
+ def applications(self):
+ return self._applications
+
+
class FakeUnit(MagicMock):
async def is_leader_from_status(self):
return True
@@ -100,7 +153,6 @@
class FakeApplication(AsyncMock):
-
async def set_config(self, config):
pass
@@ -113,6 +165,25 @@
units = [FakeUnit(), FakeUnit()]
+class FakeFile:
+ def __init__(self, content: str = ""):
+ self.content = content
+
+ def read(self, size: int = -1):
+ return self.content
+
+
+class FakeFileWrapper:
+ def __init__(self, content: str = ""):
+ self.file = FakeFile(content=content)
+
+ def __enter__(self):
+ return self.file
+
+ def __exit__(self, type, value, traceback):
+ pass
+
+
FAKE_DELTA_MACHINE_PENDING = Dict(
{
"deltas": ["machine", "change", {}],