X-Git-Url: https://osm.etsi.org/gitweb/?a=blobdiff_plain;f=rwlaunchpad%2Fplugins%2Frwstagingmgr%2Ftest%2Futest_publisher_dts.py;fp=rwlaunchpad%2Fplugins%2Frwstagingmgr%2Ftest%2Futest_publisher_dts.py;h=d88a0e276dd6bfc22e21b9c5aa38e0ab0f8e0c33;hb=4870d0ee29789b859931e4e2c73e13dcb29537d5;hp=585a0d922e71c0dfa3acd22e0d21b1ee986c9ff6;hpb=6f1a3fe149e4a6b9803382cb299c902f4cf58ec9;p=osm%2FSO.git diff --git a/rwlaunchpad/plugins/rwstagingmgr/test/utest_publisher_dts.py b/rwlaunchpad/plugins/rwstagingmgr/test/utest_publisher_dts.py index 585a0d92..d88a0e27 100755 --- a/rwlaunchpad/plugins/rwstagingmgr/test/utest_publisher_dts.py +++ b/rwlaunchpad/plugins/rwstagingmgr/test/utest_publisher_dts.py @@ -18,6 +18,7 @@ import argparse import asyncio +import gi import logging import os import sys @@ -25,7 +26,6 @@ import unittest import uuid import xmlrunner -import gi gi.require_version('RwDts', '1.0') gi.require_version('RwStagingMgmtYang', '1.0') from gi.repository import ( @@ -34,6 +34,15 @@ from gi.repository import ( ) import rift.tasklets.rwstagingmgr.publisher as publisher import rift.test.dts +from rift.mano.utils.project import ManoProject +gi.require_version('RwKeyspec', '1.0') +from gi.repository.RwKeyspec import quoted_key + +class TestProject(ManoProject): + def __init__(self, log, dts, loop): + super().__init__(log) + self._dts = dts + self._loop = loop class TestCase(rift.test.dts.AbstractDTSTest): @@ -49,8 +58,9 @@ class TestCase(rift.test.dts.AbstractDTSTest): self.log.debug("STARTING - %s", test_id) self.tinfo = self.new_tinfo(str(test_id)) self.dts = rift.tasklets.DTS(self.tinfo, self.schema, self.loop) + self.project = TestProject(self.log, self.dts, self.loop) - self.job_handler = publisher.StagingStorePublisher(self.log, self.dts, self.loop) + self.job_handler = publisher.StagingStorePublisher(self.project) def tearDown(self): super().tearDown() @@ -82,6 +92,7 @@ class TestCase(rift.test.dts.AbstractDTSTest): yield from asyncio.sleep(2, loop=self.loop) published_xpaths = yield from self.get_published_xpaths() assert self.job_handler.xpath() in published_xpaths + self.job_handler.deregister() @rift.test.dts.async_test def test_publish(self): @@ -89,14 +100,15 @@ class TestCase(rift.test.dts.AbstractDTSTest): """ yield from self.job_handler.register() - mock_msg = RwStagingMgmtYang.StagingArea.from_dict({ + mock_msg = RwStagingMgmtYang.YangData_RwProject_Project_StagingAreas_StagingArea.from_dict({ "area_id": "123"}) self.job_handler.on_staging_area_create(mock_msg) yield from asyncio.sleep(5, loop=self.loop) - itr = yield from self.dts.query_read("/staging-areas/staging-area[area-id='{}']".format( - mock_msg.area_id)) + xpath = self.project.add_project("/staging-areas/staging-area[area-id={}]". + format(quoted_key(mock_msg.area_id))) + itr = yield from self.dts.query_read(xpath) result = None @@ -106,6 +118,7 @@ class TestCase(rift.test.dts.AbstractDTSTest): print (result) assert result == mock_msg + self.job_handler.deregister() def main(): runner = xmlrunner.XMLTestRunner(output=os.environ["RIFT_MODULE_TEST"])