X-Git-Url: https://osm.etsi.org/gitweb/?a=blobdiff_plain;ds=sidebyside;f=common%2Fpython%2Frift%2Fmano%2Fconfig_agent%2Foperdata.py;h=fbf3c43a223095eba4c8f3e5e1149031b8e243e2;hb=refs%2Fchanges%2F42%2F1342%2F1;hp=b941667f715c39aa32a4a73c5f7b8e7a43fcf209;hpb=6f07e6f33f751ab4ffe624f6037f887b243bece2;p=osm%2FSO.git diff --git a/common/python/rift/mano/config_agent/operdata.py b/common/python/rift/mano/config_agent/operdata.py index b941667f..fbf3c43a 100644 --- a/common/python/rift/mano/config_agent/operdata.py +++ b/common/python/rift/mano/config_agent/operdata.py @@ -285,6 +285,7 @@ class ConfigAgentJob(object): self._job = job self.nsr_id = nsr_id self.tasks = tasks + self._regh = None @property def id(self): @@ -319,6 +320,16 @@ class ConfigAgentJob(object): "/nsr:config-agent-job[nsr:job-id='{}']" ).format(self.nsr_id, self.id) + @property + def regh(self): + """Registration handle for the job""" + return self._regh + + @regh.setter + def regh(self, hdl): + """Setter for registration handle""" + self._regh = hdl + @staticmethod def convert_rpc_input_to_job(nsr_id, rpc_output, tasks): """A helper function to convert the YangOutput_Nsr_ExecNsConfigPrimitive @@ -406,9 +417,25 @@ class ConfigAgentJobMonitor(object): @asyncio.coroutine def _monitor_processes(self, registration_handle): result = 0 + errs = "" for process in self.job.tasks: - rc = yield from process - self.log.debug("Process {} returned rc: {}".format(process, rc)) + if isinstance(process, asyncio.subprocess.Process): + rc = yield from process.wait() + err = yield from process.stderr.read() + + else: + # Task instance + rc = yield from process + err = '' + + self.log.debug("Process {} returned rc: {}, err: {}". + format(process, rc, err)) + + if len(err): + if rc == 0: + errs += "{}".format(err) + else: + errs += "{}".format(err) result |= rc if result == 0: @@ -416,6 +443,9 @@ class ConfigAgentJobMonitor(object): else: self.job.job_status = "failure" + if len(errs): + self.job.job.job_status_details = errs + registration_handle.update_element(self.job.xpath, self.job.job) def get_error_details(self): @@ -428,7 +458,10 @@ class ConfigAgentJobMonitor(object): for primitive in vnfr.primitive: if primitive.execution_status == "failure": errs += '' - errs += primitive.execution_error_details + if primitive.execution_error_details: + errs += primitive.execution_error_details + else: + errs += '{}: Unknown error'.format(primitive.name) errs += "" return errs @@ -445,6 +478,7 @@ class ConfigAgentJobMonitor(object): ) self.log.debug('preparing to publish job status for {}'.format(self.job.xpath)) + self.job.regh = registration_handle try: registration_handle.create_element(self.job.xpath, self.job.job) @@ -516,13 +550,21 @@ class ConfigAgentJobMonitor(object): job_status = [] for primitive in vnfr.primitive: + if primitive.execution_status != 'pending': + continue + if primitive.execution_id == "": - # TODO: For some config data, the id will be empty, check if - # mapping is needed. + # Actions which failed to queue can have empty id job_status.append(primitive.execution_status) continue - task = self.loop.create_task(self.get_primitive_status(primitive)) + elif primitive.execution_id == "config": + # Config job. Check if service is active + task = self.loop.create_task(self.get_service_status(vnfr.id, primitive)) + + else: + task = self.loop.create_task(self.get_primitive_status(primitive)) + tasks.append(task) if tasks: @@ -541,6 +583,35 @@ class ConfigAgentJobMonitor(object): vnfr.vnf_job_status = "success" return "success" + @asyncio.coroutine + def get_service_status(self, vnfr_id, primitive): + try: + status = yield from self.loop.run_in_executor( + self.executor, + self.config_plugin.get_service_status, + vnfr_id + ) + + self.log.debug("Service status: {}".format(status)) + if status in ['error', 'blocked']: + self.log.warning("Execution of config {} failed: {}". + format(primitive.execution_id, status)) + primitive.execution_error_details = 'Config failed' + status = 'failure' + elif status in ['active']: + status = 'success' + elif status is None: + status = 'failure' + else: + status = 'pending' + + except Exception as e: + self.log.exception(e) + status = "failed" + + primitive.execution_status = status + return primitive.execution_status + @asyncio.coroutine def get_primitive_status(self, primitive): """ @@ -557,6 +628,7 @@ class ConfigAgentJobMonitor(object): primitive.execution_id ) + self.log.debug("Action status: {}".format(resp)) status = resp['status'] if status == 'failed': self.log.warning("Execution of action {} failed: {}". @@ -596,6 +668,7 @@ class CfgAgentJobDtsHandler(object): self._nsm = nsm self._regh = None + self._nsr_regh = None @property def regh(self): @@ -641,16 +714,13 @@ class CfgAgentJobDtsHandler(object): nsr_ids = [nsr_id] for nsr_id in nsr_ids: - job = self.cfgm.get_job(nsr_id) - - # If no jobs are queued for the NSR - if job is None: - continue + jobs = self.cfgm.get_job(nsr_id) - xact_info.respond_xpath( - rwdts.XactRspCode.MORE, - CfgAgentJobDtsHandler.cfg_job_xpath(nsr_id, job.job_id), - job) + for job in jobs: + xact_info.respond_xpath( + rwdts.XactRspCode.MORE, + CfgAgentJobDtsHandler.cfg_job_xpath(nsr_id, job.id), + job.job) except Exception as e: self._log.exception("Caught exception:%s", str(e)) @@ -666,6 +736,57 @@ class CfgAgentJobDtsHandler(object): flags=rwdts.Flag.PUBLISHER, ) + @asyncio.coroutine + def _terminate_nsr(self, nsr_id): + self._log.debug("NSR {} being terminated".format(nsr_id)) + jobs = self.cfgm.get_job(nsr_id) + for job in jobs: + path = CfgAgentJobDtsHandler.cfg_job_xpath(nsr_id, job.id) + with self._dts.transaction() as xact: + self._log.debug("Deleting job: {}".format(path)) + job.regh.delete_element(path) + self._log.debug("Deleted job: {}".format(path)) + + # Remove the NSR id in manager + self.cfgm.del_nsr(nsr_id) + + @property + def nsr_xpath(self): + return "D,/nsr:ns-instance-opdata/nsr:nsr" + + @asyncio.coroutine + def register_for_nsr(self): + """ Register for NSR changes """ + + @asyncio.coroutine + def on_prepare(xact_info, query_action, ks_path, msg): + """ This NSR is created """ + self._log.debug("Received NSR instantiate on_prepare (%s:%s:%s)", + query_action, + ks_path, + msg) + + if (query_action == rwdts.QueryAction.UPDATE or + query_action == rwdts.QueryAction.CREATE): + pass + elif query_action == rwdts.QueryAction.DELETE: + nsr_id = msg.ns_instance_config_ref + asyncio.ensure_future(self._terminate_nsr(nsr_id), loop=self._loop) + else: + raise NotImplementedError( + "%s action on cm-state not supported", + query_action) + + xact_info.respond_xpath(rwdts.XactRspCode.ACK) + + try: + handler = rift.tasklets.DTS.RegistrationHandler(on_prepare=on_prepare) + self._nsr_regh = yield from self._dts.register(self.nsr_xpath, + flags=rwdts.Flag.SUBSCRIBER | rwdts.Flag.DELTA_READY, + handler=handler) + except Exception as e: + self._log.error("Failed to register for NSR changes as %s", str(e)) + class ConfigAgentJobManager(object): """A central class that manager all the Config Agent related data, @@ -700,29 +821,49 @@ class ConfigAgentJobManager(object): """ nsr_id = rpc_output.nsr_id_ref - self.jobs[nsr_id] = ConfigAgentJob.convert_rpc_input_to_job(nsr_id, rpc_output, tasks) + job = ConfigAgentJob.convert_rpc_input_to_job(nsr_id, rpc_output, tasks) self.log.debug("Creating a job monitor for Job id: {}".format( rpc_output.job_id)) + if nsr_id not in self.jobs: + self.jobs[nsr_id] = [job] + else: + self.jobs[nsr_id].append(job) + + # If the tasks are none, assume juju actions + # TBD: This logic need to be revisited + ca = self.nsm.config_agent_plugins[0] + if tasks is None: + for agent in self.nsm.config_agent_plugins: + if agent.agent_type == 'juju': + ca = agent + break + # For every Job we will schedule a new monitoring process. job_monitor = ConfigAgentJobMonitor( self.dts, self.log, - self.jobs[nsr_id], + job, self.executor, self.loop, - self.nsm.config_agent_plugins[0] # Hack + ca ) task = self.loop.create_task(job_monitor.publish_action_status()) def get_job(self, nsr_id): """Get the job associated with the NSR Id, if present.""" try: - return self.jobs[nsr_id].job + return self.jobs[nsr_id] except KeyError: - return None + return [] + + def del_nsr(self, nsr_id): + """Delete a NSR id from the jobs list""" + if nsr_id in self.jobs: + self.jobs.pop(nsr_id) @asyncio.coroutine def register(self): yield from self.handler.register() + yield from self.handler.register_for_nsr()