X-Git-Url: https://osm.etsi.org/gitweb/?a=blobdiff_plain;f=policy_module%2Fosm_policy_module%2Fcore%2Fagent.py;h=60da337074706dea4d87624ba2da8927cb7fbeb0;hb=2f1be6ba8722211068fb678bc1048283f275f0af;hp=b4ae2604dc6475fa6dcc1490f651de4c7154cc4a;hpb=034a5305e1535971979b842e4ee73bee40d8b1af;p=osm%2FMON.git diff --git a/policy_module/osm_policy_module/core/agent.py b/policy_module/osm_policy_module/core/agent.py index b4ae260..60da337 100644 --- a/policy_module/osm_policy_module/core/agent.py +++ b/policy_module/osm_policy_module/core/agent.py @@ -23,6 +23,10 @@ ## import json import logging +from typing import Dict, List + +import peewee +import yaml from kafka import KafkaConsumer from osm_policy_module.core.config import Config @@ -55,7 +59,10 @@ class PolicyModuleAgent: log.info("Message arrived: %s", message) try: if message.key == 'configure_scaling': - content = json.loads(message.value) + try: + content = json.loads(message.value) + except: + content = yaml.safe_load(message.value) log.info("Creating scaling record in DB") # TODO: Use transactions: http://docs.peewee-orm.com/en/latest/peewee/transactions.html scaling_record = ScalingRecord.create( @@ -73,8 +80,9 @@ class PolicyModuleAgent: log.info("Creating alarm record in DB") alarm_uuid = mon_client.create_alarm( metric_name=config.metric_name, - resource_uuid=config.resource_uuid, - vim_uuid=config.vim_uuid, + ns_id=scaling_record.nsr_id, + vdu_name=config.vdu_name, + vnf_member_index=config.vnf_member_index, threshold=config.threshold, operation=config.operation, statistic=config.statistic @@ -87,15 +95,18 @@ class PolicyModuleAgent: if message.key == 'notify_alarm': content = json.loads(message.value) alarm_id = content['notify_details']['alarm_uuid'] - alarm = ScalingAlarm.select().where(ScalingAlarm.alarm_id == alarm_id).get() - if alarm: + log.info("Received alarm notification for alarm %s", alarm_id) + try: + alarm = ScalingAlarm.select().where(ScalingAlarm.alarm_id == alarm_id).get() lcm_client = LcmClient() log.info("Sending scaling action message for ns: %s", alarm_id) lcm_client.scale(alarm.scaling_record.nsr_id, alarm.scaling_record.name, alarm.action) + except ScalingAlarm.DoesNotExist: + log.info("There is no action configured for alarm %.", alarm_id) except Exception: log.exception("Error consuming message: ") - def _get_alarm_configs(self, message_content): + def _get_alarm_configs(self, message_content: Dict) -> List[AlarmConfig]: scaling_criterias = message_content['scaling_group_descriptor']['scaling_policy']['scaling_criteria'] alarm_configs = [] for criteria in scaling_criterias: @@ -105,11 +116,11 @@ class PolicyModuleAgent: scale_out_operation = criteria['scale_out_relational_operation'] scale_in_operation = criteria['scale_in_relational_operation'] statistic = criteria['monitoring_param']['aggregation_type'] - vim_uuid = '' - resource_uuid = '' + vdu_name = '' + vnf_member_index = '' if 'vdu_monitoring_param' in criteria['monitoring_param']: - vim_uuid = criteria['monitoring_param']['vdu_monitoring_param']['vim_uuid'] - resource_uuid = criteria['monitoring_param']['vdu_monitoring_param']['resource_id'] + vdu_name = criteria['monitoring_param']['vdu_monitoring_param']['vdu_name'] + vnf_member_index = criteria['monitoring_param']['vdu_monitoring_param']['vnf_member_index'] metric_name = criteria['monitoring_param']['vdu_monitoring_param']['name'] if 'vnf_metric' in criteria['monitoring_param']: # TODO vnf_metric @@ -118,15 +129,15 @@ class PolicyModuleAgent: # TODO vdu_metric continue scale_out_alarm_config = AlarmConfig(metric_name, - resource_uuid, - vim_uuid, + vdu_name, + vnf_member_index, scale_out_threshold, scale_out_operation, statistic, 'scale_out') scale_in_alarm_config = AlarmConfig(metric_name, - resource_uuid, - vim_uuid, + vdu_name, + vnf_member_index, scale_in_threshold, scale_in_operation, statistic,