X-Git-Url: https://osm.etsi.org/gitweb/?a=blobdiff_plain;ds=sidebyside;f=rwlaunchpad%2Fplugins%2Frwpkgmgr%2Frift%2Ftasklets%2Frwpkgmgr%2Fpublisher%2Fdownload_status.py;h=05062c18ce0d15ba3739e431cc9ac9506dc6019c;hb=f314b4af9744068a7ed7a6a6314220c3aa857523;hp=6890241e930ffa8c0582814d76b8cda388257a5f;hpb=07b439824b5eac4dc760ce56b52fbdcf5539db4c;p=osm%2FSO.git diff --git a/rwlaunchpad/plugins/rwpkgmgr/rift/tasklets/rwpkgmgr/publisher/download_status.py b/rwlaunchpad/plugins/rwpkgmgr/rift/tasklets/rwpkgmgr/publisher/download_status.py index 6890241e..05062c18 100644 --- a/rwlaunchpad/plugins/rwpkgmgr/rift/tasklets/rwpkgmgr/publisher/download_status.py +++ b/rwlaunchpad/plugins/rwpkgmgr/rift/tasklets/rwpkgmgr/publisher/download_status.py @@ -18,23 +18,39 @@ # import asyncio -import uuid +import gi +import sys from gi.repository import (RwDts as rwdts) import rift.mano.dts as mano_dts import rift.downloader as url_downloader +gi.require_version('RwKeyspec', '1.0') +from gi.repository.RwKeyspec import quoted_key +import functools +import concurrent + +if sys.version_info < (3, 4, 4): + asyncio.ensure_future = asyncio.async class DownloadStatusPublisher(mano_dts.DtsHandler, url_downloader.DownloaderProtocol): - def __init__(self, log, dts, loop): - super().__init__(log, dts, loop) + def __init__(self, log, dts, loop, project): + super().__init__(log, dts, loop, project) self.tasks = {} + def xpath(self, download_id=None): - return ("D,/rw-pkg-mgmt:download-jobs/rw-pkg-mgmt:job" + - ("[download-id='{}']".format(download_id) if download_id else "")) + return self._project.add_project("D,/rw-pkg-mgmt:download-jobs/rw-pkg-mgmt:job" + + ("[download-id={}]". + format(quoted_key(download_id)) if download_id else "")) + + @asyncio.coroutine + def _dts_publisher(self, job): + # Publish the download state + self.reg.update_element( + self.xpath(download_id=job.download_id), job) @asyncio.coroutine def register(self): @@ -43,15 +59,42 @@ class DownloadStatusPublisher(mano_dts.DtsHandler, url_downloader.DownloaderProt assert self.reg is not None + def dergister(self): + self._log.debug("De-registering download status for project {}". + format(self.project.name)) + if self.reg: + self.reg.deregister() + self.reg = None + + @staticmethod + def _async_func(func, fut): + try: + ret = func() + fut.set_result(ret) + except Exception as e: + fut.set_exception(e) + + def _schedule_dts_work(self, download_job_msg): + # Create a coroutine + cort = self._dts_publisher(download_job_msg) + # Use main asyncio loop (running in main thread) + newfunc = functools.partial(asyncio.ensure_future, cort, loop=self.loop) + fut = concurrent.futures.Future() + # Schedule future in main thread immediately + self.loop.call_soon_threadsafe(DownloadStatusPublisher._async_func, newfunc, fut) + res = fut.result() + exc = fut.exception() + if exc is not None: + self.log.error("Caught future exception during download: %s type %s", str(exc), type(exc)) + raise exc + return res def on_download_progress(self, download_job_msg): """callback that triggers update. """ - key = download_job_msg.download_id # Trigger progess update - self.reg.update_element( - self.xpath(download_id=key), - download_job_msg) + # Schedule a future in the main thread + self._schedule_dts_work(download_job_msg) def on_download_finished(self, download_job_msg): """callback that triggers update. @@ -63,9 +106,8 @@ class DownloadStatusPublisher(mano_dts.DtsHandler, url_downloader.DownloaderProt del self.tasks[key] # Publish the final state - self.reg.update_element( - self.xpath(download_id=key), - download_job_msg) + # Schedule a future in the main thread + self._schedule_dts_work(download_job_msg) @asyncio.coroutine def register_downloader(self, downloader):