Reformat POL to standardized format
[osm/POL.git] / osm_policy_module / autoscaling / service.py
index 57756b7..2709f64 100644 (file)
@@ -32,15 +32,22 @@ from osm_policy_module.common.lcm_client import LcmClient
 from osm_policy_module.common.mon_client import MonClient
 from osm_policy_module.core import database
 from osm_policy_module.core.config import Config
-from osm_policy_module.core.database import ScalingGroup, ScalingAlarm, ScalingPolicy, ScalingCriteria, \
-    ScalingAlarmRepository, ScalingGroupRepository, ScalingPolicyRepository, ScalingCriteriaRepository
+from osm_policy_module.core.database import (
+    ScalingGroup,
+    ScalingAlarm,
+    ScalingPolicy,
+    ScalingCriteria,
+    ScalingAlarmRepository,
+    ScalingGroupRepository,
+    ScalingPolicyRepository,
+    ScalingCriteriaRepository,
+)
 from osm_policy_module.core.exceptions import VdurNotFound
 
 log = logging.getLogger(__name__)
 
 
 class AutoscalingService:
-
     def __init__(self, config: Config, loop=None):
         self.conf = config
         if not loop:
@@ -56,8 +63,9 @@ class AutoscalingService:
         :param nsr_id: Network service record id
         :return:
         """
-        log.info("Configuring scaling groups for network service with nsr_id: %s",
-                 nsr_id)
+        log.info(
+            "Configuring scaling groups for network service with nsr_id: %s", nsr_id
+        )
         alarms_created = []
         database.db.connect()
         try:
@@ -66,97 +74,156 @@ class AutoscalingService:
                     vnfrs = self.db_client.get_vnfrs(nsr_id)
                     for vnfr in vnfrs:
                         log.debug("Processing vnfr: %s", vnfr)
-                        vnfd = self.db_client.get_vnfd(vnfr['vnfd-id'])
+                        vnfd = self.db_client.get_vnfd(vnfr["vnfd-id"])
                         # TODO: Change for multiple DF support
-                        df = vnfd.get('df', [{}])[0]
-                        if 'scaling-aspect' not in df:
+                        df = vnfd.get("df", [{}])[0]
+                        if "scaling-aspect" not in df:
                             log.debug("No scaling aspect present in vnfd")
                             continue
                         # TODO: Change for multiple instantiation levels support
-                        scaling_aspects = df['scaling-aspect']
-                        all_vnfd_monitoring_params = self._get_all_vnfd_monitoring_params(vnfd)
+                        scaling_aspects = df["scaling-aspect"]
+                        all_vnfd_monitoring_params = (
+                            self._get_all_vnfd_monitoring_params(vnfd)
+                        )
                         for scaling_aspect in scaling_aspects:
-                            scaling_group_record = self._get_or_create_scaling_group(nsr_id,
-                                                                                     vnfr['member-vnf-index-ref'],
-                                                                                     scaling_aspect)
-                            vdurs = self._get_monitored_vdurs(scaling_aspect, vnfr['vdur'])
-                            for scaling_policy in scaling_aspect.get('scaling-policy', ()):
-                                if scaling_policy['scaling-type'] != 'automatic':
+                            scaling_group_record = self._get_or_create_scaling_group(
+                                nsr_id, vnfr["member-vnf-index-ref"], scaling_aspect
+                            )
+                            vdurs = self._get_monitored_vdurs(
+                                scaling_aspect, vnfr["vdur"]
+                            )
+                            for scaling_policy in scaling_aspect.get(
+                                "scaling-policy", ()
+                            ):
+                                if scaling_policy["scaling-type"] != "automatic":
                                     continue
-                                scaling_policy_record = self._get_or_create_scaling_policy(nsr_id,
-                                                                                           scaling_policy,
-                                                                                           scaling_group_record)
-
-                                for scaling_criteria in scaling_policy['scaling-criteria']:
-                                    scaling_criteria_record = self._get_or_create_scaling_criteria(
-                                        nsr_id,
-                                        scaling_criteria,
-                                        scaling_policy_record
+                                scaling_policy_record = (
+                                    self._get_or_create_scaling_policy(
+                                        nsr_id, scaling_policy, scaling_group_record
+                                    )
+                                )
+
+                                for scaling_criteria in scaling_policy[
+                                    "scaling-criteria"
+                                ]:
+                                    scaling_criteria_record = (
+                                        self._get_or_create_scaling_criteria(
+                                            nsr_id,
+                                            scaling_criteria,
+                                            scaling_policy_record,
+                                        )
                                     )
-                                    monitoring_param_ref = scaling_criteria.get('vnf-monitoring-param-ref')
-                                    vnf_monitoring_param = all_vnfd_monitoring_params[monitoring_param_ref]
+                                    monitoring_param_ref = scaling_criteria.get(
+                                        "vnf-monitoring-param-ref"
+                                    )
+                                    vnf_monitoring_param = all_vnfd_monitoring_params[
+                                        monitoring_param_ref
+                                    ]
 
                                     for vdur in vdurs:
-                                        vdu_id = vdur['vdu-id-ref']
+                                        vdu_id = vdur["vdu-id-ref"]
                                         log.debug("Creating alarm for vdur %s ", vdur)
                                         try:
-                                            ScalingAlarmRepository.get(ScalingAlarm.vdu_name == vdur['name'],
-                                                                       ScalingCriteria.name == scaling_criteria['name'],
-                                                                       ScalingPolicy.name == scaling_policy['name'],
-                                                                       ScalingGroup.nsr_id == nsr_id,
-                                                                       join_classes=[ScalingCriteria,
-                                                                                     ScalingPolicy,
-                                                                                     ScalingGroup])
-                                            log.debug("vdu %s already has an alarm configured", vdur['name'])
+                                            ScalingAlarmRepository.get(
+                                                ScalingAlarm.vdu_name == vdur["name"],
+                                                ScalingCriteria.name
+                                                == scaling_criteria["name"],
+                                                ScalingPolicy.name
+                                                == scaling_policy["name"],
+                                                ScalingGroup.nsr_id == nsr_id,
+                                                join_classes=[
+                                                    ScalingCriteria,
+                                                    ScalingPolicy,
+                                                    ScalingGroup,
+                                                ],
+                                            )
+                                            log.debug(
+                                                "vdu %s already has an alarm configured",
+                                                vdur["name"],
+                                            )
                                             continue
                                         except ScalingAlarm.DoesNotExist:
                                             pass
-                                        metric_name = self._get_metric_name(vnf_monitoring_param)
+                                        metric_name = self._get_metric_name(
+                                            vnf_monitoring_param
+                                        )
 
                                         db_nsr = self.db_client.get_nsr(nsr_id)
                                         nb_scale_op = 0
                                         if db_nsr["_admin"].get("scaling-group"):
-                                            db_nsr_admin = db_nsr["_admin"]["scaling-group"]
-                                            for admin_scale_index, admin_scale_info in enumerate(db_nsr_admin):
-                                                if admin_scale_info["name"] == scaling_aspect["name"]:
-                                                    nb_scale_op = admin_scale_info.get("nb-scale-op", 0)
+                                            db_nsr_admin = db_nsr["_admin"][
+                                                "scaling-group"
+                                            ]
+                                            for (
+                                                admin_scale_index,
+                                                admin_scale_info,
+                                            ) in enumerate(db_nsr_admin):
+                                                if (
+                                                    admin_scale_info["name"]
+                                                    == scaling_aspect["name"]
+                                                ):
+                                                    nb_scale_op = admin_scale_info.get(
+                                                        "nb-scale-op", 0
+                                                    )
                                                     break
                                         min_instance_count = 1
-                                        for vdu_profile in df.get('vdu-profile', ()):
-                                            if vdu_profile.get('id') == vdu_id:
-                                                min_instance_count = int(vdu_profile.get('min-number-of-instances ', 1))
+                                        for vdu_profile in df.get("vdu-profile", ()):
+                                            if vdu_profile.get("id") == vdu_id:
+                                                min_instance_count = int(
+                                                    vdu_profile.get(
+                                                        "min-number-of-instances ", 1
+                                                    )
+                                                )
                                                 break
                                         if nb_scale_op >= min_instance_count:
-                                            alarm_uuid = await self.mon_client.create_alarm(
-                                                metric_name=metric_name,
-                                                ns_id=nsr_id,
-                                                vdu_name=vdur['name'],
-                                                vnf_member_index=vnfr['member-vnf-index-ref'],
-                                                threshold=scaling_criteria['scale-in-threshold'],
-                                                operation=scaling_criteria['scale-in-relational-operation']
+                                            alarm_uuid = (
+                                                await self.mon_client.create_alarm(
+                                                    metric_name=metric_name,
+                                                    ns_id=nsr_id,
+                                                    vdu_name=vdur["name"],
+                                                    vnf_member_index=vnfr[
+                                                        "member-vnf-index-ref"
+                                                    ],
+                                                    threshold=scaling_criteria[
+                                                        "scale-in-threshold"
+                                                    ],
+                                                    operation=scaling_criteria[
+                                                        "scale-in-relational-operation"
+                                                    ],
+                                                )
                                             )
                                             alarm = ScalingAlarmRepository.create(
                                                 alarm_uuid=alarm_uuid,
-                                                action='scale_in',
-                                                vnf_member_index=vnfr['member-vnf-index-ref'],
-                                                vdu_name=vdur['name'],
-                                                scaling_criteria=scaling_criteria_record
+                                                action="scale_in",
+                                                vnf_member_index=vnfr[
+                                                    "member-vnf-index-ref"
+                                                ],
+                                                vdu_name=vdur["name"],
+                                                scaling_criteria=scaling_criteria_record,
                                             )
                                             alarms_created.append(alarm)
                                         alarm_uuid = await self.mon_client.create_alarm(
                                             metric_name=metric_name,
                                             ns_id=nsr_id,
-                                            vdu_name=vdur['name'],
-                                            vnf_member_index=vnfr['member-vnf-index-ref'],
-                                            threshold=scaling_criteria['scale-out-threshold'],
-                                            operation=scaling_criteria['scale-out-relational-operation']
+                                            vdu_name=vdur["name"],
+                                            vnf_member_index=vnfr[
+                                                "member-vnf-index-ref"
+                                            ],
+                                            threshold=scaling_criteria[
+                                                "scale-out-threshold"
+                                            ],
+                                            operation=scaling_criteria[
+                                                "scale-out-relational-operation"
+                                            ],
                                         )
                                         alarm = ScalingAlarmRepository.create(
                                             alarm_uuid=alarm_uuid,
-                                            action='scale_out',
-                                            vnf_member_index=vnfr['member-vnf-index-ref'],
-                                            vdu_name=vdur['name'],
-                                            scaling_criteria=scaling_criteria_record
+                                            action="scale_out",
+                                            vnf_member_index=vnfr[
+                                                "member-vnf-index-ref"
+                                            ],
+                                            vdu_name=vdur["name"],
+                                            scaling_criteria=scaling_criteria_record,
                                         )
                                         alarms_created.append(alarm)
 
@@ -170,7 +237,8 @@ class AutoscalingService:
                                 alarm.scaling_criteria.scaling_policy.scaling_group.nsr_id,
                                 alarm.vnf_member_index,
                                 alarm.vdu_name,
-                                alarm.alarm_uuid)
+                                alarm.alarm_uuid,
+                            )
                     raise e
         finally:
             database.db.close()
@@ -181,7 +249,9 @@ class AutoscalingService:
         try:
             with database.db.atomic() as tx:
                 try:
-                    for scaling_group in ScalingGroupRepository.list(ScalingGroup.nsr_id == nsr_id):
+                    for scaling_group in ScalingGroupRepository.list(
+                        ScalingGroup.nsr_id == nsr_id
+                    ):
                         for scaling_policy in scaling_group.scaling_policies:
                             for scaling_criteria in scaling_policy.scaling_criterias:
                                 for alarm in scaling_criteria.scaling_alarms:
@@ -190,9 +260,13 @@ class AutoscalingService:
                                             alarm.scaling_criteria.scaling_policy.scaling_group.nsr_id,
                                             alarm.vnf_member_index,
                                             alarm.vdu_name,
-                                            alarm.alarm_uuid)
+                                            alarm.alarm_uuid,
+                                        )
                                     except ValueError:
-                                        log.exception("Error deleting alarm in MON %s", alarm.alarm_uuid)
+                                        log.exception(
+                                            "Error deleting alarm in MON %s",
+                                            alarm.alarm_uuid,
+                                        )
                                     alarm.delete_instance()
                                 scaling_criteria.delete_instance()
                             scaling_policy.delete_instance()
@@ -211,22 +285,35 @@ class AutoscalingService:
         try:
             with database.db.atomic() as tx:
                 try:
-                    for scaling_group in ScalingGroupRepository.list(ScalingGroup.nsr_id == nsr_id):
+                    for scaling_group in ScalingGroupRepository.list(
+                        ScalingGroup.nsr_id == nsr_id
+                    ):
                         for scaling_policy in scaling_group.scaling_policies:
                             for scaling_criteria in scaling_policy.scaling_criterias:
                                 for alarm in scaling_criteria.scaling_alarms:
                                     try:
-                                        self.db_client.get_vdur(nsr_id, alarm.vnf_member_index, alarm.vdu_name)
+                                        self.db_client.get_vdur(
+                                            nsr_id,
+                                            alarm.vnf_member_index,
+                                            alarm.vdu_name,
+                                        )
                                     except VdurNotFound:
-                                        log.debug("Deleting orphaned scaling alarm %s", alarm.alarm_uuid)
+                                        log.debug(
+                                            "Deleting orphaned scaling alarm %s",
+                                            alarm.alarm_uuid,
+                                        )
                                         try:
                                             await self.mon_client.delete_alarm(
                                                 alarm.scaling_criteria.scaling_policy.scaling_group.nsr_id,
                                                 alarm.vnf_member_index,
                                                 alarm.vdu_name,
-                                                alarm.alarm_uuid)
+                                                alarm.alarm_uuid,
+                                            )
                                         except ValueError:
-                                            log.exception("Error deleting alarm in MON %s", alarm.alarm_uuid)
+                                            log.exception(
+                                                "Error deleting alarm in MON %s",
+                                                alarm.alarm_uuid,
+                                            )
                                         alarm.delete_instance()
 
                 except Exception as e:
@@ -244,11 +331,15 @@ class AutoscalingService:
         database.db.connect()
         try:
             with database.db.atomic():
-                alarm = ScalingAlarmRepository.get(ScalingAlarm.alarm_uuid == alarm_uuid)
+                alarm = ScalingAlarmRepository.get(
+                    ScalingAlarm.alarm_uuid == alarm_uuid
+                )
                 alarm.last_status = status
                 alarm.save()
         except ScalingAlarm.DoesNotExist:
-            log.debug("There is no autoscaling action configured for alarm %s.", alarm_uuid)
+            log.debug(
+                "There is no autoscaling action configured for alarm %s.", alarm_uuid
+            )
         finally:
             database.db.close()
 
@@ -256,40 +347,52 @@ class AutoscalingService:
         database.db.connect()
         try:
             with database.db.atomic():
-                alarm = ScalingAlarmRepository.get(ScalingAlarm.alarm_uuid == alarm_uuid)
+                alarm = ScalingAlarmRepository.get(
+                    ScalingAlarm.alarm_uuid == alarm_uuid
+                )
                 vnf_member_index = alarm.vnf_member_index
                 action = alarm.action
                 scaling_policy = alarm.scaling_criteria.scaling_policy
                 if not scaling_policy.enabled:
                     return
-                if action == 'scale_in':
+                if action == "scale_in":
                     operation = scaling_policy.scale_in_operation
-                elif action == 'scale_out':
+                elif action == "scale_out":
                     operation = scaling_policy.scale_out_operation
                 else:
-                    raise Exception('Unknown alarm action {}'.format(alarm.action))
-                alarms = ScalingAlarmRepository.list(ScalingAlarm.scaling_criteria == alarm.scaling_criteria,
-                                                     ScalingAlarm.action == alarm.action,
-                                                     ScalingAlarm.vnf_member_index == vnf_member_index,
-                                                     ScalingAlarm.vdu_name == alarm.vdu_name)
+                    raise Exception("Unknown alarm action {}".format(alarm.action))
+                alarms = ScalingAlarmRepository.list(
+                    ScalingAlarm.scaling_criteria == alarm.scaling_criteria,
+                    ScalingAlarm.action == alarm.action,
+                    ScalingAlarm.vnf_member_index == vnf_member_index,
+                    ScalingAlarm.vdu_name == alarm.vdu_name,
+                )
                 statuses = []
                 for alarm in alarms:
                     statuses.append(alarm.last_status)
-                if (operation == 'AND' and set(statuses) == {'alarm'}) or (operation == 'OR' and 'alarm' in statuses):
+                if (operation == "AND" and set(statuses) == {"alarm"}) or (
+                    operation == "OR" and "alarm" in statuses
+                ):
                     delta = datetime.datetime.now() - scaling_policy.last_scale
                     if delta.total_seconds() > scaling_policy.cooldown_time:
-                        log.info("Sending %s action message for ns: %s",
-                                 alarm.action,
-                                 scaling_policy.scaling_group.nsr_id)
-                        await self.lcm_client.scale(scaling_policy.scaling_group.nsr_id,
-                                                    scaling_policy.scaling_group.name,
-                                                    vnf_member_index,
-                                                    action)
+                        log.info(
+                            "Sending %s action message for ns: %s",
+                            alarm.action,
+                            scaling_policy.scaling_group.nsr_id,
+                        )
+                        await self.lcm_client.scale(
+                            scaling_policy.scaling_group.nsr_id,
+                            scaling_policy.scaling_group.name,
+                            vnf_member_index,
+                            action,
+                        )
                         scaling_policy.last_scale = datetime.datetime.now()
                         scaling_policy.save()
 
         except ScalingAlarm.DoesNotExist:
-            log.debug("There is no autoscaling action configured for alarm %s.", alarm_uuid)
+            log.debug(
+                "There is no autoscaling action configured for alarm %s.", alarm_uuid
+            )
         finally:
             database.db.close()
 
@@ -309,12 +412,14 @@ class AutoscalingService:
 
         return all_monitoring_params
 
-    def _get_or_create_scaling_group(self, nsr_id: str, vnf_member_index: str, scaling_aspect: dict):
+    def _get_or_create_scaling_group(
+        self, nsr_id: str, vnf_member_index: str, scaling_aspect: dict
+    ):
         try:
             scaling_group_record = ScalingGroupRepository.get(
                 ScalingGroup.nsr_id == nsr_id,
                 ScalingGroup.vnf_member_index == vnf_member_index,
-                ScalingGroup.name == scaling_aspect['name']
+                ScalingGroup.name == scaling_aspect["name"],
             )
             log.debug("Found existing scaling group record in DB...")
         except ScalingGroup.DoesNotExist:
@@ -322,81 +427,98 @@ class AutoscalingService:
             scaling_group_record = ScalingGroupRepository.create(
                 nsr_id=nsr_id,
                 vnf_member_index=vnf_member_index,
-                name=scaling_aspect['name'],
-                content=json.dumps(scaling_aspect)
+                name=scaling_aspect["name"],
+                content=json.dumps(scaling_aspect),
             )
             log.debug(
                 "Created scaling group record in DB : nsr_id=%s, vnf_member_index=%s, name=%s",
                 scaling_group_record.nsr_id,
                 scaling_group_record.vnf_member_index,
-                scaling_group_record.name)
+                scaling_group_record.name,
+            )
         return scaling_group_record
 
-    def _get_or_create_scaling_policy(self, nsr_id: str, scaling_policy: dict, scaling_group_record: ScalingGroup):
+    def _get_or_create_scaling_policy(
+        self, nsr_id: str, scaling_policy: dict, scaling_group_record: ScalingGroup
+    ):
         try:
             scaling_policy_record = ScalingPolicyRepository.get(
-                ScalingPolicy.name == scaling_policy['name'],
+                ScalingPolicy.name == scaling_policy["name"],
                 ScalingGroup.id == scaling_group_record.id,
-                join_classes=[ScalingGroup]
+                join_classes=[ScalingGroup],
             )
             log.debug("Found existing scaling policy record in DB...")
         except ScalingPolicy.DoesNotExist:
             log.debug("Creating scaling policy record in DB...")
             scaling_policy_record = ScalingPolicyRepository.create(
                 nsr_id=nsr_id,
-                name=scaling_policy['name'],
-                cooldown_time=scaling_policy['cooldown-time'],
+                name=scaling_policy["name"],
+                cooldown_time=scaling_policy["cooldown-time"],
                 scaling_group=scaling_group_record,
             )
-            if 'scale-in-operation-type' in scaling_policy:
-                scaling_policy_record.scale_in_operation = scaling_policy['scale-in-operation-type']
-            if 'scale-out-operation-type' in scaling_policy:
-                scaling_policy_record.scale_out_operation = scaling_policy['scale-out-operation-type']
-            if 'enabled' in scaling_policy:
-                scaling_policy_record.enabled = scaling_policy['enabled']
+            if "scale-in-operation-type" in scaling_policy:
+                scaling_policy_record.scale_in_operation = scaling_policy[
+                    "scale-in-operation-type"
+                ]
+            if "scale-out-operation-type" in scaling_policy:
+                scaling_policy_record.scale_out_operation = scaling_policy[
+                    "scale-out-operation-type"
+                ]
+            if "enabled" in scaling_policy:
+                scaling_policy_record.enabled = scaling_policy["enabled"]
             scaling_policy_record.save()
-            log.debug("Created scaling policy record in DB : name=%s, scaling_group.name=%s",
-                      scaling_policy_record.name,
-                      scaling_policy_record.scaling_group.name)
+            log.debug(
+                "Created scaling policy record in DB : name=%s, scaling_group.name=%s",
+                scaling_policy_record.name,
+                scaling_policy_record.scaling_group.name,
+            )
         return scaling_policy_record
 
-    def _get_or_create_scaling_criteria(self, nsr_id: str, scaling_criteria: dict,
-                                        scaling_policy_record: ScalingPolicy):
+    def _get_or_create_scaling_criteria(
+        self, nsr_id: str, scaling_criteria: dict, scaling_policy_record: ScalingPolicy
+    ):
         try:
             scaling_criteria_record = ScalingCriteriaRepository.get(
                 ScalingPolicy.id == scaling_policy_record.id,
-                ScalingCriteria.name == scaling_criteria['name'],
-                join_classes=[ScalingPolicy]
+                ScalingCriteria.name == scaling_criteria["name"],
+                join_classes=[ScalingPolicy],
             )
             log.debug("Found existing scaling criteria record in DB...")
         except ScalingCriteria.DoesNotExist:
             log.debug("Creating scaling criteria record in DB...")
             scaling_criteria_record = ScalingCriteriaRepository.create(
                 nsr_id=nsr_id,
-                name=scaling_criteria['name'],
-                scaling_policy=scaling_policy_record
+                name=scaling_criteria["name"],
+                scaling_policy=scaling_policy_record,
             )
             log.debug(
                 "Created scaling criteria record in DB : name=%s, scaling_policy.name=%s",
                 scaling_criteria_record.name,
-                scaling_criteria_record.scaling_policy.name)
+                scaling_criteria_record.scaling_policy.name,
+            )
         return scaling_criteria_record
 
     def _get_monitored_vdurs(self, scaling_aspect: dict, vdurs):
         all_monitored_vdus = set()
-        for delta in scaling_aspect.get('aspect-delta-details', {}).get('deltas', ()):
-            for vdu_delta in delta.get('vdu-delta', ()):
-                all_monitored_vdus.add(vdu_delta.get('id'))
+        for delta in scaling_aspect.get("aspect-delta-details", {}).get("deltas", ()):
+            for vdu_delta in delta.get("vdu-delta", ()):
+                all_monitored_vdus.add(vdu_delta.get("id"))
 
-        monitored_vdurs = list(filter(lambda vdur: vdur['vdu-id-ref'] in all_monitored_vdus, vdurs))
+        monitored_vdurs = list(
+            filter(lambda vdur: vdur["vdu-id-ref"] in all_monitored_vdus, vdurs)
+        )
 
         if not monitored_vdurs:
             log.warning(
                 "Scaling criteria is referring to a vnf-monitoring-param that does not "
-                "contain a reference to a vdu or vnf metric.")
+                "contain a reference to a vdu or vnf metric."
+            )
         return monitored_vdurs
 
     def _get_metric_name(self, vnf_monitoring_param: dict):
-        if 'performance-metric' in vnf_monitoring_param:
-            return vnf_monitoring_param['performance-metric']
-        raise ValueError('No metric name found for vnf_monitoring_param %s' % vnf_monitoring_param['id'])
+        if "performance-metric" in vnf_monitoring_param:
+            return vnf_monitoring_param["performance-metric"]
+        raise ValueError(
+            "No metric name found for vnf_monitoring_param %s"
+            % vnf_monitoring_param["id"]
+        )