3 # Copyright 2016 RIFT.IO Inc
5 # Licensed under the Apache License, Version 2.0 (the "License");
6 # you may not use this file except in compliance with the License.
7 # You may obtain a copy of the License at
9 # http://www.apache.org/licenses/LICENSE-2.0
11 # Unless required by applicable law or agreed to in writing, software
12 # distributed under the License is distributed on an "AS IS" BASIS,
13 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 # See the License for the specific language governing permissions and
15 # limitations under the License.
26 from . import riftcm_config_plugin
27 import rift
.mano
.config_agent
30 gi
.require_version('RwDts', '1.0')
31 gi
.require_version('RwNsrYang', '1.0')
32 from gi
.repository
import (
37 class RiftCMRPCHandler(object):
38 """ The Network service Monitor DTS handler """
39 EXEC_NS_CONF_XPATH
= "I,/nsr:exec-ns-service-primitive"
40 EXEC_NS_CONF_O_XPATH
= "O,/nsr:exec-ns-service-primitive"
42 GET_NS_CONF_XPATH
= "I,/nsr:get-ns-service-primitive-values"
43 GET_NS_CONF_O_XPATH
= "O,/nsr:get-ns-service-primitive-values"
45 def __init__(self
, dts
, log
, loop
, project
, nsm
):
49 self
._project
= project
54 self
._get
_ns
_conf
_regh
= None
56 self
.job_manager
= rift
.mano
.config_agent
.ConfigAgentJobManager(dts
, log
, loop
,
59 self
._rift
_install
_dir
= os
.environ
['RIFT_INSTALL']
60 self
._rift
_artif
_dir
= os
.environ
['RIFT_ARTIFACTS']
64 """ Return registration handles """
65 return (self
._ns
_regh
, self
._vnf
_regh
, self
._get
_ns
_conf
_regh
)
69 """ Return the NS manager instance """
73 self
._log
.debug("De-register conman rpc handlers for project {}".
74 format(self
._project
))
75 for reg
in self
.reghs
:
80 self
.job_manager
.deregister()
82 def prepare_meta(self
, rpc_ip
):
85 nsr_id
= rpc_ip
.nsr_id_ref
86 nsr
= self
._nsm
.nsrs
[nsr_id
]
88 for vnfr
in nsr
.vnfrs
:
90 # vnfr is a dict containing all attributes
95 raise ValueError("Record not found", str(e
))
98 def _get_ns_cfg_primitive(self
, nsr_id
, ns_cfg_name
):
99 nsd_msg
= yield from self
._nsm
.get_nsd(nsr_id
)
101 def get_nsd_cfg_prim(name
):
102 for ns_cfg_prim
in nsd_msg
.service_primitive
:
103 if ns_cfg_prim
.name
== name
:
107 ns_cfg_prim_msg
= get_nsd_cfg_prim(ns_cfg_name
)
108 if ns_cfg_prim_msg
is not None:
109 ret_cfg_prim_msg
= ns_cfg_prim_msg
.deep_copy()
110 return ret_cfg_prim_msg
114 def _get_vnf_primitive(self
, vnfr_id
, nsr_id
, primitive_name
):
115 vnf
= self
._nsm
.get_vnfr_msg(vnfr_id
, nsr_id
)
116 self
._log
.debug("vnfr_msg:%s", vnf
)
118 self
._log
.debug("nsr/vnf {}/{}, vnf_configuration: %s",
119 vnf
.vnf_configuration
)
120 for primitive
in vnf
.vnf_configuration
.service_primitive
:
121 if primitive
.name
== primitive_name
:
124 raise ValueError("Could not find nsr/vnf {}/{} primitive {}"
125 .format(nsr_id
, vnfr_id
, primitive_name
))
128 def _apply_ns_config(self
, agent_nsr
, agent_vnfrs
, rpc_ip
):
130 Hook: Runs the user defined script. Feeds all the necessary data
131 for the script thro' yaml file.
133 TBD: Add support to pass multiple CA accounts if configures
134 Remove apply_ns_config from the Config Agent Plugins
137 rpc_ip (YangInput_Nsr_ExecNsConfigPrimitive): The input data.
138 nsr (NetworkServiceRecord): Description
139 vnfrs (dict): VNFR ID => VirtualNetworkFunctionRecord
142 def xlate(tag
, tags
):
144 if tag
is None or tags
is None:
147 if re
.search('<.*>', tag
):
149 if tag
== '<rw_mgmt_ip>':
150 val
= tags
['rw_mgmt_ip']
151 except KeyError as e
:
152 self
._log
.info("RiftCA: Did not get a value for tag %s, e=%s",
156 def get_meta(agent_nsr
, agent_vnfrs
):
157 unit_names
, initial_params
, vnfr_index_map
, vnfr_data_map
= {}, {}, {}, {}
159 for vnfr_id
in agent_nsr
.vnfr_ids
:
160 vnfr
= agent_vnfrs
[vnfr_id
]
161 self
._log
.debug("CA-RPC: VNFR metadata: {}".format(vnfr
))
164 vnfr_index_map
[vnfr
.member_vnf_index
] = vnfr_id
165 vnfr_data_dict
= dict()
166 if 'mgmt_interface' in vnfr
.vnfr
:
167 vnfr_data_dict
['mgmt_interface'] = vnfr
.vnfr
['mgmt_interface']
169 vnfr_data_dict
['connection_point'] = []
170 if 'connection_point' in vnfr
.vnfr
:
171 for cp
in vnfr
.vnfr
['connection_point']:
173 cp_dict
['name'] = cp
['name']
174 cp_dict
['ip_address'] = cp
['ip_address']
175 vnfr_data_dict
['connection_point'].append(cp_dict
)
178 vnfr_data_dict
['vdur'] = []
179 vdu_data
= [(vdu
['name'], vdu
['management_ip'], vdu
['vm_management_ip'], vdu
['id'])
180 for vdu
in vnfr
.vnfr
['vdur']]
182 for data
in vdu_data
:
183 data
= dict(zip(['name', 'management_ip', 'vm_management_ip', 'id'] , data
))
184 vnfr_data_dict
['vdur'].append(data
)
186 vnfr_data_map
[vnfr
.member_vnf_index
] = vnfr_data_dict
187 except KeyError as e
:
188 self
._log
.warn("Error getting VDU data for VNFR {}".format(vnfr
))
191 unit_names
[vnfr_id
] = None
192 for config_plugin
in self
.nsm
.config_agent_plugins
:
193 name
= config_plugin
.get_service_name(vnfr_id
)
195 unit_names
[vnfr_id
] = name
198 # Flatten the data for simplicity
200 if 'initial_config_primitive' in vnfr
.vnf_configuration
:
201 for primitive
in vnfr
.vnf_configuration
['initial_config_primitive']:
202 if 'parameter' in primitive
:
203 for parameter
in primitive
['parameter']:
205 value
= xlate(parameter
['value'], vnfr
.tags
)
206 param_data
[parameter
['name']] = value
207 except KeyError as e
:
208 self
._log
.warn("Unable to parse the parameter{}: {}".
211 initial_params
[vnfr_id
] = param_data
214 return unit_names
, initial_params
, vnfr_index_map
, vnfr_data_map
216 def get_config_agent():
218 for config_plugin
in self
.nsm
.config_agent_plugins
:
219 if config_plugin
.agent_type
in [riftcm_config_plugin
.DEFAULT_CAP_TYPE
]:
220 ret
= config_plugin
.agent_data
222 # Currently the first non default plugin is returned
223 return config_plugin
.agent_data
226 unit_names
, init_data
, vnfr_index_map
, vnfr_data_map
= get_meta(agent_nsr
, agent_vnfrs
)
228 # The data consists of 4 sections
230 # 2. The input passed.
231 # 3. Juju unit names (keyed by vnfr ID).
232 # 4. Initial config data (keyed by vnfr ID).
234 data
['config_agent'] = get_config_agent()
235 data
["rpc_ip"] = rpc_ip
.as_dict()
236 data
["unit_names"] = unit_names
237 data
["init_config"] = init_data
238 data
["vnfr_index_map"] = vnfr_index_map
239 data
["vnfr_data_map"] = vnfr_data_map
242 with tempfile
.NamedTemporaryFile(delete
=False) as tmp_file
:
243 tmp_file
.write(yaml
.dump(data
, default_flow_style
=True)
246 self
._log
.debug("CA-RPC: Creating a temp file {} with input data: {}".
247 format(tmp_file
.name
, data
))
249 # Get the full path to the script
251 if rpc_ip
.user_defined_script
[0] == '/':
252 # The script has full path, use as is
253 script
= rpc_ip
.user_defined_script
255 script
= os
.path
.join(self
._rift
_artif
_dir
, 'launchpad/packages/nsd',
256 agent_nsr
.id, 'scripts',
257 rpc_ip
.user_defined_script
)
258 self
._log
.debug("CA-RPC: Checking for script in %s", script
)
259 if not os
.path
.exists(script
):
260 script
= os
.path
.join(self
._rift
_install
_dir
, 'usr/bin', rpc_ip
.user_defined_script
)
262 cmd
= "{} {}".format(script
, tmp_file
.name
)
263 self
._log
.debug("CA-RPC: Running the CMD: {}".format(cmd
))
265 process
= asyncio
.create_subprocess_shell(cmd
, loop
=self
._loop
,
266 stderr
=asyncio
.subprocess
.PIPE
)
272 """ Register for NS monitoring read from dts """
273 yield from self
.job_manager
.register()
276 def on_ns_config_prepare(xact_info
, action
, ks_path
, msg
):
277 """ prepare callback from dts exec-ns-service-primitive"""
278 assert action
== rwdts
.QueryAction
.RPC
280 if not self
._project
.rpc_check(msg
, xact_info
):
284 rpc_op
= NsrYang
.YangOutput_Nsr_ExecNsServicePrimitive
.from_dict({
285 "triggered_by": rpc_ip
.triggered_by
,
286 "create_time": int(time
.time()),
287 "parameter": [param
.as_dict() for param
in rpc_ip
.parameter
],
288 "parameter_group": [pg
.as_dict() for pg
in rpc_ip
.parameter_group
]
292 ns_cfg_prim_name
= rpc_ip
.name
293 nsr_id
= rpc_ip
.nsr_id_ref
294 nsr
= self
._nsm
.nsrs
[nsr_id
]
296 nsd_cfg_prim_msg
= yield from self
._get
_ns
_cfg
_primitive
(nsr_id
, ns_cfg_prim_name
)
298 def find_nsd_vnf_prim_param_pool(vnf_index
, vnf_prim_name
, param_name
):
299 for vnf_prim_group
in nsd_cfg_prim_msg
.vnf_primitive_group
:
300 if vnf_prim_group
.member_vnf_index_ref
!= vnf_index
:
303 for vnf_prim
in vnf_prim_group
.primitive
:
304 if vnf_prim
.name
!= vnf_prim_name
:
308 nsr_param_pool
= nsr
.param_pools
[pool_param
.parameter_pool
]
310 raise ValueError("Parameter pool %s does not exist in nsr" % vnf_prim
.parameter_pool
)
312 self
._log
.debug("Found parameter pool %s for vnf index(%s), vnf_prim_name(%s), param_name(%s)",
313 nsr_param_pool
, vnf_index
, vnf_prim_name
, param_name
)
314 return nsr_param_pool
316 self
._log
.debug("Could not find parameter pool for vnf index(%s), vnf_prim_name(%s), param_name(%s)",
317 vnf_index
, vnf_prim_name
, param_name
)
320 rpc_op
.nsr_id_ref
= nsr_id
321 rpc_op
.name
= ns_cfg_prim_name
323 nsr
, vnfrs
= self
.prepare_meta(rpc_ip
)
324 rpc_op
.job_id
= nsr
.job_id
326 # Copy over the NS level Parameters
328 # Give preference to user defined script.
329 if nsd_cfg_prim_msg
and nsd_cfg_prim_msg
.has_field("user_defined_script"):
330 rpc_ip
.user_defined_script
= nsd_cfg_prim_msg
.user_defined_script
332 task
= yield from self
._apply
_ns
_config
(
337 self
.job_manager
.add_job(rpc_op
, [task
])
339 # Otherwise create VNF primitives.
340 for vnf
in rpc_ip
.vnf_list
:
341 vnf_op
= rpc_op
.vnf_out_list
.add()
342 vnf_member_idx
= vnf
.member_vnf_index_ref
343 vnfr_id
= vnf
.vnfr_id_ref
344 vnf_op
.vnfr_id_ref
= vnfr_id
345 vnf_op
.member_vnf_index_ref
= vnf_member_idx
348 for primitive
in vnf
.vnf_primitive
:
349 op_primitive
= vnf_op
.vnf_out_primitive
.add()
350 op_primitive
.index
= idx
352 op_primitive
.name
= primitive
.name
353 op_primitive
.execution_id
= ''
354 op_primitive
.execution_status
= 'completed'
355 op_primitive
.execution_error_details
= ''
357 # Copy over the VNF pimitive's input parameters
358 for param
in primitive
.parameter
:
359 output_param
= op_primitive
.parameter
.add()
360 output_param
.name
= param
.name
361 output_param
.value
= param
.value
363 self
._log
.debug("%s:%s Got primitive %s:%s",
364 nsr_id
, vnf
.member_vnf_index_ref
, primitive
.name
, primitive
.parameter
)
366 nsd_vnf_primitive
= yield from self
._get
_vnf
_primitive
(
371 for param
in nsd_vnf_primitive
.parameter
:
372 if not param
.has_field("parameter_pool"):
376 nsr_param_pool
= nsr
.param_pools
[param
.parameter_pool
]
378 raise ValueError("Parameter pool %s does not exist in nsr" % param
.parameter_pool
)
379 nsr_param_pool
.add_used_value(param
.value
)
381 for config_plugin
in self
.nsm
.config_agent_plugins
:
382 yield from config_plugin
.vnf_config_primitive(nsr_id
,
387 self
.job_manager
.add_job(rpc_op
)
390 # Find Config Primitive
391 # For each vnf-primitive with parameter pool
392 # Find parameter pool
393 # Add used value to the pool
394 self
._log
.debug("RPC output: {}".format(rpc_op
))
395 xact_info
.respond_xpath(rwdts
.XactRspCode
.ACK
,
396 RiftCMRPCHandler
.EXEC_NS_CONF_O_XPATH
,
398 except Exception as e
:
399 self
._log
.error("Exception processing the "
400 "exec-ns-service-primitive: {}".format(e
))
401 self
._log
.exception(e
)
402 xact_info
.respond_xpath(rwdts
.XactRspCode
.NACK
,
403 RiftCMRPCHandler
.EXEC_NS_CONF_O_XPATH
)
406 def on_get_ns_config_values_prepare(xact_info
, action
, ks_path
, msg
):
407 assert action
== rwdts
.QueryAction
.RPC
409 if not self
._project
.rpc_check(msg
, xact_info
):
412 nsr_id
= msg
.nsr_id_ref
413 cfg_prim_name
= msg
.name
415 nsr
= self
._nsm
.nsrs
[nsr_id
]
417 rpc_op
= NsrYang
.YangOutput_Nsr_GetNsServicePrimitiveValues()
419 ns_cfg_prim_msg
= yield from self
._get
_ns
_cfg
_primitive
(nsr_id
, cfg_prim_name
)
421 # Get pool values for NS-level parameters
422 for ns_param
in ns_cfg_prim_msg
.parameter
:
423 if not ns_param
.has_field("parameter_pool"):
427 nsr_param_pool
= nsr
.param_pools
[ns_param
.parameter_pool
]
429 raise ValueError("Parameter pool %s does not exist in nsr" % ns_param
.parameter_pool
)
431 new_ns_param
= rpc_op
.ns_parameter
.add()
432 new_ns_param
.name
= ns_param
.name
433 new_ns_param
.value
= str(nsr_param_pool
.get_next_unused_value())
435 # Get pool values for NS-level parameters
436 for vnf_prim_group
in ns_cfg_prim_msg
.vnf_primitive_group
:
437 rsp_prim_group
= rpc_op
.vnf_primitive_group
.add()
438 rsp_prim_group
.member_vnf_index_ref
= vnf_prim_group
.member_vnf_index_ref
439 if vnf_prim_group
.has_field("vnfd_id_ref"):
440 rsp_prim_group
.vnfd_id_ref
= vnf_prim_group
.vnfd_id_ref
442 for index
, vnf_prim
in enumerate(vnf_prim_group
.primitive
):
443 rsp_prim
= rsp_prim_group
.primitive
.add()
444 rsp_prim
.name
= vnf_prim
.name
445 rsp_prim
.index
= index
446 vnf_primitive
= yield from self
._get
_vnf
_primitive
(
447 vnf_prim_group
.vnfd_id_ref
,
451 for param
in vnf_primitive
.parameter
:
452 if not param
.has_field("parameter_pool"):
455 # Get pool values for NS-level parameters
456 for ns_param
in ns_cfg_prim_msg
.parameter
:
457 if not ns_param
.has_field("parameter_pool"):
461 nsr_param_pool
= nsr
.param_pools
[ns_param
.parameter_pool
]
463 raise ValueError("Parameter pool %s does not exist in nsr" % ns_param
.parameter_pool
)
465 new_ns_param
= rpc_op
.ns_parameter
.add()
466 new_ns_param
.name
= ns_param
.name
467 new_ns_param
.value
= str(nsr_param_pool
.get_next_unused_value())
469 # Get pool values for NS-level parameters
470 for vnf_prim_group
in ns_cfg_prim_msg
.vnf_primitive_group
:
471 rsp_prim_group
= rpc_op
.vnf_primitive_group
.add()
472 rsp_prim_group
.member_vnf_index_ref
= vnf_prim_group
.member_vnf_index_ref
473 if vnf_prim_group
.has_field("vnfd_id_ref"):
474 rsp_prim_group
.vnfd_id_ref
= vnf_prim_group
.vnfd_id_ref
476 for index
, vnf_prim
in enumerate(vnf_prim_group
.primitive
):
477 rsp_prim
= rsp_prim_group
.primitive
.add()
478 rsp_prim
.name
= vnf_prim
.name
479 rsp_prim
.index
= index
480 vnf_primitive
= yield from self
._get
_vnf
_primitive
(
482 vnf_prim_group
.member_vnf_index_ref
,
485 for param
in vnf_primitive
.parameter
:
486 if not param
.has_field("parameter_pool"):
490 nsr_param_pool
= nsr
.param_pools
[param
.parameter_pool
]
492 raise ValueError("Parameter pool %s does not exist in nsr" % vnf_prim
.parameter_pool
)
494 vnf_param
= rsp_prim
.parameter
.add()
495 vnf_param
.name
= param
.name
496 vnf_param
.value
= str(nsr_param_pool
.get_next_unused_value())
498 self
._log
.debug("RPC output: {}".format(rpc_op
))
499 xact_info
.respond_xpath(rwdts
.XactRspCode
.ACK
,
500 RiftCMRPCHandler
.GET_NS_CONF_O_XPATH
, rpc_op
)
501 except Exception as e
:
502 self
._log
.error("Exception processing the "
503 "get-ns-service-primitive-values: {}".format(e
))
504 self
._log
.exception(e
)
505 xact_info
.respond_xpath(rwdts
.XactRspCode
.NACK
,
506 RiftCMRPCHandler
.GET_NS_CONF_O_XPATH
)
508 hdl_ns
= rift
.tasklets
.DTS
.RegistrationHandler(on_prepare
=on_ns_config_prepare
,)
509 hdl_ns_get
= rift
.tasklets
.DTS
.RegistrationHandler(on_prepare
=on_get_ns_config_values_prepare
,)
511 with self
._dts
.group_create() as group
:
512 self
._ns
_regh
= group
.register(xpath
=RiftCMRPCHandler
.EXEC_NS_CONF_XPATH
,
514 flags
=rwdts
.Flag
.PUBLISHER
,
516 self
._get
_ns
_conf
_regh
= group
.register(xpath
=RiftCMRPCHandler
.GET_NS_CONF_XPATH
,
518 flags
=rwdts
.Flag
.PUBLISHER
,