RIFT-16103 : Adding VDU Name field in YAML File.
[osm/SO.git] / rwcm / plugins / rwconman / rift / tasklets / rwconmantasklet / RiftCM_rpc.py
1
2 #
3 # Copyright 2016 RIFT.IO Inc
4 #
5 # Licensed under the Apache License, Version 2.0 (the "License");
6 # you may not use this file except in compliance with the License.
7 # You may obtain a copy of the License at
8 #
9 # http://www.apache.org/licenses/LICENSE-2.0
10 #
11 # Unless required by applicable law or agreed to in writing, software
12 # distributed under the License is distributed on an "AS IS" BASIS,
13 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 # See the License for the specific language governing permissions and
15 # limitations under the License.
16 #
17
18 import asyncio
19 import os
20 import re
21 import tempfile
22 import time
23 import yaml
24
25
26 from . import riftcm_config_plugin
27 import rift.mano.config_agent
28
29 import gi
30 gi.require_version('RwDts', '1.0')
31 gi.require_version('RwNsrYang', '1.0')
32 from gi.repository import (
33 RwDts as rwdts,
34 NsrYang,
35 )
36
class RiftCMRPCHandler(object):
    """DTS RPC handler for network-service service primitives.

    Serves the two RPCs that register() publishes:

    * exec-ns-service-primitive: runs either the NSD's user defined
      script or the requested VNF primitives and records the outcome
      with the config agent job manager.
    * get-ns-service-primitive-values: proposes parameter values taken
      from the NSR parameter pools.
    """
    # "I," xpaths are registered with DTS in register(); the matching
    # "O," xpaths are used when responding to the transaction.
    EXEC_NS_CONF_XPATH = "I,/nsr:exec-ns-service-primitive"
    EXEC_NS_CONF_O_XPATH = "O,/nsr:exec-ns-service-primitive"

    GET_NS_CONF_XPATH = "I,/nsr:get-ns-service-primitive-values"
    GET_NS_CONF_O_XPATH = "O,/nsr:get-ns-service-primitive-values"

    def __init__(self, dts, log, loop, nsm):
        """Store the collaborators and create the job manager.

        Args:
            dts: DTS handle used to create the registration group.
            log: Logger used by every method of this class.
            loop: asyncio event loop handed to spawned subprocesses.
            nsm: NS manager; supplies NSRs, NSDs, VNFR messages and the
                config agent plugins.

        Raises:
            KeyError: if RIFT_INSTALL or RIFT_ARTIFACTS is missing from
                the environment.
        """
        self._dts = dts
        self._log = log
        self._loop = loop
        self._nsm = nsm

        # Registration handles; _ns_regh and _get_ns_conf_regh are
        # filled in by register().
        self._ns_regh = None
        self._vnf_regh = None
        self._get_ns_conf_regh = None

        self.job_manager = rift.mano.config_agent.ConfigAgentJobManager(dts, log, loop, nsm)

        self._rift_install_dir = os.environ['RIFT_INSTALL']
        self._rift_artif_dir = os.environ['RIFT_ARTIFACTS']

    @property
    def reghs(self):
        """ Return registration handles """
        return (self._ns_regh, self._vnf_regh, self._get_ns_conf_regh)

    @property
    def nsm(self):
        """ Return the NS manager instance """
        return self._nsm

    @staticmethod
    def _lookup_param_pool(nsr, pool_name):
        """Return the NSR parameter pool called pool_name.

        Raises:
            ValueError: if the NSR has no pool with that name.
        """
        try:
            return nsr.param_pools[pool_name]
        except KeyError:
            raise ValueError("Parameter pool %s does not exist in nsr" % pool_name)

    def prepare_meta(self, rpc_ip):
        """Fetch the NSR referenced by the RPC input and index its VNFRs.

        Args:
            rpc_ip: RPC input message; only nsr_id_ref is read.

        Returns:
            tuple: (nsr, vnfrs) where vnfrs maps VNFR id => VNFR.

        Raises:
            ValueError: if a lookup fails with KeyError, e.g. the NSR id
                is unknown to the NS manager.
        """
        try:
            nsr = self._nsm.nsrs[rpc_ip.nsr_id_ref]
            vnfrs = {vnfr.id: vnfr for vnfr in nsr.vnfrs}
        except KeyError as e:
            raise ValueError("Record not found", str(e))
        return nsr, vnfrs

    @asyncio.coroutine
    def _get_ns_cfg_primitive(self, nsr_id, ns_cfg_name):
        """Return a deep copy of the NSD service primitive ns_cfg_name.

        Args:
            nsr_id: Id of the NSR whose NSD is consulted.
            ns_cfg_name: Name of the service primitive to find.

        Returns:
            A deep copy of the matching primitive message, or None when
            the NSD has no primitive with that name.
        """
        nsd_msg = yield from self._nsm.get_nsd(nsr_id)

        for ns_cfg_prim in nsd_msg.service_primitive:
            if ns_cfg_prim.name == ns_cfg_name:
                # Copy so callers cannot mutate the NSD message itself.
                return ns_cfg_prim.deep_copy()
        return None

    @asyncio.coroutine
    def _get_vnf_primitive(self, vnfr_id, nsr_id, primitive_name):
        """Return the named service primitive from a VNFR's configuration.

        Args:
            vnfr_id: Id of the VNFR to inspect.
            nsr_id: Id of the NSR owning the VNFR.
            primitive_name: Name of the service primitive to find.

        Raises:
            ValueError: if the VNFR message is not available or it has
                no primitive with that name.
        """
        vnf = self._nsm.get_vnfr_msg(vnfr_id, nsr_id)
        self._log.debug("vnfr_msg:%s", vnf)
        if vnf:
            self._log.debug("nsr/vnf %s/%s, vnf_configuration: %s",
                            nsr_id, vnfr_id, vnf.vnf_configuration)
            for primitive in vnf.vnf_configuration.service_primitive:
                if primitive.name == primitive_name:
                    return primitive

        raise ValueError("Could not find nsr/vnf {}/{} primitive {}"
                         .format(nsr_id, vnfr_id, primitive_name))

    @asyncio.coroutine
    def _apply_ns_config(self, agent_nsr, agent_vnfrs, rpc_ip):
        """
        Hook: Runs the user defined script. Feeds all the necessary data
        for the script thro' yaml file.

        TBD: Add support to pass multiple CA accounts if configures
             Remove apply_ns_config from the Config Agent Plugins

        Args:
            agent_nsr (NetworkServiceRecord): NSR the primitive targets.
            agent_vnfrs (dict): VNFR ID => VirtualNetworkFunctionRecord
            rpc_ip (YangInput_Nsr_ExecNsConfigPrimitive): The input data.

        Returns:
            The result of asyncio.create_subprocess_shell() for the
            script invocation, with stderr piped.
        """
        def xlate(tag, tags):
            """Replace a '<...>' placeholder with its value from tags."""
            # TBD
            if tag is None or tags is None:
                return tag
            val = tag
            if re.search('<.*>', tag):
                try:
                    # Only the management IP placeholder is supported.
                    if tag == '<rw_mgmt_ip>':
                        val = tags['rw_mgmt_ip']
                except KeyError as e:
                    self._log.info("RiftCA: Did not get a value for tag %s, e=%s",
                                   tag, e)
            return val

        def get_meta(agent_nsr, agent_vnfrs):
            """Collect the per-VNFR data handed to the script."""
            unit_names, initial_params, vnfr_index_map, vnfr_data_map = {}, {}, {}, {}
            vdu_keys = ('name', 'management_ip', 'vm_management_ip', 'id', 'vdu_id_ref')

            for vnfr_id in agent_nsr.vnfr_ids:
                vnfr = agent_vnfrs[vnfr_id]
                self._log.debug("CA-RPC: VNFR metadata: {}".format(vnfr))

                # index->vnfr ref
                vnfr_index_map[vnfr.member_vnf_index] = vnfr_id
                vnfr_data_dict = dict()
                if 'mgmt_interface' in vnfr.vnfr:
                    vnfr_data_dict['mgmt_interface'] = vnfr.vnfr['mgmt_interface']

                vnfr_data_dict['connection_point'] = []
                if 'connection_point' in vnfr.vnfr:
                    for cp in vnfr.vnfr['connection_point']:
                        vnfr_data_dict['connection_point'].append(
                            {'name': cp['name'], 'ip_address': cp['ip_address']})

                try:
                    vnfr_data_dict['vdur'] = [
                        {key: vdu[key] for key in vdu_keys}
                        for vdu in vnfr.vnfr['vdur']
                    ]
                    # Only VNFRs with complete VDU data are exported.
                    vnfr_data_map[vnfr.member_vnf_index] = vnfr_data_dict
                except KeyError:
                    self._log.warn("Error getting VDU data for VNFR {}".format(vnfr))

                # Unit name: first plugin that knows this VNFR wins.
                unit_names[vnfr_id] = None
                for config_plugin in self.nsm.config_agent_plugins:
                    name = config_plugin.get_service_name(vnfr_id)
                    if name:
                        unit_names[vnfr_id] = name
                        break

                # Flatten the data for simplicity
                param_data = {}
                if 'initial_config_primitive' in vnfr.vnf_configuration:
                    for primitive in vnfr.vnf_configuration['initial_config_primitive']:
                        if 'parameter' not in primitive:
                            continue
                        for parameter in primitive['parameter']:
                            try:
                                value = xlate(parameter['value'], vnfr.tags)
                                param_data[parameter['name']] = value
                            except KeyError as e:
                                self._log.warn("Unable to parse the parameter {}: {}".
                                               format(parameter, e))

                initial_params[vnfr_id] = param_data

            return unit_names, initial_params, vnfr_index_map, vnfr_data_map

        def get_config_agent():
            """Pick the agent data: first non-default plugin, else default."""
            ret = {}
            for config_plugin in self.nsm.config_agent_plugins:
                if config_plugin.agent_type in [riftcm_config_plugin.DEFAULT_CAP_TYPE]:
                    ret = config_plugin.agent_data
                else:
                    # Currently the first non default plugin is returned
                    return config_plugin.agent_data
            return ret

        unit_names, init_data, vnfr_index_map, vnfr_data_map = get_meta(agent_nsr, agent_vnfrs)

        # The data consists of these sections
        # 1. Account data
        # 2. The input passed.
        # 3. Juju unit names (keyed by vnfr ID).
        # 4. Initial config data (keyed by vnfr ID).
        # 5. member-vnf-index => vnfr ID, and member-vnf-index => VNFR data
        data = dict()
        data['config_agent'] = get_config_agent()
        data["rpc_ip"] = rpc_ip.as_dict()
        data["unit_names"] = unit_names
        data["init_config"] = init_data
        data["vnfr_index_map"] = vnfr_index_map
        data["vnfr_data_map"] = vnfr_data_map

        # delete=False: the script reads the file after this handle is
        # closed. NOTE(review): nothing in this method removes the file.
        with tempfile.NamedTemporaryFile(delete=False) as tmp_file:
            tmp_file.write(yaml.dump(data, default_flow_style=True)
                           .encode("UTF-8"))

        self._log.debug("CA-RPC: Creating a temp file {} with input data: {}".
                        format(tmp_file.name, data))

        # Get the full path to the script
        if rpc_ip.user_defined_script.startswith('/'):
            # The script has full path, use as is
            script = rpc_ip.user_defined_script
        else:
            script = os.path.join(self._rift_artif_dir, 'launchpad/packages/nsd',
                                  agent_nsr.id, 'scripts',
                                  rpc_ip.user_defined_script)
            self._log.debug("CA-RPC: Checking for script in %s", script)
            if not os.path.exists(script):
                # Fall back to the scripts shipped with the installation.
                script = os.path.join(self._rift_install_dir, 'usr/bin', rpc_ip.user_defined_script)

        # NOTE(review): the command line is interpreted by a shell and
        # the script path comes from the RPC input / NSD; confirm that
        # source is trusted before relying on this.
        cmd = "{} {}".format(script, tmp_file.name)
        self._log.debug("CA-RPC: Running the CMD: {}".format(cmd))

        process = asyncio.create_subprocess_shell(cmd, loop=self._loop,
                                                  stderr=asyncio.subprocess.PIPE)

        return process

    @asyncio.coroutine
    def register(self):
        """Register the job manager and both service-primitive RPCs with DTS."""
        yield from self.job_manager.register()

        @asyncio.coroutine
        def on_ns_config_prepare(xact_info, action, ks_path, msg):
            """ prepare callback from dts exec-ns-service-primitive"""
            assert action == rwdts.QueryAction.RPC
            rpc_ip = msg

            try:
                rpc_op = NsrYang.YangOutput_Nsr_ExecNsServicePrimitive.from_dict({
                    "triggered_by": rpc_ip.triggered_by,
                    "create_time": int(time.time()),
                    "parameter": [param.as_dict() for param in rpc_ip.parameter],
                    "parameter_group": [pg.as_dict() for pg in rpc_ip.parameter_group]
                })

                ns_cfg_prim_name = rpc_ip.name
                nsr_id = rpc_ip.nsr_id_ref
                nsr, vnfrs = self.prepare_meta(rpc_ip)

                nsd_cfg_prim_msg = yield from self._get_ns_cfg_primitive(nsr_id, ns_cfg_prim_name)

                rpc_op.nsr_id_ref = nsr_id
                rpc_op.name = ns_cfg_prim_name
                rpc_op.job_id = nsr.job_id

                # Give preference to user defined script.
                if nsd_cfg_prim_msg and nsd_cfg_prim_msg.has_field("user_defined_script"):
                    rpc_ip.user_defined_script = nsd_cfg_prim_msg.user_defined_script

                    task = yield from self._apply_ns_config(
                        nsr,
                        vnfrs,
                        rpc_ip)

                    self.job_manager.add_job(rpc_op, [task])
                else:
                    # Otherwise create VNF primitives.
                    for vnf in rpc_ip.vnf_list:
                        vnf_op = rpc_op.vnf_out_list.add()
                        vnfr_id = vnf.vnfr_id_ref
                        vnf_op.vnfr_id_ref = vnfr_id
                        vnf_op.member_vnf_index_ref = vnf.member_vnf_index_ref

                        for idx, primitive in enumerate(vnf.vnf_primitive):
                            op_primitive = vnf_op.vnf_out_primitive.add()
                            op_primitive.index = idx
                            op_primitive.name = primitive.name
                            op_primitive.execution_id = ''
                            op_primitive.execution_status = 'completed'
                            op_primitive.execution_error_details = ''

                            # Copy over the VNF primitive's input parameters
                            for param in primitive.parameter:
                                output_param = op_primitive.parameter.add()
                                output_param.name = param.name
                                output_param.value = param.value

                            self._log.debug("%s:%s Got primitive %s:%s",
                                            nsr_id, vnf.member_vnf_index_ref,
                                            primitive.name, primitive.parameter)

                            nsd_vnf_primitive = yield from self._get_vnf_primitive(
                                vnfr_id,
                                nsr_id,
                                primitive.name
                            )
                            # Mark pool-backed values as consumed.
                            for param in nsd_vnf_primitive.parameter:
                                if not param.has_field("parameter_pool"):
                                    continue

                                nsr_param_pool = self._lookup_param_pool(nsr, param.parameter_pool)
                                nsr_param_pool.add_used_value(param.value)

                            for config_plugin in self.nsm.config_agent_plugins:
                                yield from config_plugin.vnf_config_primitive(nsr_id,
                                                                              vnfr_id,
                                                                              primitive,
                                                                              op_primitive)

                    self.job_manager.add_job(rpc_op)

                self._log.debug("RPC output: {}".format(rpc_op))
                xact_info.respond_xpath(rwdts.XactRspCode.ACK,
                                        RiftCMRPCHandler.EXEC_NS_CONF_O_XPATH,
                                        rpc_op)
            except Exception as e:
                # Top-level RPC boundary: log and NACK instead of raising.
                self._log.error("Exception processing the "
                                "exec-ns-service-primitive: {}".format(e))
                self._log.exception(e)
                xact_info.respond_xpath(rwdts.XactRspCode.NACK,
                                        RiftCMRPCHandler.EXEC_NS_CONF_O_XPATH)

        @asyncio.coroutine
        def on_get_ns_config_values_prepare(xact_info, action, ks_path, msg):
            """ prepare callback from dts get-ns-service-primitive-values"""
            assert action == rwdts.QueryAction.RPC
            nsr_id = msg.nsr_id_ref
            cfg_prim_name = msg.name
            try:
                nsr = self._nsm.nsrs[nsr_id]

                rpc_op = NsrYang.YangOutput_Nsr_GetNsServicePrimitiveValues()

                ns_cfg_prim_msg = yield from self._get_ns_cfg_primitive(nsr_id, cfg_prim_name)
                if ns_cfg_prim_msg is None:
                    raise ValueError("Could not find nsr {} primitive {}"
                                     .format(nsr_id, cfg_prim_name))

                # Get pool values for NS-level parameters
                for ns_param in ns_cfg_prim_msg.parameter:
                    if not ns_param.has_field("parameter_pool"):
                        continue

                    nsr_param_pool = self._lookup_param_pool(nsr, ns_param.parameter_pool)

                    new_ns_param = rpc_op.ns_parameter.add()
                    new_ns_param.name = ns_param.name
                    new_ns_param.value = str(nsr_param_pool.get_next_unused_value())

                # Get pool values for VNF-level parameters
                for vnf_prim_group in ns_cfg_prim_msg.vnf_primitive_group:
                    rsp_prim_group = rpc_op.vnf_primitive_group.add()
                    rsp_prim_group.member_vnf_index_ref = vnf_prim_group.member_vnf_index_ref
                    if vnf_prim_group.has_field("vnfd_id_ref"):
                        rsp_prim_group.vnfd_id_ref = vnf_prim_group.vnfd_id_ref

                    for index, vnf_prim in enumerate(vnf_prim_group.primitive):
                        rsp_prim = rsp_prim_group.primitive.add()
                        rsp_prim.name = vnf_prim.name
                        rsp_prim.index = index
                        # NOTE(review): vnfd_id_ref is passed where
                        # _get_vnf_primitive expects a VNFR id - confirm
                        # that get_vnfr_msg resolves it.
                        vnf_primitive = yield from self._get_vnf_primitive(
                            vnf_prim_group.vnfd_id_ref,
                            nsr_id,
                            vnf_prim.name
                        )
                        for param in vnf_primitive.parameter:
                            if not param.has_field("parameter_pool"):
                                continue

                            nsr_param_pool = self._lookup_param_pool(nsr, param.parameter_pool)

                            vnf_param = rsp_prim.parameter.add()
                            vnf_param.name = param.name
                            vnf_param.value = str(nsr_param_pool.get_next_unused_value())

                self._log.debug("RPC output: {}".format(rpc_op))
                xact_info.respond_xpath(rwdts.XactRspCode.ACK,
                                        RiftCMRPCHandler.GET_NS_CONF_O_XPATH, rpc_op)
            except Exception as e:
                # Top-level RPC boundary: log and NACK instead of raising.
                self._log.error("Exception processing the "
                                "get-ns-service-primitive-values: {}".format(e))
                self._log.exception(e)
                xact_info.respond_xpath(rwdts.XactRspCode.NACK,
                                        RiftCMRPCHandler.GET_NS_CONF_O_XPATH)

        hdl_ns = rift.tasklets.DTS.RegistrationHandler(on_prepare=on_ns_config_prepare,)
        hdl_ns_get = rift.tasklets.DTS.RegistrationHandler(on_prepare=on_get_ns_config_values_prepare,)

        with self._dts.group_create() as group:
            self._ns_regh = group.register(xpath=RiftCMRPCHandler.EXEC_NS_CONF_XPATH,
                                           handler=hdl_ns,
                                           flags=rwdts.Flag.PUBLISHER,
                                           )
            self._get_ns_conf_regh = group.register(xpath=RiftCMRPCHandler.GET_NS_CONF_XPATH,
                                                    handler=hdl_ns_get,
                                                    flags=rwdts.Flag.PUBLISHER,
                                                    )
500
501