Fixes StopIteration error in multi scaling group descriptor scenarios 59/6659/8
authorBenjamin Diaz <bdiaz@whitestack.com>
Mon, 8 Oct 2018 22:38:49 +0000 (19:38 -0300)
committerBenjamin Diaz <bdiaz@whitestack.com>
Tue, 9 Oct 2018 17:39:57 +0000 (14:39 -0300)
The code was iterating over all vdus in a vnfd, instead of doing
so over the vdu refs inside the scaling-group. This lead to errors
when there was a vdu without the metric declared in the criteria.
It has been replaced to iterate over the vdus in the scaling group
descriptor, which is the correct behaviour.
This commit also fixes an error in the scale function, which was
still using the old db structure.

Signed-off-by: Benjamin Diaz <bdiaz@whitestack.com>
Change-Id: I8b8450534f739e6afecc2602086c868ffaee79bc

docker/Dockerfile
osm_policy_module/cmd/policy_module_agent.py
osm_policy_module/core/agent.py
osm_policy_module/core/config.py

index 4095fa8..ab5b411 100644 (file)
@@ -47,5 +47,6 @@ ENV OSMPOL_DATABASE_PORT 27017
 ENV OSMPOL_SQL_DATABASE_URI sqlite:///mon_sqlite.db
 
 ENV OSMPOL_LOG_LEVEL INFO
+ENV OSMPOL_KAFKA_LOG_LEVEL WARN
 
 CMD osm-policy-agent
index 1b6b93a..c82f006 100644 (file)
@@ -43,7 +43,7 @@ def main():
                         datefmt='%m/%d/%Y %I:%M:%S %p',
                         level=logging.getLevelName(cfg.OSMPOL_LOG_LEVEL))
     kafka_logger = logging.getLogger('kafka')
-    kafka_logger.setLevel(logging.WARN)
+    kafka_logger.setLevel(logging.getLevelName(cfg.OSMPOL_KAFKA_LOG_LEVEL))
     kafka_formatter = logging.Formatter(log_formatter_str)
     kafka_handler = logging.StreamHandler(sys.stdout)
     kafka_handler.setFormatter(kafka_formatter)
index a194b08..dfddb3f 100644 (file)
@@ -100,7 +100,9 @@ class PolicyModuleAgent:
         try:
             alarm = ScalingAlarm.select().where(ScalingAlarm.alarm_id == alarm_id).get()
             log.info("Sending scaling action message for ns: %s", alarm_id)
-            self.lcm_client.scale(alarm.scaling_record.nsr_id, alarm.scaling_record.name, alarm.vnf_member_index,
+            self.lcm_client.scale(alarm.scaling_criteria.scaling_policy.scaling_group.nsr_id,
+                                  alarm.scaling_criteria.scaling_policy.scaling_group.name,
+                                  alarm.vnf_member_index,
                                   alarm.action)
         except ScalingAlarm.DoesNotExist:
             log.info("There is no action configured for alarm %s.", alarm_id)
@@ -120,6 +122,7 @@ class PolicyModuleAgent:
 
     def _configure_scaling_groups(self, nsr_id: str):
         # TODO(diazb): Check for alarm creation on exception and clean resources if needed.
+        # TODO: Add support for non-nfvi metrics
         with database.db.atomic():
             vnfrs = self.db_client.get_vnfrs(nsr_id)
             log.info("Checking %s vnfrs...", len(vnfrs))
@@ -161,31 +164,37 @@ class PolicyModuleAgent:
                                 scaling_group=scaling_group_record
                             )
                             log.info("Created scaling policy record in DB : name=%s, scaling_group.name=%s",
-                                     scaling_policy_record.nsr_id,
+                                     scaling_policy_record.name,
                                      scaling_policy_record.scaling_group.name)
-                        for vdu in vnfd['vdu']:
-                            vdu_monitoring_params = vdu['monitoring-param']
-                            for scaling_criteria in scaling_policy['scaling-criteria']:
-                                try:
-                                    scaling_criteria_record = ScalingCriteria.select().join(ScalingPolicy).where(
-                                        ScalingPolicy.id == scaling_policy_record.id,
-                                        ScalingCriteria.name == scaling_criteria['name']
-                                    ).get()
-                                except ScalingCriteria.DoesNotExist:
-                                    log.info("Creating scaling criteria record in DB...")
-                                    scaling_criteria_record = ScalingCriteria.create(
-                                        nsr_id=nsr_id,
-                                        name=scaling_policy['name'],
-                                        scaling_policy=scaling_policy_record
-                                    )
-                                    log.info(
-                                        "Created scaling criteria record in DB : name=%s, scaling_criteria.name=%s",
-                                        scaling_criteria_record.name,
-                                        scaling_criteria_record.scaling_policy.name)
+
+                        for scaling_criteria in scaling_policy['scaling-criteria']:
+                            try:
+                                scaling_criteria_record = ScalingCriteria.select().join(ScalingPolicy).where(
+                                    ScalingPolicy.id == scaling_policy_record.id,
+                                    ScalingCriteria.name == scaling_criteria['name']
+                                ).get()
+                            except ScalingCriteria.DoesNotExist:
+                                log.info("Creating scaling criteria record in DB...")
+                                scaling_criteria_record = ScalingCriteria.create(
+                                    nsr_id=nsr_id,
+                                    name=scaling_criteria['name'],
+                                    scaling_policy=scaling_policy_record
+                                )
+                                log.info(
+                                    "Created scaling criteria record in DB : name=%s, scaling_policy.name=%s",
+                                    scaling_criteria_record.name,
+                                    scaling_criteria_record.scaling_policy.name)
+
+                            for vdu_ref in scaling_group['vdu']:
                                 vnf_monitoring_param = next(
                                     filter(lambda param: param['id'] == scaling_criteria['vnf-monitoring-param-ref'],
                                            vnf_monitoring_params))
-                                # TODO: Add support for non-nfvi metrics
+                                if not vdu_ref['vdu-id-ref'] == vnf_monitoring_param['vdu-ref']:
+                                    continue
+                                vdu = next(
+                                    filter(lambda vdu: vdu['id'] == vdu_ref['vdu-id-ref'], vnfd['vdu'])
+                                )
+                                vdu_monitoring_params = vdu['monitoring-param']
                                 vdu_monitoring_param = next(
                                     filter(
                                         lambda param: param['id'] == vnf_monitoring_param['vdu-monitoring-param-ref'],
@@ -194,12 +203,11 @@ class PolicyModuleAgent:
                                                     vnfr['vdur']))
                                 for vdur in vdurs:
                                     try:
-                                        ScalingAlarm.select().where(
-                                            ScalingAlarm.vdu_name == vdur['name']
-                                        ).where(
-                                            ScalingAlarm.scaling_criteria.name == scaling_criteria['name']
+                                        ScalingAlarm.select().join(ScalingCriteria).where(
+                                            ScalingAlarm.vdu_name == vdur['name'],
+                                            ScalingCriteria.name == scaling_criteria['name']
                                         ).get()
-                                        log.debug("VDU %s already has an alarm configured")
+                                        log.debug("VDU %s already has an alarm configured", vdur['name'])
                                         continue
                                     except ScalingAlarm.DoesNotExist:
                                         pass
index dab409b..84a1f57 100644 (file)
@@ -67,6 +67,7 @@ class Config(object):
         CfgParam('OSMPOL_DATABASE_PORT', 27017, int),
         CfgParam('OSMPOL_SQL_DATABASE_URI', "sqlite:///mon_sqlite.db", six.text_type),
         CfgParam('OSMPOL_LOG_LEVEL', "INFO", six.text_type),
+        CfgParam('OSMPOL_KAFKA_LOG_LEVEL', "WARN", six.text_type),
     ]
 
     _config_dict = {cfg.key: cfg for cfg in _configuration}