Refactors alarm creation to comply with changes regarding the use of tags instead... 64/7964/2
authorBenjamin Diaz <bdiaz@whitestack.com>
Thu, 19 Sep 2019 15:01:14 +0000 (12:01 -0300)
committerBenjamin Diaz <bdiaz@whitestack.com>
Thu, 19 Sep 2019 21:21:03 +0000 (18:21 -0300)
MON changed alarms to include the concept of tags. POL process for alarm creation has
modified to comply with this.
Also, some refactoring has been done in the autoscaling service.

Change-Id: Iaf18262d05727bbb8ff06e83bce13e991aecab6c
Signed-off-by: Benjamin Diaz <bdiaz@whitestack.com>
osm_policy_module/autoscaling/service.py
osm_policy_module/common/mon_client.py
osm_policy_module/core/agent.py
osm_policy_module/tests/integration/test_policy_agent.py
osm_policy_module/tests/unit/core/test_policy_agent.py

index 8bb17e4..e0ab9cb 100644 (file)
@@ -25,6 +25,7 @@ import asyncio
 import datetime
 import json
 import logging
+from typing import List
 
 from osm_policy_module.common.common_db_client import CommonDbClient
 from osm_policy_module.common.lcm_client import LcmClient
@@ -73,77 +74,22 @@ class AutoscalingService:
                         scaling_groups = vnfd['scaling-group-descriptor']
                         vnf_monitoring_params = vnfd['monitoring-param']
                         for scaling_group in scaling_groups:
-                            try:
-                                scaling_group_record = ScalingGroupRepository.get(
-                                    ScalingGroup.nsr_id == nsr_id,
-                                    ScalingGroup.vnf_member_index == vnfr['member-vnf-index-ref'],
-                                    ScalingGroup.name == scaling_group['name']
-                                )
-                                log.debug("Found existing scaling group record in DB...")
-                            except ScalingGroup.DoesNotExist:
-                                log.debug("Creating scaling group record in DB...")
-                                scaling_group_record = ScalingGroupRepository.create(
-                                    nsr_id=nsr_id,
-                                    vnf_member_index=vnfr['member-vnf-index-ref'],
-                                    name=scaling_group['name'],
-                                    content=json.dumps(scaling_group)
-                                )
-                                log.debug(
-                                    "Created scaling group record in DB : nsr_id=%s, vnf_member_index=%s, name=%s",
-                                    scaling_group_record.nsr_id,
-                                    scaling_group_record.vnf_member_index,
-                                    scaling_group_record.name)
+                            scaling_group_record = self._get_or_create_scaling_group(nsr_id,
+                                                                                     vnfr['member-vnf-index-ref'],
+                                                                                     scaling_group)
                             for scaling_policy in scaling_group['scaling-policy']:
                                 if scaling_policy['scaling-type'] != 'automatic':
                                     continue
-                                try:
-                                    scaling_policy_record = ScalingPolicyRepository.get(
-                                        ScalingPolicy.name == scaling_policy['name'],
-                                        ScalingGroup.id == scaling_group_record.id,
-                                        join_classes=[ScalingGroup]
-                                    )
-                                    log.debug("Found existing scaling policy record in DB...")
-                                except ScalingPolicy.DoesNotExist:
-                                    log.debug("Creating scaling policy record in DB...")
-                                    scaling_policy_record = ScalingPolicyRepository.create(
-                                        nsr_id=nsr_id,
-                                        name=scaling_policy['name'],
-                                        cooldown_time=scaling_policy['cooldown-time'],
-                                        scaling_group=scaling_group_record,
-                                    )
-                                    if 'scale-in-operation-type' in scaling_policy:
-                                        scaling_policy_record.scale_in_operation = scaling_policy[
-                                            'scale-in-operation-type']
-                                    if 'scale-out-operation-type' in scaling_policy:
-                                        scaling_policy_record.scale_out_operation = scaling_policy[
-                                            'scale-out-operation-type']
-                                    if 'enabled' in scaling_policy:
-                                        scaling_policy_record.enabled = scaling_policy['enabled']
-                                    scaling_policy_record.save()
-                                    log.debug("Created scaling policy record in DB : name=%s, scaling_group.name=%s",
-                                              scaling_policy_record.name,
-                                              scaling_policy_record.scaling_group.name)
+                                scaling_policy_record = self._get_or_create_scaling_policy(nsr_id,
+                                                                                           scaling_policy,
+                                                                                           scaling_group_record)
 
                                 for scaling_criteria in scaling_policy['scaling-criteria']:
-                                    try:
-                                        scaling_criteria_record = ScalingCriteriaRepository.get(
-                                            ScalingPolicy.id == scaling_policy_record.id,
-                                            ScalingCriteria.name == scaling_criteria['name'],
-                                            join_classes=[ScalingPolicy]
-                                        )
-                                        log.debug("Found existing scaling criteria record in DB...")
-                                    except ScalingCriteria.DoesNotExist:
-                                        log.debug("Creating scaling criteria record in DB...")
-                                        scaling_criteria_record = ScalingCriteriaRepository.create(
-                                            nsr_id=nsr_id,
-                                            name=scaling_criteria['name'],
-                                            scaling_policy=scaling_policy_record
-                                        )
-                                        log.debug(
-                                            "Created scaling criteria record in DB : name=%s, scaling_policy.name=%s",
-                                            scaling_criteria_record.name,
-                                            scaling_criteria_record.scaling_policy.name)
-
+                                    scaling_criteria_record = self._get_or_create_scaling_criteria(
+                                        nsr_id,
+                                        scaling_criteria,
+                                        scaling_policy_record
+                                    )
                                     vnf_monitoring_param = next(
                                         filter(
                                             lambda param: param['id'] == scaling_criteria[
@@ -151,37 +97,7 @@ class AutoscalingService:
                                             ],
                                             vnf_monitoring_params)
                                     )
-                                    if 'vdu-monitoring-param' in vnf_monitoring_param:
-                                        vdurs = list(
-                                            filter(
-                                                lambda vdur: vdur['vdu-id-ref'] == vnf_monitoring_param
-                                                ['vdu-monitoring-param']
-                                                ['vdu-ref'],
-                                                vnfr['vdur']
-                                            )
-                                        )
-                                    elif 'vdu-metric' in vnf_monitoring_param:
-                                        vdurs = list(
-                                            filter(
-                                                lambda vdur: vdur['vdu-id-ref'] == vnf_monitoring_param
-                                                ['vdu-metric']
-                                                ['vdu-ref'],
-                                                vnfr['vdur']
-                                            )
-                                        )
-                                    elif 'vnf-metric' in vnf_monitoring_param:
-                                        vdu = VnfdUtils.get_mgmt_vdu(vnfd)
-                                        vdurs = list(
-                                            filter(
-                                                lambda vdur: vdur['vdu-id-ref'] == vdu['id'],
-                                                vnfr['vdur']
-                                            )
-                                        )
-                                    else:
-                                        log.warning(
-                                            "Scaling criteria is referring to a vnf-monitoring-param that does not "
-                                            "contain a reference to a vdu or vnf metric.")
-                                        continue
+                                    vdurs = self._get_monitored_vdurs(vnf_monitoring_param, vnfr['vdur'], vnfd)
                                     for vdur in vdurs:
                                         log.debug("Creating alarm for vdur %s ", vdur)
                                         try:
@@ -196,8 +112,9 @@ class AutoscalingService:
                                             continue
                                         except ScalingAlarm.DoesNotExist:
                                             pass
+                                        metric_name = self._get_metric_name(vnf_monitoring_param, vdur, vnfd)
                                         alarm_uuid = await self.mon_client.create_alarm(
-                                            metric_name=vnf_monitoring_param['id'],
+                                            metric_name=metric_name,
                                             ns_id=nsr_id,
                                             vdu_name=vdur['name'],
                                             vnf_member_index=vnfr['member-vnf-index-ref'],
@@ -214,7 +131,7 @@ class AutoscalingService:
                                         )
                                         alarms_created.append(alarm)
                                         alarm_uuid = await self.mon_client.create_alarm(
-                                            metric_name=vnf_monitoring_param['id'],
+                                            metric_name=metric_name,
                                             ns_id=nsr_id,
                                             vdu_name=vdur['name'],
                                             vnf_member_index=vnfr['member-vnf-index-ref'],
@@ -307,9 +224,6 @@ class AutoscalingService:
         finally:
             database.db.close()
 
-    def get_nslcmop(self, nslcmop_id):
-        return self.db_client.get_nslcmop(nslcmop_id)
-
     async def handle_alarm(self, alarm_uuid: str, status: str):
         await self.update_alarm_status(alarm_uuid, status)
         await self.evaluate_policy(alarm_uuid)
@@ -366,3 +280,130 @@ class AutoscalingService:
             log.debug("There is no autoscaling action configured for alarm %s.", alarm_uuid)
         finally:
             database.db.close()
+
+    def _get_or_create_scaling_group(self, nsr_id: str, vnf_member_index: str, scaling_group: dict):
+        try:
+            scaling_group_record = ScalingGroupRepository.get(
+                ScalingGroup.nsr_id == nsr_id,
+                ScalingGroup.vnf_member_index == vnf_member_index,
+                ScalingGroup.name == scaling_group['name']
+            )
+            log.debug("Found existing scaling group record in DB...")
+        except ScalingGroup.DoesNotExist:
+            log.debug("Creating scaling group record in DB...")
+            scaling_group_record = ScalingGroupRepository.create(
+                nsr_id=nsr_id,
+                vnf_member_index=vnf_member_index,
+                name=scaling_group['name'],
+                content=json.dumps(scaling_group)
+            )
+            log.debug(
+                "Created scaling group record in DB : nsr_id=%s, vnf_member_index=%s, name=%s",
+                scaling_group_record.nsr_id,
+                scaling_group_record.vnf_member_index,
+                scaling_group_record.name)
+        return scaling_group_record
+
+    def _get_or_create_scaling_policy(self, nsr_id: str, scaling_policy: dict, scaling_group_record: ScalingGroup):
+        try:
+            scaling_policy_record = ScalingPolicyRepository.get(
+                ScalingPolicy.name == scaling_policy['name'],
+                ScalingGroup.id == scaling_group_record.id,
+                join_classes=[ScalingGroup]
+            )
+            log.debug("Found existing scaling policy record in DB...")
+        except ScalingPolicy.DoesNotExist:
+            log.debug("Creating scaling policy record in DB...")
+            scaling_policy_record = ScalingPolicyRepository.create(
+                nsr_id=nsr_id,
+                name=scaling_policy['name'],
+                cooldown_time=scaling_policy['cooldown-time'],
+                scaling_group=scaling_group_record,
+            )
+            if 'scale-in-operation-type' in scaling_policy:
+                scaling_policy_record.scale_in_operation = scaling_policy[
+                    'scale-in-operation-type']
+            if 'scale-out-operation-type' in scaling_policy:
+                scaling_policy_record.scale_out_operation = scaling_policy[
+                    'scale-out-operation-type']
+            if 'enabled' in scaling_policy:
+                scaling_policy_record.enabled = scaling_policy['enabled']
+            scaling_policy_record.save()
+            log.debug("Created scaling policy record in DB : name=%s, scaling_group.name=%s",
+                      scaling_policy_record.name,
+                      scaling_policy_record.scaling_group.name)
+        return scaling_policy_record
+
+    def _get_or_create_scaling_criteria(self, nsr_id: str, scaling_criteria: dict,
+                                        scaling_policy_record: ScalingPolicy):
+        try:
+            scaling_criteria_record = ScalingCriteriaRepository.get(
+                ScalingPolicy.id == scaling_policy_record.id,
+                ScalingCriteria.name == scaling_criteria['name'],
+                join_classes=[ScalingPolicy]
+            )
+            log.debug("Found existing scaling criteria record in DB...")
+        except ScalingCriteria.DoesNotExist:
+            log.debug("Creating scaling criteria record in DB...")
+            scaling_criteria_record = ScalingCriteriaRepository.create(
+                nsr_id=nsr_id,
+                name=scaling_criteria['name'],
+                scaling_policy=scaling_policy_record
+            )
+            log.debug(
+                "Created scaling criteria record in DB : name=%s, scaling_policy.name=%s",
+                scaling_criteria_record.name,
+                scaling_criteria_record.scaling_policy.name)
+        return scaling_criteria_record
+
+    def _get_monitored_vdurs(self, vnf_monitoring_param: dict, vdurs: List[dict], vnfd: dict):
+        monitored_vdurs = []
+        if 'vdu-monitoring-param' in vnf_monitoring_param:
+            monitored_vdurs = list(
+                filter(
+                    lambda vdur: vdur['vdu-id-ref'] == vnf_monitoring_param
+                    ['vdu-monitoring-param']
+                    ['vdu-ref'],
+                    vdurs
+                )
+            )
+        elif 'vdu-metric' in vnf_monitoring_param:
+            monitored_vdurs = list(
+                filter(
+                    lambda vdur: vdur['vdu-id-ref'] == vnf_monitoring_param
+                    ['vdu-metric']
+                    ['vdu-ref'],
+                    vdurs
+                )
+            )
+        elif 'vnf-metric' in vnf_monitoring_param:
+            vdu = VnfdUtils.get_mgmt_vdu(vnfd)
+            monitored_vdurs = list(
+                filter(
+                    lambda vdur: vdur['vdu-id-ref'] == vdu['id'],
+                    vdurs
+                )
+            )
+        else:
+            log.warning(
+                "Scaling criteria is referring to a vnf-monitoring-param that does not "
+                "contain a reference to a vdu or vnf metric.")
+        return monitored_vdurs
+
+    def _get_metric_name(self, vnf_monitoring_param: dict, vdur: dict, vnfd: dict):
+        vdu = next(
+            filter(lambda vdu: vdu['id'] == vdur['vdu-id-ref'], vnfd['vdu'])
+        )
+        if 'vdu-monitoring-param' in vnf_monitoring_param:
+            vdu_monitoring_param = next(filter(
+                lambda param: param['id'] == vnf_monitoring_param['vdu-monitoring-param'][
+                    'vdu-monitoring-param-ref'], vdu['monitoring-param']))
+            nfvi_metric = vdu_monitoring_param['nfvi-metric']
+            return nfvi_metric
+        if 'vdu-metric' in vnf_monitoring_param:
+            vnf_metric_name = vnf_monitoring_param['vdu-metric']['vdu-metric-name-ref']
+            return vnf_metric_name
+        if 'vnf-metric' in vnf_monitoring_param:
+            vnf_metric_name = vnf_monitoring_param['vnf-metric']['vnf-metric-name-ref']
+            return vnf_metric_name
+        raise ValueError('No metric name found for vnf_monitoring_param %s' % vnf_monitoring_param['id'])
index 19f317d..f578462 100644 (file)
@@ -146,13 +146,15 @@ class MonClient:
             'correlation_id': cor_id,
             'alarm_name': 'osm_alarm_{}_{}_{}_{}'.format(ns_id, vnf_member_index, vdu_name, metric_name),
             'metric_name': metric_name,
-            'ns_id': ns_id,
-            'vdu_name': vdu_name,
-            'vnf_member_index': vnf_member_index,
             'operation': operation,
             'severity': 'critical',
             'threshold_value': threshold,
-            'statistic': statistic
+            'statistic': statistic,
+            'tags': {
+                'ns_id': ns_id,
+                'vdu_name': vdu_name,
+                'vnf_member_index': vnf_member_index,
+            }
         }
         msg = {
             'alarm_create_request': alarm_create_request,
@@ -164,9 +166,11 @@ class MonClient:
         alarm_delete_request = {
             'correlation_id': cor_id,
             'alarm_uuid': alarm_uuid,
-            'ns_id': ns_id,
-            'vdu_name': vdu_name,
-            'vnf_member_index': vnf_member_index
+            'tags': {
+                'ns_id': ns_id,
+                'vdu_name': vdu_name,
+                'vnf_member_index': vnf_member_index
+            }
         }
         msg = {
             'alarm_delete_request': alarm_delete_request,
index 350f383..34b852a 100644 (file)
@@ -28,6 +28,7 @@ import peewee
 
 from osm_policy_module.alarming.service import AlarmingService
 from osm_policy_module.autoscaling.service import AutoscalingService
+from osm_policy_module.common.common_db_client import CommonDbClient
 from osm_policy_module.common.message_bus_client import MessageBusClient
 from osm_policy_module.core.config import Config
 
@@ -43,6 +44,7 @@ class PolicyModuleAgent:
             loop = asyncio.get_event_loop()
         self.loop = loop
         self.msg_bus = MessageBusClient(config)
+        self.db_client = CommonDbClient(config)
         self.autoscaling_service = AutoscalingService(config, loop)
         self.alarming_service = AlarmingService(config, loop)
 
@@ -91,7 +93,7 @@ class PolicyModuleAgent:
     async def _handle_instantiated(self, content):
         log.debug("_handle_instantiated: %s", content)
         nslcmop_id = content['nslcmop_id']
-        nslcmop = self.autoscaling_service.get_nslcmop(nslcmop_id)
+        nslcmop = self.db_client.get_nslcmop(nslcmop_id)
         if nslcmop['operationState'] == 'COMPLETED' or nslcmop['operationState'] == 'PARTIALLY_COMPLETED':
             nsr_id = nslcmop['nsInstanceId']
             log.info("Configuring nsr_id: %s", nsr_id)
@@ -106,7 +108,7 @@ class PolicyModuleAgent:
     async def _handle_scaled(self, content):
         log.debug("_handle_scaled: %s", content)
         nslcmop_id = content['nslcmop_id']
-        nslcmop = self.autoscaling_service.get_nslcmop(nslcmop_id)
+        nslcmop = self.db_client.get_nslcmop(nslcmop_id)
         if nslcmop['operationState'] == 'COMPLETED' or nslcmop['operationState'] == 'PARTIALLY_COMPLETED':
             nsr_id = nslcmop['nsInstanceId']
             log.info("Configuring scaled service with nsr_id: %s", nsr_id)
index d253b74..65a657d 100644 (file)
@@ -489,28 +489,28 @@ class PolicyModuleAgentTest(unittest.TestCase):
         config = Config()
         agent = PolicyModuleAgent(config, self.loop)
         self.loop.run_until_complete(agent.autoscaling_service.configure_scaling_groups("test_nsr_id"))
-        create_alarm.assert_any_call(metric_name='cirros_vnf_memory_util',
+        create_alarm.assert_any_call(metric_name='average_memory_utilization',
                                      ns_id='test_nsr_id',
                                      operation='GT',
                                      statistic='AVERAGE',
                                      threshold=80,
                                      vdu_name='cirros_ns-1-cirros_vnfd-VM-1',
                                      vnf_member_index='1')
-        create_alarm.assert_any_call(metric_name='cirros_vnf_memory_util',
+        create_alarm.assert_any_call(metric_name='average_memory_utilization',
                                      ns_id='test_nsr_id',
                                      operation='LT',
                                      statistic='AVERAGE',
                                      threshold=20,
                                      vdu_name='cirros_ns-1-cirros_vnfd-VM-1',
                                      vnf_member_index='1')
-        create_alarm.assert_any_call(metric_name='cirros_vnf_memory_util',
+        create_alarm.assert_any_call(metric_name='average_memory_utilization',
                                      ns_id='test_nsr_id',
                                      operation='GT',
                                      statistic='AVERAGE',
                                      threshold=80,
                                      vdu_name='cirros_ns-2-cirros_vnfd-VM-1',
                                      vnf_member_index='2')
-        create_alarm.assert_any_call(metric_name='cirros_vnf_memory_util',
+        create_alarm.assert_any_call(metric_name='average_memory_utilization',
                                      ns_id='test_nsr_id',
                                      operation='LT',
                                      statistic='AVERAGE',
index d40b43d..9ff6b45 100644 (file)
@@ -27,6 +27,7 @@ from unittest import mock
 
 from osm_policy_module.alarming.service import AlarmingService
 from osm_policy_module.autoscaling.service import AutoscalingService
+from osm_policy_module.common.common_db_client import CommonDbClient
 from osm_policy_module.core.agent import PolicyModuleAgent
 from osm_policy_module.core.config import Config
 
@@ -35,25 +36,24 @@ class PolicyAgentTest(unittest.TestCase):
     def setUp(self):
         self.loop = asyncio.new_event_loop()
 
-    @mock.patch('osm_policy_module.alarming.service.CommonDbClient')
+    @mock.patch.object(CommonDbClient, "__init__", lambda *args, **kwargs: None)
     @mock.patch('osm_policy_module.alarming.service.MonClient')
     @mock.patch('osm_policy_module.alarming.service.LcmClient')
-    @mock.patch('osm_policy_module.autoscaling.service.CommonDbClient')
     @mock.patch('osm_policy_module.autoscaling.service.MonClient')
     @mock.patch('osm_policy_module.autoscaling.service.LcmClient')
     @mock.patch.object(AutoscalingService, 'configure_scaling_groups')
     @mock.patch.object(AlarmingService, 'configure_vnf_alarms')
     @mock.patch.object(AutoscalingService, 'delete_orphaned_alarms')
+    @mock.patch.object(CommonDbClient, 'get_nslcmop')
     def test_handle_instantiated(self,
+                                 get_nslcmop,
                                  delete_orphaned_alarms,
                                  configure_vnf_alarms,
                                  configure_scaling_groups,
                                  autoscaling_lcm_client,
                                  autoscaling_mon_client,
-                                 autoscaling_db_client,
                                  alarming_lcm_client,
-                                 alarming_mon_client,
-                                 alarming_db_client):
+                                 alarming_mon_client):
         async def mock_configure_scaling_groups(nsr_id):
             pass
 
@@ -67,10 +67,8 @@ class PolicyAgentTest(unittest.TestCase):
         agent = PolicyModuleAgent(config, self.loop)
         assert autoscaling_lcm_client.called
         assert autoscaling_mon_client.called
-        assert autoscaling_db_client.called
         assert alarming_lcm_client.called
         assert alarming_mon_client.called
-        assert alarming_db_client.called
         content = {
             'nslcmop_id': 'test_id',
         }
@@ -86,19 +84,18 @@ class PolicyAgentTest(unittest.TestCase):
         configure_vnf_alarms.side_effect = mock_configure_vnf_alarms
         delete_orphaned_alarms.side_effect = mock_delete_orphaned_alarms
 
-        autoscaling_db_client.return_value.get_nslcmop.return_value = nslcmop_completed
+        get_nslcmop.return_value = nslcmop_completed
         self.loop.run_until_complete(agent._handle_instantiated(content))
         configure_scaling_groups.assert_called_with('test_nsr_id')
         configure_scaling_groups.reset_mock()
 
-        autoscaling_db_client.return_value.get_nslcmop.return_value = nslcmop_failed
+        get_nslcmop.return_value = nslcmop_failed
         self.loop.run_until_complete(agent._handle_instantiated(content))
         configure_scaling_groups.assert_not_called()
 
-    @mock.patch('osm_policy_module.autoscaling.service.CommonDbClient')
+    @mock.patch.object(CommonDbClient, "__init__", lambda *args, **kwargs: None)
     @mock.patch('osm_policy_module.autoscaling.service.MonClient')
     @mock.patch('osm_policy_module.autoscaling.service.LcmClient')
-    @mock.patch('osm_policy_module.alarming.service.CommonDbClient')
     @mock.patch('osm_policy_module.alarming.service.MonClient')
     @mock.patch('osm_policy_module.alarming.service.LcmClient')
     @mock.patch.object(AutoscalingService, 'handle_alarm')
@@ -108,10 +105,8 @@ class PolicyAgentTest(unittest.TestCase):
                                        autoscaling_handle_alarm,
                                        autoscaling_lcm_client,
                                        autoscaling_mon_client,
-                                       autoscaling_db_client,
                                        alarming_lcm_client,
-                                       alarming_mon_client,
-                                       alarming_db_client):
+                                       alarming_mon_client):
         async def mock_handle_alarm(alarm_uuid, status, payload=None):
             pass
 
@@ -119,10 +114,8 @@ class PolicyAgentTest(unittest.TestCase):
         agent = PolicyModuleAgent(config, self.loop)
         assert autoscaling_lcm_client.called
         assert autoscaling_mon_client.called
-        assert autoscaling_db_client.called
         assert alarming_lcm_client.called
         assert alarming_mon_client.called
-        assert alarming_db_client.called
         content = {
             'notify_details': {
                 'alarm_uuid': 'test_alarm_uuid',