--- /dev/null
+# -*- coding: utf-8 -*-
+
+# Copyright 2018 Whitestack, LLC
+# *************************************************************
+
+# This file is part of OSM Monitoring module
+# All Rights Reserved to Whitestack, LLC
+
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+
+# http://www.apache.org/licenses/LICENSE-2.0
+
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+# For those usages not covered by the Apache License, Version 2.0 please
+# contact: bdiaz@whitestack.com or glavado@whitestack.com
+##
--- /dev/null
+# -*- coding: utf-8 -*-
+
+# Copyright 2018 Whitestack, LLC
+# *************************************************************
+
+# This file is part of OSM Monitoring module
+# All Rights Reserved to Whitestack, LLC
+
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+
+# http://www.apache.org/licenses/LICENSE-2.0
+
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+# For those usages not covered by the Apache License, Version 2.0 please
+# contact: bdiaz@whitestack.com or glavado@whitestack.com
+##
+import asyncio
+import json
+import logging
+
+import requests
+
+from osm_policy_module.common.common_db_client import CommonDbClient
+from osm_policy_module.common.lcm_client import LcmClient
+from osm_policy_module.common.mon_client import MonClient
+from osm_policy_module.core import database
+from osm_policy_module.core.config import Config
+from osm_policy_module.core.database import VnfAlarm, VnfAlarmRepository, AlarmActionRepository
+from osm_policy_module.core.exceptions import VdurNotFound
+
+log = logging.getLogger(__name__)
+
+
+class AlarmingService:
+
+ def __init__(self, config: Config, loop=None):
+ self.conf = config
+ if not loop:
+ loop = asyncio.get_event_loop()
+ self.loop = loop
+ self.db_client = CommonDbClient(config)
+ self.mon_client = MonClient(config, loop=self.loop)
+ self.lcm_client = LcmClient(config, loop=self.loop)
+
+ async def configure_vnf_alarms(self, nsr_id: str):
+ log.info("Configuring vnf alarms for network service %s", nsr_id)
+ alarms_created = []
+ database.db.connect(reuse_if_open=True)
+ with database.db.atomic() as tx:
+ try:
+ vnfrs = self.db_client.get_vnfrs(nsr_id)
+ for vnfr in vnfrs:
+ log.debug("Processing vnfr: %s", vnfr)
+ vnfd = self.db_client.get_vnfd(vnfr['vnfd-id'])
+ for vdur in vnfr['vdur']:
+ vdu = next(
+ filter(
+ lambda vdu: vdu['id'] == vdur['vdu-id-ref'],
+ vnfd['vdu']
+ )
+ )
+ if 'alarm' in vdu:
+ alarm_descriptors = vdu['alarm']
+ for alarm_descriptor in alarm_descriptors:
+ try:
+ VnfAlarmRepository.get(
+ VnfAlarm.alarm_id == alarm_descriptor['alarm-id'],
+ VnfAlarm.vnf_member_index == vnfr['member-vnf-index-ref'],
+ VnfAlarm.vdu_name == vdur['name'],
+ VnfAlarm.nsr_id == nsr_id
+ )
+ log.debug("vdu %s already has an alarm configured with same id %s", vdur['name'],
+ alarm_descriptor['alarm-id'])
+ continue
+ except VnfAlarm.DoesNotExist:
+ pass
+ vnf_monitoring_param = next(
+ filter(
+ lambda param: param['id'] == alarm_descriptor['vnf-monitoring-param-ref'],
+ vnfd['monitoring-param'])
+ )
+ alarm_uuid = await self.mon_client.create_alarm(
+ metric_name=alarm_descriptor['vnf-monitoring-param-ref'],
+ ns_id=nsr_id,
+ vdu_name=vdur['name'],
+ vnf_member_index=vnfr['member-vnf-index-ref'],
+ threshold=alarm_descriptor['value'],
+ operation=alarm_descriptor['operation'],
+ statistic=vnf_monitoring_param['aggregation-type']
+ )
+ alarm = VnfAlarmRepository.create(
+ alarm_id=alarm_descriptor['alarm-id'],
+ alarm_uuid=alarm_uuid,
+ nsr_id=nsr_id,
+ vnf_member_index=int(vnfr['member-vnf-index-ref']),
+ vdu_name=vdur['name']
+ )
+ for action_type in ['ok', 'insufficient-data', 'alarm']:
+ if action_type in alarm_descriptor['actions']:
+ for url in alarm_descriptor['actions'][action_type]:
+ AlarmActionRepository.create(
+ type=action_type,
+ url=url['url'],
+ alarm=alarm
+ )
+ alarms_created.append(alarm)
+
+ except Exception as e:
+ log.exception("Error configuring VNF alarms:")
+ tx.rollback()
+ if len(alarms_created) > 0:
+ log.debug("Cleaning alarm resources in MON")
+ for alarm in alarms_created:
+ await self.mon_client.delete_alarm(alarm.nsr_id,
+ alarm.vnf_member_index,
+ alarm.vdu_name,
+ alarm.alarm_uuid)
+ raise e
+ database.db.close()
+
+ async def delete_orphaned_alarms(self, nsr_id):
+ log.info("Deleting orphaned vnf alarms for network service %s", nsr_id)
+ database.db.connect()
+ with database.db.atomic() as tx:
+ try:
+ for alarm in VnfAlarmRepository.list(VnfAlarm.nsr_id == nsr_id):
+ try:
+ self.db_client.get_vdur(nsr_id, alarm.vnf_member_index, alarm.vdu_name)
+ except VdurNotFound:
+ log.debug("Deleting orphaned alarm %s", alarm.alarm_uuid)
+ try:
+ await self.mon_client.delete_alarm(
+ alarm.nsr_id,
+ alarm.vnf_member_index,
+ alarm.vdu_name,
+ alarm.alarm_uuid)
+ except ValueError:
+ log.exception("Error deleting alarm in MON %s", alarm.alarm_uuid)
+ alarm.delete_instance()
+
+ except Exception as e:
+ log.exception("Error deleting orphaned alarms:")
+ tx.rollback()
+ raise e
+ database.db.close()
+
+ async def delete_vnf_alarms(self, nsr_id):
+ log.info("Deleting vnf alarms for network service %s", nsr_id)
+ database.db.connect()
+ with database.db.atomic() as tx:
+ try:
+ for alarm in VnfAlarmRepository.list(VnfAlarm.nsr_id == nsr_id):
+ log.debug("Deleting vnf alarm %s", alarm.alarm_uuid)
+ try:
+ await self.mon_client.delete_alarm(
+ alarm.nsr_id,
+ alarm.vnf_member_index,
+ alarm.vdu_name,
+ alarm.alarm_uuid)
+ except ValueError:
+ log.exception("Error deleting alarm in MON %s", alarm.alarm_uuid)
+ alarm.delete_instance()
+
+ except Exception as e:
+ log.exception("Error deleting orphaned alarms:")
+ tx.rollback()
+ raise e
+ database.db.close()
+
+ async def handle_alarm(self, alarm_uuid: str, status: str, payload: dict):
+ database.db.connect()
+ try:
+ with database.db.atomic():
+ alarm = VnfAlarmRepository.get(VnfAlarm.alarm_uuid == alarm_uuid)
+ log.debug("Handling vnf alarm %s with status %s", alarm.alarm_id, status)
+ for action in alarm.actions:
+ if action.type == status:
+ log.info("Executing request to url %s for vnf alarm %s with status %s", action.url,
+ alarm.alarm_id, status)
+ requests.post(url=action.url, json=json.dumps(payload))
+ except VnfAlarm.DoesNotExist:
+ log.debug("There is no alarming action configured for alarm %s.", alarm_uuid)
+ finally:
+ database.db.close()
+# -*- coding: utf-8 -*-
+
+# Copyright 2018 Whitestack, LLC
+# *************************************************************
+
+# This file is part of OSM Monitoring module
+# All Rights Reserved to Whitestack, LLC
+
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+
+# http://www.apache.org/licenses/LICENSE-2.0
+
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+# For those usages not covered by the Apache License, Version 2.0 please
+# contact: bdiaz@whitestack.com or glavado@whitestack.com
+##
+++ /dev/null
-# -*- coding: utf-8 -*-
-
-# Copyright 2018 Whitestack, LLC
-# *************************************************************
-
-# This file is part of OSM Monitoring module
-# All Rights Reserved to Whitestack, LLC
-
-# Licensed under the Apache License, Version 2.0 (the "License"); you may
-# not use this file except in compliance with the License. You may obtain
-# a copy of the License at
-
-# http://www.apache.org/licenses/LICENSE-2.0
-
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-# License for the specific language governing permissions and limitations
-# under the License.
-
-# For those usages not covered by the Apache License, Version 2.0 please
-# contact: bdiaz@whitestack.com or glavado@whitestack.com
-##
-import asyncio
-import logging
-
-import peewee
-
-from osm_policy_module.autoscaling.service import Service
-from osm_policy_module.common.message_bus_client import MessageBusClient
-from osm_policy_module.core.config import Config
-from osm_policy_module.core.database import ScalingAlarm
-
-log = logging.getLogger(__name__)
-
-ALLOWED_KAFKA_KEYS = ['instantiated', 'scaled', 'terminated', 'notify_alarm']
-
-
-class PolicyModuleAgent:
- def __init__(self, config: Config, loop=None):
- self.conf = config
- if not loop:
- loop = asyncio.get_event_loop()
- self.loop = loop
- self.msg_bus = MessageBusClient(config)
- self.service = Service(config, loop)
-
- def run(self):
- self.loop.run_until_complete(self.start())
-
- async def start(self):
- topics = [
- "ns",
- "alarm_response"
- ]
- await self.msg_bus.aioread(topics, self._process_msg)
- log.critical("Exiting...")
-
- async def _process_msg(self, topic, key, msg):
- log.debug("_process_msg topic=%s key=%s msg=%s", topic, key, msg)
- log.info("Message arrived: %s", msg)
- try:
- if key in ALLOWED_KAFKA_KEYS:
-
- if key == 'instantiated':
- await self._handle_instantiated(msg)
-
- if key == 'scaled':
- await self._handle_scaled(msg)
-
- if key == 'terminated':
- await self._handle_terminated(msg)
-
- if key == 'notify_alarm':
- await self._handle_alarm_notification(msg)
- else:
- log.debug("Key %s is not in ALLOWED_KAFKA_KEYS", key)
- except peewee.PeeweeException:
- log.exception("Database error consuming message: ")
- raise
- except Exception:
- log.exception("Error consuming message: ")
-
- async def _handle_alarm_notification(self, content):
- log.debug("_handle_alarm_notification: %s", content)
- alarm_uuid = content['notify_details']['alarm_uuid']
- metric_name = content['notify_details']['metric_name']
- operation = content['notify_details']['operation']
- threshold = content['notify_details']['threshold_value']
- vdu_name = content['notify_details']['vdu_name']
- vnf_member_index = content['notify_details']['vnf_member_index']
- nsr_id = content['notify_details']['ns_id']
- log.info(
- "Received alarm notification for alarm %s, \
- metric %s, \
- operation %s, \
- threshold %s, \
- vdu_name %s, \
- vnf_member_index %s, \
- ns_id %s ",
- alarm_uuid, metric_name, operation, threshold, vdu_name, vnf_member_index, nsr_id)
- try:
- alarm = self.service.get_alarm(alarm_uuid)
- await self.service.scale(alarm)
- except ScalingAlarm.DoesNotExist:
- log.info("There is no action configured for alarm %s.", alarm_uuid)
-
- async def _handle_instantiated(self, content):
- log.debug("_handle_instantiated: %s", content)
- nslcmop_id = content['nslcmop_id']
- nslcmop = self.service.get_nslcmop(nslcmop_id)
- if nslcmop['operationState'] == 'COMPLETED' or nslcmop['operationState'] == 'PARTIALLY_COMPLETED':
- nsr_id = nslcmop['nsInstanceId']
- log.info("Configuring scaling groups for network service with nsr_id: %s", nsr_id)
- await self.service.configure_scaling_groups(nsr_id)
- else:
- log.info(
- "Network service is not in COMPLETED or PARTIALLY_COMPLETED state. "
- "Current state is %s. Skipping...",
- nslcmop['operationState'])
-
- async def _handle_scaled(self, content):
- log.debug("_handle_scaled: %s", content)
- nslcmop_id = content['nslcmop_id']
- nslcmop = self.service.get_nslcmop(nslcmop_id)
- if nslcmop['operationState'] == 'COMPLETED' or nslcmop['operationState'] == 'PARTIALLY_COMPLETED':
- nsr_id = nslcmop['nsInstanceId']
- log.info("Configuring scaling groups for network service with nsr_id: %s", nsr_id)
- await self.service.configure_scaling_groups(nsr_id)
- log.info("Checking for orphaned alarms to be deleted for network service with nsr_id: %s", nsr_id)
- await self.service.delete_orphaned_alarms(nsr_id)
- else:
- log.info(
- "Network service is not in COMPLETED or PARTIALLY_COMPLETED state. "
- "Current state is %s. Skipping...",
- nslcmop['operationState'])
-
- async def _handle_terminated(self, content):
- log.debug("_handle_deleted: %s", content)
- nsr_id = content['nsr_id']
- if content['operationState'] == 'COMPLETED' or content['operationState'] == 'PARTIALLY_COMPLETED':
- log.info("Deleting scaling groups and alarms for network service with nsr_id: %s", nsr_id)
- await self.service.delete_scaling_groups(nsr_id)
- else:
- log.info(
- "Network service is not in COMPLETED or PARTIALLY_COMPLETED state. "
- "Current state is %s. Skipping...",
- content['operationState'])
from osm_policy_module.common.mon_client import MonClient
from osm_policy_module.core import database
from osm_policy_module.core.config import Config
-from osm_policy_module.core.database import ScalingGroup, ScalingAlarm, ScalingPolicy, ScalingCriteria
+from osm_policy_module.core.database import ScalingGroup, ScalingAlarm, ScalingPolicy, ScalingCriteria, \
+ ScalingAlarmRepository, ScalingGroupRepository, ScalingPolicyRepository, ScalingCriteriaRepository
from osm_policy_module.core.exceptions import VdurNotFound
from osm_policy_module.utils.vnfd import VnfdUtils
log = logging.getLogger(__name__)
-class Service:
+class AutoscalingService:
def __init__(self, config: Config, loop=None):
self.conf = config
self.lcm_client = LcmClient(config, loop=self.loop)
async def configure_scaling_groups(self, nsr_id: str):
- log.debug("_configure_scaling_groups: %s", nsr_id)
+ log.info("Configuring scaling groups for network service with nsr_id: %s",
+ nsr_id)
alarms_created = []
database.db.connect()
- with database.db.atomic() as tx:
- try:
- vnfrs = self.db_client.get_vnfrs(nsr_id)
- for vnfr in vnfrs:
- log.info("Processing vnfr: %s", vnfr)
- vnfd = self.db_client.get_vnfd(vnfr['vnfd-id'])
- log.info("Looking for vnfd %s", vnfr['vnfd-id'])
- if 'scaling-group-descriptor' not in vnfd:
- continue
- scaling_groups = vnfd['scaling-group-descriptor']
- vnf_monitoring_params = vnfd['monitoring-param']
- for scaling_group in scaling_groups:
- try:
- scaling_group_record = ScalingGroup.select().where(
- ScalingGroup.nsr_id == nsr_id,
- ScalingGroup.vnf_member_index == int(vnfr['member-vnf-index-ref']),
- ScalingGroup.name == scaling_group['name']
- ).get()
- log.info("Found existing scaling group record in DB...")
- except ScalingGroup.DoesNotExist:
- log.info("Creating scaling group record in DB...")
- scaling_group_record = ScalingGroup.create(
- nsr_id=nsr_id,
- vnf_member_index=vnfr['member-vnf-index-ref'],
- name=scaling_group['name'],
- content=json.dumps(scaling_group)
- )
- log.info(
- "Created scaling group record in DB : nsr_id=%s, vnf_member_index=%s, name=%s",
- scaling_group_record.nsr_id,
- scaling_group_record.vnf_member_index,
- scaling_group_record.name)
- for scaling_policy in scaling_group['scaling-policy']:
- if scaling_policy['scaling-type'] != 'automatic':
- continue
+ try:
+ with database.db.atomic() as tx:
+ try:
+ vnfrs = self.db_client.get_vnfrs(nsr_id)
+ for vnfr in vnfrs:
+ log.debug("Processing vnfr: %s", vnfr)
+ vnfd = self.db_client.get_vnfd(vnfr['vnfd-id'])
+ if 'scaling-group-descriptor' not in vnfd:
+ log.debug("No scaling group present in vnfd")
+ continue
+ scaling_groups = vnfd['scaling-group-descriptor']
+ vnf_monitoring_params = vnfd['monitoring-param']
+ for scaling_group in scaling_groups:
try:
- scaling_policy_record = ScalingPolicy.select().join(ScalingGroup).where(
- ScalingPolicy.name == scaling_policy['name'],
- ScalingGroup.id == scaling_group_record.id
- ).get()
- log.info("Found existing scaling policy record in DB...")
- except ScalingPolicy.DoesNotExist:
- log.info("Creating scaling policy record in DB...")
- scaling_policy_record = ScalingPolicy.create(
+ scaling_group_record = ScalingGroupRepository.get(
+ ScalingGroup.nsr_id == nsr_id,
+ ScalingGroup.vnf_member_index == int(vnfr['member-vnf-index-ref']),
+ ScalingGroup.name == scaling_group['name']
+ )
+ log.debug("Found existing scaling group record in DB...")
+ except ScalingGroup.DoesNotExist:
+ log.debug("Creating scaling group record in DB...")
+ scaling_group_record = ScalingGroupRepository.create(
nsr_id=nsr_id,
- name=scaling_policy['name'],
- cooldown_time=scaling_policy['cooldown-time'],
- scaling_group=scaling_group_record
+ vnf_member_index=vnfr['member-vnf-index-ref'],
+ name=scaling_group['name'],
+ content=json.dumps(scaling_group)
)
- log.info("Created scaling policy record in DB : name=%s, scaling_group.name=%s",
- scaling_policy_record.name,
- scaling_policy_record.scaling_group.name)
-
- for scaling_criteria in scaling_policy['scaling-criteria']:
+ log.debug(
+ "Created scaling group record in DB : nsr_id=%s, vnf_member_index=%s, name=%s",
+ scaling_group_record.nsr_id,
+ scaling_group_record.vnf_member_index,
+ scaling_group_record.name)
+ for scaling_policy in scaling_group['scaling-policy']:
+ if scaling_policy['scaling-type'] != 'automatic':
+ continue
try:
- scaling_criteria_record = ScalingCriteria.select().join(ScalingPolicy).where(
- ScalingPolicy.id == scaling_policy_record.id,
- ScalingCriteria.name == scaling_criteria['name']
- ).get()
- log.info("Found existing scaling criteria record in DB...")
- except ScalingCriteria.DoesNotExist:
- log.info("Creating scaling criteria record in DB...")
- scaling_criteria_record = ScalingCriteria.create(
+ scaling_policy_record = ScalingPolicyRepository.get(
+ ScalingPolicy.name == scaling_policy['name'],
+ ScalingGroup.id == scaling_group_record.id,
+ join_classes=[ScalingGroup]
+ )
+ log.debug("Found existing scaling policy record in DB...")
+ except ScalingPolicy.DoesNotExist:
+ log.debug("Creating scaling policy record in DB...")
+ scaling_policy_record = ScalingPolicyRepository.create(
nsr_id=nsr_id,
- name=scaling_criteria['name'],
- scaling_policy=scaling_policy_record
+ name=scaling_policy['name'],
+ cooldown_time=scaling_policy['cooldown-time'],
+ scaling_group=scaling_group_record,
)
- log.info(
- "Created scaling criteria record in DB : name=%s, scaling_policy.name=%s",
- scaling_criteria_record.name,
- scaling_criteria_record.scaling_policy.name)
+ if 'scale-in-operation-type' in scaling_policy:
+ scaling_policy_record.scale_in_operation = scaling_policy[
+ 'scale-in-operation-type']
+ if 'scale-out-operation-type' in scaling_policy:
+ scaling_policy_record.scale_out_operation = scaling_policy[
+ 'scale-out-operation-type']
+ if 'enabled' in scaling_policy:
+ scaling_policy_record.enabled = scaling_policy['enabled']
+ scaling_policy_record.save()
+ log.debug("Created scaling policy record in DB : name=%s, scaling_group.name=%s",
+ scaling_policy_record.name,
+ scaling_policy_record.scaling_group.name)
- vnf_monitoring_param = next(
- filter(
- lambda param: param['id'] == scaling_criteria[
- 'vnf-monitoring-param-ref'
- ],
- vnf_monitoring_params)
- )
- if 'vdu-monitoring-param' in vnf_monitoring_param:
- vdurs = list(
- filter(
- lambda vdur: vdur['vdu-id-ref'] == vnf_monitoring_param
- ['vdu-monitoring-param']
- ['vdu-ref'],
- vnfr['vdur']
+ for scaling_criteria in scaling_policy['scaling-criteria']:
+ try:
+ scaling_criteria_record = ScalingCriteriaRepository.get(
+ ScalingPolicy.id == scaling_policy_record.id,
+ ScalingCriteria.name == scaling_criteria['name'],
+ join_classes=[ScalingPolicy]
)
- )
- elif 'vdu-metric' in vnf_monitoring_param:
- vdurs = list(
- filter(
- lambda vdur: vdur['vdu-id-ref'] == vnf_monitoring_param
- ['vdu-metric']
- ['vdu-ref'],
- vnfr['vdur']
+ log.debug("Found existing scaling criteria record in DB...")
+ except ScalingCriteria.DoesNotExist:
+ log.debug("Creating scaling criteria record in DB...")
+ scaling_criteria_record = ScalingCriteriaRepository.create(
+ nsr_id=nsr_id,
+ name=scaling_criteria['name'],
+ scaling_policy=scaling_policy_record
)
- )
- elif 'vnf-metric' in vnf_monitoring_param:
- vdu = VnfdUtils.get_mgmt_vdu(vnfd)
- vdurs = list(
+ log.debug(
+ "Created scaling criteria record in DB : name=%s, scaling_policy.name=%s",
+ scaling_criteria_record.name,
+ scaling_criteria_record.scaling_policy.name)
+
+ vnf_monitoring_param = next(
filter(
- lambda vdur: vdur['vdu-id-ref'] == vdu['id'],
- vnfr['vdur']
- )
+ lambda param: param['id'] == scaling_criteria[
+ 'vnf-monitoring-param-ref'
+ ],
+ vnf_monitoring_params)
)
- else:
- log.warning(
- "Scaling criteria is referring to a vnf-monitoring-param that does not "
- "contain a reference to a vdu or vnf metric.")
- continue
- for vdur in vdurs:
- log.info("Creating alarm for vdur %s ", vdur)
- try:
- (ScalingAlarm.select()
- .join(ScalingCriteria)
- .join(ScalingPolicy)
- .join(ScalingGroup)
- .where(
- ScalingAlarm.vdu_name == vdur['name'],
- ScalingCriteria.name == scaling_criteria['name'],
- ScalingPolicy.name == scaling_policy['name'],
- ScalingGroup.nsr_id == nsr_id
- ).get())
- log.debug("vdu %s already has an alarm configured", vdur['name'])
+ if 'vdu-monitoring-param' in vnf_monitoring_param:
+ vdurs = list(
+ filter(
+ lambda vdur: vdur['vdu-id-ref'] == vnf_monitoring_param
+ ['vdu-monitoring-param']
+ ['vdu-ref'],
+ vnfr['vdur']
+ )
+ )
+ elif 'vdu-metric' in vnf_monitoring_param:
+ vdurs = list(
+ filter(
+ lambda vdur: vdur['vdu-id-ref'] == vnf_monitoring_param
+ ['vdu-metric']
+ ['vdu-ref'],
+ vnfr['vdur']
+ )
+ )
+ elif 'vnf-metric' in vnf_monitoring_param:
+ vdu = VnfdUtils.get_mgmt_vdu(vnfd)
+ vdurs = list(
+ filter(
+ lambda vdur: vdur['vdu-id-ref'] == vdu['id'],
+ vnfr['vdur']
+ )
+ )
+ else:
+ log.warning(
+ "Scaling criteria is referring to a vnf-monitoring-param that does not "
+ "contain a reference to a vdu or vnf metric.")
continue
- except ScalingAlarm.DoesNotExist:
- pass
- alarm_uuid = await self.mon_client.create_alarm(
- metric_name=vnf_monitoring_param['id'],
- ns_id=nsr_id,
- vdu_name=vdur['name'],
- vnf_member_index=vnfr['member-vnf-index-ref'],
- threshold=scaling_criteria['scale-in-threshold'],
- operation=scaling_criteria['scale-in-relational-operation'],
- statistic=vnf_monitoring_param['aggregation-type']
- )
- alarm = ScalingAlarm.create(
- alarm_uuid=alarm_uuid,
- action='scale_in',
- vnf_member_index=int(vnfr['member-vnf-index-ref']),
- vdu_name=vdur['name'],
- scaling_criteria=scaling_criteria_record
- )
- alarms_created.append(alarm)
- alarm_uuid = await self.mon_client.create_alarm(
- metric_name=vnf_monitoring_param['id'],
- ns_id=nsr_id,
- vdu_name=vdur['name'],
- vnf_member_index=vnfr['member-vnf-index-ref'],
- threshold=scaling_criteria['scale-out-threshold'],
- operation=scaling_criteria['scale-out-relational-operation'],
- statistic=vnf_monitoring_param['aggregation-type']
- )
- alarm = ScalingAlarm.create(
- alarm_uuid=alarm_uuid,
- action='scale_out',
- vnf_member_index=int(vnfr['member-vnf-index-ref']),
- vdu_name=vdur['name'],
- scaling_criteria=scaling_criteria_record
- )
- alarms_created.append(alarm)
+ for vdur in vdurs:
+ log.debug("Creating alarm for vdur %s ", vdur)
+ try:
+ ScalingAlarmRepository.get(ScalingAlarm.vdu_name == vdur['name'],
+ ScalingCriteria.name == scaling_criteria['name'],
+ ScalingPolicy.name == scaling_policy['name'],
+ ScalingGroup.nsr_id == nsr_id,
+ join_classes=[ScalingCriteria,
+ ScalingPolicy,
+ ScalingGroup])
+ log.debug("vdu %s already has an alarm configured", vdur['name'])
+ continue
+ except ScalingAlarm.DoesNotExist:
+ pass
+ alarm_uuid = await self.mon_client.create_alarm(
+ metric_name=vnf_monitoring_param['id'],
+ ns_id=nsr_id,
+ vdu_name=vdur['name'],
+ vnf_member_index=vnfr['member-vnf-index-ref'],
+ threshold=scaling_criteria['scale-in-threshold'],
+ operation=scaling_criteria['scale-in-relational-operation'],
+ statistic=vnf_monitoring_param['aggregation-type']
+ )
+ alarm = ScalingAlarmRepository.create(
+ alarm_uuid=alarm_uuid,
+ action='scale_in',
+ vnf_member_index=int(vnfr['member-vnf-index-ref']),
+ vdu_name=vdur['name'],
+ scaling_criteria=scaling_criteria_record
+ )
+ alarms_created.append(alarm)
+ alarm_uuid = await self.mon_client.create_alarm(
+ metric_name=vnf_monitoring_param['id'],
+ ns_id=nsr_id,
+ vdu_name=vdur['name'],
+ vnf_member_index=vnfr['member-vnf-index-ref'],
+ threshold=scaling_criteria['scale-out-threshold'],
+ operation=scaling_criteria['scale-out-relational-operation'],
+ statistic=vnf_monitoring_param['aggregation-type']
+ )
+ alarm = ScalingAlarmRepository.create(
+ alarm_uuid=alarm_uuid,
+ action='scale_out',
+ vnf_member_index=int(vnfr['member-vnf-index-ref']),
+ vdu_name=vdur['name'],
+ scaling_criteria=scaling_criteria_record
+ )
+ alarms_created.append(alarm)
- except Exception as e:
- log.exception("Error configuring scaling groups:")
- tx.rollback()
- if len(alarms_created) > 0:
- log.info("Cleaning alarm resources in MON")
- for alarm in alarms_created:
- await self.mon_client.delete_alarm(alarm.scaling_criteria.scaling_policy.scaling_group.nsr_id,
- alarm.vnf_member_index,
- alarm.vdu_name,
- alarm.alarm_uuid)
- raise e
- database.db.close()
+ except Exception as e:
+ log.exception("Error configuring scaling groups:")
+ tx.rollback()
+ if len(alarms_created) > 0:
+ log.info("Cleaning alarm resources in MON")
+ for alarm in alarms_created:
+ await self.mon_client.delete_alarm(
+ alarm.scaling_criteria.scaling_policy.scaling_group.nsr_id,
+ alarm.vnf_member_index,
+ alarm.vdu_name,
+ alarm.alarm_uuid)
+ raise e
+ finally:
+ database.db.close()
async def delete_scaling_groups(self, nsr_id: str):
+ log.debug("Deleting scaling groups for network service %s", nsr_id)
database.db.connect()
- with database.db.atomic() as tx:
- try:
- for scaling_group in ScalingGroup.select().where(ScalingGroup.nsr_id == nsr_id):
- for scaling_policy in scaling_group.scaling_policies:
- for scaling_criteria in scaling_policy.scaling_criterias:
- for alarm in scaling_criteria.scaling_alarms:
- try:
- await self.mon_client.delete_alarm(
- alarm.scaling_criteria.scaling_policy.scaling_group.nsr_id,
- alarm.vnf_member_index,
- alarm.vdu_name,
- alarm.alarm_uuid)
- except ValueError:
- log.exception("Error deleting alarm in MON %s", alarm.alarm_uuid)
- alarm.delete_instance()
- scaling_criteria.delete_instance()
- scaling_policy.delete_instance()
- scaling_group.delete_instance()
-
- except Exception as e:
- log.exception("Error deleting scaling groups and alarms:")
- tx.rollback()
- raise e
- database.db.close()
-
- async def delete_orphaned_alarms(self, nsr_id):
- database.db.connect()
- with database.db.atomic() as tx:
- try:
- for scaling_group in ScalingGroup.select().where(ScalingGroup.nsr_id == nsr_id):
- for scaling_policy in scaling_group.scaling_policies:
- for scaling_criteria in scaling_policy.scaling_criterias:
- for alarm in scaling_criteria.scaling_alarms:
- try:
- self.db_client.get_vdur(nsr_id, alarm.vnf_member_index, alarm.vdu_name)
- except VdurNotFound:
- log.info("Deleting orphaned alarm %s", alarm.alarm_uuid)
+ try:
+ with database.db.atomic() as tx:
+ try:
+ for scaling_group in ScalingGroupRepository.list(ScalingGroup.nsr_id == nsr_id):
+ for scaling_policy in scaling_group.scaling_policies:
+ for scaling_criteria in scaling_policy.scaling_criterias:
+ for alarm in scaling_criteria.scaling_alarms:
try:
await self.mon_client.delete_alarm(
alarm.scaling_criteria.scaling_policy.scaling_group.nsr_id,
except ValueError:
log.exception("Error deleting alarm in MON %s", alarm.alarm_uuid)
alarm.delete_instance()
+ scaling_criteria.delete_instance()
+ scaling_policy.delete_instance()
+ scaling_group.delete_instance()
- except Exception as e:
- log.exception("Error deleting orphaned alarms:")
- tx.rollback()
- raise e
- database.db.close()
+ except Exception as e:
+ log.exception("Error deleting scaling groups and alarms:")
+ tx.rollback()
+ raise e
+ finally:
+ database.db.close()
- async def scale(self, alarm):
+ async def delete_orphaned_alarms(self, nsr_id):
+ log.info("Deleting orphaned scaling alarms for network service %s", nsr_id)
database.db.connect()
- with database.db.atomic():
- delta = datetime.datetime.now() - alarm.scaling_criteria.scaling_policy.last_scale
- if delta.total_seconds() > alarm.scaling_criteria.scaling_policy.cooldown_time:
- log.info("Sending scaling action message for ns: %s",
- alarm.scaling_criteria.scaling_policy.scaling_group.nsr_id)
- await self.lcm_client.scale(alarm.scaling_criteria.scaling_policy.scaling_group.nsr_id,
- alarm.scaling_criteria.scaling_policy.scaling_group.name,
- alarm.vnf_member_index,
- alarm.action)
- alarm.scaling_criteria.scaling_policy.last_scale = datetime.datetime.now()
- alarm.scaling_criteria.scaling_policy.save()
- else:
- log.info("Time between last scale and now is less than cooldown time. Skipping.")
- database.db.close()
+ try:
+ with database.db.atomic() as tx:
+ try:
+ for scaling_group in ScalingGroupRepository.list(ScalingGroup.nsr_id == nsr_id):
+ for scaling_policy in scaling_group.scaling_policies:
+ for scaling_criteria in scaling_policy.scaling_criterias:
+ for alarm in scaling_criteria.scaling_alarms:
+ try:
+ self.db_client.get_vdur(nsr_id, alarm.vnf_member_index, alarm.vdu_name)
+ except VdurNotFound:
+ log.debug("Deleting orphaned scaling alarm %s", alarm.alarm_uuid)
+ try:
+ await self.mon_client.delete_alarm(
+ alarm.scaling_criteria.scaling_policy.scaling_group.nsr_id,
+ alarm.vnf_member_index,
+ alarm.vdu_name,
+ alarm.alarm_uuid)
+ except ValueError:
+ log.exception("Error deleting alarm in MON %s", alarm.alarm_uuid)
+ alarm.delete_instance()
- def get_alarm(self, alarm_uuid: str):
- database.db.connect()
- with database.db.atomic():
- alarm = ScalingAlarm.select().where(ScalingAlarm.alarm_uuid == alarm_uuid).get()
- database.db.close()
- return alarm
+ except Exception as e:
+ log.exception("Error deleting orphaned alarms:")
+ tx.rollback()
+ raise e
+ finally:
+ database.db.close()
def get_nslcmop(self, nslcmop_id):
return self.db_client.get_nslcmop(nslcmop_id)
+
+ async def handle_alarm(self, alarm_uuid: str, status: str):
+ await self.update_alarm_status(alarm_uuid, status)
+ await self.evaluate_policy(alarm_uuid)
+
+ async def update_alarm_status(self, alarm_uuid: str, status: str):
+ database.db.connect()
+ try:
+ with database.db.atomic():
+ alarm = ScalingAlarmRepository.get(ScalingAlarm.alarm_uuid == alarm_uuid)
+ alarm.last_status = status
+ alarm.save()
+ except ScalingAlarm.DoesNotExist:
+ log.debug("There is no autoscaling action configured for alarm %s.", alarm_uuid)
+ finally:
+ database.db.close()
+
+ async def evaluate_policy(self, alarm_uuid):
+ database.db.connect()
+ try:
+ with database.db.atomic():
+ alarm = ScalingAlarmRepository.get(ScalingAlarm.alarm_uuid == alarm_uuid)
+ vnf_member_index = alarm.vnf_member_index
+ action = alarm.action
+ scaling_policy = alarm.scaling_criteria.scaling_policy
+ if not scaling_policy.enabled:
+ return
+ if action == 'scale_in':
+ operation = scaling_policy.scale_in_operation
+ elif action == 'scale_out':
+ operation = scaling_policy.scale_out_operation
+ else:
+ raise Exception('Unknown alarm action {}'.format(alarm.action))
+ alarms = ScalingAlarmRepository.list(ScalingAlarm.scaling_criteria == alarm.scaling_criteria,
+ ScalingAlarm.action == alarm.action,
+ ScalingAlarm.vnf_member_index == vnf_member_index,
+ ScalingAlarm.vdu_name == alarm.vdu_name)
+ statuses = []
+ for alarm in alarms:
+ statuses.append(alarm.last_status)
+ if (operation == 'AND' and set(statuses) == {'alarm'}) or (operation == 'OR' and 'alarm' in statuses):
+ delta = datetime.datetime.now() - scaling_policy.last_scale
+ if delta.total_seconds() > scaling_policy.cooldown_time:
+ log.info("Sending %s action message for ns: %s",
+ alarm.action,
+ scaling_policy.scaling_group.nsr_id)
+ await self.lcm_client.scale(scaling_policy.scaling_group.nsr_id,
+ scaling_policy.scaling_group.name,
+ vnf_member_index,
+ action)
+ scaling_policy.last_scale = datetime.datetime.now()
+ scaling_policy.save()
+
+ except ScalingAlarm.DoesNotExist:
+ log.debug("There is no autoscaling action configured for alarm %s.", alarm_uuid)
+ finally:
+ database.db.close()
import logging
import sys
-from osm_policy_module.autoscaling.agent import PolicyModuleAgent
+from osm_policy_module.core.agent import PolicyModuleAgent
from osm_policy_module.core.config import Config
from osm_policy_module.core.database import DatabaseManager
log.debug("scale %s %s %s %s", nsr_id, scaling_group_name, vnf_member_index, action)
nslcmop = self._generate_nslcmop(nsr_id, scaling_group_name, vnf_member_index, action)
self.db_client.create_nslcmop(nslcmop)
- log.info("Sending scale action message: %s", json.dumps(nslcmop))
+ log.debug("Sending scale action message: %s", json.dumps(nslcmop))
await self.msg_bus.aiowrite("ns", "scale", nslcmop)
def _generate_nslcmop(self, nsr_id: str, scaling_group_name: str, vnf_member_index: int, action: str):
async def create_alarm(self, metric_name: str, ns_id: str, vdu_name: str, vnf_member_index: int, threshold: int,
statistic: str, operation: str):
cor_id = random.randint(1, 10e7)
- msg = self._build_create_alarm_payload(cor_id, metric_name, ns_id, vdu_name, vnf_member_index, threshold,
+ msg = self._build_create_alarm_payload(cor_id,
+ metric_name,
+ ns_id,
+ vdu_name,
+ vnf_member_index,
+ threshold,
statistic,
operation)
- log.info("Sending create_alarm_request %s", msg)
+ log.debug("Sending create_alarm_request %s", msg)
producer = AIOKafkaProducer(loop=self.loop,
bootstrap_servers=self.kafka_server,
key_serializer=str.encode,
await producer.send_and_wait("alarm_request", key="create_alarm_request", value=json.dumps(msg))
finally:
await producer.stop()
- log.info("Waiting for create_alarm_response...")
+ log.debug("Waiting for create_alarm_response...")
consumer = AIOKafkaConsumer(
"alarm_response_" + str(cor_id),
loop=self.loop,
content = json.loads(message.value)
except JSONDecodeError:
content = yaml.safe_load(message.value)
- log.info("Received create_alarm_response %s", content)
+ log.debug("Received create_alarm_response %s", content)
if content['alarm_create_response']['correlation_id'] == cor_id:
if not content['alarm_create_response']['status']:
raise ValueError("Error creating alarm in MON")
async def delete_alarm(self, ns_id: str, vnf_member_index: int, vdu_name: str, alarm_uuid: str):
cor_id = random.randint(1, 10e7)
msg = self._build_delete_alarm_payload(cor_id, ns_id, vdu_name, vnf_member_index, alarm_uuid)
- log.info("Sending delete_alarm_request %s", msg)
+ log.debug("Sending delete_alarm_request %s", msg)
producer = AIOKafkaProducer(loop=self.loop,
bootstrap_servers=self.kafka_server,
key_serializer=str.encode,
await producer.send_and_wait("alarm_request", key="delete_alarm_request", value=json.dumps(msg))
finally:
await producer.stop()
- log.info("Waiting for delete_alarm_response...")
+ log.debug("Waiting for delete_alarm_response...")
consumer = AIOKafkaConsumer(
"alarm_response_" + str(cor_id),
loop=self.loop,
except JSONDecodeError:
content = yaml.safe_load(message.value)
if content['alarm_delete_response']['correlation_id'] == cor_id:
- log.info("Received delete_alarm_response %s", content)
+ log.debug("Received delete_alarm_response %s", content)
if not content['alarm_delete_response']['status']:
raise ValueError("Error deleting alarm in MON. Response status is False.")
alarm_uuid = content['alarm_delete_response']['alarm_uuid']
raise ValueError('No alarm deletion response from MON. Is MON up?')
return alarm_uuid
- def _build_create_alarm_payload(self, cor_id: int, metric_name: str, ns_id: str, vdu_name: str,
+ def _build_create_alarm_payload(self, cor_id: int,
+ metric_name: str,
+ ns_id: str,
+ vdu_name: str,
vnf_member_index: int,
- threshold: int, statistic: str, operation: str):
+ threshold: int,
+ statistic: str,
+ operation: str):
+
alarm_create_request = {
'correlation_id': cor_id,
'alarm_name': 'osm_alarm_{}_{}_{}_{}'.format(ns_id, vnf_member_index, vdu_name, metric_name),
--- /dev/null
+# -*- coding: utf-8 -*-
+
+# Copyright 2018 Whitestack, LLC
+# *************************************************************
+
+# This file is part of OSM Monitoring module
+# All Rights Reserved to Whitestack, LLC
+
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+
+# http://www.apache.org/licenses/LICENSE-2.0
+
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+# For those usages not covered by the Apache License, Version 2.0 please
+# contact: bdiaz@whitestack.com or glavado@whitestack.com
+##
+import asyncio
+import logging
+
+import peewee
+
+from osm_policy_module.alarming.service import AlarmingService
+from osm_policy_module.autoscaling.service import AutoscalingService
+from osm_policy_module.common.message_bus_client import MessageBusClient
+from osm_policy_module.core.config import Config
+
+log = logging.getLogger(__name__)
+
+ALLOWED_KAFKA_KEYS = ['instantiated', 'scaled', 'terminated', 'notify_alarm']
+
+
+class PolicyModuleAgent:
+ def __init__(self, config: Config, loop=None):
+ self.conf = config
+ if not loop:
+ loop = asyncio.get_event_loop()
+ self.loop = loop
+ self.msg_bus = MessageBusClient(config)
+ self.autoscaling_service = AutoscalingService(config, loop)
+ self.alarming_service = AlarmingService(config, loop)
+
+ def run(self):
+ self.loop.run_until_complete(self.start())
+
+ async def start(self):
+ topics = [
+ "ns",
+ "alarm_response"
+ ]
+ await self.msg_bus.aioread(topics, self._process_msg)
+ log.critical("Exiting...")
+
+ async def _process_msg(self, topic, key, msg):
+ log.debug("_process_msg topic=%s key=%s msg=%s", topic, key, msg)
+ try:
+ if key in ALLOWED_KAFKA_KEYS:
+
+ if key == 'instantiated':
+ await self._handle_instantiated(msg)
+
+ if key == 'scaled':
+ await self._handle_scaled(msg)
+
+ if key == 'terminated':
+ await self._handle_terminated(msg)
+
+ if key == 'notify_alarm':
+ await self._handle_alarm_notification(msg)
+ else:
+ log.debug("Key %s is not in ALLOWED_KAFKA_KEYS", key)
+ except peewee.PeeweeException:
+ log.exception("Database error consuming message: ")
+ raise
+ except Exception:
+ log.exception("Error consuming message: ")
+
+ async def _handle_alarm_notification(self, content):
+ log.debug("_handle_alarm_notification: %s", content)
+ alarm_uuid = content['notify_details']['alarm_uuid']
+ status = content['notify_details']['status']
+ await self.autoscaling_service.handle_alarm(alarm_uuid, status)
+ await self.alarming_service.handle_alarm(alarm_uuid, status, content)
+
+ async def _handle_instantiated(self, content):
+ log.debug("_handle_instantiated: %s", content)
+ nslcmop_id = content['nslcmop_id']
+ nslcmop = self.autoscaling_service.get_nslcmop(nslcmop_id)
+ if nslcmop['operationState'] == 'COMPLETED' or nslcmop['operationState'] == 'PARTIALLY_COMPLETED':
+ nsr_id = nslcmop['nsInstanceId']
+ log.info("Configuring nsr_id: %s", nsr_id)
+ await self.autoscaling_service.configure_scaling_groups(nsr_id)
+ await self.alarming_service.configure_vnf_alarms(nsr_id)
+ else:
+ log.info(
+ "Network_service is not in COMPLETED or PARTIALLY_COMPLETED state. "
+ "Current state is %s. Skipping...",
+ nslcmop['operationState'])
+
+ async def _handle_scaled(self, content):
+ log.debug("_handle_scaled: %s", content)
+ nslcmop_id = content['nslcmop_id']
+ nslcmop = self.autoscaling_service.get_nslcmop(nslcmop_id)
+ if nslcmop['operationState'] == 'COMPLETED' or nslcmop['operationState'] == 'PARTIALLY_COMPLETED':
+ nsr_id = nslcmop['nsInstanceId']
+ log.info("Configuring scaled service with nsr_id: %s", nsr_id)
+ await self.autoscaling_service.configure_scaling_groups(nsr_id)
+ await self.autoscaling_service.delete_orphaned_alarms(nsr_id)
+ await self.alarming_service.configure_vnf_alarms(nsr_id)
+ else:
+ log.debug(
+ "Network service is not in COMPLETED or PARTIALLY_COMPLETED state. "
+ "Current state is %s. Skipping...",
+ nslcmop['operationState'])
+
+ async def _handle_terminated(self, content):
+ log.debug("_handle_deleted: %s", content)
+ nsr_id = content['nsr_id']
+ if content['operationState'] == 'COMPLETED' or content['operationState'] == 'PARTIALLY_COMPLETED':
+ log.info("Deleting scaling groups and alarms for network autoscaling_service with nsr_id: %s", nsr_id)
+ await self.autoscaling_service.delete_scaling_groups(nsr_id)
+ await self.alarming_service.delete_vnf_alarms(nsr_id)
+ else:
+ log.info(
+ "Network service is not in COMPLETED or PARTIALLY_COMPLETED state. "
+ "Current state is %s. Skipping...",
+ content['operationState'])
import datetime
import logging
import os
+from typing import Iterable, List
-from peewee import CharField, IntegerField, ForeignKeyField, Model, TextField, AutoField, DateTimeField, Proxy
+from peewee import CharField, IntegerField, ForeignKeyField, Model, TextField, AutoField, DateTimeField, Proxy, \
+ BooleanField
from peewee_migrate import Router
from playhouse.db_url import connect
class ScalingPolicy(BaseModel):
name = CharField()
cooldown_time = IntegerField()
+ scale_in_operation = CharField(default='AND')
+ scale_out_operation = CharField(default='OR')
+ enabled = BooleanField(default=True)
last_scale = DateTimeField(default=datetime.datetime.now)
scaling_group = ForeignKeyField(ScalingGroup, related_name='scaling_policies', on_delete='CASCADE')
vnf_member_index = IntegerField()
vdu_name = CharField()
scaling_criteria = ForeignKeyField(ScalingCriteria, related_name='scaling_alarms', on_delete='CASCADE')
+ last_status = CharField(default='insufficient-data')
+
+
+class VnfAlarm(BaseModel):
+ alarm_id = CharField()
+ alarm_uuid = CharField(unique=True)
+ nsr_id = CharField()
+ vnf_member_index = IntegerField()
+ vdu_name = CharField()
+
+
+class AlarmAction(BaseModel):
+ type = CharField()
+ url = TextField()
+ alarm = ForeignKeyField(VnfAlarm, related_name='actions', on_delete='CASCADE')
class DatabaseManager:
router = Router(db, os.path.dirname(migrations.__file__))
router.run()
db.close()
+
+
+class ScalingAlarmRepository:
+
+ @staticmethod
+ def list(*expressions) -> Iterable[ScalingAlarm]:
+ return ScalingAlarm.select().where(*expressions)
+
+ @staticmethod
+ def get(*expressions, join_classes: List = None) -> ScalingAlarm:
+ query = ScalingAlarm.select()
+ if join_classes:
+ for join_class in join_classes:
+ query = query.join(join_class)
+ return query.where(*expressions).get()
+
+ @staticmethod
+ def create(**query) -> ScalingAlarm:
+ return ScalingAlarm.create(**query)
+
+
+class ScalingGroupRepository:
+
+ @staticmethod
+ def list(*expressions) -> Iterable[ScalingGroup]:
+ return ScalingGroup.select().where(*expressions)
+
+ @staticmethod
+ def get(*expressions) -> ScalingGroup:
+ return ScalingGroup.select().where(*expressions).get()
+
+ @staticmethod
+ def create(**query) -> ScalingGroup:
+ return ScalingGroup.create(**query)
+
+
+class ScalingPolicyRepository:
+
+ @staticmethod
+ def list(*expressions, join_classes: List = None) -> Iterable[ScalingPolicy]:
+ query = ScalingPolicy.select()
+ if join_classes:
+ for join_class in join_classes:
+ query = query.join(join_class)
+ return query.where(*expressions)
+
+ @staticmethod
+ def get(*expressions, join_classes: List = None) -> ScalingPolicy:
+ query = ScalingPolicy.select()
+ if join_classes:
+ for join_class in join_classes:
+ query = query.join(join_class)
+ return query.where(*expressions).get()
+
+ @staticmethod
+ def create(**query) -> ScalingPolicy:
+ return ScalingPolicy.create(**query)
+
+
+class ScalingCriteriaRepository:
+
+ @staticmethod
+ def list(*expressions, join_classes: List = None) -> Iterable[ScalingCriteria]:
+ query = ScalingCriteria.select()
+ if join_classes:
+ for join_class in join_classes:
+ query = query.join(join_class)
+ return query.where(*expressions)
+
+ @staticmethod
+ def get(*expressions, join_classes: List = None) -> ScalingCriteria:
+ query = ScalingCriteria.select()
+ if join_classes:
+ for join_class in join_classes:
+ query = query.join(join_class)
+ return query.where(*expressions).get()
+
+ @staticmethod
+ def create(**query) -> ScalingCriteria:
+ return ScalingCriteria.create(**query)
+
+
+class VnfAlarmRepository:
+
+ @staticmethod
+ def list(*expressions) -> Iterable[VnfAlarm]:
+ return VnfAlarm.select().where(*expressions)
+
+ @staticmethod
+ def get(*expressions) -> VnfAlarm:
+ return VnfAlarm.select().where(*expressions).get()
+
+ @staticmethod
+ def create(**query) -> VnfAlarm:
+ return VnfAlarm.create(**query)
+
+
+class AlarmActionRepository:
+
+ @staticmethod
+ def list(*expressions) -> Iterable[AlarmAction]:
+ return AlarmAction.select().where(*expressions)
+
+ @staticmethod
+ def get(*expressions) -> AlarmAction:
+ return AlarmAction.select().where(*expressions).get()
+
+ @staticmethod
+ def create(**query) -> AlarmAction:
+ return AlarmAction.create(**query)
--- /dev/null
+# -*- coding: utf-8 -*-
+
+# Copyright 2018 Whitestack, LLC
+# *************************************************************
+
+# This file is part of OSM Monitoring module
+# All Rights Reserved to Whitestack, LLC
+
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+
+# http://www.apache.org/licenses/LICENSE-2.0
+
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+# For those usages not covered by the Apache License, Version 2.0 please
+# contact: bdiaz@whitestack.com or glavado@whitestack.com
+##
+"""Peewee migrations -- 002_add_vnf_alarm.py.
+
+Some examples (model - class or model name)::
+
+ > Model = migrator.orm['model_name'] # Return model in current state by name
+
+ > migrator.sql(sql) # Run custom SQL
+ > migrator.python(func, *args, **kwargs) # Run python code
+ > migrator.create_model(Model) # Create a model (could be used as decorator)
+ > migrator.remove_model(model, cascade=True) # Remove a model
+ > migrator.add_fields(model, **fields) # Add fields to a model
+ > migrator.change_fields(model, **fields) # Change fields
+ > migrator.remove_fields(model, *field_names, cascade=True)
+ > migrator.rename_field(model, old_field_name, new_field_name)
+ > migrator.rename_table(model, new_table_name)
+ > migrator.add_index(model, *col_names, unique=False)
+ > migrator.drop_index(model, *col_names)
+ > migrator.add_not_null(model, *field_names)
+ > migrator.drop_not_null(model, *field_names)
+ > migrator.add_default(model, field_name, default)
+
+"""
+
+import peewee as pw
+
+SQL = pw.SQL
+
+
+def migrate(migrator, database, fake=False, **kwargs):
+ """Write your migrations here."""
+
+ @migrator.create_model
+ class VnfAlarm(pw.Model):
+ id = pw.AutoField()
+ alarm_id = pw.CharField(max_length=255)
+ alarm_uuid = pw.CharField(max_length=255, unique=True)
+ nsr_id = pw.CharField(max_length=255)
+ vnf_member_index = pw.IntegerField()
+ vdu_name = pw.CharField(max_length=255)
+
+ class Meta:
+ table_name = "vnfalarm"
+
+ @migrator.create_model
+ class AlarmAction(pw.Model):
+ id = pw.AutoField()
+ type = pw.CharField(max_length=255)
+ url = pw.TextField()
+ alarm = pw.ForeignKeyField(backref='actions', column_name='alarm_id', field='id',
+ model=migrator.orm['vnfalarm'], on_delete='CASCADE')
+
+ class Meta:
+ table_name = "alarmaction"
+
+
+def rollback(migrator, database, fake=False, **kwargs):
+ """Write your rollback migrations here."""
+
+ migrator.remove_model('vnfalarm')
+
+ migrator.remove_model('alarmaction')
--- /dev/null
+# -*- coding: utf-8 -*-
+
+# Copyright 2018 Whitestack, LLC
+# *************************************************************
+
+# This file is part of OSM Monitoring module
+# All Rights Reserved to Whitestack, LLC
+
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+
+# http://www.apache.org/licenses/LICENSE-2.0
+
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+# For those usages not covered by the Apache License, Version 2.0 please
+# contact: bdiaz@whitestack.com or glavado@whitestack.com
+##
+"""Peewee migrations -- 002_add_fields_to_policy.py.
+
+Some examples (model - class or model name)::
+
+ > Model = migrator.orm['model_name'] # Return model in current state by name
+
+ > migrator.sql(sql) # Run custom SQL
+ > migrator.python(func, *args, **kwargs) # Run python code
+ > migrator.create_model(Model) # Create a model (could be used as decorator)
+ > migrator.remove_model(model, cascade=True) # Remove a model
+ > migrator.add_fields(model, **fields) # Add fields to a model
+ > migrator.change_fields(model, **fields) # Change fields
+ > migrator.remove_fields(model, *field_names, cascade=True)
+ > migrator.rename_field(model, old_field_name, new_field_name)
+ > migrator.rename_table(model, new_table_name)
+ > migrator.add_index(model, *col_names, unique=False)
+ > migrator.drop_index(model, *col_names)
+ > migrator.add_not_null(model, *field_names)
+ > migrator.drop_not_null(model, *field_names)
+ > migrator.add_default(model, field_name, default)
+
+"""
+
+import peewee as pw
+
+SQL = pw.SQL
+
+
+def migrate(migrator, database, fake=False, **kwargs):
+ """Write your migrations here."""
+
+ migrator.add_fields('scalingpolicy',
+ scale_in_operation=pw.CharField(max_length=255, default='AND'),
+ scale_out_operation=pw.CharField(max_length=255, default='OR'),
+ enabled=pw.BooleanField(default=True))
+
+
+def rollback(migrator, database, fake=False, **kwargs):
+ """Write your rollback migrations here."""
+
+ migrator.remove_fields('scalingpolicy', 'scale_in_operation', 'scale_out_operation', 'enabled')
--- /dev/null
+# -*- coding: utf-8 -*-
+
+# Copyright 2018 Whitestack, LLC
+# *************************************************************
+
+# This file is part of OSM Monitoring module
+# All Rights Reserved to Whitestack, LLC
+
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+
+# http://www.apache.org/licenses/LICENSE-2.0
+
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+# For those usages not covered by the Apache License, Version 2.0 please
+# contact: bdiaz@whitestack.com or glavado@whitestack.com
+##
+"""Peewee migrations -- 003_add_fields_to_alarm.py.
+
+Some examples (model - class or model name)::
+
+ > Model = migrator.orm['model_name'] # Return model in current state by name
+
+ > migrator.sql(sql) # Run custom SQL
+ > migrator.python(func, *args, **kwargs) # Run python code
+ > migrator.create_model(Model) # Create a model (could be used as decorator)
+ > migrator.remove_model(model, cascade=True) # Remove a model
+ > migrator.add_fields(model, **fields) # Add fields to a model
+ > migrator.change_fields(model, **fields) # Change fields
+ > migrator.remove_fields(model, *field_names, cascade=True)
+ > migrator.rename_field(model, old_field_name, new_field_name)
+ > migrator.rename_table(model, new_table_name)
+ > migrator.add_index(model, *col_names, unique=False)
+ > migrator.drop_index(model, *col_names)
+ > migrator.add_not_null(model, *field_names)
+ > migrator.drop_not_null(model, *field_names)
+ > migrator.add_default(model, field_name, default)
+
+"""
+
+import peewee as pw
+
+SQL = pw.SQL
+
+
+def migrate(migrator, database, fake=False, **kwargs):
+ """Write your migrations here."""
+
+ migrator.add_fields('scalingalarm',
+ last_status=pw.CharField(max_length=255, default='insufficient-data'))
+
+
+def rollback(migrator, database, fake=False, **kwargs):
+ """Write your rollback migrations here."""
+
+ migrator.remove_fields('scalingalarm', 'last_status')
from osm_policy_module.common.common_db_client import CommonDbClient
from osm_policy_module.common.mon_client import MonClient
from osm_policy_module.core import database
-from osm_policy_module.autoscaling.agent import PolicyModuleAgent
+from osm_policy_module.core.agent import PolicyModuleAgent
from osm_policy_module.core.config import Config
-from osm_policy_module.core.database import ScalingGroup, ScalingAlarm, ScalingPolicy, ScalingCriteria
+from osm_policy_module.core.database import ScalingGroup, ScalingAlarm, ScalingPolicy, ScalingCriteria, VnfAlarm, \
+ AlarmAction
log = logging.getLogger()
log.level = logging.INFO
}
}
],
- "image": "cirros034"
+ "image": "cirros034",
+ "alarm": [
+ {
+ "value": 20.0000,
+ "actions": {
+ "insufficient-data": [
+ {
+ "url": "localhost:9090"
+ }
+ ],
+ "ok": [
+ {
+ "url": "localhost:9090"
+ }
+ ],
+ "alarm": [
+ {
+ "url": "localhost:9090"
+ }
+ ]
+ },
+ "alarm-id": "alarm-1",
+ "operation": "LT",
+ "vnf-monitoring-param-ref": "cirros_vnf_memory_util"
+ }
+ ]
}
],
"monitoring-param": [
}
}
-MODELS = [ScalingGroup, ScalingPolicy, ScalingCriteria, ScalingAlarm]
+MODELS = [ScalingGroup, ScalingPolicy, ScalingCriteria, ScalingAlarm, VnfAlarm, AlarmAction]
class PolicyModuleAgentTest(unittest.TestCase):
create_alarm.side_effect = _test_configure_scaling_groups_create_alarm
config = Config()
agent = PolicyModuleAgent(config, self.loop)
- self.loop.run_until_complete(agent.service.configure_scaling_groups("test_nsr_id"))
+ self.loop.run_until_complete(agent.autoscaling_service.configure_scaling_groups("test_nsr_id"))
create_alarm.assert_any_call(metric_name='cirros_vnf_memory_util',
ns_id='test_nsr_id',
operation='GT',
self.assertEqual(scaling_record.name, 'scale_cirros_vnfd-VM')
self.assertEqual(scaling_record.nsr_id, 'test_nsr_id')
+ @patch.object(DbMongo, 'db_connect', Mock())
+ @patch.object(KafkaProducer, '__init__')
+ @patch.object(MonClient, 'create_alarm')
+ @patch.object(CommonDbClient, 'get_vnfd')
+ @patch.object(CommonDbClient, 'get_nsr')
+ @patch.object(CommonDbClient, 'get_vnfr')
+ def test_configure_vnf_alarms(self, get_vnfr, get_nsr, get_vnfd, create_alarm, kafka_producer_init):
+ def _test_configure_scaling_groups_get_vnfr(*args, **kwargs):
+ if '1' in args[1]:
+ return vnfr_record_mocks[0]
+ if '2' in args[1]:
+ return vnfr_record_mocks[1]
+
+ async def _test_configure_vnf_alarms_create_alarm(*args, **kwargs):
+ return uuid.uuid4()
+
+ kafka_producer_init.return_value = None
+ get_vnfr.side_effect = _test_configure_scaling_groups_get_vnfr
+ get_nsr.return_value = nsr_record_mock
+ get_vnfd.return_value = vnfd_record_mock
+ create_alarm.side_effect = _test_configure_vnf_alarms_create_alarm
+ config = Config()
+ agent = PolicyModuleAgent(config, self.loop)
+ self.loop.run_until_complete(agent.alarming_service.configure_vnf_alarms("test_nsr_id"))
+ create_alarm.assert_any_call(metric_name='cirros_vnf_memory_util',
+ ns_id='test_nsr_id',
+ operation='LT',
+ statistic='AVERAGE',
+ threshold=20.0,
+ vdu_name='cirros_ns-1-cirros_vnfd-VM-1',
+ vnf_member_index='1')
+ create_alarm.assert_any_call(metric_name='cirros_vnf_memory_util',
+ ns_id='test_nsr_id',
+ operation='LT',
+ statistic='AVERAGE',
+ threshold=20.0,
+ vdu_name='cirros_ns-2-cirros_vnfd-VM-1',
+ vnf_member_index='2')
+
if __name__ == '__main__':
unittest.main()
--- /dev/null
+# -*- coding: utf-8 -*-
+
+# Copyright 2018 Whitestack, LLC
+# *************************************************************
+
+# This file is part of OSM Monitoring module
+# All Rights Reserved to Whitestack, LLC
+
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+
+# http://www.apache.org/licenses/LICENSE-2.0
+
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+# For those usages not covered by the Apache License, Version 2.0 please
+# contact: bdiaz@whitestack.com or glavado@whitestack.com
+##
--- /dev/null
+# -*- coding: utf-8 -*-
+
+# Copyright 2018 Whitestack, LLC
+# *************************************************************
+
+# This file is part of OSM Monitoring module
+# All Rights Reserved to Whitestack, LLC
+
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+
+# http://www.apache.org/licenses/LICENSE-2.0
+
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+# For those usages not covered by the Apache License, Version 2.0 please
+# contact: bdiaz@whitestack.com or glavado@whitestack.com
+##
+import asyncio
+from unittest import TestCase, mock
+
+from osm_policy_module.alarming.service import AlarmingService
+from osm_policy_module.common.common_db_client import CommonDbClient
+from osm_policy_module.common.lcm_client import LcmClient
+from osm_policy_module.common.mon_client import MonClient
+from osm_policy_module.core.config import Config
+from osm_policy_module.core.database import VnfAlarmRepository
+
+
+@mock.patch.object(LcmClient, "__init__", lambda *args, **kwargs: None)
+@mock.patch.object(MonClient, "__init__", lambda *args, **kwargs: None)
+@mock.patch.object(CommonDbClient, "__init__", lambda *args, **kwargs: None)
+class TestAlarmingService(TestCase):
+
+ def setUp(self):
+ self.config = Config()
+ self.loop = asyncio.new_event_loop()
+
+ @mock.patch.object(VnfAlarmRepository, 'get')
+ @mock.patch('requests.post')
+ @mock.patch('osm_policy_module.core.database.db')
+ def test_handle_alarm(self, database, requests_post, get_alarm):
+ mock_alarm = self._build_mock_alarm('test_id')
+ get_alarm.return_value = mock_alarm
+
+ service = AlarmingService(self.config)
+ self.loop.run_until_complete(service.handle_alarm('test_id', 'alarm', {}))
+ requests_post.assert_called_once_with(json='{}', url='http://alarm-url/')
+
+ requests_post.reset_mock()
+ self.loop.run_until_complete(service.handle_alarm('test_id', 'ok', {}))
+ requests_post.assert_called_once_with(json='{}', url='http://ok-url/')
+
+ requests_post.reset_mock()
+ self.loop.run_until_complete(service.handle_alarm('test_id', 'insufficient-data', {}))
+ requests_post.assert_called_once_with(json='{}', url='http://insufficient-data-url/')
+
+ @mock.patch.object(VnfAlarmRepository, 'get')
+ @mock.patch('requests.post')
+ @mock.patch('osm_policy_module.core.database.db')
+ def test_handle_alarm_unknown_status(self, database, requests_post, get_alarm):
+ mock_alarm = self._build_mock_alarm('test_id')
+ get_alarm.return_value = mock_alarm
+
+ service = AlarmingService(self.config)
+ self.loop.run_until_complete(service.handle_alarm('test_id', 'unknown', {}))
+ requests_post.assert_not_called()
+
+ def _build_mock_alarm(self,
+ alarm_id='test_id',
+ alarm_url='http://alarm-url/',
+ insufficient_data_url='http://insufficient-data-url/',
+ ok_url='http://ok-url/'):
+ mock_alarm = mock.Mock()
+ mock_alarm.alarm_id = alarm_id
+ insufficient_data_action = mock.Mock()
+ insufficient_data_action.type = 'insufficient-data'
+ insufficient_data_action.url = insufficient_data_url
+ alarm_action = mock.Mock()
+ alarm_action.type = 'alarm'
+ alarm_action.url = alarm_url
+ ok_action = mock.Mock()
+ ok_action.type = 'ok'
+ ok_action.url = ok_url
+ mock_alarm.actions = [insufficient_data_action, alarm_action, ok_action]
+ return mock_alarm
--- /dev/null
+# -*- coding: utf-8 -*-
+
+# Copyright 2018 Whitestack, LLC
+# *************************************************************
+
+# This file is part of OSM Monitoring module
+# All Rights Reserved to Whitestack, LLC
+
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+
+# http://www.apache.org/licenses/LICENSE-2.0
+
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+# For those usages not covered by the Apache License, Version 2.0 please
+# contact: bdiaz@whitestack.com or glavado@whitestack.com
+##
--- /dev/null
+# -*- coding: utf-8 -*-
+
+# Copyright 2018 Whitestack, LLC
+# *************************************************************
+
+# This file is part of OSM Monitoring module
+# All Rights Reserved to Whitestack, LLC
+
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+
+# http://www.apache.org/licenses/LICENSE-2.0
+
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+# For those usages not covered by the Apache License, Version 2.0 please
+# contact: bdiaz@whitestack.com or glavado@whitestack.com
+##
+import asyncio
+import datetime
+from unittest import TestCase, mock
+
+from osm_policy_module.autoscaling.service import AutoscalingService
+from osm_policy_module.common.common_db_client import CommonDbClient
+from osm_policy_module.common.lcm_client import LcmClient
+from osm_policy_module.common.mon_client import MonClient
+from osm_policy_module.core.config import Config
+from osm_policy_module.core.database import ScalingAlarmRepository
+
+
+@mock.patch.object(LcmClient, "__init__", lambda *args, **kwargs: None)
+@mock.patch.object(MonClient, "__init__", lambda *args, **kwargs: None)
+@mock.patch.object(CommonDbClient, "__init__", lambda *args, **kwargs: None)
+class TestAutoscalingService(TestCase):
+
+ def setUp(self):
+ self.config = Config()
+ self.loop = asyncio.new_event_loop()
+
+ @mock.patch.object(ScalingAlarmRepository, 'get')
+ @mock.patch('osm_policy_module.core.database.db')
+ def test_update_alarm_status(self, database, get_alarm):
+ mock_alarm = mock.Mock()
+ mock_alarm.last_status = 'insufficient_data'
+ get_alarm.return_value = mock_alarm
+
+ service = AutoscalingService(self.config)
+ self.loop.run_until_complete(service.update_alarm_status('test_uuid', 'alarm'))
+ self.assertEqual(mock_alarm.last_status, 'alarm')
+ mock_alarm.save.assert_called_with()
+
+ service = AutoscalingService(self.config)
+ self.loop.run_until_complete(service.update_alarm_status('test_uuid', 'ok'))
+ self.assertEqual(mock_alarm.last_status, 'ok')
+ mock_alarm.save.assert_called_with()
+
+ service = AutoscalingService(self.config)
+ self.loop.run_until_complete(service.update_alarm_status('test_uuid', 'insufficient_data'))
+ self.assertEqual(mock_alarm.last_status, 'insufficient_data')
+ mock_alarm.save.assert_called_with()
+
+ @mock.patch.object(ScalingAlarmRepository, 'list')
+ @mock.patch.object(ScalingAlarmRepository, 'get')
+ @mock.patch('osm_policy_module.core.database.db')
+ def test_evaluate_policy_not_enabled(self, database, get_alarm, list_alarms):
+ mock_alarm = mock.Mock()
+ mock_alarm.scaling_criteria.scaling_policy.enabled = False
+ get_alarm.return_value = mock_alarm
+
+ service = AutoscalingService(self.config)
+ self.loop.run_until_complete(service.evaluate_policy('test_uuid'))
+ list_alarms.assert_not_called()
+
+ @mock.patch.object(ScalingAlarmRepository, 'list')
+ @mock.patch.object(ScalingAlarmRepository, 'get')
+ @mock.patch.object(LcmClient, 'scale')
+ @mock.patch('osm_policy_module.core.database.db')
+ def test_evaluate_policy_scale_in_and_equal(self, database, scale, get_alarm, list_alarms):
+ """
+ Tests scale in with AND operation, both alarms triggered
+ """
+ future = asyncio.Future(loop=self.loop)
+ future.set_result('mock')
+ scale.return_value = future
+
+ mock_alarm = self._build_mock_alarm(action='scale_in', last_status='alarm', enabled=True, scale_in_op='AND')
+ get_alarm.return_value = mock_alarm
+
+ mock_alarm_2 = self._build_mock_alarm(action='scale_in', last_status='alarm', enabled=True, scale_in_op='AND')
+
+ list_alarms.return_value = [mock_alarm, mock_alarm_2]
+
+ service = AutoscalingService(self.config)
+ self.loop.run_until_complete(service.evaluate_policy('test_uuid'))
+ scale.assert_called_with('test_nsr_id', 'test_group', '1', 'scale_in')
+
+ @mock.patch.object(ScalingAlarmRepository, 'list')
+ @mock.patch.object(ScalingAlarmRepository, 'get')
+ @mock.patch.object(LcmClient, 'scale')
+ @mock.patch('osm_policy_module.core.database.db')
+ def test_evaluate_policy_scale_in_and_diff(self, database, scale, get_alarm, list_alarms):
+ """
+ Tests scale in with AND operation, only one alarm triggered.
+ """
+ future = asyncio.Future(loop=self.loop)
+ future.set_result('mock')
+ scale.return_value = future
+
+ mock_alarm = self._build_mock_alarm(action='scale_in', last_status='alarm', enabled=True, scale_in_op='AND')
+ get_alarm.return_value = mock_alarm
+
+ mock_alarm_2 = self._build_mock_alarm(action='scale_in', last_status='ok', enabled=True, scale_in_op='OR')
+
+ list_alarms.return_value = [mock_alarm, mock_alarm_2]
+
+ service = AutoscalingService(self.config)
+ self.loop.run_until_complete(service.evaluate_policy('test_uuid'))
+ scale.assert_not_called()
+
+ @mock.patch.object(ScalingAlarmRepository, 'list')
+ @mock.patch.object(ScalingAlarmRepository, 'get')
+ @mock.patch.object(LcmClient, 'scale')
+ @mock.patch('osm_policy_module.core.database.db')
+ def test_evaluate_policy_scale_in_or_equal(self, database, scale, get_alarm, list_alarms):
+ """
+ Tests scale in with OR operation, both alarms triggered
+ """
+ future = asyncio.Future(loop=self.loop)
+ future.set_result('mock')
+ scale.return_value = future
+
+ mock_alarm = self._build_mock_alarm(action='scale_in', last_status='alarm', enabled=True, scale_in_op='OR')
+ get_alarm.return_value = mock_alarm
+
+ mock_alarm_2 = self._build_mock_alarm(action='scale_in', last_status='alarm', enabled=True, scale_in_op='OR')
+
+ list_alarms.return_value = [mock_alarm, mock_alarm_2]
+
+ service = AutoscalingService(self.config)
+ self.loop.run_until_complete(service.evaluate_policy('test_uuid'))
+ scale.assert_called_with('test_nsr_id', 'test_group', '1', 'scale_in')
+
+ @mock.patch.object(ScalingAlarmRepository, 'list')
+ @mock.patch.object(ScalingAlarmRepository, 'get')
+ @mock.patch.object(LcmClient, 'scale')
+ @mock.patch('osm_policy_module.core.database.db')
+ def test_evaluate_policy_scale_in_or_diff(self, database, scale, get_alarm, list_alarms):
+ """
+ Tests scale in with OR operation, only one alarm triggered
+ """
+ future = asyncio.Future(loop=self.loop)
+ future.set_result('mock')
+ scale.return_value = future
+
+ mock_alarm = self._build_mock_alarm(action='scale_in', last_status='alarm', enabled=True, scale_in_op='OR')
+ get_alarm.return_value = mock_alarm
+
+ mock_alarm_2 = self._build_mock_alarm(action='scale_in', last_status='ok', enabled=True, scale_in_op='OR')
+
+ list_alarms.return_value = [mock_alarm, mock_alarm_2]
+
+ service = AutoscalingService(self.config)
+ self.loop.run_until_complete(service.evaluate_policy('test_uuid'))
+ scale.assert_called_with('test_nsr_id', 'test_group', '1', 'scale_in')
+
+ @mock.patch.object(ScalingAlarmRepository, 'list')
+ @mock.patch.object(ScalingAlarmRepository, 'get')
+ @mock.patch.object(LcmClient, 'scale')
+ @mock.patch('osm_policy_module.core.database.db')
+ def test_evaluate_policy_scale_out_and_equal(self, database, scale, get_alarm, list_alarms):
+ """
+ Tests scale out with AND operation, both alarms triggered
+ """
+ future = asyncio.Future(loop=self.loop)
+ future.set_result('mock')
+ scale.return_value = future
+
+ mock_alarm = self._build_mock_alarm(action='scale_out', last_status='alarm', enabled=True, scale_out_op='AND')
+ get_alarm.return_value = mock_alarm
+
+ mock_alarm_2 = self._build_mock_alarm(action='scale_out', last_status='alarm', enabled=True, scale_out_op='AND')
+
+ list_alarms.return_value = [mock_alarm, mock_alarm_2]
+
+ service = AutoscalingService(self.config)
+ self.loop.run_until_complete(service.evaluate_policy('test_uuid'))
+ scale.assert_called_with('test_nsr_id', 'test_group', '1', 'scale_out')
+
+ @mock.patch.object(ScalingAlarmRepository, 'list')
+ @mock.patch.object(ScalingAlarmRepository, 'get')
+ @mock.patch.object(LcmClient, 'scale')
+ @mock.patch('osm_policy_module.core.database.db')
+ def test_evaluate_policy_scale_out_and_diff(self, database, scale, get_alarm, list_alarms):
+ """
+ Tests scale out with AND operation, only one alarm triggered.
+ """
+ future = asyncio.Future(loop=self.loop)
+ future.set_result('mock')
+ scale.return_value = future
+
+ mock_alarm = self._build_mock_alarm(action='scale_out', last_status='alarm', enabled=True, scale_out_op='AND')
+ get_alarm.return_value = mock_alarm
+
+ mock_alarm_2 = self._build_mock_alarm(action='scale_out', last_status='ok', enabled=True, scale_out_op='OR')
+
+ list_alarms.return_value = [mock_alarm, mock_alarm_2]
+
+ service = AutoscalingService(self.config)
+ self.loop.run_until_complete(service.evaluate_policy('test_uuid'))
+ scale.assert_not_called()
+
+ @mock.patch.object(ScalingAlarmRepository, 'list')
+ @mock.patch.object(ScalingAlarmRepository, 'get')
+ @mock.patch.object(LcmClient, 'scale')
+ @mock.patch('osm_policy_module.core.database.db')
+ def test_evaluate_policy_scale_out_or_equal(self, database, scale, get_alarm, list_alarms):
+ """
+ Tests scale out with OR operation, both alarms triggered
+ """
+ future = asyncio.Future(loop=self.loop)
+ future.set_result('mock')
+ scale.return_value = future
+
+ mock_alarm = self._build_mock_alarm(action='scale_out', last_status='alarm', enabled=True, scale_out_op='OR')
+ get_alarm.return_value = mock_alarm
+
+ mock_alarm_2 = self._build_mock_alarm(action='scale_out', last_status='alarm', enabled=True, scale_out_op='OR')
+
+ list_alarms.return_value = [mock_alarm, mock_alarm_2]
+
+ service = AutoscalingService(self.config)
+ self.loop.run_until_complete(service.evaluate_policy('test_uuid'))
+ scale.assert_called_with('test_nsr_id', 'test_group', '1', 'scale_out')
+
+ @mock.patch.object(ScalingAlarmRepository, 'list')
+ @mock.patch.object(ScalingAlarmRepository, 'get')
+ @mock.patch.object(LcmClient, 'scale')
+ @mock.patch('osm_policy_module.core.database.db')
+ def test_evaluate_policy_scale_out_or_diff(self, database, scale, get_alarm, list_alarms):
+ """
+ Tests scale out with OR operation, only one alarm triggered
+ """
+ future = asyncio.Future(loop=self.loop)
+ future.set_result('mock')
+ scale.return_value = future
+
+ mock_alarm = self._build_mock_alarm(action='scale_out', last_status='alarm', enabled=True, scale_out_op='OR')
+ get_alarm.return_value = mock_alarm
+
+ mock_alarm_2 = self._build_mock_alarm(action='scale_out', last_status='ok', enabled=True, scale_out_op='OR')
+
+ list_alarms.return_value = [mock_alarm, mock_alarm_2]
+
+ service = AutoscalingService(self.config)
+ self.loop.run_until_complete(service.evaluate_policy('test_uuid'))
+ scale.assert_called_with('test_nsr_id', 'test_group', '1', 'scale_out')
+
+ def _build_mock_alarm(self,
+ action='scale_in',
+ last_status='alarm',
+ last_scale=datetime.datetime.min,
+ cooldown_time=10,
+ enabled=True,
+ scale_in_op='AND',
+ scale_out_op='AND'):
+ mock_alarm = mock.Mock()
+ mock_alarm.action = action
+ mock_alarm.last_status = last_status
+ mock_alarm.vnf_member_index = '1'
+ mock_alarm.scaling_criteria.scaling_policy.last_scale = last_scale
+ mock_alarm.scaling_criteria.scaling_policy.cooldown_time = cooldown_time
+ mock_alarm.scaling_criteria.scaling_policy.enabled = enabled
+ mock_alarm.scaling_criteria.scaling_policy.scale_in_operation = scale_in_op
+ mock_alarm.scaling_criteria.scaling_policy.scale_out_operation = scale_out_op
+ mock_alarm.scaling_criteria.scaling_policy.scaling_group.nsr_id = 'test_nsr_id'
+ mock_alarm.scaling_criteria.scaling_policy.scaling_group.name = 'test_group'
+ return mock_alarm
# contact: bdiaz@whitestack.com or glavado@whitestack.com
##
import asyncio
-import datetime
import unittest
from unittest import mock
-from unittest.mock import Mock
-from osm_policy_module.autoscaling.agent import PolicyModuleAgent
-from osm_policy_module.autoscaling.service import Service
+from osm_policy_module.alarming.service import AlarmingService
+from osm_policy_module.autoscaling.service import AutoscalingService
+from osm_policy_module.core.agent import PolicyModuleAgent
from osm_policy_module.core.config import Config
def setUp(self):
self.loop = asyncio.new_event_loop()
+ @mock.patch('osm_policy_module.alarming.service.CommonDbClient')
+ @mock.patch('osm_policy_module.alarming.service.MonClient')
+ @mock.patch('osm_policy_module.alarming.service.LcmClient')
@mock.patch('osm_policy_module.autoscaling.service.CommonDbClient')
@mock.patch('osm_policy_module.autoscaling.service.MonClient')
@mock.patch('osm_policy_module.autoscaling.service.LcmClient')
- @mock.patch.object(Service, 'configure_scaling_groups')
- @mock.patch.object(Service, 'delete_orphaned_alarms')
- def test_handle_instantiated(self, delete_orphaned_alarms, configure_scaling_groups, lcm_client,
- mon_client, db_client):
+ @mock.patch.object(AutoscalingService, 'configure_scaling_groups')
+ @mock.patch.object(AlarmingService, 'configure_vnf_alarms')
+ @mock.patch.object(AutoscalingService, 'delete_orphaned_alarms')
+ def test_handle_instantiated(self,
+ delete_orphaned_alarms,
+ configure_vnf_alarms,
+ configure_scaling_groups,
+ autoscaling_lcm_client,
+ autoscaling_mon_client,
+ autoscaling_db_client,
+ alarming_lcm_client,
+ alarming_mon_client,
+ alarming_db_client):
async def mock_configure_scaling_groups(nsr_id):
pass
+ async def mock_configure_vnf_alarms(nsr_id):
+ pass
+
async def mock_delete_orphaned_alarms(nsr_id):
pass
config = Config()
agent = PolicyModuleAgent(config, self.loop)
- assert lcm_client.called
- assert mon_client.called
- assert db_client.called
+ assert autoscaling_lcm_client.called
+ assert autoscaling_mon_client.called
+ assert autoscaling_db_client.called
+ assert alarming_lcm_client.called
+ assert alarming_mon_client.called
+ assert alarming_db_client.called
content = {
'nslcmop_id': 'test_id',
}
'nsInstanceId': 'test_nsr_id'
}
configure_scaling_groups.side_effect = mock_configure_scaling_groups
+ configure_vnf_alarms.side_effect = mock_configure_vnf_alarms
delete_orphaned_alarms.side_effect = mock_delete_orphaned_alarms
- db_client.return_value.get_nslcmop.return_value = nslcmop_completed
+ autoscaling_db_client.return_value.get_nslcmop.return_value = nslcmop_completed
self.loop.run_until_complete(agent._handle_instantiated(content))
configure_scaling_groups.assert_called_with('test_nsr_id')
configure_scaling_groups.reset_mock()
- db_client.return_value.get_nslcmop.return_value = nslcmop_failed
+ autoscaling_db_client.return_value.get_nslcmop.return_value = nslcmop_failed
self.loop.run_until_complete(agent._handle_instantiated(content))
configure_scaling_groups.assert_not_called()
@mock.patch('osm_policy_module.autoscaling.service.CommonDbClient')
@mock.patch('osm_policy_module.autoscaling.service.MonClient')
@mock.patch('osm_policy_module.autoscaling.service.LcmClient')
- @mock.patch('osm_policy_module.core.database.db')
- @mock.patch.object(Service, 'get_alarm')
- def test_handle_alarm_notification(self, get_alarm, db, lcm_client, mon_client, db_client):
- async def mock_scale(nsr_id, scaling_group_name, vnf_member_index, action):
+ @mock.patch('osm_policy_module.alarming.service.CommonDbClient')
+ @mock.patch('osm_policy_module.alarming.service.MonClient')
+ @mock.patch('osm_policy_module.alarming.service.LcmClient')
+ @mock.patch.object(AutoscalingService, 'handle_alarm')
+ @mock.patch.object(AlarmingService, 'handle_alarm')
+ def test_handle_alarm_notification(self,
+ alarming_handle_alarm,
+ autoscaling_handle_alarm,
+ autoscaling_lcm_client,
+ autoscaling_mon_client,
+ autoscaling_db_client,
+ alarming_lcm_client,
+ alarming_mon_client,
+ alarming_db_client):
+ async def mock_handle_alarm(alarm_uuid, status, payload=None):
pass
config = Config()
agent = PolicyModuleAgent(config, self.loop)
- assert lcm_client.called
- assert mon_client.called
- assert db_client.called
+ assert autoscaling_lcm_client.called
+ assert autoscaling_mon_client.called
+ assert autoscaling_db_client.called
+ assert alarming_lcm_client.called
+ assert alarming_mon_client.called
+ assert alarming_db_client.called
content = {
'notify_details': {
'alarm_uuid': 'test_alarm_uuid',
'threshold_value': 'test_threshold_value',
'vdu_name': 'test_vdu_name',
'vnf_member_index': 'test_vnf_member_index',
- 'ns_id': 'test_nsr_id'
+ 'ns_id': 'test_nsr_id',
+ 'status': 'alarm'
}
}
- mock_alarm = Mock()
- mock_alarm.vnf_member_index = 1
- mock_alarm.action = 'scale_out'
- mock_scaling_criteria = Mock()
- mock_scaling_policy = Mock()
- mock_scaling_group = Mock()
- mock_scaling_group.nsr_id = 'test_nsr_id'
- mock_scaling_group.name = 'test_name'
- mock_scaling_policy.cooldown_time = 60
- mock_scaling_policy.scaling_group = mock_scaling_group
- mock_scaling_criteria.scaling_policy = mock_scaling_policy
- mock_alarm.scaling_criteria = mock_scaling_criteria
- get_alarm.return_value = mock_alarm
- lcm_client.return_value.scale.side_effect = mock_scale
-
- mock_scaling_policy.last_scale = datetime.datetime.now() - datetime.timedelta(minutes=90)
-
- self.loop.run_until_complete(agent._handle_alarm_notification(content))
- lcm_client.return_value.scale.assert_called_with('test_nsr_id', 'test_name', 1, 'scale_out')
- lcm_client.return_value.scale.reset_mock()
+ autoscaling_handle_alarm.side_effect = mock_handle_alarm
+ alarming_handle_alarm.side_effect = mock_handle_alarm
- mock_scaling_policy.last_scale = datetime.datetime.now()
self.loop.run_until_complete(agent._handle_alarm_notification(content))
- lcm_client.return_value.scale.assert_not_called()
+ autoscaling_handle_alarm.assert_called_with('test_alarm_uuid', 'alarm')
+ alarming_handle_alarm.assert_called_with('test_alarm_uuid', 'alarm', content)
if __name__ == '__main__':
pyyaml==3.*
pymysql==0.9.*
peewee-migrate==1.1.*
+requests
git+https://osm.etsi.org/gerrit/osm/common.git@v5.0#egg=osm-common