from osm_policy_module.common.common_db_client import CommonDbClient
from osm_policy_module.common.mon_client import MonClient
from osm_policy_module.core import database
-from osm_policy_module.autoscaling.agent import PolicyModuleAgent
+from osm_policy_module.core.agent import PolicyModuleAgent
from osm_policy_module.core.config import Config
-from osm_policy_module.core.database import ScalingGroup, ScalingAlarm, ScalingPolicy, ScalingCriteria
+from osm_policy_module.core.database import ScalingGroup, ScalingAlarm, ScalingPolicy, ScalingCriteria, VnfAlarm, \
+ AlarmAction
log = logging.getLogger()
log.level = logging.INFO
}
}
],
- "image": "cirros034"
+ "image": "cirros034",
+ "alarm": [
+ {
+ "value": 20.0000,
+ "actions": {
+ "insufficient-data": [
+ {
+ "url": "localhost:9090"
+ }
+ ],
+ "ok": [
+ {
+ "url": "localhost:9090"
+ }
+ ],
+ "alarm": [
+ {
+ "url": "localhost:9090"
+ }
+ ]
+ },
+ "alarm-id": "alarm-1",
+ "operation": "LT",
+ "vnf-monitoring-param-ref": "cirros_vnf_memory_util"
+ }
+ ]
}
],
"monitoring-param": [
}
}
-MODELS = [ScalingGroup, ScalingPolicy, ScalingCriteria, ScalingAlarm]
+MODELS = [ScalingGroup, ScalingPolicy, ScalingCriteria, ScalingAlarm, VnfAlarm, AlarmAction]
class PolicyModuleAgentTest(unittest.TestCase):
create_alarm.side_effect = _test_configure_scaling_groups_create_alarm
config = Config()
agent = PolicyModuleAgent(config, self.loop)
- self.loop.run_until_complete(agent.service.configure_scaling_groups("test_nsr_id"))
+ self.loop.run_until_complete(agent.autoscaling_service.configure_scaling_groups("test_nsr_id"))
create_alarm.assert_any_call(metric_name='cirros_vnf_memory_util',
ns_id='test_nsr_id',
operation='GT',
self.assertEqual(scaling_record.name, 'scale_cirros_vnfd-VM')
self.assertEqual(scaling_record.nsr_id, 'test_nsr_id')
+ @patch.object(DbMongo, 'db_connect', Mock())
+ @patch.object(KafkaProducer, '__init__')
+ @patch.object(MonClient, 'create_alarm')
+ @patch.object(CommonDbClient, 'get_vnfd')
+ @patch.object(CommonDbClient, 'get_nsr')
+ @patch.object(CommonDbClient, 'get_vnfr')
+ def test_configure_vnf_alarms(self, get_vnfr, get_nsr, get_vnfd, create_alarm, kafka_producer_init):
+ def _test_configure_scaling_groups_get_vnfr(*args, **kwargs):
+ if '1' in args[1]:
+ return vnfr_record_mocks[0]
+ if '2' in args[1]:
+ return vnfr_record_mocks[1]
+
+ async def _test_configure_vnf_alarms_create_alarm(*args, **kwargs):
+ return uuid.uuid4()
+
+ kafka_producer_init.return_value = None
+ get_vnfr.side_effect = _test_configure_scaling_groups_get_vnfr
+ get_nsr.return_value = nsr_record_mock
+ get_vnfd.return_value = vnfd_record_mock
+ create_alarm.side_effect = _test_configure_vnf_alarms_create_alarm
+ config = Config()
+ agent = PolicyModuleAgent(config, self.loop)
+ self.loop.run_until_complete(agent.alarming_service.configure_vnf_alarms("test_nsr_id"))
+ create_alarm.assert_any_call(metric_name='cirros_vnf_memory_util',
+ ns_id='test_nsr_id',
+ operation='LT',
+ statistic='AVERAGE',
+ threshold=20.0,
+ vdu_name='cirros_ns-1-cirros_vnfd-VM-1',
+ vnf_member_index='1')
+ create_alarm.assert_any_call(metric_name='cirros_vnf_memory_util',
+ ns_id='test_nsr_id',
+ operation='LT',
+ statistic='AVERAGE',
+ threshold=20.0,
+ vdu_name='cirros_ns-2-cirros_vnfd-VM-1',
+ vnf_member_index='2')
+
if __name__ == '__main__':
unittest.main()