1 # -*- coding: utf-8 -*-
2 # pylint: disable=no-member
4 # Copyright 2018 Whitestack, LLC
5 # *************************************************************
7 # This file is part of OSM Monitoring module
8 # All Rights Reserved to Whitestack, LLC
10 # Licensed under the Apache License, Version 2.0 (the "License"); you may
11 # not use this file except in compliance with the License. You may obtain
12 # a copy of the License at
14 # http://www.apache.org/licenses/LICENSE-2.0
16 # Unless required by applicable law or agreed to in writing, software
17 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
18 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
19 # License for the specific language governing permissions and limitations
22 # For those usages not covered by the Apache License, Version 2.0 please
23 # contact: bdiaz@whitestack.com or glavado@whitestack.com
32 from osm_policy_module
.common
.common_db_client
import CommonDbClient
33 from osm_policy_module
.common
.lcm_client
import LcmClient
34 from osm_policy_module
.common
.mon_client
import MonClient
35 from osm_policy_module
.core
import database
36 from osm_policy_module
.core
.config
import Config
37 from osm_policy_module
.core
.database
import (
42 ScalingAlarmRepository
,
43 ScalingGroupRepository
,
44 ScalingPolicyRepository
,
45 ScalingCriteriaRepository
,
47 from osm_policy_module
.core
.exceptions
import VdurNotFound
49 log
= logging
.getLogger(__name__
)
52 class AutoscalingService
:
53 def __init__(self
, config
: Config
, loop
=None):
56 loop
= asyncio
.get_event_loop()
58 self
.db_client
= CommonDbClient(config
)
59 self
.mon_client
= MonClient(config
, loop
=self
.loop
)
60 self
.lcm_client
= LcmClient(config
, loop
=self
.loop
)
62 async def configure_scaling_groups(self
, nsr_id
: str, vnf_member_index
=None):
64 Configures scaling groups for a network service. Creates records in DB. Creates alarms in MON.
65 :param nsr_id: Network service record id
69 "Configuring scaling groups for network service with nsr_id: %s", nsr_id
74 with database
.db
.atomic() as tx
:
76 if vnf_member_index
is None:
77 vnfrs
= self
.db_client
.get_vnfrs(nsr_id
)
80 vnfr
= self
.db_client
.get_vnfr(nsr_id
, vnf_member_index
)
82 # vnfrs = self.db_client.get_vnfrs(nsr_id)
84 log
.debug("Processing vnfr: %s", vnfr
)
85 vnfd
= self
.db_client
.get_vnfd(vnfr
["vnfd-id"])
86 # TODO: Change for multiple DF support
87 df
= vnfd
.get("df", [{}])[0]
88 if "scaling-aspect" not in df
:
89 log
.debug("No scaling aspect present in vnfd")
91 # TODO: Change for multiple instantiation levels support
92 scaling_aspects
= df
["scaling-aspect"]
93 all_vnfd_monitoring_params
= (
94 self
._get
_all
_vnfd
_monitoring
_params
(vnfd
)
96 for scaling_aspect
in scaling_aspects
:
97 scaling_group_record
= self
._get
_or
_create
_scaling
_group
(
98 nsr_id
, vnfr
["member-vnf-index-ref"], scaling_aspect
100 vdurs
= self
._get
_monitored
_vdurs
(
101 scaling_aspect
, vnfr
["vdur"]
103 for scaling_policy
in scaling_aspect
.get(
106 if scaling_policy
["scaling-type"] != "automatic":
108 scaling_policy_record
= (
109 self
._get
_or
_create
_scaling
_policy
(
110 nsr_id
, scaling_policy
, scaling_group_record
114 for scaling_criteria
in scaling_policy
[
117 scaling_criteria_record
= (
118 self
._get
_or
_create
_scaling
_criteria
(
121 scaling_policy_record
,
124 monitoring_param_ref
= scaling_criteria
.get(
125 "vnf-monitoring-param-ref"
127 vnf_monitoring_param
= all_vnfd_monitoring_params
[
132 vdu_id
= vdur
["vdu-id-ref"]
133 log
.debug("Creating alarm for vdur %s ", vdur
)
135 ScalingAlarmRepository
.get(
136 ScalingAlarm
.vdu_name
== vdur
["name"],
138 == scaling_criteria
["name"],
140 == scaling_policy
["name"],
141 ScalingGroup
.nsr_id
== nsr_id
,
149 "vdu %s already has an alarm configured",
153 except ScalingAlarm
.DoesNotExist
:
155 metric_name
= self
._get
_metric
_name
(
159 db_nsr
= self
.db_client
.get_nsr(nsr_id
)
161 if db_nsr
["_admin"].get("scaling-group"):
162 db_nsr_admin
= db_nsr
["_admin"][
168 ) in enumerate(db_nsr_admin
):
170 admin_scale_info
["name"]
171 == scaling_aspect
["name"]
173 nb_scale_op
= admin_scale_info
.get(
177 min_instance_count
= 1
178 for vdu_profile
in df
.get("vdu-profile", ()):
179 if vdu_profile
.get("id") == vdu_id
:
180 min_instance_count
= int(
182 "min-number-of-instances ", 1
186 if nb_scale_op
>= min_instance_count
:
188 await self
.mon_client
.create_alarm(
189 metric_name
=metric_name
,
191 vdu_name
=vdur
["name"],
192 vnf_member_index
=vnfr
[
193 "member-vnf-index-ref"
195 threshold
=scaling_criteria
[
198 operation
=scaling_criteria
[
199 "scale-in-relational-operation"
204 alarm
= ScalingAlarmRepository
.create(
205 alarm_uuid
=alarm_uuid
,
207 vnf_member_index
=vnfr
[
208 "member-vnf-index-ref"
210 vdu_name
=vdur
["name"],
211 scaling_criteria
=scaling_criteria_record
,
213 alarms_created
.append(alarm
)
214 alarm_uuid
= await self
.mon_client
.create_alarm(
215 metric_name
=metric_name
,
217 vdu_name
=vdur
["name"],
218 vnf_member_index
=vnfr
[
219 "member-vnf-index-ref"
221 threshold
=scaling_criteria
[
222 "scale-out-threshold"
224 operation
=scaling_criteria
[
225 "scale-out-relational-operation"
229 alarm
= ScalingAlarmRepository
.create(
230 alarm_uuid
=alarm_uuid
,
232 vnf_member_index
=vnfr
[
233 "member-vnf-index-ref"
235 vdu_name
=vdur
["name"],
236 scaling_criteria
=scaling_criteria_record
,
238 alarms_created
.append(alarm
)
240 except Exception as e
:
241 log
.exception("Error configuring scaling groups:")
243 if len(alarms_created
) > 0:
244 log
.info("Cleaning alarm resources in MON")
245 for alarm
in alarms_created
:
246 await self
.mon_client
.delete_alarm(
247 alarm
.scaling_criteria
.scaling_policy
.scaling_group
.nsr_id
,
248 alarm
.vnf_member_index
,
256 async def delete_scaling_groups(self
, nsr_id
: str, vnf_member_index
=None):
257 log
.debug("Deleting scaling groups for network service %s", nsr_id
)
258 database
.db
.connect()
260 with database
.db
.atomic() as tx
:
262 if vnf_member_index
is None:
263 scale_conditions
= ScalingGroup
.nsr_id
== nsr_id
265 query_list
= [ScalingGroup
.nsr_id
== nsr_id
,
266 ScalingGroup
.vnf_member_index
== vnf_member_index
]
267 scale_conditions
= functools
.reduce(operator
.and_
, query_list
)
268 for scaling_group
in ScalingGroupRepository
.list(scale_conditions
):
269 for scaling_policy
in scaling_group
.scaling_policies
:
270 for scaling_criteria
in scaling_policy
.scaling_criterias
:
271 for alarm
in scaling_criteria
.scaling_alarms
:
273 await self
.mon_client
.delete_alarm(
274 alarm
.scaling_criteria
.scaling_policy
.scaling_group
.nsr_id
,
275 alarm
.vnf_member_index
,
281 "Error deleting alarm in MON %s",
284 alarm
.delete_instance()
285 scaling_criteria
.delete_instance()
286 scaling_policy
.delete_instance()
287 scaling_group
.delete_instance()
289 except Exception as e
:
290 log
.exception("Error deleting scaling groups and alarms:")
296 async def delete_orphaned_alarms(self
, nsr_id
):
297 log
.info("Deleting orphaned scaling alarms for network service %s", nsr_id
)
298 database
.db
.connect()
300 with database
.db
.atomic() as tx
:
302 for scaling_group
in ScalingGroupRepository
.list(
303 ScalingGroup
.nsr_id
== nsr_id
305 for scaling_policy
in scaling_group
.scaling_policies
:
306 for scaling_criteria
in scaling_policy
.scaling_criterias
:
307 for alarm
in scaling_criteria
.scaling_alarms
:
309 self
.db_client
.get_vdur(
311 alarm
.vnf_member_index
,
316 "Deleting orphaned scaling alarm %s",
320 await self
.mon_client
.delete_alarm(
321 alarm
.scaling_criteria
.scaling_policy
.scaling_group
.nsr_id
,
322 alarm
.vnf_member_index
,
328 "Error deleting alarm in MON %s",
331 alarm
.delete_instance()
333 except Exception as e
:
334 log
.exception("Error deleting orphaned alarms:")
340 async def handle_alarm(self
, alarm_uuid
: str, status
: str):
341 await self
.update_alarm_status(alarm_uuid
, status
)
342 await self
.evaluate_policy(alarm_uuid
)
344 async def update_alarm_status(self
, alarm_uuid
: str, status
: str):
345 database
.db
.connect()
347 with database
.db
.atomic():
348 alarm
= ScalingAlarmRepository
.get(
349 ScalingAlarm
.alarm_uuid
== alarm_uuid
351 alarm
.last_status
= status
353 except ScalingAlarm
.DoesNotExist
:
355 "There is no autoscaling action configured for alarm %s.", alarm_uuid
360 async def evaluate_policy(self
, alarm_uuid
):
361 database
.db
.connect()
363 with database
.db
.atomic():
364 alarm
= ScalingAlarmRepository
.get(
365 ScalingAlarm
.alarm_uuid
== alarm_uuid
367 vnf_member_index
= alarm
.vnf_member_index
368 action
= alarm
.action
369 scaling_policy
= alarm
.scaling_criteria
.scaling_policy
370 if not scaling_policy
.enabled
:
372 if action
== "scale_in":
373 operation
= scaling_policy
.scale_in_operation
374 elif action
== "scale_out":
375 operation
= scaling_policy
.scale_out_operation
377 raise Exception("Unknown alarm action {}".format(alarm
.action
))
378 alarms
= ScalingAlarmRepository
.list(
379 ScalingAlarm
.scaling_criteria
== alarm
.scaling_criteria
,
380 ScalingAlarm
.action
== alarm
.action
,
381 ScalingAlarm
.vnf_member_index
== vnf_member_index
,
382 ScalingAlarm
.vdu_name
== alarm
.vdu_name
,
386 statuses
.append(alarm
.last_status
)
387 if (operation
== "AND" and set(statuses
) == {"alarm"}) or (
388 operation
== "OR" and "alarm" in statuses
390 delta
= datetime
.datetime
.now() - scaling_policy
.last_scale
391 if delta
.total_seconds() > scaling_policy
.cooldown_time
:
393 "Sending %s action message for ns: %s",
395 scaling_policy
.scaling_group
.nsr_id
,
397 await self
.lcm_client
.scale(
398 scaling_policy
.scaling_group
.nsr_id
,
399 scaling_policy
.scaling_group
.name
,
403 scaling_policy
.last_scale
= datetime
.datetime
.now()
404 scaling_policy
.save()
406 except ScalingAlarm
.DoesNotExist
:
408 "There is no autoscaling action configured for alarm %s.", alarm_uuid
413 def _get_all_vnfd_monitoring_params(self
, vnfd
):
414 all_monitoring_params
= {}
415 for ivld
in vnfd
.get("int-virtual-link-desc", ()):
416 for mp
in ivld
.get("monitoring-parameters", ()):
417 all_monitoring_params
[mp
.get("id")] = mp
419 for vdu
in vnfd
.get("vdu", ()):
420 for mp
in vdu
.get("monitoring-parameter", ()):
421 all_monitoring_params
[mp
.get("id")] = mp
423 for df
in vnfd
.get("df", ()):
424 for mp
in df
.get("monitoring-parameter", ()):
425 all_monitoring_params
[mp
.get("id")] = mp
427 return all_monitoring_params
429 def _get_or_create_scaling_group(
430 self
, nsr_id
: str, vnf_member_index
: str, scaling_aspect
: dict
433 scaling_group_record
= ScalingGroupRepository
.get(
434 ScalingGroup
.nsr_id
== nsr_id
,
435 ScalingGroup
.vnf_member_index
== vnf_member_index
,
436 ScalingGroup
.name
== scaling_aspect
["name"],
438 log
.debug("Found existing scaling group record in DB...")
439 except ScalingGroup
.DoesNotExist
:
440 log
.debug("Creating scaling group record in DB...")
441 scaling_group_record
= ScalingGroupRepository
.create(
443 vnf_member_index
=vnf_member_index
,
444 name
=scaling_aspect
["name"],
445 content
=json
.dumps(scaling_aspect
),
448 "Created scaling group record in DB : nsr_id=%s, vnf_member_index=%s, name=%s",
449 scaling_group_record
.nsr_id
,
450 scaling_group_record
.vnf_member_index
,
451 scaling_group_record
.name
,
453 return scaling_group_record
455 def _get_or_create_scaling_policy(
456 self
, nsr_id
: str, scaling_policy
: dict, scaling_group_record
: ScalingGroup
459 scaling_policy_record
= ScalingPolicyRepository
.get(
460 ScalingPolicy
.name
== scaling_policy
["name"],
461 ScalingGroup
.id == scaling_group_record
.id,
462 join_classes
=[ScalingGroup
],
464 log
.debug("Found existing scaling policy record in DB...")
465 except ScalingPolicy
.DoesNotExist
:
466 log
.debug("Creating scaling policy record in DB...")
467 scaling_policy_record
= ScalingPolicyRepository
.create(
469 name
=scaling_policy
["name"],
470 cooldown_time
=scaling_policy
["cooldown-time"],
471 scaling_group
=scaling_group_record
,
473 if "scale-in-operation-type" in scaling_policy
:
474 scaling_policy_record
.scale_in_operation
= scaling_policy
[
475 "scale-in-operation-type"
477 if "scale-out-operation-type" in scaling_policy
:
478 scaling_policy_record
.scale_out_operation
= scaling_policy
[
479 "scale-out-operation-type"
481 if "enabled" in scaling_policy
:
482 scaling_policy_record
.enabled
= scaling_policy
["enabled"]
483 scaling_policy_record
.save()
485 "Created scaling policy record in DB : name=%s, scaling_group.name=%s",
486 scaling_policy_record
.name
,
487 scaling_policy_record
.scaling_group
.name
,
489 return scaling_policy_record
491 def _get_or_create_scaling_criteria(
492 self
, nsr_id
: str, scaling_criteria
: dict, scaling_policy_record
: ScalingPolicy
495 scaling_criteria_record
= ScalingCriteriaRepository
.get(
496 ScalingPolicy
.id == scaling_policy_record
.id,
497 ScalingCriteria
.name
== scaling_criteria
["name"],
498 join_classes
=[ScalingPolicy
],
500 log
.debug("Found existing scaling criteria record in DB...")
501 except ScalingCriteria
.DoesNotExist
:
502 log
.debug("Creating scaling criteria record in DB...")
503 scaling_criteria_record
= ScalingCriteriaRepository
.create(
505 name
=scaling_criteria
["name"],
506 scaling_policy
=scaling_policy_record
,
509 "Created scaling criteria record in DB : name=%s, scaling_policy.name=%s",
510 scaling_criteria_record
.name
,
511 scaling_criteria_record
.scaling_policy
.name
,
513 return scaling_criteria_record
515 def _get_monitored_vdurs(self
, scaling_aspect
: dict, vdurs
):
516 all_monitored_vdus
= set()
517 for delta
in scaling_aspect
.get("aspect-delta-details", {}).get("deltas", ()):
518 for vdu_delta
in delta
.get("vdu-delta", ()):
519 all_monitored_vdus
.add(vdu_delta
.get("id"))
521 monitored_vdurs
= list(
522 filter(lambda vdur
: vdur
["vdu-id-ref"] in all_monitored_vdus
, vdurs
)
525 if not monitored_vdurs
:
527 "Scaling criteria is referring to a vnf-monitoring-param that does not "
528 "contain a reference to a vdu or vnf metric."
530 return monitored_vdurs
532 def _get_metric_name(self
, vnf_monitoring_param
: dict):
533 if "performance-metric" in vnf_monitoring_param
:
534 return vnf_monitoring_param
["performance-metric"]
536 "No metric name found for vnf_monitoring_param %s"
537 % vnf_monitoring_param
["id"]