X-Git-Url: https://osm.etsi.org/gitweb/?p=osm%2FN2VC.git;a=blobdiff_plain;f=n2vc%2Fjuju_observer.py;h=29ae93294e4fbd72c18dc00e9d8489586013a4ef;hp=ac40f34dcb122b43ed0de5580fdd7daefb25ee24;hb=8bfcc14713a71f43f155e3cddec168380134d344;hpb=2911434d2a0e24292c73f640f5df4cac9c447867 diff --git a/n2vc/juju_observer.py b/n2vc/juju_observer.py index ac40f34..29ae932 100644 --- a/n2vc/juju_observer.py +++ b/n2vc/juju_observer.py @@ -23,13 +23,13 @@ import asyncio import time -from juju.model import ModelObserver, Model -from juju.machine import Machine -from juju.application import Application from juju.action import Action +from juju.application import Application +from juju.machine import Machine +from juju.model import ModelObserver, Model -from n2vc.n2vc_conn import N2VCConnector, juju_status_2_osm_status from n2vc.exceptions import N2VCTimeoutException +from n2vc.n2vc_conn import N2VCConnector, juju_status_2_osm_status class _Entity: @@ -42,7 +42,6 @@ class _Entity: class JujuModelObserver(ModelObserver): - def __init__(self, n2vc: N2VCConnector, model: Model): self.n2vc = n2vc self.model = model @@ -52,8 +51,16 @@ class JujuModelObserver(ModelObserver): self.actions = dict() def register_machine(self, machine: Machine, db_dict: dict): - entity_id = machine.entity_id - entity = _Entity(entity_id=entity_id, entity_type='machine', obj=machine, db_dict=db_dict) + try: + entity_id = machine.entity_id + except Exception: + # no entity_id aatribute, try machine attribute + entity_id = machine.machine + # self.n2vc.debug( + # msg='Registering machine for change notifications: {}'.format(entity_id)) + entity = _Entity( + entity_id=entity_id, entity_type="machine", obj=machine, db_dict=db_dict + ) self.machines[entity_id] = entity def unregister_machine(self, machine_id: str): @@ -65,7 +72,14 @@ class JujuModelObserver(ModelObserver): def register_application(self, application: Application, db_dict: dict): entity_id = application.entity_id - entity = _Entity(entity_id=entity_id, entity_type='application', obj=application, db_dict=db_dict) + # self.n2vc.debug( + # msg='Registering application for change notifications: {}'.format(entity_id)) + entity = _Entity( + entity_id=entity_id, + entity_type="application", + obj=application, + db_dict=db_dict, + ) self.applications[entity_id] = entity def unregister_application(self, application_id: str): @@ -77,7 +91,11 @@ class JujuModelObserver(ModelObserver): def register_action(self, action: Action, db_dict: dict): entity_id = action.entity_id - entity = _Entity(entity_id=entity_id, entity_type='action', obj=action, db_dict=db_dict) + # self.n2vc.debug( + # msg='Registering action for changes notifications: {}'.format(entity_id)) + entity = _Entity( + entity_id=entity_id, entity_type="action", obj=action, db_dict=db_dict + ) self.actions[entity_id] = entity def unregister_action(self, action_id: str): @@ -88,74 +106,87 @@ class JujuModelObserver(ModelObserver): return action_id in self.actions async def wait_for_machine( - self, - machine_id: str, - progress_timeout: float = None, - total_timeout: float = None) -> int: + self, + machine_id: str, + progress_timeout: float = None, + total_timeout: float = None, + ) -> int: if not self.is_machine_registered(machine_id): return + self.n2vc.debug("Waiting for machine completed: {}".format(machine_id)) + # wait for a final state entity = self.machines[machine_id] return await self._wait_for_entity( entity=entity, - field_to_check='agent_status', - final_states_list=['started'], + field_to_check="agent_status", + final_states_list=["started"], progress_timeout=progress_timeout, - total_timeout=total_timeout) + total_timeout=total_timeout, + ) async def wait_for_application( - self, - application_id: str, - progress_timeout: float = None, - total_timeout: float = None) -> int: + self, + application_id: str, + progress_timeout: float = None, + total_timeout: float = None, + ) -> int: if not self.is_application_registered(application_id): return + self.n2vc.debug("Waiting for application completed: {}".format(application_id)) + # application statuses: unknown, active, waiting # wait for a final state entity = self.applications[application_id] return await self._wait_for_entity( entity=entity, - field_to_check='status', - final_states_list=['active', 'blocked'], + field_to_check="status", + final_states_list=["active", "blocked"], progress_timeout=progress_timeout, - total_timeout=total_timeout) + total_timeout=total_timeout, + ) async def wait_for_action( - self, - action_id: str, - progress_timeout: float = None, - total_timeout: float = None) -> int: + self, + action_id: str, + progress_timeout: float = None, + total_timeout: float = None, + ) -> int: if not self.is_action_registered(action_id): return + self.n2vc.debug("Waiting for action completed: {}".format(action_id)) + # action statuses: pending, running, completed, failed, cancelled # wait for a final state entity = self.actions[action_id] return await self._wait_for_entity( entity=entity, - field_to_check='status', - final_states_list=['completed', 'failed', 'cancelled'], + field_to_check="status", + final_states_list=["completed", "failed", "cancelled"], progress_timeout=progress_timeout, - total_timeout=total_timeout) + total_timeout=total_timeout, + ) async def _wait_for_entity( - self, - entity: _Entity, - field_to_check: str, - final_states_list: list, - progress_timeout: float = None, - total_timeout: float = None) -> int: + self, + entity: _Entity, + field_to_check: str, + final_states_list: list, + progress_timeout: float = None, + total_timeout: float = None, + ) -> int: # default values for no timeout if total_timeout is None: - total_timeout = 100000 + total_timeout = 3600 if progress_timeout is None: - progress_timeout = 100000 + progress_timeout = 3600 # max end time now = time.time() @@ -163,8 +194,10 @@ class JujuModelObserver(ModelObserver): if now >= total_end: raise N2VCTimeoutException( - message='Total timeout {} seconds, {}: {}'.format(total_timeout, entity.entity_type, entity.entity_id), - timeout='total' + message="Total timeout {} seconds, {}: {}".format( + total_timeout, entity.entity_type, entity.entity_id + ), + timeout="total", ) # update next progress timeout @@ -182,12 +215,13 @@ class JujuModelObserver(ModelObserver): if await _wait_for_event_or_timeout(entity.event, next_timeout): entity.event.clear() else: - message = 'Progress timeout {} seconds, {}}: {}'\ - .format(progress_timeout, entity.entity_type, entity.entity_id) + message = "Progress timeout {} seconds, {}: {}".format( + progress_timeout, entity.entity_type, entity.entity_id + ) self.n2vc.debug(message) - raise N2VCTimeoutException(message=message, timeout='progress') - self.n2vc.debug('End of wait. Final state: {}, retries: {}' - .format(entity.obj.__getattribute__(field_to_check), retries)) + raise N2VCTimeoutException(message=message, timeout="progress") + # self.n2vc.debug('End of wait. Final state: {}, retries: {}' + # .format(entity.obj.__getattribute__(field_to_check), retries)) return retries async def on_change(self, delta, old, new, model): @@ -196,10 +230,10 @@ class JujuModelObserver(ModelObserver): return # log - self.n2vc.debug('on_change(): type: {}, entity: {}, id: {}' - .format(delta.type, delta.entity, new.entity_id)) + # self.n2vc.debug('on_change(): type: {}, entity: {}, id: {}' + # .format(delta.type, delta.entity, new.entity_id)) - if delta.entity == 'machine': + if delta.entity == "machine": # check registered machine if new.entity_id not in self.machines: @@ -211,13 +245,13 @@ class JujuModelObserver(ModelObserver): status=juju_status_2_osm_status(delta.entity, new.agent_status), detailed_status=new.status_message, vca_status=new.status, - entity_type='machine' + entity_type="machine", ) # set event for this machine self.machines[new.entity_id].event.set() - elif delta.entity == 'application': + elif delta.entity == "application": # check registered application if new.entity_id not in self.applications: @@ -229,34 +263,35 @@ class JujuModelObserver(ModelObserver): status=juju_status_2_osm_status(delta.entity, new.status), detailed_status=new.status_message, vca_status=new.status, - entity_type='application' + entity_type="application", ) # set event for this application self.applications[new.entity_id].event.set() - elif delta.entity == 'unit': + elif delta.entity == "unit": # get the application for this unit - application_id = delta.data['application'] + application_id = delta.data["application"] # check registered application if application_id not in self.applications: return # write change in database - await self.n2vc.write_app_status_to_db( - db_dict=self.applications[application_id].db_dict, - status=juju_status_2_osm_status(delta.entity, new.workload_status), - detailed_status=new.workload_status_message, - vca_status=new.workload_status, - entity_type='unit' - ) + if not new.dead: + await self.n2vc.write_app_status_to_db( + db_dict=self.applications[application_id].db_dict, + status=juju_status_2_osm_status(delta.entity, new.workload_status), + detailed_status=new.workload_status_message, + vca_status=new.workload_status, + entity_type="unit", + ) # set event for this application self.applications[application_id].event.set() - elif delta.entity == 'action': + elif delta.entity == "action": # check registered action if new.entity_id not in self.actions: @@ -268,7 +303,7 @@ class JujuModelObserver(ModelObserver): status=juju_status_2_osm_status(delta.entity, new.status), detailed_status=new.status, vca_status=new.status, - entity_type='action' + entity_type="action", ) # set event for this application