Merge from OSM SO master
[osm/SO.git] / common / python / rift / mano / config_agent / operdata.py
index 551aea2..5cbd351 100644 (file)
@@ -27,7 +27,6 @@ from gi.repository import (
     RwDts as rwdts)
 
 import rift.tasklets
-
 import rift.mano.utils.juju_api as juju
 
 
@@ -153,12 +152,15 @@ class ConfigAgentAccount(object):
                 )
 
 class CfgAgentDtsOperdataHandler(object):
-    def __init__(self, dts, log, loop):
+    def __init__(self, dts, log, loop, project):
         self._dts = dts
         self._log = log
         self._loop = loop
+        self._project = project
 
         self.cfg_agent_accounts = {}
+        self._show_reg = None
+        self._rpc_reg = None
 
     def add_cfg_agent_account(self, account_msg):
         account = ConfigAgentAccount(self._log, account_msg)
@@ -205,9 +207,10 @@ class CfgAgentDtsOperdataHandler(object):
                 for account in saved_accounts:
                     connection_status = account.connection_status
                     self._log.debug("Responding to config agent connection status request: %s", connection_status)
+                    xpath = self._project.add_project(get_xpath(account.name))
                     xact_info.respond_xpath(
                             rwdts.XactRspCode.MORE,
-                            xpath=get_xpath(account.name),
+                            xpath=xpath,
                             msg=account.connection_status,
                             )
             except KeyError as e:
@@ -217,12 +220,13 @@ class CfgAgentDtsOperdataHandler(object):
 
             xact_info.respond_xpath(rwdts.XactRspCode.ACK)
 
-        yield from self._dts.register(
-                xpath=get_xpath(),
-                handler=rift.tasklets.DTS.RegistrationHandler(
-                    on_prepare=on_prepare),
-                flags=rwdts.Flag.PUBLISHER,
-                )
+        xpath = self._project.add_project(get_xpath())
+        self._show_reg = yield from self._dts.register(
+            xpath=xpath,
+            handler=rift.tasklets.DTS.RegistrationHandler(
+                on_prepare=on_prepare),
+            flags=rwdts.Flag.PUBLISHER,
+        )
 
     def _register_validate_rpc(self):
         def get_xpath():
@@ -234,6 +238,10 @@ class CfgAgentDtsOperdataHandler(object):
                 raise ConfigAgentAccountNotFound("Config Agent account name not provided")
 
             cfg_agent_account_name = msg.cfg_agent_account
+
+            if not self._project.rpc_check(msg, xact_info=xact_info):
+                return
+
             try:
                 account = self.cfg_agent_accounts[cfg_agent_account_name]
             except KeyError:
@@ -243,24 +251,29 @@ class CfgAgentDtsOperdataHandler(object):
 
             xact_info.respond_xpath(rwdts.XactRspCode.ACK)
 
-        yield from self._dts.register(
-                xpath=get_xpath(),
-                handler=rift.tasklets.DTS.RegistrationHandler(
-                    on_prepare=on_prepare
-                    ),
-                flags=rwdts.Flag.PUBLISHER,
-                )
+        self._rpc_reg = yield from self._dts.register(
+            xpath=get_xpath(),
+            handler=rift.tasklets.DTS.RegistrationHandler(
+                on_prepare=on_prepare
+            ),
+            flags=rwdts.Flag.PUBLISHER,
+        )
 
     @asyncio.coroutine
     def register(self):
         yield from self._register_show_status()
         yield from self._register_validate_rpc()
 
+    def deregister(self):
+        self._show_reg.deregister()
+        self._rpc_reg.deregister()
+
+
 class ConfigAgentJob(object):
     """A wrapper over the config agent job object, providing some
     convenience functions.
 
-    YangData_Nsr_NsInstanceOpdata_Nsr_ConfigAgentJob contains
+    YangData_RwProject_Project_NsInstanceOpdata_Nsr_ConfigAgentJob contains
     ||
      ==> VNFRS
           ||
@@ -274,17 +287,20 @@ class ConfigAgentJob(object):
                   "running"  : "pending",
                   "failed"   : "failure"}
 
-    def __init__(self, nsr_id, job, tasks=None):
+    def __init__(self, nsr_id, job, project, tasks=None):
         """
         Args:
             nsr_id (uuid): ID of NSR record
-            job (YangData_Nsr_NsInstanceOpdata_Nsr_ConfigAgentJob): Gi object
+            job (YangData_RwProject_Project_NsInstanceOpdata_Nsr_ConfigAgentJob): Gi object
             tasks: List of asyncio.tasks. If provided the job monitor will
                 use it to monitor the tasks instead of the execution IDs
         """
         self._job = job
         self.nsr_id = nsr_id
         self.tasks = tasks
+        self._project = project
+
+        self._regh = None
 
     @property
     def id(self):
@@ -314,15 +330,25 @@ class ConfigAgentJob(object):
     @property
     def xpath(self):
         """Xpath of the job"""
-        return ("D,/nsr:ns-instance-opdata" +
+        return self._project.add_project(("D,/nsr:ns-instance-opdata" +
                 "/nsr:nsr[nsr:ns-instance-config-ref='{}']" +
                 "/nsr:config-agent-job[nsr:job-id='{}']"
-                ).format(self.nsr_id, self.id)
+                ).format(self.nsr_id, self.id))
+
+    @property
+    def regh(self):
+        """Registration handle for the job"""
+        return self._regh
+
+    @regh.setter
+    def regh(self, hdl):
+        """Setter for registration handle"""
+        self._regh = hdl
 
     @staticmethod
     def convert_rpc_input_to_job(nsr_id, rpc_output, tasks):
         """A helper function to convert the YangOutput_Nsr_ExecNsConfigPrimitive
-        to YangData_Nsr_NsInstanceOpdata_Nsr_ConfigAgentJob (NsrYang)
+        to YangData_RwProject_Project_NsInstanceOpdata_Nsr_ConfigAgentJob (NsrYang)
 
         Args:
             nsr_id (uuid): NSR ID
@@ -333,10 +359,10 @@ class ConfigAgentJob(object):
             ConfigAgentJob
         """
         # Shortcuts to prevent the HUUGE names.
-        CfgAgentJob = NsrYang.YangData_Nsr_NsInstanceOpdata_Nsr_ConfigAgentJob
-        CfgAgentVnfr = NsrYang.YangData_Nsr_NsInstanceOpdata_Nsr_ConfigAgentJob_Vnfr
-        CfgAgentPrimitive = NsrYang.YangData_Nsr_NsInstanceOpdata_Nsr_ConfigAgentJob_Vnfr_Primitive
-        CfgAgentPrimitiveParam =  NsrYang.YangData_Nsr_NsInstanceOpdata_Nsr_ConfigAgentJob_Vnfr_Primitive_Parameter
+        CfgAgentJob = NsrYang.YangData_RwProject_Project_NsInstanceOpdata_Nsr_ConfigAgentJob
+        CfgAgentVnfr = NsrYang.YangData_RwProject_Project_NsInstanceOpdata_Nsr_ConfigAgentJob_Vnfr
+        CfgAgentPrimitive = NsrYang.YangData_RwProject_Project_NsInstanceOpdata_Nsr_ConfigAgentJob_Vnfr_Primitive
+        CfgAgentPrimitiveParam =  NsrYang.YangData_RwProject_Project_NsInstanceOpdata_Nsr_ConfigAgentJob_Vnfr_Primitive_Parameter
 
         job = CfgAgentJob.from_dict({
                 "job_id": rpc_output.job_id,
@@ -374,7 +400,7 @@ class ConfigAgentJob(object):
 
             job.vnfr.append(vnfr_job)
 
-        return ConfigAgentJob(nsr_id, job, tasks)
+        return ConfigAgentJob(nsr_id, job, project, tasks)
 
 
 class ConfigAgentJobMonitor(object):
@@ -406,9 +432,25 @@ class ConfigAgentJobMonitor(object):
     @asyncio.coroutine
     def _monitor_processes(self, registration_handle):
         result = 0
+        errs = ""
         for process in self.job.tasks:
-            rc = yield from process
-            self.log.debug("Process {} returned rc: {}".format(process, rc))
+            if isinstance(process, asyncio.subprocess.Process):
+                rc = yield from process.wait()
+                err = yield from process.stderr.read()
+
+            else:
+                # Task instance
+                rc = yield from process
+                err = ''
+
+            self.log.debug("Process {} returned rc: {}, err: {}".
+                           format(process, rc, err))
+
+            if len(err):
+                if rc == 0:
+                    errs += "<success>{}</success>".format(err)
+                else:
+                    errs += "<error>{}</error>".format(err)
             result |= rc
 
         if result == 0:
@@ -416,6 +458,9 @@ class ConfigAgentJobMonitor(object):
         else:
             self.job.job_status = "failure"
 
+        if len(errs):
+            self.job.job.job_status_details = errs
+
         registration_handle.update_element(self.job.xpath, self.job.job)
 
     def get_error_details(self):
@@ -448,6 +493,7 @@ class ConfigAgentJobMonitor(object):
                 )
 
         self.log.debug('preparing to publish job status for {}'.format(self.job.xpath))
+        self.job.regh = registration_handle
 
         try:
             registration_handle.create_element(self.job.xpath, self.job.job)
@@ -637,6 +683,8 @@ class CfgAgentJobDtsHandler(object):
         self._nsm = nsm
 
         self._regh = None
+        self._nsr_regh = None
+        self._project = cfgm.project
 
     @property
     def regh(self):
@@ -655,9 +703,9 @@ class CfgAgentJobDtsHandler(object):
 
     @staticmethod
     def cfg_job_xpath(nsr_id, job_id):
-        return ("D,/nsr:ns-instance-opdata" +
+        return self._project.add_project(("D,/nsr:ns-instance-opdata" +
                 "/nsr:nsr[nsr:ns-instance-config-ref = '{}']" +
-                "/nsr:config-agent-job[nsr:job-id='{}']").format(nsr_id, job_id)
+                "/nsr:config-agent-job[nsr:job-id='{}']").format(nsr_id, job_id))
 
     @asyncio.coroutine
     def register(self):
@@ -668,7 +716,7 @@ class CfgAgentJobDtsHandler(object):
             """ prepare callback from dts """
             xpath = ks_path.to_xpath(RwNsrYang.get_schema())
             if action == rwdts.QueryAction.READ:
-                schema = RwNsrYang.YangData_Nsr_NsInstanceOpdata_Nsr.schema()
+                schema = RwNsrYang.YangData_RwProject_Project_NsInstanceOpdata_Nsr.schema()
                 path_entry = schema.keyspec_to_entry(ks_path)
                 try:
                     nsr_id = path_entry.key00.ns_instance_config_ref
@@ -682,16 +730,13 @@ class CfgAgentJobDtsHandler(object):
                         nsr_ids = [nsr_id]
 
                     for nsr_id in nsr_ids:
-                        job = self.cfgm.get_job(nsr_id)
+                        jobs = self.cfgm.get_job(nsr_id)
 
-                        # If no jobs are queued for the NSR
-                        if job is None:
-                            continue
-
-                        xact_info.respond_xpath(
-                            rwdts.XactRspCode.MORE,
-                            CfgAgentJobDtsHandler.cfg_job_xpath(nsr_id, job.job_id),
-                            job)
+                        for job in jobs:
+                            xact_info.respond_xpath(
+                                rwdts.XactRspCode.MORE,
+                                CfgAgentJobDtsHandler.cfg_job_xpath(nsr_id, job.id),
+                                job.job)
 
                 except Exception as e:
                     self._log.exception("Caught exception:%s", str(e))
@@ -702,11 +747,74 @@ class CfgAgentJobDtsHandler(object):
 
         hdl = rift.tasklets.DTS.RegistrationHandler(on_prepare=on_prepare,)
         with self._dts.group_create() as group:
-            self._regh = group.register(xpath=CfgAgentJobDtsHandler.XPATH,
+            self._regh = group.register(xpath=self._project.add_project(
+                CfgAgentJobDtsHandler.XPATH),
                                         handler=hdl,
                                         flags=rwdts.Flag.PUBLISHER,
                                         )
 
+    @asyncio.coroutine
+    def _terminate_nsr(self, nsr_id):
+        self._log.debug("NSR {} being terminated".format(nsr_id))
+        jobs = self.cfgm.get_job(nsr_id)
+        for job in jobs:
+            path = CfgAgentJobDtsHandler.cfg_job_xpath(nsr_id, job.id)
+            with self._dts.transaction() as xact:
+                self._log.debug("Deleting job: {}".format(path))
+                job.regh.delete_element(path)
+                self._log.debug("Deleted job: {}".format(path))
+
+        # Remove the NSR id in manager
+        self.cfgm.del_nsr(nsr_id)
+
+    @property
+    def nsr_xpath(self):
+        return self._project.add_project("D,/nsr:ns-instance-opdata/nsr:nsr")
+
+    @asyncio.coroutine
+    def register_for_nsr(self):
+        """ Register for NSR changes """
+
+        @asyncio.coroutine
+        def on_prepare(xact_info, query_action, ks_path, msg):
+            """ This NSR is created """
+            self._log.debug("Received NSR instantiate on_prepare (%s:%s:%s)",
+                            query_action,
+                            ks_path,
+                            msg)
+
+            if (query_action == rwdts.QueryAction.UPDATE or
+                query_action == rwdts.QueryAction.CREATE):
+                pass
+            elif query_action == rwdts.QueryAction.DELETE:
+                nsr_id = msg.ns_instance_config_ref
+                asyncio.ensure_future(self._terminate_nsr(nsr_id), loop=self._loop)
+            else:
+                raise NotImplementedError(
+                    "%s action on cm-state not supported",
+                    query_action)
+
+            xact_info.respond_xpath(rwdts.XactRspCode.ACK)
+
+        try:
+            handler = rift.tasklets.DTS.RegistrationHandler(on_prepare=on_prepare)
+            self._nsr_regh = yield from self._dts.register(self.nsr_xpath,
+                                                          flags=rwdts.Flag.SUBSCRIBER | rwdts.Flag.DELTA_READY,
+                                                          handler=handler)
+        except Exception as e:
+            self._log.error("Failed to register for NSR changes as %s", str(e))
+
+    def deregister(self):
+        self._log.debug("De-register config agent job for project".
+                        format(self._project.name))
+        if self._regh:
+            self._regh.deregister()
+            self._regh = None
+
+        if self._nsr_regh:
+            self._nsr_regh.deregister()
+            self._nsr_regh = None
+
 
 class ConfigAgentJobManager(object):
     """A central class that manager all the Config Agent related data,
@@ -714,7 +822,7 @@ class ConfigAgentJobManager(object):
 
     TODO: Needs to support multiple config agents.
     """
-    def __init__(self, dts, log, loop, nsm):
+    def __init__(self, dts, log, loop, project, nsm):
         """
         Args:
             dts  : Dts handle
@@ -727,11 +835,12 @@ class ConfigAgentJobManager(object):
         self.log = log
         self.loop = loop
         self.nsm = nsm
+        self.project = project
         self.handler = CfgAgentJobDtsHandler(dts, log, loop, nsm, self)
         self.executor = concurrent.futures.ThreadPoolExecutor(max_workers=1)
 
     def add_job(self, rpc_output, tasks=None):
-        """Once an RPC is trigger add a now job
+        """Once an RPC is triggered, add a new job
 
         Args:
             rpc_output (YangOutput_Nsr_ExecNsConfigPrimitive): Rpc output
@@ -741,11 +850,17 @@ class ConfigAgentJobManager(object):
         """
         nsr_id = rpc_output.nsr_id_ref
 
-        self.jobs[nsr_id] = ConfigAgentJob.convert_rpc_input_to_job(nsr_id, rpc_output, tasks)
+        job = ConfigAgentJob.convert_rpc_input_to_job(nsr_id, rpc_output,
+                                                      tasks, self.project)
 
         self.log.debug("Creating a job monitor for Job id: {}".format(
                 rpc_output.job_id))
 
+        if nsr_id not in self.jobs:
+            self.jobs[nsr_id] = [job]
+        else:
+            self.jobs[nsr_id].append(job)
+
         # If the tasks are none, assume juju actions
         # TBD: This logic need to be revisited
         ca = self.nsm.config_agent_plugins[0]
@@ -759,7 +874,7 @@ class ConfigAgentJobManager(object):
         job_monitor = ConfigAgentJobMonitor(
             self.dts,
             self.log,
-            self.jobs[nsr_id],
+            job,
             self.executor,
             self.loop,
             ca
@@ -769,10 +884,19 @@ class ConfigAgentJobManager(object):
     def get_job(self, nsr_id):
         """Get the job associated with the NSR Id, if present."""
         try:
-            return self.jobs[nsr_id].job
+            return self.jobs[nsr_id]
         except KeyError:
-            return None
+            return []
+
+    def del_nsr(self, nsr_id):
+        """Delete a NSR id from the jobs list"""
+        if nsr_id in self.jobs:
+            self.jobs.pop(nsr_id)
 
     @asyncio.coroutine
     def register(self):
         yield from self.handler.register()
+        yield from self.handler.register_for_nsr()
+
+    def deregister(self):
+        yield from self.handler.deregister()