205b98c3131566d61e43be37f51be23fd1155fa2
[osm/POL.git] / osm_policy_module / core / agent.py
1 # -*- coding: utf-8 -*-
2
3 # Copyright 2018 Whitestack, LLC
4 # *************************************************************
5
6 # This file is part of OSM Monitoring module
7 # All Rights Reserved to Whitestack, LLC
8
9 # Licensed under the Apache License, Version 2.0 (the "License"); you may
10 # not use this file except in compliance with the License. You may obtain
11 # a copy of the License at
12
13 # http://www.apache.org/licenses/LICENSE-2.0
14
15 # Unless required by applicable law or agreed to in writing, software
16 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
17 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
18 # License for the specific language governing permissions and limitations
19 # under the License.
20
21 # For those usages not covered by the Apache License, Version 2.0 please
22 # contact: bdiaz@whitestack.com or glavado@whitestack.com
23 ##
24 import asyncio
25 import datetime
26 import json
27 import logging
28 from json import JSONDecodeError
29
30 import peewee
31 import yaml
32 from aiokafka import AIOKafkaConsumer
33
34 from osm_policy_module.common.common_db_client import CommonDbClient
35 from osm_policy_module.common.lcm_client import LcmClient
36 from osm_policy_module.common.mon_client import MonClient
37 from osm_policy_module.core import database
38 from osm_policy_module.core.config import Config
39 from osm_policy_module.core.database import ScalingGroup, ScalingAlarm, ScalingPolicy, ScalingCriteria, DatabaseManager
40 from osm_policy_module.core.exceptions import VdurNotFound
41 from osm_policy_module.utils.vnfd import VnfdUtils
42
# Module-level logger, named after this module.
log = logging.getLogger(__name__)

# Kafka message keys this module acts on; _process_msg only logs and skips
# messages carrying any other key.
ALLOWED_KAFKA_KEYS = [
    'instantiated',
    'scaled',
    'terminated',
    'notify_alarm',
]
46
47
class PolicyModuleAgent:
    """Kafka-driven agent that keeps scaling policy records and MON alarms in sync.

    The agent consumes the "ns" and "alarm_response" topics and reacts to the
    message keys listed in ALLOWED_KAFKA_KEYS:

    * 'instantiated' / 'scaled': create the scaling group, policy, criteria
      and alarm records for the network service, then remove orphaned alarms.
    * 'terminated': delete every scaling record and alarm of the service.
    * 'notify_alarm': request a scaling action from LCM, honouring the
      cooldown time of the scaling policy.
    """

    # One entry per alarm created for a monitored vdur:
    # (action stored in ScalingAlarm, threshold key, relational-operation key),
    # the two keys being looked up in the scaling-criteria descriptor.
    _ALARM_DEFINITIONS = (
        ('scale_in', 'scale-in-threshold', 'scale-in-relational-operation'),
        ('scale_out', 'scale-out-threshold', 'scale-out-relational-operation'),
    )

    def __init__(self, loop=None):
        """Build the DB, MON and LCM clients.

        :param loop: event loop to use; defaults to asyncio.get_event_loop().
        """
        cfg = Config.instance()
        if not loop:
            loop = asyncio.get_event_loop()
        self.loop = loop
        self.db_client = CommonDbClient()
        self.mon_client = MonClient(loop=self.loop)
        self.lcm_client = LcmClient(loop=self.loop)
        self.kafka_server = '{}:{}'.format(cfg.OSMPOL_MESSAGE_HOST,
                                           cfg.OSMPOL_MESSAGE_PORT)
        self.database_manager = DatabaseManager()

    def run(self):
        """Run the consumer loop on self.loop until it finishes."""
        self.loop.run_until_complete(self.start())

    async def start(self):
        """Consume Kafka messages and hand each one to _process_msg.

        The consumer is always stopped on the way out, including when
        _process_msg re-raises a database error.
        """
        consumer = AIOKafkaConsumer(
            "ns",
            "alarm_response",
            loop=self.loop,
            bootstrap_servers=self.kafka_server,
            group_id="pol-consumer",
            key_deserializer=bytes.decode,
            value_deserializer=bytes.decode,
        )
        await consumer.start()
        try:
            async for msg in consumer:
                log.info("Message arrived: %s", msg)
                await self._process_msg(msg.topic, msg.key, msg.value)
        finally:
            await consumer.stop()
            log.critical("Exiting...")

    async def _process_msg(self, topic, key, msg):
        """Decode a message and dispatch it to the handler for its key.

        :param topic: Kafka topic the message arrived on (only logged).
        :param key: message key; keys outside ALLOWED_KAFKA_KEYS are skipped.
        :param msg: message payload as text, JSON or YAML encoded.
        :raises peewee.PeeweeException: database errors are logged and
            re-raised; any other error is logged and swallowed so that the
            consumer keeps running.
        """
        log.debug("_process_msg topic=%s key=%s msg=%s", topic, key, msg)
        try:
            if key not in ALLOWED_KAFKA_KEYS:
                log.debug("Key %s is not in ALLOWED_KAFKA_KEYS", key)
                return

            # Payloads are tried as JSON first, falling back to YAML.
            try:
                content = json.loads(msg)
            except JSONDecodeError:
                content = yaml.safe_load(msg)

            if key in ('instantiated', 'scaled'):
                await self._handle_instantiated_or_scaled(content)
            elif key == 'terminated':
                await self._handle_terminated(content)
            elif key == 'notify_alarm':
                await self._handle_alarm_notification(content)
        except peewee.PeeweeException:
            log.exception("Database error consuming message: ")
            raise
        except Exception:
            log.exception("Error consuming message: ")

    async def _handle_alarm_notification(self, content):
        """Trigger the scaling action configured for a notified alarm.

        Nothing is sent when the alarm is unknown or when the time elapsed
        since the last scale of its policy is below the policy cooldown time.

        :param content: decoded message; the alarm data is read from
            content['notify_details'].
        """
        log.debug("_handle_alarm_notification: %s", content)
        details = content['notify_details']
        alarm_uuid = details['alarm_uuid']
        metric_name = details['metric_name']
        operation = details['operation']
        threshold = details['threshold_value']
        vdu_name = details['vdu_name']
        vnf_member_index = details['vnf_member_index']
        nsr_id = details['ns_id']
        # Implicit concatenation keeps source indentation out of the message.
        log.info(
            "Received alarm notification for alarm %s, "
            "metric %s, "
            "operation %s, "
            "threshold %s, "
            "vdu_name %s, "
            "vnf_member_index %s, "
            "ns_id %s ",
            alarm_uuid, metric_name, operation, threshold, vdu_name, vnf_member_index, nsr_id)
        try:
            alarm = self.database_manager.get_alarm(alarm_uuid)
        except ScalingAlarm.DoesNotExist:
            log.info("There is no action configured for alarm %s.", alarm_uuid)
            return

        policy = alarm.scaling_criteria.scaling_policy
        # Take a single timestamp so the logged "now" is the one actually
        # used for the cooldown check.
        now = datetime.datetime.now()
        delta = now - policy.last_scale
        log.debug("last_scale: %s", policy.last_scale)
        log.debug("now: %s", now)
        log.debug("delta: %s", delta)
        if delta.total_seconds() < policy.cooldown_time:
            log.info("Time between last scale and now is less than cooldown time. Skipping.")
            return

        log.info("Sending scaling action message for ns: %s", nsr_id)
        await self.lcm_client.scale(nsr_id,
                                    policy.scaling_group.name,
                                    alarm.vnf_member_index,
                                    alarm.action)
        # Stamp the policy only after the scale request went through.
        policy.last_scale = datetime.datetime.now()
        policy.save()

    async def _handle_instantiated_or_scaled(self, content):
        """Configure scaling groups once an instantiate/scale operation is done.

        :param content: decoded message; content['nslcmop_id'] identifies the
            operation whose state is read through the common DB client.
        """
        log.debug("_handle_instantiated_or_scaled: %s", content)
        nslcmop_id = content['nslcmop_id']
        nslcmop = self.db_client.get_nslcmop(nslcmop_id)
        if nslcmop['operationState'] in ('COMPLETED', 'PARTIALLY_COMPLETED'):
            nsr_id = nslcmop['nsInstanceId']
            log.info("Configuring scaling groups for network service with nsr_id: %s", nsr_id)
            await self._configure_scaling_groups(nsr_id)
            log.info("Checking for orphaned alarms to be deleted for network service with nsr_id: %s", nsr_id)
            await self._delete_orphaned_alarms(nsr_id)
        else:
            log.info(
                "Network service is not in COMPLETED or PARTIALLY_COMPLETED state. "
                "Current state is %s. Skipping...",
                nslcmop['operationState'])

    async def _handle_terminated(self, content):
        """Delete scaling groups and alarms of a terminated network service.

        :param content: decoded message providing 'nsr_id' and
            'operationState'.
        """
        log.debug("_handle_terminated: %s", content)
        nsr_id = content['nsr_id']
        if content['operationState'] in ('COMPLETED', 'PARTIALLY_COMPLETED'):
            log.info("Deleting scaling groups and alarms for network service with nsr_id: %s", nsr_id)
            await self._delete_scaling_groups(nsr_id)
        else:
            log.info(
                "Network service is not in COMPLETED or PARTIALLY_COMPLETED state. "
                "Current state is %s. Skipping...",
                content['operationState'])

    async def _configure_scaling_groups(self, nsr_id: str):
        """Create the scaling records and MON alarms of a network service.

        Everything runs inside one database transaction. On any error the
        transaction is rolled back, the alarms already created in MON during
        this call are deleted on a best-effort basis, and the original error
        is re-raised.

        :param nsr_id: id of the network service record to configure.
        """
        log.debug("_configure_scaling_groups: %s", nsr_id)
        alarms_created = []
        with database.db.atomic() as tx:
            try:
                vnfrs = self.db_client.get_vnfrs(nsr_id)
                for vnfr in vnfrs:
                    log.info("Processing vnfr: %s", vnfr)
                    vnfd = self.db_client.get_vnfd(vnfr['vnfd-id'])
                    log.info("Looking for vnfd %s", vnfr['vnfd-id'])
                    if 'scaling-group-descriptor' not in vnfd:
                        continue
                    await self._configure_vnfr_scaling_groups(nsr_id, vnfr, vnfd, alarms_created)
            except Exception as e:
                log.exception("Error configuring scaling groups:")
                tx.rollback()
                if alarms_created:
                    log.info("Cleaning alarm resources in MON")
                    for alarm in alarms_created:
                        # Best effort: one failing deletion must neither leak
                        # the remaining alarms nor hide the original error.
                        try:
                            await self.mon_client.delete_alarm(nsr_id,
                                                               alarm.vnf_member_index,
                                                               alarm.vdu_name,
                                                               alarm.alarm_uuid)
                        except Exception:
                            log.exception("Error deleting alarm in MON %s", alarm.alarm_uuid)
                # Re-raise the captured exception explicitly instead of relying
                # on the implicit exception context across the awaits above.
                raise e

    async def _configure_vnfr_scaling_groups(self, nsr_id, vnfr, vnfd, alarms_created):
        """Configure every scaling group that vnfd declares for one vnfr.

        Only scaling policies whose 'scaling-type' is 'automatic' are handled.

        :param nsr_id: id of the network service record.
        :param vnfr: VNF record being processed.
        :param vnfd: VNF descriptor of vnfr; must contain
            'scaling-group-descriptor'.
        :param alarms_created: list extended in place with every ScalingAlarm
            created, so the caller can clean up MON after a failure.
        """
        scaling_groups = vnfd['scaling-group-descriptor']
        vnf_monitoring_params = vnfd['monitoring-param']
        for scaling_group in scaling_groups:
            # Same integer index for the lookup and for the record creation.
            vnf_member_index = int(vnfr['member-vnf-index-ref'])
            scaling_group_record = self._get_or_create_scaling_group(
                nsr_id, vnf_member_index, scaling_group)
            for scaling_policy in scaling_group['scaling-policy']:
                if scaling_policy['scaling-type'] != 'automatic':
                    continue
                scaling_policy_record = self._get_or_create_scaling_policy(
                    nsr_id, scaling_policy, scaling_group_record)
                for scaling_criteria in scaling_policy['scaling-criteria']:
                    scaling_criteria_record = self._get_or_create_scaling_criteria(
                        nsr_id, scaling_criteria, scaling_policy_record)
                    vnf_monitoring_param = self._find_monitoring_param(
                        vnf_monitoring_params,
                        scaling_criteria['vnf-monitoring-param-ref'])
                    vdurs = self._find_monitored_vdurs(vnfd, vnfr, vnf_monitoring_param)
                    if vdurs is None:
                        log.warning(
                            "Scaling criteria is referring to a vnf-monitoring-param that does not "
                            "contain a reference to a vdu or vnf metric.")
                        continue
                    for vdur in vdurs:
                        await self._create_alarms_for_vdur(
                            nsr_id, vnfr, vdur, scaling_policy, scaling_criteria,
                            scaling_criteria_record, vnf_monitoring_param, alarms_created)

    @staticmethod
    def _get_or_create_scaling_group(nsr_id, vnf_member_index, scaling_group):
        """Return the ScalingGroup record for a descriptor, creating it if missing."""
        try:
            scaling_group_record = ScalingGroup.select().where(
                ScalingGroup.nsr_id == nsr_id,
                ScalingGroup.vnf_member_index == vnf_member_index,
                ScalingGroup.name == scaling_group['name']
            ).get()
            log.info("Found existing scaling group record in DB...")
        except ScalingGroup.DoesNotExist:
            log.info("Creating scaling group record in DB...")
            scaling_group_record = ScalingGroup.create(
                nsr_id=nsr_id,
                vnf_member_index=vnf_member_index,
                name=scaling_group['name'],
                content=json.dumps(scaling_group)
            )
            log.info(
                "Created scaling group record in DB : nsr_id=%s, vnf_member_index=%s, name=%s",
                scaling_group_record.nsr_id,
                scaling_group_record.vnf_member_index,
                scaling_group_record.name)
        return scaling_group_record

    @staticmethod
    def _get_or_create_scaling_policy(nsr_id, scaling_policy, scaling_group_record):
        """Return the ScalingPolicy record of a group, creating it if missing."""
        try:
            scaling_policy_record = ScalingPolicy.select().join(ScalingGroup).where(
                ScalingPolicy.name == scaling_policy['name'],
                ScalingGroup.id == scaling_group_record.id
            ).get()
            log.info("Found existing scaling policy record in DB...")
        except ScalingPolicy.DoesNotExist:
            log.info("Creating scaling policy record in DB...")
            scaling_policy_record = ScalingPolicy.create(
                nsr_id=nsr_id,
                name=scaling_policy['name'],
                cooldown_time=scaling_policy['cooldown-time'],
                scaling_group=scaling_group_record
            )
            log.info("Created scaling policy record in DB : name=%s, scaling_group.name=%s",
                     scaling_policy_record.name,
                     scaling_policy_record.scaling_group.name)
        return scaling_policy_record

    @staticmethod
    def _get_or_create_scaling_criteria(nsr_id, scaling_criteria, scaling_policy_record):
        """Return the ScalingCriteria record of a policy, creating it if missing."""
        try:
            scaling_criteria_record = ScalingCriteria.select().join(ScalingPolicy).where(
                ScalingPolicy.id == scaling_policy_record.id,
                ScalingCriteria.name == scaling_criteria['name']
            ).get()
            log.info("Found existing scaling criteria record in DB...")
        except ScalingCriteria.DoesNotExist:
            log.info("Creating scaling criteria record in DB...")
            scaling_criteria_record = ScalingCriteria.create(
                nsr_id=nsr_id,
                name=scaling_criteria['name'],
                scaling_policy=scaling_policy_record
            )
            log.info(
                "Created scaling criteria record in DB : name=%s, scaling_policy.name=%s",
                scaling_criteria_record.name,
                scaling_criteria_record.scaling_policy.name)
        return scaling_criteria_record

    @staticmethod
    def _find_monitoring_param(vnf_monitoring_params, param_ref):
        """Return the first vnf monitoring param whose 'id' equals param_ref.

        :raises ValueError: when no param matches. A bare next() over the
            candidates would raise StopIteration instead, which must not
            escape from a coroutine.
        """
        for param in vnf_monitoring_params:
            if param['id'] == param_ref:
                return param
        raise ValueError(
            "Scaling criteria refers to unknown vnf-monitoring-param '{}'".format(param_ref))

    @staticmethod
    def _find_monitored_vdurs(vnfd, vnfr, vnf_monitoring_param):
        """Return the vdurs of vnfr that a vnf monitoring param applies to.

        The vdu is taken from 'vdu-monitoring-param' or 'vdu-metric' when
        present; for a 'vnf-metric' the management vdu of the vnfd is used.

        :return: list of matching vdurs (possibly empty), or None when the
            param references neither a vdu nor a vnf metric.
        """
        if 'vdu-monitoring-param' in vnf_monitoring_param:
            vdu_id = vnf_monitoring_param['vdu-monitoring-param']['vdu-ref']
        elif 'vdu-metric' in vnf_monitoring_param:
            vdu_id = vnf_monitoring_param['vdu-metric']['vdu-ref']
        elif 'vnf-metric' in vnf_monitoring_param:
            vdu_id = VnfdUtils.get_mgmt_vdu(vnfd)['id']
        else:
            return None
        return [vdur for vdur in vnfr['vdur'] if vdur['vdu-id-ref'] == vdu_id]

    async def _create_alarms_for_vdur(self, nsr_id, vnfr, vdur, scaling_policy, scaling_criteria,
                                      scaling_criteria_record, vnf_monitoring_param, alarms_created):
        """Create the scale-in and scale-out alarms of a vdur unless one exists.

        A vdur is skipped when the DB already holds an alarm for the same vdu
        name, criteria name, policy name and nsr_id. Each created
        ScalingAlarm is appended to alarms_created.
        """
        log.info("Creating alarm for vdur %s ", vdur)
        try:
            (ScalingAlarm.select()
             .join(ScalingCriteria)
             .join(ScalingPolicy)
             .join(ScalingGroup)
             .where(
                ScalingAlarm.vdu_name == vdur['name'],
                ScalingCriteria.name == scaling_criteria['name'],
                ScalingPolicy.name == scaling_policy['name'],
                ScalingGroup.nsr_id == nsr_id
            ).get())
            log.debug("vdu %s already has an alarm configured", vdur['name'])
            return
        except ScalingAlarm.DoesNotExist:
            pass

        for action, threshold_key, operation_key in self._ALARM_DEFINITIONS:
            alarm_uuid = await self.mon_client.create_alarm(
                metric_name=vnf_monitoring_param['id'],
                ns_id=nsr_id,
                vdu_name=vdur['name'],
                vnf_member_index=vnfr['member-vnf-index-ref'],
                threshold=scaling_criteria[threshold_key],
                operation=scaling_criteria[operation_key],
                statistic=vnf_monitoring_param['aggregation-type']
            )
            alarm = ScalingAlarm.create(
                alarm_uuid=alarm_uuid,
                action=action,
                vnf_member_index=int(vnfr['member-vnf-index-ref']),
                vdu_name=vdur['name'],
                scaling_criteria=scaling_criteria_record
            )
            alarms_created.append(alarm)

    async def _delete_alarm_in_mon(self, nsr_id, alarm):
        """Delete an alarm in MON, logging (not raising) a ValueError failure."""
        try:
            await self.mon_client.delete_alarm(
                nsr_id,
                alarm.vnf_member_index,
                alarm.vdu_name,
                alarm.alarm_uuid)
        except ValueError:
            log.exception("Error deleting alarm in MON %s", alarm.alarm_uuid)

    async def _delete_scaling_groups(self, nsr_id: str):
        """Delete every scaling group, policy, criteria and alarm of a service.

        Alarms are removed from MON first, then children are deleted before
        their parents. On error the transaction is rolled back and the error
        re-raised.

        :param nsr_id: id of the network service record.
        """
        with database.db.atomic() as tx:
            try:
                for scaling_group in ScalingGroup.select().where(ScalingGroup.nsr_id == nsr_id):
                    for scaling_policy in scaling_group.scaling_policies:
                        for scaling_criteria in scaling_policy.scaling_criterias:
                            for alarm in scaling_criteria.scaling_alarms:
                                # The groups were selected by nsr_id, so it is
                                # also the nsr_id of each of their alarms.
                                await self._delete_alarm_in_mon(nsr_id, alarm)
                                alarm.delete_instance()
                            scaling_criteria.delete_instance()
                        scaling_policy.delete_instance()
                    scaling_group.delete_instance()

            except Exception:
                log.exception("Error deleting scaling groups and alarms:")
                tx.rollback()
                raise

    async def _delete_orphaned_alarms(self, nsr_id):
        """Delete the alarms of a service whose vdur can no longer be found.

        An alarm is orphaned when db_client.get_vdur raises VdurNotFound for
        its member index and vdu name. On error the transaction is rolled
        back and the error re-raised.

        :param nsr_id: id of the network service record.
        """
        with database.db.atomic() as tx:
            try:
                for scaling_group in ScalingGroup.select().where(ScalingGroup.nsr_id == nsr_id):
                    for scaling_policy in scaling_group.scaling_policies:
                        for scaling_criteria in scaling_policy.scaling_criterias:
                            for alarm in scaling_criteria.scaling_alarms:
                                try:
                                    self.db_client.get_vdur(nsr_id, alarm.vnf_member_index, alarm.vdu_name)
                                except VdurNotFound:
                                    log.info("Deleting orphaned alarm %s", alarm.alarm_uuid)
                                    await self._delete_alarm_in_mon(nsr_id, alarm)
                                    alarm.delete_instance()

            except Exception:
                log.exception("Error deleting orphaned alarms:")
                tx.rollback()
                raise
398 tx.rollback()
399 raise e