X-Git-Url: https://osm.etsi.org/gitweb/?p=osm%2FN2VC.git;a=blobdiff_plain;f=n2vc%2Fjuju_watcher.py;h=e122786ccf7e33785a69a933585957b031a876fe;hp=815abf958da18bcac7c79ab9662ec22d873288ba;hb=refs%2Fchanges%2F99%2F10499%2F1;hpb=4fee80e46dff88732b7927e502007203fcd8a15c diff --git a/n2vc/juju_watcher.py b/n2vc/juju_watcher.py index 815abf9..e122786 100644 --- a/n2vc/juju_watcher.py +++ b/n2vc/juju_watcher.py @@ -15,21 +15,76 @@ import asyncio import time from juju.client import client -from n2vc.utils import FinalStatus, EntityType from n2vc.exceptions import EntityInvalidException from n2vc.n2vc_conn import N2VCConnector from juju.model import ModelEntity, Model from juju.client.overrides import Delta - +from juju.status import derive_status +from juju.application import Application +from websockets.exceptions import ConnectionClosed import logging logger = logging.getLogger("__main__") +def status(application: Application) -> str: + unit_status = [] + for unit in application.units: + unit_status.append(unit.workload_status) + return derive_status(unit_status) + + +def entity_ready(entity: ModelEntity) -> bool: + entity_type = entity.entity_type + if entity_type == "machine": + return entity.agent_status in ["started"] + elif entity_type == "action": + return entity.status in ["completed", "failed", "cancelled"] + elif entity_type == "application": + # Workaround for bug: https://github.com/juju/python-libjuju/issues/441 + return status(entity) in ["active", "blocked"] + else: + raise EntityInvalidException("Unknown entity type: {}".format(entity_type)) + + class JujuModelWatcher: + @staticmethod + async def wait_for_model(model: Model, timeout: float = 3600): + """ + Wait for all entities in model to reach its final state. + + :param: model: Model to observe + :param: timeout: Timeout for the model applications to be active + + :raises: asyncio.TimeoutError when timeout reaches + """ + + if timeout is None: + timeout = 3600.0 + + # Coroutine to wait until the entity reaches the final state + wait_for_entity = asyncio.ensure_future( + asyncio.wait_for( + model.block_until( + lambda: all( + entity_ready(entity) for entity in model.applications.values() + ) + ), + timeout=timeout, + ) + ) + + tasks = [wait_for_entity] + try: + await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED) + finally: + # Cancel tasks + for task in tasks: + task.cancel() + @staticmethod async def wait_for( - model, + model: Model, entity: ModelEntity, progress_timeout: float = 3600, total_timeout: float = 3600, @@ -54,21 +109,14 @@ class JujuModelWatcher: if total_timeout is None: total_timeout = 3600.0 - entity_type = EntityType.get_entity(type(entity)) - if entity_type not in FinalStatus: - raise EntityInvalidException("Entity type not found") - - # Get final states - final_states = FinalStatus[entity_type].status - field_to_check = FinalStatus[entity_type].field + entity_type = entity.entity_type + if entity_type not in ["application", "action", "machine"]: + raise EntityInvalidException("Unknown entity type: {}".format(entity_type)) # Coroutine to wait until the entity reaches the final state wait_for_entity = asyncio.ensure_future( asyncio.wait_for( - model.block_until( - lambda: entity.__getattribute__(field_to_check) in final_states - ), - timeout=total_timeout, + model.block_until(lambda: entity_ready(entity)), timeout=total_timeout, ) ) @@ -89,8 +137,6 @@ class JujuModelWatcher: # Execute tasks, and stop when the first is finished # The watcher task won't never finish (unless it timeouts) await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED) - except Exception as e: - raise e finally: # Cancel tasks for task in tasks: @@ -100,7 +146,7 @@ class JujuModelWatcher: async def model_watcher( model: Model, entity_id: str, - entity_type: EntityType, + entity_type: str, timeout: float, db_dict: dict = None, n2vc: N2VCConnector = None, @@ -110,7 +156,7 @@ class JujuModelWatcher: :param: model: Model to observe :param: entity_id: ID of the entity to be observed - :param: entity_type: EntityType (p.e. .APPLICATION, .MACHINE, and .ACTION) + :param: entity_type: Entity Type (p.e. "application", "machine, and "action") :param: timeout: Maximum time between two updates in the model :param: db_dict: Dictionary with data of the DB to write the updates :param: n2vc: N2VC Connector objector @@ -122,86 +168,94 @@ class JujuModelWatcher: # Genenerate array with entity types to listen entity_types = ( - [entity_type, EntityType.UNIT] - if entity_type == EntityType.APPLICATION # TODO: Add .ACTION too + [entity_type, "unit"] + if entity_type == "application" # TODO: Add "action" too else [entity_type] ) # Get time when it should timeout timeout_end = time.time() + timeout - while True: - change = await allwatcher.Next() - for delta in change.deltas: - write = False - delta_entity = None - - # Get delta EntityType - delta_entity = EntityType.get_entity_from_delta(delta.entity) - - if delta_entity in entity_types: - # Get entity id - if entity_type == EntityType.APPLICATION: - id = ( - delta.data["application"] - if delta_entity == EntityType.UNIT - else delta.data["name"] - ) - else: - id = delta.data["id"] - - # Write if the entity id match - write = True if id == entity_id else False - - # Update timeout - timeout_end = time.time() + timeout - (status, status_message, vca_status) = JujuModelWatcher.get_status( - delta, entity_type=delta_entity - ) + try: + while True: + change = await allwatcher.Next() + for delta in change.deltas: + write = False + delta_entity = None + + # Get delta EntityType + delta_entity = delta.entity + + if delta_entity in entity_types: + # Get entity id + if entity_type == "application": + id = ( + delta.data["application"] + if delta_entity == "unit" + else delta.data["name"] + ) + else: + id = delta.data["id"] + + # Write if the entity id match + write = True if id == entity_id else False + + # Update timeout + timeout_end = time.time() + timeout + ( + status, + status_message, + vca_status, + ) = JujuModelWatcher.get_status(delta) - if write and n2vc is not None and db_dict: - # Write status to DB - status = n2vc.osm_status(delta_entity, status) - await n2vc.write_app_status_to_db( - db_dict=db_dict, - status=status, - detailed_status=status_message, - vca_status=vca_status, - entity_type=delta_entity.value.__name__.lower(), - ) - # Check if timeout - if time.time() > timeout_end: - raise asyncio.TimeoutError() + if write and n2vc is not None and db_dict: + # Write status to DB + status = n2vc.osm_status(delta_entity, status) + await n2vc.write_app_status_to_db( + db_dict=db_dict, + status=status, + detailed_status=status_message, + vca_status=vca_status, + entity_type=delta_entity, + ) + # Check if timeout + if time.time() > timeout_end: + raise asyncio.TimeoutError() + except ConnectionClosed: + pass + # This is expected to happen when the + # entity reaches its final state, because + # the model connection is closed afterwards @staticmethod - def get_status(delta: Delta, entity_type: EntityType) -> (str, str, str): + def get_status(delta: Delta) -> (str, str, str): """ Get status from delta :param: delta: Delta generated by the allwatcher - :param: entity_type: EntityType (p.e. .APPLICATION, .MACHINE, and .ACTION) + :param: entity_type: Entity Type (p.e. "application", "machine, and "action") :return (status, message, vca_status) """ - if entity_type == EntityType.MACHINE: + if delta.entity == "machine": return ( delta.data["agent-status"]["current"], delta.data["instance-status"]["message"], delta.data["instance-status"]["current"], ) - elif entity_type == EntityType.ACTION: + elif delta.entity == "action": return ( delta.data["status"], delta.data["status"], delta.data["status"], ) - elif entity_type == EntityType.APPLICATION: + elif delta.entity == "application": return ( delta.data["status"]["current"], delta.data["status"]["message"], delta.data["status"]["current"], ) - elif entity_type == EntityType.UNIT: + elif delta.entity == "unit": return ( delta.data["workload-status"]["current"], delta.data["workload-status"]["message"],