-#
+#
# Copyright 2016 RIFT.IO Inc
#
# Licensed under the Apache License, Version 2.0 (the "License");
#
import asyncio
+import os
+import re
+import tempfile
import time
+import yaml
+
+from . import riftcm_config_plugin
import rift.mano.config_agent
+
import gi
gi.require_version('RwDts', '1.0')
gi.require_version('RwNsrYang', '1.0')
GET_NS_CONF_XPATH = "I,/nsr:get-ns-service-primitive-values"
GET_NS_CONF_O_XPATH = "O,/nsr:get-ns-service-primitive-values"
- def __init__(self, dts, log, loop, nsm):
+ def __init__(self, dts, log, loop, project, nsm):
self._dts = dts
self._log = log
self._loop = loop
+ self._project = project
self._nsm = nsm
self._ns_regh = None
self._vnf_regh = None
self._get_ns_conf_regh = None
- self.job_manager = rift.mano.config_agent.ConfigAgentJobManager(dts, log, loop, nsm)
+ self.job_manager = rift.mano.config_agent.ConfigAgentJobManager(dts, log, loop,
+ project, nsm)
+
+ self._rift_install_dir = os.environ['RIFT_INSTALL']
+ self._rift_var_root_dir = os.environ['RIFT_VAR_ROOT']
@property
def reghs(self):
""" Return the NS manager instance """
return self._nsm
+ def deregister(self):
+ self._log.debug("De-register conman rpc handlers for project {}".
+ format(self._project))
+ for reg in self.reghs:
+ if reg:
+ reg.deregister()
+ reg = None
+
+ self.job_manager.deregister()
+ self.job_manager = None
+
def prepare_meta(self, rpc_ip):
try:
if vnf:
self._log.debug("nsr/vnf {}/{}, vnf_configuration: %s",
vnf.vnf_configuration)
- for primitive in vnf.vnf_configuration.service_primitive:
+ for primitive in vnf.vnf_configuration.config_primitive:
if primitive.name == primitive_name:
return primitive
raise ValueError("Could not find nsr/vnf {}/{} primitive {}"
.format(nsr_id, vnfr_id, primitive_name))
+ @asyncio.coroutine
+ def _apply_ns_config(self, agent_nsr, agent_vnfrs, rpc_ip):
+ """
+ Hook: Runs the user defined script. Feeds all the necessary data
+ for the script thro' yaml file.
+
+ TBD: Add support to pass multiple CA accounts if configures
+ Remove apply_ns_config from the Config Agent Plugins
+
+ Args:
+ rpc_ip (YangInput_Nsr_ExecNsConfigPrimitive): The input data.
+ nsr (NetworkServiceRecord): Description
+ vnfrs (dict): VNFR ID => VirtualNetworkFunctionRecord
+
+ """
+ def xlate(tag, tags):
+ # TBD
+ if tag is None or tags is None:
+ return tag
+ val = tag
+ if re.search('<.*>', tag):
+ try:
+ if tag == '<rw_mgmt_ip>':
+ val = tags['rw_mgmt_ip']
+ except KeyError as e:
+ self._log.info("RiftCA: Did not get a value for tag %s, e=%s",
+ tag, e)
+ return val
+
+ def get_meta(agent_nsr, agent_vnfrs):
+ unit_names, initial_params, vnfr_index_map, vnfr_data_map = {}, {}, {}, {}
+
+ for vnfr_id in agent_nsr.vnfr_ids:
+ vnfr = agent_vnfrs[vnfr_id]
+ self._log.debug("CA-RPC: VNFR metadata: {}".format(vnfr))
+
+ # index->vnfr ref
+ vnfr_index_map[vnfr.member_vnf_index] = vnfr_id
+ vnfr_data_dict = dict()
+ if 'mgmt_interface' in vnfr.vnfr:
+ vnfr_data_dict['mgmt_interface'] = vnfr.vnfr['mgmt_interface']
+
+ vnfr_data_dict['name'] = vnfr.vnfr['name']
+ vnfr_data_dict['connection_point'] = []
+ if 'connection_point' in vnfr.vnfr:
+ for cp in vnfr.vnfr['connection_point']:
+ cp_dict = dict(name = cp['name'],
+ ip_address = cp['ip_address'],
+ connection_point_id = cp['connection_point_id'])
+ if 'virtual_cps' in cp:
+ cp_info['virtual_cps'] = [ {k:v for k,v in vcp.items()
+ if k in ['ip_address', 'mac_address']}
+ for vcp in cp['virtual_cps'] ]
+
+ vnfr_data_dict['connection_point'].append(cp_dict)
+
+ try:
+ vnfr_data_dict['vdur'] = []
+ vdu_data = [(vdu['name'], vdu['management_ip'], vdu['vm_management_ip'], vdu['id'], vdu['vdu_id_ref'])
+ for vdu in vnfr.vnfr['vdur']]
+
+ for data in vdu_data:
+ data = dict(zip(['name', 'management_ip', 'vm_management_ip', 'id', 'vdu_id_ref'] , data))
+ vnfr_data_dict['vdur'].append(data)
+
+ vnfr_data_map[vnfr.member_vnf_index] = vnfr_data_dict
+ except KeyError as e:
+ self._log.warn("Error getting VDU data for VNFR {}".format(vnfr))
+
+ # Unit name
+ unit_names[vnfr_id] = None
+ for config_plugin in self.nsm.config_agent_plugins:
+ name = config_plugin.get_service_name(vnfr_id)
+ if name:
+ unit_names[vnfr_id] = name
+ break
+
+ # Flatten the data for simplicity
+ param_data = {}
+ if 'initial_config_primitive' in vnfr.vnf_configuration:
+ for primitive in vnfr.vnf_configuration['initial_config_primitive']:
+ if 'parameter' in primitive:
+ for parameter in primitive['parameter']:
+ try:
+ value = xlate(parameter['value'], vnfr.tags)
+ param_data[parameter['name']] = value
+ except KeyError as e:
+ self._log.warn("Unable to parse the parameter{}: {}".
+ format(parameter))
+
+ initial_params[vnfr_id] = param_data
+
+
+ return unit_names, initial_params, vnfr_index_map, vnfr_data_map
+
+ def get_config_agent():
+ ret = {}
+ for config_plugin in self.nsm.config_agent_plugins:
+ if config_plugin.agent_type in [riftcm_config_plugin.DEFAULT_CAP_TYPE]:
+ ret = config_plugin.agent_data
+ else:
+ # Currently the first non default plugin is returned
+ return config_plugin.agent_data
+ return ret
+
+ unit_names, init_data, vnfr_index_map, vnfr_data_map = get_meta(agent_nsr, agent_vnfrs)
+
+ # The data consists of 4 sections
+ # 1. Account data
+ # 2. The input passed.
+ # 3. Juju unit names (keyed by vnfr ID).
+ # 4. Initial config data (keyed by vnfr ID).
+ data = dict()
+ data['config_agent'] = get_config_agent()
+ data["rpc_ip"] = rpc_ip.as_dict()
+ data["unit_names"] = unit_names
+ data["init_config"] = init_data
+ data["vnfr_index_map"] = vnfr_index_map
+ data["vnfr_data_map"] = vnfr_data_map
+
+ tmp_file = None
+ with tempfile.NamedTemporaryFile(delete=False) as tmp_file:
+ tmp_file.write(yaml.dump(data, default_flow_style=True)
+ .encode("UTF-8"))
+
+ self._log.debug("CA-RPC: Creating a temp file {} with input data: {}".
+ format(tmp_file.name, data))
+
+ # Get the full path to the script
+ script = ''
+ if rpc_ip.user_defined_script[0] == '/':
+ # The script has full path, use as is
+ script = rpc_ip.user_defined_script
+ else:
+ script = os.path.join(self._rift_var_root_dir,
+ 'launchpad/packages/nsd',
+ self._project.name,
+ agent_nsr.nsd_id, 'scripts',
+ rpc_ip.user_defined_script)
+ self._log.debug("CA-RPC: Checking for script in %s", script)
+
+ cmd = "{} {}".format(script, tmp_file.name)
+ self._log.debug("CA-RPC: Running the CMD: {}".format(cmd))
+
+ process = yield from asyncio.create_subprocess_shell(
+ cmd)
+
+ return process
+
@asyncio.coroutine
def register(self):
""" Register for NS monitoring read from dts """
def on_ns_config_prepare(xact_info, action, ks_path, msg):
""" prepare callback from dts exec-ns-service-primitive"""
assert action == rwdts.QueryAction.RPC
+
+ if not self._project.rpc_check(msg, xact_info):
+ return
+
rpc_ip = msg
rpc_op = NsrYang.YangOutput_Nsr_ExecNsServicePrimitive.from_dict({
"triggered_by": rpc_ip.triggered_by,
if nsd_cfg_prim_msg and nsd_cfg_prim_msg.has_field("user_defined_script"):
rpc_ip.user_defined_script = nsd_cfg_prim_msg.user_defined_script
- tasks = []
- for config_plugin in self.nsm.config_agent_plugins:
- task, err = yield from config_plugin.apply_ns_config(
- nsr,
- vnfrs,
- rpc_ip)
- tasks.append(task)
- if err:
- rpc_op.job_status_details = err.decode()
-
- self.job_manager.add_job(rpc_op, tasks)
+ task = yield from self._apply_ns_config(
+ nsr,
+ vnfrs,
+ rpc_ip)
+
+ self.job_manager.add_job(rpc_op, [task])
else:
# Otherwise create VNF primitives.
for vnf in rpc_ip.vnf_list:
vnf_op.vnfr_id_ref = vnfr_id
vnf_op.member_vnf_index_ref = vnf_member_idx
+ idx = 0
for primitive in vnf.vnf_primitive:
op_primitive = vnf_op.vnf_out_primitive.add()
+ op_primitive.index = idx
+ idx += 1
op_primitive.name = primitive.name
op_primitive.execution_id = ''
- op_primitive.execution_status = 'completed'
+ op_primitive.execution_status = 'pending'
op_primitive.execution_error_details = ''
# Copy over the VNF pimitive's input parameters
nsr_param_pool.add_used_value(param.value)
for config_plugin in self.nsm.config_agent_plugins:
+ # TODO: Execute these in separate threads to prevent blocking
yield from config_plugin.vnf_config_primitive(nsr_id,
vnfr_id,
primitive,
@asyncio.coroutine
def on_get_ns_config_values_prepare(xact_info, action, ks_path, msg):
assert action == rwdts.QueryAction.RPC
+
+ if not self._project.rpc_check(msg, xact_info):
+ return
+
nsr_id = msg.nsr_id_ref
cfg_prim_name = msg.name
try:
handler=hdl_ns_get,
flags=rwdts.Flag.PUBLISHER,
)
-
-