import asyncio
import time
from juju.client import client
-from n2vc.utils import FinalStatus, EntityType
from n2vc.exceptions import EntityInvalidException
from n2vc.n2vc_conn import N2VCConnector
from juju.model import ModelEntity, Model
from juju.client.overrides import Delta
+from juju.status import derive_status
+from juju.application import Application
from websockets.exceptions import ConnectionClosed
import logging
logger = logging.getLogger("__main__")
+def status(application: Application) -> str:
+ unit_status = []
+ for unit in application.units:
+ unit_status.append(unit.workload_status)
+ return derive_status(unit_status)
+
+
+def entity_ready(entity: ModelEntity) -> bool:
+ """
+ Check if the entity is ready
+
+ :param: entity: Model entity. It can be a machine, action, or application.
+
+ :returns: boolean saying if the entity is ready or not
+ """
+ entity_type = entity.entity_type
+ if entity_type == "machine":
+ return entity.agent_status in ["started"]
+ elif entity_type == "action":
+ return entity.status in ["completed", "failed", "cancelled"]
+ elif entity_type == "application":
+ # Workaround for bug: https://github.com/juju/python-libjuju/issues/441
+ return entity.status in ["active", "blocked"]
+ else:
+ raise EntityInvalidException("Unknown entity type: {}".format(entity_type))
+
+
+def application_ready(application: Application) -> bool:
+ """
+ Check if an application has a leader
+
+ :param: application: Application entity.
+
+ :returns: boolean saying if the application has a unit that is a leader.
+ """
+ ready_status_list = ["active", "blocked"]
+ application_ready = application.status in ready_status_list
+ units_ready = all(
+ unit.workload_status in ready_status_list for unit in application.units
+ )
+ return application_ready and units_ready
+
+
class JujuModelWatcher:
+ @staticmethod
+ async def wait_for_model(model: Model, timeout: float = 3600):
+ """
+ Wait for all entities in model to reach its final state.
+
+ :param: model: Model to observe
+ :param: timeout: Timeout for the model applications to be active
+
+ :raises: asyncio.TimeoutError when timeout reaches
+ """
+
+ if timeout is None:
+ timeout = 3600.0
+
+ # Coroutine to wait until the entity reaches the final state
+ async def wait_until_model_ready():
+ wait_for_entity = asyncio.ensure_future(
+ asyncio.wait_for(
+ model.block_until(
+ lambda: all(
+ application_ready(application)
+ for application in model.applications.values()
+ ),
+ ),
+ timeout=timeout,
+ )
+ )
+
+ tasks = [wait_for_entity]
+ try:
+ await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED)
+ finally:
+ # Cancel tasks
+ for task in tasks:
+ task.cancel()
+
+ await wait_until_model_ready()
+ # Check model is still ready after 10 seconds
+
+ await asyncio.sleep(10)
+ await wait_until_model_ready()
+
@staticmethod
async def wait_for(
- model,
+ model: Model,
entity: ModelEntity,
progress_timeout: float = 3600,
total_timeout: float = 3600,
db_dict: dict = None,
n2vc: N2VCConnector = None,
+ vca_id: str = None,
):
"""
Wait for entity to reach its final state.
:param: total_timeout: Timeout for the entity to be active
:param: db_dict: Dictionary with data of the DB to write the updates
:param: n2vc: N2VC Connector objector
+ :param: vca_id: VCA ID
:raises: asyncio.TimeoutError when timeout reaches
"""
if total_timeout is None:
total_timeout = 3600.0
- entity_type = EntityType.get_entity(type(entity))
- if entity_type not in FinalStatus:
- raise EntityInvalidException("Entity type not found")
-
- # Get final states
- final_states = FinalStatus[entity_type].status
- field_to_check = FinalStatus[entity_type].field
+ entity_type = entity.entity_type
+ if entity_type not in ["application", "action", "machine"]:
+ raise EntityInvalidException("Unknown entity type: {}".format(entity_type))
# Coroutine to wait until the entity reaches the final state
wait_for_entity = asyncio.ensure_future(
asyncio.wait_for(
- model.block_until(
- lambda: entity.__getattribute__(field_to_check) in final_states
- ),
+ model.block_until(lambda: entity_ready(entity)),
timeout=total_timeout,
)
)
timeout=progress_timeout,
db_dict=db_dict,
n2vc=n2vc,
+ vca_id=vca_id,
)
)
# Execute tasks, and stop when the first is finished
# The watcher task won't never finish (unless it timeouts)
await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED)
- except Exception as e:
- raise e
finally:
# Cancel tasks
for task in tasks:
async def model_watcher(
model: Model,
entity_id: str,
- entity_type: EntityType,
+ entity_type: str,
timeout: float,
db_dict: dict = None,
n2vc: N2VCConnector = None,
+ vca_id: str = None,
):
"""
Observes the changes related to an specific entity in a model
:param: model: Model to observe
:param: entity_id: ID of the entity to be observed
- :param: entity_type: EntityType (p.e. .APPLICATION, .MACHINE, and .ACTION)
+ :param: entity_type: Entity Type (p.e. "application", "machine, and "action")
:param: timeout: Maximum time between two updates in the model
:param: db_dict: Dictionary with data of the DB to write the updates
:param: n2vc: N2VC Connector objector
+ :param: vca_id: VCA ID
:raises: asyncio.TimeoutError when timeout reaches
"""
# Genenerate array with entity types to listen
entity_types = (
- [entity_type, EntityType.UNIT]
- if entity_type == EntityType.APPLICATION # TODO: Add .ACTION too
+ [entity_type, "unit"]
+ if entity_type == "application" # TODO: Add "action" too
else [entity_type]
)
delta_entity = None
# Get delta EntityType
- delta_entity = EntityType.get_entity_from_delta(delta.entity)
+ delta_entity = delta.entity
if delta_entity in entity_types:
# Get entity id
- if entity_type == EntityType.APPLICATION:
+ if entity_type == "application":
id = (
delta.data["application"]
- if delta_entity == EntityType.UNIT
+ if delta_entity == "unit"
else delta.data["name"]
)
else:
# Update timeout
timeout_end = time.time() + timeout
- (status, status_message, vca_status) = JujuModelWatcher.get_status(
- delta, entity_type=delta_entity
- )
+ (
+ status,
+ status_message,
+ vca_status,
+ ) = JujuModelWatcher.get_status(delta)
if write and n2vc is not None and db_dict:
# Write status to DB
status=status,
detailed_status=status_message,
vca_status=vca_status,
- entity_type=delta_entity.value.__name__.lower(),
+ entity_type=delta_entity,
+ vca_id=vca_id,
)
# Check if timeout
if time.time() > timeout_end:
# the model connection is closed afterwards
@staticmethod
- def get_status(delta: Delta, entity_type: EntityType) -> (str, str, str):
+ def get_status(delta: Delta) -> (str, str, str):
"""
Get status from delta
:param: delta: Delta generated by the allwatcher
- :param: entity_type: EntityType (p.e. .APPLICATION, .MACHINE, and .ACTION)
+ :param: entity_type: Entity Type (p.e. "application", "machine, and "action")
:return (status, message, vca_status)
"""
- if entity_type == EntityType.MACHINE:
+ if delta.entity == "machine":
return (
delta.data["agent-status"]["current"],
delta.data["instance-status"]["message"],
delta.data["instance-status"]["current"],
)
- elif entity_type == EntityType.ACTION:
+ elif delta.entity == "action":
return (
delta.data["status"],
delta.data["status"],
delta.data["status"],
)
- elif entity_type == EntityType.APPLICATION:
+ elif delta.entity == "application":
return (
delta.data["status"]["current"],
delta.data["status"]["message"],
delta.data["status"]["current"],
)
- elif entity_type == EntityType.UNIT:
+ elif delta.entity == "unit":
return (
delta.data["workload-status"]["current"],
delta.data["workload-status"]["message"],