X-Git-Url: https://osm.etsi.org/gitweb/?a=blobdiff_plain;f=osm_policy_module%2Fautoscaling%2Fservice.py;h=5869785d74975b1dc32c8e79fa695aac2eac997b;hb=0f6b1c44ccb844079e0359c8bcda880902bae8b3;hp=a8c36e00f4e343753aaad7e389d4ea71f696176e;hpb=e9228cf9ae98c524d477cf823a59515864aba07e;p=osm%2FPOL.git diff --git a/osm_policy_module/autoscaling/service.py b/osm_policy_module/autoscaling/service.py index a8c36e0..5869785 100644 --- a/osm_policy_module/autoscaling/service.py +++ b/osm_policy_module/autoscaling/service.py @@ -22,10 +22,11 @@ # For those usages not covered by the Apache License, Version 2.0 please # contact: bdiaz@whitestack.com or glavado@whitestack.com ## -import asyncio import datetime import json import logging +import operator +import functools from osm_policy_module.common.common_db_client import CommonDbClient from osm_policy_module.common.lcm_client import LcmClient @@ -48,16 +49,13 @@ log = logging.getLogger(__name__) class AutoscalingService: - def __init__(self, config: Config, loop=None): + def __init__(self, config: Config): self.conf = config - if not loop: - loop = asyncio.get_event_loop() - self.loop = loop self.db_client = CommonDbClient(config) - self.mon_client = MonClient(config, loop=self.loop) - self.lcm_client = LcmClient(config, loop=self.loop) + self.mon_client = MonClient(config) + self.lcm_client = LcmClient(config) - async def configure_scaling_groups(self, nsr_id: str): + async def configure_scaling_groups(self, nsr_id: str, vnf_member_index=None): """ Configures scaling groups for a network service. Creates records in DB. Creates alarms in MON. :param nsr_id: Network service record id @@ -71,7 +69,13 @@ class AutoscalingService: try: with database.db.atomic() as tx: try: - vnfrs = self.db_client.get_vnfrs(nsr_id) + if vnf_member_index is None: + vnfrs = self.db_client.get_vnfrs(nsr_id) + else: + vnfrs = [] + vnfr = self.db_client.get_vnfr(nsr_id, vnf_member_index) + vnfrs.append(vnfr) + # vnfrs = self.db_client.get_vnfrs(nsr_id) for vnfr in vnfrs: log.debug("Processing vnfr: %s", vnfr) vnfd = self.db_client.get_vnfd(vnfr["vnfd-id"]) @@ -190,7 +194,7 @@ class AutoscalingService: operation=scaling_criteria[ "scale-in-relational-operation" ], - action="scale_in" + action="scale_in", ) ) alarm = ScalingAlarmRepository.create( @@ -245,15 +249,21 @@ class AutoscalingService: finally: database.db.close() - async def delete_scaling_groups(self, nsr_id: str): + async def delete_scaling_groups(self, nsr_id: str, vnf_member_index=None): log.debug("Deleting scaling groups for network service %s", nsr_id) database.db.connect() try: with database.db.atomic() as tx: try: - for scaling_group in ScalingGroupRepository.list( - ScalingGroup.nsr_id == nsr_id - ): + if vnf_member_index is None: + scale_conditions = ScalingGroup.nsr_id == nsr_id + else: + query_list = [ + ScalingGroup.nsr_id == nsr_id, + ScalingGroup.vnf_member_index == vnf_member_index, + ] + scale_conditions = functools.reduce(operator.and_, query_list) + for scaling_group in ScalingGroupRepository.list(scale_conditions): for scaling_policy in scaling_group.scaling_policies: for scaling_criteria in scaling_policy.scaling_criterias: for alarm in scaling_criteria.scaling_alarms: @@ -348,48 +358,49 @@ class AutoscalingService: async def evaluate_policy(self, alarm_uuid): database.db.connect() try: - with database.db.atomic(): - alarm = ScalingAlarmRepository.get( - ScalingAlarm.alarm_uuid == alarm_uuid - ) - vnf_member_index = alarm.vnf_member_index - action = alarm.action - scaling_policy = alarm.scaling_criteria.scaling_policy - if not scaling_policy.enabled: - return - if action == "scale_in": - operation = scaling_policy.scale_in_operation - elif action == "scale_out": - operation = scaling_policy.scale_out_operation - else: - raise Exception("Unknown alarm action {}".format(alarm.action)) - alarms = ScalingAlarmRepository.list( - ScalingAlarm.scaling_criteria == alarm.scaling_criteria, - ScalingAlarm.action == alarm.action, - ScalingAlarm.vnf_member_index == vnf_member_index, - ScalingAlarm.vdu_name == alarm.vdu_name, - ) - statuses = [] - for alarm in alarms: - statuses.append(alarm.last_status) - if (operation == "AND" and set(statuses) == {"alarm"}) or ( - operation == "OR" and "alarm" in statuses - ): - delta = datetime.datetime.now() - scaling_policy.last_scale - if delta.total_seconds() > scaling_policy.cooldown_time: - log.info( - "Sending %s action message for ns: %s", - alarm.action, - scaling_policy.scaling_group.nsr_id, - ) - await self.lcm_client.scale( - scaling_policy.scaling_group.nsr_id, - scaling_policy.scaling_group.name, - vnf_member_index, - action, - ) - scaling_policy.last_scale = datetime.datetime.now() - scaling_policy.save() + if self.conf.get("autoscale", "enabled") == "True": + with database.db.atomic(): + alarm = ScalingAlarmRepository.get( + ScalingAlarm.alarm_uuid == alarm_uuid + ) + vnf_member_index = alarm.vnf_member_index + action = alarm.action + scaling_policy = alarm.scaling_criteria.scaling_policy + if not scaling_policy.enabled: + return + if action == "scale_in": + operation = scaling_policy.scale_in_operation + elif action == "scale_out": + operation = scaling_policy.scale_out_operation + else: + raise Exception("Unknown alarm action {}".format(alarm.action)) + alarms = ScalingAlarmRepository.list( + ScalingAlarm.scaling_criteria == alarm.scaling_criteria, + ScalingAlarm.action == alarm.action, + ScalingAlarm.vnf_member_index == vnf_member_index, + ScalingAlarm.vdu_name == alarm.vdu_name, + ) + statuses = [] + for alarm in alarms: + statuses.append(alarm.last_status) + if (operation == "AND" and set(statuses) == {"alarm"}) or ( + operation == "OR" and "alarm" in statuses + ): + delta = datetime.datetime.now() - scaling_policy.last_scale + if delta.total_seconds() > scaling_policy.cooldown_time: + log.info( + "Sending %s action message for ns: %s", + alarm.action, + scaling_policy.scaling_group.nsr_id, + ) + await self.lcm_client.scale( + scaling_policy.scaling_group.nsr_id, + scaling_policy.scaling_group.name, + vnf_member_index, + action, + ) + scaling_policy.last_scale = datetime.datetime.now() + scaling_policy.save() except ScalingAlarm.DoesNotExist: log.debug(