Merge branch 'v1.1'
[osm/SO.git] / rwcm / plugins / rwconman / rift / tasklets / rwconmantasklet / rwconman_config.py
1
2 #
3 # Copyright 2016 RIFT.IO Inc
4 #
5 # Licensed under the Apache License, Version 2.0 (the "License");
6 # you may not use this file except in compliance with the License.
7 # You may obtain a copy of the License at
8 #
9 # http://www.apache.org/licenses/LICENSE-2.0
10 #
11 # Unless required by applicable law or agreed to in writing, software
12 # distributed under the License is distributed on an "AS IS" BASIS,
13 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 # See the License for the specific language governing permissions and
15 # limitations under the License.
16 #
17
18 import asyncio
19 import os
20 import stat
21 import subprocess
22 import sys
23 import tempfile
24 import yaml
25
26 from gi.repository import (
27 RwDts as rwdts,
28 RwConmanYang as conmanY,
29 ProtobufC,
30 )
31
32 import rift.tasklets
33
34 from . import rwconman_conagent as conagent
35 from . import RiftCM_rpc
36 from . import riftcm_config_plugin
37
# Interpreters older than 3.4.4 only expose the task-scheduling helper as
# ``asyncio.async``.  ``async`` is a reserved word from Python 3.7 on, so the
# attribute has to be looked up by name: writing ``asyncio.async`` literally
# makes the whole module fail to compile on newer interpreters.
if sys.version_info < (3, 4, 4):
    asyncio.ensure_future = getattr(asyncio, "async")
40
def get_vnf_unique_name(nsr_name, vnfr_short_name, member_vnf_index):
    """Return the dotted ``<nsr>.<vnfr short name>.<member index>`` identifier."""
    name_parts = (nsr_name, vnfr_short_name, member_vnf_index)
    return ".".join("{}".format(part) for part in name_parts)
43
class ConmanConfigError(Exception):
    """Base class for the configuration errors defined in this module."""
46
47
class InitialConfigError(ConmanConfigError):
    """Raised when an initial-config script exits with a non-zero status."""
50
51
def log_this_vnf(vnf_cfg):
    """Build a short tag identifying a VNF in log messages.

    Each of ``nsr_name``, ``vnfr_name`` and ``member_vnf_index`` that is
    present in ``vnf_cfg`` is emitted followed by ``/``; ``mgmt_ip_address``,
    when present, is emitted last wrapped in parentheses.
    """
    pieces = []
    for key in ('nsr_name', 'vnfr_name', 'member_vnf_index', 'mgmt_ip_address'):
        if key not in vnf_cfg:
            continue
        template = "({})" if key == 'mgmt_ip_address' else "{}/"
        pieces.append(template.format(vnf_cfg[key]))
    return "".join(pieces)
62
class PretendNsm(object):
    """Lookup facade over the config manager's NSR records.

    It is handed to the RPC handler by ``ConfigManagerConfig`` and answers
    NSR / VNFR / NSD / plugin queries from the parent's own bookkeeping.
    """

    def __init__(self, dts, log, loop, parent):
        self._dts, self._log, self._loop = dts, log, loop
        self._parent = parent
        # Same dictionary object as the parent's, so later additions are seen
        self._nsr_dict = parent._nsr_dict
        self._nsrs = {}
        self._config_agent_plugins = []
        self._nsd_msg = {}

    @property
    def nsrs(self):
        """Map of NSR id to agent NSR, rebuilt on every access."""
        # Expensive, instead use get_nsr, if you know id.
        self._nsrs = {
            nsr_id: nsr_obj.agent_nsr
            for nsr_id, nsr_obj in self._nsr_dict.items()
        }
        return self._nsrs

    def get_nsr(self, nsr_id):
        """Return the ``_nsr`` attribute of the record for ``nsr_id``, or None."""
        try:
            nsr_obj = self._nsr_dict[nsr_id]
        except KeyError:
            return None
        return nsr_obj._nsr

    def get_vnfr_msg(self, vnfr_id, nsr_id=None):
        """Return the VNFR message for ``vnfr_id``, or None when unknown.

        With a truthy ``nsr_id`` only that NSR is consulted; otherwise every
        known NSR is scanned and the first match wins.
        """
        self._log.debug("get_vnfr_msg(vnfr=%s, nsr=%s)",
                        vnfr_id, nsr_id)
        if nsr_id:
            if nsr_id in self._nsr_dict:
                candidates = [self._nsr_dict[nsr_id]]
            else:
                candidates = []
        else:
            candidates = self._nsr_dict.values()

        for nsr_obj in candidates:
            if vnfr_id in nsr_obj._vnfr_dict:
                vnf_cfg = nsr_obj._vnfr_dict[vnfr_id]['vnf_cfg']
                return vnf_cfg['agent_vnfr'].vnfr_msg
        return None

    @asyncio.coroutine
    def get_nsd(self, nsr_id):
        """Return the NSD for ``nsr_id``, fetching the NSR config once and caching it."""
        cache = self._nsd_msg
        if nsr_id not in cache:
            nsr_config = yield from self._parent.cmdts_obj.get_nsr_config(nsr_id)
            cache[nsr_id] = nsr_config.nsd
        return cache[nsr_id]

    @property
    def config_agent_plugins(self):
        """Fresh list of the config agent manager's plugin instances."""
        plugin_map = self._parent._config_agent_mgr._plugin_instances
        self._config_agent_plugins = list(plugin_map.values())
        return self._config_agent_plugins
123
124 class ConfigManagerConfig(object):
def __init__(self, dts, log, loop, parent):
    """Create the manager state and the handles that ``register`` will register.

    Args:
        dts: DTS handle, passed on to the helper objects built here.
        log: logger used by this object and its helpers.
        loop: asyncio event loop.
        parent: owning object, stored as ``self._parent``.
    """
    self._dts, self._log, self._loop, self._parent = dts, log, loop, parent
    self._nsr_dict = {}
    self.pending_cfg = {}
    self.terminate_cfg = {}
    # Used for NSR id get retry (mainly exercised at restart case)
    self.pending_tasks = []
    self._config_xpath = "C,/cm-config"
    self._opdata_xpath = "D,/rw-conman:cm-state"

    self.cm_config = conmanY.SoConfig()
    # RO specific configuration: every endpoint field starts out unset
    self.ro_config = dict.fromkeys(self.cm_config.ro_endpoint.fields)

    # Initialize cm-state
    self.cm_state = {
        'cm_nsr': [],
        'states': "Initialized",
    }

    # Initialize objects to register
    self.cmdts_obj = ConfigManagerDTS(self._log, self._loop, self, self._dts)
    self._config_agent_mgr = conagent.RiftCMConfigAgent(
        self._dts,
        self._log,
        self._loop,
        self,
    )
    pretend_nsm = PretendNsm(self._dts, self._log, self._loop, self)
    rpc_handler = RiftCM_rpc.RiftCMRPCHandler(
        self._dts, self._log, self._loop, pretend_nsm)
    self.reg_handles = [
        self.cmdts_obj,
        self._config_agent_mgr,
        rpc_handler,
    ]
164
def is_nsr_valid(self, nsr_id):
    """Return True when ``nsr_id`` has an entry in ``self._nsr_dict``."""
    return nsr_id in self._nsr_dict
169
def add_to_pending_tasks(self, task):
    """Queue ``task`` unless a task for the same NSR id is already queued.

    The retry loop is started when the queue goes from empty to one entry.
    Failures while queueing are logged, not raised.
    """
    if any(queued['nsrid'] == task['nsrid'] for queued in self.pending_tasks):
        # Already queued
        return

    try:
        self.pending_tasks.append(task)
        self._log.debug("add_to_pending_tasks (nsrid:%s)",
                        task['nsrid'])
        if len(self.pending_tasks) != 1:
            return
        # First entry in the queue: nobody is draining it yet
        self._loop.create_task(self.ConfigManagerConfig_pending_loop())
        # TBD - change to info level
        self._log.debug("Started pending_loop!")
    except Exception as err:
        self._log.error("Failed adding to pending tasks (%s)", str(err))
186
def del_from_pending_tasks(self, task):
    """Remove ``task`` from the pending queue; a failure is logged, not raised."""
    try:
        queue = self.pending_tasks
        queue.remove(task)
    except Exception as err:
        self._log.error("Failed removing from pending tasks (%s)", str(err))
192
@asyncio.coroutine
def ConfigManagerConfig_pending_loop(self):
    """Drain ``self.pending_tasks``, retrying failed NSR configurations.

    Every two seconds the task at the head of the queue is taken, its
    ``retries`` counter is decremented and ``config_NSR`` is run for its
    ``nsrid``.  A task that did not complete is appended to the end of the
    queue again while it has retries left.  The loop ends as soon as the
    queue is found empty.

    Differences from a naive "remove and re-append":
      * the task may have been removed from the queue while ``config_NSR``
        was running (``terminate_NSR`` does this); it is then neither
        removed a second time nor re-queued;
      * a retry counter at or below zero never re-queues the task;
      * a task without an ``nsrid`` is dropped instead of blocking the head
        of the queue forever.
    """
    loop_sleep = 2
    while True:
        yield from asyncio.sleep(loop_sleep, loop=self._loop)

        # This pending task queue is ordered by events: the previous task
        # must be dealt with before going on to the next one.
        if not self.pending_tasks:
            self._log.debug("Stopped pending_loop!")
            break

        self._log.debug("self.pending_tasks len=%s", len(self.pending_tasks))
        task = self.pending_tasks[0]

        if 'nsrid' not in task:
            # Nothing can ever be done with it; do not let it stall the queue
            self._log.error("Dropping pending task without nsrid: %s", task)
            self.pending_tasks.remove(task)
            continue

        nsrid = task['nsrid']
        self._log.debug("Will execute pending task for NSR id(%s)", nsrid)

        # A missing counter is treated as "no retries left"
        task['retries'] = task.get('retries', 0) - 1
        done = False
        still_queued = False
        try:
            # Try to configure this NSR
            done = yield from self.config_NSR(nsrid)
            self._log.info("self.config_NSR status=%s", done)
        except Exception as e:
            self._log.error("Failed(%s) configuring NSR(%s),"
                            "retries remained:%d!",
                            str(e), nsrid, task['retries'])
        finally:
            # The queue may have been changed while config_NSR was suspended
            still_queued = task in self.pending_tasks
            if still_queued:
                self.pending_tasks.remove(task)

        if done:
            self._log.debug("Finished pending task NSR id(%s):", nsrid)
            continue

        self._log.error("Failed configuring NSR(%s), retries remained:%d!",
                        nsrid, task['retries'])

        if not still_queued:
            self._log.debug("Pending task for NSR id(%s) was removed meanwhile, "
                            "not retrying", nsrid)
        elif task['retries'] > 0:
            # Failed, re-insert (append at the end) to be retried later
            self.pending_tasks.append(task)
236
@asyncio.coroutine
def register(self):
    """Register the cm-state opdata first, then each entry of ``self.reg_handles``."""
    yield from self.register_cm_state_opdata()

    # Initialize all handles that need to be registered, in list order
    for handle in self.reg_handles:
        yield from handle.register()
244
@asyncio.coroutine
def register_cm_state_opdata(self):
    """Register this object as DTS publisher of the cm-state opdata xpath.

    A registration failure is logged and swallowed.
    """

    def state_to_string(state):
        # RecordState -> lower-case name; not called anywhere in this method
        names = {
            conmanY.RecordState.INIT: "init",
            conmanY.RecordState.RECEIVED: "received",
            conmanY.RecordState.CFG_PROCESS: "cfg_process",
            conmanY.RecordState.CFG_PROCESS_FAILED: "cfg_process_failed",
            conmanY.RecordState.CFG_SCHED: "cfg_sched",
            conmanY.RecordState.CFG_DELAY: "cfg_delay",
            conmanY.RecordState.CONNECTING: "connecting",
            conmanY.RecordState.FAILED_CONNECTION: "failed_connection",
            conmanY.RecordState.NETCONF_CONNECTED: "netconf_connected",
            conmanY.RecordState.NETCONF_SSH_CONNECTED: "netconf_ssh_connected",
            conmanY.RecordState.RESTCONF_CONNECTED: "restconf_connected",
            conmanY.RecordState.CFG_SEND: "cfg_send",
            conmanY.RecordState.CFG_FAILED: "cfg_failed",
            conmanY.RecordState.READY_NO_CFG: "ready_no_cfg",
            conmanY.RecordState.READY: "ready",
        }
        return names[state]

    @asyncio.coroutine
    def on_prepare(xact_info, action, ks_path, msg):
        # READ queries get the current cm_state; anything else is just ACKed
        self._log.debug("Received cm-state: msg=%s, action=%s", msg, action)

        if action == rwdts.QueryAction.READ:
            show_output = conmanY.CmOpdata()
            show_output.from_dict(self.cm_state)
            self._log.debug("Responding to SHOW cm-state: %s", self.cm_state)
            xact_info.respond_xpath(rwdts.XactRspCode.ACK,
                                    xpath=self._opdata_xpath,
                                    msg=show_output)
            return

        xact_info.respond_xpath(rwdts.XactRspCode.ACK)

    opdata_xpath = self._opdata_xpath
    self._log.info("Registering for cm-opdata xpath: %s",
                   opdata_xpath)

    try:
        handler = rift.tasklets.DTS.RegistrationHandler(on_prepare=on_prepare)
        yield from self._dts.register(xpath=self._opdata_xpath,
                                      handler=handler,
                                      flags=rwdts.Flag.PUBLISHER)
        self._log.info("Successfully registered for opdata(%s)", self._opdata_xpath)
    except Exception as err:
        self._log.error("Failed to register for opdata as (%s)", err)
294
@asyncio.coroutine
def process_nsd_vnf_configuration(self, nsr_obj, vnfr):
    """Record the configuration attributes of one VNFR on ``nsr_obj``.

    Called once per VNF of an NS.  The VNF's ``vnf_configuration`` is
    inspected for a supported method (netconf, juju or script, in that order
    of preference).  When one is found, the per-priority list in
    ``nsr_obj.nsr_cfg_config_attributes_dict`` gets an entry for this VNF,
    the connection details are copied into ``vnfr['vnf_cfg']`` and the NSR
    connection-point dictionary, and the configuration template (if any) is
    written to disk.  Otherwise the VNF is marked READY_NO_CFG.

    ``config_attributes`` is optional in the VNF configuration: without it
    the priority is 0 and no configuration delay is recorded.

    Args:
        nsr_obj: per-NSR record updated in place.
        vnfr: VNFR as a dictionary; ``vnf_cfg`` and ``vnf_configuration``
            entries are read from it.

    Raises:
        Exception: whatever writing the configuration template raised
            (logged, then re-raised).
    """

    def get_config_method(vnf_config):
        # First supported method found in the VNF configuration wins
        cfg_types = ['netconf', 'juju', 'script']
        for method in cfg_types:
            if method in vnf_config:
                return method
        return None

    def get_cfg_file_extension(method, configuration_options):
        # File extension of the generated configuration file per method
        ext_dict = {
            "netconf": "xml",
            "script": {
                "bash": "sh",
                "expect": "exp",
            },
            "juju": "yml"
        }

        if method == "netconf":
            return ext_dict[method]
        elif method == "script":
            return ext_dict[method][configuration_options['script_type']]
        elif method == "juju":
            return ext_dict[method]
        else:
            return "cfg"

    # The attributes collected here are keyed by priority number, so the
    # VNFs need not be processed in priority order.  Each entry looks like:
    #
    #   name : trafsink_vnfd
    #   member_vnf_index : 2
    #   configuration_delay : 120
    #   configuration_type : netconf
    #   configuration_options :
    #       username : admin
    #       password : admin
    #       port : 2022
    #       target : running

    # Save some parameters needed as short cuts in flat structure (Also generated)
    vnf_cfg = vnfr['vnf_cfg']
    # Prepare unique name for this VNF
    vnf_cfg['vnf_unique_name'] = get_vnf_unique_name(
        vnf_cfg['nsr_name'], vnfr['short_name'], vnfr['member_vnf_index_ref'])

    nsr_obj.cfg_path_prefix = '{}/{}_{}'.format(
        nsr_obj.this_nsr_dir, vnfr['short_name'], vnfr['member_vnf_index_ref'])

    # Get vnf_configuration from vnfr
    vnf_config = vnfr['vnf_configuration']

    self._log.debug("vnf_configuration = %s", vnf_config)

    # 'config_attributes' may be absent altogether; treat that as "no attributes"
    config_attributes = vnf_config.get('config_attributes') or {}

    # Create priority dictionary
    cfg_priority_order = config_attributes.get('config_priority', 0)

    if cfg_priority_order not in nsr_obj.nsr_cfg_config_attributes_dict:
        # No VNFR with this priority yet, initialize the list
        nsr_obj.nsr_cfg_config_attributes_dict[cfg_priority_order] = []

    method = get_config_method(vnf_config)
    if method is not None:
        # Create all sub dictionaries first
        config_priority = {
            'id': vnfr['id'],
            'name': vnfr['short_name'],
            'member_vnf_index': vnfr['member_vnf_index_ref'],
        }

        if 'config_delay' in config_attributes:
            config_priority['configuration_delay'] = config_attributes['config_delay']
            vnf_cfg['config_delay'] = config_priority['configuration_delay']

        configuration_options = {}
        self._log.debug("config method=%s", method)
        config_priority['configuration_type'] = method
        vnf_cfg['config_method'] = method

        # Set config agent based on method
        self._config_agent_mgr.set_config_agent(
            nsr_obj.agent_nsr, vnf_cfg['agent_vnfr'], method)

        # Method specific options
        cfg_opt_list = [
            'port', 'target', 'script_type', 'ip_address', 'user', 'secret',
        ]
        for cfg_opt in cfg_opt_list:
            if cfg_opt in vnf_config[method]:
                configuration_options[cfg_opt] = vnf_config[method][cfg_opt]
                vnf_cfg[cfg_opt] = configuration_options[cfg_opt]

        # Access credentials
        cfg_opt_list = ['mgmt_ip_address', 'username', 'password']
        for cfg_opt in cfg_opt_list:
            if cfg_opt in vnf_config['config_access']:
                configuration_options[cfg_opt] = vnf_config['config_access'][cfg_opt]
                vnf_cfg[cfg_opt] = configuration_options[cfg_opt]

        # Add to the cp_dict
        vnf_cp_dict = nsr_obj._cp_dict[vnfr['member_vnf_index_ref']]
        vnf_cp_dict['rw_mgmt_ip'] = vnf_cfg['mgmt_ip_address']
        vnf_cp_dict['rw_username'] = vnf_cfg['username']
        vnf_cp_dict['rw_password'] = vnf_cfg['password']

        # TBD - see if we can neatly include the config in "config_attributes" file, no need though
        # Create config file
        vnf_cfg['juju_script'] = os.path.join(self._parent.cfg_dir, 'juju_if.py')

        if 'config_template' in vnf_config:
            vnf_cfg['cfg_template'] = '{}_{}_template.cfg'.format(
                nsr_obj.cfg_path_prefix, config_priority['configuration_type'])
            vnf_cfg['cfg_file'] = '{}.{}'.format(
                nsr_obj.cfg_path_prefix,
                get_cfg_file_extension(method, configuration_options))
            vnf_cfg['xlate_script'] = os.path.join(self._parent.cfg_dir, 'xlate_cfg.py')
            try:
                # Now write this template into file
                with open(vnf_cfg['cfg_template'], "w") as cf:
                    cf.write(vnf_config['config_template'])
            except Exception as e:
                self._log.error("Processing NSD, failed to generate configuration template : %s (Error : %s)",
                                vnf_config['config_template'], str(e))
                raise

        self._log.debug("VNF endpoint so far: %s", vnf_cfg)

        # Populate filled up dictionary
        config_priority['configuration_options'] = configuration_options
        nsr_obj.nsr_cfg_config_attributes_dict[cfg_priority_order].append(config_priority)
        nsr_obj.num_vnfs_to_cfg += 1
        # The VNFR can be looked up both by unique name and by id
        nsr_obj._vnfr_dict[vnf_cfg['vnf_unique_name']] = vnfr
        nsr_obj._vnfr_dict[vnfr['id']] = vnfr

        self._log.debug("VNF:(%s) config_attributes = %s",
                        log_this_vnf(vnfr['vnf_cfg']),
                        nsr_obj.nsr_cfg_config_attributes_dict)
    else:
        self._log.info("VNF:(%s) is not to be configured by Configuration Manager!",
                       log_this_vnf(vnfr['vnf_cfg']))
        yield from nsr_obj.update_vnf_cm_state(vnfr, conmanY.RecordState.READY_NO_CFG)

    # Update the cm-state
    nsr_obj.populate_vm_state_from_vnf_cfg()
458
@asyncio.coroutine
def config_NSR(self, id):
    """Collect the configuration of NSR ``id`` and schedule it for configuring.

    Steps, as far as this method is concerned:
      1. find or create the ``ConfigManagerNSR`` record for ``id``;
      2. if that record already left the INIT state, re-publish its
         cm-state and return True;
      3. fetch the NSR; return False when it is not ``running`` yet;
      4. fetch every constituent VNFR, add it to the record and process
         its VNF configuration;
      5. write the attributes and xlate-tag YAML files (debug reference
         only; a write failure is logged and ignored);
      6. walk the collected attributes in ascending priority order, call
         ``ConfigVNF`` for every known VNF, then hand the NSR to the parent.

    Args:
        id: NSR id.

    Returns:
        True when the NSR was processed (or had been processed before),
        False when the NSR is not running yet.

    Raises:
        Exception: any failure while processing; the NS cm-state is set to
            CFG_PROCESS_FAILED before the exception is re-raised.
    """

    def my_yaml_dump(config_attributes_dict, yf):
        # Dump the attributes keyed by priority
        yaml_dict = dict(sorted(config_attributes_dict.items()))
        yf.write(yaml.dump(yaml_dict, default_flow_style=False))

    nsr_dict = self._nsr_dict
    self._log.info("Configure NSR, id = %s", id)

    #####################TBD###########################
    # yield from self._config_agent_mgr.invoke_config_agent_plugins('notify_create_nsr', self.id, self._nsd)
    # yield from self._config_agent_mgr.invoke_config_agent_plugins('notify_nsr_active', self.id, self._vnfrs)

    try:
        if id not in nsr_dict:
            nsr_obj = ConfigManagerNSR(self._log, self._loop, self, id)
            nsr_dict[id] = nsr_obj
        else:
            self._log.info("NSR(%s) is already initialized!", id)
            nsr_obj = nsr_dict[id]
    except Exception as e:
        self._log.error("Failed creating NSR object for (%s) as (%s)", id, str(e))
        raise

    # Try to configure this NSR only if not already processed
    if nsr_obj.cm_nsr['state'] != nsr_obj.state_to_string(conmanY.RecordState.INIT):
        self._log.debug("NSR(%s) is already processed, state=%s",
                        nsr_obj.nsr_name, nsr_obj.cm_nsr['state'])
        yield from nsr_obj.publish_cm_state()
        return True

    cmdts_obj = self.cmdts_obj
    try:
        # Fetch NSR
        nsr = yield from cmdts_obj.get_nsr(id)
        self._log.debug("Full NSR : %s", nsr)
        if nsr['operational_status'] != "running":
            self._log.info("NSR(%s) is not ready yet!", nsr['nsd_name_ref'])
            return False
        # Read back by ConfigManagerNSR.set_config_dir() through its caller argument
        self._nsr = nsr

        # Create Agent NSR class
        nsr_config = yield from cmdts_obj.get_nsr_config(id)
        self._log.debug("NSR {} config: {}".format(id, nsr_config))
        nsr_obj.agent_nsr = riftcm_config_plugin.RiftCMnsr(nsr, nsr_config)

        try:
            yield from nsr_obj.update_ns_cm_state(conmanY.RecordState.RECEIVED)

            # Parse NSR
            if nsr is not None:
                nsr_obj.set_nsr_name(nsr['name_ref'])
                nsr_dir = os.path.join(self._parent.cfg_dir, nsr_obj.nsr_name)
                self._log.info("Checking NS config directory: %s", nsr_dir)
                if not os.path.isdir(nsr_dir):
                    os.makedirs(nsr_dir)
                    # self._log.critical("NS %s is not to be configured by Service Orchestrator!", nsr_obj.nsr_name)
                    # yield from nsr_obj.update_ns_cm_state(conmanY.RecordState.READY_NO_CFG)
                    # return

                nsr_obj.set_config_dir(self)

                for const_vnfr in nsr['constituent_vnfr_ref']:
                    self._log.debug("Fetching VNFR (%s)", const_vnfr['vnfr_id'])
                    vnfr_msg = yield from cmdts_obj.get_vnfr(const_vnfr['vnfr_id'])
                    if vnfr_msg:
                        vnfr = vnfr_msg.as_dict()
                        self._log.info("create VNF:{}/{}".format(nsr_obj.nsr_name, vnfr['short_name']))
                        agent_vnfr = yield from nsr_obj.add_vnfr(vnfr, vnfr_msg)

                        # Preserve order, self.process_nsd_vnf_configuration()
                        # sets up the config agent based on the method
                        yield from self.process_nsd_vnf_configuration(nsr_obj, vnfr)
                        yield from self._config_agent_mgr.invoke_config_agent_plugins(
                            'notify_create_vnfr',
                            nsr_obj.agent_nsr,
                            agent_vnfr)

                #####################TBD###########################
                # self._log.debug("VNF active. Apply initial config for vnfr {}".format(vnfr.name))
                # yield from self._config_agent_mgr.invoke_config_agent_plugins('apply_initial_config',
                #                                             vnfr.id, vnfr)
                # yield from self._config_agent_mgr.invoke_config_agent_plugins('notify_terminate_vnf', self.id, vnfr)

        except Exception as e:
            self._log.error("Failed processing NSR (%s) as (%s)", nsr_obj.nsr_name, str(e))
            self._log.exception(e)
            yield from nsr_obj.update_ns_cm_state(conmanY.RecordState.CFG_PROCESS_FAILED)
            raise

        try:
            # Generate config_config_attributes.yaml (For debug reference)
            with open(nsr_obj.config_attributes_file, "w") as yf:
                my_yaml_dump(nsr_obj.nsr_cfg_config_attributes_dict, yf)
        except Exception as e:
            self._log.error("NS:(%s) failed to write config attributes file as (%s)", nsr_obj.nsr_name, str(e))

        try:
            # Generate nsr_xlate_dict.yaml (For debug reference)
            with open(nsr_obj.xlate_dict_file, "w") as yf:
                yf.write(yaml.dump(nsr_obj._cp_dict, default_flow_style=False))
        except Exception as e:
            self._log.error("NS:(%s) failed to write nsr xlate tags file as (%s)", nsr_obj.nsr_name, str(e))

        self._log.debug("Starting to configure each VNF")

        # Check if this NS has input parameters
        self._log.info("Checking NS configuration order: %s", nsr_obj.config_attributes_file)

        if os.path.exists(nsr_obj.config_attributes_file):
            # Apply configuration in the specified order
            try:
                self._log.info("Using Dynamic configuration input parametrs for NS: %s", nsr_obj.nsr_name)

                # cfg_delay = nsr_obj.nsr_cfg_config_attributes_dict['configuration_delay']
                # if cfg_delay:
                #     self._log.info("Applying configuration delay for NS (%s) ; %d seconds",
                #                    nsr_obj.nsr_name, cfg_delay)
                #     yield from asyncio.sleep(cfg_delay, loop=self._loop)

                # The dictionary is filled in VNFR fetch order, so sort by
                # priority key explicitly instead of relying on dict order.
                by_priority = sorted(nsr_obj.nsr_cfg_config_attributes_dict.items())
                for _priority, config_attributes_dict in by_priority:
                    # Iterate through each priority level
                    for vnf_config_attributes_dict in config_attributes_dict:
                        # Iterate through each vnfr at this priority level

                        # Make up vnf_unique_name with vnfd name and member index
                        vnf_unique_name = get_vnf_unique_name(
                            nsr_obj.nsr_name,
                            vnf_config_attributes_dict['name'],
                            str(vnf_config_attributes_dict['member_vnf_index']),
                        )
                        self._log.info("NS (%s) : VNF (%s) - Processing configuration attributes",
                                       nsr_obj.nsr_name, vnf_unique_name)

                        # Find vnfr for this vnf_unique_name
                        if vnf_unique_name not in nsr_obj._vnfr_dict:
                            self._log.error("NS (%s) - Can not find VNF to be configured: %s", nsr_obj.nsr_name, vnf_unique_name)
                        else:
                            # Save this unique VNF's config input parameters
                            nsr_obj.vnf_config_attributes_dict[vnf_unique_name] = vnf_config_attributes_dict
                            nsr_obj.ConfigVNF(nsr_obj._vnfr_dict[vnf_unique_name])

                # Now add the entire NS to the pending config list.
                self._log.info("Scheduling NSR:{} configuration".format(nsr_obj.nsr_name))
                self._parent.add_to_pending(nsr_obj)
                self._parent.add_nsr_obj(nsr_obj)

            except Exception as e:
                self._log.error("Failed processing input parameters for NS (%s) as %s", nsr_obj.nsr_name, str(e))
                raise
        else:
            self._log.error("No configuration input parameters for NSR (%s)", nsr_obj.nsr_name)

    except Exception as e:
        self._log.error("Failed to configure NS (%s) as (%s)", nsr_obj.nsr_name, str(e))
        yield from nsr_obj.update_ns_cm_state(conmanY.RecordState.CFG_PROCESS_FAILED)
        raise

    return True
622
@asyncio.coroutine
def terminate_NSR(self, id):
    """Forget NSR ``id`` and withdraw everything published or queued for it.

    The NSR is removed from the pending-task queue, from ``self._nsr_dict``
    and from the global cm-state list; a matching entry in the parent's
    ``pending_cfg`` is flagged ``being_deleted``; the config agent plugins
    are notified for each VNFR (only when an agent NSR was ever created for
    this NSR); finally the NSR's cm-state is deleted.

    An unknown ``id`` is logged as an error and otherwise ignored.
    """
    nsr_dict = self._nsr_dict
    if id not in nsr_dict:
        self._log.error("NSR(%s) does not exist!", id)
        return

    # Remove this NSR if we have it on pending task list.  Iterate over a
    # snapshot because del_from_pending_tasks() mutates the list.
    for task in list(self.pending_tasks):
        if task['nsrid'] == id:
            self.del_from_pending_tasks(task)

    # Remove this object from global list
    nsr_obj = nsr_dict.pop(id, None)

    # Remove this NS cm-state from global status list
    self.cm_state['cm_nsr'].remove(nsr_obj.cm_nsr)

    # Also remove any scheduled configuration event
    for nsr_obj_p in self._parent.pending_cfg:
        if nsr_obj_p == nsr_obj:
            assert id == nsr_obj_p._nsr_id
            # Mark this as being deleted so we do not try to configure it
            # if we are in cfg_delay (will wake up and continue to process
            # otherwise)
            nsr_obj_p.being_deleted = True
            self._log.info("Removed scheduled configuration for NSR(%s)", nsr_obj.nsr_name)

    self._parent.remove_nsr_obj(id)

    # Call Config Agent to clean up for each VNF.  agent_nsr stays None
    # until config_NSR() has seen the NSR running, so there may be nothing
    # to clean up yet.
    if nsr_obj.agent_nsr is not None:
        for agent_vnfr in nsr_obj.agent_nsr.vnfrs:
            yield from self._config_agent_mgr.invoke_config_agent_plugins(
                'notify_terminate_vnfr',
                nsr_obj.agent_nsr,
                agent_vnfr)

    # publish delete cm-state (cm-nsr)
    yield from nsr_obj.delete_cm_nsr()

    #####################TBD###########################
    # yield from self._config_agent_mgr.invoke_config_agent_plugins('notify_terminate_ns', self.id)

    self._log.info("NSR(%s/%s) is deleted", nsr_obj.nsr_name, id)
666
@asyncio.coroutine
def process_initial_config(self, nsr_obj, conf, script, vnfr_name=None):
    '''Apply the initial-config-primitives specified in NSD or VNFD

    The primitive's parameters and a summary of every VNFR of the NS are
    written to a temporary YAML file; ``script`` is then run through the
    shell with that file as its only argument.  Both the script path and
    the file name are shell-quoted, so paths containing spaces or shell
    metacharacters are passed through literally.

    Args:
        nsr_obj: NSR record; ``nsr_name`` and ``vnfrs`` are read from it.
        conf: initial-config primitive as a dictionary; its ``parameter``
            list is optional.
        script: path of the script to run.
        vnfr_name: name of the VNFR when the primitive belongs to a VNF.

    Raises:
        InitialConfigError: when the script exits with a non-zero status.
    '''
    # Local import: only needed to quote the shell command built below
    import shlex

    def get_input_file(parameters):
        # Serialize the script input to a temporary YAML file, return its path
        inp = {}

        # Add NSR name to file
        inp['nsr_name'] = nsr_obj.nsr_name

        # Add VNFR name if available
        if vnfr_name:
            inp['vnfr_name'] = vnfr_name

        # TODO (pjoseph): Add config agents, we need to identify which all
        # config agents are required from this NS and provide only those
        inp['config-agent'] = {}

        # Add parameters for initial config
        inp['parameter'] = {}
        for parameter in parameters:
            try:
                inp['parameter'][parameter['name']] = parameter['value']
            except KeyError as e:
                if vnfr_name:
                    self._log.info("VNFR {} initial config parameter {} with no value: {}".
                                   format(vnfr_name, parameter, e))
                else:
                    self._log.info("NSR {} initial config parameter {} with no value: {}".
                                   format(nsr_obj.nsr_name, parameter, e))

        # Add vnfrs specific data
        inp['vnfr'] = {}
        for vnfr in nsr_obj.vnfrs:
            v = {}

            v['name'] = vnfr['name']
            v['mgmt_ip_address'] = vnfr['vnf_cfg']['mgmt_ip_address']
            v['mgmt_port'] = vnfr['vnf_cfg']['port']

            if 'dashboard_url' in vnfr:
                v['dashboard_url'] = vnfr['dashboard_url']

            if 'connection_point' in vnfr:
                v['connection_point'] = []
                for cp in vnfr['connection_point']:
                    v['connection_point'].append(
                        {
                            'name': cp['name'],
                            'ip_address': cp['ip_address'],
                        }
                    )

            # 'vdur' ends up as a one-element list holding the list of VDUs
            v['vdur'] = []
            vdu_data = []
            for vdu in vnfr['vdur']:
                d = {}
                for k in ['name', 'management_ip', 'vm_management_ip', 'id']:
                    if k in vdu:
                        d[k] = vdu[k]
                vdu_data.append(d)
            v['vdur'].append(vdu_data)

            inp['vnfr'][vnfr['member_vnf_index_ref']] = v

        self._log.debug("Input data for {}: {}".
                        format((vnfr_name if vnfr_name else nsr_obj.nsr_name),
                               inp))

        # Convert to YAML string
        yaml_string = yaml.dump(inp, default_flow_style=False)

        # Write the inputs as yaml file; the file is deliberately kept
        # (delete=False) so the script can read it after it is closed
        with tempfile.NamedTemporaryFile(delete=False) as tmp_file:
            tmp_file.write(yaml_string.encode("UTF-8"))
        self._log.debug("Input file created for {}: {}".
                        format((vnfr_name if vnfr_name
                                else nsr_obj.nsr_name),
                               tmp_file.name))

        return tmp_file.name

    parameters = []
    try:
        parameters = conf['parameter']
    except Exception as e:
        self._log.debug("Parameter conf: {}, e: {}".
                        format(conf, e))

    inp_file = get_input_file(parameters)

    # Quote both words: the script path comes from a descriptor and must
    # reach the shell as one literal argument.
    cmd = "{0} {1}".format(shlex.quote(script), shlex.quote(inp_file))
    self._log.debug("Running the CMD: {}".format(cmd))

    process = yield from asyncio.create_subprocess_shell(cmd,
                                                         loop=self._loop)
    yield from process.wait()

    if process.returncode:
        msg = "NSR/VNFR {} initial config using {} failed with {}". \
              format(vnfr_name if vnfr_name else nsr_obj.nsr_name,
                     script, process.returncode)
        self._log.error(msg)
        raise InitialConfigError(msg)
    else:
        # os.remove(inp_file)
        pass
776
def get_script_file(self, script_name, d_name, d_id, d_type):
    """Resolve ``script_name`` to a path and make sure its owner may execute it.

    A name starting with ``/`` is used as is.  Any other name is looked up
    under ``$RIFT_ARTIFACTS/launchpad/packages/<d_type>/<d_id>/<d_name>/scripts``
    and, when not found there, taken from ``$RIFT_INSTALL/usr/bin``.

    Returns:
        The resolved script path.
    """
    if script_name[0] == '/':
        # The script has full path, use as is
        script = script_name
    else:
        package_parts = (
            os.environ['RIFT_ARTIFACTS'],
            'launchpad/packages',
            d_type,
            d_id,
            d_name,
            'scripts',
            script_name,
        )
        script = os.path.join(*package_parts)
        self._log.debug("Checking for script at %s", script)
        if not os.path.exists(script):
            self._log.debug("Did not find script %s", script)
            script = os.path.join(os.environ['RIFT_INSTALL'],
                                  'usr/bin',
                                  script_name)

    # Seen cases in jenkins, where the script execution fails
    # with permission denied. Setting the permission on script
    # to make sure it has execute permission
    mode = os.stat(script).st_mode
    if (mode & stat.S_IXUSR) == 0:
        self._log.warn("NSR/VNFR {} initial config script {} " \
                       "without execute permission: {}".
                       format(d_name, script, mode))
        os.chmod(script, mode | stat.S_IXUSR)
    return script
809
@asyncio.coroutine
def process_ns_initial_config(self, nsr_obj):
    '''Apply the initial-config-primitives specified in NSD

    Nothing is done when the NSR cannot be fetched or carries no
    ``initial_config_primitive`` entry.  Otherwise the script of every
    primitive is resolved through ``get_script_file`` and run through
    ``process_initial_config``, in list order.
    '''

    nsr = yield from self.cmdts_obj.get_nsr(nsr_obj.nsr_id)
    # Check for a missing NSR first: a membership test on None raises TypeError
    if nsr is None or 'initial_config_primitive' not in nsr:
        return

    nsd = yield from self.cmdts_obj.get_nsd(nsr_obj.nsr_id)
    for conf in nsr['initial_config_primitive']:
        self._log.debug("NSR {} initial config: {}".
                        format(nsr_obj.nsr_name, conf))
        script = self.get_script_file(conf['user_defined_script'],
                                      nsd.name,
                                      nsd.id,
                                      'nsd')

        yield from self.process_initial_config(nsr_obj, conf, script)
829
@asyncio.coroutine
def process_vnf_initial_config(self, nsr_obj, vnfr):
    '''Apply the initial-config-primitives specified in VNFD

    Primitives without a ``user_defined_script`` are skipped.  For the
    others the script is resolved through ``get_script_file`` and run
    through ``process_initial_config`` with the VNFR name attached.

    ``get_script_file`` takes the descriptor name before the descriptor id
    (``d_name, d_id``); the arguments are passed in that order here, the
    same way ``process_ns_initial_config`` passes them for the NSD.
    '''

    vnfr_name = vnfr.name

    vnfd = vnfr.vnfd
    vnf_cfg = vnfd.vnf_configuration

    for conf in vnf_cfg.initial_config_primitive:
        self._log.debug("VNFR {} initial config: {}".
                        format(vnfr_name, conf))

        if not conf.user_defined_script:
            self._log.debug("VNFR {} did not fine user defined script: {}".
                            format(vnfr_name, conf))
            continue

        script = self.get_script_file(conf.user_defined_script,
                                      vnfd.name,
                                      vnfd.id,
                                      'vnfd')

        yield from self.process_initial_config(nsr_obj,
                                               conf.as_dict(),
                                               script,
                                               vnfr_name=vnfr_name)
857
858
859 class ConfigManagerNSR(object):
def __init__(self, log, loop, parent, id):
    """Create the bookkeeping record for NSR ``id``.

    The record starts in the INIT state under the placeholder name
    'Not Set', and its cm-state dictionary is appended to the parent's
    global ``cm_state['cm_nsr']`` list.
    """
    self._log, self._loop, self._parent = log, loop, parent
    self._nsr_id = id
    self._rwcal = None
    self._vnfr_dict = {}
    self._cp_dict = {}
    self._log.info("Instantiated NSR entry for id=%s", id)

    self.nsr_cfg_config_attributes_dict = {}
    self.vnf_config_attributes_dict = {}
    self.num_vnfs_to_cfg = 0
    self._vnfr_list = []
    self.vnf_cfg_list = []
    self.this_nsr_dir = None
    self.being_deleted = False
    self.dts_obj = self._parent.cmdts_obj

    # Initialize cm-state for this NS
    self.cm_nsr = {
        'cm_vnfr': [],
        'id': id,
        'state': self.state_to_string(conmanY.RecordState.INIT),
        'state_details': None,
    }

    self.set_nsr_name('Not Set')

    # Add this NSR cm-state object to global cm-state
    parent.cm_state['cm_nsr'].append(self.cm_nsr)

    # Place holders for NSR & VNFR classes
    self.agent_nsr = None
892
@property
def nsr_opdata_xpath(self):
    ''' Full xpath of the cm-state opdata entry of this NSR '''
    template = "D,/rw-conman:cm-state/rw-conman:cm-nsr[rw-conman:id='{}']"
    return template.format(self._nsr_id)
900
@property
def vnfrs(self):
    """The internal VNFR list itself (not a copy)."""
    return self._vnfr_list
904
@property
def parent(self):
    """The parent object given to the constructor."""
    return self._parent
908
@property
def nsr_id(self):
    """The id this NSR record was created with."""
    return self._nsr_id
912
@asyncio.coroutine
def publish_cm_state(self):
    ''' Publish the cm_state of this NSR at its opdata xpath '''

    opdata = conmanY.CmOpdata()
    nsr_entry = opdata.cm_nsr.add()
    nsr_entry.from_dict(self.cm_nsr)
    xpath = self.nsr_opdata_xpath
    yield from self.dts_obj.update(xpath, nsr_entry)
    self._log.info("Published cm-state with xpath %s and nsr %s",
                   xpath,
                   nsr_entry)
925
@asyncio.coroutine
def delete_cm_nsr(self):
    ''' Delete the cm-state entry of this NSR at its opdata xpath '''

    xpath = self.nsr_opdata_xpath
    yield from self.dts_obj.delete(xpath)
    self._log.info("Deleted cm-nsr with xpath %s",
                   xpath)
933
def set_nsr_name(self, name):
    """Store ``name`` on the record and in its cm-state dictionary."""
    self.nsr_name = self.cm_nsr['name'] = name
937
def set_config_dir(self, caller):
    """Create the per-NSR configuration directory and derive the file paths.

    The directory is ``<cfg_dir>/<nsr_name>/<name_ref>``, with ``cfg_dir``
    taken from ``caller._parent`` and ``name_ref`` from ``caller._nsr``.
    """
    ns_ref = caller._nsr['name_ref']
    nsr_dir = os.path.join(caller._parent.cfg_dir, self.nsr_name, ns_ref)
    self.this_nsr_dir = nsr_dir
    if not os.path.exists(nsr_dir):
        os.makedirs(nsr_dir)
        self._log.debug("NSR:(%s), Created configuration directory(%s)",
                        ns_ref, nsr_dir)
    self.config_attributes_file = os.path.join(
        nsr_dir, "configuration_config_attributes.yml")
    self.xlate_dict_file = os.path.join(nsr_dir, "nsr_xlate_dict.yml")
947
def xlate_conf(self, vnfr, vnf_cfg):
    """Fill in the template/config/xlate-script paths of ``vnf_cfg``.

    When ``vnf_cfg['interface_type']`` is None it is first taken, together
    with any ``configuration_options``, from the attributes saved for this
    VNF in ``self.vnf_config_attributes_dict``.

    Returns:
        True when the configuration template file exists, False otherwise.
    """
    # If configuration type is not already set, try to read from attributes
    if vnf_cfg['interface_type'] is None:
        # Prepare unique name for this VNF
        unique_name = get_vnf_unique_name(
            vnf_cfg['nsr_name'],
            vnfr['short_name'],
            vnfr['member_vnf_index_ref'],
        )

        # Find this particular (unique) VNF's config attributes
        if unique_name in self.vnf_config_attributes_dict:
            attributes = self.vnf_config_attributes_dict[unique_name]
            vnf_cfg['interface_type'] = attributes['configuration_type']
            if 'configuration_options' in attributes:
                vnf_cfg.update(attributes['configuration_options'])

    cfg_dir = self._parent._parent.cfg_dir
    cfg_path_prefix = '{}/{}/{}_{}'.format(
        cfg_dir,
        vnf_cfg['nsr_name'],
        vnfr['short_name'],
        vnfr['member_vnf_index_ref'],
    )

    template_path = '{}_{}_template.cfg'.format(cfg_path_prefix, vnf_cfg['interface_type'])
    vnf_cfg['cfg_template'] = template_path
    vnf_cfg['cfg_file'] = '{}.cfg'.format(cfg_path_prefix)
    vnf_cfg['xlate_script'] = cfg_dir + '/xlate_cfg.py'

    self._log.debug("VNF endpoint so far: %s", vnf_cfg)

    self._log.info("Checking cfg_template %s", template_path)
    return os.path.exists(template_path)
985
def ConfigVNF(self, vnfr):
    """Translate the configuration template of one VNF and mark it scheduled.

    Progress is reported through the 'state' field of the VNF's cm-state:
    it is left untouched when the VNF is already READY / READY_NO_CFG, set
    to CFG_PROCESS_FAILED when the translation step fails, and set to
    CFG_SCHED otherwise.

    Args:
        vnfr: VNF record mapping; must provide 'vnf_cfg' and 'short_name'.

    Returns:
        None.
    """
    vnf_cfg = vnfr['vnf_cfg']
    vnf_cm_state = self.find_or_create_vnfr_cm_state(vnf_cfg)

    if (vnf_cm_state['state'] == self.state_to_string(conmanY.RecordState.READY_NO_CFG)
            or
            vnf_cm_state['state'] == self.state_to_string(conmanY.RecordState.READY)):
        self._log.warning("NS/VNF (%s/%s) is already configured! Skipped.", self.nsr_name, vnfr['short_name'])
        return

    # Update VNF state
    vnf_cm_state['state'] = self.state_to_string(conmanY.RecordState.CFG_PROCESS)

    # Now translate the configuration for IP addresses
    try:
        # Add cp_dict members (TAGS) for this VNF
        self._cp_dict['rw_mgmt_ip'] = vnf_cfg['mgmt_ip_address']
        self._cp_dict['rw_username'] = vnf_cfg['username']
        self._cp_dict['rw_password'] = vnf_cfg['password']
        ############################################################
        # TBD - Need to lookup above 3 for a given VNF, not global #
        # Once we do that no need to dump below file again before  #
        # each VNF configuration translation.                      #
        # This will require all existing config templates to be    #
        # changed for above three tags to include member index     #
        ############################################################

        # Looked up outside the inner try so that its error handler can
        # always refer to nsr_obj; a missing key is handled by the outer
        # handler, which marks the VNF as failed.
        nsr_obj = vnf_cfg['nsr_obj']
        try:
            # Dump the translation tags (kept on disk for debug reference)
            with open(nsr_obj.xlate_dict_file, "w") as yf:
                yf.write(yaml.dump(nsr_obj._cp_dict, default_flow_style=False))
        except Exception as e:
            # Best effort: a failed dump is logged, translation still runs
            self._log.error("NS:(%s) failed to write nsr xlate tags file as (%s)", nsr_obj.nsr_name, str(e))

        if 'cfg_template' in vnf_cfg:
            # Run the translation script with an argument vector and no
            # shell: the paths are built from NS/VNF names, so they must
            # not be subject to shell word splitting or expansion.
            script_cmd = [
                'python3', vnf_cfg['xlate_script'],
                '-i', vnf_cfg['cfg_template'],
                '-o', vnf_cfg['cfg_file'],
                '-x', self.xlate_dict_file,
            ]
            self._log.debug("xlate script command (%s)", script_cmd)
            xlate_msg = subprocess.check_output(script_cmd).decode('utf-8')
            self._log.info("xlate script output (%s)", xlate_msg)
    except Exception as e:
        vnf_cm_state['state'] = self.state_to_string(conmanY.RecordState.CFG_PROCESS_FAILED)
        self._log.error("Failed to execute translation script for VNF: %s with (%s)", log_this_vnf(vnf_cfg), str(e))
        return

    self._log.info("Applying config to VNF: %s = %s!", log_this_vnf(vnf_cfg), vnf_cfg)
    try:
        self._log.debug("Scheduled configuration!")
        vnf_cm_state['state'] = self.state_to_string(conmanY.RecordState.CFG_SCHED)
    except Exception as e:
        self._log.error("Failed apply_vnf_config to VNF: %s as (%s)", log_this_vnf(vnf_cfg), str(e))
        vnf_cm_state['state'] = self.state_to_string(conmanY.RecordState.CFG_PROCESS_FAILED)
        raise
1041
def add(self, nsr):
    """Attach the NS record *nsr* to this object (stored as ``_nsr``).

    Args:
        nsr: NS record to remember.
    """
    # Log this NSR's own id; the bare name `id` here is the Python builtin.
    self._log.info("Adding NS Record for id=%s", self._nsr_id)
    self._nsr = nsr
1045
def sample_cm_state(self):
    """Return a hard-coded example of the cm-state dictionary layout.

    The sample holds one NSR with two VNFRs; no instance state is read.
    """
    first_vnfr = {
        'cfg_location': 'location1',
        'cfg_type': 'script',
        'connection_point': [
            {'ip_address': '1.1.1.1', 'name': 'vnf1cp1'},
            {'ip_address': '1.1.1.2', 'name': 'vnf1cp2'},
        ],
        'id': 'vnfrid1',
        'mgmt_interface': {'ip_address': '7.1.1.1', 'port': 1001},
        'name': 'vnfrname1',
        'state': 'init',
    }
    second_vnfr = {
        'cfg_location': 'location2',
        'cfg_type': 'netconf',
        'connection_point': [
            {'ip_address': '2.1.1.1', 'name': 'vnf2cp1'},
            {'ip_address': '2.1.1.2', 'name': 'vnf2cp2'},
        ],
        'id': 'vnfrid2',
        'mgmt_interface': {'ip_address': '7.1.1.2', 'port': 1001},
        'name': 'vnfrname2',
        'state': 'init',
    }
    sample_nsr = {
        'cm_vnfr': [first_vnfr, second_vnfr],
        'id': 'nsrid1',
        'name': 'nsrname1',
        'state': 'init',
    }
    return {
        'cm_nsr': [sample_nsr],
        'states': 'Initialized, ',
    }
1082
def populate_vm_state_from_vnf_cfg(self):
    """Copy management, configuration and connection-point data into cm-state.

    For every VNFR in ``_vnfr_list`` that already has a cm-state entry, the
    entry is refreshed from the VNFR's 'vnf_cfg' and one connection-point
    record is appended per entry of the VNFR's 'connection_point' list.
    VNFRs without a cm-state entry are skipped.
    """
    for vnfr in self._vnfr_list:
        vnf_cfg = vnfr['vnf_cfg']
        cm_entry = self.find_vnfr_cm_state(vnfr['id'])
        if not cm_entry:
            continue

        # VNF management interface
        mgmt = cm_entry['mgmt_interface']
        mgmt['ip_address'] = vnf_cfg['mgmt_ip_address']
        mgmt['port'] = vnf_cfg['port']

        # VNF configuration details
        cm_entry['cfg_type'] = vnf_cfg['config_method']
        cm_entry['cfg_location'] = vnf_cfg['cfg_file']

        # Connection points of this VNF (appended on every call)
        if "connection_point" not in vnfr:
            continue
        for cp in vnfr['connection_point']:
            cp_record = {
                'name': cp['name'],
                'ip_address': cp['ip_address'],
            }
            cm_entry['connection_point'].append(cp_record)
1109
def state_to_string(self, state):
    """Map a ``conmanY.RecordState`` value to its lower-case cm-state name.

    Args:
        state: one of the ``conmanY.RecordState`` members listed below.

    Returns:
        The string stored in cm-state for that state.

    Raises:
        KeyError: if *state* is not one of the listed members.
    """
    record_state = conmanY.RecordState
    names_by_state = dict((
        (record_state.INIT, "init"),
        (record_state.RECEIVED, "received"),
        (record_state.CFG_PROCESS, "cfg_process"),
        (record_state.CFG_PROCESS_FAILED, "cfg_process_failed"),
        (record_state.CFG_SCHED, "cfg_sched"),
        (record_state.CFG_DELAY, "cfg_delay"),
        (record_state.CONNECTING, "connecting"),
        (record_state.FAILED_CONNECTION, "failed_connection"),
        (record_state.NETCONF_CONNECTED, "netconf_connected"),
        (record_state.NETCONF_SSH_CONNECTED, "netconf_ssh_connected"),
        (record_state.RESTCONF_CONNECTED, "restconf_connected"),
        (record_state.CFG_SEND, "cfg_send"),
        (record_state.CFG_FAILED, "cfg_failed"),
        (record_state.READY_NO_CFG, "ready_no_cfg"),
        (record_state.READY, "ready"),
    ))
    return names_by_state[state]
1129
def find_vnfr_cm_state(self, id):
    """Return the cm-state entry whose 'id' equals *id*, or None.

    Args:
        id: VNFR identifier to look for in ``cm_nsr['cm_vnfr']``.
    """
    entries = self.cm_nsr['cm_vnfr']
    if not entries:
        return None
    return next((entry for entry in entries if entry['id'] == id), None)
1136
def find_or_create_vnfr_cm_state(self, vnf_cfg):
    """Return the cm-state entry of the VNF described by *vnf_cfg*.

    When no entry exists yet, one is created in the RECEIVED state from the
    values in *vnf_cfg* and appended to ``cm_nsr['cm_vnfr']``. Nothing is
    published here.

    Args:
        vnf_cfg: VNF configuration dictionary; its 'vnfr' entry supplies the
            'id' and 'short_name' of the VNF.
    """
    vnfr = vnf_cfg['vnfr']
    existing = self.find_vnfr_cm_state(vnfr['id'])
    if existing is not None:
        return existing

    # Not found: create and initialize this VNF's cm-state
    mgmt_interface = {
        'ip_address': vnf_cfg['mgmt_ip_address'],
        'port': vnf_cfg['port'],
    }
    created = {
        'id': vnfr['id'],
        'name': vnfr['short_name'],
        'state': self.state_to_string(conmanY.RecordState.RECEIVED),
        'mgmt_interface': mgmt_interface,
        'cfg_type': vnf_cfg['config_method'],
        'cfg_location': vnf_cfg['cfg_file'],
        'connection_point': [],
    }
    self.cm_nsr['cm_vnfr'].append(created)
    return created
1162
@asyncio.coroutine
def get_vnf_cm_state(self, vnfr):
    """Return the cm-state 'state' string of *vnfr*.

    Returns False when *vnfr* is falsy or has no cm-state entry.
    """
    if not vnfr:
        return False
    cm_entry = self.find_vnfr_cm_state(vnfr['id'])
    return cm_entry['state'] if cm_entry else False
1170
@asyncio.coroutine
def update_vnf_cm_state(self, vnfr, state):
    """Move the cm-state of *vnfr* to *state* and publish the change.

    Nothing is published when the VNF is already in that state. A missing
    VNFR or a missing cm-state entry is logged as an error and ignored.

    Args:
        vnfr: VNF record mapping ('id', 'short_name', 'member_vnf_index_ref').
        state: value accepted by ``state_to_string``.
    """
    if not vnfr:
        self._log.error("No VNFR supplied for state update (NS=%s)!",
                        self.nsr_name)
        return

    cm_entry = self.find_vnfr_cm_state(vnfr['id'])
    if cm_entry is None:
        self._log.error("No opdata found for NS/VNF:%s/%s!",
                        self.nsr_name, vnfr['short_name'])
        return

    previous = cm_entry['state']
    target = self.state_to_string(state)
    if previous == target:
        return

    cm_entry['state'] = target
    # Publish new state
    yield from self.publish_cm_state()
    message = "VNF ({}/{}/{}) state change: {} -> {}".format(
        self.nsr_name,
        vnfr['short_name'],
        vnfr['member_vnf_index_ref'],
        previous,
        cm_entry['state'])
    self._log.info(message)
1195
@property
def get_ns_cm_state(self):
    """Current 'state' string stored in this NSR's cm-state dictionary."""
    ns_state = self.cm_nsr['state']
    return ns_state
1199
@asyncio.coroutine
def update_ns_cm_state(self, state, state_details=None):
    """Move this NSR's cm-state to *state* and publish the change.

    Nothing happens when the NSR is already in that state; in particular
    *state_details* is only stored together with a state change.

    Args:
        state: value accepted by ``state_to_string``.
        state_details: optional detail stored under 'state_details'.
    """
    previous = self.cm_nsr['state']
    target = self.state_to_string(state)
    if previous == target:
        return

    self.cm_nsr['state'] = target
    self.cm_nsr['state_details'] = state_details
    message = "NS ({}) state change: {} -> {}".format(
        self.nsr_name,
        previous,
        self.cm_nsr['state'])
    self._log.info(message)
    # Publish new state
    yield from self.publish_cm_state()
1212
@asyncio.coroutine
def add_vnfr(self, vnfr, vnfr_msg):
    """Register a VNF record with this NSR and collect its translation tags.

    The VNFR is indexed by id and by unique name, a default 'vnf_cfg'
    dictionary is attached to it, a cm-state entry is created if needed and
    the connection-point addresses and VLR subnets are merged into
    ``_cp_dict``, both globally and under the VNF's member index.

    Args:
        vnfr: VNF record mapping; updated in place (gains 'vnf_cfg').
        vnfr_msg: VNFR message; its ``mgmt_interface`` supplies the
            management address and port when available.

    Returns:
        The object returned by ``agent_nsr.add_vnfr(vnfr, vnfr_msg)``.
    """

    @asyncio.coroutine
    def populate_subnets_from_vlr(vlr_id):
        """Best effort: merge the assigned subnet of one VLR into _cp_dict."""
        try:
            # Populate cp_dict with VLR subnet info
            vlr = yield from self.dts_obj.get_vlr(vlr_id)
            if vlr is not None and 'assigned_subnet' in vlr:
                subnet = {vlr.name: vlr.assigned_subnet}
                self._cp_dict[vnfr['member_vnf_index_ref']].update(subnet)
                self._cp_dict.update(subnet)
                self._log.debug("VNF:(%s) Updated assigned subnet = %s",
                                vnfr['short_name'], subnet)
        except Exception as e:
            self._log.error("VNF:(%s) VLR Error = %s",
                            vnfr['short_name'], e)

    if vnfr['id'] not in self._vnfr_dict:
        self._log.info("NSR(%s) : Adding VNF Record for name=%s, id=%s", self._nsr_id, vnfr['short_name'], vnfr['id'])
        # Add this vnfr to the list for show, or single traversal
        self._vnfr_list.append(vnfr)
    else:
        self._log.warning("NSR(%s) : VNF Record for name=%s, id=%s already exists, overwriting", self._nsr_id, vnfr['short_name'], vnfr['id'])

    # Make vnfr available by id as well as by name
    unique_name = get_vnf_unique_name(self.nsr_name, vnfr['short_name'], vnfr['member_vnf_index_ref'])
    self._vnfr_dict[unique_name] = vnfr
    self._vnfr_dict[vnfr['id']] = vnfr

    # Create vnf_cfg dictionary with default values
    vnf_cfg = {
        'nsr_obj' : self,
        'vnfr' : vnfr,
        'agent_vnfr' : self.agent_nsr.add_vnfr(vnfr, vnfr_msg),
        'nsr_name' : self.nsr_name,
        'nsr_id' : self._nsr_id,
        'vnfr_name' : vnfr['short_name'],
        'member_vnf_index' : vnfr['member_vnf_index_ref'],
        'port' : 0,
        'username' : 'admin',
        'password' : 'admin',
        'config_method' : 'None',
        'protocol' : 'None',
        'mgmt_ip_address' : '0.0.0.0',
        'cfg_file' : 'None',
        'cfg_retries' : 0,
        'script_type' : 'bash',
    }

    # Update the mgmt ip address
    # In case the config method is none, this is not
    # updated later
    try:
        vnf_cfg['mgmt_ip_address'] = vnfr_msg.mgmt_interface.ip_address
        vnf_cfg['port'] = vnfr_msg.mgmt_interface.port
    except Exception as e:
        # Logger.warn is a deprecated alias; the rest of this class
        # already uses warning().
        self._log.warning(
            "VNFR {}({}), unable to retrieve mgmt ip address: {}".
            format(vnfr['short_name'], vnfr['id'], e))

    vnfr['vnf_cfg'] = vnf_cfg
    self.find_or_create_vnfr_cm_state(vnf_cfg)

    # Build the connection-points list for this VNF (self._cp_dict)
    # Populate global CP list self._cp_dict from VNFR
    cp_list = []
    if 'connection_point' in vnfr:
        cp_list = vnfr['connection_point']

    self._cp_dict[vnfr['member_vnf_index_ref']] = {}
    if 'vdur' in vnfr:
        for vdur in vnfr['vdur']:
            if 'internal_connection_point' in vdur:
                # NOTE(review): when the VNFR has a 'connection_point' list,
                # this += extends that very list in place, so internal
                # connection points become part of vnfr['connection_point'].
                # Confirm whether that is intended.
                cp_list += vdur['internal_connection_point']

    for cp_item_dict in cp_list:
        # Populate global dictionary
        self._cp_dict[
            cp_item_dict['name']
        ] = cp_item_dict['ip_address']

        # Populate unique member specific dictionary
        self._cp_dict[
            vnfr['member_vnf_index_ref']
        ][
            cp_item_dict['name']
        ] = cp_item_dict['ip_address']

        # Fill in the subnets from vlr
        if 'vlr_ref' in cp_item_dict:
            ### HACK: Internal connection_point do not have VLR reference
            yield from populate_subnets_from_vlr(cp_item_dict['vlr_ref'])

    if 'internal_vlr' in vnfr:
        for ivlr in vnfr['internal_vlr']:
            yield from populate_subnets_from_vlr(ivlr['vlr_ref'])

    # Update vnfr
    vnf_cfg['agent_vnfr']._vnfr = vnfr
    return vnf_cfg['agent_vnfr']
1316
1317
class XPaths(object):
    """Builders for the DTS xpath strings used in this module.

    Each builder takes an optional key *k*. ``nsr_opdata``, ``nsd_msg``,
    ``vnfr_opdata``, ``vnfd`` and ``config_agent`` return the un-keyed list
    path when *k* is None and append a key predicate otherwise.
    ``nsr_config`` and ``vlr`` return an empty string when *k* is None.
    """

    @staticmethod
    def nsr_opdata(k=None):
        """NSR opdata path, optionally keyed by ns-instance-config-ref."""
        return ("D,/nsr:ns-instance-opdata/nsr:nsr" +
                ("[nsr:ns-instance-config-ref='{}']".format(k) if k is not None else ""))

    @staticmethod
    def nsd_msg(k=None):
        """NSD catalog path, optionally keyed by NSD id."""
        # The conditional must be parenthesised: `+` binds tighter than
        # `if ... else`, so without the parentheses the whole concatenation
        # was the "true" branch and k=None produced "" instead of the
        # un-keyed catalog path.
        return ("C,/nsd:nsd-catalog/nsd:nsd" +
                ("[nsd:id = '{}']".format(k) if k is not None else ""))

    @staticmethod
    def vnfr_opdata(k=None):
        """VNFR catalog path, optionally keyed by VNFR id."""
        return ("D,/vnfr:vnfr-catalog/vnfr:vnfr" +
                ("[vnfr:id='{}']".format(k) if k is not None else ""))

    @staticmethod
    def vnfd(k=None):
        """VNFD catalog path, optionally keyed by VNFD id."""
        return ("C,/vnfd:vnfd-catalog/vnfd:vnfd" +
                ("[vnfd:id='{}']".format(k) if k is not None else ""))

    @staticmethod
    def config_agent(k=None):
        """Config-agent account path, optionally keyed by account name."""
        return ("D,/rw-config-agent:config-agent/rw-config-agent:account" +
                ("[rw-config-agent:name='{}']".format(k) if k is not None else ""))

    @staticmethod
    def nsr_config(k=None):
        """NSR config path keyed by NSR id; empty string when *k* is None."""
        return ("C,/nsr:ns-instance-config/nsr:nsr[nsr:id='{}']".format(k) if k is not None else "")

    @staticmethod
    def vlr(k=None):
        """VLR catalog path keyed by VLR id; empty string when *k* is None."""
        return ("D,/vlr:vlr-catalog/vlr:vlr[vlr:id='{}']".format(k) if k is not None else "")
1351
class ConfigManagerDTS(object):
    ''' This class either reads from DTS or publishes to DTS '''

    def __init__(self, log, loop, parent, dts):
        """Store the collaborators; no DTS registration happens here.

        ``dts_pub_hdl`` and ``dts_reg_hdl`` only exist after
        ``register_to_publish`` / ``register_for_nsr`` have run.
        """
        self._log = log
        self._loop = loop
        self._parent = parent
        self._dts = dts

    @asyncio.coroutine
    def _read_dts(self, xpath, do_trace=False):
        """Read *xpath* from DTS and return the list of non-empty results.

        Args:
            xpath: DTS path to query.
            do_trace: accepted for the callers' sake; not used here.

        Returns:
            List of ``result.result`` values. Reading is best effort: when
            iterating the results fails, the failure is logged and whatever
            was collected so far is returned.
        """
        self._log.debug("_read_dts path = %s", xpath)
        flags = rwdts.XactFlag.MERGE
        res_iter = yield from self._dts.query_read(
            xpath, flags=flags
        )

        results = []
        try:
            for i in res_iter:
                result = yield from i
                if result is not None:
                    results.append(result.result)
        except Exception as e:
            # Keep the best-effort behaviour, but neither hide the failure
            # nor intercept non-Exception signals as a bare except would.
            self._log.warning("_read_dts: reading %s failed with (%s)",
                              xpath, str(e))

        return results

    @asyncio.coroutine
    def get_nsr(self, id):
        """Return the NSR opdata for *id* as a dict, or None if not found."""
        self._log.debug("Attempting to get NSR: %s", id)
        nsrl = yield from self._read_dts(XPaths.nsr_opdata(id), False)
        nsr = None
        if len(nsrl) > 0:
            nsr = nsrl[0].as_dict()
        return nsr

    @asyncio.coroutine
    def get_nsr_config(self, id):
        """Return the NSR config message for *id*, or None if not found."""
        self._log.debug("Attempting to get config NSR: %s", id)
        nsrl = yield from self._read_dts(XPaths.nsr_config(id), False)
        nsr = None
        if len(nsrl) > 0:
            nsr = nsrl[0]
        return nsr

    @asyncio.coroutine
    def get_nsd_msg(self, id):
        """Return the NSD message for *id*, or None if not found."""
        self._log.debug("Attempting to get NSD: %s", id)
        nsdl = yield from self._read_dts(XPaths.nsd_msg(id), False)
        nsd_msg = None
        if len(nsdl) > 0:
            nsd_msg = nsdl[0]
        return nsd_msg

    @asyncio.coroutine
    def get_nsd(self, nsr_id):
        """Return the ``nsd`` attribute of the NSR config for *nsr_id*.

        Raises:
            AttributeError: when no NSR config is found, because
                ``get_nsr_config`` then returns None.
        """
        # Log the requested NSR id; the bare name `id` is the Python builtin.
        self._log.debug("Attempting to get NSD for NSR: %s", nsr_id)
        nsr_config = yield from self.get_nsr_config(nsr_id)
        nsd_msg = nsr_config.nsd
        return nsd_msg

    @asyncio.coroutine
    def get_vnfr(self, id):
        """Return the VNFR message for *id*, or None if not found."""
        self._log.debug("Attempting to get VNFR: %s", id)
        vnfrl = yield from self._read_dts(XPaths.vnfr_opdata(id), do_trace=False)
        vnfr_msg = None
        if len(vnfrl) > 0:
            vnfr_msg = vnfrl[0]
        return vnfr_msg

    @asyncio.coroutine
    def get_vnfd(self, vnfd_id):
        """Return the VNFD message for *vnfd_id*, or None if not found."""
        self._log.debug("Attempting to get VNFD: %s", vnfd_id)
        vnfdl = yield from self._read_dts(XPaths.vnfd(vnfd_id), do_trace=False)
        vnfd_msg = None
        if len(vnfdl) > 0:
            vnfd_msg = vnfdl[0]
        return vnfd_msg

    @asyncio.coroutine
    def get_vlr(self, id):
        """Return the VLR message for *id*, or None if not found."""
        self._log.debug("Attempting to get VLR subnet: %s", id)
        vlrl = yield from self._read_dts(XPaths.vlr(id), do_trace=True)
        vlr_msg = None
        if len(vlrl) > 0:
            vlr_msg = vlrl[0]
        return vlr_msg

    @asyncio.coroutine
    def get_config_agents(self, name):
        """Return the list of config-agent account results for *name*."""
        self._log.debug("Attempting to get config_agents: %s", name)
        cfgagentl = yield from self._read_dts(XPaths.config_agent(name), False)
        return cfgagentl

    @asyncio.coroutine
    def update(self, path, msg, flags=rwdts.XactFlag.REPLACE):
        """
        Update a cm-state (cm-nsr) record in DTS with the path and message
        """
        self._log.debug("Updating cm-state %s:%s dts_pub_hdl = %s", path, msg, self.dts_pub_hdl)
        self.dts_pub_hdl.update_element(path, msg, flags)
        self._log.debug("Updated cm-state, %s:%s", path, msg)

    @asyncio.coroutine
    def delete(self, path):
        """
        Delete cm-nsr record in DTS with the path only
        """
        self._log.debug("Deleting cm-nsr %s dts_pub_hdl = %s", path, self.dts_pub_hdl)
        self.dts_pub_hdl.delete_element(path)
        self._log.debug("Deleted cm-nsr, %s", path)

    @asyncio.coroutine
    def register(self):
        """Register the publisher first, then the NSR subscriber."""
        yield from self.register_to_publish()
        yield from self.register_for_nsr()

    @asyncio.coroutine
    def register_to_publish(self):
        ''' Register to DTS for publishing cm-state opdata '''

        xpath = "D,/rw-conman:cm-state/rw-conman:cm-nsr"
        self._log.debug("Registering to publish cm-state @ %s", xpath)
        hdl = rift.tasklets.DTS.RegistrationHandler()
        with self._dts.group_create() as group:
            self.dts_pub_hdl = group.register(xpath=xpath,
                                              handler=hdl,
                                              flags=rwdts.Flag.PUBLISHER | rwdts.Flag.NO_PREP_READ)

    @property
    def nsr_xpath(self):
        """Un-keyed NSR opdata path used for the NSR subscription."""
        return "D,/nsr:ns-instance-opdata/nsr:nsr"

    @asyncio.coroutine
    def register_for_nsr(self):
        """ Register for NSR changes """

        @asyncio.coroutine
        def on_prepare(xact_info, query_action, ks_path, msg):
            """Handle one NSR opdata notification.

            A CREATE/UPDATE whose operational_status is 'running' queues the
            NSR as a pending task on the parent; a DELETE schedules the
            parent's terminate_NSR. Handled actions are acknowledged.
            """
            self._log.debug("Received NSR instantiate on_prepare (%s:%s:%s)",
                            query_action,
                            ks_path,
                            msg)

            if (query_action == rwdts.QueryAction.UPDATE or
                    query_action == rwdts.QueryAction.CREATE):
                msg_dict = msg.as_dict()
                # Update each NSR/VNFR state
                if ('operational_status' in msg_dict and
                        msg_dict['operational_status'] == 'running'):
                    # Add to the task list
                    self._parent.add_to_pending_tasks({'nsrid' : msg_dict['ns_instance_config_ref'], 'retries' : 5})
            elif query_action == rwdts.QueryAction.DELETE:
                nsr_id = msg.ns_instance_config_ref
                asyncio.ensure_future(self._parent.terminate_NSR(nsr_id), loop=self._loop)
            else:
                # Exceptions do not %-format their arguments the way the
                # logging calls do, so the message is built explicitly.
                raise NotImplementedError(
                    "%s action on cm-state not supported" % (query_action,))

            xact_info.respond_xpath(rwdts.XactRspCode.ACK)

        try:
            handler = rift.tasklets.DTS.RegistrationHandler(on_prepare=on_prepare)
            self.dts_reg_hdl = yield from self._dts.register(self.nsr_xpath,
                                                             flags=rwdts.Flag.SUBSCRIBER | rwdts.Flag.DELTA_READY,
                                                             handler=handler)
        except Exception as e:
            self._log.error("Failed to register for NSR changes as %s", str(e))
1524
1525