update from RIFT as of 696b75d2fe9fb046261b08c616f1bcf6c0b54a9b second try
[osm/SO.git] / rwlaunchpad / plugins / rwpkgmgr / rift / tasklets / rwpkgmgr / publisher / copy_status.py
index 927331c..ffec4f0 100644 (file)
 #   Author(s): Nandan Sinha
 #
 
-import sys
-import asyncio
-import uuid
 import abc
+import asyncio
 import functools 
+import gi
+import sys
+import uuid
 from concurrent.futures import Future
 
 from gi.repository import (RwDts as rwdts)
 import rift.mano.dts as mano_dts
 import rift.downloader as url_downloader
 import rift.tasklets.rwlaunchpad.onboard as onboard 
+gi.require_version('RwKeyspec', '1.0')
+from gi.repository.RwKeyspec import quoted_key
 
 if sys.version_info < (3, 4, 4): 
     asyncio.ensure_future = asyncio.async
@@ -34,16 +37,16 @@ if sys.version_info < (3, 4, 4):
 
 class CopyStatusPublisher(mano_dts.DtsHandler, url_downloader.DownloaderProtocol): 
 
-    def __init__(self, log, dts, loop, tasklet_info):
-        super().__init__(log, dts, loop
-        self.tasks = {} 
-        self.tasklet_info = tasklet_info
+    def __init__(self, log, dts, loop, project):
+        super().__init__(log, dts, loop, project)
+        self.tasks = {}
+        self.tasklet_info = project.tasklet.tasklet_info
 
     def xpath(self, transaction_id=None):
-        return ("D,/rw-pkg-mgmt:copy-jobs/rw-pkg-mgmt:job" +
-            ("[transaction-id='{}']".format(transaction_id) if transaction_id else ""))
+        return self.project.add_project("D,/rw-pkg-mgmt:copy-jobs/rw-pkg-mgmt:job" +
+            ("[transaction-id={}]".format(quoted_key(transaction_id)) if transaction_id else ""))
         pass
-    
+
     @asyncio.coroutine
     def register(self):
         self.reg = yield from self.dts.register(xpath=self.xpath(),
@@ -51,6 +54,11 @@ class CopyStatusPublisher(mano_dts.DtsHandler, url_downloader.DownloaderProtocol
 
         assert self.reg is not None
 
+    def deregister(self):
+        if self.reg:
+            self.reg.deregister()
+            self.reg = None
+
     @asyncio.coroutine
     def register_copier(self, copier):
         copier.delegate = self
@@ -89,7 +97,7 @@ class CopyStatusPublisher(mano_dts.DtsHandler, url_downloader.DownloaderProtocol
     def on_download_progress(self, job_msg):
         """callback that triggers update.
         """
-        return self._schedule_dts_work(job_msg) 
+        return self._schedule_dts_work(job_msg.progress
 
     def on_download_finished(self, job_msg):
         """callback that triggers update.
@@ -99,24 +107,15 @@ class CopyStatusPublisher(mano_dts.DtsHandler, url_downloader.DownloaderProtocol
         if key in self.tasks:
             del self.tasks[key]
 
-        return self._schedule_dts_work(job_msg)
+        return self._schedule_dts_work(job_msg.progress)
 
     def on_download_succeeded(self, job_msg): 
         """Post the catalog descriptor object to the http endpoint.
-        Argument: job_msg (proto-gi descriptor_msg of the copied descriptor)
+        Argument: job_msg  (of type PackageFileCopier) 
 
         """
-        manifest = self.tasklet_info.get_pb_manifest()
-        use_ssl = manifest.bootstrap_phase.rwsecurity.use_ssl
-        ssl_cert, ssl_key = None, None 
-        if use_ssl:
-            ssl_cert = manifest.bootstrap_phase.rwsecurity.cert
-            ssl_key = manifest.bootstrap_phase.rwsecurity.key
-
-        onboarder = onboard.DescriptorOnboarder(self.log, 
-                "127.0.0.1", 8008, use_ssl, ssl_cert, ssl_key)
         try:
-            onboarder.onboard(job_msg)
+            job_msg.onboarder.onboard(job_msg.descriptor_msg, project=self._project.name)
         except onboard.OnboardError as e: 
             self.log.error("Onboard exception triggered while posting copied catalog descriptor %s", e)
             raise