--- /dev/null
+#!/usr/bin/env python3
+
+#
+# Copyright 2016 RIFT.IO Inc
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+import argparse
+import asyncio
+import logging
+import mock
+import os
+import sys
+import unittest
+import uuid
+import xmlrunner
+
+import gi
+gi.require_version('RwDts', '1.0')
+gi.require_version('RwPkgMgmtYang', '1.0')
+from gi.repository import (
+ RwDts as rwdts,
+ RwPkgMgmtYang
+ )
+import rift.tasklets.rwpkgmgr.downloader as downloader
+import rift.tasklets.rwpkgmgr.publisher as pkg_publisher
+import rift.test.dts
+
+
+class TestCase(rift.test.dts.AbstractDTSTest):
+ @classmethod
+ def configure_schema(cls):
+ return RwPkgMgmtYang.get_schema()
+
+ @classmethod
+ def configure_timeout(cls):
+ return 240
+
+ def configure_test(self, loop, test_id):
+ self.log.debug("STARTING - %s", test_id)
+ self.tinfo = self.new_tinfo(str(test_id))
+ self.dts = rift.tasklets.DTS(self.tinfo, self.schema, self.loop)
+
+ self.job_handler = pkg_publisher.DownloadStatusPublisher(self.log, self.dts, self.loop)
+
+ def tearDown(self):
+ super().tearDown()
+
+ @asyncio.coroutine
+ def get_published_xpaths(self):
+ published_xpaths = set()
+
+ res_iter = yield from self.dts.query_read("D,/rwdts:dts")
+ for i in res_iter:
+ res = (yield from i).result
+ for member in res.member:
+ published_xpaths |= {reg.keyspec for reg in member.state.registration if reg.flags == "publisher"}
+
+ return published_xpaths
+
+ @asyncio.coroutine
+ def read_xpath(self, xpath):
+ itr = yield from self.dts.query_read(xpath)
+
+ result = None
+ for fut in itr:
+ result = yield from fut
+ return result.result
+
+ @rift.test.dts.async_test
+ def test_download_publisher(self):
+ yield from self.job_handler.register()
+ published_xpaths = yield from self.get_published_xpaths()
+ assert self.job_handler.xpath() in published_xpaths
+
+ @rift.test.dts.async_test
+ def test_publish(self):
+ """
+ Asserts:
+ 1. Verify if an update on_download_progess & on_download_finished
+ triggers a DTS update
+ 2. Verify if the internal store is updated
+ """
+ yield from self.job_handler.register()
+
+ mock_msg = RwPkgMgmtYang.DownloadJob.from_dict({
+ "url": "http://foo/bar",
+ "package_id": "123",
+ "download_id": str(uuid.uuid4())})
+
+ self.job_handler.on_download_progress(mock_msg)
+ yield from asyncio.sleep(5, loop=self.loop)
+
+ itr = yield from self.dts.query_read("/download-jobs/job[download-id='{}']".format(
+ mock_msg.download_id))
+
+ result = None
+ for fut in itr:
+ result = yield from fut
+ result = result.result
+
+ print (mock_msg)
+ assert result == mock_msg
+
+ # Modify the msg
+ mock_msg.url = "http://bar/foo"
+ self.job_handler.on_download_finished(mock_msg)
+ yield from asyncio.sleep(5, loop=self.loop)
+
+ itr = yield from self.dts.query_read("/download-jobs/job[download-id='{}']".format(
+ mock_msg.download_id))
+
+ result = None
+ for fut in itr:
+ result = yield from fut
+ result = result.result
+ assert result == mock_msg
+
+
+ @rift.test.dts.async_test
+ def test_url_download(self):
+ """
+ Integration Test:
+ Test the updates with download/url.py
+ """
+ yield from self.job_handler.register()
+
+ proxy = mock.MagicMock()
+
+ url = "https://raw.githubusercontent.com/RIFTIO/RIFT.ware/master/rift-shell"
+ url_downloader = downloader.PackageFileDownloader(url, "1", "/", "VNFD", proxy)
+
+ download_id = yield from self.job_handler.register_downloader(url_downloader)
+ assert download_id is not None
+
+ yield from asyncio.sleep(5, loop=self.loop)
+ xpath = "/download-jobs/job[download-id='{}']".format(
+ download_id)
+ result = yield from self.read_xpath(xpath)
+ print (result)
+ assert result.status == "COMPLETED"
+ assert len(self.job_handler.tasks) == 0
+
+
+ @rift.test.dts.async_test
+ def test_cancelled(self):
+ """
+ Integration Test:
+ 1. Test the updates with downloader.py
+ 2. Verifies if cancel triggers the job status to move to cancelled
+ """
+ yield from self.job_handler.register()
+
+ proxy = mock.MagicMock()
+ url = "http://mirror.0x.sg/fedora/linux/releases/24/CloudImages/x86_64/images/Fedora-Cloud-Base-24-1.2.x86_64.qcow2"
+ url_downloader = downloader.PackageFileDownloader(url, "1", "/", "VNFD", proxy)
+
+ download_id = yield from self.job_handler.register_downloader(url_downloader)
+ assert download_id is not None
+ xpath = "/download-jobs/job[download-id='{}']".format(
+ download_id)
+
+ yield from asyncio.sleep(3, loop=self.loop)
+
+ result = yield from self.read_xpath(xpath)
+ assert result.status == "IN_PROGRESS"
+
+ yield from self.job_handler.cancel_download(download_id)
+ yield from asyncio.sleep(3, loop=self.loop)
+ result = yield from self.read_xpath(xpath)
+ assert result.status == "CANCELLED"
+ assert len(self.job_handler.tasks) == 0
+
+
+def main():
+ runner = xmlrunner.XMLTestRunner(output=os.environ["RIFT_MODULE_TEST"])
+
+ parser = argparse.ArgumentParser()
+ parser.add_argument('-v', '--verbose', action='store_true')
+ parser.add_argument('-n', '--no-runner', action='store_true')
+ args, unittest_args = parser.parse_known_args()
+ if args.no_runner:
+ runner = None
+
+ TestCase.log_level = logging.DEBUG if args.verbose else logging.WARN
+
+ unittest.main(testRunner=runner, argv=[sys.argv[0]] + unittest_args)
+
+if __name__ == '__main__':
+ main()