+ def on_ro_account_deleted(self, account_name):
+ self._log.debug(" launchpad tasklet RO account deleted")
+ self._app.ro_accounts[self._project.name] = \
+ list(self.ro_cfg_handler.accounts.values())
+ self.ro_operdata_handler.delete_ro_account(account_name)
+
+ def on_ro_account_added(self, account):
+ self._log.debug(" launchpad tasklet RO account added")
+ self._app.ro_accounts[self._project.name] = \
+ list(self.ro_cfg_handler.accounts.values())
+ self._log.debug("Accounts: %s", self._app.ro_accounts)
+ self.ro_operdata_handler.add_ro_account(account)
+
+ @asyncio.coroutine
+ def register(self):
+ yield from self.ro_cfg_handler.register()
+ yield from self.ro_operdata_handler.register()
+
+ def deregister(self):
+ self.ro_cfg_handler.deregister()
+ self.ro_operdata_handler.deregister()
+
+class StatusHandlers(object):
+ STATUS_MAP = {
+ downloader.DownloadStatus.STARTED: TaskStatus.QUEUED.value_nick.upper(),
+ downloader.DownloadStatus.IN_PROGRESS: TaskStatus.IN_PROGRESS.value_nick.upper(),
+ downloader.DownloadStatus.COMPLETED: TaskStatus.COMPLETED.value_nick.upper(),
+ downloader.DownloadStatus.FAILED: TaskStatus.FAILED.value_nick.upper(),
+ downloader.DownloadStatus.CANCELLED: TaskStatus.CANCELLED.value_nick.upper()
+ }
+
+ def __init__(self, dts, log, loop, app, project):
+ self.log = log
+ self.dts = dts
+ self.loop = loop
+ self.app = app
+ self.project = project
+
+ @abc.abstractmethod
+ def xpath(self, transaction_id=None):
+ return
+
+ @asyncio.coroutine
+ def register(self):
+ self.reg = yield from self.dts.register(xpath=self.xpath(),
+ flags=rwdts.Flag.PUBLISHER|rwdts.Flag.CACHE|rwdts.Flag.NO_PREP_READ)
+
+ assert self.reg is not None
+
+ def deregister(self):
+ if self.reg:
+ self.reg.deregister()
+ self.reg = None
+
+
+class UploadStatusHandlers(StatusHandlers):
+ """Publisher for status of onboarded packages.
+ """
+ def __init__(self, dts, log, loop, app, project):
+ super(UploadStatusHandlers, self).__init__(dts, log, loop, app, project)
+ self.reg = None
+ self.transaction_to_job_map = {}
+
+ def xpath(self, transaction_id=None):
+ return self.project.add_project("D,/rw-pkg-mgmt:create-jobs/rw-pkg-mgmt:job" +
+ ("[transaction-id={}]".format(quoted_key(transaction_id)) if transaction_id else ""))
+
+ def create_job_xpath(self):
+ return self.project.add_project("D,/rw-pkg-mgmt:create-jobs")
+
+ @asyncio.coroutine
+ def register(self):
+ @asyncio.coroutine
+ def on_prepare(xact_info, action, ks_path, msg):
+ """ prepare callback from dts """
+
+ if action == rwdts.QueryAction.READ:
+ xpath = ks_path.to_xpath(RwPkgMgmtYang.get_schema())
+ path_entry = RwPkgMgmtYang.YangData_RwProject_Project_CreateJobs_Job().schema().keyspec_to_entry(ks_path)
+ transaction_id = path_entry.key00.transaction_id
+ if transaction_id:
+ create_job_msg = msg.as_dict()
+ if create_job_msg:
+ if transaction_id in self.transaction_to_job_map:
+ job = self.transaction_to_job_map[transaction_id]
+ xact_info.respond_xpath(rsp_code=rwdts.XactRspCode.ACK,
+ xpath=xpath,
+ msg=job)
+ return
+ else:
+ jobs = RwPkgMgmtYang.YangData_RwProject_Project_CreateJobs()
+ for job in self.transaction_to_job_map.values():
+ jb = RwPkgMgmtYang.YangData_RwProject_Project_CreateJobs_Job.from_dict({
+ "transaction_id": job.transaction_id,
+ "status": job.status
+ })
+ jobs.job.append(jb)
+ xact_info.respond_xpath(rsp_code=rwdts.XactRspCode.ACK,
+ xpath=self.create_job_xpath(),
+ msg=jobs)
+ return
+ xact_info.respond_xpath(rwdts.XactRspCode.ACK)
+
+ hdl = rift.tasklets.DTS.RegistrationHandler(on_prepare=on_prepare,)
+ with self.dts.group_create() as group:
+ self.reg = group.register(xpath=self.xpath(),
+ handler=hdl,
+ flags=rwdts.Flag.PUBLISHER,
+ )
+
+ def upload_status(self, job, trans_id):
+ try:
+ create_job = RwPkgMgmtYang.YangData_RwProject_Project_CreateJobs_Job.from_dict({
+ "transaction_id": trans_id,
+ "status": StatusHandlers.STATUS_MAP[job.status]
+ })
+ self.transaction_to_job_map[trans_id] = create_job
+ except Exception as e:
+ self.log.error("Exception : {}".format(e))
+
+class UpdateStatusHandlers(StatusHandlers):
+ """Publisher for status of updated packages.
+ """
+ def __init__(self, dts, log, loop, app, project):
+ super(UpdateStatusHandlers, self).__init__(dts, log, loop, app, project)
+
+ def xpath(self, transaction_id=None):
+ return self.project.add_project("D,/rw-pkg-mgmt:update-jobs/rw-pkg-mgmt:job" +
+ ("[transaction-id={}]".format(quoted_key(transaction_id)) if transaction_id else ""))
+
+ @asyncio.coroutine
+ def schedule_dts_work(self, job, transaction_id):
+ # Publish the download state
+ create_job = RwPkgMgmtYang.YangData_RwProject_Project_UpdateJobs_Job.from_dict({
+ "transaction_id": transaction_id,
+ "status": StatusHandlers.STATUS_MAP[job.status]
+ })
+
+ self.reg.update_element(
+ self.xpath(transaction_id=transaction_id), create_job)
+
+ def update_status(self, job, trans_id):
+ self.log.debug("Download completed, writing status of task")
+ asyncio.ensure_future(self.schedule_dts_work(job, trans_id), loop=self.loop)
+
+class LaunchpadProject(ManoProject):
+
+ def __init__(self, name, tasklet, **kw):
+ super(LaunchpadProject, self).__init__(tasklet.log, name)
+ self.update(tasklet)
+ self._app = kw['app']