Refactors alarm creation to comply with changes regarding the use of tags instead...
[osm/POL.git] / osm_policy_module / autoscaling / service.py
1 # -*- coding: utf-8 -*-
2
3 # Copyright 2018 Whitestack, LLC
4 # *************************************************************
5
6 # This file is part of OSM Monitoring module
7 # All Rights Reserved to Whitestack, LLC
8
9 # Licensed under the Apache License, Version 2.0 (the "License"); you may
10 # not use this file except in compliance with the License. You may obtain
11 # a copy of the License at
12
13 # http://www.apache.org/licenses/LICENSE-2.0
14
15 # Unless required by applicable law or agreed to in writing, software
16 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
17 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
18 # License for the specific language governing permissions and limitations
19 # under the License.
20
21 # For those usages not covered by the Apache License, Version 2.0 please
22 # contact: bdiaz@whitestack.com or glavado@whitestack.com
23 ##
24 import asyncio
25 import datetime
26 import json
27 import logging
28 from typing import List
29
30 from osm_policy_module.common.common_db_client import CommonDbClient
31 from osm_policy_module.common.lcm_client import LcmClient
32 from osm_policy_module.common.mon_client import MonClient
33 from osm_policy_module.core import database
34 from osm_policy_module.core.config import Config
35 from osm_policy_module.core.database import ScalingGroup, ScalingAlarm, ScalingPolicy, ScalingCriteria, \
36 ScalingAlarmRepository, ScalingGroupRepository, ScalingPolicyRepository, ScalingCriteriaRepository
37 from osm_policy_module.core.exceptions import VdurNotFound
38 from osm_policy_module.utils.vnfd import VnfdUtils
39
40 log = logging.getLogger(__name__)
41
42
class AutoscalingService:
    """
    Keeps autoscaling records in the policy DB in sync with alarms in MON and requests scaling from LCM.

    Scaling groups, policies, criteria and alarms are persisted through the repository classes imported
    from osm_policy_module.core.database. Alarms are created and deleted through MonClient, and scaling
    actions are requested through LcmClient.
    """

    def __init__(self, config: Config, loop=None):
        """
        :param config: Policy module configuration, handed to the common DB, MON and LCM clients
        :param loop: asyncio event loop shared with the MON and LCM clients. Defaults to
            asyncio.get_event_loop()
        """
        self.conf = config
        if not loop:
            loop = asyncio.get_event_loop()
        self.loop = loop
        self.db_client = CommonDbClient(config)
        self.mon_client = MonClient(config, loop=self.loop)
        self.lcm_client = LcmClient(config, loop=self.loop)

    async def configure_scaling_groups(self, nsr_id: str):
        """
        Configures scaling groups for a network service. Creates records in DB. Creates alarms in MON.

        All DB work happens inside one transaction. On any error the transaction is rolled back, the
        alarms this call already created in MON are deleted on a best-effort basis and the original
        error is raised again.

        :param nsr_id: Network service record id
        :return:
        """
        log.info("Configuring scaling groups for network service with nsr_id: %s",
                 nsr_id)
        # (vnf_member_index, vdu_name, alarm_uuid) of every alarm created in MON by this call.
        alarms_created = []
        database.db.connect()
        try:
            with database.db.atomic() as tx:
                try:
                    vnfrs = self.db_client.get_vnfrs(nsr_id)
                    for vnfr in vnfrs:
                        log.debug("Processing vnfr: %s", vnfr)
                        vnfd = self.db_client.get_vnfd(vnfr['vnfd-id'])
                        if 'scaling-group-descriptor' not in vnfd:
                            log.debug("No scaling group present in vnfd")
                            continue
                        await self._configure_vnfr_scaling_groups(nsr_id, vnfr, vnfd, alarms_created)
                except Exception as e:
                    log.exception("Error configuring scaling groups:")
                    tx.rollback()
                    await self._clean_mon_alarms(nsr_id, alarms_created)
                    # Explicit re-raise of the bound exception: the handler awaits before raising.
                    raise e
        finally:
            database.db.close()

    async def _configure_vnfr_scaling_groups(self, nsr_id: str, vnfr: dict, vnfd: dict, alarms_created: list):
        """
        Creates the DB records and MON alarms for every automatic scaling policy of one vnfr.

        :param nsr_id: Network service record id
        :param vnfr: VNF record being processed
        :param vnfd: VNF descriptor of the vnfr. Must contain 'scaling-group-descriptor'
        :param alarms_created: List extended in place with a (vnf_member_index, vdu_name, alarm_uuid)
            tuple for each alarm created in MON, so the caller can clean them up on failure
        :raises ValueError: If a scaling criteria references a vnf monitoring param not present in the vnfd
        """
        vnf_member_index = vnfr['member-vnf-index-ref']
        # Policies that are not 'automatic' are skipped below and never read monitoring params, so a
        # vnfd without a 'monitoring-param' key must not abort the whole configuration.
        vnf_monitoring_params = vnfd.get('monitoring-param', [])
        for scaling_group in vnfd['scaling-group-descriptor']:
            scaling_group_record = self._get_or_create_scaling_group(nsr_id,
                                                                     vnf_member_index,
                                                                     scaling_group)
            for scaling_policy in scaling_group['scaling-policy']:
                if scaling_policy['scaling-type'] != 'automatic':
                    continue
                scaling_policy_record = self._get_or_create_scaling_policy(nsr_id,
                                                                           scaling_policy,
                                                                           scaling_group_record)

                for scaling_criteria in scaling_policy['scaling-criteria']:
                    scaling_criteria_record = self._get_or_create_scaling_criteria(
                        nsr_id,
                        scaling_criteria,
                        scaling_policy_record
                    )
                    vnf_monitoring_param = self._find_vnf_monitoring_param(
                        vnf_monitoring_params,
                        scaling_criteria['vnf-monitoring-param-ref']
                    )
                    vdurs = self._get_monitored_vdurs(vnf_monitoring_param, vnfr['vdur'], vnfd)
                    for vdur in vdurs:
                        log.debug("Creating alarm for vdur %s ", vdur)
                        if self._alarm_exists(nsr_id, vdur['name'], scaling_criteria['name'],
                                              scaling_policy['name']):
                            log.debug("vdu %s already has an alarm configured", vdur['name'])
                            continue
                        metric_name = self._get_metric_name(vnf_monitoring_param, vdur, vnfd)
                        # One alarm per direction; the descriptor keys use '-' where the action uses '_'.
                        for action, key_prefix in (('scale_in', 'scale-in'), ('scale_out', 'scale-out')):
                            alarm_uuid = await self.mon_client.create_alarm(
                                metric_name=metric_name,
                                ns_id=nsr_id,
                                vdu_name=vdur['name'],
                                vnf_member_index=vnf_member_index,
                                threshold=scaling_criteria[key_prefix + '-threshold'],
                                operation=scaling_criteria[key_prefix + '-relational-operation'],
                                statistic=vnf_monitoring_param['aggregation-type']
                            )
                            # Track the alarm before touching the DB so that a failing insert does
                            # not leave an untracked alarm behind in MON.
                            alarms_created.append((vnf_member_index, vdur['name'], alarm_uuid))
                            ScalingAlarmRepository.create(
                                alarm_uuid=alarm_uuid,
                                action=action,
                                vnf_member_index=vnf_member_index,
                                vdu_name=vdur['name'],
                                scaling_criteria=scaling_criteria_record
                            )

    def _find_vnf_monitoring_param(self, vnf_monitoring_params: list, param_id: str):
        """
        Returns the vnf monitoring param whose 'id' equals param_id.

        :param vnf_monitoring_params: 'monitoring-param' entries of the vnfd
        :param param_id: Value of the scaling criteria's 'vnf-monitoring-param-ref'
        :raises ValueError: If no entry matches
        """
        for param in vnf_monitoring_params:
            if param['id'] == param_id:
                return param
        raise ValueError('No vnf monitoring param found with id %s' % param_id)

    def _alarm_exists(self, nsr_id: str, vdu_name: str, scaling_criteria_name: str, scaling_policy_name: str):
        """
        Tells whether a scaling alarm record already exists for the given vdu, criteria, policy and nsr_id.

        :return: True if ScalingAlarmRepository.get finds a record, False if it raises DoesNotExist
        """
        # NOTE(review): the lookup does not filter by vnf member index - confirm vdu names are
        # unique within a network service.
        try:
            ScalingAlarmRepository.get(ScalingAlarm.vdu_name == vdu_name,
                                       ScalingCriteria.name == scaling_criteria_name,
                                       ScalingPolicy.name == scaling_policy_name,
                                       ScalingGroup.nsr_id == nsr_id,
                                       join_classes=[ScalingCriteria,
                                                     ScalingPolicy,
                                                     ScalingGroup])
        except ScalingAlarm.DoesNotExist:
            return False
        return True

    async def _clean_mon_alarms(self, nsr_id: str, alarms_created: list):
        """
        Deletes in MON, on a best-effort basis, the alarms created by a failed configuration run.

        :param nsr_id: Network service record id the alarms were created for
        :param alarms_created: (vnf_member_index, vdu_name, alarm_uuid) tuples of the alarms to delete
        """
        if not alarms_created:
            return
        log.info("Cleaning alarm resources in MON")
        for vnf_member_index, vdu_name, alarm_uuid in alarms_created:
            try:
                await self.mon_client.delete_alarm(nsr_id, vnf_member_index, vdu_name, alarm_uuid)
            except Exception:
                # Keep going: one failed deletion must neither leak the remaining alarms nor
                # mask the error that triggered the cleanup.
                log.exception("Error deleting alarm in MON %s", alarm_uuid)

    async def _delete_scaling_alarm(self, nsr_id: str, alarm):
        """
        Deletes a scaling alarm in MON and then removes its DB record.

        A ValueError from MON is logged and the DB record is removed anyway.

        :param nsr_id: Network service record id of the scaling group the alarm belongs to
        :param alarm: Scaling alarm record to delete
        """
        try:
            # Callers list groups filtered by nsr_id, so it is passed in instead of walking
            # alarm -> criteria -> policy -> group for every alarm.
            await self.mon_client.delete_alarm(
                nsr_id,
                alarm.vnf_member_index,
                alarm.vdu_name,
                alarm.alarm_uuid)
        except ValueError:
            log.exception("Error deleting alarm in MON %s", alarm.alarm_uuid)
        alarm.delete_instance()

    async def delete_scaling_groups(self, nsr_id: str):
        """
        Deletes the scaling groups of a network service, with their policies, criteria and alarms.

        Alarms are deleted in MON as well as in DB. DB changes are rolled back if anything fails.

        :param nsr_id: Network service record id
        :return:
        """
        log.debug("Deleting scaling groups for network service %s", nsr_id)
        database.db.connect()
        try:
            with database.db.atomic() as tx:
                try:
                    for scaling_group in ScalingGroupRepository.list(ScalingGroup.nsr_id == nsr_id):
                        for scaling_policy in scaling_group.scaling_policies:
                            for scaling_criteria in scaling_policy.scaling_criterias:
                                for alarm in scaling_criteria.scaling_alarms:
                                    await self._delete_scaling_alarm(nsr_id, alarm)
                                # Children are removed before their parent at every level.
                                scaling_criteria.delete_instance()
                            scaling_policy.delete_instance()
                        scaling_group.delete_instance()

                except Exception:
                    log.exception("Error deleting scaling groups and alarms:")
                    tx.rollback()
                    raise
        finally:
            database.db.close()

    async def delete_orphaned_alarms(self, nsr_id):
        """
        Deletes the scaling alarms of a network service whose vdur no longer exists.

        An alarm is considered orphaned when db_client.get_vdur raises VdurNotFound for its
        vnf member index and vdu name. Orphaned alarms are deleted in MON and in DB.

        :param nsr_id: Network service record id
        :return:
        """
        log.info("Deleting orphaned scaling alarms for network service %s", nsr_id)
        database.db.connect()
        try:
            with database.db.atomic() as tx:
                try:
                    for scaling_group in ScalingGroupRepository.list(ScalingGroup.nsr_id == nsr_id):
                        for scaling_policy in scaling_group.scaling_policies:
                            for scaling_criteria in scaling_policy.scaling_criterias:
                                for alarm in scaling_criteria.scaling_alarms:
                                    try:
                                        self.db_client.get_vdur(nsr_id, alarm.vnf_member_index, alarm.vdu_name)
                                    except VdurNotFound:
                                        log.debug("Deleting orphaned scaling alarm %s", alarm.alarm_uuid)
                                        await self._delete_scaling_alarm(nsr_id, alarm)

                except Exception:
                    log.exception("Error deleting orphaned alarms:")
                    tx.rollback()
                    raise
        finally:
            database.db.close()

    async def handle_alarm(self, alarm_uuid: str, status: str):
        """
        Stores the new status of an alarm and then evaluates the scaling policy it belongs to.

        :param alarm_uuid: Identifier of the alarm
        :param status: New alarm status
        :return:
        """
        await self.update_alarm_status(alarm_uuid, status)
        await self.evaluate_policy(alarm_uuid)

    async def update_alarm_status(self, alarm_uuid: str, status: str):
        """
        Saves status as the last status of the scaling alarm identified by alarm_uuid.

        Only logs a debug message when no scaling alarm record exists for alarm_uuid.

        :param alarm_uuid: Identifier of the alarm
        :param status: New alarm status
        :return:
        """
        database.db.connect()
        try:
            with database.db.atomic():
                alarm = ScalingAlarmRepository.get(ScalingAlarm.alarm_uuid == alarm_uuid)
                alarm.last_status = status
                alarm.save()
        except ScalingAlarm.DoesNotExist:
            log.debug("There is no autoscaling action configured for alarm %s.", alarm_uuid)
        finally:
            database.db.close()

    async def evaluate_policy(self, alarm_uuid):
        """
        Requests a scaling action from LCM if the policy of the given alarm is satisfied.

        Nothing happens when no scaling alarm record exists for alarm_uuid or when the policy is
        disabled. The last statuses of the alarms sharing the triggering alarm's criteria, action,
        vnf member index and vdu name are combined with the policy's scale-in or scale-out operation:
        'AND' requires all of them to be 'alarm', 'OR' requires at least one. The action is only sent
        when more than cooldown_time seconds have passed since the policy's last_scale, which is then
        updated.

        :param alarm_uuid: Identifier of the alarm that changed
        :raises Exception: If the alarm record has an action other than 'scale_in' or 'scale_out'
        :return:
        """
        database.db.connect()
        try:
            with database.db.atomic():
                alarm = ScalingAlarmRepository.get(ScalingAlarm.alarm_uuid == alarm_uuid)
                vnf_member_index = alarm.vnf_member_index
                action = alarm.action
                scaling_policy = alarm.scaling_criteria.scaling_policy
                if not scaling_policy.enabled:
                    return
                if action == 'scale_in':
                    operation = scaling_policy.scale_in_operation
                elif action == 'scale_out':
                    operation = scaling_policy.scale_out_operation
                else:
                    raise Exception('Unknown alarm action {}'.format(alarm.action))
                alarms = ScalingAlarmRepository.list(ScalingAlarm.scaling_criteria == alarm.scaling_criteria,
                                                     ScalingAlarm.action == alarm.action,
                                                     ScalingAlarm.vnf_member_index == vnf_member_index,
                                                     ScalingAlarm.vdu_name == alarm.vdu_name)
                # A distinct loop name keeps the triggering alarm bound to 'alarm'.
                statuses = [related_alarm.last_status for related_alarm in alarms]
                if (operation == 'AND' and set(statuses) == {'alarm'}) or (operation == 'OR' and 'alarm' in statuses):
                    delta = datetime.datetime.now() - scaling_policy.last_scale
                    if delta.total_seconds() > scaling_policy.cooldown_time:
                        log.info("Sending %s action message for ns: %s",
                                 action,
                                 scaling_policy.scaling_group.nsr_id)
                        await self.lcm_client.scale(scaling_policy.scaling_group.nsr_id,
                                                    scaling_policy.scaling_group.name,
                                                    vnf_member_index,
                                                    action)
                        scaling_policy.last_scale = datetime.datetime.now()
                        scaling_policy.save()

        except ScalingAlarm.DoesNotExist:
            log.debug("There is no autoscaling action configured for alarm %s.", alarm_uuid)
        finally:
            database.db.close()

    def _get_or_create_scaling_group(self, nsr_id: str, vnf_member_index: str, scaling_group: dict):
        """
        Returns the scaling group record matching nsr_id, vnf_member_index and name, creating it if missing.

        :param nsr_id: Network service record id
        :param vnf_member_index: Member index of the VNF the group belongs to
        :param scaling_group: Scaling group descriptor; stored as JSON in the record's content
        :return: The existing or newly created scaling group record
        """
        try:
            scaling_group_record = ScalingGroupRepository.get(
                ScalingGroup.nsr_id == nsr_id,
                ScalingGroup.vnf_member_index == vnf_member_index,
                ScalingGroup.name == scaling_group['name']
            )
            log.debug("Found existing scaling group record in DB...")
        except ScalingGroup.DoesNotExist:
            log.debug("Creating scaling group record in DB...")
            scaling_group_record = ScalingGroupRepository.create(
                nsr_id=nsr_id,
                vnf_member_index=vnf_member_index,
                name=scaling_group['name'],
                content=json.dumps(scaling_group)
            )
            log.debug(
                "Created scaling group record in DB : nsr_id=%s, vnf_member_index=%s, name=%s",
                scaling_group_record.nsr_id,
                scaling_group_record.vnf_member_index,
                scaling_group_record.name)
        return scaling_group_record

    def _get_or_create_scaling_policy(self, nsr_id: str, scaling_policy: dict, scaling_group_record: ScalingGroup):
        """
        Returns the scaling policy record with the given name in the given group, creating it if missing.

        On creation the optional descriptor keys 'scale-in-operation-type', 'scale-out-operation-type'
        and 'enabled' are copied to the record when present.

        :param nsr_id: Network service record id
        :param scaling_policy: Scaling policy descriptor
        :param scaling_group_record: Scaling group record owning the policy
        :return: The existing or newly created scaling policy record
        """
        try:
            scaling_policy_record = ScalingPolicyRepository.get(
                ScalingPolicy.name == scaling_policy['name'],
                ScalingGroup.id == scaling_group_record.id,
                join_classes=[ScalingGroup]
            )
            log.debug("Found existing scaling policy record in DB...")
        except ScalingPolicy.DoesNotExist:
            log.debug("Creating scaling policy record in DB...")
            scaling_policy_record = ScalingPolicyRepository.create(
                nsr_id=nsr_id,
                name=scaling_policy['name'],
                cooldown_time=scaling_policy['cooldown-time'],
                scaling_group=scaling_group_record,
            )
            if 'scale-in-operation-type' in scaling_policy:
                scaling_policy_record.scale_in_operation = scaling_policy[
                    'scale-in-operation-type']
            if 'scale-out-operation-type' in scaling_policy:
                scaling_policy_record.scale_out_operation = scaling_policy[
                    'scale-out-operation-type']
            if 'enabled' in scaling_policy:
                scaling_policy_record.enabled = scaling_policy['enabled']
            scaling_policy_record.save()
            log.debug("Created scaling policy record in DB : name=%s, scaling_group.name=%s",
                      scaling_policy_record.name,
                      scaling_policy_record.scaling_group.name)
        return scaling_policy_record

    def _get_or_create_scaling_criteria(self, nsr_id: str, scaling_criteria: dict,
                                        scaling_policy_record: ScalingPolicy):
        """
        Returns the scaling criteria record with the given name in the given policy, creating it if missing.

        :param nsr_id: Network service record id
        :param scaling_criteria: Scaling criteria descriptor
        :param scaling_policy_record: Scaling policy record owning the criteria
        :return: The existing or newly created scaling criteria record
        """
        try:
            scaling_criteria_record = ScalingCriteriaRepository.get(
                ScalingPolicy.id == scaling_policy_record.id,
                ScalingCriteria.name == scaling_criteria['name'],
                join_classes=[ScalingPolicy]
            )
            log.debug("Found existing scaling criteria record in DB...")
        except ScalingCriteria.DoesNotExist:
            log.debug("Creating scaling criteria record in DB...")
            scaling_criteria_record = ScalingCriteriaRepository.create(
                nsr_id=nsr_id,
                name=scaling_criteria['name'],
                scaling_policy=scaling_policy_record
            )
            log.debug(
                "Created scaling criteria record in DB : name=%s, scaling_policy.name=%s",
                scaling_criteria_record.name,
                scaling_criteria_record.scaling_policy.name)
        return scaling_criteria_record

    def _get_monitored_vdurs(self, vnf_monitoring_param: dict, vdurs: List[dict], vnfd: dict):
        """
        Returns the vdurs a vnf monitoring param refers to.

        The vdu is taken from 'vdu-monitoring-param' or 'vdu-metric' ('vdu-ref') when present; for a
        'vnf-metric' it is the vdu returned by VnfdUtils.get_mgmt_vdu. When the param has none of
        these keys a warning is logged and an empty list is returned.

        :param vnf_monitoring_param: Monitoring param entry of the vnfd
        :param vdurs: vdur entries of the vnfr
        :param vnfd: VNF descriptor
        :return: List of the vdurs whose 'vdu-id-ref' matches the monitored vdu
        """
        if 'vdu-monitoring-param' in vnf_monitoring_param:
            monitored_vdu_id = vnf_monitoring_param['vdu-monitoring-param']['vdu-ref']
        elif 'vdu-metric' in vnf_monitoring_param:
            monitored_vdu_id = vnf_monitoring_param['vdu-metric']['vdu-ref']
        elif 'vnf-metric' in vnf_monitoring_param:
            monitored_vdu_id = VnfdUtils.get_mgmt_vdu(vnfd)['id']
        else:
            log.warning(
                "Scaling criteria is referring to a vnf-monitoring-param that does not "
                "contain a reference to a vdu or vnf metric.")
            return []
        return [vdur for vdur in vdurs if vdur['vdu-id-ref'] == monitored_vdu_id]

    def _get_metric_name(self, vnf_monitoring_param: dict, vdur: dict, vnfd: dict):
        """
        Returns the name of the metric a vnf monitoring param refers to.

        :param vnf_monitoring_param: Monitoring param entry of the vnfd
        :param vdur: vdur the alarm is being created for; its 'vdu-id-ref' must match a vdu of the vnfd
        :param vnfd: VNF descriptor
        :return: 'nfvi-metric' of the referenced vdu monitoring param, or the 'vdu-metric-name-ref' /
            'vnf-metric-name-ref' of the param
        :raises ValueError: If the vdu or the vdu monitoring param cannot be found, or the param
            contains no metric reference
        """
        # ValueError instead of a bare StopIteration: this runs inside a coroutine, where an
        # escaping StopIteration ends up as an uninformative RuntimeError.
        vdu = next((vdu for vdu in vnfd['vdu'] if vdu['id'] == vdur['vdu-id-ref']), None)
        if vdu is None:
            raise ValueError('No vdu found in vnfd with id %s' % vdur['vdu-id-ref'])
        if 'vdu-monitoring-param' in vnf_monitoring_param:
            vdu_monitoring_param_ref = vnf_monitoring_param['vdu-monitoring-param']['vdu-monitoring-param-ref']
            vdu_monitoring_param = next(
                (param for param in vdu.get('monitoring-param', []) if param['id'] == vdu_monitoring_param_ref),
                None)
            if vdu_monitoring_param is None:
                raise ValueError('No monitoring param %s found in vdu %s' % (vdu_monitoring_param_ref, vdu['id']))
            return vdu_monitoring_param['nfvi-metric']
        if 'vdu-metric' in vnf_monitoring_param:
            return vnf_monitoring_param['vdu-metric']['vdu-metric-name-ref']
        if 'vnf-metric' in vnf_monitoring_param:
            return vnf_monitoring_param['vnf-metric']['vnf-metric-name-ref']
        raise ValueError('No metric name found for vnf_monitoring_param %s' % vnf_monitoring_param['id'])