X-Git-Url: https://osm.etsi.org/gitweb/?a=blobdiff_plain;f=rwlaunchpad%2Fplugins%2Frwpkgmgr%2Frift%2Ftasklets%2Frwpkgmgr%2Fpublisher%2Fcopy_status.py;fp=rwlaunchpad%2Fplugins%2Frwpkgmgr%2Frift%2Ftasklets%2Frwpkgmgr%2Fpublisher%2Fcopy_status.py;h=ffec4f05d2b2da2b411f5f35b2bed2770c922b9a;hb=4870d0ee29789b859931e4e2c73e13dcb29537d5;hp=927331c1eeff33cda37d49b6571be8cbb3c5fe5d;hpb=6f1a3fe149e4a6b9803382cb299c902f4cf58ec9;p=osm%2FSO.git diff --git a/rwlaunchpad/plugins/rwpkgmgr/rift/tasklets/rwpkgmgr/publisher/copy_status.py b/rwlaunchpad/plugins/rwpkgmgr/rift/tasklets/rwpkgmgr/publisher/copy_status.py index 927331c1..ffec4f05 100644 --- a/rwlaunchpad/plugins/rwpkgmgr/rift/tasklets/rwpkgmgr/publisher/copy_status.py +++ b/rwlaunchpad/plugins/rwpkgmgr/rift/tasklets/rwpkgmgr/publisher/copy_status.py @@ -16,17 +16,20 @@ # Author(s): Nandan Sinha # -import sys -import asyncio -import uuid import abc +import asyncio import functools +import gi +import sys +import uuid from concurrent.futures import Future from gi.repository import (RwDts as rwdts) import rift.mano.dts as mano_dts import rift.downloader as url_downloader import rift.tasklets.rwlaunchpad.onboard as onboard +gi.require_version('RwKeyspec', '1.0') +from gi.repository.RwKeyspec import quoted_key if sys.version_info < (3, 4, 4): asyncio.ensure_future = asyncio.async @@ -34,16 +37,16 @@ if sys.version_info < (3, 4, 4): class CopyStatusPublisher(mano_dts.DtsHandler, url_downloader.DownloaderProtocol): - def __init__(self, log, dts, loop, tasklet_info): - super().__init__(log, dts, loop) - self.tasks = {} - self.tasklet_info = tasklet_info + def __init__(self, log, dts, loop, project): + super().__init__(log, dts, loop, project) + self.tasks = {} + self.tasklet_info = project.tasklet.tasklet_info def xpath(self, transaction_id=None): - return ("D,/rw-pkg-mgmt:copy-jobs/rw-pkg-mgmt:job" + - ("[transaction-id='{}']".format(transaction_id) if transaction_id else "")) + return self.project.add_project("D,/rw-pkg-mgmt:copy-jobs/rw-pkg-mgmt:job" + + ("[transaction-id={}]".format(quoted_key(transaction_id)) if transaction_id else "")) pass - + @asyncio.coroutine def register(self): self.reg = yield from self.dts.register(xpath=self.xpath(), @@ -51,6 +54,11 @@ class CopyStatusPublisher(mano_dts.DtsHandler, url_downloader.DownloaderProtocol assert self.reg is not None + def deregister(self): + if self.reg: + self.reg.deregister() + self.reg = None + @asyncio.coroutine def register_copier(self, copier): copier.delegate = self @@ -89,7 +97,7 @@ class CopyStatusPublisher(mano_dts.DtsHandler, url_downloader.DownloaderProtocol def on_download_progress(self, job_msg): """callback that triggers update. """ - return self._schedule_dts_work(job_msg) + return self._schedule_dts_work(job_msg.progress) def on_download_finished(self, job_msg): """callback that triggers update. @@ -99,24 +107,15 @@ class CopyStatusPublisher(mano_dts.DtsHandler, url_downloader.DownloaderProtocol if key in self.tasks: del self.tasks[key] - return self._schedule_dts_work(job_msg) + return self._schedule_dts_work(job_msg.progress) def on_download_succeeded(self, job_msg): """Post the catalog descriptor object to the http endpoint. - Argument: job_msg (proto-gi descriptor_msg of the copied descriptor) + Argument: job_msg (of type PackageFileCopier) """ - manifest = self.tasklet_info.get_pb_manifest() - use_ssl = manifest.bootstrap_phase.rwsecurity.use_ssl - ssl_cert, ssl_key = None, None - if use_ssl: - ssl_cert = manifest.bootstrap_phase.rwsecurity.cert - ssl_key = manifest.bootstrap_phase.rwsecurity.key - - onboarder = onboard.DescriptorOnboarder(self.log, - "127.0.0.1", 8008, use_ssl, ssl_cert, ssl_key) try: - onboarder.onboard(job_msg) + job_msg.onboarder.onboard(job_msg.descriptor_msg, project=self._project.name) except onboard.OnboardError as e: self.log.error("Onboard exception triggered while posting copied catalog descriptor %s", e) raise