Enable lint, flake8 and unit tests
Cleans up non pep compliant code.
Adds a simple unit test.
Formats according to black.
Tox automatically runs lint, flake8 and unit test suite
with coverage. To run each individually, execute:
tox -e pylint
tox -e black
tox -e flake8
tox -e cover
Note that these are all run for each patch via Jenkins. The full
tox suite should be run locally before any commit to ensure it
will not fail in Jenkins.
Change-Id: I2f87abe3d5086d6d65ac33a27780c498fc7b1cd3
Signed-off-by: beierlm <mark.beierl@canonical.com>
diff --git a/n2vc/juju_observer.py b/n2vc/juju_observer.py
index e2f0470..7ed3dee 100644
--- a/n2vc/juju_observer.py
+++ b/n2vc/juju_observer.py
@@ -23,13 +23,13 @@
import asyncio
import time
-from juju.model import ModelObserver, Model
-from juju.machine import Machine
-from juju.application import Application
from juju.action import Action
+from juju.application import Application
+from juju.machine import Machine
+from juju.model import ModelObserver, Model
-from n2vc.n2vc_conn import N2VCConnector, juju_status_2_osm_status
from n2vc.exceptions import N2VCTimeoutException
+from n2vc.n2vc_conn import N2VCConnector, juju_status_2_osm_status
class _Entity:
@@ -42,7 +42,6 @@
class JujuModelObserver(ModelObserver):
-
def __init__(self, n2vc: N2VCConnector, model: Model):
self.n2vc = n2vc
self.model = model
@@ -54,11 +53,14 @@
def register_machine(self, machine: Machine, db_dict: dict):
try:
entity_id = machine.entity_id
- except Exception as e:
+ except Exception:
# no entity_id aatribute, try machine attribute
entity_id = machine.machine
- # self.n2vc.debug(msg='Registering machine for change notifications: {}'.format(entity_id))
- entity = _Entity(entity_id=entity_id, entity_type='machine', obj=machine, db_dict=db_dict)
+ # self.n2vc.debug(
+ # msg='Registering machine for change notifications: {}'.format(entity_id))
+ entity = _Entity(
+ entity_id=entity_id, entity_type="machine", obj=machine, db_dict=db_dict
+ )
self.machines[entity_id] = entity
def unregister_machine(self, machine_id: str):
@@ -70,8 +72,14 @@
def register_application(self, application: Application, db_dict: dict):
entity_id = application.entity_id
- # self.n2vc.debug(msg='Registering application for change notifications: {}'.format(entity_id))
- entity = _Entity(entity_id=entity_id, entity_type='application', obj=application, db_dict=db_dict)
+ # self.n2vc.debug(
+ # msg='Registering application for change notifications: {}'.format(entity_id))
+ entity = _Entity(
+ entity_id=entity_id,
+ entity_type="application",
+ obj=application,
+ db_dict=db_dict,
+ )
self.applications[entity_id] = entity
def unregister_application(self, application_id: str):
@@ -83,8 +91,11 @@
def register_action(self, action: Action, db_dict: dict):
entity_id = action.entity_id
- # self.n2vc.debug(msg='Registering action for changes notifications: {}'.format(entity_id))
- entity = _Entity(entity_id=entity_id, entity_type='action', obj=action, db_dict=db_dict)
+ # self.n2vc.debug(
+ # msg='Registering action for changes notifications: {}'.format(entity_id))
+ entity = _Entity(
+ entity_id=entity_id, entity_type="action", obj=action, db_dict=db_dict
+ )
self.actions[entity_id] = entity
def unregister_action(self, action_id: str):
@@ -95,74 +106,81 @@
return action_id in self.actions
async def wait_for_machine(
- self,
- machine_id: str,
- progress_timeout: float = None,
- total_timeout: float = None) -> int:
+ self,
+ machine_id: str,
+ progress_timeout: float = None,
+ total_timeout: float = None,
+ ) -> int:
if not self.is_machine_registered(machine_id):
return
- self.n2vc.debug('Waiting for machine completed: {}'.format(machine_id))
+ self.n2vc.debug("Waiting for machine completed: {}".format(machine_id))
# wait for a final state
entity = self.machines[machine_id]
return await self._wait_for_entity(
entity=entity,
- field_to_check='agent_status',
- final_states_list=['started'],
+ field_to_check="agent_status",
+ final_states_list=["started"],
progress_timeout=progress_timeout,
- total_timeout=total_timeout)
+ total_timeout=total_timeout,
+ )
async def wait_for_application(
- self,
- application_id: str,
- progress_timeout: float = None,
- total_timeout: float = None) -> int:
+ self,
+ application_id: str,
+ progress_timeout: float = None,
+ total_timeout: float = None,
+ ) -> int:
if not self.is_application_registered(application_id):
return
- self.n2vc.debug('Waiting for application completed: {}'.format(application_id))
+ self.n2vc.debug("Waiting for application completed: {}".format(application_id))
# application statuses: unknown, active, waiting
# wait for a final state
entity = self.applications[application_id]
return await self._wait_for_entity(
entity=entity,
- field_to_check='status',
- final_states_list=['active', 'blocked'],
+ field_to_check="status",
+ final_states_list=["active", "blocked"],
progress_timeout=progress_timeout,
- total_timeout=total_timeout)
+ total_timeout=total_timeout,
+ )
async def wait_for_action(
- self,
- action_id: str,
- progress_timeout: float = None,
- total_timeout: float = None) -> int:
+ self,
+ action_id: str,
+ progress_timeout: float = None,
+ total_timeout: float = None,
+ ) -> int:
if not self.is_action_registered(action_id):
return
- self.n2vc.debug('Waiting for action completed: {}'.format(action_id))
+ self.n2vc.debug("Waiting for action completed: {}".format(action_id))
# action statuses: pending, running, completed, failed, cancelled
# wait for a final state
entity = self.actions[action_id]
return await self._wait_for_entity(
entity=entity,
- field_to_check='status',
- final_states_list=['completed', 'failed', 'cancelled'],
+ field_to_check="status",
+ final_states_list=["completed", "failed", "cancelled"],
progress_timeout=progress_timeout,
- total_timeout=total_timeout)
+ total_timeout=total_timeout,
+ )
async def _wait_for_entity(
- self,
- entity: _Entity,
- field_to_check: str,
- final_states_list: list,
- progress_timeout: float = None,
- total_timeout: float = None) -> int:
+ self,
+ entity: _Entity,
+ field_to_check: str,
+ final_states_list: list,
+ progress_timeout: float = None,
+ total_timeout: float = None,
+ ) -> int:
# default values for no timeout
if total_timeout is None:
@@ -176,8 +194,10 @@
if now >= total_end:
raise N2VCTimeoutException(
- message='Total timeout {} seconds, {}: {}'.format(total_timeout, entity.entity_type, entity.entity_id),
- timeout='total'
+ message="Total timeout {} seconds, {}: {}".format(
+ total_timeout, entity.entity_type, entity.entity_id
+ ),
+ timeout="total",
)
# update next progress timeout
@@ -195,10 +215,11 @@
if await _wait_for_event_or_timeout(entity.event, next_timeout):
entity.event.clear()
else:
- message = 'Progress timeout {} seconds, {}}: {}'\
- .format(progress_timeout, entity.entity_type, entity.entity_id)
+ message = "Progress timeout {} seconds, {}}: {}".format(
+ progress_timeout, entity.entity_type, entity.entity_id
+ )
self.n2vc.debug(message)
- raise N2VCTimeoutException(message=message, timeout='progress')
+ raise N2VCTimeoutException(message=message, timeout="progress")
# self.n2vc.debug('End of wait. Final state: {}, retries: {}'
# .format(entity.obj.__getattribute__(field_to_check), retries))
return retries
@@ -212,7 +233,7 @@
# self.n2vc.debug('on_change(): type: {}, entity: {}, id: {}'
# .format(delta.type, delta.entity, new.entity_id))
- if delta.entity == 'machine':
+ if delta.entity == "machine":
# check registered machine
if new.entity_id not in self.machines:
@@ -224,13 +245,13 @@
status=juju_status_2_osm_status(delta.entity, new.agent_status),
detailed_status=new.status_message,
vca_status=new.status,
- entity_type='machine'
+ entity_type="machine",
)
# set event for this machine
self.machines[new.entity_id].event.set()
- elif delta.entity == 'application':
+ elif delta.entity == "application":
# check registered application
if new.entity_id not in self.applications:
@@ -242,16 +263,16 @@
status=juju_status_2_osm_status(delta.entity, new.status),
detailed_status=new.status_message,
vca_status=new.status,
- entity_type='application'
+ entity_type="application",
)
# set event for this application
self.applications[new.entity_id].event.set()
- elif delta.entity == 'unit':
+ elif delta.entity == "unit":
# get the application for this unit
- application_id = delta.data['application']
+ application_id = delta.data["application"]
# check registered application
if application_id not in self.applications:
@@ -264,13 +285,13 @@
status=juju_status_2_osm_status(delta.entity, new.workload_status),
detailed_status=new.workload_status_message,
vca_status=new.workload_status,
- entity_type='unit'
+ entity_type="unit",
)
# set event for this application
self.applications[application_id].event.set()
- elif delta.entity == 'action':
+ elif delta.entity == "action":
# check registered action
if new.entity_id not in self.actions:
@@ -282,7 +303,7 @@
status=juju_status_2_osm_status(delta.entity, new.status),
detailed_status=new.status,
vca_status=new.status,
- entity_type='action'
+ entity_type="action",
)
# set event for this application