26af0564874b6974a16fb5072bcf59ce4f4ab421
[osm/SO.git] / rwcm / plugins / rwconman / rift / tasklets / rwconmantasklet / rwconman_config.py
1
2 #
3 # Copyright 2016 RIFT.IO Inc
4 #
5 # Licensed under the Apache License, Version 2.0 (the "License");
6 # you may not use this file except in compliance with the License.
7 # You may obtain a copy of the License at
8 #
9 # http://www.apache.org/licenses/LICENSE-2.0
10 #
11 # Unless required by applicable law or agreed to in writing, software
12 # distributed under the License is distributed on an "AS IS" BASIS,
13 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 # See the License for the specific language governing permissions and
15 # limitations under the License.
16 #
17
18 import asyncio
19 import os
20 import stat
21 import subprocess
22 import sys
23 import tempfile
24 import yaml
25
26 from gi.repository import (
27 RwDts as rwdts,
28 RwConmanYang as conmanY,
29 ProtobufC,
30 )
31
32 import rift.tasklets
33
34 from . import rwconman_conagent as conagent
35 from . import RiftCM_rpc
36 from . import riftcm_config_plugin
37
# asyncio.ensure_future() only exists from Python 3.4.4 onwards; on older
# interpreters alias the legacy asyncio.async() under the new name.
# The attribute is fetched with getattr() because ``async`` is a reserved
# keyword from Python 3.7, where spelling it ``asyncio.async`` would stop the
# whole module from parsing even though this branch is never taken there.
if sys.version_info < (3, 4, 4):
    asyncio.ensure_future = getattr(asyncio, 'async')
40
def get_vnf_unique_name(nsr_name, vnfr_short_name, member_vnf_index):
    '''Return the dotted key "<nsr>.<vnfr short name>.<member index>".'''
    parts = (nsr_name, vnfr_short_name, member_vnf_index)
    return ".".join(str(part) for part in parts)
43
class ConmanConfigError(Exception):
    '''Base exception type for configuration-manager errors in this module.'''
46
47
class InitialConfigError(ConmanConfigError):
    '''Raised when an initial-config script exits with a non-zero status.'''
50
51
def log_this_vnf(vnf_cfg):
    '''Build a compact identifier for a VNF, for use in log messages.

    The NSR name, VNFR name and member index (those present in ``vnf_cfg``)
    are each followed by "/"; the management IP, when present, is appended
    in parentheses. Missing keys are simply skipped.
    '''
    path_keys = ('nsr_name', 'vnfr_name', 'member_vnf_index')
    pieces = ["{}/".format(vnf_cfg[key]) for key in path_keys if key in vnf_cfg]
    if 'mgmt_ip_address' in vnf_cfg:
        pieces.append("({})".format(vnf_cfg['mgmt_ip_address']))
    return "".join(pieces)
62
class PretendNsm(object):
    '''NSM-like facade over the records held by the configuration manager.

    It is handed to RiftCM_rpc.RiftCMRPCHandler by ConfigManagerConfig and
    answers NSR / VNFR / NSD / plugin lookups from the parent's state.
    '''

    def __init__(self, dts, log, loop, parent):
        self._dts = dts
        self._log = log
        self._loop = loop
        self._parent = parent
        self._nsrs = {}
        # Same dict object as the parent's, so later additions are visible.
        self._nsr_dict = parent._nsr_dict
        self._config_agent_plugins = []
        # NSD messages already fetched, keyed by NSR id.
        self._nsd_msg = {}

    @property
    def nsrs(self):
        '''Map of NSR id -> agent NSR, rebuilt on every access.

        Expensive; prefer get_nsr() when the id is known.
        '''
        self._nsrs = {
            nsr_id: nsr_obj.agent_nsr
            for nsr_id, nsr_obj in self._nsr_dict.items()
        }
        return self._nsrs

    def get_nsr(self, nsr_id):
        '''Return the ``_nsr`` attribute of the NSR object, or None if unknown.'''
        if nsr_id not in self._nsr_dict:
            return None
        return self._nsr_dict[nsr_id]._nsr

    def get_vnfr_msg(self, vnfr_id, nsr_id=None):
        '''Return the VNFR message for ``vnfr_id``, or None when not found.

        With a truthy ``nsr_id`` only that NSR is searched; otherwise every
        known NSR is searched and the first match wins.
        '''
        self._log.debug("get_vnfr_msg(vnfr=%s, nsr=%s)",
                        vnfr_id, nsr_id)
        if nsr_id:
            if nsr_id in self._nsr_dict:
                candidates = [self._nsr_dict[nsr_id]]
            else:
                candidates = []
        else:
            candidates = self._nsr_dict.values()

        for nsr_obj in candidates:
            if vnfr_id in nsr_obj._vnfr_dict:
                vnf_cfg = nsr_obj._vnfr_dict[vnfr_id]['vnf_cfg']
                return vnf_cfg['agent_vnfr'].vnfr_msg
        return None

    @asyncio.coroutine
    def get_nsd(self, nsr_id):
        '''Return the NSD of the NSR config, fetching it once per NSR id.'''
        if nsr_id not in self._nsd_msg:
            nsr_config = yield from self._parent.cmdts_obj.get_nsr_config(nsr_id)
            self._nsd_msg[nsr_id] = nsr_config.nsd
        return self._nsd_msg[nsr_id]

    @property
    def config_agent_plugins(self):
        '''Fresh list of the config agent manager's plugin instances.'''
        plugin_map = self._parent._config_agent_mgr._plugin_instances
        self._config_agent_plugins = list(plugin_map.values())
        return self._config_agent_plugins
123
class ConfigManagerConfig(object):
    '''Drives configuration of network services (NSRs) and their VNFs.

    Holds one ConfigManagerNSR per known NSR id, the cm-state operational
    data, a retry queue of NSR ids still to be configured, and the DTS /
    config-agent / RPC handlers that register() brings up.
    '''

    def __init__(self, dts, log, loop, parent):
        self._dts = dts
        self._log = log
        self._loop = loop
        self._parent = parent
        self._nsr_dict = {}
        self.pending_cfg = {}
        self.terminate_cfg = {}
        # Used for NSR id fetch retries (mainly exercised in the restart case)
        self.pending_tasks = []
        self._config_xpath = "C,/cm-config"
        self._opdata_xpath = "D,/rw-conman:cm-state"

        self.cm_config = conmanY.SoConfig()
        # RO specific configuration
        self.ro_config = {}
        for key in self.cm_config.ro_endpoint.fields:
            self.ro_config[key] = None

        # Initialize cm-state
        self.cm_state = {}
        self.cm_state['cm_nsr'] = []
        self.cm_state['states'] = "Initialized"

        # Initialize objects to register
        self.cmdts_obj = ConfigManagerDTS(self._log, self._loop, self, self._dts)
        self._config_agent_mgr = conagent.RiftCMConfigAgent(
            self._dts,
            self._log,
            self._loop,
            self,
        )
        self.reg_handles = [
            self.cmdts_obj,
            self._config_agent_mgr,
            RiftCM_rpc.RiftCMRPCHandler(self._dts, self._log, self._loop,
                                        PretendNsm(
                                            self._dts, self._log, self._loop, self)),
        ]

    def is_nsr_valid(self, nsr_id):
        '''Return True when an NSR object exists for ``nsr_id``.'''
        return nsr_id in self._nsr_dict

    def add_to_pending_tasks(self, task):
        '''Queue ``task`` (a dict with 'nsrid' and 'retries') for configuration.

        A task whose NSR id is already queued is ignored. The first task put
        on an empty queue also starts the pending loop on the event loop.
        '''
        if any(queued['nsrid'] == task['nsrid'] for queued in self.pending_tasks):
            # Already queued
            return
        try:
            self.pending_tasks.append(task)
            self._log.debug("add_to_pending_tasks (nsrid:%s)",
                            task['nsrid'])
            if len(self.pending_tasks) == 1:
                self._loop.create_task(self.ConfigManagerConfig_pending_loop())
                # TBD - change to info level
                self._log.debug("Started pending_loop!")
        except Exception as e:
            self._log.error("Failed adding to pending tasks (%s)", str(e))

    def del_from_pending_tasks(self, task):
        '''Remove ``task`` from the queue; a missing task is logged, not raised.'''
        try:
            self.pending_tasks.remove(task)
        except ValueError as e:
            self._log.error("Failed removing from pending tasks (%s)", str(e))

    @asyncio.coroutine
    def ConfigManagerConfig_pending_loop(self):
        '''Work through the pending task queue until it is empty.

        The queue is ordered by events: the head task is attempted every
        two seconds, and a failed task goes to the back of the queue while
        it still has retries left.
        '''
        loop_sleep = 2
        while True:
            yield from asyncio.sleep(loop_sleep, loop=self._loop)
            if not self.pending_tasks:
                self._log.debug("Stopped pending_loop!")
                break

            self._log.debug("self.pending_tasks len=%s", len(self.pending_tasks))
            task = self.pending_tasks[0]
            if 'nsrid' not in task:
                # Such a task can never be processed; leaving it at the head
                # of the queue would make this loop spin on it forever.
                self._log.error("Dropping pending task without nsrid: %s", task)
                self.pending_tasks.remove(task)
                continue

            nsrid = task['nsrid']
            self._log.debug("Will execute pending task for NSR id(%s)", nsrid)
            done = False
            still_queued = True
            try:
                # Try to configure this NSR
                task['retries'] -= 1
                done = yield from self.config_NSR(nsrid)
                self._log.info("self.config_NSR status=%s", done)
            except Exception as e:
                self._log.error("Failed(%s) configuring NSR(%s)," \
                                "retries remained:%d!",
                                str(e), nsrid, task['retries'])
            finally:
                # terminate_NSR() may have dequeued the task while we were
                # waiting on config_NSR(); an unconditional remove() would
                # raise from this finally clause and kill the loop.
                still_queued = task in self.pending_tasks
                if still_queued:
                    self.pending_tasks.remove(task)

            if done:
                self._log.debug("Finished pending task NSR id(%s):", nsrid)
                continue

            self._log.error("Failed configuring NSR(%s), retries remained:%d!",
                            nsrid, task['retries'])

            # Failed: append at the end to be retried later, but only if
            # retries remain and the task was not dequeued in the meantime.
            # "> 0" (rather than truthiness) stops a negative count from
            # retrying forever.
            if still_queued and task['retries'] > 0:
                self.pending_tasks.append(task)

    @asyncio.coroutine
    def register(self):
        '''Register the cm-state opdata handler, then every reg_handles entry.'''
        yield from self.register_cm_state_opdata()

        # Initialize all handles that needs to be registered
        for reg in self.reg_handles:
            yield from reg.register()

    @asyncio.coroutine
    def register_cm_state_opdata(self):
        '''Register as publisher for the cm-state operational data xpath.

        Registration failures are logged and not propagated.
        '''

        @asyncio.coroutine
        def on_prepare(xact_info, action, ks_path, msg):
            self._log.debug("Received cm-state: msg=%s, action=%s", msg, action)

            if action == rwdts.QueryAction.READ:
                show_output = conmanY.CmOpdata()
                show_output.from_dict(self.cm_state)
                self._log.debug("Responding to SHOW cm-state: %s", self.cm_state)
                xact_info.respond_xpath(rwdts.XactRspCode.ACK,
                                        xpath=self._opdata_xpath,
                                        msg=show_output)
            else:
                xact_info.respond_xpath(rwdts.XactRspCode.ACK)

        self._log.info("Registering for cm-opdata xpath: %s",
                       self._opdata_xpath)

        try:
            handler = rift.tasklets.DTS.RegistrationHandler(on_prepare=on_prepare)
            yield from self._dts.register(xpath=self._opdata_xpath,
                                          handler=handler,
                                          flags=rwdts.Flag.PUBLISHER)
            self._log.info("Successfully registered for opdata(%s)", self._opdata_xpath)
        except Exception as e:
            self._log.error("Failed to register for opdata as (%s)", e)

    @asyncio.coroutine
    def process_nsd_vnf_configuration(self, nsr_obj, vnfr):
        '''Record how one VNF is to be configured and where its files live.

        Called once per VNFR. Fills ``vnfr['vnf_cfg']`` with the config
        method and connection options, writes the config template (if the
        VNF carries one) to disk and appends an entry to
        ``nsr_obj.nsr_cfg_config_attributes_dict`` under the VNF's priority.
        VNFs without a netconf/juju/script section are marked READY_NO_CFG.

        Raises whatever writing the template file raises.
        '''

        def get_config_method(vnf_config):
            # First supported method present in the VNF configuration wins.
            for method in ('netconf', 'juju', 'script'):
                if method in vnf_config:
                    return method
            return None

        def get_cfg_file_extension(method, configuration_options):
            if method == "script":
                script_ext = {"bash": "sh", "expect": "exp"}
                return script_ext[configuration_options['script_type']]
            return {"netconf": "xml", "juju": "yml"}.get(method, "cfg")

        # nsr_cfg_config_attributes_dict maps a priority number to the list
        # of VNF entries at that priority, e.g. (dumped as YAML):
        #
        #   1:                            <== This is priority
        #   - name: trafsink_vnfd
        #     member_vnf_index: 2
        #     configuration_delay: 120
        #     configuration_type: netconf
        #     configuration_options:
        #       username: admin
        #       password: admin
        #       port: 2022
        #       target: running
        #   2:
        #   - name: trafgen_vnfd
        #     member_vnf_index: 1
        #     configuration_delay: 0
        #     configuration_type: netconf
        #     configuration_options:
        #       username: admin
        #       password: admin
        #       port: 2022
        #       target: running

        # Save some parameters needed as short cuts in flat structure
        vnf_cfg = vnfr['vnf_cfg']
        # Prepare unique name for this VNF
        vnf_cfg['vnf_unique_name'] = get_vnf_unique_name(
            vnf_cfg['nsr_name'], vnfr['short_name'], vnfr['member_vnf_index_ref'])

        nsr_obj.cfg_path_prefix = '{}/{}_{}'.format(
            nsr_obj.this_nsr_dir, vnfr['short_name'], vnfr['member_vnf_index_ref'])

        # Get vnf_configuration from vnfr
        vnf_config = vnfr['vnf_configuration']

        self._log.debug("vnf_configuration = %s", vnf_config)

        # 'config_attributes' is optional; read it once so the priority and
        # the delay lookups are guarded the same way.
        config_attributes = vnf_config.get('config_attributes', {})
        cfg_priority_order = config_attributes.get('config_priority', 0)

        # Make sure a list exists for this priority level
        nsr_obj.nsr_cfg_config_attributes_dict.setdefault(cfg_priority_order, [])

        method = get_config_method(vnf_config)
        if method is not None:
            # Create all sub dictionaries first
            config_priority = {
                'id': vnfr['id'],
                'name': vnfr['short_name'],
                'member_vnf_index': vnfr['member_vnf_index_ref'],
            }

            if 'config_delay' in config_attributes:
                config_priority['configuration_delay'] = config_attributes['config_delay']
                vnf_cfg['config_delay'] = config_priority['configuration_delay']

            configuration_options = {}
            self._log.debug("config method=%s", method)
            config_priority['configuration_type'] = method
            vnf_cfg['config_method'] = method

            # Set config agent based on method
            self._config_agent_mgr.set_config_agent(
                nsr_obj.agent_nsr, vnf_cfg['agent_vnfr'], method)

            method_config = vnf_config[method]
            for cfg_opt in ('port', 'target', 'script_type',
                            'ip_address', 'user', 'secret'):
                if cfg_opt in method_config:
                    configuration_options[cfg_opt] = method_config[cfg_opt]
                    vnf_cfg[cfg_opt] = configuration_options[cfg_opt]

            # 'config_access' is optional as well
            config_access = vnf_config.get('config_access', {})
            for cfg_opt in ('mgmt_ip_address', 'username', 'password'):
                if cfg_opt in config_access:
                    configuration_options[cfg_opt] = config_access[cfg_opt]
                    vnf_cfg[cfg_opt] = configuration_options[cfg_opt]

            # Add to the cp_dict
            vnf_cp_dict = nsr_obj._cp_dict[vnfr['member_vnf_index_ref']]
            vnf_cp_dict['rw_mgmt_ip'] = vnf_cfg['mgmt_ip_address']
            vnf_cp_dict['rw_username'] = vnf_cfg['username']
            vnf_cp_dict['rw_password'] = vnf_cfg['password']

            # TBD - see if the config template can be included in the
            # "config_attributes" file; not needed so far.
            vnf_cfg['juju_script'] = os.path.join(self._parent.cfg_dir, 'juju_if.py')

            if 'config_template' in vnf_config:
                vnf_cfg['cfg_template'] = '{}_{}_template.cfg'.format(
                    nsr_obj.cfg_path_prefix, config_priority['configuration_type'])
                vnf_cfg['cfg_file'] = '{}.{}'.format(
                    nsr_obj.cfg_path_prefix,
                    get_cfg_file_extension(method, configuration_options))
                vnf_cfg['xlate_script'] = os.path.join(self._parent.cfg_dir, 'xlate_cfg.py')
                try:
                    # Now write this template into file
                    with open(vnf_cfg['cfg_template'], "w") as cf:
                        cf.write(vnf_config['config_template'])
                except Exception as e:
                    self._log.error("Processing NSD, failed to generate configuration template : %s (Error : %s)",
                                    vnf_config['config_template'], str(e))
                    raise

            self._log.debug("VNF endpoint so far: %s", vnf_cfg)

            # Populate filled up dictionary
            config_priority['configuration_options'] = configuration_options
            nsr_obj.nsr_cfg_config_attributes_dict[cfg_priority_order].append(config_priority)
            nsr_obj.num_vnfs_to_cfg += 1
            # The VNFR can be looked up by unique name and by id
            nsr_obj._vnfr_dict[vnf_cfg['vnf_unique_name']] = vnfr
            nsr_obj._vnfr_dict[vnfr['id']] = vnfr

            self._log.debug("VNF:(%s) config_attributes = %s",
                            log_this_vnf(vnfr['vnf_cfg']),
                            nsr_obj.nsr_cfg_config_attributes_dict)
        else:
            self._log.info("VNF:(%s) is not to be configured by Configuration Manager!",
                           log_this_vnf(vnfr['vnf_cfg']))
            yield from nsr_obj.update_vnf_cm_state(vnfr, conmanY.RecordState.READY_NO_CFG)

        # Update the cm-state
        nsr_obj.populate_vm_state_from_vnf_cfg()

    @asyncio.coroutine
    def config_NSR(self, id):
        '''Collect the configuration of NSR ``id`` and schedule it.

        Returns True when the NSR has been processed (now or on an earlier
        call) and False when the NSR record is not available or not yet
        "running", so that the caller can retry later. Processing errors set
        the NSR state to CFG_PROCESS_FAILED and are re-raised.
        '''

        def my_yaml_dump(config_attributes_dict, yf):
            yaml_dict = dict(sorted(config_attributes_dict.items()))
            yf.write(yaml.dump(yaml_dict, default_flow_style=False))

        nsr_dict = self._nsr_dict
        self._log.info("Configure NSR, id = %s", id)

        # TBD: invoke config agent plugins 'notify_create_nsr' and
        # 'notify_nsr_active' here.

        try:
            if id not in nsr_dict:
                nsr_obj = ConfigManagerNSR(self._log, self._loop, self, id)
                nsr_dict[id] = nsr_obj
            else:
                self._log.info("NSR(%s) is already initialized!", id)
                nsr_obj = nsr_dict[id]
        except Exception as e:
            self._log.error("Failed creating NSR object for (%s) as (%s)", id, str(e))
            raise

        # Try to configure this NSR only if not already processed
        if nsr_obj.cm_nsr['state'] != nsr_obj.state_to_string(conmanY.RecordState.INIT):
            self._log.debug("NSR(%s) is already processed, state=%s",
                            nsr_obj.nsr_name, nsr_obj.cm_nsr['state'])
            yield from nsr_obj.publish_cm_state()
            return True

        cmdts_obj = self.cmdts_obj
        try:
            # Fetch NSR
            nsr = yield from cmdts_obj.get_nsr(id)
            self._log.debug("Full NSR : %s", nsr)
            if nsr is None:
                # Check before subscripting: a missing record is treated
                # like a not-yet-running one so the state stays INIT and the
                # pending loop can retry.
                self._log.info("NSR(%s) is not available yet!", id)
                return False
            if nsr['operational_status'] != "running":
                self._log.info("NSR(%s) is not ready yet!", nsr['nsd_name_ref'])
                return False
            # Read back by ConfigManagerNSR.set_config_dir() through caller._nsr
            self._nsr = nsr

            # Create Agent NSR class
            nsr_config = yield from cmdts_obj.get_nsr_config(id)
            self._log.debug("NSR {} config: {}".format(id, nsr_config))
            nsr_obj.agent_nsr = riftcm_config_plugin.RiftCMnsr(nsr, nsr_config)

            try:
                yield from nsr_obj.update_ns_cm_state(conmanY.RecordState.RECEIVED)

                # Parse NSR
                nsr_obj.set_nsr_name(nsr['name_ref'])
                nsr_dir = os.path.join(self._parent.cfg_dir, nsr_obj.nsr_name)
                self._log.info("Checking NS config directory: %s", nsr_dir)
                os.makedirs(nsr_dir, exist_ok=True)

                nsr_obj.set_config_dir(self)

                for const_vnfr in nsr['constituent_vnfr_ref']:
                    self._log.debug("Fetching VNFR (%s)", const_vnfr['vnfr_id'])
                    vnfr_msg = yield from cmdts_obj.get_vnfr(const_vnfr['vnfr_id'])
                    if vnfr_msg:
                        vnfr = vnfr_msg.as_dict()
                        self._log.info("create VNF:{}/{}".format(nsr_obj.nsr_name, vnfr['short_name']))
                        agent_vnfr = yield from nsr_obj.add_vnfr(vnfr, vnfr_msg)

                        # Preserve order, self.process_nsd_vnf_configuration()
                        # sets up the config agent based on the method
                        yield from self.process_nsd_vnf_configuration(nsr_obj, vnfr)
                        yield from self._config_agent_mgr.invoke_config_agent_plugins(
                            'notify_create_vnfr',
                            nsr_obj.agent_nsr,
                            agent_vnfr)

                # TBD: invoke 'apply_initial_config' / 'notify_terminate_vnf'
                # on the config agent plugins once VNFs are active.

            except Exception as e:
                self._log.error("Failed processing NSR (%s) as (%s)", nsr_obj.nsr_name, str(e))
                self._log.exception(e)
                yield from nsr_obj.update_ns_cm_state(conmanY.RecordState.CFG_PROCESS_FAILED)
                raise

            try:
                # Generate config_config_attributes.yaml (For debug reference)
                with open(nsr_obj.config_attributes_file, "w") as yf:
                    my_yaml_dump(nsr_obj.nsr_cfg_config_attributes_dict, yf)
            except Exception as e:
                self._log.error("NS:(%s) failed to write config attributes file as (%s)", nsr_obj.nsr_name, str(e))

            try:
                # Generate nsr_xlate_dict.yaml (For debug reference)
                with open(nsr_obj.xlate_dict_file, "w") as yf:
                    yf.write(yaml.dump(nsr_obj._cp_dict, default_flow_style=False))
            except Exception as e:
                self._log.error("NS:(%s) failed to write nsr xlate tags file as (%s)", nsr_obj.nsr_name, str(e))

            self._log.debug("Starting to configure each VNF")

            # Check if this NS has input parameters
            self._log.info("Checking NS configuration order: %s", nsr_obj.config_attributes_file)

            if os.path.exists(nsr_obj.config_attributes_file):
                # Apply configuration in specified order
                try:
                    self._log.info("Using Dynamic configuration input parametrs for NS: %s", nsr_obj.nsr_name)

                    # TBD: NS level configuration delay is not applied here.

                    # Walk the priority levels in ascending numeric order; a
                    # plain dict does not guarantee that order by itself.
                    attributes_by_priority = nsr_obj.nsr_cfg_config_attributes_dict
                    for priority in sorted(attributes_by_priority):
                        # Iterate through each vnfr at this priority level
                        for vnf_config_attributes_dict in attributes_by_priority[priority]:
                            # Make up vnf_unique_name with vnfd name and member index
                            vnf_unique_name = get_vnf_unique_name(
                                nsr_obj.nsr_name,
                                vnf_config_attributes_dict['name'],
                                str(vnf_config_attributes_dict['member_vnf_index']),
                            )
                            self._log.info("NS (%s) : VNF (%s) - Processing configuration attributes",
                                           nsr_obj.nsr_name, vnf_unique_name)

                            # Find vnfr for this vnf_unique_name
                            if vnf_unique_name not in nsr_obj._vnfr_dict:
                                self._log.error("NS (%s) - Can not find VNF to be configured: %s", nsr_obj.nsr_name, vnf_unique_name)
                            else:
                                # Save this unique VNF's config input parameters
                                nsr_obj.vnf_config_attributes_dict[vnf_unique_name] = vnf_config_attributes_dict
                                nsr_obj.ConfigVNF(nsr_obj._vnfr_dict[vnf_unique_name])

                    # Now add the entire NS to the pending config list.
                    self._log.info("Scheduling NSR:{} configuration".format(nsr_obj.nsr_name))
                    self._parent.add_to_pending(nsr_obj)
                    self._parent.add_nsr_obj(nsr_obj)

                except Exception as e:
                    self._log.error("Failed processing input parameters for NS (%s) as %s", nsr_obj.nsr_name, str(e))
                    raise
            else:
                self._log.error("No configuration input parameters for NSR (%s)", nsr_obj.nsr_name)

        except Exception as e:
            self._log.error("Failed to configure NS (%s) as (%s)", nsr_obj.nsr_name, str(e))
            yield from nsr_obj.update_ns_cm_state(conmanY.RecordState.CFG_PROCESS_FAILED)
            raise

        return True

    @asyncio.coroutine
    def terminate_NSR(self, id):
        '''Forget NSR ``id``: dequeue it, notify the agents, delete its cm-state.'''
        nsr_dict = self._nsr_dict
        if id not in nsr_dict:
            self._log.error("NSR(%s) does not exist!", id)
            return

        # Remove this NSR if we have it on pending task list. Iterate over a
        # copy because del_from_pending_tasks() mutates the list.
        for task in list(self.pending_tasks):
            if task['nsrid'] == id:
                self.del_from_pending_tasks(task)

        # Remove this object from global list
        nsr_obj = nsr_dict.pop(id, None)

        # Remove this NS cm-state from global status list
        self.cm_state['cm_nsr'].remove(nsr_obj.cm_nsr)

        # Also remove any scheduled configuration event
        for nsr_obj_p in self._parent.pending_cfg:
            if nsr_obj_p == nsr_obj:
                assert id == nsr_obj_p._nsr_id
                # Mark this as being deleted so we do not try to configure it
                # if we are in cfg_delay (it would wake up and continue to
                # process otherwise)
                nsr_obj_p.being_deleted = True
                self._log.info("Removed scheduled configuration for NSR(%s)", nsr_obj.nsr_name)

        self._parent.remove_nsr_obj(id)

        # Call Config Agent to clean up for each VNF. agent_nsr is still None
        # when the NSR was queued but never seen in "running" state.
        if nsr_obj.agent_nsr is not None:
            for agent_vnfr in nsr_obj.agent_nsr.vnfrs:
                yield from self._config_agent_mgr.invoke_config_agent_plugins(
                    'notify_terminate_vnfr',
                    nsr_obj.agent_nsr,
                    agent_vnfr)

        # publish delete cm-state (cm-nsr)
        yield from nsr_obj.delete_cm_nsr()

        # TBD: invoke config agent plugins 'notify_terminate_ns' here.

        self._log.info("NSR(%s/%s) is deleted", nsr_obj.nsr_name, id)

    @asyncio.coroutine
    def process_initial_config(self, nsr_obj, conf, script, vnfr_name=None):
        '''Apply the initial-config-primitives specified in NSD or VNFD

        Writes the primitive's parameters plus per-VNFR data to a temporary
        YAML file and runs ``script`` with that file as its only argument.
        The temporary file is removed whether or not the script succeeds.

        Raises InitialConfigError when the script exits non-zero.
        '''

        def get_input_file(parameters):
            inp = {}

            # Add NSR name to file
            inp['nsr_name'] = nsr_obj.nsr_name

            # Add VNFR name if available
            if vnfr_name:
                inp['vnfr_name'] = vnfr_name

            # TODO (pjoseph): Add config agents, we need to identify which all
            # config agents are required from this NS and provide only those
            inp['config-agent'] = {}

            # Add parameters for initial config
            inp['parameter'] = {}
            for parameter in parameters:
                try:
                    inp['parameter'][parameter['name']] = parameter['value']
                except KeyError as e:
                    if vnfr_name:
                        self._log.info("VNFR {} initial config parameter {} with no value: {}".
                                       format(vnfr_name, parameter, e))
                    else:
                        self._log.info("NSR {} initial config parameter {} with no value: {}".
                                       format(nsr_obj.nsr_name, parameter, e))

            # Add vnfrs specific data
            inp['vnfr'] = {}
            for vnfr in nsr_obj.vnfrs:
                v = {}

                v['name'] = vnfr['name']
                v['mgmt_ip_address'] = vnfr['vnf_cfg']['mgmt_ip_address']
                v['mgmt_port'] = vnfr['vnf_cfg']['port']

                if 'dashboard_url' in vnfr:
                    v['dashboard_url'] = vnfr['dashboard_url']

                if 'connection_point' in vnfr:
                    v['connection_point'] = []
                    for cp in vnfr['connection_point']:
                        v['connection_point'].append(
                            {
                                'name': cp['name'],
                                'ip_address': cp['ip_address'],
                            }
                        )

                vdu_data = []
                for vdu in vnfr['vdur']:
                    d = {}
                    for k in ['name', 'management_ip', 'vm_management_ip', 'id']:
                        if k in vdu:
                            d[k] = vdu[k]
                    vdu_data.append(d)
                # NOTE(review): 'vdur' ends up as a list holding one list of
                # VDU dicts. Kept as is because the scripts consuming this
                # file are outside this module - confirm before flattening.
                v['vdur'] = [vdu_data]

                inp['vnfr'][vnfr['member_vnf_index_ref']] = v

            self._log.debug("Input data for {}: {}".
                            format((vnfr_name if vnfr_name else nsr_obj.nsr_name),
                                   inp))

            # Convert to YAML string
            yaml_string = yaml.dump(inp, default_flow_style=False)

            # Write the inputs as yaml file; delete=False because the script
            # reads it after this handle is closed.
            with tempfile.NamedTemporaryFile(delete=False) as tmp_file:
                tmp_file.write(yaml_string.encode("UTF-8"))
                self._log.debug("Input file created for {}: {}".
                                format((vnfr_name if vnfr_name \
                                        else nsr_obj.nsr_name),
                                       tmp_file.name))

            return tmp_file.name

        parameters = []
        try:
            parameters = conf['parameter']
        except Exception as e:
            self._log.debug("Parameter conf: {}, e: {}".
                            format(conf, e))

        inp_file = get_input_file(parameters)

        try:
            # NOTE(review): the command is a shell string built from the
            # descriptor-supplied script path; a path with spaces or shell
            # metacharacters is interpreted by the shell. Left unchanged as
            # callers outside this file may rely on it.
            cmd = "{0} {1}".format(script, inp_file)
            self._log.debug("Running the CMD: {}".format(cmd))

            process = yield from asyncio.create_subprocess_shell(cmd,
                                                                 loop=self._loop,
                                                                 stdout=subprocess.PIPE,
                                                                 stderr=subprocess.PIPE)
            stdout, stderr = yield from process.communicate()
            rc = yield from process.wait()

            if rc:
                msg = "NSR/VNFR {} initial config using {} failed with {}: {}". \
                      format(vnfr_name if vnfr_name else nsr_obj.nsr_name,
                             script, rc, stderr)
                self._log.error(msg)
                raise InitialConfigError(msg)
        finally:
            # Clean up on the failure path as well, otherwise every failed
            # script run leaves its input file behind.
            try:
                os.remove(inp_file)
            except Exception as e:
                self._log.debug("Error removing input file {}: {}".
                                format(inp_file, e))

    def get_script_file(self, script_name, d_name, d_id, d_type):
        '''Return the path of ``script_name``, made executable for the owner.

        A name starting with "/" is used as is. Otherwise the script is
        looked up under
        $RIFT_ARTIFACTS/launchpad/packages/<d_type>/<d_id>/<d_name>/scripts
        and, when not found there, taken from $RIFT_INSTALL/usr/bin.
        '''
        if script_name[0] == '/':
            # The script has full path, use as is
            script = script_name
        else:
            script = os.path.join(os.environ['RIFT_ARTIFACTS'],
                                  'launchpad/packages',
                                  d_type,
                                  d_id,
                                  d_name,
                                  'scripts',
                                  script_name)
            self._log.debug("Checking for script at %s", script)
            if not os.path.exists(script):
                self._log.warning("Did not find script %s", script)
                script = os.path.join(os.environ['RIFT_INSTALL'],
                                      'usr/bin',
                                      script_name)

        # Seen cases in jenkins, where the script execution fails
        # with permission denied. Setting the permission on script
        # to make sure it has execute permission
        perm = os.stat(script).st_mode
        if not (perm & stat.S_IXUSR):
            self._log.warning("NSR/VNFR {} initial config script {} " \
                              "without execute permission: {}".
                              format(d_name, script, perm))
            os.chmod(script, perm | stat.S_IXUSR)
        return script

    @asyncio.coroutine
    def process_ns_initial_config(self, nsr_obj):
        '''Apply the initial-config-primitives specified in NSD'''

        nsr = yield from self.cmdts_obj.get_nsr(nsr_obj.nsr_id)
        # Test for None before the membership test, which would raise on it.
        if nsr is None or 'initial_config_primitive' not in nsr:
            return

        nsd = yield from self.cmdts_obj.get_nsd(nsr_obj.nsr_id)
        for conf in nsr['initial_config_primitive']:
            self._log.debug("NSR {} initial config: {}".
                            format(nsr_obj.nsr_name, conf))
            script = self.get_script_file(conf['user_defined_script'],
                                          nsd.name,
                                          nsd.id,
                                          'nsd')

            yield from self.process_initial_config(nsr_obj, conf, script)

    @asyncio.coroutine
    def process_vnf_initial_config(self, nsr_obj, vnfr):
        '''Apply the initial-config-primitives specified in VNFD'''

        vnfr_name = vnfr.name

        vnfd = vnfr.vnfd
        vnf_cfg = vnfd.vnf_configuration

        for conf in vnf_cfg.initial_config_primitive:
            self._log.debug("VNFR {} initial config: {}".
                            format(vnfr_name, conf))

            # Primitives without a user defined script are skipped
            if not conf.user_defined_script:
                self._log.debug("VNFR {} did not fine user defined script: {}".
                                format(vnfr_name, conf))
                continue

            script = self.get_script_file(conf.user_defined_script,
                                          vnfd.name,
                                          vnfd.id,
                                          'vnfd')

            yield from self.process_initial_config(nsr_obj,
                                                   conf.as_dict(),
                                                   script,
                                                   vnfr_name=vnfr_name)
863
864
865 class ConfigManagerNSR(object):
def __init__(self, log, loop, parent, id):
    '''Set up the bookkeeping for one NSR and add it to the parent's cm-state.

    ``parent`` must provide ``cmdts_obj`` and ``cm_state['cm_nsr']``; the
    new NSR starts in the INIT state with the placeholder name 'Not Set'.
    '''
    self._log = log
    self._loop = loop
    self._parent = parent
    self._nsr_id = id
    self._rwcal = None
    self._log.info("Instantiated NSR entry for id=%s", id)

    # Per-NSR containers, all empty until VNFRs are added
    self._vnfr_dict = {}
    self._cp_dict = {}
    self._vnfr_list = []
    self.vnf_cfg_list = []
    self.nsr_cfg_config_attributes_dict = {}
    self.vnf_config_attributes_dict = {}
    self.num_vnfs_to_cfg = 0
    self.this_nsr_dir = None
    self.being_deleted = False
    self.dts_obj = self._parent.cmdts_obj

    # Initialize cm-state for this NS
    self.cm_nsr = {
        'cm_vnfr': [],
        'id': id,
        'state': self.state_to_string(conmanY.RecordState.INIT),
        'state_details': None,
    }
    self.set_nsr_name('Not Set')

    # Add this NSR cm-state object to global cm-state
    parent.cm_state['cm_nsr'].append(self.cm_nsr)

    # Place holder for the agent NSR object, assigned by the owner later
    self.agent_nsr = None
898
@property
def nsr_opdata_xpath(self):
    ''' Returns full xpath for this NSR cm-state opdata '''
    xpath_template = (
        "D,/rw-conman:cm-state"
        "/rw-conman:cm-nsr[rw-conman:id='{}']"
    )
    return xpath_template.format(self._nsr_id)
906
@property
def vnfrs(self):
    ''' Returns the list object holding this NSR's VNFRs (not a copy) '''
    return self._vnfr_list
910
@property
def parent(self):
    ''' Returns the parent object given at construction time '''
    return self._parent
914
@property
def nsr_id(self):
    ''' Returns the id of this NSR '''
    return self._nsr_id
918
@asyncio.coroutine
def publish_cm_state(self):
    ''' Publishes the current cm_nsr dict of this NSR at its opdata xpath '''
    opdata = conmanY.CmOpdata()
    nsr_entry = opdata.cm_nsr.add()
    nsr_entry.from_dict(self.cm_nsr)

    xpath = self.nsr_opdata_xpath
    yield from self.dts_obj.update(xpath, nsr_entry)
    self._log.info("Published cm-state with xpath %s and nsr %s",
                   xpath,
                   nsr_entry)
931
@asyncio.coroutine
def delete_cm_nsr(self):
    ''' This function deletes the published cm-state entry of this NSR '''

    yield from self.dts_obj.delete(self.nsr_opdata_xpath)
    self._log.info("Deleted cm-nsr with xpath %s",
                   self.nsr_opdata_xpath)
939
def set_nsr_name(self, name):
    ''' Stores the NSR name on the object and in its cm-state dict '''
    self.cm_nsr['name'] = name
    self.nsr_name = name
943
def set_config_dir(self, caller):
    ''' Creates (if needed) this NSR's config directory and sets file paths

    The directory is <cfg_dir>/<nsr_name>/<name_ref>, where cfg_dir comes
    from ``caller._parent`` and name_ref from ``caller._nsr``.
    '''
    name_ref = caller._nsr['name_ref']
    nsr_dir = os.path.join(caller._parent.cfg_dir, self.nsr_name, name_ref)
    self.this_nsr_dir = nsr_dir

    if not os.path.exists(nsr_dir):
        os.makedirs(nsr_dir)
        self._log.debug("NSR:(%s), Created configuration directory(%s)",
                        caller._nsr['name_ref'], self.this_nsr_dir)

    self.config_attributes_file = os.path.join(
        nsr_dir, "configuration_config_attributes.yml")
    self.xlate_dict_file = os.path.join(nsr_dir, "nsr_xlate_dict.yml")
953
def xlate_conf(self, vnfr, vnf_cfg):
    ''' Fills in the config template, config file and xlate script paths

    vnfr:    VNF record dict ('short_name' and 'member_vnf_index_ref' are read)
    vnf_cfg: VNF config dict, updated in place

    When vnf_cfg['interface_type'] is None, it is looked up (together with
    any 'configuration_options') in self.vnf_config_attributes_dict under
    the unique name of this VNF.

    Returns True if the resulting cfg_template path exists, else False.
    '''
    if vnf_cfg['interface_type'] is None:
        # Configuration type not set yet, try the per-VNF config attributes
        unique_name = get_vnf_unique_name(
            vnf_cfg['nsr_name'],
            vnfr['short_name'],
            vnfr['member_vnf_index_ref'],
        )

        if unique_name in self.vnf_config_attributes_dict:
            attributes = self.vnf_config_attributes_dict[unique_name]
            vnf_cfg['interface_type'] = attributes['configuration_type']
            if 'configuration_options' in attributes:
                # Every configuration option becomes a vnf_cfg entry
                vnf_cfg.update(attributes['configuration_options'].items())

    cfg_root = self._parent._parent.cfg_dir
    path_prefix = '{}/{}/{}_{}'.format(
        cfg_root,
        vnf_cfg['nsr_name'],
        vnfr['short_name'],
        vnfr['member_vnf_index_ref'],
    )

    vnf_cfg['cfg_template'] = '{}_{}_template.cfg'.format(
        path_prefix, vnf_cfg['interface_type'])
    vnf_cfg['cfg_file'] = '{}.cfg'.format(path_prefix)
    vnf_cfg['xlate_script'] = self._parent._parent.cfg_dir + '/xlate_cfg.py'

    self._log.debug("VNF endpoint so far: %s", vnf_cfg)
    self._log.info("Checking cfg_template %s", vnf_cfg['cfg_template'])

    return os.path.exists(vnf_cfg['cfg_template'])
991
def ConfigVNF(self, vnfr):
    ''' Translates the config template of a VNF and marks it as scheduled

    vnfr: VNF record dict; its 'vnf_cfg' entry is the config dict used here.

    Nothing is done when the VNF cm-state is already READY or READY_NO_CFG.
    Otherwise the state moves to CFG_PROCESS, the translation tags are
    dumped to the xlate dict file and, if a 'cfg_template' is set, the
    xlate script is run. Any failure during translation sets the state to
    CFG_PROCESS_FAILED and the method returns without raising. On success
    the state is set to CFG_SCHED.
    '''
    vnf_cfg = vnfr['vnf_cfg']
    vnf_cm_state = self.find_or_create_vnfr_cm_state(vnf_cfg)

    configured_states = (
        self.state_to_string(conmanY.RecordState.READY_NO_CFG),
        self.state_to_string(conmanY.RecordState.READY),
    )
    if vnf_cm_state['state'] in configured_states:
        self._log.warning("NS/VNF (%s/%s) is already configured! Skipped.",
                          self.nsr_name, vnfr['short_name'])
        return

    # Update VNF state
    vnf_cm_state['state'] = self.state_to_string(conmanY.RecordState.CFG_PROCESS)

    # Now translate the configuration for IP addresses
    try:
        # Add cp_dict members (TAGS) for this VNF
        self._cp_dict['rw_mgmt_ip'] = vnf_cfg['mgmt_ip_address']
        self._cp_dict['rw_username'] = vnf_cfg['username']
        self._cp_dict['rw_password'] = vnf_cfg['password']
        # TBD - The three tags above are global, not per VNF, which is why
        # the tags file below is dumped again before every VNF translation.
        # Making them per VNF requires all existing config templates to be
        # changed so that these tags include the member index.

        # Looked up outside the inner try so that the error handler below
        # never references an unbound nsr_obj; a missing key is reported
        # by the outer handler instead.
        nsr_obj = vnf_cfg['nsr_obj']
        try:
            # Dump the translation tags (For debug reference); a failure
            # here is logged but does not abort the configuration.
            with open(nsr_obj.xlate_dict_file, "w") as yf:
                yf.write(yaml.dump(nsr_obj._cp_dict, default_flow_style=False))
        except Exception as e:
            self._log.error("NS:(%s) failed to write nsr xlate tags file as (%s)",
                            nsr_obj.nsr_name, str(e))

        if 'cfg_template' in vnf_cfg:
            # Pass the arguments as a list (no shell): the paths are built
            # from NS/VNF names and must not be interpreted by a shell.
            script_cmd = [
                'python3',
                str(vnf_cfg['xlate_script']),
                '-i', str(vnf_cfg['cfg_template']),
                '-o', str(vnf_cfg['cfg_file']),
                '-x', str(self.xlate_dict_file),
            ]
            self._log.debug("xlate script command (%s)", script_cmd)
            xlate_msg = subprocess.check_output(script_cmd).decode('utf-8')
            self._log.info("xlate script output (%s)", xlate_msg)
    except Exception as e:
        vnf_cm_state['state'] = self.state_to_string(conmanY.RecordState.CFG_PROCESS_FAILED)
        self._log.error("Failed to execute translation script for VNF: %s with (%s)",
                        log_this_vnf(vnf_cfg), str(e))
        return

    # NOTE(review): vnf_cfg is logged in full here, including its
    # 'password' entry - confirm this is acceptable for info level logs.
    self._log.info("Applying config to VNF: %s = %s!", log_this_vnf(vnf_cfg), vnf_cfg)
    try:
        self._log.debug("Scheduled configuration!")
        vnf_cm_state['state'] = self.state_to_string(conmanY.RecordState.CFG_SCHED)
    except Exception as e:
        self._log.error("Failed apply_vnf_config to VNF: %s as (%s)",
                        log_this_vnf(vnf_cfg), str(e))
        vnf_cm_state['state'] = self.state_to_string(conmanY.RecordState.CFG_PROCESS_FAILED)
        raise
1047
def add(self, nsr):
    ''' Stores the given NS record on this object as self._nsr

    nsr: the NS record to remember
    '''
    # Log this NSR's own id; the bare name `id` would hand the builtin
    # id() function to the logger instead of an identifier.
    self._log.info("Adding NS Record for id=%s", self._nsr_id)
    self._nsr = nsr
1051
def sample_cm_state(self):
    ''' Returns a hard-coded example of the cm-state dictionary layout

    The sample holds one NSR entry with two VNFR entries; no instance
    state is read.
    '''
    first_vnfr = {
        'cfg_location': 'location1',
        'cfg_type': 'script',
        'connection_point': [
            {'ip_address': '1.1.1.1', 'name': 'vnf1cp1'},
            {'ip_address': '1.1.1.2', 'name': 'vnf1cp2'},
        ],
        'id': 'vnfrid1',
        'mgmt_interface': {'ip_address': '7.1.1.1', 'port': 1001},
        'name': 'vnfrname1',
        'state': 'init',
    }
    second_vnfr = {
        'cfg_location': 'location2',
        'cfg_type': 'netconf',
        'connection_point': [
            {'ip_address': '2.1.1.1', 'name': 'vnf2cp1'},
            {'ip_address': '2.1.1.2', 'name': 'vnf2cp2'},
        ],
        'id': 'vnfrid2',
        'mgmt_interface': {'ip_address': '7.1.1.2', 'port': 1001},
        'name': 'vnfrname2',
        'state': 'init',
    }
    sample_nsr = {
        'cm_vnfr': [first_vnfr, second_vnfr],
        'id': 'nsrid1',
        'name': 'nsrname1',
        'state': 'init',
    }
    return {
        'cm_nsr': [sample_nsr],
        'states': 'Initialized, ',
    }
1088
def populate_vm_state_from_vnf_cfg(self):
    ''' Copies vnf_cfg details of every VNFR into its cm-state entry

    VNFRs without a cm-state entry are skipped. Connection points are
    appended to the cm-state entry, so calling this repeatedly adds them
    again.
    '''
    for vnfr in self._vnfr_list:
        vnf_cfg = vnfr['vnf_cfg']
        cm_state = self.find_vnfr_cm_state(vnfr['id'])
        if not cm_state:
            continue

        # VNF management interface
        mgmt_interface = cm_state['mgmt_interface']
        mgmt_interface['ip_address'] = vnf_cfg['mgmt_ip_address']
        mgmt_interface['port'] = vnf_cfg['port']

        # VNF configuration details
        cm_state['cfg_type'] = vnf_cfg['config_method']
        cm_state['cfg_location'] = vnf_cfg['cfg_file']

        # Connection points of this VNF (name and address only)
        if "connection_point" not in vnfr:
            continue
        for cp in vnfr['connection_point']:
            entry = {
                'name': cp['name'],
                'ip_address': cp['ip_address'],
            }
            cm_state['connection_point'].append(entry)
1115
def state_to_string(self, state):
    ''' Maps a conmanY.RecordState value to its cm-state string

    Raises KeyError for a state that is not listed in the table below.
    '''
    record_state = conmanY.RecordState
    state_names = dict((
        (record_state.INIT, "init"),
        (record_state.RECEIVED, "received"),
        (record_state.CFG_PROCESS, "cfg_process"),
        (record_state.CFG_PROCESS_FAILED, "cfg_process_failed"),
        (record_state.CFG_SCHED, "cfg_sched"),
        (record_state.CFG_DELAY, "cfg_delay"),
        (record_state.CONNECTING, "connecting"),
        (record_state.FAILED_CONNECTION, "failed_connection"),
        (record_state.NETCONF_CONNECTED, "netconf_connected"),
        (record_state.NETCONF_SSH_CONNECTED, "netconf_ssh_connected"),
        (record_state.RESTCONF_CONNECTED, "restconf_connected"),
        (record_state.CFG_SEND, "cfg_send"),
        (record_state.CFG_FAILED, "cfg_failed"),
        (record_state.READY_NO_CFG, "ready_no_cfg"),
        (record_state.READY, "ready"),
    ))
    return state_names[state]
1135
def find_vnfr_cm_state(self, id):
    ''' Returns the cm-state dict of the VNFR with the given id, or None

    None is also returned when this NSR has no (or an empty) cm_vnfr list.
    '''
    known_states = self.cm_nsr['cm_vnfr']
    if not known_states:
        return None
    return next(
        (entry for entry in known_states if entry['id'] == id),
        None,
    )
1142
def find_or_create_vnfr_cm_state(self, vnf_cfg):
    ''' Returns the cm-state dict for the VNFR of vnf_cfg, creating it if needed

    A newly created entry starts in the RECEIVED state, is filled from
    vnf_cfg and appended to this NSR's cm_vnfr list. It is not published
    here.
    '''
    vnfr = vnf_cfg['vnfr']
    existing = self.find_vnfr_cm_state(vnfr['id'])
    if existing is not None:
        return existing

    # Not found, create and initialize this VNF cm-state
    mgmt_interface = {
        'ip_address' : vnf_cfg['mgmt_ip_address'],
        'port' : vnf_cfg['port'],
    }
    created = {
        'id' : vnfr['id'],
        'name' : vnfr['short_name'],
        'state' : self.state_to_string(conmanY.RecordState.RECEIVED),
        'mgmt_interface' : mgmt_interface,
        'cfg_type' : vnf_cfg['config_method'],
        'cfg_location' : vnf_cfg['cfg_file'],
        'connection_point' : [],
    }
    self.cm_nsr['cm_vnfr'].append(created)
    return created
1168
@asyncio.coroutine
def get_vnf_cm_state(self, vnfr):
    ''' Returns the cm-state string of the given VNFR

    False is returned when no VNFR is given or it has no cm-state entry.
    '''
    if not vnfr:
        return False

    cm_state = self.find_vnfr_cm_state(vnfr['id'])
    if not cm_state:
        return False

    return cm_state['state']
1176
@asyncio.coroutine
def update_vnf_cm_state(self, vnfr, state):
    ''' Moves the cm-state of a VNFR to a new state and publishes the change

    vnfr:  VNF record dict whose cm-state entry is updated
    state: conmanY.RecordState value to move to

    Nothing is published when the VNFR is already in that state. A missing
    VNFR or a VNFR without cm-state entry is only logged as an error.
    '''
    if not vnfr:
        self._log.error("No VNFR supplied for state update (NS=%s)!",
                        self.nsr_name)
        return

    vnf_cm_state = self.find_vnfr_cm_state(vnfr['id'])
    if vnf_cm_state is None:
        self._log.error("No opdata found for NS/VNF:%s/%s!",
                        self.nsr_name, vnfr['short_name'])
        return

    if vnf_cm_state['state'] == self.state_to_string(state):
        # Already in the requested state
        return

    previous = vnf_cm_state['state']
    vnf_cm_state['state'] = self.state_to_string(state)

    # Publish new state
    yield from self.publish_cm_state()

    message = "VNF ({}/{}/{}) state change: {} -> {}".format(
        self.nsr_name,
        vnfr['short_name'],
        vnfr['member_vnf_index_ref'],
        previous,
        vnf_cm_state['state'],
    )
    self._log.info(message)
1201
@property
def get_ns_cm_state(self):
    ''' Returns the cm-state string currently stored for this NSR '''
    current_state = self.cm_nsr['state']
    return current_state
1205
@asyncio.coroutine
def update_ns_cm_state(self, state, state_details=None):
    ''' Moves the cm-state of this NSR to a new state and publishes it

    state:         conmanY.RecordState value to move to
    state_details: stored as the 'state_details' of this NSR (may be None)

    When the NSR is already in the requested state nothing is changed,
    state_details included, and nothing is published.
    '''
    if self.cm_nsr['state'] == self.state_to_string(state):
        return

    previous = self.cm_nsr['state']
    self.cm_nsr['state'] = self.state_to_string(state)
    self.cm_nsr['state_details'] = state_details

    message = "NS ({}) state change: {} -> {}".format(
        self.nsr_name,
        previous,
        self.cm_nsr['state'],
    )
    self._log.info(message)

    # Publish new state
    yield from self.publish_cm_state()
1218
@asyncio.coroutine
def add_vnfr(self, vnfr, vnfr_msg):
    ''' Registers a VNF record with this NSR and returns its agent VNFR

    vnfr:     VNF record as a dict
    vnfr_msg: VNF record message; its mgmt_interface is read, and it is
              passed on to self.agent_nsr.add_vnfr

    The VNFR is indexed by id and by unique name, gets a default vnf_cfg
    dict and a cm-state entry, and its connection point addresses (plus the
    assigned subnets of the referenced VLRs) are recorded in self._cp_dict,
    both globally and under the VNF's member index.
    '''

    @asyncio.coroutine
    def populate_subnets_from_vlr(vlr_id):
        ''' Adds the assigned subnet of the given VLR to self._cp_dict '''
        try:
            vlr = yield from self.dts_obj.get_vlr(vlr_id)
            if vlr is None or 'assigned_subnet' not in vlr:
                return
            subnet = {vlr.name: vlr.assigned_subnet}
            self._cp_dict[vnfr['member_vnf_index_ref']].update(subnet)
            self._cp_dict.update(subnet)
            self._log.debug("VNF:(%s) Updated assigned subnet = %s",
                            vnfr['short_name'], subnet)
        except Exception as e:
            # Subnet information is best effort, failures are only logged
            self._log.error("VNF:(%s) VLR Error = %s",
                            vnfr['short_name'], e)

    if vnfr['id'] in self._vnfr_dict:
        self._log.warning(
            "NSR(%s) : VNF Record for name=%s, id=%s already exists, overwriting",
            self._nsr_id, vnfr['short_name'], vnfr['id'])
    else:
        self._log.info(
            "NSR(%s) : Adding VNF Record for name=%s, id=%s",
            self._nsr_id, vnfr['short_name'], vnfr['id'])
        # Add this vnfr to the list for show, or single traversal
        self._vnfr_list.append(vnfr)

    # Make vnfr available by id as well as by name
    member_index = vnfr['member_vnf_index_ref']
    unique_name = get_vnf_unique_name(
        self.nsr_name, vnfr['short_name'], member_index)
    self._vnfr_dict[unique_name] = vnfr
    self._vnfr_dict[vnfr['id']] = vnfr

    # Create vnf_cfg dictionary with default values
    vnf_cfg = {
        'nsr_obj' : self,
        'vnfr' : vnfr,
        'agent_vnfr' : self.agent_nsr.add_vnfr(vnfr, vnfr_msg),
        'nsr_name' : self.nsr_name,
        'nsr_id' : self._nsr_id,
        'vnfr_name' : vnfr['short_name'],
        'member_vnf_index' : vnfr['member_vnf_index_ref'],
        'port' : 0,
        'username' : 'admin',
        'password' : 'admin',
        'config_method' : 'None',
        'protocol' : 'None',
        'mgmt_ip_address' : '0.0.0.0',
        'cfg_file' : 'None',
        'cfg_retries' : 0,
        'script_type' : 'bash',
    }

    # Update the mgmt ip address now: in case the config method is none,
    # it is not updated later. Defaults are kept if it cannot be read.
    try:
        mgmt_interface = vnfr_msg.mgmt_interface
        vnf_cfg['mgmt_ip_address'] = mgmt_interface.ip_address
        vnf_cfg['port'] = mgmt_interface.port
    except Exception as e:
        self._log.warn(
            "VNFR {}({}), unable to retrieve mgmt ip address: {}".
            format(vnfr['short_name'], vnfr['id'], e))

    vnfr['vnf_cfg'] = vnf_cfg
    self.find_or_create_vnfr_cm_state(vnf_cfg)

    # Build the connection-points list for this VNF (self._cp_dict),
    # starting from the connection points of the VNFR itself.
    cp_list = vnfr['connection_point'] if 'connection_point' in vnfr else []

    self._cp_dict[member_index] = {}
    if 'vdur' in vnfr:
        for vdur in vnfr['vdur']:
            if 'internal_connection_point' not in vdur:
                continue
            # NOTE(review): when the VNFR has a 'connection_point' list,
            # cp_list is that very list, so this extends
            # vnfr['connection_point'] in place - confirm this is intended.
            cp_list += vdur['internal_connection_point']

    for cp in cp_list:
        # Populate global dictionary
        self._cp_dict[cp['name']] = cp['ip_address']

        # Populate unique member specific dictionary
        self._cp_dict[member_index][cp['name']] = cp['ip_address']

        # Fill in the subnets from vlr
        # HACK: Internal connection_point do not have VLR reference
        if 'vlr_ref' in cp:
            yield from populate_subnets_from_vlr(cp['vlr_ref'])

    if 'internal_vlr' in vnfr:
        for ivlr in vnfr['internal_vlr']:
            yield from populate_subnets_from_vlr(ivlr['vlr_ref'])

    # Update vnfr
    agent_vnfr = vnf_cfg['agent_vnfr']
    agent_vnfr._vnfr = vnfr
    return agent_vnfr
1322
1323
class XPaths(object):
    """Builders for the DTS xpath strings used in this module.

    Every builder takes an optional key ``k``. For the catalog/list style
    builders, ``k=None`` yields the unkeyed list path. ``nsr_config`` and
    ``vlr`` instead return an empty string when ``k`` is None.
    """

    @staticmethod
    def _keyed(base, key_template, k):
        """Return base, followed by key_template formatted with k unless k is None."""
        if k is None:
            return base
        return base + key_template.format(k)

    @staticmethod
    def nsr_opdata(k=None):
        """Xpath of the NSR opdata list, or of the entry for config ref k."""
        return XPaths._keyed("D,/nsr:ns-instance-opdata/nsr:nsr",
                             "[nsr:ns-instance-config-ref='{}']", k)

    @staticmethod
    def nsd_msg(k=None):
        """Xpath of the NSD catalog list, or of the NSD with id k."""
        # The conditional must only cover the key predicate; without the
        # grouping it swallowed the whole path and returned "" for k=None.
        return XPaths._keyed("C,/nsd:nsd-catalog/nsd:nsd",
                             "[nsd:id = '{}']", k)

    @staticmethod
    def vnfr_opdata(k=None):
        """Xpath of the VNFR catalog list, or of the VNFR with id k."""
        return XPaths._keyed("D,/vnfr:vnfr-catalog/vnfr:vnfr",
                             "[vnfr:id='{}']", k)

    @staticmethod
    def vnfd(k=None):
        """Xpath of the VNFD catalog list, or of the VNFD with id k."""
        return XPaths._keyed("C,/vnfd:vnfd-catalog/vnfd:vnfd",
                             "[vnfd:id='{}']", k)

    @staticmethod
    def config_agent(k=None):
        """Xpath of the config agent account list, or of the account named k."""
        return XPaths._keyed("D,/rw-config-agent:config-agent/rw-config-agent:account",
                             "[rw-config-agent:name='{}']", k)

    @staticmethod
    def nsr_config(k=None):
        """Xpath of the NSR config with id k; empty string when k is None."""
        if k is None:
            return ""
        return "C,/nsr:ns-instance-config/nsr:nsr[nsr:id='{}']".format(k)

    @staticmethod
    def vlr(k=None):
        """Xpath of the VLR with id k; empty string when k is None."""
        if k is None:
            return ""
        return "D,/vlr:vlr-catalog/vlr:vlr[vlr:id='{}']".format(k)
1357
class ConfigManagerDTS(object):
    ''' This class either reads from DTS or publishes to DTS '''

    def __init__(self, log, loop, parent, dts):
        """Remember the logger, event loop, parent object and DTS handle."""
        self._log = log
        self._loop = loop
        self._parent = parent
        self._dts = dts

    @asyncio.coroutine
    def _read_dts(self, xpath, do_trace=False):
        """Read all results for xpath from DTS and return them as a list.

        do_trace is accepted for the callers' sake but is not used here.
        Reading is best effort: if iterating the results fails, the failure
        is logged and whatever was collected so far is returned.
        """
        self._log.debug("_read_dts path = %s", xpath)
        flags = rwdts.XactFlag.MERGE
        res_iter = yield from self._dts.query_read(
            xpath, flags=flags
        )

        results = []
        try:
            for i in res_iter:
                result = yield from i
                if result is not None:
                    results.append(result.result)
        except Exception as e:
            # Keep the best-effort behaviour, but do not hide the failure
            # and do not swallow non-Exception signals as a bare except did.
            self._log.warning("_read_dts failed for path %s: %s", xpath, e)

        return results

    @asyncio.coroutine
    def get_nsr(self, id):
        """Return the NSR opdata for the given id as a dict, or None."""
        self._log.debug("Attempting to get NSR: %s", id)
        nsrl = yield from self._read_dts(XPaths.nsr_opdata(id), False)
        nsr = None
        if len(nsrl) > 0:
            nsr = nsrl[0].as_dict()
        return nsr

    @asyncio.coroutine
    def get_nsr_config(self, id):
        """Return the NSR config message for the given id, or None."""
        self._log.debug("Attempting to get config NSR: %s", id)
        nsrl = yield from self._read_dts(XPaths.nsr_config(id), False)
        nsr = None
        if len(nsrl) > 0:
            nsr = nsrl[0]
        return nsr

    @asyncio.coroutine
    def get_nsd_msg(self, id):
        """Return the NSD message for the given id, or None."""
        self._log.debug("Attempting to get NSD: %s", id)
        nsdl = yield from self._read_dts(XPaths.nsd_msg(id), False)
        nsd_msg = None
        if len(nsdl) > 0:
            nsd_msg = nsdl[0]
        return nsd_msg

    @asyncio.coroutine
    def get_nsd(self, nsr_id):
        """Return the nsd attribute of the NSR config for nsr_id, or None.

        None is returned (and an error logged) when no NSR config is found.
        """
        # Log the requested NSR id, not the builtin id() function.
        self._log.debug("Attempting to get NSD for NSR: %s", nsr_id)
        nsr_config = yield from self.get_nsr_config(nsr_id)
        if nsr_config is None:
            # get_nsr_config returns None when nothing was read
            self._log.error("No NSR config found for NSR: %s", nsr_id)
            return None
        return nsr_config.nsd

    @asyncio.coroutine
    def get_vnfr(self, id):
        """Return the VNFR message for the given id, or None."""
        self._log.debug("Attempting to get VNFR: %s", id)
        vnfrl = yield from self._read_dts(XPaths.vnfr_opdata(id), do_trace=False)
        vnfr_msg = None
        if len(vnfrl) > 0:
            vnfr_msg = vnfrl[0]
        return vnfr_msg

    @asyncio.coroutine
    def get_vnfd(self, vnfd_id):
        """Return the VNFD message for the given id, or None."""
        self._log.debug("Attempting to get VNFD: %s", vnfd_id)
        vnfdl = yield from self._read_dts(XPaths.vnfd(vnfd_id), do_trace=False)
        vnfd_msg = None
        if len(vnfdl) > 0:
            vnfd_msg = vnfdl[0]
        return vnfd_msg

    @asyncio.coroutine
    def get_vlr(self, id):
        """Return the VLR message for the given id, or None."""
        self._log.debug("Attempting to get VLR subnet: %s", id)
        vlrl = yield from self._read_dts(XPaths.vlr(id), do_trace=True)
        vlr_msg = None
        if len(vlrl) > 0:
            vlr_msg = vlrl[0]
        return vlr_msg

    @asyncio.coroutine
    def get_config_agents(self, name):
        """Return the list of config agent accounts read for name."""
        self._log.debug("Attempting to get config_agents: %s", name)
        cfgagentl = yield from self._read_dts(XPaths.config_agent(name), False)
        return cfgagentl

    @asyncio.coroutine
    def update(self, path, msg, flags=rwdts.XactFlag.REPLACE):
        """
        Update a cm-state (cm-nsr) record in DTS with the path and message

        Uses the publisher handle set up by register_to_publish().
        """
        self._log.debug("Updating cm-state %s:%s dts_pub_hdl = %s", path, msg, self.dts_pub_hdl)
        self.dts_pub_hdl.update_element(path, msg, flags)
        self._log.debug("Updated cm-state, %s:%s", path, msg)

    @asyncio.coroutine
    def delete(self, path):
        """
        Delete cm-nsr record in DTS with the path only

        Uses the publisher handle set up by register_to_publish().
        """
        self._log.debug("Deleting cm-nsr %s dts_pub_hdl = %s", path, self.dts_pub_hdl)
        self.dts_pub_hdl.delete_element(path)
        self._log.debug("Deleted cm-nsr, %s", path)

    @asyncio.coroutine
    def register(self):
        """Register as cm-state publisher first, then for NSR changes."""
        yield from self.register_to_publish()
        yield from self.register_for_nsr()

    @asyncio.coroutine
    def register_to_publish(self):
        ''' Register to DTS for publishing cm-state opdata '''

        xpath = "D,/rw-conman:cm-state/rw-conman:cm-nsr"
        self._log.debug("Registering to publish cm-state @ %s", xpath)
        hdl = rift.tasklets.DTS.RegistrationHandler()
        with self._dts.group_create() as group:
            self.dts_pub_hdl = group.register(xpath=xpath,
                                              handler=hdl,
                                              flags=rwdts.Flag.PUBLISHER | rwdts.Flag.NO_PREP_READ)

    @property
    def nsr_xpath(self):
        """Xpath of the NSR opdata list this class subscribes to."""
        return "D,/nsr:ns-instance-opdata/nsr:nsr"

    @asyncio.coroutine
    def register_for_nsr(self):
        """ Register for NSR changes """

        @asyncio.coroutine
        def on_prepare(xact_info, query_action, ks_path, msg):
            """ Handle a create, update or delete of an NSR opdata record """
            self._log.debug("Received NSR instantiate on_prepare (%s:%s:%s)",
                            query_action,
                            ks_path,
                            msg)

            if (query_action == rwdts.QueryAction.UPDATE or
                    query_action == rwdts.QueryAction.CREATE):
                msg_dict = msg.as_dict()
                # Only an NSR reported as running is queued for configuration
                if ('operational_status' in msg_dict and
                        msg_dict['operational_status'] == 'running'):
                    # Add to the task list
                    self._parent.add_to_pending_tasks(
                        {'nsrid' : msg_dict['ns_instance_config_ref'], 'retries' : 5})
            elif query_action == rwdts.QueryAction.DELETE:
                nsr_id = msg.ns_instance_config_ref
                # Termination is scheduled on the loop, not awaited here
                asyncio.ensure_future(self._parent.terminate_NSR(nsr_id), loop=self._loop)
            else:
                # Exceptions do not interpolate logging-style arguments,
                # so build the message explicitly.
                raise NotImplementedError(
                    "%s action on cm-state not supported" % (query_action,))

            xact_info.respond_xpath(rwdts.XactRspCode.ACK)

        try:
            handler = rift.tasklets.DTS.RegistrationHandler(on_prepare=on_prepare)
            self.dts_reg_hdl = yield from self._dts.register(self.nsr_xpath,
                                                             flags=rwdts.Flag.SUBSCRIBER | rwdts.Flag.DELTA_READY,
                                                             handler=handler)
        except Exception as e:
            # A failed registration is logged, not raised
            self._log.error("Failed to register for NSR changes as %s", str(e))
1530
1531