Move 'launchpad' directory to 'RIFT_VAR_ROOT' from 'RIFT_ARTIFACTS'
[osm/SO.git] / rwlaunchpad / plugins / rwpkgmgr / test / utest_publisher_dts.py
1 #!/usr/bin/env python3
2
3 #
4 # Copyright 2016 RIFT.IO Inc
5 #
6 # Licensed under the Apache License, Version 2.0 (the "License");
7 # you may not use this file except in compliance with the License.
8 # You may obtain a copy of the License at
9 #
10 # http://www.apache.org/licenses/LICENSE-2.0
11 #
12 # Unless required by applicable law or agreed to in writing, software
13 # distributed under the License is distributed on an "AS IS" BASIS,
14 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 # See the License for the specific language governing permissions and
16 # limitations under the License.
17 #
18
19 import argparse
20 import asyncio
21 import logging
22 import mock
23 import os
24 import sys
25 import unittest
26 import uuid
27 import xmlrunner
28
# Default RIFT_VAR_ROOT to <RIFT_INSTALL>/var/rift/unittest when the
# environment does not already provide it, so the unit tests can run.
if os.environ.get("RIFT_VAR_ROOT") is None:
    os.environ["RIFT_VAR_ROOT"] = os.path.join(
        os.environ["RIFT_INSTALL"], "var/rift/unittest"
    )
32
33 import gi
34 gi.require_version('RwDts', '1.0')
35 gi.require_version('RwPkgMgmtYang', '1.0')
36 from gi.repository import (
37 RwDts as rwdts,
38 RwPkgMgmtYang
39 )
40 import rift.tasklets.rwpkgmgr.downloader as downloader
41 import rift.tasklets.rwpkgmgr.publisher as pkg_publisher
42 import rift.test.dts
43
44
class TestCase(rift.test.dts.AbstractDTSTest):
    """DTS tests for ``pkg_publisher.DownloadStatusPublisher``.

    Every test registers the publisher with DTS and then inspects the
    download-job entries readable under ``/download-jobs/job``.
    """

    @classmethod
    def configure_schema(cls):
        """Return the RwPkgMgmtYang schema."""
        return RwPkgMgmtYang.get_schema()

    @classmethod
    def configure_timeout(cls):
        """Return the timeout for this test case (presumably seconds)."""
        return 240

    def configure_test(self, loop, test_id):
        """Create the DTS handle and the publisher under test.

        NOTE(review): the ``loop`` argument is not used here; ``self.loop``
        is used instead -- presumably the same loop, confirm against
        AbstractDTSTest.
        """
        self.log.debug("STARTING - %s", test_id)
        self.tinfo = self.new_tinfo(str(test_id))
        self.dts = rift.tasklets.DTS(self.tinfo, self.schema, self.loop)

        self.job_handler = pkg_publisher.DownloadStatusPublisher(self.log, self.dts, self.loop)

    def tearDown(self):
        super().tearDown()

    @staticmethod
    def _job_xpath(download_id):
        """Return the xpath of the download job with the given download id."""
        return "/download-jobs/job[download-id='{}']".format(download_id)

    @asyncio.coroutine
    def get_published_xpaths(self):
        """Return the set of keyspecs registered with DTS as publishers.

        Reads ``D,/rwdts:dts`` and collects ``keyspec`` from every member
        registration whose ``flags`` equals ``"publisher"``.
        """
        published_xpaths = set()

        res_iter = yield from self.dts.query_read("D,/rwdts:dts")
        for fut in res_iter:
            res = (yield from fut).result
            for member in res.member:
                published_xpaths.update(
                    reg.keyspec
                    for reg in member.state.registration
                    if reg.flags == "publisher"
                )

        return published_xpaths

    @asyncio.coroutine
    def read_xpath(self, xpath):
        """Read ``xpath`` from DTS and return the result of the last entry.

        Returns:
            The ``result`` attribute of the last query response, or None
            when the query produced no responses at all.
        """
        itr = yield from self.dts.query_read(xpath)

        result = None
        for fut in itr:
            result = (yield from fut).result
        return result

    @rift.test.dts.async_test
    def test_download_publisher(self):
        """Verify that registering the handler publishes its xpath in DTS."""
        yield from self.job_handler.register()
        published_xpaths = yield from self.get_published_xpaths()
        assert self.job_handler.xpath() in published_xpaths

    @rift.test.dts.async_test
    def test_publish(self):
        """
        Asserts:
            1. Verify if an update on_download_progress & on_download_finished
               triggers a DTS update
            2. Verify if the internal store is updated
        """
        yield from self.job_handler.register()

        mock_msg = RwPkgMgmtYang.DownloadJob.from_dict({
                "url": "http://foo/bar",
                "package_id": "123",
                "download_id": str(uuid.uuid4())})
        xpath = self._job_xpath(mock_msg.download_id)

        yield from self.job_handler._dts_publisher(mock_msg)
        yield from asyncio.sleep(5, loop=self.loop)

        result = yield from self.read_xpath(xpath)
        self.log.debug("Mock %s", mock_msg)
        assert result == mock_msg

        # Modify the msg
        mock_msg.url = "http://bar/foo"
        yield from self.job_handler._dts_publisher(mock_msg)
        yield from asyncio.sleep(5, loop=self.loop)

        result = yield from self.read_xpath(xpath)
        assert result == mock_msg

    @rift.test.dts.async_test
    def test_url_download(self):
        """
        Integration Test:
            Test the updates with download/url.py
        """
        yield from self.job_handler.register()

        proxy = mock.MagicMock()

        url = "http://boson.eng.riftio.com/common/unittests/plantuml.jar"
        url_downloader = downloader.PackageFileDownloader(url, "1", "/", "VNFD", proxy)

        download_id = yield from self.job_handler.register_downloader(url_downloader)
        assert download_id is not None

        # Waiting for 10 secs to be sure that the file is downloaded
        yield from asyncio.sleep(10, loop=self.loop)
        xpath = self._job_xpath(download_id)
        result = yield from self.read_xpath(xpath)
        self.log.debug("Test result before complete check - %s", result)
        # Fail with a clear message rather than an AttributeError on None.
        assert result is not None, "no DTS entry at {}".format(xpath)
        assert result.status == "COMPLETED"
        assert len(self.job_handler.tasks) == 0

    @rift.test.dts.async_test
    def test_url_download_unreachable_ip(self):
        """
        Integration Test:
            Ensure that a bad IP does not block forever
        """
        yield from self.job_handler.register()

        proxy = mock.MagicMock()

        # Here, we are assuming that there is no HTTP server at 10.1.2.3
        url = "http://10.1.2.3/common/unittests/plantuml.jar"
        url_downloader = downloader.PackageFileDownloader(url, "1", "/", "VNFD", proxy)

        download_id = yield from self.job_handler.register_downloader(url_downloader)
        assert download_id is not None

        # Waiting for 10 secs to be sure all reconnect attempts have been exhausted
        yield from asyncio.sleep(10, loop=self.loop)
        xpath = self._job_xpath(download_id)
        result = yield from self.read_xpath(xpath)
        self.log.debug("Test result before complete check - %s", result)
        # Fail with a clear message rather than an AttributeError on None.
        assert result is not None, "no DTS entry at {}".format(xpath)
        assert result.status == "FAILED"
        assert len(self.job_handler.tasks) == 0

    @rift.test.dts.async_test
    def test_cancelled(self):
        """
        Integration Test:
            1. Test the updates with downloader.py
            2. Verifies if cancel triggers the job status to move to cancelled
        """
        yield from self.job_handler.register()

        proxy = mock.MagicMock()
        url = "http://boson.eng.riftio.com/common/unittests/Fedora-x86_64-20-20131211.1-sda-ping.qcow2"
        url_downloader = downloader.PackageFileDownloader(url, "1", "/", "VNFD", proxy)

        download_id = yield from self.job_handler.register_downloader(url_downloader)
        assert download_id is not None
        xpath = self._job_xpath(download_id)

        yield from asyncio.sleep(1, loop=self.loop)

        result = yield from self.read_xpath(xpath)
        self.log.debug("Test result before in_progress check - %s", result)
        # Fail with a clear message rather than an AttributeError on None.
        assert result is not None, "no DTS entry at {}".format(xpath)
        assert result.status == "IN_PROGRESS"

        yield from self.job_handler.cancel_download(download_id)
        yield from asyncio.sleep(3, loop=self.loop)
        result = yield from self.read_xpath(xpath)
        self.log.debug("Test result before cancel check - %s", result)
        assert result is not None, "no DTS entry at {}".format(xpath)
        assert result.status == "CANCELLED"
        assert len(self.job_handler.tasks) == 0
217
218
def main():
    """Parse command-line options and run the unit tests in this module.

    Options:
        -v, --verbose    run TestCase at DEBUG log level (WARN otherwise).
        -n, --no-runner  do not use the XML test runner; ``unittest.main``
                         is then called with ``testRunner=None``.

    Arguments not recognised here are passed through to ``unittest.main``.

    Raises:
        KeyError: if the XML runner is requested but RIFT_MODULE_TEST is
            not set in the environment.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument('-v', '--verbose', action='store_true')
    parser.add_argument('-n', '--no-runner', action='store_true')
    args, unittest_args = parser.parse_known_args()

    # Build the XML runner only when it will actually be used, so that
    # --no-runner works even when RIFT_MODULE_TEST is not set.
    runner = None
    if not args.no_runner:
        runner = xmlrunner.XMLTestRunner(output=os.environ["RIFT_MODULE_TEST"])

    TestCase.log_level = logging.DEBUG if args.verbose else logging.WARN

    unittest.main(testRunner=runner, argv=[sys.argv[0]] + unittest_args)
232
# Run the tests only when this file is executed as a script.
if __name__ == "__main__":
    main()