Bug 54 : Fix PE NS instantiation error
[osm/SO.git] / rwcm / plugins / rwconman / rift / tasklets / rwconmantasklet / RiftCM_rpc.py
1
2 #
3 # Copyright 2016 RIFT.IO Inc
4 #
5 # Licensed under the Apache License, Version 2.0 (the "License");
6 # you may not use this file except in compliance with the License.
7 # You may obtain a copy of the License at
8 #
9 # http://www.apache.org/licenses/LICENSE-2.0
10 #
11 # Unless required by applicable law or agreed to in writing, software
12 # distributed under the License is distributed on an "AS IS" BASIS,
13 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 # See the License for the specific language governing permissions and
15 # limitations under the License.
16 #
17
18 import asyncio
19 import os
20 import re
21 import tempfile
22 import time
23 import yaml
24
25
26 from . import riftcm_config_plugin
27 import rift.mano.config_agent
28
29 import gi
30 gi.require_version('RwDts', '1.0')
31 gi.require_version('RwNsrYang', '1.0')
32 from gi.repository import (
33 RwDts as rwdts,
34 NsrYang,
35 )
36
class RiftCMRPCHandler(object):
    """DTS RPC handler for the NS service-primitive RPCs.

    The class constants are the XPaths of the two RPCs this handler deals
    with: executing an NS service primitive and fetching the values for one.
    """

    # exec-ns-service-primitive: registration XPath and response XPath.
    EXEC_NS_CONF_XPATH = "I,/nsr:exec-ns-service-primitive"
    EXEC_NS_CONF_O_XPATH = "O,/nsr:exec-ns-service-primitive"

    # get-ns-service-primitive-values: registration XPath and response XPath.
    GET_NS_CONF_XPATH = "I,/nsr:get-ns-service-primitive-values"
    GET_NS_CONF_O_XPATH = "O,/nsr:get-ns-service-primitive-values"
44
def __init__(self, dts, log, loop, nsm):
    """Store the collaborators and create the config-agent job manager.

    Args:
        dts: DTS handle; also handed to the job manager.
        log: logger used by this handler.
        loop: asyncio event loop.
        nsm: NS manager instance.

    Raises:
        KeyError: if RIFT_INSTALL or RIFT_ARTIFACTS is missing from the
            environment.
    """
    self._dts, self._log, self._loop, self._nsm = dts, log, loop, nsm

    # Registration handles start out unset.
    self._ns_regh = self._vnf_regh = self._get_ns_conf_regh = None

    self.job_manager = rift.mano.config_agent.ConfigAgentJobManager(
        dts, log, loop, nsm)

    # Base directories read from the environment.
    env = os.environ
    self._rift_install_dir = env['RIFT_INSTALL']
    self._rift_artif_dir = env['RIFT_ARTIFACTS']
59
@property
def reghs(self):
    """Registration handles as a (ns, vnf, get-ns-conf) tuple."""
    handles = (self._ns_regh, self._vnf_regh, self._get_ns_conf_regh)
    return handles
64
@property
def nsm(self):
    """The NS manager instance held by this handler."""
    manager = self._nsm
    return manager
69
def prepare_meta(self, rpc_ip):
    """Look up the NSR named by an RPC input, together with its VNFRs.

    Args:
        rpc_ip: RPC input message; only ``nsr_id_ref`` is read.

    Returns:
        tuple: ``(nsr, vnfrs)`` where ``nsr`` is the entry of
        ``self._nsm.nsrs`` for the given id and ``vnfrs`` maps each VNFR id
        to the corresponding object from ``nsr.vnfrs``.

    Raises:
        ValueError: if a lookup raises ``KeyError`` (for example an unknown
            NSR id). The original ``KeyError`` is attached as ``__cause__``.
    """
    try:
        nsr = self._nsm.nsrs[rpc_ip.nsr_id_ref]
        vnfrs = {vnfr.id: vnfr for vnfr in nsr.vnfrs}
    except KeyError as e:
        # Chain explicitly so the failing key stays visible in tracebacks.
        raise ValueError("Record not found", str(e)) from e

    return nsr, vnfrs
84
@asyncio.coroutine
def _get_ns_cfg_primitive(self, nsr_id, ns_cfg_name):
    """Fetch a private copy of one NSD service primitive.

    Args:
        nsr_id: id handed to ``self._nsm.get_nsd`` to obtain the NSD message.
        ns_cfg_name: name of the service primitive to look for.

    Returns:
        A ``deep_copy()`` of the first entry in the NSD's
        ``service_primitive`` list whose name matches, or None when no
        entry matches.
    """
    nsd_msg = yield from self._nsm.get_nsd(nsr_id)

    # First primitive with the requested name, if any.
    matching = next(
        (prim for prim in nsd_msg.service_primitive
         if prim.name == ns_cfg_name),
        None)
    if matching is None:
        return None

    return matching.deep_copy()
100
@asyncio.coroutine
def _get_vnf_primitive(self, vnfr_id, nsr_id, primitive_name):
    """Find a service primitive in a VNFR's configuration by name.

    Args:
        vnfr_id: first argument passed to ``self._nsm.get_vnfr_msg``.
        nsr_id: second argument passed to ``self._nsm.get_vnfr_msg``.
        primitive_name: name of the service primitive to return.

    Returns:
        The first entry of ``vnf_configuration.service_primitive`` whose
        name equals ``primitive_name``.

    Raises:
        ValueError: if no VNFR message is found or it has no primitive
            with that name.
    """
    vnf = self._nsm.get_vnfr_msg(vnfr_id, nsr_id)
    self._log.debug("vnfr_msg:%s", vnf)
    if vnf:
        # All placeholders are %-style so the logger fills in the ids too.
        self._log.debug("nsr/vnf %s/%s, vnf_configuration: %s",
                        nsr_id, vnfr_id, vnf.vnf_configuration)
        for primitive in vnf.vnf_configuration.service_primitive:
            if primitive.name == primitive_name:
                return primitive

    raise ValueError("Could not find nsr/vnf {}/{} primitive {}"
                     .format(nsr_id, vnfr_id, primitive_name))
114
@asyncio.coroutine
def _apply_ns_config(self, agent_nsr, agent_vnfrs, rpc_ip):
    """
    Hook: Runs the user defined script. Feeds all the necessary data
    for the script through a YAML file.

    TBD: Add support to pass multiple CA accounts if configured
         Remove apply_ns_config from the Config Agent Plugins

    Args:
        agent_nsr: NS record; its ``id`` and ``vnfr_ids`` are read.
        agent_vnfrs (dict): VNFR ID => VNFR record
        rpc_ip (YangInput_Nsr_ExecNsConfigPrimitive): The input data;
            ``user_defined_script`` names the script to run.

    Returns:
        tuple: ``(task, err)`` where ``task`` is a loop task wrapping the
        script process's ``wait()`` and ``err`` is what was read from the
        script's stderr.
    """
    def xlate(tag, tags):
        # TBD: only the <rw_mgmt_ip> tag is translated so far.
        if tag is None or tags is None:
            return tag
        val = tag
        if re.search('<.*>', tag):
            try:
                if tag == '<rw_mgmt_ip>':
                    val = tags['rw_mgmt_ip']
            except KeyError as e:
                self._log.info("RiftCA: Did not get a value for tag %s, e=%s",
                               tag, e)
        return val

    def get_meta(agent_nsr, agent_vnfrs):
        unit_names, initial_params = {}, {}
        vnfr_index_map, vnfr_data_map = {}, {}
        vdu_keys = ('name', 'management_ip', 'vm_management_ip', 'id')

        for vnfr_id in agent_nsr.vnfr_ids:
            vnfr = agent_vnfrs[vnfr_id]
            self._log.debug("CA_RPC: VNFR metadata: {}".format(vnfr))

            # index->vnfr ref
            vnfr_index_map[vnfr.member_vnf_index] = vnfr_id

            vnfr_data_dict = dict()
            if 'mgmt_interface' in vnfr.vnfr:
                vnfr_data_dict['mgmt_interface'] = vnfr.vnfr['mgmt_interface']

            vnfr_data_dict['connection_point'] = []
            if 'connection_point' in vnfr.vnfr:
                for cp in vnfr.vnfr['connection_point']:
                    vnfr_data_dict['connection_point'].append(
                        {'name': cp['name'], 'ip_address': cp['ip_address']})

            try:
                vnfr_data_dict['vdur'] = [
                    {key: vdu[key] for key in vdu_keys}
                    for vdu in vnfr.vnfr['vdur']]
                # Only VNFRs with complete VDU data are published.
                vnfr_data_map[vnfr.member_vnf_index] = vnfr_data_dict
            except KeyError:
                self._log.warn("Error getting VDU data for VNFR {}".format(vnfr))

            # Unit name: first non-empty service name any plugin reports
            unit_names[vnfr_id] = None
            for config_plugin in self.nsm.config_agent_plugins:
                name = config_plugin.get_service_name(vnfr_id)
                if name:
                    unit_names[vnfr_id] = name
                    break

            # Flatten the data for simplicity
            param_data = {}
            if 'initial_config_primitive' in vnfr.vnf_configuration:
                for primitive in vnfr.vnf_configuration['initial_config_primitive']:
                    if 'parameter' in primitive:
                        for parameter in primitive['parameter']:
                            value = xlate(parameter['value'], vnfr.tags)
                            # parameter is accessed as a mapping throughout,
                            # so the name is looked up by key as well.
                            param_data[parameter['name']] = value

            initial_params[vnfr_id] = param_data

        return unit_names, initial_params, vnfr_index_map, vnfr_data_map

    def get_config_agent():
        ret = {}
        for config_plugin in self.nsm.config_agent_plugins:
            if config_plugin.agent_type in [riftcm_config_plugin.DEFAULT_CAP_TYPE]:
                ret = config_plugin.agent_data
            else:
                # Currently the first non default plugin is returned
                return config_plugin.agent_data
        return ret

    unit_names, init_data, vnfr_index_map, vnfr_data_map = get_meta(
        agent_nsr, agent_vnfrs)

    # The data consists of these sections
    # 1. Account data
    # 2. The input passed.
    # 3. Juju unit names (keyed by vnfr ID).
    # 4. Initial config data (keyed by vnfr ID).
    # 5. member-vnf-index => vnfr ID, and member-vnf-index => VNFR data.
    data = dict()
    data['config_agent'] = get_config_agent()
    data["rpc_ip"] = rpc_ip.as_dict()
    data["unit_names"] = unit_names
    data["init_config"] = init_data
    data["vnfr_index_map"] = vnfr_index_map
    data["vnfr_data_map"] = vnfr_data_map

    # delete=False: the file must outlive this block so the script can read
    # it. Nothing in this method removes it afterwards.
    with tempfile.NamedTemporaryFile(delete=False) as tmp_file:
        tmp_file.write(yaml.dump(data, default_flow_style=True)
                       .encode("UTF-8"))

    self._log.debug("CA_RPC: Creating a temp file {} with input data: {}".
                    format(tmp_file.name, data))

    # Get the full path to the script
    if rpc_ip.user_defined_script.startswith('/'):
        # The script has full path, use as is
        script = rpc_ip.user_defined_script
    else:
        script = os.path.join(self._rift_artif_dir, 'launchpad/libs',
                              agent_nsr.id, 'scripts',
                              rpc_ip.user_defined_script)
        self._log.debug("CA_RPC: Checking for script in %s", script)
        if not os.path.exists(script):
            script = os.path.join(self._rift_install_dir, 'usr/bin',
                                  rpc_ip.user_defined_script)

    # NOTE(review): the command line is interpreted by a shell and the
    # script name comes from the RPC/descriptor unquoted - confirm that
    # input is trusted.
    cmd = "{} {}".format(script, tmp_file.name)
    self._log.debug("CA_RPC: Running the CMD: {}".format(cmd))

    process = yield from asyncio.create_subprocess_shell(
        cmd, loop=self._loop, stderr=asyncio.subprocess.PIPE)
    err = yield from process.stderr.read()
    task = self._loop.create_task(process.wait())

    return task, err
255
@asyncio.coroutine
def register(self):
    """ Register the NS service-primitive RPC handlers with DTS.

    Registers the job manager first, then publishes two RPC handlers in
    one DTS registration group:

    * exec-ns-service-primitive: runs the NSD's user defined script if the
      primitive has one, otherwise hands every requested VNF primitive to
      the config agent plugins.
    * get-ns-service-primitive-values: returns the next unused
      parameter-pool value for every pooled NS and VNF parameter.

    Both handlers respond with NACK when processing raises.
    """
    yield from self.job_manager.register()

    def lookup_param_pool(nsr, pool_name):
        """ Return the named parameter pool of nsr or raise ValueError """
        try:
            return nsr.param_pools[pool_name]
        except KeyError as e:
            raise ValueError("Parameter pool %s does not exist in nsr"
                             % pool_name) from e

    @asyncio.coroutine
    def on_ns_config_prepare(xact_info, action, ks_path, msg):
        """ prepare callback from dts exec-ns-service-primitive"""
        assert action == rwdts.QueryAction.RPC
        rpc_ip = msg
        rpc_op = NsrYang.YangOutput_Nsr_ExecNsServicePrimitive.from_dict({
            "triggered_by": rpc_ip.triggered_by,
            "create_time": int(time.time()),
            "parameter": [param.as_dict() for param in rpc_ip.parameter],
            "parameter_group": [pg.as_dict() for pg in rpc_ip.parameter_group]
        })

        try:
            ns_cfg_prim_name = rpc_ip.name
            nsr_id = rpc_ip.nsr_id_ref
            # Fail early on an unknown NSR, before the NSD is fetched.
            nsr = self._nsm.nsrs[nsr_id]

            nsd_cfg_prim_msg = yield from self._get_ns_cfg_primitive(
                nsr_id, ns_cfg_prim_name)

            rpc_op.nsr_id_ref = nsr_id
            rpc_op.name = ns_cfg_prim_name

            nsr, vnfrs = self.prepare_meta(rpc_ip)
            rpc_op.job_id = nsr.job_id

            # Give preference to user defined script.
            if nsd_cfg_prim_msg and nsd_cfg_prim_msg.has_field("user_defined_script"):
                rpc_ip.user_defined_script = nsd_cfg_prim_msg.user_defined_script

                task, err = yield from self._apply_ns_config(
                    nsr,
                    vnfrs,
                    rpc_ip)
                if err:
                    rpc_op.job_status_details = err.decode()

                self.job_manager.add_job(rpc_op, [task])
            else:
                # Otherwise create VNF primitives.
                for vnf in rpc_ip.vnf_list:
                    vnf_op = rpc_op.vnf_out_list.add()
                    vnf_member_idx = vnf.member_vnf_index_ref
                    vnfr_id = vnf.vnfr_id_ref
                    vnf_op.vnfr_id_ref = vnfr_id
                    vnf_op.member_vnf_index_ref = vnf_member_idx

                    for idx, primitive in enumerate(vnf.vnf_primitive):
                        op_primitive = vnf_op.vnf_out_primitive.add()
                        op_primitive.index = idx
                        op_primitive.name = primitive.name
                        op_primitive.execution_id = ''
                        op_primitive.execution_status = 'completed'
                        op_primitive.execution_error_details = ''

                        # Copy over the VNF primitive's input parameters
                        for param in primitive.parameter:
                            output_param = op_primitive.parameter.add()
                            output_param.name = param.name
                            output_param.value = param.value

                        self._log.debug("%s:%s Got primitive %s:%s",
                                        nsr_id, vnf.member_vnf_index_ref,
                                        primitive.name, primitive.parameter)

                        nsd_vnf_primitive = yield from self._get_vnf_primitive(
                            vnfr_id,
                            nsr_id,
                            primitive.name
                        )
                        # Mark the values of pooled parameters as used.
                        for param in nsd_vnf_primitive.parameter:
                            if not param.has_field("parameter_pool"):
                                continue

                            nsr_param_pool = lookup_param_pool(
                                nsr, param.parameter_pool)
                            nsr_param_pool.add_used_value(param.value)

                        for config_plugin in self.nsm.config_agent_plugins:
                            yield from config_plugin.vnf_config_primitive(
                                nsr_id,
                                vnfr_id,
                                primitive,
                                op_primitive)

                self.job_manager.add_job(rpc_op)

            self._log.debug("RPC output: {}".format(rpc_op))
            xact_info.respond_xpath(rwdts.XactRspCode.ACK,
                                    RiftCMRPCHandler.EXEC_NS_CONF_O_XPATH,
                                    rpc_op)
        except Exception as e:
            self._log.error("Exception processing the "
                            "exec-ns-service-primitive: {}".format(e))
            self._log.exception(e)
            xact_info.respond_xpath(rwdts.XactRspCode.NACK,
                                    RiftCMRPCHandler.EXEC_NS_CONF_O_XPATH)

    @asyncio.coroutine
    def on_get_ns_config_values_prepare(xact_info, action, ks_path, msg):
        """ prepare callback from dts get-ns-service-primitive-values"""
        assert action == rwdts.QueryAction.RPC
        nsr_id = msg.nsr_id_ref
        cfg_prim_name = msg.name
        try:
            nsr = self._nsm.nsrs[nsr_id]

            rpc_op = NsrYang.YangOutput_Nsr_GetNsServicePrimitiveValues()

            ns_cfg_prim_msg = yield from self._get_ns_cfg_primitive(
                nsr_id, cfg_prim_name)
            if ns_cfg_prim_msg is None:
                raise ValueError("Service primitive %s not found for nsr %s"
                                 % (cfg_prim_name, nsr_id))

            # Get pool values for NS-level parameters
            for ns_param in ns_cfg_prim_msg.parameter:
                if not ns_param.has_field("parameter_pool"):
                    continue

                nsr_param_pool = lookup_param_pool(nsr, ns_param.parameter_pool)

                new_ns_param = rpc_op.ns_parameter.add()
                new_ns_param.name = ns_param.name
                new_ns_param.value = str(nsr_param_pool.get_next_unused_value())

            # Get pool values for the parameters of each VNF primitive
            for vnf_prim_group in ns_cfg_prim_msg.vnf_primitive_group:
                rsp_prim_group = rpc_op.vnf_primitive_group.add()
                rsp_prim_group.member_vnf_index_ref = vnf_prim_group.member_vnf_index_ref
                if vnf_prim_group.has_field("vnfd_id_ref"):
                    rsp_prim_group.vnfd_id_ref = vnf_prim_group.vnfd_id_ref

                for index, vnf_prim in enumerate(vnf_prim_group.primitive):
                    rsp_prim = rsp_prim_group.primitive.add()
                    rsp_prim.name = vnf_prim.name
                    rsp_prim.index = index
                    # NOTE(review): _get_vnf_primitive takes
                    # (vnfr_id, nsr_id, primitive_name); vnfd_id_ref is
                    # passed in the vnfr_id slot here - confirm that
                    # get_vnfr_msg accepts it.
                    vnf_primitive = yield from self._get_vnf_primitive(
                        vnf_prim_group.vnfd_id_ref,
                        nsr_id,
                        vnf_prim.name
                    )
                    for param in vnf_primitive.parameter:
                        if not param.has_field("parameter_pool"):
                            continue

                        nsr_param_pool = lookup_param_pool(
                            nsr, param.parameter_pool)

                        vnf_param = rsp_prim.parameter.add()
                        vnf_param.name = param.name
                        vnf_param.value = str(nsr_param_pool.get_next_unused_value())

            self._log.debug("RPC output: {}".format(rpc_op))
            xact_info.respond_xpath(rwdts.XactRspCode.ACK,
                                    RiftCMRPCHandler.GET_NS_CONF_O_XPATH, rpc_op)
        except Exception as e:
            self._log.error("Exception processing the "
                            "get-ns-service-primitive-values: {}".format(e))
            self._log.exception(e)
            xact_info.respond_xpath(rwdts.XactRspCode.NACK,
                                    RiftCMRPCHandler.GET_NS_CONF_O_XPATH)

    hdl_ns = rift.tasklets.DTS.RegistrationHandler(
        on_prepare=on_ns_config_prepare,)
    hdl_ns_get = rift.tasklets.DTS.RegistrationHandler(
        on_prepare=on_get_ns_config_values_prepare,)

    with self._dts.group_create() as group:
        self._ns_regh = group.register(
            xpath=RiftCMRPCHandler.EXEC_NS_CONF_XPATH,
            handler=hdl_ns,
            flags=rwdts.Flag.PUBLISHER,
        )
        self._get_ns_conf_regh = group.register(
            xpath=RiftCMRPCHandler.GET_NS_CONF_XPATH,
            handler=hdl_ns_get,
            flags=rwdts.Flag.PUBLISHER,
        )
502
503