Merge from OSM SO master
[osm/SO.git] / rwlaunchpad / plugins / rwpkgmgr / test / utest_publisher_dts.py
1 #!/usr/bin/env python3
2
3 #
4 # Copyright 2016 RIFT.IO Inc
5 #
6 # Licensed under the Apache License, Version 2.0 (the "License");
7 # you may not use this file except in compliance with the License.
8 # You may obtain a copy of the License at
9 #
10 # http://www.apache.org/licenses/LICENSE-2.0
11 #
12 # Unless required by applicable law or agreed to in writing, software
13 # distributed under the License is distributed on an "AS IS" BASIS,
14 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 # See the License for the specific language governing permissions and
16 # limitations under the License.
17 #
18
19 import argparse
20 import asyncio
21 import logging
22 import mock
23 import os
24 import sys
25 import unittest
26 import uuid
27 import xmlrunner
28
29 import gi
30 gi.require_version('RwDts', '1.0')
31 gi.require_version('RwPkgMgmtYang', '1.0')
32 from gi.repository import (
33 RwDts as rwdts,
34 RwPkgMgmtYang
35 )
36 import rift.tasklets.rwpkgmgr.downloader as downloader
37 import rift.tasklets.rwpkgmgr.publisher as pkg_publisher
38 import rift.test.dts
39 from rift.mano.utils.project import ManoProject, DEFAULT_PROJECT
40
41
class TestCase(rift.test.dts.AbstractDTSTest):
    """DTS tests for ``pkg_publisher.DownloadStatusPublisher``.

    Each test registers the publisher with DTS and then checks what can be
    read back from the project-scoped ``/download-jobs/job`` xpath.

    NOTE(review): ``test_url_download`` and ``test_cancelled`` fetch files
    from a fixed HTTP host, so they need network access to that host.
    """

    @classmethod
    def configure_schema(cls):
        """Return the RwPkgMgmtYang schema."""
        return RwPkgMgmtYang.get_schema()

    @classmethod
    def configure_timeout(cls):
        """Return the test timeout (presumably in seconds -- TODO confirm)."""
        return 240

    def configure_test(self, loop, test_id):
        """Create the DTS handle, project and publisher used by each test.

        Args:
            loop: event loop handed in by the test harness (``self.loop`` is
                what is actually passed on to the objects built here).
            test_id: identifier of the running test; used to name the tinfo.
        """
        self.log.debug("STARTING - %s", test_id)
        self.tinfo = self.new_tinfo(str(test_id))
        self.dts = rift.tasklets.DTS(self.tinfo, self.schema, self.loop)
        self.project = ManoProject(self.log, name=DEFAULT_PROJECT)

        self.job_handler = pkg_publisher.DownloadStatusPublisher(
            self.log, self.dts, self.loop, self.project)

    def tearDown(self):
        """Delegate to the base class tear-down."""
        super().tearDown()

    def _job_xpath(self, download_id):
        """Return the project-scoped xpath of the job with ``download_id``."""
        return self.project.add_project(
            "/download-jobs/job[download-id='{}']".format(download_id))

    @asyncio.coroutine
    def get_published_xpaths(self):
        """Return the set of keyspecs registered in DTS with publisher flags.

        Reads ``D,/rwdts:dts`` and collects ``keyspec`` from every member
        registration whose ``flags`` equals ``"publisher"``.
        """
        published_xpaths = set()

        res_iter = yield from self.dts.query_read("D,/rwdts:dts")
        for i in res_iter:
            res = (yield from i).result
            for member in res.member:
                published_xpaths.update(
                    reg.keyspec
                    for reg in member.state.registration
                    if reg.flags == "publisher")

        return published_xpaths

    @asyncio.coroutine
    def read_xpath(self, xpath):
        """Read ``xpath`` from DTS and return the last entry's ``result``.

        Returns:
            The ``result`` attribute of the last query response, or ``None``
            when the query produced no responses at all (previously this
            case raised ``AttributeError`` on ``None``).
        """
        itr = yield from self.dts.query_read(xpath)

        last = None
        for fut in itr:
            last = yield from fut

        if last is None:
            return None
        return last.result

    @rift.test.dts.async_test
    def test_download_publisher(self):
        """The publisher's xpath shows up as a publisher registration."""
        yield from self.job_handler.register()
        published_xpaths = yield from self.get_published_xpaths()
        assert self.job_handler.xpath() in published_xpaths

    @rift.test.dts.async_test
    def test_publish(self):
        """
        Asserts:
            1. Verify if an update on_download_progress & on_download_finished
               triggers a DTS update
            2. Verify if the internal store is updated
        """
        yield from self.job_handler.register()

        mock_msg = RwPkgMgmtYang.DownloadJob.from_dict({
            "url": "http://foo/bar",
            "package_id": "123",
            "download_id": str(uuid.uuid4())})

        yield from self.job_handler._dts_publisher(mock_msg)
        # Give DTS time to process the publish before reading it back.
        yield from asyncio.sleep(5, loop=self.loop)

        xpath = self._job_xpath(mock_msg.download_id)
        result = yield from self.read_xpath(xpath)

        self.log.debug("Mock msg: {}".format(mock_msg))
        assert result == mock_msg

        # Modify the msg and make sure the update is visible as well.
        mock_msg.url = "http://bar/foo"
        yield from self.job_handler._dts_publisher(mock_msg)
        yield from asyncio.sleep(5, loop=self.loop)

        result = yield from self.read_xpath(xpath)
        assert result == mock_msg

    @rift.test.dts.async_test
    def test_url_download(self):
        """
        Integration Test:
            Test the updates with download/url.py
        """
        yield from self.job_handler.register()

        proxy = mock.MagicMock()

        url = "http://boson.eng.riftio.com/common/unittests/plantuml.jar"
        url_downloader = downloader.PackageFileDownloader(url, "1", "/", "VNFD", proxy)

        download_id = yield from self.job_handler.register_downloader(url_downloader)
        assert download_id is not None

        xpath = self._job_xpath(download_id)

        # Waiting to be sure that the file is downloaded
        # From BLR, it sometimes take longer for the file to
        # be downloaded
        poll_interval = 5
        max_time = 60
        total_time = 0
        result = None
        while True:
            yield from asyncio.sleep(poll_interval, loop=self.loop)
            result = yield from self.read_xpath(xpath)
            self.log.debug("Test result before complete check - %s", result)

            if result is not None and result.status == "COMPLETED":
                break

            # Not done yet: stop polling once the time budget is used up so
            # the assertions below report the failure instead of hanging.
            total_time += poll_interval
            if total_time > max_time:
                break

        assert result is not None
        assert result.status == "COMPLETED"
        assert len(self.job_handler.tasks) == 0

    @rift.test.dts.async_test
    def test_cancelled(self):
        """
        Integration Test:
            1. Test the updates with downloader.py
            2. Verifies if cancel triggers the job status to move to cancelled
        """
        yield from self.job_handler.register()

        proxy = mock.MagicMock()
        url = "http://boson.eng.riftio.com/common/unittests/Fedora-x86_64-20-20131211.1-sda-ping.qcow2"
        url_downloader = downloader.PackageFileDownloader(url, "1", "/", "VNFD", proxy)

        download_id = yield from self.job_handler.register_downloader(url_downloader)
        assert download_id is not None
        xpath = self._job_xpath(download_id)

        # Let the download get going before checking that it is in progress.
        yield from asyncio.sleep(10, loop=self.loop)

        result = yield from self.read_xpath(xpath)
        self.log.debug("Test result before in_progress check - %s", result)
        assert result is not None
        assert result.status == "IN_PROGRESS"

        yield from self.job_handler.cancel_download(download_id)
        yield from asyncio.sleep(3, loop=self.loop)
        result = yield from self.read_xpath(xpath)
        self.log.debug("Test result before cancel check - %s", result)
        assert result is not None
        assert result.status == "CANCELLED"
        assert len(self.job_handler.tasks) == 0
202
203
def main():
    """Parse the command line and run the tests in this module.

    Options:
        -v / --verbose: run with ``TestCase.log_level`` set to DEBUG instead
            of WARN.
        -n / --no-runner: pass no test runner to ``unittest.main`` instead of
            the XML runner.

    Any arguments not recognised here are forwarded to ``unittest.main``.

    Raises:
        KeyError: if the XML runner is requested (the default) and the
            ``RIFT_MODULE_TEST`` environment variable is not set.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument('-v', '--verbose', action='store_true')
    parser.add_argument('-n', '--no-runner', action='store_true')
    args, unittest_args = parser.parse_known_args()

    # Only build the XML runner (and only require RIFT_MODULE_TEST) when it
    # is actually going to be used; --no-runner must work without it.
    runner = None
    if not args.no_runner:
        runner = xmlrunner.XMLTestRunner(output=os.environ["RIFT_MODULE_TEST"])

    TestCase.log_level = logging.DEBUG if args.verbose else logging.WARN

    unittest.main(testRunner=runner, argv=[sys.argv[0]] + unittest_args)


if __name__ == '__main__':
    main()