--- /dev/null
+#
+# Copyright 2016 RIFT.IO Inc
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+import asyncio
+import concurrent.futures
+import time
+
+from gi.repository import (
+ NsrYang,
+ RwTypes,
+ RwcalYang,
+ RwNsrYang,
+ RwConfigAgentYang,
+ RwDts as rwdts)
+
+import rift.tasklets
+
+import rift.mano.utils.juju_api as juju
+
+
+class ConfigAgentAccountNotFound(Exception):
+ pass
+
+class JujuClient(object):
+ def __init__(self, log, ip, port, user, passwd):
+ self._log = log
+ self._ip = ip
+ self._port = port
+ self._user = user
+ self._passwd = passwd
+
+ self._api = juju.JujuApi(log=log,
+ server=ip, port=port,
+ user=user, secret=passwd)
+
+
+ def validate_account_creds(self):
+ status = RwcalYang.CloudConnectionStatus()
+ try:
+ env = self._api._get_env()
+ except juju.JujuEnvError as e:
+ msg = "JujuClient: Invalid account credentials: %s", str(e)
+ self._log.error(msg)
+ raise Exception(msg)
+ except ConnectionRefusedError as e:
+ msg = "JujuClient: Wrong IP or Port: %s", str(e)
+ self._log.error(msg)
+ raise Exception(msg)
+ except Exception as e:
+ msg = "JujuClient: Connection Failed: %s", str(e)
+ self._log.error(msg)
+ raise Exception(msg)
+ else:
+ status.status = "success"
+ status.details = "Connection was successful"
+ self._log.info("JujuClient: Connection Successful")
+
+ return status
+
+
+class ConfigAgentAccount(object):
+ def __init__(self, log, account_msg):
+ self._log = log
+ self._account_msg = account_msg.deep_copy()
+
+ if account_msg.account_type == "juju":
+ self._cfg_agent_client_plugin = JujuClient(
+ log,
+ account_msg.juju.ip_address,
+ account_msg.juju.port,
+ account_msg.juju.user,
+ account_msg.juju.secret)
+ else:
+ self._cfg_agent_client_plugin = None
+
+ self._status = RwConfigAgentYang.ConfigAgentAccount_ConnectionStatus(
+ status="unknown",
+ details="Connection status lookup not started"
+ )
+
+ self._validate_task = None
+
+ @property
+ def name(self):
+ return self._account_msg.name
+
+ @property
+ def account_msg(self):
+ return self._account_msg
+
+ @property
+ def account_type(self):
+ return self._account_msg.account_type
+
+ @property
+ def connection_status(self):
+ return self._status
+
+ def update_from_cfg(self, cfg):
+ self._log.debug("Updating parent ConfigAgentAccount to %s", cfg)
+ raise NotImplementedError("Update config agent account not yet supported")
+
+ @asyncio.coroutine
+ def validate_cfg_agent_account_credentials(self, loop):
+ self._log.debug("Validating Config Agent Account %s, credential status %s", self._account_msg, self._status)
+
+ self._status = RwConfigAgentYang.ConfigAgentAccount_ConnectionStatus(
+ status="validating",
+ details="Config Agent account connection validation in progress"
+ )
+
+ if self._cfg_agent_client_plugin is None:
+ self._status = RwConfigAgentYang.ConfigAgentAccount_ConnectionStatus(
+ status="unknown",
+ details="Config Agent account does not support validation of account creds"
+ )
+ else:
+ try:
+ status = yield from loop.run_in_executor(
+ None,
+ self._cfg_agent_client_plugin.validate_account_creds
+ )
+ self._status = RwConfigAgentYang.ConfigAgentAccount_ConnectionStatus.from_dict(status.as_dict())
+ except Exception as e:
+ self._status = RwConfigAgentYang.ConfigAgentAccount_ConnectionStatus(
+ status="failure",
+ details="Error - " + str(e)
+ )
+
+ self._log.info("Got config agent account validation response: %s", self._status)
+
+ def start_validate_credentials(self, loop):
+ if self._validate_task is not None:
+ self._validate_task.cancel()
+ self._validate_task = None
+
+ self._validate_task = asyncio.ensure_future(
+ self.validate_cfg_agent_account_credentials(loop),
+ loop=loop
+ )
+
+class CfgAgentDtsOperdataHandler(object):
+ def __init__(self, dts, log, loop):
+ self._dts = dts
+ self._log = log
+ self._loop = loop
+
+ self.cfg_agent_accounts = {}
+
+ def add_cfg_agent_account(self, account_msg):
+ account = ConfigAgentAccount(self._log, account_msg)
+ self.cfg_agent_accounts[account.name] = account
+ self._log.info("ConfigAgent Operdata Handler added. Starting account validation")
+
+ account.start_validate_credentials(self._loop)
+
+ def delete_cfg_agent_account(self, account_name):
+ del self.cfg_agent_accounts[account_name]
+ self._log.info("ConfigAgent Operdata Handler deleted.")
+
+ def get_saved_cfg_agent_accounts(self, cfg_agent_account_name):
+ ''' Get Config Agent Account corresponding to passed name, or all saved accounts if name is None'''
+ saved_cfg_agent_accounts = []
+
+ if cfg_agent_account_name is None or cfg_agent_account_name == "":
+ cfg_agent_accounts = list(self.cfg_agent_accounts.values())
+ saved_cfg_agent_accounts.extend(cfg_agent_accounts)
+ elif cfg_agent_account_name in self.cfg_agent_accounts:
+ account = self.cfg_agent_accounts[cfg_agent_account_name]
+ saved_cfg_agent_accounts.append(account)
+ else:
+ errstr = "Config Agent account {} does not exist".format(cfg_agent_account_name)
+ raise KeyError(errstr)
+
+ return saved_cfg_agent_accounts
+
+
+ def _register_show_status(self):
+ def get_xpath(cfg_agent_name=None):
+ return "D,/rw-config-agent:config-agent/account{}/connection-status".format(
+ "[name='%s']" % cfg_agent_name if cfg_agent_name is not None else ''
+ )
+
+ @asyncio.coroutine
+ def on_prepare(xact_info, action, ks_path, msg):
+ path_entry = RwConfigAgentYang.ConfigAgentAccount.schema().keyspec_to_entry(ks_path)
+ cfg_agent_account_name = path_entry.key00.name
+ self._log.debug("Got show cfg_agent connection status request: %s", ks_path.create_string())
+
+ try:
+ saved_accounts = self.get_saved_cfg_agent_accounts(cfg_agent_account_name)
+ for account in saved_accounts:
+ connection_status = account.connection_status
+ self._log.debug("Responding to config agent connection status request: %s", connection_status)
+ xact_info.respond_xpath(
+ rwdts.XactRspCode.MORE,
+ xpath=get_xpath(account.name),
+ msg=account.connection_status,
+ )
+ except KeyError as e:
+ self._log.warning(str(e))
+ xact_info.respond_xpath(rwdts.XactRspCode.NA)
+ return
+
+ xact_info.respond_xpath(rwdts.XactRspCode.ACK)
+
+ yield from self._dts.register(
+ xpath=get_xpath(),
+ handler=rift.tasklets.DTS.RegistrationHandler(
+ on_prepare=on_prepare),
+ flags=rwdts.Flag.PUBLISHER,
+ )
+
+ def _register_validate_rpc(self):
+ def get_xpath():
+ return "/rw-config-agent:update-cfg-agent-status"
+
+ @asyncio.coroutine
+ def on_prepare(xact_info, action, ks_path, msg):
+ if not msg.has_field("cfg_agent_account"):
+ raise ConfigAgentAccountNotFound("Config Agent account name not provided")
+
+ cfg_agent_account_name = msg.cfg_agent_account
+ try:
+ account = self.cfg_agent_accounts[cfg_agent_account_name]
+ except KeyError:
+ raise ConfigAgentAccountNotFound("Config Agent account name %s not found" % cfg_agent_account_name)
+
+ account.start_validate_credentials(self._loop)
+
+ xact_info.respond_xpath(rwdts.XactRspCode.ACK)
+
+ yield from self._dts.register(
+ xpath=get_xpath(),
+ handler=rift.tasklets.DTS.RegistrationHandler(
+ on_prepare=on_prepare
+ ),
+ flags=rwdts.Flag.PUBLISHER,
+ )
+
+ @asyncio.coroutine
+ def register(self):
+ yield from self._register_show_status()
+ yield from self._register_validate_rpc()
+
+class ConfigAgentJob(object):
+ """A wrapper over the config agent job object, providing some
+ convenience functions.
+
+ YangData_Nsr_NsInstanceOpdata_Nsr_ConfigAgentJob contains
+ ||
+ ==> VNFRS
+ ||
+ ==> Primitives
+
+ """
+ # The normalizes the state terms from Juju to our yang models
+ # Juju : Yang model
+ STATUS_MAP = {"completed": "success",
+ "pending" : "pending",
+ "running" : "pending",
+ "failed" : "failure"}
+
+ def __init__(self, nsr_id, job, tasks=None):
+ """
+ Args:
+ nsr_id (uuid): ID of NSR record
+ job (YangData_Nsr_NsInstanceOpdata_Nsr_ConfigAgentJob): Gi object
+ tasks: List of asyncio.tasks. If provided the job monitor will
+ use it to monitor the tasks instead of the execution IDs
+ """
+ self._job = job
+ self.nsr_id = nsr_id
+ self.tasks = tasks
+
+ @property
+ def id(self):
+ """Job id"""
+ return self._job.job_id
+
+ @property
+ def name(self):
+ """Job name"""
+ return self._job.job_name
+
+ @property
+ def job_status(self):
+ """Status of the job (success|pending|failure)"""
+ return self._job.job_status
+
+ @job_status.setter
+ def job_status(self, value):
+ """Setter for job status"""
+ self._job.job_status = value
+
+ @property
+ def job(self):
+ """Gi object"""
+ return self._job
+
+ @property
+ def xpath(self):
+ """Xpath of the job"""
+ return ("D,/nsr:ns-instance-opdata" +
+ "/nsr:nsr[nsr:ns-instance-config-ref='{}']" +
+ "/nsr:config-agent-job[nsr:job-id='{}']"
+ ).format(self.nsr_id, self.id)
+
+ @staticmethod
+ def convert_rpc_input_to_job(nsr_id, rpc_output, tasks):
+ """A helper function to convert the YangOutput_Nsr_ExecNsConfigPrimitive
+ to YangData_Nsr_NsInstanceOpdata_Nsr_ConfigAgentJob (NsrYang)
+
+ Args:
+ nsr_id (uuid): NSR ID
+ rpc_output (YangOutput_Nsr_ExecNsConfigPrimitive): RPC output
+ tasks (list): A list of asyncio.Tasks
+
+ Returns:
+ ConfigAgentJob
+ """
+ # Shortcuts to prevent the HUUGE names.
+ CfgAgentJob = NsrYang.YangData_Nsr_NsInstanceOpdata_Nsr_ConfigAgentJob
+ CfgAgentVnfr = NsrYang.YangData_Nsr_NsInstanceOpdata_Nsr_ConfigAgentJob_Vnfr
+ CfgAgentPrimitive = NsrYang.YangData_Nsr_NsInstanceOpdata_Nsr_ConfigAgentJob_Vnfr_Primitive
+ CfgAgentPrimitiveParam = NsrYang.YangData_Nsr_NsInstanceOpdata_Nsr_ConfigAgentJob_Vnfr_Primitive_Parameter
+
+ job = CfgAgentJob.from_dict({
+ "job_id": rpc_output.job_id,
+ "job_name" : rpc_output.name,
+ "job_status": "pending",
+ "triggered_by": rpc_output.triggered_by,
+ "create_time": rpc_output.create_time,
+ "job_status_details": rpc_output.job_status_details if rpc_output.job_status_details is not None else None,
+ "parameter": [param.as_dict() for param in rpc_output.parameter],
+ "parameter_group": [pg.as_dict() for pg in rpc_output.parameter_group]
+ })
+
+ for vnfr in rpc_output.vnf_out_list:
+ vnfr_job = CfgAgentVnfr.from_dict({
+ "id": vnfr.vnfr_id_ref,
+ "vnf_job_status": "pending",
+ })
+
+ for primitive in vnfr.vnf_out_primitive:
+ vnf_primitive = CfgAgentPrimitive.from_dict({
+ "name": primitive.name,
+ "execution_status": ConfigAgentJob.STATUS_MAP[primitive.execution_status],
+ "execution_id": primitive.execution_id
+ })
+
+ # Copy over the input param
+ for param in primitive.parameter:
+ vnf_primitive.parameter.append(
+ CfgAgentPrimitiveParam.from_dict({
+ "name": param.name,
+ "value": param.value
+ }))
+
+ vnfr_job.primitive.append(vnf_primitive)
+
+ job.vnfr.append(vnfr_job)
+
+ return ConfigAgentJob(nsr_id, job, tasks)
+
+
+class ConfigAgentJobMonitor(object):
+ """Job monitor: Polls the Juju controller and get the status.
+ Rules:
+ If all Primitive are success, then vnf & nsr status will be "success"
+ If any one Primitive reaches a failed state then both vnf and nsr will fail.
+ """
+ POLLING_PERIOD = 2
+
+ def __init__(self, dts, log, job, executor, loop, config_plugin):
+ """
+ Args:
+ dts : DTS handle
+ log : log handle
+ job (ConfigAgentJob): ConfigAgentJob instance
+ executor (concurrent.futures): Executor for juju status api calls
+ loop (eventloop): Current event loop instance
+ config_plugin : Config plugin to be used.
+ """
+ self.job = job
+ self.log = log
+ self.loop = loop
+ self.executor = executor
+ self.polling_period = ConfigAgentJobMonitor.POLLING_PERIOD
+ self.config_plugin = config_plugin
+ self.dts = dts
+
+ @asyncio.coroutine
+ def _monitor_processes(self, registration_handle):
+ result = 0
+ for process in self.job.tasks:
+ rc = yield from process
+ self.log.debug("Process {} returned rc: {}".format(process, rc))
+ result |= rc
+
+ if result == 0:
+ self.job.job_status = "success"
+ else:
+ self.job.job_status = "failure"
+
+ registration_handle.update_element(self.job.xpath, self.job.job)
+
+ def get_error_details(self):
+ '''Get the error details from failed primitives'''
+ errs = ''
+ for vnfr in self.job.job.vnfr:
+ if vnfr.vnf_job_status != "failure":
+ continue
+
+ for primitive in vnfr.primitive:
+ if primitive.execution_status == "failure":
+ errs += '<error>'
+ errs += primitive.execution_error_details
+ errs += "</error>"
+
+ return errs
+
+ @asyncio.coroutine
+ def publish_action_status(self):
+ """
+ Starts publishing the status for jobs/primitives
+ """
+ registration_handle = yield from self.dts.register(
+ xpath=self.job.xpath,
+ handler=rift.tasklets.DTS.RegistrationHandler(),
+ flags=(rwdts.Flag.PUBLISHER | rwdts.Flag.NO_PREP_READ),
+ )
+
+ self.log.debug('preparing to publish job status for {}'.format(self.job.xpath))
+
+ try:
+ registration_handle.create_element(self.job.xpath, self.job.job)
+
+ # If the config is done via a user defined script
+ if self.job.tasks is not None:
+ yield from self._monitor_processes(registration_handle)
+ return
+
+ prev = time.time()
+ # Run until pending moves to either failure/success
+ while self.job.job_status == "pending":
+ curr = time.time()
+
+ if curr - prev < self.polling_period:
+ pause = self.polling_period - (curr - prev)
+ yield from asyncio.sleep(pause, loop=self.loop)
+
+ prev = time.time()
+
+ tasks = []
+ for vnfr in self.job.job.vnfr:
+ task = self.loop.create_task(self.get_vnfr_status(vnfr))
+ tasks.append(task)
+
+ # Exit, if no tasks are found
+ if not tasks:
+ break
+
+ yield from asyncio.wait(tasks, loop=self.loop)
+
+ job_status = [task.result() for task in tasks]
+
+ if "failure" in job_status:
+ self.job.job_status = "failure"
+ errs = self.get_error_details()
+ if len(errs):
+ self.job.job.job_status_details = errs
+ elif "pending" in job_status:
+ self.job.job_status = "pending"
+ else:
+ self.job.job_status = "success"
+
+ # self.log.debug("Publishing job status: {} at {} for nsr id: {}".format(
+ # self.job.job_status,
+ # self.job.xpath,
+ # self.job.nsr_id))
+
+ registration_handle.update_element(self.job.xpath, self.job.job)
+
+
+ except Exception as e:
+ self.log.exception(e)
+ raise
+
+
+ @asyncio.coroutine
+ def get_vnfr_status(self, vnfr):
+ """Schedules tasks for all containing primitives and updates it's own
+ status.
+
+ Args:
+ vnfr : Vnfr job record containing primitives.
+
+ Returns:
+ (str): "success|failure|pending"
+ """
+ tasks = []
+ job_status = []
+
+ for primitive in vnfr.primitive:
+ if primitive.execution_id == "":
+ # TODO: For some config data, the id will be empty, check if
+ # mapping is needed.
+ job_status.append(primitive.execution_status)
+ continue
+
+ task = self.loop.create_task(self.get_primitive_status(primitive))
+ tasks.append(task)
+
+ if tasks:
+ yield from asyncio.wait(tasks, loop=self.loop)
+
+ job_status.extend([task.result() for task in tasks])
+ if "failure" in job_status:
+ vnfr.vnf_job_status = "failure"
+ return "failure"
+
+ elif "pending" in job_status:
+ vnfr.vnf_job_status = "pending"
+ return "pending"
+
+ else:
+ vnfr.vnf_job_status = "success"
+ return "success"
+
+ @asyncio.coroutine
+ def get_primitive_status(self, primitive):
+ """
+ Queries the juju api and gets the status of the execution id.
+
+ Args:
+ primitive : Primitive containing the execution ID.
+ """
+
+ try:
+ resp = yield from self.loop.run_in_executor(
+ self.executor,
+ self.config_plugin.get_action_status,
+ primitive.execution_id
+ )
+
+ status = resp['status']
+ if status == 'failed':
+ self.log.warning("Execution of action {} failed: {}".
+ format(primitive.execution_id, resp))
+ primitive.execution_error_details = resp['message']
+
+ except Exception as e:
+ self.log.exception(e)
+ status = "failed"
+
+ # Handle case status is None
+ if status:
+ primitive.execution_status = ConfigAgentJob.STATUS_MAP[status]
+ else:
+ primitive.execution_status = "failure"
+
+ return primitive.execution_status
+
+
+class CfgAgentJobDtsHandler(object):
+ """Dts Handler for CfgAgent"""
+ XPATH = "D,/nsr:ns-instance-opdata/nsr:nsr/nsr:config-agent-job"
+
+ def __init__(self, dts, log, loop, nsm, cfgm):
+ """
+ Args:
+ dts : Dts Handle.
+ log : Log handle.
+ loop : Event loop.
+ nsm : NsmManager.
+ cfgm : ConfigManager.
+ """
+ self._dts = dts
+ self._log = log
+ self._loop = loop
+ self._cfgm = cfgm
+ self._nsm = nsm
+
+ self._regh = None
+
+ @property
+ def regh(self):
+ """ Return registration handle """
+ return self._regh
+
+ @property
+ def nsm(self):
+ """ Return the NSManager manager instance """
+ return self._nsm
+
+ @property
+ def cfgm(self):
+ """ Return the ConfigManager manager instance """
+ return self._cfgm
+
+ @staticmethod
+ def cfg_job_xpath(nsr_id, job_id):
+ return ("D,/nsr:ns-instance-opdata" +
+ "/nsr:nsr[nsr:ns-instance-config-ref = '{}']" +
+ "/nsr:config-agent-job[nsr:job-id='{}']").format(nsr_id, job_id)
+
+ @asyncio.coroutine
+ def register(self):
+ """ Register for NS monitoring read from dts """
+
+ @asyncio.coroutine
+ def on_prepare(xact_info, action, ks_path, msg):
+ """ prepare callback from dts """
+ xpath = ks_path.to_xpath(RwNsrYang.get_schema())
+ if action == rwdts.QueryAction.READ:
+ schema = RwNsrYang.YangData_Nsr_NsInstanceOpdata_Nsr.schema()
+ path_entry = schema.keyspec_to_entry(ks_path)
+ try:
+ nsr_id = path_entry.key00.ns_instance_config_ref
+
+ #print("###>>> self.nsm.nsrs:", self.nsm.nsrs)
+ nsr_ids = []
+ if nsr_id is None or nsr_id == "":
+ nsrs = list(self.nsm.nsrs.values())
+ nsr_ids = [nsr.id for nsr in nsrs if nsr is not None]
+ else:
+ nsr_ids = [nsr_id]
+
+ for nsr_id in nsr_ids:
+ job = self.cfgm.get_job(nsr_id)
+
+ # If no jobs are queued for the NSR
+ if job is None:
+ continue
+
+ xact_info.respond_xpath(
+ rwdts.XactRspCode.MORE,
+ CfgAgentJobDtsHandler.cfg_job_xpath(nsr_id, job.job_id),
+ job)
+
+ except Exception as e:
+ self._log.exception("Caught exception:%s", str(e))
+ xact_info.respond_xpath(rwdts.XactRspCode.ACK)
+
+ else:
+ xact_info.respond_xpath(rwdts.XactRspCode.NA)
+
+ hdl = rift.tasklets.DTS.RegistrationHandler(on_prepare=on_prepare,)
+ with self._dts.group_create() as group:
+ self._regh = group.register(xpath=CfgAgentJobDtsHandler.XPATH,
+ handler=hdl,
+ flags=rwdts.Flag.PUBLISHER,
+ )
+
+
+class ConfigAgentJobManager(object):
+ """A central class that manager all the Config Agent related data,
+ Including updating the status
+
+ TODO: Needs to support multiple config agents.
+ """
+ def __init__(self, dts, log, loop, nsm):
+ """
+ Args:
+ dts : Dts handle
+ log : Log handler
+ loop : Event loop
+ nsm : NsmTasklet instance
+ """
+ self.jobs = {}
+ self.dts = dts
+ self.log = log
+ self.loop = loop
+ self.nsm = nsm
+ self.handler = CfgAgentJobDtsHandler(dts, log, loop, nsm, self)
+ self.executor = concurrent.futures.ThreadPoolExecutor(max_workers=1)
+
+ def add_job(self, rpc_output, tasks=None):
+ """Once an RPC is trigger add a now job
+
+ Args:
+ rpc_output (YangOutput_Nsr_ExecNsConfigPrimitive): Rpc output
+ rpc_input (YangInput_Nsr_ExecNsConfigPrimitive): Rpc input
+ tasks(list) A list of asyncio.Tasks
+
+ """
+ nsr_id = rpc_output.nsr_id_ref
+
+ self.jobs[nsr_id] = ConfigAgentJob.convert_rpc_input_to_job(nsr_id, rpc_output, tasks)
+
+ self.log.debug("Creating a job monitor for Job id: {}".format(
+ rpc_output.job_id))
+
+ # For every Job we will schedule a new monitoring process.
+ job_monitor = ConfigAgentJobMonitor(
+ self.dts,
+ self.log,
+ self.jobs[nsr_id],
+ self.executor,
+ self.loop,
+ self.nsm.config_agent_plugins[0] # Hack
+ )
+ task = self.loop.create_task(job_monitor.publish_action_status())
+
+ def get_job(self, nsr_id):
+ """Get the job associated with the NSR Id, if present."""
+ try:
+ return self.jobs[nsr_id].job
+ except KeyError:
+ return None
+
+ @asyncio.coroutine
+ def register(self):
+ yield from self.handler.register()