X-Git-Url: https://osm.etsi.org/gitweb/?a=blobdiff_plain;f=common%2Fpython%2Frift%2Fmano%2Fconfig_agent%2Foperdata.py;h=5cbd35154d951b6429ab2fd45faa6df054e65e36;hb=refs%2Fchanges%2F35%2F1535%2F1;hp=b941667f715c39aa32a4a73c5f7b8e7a43fcf209;hpb=6f07e6f33f751ab4ffe624f6037f887b243bece2;p=osm%2FSO.git
diff --git a/common/python/rift/mano/config_agent/operdata.py b/common/python/rift/mano/config_agent/operdata.py
index b941667f..5cbd3515 100644
--- a/common/python/rift/mano/config_agent/operdata.py
+++ b/common/python/rift/mano/config_agent/operdata.py
@@ -27,7 +27,6 @@ from gi.repository import (
RwDts as rwdts)
import rift.tasklets
-
import rift.mano.utils.juju_api as juju
@@ -153,12 +152,15 @@ class ConfigAgentAccount(object):
)
class CfgAgentDtsOperdataHandler(object):
- def __init__(self, dts, log, loop):
+ def __init__(self, dts, log, loop, project):
self._dts = dts
self._log = log
self._loop = loop
+ self._project = project
self.cfg_agent_accounts = {}
+ self._show_reg = None
+ self._rpc_reg = None
def add_cfg_agent_account(self, account_msg):
account = ConfigAgentAccount(self._log, account_msg)
@@ -205,9 +207,10 @@ class CfgAgentDtsOperdataHandler(object):
for account in saved_accounts:
connection_status = account.connection_status
self._log.debug("Responding to config agent connection status request: %s", connection_status)
+ xpath = self._project.add_project(get_xpath(account.name))
xact_info.respond_xpath(
rwdts.XactRspCode.MORE,
- xpath=get_xpath(account.name),
+ xpath=xpath,
msg=account.connection_status,
)
except KeyError as e:
@@ -217,12 +220,13 @@ class CfgAgentDtsOperdataHandler(object):
xact_info.respond_xpath(rwdts.XactRspCode.ACK)
- yield from self._dts.register(
- xpath=get_xpath(),
- handler=rift.tasklets.DTS.RegistrationHandler(
- on_prepare=on_prepare),
- flags=rwdts.Flag.PUBLISHER,
- )
+ xpath = self._project.add_project(get_xpath())
+ self._show_reg = yield from self._dts.register(
+ xpath=xpath,
+ handler=rift.tasklets.DTS.RegistrationHandler(
+ on_prepare=on_prepare),
+ flags=rwdts.Flag.PUBLISHER,
+ )
def _register_validate_rpc(self):
def get_xpath():
@@ -234,6 +238,10 @@ class CfgAgentDtsOperdataHandler(object):
raise ConfigAgentAccountNotFound("Config Agent account name not provided")
cfg_agent_account_name = msg.cfg_agent_account
+
+ if not self._project.rpc_check(msg, xact_info=xact_info):
+ return
+
try:
account = self.cfg_agent_accounts[cfg_agent_account_name]
except KeyError:
@@ -243,24 +251,29 @@ class CfgAgentDtsOperdataHandler(object):
xact_info.respond_xpath(rwdts.XactRspCode.ACK)
- yield from self._dts.register(
- xpath=get_xpath(),
- handler=rift.tasklets.DTS.RegistrationHandler(
- on_prepare=on_prepare
- ),
- flags=rwdts.Flag.PUBLISHER,
- )
+ self._rpc_reg = yield from self._dts.register(
+ xpath=get_xpath(),
+ handler=rift.tasklets.DTS.RegistrationHandler(
+ on_prepare=on_prepare
+ ),
+ flags=rwdts.Flag.PUBLISHER,
+ )
@asyncio.coroutine
def register(self):
yield from self._register_show_status()
yield from self._register_validate_rpc()
+ def deregister(self):
+ self._show_reg.deregister()
+ self._rpc_reg.deregister()
+
+
class ConfigAgentJob(object):
"""A wrapper over the config agent job object, providing some
convenience functions.
- YangData_Nsr_NsInstanceOpdata_Nsr_ConfigAgentJob contains
+ YangData_RwProject_Project_NsInstanceOpdata_Nsr_ConfigAgentJob contains
||
==> VNFRS
||
@@ -274,17 +287,20 @@ class ConfigAgentJob(object):
"running" : "pending",
"failed" : "failure"}
- def __init__(self, nsr_id, job, tasks=None):
+ def __init__(self, nsr_id, job, project, tasks=None):
"""
Args:
nsr_id (uuid): ID of NSR record
- job (YangData_Nsr_NsInstanceOpdata_Nsr_ConfigAgentJob): Gi object
+ job (YangData_RwProject_Project_NsInstanceOpdata_Nsr_ConfigAgentJob): Gi object
tasks: List of asyncio.tasks. If provided the job monitor will
use it to monitor the tasks instead of the execution IDs
"""
self._job = job
self.nsr_id = nsr_id
self.tasks = tasks
+ self._project = project
+
+ self._regh = None
@property
def id(self):
@@ -314,15 +330,25 @@ class ConfigAgentJob(object):
@property
def xpath(self):
"""Xpath of the job"""
- return ("D,/nsr:ns-instance-opdata" +
+ return self._project.add_project(("D,/nsr:ns-instance-opdata" +
"/nsr:nsr[nsr:ns-instance-config-ref='{}']" +
"/nsr:config-agent-job[nsr:job-id='{}']"
- ).format(self.nsr_id, self.id)
+ ).format(self.nsr_id, self.id))
+
+ @property
+ def regh(self):
+ """Registration handle for the job"""
+ return self._regh
+
+ @regh.setter
+ def regh(self, hdl):
+ """Setter for registration handle"""
+ self._regh = hdl
@staticmethod
def convert_rpc_input_to_job(nsr_id, rpc_output, tasks):
"""A helper function to convert the YangOutput_Nsr_ExecNsConfigPrimitive
- to YangData_Nsr_NsInstanceOpdata_Nsr_ConfigAgentJob (NsrYang)
+ to YangData_RwProject_Project_NsInstanceOpdata_Nsr_ConfigAgentJob (NsrYang)
Args:
nsr_id (uuid): NSR ID
@@ -333,10 +359,10 @@ class ConfigAgentJob(object):
ConfigAgentJob
"""
# Shortcuts to prevent the HUUGE names.
- CfgAgentJob = NsrYang.YangData_Nsr_NsInstanceOpdata_Nsr_ConfigAgentJob
- CfgAgentVnfr = NsrYang.YangData_Nsr_NsInstanceOpdata_Nsr_ConfigAgentJob_Vnfr
- CfgAgentPrimitive = NsrYang.YangData_Nsr_NsInstanceOpdata_Nsr_ConfigAgentJob_Vnfr_Primitive
- CfgAgentPrimitiveParam = NsrYang.YangData_Nsr_NsInstanceOpdata_Nsr_ConfigAgentJob_Vnfr_Primitive_Parameter
+ CfgAgentJob = NsrYang.YangData_RwProject_Project_NsInstanceOpdata_Nsr_ConfigAgentJob
+ CfgAgentVnfr = NsrYang.YangData_RwProject_Project_NsInstanceOpdata_Nsr_ConfigAgentJob_Vnfr
+ CfgAgentPrimitive = NsrYang.YangData_RwProject_Project_NsInstanceOpdata_Nsr_ConfigAgentJob_Vnfr_Primitive
+ CfgAgentPrimitiveParam = NsrYang.YangData_RwProject_Project_NsInstanceOpdata_Nsr_ConfigAgentJob_Vnfr_Primitive_Parameter
job = CfgAgentJob.from_dict({
"job_id": rpc_output.job_id,
@@ -374,7 +400,7 @@ class ConfigAgentJob(object):
job.vnfr.append(vnfr_job)
- return ConfigAgentJob(nsr_id, job, tasks)
+ return ConfigAgentJob(nsr_id, job, project, tasks)
class ConfigAgentJobMonitor(object):
@@ -406,9 +432,25 @@ class ConfigAgentJobMonitor(object):
@asyncio.coroutine
def _monitor_processes(self, registration_handle):
result = 0
+ errs = ""
for process in self.job.tasks:
- rc = yield from process
- self.log.debug("Process {} returned rc: {}".format(process, rc))
+ if isinstance(process, asyncio.subprocess.Process):
+ rc = yield from process.wait()
+ err = yield from process.stderr.read()
+
+ else:
+ # Task instance
+ rc = yield from process
+ err = ''
+
+ self.log.debug("Process {} returned rc: {}, err: {}".
+ format(process, rc, err))
+
+ if len(err):
+ if rc == 0:
+ errs += "{}".format(err)
+ else:
+ errs += "{}".format(err)
result |= rc
if result == 0:
@@ -416,6 +458,9 @@ class ConfigAgentJobMonitor(object):
else:
self.job.job_status = "failure"
+ if len(errs):
+ self.job.job.job_status_details = errs
+
registration_handle.update_element(self.job.xpath, self.job.job)
def get_error_details(self):
@@ -428,7 +473,10 @@ class ConfigAgentJobMonitor(object):
for primitive in vnfr.primitive:
if primitive.execution_status == "failure":
errs += ''
- errs += primitive.execution_error_details
+ if primitive.execution_error_details:
+ errs += primitive.execution_error_details
+ else:
+ errs += '{}: Unknown error'.format(primitive.name)
errs += ""
return errs
@@ -445,6 +493,7 @@ class ConfigAgentJobMonitor(object):
)
self.log.debug('preparing to publish job status for {}'.format(self.job.xpath))
+ self.job.regh = registration_handle
try:
registration_handle.create_element(self.job.xpath, self.job.job)
@@ -516,13 +565,21 @@ class ConfigAgentJobMonitor(object):
job_status = []
for primitive in vnfr.primitive:
+ if primitive.execution_status != 'pending':
+ continue
+
if primitive.execution_id == "":
- # TODO: For some config data, the id will be empty, check if
- # mapping is needed.
+ # Actions which failed to queue can have empty id
job_status.append(primitive.execution_status)
continue
- task = self.loop.create_task(self.get_primitive_status(primitive))
+ elif primitive.execution_id == "config":
+ # Config job. Check if service is active
+ task = self.loop.create_task(self.get_service_status(vnfr.id, primitive))
+
+ else:
+ task = self.loop.create_task(self.get_primitive_status(primitive))
+
tasks.append(task)
if tasks:
@@ -541,6 +598,35 @@ class ConfigAgentJobMonitor(object):
vnfr.vnf_job_status = "success"
return "success"
+ @asyncio.coroutine
+ def get_service_status(self, vnfr_id, primitive):
+ try:
+ status = yield from self.loop.run_in_executor(
+ self.executor,
+ self.config_plugin.get_service_status,
+ vnfr_id
+ )
+
+ self.log.debug("Service status: {}".format(status))
+ if status in ['error', 'blocked']:
+ self.log.warning("Execution of config {} failed: {}".
+ format(primitive.execution_id, status))
+ primitive.execution_error_details = 'Config failed'
+ status = 'failure'
+ elif status in ['active']:
+ status = 'success'
+ elif status is None:
+ status = 'failure'
+ else:
+ status = 'pending'
+
+ except Exception as e:
+ self.log.exception(e)
+ status = "failed"
+
+ primitive.execution_status = status
+ return primitive.execution_status
+
@asyncio.coroutine
def get_primitive_status(self, primitive):
"""
@@ -557,6 +643,7 @@ class ConfigAgentJobMonitor(object):
primitive.execution_id
)
+ self.log.debug("Action status: {}".format(resp))
status = resp['status']
if status == 'failed':
self.log.warning("Execution of action {} failed: {}".
@@ -596,6 +683,8 @@ class CfgAgentJobDtsHandler(object):
self._nsm = nsm
self._regh = None
+ self._nsr_regh = None
+ self._project = cfgm.project
@property
def regh(self):
@@ -614,9 +703,9 @@ class CfgAgentJobDtsHandler(object):
@staticmethod
def cfg_job_xpath(nsr_id, job_id):
- return ("D,/nsr:ns-instance-opdata" +
+ return self._project.add_project(("D,/nsr:ns-instance-opdata" +
"/nsr:nsr[nsr:ns-instance-config-ref = '{}']" +
- "/nsr:config-agent-job[nsr:job-id='{}']").format(nsr_id, job_id)
+ "/nsr:config-agent-job[nsr:job-id='{}']").format(nsr_id, job_id))
@asyncio.coroutine
def register(self):
@@ -627,7 +716,7 @@ class CfgAgentJobDtsHandler(object):
""" prepare callback from dts """
xpath = ks_path.to_xpath(RwNsrYang.get_schema())
if action == rwdts.QueryAction.READ:
- schema = RwNsrYang.YangData_Nsr_NsInstanceOpdata_Nsr.schema()
+ schema = RwNsrYang.YangData_RwProject_Project_NsInstanceOpdata_Nsr.schema()
path_entry = schema.keyspec_to_entry(ks_path)
try:
nsr_id = path_entry.key00.ns_instance_config_ref
@@ -641,16 +730,13 @@ class CfgAgentJobDtsHandler(object):
nsr_ids = [nsr_id]
for nsr_id in nsr_ids:
- job = self.cfgm.get_job(nsr_id)
+ jobs = self.cfgm.get_job(nsr_id)
- # If no jobs are queued for the NSR
- if job is None:
- continue
-
- xact_info.respond_xpath(
- rwdts.XactRspCode.MORE,
- CfgAgentJobDtsHandler.cfg_job_xpath(nsr_id, job.job_id),
- job)
+ for job in jobs:
+ xact_info.respond_xpath(
+ rwdts.XactRspCode.MORE,
+ CfgAgentJobDtsHandler.cfg_job_xpath(nsr_id, job.id),
+ job.job)
except Exception as e:
self._log.exception("Caught exception:%s", str(e))
@@ -661,11 +747,74 @@ class CfgAgentJobDtsHandler(object):
hdl = rift.tasklets.DTS.RegistrationHandler(on_prepare=on_prepare,)
with self._dts.group_create() as group:
- self._regh = group.register(xpath=CfgAgentJobDtsHandler.XPATH,
+ self._regh = group.register(xpath=self._project.add_project(
+ CfgAgentJobDtsHandler.XPATH),
handler=hdl,
flags=rwdts.Flag.PUBLISHER,
)
+ @asyncio.coroutine
+ def _terminate_nsr(self, nsr_id):
+ self._log.debug("NSR {} being terminated".format(nsr_id))
+ jobs = self.cfgm.get_job(nsr_id)
+ for job in jobs:
+ path = CfgAgentJobDtsHandler.cfg_job_xpath(nsr_id, job.id)
+ with self._dts.transaction() as xact:
+ self._log.debug("Deleting job: {}".format(path))
+ job.regh.delete_element(path)
+ self._log.debug("Deleted job: {}".format(path))
+
+ # Remove the NSR id in manager
+ self.cfgm.del_nsr(nsr_id)
+
+ @property
+ def nsr_xpath(self):
+ return self._project.add_project("D,/nsr:ns-instance-opdata/nsr:nsr")
+
+ @asyncio.coroutine
+ def register_for_nsr(self):
+ """ Register for NSR changes """
+
+ @asyncio.coroutine
+ def on_prepare(xact_info, query_action, ks_path, msg):
+ """ This NSR is created """
+ self._log.debug("Received NSR instantiate on_prepare (%s:%s:%s)",
+ query_action,
+ ks_path,
+ msg)
+
+ if (query_action == rwdts.QueryAction.UPDATE or
+ query_action == rwdts.QueryAction.CREATE):
+ pass
+ elif query_action == rwdts.QueryAction.DELETE:
+ nsr_id = msg.ns_instance_config_ref
+ asyncio.ensure_future(self._terminate_nsr(nsr_id), loop=self._loop)
+ else:
+ raise NotImplementedError(
+ "%s action on cm-state not supported",
+ query_action)
+
+ xact_info.respond_xpath(rwdts.XactRspCode.ACK)
+
+ try:
+ handler = rift.tasklets.DTS.RegistrationHandler(on_prepare=on_prepare)
+ self._nsr_regh = yield from self._dts.register(self.nsr_xpath,
+ flags=rwdts.Flag.SUBSCRIBER | rwdts.Flag.DELTA_READY,
+ handler=handler)
+ except Exception as e:
+ self._log.error("Failed to register for NSR changes as %s", str(e))
+
+ def deregister(self):
+ self._log.debug("De-register config agent job for project".
+ format(self._project.name))
+ if self._regh:
+ self._regh.deregister()
+ self._regh = None
+
+ if self._nsr_regh:
+ self._nsr_regh.deregister()
+ self._nsr_regh = None
+
class ConfigAgentJobManager(object):
"""A central class that manager all the Config Agent related data,
@@ -673,7 +822,7 @@ class ConfigAgentJobManager(object):
TODO: Needs to support multiple config agents.
"""
- def __init__(self, dts, log, loop, nsm):
+ def __init__(self, dts, log, loop, project, nsm):
"""
Args:
dts : Dts handle
@@ -686,11 +835,12 @@ class ConfigAgentJobManager(object):
self.log = log
self.loop = loop
self.nsm = nsm
+ self.project = project
self.handler = CfgAgentJobDtsHandler(dts, log, loop, nsm, self)
self.executor = concurrent.futures.ThreadPoolExecutor(max_workers=1)
def add_job(self, rpc_output, tasks=None):
- """Once an RPC is trigger add a now job
+ """Once an RPC is triggered, add a new job
Args:
rpc_output (YangOutput_Nsr_ExecNsConfigPrimitive): Rpc output
@@ -700,29 +850,53 @@ class ConfigAgentJobManager(object):
"""
nsr_id = rpc_output.nsr_id_ref
- self.jobs[nsr_id] = ConfigAgentJob.convert_rpc_input_to_job(nsr_id, rpc_output, tasks)
+ job = ConfigAgentJob.convert_rpc_input_to_job(nsr_id, rpc_output,
+ tasks, self.project)
self.log.debug("Creating a job monitor for Job id: {}".format(
rpc_output.job_id))
+ if nsr_id not in self.jobs:
+ self.jobs[nsr_id] = [job]
+ else:
+ self.jobs[nsr_id].append(job)
+
+ # If the tasks are none, assume juju actions
+ # TBD: This logic need to be revisited
+ ca = self.nsm.config_agent_plugins[0]
+ if tasks is None:
+ for agent in self.nsm.config_agent_plugins:
+ if agent.agent_type == 'juju':
+ ca = agent
+ break
+
# For every Job we will schedule a new monitoring process.
job_monitor = ConfigAgentJobMonitor(
self.dts,
self.log,
- self.jobs[nsr_id],
+ job,
self.executor,
self.loop,
- self.nsm.config_agent_plugins[0] # Hack
+ ca
)
task = self.loop.create_task(job_monitor.publish_action_status())
def get_job(self, nsr_id):
"""Get the job associated with the NSR Id, if present."""
try:
- return self.jobs[nsr_id].job
+ return self.jobs[nsr_id]
except KeyError:
- return None
+ return []
+
+ def del_nsr(self, nsr_id):
+ """Delete a NSR id from the jobs list"""
+ if nsr_id in self.jobs:
+ self.jobs.pop(nsr_id)
@asyncio.coroutine
def register(self):
yield from self.handler.register()
+ yield from self.handler.register_for_nsr()
+
+ def deregister(self):
+ yield from self.handler.deregister()