X-Git-Url: https://osm.etsi.org/gitweb/?a=blobdiff_plain;f=rwcm%2Fplugins%2Frwconman%2Frift%2Ftasklets%2Frwconmantasklet%2FRiftCM_rpc.py;h=4a8dab8d824a6f4cf857e6bb5d161c789e46aff7;hb=9636c0930ddd937c70c08ce693592993b25dd46b;hp=9155d84bf1d4132460d735379f0c9b6b31aaed6c;hpb=6f07e6f33f751ab4ffe624f6037f887b243bece2;p=osm%2FSO.git diff --git a/rwcm/plugins/rwconman/rift/tasklets/rwconmantasklet/RiftCM_rpc.py b/rwcm/plugins/rwconman/rift/tasklets/rwconmantasklet/RiftCM_rpc.py index 9155d84b..4a8dab8d 100644 --- a/rwcm/plugins/rwconman/rift/tasklets/rwconmantasklet/RiftCM_rpc.py +++ b/rwcm/plugins/rwconman/rift/tasklets/rwconmantasklet/RiftCM_rpc.py @@ -16,9 +16,16 @@ # import asyncio +import os +import re +import tempfile import time +import yaml + +from . import riftcm_config_plugin import rift.mano.config_agent + import gi gi.require_version('RwDts', '1.0') gi.require_version('RwNsrYang', '1.0') @@ -47,6 +54,9 @@ class RiftCMRPCHandler(object): self.job_manager = rift.mano.config_agent.ConfigAgentJobManager(dts, log, loop, nsm) + self._rift_install_dir = os.environ['RIFT_INSTALL'] + self._rift_artif_dir = os.environ['RIFT_ARTIFACTS'] + @property def reghs(self): """ Return registration handles """ @@ -102,6 +112,149 @@ class RiftCMRPCHandler(object): raise ValueError("Could not find nsr/vnf {}/{} primitive {}" .format(nsr_id, vnfr_id, primitive_name)) + @asyncio.coroutine + def _apply_ns_config(self, agent_nsr, agent_vnfrs, rpc_ip): + """ + Hook: Runs the user defined script. Feeds all the necessary data + for the script thro' yaml file. + + TBD: Add support to pass multiple CA accounts if configures + Remove apply_ns_config from the Config Agent Plugins + + Args: + rpc_ip (YangInput_Nsr_ExecNsConfigPrimitive): The input data. + nsr (NetworkServiceRecord): Description + vnfrs (dict): VNFR ID => VirtualNetworkFunctionRecord + + """ + def xlate(tag, tags): + # TBD + if tag is None or tags is None: + return tag + val = tag + if re.search('<.*>', tag): + try: + if tag == '': + val = tags['rw_mgmt_ip'] + except KeyError as e: + self._log.info("RiftCA: Did not get a value for tag %s, e=%s", + tag, e) + return val + + def get_meta(agent_nsr, agent_vnfrs): + unit_names, initial_params, vnfr_index_map, vnfr_data_map = {}, {}, {}, {} + + for vnfr_id in agent_nsr.vnfr_ids: + vnfr = agent_vnfrs[vnfr_id] + self._log.debug("CA-RPC: VNFR metadata: {}".format(vnfr)) + + # index->vnfr ref + vnfr_index_map[vnfr.member_vnf_index] = vnfr_id + vnfr_data_dict = dict() + if 'mgmt_interface' in vnfr.vnfr: + vnfr_data_dict['mgmt_interface'] = vnfr.vnfr['mgmt_interface'] + + vnfr_data_dict['connection_point'] = [] + if 'connection_point' in vnfr.vnfr: + for cp in vnfr.vnfr['connection_point']: + cp_dict = dict() + cp_dict['name'] = cp['name'] + cp_dict['ip_address'] = cp['ip_address'] + vnfr_data_dict['connection_point'].append(cp_dict) + + try: + vnfr_data_dict['vdur'] = [] + vdu_data = [(vdu['name'], vdu['management_ip'], vdu['vm_management_ip'], vdu['id'], vdu['vdu_id_ref']) + for vdu in vnfr.vnfr['vdur']] + + for data in vdu_data: + data = dict(zip(['name', 'management_ip', 'vm_management_ip', 'id', 'vdu_id_ref'] , data)) + vnfr_data_dict['vdur'].append(data) + + vnfr_data_map[vnfr.member_vnf_index] = vnfr_data_dict + except KeyError as e: + self._log.warn("Error getting VDU data for VNFR {}".format(vnfr)) + + # Unit name + unit_names[vnfr_id] = None + for config_plugin in self.nsm.config_agent_plugins: + name = config_plugin.get_service_name(vnfr_id) + if name: + unit_names[vnfr_id] = name + break + + # Flatten the data for simplicity + param_data = {} + if 'initial_config_primitive' in vnfr.vnf_configuration: + for primitive in vnfr.vnf_configuration['initial_config_primitive']: + if 'parameter' in primitive: + for parameter in primitive['parameter']: + try: + value = xlate(parameter['value'], vnfr.tags) + param_data[parameter['name']] = value + except KeyError as e: + self._log.warn("Unable to parse the parameter{}: {}". + format(parameter)) + + initial_params[vnfr_id] = param_data + + + return unit_names, initial_params, vnfr_index_map, vnfr_data_map + + def get_config_agent(): + ret = {} + for config_plugin in self.nsm.config_agent_plugins: + if config_plugin.agent_type in [riftcm_config_plugin.DEFAULT_CAP_TYPE]: + ret = config_plugin.agent_data + else: + # Currently the first non default plugin is returned + return config_plugin.agent_data + return ret + + unit_names, init_data, vnfr_index_map, vnfr_data_map = get_meta(agent_nsr, agent_vnfrs) + + # The data consists of 4 sections + # 1. Account data + # 2. The input passed. + # 3. Juju unit names (keyed by vnfr ID). + # 4. Initial config data (keyed by vnfr ID). + data = dict() + data['config_agent'] = get_config_agent() + data["rpc_ip"] = rpc_ip.as_dict() + data["unit_names"] = unit_names + data["init_config"] = init_data + data["vnfr_index_map"] = vnfr_index_map + data["vnfr_data_map"] = vnfr_data_map + + tmp_file = None + with tempfile.NamedTemporaryFile(delete=False) as tmp_file: + tmp_file.write(yaml.dump(data, default_flow_style=True) + .encode("UTF-8")) + + self._log.debug("CA-RPC: Creating a temp file {} with input data: {}". + format(tmp_file.name, data)) + + # Get the full path to the script + script = '' + if rpc_ip.user_defined_script[0] == '/': + # The script has full path, use as is + script = rpc_ip.user_defined_script + else: + script = os.path.join(self._rift_artif_dir, 'launchpad/packages/nsd', + agent_nsr.id, 'scripts', + rpc_ip.user_defined_script) + self._log.debug("CA-RPC: Checking for script in %s", script) + if not os.path.exists(script): + script = os.path.join(self._rift_install_dir, 'usr/bin', rpc_ip.user_defined_script) + + cmd = "{} {}".format(script, tmp_file.name) + self._log.debug("CA-RPC: Running the CMD: {}".format(cmd)) + + process = asyncio.create_subprocess_shell(cmd, loop=self._loop, + stderr=asyncio.subprocess.PIPE) + + return process + @asyncio.coroutine def register(self): """ Register for NS monitoring read from dts """ @@ -160,17 +313,12 @@ class RiftCMRPCHandler(object): if nsd_cfg_prim_msg and nsd_cfg_prim_msg.has_field("user_defined_script"): rpc_ip.user_defined_script = nsd_cfg_prim_msg.user_defined_script - tasks = [] - for config_plugin in self.nsm.config_agent_plugins: - task, err = yield from config_plugin.apply_ns_config( - nsr, - vnfrs, - rpc_ip) - tasks.append(task) - if err: - rpc_op.job_status_details = err.decode() - - self.job_manager.add_job(rpc_op, tasks) + task = yield from self._apply_ns_config( + nsr, + vnfrs, + rpc_ip) + + self.job_manager.add_job(rpc_op, [task]) else: # Otherwise create VNF primitives. for vnf in rpc_ip.vnf_list: @@ -180,8 +328,11 @@ class RiftCMRPCHandler(object): vnf_op.vnfr_id_ref = vnfr_id vnf_op.member_vnf_index_ref = vnf_member_idx + idx = 0 for primitive in vnf.vnf_primitive: op_primitive = vnf_op.vnf_out_primitive.add() + op_primitive.index = idx + idx += 1 op_primitive.name = primitive.name op_primitive.execution_id = '' op_primitive.execution_status = 'completed'