X-Git-Url: https://osm.etsi.org/gitweb/?a=blobdiff_plain;f=rwlaunchpad%2Fplugins%2Frwpkgmgr%2Frift%2Ftasklets%2Frwpkgmgr%2Fpublisher%2Fdownload_status.py;h=d8c6ade57adc2ad80f06a3c69e1698fa2c2823c9;hb=844bfbb4487d8493b3449418675c9f360e39f2ef;hp=6890241e930ffa8c0582814d76b8cda388257a5f;hpb=85a3dc954226041e4db357fa807b1ffdb6953110;p=osm%2FSO.git diff --git a/rwlaunchpad/plugins/rwpkgmgr/rift/tasklets/rwpkgmgr/publisher/download_status.py b/rwlaunchpad/plugins/rwpkgmgr/rift/tasklets/rwpkgmgr/publisher/download_status.py index 6890241e..d8c6ade5 100644 --- a/rwlaunchpad/plugins/rwpkgmgr/rift/tasklets/rwpkgmgr/publisher/download_status.py +++ b/rwlaunchpad/plugins/rwpkgmgr/rift/tasklets/rwpkgmgr/publisher/download_status.py @@ -18,13 +18,18 @@ # import asyncio -import uuid +import sys from gi.repository import (RwDts as rwdts) import rift.mano.dts as mano_dts import rift.downloader as url_downloader +import functools +import concurrent + +if sys.version_info < (3, 4, 4): + asyncio.ensure_future = asyncio.async class DownloadStatusPublisher(mano_dts.DtsHandler, url_downloader.DownloaderProtocol): @@ -32,26 +37,53 @@ class DownloadStatusPublisher(mano_dts.DtsHandler, url_downloader.DownloaderProt super().__init__(log, dts, loop) self.tasks = {} + def xpath(self, download_id=None): return ("D,/rw-pkg-mgmt:download-jobs/rw-pkg-mgmt:job" + ("[download-id='{}']".format(download_id) if download_id else "")) + @asyncio.coroutine + def _dts_publisher(self, job): + # Publish the download state + self.reg.update_element( + self.xpath(download_id=job.download_id), job) + @asyncio.coroutine def register(self): self.reg = yield from self.dts.register(xpath=self.xpath(), flags=rwdts.Flag.PUBLISHER|rwdts.Flag.CACHE|rwdts.Flag.NO_PREP_READ) assert self.reg is not None - + + @staticmethod + def _async_func(func, fut): + try: + ret = func() + fut.set_result(ret) + except Exception as e: + fut.set_exception(e) + + def _schedule_dts_work(self, download_job_msg): + # Create a coroutine + cort = self._dts_publisher(download_job_msg) + # Use main asyncio loop (running in main thread) + newfunc = functools.partial(asyncio.ensure_future, cort, loop=self.loop) + fut = concurrent.futures.Future() + # Schedule future in main thread immediately + self.loop.call_soon_threadsafe(DownloadStatusPublisher._async_func, newfunc, fut) + res = fut.result() + exc = fut.exception() + if exc is not None: + self.log.error("Caught future exception during download: %s type %s", str(exc), type(exc)) + raise exc + return res def on_download_progress(self, download_job_msg): """callback that triggers update. """ - key = download_job_msg.download_id # Trigger progess update - self.reg.update_element( - self.xpath(download_id=key), - download_job_msg) + # Schedule a future in the main thread + self._schedule_dts_work(download_job_msg) def on_download_finished(self, download_job_msg): """callback that triggers update. @@ -63,9 +95,8 @@ class DownloadStatusPublisher(mano_dts.DtsHandler, url_downloader.DownloaderProt del self.tasks[key] # Publish the final state - self.reg.update_element( - self.xpath(download_id=key), - download_job_msg) + # Schedule a future in the main thread + self._schedule_dts_work(download_job_msg) @asyncio.coroutine def register_downloader(self, downloader):