4 # Copyright 2016 RIFT.IO Inc
6 # Licensed under the Apache License, Version 2.0 (the "License");
7 # you may not use this file except in compliance with the License.
8 # You may obtain a copy of the License at
10 # http://www.apache.org/licenses/LICENSE-2.0
12 # Unless required by applicable law or agreed to in writing, software
13 # distributed under the License is distributed on an "AS IS" BASIS,
14 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 # See the License for the specific language governing permissions and
16 # limitations under the License.
29 gi
.require_version('RwDts', '1.0')
30 gi
.require_version('RwStagingMgmtYang', '1.0')
31 from gi
.repository
import (
35 import rift
.tasklets
.rwstagingmgr
.publisher
as publisher
37 from rift
.mano
.utils
.project
import ManoProject
38 gi
.require_version('RwKeyspec', '1.0')
39 from gi
.repository
.RwKeyspec
import quoted_key
41 class TestProject(ManoProject
):
42 def __init__(self
, log
, dts
, loop
):
48 class TestCase(rift
.test
.dts
.AbstractDTSTest
):
def configure_schema(cls):
    """Return the schema object produced by ``RwStagingMgmtYang.get_schema()``."""
    schema = RwStagingMgmtYang.get_schema()
    return schema
54 def configure_timeout(cls
):
def configure_test(self, loop, test_id):
    """Build the per-test fixtures: tasklet info, DTS handle, project, publisher.

    Args:
        loop: Accepted for interface compatibility; the body uses
            ``self.loop`` rather than this argument.
        test_id: Identifier of the test being started; it is logged and its
            string form is passed to ``new_tinfo``.
    """
    self.log.debug("STARTING - %s", test_id)

    # Tasklet info is keyed by the stringified test id.
    tinfo_name = str(test_id)
    self.tinfo = self.new_tinfo(tinfo_name)

    # DTS handle bound to this test's tasklet info, schema and loop.
    self.dts = rift.tasklets.DTS(
        self.tinfo,
        self.schema,
        self.loop,
    )

    # Project wrapper handed to the publisher under test.
    self.project = TestProject(
        self.log,
        self.dts,
        self.loop,
    )

    self.job_handler = publisher.StagingStorePublisher(self.project)
69 def get_published_xpaths(self
):
70 published_xpaths
= set()
72 res_iter
= yield from self
.dts
.query_read("D,/rwdts:dts")
74 res
= (yield from i
).result
75 for member
in res
.member
:
76 published_xpaths |
= {reg
.keyspec
for reg
in member
.state
.registration
if reg
.flags
== "publisher"}
78 return published_xpaths
81 def read_xpath(self
, xpath
):
82 itr
= yield from self
.dts
.query_read(xpath
)
86 result
= yield from fut
@rift.test.dts.async_test
def test_download_publisher(self):
    """Check that registering the publisher exposes its xpath.

    Registers ``self.job_handler``, waits two seconds on ``self.loop``,
    then asserts that ``self.job_handler.xpath()`` is contained in the
    collection returned by ``self.get_published_xpaths()``.

    The handler is deregistered in a ``finally`` clause so that a failed
    wait, query or assertion does not leave the registration behind.
    """
    yield from self.job_handler.register()
    try:
        # Fixed delay before querying the published registrations.
        yield from asyncio.sleep(2, loop=self.loop)
        published_xpaths = yield from self.get_published_xpaths()
        expected_xpath = self.job_handler.xpath()
        assert expected_xpath in published_xpaths, (
            "%s not found among published xpaths" % expected_xpath)
    finally:
        # Runs on success and on failure alike; only reached once
        # register() has completed without raising.
        self.job_handler.deregister()
97 @rift.test
.dts
.async_test
98 def test_publish(self
):
101 yield from self
.job_handler
.register()
103 mock_msg
= RwStagingMgmtYang
.YangData_RwProject_Project_StagingAreas_StagingArea
.from_dict({
106 self
.job_handler
.on_staging_area_create(mock_msg
)
107 yield from asyncio
.sleep(5, loop
=self
.loop
)
109 xpath
= self
.project
.add_project("/staging-areas/staging-area[area-id={}]".
110 format(quoted_key(mock_msg
.area_id
)))
111 itr
= yield from self
.dts
.query_read(xpath
)
116 result
= yield from fut
117 result
= result
.result
120 assert result
== mock_msg
121 self
.job_handler
.deregister()
124 runner
= xmlrunner
.XMLTestRunner(output
=os
.environ
["RIFT_MODULE_TEST"])
126 parser
= argparse
.ArgumentParser()
127 parser
.add_argument('-v', '--verbose', action
='store_true')
128 parser
.add_argument('-n', '--no-runner', action
='store_true')
129 args
, unittest_args
= parser
.parse_known_args()
133 TestCase
.log_level
= logging
.DEBUG
if args
.verbose
else logging
.WARN
135 unittest
.main(testRunner
=runner
, argv
=[sys
.argv
[0]] + unittest_args
)
137 if __name__
== '__main__':