##
import asyncio
import logging
+import os
import sys
import unittest
import uuid
from kafka import KafkaProducer
from osm_common.dbmongo import DbMongo
-from peewee import SqliteDatabase
+from playhouse.db_url import connect
from osm_policy_module.common.common_db_client import CommonDbClient
from osm_policy_module.common.mon_client import MonClient
from osm_policy_module.core import database
-from osm_policy_module.core.agent import PolicyModuleAgent
+from osm_policy_module.autoscaling.agent import PolicyModuleAgent
+from osm_policy_module.core.config import Config
from osm_policy_module.core.database import ScalingGroup, ScalingAlarm, ScalingPolicy, ScalingCriteria
log = logging.getLogger()
}
}
-test_db = SqliteDatabase(':memory:')
-
MODELS = [ScalingGroup, ScalingPolicy, ScalingCriteria, ScalingAlarm]
class PolicyModuleAgentTest(unittest.TestCase):
def setUp(self):
super()
- database.db = test_db
- test_db.bind(MODELS)
- test_db.connect()
- test_db.drop_tables(MODELS)
- test_db.create_tables(MODELS)
+ database.db.initialize(connect('sqlite:///test_db.sqlite'))
+ database.db.bind(MODELS)
+ database.db.connect()
+ database.db.drop_tables(MODELS)
+ database.db.create_tables(MODELS)
+ database.db.close()
self.loop = asyncio.new_event_loop()
- asyncio.set_event_loop(None)
def tearDown(self):
super()
+ os.remove('test_db.sqlite')
@patch.object(DbMongo, 'db_connect', Mock())
@patch.object(KafkaProducer, '__init__')
get_nsr.return_value = nsr_record_mock
get_vnfd.return_value = vnfd_record_mock
create_alarm.side_effect = _test_configure_scaling_groups_create_alarm
- agent = PolicyModuleAgent(self.loop)
- self.loop.run_until_complete(agent._configure_scaling_groups("test_nsr_id"))
+ config = Config()
+ agent = PolicyModuleAgent(config, self.loop)
+ self.loop.run_until_complete(agent.service.configure_scaling_groups("test_nsr_id"))
create_alarm.assert_any_call(metric_name='cirros_vnf_memory_util',
ns_id='test_nsr_id',
operation='GT',