05062c18ce0d15ba3739e431cc9ac9506dc6019c
[osm/SO.git] / rwlaunchpad / plugins / rwpkgmgr / rift / tasklets / rwpkgmgr / publisher / download_status.py
1 #
2 # Copyright 2016 RIFT.IO Inc
3 #
4 # Licensed under the Apache License, Version 2.0 (the "License");
5 # you may not use this file except in compliance with the License.
6 # You may obtain a copy of the License at
7 #
8 # http://www.apache.org/licenses/LICENSE-2.0
9 #
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS,
12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 # See the License for the specific language governing permissions and
14 # limitations under the License.
15 #
16 # Author(s): Varun Prasad
17 # Creation Date: 09/25/2016
18 #
19
20 import asyncio
21 import gi
22 import sys
23
24 from gi.repository import (RwDts as rwdts)
25 import rift.mano.dts as mano_dts
26
27 import rift.downloader as url_downloader
28 gi.require_version('RwKeyspec', '1.0')
29 from gi.repository.RwKeyspec import quoted_key
30
31 import functools
32 import concurrent
33
# Compatibility shim: asyncio.ensure_future() only exists from Python 3.4.4
# onwards; older interpreters expose the same function under the name "async".
# The legacy function is looked up with getattr() because ``async`` is a
# reserved keyword since Python 3.7, so spelling it as ``asyncio.async`` makes
# the whole module fail to parse on modern interpreters.
if sys.version_info < (3, 4, 4):
    asyncio.ensure_future = getattr(asyncio, "async")
36
class DownloadStatusPublisher(mano_dts.DtsHandler, url_downloader.DownloaderProtocol):
    """Publish the state of download jobs through a DTS registration.

    The instance installs itself as the ``delegate`` of every downloader
    passed to :meth:`register_downloader`.  Job messages delivered to
    :meth:`on_download_progress` and :meth:`on_download_finished` are pushed
    to the registration created by :meth:`register`.
    """

    def __init__(self, log, dts, loop, project):
        """Initialise the handler with an empty table of running downloads."""
        super().__init__(log, dts, loop, project)
        # download_id -> (downloader, future returned by run_in_executor)
        self.tasks = {}

    def xpath(self, download_id=None):
        """Return the project-scoped xpath of the job list or of one job.

        Args:
            download_id: when truthy, a ``[download-id=...]`` key predicate
                for this id is appended so the path addresses a single job.

        Returns:
            Whatever ``self._project.add_project`` returns for the path.
        """
        job_path = "D,/rw-pkg-mgmt:download-jobs/rw-pkg-mgmt:job"
        if download_id:
            job_path += "[download-id={}]".format(quoted_key(download_id))
        return self._project.add_project(job_path)

    @asyncio.coroutine
    def _dts_publisher(self, job):
        """Write ``job`` to the registration under its own download id."""
        # Publish the download state
        self.reg.update_element(
            self.xpath(download_id=job.download_id), job)

    @asyncio.coroutine
    def register(self):
        """Register with DTS as a caching publisher for the job list xpath."""
        self.reg = yield from self.dts.register(
            xpath=self.xpath(),
            flags=rwdts.Flag.PUBLISHER | rwdts.Flag.CACHE | rwdts.Flag.NO_PREP_READ)

        assert self.reg is not None

    def dergister(self):
        """Misspelled legacy name for :meth:`deregister`.

        Kept so existing callers of the old spelling keep working; it logs
        the de-registration and then delegates to :meth:`deregister`.
        """
        self._log.debug("De-registering download status for project {}".
                        format(self.project.name))
        self.deregister()

    @staticmethod
    def _async_func(func, fut):
        """Run ``func`` and store its result, or its exception, in ``fut``."""
        try:
            ret = func()
            fut.set_result(ret)
        except Exception as e:
            fut.set_exception(e)

    def _schedule_dts_work(self, download_job_msg):
        """Schedule publication of ``download_job_msg`` on ``self.loop``.

        The calling thread blocks until the loop has run the scheduling
        callback.  NOTE(review): calling this from the thread that runs
        ``self.loop`` would therefore never return - confirm that callers
        are always executor threads.

        Returns:
            The value returned by ``asyncio.ensure_future`` for the
            publishing coroutine.

        Raises:
            Any exception raised while scheduling the coroutine; it is
            logged before being re-raised.
        """
        # Create a coroutine
        cort = self._dts_publisher(download_job_msg)
        # Use main asyncio loop (running in main thread)
        newfunc = functools.partial(asyncio.ensure_future, cort, loop=self.loop)
        fut = concurrent.futures.Future()
        # Schedule future in main thread immediately
        self.loop.call_soon_threadsafe(DownloadStatusPublisher._async_func, newfunc, fut)
        # exception() blocks until the future is done and returns the stored
        # exception (or None).  It has to be consulted before result(),
        # because result() re-raises the stored exception and would skip the
        # error log below.
        exc = fut.exception()
        if exc is not None:
            self.log.error("Caught future exception during download: %s type %s", str(exc), type(exc))
            raise exc
        return fut.result()

    def on_download_progress(self, download_job_msg):
        """Callback that publishes an intermediate job state."""
        # Trigger progress update
        # Schedule a future in the main thread
        self._schedule_dts_work(download_job_msg)

    def on_download_finished(self, download_job_msg):
        """Callback that drops the finished job and publishes its final state."""
        # Clean up the local cache; the id may already be gone.
        self.tasks.pop(download_job_msg.download_id, None)

        # Publish the final state
        # Schedule a future in the main thread
        self._schedule_dts_work(download_job_msg)

    @asyncio.coroutine
    def register_downloader(self, downloader):
        """Start ``downloader`` in the loop's default executor and track it.

        Returns:
            The downloader's ``download_id``.
        """
        downloader.delegate = self
        future = self.loop.run_in_executor(None, downloader.download)
        self.tasks[downloader.download_id] = (downloader, future)

        return downloader.download_id

    @asyncio.coroutine
    def cancel_download(self, key):
        """Cancel the tracked download whose id is ``key``.

        Raises:
            KeyError: if no download with that id is being tracked.
        """
        task, future = self.tasks[key]

        future.cancel()
        task.cancel_download()

    def stop(self):
        """De-register from DTS and cancel every tracked download."""
        self.deregister()

        # Iterate over a snapshot of the (downloader, future) pairs: the dict
        # itself would only yield the download ids, and a finishing download
        # may remove its entry while this loop is running.
        for task, future in list(self.tasks.values()):
            task.cancel_download()
            future.cancel()

    def deregister(self):
        """ de-register with dts """
        if self.reg is not None:
            self.reg.deregister()
            self.reg = None