1 # -*- coding: utf-8 -*-
2 # pylint: disable=no-member
4 # Copyright 2018 Whitestack, LLC
5 # *************************************************************
7 # This file is part of OSM Monitoring module
8 # All Rights Reserved to Whitestack, LLC
10 # Licensed under the Apache License, Version 2.0 (the "License"); you may
11 # not use this file except in compliance with the License. You may obtain
12 # a copy of the License at
14 # http://www.apache.org/licenses/LICENSE-2.0
16 # Unless required by applicable law or agreed to in writing, software
17 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
18 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
19 # License for the specific language governing permissions and limitations
22 # For those usages not covered by the Apache License, Version 2.0 please
23 # contact: bdiaz@whitestack.com or glavado@whitestack.com
30 from osm_policy_module
.common
.common_db_client
import CommonDbClient
31 from osm_policy_module
.common
.lcm_client
import LcmClient
32 from osm_policy_module
.common
.mon_client
import MonClient
33 from osm_policy_module
.core
import database
34 from osm_policy_module
.core
.config
import Config
35 from osm_policy_module
.core
.database
import ScalingGroup
, ScalingAlarm
, ScalingPolicy
, ScalingCriteria
, \
36 ScalingAlarmRepository
, ScalingGroupRepository
, ScalingPolicyRepository
, ScalingCriteriaRepository
37 from osm_policy_module
.core
.exceptions
import VdurNotFound
39 log
= logging
.getLogger(__name__
)
42 class AutoscalingService
:
44 def __init__(self
, config
: Config
, loop
=None):
47 loop
= asyncio
.get_event_loop()
49 self
.db_client
= CommonDbClient(config
)
50 self
.mon_client
= MonClient(config
, loop
=self
.loop
)
51 self
.lcm_client
= LcmClient(config
, loop
=self
.loop
)
53 async def configure_scaling_groups(self
, nsr_id
: str):
55 Configures scaling groups for a network service. Creates records in DB. Creates alarms in MON.
56 :param nsr_id: Network service record id
59 log
.info("Configuring scaling groups for network service with nsr_id: %s",
64 with database
.db
.atomic() as tx
:
66 vnfrs
= self
.db_client
.get_vnfrs(nsr_id
)
68 log
.debug("Processing vnfr: %s", vnfr
)
69 vnfd
= self
.db_client
.get_vnfd(vnfr
['vnfd-id'])
70 # TODO: Change for multiple DF support
71 df
= vnfd
.get('df', [{}])[0]
72 if 'scaling-aspect' not in df
:
73 log
.debug("No scaling aspect present in vnfd")
75 # TODO: Change for multiple instantiation levels support
76 instantiation_level
= df
.get('instantiation-level', [{}])[0]
77 scaling_aspects
= df
['scaling-aspect']
78 all_vnfd_monitoring_params
= self
._get
_all
_vnfd
_monitoring
_params
(vnfd
)
79 for scaling_aspect
in scaling_aspects
:
80 scaling_group_record
= self
._get
_or
_create
_scaling
_group
(nsr_id
,
81 vnfr
['member-vnf-index-ref'],
83 vdurs
= self
._get
_monitored
_vdurs
(scaling_aspect
, vnfr
['vdur'])
84 for scaling_policy
in scaling_aspect
.get('scaling-policy', ()):
85 if scaling_policy
['scaling-type'] != 'automatic':
87 scaling_policy_record
= self
._get
_or
_create
_scaling
_policy
(nsr_id
,
91 for scaling_criteria
in scaling_policy
['scaling-criteria']:
92 scaling_criteria_record
= self
._get
_or
_create
_scaling
_criteria
(
97 monitoring_param_ref
= scaling_criteria
.get('vnf-monitoring-param-ref')
98 vnf_monitoring_param
= all_vnfd_monitoring_params
[monitoring_param_ref
]
101 vdu_id
= vdur
['vdu-id-ref']
102 log
.debug("Creating alarm for vdur %s ", vdur
)
104 ScalingAlarmRepository
.get(ScalingAlarm
.vdu_name
== vdur
['name'],
105 ScalingCriteria
.name
== scaling_criteria
['name'],
106 ScalingPolicy
.name
== scaling_policy
['name'],
107 ScalingGroup
.nsr_id
== nsr_id
,
108 join_classes
=[ScalingCriteria
,
111 log
.debug("vdu %s already has an alarm configured", vdur
['name'])
113 except ScalingAlarm
.DoesNotExist
:
115 metric_name
= self
._get
_metric
_name
(vnf_monitoring_param
)
117 db_nsr
= self
.db_client
.get_nsr(nsr_id
)
119 if db_nsr
["_admin"].get("scaling-group"):
120 db_nsr_admin
= db_nsr
["_admin"]["scaling-group"]
121 for admin_scale_index
, admin_scale_info
in enumerate(db_nsr_admin
):
122 if admin_scale_info
["name"] == scaling_aspect
["name"]:
123 nb_scale_op
= admin_scale_info
.get("nb-scale-op", 0)
125 for vdu_level
in instantiation_level
.get('vdu-level', ()):
126 if vdu_level
.get('vdu-id') == vdu_id
:
127 min_instance_count
= int(vdu_level
['number-of-instances'])
128 if nb_scale_op
> min_instance_count
:
129 alarm_uuid
= await self
.mon_client
.create_alarm(
130 metric_name
=metric_name
,
132 vdu_name
=vdur
['name'],
133 vnf_member_index
=vnfr
['member-vnf-index-ref'],
134 threshold
=scaling_criteria
['scale-in-threshold'],
135 operation
=scaling_criteria
['scale-in-relational-operation']
137 alarm
= ScalingAlarmRepository
.create(
138 alarm_uuid
=alarm_uuid
,
140 vnf_member_index
=vnfr
['member-vnf-index-ref'],
141 vdu_name
=vdur
['name'],
142 scaling_criteria
=scaling_criteria_record
144 alarms_created
.append(alarm
)
145 alarm_uuid
= await self
.mon_client
.create_alarm(
146 metric_name
=metric_name
,
148 vdu_name
=vdur
['name'],
149 vnf_member_index
=vnfr
['member-vnf-index-ref'],
150 threshold
=scaling_criteria
['scale-out-threshold'],
151 operation
=scaling_criteria
['scale-out-relational-operation']
153 alarm
= ScalingAlarmRepository
.create(
154 alarm_uuid
=alarm_uuid
,
156 vnf_member_index
=vnfr
['member-vnf-index-ref'],
157 vdu_name
=vdur
['name'],
158 scaling_criteria
=scaling_criteria_record
160 alarms_created
.append(alarm
)
162 except Exception as e
:
163 log
.exception("Error configuring scaling groups:")
165 if len(alarms_created
) > 0:
166 log
.info("Cleaning alarm resources in MON")
167 for alarm
in alarms_created
:
168 await self
.mon_client
.delete_alarm(
169 alarm
.scaling_criteria
.scaling_policy
.scaling_group
.nsr_id
,
170 alarm
.vnf_member_index
,
177 async def delete_scaling_groups(self
, nsr_id
: str):
178 log
.debug("Deleting scaling groups for network service %s", nsr_id
)
179 database
.db
.connect()
181 with database
.db
.atomic() as tx
:
183 for scaling_group
in ScalingGroupRepository
.list(ScalingGroup
.nsr_id
== nsr_id
):
184 for scaling_policy
in scaling_group
.scaling_policies
:
185 for scaling_criteria
in scaling_policy
.scaling_criterias
:
186 for alarm
in scaling_criteria
.scaling_alarms
:
188 await self
.mon_client
.delete_alarm(
189 alarm
.scaling_criteria
.scaling_policy
.scaling_group
.nsr_id
,
190 alarm
.vnf_member_index
,
194 log
.exception("Error deleting alarm in MON %s", alarm
.alarm_uuid
)
195 alarm
.delete_instance()
196 scaling_criteria
.delete_instance()
197 scaling_policy
.delete_instance()
198 scaling_group
.delete_instance()
200 except Exception as e
:
201 log
.exception("Error deleting scaling groups and alarms:")
207 async def delete_orphaned_alarms(self
, nsr_id
):
208 log
.info("Deleting orphaned scaling alarms for network service %s", nsr_id
)
209 database
.db
.connect()
211 with database
.db
.atomic() as tx
:
213 for scaling_group
in ScalingGroupRepository
.list(ScalingGroup
.nsr_id
== nsr_id
):
214 for scaling_policy
in scaling_group
.scaling_policies
:
215 for scaling_criteria
in scaling_policy
.scaling_criterias
:
216 for alarm
in scaling_criteria
.scaling_alarms
:
218 self
.db_client
.get_vdur(nsr_id
, alarm
.vnf_member_index
, alarm
.vdu_name
)
220 log
.debug("Deleting orphaned scaling alarm %s", alarm
.alarm_uuid
)
222 await self
.mon_client
.delete_alarm(
223 alarm
.scaling_criteria
.scaling_policy
.scaling_group
.nsr_id
,
224 alarm
.vnf_member_index
,
228 log
.exception("Error deleting alarm in MON %s", alarm
.alarm_uuid
)
229 alarm
.delete_instance()
231 except Exception as e
:
232 log
.exception("Error deleting orphaned alarms:")
238 async def handle_alarm(self
, alarm_uuid
: str, status
: str):
239 await self
.update_alarm_status(alarm_uuid
, status
)
240 await self
.evaluate_policy(alarm_uuid
)
242 async def update_alarm_status(self
, alarm_uuid
: str, status
: str):
243 database
.db
.connect()
245 with database
.db
.atomic():
246 alarm
= ScalingAlarmRepository
.get(ScalingAlarm
.alarm_uuid
== alarm_uuid
)
247 alarm
.last_status
= status
249 except ScalingAlarm
.DoesNotExist
:
250 log
.debug("There is no autoscaling action configured for alarm %s.", alarm_uuid
)
254 async def evaluate_policy(self
, alarm_uuid
):
255 database
.db
.connect()
257 with database
.db
.atomic():
258 alarm
= ScalingAlarmRepository
.get(ScalingAlarm
.alarm_uuid
== alarm_uuid
)
259 vnf_member_index
= alarm
.vnf_member_index
260 action
= alarm
.action
261 scaling_policy
= alarm
.scaling_criteria
.scaling_policy
262 if not scaling_policy
.enabled
:
264 if action
== 'scale_in':
265 operation
= scaling_policy
.scale_in_operation
266 elif action
== 'scale_out':
267 operation
= scaling_policy
.scale_out_operation
269 raise Exception('Unknown alarm action {}'.format(alarm
.action
))
270 alarms
= ScalingAlarmRepository
.list(ScalingAlarm
.scaling_criteria
== alarm
.scaling_criteria
,
271 ScalingAlarm
.action
== alarm
.action
,
272 ScalingAlarm
.vnf_member_index
== vnf_member_index
,
273 ScalingAlarm
.vdu_name
== alarm
.vdu_name
)
276 statuses
.append(alarm
.last_status
)
277 if (operation
== 'AND' and set(statuses
) == {'alarm'}) or (operation
== 'OR' and 'alarm' in statuses
):
278 delta
= datetime
.datetime
.now() - scaling_policy
.last_scale
279 if delta
.total_seconds() > scaling_policy
.cooldown_time
:
280 log
.info("Sending %s action message for ns: %s",
282 scaling_policy
.scaling_group
.nsr_id
)
283 await self
.lcm_client
.scale(scaling_policy
.scaling_group
.nsr_id
,
284 scaling_policy
.scaling_group
.name
,
287 scaling_policy
.last_scale
= datetime
.datetime
.now()
288 scaling_policy
.save()
290 except ScalingAlarm
.DoesNotExist
:
291 log
.debug("There is no autoscaling action configured for alarm %s.", alarm_uuid
)
295 def _get_all_vnfd_monitoring_params(self
, vnfd
):
296 all_monitoring_params
= {}
297 for ivld
in vnfd
.get("int-virtual-link-desc", ()):
298 for mp
in ivld
.get("monitoring-parameters", ()):
299 all_monitoring_params
[mp
.get("id")] = mp
301 for vdu
in vnfd
.get("vdu", ()):
302 for mp
in vdu
.get("monitoring-parameter", ()):
303 all_monitoring_params
[mp
.get("id")] = mp
305 for df
in vnfd
.get("df", ()):
306 for mp
in df
.get("monitoring-parameter", ()):
307 all_monitoring_params
[mp
.get("id")] = mp
309 return all_monitoring_params
311 def _get_or_create_scaling_group(self
, nsr_id
: str, vnf_member_index
: str, scaling_aspect
: dict):
313 scaling_group_record
= ScalingGroupRepository
.get(
314 ScalingGroup
.nsr_id
== nsr_id
,
315 ScalingGroup
.vnf_member_index
== vnf_member_index
,
316 ScalingGroup
.name
== scaling_aspect
['name']
318 log
.debug("Found existing scaling group record in DB...")
319 except ScalingGroup
.DoesNotExist
:
320 log
.debug("Creating scaling group record in DB...")
321 scaling_group_record
= ScalingGroupRepository
.create(
323 vnf_member_index
=vnf_member_index
,
324 name
=scaling_aspect
['name'],
325 content
=json
.dumps(scaling_aspect
)
328 "Created scaling group record in DB : nsr_id=%s, vnf_member_index=%s, name=%s",
329 scaling_group_record
.nsr_id
,
330 scaling_group_record
.vnf_member_index
,
331 scaling_group_record
.name
)
332 return scaling_group_record
334 def _get_or_create_scaling_policy(self
, nsr_id
: str, scaling_policy
: dict, scaling_group_record
: ScalingGroup
):
336 scaling_policy_record
= ScalingPolicyRepository
.get(
337 ScalingPolicy
.name
== scaling_policy
['name'],
338 ScalingGroup
.id == scaling_group_record
.id,
339 join_classes
=[ScalingGroup
]
341 log
.debug("Found existing scaling policy record in DB...")
342 except ScalingPolicy
.DoesNotExist
:
343 log
.debug("Creating scaling policy record in DB...")
344 scaling_policy_record
= ScalingPolicyRepository
.create(
346 name
=scaling_policy
['name'],
347 cooldown_time
=scaling_policy
['cooldown-time'],
348 scaling_group
=scaling_group_record
,
350 if 'scale-in-operation-type' in scaling_policy
:
351 scaling_policy_record
.scale_in_operation
= scaling_policy
['scale-in-operation-type']
352 if 'scale-out-operation-type' in scaling_policy
:
353 scaling_policy_record
.scale_out_operation
= scaling_policy
['scale-out-operation-type']
354 if 'enabled' in scaling_policy
:
355 scaling_policy_record
.enabled
= scaling_policy
['enabled']
356 scaling_policy_record
.save()
357 log
.debug("Created scaling policy record in DB : name=%s, scaling_group.name=%s",
358 scaling_policy_record
.name
,
359 scaling_policy_record
.scaling_group
.name
)
360 return scaling_policy_record
362 def _get_or_create_scaling_criteria(self
, nsr_id
: str, scaling_criteria
: dict,
363 scaling_policy_record
: ScalingPolicy
):
365 scaling_criteria_record
= ScalingCriteriaRepository
.get(
366 ScalingPolicy
.id == scaling_policy_record
.id,
367 ScalingCriteria
.name
== scaling_criteria
['name'],
368 join_classes
=[ScalingPolicy
]
370 log
.debug("Found existing scaling criteria record in DB...")
371 except ScalingCriteria
.DoesNotExist
:
372 log
.debug("Creating scaling criteria record in DB...")
373 scaling_criteria_record
= ScalingCriteriaRepository
.create(
375 name
=scaling_criteria
['name'],
376 scaling_policy
=scaling_policy_record
379 "Created scaling criteria record in DB : name=%s, scaling_policy.name=%s",
380 scaling_criteria_record
.name
,
381 scaling_criteria_record
.scaling_policy
.name
)
382 return scaling_criteria_record
384 def _get_monitored_vdurs(self
, scaling_aspect
: dict, vdurs
):
385 all_monitored_vdus
= set()
386 for delta
in scaling_aspect
.get('aspect-delta-details', {}).get('deltas', ()):
387 for vdu_delta
in delta
.get('vdu-delta', ()):
388 all_monitored_vdus
.add(vdu_delta
.get('id'))
390 monitored_vdurs
= list(filter(lambda vdur
: vdur
['vdu-id-ref'] in all_monitored_vdus
, vdurs
))
392 if not monitored_vdurs
:
394 "Scaling criteria is referring to a vnf-monitoring-param that does not "
395 "contain a reference to a vdu or vnf metric.")
396 return monitored_vdurs
398 def _get_metric_name(self
, vnf_monitoring_param
: dict):
399 if 'performance-metric' in vnf_monitoring_param
:
400 return vnf_monitoring_param
['performance-metric']
401 raise ValueError('No metric name found for vnf_monitoring_param %s' % vnf_monitoring_param
['id'])