From: Chamarty Date: Thu, 23 Mar 2017 15:29:34 +0000 (-0400) Subject: Fix DTS updates in package manager publisher X-Git-Url: https://osm.etsi.org/gitweb/?a=commitdiff_plain;h=7032313e5ce57f8b0d76af95bcf51c985d4b8692;hp=5f2bf0fbc7e2d7560bd81bb7c671bc7cfcda5745;p=osm%2FSO.git Fix DTS updates in package manager publisher Change-Id: I285bfd3e05d1856f58e2974afead00b47ccac3b2 Signed-off-by: Chamarty --- diff --git a/rwlaunchpad/plugins/rwpkgmgr/rift/tasklets/rwpkgmgr/publisher/download_status.py b/rwlaunchpad/plugins/rwpkgmgr/rift/tasklets/rwpkgmgr/publisher/download_status.py index 6890241e..d8c6ade5 100644 --- a/rwlaunchpad/plugins/rwpkgmgr/rift/tasklets/rwpkgmgr/publisher/download_status.py +++ b/rwlaunchpad/plugins/rwpkgmgr/rift/tasklets/rwpkgmgr/publisher/download_status.py @@ -18,13 +18,18 @@ # import asyncio -import uuid +import sys from gi.repository import (RwDts as rwdts) import rift.mano.dts as mano_dts import rift.downloader as url_downloader +import functools +import concurrent + +if sys.version_info < (3, 4, 4): + asyncio.ensure_future = asyncio.async class DownloadStatusPublisher(mano_dts.DtsHandler, url_downloader.DownloaderProtocol): @@ -32,26 +37,53 @@ class DownloadStatusPublisher(mano_dts.DtsHandler, url_downloader.DownloaderProt super().__init__(log, dts, loop) self.tasks = {} + def xpath(self, download_id=None): return ("D,/rw-pkg-mgmt:download-jobs/rw-pkg-mgmt:job" + ("[download-id='{}']".format(download_id) if download_id else "")) + @asyncio.coroutine + def _dts_publisher(self, job): + # Publish the download state + self.reg.update_element( + self.xpath(download_id=job.download_id), job) + @asyncio.coroutine def register(self): self.reg = yield from self.dts.register(xpath=self.xpath(), flags=rwdts.Flag.PUBLISHER|rwdts.Flag.CACHE|rwdts.Flag.NO_PREP_READ) assert self.reg is not None - + + @staticmethod + def _async_func(func, fut): + try: + ret = func() + fut.set_result(ret) + except Exception as e: + fut.set_exception(e) + + def _schedule_dts_work(self, download_job_msg): + # Create a coroutine + cort = self._dts_publisher(download_job_msg) + # Use main asyncio loop (running in main thread) + newfunc = functools.partial(asyncio.ensure_future, cort, loop=self.loop) + fut = concurrent.futures.Future() + # Schedule future in main thread immediately + self.loop.call_soon_threadsafe(DownloadStatusPublisher._async_func, newfunc, fut) + res = fut.result() + exc = fut.exception() + if exc is not None: + self.log.error("Caught future exception during download: %s type %s", str(exc), type(exc)) + raise exc + return res def on_download_progress(self, download_job_msg): """callback that triggers update. """ - key = download_job_msg.download_id # Trigger progess update - self.reg.update_element( - self.xpath(download_id=key), - download_job_msg) + # Schedule a future in the main thread + self._schedule_dts_work(download_job_msg) def on_download_finished(self, download_job_msg): """callback that triggers update. @@ -63,9 +95,8 @@ class DownloadStatusPublisher(mano_dts.DtsHandler, url_downloader.DownloaderProt del self.tasks[key] # Publish the final state - self.reg.update_element( - self.xpath(download_id=key), - download_job_msg) + # Schedule a future in the main thread + self._schedule_dts_work(download_job_msg) @asyncio.coroutine def register_downloader(self, downloader): diff --git a/rwlaunchpad/plugins/rwpkgmgr/test/utest_publisher_dts.py b/rwlaunchpad/plugins/rwpkgmgr/test/utest_publisher_dts.py index 9ec29561..a02e5c66 100755 --- a/rwlaunchpad/plugins/rwpkgmgr/test/utest_publisher_dts.py +++ b/rwlaunchpad/plugins/rwpkgmgr/test/utest_publisher_dts.py @@ -99,7 +99,7 @@ class TestCase(rift.test.dts.AbstractDTSTest): "package_id": "123", "download_id": str(uuid.uuid4())}) - self.job_handler.on_download_progress(mock_msg) + yield from self.job_handler._dts_publisher(mock_msg) yield from asyncio.sleep(5, loop=self.loop) itr = yield from self.dts.query_read("/download-jobs/job[download-id='{}']".format( @@ -110,12 +110,12 @@ class TestCase(rift.test.dts.AbstractDTSTest): result = yield from fut result = result.result - print (mock_msg) + print ("Mock ", mock_msg) assert result == mock_msg # Modify the msg mock_msg.url = "http://bar/foo" - self.job_handler.on_download_finished(mock_msg) + yield from self.job_handler._dts_publisher(mock_msg) yield from asyncio.sleep(5, loop=self.loop) itr = yield from self.dts.query_read("/download-jobs/job[download-id='{}']".format( @@ -138,17 +138,18 @@ class TestCase(rift.test.dts.AbstractDTSTest): proxy = mock.MagicMock() - url = "http://boson.eng.riftio.com/common/unittests/rift-shell" + url = "http://boson.eng.riftio.com/common/unittests/plantuml.jar" url_downloader = downloader.PackageFileDownloader(url, "1", "/", "VNFD", proxy) download_id = yield from self.job_handler.register_downloader(url_downloader) assert download_id is not None - + + # Waiting for 5 secs to be sure that the file is downloaded yield from asyncio.sleep(5, loop=self.loop) xpath = "/download-jobs/job[download-id='{}']".format( download_id) result = yield from self.read_xpath(xpath) - print (result) + self.log.debug("Test result before complete check - %s", result) assert result.status == "COMPLETED" assert len(self.job_handler.tasks) == 0 @@ -171,14 +172,16 @@ class TestCase(rift.test.dts.AbstractDTSTest): xpath = "/download-jobs/job[download-id='{}']".format( download_id) - yield from asyncio.sleep(3, loop=self.loop) + yield from asyncio.sleep(1, loop=self.loop) result = yield from self.read_xpath(xpath) + self.log.debug("Test result before in_progress check - %s", result) assert result.status == "IN_PROGRESS" yield from self.job_handler.cancel_download(download_id) yield from asyncio.sleep(3, loop=self.loop) result = yield from self.read_xpath(xpath) + self.log.debug("Test result before cancel check - %s", result) assert result.status == "CANCELLED" assert len(self.job_handler.tasks) == 0