Merge from OSM SO master
[osm/SO.git] / rwcm / plugins / rwconman / rift / tasklets / rwconmantasklet / RiftCM_rpc.py
1
2 #
3 # Copyright 2016 RIFT.IO Inc
4 #
5 # Licensed under the Apache License, Version 2.0 (the "License");
6 # you may not use this file except in compliance with the License.
7 # You may obtain a copy of the License at
8 #
9 # http://www.apache.org/licenses/LICENSE-2.0
10 #
11 # Unless required by applicable law or agreed to in writing, software
12 # distributed under the License is distributed on an "AS IS" BASIS,
13 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 # See the License for the specific language governing permissions and
15 # limitations under the License.
16 #
17
18 import asyncio
19 import os
20 import re
21 import tempfile
22 import time
23 import yaml
24
25
26 from . import riftcm_config_plugin
27 import rift.mano.config_agent
28
29 import gi
30 gi.require_version('RwDts', '1.0')
31 gi.require_version('RwNsrYang', '1.0')
32 from gi.repository import (
33 RwDts as rwdts,
34 NsrYang,
35 )
36
class RiftCMRPCHandler(object):
    """ DTS RPC handler for the config manager's NS service-primitive RPCs.

    Serves exec-ns-service-primitive and get-ns-service-primitive-values.
    """
    # Xpath the exec RPC handler is registered on ("I,") and the xpath its
    # response is sent on ("O,").
    EXEC_NS_CONF_XPATH = "I,/nsr:exec-ns-service-primitive"
    EXEC_NS_CONF_O_XPATH = "O,/nsr:exec-ns-service-primitive"

    # Same pair for the get-ns-service-primitive-values RPC.
    GET_NS_CONF_XPATH = "I,/nsr:get-ns-service-primitive-values"
    GET_NS_CONF_O_XPATH = "O,/nsr:get-ns-service-primitive-values"
44
def __init__(self, dts, log, loop, project, nsm):
    """ Store the collaborators and create the config agent job manager.

    RIFT_INSTALL and RIFT_ARTIFACTS are read from the environment; a
    missing variable raises KeyError.
    """
    self._dts, self._log, self._loop = dts, log, loop
    self._project, self._nsm = project, nsm

    # No DTS registrations exist yet.
    self._ns_regh = self._vnf_regh = self._get_ns_conf_regh = None

    self.job_manager = rift.mano.config_agent.ConfigAgentJobManager(
        dts, log, loop, project, nsm)

    env = os.environ
    self._rift_install_dir = env['RIFT_INSTALL']
    self._rift_artif_dir = env['RIFT_ARTIFACTS']
61
@property
def reghs(self):
    """ The (ns, vnf, get-ns-conf) registration handles, as a tuple """
    handles = (self._ns_regh, self._vnf_regh, self._get_ns_conf_regh)
    return handles
66
@property
def nsm(self):
    """ The NS manager instance this handler was constructed with """
    manager = self._nsm
    return manager
71
def deregister(self):
    """ Deregister every live DTS registration, then the job manager.

    Each handle is reset to None once deregistered, so calling this method
    again does not deregister the same handle a second time.
    """
    self._log.debug("De-register conman rpc handlers for project {}".
                    format(self._project))
    # Clear the attribute itself: rebinding a loop variable to None would
    # leave the stale handle stored on the instance.
    for attr in ("_ns_regh", "_vnf_regh", "_get_ns_conf_regh"):
        regh = getattr(self, attr)
        if regh:
            regh.deregister()
            setattr(self, attr, None)

    self.job_manager.deregister()
81
def prepare_meta(self, rpc_ip):
    """ Look up the NSR referenced by the RPC input and index its VNFRs.

    Args:
        rpc_ip: RPC input; only its nsr_id_ref attribute is read.

    Returns:
        (nsr, vnfrs) where vnfrs maps VNFR id => VNFR.

    Raises:
        ValueError: when a KeyError occurs during the lookup.
    """
    try:
        nsr = self._nsm.nsrs[rpc_ip.nsr_id_ref]
        vnfrs = {vnfr.id: vnfr for vnfr in nsr.vnfrs}
    except KeyError as e:
        raise ValueError("Record not found", str(e))

    return nsr, vnfrs
96
@asyncio.coroutine
def _get_ns_cfg_primitive(self, nsr_id, ns_cfg_name):
    """ Return a deep copy of the NSD service primitive named ns_cfg_name.

    The NSD is fetched through the NS manager. Returns None when the NSD
    has no service primitive with that name.
    """
    nsd_msg = yield from self._nsm.get_nsd(nsr_id)

    # First primitive whose name matches, or None.
    matching = (prim for prim in nsd_msg.service_primitive
                if prim.name == ns_cfg_name)
    found = next(matching, None)
    if found is None:
        return None

    return found.deep_copy()
112
@asyncio.coroutine
def _get_vnf_primitive(self, vnfr_id, nsr_id, primitive_name):
    """ Return the named service primitive from a VNFR's vnf_configuration.

    Args:
        vnfr_id: id passed to the NS manager's get_vnfr_msg()
        nsr_id: id of the NSR that owns the VNFR
        primitive_name: name of the service primitive to find

    Raises:
        ValueError: when the VNFR message is falsy or has no service
            primitive with that name.
    """
    vnf = self._nsm.get_vnfr_msg(vnfr_id, nsr_id)
    self._log.debug("vnfr_msg:%s", vnf)
    if vnf:
        # Use %-style placeholders throughout and pass the ids, so the
        # message identifies which nsr/vnf the configuration belongs to.
        self._log.debug("nsr/vnf %s/%s, vnf_configuration: %s",
                        nsr_id, vnfr_id, vnf.vnf_configuration)
        for primitive in vnf.vnf_configuration.service_primitive:
            if primitive.name == primitive_name:
                return primitive

    raise ValueError("Could not find nsr/vnf {}/{} primitive {}"
                     .format(nsr_id, vnfr_id, primitive_name))
126
@asyncio.coroutine
def _apply_ns_config(self, agent_nsr, agent_vnfrs, rpc_ip):
    """
    Hook: Runs the user defined script. Feeds all the necessary data
    for the script thro' yaml file.

    TBD: Add support to pass multiple CA accounts if configures
         Remove apply_ns_config from the Config Agent Plugins

    Args:
        agent_nsr: NSR record; its id and vnfr_ids attributes are read.
        agent_vnfrs (dict): VNFR ID => VNFR record
        rpc_ip (YangInput_Nsr_ExecNsConfigPrimitive): The input data.

    Returns:
        The result of asyncio.create_subprocess_shell() for the script
        command; it is not awaited inside this function body.
    """
    def xlate(tag, tags):
        """ Replace the '<rw_mgmt_ip>' tag with its value from tags. """
        # TBD
        if tag is None or tags is None:
            return tag
        val = tag
        if re.search('<.*>', tag):
            try:
                if tag == '<rw_mgmt_ip>':
                    val = tags['rw_mgmt_ip']
            except KeyError as e:
                self._log.info("RiftCA: Did not get a value for tag %s, e=%s",
                               tag, e)
        return val

    def get_meta(agent_nsr, agent_vnfrs):
        """ Collect the per-VNFR data that is handed to the script. """
        unit_names, initial_params, vnfr_index_map, vnfr_data_map = {}, {}, {}, {}

        for vnfr_id in agent_nsr.vnfr_ids:
            vnfr = agent_vnfrs[vnfr_id]
            self._log.debug("CA-RPC: VNFR metadata: {}".format(vnfr))

            # index->vnfr ref
            vnfr_index_map[vnfr.member_vnf_index] = vnfr_id
            vnfr_data_dict = dict()
            if 'mgmt_interface' in vnfr.vnfr:
                vnfr_data_dict['mgmt_interface'] = vnfr.vnfr['mgmt_interface']

            vnfr_data_dict['connection_point'] = []
            if 'connection_point' in vnfr.vnfr:
                for cp in vnfr.vnfr['connection_point']:
                    cp_dict = dict()
                    cp_dict['name'] = cp['name']
                    cp_dict['ip_address'] = cp['ip_address']
                    vnfr_data_dict['connection_point'].append(cp_dict)

            try:
                vnfr_data_dict['vdur'] = []
                vdu_data = [(vdu['name'], vdu['management_ip'], vdu['vm_management_ip'], vdu['id'])
                            for vdu in vnfr.vnfr['vdur']]

                for data in vdu_data:
                    data = dict(zip(['name', 'management_ip', 'vm_management_ip', 'id'], data))
                    vnfr_data_dict['vdur'].append(data)

                # Only recorded when every VDU field above was present.
                vnfr_data_map[vnfr.member_vnf_index] = vnfr_data_dict
            except KeyError:
                self._log.warn("Error getting VDU data for VNFR {}".format(vnfr))

            # Unit name: first non-empty service name reported by a plugin
            unit_names[vnfr_id] = None
            for config_plugin in self.nsm.config_agent_plugins:
                name = config_plugin.get_service_name(vnfr_id)
                if name:
                    unit_names[vnfr_id] = name
                    break

            # Flatten the data for simplicity
            param_data = {}
            if 'initial_config_primitive' in vnfr.vnf_configuration:
                for primitive in vnfr.vnf_configuration['initial_config_primitive']:
                    if 'parameter' in primitive:
                        for parameter in primitive['parameter']:
                            try:
                                value = xlate(parameter['value'], vnfr.tags)
                                param_data[parameter['name']] = value
                            except KeyError as e:
                                # Both placeholders need an argument; with
                                # only one, format() raises IndexError from
                                # inside this handler.
                                self._log.warn("Unable to parse the parameter{}: {}".
                                               format(parameter, e))

            initial_params[vnfr_id] = param_data

        return unit_names, initial_params, vnfr_index_map, vnfr_data_map

    def get_config_agent():
        """ Pick the agent data: first non-default plugin, else default. """
        ret = {}
        for config_plugin in self.nsm.config_agent_plugins:
            if config_plugin.agent_type in [riftcm_config_plugin.DEFAULT_CAP_TYPE]:
                ret = config_plugin.agent_data
            else:
                # Currently the first non default plugin is returned
                return config_plugin.agent_data
        return ret

    unit_names, init_data, vnfr_index_map, vnfr_data_map = get_meta(agent_nsr, agent_vnfrs)

    # The data consists of these sections
    # 1. Account data
    # 2. The input passed.
    # 3. Juju unit names (keyed by vnfr ID).
    # 4. Initial config data (keyed by vnfr ID).
    # 5. member-vnf-index => vnfr ID, and member-vnf-index => VNFR data.
    data = dict()
    data['config_agent'] = get_config_agent()
    data["rpc_ip"] = rpc_ip.as_dict()
    data["unit_names"] = unit_names
    data["init_config"] = init_data
    data["vnfr_index_map"] = vnfr_index_map
    data["vnfr_data_map"] = vnfr_data_map

    # delete=False: the file must outlive this block so the script can read
    # it. Nothing in this function removes it afterwards.
    with tempfile.NamedTemporaryFile(delete=False) as tmp_file:
        tmp_file.write(yaml.dump(data, default_flow_style=True)
                       .encode("UTF-8"))

    self._log.debug("CA-RPC: Creating a temp file {} with input data: {}".
                    format(tmp_file.name, data))

    # Get the full path to the script
    if rpc_ip.user_defined_script[0] == '/':
        # The script has full path, use as is
        script = rpc_ip.user_defined_script
    else:
        script = os.path.join(self._rift_artif_dir, 'launchpad/packages/nsd',
                              agent_nsr.id, 'scripts',
                              rpc_ip.user_defined_script)
        self._log.debug("CA-RPC: Checking for script in %s", script)
        if not os.path.exists(script):
            # Fall back to the install tree when the NSD package has no copy
            script = os.path.join(self._rift_install_dir, 'usr/bin', rpc_ip.user_defined_script)

    # NOTE(review): the command is a shell string built from
    # user_defined_script without quoting. Confirm the value is trusted, or
    # switch to an argv-based subprocess call.
    cmd = "{} {}".format(script, tmp_file.name)
    self._log.debug("CA-RPC: Running the CMD: {}".format(cmd))

    process = asyncio.create_subprocess_shell(cmd, loop=self._loop,
                                              stderr=asyncio.subprocess.PIPE)

    return process
269
@asyncio.coroutine
def register(self):
    """ Register the job manager and the two NS service-primitive RPCs.

    Registers prepare callbacks for exec-ns-service-primitive and
    get-ns-service-primitive-values, and stores the registration handles
    in _ns_regh and _get_ns_conf_regh.
    """
    yield from self.job_manager.register()

    @asyncio.coroutine
    def on_ns_config_prepare(xact_info, action, ks_path, msg):
        """ prepare callback from dts exec-ns-service-primitive"""
        assert action == rwdts.QueryAction.RPC

        if not self._project.rpc_check(msg, xact_info):
            return

        rpc_ip = msg
        rpc_op = NsrYang.YangOutput_Nsr_ExecNsServicePrimitive.from_dict({
            "triggered_by": rpc_ip.triggered_by,
            "create_time": int(time.time()),
            "parameter": [param.as_dict() for param in rpc_ip.parameter],
            "parameter_group": [pg.as_dict() for pg in rpc_ip.parameter_group]
        })

        try:
            ns_cfg_prim_name = rpc_ip.name
            nsr_id = rpc_ip.nsr_id_ref
            nsr = self._nsm.nsrs[nsr_id]

            nsd_cfg_prim_msg = yield from self._get_ns_cfg_primitive(nsr_id, ns_cfg_prim_name)

            rpc_op.nsr_id_ref = nsr_id
            rpc_op.name = ns_cfg_prim_name

            nsr, vnfrs = self.prepare_meta(rpc_ip)
            rpc_op.job_id = nsr.job_id

            # Copy over the NS level Parameters

            # Give preference to user defined script.
            if nsd_cfg_prim_msg and nsd_cfg_prim_msg.has_field("user_defined_script"):
                rpc_ip.user_defined_script = nsd_cfg_prim_msg.user_defined_script

                task = yield from self._apply_ns_config(
                    nsr,
                    vnfrs,
                    rpc_ip)

                self.job_manager.add_job(rpc_op, [task])
            else:
                # Otherwise create VNF primitives.
                for vnf in rpc_ip.vnf_list:
                    vnf_op = rpc_op.vnf_out_list.add()
                    vnf_member_idx = vnf.member_vnf_index_ref
                    vnfr_id = vnf.vnfr_id_ref
                    vnf_op.vnfr_id_ref = vnfr_id
                    vnf_op.member_vnf_index_ref = vnf_member_idx

                    for idx, primitive in enumerate(vnf.vnf_primitive):
                        op_primitive = vnf_op.vnf_out_primitive.add()
                        op_primitive.index = idx
                        op_primitive.name = primitive.name
                        op_primitive.execution_id = ''
                        op_primitive.execution_status = 'completed'
                        op_primitive.execution_error_details = ''

                        # Copy over the VNF primitive's input parameters
                        for param in primitive.parameter:
                            output_param = op_primitive.parameter.add()
                            output_param.name = param.name
                            output_param.value = param.value

                        self._log.debug("%s:%s Got primitive %s:%s",
                                        nsr_id, vnf.member_vnf_index_ref, primitive.name, primitive.parameter)

                        nsd_vnf_primitive = yield from self._get_vnf_primitive(
                            vnfr_id,
                            nsr_id,
                            primitive.name
                        )
                        # Mark the value of every pool-backed parameter as
                        # used in the NSR's parameter pool.
                        for param in nsd_vnf_primitive.parameter:
                            if not param.has_field("parameter_pool"):
                                continue

                            try:
                                nsr_param_pool = nsr.param_pools[param.parameter_pool]
                            except KeyError:
                                raise ValueError("Parameter pool %s does not exist in nsr" % param.parameter_pool)
                            nsr_param_pool.add_used_value(param.value)

                        for config_plugin in self.nsm.config_agent_plugins:
                            yield from config_plugin.vnf_config_primitive(nsr_id,
                                                                          vnfr_id,
                                                                          primitive,
                                                                          op_primitive)

                self.job_manager.add_job(rpc_op)

            self._log.debug("RPC output: {}".format(rpc_op))
            xact_info.respond_xpath(rwdts.XactRspCode.ACK,
                                    RiftCMRPCHandler.EXEC_NS_CONF_O_XPATH,
                                    rpc_op)
        except Exception as e:
            # RPC boundary: log the failure and NACK instead of propagating.
            self._log.error("Exception processing the "
                            "exec-ns-service-primitive: {}".format(e))
            self._log.exception(e)
            xact_info.respond_xpath(rwdts.XactRspCode.NACK,
                                    RiftCMRPCHandler.EXEC_NS_CONF_O_XPATH)

    @asyncio.coroutine
    def on_get_ns_config_values_prepare(xact_info, action, ks_path, msg):
        """ prepare callback from dts get-ns-service-primitive-values"""
        assert action == rwdts.QueryAction.RPC

        if not self._project.rpc_check(msg, xact_info):
            return

        nsr_id = msg.nsr_id_ref
        cfg_prim_name = msg.name
        try:
            nsr = self._nsm.nsrs[nsr_id]

            rpc_op = NsrYang.YangOutput_Nsr_GetNsServicePrimitiveValues()

            ns_cfg_prim_msg = yield from self._get_ns_cfg_primitive(nsr_id, cfg_prim_name)
            if ns_cfg_prim_msg is None:
                raise ValueError("Service primitive %s not found in the NSD of nsr %s"
                                 % (cfg_prim_name, nsr_id))

            # Each pass below runs exactly once per request: running them
            # twice would emit every entry twice and draw two values from
            # the pool for each parameter.

            # Get pool values for NS-level parameters
            for ns_param in ns_cfg_prim_msg.parameter:
                if not ns_param.has_field("parameter_pool"):
                    continue

                try:
                    nsr_param_pool = nsr.param_pools[ns_param.parameter_pool]
                except KeyError:
                    raise ValueError("Parameter pool %s does not exist in nsr" % ns_param.parameter_pool)

                new_ns_param = rpc_op.ns_parameter.add()
                new_ns_param.name = ns_param.name
                new_ns_param.value = str(nsr_param_pool.get_next_unused_value())

            # Get pool values for VNF-level parameters
            for vnf_prim_group in ns_cfg_prim_msg.vnf_primitive_group:
                rsp_prim_group = rpc_op.vnf_primitive_group.add()
                rsp_prim_group.member_vnf_index_ref = vnf_prim_group.member_vnf_index_ref
                if vnf_prim_group.has_field("vnfd_id_ref"):
                    rsp_prim_group.vnfd_id_ref = vnf_prim_group.vnfd_id_ref

                for index, vnf_prim in enumerate(vnf_prim_group.primitive):
                    rsp_prim = rsp_prim_group.primitive.add()
                    rsp_prim.name = vnf_prim.name
                    rsp_prim.index = index
                    # Argument order follows _get_vnf_primitive(vnfr_id,
                    # nsr_id, primitive_name).
                    # NOTE(review): vnfd_id_ref is what gets passed as
                    # vnfr_id here - confirm that is the intended key.
                    vnf_primitive = yield from self._get_vnf_primitive(
                        vnf_prim_group.vnfd_id_ref,
                        nsr_id,
                        vnf_prim.name
                    )
                    for param in vnf_primitive.parameter:
                        if not param.has_field("parameter_pool"):
                            continue

                        try:
                            nsr_param_pool = nsr.param_pools[param.parameter_pool]
                        except KeyError:
                            # Report the key that was actually looked up.
                            raise ValueError("Parameter pool %s does not exist in nsr" % param.parameter_pool)

                        vnf_param = rsp_prim.parameter.add()
                        vnf_param.name = param.name
                        vnf_param.value = str(nsr_param_pool.get_next_unused_value())

            self._log.debug("RPC output: {}".format(rpc_op))
            xact_info.respond_xpath(rwdts.XactRspCode.ACK,
                                    RiftCMRPCHandler.GET_NS_CONF_O_XPATH, rpc_op)
        except Exception as e:
            # RPC boundary: log the failure and NACK instead of propagating.
            self._log.error("Exception processing the "
                            "get-ns-service-primitive-values: {}".format(e))
            self._log.exception(e)
            xact_info.respond_xpath(rwdts.XactRspCode.NACK,
                                    RiftCMRPCHandler.GET_NS_CONF_O_XPATH)

    hdl_ns = rift.tasklets.DTS.RegistrationHandler(on_prepare=on_ns_config_prepare,)
    hdl_ns_get = rift.tasklets.DTS.RegistrationHandler(on_prepare=on_get_ns_config_values_prepare,)

    with self._dts.group_create() as group:
        self._ns_regh = group.register(xpath=RiftCMRPCHandler.EXEC_NS_CONF_XPATH,
                                       handler=hdl_ns,
                                       flags=rwdts.Flag.PUBLISHER,
                                       )
        self._get_ns_conf_regh = group.register(xpath=RiftCMRPCHandler.GET_NS_CONF_XPATH,
                                                handler=hdl_ns_get,
                                                flags=rwdts.Flag.PUBLISHER,
                                                )
520
521