Merge "Bug 140"
[osm/SO.git] / rwcm / plugins / rwconman / rift / tasklets / rwconmantasklet / rwconman_config.py
1
2 #
3 # Copyright 2016 RIFT.IO Inc
4 #
5 # Licensed under the Apache License, Version 2.0 (the "License");
6 # you may not use this file except in compliance with the License.
7 # You may obtain a copy of the License at
8 #
9 # http://www.apache.org/licenses/LICENSE-2.0
10 #
11 # Unless required by applicable law or agreed to in writing, software
12 # distributed under the License is distributed on an "AS IS" BASIS,
13 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 # See the License for the specific language governing permissions and
15 # limitations under the License.
16 #
17
18 import asyncio
19 import os
20 import stat
21 import subprocess
22 import sys
23 import tempfile
24 import yaml
25
26 from gi.repository import (
27 RwDts as rwdts,
28 RwConmanYang as conmanY,
29 ProtobufC,
30 )
31
32 import rift.tasklets
33
34 from . import rwconman_conagent as conagent
35 from . import RiftCM_rpc
36 from . import riftcm_config_plugin
37
38 if sys.version_info < (3, 4, 4):
39 asyncio.ensure_future = asyncio.async
40
41 def get_vnf_unique_name(nsr_name, vnfr_short_name, member_vnf_index):
42 return "{}.{}.{}".format(nsr_name, vnfr_short_name, member_vnf_index)
43
44 class ConmanConfigError(Exception):
45 pass
46
47
48 class InitialConfigError(ConmanConfigError):
49 pass
50
51
52 def log_this_vnf(vnf_cfg):
53 log_vnf = ""
54 used_item_list = ['nsr_name', 'vnfr_name', 'member_vnf_index', 'mgmt_ip_address']
55 for item in used_item_list:
56 if item in vnf_cfg:
57 if item == 'mgmt_ip_address':
58 log_vnf += "({})".format(vnf_cfg[item])
59 else:
60 log_vnf += "{}/".format(vnf_cfg[item])
61 return log_vnf
62
63 class PretendNsm(object):
64 def __init__(self, dts, log, loop, parent):
65 self._dts = dts
66 self._log = log
67 self._loop = loop
68 self._parent = parent
69 self._nsrs = {}
70 self._nsr_dict = parent._nsr_dict
71 self._config_agent_plugins = []
72 self._nsd_msg = {}
73
74 @property
75 def nsrs(self):
76 # Expensive, instead use get_nsr, if you know id.
77 self._nsrs = {}
78 # Update the list of nsrs (agent nsr)
79 for id, nsr_obj in self._nsr_dict.items():
80 self._nsrs[id] = nsr_obj.agent_nsr
81 return self._nsrs
82
83 def get_nsr(self, nsr_id):
84 if nsr_id in self._nsr_dict:
85 nsr_obj = self._nsr_dict[nsr_id]
86 return nsr_obj._nsr
87 return None
88
89 def get_vnfr_msg(self, vnfr_id, nsr_id=None):
90 self._log.debug("get_vnfr_msg(vnfr=%s, nsr=%s)",
91 vnfr_id, nsr_id)
92 found = False
93 if nsr_id:
94 if nsr_id in self._nsr_dict:
95 nsr_obj = self._nsr_dict[nsr_id]
96 if vnfr_id in nsr_obj._vnfr_dict:
97 found = True
98 else:
99 for nsr_obj in self._nsr_dict.values():
100 if vnfr_id in nsr_obj._vnfr_dict:
101 # Found it
102 found = True
103 break
104 if found:
105 vnf_cfg = nsr_obj._vnfr_dict[vnfr_id]['vnf_cfg']
106 return vnf_cfg['agent_vnfr'].vnfr_msg
107 else:
108 return None
109
110 @asyncio.coroutine
111 def get_nsd(self, nsr_id):
112 if nsr_id not in self._nsd_msg:
113 nsr_config = yield from self._parent.cmdts_obj.get_nsr_config(nsr_id)
114 self._nsd_msg[nsr_id] = nsr_config.nsd
115 return self._nsd_msg[nsr_id]
116
117 @property
118 def config_agent_plugins(self):
119 self._config_agent_plugins = []
120 for agent in self._parent._config_agent_mgr._plugin_instances.values():
121 self._config_agent_plugins.append(agent)
122 return self._config_agent_plugins
123
124 class ConfigManagerConfig(object):
125 def __init__(self, dts, log, loop, parent):
126 self._dts = dts
127 self._log = log
128 self._loop = loop
129 self._parent = parent
130 self._nsr_dict = {}
131 self.pending_cfg = {}
132 self.terminate_cfg = {}
133 self.pending_tasks = [] # User for NSRid get retry
134 # (mainly excercised at restart case)
135 self._config_xpath = "C,/cm-config"
136 self._opdata_xpath = "D,/rw-conman:cm-state"
137
138 self.cm_config = conmanY.SoConfig()
139 # RO specific configuration
140 self.ro_config = {}
141 for key in self.cm_config.ro_endpoint.fields:
142 self.ro_config[key] = None
143
144 # Initialize cm-state
145 self.cm_state = {}
146 self.cm_state['cm_nsr'] = []
147 self.cm_state['states'] = "Initialized"
148
149 # Initialize objects to register
150 self.cmdts_obj = ConfigManagerDTS(self._log, self._loop, self, self._dts)
151 self._config_agent_mgr = conagent.RiftCMConfigAgent(
152 self._dts,
153 self._log,
154 self._loop,
155 self,
156 )
157 self.reg_handles = [
158 self.cmdts_obj,
159 self._config_agent_mgr,
160 RiftCM_rpc.RiftCMRPCHandler(self._dts, self._log, self._loop,
161 PretendNsm(
162 self._dts, self._log, self._loop, self)),
163 ]
164
165 def is_nsr_valid(self, nsr_id):
166 if nsr_id in self._nsr_dict:
167 return True
168 return False
169
170 def add_to_pending_tasks(self, task):
171 if self.pending_tasks:
172 for p_task in self.pending_tasks:
173 if p_task['nsrid'] == task['nsrid']:
174 # Already queued
175 return
176 try:
177 self.pending_tasks.append(task)
178 self._log.debug("add_to_pending_tasks (nsrid:%s)",
179 task['nsrid'])
180 if len(self.pending_tasks) == 1:
181 self._loop.create_task(self.ConfigManagerConfig_pending_loop())
182 # TBD - change to info level
183 self._log.debug("Started pending_loop!")
184 except Exception as e:
185 self._log.error("Failed adding to pending tasks (%s)", str(e))
186
187 def del_from_pending_tasks(self, task):
188 try:
189 self.pending_tasks.remove(task)
190 except Exception as e:
191 self._log.error("Failed removing from pending tasks (%s)", str(e))
192
193 @asyncio.coroutine
194 def ConfigManagerConfig_pending_loop(self):
195 loop_sleep = 2
196 while True:
197 yield from asyncio.sleep(loop_sleep, loop=self._loop)
198 """
199 This pending task queue is ordred by events,
200 must finish previous task successfully to be able to go on to the next task
201 """
202 if self.pending_tasks:
203 self._log.debug("self.pending_tasks len=%s", len(self.pending_tasks))
204 task = self.pending_tasks[0]
205 done = False
206 if 'nsrid' in task:
207 nsrid = task['nsrid']
208 self._log.debug("Will execute pending task for NSR id(%s)", nsrid)
209 try:
210 # Try to configure this NSR
211 task['retries'] -= 1
212 done = yield from self.config_NSR(nsrid)
213 self._log.info("self.config_NSR status=%s", done)
214
215 except Exception as e:
216 self._log.error("Failed(%s) configuring NSR(%s)," \
217 "retries remained:%d!",
218 str(e), nsrid, task['retries'])
219 finally:
220 self.pending_tasks.remove(task)
221
222 if done:
223 self._log.debug("Finished pending task NSR id(%s):", nsrid)
224 else:
225 self._log.error("Failed configuring NSR(%s), retries remained:%d!",
226 nsrid, task['retries'])
227
228 # Failed, re-insert (append at the end)
229 # this failed task to be retried later
230 # If any retries remained.
231 if task['retries']:
232 self.pending_tasks.append(task)
233 else:
234 self._log.debug("Stopped pending_loop!")
235 break
236
237 @asyncio.coroutine
238 def register(self):
239 yield from self.register_cm_state_opdata()
240
241 # Initialize all handles that needs to be registered
242 for reg in self.reg_handles:
243 yield from reg.register()
244
245 @asyncio.coroutine
246 def register_cm_state_opdata(self):
247
248 def state_to_string(state):
249 state_dict = {
250 conmanY.RecordState.INIT : "init",
251 conmanY.RecordState.RECEIVED : "received",
252 conmanY.RecordState.CFG_PROCESS : "cfg_process",
253 conmanY.RecordState.CFG_PROCESS_FAILED : "cfg_process_failed",
254 conmanY.RecordState.CFG_SCHED : "cfg_sched",
255 conmanY.RecordState.CFG_DELAY : "cfg_delay",
256 conmanY.RecordState.CONNECTING : "connecting",
257 conmanY.RecordState.FAILED_CONNECTION : "failed_connection",
258 conmanY.RecordState.NETCONF_CONNECTED : "netconf_connected",
259 conmanY.RecordState.NETCONF_SSH_CONNECTED : "netconf_ssh_connected",
260 conmanY.RecordState.RESTCONF_CONNECTED : "restconf_connected",
261 conmanY.RecordState.CFG_SEND : "cfg_send",
262 conmanY.RecordState.CFG_FAILED : "cfg_failed",
263 conmanY.RecordState.READY_NO_CFG : "ready_no_cfg",
264 conmanY.RecordState.READY : "ready",
265 }
266 return state_dict[state]
267
268 @asyncio.coroutine
269 def on_prepare(xact_info, action, ks_path, msg):
270
271 self._log.debug("Received cm-state: msg=%s, action=%s", msg, action)
272
273 if action == rwdts.QueryAction.READ:
274 show_output = conmanY.CmOpdata()
275 show_output.from_dict(self.cm_state)
276 self._log.debug("Responding to SHOW cm-state: %s", self.cm_state)
277 xact_info.respond_xpath(rwdts.XactRspCode.ACK,
278 xpath=self._opdata_xpath,
279 msg=show_output)
280 else:
281 xact_info.respond_xpath(rwdts.XactRspCode.ACK)
282
283 self._log.info("Registering for cm-opdata xpath: %s",
284 self._opdata_xpath)
285
286 try:
287 handler=rift.tasklets.DTS.RegistrationHandler(on_prepare=on_prepare)
288 yield from self._dts.register(xpath=self._opdata_xpath,
289 handler=handler,
290 flags=rwdts.Flag.PUBLISHER)
291 self._log.info("Successfully registered for opdata(%s)", self._opdata_xpath)
292 except Exception as e:
293 self._log.error("Failed to register for opdata as (%s)", e)
294
295 @asyncio.coroutine
296 def process_nsd_vnf_configuration(self, nsr_obj, vnfr):
297
298 def get_config_method(vnf_config):
299 cfg_types = ['netconf', 'juju', 'script']
300 for method in cfg_types:
301 if method in vnf_config:
302 return method
303 return None
304
305 def get_cfg_file_extension(method, configuration_options):
306 ext_dict = {
307 "netconf" : "xml",
308 "script" : {
309 "bash" : "sh",
310 "expect" : "exp",
311 },
312 "juju" : "yml"
313 }
314
315 if method == "netconf":
316 return ext_dict[method]
317 elif method == "script":
318 return ext_dict[method][configuration_options['script_type']]
319 elif method == "juju":
320 return ext_dict[method]
321 else:
322 return "cfg"
323
324 # This is how the YAML file should look like,
325 # This routine will be called for each VNF, so keep appending the file.
326 # priority order is determined by the number,
327 # hence no need to generate the file in that order. A dictionary will be
328 # used that will take care of the order by number.
329 '''
330 1 : <== This is priority
331 name : trafsink_vnfd
332 member_vnf_index : 2
333 configuration_delay : 120
334 configuration_type : netconf
335 configuration_options :
336 username : admin
337 password : admin
338 port : 2022
339 target : running
340 2 :
341 name : trafgen_vnfd
342 member_vnf_index : 1
343 configuration_delay : 0
344 configuration_type : netconf
345 configuration_options :
346 username : admin
347 password : admin
348 port : 2022
349 target : running
350 '''
351
352 # Save some parameters needed as short cuts in flat structure (Also generated)
353 vnf_cfg = vnfr['vnf_cfg']
354 # Prepare unique name for this VNF
355 vnf_cfg['vnf_unique_name'] = get_vnf_unique_name(
356 vnf_cfg['nsr_name'], vnfr['short_name'], vnfr['member_vnf_index_ref'])
357
358 nsr_obj.cfg_path_prefix = '{}/{}_{}'.format(
359 nsr_obj.this_nsr_dir, vnfr['short_name'], vnfr['member_vnf_index_ref'])
360 nsr_vnfr = '{}/{}_{}'.format(
361 vnf_cfg['nsr_name'], vnfr['short_name'], vnfr['member_vnf_index_ref'])
362
363 # Get vnf_configuration from vnfr
364 vnf_config = vnfr['vnf_configuration']
365
366 self._log.debug("vnf_configuration = %s", vnf_config)
367
368 # Create priority dictionary
369 cfg_priority_order = 0
370 if ('config_attributes' in vnf_config and
371 'config_priority' in vnf_config['config_attributes']):
372 cfg_priority_order = vnf_config['config_attributes']['config_priority']
373
374 if cfg_priority_order not in nsr_obj.nsr_cfg_config_attributes_dict:
375 # No VNFR with this priority yet, initialize the list
376 nsr_obj.nsr_cfg_config_attributes_dict[cfg_priority_order] = []
377
378 method = get_config_method(vnf_config)
379 if method is not None:
380 # Create all sub dictionaries first
381 config_priority = {
382 'id' : vnfr['id'],
383 'name' : vnfr['short_name'],
384 'member_vnf_index' : vnfr['member_vnf_index_ref'],
385 }
386
387 if 'config_delay' in vnf_config['config_attributes']:
388 config_priority['configuration_delay'] = vnf_config['config_attributes']['config_delay']
389 vnf_cfg['config_delay'] = config_priority['configuration_delay']
390
391 configuration_options = {}
392 self._log.debug("config method=%s", method)
393 config_priority['configuration_type'] = method
394 vnf_cfg['config_method'] = method
395
396 # Set config agent based on method
397 self._config_agent_mgr.set_config_agent(
398 nsr_obj.agent_nsr, vnf_cfg['agent_vnfr'], method)
399
400 cfg_opt_list = [
401 'port', 'target', 'script_type', 'ip_address', 'user', 'secret',
402 ]
403 for cfg_opt in cfg_opt_list:
404 if cfg_opt in vnf_config[method]:
405 configuration_options[cfg_opt] = vnf_config[method][cfg_opt]
406 vnf_cfg[cfg_opt] = configuration_options[cfg_opt]
407
408 cfg_opt_list = ['mgmt_ip_address', 'username', 'password']
409 for cfg_opt in cfg_opt_list:
410 if cfg_opt in vnf_config['config_access']:
411 configuration_options[cfg_opt] = vnf_config['config_access'][cfg_opt]
412 vnf_cfg[cfg_opt] = configuration_options[cfg_opt]
413
414 # Add to the cp_dict
415 vnf_cp_dict = nsr_obj._cp_dict[vnfr['member_vnf_index_ref']]
416 vnf_cp_dict['rw_mgmt_ip'] = vnf_cfg['mgmt_ip_address']
417 vnf_cp_dict['rw_username'] = vnf_cfg['username']
418 vnf_cp_dict['rw_password'] = vnf_cfg['password']
419
420
421 # TBD - see if we can neatly include the config in "config_attributes" file, no need though
422 #config_priority['config_template'] = vnf_config['config_template']
423 # Create config file
424 vnf_cfg['juju_script'] = os.path.join(self._parent.cfg_dir, 'juju_if.py')
425
426 if 'config_template' in vnf_config:
427 vnf_cfg['cfg_template'] = '{}_{}_template.cfg'.format(nsr_obj.cfg_path_prefix, config_priority['configuration_type'])
428 vnf_cfg['cfg_file'] = '{}.{}'.format(nsr_obj.cfg_path_prefix, get_cfg_file_extension(method, configuration_options))
429 vnf_cfg['xlate_script'] = os.path.join(self._parent.cfg_dir, 'xlate_cfg.py')
430 try:
431 # Now write this template into file
432 with open(vnf_cfg['cfg_template'], "w") as cf:
433 cf.write(vnf_config['config_template'])
434 except Exception as e:
435 self._log.error("Processing NSD, failed to generate configuration template : %s (Error : %s)",
436 vnf_config['config_template'], str(e))
437 raise
438
439 self._log.debug("VNF endpoint so far: %s", vnf_cfg)
440
441 # Populate filled up dictionary
442 config_priority['configuration_options'] = configuration_options
443 nsr_obj.nsr_cfg_config_attributes_dict[cfg_priority_order].append(config_priority)
444 nsr_obj.num_vnfs_to_cfg += 1
445 nsr_obj._vnfr_dict[vnf_cfg['vnf_unique_name']] = vnfr
446 nsr_obj._vnfr_dict[vnfr['id']] = vnfr
447
448 self._log.debug("VNF:(%s) config_attributes = %s",
449 log_this_vnf(vnfr['vnf_cfg']),
450 nsr_obj.nsr_cfg_config_attributes_dict)
451 else:
452 self._log.info("VNF:(%s) is not to be configured by Configuration Manager!",
453 log_this_vnf(vnfr['vnf_cfg']))
454 yield from nsr_obj.update_vnf_cm_state(vnfr, conmanY.RecordState.READY_NO_CFG)
455
456 # Update the cm-state
457 nsr_obj.populate_vm_state_from_vnf_cfg()
458
459 @asyncio.coroutine
460 def config_NSR(self, id):
461
462 def my_yaml_dump(config_attributes_dict, yf):
463
464 yaml_dict = dict(sorted(config_attributes_dict.items()))
465 yf.write(yaml.dump(yaml_dict, default_flow_style=False))
466
467 nsr_dict = self._nsr_dict
468 self._log.info("Configure NSR, id = %s", id)
469
470 #####################TBD###########################
471 # yield from self._config_agent_mgr.invoke_config_agent_plugins('notify_create_nsr', self.id, self._nsd)
472 # yield from self._config_agent_mgr.invoke_config_agent_plugins('notify_nsr_active', self.id, self._vnfrs)
473
474 try:
475 if id not in nsr_dict:
476 nsr_obj = ConfigManagerNSR(self._log, self._loop, self, id)
477 nsr_dict[id] = nsr_obj
478 else:
479 self._log.info("NSR(%s) is already initialized!", id)
480 nsr_obj = nsr_dict[id]
481 except Exception as e:
482 self._log.error("Failed creating NSR object for (%s) as (%s)", id, str(e))
483 raise
484
485 # Try to configure this NSR only if not already processed
486 if nsr_obj.cm_nsr['state'] != nsr_obj.state_to_string(conmanY.RecordState.INIT):
487 self._log.debug("NSR(%s) is already processed, state=%s",
488 nsr_obj.nsr_name, nsr_obj.cm_nsr['state'])
489 yield from nsr_obj.publish_cm_state()
490 return True
491
492 cmdts_obj = self.cmdts_obj
493 try:
494 # Fetch NSR
495 nsr = yield from cmdts_obj.get_nsr(id)
496 self._log.debug("Full NSR : %s", nsr)
497 if nsr['operational_status'] != "running":
498 self._log.info("NSR(%s) is not ready yet!", nsr['nsd_name_ref'])
499 return False
500 self._nsr = nsr
501
502 # Create Agent NSR class
503 nsr_config = yield from cmdts_obj.get_nsr_config(id)
504 self._log.debug("NSR {} config: {}".format(id, nsr_config))
505 nsr_obj.agent_nsr = riftcm_config_plugin.RiftCMnsr(nsr, nsr_config)
506
507 try:
508 yield from nsr_obj.update_ns_cm_state(conmanY.RecordState.RECEIVED)
509
510 # Parse NSR
511 if nsr is not None:
512 nsr_obj.set_nsr_name(nsr['nsd_name_ref'])
513 nsr_dir = os.path.join(self._parent.cfg_dir, nsr_obj.nsr_name)
514 self._log.info("Checking NS config directory: %s", nsr_dir)
515 if not os.path.isdir(nsr_dir):
516 os.makedirs(nsr_dir)
517 # self._log.critical("NS %s is not to be configured by Service Orchestrator!", nsr_obj.nsr_name)
518 # yield from nsr_obj.update_ns_cm_state(conmanY.RecordState.READY_NO_CFG)
519 # return
520
521 nsr_obj.set_config_dir(self)
522
523 for const_vnfr in nsr['constituent_vnfr_ref']:
524 self._log.debug("Fetching VNFR (%s)", const_vnfr['vnfr_id'])
525 vnfr_msg = yield from cmdts_obj.get_vnfr(const_vnfr['vnfr_id'])
526 if vnfr_msg:
527 vnfr = vnfr_msg.as_dict()
528 self._log.info("create VNF:{}/{}".format(nsr_obj.nsr_name, vnfr['short_name']))
529 agent_vnfr = yield from nsr_obj.add_vnfr(vnfr, vnfr_msg)
530
531 # Preserve order, self.process_nsd_vnf_configuration()
532 # sets up the config agent based on the method
533 yield from self.process_nsd_vnf_configuration(nsr_obj, vnfr)
534 yield from self._config_agent_mgr.invoke_config_agent_plugins(
535 'notify_create_vnfr',
536 nsr_obj.agent_nsr,
537 agent_vnfr)
538
539 #####################TBD###########################
540 # self._log.debug("VNF active. Apply initial config for vnfr {}".format(vnfr.name))
541 # yield from self._config_agent_mgr.invoke_config_agent_plugins('apply_initial_config',
542 # vnfr.id, vnfr)
543 # yield from self._config_agent_mgr.invoke_config_agent_plugins('notify_terminate_vnf', self.id, vnfr)
544
545 except Exception as e:
546 self._log.error("Failed processing NSR (%s) as (%s)", nsr_obj.nsr_name, str(e))
547 self._log.exception(e)
548 yield from nsr_obj.update_ns_cm_state(conmanY.RecordState.CFG_PROCESS_FAILED)
549 raise
550
551 try:
552 # Generate config_config_attributes.yaml (For debug reference)
553 with open(nsr_obj.config_attributes_file, "w") as yf:
554 my_yaml_dump(nsr_obj.nsr_cfg_config_attributes_dict, yf)
555 except Exception as e:
556 self._log.error("NS:(%s) failed to write config attributes file as (%s)", nsr_obj.nsr_name, str(e))
557
558 try:
559 # Generate nsr_xlate_dict.yaml (For debug reference)
560 with open(nsr_obj.xlate_dict_file, "w") as yf:
561 yf.write(yaml.dump(nsr_obj._cp_dict, default_flow_style=False))
562 except Exception as e:
563 self._log.error("NS:(%s) failed to write nsr xlate tags file as (%s)", nsr_obj.nsr_name, str(e))
564
565 self._log.debug("Starting to configure each VNF")
566
567 # Check if this NS has input parametrs
568 self._log.info("Checking NS configuration order: %s", nsr_obj.config_attributes_file)
569
570 if os.path.exists(nsr_obj.config_attributes_file):
571 # Apply configuration is specified order
572 try:
573 # Go in loop to configure by specified order
574 self._log.info("Using Dynamic configuration input parametrs for NS: %s", nsr_obj.nsr_name)
575
576 # cfg_delay = nsr_obj.nsr_cfg_config_attributes_dict['configuration_delay']
577 # if cfg_delay:
578 # self._log.info("Applying configuration delay for NS (%s) ; %d seconds",
579 # nsr_obj.nsr_name, cfg_delay)
580 # yield from asyncio.sleep(cfg_delay, loop=self._loop)
581
582 for config_attributes_dict in nsr_obj.nsr_cfg_config_attributes_dict.values():
583 # Iterate through each priority level
584 for vnf_config_attributes_dict in config_attributes_dict:
585 # Iterate through each vnfr at this priority level
586
587 # Make up vnf_unique_name with vnfd name and member index
588 #vnfr_name = "{}.{}".format(nsr_obj.nsr_name, vnf_config_attributes_dict['name'])
589 vnf_unique_name = get_vnf_unique_name(
590 nsr_obj.nsr_name,
591 vnf_config_attributes_dict['name'],
592 str(vnf_config_attributes_dict['member_vnf_index']),
593 )
594 self._log.info("NS (%s) : VNF (%s) - Processing configuration attributes",
595 nsr_obj.nsr_name, vnf_unique_name)
596
597 # Find vnfr for this vnf_unique_name
598 if vnf_unique_name not in nsr_obj._vnfr_dict:
599 self._log.error("NS (%s) - Can not find VNF to be configured: %s", nsr_obj.nsr_name, vnf_unique_name)
600 else:
601 # Save this unique VNF's config input parameters
602 nsr_obj.vnf_config_attributes_dict[vnf_unique_name] = vnf_config_attributes_dict
603 nsr_obj.ConfigVNF(nsr_obj._vnfr_dict[vnf_unique_name])
604
605 # Now add the entire NS to the pending config list.
606 self._log.info("Scheduling NSR:{} configuration".format(nsr_obj.nsr_name))
607 self._parent.add_to_pending(nsr_obj)
608 self._parent.add_nsr_obj(nsr_obj)
609
610 except Exception as e:
611 self._log.error("Failed processing input parameters for NS (%s) as %s", nsr_obj.nsr_name, str(e))
612 raise
613 else:
614 self._log.error("No configuration input parameters for NSR (%s)", nsr_obj.nsr_name)
615
616 except Exception as e:
617 self._log.error("Failed to configure NS (%s) as (%s)", nsr_obj.nsr_name, str(e))
618 yield from nsr_obj.update_ns_cm_state(conmanY.RecordState.CFG_PROCESS_FAILED)
619 raise
620
621 return True
622
623 @asyncio.coroutine
624 def terminate_NSR(self, id):
625 nsr_dict = self._nsr_dict
626 if id not in nsr_dict:
627 self._log.error("NSR(%s) does not exist!", id)
628 return
629 else:
630 # Remove this NSR if we have it on pending task list
631 for task in self.pending_tasks:
632 if task['nsrid'] == id:
633 self.del_from_pending_tasks(task)
634
635 # Remove this object from global list
636 nsr_obj = nsr_dict.pop(id, None)
637
638 # Remove this NS cm-state from global status list
639 self.cm_state['cm_nsr'].remove(nsr_obj.cm_nsr)
640
641 # Also remove any scheduled configuration event
642 for nsr_obj_p in self._parent.pending_cfg:
643 if nsr_obj_p == nsr_obj:
644 assert id == nsr_obj_p._nsr_id
645 #self._parent.pending_cfg.remove(nsr_obj_p)
646 # Mark this as being deleted so we do not try to configure it if we are in cfg_delay (will wake up and continue to process otherwise)
647 nsr_obj_p.being_deleted = True
648 self._log.info("Removed scheduled configuration for NSR(%s)", nsr_obj.nsr_name)
649
650 self._parent.remove_nsr_obj(id)
651
652 # Call Config Agent to clean up for each VNF
653 for agent_vnfr in nsr_obj.agent_nsr.vnfrs:
654 yield from self._config_agent_mgr.invoke_config_agent_plugins(
655 'notify_terminate_vnfr',
656 nsr_obj.agent_nsr,
657 agent_vnfr)
658
659 # publish delete cm-state (cm-nsr)
660 yield from nsr_obj.delete_cm_nsr()
661
662 #####################TBD###########################
663 # yield from self._config_agent_mgr.invoke_config_agent_plugins('notify_terminate_ns', self.id)
664
665 self._log.info("NSR(%s/%s) is deleted", nsr_obj.nsr_name, id)
666
667 @asyncio.coroutine
668 def process_initial_config(self, nsr_obj, conf, script, vnfr_name=None):
669 '''Apply the initial-config-primitives specified in NSD or VNFD'''
670
671 def get_input_file(parameters):
672 inp = {}
673
674 # Add NSR name to file
675 inp['nsr_name'] = nsr_obj.nsr_name
676
677 # Add VNFR name if available
678 if vnfr_name:
679 inp['vnfr_name'] = vnfr_name
680
681 # TODO (pjoseph): Add config agents, we need to identify which all
682 # config agents are required from this NS and provide only those
683 inp['config-agent'] = {}
684
685 # Add parameters for initial config
686 inp['parameter'] = {}
687 for parameter in parameters:
688 try:
689 inp['parameter'][parameter['name']] = parameter['value']
690 except KeyError as e:
691 if vnfr_name:
692 self._log.info("VNFR {} initial config parameter {} with no value: {}".
693 format(vnfr_name, parameter, e))
694 else:
695 self._log.info("NSR {} initial config parameter {} with no value: {}".
696 format(nsr_obj.nsr_name, parameter, e))
697
698
699 # Add vnfrs specific data
700 inp['vnfr'] = {}
701 for vnfr in nsr_obj.vnfrs:
702 v = {}
703
704 v['name'] = vnfr['name']
705 v['mgmt_ip_address'] = vnfr['vnf_cfg']['mgmt_ip_address']
706 v['mgmt_port'] = vnfr['vnf_cfg']['port']
707
708 if 'dashboard_url' in vnfr:
709 v['dashboard_url'] = vnfr['dashboard_url']
710
711 if 'connection_point' in vnfr:
712 v['connection_point'] = []
713 for cp in vnfr['connection_point']:
714 v['connection_point'].append(
715 {
716 'name': cp['name'],
717 'ip_address': cp['ip_address'],
718 }
719 )
720
721 v['vdur'] = []
722 vdu_data = [(vdu['name'], vdu['management_ip'], vdu['vm_management_ip'], vdu['id'])
723 for vdu in vnfr['vdur']]
724
725 for data in vdu_data:
726 data = dict(zip(['name', 'management_ip', 'vm_management_ip', 'id'] , data))
727 v['vdur'].append(data)
728
729 inp['vnfr'][vnfr['member_vnf_index_ref']] = v
730
731 self._log.debug("Input data for {}: {}".
732 format((vnfr_name if vnfr_name else nsr_obj.nsr_name),
733 inp))
734
735 # Convert to YAML string
736 yaml_string = yaml.dump(inp, default_flow_style=False)
737
738 # Write the inputs as yaml file
739 tmp_file = None
740 with tempfile.NamedTemporaryFile(delete=False) as tmp_file:
741 tmp_file.write(yaml_string.encode("UTF-8"))
742 self._log.debug("Input file created for {}: {}".
743 format((vnfr_name if vnfr_name \
744 else nsr_obj.nsr_name),
745 tmp_file.name))
746
747 return tmp_file.name
748
749 parameters = []
750 try:
751 parameters = conf['parameter']
752 except Exception as e:
753 self._log.debug("Parameter conf: {}, e: {}".
754 format(conf, e))
755
756 inp_file = get_input_file(parameters)
757
758 cmd = "{0} {1}".format(script, inp_file)
759 self._log.debug("Running the CMD: {}".format(cmd))
760
761 process = yield from asyncio.create_subprocess_shell(cmd,
762 loop=self._loop)
763 yield from process.wait()
764
765 if process.returncode:
766 msg = "NSR/VNFR {} initial config using {} failed with {}". \
767 format(vnfr_name if vnfr_name else nsr_obj.nsr_name,
768 script, process.returncode)
769 self._log.error(msg)
770 raise InitialConfigError(msg)
771 else:
772 # os.remove(inp_file)
773 pass
774
775 def get_script_file(self, script_name, d_name, d_id, d_type):
776 # Get the full path to the script
777 script = ''
778 # If script name starts with /, assume it is full path
779 if script_name[0] == '/':
780 # The script has full path, use as is
781 script = script_name
782 else:
783 script = os.path.join(os.environ['RIFT_ARTIFACTS'],
784 'launchpad/packages',
785 d_type,
786 d_id,
787 d_name,
788 'scripts',
789 script_name)
790 self._log.debug("Checking for script at %s", script)
791 if not os.path.exists(script):
792 self._log.debug("Did not find script %s", script)
793 script = os.path.join(os.environ['RIFT_INSTALL'],
794 'usr/bin',
795 script_name)
796
797 # Seen cases in jenkins, where the script execution fails
798 # with permission denied. Setting the permission on script
799 # to make sure it has execute permission
800 perm = os.stat(script).st_mode
801 if not (perm & stat.S_IXUSR):
802 self._log.warn("NSR/VNFR {} initial config script {} " \
803 "without execute permission: {}".
804 format(d_name, script, perm))
805 os.chmod(script, perm | stat.S_IXUSR)
806 return script
807
808 @asyncio.coroutine
809 def process_ns_initial_config(self, nsr_obj):
810 '''Apply the initial-config-primitives specified in NSD'''
811
812 nsr = yield from self.cmdts_obj.get_nsr(nsr_obj.nsr_id)
813 if 'initial_config_primitive' not in nsr:
814 return
815
816 if nsr is not None:
817 nsd = yield from self.cmdts_obj.get_nsd(nsr_obj.nsr_id)
818 for conf in nsr['initial_config_primitive']:
819 self._log.debug("NSR {} initial config: {}".
820 format(nsr_obj.nsr_name, conf))
821 script = self.get_script_file(conf['user_defined_script'],
822 nsd.name,
823 nsd.id,
824 'nsd')
825
826 yield from self.process_initial_config(nsr_obj, conf, script)
827
828 @asyncio.coroutine
829 def process_vnf_initial_config(self, nsr_obj, vnfr):
830 '''Apply the initial-config-primitives specified in VNFD'''
831
832 vnfr_name = vnfr.name
833
834 vnfd = yield from self.cmdts_obj.get_vnfd(vnfr.vnfd_ref)
835 if vnfd is None:
836 msg = "VNFR {}, unable to get VNFD {}". \
837 format(vnfr_name, vnfr.vnfd_ref)
838 self._log.error(msg)
839 raise InitialConfigError(msg)
840
841 vnf_cfg = vnfd.vnf_configuration
842
843 for conf in vnf_cfg.initial_config_primitive:
844 self._log.debug("VNFR {} initial config: {}".
845 format(vnfr_name, conf))
846
847 if not conf.user_defined_script:
848 self._log.debug("VNFR {} did not fine user defined script: {}".
849 format(vnfr_name, conf))
850 continue
851
852 script = self.get_script_file(conf.user_defined_script,
853 vnfd.id,
854 vnfd.name,
855 'vnfd')
856
857 yield from self.process_initial_config(nsr_obj,
858 conf.as_dict(),
859 script,
860 vnfr_name=vnfr_name)
861
862
863 class ConfigManagerNSR(object):
864 def __init__(self, log, loop, parent, id):
865 self._log = log
866 self._loop = loop
867 self._rwcal = None
868 self._vnfr_dict = {}
869 self._cp_dict = {}
870 self._nsr_id = id
871 self._parent = parent
872 self._log.info("Instantiated NSR entry for id=%s", id)
873 self.nsr_cfg_config_attributes_dict = {}
874 self.vnf_config_attributes_dict = {}
875 self.num_vnfs_to_cfg = 0
876 self._vnfr_list = []
877 self.vnf_cfg_list = []
878 self.this_nsr_dir = None
879 self.being_deleted = False
880 self.dts_obj = self._parent.cmdts_obj
881
882 # Initialize cm-state for this NS
883 self.cm_nsr = {}
884 self.cm_nsr['cm_vnfr'] = []
885 self.cm_nsr['id'] = id
886 self.cm_nsr['state'] = self.state_to_string(conmanY.RecordState.INIT)
887 self.cm_nsr['state_details'] = None
888
889 self.set_nsr_name('Not Set')
890
891 # Add this NSR cm-state object to global cm-state
892 parent.cm_state['cm_nsr'].append(self.cm_nsr)
893
894 # Place holders for NSR & VNFR classes
895 self.agent_nsr = None
896
897 @property
898 def nsr_opdata_xpath(self):
899 ''' Returns full xpath for this NSR cm-state opdata '''
900 return(
901 "D,/rw-conman:cm-state" +
902 "/rw-conman:cm-nsr[rw-conman:id='{}']"
903 ).format(self._nsr_id)
904
905 @property
906 def vnfrs(self):
907 return self._vnfr_list
908
909 @property
910 def parent(self):
911 return self._parent
912
913 @property
914 def nsr_id(self):
915 return self._nsr_id
916
917 @asyncio.coroutine
918 def publish_cm_state(self):
919 ''' This function publishes cm_state for this NSR '''
920
921 cm_state = conmanY.CmOpdata()
922 cm_state_nsr = cm_state.cm_nsr.add()
923 cm_state_nsr.from_dict(self.cm_nsr)
924 #with self._dts.transaction() as xact:
925 yield from self.dts_obj.update(self.nsr_opdata_xpath, cm_state_nsr)
926 self._log.info("Published cm-state with xpath %s and nsr %s",
927 self.nsr_opdata_xpath,
928 cm_state_nsr)
929
930 @asyncio.coroutine
931 def delete_cm_nsr(self):
932 ''' This function publishes cm_state for this NSR '''
933
934 yield from self.dts_obj.delete(self.nsr_opdata_xpath)
935 self._log.info("Deleted cm-nsr with xpath %s",
936 self.nsr_opdata_xpath)
937
938 def set_nsr_name(self, name):
939 self.nsr_name = name
940 self.cm_nsr['name'] = name
941
942 def set_config_dir(self, caller):
943 self.this_nsr_dir = os.path.join(
944 caller._parent.cfg_dir, self.nsr_name, caller._nsr['name_ref'])
945 if not os.path.exists(self.this_nsr_dir):
946 os.makedirs(self.this_nsr_dir)
947 self._log.debug("NSR:(%s), Created configuration directory(%s)",
948 caller._nsr['name_ref'], self.this_nsr_dir)
949 self.config_attributes_file = os.path.join(self.this_nsr_dir, "configuration_config_attributes.yml")
950 self.xlate_dict_file = os.path.join(self.this_nsr_dir, "nsr_xlate_dict.yml")
951
952 def xlate_conf(self, vnfr, vnf_cfg):
953
954 # If configuration type is not already set, try to read from attributes
955 if vnf_cfg['interface_type'] is None:
956 # Prepare unique name for this VNF
957 vnf_unique_name = get_vnf_unique_name(
958 vnf_cfg['nsr_name'],
959 vnfr['short_name'],
960 vnfr['member_vnf_index_ref'],
961 )
962
963 # Find this particular (unique) VNF's config attributes
964 if (vnf_unique_name in self.vnf_config_attributes_dict):
965 vnf_cfg_config_attributes_dict = self.vnf_config_attributes_dict[vnf_unique_name]
966 vnf_cfg['interface_type'] = vnf_cfg_config_attributes_dict['configuration_type']
967 if 'configuration_options' in vnf_cfg_config_attributes_dict:
968 cfg_opts = vnf_cfg_config_attributes_dict['configuration_options']
969 for key, value in cfg_opts.items():
970 vnf_cfg[key] = value
971
972 cfg_path_prefix = '{}/{}/{}_{}'.format(
973 self._parent._parent.cfg_dir,
974 vnf_cfg['nsr_name'],
975 vnfr['short_name'],
976 vnfr['member_vnf_index_ref'],
977 )
978
979 vnf_cfg['cfg_template'] = '{}_{}_template.cfg'.format(cfg_path_prefix, vnf_cfg['interface_type'])
980 vnf_cfg['cfg_file'] = '{}.cfg'.format(cfg_path_prefix)
981 vnf_cfg['xlate_script'] = self._parent._parent.cfg_dir + '/xlate_cfg.py'
982
983 self._log.debug("VNF endpoint so far: %s", vnf_cfg)
984
985 self._log.info("Checking cfg_template %s", vnf_cfg['cfg_template'])
986 if os.path.exists(vnf_cfg['cfg_template']):
987 return True
988 return False
989
990 def ConfigVNF(self, vnfr):
991
992 vnf_cfg = vnfr['vnf_cfg']
993 vnf_cm_state = self.find_or_create_vnfr_cm_state(vnf_cfg)
994
995 if (vnf_cm_state['state'] == self.state_to_string(conmanY.RecordState.READY_NO_CFG)
996 or
997 vnf_cm_state['state'] == self.state_to_string(conmanY.RecordState.READY)):
998 self._log.warning("NS/VNF (%s/%s) is already configured! Skipped.", self.nsr_name, vnfr['short_name'])
999 return
1000
1001 #UPdate VNF state
1002 vnf_cm_state['state'] = self.state_to_string(conmanY.RecordState.CFG_PROCESS)
1003
1004 # Now translate the configuration for iP addresses
1005 try:
1006 # Add cp_dict members (TAGS) for this VNF
1007 self._cp_dict['rw_mgmt_ip'] = vnf_cfg['mgmt_ip_address']
1008 self._cp_dict['rw_username'] = vnf_cfg['username']
1009 self._cp_dict['rw_password'] = vnf_cfg['password']
1010 ############################################################
1011 # TBD - Need to lookup above 3 for a given VNF, not global #
1012 # Once we do that no need to dump below file again before #
1013 # each VNF configuration translation. #
1014 # This will require all existing config templates to be #
1015 # changed for above three tags to include member index #
1016 ############################################################
1017 try:
1018 nsr_obj = vnf_cfg['nsr_obj']
1019 # Generate config_config_attributes.yaml (For debug reference)
1020 with open(nsr_obj.xlate_dict_file, "w") as yf:
1021 yf.write(yaml.dump(nsr_obj._cp_dict, default_flow_style=False))
1022 except Exception as e:
1023 self._log.error("NS:(%s) failed to write nsr xlate tags file as (%s)", nsr_obj.nsr_name, str(e))
1024
1025 if 'cfg_template' in vnf_cfg:
1026 script_cmd = 'python3 {} -i {} -o {} -x "{}"'.format(vnf_cfg['xlate_script'], vnf_cfg['cfg_template'], vnf_cfg['cfg_file'], self.xlate_dict_file)
1027 self._log.debug("xlate script command (%s)", script_cmd)
1028 #xlate_msg = subprocess.check_output(script_cmd).decode('utf-8')
1029 xlate_msg = subprocess.check_output(script_cmd, shell=True).decode('utf-8')
1030 self._log.info("xlate script output (%s)", xlate_msg)
1031 except Exception as e:
1032 vnf_cm_state['state'] = self.state_to_string(conmanY.RecordState.CFG_PROCESS_FAILED)
1033 self._log.error("Failed to execute translation script for VNF: %s with (%s)", log_this_vnf(vnf_cfg), str(e))
1034 return
1035
1036 self._log.info("Applying config to VNF: %s = %s!", log_this_vnf(vnf_cfg), vnf_cfg)
1037 try:
1038 #self.vnf_cfg_list.append(vnf_cfg)
1039 self._log.debug("Scheduled configuration!")
1040 vnf_cm_state['state'] = self.state_to_string(conmanY.RecordState.CFG_SCHED)
1041 except Exception as e:
1042 self._log.error("Failed apply_vnf_config to VNF: %s as (%s)", log_this_vnf(vnf_cfg), str(e))
1043 vnf_cm_state['state'] = self.state_to_string(conmanY.RecordState.CFG_PROCESS_FAILED)
1044 raise
1045
1046 def add(self, nsr):
1047 self._log.info("Adding NS Record for id=%s", id)
1048 self._nsr = nsr
1049
1050 def sample_cm_state(self):
1051 return (
1052 {
1053 'cm_nsr': [
1054 {
1055 'cm_vnfr': [
1056 {
1057 'cfg_location': 'location1',
1058 'cfg_type': 'script',
1059 'connection_point': [
1060 {'ip_address': '1.1.1.1', 'name': 'vnf1cp1'},
1061 {'ip_address': '1.1.1.2', 'name': 'vnf1cp2'}
1062 ],
1063 'id': 'vnfrid1',
1064 'mgmt_interface': {'ip_address': '7.1.1.1',
1065 'port': 1001},
1066 'name': 'vnfrname1',
1067 'state': 'init'
1068 },
1069 {
1070 'cfg_location': 'location2',
1071 'cfg_type': 'netconf',
1072 'connection_point': [{'ip_address': '2.1.1.1', 'name': 'vnf2cp1'},
1073 {'ip_address': '2.1.1.2', 'name': 'vnf2cp2'}],
1074 'id': 'vnfrid2',
1075 'mgmt_interface': {'ip_address': '7.1.1.2',
1076 'port': 1001},
1077 'name': 'vnfrname2',
1078 'state': 'init'}
1079 ],
1080 'id': 'nsrid1',
1081 'name': 'nsrname1',
1082 'state': 'init'}
1083 ],
1084 'states': 'Initialized, '
1085 })
1086
1087 def populate_vm_state_from_vnf_cfg(self):
1088 # Fill in each VNFR from this nsr object
1089 vnfr_list = self._vnfr_list
1090 for vnfr in vnfr_list:
1091 vnf_cfg = vnfr['vnf_cfg']
1092 vnf_cm_state = self.find_vnfr_cm_state(vnfr['id'])
1093
1094 if vnf_cm_state:
1095 # Fill in VNF management interface
1096 vnf_cm_state['mgmt_interface']['ip_address'] = vnf_cfg['mgmt_ip_address']
1097 vnf_cm_state['mgmt_interface']['port'] = vnf_cfg['port']
1098
1099 # Fill in VNF configuration details
1100 vnf_cm_state['cfg_type'] = vnf_cfg['config_method']
1101 vnf_cm_state['cfg_location'] = vnf_cfg['cfg_file']
1102
1103 # Fill in each connection-point for this VNF
1104 if "connection_point" in vnfr:
1105 cp_list = vnfr['connection_point']
1106 for cp_item_dict in cp_list:
1107 vnf_cm_state['connection_point'].append(
1108 {
1109 'name' : cp_item_dict['name'],
1110 'ip_address' : cp_item_dict['ip_address'],
1111 }
1112 )
1113
1114 def state_to_string(self, state):
1115 state_dict = {
1116 conmanY.RecordState.INIT : "init",
1117 conmanY.RecordState.RECEIVED : "received",
1118 conmanY.RecordState.CFG_PROCESS : "cfg_process",
1119 conmanY.RecordState.CFG_PROCESS_FAILED : "cfg_process_failed",
1120 conmanY.RecordState.CFG_SCHED : "cfg_sched",
1121 conmanY.RecordState.CFG_DELAY : "cfg_delay",
1122 conmanY.RecordState.CONNECTING : "connecting",
1123 conmanY.RecordState.FAILED_CONNECTION : "failed_connection",
1124 conmanY.RecordState.NETCONF_CONNECTED : "netconf_connected",
1125 conmanY.RecordState.NETCONF_SSH_CONNECTED : "netconf_ssh_connected",
1126 conmanY.RecordState.RESTCONF_CONNECTED : "restconf_connected",
1127 conmanY.RecordState.CFG_SEND : "cfg_send",
1128 conmanY.RecordState.CFG_FAILED : "cfg_failed",
1129 conmanY.RecordState.READY_NO_CFG : "ready_no_cfg",
1130 conmanY.RecordState.READY : "ready",
1131 }
1132 return state_dict[state]
1133
1134 def find_vnfr_cm_state(self, id):
1135 if self.cm_nsr['cm_vnfr']:
1136 for vnf_cm_state in self.cm_nsr['cm_vnfr']:
1137 if vnf_cm_state['id'] == id:
1138 return vnf_cm_state
1139 return None
1140
1141 def find_or_create_vnfr_cm_state(self, vnf_cfg):
1142 vnfr = vnf_cfg['vnfr']
1143 vnf_cm_state = self.find_vnfr_cm_state(vnfr['id'])
1144
1145 if vnf_cm_state is None:
1146 # Not found, Create and Initialize this VNF cm-state
1147 vnf_cm_state = {
1148 'id' : vnfr['id'],
1149 'name' : vnfr['short_name'],
1150 'state' : self.state_to_string(conmanY.RecordState.RECEIVED),
1151 'mgmt_interface' :
1152 {
1153 'ip_address' : vnf_cfg['mgmt_ip_address'],
1154 'port' : vnf_cfg['port'],
1155 },
1156 'cfg_type' : vnf_cfg['config_method'],
1157 'cfg_location' : vnf_cfg['cfg_file'],
1158 'connection_point' : [],
1159 }
1160 self.cm_nsr['cm_vnfr'].append(vnf_cm_state)
1161
1162 # Publish newly created cm-state
1163
1164
1165 return vnf_cm_state
1166
1167 @asyncio.coroutine
1168 def get_vnf_cm_state(self, vnfr):
1169 if vnfr:
1170 vnf_cm_state = self.find_vnfr_cm_state(vnfr['id'])
1171 if vnf_cm_state:
1172 return vnf_cm_state['state']
1173 return False
1174
1175 @asyncio.coroutine
1176 def update_vnf_cm_state(self, vnfr, state):
1177 if vnfr:
1178 vnf_cm_state = self.find_vnfr_cm_state(vnfr['id'])
1179 if vnf_cm_state is None:
1180 self._log.error("No opdata found for NS/VNF:%s/%s!",
1181 self.nsr_name, vnfr['short_name'])
1182 return
1183
1184 if vnf_cm_state['state'] != self.state_to_string(state):
1185 old_state = vnf_cm_state['state']
1186 vnf_cm_state['state'] = self.state_to_string(state)
1187 # Publish new state
1188 yield from self.publish_cm_state()
1189 self._log.info("VNF ({}/{}/{}) state change: {} -> {}"
1190 .format(self.nsr_name,
1191 vnfr['short_name'],
1192 vnfr['member_vnf_index_ref'],
1193 old_state,
1194 vnf_cm_state['state']))
1195
1196 else:
1197 self._log.error("No VNFR supplied for state update (NS=%s)!",
1198 self.nsr_name)
1199
1200 @property
1201 def get_ns_cm_state(self):
1202 return self.cm_nsr['state']
1203
1204 @asyncio.coroutine
1205 def update_ns_cm_state(self, state, state_details=None):
1206 if self.cm_nsr['state'] != self.state_to_string(state):
1207 old_state = self.cm_nsr['state']
1208 self.cm_nsr['state'] = self.state_to_string(state)
1209 self.cm_nsr['state_details'] = state_details if state_details is not None else None
1210 self._log.info("NS ({}) state change: {} -> {}"
1211 .format(self.nsr_name,
1212 old_state,
1213 self.cm_nsr['state']))
1214 # Publish new state
1215 yield from self.publish_cm_state()
1216
1217 @asyncio.coroutine
1218 def add_vnfr(self, vnfr, vnfr_msg):
1219
1220 @asyncio.coroutine
1221 def populate_subnets_from_vlr(id):
1222 try:
1223 # Populate cp_dict with VLR subnet info
1224 vlr = yield from self.dts_obj.get_vlr(id)
1225 if vlr is not None and 'assigned_subnet' in vlr:
1226 subnet = {vlr.name:vlr.assigned_subnet}
1227 self._cp_dict[vnfr['member_vnf_index_ref']].update(subnet)
1228 self._cp_dict.update(subnet)
1229 self._log.debug("VNF:(%s) Updated assigned subnet = %s",
1230 vnfr['short_name'], subnet)
1231 except Exception as e:
1232 self._log.error("VNF:(%s) VLR Error = %s",
1233 vnfr['short_name'], e)
1234
1235 if vnfr['id'] not in self._vnfr_dict:
1236 self._log.info("NSR(%s) : Adding VNF Record for name=%s, id=%s", self._nsr_id, vnfr['short_name'], vnfr['id'])
1237 # Add this vnfr to the list for show, or single traversal
1238 self._vnfr_list.append(vnfr)
1239 else:
1240 self._log.warning("NSR(%s) : VNF Record for name=%s, id=%s already exists, overwriting", self._nsr_id, vnfr['short_name'], vnfr['id'])
1241
1242 # Make vnfr available by id as well as by name
1243 unique_name = get_vnf_unique_name(self.nsr_name, vnfr['short_name'], vnfr['member_vnf_index_ref'])
1244 self._vnfr_dict[unique_name] = vnfr
1245 self._vnfr_dict[vnfr['id']] = vnfr
1246
1247 # Create vnf_cfg dictionary with default values
1248 vnf_cfg = {
1249 'nsr_obj' : self,
1250 'vnfr' : vnfr,
1251 'agent_vnfr' : self.agent_nsr.add_vnfr(vnfr, vnfr_msg),
1252 'nsr_name' : self.nsr_name,
1253 'nsr_id' : self._nsr_id,
1254 'vnfr_name' : vnfr['short_name'],
1255 'member_vnf_index' : vnfr['member_vnf_index_ref'],
1256 'port' : 0,
1257 'username' : 'admin',
1258 'password' : 'admin',
1259 'config_method' : 'None',
1260 'protocol' : 'None',
1261 'mgmt_ip_address' : '0.0.0.0',
1262 'cfg_file' : 'None',
1263 'cfg_retries' : 0,
1264 'script_type' : 'bash',
1265 }
1266
1267 # Update the mgmt ip address
1268 # In case the config method is none, this is not
1269 # updated later
1270 try:
1271 vnf_cfg['mgmt_ip_address'] = vnfr_msg.mgmt_interface.ip_address
1272 vnf_cfg['port'] = vnfr_msg.mgmt_interface.port
1273 except Exception as e:
1274 self._log.warn(
1275 "VNFR {}({}), unable to retrieve mgmt ip address: {}".
1276 format(vnfr['short_name'], vnfr['id'], e))
1277
1278 vnfr['vnf_cfg'] = vnf_cfg
1279 self.find_or_create_vnfr_cm_state(vnf_cfg)
1280
1281 '''
1282 Build the connection-points list for this VNF (self._cp_dict)
1283 '''
1284 # Populate global CP list self._cp_dict from VNFR
1285 cp_list = []
1286 if 'connection_point' in vnfr:
1287 cp_list = vnfr['connection_point']
1288
1289 self._cp_dict[vnfr['member_vnf_index_ref']] = {}
1290 if 'vdur' in vnfr:
1291 for vdur in vnfr['vdur']:
1292 if 'internal_connection_point' in vdur:
1293 cp_list += vdur['internal_connection_point']
1294
1295 for cp_item_dict in cp_list:
1296 # Populate global dictionary
1297 self._cp_dict[
1298 cp_item_dict['name']
1299 ] = cp_item_dict['ip_address']
1300
1301 # Populate unique member specific dictionary
1302 self._cp_dict[
1303 vnfr['member_vnf_index_ref']
1304 ][
1305 cp_item_dict['name']
1306 ] = cp_item_dict['ip_address']
1307
1308 # Fill in the subnets from vlr
1309 if 'vlr_ref' in cp_item_dict:
1310 ### HACK: Internal connection_point do not have VLR reference
1311 yield from populate_subnets_from_vlr(cp_item_dict['vlr_ref'])
1312
1313 if 'internal_vlr' in vnfr:
1314 for ivlr in vnfr['internal_vlr']:
1315 yield from populate_subnets_from_vlr(ivlr['vlr_ref'])
1316
1317 # Update vnfr
1318 vnf_cfg['agent_vnfr']._vnfr = vnfr
1319 return vnf_cfg['agent_vnfr']
1320
1321
1322 class XPaths(object):
1323 @staticmethod
1324 def nsr_opdata(k=None):
1325 return ("D,/nsr:ns-instance-opdata/nsr:nsr" +
1326 ("[nsr:ns-instance-config-ref='{}']".format(k) if k is not None else ""))
1327
1328 @staticmethod
1329 def nsd_msg(k=None):
1330 return ("C,/nsd:nsd-catalog/nsd:nsd" +
1331 "[nsd:id = '{}']".format(k) if k is not None else "")
1332
1333 @staticmethod
1334 def vnfr_opdata(k=None):
1335 return ("D,/vnfr:vnfr-catalog/vnfr:vnfr" +
1336 ("[vnfr:id='{}']".format(k) if k is not None else ""))
1337
1338 @staticmethod
1339 def vnfd(k=None):
1340 return ("C,/vnfd:vnfd-catalog/vnfd:vnfd" +
1341 ("[vnfd:id='{}']".format(k) if k is not None else ""))
1342
1343 @staticmethod
1344 def config_agent(k=None):
1345 return ("D,/rw-config-agent:config-agent/rw-config-agent:account" +
1346 ("[rw-config-agent:name='{}']".format(k) if k is not None else ""))
1347
1348 @staticmethod
1349 def nsr_config(k=None):
1350 return ("C,/nsr:ns-instance-config/nsr:nsr[nsr:id='{}']".format(k) if k is not None else "")
1351
1352 @staticmethod
1353 def vlr(k=None):
1354 return ("D,/vlr:vlr-catalog/vlr:vlr[vlr:id='{}']".format(k) if k is not None else "")
1355
1356 class ConfigManagerDTS(object):
1357 ''' This class either reads from DTS or publishes to DTS '''
1358
1359 def __init__(self, log, loop, parent, dts):
1360 self._log = log
1361 self._loop = loop
1362 self._parent = parent
1363 self._dts = dts
1364
1365 @asyncio.coroutine
1366 def _read_dts(self, xpath, do_trace=False):
1367 self._log.debug("_read_dts path = %s", xpath)
1368 flags = rwdts.XactFlag.MERGE
1369 res_iter = yield from self._dts.query_read(
1370 xpath, flags=flags
1371 )
1372
1373 results = []
1374 try:
1375 for i in res_iter:
1376 result = yield from i
1377 if result is not None:
1378 results.append(result.result)
1379 except:
1380 pass
1381
1382 return results
1383
1384
1385 @asyncio.coroutine
1386 def get_nsr(self, id):
1387 self._log.debug("Attempting to get NSR: %s", id)
1388 nsrl = yield from self._read_dts(XPaths.nsr_opdata(id), False)
1389 nsr = None
1390 if len(nsrl) > 0:
1391 nsr = nsrl[0].as_dict()
1392 return nsr
1393
1394 @asyncio.coroutine
1395 def get_nsr_config(self, id):
1396 self._log.debug("Attempting to get config NSR: %s", id)
1397 nsrl = yield from self._read_dts(XPaths.nsr_config(id), False)
1398 nsr = None
1399 if len(nsrl) > 0:
1400 nsr = nsrl[0]
1401 return nsr
1402
1403 @asyncio.coroutine
1404 def get_nsd_msg(self, id):
1405 self._log.debug("Attempting to get NSD: %s", id)
1406 nsdl = yield from self._read_dts(XPaths.nsd_msg(id), False)
1407 nsd_msg = None
1408 if len(nsdl) > 0:
1409 nsd_msg = nsdl[0]
1410 return nsd_msg
1411
1412 @asyncio.coroutine
1413 def get_nsd(self, nsr_id):
1414 self._log.debug("Attempting to get NSD for NSR: %s", id)
1415 nsr_config = yield from self.get_nsr_config(nsr_id)
1416 nsd_msg = nsr_config.nsd
1417 return nsd_msg
1418
1419 @asyncio.coroutine
1420 def get_vnfr(self, id):
1421 self._log.debug("Attempting to get VNFR: %s", id)
1422 vnfrl = yield from self._read_dts(XPaths.vnfr_opdata(id), do_trace=False)
1423 vnfr_msg = None
1424 if len(vnfrl) > 0:
1425 vnfr_msg = vnfrl[0]
1426 return vnfr_msg
1427
1428 @asyncio.coroutine
1429 def get_vnfd(self, vnfd_id):
1430 self._log.debug("Attempting to get VNFD: %s", vnfd_id)
1431 vnfdl = yield from self._read_dts(XPaths.vnfd(vnfd_id), do_trace=False)
1432 vnfd_msg = None
1433 if len(vnfdl) > 0:
1434 vnfd_msg = vnfdl[0]
1435 return vnfd_msg
1436
1437 @asyncio.coroutine
1438 def get_vlr(self, id):
1439 self._log.debug("Attempting to get VLR subnet: %s", id)
1440 vlrl = yield from self._read_dts(XPaths.vlr(id), do_trace=True)
1441 vlr_msg = None
1442 if len(vlrl) > 0:
1443 vlr_msg = vlrl[0]
1444 return vlr_msg
1445
1446 @asyncio.coroutine
1447 def get_config_agents(self, name):
1448 self._log.debug("Attempting to get config_agents: %s", name)
1449 cfgagentl = yield from self._read_dts(XPaths.config_agent(name), False)
1450 return cfgagentl
1451
1452 @asyncio.coroutine
1453 def update(self, path, msg, flags=rwdts.XactFlag.REPLACE):
1454 """
1455 Update a cm-state (cm-nsr) record in DTS with the path and message
1456 """
1457 self._log.debug("Updating cm-state %s:%s dts_pub_hdl = %s", path, msg, self.dts_pub_hdl)
1458 self.dts_pub_hdl.update_element(path, msg, flags)
1459 self._log.debug("Updated cm-state, %s:%s", path, msg)
1460
1461 @asyncio.coroutine
1462 def delete(self, path):
1463 """
1464 Delete cm-nsr record in DTS with the path only
1465 """
1466 self._log.debug("Deleting cm-nsr %s dts_pub_hdl = %s", path, self.dts_pub_hdl)
1467 self.dts_pub_hdl.delete_element(path)
1468 self._log.debug("Deleted cm-nsr, %s", path)
1469
1470 @asyncio.coroutine
1471 def register(self):
1472 yield from self.register_to_publish()
1473 yield from self.register_for_nsr()
1474
1475 @asyncio.coroutine
1476 def register_to_publish(self):
1477 ''' Register to DTS for publishing cm-state opdata '''
1478
1479 xpath = "D,/rw-conman:cm-state/rw-conman:cm-nsr"
1480 self._log.debug("Registering to publish cm-state @ %s", xpath)
1481 hdl = rift.tasklets.DTS.RegistrationHandler()
1482 with self._dts.group_create() as group:
1483 self.dts_pub_hdl = group.register(xpath=xpath,
1484 handler=hdl,
1485 flags=rwdts.Flag.PUBLISHER | rwdts.Flag.NO_PREP_READ)
1486
1487 @property
1488 def nsr_xpath(self):
1489 return "D,/nsr:ns-instance-opdata/nsr:nsr"
1490
1491 @asyncio.coroutine
1492 def register_for_nsr(self):
1493 """ Register for NSR changes """
1494
1495 @asyncio.coroutine
1496 def on_prepare(xact_info, query_action, ks_path, msg):
1497 """ This NSR is created """
1498 self._log.debug("Received NSR instantiate on_prepare (%s:%s:%s)",
1499 query_action,
1500 ks_path,
1501 msg)
1502
1503 if (query_action == rwdts.QueryAction.UPDATE or
1504 query_action == rwdts.QueryAction.CREATE):
1505 msg_dict = msg.as_dict()
1506 # Update Each NSR/VNFR state)
1507 if ('operational_status' in msg_dict and
1508 msg_dict['operational_status'] == 'running'):
1509 # Add to the task list
1510 self._parent.add_to_pending_tasks({'nsrid' : msg_dict['ns_instance_config_ref'], 'retries' : 5})
1511 elif query_action == rwdts.QueryAction.DELETE:
1512 nsr_id = msg.ns_instance_config_ref
1513 asyncio.ensure_future(self._parent.terminate_NSR(nsr_id), loop=self._loop)
1514 else:
1515 raise NotImplementedError(
1516 "%s action on cm-state not supported",
1517 query_action)
1518
1519 xact_info.respond_xpath(rwdts.XactRspCode.ACK)
1520
1521 try:
1522 handler = rift.tasklets.DTS.RegistrationHandler(on_prepare=on_prepare)
1523 self.dts_reg_hdl = yield from self._dts.register(self.nsr_xpath,
1524 flags=rwdts.Flag.SUBSCRIBER | rwdts.Flag.DELTA_READY,
1525 handler=handler)
1526 except Exception as e:
1527 self._log.error("Failed to register for NSR changes as %s", str(e))
1528
1529