X-Git-Url: https://osm.etsi.org/gitweb/?a=blobdiff_plain;f=common%2Fpython%2Frift%2Fmano%2Fconfig_agent%2Foperdata.py;h=94e9976419d3d69d373bc46195820b3a47d5c5cb;hb=95bd37e7dc1ccc3a18be8ab21d703c6e405fb824;hp=729a1f1c4c239f3f976e98496316e00c63d492ac;hpb=0f822755d5b1cc1c5e9816068710ca82851904a4;p=osm%2FSO.git
diff --git a/common/python/rift/mano/config_agent/operdata.py b/common/python/rift/mano/config_agent/operdata.py
index 729a1f1c..94e99764 100644
--- a/common/python/rift/mano/config_agent/operdata.py
+++ b/common/python/rift/mano/config_agent/operdata.py
@@ -1,4 +1,4 @@
-#
+#
# Copyright 2016 RIFT.IO Inc
#
# Licensed under the Apache License, Version 2.0 (the "License");
@@ -17,6 +17,9 @@
import asyncio
import concurrent.futures
import time
+import gi
+
+gi.require_version('RwNsrYang', '1.0')
from gi.repository import (
NsrYang,
@@ -25,15 +28,17 @@ from gi.repository import (
RwNsrYang,
RwConfigAgentYang,
RwDts as rwdts)
+gi.require_version('RwKeyspec', '1.0')
+from gi.repository.RwKeyspec import quoted_key
import rift.tasklets
-
import rift.mano.utils.juju_api as juju
class ConfigAgentAccountNotFound(Exception):
pass
+
class JujuClient(object):
def __init__(self, log, ip, port, user, passwd):
self._log = log
@@ -46,29 +51,43 @@ class JujuClient(object):
server=ip, port=port,
user=user, secret=passwd)
-
def validate_account_creds(self):
- status = RwcalYang.CloudConnectionStatus()
+ """Validate the account credentials.
+
+ Verifies if the account credentials can connect and login to a Juju
+ controller at the provided IP address.
+ """
+ status = "unknown"
+ details = "Connection status not known."
+
+ loop = asyncio.new_event_loop()
+ asyncio.set_event_loop(loop)
try:
- env = self._api._get_env()
- except juju.JujuEnvError as e:
- msg = "JujuClient: Invalid account credentials: %s", str(e)
- self._log.error(msg)
- raise Exception(msg)
- except ConnectionRefusedError as e:
- msg = "JujuClient: Wrong IP or Port: %s", str(e)
- self._log.error(msg)
- raise Exception(msg)
+ loop.run_until_complete(asyncio.gather(
+ self._api.logout(),
+ self._api.login(),
+ loop=loop,
+ ))
except Exception as e:
+ loop.close()
+
msg = "JujuClient: Connection Failed: %s", str(e)
self._log.error(msg)
+ status = "failure"
+ details = msg
raise Exception(msg)
else:
- status.status = "success"
- status.details = "Connection was successful"
+ self._log.error("Success reached.")
+ status = "success"
+ details = "Connection was successful"
self._log.info("JujuClient: Connection Successful")
+ finally:
+ loop.close()
- return status
+ return RwConfigAgentYang.YangData_RwProject_Project_ConfigAgent_Account_ConnectionStatus(
+ status=status,
+ details=details,
+ )
class ConfigAgentAccount(object):
@@ -86,7 +105,7 @@ class ConfigAgentAccount(object):
else:
self._cfg_agent_client_plugin = None
- self._status = RwConfigAgentYang.ConfigAgentAccount_ConnectionStatus(
+ self._status = RwConfigAgentYang.YangData_RwProject_Project_ConfigAgent_Account_ConnectionStatus(
status="unknown",
details="Connection status lookup not started"
)
@@ -117,13 +136,13 @@ class ConfigAgentAccount(object):
def validate_cfg_agent_account_credentials(self, loop):
self._log.debug("Validating Config Agent Account %s, credential status %s", self._account_msg, self._status)
- self._status = RwConfigAgentYang.ConfigAgentAccount_ConnectionStatus(
+ self._status = RwConfigAgentYang.YangData_RwProject_Project_ConfigAgent_Account_ConnectionStatus(
status="validating",
details="Config Agent account connection validation in progress"
)
if self._cfg_agent_client_plugin is None:
- self._status = RwConfigAgentYang.ConfigAgentAccount_ConnectionStatus(
+ self._status = RwConfigAgentYang.YangData_RwProject_Project_ConfigAgent_Account_ConnectionStatus(
status="unknown",
details="Config Agent account does not support validation of account creds"
)
@@ -131,11 +150,11 @@ class ConfigAgentAccount(object):
try:
status = yield from loop.run_in_executor(
None,
- self._cfg_agent_client_plugin.validate_account_creds
+ self._cfg_agent_client_plugin.validate_account_creds,
)
- self._status = RwConfigAgentYang.ConfigAgentAccount_ConnectionStatus.from_dict(status.as_dict())
+ self._status = RwConfigAgentYang.YangData_RwProject_Project_ConfigAgent_Account_ConnectionStatus.from_dict(status.as_dict())
except Exception as e:
- self._status = RwConfigAgentYang.ConfigAgentAccount_ConnectionStatus(
+ self._status = RwConfigAgentYang.YangData_RwProject_Project_ConfigAgent_Account_ConnectionStatus(
status="failure",
details="Error - " + str(e)
)
@@ -153,12 +172,15 @@ class ConfigAgentAccount(object):
)
class CfgAgentDtsOperdataHandler(object):
- def __init__(self, dts, log, loop):
+ def __init__(self, dts, log, loop, project):
self._dts = dts
self._log = log
self._loop = loop
+ self._project = project
self.cfg_agent_accounts = {}
+ self._show_reg = None
+ self._rpc_reg = None
def add_cfg_agent_account(self, account_msg):
account = ConfigAgentAccount(self._log, account_msg)
@@ -191,12 +213,12 @@ class CfgAgentDtsOperdataHandler(object):
def _register_show_status(self):
def get_xpath(cfg_agent_name=None):
return "D,/rw-config-agent:config-agent/account{}/connection-status".format(
- "[name='%s']" % cfg_agent_name if cfg_agent_name is not None else ''
+ "[name=%s]" % quoted_key(cfg_agent_name) if cfg_agent_name is not None else ''
)
@asyncio.coroutine
def on_prepare(xact_info, action, ks_path, msg):
- path_entry = RwConfigAgentYang.ConfigAgentAccount.schema().keyspec_to_entry(ks_path)
+ path_entry = RwConfigAgentYang.YangData_RwProject_Project_ConfigAgent_Account.schema().keyspec_to_entry(ks_path)
cfg_agent_account_name = path_entry.key00.name
self._log.debug("Got show cfg_agent connection status request: %s", ks_path.create_string())
@@ -205,9 +227,10 @@ class CfgAgentDtsOperdataHandler(object):
for account in saved_accounts:
connection_status = account.connection_status
self._log.debug("Responding to config agent connection status request: %s", connection_status)
+ xpath = self._project.add_project(get_xpath(account.name))
xact_info.respond_xpath(
rwdts.XactRspCode.MORE,
- xpath=get_xpath(account.name),
+ xpath=xpath,
msg=account.connection_status,
)
except KeyError as e:
@@ -217,12 +240,13 @@ class CfgAgentDtsOperdataHandler(object):
xact_info.respond_xpath(rwdts.XactRspCode.ACK)
- yield from self._dts.register(
- xpath=get_xpath(),
- handler=rift.tasklets.DTS.RegistrationHandler(
- on_prepare=on_prepare),
- flags=rwdts.Flag.PUBLISHER,
- )
+ xpath = self._project.add_project(get_xpath())
+ self._show_reg = yield from self._dts.register(
+ xpath=xpath,
+ handler=rift.tasklets.DTS.RegistrationHandler(
+ on_prepare=on_prepare),
+ flags=rwdts.Flag.PUBLISHER,
+ )
def _register_validate_rpc(self):
def get_xpath():
@@ -234,6 +258,10 @@ class CfgAgentDtsOperdataHandler(object):
raise ConfigAgentAccountNotFound("Config Agent account name not provided")
cfg_agent_account_name = msg.cfg_agent_account
+
+ if not self._project.rpc_check(msg, xact_info=xact_info):
+ return
+
try:
account = self.cfg_agent_accounts[cfg_agent_account_name]
except KeyError:
@@ -243,24 +271,29 @@ class CfgAgentDtsOperdataHandler(object):
xact_info.respond_xpath(rwdts.XactRspCode.ACK)
- yield from self._dts.register(
- xpath=get_xpath(),
- handler=rift.tasklets.DTS.RegistrationHandler(
- on_prepare=on_prepare
- ),
- flags=rwdts.Flag.PUBLISHER,
- )
+ self._rpc_reg = yield from self._dts.register(
+ xpath=get_xpath(),
+ handler=rift.tasklets.DTS.RegistrationHandler(
+ on_prepare=on_prepare
+ ),
+ flags=rwdts.Flag.PUBLISHER,
+ )
@asyncio.coroutine
def register(self):
yield from self._register_show_status()
yield from self._register_validate_rpc()
+ def deregister(self):
+ self._show_reg.deregister()
+ self._rpc_reg.deregister()
+
+
class ConfigAgentJob(object):
"""A wrapper over the config agent job object, providing some
convenience functions.
- YangData_Nsr_NsInstanceOpdata_Nsr_ConfigAgentJob contains
+ YangData_RwProject_Project_NsInstanceOpdata_Nsr_ConfigAgentJob contains
||
==> VNFRS
||
@@ -274,17 +307,19 @@ class ConfigAgentJob(object):
"running" : "pending",
"failed" : "failure"}
- def __init__(self, nsr_id, job, tasks=None):
+ def __init__(self, nsr_id, job, project, tasks=None):
"""
Args:
nsr_id (uuid): ID of NSR record
- job (YangData_Nsr_NsInstanceOpdata_Nsr_ConfigAgentJob): Gi object
+ job (YangData_RwProject_Project_NsInstanceOpdata_Nsr_ConfigAgentJob): Gi object
tasks: List of asyncio.tasks. If provided the job monitor will
use it to monitor the tasks instead of the execution IDs
"""
self._job = job
self.nsr_id = nsr_id
self.tasks = tasks
+ self._project = project
+
self._regh = None
@property
@@ -315,10 +350,10 @@ class ConfigAgentJob(object):
@property
def xpath(self):
"""Xpath of the job"""
- return ("D,/nsr:ns-instance-opdata" +
- "/nsr:nsr[nsr:ns-instance-config-ref='{}']" +
- "/nsr:config-agent-job[nsr:job-id='{}']"
- ).format(self.nsr_id, self.id)
+ return self._project.add_project(("D,/nsr:ns-instance-opdata" +
+ "/nsr:nsr[nsr:ns-instance-config-ref={}]" +
+ "/nsr:config-agent-job[nsr:job-id={}]"
+ ).format(quoted_key(self.nsr_id), quoted_key(str(self.id))))
@property
def regh(self):
@@ -331,9 +366,9 @@ class ConfigAgentJob(object):
self._regh = hdl
@staticmethod
- def convert_rpc_input_to_job(nsr_id, rpc_output, tasks):
+ def convert_rpc_input_to_job(nsr_id, rpc_output, tasks, project):
"""A helper function to convert the YangOutput_Nsr_ExecNsConfigPrimitive
- to YangData_Nsr_NsInstanceOpdata_Nsr_ConfigAgentJob (NsrYang)
+ to YangData_RwProject_Project_NsInstanceOpdata_Nsr_ConfigAgentJob (NsrYang)
Args:
nsr_id (uuid): NSR ID
@@ -344,10 +379,10 @@ class ConfigAgentJob(object):
ConfigAgentJob
"""
# Shortcuts to prevent the HUUGE names.
- CfgAgentJob = NsrYang.YangData_Nsr_NsInstanceOpdata_Nsr_ConfigAgentJob
- CfgAgentVnfr = NsrYang.YangData_Nsr_NsInstanceOpdata_Nsr_ConfigAgentJob_Vnfr
- CfgAgentPrimitive = NsrYang.YangData_Nsr_NsInstanceOpdata_Nsr_ConfigAgentJob_Vnfr_Primitive
- CfgAgentPrimitiveParam = NsrYang.YangData_Nsr_NsInstanceOpdata_Nsr_ConfigAgentJob_Vnfr_Primitive_Parameter
+ CfgAgentJob = NsrYang.YangData_RwProject_Project_NsInstanceOpdata_Nsr_ConfigAgentJob
+ CfgAgentVnfr = NsrYang.YangData_RwProject_Project_NsInstanceOpdata_Nsr_ConfigAgentJob_Vnfr
+ CfgAgentPrimitive = NsrYang.YangData_RwProject_Project_NsInstanceOpdata_Nsr_ConfigAgentJob_Vnfr_Primitive
+ CfgAgentPrimitiveParam = NsrYang.YangData_RwProject_Project_NsInstanceOpdata_Nsr_ConfigAgentJob_Vnfr_Primitive_Parameter
job = CfgAgentJob.from_dict({
"job_id": rpc_output.job_id,
@@ -370,7 +405,8 @@ class ConfigAgentJob(object):
vnf_primitive = CfgAgentPrimitive.from_dict({
"name": primitive.name,
"execution_status": ConfigAgentJob.STATUS_MAP[primitive.execution_status],
- "execution_id": primitive.execution_id
+ "execution_id": primitive.execution_id,
+ "execution_error_details": primitive.execution_error_details,
})
# Copy over the input param
@@ -385,7 +421,7 @@ class ConfigAgentJob(object):
job.vnfr.append(vnfr_job)
- return ConfigAgentJob(nsr_id, job, tasks)
+ return ConfigAgentJob(nsr_id, job, project, tasks)
class ConfigAgentJobMonitor(object):
@@ -432,7 +468,10 @@ class ConfigAgentJobMonitor(object):
format(process, rc, err))
if len(err):
- errs += "{}".format(err)
+ if rc == 0:
+ errs += "{}".format(err)
+ else:
+ errs += "{}".format(err)
result |= rc
if result == 0:
@@ -445,13 +484,10 @@ class ConfigAgentJobMonitor(object):
registration_handle.update_element(self.job.xpath, self.job.job)
- def get_error_details(self):
+ def get_execution_details(self):
'''Get the error details from failed primitives'''
errs = ''
for vnfr in self.job.job.vnfr:
- if vnfr.vnf_job_status != "failure":
- continue
-
for primitive in vnfr.primitive:
if primitive.execution_status == "failure":
errs += ''
@@ -460,7 +496,11 @@ class ConfigAgentJobMonitor(object):
else:
errs += '{}: Unknown error'.format(primitive.name)
errs += ""
-
+ else:
+ if primitive.execution_error_details:
+ errs += '<{status}>{details}{status}>'.format(
+ status=primitive.execution_status,
+ details=primitive.execution_error_details)
return errs
@asyncio.coroutine
@@ -511,14 +551,15 @@ class ConfigAgentJobMonitor(object):
if "failure" in job_status:
self.job.job_status = "failure"
- errs = self.get_error_details()
- if len(errs):
- self.job.job.job_status_details = errs
elif "pending" in job_status:
self.job.job_status = "pending"
else:
self.job.job_status = "success"
+ errs = self.get_execution_details()
+ if len(errs):
+ self.job.job.job_status_details = errs
+
# self.log.debug("Publishing job status: {} at {} for nsr id: {}".format(
# self.job.job_status,
# self.job.xpath,
@@ -526,6 +567,7 @@ class ConfigAgentJobMonitor(object):
registration_handle.update_element(self.job.xpath, self.job.job)
+ registration_handle.update_element(self.job.xpath, self.job.job)
except Exception as e:
self.log.exception(e)
@@ -548,6 +590,9 @@ class ConfigAgentJobMonitor(object):
for primitive in vnfr.primitive:
if primitive.execution_status != 'pending':
+ if primitive.execution_id == "":
+ # We may not have processed the status for these yet
+ job_status.append(primitive.execution_status)
continue
if primitive.execution_id == "":
@@ -555,7 +600,7 @@ class ConfigAgentJobMonitor(object):
job_status.append(primitive.execution_status)
continue
- elif primitive.execution_id == "config":
+ if primitive.execution_id == "config":
# Config job. Check if service is active
task = self.loop.create_task(self.get_service_status(vnfr.id, primitive))
@@ -665,7 +710,7 @@ class CfgAgentJobDtsHandler(object):
self._nsm = nsm
self._regh = None
- self._nsr_regh = None
+ self._project = cfgm.project
@property
def regh(self):
@@ -682,11 +727,10 @@ class CfgAgentJobDtsHandler(object):
""" Return the ConfigManager manager instance """
return self._cfgm
- @staticmethod
- def cfg_job_xpath(nsr_id, job_id):
- return ("D,/nsr:ns-instance-opdata" +
- "/nsr:nsr[nsr:ns-instance-config-ref = '{}']" +
- "/nsr:config-agent-job[nsr:job-id='{}']").format(nsr_id, job_id)
+ def cfg_job_xpath(self, nsr_id, job_id):
+ return self._project.add_project(("D,/nsr:ns-instance-opdata" +
+ "/nsr:nsr[nsr:ns-instance-config-ref={}]" +
+ "/nsr:config-agent-job[nsr:job-id={}]").format(quoted_key(nsr_id), quoted_key(str(job_id))))
@asyncio.coroutine
def register(self):
@@ -697,7 +741,7 @@ class CfgAgentJobDtsHandler(object):
""" prepare callback from dts """
xpath = ks_path.to_xpath(RwNsrYang.get_schema())
if action == rwdts.QueryAction.READ:
- schema = RwNsrYang.YangData_Nsr_NsInstanceOpdata_Nsr.schema()
+ schema = RwNsrYang.YangData_RwProject_Project_NsInstanceOpdata_Nsr.schema()
path_entry = schema.keyspec_to_entry(ks_path)
try:
nsr_id = path_entry.key00.ns_instance_config_ref
@@ -716,7 +760,7 @@ class CfgAgentJobDtsHandler(object):
for job in jobs:
xact_info.respond_xpath(
rwdts.XactRspCode.MORE,
- CfgAgentJobDtsHandler.cfg_job_xpath(nsr_id, job.id),
+ self.cfg_job_xpath(nsr_id, job.id),
job.job)
except Exception as e:
@@ -728,17 +772,17 @@ class CfgAgentJobDtsHandler(object):
hdl = rift.tasklets.DTS.RegistrationHandler(on_prepare=on_prepare,)
with self._dts.group_create() as group:
- self._regh = group.register(xpath=CfgAgentJobDtsHandler.XPATH,
+ self._regh = group.register(xpath=self._project.add_project(
+ CfgAgentJobDtsHandler.XPATH),
handler=hdl,
flags=rwdts.Flag.PUBLISHER,
)
- @asyncio.coroutine
def _terminate_nsr(self, nsr_id):
self._log.debug("NSR {} being terminated".format(nsr_id))
jobs = self.cfgm.get_job(nsr_id)
for job in jobs:
- path = CfgAgentJobDtsHandler.cfg_job_xpath(nsr_id, job.id)
+ path = self.cfg_job_xpath(nsr_id, job.id)
with self._dts.transaction() as xact:
self._log.debug("Deleting job: {}".format(path))
job.regh.delete_element(path)
@@ -749,40 +793,14 @@ class CfgAgentJobDtsHandler(object):
@property
def nsr_xpath(self):
- return "D,/nsr:ns-instance-opdata/nsr:nsr"
-
- @asyncio.coroutine
- def register_for_nsr(self):
- """ Register for NSR changes """
-
- @asyncio.coroutine
- def on_prepare(xact_info, query_action, ks_path, msg):
- """ This NSR is created """
- self._log.debug("Received NSR instantiate on_prepare (%s:%s:%s)",
- query_action,
- ks_path,
- msg)
-
- if (query_action == rwdts.QueryAction.UPDATE or
- query_action == rwdts.QueryAction.CREATE):
- pass
- elif query_action == rwdts.QueryAction.DELETE:
- nsr_id = msg.ns_instance_config_ref
- asyncio.ensure_future(self._terminate_nsr(nsr_id), loop=self._loop)
- else:
- raise NotImplementedError(
- "%s action on cm-state not supported",
- query_action)
-
- xact_info.respond_xpath(rwdts.XactRspCode.ACK)
+ return self._project.add_project("D,/nsr:ns-instance-opdata/nsr:nsr")
- try:
- handler = rift.tasklets.DTS.RegistrationHandler(on_prepare=on_prepare)
- self._nsr_regh = yield from self._dts.register(self.nsr_xpath,
- flags=rwdts.Flag.SUBSCRIBER | rwdts.Flag.DELTA_READY,
- handler=handler)
- except Exception as e:
- self._log.error("Failed to register for NSR changes as %s", str(e))
+ def deregister(self):
+ self._log.debug("De-register config agent job for project".
+ format(self._project.name))
+ if self._regh:
+ self._regh.deregister()
+ self._regh = None
class ConfigAgentJobManager(object):
@@ -791,7 +809,7 @@ class ConfigAgentJobManager(object):
TODO: Needs to support multiple config agents.
"""
- def __init__(self, dts, log, loop, nsm):
+ def __init__(self, dts, log, loop, project, nsm):
"""
Args:
dts : Dts handle
@@ -804,11 +822,12 @@ class ConfigAgentJobManager(object):
self.log = log
self.loop = loop
self.nsm = nsm
+ self.project = project
self.handler = CfgAgentJobDtsHandler(dts, log, loop, nsm, self)
self.executor = concurrent.futures.ThreadPoolExecutor(max_workers=1)
def add_job(self, rpc_output, tasks=None):
- """Once an RPC is trigger add a now job
+ """Once an RPC is triggered, add a new job
Args:
rpc_output (YangOutput_Nsr_ExecNsConfigPrimitive): Rpc output
@@ -818,7 +837,8 @@ class ConfigAgentJobManager(object):
"""
nsr_id = rpc_output.nsr_id_ref
- job = ConfigAgentJob.convert_rpc_input_to_job(nsr_id, rpc_output, tasks)
+ job = ConfigAgentJob.convert_rpc_input_to_job(nsr_id, rpc_output,
+ tasks, self.project)
self.log.debug("Creating a job monitor for Job id: {}".format(
rpc_output.job_id))
@@ -837,6 +857,14 @@ class ConfigAgentJobManager(object):
ca = agent
break
+ def done_callback(fut):
+ e = fut.exception()
+ if e:
+ self.log.error("Exception on monitor job {}: {}".
+ format(rpc_output.job_id, e))
+ fut.print_stack()
+ self.log.debug("Monitor job done for {}".format(rpc_output.job_id))
+
# For every Job we will schedule a new monitoring process.
job_monitor = ConfigAgentJobMonitor(
self.dts,
@@ -847,6 +875,7 @@ class ConfigAgentJobManager(object):
ca
)
task = self.loop.create_task(job_monitor.publish_action_status())
+ task.add_done_callback(done_callback)
def get_job(self, nsr_id):
"""Get the job associated with the NSR Id, if present."""
@@ -863,4 +892,8 @@ class ConfigAgentJobManager(object):
@asyncio.coroutine
def register(self):
yield from self.handler.register()
- yield from self.handler.register_for_nsr()
+ # yield from self.handler.register_for_nsr()
+
+ def deregister(self):
+ self.handler.deregister()
+ self.handler = None