Adds handling of scaled life cycle operation 20/6620/2
authorBenjamin Diaz <bdiaz@whitestack.com>
Wed, 3 Oct 2018 18:52:47 +0000 (15:52 -0300)
committerBenjamin Diaz <bdiaz@whitestack.com>
Thu, 4 Oct 2018 13:57:48 +0000 (10:57 -0300)
Configures alarms on vdus created after a scaling operation, which
correspond to the 'scaled' message, using the same handler used with
the 'instantiated' message, in which a check has been added to avoid
duplicate alarms.
It also fixes the vdu name used when creating the alarm through MON,
which is now the vdur name and not the vdu-id-ref used in the vnfd.
Finally, it adds a check for the scaling-type param in the scaling policy,
only executing the autoscaling logic if it is set to 'automatic'.

Signed-off-by: Benjamin Diaz <bdiaz@whitestack.com>
Change-Id: I3499ebdb5605f80ff73d905fbe3ac61d0d806687

osm_policy_module/cmd/policy_module_agent.py
osm_policy_module/core/agent.py
osm_policy_module/core/database.py
osm_policy_module/tests/integration/test_policy_agent.py

index 24663e5..1b6b93a 100644 (file)
@@ -49,7 +49,7 @@ def main():
     kafka_handler.setFormatter(kafka_formatter)
     kafka_logger.addHandler(kafka_handler)
     log = logging.getLogger(__name__)
-    log.info("Config: %s", cfg)
+    log.info("Config: %s", vars(cfg))
     log.info("Syncing database...")
     db_manager = DatabaseManager()
     db_manager.create_tables()
index 52412a6..a194b08 100644 (file)
@@ -34,16 +34,19 @@ from osm_policy_module.common.lcm_client import LcmClient
 from osm_policy_module.common.mon_client import MonClient
 from osm_policy_module.core import database
 from osm_policy_module.core.config import Config
-from osm_policy_module.core.database import ScalingRecord, ScalingAlarm
+from osm_policy_module.core.database import ScalingGroup, ScalingAlarm, ScalingPolicy, ScalingCriteria
 
 log = logging.getLogger(__name__)
 
+ALLOWED_KAFKA_KEYS = ['instantiated', 'scaled', 'notify_alarm']
+
 
 class PolicyModuleAgent:
     def __init__(self):
         cfg = Config.instance()
         self.db_client = DbClient()
         self.mon_client = MonClient()
+        self.lcm_client = LcmClient()
         self.kafka_server = '{}:{}'.format(cfg.OSMPOL_MESSAGE_HOST,
                                            cfg.OSMPOL_MESSAGE_PORT)
 
@@ -60,58 +63,61 @@ class PolicyModuleAgent:
 
     def _process_msg(self, topic, key, msg):
         try:
-            # Check for ns instantiation
-            if key == 'instantiated':
-                try:
-                    content = json.loads(msg)
-                except JSONDecodeError:
-                    content = yaml.safe_load(msg)
-                log.info("Message arrived with topic: %s, key: %s, msg: %s", topic, key, content)
-                nslcmop_id = content['nslcmop_id']
-                nslcmop = self.db_client.get_nslcmop(nslcmop_id)
-                if nslcmop['operationState'] == 'COMPLETED' or nslcmop['operationState'] == 'PARTIALLY_COMPLETED':
-                    nsr_id = nslcmop['nsInstanceId']
-                    log.info("Configuring scaling groups for network service with nsr_id: %s", nsr_id)
-                    self._configure_scaling_groups(nsr_id)
-                else:
-                    log.info(
-                        "Network service is not in COMPLETED or PARTIALLY_COMPLETED state. "
-                        "Current state is %s. Skipping...",
-                        nslcmop['operationState'])
-
-            if key == 'notify_alarm':
+            log.debug("Message arrived with topic: %s, key: %s, msg: %s", topic, key, msg)
+            if key in ALLOWED_KAFKA_KEYS:
                 try:
                     content = json.loads(msg)
                 except JSONDecodeError:
                     content = yaml.safe_load(msg)
-                log.info("Message arrived with topic: %s, key: %s, msg: %s", topic, key, content)
-                alarm_id = content['notify_details']['alarm_uuid']
-                metric_name = content['notify_details']['metric_name']
-                operation = content['notify_details']['operation']
-                threshold = content['notify_details']['threshold_value']
-                vdu_name = content['notify_details']['vdu_name']
-                vnf_member_index = content['notify_details']['vnf_member_index']
-                ns_id = content['notify_details']['ns_id']
-                log.info(
-                    "Received alarm notification for alarm %s, \
-                    metric %s, \
-                    operation %s, \
-                    threshold %s, \
-                    vdu_name %s, \
-                    vnf_member_index %s, \
-                    ns_id %s ",
-                    alarm_id, metric_name, operation, threshold, vdu_name, vnf_member_index, ns_id)
-                try:
-                    alarm = ScalingAlarm.select().where(ScalingAlarm.alarm_id == alarm_id).get()
-                    lcm_client = LcmClient()
-                    log.info("Sending scaling action message for ns: %s", alarm_id)
-                    lcm_client.scale(alarm.scaling_record.nsr_id, alarm.scaling_record.name, alarm.vnf_member_index,
-                                     alarm.action)
-                except ScalingAlarm.DoesNotExist:
-                    log.info("There is no action configured for alarm %s.", alarm_id)
+
+                if key == 'instantiated' or key == 'scaled':
+                    self._handle_instantiated_or_scaled(content)
+
+                if key == 'notify_alarm':
+                    self._handle_alarm_notification(content)
+            else:
+                log.debug("Key %s is not in ALLOWED_KAFKA_KEYS", key)
         except Exception:
             log.exception("Error consuming message: ")
 
+    def _handle_alarm_notification(self, content):
+        alarm_id = content['notify_details']['alarm_uuid']
+        metric_name = content['notify_details']['metric_name']
+        operation = content['notify_details']['operation']
+        threshold = content['notify_details']['threshold_value']
+        vdu_name = content['notify_details']['vdu_name']
+        vnf_member_index = content['notify_details']['vnf_member_index']
+        ns_id = content['notify_details']['ns_id']
+        log.info(
+            "Received alarm notification for alarm %s, \
+            metric %s, \
+            operation %s, \
+            threshold %s, \
+            vdu_name %s, \
+            vnf_member_index %s, \
+            ns_id %s ",
+            alarm_id, metric_name, operation, threshold, vdu_name, vnf_member_index, ns_id)
+        try:
+            alarm = ScalingAlarm.select().where(ScalingAlarm.alarm_id == alarm_id).get()
+            log.info("Sending scaling action message for ns: %s", alarm_id)
+            self.lcm_client.scale(alarm.scaling_record.nsr_id, alarm.scaling_record.name, alarm.vnf_member_index,
+                                  alarm.action)
+        except ScalingAlarm.DoesNotExist:
+            log.info("There is no action configured for alarm %s.", alarm_id)
+
+    def _handle_instantiated_or_scaled(self, content):
+        nslcmop_id = content['nslcmop_id']
+        nslcmop = self.db_client.get_nslcmop(nslcmop_id)
+        if nslcmop['operationState'] == 'COMPLETED' or nslcmop['operationState'] == 'PARTIALLY_COMPLETED':
+            nsr_id = nslcmop['nsInstanceId']
+            log.info("Configuring scaling groups for network service with nsr_id: %s", nsr_id)
+            self._configure_scaling_groups(nsr_id)
+        else:
+            log.info(
+                "Network service is not in COMPLETED or PARTIALLY_COMPLETED state. "
+                "Current state is %s. Skipping...",
+                nslcmop['operationState'])
+
     def _configure_scaling_groups(self, nsr_id: str):
         # TODO(diazb): Check for alarm creation on exception and clean resources if needed.
         with database.db.atomic():
@@ -123,20 +129,59 @@ class PolicyModuleAgent:
                 scaling_groups = vnfd['scaling-group-descriptor']
                 vnf_monitoring_params = vnfd['monitoring-param']
                 for scaling_group in scaling_groups:
-                    log.info("Creating scaling record in DB...")
-                    scaling_record = ScalingRecord.create(
-                        nsr_id=nsr_id,
-                        name=scaling_group['name'],
-                        content=json.dumps(scaling_group)
-                    )
-                    log.info("Created scaling record in DB : nsr_id=%s, name=%s, content=%s",
-                             scaling_record.nsr_id,
-                             scaling_record.name,
-                             scaling_record.content)
+                    try:
+                        scaling_group_record = ScalingGroup.select().where(
+                            ScalingGroup.nsr_id == nsr_id,
+                            ScalingGroup.name == scaling_group['name']
+                        ).get()
+                    except ScalingGroup.DoesNotExist:
+                        log.info("Creating scaling group record in DB...")
+                        scaling_group_record = ScalingGroup.create(
+                            nsr_id=nsr_id,
+                            name=scaling_group['name'],
+                            content=json.dumps(scaling_group)
+                        )
+                        log.info("Created scaling group record in DB : nsr_id=%s, name=%s, content=%s",
+                                 scaling_group_record.nsr_id,
+                                 scaling_group_record.name,
+                                 scaling_group_record.content)
                     for scaling_policy in scaling_group['scaling-policy']:
-                        for vdur in vnfd['vdu']:
-                            vdu_monitoring_params = vdur['monitoring-param']
+                        if scaling_policy['scaling-type'] != 'automatic':
+                            continue
+                        try:
+                            scaling_policy_record = ScalingPolicy.select().join(ScalingGroup).where(
+                                ScalingPolicy.name == scaling_policy['name'],
+                                ScalingGroup.id == scaling_group_record.id
+                            ).get()
+                        except ScalingPolicy.DoesNotExist:
+                            log.info("Creating scaling policy record in DB...")
+                            scaling_policy_record = ScalingPolicy.create(
+                                nsr_id=nsr_id,
+                                name=scaling_policy['name'],
+                                scaling_group=scaling_group_record
+                            )
+                            log.info("Created scaling policy record in DB : name=%s, scaling_group.name=%s",
+                                     scaling_policy_record.nsr_id,
+                                     scaling_policy_record.scaling_group.name)
+                        for vdu in vnfd['vdu']:
+                            vdu_monitoring_params = vdu['monitoring-param']
                             for scaling_criteria in scaling_policy['scaling-criteria']:
+                                try:
+                                    scaling_criteria_record = ScalingCriteria.select().join(ScalingPolicy).where(
+                                        ScalingPolicy.id == scaling_policy_record.id,
+                                        ScalingCriteria.name == scaling_criteria['name']
+                                    ).get()
+                                except ScalingCriteria.DoesNotExist:
+                                    log.info("Creating scaling criteria record in DB...")
+                                    scaling_criteria_record = ScalingCriteria.create(
+                                        nsr_id=nsr_id,
+                                        name=scaling_policy['name'],
+                                        scaling_policy=scaling_policy_record
+                                    )
+                                    log.info(
+                                        "Created scaling criteria record in DB : name=%s, scaling_criteria.name=%s",
+                                        scaling_criteria_record.name,
+                                        scaling_criteria_record.scaling_policy.name)
                                 vnf_monitoring_param = next(
                                     filter(lambda param: param['id'] == scaling_criteria['vnf-monitoring-param-ref'],
                                            vnf_monitoring_params))
@@ -145,35 +190,48 @@ class PolicyModuleAgent:
                                     filter(
                                         lambda param: param['id'] == vnf_monitoring_param['vdu-monitoring-param-ref'],
                                         vdu_monitoring_params))
-                                alarm_uuid = self.mon_client.create_alarm(
-                                    metric_name=vdu_monitoring_param['nfvi-metric'],
-                                    ns_id=nsr_id,
-                                    vdu_name=vdur['name'],
-                                    vnf_member_index=vnfr['member-vnf-index-ref'],
-                                    threshold=scaling_criteria['scale-in-threshold'],
-                                    operation=scaling_criteria['scale-in-relational-operation'],
-                                    statistic=vnf_monitoring_param['aggregation-type']
-                                )
-                                ScalingAlarm.create(
-                                    alarm_id=alarm_uuid,
-                                    action='scale_in',
-                                    vnf_member_index=int(vnfr['member-vnf-index-ref']),
-                                    vdu_name=vdur['name'],
-                                    scaling_record=scaling_record
-                                )
-                                alarm_uuid = self.mon_client.create_alarm(
-                                    metric_name=vdu_monitoring_param['nfvi-metric'],
-                                    ns_id=nsr_id,
-                                    vdu_name=vdur['name'],
-                                    vnf_member_index=vnfr['member-vnf-index-ref'],
-                                    threshold=scaling_criteria['scale-out-threshold'],
-                                    operation=scaling_criteria['scale-out-relational-operation'],
-                                    statistic=vnf_monitoring_param['aggregation-type']
-                                )
-                                ScalingAlarm.create(
-                                    alarm_id=alarm_uuid,
-                                    action='scale_out',
-                                    vnf_member_index=int(vnfr['member-vnf-index-ref']),
-                                    vdu_name=vdur['name'],
-                                    scaling_record=scaling_record
-                                )
+                                vdurs = list(filter(lambda vdur: vdur['vdu-id-ref'] == vnf_monitoring_param['vdu-ref'],
+                                                    vnfr['vdur']))
+                                for vdur in vdurs:
+                                    try:
+                                        ScalingAlarm.select().where(
+                                            ScalingAlarm.vdu_name == vdur['name']
+                                        ).where(
+                                            ScalingAlarm.scaling_criteria.name == scaling_criteria['name']
+                                        ).get()
+                                        log.debug("VDU %s already has an alarm configured")
+                                        continue
+                                    except ScalingAlarm.DoesNotExist:
+                                        pass
+                                    alarm_uuid = self.mon_client.create_alarm(
+                                        metric_name=vdu_monitoring_param['nfvi-metric'],
+                                        ns_id=nsr_id,
+                                        vdu_name=vdur['name'],
+                                        vnf_member_index=vnfr['member-vnf-index-ref'],
+                                        threshold=scaling_criteria['scale-in-threshold'],
+                                        operation=scaling_criteria['scale-in-relational-operation'],
+                                        statistic=vnf_monitoring_param['aggregation-type']
+                                    )
+                                    ScalingAlarm.create(
+                                        alarm_id=alarm_uuid,
+                                        action='scale_in',
+                                        vnf_member_index=int(vnfr['member-vnf-index-ref']),
+                                        vdu_name=vdur['name'],
+                                        scaling_criteria=scaling_criteria_record
+                                    )
+                                    alarm_uuid = self.mon_client.create_alarm(
+                                        metric_name=vdu_monitoring_param['nfvi-metric'],
+                                        ns_id=nsr_id,
+                                        vdu_name=vdur['name'],
+                                        vnf_member_index=vnfr['member-vnf-index-ref'],
+                                        threshold=scaling_criteria['scale-out-threshold'],
+                                        operation=scaling_criteria['scale-out-relational-operation'],
+                                        statistic=vnf_monitoring_param['aggregation-type']
+                                    )
+                                    ScalingAlarm.create(
+                                        alarm_id=alarm_uuid,
+                                        action='scale_out',
+                                        vnf_member_index=int(vnfr['member-vnf-index-ref']),
+                                        vdu_name=vdur['name'],
+                                        scaling_criteria=scaling_criteria_record
+                                    )
index f9a37c2..212c13b 100644 (file)
@@ -23,7 +23,7 @@
 ##
 import logging
 
-from peewee import CharField, IntegerField, ForeignKeyField, Model, TextField
+from peewee import CharField, IntegerField, ForeignKeyField, Model, TextField, AutoField
 from playhouse.sqlite_ext import SqliteExtDatabase
 
 from osm_policy_module.core.config import Config
@@ -35,29 +35,41 @@ db = SqliteExtDatabase('policy_module.db')
 
 
 class BaseModel(Model):
+    id = AutoField(primary_key=True)
+
     class Meta:
         database = db
 
 
-class ScalingRecord(BaseModel):
+class ScalingGroup(BaseModel):
     nsr_id = CharField()
     name = CharField()
     content = TextField()
 
 
+class ScalingPolicy(BaseModel):
+    name = CharField()
+    scaling_group = ForeignKeyField(ScalingGroup, related_name='scaling_policies')
+
+
+class ScalingCriteria(BaseModel):
+    name = CharField()
+    scaling_policy = ForeignKeyField(ScalingPolicy, related_name='scaling_criterias')
+
+
 class ScalingAlarm(BaseModel):
     alarm_id = CharField()
     action = CharField()
     vnf_member_index = IntegerField()
     vdu_name = CharField()
-    scaling_record = ForeignKeyField(ScalingRecord, related_name='scaling_alarms')
+    scaling_criteria = ForeignKeyField(ScalingCriteria, related_name='scaling_alarms')
 
 
 class DatabaseManager:
     def create_tables(self):
         try:
             db.connect()
-            db.create_tables([ScalingRecord, ScalingAlarm])
+            db.create_tables([ScalingGroup, ScalingPolicy, ScalingCriteria, ScalingAlarm])
             db.close()
         except Exception as e:
             log.exception("Error creating tables: ")
index f7dc65a..bed6eb5 100644 (file)
@@ -35,7 +35,7 @@ from osm_policy_module.common.db_client import DbClient
 from osm_policy_module.common.mon_client import MonClient
 from osm_policy_module.core import database
 from osm_policy_module.core.agent import PolicyModuleAgent
-from osm_policy_module.core.database import ScalingRecord, ScalingAlarm
+from osm_policy_module.core.database import ScalingGroup, ScalingAlarm, ScalingPolicy, ScalingCriteria
 
 log = logging.getLogger()
 log.level = logging.INFO
@@ -402,7 +402,7 @@ vnfd_record_mock = {
 
 test_db = SqliteDatabase(':memory:')
 
-MODELS = [ScalingRecord, ScalingAlarm]
+MODELS = [ScalingGroup, ScalingPolicy, ScalingCriteria, ScalingAlarm]
 
 
 class PolicyModuleAgentTest(unittest.TestCase):
@@ -445,12 +445,32 @@ class PolicyModuleAgentTest(unittest.TestCase):
                                      operation='GT',
                                      statistic='AVERAGE',
                                      threshold=80,
-                                     vdu_name='cirros_vnfd-VM',
+                                     vdu_name='cirros_ns-1-cirros_vnfd-VM-1',
                                      vnf_member_index='1')
-        scaling_record = ScalingRecord.get()
+        create_alarm.assert_any_call(metric_name='average_memory_utilization',
+                                     ns_id='test_nsr_id',
+                                     operation='LT',
+                                     statistic='AVERAGE',
+                                     threshold=20,
+                                     vdu_name='cirros_ns-1-cirros_vnfd-VM-1',
+                                     vnf_member_index='1')
+        create_alarm.assert_any_call(metric_name='average_memory_utilization',
+                                     ns_id='test_nsr_id',
+                                     operation='GT',
+                                     statistic='AVERAGE',
+                                     threshold=80,
+                                     vdu_name='cirros_ns-2-cirros_vnfd-VM-1',
+                                     vnf_member_index='2')
+        create_alarm.assert_any_call(metric_name='average_memory_utilization',
+                                     ns_id='test_nsr_id',
+                                     operation='LT',
+                                     statistic='AVERAGE',
+                                     threshold=20,
+                                     vdu_name='cirros_ns-2-cirros_vnfd-VM-1',
+                                     vnf_member_index='2')
+        scaling_record = ScalingGroup.get()
         self.assertEqual(scaling_record.name, 'scale_cirros_vnfd-VM')
         self.assertEqual(scaling_record.nsr_id, 'test_nsr_id')
-        self.assertIsNotNone(scaling_record)
 
 
 if __name__ == '__main__':