X-Git-Url: https://osm.etsi.org/gitweb/?a=blobdiff_plain;f=osm_policy_module%2Fautoscaling%2Fservice.py;h=57756b7aac0fc797dc14f64fcbc0c0d13df2d81d;hb=refs%2Ftags%2Fv9.1.1;hp=8bb17e48dc4b6b4a71f54c4ac3c46077f7085736;hpb=87c4af90c63a6b53f92fd84f20c2a29187db5765;p=osm%2FPOL.git diff --git a/osm_policy_module/autoscaling/service.py b/osm_policy_module/autoscaling/service.py index 8bb17e4..57756b7 100644 --- a/osm_policy_module/autoscaling/service.py +++ b/osm_policy_module/autoscaling/service.py @@ -1,4 +1,5 @@ # -*- coding: utf-8 -*- +# pylint: disable=no-member # Copyright 2018 Whitestack, LLC # ************************************************************* @@ -34,7 +35,6 @@ from osm_policy_module.core.config import Config from osm_policy_module.core.database import ScalingGroup, ScalingAlarm, ScalingPolicy, ScalingCriteria, \ ScalingAlarmRepository, ScalingGroupRepository, ScalingPolicyRepository, ScalingCriteriaRepository from osm_policy_module.core.exceptions import VdurNotFound -from osm_policy_module.utils.vnfd import VnfdUtils log = logging.getLogger(__name__) @@ -67,122 +67,37 @@ class AutoscalingService: for vnfr in vnfrs: log.debug("Processing vnfr: %s", vnfr) vnfd = self.db_client.get_vnfd(vnfr['vnfd-id']) - if 'scaling-group-descriptor' not in vnfd: - log.debug("No scaling group present in vnfd") + # TODO: Change for multiple DF support + df = vnfd.get('df', [{}])[0] + if 'scaling-aspect' not in df: + log.debug("No scaling aspect present in vnfd") continue - scaling_groups = vnfd['scaling-group-descriptor'] - vnf_monitoring_params = vnfd['monitoring-param'] - for scaling_group in scaling_groups: - try: - scaling_group_record = ScalingGroupRepository.get( - ScalingGroup.nsr_id == nsr_id, - ScalingGroup.vnf_member_index == vnfr['member-vnf-index-ref'], - ScalingGroup.name == scaling_group['name'] - ) - log.debug("Found existing scaling group record in DB...") - except ScalingGroup.DoesNotExist: - log.debug("Creating scaling group record in DB...") - scaling_group_record = ScalingGroupRepository.create( - nsr_id=nsr_id, - vnf_member_index=vnfr['member-vnf-index-ref'], - name=scaling_group['name'], - content=json.dumps(scaling_group) - ) - log.debug( - "Created scaling group record in DB : nsr_id=%s, vnf_member_index=%s, name=%s", - scaling_group_record.nsr_id, - scaling_group_record.vnf_member_index, - scaling_group_record.name) - for scaling_policy in scaling_group['scaling-policy']: + # TODO: Change for multiple instantiation levels support + scaling_aspects = df['scaling-aspect'] + all_vnfd_monitoring_params = self._get_all_vnfd_monitoring_params(vnfd) + for scaling_aspect in scaling_aspects: + scaling_group_record = self._get_or_create_scaling_group(nsr_id, + vnfr['member-vnf-index-ref'], + scaling_aspect) + vdurs = self._get_monitored_vdurs(scaling_aspect, vnfr['vdur']) + for scaling_policy in scaling_aspect.get('scaling-policy', ()): if scaling_policy['scaling-type'] != 'automatic': continue - try: - scaling_policy_record = ScalingPolicyRepository.get( - ScalingPolicy.name == scaling_policy['name'], - ScalingGroup.id == scaling_group_record.id, - join_classes=[ScalingGroup] - ) - log.debug("Found existing scaling policy record in DB...") - except ScalingPolicy.DoesNotExist: - log.debug("Creating scaling policy record in DB...") - scaling_policy_record = ScalingPolicyRepository.create( - nsr_id=nsr_id, - name=scaling_policy['name'], - cooldown_time=scaling_policy['cooldown-time'], - scaling_group=scaling_group_record, - ) - if 'scale-in-operation-type' in scaling_policy: - scaling_policy_record.scale_in_operation = scaling_policy[ - 'scale-in-operation-type'] - if 'scale-out-operation-type' in scaling_policy: - scaling_policy_record.scale_out_operation = scaling_policy[ - 'scale-out-operation-type'] - if 'enabled' in scaling_policy: - scaling_policy_record.enabled = scaling_policy['enabled'] - scaling_policy_record.save() - log.debug("Created scaling policy record in DB : name=%s, scaling_group.name=%s", - scaling_policy_record.name, - scaling_policy_record.scaling_group.name) + scaling_policy_record = self._get_or_create_scaling_policy(nsr_id, + scaling_policy, + scaling_group_record) for scaling_criteria in scaling_policy['scaling-criteria']: - try: - scaling_criteria_record = ScalingCriteriaRepository.get( - ScalingPolicy.id == scaling_policy_record.id, - ScalingCriteria.name == scaling_criteria['name'], - join_classes=[ScalingPolicy] - ) - log.debug("Found existing scaling criteria record in DB...") - except ScalingCriteria.DoesNotExist: - log.debug("Creating scaling criteria record in DB...") - scaling_criteria_record = ScalingCriteriaRepository.create( - nsr_id=nsr_id, - name=scaling_criteria['name'], - scaling_policy=scaling_policy_record - ) - log.debug( - "Created scaling criteria record in DB : name=%s, scaling_policy.name=%s", - scaling_criteria_record.name, - scaling_criteria_record.scaling_policy.name) - - vnf_monitoring_param = next( - filter( - lambda param: param['id'] == scaling_criteria[ - 'vnf-monitoring-param-ref' - ], - vnf_monitoring_params) + scaling_criteria_record = self._get_or_create_scaling_criteria( + nsr_id, + scaling_criteria, + scaling_policy_record ) - if 'vdu-monitoring-param' in vnf_monitoring_param: - vdurs = list( - filter( - lambda vdur: vdur['vdu-id-ref'] == vnf_monitoring_param - ['vdu-monitoring-param'] - ['vdu-ref'], - vnfr['vdur'] - ) - ) - elif 'vdu-metric' in vnf_monitoring_param: - vdurs = list( - filter( - lambda vdur: vdur['vdu-id-ref'] == vnf_monitoring_param - ['vdu-metric'] - ['vdu-ref'], - vnfr['vdur'] - ) - ) - elif 'vnf-metric' in vnf_monitoring_param: - vdu = VnfdUtils.get_mgmt_vdu(vnfd) - vdurs = list( - filter( - lambda vdur: vdur['vdu-id-ref'] == vdu['id'], - vnfr['vdur'] - ) - ) - else: - log.warning( - "Scaling criteria is referring to a vnf-monitoring-param that does not " - "contain a reference to a vdu or vnf metric.") - continue + monitoring_param_ref = scaling_criteria.get('vnf-monitoring-param-ref') + vnf_monitoring_param = all_vnfd_monitoring_params[monitoring_param_ref] + for vdur in vdurs: + vdu_id = vdur['vdu-id-ref'] log.debug("Creating alarm for vdur %s ", vdur) try: ScalingAlarmRepository.get(ScalingAlarm.vdu_name == vdur['name'], @@ -196,31 +111,45 @@ class AutoscalingService: continue except ScalingAlarm.DoesNotExist: pass + metric_name = self._get_metric_name(vnf_monitoring_param) + + db_nsr = self.db_client.get_nsr(nsr_id) + nb_scale_op = 0 + if db_nsr["_admin"].get("scaling-group"): + db_nsr_admin = db_nsr["_admin"]["scaling-group"] + for admin_scale_index, admin_scale_info in enumerate(db_nsr_admin): + if admin_scale_info["name"] == scaling_aspect["name"]: + nb_scale_op = admin_scale_info.get("nb-scale-op", 0) + break + min_instance_count = 1 + for vdu_profile in df.get('vdu-profile', ()): + if vdu_profile.get('id') == vdu_id: + min_instance_count = int(vdu_profile.get('min-number-of-instances ', 1)) + break + if nb_scale_op >= min_instance_count: + alarm_uuid = await self.mon_client.create_alarm( + metric_name=metric_name, + ns_id=nsr_id, + vdu_name=vdur['name'], + vnf_member_index=vnfr['member-vnf-index-ref'], + threshold=scaling_criteria['scale-in-threshold'], + operation=scaling_criteria['scale-in-relational-operation'] + ) + alarm = ScalingAlarmRepository.create( + alarm_uuid=alarm_uuid, + action='scale_in', + vnf_member_index=vnfr['member-vnf-index-ref'], + vdu_name=vdur['name'], + scaling_criteria=scaling_criteria_record + ) + alarms_created.append(alarm) alarm_uuid = await self.mon_client.create_alarm( - metric_name=vnf_monitoring_param['id'], - ns_id=nsr_id, - vdu_name=vdur['name'], - vnf_member_index=vnfr['member-vnf-index-ref'], - threshold=scaling_criteria['scale-in-threshold'], - operation=scaling_criteria['scale-in-relational-operation'], - statistic=vnf_monitoring_param['aggregation-type'] - ) - alarm = ScalingAlarmRepository.create( - alarm_uuid=alarm_uuid, - action='scale_in', - vnf_member_index=vnfr['member-vnf-index-ref'], - vdu_name=vdur['name'], - scaling_criteria=scaling_criteria_record - ) - alarms_created.append(alarm) - alarm_uuid = await self.mon_client.create_alarm( - metric_name=vnf_monitoring_param['id'], + metric_name=metric_name, ns_id=nsr_id, vdu_name=vdur['name'], vnf_member_index=vnfr['member-vnf-index-ref'], threshold=scaling_criteria['scale-out-threshold'], - operation=scaling_criteria['scale-out-relational-operation'], - statistic=vnf_monitoring_param['aggregation-type'] + operation=scaling_criteria['scale-out-relational-operation'] ) alarm = ScalingAlarmRepository.create( alarm_uuid=alarm_uuid, @@ -307,9 +236,6 @@ class AutoscalingService: finally: database.db.close() - def get_nslcmop(self, nslcmop_id): - return self.db_client.get_nslcmop(nslcmop_id) - async def handle_alarm(self, alarm_uuid: str, status: str): await self.update_alarm_status(alarm_uuid, status) await self.evaluate_policy(alarm_uuid) @@ -366,3 +292,111 @@ class AutoscalingService: log.debug("There is no autoscaling action configured for alarm %s.", alarm_uuid) finally: database.db.close() + + def _get_all_vnfd_monitoring_params(self, vnfd): + all_monitoring_params = {} + for ivld in vnfd.get("int-virtual-link-desc", ()): + for mp in ivld.get("monitoring-parameters", ()): + all_monitoring_params[mp.get("id")] = mp + + for vdu in vnfd.get("vdu", ()): + for mp in vdu.get("monitoring-parameter", ()): + all_monitoring_params[mp.get("id")] = mp + + for df in vnfd.get("df", ()): + for mp in df.get("monitoring-parameter", ()): + all_monitoring_params[mp.get("id")] = mp + + return all_monitoring_params + + def _get_or_create_scaling_group(self, nsr_id: str, vnf_member_index: str, scaling_aspect: dict): + try: + scaling_group_record = ScalingGroupRepository.get( + ScalingGroup.nsr_id == nsr_id, + ScalingGroup.vnf_member_index == vnf_member_index, + ScalingGroup.name == scaling_aspect['name'] + ) + log.debug("Found existing scaling group record in DB...") + except ScalingGroup.DoesNotExist: + log.debug("Creating scaling group record in DB...") + scaling_group_record = ScalingGroupRepository.create( + nsr_id=nsr_id, + vnf_member_index=vnf_member_index, + name=scaling_aspect['name'], + content=json.dumps(scaling_aspect) + ) + log.debug( + "Created scaling group record in DB : nsr_id=%s, vnf_member_index=%s, name=%s", + scaling_group_record.nsr_id, + scaling_group_record.vnf_member_index, + scaling_group_record.name) + return scaling_group_record + + def _get_or_create_scaling_policy(self, nsr_id: str, scaling_policy: dict, scaling_group_record: ScalingGroup): + try: + scaling_policy_record = ScalingPolicyRepository.get( + ScalingPolicy.name == scaling_policy['name'], + ScalingGroup.id == scaling_group_record.id, + join_classes=[ScalingGroup] + ) + log.debug("Found existing scaling policy record in DB...") + except ScalingPolicy.DoesNotExist: + log.debug("Creating scaling policy record in DB...") + scaling_policy_record = ScalingPolicyRepository.create( + nsr_id=nsr_id, + name=scaling_policy['name'], + cooldown_time=scaling_policy['cooldown-time'], + scaling_group=scaling_group_record, + ) + if 'scale-in-operation-type' in scaling_policy: + scaling_policy_record.scale_in_operation = scaling_policy['scale-in-operation-type'] + if 'scale-out-operation-type' in scaling_policy: + scaling_policy_record.scale_out_operation = scaling_policy['scale-out-operation-type'] + if 'enabled' in scaling_policy: + scaling_policy_record.enabled = scaling_policy['enabled'] + scaling_policy_record.save() + log.debug("Created scaling policy record in DB : name=%s, scaling_group.name=%s", + scaling_policy_record.name, + scaling_policy_record.scaling_group.name) + return scaling_policy_record + + def _get_or_create_scaling_criteria(self, nsr_id: str, scaling_criteria: dict, + scaling_policy_record: ScalingPolicy): + try: + scaling_criteria_record = ScalingCriteriaRepository.get( + ScalingPolicy.id == scaling_policy_record.id, + ScalingCriteria.name == scaling_criteria['name'], + join_classes=[ScalingPolicy] + ) + log.debug("Found existing scaling criteria record in DB...") + except ScalingCriteria.DoesNotExist: + log.debug("Creating scaling criteria record in DB...") + scaling_criteria_record = ScalingCriteriaRepository.create( + nsr_id=nsr_id, + name=scaling_criteria['name'], + scaling_policy=scaling_policy_record + ) + log.debug( + "Created scaling criteria record in DB : name=%s, scaling_policy.name=%s", + scaling_criteria_record.name, + scaling_criteria_record.scaling_policy.name) + return scaling_criteria_record + + def _get_monitored_vdurs(self, scaling_aspect: dict, vdurs): + all_monitored_vdus = set() + for delta in scaling_aspect.get('aspect-delta-details', {}).get('deltas', ()): + for vdu_delta in delta.get('vdu-delta', ()): + all_monitored_vdus.add(vdu_delta.get('id')) + + monitored_vdurs = list(filter(lambda vdur: vdur['vdu-id-ref'] in all_monitored_vdus, vdurs)) + + if not monitored_vdurs: + log.warning( + "Scaling criteria is referring to a vnf-monitoring-param that does not " + "contain a reference to a vdu or vnf metric.") + return monitored_vdurs + + def _get_metric_name(self, vnf_monitoring_param: dict): + if 'performance-metric' in vnf_monitoring_param: + return vnf_monitoring_param['performance-metric'] + raise ValueError('No metric name found for vnf_monitoring_param %s' % vnf_monitoring_param['id'])