d14904def19f94985247e744c0ed21947a819b93
[osm/POL.git] / osm_policy_module / autoscaling / service.py
1 # -*- coding: utf-8 -*-
2
3 # Copyright 2018 Whitestack, LLC
4 # *************************************************************
5
6 # This file is part of OSM Monitoring module
7 # All Rights Reserved to Whitestack, LLC
8
9 # Licensed under the Apache License, Version 2.0 (the "License"); you may
10 # not use this file except in compliance with the License. You may obtain
11 # a copy of the License at
12
13 # http://www.apache.org/licenses/LICENSE-2.0
14
15 # Unless required by applicable law or agreed to in writing, software
16 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
17 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
18 # License for the specific language governing permissions and limitations
19 # under the License.
20
21 # For those usages not covered by the Apache License, Version 2.0 please
22 # contact: bdiaz@whitestack.com or glavado@whitestack.com
23 ##
24 import asyncio
25 import datetime
26 import json
27 import logging
28
29 from osm_policy_module.common.common_db_client import CommonDbClient
30 from osm_policy_module.common.lcm_client import LcmClient
31 from osm_policy_module.common.mon_client import MonClient
32 from osm_policy_module.core import database
33 from osm_policy_module.core.config import Config
34 from osm_policy_module.core.database import ScalingGroup, ScalingAlarm, ScalingPolicy, ScalingCriteria, \
35 ScalingAlarmRepository, ScalingGroupRepository, ScalingPolicyRepository, ScalingCriteriaRepository
36 from osm_policy_module.core.exceptions import VdurNotFound
37 from osm_policy_module.utils.vnfd import VnfdUtils
38
39 log = logging.getLogger(__name__)
40
41
42 class AutoscalingService:
43
def __init__(self, config: Config, loop=None):
    """Keep the configuration and build the DB, MON and LCM clients.

    :param config: module configuration, handed unchanged to each client.
    :param loop: event loop shared with the MON and LCM clients. When it is
        falsy, the loop returned by ``asyncio.get_event_loop()`` is used.
    """
    self.conf = config
    # Fall back to the current event loop when the caller gave none.
    self.loop = loop or asyncio.get_event_loop()
    self.db_client = CommonDbClient(config)
    self.mon_client = MonClient(config, loop=self.loop)
    self.lcm_client = LcmClient(config, loop=self.loop)
52
async def configure_scaling_groups(self, nsr_id: str):
    """Create the scaling records and MON alarms of a network service.

    For every VNF record of the network service whose descriptor contains a
    ``scaling-group-descriptor``, the scaling group, its ``automatic``
    scaling policies and their criteria are looked up in the policy
    database and created when missing. For each matching VDU record that
    has no alarm yet, a ``scale_in`` and a ``scale_out`` alarm are created
    in MON and stored in the database.

    All database work runs inside a single transaction. On any error the
    transaction is rolled back, the alarms created in MON during this call
    are removed on a best-effort basis, and the original error is re-raised.

    :param nsr_id: id of the network service record to configure.
    :raises Exception: whatever the descriptor lookups, the database or the
        MON client raised.
    """
    log.info("Configuring scaling groups for network service with nsr_id: %s",
             nsr_id)
    alarms_created = []
    database.db.connect()
    try:
        with database.db.atomic() as tx:
            try:
                for vnfr in self.db_client.get_vnfrs(nsr_id):
                    log.debug("Processing vnfr: %s", vnfr)
                    vnfd = self.db_client.get_vnfd(vnfr['vnfd-id'])
                    if 'scaling-group-descriptor' not in vnfd:
                        log.debug("No scaling group present in vnfd")
                        continue
                    scaling_groups = vnfd['scaling-group-descriptor']
                    vnf_monitoring_params = vnfd['monitoring-param']
                    for scaling_group in scaling_groups:
                        await self._configure_scaling_group(
                            nsr_id, vnfr, vnfd, vnf_monitoring_params,
                            scaling_group, alarms_created)
            except Exception as e:
                log.exception("Error configuring scaling groups:")
                tx.rollback()
                await self._delete_created_alarms(nsr_id, alarms_created)
                # Re-raise by name: the handler awaits (and handles other
                # errors) between catching and re-raising.
                raise e
    finally:
        database.db.close()

async def _delete_created_alarms(self, nsr_id, alarms):
    """Best-effort removal from MON of the alarms created by a failed run."""
    if not alarms:
        return
    log.info("Cleaning alarm resources in MON")
    for alarm in alarms:
        try:
            # nsr_id is passed directly instead of walking the ORM relations
            # of records whose transaction has just been rolled back.
            await self.mon_client.delete_alarm(
                nsr_id,
                alarm.vnf_member_index,
                alarm.vdu_name,
                alarm.alarm_uuid)
        except Exception:
            # One failed deletion must neither stop the cleanup of the
            # remaining alarms nor replace the error being reported.
            log.exception("Error deleting alarm in MON %s", alarm.alarm_uuid)

async def _configure_scaling_group(self, nsr_id, vnfr, vnfd, vnf_monitoring_params,
                                   scaling_group, alarms_created):
    """Ensure the DB records and alarms of one scaling group descriptor exist."""
    try:
        scaling_group_record = ScalingGroupRepository.get(
            ScalingGroup.nsr_id == nsr_id,
            ScalingGroup.vnf_member_index == vnfr['member-vnf-index-ref'],
            ScalingGroup.name == scaling_group['name']
        )
        log.debug("Found existing scaling group record in DB...")
    except ScalingGroup.DoesNotExist:
        log.debug("Creating scaling group record in DB...")
        scaling_group_record = ScalingGroupRepository.create(
            nsr_id=nsr_id,
            vnf_member_index=vnfr['member-vnf-index-ref'],
            name=scaling_group['name'],
            content=json.dumps(scaling_group)
        )
        log.debug(
            "Created scaling group record in DB : nsr_id=%s, vnf_member_index=%s, name=%s",
            scaling_group_record.nsr_id,
            scaling_group_record.vnf_member_index,
            scaling_group_record.name)
    for scaling_policy in scaling_group['scaling-policy']:
        # Only automatic policies are driven by alarms.
        if scaling_policy['scaling-type'] != 'automatic':
            continue
        scaling_policy_record = self._get_or_create_scaling_policy(
            nsr_id, scaling_group_record, scaling_policy)
        for scaling_criteria in scaling_policy['scaling-criteria']:
            await self._configure_scaling_criteria(
                nsr_id, vnfr, vnfd, vnf_monitoring_params, scaling_policy,
                scaling_policy_record, scaling_criteria, alarms_created)

def _get_or_create_scaling_policy(self, nsr_id, scaling_group_record, scaling_policy):
    """Return the DB record of a scaling policy, creating it when missing."""
    try:
        scaling_policy_record = ScalingPolicyRepository.get(
            ScalingPolicy.name == scaling_policy['name'],
            ScalingGroup.id == scaling_group_record.id,
            join_classes=[ScalingGroup]
        )
        log.debug("Found existing scaling policy record in DB...")
    except ScalingPolicy.DoesNotExist:
        log.debug("Creating scaling policy record in DB...")
        scaling_policy_record = ScalingPolicyRepository.create(
            nsr_id=nsr_id,
            name=scaling_policy['name'],
            cooldown_time=scaling_policy['cooldown-time'],
            scaling_group=scaling_group_record,
        )
        # Optional descriptor fields are only copied when present.
        if 'scale-in-operation-type' in scaling_policy:
            scaling_policy_record.scale_in_operation = scaling_policy['scale-in-operation-type']
        if 'scale-out-operation-type' in scaling_policy:
            scaling_policy_record.scale_out_operation = scaling_policy['scale-out-operation-type']
        if 'enabled' in scaling_policy:
            scaling_policy_record.enabled = scaling_policy['enabled']
        scaling_policy_record.save()
        log.debug("Created scaling policy record in DB : name=%s, scaling_group.name=%s",
                  scaling_policy_record.name,
                  scaling_policy_record.scaling_group.name)
    return scaling_policy_record

async def _configure_scaling_criteria(self, nsr_id, vnfr, vnfd, vnf_monitoring_params,
                                      scaling_policy, scaling_policy_record,
                                      scaling_criteria, alarms_created):
    """Ensure the DB record of one scaling criteria and its per-VDU alarms exist."""
    try:
        scaling_criteria_record = ScalingCriteriaRepository.get(
            ScalingPolicy.id == scaling_policy_record.id,
            ScalingCriteria.name == scaling_criteria['name'],
            join_classes=[ScalingPolicy]
        )
        log.debug("Found existing scaling criteria record in DB...")
    except ScalingCriteria.DoesNotExist:
        log.debug("Creating scaling criteria record in DB...")
        scaling_criteria_record = ScalingCriteriaRepository.create(
            nsr_id=nsr_id,
            name=scaling_criteria['name'],
            scaling_policy=scaling_policy_record
        )
        log.debug(
            "Created scaling criteria record in DB : name=%s, scaling_policy.name=%s",
            scaling_criteria_record.name,
            scaling_criteria_record.scaling_policy.name)

    monitoring_param_ref = scaling_criteria['vnf-monitoring-param-ref']
    vnf_monitoring_param = next(
        (param for param in vnf_monitoring_params if param['id'] == monitoring_param_ref),
        None)
    if vnf_monitoring_param is None:
        # A bare next() would raise StopIteration here, which a coroutine
        # surfaces as a RuntimeError without any useful message.
        raise RuntimeError(
            "Scaling criteria '{}' refers to unknown vnf-monitoring-param '{}'".format(
                scaling_criteria['name'], monitoring_param_ref))

    # Resolve which VDU the monitored metric belongs to.
    if 'vdu-monitoring-param' in vnf_monitoring_param:
        vdu_id = vnf_monitoring_param['vdu-monitoring-param']['vdu-ref']
    elif 'vdu-metric' in vnf_monitoring_param:
        vdu_id = vnf_monitoring_param['vdu-metric']['vdu-ref']
    elif 'vnf-metric' in vnf_monitoring_param:
        vdu_id = VnfdUtils.get_mgmt_vdu(vnfd)['id']
    else:
        log.warning(
            "Scaling criteria is referring to a vnf-monitoring-param that does not "
            "contain a reference to a vdu or vnf metric.")
        return
    vdurs = [vdur for vdur in vnfr['vdur'] if vdur['vdu-id-ref'] == vdu_id]

    # (alarm action, threshold key, relational-operation key), in creation order.
    alarm_specs = (
        ('scale_in', 'scale-in-threshold', 'scale-in-relational-operation'),
        ('scale_out', 'scale-out-threshold', 'scale-out-relational-operation'),
    )
    for vdur in vdurs:
        log.debug("Creating alarm for vdur %s ", vdur)
        try:
            ScalingAlarmRepository.get(ScalingAlarm.vdu_name == vdur['name'],
                                       ScalingCriteria.name == scaling_criteria['name'],
                                       ScalingPolicy.name == scaling_policy['name'],
                                       ScalingGroup.nsr_id == nsr_id,
                                       join_classes=[ScalingCriteria,
                                                     ScalingPolicy,
                                                     ScalingGroup])
            log.debug("vdu %s already has an alarm configured", vdur['name'])
            continue
        except ScalingAlarm.DoesNotExist:
            pass
        for action, threshold_key, operation_key in alarm_specs:
            alarm_uuid = await self.mon_client.create_alarm(
                metric_name=vnf_monitoring_param['id'],
                ns_id=nsr_id,
                vdu_name=vdur['name'],
                vnf_member_index=vnfr['member-vnf-index-ref'],
                threshold=scaling_criteria[threshold_key],
                operation=scaling_criteria[operation_key],
                statistic=vnf_monitoring_param['aggregation-type']
            )
            alarm = ScalingAlarmRepository.create(
                alarm_uuid=alarm_uuid,
                action=action,
                vnf_member_index=vnfr['member-vnf-index-ref'],
                vdu_name=vdur['name'],
                scaling_criteria=scaling_criteria_record
            )
            # Remembered so the alarm can be removed from MON if a later
            # step of this run fails.
            alarms_created.append(alarm)
243
async def delete_scaling_groups(self, nsr_id: str):
    """Remove every scaling record of a network service and its MON alarms.

    Walks scaling groups, policies, criteria and alarms stored for
    ``nsr_id``. Each alarm is deleted in MON (a ``ValueError`` from that call
    is logged and ignored) and then in the database; children are deleted
    before their parent record. Any other error rolls the transaction back
    and is re-raised.

    :param nsr_id: id of the network service record being removed.
    """
    log.debug("Deleting scaling groups for network service %s", nsr_id)
    database.db.connect()
    try:
        with database.db.atomic() as transaction:
            try:
                groups = ScalingGroupRepository.list(ScalingGroup.nsr_id == nsr_id)
                for group in groups:
                    for policy in group.scaling_policies:
                        for criteria in policy.scaling_criterias:
                            for scaling_alarm in criteria.scaling_alarms:
                                try:
                                    await self.mon_client.delete_alarm(
                                        scaling_alarm.scaling_criteria.scaling_policy.scaling_group.nsr_id,
                                        scaling_alarm.vnf_member_index,
                                        scaling_alarm.vdu_name,
                                        scaling_alarm.alarm_uuid)
                                except ValueError:
                                    log.exception("Error deleting alarm in MON %s",
                                                  scaling_alarm.alarm_uuid)
                                # The DB row goes away even when MON refused the delete.
                                scaling_alarm.delete_instance()
                            criteria.delete_instance()
                        policy.delete_instance()
                    group.delete_instance()
            except Exception as error:
                log.exception("Error deleting scaling groups and alarms:")
                transaction.rollback()
                raise error
    finally:
        database.db.close()
273
async def delete_orphaned_alarms(self, nsr_id):
    """Delete the scaling alarms whose VDU record no longer exists.

    For every alarm stored for ``nsr_id`` the VDU record is looked up through
    the common DB client. When that lookup raises ``VdurNotFound`` the alarm
    is deleted in MON (a ``ValueError`` from that call is logged and ignored)
    and then in the database. Any other error rolls the transaction back and
    is re-raised.

    :param nsr_id: id of the network service record to check.
    """
    log.info("Deleting orphaned scaling alarms for network service %s", nsr_id)
    database.db.connect()
    try:
        with database.db.atomic() as transaction:
            try:
                groups = ScalingGroupRepository.list(ScalingGroup.nsr_id == nsr_id)
                for group in groups:
                    for policy in group.scaling_policies:
                        for criteria in policy.scaling_criterias:
                            for scaling_alarm in criteria.scaling_alarms:
                                try:
                                    # Only the lookup outcome matters; the record is not used.
                                    self.db_client.get_vdur(
                                        nsr_id,
                                        scaling_alarm.vnf_member_index,
                                        scaling_alarm.vdu_name)
                                except VdurNotFound:
                                    log.debug("Deleting orphaned scaling alarm %s",
                                              scaling_alarm.alarm_uuid)
                                    try:
                                        await self.mon_client.delete_alarm(
                                            scaling_alarm.scaling_criteria.scaling_policy.scaling_group.nsr_id,
                                            scaling_alarm.vnf_member_index,
                                            scaling_alarm.vdu_name,
                                            scaling_alarm.alarm_uuid)
                                    except ValueError:
                                        log.exception("Error deleting alarm in MON %s",
                                                      scaling_alarm.alarm_uuid)
                                    scaling_alarm.delete_instance()
            except Exception as error:
                log.exception("Error deleting orphaned alarms:")
                transaction.rollback()
                raise error
    finally:
        database.db.close()
304
def get_nslcmop(self, nslcmop_id):
    """Return what the common DB client yields for the given operation id.

    :param nslcmop_id: id passed through to ``db_client.get_nslcmop``.
    """
    nslcmop = self.db_client.get_nslcmop(nslcmop_id)
    return nslcmop
307
async def handle_alarm(self, alarm_uuid: str, status: str):
    """Record the new status of an alarm, then evaluate its scaling policy.

    :param alarm_uuid: identifier of the alarm that changed.
    :param status: new status, forwarded to ``update_alarm_status``.
    """
    # The status is stored first so the evaluation reads the fresh value.
    await self.update_alarm_status(alarm_uuid, status)
    await self.evaluate_policy(alarm_uuid)
311
async def update_alarm_status(self, alarm_uuid: str, status: str):
    """Store ``status`` as the last status of the alarm with ``alarm_uuid``.

    When no scaling alarm has that uuid, a debug message is logged and
    nothing is written.

    :param alarm_uuid: identifier of the alarm to update.
    :param status: value written to the alarm's ``last_status`` field.
    """
    database.db.connect()
    try:
        with database.db.atomic():
            alarm_record = ScalingAlarmRepository.get(ScalingAlarm.alarm_uuid == alarm_uuid)
            alarm_record.last_status = status
            alarm_record.save()
    except ScalingAlarm.DoesNotExist:
        log.debug("There is no autoscaling action configured for alarm %s.", alarm_uuid)
    finally:
        database.db.close()
323
async def evaluate_policy(self, alarm_uuid):
    """Ask LCM to scale when the policy behind an alarm is satisfied.

    The alarm is loaded by uuid and its scaling policy is checked:

    * a disabled policy ends the evaluation;
    * the policy's scale-in or scale-out operation is picked from the
      alarm's action;
    * the last statuses of the alarms sharing the same criteria, action,
      VNF member index and VDU name are combined: ``AND`` requires every
      status to be ``'alarm'``, ``OR`` requires at least one;
    * when the combination holds and more than ``cooldown_time`` seconds
      have passed since ``last_scale``, a scale request is sent through the
      LCM client and ``last_scale`` is updated.

    When no scaling alarm has that uuid, a debug message is logged.

    :param alarm_uuid: identifier of the alarm that triggered the evaluation.
    :raises Exception: if the alarm's action is neither ``scale_in`` nor
        ``scale_out``.
    """
    database.db.connect()
    try:
        with database.db.atomic():
            alarm = ScalingAlarmRepository.get(ScalingAlarm.alarm_uuid == alarm_uuid)
            vnf_member_index = alarm.vnf_member_index
            action = alarm.action
            scaling_policy = alarm.scaling_criteria.scaling_policy
            if not scaling_policy.enabled:
                return
            if action == 'scale_in':
                operation = scaling_policy.scale_in_operation
            elif action == 'scale_out':
                operation = scaling_policy.scale_out_operation
            else:
                raise Exception('Unknown alarm action {}'.format(action))
            related_alarms = ScalingAlarmRepository.list(
                ScalingAlarm.scaling_criteria == alarm.scaling_criteria,
                ScalingAlarm.action == action,
                ScalingAlarm.vnf_member_index == vnf_member_index,
                ScalingAlarm.vdu_name == alarm.vdu_name)
            # A distinct loop name keeps the triggering alarm bound to `alarm`.
            statuses = {related.last_status for related in related_alarms}
            if operation == 'AND':
                triggered = statuses == {'alarm'}
            elif operation == 'OR':
                triggered = 'alarm' in statuses
            else:
                # Any other operation value never triggers scaling.
                triggered = False
            if not triggered:
                return
            elapsed = datetime.datetime.now() - scaling_policy.last_scale
            if elapsed.total_seconds() > scaling_policy.cooldown_time:
                scaling_group = scaling_policy.scaling_group
                log.info("Sending %s action message for ns: %s",
                         action,
                         scaling_group.nsr_id)
                await self.lcm_client.scale(scaling_group.nsr_id,
                                            scaling_group.name,
                                            vnf_member_index,
                                            action)
                # Timestamp taken after the request so the cooldown starts
                # once the scale call has returned.
                scaling_policy.last_scale = datetime.datetime.now()
                scaling_policy.save()

    except ScalingAlarm.DoesNotExist:
        log.debug("There is no autoscaling action configured for alarm %s.", alarm_uuid)
    finally:
        database.db.close()