Move 'launchpad' directory to 'RIFT_VAR_ROOT' from 'RIFT_ARTIFACTS'
[osm/SO.git] / rwcm / plugins / rwconman / rift / tasklets / rwconmantasklet / rwconman_config.py
1
2 #
3 # Copyright 2016 RIFT.IO Inc
4 #
5 # Licensed under the Apache License, Version 2.0 (the "License");
6 # you may not use this file except in compliance with the License.
7 # You may obtain a copy of the License at
8 #
9 # http://www.apache.org/licenses/LICENSE-2.0
10 #
11 # Unless required by applicable law or agreed to in writing, software
12 # distributed under the License is distributed on an "AS IS" BASIS,
13 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 # See the License for the specific language governing permissions and
15 # limitations under the License.
16 #
17
18 import asyncio
19 import os
20 import stat
21 import subprocess
22 import sys
23 import tempfile
24 import yaml
25
26 from gi.repository import (
27 RwDts as rwdts,
28 RwConmanYang as conmanY,
29 ProtobufC,
30 )
31
32 import rift.tasklets
33 import rift.package.script
34 import rift.package.store
35
36 from . import rwconman_conagent as conagent
37 from . import RiftCM_rpc
38 from . import riftcm_config_plugin
39
40
# asyncio.ensure_future() first appeared in Python 3.4.4; on older
# interpreters alias it to its predecessor.  The predecessor is named
# "async", which is a reserved word from Python 3.7 onwards, so it has to be
# looked up by name: writing ``asyncio.async`` literally makes this whole
# module a SyntaxError on modern interpreters.
if sys.version_info < (3, 4, 4):
    asyncio.ensure_future = getattr(asyncio, "async")
43
def get_vnf_unique_name(nsr_name, vnfr_short_name, member_vnf_index):
    """Build the dotted identifier of one VNF inside a network service.

    The three components are rendered with their default formatting and
    joined as ``<nsr_name>.<vnfr_short_name>.<member_vnf_index>``.
    """
    components = (nsr_name, vnfr_short_name, member_vnf_index)
    return ".".join(format(component) for component in components)
46
class ConmanConfigError(Exception):
    """Base class for the configuration errors defined in this module."""
49
50
class InitialConfigError(ConmanConfigError):
    """Raised when an initial-config script exits with a non-zero status."""
53
54
def log_this_vnf(vnf_cfg):
    """Return a compact tag identifying a VNF in log messages.

    The tag has the form ``nsr_name/vnfr_name/member_vnf_index/(mgmt_ip)``;
    any of the four keys that is missing from *vnf_cfg* is simply left out.
    """
    path_keys = ('nsr_name', 'vnfr_name', 'member_vnf_index')
    pieces = ["{}/".format(vnf_cfg[key]) for key in path_keys if key in vnf_cfg]
    if 'mgmt_ip_address' in vnf_cfg:
        # The management address is always last and wrapped in parentheses.
        pieces.append("({})".format(vnf_cfg['mgmt_ip_address']))
    return "".join(pieces)
65
class PretendNsm(object):
    """Lookup facade over the config manager's NSR/VNFR bookkeeping.

    An instance is handed to ``RiftCM_rpc.RiftCMRPCHandler`` by
    ``ConfigManagerConfig``.  It shares the parent's ``_nsr_dict`` (the same
    dict object, not a copy), so it always sees the parent's current records.
    """

    def __init__(self, dts, log, loop, parent):
        self._dts, self._log, self._loop = dts, log, loop
        self._parent = parent
        self._nsrs = {}
        self._nsr_dict = parent._nsr_dict
        self._config_agent_plugins = []
        self._nsd_msg = {}  # nsr id -> NSD message fetched by get_nsd()

    @property
    def nsrs(self):
        # Expensive: the mapping is rebuilt on every access.  Use get_nsr()
        # instead when the id is known.
        self._nsrs = {
            nsr_id: nsr_obj.agent_nsr
            for nsr_id, nsr_obj in self._nsr_dict.items()
        }
        return self._nsrs

    def get_nsr(self, nsr_id):
        """Return the stored NSR for *nsr_id*, or None when it is unknown."""
        if nsr_id not in self._nsr_dict:
            return None
        return self._nsr_dict[nsr_id]._nsr

    def get_vnfr_msg(self, vnfr_id, nsr_id=None):
        """Return the VNFR message for *vnfr_id*, or None when not found.

        With a truthy *nsr_id* only that NSR is consulted; otherwise every
        known NSR is searched and the first match wins.
        """
        self._log.debug("get_vnfr_msg(vnfr=%s, nsr=%s)",
                        vnfr_id, nsr_id)
        if nsr_id:
            if nsr_id in self._nsr_dict:
                candidates = [self._nsr_dict[nsr_id]]
            else:
                candidates = []
        else:
            candidates = self._nsr_dict.values()

        for candidate in candidates:
            if vnfr_id in candidate._vnfr_dict:
                vnf_cfg = candidate._vnfr_dict[vnfr_id]['vnf_cfg']
                return vnf_cfg['agent_vnfr'].vnfr_msg
        return None

    @asyncio.coroutine
    def get_nsd(self, nsr_id):
        """Return the NSD of *nsr_id*, fetching and caching it on first use."""
        if nsr_id in self._nsd_msg:
            return self._nsd_msg[nsr_id]
        nsr_config = yield from self._parent.cmdts_obj.get_nsr_config(nsr_id)
        self._nsd_msg[nsr_id] = nsr_config.nsd
        return self._nsd_msg[nsr_id]

    @property
    def config_agent_plugins(self):
        # Snapshot of the plugin instances currently held by the parent's
        # config agent manager; rebuilt on every access.
        plugin_instances = self._parent._config_agent_mgr._plugin_instances
        self._config_agent_plugins = list(plugin_instances.values())
        return self._config_agent_plugins
126
127 class ConfigManagerConfig(object):
def __init__(self, dts, log, loop, parent):
    """Set up empty bookkeeping and create the handlers to be registered.

    Nothing is registered here; ``register()`` does that for every object
    collected in ``self.reg_handles``.
    """
    self._dts, self._log, self._loop, self._parent = dts, log, loop, parent

    self._nsr_dict = {}
    self.pending_cfg = {}
    self.terminate_cfg = {}
    # Used for NSR id get retry (mainly exercised in the restart case).
    self.pending_tasks = []

    self._config_xpath = "C,/cm-config"
    self._opdata_xpath = "D,/rw-conman:cm-state"

    self.cm_config = conmanY.SoConfig()
    # RO specific configuration: one entry per endpoint field, all unset.
    self.ro_config = dict.fromkeys(self.cm_config.ro_endpoint.fields)

    # Initial cm-state
    self.cm_state = {'cm_nsr': [], 'states': "Initialized"}

    # Objects to register, created in the order they are registered.
    self.cmdts_obj = ConfigManagerDTS(self._log, self._loop, self, self._dts)
    self._config_agent_mgr = conagent.RiftCMConfigAgent(
        self._dts, self._log, self._loop, self)
    rpc_handler = RiftCM_rpc.RiftCMRPCHandler(
        self._dts, self._log, self._loop,
        PretendNsm(self._dts, self._log, self._loop, self))
    self.reg_handles = [self.cmdts_obj, self._config_agent_mgr, rpc_handler]
167
def is_nsr_valid(self, nsr_id):
    """Return True when *nsr_id* is present in ``self._nsr_dict``."""
    return nsr_id in self._nsr_dict
172
def add_to_pending_tasks(self, task):
    """Queue *task* unless a task for the same NSR id is already queued.

    When the queue goes from empty to one entry the pending-loop coroutine
    is scheduled on the event loop.  Errors raised while queueing are
    logged, not propagated.
    """
    if any(queued['nsrid'] == task['nsrid'] for queued in self.pending_tasks):
        # Already queued
        return

    try:
        self.pending_tasks.append(task)
        self._log.debug("add_to_pending_tasks (nsrid:%s)",
                        task['nsrid'])
        queue_was_empty = len(self.pending_tasks) == 1
        if queue_was_empty:
            self._loop.create_task(self.ConfigManagerConfig_pending_loop())
            # TBD - change to info level
            self._log.debug("Started pending_loop!")
    except Exception as err:
        self._log.error("Failed adding to pending tasks (%s)", str(err))
189
def del_from_pending_tasks(self, task):
    """Drop *task* from the pending queue; a failure is logged, not raised."""
    queue = self.pending_tasks
    try:
        queue.remove(task)
    except Exception as err:
        self._log.error("Failed removing from pending tasks (%s)", str(err))
195
@asyncio.coroutine
def ConfigManagerConfig_pending_loop(self):
    """Work through ``self.pending_tasks`` until the queue is empty.

    Every two seconds the task at the head of the queue is taken and
    ``config_NSR`` is run for its NSR id.  A task whose configuration fails
    (exception or falsy result) is moved to the back of the queue while it
    still has retries left; otherwise it is dropped.  The coroutine ends as
    soon as it wakes up and finds the queue empty.
    """
    loop_sleep = 2
    while True:
        yield from asyncio.sleep(loop_sleep, loop=self._loop)

        # This pending task queue is ordered by events; the previous task
        # must finish successfully before going on to the next one.
        if not self.pending_tasks:
            self._log.debug("Stopped pending_loop!")
            break

        self._log.debug("self.pending_tasks len=%s", len(self.pending_tasks))
        task = self.pending_tasks[0]
        if 'nsrid' not in task:
            continue

        nsrid = task['nsrid']
        done = False
        still_queued = False
        self._log.debug("Will execute pending task for NSR id(%s)", nsrid)
        try:
            # Try to configure this NSR
            task['retries'] -= 1
            done = yield from self.config_NSR(nsrid)
            self._log.info("self.config_NSR status=%s", done)

        except Exception as e:
            self._log.error("Failed(%s) configuring NSR(%s),"
                            "retries remained:%d!",
                            str(e), nsrid, task['retries'])
        finally:
            # The task may have been removed from the queue while
            # config_NSR was suspended (terminate_NSR does this).  An
            # unconditional remove() would then raise ValueError and kill
            # this loop, leaving the remaining tasks unprocessed.
            still_queued = task in self.pending_tasks
            if still_queued:
                self.pending_tasks.remove(task)

        if done:
            self._log.debug("Finished pending task NSR id(%s):", nsrid)
            continue

        self._log.error("Failed configuring NSR(%s), retries remained:%d!",
                        nsrid, task['retries'])

        # Failed: re-insert this task at the end so it is retried later,
        # but only while retries remain.  A counter that has dropped below
        # zero must not be treated as "retries left", and a task that
        # somebody else already dequeued must not be resurrected.
        if still_queued and task['retries'] > 0:
            self.pending_tasks.append(task)
239
@asyncio.coroutine
def register(self):
    """Register the cm-state opdata first, then every handler in turn."""
    yield from self.register_cm_state_opdata()

    # Initialize all handles that need to be registered, in list order.
    for handle in self.reg_handles:
        yield from handle.register()
247
@asyncio.coroutine
def register_cm_state_opdata(self):
    """Register as DTS publisher for the cm-state operational data.

    READ queries are answered with the current ``self.cm_state``; any other
    action is simply acknowledged.  A registration failure is logged and
    not propagated.
    """

    @asyncio.coroutine
    def on_prepare(xact_info, action, ks_path, msg):

        self._log.debug("Received cm-state: msg=%s, action=%s", msg, action)

        if action == rwdts.QueryAction.READ:
            show_output = conmanY.CmOpdata()
            show_output.from_dict(self.cm_state)
            self._log.debug("Responding to SHOW cm-state: %s", self.cm_state)
            xact_info.respond_xpath(rwdts.XactRspCode.ACK,
                                    xpath=self._opdata_xpath,
                                    msg=show_output)
        else:
            xact_info.respond_xpath(rwdts.XactRspCode.ACK)

    self._log.info("Registering for cm-opdata xpath: %s",
                   self._opdata_xpath)

    try:
        handler = rift.tasklets.DTS.RegistrationHandler(on_prepare=on_prepare)
        yield from self._dts.register(xpath=self._opdata_xpath,
                                      handler=handler,
                                      flags=rwdts.Flag.PUBLISHER)
        self._log.info("Successfully registered for opdata(%s)", self._opdata_xpath)
    except Exception as e:
        self._log.error("Failed to register for opdata as (%s)", e)
297
@asyncio.coroutine
def process_nsd_vnf_configuration(self, nsr_obj, vnfr):
    """Record how one VNF is to be configured and update its cm-state.

    Called once per VNFR.  When the VNF's ``vnf_configuration`` names a
    supported method ('juju' or 'script'), the VNF is added to
    ``nsr_obj.nsr_cfg_config_attributes_dict`` under its configuration
    priority and indexed in ``nsr_obj._vnfr_dict`` by both unique name and
    id.  Otherwise its cm-state is set to READY_NO_CFG.

    The per-NS attributes dictionary ends up shaped like this (the top
    level key is the priority; no need to fill it in that order)::

        1 :
          - name : trafsink_vnfd
            member_vnf_index : 2
            configuration_delay : 120
            configuration_type : script
            configuration_options :
                port : 2022
                target : running
        2 :
          - name : trafgen_vnfd
            ...
    """

    def get_config_method(vnf_config):
        # First supported method present in the configuration wins.
        for candidate in ('juju', 'script'):
            if candidate in vnf_config:
                return candidate
        return None

    def get_cfg_file_extension(method):
        return {"juju": "yml", "script": "py"}.get(method, "cfg")

    # Save some parameters needed as short cuts in flat structure
    vnf_cfg = vnfr['vnf_cfg']
    # Prepare unique name for this VNF
    vnf_cfg['vnf_unique_name'] = get_vnf_unique_name(
        vnf_cfg['nsr_name'], vnfr['short_name'], vnfr['member_vnf_index_ref'])

    nsr_obj.cfg_path_prefix = '{}/{}_{}'.format(
        nsr_obj.this_nsr_dir, vnfr['short_name'], vnfr['member_vnf_index_ref'])

    # Get vnf_configuration from vnfr
    vnf_config = vnfr['vnf_configuration']

    self._log.debug("vnf_configuration = %s", vnf_config)

    # 'config_attributes' is optional; treat a missing section as empty so
    # that both the priority and the delay lookups below are safe.
    if 'config_attributes' in vnf_config:
        config_attributes = vnf_config['config_attributes']
    else:
        config_attributes = {}

    # Create priority dictionary
    cfg_priority_order = 0
    if 'config_priority' in config_attributes:
        cfg_priority_order = config_attributes['config_priority']

    if cfg_priority_order not in nsr_obj.nsr_cfg_config_attributes_dict:
        # No VNFR with this priority yet, initialize the list
        nsr_obj.nsr_cfg_config_attributes_dict[cfg_priority_order] = []

    method = get_config_method(vnf_config)
    if method is not None:
        # Create all sub dictionaries first
        config_priority = {
            'id': vnfr['id'],
            'name': vnfr['short_name'],
            'member_vnf_index': vnfr['member_vnf_index_ref'],
        }

        if 'config_delay' in config_attributes:
            config_priority['configuration_delay'] = config_attributes['config_delay']
            vnf_cfg['config_delay'] = config_priority['configuration_delay']

        configuration_options = {}
        self._log.debug("config method=%s", method)
        config_priority['configuration_type'] = method
        vnf_cfg['config_method'] = method

        # Set config agent based on method
        self._config_agent_mgr.set_config_agent(
            nsr_obj.agent_nsr, vnf_cfg['agent_vnfr'], method)

        cfg_opt_list = [
            'port', 'target', 'script_type', 'ip_address', 'user', 'secret',
        ]
        for cfg_opt in cfg_opt_list:
            if cfg_opt in vnf_config[method]:
                configuration_options[cfg_opt] = vnf_config[method][cfg_opt]
                vnf_cfg[cfg_opt] = configuration_options[cfg_opt]

        # Add to the cp_dict
        vnf_cp_dict = nsr_obj._cp_dict[vnfr['member_vnf_index_ref']]
        vnf_cp_dict['rw_mgmt_ip'] = vnf_cfg['mgmt_ip_address']
        vnf_cp_dict['rw_username'] = vnf_cfg['username']
        vnf_cp_dict['rw_password'] = vnf_cfg['password']

        # TBD - see if we can neatly include the config in
        # "config_attributes" file, no need though
        # Create config file
        vnf_cfg['juju_script'] = os.path.join(self._parent.cfg_dir, 'juju_if.py')
        vnf_cfg['cfg_file'] = '{}.{}'.format(
            nsr_obj.cfg_path_prefix, get_cfg_file_extension(method))

        self._log.debug("VNF endpoint so far: %s", vnf_cfg)

        # Populate filled up dictionary
        config_priority['configuration_options'] = configuration_options
        nsr_obj.nsr_cfg_config_attributes_dict[cfg_priority_order].append(config_priority)
        nsr_obj.num_vnfs_to_cfg += 1
        nsr_obj._vnfr_dict[vnf_cfg['vnf_unique_name']] = vnfr
        nsr_obj._vnfr_dict[vnfr['id']] = vnfr

        self._log.debug("VNF:(%s) config_attributes = %s",
                        log_this_vnf(vnfr['vnf_cfg']),
                        nsr_obj.nsr_cfg_config_attributes_dict)
    else:
        self._log.info("VNF:(%s) is not to be configured by Configuration Manager!",
                       log_this_vnf(vnfr['vnf_cfg']))
        yield from nsr_obj.update_vnf_cm_state(vnfr, conmanY.RecordState.READY_NO_CFG)

    # Update the cm-state
    nsr_obj.populate_vm_state_from_vnf_cfg()
436
@asyncio.coroutine
def config_NSR(self, id):
    """Fetch one NSR, process its VNFRs and schedule their configuration.

    Args:
        id: NSR id.  (The name shadows the builtin; it is kept so that
            existing callers are unaffected.)

    Returns:
        True when the NSR has been processed, now or on an earlier call;
        False when the NSR's operational status is not yet "running".

    Raises:
        Exception: failures are logged and re-raised.  Failures after the
            NSR object exists also set the NS cm-state to
            CFG_PROCESS_FAILED.
    """

    def my_yaml_dump(config_attributes_dict, yf):

        yaml_dict = dict(sorted(config_attributes_dict.items()))
        yf.write(yaml.dump(yaml_dict, default_flow_style=False))

    nsr_dict = self._nsr_dict
    self._log.info("Configure NSR, id = %s", id)

    try:
        if id not in nsr_dict:
            nsr_obj = ConfigManagerNSR(self._log, self._loop, self, id)
            nsr_dict[id] = nsr_obj
        else:
            self._log.info("NSR(%s) is already initialized!", id)
            nsr_obj = nsr_dict[id]
    except Exception as e:
        self._log.error("Failed creating NSR object for (%s) as (%s)", id, str(e))
        raise

    # Try to configure this NSR only if not already processed
    if nsr_obj.cm_nsr['state'] != nsr_obj.state_to_string(conmanY.RecordState.INIT):
        self._log.debug("NSR(%s) is already processed, state=%s",
                        nsr_obj.nsr_name, nsr_obj.cm_nsr['state'])
        yield from nsr_obj.publish_cm_state()
        return True

    cmdts_obj = self.cmdts_obj
    try:
        # Fetch NSR
        nsr = yield from cmdts_obj.get_nsr(id)
        self._log.debug("Full NSR : %s", nsr)
        if nsr['operational_status'] != "running":
            self._log.info("NSR(%s) is not ready yet!", nsr['nsd_name_ref'])
            return False
        # Kept on self because ConfigManagerNSR.set_config_dir() reads
        # caller._nsr.
        self._nsr = nsr

        # Create Agent NSR class
        nsr_config = yield from cmdts_obj.get_nsr_config(id)
        self._log.debug("NSR {} config: {}".format(id, nsr_config))
        nsr_obj.agent_nsr = riftcm_config_plugin.RiftCMnsr(nsr, nsr_config)

        try:
            yield from nsr_obj.update_ns_cm_state(conmanY.RecordState.RECEIVED)

            # Parse NSR
            if nsr is not None:
                nsr_obj.set_nsr_name(nsr['name_ref'])
                nsr_dir = os.path.join(self._parent.cfg_dir, nsr_obj.nsr_name)
                self._log.info("Checking NS config directory: %s", nsr_dir)
                if not os.path.isdir(nsr_dir):
                    os.makedirs(nsr_dir)

                nsr_obj.set_config_dir(self)

                for const_vnfr in nsr['constituent_vnfr_ref']:
                    self._log.debug("Fetching VNFR (%s)", const_vnfr['vnfr_id'])
                    vnfr_msg = yield from cmdts_obj.get_vnfr(const_vnfr['vnfr_id'])
                    if vnfr_msg:
                        vnfr = vnfr_msg.as_dict()
                        self._log.info("create VNF:{}/{}".format(nsr_obj.nsr_name, vnfr['short_name']))
                        agent_vnfr = yield from nsr_obj.add_vnfr(vnfr, vnfr_msg)

                        # Preserve order, self.process_nsd_vnf_configuration()
                        # sets up the config agent based on the method
                        yield from self.process_nsd_vnf_configuration(nsr_obj, vnfr)
                        yield from self._config_agent_mgr.invoke_config_agent_plugins(
                            'notify_create_vnfr',
                            nsr_obj.agent_nsr,
                            agent_vnfr)

        except Exception as e:
            self._log.error("Failed processing NSR (%s) as (%s)", nsr_obj.nsr_name, str(e))
            self._log.exception(e)
            yield from nsr_obj.update_ns_cm_state(conmanY.RecordState.CFG_PROCESS_FAILED)
            raise

        try:
            # Generate config_config_attributes.yaml (For debug reference)
            with open(nsr_obj.config_attributes_file, "w") as yf:
                my_yaml_dump(nsr_obj.nsr_cfg_config_attributes_dict, yf)
        except Exception as e:
            self._log.error("NS:(%s) failed to write config attributes file as (%s)", nsr_obj.nsr_name, str(e))

        self._log.debug("Starting to configure each VNF")

        # Check if this NS has input parameters
        self._log.info("Checking NS configuration order: %s", nsr_obj.config_attributes_file)

        if os.path.exists(nsr_obj.config_attributes_file):
            # Apply configuration in the specified order
            try:
                # Go in loop to configure by specified order
                self._log.info("Using Dynamic configuration input parametrs for NS: %s", nsr_obj.nsr_name)

                # Walk the priority levels in ascending order.  Plain
                # dict iteration follows insertion order (or no order at
                # all on old interpreters), not the priority number.
                priority_levels = sorted(
                    nsr_obj.nsr_cfg_config_attributes_dict.items())
                for _priority, config_attributes_dict in priority_levels:
                    # Iterate through each vnfr at this priority level
                    for vnf_config_attributes_dict in config_attributes_dict:

                        vnf_unique_name = get_vnf_unique_name(
                            nsr_obj.nsr_name,
                            vnf_config_attributes_dict['name'],
                            str(vnf_config_attributes_dict['member_vnf_index']),
                        )
                        self._log.info("NS (%s) : VNF (%s) - Processing configuration attributes",
                                       nsr_obj.nsr_name, vnf_unique_name)

                        # Find vnfr for this vnf_unique_name
                        if vnf_unique_name not in nsr_obj._vnfr_dict:
                            self._log.error("NS (%s) - Can not find VNF to be configured: %s", nsr_obj.nsr_name, vnf_unique_name)
                        else:
                            # Save this unique VNF's config input parameters
                            nsr_obj.vnf_config_attributes_dict[vnf_unique_name] = vnf_config_attributes_dict
                            nsr_obj.ConfigVNF(nsr_obj._vnfr_dict[vnf_unique_name])

                # Now add the entire NS to the pending config list.
                self._log.info("Scheduling NSR:{} configuration".format(nsr_obj.nsr_name))
                self._parent.add_to_pending(nsr_obj)
                self._parent.add_nsr_obj(nsr_obj)

            except Exception as e:
                self._log.error("Failed processing input parameters for NS (%s) as %s", nsr_obj.nsr_name, str(e))
                raise
        else:
            self._log.error("No configuration input parameters for NSR (%s)", nsr_obj.nsr_name)

    except Exception as e:
        self._log.error("Failed to configure NS (%s) as (%s)", nsr_obj.nsr_name, str(e))
        yield from nsr_obj.update_ns_cm_state(conmanY.RecordState.CFG_PROCESS_FAILED)
        raise

    return True
572
@asyncio.coroutine
def terminate_NSR(self, id):
    """Forget NSR *id*: dequeue it, notify config agents, delete its cm-state.

    An unknown id is logged as an error and otherwise ignored.
    """
    nsr_dict = self._nsr_dict
    if id not in nsr_dict:
        self._log.error("NSR(%s) does not exist!", id)
        return

    # Remove this NSR if we have it on the pending task list.  Iterate over
    # a snapshot because del_from_pending_tasks() mutates the list.
    for task in list(self.pending_tasks):
        if task['nsrid'] == id:
            self.del_from_pending_tasks(task)

    # Remove this object from global list
    nsr_obj = nsr_dict.pop(id, None)

    # Remove this NS cm-state from global status list
    self.cm_state['cm_nsr'].remove(nsr_obj.cm_nsr)

    # Also flag any scheduled configuration event
    for nsr_obj_p in self._parent.pending_cfg:
        if nsr_obj_p == nsr_obj:
            assert id == nsr_obj_p._nsr_id
            # Mark this as being deleted so we do not try to configure it
            # if we are in cfg_delay (it will wake up and continue to
            # process otherwise).
            nsr_obj_p.being_deleted = True
            self._log.info("Removed scheduled configuration for NSR(%s)", nsr_obj.nsr_name)

    self._parent.remove_nsr_obj(id)

    # Call Config Agent to clean up for each VNF.  agent_nsr is still None
    # when config_NSR never got as far as creating it (e.g. the NSR never
    # reached "running"); there is nothing to notify in that case, and the
    # cm-state below must still be deleted.
    if nsr_obj.agent_nsr is not None:
        for agent_vnfr in nsr_obj.agent_nsr.vnfrs:
            yield from self._config_agent_mgr.invoke_config_agent_plugins(
                'notify_terminate_vnfr',
                nsr_obj.agent_nsr,
                agent_vnfr)

    # publish delete cm-state (cm-nsr)
    yield from nsr_obj.delete_cm_nsr()

    self._log.info("NSR(%s/%s) is deleted", nsr_obj.nsr_name, id)
613
@asyncio.coroutine
def process_initial_config(self, nsr_obj, conf, script, vnfr_name=None):
    '''Apply the initial-config-primitives specified in NSD or VNFD

    A YAML input file describing the NSR, its VNFRs, their config agents
    and the primitive's parameters is written to a temporary file, and
    *script* is run through the shell with that file as its only argument.

    Args:
        nsr_obj: NSR record the primitive belongs to.
        conf: dict form of the primitive; its 'parameter' list is optional.
        script: path of the script to execute.
        vnfr_name: set when the primitive belongs to one VNFR; limits the
            config-agent section to that VNFR.

    Raises:
        InitialConfigError: the script exited with a non-zero status.
    '''

    def get_input_file(parameters):
        inp = {}

        # Add NSR name to file
        inp['nsr_name'] = nsr_obj.nsr_name

        # Add VNFR name if available
        if vnfr_name:
            inp['vnfr_name'] = vnfr_name

        # Add parameters for initial config
        inp['parameter'] = {}
        for parameter in parameters:
            try:
                inp['parameter'][parameter['name']] = parameter['value']
            except KeyError as e:
                if vnfr_name:
                    self._log.info("VNFR {} initial config parameter {} with no value: {}".
                                   format(vnfr_name, parameter, e))
                else:
                    self._log.info("NSR {} initial config parameter {} with no value: {}".
                                   format(nsr_obj.nsr_name, parameter, e))

        # Add config agents specific to each VNFR
        inp['config-agent'] = {}
        for vnfr in nsr_obj.agent_nsr.vnfrs:
            # Get the config agent for the VNFR
            # If vnfr name is specified, add only CA specific to that
            if (vnfr_name is None) or (vnfr_name == vnfr.name):
                agent = self._config_agent_mgr.get_vnfr_config_agent(vnfr.vnfr_msg)
                if agent:
                    if agent.agent_type != riftcm_config_plugin.DEFAULT_CAP_TYPE:
                        # NOTE(review): this stores agent.agent_data itself,
                        # so the 'service-name' assignment below also
                        # writes into the agent's object - confirm intended.
                        inp['config-agent'][vnfr.member_vnf_index] = agent.agent_data
                        inp['config-agent'][vnfr.member_vnf_index] \
                            ['service-name'] = agent.get_service_name(vnfr.id)

        # Add vnfrs specific data
        inp['vnfr'] = {}
        vdu_keys = ['name', 'management_ip', 'vm_management_ip', 'id', 'vdu_id_ref']
        for vnfr in nsr_obj.vnfrs:
            v = {}

            v['name'] = vnfr['name']
            v['mgmt_ip_address'] = vnfr['vnf_cfg']['mgmt_ip_address']
            v['mgmt_port'] = vnfr['vnf_cfg']['port']

            if 'dashboard_url' in vnfr:
                v['dashboard_url'] = vnfr['dashboard_url']

            if 'connection_point' in vnfr:
                v['connection_point'] = [
                    {'name': cp['name'], 'ip_address': cp['ip_address']}
                    for cp in vnfr['connection_point']
                ]

            # Copy only the VDU keys of interest that are actually present.
            v['vdur'] = [
                {k: vdu[k] for k in vdu_keys if k in vdu}
                for vdu in vnfr['vdur']
            ]

            inp['vnfr'][vnfr['member_vnf_index_ref']] = v

        self._log.debug("Input data for {}: {}".
                        format((vnfr_name if vnfr_name else nsr_obj.nsr_name),
                               inp))

        # Convert to YAML string
        yaml_string = yaml.dump(inp, default_flow_style=False)

        # Write the inputs as yaml file.  delete=False: the file has to
        # outlive this helper so that the script can read it.
        with tempfile.NamedTemporaryFile(delete=False) as tmp_file:
            tmp_file.write(yaml_string.encode("UTF-8"))
        self._log.debug("Input file created for {}: {}".
                        format((vnfr_name if vnfr_name
                                else nsr_obj.nsr_name),
                               tmp_file.name))

        return tmp_file.name

    parameters = []
    try:
        parameters = conf['parameter']
    except Exception as e:
        self._log.debug("Parameter conf: {}, e: {}".
                        format(conf, e))

    inp_file = get_input_file(parameters)

    try:
        # NOTE(review): the command line is built by string formatting and
        # run through the shell; a script path containing spaces or shell
        # metacharacters will not be handled correctly.
        cmd = "{0} {1}".format(script, inp_file)
        self._log.debug("Running the CMD: {}".format(cmd))

        process = yield from asyncio.create_subprocess_shell(cmd,
                                                             loop=self._loop,
                                                             stdout=subprocess.PIPE,
                                                             stderr=subprocess.PIPE)
        _stdout, stderr = yield from process.communicate()
        rc = yield from process.wait()

        if rc:
            msg = "NSR/VNFR {} initial config using {} failed with {}: {}". \
                  format(vnfr_name if vnfr_name else nsr_obj.nsr_name,
                         script, rc, stderr)
            self._log.error(msg)
            raise InitialConfigError(msg)
    finally:
        # Always clean up the temporary input file, including when the
        # script fails; otherwise every failed run leaks a file.
        try:
            os.remove(inp_file)
        except Exception as e:
            self._log.debug("Error removing input file {}: {}".
                            format(inp_file, e))
740
def get_script_file(self, script_name, d_name, d_id, d_type):
    """Locate an extracted package script and make sure it is executable.

    Args:
        script_name: name of the script inside the descriptor package.
        d_name: descriptor name, used only in log messages.
        d_id: descriptor id used to look the script up.
        d_type: "vnfd" or "nsd".

    Returns:
        The script path, or None when no file exists at that path.

    Raises:
        ValueError: *d_type* is neither "vnfd" nor "nsd".
    """
    if d_type == "vnfd":
        package_store = rift.package.store.VnfdPackageFilesystemStore(self._log)
        package_store.refresh()
    elif d_type == "nsd":
        package_store = rift.package.store.NsdPackageFilesystemStore(self._log)
        package_store.refresh()
    else:
        # A bare ``raise`` here has no active exception to re-raise and
        # only produces an uninformative RuntimeError.
        raise ValueError(
            "Unsupported descriptor type {!r} for script {} of {}".format(
                d_type, script_name, d_name))

    script_extractor = rift.package.script.PackageScriptExtractor(self._log)
    script = script_extractor.get_extracted_script_path(d_id, script_name)

    self._log.debug("Checking for script at %s", script)
    if not os.path.exists(script):
        self._log.warning("Did not find script %s", script)
        return

    # Seen cases in jenkins, where the script execution fails
    # with permission denied. Setting the permission on script
    # to make sure it has execute permission
    perm = os.stat(script).st_mode
    if not (perm & stat.S_IXUSR):
        self._log.warning("NSR/VNFR {} script {} "
                          "without execute permission: {}".
                          format(d_name, script, perm))
        os.chmod(script, perm | stat.S_IXUSR)
    return script
768
@asyncio.coroutine
def process_ns_initial_config(self, nsr_obj):
    '''Apply the initial-config-primitives specified in NSD

    Does nothing when the NSR cannot be fetched or carries no
    'initial_config_primitive' entry.  Primitives without a
    'user_defined_script' are skipped, as is done for VNF primitives.
    '''

    nsr = yield from self.cmdts_obj.get_nsr(nsr_obj.nsr_id)
    # The None check has to come first: ``in`` on None raises TypeError.
    if nsr is None or 'initial_config_primitive' not in nsr:
        return

    nsd = yield from self.cmdts_obj.get_nsd(nsr_obj.nsr_id)
    for conf in nsr['initial_config_primitive']:
        self._log.debug("NSR {} initial config: {}".
                        format(nsr_obj.nsr_name, conf))

        if 'user_defined_script' not in conf or not conf['user_defined_script']:
            self._log.debug("NSR {} did not find user defined script: {}".
                            format(nsr_obj.nsr_name, conf))
            continue

        script = self.get_script_file(conf['user_defined_script'],
                                      nsd.name,
                                      nsd.id,
                                      'nsd')

        yield from self.process_initial_config(nsr_obj, conf, script)
788
@asyncio.coroutine
def process_vnf_initial_config(self, nsr_obj, vnfr):
    '''Apply the initial-config-primitives specified in VNFD

    Primitives that name no user defined script are skipped.
    '''
    vnfr_name = vnfr.name
    vnfd = vnfr.vnfd
    primitives = vnfd.vnf_configuration.initial_config_primitive

    for conf in primitives:
        self._log.debug("VNFR {} initial config: {} for vnfd id {}".
                        format(vnfr_name, conf, vnfd.id))

        if conf.user_defined_script:
            script = self.get_script_file(
                conf.user_defined_script, vnfd.name, vnfd.id, 'vnfd')
            yield from self.process_initial_config(
                nsr_obj, conf.as_dict(), script, vnfr_name=vnfr_name)
        else:
            self._log.debug("VNFR {} did not find user defined script: {}".
                            format(vnfr_name, conf))
816
817
818 class ConfigManagerNSR(object):
def __init__(self, log, loop, parent, id):
    """Create the record for NSR *id* and hook its cm-state into *parent*.

    The new record starts in the INIT state under the placeholder name
    'Not Set'; its ``cm_nsr`` dict is appended to
    ``parent.cm_state['cm_nsr']``.
    """
    self._log, self._loop = log, loop
    self._rwcal = None
    self._vnfr_dict = {}
    self._cp_dict = {}
    self._nsr_id = id
    self._parent = parent
    self._log.info("Instantiated NSR entry for id=%s", id)

    self.nsr_cfg_config_attributes_dict = {}
    self.vnf_config_attributes_dict = {}
    self.num_vnfs_to_cfg = 0
    self._vnfr_list = []
    self.vnf_cfg_list = []
    self.this_nsr_dir = None
    self.being_deleted = False
    self.dts_obj = self._parent.cmdts_obj

    # Initial cm-state for this NS
    self.cm_nsr = {
        'cm_vnfr': [],
        'id': id,
        'state': self.state_to_string(conmanY.RecordState.INIT),
        'state_details': None,
    }

    self.set_nsr_name('Not Set')

    # Add this NSR cm-state object to global cm-state
    parent.cm_state['cm_nsr'].append(self.cm_nsr)

    # Place holder for the agent NSR class
    self.agent_nsr = None
851
@property
def nsr_opdata_xpath(self):
    ''' Full xpath of this NSR's entry in the cm-state opdata '''
    xpath_template = "D,/rw-conman:cm-state/rw-conman:cm-nsr[rw-conman:id='{}']"
    return xpath_template.format(self._nsr_id)
859
@property
def vnfrs(self):
    """The list of VNFR records held in ``self._vnfr_list``."""
    vnfr_records = self._vnfr_list
    return vnfr_records
863
@property
def parent(self):
    """The parent object this record was created with."""
    owner = self._parent
    return owner
867
@property
def nsr_id(self):
    """The id this NSR record was created with."""
    identifier = self._nsr_id
    return identifier
871
@asyncio.coroutine
def publish_cm_state(self):
    ''' Publish the current cm-state of this NSR at its opdata xpath '''
    xpath = self.nsr_opdata_xpath

    opdata = conmanY.CmOpdata()
    nsr_entry = opdata.cm_nsr.add()
    nsr_entry.from_dict(self.cm_nsr)

    yield from self.dts_obj.update(xpath, nsr_entry)
    self._log.info("Published cm-state with xpath %s and nsr %s",
                   xpath,
                   nsr_entry)
884
@asyncio.coroutine
def delete_cm_nsr(self):
    ''' Delete this NSR's cm-state entry at its opdata xpath '''
    xpath = self.nsr_opdata_xpath
    yield from self.dts_obj.delete(xpath)
    self._log.info("Deleted cm-nsr with xpath %s",
                   xpath)
892
def set_nsr_name(self, name):
    """Record *name* on the object and in its cm-state entry."""
    # Chained assignment binds left to right: attribute first, then cm-state.
    self.nsr_name = self.cm_nsr['name'] = name
896
def set_config_dir(self, caller):
    """Create this NSR's configuration directory if it does not exist yet.

    The directory is ``<cfg_dir>/<nsr_name>/<name_ref>``, with ``cfg_dir``
    taken from the caller's parent and ``name_ref`` from the caller's NSR.
    The path of the config-attributes YAML file inside it is stored in
    ``self.config_attributes_file``.
    """
    nsr_dir = os.path.join(
        caller._parent.cfg_dir, self.nsr_name, caller._nsr['name_ref'])
    self.this_nsr_dir = nsr_dir

    dir_missing = not os.path.exists(nsr_dir)
    if dir_missing:
        os.makedirs(nsr_dir)
        self._log.debug("NSR:(%s), Created configuration directory(%s)",
                        caller._nsr['name_ref'], nsr_dir)

    self.config_attributes_file = os.path.join(
        nsr_dir, "configuration_config_attributes.yml")
905
def ConfigVNF(self, vnfr):
    """Move one VNF's cm-state to CFG_SCHED unless it is already configured.

    A VNF whose state is READY or READY_NO_CFG is skipped with a warning.
    When the management address or credentials are missing from the VNF's
    ``vnf_cfg`` the state becomes CFG_PROCESS_FAILED instead.
    """
    to_str = self.state_to_string
    vnf_cfg = vnfr['vnf_cfg']
    vnf_cm_state = self.find_or_create_vnfr_cm_state(vnf_cfg)

    already_configured = (
        vnf_cm_state['state'] == to_str(conmanY.RecordState.READY_NO_CFG)
        or vnf_cm_state['state'] == to_str(conmanY.RecordState.READY)
    )
    if already_configured:
        self._log.warning("NS/VNF (%s/%s) is already configured! Skipped.", self.nsr_name, vnfr['short_name'])
        return

    # Update VNF state
    vnf_cm_state['state'] = to_str(conmanY.RecordState.CFG_PROCESS)

    # Add cp_dict members (TAGS) for this VNF, one at a time so that a
    # missing key leaves the earlier tags in place.
    tag_sources = (
        ('rw_mgmt_ip', 'mgmt_ip_address'),
        ('rw_username', 'username'),
        ('rw_password', 'password'),
    )
    try:
        for tag, source_key in tag_sources:
            self._cp_dict[tag] = vnf_cfg[source_key]
    except Exception as err:
        vnf_cm_state['state'] = to_str(conmanY.RecordState.CFG_PROCESS_FAILED)
        self._log.error("Failed to set tags for VNF: %s with (%s)", log_this_vnf(vnf_cfg), str(err))
        return

    self._log.info("Applying config to VNF: %s = %s!", log_this_vnf(vnf_cfg), vnf_cfg)
    try:
        self._log.debug("Scheduled configuration!")
        vnf_cm_state['state'] = to_str(conmanY.RecordState.CFG_SCHED)
    except Exception as err:
        self._log.error("Failed apply_vnf_config to VNF: %s as (%s)", log_this_vnf(vnf_cfg), str(err))
        vnf_cm_state['state'] = to_str(conmanY.RecordState.CFG_PROCESS_FAILED)
        raise
940
def add(self, nsr):
    """Attach the NSR record *nsr* to this object."""
    # Log this record's own id; a bare ``id`` here is the builtin function.
    self._log.info("Adding NS Record for id=%s", self._nsr_id)
    self._nsr = nsr
944
def sample_cm_state(self):
    """Return a hard-coded example of the cm-state structure.

    The sample holds one NSR with two VNFRs, each with two connection
    points.
    """
    first_vnfr = {
        'cfg_location': 'location1',
        'cfg_type': 'script',
        'connection_point': [
            {'ip_address': '1.1.1.1', 'name': 'vnf1cp1'},
            {'ip_address': '1.1.1.2', 'name': 'vnf1cp2'},
        ],
        'id': 'vnfrid1',
        'mgmt_interface': {'ip_address': '7.1.1.1', 'port': 1001},
        'name': 'vnfrname1',
        'state': 'init',
    }
    second_vnfr = {
        'cfg_location': 'location2',
        'cfg_type': 'netconf',
        'connection_point': [
            {'ip_address': '2.1.1.1', 'name': 'vnf2cp1'},
            {'ip_address': '2.1.1.2', 'name': 'vnf2cp2'},
        ],
        'id': 'vnfrid2',
        'mgmt_interface': {'ip_address': '7.1.1.2', 'port': 1001},
        'name': 'vnfrname2',
        'state': 'init',
    }
    sample_nsr = {
        'cm_vnfr': [first_vnfr, second_vnfr],
        'id': 'nsrid1',
        'name': 'nsrname1',
        'state': 'init',
    }
    return {
        'cm_nsr': [sample_nsr],
        'states': 'Initialized, ',
    }
981
def populate_vm_state_from_vnf_cfg(self):
    """Copy each VNFR's config details into its cm-state entry.

    For every record in ``self._vnfr_list`` that has a cm-state entry, the
    management address/port, config method and config file are copied from
    the record's ``vnf_cfg``, and one ``{name, ip_address}`` item is
    appended per connection point.  Records without a cm-state entry are
    left untouched.
    """
    for record in self._vnfr_list:
        cfg = record['vnf_cfg']
        cm_state = self.find_vnfr_cm_state(record['id'])
        if not cm_state:
            # Nothing to fill in for a VNFR that has no cm-state yet
            continue

        # VNF management interface
        cm_state['mgmt_interface']['ip_address'] = cfg['mgmt_ip_address']
        cm_state['mgmt_interface']['port'] = cfg['port']

        # VNF configuration details
        cm_state['cfg_type'] = cfg['config_method']
        cm_state['cfg_location'] = cfg['cfg_file']

        # Connection points of this VNF, when it has any
        if "connection_point" not in record:
            continue
        for cp in record['connection_point']:
            entry = {
                'name': cp['name'],
                'ip_address': cp['ip_address'],
            }
            cm_state['connection_point'].append(entry)
1008
def state_to_string(self, state):
    """Translate a ``conmanY.RecordState`` value into its cm-state string.

    Raises KeyError when ``state`` is not one of the values listed below.
    """
    record_state = conmanY.RecordState
    names = dict((
        (record_state.INIT, "init"),
        (record_state.RECEIVED, "received"),
        (record_state.CFG_PROCESS, "cfg_process"),
        (record_state.CFG_PROCESS_FAILED, "cfg_process_failed"),
        (record_state.CFG_SCHED, "cfg_sched"),
        (record_state.CFG_DELAY, "cfg_delay"),
        (record_state.CONNECTING, "connecting"),
        (record_state.FAILED_CONNECTION, "failed_connection"),
        (record_state.NETCONF_CONNECTED, "netconf_connected"),
        (record_state.NETCONF_SSH_CONNECTED, "netconf_ssh_connected"),
        (record_state.RESTCONF_CONNECTED, "restconf_connected"),
        (record_state.CFG_SEND, "cfg_send"),
        (record_state.CFG_FAILED, "cfg_failed"),
        (record_state.READY_NO_CFG, "ready_no_cfg"),
        (record_state.READY, "ready"),
    ))
    return names[state]
1028
def find_vnfr_cm_state(self, id):
    """Look up the cm-state entry of a VNFR by its id.

    Returns the first entry of ``self.cm_nsr['cm_vnfr']`` whose 'id' equals
    ``id``, or None when the list is empty/unset or holds no such entry.
    """
    entries = self.cm_nsr['cm_vnfr']
    if not entries:
        return None
    return next((entry for entry in entries if entry['id'] == id), None)
1035
def find_or_create_vnfr_cm_state(self, vnf_cfg):
    """Return the cm-state entry for the VNFR in ``vnf_cfg``, creating it if needed.

    Arguments:
        vnf_cfg - VNF config dictionary; its 'vnfr', 'mgmt_ip_address',
                  'port', 'config_method' and 'cfg_file' items are read.

    A newly created entry starts in the RECEIVED state with an empty
    connection-point list and is appended to ``self.cm_nsr['cm_vnfr']``.
    """
    record = vnf_cfg['vnfr']
    existing = self.find_vnfr_cm_state(record['id'])
    if existing is not None:
        return existing

    # Not found: create and initialize the cm-state of this VNF
    created = {
        'id': record['id'],
        'name': record['short_name'],
        'state': self.state_to_string(conmanY.RecordState.RECEIVED),
        'mgmt_interface': {
            'ip_address': vnf_cfg['mgmt_ip_address'],
            'port': vnf_cfg['port'],
        },
        'cfg_type': vnf_cfg['config_method'],
        'cfg_location': vnf_cfg['cfg_file'],
        'connection_point': [],
    }
    self.cm_nsr['cm_vnfr'].append(created)
    return created
1061
@asyncio.coroutine
def get_vnf_cm_state(self, vnfr):
    """Return the cm-state string of ``vnfr``.

    Yields False when ``vnfr`` is falsy or has no cm-state entry.
    """
    if not vnfr:
        return False
    entry = self.find_vnfr_cm_state(vnfr['id'])
    return entry['state'] if entry else False
1069
@asyncio.coroutine
def update_vnf_cm_state(self, vnfr, state):
    """Move the cm-state of ``vnfr`` to ``state`` and publish the change.

    Arguments:
        vnfr  - VNF record; 'id', 'short_name' and 'member_vnf_index_ref'
                are read from it.
        state - value accepted by ``self.state_to_string``.

    Nothing is published when the entry is already in the requested state.
    A missing VNFR or a missing cm-state entry is logged as an error.
    """
    if not vnfr:
        self._log.error("No VNFR supplied for state update (NS=%s)!",
                        self.nsr_name)
        return

    entry = self.find_vnfr_cm_state(vnfr['id'])
    if entry is None:
        self._log.error("No opdata found for NS/VNF:%s/%s!",
                        self.nsr_name, vnfr['short_name'])
        return

    requested = self.state_to_string(state)
    if entry['state'] == requested:
        return

    previous = entry['state']
    entry['state'] = requested
    # Publish new state
    yield from self.publish_cm_state()
    message = "VNF ({}/{}/{}) state change: {} -> {}".format(
        self.nsr_name,
        vnfr['short_name'],
        vnfr['member_vnf_index_ref'],
        previous,
        entry['state'])
    self._log.info(message)
1094
@property
def get_ns_cm_state(self):
    """The NS-level state string stored under ``self.cm_nsr['state']``."""
    ns_record = self.cm_nsr
    return ns_record['state']
1098
@asyncio.coroutine
def update_ns_cm_state(self, state, state_details=None):
    """Move the NS-level cm-state to ``state`` and publish the change.

    Arguments:
        state         - value accepted by ``self.state_to_string``.
        state_details - stored as-is under 'state_details' (None by default).

    Nothing is written or published when the NS is already in that state.
    """
    requested = self.state_to_string(state)
    if self.cm_nsr['state'] == requested:
        return

    previous = self.cm_nsr['state']
    self.cm_nsr['state'] = requested
    self.cm_nsr['state_details'] = state_details
    message = "NS ({}) state change: {} -> {}".format(
        self.nsr_name, previous, self.cm_nsr['state'])
    self._log.info(message)
    # Publish new state
    yield from self.publish_cm_state()
1111
@asyncio.coroutine
def add_vnfr(self, vnfr, vnfr_msg):
    """Register a VNF record with this NS and build its default config.

    The record is stored in the per-NS lookup tables, a ``vnf_cfg``
    dictionary with default values is attached to it, a cm-state entry is
    found or created for it, and its connection-point addresses (plus any
    VLR subnets that can be read) are recorded in ``self._cp_dict``.

    Arguments:
        vnfr     - VNF record, accessed by key: 'id', 'short_name' and
                   'member_vnf_index_ref' are required; 'connection_point',
                   'vdur' and 'internal_vlr' are used when present.
        vnfr_msg - VNFR message object; ``mgmt_interface.ip_address`` and
                   ``mgmt_interface.port`` are read from it, and it is
                   passed on to ``self.agent_nsr.add_vnfr``.

    Returns:
        The object returned by ``self.agent_nsr.add_vnfr(vnfr, vnfr_msg)``,
        after its ``_vnfr`` attribute has been set to ``vnfr``.
    """

    @asyncio.coroutine
    def populate_subnets_from_vlr(id):
        # Look up the VLR with this id and, if it reports an assigned
        # subnet, record it both globally and under this VNF's member
        # index.  Any failure is logged and otherwise ignored.
        try:
            # Populate cp_dict with VLR subnet info
            vlr = yield from self.dts_obj.get_vlr(id)
            if vlr is not None and 'assigned_subnet' in vlr:
                subnet = {vlr.name:vlr.assigned_subnet}
                self._cp_dict[vnfr['member_vnf_index_ref']].update(subnet)
                self._cp_dict.update(subnet)
                self._log.debug("VNF:(%s) Updated assigned subnet = %s",
                                vnfr['short_name'], subnet)
        except Exception as e:
            self._log.error("VNF:(%s) VLR Error = %s",
                            vnfr['short_name'], e)

    if vnfr['id'] not in self._vnfr_dict:
        self._log.info("NSR(%s) : Adding VNF Record for name=%s, id=%s", self._nsr_id, vnfr['short_name'], vnfr['id'])
        # Add this vnfr to the list for show, or single traversal
        self._vnfr_list.append(vnfr)
    else:
        # NOTE(review): only the dictionary entries below are overwritten in
        # this case; self._vnfr_list keeps whatever was appended earlier.
        self._log.warning("NSR(%s) : VNF Record for name=%s, id=%s already exists, overwriting", self._nsr_id, vnfr['short_name'], vnfr['id'])

    # Make vnfr available by id as well as by name
    unique_name = get_vnf_unique_name(self.nsr_name, vnfr['short_name'], vnfr['member_vnf_index_ref'])
    self._vnfr_dict[unique_name] = vnfr
    self._vnfr_dict[vnfr['id']] = vnfr

    # Create vnf_cfg dictionary with default values
    vnf_cfg = {
        'nsr_obj' : self,
        'vnfr' : vnfr,
        'agent_vnfr' : self.agent_nsr.add_vnfr(vnfr, vnfr_msg),
        'nsr_name' : self.nsr_name,
        'nsr_id' : self._nsr_id,
        'vnfr_name' : vnfr['short_name'],
        'member_vnf_index' : vnfr['member_vnf_index_ref'],
        'port' : 0,
        'username' : 'admin',
        'password' : 'admin',
        'config_method' : 'None',
        'protocol' : 'None',
        'mgmt_ip_address' : '0.0.0.0',
        'cfg_file' : 'None',
        'cfg_retries' : 0,
        'script_type' : 'bash',
    }

    # Update the mgmt ip address
    # In case the config method is none, this is not
    # updated later
    try:
        vnf_cfg['mgmt_ip_address'] = vnfr_msg.mgmt_interface.ip_address
        vnf_cfg['port'] = vnfr_msg.mgmt_interface.port
    except Exception as e:
        # Best effort: the defaults set above stay in place on failure
        self._log.warn(
            "VNFR {}({}), unable to retrieve mgmt ip address: {}".
            format(vnfr['short_name'], vnfr['id'], e))

    vnfr['vnf_cfg'] = vnf_cfg
    self.find_or_create_vnfr_cm_state(vnf_cfg)

    '''
    Build the connection-points list for this VNF (self._cp_dict)
    '''
    # Populate global CP list self._cp_dict from VNFR
    cp_list = []
    if 'connection_point' in vnfr:
        cp_list = vnfr['connection_point']

    # Start this member's private name -> address table from scratch
    self._cp_dict[vnfr['member_vnf_index_ref']] = {}
    if 'vdur' in vnfr:
        for vdur in vnfr['vdur']:
            if 'internal_connection_point' in vdur:
                # NOTE(review): when vnfr has 'connection_point', cp_list is
                # that same object, so if it is a list this += extends
                # vnfr['connection_point'] in place - confirm this is intended.
                cp_list += vdur['internal_connection_point']

    for cp_item_dict in cp_list:
        # Populate global dictionary
        self._cp_dict[
            cp_item_dict['name']
        ] = cp_item_dict['ip_address']

        # Populate unique member specific dictionary
        self._cp_dict[
            vnfr['member_vnf_index_ref']
        ][
            cp_item_dict['name']
        ] = cp_item_dict['ip_address']

        # Fill in the subnets from vlr
        if 'vlr_ref' in cp_item_dict:
            ### HACK: Internal connection_point do not have VLR reference
            yield from populate_subnets_from_vlr(cp_item_dict['vlr_ref'])

    if 'internal_vlr' in vnfr:
        for ivlr in vnfr['internal_vlr']:
            yield from populate_subnets_from_vlr(ivlr['vlr_ref'])

    # Update vnfr
    vnf_cfg['agent_vnfr']._vnfr = vnfr
    return vnf_cfg['agent_vnfr']
1215
1216
class XPaths(object):
    """Builders for the DTS xpath strings used by the config manager.

    Every builder takes an optional key ``k``.  Without a key (``None``) the
    xpath of the whole list is returned; with a key, a predicate selecting
    that single entry is appended.
    """

    @staticmethod
    def _keyed(base, predicate, k):
        """Return ``base``, followed by ``predicate`` formatted with ``k`` unless ``k`` is None."""
        if k is None:
            return base
        return base + predicate.format(k)

    @staticmethod
    def nsr_opdata(k=None):
        """Xpath of NSR opdata, optionally keyed by ns-instance-config-ref."""
        return XPaths._keyed("D,/nsr:ns-instance-opdata/nsr:nsr",
                             "[nsr:ns-instance-config-ref='{}']", k)

    @staticmethod
    def nsd_msg(k=None):
        """Xpath of the NSD catalog entries, optionally keyed by NSD id."""
        # The original expression had no parentheses around the conditional,
        # so the whole concatenation was its "true" branch and the method
        # returned "" instead of the catalog path when no key was given.
        return XPaths._keyed("C,/nsd:nsd-catalog/nsd:nsd",
                             "[nsd:id = '{}']", k)

    @staticmethod
    def vnfr_opdata(k=None):
        """Xpath of the VNFR catalog entries, optionally keyed by VNFR id."""
        return XPaths._keyed("D,/vnfr:vnfr-catalog/vnfr:vnfr",
                             "[vnfr:id='{}']", k)

    @staticmethod
    def vnfd(k=None):
        """Xpath of the VNFD catalog entries, optionally keyed by VNFD id."""
        return XPaths._keyed("C,/vnfd:vnfd-catalog/vnfd:vnfd",
                             "[vnfd:id='{}']", k)

    @staticmethod
    def config_agent(k=None):
        """Xpath of the config-agent accounts, optionally keyed by account name."""
        return XPaths._keyed("D,/rw-config-agent:config-agent/rw-config-agent:account",
                             "[rw-config-agent:name='{}']", k)

    @staticmethod
    def nsr_config(k=None):
        """Xpath of the NSR config entries, optionally keyed by NSR id."""
        return XPaths._keyed("C,/nsr:ns-instance-config/nsr:nsr",
                             "[nsr:id='{}']", k)

    @staticmethod
    def vlr(k=None):
        """Xpath of the VLR catalog entries, optionally keyed by VLR id."""
        return XPaths._keyed("D,/vlr:vlr-catalog/vlr:vlr",
                             "[vlr:id='{}']", k)
1252
1253 class ConfigManagerDTS(object):
1254 ''' This class either reads from DTS or publishes to DTS '''
1255
def __init__(self, log, loop, parent, dts):
    """Keep references to the logger, event loop, owning object and DTS handle."""
    self._dts = dts
    self._parent = parent
    self._loop = loop
    self._log = log
1261
@asyncio.coroutine
def _read_dts(self, xpath, do_trace=False):
    """Read ``xpath`` from DTS and return the list of results found.

    Arguments:
        xpath    - xpath string to query.
        do_trace - accepted for compatibility with existing callers; it is
                   not used by this method.

    Returns:
        A list with the ``result`` attribute of every non-None response.
        Reading is best effort: if iterating the responses fails, the
        failure is logged and the results collected so far are returned.
    """
    self._log.debug("_read_dts path = %s", xpath)
    res_iter = yield from self._dts.query_read(
        xpath, flags=rwdts.XactFlag.MERGE
    )

    results = []
    try:
        for pending in res_iter:
            result = yield from pending
            if result is not None:
                results.append(result.result)
    except asyncio.CancelledError:
        # Cancellation must reach the caller; the former bare ``except``
        # swallowed it together with every other error.
        raise
    except Exception as e:
        # Keep the best-effort behaviour, but leave a trace of the failure
        self._log.warning("_read_dts for %s stopped early with %d result(s): %s",
                          xpath, len(results), str(e))

    return results
1280
1281
@asyncio.coroutine
def get_nsr(self, id):
    """Read the NSR opdata for ``id``; yields it as a dict, or None if absent."""
    self._log.debug("Attempting to get NSR: %s", id)
    matches = yield from self._read_dts(XPaths.nsr_opdata(id), False)
    if not matches:
        return None
    return matches[0].as_dict()
1290
@asyncio.coroutine
def get_nsr_config(self, id):
    """Read the NSR config entry for ``id``; yields the first match or None."""
    self._log.debug("Attempting to get config NSR: %s", id)
    matches = yield from self._read_dts(XPaths.nsr_config(id), False)
    return matches[0] if matches else None
1299
@asyncio.coroutine
def get_nsd_msg(self, id):
    """Read the NSD catalog entry for ``id``; yields the first match or None."""
    self._log.debug("Attempting to get NSD: %s", id)
    matches = yield from self._read_dts(XPaths.nsd_msg(id), False)
    return matches[0] if matches else None
1308
@asyncio.coroutine
def get_nsd(self, nsr_id):
    """Return the NSD embedded in the config of the NSR ``nsr_id``.

    Arguments:
        nsr_id - id of the NSR whose config entry is read.

    Returns:
        The ``nsd`` attribute of the NSR config, or None when no config
        entry could be read for ``nsr_id``.
    """
    # The original passed the builtin ``id`` function to the log call; the
    # value of interest is the ``nsr_id`` argument.
    self._log.debug("Attempting to get NSD for NSR: %s", nsr_id)
    nsr_config = yield from self.get_nsr_config(nsr_id)
    if nsr_config is None:
        # get_nsr_config yields None when nothing matches; report that
        # instead of failing with an AttributeError on None.
        self._log.error("No config found for NSR: %s, unable to get NSD", nsr_id)
        return None
    return nsr_config.nsd
1315
@asyncio.coroutine
def get_vnfr(self, id):
    """Read the VNFR opdata for ``id``; yields the first match or None."""
    self._log.debug("Attempting to get VNFR: %s", id)
    matches = yield from self._read_dts(XPaths.vnfr_opdata(id), do_trace=False)
    return matches[0] if matches else None
1324
@asyncio.coroutine
def get_vnfd(self, vnfd_id):
    """Read the VNFD catalog entry for ``vnfd_id``; yields the first match or None."""
    self._log.debug("Attempting to get VNFD: %s", vnfd_id)
    matches = yield from self._read_dts(XPaths.vnfd(vnfd_id), do_trace=False)
    return matches[0] if matches else None
1333
@asyncio.coroutine
def get_vlr(self, id):
    """Read the VLR catalog entry for ``id``; yields the first match or None."""
    self._log.debug("Attempting to get VLR subnet: %s", id)
    matches = yield from self._read_dts(XPaths.vlr(id), do_trace=True)
    return matches[0] if matches else None
1342
@asyncio.coroutine
def get_config_agents(self, name):
    """Read the config-agent accounts matching ``name``; yields the list of results."""
    self._log.debug("Attempting to get config_agents: %s", name)
    agents_xpath = XPaths.config_agent(name)
    return (yield from self._read_dts(agents_xpath, False))
1348
@asyncio.coroutine
def update(self, path, msg, flags=rwdts.XactFlag.REPLACE):
    """
    Update a cm-state (cm-nsr) record in DTS with the path and message.

    The element is written through the publisher handle stored in
    ``self.dts_pub_hdl``; ``flags`` is passed to it unchanged.
    """
    publisher = self.dts_pub_hdl
    self._log.debug("Updating cm-state %s:%s dts_pub_hdl = %s", path, msg, publisher)
    publisher.update_element(path, msg, flags)
    self._log.debug("Updated cm-state, %s:%s", path, msg)
1357
@asyncio.coroutine
def delete(self, path):
    """
    Delete cm-nsr record in DTS with the path only.

    The element is removed through the publisher handle stored in
    ``self.dts_pub_hdl``.
    """
    publisher = self.dts_pub_hdl
    self._log.debug("Deleting cm-nsr %s dts_pub_hdl = %s", path, publisher)
    publisher.delete_element(path)
    self._log.debug("Deleted cm-nsr, %s", path)
1366
@asyncio.coroutine
def register(self):
    """Run both DTS registrations: the cm-state publisher first, then the NSR subscription."""
    for registration in (self.register_to_publish, self.register_for_nsr):
        yield from registration()
1371
@asyncio.coroutine
def register_to_publish(self):
    ''' Register to DTS for publishing cm-state opdata.

    The resulting registration handle is kept in ``self.dts_pub_hdl``.
    '''
    publish_xpath = "D,/rw-conman:cm-state/rw-conman:cm-nsr"
    self._log.debug("Registering to publish cm-state @ %s", publish_xpath)

    handler = rift.tasklets.DTS.RegistrationHandler()
    with self._dts.group_create() as group:
        publisher_flags = rwdts.Flag.PUBLISHER | rwdts.Flag.NO_PREP_READ
        self.dts_pub_hdl = group.register(
            xpath=publish_xpath,
            handler=handler,
            flags=publisher_flags,
        )
1383
@property
def nsr_xpath(self):
    """Xpath of the NSR opdata list."""
    opdata_xpath = "D,/nsr:ns-instance-opdata/nsr:nsr"
    return opdata_xpath
1387
@asyncio.coroutine
def register_for_nsr(self):
    """ Register for NSR changes.

    Subscribes to ``self.nsr_xpath`` and keeps the registration handle in
    ``self.dts_reg_hdl``.  A failure to register is logged, not raised.
    """

    @asyncio.coroutine
    def on_prepare(xact_info, query_action, ks_path, msg):
        """ Handle a create/update/delete notification for an NSR.

        Raises NotImplementedError for any other query action.
        """
        self._log.debug("Received NSR instantiate on_prepare (%s:%s:%s)",
                        query_action,
                        ks_path,
                        msg)

        if query_action in (rwdts.QueryAction.UPDATE,
                            rwdts.QueryAction.CREATE):
            msg_dict = msg.as_dict()
            # Only an NSR reported as running is queued for processing
            if ('operational_status' in msg_dict and
                    msg_dict['operational_status'] == 'running'):
                # Add to the task list
                self._parent.add_to_pending_tasks(
                    {'nsrid' : msg_dict['ns_instance_config_ref'], 'retries' : 5})
        elif query_action == rwdts.QueryAction.DELETE:
            nsr_id = msg.ns_instance_config_ref
            # Termination runs as its own task on the tasklet's loop
            asyncio.ensure_future(self._parent.terminate_NSR(nsr_id), loop=self._loop)
        else:
            # The original handed the format string and its argument to the
            # exception as two separate args, so the message was never
            # formatted; apply the formatting here.
            raise NotImplementedError(
                "%s action on cm-state not supported" % (query_action,))

        xact_info.respond_xpath(rwdts.XactRspCode.ACK)

    try:
        handler = rift.tasklets.DTS.RegistrationHandler(on_prepare=on_prepare)
        self.dts_reg_hdl = yield from self._dts.register(self.nsr_xpath,
                                                         flags=rwdts.Flag.SUBSCRIBER | rwdts.Flag.DELTA_READY,
                                                         handler=handler)
    except Exception as e:
        self._log.error("Failed to register for NSR changes as %s", str(e))
1425
1426