update from RIFT as of 696b75d2fe9fb046261b08c616f1bcf6c0b54a9b second try
[osm/SO.git] / rwcm / plugins / rwconman / rift / tasklets / rwconmantasklet / RiftCM_rpc.py
1
2 #
3 # Copyright 2016 RIFT.IO Inc
4 #
5 # Licensed under the Apache License, Version 2.0 (the "License");
6 # you may not use this file except in compliance with the License.
7 # You may obtain a copy of the License at
8 #
9 # http://www.apache.org/licenses/LICENSE-2.0
10 #
11 # Unless required by applicable law or agreed to in writing, software
12 # distributed under the License is distributed on an "AS IS" BASIS,
13 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 # See the License for the specific language governing permissions and
15 # limitations under the License.
16 #
17
18 import asyncio
19 import os
20 import re
21 import tempfile
22 import time
23 import yaml
24
25
26 from . import riftcm_config_plugin
27 import rift.mano.config_agent
28
29 import gi
30 gi.require_version('RwDts', '1.0')
31 gi.require_version('RwNsrYang', '1.0')
32 from gi.repository import (
33 RwDts as rwdts,
34 NsrYang,
35 )
36
class RiftCMRPCHandler(object):
    """DTS RPC handler for network-service (NS) service primitives.

    Registers prepare callbacks for two RPCs:

    * ``exec-ns-service-primitive`` -- runs either the user defined script
      attached to the NS service primitive or, failing that, the requested
      VNF primitives through every configured config agent plugin.
    * ``get-ns-service-primitive-values`` -- returns the next unused value
      of every parameter pool referenced by a service primitive.
    """
    EXEC_NS_CONF_XPATH = "I,/nsr:exec-ns-service-primitive"
    EXEC_NS_CONF_O_XPATH = "O,/nsr:exec-ns-service-primitive"

    GET_NS_CONF_XPATH = "I,/nsr:get-ns-service-primitive-values"
    GET_NS_CONF_O_XPATH = "O,/nsr:get-ns-service-primitive-values"

    def __init__(self, dts, log, loop, project, nsm):
        """Store the collaborators and create the config agent job manager.

        Args:
            dts: DTS API handle used to register the RPC handlers.
            log: Logger used by this handler.
            loop: Event loop handed to the job manager.
            project: Project object; its ``name`` and ``rpc_check`` are used.
            nsm: NS manager giving access to NSRs, NSDs, VNFR messages and
                the config agent plugins.

        Raises:
            KeyError: If ``RIFT_INSTALL`` or ``RIFT_VAR_ROOT`` is not set in
                the environment.
        """
        self._dts = dts
        self._log = log
        self._loop = loop
        self._project = project
        self._nsm = nsm

        self._ns_regh = None
        self._vnf_regh = None
        self._get_ns_conf_regh = None

        self.job_manager = rift.mano.config_agent.ConfigAgentJobManager(
            dts, log, loop, project, nsm)

        self._rift_install_dir = os.environ['RIFT_INSTALL']
        self._rift_var_root_dir = os.environ['RIFT_VAR_ROOT']

    @property
    def reghs(self):
        """ Return registration handles """
        return (self._ns_regh, self._vnf_regh, self._get_ns_conf_regh)

    @property
    def nsm(self):
        """ Return the NS manager instance """
        return self._nsm

    def deregister(self):
        """De-register every DTS registration and the job manager.

        Safe to call more than once: handles and the job manager are reset
        to ``None`` after they have been released.
        """
        self._log.debug("De-register conman rpc handlers for project {}".
                        format(self._project))
        for reg in self.reghs:
            if reg:
                reg.deregister()

        # Rebinding the loop variable would not clear the attributes, so
        # drop the stale handles explicitly.
        self._ns_regh = None
        self._vnf_regh = None
        self._get_ns_conf_regh = None

        if self.job_manager is not None:
            self.job_manager.deregister()
            self.job_manager = None

    def prepare_meta(self, rpc_ip):
        """Look up the NSR addressed by the RPC and index its VNFRs by id.

        Args:
            rpc_ip: RPC input carrying ``nsr_id_ref``.

        Returns:
            tuple: ``(nsr, vnfrs)`` where ``vnfrs`` maps VNFR id to VNFR.

        Raises:
            ValueError: If a lookup raises ``KeyError`` (e.g. unknown NSR id).
        """
        try:
            nsr = self._nsm.nsrs[rpc_ip.nsr_id_ref]
            vnfrs = {vnfr.id: vnfr for vnfr in nsr.vnfrs}
            return nsr, vnfrs
        except KeyError as e:
            raise ValueError("Record not found", str(e)) from e

    @asyncio.coroutine
    def _get_ns_cfg_primitive(self, nsr_id, ns_cfg_name):
        """Return a deep copy of the named NSD service primitive, or None.

        Args:
            nsr_id: Id of the NSR whose NSD is fetched from the NS manager.
            ns_cfg_name: Name of the service primitive to look for.
        """
        nsd_msg = yield from self._nsm.get_nsd(nsr_id)

        for ns_cfg_prim in nsd_msg.service_primitive:
            if ns_cfg_prim.name == ns_cfg_name:
                return ns_cfg_prim.deep_copy()
        return None

    @asyncio.coroutine
    def _get_vnf_primitive(self, vnfr_id, nsr_id, primitive_name):
        """Return the named config primitive of a VNFR.

        Args:
            vnfr_id: Id passed to ``nsm.get_vnfr_msg`` as the VNFR id.
            nsr_id: Id of the owning NSR.
            primitive_name: Name of the config primitive to find.

        Raises:
            ValueError: If the VNFR message or the primitive is not found.
        """
        vnf = self._nsm.get_vnfr_msg(vnfr_id, nsr_id)
        self._log.debug("vnfr_msg:%s", vnf)
        if vnf:
            self._log.debug("nsr/vnf %s/%s, vnf_configuration: %s",
                            nsr_id, vnfr_id, vnf.vnf_configuration)
            for primitive in vnf.vnf_configuration.config_primitive:
                if primitive.name == primitive_name:
                    return primitive

        raise ValueError("Could not find nsr/vnf {}/{} primitive {}"
                         .format(nsr_id, vnfr_id, primitive_name))

    @asyncio.coroutine
    def _apply_ns_config(self, agent_nsr, agent_vnfrs, rpc_ip):
        """
        Hook: Runs the user defined script. Feeds all the necessary data
        for the script through a yaml file.

        TBD: Add support to pass multiple CA accounts if configured
             Remove apply_ns_config from the Config Agent Plugins

        Args:
            agent_nsr: NSR record; ``vnfr_ids`` and ``nsd_id`` are read.
            agent_vnfrs (dict): VNFR ID => VNFR record.
            rpc_ip: The RPC input; ``user_defined_script`` names the script
                and ``as_dict()`` is dumped into the yaml file.

        Returns:
            The subprocess object of the started script.

        Raises:
            ValueError: If no user defined script is set on ``rpc_ip``.
        """
        def xlate(tag, tags):
            """Replace the ``<rw_mgmt_ip>`` tag by its value from ``tags``."""
            # TBD
            if tag is None or tags is None:
                return tag
            val = tag
            if re.search('<.*>', tag):
                try:
                    if tag == '<rw_mgmt_ip>':
                        val = tags['rw_mgmt_ip']
                except KeyError as e:
                    self._log.info("RiftCA: Did not get a value for tag %s, e=%s",
                                   tag, e)
            return val

        def get_meta(agent_nsr, agent_vnfrs):
            """Collect per-VNFR unit names, initial params and record data."""
            unit_names, initial_params, vnfr_index_map, vnfr_data_map = {}, {}, {}, {}
            vdur_keys = ('name', 'management_ip', 'vm_management_ip',
                         'id', 'vdu_id_ref')

            for vnfr_id in agent_nsr.vnfr_ids:
                vnfr = agent_vnfrs[vnfr_id]
                self._log.debug("CA-RPC: VNFR metadata: {}".format(vnfr))

                # index->vnfr ref
                vnfr_index_map[vnfr.member_vnf_index] = vnfr_id
                vnfr_data_dict = dict()
                if 'mgmt_interface' in vnfr.vnfr:
                    vnfr_data_dict['mgmt_interface'] = vnfr.vnfr['mgmt_interface']

                vnfr_data_dict['name'] = vnfr.vnfr['name']
                vnfr_data_dict['connection_point'] = []
                if 'connection_point' in vnfr.vnfr:
                    for cp in vnfr.vnfr['connection_point']:
                        cp_dict = dict(name=cp['name'],
                                       ip_address=cp['ip_address'],
                                       connection_point_id=cp['connection_point_id'])
                        if 'virtual_cps' in cp:
                            # Only the addressing fields of each virtual CP
                            # are forwarded to the script.
                            cp_dict['virtual_cps'] = [
                                {k: v for k, v in vcp.items()
                                 if k in ['ip_address', 'mac_address']}
                                for vcp in cp['virtual_cps']]

                        vnfr_data_dict['connection_point'].append(cp_dict)

                try:
                    vnfr_data_dict['vdur'] = [
                        {key: vdu[key] for key in vdur_keys}
                        for vdu in vnfr.vnfr['vdur']]

                    vnfr_data_map[vnfr.member_vnf_index] = vnfr_data_dict
                except KeyError as e:
                    # The VNFR is left out of vnfr_data_map when any VDU
                    # field is missing.
                    self._log.warn("Error getting VDU data for VNFR {}".format(vnfr))

                # Unit name: first plugin that knows a service name wins.
                unit_names[vnfr_id] = None
                for config_plugin in self.nsm.config_agent_plugins:
                    name = config_plugin.get_service_name(vnfr_id)
                    if name:
                        unit_names[vnfr_id] = name
                        break

                # Flatten the data for simplicity
                param_data = {}
                if 'initial_config_primitive' in vnfr.vnf_configuration:
                    for primitive in vnfr.vnf_configuration['initial_config_primitive']:
                        if 'parameter' in primitive:
                            for parameter in primitive['parameter']:
                                try:
                                    value = xlate(parameter['value'], vnfr.tags)
                                    param_data[parameter['name']] = value
                                except KeyError as e:
                                    self._log.warn("Unable to parse the parameter{}: {}".
                                                   format(parameter, e))

                initial_params[vnfr_id] = param_data

            return unit_names, initial_params, vnfr_index_map, vnfr_data_map

        def get_config_agent():
            """Return agent data, preferring the first non-default plugin."""
            ret = {}
            for config_plugin in self.nsm.config_agent_plugins:
                if config_plugin.agent_type in [riftcm_config_plugin.DEFAULT_CAP_TYPE]:
                    ret = config_plugin.agent_data
                else:
                    # Currently the first non default plugin is returned
                    return config_plugin.agent_data
            return ret

        unit_names, init_data, vnfr_index_map, vnfr_data_map = get_meta(agent_nsr, agent_vnfrs)

        # The data consists of these sections
        # 1. Account data
        # 2. The input passed.
        # 3. Juju unit names (keyed by vnfr ID).
        # 4. Initial config data (keyed by vnfr ID).
        # 5. Member VNF index => vnfr ID and member VNF index => VNFR data.
        data = dict()
        data['config_agent'] = get_config_agent()
        data["rpc_ip"] = rpc_ip.as_dict()
        data["unit_names"] = unit_names
        data["init_config"] = init_data
        data["vnfr_index_map"] = vnfr_index_map
        data["vnfr_data_map"] = vnfr_data_map

        # delete=False: the file must outlive this block because the script
        # reads it. NOTE(review): nothing in this method removes it later.
        with tempfile.NamedTemporaryFile(delete=False) as tmp_file:
            tmp_file.write(yaml.dump(data, default_flow_style=True)
                           .encode("UTF-8"))

        self._log.debug("CA-RPC: Creating a temp file {} with input data: {}".
                        format(tmp_file.name, data))

        # Get the full path to the script
        user_script = rpc_ip.user_defined_script
        if not user_script:
            raise ValueError("No user defined script set for NS service primitive")

        if os.path.isabs(user_script):
            # The script has full path, use as is
            script = user_script
        else:
            script = os.path.join(self._rift_var_root_dir,
                                  'launchpad/packages/nsd',
                                  self._project.name,
                                  agent_nsr.nsd_id, 'scripts',
                                  user_script)
            self._log.debug("CA-RPC: Checking for script in %s", script)

        # NOTE(review): the command line is handed to a shell unquoted, so
        # shell metacharacters in the script path are interpreted.
        cmd = "{} {}".format(script, tmp_file.name)
        self._log.debug("CA-RPC: Running the CMD: {}".format(cmd))

        process = yield from asyncio.create_subprocess_shell(
            cmd)

        return process

    @asyncio.coroutine
    def register(self):
        """Register the job manager and both RPC prepare handlers with DTS."""
        yield from self.job_manager.register()

        @asyncio.coroutine
        def on_ns_config_prepare(xact_info, action, ks_path, msg):
            """ prepare callback from dts exec-ns-service-primitive"""
            assert action == rwdts.QueryAction.RPC

            if not self._project.rpc_check(msg, xact_info):
                return

            rpc_ip = msg
            rpc_op = NsrYang.YangOutput_Nsr_ExecNsServicePrimitive.from_dict({
                "triggered_by": rpc_ip.triggered_by,
                "create_time": int(time.time()),
                "parameter": [param.as_dict() for param in rpc_ip.parameter],
                "parameter_group": [pg.as_dict() for pg in rpc_ip.parameter_group]
            })

            try:
                ns_cfg_prim_name = rpc_ip.name
                nsr_id = rpc_ip.nsr_id_ref
                # Fail fast on an unknown NSR before the NSD is queried.
                nsr = self._nsm.nsrs[nsr_id]

                nsd_cfg_prim_msg = yield from self._get_ns_cfg_primitive(nsr_id, ns_cfg_prim_name)

                rpc_op.nsr_id_ref = nsr_id
                rpc_op.name = ns_cfg_prim_name

                nsr, vnfrs = self.prepare_meta(rpc_ip)
                rpc_op.job_id = nsr.job_id

                # Copy over the NS level Parameters

                # Give preference to user defined script.
                if nsd_cfg_prim_msg and nsd_cfg_prim_msg.has_field("user_defined_script"):
                    rpc_ip.user_defined_script = nsd_cfg_prim_msg.user_defined_script

                    task = yield from self._apply_ns_config(
                        nsr,
                        vnfrs,
                        rpc_ip)

                    self.job_manager.add_job(rpc_op, [task])
                else:
                    # Otherwise create VNF primitives.
                    for vnf in rpc_ip.vnf_list:
                        vnf_op = rpc_op.vnf_out_list.add()
                        vnf_member_idx = vnf.member_vnf_index_ref
                        vnfr_id = vnf.vnfr_id_ref
                        vnf_op.vnfr_id_ref = vnfr_id
                        vnf_op.member_vnf_index_ref = vnf_member_idx

                        for idx, primitive in enumerate(vnf.vnf_primitive):
                            op_primitive = vnf_op.vnf_out_primitive.add()
                            op_primitive.index = idx
                            op_primitive.name = primitive.name
                            op_primitive.execution_id = ''
                            op_primitive.execution_status = 'pending'
                            op_primitive.execution_error_details = ''

                            # Copy over the VNF primitive's input parameters
                            for param in primitive.parameter:
                                output_param = op_primitive.parameter.add()
                                output_param.name = param.name
                                output_param.value = param.value

                            self._log.debug("%s:%s Got primitive %s:%s",
                                            nsr_id, vnf.member_vnf_index_ref,
                                            primitive.name, primitive.parameter)

                            nsd_vnf_primitive = yield from self._get_vnf_primitive(
                                vnfr_id,
                                nsr_id,
                                primitive.name
                            )
                            # Mark the values of pool-backed parameters as
                            # used in the NSR parameter pools.
                            for param in nsd_vnf_primitive.parameter:
                                if not param.has_field("parameter_pool"):
                                    continue

                                try:
                                    nsr_param_pool = nsr.param_pools[param.parameter_pool]
                                except KeyError:
                                    raise ValueError("Parameter pool %s does not exist in nsr" % param.parameter_pool)
                                nsr_param_pool.add_used_value(param.value)

                            for config_plugin in self.nsm.config_agent_plugins:
                                # TODO: Execute these in separate threads to prevent blocking
                                yield from config_plugin.vnf_config_primitive(nsr_id,
                                                                              vnfr_id,
                                                                              primitive,
                                                                              op_primitive)

                    self.job_manager.add_job(rpc_op)

                self._log.debug("RPC output: {}".format(rpc_op))
                xact_info.respond_xpath(rwdts.XactRspCode.ACK,
                                        RiftCMRPCHandler.EXEC_NS_CONF_O_XPATH,
                                        rpc_op)
            except Exception as e:
                # RPC boundary: any failure is logged and reported as NACK.
                self._log.error("Exception processing the "
                                "exec-ns-service-primitive: {}".format(e))
                self._log.exception(e)
                xact_info.respond_xpath(rwdts.XactRspCode.NACK,
                                        RiftCMRPCHandler.EXEC_NS_CONF_O_XPATH)

        @asyncio.coroutine
        def on_get_ns_config_values_prepare(xact_info, action, ks_path, msg):
            """ prepare callback from dts get-ns-service-primitive-values"""
            assert action == rwdts.QueryAction.RPC

            if not self._project.rpc_check(msg, xact_info):
                return

            nsr_id = msg.nsr_id_ref
            cfg_prim_name = msg.name
            try:
                nsr = self._nsm.nsrs[nsr_id]

                rpc_op = NsrYang.YangOutput_Nsr_GetNsServicePrimitiveValues()

                ns_cfg_prim_msg = yield from self._get_ns_cfg_primitive(nsr_id, cfg_prim_name)
                if ns_cfg_prim_msg is None:
                    raise ValueError("Service primitive %s not found for nsr %s"
                                     % (cfg_prim_name, nsr_id))

                # Each pass below runs exactly once so that every response
                # entry appears once and each pool value is drawn once.

                # Get pool values for NS-level parameters
                for ns_param in ns_cfg_prim_msg.parameter:
                    if not ns_param.has_field("parameter_pool"):
                        continue

                    try:
                        nsr_param_pool = nsr.param_pools[ns_param.parameter_pool]
                    except KeyError:
                        raise ValueError("Parameter pool %s does not exist in nsr" % ns_param.parameter_pool)

                    new_ns_param = rpc_op.ns_parameter.add()
                    new_ns_param.name = ns_param.name
                    new_ns_param.value = str(nsr_param_pool.get_next_unused_value())

                # Get pool values for VNF-level parameters
                for vnf_prim_group in ns_cfg_prim_msg.vnf_primitive_group:
                    rsp_prim_group = rpc_op.vnf_primitive_group.add()
                    rsp_prim_group.member_vnf_index_ref = vnf_prim_group.member_vnf_index_ref
                    if vnf_prim_group.has_field("vnfd_id_ref"):
                        rsp_prim_group.vnfd_id_ref = vnf_prim_group.vnfd_id_ref

                    for index, vnf_prim in enumerate(vnf_prim_group.primitive):
                        rsp_prim = rsp_prim_group.primitive.add()
                        rsp_prim.name = vnf_prim.name
                        rsp_prim.index = index
                        # NOTE(review): argument order follows
                        # _get_vnf_primitive(vnfr_id, nsr_id, name); confirm
                        # that vnfd_id_ref is the id get_vnfr_msg expects.
                        vnf_primitive = yield from self._get_vnf_primitive(
                            vnf_prim_group.vnfd_id_ref,
                            nsr_id,
                            vnf_prim.name
                        )
                        for param in vnf_primitive.parameter:
                            if not param.has_field("parameter_pool"):
                                continue

                            try:
                                nsr_param_pool = nsr.param_pools[param.parameter_pool]
                            except KeyError:
                                raise ValueError("Parameter pool %s does not exist in nsr" % param.parameter_pool)

                            vnf_param = rsp_prim.parameter.add()
                            vnf_param.name = param.name
                            vnf_param.value = str(nsr_param_pool.get_next_unused_value())

                self._log.debug("RPC output: {}".format(rpc_op))
                xact_info.respond_xpath(rwdts.XactRspCode.ACK,
                                        RiftCMRPCHandler.GET_NS_CONF_O_XPATH, rpc_op)
            except Exception as e:
                # RPC boundary: any failure is logged and reported as NACK.
                self._log.error("Exception processing the "
                                "get-ns-service-primitive-values: {}".format(e))
                self._log.exception(e)
                xact_info.respond_xpath(rwdts.XactRspCode.NACK,
                                        RiftCMRPCHandler.GET_NS_CONF_O_XPATH)

        hdl_ns = rift.tasklets.DTS.RegistrationHandler(on_prepare=on_ns_config_prepare,)
        hdl_ns_get = rift.tasklets.DTS.RegistrationHandler(on_prepare=on_get_ns_config_values_prepare,)

        with self._dts.group_create() as group:
            self._ns_regh = group.register(xpath=RiftCMRPCHandler.EXEC_NS_CONF_XPATH,
                                           handler=hdl_ns,
                                           flags=rwdts.Flag.PUBLISHER,
                                           )
            self._get_ns_conf_regh = group.register(xpath=RiftCMRPCHandler.GET_NS_CONF_XPATH,
                                                    handler=hdl_ns_get,
                                                    flags=rwdts.Flag.PUBLISHER,
                                                    )