New feature: Code changes for project support
[osm/SO.git] / rwlaunchpad / plugins / rwpkgmgr / rift / tasklets / rwpkgmgr / publisher / download_status.py
1 #
2 # Copyright 2016 RIFT.IO Inc
3 #
4 # Licensed under the Apache License, Version 2.0 (the "License");
5 # you may not use this file except in compliance with the License.
6 # You may obtain a copy of the License at
7 #
8 # http://www.apache.org/licenses/LICENSE-2.0
9 #
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS,
12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 # See the License for the specific language governing permissions and
14 # limitations under the License.
15 #
16 # Author(s): Varun Prasad
17 # Creation Date: 09/25/2016
18 #
19
20 import asyncio
21 import uuid
22
23 from gi.repository import (RwDts as rwdts)
24 import rift.mano.dts as mano_dts
25
26 import rift.downloader as url_downloader
27
28
class DownloadStatusPublisher(mano_dts.DtsHandler, url_downloader.DownloaderProtocol):
    """Publish the status of download jobs through a DTS registration.

    The publisher installs itself as the ``delegate`` of every downloader
    passed to ``register_downloader`` and forwards the progress/finished
    callbacks it receives as element updates keyed by download id.
    """

    def __init__(self, log, dts, loop, project):
        """Set up the handler.

        Args:
            log: Log handle.
            dts: DTS handle used to create the registration.
            loop: Event loop; downloads are submitted to its default executor.
            project: Project object used to scope the xpath
                (via ``add_project``).
        """
        super().__init__(log, dts, loop, project)
        # download_id -> (downloader, future running downloader.download)
        self.tasks = {}

    def xpath(self, download_id=None):
        """Return the project-scoped xpath of the job list or of a single job.

        Args:
            download_id: When given (and truthy), the returned xpath is
                narrowed to the job with that download id.
        """
        path = "D,/rw-pkg-mgmt:download-jobs/rw-pkg-mgmt:job"
        if download_id:
            path += "[download-id='{}']".format(download_id)
        return self._project.add_project(path)

    @asyncio.coroutine
    def register(self):
        """Register with DTS as the publisher of the download-jobs xpath."""
        self.reg = yield from self.dts.register(
            xpath=self.xpath(),
            flags=rwdts.Flag.PUBLISHER | rwdts.Flag.CACHE | rwdts.Flag.NO_PREP_READ)

        assert self.reg is not None

    def dergister(self):
        """Misspelled alias of ``deregister`` kept for backward compatibility.

        Logs the de-registration and then delegates to ``deregister``.
        """
        self._log.debug("De-registering download status for project {}".
                        format(self.project.name))
        self.deregister()

    def _publish(self, download_job_msg):
        """Push ``download_job_msg`` to the registration under its download id.

        Does nothing when the registration has already been released, so a
        callback that arrives after ``stop``/``deregister`` does not raise
        ``AttributeError`` on ``None``.
        """
        if self.reg is None:
            self._log.debug("Dropping update for download {}: not registered".
                            format(download_job_msg.download_id))
            return

        self.reg.update_element(
            self.xpath(download_id=download_job_msg.download_id),
            download_job_msg)

    def on_download_progress(self, download_job_msg):
        """Callback that triggers a progress update of the published job."""
        self._publish(download_job_msg)

    def on_download_finished(self, download_job_msg):
        """Callback that drops the finished job locally and publishes its final state."""
        # Clean up the local cache; the job may already have been removed.
        self.tasks.pop(download_job_msg.download_id, None)

        # Publish the final state
        self._publish(download_job_msg)

    @asyncio.coroutine
    def register_downloader(self, downloader):
        """Start ``downloader`` in the loop's default executor and track it.

        Args:
            downloader: Object exposing ``download``, ``download_id`` and a
                settable ``delegate`` attribute.

        Returns:
            The downloader's ``download_id``.
        """
        downloader.delegate = self
        future = self.loop.run_in_executor(None, downloader.download)
        self.tasks[downloader.download_id] = (downloader, future)

        return downloader.download_id

    @asyncio.coroutine
    def cancel_download(self, key):
        """Cancel the tracked download identified by ``key``.

        Raises:
            KeyError: If no download with that id is being tracked.
        """
        task, future = self.tasks[key]

        future.cancel()
        task.cancel_download()

    def stop(self):
        """De-register from DTS and cancel every tracked download."""
        self.deregister()

        # Iterate over a snapshot of the values: iterating the dict itself
        # yields the download ids, not the (downloader, future) pairs, and a
        # finished callback may remove entries while we are cancelling.
        for task, future in list(self.tasks.values()):
            # Same cancellation call as cancel_download() uses.
            task.cancel_download()
            future.cancel()

    def deregister(self):
        """ de-register with dts """
        if self.reg is not None:
            self.reg.deregister()
            self.reg = None