+++ /dev/null
-#!/usr/bin/env python3
-
-#
-# Copyright 2016 RIFT.IO Inc
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-import argparse
-import asyncio
-import logging
-import mock
-import os
-import sys
-import unittest
-import uuid
-import xmlrunner
-
-import gi
-gi.require_version('RwDts', '1.0')
-gi.require_version('RwPkgMgmtYang', '1.0')
-from gi.repository import (
- RwDts as rwdts,
- RwPkgMgmtYang
- )
-import rift.tasklets.rwpkgmgr.downloader as downloader
-import rift.tasklets.rwpkgmgr.publisher as pkg_publisher
-import rift.test.dts
-
-
-class TestCase(rift.test.dts.AbstractDTSTest):
- @classmethod
- def configure_schema(cls):
- return RwPkgMgmtYang.get_schema()
-
- @classmethod
- def configure_timeout(cls):
- return 240
-
- def configure_test(self, loop, test_id):
- self.log.debug("STARTING - %s", test_id)
- self.tinfo = self.new_tinfo(str(test_id))
- self.dts = rift.tasklets.DTS(self.tinfo, self.schema, self.loop)
-
- self.job_handler = pkg_publisher.DownloadStatusPublisher(self.log, self.dts, self.loop)
-
- def tearDown(self):
- super().tearDown()
-
- @asyncio.coroutine
- def get_published_xpaths(self):
- published_xpaths = set()
-
- res_iter = yield from self.dts.query_read("D,/rwdts:dts")
- for i in res_iter:
- res = (yield from i).result
- for member in res.member:
- published_xpaths |= {reg.keyspec for reg in member.state.registration if reg.flags == "publisher"}
-
- return published_xpaths
-
- @asyncio.coroutine
- def read_xpath(self, xpath):
- itr = yield from self.dts.query_read(xpath)
-
- result = None
- for fut in itr:
- result = yield from fut
- return result.result
-
- @rift.test.dts.async_test
- def test_download_publisher(self):
- yield from self.job_handler.register()
- published_xpaths = yield from self.get_published_xpaths()
- assert self.job_handler.xpath() in published_xpaths
-
- @rift.test.dts.async_test
- def test_publish(self):
- """
- Asserts:
- 1. Verify if an update on_download_progess & on_download_finished
- triggers a DTS update
- 2. Verify if the internal store is updated
- """
- yield from self.job_handler.register()
-
- mock_msg = RwPkgMgmtYang.DownloadJob.from_dict({
- "url": "http://foo/bar",
- "package_id": "123",
- "download_id": str(uuid.uuid4())})
-
- yield from self.job_handler._dts_publisher(mock_msg)
- yield from asyncio.sleep(5, loop=self.loop)
-
- itr = yield from self.dts.query_read("/download-jobs/job[download-id='{}']".format(
- mock_msg.download_id))
-
- result = None
- for fut in itr:
- result = yield from fut
- result = result.result
-
- print ("Mock ", mock_msg)
- assert result == mock_msg
-
- # Modify the msg
- mock_msg.url = "http://bar/foo"
- yield from self.job_handler._dts_publisher(mock_msg)
- yield from asyncio.sleep(5, loop=self.loop)
-
- itr = yield from self.dts.query_read("/download-jobs/job[download-id='{}']".format(
- mock_msg.download_id))
-
- result = None
- for fut in itr:
- result = yield from fut
- result = result.result
- assert result == mock_msg
-
-
- @rift.test.dts.async_test
- def test_url_download(self):
- """
- Integration Test:
- Test the updates with download/url.py
- """
- yield from self.job_handler.register()
-
- proxy = mock.MagicMock()
-
- url = "http://boson.eng.riftio.com/common/unittests/plantuml.jar"
- url_downloader = downloader.PackageFileDownloader(url, "1", "/", "VNFD", "SCRIPTS", "VNF_CONFIG", proxy)
-
- download_id = yield from self.job_handler.register_downloader(url_downloader)
- assert download_id is not None
-
- # Waiting for 5 secs to be sure that the file is downloaded
- yield from asyncio.sleep(10, loop=self.loop)
- xpath = "/download-jobs/job[download-id='{}']".format(
- download_id)
- result = yield from self.read_xpath(xpath)
- self.log.debug("Test result before complete check - %s", result)
- assert result.status == "COMPLETED"
- assert len(self.job_handler.tasks) == 0
-
- @rift.test.dts.async_test
- def test_url_download_unreachable_ip(self):
- """
- Integration Test:
- Ensure that a bad IP does not block forever
- """
- yield from self.job_handler.register()
-
- proxy = mock.MagicMock()
-
- # Here, we are assuming that there is no HTTP server at 10.1.2.3
- url = "http://10.1.2.3/common/unittests/plantuml.jar"
- url_downloader = downloader.PackageFileDownloader(url, "1", "/", "VNFD", "SCRIPTS", "VNF_CONFIG", proxy)
-
- download_id = yield from self.job_handler.register_downloader(url_downloader)
- assert download_id is not None
-
- # Waiting for 10 secs to be sure all reconnect attempts have been exhausted
- yield from asyncio.sleep(10, loop=self.loop)
- xpath = "/download-jobs/job[download-id='{}']".format(
- download_id)
- result = yield from self.read_xpath(xpath)
- self.log.debug("Test result before complete check - %s", result)
- assert result.status == "FAILED"
- assert len(self.job_handler.tasks) == 0
-
-
- @rift.test.dts.async_test
- def test_cancelled(self):
- """
- Integration Test:
- 1. Test the updates with downloader.py
- 2. Verifies if cancel triggers the job status to move to cancelled
- """
- yield from self.job_handler.register()
-
- proxy = mock.MagicMock()
- url = "http://boson.eng.riftio.com/common/unittests/Fedora-x86_64-20-20131211.1-sda-ping.qcow2"
- url_downloader = downloader.PackageFileDownloader(url, "1", "/", "VNFD", "SCRIPTS", "VNF_CONFIG", proxy)
-
- download_id = yield from self.job_handler.register_downloader(url_downloader)
- assert download_id is not None
- xpath = "/download-jobs/job[download-id='{}']".format(
- download_id)
-
- yield from asyncio.sleep(1, loop=self.loop)
-
- result = yield from self.read_xpath(xpath)
- self.log.debug("Test result before in_progress check - %s", result)
- assert result.status == "IN_PROGRESS"
-
- yield from self.job_handler.cancel_download(download_id)
- yield from asyncio.sleep(3, loop=self.loop)
- result = yield from self.read_xpath(xpath)
- self.log.debug("Test result before cancel check - %s", result)
- assert result.status == "CANCELLED"
- assert len(self.job_handler.tasks) == 0
-
-
-def main():
- runner = xmlrunner.XMLTestRunner(output=os.environ["RIFT_MODULE_TEST"])
-
- parser = argparse.ArgumentParser()
- parser.add_argument('-v', '--verbose', action='store_true')
- parser.add_argument('-n', '--no-runner', action='store_true')
- args, unittest_args = parser.parse_known_args()
- if args.no_runner:
- runner = None
-
- TestCase.log_level = logging.DEBUG if args.verbose else logging.WARN
-
- unittest.main(testRunner=runner, argv=[sys.argv[0]] + unittest_args)
-
-if __name__ == '__main__':
- main()