4 # Copyright 2016 RIFT.IO Inc
6 # Licensed under the Apache License, Version 2.0 (the "License");
7 # you may not use this file except in compliance with the License.
8 # You may obtain a copy of the License at
10 # http://www.apache.org/licenses/LICENSE-2.0
12 # Unless required by applicable law or agreed to in writing, software
13 # distributed under the License is distributed on an "AS IS" BASIS,
14 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 # See the License for the specific language governing permissions and
16 # limitations under the License.
# Unit-test runs need RIFT_VAR_ROOT. When the caller has not exported it,
# point it at the unittest area beneath the RIFT install tree.
if os.environ.get("RIFT_VAR_ROOT") is None:
    os.environ["RIFT_VAR_ROOT"] = os.path.join(
        os.environ["RIFT_INSTALL"],
        "var/rift/unittest",
    )
34 gi
.require_version('RwDts', '1.0')
35 gi
.require_version('RwPkgMgmtYang', '1.0')
36 from gi
.repository
import (
40 import rift
.tasklets
.rwpkgmgr
.downloader
as downloader
41 import rift
.tasklets
.rwpkgmgr
.publisher
as pkg_publisher
45 class TestCase(rift
.test
.dts
.AbstractDTSTest
):
def configure_schema(cls):
    # The schema for this test case is whatever RwPkgMgmtYang.get_schema() returns.
    schema = RwPkgMgmtYang.get_schema()
    return schema
# NOTE(review): only the signature of configure_timeout is visible in this
# view; its body is not shown, so the value it returns cannot be documented
# here -- confirm against the complete file.
51 def configure_timeout(cls
):
def configure_test(self, loop, test_id):
    # Per-test setup: log the test id, create the tasklet info and DTS handle,
    # then build the download-status publisher under test.
    # `loop` is accepted but not used directly; self.loop is what gets passed
    # to DTS and to the publisher.
    self.log.debug("STARTING - %s", test_id)

    self.tinfo = self.new_tinfo(str(test_id))
    self.dts = rift.tasklets.DTS(self.tinfo, self.schema, self.loop)

    publisher_cls = pkg_publisher.DownloadStatusPublisher
    self.job_handler = publisher_cls(self.log, self.dts, self.loop)
# Query DTS at "D,/rwdts:dts" and return the set of keyspecs of every
# registration whose flags equal "publisher".
65 def get_published_xpaths(self
):
66 published_xpaths
= set()
68 res_iter
= yield from self
.dts
.query_read("D,/rwdts:dts")
# NOTE(review): `i` is not bound in the visible lines; presumably it is the
# loop variable of an iteration over res_iter whose `for` line is missing
# from this view -- confirm against the complete file.
70 res
= (yield from i
).result
# Accumulate the keyspec of each publisher registration across all members.
71 for member
in res
.member
:
72 published_xpaths |
= {reg
.keyspec
for reg
in member
.state
.registration
if reg
.flags
== "publisher"}
74 return published_xpaths
# Issue a DTS read for `xpath` and await a response future.
# Callers in this file read `.status` on the value this method hands back.
77 def read_xpath(self
, xpath
):
78 itr
= yield from self
.dts
.query_read(xpath
)
# NOTE(review): `fut` is not bound in the visible lines -- presumably it is
# drawn from `itr`; the lines binding it and the return statement are missing
# from this view, so confirm against the complete file.
82 result
= yield from fut
# After the job handler registers, its xpath must appear among the xpaths
# that DTS reports as published.
@rift.test.dts.async_test
def test_download_publisher(self):
    handler = self.job_handler
    yield from handler.register()

    advertised = yield from self.get_published_xpaths()
    assert handler.xpath() in advertised
# Publish a DownloadJob through the job handler, read it back from DTS, then
# publish a modified copy and check that the read-back matches again.
91 @rift.test
.dts
.async_test
92 def test_publish(self
):
95 1. Verify if an update on_download_progess & on_download_finished
97 2. Verify if the internal store is updated
# Register the job handler before publishing anything.
99 yield from self
.job_handler
.register()
# Build the message to publish; its download_id is a fresh random UUID string.
101 mock_msg
= RwPkgMgmtYang
.DownloadJob
.from_dict({
102 "url": "http://foo/bar",
104 "download_id": str(uuid
.uuid4())})
# Publish, then wait 5 seconds before reading the job back.
106 yield from self
.job_handler
._dts
_publisher
(mock_msg
)
107 yield from asyncio
.sleep(5, loop
=self
.loop
)
# Read the job back using its download-id as the key.
109 itr
= yield from self
.dts
.query_read("/download-jobs/job[download-id='{}']".format(
110 mock_msg
.download_id
))
# NOTE(review): `fut` is not bound in the visible lines -- presumably it is
# iterated from `itr`; confirm against the complete file.
114 result
= yield from fut
115 result
= result
.result
117 print ("Mock ", mock_msg
)
# The value read back must equal the message that was published.
118 assert result
== mock_msg
# Change the URL, publish again, and repeat the read-back check.
121 mock_msg
.url
= "http://bar/foo"
122 yield from self
.job_handler
._dts
_publisher
(mock_msg
)
123 yield from asyncio
.sleep(5, loop
=self
.loop
)
125 itr
= yield from self
.dts
.query_read("/download-jobs/job[download-id='{}']".format(
126 mock_msg
.download_id
))
# NOTE(review): as above, the line binding `fut` is not visible here.
130 result
= yield from fut
131 result
= result
.result
132 assert result
== mock_msg
# Register a PackageFileDownloader for a fixed URL and check that, after a
# 10 second wait, the job status read from DTS is "COMPLETED".
135 @rift.test
.dts
.async_test
136 def test_url_download(self
):
139 Test the updates with download/url.py
141 yield from self
.job_handler
.register()
# The proxy argument handed to the downloader is a MagicMock stand-in.
143 proxy
= mock
.MagicMock()
145 url
= "http://boson.eng.riftio.com/common/unittests/plantuml.jar"
146 url_downloader
= downloader
.PackageFileDownloader(url
, "1", "/", "VNFD", proxy
)
148 download_id
= yield from self
.job_handler
.register_downloader(url_downloader
)
149 assert download_id
is not None
151 # Waiting for 10 secs to be sure that the file is downloaded
152 yield from asyncio
.sleep(10, loop
=self
.loop
)
# NOTE(review): the argument passed to .format() is not visible in this view
# -- presumably download_id; confirm against the complete file.
153 xpath
= "/download-jobs/job[download-id='{}']".format(
155 result
= yield from self
.read_xpath(xpath
)
156 self
.log
.debug("Test result before complete check - %s", result
)
# The job must be reported as COMPLETED and the handler must hold no tasks.
157 assert result
.status
== "COMPLETED"
158 assert len(self
.job_handler
.tasks
) == 0
# Register a PackageFileDownloader for a URL on 10.1.2.3 and check that,
# after a 10 second wait, the job status read from DTS is "FAILED".
160 @rift.test
.dts
.async_test
161 def test_url_download_unreachable_ip(self
):
164 Ensure that a bad IP does not block forever
166 yield from self
.job_handler
.register()
# The proxy argument handed to the downloader is a MagicMock stand-in.
168 proxy
= mock
.MagicMock()
170 # Here, we are assuming that there is no HTTP server at 10.1.2.3
171 url
= "http://10.1.2.3/common/unittests/plantuml.jar"
172 url_downloader
= downloader
.PackageFileDownloader(url
, "1", "/", "VNFD", proxy
)
174 download_id
= yield from self
.job_handler
.register_downloader(url_downloader
)
175 assert download_id
is not None
177 # Waiting for 10 secs to be sure all reconnect attempts have been exhausted
178 yield from asyncio
.sleep(10, loop
=self
.loop
)
# NOTE(review): the argument passed to .format() is not visible in this view
# -- presumably download_id; confirm against the complete file.
179 xpath
= "/download-jobs/job[download-id='{}']".format(
181 result
= yield from self
.read_xpath(xpath
)
182 self
.log
.debug("Test result before complete check - %s", result
)
# The job must be reported as FAILED and the handler must hold no tasks.
183 assert result
.status
== "FAILED"
184 assert len(self
.job_handler
.tasks
) == 0
# Start a download, check that it is reported as "IN_PROGRESS" after one
# second, cancel it, and check that it is reported as "CANCELLED" three
# seconds later.
187 @rift.test
.dts
.async_test
188 def test_cancelled(self
):
191 1. Test the updates with downloader.py
192 2. Verifies if cancel triggers the job status to move to cancelled
194 yield from self
.job_handler
.register()
# The proxy argument handed to the downloader is a MagicMock stand-in.
196 proxy
= mock
.MagicMock()
197 url
= "http://boson.eng.riftio.com/common/unittests/Fedora-x86_64-20-20131211.1-sda-ping.qcow2"
198 url_downloader
= downloader
.PackageFileDownloader(url
, "1", "/", "VNFD", proxy
)
200 download_id
= yield from self
.job_handler
.register_downloader(url_downloader
)
201 assert download_id
is not None
# NOTE(review): the argument passed to .format() is not visible in this view
# -- presumably download_id; confirm against the complete file.
202 xpath
= "/download-jobs/job[download-id='{}']".format(
# Wait one second before the first status read.
205 yield from asyncio
.sleep(1, loop
=self
.loop
)
207 result
= yield from self
.read_xpath(xpath
)
208 self
.log
.debug("Test result before in_progress check - %s", result
)
209 assert result
.status
== "IN_PROGRESS"
# Cancel the download, wait three seconds, then read the status again.
211 yield from self
.job_handler
.cancel_download(download_id
)
212 yield from asyncio
.sleep(3, loop
=self
.loop
)
213 result
= yield from self
.read_xpath(xpath
)
214 self
.log
.debug("Test result before cancel check - %s", result
)
# The job must be reported as CANCELLED and the handler must hold no tasks.
215 assert result
.status
== "CANCELLED"
216 assert len(self
.job_handler
.tasks
) == 0
# Script entry: build an XML test runner, parse this script's own flags, set
# the test log level, and hand the remaining arguments to unittest.
# NOTE(review): the enclosing `def` line for these statements is not visible
# in this view -- confirm against the complete file.
# The XML runner writes its output to the location named by RIFT_MODULE_TEST.
220 runner
= xmlrunner
.XMLTestRunner(output
=os
.environ
["RIFT_MODULE_TEST"])
222 parser
= argparse
.ArgumentParser()
223 parser
.add_argument('-v', '--verbose', action
='store_true')
# NOTE(review): --no-runner is parsed, but no use of it appears in the
# visible lines; the lines that act on it may be missing from this view.
224 parser
.add_argument('-n', '--no-runner', action
='store_true')
# parse_known_args leaves unrecognized arguments in unittest_args.
225 args
, unittest_args
= parser
.parse_known_args()
# --verbose selects DEBUG logging for the test case; otherwise WARN.
229 TestCase
.log_level
= logging
.DEBUG
if args
.verbose
else logging
.WARN
# Forward the program name plus the unrecognized arguments to unittest.
231 unittest
.main(testRunner
=runner
, argv
=[sys
.argv
[0]] + unittest_args
)
# NOTE(review): the body of this guard is not visible in this view.
233 if __name__
== '__main__':