1 # -*- coding: utf-8 -*-
2 # pylint: disable=no-member
4 # Copyright 2018 Whitestack, LLC
5 # *************************************************************
7 # This file is part of OSM Monitoring module
8 # All Rights Reserved to Whitestack, LLC
10 # Licensed under the Apache License, Version 2.0 (the "License"); you may
11 # not use this file except in compliance with the License. You may obtain
12 # a copy of the License at
14 # http://www.apache.org/licenses/LICENSE-2.0
16 # Unless required by applicable law or agreed to in writing, software
17 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
18 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
19 # License for the specific language governing permissions and limitations
22 # For those usages not covered by the Apache License, Version 2.0 please
23 # contact: bdiaz@whitestack.com or glavado@whitestack.com
29 from typing
import List
31 from osm_policy_module
.common
.common_db_client
import CommonDbClient
32 from osm_policy_module
.common
.lcm_client
import LcmClient
33 from osm_policy_module
.common
.mon_client
import MonClient
34 from osm_policy_module
.core
import database
35 from osm_policy_module
.core
.config
import Config
36 from osm_policy_module
.core
.database
import ScalingGroup
, ScalingAlarm
, ScalingPolicy
, ScalingCriteria
, \
37 ScalingAlarmRepository
, ScalingGroupRepository
, ScalingPolicyRepository
, ScalingCriteriaRepository
38 from osm_policy_module
.core
.exceptions
import VdurNotFound
39 from osm_policy_module
.utils
.vnfd
import VnfdUtils
41 log
= logging
.getLogger(__name__
)
44 class AutoscalingService
:
46 def __init__(self
, config
: Config
, loop
=None):
49 loop
= asyncio
.get_event_loop()
51 self
.db_client
= CommonDbClient(config
)
52 self
.mon_client
= MonClient(config
, loop
=self
.loop
)
53 self
.lcm_client
= LcmClient(config
, loop
=self
.loop
)
55 async def configure_scaling_groups(self
, nsr_id
: str):
57 Configures scaling groups for a network service. Creates records in DB. Creates alarms in MON.
58 :param nsr_id: Network service record id
61 log
.info("Configuring scaling groups for network service with nsr_id: %s",
66 with database
.db
.atomic() as tx
:
68 vnfrs
= self
.db_client
.get_vnfrs(nsr_id
)
70 log
.debug("Processing vnfr: %s", vnfr
)
71 vnfd
= self
.db_client
.get_vnfd(vnfr
['vnfd-id'])
72 if 'scaling-group-descriptor' not in vnfd
:
73 log
.debug("No scaling group present in vnfd")
75 scaling_groups
= vnfd
['scaling-group-descriptor']
76 vnf_monitoring_params
= vnfd
['monitoring-param']
77 for scaling_group
in scaling_groups
:
78 scaling_group_record
= self
._get
_or
_create
_scaling
_group
(nsr_id
,
79 vnfr
['member-vnf-index-ref'],
81 for scaling_policy
in scaling_group
['scaling-policy']:
82 if scaling_policy
['scaling-type'] != 'automatic':
84 scaling_policy_record
= self
._get
_or
_create
_scaling
_policy
(nsr_id
,
88 for scaling_criteria
in scaling_policy
['scaling-criteria']:
89 scaling_criteria_record
= self
._get
_or
_create
_scaling
_criteria
(
94 vnf_monitoring_param
= next(
96 lambda param
: param
['id'] == scaling_criteria
[
97 'vnf-monitoring-param-ref'
99 vnf_monitoring_params
)
101 vdurs
= self
._get
_monitored
_vdurs
(vnf_monitoring_param
, vnfr
['vdur'], vnfd
)
103 log
.debug("Creating alarm for vdur %s ", vdur
)
105 ScalingAlarmRepository
.get(ScalingAlarm
.vdu_name
== vdur
['name'],
106 ScalingCriteria
.name
== scaling_criteria
['name'],
107 ScalingPolicy
.name
== scaling_policy
['name'],
108 ScalingGroup
.nsr_id
== nsr_id
,
109 join_classes
=[ScalingCriteria
,
112 log
.debug("vdu %s already has an alarm configured", vdur
['name'])
114 except ScalingAlarm
.DoesNotExist
:
116 metric_name
= self
._get
_metric
_name
(vnf_monitoring_param
, vdur
, vnfd
)
117 alarm_uuid
= await self
.mon_client
.create_alarm(
118 metric_name
=metric_name
,
120 vdu_name
=vdur
['name'],
121 vnf_member_index
=vnfr
['member-vnf-index-ref'],
122 threshold
=scaling_criteria
['scale-in-threshold'],
123 operation
=scaling_criteria
['scale-in-relational-operation'],
124 statistic
=vnf_monitoring_param
['aggregation-type']
126 alarm
= ScalingAlarmRepository
.create(
127 alarm_uuid
=alarm_uuid
,
129 vnf_member_index
=vnfr
['member-vnf-index-ref'],
130 vdu_name
=vdur
['name'],
131 scaling_criteria
=scaling_criteria_record
133 alarms_created
.append(alarm
)
134 alarm_uuid
= await self
.mon_client
.create_alarm(
135 metric_name
=metric_name
,
137 vdu_name
=vdur
['name'],
138 vnf_member_index
=vnfr
['member-vnf-index-ref'],
139 threshold
=scaling_criteria
['scale-out-threshold'],
140 operation
=scaling_criteria
['scale-out-relational-operation'],
141 statistic
=vnf_monitoring_param
['aggregation-type']
143 alarm
= ScalingAlarmRepository
.create(
144 alarm_uuid
=alarm_uuid
,
146 vnf_member_index
=vnfr
['member-vnf-index-ref'],
147 vdu_name
=vdur
['name'],
148 scaling_criteria
=scaling_criteria_record
150 alarms_created
.append(alarm
)
152 except Exception as e
:
153 log
.exception("Error configuring scaling groups:")
155 if len(alarms_created
) > 0:
156 log
.info("Cleaning alarm resources in MON")
157 for alarm
in alarms_created
:
158 await self
.mon_client
.delete_alarm(
159 alarm
.scaling_criteria
.scaling_policy
.scaling_group
.nsr_id
,
160 alarm
.vnf_member_index
,
167 async def delete_scaling_groups(self
, nsr_id
: str):
168 log
.debug("Deleting scaling groups for network service %s", nsr_id
)
169 database
.db
.connect()
171 with database
.db
.atomic() as tx
:
173 for scaling_group
in ScalingGroupRepository
.list(ScalingGroup
.nsr_id
== nsr_id
):
174 for scaling_policy
in scaling_group
.scaling_policies
:
175 for scaling_criteria
in scaling_policy
.scaling_criterias
:
176 for alarm
in scaling_criteria
.scaling_alarms
:
178 await self
.mon_client
.delete_alarm(
179 alarm
.scaling_criteria
.scaling_policy
.scaling_group
.nsr_id
,
180 alarm
.vnf_member_index
,
184 log
.exception("Error deleting alarm in MON %s", alarm
.alarm_uuid
)
185 alarm
.delete_instance()
186 scaling_criteria
.delete_instance()
187 scaling_policy
.delete_instance()
188 scaling_group
.delete_instance()
190 except Exception as e
:
191 log
.exception("Error deleting scaling groups and alarms:")
197 async def delete_orphaned_alarms(self
, nsr_id
):
198 log
.info("Deleting orphaned scaling alarms for network service %s", nsr_id
)
199 database
.db
.connect()
201 with database
.db
.atomic() as tx
:
203 for scaling_group
in ScalingGroupRepository
.list(ScalingGroup
.nsr_id
== nsr_id
):
204 for scaling_policy
in scaling_group
.scaling_policies
:
205 for scaling_criteria
in scaling_policy
.scaling_criterias
:
206 for alarm
in scaling_criteria
.scaling_alarms
:
208 self
.db_client
.get_vdur(nsr_id
, alarm
.vnf_member_index
, alarm
.vdu_name
)
210 log
.debug("Deleting orphaned scaling alarm %s", alarm
.alarm_uuid
)
212 await self
.mon_client
.delete_alarm(
213 alarm
.scaling_criteria
.scaling_policy
.scaling_group
.nsr_id
,
214 alarm
.vnf_member_index
,
218 log
.exception("Error deleting alarm in MON %s", alarm
.alarm_uuid
)
219 alarm
.delete_instance()
221 except Exception as e
:
222 log
.exception("Error deleting orphaned alarms:")
228 async def handle_alarm(self
, alarm_uuid
: str, status
: str):
229 await self
.update_alarm_status(alarm_uuid
, status
)
230 await self
.evaluate_policy(alarm_uuid
)
232 async def update_alarm_status(self
, alarm_uuid
: str, status
: str):
233 database
.db
.connect()
235 with database
.db
.atomic():
236 alarm
= ScalingAlarmRepository
.get(ScalingAlarm
.alarm_uuid
== alarm_uuid
)
237 alarm
.last_status
= status
239 except ScalingAlarm
.DoesNotExist
:
240 log
.debug("There is no autoscaling action configured for alarm %s.", alarm_uuid
)
244 async def evaluate_policy(self
, alarm_uuid
):
245 database
.db
.connect()
247 with database
.db
.atomic():
248 alarm
= ScalingAlarmRepository
.get(ScalingAlarm
.alarm_uuid
== alarm_uuid
)
249 vnf_member_index
= alarm
.vnf_member_index
250 action
= alarm
.action
251 scaling_policy
= alarm
.scaling_criteria
.scaling_policy
252 if not scaling_policy
.enabled
:
254 if action
== 'scale_in':
255 operation
= scaling_policy
.scale_in_operation
256 elif action
== 'scale_out':
257 operation
= scaling_policy
.scale_out_operation
259 raise Exception('Unknown alarm action {}'.format(alarm
.action
))
260 alarms
= ScalingAlarmRepository
.list(ScalingAlarm
.scaling_criteria
== alarm
.scaling_criteria
,
261 ScalingAlarm
.action
== alarm
.action
,
262 ScalingAlarm
.vnf_member_index
== vnf_member_index
,
263 ScalingAlarm
.vdu_name
== alarm
.vdu_name
)
266 statuses
.append(alarm
.last_status
)
267 if (operation
== 'AND' and set(statuses
) == {'alarm'}) or (operation
== 'OR' and 'alarm' in statuses
):
268 delta
= datetime
.datetime
.now() - scaling_policy
.last_scale
269 if delta
.total_seconds() > scaling_policy
.cooldown_time
:
270 log
.info("Sending %s action message for ns: %s",
272 scaling_policy
.scaling_group
.nsr_id
)
273 await self
.lcm_client
.scale(scaling_policy
.scaling_group
.nsr_id
,
274 scaling_policy
.scaling_group
.name
,
277 scaling_policy
.last_scale
= datetime
.datetime
.now()
278 scaling_policy
.save()
280 except ScalingAlarm
.DoesNotExist
:
281 log
.debug("There is no autoscaling action configured for alarm %s.", alarm_uuid
)
285 def _get_or_create_scaling_group(self
, nsr_id
: str, vnf_member_index
: str, scaling_group
: dict):
287 scaling_group_record
= ScalingGroupRepository
.get(
288 ScalingGroup
.nsr_id
== nsr_id
,
289 ScalingGroup
.vnf_member_index
== vnf_member_index
,
290 ScalingGroup
.name
== scaling_group
['name']
292 log
.debug("Found existing scaling group record in DB...")
293 except ScalingGroup
.DoesNotExist
:
294 log
.debug("Creating scaling group record in DB...")
295 scaling_group_record
= ScalingGroupRepository
.create(
297 vnf_member_index
=vnf_member_index
,
298 name
=scaling_group
['name'],
299 content
=json
.dumps(scaling_group
)
302 "Created scaling group record in DB : nsr_id=%s, vnf_member_index=%s, name=%s",
303 scaling_group_record
.nsr_id
,
304 scaling_group_record
.vnf_member_index
,
305 scaling_group_record
.name
)
306 return scaling_group_record
308 def _get_or_create_scaling_policy(self
, nsr_id
: str, scaling_policy
: dict, scaling_group_record
: ScalingGroup
):
310 scaling_policy_record
= ScalingPolicyRepository
.get(
311 ScalingPolicy
.name
== scaling_policy
['name'],
312 ScalingGroup
.id == scaling_group_record
.id,
313 join_classes
=[ScalingGroup
]
315 log
.debug("Found existing scaling policy record in DB...")
316 except ScalingPolicy
.DoesNotExist
:
317 log
.debug("Creating scaling policy record in DB...")
318 scaling_policy_record
= ScalingPolicyRepository
.create(
320 name
=scaling_policy
['name'],
321 cooldown_time
=scaling_policy
['cooldown-time'],
322 scaling_group
=scaling_group_record
,
324 if 'scale-in-operation-type' in scaling_policy
:
325 scaling_policy_record
.scale_in_operation
= scaling_policy
[
326 'scale-in-operation-type']
327 if 'scale-out-operation-type' in scaling_policy
:
328 scaling_policy_record
.scale_out_operation
= scaling_policy
[
329 'scale-out-operation-type']
330 if 'enabled' in scaling_policy
:
331 scaling_policy_record
.enabled
= scaling_policy
['enabled']
332 scaling_policy_record
.save()
333 log
.debug("Created scaling policy record in DB : name=%s, scaling_group.name=%s",
334 scaling_policy_record
.name
,
335 scaling_policy_record
.scaling_group
.name
)
336 return scaling_policy_record
338 def _get_or_create_scaling_criteria(self
, nsr_id
: str, scaling_criteria
: dict,
339 scaling_policy_record
: ScalingPolicy
):
341 scaling_criteria_record
= ScalingCriteriaRepository
.get(
342 ScalingPolicy
.id == scaling_policy_record
.id,
343 ScalingCriteria
.name
== scaling_criteria
['name'],
344 join_classes
=[ScalingPolicy
]
346 log
.debug("Found existing scaling criteria record in DB...")
347 except ScalingCriteria
.DoesNotExist
:
348 log
.debug("Creating scaling criteria record in DB...")
349 scaling_criteria_record
= ScalingCriteriaRepository
.create(
351 name
=scaling_criteria
['name'],
352 scaling_policy
=scaling_policy_record
355 "Created scaling criteria record in DB : name=%s, scaling_policy.name=%s",
356 scaling_criteria_record
.name
,
357 scaling_criteria_record
.scaling_policy
.name
)
358 return scaling_criteria_record
360 def _get_monitored_vdurs(self
, vnf_monitoring_param
: dict, vdurs
: List
[dict], vnfd
: dict):
362 if 'vdu-monitoring-param' in vnf_monitoring_param
:
363 monitored_vdurs
= list(
365 lambda vdur
: vdur
['vdu-id-ref'] == vnf_monitoring_param
366 ['vdu-monitoring-param']
371 elif 'vdu-metric' in vnf_monitoring_param
:
372 monitored_vdurs
= list(
374 lambda vdur
: vdur
['vdu-id-ref'] == vnf_monitoring_param
380 elif 'vnf-metric' in vnf_monitoring_param
:
381 vdu
= VnfdUtils
.get_mgmt_vdu(vnfd
)
382 monitored_vdurs
= list(
384 lambda vdur
: vdur
['vdu-id-ref'] == vdu
['id'],
390 "Scaling criteria is referring to a vnf-monitoring-param that does not "
391 "contain a reference to a vdu or vnf metric.")
392 return monitored_vdurs
394 def _get_metric_name(self
, vnf_monitoring_param
: dict, vdur
: dict, vnfd
: dict):
396 filter(lambda vdu
: vdu
['id'] == vdur
['vdu-id-ref'], vnfd
['vdu'])
398 if 'vdu-monitoring-param' in vnf_monitoring_param
:
399 vdu_monitoring_param
= next(filter(
400 lambda param
: param
['id'] == vnf_monitoring_param
['vdu-monitoring-param'][
401 'vdu-monitoring-param-ref'], vdu
['monitoring-param']))
402 nfvi_metric
= vdu_monitoring_param
['nfvi-metric']
404 if 'vdu-metric' in vnf_monitoring_param
:
405 vnf_metric_name
= vnf_monitoring_param
['vdu-metric']['vdu-metric-name-ref']
406 return vnf_metric_name
407 if 'vnf-metric' in vnf_monitoring_param
:
408 vnf_metric_name
= vnf_monitoring_param
['vnf-metric']['vnf-metric-name-ref']
409 return vnf_metric_name
410 raise ValueError('No metric name found for vnf_monitoring_param %s' % vnf_monitoring_param
['id'])