import asyncio
-from n2vc.utils import Dict, EntityType, N2VCDeploymentStatus
+from n2vc.utils import Dict, N2VCDeploymentStatus
from n2vc.n2vc_conn import N2VCConnector
from unittest.mock import MagicMock
+kubeconfig = """apiVersion: v1
+clusters:
+- cluster:
+ certificate-authority-data: LS0tLS1CRUdJTiBDRVJUSUZJQ0FURS0tLS0tCk1\
+ JSURBVENDQWVtZ0F3SUJBZ0lKQUxjMk9xVUpwcnVCTUEwR0NTcUdTSWIzRFFFQk\
+ N3VUFNQmN4RlRBVEJnTlYKQkFNTURERXdMakUxTWk0eE9ETXVNVEFlRncweU1EQ\
+ TVNVEV4TkRJeU16VmFGdzB6TURBNU1Ea3hOREl5TXpWYQpNQmN4RlRBVEJnTlZC\
+ QU1NRERFd0xqRTFNaTR4T0RNdU1UQ0NBU0l3RFFZSktvWklodmNOQVFFQkJRQUR\
+ nZ0VQCkFEQ0NBUW9DZ2dFQkFNV0tyQkdxWlJRT0VONDExR2RESmY2ckZWRDcvMU\
+ xHNlZMWjNhd1BRdHBhRTRxdVdyNisKWjExTWwra2kwVEU1cGZFV3dKenVUZXlCU\
+ XVkUEpnYm1QTjF1VWROdGRiNlpocHEzeC9oT0hCMVJLNC9iSlNFUgpiZ0dITmN6\
+ MzR6SHRaZ1dwb2NPTXpPOW9oRUdhMTZUaDhmQWVxYU1CQTJRaklmeUFlaVp3VHJ\
+ nZ3BrY2dBMUlOCjBvQkdqSURnSGVoSU5tbGZOOURkQ3hNN1FNTmtSbzRXdE13bF\
+ JSRWZ4QnFiVkNpZGFjbVhhb1VPUjJPeFVmQWEKN1orSUU1TmN5ZFQ1TGovazdwd\
+ XZCVkdIa0JQWnE0TmlBa3R4aXd5NVB5R29GTk9mT0NrV2I2VnBzVzNhTlNJeAo4\
+ aXBITkc3enV3elc1TGQ5TkhQYWpRckZwdFZBSHpJNWNhRUNBd0VBQWFOUU1FNHd\
+ IUVlEVlIwT0JCWUVGQ1dVCkFaTXNaeE13L1k1OGlXMGZJWVAzcDdTYk1COEdBMV\
+ VkSXdRWU1CYUFGQ1dVQVpNc1p4TXcvWTU4aVcwZklZUDMKcDdTYk1Bd0dBMVVkR\
+ XdRRk1BTUJBZjh3RFFZSktvWklodmNOQVFFTEJRQURnZ0VCQUJaMlYxMWowRzhh\
+ Z1Z6Twp2YWtKTGt4UGZ0UE1NMFFOaVRzZmV6RzlicnBkdEVLSjFyalFCblNXYTN\
+ WbThWRGZTYkhLQUNXaGh0OEhzcXhtCmNzdVQyOWUyaGZBNHVIOUxMdy9MVG5EdE\
+ tJSjZ6aWFzaTM5RGh3UGwwaExuamJRMjk4VVo5TGovVlpnZGlqemIKWnVPdHlpT\
+ nVOS0E2Nmd0dGxXcWZRQ2hkbnJ5MlZUbjBjblR5dU9UalByYWdOdXJMdlVwL3Nl\
+ eURhZmsxNXJ4egozcmlYZldiQnRhUUk1dnM0ekFKU2xneUg2RnpiZStoTUhlUzF\
+ mM2ppb3dJV0lRR2NNbHpGT1RpMm1xWFRybEJYCnh1WmpLZlpOcndjQVNGbk9qYV\
+ BWeFQ1ODJ4WWhtTm8wR3J2MlZEck51bDlSYkgvK3lNS2J5NEhkOFRvVThMU2kKY\
+ 3Uxajh3cz0KLS0tLS1FTkQgQ0VSVElGSUNBVEUtLS0tLQo=
+ server: https://192.168.0.22:16443
+ name: microk8s-cluster
+contexts:
+- context:
+ cluster: microk8s-cluster
+ user: admin
+ name: microk8s
+current-context: microk8s
+kind: Config
+preferences: {}
+users:
+- name: admin
+ user:
+ token: clhkRExRem5Xd1dCdnFEVXdvRGtDRGE5b1F3WnNrZk5qeHFCOU10bHBZRT0K
+"""
+
+
async def AsyncMockFunc():
await asyncio.sleep(1)
detailed_status: str,
vca_status: str,
entity_type: str,
+ vca_id: str = None,
):
+ """
+ Write application status to database
+
+ :param: db_dict: DB dictionary
+ :param: status: Status of the application
+ :param: detailed_status: Detailed status
+ :param: vca_status: VCA status
+ :param: entity_type: Entity type ("application", "machine, and "action")
+ :param: vca_id: Id of the VCA. If None, the default VCA will be used.
+ """
self.last_written_values = Dict(
{
"n2vc_status": status,
entity_id = "2"
dns_name = "FAKE ENDPOINT"
model_name = "FAKE MODEL"
- entity_type = EntityType.MACHINE
+ entity_type = "machine"
+ safe_data = {"instance-id": "myid"}
+
+ async def destroy(self, force):
+ pass
+
+
+class FakeManualMachine(MagicMock):
+ entity_id = "2"
+ dns_name = "FAKE ENDPOINT"
+ model_name = "FAKE MODEL"
+ entity_type = "machine"
+ safe_data = {"instance-id": "manual:myid"}
+ series = "FAKE SERIES"
async def destroy(self, force):
pass
status = "ready"
+class FakeModel:
+ def __init__(self, applications: dict = {}):
+ self._applications = applications
+
+ @property
+ def applications(self):
+ return self._applications
+
+
class FakeUnit(MagicMock):
async def is_leader_from_status(self):
return True
- async def run_action(self, action_name):
+ async def run_action(self, action_name, **kwargs):
return FakeAction()
+ @property
+ def machine_id(self):
+ return "existing_machine_id"
+
+ name = "existing_unit"
-class FakeApplication(AsyncMock):
+class FakeApplication(AsyncMock):
async def set_config(self, config):
pass
async def add_unit(self, to):
pass
+ async def destroy_unit(self, unit_name):
+ pass
+
async def get_actions(self):
return ["existing_action"]
+ async def get_config(self):
+ return ["app_config"]
+
+ async def scale(self, scale):
+ pass
+
units = [FakeUnit(), FakeUnit()]
+class FakeFile:
+ def __init__(self, content: str = ""):
+ self.content = content
+
+ def read(self, size: int = -1):
+ return self.content
+
+
+class FakeFileWrapper:
+ def __init__(self, content: str = ""):
+ self.file = FakeFile(content=content)
+
+ def __enter__(self):
+ return self.file
+
+ def __exit__(self, type, value, traceback):
+ pass
+
+
FAKE_DELTA_MACHINE_PENDING = Dict(
{
"deltas": ["machine", "change", {}],
Deltas = [
Dict(
{
- "entity": Dict({"id": "2", "type": EntityType.MACHINE}),
- "filter": Dict({"entity_id": "2", "entity_type": EntityType.MACHINE}),
+ "entity": Dict({"id": "2", "type": "machine"}),
+ "filter": Dict({"entity_id": "2", "entity_type": "machine"}),
"delta": FAKE_DELTA_MACHINE_PENDING,
"entity_status": Dict(
{"status": "pending", "message": "Running", "vca_status": "running"}
),
Dict(
{
- "entity": Dict({"id": "2", "type": EntityType.MACHINE}),
- "filter": Dict({"entity_id": "1", "entity_type": EntityType.MACHINE}),
+ "entity": Dict({"id": "2", "type": "machine"}),
+ "filter": Dict({"entity_id": "1", "entity_type": "machine"}),
"delta": FAKE_DELTA_MACHINE_PENDING,
"entity_status": Dict(
{"status": "pending", "message": "Running", "vca_status": "running"}
),
Dict(
{
- "entity": Dict({"id": "2", "type": EntityType.MACHINE}),
- "filter": Dict({"entity_id": "2", "entity_type": EntityType.MACHINE}),
+ "entity": Dict({"id": "2", "type": "machine"}),
+ "filter": Dict({"entity_id": "2", "entity_type": "machine"}),
"delta": FAKE_DELTA_MACHINE_STARTED,
"entity_status": Dict(
{"status": "started", "message": "Running", "vca_status": "running"}
),
Dict(
{
- "entity": Dict({"id": "2", "type": EntityType.MACHINE}),
- "filter": Dict({"entity_id": "1", "entity_type": EntityType.MACHINE}),
+ "entity": Dict({"id": "2", "type": "machine"}),
+ "filter": Dict({"entity_id": "1", "entity_type": "machine"}),
"delta": FAKE_DELTA_MACHINE_STARTED,
"entity_status": Dict(
{"status": "started", "message": "Running", "vca_status": "running"}
),
Dict(
{
- "entity": Dict({"id": "git/0", "type": EntityType.UNIT}),
- "filter": Dict({"entity_id": "git", "entity_type": EntityType.APPLICATION}),
+ "entity": Dict({"id": "git/0", "type": "unit"}),
+ "filter": Dict({"entity_id": "git", "entity_type": "application"}),
"delta": FAKE_DELTA_UNIT_PENDING,
"entity_status": Dict(
{"status": "waiting", "message": "", "vca_status": "waiting"}
),
Dict(
{
- "entity": Dict({"id": "git/0", "type": EntityType.UNIT}),
- "filter": Dict({"entity_id": "2", "entity_type": EntityType.MACHINE}),
+ "entity": Dict({"id": "git/0", "type": "unit"}),
+ "filter": Dict({"entity_id": "2", "entity_type": "machine"}),
"delta": FAKE_DELTA_UNIT_PENDING,
"entity_status": Dict(
{"status": "waiting", "message": "", "vca_status": "waiting"}
),
Dict(
{
- "entity": Dict({"id": "git/0", "type": EntityType.UNIT}),
- "filter": Dict({"entity_id": "git", "entity_type": EntityType.APPLICATION}),
+ "entity": Dict({"id": "git/0", "type": "unit"}),
+ "filter": Dict({"entity_id": "git", "entity_type": "application"}),
"delta": FAKE_DELTA_UNIT_STARTED,
"entity_status": Dict(
{"status": "active", "message": "", "vca_status": "active"}
),
Dict(
{
- "entity": Dict({"id": "git/0", "type": EntityType.UNIT}),
- "filter": Dict({"entity_id": "1", "entity_type": EntityType.ACTION}),
+ "entity": Dict({"id": "git/0", "type": "unit"}),
+ "filter": Dict({"entity_id": "1", "entity_type": "action"}),
"delta": FAKE_DELTA_UNIT_STARTED,
"entity_status": Dict(
{"status": "active", "message": "", "vca_status": "active"}
),
Dict(
{
- "entity": Dict({"id": "git", "type": EntityType.APPLICATION}),
- "filter": Dict({"entity_id": "git", "entity_type": EntityType.APPLICATION}),
+ "entity": Dict({"id": "git", "type": "application"}),
+ "filter": Dict({"entity_id": "git", "entity_type": "application"}),
"delta": FAKE_DELTA_APPLICATION_MAINTENANCE,
"entity_status": Dict(
{
),
Dict(
{
- "entity": Dict({"id": "git", "type": EntityType.APPLICATION}),
- "filter": Dict({"entity_id": "2", "entity_type": EntityType.MACHINE}),
+ "entity": Dict({"id": "git", "type": "application"}),
+ "filter": Dict({"entity_id": "2", "entity_type": "machine"}),
"delta": FAKE_DELTA_APPLICATION_MAINTENANCE,
"entity_status": Dict(
{
),
Dict(
{
- "entity": Dict({"id": "git", "type": EntityType.APPLICATION}),
- "filter": Dict({"entity_id": "git", "entity_type": EntityType.APPLICATION}),
+ "entity": Dict({"id": "git", "type": "application"}),
+ "filter": Dict({"entity_id": "git", "entity_type": "application"}),
"delta": FAKE_DELTA_APPLICATION_ACTIVE,
"entity_status": Dict(
{"status": "active", "message": "Ready!", "vca_status": "active"}
),
Dict(
{
- "entity": Dict({"id": "git", "type": EntityType.APPLICATION}),
- "filter": Dict({"entity_id": "1", "entity_type": EntityType.ACTION}),
+ "entity": Dict({"id": "git", "type": "application"}),
+ "filter": Dict({"entity_id": "1", "entity_type": "action"}),
"delta": FAKE_DELTA_APPLICATION_ACTIVE,
"entity_status": Dict(
{"status": "active", "message": "Ready!", "vca_status": "active"}
),
Dict(
{
- "entity": Dict({"id": "1", "type": EntityType.ACTION}),
- "filter": Dict({"entity_id": "1", "entity_type": EntityType.ACTION}),
+ "entity": Dict({"id": "1", "type": "action"}),
+ "filter": Dict({"entity_id": "1", "entity_type": "action"}),
"delta": FAKE_DELTA_ACTION_COMPLETED,
"entity_status": Dict(
{
),
Dict(
{
- "entity": Dict({"id": "git", "type": EntityType.ACTION}),
- "filter": Dict({"entity_id": "1", "entity_type": EntityType.MACHINE}),
+ "entity": Dict({"id": "git", "type": "action"}),
+ "filter": Dict({"entity_id": "1", "entity_type": "machine"}),
"delta": FAKE_DELTA_ACTION_COMPLETED,
"entity_status": Dict(
{