Fix DTS updates in package manager publisher 55/1355/1
authorChamarty <ravi.chamarty@riftio.com>
Thu, 23 Mar 2017 15:29:34 +0000 (11:29 -0400)
committerChamarty <ravi.chamarty@riftio.com>
Thu, 23 Mar 2017 15:29:40 +0000 (11:29 -0400)
Change-Id: I285bfd3e05d1856f58e2974afead00b47ccac3b2
Signed-off-by: Chamarty <ravi.chamarty@riftio.com>
rwlaunchpad/plugins/rwpkgmgr/rift/tasklets/rwpkgmgr/publisher/download_status.py
rwlaunchpad/plugins/rwpkgmgr/test/utest_publisher_dts.py

index 6890241..d8c6ade 100644 (file)
 # 
 
 import asyncio
-import uuid
+import sys
 
 from gi.repository import (RwDts as rwdts)
 import rift.mano.dts as mano_dts
 
 import rift.downloader as url_downloader
 
+import functools
+import concurrent
+
+if sys.version_info < (3, 4, 4):
+    asyncio.ensure_future = asyncio.async
 
 class DownloadStatusPublisher(mano_dts.DtsHandler, url_downloader.DownloaderProtocol):
 
@@ -32,26 +37,53 @@ class DownloadStatusPublisher(mano_dts.DtsHandler, url_downloader.DownloaderProt
         super().__init__(log, dts, loop)
         self.tasks = {}
 
+
     def xpath(self, download_id=None):
         return ("D,/rw-pkg-mgmt:download-jobs/rw-pkg-mgmt:job" +
             ("[download-id='{}']".format(download_id) if download_id else ""))
 
+    @asyncio.coroutine
+    def _dts_publisher(self, job):
+         # Publish the download state
+         self.reg.update_element(
+                        self.xpath(download_id=job.download_id), job)
+
     @asyncio.coroutine
     def register(self):
         self.reg = yield from self.dts.register(xpath=self.xpath(),
                   flags=rwdts.Flag.PUBLISHER|rwdts.Flag.CACHE|rwdts.Flag.NO_PREP_READ)
 
         assert self.reg is not None
-
+   
+    @staticmethod 
+    def _async_func(func, fut):
+        try:
+            ret = func()
+            fut.set_result(ret)
+        except Exception as e:
+            fut.set_exception(e)
+
+    def _schedule_dts_work(self, download_job_msg):
+        # Create a coroutine
+        cort = self._dts_publisher(download_job_msg)
+        # Use main asyncio loop (running in main thread)
+        newfunc = functools.partial(asyncio.ensure_future, cort, loop=self.loop)
+        fut = concurrent.futures.Future()
+        # Schedule future in main thread immediately
+        self.loop.call_soon_threadsafe(DownloadStatusPublisher._async_func, newfunc, fut)
+        res = fut.result()
+        exc = fut.exception()
+        if exc is not None:
+            self.log.error("Caught future exception during download: %s type %s", str(exc), type(exc))
+            raise exc
+        return res
 
     def on_download_progress(self, download_job_msg):
         """callback that triggers update.
         """
-        key = download_job_msg.download_id
         # Trigger progess update
-        self.reg.update_element(
-                self.xpath(download_id=key),
-                download_job_msg)
+        # Schedule a future in the main thread
+        self._schedule_dts_work(download_job_msg)
 
     def on_download_finished(self, download_job_msg):
         """callback that triggers update.
@@ -63,9 +95,8 @@ class DownloadStatusPublisher(mano_dts.DtsHandler, url_downloader.DownloaderProt
             del self.tasks[key]
 
         # Publish the final state
-        self.reg.update_element(
-                self.xpath(download_id=key),
-                download_job_msg)
+        # Schedule a future in the main thread
+        self._schedule_dts_work(download_job_msg)
 
     @asyncio.coroutine
     def register_downloader(self, downloader):
index 9ec2956..a02e5c6 100755 (executable)
@@ -99,7 +99,7 @@ class TestCase(rift.test.dts.AbstractDTSTest):
                 "package_id": "123",
                 "download_id": str(uuid.uuid4())})
 
-        self.job_handler.on_download_progress(mock_msg)
+        yield from self.job_handler._dts_publisher(mock_msg)
         yield from asyncio.sleep(5, loop=self.loop)
 
         itr = yield from self.dts.query_read("/download-jobs/job[download-id='{}']".format(
@@ -110,12 +110,12 @@ class TestCase(rift.test.dts.AbstractDTSTest):
             result = yield from fut
             result = result.result
 
-        print (mock_msg)
+        print ("Mock ", mock_msg)
         assert result == mock_msg
 
         # Modify the msg
         mock_msg.url = "http://bar/foo"
-        self.job_handler.on_download_finished(mock_msg)
+        yield from self.job_handler._dts_publisher(mock_msg)
         yield from asyncio.sleep(5, loop=self.loop)
         
         itr = yield from self.dts.query_read("/download-jobs/job[download-id='{}']".format(
@@ -138,17 +138,18 @@ class TestCase(rift.test.dts.AbstractDTSTest):
 
         proxy = mock.MagicMock()
 
-        url = "http://boson.eng.riftio.com/common/unittests/rift-shell"
+        url = "http://boson.eng.riftio.com/common/unittests/plantuml.jar"
         url_downloader = downloader.PackageFileDownloader(url, "1", "/", "VNFD", proxy)
 
         download_id = yield from self.job_handler.register_downloader(url_downloader)
         assert download_id is not None
-        
+       
+        # Waiting for 5 secs to be sure that the file is downloaded
         yield from asyncio.sleep(5, loop=self.loop)
         xpath = "/download-jobs/job[download-id='{}']".format(
             download_id)
         result = yield from self.read_xpath(xpath)
-        print (result)
+        self.log.debug("Test result before complete check - %s", result)
         assert result.status == "COMPLETED"
         assert len(self.job_handler.tasks) == 0
 
@@ -171,14 +172,16 @@ class TestCase(rift.test.dts.AbstractDTSTest):
         xpath = "/download-jobs/job[download-id='{}']".format(
             download_id)
 
-        yield from asyncio.sleep(3, loop=self.loop)
+        yield from asyncio.sleep(1, loop=self.loop)
 
         result = yield from self.read_xpath(xpath)
+        self.log.debug("Test result before in_progress check - %s", result)
         assert result.status == "IN_PROGRESS"
 
         yield from self.job_handler.cancel_download(download_id)
         yield from asyncio.sleep(3, loop=self.loop)
         result = yield from self.read_xpath(xpath)
+        self.log.debug("Test result before cancel check - %s", result)
         assert result.status == "CANCELLED"
         assert len(self.job_handler.tasks) == 0