From f35f914f45080e150cc7c6a2928de47a9ae5c848 Mon Sep 17 00:00:00 2001 From: Benjamin Diaz Date: Mon, 8 Oct 2018 16:25:36 -0300 Subject: [PATCH] Adds deletion of alarms in case exception is thrown during scaling config If a exception is thrown during the configuration of a scaling group, POL will check if there have been alarms created through MON, and if that is the case, it will delete them. Also, it adds validation of the MON alarm responses. It throws an exception if MON returns status: False, meaning there was an error creating the alarm. Signed-off-by: Benjamin Diaz Change-Id: I4f5f0c95ae2cce0c71efb73ff2e06cdf8ea08864 --- osm_policy_module/common/mon_client.py | 50 ++++- osm_policy_module/core/agent.py | 249 +++++++++++++------------ 2 files changed, 175 insertions(+), 124 deletions(-) diff --git a/osm_policy_module/common/mon_client.py b/osm_policy_module/common/mon_client.py index 5d2416e..6309025 100644 --- a/osm_policy_module/common/mon_client.py +++ b/osm_policy_module/common/mon_client.py @@ -45,8 +45,9 @@ class MonClient: def create_alarm(self, metric_name: str, ns_id: str, vdu_name: str, vnf_member_index: int, threshold: int, statistic: str, operation: str): cor_id = random.randint(1, 1000000) - msg = self._create_alarm_payload(cor_id, metric_name, ns_id, vdu_name, vnf_member_index, threshold, statistic, - operation) + msg = self._build_create_alarm_payload(cor_id, metric_name, ns_id, vdu_name, vnf_member_index, threshold, + statistic, + operation) log.info("Sending create_alarm_request %s", msg) self.producer.send(topic='alarm_request', key='create_alarm_request', value=json.dumps(msg)) self.producer.flush() @@ -61,14 +62,39 @@ class MonClient: content = json.loads(message.value) log.info("Received create_alarm_response %s", content) if self._is_alarm_response_correlation_id_eq(cor_id, content): + if not content['alarm_create_response']['status']: + raise ValueError("Error creating alarm in MON") alarm_uuid = content['alarm_create_response']['alarm_uuid'] - # TODO Handle error response return alarm_uuid raise ValueError('Timeout: No alarm creation response from MON. Is MON up?') - def _create_alarm_payload(self, cor_id: int, metric_name: str, ns_id: str, vdu_name: str, vnf_member_index: int, - threshold: int, statistic: str, operation: str): + def delete_alarm(self, ns_id: str, vnf_member_index: int, vdu_name: str, alarm_uuid: str): + cor_id = random.randint(1, 1000000) + msg = self._build_delete_alarm_payload(cor_id, ns_id, vdu_name, vnf_member_index, alarm_uuid) + log.info("Sending delete_alarm_request %s", msg) + self.producer.send(topic='alarm_request', key='delete_alarm_request', value=json.dumps(msg)) + self.producer.flush() + consumer = KafkaConsumer(bootstrap_servers=self.kafka_server, + key_deserializer=bytes.decode, + value_deserializer=bytes.decode, + consumer_timeout_ms=10000) + consumer.subscribe(['alarm_response']) + for message in consumer: + if message.key == 'delete_alarm_response': + content = json.loads(message.value) + log.info("Received delete_alarm_response %s", content) + if self._is_alarm_response_correlation_id_eq(cor_id, content): + if not content['alarm_delete_response']['status']: + raise ValueError("Error deleting alarm in MON") + alarm_uuid = content['alarm_delete_response']['alarm_uuid'] + return alarm_uuid + + raise ValueError('Timeout: No alarm creation response from MON. Is MON up?') + + def _build_create_alarm_payload(self, cor_id: int, metric_name: str, ns_id: str, vdu_name: str, + vnf_member_index: int, + threshold: int, statistic: str, operation: str): alarm_create_request = { 'correlation_id': cor_id, 'alarm_name': str(uuid.uuid4()), @@ -86,5 +112,19 @@ class MonClient: } return msg + def _build_delete_alarm_payload(self, cor_id: int, ns_id: str, vdu_name: str, + vnf_member_index: int, alarm_uuid: str): + alarm_delete_request = { + 'correlation_id': cor_id, + 'alarm_uuid': alarm_uuid, + 'ns_id': ns_id, + 'vdu_name': vdu_name, + 'vnf_member_index': vnf_member_index + } + msg = { + 'alarm_delete_request': alarm_delete_request, + } + return msg + def _is_alarm_response_correlation_id_eq(self, cor_id, message_content): return message_content['alarm_create_response']['correlation_id'] == cor_id diff --git a/osm_policy_module/core/agent.py b/osm_policy_module/core/agent.py index bbefadd..ba35391 100644 --- a/osm_policy_module/core/agent.py +++ b/osm_policy_module/core/agent.py @@ -134,133 +134,144 @@ class PolicyModuleAgent: def _configure_scaling_groups(self, nsr_id: str): log.debug("_configure_scaling_groups: %s", nsr_id) - # TODO(diazb): Check for alarm creation on exception and clean resources if needed. # TODO: Add support for non-nfvi metrics + alarms_created = [] with database.db.atomic(): - vnfrs = self.db_client.get_vnfrs(nsr_id) - log.info("Found %s vnfrs", len(vnfrs)) - for vnfr in vnfrs: - vnfd = self.db_client.get_vnfd(vnfr['vnfd-id']) - log.info("Looking for vnfd %s", vnfr['vnfd-id']) - scaling_groups = vnfd['scaling-group-descriptor'] - vnf_monitoring_params = vnfd['monitoring-param'] - for scaling_group in scaling_groups: - try: - scaling_group_record = ScalingGroup.select().where( - ScalingGroup.nsr_id == nsr_id, - ScalingGroup.vnf_member_index == vnfr['member-vnf-index-ref'], - ScalingGroup.name == scaling_group['name'] - ).get() - log.info("Found existing scaling group record in DB...") - except ScalingGroup.DoesNotExist: - log.info("Creating scaling group record in DB...") - scaling_group_record = ScalingGroup.create( - nsr_id=nsr_id, - vnf_member_index=vnfr['member-vnf-index-ref'], - name=scaling_group['name'], - content=json.dumps(scaling_group) - ) - log.info( - "Created scaling group record in DB : nsr_id=%s, vnf_member_index=%d, name=%s, content=%s", - scaling_group_record.nsr_id, - scaling_group_record.vnf_member_index, - scaling_group_record.name, - scaling_group_record.content) - for scaling_policy in scaling_group['scaling-policy']: - if scaling_policy['scaling-type'] != 'automatic': - continue - try: - scaling_policy_record = ScalingPolicy.select().join(ScalingGroup).where( - ScalingPolicy.name == scaling_policy['name'], - ScalingGroup.id == scaling_group_record.id - ).get() - log.info("Found existing scaling policy record in DB...") - except ScalingPolicy.DoesNotExist: - log.info("Creating scaling policy record in DB...") - scaling_policy_record = ScalingPolicy.create( - nsr_id=nsr_id, - name=scaling_policy['name'], - cooldown_time=scaling_policy['cooldown-time'], - scaling_group=scaling_group_record - ) - log.info("Created scaling policy record in DB : name=%s, scaling_group.name=%s", - scaling_policy_record.name, - scaling_policy_record.scaling_group.name) - - for scaling_criteria in scaling_policy['scaling-criteria']: + try: + with database.db.atomic(): + vnfrs = self.db_client.get_vnfrs(nsr_id) + log.info("Found %s vnfrs", len(vnfrs)) + for vnfr in vnfrs: + vnfd = self.db_client.get_vnfd(vnfr['vnfd-id']) + log.info("Looking for vnfd %s", vnfr['vnfd-id']) + scaling_groups = vnfd['scaling-group-descriptor'] + vnf_monitoring_params = vnfd['monitoring-param'] + for scaling_group in scaling_groups: try: - scaling_criteria_record = ScalingCriteria.select().join(ScalingPolicy).where( - ScalingPolicy.id == scaling_policy_record.id, - ScalingCriteria.name == scaling_criteria['name'] + scaling_group_record = ScalingGroup.select().where( + ScalingGroup.nsr_id == nsr_id, + ScalingGroup.vnf_member_index == int(vnfr['member-vnf-index-ref']), + ScalingGroup.name == scaling_group['name'] ).get() - log.info("Found existing scaling criteria record in DB...") - except ScalingCriteria.DoesNotExist: - log.info("Creating scaling criteria record in DB...") - scaling_criteria_record = ScalingCriteria.create( + log.info("Found existing scaling group record in DB...") + except ScalingGroup.DoesNotExist: + log.info("Creating scaling group record in DB...") + scaling_group_record = ScalingGroup.create( nsr_id=nsr_id, - name=scaling_criteria['name'], - scaling_policy=scaling_policy_record + vnf_member_index=vnfr['member-vnf-index-ref'], + name=scaling_group['name'], + content=json.dumps(scaling_group) ) log.info( - "Created scaling criteria record in DB : name=%s, scaling_policy.name=%s", - scaling_criteria_record.name, - scaling_criteria_record.scaling_policy.name) - - for vdu_ref in scaling_group['vdu']: - vnf_monitoring_param = next( - filter(lambda param: param['id'] == scaling_criteria['vnf-monitoring-param-ref'], - vnf_monitoring_params)) - if not vdu_ref['vdu-id-ref'] == vnf_monitoring_param['vdu-ref']: + "Created scaling group record in DB : nsr_id=%s, vnf_member_index=%s, name=%s", + scaling_group_record.nsr_id, + scaling_group_record.vnf_member_index, + scaling_group_record.name) + for scaling_policy in scaling_group['scaling-policy']: + if scaling_policy['scaling-type'] != 'automatic': continue - vdu = next( - filter(lambda vdu: vdu['id'] == vdu_ref['vdu-id-ref'], vnfd['vdu']) - ) - vdu_monitoring_params = vdu['monitoring-param'] - vdu_monitoring_param = next( - filter( - lambda param: param['id'] == vnf_monitoring_param['vdu-monitoring-param-ref'], - vdu_monitoring_params)) - vdurs = list(filter(lambda vdur: vdur['vdu-id-ref'] == vnf_monitoring_param['vdu-ref'], - vnfr['vdur'])) - for vdur in vdurs: + try: + scaling_policy_record = ScalingPolicy.select().join(ScalingGroup).where( + ScalingPolicy.name == scaling_policy['name'], + ScalingGroup.id == scaling_group_record.id + ).get() + log.info("Found existing scaling policy record in DB...") + except ScalingPolicy.DoesNotExist: + log.info("Creating scaling policy record in DB...") + scaling_policy_record = ScalingPolicy.create( + nsr_id=nsr_id, + name=scaling_policy['name'], + cooldown_time=scaling_policy['cooldown-time'], + scaling_group=scaling_group_record + ) + log.info("Created scaling policy record in DB : name=%s, scaling_group.name=%s", + scaling_policy_record.name, + scaling_policy_record.scaling_group.name) + + for scaling_criteria in scaling_policy['scaling-criteria']: try: - ScalingAlarm.select().join(ScalingCriteria).where( - ScalingAlarm.vdu_name == vdur['name'], + scaling_criteria_record = ScalingCriteria.select().join(ScalingPolicy).where( + ScalingPolicy.id == scaling_policy_record.id, ScalingCriteria.name == scaling_criteria['name'] ).get() - log.debug("vdu %s already has an alarm configured", vdur['name']) - continue - except ScalingAlarm.DoesNotExist: - pass - alarm_uuid = self.mon_client.create_alarm( - metric_name=vdu_monitoring_param['nfvi-metric'], - ns_id=nsr_id, - vdu_name=vdur['name'], - vnf_member_index=vnfr['member-vnf-index-ref'], - threshold=scaling_criteria['scale-in-threshold'], - operation=scaling_criteria['scale-in-relational-operation'], - statistic=vnf_monitoring_param['aggregation-type'] - ) - ScalingAlarm.create( - alarm_id=alarm_uuid, - action='scale_in', - vnf_member_index=int(vnfr['member-vnf-index-ref']), - vdu_name=vdur['name'], - scaling_criteria=scaling_criteria_record - ) - alarm_uuid = self.mon_client.create_alarm( - metric_name=vdu_monitoring_param['nfvi-metric'], - ns_id=nsr_id, - vdu_name=vdur['name'], - vnf_member_index=vnfr['member-vnf-index-ref'], - threshold=scaling_criteria['scale-out-threshold'], - operation=scaling_criteria['scale-out-relational-operation'], - statistic=vnf_monitoring_param['aggregation-type'] - ) - ScalingAlarm.create( - alarm_id=alarm_uuid, - action='scale_out', - vnf_member_index=int(vnfr['member-vnf-index-ref']), - vdu_name=vdur['name'], - scaling_criteria=scaling_criteria_record - ) + log.info("Found existing scaling criteria record in DB...") + except ScalingCriteria.DoesNotExist: + log.info("Creating scaling criteria record in DB...") + scaling_criteria_record = ScalingCriteria.create( + nsr_id=nsr_id, + name=scaling_criteria['name'], + scaling_policy=scaling_policy_record + ) + log.info( + "Created scaling criteria record in DB : name=%s, scaling_policy.name=%s", + scaling_criteria_record.name, + scaling_criteria_record.scaling_policy.name) + + for vdu_ref in scaling_group['vdu']: + vnf_monitoring_param = next( + filter(lambda param: param['id'] == scaling_criteria[ + 'vnf-monitoring-param-ref'], vnf_monitoring_params)) + if not vdu_ref['vdu-id-ref'] == vnf_monitoring_param['vdu-ref']: + continue + vdu = next( + filter(lambda vdu: vdu['id'] == vdu_ref['vdu-id-ref'], vnfd['vdu']) + ) + vdu_monitoring_params = vdu['monitoring-param'] + vdu_monitoring_param = next( + filter( + lambda param: param['id'] == vnf_monitoring_param[ + 'vdu-monitoring-param-ref'], + vdu_monitoring_params)) + vdurs = list( + filter(lambda vdur: vdur['vdu-id-ref'] == vnf_monitoring_param['vdu-ref'], + vnfr['vdur'])) + for vdur in vdurs: + try: + ScalingAlarm.select().join(ScalingCriteria).where( + ScalingAlarm.vdu_name == vdur['name'], + ScalingCriteria.name == scaling_criteria['name'] + ).get() + log.debug("vdu %s already has an alarm configured", vdur['name']) + continue + except ScalingAlarm.DoesNotExist: + pass + alarm_uuid = self.mon_client.create_alarm( + metric_name=vdu_monitoring_param['nfvi-metric'], + ns_id=nsr_id, + vdu_name=vdur['name'], + vnf_member_index=vnfr['member-vnf-index-ref'], + threshold=scaling_criteria['scale-in-threshold'], + operation=scaling_criteria['scale-in-relational-operation'], + statistic=vnf_monitoring_param['aggregation-type'] + ) + ScalingAlarm.create( + alarm_id=alarm_uuid, + action='scale_in', + vnf_member_index=int(vnfr['member-vnf-index-ref']), + vdu_name=vdur['name'], + scaling_criteria=scaling_criteria_record + ) + alarm_uuid = self.mon_client.create_alarm( + metric_name=vdu_monitoring_param['nfvi-metric'], + ns_id=nsr_id, + vdu_name=vdur['name'], + vnf_member_index=vnfr['member-vnf-index-ref'], + threshold=scaling_criteria['scale-out-threshold'], + operation=scaling_criteria['scale-out-relational-operation'], + statistic=vnf_monitoring_param['aggregation-type'] + ) + ScalingAlarm.create( + alarm_id=alarm_uuid, + action='scale_out', + vnf_member_index=int(vnfr['member-vnf-index-ref']), + vdu_name=vdur['name'], + scaling_criteria=scaling_criteria_record + ) + + except Exception as e: + log.exception("Error configuring scaling groups:") + if len(alarms_created) > 0: + log.info("Cleaning alarm resources in MON") + for alarm in alarms_created: + self.mon_client.delete_alarm(*alarm) + raise e -- 2.17.1