3 # Copyright 2016 RIFT.IO Inc
5 # Licensed under the Apache License, Version 2.0 (the "License");
6 # you may not use this file except in compliance with the License.
7 # You may obtain a copy of the License at
9 # http://www.apache.org/licenses/LICENSE-2.0
11 # Unless required by applicable law or agreed to in writing, software
12 # distributed under the License is distributed on an "AS IS" BASIS,
13 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 # See the License for the specific language governing permissions and
15 # limitations under the License.
26 from . import riftcm_config_plugin
27 import rift
.mano
.config_agent
30 gi
.require_version('RwDts', '1.0')
31 gi
.require_version('RwNsrYang', '1.0')
32 from gi
.repository
import (
37 class RiftCMRPCHandler(object):
38 """ DTS handler for the exec-ns-service-primitive and get-ns-service-primitive-values RPCs """
39 EXEC_NS_CONF_XPATH
= "I,/nsr:exec-ns-service-primitive"
40 EXEC_NS_CONF_O_XPATH
= "O,/nsr:exec-ns-service-primitive"
42 GET_NS_CONF_XPATH
= "I,/nsr:get-ns-service-primitive-values"
43 GET_NS_CONF_O_XPATH
= "O,/nsr:get-ns-service-primitive-values"
45 def __init__(self
, dts
, log
, loop
, project
, nsm
):
49 self
._project
= project
54 self
._get
_ns
_conf
_regh
= None
56 self
.job_manager
= rift
.mano
.config_agent
.ConfigAgentJobManager(dts
, log
, loop
,
59 self
._rift
_install
_dir
= os
.environ
['RIFT_INSTALL']
60 self
._rift
_var
_root
_dir
= os
.environ
['RIFT_VAR_ROOT']
64 """ Return registration handles """
65 return (self
._ns
_regh
, self
._vnf
_regh
, self
._get
_ns
_conf
_regh
)
69 """ Return the NS manager instance """
73 self
._log
.debug("De-register conman rpc handlers for project {}".
74 format(self
._project
))
75 for reg
in self
.reghs
:
80 self
.job_manager
.deregister()
81 self
.job_manager
= None
83 def prepare_meta(self
, rpc_ip
):
86 nsr_id
= rpc_ip
.nsr_id_ref
87 nsr
= self
._nsm
.nsrs
[nsr_id
]
89 for vnfr
in nsr
.vnfrs
:
91 # vnfr is a dict containing all attributes
96 raise ValueError("Record not found", str(e
))
99 def _get_ns_cfg_primitive(self
, nsr_id
, ns_cfg_name
):
100 nsd_msg
= yield from self
._nsm
.get_nsd(nsr_id
)
102 def get_nsd_cfg_prim(name
):
103 for ns_cfg_prim
in nsd_msg
.service_primitive
:
104 if ns_cfg_prim
.name
== name
:
108 ns_cfg_prim_msg
= get_nsd_cfg_prim(ns_cfg_name
)
109 if ns_cfg_prim_msg
is not None:
110 ret_cfg_prim_msg
= ns_cfg_prim_msg
.deep_copy()
111 return ret_cfg_prim_msg
115 def _get_vnf_primitive(self
, vnfr_id
, nsr_id
, primitive_name
):
116 vnf
= self
._nsm
.get_vnfr_msg(vnfr_id
, nsr_id
)
117 self
._log
.debug("vnfr_msg:%s", vnf
)
119 self
._log
.debug("nsr/vnf {}/{}, vnf_configuration: %s",
120 vnf
.vnf_configuration
)
121 for primitive
in vnf
.vnf_configuration
.config_primitive
:
122 if primitive
.name
== primitive_name
:
125 raise ValueError("Could not find nsr/vnf {}/{} primitive {}"
126 .format(nsr_id
, vnfr_id
, primitive_name
))
129 def _apply_ns_config(self
, agent_nsr
, agent_vnfrs
, rpc_ip
):
131 Hook: Runs the user defined script. Feeds all the necessary data
132 for the script through a YAML file.
134 TBD: Add support to pass multiple CA accounts if configured
135 Remove apply_ns_config from the Config Agent Plugins
138 rpc_ip (YangInput_Nsr_ExecNsConfigPrimitive): The input data.
139 agent_nsr (NetworkServiceRecord): NS record; supplies the VNFR ids and the NSD id
140 agent_vnfrs (dict): VNFR ID => VirtualNetworkFunctionRecord
143 def xlate(tag
, tags
):
145 if tag
is None or tags
is None:
148 if re
.search('<.*>', tag
):
150 if tag
== '<rw_mgmt_ip>':
151 val
= tags
['rw_mgmt_ip']
152 except KeyError as e
:
153 self
._log
.info("RiftCA: Did not get a value for tag %s, e=%s",
157 def get_meta(agent_nsr
, agent_vnfrs
):
158 unit_names
, initial_params
, vnfr_index_map
, vnfr_data_map
= {}, {}, {}, {}
160 for vnfr_id
in agent_nsr
.vnfr_ids
:
161 vnfr
= agent_vnfrs
[vnfr_id
]
162 self
._log
.debug("CA-RPC: VNFR metadata: {}".format(vnfr
))
165 vnfr_index_map
[vnfr
.member_vnf_index
] = vnfr_id
166 vnfr_data_dict
= dict()
167 if 'mgmt_interface' in vnfr
.vnfr
:
168 vnfr_data_dict
['mgmt_interface'] = vnfr
.vnfr
['mgmt_interface']
170 vnfr_data_dict
['name'] = vnfr
.vnfr
['name']
171 vnfr_data_dict
['connection_point'] = []
172 if 'connection_point' in vnfr
.vnfr
:
173 for cp
in vnfr
.vnfr
['connection_point']:
174 cp_dict
= dict(name
= cp
['name'],
175 ip_address
= cp
['ip_address'],
176 connection_point_id
= cp
['connection_point_id'])
177 if 'virtual_cps' in cp
:
178 cp_info
['virtual_cps'] = [ {k
:v
for k
,v
in vcp
.items()
179 if k
in ['ip_address', 'mac_address']}
180 for vcp
in cp
['virtual_cps'] ]
182 vnfr_data_dict
['connection_point'].append(cp_dict
)
185 vnfr_data_dict
['vdur'] = []
186 vdu_data
= [(vdu
['name'], vdu
['management_ip'], vdu
['vm_management_ip'], vdu
['id'], vdu
['vdu_id_ref'])
187 for vdu
in vnfr
.vnfr
['vdur']]
189 for data
in vdu_data
:
190 data
= dict(zip(['name', 'management_ip', 'vm_management_ip', 'id', 'vdu_id_ref'] , data
))
191 vnfr_data_dict
['vdur'].append(data
)
193 vnfr_data_map
[vnfr
.member_vnf_index
] = vnfr_data_dict
194 except KeyError as e
:
195 self
._log
.warn("Error getting VDU data for VNFR {}".format(vnfr
))
198 unit_names
[vnfr_id
] = None
199 for config_plugin
in self
.nsm
.config_agent_plugins
:
200 name
= config_plugin
.get_service_name(vnfr_id
)
202 unit_names
[vnfr_id
] = name
205 # Flatten the data for simplicity
207 if 'initial_config_primitive' in vnfr
.vnf_configuration
:
208 for primitive
in vnfr
.vnf_configuration
['initial_config_primitive']:
209 if 'parameter' in primitive
:
210 for parameter
in primitive
['parameter']:
212 value
= xlate(parameter
['value'], vnfr
.tags
)
213 param_data
[parameter
['name']] = value
214 except KeyError as e
:
215 self
._log
.warn("Unable to parse the parameter{}: {}".
218 initial_params
[vnfr_id
] = param_data
221 return unit_names
, initial_params
, vnfr_index_map
, vnfr_data_map
223 def get_config_agent():
225 for config_plugin
in self
.nsm
.config_agent_plugins
:
226 if config_plugin
.agent_type
in [riftcm_config_plugin
.DEFAULT_CAP_TYPE
]:
227 ret
= config_plugin
.agent_data
229 # Currently the first non default plugin is returned
230 return config_plugin
.agent_data
233 unit_names
, init_data
, vnfr_index_map
, vnfr_data_map
= get_meta(agent_nsr
, agent_vnfrs
)
235 # The data consists of 4 sections
237 # 2. The input passed.
238 # 3. Juju unit names (keyed by vnfr ID).
239 # 4. Initial config data (keyed by vnfr ID).
241 data
['config_agent'] = get_config_agent()
242 data
["rpc_ip"] = rpc_ip
.as_dict()
243 data
["unit_names"] = unit_names
244 data
["init_config"] = init_data
245 data
["vnfr_index_map"] = vnfr_index_map
246 data
["vnfr_data_map"] = vnfr_data_map
249 with tempfile
.NamedTemporaryFile(delete
=False) as tmp_file
:
250 tmp_file
.write(yaml
.dump(data
, default_flow_style
=True)
253 self
._log
.debug("CA-RPC: Creating a temp file {} with input data: {}".
254 format(tmp_file
.name
, data
))
256 # Get the full path to the script
258 if rpc_ip
.user_defined_script
[0] == '/':
259 # The script has full path, use as is
260 script
= rpc_ip
.user_defined_script
262 script
= os
.path
.join(self
._rift
_var
_root
_dir
,
263 'launchpad/packages/nsd',
265 agent_nsr
.nsd_id
, 'scripts',
266 rpc_ip
.user_defined_script
)
267 self
._log
.debug("CA-RPC: Checking for script in %s", script
)
269 cmd
= "{} {}".format(script
, tmp_file
.name
)
270 self
._log
.debug("CA-RPC: Running the CMD: {}".format(cmd
))
272 process
= yield from asyncio
.create_subprocess_shell(
279 """ Register the NS service-primitive RPC handlers with dts """
280 yield from self
.job_manager
.register()
283 def on_ns_config_prepare(xact_info
, action
, ks_path
, msg
):
284 """ prepare callback from dts exec-ns-service-primitive"""
285 assert action
== rwdts
.QueryAction
.RPC
287 if not self
._project
.rpc_check(msg
, xact_info
):
291 rpc_op
= NsrYang
.YangOutput_Nsr_ExecNsServicePrimitive
.from_dict({
292 "triggered_by": rpc_ip
.triggered_by
,
293 "create_time": int(time
.time()),
294 "parameter": [param
.as_dict() for param
in rpc_ip
.parameter
],
295 "parameter_group": [pg
.as_dict() for pg
in rpc_ip
.parameter_group
]
299 ns_cfg_prim_name
= rpc_ip
.name
300 nsr_id
= rpc_ip
.nsr_id_ref
301 nsr
= self
._nsm
.nsrs
[nsr_id
]
303 nsd_cfg_prim_msg
= yield from self
._get
_ns
_cfg
_primitive
(nsr_id
, ns_cfg_prim_name
)
305 def find_nsd_vnf_prim_param_pool(vnf_index
, vnf_prim_name
, param_name
):
306 for vnf_prim_group
in nsd_cfg_prim_msg
.vnf_primitive_group
:
307 if vnf_prim_group
.member_vnf_index_ref
!= vnf_index
:
310 for vnf_prim
in vnf_prim_group
.primitive
:
311 if vnf_prim
.name
!= vnf_prim_name
:
315 nsr_param_pool
= nsr
.param_pools
[pool_param
.parameter_pool
]
317 raise ValueError("Parameter pool %s does not exist in nsr" % vnf_prim
.parameter_pool
)
319 self
._log
.debug("Found parameter pool %s for vnf index(%s), vnf_prim_name(%s), param_name(%s)",
320 nsr_param_pool
, vnf_index
, vnf_prim_name
, param_name
)
321 return nsr_param_pool
323 self
._log
.debug("Could not find parameter pool for vnf index(%s), vnf_prim_name(%s), param_name(%s)",
324 vnf_index
, vnf_prim_name
, param_name
)
327 rpc_op
.nsr_id_ref
= nsr_id
328 rpc_op
.name
= ns_cfg_prim_name
330 nsr
, vnfrs
= self
.prepare_meta(rpc_ip
)
331 rpc_op
.job_id
= nsr
.job_id
333 # Copy over the NS level Parameters
335 # Give preference to user defined script.
336 if nsd_cfg_prim_msg
and nsd_cfg_prim_msg
.has_field("user_defined_script"):
337 rpc_ip
.user_defined_script
= nsd_cfg_prim_msg
.user_defined_script
339 task
= yield from self
._apply
_ns
_config
(
344 self
.job_manager
.add_job(rpc_op
, [task
])
346 # Otherwise create VNF primitives.
347 for vnf
in rpc_ip
.vnf_list
:
348 vnf_op
= rpc_op
.vnf_out_list
.add()
349 vnf_member_idx
= vnf
.member_vnf_index_ref
350 vnfr_id
= vnf
.vnfr_id_ref
351 vnf_op
.vnfr_id_ref
= vnfr_id
352 vnf_op
.member_vnf_index_ref
= vnf_member_idx
355 for primitive
in vnf
.vnf_primitive
:
356 op_primitive
= vnf_op
.vnf_out_primitive
.add()
357 op_primitive
.index
= idx
359 op_primitive
.name
= primitive
.name
360 op_primitive
.execution_id
= ''
361 op_primitive
.execution_status
= 'pending'
362 op_primitive
.execution_error_details
= ''
364 # Copy over the VNF primitive's input parameters
365 for param
in primitive
.parameter
:
366 output_param
= op_primitive
.parameter
.add()
367 output_param
.name
= param
.name
368 output_param
.value
= param
.value
370 self
._log
.debug("%s:%s Got primitive %s:%s",
371 nsr_id
, vnf
.member_vnf_index_ref
, primitive
.name
, primitive
.parameter
)
373 nsd_vnf_primitive
= yield from self
._get
_vnf
_primitive
(
378 for param
in nsd_vnf_primitive
.parameter
:
379 if not param
.has_field("parameter_pool"):
383 nsr_param_pool
= nsr
.param_pools
[param
.parameter_pool
]
385 raise ValueError("Parameter pool %s does not exist in nsr" % param
.parameter_pool
)
386 nsr_param_pool
.add_used_value(param
.value
)
388 for config_plugin
in self
.nsm
.config_agent_plugins
:
389 # TODO: Execute these in separate threads to prevent blocking
390 yield from config_plugin
.vnf_config_primitive(nsr_id
,
395 self
.job_manager
.add_job(rpc_op
)
398 # Find Config Primitive
399 # For each vnf-primitive with parameter pool
400 # Find parameter pool
401 # Add used value to the pool
402 self
._log
.debug("RPC output: {}".format(rpc_op
))
403 xact_info
.respond_xpath(rwdts
.XactRspCode
.ACK
,
404 RiftCMRPCHandler
.EXEC_NS_CONF_O_XPATH
,
406 except Exception as e
:
407 self
._log
.error("Exception processing the "
408 "exec-ns-service-primitive: {}".format(e
))
409 self
._log
.exception(e
)
410 xact_info
.respond_xpath(rwdts
.XactRspCode
.NACK
,
411 RiftCMRPCHandler
.EXEC_NS_CONF_O_XPATH
)
414 def on_get_ns_config_values_prepare(xact_info
, action
, ks_path
, msg
):
415 assert action
== rwdts
.QueryAction
.RPC
417 if not self
._project
.rpc_check(msg
, xact_info
):
420 nsr_id
= msg
.nsr_id_ref
421 cfg_prim_name
= msg
.name
423 nsr
= self
._nsm
.nsrs
[nsr_id
]
425 rpc_op
= NsrYang
.YangOutput_Nsr_GetNsServicePrimitiveValues()
427 ns_cfg_prim_msg
= yield from self
._get
_ns
_cfg
_primitive
(nsr_id
, cfg_prim_name
)
429 # Get pool values for NS-level parameters
430 for ns_param
in ns_cfg_prim_msg
.parameter
:
431 if not ns_param
.has_field("parameter_pool"):
435 nsr_param_pool
= nsr
.param_pools
[ns_param
.parameter_pool
]
437 raise ValueError("Parameter pool %s does not exist in nsr" % ns_param
.parameter_pool
)
439 new_ns_param
= rpc_op
.ns_parameter
.add()
440 new_ns_param
.name
= ns_param
.name
441 new_ns_param
.value
= str(nsr_param_pool
.get_next_unused_value())
443 # Get pool values for VNF-level primitive parameters
444 for vnf_prim_group
in ns_cfg_prim_msg
.vnf_primitive_group
:
445 rsp_prim_group
= rpc_op
.vnf_primitive_group
.add()
446 rsp_prim_group
.member_vnf_index_ref
= vnf_prim_group
.member_vnf_index_ref
447 if vnf_prim_group
.has_field("vnfd_id_ref"):
448 rsp_prim_group
.vnfd_id_ref
= vnf_prim_group
.vnfd_id_ref
450 for index
, vnf_prim
in enumerate(vnf_prim_group
.primitive
):
451 rsp_prim
= rsp_prim_group
.primitive
.add()
452 rsp_prim
.name
= vnf_prim
.name
453 rsp_prim
.index
= index
454 vnf_primitive
= yield from self
._get
_vnf
_primitive
(
455 vnf_prim_group
.vnfd_id_ref
,
459 for param
in vnf_primitive
.parameter
:
460 if not param
.has_field("parameter_pool"):
463 # Get pool values for NS-level parameters
464 for ns_param
in ns_cfg_prim_msg
.parameter
:
465 if not ns_param
.has_field("parameter_pool"):
469 nsr_param_pool
= nsr
.param_pools
[ns_param
.parameter_pool
]
471 raise ValueError("Parameter pool %s does not exist in nsr" % ns_param
.parameter_pool
)
473 new_ns_param
= rpc_op
.ns_parameter
.add()
474 new_ns_param
.name
= ns_param
.name
475 new_ns_param
.value
= str(nsr_param_pool
.get_next_unused_value())
477 # Get pool values for VNF-level primitive parameters
478 for vnf_prim_group
in ns_cfg_prim_msg
.vnf_primitive_group
:
479 rsp_prim_group
= rpc_op
.vnf_primitive_group
.add()
480 rsp_prim_group
.member_vnf_index_ref
= vnf_prim_group
.member_vnf_index_ref
481 if vnf_prim_group
.has_field("vnfd_id_ref"):
482 rsp_prim_group
.vnfd_id_ref
= vnf_prim_group
.vnfd_id_ref
484 for index
, vnf_prim
in enumerate(vnf_prim_group
.primitive
):
485 rsp_prim
= rsp_prim_group
.primitive
.add()
486 rsp_prim
.name
= vnf_prim
.name
487 rsp_prim
.index
= index
488 vnf_primitive
= yield from self
._get
_vnf
_primitive
(
490 vnf_prim_group
.member_vnf_index_ref
,
493 for param
in vnf_primitive
.parameter
:
494 if not param
.has_field("parameter_pool"):
498 nsr_param_pool
= nsr
.param_pools
[param
.parameter_pool
]
500 raise ValueError("Parameter pool %s does not exist in nsr" % vnf_prim
.parameter_pool
)
502 vnf_param
= rsp_prim
.parameter
.add()
503 vnf_param
.name
= param
.name
504 vnf_param
.value
= str(nsr_param_pool
.get_next_unused_value())
506 self
._log
.debug("RPC output: {}".format(rpc_op
))
507 xact_info
.respond_xpath(rwdts
.XactRspCode
.ACK
,
508 RiftCMRPCHandler
.GET_NS_CONF_O_XPATH
, rpc_op
)
509 except Exception as e
:
510 self
._log
.error("Exception processing the "
511 "get-ns-service-primitive-values: {}".format(e
))
512 self
._log
.exception(e
)
513 xact_info
.respond_xpath(rwdts
.XactRspCode
.NACK
,
514 RiftCMRPCHandler
.GET_NS_CONF_O_XPATH
)
516 hdl_ns
= rift
.tasklets
.DTS
.RegistrationHandler(on_prepare
=on_ns_config_prepare
,)
517 hdl_ns_get
= rift
.tasklets
.DTS
.RegistrationHandler(on_prepare
=on_get_ns_config_values_prepare
,)
519 with self
._dts
.group_create() as group
:
520 self
._ns
_regh
= group
.register(xpath
=RiftCMRPCHandler
.EXEC_NS_CONF_XPATH
,
522 flags
=rwdts
.Flag
.PUBLISHER
,
524 self
._get
_ns
_conf
_regh
= group
.register(xpath
=RiftCMRPCHandler
.GET_NS_CONF_XPATH
,
526 flags
=rwdts
.Flag
.PUBLISHER
,