# -*- coding: utf-8 -*-

# Copyright 2018 Whitestack, LLC
# *************************************************************

# This file is part of OSM Monitoring module
# All Rights Reserved to Whitestack, LLC

# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at

#         http://www.apache.org/licenses/LICENSE-2.0

# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.

# For those usages not covered by the Apache License, Version 2.0 please
# contact: bdiaz@whitestack.com or glavado@whitestack.com
import json
import logging
from typing import Dict, List

import yaml
from kafka import KafkaConsumer

from osm_policy_module.common.alarm_config import AlarmConfig
from osm_policy_module.common.lcm_client import LcmClient
from osm_policy_module.common.mon_client import MonClient
from osm_policy_module.core.config import Config
from osm_policy_module.core.database import ScalingRecord, ScalingAlarm
# Module-level logger, named after this module, used by PolicyModuleAgent.
log = logging.getLogger(__name__)
class PolicyModuleAgent:
    """Agent that reacts to scaling-related Kafka messages.

    It subscribes to the 'lcm_pm' and 'alarm_response' topics and handles two
    message keys:

    * 'configure_scaling': stores a ScalingRecord and creates one alarm (plus
      a ScalingAlarm row) per alarm configuration derived from the payload.
    * 'notify_alarm': looks up the ScalingAlarm for the notified alarm and
      asks the LCM client to run the associated scaling action.
    """

    # NOTE(review): the method header was missing from the extracted source;
    # the name `run` is reconstructed -- confirm against callers.
    def run(self):
        """Connect to Kafka and process incoming messages until interrupted.

        Any exception raised while handling a single message is logged with
        its traceback and the loop moves on to the next message.
        """
        cfg = Config.instance()
        kafka_server = '{}:{}'.format(cfg.get('policy_module', 'kafka_server_host'),
                                      cfg.get('policy_module', 'kafka_server_port'))

        # Initialize Kafka consumer
        log.info("Connecting to Kafka server at %s", kafka_server)
        # TODO: Add logic to handle deduplication of messages when using group_id.
        # See: https://stackoverflow.com/a/29836412
        consumer = KafkaConsumer(bootstrap_servers=kafka_server,
                                 key_deserializer=bytes.decode,
                                 value_deserializer=bytes.decode)
        consumer.subscribe(['lcm_pm', 'alarm_response'])

        for message in consumer:
            log.info("Message arrived: %s", message)
            try:
                # The two keys are mutually exclusive, hence if/elif.
                if message.key == 'configure_scaling':
                    self._configure_scaling(message.value)
                elif message.key == 'notify_alarm':
                    self._notify_alarm(message.value)
            except Exception:
                # Top-level boundary: one bad message must not stop the agent.
                log.exception("Error consuming message: ")

    def _configure_scaling(self, payload):
        """Store a scaling record for *payload* and create its alarms.

        :param payload: message value as text, JSON or (as a fallback) YAML.
        """
        try:
            content = json.loads(payload)
        except json.JSONDecodeError:
            # Not valid JSON: fall back to YAML parsing.
            content = yaml.safe_load(payload)

        log.info("Creating scaling record in DB")
        # TODO: Use transactions: http://docs.peewee-orm.com/en/latest/peewee/transactions.html
        scaling_record = ScalingRecord.create(
            nsr_id=content['ns_id'],
            name=content['scaling_group_descriptor']['name'],
            content=json.dumps(content)
        )
        log.info("Created scaling record in DB : nsr_id=%s, name=%s, content=%s",
                 scaling_record.nsr_id,
                 scaling_record.name,
                 scaling_record.content)

        for config in self._get_alarm_configs(content):
            mon_client = MonClient()
            log.info("Creating alarm record in DB")
            alarm_uuid = mon_client.create_alarm(
                metric_name=config.metric_name,
                ns_id=scaling_record.nsr_id,
                vdu_name=config.vdu_name,
                vnf_member_index=config.vnf_member_index,
                threshold=config.threshold,
                operation=config.operation,
                statistic=config.statistic
            )
            # NOTE(review): the opening of this call was missing from the
            # extracted source; `alarm_id` and `action` are reconstructed from
            # how ScalingAlarm is read back in _notify_alarm -- confirm.
            ScalingAlarm.create(
                alarm_id=alarm_uuid,
                action=config.action,
                scaling_record=scaling_record
            )

    def _notify_alarm(self, payload):
        """Trigger the scaling action configured for a notified alarm.

        :param payload: JSON text containing a 'notify_details' object.
        """
        details = json.loads(payload)['notify_details']
        alarm_id = details['alarm_uuid']
        # Implicit literal concatenation keeps source indentation out of the
        # emitted message (backslash continuations inside a literal do not).
        log.info(
            "Received alarm notification for alarm %s, "
            "metric %s, "
            "operation %s, "
            "threshold %s, "
            "vdu_name %s, "
            "vnf_member_index %s, "
            "ns_id %s",
            alarm_id,
            details['metric_name'],
            details['operation'],
            details['threshold_value'],
            details['vdu_name'],
            details['vnf_member_index'],
            details['ns_id'])

        try:
            alarm = ScalingAlarm.select().where(ScalingAlarm.alarm_id == alarm_id).get()
        except ScalingAlarm.DoesNotExist:
            log.info("There is no action configured for alarm %s.", alarm_id)
            return

        lcm_client = LcmClient()
        log.info("Sending scaling action message for ns: %s", alarm.scaling_record.nsr_id)
        lcm_client.scale(alarm.scaling_record.nsr_id, alarm.scaling_record.name, alarm.action)

    def _get_alarm_configs(self, message_content: Dict) -> List[AlarmConfig]:
        """Build the alarm configurations described by a scaling message.

        For every scaling criteria that is handled, a scale-in and a scale-out
        AlarmConfig are appended (in that order).

        :param message_content: parsed 'configure_scaling' payload; must hold
            ['scaling_group_descriptor']['scaling_policy']['scaling_criteria'].
        :return: list of AlarmConfig objects, possibly empty.
        :raises KeyError: if a required key is missing from the payload.
        """
        scaling_criterias = message_content['scaling_group_descriptor']['scaling_policy']['scaling_criteria']
        alarm_configs = []
        for criteria in scaling_criterias:
            scale_out_threshold = criteria['scale_out_threshold']
            scale_in_threshold = criteria['scale_in_threshold']
            scale_out_operation = criteria['scale_out_relational_operation']
            scale_in_operation = criteria['scale_in_relational_operation']
            monitoring_param = criteria['monitoring_param']
            statistic = monitoring_param['aggregation_type']

            # Defaults so the names are always bound, even when the criteria
            # carries no 'vdu_monitoring_param' section.
            metric_name = ''
            vdu_name = ''
            vnf_member_index = ''
            if 'vdu_monitoring_param' in monitoring_param:
                vdu_param = monitoring_param['vdu_monitoring_param']
                vdu_name = vdu_param['vdu_name']
                vnf_member_index = vdu_param['vnf_member_index']
                metric_name = vdu_param['name']

            # NOTE(review): the bodies of the next two branches were missing
            # from the extracted source; skipping the criteria is assumed.
            if 'vnf_metric' in monitoring_param:
                # TODO: vnf_metric
                continue
            if 'vdu_metric' in monitoring_param:
                # TODO: vdu_metric
                continue

            # NOTE(review): only the first positional argument was visible in
            # the extracted source; the remaining arguments and their order
            # are assumed -- confirm against AlarmConfig.__init__.
            scale_out_alarm_config = AlarmConfig(metric_name,
                                                 vdu_name,
                                                 vnf_member_index,
                                                 scale_out_threshold,
                                                 scale_out_operation,
                                                 statistic,
                                                 'scale_out')
            scale_in_alarm_config = AlarmConfig(metric_name,
                                                vdu_name,
                                                vnf_member_index,
                                                scale_in_threshold,
                                                scale_in_operation,
                                                statistic,
                                                'scale_in')
            alarm_configs.append(scale_in_alarm_config)
            alarm_configs.append(scale_out_alarm_config)
        return alarm_configs