Adds more alarm parameters to alarm notification log
[osm/MON.git] / policy_module / osm_policy_module / core / agent.py
1 # -*- coding: utf-8 -*-
2
3 # Copyright 2018 Whitestack, LLC
4 # *************************************************************
5
6 # This file is part of OSM Monitoring module
7 # All Rights Reserved to Whitestack, LLC
8
9 # Licensed under the Apache License, Version 2.0 (the "License"); you may
10 # not use this file except in compliance with the License. You may obtain
11 # a copy of the License at
12
13 # http://www.apache.org/licenses/LICENSE-2.0
14
15 # Unless required by applicable law or agreed to in writing, software
16 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
17 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
18 # License for the specific language governing permissions and limitations
19 # under the License.
20
21 # For those usages not covered by the Apache License, Version 2.0 please
22 # contact: bdiaz@whitestack.com or glavado@whitestack.com
23 ##
24 import json
25 import logging
26 from typing import Dict, List
27
28 import peewee
29 import yaml
30
31 from kafka import KafkaConsumer
32 from osm_policy_module.core.config import Config
33 from osm_policy_module.common.lcm_client import LcmClient
34
35 from osm_policy_module.common.alarm_config import AlarmConfig
36 from osm_policy_module.common.mon_client import MonClient
37 from osm_policy_module.core.database import ScalingRecord, ScalingAlarm
38
39 log = logging.getLogger(__name__)
40
41
class PolicyModuleAgent:
    """Kafka-driven agent that wires scaling descriptors to alarms and alarms to scale actions.

    The agent consumes the ``lcm_pm`` and ``alarm_response`` topics. A message
    keyed ``configure_scaling`` is persisted as a ``ScalingRecord`` and one
    ``ScalingAlarm`` is stored per alarm created through ``MonClient``. A message
    keyed ``notify_alarm`` is looked up among the stored alarms and, when
    found, a scale request is sent through ``LcmClient``.
    """

    def run(self):
        """Connect to Kafka and dispatch incoming messages until the consumer stops.

        The Kafka endpoint is read from the ``policy_module`` section of the
        configuration (``kafka_server_host`` / ``kafka_server_port``). Any
        exception raised while handling a single message is logged with its
        traceback and the loop continues with the next message.
        """
        cfg = Config.instance()
        # Initialize servers
        kafka_server = '{}:{}'.format(cfg.get('policy_module', 'kafka_server_host'),
                                      cfg.get('policy_module', 'kafka_server_port'))

        # Initialize Kafka consumer
        log.info("Connecting to Kafka server at %s", kafka_server)
        # TODO: Add logic to handle deduplication of messages when using group_id.
        # See: https://stackoverflow.com/a/29836412
        consumer = KafkaConsumer(bootstrap_servers=kafka_server,
                                 key_deserializer=bytes.decode,
                                 value_deserializer=bytes.decode)
        consumer.subscribe(['lcm_pm', 'alarm_response'])

        for message in consumer:
            log.info("Message arrived: %s", message)
            try:
                # A message carries exactly one key, so the branches are exclusive.
                if message.key == 'configure_scaling':
                    self._handle_configure_scaling(message.value)
                elif message.key == 'notify_alarm':
                    self._handle_notify_alarm(message.value)
            except Exception:
                # Top-level boundary: one bad message must not stop the consumer loop.
                log.exception("Error consuming message: ")

    @staticmethod
    def _load_content(raw: str) -> Dict:
        """Parse a message payload as JSON, falling back to YAML.

        :param raw: decoded message value.
        :return: the parsed document (expected to be a mapping; not validated here).
        """
        try:
            return json.loads(raw)
        except ValueError:
            # json.JSONDecodeError is a ValueError subclass; anything that is
            # not valid JSON is given a second chance as YAML.
            return yaml.safe_load(raw)

    def _handle_configure_scaling(self, raw_value: str) -> None:
        """Persist a scaling descriptor and create one alarm per derived alarm config.

        :param raw_value: decoded ``configure_scaling`` message value (JSON or YAML).
        :raises KeyError: if the payload lacks ``ns_id`` or the expected
            ``scaling_group_descriptor`` entries.
        """
        content = self._load_content(raw_value)
        log.info("Creating scaling record in DB")
        # TODO: Use transactions: http://docs.peewee-orm.com/en/latest/peewee/transactions.html
        scaling_record = ScalingRecord.create(
            nsr_id=content['ns_id'],
            name=content['scaling_group_descriptor']['name'],
            content=json.dumps(content)
        )
        log.info("Created scaling record in DB : nsr_id=%s, name=%s, content=%s",
                 scaling_record.nsr_id,
                 scaling_record.name,
                 scaling_record.content)
        for config in self._get_alarm_configs(content):
            mon_client = MonClient()
            log.info("Creating alarm record in DB")
            alarm_uuid = mon_client.create_alarm(
                metric_name=config.metric_name,
                ns_id=scaling_record.nsr_id,
                vdu_name=config.vdu_name,
                vnf_member_index=config.vnf_member_index,
                threshold=config.threshold,
                operation=config.operation,
                statistic=config.statistic
            )
            ScalingAlarm.create(
                alarm_id=alarm_uuid,
                action=config.action,
                scaling_record=scaling_record
            )

    def _handle_notify_alarm(self, raw_value: str) -> None:
        """Log an alarm notification and trigger the scaling action stored for it.

        :param raw_value: decoded ``notify_alarm`` message value (JSON).
        :raises KeyError: if ``notify_details`` or any of the logged fields is missing.
        """
        details = json.loads(raw_value)['notify_details']
        alarm_id = details['alarm_uuid']
        metric_name = details['metric_name']
        operation = details['operation']
        threshold = details['threshold_value']
        vdu_name = details['vdu_name']
        vnf_member_index = details['vnf_member_index']
        ns_id = details['ns_id']
        # Adjacent literals are concatenated at compile time, so the emitted
        # message contains no source indentation.
        log.info(
            "Received alarm notification for alarm %s, "
            "metric %s, "
            "operation %s, "
            "threshold %s, "
            "vdu_name %s, "
            "vnf_member_index %s, "
            "ns_id %s",
            alarm_id, metric_name, operation, threshold, vdu_name, vnf_member_index, ns_id)
        try:
            alarm = ScalingAlarm.select().where(ScalingAlarm.alarm_id == alarm_id).get()
        except ScalingAlarm.DoesNotExist:
            log.info("There is no action configured for alarm %s.", alarm_id)
            return
        lcm_client = LcmClient()
        scaling_record = alarm.scaling_record
        log.info("Sending scaling action message for ns: %s", scaling_record.nsr_id)
        lcm_client.scale(scaling_record.nsr_id, scaling_record.name, alarm.action)

    def _get_alarm_configs(self, message_content: Dict) -> "List[AlarmConfig]":
        """Build the scale-in / scale-out alarm configs described by a scaling descriptor.

        For every entry under
        ``scaling_group_descriptor.scaling_policy.scaling_criteria`` two
        ``AlarmConfig`` objects are produced (scale-in first, then scale-out).
        Criteria whose monitoring param contains ``vnf_metric`` or
        ``vdu_metric`` are skipped because they are not supported yet.

        :param message_content: parsed ``configure_scaling`` payload.
        :return: list of alarm configs, empty when there are no usable criteria.
        :raises KeyError: if a criteria entry lacks one of the expected keys.
        """
        scaling_criterias = message_content['scaling_group_descriptor']['scaling_policy']['scaling_criteria']
        alarm_configs = []
        for criteria in scaling_criterias:
            scale_out_threshold = criteria['scale_out_threshold']
            scale_in_threshold = criteria['scale_in_threshold']
            scale_out_operation = criteria['scale_out_relational_operation']
            scale_in_operation = criteria['scale_in_relational_operation']
            monitoring_param = criteria['monitoring_param']
            statistic = monitoring_param['aggregation_type']
            # Defaults used when the criteria carries no vdu_monitoring_param.
            metric_name = ''
            vdu_name = ''
            vnf_member_index = ''
            if 'vdu_monitoring_param' in monitoring_param:
                vdu_param = monitoring_param['vdu_monitoring_param']
                vdu_name = vdu_param['vdu_name']
                vnf_member_index = vdu_param['vnf_member_index']
                metric_name = vdu_param['name']
            if 'vnf_metric' in monitoring_param:
                # TODO vnf_metric
                continue
            if 'vdu_metric' in monitoring_param:
                # TODO vdu_metric
                continue
            scale_out_alarm_config = AlarmConfig(metric_name,
                                                 vdu_name,
                                                 vnf_member_index,
                                                 scale_out_threshold,
                                                 scale_out_operation,
                                                 statistic,
                                                 'scale_out')
            scale_in_alarm_config = AlarmConfig(metric_name,
                                                vdu_name,
                                                vnf_member_index,
                                                scale_in_threshold,
                                                scale_in_operation,
                                                statistic,
                                                'scale_in')
            alarm_configs.append(scale_in_alarm_config)
            alarm_configs.append(scale_out_alarm_config)
        return alarm_configs