X-Git-Url: https://osm.etsi.org/gitweb/?a=blobdiff_plain;f=policy_module%2Fosm_policy_module%2Fcore%2Fagent.py;h=cdd5dfc9eb8b8b24b8307a20fdfa39daa5b0015c;hb=d9a55971b0b87f0d43883f840478f6fbc5f62fde;hp=b4ae2604dc6475fa6dcc1490f651de4c7154cc4a;hpb=77b8b3f70d0bdb8faeeaab4fc8144f1d0a1e4732;p=osm%2FMON.git diff --git a/policy_module/osm_policy_module/core/agent.py b/policy_module/osm_policy_module/core/agent.py index b4ae260..cdd5dfc 100644 --- a/policy_module/osm_policy_module/core/agent.py +++ b/policy_module/osm_policy_module/core/agent.py @@ -23,13 +23,14 @@ ## import json import logging +from typing import Dict, List +import yaml from kafka import KafkaConsumer -from osm_policy_module.core.config import Config -from osm_policy_module.common.lcm_client import LcmClient - from osm_policy_module.common.alarm_config import AlarmConfig +from osm_policy_module.common.lcm_client import LcmClient from osm_policy_module.common.mon_client import MonClient +from osm_policy_module.core.config import Config from osm_policy_module.core.database import ScalingRecord, ScalingAlarm log = logging.getLogger(__name__) @@ -44,18 +45,20 @@ class PolicyModuleAgent: # Initialize Kafka consumer log.info("Connecting to Kafka server at %s", kafka_server) - # TODO: Add logic to handle deduplication of messages when using group_id. - # See: https://stackoverflow.com/a/29836412 consumer = KafkaConsumer(bootstrap_servers=kafka_server, key_deserializer=bytes.decode, - value_deserializer=bytes.decode) + value_deserializer=bytes.decode, + group_id="pm-consumer") consumer.subscribe(['lcm_pm', 'alarm_response']) for message in consumer: log.info("Message arrived: %s", message) try: if message.key == 'configure_scaling': - content = json.loads(message.value) + try: + content = json.loads(message.value) + except: + content = yaml.safe_load(message.value) log.info("Creating scaling record in DB") # TODO: Use transactions: http://docs.peewee-orm.com/en/latest/peewee/transactions.html scaling_record = ScalingRecord.create( @@ -73,8 +76,9 @@ class PolicyModuleAgent: log.info("Creating alarm record in DB") alarm_uuid = mon_client.create_alarm( metric_name=config.metric_name, - resource_uuid=config.resource_uuid, - vim_uuid=config.vim_uuid, + ns_id=scaling_record.nsr_id, + vdu_name=config.vdu_name, + vnf_member_index=config.vnf_member_index, threshold=config.threshold, operation=config.operation, statistic=config.statistic @@ -87,15 +91,32 @@ class PolicyModuleAgent: if message.key == 'notify_alarm': content = json.loads(message.value) alarm_id = content['notify_details']['alarm_uuid'] - alarm = ScalingAlarm.select().where(ScalingAlarm.alarm_id == alarm_id).get() - if alarm: + metric_name = content['notify_details']['metric_name'] + operation = content['notify_details']['operation'] + threshold = content['notify_details']['threshold_value'] + vdu_name = content['notify_details']['vdu_name'] + vnf_member_index = content['notify_details']['vnf_member_index'] + ns_id = content['notify_details']['ns_id'] + log.info( + "Received alarm notification for alarm %s, \ + metric %s, \ + operation %s, \ + threshold %s, \ + vdu_name %s, \ + vnf_member_index %s, \ + ns_id %s ", + alarm_id, metric_name, operation, threshold, vdu_name, vnf_member_index, ns_id) + try: + alarm = ScalingAlarm.select().where(ScalingAlarm.alarm_id == alarm_id).get() lcm_client = LcmClient() log.info("Sending scaling action message for ns: %s", alarm_id) lcm_client.scale(alarm.scaling_record.nsr_id, alarm.scaling_record.name, alarm.action) + except ScalingAlarm.DoesNotExist: + log.info("There is no action configured for alarm %s.", alarm_id) except Exception: log.exception("Error consuming message: ") - def _get_alarm_configs(self, message_content): + def _get_alarm_configs(self, message_content: Dict) -> List[AlarmConfig]: scaling_criterias = message_content['scaling_group_descriptor']['scaling_policy']['scaling_criteria'] alarm_configs = [] for criteria in scaling_criterias: @@ -105,11 +126,11 @@ class PolicyModuleAgent: scale_out_operation = criteria['scale_out_relational_operation'] scale_in_operation = criteria['scale_in_relational_operation'] statistic = criteria['monitoring_param']['aggregation_type'] - vim_uuid = '' - resource_uuid = '' + vdu_name = '' + vnf_member_index = '' if 'vdu_monitoring_param' in criteria['monitoring_param']: - vim_uuid = criteria['monitoring_param']['vdu_monitoring_param']['vim_uuid'] - resource_uuid = criteria['monitoring_param']['vdu_monitoring_param']['resource_id'] + vdu_name = criteria['monitoring_param']['vdu_monitoring_param']['vdu_name'] + vnf_member_index = criteria['monitoring_param']['vdu_monitoring_param']['vnf_member_index'] metric_name = criteria['monitoring_param']['vdu_monitoring_param']['name'] if 'vnf_metric' in criteria['monitoring_param']: # TODO vnf_metric @@ -118,15 +139,15 @@ class PolicyModuleAgent: # TODO vdu_metric continue scale_out_alarm_config = AlarmConfig(metric_name, - resource_uuid, - vim_uuid, + vdu_name, + vnf_member_index, scale_out_threshold, scale_out_operation, statistic, 'scale_out') scale_in_alarm_config = AlarmConfig(metric_name, - resource_uuid, - vim_uuid, + vdu_name, + vnf_member_index, scale_in_threshold, scale_in_operation, statistic,