6890241e930ffa8c0582814d76b8cda388257a5f
[osm/SO.git] / rwlaunchpad / plugins / rwpkgmgr / rift / tasklets / rwpkgmgr / publisher / download_status.py
1 #
2 # Copyright 2016 RIFT.IO Inc
3 #
4 # Licensed under the Apache License, Version 2.0 (the "License");
5 # you may not use this file except in compliance with the License.
6 # You may obtain a copy of the License at
7 #
8 # http://www.apache.org/licenses/LICENSE-2.0
9 #
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS,
12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 # See the License for the specific language governing permissions and
14 # limitations under the License.
15 #
16 # Author(s): Varun Prasad
17 # Creation Date: 09/25/2016
18 #
19
20 import asyncio
21 import uuid
22
23 from gi.repository import (RwDts as rwdts)
24 import rift.mano.dts as mano_dts
25
26 import rift.downloader as url_downloader
27
28
29 class DownloadStatusPublisher(mano_dts.DtsHandler, url_downloader.DownloaderProtocol):
30
31 def __init__(self, log, dts, loop):
32 super().__init__(log, dts, loop)
33 self.tasks = {}
34
35 def xpath(self, download_id=None):
36 return ("D,/rw-pkg-mgmt:download-jobs/rw-pkg-mgmt:job" +
37 ("[download-id='{}']".format(download_id) if download_id else ""))
38
39 @asyncio.coroutine
40 def register(self):
41 self.reg = yield from self.dts.register(xpath=self.xpath(),
42 flags=rwdts.Flag.PUBLISHER|rwdts.Flag.CACHE|rwdts.Flag.NO_PREP_READ)
43
44 assert self.reg is not None
45
46
47 def on_download_progress(self, download_job_msg):
48 """callback that triggers update.
49 """
50 key = download_job_msg.download_id
51 # Trigger progess update
52 self.reg.update_element(
53 self.xpath(download_id=key),
54 download_job_msg)
55
56 def on_download_finished(self, download_job_msg):
57 """callback that triggers update.
58 """
59
60 # clean up the local cache
61 key = download_job_msg.download_id
62 if key in self.tasks:
63 del self.tasks[key]
64
65 # Publish the final state
66 self.reg.update_element(
67 self.xpath(download_id=key),
68 download_job_msg)
69
70 @asyncio.coroutine
71 def register_downloader(self, downloader):
72 downloader.delegate = self
73 future = self.loop.run_in_executor(None, downloader.download)
74 self.tasks[downloader.download_id] = (downloader, future)
75
76 return downloader.download_id
77
78 @asyncio.coroutine
79 def cancel_download(self, key):
80 task, future = self.tasks[key]
81
82 future.cancel()
83 task.cancel_download()
84
85 def stop(self):
86 self.deregister()
87
88 for task, future in self.tasks:
89 task.cancel()
90 future.cancel()
91
92 def deregister(self):
93 """ de-register with dts """
94 if self.reg is not None:
95 self.reg.deregister()
96 self.reg = None