4 # Copyright 2016 RIFT.IO Inc
6 # Licensed under the Apache License, Version 2.0 (the "License");
7 # you may not use this file except in compliance with the License.
8 # You may obtain a copy of the License at
10 # http://www.apache.org/licenses/LICENSE-2.0
12 # Unless required by applicable law or agreed to in writing, software
13 # distributed under the License is distributed on an "AS IS" BASIS,
14 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 # See the License for the specific language governing permissions and
16 # limitations under the License.
30 #Setting RIFT_VAR_ROOT if not already set for unit test execution
31 if "RIFT_VAR_ROOT" not in os
.environ
:
32 os
.environ
['RIFT_VAR_ROOT'] = os
.path
.join(os
.environ
['RIFT_INSTALL'], 'var/rift/unittest')
34 gi
.require_version('RwDts', '1.0')
35 gi
.require_version('RwPkgMgmtYang', '1.0')
36 from gi
.repository
import (
40 import rift
.tasklets
.rwpkgmgr
.downloader
as downloader
41 import rift
.tasklets
.rwpkgmgr
.publisher
as pkg_publisher
43 from rift
.mano
.utils
.project
import ManoProject
, DEFAULT_PROJECT
45 gi
.require_version('RwKeyspec', '1.0')
46 from gi
.repository
.RwKeyspec
import quoted_key
48 class TestCase(rift
.test
.dts
.AbstractDTSTest
):
50 def configure_schema(cls
):
51 return RwPkgMgmtYang
.get_schema()
54 def configure_timeout(cls
):
57 def configure_test(self
, loop
, test_id
):
58 self
.log
.debug("STARTING - %s", test_id
)
59 self
.tinfo
= self
.new_tinfo(str(test_id
))
60 self
.dts
= rift
.tasklets
.DTS(self
.tinfo
, self
.schema
, self
.loop
)
61 self
.project
= ManoProject(self
.log
, name
=DEFAULT_PROJECT
)
63 self
.job_handler
= pkg_publisher
.DownloadStatusPublisher(self
.log
, self
.dts
,
64 self
.loop
, self
.project
)
70 def get_published_xpaths(self
):
71 published_xpaths
= set()
73 res_iter
= yield from self
.dts
.query_read("D,/rwdts:dts")
75 res
= (yield from i
).result
76 for member
in res
.member
:
77 published_xpaths |
= {reg
.keyspec
for reg
in member
.state
.registration
if reg
.flags
== "publisher"}
79 return published_xpaths
82 def read_xpath(self
, xpath
):
83 itr
= yield from self
.dts
.query_read(xpath
)
87 result
= yield from fut
90 @rift.test
.dts
.async_test
91 def test_download_publisher(self
):
92 yield from self
.job_handler
.register()
93 published_xpaths
= yield from self
.get_published_xpaths()
94 assert self
.job_handler
.xpath() in published_xpaths
96 @rift.test
.dts
.async_test
97 def test_publish(self
):
100 1. Verify if an update on_download_progess & on_download_finished
101 triggers a DTS update
102 2. Verify if the internal store is updated
104 yield from self
.job_handler
.register()
106 mock_msg
= RwPkgMgmtYang
.YangData_RwProject_Project_DownloadJobs_Job
.from_dict({
107 "url": "http://foo/bar",
109 "download_id": str(uuid
.uuid4())})
111 yield from self
.job_handler
._dts
_publisher
(mock_msg
)
112 yield from asyncio
.sleep(5, loop
=self
.loop
)
114 xpath
= self
.project
.add_project("/download-jobs/job[download-id={}]".
115 format(quoted_key(mock_msg
.download_id
)))
116 itr
= yield from self
.dts
.query_read(xpath
)
120 result
= yield from fut
121 result
= result
.result
123 self
.log
.debug("Mock msg: {}".format(mock_msg
))
124 assert result
== mock_msg
127 mock_msg
.url
= "http://bar/foo"
128 yield from self
.job_handler
._dts
_publisher
(mock_msg
)
129 yield from asyncio
.sleep(5, loop
=self
.loop
)
131 itr
= yield from self
.dts
.query_read(xpath
)
135 result
= yield from fut
136 result
= result
.result
137 assert result
== mock_msg
140 @rift.test
.dts
.async_test
141 def test_url_download(self
):
144 Test the updates with download/url.py
146 yield from self
.job_handler
.register()
148 proxy
= mock
.MagicMock()
150 url
= "http://sharedfiles/common/unittests/plantuml.jar"
151 url_downloader
= downloader
.PackageFileDownloader(url
, "1", "/", "VNFD", "SCRIPTS", "VNF_CONFIG", proxy
)
153 download_id
= yield from self
.job_handler
.register_downloader(url_downloader
)
154 assert download_id
is not None
156 # Waiting for 5 secs to be sure that the file is downloaded
157 yield from asyncio
.sleep(10, loop
=self
.loop
)
158 xpath
= self
.project
.add_project("/download-jobs/job[download-id={}]".format(
159 quoted_key(download_id
)))
160 result
= yield from self
.read_xpath(xpath
)
161 self
.log
.debug("Test result before complete check - %s", result
)
162 assert result
.status
== "COMPLETED"
163 assert len(self
.job_handler
.tasks
) == 0
165 @rift.test
.dts
.async_test
166 def test_url_download_unreachable_ip(self
):
169 Ensure that a bad IP does not block forever
171 yield from self
.job_handler
.register()
173 proxy
= mock
.MagicMock()
175 # Here, we are assuming that there is no HTTP server at 10.1.2.3
176 url
= "http://10.1.2.3/common/unittests/plantuml.jar"
177 url_downloader
= downloader
.PackageFileDownloader(url
, "1", "/", "VNFD", "SCRIPTS", "VNF_CONFIG", proxy
)
178 self
.log
.debug("Downloader url: {}".format(url_downloader
))
180 download_id
= yield from self
.job_handler
.register_downloader(url_downloader
)
181 self
.log
.debug("Download id: {}".format(download_id
))
182 assert download_id
is not None
184 # Waiting for 60 secs to be sure all reconnect attempts have been exhausted
185 yield from asyncio
.sleep(60, loop
=self
.loop
)
186 xpath
= self
.project
.add_project("/download-jobs/job[download-id={}]".
187 format(quoted_key(download_id
)))
188 result
= yield from self
.read_xpath(xpath
)
189 self
.log
.debug("Test result before complete check - %s", result
)
190 assert result
.status
== "FAILED"
191 assert len(self
.job_handler
.tasks
) == 0
194 @rift.test
.dts
.async_test
195 def test_cancelled(self
):
198 1. Test the updates with downloader.py
199 2. Verifies if cancel triggers the job status to move to cancelled
201 yield from self
.job_handler
.register()
203 proxy
= mock
.MagicMock()
204 url
= "http://sharedfiles/common/unittests/Fedora-x86_64-20-20131211.1-sda-ping.qcow2"
205 url_downloader
= downloader
.PackageFileDownloader(url
, "1", "/", "VNFD", "SCRIPTS", "VNF_CONFIG", proxy
)
207 download_id
= yield from self
.job_handler
.register_downloader(url_downloader
)
208 assert download_id
is not None
209 xpath
= self
.project
.add_project("/download-jobs/job[download-id={}]".
210 format(quoted_key(download_id
)))
212 # wait long enough to have the state be in IN_PROGRESS
213 yield from asyncio
.sleep(0.2, loop
=self
.loop
)
215 result
= yield from self
.read_xpath(xpath
)
216 self
.log
.debug("Test result before in_progress check - %s", result
)
217 assert result
.status
== "IN_PROGRESS"
219 yield from self
.job_handler
.cancel_download(download_id
)
220 yield from asyncio
.sleep(3, loop
=self
.loop
)
221 result
= yield from self
.read_xpath(xpath
)
222 self
.log
.debug("Test result before cancel check - %s", result
)
223 assert result
.status
== "CANCELLED"
224 assert len(self
.job_handler
.tasks
) == 0
228 runner
= xmlrunner
.XMLTestRunner(output
=os
.environ
["RIFT_MODULE_TEST"])
230 parser
= argparse
.ArgumentParser()
231 parser
.add_argument('-v', '--verbose', action
='store_true')
232 parser
.add_argument('-n', '--no-runner', action
='store_true')
233 args
, unittest_args
= parser
.parse_known_args()
237 TestCase
.log_level
= logging
.DEBUG
if args
.verbose
else logging
.WARN
239 unittest
.main(testRunner
=runner
, argv
=[sys
.argv
[0]] + unittest_args
)
241 if __name__
== '__main__':