)
import rift.tasklets.rwstagingmgr.publisher as publisher
import rift.test.dts
+from rift.mano.utils.project import ManoProject
+
+class TestProject(ManoProject):
+ def __init__(self, log, dts, loop):
+ super().__init__(log)
+ self._dts = dts
+ self._loop = loop
class TestCase(rift.test.dts.AbstractDTSTest):
self.log.debug("STARTING - %s", test_id)
self.tinfo = self.new_tinfo(str(test_id))
self.dts = rift.tasklets.DTS(self.tinfo, self.schema, self.loop)
+ self.project = TestProject(self.log, self.dts, self.loop)
- self.job_handler = publisher.StagingStorePublisher(self.log, self.dts, self.loop)
+ self.job_handler = publisher.StagingStorePublisher(self.project)
def tearDown(self):
super().tearDown()
yield from asyncio.sleep(2, loop=self.loop)
published_xpaths = yield from self.get_published_xpaths()
assert self.job_handler.xpath() in published_xpaths
+ self.job_handler.deregister()
@rift.test.dts.async_test
def test_publish(self):
self.job_handler.on_staging_area_create(mock_msg)
yield from asyncio.sleep(5, loop=self.loop)
- itr = yield from self.dts.query_read("/staging-areas/staging-area[area-id='{}']".format(
- mock_msg.area_id))
+ xpath = self.project.add_project("/staging-areas/staging-area[area-id='{}']".
+ format(mock_msg.area_id))
+ itr = yield from self.dts.query_read(xpath)
result = None
print (result)
assert result == mock_msg
+ self.job_handler.deregister()
def main():
runner = xmlrunner.XMLTestRunner(output=os.environ["RIFT_MODULE_TEST"])