#
import asyncio
+import os
+import re
+import tempfile
import time
+import yaml
+
+from . import riftcm_config_plugin
import rift.mano.config_agent
+
import gi
gi.require_version('RwDts', '1.0')
gi.require_version('RwNsrYang', '1.0')
self.job_manager = rift.mano.config_agent.ConfigAgentJobManager(dts, log, loop, nsm)
+ self._rift_install_dir = os.environ['RIFT_INSTALL']
+ self._rift_artif_dir = os.environ['RIFT_ARTIFACTS']
+
@property
def reghs(self):
""" Return registration handles """
raise ValueError("Could not find nsr/vnf {}/{} primitive {}"
.format(nsr_id, vnfr_id, primitive_name))
+ @asyncio.coroutine
+ def _apply_ns_config(self, agent_nsr, agent_vnfrs, rpc_ip):
+ """
+ Hook: Runs the user defined script. Feeds all the necessary data
+ for the script thro' yaml file.
+
+ TBD: Add support to pass multiple CA accounts if configures
+ Remove apply_ns_config from the Config Agent Plugins
+
+ Args:
+ rpc_ip (YangInput_Nsr_ExecNsConfigPrimitive): The input data.
+ nsr (NetworkServiceRecord): Description
+ vnfrs (dict): VNFR ID => VirtualNetworkFunctionRecord
+
+ """
+ def xlate(tag, tags):
+ # TBD
+ if tag is None or tags is None:
+ return tag
+ val = tag
+ if re.search('<.*>', tag):
+ try:
+ if tag == '<rw_mgmt_ip>':
+ val = tags['rw_mgmt_ip']
+ except KeyError as e:
+ self._log.info("RiftCA: Did not get a value for tag %s, e=%s",
+ tag, e)
+ return val
+
+ def get_meta(agent_nsr, agent_vnfrs):
+ unit_names, initial_params, vnfr_index_map, vnfr_data_map = {}, {}, {}, {}
+
+ for vnfr_id in agent_nsr.vnfr_ids:
+ vnfr = agent_vnfrs[vnfr_id]
+ self._log.debug("CA_RPC: VNFR metadata: {}".format(vnfr))
+
+ # index->vnfr ref
+ vnfr_index_map[vnfr.member_vnf_index] = vnfr_id
+ vnfr_data_dict = dict()
+ if 'mgmt_interface' in vnfr.vnfr:
+ vnfr_data_dict['mgmt_interface'] = vnfr.vnfr['mgmt_interface']
+
+ vnfr_data_dict['connection_point'] = []
+ if 'connection_point' in vnfr.vnfr:
+ for cp in vnfr.vnfr['connection_point']:
+ cp_dict = dict()
+ cp_dict['name'] = cp['name']
+ cp_dict['ip_address'] = cp['ip_address']
+ vnfr_data_dict['connection_point'].append(cp_dict)
+
+ try:
+ vnfr_data_dict['vdur'] = []
+ vdu_data = [(vdu['name'], vdu['management_ip'], vdu['vm_management_ip'], vdu['id'])
+ for vdu in vnfr.vnfr['vdur']]
+
+ for data in vdu_data:
+ data = dict(zip(['name', 'management_ip', 'vm_management_ip', 'id'] , data))
+ vnfr_data_dict['vdur'].append(data)
+
+ vnfr_data_map[vnfr.member_vnf_index] = vnfr_data_dict
+ except KeyError as e:
+ self._log.warn("Error getting VDU data for VNFR {}".format(vnfr))
+
+ # Unit name
+ unit_names[vnfr_id] = None
+ for config_plugin in self.nsm.config_agent_plugins:
+ name = config_plugin.get_service_name(vnfr_id)
+ if name:
+ unit_names[vnfr_id] = name
+ break
+
+ # Flatten the data for simplicity
+ param_data = {}
+ if 'initial_config_primitive' in vnfr.vnf_configuration:
+ for primitive in vnfr.vnf_configuration['initial_config_primitive']:
+ if 'parameter' in primitive:
+ for parameter in primitive['parameter']:
+ value = xlate(parameter['value'], vnfr.tags)
+ param_data[parameter.name] = value
+
+ initial_params[vnfr_id] = param_data
+
+
+ return unit_names, initial_params, vnfr_index_map, vnfr_data_map
+
+ def get_config_agent():
+ ret = {}
+ for config_plugin in self.nsm.config_agent_plugins:
+ if config_plugin.agent_type in [riftcm_config_plugin.DEFAULT_CAP_TYPE]:
+ ret = config_plugin.agent_data
+ else:
+ # Currently the first non default plugin is returned
+ return config_plugin.agent_data
+ return ret
+
+ unit_names, init_data, vnfr_index_map, vnf_data_map = get_meta(agent_nsr, agent_vnfrs)
+
+ # The data consists of 4 sections
+ # 1. Account data
+ # 2. The input passed.
+ # 3. Juju unit names (keyed by vnfr ID).
+ # 4. Initial config data (keyed by vnfr ID).
+ data = dict()
+ data['config_agent'] = get_config_agent()
+ data["rpc_ip"] = rpc_ip.as_dict()
+ data["unit_names"] = unit_names
+ data["init_config"] = init_data
+ data["vnfr_index_map"] = vnfr_index_map
+ data["vnfr_data_map"] = vnfr_data_map
+
+ tmp_file = None
+ with tempfile.NamedTemporaryFile(delete=False) as tmp_file:
+ tmp_file.write(yaml.dump(data, default_flow_style=True)
+ .encode("UTF-8"))
+
+ self._log.debug("CA_RPC: Creating a temp file {} with input data: {}".
+ format(tmp_file.name, data))
+
+ # Get the full path to the script
+ script = ''
+ if rpc_ip.user_defined_script[0] == '/':
+ # The script has full path, use as is
+ script = rpc_ip.user_defined_script
+ else:
+ script = os.path.join(self._rift_artif_dir, 'launchpad/libs', agent_nsr.id, 'scripts',
+ rpc_ip.user_defined_script)
+ self.log.debug("CA_RPC: Checking for script in %s", script)
+ if not os.path.exists(script):
+ script = os.path.join(self._rift_install_dir, 'usr/bin', rpc_ip.user_defined_script)
+
+ cmd = "{} {}".format(rpc_ip.user_defined_script, tmp_file.name)
+ self._log.debug("CA_RPC: Running the CMD: {}".format(cmd))
+
+ coro = asyncio.create_subprocess_shell(cmd, loop=self._loop,
+ stderr=asyncio.subprocess.PIPE)
+ process = yield from coro
+ err = yield from process.stderr.read()
+ task = self._loop.create_task(process.wait())
+
+ return task, err
+
@asyncio.coroutine
def register(self):
""" Register for NS monitoring read from dts """
rpc_ip.user_defined_script = nsd_cfg_prim_msg.user_defined_script
tasks = []
- for config_plugin in self.nsm.config_agent_plugins:
- task, err = yield from config_plugin.apply_ns_config(
- nsr,
- vnfrs,
- rpc_ip)
- tasks.append(task)
- if err:
- rpc_op.job_status_details = err.decode()
+ task, err = yield from self._apply_ns_config(
+ nsr,
+ vnfrs,
+ rpc_ip)
+ tasks.append(task)
+ if err:
+ rpc_op.job_status_details = err.decode()
self.job_manager.add_job(rpc_op, tasks)
else:
return self._api
@property
+ def agent_data(self):
+ return dict(
+ type=self.agent_type,
+ name=self.name,
+ host=self._ip_address,
+ port=self._port,
+ user=self._user,
+ secret=self._secret
+ )
+
def vnfr(self, vnfr_id):
try:
vnfr = self._juju_vnfs[vnfr_id].vnfr
return vnfr
+ def get_service_name(self, vnfr_id):
+ vnfr = self.vnfr(vnfr_id)
+ if vnfr and 'vnf_juju_name' in vnfr:
+ return vnfr['vnf_juju_name']
+ return None
+
def juju_log(self, level, name, log_str, *args):
if name is not None:
g_log_str = 'jujuCA:({}) {}'.format(name, log_str)
self._log.info("jujuCA:(%s) Action %s with params %s",
vnfr['vnf_juju_name'], action, params)
- resp = yield from self.api.execute_actions(action, params,
- service=service)
+ resp = yield from self.api.execute_action(action, params,
+ service=service)
if 'error' in resp:
- self._log.error("Applying initial config failed: {}".
- format(resp))
+ self._log.error("Applying initial config failed for {} with {}: {}".
+ format(action, params, resp))
return False
action_ids.append(resp['action']['tag'])
while pending:
pending = False
for act in action_ids:
- resp = yield from self.api.get_action_status(act, service=service)
+ resp = yield from self.api.get_action_status(act)
if 'error' in resp:
self._log.error("Initial config failed: {}".format(resp))
return False
'''
try:
- return self.api._get_action_status(execution_id)
+ self._log.debug("jujuCA: Get action status for {}".format(execution_id))
+ resp = self.api._get_action_status(execution_id)
+ self._log.debug("jujuCA: Action status: {}".format(resp))
+ return resp
except Exception as e:
self._log.error("jujuCA: Error fetching execution status for %s",
execution_id)