Adds projects_read and projects_write params to scaling nslcmop
[osm/POL.git] / osm_policy_module / autoscaling / service.py
1 # -*- coding: utf-8 -*-
2
3 # Copyright 2018 Whitestack, LLC
4 # *************************************************************
5
6 # This file is part of OSM Monitoring module
7 # All Rights Reserved to Whitestack, LLC
8
9 # Licensed under the Apache License, Version 2.0 (the "License"); you may
10 # not use this file except in compliance with the License. You may obtain
11 # a copy of the License at
12
13 # http://www.apache.org/licenses/LICENSE-2.0
14
15 # Unless required by applicable law or agreed to in writing, software
16 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
17 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
18 # License for the specific language governing permissions and limitations
19 # under the License.
20
21 # For those usages not covered by the Apache License, Version 2.0 please
22 # contact: bdiaz@whitestack.com or glavado@whitestack.com
23 ##
24 import asyncio
25 import datetime
26 import json
27 import logging
28
29 from osm_policy_module.common.common_db_client import CommonDbClient
30 from osm_policy_module.common.lcm_client import LcmClient
31 from osm_policy_module.common.mon_client import MonClient
32 from osm_policy_module.core import database
33 from osm_policy_module.core.config import Config
34 from osm_policy_module.core.database import ScalingGroup, ScalingAlarm, ScalingPolicy, ScalingCriteria, \
35 ScalingAlarmRepository, ScalingGroupRepository, ScalingPolicyRepository, ScalingCriteriaRepository
36 from osm_policy_module.core.exceptions import VdurNotFound
37 from osm_policy_module.utils.vnfd import VnfdUtils
38
# Module-level logger, named after this module, used by AutoscalingService.
log = logging.getLogger(__name__)
40
41
class AutoscalingService:
    """
    Keeps the autoscaling records (scaling groups, policies, criteria and alarms) of network
    services in the policy database, creates and deletes the matching alarms through the MON
    client, and requests scale actions through the LCM client when alarms are evaluated.
    """

    # (alarm action stored in DB, prefix of the matching scaling-criteria descriptor keys).
    # Order matters: the scale-in alarm is created before the scale-out alarm.
    _ALARM_ACTIONS = (
        ('scale_in', 'scale-in'),
        ('scale_out', 'scale-out'),
    )

    def __init__(self, config: Config, loop=None):
        """
        :param config: Configuration handed to the common DB, MON and LCM clients.
        :param loop: Event loop handed to the MON and LCM clients. When falsy,
            asyncio.get_event_loop() is used.
        """
        self.conf = config
        if not loop:
            loop = asyncio.get_event_loop()
        self.loop = loop
        self.db_client = CommonDbClient(config)
        self.mon_client = MonClient(config, loop=self.loop)
        self.lcm_client = LcmClient(config, loop=self.loop)

    async def configure_scaling_groups(self, nsr_id: str):
        """
        Configures scaling groups for a network service. Creates records in DB. Creates alarms in MON.

        Scaling group, policy, criteria and alarm records that already exist are reused. Only
        policies whose scaling-type is 'automatic' are processed. If anything fails, the DB
        transaction is rolled back, the alarms created in MON during this call are deleted
        (best effort) and the original error is re-raised.

        :param nsr_id: Network service record id
        :return:
        """
        log.info("Configuring scaling groups for network service with nsr_id: %s",
                 nsr_id)
        alarms_created = []
        database.db.connect()
        try:
            with database.db.atomic() as tx:
                try:
                    for vnfr in self.db_client.get_vnfrs(nsr_id):
                        log.debug("Processing vnfr: %s", vnfr)
                        vnfd = self.db_client.get_vnfd(vnfr['vnfd-id'])
                        if 'scaling-group-descriptor' not in vnfd:
                            log.debug("No scaling group present in vnfd")
                            continue
                        for scaling_group in vnfd['scaling-group-descriptor']:
                            await self._configure_scaling_group(nsr_id, vnfr, vnfd, scaling_group, alarms_created)
                except Exception:
                    log.exception("Error configuring scaling groups:")
                    tx.rollback()
                    if alarms_created:
                        log.info("Cleaning alarm resources in MON")
                        for alarm in alarms_created:
                            # Best effort: one failing deletion must neither leak the remaining
                            # alarms nor hide the error that triggered the cleanup.
                            try:
                                await self._delete_alarm_in_mon(alarm)
                            except Exception:
                                log.exception("Error deleting alarm in MON %s", alarm.alarm_uuid)
                    raise
        finally:
            database.db.close()

    async def _configure_scaling_group(self, nsr_id, vnfr, vnfd, scaling_group, alarms_created):
        """
        Creates the missing records and alarms for one scaling group descriptor of a vnfr.

        :param nsr_id: Network service record id
        :param vnfr: VNF record the scaling group applies to
        :param vnfd: VNF descriptor that contains the scaling group
        :param scaling_group: Scaling group descriptor
        :param alarms_created: List to which every created alarm record is appended. It is
            mutated in place (not returned) so that the caller still knows which alarms to
            clean up when this coroutine fails half way.
        """
        scaling_group_record = self._get_or_create_scaling_group(nsr_id, vnfr, scaling_group)
        for scaling_policy in scaling_group['scaling-policy']:
            if scaling_policy['scaling-type'] != 'automatic':
                continue
            scaling_policy_record = self._get_or_create_scaling_policy(nsr_id, scaling_group_record, scaling_policy)

            for scaling_criteria in scaling_policy['scaling-criteria']:
                scaling_criteria_record = self._get_or_create_scaling_criteria(
                    nsr_id, scaling_policy_record, scaling_criteria)
                vnf_monitoring_param = self._find_monitoring_param(
                    vnfd, scaling_criteria['vnf-monitoring-param-ref'])
                vdurs = self._find_monitored_vdurs(vnfd, vnfr, vnf_monitoring_param)
                if vdurs is None:
                    log.warning(
                        "Scaling criteria is referring to a vnf-monitoring-param that does not "
                        "contain a reference to a vdu or vnf metric.")
                    continue
                for vdur in vdurs:
                    log.debug("Creating alarm for vdur %s ", vdur)
                    if self._alarm_exists(nsr_id, scaling_policy['name'], scaling_criteria['name'], vdur['name']):
                        log.debug("vdu %s already has an alarm configured", vdur['name'])
                        continue
                    for action, key_prefix in self._ALARM_ACTIONS:
                        alarm_uuid = await self.mon_client.create_alarm(
                            metric_name=vnf_monitoring_param['id'],
                            ns_id=nsr_id,
                            vdu_name=vdur['name'],
                            vnf_member_index=vnfr['member-vnf-index-ref'],
                            threshold=scaling_criteria[key_prefix + '-threshold'],
                            operation=scaling_criteria[key_prefix + '-relational-operation'],
                            statistic=vnf_monitoring_param['aggregation-type']
                        )
                        alarm = ScalingAlarmRepository.create(
                            alarm_uuid=alarm_uuid,
                            action=action,
                            vnf_member_index=vnfr['member-vnf-index-ref'],
                            vdu_name=vdur['name'],
                            scaling_criteria=scaling_criteria_record
                        )
                        alarms_created.append(alarm)

    @staticmethod
    def _get_or_create_scaling_group(nsr_id, vnfr, scaling_group):
        """Returns the DB record of a scaling group descriptor, creating it when it does not exist."""
        try:
            scaling_group_record = ScalingGroupRepository.get(
                ScalingGroup.nsr_id == nsr_id,
                ScalingGroup.vnf_member_index == vnfr['member-vnf-index-ref'],
                ScalingGroup.name == scaling_group['name']
            )
            log.debug("Found existing scaling group record in DB...")
        except ScalingGroup.DoesNotExist:
            log.debug("Creating scaling group record in DB...")
            scaling_group_record = ScalingGroupRepository.create(
                nsr_id=nsr_id,
                vnf_member_index=vnfr['member-vnf-index-ref'],
                name=scaling_group['name'],
                content=json.dumps(scaling_group)
            )
            log.debug(
                "Created scaling group record in DB : nsr_id=%s, vnf_member_index=%s, name=%s",
                scaling_group_record.nsr_id,
                scaling_group_record.vnf_member_index,
                scaling_group_record.name)
        return scaling_group_record

    @staticmethod
    def _get_or_create_scaling_policy(nsr_id, scaling_group_record, scaling_policy):
        """Returns the DB record of a scaling policy descriptor, creating it when it does not exist."""
        try:
            scaling_policy_record = ScalingPolicyRepository.get(
                ScalingPolicy.name == scaling_policy['name'],
                ScalingGroup.id == scaling_group_record.id,
                join_classes=[ScalingGroup]
            )
            log.debug("Found existing scaling policy record in DB...")
        except ScalingPolicy.DoesNotExist:
            log.debug("Creating scaling policy record in DB...")
            scaling_policy_record = ScalingPolicyRepository.create(
                nsr_id=nsr_id,
                name=scaling_policy['name'],
                cooldown_time=scaling_policy['cooldown-time'],
                scaling_group=scaling_group_record,
            )
            # Optional descriptor fields: only override the record values when they are present.
            if 'scale-in-operation-type' in scaling_policy:
                scaling_policy_record.scale_in_operation = scaling_policy['scale-in-operation-type']
            if 'scale-out-operation-type' in scaling_policy:
                scaling_policy_record.scale_out_operation = scaling_policy['scale-out-operation-type']
            if 'enabled' in scaling_policy:
                scaling_policy_record.enabled = scaling_policy['enabled']
            scaling_policy_record.save()
            log.debug("Created scaling policy record in DB : name=%s, scaling_group.name=%s",
                      scaling_policy_record.name,
                      scaling_policy_record.scaling_group.name)
        return scaling_policy_record

    @staticmethod
    def _get_or_create_scaling_criteria(nsr_id, scaling_policy_record, scaling_criteria):
        """Returns the DB record of a scaling criteria descriptor, creating it when it does not exist."""
        try:
            scaling_criteria_record = ScalingCriteriaRepository.get(
                ScalingPolicy.id == scaling_policy_record.id,
                ScalingCriteria.name == scaling_criteria['name'],
                join_classes=[ScalingPolicy]
            )
            log.debug("Found existing scaling criteria record in DB...")
        except ScalingCriteria.DoesNotExist:
            log.debug("Creating scaling criteria record in DB...")
            scaling_criteria_record = ScalingCriteriaRepository.create(
                nsr_id=nsr_id,
                name=scaling_criteria['name'],
                scaling_policy=scaling_policy_record
            )
            log.debug(
                "Created scaling criteria record in DB : name=%s, scaling_policy.name=%s",
                scaling_criteria_record.name,
                scaling_criteria_record.scaling_policy.name)
        return scaling_criteria_record

    @staticmethod
    def _find_monitoring_param(vnfd, param_id):
        """
        Returns the first entry of the vnfd 'monitoring-param' list whose id is param_id.

        :raises ValueError: when the vnfd defines no monitoring param with that id.
        """
        # A vnfd may carry scaling groups without any monitoring-param (e.g. manual policies
        # only), so a missing list is treated as empty instead of failing with KeyError.
        for param in vnfd.get('monitoring-param', []):
            if param['id'] == param_id:
                return param
        raise ValueError(
            "Scaling criteria refers to vnf-monitoring-param '{}', which is not defined in the vnfd".format(param_id))

    @staticmethod
    def _find_monitored_vdurs(vnfd, vnfr, vnf_monitoring_param):
        """
        Returns the vdurs of vnfr whose 'vdu-id-ref' matches the vdu the monitoring param refers to.

        The vdu comes from 'vdu-monitoring-param' or 'vdu-metric' ('vdu-ref' key); for a
        'vnf-metric' it is the vdu returned by VnfdUtils.get_mgmt_vdu.

        :return: List of matching vdurs, or None when the monitoring param references neither
            a vdu nor a vnf metric.
        """
        if 'vdu-monitoring-param' in vnf_monitoring_param:
            vdu_id = vnf_monitoring_param['vdu-monitoring-param']['vdu-ref']
        elif 'vdu-metric' in vnf_monitoring_param:
            vdu_id = vnf_monitoring_param['vdu-metric']['vdu-ref']
        elif 'vnf-metric' in vnf_monitoring_param:
            vdu_id = VnfdUtils.get_mgmt_vdu(vnfd)['id']
        else:
            return None
        return [vdur for vdur in vnfr['vdur'] if vdur['vdu-id-ref'] == vdu_id]

    @staticmethod
    def _alarm_exists(nsr_id, policy_name, criteria_name, vdu_name):
        """Tells whether the DB already holds a scaling alarm for the given vdu, criteria, policy and ns."""
        try:
            ScalingAlarmRepository.get(ScalingAlarm.vdu_name == vdu_name,
                                       ScalingCriteria.name == criteria_name,
                                       ScalingPolicy.name == policy_name,
                                       ScalingGroup.nsr_id == nsr_id,
                                       join_classes=[ScalingCriteria,
                                                     ScalingPolicy,
                                                     ScalingGroup])
        except ScalingAlarm.DoesNotExist:
            return False
        return True

    async def _delete_alarm_in_mon(self, alarm):
        """Asks MON to delete the alarm that corresponds to the given scaling alarm record."""
        await self.mon_client.delete_alarm(
            alarm.scaling_criteria.scaling_policy.scaling_group.nsr_id,
            alarm.vnf_member_index,
            alarm.vdu_name,
            alarm.alarm_uuid)

    async def _delete_alarm_in_mon_tolerating_errors(self, alarm):
        """Deletes the alarm in MON; a ValueError raised by the deletion is logged and swallowed."""
        try:
            await self._delete_alarm_in_mon(alarm)
        except ValueError:
            log.exception("Error deleting alarm in MON %s", alarm.alarm_uuid)

    async def delete_scaling_groups(self, nsr_id: str):
        """
        Deletes the scaling groups of a network service together with their policies, criteria
        and alarms. Alarms are also deleted in MON; a ValueError raised by MON is logged and the
        DB record is deleted anyway. Any other error rolls back the DB transaction and is re-raised.

        :param nsr_id: Network service record id
        :return:
        """
        log.debug("Deleting scaling groups for network service %s", nsr_id)
        database.db.connect()
        try:
            with database.db.atomic() as tx:
                try:
                    for scaling_group in ScalingGroupRepository.list(ScalingGroup.nsr_id == nsr_id):
                        for scaling_policy in scaling_group.scaling_policies:
                            for scaling_criteria in scaling_policy.scaling_criterias:
                                for alarm in scaling_criteria.scaling_alarms:
                                    await self._delete_alarm_in_mon_tolerating_errors(alarm)
                                    alarm.delete_instance()
                                # Children are deleted before their parent record.
                                scaling_criteria.delete_instance()
                            scaling_policy.delete_instance()
                        scaling_group.delete_instance()

                except Exception:
                    log.exception("Error deleting scaling groups and alarms:")
                    tx.rollback()
                    raise
        finally:
            database.db.close()

    async def delete_orphaned_alarms(self, nsr_id: str):
        """
        Deletes the scaling alarms of a network service whose vdur can no longer be found
        (db_client.get_vdur raises VdurNotFound). Such alarms are deleted in MON (a ValueError
        raised by MON is logged and ignored) and in DB. Any other error rolls back the DB
        transaction and is re-raised.

        :param nsr_id: Network service record id
        :return:
        """
        log.info("Deleting orphaned scaling alarms for network service %s", nsr_id)
        database.db.connect()
        try:
            with database.db.atomic() as tx:
                try:
                    for scaling_group in ScalingGroupRepository.list(ScalingGroup.nsr_id == nsr_id):
                        for scaling_policy in scaling_group.scaling_policies:
                            for scaling_criteria in scaling_policy.scaling_criterias:
                                for alarm in scaling_criteria.scaling_alarms:
                                    try:
                                        self.db_client.get_vdur(nsr_id, alarm.vnf_member_index, alarm.vdu_name)
                                    except VdurNotFound:
                                        log.debug("Deleting orphaned scaling alarm %s", alarm.alarm_uuid)
                                        await self._delete_alarm_in_mon_tolerating_errors(alarm)
                                        alarm.delete_instance()

                except Exception:
                    log.exception("Error deleting orphaned alarms:")
                    tx.rollback()
                    raise
        finally:
            database.db.close()

    def get_nslcmop(self, nslcmop_id):
        """
        Returns what the common DB client returns for the given ns lcm operation id.

        :param nslcmop_id: Id of the ns lcm operation
        """
        return self.db_client.get_nslcmop(nslcmop_id)

    async def handle_alarm(self, alarm_uuid: str, status: str):
        """
        Stores the new status of an alarm and then evaluates its scaling policy.

        :param alarm_uuid: Uuid of the alarm
        :param status: New status of the alarm
        :return:
        """
        await self.update_alarm_status(alarm_uuid, status)
        await self.evaluate_policy(alarm_uuid)

    async def update_alarm_status(self, alarm_uuid: str, status: str):
        """
        Saves status as the last status of the scaling alarm. Alarms without a scaling alarm
        record are only reported in the debug log.

        :param alarm_uuid: Uuid of the alarm
        :param status: New status of the alarm
        :return:
        """
        database.db.connect()
        try:
            with database.db.atomic():
                alarm = ScalingAlarmRepository.get(ScalingAlarm.alarm_uuid == alarm_uuid)
                alarm.last_status = status
                alarm.save()
        except ScalingAlarm.DoesNotExist:
            log.debug("There is no autoscaling action configured for alarm %s.", alarm_uuid)
        finally:
            database.db.close()

    async def evaluate_policy(self, alarm_uuid: str):
        """
        Evaluates the scaling policy of an alarm and requests a scale action from LCM when due.

        Nothing is done when the policy is disabled. Otherwise the last statuses of the alarms
        that share scaling criteria, action, vnf member index and vdu name with the given alarm
        are combined with the policy operation for that action ('AND': all of them are 'alarm',
        'OR': at least one is 'alarm'). When the operation is satisfied and more than
        cooldown_time seconds have passed since last_scale, the scale action is sent and
        last_scale is updated. Alarms without a scaling alarm record are only reported in the
        debug log.

        :param alarm_uuid: Uuid of the alarm
        :return:
        :raises Exception: when the alarm action is neither 'scale_in' nor 'scale_out'
        """
        database.db.connect()
        try:
            with database.db.atomic():
                alarm = ScalingAlarmRepository.get(ScalingAlarm.alarm_uuid == alarm_uuid)
                vnf_member_index = alarm.vnf_member_index
                action = alarm.action
                scaling_policy = alarm.scaling_criteria.scaling_policy
                if not scaling_policy.enabled:
                    return
                if action == 'scale_in':
                    operation = scaling_policy.scale_in_operation
                elif action == 'scale_out':
                    operation = scaling_policy.scale_out_operation
                else:
                    raise Exception('Unknown alarm action {}'.format(alarm.action))
                related_alarms = ScalingAlarmRepository.list(ScalingAlarm.scaling_criteria == alarm.scaling_criteria,
                                                             ScalingAlarm.action == alarm.action,
                                                             ScalingAlarm.vnf_member_index == vnf_member_index,
                                                             ScalingAlarm.vdu_name == alarm.vdu_name)
                # Separate loop variable: the triggering alarm must not be rebound here.
                statuses = [related_alarm.last_status for related_alarm in related_alarms]
                triggered = (operation == 'AND' and set(statuses) == {'alarm'}) or \
                            (operation == 'OR' and 'alarm' in statuses)
                if not triggered:
                    return
                delta = datetime.datetime.now() - scaling_policy.last_scale
                if delta.total_seconds() > scaling_policy.cooldown_time:
                    log.info("Sending %s action message for ns: %s",
                             action,
                             scaling_policy.scaling_group.nsr_id)
                    await self.lcm_client.scale(scaling_policy.scaling_group.nsr_id,
                                                scaling_policy.scaling_group.name,
                                                vnf_member_index,
                                                action)
                    scaling_policy.last_scale = datetime.datetime.now()
                    scaling_policy.save()

        except ScalingAlarm.DoesNotExist:
            log.debug("There is no autoscaling action configured for alarm %s.", alarm_uuid)
        finally:
            database.db.close()