From 4584f8e86a492d67d120bfea1195eff1475c0a65 Mon Sep 17 00:00:00 2001 From: garciadeblas Date: Fri, 14 May 2021 16:50:06 +0200 Subject: [PATCH] Reformat POL to standardized format Change-Id: I955a76830f582b6d152e242568d74a12907c782c Signed-off-by: garciadeblas --- osm_policy_module/alarming/service.py | 132 ++-- osm_policy_module/autoscaling/service.py | 372 ++++++---- osm_policy_module/cmd/policy_module_agent.py | 18 +- .../cmd/policy_module_healthcheck.py | 14 +- osm_policy_module/common/common_db_client.py | 45 +- osm_policy_module/common/lcm_client.py | 42 +- .../common/message_bus_client.py | 10 +- osm_policy_module/common/mon_client.py | 170 +++-- osm_policy_module/core/agent.py | 66 +- osm_policy_module/core/config.py | 10 +- osm_policy_module/core/database.py | 41 +- osm_policy_module/migrations/001_initial.py | 37 +- .../migrations/002_add_vnf_alarm.py | 13 +- .../migrations/003_add_fields_to_policy.py | 14 +- .../migrations/004_add_fields_to_alarm.py | 8 +- .../005_change_vnf_index_member_to_str.py | 16 +- osm_policy_module/migrations/conf.py | 2 +- .../tests/integration/test_kafka_messages.py | 35 +- .../tests/integration/test_policy_agent.py | 658 +++++++++--------- .../unit/alarming/test_alarming_service.py | 55 +- .../autoscaling/test_autoscaling_service.py | 269 ++++--- .../unit/common/test_message_bus_client.py | 29 +- .../tests/unit/core/test_policy_agent.py | 101 +-- .../tests/unit/utils/test_vnfd_utils.py | 20 +- osm_policy_module/utils/vnfd.py | 17 +- setup.py | 21 +- tox.ini | 1 + 27 files changed, 1290 insertions(+), 926 deletions(-) diff --git a/osm_policy_module/alarming/service.py b/osm_policy_module/alarming/service.py index 55fe864..37fa6a1 100644 --- a/osm_policy_module/alarming/service.py +++ b/osm_policy_module/alarming/service.py @@ -34,14 +34,17 @@ from osm_policy_module.common.lcm_client import LcmClient from osm_policy_module.common.mon_client import MonClient from osm_policy_module.core import database from osm_policy_module.core.config import Config -from osm_policy_module.core.database import VnfAlarm, VnfAlarmRepository, AlarmActionRepository +from osm_policy_module.core.database import ( + VnfAlarm, + VnfAlarmRepository, + AlarmActionRepository, +) from osm_policy_module.core.exceptions import VdurNotFound log = logging.getLogger(__name__) class AlarmingService: - def __init__(self, config: Config, loop=None): self.conf = config if not loop: @@ -60,59 +63,71 @@ class AlarmingService: vnfrs = self.db_client.get_vnfrs(nsr_id) for vnfr in vnfrs: log.debug("Processing vnfr: %s", vnfr) - vnfd = self.db_client.get_vnfd(vnfr['vnfd-id']) - for vdur in vnfr['vdur']: + vnfd = self.db_client.get_vnfd(vnfr["vnfd-id"]) + for vdur in vnfr["vdur"]: vdu = next( filter( - lambda vdu: vdu['id'] == vdur['vdu-id-ref'], - vnfd['vdu'] + lambda vdu: vdu["id"] == vdur["vdu-id-ref"], vnfd["vdu"] ) ) - if 'alarm' in vdu: - alarm_descriptors = vdu['alarm'] + if "alarm" in vdu: + alarm_descriptors = vdu["alarm"] for alarm_descriptor in alarm_descriptors: try: VnfAlarmRepository.get( - VnfAlarm.alarm_id == alarm_descriptor['alarm-id'], - VnfAlarm.vnf_member_index == vnfr['member-vnf-index-ref'], - VnfAlarm.vdu_name == vdur['name'], - VnfAlarm.nsr_id == nsr_id + VnfAlarm.alarm_id + == alarm_descriptor["alarm-id"], + VnfAlarm.vnf_member_index + == vnfr["member-vnf-index-ref"], + VnfAlarm.vdu_name == vdur["name"], + VnfAlarm.nsr_id == nsr_id, + ) + log.debug( + "vdu %s already has an alarm configured with same id %s", + vdur["name"], + alarm_descriptor["alarm-id"], ) - log.debug("vdu %s already has an alarm configured with same id %s", vdur['name'], - alarm_descriptor['alarm-id']) continue except VnfAlarm.DoesNotExist: pass vnf_monitoring_param = next( filter( - lambda param: param['id'] == alarm_descriptor['vnf-monitoring-param-ref'], - vdu.get('monitoring-parameter', []) + lambda param: param["id"] + == alarm_descriptor["vnf-monitoring-param-ref"], + vdu.get("monitoring-parameter", []), ), - {} + {}, + ) + metric_name = self._get_metric_name( + vnf_monitoring_param ) - metric_name = self._get_metric_name(vnf_monitoring_param) alarm_uuid = await self.mon_client.create_alarm( metric_name=metric_name, ns_id=nsr_id, - vdu_name=vdur['name'], - vnf_member_index=vnfr['member-vnf-index-ref'], - threshold=alarm_descriptor['value'], - operation=alarm_descriptor['operation'] + vdu_name=vdur["name"], + vnf_member_index=vnfr["member-vnf-index-ref"], + threshold=alarm_descriptor["value"], + operation=alarm_descriptor["operation"], ) alarm = VnfAlarmRepository.create( - alarm_id=alarm_descriptor['alarm-id'], + alarm_id=alarm_descriptor["alarm-id"], alarm_uuid=alarm_uuid, nsr_id=nsr_id, - vnf_member_index=vnfr['member-vnf-index-ref'], - vdu_name=vdur['name'] + vnf_member_index=vnfr["member-vnf-index-ref"], + vdu_name=vdur["name"], ) - for action_type in ['ok', 'insufficient-data', 'alarm']: - if 'actions' in alarm_descriptor and action_type in alarm_descriptor['actions']: - for url in alarm_descriptor['actions'][action_type]: + for action_type in ["ok", "insufficient-data", "alarm"]: + if ( + "actions" in alarm_descriptor + and action_type in alarm_descriptor["actions"] + ): + for url in alarm_descriptor["actions"][ + action_type + ]: AlarmActionRepository.create( type=action_type, - url=url['url'], - alarm=alarm + url=url["url"], + alarm=alarm, ) alarms_created.append(alarm) @@ -122,12 +137,16 @@ class AlarmingService: log.debug("Cleaning alarm resources in MON") for alarm in alarms_created: try: - await self.mon_client.delete_alarm(alarm.nsr_id, - alarm.vnf_member_index, - alarm.vdu_name, - alarm.alarm_uuid) + await self.mon_client.delete_alarm( + alarm.nsr_id, + alarm.vnf_member_index, + alarm.vdu_name, + alarm.alarm_uuid, + ) except ValueError: - log.exception("Error deleting alarm in MON %s", alarm.alarm_uuid) + log.exception( + "Error deleting alarm in MON %s", alarm.alarm_uuid + ) raise e finally: database.db.close() @@ -140,7 +159,9 @@ class AlarmingService: with database.db.atomic(): for alarm in VnfAlarmRepository.list(VnfAlarm.nsr_id == nsr_id): try: - self.db_client.get_vdur(nsr_id, alarm.vnf_member_index, alarm.vdu_name) + self.db_client.get_vdur( + nsr_id, alarm.vnf_member_index, alarm.vdu_name + ) except VdurNotFound: log.debug("Deleting orphaned alarm %s", alarm.alarm_uuid) try: @@ -148,9 +169,12 @@ class AlarmingService: alarm.nsr_id, alarm.vnf_member_index, alarm.vdu_name, - alarm.alarm_uuid) + alarm.alarm_uuid, + ) except ValueError: - log.exception("Error deleting alarm in MON %s", alarm.alarm_uuid) + log.exception( + "Error deleting alarm in MON %s", alarm.alarm_uuid + ) alarm.delete_instance() except Exception as e: log.exception("Error deleting orphaned alarms:") @@ -170,9 +194,12 @@ class AlarmingService: alarm.nsr_id, alarm.vnf_member_index, alarm.vdu_name, - alarm.alarm_uuid) + alarm.alarm_uuid, + ) except ValueError: - log.exception("Error deleting alarm in MON %s", alarm.alarm_uuid) + log.exception( + "Error deleting alarm in MON %s", alarm.alarm_uuid + ) alarm.delete_instance() except Exception as e: @@ -186,22 +213,33 @@ class AlarmingService: try: with database.db.atomic(): alarm = VnfAlarmRepository.get(VnfAlarm.alarm_uuid == alarm_uuid) - log.debug("Handling vnf alarm %s with status %s", alarm.alarm_id, status) + log.debug( + "Handling vnf alarm %s with status %s", alarm.alarm_id, status + ) for action in alarm.actions: if action.type == status: - log.info("Executing request to url %s for vnf alarm %s with status %s", action.url, - alarm.alarm_id, status) + log.info( + "Executing request to url %s for vnf alarm %s with status %s", + action.url, + alarm.alarm_id, + status, + ) try: requests.post(url=action.url, json=json.dumps(payload)) except ConnectionError: log.exception("Error connecting to url %s", action.url) except VnfAlarm.DoesNotExist: - log.debug("There is no alarming action configured for alarm %s.", alarm_uuid) + log.debug( + "There is no alarming action configured for alarm %s.", alarm_uuid + ) finally: database.db.close() def _get_metric_name(self, vnf_monitoring_param: dict): - if 'performance-metric' in vnf_monitoring_param: - return vnf_monitoring_param['performance-metric'] - raise ValueError('No metric name found for vnf_monitoring_param %s' % vnf_monitoring_param['id']) + if "performance-metric" in vnf_monitoring_param: + return vnf_monitoring_param["performance-metric"] + raise ValueError( + "No metric name found for vnf_monitoring_param %s" + % vnf_monitoring_param["id"] + ) diff --git a/osm_policy_module/autoscaling/service.py b/osm_policy_module/autoscaling/service.py index 57756b7..2709f64 100644 --- a/osm_policy_module/autoscaling/service.py +++ b/osm_policy_module/autoscaling/service.py @@ -32,15 +32,22 @@ from osm_policy_module.common.lcm_client import LcmClient from osm_policy_module.common.mon_client import MonClient from osm_policy_module.core import database from osm_policy_module.core.config import Config -from osm_policy_module.core.database import ScalingGroup, ScalingAlarm, ScalingPolicy, ScalingCriteria, \ - ScalingAlarmRepository, ScalingGroupRepository, ScalingPolicyRepository, ScalingCriteriaRepository +from osm_policy_module.core.database import ( + ScalingGroup, + ScalingAlarm, + ScalingPolicy, + ScalingCriteria, + ScalingAlarmRepository, + ScalingGroupRepository, + ScalingPolicyRepository, + ScalingCriteriaRepository, +) from osm_policy_module.core.exceptions import VdurNotFound log = logging.getLogger(__name__) class AutoscalingService: - def __init__(self, config: Config, loop=None): self.conf = config if not loop: @@ -56,8 +63,9 @@ class AutoscalingService: :param nsr_id: Network service record id :return: """ - log.info("Configuring scaling groups for network service with nsr_id: %s", - nsr_id) + log.info( + "Configuring scaling groups for network service with nsr_id: %s", nsr_id + ) alarms_created = [] database.db.connect() try: @@ -66,97 +74,156 @@ class AutoscalingService: vnfrs = self.db_client.get_vnfrs(nsr_id) for vnfr in vnfrs: log.debug("Processing vnfr: %s", vnfr) - vnfd = self.db_client.get_vnfd(vnfr['vnfd-id']) + vnfd = self.db_client.get_vnfd(vnfr["vnfd-id"]) # TODO: Change for multiple DF support - df = vnfd.get('df', [{}])[0] - if 'scaling-aspect' not in df: + df = vnfd.get("df", [{}])[0] + if "scaling-aspect" not in df: log.debug("No scaling aspect present in vnfd") continue # TODO: Change for multiple instantiation levels support - scaling_aspects = df['scaling-aspect'] - all_vnfd_monitoring_params = self._get_all_vnfd_monitoring_params(vnfd) + scaling_aspects = df["scaling-aspect"] + all_vnfd_monitoring_params = ( + self._get_all_vnfd_monitoring_params(vnfd) + ) for scaling_aspect in scaling_aspects: - scaling_group_record = self._get_or_create_scaling_group(nsr_id, - vnfr['member-vnf-index-ref'], - scaling_aspect) - vdurs = self._get_monitored_vdurs(scaling_aspect, vnfr['vdur']) - for scaling_policy in scaling_aspect.get('scaling-policy', ()): - if scaling_policy['scaling-type'] != 'automatic': + scaling_group_record = self._get_or_create_scaling_group( + nsr_id, vnfr["member-vnf-index-ref"], scaling_aspect + ) + vdurs = self._get_monitored_vdurs( + scaling_aspect, vnfr["vdur"] + ) + for scaling_policy in scaling_aspect.get( + "scaling-policy", () + ): + if scaling_policy["scaling-type"] != "automatic": continue - scaling_policy_record = self._get_or_create_scaling_policy(nsr_id, - scaling_policy, - scaling_group_record) - - for scaling_criteria in scaling_policy['scaling-criteria']: - scaling_criteria_record = self._get_or_create_scaling_criteria( - nsr_id, - scaling_criteria, - scaling_policy_record + scaling_policy_record = ( + self._get_or_create_scaling_policy( + nsr_id, scaling_policy, scaling_group_record + ) + ) + + for scaling_criteria in scaling_policy[ + "scaling-criteria" + ]: + scaling_criteria_record = ( + self._get_or_create_scaling_criteria( + nsr_id, + scaling_criteria, + scaling_policy_record, + ) ) - monitoring_param_ref = scaling_criteria.get('vnf-monitoring-param-ref') - vnf_monitoring_param = all_vnfd_monitoring_params[monitoring_param_ref] + monitoring_param_ref = scaling_criteria.get( + "vnf-monitoring-param-ref" + ) + vnf_monitoring_param = all_vnfd_monitoring_params[ + monitoring_param_ref + ] for vdur in vdurs: - vdu_id = vdur['vdu-id-ref'] + vdu_id = vdur["vdu-id-ref"] log.debug("Creating alarm for vdur %s ", vdur) try: - ScalingAlarmRepository.get(ScalingAlarm.vdu_name == vdur['name'], - ScalingCriteria.name == scaling_criteria['name'], - ScalingPolicy.name == scaling_policy['name'], - ScalingGroup.nsr_id == nsr_id, - join_classes=[ScalingCriteria, - ScalingPolicy, - ScalingGroup]) - log.debug("vdu %s already has an alarm configured", vdur['name']) + ScalingAlarmRepository.get( + ScalingAlarm.vdu_name == vdur["name"], + ScalingCriteria.name + == scaling_criteria["name"], + ScalingPolicy.name + == scaling_policy["name"], + ScalingGroup.nsr_id == nsr_id, + join_classes=[ + ScalingCriteria, + ScalingPolicy, + ScalingGroup, + ], + ) + log.debug( + "vdu %s already has an alarm configured", + vdur["name"], + ) continue except ScalingAlarm.DoesNotExist: pass - metric_name = self._get_metric_name(vnf_monitoring_param) + metric_name = self._get_metric_name( + vnf_monitoring_param + ) db_nsr = self.db_client.get_nsr(nsr_id) nb_scale_op = 0 if db_nsr["_admin"].get("scaling-group"): - db_nsr_admin = db_nsr["_admin"]["scaling-group"] - for admin_scale_index, admin_scale_info in enumerate(db_nsr_admin): - if admin_scale_info["name"] == scaling_aspect["name"]: - nb_scale_op = admin_scale_info.get("nb-scale-op", 0) + db_nsr_admin = db_nsr["_admin"][ + "scaling-group" + ] + for ( + admin_scale_index, + admin_scale_info, + ) in enumerate(db_nsr_admin): + if ( + admin_scale_info["name"] + == scaling_aspect["name"] + ): + nb_scale_op = admin_scale_info.get( + "nb-scale-op", 0 + ) break min_instance_count = 1 - for vdu_profile in df.get('vdu-profile', ()): - if vdu_profile.get('id') == vdu_id: - min_instance_count = int(vdu_profile.get('min-number-of-instances ', 1)) + for vdu_profile in df.get("vdu-profile", ()): + if vdu_profile.get("id") == vdu_id: + min_instance_count = int( + vdu_profile.get( + "min-number-of-instances ", 1 + ) + ) break if nb_scale_op >= min_instance_count: - alarm_uuid = await self.mon_client.create_alarm( - metric_name=metric_name, - ns_id=nsr_id, - vdu_name=vdur['name'], - vnf_member_index=vnfr['member-vnf-index-ref'], - threshold=scaling_criteria['scale-in-threshold'], - operation=scaling_criteria['scale-in-relational-operation'] + alarm_uuid = ( + await self.mon_client.create_alarm( + metric_name=metric_name, + ns_id=nsr_id, + vdu_name=vdur["name"], + vnf_member_index=vnfr[ + "member-vnf-index-ref" + ], + threshold=scaling_criteria[ + "scale-in-threshold" + ], + operation=scaling_criteria[ + "scale-in-relational-operation" + ], + ) ) alarm = ScalingAlarmRepository.create( alarm_uuid=alarm_uuid, - action='scale_in', - vnf_member_index=vnfr['member-vnf-index-ref'], - vdu_name=vdur['name'], - scaling_criteria=scaling_criteria_record + action="scale_in", + vnf_member_index=vnfr[ + "member-vnf-index-ref" + ], + vdu_name=vdur["name"], + scaling_criteria=scaling_criteria_record, ) alarms_created.append(alarm) alarm_uuid = await self.mon_client.create_alarm( metric_name=metric_name, ns_id=nsr_id, - vdu_name=vdur['name'], - vnf_member_index=vnfr['member-vnf-index-ref'], - threshold=scaling_criteria['scale-out-threshold'], - operation=scaling_criteria['scale-out-relational-operation'] + vdu_name=vdur["name"], + vnf_member_index=vnfr[ + "member-vnf-index-ref" + ], + threshold=scaling_criteria[ + "scale-out-threshold" + ], + operation=scaling_criteria[ + "scale-out-relational-operation" + ], ) alarm = ScalingAlarmRepository.create( alarm_uuid=alarm_uuid, - action='scale_out', - vnf_member_index=vnfr['member-vnf-index-ref'], - vdu_name=vdur['name'], - scaling_criteria=scaling_criteria_record + action="scale_out", + vnf_member_index=vnfr[ + "member-vnf-index-ref" + ], + vdu_name=vdur["name"], + scaling_criteria=scaling_criteria_record, ) alarms_created.append(alarm) @@ -170,7 +237,8 @@ class AutoscalingService: alarm.scaling_criteria.scaling_policy.scaling_group.nsr_id, alarm.vnf_member_index, alarm.vdu_name, - alarm.alarm_uuid) + alarm.alarm_uuid, + ) raise e finally: database.db.close() @@ -181,7 +249,9 @@ class AutoscalingService: try: with database.db.atomic() as tx: try: - for scaling_group in ScalingGroupRepository.list(ScalingGroup.nsr_id == nsr_id): + for scaling_group in ScalingGroupRepository.list( + ScalingGroup.nsr_id == nsr_id + ): for scaling_policy in scaling_group.scaling_policies: for scaling_criteria in scaling_policy.scaling_criterias: for alarm in scaling_criteria.scaling_alarms: @@ -190,9 +260,13 @@ class AutoscalingService: alarm.scaling_criteria.scaling_policy.scaling_group.nsr_id, alarm.vnf_member_index, alarm.vdu_name, - alarm.alarm_uuid) + alarm.alarm_uuid, + ) except ValueError: - log.exception("Error deleting alarm in MON %s", alarm.alarm_uuid) + log.exception( + "Error deleting alarm in MON %s", + alarm.alarm_uuid, + ) alarm.delete_instance() scaling_criteria.delete_instance() scaling_policy.delete_instance() @@ -211,22 +285,35 @@ class AutoscalingService: try: with database.db.atomic() as tx: try: - for scaling_group in ScalingGroupRepository.list(ScalingGroup.nsr_id == nsr_id): + for scaling_group in ScalingGroupRepository.list( + ScalingGroup.nsr_id == nsr_id + ): for scaling_policy in scaling_group.scaling_policies: for scaling_criteria in scaling_policy.scaling_criterias: for alarm in scaling_criteria.scaling_alarms: try: - self.db_client.get_vdur(nsr_id, alarm.vnf_member_index, alarm.vdu_name) + self.db_client.get_vdur( + nsr_id, + alarm.vnf_member_index, + alarm.vdu_name, + ) except VdurNotFound: - log.debug("Deleting orphaned scaling alarm %s", alarm.alarm_uuid) + log.debug( + "Deleting orphaned scaling alarm %s", + alarm.alarm_uuid, + ) try: await self.mon_client.delete_alarm( alarm.scaling_criteria.scaling_policy.scaling_group.nsr_id, alarm.vnf_member_index, alarm.vdu_name, - alarm.alarm_uuid) + alarm.alarm_uuid, + ) except ValueError: - log.exception("Error deleting alarm in MON %s", alarm.alarm_uuid) + log.exception( + "Error deleting alarm in MON %s", + alarm.alarm_uuid, + ) alarm.delete_instance() except Exception as e: @@ -244,11 +331,15 @@ class AutoscalingService: database.db.connect() try: with database.db.atomic(): - alarm = ScalingAlarmRepository.get(ScalingAlarm.alarm_uuid == alarm_uuid) + alarm = ScalingAlarmRepository.get( + ScalingAlarm.alarm_uuid == alarm_uuid + ) alarm.last_status = status alarm.save() except ScalingAlarm.DoesNotExist: - log.debug("There is no autoscaling action configured for alarm %s.", alarm_uuid) + log.debug( + "There is no autoscaling action configured for alarm %s.", alarm_uuid + ) finally: database.db.close() @@ -256,40 +347,52 @@ class AutoscalingService: database.db.connect() try: with database.db.atomic(): - alarm = ScalingAlarmRepository.get(ScalingAlarm.alarm_uuid == alarm_uuid) + alarm = ScalingAlarmRepository.get( + ScalingAlarm.alarm_uuid == alarm_uuid + ) vnf_member_index = alarm.vnf_member_index action = alarm.action scaling_policy = alarm.scaling_criteria.scaling_policy if not scaling_policy.enabled: return - if action == 'scale_in': + if action == "scale_in": operation = scaling_policy.scale_in_operation - elif action == 'scale_out': + elif action == "scale_out": operation = scaling_policy.scale_out_operation else: - raise Exception('Unknown alarm action {}'.format(alarm.action)) - alarms = ScalingAlarmRepository.list(ScalingAlarm.scaling_criteria == alarm.scaling_criteria, - ScalingAlarm.action == alarm.action, - ScalingAlarm.vnf_member_index == vnf_member_index, - ScalingAlarm.vdu_name == alarm.vdu_name) + raise Exception("Unknown alarm action {}".format(alarm.action)) + alarms = ScalingAlarmRepository.list( + ScalingAlarm.scaling_criteria == alarm.scaling_criteria, + ScalingAlarm.action == alarm.action, + ScalingAlarm.vnf_member_index == vnf_member_index, + ScalingAlarm.vdu_name == alarm.vdu_name, + ) statuses = [] for alarm in alarms: statuses.append(alarm.last_status) - if (operation == 'AND' and set(statuses) == {'alarm'}) or (operation == 'OR' and 'alarm' in statuses): + if (operation == "AND" and set(statuses) == {"alarm"}) or ( + operation == "OR" and "alarm" in statuses + ): delta = datetime.datetime.now() - scaling_policy.last_scale if delta.total_seconds() > scaling_policy.cooldown_time: - log.info("Sending %s action message for ns: %s", - alarm.action, - scaling_policy.scaling_group.nsr_id) - await self.lcm_client.scale(scaling_policy.scaling_group.nsr_id, - scaling_policy.scaling_group.name, - vnf_member_index, - action) + log.info( + "Sending %s action message for ns: %s", + alarm.action, + scaling_policy.scaling_group.nsr_id, + ) + await self.lcm_client.scale( + scaling_policy.scaling_group.nsr_id, + scaling_policy.scaling_group.name, + vnf_member_index, + action, + ) scaling_policy.last_scale = datetime.datetime.now() scaling_policy.save() except ScalingAlarm.DoesNotExist: - log.debug("There is no autoscaling action configured for alarm %s.", alarm_uuid) + log.debug( + "There is no autoscaling action configured for alarm %s.", alarm_uuid + ) finally: database.db.close() @@ -309,12 +412,14 @@ class AutoscalingService: return all_monitoring_params - def _get_or_create_scaling_group(self, nsr_id: str, vnf_member_index: str, scaling_aspect: dict): + def _get_or_create_scaling_group( + self, nsr_id: str, vnf_member_index: str, scaling_aspect: dict + ): try: scaling_group_record = ScalingGroupRepository.get( ScalingGroup.nsr_id == nsr_id, ScalingGroup.vnf_member_index == vnf_member_index, - ScalingGroup.name == scaling_aspect['name'] + ScalingGroup.name == scaling_aspect["name"], ) log.debug("Found existing scaling group record in DB...") except ScalingGroup.DoesNotExist: @@ -322,81 +427,98 @@ class AutoscalingService: scaling_group_record = ScalingGroupRepository.create( nsr_id=nsr_id, vnf_member_index=vnf_member_index, - name=scaling_aspect['name'], - content=json.dumps(scaling_aspect) + name=scaling_aspect["name"], + content=json.dumps(scaling_aspect), ) log.debug( "Created scaling group record in DB : nsr_id=%s, vnf_member_index=%s, name=%s", scaling_group_record.nsr_id, scaling_group_record.vnf_member_index, - scaling_group_record.name) + scaling_group_record.name, + ) return scaling_group_record - def _get_or_create_scaling_policy(self, nsr_id: str, scaling_policy: dict, scaling_group_record: ScalingGroup): + def _get_or_create_scaling_policy( + self, nsr_id: str, scaling_policy: dict, scaling_group_record: ScalingGroup + ): try: scaling_policy_record = ScalingPolicyRepository.get( - ScalingPolicy.name == scaling_policy['name'], + ScalingPolicy.name == scaling_policy["name"], ScalingGroup.id == scaling_group_record.id, - join_classes=[ScalingGroup] + join_classes=[ScalingGroup], ) log.debug("Found existing scaling policy record in DB...") except ScalingPolicy.DoesNotExist: log.debug("Creating scaling policy record in DB...") scaling_policy_record = ScalingPolicyRepository.create( nsr_id=nsr_id, - name=scaling_policy['name'], - cooldown_time=scaling_policy['cooldown-time'], + name=scaling_policy["name"], + cooldown_time=scaling_policy["cooldown-time"], scaling_group=scaling_group_record, ) - if 'scale-in-operation-type' in scaling_policy: - scaling_policy_record.scale_in_operation = scaling_policy['scale-in-operation-type'] - if 'scale-out-operation-type' in scaling_policy: - scaling_policy_record.scale_out_operation = scaling_policy['scale-out-operation-type'] - if 'enabled' in scaling_policy: - scaling_policy_record.enabled = scaling_policy['enabled'] + if "scale-in-operation-type" in scaling_policy: + scaling_policy_record.scale_in_operation = scaling_policy[ + "scale-in-operation-type" + ] + if "scale-out-operation-type" in scaling_policy: + scaling_policy_record.scale_out_operation = scaling_policy[ + "scale-out-operation-type" + ] + if "enabled" in scaling_policy: + scaling_policy_record.enabled = scaling_policy["enabled"] scaling_policy_record.save() - log.debug("Created scaling policy record in DB : name=%s, scaling_group.name=%s", - scaling_policy_record.name, - scaling_policy_record.scaling_group.name) + log.debug( + "Created scaling policy record in DB : name=%s, scaling_group.name=%s", + scaling_policy_record.name, + scaling_policy_record.scaling_group.name, + ) return scaling_policy_record - def _get_or_create_scaling_criteria(self, nsr_id: str, scaling_criteria: dict, - scaling_policy_record: ScalingPolicy): + def _get_or_create_scaling_criteria( + self, nsr_id: str, scaling_criteria: dict, scaling_policy_record: ScalingPolicy + ): try: scaling_criteria_record = ScalingCriteriaRepository.get( ScalingPolicy.id == scaling_policy_record.id, - ScalingCriteria.name == scaling_criteria['name'], - join_classes=[ScalingPolicy] + ScalingCriteria.name == scaling_criteria["name"], + join_classes=[ScalingPolicy], ) log.debug("Found existing scaling criteria record in DB...") except ScalingCriteria.DoesNotExist: log.debug("Creating scaling criteria record in DB...") scaling_criteria_record = ScalingCriteriaRepository.create( nsr_id=nsr_id, - name=scaling_criteria['name'], - scaling_policy=scaling_policy_record + name=scaling_criteria["name"], + scaling_policy=scaling_policy_record, ) log.debug( "Created scaling criteria record in DB : name=%s, scaling_policy.name=%s", scaling_criteria_record.name, - scaling_criteria_record.scaling_policy.name) + scaling_criteria_record.scaling_policy.name, + ) return scaling_criteria_record def _get_monitored_vdurs(self, scaling_aspect: dict, vdurs): all_monitored_vdus = set() - for delta in scaling_aspect.get('aspect-delta-details', {}).get('deltas', ()): - for vdu_delta in delta.get('vdu-delta', ()): - all_monitored_vdus.add(vdu_delta.get('id')) + for delta in scaling_aspect.get("aspect-delta-details", {}).get("deltas", ()): + for vdu_delta in delta.get("vdu-delta", ()): + all_monitored_vdus.add(vdu_delta.get("id")) - monitored_vdurs = list(filter(lambda vdur: vdur['vdu-id-ref'] in all_monitored_vdus, vdurs)) + monitored_vdurs = list( + filter(lambda vdur: vdur["vdu-id-ref"] in all_monitored_vdus, vdurs) + ) if not monitored_vdurs: log.warning( "Scaling criteria is referring to a vnf-monitoring-param that does not " - "contain a reference to a vdu or vnf metric.") + "contain a reference to a vdu or vnf metric." + ) return monitored_vdurs def _get_metric_name(self, vnf_monitoring_param: dict): - if 'performance-metric' in vnf_monitoring_param: - return vnf_monitoring_param['performance-metric'] - raise ValueError('No metric name found for vnf_monitoring_param %s' % vnf_monitoring_param['id']) + if "performance-metric" in vnf_monitoring_param: + return vnf_monitoring_param["performance-metric"] + raise ValueError( + "No metric name found for vnf_monitoring_param %s" + % vnf_monitoring_param["id"] + ) diff --git a/osm_policy_module/cmd/policy_module_agent.py b/osm_policy_module/cmd/policy_module_agent.py index e6c0681..43dbc34 100644 --- a/osm_policy_module/cmd/policy_module_agent.py +++ b/osm_policy_module/cmd/policy_module_agent.py @@ -34,19 +34,21 @@ from osm_policy_module.core.database import DatabaseManager def main(): # Cleanup old temp health file - if os.path.exists('/tmp/osm_pol_agent_health_flag'): - os.remove('/tmp/osm_pol_agent_health_flag') + if os.path.exists("/tmp/osm_pol_agent_health_flag"): + os.remove("/tmp/osm_pol_agent_health_flag") - parser = argparse.ArgumentParser(prog='osm-policy-agent') - parser.add_argument('--config-file', nargs='?', help='POL configuration file') + parser = argparse.ArgumentParser(prog="osm-policy-agent") + parser.add_argument("--config-file", nargs="?", help="POL configuration file") args = parser.parse_args() cfg = Config(args.config_file) root = logging.getLogger() - root.setLevel(logging.getLevelName(cfg.get('global', 'loglevel'))) + root.setLevel(logging.getLevelName(cfg.get("global", "loglevel"))) ch = logging.StreamHandler(sys.stdout) - ch.setLevel(logging.getLevelName(cfg.get('global', 'loglevel'))) - formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s', '%m/%d/%Y %I:%M:%S %p') + ch.setLevel(logging.getLevelName(cfg.get("global", "loglevel"))) + formatter = logging.Formatter( + "%(asctime)s - %(name)s - %(levelname)s - %(message)s", "%m/%d/%Y %I:%M:%S %p" + ) ch.setFormatter(formatter) root.addHandler(ch) @@ -62,5 +64,5 @@ def main(): agent.run() -if __name__ == '__main__': +if __name__ == "__main__": main() diff --git a/osm_policy_module/cmd/policy_module_healthcheck.py b/osm_policy_module/cmd/policy_module_healthcheck.py index de85ad0..23fb6a7 100644 --- a/osm_policy_module/cmd/policy_module_healthcheck.py +++ b/osm_policy_module/cmd/policy_module_healthcheck.py @@ -29,8 +29,8 @@ log = logging.getLogger(__name__) def main(): - parser = argparse.ArgumentParser(prog='osm-policy-healthcheck') - parser.add_argument('--config-file', nargs='?', help='POL configuration file') + parser = argparse.ArgumentParser(prog="osm-policy-healthcheck") + parser.add_argument("--config-file", nargs="?", help="POL configuration file") # args = parser.parse_args() # cfg = Config(args.config_file) @@ -46,20 +46,20 @@ def _processes_running(): return True return False - processes_to_check = ['osm-policy-agent'] - ps = subprocess.Popen(['ps', 'aux'], stdout=subprocess.PIPE).communicate()[0] - processes_running = ps.decode().split('\n') + processes_to_check = ["osm-policy-agent"] + ps = subprocess.Popen(["ps", "aux"], stdout=subprocess.PIPE).communicate()[0] + processes_running = ps.decode().split("\n") for p in processes_to_check: if not _contains_process(processes_running, p): log.error("Process %s not running!" % p) return False # Check if process is running properly (listening to kafka bus) - if os.path.exists('/tmp/osm_pol_agent_health_flag'): + if os.path.exists("/tmp/osm_pol_agent_health_flag"): return True else: return False -if __name__ == '__main__': +if __name__ == "__main__": main() diff --git a/osm_policy_module/common/common_db_client.py b/osm_policy_module/common/common_db_client.py index 9ec183a..1bd4e76 100644 --- a/osm_policy_module/common/common_db_client.py +++ b/osm_policy_module/common/common_db_client.py @@ -29,47 +29,56 @@ from osm_policy_module.core.exceptions import VdurNotFound class CommonDbClient: def __init__(self, config: Config): - if config.get('database', 'driver') == "mongo": + if config.get("database", "driver") == "mongo": self.common_db = dbmongo.DbMongo() - elif config.get('database', 'driver') == "memory": + elif config.get("database", "driver") == "memory": self.common_db = dbmemory.DbMemory() else: - raise Exception("Unknown database driver {}".format(config.get('section', 'driver'))) + raise Exception( + "Unknown database driver {}".format(config.get("section", "driver")) + ) self.common_db.db_connect(config.get("database")) def get_vnfr(self, nsr_id: str, member_index: str): - vnfr = self.common_db.get_one("vnfrs", - {"nsr-id-ref": nsr_id, "member-vnf-index-ref": member_index}) + vnfr = self.common_db.get_one( + "vnfrs", {"nsr-id-ref": nsr_id, "member-vnf-index-ref": member_index} + ) return vnfr def get_vnfrs(self, nsr_id: str): # TODO: Change for multiple DF support - nsr_nsd_df = self.get_nsr(nsr_id)['nsd'].get('df', [{}])[0] - all_nsd_member_vnf_index = [vnf.get('id') for vnf in nsr_nsd_df.get('vnf-profile', [])] - return [self.get_vnfr(nsr_id, member_index) for member_index in all_nsd_member_vnf_index] + nsr_nsd_df = self.get_nsr(nsr_id)["nsd"].get("df", [{}])[0] + all_nsd_member_vnf_index = [ + vnf.get("id") for vnf in nsr_nsd_df.get("vnf-profile", []) + ] + return [ + self.get_vnfr(nsr_id, member_index) + for member_index in all_nsd_member_vnf_index + ] def get_vnfd(self, vnfd_id: str): - vnfr = self.common_db.get_one("vnfds", - {"_id": vnfd_id}) + vnfr = self.common_db.get_one("vnfds", {"_id": vnfd_id}) return vnfr def get_nsr(self, nsr_id: str): - nsr = self.common_db.get_one("nsrs", - {"id": nsr_id}) + nsr = self.common_db.get_one("nsrs", {"id": nsr_id}) return nsr def get_nslcmop(self, nslcmop_id): - nslcmop = self.common_db.get_one("nslcmops", - {"_id": nslcmop_id}) + nslcmop = self.common_db.get_one("nslcmops", {"_id": nslcmop_id}) return nslcmop def get_vdur(self, nsr_id, member_index, vdur_name): vnfr = self.get_vnfr(nsr_id, member_index) - for vdur in vnfr['vdur']: - if vdur['name'] == vdur_name: + for vdur in vnfr["vdur"]: + if vdur["name"] == vdur_name: return vdur - raise VdurNotFound('vdur not found for nsr-id %s, member_index %s and vdur_name %s', nsr_id, member_index, - vdur_name) + raise VdurNotFound( + "vdur not found for nsr-id %s, member_index %s and vdur_name %s", + nsr_id, + member_index, + vdur_name, + ) def create_nslcmop(self, nslcmop): self.common_db.create("nslcmops", nslcmop) diff --git a/osm_policy_module/common/lcm_client.py b/osm_policy_module/common/lcm_client.py index 089e541..5085a86 100644 --- a/osm_policy_module/common/lcm_client.py +++ b/osm_policy_module/common/lcm_client.py @@ -47,7 +47,9 @@ class LcmClient: loop = asyncio.get_event_loop() self.loop = loop - async def scale(self, nsr_id: str, scaling_group_name: str, vnf_member_index: str, action: str): + async def scale( + self, nsr_id: str, scaling_group_name: str, vnf_member_index: str, action: str + ): """ Sends scaling action to LCM through the message bus. @@ -57,14 +59,25 @@ class LcmClient: :param action: Scaling action to be executed. Valid values: scale_in, scale_out :return: """ - log.debug("scale %s %s %s %s", nsr_id, scaling_group_name, vnf_member_index, action) + log.debug( + "scale %s %s %s %s", nsr_id, scaling_group_name, vnf_member_index, action + ) nsr = self.db_client.get_nsr(nsr_id) - nslcmop = self._generate_nslcmop(nsr_id, scaling_group_name, vnf_member_index, action, nsr['_admin']) + nslcmop = self._generate_nslcmop( + nsr_id, scaling_group_name, vnf_member_index, action, nsr["_admin"] + ) self.db_client.create_nslcmop(nslcmop) log.debug("Sending scale action message: %s", json.dumps(nslcmop)) await self.msg_bus.aiowrite("ns", "scale", nslcmop) - def _generate_nslcmop(self, nsr_id: str, scaling_group_name: str, vnf_member_index: str, action: str, admin: dict): + def _generate_nslcmop( + self, + nsr_id: str, + scaling_group_name: str, + vnf_member_index: str, + action: str, + admin: dict, + ): """ Builds scaling nslcmop. @@ -75,7 +88,14 @@ class LcmClient: :param admin: Dict corresponding to the _admin section of the nsr. Required keys: projects_read, projects_write. :return: """ - log.debug("_generate_nslcmop %s %s %s %s %s", nsr_id, scaling_group_name, vnf_member_index, action, admin) + log.debug( + "_generate_nslcmop %s %s %s %s %s", + nsr_id, + scaling_group_name, + vnf_member_index, + action, + admin, + ) _id = str(uuid.uuid4()) now = time.time() params = { @@ -84,10 +104,10 @@ class LcmClient: "scaleVnfType": action.upper(), "scaleByStepData": { "scaling-group-descriptor": scaling_group_name, - "member-vnf-index": vnf_member_index - } + "member-vnf-index": vnf_member_index, + }, }, - "scaleTime": "{}Z".format(datetime.datetime.utcnow().isoformat()) + "scaleTime": "{}Z".format(datetime.datetime.utcnow().isoformat()), } nslcmop = { @@ -106,8 +126,8 @@ class LcmClient: "nsInstance": "/osm/nslcm/v1/ns_instances/" + nsr_id, }, "_admin": { - "projects_read": admin['projects_read'], - "projects_write": admin['projects_write'] - } + "projects_read": admin["projects_read"], + "projects_write": admin["projects_write"], + }, } return nslcmop diff --git a/osm_policy_module/common/message_bus_client.py b/osm_policy_module/common/message_bus_client.py index ea5095d..4073d0f 100644 --- a/osm_policy_module/common/message_bus_client.py +++ b/osm_policy_module/common/message_bus_client.py @@ -31,13 +31,15 @@ from osm_policy_module.core.config import Config class MessageBusClient: def __init__(self, config: Config, loop=None): - if config.get('message', 'driver') == "local": + if config.get("message", "driver") == "local": self.msg_bus = msglocal.MsgLocal() - elif config.get('message', 'driver') == "kafka": + elif config.get("message", "driver") == "kafka": self.msg_bus = msgkafka.MsgKafka() else: - raise Exception("Unknown message bug driver {}".format(config.get('section', 'driver'))) - self.msg_bus.connect(config.get('message')) + raise Exception( + "Unknown message bug driver {}".format(config.get("section", "driver")) + ) + self.msg_bus.connect(config.get("message")) if not loop: loop = asyncio.get_event_loop() self.loop = loop diff --git a/osm_policy_module/common/mon_client.py b/osm_policy_module/common/mon_client.py index c83f794..ff8339f 100644 --- a/osm_policy_module/common/mon_client.py +++ b/osm_policy_module/common/mon_client.py @@ -37,31 +37,46 @@ log = logging.getLogger(__name__) class MonClient: def __init__(self, config: Config, loop=None): - self.kafka_server = '{}:{}'.format(config.get('message', 'host'), - config.get('message', 'port')) + self.kafka_server = "{}:{}".format( + config.get("message", "host"), config.get("message", "port") + ) if not loop: loop = asyncio.get_event_loop() self.loop = loop - async def create_alarm(self, metric_name: str, ns_id: str, vdu_name: str, vnf_member_index: str, threshold: int, - operation: str, statistic: str = 'AVERAGE'): + async def create_alarm( + self, + metric_name: str, + ns_id: str, + vdu_name: str, + vnf_member_index: str, + threshold: int, + operation: str, + statistic: str = "AVERAGE", + ): cor_id = random.randint(1, 10e7) - msg = self._build_create_alarm_payload(cor_id, - metric_name, - ns_id, - vdu_name, - vnf_member_index, - threshold, - statistic, - operation) + msg = self._build_create_alarm_payload( + cor_id, + metric_name, + ns_id, + vdu_name, + vnf_member_index, + threshold, + statistic, + operation, + ) log.debug("Sending create_alarm_request %s", msg) - producer = AIOKafkaProducer(loop=self.loop, - bootstrap_servers=self.kafka_server, - key_serializer=str.encode, - value_serializer=str.encode) + producer = AIOKafkaProducer( + loop=self.loop, + bootstrap_servers=self.kafka_server, + key_serializer=str.encode, + value_serializer=str.encode, + ) await producer.start() try: - await producer.send_and_wait("alarm_request", key="create_alarm_request", value=json.dumps(msg)) + await producer.send_and_wait( + "alarm_request", key="create_alarm_request", value=json.dumps(msg) + ) finally: await producer.stop() log.debug("Waiting for create_alarm_response...") @@ -71,7 +86,8 @@ class MonClient: bootstrap_servers=self.kafka_server, key_deserializer=bytes.decode, value_deserializer=bytes.decode, - auto_offset_reset='earliest') + auto_offset_reset="earliest", + ) await consumer.start() alarm_uuid = None try: @@ -81,28 +97,36 @@ class MonClient: except JSONDecodeError: content = yaml.safe_load(message.value) log.debug("Received create_alarm_response %s", content) - if content['alarm_create_response']['correlation_id'] == cor_id: - if not content['alarm_create_response']['status']: + if content["alarm_create_response"]["correlation_id"] == cor_id: + if not content["alarm_create_response"]["status"]: raise ValueError("Error creating alarm in MON") - alarm_uuid = content['alarm_create_response']['alarm_uuid'] + alarm_uuid = content["alarm_create_response"]["alarm_uuid"] break finally: await consumer.stop() if not alarm_uuid: - raise ValueError('No alarm deletion response from MON. Is MON up?') + raise ValueError("No alarm deletion response from MON. Is MON up?") return alarm_uuid - async def delete_alarm(self, ns_id: str, vnf_member_index: str, vdu_name: str, alarm_uuid: str): + async def delete_alarm( + self, ns_id: str, vnf_member_index: str, vdu_name: str, alarm_uuid: str + ): cor_id = random.randint(1, 10e7) - msg = self._build_delete_alarm_payload(cor_id, ns_id, vdu_name, vnf_member_index, alarm_uuid) + msg = self._build_delete_alarm_payload( + cor_id, ns_id, vdu_name, vnf_member_index, alarm_uuid + ) log.debug("Sending delete_alarm_request %s", msg) - producer = AIOKafkaProducer(loop=self.loop, - bootstrap_servers=self.kafka_server, - key_serializer=str.encode, - value_serializer=str.encode) + producer = AIOKafkaProducer( + loop=self.loop, + bootstrap_servers=self.kafka_server, + key_serializer=str.encode, + value_serializer=str.encode, + ) await producer.start() try: - await producer.send_and_wait("alarm_request", key="delete_alarm_request", value=json.dumps(msg)) + await producer.send_and_wait( + "alarm_request", key="delete_alarm_request", value=json.dumps(msg) + ) finally: await producer.stop() log.debug("Waiting for delete_alarm_response...") @@ -112,7 +136,8 @@ class MonClient: bootstrap_servers=self.kafka_server, key_deserializer=bytes.decode, value_deserializer=bytes.decode, - auto_offset_reset='earliest') + auto_offset_reset="earliest", + ) await consumer.start() alarm_uuid = None try: @@ -121,58 +146,71 @@ class MonClient: content = json.loads(message.value) except JSONDecodeError: content = yaml.safe_load(message.value) - if content['alarm_delete_response']['correlation_id'] == cor_id: + if content["alarm_delete_response"]["correlation_id"] == cor_id: log.debug("Received delete_alarm_response %s", content) - if not content['alarm_delete_response']['status']: - raise ValueError("Error deleting alarm in MON. Response status is False.") - alarm_uuid = content['alarm_delete_response']['alarm_uuid'] + if not content["alarm_delete_response"]["status"]: + raise ValueError( + "Error deleting alarm in MON. Response status is False." + ) + alarm_uuid = content["alarm_delete_response"]["alarm_uuid"] break finally: await consumer.stop() if not alarm_uuid: - raise ValueError('No alarm deletion response from MON. Is MON up?') + raise ValueError("No alarm deletion response from MON. Is MON up?") return alarm_uuid - def _build_create_alarm_payload(self, cor_id: int, - metric_name: str, - ns_id: str, - vdu_name: str, - vnf_member_index: str, - threshold: int, - statistic: str, - operation: str): + def _build_create_alarm_payload( + self, + cor_id: int, + metric_name: str, + ns_id: str, + vdu_name: str, + vnf_member_index: str, + threshold: int, + statistic: str, + operation: str, + ): alarm_create_request = { - 'correlation_id': cor_id, - 'alarm_name': 'osm_alarm_{}_{}_{}_{}'.format(ns_id, vnf_member_index, vdu_name, metric_name), - 'metric_name': metric_name, - 'operation': operation, - 'severity': 'critical', - 'threshold_value': threshold, - 'statistic': statistic, - 'tags': { - 'ns_id': ns_id, - 'vdu_name': vdu_name, - 'vnf_member_index': vnf_member_index, - } + "correlation_id": cor_id, + "alarm_name": "osm_alarm_{}_{}_{}_{}".format( + ns_id, vnf_member_index, vdu_name, metric_name + ), + "metric_name": metric_name, + "operation": operation, + "severity": "critical", + "threshold_value": threshold, + "statistic": statistic, + "tags": { + "ns_id": ns_id, + "vdu_name": vdu_name, + "vnf_member_index": vnf_member_index, + }, } msg = { - 'alarm_create_request': alarm_create_request, + "alarm_create_request": alarm_create_request, } return msg - def _build_delete_alarm_payload(self, cor_id: int, ns_id: str, vdu_name: str, - vnf_member_index: str, alarm_uuid: str): + def _build_delete_alarm_payload( + self, + cor_id: int, + ns_id: str, + vdu_name: str, + vnf_member_index: str, + alarm_uuid: str, + ): alarm_delete_request = { - 'correlation_id': cor_id, - 'alarm_uuid': alarm_uuid, - 'tags': { - 'ns_id': ns_id, - 'vdu_name': vdu_name, - 'vnf_member_index': vnf_member_index - } + "correlation_id": cor_id, + "alarm_uuid": alarm_uuid, + "tags": { + "ns_id": ns_id, + "vdu_name": vdu_name, + "vnf_member_index": vnf_member_index, + }, } msg = { - 'alarm_delete_request': alarm_delete_request, + "alarm_delete_request": alarm_delete_request, } return msg diff --git a/osm_policy_module/core/agent.py b/osm_policy_module/core/agent.py index 3f87d16..95cc830 100644 --- a/osm_policy_module/core/agent.py +++ b/osm_policy_module/core/agent.py @@ -36,7 +36,7 @@ from osm_policy_module.core.config import Config log = logging.getLogger(__name__) -ALLOWED_KAFKA_KEYS = ['instantiated', 'scaled', 'terminated', 'notify_alarm'] +ALLOWED_KAFKA_KEYS = ["instantiated", "scaled", "terminated", "notify_alarm"] class PolicyModuleAgent: @@ -54,32 +54,29 @@ class PolicyModuleAgent: self.loop.run_until_complete(self.start()) async def start(self): - Path('/tmp/osm_pol_agent_health_flag').touch() - topics = [ - "ns", - "alarm_response" - ] + Path("/tmp/osm_pol_agent_health_flag").touch() + topics = ["ns", "alarm_response"] await self.msg_bus.aioread(topics, self._process_msg) log.critical("Exiting...") - if os.path.exists('/tmp/osm_pol_agent_health_flag'): - os.remove('/tmp/osm_pol_agent_health_flag') + if os.path.exists("/tmp/osm_pol_agent_health_flag"): + os.remove("/tmp/osm_pol_agent_health_flag") async def _process_msg(self, topic, key, msg): - Path('/tmp/osm_pol_agent_health_flag').touch() + Path("/tmp/osm_pol_agent_health_flag").touch() log.debug("_process_msg topic=%s key=%s msg=%s", topic, key, msg) try: if key in ALLOWED_KAFKA_KEYS: - if key == 'instantiated': + if key == "instantiated": await self._handle_instantiated(msg) - if key == 'scaled': + if key == "scaled": await self._handle_scaled(msg) - if key == 'terminated': + if key == "terminated": await self._handle_terminated(msg) - if key == 'notify_alarm': + if key == "notify_alarm": await self._handle_alarm_notification(msg) else: log.debug("Key %s is not in ALLOWED_KAFKA_KEYS", key) @@ -91,17 +88,20 @@ class PolicyModuleAgent: async def _handle_alarm_notification(self, content): log.debug("_handle_alarm_notification: %s", content) - alarm_uuid = content['notify_details']['alarm_uuid'] - status = content['notify_details']['status'] + alarm_uuid = content["notify_details"]["alarm_uuid"] + status = content["notify_details"]["status"] await self.autoscaling_service.handle_alarm(alarm_uuid, status) await self.alarming_service.handle_alarm(alarm_uuid, status, content) async def _handle_instantiated(self, content): log.debug("_handle_instantiated: %s", content) - nslcmop_id = content['nslcmop_id'] + nslcmop_id = content["nslcmop_id"] nslcmop = self.db_client.get_nslcmop(nslcmop_id) - if nslcmop['operationState'] == 'COMPLETED' or nslcmop['operationState'] == 'PARTIALLY_COMPLETED': - nsr_id = nslcmop['nsInstanceId'] + if ( + nslcmop["operationState"] == "COMPLETED" + or nslcmop["operationState"] == "PARTIALLY_COMPLETED" + ): + nsr_id = nslcmop["nsInstanceId"] log.info("Configuring nsr_id: %s", nsr_id) await self.autoscaling_service.configure_scaling_groups(nsr_id) await self.alarming_service.configure_vnf_alarms(nsr_id) @@ -109,14 +109,18 @@ class PolicyModuleAgent: log.info( "Network_service is not in COMPLETED or PARTIALLY_COMPLETED state. " "Current state is %s. Skipping...", - nslcmop['operationState']) + nslcmop["operationState"], + ) async def _handle_scaled(self, content): log.debug("_handle_scaled: %s", content) - nslcmop_id = content['nslcmop_id'] + nslcmop_id = content["nslcmop_id"] nslcmop = self.db_client.get_nslcmop(nslcmop_id) - if nslcmop['operationState'] == 'COMPLETED' or nslcmop['operationState'] == 'PARTIALLY_COMPLETED': - nsr_id = nslcmop['nsInstanceId'] + if ( + nslcmop["operationState"] == "COMPLETED" + or nslcmop["operationState"] == "PARTIALLY_COMPLETED" + ): + nsr_id = nslcmop["nsInstanceId"] log.info("Configuring scaled service with nsr_id: %s", nsr_id) await self.autoscaling_service.configure_scaling_groups(nsr_id) await self.autoscaling_service.delete_orphaned_alarms(nsr_id) @@ -125,17 +129,25 @@ class PolicyModuleAgent: log.debug( "Network service is not in COMPLETED or PARTIALLY_COMPLETED state. " "Current state is %s. Skipping...", - nslcmop['operationState']) + nslcmop["operationState"], + ) async def _handle_terminated(self, content): log.debug("_handle_deleted: %s", content) - nsr_id = content['nsr_id'] - if content['operationState'] == 'COMPLETED' or content['operationState'] == 'PARTIALLY_COMPLETED': - log.info("Deleting scaling groups and alarms for network autoscaling_service with nsr_id: %s", nsr_id) + nsr_id = content["nsr_id"] + if ( + content["operationState"] == "COMPLETED" + or content["operationState"] == "PARTIALLY_COMPLETED" + ): + log.info( + "Deleting scaling groups and alarms for network autoscaling_service with nsr_id: %s", + nsr_id, + ) await self.autoscaling_service.delete_scaling_groups(nsr_id) await self.alarming_service.delete_vnf_alarms(nsr_id) else: log.info( "Network service is not in COMPLETED or PARTIALLY_COMPLETED state. " "Current state is %s. Skipping...", - content['operationState']) + content["operationState"], + ) diff --git a/osm_policy_module/core/config.py b/osm_policy_module/core/config.py index e482ec8..289894a 100644 --- a/osm_policy_module/core/config.py +++ b/osm_policy_module/core/config.py @@ -33,14 +33,14 @@ logger = logging.getLogger(__name__) class Config: - def __init__(self, config_file: str = ''): + def __init__(self, config_file: str = ""): self.conf = {} self._read_config_file(config_file) self._read_env() def _read_config_file(self, config_file): if not config_file: - path = 'pol.yaml' + path = "pol.yaml" config_file = pkg_resources.resource_filename(__name__, path) with open(config_file) as f: self.conf = yaml.load(f) @@ -63,10 +63,12 @@ class Config: if len(elements) < 3: logger.warning( "Environment variable %s=%s does not comply with required format. Section and/or field missing.", - env, os.getenv(env)) + env, + os.getenv(env), + ) continue section = elements[1] - field = '_'.join(elements[2:]) + field = "_".join(elements[2:]) value = os.getenv(env) if section not in self.conf: self.conf[section] = {} diff --git a/osm_policy_module/core/database.py b/osm_policy_module/core/database.py index 3a89652..7bf11d8 100644 --- a/osm_policy_module/core/database.py +++ b/osm_policy_module/core/database.py @@ -26,8 +26,17 @@ import logging import os from typing import Iterable, List -from peewee import CharField, IntegerField, ForeignKeyField, Model, TextField, AutoField, DateTimeField, Proxy, \ - BooleanField +from peewee import ( + CharField, + IntegerField, + ForeignKeyField, + Model, + TextField, + AutoField, + DateTimeField, + Proxy, + BooleanField, +) from peewee_migrate import Router from playhouse.db_url import connect @@ -56,16 +65,20 @@ class ScalingGroup(BaseModel): class ScalingPolicy(BaseModel): name = CharField() cooldown_time = IntegerField() - scale_in_operation = CharField(default='AND') - scale_out_operation = CharField(default='OR') + scale_in_operation = CharField(default="AND") + scale_out_operation = CharField(default="OR") enabled = BooleanField(default=True) last_scale = DateTimeField(default=datetime.datetime.now) - scaling_group = ForeignKeyField(ScalingGroup, related_name='scaling_policies', on_delete='CASCADE') + scaling_group = ForeignKeyField( + ScalingGroup, related_name="scaling_policies", on_delete="CASCADE" + ) class ScalingCriteria(BaseModel): name = CharField() - scaling_policy = ForeignKeyField(ScalingPolicy, related_name='scaling_criterias', on_delete='CASCADE') + scaling_policy = ForeignKeyField( + ScalingPolicy, related_name="scaling_criterias", on_delete="CASCADE" + ) class ScalingAlarm(BaseModel): @@ -73,8 +86,10 @@ class ScalingAlarm(BaseModel): action = CharField() vnf_member_index = CharField() vdu_name = CharField() - scaling_criteria = ForeignKeyField(ScalingCriteria, related_name='scaling_alarms', on_delete='CASCADE') - last_status = CharField(default='insufficient-data') + scaling_criteria = ForeignKeyField( + ScalingCriteria, related_name="scaling_alarms", on_delete="CASCADE" + ) + last_status = CharField(default="insufficient-data") class VnfAlarm(BaseModel): @@ -88,12 +103,12 @@ class VnfAlarm(BaseModel): class AlarmAction(BaseModel): type = CharField() url = TextField() - alarm = ForeignKeyField(VnfAlarm, related_name='actions', on_delete='CASCADE') + alarm = ForeignKeyField(VnfAlarm, related_name="actions", on_delete="CASCADE") class DatabaseManager: def __init__(self, config: Config): - db.initialize(connect(config.get('sql', 'database_uri'))) + db.initialize(connect(config.get("sql", "database_uri"))) def create_tables(self) -> None: db.connect() @@ -104,7 +119,6 @@ class DatabaseManager: class ScalingAlarmRepository: - @staticmethod def list(*expressions) -> Iterable[ScalingAlarm]: return ScalingAlarm.select().where(*expressions) @@ -123,7 +137,6 @@ class ScalingAlarmRepository: class ScalingGroupRepository: - @staticmethod def list(*expressions) -> Iterable[ScalingGroup]: return ScalingGroup.select().where(*expressions) @@ -138,7 +151,6 @@ class ScalingGroupRepository: class ScalingPolicyRepository: - @staticmethod def list(*expressions, join_classes: List = None) -> Iterable[ScalingPolicy]: query = ScalingPolicy.select() @@ -161,7 +173,6 @@ class ScalingPolicyRepository: class ScalingCriteriaRepository: - @staticmethod def list(*expressions, join_classes: List = None) -> Iterable[ScalingCriteria]: query = ScalingCriteria.select() @@ -184,7 +195,6 @@ class ScalingCriteriaRepository: class VnfAlarmRepository: - @staticmethod def list(*expressions) -> Iterable[VnfAlarm]: return VnfAlarm.select().where(*expressions) @@ -199,7 +209,6 @@ class VnfAlarmRepository: class AlarmActionRepository: - @staticmethod def list(*expressions) -> Iterable[AlarmAction]: return AlarmAction.select().where(*expressions) diff --git a/osm_policy_module/migrations/001_initial.py b/osm_policy_module/migrations/001_initial.py index 257a06c..d6f572c 100644 --- a/osm_policy_module/migrations/001_initial.py +++ b/osm_policy_module/migrations/001_initial.py @@ -76,8 +76,13 @@ def migrate(migrator, database, fake=False, **kwargs): name = pw.CharField(max_length=255) cooldown_time = pw.IntegerField() last_scale = pw.DateTimeField() - scaling_group = pw.ForeignKeyField(backref='scaling_policies', column_name='scaling_group_id', field='id', - model=migrator.orm['scalinggroup'], on_delete='CASCADE') + scaling_group = pw.ForeignKeyField( + backref="scaling_policies", + column_name="scaling_group_id", + field="id", + model=migrator.orm["scalinggroup"], + on_delete="CASCADE", + ) class Meta: table_name = "scalingpolicy" @@ -86,8 +91,13 @@ def migrate(migrator, database, fake=False, **kwargs): class ScalingCriteria(pw.Model): id = pw.AutoField() name = pw.CharField(max_length=255) - scaling_policy = pw.ForeignKeyField(backref='scaling_criterias', column_name='scaling_policy_id', field='id', - model=migrator.orm['scalingpolicy'], on_delete='CASCADE') + scaling_policy = pw.ForeignKeyField( + backref="scaling_criterias", + column_name="scaling_policy_id", + field="id", + model=migrator.orm["scalingpolicy"], + on_delete="CASCADE", + ) class Meta: table_name = "scalingcriteria" @@ -99,8 +109,13 @@ def migrate(migrator, database, fake=False, **kwargs): action = pw.CharField(max_length=255) vnf_member_index = pw.IntegerField() vdu_name = pw.CharField(max_length=255) - scaling_criteria = pw.ForeignKeyField(backref='scaling_alarms', column_name='scaling_criteria_id', field='id', - model=migrator.orm['scalingcriteria'], on_delete='CASCADE') + scaling_criteria = pw.ForeignKeyField( + backref="scaling_alarms", + column_name="scaling_criteria_id", + field="id", + model=migrator.orm["scalingcriteria"], + on_delete="CASCADE", + ) class Meta: table_name = "scalingalarm" @@ -109,12 +124,12 @@ def migrate(migrator, database, fake=False, **kwargs): def rollback(migrator, database, fake=False, **kwargs): """Write your rollback migrations here.""" - migrator.remove_model('scalingalarm') + migrator.remove_model("scalingalarm") - migrator.remove_model('scalingcriteria') + migrator.remove_model("scalingcriteria") - migrator.remove_model('scalingpolicy') + migrator.remove_model("scalingpolicy") - migrator.remove_model('scalinggroup') + migrator.remove_model("scalinggroup") - migrator.remove_model('basemodel') + migrator.remove_model("basemodel") diff --git a/osm_policy_module/migrations/002_add_vnf_alarm.py b/osm_policy_module/migrations/002_add_vnf_alarm.py index 70360e4..0c5c4b9 100644 --- a/osm_policy_module/migrations/002_add_vnf_alarm.py +++ b/osm_policy_module/migrations/002_add_vnf_alarm.py @@ -69,8 +69,13 @@ def migrate(migrator, database, fake=False, **kwargs): id = pw.AutoField() type = pw.CharField(max_length=255) url = pw.TextField() - alarm = pw.ForeignKeyField(backref='actions', column_name='alarm_id', field='id', - model=migrator.orm['vnfalarm'], on_delete='CASCADE') + alarm = pw.ForeignKeyField( + backref="actions", + column_name="alarm_id", + field="id", + model=migrator.orm["vnfalarm"], + on_delete="CASCADE", + ) class Meta: table_name = "alarmaction" @@ -79,6 +84,6 @@ def migrate(migrator, database, fake=False, **kwargs): def rollback(migrator, database, fake=False, **kwargs): """Write your rollback migrations here.""" - migrator.remove_model('vnfalarm') + migrator.remove_model("vnfalarm") - migrator.remove_model('alarmaction') + migrator.remove_model("alarmaction") diff --git a/osm_policy_module/migrations/003_add_fields_to_policy.py b/osm_policy_module/migrations/003_add_fields_to_policy.py index cb11390..5bd3784 100644 --- a/osm_policy_module/migrations/003_add_fields_to_policy.py +++ b/osm_policy_module/migrations/003_add_fields_to_policy.py @@ -52,13 +52,17 @@ SQL = pw.SQL def migrate(migrator, database, fake=False, **kwargs): """Write your migrations here.""" - migrator.add_fields('scalingpolicy', - scale_in_operation=pw.CharField(max_length=255, default='AND'), - scale_out_operation=pw.CharField(max_length=255, default='OR'), - enabled=pw.BooleanField(default=True)) + migrator.add_fields( + "scalingpolicy", + scale_in_operation=pw.CharField(max_length=255, default="AND"), + scale_out_operation=pw.CharField(max_length=255, default="OR"), + enabled=pw.BooleanField(default=True), + ) def rollback(migrator, database, fake=False, **kwargs): """Write your rollback migrations here.""" - migrator.remove_fields('scalingpolicy', 'scale_in_operation', 'scale_out_operation', 'enabled') + migrator.remove_fields( + "scalingpolicy", "scale_in_operation", "scale_out_operation", "enabled" + ) diff --git a/osm_policy_module/migrations/004_add_fields_to_alarm.py b/osm_policy_module/migrations/004_add_fields_to_alarm.py index 44c495a..f88a329 100644 --- a/osm_policy_module/migrations/004_add_fields_to_alarm.py +++ b/osm_policy_module/migrations/004_add_fields_to_alarm.py @@ -52,11 +52,13 @@ SQL = pw.SQL def migrate(migrator, database, fake=False, **kwargs): """Write your migrations here.""" - migrator.add_fields('scalingalarm', - last_status=pw.CharField(max_length=255, default='insufficient-data')) + migrator.add_fields( + "scalingalarm", + last_status=pw.CharField(max_length=255, default="insufficient-data"), + ) def rollback(migrator, database, fake=False, **kwargs): """Write your rollback migrations here.""" - migrator.remove_fields('scalingalarm', 'last_status') + migrator.remove_fields("scalingalarm", "last_status") diff --git a/osm_policy_module/migrations/005_change_vnf_index_member_to_str.py b/osm_policy_module/migrations/005_change_vnf_index_member_to_str.py index 36d2c65..650683a 100644 --- a/osm_policy_module/migrations/005_change_vnf_index_member_to_str.py +++ b/osm_policy_module/migrations/005_change_vnf_index_member_to_str.py @@ -52,14 +52,18 @@ SQL = pw.SQL def migrate(migrator, database, fake=False, **kwargs): """Write your migrations here.""" - migrator.change_fields('scalingalarm', vnf_member_index=pw.CharField(max_length=255)) - migrator.change_fields('vnfalarm', vnf_member_index=pw.CharField(max_length=255)) - migrator.change_fields('scalinggroup', vnf_member_index=pw.CharField(max_length=255)) + migrator.change_fields( + "scalingalarm", vnf_member_index=pw.CharField(max_length=255) + ) + migrator.change_fields("vnfalarm", vnf_member_index=pw.CharField(max_length=255)) + migrator.change_fields( + "scalinggroup", vnf_member_index=pw.CharField(max_length=255) + ) def rollback(migrator, database, fake=False, **kwargs): """Write your rollback migrations here.""" - migrator.change_fields('scalingalarm', vnf_member_index=pw.IntegerField()) - migrator.change_fields('vnfalarm', vnf_member_index=pw.IntegerField()) - migrator.change_fields('scalinggroup', vnf_member_index=pw.IntegerField()) + migrator.change_fields("scalingalarm", vnf_member_index=pw.IntegerField()) + migrator.change_fields("vnfalarm", vnf_member_index=pw.IntegerField()) + migrator.change_fields("scalinggroup", vnf_member_index=pw.IntegerField()) diff --git a/osm_policy_module/migrations/conf.py b/osm_policy_module/migrations/conf.py index c86cad9..70d39f8 100644 --- a/osm_policy_module/migrations/conf.py +++ b/osm_policy_module/migrations/conf.py @@ -23,4 +23,4 @@ ## import os -DATABASE = os.getenv('OSMPOL_SQL_DATABASE_URI', 'sqlite:///policy_module.db') +DATABASE = os.getenv("OSMPOL_SQL_DATABASE_URI", "sqlite:///policy_module.db") diff --git a/osm_policy_module/tests/integration/test_kafka_messages.py b/osm_policy_module/tests/integration/test_kafka_messages.py index 28b2e0f..725cc3f 100644 --- a/osm_policy_module/tests/integration/test_kafka_messages.py +++ b/osm_policy_module/tests/integration/test_kafka_messages.py @@ -43,8 +43,9 @@ class KafkaMessagesTest(unittest.TestCase): def setUp(self): super() cfg = Config() - self.kafka_server = '{}:{}'.format(cfg.get('message', 'host'), - cfg.get('message', 'port')) + self.kafka_server = "{}:{}".format( + cfg.get("message", "host"), cfg.get("message", "port") + ) self.loop = asyncio.new_event_loop() def tearDown(self): @@ -52,30 +53,38 @@ class KafkaMessagesTest(unittest.TestCase): def test_send_instantiated_msg(self): async def test_send_instantiated_msg(): - producer = AIOKafkaProducer(loop=self.loop, - bootstrap_servers=self.kafka_server, - key_serializer=str.encode, - value_serializer=str.encode) + producer = AIOKafkaProducer( + loop=self.loop, + bootstrap_servers=self.kafka_server, + key_serializer=str.encode, + value_serializer=str.encode, + ) await producer.start() consumer = AIOKafkaConsumer( "ns", loop=self.loop, bootstrap_servers=self.kafka_server, consumer_timeout_ms=10000, - auto_offset_reset='earliest', + auto_offset_reset="earliest", value_deserializer=bytes.decode, - key_deserializer=bytes.decode) + key_deserializer=bytes.decode, + ) await consumer.start() try: with open( - os.path.join(os.path.dirname(__file__), '../examples/instantiated.json')) as file: + os.path.join( + os.path.dirname(__file__), "../examples/instantiated.json" + ) + ) as file: payload = json.load(file) - await producer.send_and_wait("ns", key="instantiated", value=json.dumps(payload)) + await producer.send_and_wait( + "ns", key="instantiated", value=json.dumps(payload) + ) finally: await producer.stop() try: async for message in consumer: - if message.key == 'instantiated': + if message.key == "instantiated": self.assertIsNotNone(message.value) return finally: @@ -84,8 +93,8 @@ class KafkaMessagesTest(unittest.TestCase): try: self.loop.run_until_complete(test_send_instantiated_msg()) except KafkaError: - self.skipTest('Kafka server not present.') + self.skipTest("Kafka server not present.") -if __name__ == '__main__': +if __name__ == "__main__": unittest.main() diff --git a/osm_policy_module/tests/integration/test_policy_agent.py b/osm_policy_module/tests/integration/test_policy_agent.py index 9fe5e2a..2c86b12 100644 --- a/osm_policy_module/tests/integration/test_policy_agent.py +++ b/osm_policy_module/tests/integration/test_policy_agent.py @@ -38,8 +38,14 @@ from osm_policy_module.common.mon_client import MonClient from osm_policy_module.core import database from osm_policy_module.core.agent import PolicyModuleAgent from osm_policy_module.core.config import Config -from osm_policy_module.core.database import ScalingGroup, ScalingAlarm, ScalingPolicy, ScalingCriteria, VnfAlarm, \ - AlarmAction +from osm_policy_module.core.database import ( + ScalingGroup, + ScalingAlarm, + ScalingPolicy, + ScalingCriteria, + VnfAlarm, + AlarmAction, +) log = logging.getLogger() log.level = logging.INFO @@ -56,9 +62,7 @@ nsr_record_mock = { "_id": "d7c8bd3c-eb39-4514-8847-19f01345524f", "_admin": { "created": 1535392246.499733, - "userDefinedData": { - - }, + "userDefinedData": {}, "usageSate": "NOT_IN_USE", "storage": { "zipfile": "package.tar.gz", @@ -66,66 +70,68 @@ nsr_record_mock = { "path": "/app/storage/", "folder": "d7c8bd3c-eb39-4514-8847-19f01345524f", "pkg-dir": "cirros_nsd", - "descriptor": "cirros_nsd/cirros_vdu_scaling_nsd.yaml" + "descriptor": "cirros_nsd/cirros_vdu_scaling_nsd.yaml", }, "onboardingState": "ONBOARDED", "modified": 1535392246.499733, - "projects_read": [ - "admin" - ], + "projects_read": ["admin"], "operationalState": "ENABLED", - "projects_write": [ - "admin" - ] + "projects_write": ["admin"], }, "id": "cirros_vdu_scaling_ns", "name": "cirros_vdu_scaling_ns", "description": "Simple NS example with a cirros_vdu_scaling_vnf", - "designer": "OSM", "version": "1.0", + "designer": "OSM", + "version": "1.0", "vnfd-id": ["cirros_vdu_scaling_vnf"], - "df": [{ - "id": "default-df", - "vnf-profile": [ - { - "id": "1", - "vnfd-id": "cirros_vdu_scaling_vnf", - "virtual-link-connectivity": [{ - "virtual-link-profile-id": "cirros_nsd_vld1", - "constituent-cpd-id": [{ - "constituent-base-element-id": "1", - "constituent-cpd-id": "eth0-ext" - }] - }] - }, { - "id": "2", - "vnfd-id": "cirros_vdu_scaling_vnf", - "virtual-link-connectivity": [{ - "virtual-link-profile-id": "cirros_nsd_vld1", - "constituent-cpd-id": [{ - "constituent-base-element-id": "2", - "constituent-cpd-id": "eth0-ext" - }] - }] - } - ] - }], - "virtual-link-desc": [{ - "id": "cirros_nsd_vld1", "mgmt-network": "true" - }] + "df": [ + { + "id": "default-df", + "vnf-profile": [ + { + "id": "1", + "vnfd-id": "cirros_vdu_scaling_vnf", + "virtual-link-connectivity": [ + { + "virtual-link-profile-id": "cirros_nsd_vld1", + "constituent-cpd-id": [ + { + "constituent-base-element-id": "1", + "constituent-cpd-id": "eth0-ext", + } + ], + } + ], + }, + { + "id": "2", + "vnfd-id": "cirros_vdu_scaling_vnf", + "virtual-link-connectivity": [ + { + "virtual-link-profile-id": "cirros_nsd_vld1", + "constituent-cpd-id": [ + { + "constituent-base-element-id": "2", + "constituent-cpd-id": "eth0-ext", + } + ], + } + ], + }, + ], + } + ], + "virtual-link-desc": [{"id": "cirros_nsd_vld1", "mgmt-network": "true"}], }, "id": "87776f33-b67c-417a-8119-cb08e4098951", "config-status": "configured", "operational-events": [], "_admin": { "created": 1535392482.0084584, - "projects_read": [ - "admin" - ], + "projects_read": ["admin"], "nsState": "INSTANTIATED", "modified": 1535392482.0084584, - "projects_write": [ - "admin" - ], + "projects_write": ["admin"], "deployed": { "RO": { "vnfd_id": { @@ -133,9 +139,9 @@ nsr_record_mock = { }, "nsd_id": "92c56cf0-f8fa-488c-9afb-9f3d78ae6bbb", "nsr_id": "637e12cd-c201-4c44-8ebd-70fb57a4dcee", - "nsr_status": "BUILD" + "nsr_status": "BUILD", } - } + }, }, "nsd-ref": "cirros_vdu_scaling_ns", "name": "cirros_ns", @@ -145,29 +151,21 @@ nsr_record_mock = { "nsdId": "d7c8bd3c-eb39-4514-8847-19f01345524f", "nsr_id": "87776f33-b67c-417a-8119-cb08e4098951", "nsName": "cirros_ns", - "vimAccountId": "be48ae31-1d46-4892-a4b4-d69abd55714b" + "vimAccountId": "be48ae31-1d46-4892-a4b4-d69abd55714b", }, "description": "default description", "constituent-vnfr-ref": [ "0d9d06ad-3fc2-418c-9934-465e815fafe2", - "3336eb44-77df-4c4f-9881-d2828d259864" + "3336eb44-77df-4c4f-9881-d2828d259864", ], "admin-status": "ENABLED", "detailed-status": "done", "datacenter": "be48ae31-1d46-4892-a4b4-d69abd55714b", - "orchestration-progress": { - - }, + "orchestration-progress": {}, "short-name": "cirros_ns", "ns-instance-config-ref": "87776f33-b67c-417a-8119-cb08e4098951", "nsd-name-ref": "cirros_vdu_scaling_ns", - "admin": { - "deployed": { - "RO": { - "nsr_status": "ACTIVE" - } - } - } + "admin": {"deployed": {"RO": {"nsr_status": "ACTIVE"}}}, } vnfr_record_mocks = [ @@ -182,7 +180,7 @@ vnfr_record_mocks = [ { "mac-address": "fa:16:3e:71:fd:b8", "name": "eth0", - "ip-address": "192.168.160.2" + "ip-address": "192.168.160.2", } ], "status": "ACTIVE", @@ -190,7 +188,7 @@ vnfr_record_mocks = [ "name": "cirros_ns-1-cirros_vnfd-VM-1", "status-detailed": None, "ip-address": "192.168.160.2", - "vdu-id-ref": "cirros_vnfd-VM" + "vdu-id-ref": "cirros_vnfd-VM", } ], "id": "0d9d06ad-3fc2-418c-9934-465e815fafe2", @@ -198,23 +196,13 @@ vnfr_record_mocks = [ "vnfd-id": "63f44c41-45ee-456b-b10d-5f08fb1796e0", "_admin": { "created": 1535392482.0067868, - "projects_read": [ - "admin" - ], + "projects_read": ["admin"], "modified": 1535392482.0067868, - "projects_write": [ - "admin" - ] + "projects_write": ["admin"], }, "nsr-id-ref": "87776f33-b67c-417a-8119-cb08e4098951", "member-vnf-index-ref": "1", - "connection-point": [ - { - "name": "eth0", - "id": None, - "connection-point-id": None - } - ] + "connection-point": [{"name": "eth0", "id": None, "connection-point-id": None}], }, { "_id": "3336eb44-77df-4c4f-9881-d2828d259864", @@ -227,7 +215,7 @@ vnfr_record_mocks = [ { "mac-address": "fa:16:3e:1e:76:e8", "name": "eth0", - "ip-address": "192.168.160.10" + "ip-address": "192.168.160.10", } ], "status": "ACTIVE", @@ -235,7 +223,7 @@ vnfr_record_mocks = [ "name": "cirros_ns-2-cirros_vnfd-VM-1", "status-detailed": None, "ip-address": "192.168.160.10", - "vdu-id-ref": "cirros_vnfd-VM" + "vdu-id-ref": "cirros_vnfd-VM", } ], "id": "3336eb44-77df-4c4f-9881-d2828d259864", @@ -243,189 +231,192 @@ vnfr_record_mocks = [ "vnfd-id": "63f44c41-45ee-456b-b10d-5f08fb1796e0", "_admin": { "created": 1535392482.0076294, - "projects_read": [ - "admin" - ], + "projects_read": ["admin"], "modified": 1535392482.0076294, - "projects_write": [ - "admin" - ] + "projects_write": ["admin"], }, "nsr-id-ref": "87776f33-b67c-417a-8119-cb08e4098951", "member-vnf-index-ref": "2", - "connection-point": [ - { - "name": "eth0", - "id": None, - "connection-point-id": None - } - ]}] + "connection-point": [{"name": "eth0", "id": None, "connection-point-id": None}], + }, +] nsd_record_mock = { - 'id': 'cirros_vdu_scaling_ns', - 'name': 'cirros_vdu_scaling_ns', - 'description': 'Simple NS example with a cirros_vdu_scaling_vnf', - 'designer': 'OSM', - 'version': '1.0', - 'vnfd-id': ['cirros_vdu_scaling_vnf'], - 'df': [{ - 'id': 'default-df', - 'vnf-profile': [ - { - 'id': '1', - 'vnfd-id': 'cirros_vdu_scaling_vnf', - 'virtual-link-connectivity': [{ - 'virtual-link-profile-id': 'cirros_nsd_vld1', - 'constituent-cpd-id': [{ - 'constituent-base-element-id': '1', - 'constituent-cpd-id': 'eth0-ext' - }] - }] - }, { - 'id': '2', - 'vnfd-id': 'cirros_vdu_scaling_vnf', - 'virtual-link-connectivity': [{ - 'virtual-link-profile-id': 'cirros_nsd_vld1', - 'constituent-cpd-id': [{ - 'constituent-base-element-id': '2', - 'constituent-cpd-id': 'eth0-ext' - }] - }] - } - ] - }], - 'virtual-link-desc': [{ - 'id': 'cirros_nsd_vld1', - 'mgmt-network': 'true' - }] + "id": "cirros_vdu_scaling_ns", + "name": "cirros_vdu_scaling_ns", + "description": "Simple NS example with a cirros_vdu_scaling_vnf", + "designer": "OSM", + "version": "1.0", + "vnfd-id": ["cirros_vdu_scaling_vnf"], + "df": [ + { + "id": "default-df", + "vnf-profile": [ + { + "id": "1", + "vnfd-id": "cirros_vdu_scaling_vnf", + "virtual-link-connectivity": [ + { + "virtual-link-profile-id": "cirros_nsd_vld1", + "constituent-cpd-id": [ + { + "constituent-base-element-id": "1", + "constituent-cpd-id": "eth0-ext", + } + ], + } + ], + }, + { + "id": "2", + "vnfd-id": "cirros_vdu_scaling_vnf", + "virtual-link-connectivity": [ + { + "virtual-link-profile-id": "cirros_nsd_vld1", + "constituent-cpd-id": [ + { + "constituent-base-element-id": "2", + "constituent-cpd-id": "eth0-ext", + } + ], + } + ], + }, + ], + } + ], + "virtual-link-desc": [{"id": "cirros_nsd_vld1", "mgmt-network": "true"}], } vnfd_record_mock = { - 'id': 'cirros_vdu_scaling_vnf', + "id": "cirros_vdu_scaling_vnf", "_id": "63f44c41-45ee-456b-b10d-5f08fb1796e0", - 'product-name': 'cirros_vdu_scaling_vnf', - 'description': 'Simple VNF example with a cirros and a scaling group descriptor', - 'provider': 'OSM', - 'version': '1.0', - 'mgmt-cp': 'eth0-ext', - 'virtual-storage-desc': [{ - 'id': 'cirros_vnfd-VM-storage', - 'size-of-storage': 2 - }], - 'virtual-compute-desc': [{ - 'id': 'cirros_vnfd-VM-compute', - 'virtual-cpu': { - 'num-virtual-cpu': 1 - }, - 'virtual-memory': { - 'size': 0.25 + "product-name": "cirros_vdu_scaling_vnf", + "description": "Simple VNF example with a cirros and a scaling group descriptor", + "provider": "OSM", + "version": "1.0", + "mgmt-cp": "eth0-ext", + "virtual-storage-desc": [{"id": "cirros_vnfd-VM-storage", "size-of-storage": 2}], + "virtual-compute-desc": [ + { + "id": "cirros_vnfd-VM-compute", + "virtual-cpu": {"num-virtual-cpu": 1}, + "virtual-memory": {"size": 0.25}, } - }], - 'sw-image-desc': [{ - 'id': 'cirros034', - 'name': 'cirros034', - 'image': 'cirros034' - }], - 'vdu': [{ - 'id': 'cirros_vnfd-VM', - 'description': 'cirros_vnfd-VM', - 'name': 'cirros_vnfd-VM', - 'alarm': [{ - 'value': 20.0, - 'actions': { - 'insufficient-data': [{ - 'url': 'localhost:9090' - }], - 'ok': [{ - 'url': 'localhost:9090' - }], - 'alarm': [{ - 'url': 'localhost:9090' - }] - }, - 'alarm-id': 'alarm-1', - 'operation': 'LT', - 'vnf-monitoring-param-ref': 'cirros_vnf_memory_util' - }], - 'sw-image-desc': 'cirros034', - 'virtual-compute-desc': 'cirros_vnfd-VM-compute', - 'virtual-storage-desc': ['cirros_vnfd-VM-storage'], - 'int-cpd': [{ - 'id': 'eth0-int', - 'virtual-network-interface-requirement': [{ - 'name': 'eth0', - 'virtual-interface': { - 'bandwidth': '0', - 'type': 'VIRTIO', - 'vpci': '0000:00:0a.0'} - }] - }], - 'monitoring-parameter': [{ - 'id': 'cirros_vnf_memory_util', - 'name': 'cirros_vnf_memory_util', - 'performance-metric': 'average_memory_utilization' - }] - }], - 'df': [{ - 'id': 'default-df', - 'vdu-profile': [{ - 'id': 'cirros_vnfd-VM', - 'min-number-of-instances': 1, - 'max-number-of-instances': 10, - 'vdu-configuration-id': 'cirros_vnfd-VM-vdu-configuration' - }], - 'instantiation-level': [{ - 'id': 'default-instantiation-level', - 'vdu-level': [{ - 'vdu-id': 'cirros_vnfd-VM', - 'number-of-instances': 1}] - }], - 'scaling-aspect': [{ - 'id': 'scale_cirros_vnfd-VM', - 'name': 'scale_cirros_vnfd-VM', - 'max-scale-level': 10, - 'scaling-policy': [{ - 'name': 'auto_memory_util_above_threshold', - 'scaling-type': 'automatic', - 'cooldown-time': 60, - 'threshold-time': 10, - 'scaling-criteria': [{ - 'name': 'group1_memory_util_above_threshold', - 'vnf-monitoring-param-ref': 'cirros_vnf_memory_util', - 'scale-out-threshold': 80, - 'scale-out-relational-operation': 'GT', - 'scale-in-relational-operation': 'LT', - 'scale-in-threshold': 20 - }] - }], - 'aspect-delta-details': { - 'deltas': [{ - 'id': 'scale_cirros_vnfd-VM-delta', - 'vdu-delta': [{ - 'number-of-instances': 1, - 'id': 'cirros_vnfd-VM'}] - }] - } - }] - }], - 'ext-cpd': [{ - 'id': 'eth0-ext', - 'int-cpd': { - 'vdu-id': 'cirros_vnfd-VM', - 'cpd': 'eth0-int' + ], + "sw-image-desc": [{"id": "cirros034", "name": "cirros034", "image": "cirros034"}], + "vdu": [ + { + "id": "cirros_vnfd-VM", + "description": "cirros_vnfd-VM", + "name": "cirros_vnfd-VM", + "alarm": [ + { + "value": 20.0, + "actions": { + "insufficient-data": [{"url": "localhost:9090"}], + "ok": [{"url": "localhost:9090"}], + "alarm": [{"url": "localhost:9090"}], + }, + "alarm-id": "alarm-1", + "operation": "LT", + "vnf-monitoring-param-ref": "cirros_vnf_memory_util", + } + ], + "sw-image-desc": "cirros034", + "virtual-compute-desc": "cirros_vnfd-VM-compute", + "virtual-storage-desc": ["cirros_vnfd-VM-storage"], + "int-cpd": [ + { + "id": "eth0-int", + "virtual-network-interface-requirement": [ + { + "name": "eth0", + "virtual-interface": { + "bandwidth": "0", + "type": "VIRTIO", + "vpci": "0000:00:0a.0", + }, + } + ], + } + ], + "monitoring-parameter": [ + { + "id": "cirros_vnf_memory_util", + "name": "cirros_vnf_memory_util", + "performance-metric": "average_memory_utilization", + } + ], } - }], - 'vdu-configuration': [{ - 'juju': { - 'charm': 'testmetrics', - 'proxy': True - }, - 'metrics': [{ - 'name': 'users' - }], - 'id': 'cirros_vnfd-VM-vdu-configuration' - }], + ], + "df": [ + { + "id": "default-df", + "vdu-profile": [ + { + "id": "cirros_vnfd-VM", + "min-number-of-instances": 1, + "max-number-of-instances": 10, + "vdu-configuration-id": "cirros_vnfd-VM-vdu-configuration", + } + ], + "instantiation-level": [ + { + "id": "default-instantiation-level", + "vdu-level": [ + {"vdu-id": "cirros_vnfd-VM", "number-of-instances": 1} + ], + } + ], + "scaling-aspect": [ + { + "id": "scale_cirros_vnfd-VM", + "name": "scale_cirros_vnfd-VM", + "max-scale-level": 10, + "scaling-policy": [ + { + "name": "auto_memory_util_above_threshold", + "scaling-type": "automatic", + "cooldown-time": 60, + "threshold-time": 10, + "scaling-criteria": [ + { + "name": "group1_memory_util_above_threshold", + "vnf-monitoring-param-ref": "cirros_vnf_memory_util", + "scale-out-threshold": 80, + "scale-out-relational-operation": "GT", + "scale-in-relational-operation": "LT", + "scale-in-threshold": 20, + } + ], + } + ], + "aspect-delta-details": { + "deltas": [ + { + "id": "scale_cirros_vnfd-VM-delta", + "vdu-delta": [ + {"number-of-instances": 1, "id": "cirros_vnfd-VM"} + ], + } + ] + }, + } + ], + } + ], + "ext-cpd": [ + {"id": "eth0-ext", "int-cpd": {"vdu-id": "cirros_vnfd-VM", "cpd": "eth0-int"}} + ], + "vdu-configuration": [ + { + "juju": {"charm": "testmetrics", "proxy": True}, + "metrics": [{"name": "users"}], + "id": "cirros_vnfd-VM-vdu-configuration", + } + ], "_admin": { "created": 1535392242.6281035, "modified": 1535392242.6281035, @@ -435,30 +426,31 @@ vnfd_record_mock = { "path": "/app/storage/", "folder": "63f44c41-45ee-456b-b10d-5f08fb1796e0", "fs": "local", - "descriptor": "cirros_vnf/cirros_vdu_scaling_vnfd.yaml" + "descriptor": "cirros_vnf/cirros_vdu_scaling_vnfd.yaml", }, "usageSate": "NOT_IN_USE", "onboardingState": "ONBOARDED", - "userDefinedData": { - - }, - "projects_read": [ - "admin" - ], + "userDefinedData": {}, + "projects_read": ["admin"], "operationalState": "ENABLED", - "projects_write": [ - "admin" - ] - } + "projects_write": ["admin"], + }, } -MODELS = [ScalingGroup, ScalingPolicy, ScalingCriteria, ScalingAlarm, VnfAlarm, AlarmAction] +MODELS = [ + ScalingGroup, + ScalingPolicy, + ScalingCriteria, + ScalingAlarm, + VnfAlarm, + AlarmAction, +] class PolicyModuleAgentTest(unittest.TestCase): def setUp(self): super() - database.db.initialize(connect('sqlite:///test_db.sqlite')) + database.db.initialize(connect("sqlite:///test_db.sqlite")) database.db.bind(MODELS) database.db.connect() database.db.drop_tables(MODELS) @@ -468,19 +460,21 @@ class PolicyModuleAgentTest(unittest.TestCase): def tearDown(self): super() - os.remove('test_db.sqlite') + os.remove("test_db.sqlite") - @patch.object(DbMongo, 'db_connect', Mock()) - @patch.object(KafkaProducer, '__init__') - @patch.object(MonClient, 'create_alarm') - @patch.object(CommonDbClient, 'get_vnfd') - @patch.object(CommonDbClient, 'get_nsr') - @patch.object(CommonDbClient, 'get_vnfr') - def test_configure_scaling_groups(self, get_vnfr, get_nsr, get_vnfd, create_alarm, kafka_producer_init): + @patch.object(DbMongo, "db_connect", Mock()) + @patch.object(KafkaProducer, "__init__") + @patch.object(MonClient, "create_alarm") + @patch.object(CommonDbClient, "get_vnfd") + @patch.object(CommonDbClient, "get_nsr") + @patch.object(CommonDbClient, "get_vnfr") + def test_configure_scaling_groups( + self, get_vnfr, get_nsr, get_vnfd, create_alarm, kafka_producer_init + ): def _test_configure_scaling_groups_get_vnfr(*args, **kwargs): - if '1' in args[1]: + if "1" in args[1]: return vnfr_record_mocks[0] - if '2' in args[1]: + if "2" in args[1]: return vnfr_record_mocks[1] def assert_not_called_with(*args, **kwargs): @@ -488,7 +482,7 @@ class PolicyModuleAgentTest(unittest.TestCase): create_alarm.assert_called_with(*args, **kwargs) except AssertionError: return - raise AssertionError('Expected to not have been called.') + raise AssertionError("Expected to not have been called.") async def _test_configure_scaling_groups_create_alarm(*args, **kwargs): return uuid.uuid4() @@ -501,46 +495,58 @@ class PolicyModuleAgentTest(unittest.TestCase): create_alarm.assert_not_called_with = assert_not_called_with config = Config() agent = PolicyModuleAgent(config, self.loop) - self.loop.run_until_complete(agent.autoscaling_service.configure_scaling_groups("test_nsr_id")) - create_alarm.assert_any_call(metric_name='average_memory_utilization', - ns_id='test_nsr_id', - operation='GT', - threshold=80, - vdu_name='cirros_ns-1-cirros_vnfd-VM-1', - vnf_member_index='1') - create_alarm.assert_not_called_with(metric_name='average_memory_utilization', - ns_id='test_nsr_id', - operation='LT', - threshold=20, - vdu_name='cirros_ns-1-cirros_vnfd-VM-1', - vnf_member_index='1') - create_alarm.assert_any_call(metric_name='average_memory_utilization', - ns_id='test_nsr_id', - operation='GT', - threshold=80, - vdu_name='cirros_ns-2-cirros_vnfd-VM-1', - vnf_member_index='2') - create_alarm.assert_not_called_with(metric_name='average_memory_utilization', - ns_id='test_nsr_id', - operation='LT', - threshold=20, - vdu_name='cirros_ns-2-cirros_vnfd-VM-1', - vnf_member_index='2') + self.loop.run_until_complete( + agent.autoscaling_service.configure_scaling_groups("test_nsr_id") + ) + create_alarm.assert_any_call( + metric_name="average_memory_utilization", + ns_id="test_nsr_id", + operation="GT", + threshold=80, + vdu_name="cirros_ns-1-cirros_vnfd-VM-1", + vnf_member_index="1", + ) + create_alarm.assert_not_called_with( + metric_name="average_memory_utilization", + ns_id="test_nsr_id", + operation="LT", + threshold=20, + vdu_name="cirros_ns-1-cirros_vnfd-VM-1", + vnf_member_index="1", + ) + create_alarm.assert_any_call( + metric_name="average_memory_utilization", + ns_id="test_nsr_id", + operation="GT", + threshold=80, + vdu_name="cirros_ns-2-cirros_vnfd-VM-1", + vnf_member_index="2", + ) + create_alarm.assert_not_called_with( + metric_name="average_memory_utilization", + ns_id="test_nsr_id", + operation="LT", + threshold=20, + vdu_name="cirros_ns-2-cirros_vnfd-VM-1", + vnf_member_index="2", + ) scaling_record = ScalingGroup.get() - self.assertEqual(scaling_record.name, 'scale_cirros_vnfd-VM') - self.assertEqual(scaling_record.nsr_id, 'test_nsr_id') + self.assertEqual(scaling_record.name, "scale_cirros_vnfd-VM") + self.assertEqual(scaling_record.nsr_id, "test_nsr_id") - @patch.object(DbMongo, 'db_connect', Mock()) - @patch.object(KafkaProducer, '__init__') - @patch.object(MonClient, 'create_alarm') - @patch.object(CommonDbClient, 'get_vnfd') - @patch.object(CommonDbClient, 'get_nsr') - @patch.object(CommonDbClient, 'get_vnfr') - def test_configure_vnf_alarms(self, get_vnfr, get_nsr, get_vnfd, create_alarm, kafka_producer_init): + @patch.object(DbMongo, "db_connect", Mock()) + @patch.object(KafkaProducer, "__init__") + @patch.object(MonClient, "create_alarm") + @patch.object(CommonDbClient, "get_vnfd") + @patch.object(CommonDbClient, "get_nsr") + @patch.object(CommonDbClient, "get_vnfr") + def test_configure_vnf_alarms( + self, get_vnfr, get_nsr, get_vnfd, create_alarm, kafka_producer_init + ): def _test_configure_scaling_groups_get_vnfr(*args, **kwargs): - if '1' in args[1]: + if "1" in args[1]: return vnfr_record_mocks[0] - if '2' in args[1]: + if "2" in args[1]: return vnfr_record_mocks[1] async def _test_configure_vnf_alarms_create_alarm(*args, **kwargs): @@ -553,20 +559,26 @@ class PolicyModuleAgentTest(unittest.TestCase): create_alarm.side_effect = _test_configure_vnf_alarms_create_alarm config = Config() agent = PolicyModuleAgent(config, self.loop) - self.loop.run_until_complete(agent.alarming_service.configure_vnf_alarms("test_nsr_id")) - create_alarm.assert_any_call(metric_name='average_memory_utilization', - ns_id='test_nsr_id', - vdu_name='cirros_ns-1-cirros_vnfd-VM-1', - vnf_member_index='1', - threshold=20.0, - operation='LT') - create_alarm.assert_any_call(metric_name='average_memory_utilization', - ns_id='test_nsr_id', - vdu_name='cirros_ns-2-cirros_vnfd-VM-1', - vnf_member_index='2', - threshold=20.0, - operation='LT') + self.loop.run_until_complete( + agent.alarming_service.configure_vnf_alarms("test_nsr_id") + ) + create_alarm.assert_any_call( + metric_name="average_memory_utilization", + ns_id="test_nsr_id", + vdu_name="cirros_ns-1-cirros_vnfd-VM-1", + vnf_member_index="1", + threshold=20.0, + operation="LT", + ) + create_alarm.assert_any_call( + metric_name="average_memory_utilization", + ns_id="test_nsr_id", + vdu_name="cirros_ns-2-cirros_vnfd-VM-1", + vnf_member_index="2", + threshold=20.0, + operation="LT", + ) -if __name__ == '__main__': +if __name__ == "__main__": unittest.main() diff --git a/osm_policy_module/tests/unit/alarming/test_alarming_service.py b/osm_policy_module/tests/unit/alarming/test_alarming_service.py index 4edfa86..67956eb 100644 --- a/osm_policy_module/tests/unit/alarming/test_alarming_service.py +++ b/osm_policy_module/tests/unit/alarming/test_alarming_service.py @@ -36,56 +36,61 @@ from osm_policy_module.core.database import VnfAlarmRepository @mock.patch.object(MonClient, "__init__", lambda *args, **kwargs: None) @mock.patch.object(CommonDbClient, "__init__", lambda *args, **kwargs: None) class TestAlarmingService(TestCase): - def setUp(self): self.config = Config() self.loop = asyncio.new_event_loop() - @mock.patch.object(VnfAlarmRepository, 'get') - @mock.patch('requests.post') - @mock.patch('osm_policy_module.core.database.db') + @mock.patch.object(VnfAlarmRepository, "get") + @mock.patch("requests.post") + @mock.patch("osm_policy_module.core.database.db") def test_handle_alarm(self, database, requests_post, get_alarm): - mock_alarm = self._build_mock_alarm('test_id') + mock_alarm = self._build_mock_alarm("test_id") get_alarm.return_value = mock_alarm service = AlarmingService(self.config) - self.loop.run_until_complete(service.handle_alarm('test_id', 'alarm', {})) - requests_post.assert_called_once_with(json='{}', url='http://alarm-url/') + self.loop.run_until_complete(service.handle_alarm("test_id", "alarm", {})) + requests_post.assert_called_once_with(json="{}", url="http://alarm-url/") requests_post.reset_mock() - self.loop.run_until_complete(service.handle_alarm('test_id', 'ok', {})) - requests_post.assert_called_once_with(json='{}', url='http://ok-url/') + self.loop.run_until_complete(service.handle_alarm("test_id", "ok", {})) + requests_post.assert_called_once_with(json="{}", url="http://ok-url/") requests_post.reset_mock() - self.loop.run_until_complete(service.handle_alarm('test_id', 'insufficient-data', {})) - requests_post.assert_called_once_with(json='{}', url='http://insufficient-data-url/') - - @mock.patch.object(VnfAlarmRepository, 'get') - @mock.patch('requests.post') - @mock.patch('osm_policy_module.core.database.db') + self.loop.run_until_complete( + service.handle_alarm("test_id", "insufficient-data", {}) + ) + requests_post.assert_called_once_with( + json="{}", url="http://insufficient-data-url/" + ) + + @mock.patch.object(VnfAlarmRepository, "get") + @mock.patch("requests.post") + @mock.patch("osm_policy_module.core.database.db") def test_handle_alarm_unknown_status(self, database, requests_post, get_alarm): - mock_alarm = self._build_mock_alarm('test_id') + mock_alarm = self._build_mock_alarm("test_id") get_alarm.return_value = mock_alarm service = AlarmingService(self.config) - self.loop.run_until_complete(service.handle_alarm('test_id', 'unknown', {})) + self.loop.run_until_complete(service.handle_alarm("test_id", "unknown", {})) requests_post.assert_not_called() - def _build_mock_alarm(self, - alarm_id='test_id', - alarm_url='http://alarm-url/', - insufficient_data_url='http://insufficient-data-url/', - ok_url='http://ok-url/'): + def _build_mock_alarm( + self, + alarm_id="test_id", + alarm_url="http://alarm-url/", + insufficient_data_url="http://insufficient-data-url/", + ok_url="http://ok-url/", + ): mock_alarm = mock.Mock() mock_alarm.alarm_id = alarm_id insufficient_data_action = mock.Mock() - insufficient_data_action.type = 'insufficient-data' + insufficient_data_action.type = "insufficient-data" insufficient_data_action.url = insufficient_data_url alarm_action = mock.Mock() - alarm_action.type = 'alarm' + alarm_action.type = "alarm" alarm_action.url = alarm_url ok_action = mock.Mock() - ok_action.type = 'ok' + ok_action.type = "ok" ok_action.url = ok_url mock_alarm.actions = [insufficient_data_action, alarm_action, ok_action] return mock_alarm diff --git a/osm_policy_module/tests/unit/autoscaling/test_autoscaling_service.py b/osm_policy_module/tests/unit/autoscaling/test_autoscaling_service.py index 4491fbb..053b278 100644 --- a/osm_policy_module/tests/unit/autoscaling/test_autoscaling_service.py +++ b/osm_policy_module/tests/unit/autoscaling/test_autoscaling_service.py @@ -37,246 +37,297 @@ from osm_policy_module.core.database import ScalingAlarmRepository @mock.patch.object(MonClient, "__init__", lambda *args, **kwargs: None) @mock.patch.object(CommonDbClient, "__init__", lambda *args, **kwargs: None) class TestAutoscalingService(TestCase): - def setUp(self): self.config = Config() self.loop = asyncio.new_event_loop() - @mock.patch.object(ScalingAlarmRepository, 'get') - @mock.patch('osm_policy_module.core.database.db') + @mock.patch.object(ScalingAlarmRepository, "get") + @mock.patch("osm_policy_module.core.database.db") def test_update_alarm_status(self, database, get_alarm): mock_alarm = mock.Mock() - mock_alarm.last_status = 'insufficient_data' + mock_alarm.last_status = "insufficient_data" get_alarm.return_value = mock_alarm service = AutoscalingService(self.config) - self.loop.run_until_complete(service.update_alarm_status('test_uuid', 'alarm')) - self.assertEqual(mock_alarm.last_status, 'alarm') + self.loop.run_until_complete(service.update_alarm_status("test_uuid", "alarm")) + self.assertEqual(mock_alarm.last_status, "alarm") mock_alarm.save.assert_called_with() service = AutoscalingService(self.config) - self.loop.run_until_complete(service.update_alarm_status('test_uuid', 'ok')) - self.assertEqual(mock_alarm.last_status, 'ok') + self.loop.run_until_complete(service.update_alarm_status("test_uuid", "ok")) + self.assertEqual(mock_alarm.last_status, "ok") mock_alarm.save.assert_called_with() service = AutoscalingService(self.config) - self.loop.run_until_complete(service.update_alarm_status('test_uuid', 'insufficient_data')) - self.assertEqual(mock_alarm.last_status, 'insufficient_data') + self.loop.run_until_complete( + service.update_alarm_status("test_uuid", "insufficient_data") + ) + self.assertEqual(mock_alarm.last_status, "insufficient_data") mock_alarm.save.assert_called_with() - @mock.patch.object(ScalingAlarmRepository, 'list') - @mock.patch.object(ScalingAlarmRepository, 'get') - @mock.patch('osm_policy_module.core.database.db') + @mock.patch.object(ScalingAlarmRepository, "list") + @mock.patch.object(ScalingAlarmRepository, "get") + @mock.patch("osm_policy_module.core.database.db") def test_evaluate_policy_not_enabled(self, database, get_alarm, list_alarms): mock_alarm = mock.Mock() mock_alarm.scaling_criteria.scaling_policy.enabled = False get_alarm.return_value = mock_alarm service = AutoscalingService(self.config) - self.loop.run_until_complete(service.evaluate_policy('test_uuid')) + self.loop.run_until_complete(service.evaluate_policy("test_uuid")) list_alarms.assert_not_called() - @mock.patch.object(ScalingAlarmRepository, 'list') - @mock.patch.object(ScalingAlarmRepository, 'get') - @mock.patch.object(LcmClient, 'scale') - @mock.patch('osm_policy_module.core.database.db') - def test_evaluate_policy_scale_in_and_equal(self, database, scale, get_alarm, list_alarms): + @mock.patch.object(ScalingAlarmRepository, "list") + @mock.patch.object(ScalingAlarmRepository, "get") + @mock.patch.object(LcmClient, "scale") + @mock.patch("osm_policy_module.core.database.db") + def test_evaluate_policy_scale_in_and_equal( + self, database, scale, get_alarm, list_alarms + ): """ Tests scale in with AND operation, both alarms triggered """ future = asyncio.Future(loop=self.loop) - future.set_result('mock') + future.set_result("mock") scale.return_value = future - mock_alarm = self._build_mock_alarm(action='scale_in', last_status='alarm', enabled=True, scale_in_op='AND') + mock_alarm = self._build_mock_alarm( + action="scale_in", last_status="alarm", enabled=True, scale_in_op="AND" + ) get_alarm.return_value = mock_alarm - mock_alarm_2 = self._build_mock_alarm(action='scale_in', last_status='alarm', enabled=True, scale_in_op='AND') + mock_alarm_2 = self._build_mock_alarm( + action="scale_in", last_status="alarm", enabled=True, scale_in_op="AND" + ) list_alarms.return_value = [mock_alarm, mock_alarm_2] service = AutoscalingService(self.config) - self.loop.run_until_complete(service.evaluate_policy('test_uuid')) - scale.assert_called_with('test_nsr_id', 'test_group', '1', 'scale_in') - - @mock.patch.object(ScalingAlarmRepository, 'list') - @mock.patch.object(ScalingAlarmRepository, 'get') - @mock.patch.object(LcmClient, 'scale') - @mock.patch('osm_policy_module.core.database.db') - def test_evaluate_policy_scale_in_and_diff(self, database, scale, get_alarm, list_alarms): + self.loop.run_until_complete(service.evaluate_policy("test_uuid")) + scale.assert_called_with("test_nsr_id", "test_group", "1", "scale_in") + + @mock.patch.object(ScalingAlarmRepository, "list") + @mock.patch.object(ScalingAlarmRepository, "get") + @mock.patch.object(LcmClient, "scale") + @mock.patch("osm_policy_module.core.database.db") + def test_evaluate_policy_scale_in_and_diff( + self, database, scale, get_alarm, list_alarms + ): """ Tests scale in with AND operation, only one alarm triggered. """ future = asyncio.Future(loop=self.loop) - future.set_result('mock') + future.set_result("mock") scale.return_value = future - mock_alarm = self._build_mock_alarm(action='scale_in', last_status='alarm', enabled=True, scale_in_op='AND') + mock_alarm = self._build_mock_alarm( + action="scale_in", last_status="alarm", enabled=True, scale_in_op="AND" + ) get_alarm.return_value = mock_alarm - mock_alarm_2 = self._build_mock_alarm(action='scale_in', last_status='ok', enabled=True, scale_in_op='OR') + mock_alarm_2 = self._build_mock_alarm( + action="scale_in", last_status="ok", enabled=True, scale_in_op="OR" + ) list_alarms.return_value = [mock_alarm, mock_alarm_2] service = AutoscalingService(self.config) - self.loop.run_until_complete(service.evaluate_policy('test_uuid')) + self.loop.run_until_complete(service.evaluate_policy("test_uuid")) scale.assert_not_called() - @mock.patch.object(ScalingAlarmRepository, 'list') - @mock.patch.object(ScalingAlarmRepository, 'get') - @mock.patch.object(LcmClient, 'scale') - @mock.patch('osm_policy_module.core.database.db') - def test_evaluate_policy_scale_in_or_equal(self, database, scale, get_alarm, list_alarms): + @mock.patch.object(ScalingAlarmRepository, "list") + @mock.patch.object(ScalingAlarmRepository, "get") + @mock.patch.object(LcmClient, "scale") + @mock.patch("osm_policy_module.core.database.db") + def test_evaluate_policy_scale_in_or_equal( + self, database, scale, get_alarm, list_alarms + ): """ Tests scale in with OR operation, both alarms triggered """ future = asyncio.Future(loop=self.loop) - future.set_result('mock') + future.set_result("mock") scale.return_value = future - mock_alarm = self._build_mock_alarm(action='scale_in', last_status='alarm', enabled=True, scale_in_op='OR') + mock_alarm = self._build_mock_alarm( + action="scale_in", last_status="alarm", enabled=True, scale_in_op="OR" + ) get_alarm.return_value = mock_alarm - mock_alarm_2 = self._build_mock_alarm(action='scale_in', last_status='alarm', enabled=True, scale_in_op='OR') + mock_alarm_2 = self._build_mock_alarm( + action="scale_in", last_status="alarm", enabled=True, scale_in_op="OR" + ) list_alarms.return_value = [mock_alarm, mock_alarm_2] service = AutoscalingService(self.config) - self.loop.run_until_complete(service.evaluate_policy('test_uuid')) - scale.assert_called_with('test_nsr_id', 'test_group', '1', 'scale_in') - - @mock.patch.object(ScalingAlarmRepository, 'list') - @mock.patch.object(ScalingAlarmRepository, 'get') - @mock.patch.object(LcmClient, 'scale') - @mock.patch('osm_policy_module.core.database.db') - def test_evaluate_policy_scale_in_or_diff(self, database, scale, get_alarm, list_alarms): + self.loop.run_until_complete(service.evaluate_policy("test_uuid")) + scale.assert_called_with("test_nsr_id", "test_group", "1", "scale_in") + + @mock.patch.object(ScalingAlarmRepository, "list") + @mock.patch.object(ScalingAlarmRepository, "get") + @mock.patch.object(LcmClient, "scale") + @mock.patch("osm_policy_module.core.database.db") + def test_evaluate_policy_scale_in_or_diff( + self, database, scale, get_alarm, list_alarms + ): """ Tests scale in with OR operation, only one alarm triggered """ future = asyncio.Future(loop=self.loop) - future.set_result('mock') + future.set_result("mock") scale.return_value = future - mock_alarm = self._build_mock_alarm(action='scale_in', last_status='alarm', enabled=True, scale_in_op='OR') + mock_alarm = self._build_mock_alarm( + action="scale_in", last_status="alarm", enabled=True, scale_in_op="OR" + ) get_alarm.return_value = mock_alarm - mock_alarm_2 = self._build_mock_alarm(action='scale_in', last_status='ok', enabled=True, scale_in_op='OR') + mock_alarm_2 = self._build_mock_alarm( + action="scale_in", last_status="ok", enabled=True, scale_in_op="OR" + ) list_alarms.return_value = [mock_alarm, mock_alarm_2] service = AutoscalingService(self.config) - self.loop.run_until_complete(service.evaluate_policy('test_uuid')) - scale.assert_called_with('test_nsr_id', 'test_group', '1', 'scale_in') - - @mock.patch.object(ScalingAlarmRepository, 'list') - @mock.patch.object(ScalingAlarmRepository, 'get') - @mock.patch.object(LcmClient, 'scale') - @mock.patch('osm_policy_module.core.database.db') - def test_evaluate_policy_scale_out_and_equal(self, database, scale, get_alarm, list_alarms): + self.loop.run_until_complete(service.evaluate_policy("test_uuid")) + scale.assert_called_with("test_nsr_id", "test_group", "1", "scale_in") + + @mock.patch.object(ScalingAlarmRepository, "list") + @mock.patch.object(ScalingAlarmRepository, "get") + @mock.patch.object(LcmClient, "scale") + @mock.patch("osm_policy_module.core.database.db") + def test_evaluate_policy_scale_out_and_equal( + self, database, scale, get_alarm, list_alarms + ): """ Tests scale out with AND operation, both alarms triggered """ future = asyncio.Future(loop=self.loop) - future.set_result('mock') + future.set_result("mock") scale.return_value = future - mock_alarm = self._build_mock_alarm(action='scale_out', last_status='alarm', enabled=True, scale_out_op='AND') + mock_alarm = self._build_mock_alarm( + action="scale_out", last_status="alarm", enabled=True, scale_out_op="AND" + ) get_alarm.return_value = mock_alarm - mock_alarm_2 = self._build_mock_alarm(action='scale_out', last_status='alarm', enabled=True, scale_out_op='AND') + mock_alarm_2 = self._build_mock_alarm( + action="scale_out", last_status="alarm", enabled=True, scale_out_op="AND" + ) list_alarms.return_value = [mock_alarm, mock_alarm_2] service = AutoscalingService(self.config) - self.loop.run_until_complete(service.evaluate_policy('test_uuid')) - scale.assert_called_with('test_nsr_id', 'test_group', '1', 'scale_out') - - @mock.patch.object(ScalingAlarmRepository, 'list') - @mock.patch.object(ScalingAlarmRepository, 'get') - @mock.patch.object(LcmClient, 'scale') - @mock.patch('osm_policy_module.core.database.db') - def test_evaluate_policy_scale_out_and_diff(self, database, scale, get_alarm, list_alarms): + self.loop.run_until_complete(service.evaluate_policy("test_uuid")) + scale.assert_called_with("test_nsr_id", "test_group", "1", "scale_out") + + @mock.patch.object(ScalingAlarmRepository, "list") + @mock.patch.object(ScalingAlarmRepository, "get") + @mock.patch.object(LcmClient, "scale") + @mock.patch("osm_policy_module.core.database.db") + def test_evaluate_policy_scale_out_and_diff( + self, database, scale, get_alarm, list_alarms + ): """ Tests scale out with AND operation, only one alarm triggered. """ future = asyncio.Future(loop=self.loop) - future.set_result('mock') + future.set_result("mock") scale.return_value = future - mock_alarm = self._build_mock_alarm(action='scale_out', last_status='alarm', enabled=True, scale_out_op='AND') + mock_alarm = self._build_mock_alarm( + action="scale_out", last_status="alarm", enabled=True, scale_out_op="AND" + ) get_alarm.return_value = mock_alarm - mock_alarm_2 = self._build_mock_alarm(action='scale_out', last_status='ok', enabled=True, scale_out_op='OR') + mock_alarm_2 = self._build_mock_alarm( + action="scale_out", last_status="ok", enabled=True, scale_out_op="OR" + ) list_alarms.return_value = [mock_alarm, mock_alarm_2] service = AutoscalingService(self.config) - self.loop.run_until_complete(service.evaluate_policy('test_uuid')) + self.loop.run_until_complete(service.evaluate_policy("test_uuid")) scale.assert_not_called() - @mock.patch.object(ScalingAlarmRepository, 'list') - @mock.patch.object(ScalingAlarmRepository, 'get') - @mock.patch.object(LcmClient, 'scale') - @mock.patch('osm_policy_module.core.database.db') - def test_evaluate_policy_scale_out_or_equal(self, database, scale, get_alarm, list_alarms): + @mock.patch.object(ScalingAlarmRepository, "list") + @mock.patch.object(ScalingAlarmRepository, "get") + @mock.patch.object(LcmClient, "scale") + @mock.patch("osm_policy_module.core.database.db") + def test_evaluate_policy_scale_out_or_equal( + self, database, scale, get_alarm, list_alarms + ): """ Tests scale out with OR operation, both alarms triggered """ future = asyncio.Future(loop=self.loop) - future.set_result('mock') + future.set_result("mock") scale.return_value = future - mock_alarm = self._build_mock_alarm(action='scale_out', last_status='alarm', enabled=True, scale_out_op='OR') + mock_alarm = self._build_mock_alarm( + action="scale_out", last_status="alarm", enabled=True, scale_out_op="OR" + ) get_alarm.return_value = mock_alarm - mock_alarm_2 = self._build_mock_alarm(action='scale_out', last_status='alarm', enabled=True, scale_out_op='OR') + mock_alarm_2 = self._build_mock_alarm( + action="scale_out", last_status="alarm", enabled=True, scale_out_op="OR" + ) list_alarms.return_value = [mock_alarm, mock_alarm_2] service = AutoscalingService(self.config) - self.loop.run_until_complete(service.evaluate_policy('test_uuid')) - scale.assert_called_with('test_nsr_id', 'test_group', '1', 'scale_out') - - @mock.patch.object(ScalingAlarmRepository, 'list') - @mock.patch.object(ScalingAlarmRepository, 'get') - @mock.patch.object(LcmClient, 'scale') - @mock.patch('osm_policy_module.core.database.db') - def test_evaluate_policy_scale_out_or_diff(self, database, scale, get_alarm, list_alarms): + self.loop.run_until_complete(service.evaluate_policy("test_uuid")) + scale.assert_called_with("test_nsr_id", "test_group", "1", "scale_out") + + @mock.patch.object(ScalingAlarmRepository, "list") + @mock.patch.object(ScalingAlarmRepository, "get") + @mock.patch.object(LcmClient, "scale") + @mock.patch("osm_policy_module.core.database.db") + def test_evaluate_policy_scale_out_or_diff( + self, database, scale, get_alarm, list_alarms + ): """ Tests scale out with OR operation, only one alarm triggered """ future = asyncio.Future(loop=self.loop) - future.set_result('mock') + future.set_result("mock") scale.return_value = future - mock_alarm = self._build_mock_alarm(action='scale_out', last_status='alarm', enabled=True, scale_out_op='OR') + mock_alarm = self._build_mock_alarm( + action="scale_out", last_status="alarm", enabled=True, scale_out_op="OR" + ) get_alarm.return_value = mock_alarm - mock_alarm_2 = self._build_mock_alarm(action='scale_out', last_status='ok', enabled=True, scale_out_op='OR') + mock_alarm_2 = self._build_mock_alarm( + action="scale_out", last_status="ok", enabled=True, scale_out_op="OR" + ) list_alarms.return_value = [mock_alarm, mock_alarm_2] service = AutoscalingService(self.config) - self.loop.run_until_complete(service.evaluate_policy('test_uuid')) - scale.assert_called_with('test_nsr_id', 'test_group', '1', 'scale_out') - - def _build_mock_alarm(self, - action='scale_in', - last_status='alarm', - last_scale=datetime.datetime.min, - cooldown_time=10, - enabled=True, - scale_in_op='AND', - scale_out_op='AND'): + self.loop.run_until_complete(service.evaluate_policy("test_uuid")) + scale.assert_called_with("test_nsr_id", "test_group", "1", "scale_out") + + def _build_mock_alarm( + self, + action="scale_in", + last_status="alarm", + last_scale=datetime.datetime.min, + cooldown_time=10, + enabled=True, + scale_in_op="AND", + scale_out_op="AND", + ): mock_alarm = mock.Mock() mock_alarm.action = action mock_alarm.last_status = last_status - mock_alarm.vnf_member_index = '1' + mock_alarm.vnf_member_index = "1" mock_alarm.scaling_criteria.scaling_policy.last_scale = last_scale mock_alarm.scaling_criteria.scaling_policy.cooldown_time = cooldown_time mock_alarm.scaling_criteria.scaling_policy.enabled = enabled mock_alarm.scaling_criteria.scaling_policy.scale_in_operation = scale_in_op mock_alarm.scaling_criteria.scaling_policy.scale_out_operation = scale_out_op - mock_alarm.scaling_criteria.scaling_policy.scaling_group.nsr_id = 'test_nsr_id' - mock_alarm.scaling_criteria.scaling_policy.scaling_group.name = 'test_group' + mock_alarm.scaling_criteria.scaling_policy.scaling_group.nsr_id = "test_nsr_id" + mock_alarm.scaling_criteria.scaling_policy.scaling_group.name = "test_group" return mock_alarm diff --git a/osm_policy_module/tests/unit/common/test_message_bus_client.py b/osm_policy_module/tests/unit/common/test_message_bus_client.py index 0b97de7..81ca832 100644 --- a/osm_policy_module/tests/unit/common/test_message_bus_client.py +++ b/osm_policy_module/tests/unit/common/test_message_bus_client.py @@ -31,43 +31,42 @@ from osm_policy_module.core.config import Config class TestMessageBusClient(TestCase): - def setUp(self): self.config = Config() - self.config.set('message', 'driver', 'kafka') + self.config.set("message", "driver", "kafka") self.loop = asyncio.new_event_loop() - @mock.patch.object(MsgKafka, 'aioread') + @mock.patch.object(MsgKafka, "aioread") def test_aioread(self, aioread): async def mock_callback(): pass future = asyncio.Future(loop=self.loop) - future.set_result('mock') + future.set_result("mock") aioread.return_value = future msg_bus = MessageBusClient(self.config, loop=self.loop) - topic = 'test_topic' + topic = "test_topic" self.loop.run_until_complete(msg_bus.aioread([topic], mock_callback)) - aioread.assert_called_with(['test_topic'], self.loop, aiocallback=mock_callback) + aioread.assert_called_with(["test_topic"], self.loop, aiocallback=mock_callback) - @mock.patch.object(MsgKafka, 'aiowrite') + @mock.patch.object(MsgKafka, "aiowrite") def test_aiowrite(self, aiowrite): future = asyncio.Future(loop=self.loop) - future.set_result('mock') + future.set_result("mock") aiowrite.return_value = future msg_bus = MessageBusClient(self.config, loop=self.loop) - topic = 'test_topic' - key = 'test_key' - msg = {'test': 'test_msg'} + topic = "test_topic" + key = "test_key" + msg = {"test": "test_msg"} self.loop.run_until_complete(msg_bus.aiowrite(topic, key, msg)) aiowrite.assert_called_with(topic, key, msg, self.loop) - @mock.patch.object(MsgKafka, 'aioread') + @mock.patch.object(MsgKafka, "aioread") def test_aioread_once(self, aioread): future = asyncio.Future(loop=self.loop) - future.set_result('mock') + future.set_result("mock") aioread.return_value = future msg_bus = MessageBusClient(self.config, loop=self.loop) - topic = 'test_topic' + topic = "test_topic" self.loop.run_until_complete(msg_bus.aioread_once(topic)) - aioread.assert_called_with('test_topic', self.loop) + aioread.assert_called_with("test_topic", self.loop) diff --git a/osm_policy_module/tests/unit/core/test_policy_agent.py b/osm_policy_module/tests/unit/core/test_policy_agent.py index 9ff6b45..77c285d 100644 --- a/osm_policy_module/tests/unit/core/test_policy_agent.py +++ b/osm_policy_module/tests/unit/core/test_policy_agent.py @@ -37,23 +37,25 @@ class PolicyAgentTest(unittest.TestCase): self.loop = asyncio.new_event_loop() @mock.patch.object(CommonDbClient, "__init__", lambda *args, **kwargs: None) - @mock.patch('osm_policy_module.alarming.service.MonClient') - @mock.patch('osm_policy_module.alarming.service.LcmClient') - @mock.patch('osm_policy_module.autoscaling.service.MonClient') - @mock.patch('osm_policy_module.autoscaling.service.LcmClient') - @mock.patch.object(AutoscalingService, 'configure_scaling_groups') - @mock.patch.object(AlarmingService, 'configure_vnf_alarms') - @mock.patch.object(AutoscalingService, 'delete_orphaned_alarms') - @mock.patch.object(CommonDbClient, 'get_nslcmop') - def test_handle_instantiated(self, - get_nslcmop, - delete_orphaned_alarms, - configure_vnf_alarms, - configure_scaling_groups, - autoscaling_lcm_client, - autoscaling_mon_client, - alarming_lcm_client, - alarming_mon_client): + @mock.patch("osm_policy_module.alarming.service.MonClient") + @mock.patch("osm_policy_module.alarming.service.LcmClient") + @mock.patch("osm_policy_module.autoscaling.service.MonClient") + @mock.patch("osm_policy_module.autoscaling.service.LcmClient") + @mock.patch.object(AutoscalingService, "configure_scaling_groups") + @mock.patch.object(AlarmingService, "configure_vnf_alarms") + @mock.patch.object(AutoscalingService, "delete_orphaned_alarms") + @mock.patch.object(CommonDbClient, "get_nslcmop") + def test_handle_instantiated( + self, + get_nslcmop, + delete_orphaned_alarms, + configure_vnf_alarms, + configure_scaling_groups, + autoscaling_lcm_client, + autoscaling_mon_client, + alarming_lcm_client, + alarming_mon_client, + ): async def mock_configure_scaling_groups(nsr_id): pass @@ -70,23 +72,20 @@ class PolicyAgentTest(unittest.TestCase): assert alarming_lcm_client.called assert alarming_mon_client.called content = { - 'nslcmop_id': 'test_id', + "nslcmop_id": "test_id", } nslcmop_completed = { - 'operationState': 'COMPLETED', - 'nsInstanceId': 'test_nsr_id' - } - nslcmop_failed = { - 'operationState': 'FAILED', - 'nsInstanceId': 'test_nsr_id' + "operationState": "COMPLETED", + "nsInstanceId": "test_nsr_id", } + nslcmop_failed = {"operationState": "FAILED", "nsInstanceId": "test_nsr_id"} configure_scaling_groups.side_effect = mock_configure_scaling_groups configure_vnf_alarms.side_effect = mock_configure_vnf_alarms delete_orphaned_alarms.side_effect = mock_delete_orphaned_alarms get_nslcmop.return_value = nslcmop_completed self.loop.run_until_complete(agent._handle_instantiated(content)) - configure_scaling_groups.assert_called_with('test_nsr_id') + configure_scaling_groups.assert_called_with("test_nsr_id") configure_scaling_groups.reset_mock() get_nslcmop.return_value = nslcmop_failed @@ -94,19 +93,21 @@ class PolicyAgentTest(unittest.TestCase): configure_scaling_groups.assert_not_called() @mock.patch.object(CommonDbClient, "__init__", lambda *args, **kwargs: None) - @mock.patch('osm_policy_module.autoscaling.service.MonClient') - @mock.patch('osm_policy_module.autoscaling.service.LcmClient') - @mock.patch('osm_policy_module.alarming.service.MonClient') - @mock.patch('osm_policy_module.alarming.service.LcmClient') - @mock.patch.object(AutoscalingService, 'handle_alarm') - @mock.patch.object(AlarmingService, 'handle_alarm') - def test_handle_alarm_notification(self, - alarming_handle_alarm, - autoscaling_handle_alarm, - autoscaling_lcm_client, - autoscaling_mon_client, - alarming_lcm_client, - alarming_mon_client): + @mock.patch("osm_policy_module.autoscaling.service.MonClient") + @mock.patch("osm_policy_module.autoscaling.service.LcmClient") + @mock.patch("osm_policy_module.alarming.service.MonClient") + @mock.patch("osm_policy_module.alarming.service.LcmClient") + @mock.patch.object(AutoscalingService, "handle_alarm") + @mock.patch.object(AlarmingService, "handle_alarm") + def test_handle_alarm_notification( + self, + alarming_handle_alarm, + autoscaling_handle_alarm, + autoscaling_lcm_client, + autoscaling_mon_client, + alarming_lcm_client, + alarming_mon_client, + ): async def mock_handle_alarm(alarm_uuid, status, payload=None): pass @@ -117,24 +118,24 @@ class PolicyAgentTest(unittest.TestCase): assert alarming_lcm_client.called assert alarming_mon_client.called content = { - 'notify_details': { - 'alarm_uuid': 'test_alarm_uuid', - 'metric_name': 'test_metric_name', - 'operation': 'test_operation', - 'threshold_value': 'test_threshold_value', - 'vdu_name': 'test_vdu_name', - 'vnf_member_index': 'test_vnf_member_index', - 'ns_id': 'test_nsr_id', - 'status': 'alarm' + "notify_details": { + "alarm_uuid": "test_alarm_uuid", + "metric_name": "test_metric_name", + "operation": "test_operation", + "threshold_value": "test_threshold_value", + "vdu_name": "test_vdu_name", + "vnf_member_index": "test_vnf_member_index", + "ns_id": "test_nsr_id", + "status": "alarm", } } autoscaling_handle_alarm.side_effect = mock_handle_alarm alarming_handle_alarm.side_effect = mock_handle_alarm self.loop.run_until_complete(agent._handle_alarm_notification(content)) - autoscaling_handle_alarm.assert_called_with('test_alarm_uuid', 'alarm') - alarming_handle_alarm.assert_called_with('test_alarm_uuid', 'alarm', content) + autoscaling_handle_alarm.assert_called_with("test_alarm_uuid", "alarm") + alarming_handle_alarm.assert_called_with("test_alarm_uuid", "alarm", content) -if __name__ == '__main__': +if __name__ == "__main__": unittest.main() diff --git a/osm_policy_module/tests/unit/utils/test_vnfd_utils.py b/osm_policy_module/tests/unit/utils/test_vnfd_utils.py index 049d293..1474a1c 100644 --- a/osm_policy_module/tests/unit/utils/test_vnfd_utils.py +++ b/osm_policy_module/tests/unit/utils/test_vnfd_utils.py @@ -31,16 +31,20 @@ from osm_policy_module.core.exceptions import ManagementVduNotFound class VnfdUtilsTest(unittest.TestCase): def test_get_mgmt_vdu_on_valid_descriptor(self): - example_file = os.path.join(os.path.dirname(__file__), 'examples/cirros_vdu_scaling_vnfd.yaml') - with open(example_file, 'r') as file: - vnfd = yaml.safe_load(file)['vnfd'] + example_file = os.path.join( + os.path.dirname(__file__), "examples/cirros_vdu_scaling_vnfd.yaml" + ) + with open(example_file, "r") as file: + vnfd = yaml.safe_load(file)["vnfd"] vdu = VnfdUtils.get_mgmt_vdu(vnfd) - self.assertEqual(vdu['id'], 'cirros_vnfd-VM') + self.assertEqual(vdu["id"], "cirros_vnfd-VM") def test_get_mgmt_vdu_on_invalid_descriptor(self): - example_file = os.path.join(os.path.dirname(__file__), 'examples/cirros_vdu_scaling_vnfd.yaml') - with open(example_file, 'r') as file: - vnfd = yaml.safe_load(file)['vnfd'] - vnfd['mgmt-cp'] = 'invalid-cp' + example_file = os.path.join( + os.path.dirname(__file__), "examples/cirros_vdu_scaling_vnfd.yaml" + ) + with open(example_file, "r") as file: + vnfd = yaml.safe_load(file)["vnfd"] + vnfd["mgmt-cp"] = "invalid-cp" with self.assertRaises(ManagementVduNotFound): VnfdUtils.get_mgmt_vdu(vnfd) diff --git a/osm_policy_module/utils/vnfd.py b/osm_policy_module/utils/vnfd.py index 0ecbda1..1a81e63 100644 --- a/osm_policy_module/utils/vnfd.py +++ b/osm_policy_module/utils/vnfd.py @@ -25,16 +25,15 @@ from osm_policy_module.core.exceptions import ManagementVduNotFound class VnfdUtils: - @staticmethod def get_mgmt_vdu(vnfd: dict): - if 'mgmt-cp' in vnfd: - mgmt_cp = vnfd['mgmt-cp'] + if "mgmt-cp" in vnfd: + mgmt_cp = vnfd["mgmt-cp"] mgmt_cp_vdu_id = None - for cpd in vnfd.get('ext-cpd', ()): - if cpd.get('id') == mgmt_cp: - mgmt_cp_vdu_id = cpd.get('int-cpd', {}).get('vdu-id') - for vdu in vnfd.get('vdu', ()): - if vdu.get('id') == mgmt_cp_vdu_id: + for cpd in vnfd.get("ext-cpd", ()): + if cpd.get("id") == mgmt_cp: + mgmt_cp_vdu_id = cpd.get("int-cpd", {}).get("vdu-id") + for vdu in vnfd.get("vdu", ()): + if vdu.get("id") == mgmt_cp_vdu_id: return vdu - raise ManagementVduNotFound("Management vdu not found in vnfd %s", vnfd['id']) + raise ManagementVduNotFound("Management vdu not found in vnfd %s", vnfd["id"]) diff --git a/setup.py b/setup.py index a2273ad..530d3ee 100644 --- a/setup.py +++ b/setup.py @@ -23,21 +23,21 @@ ## from setuptools import setup -_name = 'osm_policy_module' -_version_command = ('git describe --match v* --tags --long --dirty', 'pep440-git-full') +_name = "osm_policy_module" +_version_command = ("git describe --match v* --tags --long --dirty", "pep440-git-full") _author = "OSM Support" -_author_email = 'osmsupport@etsi.org' -_description = 'OSM Policy Module' -_maintainer = 'OSM Support' -_maintainer_email = 'osmsupport@etsi.org' -_license = 'Apache 2.0' -_url = 'https://osm.etsi.org/gitweb/?p=osm/MON.git;a=tree' +_author_email = "osmsupport@etsi.org" +_description = "OSM Policy Module" +_maintainer = "OSM Support" +_maintainer_email = "osmsupport@etsi.org" +_license = "Apache 2.0" +_url = "https://osm.etsi.org/gitweb/?p=osm/MON.git;a=tree" setup( name=_name, version_command=_version_command, description=_description, - long_description=open('README.rst', encoding='utf-8').read(), + long_description=open("README.rst", encoding="utf-8").read(), author=_author, author_email=_author_email, maintainer=_maintainer, @@ -47,12 +47,11 @@ setup( packages=[_name], package_dir={_name: _name}, include_package_data=True, - entry_points={ "console_scripts": [ "osm-policy-agent = osm_policy_module.cmd.policy_module_agent:main", "osm-pol-healthcheck = osm_policy_module.cmd.policy_module_healthcheck:main", ] }, - setup_requires=['setuptools-version-command'] + setup_requires=["setuptools-version-command"], ) diff --git a/tox.ini b/tox.ini index 9c52bdb..64ae2f0 100644 --- a/tox.ini +++ b/tox.ini @@ -34,6 +34,7 @@ deps = black skip_install = true commands = - black --check --diff osm_policy_module/ + - black --check --diff setup.py ####################################################################################### -- 2.17.1