Policy Module first commit
[osm/MON.git] / policy_module / osm_policy_module / core / agent.py
1 import json
2 import logging
3
4 from kafka import KafkaConsumer
5 from osm_policy_module.core.config import Config
6 from osm_policy_module.common.lcm_client import LcmClient
7
8 from osm_policy_module.common.alarm_config import AlarmConfig
9 from osm_policy_module.common.mon_client import MonClient
10 from osm_policy_module.core.database import ScalingRecord, ScalingAlarm
11
12 log = logging.getLogger(__name__)
13
14
class PolicyModuleAgent:
    """Kafka-driven agent that registers scaling alarms and requests scaling.

    The agent listens on the 'lcm_pm' and 'alarm_response' topics and reacts
    to two message keys:

    * 'configure_scaling': store a ScalingRecord, create one alarm through
      MonClient per alarm configuration and store a ScalingAlarm for each.
    * 'notify_alarm': look up the ScalingAlarm for the notified alarm uuid
      and ask LcmClient to scale the associated scaling record.

    Messages with any other key are logged and otherwise ignored.
    """

    def run(self):
        """Connect to Kafka and process messages until the consumer stops.

        The Kafka address is read from the 'policy_module' section of Config
        ('kafka_server_host' / 'kafka_server_port'). Any exception raised
        while handling a single message is logged and the loop continues with
        the next message.
        """
        cfg = Config.instance()
        # Initialize servers
        kafka_server = '{}:{}'.format(cfg.get('policy_module', 'kafka_server_host'),
                                      cfg.get('policy_module', 'kafka_server_port'))

        # Initialize Kafka consumer
        log.info("Connecting to Kafka server at %s", kafka_server)
        consumer = KafkaConsumer(bootstrap_servers=kafka_server,
                                 key_deserializer=self._decode_optional,
                                 value_deserializer=self._decode_optional,
                                 group_id="policy-module-agent")
        consumer.subscribe(['lcm_pm', 'alarm_response'])

        for message in consumer:
            log.info("Message arrived: %s", message)
            log.info("Message key: %s", message.key)
            try:
                # The two keys are mutually exclusive, hence if/elif.
                if message.key == 'configure_scaling':
                    self._handle_configure_scaling(json.loads(message.value))
                elif message.key == 'notify_alarm':
                    self._handle_notify_alarm(json.loads(message.value))
            except Exception:
                # Top-level boundary: a single bad message must not stop the
                # consumer loop, so log the traceback and carry on.
                log.exception("Error consuming message: ")

    @staticmethod
    def _decode_optional(raw):
        """Decode bytes to str like bytes.decode, but pass None through.

        bytes.decode(None) raises TypeError; a record without a key (or with
        a null value) would otherwise fail while being deserialized, i.e.
        outside the per-message try/except in run().
        """
        return None if raw is None else raw.decode()

    def _handle_configure_scaling(self, content):
        """Store the scaling record and create its alarms.

        :param content: decoded 'configure_scaling' payload; must contain
            'ns_id' and 'scaling_group_descriptor' (with a 'name' and the
            fields read by _get_alarm_configs).
        :raises KeyError: if a required field is missing from the payload.
        """
        log.info("Creating scaling record in DB")
        # TODO: Use transactions: http://docs.peewee-orm.com/en/latest/peewee/transactions.html
        scaling_record = ScalingRecord.create(
            nsr_id=content['ns_id'],
            name=content['scaling_group_descriptor']['name'],
            content=json.dumps(content)
        )
        log.info("Created scaling record in DB : nsr_id=%s, name=%s, content=%s",
                 scaling_record.nsr_id,
                 scaling_record.name,
                 scaling_record.content)
        for config in self._get_alarm_configs(content):
            mon_client = MonClient()
            log.info("Creating alarm record in DB")
            alarm_uuid = mon_client.create_alarm(
                metric_name=config.metric_name,
                resource_uuid=config.resource_uuid,
                vim_uuid=config.vim_uuid,
                threshold=config.threshold,
                operation=config.operation,
                statistic=config.statistic
            )
            # Remember which action the alarm stands for so that a later
            # 'notify_alarm' can be mapped back to this scaling record.
            ScalingAlarm.create(
                alarm_id=alarm_uuid,
                action=config.action,
                scaling_record=scaling_record
            )

    def _handle_notify_alarm(self, content):
        """Request the scaling action associated with a notified alarm.

        :param content: decoded 'notify_alarm' payload; the alarm uuid is
            read from content['notify_details']['alarm_uuid'].
        :raises KeyError: if the alarm uuid is missing from the payload.
        """
        alarm_id = content['notify_details']['alarm_uuid']
        try:
            alarm = ScalingAlarm.select().where(ScalingAlarm.alarm_id == alarm_id).get()
        except ScalingAlarm.DoesNotExist:
            # .get() raises instead of returning None when nothing matches.
            # NOTE(review): assumes ScalingAlarm is a peewee model (as the
            # TODO link in this file suggests) -- confirm.
            log.info("No scaling alarm found for alarm_uuid=%s, ignoring notification", alarm_id)
            return
        scaling_record = alarm.scaling_record
        # Log plain fields: the model instance itself is not JSON
        # serializable, and json.dumps(alarm) used to raise before the
        # scaling request below could be sent.
        log.info("Sending scaling action message: nsr_id=%s, name=%s, action=%s",
                 scaling_record.nsr_id,
                 scaling_record.name,
                 alarm.action)
        lcm_client = LcmClient()
        lcm_client.scale(scaling_record.nsr_id, scaling_record.name, alarm.action)

    def _get_alarm_configs(self, message_content):
        """Build the alarm configurations described by a scaling request.

        Reads the criteria found under
        message_content['scaling_group_descriptor']['scaling_policy']['scaling_criteria'].
        Every criteria that is not skipped yields two AlarmConfig objects, in
        this order: the 'scale_in' one followed by the 'scale_out' one.

        Criteria whose 'monitoring_param' contains 'vnf_metric' or
        'vdu_metric' are skipped (not supported yet).

        :param message_content: decoded 'configure_scaling' payload.
        :return: list of AlarmConfig objects (empty if nothing applies).
        :raises KeyError: if a required field is missing.
        """
        scaling_criterias = message_content['scaling_group_descriptor']['scaling_policy']['scaling_criteria']
        alarm_configs = []
        for criteria in scaling_criterias:
            scale_out_threshold = criteria['scale_out_threshold']
            scale_in_threshold = criteria['scale_in_threshold']
            scale_out_operation = criteria['scale_out_relational_operation']
            scale_in_operation = criteria['scale_in_relational_operation']
            monitoring_param = criteria['monitoring_param']
            statistic = monitoring_param['aggregation_type']

            # NOTE(review): when 'vdu_monitoring_param' is absent (and the
            # criteria is not skipped below) the alarm is configured with
            # these empty identifiers -- confirm this is intended.
            metric_name = ''
            vim_uuid = ''
            resource_uuid = ''
            if 'vdu_monitoring_param' in monitoring_param:
                vdu_monitoring_param = monitoring_param['vdu_monitoring_param']
                vim_uuid = vdu_monitoring_param['vim_uuid']
                resource_uuid = vdu_monitoring_param['resource_id']
                metric_name = vdu_monitoring_param['name']
            if 'vnf_metric' in monitoring_param:
                # TODO vnf_metric
                continue
            if 'vdu_metric' in monitoring_param:
                # TODO vdu_metric
                continue

            scale_out_alarm_config = AlarmConfig(metric_name,
                                                 resource_uuid,
                                                 vim_uuid,
                                                 scale_out_threshold,
                                                 scale_out_operation,
                                                 statistic,
                                                 'scale_out')
            scale_in_alarm_config = AlarmConfig(metric_name,
                                                resource_uuid,
                                                vim_uuid,
                                                scale_in_threshold,
                                                scale_in_operation,
                                                statistic,
                                                'scale_in')
            alarm_configs.append(scale_in_alarm_config)
            alarm_configs.append(scale_out_alarm_config)
        return alarm_configs