Revert "Full Juju Charm support"
[osm/SO.git] / rwcm / plugins / rwconman / rift / tasklets / rwconmantasklet / rwconman_config.py
1
2 #
3 # Copyright 2016 RIFT.IO Inc
4 #
5 # Licensed under the Apache License, Version 2.0 (the "License");
6 # you may not use this file except in compliance with the License.
7 # You may obtain a copy of the License at
8 #
9 # http://www.apache.org/licenses/LICENSE-2.0
10 #
11 # Unless required by applicable law or agreed to in writing, software
12 # distributed under the License is distributed on an "AS IS" BASIS,
13 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 # See the License for the specific language governing permissions and
15 # limitations under the License.
16 #
17
18 import asyncio
19 import gi
20 import os
21 import stat
22 import subprocess
23 import sys
24 import tempfile
25 import yaml
26
27 from gi.repository import (
28 RwDts as rwdts,
29 RwConmanYang as conmanY,
30 ProtobufC,
31 )
32 gi.require_version('RwKeyspec', '1.0')
33 from gi.repository.RwKeyspec import quoted_key
34
35 import rift.tasklets
36 import rift.package.script
37 import rift.package.store
38
39 from . import rwconman_conagent as conagent
40 from . import RiftCM_rpc
41 from . import riftcm_config_plugin
42
43
44 if sys.version_info < (3, 4, 4):
45 asyncio.ensure_future = asyncio.async
46
47 def get_vnf_unique_name(nsr_name, vnfr_name, member_vnf_index):
48 return "{}.{}.{}".format(nsr_name, vnfr_name, member_vnf_index)
49
50
51 class ConmanConfigError(Exception):
52 pass
53
54
55 class InitialConfigError(ConmanConfigError):
56 pass
57
58
59 class ScriptNotFoundError(InitialConfigError):
60 pass
61
62
63 def log_this_vnf(vnf_cfg):
64 log_vnf = ""
65 used_item_list = ['nsr_name', 'vnfr_name', 'member_vnf_index', 'mgmt_ip_address']
66 for item in used_item_list:
67 if item in vnf_cfg:
68 if item == 'mgmt_ip_address':
69 log_vnf += "({})".format(vnf_cfg[item])
70 else:
71 log_vnf += "{}/".format(vnf_cfg[item])
72 return log_vnf
73
74 class PretendNsm(object):
75 def __init__(self, dts, log, loop, parent):
76 self._dts = dts
77 self._log = log
78 self._loop = loop
79 self._parent = parent
80 self._nsrs = {}
81 self._nsr_dict = parent._nsr_dict
82 self._config_agent_plugins = []
83 self._nsd_msg = {}
84
85 @property
86 def nsrs(self):
87 # Expensive, instead use get_nsr, if you know id.
88 self._nsrs = {}
89 # Update the list of nsrs (agent nsr)
90 for id, nsr_obj in self._nsr_dict.items():
91 self._nsrs[id] = nsr_obj.agent_nsr
92 return self._nsrs
93
94 def get_nsr(self, nsr_id):
95 if nsr_id in self._nsr_dict:
96 nsr_obj = self._nsr_dict[nsr_id]
97 return nsr_obj._nsr
98 return None
99
100 def get_vnfr_msg(self, vnfr_id, nsr_id=None):
101 self._log.debug("get_vnfr_msg(vnfr=%s, nsr=%s)",
102 vnfr_id, nsr_id)
103 found = False
104 if nsr_id:
105 if nsr_id in self._nsr_dict:
106 nsr_obj = self._nsr_dict[nsr_id]
107 if vnfr_id in nsr_obj._vnfr_dict:
108 found = True
109 else:
110 for nsr_obj in self._nsr_dict.values():
111 if vnfr_id in nsr_obj._vnfr_dict:
112 # Found it
113 found = True
114 break
115 if found:
116 vnf_cfg = nsr_obj._vnfr_dict[vnfr_id]['vnf_cfg']
117 return vnf_cfg['agent_vnfr'].vnfr_msg
118 else:
119 return None
120
121 @asyncio.coroutine
122 def get_nsd(self, nsr_id):
123 if nsr_id not in self._nsd_msg:
124 nsr_config = yield from self._parent.cmdts_obj.get_nsr_config(nsr_id)
125 self._nsd_msg[nsr_id] = nsr_config.nsd
126 return self._nsd_msg[nsr_id]
127
128 @property
129 def config_agent_plugins(self):
130 self._config_agent_plugins = []
131 for agent in self._parent._config_agent_mgr._plugin_instances.values():
132 self._config_agent_plugins.append(agent)
133 return self._config_agent_plugins
134
135 class ConfigManagerConfig(object):
136 def __init__(self, dts, log, loop, parent):
137 self._dts = dts
138 self._log = log
139 self._loop = loop
140 self._parent = parent
141 self._project = parent._project
142
143 self._nsr_dict = {}
144 self.pending_cfg = {}
145 self.terminate_cfg = {}
146 self.pending_tasks = [] # User for NSRid get retry
147 # (mainly excercised at restart case)
148
149 self._opdata_xpath = self._project.add_project("D,/rw-conman:cm-state")
150
151 # Initialize cm-state
152 self.cm_state = {}
153 self.cm_state['cm_nsr'] = []
154 self.cm_state['states'] = "Initialized"
155
156 # Initialize objects to register
157 self.cmdts_obj = ConfigManagerDTS(self._log, self._loop, self, self._dts, self._project)
158 self._config_agent_mgr = conagent.RiftCMConfigAgent(
159 self._dts,
160 self._log,
161 self._loop,
162 self,
163 )
164
165 self.riftcm_rpc_handler = RiftCM_rpc.RiftCMRPCHandler(self._dts, self._log, self._loop, self._project,
166 PretendNsm(
167 self._dts, self._log, self._loop, self))
168
169 self.reg_handles = [
170 self.cmdts_obj,
171 self._config_agent_mgr,
172 self.riftcm_rpc_handler
173 ]
174 self._op_reg = None
175
176 def is_nsr_valid(self, nsr_id):
177 if nsr_id in self._nsr_dict:
178 return True
179 return False
180
181 def add_to_pending_tasks(self, task):
182 if self.pending_tasks:
183 for p_task in self.pending_tasks:
184 if (p_task['nsrid'] == task['nsrid']) and \
185 (p_task['event'] == task['event']):
186 # Already queued
187 return
188 try:
189 self.pending_tasks.append(task)
190 self._log.debug("add_to_pending_tasks (nsrid:%s)",
191 task['nsrid'])
192 if len(self.pending_tasks) >= 1:
193 self._loop.create_task(self.ConfigManagerConfig_pending_loop())
194 # TBD - change to info level
195 self._log.debug("Started pending_loop!")
196
197 except Exception as e:
198 self._log.error("Failed adding to pending tasks (%s)", str(e))
199
200 def del_from_pending_tasks(self, task):
201 try:
202 self.pending_tasks.remove(task)
203 except Exception as e:
204 self._log.error("Failed removing from pending tasks (%s)", str(e))
205
206 @asyncio.coroutine
207 def ConfigManagerConfig_pending_loop(self):
208 loop_sleep = 2
209 while True:
210 yield from asyncio.sleep(loop_sleep, loop=self._loop)
211 """
212 This pending task queue is ordred by events,
213 must finish previous task successfully to be able to go on to the next task
214 """
215 if self.pending_tasks:
216 self._log.debug("self.pending_tasks len=%s", len(self.pending_tasks))
217 task = self.pending_tasks.pop(0)
218 done = False
219 if 'nsrid' in task:
220 nsrid = task['nsrid']
221 self._log.debug("Will execute pending task for NSR id: %s", nsrid)
222 try:
223 # Try to configure this NSR
224 task['retries'] -= 1
225 done = yield from self.config_NSR(nsrid, task['event'])
226 self._log.info("self.config_NSR status=%s", done)
227
228 except Exception as e:
229 self._log.error("Failed(%s) configuring NSR(%s) for task %s," \
230 "retries remained:%d!",
231 str(e), nsrid, task['event'] , task['retries'])
232 self._log.exception(e)
233 if task['event'] == 'terminate':
234 # Ignore failure
235 done = True
236
237 if done:
238 self._log.debug("Finished pending task NSR id: %s", nsrid)
239 else:
240 self._log.error("Failed configuring NSR(%s), retries remained:%d!",
241 nsrid, task['retries'])
242
243 # Failed, re-insert (append at the end)
244 # this failed task to be retried later
245 # If any retries remained.
246 if task['retries']:
247 self.pending_tasks.append(task)
248 else:
249 self._log.debug("Stopped pending_loop!")
250 break
251
252 @asyncio.coroutine
253 def register(self):
254 yield from self.register_cm_state_opdata()
255
256 # Initialize all handles that needs to be registered
257 for reg in self.reg_handles:
258 yield from reg.register()
259
260 def deregister(self):
261 # De-register all reg handles
262 self._log.debug("De-register ConfigManagerConfig for project {}".
263 format(self._project))
264
265 for reg in self.reg_handles:
266 reg.deregister()
267 reg = None
268
269 self._op_reg.delete_element(self._opdata_xpath)
270 self._op_reg.deregister()
271
272 @asyncio.coroutine
273 def register_cm_state_opdata(self):
274
275 def state_to_string(state):
276 state_dict = {
277 conmanY.RecordState.INIT : "init",
278 conmanY.RecordState.RECEIVED : "received",
279 conmanY.RecordState.CFG_PROCESS : "cfg_process",
280 conmanY.RecordState.CFG_PROCESS_FAILED : "cfg_process_failed",
281 conmanY.RecordState.CFG_SCHED : "cfg_sched",
282 conmanY.RecordState.CONNECTING : "connecting",
283 conmanY.RecordState.FAILED_CONNECTION : "failed_connection",
284 conmanY.RecordState.CFG_SEND : "cfg_send",
285 conmanY.RecordState.CFG_FAILED : "cfg_failed",
286 conmanY.RecordState.READY_NO_CFG : "ready_no_cfg",
287 conmanY.RecordState.READY : "ready",
288 conmanY.RecordState.TERMINATE : "terminate",
289 }
290 return state_dict[state]
291
292 @asyncio.coroutine
293 def on_prepare(xact_info, action, ks_path, msg):
294
295 self._log.debug("Received cm-state: msg=%s, action=%s", msg, action)
296
297 if action == rwdts.QueryAction.READ:
298 self._log.debug("Responding to SHOW cm-state: %s", self.cm_state)
299 show_output = conmanY.YangData_RwProject_Project_CmState()
300 show_output.from_dict(self.cm_state)
301 xact_info.respond_xpath(rwdts.XactRspCode.ACK,
302 xpath=self._opdata_xpath,
303 msg=show_output)
304 else:
305 xact_info.respond_xpath(rwdts.XactRspCode.ACK)
306
307 self._log.info("Registering for cm-opdata xpath: %s",
308 self._opdata_xpath)
309
310 try:
311 handler=rift.tasklets.DTS.RegistrationHandler(on_prepare=on_prepare)
312 self._op_reg = yield from self._dts.register(xpath=self._opdata_xpath,
313 handler=handler,
314 flags=rwdts.Flag.PUBLISHER)
315 self._log.info("Successfully registered for opdata(%s)", self._opdata_xpath)
316 except Exception as e:
317 self._log.error("Failed to register for opdata as (%s)", e)
318
319 def get_config_method(self, vnf_config):
320 cfg_types = ['juju', 'script']
321 for method in cfg_types:
322 if method in vnf_config:
323 return method
324 return None
325
326 @asyncio.coroutine
327 def process_nsd_vnf_configuration(self, nsr_obj, vnfr):
328
329 # Get vnf_configuration from vnfr
330 vnf_config = vnfr['vnf_configuration']
331
332 # Save some parameters needed as short cuts in flat structure (Also generated)
333 vnf_cfg = vnfr['vnf_cfg']
334 # Prepare unique name for this VNF
335 vnf_cfg['vnf_unique_name'] = get_vnf_unique_name(
336 vnf_cfg['nsr_name'], vnfr['name'], vnfr['member_vnf_index_ref'])
337
338 self._log.debug("vnf_configuration = %s", vnf_config)
339
340 method = self.get_config_method(vnf_config)
341
342 if method is not None:
343 self._log.debug("config method=%s", method)
344 vnf_cfg['config_method'] = method
345
346 # Set config agent based on method
347 self._config_agent_mgr.set_config_agent(
348 nsr_obj.agent_nsr, vnf_cfg['agent_vnfr'], method)
349 else:
350 self._log.info("VNF:(%s) is not to be configured by Configuration Manager!",
351 log_this_vnf(vnfr['vnf_cfg']))
352 yield from nsr_obj.update_vnf_cm_state(vnfr, conmanY.RecordState.READY_NO_CFG)
353
354 # Update the cm-state
355 nsr_obj.populate_cm_state_from_vnf_cfg()
356
357 @asyncio.coroutine
358 def update_config_primitives(self, nsr_obj):
359
360 # Process all config-primitives in the member VNFs
361 for vnfr in nsr_obj.vnfrs:
362 vnfd = vnfr['vnf_cfg']['agent_vnfr'].vnfd
363
364 try:
365 prims = vnfd.vnf_configuration.config_primitive
366 if not prims:
367 self._log.debug("VNFR {} with VNFD {} has no config primitives defined".
368 format(vnfr['name'], vnfd.name))
369 return
370 except AttributeError as e:
371 self._log.error("No config primitives found on VNFR {} ({})".
372 format(vnfr['name'], vnfd.name))
373 continue
374
375 cm_state = nsr_obj.find_vnfr_cm_state(vnfr['id'])
376 srcs = cm_state['config_parameter']['config_parameter_source']
377 reqs = cm_state['config_parameter']['config_parameter_request']
378
379 vnf_configuration = vnfd.vnf_configuration.as_dict()
380 vnf_configuration['config_primitive'] = []
381
382 for prim in prims:
383 confp = prim.as_dict()
384 if 'parameter' not in confp:
385 continue
386
387 for param in confp['parameter']:
388 # First check the param in capabilities
389 found = False
390 for src in srcs:
391 for p in src['parameter']:
392 if (p['config_primitive_ref'] == confp['name']) \
393 and (p['parameter_ref'] == param['name']):
394 param['default_value'] = src['value']
395 found = True
396 break
397 if found:
398 break
399
400 if not found:
401 for req in reqs:
402 for p in req['parameter']:
403 if (p['config_primitive_ref'] == confp['name']) \
404 and (p['parameter_ref'] == param['name']):
405 param['default_value'] = req['value']
406 found = True
407 break
408 if found:
409 break
410
411 self._log.debug("Config primitive: {}".format(confp))
412 vnf_configuration['config_primitive'].append(confp)
413
414 cm_state['vnf_configuration'] = vnf_configuration
415
416 @asyncio.coroutine
417 def get_resolved_xpath(self, xpath, name, vnf_name, xpath_prefix):
418 # For now, use DTS to resolve the path
419 # TODO (pjoseph): Add better xpath support
420
421 dts_path = xpath
422 if xpath.startswith('../'):
423 prefix = xpath_prefix
424 xp = xpath
425 while xp.startswith('../'):
426 idx = prefix.rfind('/')
427 if idx == -1:
428 raise ValueError("VNF {}, Did not find the xpath specified: {}".
429 format(vnf_name, xpath))
430 prefix = prefix[:idx]
431 xp = xp[3:]
432
433 dts_path = prefix + '/' + xp
434
435 elif xpath.startswith('/'):
436 dts_path = 'C,' + xpath
437 elif xpath.startswith('C,/') or xpath.startswith('D,/'):
438 dts_path = xpath
439 else:
440 self._log.error("Invalid xpath {} for source {} in VNF {}".
441 format(xpath, name, vnf_name))
442 raise ValueError("Descriptor xpath {} in source {} for VNF {} "
443 "is invalid".
444 format(xpath, name, vnf_name))
445
446 dts_path = self._project.add_project(dts_path)
447 return dts_path
448
449 @asyncio.coroutine
450 def resolve_xpath(self, xpath, name, vnfd):
451 xpath_prefix = "C,/project-vnfd:vnfd-catalog/vnfd[id={}]/config-parameter" \
452 "/config-parameter-source[name={}]" \
453 "/descriptor".format(quoted_key(vnfd.id), quoted_key(name))
454
455 dts_path = yield from self.get_resolved_xpath(xpath, name,
456 vnfd.name, xpath_prefix)
457 idx = dts_path.rfind('/')
458 if idx == -1:
459 raise ValueError("VNFD {}, descriptor xpath {} should point to " \
460 "an attribute".format(vnfd.name, xpath))
461
462 attr = dts_path[idx+1:]
463 dts_path = dts_path[:idx]
464 self._log.debug("DTS path: {}, attribute: {}".format(dts_path, attr))
465
466 resp = yield from self.cmdts_obj.get_xpath(dts_path)
467 if resp is None:
468 raise ValueError("Xpath {} in capability {} for VNFD {} is not found".
469 format(xpath, name, vnfd.name))
470 self._log.debug("DTS response: {}".format(resp.as_dict()))
471
472 try:
473 val = getattr(resp, attr)
474 except AttributeError as e:
475 self._log.error("Did not find attribute : {}".format(attr))
476 try:
477 val = getattr(resp, attr.replace('-', '_'))
478 except AttributeError as e:
479 raise ValueError("Did not find attribute {} in XPath {} "
480 "for capability {} in VNF {}".
481 format(attr, dts_path, vnfd.name))
482
483 self._log.debug("XPath {}: {}".format(xpath, val))
484 return val
485
486 @asyncio.coroutine
487 def resolve_attribute(self, attribute, name, vnfd, vnfr):
488 idx = attribute.rfind(',')
489 if idx == -1:
490 raise ValueError ("Invalid attribute {} for capability {} in "
491 "VNFD specified".
492 format(attribute, name, vnfd.name))
493 xpath = attribute[:idx].strip()
494 attr = attribute[idx+1:].strip()
495 self._log.debug("Attribute {}, {}".format(xpath, attr))
496 if xpath.startswith('C,/'):
497 raise ValueError("Attribute {} for capability {} in VNFD cannot "
498 "be a config".
499 format(attribute, name, vnfd.name))
500
501 xpath_prefix = "D,/vnfr:vnfr-catalog/vnfr[id={}]/config_parameter" \
502 "/config-parameter-source[name={}]" \
503 "/attribute".format(quoted_key(vnfr['id']), quoted_key(name))
504 dts_path = yield from self.get_resolved_xpath(xpath, name,
505 vnfr['name'],
506 xpath_prefix)
507 self._log.debug("DTS query: {}".format(dts_path))
508
509 resp = yield from self.cmdts_obj.get_xpath(dts_path)
510 if resp is None:
511 raise ValueError("Attribute {} in request {} for VNFD {} is " \
512 "not found".
513 format(xpath, name, vnfd.name))
514 self._log.debug("DTS response: {}".format(resp.as_dict()))
515
516 try:
517 val = getattr(resp, attr)
518 except AttributeError as e:
519 self._log.debug("Did not find attribute {}".format(attr))
520 try:
521 val = getattr(resp, attr.replace('-', '_'))
522 except AttributeError as e:
523 raise ValueError("Did not find attribute {} in XPath {} "
524 "for source {} in VNF {}".
525 format(attr, dts_path, vnfd.name))
526
527 self._log.debug("Attribute {}: {}".format(attribute, val))
528 return val
529
530 @asyncio.coroutine
531 def process_vnf_config_parameter(self, nsr_obj):
532 nsd = nsr_obj.agent_nsr.nsd
533
534 # Process all capabilities in all the member VNFs
535 for vnfr in nsr_obj.vnfrs:
536 vnfd = vnfr['vnf_cfg']['agent_vnfr'].vnfd
537
538 try:
539 cparam = vnfd.config_parameter
540 except AttributeError as e:
541 self._log.debug("VNFR {} does not have VNF config parameter".
542 format(vnfr.name))
543 continue
544
545 srcs = []
546 try:
547 srcs = cparam.config_parameter_source
548 except AttributeError as e:
549 self._log.debug("VNFR {} has no source defined".
550 format(vnfr.name))
551
552 # Get the cm state dict for this vnfr
553 cm_state = nsr_obj.find_vnfr_cm_state(vnfr['id'])
554
555 cm_srcs = []
556 for src in srcs:
557 self._log.debug("VNFR {}: source {}".
558 format(vnfr['name'], src.as_dict()))
559
560 param_refs = []
561 for p in src.parameter:
562 param_refs.append({
563 'config_primitive_ref': p.config_primitive_name_ref,
564 'parameter_ref': p.config_primitive_parameter_ref
565 })
566
567 try:
568 val = src.value
569 self._log.debug("Got value {}".format(val))
570 if val:
571 cm_srcs.append({'name': src.name,
572 'value': str(val),
573 'parameter': param_refs})
574 continue
575 except AttributeError as e:
576 pass
577
578 try:
579 xpath = src.descriptor
580 # resolve xpath
581 if xpath:
582 val = yield from self.resolve_xpath(xpath, src.name, vnfd)
583 self._log.debug("Got xpath value: {}".format(val))
584 cm_srcs.append({'name': src.name,
585 'value': str(val),
586 'parameter': param_refs})
587 continue
588 except AttributeError as e:
589 pass
590
591 try:
592 attribute = src.attribute
593 # resolve attribute
594 if attribute:
595 val = yield from self.resolve_attribute(attribute,
596 src.name,
597 vnfd, vnfr)
598 self._log.debug("Got attribute value: {}".format(val))
599 cm_srcs.append({'name': src.name,
600 'value': str(val),
601 'parameter': param_refs})
602 continue
603 except AttributeError as e:
604 pass
605
606 try:
607 prim = src.primitive_ref
608 if prim:
609 raise NotImplementedError("{}: VNF config parameter {}"
610 "source support for config"
611 "primitive not yet supported".
612 format(vnfr.name, prim))
613 except AttributeError as e:
614 pass
615
616 self._log.debug("VNF config parameter sources: {}".format(cm_srcs))
617 cm_state['config_parameter']['config_parameter_source'] = cm_srcs
618
619 try:
620 reqs = cparam.config_parameter_request
621 except AttributeError as e:
622 self._log.debug("VNFR {} has no requests defined".
623 format(vnfr.name))
624 continue
625
626 cm_reqs = []
627 for req in reqs:
628 self._log.debug("VNFR{}: request {}".
629 format(vnfr['name'], req.as_dict()))
630 param_refs = []
631 for p in req.parameter:
632 param_refs.append({
633 'config_primitive_ref': p.config_primitive_name_ref,
634 'parameter_ref': p.config_primitive_parameter_ref
635 })
636 cm_reqs.append({'name': req.name,
637 'parameter': param_refs})
638
639 self._log.debug("VNF requests: {}".format(cm_reqs))
640 cm_state['config_parameter']['config_parameter_request'] = cm_reqs
641
642 # Publish all config parameter for the VNFRs
643 # yield from nsr_obj.publish_cm_state()
644
645 cparam_map = []
646 try:
647 cparam_map = nsd.config_parameter_map
648 except AttributeError as e:
649 self._log.warning("No config parameter map specified for nsr: {}".
650 format(nsr_obj.nsr_name))
651
652 for cp in cparam_map:
653 src_vnfr = nsr_obj.agent_nsr.get_member_vnfr(
654 cp.config_parameter_source.member_vnf_index_ref)
655 cm_state = nsr_obj.find_vnfr_cm_state(src_vnfr.id)
656 if cm_state is None:
657 raise ValueError("Config parameter sources are not defined "
658 "for VNF member {} ({})".
659 format(cp.config_parameter_source.member_vnf_index_ref,
660 src_vnfr.name))
661 srcs = cm_state['config_parameter']['config_parameter_source']
662
663 src_attr = cp.config_parameter_source.config_parameter_source_ref
664 val = None
665 for src in srcs:
666 if src['name'] == src_attr:
667 val = src['value']
668 break
669
670 req_vnfr = nsr_obj.agent_nsr.get_member_vnfr(
671 cp.config_parameter_request.member_vnf_index_ref)
672 req_attr = cp.config_parameter_request.config_parameter_request_ref
673 cm_state = nsr_obj.find_vnfr_cm_state(req_vnfr.id)
674 try:
675 cm_reqs = cm_state['config_parameter']['config_parameter_request']
676 except KeyError as e:
677 raise ValueError("VNFR index {} ({}) has no requests defined".
678 format(cp.config_parameter_reequest.member_vnf_index_ref,
679 req_vnfr['name']))
680
681 for i, item in enumerate(cm_reqs):
682 if item['name'] == req_attr:
683 item['value'] = str(val)
684 cm_reqs[i] = item
685 self._log.debug("Request in VNFR {}: {}".
686 format(req_vnfr.name, item))
687 break
688
689 yield from self.update_config_primitives(nsr_obj)
690
691 # TODO: Confd crashing with the config-parameter publish
692 # So removing config-parameter and publishing cm-state
693 for vnfr in nsr_obj.vnfrs:
694 # Get the cm state dict for this vnfr
695 cm_state = nsr_obj.find_vnfr_cm_state(vnfr['id'])
696 del cm_state['config_parameter']['config_parameter_source']
697 del cm_state['config_parameter']['config_parameter_request']
698
699 # Publish resolved dependencies for the VNFRs
700 yield from nsr_obj.publish_cm_state()
701
702 @asyncio.coroutine
703 def config_NSR(self, id, event):
704
705 cmdts_obj = self.cmdts_obj
706 if event == 'running':
707 self._log.info("Configure NSR running, id = %s", id)
708 try:
709 nsr_obj = None
710 try:
711 if id not in self._nsr_dict:
712 nsr_obj = ConfigManagerNSR(self._log, self._loop, self, self._project, id)
713 self._nsr_dict[id] = nsr_obj
714 else:
715 self._log.info("NSR(%s) is already initialized!", id)
716 nsr_obj = self._nsr_dict[id]
717
718 except Exception as e:
719 self._log.error("Failed creating NSR object for (%s) as (%s)", id, str(e))
720 raise e
721
722 # Try to configure this NSR only if not already processed
723 if nsr_obj.cm_nsr['state'] != nsr_obj.state_to_string(conmanY.RecordState.INIT):
724 self._log.debug("NSR(%s) is already processed, state=%s",
725 nsr_obj.nsr_name, nsr_obj.cm_nsr['state'])
726 # Publish again in case NSM restarted
727 yield from nsr_obj.publish_cm_state()
728 return True
729
730 # Fetch NSR
731 nsr = yield from cmdts_obj.get_nsr(id)
732 self._log.debug("Full NSR : %s", nsr)
733 if nsr['operational_status'] != "running":
734 self._log.info("NSR(%s) is not ready yet!", nsr['nsd_name_ref'])
735 return False
736 self._nsr = nsr
737
738 # Create Agent NSR class
739 nsr_config = yield from cmdts_obj.get_nsr_config(id)
740 self._log.debug("NSR {} config: {}".format(id, nsr_config))
741
742 if nsr_config is None:
743 # The NST Terminate has been initiated before the configuration. Hence
744 # not proceeding with config.
745 self._log.warning("NSR - %s is deleted before Configuration. Not proceeding with configuration.", id)
746 return True
747
748 nsr_obj.agent_nsr = riftcm_config_plugin.RiftCMnsr(nsr, nsr_config,
749 self._project)
750
751 unique_cfg_vnfr_list = list()
752 unique_agent_vnfr_list = list()
753 try:
754 yield from nsr_obj.update_ns_cm_state(conmanY.RecordState.RECEIVED)
755
756 nsr_obj.set_nsr_name(nsr['name_ref'])
757 for const_vnfr in nsr['constituent_vnfr_ref']:
758 self._log.debug("Fetching VNFR (%s)", const_vnfr['vnfr_id'])
759 vnfr_msg = yield from cmdts_obj.get_vnfr(const_vnfr['vnfr_id'])
760 if vnfr_msg:
761 vnfr = vnfr_msg.as_dict()
762 self._log.info("create VNF:{}/{} operational status {}".format(nsr_obj.nsr_name, vnfr['name'], vnfr['operational_status']))
763 agent_vnfr = yield from nsr_obj.add_vnfr(vnfr, vnfr_msg)
764 method = self.get_config_method(vnfr['vnf_configuration'])
765 if method is not None:
766 unique_cfg_vnfr_list.append(vnfr)
767 unique_agent_vnfr_list.append(agent_vnfr)
768
769 # Process VNF Cfg
770 # Set up the config agent based on the method
771 yield from self.process_nsd_vnf_configuration(nsr_obj, vnfr)
772 else:
773 self._log.warning("NSR %s, VNFR not found yet (%s)", nsr_obj.nsr_name, const_vnfr['vnfr_id'])
774
775 # Process VNF config parameter
776 yield from self.process_vnf_config_parameter(nsr_obj)
777
778 # Invoke the config agent plugin
779 for agent_vnfr in unique_agent_vnfr_list:
780 yield from self._config_agent_mgr.invoke_config_agent_plugins(
781 'notify_create_vnfr',
782 nsr_obj.agent_nsr,
783 agent_vnfr)
784
785 except Exception as e:
786 self._log.error("Failed processing NSR (%s) as (%s)", nsr_obj.nsr_name, str(e))
787 self._log.exception(e)
788 yield from nsr_obj.update_ns_cm_state(conmanY.RecordState.CFG_PROCESS_FAILED)
789 raise e
790
791 self._log.debug("Starting to configure each VNF")
792
793 try:
794 for cfg_vnfr in unique_cfg_vnfr_list:
795 # Apply configuration
796 vnf_unique_name = get_vnf_unique_name(
797 nsr_obj.nsr_name,
798 cfg_vnfr['name'],
799 str(cfg_vnfr['member_vnf_index_ref']),
800 )
801
802 # Find vnfr for this vnf_unique_name
803 if vnf_unique_name not in nsr_obj._vnfr_dict:
804 self._log.error("NS (%s) - Can not find VNF to be configured: %s", nsr_obj.nsr_name, vnf_unique_name)
805 else:
806 # Save this unique VNF's config input parameters
807 nsr_obj.ConfigVNF(nsr_obj._vnfr_dict[vnf_unique_name])
808
809 # Now add the entire NS to the pending config list.
810 self._log.info("Scheduling NSR:{} configuration ".format(nsr_obj.nsr_name))
811 self._parent.add_to_pending(nsr_obj, unique_cfg_vnfr_list)
812 self._parent.add_nsr_obj(nsr_obj)
813
814 except Exception as e:
815 self._log.error("Failed processing input parameters for NS (%s) as %s", nsr_obj.nsr_name, str(e))
816 self._log.exception(e)
817 raise
818
819 except Exception as e:
820 self._log.exception(e)
821 if nsr_obj:
822 self._log.error("Failed to configure NS (%s) as (%s)", nsr_obj.nsr_name, str(e))
823 yield from nsr_obj.update_ns_cm_state(conmanY.RecordState.CFG_PROCESS_FAILED)
824 raise e
825
826 elif event == 'terminate':
827 self._log.info("Configure NSR terminate, id = %s", id)
828 nsr_obj = self._parent.get_nsr_obj(id)
829 if nsr_obj is None:
830 # Can be none if the terminate is called again due to DTS query
831 return True
832
833 try:
834 yield from self.process_ns_terminate_config(nsr_obj, self._project.name)
835 except Exception as e:
836 self._log.warn("Terminate config failed for NSR {}: {}".
837 format(id, e))
838 self._log.exception(e)
839
840 try:
841 yield from nsr_obj.update_ns_cm_state(conmanY.RecordState.TERMINATE)
842 yield from self.terminate_NSR(id)
843 except Exception as e:
844 self._log.error("Terminate failed for NSR {}: {}".
845 format(id, e))
846 self._log.exception(e)
847
848 return True
849
850 @asyncio.coroutine
851 def terminate_NSR(self, id):
852 if id not in self._nsr_dict:
853 self._log.error("NSR(%s) does not exist!", id)
854 return
855 else:
856 nsr_obj = self._nsr_dict[id]
857
858 # Remove this NSR if we have it on pending task list
859 for task in self.pending_tasks:
860 if task['nsrid'] == id:
861 self.del_from_pending_tasks(task)
862
863 # Remove any scheduled configuration event
864 for nsr_obj_p in self._parent.pending_cfg:
865 if nsr_obj_p == nsr_obj:
866 assert id == nsr_obj_p._nsr_id
867 # Mark this as being deleted so we do not try to reconfigure it
868 # if we are in cfg_delay (will wake up and continue to process otherwise)
869 nsr_obj_p.being_deleted = True
870 self._log.info("Removed scheduled configuration for NSR(%s)", nsr_obj.nsr_name)
871
872 # Call Config Agent to clean up for each VNF
873 for agent_vnfr in nsr_obj.agent_nsr.vnfrs:
874 yield from self._config_agent_mgr.invoke_config_agent_plugins(
875 'notify_terminate_vnfr',
876 nsr_obj.agent_nsr,
877 agent_vnfr)
878
879 self._log.info("NSR(%s/%s) is terminated", nsr_obj.nsr_name, id)
880
881 @asyncio.coroutine
882 def delete_NSR(self, id):
883 if id not in self._nsr_dict:
884 self._log.debug("NSR(%s) does not exist!", id)
885 return
886 else:
887 # Remove this NSR if we have it on pending task list
888 for task in self.pending_tasks:
889 if task['nsrid'] == id:
890 self.del_from_pending_tasks(task)
891
892 # Remove this object from global list
893 nsr_obj = self._nsr_dict.pop(id, None)
894
895 # Remove this NS cm-state from global status list
896 self.cm_state['cm_nsr'].remove(nsr_obj.cm_nsr)
897
898 self._parent.remove_nsr_obj(id)
899
900 # publish delete cm-state (cm-nsr)
901 yield from nsr_obj.delete_cm_nsr()
902
903 # Deleting any config jobs for NSR.
904 job_manager = self.riftcm_rpc_handler.job_manager.handler
905 job_manager._terminate_nsr(id)
906
907 #####################TBD###########################
908 # yield from self._config_agent_mgr.invoke_config_agent_plugins('notify_terminate_ns', self.id)
909
910 self._log.info("NSR(%s/%s) is deleted", nsr_obj.nsr_name, id)
911
912 @asyncio.coroutine
913 def process_initial_config(self, nsr_obj, conf, script, vnfr_name=None):
914 '''Apply the initial-config-primitives specified in NSD or VNFD'''
915
916 def get_input_file(parameters):
917 inp = {}
918
919 # Add NSR name to file
920 inp['nsr_name'] = nsr_obj.nsr_name
921
922 # Add VNFR name if available
923 if vnfr_name:
924 inp['vnfr_name'] = vnfr_name
925
926 # Add parameters for initial config
927 inp['parameter'] = {}
928 for parameter in parameters:
929 try:
930 inp['parameter'][parameter['name']] = parameter['value']
931 except KeyError as e:
932 if vnfr_name:
933 self._log.info("VNFR {} initial config parameter {} with no value: {}".
934 format(vnfr_name, parameter, e))
935 else:
936 self._log.info("NSR {} initial config parameter {} with no value: {}".
937 format(nsr_obj.nsr_name, parameter, e))
938
939
940 # Add config agents specific to each VNFR
941 inp['config-agent'] = {}
942 for vnfr in nsr_obj.agent_nsr.vnfrs:
943 # Get the config agent for the VNFR
944 # If vnfr name is specified, add only CA specific to that
945 if (vnfr_name is None) or \
946 (vnfr_name == vnfr.name):
947 agent = self._config_agent_mgr.get_vnfr_config_agent(vnfr.vnfr_msg)
948 if agent:
949 if agent.agent_type != riftcm_config_plugin.DEFAULT_CAP_TYPE:
950 inp['config-agent'][vnfr.member_vnf_index] = agent.agent_data
951 inp['config-agent'][vnfr.member_vnf_index] \
952 ['service-name'] = agent.get_service_name(vnfr.id)
953
954 # Add vnfrs specific data
955 inp['vnfr'] = {}
956 for vnfr in nsr_obj.vnfrs:
957 v = {}
958
959 v['name'] = vnfr['name']
960 v['mgmt_ip_address'] = vnfr['vnf_cfg']['mgmt_ip_address']
961 v['mgmt_port'] = vnfr['vnf_cfg']['port']
962 v['datacenter'] = vnfr['datacenter']
963
964 if 'dashboard_url' in vnfr:
965 v['dashboard_url'] = vnfr['dashboard_url']
966
967 if 'connection_point' in vnfr:
968 v['connection_point'] = []
969 for cp in vnfr['connection_point']:
970 cp_info = dict(name=cp['name'],
971 ip_address=cp['ip_address'],
972 mac_address=cp.get('mac_address', None),
973 connection_point_id=cp.get('connection_point_id',None))
974
975 if 'virtual_cps' in cp:
976 cp_info['virtual_cps'] = [ {k:v for k,v in vcp.items()
977 if k in ['ip_address', 'mac_address']}
978 for vcp in cp['virtual_cps'] ]
979 v['connection_point'].append(cp_info)
980
981
982 if 'vdur' in vnfr:
983 vdu_data = [(vdu.get('name',None), vdu.get('management_ip',None), vdu.get('vm_management_ip',None), vdu.get('id',None))
984 for vdu in vnfr['vdur']]
985
986 v['vdur'] = [ dict(zip(['name', 'management_ip', 'vm_management_ip', 'id', 'vdu_id_ref'] , data)) for data in vdu_data ]
987
988 inp['vnfr'][vnfr['member_vnf_index_ref']] = v
989
990
991 self._log.debug("Input data for {}: {}".
992 format((vnfr_name if vnfr_name else nsr_obj.nsr_name),
993 inp))
994
995 # Convert to YAML string
996 yaml_string = yaml.dump(inp, default_flow_style=False)
997
998 # Write the inputs as yaml file
999 tmp_file = None
1000 with tempfile.NamedTemporaryFile(delete=False) as tmp_file:
1001 tmp_file.write(yaml_string.encode("UTF-8"))
1002 self._log.debug("Input file created for {}: {}".
1003 format((vnfr_name if vnfr_name \
1004 else nsr_obj.nsr_name),
1005 tmp_file.name))
1006
1007 return tmp_file.name
1008
1009 parameters = []
1010 try:
1011 parameters = conf['parameter']
1012 except Exception as e:
1013 self._log.debug("Parameter conf: {}, e: {}".
1014 format(conf, e))
1015
1016 inp_file = get_input_file(parameters)
1017
1018 cmd = "{0} {1}".format(script, inp_file)
1019 self._log.debug("Running the CMD: {}".format(cmd))
1020
1021 process = yield from asyncio.create_subprocess_shell(cmd,
1022 loop=self._loop,
1023 stdout=subprocess.PIPE,
1024 stderr=subprocess.PIPE)
1025 stdout, stderr = yield from process.communicate()
1026 rc = yield from process.wait()
1027
1028 if rc:
1029 msg = "NSR/VNFR {} initial config using {} failed with {}: {}". \
1030 format(vnfr_name if vnfr_name else nsr_obj.nsr_name,
1031 script, rc, stderr)
1032 self._log.error(msg)
1033 raise InitialConfigError(msg)
1034
1035 try:
1036 os.remove(inp_file)
1037 except Exception as e:
1038 self._log.error("Error removing input file {}: {}".
1039 format(inp_file, e))
1040
1041 def get_script_file(self, script_name, d_name, d_id, d_type, project=None):
1042 # Get the full path to the script
1043 script = os.path.join(os.getenv('RIFT_VAR_ROOT'),
1044 'launchpad/packages',
1045 d_type,
1046 project if project else "",
1047 d_id,
1048 'scripts',
1049 script_name)
1050
1051 self._log.debug("Checking for script at %s", script)
1052 if not os.path.exists(script):
1053 err_msg = ("{} {}: Did not find script {} for config".
1054 format(d_type, d_name, script))
1055 self._log.error(err_msg)
1056 raise ScriptNotFoundError(err_msg)
1057
1058 # Seen cases in jenkins, where the script execution fails
1059 # with permission denied. Setting the permission on script
1060 # to make sure it has execute permission
1061 perm = os.stat(script).st_mode
1062 if not (perm & stat.S_IXUSR):
1063 self._log.warning("NSR/VNFR {} script {} " \
1064 "without execute permission: {}".
1065 format(d_name, script, perm))
1066 os.chmod(script, perm | stat.S_IXUSR)
1067 return script
1068
1069 @asyncio.coroutine
1070 def process_ns_initial_config(self, nsr_obj, project=None):
1071 '''Apply the initial-service-primitives specified in NSD'''
1072 nsr = yield from self.cmdts_obj.get_nsr(nsr_obj.nsr_id)
1073 self._log.debug("NS initial config: {}".format(nsr))
1074 if 'initial_service_primitive' not in nsr:
1075 return
1076 if nsr is not None:
1077 nsd = yield from self.cmdts_obj.get_nsd(nsr_obj.nsr_id)
1078 for conf in nsr['initial_service_primitive']:
1079 self._log.debug("NSR {} initial config: {}".
1080 format(nsr_obj.nsr_name, conf))
1081 script = self.get_script_file(conf['user_defined_script'],
1082 nsd.name,
1083 nsd.id,
1084 'nsd',
1085 project
1086 )
1087
1088 yield from self.process_initial_config(nsr_obj, conf, script)
1089
1090 @asyncio.coroutine
1091 def process_vnf_initial_config(self, nsr_obj, vnfr, project=None):
1092 '''Apply the initial-config-primitives specified in VNFD'''
1093 vnfr_name = vnfr.name
1094
1095 vnfd = vnfr.vnfd
1096 vnf_cfg = vnfd.vnf_configuration
1097
1098 for conf in vnf_cfg.initial_config_primitive:
1099 self._log.debug("VNFR {} initial config: {} for vnfd id {}".
1100 format(vnfr_name, conf, vnfd.id))
1101
1102 if not conf.user_defined_script:
1103 self._log.debug("VNFR {} did not find user defined script: {}".
1104 format(vnfr_name, conf))
1105 continue
1106
1107 script = self.get_script_file(conf.user_defined_script,
1108 vnfd.name,
1109 vnfd.id,
1110 'vnfd',
1111 project
1112 )
1113
1114 yield from self.process_initial_config(nsr_obj,
1115 conf.as_dict(),
1116 script,
1117 vnfr_name=vnfr_name)
1118
1119 @asyncio.coroutine
1120 def process_ns_terminate_config(self, nsr_obj, project=None):
1121 '''Apply the terminate-service-primitives specified in NSD'''
1122
1123 nsr = self._nsr
1124 if 'terminate_service_primitive' not in nsr:
1125 return
1126
1127 if nsr is not None:
1128 nsd = nsr_obj.agent_nsr.nsd
1129 for conf in nsr['terminate_service_primitive']:
1130 self._log.debug("NSR {} terminate service: {}".
1131 format(nsr_obj.nsr_name, conf))
1132 script = self.get_script_file(conf['user_defined_script'],
1133 nsd.name,
1134 nsd.id,
1135 'nsd',
1136 project)
1137
1138 try:
1139 yield from self.process_initial_config(nsr_obj, conf, script)
1140
1141 except Exception as e:
1142 # Ignore any failures on terminate
1143 self._log.warning("NSR {} terminate config script {} failed: {}".
1144 format(nsr_obj.nsr_name, script, e))
1145 break
1146
1147
1148 class ConfigManagerNSR(object):
1149 def __init__(self, log, loop, parent, project, id):
1150 self._log = log
1151 self._loop = loop
1152 self._rwcal = None
1153 self._vnfr_dict = {}
1154 self._cp_dict = {}
1155 self._nsr_id = id
1156 self._parent = parent
1157 self._project = project
1158 self._log.info("Instantiated NSR entry for id=%s", id)
1159 self.nsr_cfg_config_attributes_dict = {}
1160 self.vnf_config_attributes_dict = {}
1161 self.num_vnfs_to_cfg = 0
1162 self._vnfr_list = []
1163 self.vnf_cfg_list = []
1164 self.this_nsr_dir = None
1165 self.being_deleted = False
1166 self.dts_obj = self._parent.cmdts_obj
1167
1168 # Initialize cm-state for this NS
1169 self.cm_nsr = {}
1170 self.cm_nsr['cm_vnfr'] = []
1171 self.cm_nsr['id'] = id
1172 self.cm_nsr['state'] = self.state_to_string(conmanY.RecordState.INIT)
1173 self.cm_nsr['state_details'] = None
1174
1175 self.set_nsr_name('Not Set')
1176
1177 # Add this NSR cm-state object to global cm-state
1178 parent.cm_state['cm_nsr'].append(self.cm_nsr)
1179
1180 # Place holders for NSR & VNFR classes
1181 self.agent_nsr = None
1182
1183 @property
1184 def nsr_opdata_xpath(self):
1185 ''' Returns full xpath for this NSR cm-state opdata '''
1186 return self._project.add_project((
1187 "D,/rw-conman:cm-state/rw-conman:cm-nsr[rw-conman:id={}]"
1188 ).format(quoted_key(self._nsr_id)))
1189
1190 @property
1191 def vnfrs(self):
1192 return self._vnfr_list
1193
1194 @property
1195 def parent(self):
1196 return self._parent
1197
1198 @property
1199 def nsr_id(self):
1200 return self._nsr_id
1201
1202 @asyncio.coroutine
1203 def publish_cm_state(self):
1204 ''' This function publishes cm_state for this NSR '''
1205
1206 cm_state = conmanY.YangData_RwProject_Project_CmState()
1207 cm_state_nsr = cm_state.cm_nsr.add()
1208 cm_state_nsr.from_dict(self.cm_nsr)
1209 #with self._dts.transaction() as xact:
1210 yield from self.dts_obj.update(self.nsr_opdata_xpath, cm_state_nsr)
1211 self._log.info("Published cm-state with xpath %s and nsr %s",
1212 self.nsr_opdata_xpath,
1213 cm_state_nsr)
1214
1215 @asyncio.coroutine
1216 def delete_cm_nsr(self):
1217 ''' This function publishes cm_state for this NSR '''
1218
1219 yield from self.dts_obj.delete(self.nsr_opdata_xpath)
1220 self._log.info("Deleted cm-nsr with xpath %s",
1221 self.nsr_opdata_xpath)
1222
1223 def set_nsr_name(self, name):
1224 self.nsr_name = name
1225 self.cm_nsr['name'] = name
1226
1227 def ConfigVNF(self, vnfr):
1228
1229 vnf_cfg = vnfr['vnf_cfg']
1230 vnf_cm_state = self.find_or_create_vnfr_cm_state(vnf_cfg)
1231
1232 if (vnf_cm_state['state'] == self.state_to_string(conmanY.RecordState.READY_NO_CFG)
1233 or
1234 vnf_cm_state['state'] == self.state_to_string(conmanY.RecordState.READY)):
1235 self._log.warning("NS/VNF (%s/%s) is already configured! Skipped.", self.nsr_name, vnfr['name'])
1236 return
1237
1238 #UPdate VNF state
1239 vnf_cm_state['state'] = self.state_to_string(conmanY.RecordState.CFG_PROCESS)
1240
1241 # Now translate the configuration for iP addresses
1242 try:
1243 # Add cp_dict members (TAGS) for this VNF
1244 self._cp_dict['rw_mgmt_ip'] = vnf_cfg['mgmt_ip_address']
1245 self._cp_dict['rw_username'] = vnf_cfg['username']
1246 self._cp_dict['rw_password'] = vnf_cfg['password']
1247 except Exception as e:
1248 vnf_cm_state['state'] = self.state_to_string(conmanY.RecordState.CFG_PROCESS_FAILED)
1249 self._log.error("Failed to set tags for VNF: %s with (%s)", log_this_vnf(vnf_cfg), str(e))
1250 return
1251
1252 self._log.info("Applying config to VNF: %s = %s!", log_this_vnf(vnf_cfg), vnf_cfg)
1253 try:
1254 self._log.debug("Scheduled configuration!")
1255 vnf_cm_state['state'] = self.state_to_string(conmanY.RecordState.CFG_SCHED)
1256 except Exception as e:
1257 self._log.error("Failed apply_vnf_config to VNF: %s as (%s)", log_this_vnf(vnf_cfg), str(e))
1258 vnf_cm_state['state'] = self.state_to_string(conmanY.RecordState.CFG_PROCESS_FAILED)
1259 raise
1260
1261 def add(self, nsr):
1262 self._log.info("Adding NS Record for id=%s", id)
1263 self._nsr = nsr
1264
1265 def sample_cm_state(self):
1266 return (
1267 {
1268 'cm_nsr': [
1269 {
1270 'cm_vnfr': [
1271 {
1272 'connection_point': [
1273 {'ip_address': '1.1.1.1', 'name': 'vnf1cp1'},
1274 {'ip_address': '1.1.1.2', 'name': 'vnf1cp2'}
1275 ],
1276 'id': 'vnfrid1',
1277 'mgmt_interface': {'ip_address': '7.1.1.1',
1278 'port': 1001},
1279 'name': 'vnfrname1',
1280 'state': 'init'
1281 },
1282 {
1283 'connection_point': [{'ip_address': '2.1.1.1', 'name': 'vnf2cp1'},
1284 {'ip_address': '2.1.1.2', 'name': 'vnf2cp2'}],
1285 'id': 'vnfrid2',
1286 'mgmt_interface': {'ip_address': '7.1.1.2',
1287 'port': 1001},
1288 'name': 'vnfrname2',
1289 'state': 'init'}
1290 ],
1291 'id': 'nsrid1',
1292 'name': 'nsrname1',
1293 'state': 'init'}
1294 ],
1295 'states': 'Initialized, '
1296 })
1297
1298 def populate_cm_state_from_vnf_cfg(self):
1299 # Fill in each VNFR from this nsr object
1300 vnfr_list = self._vnfr_list
1301 for vnfr in vnfr_list:
1302 vnf_cfg = vnfr['vnf_cfg']
1303 vnf_cm_state = self.find_vnfr_cm_state(vnfr['id'])
1304
1305 if vnf_cm_state:
1306 # Fill in VNF management interface
1307 vnf_cm_state['mgmt_interface']['ip_address'] = vnf_cfg['mgmt_ip_address']
1308 vnf_cm_state['mgmt_interface']['port'] = vnf_cfg['port']
1309
1310 # Fill in VNF configuration details
1311 vnf_cm_state['cfg_type'] = vnf_cfg['config_method']
1312
1313 # Fill in each connection-point for this VNF
1314 if "connection_point" in vnfr:
1315 cp_list = vnfr['connection_point']
1316 for cp_item_dict in cp_list:
1317 try:
1318 vnf_cm_state['connection_point'].append(
1319 {
1320 'name' : cp_item_dict['name'],
1321 'ip_address' : cp_item_dict['ip_address'],
1322 'connection_point_id' : cp_item_dict['connection_point_id'],
1323 }
1324 )
1325 except Exception:
1326 # Added to make mano_ut work
1327 pass
1328
1329 def state_to_string(self, state):
1330 state_dict = {
1331 conmanY.RecordState.INIT : "init",
1332 conmanY.RecordState.RECEIVED : "received",
1333 conmanY.RecordState.CFG_PROCESS : "cfg_process",
1334 conmanY.RecordState.CFG_PROCESS_FAILED : "cfg_process_failed",
1335 conmanY.RecordState.CFG_SCHED : "cfg_sched",
1336 conmanY.RecordState.CONNECTING : "connecting",
1337 conmanY.RecordState.FAILED_CONNECTION : "failed_connection",
1338 conmanY.RecordState.CFG_SEND : "cfg_send",
1339 conmanY.RecordState.CFG_FAILED : "cfg_failed",
1340 conmanY.RecordState.READY_NO_CFG : "ready_no_cfg",
1341 conmanY.RecordState.READY : "ready",
1342 conmanY.RecordState.TERMINATE : "terminate",
1343 }
1344 return state_dict[state]
1345
1346 def find_vnfr_cm_state(self, id):
1347 if self.cm_nsr['cm_vnfr']:
1348 for vnf_cm_state in self.cm_nsr['cm_vnfr']:
1349 if vnf_cm_state['id'] == id:
1350 return vnf_cm_state
1351 return None
1352
1353 def find_or_create_vnfr_cm_state(self, vnf_cfg):
1354 vnfr = vnf_cfg['vnfr']
1355 vnf_cm_state = self.find_vnfr_cm_state(vnfr['id'])
1356
1357 if vnf_cm_state is None:
1358 # Not found, Create and Initialize this VNF cm-state
1359 vnf_cm_state = {
1360 'id' : vnfr['id'],
1361 'name' : vnfr['name'],
1362 'state' : self.state_to_string(conmanY.RecordState.RECEIVED),
1363 'mgmt_interface' :
1364 {
1365 'ip_address' : vnf_cfg['mgmt_ip_address'],
1366 'port' : vnf_cfg['port'],
1367 },
1368 'connection_point' : [],
1369 'config_parameter' :
1370 {
1371 'config_parameter_source' : [],
1372 'config_parameter_request' : [],
1373 },
1374 }
1375 self.cm_nsr['cm_vnfr'].append(vnf_cm_state)
1376
1377 # Publish newly created cm-state
1378
1379
1380 return vnf_cm_state
1381
1382 @asyncio.coroutine
1383 def get_vnf_cm_state(self, vnfr):
1384 if vnfr:
1385 vnf_cm_state = self.find_vnfr_cm_state(vnfr['id'])
1386 if vnf_cm_state:
1387 return vnf_cm_state['state']
1388 return False
1389
1390 @asyncio.coroutine
1391 def update_vnf_cm_state(self, vnfr, state):
1392 if vnfr:
1393 vnf_cm_state = self.find_vnfr_cm_state(vnfr['id'])
1394 if vnf_cm_state is None:
1395 self._log.error("No opdata found for NS/VNF:%s/%s!",
1396 self.nsr_name, vnfr['name'])
1397 return
1398
1399 if vnf_cm_state['state'] != self.state_to_string(state):
1400 old_state = vnf_cm_state['state']
1401 vnf_cm_state['state'] = self.state_to_string(state)
1402 # Publish new state
1403 yield from self.publish_cm_state()
1404 self._log.info("VNF ({}/{}/{}) state change: {} -> {}"
1405 .format(self.nsr_name,
1406 vnfr['name'],
1407 vnfr['member_vnf_index_ref'],
1408 old_state,
1409 vnf_cm_state['state']))
1410
1411 else:
1412 self._log.error("No VNFR supplied for state update (NS=%s)!",
1413 self.nsr_name)
1414
1415 @property
1416 def get_ns_cm_state(self):
1417 return self.cm_nsr['state']
1418
1419 @asyncio.coroutine
1420 def update_ns_cm_state(self, state, state_details=None):
1421 if self.cm_nsr['state'] != self.state_to_string(state):
1422 old_state = self.cm_nsr['state']
1423 self.cm_nsr['state'] = self.state_to_string(state)
1424 self.cm_nsr['state_details'] = state_details if state_details is not None else None
1425 self._log.info("NS ({}) state change: {} -> {}"
1426 .format(self.nsr_name,
1427 old_state,
1428 self.cm_nsr['state']))
1429 # Publish new state
1430 yield from self.publish_cm_state()
1431
1432 @asyncio.coroutine
1433 def add_vnfr(self, vnfr, vnfr_msg):
1434
1435 @asyncio.coroutine
1436 def populate_subnets_from_vlr(id):
1437 try:
1438 # Populate cp_dict with VLR subnet info
1439 vlr = yield from self.dts_obj.get_vlr(id)
1440 if vlr is not None and 'assigned_subnet' in vlr:
1441 subnet = {vlr.name:vlr.assigned_subnet}
1442 self._cp_dict[vnfr['member_vnf_index_ref']].update(subnet)
1443 self._cp_dict.update(subnet)
1444 self._log.debug("VNF:(%s) Updated assigned subnet = %s",
1445 vnfr['name'], subnet)
1446 except Exception as e:
1447 self._log.error("VNF:(%s) VLR Error = %s",
1448 vnfr['name'], e)
1449
1450 if vnfr['id'] not in self._vnfr_dict:
1451 self._log.info("NSR(%s) : Adding VNF Record for name=%s, id=%s", self._nsr_id, vnfr['name'], vnfr['id'])
1452 # Add this vnfr to the list for show, or single traversal
1453 self._vnfr_list.append(vnfr)
1454 else:
1455 self._log.warning("NSR(%s) : VNF Record for name=%s, id=%s already exists, overwriting",
1456 self._nsr_id, vnfr['name'], vnfr['id'])
1457
1458 # Make vnfr available by id as well as by name
1459 unique_name = get_vnf_unique_name(self.nsr_name, vnfr['name'], vnfr['member_vnf_index_ref'])
1460 self._vnfr_dict[unique_name] = vnfr
1461 self._vnfr_dict[vnfr['id']] = vnfr
1462
1463 # Create vnf_cfg dictionary with default values
1464 vnf_cfg = {
1465 'nsr_obj' : self,
1466 'vnfr' : vnfr,
1467 'agent_vnfr' : self.agent_nsr.add_vnfr(vnfr, vnfr_msg),
1468 'nsr_name' : self.nsr_name,
1469 'nsr_id' : self._nsr_id,
1470 'vnfr_name' : vnfr['name'],
1471 'member_vnf_index' : vnfr['member_vnf_index_ref'],
1472 'port' : 0,
1473 'username' : '@rift',
1474 'password' : 'rift',
1475 'config_method' : 'None',
1476 'protocol' : 'None',
1477 'mgmt_ip_address' : '0.0.0.0',
1478 'cfg_file' : 'None',
1479 'cfg_retries' : 0,
1480 'script_type' : 'bash',
1481 }
1482
1483 ##########################
1484 # Update the mgmt ip address
1485 # In case the config method is none, this is not
1486 # updated later
1487 try:
1488 vnf_cfg['mgmt_ip_address'] = vnfr_msg.mgmt_interface.ip_address
1489 vnf_cfg['port'] = vnfr_msg.mgmt_interface.port
1490 except Exception as e:
1491 self._log.warn(
1492 "VNFR {}({}), unable to retrieve mgmt ip address: {}".
1493 format(vnfr['name'], vnfr['id'], e))
1494
1495 vnfr['vnf_cfg'] = vnf_cfg
1496 self.find_or_create_vnfr_cm_state(vnf_cfg)
1497
1498 '''
1499 Build the connection-points list for this VNF (self._cp_dict)
1500 '''
1501 # Populate global CP list self._cp_dict from VNFR
1502 cp_list = []
1503 if 'connection_point' in vnfr:
1504 cp_list = vnfr['connection_point']
1505
1506 self._cp_dict[vnfr['member_vnf_index_ref']] = {}
1507 if 'vdur' in vnfr:
1508 for vdur in vnfr['vdur']:
1509 if 'internal_connection_point' in vdur:
1510 cp_list += vdur['internal_connection_point']
1511
1512 for cp_item_dict in cp_list:
1513 if 'ip_address' not in cp_item_dict:
1514 self._log.error("connection point {} doesnot have an ip address assigned ".
1515 format(cp_item_dict['name']))
1516 continue
1517 # Populate global dictionary
1518 self._cp_dict[
1519 cp_item_dict['name']
1520 ] = cp_item_dict['ip_address']
1521
1522 # Populate unique member specific dictionary
1523 self._cp_dict[
1524 vnfr['member_vnf_index_ref']
1525 ][
1526 cp_item_dict['name']
1527 ] = cp_item_dict['ip_address']
1528
1529 # Fill in the subnets from vlr
1530 if 'vlr_ref' in cp_item_dict:
1531 ### HACK: Internal connection_point do not have VLR reference
1532 yield from populate_subnets_from_vlr(cp_item_dict['vlr_ref'])
1533
1534 if 'internal_vlr' in vnfr:
1535 for ivlr in vnfr['internal_vlr']:
1536 yield from populate_subnets_from_vlr(ivlr['vlr_ref'])
1537
1538 # Update vnfr
1539 vnf_cfg['agent_vnfr']._vnfr = vnfr
1540 return vnf_cfg['agent_vnfr']
1541
1542
1543 class XPaths(object):
1544 @staticmethod
1545 def nsr_opdata(k=None):
1546 return ("D,/nsr:ns-instance-opdata/nsr:nsr" +
1547 ("[nsr:ns-instance-config-ref={}]".format(quoted_key(k)) if k is not None else ""))
1548
1549 @staticmethod
1550 def nsd_msg(k=None):
1551 return ("C,/project-nsd:nsd-catalog/project-nsd:nsd" +
1552 "[project-nsd:id={}]".format(quoted_key(k)) if k is not None else "")
1553
1554 @staticmethod
1555 def vnfr_opdata(k=None):
1556 return ("D,/vnfr:vnfr-catalog/vnfr:vnfr" +
1557 ("[vnfr:id={}]".format(quoted_key(k)) if k is not None else ""))
1558
1559 @staticmethod
1560 def vnfd_path(k=None):
1561 return ("C,/vnfd:vnfd-catalog/vnfd:vnfd" +
1562 ("[vnfd:id={}]".format(quoted_key(k)) if k is not None else ""))
1563
1564 @staticmethod
1565 def config_agent(k=None):
1566 return ("D,/rw-config-agent:config-agent/rw-config-agent:account" +
1567 ("[rw-config-agent:name={}]".format(quoted_key(k)) if k is not None else ""))
1568
1569 @staticmethod
1570 def nsr_config(k=None):
1571 return ("C,/nsr:ns-instance-config/nsr:nsr" +
1572 ("[nsr:id={}]".format(quoted_key(k)) if k is not None else ""))
1573
1574 @staticmethod
1575 def vlr(k=None):
1576 return ("D,/vlr:vlr-catalog/vlr:vlr" +
1577 ("[vlr:id={}]".format(quoted_key(k)) if k is not None else ""))
1578
1579 class ConfigManagerDTS(object):
1580 ''' This class either reads from DTS or publishes to DTS '''
1581
1582 def __init__(self, log, loop, parent, dts, project):
1583 self._log = log
1584 self._loop = loop
1585 self._parent = parent
1586 self._dts = dts
1587 self._project = project
1588
1589 @asyncio.coroutine
1590 def _read_dts(self, path, do_trace=False):
1591 xpath = self._project.add_project(path)
1592 self._log.debug("_read_dts path = %s", xpath)
1593 flags = rwdts.XactFlag.MERGE
1594 flags += rwdts.XactFlag.TRACE if do_trace else 0
1595 res_iter = yield from self._dts.query_read(
1596 xpath, flags=flags
1597 )
1598
1599 results = []
1600 try:
1601 for i in res_iter:
1602 result = yield from i
1603 if result is not None:
1604 results.append(result.result)
1605 except:
1606 pass
1607
1608 return results
1609
1610
1611 @asyncio.coroutine
1612 def get_xpath(self, xpath):
1613 self._log.debug("Attempting to get xpath: {}".format(xpath))
1614 resp = yield from self._read_dts(xpath, False)
1615 if len(resp) > 0:
1616 self._log.debug("Got DTS resp: {}".format(resp[0]))
1617 return resp[0]
1618 return None
1619
1620 @asyncio.coroutine
1621 def get_nsr(self, id):
1622 self._log.debug("Attempting to get NSR: %s", id)
1623 nsrl = yield from self._read_dts(XPaths.nsr_opdata(id), False)
1624 nsr = None
1625 if len(nsrl) > 0:
1626 nsr = nsrl[0].as_dict()
1627 return nsr
1628
1629 @asyncio.coroutine
1630 def get_nsr_config(self, id):
1631 self._log.debug("Attempting to get config NSR: %s", id)
1632 nsrl = yield from self._read_dts(XPaths.nsr_config(id), False)
1633 nsr = None
1634 if len(nsrl) > 0:
1635 nsr = nsrl[0]
1636 return nsr
1637
1638 @asyncio.coroutine
1639 def get_nsd_msg(self, id):
1640 self._log.debug("Attempting to get NSD: %s", id)
1641 nsdl = yield from self._read_dts(XPaths.nsd_msg(id), False)
1642 nsd_msg = None
1643 if len(nsdl) > 0:
1644 nsd_msg = nsdl[0]
1645 return nsd_msg
1646
1647 @asyncio.coroutine
1648 def get_nsd(self, nsr_id):
1649 self._log.debug("Attempting to get NSD for NSR: %s", id)
1650 nsr_config = yield from self.get_nsr_config(nsr_id)
1651 nsd_msg = nsr_config.nsd
1652 return nsd_msg
1653
1654 @asyncio.coroutine
1655 def get_vnfr(self, id):
1656 self._log.debug("Attempting to get VNFR: %s", id)
1657 vnfrl = yield from self._read_dts(XPaths.vnfr_opdata(id), do_trace=False)
1658 vnfr_msg = None
1659 if len(vnfrl) > 0:
1660 vnfr_msg = vnfrl[0]
1661 return vnfr_msg
1662
1663 @asyncio.coroutine
1664 def get_vnfd(self, id):
1665 self._log.debug("Attempting to get VNFD: %s", XPaths.vnfd_path(id))
1666 vnfdl = yield from self._read_dts(XPaths.vnfd_path(id), do_trace=False)
1667 vnfd_msg = None
1668 if len(vnfdl) > 0:
1669 vnfd_msg = vnfdl[0]
1670 return vnfd_msg
1671
1672 @asyncio.coroutine
1673 def get_vlr(self, id):
1674 self._log.debug("Attempting to get VLR subnet: %s", id)
1675 vlrl = yield from self._read_dts(XPaths.vlr(id), do_trace=False)
1676 vlr_msg = None
1677 if len(vlrl) > 0:
1678 vlr_msg = vlrl[0]
1679 return vlr_msg
1680
1681 @asyncio.coroutine
1682 def get_config_agents(self, name):
1683 self._log.debug("Attempting to get config_agents: %s", name)
1684 cfgagentl = yield from self._read_dts(XPaths.config_agent(name), False)
1685 return cfgagentl
1686
1687 @asyncio.coroutine
1688 def update(self, xpath, msg, flags=rwdts.XactFlag.REPLACE):
1689 """
1690 Update a cm-state (cm-nsr) record in DTS with the path and message
1691 """
1692 path = self._project.add_project(xpath)
1693 self._log.debug("Updating cm-state %s:%s dts_pub_hdl = %s", path, msg, self.dts_pub_hdl)
1694 self.dts_pub_hdl.update_element(path, msg, flags)
1695 self._log.debug("Updated cm-state, %s:%s", path, msg)
1696
1697 @asyncio.coroutine
1698 def delete(self, xpath):
1699 """
1700 Delete cm-nsr record in DTS with the path only
1701 """
1702 path = self._project.add_project(xpath)
1703 self._log.debug("Deleting cm-nsr %s dts_pub_hdl = %s", path, self.dts_pub_hdl)
1704 self.dts_pub_hdl.delete_element(path)
1705 self._log.debug("Deleted cm-nsr, %s", path)
1706
1707 @asyncio.coroutine
1708 def register(self):
1709 yield from self.register_to_publish()
1710 yield from self.register_for_nsr()
1711
1712 def deregister(self):
1713 self._log.debug("De-registering conman config for project {}".
1714 format(self._project.name))
1715 if self.dts_reg_hdl:
1716 self.dts_reg_hdl.deregister()
1717 self.dts_reg_hdl = None
1718
1719 if self.dts_pub_hdl:
1720 self.dts_pub_hdl.deregister()
1721 self.dts_pub_hdl = None
1722
1723 @asyncio.coroutine
1724 def register_to_publish(self):
1725 ''' Register to DTS for publishing cm-state opdata '''
1726
1727 xpath = self._project.add_project("D,/rw-conman:cm-state/rw-conman:cm-nsr")
1728 self._log.debug("Registering to publish cm-state @ %s", xpath)
1729 hdl = rift.tasklets.DTS.RegistrationHandler()
1730 with self._dts.group_create() as group:
1731 self.dts_pub_hdl = group.register(xpath=xpath,
1732 handler=hdl,
1733 flags=rwdts.Flag.PUBLISHER | rwdts.Flag.NO_PREP_READ)
1734
1735 @property
1736 def nsr_xpath(self):
1737 return self._project.add_project("D,/nsr:ns-instance-opdata/nsr:nsr")
1738
1739 @asyncio.coroutine
1740 def register_for_nsr(self):
1741 """ Register for NSR changes """
1742
1743 @asyncio.coroutine
1744 def on_prepare(xact_info, query_action, ks_path, msg):
1745 """ This NSR is created """
1746 self._log.debug("Received NSR instantiate on_prepare (%s:%s:%s)",
1747 query_action,
1748 ks_path,
1749 msg)
1750
1751 if (query_action == rwdts.QueryAction.UPDATE or
1752 query_action == rwdts.QueryAction.CREATE):
1753 # Update Each NSR/VNFR state
1754 if msg.operational_status in ['running', 'terminate']:
1755 # Add to the task list
1756 self._parent.add_to_pending_tasks({
1757 'nsrid' : msg.ns_instance_config_ref,
1758 'retries' : 5,
1759 'event' : msg.operational_status,
1760 })
1761
1762 elif query_action == rwdts.QueryAction.DELETE:
1763 nsr_id = msg.ns_instance_config_ref
1764 self._log.debug("Got terminate for NSR id %s", nsr_id)
1765 asyncio.ensure_future(self._parent.delete_NSR(nsr_id), loop=self._loop)
1766
1767 else:
1768 raise NotImplementedError(
1769 "%s action on cm-state not supported",
1770 query_action)
1771
1772 xact_info.respond_xpath(rwdts.XactRspCode.ACK)
1773
1774 try:
1775 handler = rift.tasklets.DTS.RegistrationHandler(on_prepare=on_prepare)
1776 self.dts_reg_hdl = yield from self._dts.register(self.nsr_xpath,
1777 flags=rwdts.Flag.SUBSCRIBER | rwdts.Flag.DELTA_READY,
1778 handler=handler)
1779 except Exception as e:
1780 self._log.error("Failed to register for NSR changes as %s", str(e))
1781
1782