##
import asyncio
import logging
+import os
import sys
import unittest
import uuid
from osm_policy_module.common.common_db_client import CommonDbClient
from osm_policy_module.common.mon_client import MonClient
from osm_policy_module.core import database
-from osm_policy_module.core.agent import PolicyModuleAgent
+from osm_policy_module.autoscaling.agent import PolicyModuleAgent
from osm_policy_module.core.config import Config
from osm_policy_module.core.database import ScalingGroup, ScalingAlarm, ScalingPolicy, ScalingCriteria
class PolicyModuleAgentTest(unittest.TestCase):
def setUp(self):
super()
- database.db.initialize(connect('sqlite://'))
+ database.db.initialize(connect('sqlite:///test_db.sqlite'))
database.db.bind(MODELS)
database.db.connect()
database.db.drop_tables(MODELS)
database.db.create_tables(MODELS)
+ database.db.close()
self.loop = asyncio.new_event_loop()
def tearDown(self):
super()
+ os.remove('test_db.sqlite')
@patch.object(DbMongo, 'db_connect', Mock())
@patch.object(KafkaProducer, '__init__')
create_alarm.side_effect = _test_configure_scaling_groups_create_alarm
config = Config()
agent = PolicyModuleAgent(config, self.loop)
- self.loop.run_until_complete(agent._configure_scaling_groups("test_nsr_id"))
+ self.loop.run_until_complete(agent.service.configure_scaling_groups("test_nsr_id"))
create_alarm.assert_any_call(metric_name='cirros_vnf_memory_util',
ns_id='test_nsr_id',
operation='GT',