Fix DTS updates in package manager publisher
[osm/SO.git] / rwlaunchpad / plugins / rwpkgmgr / rift / tasklets / rwpkgmgr / publisher / download_status.py
1 #
2 # Copyright 2016 RIFT.IO Inc
3 #
4 # Licensed under the Apache License, Version 2.0 (the "License");
5 # you may not use this file except in compliance with the License.
6 # You may obtain a copy of the License at
7 #
8 # http://www.apache.org/licenses/LICENSE-2.0
9 #
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS,
12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 # See the License for the specific language governing permissions and
14 # limitations under the License.
15 #
16 # Author(s): Varun Prasad
17 # Creation Date: 09/25/2016
18 #
19
20 import asyncio
21 import sys
22
23 from gi.repository import (RwDts as rwdts)
24 import rift.mano.dts as mano_dts
25
26 import rift.downloader as url_downloader
27
28 import functools
29 import concurrent
30
31 if sys.version_info < (3, 4, 4):
32 asyncio.ensure_future = asyncio.async
33
34 class DownloadStatusPublisher(mano_dts.DtsHandler, url_downloader.DownloaderProtocol):
35
36 def __init__(self, log, dts, loop):
37 super().__init__(log, dts, loop)
38 self.tasks = {}
39
40
41 def xpath(self, download_id=None):
42 return ("D,/rw-pkg-mgmt:download-jobs/rw-pkg-mgmt:job" +
43 ("[download-id='{}']".format(download_id) if download_id else ""))
44
45 @asyncio.coroutine
46 def _dts_publisher(self, job):
47 # Publish the download state
48 self.reg.update_element(
49 self.xpath(download_id=job.download_id), job)
50
51 @asyncio.coroutine
52 def register(self):
53 self.reg = yield from self.dts.register(xpath=self.xpath(),
54 flags=rwdts.Flag.PUBLISHER|rwdts.Flag.CACHE|rwdts.Flag.NO_PREP_READ)
55
56 assert self.reg is not None
57
58 @staticmethod
59 def _async_func(func, fut):
60 try:
61 ret = func()
62 fut.set_result(ret)
63 except Exception as e:
64 fut.set_exception(e)
65
66 def _schedule_dts_work(self, download_job_msg):
67 # Create a coroutine
68 cort = self._dts_publisher(download_job_msg)
69 # Use main asyncio loop (running in main thread)
70 newfunc = functools.partial(asyncio.ensure_future, cort, loop=self.loop)
71 fut = concurrent.futures.Future()
72 # Schedule future in main thread immediately
73 self.loop.call_soon_threadsafe(DownloadStatusPublisher._async_func, newfunc, fut)
74 res = fut.result()
75 exc = fut.exception()
76 if exc is not None:
77 self.log.error("Caught future exception during download: %s type %s", str(exc), type(exc))
78 raise exc
79 return res
80
81 def on_download_progress(self, download_job_msg):
82 """callback that triggers update.
83 """
84 # Trigger progess update
85 # Schedule a future in the main thread
86 self._schedule_dts_work(download_job_msg)
87
88 def on_download_finished(self, download_job_msg):
89 """callback that triggers update.
90 """
91
92 # clean up the local cache
93 key = download_job_msg.download_id
94 if key in self.tasks:
95 del self.tasks[key]
96
97 # Publish the final state
98 # Schedule a future in the main thread
99 self._schedule_dts_work(download_job_msg)
100
101 @asyncio.coroutine
102 def register_downloader(self, downloader):
103 downloader.delegate = self
104 future = self.loop.run_in_executor(None, downloader.download)
105 self.tasks[downloader.download_id] = (downloader, future)
106
107 return downloader.download_id
108
109 @asyncio.coroutine
110 def cancel_download(self, key):
111 task, future = self.tasks[key]
112
113 future.cancel()
114 task.cancel_download()
115
116 def stop(self):
117 self.deregister()
118
119 for task, future in self.tasks:
120 task.cancel()
121 future.cancel()
122
123 def deregister(self):
124 """ de-register with dts """
125 if self.reg is not None:
126 self.reg.deregister()
127 self.reg = None