import logging
from typing import Dict, List
-import peewee
import yaml
-
from kafka import KafkaConsumer
-from osm_policy_module.core.config import Config
-from osm_policy_module.common.lcm_client import LcmClient
-
from osm_policy_module.common.alarm_config import AlarmConfig
+from osm_policy_module.common.lcm_client import LcmClient
from osm_policy_module.common.mon_client import MonClient
+from osm_policy_module.core.config import Config
from osm_policy_module.core.database import ScalingRecord, ScalingAlarm
log = logging.getLogger(__name__)
# Initialize Kafka consumer
log.info("Connecting to Kafka server at %s", kafka_server)
- # TODO: Add logic to handle deduplication of messages when using group_id.
- # See: https://stackoverflow.com/a/29836412
consumer = KafkaConsumer(bootstrap_servers=kafka_server,
key_deserializer=bytes.decode,
- value_deserializer=bytes.decode)
+ value_deserializer=bytes.decode,
+ group_id="pm-consumer")
consumer.subscribe(['lcm_pm', 'alarm_response'])
for message in consumer:
if message.key == 'notify_alarm':
content = json.loads(message.value)
alarm_id = content['notify_details']['alarm_uuid']
- log.info("Received alarm notification for alarm %s", alarm_id)
+ metric_name = content['notify_details']['metric_name']
+ operation = content['notify_details']['operation']
+ threshold = content['notify_details']['threshold_value']
+ vdu_name = content['notify_details']['vdu_name']
+ vnf_member_index = content['notify_details']['vnf_member_index']
+ ns_id = content['notify_details']['ns_id']
+ log.info(
+ "Received alarm notification for alarm %s, \
+ metric %s, \
+ operation %s, \
+ threshold %s, \
+ vdu_name %s, \
+ vnf_member_index %s, \
+ ns_id %s ",
+ alarm_id, metric_name, operation, threshold, vdu_name, vnf_member_index, ns_id)
try:
alarm = ScalingAlarm.select().where(ScalingAlarm.alarm_id == alarm_id).get()
lcm_client = LcmClient()
log.info("Sending scaling action message for ns: %s", alarm_id)
lcm_client.scale(alarm.scaling_record.nsr_id, alarm.scaling_record.name, alarm.action)
except ScalingAlarm.DoesNotExist:
- log.info("There is no action configured for alarm %.", alarm_id)
+ log.info("There is no action configured for alarm %s.", alarm_id)
except Exception:
log.exception("Error consuming message: ")