Merge "Fixes bugs for integration with policy module"
[osm/MON.git] / policy_module / osm_policy_module / core / agent.py
1 # -*- coding: utf-8 -*-
2
3 # Copyright 2018 Whitestack, LLC
4 # *************************************************************
5
6 # This file is part of OSM Monitoring module
7 # All Rights Reserved to Whitestack, LLC
8
9 # Licensed under the Apache License, Version 2.0 (the "License"); you may
10 # not use this file except in compliance with the License. You may obtain
11 # a copy of the License at
12
13 # http://www.apache.org/licenses/LICENSE-2.0
14
15 # Unless required by applicable law or agreed to in writing, software
16 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
17 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
18 # License for the specific language governing permissions and limitations
19 # under the License.
20
21 # For those usages not covered by the Apache License, Version 2.0 please
22 # contact: bdiaz@whitestack.com or glavado@whitestack.com
23 ##
24 import json
25 import logging
26
27 from kafka import KafkaConsumer
28 from osm_policy_module.core.config import Config
29 from osm_policy_module.common.lcm_client import LcmClient
30
31 from osm_policy_module.common.alarm_config import AlarmConfig
32 from osm_policy_module.common.mon_client import MonClient
33 from osm_policy_module.core.database import ScalingRecord, ScalingAlarm
34
35 log = logging.getLogger(__name__)
36
37
class PolicyModuleAgent:
    """Consumes scaling-related messages from Kafka and reacts to them.

    Two message keys are handled:
      * ``configure_scaling`` — persist the scaling group descriptor and
        create the corresponding threshold alarms in MON.
      * ``notify_alarm`` — look up the fired alarm and ask LCM to execute
        the associated scale in/out action.
    """

    def run(self):
        """Connect to Kafka and process messages forever.

        Blocks indefinitely iterating the consumer; any error while handling
        a single message is logged and the loop continues with the next one.
        """
        cfg = Config.instance()
        # Kafka endpoint comes from the policy_module section of the config.
        kafka_server = '{}:{}'.format(cfg.get('policy_module', 'kafka_server_host'),
                                      cfg.get('policy_module', 'kafka_server_port'))

        log.info("Connecting to Kafka server at %s", kafka_server)
        # TODO: Add logic to handle deduplication of messages when using group_id.
        # See: https://stackoverflow.com/a/29836412
        consumer = KafkaConsumer(bootstrap_servers=kafka_server,
                                 key_deserializer=bytes.decode,
                                 value_deserializer=bytes.decode)
        consumer.subscribe(['lcm_pm', 'alarm_response'])

        for message in consumer:
            log.info("Message arrived: %s", message)
            try:
                # Keys are mutually exclusive, so elif avoids a second,
                # always-false comparison per configure_scaling message.
                if message.key == 'configure_scaling':
                    self._handle_configure_scaling(json.loads(message.value))
                elif message.key == 'notify_alarm':
                    self._handle_notify_alarm(json.loads(message.value))
            except Exception:
                # Best-effort loop: never let one bad message kill the agent.
                log.exception("Error consuming message: ")

    def _handle_configure_scaling(self, content):
        """Store a scaling record and create its MON alarms.

        :param content: decoded ``configure_scaling`` message payload; must
            contain ``ns_id`` and ``scaling_group_descriptor``.
        """
        log.info("Creating scaling record in DB")
        # TODO: Use transactions: http://docs.peewee-orm.com/en/latest/peewee/transactions.html
        scaling_record = ScalingRecord.create(
            nsr_id=content['ns_id'],
            name=content['scaling_group_descriptor']['name'],
            content=json.dumps(content)
        )
        log.info("Created scaling record in DB : nsr_id=%s, name=%s, content=%s",
                 scaling_record.nsr_id,
                 scaling_record.name,
                 scaling_record.content)
        # One client instance is enough for all alarms of this record;
        # the original code rebuilt it on every loop iteration.
        mon_client = MonClient()
        for config in self._get_alarm_configs(content):
            log.info("Creating alarm record in DB")
            alarm_uuid = mon_client.create_alarm(
                metric_name=config.metric_name,
                resource_uuid=config.resource_uuid,
                vim_uuid=config.vim_uuid,
                threshold=config.threshold,
                operation=config.operation,
                statistic=config.statistic
            )
            ScalingAlarm.create(
                alarm_id=alarm_uuid,
                action=config.action,
                scaling_record=scaling_record
            )

    def _handle_notify_alarm(self, content):
        """React to a fired alarm by triggering the stored scaling action.

        Unknown alarm UUIDs are ignored silently (the alarm may belong to
        another module sharing the topic).
        """
        alarm_id = content['notify_details']['alarm_uuid']
        # .first() returns None when no row matches; the original .get()
        # raised DoesNotExist here, which made the None-check below
        # unreachable and turned unknown alarms into logged exceptions.
        alarm = ScalingAlarm.select().where(ScalingAlarm.alarm_id == alarm_id).first()
        if alarm:
            lcm_client = LcmClient()
            log.info("Sending scaling action message for ns: %s", alarm_id)
            lcm_client.scale(alarm.scaling_record.nsr_id, alarm.scaling_record.name, alarm.action)

    def _get_alarm_configs(self, message_content):
        """Build the AlarmConfig pairs (scale-in + scale-out) for a descriptor.

        :param message_content: ``configure_scaling`` payload containing
            ``scaling_group_descriptor.scaling_policy.scaling_criteria``.
        :return: list of AlarmConfig, one scale_in and one scale_out entry
            per criteria that carries a ``vdu_monitoring_param``.
        """
        scaling_criterias = message_content['scaling_group_descriptor']['scaling_policy']['scaling_criteria']
        alarm_configs = []
        for criteria in scaling_criterias:
            metric_name = ''
            scale_out_threshold = criteria['scale_out_threshold']
            scale_in_threshold = criteria['scale_in_threshold']
            scale_out_operation = criteria['scale_out_relational_operation']
            scale_in_operation = criteria['scale_in_relational_operation']
            statistic = criteria['monitoring_param']['aggregation_type']
            vim_uuid = ''
            resource_uuid = ''
            if 'vdu_monitoring_param' in criteria['monitoring_param']:
                vim_uuid = criteria['monitoring_param']['vdu_monitoring_param']['vim_uuid']
                resource_uuid = criteria['monitoring_param']['vdu_monitoring_param']['resource_id']
                metric_name = criteria['monitoring_param']['vdu_monitoring_param']['name']
            if 'vnf_metric' in criteria['monitoring_param']:
                # TODO vnf_metric
                continue
            if 'vdu_metric' in criteria['monitoring_param']:
                # TODO vdu_metric
                continue
            scale_out_alarm_config = AlarmConfig(metric_name,
                                                 resource_uuid,
                                                 vim_uuid,
                                                 scale_out_threshold,
                                                 scale_out_operation,
                                                 statistic,
                                                 'scale_out')
            scale_in_alarm_config = AlarmConfig(metric_name,
                                                resource_uuid,
                                                vim_uuid,
                                                scale_in_threshold,
                                                scale_in_operation,
                                                statistic,
                                                'scale_in')
            alarm_configs.append(scale_in_alarm_config)
            alarm_configs.append(scale_out_alarm_config)
        return alarm_configs