[RIFT 16413, 16414] Unittest failures fixed for utest_package, utest_publisher_dts
[osm/SO.git] / rwlaunchpad / plugins / rwpkgmgr / test / utest_publisher_dts.py
1 #!/usr/bin/env python3
2
3 #
4 # Copyright 2016 RIFT.IO Inc
5 #
6 # Licensed under the Apache License, Version 2.0 (the "License");
7 # you may not use this file except in compliance with the License.
8 # You may obtain a copy of the License at
9 #
10 # http://www.apache.org/licenses/LICENSE-2.0
11 #
12 # Unless required by applicable law or agreed to in writing, software
13 # distributed under the License is distributed on an "AS IS" BASIS,
14 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 # See the License for the specific language governing permissions and
16 # limitations under the License.
17 #
18
19 import argparse
20 import asyncio
21 import logging
22 import mock
23 import os
24 import sys
25 import unittest
26 import uuid
27 import xmlrunner
28
29 import gi
30 gi.require_version('RwDts', '1.0')
31 gi.require_version('RwPkgMgmtYang', '1.0')
32 from gi.repository import (
33 RwDts as rwdts,
34 RwPkgMgmtYang
35 )
36 import rift.tasklets.rwpkgmgr.downloader as downloader
37 import rift.tasklets.rwpkgmgr.publisher as pkg_publisher
38 import rift.test.dts
39
40
41 class TestCase(rift.test.dts.AbstractDTSTest):
42 @classmethod
43 def configure_schema(cls):
44 return RwPkgMgmtYang.get_schema()
45
46 @classmethod
47 def configure_timeout(cls):
48 return 240
49
50 def configure_test(self, loop, test_id):
51 self.log.debug("STARTING - %s", test_id)
52 self.tinfo = self.new_tinfo(str(test_id))
53 self.dts = rift.tasklets.DTS(self.tinfo, self.schema, self.loop)
54
55 self.job_handler = pkg_publisher.DownloadStatusPublisher(self.log, self.dts, self.loop)
56
57 def tearDown(self):
58 super().tearDown()
59
60 @asyncio.coroutine
61 def get_published_xpaths(self):
62 published_xpaths = set()
63
64 res_iter = yield from self.dts.query_read("D,/rwdts:dts")
65 for i in res_iter:
66 res = (yield from i).result
67 for member in res.member:
68 published_xpaths |= {reg.keyspec for reg in member.state.registration if reg.flags == "publisher"}
69
70 return published_xpaths
71
72 @asyncio.coroutine
73 def read_xpath(self, xpath):
74 itr = yield from self.dts.query_read(xpath)
75
76 result = None
77 for fut in itr:
78 result = yield from fut
79 return result.result
80
81 @rift.test.dts.async_test
82 def test_download_publisher(self):
83 yield from self.job_handler.register()
84 published_xpaths = yield from self.get_published_xpaths()
85 assert self.job_handler.xpath() in published_xpaths
86
87 @rift.test.dts.async_test
88 def test_publish(self):
89 """
90 Asserts:
91 1. Verify if an update on_download_progess & on_download_finished
92 triggers a DTS update
93 2. Verify if the internal store is updated
94 """
95 yield from self.job_handler.register()
96
97 mock_msg = RwPkgMgmtYang.DownloadJob.from_dict({
98 "url": "http://foo/bar",
99 "package_id": "123",
100 "download_id": str(uuid.uuid4())})
101
102 yield from self.job_handler._dts_publisher(mock_msg)
103 yield from asyncio.sleep(5, loop=self.loop)
104
105 itr = yield from self.dts.query_read("/download-jobs/job[download-id='{}']".format(
106 mock_msg.download_id))
107
108 result = None
109 for fut in itr:
110 result = yield from fut
111 result = result.result
112
113 print ("Mock ", mock_msg)
114 assert result == mock_msg
115
116 # Modify the msg
117 mock_msg.url = "http://bar/foo"
118 yield from self.job_handler._dts_publisher(mock_msg)
119 yield from asyncio.sleep(5, loop=self.loop)
120
121 itr = yield from self.dts.query_read("/download-jobs/job[download-id='{}']".format(
122 mock_msg.download_id))
123
124 result = None
125 for fut in itr:
126 result = yield from fut
127 result = result.result
128 assert result == mock_msg
129
130
131 @rift.test.dts.async_test
132 def test_url_download(self):
133 """
134 Integration Test:
135 Test the updates with download/url.py
136 """
137 yield from self.job_handler.register()
138
139 proxy = mock.MagicMock()
140
141 url = "http://boson.eng.riftio.com/common/unittests/plantuml.jar"
142 url_downloader = downloader.PackageFileDownloader(url, "1", "/", "VNFD", "SCRIPTS", "VNF_CONFIG", proxy)
143
144 download_id = yield from self.job_handler.register_downloader(url_downloader)
145 assert download_id is not None
146
147 # Waiting for 5 secs to be sure that the file is downloaded
148 yield from asyncio.sleep(10, loop=self.loop)
149 xpath = "/download-jobs/job[download-id='{}']".format(
150 download_id)
151 result = yield from self.read_xpath(xpath)
152 self.log.debug("Test result before complete check - %s", result)
153 assert result.status == "COMPLETED"
154 assert len(self.job_handler.tasks) == 0
155
156 @rift.test.dts.async_test
157 def test_url_download_unreachable_ip(self):
158 """
159 Integration Test:
160 Ensure that a bad IP does not block forever
161 """
162 yield from self.job_handler.register()
163
164 proxy = mock.MagicMock()
165
166 # Here, we are assuming that there is no HTTP server at 10.1.2.3
167 url = "http://10.1.2.3/common/unittests/plantuml.jar"
168 url_downloader = downloader.PackageFileDownloader(url, "1", "/", "VNFD", "SCRIPTS", "VNF_CONFIG", proxy)
169
170 download_id = yield from self.job_handler.register_downloader(url_downloader)
171 assert download_id is not None
172
173 # Waiting for 10 secs to be sure all reconnect attempts have been exhausted
174 yield from asyncio.sleep(10, loop=self.loop)
175 xpath = "/download-jobs/job[download-id='{}']".format(
176 download_id)
177 result = yield from self.read_xpath(xpath)
178 self.log.debug("Test result before complete check - %s", result)
179 assert result.status == "FAILED"
180 assert len(self.job_handler.tasks) == 0
181
182
183 @rift.test.dts.async_test
184 def test_cancelled(self):
185 """
186 Integration Test:
187 1. Test the updates with downloader.py
188 2. Verifies if cancel triggers the job status to move to cancelled
189 """
190 yield from self.job_handler.register()
191
192 proxy = mock.MagicMock()
193 url = "http://boson.eng.riftio.com/common/unittests/Fedora-x86_64-20-20131211.1-sda-ping.qcow2"
194 url_downloader = downloader.PackageFileDownloader(url, "1", "/", "VNFD", "SCRIPTS", "VNF_CONFIG", proxy)
195
196 download_id = yield from self.job_handler.register_downloader(url_downloader)
197 assert download_id is not None
198 xpath = "/download-jobs/job[download-id='{}']".format(
199 download_id)
200
201 yield from asyncio.sleep(1, loop=self.loop)
202
203 result = yield from self.read_xpath(xpath)
204 self.log.debug("Test result before in_progress check - %s", result)
205 assert result.status == "IN_PROGRESS"
206
207 yield from self.job_handler.cancel_download(download_id)
208 yield from asyncio.sleep(3, loop=self.loop)
209 result = yield from self.read_xpath(xpath)
210 self.log.debug("Test result before cancel check - %s", result)
211 assert result.status == "CANCELLED"
212 assert len(self.job_handler.tasks) == 0
213
214
215 def main():
216 runner = xmlrunner.XMLTestRunner(output=os.environ["RIFT_MODULE_TEST"])
217
218 parser = argparse.ArgumentParser()
219 parser.add_argument('-v', '--verbose', action='store_true')
220 parser.add_argument('-n', '--no-runner', action='store_true')
221 args, unittest_args = parser.parse_known_args()
222 if args.no_runner:
223 runner = None
224
225 TestCase.log_level = logging.DEBUG if args.verbose else logging.WARN
226
227 unittest.main(testRunner=runner, argv=[sys.argv[0]] + unittest_args)
228
229 if __name__ == '__main__':
230 main()