Change tag based on job status
[osm/SO.git] / common / python / rift / mano / config_agent / operdata.py
index 6bbf7f2..fbf3c43 100644 (file)
@@ -285,6 +285,7 @@ class ConfigAgentJob(object):
         self._job = job
         self.nsr_id = nsr_id
         self.tasks = tasks
+        self._regh = None
 
     @property
     def id(self):
@@ -319,6 +320,16 @@ class ConfigAgentJob(object):
                 "/nsr:config-agent-job[nsr:job-id='{}']"
                 ).format(self.nsr_id, self.id)
 
+    @property
+    def regh(self):
+        """Registration handle for the job"""
+        return self._regh
+
+    @regh.setter
+    def regh(self, hdl):
+        """Setter for registration handle"""
+        self._regh = hdl
+
     @staticmethod
     def convert_rpc_input_to_job(nsr_id, rpc_output, tasks):
         """A helper function to convert the YangOutput_Nsr_ExecNsConfigPrimitive
@@ -406,9 +417,25 @@ class ConfigAgentJobMonitor(object):
     @asyncio.coroutine
     def _monitor_processes(self, registration_handle):
         result = 0
+        errs = ""
         for process in self.job.tasks:
-            rc = yield from process
-            self.log.debug("Process {} returned rc: {}".format(process, rc))
+            if isinstance(process, asyncio.subprocess.Process):
+                rc = yield from process.wait()
+                err = yield from process.stderr.read()
+
+            else:
+                # Task instance
+                rc = yield from process
+                err = ''
+
+            self.log.debug("Process {} returned rc: {}, err: {}".
+                           format(process, rc, err))
+
+            if len(err):
+                if rc == 0:
+                    errs += "<success>{}</success>".format(err)
+                else:
+                    errs += "<error>{}</error>".format(err)
             result |= rc
 
         if result == 0:
@@ -416,6 +443,9 @@ class ConfigAgentJobMonitor(object):
         else:
             self.job.job_status = "failure"
 
+        if len(errs):
+            self.job.job.job_status_details = errs
+
         registration_handle.update_element(self.job.xpath, self.job.job)
 
     def get_error_details(self):
@@ -428,7 +458,10 @@ class ConfigAgentJobMonitor(object):
             for primitive in vnfr.primitive:
                 if primitive.execution_status == "failure":
                     errs += '<error>'
-                    errs += primitive.execution_error_details
+                    if primitive.execution_error_details:
+                        errs += primitive.execution_error_details
+                    else:
+                        errs += '{}: Unknown error'.format(primitive.name)
                     errs += "</error>"
 
         return errs
@@ -445,6 +478,7 @@ class ConfigAgentJobMonitor(object):
                 )
 
         self.log.debug('preparing to publish job status for {}'.format(self.job.xpath))
+        self.job.regh = registration_handle
 
         try:
             registration_handle.create_element(self.job.xpath, self.job.job)
@@ -516,13 +550,21 @@ class ConfigAgentJobMonitor(object):
         job_status = []
 
         for primitive in vnfr.primitive:
+            if primitive.execution_status != 'pending':
+                continue
+
             if primitive.execution_id == "":
-                # TODO: For some config data, the id will be empty, check if
-                # mapping is needed.
+                # Actions which failed to queue can have empty id
                 job_status.append(primitive.execution_status)
                 continue
 
-            task = self.loop.create_task(self.get_primitive_status(primitive))
+            elif primitive.execution_id == "config":
+                # Config job. Check if service is active
+                task = self.loop.create_task(self.get_service_status(vnfr.id, primitive))
+
+            else:
+                task = self.loop.create_task(self.get_primitive_status(primitive))
+
             tasks.append(task)
 
         if tasks:
@@ -541,6 +583,35 @@ class ConfigAgentJobMonitor(object):
             vnfr.vnf_job_status = "success"
             return "success"
 
+    @asyncio.coroutine
+    def get_service_status(self, vnfr_id, primitive):
+        try:
+            status = yield from self.loop.run_in_executor(
+                self.executor,
+                self.config_plugin.get_service_status,
+                vnfr_id
+            )
+
+            self.log.debug("Service status: {}".format(status))
+            if status in ['error', 'blocked']:
+                self.log.warning("Execution of config {} failed: {}".
+                                 format(primitive.execution_id, status))
+                primitive.execution_error_details = 'Config failed'
+                status = 'failure'
+            elif status in ['active']:
+                status = 'success'
+            elif status is None:
+                status = 'failure'
+            else:
+                status = 'pending'
+
+        except Exception as e:
+            self.log.exception(e)
+            status = "failed"
+
+        primitive.execution_status = status
+        return primitive.execution_status
+
     @asyncio.coroutine
     def get_primitive_status(self, primitive):
         """
@@ -597,6 +668,7 @@ class CfgAgentJobDtsHandler(object):
         self._nsm = nsm
 
         self._regh = None
+        self._nsr_regh = None
 
     @property
     def regh(self):
@@ -642,16 +714,13 @@ class CfgAgentJobDtsHandler(object):
                         nsr_ids = [nsr_id]
 
                     for nsr_id in nsr_ids:
-                        job = self.cfgm.get_job(nsr_id)
-
-                        # If no jobs are queued for the NSR
-                        if job is None:
-                            continue
+                        jobs = self.cfgm.get_job(nsr_id)
 
-                        xact_info.respond_xpath(
-                            rwdts.XactRspCode.MORE,
-                            CfgAgentJobDtsHandler.cfg_job_xpath(nsr_id, job.job_id),
-                            job)
+                        for job in jobs:
+                            xact_info.respond_xpath(
+                                rwdts.XactRspCode.MORE,
+                                CfgAgentJobDtsHandler.cfg_job_xpath(nsr_id, job.id),
+                                job.job)
 
                 except Exception as e:
                     self._log.exception("Caught exception:%s", str(e))
@@ -667,6 +736,57 @@ class CfgAgentJobDtsHandler(object):
                                         flags=rwdts.Flag.PUBLISHER,
                                         )
 
+    @asyncio.coroutine
+    def _terminate_nsr(self, nsr_id):
+        self._log.debug("NSR {} being terminated".format(nsr_id))
+        jobs = self.cfgm.get_job(nsr_id)
+        for job in jobs:
+            path = CfgAgentJobDtsHandler.cfg_job_xpath(nsr_id, job.id)
+            with self._dts.transaction() as xact:
+                self._log.debug("Deleting job: {}".format(path))
+                job.regh.delete_element(path)
+                self._log.debug("Deleted job: {}".format(path))
+
+        # Remove the NSR id in manager
+        self.cfgm.del_nsr(nsr_id)
+
+    @property
+    def nsr_xpath(self):
+        return "D,/nsr:ns-instance-opdata/nsr:nsr"
+
+    @asyncio.coroutine
+    def register_for_nsr(self):
+        """ Register for NSR changes """
+
+        @asyncio.coroutine
+        def on_prepare(xact_info, query_action, ks_path, msg):
+            """ This NSR is created """
+            self._log.debug("Received NSR instantiate on_prepare (%s:%s:%s)",
+                            query_action,
+                            ks_path,
+                            msg)
+
+            if (query_action == rwdts.QueryAction.UPDATE or
+                query_action == rwdts.QueryAction.CREATE):
+                pass
+            elif query_action == rwdts.QueryAction.DELETE:
+                nsr_id = msg.ns_instance_config_ref
+                asyncio.ensure_future(self._terminate_nsr(nsr_id), loop=self._loop)
+            else:
+                raise NotImplementedError(
+                    "%s action on cm-state not supported",
+                    query_action)
+
+            xact_info.respond_xpath(rwdts.XactRspCode.ACK)
+
+        try:
+            handler = rift.tasklets.DTS.RegistrationHandler(on_prepare=on_prepare)
+            self._nsr_regh = yield from self._dts.register(self.nsr_xpath,
+                                                          flags=rwdts.Flag.SUBSCRIBER | rwdts.Flag.DELTA_READY,
+                                                          handler=handler)
+        except Exception as e:
+            self._log.error("Failed to register for NSR changes as %s", str(e))
+
 
 class ConfigAgentJobManager(object):
     """A central class that manager all the Config Agent related data,
@@ -701,29 +821,49 @@ class ConfigAgentJobManager(object):
         """
         nsr_id = rpc_output.nsr_id_ref
 
-        self.jobs[nsr_id] = ConfigAgentJob.convert_rpc_input_to_job(nsr_id, rpc_output, tasks)
+        job = ConfigAgentJob.convert_rpc_input_to_job(nsr_id, rpc_output, tasks)
 
         self.log.debug("Creating a job monitor for Job id: {}".format(
                 rpc_output.job_id))
 
+        if nsr_id not in self.jobs:
+            self.jobs[nsr_id] = [job]
+        else:
+            self.jobs[nsr_id].append(job)
+
+        # If the tasks are none, assume juju actions
+        # TBD: This logic need to be revisited
+        ca = self.nsm.config_agent_plugins[0]
+        if tasks is None:
+            for agent in self.nsm.config_agent_plugins:
+                if agent.agent_type == 'juju':
+                    ca = agent
+                    break
+
         # For every Job we will schedule a new monitoring process.
         job_monitor = ConfigAgentJobMonitor(
             self.dts,
             self.log,
-            self.jobs[nsr_id],
+            job,
             self.executor,
             self.loop,
-            self.nsm.config_agent_plugins[0]  # Hack
+            ca
             )
         task = self.loop.create_task(job_monitor.publish_action_status())
 
     def get_job(self, nsr_id):
         """Get the job associated with the NSR Id, if present."""
         try:
-            return self.jobs[nsr_id].job
+            return self.jobs[nsr_id]
         except KeyError:
-            return None
+            return []
+
+    def del_nsr(self, nsr_id):
+        """Delete a NSR id from the jobs list"""
+        if nsr_id in self.jobs:
+            self.jobs.pop(nsr_id)
 
     @asyncio.coroutine
     def register(self):
         yield from self.handler.register()
+        yield from self.handler.register_for_nsr()