from kafka import KafkaProducer
from osm_common.dbmongo import DbMongo
-from osm_policy_module.core import database
from peewee import SqliteDatabase
+from osm_policy_module.common.db_client import DbClient
from osm_policy_module.common.mon_client import MonClient
+from osm_policy_module.core import database
from osm_policy_module.core.agent import PolicyModuleAgent
-from osm_policy_module.core.database import ScalingRecord, ScalingAlarm, BaseModel
+from osm_policy_module.core.database import ScalingGroup, ScalingAlarm, ScalingPolicy, ScalingCriteria
log = logging.getLogger()
log.level = logging.INFO
test_db = SqliteDatabase(':memory:')
-MODELS = [ScalingRecord, ScalingAlarm]
+MODELS = [ScalingGroup, ScalingPolicy, ScalingCriteria, ScalingAlarm]
class PolicyModuleAgentTest(unittest.TestCase):
@patch.object(DbMongo, 'db_connect', Mock())
@patch.object(KafkaProducer, '__init__')
@patch.object(MonClient, 'create_alarm')
- @patch.object(PolicyModuleAgent, '_get_vnfd')
- @patch.object(PolicyModuleAgent, '_get_nsr')
- @patch.object(PolicyModuleAgent, '_get_vnfr')
+ @patch.object(DbClient, 'get_vnfd')
+ @patch.object(DbClient, 'get_nsr')
+ @patch.object(DbClient, 'get_vnfr')
def test_configure_scaling_groups(self, get_vnfr, get_nsr, get_vnfd, create_alarm, kafka_producer_init):
def _test_configure_scaling_groups_get_vnfr(*args, **kwargs):
if '1' in args[1]:
operation='GT',
statistic='AVERAGE',
threshold=80,
- vdu_name='cirros_vnfd-VM',
+ vdu_name='cirros_ns-1-cirros_vnfd-VM-1',
vnf_member_index='1')
- scaling_record = ScalingRecord.get()
+ create_alarm.assert_any_call(metric_name='average_memory_utilization',
+ ns_id='test_nsr_id',
+ operation='LT',
+ statistic='AVERAGE',
+ threshold=20,
+ vdu_name='cirros_ns-1-cirros_vnfd-VM-1',
+ vnf_member_index='1')
+ create_alarm.assert_any_call(metric_name='average_memory_utilization',
+ ns_id='test_nsr_id',
+ operation='GT',
+ statistic='AVERAGE',
+ threshold=80,
+ vdu_name='cirros_ns-2-cirros_vnfd-VM-1',
+ vnf_member_index='2')
+ create_alarm.assert_any_call(metric_name='average_memory_utilization',
+ ns_id='test_nsr_id',
+ operation='LT',
+ statistic='AVERAGE',
+ threshold=20,
+ vdu_name='cirros_ns-2-cirros_vnfd-VM-1',
+ vnf_member_index='2')
+ scaling_record = ScalingGroup.get()
self.assertEqual(scaling_record.name, 'scale_cirros_vnfd-VM')
self.assertEqual(scaling_record.nsr_id, 'test_nsr_id')
- self.assertIsNotNone(scaling_record)
if __name__ == '__main__':