Merge from OSM SO master
[osm/SO.git] / rwlaunchpad / plugins / rwpkgmgr / rift / tasklets / rwpkgmgr / publisher / download_status.py
1 #
2 # Copyright 2016 RIFT.IO Inc
3 #
4 # Licensed under the Apache License, Version 2.0 (the "License");
5 # you may not use this file except in compliance with the License.
6 # You may obtain a copy of the License at
7 #
8 # http://www.apache.org/licenses/LICENSE-2.0
9 #
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS,
12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 # See the License for the specific language governing permissions and
14 # limitations under the License.
15 #
16 # Author(s): Varun Prasad
17 # Creation Date: 09/25/2016
18 #
19
20 import asyncio
21 import sys
22
23 from gi.repository import (RwDts as rwdts)
24 import rift.mano.dts as mano_dts
25
26 import rift.downloader as url_downloader
27
28 import functools
29 import concurrent
30
# asyncio.ensure_future() only exists from Python 3.4.4 onwards; older
# interpreters expose the same functionality as asyncio.async().  The legacy
# name is looked up with getattr() because ``async`` is a reserved keyword
# since Python 3.7, so spelling it as an attribute access would make this
# whole module a SyntaxError on modern interpreters.
if sys.version_info < (3, 4, 4):
    asyncio.ensure_future = getattr(asyncio, "async")
33
class DownloadStatusPublisher(mano_dts.DtsHandler, url_downloader.DownloaderProtocol):
    """Publish download-job status over DTS and track running downloads.

    The instance installs itself as the ``delegate`` of every downloader
    handed to :meth:`register_downloader` and reacts to the
    ``on_download_progress`` / ``on_download_finished`` callbacks by
    publishing the job message on the registered DTS xpath.
    """

    def __init__(self, log, dts, loop, project):
        super().__init__(log, dts, loop, project)
        # download_id -> (downloader, future returned by run_in_executor)
        self.tasks = {}

    def xpath(self, download_id=None):
        """Return the project-scoped xpath of the download-job list.

        Args:
            download_id: optional job id; when truthy the xpath is narrowed
                to that single job entry.
        """
        job_filter = ""
        if download_id:
            job_filter = "[download-id='{}']".format(download_id)

        return self._project.add_project(
            "D,/rw-pkg-mgmt:download-jobs/rw-pkg-mgmt:job" + job_filter)

    @asyncio.coroutine
    def _dts_publisher(self, job):
        """Publish the state of ``job`` on its per-job xpath."""
        self.reg.update_element(
            self.xpath(download_id=job.download_id), job)

    @asyncio.coroutine
    def register(self):
        """Register with DTS as the publisher of the download-job list."""
        self.reg = yield from self.dts.register(
            xpath=self.xpath(),
            flags=rwdts.Flag.PUBLISHER | rwdts.Flag.CACHE | rwdts.Flag.NO_PREP_READ)

        assert self.reg is not None

    def dergister(self):
        """Misspelled legacy alias of :meth:`deregister`.

        Kept so existing callers using the old name keep working; it logs
        the project name and then delegates to :meth:`deregister`.
        """
        self._log.debug("De-registering download status for project {}".
                        format(self.project.name))
        self.deregister()

    @staticmethod
    def _async_func(func, fut):
        """Call ``func()`` and transfer its result or exception into ``fut``."""
        try:
            ret = func()
            fut.set_result(ret)
        except Exception as e:
            fut.set_exception(e)

    def _schedule_dts_work(self, download_job_msg):
        """Schedule publication of ``download_job_msg`` on ``self.loop``.

        The publish coroutine is wrapped with ``asyncio.ensure_future`` and
        handed to the loop via ``call_soon_threadsafe``; this method then
        blocks on a ``concurrent.futures.Future`` until the loop has run
        that callback.
        NOTE(review): because of that blocking wait this presumably must be
        called from a thread other than the one running ``self.loop``.

        Returns:
            Whatever ``asyncio.ensure_future`` returned for the coroutine.

        Raises:
            Exception: any error raised while scheduling; it is logged and
                re-raised unchanged.
        """
        # Create a coroutine
        cort = self._dts_publisher(download_job_msg)
        # Use main asyncio loop (running in main thread)
        newfunc = functools.partial(asyncio.ensure_future, cort, loop=self.loop)
        fut = concurrent.futures.Future()
        # Schedule future in main thread immediately
        self.loop.call_soon_threadsafe(DownloadStatusPublisher._async_func, newfunc, fut)
        try:
            # result() re-raises any exception stored by _async_func, so the
            # failure has to be logged here rather than after the call.
            return fut.result()
        except Exception as exc:
            self.log.error("Caught future exception during download: %s type %s", str(exc), type(exc))
            raise

    def on_download_progress(self, download_job_msg):
        """Downloader callback: publish the updated progress of a job."""
        # Trigger progress update
        # Schedule a future in the main thread
        self._schedule_dts_work(download_job_msg)

    def on_download_finished(self, download_job_msg):
        """Downloader callback: forget the job locally and publish its final state."""
        # clean up the local cache; the job may already have been removed
        self.tasks.pop(download_job_msg.download_id, None)

        # Publish the final state
        # Schedule a future in the main thread
        self._schedule_dts_work(download_job_msg)

    @asyncio.coroutine
    def register_downloader(self, downloader):
        """Start ``downloader.download`` in the loop's default executor.

        This instance becomes the downloader's delegate, and the
        (downloader, future) pair is stored under its ``download_id``.

        Returns:
            The ``download_id`` of the downloader.
        """
        downloader.delegate = self
        future = self.loop.run_in_executor(None, downloader.download)
        self.tasks[downloader.download_id] = (downloader, future)

        return downloader.download_id

    @asyncio.coroutine
    def cancel_download(self, key):
        """Cancel the download registered under ``key``.

        Raises:
            KeyError: if no download is tracked under ``key``.
        """
        task, future = self.tasks[key]

        future.cancel()
        task.cancel_download()

    def stop(self):
        """De-register from DTS and cancel every tracked download."""
        self.deregister()

        # Iterate over the stored (downloader, future) pairs, not the dict
        # keys.  A snapshot is taken because on_download_finished() removes
        # entries and may run while the downloads are being cancelled.
        for task, future in list(self.tasks.values()):
            # Same cancellation call as cancel_download() uses.
            task.cancel_download()
            future.cancel()

    def deregister(self):
        """ de-register with dts """
        if self.reg is not None:
            self.reg.deregister()
            self.reg = None