# Author(s): Nandan Sinha
#
-import sys
-import asyncio
-import uuid
import abc
+import asyncio
import functools
+import gi
+import sys
+import uuid
from concurrent.futures import Future
from gi.repository import (RwDts as rwdts)
import rift.mano.dts as mano_dts
import rift.downloader as url_downloader
import rift.tasklets.rwlaunchpad.onboard as onboard
+gi.require_version('RwKeyspec', '1.0')
+from gi.repository.RwKeyspec import quoted_key
if sys.version_info < (3, 4, 4):
asyncio.ensure_future = asyncio.async
class CopyStatusPublisher(mano_dts.DtsHandler, url_downloader.DownloaderProtocol):
- def __init__(self, log, dts, loop, tasklet_info):
- super().__init__(log, dts, loop)
- self.tasks = {}
- self.tasklet_info = tasklet_info
+ def __init__(self, log, dts, loop, project):
+ super().__init__(log, dts, loop, project)
+ self.tasks = {}
+ self.tasklet_info = project.tasklet.tasklet_info
def xpath(self, transaction_id=None):
- return ("D,/rw-pkg-mgmt:copy-jobs/rw-pkg-mgmt:job" +
- ("[transaction-id='{}']".format(transaction_id) if transaction_id else ""))
+ return self.project.add_project("D,/rw-pkg-mgmt:copy-jobs/rw-pkg-mgmt:job" +
+ ("[transaction-id={}]".format(quoted_key(transaction_id)) if transaction_id else ""))
pass
-
+
@asyncio.coroutine
def register(self):
self.reg = yield from self.dts.register(xpath=self.xpath(),
assert self.reg is not None
+ def deregister(self):
+ if self.reg:
+ self.reg.deregister()
+ self.reg = None
+
@asyncio.coroutine
def register_copier(self, copier):
copier.delegate = self
def on_download_progress(self, job_msg):
"""callback that triggers update.
"""
- return self._schedule_dts_work(job_msg)
+ return self._schedule_dts_work(job_msg.progress)
def on_download_finished(self, job_msg):
"""callback that triggers update.
if key in self.tasks:
del self.tasks[key]
- return self._schedule_dts_work(job_msg)
+ return self._schedule_dts_work(job_msg.progress)
def on_download_succeeded(self, job_msg):
"""Post the catalog descriptor object to the http endpoint.
- Argument: job_msg (proto-gi descriptor_msg of the copied descriptor)
+ Argument: job_msg (of type PackageFileCopier)
"""
- manifest = self.tasklet_info.get_pb_manifest()
- use_ssl = manifest.bootstrap_phase.rwsecurity.use_ssl
- ssl_cert, ssl_key = None, None
- if use_ssl:
- ssl_cert = manifest.bootstrap_phase.rwsecurity.cert
- ssl_key = manifest.bootstrap_phase.rwsecurity.key
-
- onboarder = onboard.DescriptorOnboarder(self.log,
- "127.0.0.1", 8008, use_ssl, ssl_cert, ssl_key)
try:
- onboarder.onboard(job_msg)
+ job_msg.onboarder.onboard(job_msg.descriptor_msg, project=self._project.name)
except onboard.OnboardError as e:
self.log.error("Onboard exception triggered while posting copied catalog descriptor %s", e)
raise