update from RIFT as of 696b75d2fe9fb046261b08c616f1bcf6c0b54a9b second try
[osm/SO.git] / rwlaunchpad / plugins / rwpkgmgr / rift / tasklets / rwpkgmgr / publisher / copy_status.py
1 #
2 # Copyright 2017 RIFT.IO Inc
3 #
4 # Licensed under the Apache License, Version 2.0 (the "License");
5 # you may not use this file except in compliance with the License.
6 # You may obtain a copy of the License at
7 #
8 # http://www.apache.org/licenses/LICENSE-2.0
9 #
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS,
12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 # See the License for the specific language governing permissions and
14 # limitations under the License.
15 #
16 # Author(s): Nandan Sinha
17 #
18
19 import abc
20 import asyncio
21 import functools
22 import gi
23 import sys
24 import uuid
25 from concurrent.futures import Future
26
27 from gi.repository import (RwDts as rwdts)
28 import rift.mano.dts as mano_dts
29 import rift.downloader as url_downloader
30 import rift.tasklets.rwlaunchpad.onboard as onboard
31 gi.require_version('RwKeyspec', '1.0')
32 from gi.repository.RwKeyspec import quoted_key
33
# Compatibility shim: interpreters older than 3.4.4 expose the scheduling
# helper under the name ``async`` rather than ``ensure_future``.
#
# The legacy name is looked up with getattr() instead of attribute syntax
# because ``async`` is a reserved keyword from Python 3.7 onwards; writing
# ``asyncio.async`` literally makes the whole module fail to parse on modern
# interpreters, even though this branch is never executed there.
if sys.version_info < (3, 4, 4):
    asyncio.ensure_future = getattr(asyncio, "async")
36
37
class CopyStatusPublisher(mano_dts.DtsHandler, url_downloader.DownloaderProtocol):
    """Publish the status of package copy jobs through DTS.

    The publisher registers itself as the delegate of each copier it is
    handed, and turns the download callbacks it receives into updates of the
    corresponding ``copy-jobs/job`` element.
    """

    def __init__(self, log, dts, loop, project):
        """Set up the handler.

        Arguments:
            log, dts, loop, project - forwarded unchanged to the base class
                constructor; ``project`` must also expose
                ``tasklet.tasklet_info``.
        """
        super().__init__(log, dts, loop, project)
        # transaction id -> (copier, future returned by run_in_executor)
        self.tasks = {}
        self.tasklet_info = project.tasklet.tasklet_info

    def xpath(self, transaction_id=None):
        """Return the project-qualified xpath of the copy job list.

        Arguments:
            transaction_id - when truthy, the path is narrowed to the single
                job keyed by this id; otherwise the whole list is addressed.
        """
        path = "D,/rw-pkg-mgmt:copy-jobs/rw-pkg-mgmt:job"
        if transaction_id:
            path += "[transaction-id={}]".format(quoted_key(transaction_id))
        return self.project.add_project(path)

    @asyncio.coroutine
    def register(self):
        """Register with DTS as a caching publisher for the copy job list."""
        self.reg = yield from self.dts.register(
            xpath=self.xpath(),
            flags=rwdts.Flag.PUBLISHER | rwdts.Flag.CACHE | rwdts.Flag.NO_PREP_READ)

        assert self.reg is not None

    def deregister(self):
        """Drop the DTS registration, if one is currently held."""
        if self.reg:
            self.reg.deregister()
            self.reg = None

    @asyncio.coroutine
    def register_copier(self, copier):
        """Start a copy job and track it.

        The copier's ``copy`` method is run on the loop's default executor,
        with this publisher installed as the copier's delegate.

        Arguments:
            copier - object exposing ``copy()``, ``transaction_id``,
                ``dest_package_id`` and a settable ``delegate``.

        Returns:
            A ``(transaction_id, dest_package_id)`` tuple.
        """
        copier.delegate = self
        future = self.loop.run_in_executor(None, copier.copy)
        self.tasks[copier.transaction_id] = (copier, future)

        return (copier.transaction_id, copier.dest_package_id)

    @asyncio.coroutine
    def _dts_publisher(self, job_msg):
        """Publish ``job_msg`` as the element keyed by its transaction id."""
        # Publish the download state
        self.reg.update_element(
            self.xpath(transaction_id=job_msg.transaction_id), job_msg)

    @staticmethod
    def _async_add(func, fut):
        """Call ``func`` and record its outcome on ``fut``.

        The return value is stored as the future's result; any ``Exception``
        raised by ``func`` is stored as the future's exception instead.
        """
        try:
            ret = func()
            fut.set_result(ret)
        except Exception as e:
            fut.set_exception(e)

    def _schedule_dts_work(self, job_msg):
        """Schedule publication of ``job_msg`` on the event loop and wait.

        The call blocks on a ``concurrent.futures.Future`` until the loop has
        run the scheduling callback, so it must not be invoked from the event
        loop thread itself.
        NOTE(review): presumably called from the executor thread running
        ``copier.copy`` -- confirm against the copier implementation.

        Returns:
            Whatever ``asyncio.ensure_future`` returned for the publish
            coroutine.

        Raises:
            The exception raised while scheduling, after logging it.
        """
        schedule = functools.partial(
            asyncio.ensure_future,
            self._dts_publisher(job_msg),
            loop=self.loop)
        fut = Future()
        self.loop.call_soon_threadsafe(
            CopyStatusPublisher._async_add, schedule, fut)

        # Future.exception() blocks until the future is done. It has to be
        # consulted *before* result(): result() re-raises a stored exception,
        # which would skip the logging below.
        exc = fut.exception()
        if exc is not None:
            self.log.error("Caught future exception during download: %s type %s", str(exc), type(exc))
            raise exc
        return fut.result()

    def on_download_progress(self, job_msg):
        """callback that triggers update.
        """
        return self._schedule_dts_work(job_msg.progress)

    def on_download_finished(self, job_msg):
        """callback that triggers update.
        """
        # clean up the local cache; a missing entry is not an error
        self.tasks.pop(job_msg.transaction_id, None)

        return self._schedule_dts_work(job_msg.progress)

    def on_download_succeeded(self, job_msg):
        """Post the catalog descriptor object to the http endpoint.
        Argument: job_msg (of type PackageFileCopier)

        Raises:
            onboard.OnboardError - logged and re-raised when onboarding fails.
        """
        try:
            job_msg.onboarder.onboard(job_msg.descriptor_msg, project=self._project.name)
        except onboard.OnboardError as e:
            self.log.error("Onboard exception triggered while posting copied catalog descriptor %s", e)
            raise
122
123