update from RIFT as of 696b75d2fe9fb046261b08c616f1bcf6c0b54a9b second try
[osm/SO.git] / rwlaunchpad / plugins / rwpkgmgr / test / utest_pkgmgr_publisher_dts.py
1 #!/usr/bin/env python3
2
3 #
4 # Copyright 2016 RIFT.IO Inc
5 #
6 # Licensed under the Apache License, Version 2.0 (the "License");
7 # you may not use this file except in compliance with the License.
8 # You may obtain a copy of the License at
9 #
10 # http://www.apache.org/licenses/LICENSE-2.0
11 #
12 # Unless required by applicable law or agreed to in writing, software
13 # distributed under the License is distributed on an "AS IS" BASIS,
14 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 # See the License for the specific language governing permissions and
16 # limitations under the License.
17 #
18
19 import argparse
20 import asyncio
21 import gi
22 import logging
23 import mock
24 import os
25 import sys
26 import unittest
27 import uuid
28 import xmlrunner
29
# Unit tests may be launched without RIFT_VAR_ROOT exported; in that case
# point it at the unittest area beneath RIFT_INSTALL before any rift module
# is imported.
if os.environ.get("RIFT_VAR_ROOT") is None:
    _rift_install_dir = os.environ['RIFT_INSTALL']
    os.environ['RIFT_VAR_ROOT'] = os.path.join(_rift_install_dir, 'var/rift/unittest')
33
34 gi.require_version('RwDts', '1.0')
35 gi.require_version('RwPkgMgmtYang', '1.0')
36 from gi.repository import (
37 RwDts as rwdts,
38 RwPkgMgmtYang
39 )
40 import rift.tasklets.rwpkgmgr.downloader as downloader
41 import rift.tasklets.rwpkgmgr.publisher as pkg_publisher
42 import rift.test.dts
43 from rift.mano.utils.project import ManoProject, DEFAULT_PROJECT
44
45 gi.require_version('RwKeyspec', '1.0')
46 from gi.repository.RwKeyspec import quoted_key
47
class TestCase(rift.test.dts.AbstractDTSTest):
    """DTS tests for ``pkg_publisher.DownloadStatusPublisher``.

    Each test registers the publisher, drives it either directly or through
    a ``PackageFileDownloader``, and then reads the download job back over
    DTS to check the published state.
    """

    @classmethod
    def configure_schema(cls):
        """Return the RwPkgMgmtYang schema used by the test DTS instance."""
        return RwPkgMgmtYang.get_schema()

    @classmethod
    def configure_timeout(cls):
        """Return the timeout handed to the test framework.

        NOTE(review): presumably seconds -- confirm against AbstractDTSTest.
        """
        return 240

    def configure_test(self, loop, test_id):
        """Create the DTS handle, project and publisher used by one test.

        Args:
            loop: event loop passed in by the framework (the code below uses
                ``self.loop`` rather than this argument).
            test_id: identifier of the test; used to name the tasklet info.
        """
        self.log.debug("STARTING - %s", test_id)
        self.tinfo = self.new_tinfo(str(test_id))
        self.dts = rift.tasklets.DTS(self.tinfo, self.schema, self.loop)
        self.project = ManoProject(self.log, name=DEFAULT_PROJECT)

        self.job_handler = pkg_publisher.DownloadStatusPublisher(self.log, self.dts,
                                                                 self.loop, self.project)

    def tearDown(self):
        """Defer entirely to the base class tear-down."""
        super().tearDown()

    def _job_xpath(self, download_id):
        """Return the project-scoped xpath of the download job ``download_id``."""
        return self.project.add_project(
            "/download-jobs/job[download-id={}]".format(quoted_key(download_id)))

    @asyncio.coroutine
    def get_published_xpaths(self):
        """Return the set of keyspecs registered with the "publisher" flag.

        The registrations are read from the "D,/rwdts:dts" DTS state.
        """
        published_xpaths = set()

        res_iter = yield from self.dts.query_read("D,/rwdts:dts")
        for i in res_iter:
            res = (yield from i).result
            for member in res.member:
                published_xpaths.update(
                    reg.keyspec
                    for reg in member.state.registration
                    if reg.flags == "publisher")

        return published_xpaths

    @asyncio.coroutine
    def read_xpath(self, xpath):
        """Read ``xpath`` over DTS and return the last response's result.

        Returns:
            The ``result`` attribute of the last response yielded by the
            query, or ``None`` when the query yields no responses at all.
        """
        itr = yield from self.dts.query_read(xpath)

        result = None
        for fut in itr:
            result = (yield from fut).result
        return result

    @rift.test.dts.async_test
    def test_download_publisher(self):
        """The publisher's xpath shows up among the published keyspecs."""
        yield from self.job_handler.register()
        published_xpaths = yield from self.get_published_xpaths()
        assert self.job_handler.xpath() in published_xpaths

    @rift.test.dts.async_test
    def test_publish(self):
        """
        Asserts:
            1. Verify if an update on_download_progress & on_download_finished
               triggers a DTS update
            2. Verify if the internal store is updated
        """
        yield from self.job_handler.register()

        mock_msg = RwPkgMgmtYang.YangData_RwProject_Project_DownloadJobs_Job.from_dict({
            "url": "http://foo/bar",
            "package_id": "123",
            "download_id": str(uuid.uuid4())})

        yield from self.job_handler._dts_publisher(mock_msg)
        yield from asyncio.sleep(5, loop=self.loop)

        xpath = self._job_xpath(mock_msg.download_id)
        result = yield from self.read_xpath(xpath)

        self.log.debug("Mock msg: {}".format(mock_msg))
        assert result == mock_msg

        # Modify the msg and check that the change is published as well
        mock_msg.url = "http://bar/foo"
        yield from self.job_handler._dts_publisher(mock_msg)
        yield from asyncio.sleep(5, loop=self.loop)

        result = yield from self.read_xpath(xpath)
        assert result == mock_msg

    @rift.test.dts.async_test
    def test_url_download(self):
        """
        Integration Test:
            Test the updates with download/url.py
        """
        yield from self.job_handler.register()

        proxy = mock.MagicMock()

        url = "http://sharedfiles/common/unittests/plantuml.jar"
        url_downloader = downloader.PackageFileDownloader(url, "1", "/", "VNFD", "SCRIPTS", "VNF_CONFIG", proxy)

        download_id = yield from self.job_handler.register_downloader(url_downloader)
        assert download_id is not None

        # Waiting for 10 secs to be sure that the file is downloaded
        yield from asyncio.sleep(10, loop=self.loop)
        result = yield from self.read_xpath(self._job_xpath(download_id))
        self.log.debug("Test result before complete check - %s", result)
        assert result.status == "COMPLETED"
        assert len(self.job_handler.tasks) == 0

    @rift.test.dts.async_test
    def test_url_download_unreachable_ip(self):
        """
        Integration Test:
            Ensure that a bad IP does not block forever
        """
        yield from self.job_handler.register()

        proxy = mock.MagicMock()

        # Here, we are assuming that there is no HTTP server at 10.1.2.3
        url = "http://10.1.2.3/common/unittests/plantuml.jar"
        url_downloader = downloader.PackageFileDownloader(url, "1", "/", "VNFD", "SCRIPTS", "VNF_CONFIG", proxy)
        self.log.debug("Downloader url: {}".format(url_downloader))

        download_id = yield from self.job_handler.register_downloader(url_downloader)
        self.log.debug("Download id: {}".format(download_id))
        assert download_id is not None

        # Waiting for 60 secs to be sure all reconnect attempts have been exhausted
        yield from asyncio.sleep(60, loop=self.loop)
        result = yield from self.read_xpath(self._job_xpath(download_id))
        self.log.debug("Test result before complete check - %s", result)
        assert result.status == "FAILED"
        assert len(self.job_handler.tasks) == 0

    @rift.test.dts.async_test
    def test_cancelled(self):
        """
        Integration Test:
            1. Test the updates with downloader.py
            2. Verifies if cancel triggers the job status to move to cancelled
        """
        yield from self.job_handler.register()

        proxy = mock.MagicMock()
        url = "http://sharedfiles/common/unittests/Fedora-x86_64-20-20131211.1-sda-ping.qcow2"
        url_downloader = downloader.PackageFileDownloader(url, "1", "/", "VNFD", "SCRIPTS", "VNF_CONFIG", proxy)

        download_id = yield from self.job_handler.register_downloader(url_downloader)
        assert download_id is not None
        xpath = self._job_xpath(download_id)

        # wait long enough to have the state be in IN_PROGRESS
        yield from asyncio.sleep(0.2, loop=self.loop)

        result = yield from self.read_xpath(xpath)
        self.log.debug("Test result before in_progress check - %s", result)
        assert result.status == "IN_PROGRESS"

        yield from self.job_handler.cancel_download(download_id)
        yield from asyncio.sleep(3, loop=self.loop)
        result = yield from self.read_xpath(xpath)
        self.log.debug("Test result before cancel check - %s", result)
        assert result.status == "CANCELLED"
        assert len(self.job_handler.tasks) == 0
225
226
def main():
    """Parse the command line and run this module's tests.

    Options:
        -v, --verbose    run the tests at DEBUG log level (WARN otherwise).
        -n, --no-runner  use unittest's default runner instead of the XML
                         runner.

    Arguments not recognised here are forwarded to ``unittest.main``.

    Raises:
        KeyError: if the XML runner is requested and RIFT_MODULE_TEST is
            not set in the environment.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument('-v', '--verbose', action='store_true')
    parser.add_argument('-n', '--no-runner', action='store_true')
    args, unittest_args = parser.parse_known_args()

    # Build the XML runner only when it will be used, so that --no-runner
    # (and --help) work without RIFT_MODULE_TEST being exported.
    runner = None
    if not args.no_runner:
        runner = xmlrunner.XMLTestRunner(output=os.environ["RIFT_MODULE_TEST"])

    TestCase.log_level = logging.DEBUG if args.verbose else logging.WARN

    unittest.main(testRunner=runner, argv=[sys.argv[0]] + unittest_args)


if __name__ == '__main__':
    main()