Adds deletion of alarms in case exception is thrown during scaling config 52/6652/4
authorBenjamin Diaz <bdiaz@whitestack.com>
Mon, 8 Oct 2018 19:25:36 +0000 (16:25 -0300)
committerBenjamin Diaz <bdiaz@whitestack.com>
Sun, 14 Oct 2018 21:38:20 +0000 (18:38 -0300)
If a exception is thrown during the configuration of a scaling group, POL
will check if there have been alarms created through MON, and if that is
the case, it will delete them.
Also, it adds validation of the MON alarm responses. It throws an exception
if MON returns status: False, meaning there was an error creating the alarm.

Signed-off-by: Benjamin Diaz <bdiaz@whitestack.com>
Change-Id: I4f5f0c95ae2cce0c71efb73ff2e06cdf8ea08864

osm_policy_module/common/mon_client.py
osm_policy_module/core/agent.py

index 5d2416e..6309025 100644 (file)
@@ -45,8 +45,9 @@ class MonClient:
     def create_alarm(self, metric_name: str, ns_id: str, vdu_name: str, vnf_member_index: int, threshold: int,
                      statistic: str, operation: str):
         cor_id = random.randint(1, 1000000)
-        msg = self._create_alarm_payload(cor_id, metric_name, ns_id, vdu_name, vnf_member_index, threshold, statistic,
-                                         operation)
+        msg = self._build_create_alarm_payload(cor_id, metric_name, ns_id, vdu_name, vnf_member_index, threshold,
+                                               statistic,
+                                               operation)
         log.info("Sending create_alarm_request %s", msg)
         self.producer.send(topic='alarm_request', key='create_alarm_request', value=json.dumps(msg))
         self.producer.flush()
@@ -61,14 +62,39 @@ class MonClient:
                 content = json.loads(message.value)
                 log.info("Received create_alarm_response %s", content)
                 if self._is_alarm_response_correlation_id_eq(cor_id, content):
+                    if not content['alarm_create_response']['status']:
+                        raise ValueError("Error creating alarm in MON")
                     alarm_uuid = content['alarm_create_response']['alarm_uuid']
-                    # TODO Handle error response
                     return alarm_uuid
 
         raise ValueError('Timeout: No alarm creation response from MON. Is MON up?')
 
-    def _create_alarm_payload(self, cor_id: int, metric_name: str, ns_id: str, vdu_name: str, vnf_member_index: int,
-                              threshold: int, statistic: str, operation: str):
+    def delete_alarm(self, ns_id: str, vnf_member_index: int, vdu_name: str, alarm_uuid: str):
+        cor_id = random.randint(1, 1000000)
+        msg = self._build_delete_alarm_payload(cor_id, ns_id, vdu_name, vnf_member_index, alarm_uuid)
+        log.info("Sending delete_alarm_request %s", msg)
+        self.producer.send(topic='alarm_request', key='delete_alarm_request', value=json.dumps(msg))
+        self.producer.flush()
+        consumer = KafkaConsumer(bootstrap_servers=self.kafka_server,
+                                 key_deserializer=bytes.decode,
+                                 value_deserializer=bytes.decode,
+                                 consumer_timeout_ms=10000)
+        consumer.subscribe(['alarm_response'])
+        for message in consumer:
+            if message.key == 'delete_alarm_response':
+                content = json.loads(message.value)
+                log.info("Received delete_alarm_response %s", content)
+                if self._is_alarm_response_correlation_id_eq(cor_id, content):
+                    if not content['alarm_delete_response']['status']:
+                        raise ValueError("Error deleting alarm in MON")
+                    alarm_uuid = content['alarm_delete_response']['alarm_uuid']
+                    return alarm_uuid
+
+        raise ValueError('Timeout: No alarm creation response from MON. Is MON up?')
+
+    def _build_create_alarm_payload(self, cor_id: int, metric_name: str, ns_id: str, vdu_name: str,
+                                    vnf_member_index: int,
+                                    threshold: int, statistic: str, operation: str):
         alarm_create_request = {
             'correlation_id': cor_id,
             'alarm_name': str(uuid.uuid4()),
@@ -86,5 +112,19 @@ class MonClient:
         }
         return msg
 
+    def _build_delete_alarm_payload(self, cor_id: int, ns_id: str, vdu_name: str,
+                                    vnf_member_index: int, alarm_uuid: str):
+        alarm_delete_request = {
+            'correlation_id': cor_id,
+            'alarm_uuid': alarm_uuid,
+            'ns_id': ns_id,
+            'vdu_name': vdu_name,
+            'vnf_member_index': vnf_member_index
+        }
+        msg = {
+            'alarm_delete_request': alarm_delete_request,
+        }
+        return msg
+
     def _is_alarm_response_correlation_id_eq(self, cor_id, message_content):
         return message_content['alarm_create_response']['correlation_id'] == cor_id
index bbefadd..ba35391 100644 (file)
@@ -134,133 +134,144 @@ class PolicyModuleAgent:
 
     def _configure_scaling_groups(self, nsr_id: str):
         log.debug("_configure_scaling_groups: %s", nsr_id)
-        # TODO(diazb): Check for alarm creation on exception and clean resources if needed.
         # TODO: Add support for non-nfvi metrics
+        alarms_created = []
         with database.db.atomic():
-            vnfrs = self.db_client.get_vnfrs(nsr_id)
-            log.info("Found %s vnfrs", len(vnfrs))
-            for vnfr in vnfrs:
-                vnfd = self.db_client.get_vnfd(vnfr['vnfd-id'])
-                log.info("Looking for vnfd %s", vnfr['vnfd-id'])
-                scaling_groups = vnfd['scaling-group-descriptor']
-                vnf_monitoring_params = vnfd['monitoring-param']
-                for scaling_group in scaling_groups:
-                    try:
-                        scaling_group_record = ScalingGroup.select().where(
-                            ScalingGroup.nsr_id == nsr_id,
-                            ScalingGroup.vnf_member_index == vnfr['member-vnf-index-ref'],
-                            ScalingGroup.name == scaling_group['name']
-                        ).get()
-                        log.info("Found existing scaling group record in DB...")
-                    except ScalingGroup.DoesNotExist:
-                        log.info("Creating scaling group record in DB...")
-                        scaling_group_record = ScalingGroup.create(
-                            nsr_id=nsr_id,
-                            vnf_member_index=vnfr['member-vnf-index-ref'],
-                            name=scaling_group['name'],
-                            content=json.dumps(scaling_group)
-                        )
-                        log.info(
-                            "Created scaling group record in DB : nsr_id=%s, vnf_member_index=%d, name=%s, content=%s",
-                            scaling_group_record.nsr_id,
-                            scaling_group_record.vnf_member_index,
-                            scaling_group_record.name,
-                            scaling_group_record.content)
-                    for scaling_policy in scaling_group['scaling-policy']:
-                        if scaling_policy['scaling-type'] != 'automatic':
-                            continue
-                        try:
-                            scaling_policy_record = ScalingPolicy.select().join(ScalingGroup).where(
-                                ScalingPolicy.name == scaling_policy['name'],
-                                ScalingGroup.id == scaling_group_record.id
-                            ).get()
-                            log.info("Found existing scaling policy record in DB...")
-                        except ScalingPolicy.DoesNotExist:
-                            log.info("Creating scaling policy record in DB...")
-                            scaling_policy_record = ScalingPolicy.create(
-                                nsr_id=nsr_id,
-                                name=scaling_policy['name'],
-                                cooldown_time=scaling_policy['cooldown-time'],
-                                scaling_group=scaling_group_record
-                            )
-                            log.info("Created scaling policy record in DB : name=%s, scaling_group.name=%s",
-                                     scaling_policy_record.name,
-                                     scaling_policy_record.scaling_group.name)
-
-                        for scaling_criteria in scaling_policy['scaling-criteria']:
+            try:
+                with database.db.atomic():
+                    vnfrs = self.db_client.get_vnfrs(nsr_id)
+                    log.info("Found %s vnfrs", len(vnfrs))
+                    for vnfr in vnfrs:
+                        vnfd = self.db_client.get_vnfd(vnfr['vnfd-id'])
+                        log.info("Looking for vnfd %s", vnfr['vnfd-id'])
+                        scaling_groups = vnfd['scaling-group-descriptor']
+                        vnf_monitoring_params = vnfd['monitoring-param']
+                        for scaling_group in scaling_groups:
                             try:
-                                scaling_criteria_record = ScalingCriteria.select().join(ScalingPolicy).where(
-                                    ScalingPolicy.id == scaling_policy_record.id,
-                                    ScalingCriteria.name == scaling_criteria['name']
+                                scaling_group_record = ScalingGroup.select().where(
+                                    ScalingGroup.nsr_id == nsr_id,
+                                    ScalingGroup.vnf_member_index == int(vnfr['member-vnf-index-ref']),
+                                    ScalingGroup.name == scaling_group['name']
                                 ).get()
-                                log.info("Found existing scaling criteria record in DB...")
-                            except ScalingCriteria.DoesNotExist:
-                                log.info("Creating scaling criteria record in DB...")
-                                scaling_criteria_record = ScalingCriteria.create(
+                                log.info("Found existing scaling group record in DB...")
+                            except ScalingGroup.DoesNotExist:
+                                log.info("Creating scaling group record in DB...")
+                                scaling_group_record = ScalingGroup.create(
                                     nsr_id=nsr_id,
-                                    name=scaling_criteria['name'],
-                                    scaling_policy=scaling_policy_record
+                                    vnf_member_index=vnfr['member-vnf-index-ref'],
+                                    name=scaling_group['name'],
+                                    content=json.dumps(scaling_group)
                                 )
                                 log.info(
-                                    "Created scaling criteria record in DB : name=%s, scaling_policy.name=%s",
-                                    scaling_criteria_record.name,
-                                    scaling_criteria_record.scaling_policy.name)
-
-                            for vdu_ref in scaling_group['vdu']:
-                                vnf_monitoring_param = next(
-                                    filter(lambda param: param['id'] == scaling_criteria['vnf-monitoring-param-ref'],
-                                           vnf_monitoring_params))
-                                if not vdu_ref['vdu-id-ref'] == vnf_monitoring_param['vdu-ref']:
+                                    "Created scaling group record in DB : nsr_id=%s, vnf_member_index=%s, name=%s",
+                                    scaling_group_record.nsr_id,
+                                    scaling_group_record.vnf_member_index,
+                                    scaling_group_record.name)
+                            for scaling_policy in scaling_group['scaling-policy']:
+                                if scaling_policy['scaling-type'] != 'automatic':
                                     continue
-                                vdu = next(
-                                    filter(lambda vdu: vdu['id'] == vdu_ref['vdu-id-ref'], vnfd['vdu'])
-                                )
-                                vdu_monitoring_params = vdu['monitoring-param']
-                                vdu_monitoring_param = next(
-                                    filter(
-                                        lambda param: param['id'] == vnf_monitoring_param['vdu-monitoring-param-ref'],
-                                        vdu_monitoring_params))
-                                vdurs = list(filter(lambda vdur: vdur['vdu-id-ref'] == vnf_monitoring_param['vdu-ref'],
-                                                    vnfr['vdur']))
-                                for vdur in vdurs:
+                                try:
+                                    scaling_policy_record = ScalingPolicy.select().join(ScalingGroup).where(
+                                        ScalingPolicy.name == scaling_policy['name'],
+                                        ScalingGroup.id == scaling_group_record.id
+                                    ).get()
+                                    log.info("Found existing scaling policy record in DB...")
+                                except ScalingPolicy.DoesNotExist:
+                                    log.info("Creating scaling policy record in DB...")
+                                    scaling_policy_record = ScalingPolicy.create(
+                                        nsr_id=nsr_id,
+                                        name=scaling_policy['name'],
+                                        cooldown_time=scaling_policy['cooldown-time'],
+                                        scaling_group=scaling_group_record
+                                    )
+                                    log.info("Created scaling policy record in DB : name=%s, scaling_group.name=%s",
+                                             scaling_policy_record.name,
+                                             scaling_policy_record.scaling_group.name)
+
+                                for scaling_criteria in scaling_policy['scaling-criteria']:
                                     try:
-                                        ScalingAlarm.select().join(ScalingCriteria).where(
-                                            ScalingAlarm.vdu_name == vdur['name'],
+                                        scaling_criteria_record = ScalingCriteria.select().join(ScalingPolicy).where(
+                                            ScalingPolicy.id == scaling_policy_record.id,
                                             ScalingCriteria.name == scaling_criteria['name']
                                         ).get()
-                                        log.debug("vdu %s already has an alarm configured", vdur['name'])
-                                        continue
-                                    except ScalingAlarm.DoesNotExist:
-                                        pass
-                                    alarm_uuid = self.mon_client.create_alarm(
-                                        metric_name=vdu_monitoring_param['nfvi-metric'],
-                                        ns_id=nsr_id,
-                                        vdu_name=vdur['name'],
-                                        vnf_member_index=vnfr['member-vnf-index-ref'],
-                                        threshold=scaling_criteria['scale-in-threshold'],
-                                        operation=scaling_criteria['scale-in-relational-operation'],
-                                        statistic=vnf_monitoring_param['aggregation-type']
-                                    )
-                                    ScalingAlarm.create(
-                                        alarm_id=alarm_uuid,
-                                        action='scale_in',
-                                        vnf_member_index=int(vnfr['member-vnf-index-ref']),
-                                        vdu_name=vdur['name'],
-                                        scaling_criteria=scaling_criteria_record
-                                    )
-                                    alarm_uuid = self.mon_client.create_alarm(
-                                        metric_name=vdu_monitoring_param['nfvi-metric'],
-                                        ns_id=nsr_id,
-                                        vdu_name=vdur['name'],
-                                        vnf_member_index=vnfr['member-vnf-index-ref'],
-                                        threshold=scaling_criteria['scale-out-threshold'],
-                                        operation=scaling_criteria['scale-out-relational-operation'],
-                                        statistic=vnf_monitoring_param['aggregation-type']
-                                    )
-                                    ScalingAlarm.create(
-                                        alarm_id=alarm_uuid,
-                                        action='scale_out',
-                                        vnf_member_index=int(vnfr['member-vnf-index-ref']),
-                                        vdu_name=vdur['name'],
-                                        scaling_criteria=scaling_criteria_record
-                                    )
+                                        log.info("Found existing scaling criteria record in DB...")
+                                    except ScalingCriteria.DoesNotExist:
+                                        log.info("Creating scaling criteria record in DB...")
+                                        scaling_criteria_record = ScalingCriteria.create(
+                                            nsr_id=nsr_id,
+                                            name=scaling_criteria['name'],
+                                            scaling_policy=scaling_policy_record
+                                        )
+                                        log.info(
+                                            "Created scaling criteria record in DB : name=%s, scaling_policy.name=%s",
+                                            scaling_criteria_record.name,
+                                            scaling_criteria_record.scaling_policy.name)
+
+                                    for vdu_ref in scaling_group['vdu']:
+                                        vnf_monitoring_param = next(
+                                            filter(lambda param: param['id'] == scaling_criteria[
+                                                'vnf-monitoring-param-ref'], vnf_monitoring_params))
+                                        if not vdu_ref['vdu-id-ref'] == vnf_monitoring_param['vdu-ref']:
+                                            continue
+                                        vdu = next(
+                                            filter(lambda vdu: vdu['id'] == vdu_ref['vdu-id-ref'], vnfd['vdu'])
+                                        )
+                                        vdu_monitoring_params = vdu['monitoring-param']
+                                        vdu_monitoring_param = next(
+                                            filter(
+                                                lambda param: param['id'] == vnf_monitoring_param[
+                                                    'vdu-monitoring-param-ref'],
+                                                vdu_monitoring_params))
+                                        vdurs = list(
+                                            filter(lambda vdur: vdur['vdu-id-ref'] == vnf_monitoring_param['vdu-ref'],
+                                                   vnfr['vdur']))
+                                        for vdur in vdurs:
+                                            try:
+                                                ScalingAlarm.select().join(ScalingCriteria).where(
+                                                    ScalingAlarm.vdu_name == vdur['name'],
+                                                    ScalingCriteria.name == scaling_criteria['name']
+                                                ).get()
+                                                log.debug("vdu %s already has an alarm configured", vdur['name'])
+                                                continue
+                                            except ScalingAlarm.DoesNotExist:
+                                                pass
+                                            alarm_uuid = self.mon_client.create_alarm(
+                                                metric_name=vdu_monitoring_param['nfvi-metric'],
+                                                ns_id=nsr_id,
+                                                vdu_name=vdur['name'],
+                                                vnf_member_index=vnfr['member-vnf-index-ref'],
+                                                threshold=scaling_criteria['scale-in-threshold'],
+                                                operation=scaling_criteria['scale-in-relational-operation'],
+                                                statistic=vnf_monitoring_param['aggregation-type']
+                                            )
+                                            ScalingAlarm.create(
+                                                alarm_id=alarm_uuid,
+                                                action='scale_in',
+                                                vnf_member_index=int(vnfr['member-vnf-index-ref']),
+                                                vdu_name=vdur['name'],
+                                                scaling_criteria=scaling_criteria_record
+                                            )
+                                            alarm_uuid = self.mon_client.create_alarm(
+                                                metric_name=vdu_monitoring_param['nfvi-metric'],
+                                                ns_id=nsr_id,
+                                                vdu_name=vdur['name'],
+                                                vnf_member_index=vnfr['member-vnf-index-ref'],
+                                                threshold=scaling_criteria['scale-out-threshold'],
+                                                operation=scaling_criteria['scale-out-relational-operation'],
+                                                statistic=vnf_monitoring_param['aggregation-type']
+                                            )
+                                            ScalingAlarm.create(
+                                                alarm_id=alarm_uuid,
+                                                action='scale_out',
+                                                vnf_member_index=int(vnfr['member-vnf-index-ref']),
+                                                vdu_name=vdur['name'],
+                                                scaling_criteria=scaling_criteria_record
+                                            )
+
+            except Exception as e:
+                log.exception("Error configuring scaling groups:")
+                if len(alarms_created) > 0:
+                    log.info("Cleaning alarm resources in MON")
+                    for alarm in alarms_created:
+                        self.mon_client.delete_alarm(*alarm)
+                raise e