927331c1eeff33cda37d49b6571be8cbb3c5fe5d
[osm/SO.git] / rwlaunchpad / plugins / rwpkgmgr / rift / tasklets / rwpkgmgr / publisher / copy_status.py
1 #
2 # Copyright 2017 RIFT.IO Inc
3 #
4 # Licensed under the Apache License, Version 2.0 (the "License");
5 # you may not use this file except in compliance with the License.
6 # You may obtain a copy of the License at
7 #
8 # http://www.apache.org/licenses/LICENSE-2.0
9 #
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS,
12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 # See the License for the specific language governing permissions and
14 # limitations under the License.
15 #
16 # Author(s): Nandan Sinha
17 #
18
19 import sys
20 import asyncio
21 import uuid
22 import abc
23 import functools
24 from concurrent.futures import Future
25
26 from gi.repository import (RwDts as rwdts)
27 import rift.mano.dts as mano_dts
28 import rift.downloader as url_downloader
29 import rift.tasklets.rwlaunchpad.onboard as onboard
30
# Interpreters older than 3.4.4 only expose the scheduling helper under its
# legacy name ``asyncio.async``.  ``async`` is a reserved keyword from
# Python 3.7 on, so spelling the attribute literally would make this module
# fail to compile there; look it up by name instead.
if sys.version_info < (3, 4, 4):
    asyncio.ensure_future = getattr(asyncio, "async")
33
34
class CopyStatusPublisher(mano_dts.DtsHandler, url_downloader.DownloaderProtocol):
    """Publish the state of package copy jobs under the copy-jobs DTS path.

    An instance installs itself as the ``delegate`` of every copier passed to
    ``register_copier`` and implements the ``on_download_*`` callbacks, each
    of which pushes the received job message into DTS.
    """

    def __init__(self, log, dts, loop, tasklet_info):
        """Store the collaborators and start with an empty job table.

        Arguments:
            log, dts, loop: forwarded unchanged to the base-class constructor.
            tasklet_info: object whose ``get_pb_manifest()`` is consulted in
                ``on_download_succeeded`` for the SSL settings.
        """
        super().__init__(log, dts, loop)
        # transaction-id -> (copier, future of the executor call running it)
        self.tasks = {}
        self.tasklet_info = tasklet_info

    def xpath(self, transaction_id=None):
        """Return the xpath of the copy-job list, or of one job entry.

        Arguments:
            transaction_id: when truthy, a ``[transaction-id='...']`` key
                predicate is appended so the path addresses a single job.
        """
        return ("D,/rw-pkg-mgmt:copy-jobs/rw-pkg-mgmt:job" +
                ("[transaction-id='{}']".format(transaction_id) if transaction_id else ""))

    @asyncio.coroutine
    def register(self):
        """Register with DTS as a caching publisher for the copy-job path."""
        self.reg = yield from self.dts.register(
            xpath=self.xpath(),
            flags=rwdts.Flag.PUBLISHER | rwdts.Flag.CACHE | rwdts.Flag.NO_PREP_READ)

        assert self.reg is not None

    @asyncio.coroutine
    def register_copier(self, copier):
        """Start ``copier.copy`` in the loop's default executor and track it.

        Arguments:
            copier: object exposing ``copy()``, ``transaction_id`` and
                ``dest_package_id``; its ``delegate`` attribute is set to
                this publisher.

        Returns:
            Tuple ``(copier.transaction_id, copier.dest_package_id)``.
        """
        copier.delegate = self
        future = self.loop.run_in_executor(None, copier.copy)
        self.tasks[copier.transaction_id] = (copier, future)

        return (copier.transaction_id, copier.dest_package_id)

    @asyncio.coroutine
    def _dts_publisher(self, job_msg):
        """Write ``job_msg`` to the DTS element keyed by its transaction id."""
        # Publish the download state
        self.reg.update_element(
            self.xpath(transaction_id=job_msg.transaction_id), job_msg)

    @staticmethod
    def _async_add(func, fut):
        """Call ``func()`` and record its return value or exception on ``fut``."""
        try:
            ret = func()
            fut.set_result(ret)
        except Exception as e:
            fut.set_exception(e)

    def _schedule_dts_work(self, job_msg):
        """Schedule a DTS publish of ``job_msg`` on the event loop and wait.

        The work is handed over with ``call_soon_threadsafe`` and the caller
        then blocks on a ``concurrent.futures.Future`` until the loop has run
        the hand-over callback.
        NOTE(review): presumably called from the executor thread running the
        copier, not from the loop thread itself (which would block forever)
        -- confirm against the copier implementation.

        Returns:
            Whatever ``asyncio.ensure_future`` returned for the publish
            coroutine.  The wait only covers scheduling: an exception raised
            later inside ``_dts_publisher`` is not reported through here.

        Raises:
            Any exception raised while scheduling the coroutine, after it has
            been logged.
        """
        schedule = functools.partial(
            asyncio.ensure_future,
            self._dts_publisher(job_msg),
            loop=self.loop)
        fut = Future()
        self.loop.call_soon_threadsafe(CopyStatusPublisher._async_add, schedule, fut)
        # Future.result() re-raises an exception stored on the future, so the
        # logging has to happen in an except clause; a check placed after the
        # call is never reached on failure.
        try:
            return fut.result()
        except Exception as e:
            self.log.error("Caught future exception during download: %s type %s", str(e), type(e))
            raise

    def on_download_progress(self, job_msg):
        """callback that triggers update.
        """
        return self._schedule_dts_work(job_msg)

    def on_download_finished(self, job_msg):
        """callback that triggers update.

        Also drops the job's entry from the local ``tasks`` table.
        """
        # clean up the local cache; a missing key is not an error
        self.tasks.pop(job_msg.transaction_id, None)

        return self._schedule_dts_work(job_msg)

    def on_download_succeeded(self, job_msg):
        """Post the catalog descriptor object to the http endpoint.

        Argument: job_msg (proto-gi descriptor_msg of the copied descriptor)

        The onboarder is pointed at 127.0.0.1:8008; the SSL flag, certificate
        and key are read from the tasklet manifest's ``rwsecurity`` section.

        Raises:
            onboard.OnboardError: re-raised after being logged.
        """
        manifest = self.tasklet_info.get_pb_manifest()
        use_ssl = manifest.bootstrap_phase.rwsecurity.use_ssl
        ssl_cert, ssl_key = None, None
        if use_ssl:
            ssl_cert = manifest.bootstrap_phase.rwsecurity.cert
            ssl_key = manifest.bootstrap_phase.rwsecurity.key

        onboarder = onboard.DescriptorOnboarder(
            self.log, "127.0.0.1", 8008, use_ssl, ssl_cert, ssl_key)
        try:
            onboarder.onboard(job_msg)
        except onboard.OnboardError as e:
            self.log.error("Onboard exception triggered while posting copied catalog descriptor %s", e)
            raise
123
124