From 29d590a038ce5c8d745f97ab935393c0bd6a8228 Mon Sep 17 00:00:00 2001 From: Benjamin Diaz Date: Wed, 3 Oct 2018 15:52:47 -0300 Subject: [PATCH] Adds handling of scaled life cycle operation Configures alarms on vdus created after a scaling operation, which correspond to the 'scaled' message, using the same handler used with the 'instantiated' message, in which a check has been added to avoid duplicate alarms. It also fixes the vdu name used when creating the alarm through MON, which is now the vdur name and not the vdu-id-ref used in the vnfd. Finally, it adds a check for the scaling-type param in the scaling policy, only executing the autoscaling logic if it is set to 'automatic'. Signed-off-by: Benjamin Diaz Change-Id: I3499ebdb5605f80ff73d905fbe3ac61d0d806687 --- osm_policy_module/cmd/policy_module_agent.py | 2 +- osm_policy_module/core/agent.py | 238 +++++++++++------- osm_policy_module/core/database.py | 20 +- .../tests/integration/test_policy_agent.py | 30 ++- 4 files changed, 190 insertions(+), 100 deletions(-) diff --git a/osm_policy_module/cmd/policy_module_agent.py b/osm_policy_module/cmd/policy_module_agent.py index 24663e5..1b6b93a 100644 --- a/osm_policy_module/cmd/policy_module_agent.py +++ b/osm_policy_module/cmd/policy_module_agent.py @@ -49,7 +49,7 @@ def main(): kafka_handler.setFormatter(kafka_formatter) kafka_logger.addHandler(kafka_handler) log = logging.getLogger(__name__) - log.info("Config: %s", cfg) + log.info("Config: %s", vars(cfg)) log.info("Syncing database...") db_manager = DatabaseManager() db_manager.create_tables() diff --git a/osm_policy_module/core/agent.py b/osm_policy_module/core/agent.py index 52412a6..a194b08 100644 --- a/osm_policy_module/core/agent.py +++ b/osm_policy_module/core/agent.py @@ -34,16 +34,19 @@ from osm_policy_module.common.lcm_client import LcmClient from osm_policy_module.common.mon_client import MonClient from osm_policy_module.core import database from osm_policy_module.core.config import Config -from osm_policy_module.core.database import ScalingRecord, ScalingAlarm +from osm_policy_module.core.database import ScalingGroup, ScalingAlarm, ScalingPolicy, ScalingCriteria log = logging.getLogger(__name__) +ALLOWED_KAFKA_KEYS = ['instantiated', 'scaled', 'notify_alarm'] + class PolicyModuleAgent: def __init__(self): cfg = Config.instance() self.db_client = DbClient() self.mon_client = MonClient() + self.lcm_client = LcmClient() self.kafka_server = '{}:{}'.format(cfg.OSMPOL_MESSAGE_HOST, cfg.OSMPOL_MESSAGE_PORT) @@ -60,58 +63,61 @@ class PolicyModuleAgent: def _process_msg(self, topic, key, msg): try: - # Check for ns instantiation - if key == 'instantiated': - try: - content = json.loads(msg) - except JSONDecodeError: - content = yaml.safe_load(msg) - log.info("Message arrived with topic: %s, key: %s, msg: %s", topic, key, content) - nslcmop_id = content['nslcmop_id'] - nslcmop = self.db_client.get_nslcmop(nslcmop_id) - if nslcmop['operationState'] == 'COMPLETED' or nslcmop['operationState'] == 'PARTIALLY_COMPLETED': - nsr_id = nslcmop['nsInstanceId'] - log.info("Configuring scaling groups for network service with nsr_id: %s", nsr_id) - self._configure_scaling_groups(nsr_id) - else: - log.info( - "Network service is not in COMPLETED or PARTIALLY_COMPLETED state. " - "Current state is %s. Skipping...", - nslcmop['operationState']) - - if key == 'notify_alarm': + log.debug("Message arrived with topic: %s, key: %s, msg: %s", topic, key, msg) + if key in ALLOWED_KAFKA_KEYS: try: content = json.loads(msg) except JSONDecodeError: content = yaml.safe_load(msg) - log.info("Message arrived with topic: %s, key: %s, msg: %s", topic, key, content) - alarm_id = content['notify_details']['alarm_uuid'] - metric_name = content['notify_details']['metric_name'] - operation = content['notify_details']['operation'] - threshold = content['notify_details']['threshold_value'] - vdu_name = content['notify_details']['vdu_name'] - vnf_member_index = content['notify_details']['vnf_member_index'] - ns_id = content['notify_details']['ns_id'] - log.info( - "Received alarm notification for alarm %s, \ - metric %s, \ - operation %s, \ - threshold %s, \ - vdu_name %s, \ - vnf_member_index %s, \ - ns_id %s ", - alarm_id, metric_name, operation, threshold, vdu_name, vnf_member_index, ns_id) - try: - alarm = ScalingAlarm.select().where(ScalingAlarm.alarm_id == alarm_id).get() - lcm_client = LcmClient() - log.info("Sending scaling action message for ns: %s", alarm_id) - lcm_client.scale(alarm.scaling_record.nsr_id, alarm.scaling_record.name, alarm.vnf_member_index, - alarm.action) - except ScalingAlarm.DoesNotExist: - log.info("There is no action configured for alarm %s.", alarm_id) + + if key == 'instantiated' or key == 'scaled': + self._handle_instantiated_or_scaled(content) + + if key == 'notify_alarm': + self._handle_alarm_notification(content) + else: + log.debug("Key %s is not in ALLOWED_KAFKA_KEYS", key) except Exception: log.exception("Error consuming message: ") + def _handle_alarm_notification(self, content): + alarm_id = content['notify_details']['alarm_uuid'] + metric_name = content['notify_details']['metric_name'] + operation = content['notify_details']['operation'] + threshold = content['notify_details']['threshold_value'] + vdu_name = content['notify_details']['vdu_name'] + vnf_member_index = content['notify_details']['vnf_member_index'] + ns_id = content['notify_details']['ns_id'] + log.info( + "Received alarm notification for alarm %s, \ + metric %s, \ + operation %s, \ + threshold %s, \ + vdu_name %s, \ + vnf_member_index %s, \ + ns_id %s ", + alarm_id, metric_name, operation, threshold, vdu_name, vnf_member_index, ns_id) + try: + alarm = ScalingAlarm.select().where(ScalingAlarm.alarm_id == alarm_id).get() + log.info("Sending scaling action message for ns: %s", alarm_id) + self.lcm_client.scale(alarm.scaling_record.nsr_id, alarm.scaling_record.name, alarm.vnf_member_index, + alarm.action) + except ScalingAlarm.DoesNotExist: + log.info("There is no action configured for alarm %s.", alarm_id) + + def _handle_instantiated_or_scaled(self, content): + nslcmop_id = content['nslcmop_id'] + nslcmop = self.db_client.get_nslcmop(nslcmop_id) + if nslcmop['operationState'] == 'COMPLETED' or nslcmop['operationState'] == 'PARTIALLY_COMPLETED': + nsr_id = nslcmop['nsInstanceId'] + log.info("Configuring scaling groups for network service with nsr_id: %s", nsr_id) + self._configure_scaling_groups(nsr_id) + else: + log.info( + "Network service is not in COMPLETED or PARTIALLY_COMPLETED state. " + "Current state is %s. Skipping...", + nslcmop['operationState']) + def _configure_scaling_groups(self, nsr_id: str): # TODO(diazb): Check for alarm creation on exception and clean resources if needed. with database.db.atomic(): @@ -123,20 +129,59 @@ class PolicyModuleAgent: scaling_groups = vnfd['scaling-group-descriptor'] vnf_monitoring_params = vnfd['monitoring-param'] for scaling_group in scaling_groups: - log.info("Creating scaling record in DB...") - scaling_record = ScalingRecord.create( - nsr_id=nsr_id, - name=scaling_group['name'], - content=json.dumps(scaling_group) - ) - log.info("Created scaling record in DB : nsr_id=%s, name=%s, content=%s", - scaling_record.nsr_id, - scaling_record.name, - scaling_record.content) + try: + scaling_group_record = ScalingGroup.select().where( + ScalingGroup.nsr_id == nsr_id, + ScalingGroup.name == scaling_group['name'] + ).get() + except ScalingGroup.DoesNotExist: + log.info("Creating scaling group record in DB...") + scaling_group_record = ScalingGroup.create( + nsr_id=nsr_id, + name=scaling_group['name'], + content=json.dumps(scaling_group) + ) + log.info("Created scaling group record in DB : nsr_id=%s, name=%s, content=%s", + scaling_group_record.nsr_id, + scaling_group_record.name, + scaling_group_record.content) for scaling_policy in scaling_group['scaling-policy']: - for vdur in vnfd['vdu']: - vdu_monitoring_params = vdur['monitoring-param'] + if scaling_policy['scaling-type'] != 'automatic': + continue + try: + scaling_policy_record = ScalingPolicy.select().join(ScalingGroup).where( + ScalingPolicy.name == scaling_policy['name'], + ScalingGroup.id == scaling_group_record.id + ).get() + except ScalingPolicy.DoesNotExist: + log.info("Creating scaling policy record in DB...") + scaling_policy_record = ScalingPolicy.create( + nsr_id=nsr_id, + name=scaling_policy['name'], + scaling_group=scaling_group_record + ) + log.info("Created scaling policy record in DB : name=%s, scaling_group.name=%s", + scaling_policy_record.nsr_id, + scaling_policy_record.scaling_group.name) + for vdu in vnfd['vdu']: + vdu_monitoring_params = vdu['monitoring-param'] for scaling_criteria in scaling_policy['scaling-criteria']: + try: + scaling_criteria_record = ScalingCriteria.select().join(ScalingPolicy).where( + ScalingPolicy.id == scaling_policy_record.id, + ScalingCriteria.name == scaling_criteria['name'] + ).get() + except ScalingCriteria.DoesNotExist: + log.info("Creating scaling criteria record in DB...") + scaling_criteria_record = ScalingCriteria.create( + nsr_id=nsr_id, + name=scaling_policy['name'], + scaling_policy=scaling_policy_record + ) + log.info( + "Created scaling criteria record in DB : name=%s, scaling_criteria.name=%s", + scaling_criteria_record.name, + scaling_criteria_record.scaling_policy.name) vnf_monitoring_param = next( filter(lambda param: param['id'] == scaling_criteria['vnf-monitoring-param-ref'], vnf_monitoring_params)) @@ -145,35 +190,48 @@ class PolicyModuleAgent: filter( lambda param: param['id'] == vnf_monitoring_param['vdu-monitoring-param-ref'], vdu_monitoring_params)) - alarm_uuid = self.mon_client.create_alarm( - metric_name=vdu_monitoring_param['nfvi-metric'], - ns_id=nsr_id, - vdu_name=vdur['name'], - vnf_member_index=vnfr['member-vnf-index-ref'], - threshold=scaling_criteria['scale-in-threshold'], - operation=scaling_criteria['scale-in-relational-operation'], - statistic=vnf_monitoring_param['aggregation-type'] - ) - ScalingAlarm.create( - alarm_id=alarm_uuid, - action='scale_in', - vnf_member_index=int(vnfr['member-vnf-index-ref']), - vdu_name=vdur['name'], - scaling_record=scaling_record - ) - alarm_uuid = self.mon_client.create_alarm( - metric_name=vdu_monitoring_param['nfvi-metric'], - ns_id=nsr_id, - vdu_name=vdur['name'], - vnf_member_index=vnfr['member-vnf-index-ref'], - threshold=scaling_criteria['scale-out-threshold'], - operation=scaling_criteria['scale-out-relational-operation'], - statistic=vnf_monitoring_param['aggregation-type'] - ) - ScalingAlarm.create( - alarm_id=alarm_uuid, - action='scale_out', - vnf_member_index=int(vnfr['member-vnf-index-ref']), - vdu_name=vdur['name'], - scaling_record=scaling_record - ) + vdurs = list(filter(lambda vdur: vdur['vdu-id-ref'] == vnf_monitoring_param['vdu-ref'], + vnfr['vdur'])) + for vdur in vdurs: + try: + ScalingAlarm.select().where( + ScalingAlarm.vdu_name == vdur['name'] + ).where( + ScalingAlarm.scaling_criteria.name == scaling_criteria['name'] + ).get() + log.debug("VDU %s already has an alarm configured") + continue + except ScalingAlarm.DoesNotExist: + pass + alarm_uuid = self.mon_client.create_alarm( + metric_name=vdu_monitoring_param['nfvi-metric'], + ns_id=nsr_id, + vdu_name=vdur['name'], + vnf_member_index=vnfr['member-vnf-index-ref'], + threshold=scaling_criteria['scale-in-threshold'], + operation=scaling_criteria['scale-in-relational-operation'], + statistic=vnf_monitoring_param['aggregation-type'] + ) + ScalingAlarm.create( + alarm_id=alarm_uuid, + action='scale_in', + vnf_member_index=int(vnfr['member-vnf-index-ref']), + vdu_name=vdur['name'], + scaling_criteria=scaling_criteria_record + ) + alarm_uuid = self.mon_client.create_alarm( + metric_name=vdu_monitoring_param['nfvi-metric'], + ns_id=nsr_id, + vdu_name=vdur['name'], + vnf_member_index=vnfr['member-vnf-index-ref'], + threshold=scaling_criteria['scale-out-threshold'], + operation=scaling_criteria['scale-out-relational-operation'], + statistic=vnf_monitoring_param['aggregation-type'] + ) + ScalingAlarm.create( + alarm_id=alarm_uuid, + action='scale_out', + vnf_member_index=int(vnfr['member-vnf-index-ref']), + vdu_name=vdur['name'], + scaling_criteria=scaling_criteria_record + ) diff --git a/osm_policy_module/core/database.py b/osm_policy_module/core/database.py index f9a37c2..212c13b 100644 --- a/osm_policy_module/core/database.py +++ b/osm_policy_module/core/database.py @@ -23,7 +23,7 @@ ## import logging -from peewee import CharField, IntegerField, ForeignKeyField, Model, TextField +from peewee import CharField, IntegerField, ForeignKeyField, Model, TextField, AutoField from playhouse.sqlite_ext import SqliteExtDatabase from osm_policy_module.core.config import Config @@ -35,29 +35,41 @@ db = SqliteExtDatabase('policy_module.db') class BaseModel(Model): + id = AutoField(primary_key=True) + class Meta: database = db -class ScalingRecord(BaseModel): +class ScalingGroup(BaseModel): nsr_id = CharField() name = CharField() content = TextField() +class ScalingPolicy(BaseModel): + name = CharField() + scaling_group = ForeignKeyField(ScalingGroup, related_name='scaling_policies') + + +class ScalingCriteria(BaseModel): + name = CharField() + scaling_policy = ForeignKeyField(ScalingPolicy, related_name='scaling_criterias') + + class ScalingAlarm(BaseModel): alarm_id = CharField() action = CharField() vnf_member_index = IntegerField() vdu_name = CharField() - scaling_record = ForeignKeyField(ScalingRecord, related_name='scaling_alarms') + scaling_criteria = ForeignKeyField(ScalingCriteria, related_name='scaling_alarms') class DatabaseManager: def create_tables(self): try: db.connect() - db.create_tables([ScalingRecord, ScalingAlarm]) + db.create_tables([ScalingGroup, ScalingPolicy, ScalingCriteria, ScalingAlarm]) db.close() except Exception as e: log.exception("Error creating tables: ") diff --git a/osm_policy_module/tests/integration/test_policy_agent.py b/osm_policy_module/tests/integration/test_policy_agent.py index f7dc65a..bed6eb5 100644 --- a/osm_policy_module/tests/integration/test_policy_agent.py +++ b/osm_policy_module/tests/integration/test_policy_agent.py @@ -35,7 +35,7 @@ from osm_policy_module.common.db_client import DbClient from osm_policy_module.common.mon_client import MonClient from osm_policy_module.core import database from osm_policy_module.core.agent import PolicyModuleAgent -from osm_policy_module.core.database import ScalingRecord, ScalingAlarm +from osm_policy_module.core.database import ScalingGroup, ScalingAlarm, ScalingPolicy, ScalingCriteria log = logging.getLogger() log.level = logging.INFO @@ -402,7 +402,7 @@ vnfd_record_mock = { test_db = SqliteDatabase(':memory:') -MODELS = [ScalingRecord, ScalingAlarm] +MODELS = [ScalingGroup, ScalingPolicy, ScalingCriteria, ScalingAlarm] class PolicyModuleAgentTest(unittest.TestCase): @@ -445,12 +445,32 @@ class PolicyModuleAgentTest(unittest.TestCase): operation='GT', statistic='AVERAGE', threshold=80, - vdu_name='cirros_vnfd-VM', + vdu_name='cirros_ns-1-cirros_vnfd-VM-1', vnf_member_index='1') - scaling_record = ScalingRecord.get() + create_alarm.assert_any_call(metric_name='average_memory_utilization', + ns_id='test_nsr_id', + operation='LT', + statistic='AVERAGE', + threshold=20, + vdu_name='cirros_ns-1-cirros_vnfd-VM-1', + vnf_member_index='1') + create_alarm.assert_any_call(metric_name='average_memory_utilization', + ns_id='test_nsr_id', + operation='GT', + statistic='AVERAGE', + threshold=80, + vdu_name='cirros_ns-2-cirros_vnfd-VM-1', + vnf_member_index='2') + create_alarm.assert_any_call(metric_name='average_memory_utilization', + ns_id='test_nsr_id', + operation='LT', + statistic='AVERAGE', + threshold=20, + vdu_name='cirros_ns-2-cirros_vnfd-VM-1', + vnf_member_index='2') + scaling_record = ScalingGroup.get() self.assertEqual(scaling_record.name, 'scale_cirros_vnfd-VM') self.assertEqual(scaling_record.nsr_id, 'test_nsr_id') - self.assertIsNotNone(scaling_record) if __name__ == '__main__': -- 2.17.1