#
import asyncio
-import uuid
+import sys
from gi.repository import (RwDts as rwdts)
import rift.mano.dts as mano_dts
import rift.downloader as url_downloader
+import functools
+import concurrent
+
+if sys.version_info < (3, 4, 4):
+ asyncio.ensure_future = asyncio.async
class DownloadStatusPublisher(mano_dts.DtsHandler, url_downloader.DownloaderProtocol):
super().__init__(log, dts, loop)
self.tasks = {}
+
def xpath(self, download_id=None):
return ("D,/rw-pkg-mgmt:download-jobs/rw-pkg-mgmt:job" +
("[download-id='{}']".format(download_id) if download_id else ""))
+ @asyncio.coroutine
+ def _dts_publisher(self, job):
+ # Publish the download state
+ self.reg.update_element(
+ self.xpath(download_id=job.download_id), job)
+
@asyncio.coroutine
def register(self):
self.reg = yield from self.dts.register(xpath=self.xpath(),
flags=rwdts.Flag.PUBLISHER|rwdts.Flag.CACHE|rwdts.Flag.NO_PREP_READ)
assert self.reg is not None
-
+
+ @staticmethod
+ def _async_func(func, fut):
+ try:
+ ret = func()
+ fut.set_result(ret)
+ except Exception as e:
+ fut.set_exception(e)
+
+ def _schedule_dts_work(self, download_job_msg):
+ # Create a coroutine
+ cort = self._dts_publisher(download_job_msg)
+ # Use main asyncio loop (running in main thread)
+ newfunc = functools.partial(asyncio.ensure_future, cort, loop=self.loop)
+ fut = concurrent.futures.Future()
+ # Schedule future in main thread immediately
+ self.loop.call_soon_threadsafe(DownloadStatusPublisher._async_func, newfunc, fut)
+ res = fut.result()
+ exc = fut.exception()
+ if exc is not None:
+ self.log.error("Caught future exception during download: %s type %s", str(exc), type(exc))
+ raise exc
+ return res
def on_download_progress(self, download_job_msg):
"""callback that triggers update.
"""
- key = download_job_msg.download_id
# Trigger progess update
- self.reg.update_element(
- self.xpath(download_id=key),
- download_job_msg)
+ # Schedule a future in the main thread
+ self._schedule_dts_work(download_job_msg)
def on_download_finished(self, download_job_msg):
"""callback that triggers update.
del self.tasks[key]
# Publish the final state
- self.reg.update_element(
- self.xpath(download_id=key),
- download_job_msg)
+ # Schedule a future in the main thread
+ self._schedule_dts_work(download_job_msg)
@asyncio.coroutine
def register_downloader(self, downloader):
"package_id": "123",
"download_id": str(uuid.uuid4())})
- self.job_handler.on_download_progress(mock_msg)
+ yield from self.job_handler._dts_publisher(mock_msg)
yield from asyncio.sleep(5, loop=self.loop)
itr = yield from self.dts.query_read("/download-jobs/job[download-id='{}']".format(
result = yield from fut
result = result.result
- print (mock_msg)
+ print ("Mock ", mock_msg)
assert result == mock_msg
# Modify the msg
mock_msg.url = "http://bar/foo"
- self.job_handler.on_download_finished(mock_msg)
+ yield from self.job_handler._dts_publisher(mock_msg)
yield from asyncio.sleep(5, loop=self.loop)
itr = yield from self.dts.query_read("/download-jobs/job[download-id='{}']".format(
proxy = mock.MagicMock()
- url = "http://boson.eng.riftio.com/common/unittests/rift-shell"
+ url = "http://boson.eng.riftio.com/common/unittests/plantuml.jar"
url_downloader = downloader.PackageFileDownloader(url, "1", "/", "VNFD", proxy)
download_id = yield from self.job_handler.register_downloader(url_downloader)
assert download_id is not None
-
+
+ # Waiting for 5 secs to be sure that the file is downloaded
yield from asyncio.sleep(5, loop=self.loop)
xpath = "/download-jobs/job[download-id='{}']".format(
download_id)
result = yield from self.read_xpath(xpath)
- print (result)
+ self.log.debug("Test result before complete check - %s", result)
assert result.status == "COMPLETED"
assert len(self.job_handler.tasks) == 0
xpath = "/download-jobs/job[download-id='{}']".format(
download_id)
- yield from asyncio.sleep(3, loop=self.loop)
+ yield from asyncio.sleep(1, loop=self.loop)
result = yield from self.read_xpath(xpath)
+ self.log.debug("Test result before in_progress check - %s", result)
assert result.status == "IN_PROGRESS"
yield from self.job_handler.cancel_download(download_id)
yield from asyncio.sleep(3, loop=self.loop)
result = yield from self.read_xpath(xpath)
+ self.log.debug("Test result before cancel check - %s", result)
assert result.status == "CANCELLED"
assert len(self.job_handler.tasks) == 0