RwLaunchpadYang as launchpadyang,
RwDts as rwdts,
RwVnfrYang,
- RwVnfdYang,
- RwNsdYang
+ RwProjectVnfdYang as RwVnfdYang,
+ RwProjectNsdYang as RwNsdYang,
)
import utest_mon_params
class MonParamMsgGenerator(object):
def __init__(self, num_messages=1):
ping_path = r"/api/v1/ping/stats"
- self._endpoint_msg = vnfryang.YangData_Vnfr_VnfrCatalog_Vnfr_HttpEndpoint.from_dict({
+ self._endpoint_msg = vnfryang.YangData_RwProject_Project_VnfrCatalog_Vnfr_HttpEndpoint.from_dict({
'path': ping_path,
'https': 'true',
'polling_interval_secs': 1,
self._mon_param_msgs = []
for i in range(1, num_messages):
- self._mon_param_msgs.append(vnfryang.YangData_Vnfr_VnfrCatalog_Vnfr_MonitoringParam.from_dict({
+ self._mon_param_msgs.append(vnfryang.YangData_RwProject_Project_VnfrCatalog_Vnfr_MonitoringParam.from_dict({
'id': '%s' % i,
'name': 'param_num_%s' % i,
'json_query_method': "NAMEKEY",
@classmethod
def configure_timeout(cls):
- return 240
+ return 480
def configure_test(self, loop, test_id):
self.log.debug("STARTING - %s", test_id)
def setup_mock_store(self, aggregation_type, monps, legacy=False):
store = mock.MagicMock()
- mock_vnfd = RwVnfdYang.YangData_Vnfd_VnfdCatalog_Vnfd.from_dict({
+ mock_vnfd = RwVnfdYang.YangData_RwProject_Project_VnfdCatalog_Vnfd.from_dict({
'id': "1",
'monitoring_param': [
{'description': 'no of ping requests',
})
store.get_vnfd = mock.MagicMock(return_value=mock_vnfd)
- mock_vnfr = RwVnfrYang.YangData_Vnfr_VnfrCatalog_Vnfr.from_dict({
+ mock_vnfr = RwVnfrYang.YangData_RwProject_Project_VnfrCatalog_Vnfr.from_dict({
'id': '1',
'monitoring_param': ([monp.as_dict() for monp in monps] if not legacy else [])
})
- mock_vnfr.vnfd = vnfryang.YangData_Vnfr_VnfrCatalog_Vnfr_Vnfd.from_dict({'id': '1'})
+ mock_vnfr.vnfd = vnfryang.YangData_RwProject_Project_VnfrCatalog_Vnfr_Vnfd.from_dict({'id': '1'})
store.get_vnfr = mock.MagicMock(return_value=mock_vnfr)
- mock_nsr = RwNsrYang.YangData_Nsr_NsInstanceOpdata_Nsr.from_dict({
+ mock_nsr = RwNsrYang.YangData_RwProject_Project_NsInstanceOpdata_Nsr.from_dict({
'ns_instance_config_ref': "1",
'name_ref': "Foo",
'constituent_vnfr_ref': [{'vnfr_id': mock_vnfr.id}],
'vnfd_monitoring_param_ref': '2'}]
}]
- mock_nsd = RwNsdYang.YangData_Nsd_NsdCatalog_Nsd.from_dict({
+ mock_nsd = RwNsdYang.YangData_RwProject_Project_NsdCatalog_Nsd.from_dict({
'id': str(uuid.uuid1()),
'monitoring_param': (monp if not legacy else [])
})
def register_vnf_publisher(self):
yield from self.vnf_handler.register()
- def add_param_to_publisher(self):
+ def add_param_to_publisher(self, publisher):
msg = self.msg_gen.next_message()
- self.vnf_handler.on_update_mon_params([msg])
+ publisher.on_update_mon_params([msg])
return msg
@asyncio.coroutine
@rift.test.dts.async_test
def _test_add_vnf_mon_params(self):
yield from self.register_vnf_publisher()
- self.add_param_to_publisher()
+ self.add_param_to_publisher(self.vnf_handler)
yield from self.register_vnf_test_subscriber()
- self.add_param_to_publisher()
+ self.add_param_to_publisher(self.vnf_handler)
# RIFT-12888: Elements do not go immediately into cache after on_prepare.
# Because of this, we can't guarantee that the second param will actually be
def _test_publish(self, aggregation_type, expected_value, legacy=False):
- self.msg_gen = MonParamMsgGenerator(4)
+ self.msg_gen = MonParamMsgGenerator(5)
store = self.setup_mock_store(aggregation_type=aggregation_type,
monps=self.msg_gen.mon_param_msgs,
legacy=legacy)
published_xpaths = yield from self.get_published_xpaths()
yield from self.register_vnf_publisher()
- self.add_param_to_publisher()
- self.add_param_to_publisher()
+ self.add_param_to_publisher(self.vnf_handler)
+ self.add_param_to_publisher(self.vnf_handler)
nsr_id = store.get_nsr().ns_instance_config_ref
- yield from asyncio.sleep(5, loop=self.loop)
+ yield from asyncio.sleep(2, loop=self.loop)
itr = yield from self.dts.query_read(self.nsr_handler.xpath(),
rwdts.XactFlag.MERGE)
yield from self._test_publish("COUNT", 2)
@rift.test.dts.async_test
- def _test_legacy_nsr_monitor_publish_avg(self):
+ def test_legacy_nsr_monitor_publish_avg(self):
yield from self._test_publish("AVERAGE", 1, legacy=True)
+ @rift.test.dts.async_test
+ def test_vnfr_add_delete(self):
+ yield from self._test_publish("SUM", 3)
+
+ self.msg_gen = MonParamMsgGenerator(5)
+ store = self.setup_mock_store(aggregation_type="SUM",
+ monps=self.msg_gen.mon_param_msgs)
+ new_vnf_handler = vnf_mon_params.VnfMonitorDtsHandler(
+ self.log, self.dts, self.loop, 2, "2.2.2.1",
+ self.msg_gen.mon_param_msgs, self.msg_gen.endpoint_msgs
+ )
+ yield from new_vnf_handler.register()
+
+ # add a new vnfr
+ new_vnfr = store.get_vnfr()
+ new_vnfr.id = '2'
+ yield from self.nsr_handler.update([new_vnfr])
+
+ # check if the newly created one has been added in the model
+ poller = self.nsr_handler.mon_params_pollers[0]
+ assert len(poller.monp.nsr_mon_param_msg.vnfr_mon_param_ref) == 4
+ assert len(poller.subscribers) == 4
+ assert len(poller.monp.vnfr_monparams) == 4
+
+ # publish new values
+ yield from asyncio.sleep(2, loop=self.loop)
+ self.add_param_to_publisher(new_vnf_handler)
+ self.add_param_to_publisher(new_vnf_handler)
+ yield from asyncio.sleep(3, loop=self.loop)
+
+ itr = yield from self.dts.query_read(self.nsr_handler.xpath(),
+ rwdts.XactFlag.MERGE)
+
+ values = []
+ for res in itr:
+ result = yield from res
+ nsr_monp = result.result
+ values.append(nsr_monp.value_integer)
+
+ assert values[0] == 6
+
+ # delete the VNFR
+ yield from self.nsr_handler.delete([new_vnfr])
+
+ # check if the newly created one has been added in the model
+ poller = self.nsr_handler.mon_params_pollers[0]
+ assert len(poller.monp.vnfr_monparams) == 2
+ assert len(poller.monp.nsr_mon_param_msg.vnfr_mon_param_ref) == 2
+ assert len(poller.subscribers) == 2
+
+ self.msg_gen = MonParamMsgGenerator(5)
+ self.add_param_to_publisher(self.vnf_handler)
+ self.add_param_to_publisher(self.vnf_handler)
+ yield from asyncio.sleep(2, loop=self.loop)
+
+ itr = yield from self.dts.query_read(self.nsr_handler.xpath(),
+ rwdts.XactFlag.MERGE)
+ values = []
+ for res in itr:
+ result = yield from res
+ nsr_monp = result.result
+ values.append(nsr_monp.value_integer)
+
+ assert values[0] == 3
+
def main():