X-Git-Url: https://osm.etsi.org/gitweb/?a=blobdiff_plain;f=common%2Fpython%2Frift%2Fmano%2Fconfig_agent%2Foperdata.py;h=5cbd35154d951b6429ab2fd45faa6df054e65e36;hb=refs%2Fchanges%2F35%2F1535%2F1;hp=fbf3c43a223095eba4c8f3e5e1149031b8e243e2;hpb=34c4454c8d6fbc83e83319e685790559432e4c54;p=osm%2FSO.git diff --git a/common/python/rift/mano/config_agent/operdata.py b/common/python/rift/mano/config_agent/operdata.py index fbf3c43a..5cbd3515 100644 --- a/common/python/rift/mano/config_agent/operdata.py +++ b/common/python/rift/mano/config_agent/operdata.py @@ -27,7 +27,6 @@ from gi.repository import ( RwDts as rwdts) import rift.tasklets - import rift.mano.utils.juju_api as juju @@ -153,12 +152,15 @@ class ConfigAgentAccount(object): ) class CfgAgentDtsOperdataHandler(object): - def __init__(self, dts, log, loop): + def __init__(self, dts, log, loop, project): self._dts = dts self._log = log self._loop = loop + self._project = project self.cfg_agent_accounts = {} + self._show_reg = None + self._rpc_reg = None def add_cfg_agent_account(self, account_msg): account = ConfigAgentAccount(self._log, account_msg) @@ -205,9 +207,10 @@ class CfgAgentDtsOperdataHandler(object): for account in saved_accounts: connection_status = account.connection_status self._log.debug("Responding to config agent connection status request: %s", connection_status) + xpath = self._project.add_project(get_xpath(account.name)) xact_info.respond_xpath( rwdts.XactRspCode.MORE, - xpath=get_xpath(account.name), + xpath=xpath, msg=account.connection_status, ) except KeyError as e: @@ -217,12 +220,13 @@ class CfgAgentDtsOperdataHandler(object): xact_info.respond_xpath(rwdts.XactRspCode.ACK) - yield from self._dts.register( - xpath=get_xpath(), - handler=rift.tasklets.DTS.RegistrationHandler( - on_prepare=on_prepare), - flags=rwdts.Flag.PUBLISHER, - ) + xpath = self._project.add_project(get_xpath()) + self._show_reg = yield from self._dts.register( + xpath=xpath, + handler=rift.tasklets.DTS.RegistrationHandler( + on_prepare=on_prepare), + flags=rwdts.Flag.PUBLISHER, + ) def _register_validate_rpc(self): def get_xpath(): @@ -234,6 +238,10 @@ class CfgAgentDtsOperdataHandler(object): raise ConfigAgentAccountNotFound("Config Agent account name not provided") cfg_agent_account_name = msg.cfg_agent_account + + if not self._project.rpc_check(msg, xact_info=xact_info): + return + try: account = self.cfg_agent_accounts[cfg_agent_account_name] except KeyError: @@ -243,24 +251,29 @@ class CfgAgentDtsOperdataHandler(object): xact_info.respond_xpath(rwdts.XactRspCode.ACK) - yield from self._dts.register( - xpath=get_xpath(), - handler=rift.tasklets.DTS.RegistrationHandler( - on_prepare=on_prepare - ), - flags=rwdts.Flag.PUBLISHER, - ) + self._rpc_reg = yield from self._dts.register( + xpath=get_xpath(), + handler=rift.tasklets.DTS.RegistrationHandler( + on_prepare=on_prepare + ), + flags=rwdts.Flag.PUBLISHER, + ) @asyncio.coroutine def register(self): yield from self._register_show_status() yield from self._register_validate_rpc() + def deregister(self): + self._show_reg.deregister() + self._rpc_reg.deregister() + + class ConfigAgentJob(object): """A wrapper over the config agent job object, providing some convenience functions. - YangData_Nsr_NsInstanceOpdata_Nsr_ConfigAgentJob contains + YangData_RwProject_Project_NsInstanceOpdata_Nsr_ConfigAgentJob contains || ==> VNFRS || @@ -274,17 +287,19 @@ class ConfigAgentJob(object): "running" : "pending", "failed" : "failure"} - def __init__(self, nsr_id, job, tasks=None): + def __init__(self, nsr_id, job, project, tasks=None): """ Args: nsr_id (uuid): ID of NSR record - job (YangData_Nsr_NsInstanceOpdata_Nsr_ConfigAgentJob): Gi object + job (YangData_RwProject_Project_NsInstanceOpdata_Nsr_ConfigAgentJob): Gi object tasks: List of asyncio.tasks. If provided the job monitor will use it to monitor the tasks instead of the execution IDs """ self._job = job self.nsr_id = nsr_id self.tasks = tasks + self._project = project + self._regh = None @property @@ -315,10 +330,10 @@ class ConfigAgentJob(object): @property def xpath(self): """Xpath of the job""" - return ("D,/nsr:ns-instance-opdata" + + return self._project.add_project(("D,/nsr:ns-instance-opdata" + "/nsr:nsr[nsr:ns-instance-config-ref='{}']" + "/nsr:config-agent-job[nsr:job-id='{}']" - ).format(self.nsr_id, self.id) + ).format(self.nsr_id, self.id)) @property def regh(self): @@ -333,7 +348,7 @@ class ConfigAgentJob(object): @staticmethod def convert_rpc_input_to_job(nsr_id, rpc_output, tasks): """A helper function to convert the YangOutput_Nsr_ExecNsConfigPrimitive - to YangData_Nsr_NsInstanceOpdata_Nsr_ConfigAgentJob (NsrYang) + to YangData_RwProject_Project_NsInstanceOpdata_Nsr_ConfigAgentJob (NsrYang) Args: nsr_id (uuid): NSR ID @@ -344,10 +359,10 @@ class ConfigAgentJob(object): ConfigAgentJob """ # Shortcuts to prevent the HUUGE names. - CfgAgentJob = NsrYang.YangData_Nsr_NsInstanceOpdata_Nsr_ConfigAgentJob - CfgAgentVnfr = NsrYang.YangData_Nsr_NsInstanceOpdata_Nsr_ConfigAgentJob_Vnfr - CfgAgentPrimitive = NsrYang.YangData_Nsr_NsInstanceOpdata_Nsr_ConfigAgentJob_Vnfr_Primitive - CfgAgentPrimitiveParam = NsrYang.YangData_Nsr_NsInstanceOpdata_Nsr_ConfigAgentJob_Vnfr_Primitive_Parameter + CfgAgentJob = NsrYang.YangData_RwProject_Project_NsInstanceOpdata_Nsr_ConfigAgentJob + CfgAgentVnfr = NsrYang.YangData_RwProject_Project_NsInstanceOpdata_Nsr_ConfigAgentJob_Vnfr + CfgAgentPrimitive = NsrYang.YangData_RwProject_Project_NsInstanceOpdata_Nsr_ConfigAgentJob_Vnfr_Primitive + CfgAgentPrimitiveParam = NsrYang.YangData_RwProject_Project_NsInstanceOpdata_Nsr_ConfigAgentJob_Vnfr_Primitive_Parameter job = CfgAgentJob.from_dict({ "job_id": rpc_output.job_id, @@ -385,7 +400,7 @@ class ConfigAgentJob(object): job.vnfr.append(vnfr_job) - return ConfigAgentJob(nsr_id, job, tasks) + return ConfigAgentJob(nsr_id, job, project, tasks) class ConfigAgentJobMonitor(object): @@ -669,6 +684,7 @@ class CfgAgentJobDtsHandler(object): self._regh = None self._nsr_regh = None + self._project = cfgm.project @property def regh(self): @@ -687,9 +703,9 @@ class CfgAgentJobDtsHandler(object): @staticmethod def cfg_job_xpath(nsr_id, job_id): - return ("D,/nsr:ns-instance-opdata" + + return self._project.add_project(("D,/nsr:ns-instance-opdata" + "/nsr:nsr[nsr:ns-instance-config-ref = '{}']" + - "/nsr:config-agent-job[nsr:job-id='{}']").format(nsr_id, job_id) + "/nsr:config-agent-job[nsr:job-id='{}']").format(nsr_id, job_id)) @asyncio.coroutine def register(self): @@ -700,7 +716,7 @@ class CfgAgentJobDtsHandler(object): """ prepare callback from dts """ xpath = ks_path.to_xpath(RwNsrYang.get_schema()) if action == rwdts.QueryAction.READ: - schema = RwNsrYang.YangData_Nsr_NsInstanceOpdata_Nsr.schema() + schema = RwNsrYang.YangData_RwProject_Project_NsInstanceOpdata_Nsr.schema() path_entry = schema.keyspec_to_entry(ks_path) try: nsr_id = path_entry.key00.ns_instance_config_ref @@ -731,7 +747,8 @@ class CfgAgentJobDtsHandler(object): hdl = rift.tasklets.DTS.RegistrationHandler(on_prepare=on_prepare,) with self._dts.group_create() as group: - self._regh = group.register(xpath=CfgAgentJobDtsHandler.XPATH, + self._regh = group.register(xpath=self._project.add_project( + CfgAgentJobDtsHandler.XPATH), handler=hdl, flags=rwdts.Flag.PUBLISHER, ) @@ -752,7 +769,7 @@ class CfgAgentJobDtsHandler(object): @property def nsr_xpath(self): - return "D,/nsr:ns-instance-opdata/nsr:nsr" + return self._project.add_project("D,/nsr:ns-instance-opdata/nsr:nsr") @asyncio.coroutine def register_for_nsr(self): @@ -787,6 +804,17 @@ class CfgAgentJobDtsHandler(object): except Exception as e: self._log.error("Failed to register for NSR changes as %s", str(e)) + def deregister(self): + self._log.debug("De-register config agent job for project". + format(self._project.name)) + if self._regh: + self._regh.deregister() + self._regh = None + + if self._nsr_regh: + self._nsr_regh.deregister() + self._nsr_regh = None + class ConfigAgentJobManager(object): """A central class that manager all the Config Agent related data, @@ -794,7 +822,7 @@ class ConfigAgentJobManager(object): TODO: Needs to support multiple config agents. """ - def __init__(self, dts, log, loop, nsm): + def __init__(self, dts, log, loop, project, nsm): """ Args: dts : Dts handle @@ -807,11 +835,12 @@ class ConfigAgentJobManager(object): self.log = log self.loop = loop self.nsm = nsm + self.project = project self.handler = CfgAgentJobDtsHandler(dts, log, loop, nsm, self) self.executor = concurrent.futures.ThreadPoolExecutor(max_workers=1) def add_job(self, rpc_output, tasks=None): - """Once an RPC is trigger add a now job + """Once an RPC is triggered, add a new job Args: rpc_output (YangOutput_Nsr_ExecNsConfigPrimitive): Rpc output @@ -821,7 +850,8 @@ class ConfigAgentJobManager(object): """ nsr_id = rpc_output.nsr_id_ref - job = ConfigAgentJob.convert_rpc_input_to_job(nsr_id, rpc_output, tasks) + job = ConfigAgentJob.convert_rpc_input_to_job(nsr_id, rpc_output, + tasks, self.project) self.log.debug("Creating a job monitor for Job id: {}".format( rpc_output.job_id)) @@ -867,3 +897,6 @@ class ConfigAgentJobManager(object): def register(self): yield from self.handler.register() yield from self.handler.register_for_nsr() + + def deregister(self): + yield from self.handler.deregister()