Merge from OSM SO master
[osm/SO.git] / rwcm / plugins / rwconman / rift / tasklets / rwconmantasklet / rwconman_config.py
1
2 #
3 # Copyright 2016 RIFT.IO Inc
4 #
5 # Licensed under the Apache License, Version 2.0 (the "License");
6 # you may not use this file except in compliance with the License.
7 # You may obtain a copy of the License at
8 #
9 # http://www.apache.org/licenses/LICENSE-2.0
10 #
11 # Unless required by applicable law or agreed to in writing, software
12 # distributed under the License is distributed on an "AS IS" BASIS,
13 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 # See the License for the specific language governing permissions and
15 # limitations under the License.
16 #
17
18 import asyncio
19 import os
20 import stat
21 import subprocess
22 import sys
23 import tempfile
24 import yaml
25
26 from gi.repository import (
27 RwDts as rwdts,
28 RwConmanYang as conmanY,
29 ProtobufC,
30 )
31
32 import rift.tasklets
33
34 from . import rwconman_conagent as conagent
35 from . import RiftCM_rpc
36 from . import riftcm_config_plugin
37
# Interpreters older than 3.4.4 only ship ``asyncio.async``; alias it so the
# rest of the module can call ``asyncio.ensure_future`` unconditionally.
# ``async`` is a reserved word from Python 3.7 on, so the attribute has to be
# looked up with getattr() -- spelling it ``asyncio.async`` makes the whole
# module a SyntaxError on newer interpreters even though this branch is dead.
if sys.version_info < (3, 4, 4):
    asyncio.ensure_future = getattr(asyncio, "async")
40
def get_vnf_unique_name(nsr_name, vnfr_short_name, member_vnf_index):
    """Return the dotted ``<nsr name>.<vnfr short name>.<member index>`` key of a VNF."""
    parts = (nsr_name, vnfr_short_name, member_vnf_index)
    return ".".join("{}".format(part) for part in parts)
43
class ConmanConfigError(Exception):
    """Base class of the configuration errors defined in this module."""
46
47
class InitialConfigError(ConmanConfigError):
    """Raised when an initial-config script exits with a non-zero status."""
50
51
def log_this_vnf(vnf_cfg):
    """Build the short tag used to identify a VNF in log messages.

    The NSR name, VNFR name and member index (when present in ``vnf_cfg``)
    are each followed by ``/``; the management IP address, when present, is
    appended last inside parentheses.  Missing keys are simply skipped.
    """
    slash_keys = ('nsr_name', 'vnfr_name', 'member_vnf_index')
    pieces = ["{}/".format(vnf_cfg[key]) for key in slash_keys if key in vnf_cfg]
    if 'mgmt_ip_address' in vnf_cfg:
        pieces.append("({})".format(vnf_cfg['mgmt_ip_address']))
    return "".join(pieces)
62
class PretendNsm(object):
    """Lookup facade over the config manager's NSR table.

    It is handed to the RPC handler by ``ConfigManagerConfig`` and answers
    NSR / VNFR / NSD queries from the parent's ``_nsr_dict`` and DTS object.
    """

    def __init__(self, dts, log, loop, parent):
        self._dts = dts
        self._log = log
        self._loop = loop
        self._parent = parent
        # Same dict object as the parent's, so later additions show up here.
        self._nsr_dict = parent._nsr_dict
        self._nsrs = {}
        self._nsd_msg = {}
        self._config_agent_plugins = []

    @property
    def nsrs(self):
        """Mapping of NSR id to agent NSR, rebuilt on every access.

        Expensive; prefer get_nsr() when the id is known.
        """
        self._nsrs = {
            nsr_id: nsr_obj.agent_nsr
            for nsr_id, nsr_obj in self._nsr_dict.items()
        }
        return self._nsrs

    def get_nsr(self, nsr_id):
        """Return the ``_nsr`` attribute of the entry for ``nsr_id``, or None."""
        try:
            nsr_obj = self._nsr_dict[nsr_id]
        except KeyError:
            return None
        return nsr_obj._nsr

    def get_vnfr_msg(self, vnfr_id, nsr_id=None):
        """Return the VNFR message stored for ``vnfr_id``, or None.

        With a truthy ``nsr_id`` only that NSR is consulted; otherwise every
        known NSR is searched and the first match wins.
        """
        self._log.debug("get_vnfr_msg(vnfr=%s, nsr=%s)",
                        vnfr_id, nsr_id)
        if nsr_id:
            if nsr_id in self._nsr_dict:
                candidates = [self._nsr_dict[nsr_id]]
            else:
                candidates = []
        else:
            candidates = self._nsr_dict.values()

        for nsr_obj in candidates:
            if vnfr_id in nsr_obj._vnfr_dict:
                vnf_cfg = nsr_obj._vnfr_dict[vnfr_id]['vnf_cfg']
                return vnf_cfg['agent_vnfr'].vnfr_msg
        return None

    @asyncio.coroutine
    def get_nsd(self, nsr_id):
        """Return the NSD of ``nsr_id``; fetched once via DTS, then cached."""
        try:
            return self._nsd_msg[nsr_id]
        except KeyError:
            pass
        nsr_config = yield from self._parent.cmdts_obj.get_nsr_config(nsr_id)
        self._nsd_msg[nsr_id] = nsr_config.nsd
        return self._nsd_msg[nsr_id]

    @property
    def config_agent_plugins(self):
        """Fresh list of the config agent manager's plugin instances."""
        plugin_map = self._parent._config_agent_mgr._plugin_instances
        self._config_agent_plugins = list(plugin_map.values())
        return self._config_agent_plugins
123
124 class ConfigManagerConfig(object):
def __init__(self, dts, log, loop, parent):
    """Set up bookkeeping, the cm-state skeleton and the handlers to register.

    Nothing is registered with DTS here; that happens in register().
    """
    self._dts = dts
    self._log = log
    self._loop = loop
    self._parent = parent
    self._project = parent._project

    # NSR bookkeeping
    self._nsr_dict = {}
    self.pending_cfg = {}
    self.terminate_cfg = {}
    # Used for the NSR id get retry (mainly exercised in the restart case)
    self.pending_tasks = []

    # Project-scoped xpaths for the config and operational data
    self._config_xpath = self._project.add_project("C,/rw-conman:cm-config")
    self._opdata_xpath = self._project.add_project("D,/rw-conman:cm-state")

    # RO specific configuration: every ro_endpoint field starts out unset
    self.cm_config = conmanY.SoConfig()
    self.ro_config = {key: None for key in self.cm_config.ro_endpoint.fields}

    # Initial cm-state
    self.cm_state = {'cm_nsr': [], 'states': "Initialized"}

    # Objects that need to be registered
    self.cmdts_obj = ConfigManagerDTS(
        self._log, self._loop, self, self._dts, self._project)
    self._config_agent_mgr = conagent.RiftCMConfigAgent(
        self._dts, self._log, self._loop, self)
    pretend_nsm = PretendNsm(self._dts, self._log, self._loop, self)
    rpc_handler = RiftCM_rpc.RiftCMRPCHandler(
        self._dts, self._log, self._loop, self._project, pretend_nsm)
    self.reg_handles = [self.cmdts_obj, self._config_agent_mgr, rpc_handler]

    # Filled in by register_cm_state_opdata()
    self._op_reg = None
168
def is_nsr_valid(self, nsr_id):
    """Tell whether ``nsr_id`` has an entry in this manager's NSR table."""
    return nsr_id in self._nsr_dict
173
def add_to_pending_tasks(self, task):
    """Queue ``task`` unless a task for the same NSR id is already queued.

    The pending loop is started when the task just added is the only one
    in the queue.  Failures while queuing are logged, not raised.
    """
    already_queued = any(
        queued['nsrid'] == task['nsrid'] for queued in self.pending_tasks)
    if already_queued:
        return

    try:
        self.pending_tasks.append(task)
        self._log.debug("add_to_pending_tasks (nsrid:%s)",
                        task['nsrid'])
        if len(self.pending_tasks) == 1:
            pending_loop = self.ConfigManagerConfig_pending_loop()
            self._loop.create_task(pending_loop)
            # TBD - change to info level
            self._log.debug("Started pending_loop!")
    except Exception as err:
        self._log.error("Failed adding to pending tasks (%s)", str(err))
190
def del_from_pending_tasks(self, task):
    """Drop ``task`` from the pending queue; a failure is logged, not raised."""
    queue = self.pending_tasks
    try:
        queue.remove(task)
    except Exception as err:
        reason = str(err)
        self._log.error("Failed removing from pending tasks (%s)", reason)
196
@asyncio.coroutine
def ConfigManagerConfig_pending_loop(self, loop_sleep=2):
    """Work through ``self.pending_tasks`` until the queue is empty.

    The queue is ordered by events: only the task at the head is attempted
    on each pass.  A task that was not configured successfully is moved to
    the tail while it still has retries left.

    Arguments:
        loop_sleep - seconds to sleep before each pass over the queue

    The coroutine returns once it finds the queue empty.
    """
    while True:
        yield from asyncio.sleep(loop_sleep, loop=self._loop)

        if not self.pending_tasks:
            self._log.debug("Stopped pending_loop!")
            break

        self._log.debug("self.pending_tasks len=%s", len(self.pending_tasks))
        task = self.pending_tasks[0]

        if 'nsrid' not in task:
            # Such a task can never be executed; leaving it at the head
            # would block every task queued behind it forever.
            self._log.error("Dropping pending task without NSR id: %s", task)
            self.pending_tasks.remove(task)
            continue

        nsrid = task['nsrid']
        done = False
        self._log.debug("Will execute pending task for NSR id(%s)", nsrid)
        # A task without a retry budget gets exactly one attempt
        task['retries'] = task.get('retries', 0) - 1
        try:
            # Try to configure this NSR
            done = yield from self.config_NSR(nsrid)
            self._log.info("self.config_NSR status=%s", done)
        except Exception as e:
            self._log.error("Failed(%s) configuring NSR(%s),"
                            "retries remained:%d!",
                            str(e), nsrid, task['retries'])

        # The task may have been withdrawn (see terminate_NSR) while the
        # attempt was in flight; in that case it must not be queued again.
        still_queued = task in self.pending_tasks
        if still_queued:
            self.pending_tasks.remove(task)

        if done:
            self._log.debug("Finished pending task NSR id(%s):", nsrid)
            continue

        self._log.error("Failed configuring NSR(%s), retries remained:%d!",
                        nsrid, task['retries'])

        # Failed: re-insert at the end to be retried later, but only while
        # retries remain.  The comparison is explicit because a counter that
        # dropped below zero is still truthy and would retry forever.
        if still_queued and task['retries'] > 0:
            self.pending_tasks.append(task)
240
@asyncio.coroutine
def register(self):
    """Register the cm-state opdata first, then each handle in ``reg_handles``."""
    yield from self.register_cm_state_opdata()

    # Handles are registered one after another, in list order
    for handle in self.reg_handles:
        yield from handle.register()
248
def deregister(self):
    """De-register every handle in ``reg_handles`` and the cm-state opdata."""
    self._log.debug("De-register ConfigManagerConfig for project {}".
                    format(self._project))

    for reg in self.reg_handles:
        reg.deregister()

    # register_cm_state_opdata() only logs a failed registration, leaving
    # _op_reg as None; there is nothing to tear down in that case.
    if self._op_reg is not None:
        self._op_reg.delete_element(self._opdata_xpath)
        self._op_reg.deregister()
        # Forget the handle so a repeated call does not deregister it twice
        self._op_reg = None
260
@asyncio.coroutine
def register_cm_state_opdata(self):
    """Register as DTS publisher of the cm-state operational data.

    On success the registration handle is kept in ``self._op_reg``.  A
    registration failure is logged and otherwise ignored, in which case
    ``self._op_reg`` keeps its previous value.
    """

    @asyncio.coroutine
    def on_prepare(xact_info, action, ks_path, msg):
        # READ queries are answered with a snapshot of self.cm_state;
        # every other action is simply acknowledged.
        self._log.debug("Received cm-state: msg=%s, action=%s", msg, action)

        if action == rwdts.QueryAction.READ:
            show_output = conmanY.CmOpdata()
            show_output.from_dict(self.cm_state)
            self._log.debug("Responding to SHOW cm-state: %s", self.cm_state)
            xact_info.respond_xpath(rwdts.XactRspCode.ACK,
                                    xpath=self._opdata_xpath,
                                    msg=show_output)
        else:
            xact_info.respond_xpath(rwdts.XactRspCode.ACK)

    self._log.info("Registering for cm-opdata xpath: %s",
                   self._opdata_xpath)

    try:
        handler = rift.tasklets.DTS.RegistrationHandler(on_prepare=on_prepare)
        self._op_reg = yield from self._dts.register(xpath=self._opdata_xpath,
                                                     handler=handler,
                                                     flags=rwdts.Flag.PUBLISHER)
        self._log.info("Successfully registered for opdata(%s)", self._opdata_xpath)
    except Exception as e:
        self._log.error("Failed to register for opdata as (%s)", e)
310
@asyncio.coroutine
def process_nsd_vnf_configuration(self, nsr_obj, vnfr):
    """Collect the configuration settings of one VNFR and file them on its NSR.

    Arguments:
        nsr_obj - the NSR bookkeeping object the VNFR belongs to
        vnfr    - the VNFR as a dict; it must carry 'vnf_cfg',
                  'vnf_configuration', 'short_name', 'id' and
                  'member_vnf_index_ref'

    When the VNF configuration names one of the supported methods (netconf,
    juju, script) its options are copied into ``vnfr['vnf_cfg']``, the config
    agent is selected, any config template is written to disk and the VNFR is
    added to ``nsr_obj`` under its priority.  Otherwise the VNFR is moved to
    the READY_NO_CFG state.  In both cases the NSR's cm-state is refreshed at
    the end.

    Raises whatever open()/write() raise when the template cannot be written.
    """

    def get_config_method(vnf_config):
        # First supported method present in the VNF configuration, else None
        cfg_types = ['netconf', 'juju', 'script']
        for method in cfg_types:
            if method in vnf_config:
                return method
        return None

    def get_cfg_file_extension(method, configuration_options):
        # Extension of the generated config file for the given method
        ext_dict = {
            "netconf" : "xml",
            "script" : {
                "bash" : "sh",
                "expect" : "exp",
            },
            "juju" : "yml"
        }

        if method == "netconf":
            return ext_dict[method]
        elif method == "script":
            return ext_dict[method][configuration_options['script_type']]
        elif method == "juju":
            return ext_dict[method]
        else:
            return "cfg"

    # This is how the YAML file should look like,
    # This routine will be called for each VNF, so keep appending the file.
    # priority order is determined by the number,
    # hence no need to generate the file in that order. A dictionary will be
    # used that will take care of the order by number.
    #
    #   1 :                               <== This is priority
    #     name : trafsink_vnfd
    #     member_vnf_index : 2
    #     configuration_delay : 120
    #     configuration_type : netconf
    #     configuration_options :
    #       username : admin
    #       password : admin
    #       port : 2022
    #       target : running
    #   2 :
    #     name : trafgen_vnfd
    #     member_vnf_index : 1
    #     configuration_delay : 0
    #     configuration_type : netconf
    #     configuration_options :
    #       username : admin
    #       password : admin
    #       port : 2022
    #       target : running

    # Save some parameters needed as short cuts in flat structure (Also generated)
    vnf_cfg = vnfr['vnf_cfg']
    # Prepare unique name for this VNF
    vnf_cfg['vnf_unique_name'] = get_vnf_unique_name(
        vnf_cfg['nsr_name'], vnfr['short_name'], vnfr['member_vnf_index_ref'])

    nsr_obj.cfg_path_prefix = '{}/{}_{}'.format(
        nsr_obj.this_nsr_dir, vnfr['short_name'], vnfr['member_vnf_index_ref'])

    # Get vnf_configuration from vnfr
    vnf_config = vnfr['vnf_configuration']

    self._log.debug("vnf_configuration = %s", vnf_config)

    # 'config_attributes' is optional: fall back to an empty mapping so that
    # both the priority and the delay lookups below tolerate its absence.
    if 'config_attributes' in vnf_config:
        config_attributes = vnf_config['config_attributes']
    else:
        config_attributes = {}

    # Create priority dictionary
    cfg_priority_order = 0
    if 'config_priority' in config_attributes:
        cfg_priority_order = config_attributes['config_priority']

    if cfg_priority_order not in nsr_obj.nsr_cfg_config_attributes_dict:
        # No VNFR with this priority yet, initialize the list
        nsr_obj.nsr_cfg_config_attributes_dict[cfg_priority_order] = []

    method = get_config_method(vnf_config)
    if method is not None:
        # Create all sub dictionaries first
        config_priority = {
            'id' : vnfr['id'],
            'name' : vnfr['short_name'],
            'member_vnf_index' : vnfr['member_vnf_index_ref'],
        }

        if 'config_delay' in config_attributes:
            config_priority['configuration_delay'] = config_attributes['config_delay']
            vnf_cfg['config_delay'] = config_priority['configuration_delay']

        configuration_options = {}
        self._log.debug("config method=%s", method)
        config_priority['configuration_type'] = method
        vnf_cfg['config_method'] = method

        # Set config agent based on method
        self._config_agent_mgr.set_config_agent(
            nsr_obj.agent_nsr, vnf_cfg['agent_vnfr'], method)

        # Method-specific options, mirrored into the flat vnf_cfg
        cfg_opt_list = [
            'port', 'target', 'script_type', 'ip_address', 'user', 'secret',
        ]
        for cfg_opt in cfg_opt_list:
            if cfg_opt in vnf_config[method]:
                configuration_options[cfg_opt] = vnf_config[method][cfg_opt]
                vnf_cfg[cfg_opt] = configuration_options[cfg_opt]

        # Access credentials, mirrored the same way
        cfg_opt_list = ['mgmt_ip_address', 'username', 'password']
        for cfg_opt in cfg_opt_list:
            if cfg_opt in vnf_config['config_access']:
                configuration_options[cfg_opt] = vnf_config['config_access'][cfg_opt]
                vnf_cfg[cfg_opt] = configuration_options[cfg_opt]

        # Add to the cp_dict
        vnf_cp_dict = nsr_obj._cp_dict[vnfr['member_vnf_index_ref']]
        vnf_cp_dict['rw_mgmt_ip'] = vnf_cfg['mgmt_ip_address']
        vnf_cp_dict['rw_username'] = vnf_cfg['username']
        vnf_cp_dict['rw_password'] = vnf_cfg['password']

        # TBD - see if we can neatly include the config in "config_attributes" file, no need though
        #config_priority['config_template'] = vnf_config['config_template']
        # Create config file
        vnf_cfg['juju_script'] = os.path.join(self._parent.cfg_dir, 'juju_if.py')

        if 'config_template' in vnf_config:
            vnf_cfg['cfg_template'] = '{}_{}_template.cfg'.format(nsr_obj.cfg_path_prefix, config_priority['configuration_type'])
            vnf_cfg['cfg_file'] = '{}.{}'.format(nsr_obj.cfg_path_prefix, get_cfg_file_extension(method, configuration_options))
            vnf_cfg['xlate_script'] = os.path.join(self._parent.cfg_dir, 'xlate_cfg.py')
            try:
                # Now write this template into file
                with open(vnf_cfg['cfg_template'], "w") as cf:
                    cf.write(vnf_config['config_template'])
            except Exception as e:
                self._log.error("Processing NSD, failed to generate configuration template : %s (Error : %s)",
                                vnf_config['config_template'], str(e))
                raise

        self._log.debug("VNF endpoint so far: %s", vnf_cfg)

        # Populate filled up dictionary
        config_priority['configuration_options'] = configuration_options
        nsr_obj.nsr_cfg_config_attributes_dict[cfg_priority_order].append(config_priority)
        nsr_obj.num_vnfs_to_cfg += 1
        # The VNFR can be looked up both by its unique name and by its id
        nsr_obj._vnfr_dict[vnf_cfg['vnf_unique_name']] = vnfr
        nsr_obj._vnfr_dict[vnfr['id']] = vnfr

        self._log.debug("VNF:(%s) config_attributes = %s",
                        log_this_vnf(vnfr['vnf_cfg']),
                        nsr_obj.nsr_cfg_config_attributes_dict)
    else:
        self._log.info("VNF:(%s) is not to be configured by Configuration Manager!",
                       log_this_vnf(vnfr['vnf_cfg']))
        yield from nsr_obj.update_vnf_cm_state(vnfr, conmanY.RecordState.READY_NO_CFG)

    # Update the cm-state
    nsr_obj.populate_vm_state_from_vnf_cfg()
474
@asyncio.coroutine
def config_NSR(self, id):
    """Fetch the NSR with the given id and schedule its VNFs for configuration.

    Arguments:
        id - the NSR id

    Returns True when the NSR has been processed (now or by an earlier
    call) and False when its operational status is not "running" yet, so
    the caller can retry later.

    Any failure while processing is logged, the NSR is moved to the
    CFG_PROCESS_FAILED state and the exception is re-raised.
    """

    def my_yaml_dump(config_attributes_dict, yf):
        # Dump the per-priority attributes with the priorities sorted

        yaml_dict = dict(sorted(config_attributes_dict.items()))
        yf.write(yaml.dump(yaml_dict, default_flow_style=False))

    nsr_dict = self._nsr_dict
    self._log.info("Configure NSR, id = %s", id)

    #####################TBD###########################
    # yield from self._config_agent_mgr.invoke_config_agent_plugins('notify_create_nsr', self.id, self._nsd)
    # yield from self._config_agent_mgr.invoke_config_agent_plugins('notify_nsr_active', self.id, self._vnfrs)

    # Create the bookkeeping object on first sight of this NSR, reuse it otherwise
    try:
        if id not in nsr_dict:
            nsr_obj = ConfigManagerNSR(self._log, self._loop, self, self._project, id)
            nsr_dict[id] = nsr_obj
        else:
            self._log.info("NSR(%s) is already initialized!", id)
            nsr_obj = nsr_dict[id]
    except Exception as e:
        self._log.error("Failed creating NSR object for (%s) as (%s)", id, str(e))
        raise

    # Try to configure this NSR only if not already processed
    if nsr_obj.cm_nsr['state'] != nsr_obj.state_to_string(conmanY.RecordState.INIT):
        self._log.debug("NSR(%s) is already processed, state=%s",
                        nsr_obj.nsr_name, nsr_obj.cm_nsr['state'])
        # yield from nsr_obj.publish_cm_state()
        return True

    cmdts_obj = self.cmdts_obj
    try:
        # Fetch NSR
        nsr = yield from cmdts_obj.get_nsr(id)
        self._log.debug("Full NSR : %s", nsr)
        if nsr['operational_status'] != "running":
            self._log.info("NSR(%s) is not ready yet!", nsr['nsd_name_ref'])
            return False
        # Kept on the manager because ConfigManagerNSR.set_config_dir()
        # reads it back through its ``caller`` argument.
        self._nsr = nsr

        # Create Agent NSR class
        nsr_config = yield from cmdts_obj.get_nsr_config(id)
        self._log.debug("NSR {} config: {}".format(id, nsr_config))
        nsr_obj.agent_nsr = riftcm_config_plugin.RiftCMnsr(nsr, nsr_config, self._project)

        try:
            yield from nsr_obj.update_ns_cm_state(conmanY.RecordState.RECEIVED)

            # Parse NSR
            if nsr is not None:
                nsr_obj.set_nsr_name(nsr['name_ref'])
                nsr_dir = os.path.join(self._parent.cfg_dir, nsr_obj.nsr_name)
                self._log.info("Checking NS config directory: %s", nsr_dir)
                if not os.path.isdir(nsr_dir):
                    os.makedirs(nsr_dir)
                    # self._log.critical("NS %s is not to be configured by Service Orchestrator!", nsr_obj.nsr_name)
                    # yield from nsr_obj.update_ns_cm_state(conmanY.RecordState.READY_NO_CFG)
                    # return

                nsr_obj.set_config_dir(self)

                # Fetch each constituent VNFR and gather its configuration
                for const_vnfr in nsr['constituent_vnfr_ref']:
                    self._log.debug("Fetching VNFR (%s)", const_vnfr['vnfr_id'])
                    vnfr_msg = yield from cmdts_obj.get_vnfr(const_vnfr['vnfr_id'])
                    if vnfr_msg:
                        vnfr = vnfr_msg.as_dict()
                        self._log.info("create VNF:{}/{}".format(nsr_obj.nsr_name, vnfr['short_name']))
                        agent_vnfr = yield from nsr_obj.add_vnfr(vnfr, vnfr_msg)

                        # Preserve order, self.process_nsd_vnf_configuration()
                        # sets up the config agent based on the method
                        yield from self.process_nsd_vnf_configuration(nsr_obj, vnfr)
                        yield from self._config_agent_mgr.invoke_config_agent_plugins(
                            'notify_create_vnfr',
                            nsr_obj.agent_nsr,
                            agent_vnfr)

                    #####################TBD###########################
                    # self._log.debug("VNF active. Apply initial config for vnfr {}".format(vnfr.name))
                    # yield from self._config_agent_mgr.invoke_config_agent_plugins('apply_initial_config',
                    #                                                              vnfr.id, vnfr)
                    # yield from self._config_agent_mgr.invoke_config_agent_plugins('notify_terminate_vnf', self.id, vnfr)

        except Exception as e:
            self._log.error("Failed processing NSR (%s) as (%s)", nsr_obj.nsr_name, str(e))
            self._log.exception(e)
            yield from nsr_obj.update_ns_cm_state(conmanY.RecordState.CFG_PROCESS_FAILED)
            raise

        # The two files below are written for debug reference only, so a
        # failure to write either of them is logged and processing goes on.
        try:
            # Generate config_config_attributes.yaml (For debug reference)
            with open(nsr_obj.config_attributes_file, "w") as yf:
                my_yaml_dump(nsr_obj.nsr_cfg_config_attributes_dict, yf)
        except Exception as e:
            self._log.error("NS:(%s) failed to write config attributes file as (%s)", nsr_obj.nsr_name, str(e))

        try:
            # Generate nsr_xlate_dict.yaml (For debug reference)
            with open(nsr_obj.xlate_dict_file, "w") as yf:
                yf.write(yaml.dump(nsr_obj._cp_dict, default_flow_style=False))
        except Exception as e:
            self._log.error("NS:(%s) failed to write nsr xlate tags file as (%s)", nsr_obj.nsr_name, str(e))

        self._log.debug("Starting to configure each VNF")

        # Check if this NS has input parameters
        self._log.info("Checking NS configuration order: %s", nsr_obj.config_attributes_file)

        if os.path.exists(nsr_obj.config_attributes_file):
            # Apply configuration in specified order
            try:
                # Go in loop to configure by specified order
                self._log.info("Using Dynamic configuration input parametrs for NS: %s", nsr_obj.nsr_name)

                # cfg_delay = nsr_obj.nsr_cfg_config_attributes_dict['configuration_delay']
                # if cfg_delay:
                #     self._log.info("Applying configuration delay for NS (%s) ; %d seconds",
                #                    nsr_obj.nsr_name, cfg_delay)
                #     yield from asyncio.sleep(cfg_delay, loop=self._loop)

                # NOTE(review): values() follows the dict's own order, not the
                # sorted priority order used for the YAML dump above -- confirm
                # whether the priorities are meant to be honoured here.
                for config_attributes_dict in nsr_obj.nsr_cfg_config_attributes_dict.values():
                    # Iterate through each priority level
                    for vnf_config_attributes_dict in config_attributes_dict:
                        # Iterate through each vnfr at this priority level

                        # Make up vnf_unique_name with vnfd name and member index
                        #vnfr_name = "{}.{}".format(nsr_obj.nsr_name, vnf_config_attributes_dict['name'])
                        vnf_unique_name = get_vnf_unique_name(
                            nsr_obj.nsr_name,
                            vnf_config_attributes_dict['name'],
                            str(vnf_config_attributes_dict['member_vnf_index']),
                        )
                        self._log.info("NS (%s) : VNF (%s) - Processing configuration attributes",
                                       nsr_obj.nsr_name, vnf_unique_name)

                        # Find vnfr for this vnf_unique_name
                        if vnf_unique_name not in nsr_obj._vnfr_dict:
                            self._log.error("NS (%s) - Can not find VNF to be configured: %s", nsr_obj.nsr_name, vnf_unique_name)
                        else:
                            # Save this unique VNF's config input parameters
                            nsr_obj.vnf_config_attributes_dict[vnf_unique_name] = vnf_config_attributes_dict
                            nsr_obj.ConfigVNF(nsr_obj._vnfr_dict[vnf_unique_name])

                # Now add the entire NS to the pending config list.
                self._log.info("Scheduling NSR:{} configuration".format(nsr_obj.nsr_name))
                self._parent.add_to_pending(nsr_obj)
                self._parent.add_nsr_obj(nsr_obj)

            except Exception as e:
                self._log.error("Failed processing input parameters for NS (%s) as %s", nsr_obj.nsr_name, str(e))
                raise
        else:
            self._log.error("No configuration input parameters for NSR (%s)", nsr_obj.nsr_name)

    except Exception as e:
        self._log.error("Failed to configure NS (%s) as (%s)", nsr_obj.nsr_name, str(e))
        yield from nsr_obj.update_ns_cm_state(conmanY.RecordState.CFG_PROCESS_FAILED)
        raise

    return True
638
@asyncio.coroutine
def terminate_NSR(self, id):
    """Forget the NSR with the given id and clean up after it.

    Arguments:
        id - the NSR id

    Removes the NSR from the pending task queue, the NSR table and the
    global cm-state, flags any scheduled configuration of it as being
    deleted, notifies the config agent plugins for each of its VNFRs and
    deletes its published cm-nsr.  An unknown id is only logged.  Errors
    during the clean-up are logged, never raised.
    """
    nsr_dict = self._nsr_dict
    if id not in nsr_dict:
        self._log.debug("NSR(%s) does not exist!", id)
        return

    try:
        # Remove this NSR if we have it on pending task list.  A snapshot is
        # iterated because del_from_pending_tasks() mutates the queue.
        for task in list(self.pending_tasks):
            if task['nsrid'] == id:
                self.del_from_pending_tasks(task)

        # Remove this object from global list
        nsr_obj = nsr_dict.pop(id, None)

        # Remove this NS cm-state from global status list
        self.cm_state['cm_nsr'].remove(nsr_obj.cm_nsr)

        # Also remove any scheduled configuration event
        for nsr_obj_p in self._parent.pending_cfg:
            if nsr_obj_p == nsr_obj:
                assert id == nsr_obj_p._nsr_id
                #self._parent.pending_cfg.remove(nsr_obj_p)
                # Mark this as being deleted so we do not try to configure
                # it if we are in cfg_delay (will wake up and continue to process otherwise)
                nsr_obj_p.being_deleted = True
                self._log.info("Removed scheduled configuration for NSR(%s)", nsr_obj.nsr_name)

        self._parent.remove_nsr_obj(id)

        # Call Config Agent to clean up for each VNF.  agent_nsr stays None
        # until config_NSR() has fetched a running NSR, in which case there
        # are no VNFRs to report and the clean-up below must still run.
        if nsr_obj.agent_nsr is not None:
            for agent_vnfr in nsr_obj.agent_nsr.vnfrs:
                yield from self._config_agent_mgr.invoke_config_agent_plugins(
                    'notify_terminate_vnfr',
                    nsr_obj.agent_nsr,
                    agent_vnfr)

        # publish delete cm-state (cm-nsr)
        yield from nsr_obj.delete_cm_nsr()

        #####################TBD###########################
        # yield from self._config_agent_mgr.invoke_config_agent_plugins('notify_terminate_ns', self.id)

        self._log.info("NSR(%s/%s) is deleted", nsr_obj.nsr_name, id)

    except Exception as e:
        self._log.exception("Terminate NSR exception: {}".format(e))
687
@asyncio.coroutine
def process_initial_config(self, nsr_obj, conf, script, vnfr_name=None):
    '''Apply the initial-config-primitives specified in NSD or VNFD

    Arguments:
        nsr_obj   - the NSR bookkeeping object the primitive belongs to
        conf      - the primitive as a dict; its optional 'parameter' entry
                    is a list of dicts carrying 'name' and 'value'
        script    - path of the script to run
        vnfr_name - name of the VNFR when the primitive comes from a VNFD

    The inputs are written to a temporary YAML file whose path is passed to
    the script as its only argument.  The file is removed again whether or
    not the script succeeds.

    Raises InitialConfigError when the script exits with a non-zero status.
    '''

    def get_input_file(parameters):
        # Write the script inputs to a temporary YAML file; return its path
        inp = {}

        # Add NSR name to file
        inp['nsr_name'] = nsr_obj.nsr_name

        # Add VNFR name if available
        if vnfr_name:
            inp['vnfr_name'] = vnfr_name

        # Add parameters for initial config
        inp['parameter'] = {}
        for parameter in parameters:
            try:
                inp['parameter'][parameter['name']] = parameter['value']
            except KeyError as e:
                if vnfr_name:
                    self._log.info("VNFR {} initial config parameter {} with no value: {}".
                                   format(vnfr_name, parameter, e))
                else:
                    self._log.info("NSR {} initial config parameter {} with no value: {}".
                                   format(nsr_obj.nsr_name, parameter, e))

        # Add config agents specific to each VNFR
        inp['config-agent'] = {}
        for vnfr in nsr_obj.agent_nsr.vnfrs:
            # Get the config agent for the VNFR
            # If vnfr name is specified, add only CA specific to that
            if (vnfr_name is None) or \
               (vnfr_name == vnfr.name):
                agent = self._config_agent_mgr.get_vnfr_config_agent(vnfr.vnfr_msg)
                if agent:
                    if agent.agent_type != riftcm_config_plugin.DEFAULT_CAP_TYPE:
                        # NOTE(review): the 'service-name' entry is written into
                        # the object returned by agent.agent_data -- confirm that
                        # it is not shared between VNFRs.
                        inp['config-agent'][vnfr.member_vnf_index] = agent.agent_data
                        inp['config-agent'][vnfr.member_vnf_index] \
                            ['service-name'] = agent.get_service_name(vnfr.id)

        # Add vnfrs specific data
        inp['vnfr'] = {}
        for vnfr in nsr_obj.vnfrs:
            v = {}

            v['name'] = vnfr['name']
            v['mgmt_ip_address'] = vnfr['vnf_cfg']['mgmt_ip_address']
            v['mgmt_port'] = vnfr['vnf_cfg']['port']

            if 'dashboard_url' in vnfr:
                v['dashboard_url'] = vnfr['dashboard_url']

            if 'connection_point' in vnfr:
                v['connection_point'] = []
                for cp in vnfr['connection_point']:
                    v['connection_point'].append(
                        {
                            'name': cp['name'],
                            'ip_address': cp['ip_address'],
                        }
                    )

            # Only a fixed set of VDU fields is exported
            vdu_data = []
            for vdu in vnfr['vdur']:
                d = {}
                for k in ['name','management_ip', 'vm_management_ip', 'id', 'vdu_id_ref']:
                    if k in vdu:
                        d[k] = vdu[k]
                vdu_data.append(d)
            v['vdur'] = vdu_data

            inp['vnfr'][vnfr['member_vnf_index_ref']] = v

        self._log.debug("Input data for {}: {}".
                        format((vnfr_name if vnfr_name else nsr_obj.nsr_name),
                               inp))

        # Convert to YAML string
        yaml_string = yaml.dump(inp, default_flow_style=False)

        # Write the inputs as yaml file; delete=False because the script
        # reads the file after it has been closed here.
        with tempfile.NamedTemporaryFile(delete=False) as tmp_file:
            tmp_file.write(yaml_string.encode("UTF-8"))
        self._log.debug("Input file created for {}: {}".
                        format((vnfr_name if vnfr_name \
                                else nsr_obj.nsr_name),
                               tmp_file.name))

        return tmp_file.name

    parameters = []
    try:
        parameters = conf['parameter']
    except Exception as e:
        self._log.debug("Parameter conf: {}, e: {}".
                        format(conf, e))

    inp_file = get_input_file(parameters)

    try:
        cmd = "{0} {1}".format(script, inp_file)
        self._log.debug("Running the CMD: {}".format(cmd))

        process = yield from asyncio.create_subprocess_shell(cmd,
                                                             loop=self._loop,
                                                             stdout=subprocess.PIPE,
                                                             stderr=subprocess.PIPE)
        stdout, stderr = yield from process.communicate()
        rc = yield from process.wait()

        if rc:
            msg = "NSR/VNFR {} initial config using {} failed with {}: {}". \
                  format(vnfr_name if vnfr_name else nsr_obj.nsr_name,
                         script, rc, stderr)
            self._log.error(msg)
            raise InitialConfigError(msg)
    finally:
        # Clean up on every path; the file used to be left behind whenever
        # the script failed.  Removal is best effort.
        try:
            os.remove(inp_file)
        except Exception as e:
            self._log.debug("Error removing input file {}: {}".
                            format(inp_file, e))
814
def get_script_file(self, script_name, d_name, d_id, d_type):
    """Return the path of an initial-config script.

    Arguments:
        script_name - absolute path or bare name of the script
        d_name      - descriptor name, one of the package path components
        d_id        - descriptor id, one of the package path components
        d_type      - descriptor type directory ('nsd' or 'vnfd' at the
                      call sites in this class)

    A name starting with '/' is returned unchanged.  Any other name is
    looked up under $RIFT_ARTIFACTS/launchpad/packages/<d_type>/<d_id>/
    <d_name>/scripts and, when missing there, taken from
    $RIFT_INSTALL/usr/bin; the user execute bit is added when absent.

    Raises KeyError when a needed environment variable is unset and
    whatever os.stat()/os.chmod() raise for the chosen path.
    """
    # Get the full path to the script
    script = ''
    # If script name starts with /, assume it is full path
    if script_name[0] == '/':
        # The script has full path, use as is
        script = script_name
    else:
        script = os.path.join(os.environ['RIFT_ARTIFACTS'],
                              'launchpad/packages',
                              d_type,
                              d_id,
                              d_name,
                              'scripts',
                              script_name)
        self._log.debug("Checking for script at %s", script)
        if not os.path.exists(script):
            # Fall back to the install tree; existence is not re-checked here
            self._log.warning("Did not find script %s", script)
            script = os.path.join(os.environ['RIFT_INSTALL'],
                                  'usr/bin',
                                  script_name)

        # Seen cases in jenkins, where the script execution fails
        # with permission denied. Setting the permission on script
        # to make sure it has execute permission
        perm = os.stat(script).st_mode
        if not (perm & stat.S_IXUSR):
            self._log.warning("NSR/VNFR {} initial config script {} " \
                              "without execute permission: {}".
                              format(d_name, script, perm))
            os.chmod(script, perm | stat.S_IXUSR)
    return script
847
@asyncio.coroutine
def process_ns_initial_config(self, nsr_obj):
    '''Apply the initial-config-primitives specified in NSD

    Arguments:
        nsr_obj - the NSR bookkeeping object; its ``nsr_id`` selects the NSR

    Runs the user defined script of every primitive, one after another.
    Nothing is done when the NSR cannot be fetched or has no
    'initial_config_primitive' entry.
    '''

    nsr = yield from self.cmdts_obj.get_nsr(nsr_obj.nsr_id)
    # None has to be ruled out first: a membership test on it raises TypeError
    if nsr is None or 'initial_config_primitive' not in nsr:
        return

    nsd = yield from self.cmdts_obj.get_nsd(nsr_obj.nsr_id)
    for conf in nsr['initial_config_primitive']:
        self._log.debug("NSR {} initial config: {}".
                        format(nsr_obj.nsr_name, conf))
        script = self.get_script_file(conf['user_defined_script'],
                                      nsd.name,
                                      nsd.id,
                                      'nsd')

        yield from self.process_initial_config(nsr_obj, conf, script)
867
@asyncio.coroutine
def process_vnf_initial_config(self, nsr_obj, vnfr):
    '''Apply the initial-config-primitives specified in VNFD

    Primitives without a user defined script are logged and skipped; for
    the others the script is located and run, one after another.
    '''
    name = vnfr.name
    descriptor = vnfr.vnfd
    primitives = descriptor.vnf_configuration.initial_config_primitive

    for primitive in primitives:
        self._log.debug("VNFR {} initial config: {}".
                        format(name, primitive))

        if primitive.user_defined_script:
            script_path = self.get_script_file(primitive.user_defined_script,
                                               descriptor.name,
                                               descriptor.id,
                                               'vnfd')
            yield from self.process_initial_config(nsr_obj,
                                                   primitive.as_dict(),
                                                   script_path,
                                                   vnfr_name=name)
        else:
            self._log.debug("VNFR {} did not fine user defined script: {}".
                            format(name, primitive))
895
896
897 class ConfigManagerNSR(object):
def __init__(self, log, loop, parent, project, id):
    """Create the bookkeeping entry of one NSR and hook it into cm-state.

    The entry starts in the INIT state with the placeholder name
    'Not Set'; its cm-state dict is appended to the parent's global list.
    """
    self._log = log
    self._loop = loop
    self._parent = parent
    self._project = project
    self._nsr_id = id
    self._rwcal = None
    self._log.info("Instantiated NSR entry for id=%s", id)

    # Per-VNFR lookups and configuration bookkeeping
    self._vnfr_dict = {}
    self._vnfr_list = []
    self._cp_dict = {}
    self.nsr_cfg_config_attributes_dict = {}
    self.vnf_config_attributes_dict = {}
    self.vnf_cfg_list = []
    self.num_vnfs_to_cfg = 0
    self.this_nsr_dir = None
    self.being_deleted = False
    self.dts_obj = self._parent.cmdts_obj

    # Initialize cm-state for this NS
    self.cm_nsr = {'cm_vnfr': [], 'id': id}
    self.cm_nsr['state'] = self.state_to_string(conmanY.RecordState.INIT)
    self.cm_nsr['state_details'] = None

    self.set_nsr_name('Not Set')

    # Add this NSR cm-state object to global cm-state
    parent.cm_state['cm_nsr'].append(self.cm_nsr)

    # Place holders for NSR & VNFR classes
    self.agent_nsr = None
931
@property
def nsr_opdata_xpath(self):
    ''' Project-scoped xpath of this NSR's cm-state opdata entry '''
    xpath = "D,/rw-conman:cm-state/rw-conman:cm-nsr[rw-conman:id='{}']".format(
        self._nsr_id)
    return self._project.add_project(xpath)
938
@property
def vnfrs(self):
    ''' The list of VNFRs kept for this NSR '''
    return self._vnfr_list

@property
def parent(self):
    ''' The object passed as ``parent`` at construction '''
    return self._parent

@property
def nsr_id(self):
    ''' The id this NSR entry was created with '''
    return self._nsr_id
950
@asyncio.coroutine
def publish_cm_state(self):
    ''' Publish the current cm-state of this NSR through the DTS object '''
    opdata = conmanY.CmOpdata()
    nsr_entry = opdata.cm_nsr.add()
    nsr_entry.from_dict(self.cm_nsr)

    yield from self.dts_obj.update(self.nsr_opdata_xpath, nsr_entry)
    self._log.info("Published cm-state with xpath %s and nsr %s",
                   self.nsr_opdata_xpath,
                   nsr_entry)
963
964 @asyncio.coroutine
965 def delete_cm_nsr(self):
966 ''' This function publishes cm_state for this NSR '''
967
968 yield from self.dts_obj.delete(self.nsr_opdata_xpath)
969 self._log.info("Deleted cm-nsr with xpath %s",
970 self.nsr_opdata_xpath)
971
972 def set_nsr_name(self, name):
973 self.nsr_name = name
974 self.cm_nsr['name'] = name
975
976 def set_config_dir(self, caller):
977 self.this_nsr_dir = os.path.join(
978 caller._parent.cfg_dir, self.nsr_name, caller._nsr['name_ref'])
979 if not os.path.exists(self.this_nsr_dir):
980 os.makedirs(self.this_nsr_dir)
981 self._log.debug("NSR:(%s), Created configuration directory(%s)",
982 caller._nsr['name_ref'], self.this_nsr_dir)
983 self.config_attributes_file = os.path.join(self.this_nsr_dir, "configuration_config_attributes.yml")
984 self.xlate_dict_file = os.path.join(self.this_nsr_dir, "nsr_xlate_dict.yml")
985
986 def xlate_conf(self, vnfr, vnf_cfg):
987
988 # If configuration type is not already set, try to read from attributes
989 if vnf_cfg['interface_type'] is None:
990 # Prepare unique name for this VNF
991 vnf_unique_name = get_vnf_unique_name(
992 vnf_cfg['nsr_name'],
993 vnfr['short_name'],
994 vnfr['member_vnf_index_ref'],
995 )
996
997 # Find this particular (unique) VNF's config attributes
998 if (vnf_unique_name in self.vnf_config_attributes_dict):
999 vnf_cfg_config_attributes_dict = self.vnf_config_attributes_dict[vnf_unique_name]
1000 vnf_cfg['interface_type'] = vnf_cfg_config_attributes_dict['configuration_type']
1001 if 'configuration_options' in vnf_cfg_config_attributes_dict:
1002 cfg_opts = vnf_cfg_config_attributes_dict['configuration_options']
1003 for key, value in cfg_opts.items():
1004 vnf_cfg[key] = value
1005
1006 cfg_path_prefix = '{}/{}/{}_{}'.format(
1007 self._parent._parent.cfg_dir,
1008 vnf_cfg['nsr_name'],
1009 vnfr['short_name'],
1010 vnfr['member_vnf_index_ref'],
1011 )
1012
1013 vnf_cfg['cfg_template'] = '{}_{}_template.cfg'.format(cfg_path_prefix, vnf_cfg['interface_type'])
1014 vnf_cfg['cfg_file'] = '{}.cfg'.format(cfg_path_prefix)
1015 vnf_cfg['xlate_script'] = self._parent._parent.cfg_dir + '/xlate_cfg.py'
1016
1017 self._log.debug("VNF endpoint so far: %s", vnf_cfg)
1018
1019 self._log.info("Checking cfg_template %s", vnf_cfg['cfg_template'])
1020 if os.path.exists(vnf_cfg['cfg_template']):
1021 return True
1022 return False
1023
1024 def ConfigVNF(self, vnfr):
1025
1026 vnf_cfg = vnfr['vnf_cfg']
1027 vnf_cm_state = self.find_or_create_vnfr_cm_state(vnf_cfg)
1028
1029 if (vnf_cm_state['state'] == self.state_to_string(conmanY.RecordState.READY_NO_CFG)
1030 or
1031 vnf_cm_state['state'] == self.state_to_string(conmanY.RecordState.READY)):
1032 self._log.warning("NS/VNF (%s/%s) is already configured! Skipped.", self.nsr_name, vnfr['short_name'])
1033 return
1034
1035 #UPdate VNF state
1036 vnf_cm_state['state'] = self.state_to_string(conmanY.RecordState.CFG_PROCESS)
1037
1038 # Now translate the configuration for iP addresses
1039 try:
1040 # Add cp_dict members (TAGS) for this VNF
1041 self._cp_dict['rw_mgmt_ip'] = vnf_cfg['mgmt_ip_address']
1042 self._cp_dict['rw_username'] = vnf_cfg['username']
1043 self._cp_dict['rw_password'] = vnf_cfg['password']
1044 ############################################################
1045 # TBD - Need to lookup above 3 for a given VNF, not global #
1046 # Once we do that no need to dump below file again before #
1047 # each VNF configuration translation. #
1048 # This will require all existing config templates to be #
1049 # changed for above three tags to include member index #
1050 ############################################################
1051 try:
1052 nsr_obj = vnf_cfg['nsr_obj']
1053 # Generate config_config_attributes.yaml (For debug reference)
1054 with open(nsr_obj.xlate_dict_file, "w") as yf:
1055 yf.write(yaml.dump(nsr_obj._cp_dict, default_flow_style=False))
1056 except Exception as e:
1057 self._log.error("NS:(%s) failed to write nsr xlate tags file as (%s)", nsr_obj.nsr_name, str(e))
1058
1059 if 'cfg_template' in vnf_cfg:
1060 script_cmd = 'python3 {} -i {} -o {} -x "{}"'.format(vnf_cfg['xlate_script'], vnf_cfg['cfg_template'], vnf_cfg['cfg_file'], self.xlate_dict_file)
1061 self._log.debug("xlate script command (%s)", script_cmd)
1062 #xlate_msg = subprocess.check_output(script_cmd).decode('utf-8')
1063 xlate_msg = subprocess.check_output(script_cmd, shell=True).decode('utf-8')
1064 self._log.info("xlate script output (%s)", xlate_msg)
1065 except Exception as e:
1066 vnf_cm_state['state'] = self.state_to_string(conmanY.RecordState.CFG_PROCESS_FAILED)
1067 self._log.error("Failed to execute translation script for VNF: %s with (%s)", log_this_vnf(vnf_cfg), str(e))
1068 return
1069
1070 self._log.info("Applying config to VNF: %s = %s!", log_this_vnf(vnf_cfg), vnf_cfg)
1071 try:
1072 #self.vnf_cfg_list.append(vnf_cfg)
1073 self._log.debug("Scheduled configuration!")
1074 vnf_cm_state['state'] = self.state_to_string(conmanY.RecordState.CFG_SCHED)
1075 except Exception as e:
1076 self._log.error("Failed apply_vnf_config to VNF: %s as (%s)", log_this_vnf(vnf_cfg), str(e))
1077 vnf_cm_state['state'] = self.state_to_string(conmanY.RecordState.CFG_PROCESS_FAILED)
1078 raise
1079
1080 def add(self, nsr):
1081 self._log.info("Adding NS Record for id=%s", id)
1082 self._nsr = nsr
1083
1084 def sample_cm_state(self):
1085 return (
1086 {
1087 'cm_nsr': [
1088 {
1089 'cm_vnfr': [
1090 {
1091 'cfg_location': 'location1',
1092 'cfg_type': 'script',
1093 'connection_point': [
1094 {'ip_address': '1.1.1.1', 'name': 'vnf1cp1'},
1095 {'ip_address': '1.1.1.2', 'name': 'vnf1cp2'}
1096 ],
1097 'id': 'vnfrid1',
1098 'mgmt_interface': {'ip_address': '7.1.1.1',
1099 'port': 1001},
1100 'name': 'vnfrname1',
1101 'state': 'init'
1102 },
1103 {
1104 'cfg_location': 'location2',
1105 'cfg_type': 'netconf',
1106 'connection_point': [{'ip_address': '2.1.1.1', 'name': 'vnf2cp1'},
1107 {'ip_address': '2.1.1.2', 'name': 'vnf2cp2'}],
1108 'id': 'vnfrid2',
1109 'mgmt_interface': {'ip_address': '7.1.1.2',
1110 'port': 1001},
1111 'name': 'vnfrname2',
1112 'state': 'init'}
1113 ],
1114 'id': 'nsrid1',
1115 'name': 'nsrname1',
1116 'state': 'init'}
1117 ],
1118 'states': 'Initialized, '
1119 })
1120
1121 def populate_vm_state_from_vnf_cfg(self):
1122 # Fill in each VNFR from this nsr object
1123 vnfr_list = self._vnfr_list
1124 for vnfr in vnfr_list:
1125 vnf_cfg = vnfr['vnf_cfg']
1126 vnf_cm_state = self.find_vnfr_cm_state(vnfr['id'])
1127
1128 if vnf_cm_state:
1129 # Fill in VNF management interface
1130 vnf_cm_state['mgmt_interface']['ip_address'] = vnf_cfg['mgmt_ip_address']
1131 vnf_cm_state['mgmt_interface']['port'] = vnf_cfg['port']
1132
1133 # Fill in VNF configuration details
1134 vnf_cm_state['cfg_type'] = vnf_cfg['config_method']
1135 vnf_cm_state['cfg_location'] = vnf_cfg['cfg_file']
1136
1137 # Fill in each connection-point for this VNF
1138 if "connection_point" in vnfr:
1139 cp_list = vnfr['connection_point']
1140 for cp_item_dict in cp_list:
1141 vnf_cm_state['connection_point'].append(
1142 {
1143 'name' : cp_item_dict['name'],
1144 'ip_address' : cp_item_dict['ip_address'],
1145 }
1146 )
1147
1148 def state_to_string(self, state):
1149 state_dict = {
1150 conmanY.RecordState.INIT : "init",
1151 conmanY.RecordState.RECEIVED : "received",
1152 conmanY.RecordState.CFG_PROCESS : "cfg_process",
1153 conmanY.RecordState.CFG_PROCESS_FAILED : "cfg_process_failed",
1154 conmanY.RecordState.CFG_SCHED : "cfg_sched",
1155 conmanY.RecordState.CFG_DELAY : "cfg_delay",
1156 conmanY.RecordState.CONNECTING : "connecting",
1157 conmanY.RecordState.FAILED_CONNECTION : "failed_connection",
1158 conmanY.RecordState.NETCONF_CONNECTED : "netconf_connected",
1159 conmanY.RecordState.NETCONF_SSH_CONNECTED : "netconf_ssh_connected",
1160 conmanY.RecordState.RESTCONF_CONNECTED : "restconf_connected",
1161 conmanY.RecordState.CFG_SEND : "cfg_send",
1162 conmanY.RecordState.CFG_FAILED : "cfg_failed",
1163 conmanY.RecordState.READY_NO_CFG : "ready_no_cfg",
1164 conmanY.RecordState.READY : "ready",
1165 }
1166 return state_dict[state]
1167
1168 def find_vnfr_cm_state(self, id):
1169 if self.cm_nsr['cm_vnfr']:
1170 for vnf_cm_state in self.cm_nsr['cm_vnfr']:
1171 if vnf_cm_state['id'] == id:
1172 return vnf_cm_state
1173 return None
1174
1175 def find_or_create_vnfr_cm_state(self, vnf_cfg):
1176 vnfr = vnf_cfg['vnfr']
1177 vnf_cm_state = self.find_vnfr_cm_state(vnfr['id'])
1178
1179 if vnf_cm_state is None:
1180 # Not found, Create and Initialize this VNF cm-state
1181 vnf_cm_state = {
1182 'id' : vnfr['id'],
1183 'name' : vnfr['short_name'],
1184 'state' : self.state_to_string(conmanY.RecordState.RECEIVED),
1185 'mgmt_interface' :
1186 {
1187 'ip_address' : vnf_cfg['mgmt_ip_address'],
1188 'port' : vnf_cfg['port'],
1189 },
1190 'cfg_type' : vnf_cfg['config_method'],
1191 'cfg_location' : vnf_cfg['cfg_file'],
1192 'connection_point' : [],
1193 }
1194 self.cm_nsr['cm_vnfr'].append(vnf_cm_state)
1195
1196 # Publish newly created cm-state
1197
1198
1199 return vnf_cm_state
1200
1201 @asyncio.coroutine
1202 def get_vnf_cm_state(self, vnfr):
1203 if vnfr:
1204 vnf_cm_state = self.find_vnfr_cm_state(vnfr['id'])
1205 if vnf_cm_state:
1206 return vnf_cm_state['state']
1207 return False
1208
1209 @asyncio.coroutine
1210 def update_vnf_cm_state(self, vnfr, state):
1211 if vnfr:
1212 vnf_cm_state = self.find_vnfr_cm_state(vnfr['id'])
1213 if vnf_cm_state is None:
1214 self._log.error("No opdata found for NS/VNF:%s/%s!",
1215 self.nsr_name, vnfr['short_name'])
1216 return
1217
1218 if vnf_cm_state['state'] != self.state_to_string(state):
1219 old_state = vnf_cm_state['state']
1220 vnf_cm_state['state'] = self.state_to_string(state)
1221 # Publish new state
1222 yield from self.publish_cm_state()
1223 self._log.info("VNF ({}/{}/{}) state change: {} -> {}"
1224 .format(self.nsr_name,
1225 vnfr['short_name'],
1226 vnfr['member_vnf_index_ref'],
1227 old_state,
1228 vnf_cm_state['state']))
1229
1230 else:
1231 self._log.error("No VNFR supplied for state update (NS=%s)!",
1232 self.nsr_name)
1233
1234 @property
1235 def get_ns_cm_state(self):
1236 return self.cm_nsr['state']
1237
1238 @asyncio.coroutine
1239 def update_ns_cm_state(self, state, state_details=None):
1240 if self.cm_nsr['state'] != self.state_to_string(state):
1241 old_state = self.cm_nsr['state']
1242 self.cm_nsr['state'] = self.state_to_string(state)
1243 self.cm_nsr['state_details'] = state_details if state_details is not None else None
1244 self._log.info("NS ({}) state change: {} -> {}"
1245 .format(self.nsr_name,
1246 old_state,
1247 self.cm_nsr['state']))
1248 # Publish new state
1249 yield from self.publish_cm_state()
1250
1251 @asyncio.coroutine
1252 def add_vnfr(self, vnfr, vnfr_msg):
1253
1254 @asyncio.coroutine
1255 def populate_subnets_from_vlr(id):
1256 try:
1257 # Populate cp_dict with VLR subnet info
1258 vlr = yield from self.dts_obj.get_vlr(id)
1259 if vlr is not None and 'assigned_subnet' in vlr:
1260 subnet = {vlr.name:vlr.assigned_subnet}
1261 self._cp_dict[vnfr['member_vnf_index_ref']].update(subnet)
1262 self._cp_dict.update(subnet)
1263 self._log.debug("VNF:(%s) Updated assigned subnet = %s",
1264 vnfr['short_name'], subnet)
1265 except Exception as e:
1266 self._log.error("VNF:(%s) VLR Error = %s",
1267 vnfr['short_name'], e)
1268
1269 if vnfr['id'] not in self._vnfr_dict:
1270 self._log.info("NSR(%s) : Adding VNF Record for name=%s, id=%s", self._nsr_id, vnfr['short_name'], vnfr['id'])
1271 # Add this vnfr to the list for show, or single traversal
1272 self._vnfr_list.append(vnfr)
1273 else:
1274 self._log.warning("NSR(%s) : VNF Record for name=%s, id=%s already exists, overwriting", self._nsr_id, vnfr['short_name'], vnfr['id'])
1275
1276 # Make vnfr available by id as well as by name
1277 unique_name = get_vnf_unique_name(self.nsr_name, vnfr['short_name'], vnfr['member_vnf_index_ref'])
1278 self._vnfr_dict[unique_name] = vnfr
1279 self._vnfr_dict[vnfr['id']] = vnfr
1280
1281 # Create vnf_cfg dictionary with default values
1282 vnf_cfg = {
1283 'nsr_obj' : self,
1284 'vnfr' : vnfr,
1285 'agent_vnfr' : self.agent_nsr.add_vnfr(vnfr, vnfr_msg),
1286 'nsr_name' : self.nsr_name,
1287 'nsr_id' : self._nsr_id,
1288 'vnfr_name' : vnfr['short_name'],
1289 'member_vnf_index' : vnfr['member_vnf_index_ref'],
1290 'port' : 0,
1291 'username' : 'admin',
1292 'password' : 'admin',
1293 'config_method' : 'None',
1294 'protocol' : 'None',
1295 'mgmt_ip_address' : '0.0.0.0',
1296 'cfg_file' : 'None',
1297 'cfg_retries' : 0,
1298 'script_type' : 'bash',
1299 }
1300
1301 # Update the mgmt ip address
1302 # In case the config method is none, this is not
1303 # updated later
1304 try:
1305 vnf_cfg['mgmt_ip_address'] = vnfr_msg.mgmt_interface.ip_address
1306 vnf_cfg['port'] = vnfr_msg.mgmt_interface.port
1307 except Exception as e:
1308 self._log.warn(
1309 "VNFR {}({}), unable to retrieve mgmt ip address: {}".
1310 format(vnfr['short_name'], vnfr['id'], e))
1311
1312 vnfr['vnf_cfg'] = vnf_cfg
1313 self.find_or_create_vnfr_cm_state(vnf_cfg)
1314
1315 '''
1316 Build the connection-points list for this VNF (self._cp_dict)
1317 '''
1318 # Populate global CP list self._cp_dict from VNFR
1319 cp_list = []
1320 if 'connection_point' in vnfr:
1321 cp_list = vnfr['connection_point']
1322
1323 self._cp_dict[vnfr['member_vnf_index_ref']] = {}
1324 if 'vdur' in vnfr:
1325 for vdur in vnfr['vdur']:
1326 if 'internal_connection_point' in vdur:
1327 cp_list += vdur['internal_connection_point']
1328
1329 for cp_item_dict in cp_list:
1330 # Populate global dictionary
1331 self._cp_dict[
1332 cp_item_dict['name']
1333 ] = cp_item_dict['ip_address']
1334
1335 # Populate unique member specific dictionary
1336 self._cp_dict[
1337 vnfr['member_vnf_index_ref']
1338 ][
1339 cp_item_dict['name']
1340 ] = cp_item_dict['ip_address']
1341
1342 # Fill in the subnets from vlr
1343 if 'vlr_ref' in cp_item_dict:
1344 ### HACK: Internal connection_point do not have VLR reference
1345 yield from populate_subnets_from_vlr(cp_item_dict['vlr_ref'])
1346
1347 if 'internal_vlr' in vnfr:
1348 for ivlr in vnfr['internal_vlr']:
1349 yield from populate_subnets_from_vlr(ivlr['vlr_ref'])
1350
1351 # Update vnfr
1352 vnf_cfg['agent_vnfr']._vnfr = vnfr
1353 return vnf_cfg['agent_vnfr']
1354
1355
class XPaths(object):
    """Builders for the DTS xpath strings used by the config manager.

    Every builder takes an optional key ``k``. For the catalog/list style
    paths, omitting the key yields the path of the whole list; for
    ``nsr_config`` and ``vlr`` the key is mandatory in practice and an
    empty string is returned when it is missing.
    """

    @staticmethod
    def nsr_opdata(k=None):
        """Return the NSR opdata xpath, keyed by ns-instance-config-ref if given."""
        return ("D,/nsr:ns-instance-opdata/nsr:nsr" +
                ("[nsr:ns-instance-config-ref='{}']".format(k) if k is not None else ""))

    @staticmethod
    def nsd_msg(k=None):
        """Return the NSD catalog xpath, keyed by NSD id if given."""
        # The conditional must apply to the key predicate only; without the
        # inner parentheses the whole expression collapsed to "" for k=None.
        return ("C,/project-nsd:nsd-catalog/project-nsd:nsd" +
                ("[project-nsd:id = '{}']".format(k) if k is not None else ""))

    @staticmethod
    def vnfr_opdata(k=None):
        """Return the VNFR catalog xpath, keyed by VNFR id if given."""
        return ("D,/vnfr:vnfr-catalog/vnfr:vnfr" +
                ("[vnfr:id='{}']".format(k) if k is not None else ""))

    @staticmethod
    def vnfd(k=None):
        """Return the VNFD catalog xpath, keyed by VNFD id if given."""
        return ("C,/project-vnfd:vnfd-catalog/project-vnfd:vnfd" +
                ("[project-vnfd:id='{}']".format(k) if k is not None else ""))

    @staticmethod
    def config_agent(k=None):
        """Return the config-agent account xpath, keyed by account name if given."""
        return ("D,/rw-config-agent:config-agent/rw-config-agent:account" +
                ("[rw-config-agent:name='{}']".format(k) if k is not None else ""))

    @staticmethod
    def nsr_config(k=None):
        """Return the NSR config xpath for id ``k``, or "" when ``k`` is None."""
        if k is None:
            return ""
        return "C,/nsr:ns-instance-config/nsr:nsr[nsr:id='{}']".format(k)

    @staticmethod
    def vlr(k=None):
        """Return the VLR xpath for id ``k``, or "" when ``k`` is None."""
        if k is None:
            return ""
        return "D,/vlr:vlr-catalog/vlr:vlr[vlr:id='{}']".format(k)
1389
class ConfigManagerDTS(object):
    """Read records from DTS and publish cm-state records to DTS."""

    def __init__(self, log, loop, parent, dts, project):
        """Store the collaborators; no DTS registration happens here.

        Args:
            log: logger used by this object.
            loop: asyncio event loop used to schedule NSR termination.
            parent: owner object; must provide ``add_to_pending_tasks`` and
                ``terminate_NSR``.
            dts: DTS API handle.
            project: object providing ``add_project(xpath)`` and ``name``.
        """
        self._log = log
        self._loop = loop
        self._parent = parent
        self._dts = dts
        self._project = project
        # Registration handles; set by register_for_nsr()/register_to_publish().
        # Initialised here so deregister() is safe before (or after a failed)
        # registration.
        self.dts_reg_hdl = None
        self.dts_pub_hdl = None

    @asyncio.coroutine
    def _read_dts(self, path, do_trace=False):
        """Read ``path`` (project-qualified) from DTS and return the results.

        Reading is best effort: an error while iterating the query results
        is logged and the results collected so far are returned.

        Args:
            path: xpath to read, without the project prefix.
            do_trace: accepted for compatibility; not used.

        Returns:
            A list of the non-None query results (possibly empty).
        """
        xpath = self._project.add_project(path)
        self._log.debug("_read_dts path = %s", xpath)
        flags = rwdts.XactFlag.MERGE
        res_iter = yield from self._dts.query_read(
            xpath, flags=flags
        )

        results = []
        try:
            for i in res_iter:
                result = yield from i
                if result is not None:
                    results.append(result.result)
        except Exception as e:
            self._log.warning("_read_dts path = %s failed with (%s)", xpath, str(e))

        return results

    @asyncio.coroutine
    def get_nsr(self, id):
        """Return the NSR opdata for ``id`` as a dict, or None if not found."""
        self._log.debug("Attempting to get NSR: %s", id)
        nsrl = yield from self._read_dts(XPaths.nsr_opdata(id), False)
        return nsrl[0].as_dict() if nsrl else None

    @asyncio.coroutine
    def get_nsr_config(self, id):
        """Return the NSR config message for ``id``, or None if not found."""
        self._log.debug("Attempting to get config NSR: %s", id)
        nsrl = yield from self._read_dts(XPaths.nsr_config(id), False)
        return nsrl[0] if nsrl else None

    @asyncio.coroutine
    def get_nsd_msg(self, id):
        """Return the NSD message for ``id``, or None if not found."""
        self._log.debug("Attempting to get NSD: %s", id)
        nsdl = yield from self._read_dts(XPaths.nsd_msg(id), False)
        return nsdl[0] if nsdl else None

    @asyncio.coroutine
    def get_nsd(self, nsr_id):
        """Return the ``nsd`` attribute of the NSR config for ``nsr_id``.

        Raises:
            AttributeError: if no NSR config is found for ``nsr_id``.
        """
        # Log the requested NSR id (the original logged the builtin ``id``).
        self._log.debug("Attempting to get NSD for NSR: %s", nsr_id)
        nsr_config = yield from self.get_nsr_config(nsr_id)
        nsd_msg = nsr_config.nsd
        return nsd_msg

    @asyncio.coroutine
    def get_vnfr(self, id):
        """Return the VNFR message for ``id``, or None if not found."""
        self._log.debug("Attempting to get VNFR: %s", id)
        vnfrl = yield from self._read_dts(XPaths.vnfr_opdata(id), do_trace=False)
        return vnfrl[0] if vnfrl else None

    @asyncio.coroutine
    def get_vnfd(self, vnfd_id):
        """Return the VNFD message for ``vnfd_id``, or None if not found."""
        self._log.debug("Attempting to get VNFD: %s", vnfd_id)
        vnfdl = yield from self._read_dts(XPaths.vnfd(vnfd_id), do_trace=False)
        return vnfdl[0] if vnfdl else None

    @asyncio.coroutine
    def get_vlr(self, id):
        """Return the VLR message for ``id``, or None if not found."""
        self._log.debug("Attempting to get VLR subnet: %s", id)
        vlrl = yield from self._read_dts(XPaths.vlr(id), do_trace=True)
        return vlrl[0] if vlrl else None

    @asyncio.coroutine
    def get_config_agents(self, name):
        """Return the list of config-agent account records matching ``name``."""
        self._log.debug("Attempting to get config_agents: %s", name)
        cfgagentl = yield from self._read_dts(XPaths.config_agent(name), False)
        return cfgagentl

    @asyncio.coroutine
    def update(self, xpath, msg, flags=rwdts.XactFlag.REPLACE):
        """
        Update a cm-state (cm-nsr) record in DTS with the path and message
        """
        path = self._project.add_project(xpath)
        self._log.debug("Updating cm-state %s:%s dts_pub_hdl = %s", path, msg, self.dts_pub_hdl)
        self.dts_pub_hdl.update_element(path, msg, flags)
        self._log.debug("Updated cm-state, %s:%s", path, msg)

    @asyncio.coroutine
    def delete(self, xpath):
        """
        Delete cm-nsr record in DTS with the path only
        """
        path = self._project.add_project(xpath)
        self._log.debug("Deleting cm-nsr %s dts_pub_hdl = %s", path, self.dts_pub_hdl)
        self.dts_pub_hdl.delete_element(path)
        self._log.debug("Deleted cm-nsr, %s", path)

    @asyncio.coroutine
    def register(self):
        """Register the cm-state publisher, then the NSR subscriber."""
        yield from self.register_to_publish()
        yield from self.register_for_nsr()

    def deregister(self):
        """Deregister and drop whichever DTS handles are currently held."""
        self._log.debug("De-registering conman config for project {}".
                        format(self._project.name))
        if self.dts_reg_hdl:
            self.dts_reg_hdl.deregister()
            self.dts_reg_hdl = None

        if self.dts_pub_hdl:
            self.dts_pub_hdl.deregister()
            self.dts_pub_hdl = None

    @asyncio.coroutine
    def register_to_publish(self):
        """Register to DTS for publishing cm-state opdata."""
        xpath = self._project.add_project("D,/rw-conman:cm-state/rw-conman:cm-nsr")
        self._log.debug("Registering to publish cm-state @ %s", xpath)
        hdl = rift.tasklets.DTS.RegistrationHandler()
        with self._dts.group_create() as group:
            self.dts_pub_hdl = group.register(
                xpath=xpath,
                handler=hdl,
                flags=rwdts.Flag.PUBLISHER | rwdts.Flag.NO_PREP_READ)

    @property
    def nsr_xpath(self):
        """Return the project-qualified xpath of the NSR opdata list."""
        return self._project.add_project("D,/nsr:ns-instance-opdata/nsr:nsr")

    @asyncio.coroutine
    def register_for_nsr(self):
        """Register for NSR changes.

        A registration failure is logged and not raised; in that case
        ``dts_reg_hdl`` keeps its previous value.
        """

        @asyncio.coroutine
        def on_prepare(xact_info, query_action, ks_path, msg):
            """Handle one NSR opdata change and acknowledge it."""
            self._log.debug("Received NSR instantiate on_prepare (%s:%s:%s)",
                            query_action,
                            ks_path,
                            msg)

            if query_action in (rwdts.QueryAction.UPDATE,
                                rwdts.QueryAction.CREATE):
                msg_dict = msg.as_dict()
                # Only an NSR reported as running is queued for configuration
                if msg_dict.get('operational_status') == 'running':
                    # Add to the task list
                    self._parent.add_to_pending_tasks(
                        {'nsrid': msg_dict['ns_instance_config_ref'], 'retries': 5})
            elif query_action == rwdts.QueryAction.DELETE:
                nsr_id = msg.ns_instance_config_ref
                asyncio.ensure_future(self._parent.terminate_NSR(nsr_id), loop=self._loop)
            else:
                # Format the message here: exceptions do not apply
                # logging-style arguments.
                raise NotImplementedError(
                    "%s action on cm-state not supported" % query_action)

            xact_info.respond_xpath(rwdts.XactRspCode.ACK)

        try:
            handler = rift.tasklets.DTS.RegistrationHandler(on_prepare=on_prepare)
            self.dts_reg_hdl = yield from self._dts.register(
                self.nsr_xpath,
                flags=rwdts.Flag.SUBSCRIBER | rwdts.Flag.DELTA_READY,
                handler=handler)
        except Exception as e:
            self._log.error("Failed to register for NSR changes as %s", str(e))
1577
1578