update from RIFT as of 696b75d2fe9fb046261b08c616f1bcf6c0b54a9b second try
[osm/SO.git] / rwlaunchpad / plugins / rwpkgmgr / rift / tasklets / rwpkgmgr / publisher / download_status.py
index 6890241..05062c1 100644 (file)
 # 
 
 import asyncio
-import uuid
+import gi
+import sys
 
 from gi.repository import (RwDts as rwdts)
 import rift.mano.dts as mano_dts
 
 import rift.downloader as url_downloader
+gi.require_version('RwKeyspec', '1.0')
+from gi.repository.RwKeyspec import quoted_key
 
+import functools
+import concurrent
+
+if sys.version_info < (3, 4, 4):
+    asyncio.ensure_future = asyncio.async
 
 class DownloadStatusPublisher(mano_dts.DtsHandler, url_downloader.DownloaderProtocol):
 
-    def __init__(self, log, dts, loop):
-        super().__init__(log, dts, loop)
+    def __init__(self, log, dts, loop, project):
+        super().__init__(log, dts, loop, project)
         self.tasks = {}
 
+
     def xpath(self, download_id=None):
-        return ("D,/rw-pkg-mgmt:download-jobs/rw-pkg-mgmt:job" +
-            ("[download-id='{}']".format(download_id) if download_id else ""))
+        return self._project.add_project("D,/rw-pkg-mgmt:download-jobs/rw-pkg-mgmt:job" +
+                                         ("[download-id={}]".
+                                          format(quoted_key(download_id)) if download_id else ""))
+
+    @asyncio.coroutine
+    def _dts_publisher(self, job):
+         # Publish the download state
+         self.reg.update_element(
+                        self.xpath(download_id=job.download_id), job)
 
     @asyncio.coroutine
     def register(self):
@@ -43,15 +59,42 @@ class DownloadStatusPublisher(mano_dts.DtsHandler, url_downloader.DownloaderProt
 
         assert self.reg is not None
 
+    def dergister(self):
+        self._log.debug("De-registering download status for project {}".
+                        format(self.project.name))
+        if self.reg:
+            self.reg.deregister()
+            self.reg = None
+   
+    @staticmethod 
+    def _async_func(func, fut):
+        try:
+            ret = func()
+            fut.set_result(ret)
+        except Exception as e:
+            fut.set_exception(e)
+
+    def _schedule_dts_work(self, download_job_msg):
+        # Create a coroutine
+        cort = self._dts_publisher(download_job_msg)
+        # Use main asyncio loop (running in main thread)
+        newfunc = functools.partial(asyncio.ensure_future, cort, loop=self.loop)
+        fut = concurrent.futures.Future()
+        # Schedule future in main thread immediately
+        self.loop.call_soon_threadsafe(DownloadStatusPublisher._async_func, newfunc, fut)
+        res = fut.result()
+        exc = fut.exception()
+        if exc is not None:
+            self.log.error("Caught future exception during download: %s type %s", str(exc), type(exc))
+            raise exc
+        return res
 
     def on_download_progress(self, download_job_msg):
         """callback that triggers update.
         """
-        key = download_job_msg.download_id
         # Trigger progess update
-        self.reg.update_element(
-                self.xpath(download_id=key),
-                download_job_msg)
+        # Schedule a future in the main thread
+        self._schedule_dts_work(download_job_msg)
 
     def on_download_finished(self, download_job_msg):
         """callback that triggers update.
@@ -63,9 +106,8 @@ class DownloadStatusPublisher(mano_dts.DtsHandler, url_downloader.DownloaderProt
             del self.tasks[key]
 
         # Publish the final state
-        self.reg.update_element(
-                self.xpath(download_id=key),
-                download_job_msg)
+        # Schedule a future in the main thread
+        self._schedule_dts_work(download_job_msg)
 
     @asyncio.coroutine
     def register_downloader(self, downloader):