update from RIFT as of 696b75d2fe9fb046261b08c616f1bcf6c0b54a9b second try
[osm/SO.git] / rwcm / plugins / rwconman / rift / tasklets / rwconmantasklet / RiftCA.py
1 #
2 # Copyright 2016 RIFT.IO Inc
3 #
4 # Licensed under the Apache License, Version 2.0 (the "License");
5 # you may not use this file except in compliance with the License.
6 # You may obtain a copy of the License at
7 #
8 # http://www.apache.org/licenses/LICENSE-2.0
9 #
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS,
12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 # See the License for the specific language governing permissions and
14 # limitations under the License.
15 #
16
17 import asyncio
18 import concurrent.futures
19 import os
20 import re
21 import shlex
22 import tempfile
23 import yaml
24
25 from gi.repository import (
26 RwDts as rwdts,
27 )
28
29 from . import riftcm_config_plugin
30
31 class RiftCAConfigPlugin(riftcm_config_plugin.RiftCMConfigPluginBase):
32 """
33 Implementation of the riftcm_config_plugin.RiftCMConfigPluginBase
34 """
35 def __init__(self, dts, log, loop, project, account):
36 riftcm_config_plugin.RiftCMConfigPluginBase.__init__(self, dts, log,
37 loop, project, account)
38 self._name = account.name
39 self._type = riftcm_config_plugin.DEFAULT_CAP_TYPE
40 self._rift_install_dir = os.environ['RIFT_INSTALL']
41 self._rift_var_root_dir = os.environ['RIFT_VAR_ROOT']
42 self._rift_vnfs = {}
43 self._tasks = {}
44
45 @property
46 def name(self):
47 return self._name
48
49 @property
50 def agent_type(self):
51 return self._type
52
53 @property
54 def agent_data(self):
55 return dict(
56 type=self.agent_type,
57 name=self.name,
58 )
59
60 def vnfr(self, vnfr_id):
61 try:
62 vnfr = self._rift_vnfs[vnfr_id].vnfr
63 except KeyError:
64 self._log.debug("RiftCA: Did not find VNFR %s in Rift plugin", vnfr_id)
65 return None
66
67 return vnfr
68
69 def get_service_name(self, vnfr_id):
70 vnfr = self.vnfr(vnfr_id)
71 if vnfr:
72 return vnfr['name']
73 return None
74
75 @asyncio.coroutine
76 def notify_create_vlr(self, agent_nsr, agent_vnfr, vld, vlr):
77 """
78 Notification of create VL record
79 """
80 pass
81
82 @asyncio.coroutine
83 def is_vnf_configurable(self, agent_vnfr):
84 '''
85 This needs to be part of abstract class
86 '''
87 loop_count = 10
88 while loop_count:
89 loop_count -= 1
90 # Set this VNF's configurability status (need method to check)
91 yield from asyncio.sleep(2, loop=self._loop)
92
93 def riftca_log(self, name, level, log_str, *args):
94 getattr(self._log, level)('RiftCA:({}) {}'.format(name, log_str), *args)
95
96 @asyncio.coroutine
97 def notify_create_vnfr(self, agent_nsr, agent_vnfr):
98 """
99 Notification of create Network VNF record
100 """
101 # Deploy the charm if specified for the vnf
102 self._log.debug("Rift config agent: create vnfr nsr={} vnfr={}"
103 .format(agent_nsr.name, agent_vnfr.name))
104 try:
105 self._loop.create_task(self.is_vnf_configurable(agent_vnfr))
106 except Exception as e:
107 self._log.debug("Rift config agent: vnf_configuration error for VNF:%s/%s: %s",
108 agent_nsr.name, agent_vnfr.name, str(e))
109 return False
110
111 return True
112
113 @asyncio.coroutine
114 def notify_instantiate_vnfr(self, agent_nsr, agent_vnfr):
115 """
116 Notification of Instantiate NSR with the passed nsr id
117 """
118 pass
119
120 @asyncio.coroutine
121 def notify_instantiate_vlr(self, agent_nsr, agent_vnfr, vlr):
122 """
123 Notification of Instantiate NSR with the passed nsr id
124 """
125 pass
126
127 @asyncio.coroutine
128 def notify_terminate_vnfr(self, agent_nsr, agent_vnfr):
129 """
130 Notification of Terminate the network service
131 """
132
133 @asyncio.coroutine
134 def notify_terminate_vlr(self, agent_nsr, agent_vnfr, vlr):
135 """
136 Notification of Terminate the virtual link
137 """
138 pass
139
140 @asyncio.coroutine
141 def _vnf_config_primitive(self, nsr_id, vnfr_id, primitive,
142 vnf_config=None, vnfd_descriptor=None):
143 '''
144 Pass vnf_config to avoid querying DTS each time
145 '''
146 self._log.debug("VNF config primitive {} for nsr {}, vnfr {}".
147 format(primitive.name, nsr_id, vnfr_id))
148
149 if vnf_config is None or vnfd_descriptor is None:
150 vnfr_msg = yield from self.get_vnfr(vnfr_id)
151 if vnfr_msg is None:
152 msg = "Unable to get VNFR {} through DTS".format(vnfr_id)
153 self._log.error(msg)
154 return 3, msg
155
156 vnf_config = vnfr_msg.vnf_configuration
157 vnfd_descriptor = vnfr_msg.vnfd
158 self._log.debug("VNF config= %s", vnf_config.as_dict())
159 self._log.debug("VNFD descriptor= %s", vnfd_descriptor.as_dict())
160
161 data = {}
162 script = None
163 found = False
164
165 configs = vnf_config.config_primitive
166 for config in configs:
167 if config.name == primitive.name:
168 found = True
169 self._log.debug("RiftCA: Found the config primitive %s",
170 config.name)
171
172 spt = config.user_defined_script
173 if spt is None:
174 self._log.error("RiftCA: VNFR {}, Did not find "
175 "script defined in config {}".
176 format(vnfr['name'], config.as_dict()))
177 return 1, "Did not find user defined script for " \
178 "config primitive {}".format(primitive.name)
179
180 spt = shlex.quote(spt.strip())
181 if spt[0] == '/':
182 script = spt
183 else:
184 script = os.path.join(self._rift_var_root_dir,
185 'launchpad/packages/vnfd',
186 self._project.name,
187 vnfd_descriptor.id,
188 'scripts',
189 spt)
190 self._log.debug("Rift config agent: Checking for script "
191 "in %s", script)
192 if not os.path.exists(script):
193 self._log.debug("Rift config agent: Did not find "
194 "script %s", script)
195 return 1, "Did not find user defined " \
196 "script {}".format(spt)
197
198 params = {}
199 for param in config.parameter:
200 val = None
201 for p in primitive.parameter:
202 if p.name == param.name:
203 val = p.value
204 break
205
206 if val is None:
207 val = param.default_value
208
209 if val is None:
210 # Check if mandatory parameter
211 if param.mandatory:
212 msg = "VNFR {}: Primitive {} called " \
213 "without mandatory parameter {}". \
214 format(vnfr.name, config.name,
215 param.name)
216 self._log.error(msg)
217 return 1, msg
218
219 if val:
220 val = self.convert_value(val, param.data_type)
221 params.update({param.name: val})
222
223 data['parameters'] = params
224 break
225
226 if not found:
227 msg = "Did not find the primitive {} in VNFR {}". \
228 format(primitive.name, vnfr.name)
229 self._log.error(msg)
230 return 1, msg
231
232 rc, script_err = yield from self.exec_script(script, data)
233 return rc, script_err
234
235 @asyncio.coroutine
236 def vnf_config_primitive(self, nsr_id, vnfr_id, primitive, output):
237 '''
238 primitives support by RiftCA
239
240 Pass vnf_config to avoid querying DTS each time
241 '''
242 try:
243 vnfr = self._rift_vnfs[vnfr_id].vnfr
244 except KeyError:
245 msg = "Did not find VNFR {} in Rift plugin".format(vnfr_id)
246 self._log.debug(msg)
247 return
248
249 output.execution_status = "failed"
250 output.execution_id = ''
251 output.execution_error_details = ''
252
253 rc, err = yield from self._vnf_config_primitive(nsr_id,
254 vnfr_id,
255 primitive)
256 self._log.debug("VNFR {} primitive {} exec status: {}".
257 format(vnfr_id, primitive.name, rc))
258
259 if rc == 0:
260 output.execution_status = "completed"
261 else:
262 self._rift_vnfs[vnfr_id].error = True
263
264 output.execution_error_details = '{}'.format(err)
265
266 @asyncio.coroutine
267 def apply_config(self, config, nsr, vnfr, rpc_ip):
268 """ Notification on configuration of an NSR """
269 pass
270
271 @asyncio.coroutine
272 def apply_ns_config(self, agent_nsr, agent_vnfrs, rpc_ip):
273 """Hook: Runs the user defined script. Feeds all the necessary data
274 for the script thro' yaml file.
275
276 Args:
277 rpc_ip (YangInput_Nsr_ExecNsConfigPrimitive): The input data.
278 nsr (NetworkServiceRecord): Description
279 vnfrs (dict): VNFR ID => VirtualNetworkFunctionRecord
280 """
281
282 def xlate(tag, tags):
283 # TBD
284 if tag is None or tags is None:
285 return tag
286 val = tag
287 if re.search('<.*>', tag):
288 try:
289 if tag == '<rw_mgmt_ip>':
290 val = tags['rw_mgmt_ip']
291 except KeyError as e:
292 self._log.info("RiftCA: Did not get a value for tag %s, e=%s",
293 tag, e)
294 return val
295
296 def get_meta(agent_nsr, agent_vnfrs):
297 unit_names, initial_params, vnfr_index_map, vnfr_data_map = {}, {}, {}, {}
298
299 for vnfr_id in agent_nsr.vnfr_ids:
300 vnfr = agent_vnfrs[vnfr_id]
301
302 # index->vnfr ref
303 vnfr_index_map[vnfr.member_vnf_index] = vnfr_id
304 vnfr_data_dict = dict()
305 if 'mgmt_interface' in vnfr.vnfr:
306 vnfr_data_dict['mgmt_interface'] = vnfr.vnfr['mgmt_interface']
307
308 vnfr_data_dict['connection_point'] = []
309 vnfr_data_dict['name'] = vnfr.vnfr['name']
310 vnfr_data_dict['datacenter'] = vnfr.vnfr['datacenter']
311 if 'connection_point' in vnfr.vnfr:
312 for cp in vnfr.vnfr['connection_point']:
313 cp_dict = dict()
314 cp_dict['name'] = cp['name']
315 cp_dict['ip_address'] = cp['ip_address']
316 cp_dict['connection_point_id'] = cp['connection_point_id']
317 if 'virtual_cps' in cp:
318 cp_dict['virtual_cps'] = [ {k:v for k,v in vcp.items()
319 if k in ['ip_address', 'mac_address']}
320 for vcp in cp['virtual_cps'] ]
321 vnfr_data_dict['connection_point'].append(cp_dict)
322
323 vnfr_data_dict['vdur'] = []
324 vdu_data = [(vdu['name'], vdu['management_ip'], vdu['vm_management_ip'], vdu['id'], vdu['vdu_id_ref'])
325 for vdu in vnfr.vnfr['vdur']]
326
327 for data in vdu_data:
328 data = dict(zip(['name', 'management_ip', 'vm_management_ip', 'id', 'vdu_id_ref'] , data))
329 vnfr_data_dict['vdur'].append(data)
330
331 vnfr_data_map[vnfr.member_vnf_index] = vnfr_data_dict
332 # Unit name
333 unit_names[vnfr_id] = vnfr.name
334 # Flatten the data for simplicity
335 param_data = {}
336 if 'initial_config_primitive' in vnfr.vnf_configuration:
337 for primitive in vnfr.vnf_configuration['initial_config_primitive']:
338 for parameter in primitive.parameter:
339 value = xlate(parameter.value, vnfr.tags)
340 param_data[parameter.name] = value
341
342 initial_params[vnfr_id] = param_data
343
344
345 return unit_names, initial_params, vnfr_index_map, vnfr_data_map
346
347 unit_names, init_data, vnfr_index_map, vnfr_data_map = get_meta(agent_nsr, agent_vnfrs)
348 # The data consists of 4 sections
349 # 1. Account data
350 # 2. The input passed.
351 # 3. Unit names (keyed by vnfr ID).
352 # 4. Initial config data (keyed by vnfr ID).
353 data = dict()
354 data['config_agent'] = dict(
355 name=self._name,
356 )
357 data["rpc_ip"] = rpc_ip.as_dict()
358 data["unit_names"] = unit_names
359 data["init_config"] = init_data
360 data["vnfr_index_map"] = vnfr_index_map
361 data["vnfr_data_map"] = vnfr_data_map
362
363 tmp_file = None
364 with tempfile.NamedTemporaryFile(delete=False) as tmp_file:
365 tmp_file.write(yaml.dump(data, default_flow_style=True)
366 .encode("UTF-8"))
367
368 # Get the full path to the script
369 script = ''
370 if rpc_ip.user_defined_script[0] == '/':
371 # The script has full path, use as is
372 script = rpc_ip.user_defined_script
373 else:
374 script = os.path.join(self._rift_var_root_dir,
375 'launchpad/packages/nsd',
376 self._project.name,
377 agent_nsr.nsd_id, 'scripts',
378 rpc_ip.user_defined_script)
379 self._log.debug("Rift config agent: Checking for script in %s", script)
380 if not os.path.exists(script):
381 self._log.error("Rift config agent: Did not find script %s", script)
382
383 cmd = "{} {}".format(script, tmp_file.name)
384 self._log.debug("Rift config agent: Running the CMD: {}".format(cmd))
385
386 coro = asyncio.create_subprocess_shell(cmd, loop=self._loop,
387 stderr=asyncio.subprocess.PIPE)
388 process = yield from coro
389 err = yield from process.stderr.read()
390 task = self._loop.create_task(process.wait())
391
392 return task, err
393
394 @asyncio.coroutine
395 def apply_initial_config_new(self, agent_nsr, agent_vnfr):
396 self._log.debug("RiftCA: VNF initial config primitive for nsr {}, vnfr {}".
397 format(agent_nsr.name, agent_vnfr.name))
398
399 try:
400 vnfr = self._rift_vnfs[agent_vnfr.id].vnfr
401 except KeyError:
402 self._log.error("RiftCA: Did not find VNFR %s in RiftCA plugin",
403 agent_vnfr.name)
404 return False
405
406 class Primitive:
407 def __init__(self, name):
408 self.name = name
409 self.value = None
410 self.parameter = []
411
412 vnfr = yield from self.get_vnfr(agent_vnfr.id)
413 if vnfr is None:
414 msg = "Unable to get VNFR {} ({}) through DTS". \
415 format(agent_vnfr.id, agent_vnfr.name)
416 self._log.error(msg)
417 raise RuntimeError(msg)
418
419 vnf_config = vnfr.vnf_configuration
420 self._log.debug("VNFR %s config: %s", vnfr.name,
421 vnf_config.as_dict())
422
423 vnfd_descriptor = vnfr.vnfd
424 self._log.debug("VNFR %s vnfd descriptor: %s", vnfr.name,
425 vnfd_descriptor.as_dict())
426
427
428 # Sort the primitive based on the sequence number
429 primitives = sorted(vnf_config.initial_config_primitive,
430 key=lambda k: k.seq)
431 if not primitives:
432 self._log.debug("VNFR {}: No initial-config-primitive specified".
433 format(vnfr.name))
434 return True
435
436 for primitive in primitives:
437 if primitive.config_primitive_ref:
438 # Reference to a primitive in config primitive
439 prim = Primitive(primitive.config_primitive_ref)
440 rc, err = yield from self._vnf_config_primitive(agent_nsr.id,
441 agent_vnfr.id,
442 prim,
443 vnf_config, vnfd_descriptor)
444 if rc != 0:
445 msg = "Error executing initial config primitive" \
446 " {} in VNFR {}: rc={}, stderr={}". \
447 format(prim.name, vnfr.name, rc, err)
448 self._log.error(msg)
449 return False
450
451 elif primitive.name:
452 if not primitive.user_defined_script:
453 msg = "Primitive {} definition in initial config " \
454 "primitive for VNFR {} not supported yet". \
455 format(primitive.name, vnfr.name)
456 self._log.error(msg)
457 raise NotImplementedError(msg)
458
459 return True
460
461 @asyncio.coroutine
462 def apply_initial_config(self, agent_nsr, agent_vnfr):
463 """
464 Apply the initial configuration
465 """
466 self._log.debug("Rift config agent: Apply initial config to VNF:%s/%s",
467 agent_nsr.name, agent_vnfr.name)
468 rc = False
469
470 try:
471 if agent_vnfr.id in self._rift_vnfs.keys():
472 rc = yield from self.apply_initial_config_new(agent_nsr, agent_vnfr)
473 if not rc:
474 agent_vnfr._error = True
475
476 else:
477 rc = True
478 except Exception as e:
479 self._log.error("Rift config agent: Error on initial configuration to VNF:{}/{}, e {}"
480 .format(agent_nsr.name, agent_vnfr.name, str(e)))
481
482 self._log.exception(e)
483 agent_vnfr.error = True
484 return False
485
486 return rc
487
488 def is_vnfr_managed(self, vnfr_id):
489 try:
490 if vnfr_id in self._rift_vnfs:
491 return True
492 except Exception as e:
493 self._log.debug("Rift config agent: Is VNFR {} managed: {}".
494 format(vnfr_id, e))
495 return False
496
497 def add_vnfr_managed(self, agent_vnfr):
498 if agent_vnfr.id not in self._rift_vnfs.keys():
499 self._log.info("Rift config agent: add vnfr={}/{}".format(agent_vnfr.name, agent_vnfr.id))
500 self._rift_vnfs[agent_vnfr.id] = agent_vnfr
501
502 @asyncio.coroutine
503 def get_config_status(self, agent_nsr, agent_vnfr):
504 if agent_vnfr.id in self._rift_vnfs.keys():
505 if agent_vnfr.error:
506 return 'error'
507 return 'configured'
508 return 'unknown'
509
510
511 def get_action_status(self, execution_id):
512 ''' Get the action status for an execution ID
513 *** Make sure this is NOT a asyncio coroutine function ***
514 '''
515 return None