Merge "CLI for OSM"
[osm/SO.git] / rwcm / plugins / rwconman / rift / tasklets / rwconmantasklet / RiftCM_rpc.py
1
2 #
3 # Copyright 2016 RIFT.IO Inc
4 #
5 # Licensed under the Apache License, Version 2.0 (the "License");
6 # you may not use this file except in compliance with the License.
7 # You may obtain a copy of the License at
8 #
9 # http://www.apache.org/licenses/LICENSE-2.0
10 #
11 # Unless required by applicable law or agreed to in writing, software
12 # distributed under the License is distributed on an "AS IS" BASIS,
13 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 # See the License for the specific language governing permissions and
15 # limitations under the License.
16 #
17
18 import asyncio
19 import os
20 import re
21 import tempfile
22 import time
23 import yaml
24
25
26 from . import riftcm_config_plugin
27 import rift.mano.config_agent
28
29 import gi
30 gi.require_version('RwDts', '1.0')
31 gi.require_version('RwNsrYang', '1.0')
32 from gi.repository import (
33 RwDts as rwdts,
34 NsrYang,
35 )
36
37 class RiftCMRPCHandler(object):
38 """ The Network service Monitor DTS handler """
39 EXEC_NS_CONF_XPATH = "I,/nsr:exec-ns-service-primitive"
40 EXEC_NS_CONF_O_XPATH = "O,/nsr:exec-ns-service-primitive"
41
42 GET_NS_CONF_XPATH = "I,/nsr:get-ns-service-primitive-values"
43 GET_NS_CONF_O_XPATH = "O,/nsr:get-ns-service-primitive-values"
44
45 def __init__(self, dts, log, loop, nsm):
46 self._dts = dts
47 self._log = log
48 self._loop = loop
49 self._nsm = nsm
50
51 self._ns_regh = None
52 self._vnf_regh = None
53 self._get_ns_conf_regh = None
54
55 self.job_manager = rift.mano.config_agent.ConfigAgentJobManager(dts, log, loop, nsm)
56
57 self._rift_install_dir = os.environ['RIFT_INSTALL']
58 self._rift_artif_dir = os.environ['RIFT_ARTIFACTS']
59
60 @property
61 def reghs(self):
62 """ Return registration handles """
63 return (self._ns_regh, self._vnf_regh, self._get_ns_conf_regh)
64
65 @property
66 def nsm(self):
67 """ Return the NS manager instance """
68 return self._nsm
69
70 def prepare_meta(self, rpc_ip):
71
72 try:
73 nsr_id = rpc_ip.nsr_id_ref
74 nsr = self._nsm.nsrs[nsr_id]
75 vnfrs = {}
76 for vnfr in nsr.vnfrs:
77 vnfr_id = vnfr.id
78 # vnfr is a dict containing all attributes
79 vnfrs[vnfr_id] = vnfr
80
81 return nsr, vnfrs
82 except KeyError as e:
83 raise ValueError("Record not found", str(e))
84
85 @asyncio.coroutine
86 def _get_ns_cfg_primitive(self, nsr_id, ns_cfg_name):
87 nsd_msg = yield from self._nsm.get_nsd(nsr_id)
88
89 def get_nsd_cfg_prim(name):
90 for ns_cfg_prim in nsd_msg.service_primitive:
91 if ns_cfg_prim.name == name:
92 return ns_cfg_prim
93 return None
94
95 ns_cfg_prim_msg = get_nsd_cfg_prim(ns_cfg_name)
96 if ns_cfg_prim_msg is not None:
97 ret_cfg_prim_msg = ns_cfg_prim_msg.deep_copy()
98 return ret_cfg_prim_msg
99 return None
100
101 @asyncio.coroutine
102 def _get_vnf_primitive(self, vnfr_id, nsr_id, primitive_name):
103 vnf = self._nsm.get_vnfr_msg(vnfr_id, nsr_id)
104 self._log.debug("vnfr_msg:%s", vnf)
105 if vnf:
106 self._log.debug("nsr/vnf {}/{}, vnf_configuration: %s",
107 vnf.vnf_configuration)
108 for primitive in vnf.vnf_configuration.service_primitive:
109 if primitive.name == primitive_name:
110 return primitive
111
112 raise ValueError("Could not find nsr/vnf {}/{} primitive {}"
113 .format(nsr_id, vnfr_id, primitive_name))
114
115 @asyncio.coroutine
116 def _apply_ns_config(self, agent_nsr, agent_vnfrs, rpc_ip):
117 """
118 Hook: Runs the user defined script. Feeds all the necessary data
119 for the script thro' yaml file.
120
121 TBD: Add support to pass multiple CA accounts if configures
122 Remove apply_ns_config from the Config Agent Plugins
123
124 Args:
125 rpc_ip (YangInput_Nsr_ExecNsConfigPrimitive): The input data.
126 nsr (NetworkServiceRecord): Description
127 vnfrs (dict): VNFR ID => VirtualNetworkFunctionRecord
128
129 """
130 def xlate(tag, tags):
131 # TBD
132 if tag is None or tags is None:
133 return tag
134 val = tag
135 if re.search('<.*>', tag):
136 try:
137 if tag == '<rw_mgmt_ip>':
138 val = tags['rw_mgmt_ip']
139 except KeyError as e:
140 self._log.info("RiftCA: Did not get a value for tag %s, e=%s",
141 tag, e)
142 return val
143
144 def get_meta(agent_nsr, agent_vnfrs):
145 unit_names, initial_params, vnfr_index_map, vnfr_data_map = {}, {}, {}, {}
146
147 for vnfr_id in agent_nsr.vnfr_ids:
148 vnfr = agent_vnfrs[vnfr_id]
149 self._log.debug("CA-RPC: VNFR metadata: {}".format(vnfr))
150
151 # index->vnfr ref
152 vnfr_index_map[vnfr.member_vnf_index] = vnfr_id
153 vnfr_data_dict = dict()
154 if 'mgmt_interface' in vnfr.vnfr:
155 vnfr_data_dict['mgmt_interface'] = vnfr.vnfr['mgmt_interface']
156
157 vnfr_data_dict['connection_point'] = []
158 if 'connection_point' in vnfr.vnfr:
159 for cp in vnfr.vnfr['connection_point']:
160 cp_dict = dict()
161 cp_dict['name'] = cp['name']
162 cp_dict['ip_address'] = cp['ip_address']
163 vnfr_data_dict['connection_point'].append(cp_dict)
164
165 try:
166 vnfr_data_dict['vdur'] = []
167 vdu_data = [(vdu['name'], vdu['management_ip'], vdu['vm_management_ip'], vdu['id'])
168 for vdu in vnfr.vnfr['vdur']]
169
170 for data in vdu_data:
171 data = dict(zip(['name', 'management_ip', 'vm_management_ip', 'id'] , data))
172 vnfr_data_dict['vdur'].append(data)
173
174 vnfr_data_map[vnfr.member_vnf_index] = vnfr_data_dict
175 except KeyError as e:
176 self._log.warn("Error getting VDU data for VNFR {}".format(vnfr))
177
178 # Unit name
179 unit_names[vnfr_id] = None
180 for config_plugin in self.nsm.config_agent_plugins:
181 name = config_plugin.get_service_name(vnfr_id)
182 if name:
183 unit_names[vnfr_id] = name
184 break
185
186 # Flatten the data for simplicity
187 param_data = {}
188 if 'initial_config_primitive' in vnfr.vnf_configuration:
189 for primitive in vnfr.vnf_configuration['initial_config_primitive']:
190 if 'parameter' in primitive:
191 for parameter in primitive['parameter']:
192 try:
193 value = xlate(parameter['value'], vnfr.tags)
194 param_data[parameter['name']] = value
195 except KeyError as e:
196 self._log.warn("Unable to parse the parameter{}: {}".
197 format(parameter))
198
199 initial_params[vnfr_id] = param_data
200
201
202 return unit_names, initial_params, vnfr_index_map, vnfr_data_map
203
204 def get_config_agent():
205 ret = {}
206 for config_plugin in self.nsm.config_agent_plugins:
207 if config_plugin.agent_type in [riftcm_config_plugin.DEFAULT_CAP_TYPE]:
208 ret = config_plugin.agent_data
209 else:
210 # Currently the first non default plugin is returned
211 return config_plugin.agent_data
212 return ret
213
214 unit_names, init_data, vnfr_index_map, vnfr_data_map = get_meta(agent_nsr, agent_vnfrs)
215
216 # The data consists of 4 sections
217 # 1. Account data
218 # 2. The input passed.
219 # 3. Juju unit names (keyed by vnfr ID).
220 # 4. Initial config data (keyed by vnfr ID).
221 data = dict()
222 data['config_agent'] = get_config_agent()
223 data["rpc_ip"] = rpc_ip.as_dict()
224 data["unit_names"] = unit_names
225 data["init_config"] = init_data
226 data["vnfr_index_map"] = vnfr_index_map
227 data["vnfr_data_map"] = vnfr_data_map
228
229 tmp_file = None
230 with tempfile.NamedTemporaryFile(delete=False) as tmp_file:
231 tmp_file.write(yaml.dump(data, default_flow_style=True)
232 .encode("UTF-8"))
233
234 self._log.debug("CA-RPC: Creating a temp file {} with input data: {}".
235 format(tmp_file.name, data))
236
237 # Get the full path to the script
238 script = ''
239 if rpc_ip.user_defined_script[0] == '/':
240 # The script has full path, use as is
241 script = rpc_ip.user_defined_script
242 else:
243 script = os.path.join(self._rift_artif_dir, 'launchpad/libs', agent_nsr.id, 'scripts',
244 rpc_ip.user_defined_script)
245 self._log.debug("CA-RPC: Checking for script in %s", script)
246 if not os.path.exists(script):
247 script = os.path.join(self._rift_install_dir, 'usr/bin', rpc_ip.user_defined_script)
248
249 cmd = "{} {}".format(rpc_ip.user_defined_script, tmp_file.name)
250 self._log.debug("CA-RPC: Running the CMD: {}".format(cmd))
251
252 coro = asyncio.create_subprocess_shell(cmd, loop=self._loop,
253 stderr=asyncio.subprocess.PIPE)
254 process = yield from coro
255 err = yield from process.stderr.read()
256 task = self._loop.create_task(process.wait())
257
258 return task, err
259
260 @asyncio.coroutine
261 def register(self):
262 """ Register for NS monitoring read from dts """
263 yield from self.job_manager.register()
264
265 @asyncio.coroutine
266 def on_ns_config_prepare(xact_info, action, ks_path, msg):
267 """ prepare callback from dts exec-ns-service-primitive"""
268 assert action == rwdts.QueryAction.RPC
269 rpc_ip = msg
270 rpc_op = NsrYang.YangOutput_Nsr_ExecNsServicePrimitive.from_dict({
271 "triggered_by": rpc_ip.triggered_by,
272 "create_time": int(time.time()),
273 "parameter": [param.as_dict() for param in rpc_ip.parameter],
274 "parameter_group": [pg.as_dict() for pg in rpc_ip.parameter_group]
275 })
276
277 try:
278 ns_cfg_prim_name = rpc_ip.name
279 nsr_id = rpc_ip.nsr_id_ref
280 nsr = self._nsm.nsrs[nsr_id]
281
282 nsd_cfg_prim_msg = yield from self._get_ns_cfg_primitive(nsr_id, ns_cfg_prim_name)
283
284 def find_nsd_vnf_prim_param_pool(vnf_index, vnf_prim_name, param_name):
285 for vnf_prim_group in nsd_cfg_prim_msg.vnf_primitive_group:
286 if vnf_prim_group.member_vnf_index_ref != vnf_index:
287 continue
288
289 for vnf_prim in vnf_prim_group.primitive:
290 if vnf_prim.name != vnf_prim_name:
291 continue
292
293 try:
294 nsr_param_pool = nsr.param_pools[pool_param.parameter_pool]
295 except KeyError:
296 raise ValueError("Parameter pool %s does not exist in nsr" % vnf_prim.parameter_pool)
297
298 self._log.debug("Found parameter pool %s for vnf index(%s), vnf_prim_name(%s), param_name(%s)",
299 nsr_param_pool, vnf_index, vnf_prim_name, param_name)
300 return nsr_param_pool
301
302 self._log.debug("Could not find parameter pool for vnf index(%s), vnf_prim_name(%s), param_name(%s)",
303 vnf_index, vnf_prim_name, param_name)
304 return None
305
306 rpc_op.nsr_id_ref = nsr_id
307 rpc_op.name = ns_cfg_prim_name
308
309 nsr, vnfrs = self.prepare_meta(rpc_ip)
310 rpc_op.job_id = nsr.job_id
311
312 # Copy over the NS level Parameters
313
314 # Give preference to user defined script.
315 if nsd_cfg_prim_msg and nsd_cfg_prim_msg.has_field("user_defined_script"):
316 rpc_ip.user_defined_script = nsd_cfg_prim_msg.user_defined_script
317
318 tasks = []
319 task, err = yield from self._apply_ns_config(
320 nsr,
321 vnfrs,
322 rpc_ip)
323 tasks.append(task)
324 if err:
325 rpc_op.job_status_details = err.decode()
326
327 self.job_manager.add_job(rpc_op, tasks)
328 else:
329 # Otherwise create VNF primitives.
330 for vnf in rpc_ip.vnf_list:
331 vnf_op = rpc_op.vnf_out_list.add()
332 vnf_member_idx = vnf.member_vnf_index_ref
333 vnfr_id = vnf.vnfr_id_ref
334 vnf_op.vnfr_id_ref = vnfr_id
335 vnf_op.member_vnf_index_ref = vnf_member_idx
336
337 idx = 0
338 for primitive in vnf.vnf_primitive:
339 op_primitive = vnf_op.vnf_out_primitive.add()
340 op_primitive.index = idx
341 idx += 1
342 op_primitive.name = primitive.name
343 op_primitive.execution_id = ''
344 op_primitive.execution_status = 'completed'
345 op_primitive.execution_error_details = ''
346
347 # Copy over the VNF pimitive's input parameters
348 for param in primitive.parameter:
349 output_param = op_primitive.parameter.add()
350 output_param.name = param.name
351 output_param.value = param.value
352
353 self._log.debug("%s:%s Got primitive %s:%s",
354 nsr_id, vnf.member_vnf_index_ref, primitive.name, primitive.parameter)
355
356 nsd_vnf_primitive = yield from self._get_vnf_primitive(
357 vnfr_id,
358 nsr_id,
359 primitive.name
360 )
361 for param in nsd_vnf_primitive.parameter:
362 if not param.has_field("parameter_pool"):
363 continue
364
365 try:
366 nsr_param_pool = nsr.param_pools[param.parameter_pool]
367 except KeyError:
368 raise ValueError("Parameter pool %s does not exist in nsr" % param.parameter_pool)
369 nsr_param_pool.add_used_value(param.value)
370
371 for config_plugin in self.nsm.config_agent_plugins:
372 yield from config_plugin.vnf_config_primitive(nsr_id,
373 vnfr_id,
374 primitive,
375 op_primitive)
376
377 self.job_manager.add_job(rpc_op)
378
379 # Get NSD
380 # Find Config Primitive
381 # For each vnf-primitive with parameter pool
382 # Find parameter pool
383 # Add used value to the pool
384 self._log.debug("RPC output: {}".format(rpc_op))
385 xact_info.respond_xpath(rwdts.XactRspCode.ACK,
386 RiftCMRPCHandler.EXEC_NS_CONF_O_XPATH,
387 rpc_op)
388 except Exception as e:
389 self._log.error("Exception processing the "
390 "exec-ns-service-primitive: {}".format(e))
391 self._log.exception(e)
392 xact_info.respond_xpath(rwdts.XactRspCode.NACK,
393 RiftCMRPCHandler.EXEC_NS_CONF_O_XPATH)
394
395 @asyncio.coroutine
396 def on_get_ns_config_values_prepare(xact_info, action, ks_path, msg):
397 assert action == rwdts.QueryAction.RPC
398 nsr_id = msg.nsr_id_ref
399 cfg_prim_name = msg.name
400 try:
401 nsr = self._nsm.nsrs[nsr_id]
402
403 rpc_op = NsrYang.YangOutput_Nsr_GetNsServicePrimitiveValues()
404
405 ns_cfg_prim_msg = yield from self._get_ns_cfg_primitive(nsr_id, cfg_prim_name)
406
407 # Get pool values for NS-level parameters
408 for ns_param in ns_cfg_prim_msg.parameter:
409 if not ns_param.has_field("parameter_pool"):
410 continue
411
412 try:
413 nsr_param_pool = nsr.param_pools[ns_param.parameter_pool]
414 except KeyError:
415 raise ValueError("Parameter pool %s does not exist in nsr" % ns_param.parameter_pool)
416
417 new_ns_param = rpc_op.ns_parameter.add()
418 new_ns_param.name = ns_param.name
419 new_ns_param.value = str(nsr_param_pool.get_next_unused_value())
420
421 # Get pool values for NS-level parameters
422 for vnf_prim_group in ns_cfg_prim_msg.vnf_primitive_group:
423 rsp_prim_group = rpc_op.vnf_primitive_group.add()
424 rsp_prim_group.member_vnf_index_ref = vnf_prim_group.member_vnf_index_ref
425 if vnf_prim_group.has_field("vnfd_id_ref"):
426 rsp_prim_group.vnfd_id_ref = vnf_prim_group.vnfd_id_ref
427
428 for index, vnf_prim in enumerate(vnf_prim_group.primitive):
429 rsp_prim = rsp_prim_group.primitive.add()
430 rsp_prim.name = vnf_prim.name
431 rsp_prim.index = index
432 vnf_primitive = yield from self._get_vnf_primitive(
433 vnf_prim_group.vnfd_id_ref,
434 nsr_id,
435 vnf_prim.name
436 )
437 for param in vnf_primitive.parameter:
438 if not param.has_field("parameter_pool"):
439 continue
440
441 # Get pool values for NS-level parameters
442 for ns_param in ns_cfg_prim_msg.parameter:
443 if not ns_param.has_field("parameter_pool"):
444 continue
445
446 try:
447 nsr_param_pool = nsr.param_pools[ns_param.parameter_pool]
448 except KeyError:
449 raise ValueError("Parameter pool %s does not exist in nsr" % ns_param.parameter_pool)
450
451 new_ns_param = rpc_op.ns_parameter.add()
452 new_ns_param.name = ns_param.name
453 new_ns_param.value = str(nsr_param_pool.get_next_unused_value())
454
455 # Get pool values for NS-level parameters
456 for vnf_prim_group in ns_cfg_prim_msg.vnf_primitive_group:
457 rsp_prim_group = rpc_op.vnf_primitive_group.add()
458 rsp_prim_group.member_vnf_index_ref = vnf_prim_group.member_vnf_index_ref
459 if vnf_prim_group.has_field("vnfd_id_ref"):
460 rsp_prim_group.vnfd_id_ref = vnf_prim_group.vnfd_id_ref
461
462 for index, vnf_prim in enumerate(vnf_prim_group.primitive):
463 rsp_prim = rsp_prim_group.primitive.add()
464 rsp_prim.name = vnf_prim.name
465 rsp_prim.index = index
466 vnf_primitive = yield from self._get_vnf_primitive(
467 nsr_id,
468 vnf_prim_group.member_vnf_index_ref,
469 vnf_prim.name
470 )
471 for param in vnf_primitive.parameter:
472 if not param.has_field("parameter_pool"):
473 continue
474
475 try:
476 nsr_param_pool = nsr.param_pools[param.parameter_pool]
477 except KeyError:
478 raise ValueError("Parameter pool %s does not exist in nsr" % vnf_prim.parameter_pool)
479
480 vnf_param = rsp_prim.parameter.add()
481 vnf_param.name = param.name
482 vnf_param.value = str(nsr_param_pool.get_next_unused_value())
483
484 self._log.debug("RPC output: {}".format(rpc_op))
485 xact_info.respond_xpath(rwdts.XactRspCode.ACK,
486 RiftCMRPCHandler.GET_NS_CONF_O_XPATH, rpc_op)
487 except Exception as e:
488 self._log.error("Exception processing the "
489 "get-ns-service-primitive-values: {}".format(e))
490 self._log.exception(e)
491 xact_info.respond_xpath(rwdts.XactRspCode.NACK,
492 RiftCMRPCHandler.GET_NS_CONF_O_XPATH)
493
494 hdl_ns = rift.tasklets.DTS.RegistrationHandler(on_prepare=on_ns_config_prepare,)
495 hdl_ns_get = rift.tasklets.DTS.RegistrationHandler(on_prepare=on_get_ns_config_values_prepare,)
496
497 with self._dts.group_create() as group:
498 self._ns_regh = group.register(xpath=RiftCMRPCHandler.EXEC_NS_CONF_XPATH,
499 handler=hdl_ns,
500 flags=rwdts.Flag.PUBLISHER,
501 )
502 self._get_ns_conf_regh = group.register(xpath=RiftCMRPCHandler.GET_NS_CONF_XPATH,
503 handler=hdl_ns_get,
504 flags=rwdts.Flag.PUBLISHER,
505 )
506
507