# For those usages not covered by the Apache License, Version 2.0 please
# contact: bdiaz@whitestack.com or glavado@whitestack.com
##
-import asyncio
import datetime
import json
import logging
+import operator
+import functools
from osm_policy_module.common.common_db_client import CommonDbClient
from osm_policy_module.common.lcm_client import LcmClient
class AutoscalingService:
- def __init__(self, config: Config, loop=None):
+ def __init__(self, config: Config):
self.conf = config
- if not loop:
- loop = asyncio.get_event_loop()
- self.loop = loop
self.db_client = CommonDbClient(config)
- self.mon_client = MonClient(config, loop=self.loop)
- self.lcm_client = LcmClient(config, loop=self.loop)
+ self.mon_client = MonClient(config)
+ self.lcm_client = LcmClient(config)
- async def configure_scaling_groups(self, nsr_id: str):
+ async def configure_scaling_groups(self, nsr_id: str, vnf_member_index=None):
"""
Configures scaling groups for a network service. Creates records in DB. Creates alarms in MON.
:param nsr_id: Network service record id
try:
with database.db.atomic() as tx:
try:
- vnfrs = self.db_client.get_vnfrs(nsr_id)
+ if vnf_member_index is None:
+ vnfrs = self.db_client.get_vnfrs(nsr_id)
+ else:
+ vnfrs = []
+ vnfr = self.db_client.get_vnfr(nsr_id, vnf_member_index)
+ vnfrs.append(vnfr)
+ # vnfrs = self.db_client.get_vnfrs(nsr_id)
for vnfr in vnfrs:
log.debug("Processing vnfr: %s", vnfr)
vnfd = self.db_client.get_vnfd(vnfr["vnfd-id"])
operation=scaling_criteria[
"scale-in-relational-operation"
],
+ action="scale_in",
)
)
alarm = ScalingAlarmRepository.create(
operation=scaling_criteria[
"scale-out-relational-operation"
],
+ action="scale_out",
)
alarm = ScalingAlarmRepository.create(
alarm_uuid=alarm_uuid,
finally:
database.db.close()
- async def delete_scaling_groups(self, nsr_id: str):
+ async def delete_scaling_groups(self, nsr_id: str, vnf_member_index=None):
log.debug("Deleting scaling groups for network service %s", nsr_id)
database.db.connect()
try:
with database.db.atomic() as tx:
try:
- for scaling_group in ScalingGroupRepository.list(
- ScalingGroup.nsr_id == nsr_id
- ):
+ if vnf_member_index is None:
+ scale_conditions = ScalingGroup.nsr_id == nsr_id
+ else:
+ query_list = [
+ ScalingGroup.nsr_id == nsr_id,
+ ScalingGroup.vnf_member_index == vnf_member_index,
+ ]
+ scale_conditions = functools.reduce(operator.and_, query_list)
+ for scaling_group in ScalingGroupRepository.list(scale_conditions):
for scaling_policy in scaling_group.scaling_policies:
for scaling_criteria in scaling_policy.scaling_criterias:
for alarm in scaling_criteria.scaling_alarms:
async def evaluate_policy(self, alarm_uuid):
database.db.connect()
try:
- with database.db.atomic():
- alarm = ScalingAlarmRepository.get(
- ScalingAlarm.alarm_uuid == alarm_uuid
- )
- vnf_member_index = alarm.vnf_member_index
- action = alarm.action
- scaling_policy = alarm.scaling_criteria.scaling_policy
- if not scaling_policy.enabled:
- return
- if action == "scale_in":
- operation = scaling_policy.scale_in_operation
- elif action == "scale_out":
- operation = scaling_policy.scale_out_operation
- else:
- raise Exception("Unknown alarm action {}".format(alarm.action))
- alarms = ScalingAlarmRepository.list(
- ScalingAlarm.scaling_criteria == alarm.scaling_criteria,
- ScalingAlarm.action == alarm.action,
- ScalingAlarm.vnf_member_index == vnf_member_index,
- ScalingAlarm.vdu_name == alarm.vdu_name,
- )
- statuses = []
- for alarm in alarms:
- statuses.append(alarm.last_status)
- if (operation == "AND" and set(statuses) == {"alarm"}) or (
- operation == "OR" and "alarm" in statuses
- ):
- delta = datetime.datetime.now() - scaling_policy.last_scale
- if delta.total_seconds() > scaling_policy.cooldown_time:
- log.info(
- "Sending %s action message for ns: %s",
- alarm.action,
- scaling_policy.scaling_group.nsr_id,
- )
- await self.lcm_client.scale(
- scaling_policy.scaling_group.nsr_id,
- scaling_policy.scaling_group.name,
- vnf_member_index,
- action,
- )
- scaling_policy.last_scale = datetime.datetime.now()
- scaling_policy.save()
+ if self.conf.get("autoscale", "enabled") == "True":
+ with database.db.atomic():
+ alarm = ScalingAlarmRepository.get(
+ ScalingAlarm.alarm_uuid == alarm_uuid
+ )
+ vnf_member_index = alarm.vnf_member_index
+ action = alarm.action
+ scaling_policy = alarm.scaling_criteria.scaling_policy
+ if not scaling_policy.enabled:
+ return
+ if action == "scale_in":
+ operation = scaling_policy.scale_in_operation
+ elif action == "scale_out":
+ operation = scaling_policy.scale_out_operation
+ else:
+ raise Exception("Unknown alarm action {}".format(alarm.action))
+ alarms = ScalingAlarmRepository.list(
+ ScalingAlarm.scaling_criteria == alarm.scaling_criteria,
+ ScalingAlarm.action == alarm.action,
+ ScalingAlarm.vnf_member_index == vnf_member_index,
+ ScalingAlarm.vdu_name == alarm.vdu_name,
+ )
+ statuses = []
+ for alarm in alarms:
+ statuses.append(alarm.last_status)
+ if (operation == "AND" and set(statuses) == {"alarm"}) or (
+ operation == "OR" and "alarm" in statuses
+ ):
+ delta = datetime.datetime.now() - scaling_policy.last_scale
+ if delta.total_seconds() > scaling_policy.cooldown_time:
+ log.info(
+ "Sending %s action message for ns: %s",
+ alarm.action,
+ scaling_policy.scaling_group.nsr_id,
+ )
+ await self.lcm_client.scale(
+ scaling_policy.scaling_group.nsr_id,
+ scaling_policy.scaling_group.name,
+ vnf_member_index,
+ action,
+ )
+ scaling_policy.last_scale = datetime.datetime.now()
+ scaling_policy.save()
except ScalingAlarm.DoesNotExist:
log.debug(