self._job = job
self.nsr_id = nsr_id
self.tasks = tasks
+ self._regh = None
@property
def id(self):
"/nsr:config-agent-job[nsr:job-id='{}']"
).format(self.nsr_id, self.id)
+ @property
+ def regh(self):
+ """Registration handle for the job"""
+ return self._regh
+
+ @regh.setter
+ def regh(self, hdl):
+ """Setter for registration handle"""
+ self._regh = hdl
+
@staticmethod
def convert_rpc_input_to_job(nsr_id, rpc_output, tasks):
"""A helper function to convert the YangOutput_Nsr_ExecNsConfigPrimitive
)
self.log.debug('preparing to publish job status for {}'.format(self.job.xpath))
+ self.job.regh = registration_handle
try:
registration_handle.create_element(self.job.xpath, self.job.job)
self._nsm = nsm
self._regh = None
+ self._nsr_regh = None
@property
def regh(self):
nsr_ids = [nsr_id]
for nsr_id in nsr_ids:
- job = self.cfgm.get_job(nsr_id)
+ jobs = self.cfgm.get_job(nsr_id)
- # If no jobs are queued for the NSR
- if job is None:
- continue
-
- xact_info.respond_xpath(
- rwdts.XactRspCode.MORE,
- CfgAgentJobDtsHandler.cfg_job_xpath(nsr_id, job.job_id),
- job)
+ for job in jobs:
+ xact_info.respond_xpath(
+ rwdts.XactRspCode.MORE,
+ CfgAgentJobDtsHandler.cfg_job_xpath(nsr_id, job.id),
+ job.job)
except Exception as e:
self._log.exception("Caught exception:%s", str(e))
flags=rwdts.Flag.PUBLISHER,
)
+ @asyncio.coroutine
+ def _terminate_nsr(self, nsr_id):
+ self._log.debug("NSR {} being terminated".format(nsr_id))
+ jobs = self.cfgm.get_job(nsr_id)
+ for job in jobs:
+ path = CfgAgentJobDtsHandler.cfg_job_xpath(nsr_id, job.id)
+ with self._dts.transaction() as xact:
+ self._log.debug("Deleting job: {}".format(path))
+ job.regh.delete_element(path)
+ self._log.debug("Deleted job: {}".format(path))
+
+ # Remove the NSR id in manager
+ self.cfgm.del_nsr(nsr_id)
+
+ @property
+ def nsr_xpath(self):
+ return "D,/nsr:ns-instance-opdata/nsr:nsr"
+
+ @asyncio.coroutine
+ def register_for_nsr(self):
+ """ Register for NSR changes """
+
+ @asyncio.coroutine
+ def on_prepare(xact_info, query_action, ks_path, msg):
+ """ This NSR is created """
+ self._log.debug("Received NSR instantiate on_prepare (%s:%s:%s)",
+ query_action,
+ ks_path,
+ msg)
+
+ if (query_action == rwdts.QueryAction.UPDATE or
+ query_action == rwdts.QueryAction.CREATE):
+ pass
+ elif query_action == rwdts.QueryAction.DELETE:
+ nsr_id = msg.ns_instance_config_ref
+ asyncio.ensure_future(self._terminate_nsr(nsr_id), loop=self._loop)
+ else:
+ raise NotImplementedError(
+ "%s action on cm-state not supported",
+ query_action)
+
+ xact_info.respond_xpath(rwdts.XactRspCode.ACK)
+
+ try:
+ handler = rift.tasklets.DTS.RegistrationHandler(on_prepare=on_prepare)
+ self._nsr_regh = yield from self._dts.register(self.nsr_xpath,
+ flags=rwdts.Flag.SUBSCRIBER | rwdts.Flag.DELTA_READY,
+ handler=handler)
+ except Exception as e:
+ self._log.error("Failed to register for NSR changes as %s", str(e))
+
class ConfigAgentJobManager(object):
"""A central class that manager all the Config Agent related data,
"""
nsr_id = rpc_output.nsr_id_ref
- self.jobs[nsr_id] = ConfigAgentJob.convert_rpc_input_to_job(nsr_id, rpc_output, tasks)
+ job = ConfigAgentJob.convert_rpc_input_to_job(nsr_id, rpc_output, tasks)
self.log.debug("Creating a job monitor for Job id: {}".format(
rpc_output.job_id))
+ if nsr_id not in self.jobs:
+ self.jobs[nsr_id] = [job]
+ else:
+ self.jobs[nsr_id].append(job)
+
# If the tasks are none, assume juju actions
# TBD: This logic need to be revisited
ca = self.nsm.config_agent_plugins[0]
job_monitor = ConfigAgentJobMonitor(
self.dts,
self.log,
- self.jobs[nsr_id],
+ job,
self.executor,
self.loop,
ca
def get_job(self, nsr_id):
"""Get the job associated with the NSR Id, if present."""
try:
- return self.jobs[nsr_id].job
+ return self.jobs[nsr_id]
except KeyError:
- return None
+ return []
+
+ def del_nsr(self, nsr_id):
+ """Delete a NSR id from the jobs list"""
+ if nsr_id in self.jobs:
+ self.jobs.pop(nsr_id)
@asyncio.coroutine
def register(self):
yield from self.handler.register()
+ yield from self.handler.register_for_nsr()