From 1a0af1c6928ce7269d25ab10be8aac6c21f69d8b Mon Sep 17 00:00:00 2001 From: Benjamin Diaz Date: Thu, 19 Sep 2019 12:01:14 -0300 Subject: [PATCH] Refactors alarm creation to comply with changes regarding the use of tags instead of fixed fields MON changed alarms to include the concept of tags. POL process for alarm creation has modified to comply with this. Also, some refactoring has been done in the autoscaling service. Change-Id: Iaf18262d05727bbb8ff06e83bce13e991aecab6c Signed-off-by: Benjamin Diaz --- osm_policy_module/autoscaling/service.py | 245 ++++++++++-------- osm_policy_module/common/mon_client.py | 18 +- osm_policy_module/core/agent.py | 6 +- .../tests/integration/test_policy_agent.py | 8 +- .../tests/unit/core/test_policy_agent.py | 25 +- 5 files changed, 171 insertions(+), 131 deletions(-) diff --git a/osm_policy_module/autoscaling/service.py b/osm_policy_module/autoscaling/service.py index 8bb17e4..e0ab9cb 100644 --- a/osm_policy_module/autoscaling/service.py +++ b/osm_policy_module/autoscaling/service.py @@ -25,6 +25,7 @@ import asyncio import datetime import json import logging +from typing import List from osm_policy_module.common.common_db_client import CommonDbClient from osm_policy_module.common.lcm_client import LcmClient @@ -73,77 +74,22 @@ class AutoscalingService: scaling_groups = vnfd['scaling-group-descriptor'] vnf_monitoring_params = vnfd['monitoring-param'] for scaling_group in scaling_groups: - try: - scaling_group_record = ScalingGroupRepository.get( - ScalingGroup.nsr_id == nsr_id, - ScalingGroup.vnf_member_index == vnfr['member-vnf-index-ref'], - ScalingGroup.name == scaling_group['name'] - ) - log.debug("Found existing scaling group record in DB...") - except ScalingGroup.DoesNotExist: - log.debug("Creating scaling group record in DB...") - scaling_group_record = ScalingGroupRepository.create( - nsr_id=nsr_id, - vnf_member_index=vnfr['member-vnf-index-ref'], - name=scaling_group['name'], - content=json.dumps(scaling_group) - ) - log.debug( - "Created scaling group record in DB : nsr_id=%s, vnf_member_index=%s, name=%s", - scaling_group_record.nsr_id, - scaling_group_record.vnf_member_index, - scaling_group_record.name) + scaling_group_record = self._get_or_create_scaling_group(nsr_id, + vnfr['member-vnf-index-ref'], + scaling_group) for scaling_policy in scaling_group['scaling-policy']: if scaling_policy['scaling-type'] != 'automatic': continue - try: - scaling_policy_record = ScalingPolicyRepository.get( - ScalingPolicy.name == scaling_policy['name'], - ScalingGroup.id == scaling_group_record.id, - join_classes=[ScalingGroup] - ) - log.debug("Found existing scaling policy record in DB...") - except ScalingPolicy.DoesNotExist: - log.debug("Creating scaling policy record in DB...") - scaling_policy_record = ScalingPolicyRepository.create( - nsr_id=nsr_id, - name=scaling_policy['name'], - cooldown_time=scaling_policy['cooldown-time'], - scaling_group=scaling_group_record, - ) - if 'scale-in-operation-type' in scaling_policy: - scaling_policy_record.scale_in_operation = scaling_policy[ - 'scale-in-operation-type'] - if 'scale-out-operation-type' in scaling_policy: - scaling_policy_record.scale_out_operation = scaling_policy[ - 'scale-out-operation-type'] - if 'enabled' in scaling_policy: - scaling_policy_record.enabled = scaling_policy['enabled'] - scaling_policy_record.save() - log.debug("Created scaling policy record in DB : name=%s, scaling_group.name=%s", - scaling_policy_record.name, - scaling_policy_record.scaling_group.name) + scaling_policy_record = self._get_or_create_scaling_policy(nsr_id, + scaling_policy, + scaling_group_record) for scaling_criteria in scaling_policy['scaling-criteria']: - try: - scaling_criteria_record = ScalingCriteriaRepository.get( - ScalingPolicy.id == scaling_policy_record.id, - ScalingCriteria.name == scaling_criteria['name'], - join_classes=[ScalingPolicy] - ) - log.debug("Found existing scaling criteria record in DB...") - except ScalingCriteria.DoesNotExist: - log.debug("Creating scaling criteria record in DB...") - scaling_criteria_record = ScalingCriteriaRepository.create( - nsr_id=nsr_id, - name=scaling_criteria['name'], - scaling_policy=scaling_policy_record - ) - log.debug( - "Created scaling criteria record in DB : name=%s, scaling_policy.name=%s", - scaling_criteria_record.name, - scaling_criteria_record.scaling_policy.name) - + scaling_criteria_record = self._get_or_create_scaling_criteria( + nsr_id, + scaling_criteria, + scaling_policy_record + ) vnf_monitoring_param = next( filter( lambda param: param['id'] == scaling_criteria[ @@ -151,37 +97,7 @@ class AutoscalingService: ], vnf_monitoring_params) ) - if 'vdu-monitoring-param' in vnf_monitoring_param: - vdurs = list( - filter( - lambda vdur: vdur['vdu-id-ref'] == vnf_monitoring_param - ['vdu-monitoring-param'] - ['vdu-ref'], - vnfr['vdur'] - ) - ) - elif 'vdu-metric' in vnf_monitoring_param: - vdurs = list( - filter( - lambda vdur: vdur['vdu-id-ref'] == vnf_monitoring_param - ['vdu-metric'] - ['vdu-ref'], - vnfr['vdur'] - ) - ) - elif 'vnf-metric' in vnf_monitoring_param: - vdu = VnfdUtils.get_mgmt_vdu(vnfd) - vdurs = list( - filter( - lambda vdur: vdur['vdu-id-ref'] == vdu['id'], - vnfr['vdur'] - ) - ) - else: - log.warning( - "Scaling criteria is referring to a vnf-monitoring-param that does not " - "contain a reference to a vdu or vnf metric.") - continue + vdurs = self._get_monitored_vdurs(vnf_monitoring_param, vnfr['vdur'], vnfd) for vdur in vdurs: log.debug("Creating alarm for vdur %s ", vdur) try: @@ -196,8 +112,9 @@ class AutoscalingService: continue except ScalingAlarm.DoesNotExist: pass + metric_name = self._get_metric_name(vnf_monitoring_param, vdur, vnfd) alarm_uuid = await self.mon_client.create_alarm( - metric_name=vnf_monitoring_param['id'], + metric_name=metric_name, ns_id=nsr_id, vdu_name=vdur['name'], vnf_member_index=vnfr['member-vnf-index-ref'], @@ -214,7 +131,7 @@ class AutoscalingService: ) alarms_created.append(alarm) alarm_uuid = await self.mon_client.create_alarm( - metric_name=vnf_monitoring_param['id'], + metric_name=metric_name, ns_id=nsr_id, vdu_name=vdur['name'], vnf_member_index=vnfr['member-vnf-index-ref'], @@ -307,9 +224,6 @@ class AutoscalingService: finally: database.db.close() - def get_nslcmop(self, nslcmop_id): - return self.db_client.get_nslcmop(nslcmop_id) - async def handle_alarm(self, alarm_uuid: str, status: str): await self.update_alarm_status(alarm_uuid, status) await self.evaluate_policy(alarm_uuid) @@ -366,3 +280,130 @@ class AutoscalingService: log.debug("There is no autoscaling action configured for alarm %s.", alarm_uuid) finally: database.db.close() + + def _get_or_create_scaling_group(self, nsr_id: str, vnf_member_index: str, scaling_group: dict): + try: + scaling_group_record = ScalingGroupRepository.get( + ScalingGroup.nsr_id == nsr_id, + ScalingGroup.vnf_member_index == vnf_member_index, + ScalingGroup.name == scaling_group['name'] + ) + log.debug("Found existing scaling group record in DB...") + except ScalingGroup.DoesNotExist: + log.debug("Creating scaling group record in DB...") + scaling_group_record = ScalingGroupRepository.create( + nsr_id=nsr_id, + vnf_member_index=vnf_member_index, + name=scaling_group['name'], + content=json.dumps(scaling_group) + ) + log.debug( + "Created scaling group record in DB : nsr_id=%s, vnf_member_index=%s, name=%s", + scaling_group_record.nsr_id, + scaling_group_record.vnf_member_index, + scaling_group_record.name) + return scaling_group_record + + def _get_or_create_scaling_policy(self, nsr_id: str, scaling_policy: dict, scaling_group_record: ScalingGroup): + try: + scaling_policy_record = ScalingPolicyRepository.get( + ScalingPolicy.name == scaling_policy['name'], + ScalingGroup.id == scaling_group_record.id, + join_classes=[ScalingGroup] + ) + log.debug("Found existing scaling policy record in DB...") + except ScalingPolicy.DoesNotExist: + log.debug("Creating scaling policy record in DB...") + scaling_policy_record = ScalingPolicyRepository.create( + nsr_id=nsr_id, + name=scaling_policy['name'], + cooldown_time=scaling_policy['cooldown-time'], + scaling_group=scaling_group_record, + ) + if 'scale-in-operation-type' in scaling_policy: + scaling_policy_record.scale_in_operation = scaling_policy[ + 'scale-in-operation-type'] + if 'scale-out-operation-type' in scaling_policy: + scaling_policy_record.scale_out_operation = scaling_policy[ + 'scale-out-operation-type'] + if 'enabled' in scaling_policy: + scaling_policy_record.enabled = scaling_policy['enabled'] + scaling_policy_record.save() + log.debug("Created scaling policy record in DB : name=%s, scaling_group.name=%s", + scaling_policy_record.name, + scaling_policy_record.scaling_group.name) + return scaling_policy_record + + def _get_or_create_scaling_criteria(self, nsr_id: str, scaling_criteria: dict, + scaling_policy_record: ScalingPolicy): + try: + scaling_criteria_record = ScalingCriteriaRepository.get( + ScalingPolicy.id == scaling_policy_record.id, + ScalingCriteria.name == scaling_criteria['name'], + join_classes=[ScalingPolicy] + ) + log.debug("Found existing scaling criteria record in DB...") + except ScalingCriteria.DoesNotExist: + log.debug("Creating scaling criteria record in DB...") + scaling_criteria_record = ScalingCriteriaRepository.create( + nsr_id=nsr_id, + name=scaling_criteria['name'], + scaling_policy=scaling_policy_record + ) + log.debug( + "Created scaling criteria record in DB : name=%s, scaling_policy.name=%s", + scaling_criteria_record.name, + scaling_criteria_record.scaling_policy.name) + return scaling_criteria_record + + def _get_monitored_vdurs(self, vnf_monitoring_param: dict, vdurs: List[dict], vnfd: dict): + monitored_vdurs = [] + if 'vdu-monitoring-param' in vnf_monitoring_param: + monitored_vdurs = list( + filter( + lambda vdur: vdur['vdu-id-ref'] == vnf_monitoring_param + ['vdu-monitoring-param'] + ['vdu-ref'], + vdurs + ) + ) + elif 'vdu-metric' in vnf_monitoring_param: + monitored_vdurs = list( + filter( + lambda vdur: vdur['vdu-id-ref'] == vnf_monitoring_param + ['vdu-metric'] + ['vdu-ref'], + vdurs + ) + ) + elif 'vnf-metric' in vnf_monitoring_param: + vdu = VnfdUtils.get_mgmt_vdu(vnfd) + monitored_vdurs = list( + filter( + lambda vdur: vdur['vdu-id-ref'] == vdu['id'], + vdurs + ) + ) + else: + log.warning( + "Scaling criteria is referring to a vnf-monitoring-param that does not " + "contain a reference to a vdu or vnf metric.") + return monitored_vdurs + + def _get_metric_name(self, vnf_monitoring_param: dict, vdur: dict, vnfd: dict): + vdu = next( + filter(lambda vdu: vdu['id'] == vdur['vdu-id-ref'], vnfd['vdu']) + ) + if 'vdu-monitoring-param' in vnf_monitoring_param: + vdu_monitoring_param = next(filter( + lambda param: param['id'] == vnf_monitoring_param['vdu-monitoring-param'][ + 'vdu-monitoring-param-ref'], vdu['monitoring-param'])) + nfvi_metric = vdu_monitoring_param['nfvi-metric'] + return nfvi_metric + if 'vdu-metric' in vnf_monitoring_param: + vnf_metric_name = vnf_monitoring_param['vdu-metric']['vdu-metric-name-ref'] + return vnf_metric_name + if 'vnf-metric' in vnf_monitoring_param: + vnf_metric_name = vnf_monitoring_param['vnf-metric']['vnf-metric-name-ref'] + return vnf_metric_name + raise ValueError('No metric name found for vnf_monitoring_param %s' % vnf_monitoring_param['id']) diff --git a/osm_policy_module/common/mon_client.py b/osm_policy_module/common/mon_client.py index 19f317d..f578462 100644 --- a/osm_policy_module/common/mon_client.py +++ b/osm_policy_module/common/mon_client.py @@ -146,13 +146,15 @@ class MonClient: 'correlation_id': cor_id, 'alarm_name': 'osm_alarm_{}_{}_{}_{}'.format(ns_id, vnf_member_index, vdu_name, metric_name), 'metric_name': metric_name, - 'ns_id': ns_id, - 'vdu_name': vdu_name, - 'vnf_member_index': vnf_member_index, 'operation': operation, 'severity': 'critical', 'threshold_value': threshold, - 'statistic': statistic + 'statistic': statistic, + 'tags': { + 'ns_id': ns_id, + 'vdu_name': vdu_name, + 'vnf_member_index': vnf_member_index, + } } msg = { 'alarm_create_request': alarm_create_request, @@ -164,9 +166,11 @@ class MonClient: alarm_delete_request = { 'correlation_id': cor_id, 'alarm_uuid': alarm_uuid, - 'ns_id': ns_id, - 'vdu_name': vdu_name, - 'vnf_member_index': vnf_member_index + 'tags': { + 'ns_id': ns_id, + 'vdu_name': vdu_name, + 'vnf_member_index': vnf_member_index + } } msg = { 'alarm_delete_request': alarm_delete_request, diff --git a/osm_policy_module/core/agent.py b/osm_policy_module/core/agent.py index 350f383..34b852a 100644 --- a/osm_policy_module/core/agent.py +++ b/osm_policy_module/core/agent.py @@ -28,6 +28,7 @@ import peewee from osm_policy_module.alarming.service import AlarmingService from osm_policy_module.autoscaling.service import AutoscalingService +from osm_policy_module.common.common_db_client import CommonDbClient from osm_policy_module.common.message_bus_client import MessageBusClient from osm_policy_module.core.config import Config @@ -43,6 +44,7 @@ class PolicyModuleAgent: loop = asyncio.get_event_loop() self.loop = loop self.msg_bus = MessageBusClient(config) + self.db_client = CommonDbClient(config) self.autoscaling_service = AutoscalingService(config, loop) self.alarming_service = AlarmingService(config, loop) @@ -91,7 +93,7 @@ class PolicyModuleAgent: async def _handle_instantiated(self, content): log.debug("_handle_instantiated: %s", content) nslcmop_id = content['nslcmop_id'] - nslcmop = self.autoscaling_service.get_nslcmop(nslcmop_id) + nslcmop = self.db_client.get_nslcmop(nslcmop_id) if nslcmop['operationState'] == 'COMPLETED' or nslcmop['operationState'] == 'PARTIALLY_COMPLETED': nsr_id = nslcmop['nsInstanceId'] log.info("Configuring nsr_id: %s", nsr_id) @@ -106,7 +108,7 @@ class PolicyModuleAgent: async def _handle_scaled(self, content): log.debug("_handle_scaled: %s", content) nslcmop_id = content['nslcmop_id'] - nslcmop = self.autoscaling_service.get_nslcmop(nslcmop_id) + nslcmop = self.db_client.get_nslcmop(nslcmop_id) if nslcmop['operationState'] == 'COMPLETED' or nslcmop['operationState'] == 'PARTIALLY_COMPLETED': nsr_id = nslcmop['nsInstanceId'] log.info("Configuring scaled service with nsr_id: %s", nsr_id) diff --git a/osm_policy_module/tests/integration/test_policy_agent.py b/osm_policy_module/tests/integration/test_policy_agent.py index d253b74..65a657d 100644 --- a/osm_policy_module/tests/integration/test_policy_agent.py +++ b/osm_policy_module/tests/integration/test_policy_agent.py @@ -489,28 +489,28 @@ class PolicyModuleAgentTest(unittest.TestCase): config = Config() agent = PolicyModuleAgent(config, self.loop) self.loop.run_until_complete(agent.autoscaling_service.configure_scaling_groups("test_nsr_id")) - create_alarm.assert_any_call(metric_name='cirros_vnf_memory_util', + create_alarm.assert_any_call(metric_name='average_memory_utilization', ns_id='test_nsr_id', operation='GT', statistic='AVERAGE', threshold=80, vdu_name='cirros_ns-1-cirros_vnfd-VM-1', vnf_member_index='1') - create_alarm.assert_any_call(metric_name='cirros_vnf_memory_util', + create_alarm.assert_any_call(metric_name='average_memory_utilization', ns_id='test_nsr_id', operation='LT', statistic='AVERAGE', threshold=20, vdu_name='cirros_ns-1-cirros_vnfd-VM-1', vnf_member_index='1') - create_alarm.assert_any_call(metric_name='cirros_vnf_memory_util', + create_alarm.assert_any_call(metric_name='average_memory_utilization', ns_id='test_nsr_id', operation='GT', statistic='AVERAGE', threshold=80, vdu_name='cirros_ns-2-cirros_vnfd-VM-1', vnf_member_index='2') - create_alarm.assert_any_call(metric_name='cirros_vnf_memory_util', + create_alarm.assert_any_call(metric_name='average_memory_utilization', ns_id='test_nsr_id', operation='LT', statistic='AVERAGE', diff --git a/osm_policy_module/tests/unit/core/test_policy_agent.py b/osm_policy_module/tests/unit/core/test_policy_agent.py index d40b43d..9ff6b45 100644 --- a/osm_policy_module/tests/unit/core/test_policy_agent.py +++ b/osm_policy_module/tests/unit/core/test_policy_agent.py @@ -27,6 +27,7 @@ from unittest import mock from osm_policy_module.alarming.service import AlarmingService from osm_policy_module.autoscaling.service import AutoscalingService +from osm_policy_module.common.common_db_client import CommonDbClient from osm_policy_module.core.agent import PolicyModuleAgent from osm_policy_module.core.config import Config @@ -35,25 +36,24 @@ class PolicyAgentTest(unittest.TestCase): def setUp(self): self.loop = asyncio.new_event_loop() - @mock.patch('osm_policy_module.alarming.service.CommonDbClient') + @mock.patch.object(CommonDbClient, "__init__", lambda *args, **kwargs: None) @mock.patch('osm_policy_module.alarming.service.MonClient') @mock.patch('osm_policy_module.alarming.service.LcmClient') - @mock.patch('osm_policy_module.autoscaling.service.CommonDbClient') @mock.patch('osm_policy_module.autoscaling.service.MonClient') @mock.patch('osm_policy_module.autoscaling.service.LcmClient') @mock.patch.object(AutoscalingService, 'configure_scaling_groups') @mock.patch.object(AlarmingService, 'configure_vnf_alarms') @mock.patch.object(AutoscalingService, 'delete_orphaned_alarms') + @mock.patch.object(CommonDbClient, 'get_nslcmop') def test_handle_instantiated(self, + get_nslcmop, delete_orphaned_alarms, configure_vnf_alarms, configure_scaling_groups, autoscaling_lcm_client, autoscaling_mon_client, - autoscaling_db_client, alarming_lcm_client, - alarming_mon_client, - alarming_db_client): + alarming_mon_client): async def mock_configure_scaling_groups(nsr_id): pass @@ -67,10 +67,8 @@ class PolicyAgentTest(unittest.TestCase): agent = PolicyModuleAgent(config, self.loop) assert autoscaling_lcm_client.called assert autoscaling_mon_client.called - assert autoscaling_db_client.called assert alarming_lcm_client.called assert alarming_mon_client.called - assert alarming_db_client.called content = { 'nslcmop_id': 'test_id', } @@ -86,19 +84,18 @@ class PolicyAgentTest(unittest.TestCase): configure_vnf_alarms.side_effect = mock_configure_vnf_alarms delete_orphaned_alarms.side_effect = mock_delete_orphaned_alarms - autoscaling_db_client.return_value.get_nslcmop.return_value = nslcmop_completed + get_nslcmop.return_value = nslcmop_completed self.loop.run_until_complete(agent._handle_instantiated(content)) configure_scaling_groups.assert_called_with('test_nsr_id') configure_scaling_groups.reset_mock() - autoscaling_db_client.return_value.get_nslcmop.return_value = nslcmop_failed + get_nslcmop.return_value = nslcmop_failed self.loop.run_until_complete(agent._handle_instantiated(content)) configure_scaling_groups.assert_not_called() - @mock.patch('osm_policy_module.autoscaling.service.CommonDbClient') + @mock.patch.object(CommonDbClient, "__init__", lambda *args, **kwargs: None) @mock.patch('osm_policy_module.autoscaling.service.MonClient') @mock.patch('osm_policy_module.autoscaling.service.LcmClient') - @mock.patch('osm_policy_module.alarming.service.CommonDbClient') @mock.patch('osm_policy_module.alarming.service.MonClient') @mock.patch('osm_policy_module.alarming.service.LcmClient') @mock.patch.object(AutoscalingService, 'handle_alarm') @@ -108,10 +105,8 @@ class PolicyAgentTest(unittest.TestCase): autoscaling_handle_alarm, autoscaling_lcm_client, autoscaling_mon_client, - autoscaling_db_client, alarming_lcm_client, - alarming_mon_client, - alarming_db_client): + alarming_mon_client): async def mock_handle_alarm(alarm_uuid, status, payload=None): pass @@ -119,10 +114,8 @@ class PolicyAgentTest(unittest.TestCase): agent = PolicyModuleAgent(config, self.loop) assert autoscaling_lcm_client.called assert autoscaling_mon_client.called - assert autoscaling_db_client.called assert alarming_lcm_client.called assert alarming_mon_client.called - assert alarming_db_client.called content = { 'notify_details': { 'alarm_uuid': 'test_alarm_uuid', -- 2.25.1