# -*- coding: utf-8 -*-

# Copyright 2018 Whitestack, LLC
# *************************************************************

# This file is part of OSM Monitoring module
# All Rights Reserved to Whitestack, LLC

# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at

#         http://www.apache.org/licenses/LICENSE-2.0

# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.

# For those usages not covered by the Apache License, Version 2.0 please
# contact: bdiaz@whitestack.com or glavado@whitestack.com
import asyncio
import datetime
import json
import logging

from osm_policy_module.common.common_db_client import CommonDbClient
from osm_policy_module.common.lcm_client import LcmClient
from osm_policy_module.common.mon_client import MonClient
from osm_policy_module.core import database
from osm_policy_module.core.config import Config
from osm_policy_module.core.database import ScalingGroup, ScalingAlarm, ScalingPolicy, ScalingCriteria, \
    ScalingAlarmRepository, ScalingGroupRepository, ScalingPolicyRepository, ScalingCriteriaRepository
from osm_policy_module.core.exceptions import VdurNotFound
from osm_policy_module.utils.vnfd import VnfdUtils
# Module-level logger shared by every method of the autoscaling service.
log = logging.getLogger(__name__)
class AutoscalingService:
    """
    Keeps the policy DB records of a network service (scaling groups, policies, criteria and alarms)
    aligned with the alarms created in MON, and asks LCM to scale when those alarms fire.
    """

    # (action stored on the alarm record, scaling-criteria key of its threshold,
    #  scaling-criteria key of its relational operation). Scale-in is created first, then scale-out.
    _ALARM_ACTIONS = (
        ('scale_in', 'scale-in-threshold', 'scale-in-relational-operation'),
        ('scale_out', 'scale-out-threshold', 'scale-out-relational-operation'),
    )

    def __init__(self, config: Config, loop=None):
        """
        Builds the DB, MON and LCM clients used by the service.
        :param config: Module configuration handed to every client
        :param loop: asyncio event loop shared with the MON and LCM clients. When omitted, the loop
            returned by asyncio.get_event_loop() is used
        """
        # Only fall back to the default loop when the caller did not supply one, and keep it on the
        # instance because both async clients below are built from self.loop.
        if not loop:
            loop = asyncio.get_event_loop()
        self.loop = loop
        self.db_client = CommonDbClient(config)
        self.mon_client = MonClient(config, loop=self.loop)
        self.lcm_client = LcmClient(config, loop=self.loop)

    async def configure_scaling_groups(self, nsr_id: str):
        """
        Configures scaling groups for a network service. Creates records in DB. Creates alarms in MON.
        :param nsr_id: Network service record id
        :raises Exception: any failure is logged, the DB transaction is rolled back, the alarms created
            in MON during this call are deleted again, and the error is re-raised
        """
        log.info("Configuring scaling groups for network service with nsr_id: %s", nsr_id)
        # Alarm records created during this call; used to clean up MON if something fails midway.
        alarms_created = []
        database.db.connect()
        try:
            with database.db.atomic() as tx:
                try:
                    for vnfr in self.db_client.get_vnfrs(nsr_id):
                        await self._configure_vnfr(nsr_id, vnfr, alarms_created)
                except Exception:
                    log.exception("Error configuring scaling groups:")
                    tx.rollback()
                    if alarms_created:
                        log.info("Cleaning alarm resources in MON")
                        for alarm in alarms_created:
                            await self._delete_mon_alarm(alarm)
                    raise
        finally:
            database.db.close()

    async def _configure_vnfr(self, nsr_id: str, vnfr: dict, alarms_created: list):
        """
        Creates the missing DB records and MON alarms for every automatic scaling policy of one vnfr.
        :param nsr_id: Network service record id
        :param vnfr: VNF record as returned by the common DB client
        :param alarms_created: list that receives every alarm record created here
        """
        log.debug("Processing vnfr: %s", vnfr)
        vnfd = self.db_client.get_vnfd(vnfr['vnfd-id'])
        if 'scaling-group-descriptor' not in vnfd:
            log.debug("No scaling group present in vnfd")
            return
        scaling_groups = vnfd['scaling-group-descriptor']
        vnf_monitoring_params = vnfd['monitoring-param']
        for scaling_group in scaling_groups:
            scaling_group_record = self._get_or_create_scaling_group(nsr_id, vnfr, scaling_group)
            for scaling_policy in scaling_group['scaling-policy']:
                if scaling_policy['scaling-type'] != 'automatic':
                    continue
                scaling_policy_record = self._get_or_create_scaling_policy(
                    nsr_id, scaling_policy, scaling_group_record)
                for scaling_criteria in scaling_policy['scaling-criteria']:
                    scaling_criteria_record = self._get_or_create_scaling_criteria(
                        nsr_id, scaling_criteria, scaling_policy_record)
                    # Raises StopIteration when the criteria references an unknown monitoring param.
                    vnf_monitoring_param = next(
                        param for param in vnf_monitoring_params
                        if param['id'] == scaling_criteria['vnf-monitoring-param-ref']
                    )
                    vdurs = self._select_vdurs(vnfd, vnfr, vnf_monitoring_param)
                    if vdurs is None:
                        log.warning(
                            "Scaling criteria is referring to a vnf-monitoring-param that does not "
                            "contain a reference to a vdu or vnf metric.")
                        continue
                    for vdur in vdurs:
                        log.debug("Creating alarm for vdur %s ", vdur)
                        try:
                            ScalingAlarmRepository.get(ScalingAlarm.vdu_name == vdur['name'],
                                                       ScalingCriteria.name == scaling_criteria['name'],
                                                       ScalingPolicy.name == scaling_policy['name'],
                                                       ScalingGroup.nsr_id == nsr_id,
                                                       join_classes=[ScalingCriteria,
                                                                     ScalingPolicy,
                                                                     ScalingGroup])
                            log.debug("vdu %s already has an alarm configured", vdur['name'])
                            continue
                        except ScalingAlarm.DoesNotExist:
                            pass
                        for action, threshold_key, operation_key in self._ALARM_ACTIONS:
                            alarm = await self._create_scaling_alarm(
                                nsr_id, vnfr, vdur, vnf_monitoring_param, scaling_criteria,
                                scaling_criteria_record, action, threshold_key, operation_key)
                            alarms_created.append(alarm)

    @staticmethod
    def _get_or_create_scaling_group(nsr_id: str, vnfr: dict, scaling_group: dict):
        """Returns the DB record of a scaling group descriptor, creating it when it does not exist."""
        try:
            scaling_group_record = ScalingGroupRepository.get(
                ScalingGroup.nsr_id == nsr_id,
                ScalingGroup.vnf_member_index == vnfr['member-vnf-index-ref'],
                ScalingGroup.name == scaling_group['name']
            )
            log.debug("Found existing scaling group record in DB...")
        except ScalingGroup.DoesNotExist:
            log.debug("Creating scaling group record in DB...")
            scaling_group_record = ScalingGroupRepository.create(
                nsr_id=nsr_id,
                vnf_member_index=vnfr['member-vnf-index-ref'],
                name=scaling_group['name'],
                content=json.dumps(scaling_group)
            )
            log.debug(
                "Created scaling group record in DB : nsr_id=%s, vnf_member_index=%s, name=%s",
                scaling_group_record.nsr_id,
                scaling_group_record.vnf_member_index,
                scaling_group_record.name)
        return scaling_group_record

    @staticmethod
    def _get_or_create_scaling_policy(nsr_id: str, scaling_policy: dict, scaling_group_record):
        """Returns the DB record of a scaling policy, creating it (with its optional fields) when missing."""
        try:
            scaling_policy_record = ScalingPolicyRepository.get(
                ScalingPolicy.name == scaling_policy['name'],
                ScalingGroup.id == scaling_group_record.id,
                join_classes=[ScalingGroup]
            )
            log.debug("Found existing scaling policy record in DB...")
        except ScalingPolicy.DoesNotExist:
            log.debug("Creating scaling policy record in DB...")
            # NOTE(review): nsr_id is assumed to be passed here as for the scaling group - confirm
            # against ScalingPolicyRepository.create.
            scaling_policy_record = ScalingPolicyRepository.create(
                nsr_id=nsr_id,
                name=scaling_policy['name'],
                cooldown_time=scaling_policy['cooldown-time'],
                scaling_group=scaling_group_record,
            )
            # Optional descriptor fields override the model defaults only when present.
            if 'scale-in-operation-type' in scaling_policy:
                scaling_policy_record.scale_in_operation = scaling_policy['scale-in-operation-type']
            if 'scale-out-operation-type' in scaling_policy:
                scaling_policy_record.scale_out_operation = scaling_policy['scale-out-operation-type']
            if 'enabled' in scaling_policy:
                scaling_policy_record.enabled = scaling_policy['enabled']
            scaling_policy_record.save()
            log.debug("Created scaling policy record in DB : name=%s, scaling_group.name=%s",
                      scaling_policy_record.name,
                      scaling_policy_record.scaling_group.name)
        return scaling_policy_record

    @staticmethod
    def _get_or_create_scaling_criteria(nsr_id: str, scaling_criteria: dict, scaling_policy_record):
        """Returns the DB record of a scaling criteria, creating it when it does not exist."""
        try:
            scaling_criteria_record = ScalingCriteriaRepository.get(
                ScalingPolicy.id == scaling_policy_record.id,
                ScalingCriteria.name == scaling_criteria['name'],
                join_classes=[ScalingPolicy]
            )
            log.debug("Found existing scaling criteria record in DB...")
        except ScalingCriteria.DoesNotExist:
            log.debug("Creating scaling criteria record in DB...")
            # NOTE(review): nsr_id is assumed to be passed here as for the scaling group - confirm
            # against ScalingCriteriaRepository.create.
            scaling_criteria_record = ScalingCriteriaRepository.create(
                nsr_id=nsr_id,
                name=scaling_criteria['name'],
                scaling_policy=scaling_policy_record
            )
            log.debug(
                "Created scaling criteria record in DB : name=%s, scaling_policy.name=%s",
                scaling_criteria_record.name,
                scaling_criteria_record.scaling_policy.name)
        return scaling_criteria_record

    @staticmethod
    def _select_vdurs(vnfd: dict, vnfr: dict, vnf_monitoring_param: dict):
        """
        Picks the vdu records of a vnfr that a vnf monitoring param is measured on.
        :return: list of matching vdurs, or None when the param references neither a vdu nor a vnf metric
        """
        # NOTE(review): the 'vdu-ref' sub-key and the vnfr 'vdur' list are assumed from the OSM
        # information model - confirm against the descriptors handled by this module.
        if 'vdu-monitoring-param' in vnf_monitoring_param:
            vdu_id = vnf_monitoring_param['vdu-monitoring-param']['vdu-ref']
        elif 'vdu-metric' in vnf_monitoring_param:
            vdu_id = vnf_monitoring_param['vdu-metric']['vdu-ref']
        elif 'vnf-metric' in vnf_monitoring_param:
            # VNF-level metrics are collected on the management vdu.
            vdu_id = VnfdUtils.get_mgmt_vdu(vnfd)['id']
        else:
            return None
        return [vdur for vdur in vnfr['vdur'] if vdur['vdu-id-ref'] == vdu_id]

    async def _create_scaling_alarm(self, nsr_id, vnfr, vdur, vnf_monitoring_param, scaling_criteria,
                                    scaling_criteria_record, action, threshold_key, operation_key):
        """
        Creates one alarm in MON and stores the matching alarm record in DB.
        :param action: 'scale_in' or 'scale_out'; stored on the record and read back by evaluate_policy
        :param threshold_key: scaling-criteria key holding the alarm threshold
        :param operation_key: scaling-criteria key holding the relational operation
        :return: the created alarm record
        """
        # NOTE(review): the `ns_id` keyword is assumed - confirm against MonClient.create_alarm.
        alarm_uuid = await self.mon_client.create_alarm(
            metric_name=vnf_monitoring_param['id'],
            ns_id=nsr_id,
            vdu_name=vdur['name'],
            vnf_member_index=vnfr['member-vnf-index-ref'],
            threshold=scaling_criteria[threshold_key],
            operation=scaling_criteria[operation_key],
            statistic=vnf_monitoring_param['aggregation-type']
        )
        return ScalingAlarmRepository.create(
            alarm_uuid=alarm_uuid,
            action=action,
            vnf_member_index=vnfr['member-vnf-index-ref'],
            vdu_name=vdur['name'],
            scaling_criteria=scaling_criteria_record
        )

    async def _delete_mon_alarm(self, alarm):
        """Deletes in MON the alarm that backs the given alarm record."""
        # NOTE(review): the trailing vdu_name / alarm_uuid arguments are assumed - confirm against
        # MonClient.delete_alarm.
        await self.mon_client.delete_alarm(
            alarm.scaling_criteria.scaling_policy.scaling_group.nsr_id,
            alarm.vnf_member_index,
            alarm.vdu_name,
            alarm.alarm_uuid)

    async def _delete_alarm(self, alarm):
        """Deletes an alarm in MON (best effort) and then removes its DB record."""
        try:
            await self._delete_mon_alarm(alarm)
        except ValueError:
            # NOTE(review): ValueError is assumed to be what MonClient raises on a failed delete -
            # confirm. The DB record is removed regardless so no stale alarm is left behind.
            log.exception("Error deleting alarm in MON %s", alarm.alarm_uuid)
        alarm.delete_instance()

    async def delete_scaling_groups(self, nsr_id: str):
        """
        Deletes every scaling group of a network service together with its policies, criteria and alarms.
        :param nsr_id: Network service record id
        :raises Exception: any failure is logged, the DB transaction is rolled back and the error re-raised
        """
        log.debug("Deleting scaling groups for network service %s", nsr_id)
        database.db.connect()
        try:
            with database.db.atomic() as tx:
                try:
                    for scaling_group in ScalingGroupRepository.list(ScalingGroup.nsr_id == nsr_id):
                        for scaling_policy in scaling_group.scaling_policies:
                            for scaling_criteria in scaling_policy.scaling_criterias:
                                for alarm in scaling_criteria.scaling_alarms:
                                    await self._delete_alarm(alarm)
                                scaling_criteria.delete_instance()
                            scaling_policy.delete_instance()
                        scaling_group.delete_instance()
                except Exception:
                    log.exception("Error deleting scaling groups and alarms:")
                    tx.rollback()
                    raise
        finally:
            database.db.close()

    async def delete_orphaned_alarms(self, nsr_id):
        """
        Deletes the scaling alarms of a network service whose vdu record no longer exists.
        :param nsr_id: Network service record id
        :raises Exception: any failure is logged, the DB transaction is rolled back and the error re-raised
        """
        log.info("Deleting orphaned scaling alarms for network service %s", nsr_id)
        database.db.connect()
        try:
            with database.db.atomic() as tx:
                try:
                    for scaling_group in ScalingGroupRepository.list(ScalingGroup.nsr_id == nsr_id):
                        for scaling_policy in scaling_group.scaling_policies:
                            for scaling_criteria in scaling_policy.scaling_criterias:
                                for alarm in scaling_criteria.scaling_alarms:
                                    try:
                                        # The lookup is only used as an existence check.
                                        self.db_client.get_vdur(nsr_id, alarm.vnf_member_index, alarm.vdu_name)
                                    except VdurNotFound:
                                        log.debug("Deleting orphaned scaling alarm %s", alarm.alarm_uuid)
                                        await self._delete_alarm(alarm)
                except Exception:
                    log.exception("Error deleting orphaned alarms:")
                    tx.rollback()
                    raise
        finally:
            database.db.close()

    def get_nslcmop(self, nslcmop_id):
        """
        Fetches a network service LCM operation record through the common DB client.
        :param nslcmop_id: Id of the LCM operation
        :return: whatever the DB client returns for that id
        """
        return self.db_client.get_nslcmop(nslcmop_id)

    async def handle_alarm(self, alarm_uuid: str, status: str):
        """
        Processes an alarm notification: stores the new status, then evaluates the related policy.
        :param alarm_uuid: Id of the alarm that changed status
        :param status: New alarm status
        """
        await self.update_alarm_status(alarm_uuid, status)
        await self.evaluate_policy(alarm_uuid)

    async def update_alarm_status(self, alarm_uuid: str, status: str):
        """
        Persists the last known status of a scaling alarm. Unknown alarms are ignored.
        :param alarm_uuid: Id of the alarm
        :param status: New alarm status
        """
        database.db.connect()
        try:
            with database.db.atomic():
                alarm = ScalingAlarmRepository.get(ScalingAlarm.alarm_uuid == alarm_uuid)
                alarm.last_status = status
                # Without saving, evaluate_policy would keep reading the stale status from DB.
                alarm.save()
        except ScalingAlarm.DoesNotExist:
            log.debug("There is no autoscaling action configured for alarm %s.", alarm_uuid)
        finally:
            database.db.close()

    async def evaluate_policy(self, alarm_uuid):
        """
        Decides whether the policy behind an alarm must trigger a scaling action and, if so, requests it
        from LCM. Nothing happens when the alarm is unknown, the policy is disabled, the combined alarm
        statuses do not satisfy the policy operation, or the cooldown time has not elapsed.
        :param alarm_uuid: Id of the alarm that was updated
        :raises Exception: when the alarm record holds an action other than 'scale_in' or 'scale_out'
        """
        database.db.connect()
        try:
            with database.db.atomic():
                alarm = ScalingAlarmRepository.get(ScalingAlarm.alarm_uuid == alarm_uuid)
                vnf_member_index = alarm.vnf_member_index
                action = alarm.action
                scaling_policy = alarm.scaling_criteria.scaling_policy
                if not scaling_policy.enabled:
                    return
                if action == 'scale_in':
                    operation = scaling_policy.scale_in_operation
                elif action == 'scale_out':
                    operation = scaling_policy.scale_out_operation
                else:
                    raise Exception('Unknown alarm action {}'.format(alarm.action))
                alarms = ScalingAlarmRepository.list(ScalingAlarm.scaling_criteria == alarm.scaling_criteria,
                                                     ScalingAlarm.action == alarm.action,
                                                     ScalingAlarm.vnf_member_index == vnf_member_index,
                                                     ScalingAlarm.vdu_name == alarm.vdu_name)
                statuses = [related_alarm.last_status for related_alarm in alarms]
                # 'AND' needs every related alarm firing, 'OR' needs at least one.
                triggered = (operation == 'AND' and set(statuses) == {'alarm'}) or \
                            (operation == 'OR' and 'alarm' in statuses)
                if not triggered:
                    return
                delta = datetime.datetime.now() - scaling_policy.last_scale
                if delta.total_seconds() > scaling_policy.cooldown_time:
                    log.info("Sending %s action message for ns: %s",
                             action,
                             scaling_policy.scaling_group.nsr_id)
                    # NOTE(review): the trailing vnf_member_index / action arguments are assumed -
                    # confirm against LcmClient.scale.
                    await self.lcm_client.scale(scaling_policy.scaling_group.nsr_id,
                                                scaling_policy.scaling_group.name,
                                                vnf_member_index,
                                                action)
                    # The cooldown window starts once the scale request has been sent.
                    scaling_policy.last_scale = datetime.datetime.now()
                    scaling_policy.save()
        except ScalingAlarm.DoesNotExist:
            log.debug("There is no autoscaling action configured for alarm %s.", alarm_uuid)
        finally:
            database.db.close()