import argparse
import asyncio
+import gi
import logging
import os
import sys
import uuid
import xmlrunner
-import gi
gi.require_version('RwDts', '1.0')
gi.require_version('RwStagingMgmtYang', '1.0')
from gi.repository import (
)
import rift.tasklets.rwstagingmgr.publisher as publisher
import rift.test.dts
+from rift.mano.utils.project import ManoProject
+gi.require_version('RwKeyspec', '1.0')
+from gi.repository.RwKeyspec import quoted_key
+
+class TestProject(ManoProject):
+ def __init__(self, log, dts, loop):
+ super().__init__(log)
+ self._dts = dts
+ self._loop = loop
class TestCase(rift.test.dts.AbstractDTSTest):
self.log.debug("STARTING - %s", test_id)
self.tinfo = self.new_tinfo(str(test_id))
self.dts = rift.tasklets.DTS(self.tinfo, self.schema, self.loop)
+ self.project = TestProject(self.log, self.dts, self.loop)
- self.job_handler = publisher.StagingStorePublisher(self.log, self.dts, self.loop)
+ self.job_handler = publisher.StagingStorePublisher(self.project)
def tearDown(self):
super().tearDown()
yield from asyncio.sleep(2, loop=self.loop)
published_xpaths = yield from self.get_published_xpaths()
assert self.job_handler.xpath() in published_xpaths
+ self.job_handler.deregister()
@rift.test.dts.async_test
def test_publish(self):
"""
yield from self.job_handler.register()
- mock_msg = RwStagingMgmtYang.StagingArea.from_dict({
+ mock_msg = RwStagingMgmtYang.YangData_RwProject_Project_StagingAreas_StagingArea.from_dict({
"area_id": "123"})
self.job_handler.on_staging_area_create(mock_msg)
yield from asyncio.sleep(5, loop=self.loop)
- itr = yield from self.dts.query_read("/staging-areas/staging-area[area-id='{}']".format(
- mock_msg.area_id))
+ xpath = self.project.add_project("/staging-areas/staging-area[area-id={}]".
+ format(quoted_key(mock_msg.area_id)))
+ itr = yield from self.dts.query_read(xpath)
result = None
print (result)
assert result == mock_msg
+ self.job_handler.deregister()
def main():
runner = xmlrunner.XMLTestRunner(output=os.environ["RIFT_MODULE_TEST"])