3 # Copyright 2016 RIFT.IO Inc
5 # Licensed under the Apache License, Version 2.0 (the "License");
6 # you may not use this file except in compliance with the License.
7 # You may obtain a copy of the License at
9 # http://www.apache.org/licenses/LICENSE-2.0
11 # Unless required by applicable law or agreed to in writing, software
12 # distributed under the License is distributed on an "AS IS" BASIS,
13 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 # See the License for the specific language governing permissions and
15 # limitations under the License.
27 from gi
.repository
import (
29 RwConmanYang
as conmanY
,
32 gi
.require_version('RwKeyspec', '1.0')
33 from gi
.repository
.RwKeyspec
import quoted_key
36 import rift
.package
.script
37 import rift
.package
.store
39 from . import rwconman_conagent
as conagent
40 from . import RiftCM_rpc
41 from . import riftcm_config_plugin
44 if sys
.version_info
< (3, 4, 4):
45 asyncio
.ensure_future
= asyncio
.async
def get_vnf_unique_name(nsr_name, vnfr_name, member_vnf_index):
    """Return the dotted ``<nsr_name>.<vnfr_name>.<member_vnf_index>`` name.

    Each argument is rendered with ``str.format`` default formatting, so
    non-string values (for example an integer member index) are accepted.
    """
    name_parts = (nsr_name, vnfr_name, member_vnf_index)
    return ".".join("{}".format(part) for part in name_parts)
class ConmanConfigError(Exception):
    """Base class of the configuration-manager exceptions declared in this module."""
class InitialConfigError(ConmanConfigError):
    """Raised by ``process_initial_config`` when the config script run fails."""
class ScriptNotFoundError(InitialConfigError):
    """Raised by ``get_script_file`` when the script path does not exist."""
def log_this_vnf(vnf_cfg):
    """Build a short identification string for a VNF, for use in log messages.

    The fields ``nsr_name``, ``vnfr_name`` and ``member_vnf_index`` are
    rendered as ``"<value>/"`` and ``mgmt_ip_address`` as ``"(<value>)"``, in
    that order, e.g. ``"ns/vnf/1/(10.0.0.1)"``.

    Args:
        vnf_cfg: mapping holding (some of) the fields listed above.

    Returns:
        The concatenated string; fields absent from ``vnf_cfg`` are skipped,
        so an empty mapping yields ``""``.
    """
    # The accumulator must exist before the first "+=", and the result has to
    # be returned because callers pass it straight to the logger.
    log_vnf = ""
    used_item_list = ('nsr_name', 'vnfr_name', 'member_vnf_index', 'mgmt_ip_address')
    for item in used_item_list:
        if item not in vnf_cfg:
            # A partially populated record should still be loggable.
            continue
        if item == 'mgmt_ip_address':
            log_vnf += "({})".format(vnf_cfg[item])
        else:
            log_vnf += "{}/".format(vnf_cfg[item])
    return log_vnf
74 class PretendNsm(object):
75 def __init__(self
, dts
, log
, loop
, parent
):
81 self
._nsr
_dict
= parent
._nsr
_dict
82 self
._config
_agent
_plugins
= []
87 # Expensive, instead use get_nsr, if you know id.
89 # Update the list of nsrs (agent nsr)
90 for id, nsr_obj
in self
._nsr
_dict
.items():
91 self
._nsrs
[id] = nsr_obj
.agent_nsr
def get_nsr(self, nsr_id):
    """Look up a single NSR by id in ``self._nsr_dict``.

    Args:
        nsr_id: key into ``self._nsr_dict``.

    Returns:
        The ``agent_nsr`` attribute of the matching record, or ``None`` when
        ``nsr_id`` is not a key of ``self._nsr_dict``.

    NOTE(review): the return statements were not present in the reviewed
    text. Returning ``agent_nsr`` mirrors how this class fills ``self._nsrs``
    from the same records - confirm against the callers.
    """
    nsr_obj = self._nsr_dict.get(nsr_id)
    if nsr_obj is None:
        return None
    return nsr_obj.agent_nsr
100 def get_vnfr_msg(self
, vnfr_id
, nsr_id
=None):
101 self
._log
.debug("get_vnfr_msg(vnfr=%s, nsr=%s)",
105 if nsr_id
in self
._nsr
_dict
:
106 nsr_obj
= self
._nsr
_dict
[nsr_id
]
107 if vnfr_id
in nsr_obj
._vnfr
_dict
:
110 for nsr_obj
in self
._nsr
_dict
.values():
111 if vnfr_id
in nsr_obj
._vnfr
_dict
:
116 vnf_cfg
= nsr_obj
._vnfr
_dict
[vnfr_id
]['vnf_cfg']
117 return vnf_cfg
['agent_vnfr'].vnfr_msg
def get_nsd(self, nsr_id):
    """Return the NSD for ``nsr_id``, fetching and caching it on first use.

    Generator-based coroutine: on a cache miss it delegates with
    ``yield from`` to ``self._parent.cmdts_obj.get_nsr_config(nsr_id)`` and
    stores the ``nsd`` attribute of the result in ``self._nsd_msg``.

    Args:
        nsr_id: key used for the ``self._nsd_msg`` cache and for the query.

    Returns:
        The cached ``nsd`` object for ``nsr_id``.
    """
    try:
        # Fast path: already fetched once.
        return self._nsd_msg[nsr_id]
    except KeyError:
        pass

    nsr_config = yield from self._parent.cmdts_obj.get_nsr_config(nsr_id)
    nsd = nsr_config.nsd
    self._nsd_msg[nsr_id] = nsd
    return nsd
def config_agent_plugins(self):
    """Rebuild and return the list of config agent plugin instances.

    The list is re-created on every call from the values of
    ``self._parent._config_agent_mgr._plugin_instances`` and is also stored
    in ``self._config_agent_plugins``.
    """
    collected = []
    # Publish the fresh list first, exactly as the loop-based version did.
    self._config_agent_plugins = collected
    collected.extend(self._parent._config_agent_mgr._plugin_instances.values())
    return collected
135 class ConfigManagerConfig(object):
136 def __init__(self
, dts
, log
, loop
, parent
):
140 self
._parent
= parent
141 self
._project
= parent
._project
144 self
.pending_cfg
= {}
145 self
.terminate_cfg
= {}
146 self
.pending_tasks
= [] # User for NSRid get retry
147 # (mainly excercised at restart case)
149 self
._opdata
_xpath
= self
._project
.add_project("D,/rw-conman:cm-state")
151 # Initialize cm-state
153 self
.cm_state
['cm_nsr'] = []
154 self
.cm_state
['states'] = "Initialized"
156 # Initialize objects to register
157 self
.cmdts_obj
= ConfigManagerDTS(self
._log
, self
._loop
, self
, self
._dts
, self
._project
)
158 self
._config
_agent
_mgr
= conagent
.RiftCMConfigAgent(
165 self
.riftcm_rpc_handler
= RiftCM_rpc
.RiftCMRPCHandler(self
._dts
, self
._log
, self
._loop
, self
._project
,
167 self
._dts
, self
._log
, self
._loop
, self
))
171 self
._config
_agent
_mgr
,
172 self
.riftcm_rpc_handler
def is_nsr_valid(self, nsr_id):
    """Tell whether ``nsr_id`` is a key of ``self._nsr_dict``.

    Args:
        nsr_id: NSR id to look up.

    Returns:
        ``True`` when ``nsr_id`` is a key of ``self._nsr_dict``, else ``False``.
    """
    # Return the membership test directly instead of branching on it.
    return nsr_id in self._nsr_dict
def add_to_pending_tasks(self, task):
    """Queue ``task`` unless an equivalent one is already pending.

    Two tasks are considered equivalent when both their ``'nsrid'`` and
    ``'event'`` entries match. After a successful append a
    ``ConfigManagerConfig_pending_loop`` task is created on ``self._loop``.
    Any exception raised while queueing is logged, not propagated.

    Args:
        task: dict with at least the keys ``'nsrid'`` and ``'event'``.
    """
    already_queued = any(
        queued['nsrid'] == task['nsrid'] and queued['event'] == task['event']
        for queued in (self.pending_tasks or ())
    )
    if already_queued:
        return

    try:
        self.pending_tasks.append(task)
        self._log.debug("add_to_pending_tasks (nsrid:%s)",
                        task['nsrid'])
        # NOTE(review): right after the append the length is always >= 1, so
        # a loop task is created on every successful add - confirm intent.
        if len(self.pending_tasks) >= 1:
            self._loop.create_task(self.ConfigManagerConfig_pending_loop())
            # TBD - change to info level
            self._log.debug("Started pending_loop!")
    except Exception as e:
        self._log.error("Failed adding to pending tasks (%s)", str(e))
def del_from_pending_tasks(self, task):
    """Remove ``task`` from ``self.pending_tasks``.

    Best effort: a failure (for example the task not being queued) is
    reported through ``self._log.error`` and never raised to the caller.
    """
    try:
        self.pending_tasks.remove(task)
    except Exception as err:
        self._log.error("Failed removing from pending tasks (%s)", str(err))
207 def ConfigManagerConfig_pending_loop(self
):
210 yield from asyncio
.sleep(loop_sleep
, loop
=self
._loop
)
212 This pending task queue is ordred by events,
213 must finish previous task successfully to be able to go on to the next task
215 if self
.pending_tasks
:
216 self
._log
.debug("self.pending_tasks len=%s", len(self
.pending_tasks
))
217 task
= self
.pending_tasks
.pop(0)
220 nsrid
= task
['nsrid']
221 self
._log
.debug("Will execute pending task for NSR id: %s", nsrid
)
223 # Try to configure this NSR
225 done
= yield from self
.config_NSR(nsrid
, task
['event'])
226 self
._log
.info("self.config_NSR status=%s", done
)
228 except Exception as e
:
229 self
._log
.error("Failed(%s) configuring NSR(%s) for task %s," \
230 "retries remained:%d!",
231 str(e
), nsrid
, task
['event'] , task
['retries'])
232 self
._log
.exception(e
)
233 if task
['event'] == 'terminate':
238 self
._log
.debug("Finished pending task NSR id: %s", nsrid
)
240 self
._log
.error("Failed configuring NSR(%s), retries remained:%d!",
241 nsrid
, task
['retries'])
243 # Failed, re-insert (append at the end)
244 # this failed task to be retried later
245 # If any retries remained.
247 self
.pending_tasks
.append(task
)
249 self
._log
.debug("Stopped pending_loop!")
254 yield from self
.register_cm_state_opdata()
256 # Initialize all handles that needs to be registered
257 for reg
in self
.reg_handles
:
258 yield from reg
.register()
260 def deregister(self
):
261 # De-register all reg handles
262 self
._log
.debug("De-register ConfigManagerConfig for project {}".
263 format(self
._project
))
265 for reg
in self
.reg_handles
:
269 self
._op
_reg
.delete_element(self
._opdata
_xpath
)
270 self
._op
_reg
.deregister()
273 def register_cm_state_opdata(self
):
275 def state_to_string(state
):
277 conmanY
.RecordState
.INIT
: "init",
278 conmanY
.RecordState
.RECEIVED
: "received",
279 conmanY
.RecordState
.CFG_PROCESS
: "cfg_process",
280 conmanY
.RecordState
.CFG_PROCESS_FAILED
: "cfg_process_failed",
281 conmanY
.RecordState
.CFG_SCHED
: "cfg_sched",
282 conmanY
.RecordState
.CONNECTING
: "connecting",
283 conmanY
.RecordState
.FAILED_CONNECTION
: "failed_connection",
284 conmanY
.RecordState
.CFG_SEND
: "cfg_send",
285 conmanY
.RecordState
.CFG_FAILED
: "cfg_failed",
286 conmanY
.RecordState
.READY_NO_CFG
: "ready_no_cfg",
287 conmanY
.RecordState
.READY
: "ready",
288 conmanY
.RecordState
.TERMINATE
: "terminate",
290 return state_dict
[state
]
293 def on_prepare(xact_info
, action
, ks_path
, msg
):
295 self
._log
.debug("Received cm-state: msg=%s, action=%s", msg
, action
)
297 if action
== rwdts
.QueryAction
.READ
:
298 self
._log
.debug("Responding to SHOW cm-state: %s", self
.cm_state
)
299 show_output
= conmanY
.YangData_RwProject_Project_CmState()
300 show_output
.from_dict(self
.cm_state
)
301 xact_info
.respond_xpath(rwdts
.XactRspCode
.ACK
,
302 xpath
=self
._opdata
_xpath
,
305 xact_info
.respond_xpath(rwdts
.XactRspCode
.ACK
)
307 self
._log
.info("Registering for cm-opdata xpath: %s",
311 handler
=rift
.tasklets
.DTS
.RegistrationHandler(on_prepare
=on_prepare
)
312 self
._op
_reg
= yield from self
._dts
.register(xpath
=self
._opdata
_xpath
,
314 flags
=rwdts
.Flag
.PUBLISHER
)
315 self
._log
.info("Successfully registered for opdata(%s)", self
._opdata
_xpath
)
316 except Exception as e
:
317 self
._log
.error("Failed to register for opdata as (%s)", e
)
def get_config_method(self, vnf_config):
    """Return the configuration method named in ``vnf_config``.

    The supported methods are checked in priority order: ``'juju'`` first,
    then ``'script'``.

    Args:
        vnf_config: container (normally the ``vnf_configuration`` dict)
            tested with ``in`` for each method name.

    Returns:
        The first method name contained in ``vnf_config``, or ``None`` when
        none is present (callers test the result with ``is not None``).
    """
    cfg_types = ('juju', 'script')
    # next() with a default hands back the first match, or None for no match.
    return next((method for method in cfg_types if method in vnf_config), None)
def process_nsd_vnf_configuration(self, nsr_obj, vnfr):
    """Pick the config method for one VNFR and record it on the NSR.

    Generator-based coroutine. The VNF unique name is stored in
    ``vnfr['vnf_cfg']``. When ``get_config_method`` finds a method, it is
    saved as ``config_method`` and handed to the config agent manager.
    Otherwise the VNF is moved to ``READY_NO_CFG``. In both cases the NSR
    cm-state is refreshed from the VNF config afterwards.

    Args:
        nsr_obj: NSR record providing ``agent_nsr``, ``update_vnf_cm_state``
            and ``populate_cm_state_from_vnf_cfg``.
        vnfr: VNFR dict with ``vnf_configuration``, ``vnf_cfg``, ``name`` and
            ``member_vnf_index_ref`` entries.
    """
    # Descriptor-level configuration carried by the VNFR
    configuration = vnfr['vnf_configuration']

    # Flat short-cut structure kept alongside the VNFR
    flat_cfg = vnfr['vnf_cfg']
    flat_cfg['vnf_unique_name'] = get_vnf_unique_name(
        flat_cfg['nsr_name'], vnfr['name'], vnfr['member_vnf_index_ref'])

    self._log.debug("vnf_configuration = %s", configuration)

    method = self.get_config_method(configuration)

    if method is None:
        # Nothing for the configuration manager to do for this VNF
        self._log.info("VNF:(%s) is not to be configured by Configuration Manager!",
                       log_this_vnf(vnfr['vnf_cfg']))
        yield from nsr_obj.update_vnf_cm_state(vnfr, conmanY.RecordState.READY_NO_CFG)
    else:
        self._log.debug("config method=%s", method)
        flat_cfg['config_method'] = method

        # Bind the config agent that matches the chosen method
        self._config_agent_mgr.set_config_agent(
            nsr_obj.agent_nsr, flat_cfg['agent_vnfr'], method)

    # Update the cm-state
    nsr_obj.populate_cm_state_from_vnf_cfg()
358 def update_config_primitives(self
, nsr_obj
):
360 # Process all config-primitives in the member VNFs
361 for vnfr
in nsr_obj
.vnfrs
:
362 vnfd
= vnfr
['vnf_cfg']['agent_vnfr'].vnfd
365 prims
= vnfd
.vnf_configuration
.config_primitive
367 self
._log
.debug("VNFR {} with VNFD {} has no config primitives defined".
368 format(vnfr
['name'], vnfd
.name
))
370 except AttributeError as e
:
371 self
._log
.error("No config primitives found on VNFR {} ({})".
372 format(vnfr
['name'], vnfd
.name
))
375 cm_state
= nsr_obj
.find_vnfr_cm_state(vnfr
['id'])
376 srcs
= cm_state
['config_parameter']['config_parameter_source']
377 reqs
= cm_state
['config_parameter']['config_parameter_request']
379 vnf_configuration
= vnfd
.vnf_configuration
.as_dict()
380 vnf_configuration
['config_primitive'] = []
383 confp
= prim
.as_dict()
384 if 'parameter' not in confp
:
387 for param
in confp
['parameter']:
388 # First check the param in capabilities
391 for p
in src
['parameter']:
392 if (p
['config_primitive_ref'] == confp
['name']) \
393 and (p
['parameter_ref'] == param
['name']):
394 param
['default_value'] = src
['value']
402 for p
in req
['parameter']:
403 if (p
['config_primitive_ref'] == confp
['name']) \
404 and (p
['parameter_ref'] == param
['name']):
405 param
['default_value'] = req
['value']
411 self
._log
.debug("Config primitive: {}".format(confp
))
412 vnf_configuration
['config_primitive'].append(confp
)
414 cm_state
['vnf_configuration'] = vnf_configuration
417 def get_resolved_xpath(self
, xpath
, name
, vnf_name
, xpath_prefix
):
418 # For now, use DTS to resolve the path
419 # TODO (pjoseph): Add better xpath support
422 if xpath
.startswith('../'):
423 prefix
= xpath_prefix
425 while xp
.startswith('../'):
426 idx
= prefix
.rfind('/')
428 raise ValueError("VNF {}, Did not find the xpath specified: {}".
429 format(vnf_name
, xpath
))
430 prefix
= prefix
[:idx
]
433 dts_path
= prefix
+ '/' + xp
435 elif xpath
.startswith('/'):
436 dts_path
= 'C,' + xpath
437 elif xpath
.startswith('C,/') or xpath
.startswith('D,/'):
440 self
._log
.error("Invalid xpath {} for source {} in VNF {}".
441 format(xpath
, name
, vnf_name
))
442 raise ValueError("Descriptor xpath {} in source {} for VNF {} "
444 format(xpath
, name
, vnf_name
))
446 dts_path
= self
._project
.add_project(dts_path
)
450 def resolve_xpath(self
, xpath
, name
, vnfd
):
451 xpath_prefix
= "C,/project-vnfd:vnfd-catalog/vnfd[id={}]/config-parameter" \
452 "/config-parameter-source[name={}]" \
453 "/descriptor".format(quoted_key(vnfd
.id), quoted_key(name
))
455 dts_path
= yield from self
.get_resolved_xpath(xpath
, name
,
456 vnfd
.name
, xpath_prefix
)
457 idx
= dts_path
.rfind('/')
459 raise ValueError("VNFD {}, descriptor xpath {} should point to " \
460 "an attribute".format(vnfd
.name
, xpath
))
462 attr
= dts_path
[idx
+1:]
463 dts_path
= dts_path
[:idx
]
464 self
._log
.debug("DTS path: {}, attribute: {}".format(dts_path
, attr
))
466 resp
= yield from self
.cmdts_obj
.get_xpath(dts_path
)
468 raise ValueError("Xpath {} in capability {} for VNFD {} is not found".
469 format(xpath
, name
, vnfd
.name
))
470 self
._log
.debug("DTS response: {}".format(resp
.as_dict()))
473 val
= getattr(resp
, attr
)
474 except AttributeError as e
:
475 self
._log
.error("Did not find attribute : {}".format(attr
))
477 val
= getattr(resp
, attr
.replace('-', '_'))
478 except AttributeError as e
:
479 raise ValueError("Did not find attribute {} in XPath {} "
480 "for capability {} in VNF {}".
481 format(attr
, dts_path
, vnfd
.name
))
483 self
._log
.debug("XPath {}: {}".format(xpath
, val
))
487 def resolve_attribute(self
, attribute
, name
, vnfd
, vnfr
):
488 idx
= attribute
.rfind(',')
490 raise ValueError ("Invalid attribute {} for capability {} in "
492 format(attribute
, name
, vnfd
.name
))
493 xpath
= attribute
[:idx
].strip()
494 attr
= attribute
[idx
+1:].strip()
495 self
._log
.debug("Attribute {}, {}".format(xpath
, attr
))
496 if xpath
.startswith('C,/'):
497 raise ValueError("Attribute {} for capability {} in VNFD cannot "
499 format(attribute
, name
, vnfd
.name
))
501 xpath_prefix
= "D,/vnfr:vnfr-catalog/vnfr[id={}]/config_parameter" \
502 "/config-parameter-source[name={}]" \
503 "/attribute".format(quoted_key(vnfr
['id']), quoted_key(name
))
504 dts_path
= yield from self
.get_resolved_xpath(xpath
, name
,
507 self
._log
.debug("DTS query: {}".format(dts_path
))
509 resp
= yield from self
.cmdts_obj
.get_xpath(dts_path
)
511 raise ValueError("Attribute {} in request {} for VNFD {} is " \
513 format(xpath
, name
, vnfd
.name
))
514 self
._log
.debug("DTS response: {}".format(resp
.as_dict()))
517 val
= getattr(resp
, attr
)
518 except AttributeError as e
:
519 self
._log
.debug("Did not find attribute {}".format(attr
))
521 val
= getattr(resp
, attr
.replace('-', '_'))
522 except AttributeError as e
:
523 raise ValueError("Did not find attribute {} in XPath {} "
524 "for source {} in VNF {}".
525 format(attr
, dts_path
, vnfd
.name
))
527 self
._log
.debug("Attribute {}: {}".format(attribute
, val
))
531 def process_vnf_config_parameter(self
, nsr_obj
):
532 nsd
= nsr_obj
.agent_nsr
.nsd
534 # Process all capabilities in all the member VNFs
535 for vnfr
in nsr_obj
.vnfrs
:
536 vnfd
= vnfr
['vnf_cfg']['agent_vnfr'].vnfd
539 cparam
= vnfd
.config_parameter
540 except AttributeError as e
:
541 self
._log
.debug("VNFR {} does not have VNF config parameter".
547 srcs
= cparam
.config_parameter_source
548 except AttributeError as e
:
549 self
._log
.debug("VNFR {} has no source defined".
552 # Get the cm state dict for this vnfr
553 cm_state
= nsr_obj
.find_vnfr_cm_state(vnfr
['id'])
557 self
._log
.debug("VNFR {}: source {}".
558 format(vnfr
['name'], src
.as_dict()))
561 for p
in src
.parameter
:
563 'config_primitive_ref': p
.config_primitive_name_ref
,
564 'parameter_ref': p
.config_primitive_parameter_ref
569 self
._log
.debug("Got value {}".format(val
))
571 cm_srcs
.append({'name': src
.name
,
573 'parameter': param_refs
})
575 except AttributeError as e
:
579 xpath
= src
.descriptor
582 val
= yield from self
.resolve_xpath(xpath
, src
.name
, vnfd
)
583 self
._log
.debug("Got xpath value: {}".format(val
))
584 cm_srcs
.append({'name': src
.name
,
586 'parameter': param_refs
})
588 except AttributeError as e
:
592 attribute
= src
.attribute
595 val
= yield from self
.resolve_attribute(attribute
,
598 self
._log
.debug("Got attribute value: {}".format(val
))
599 cm_srcs
.append({'name': src
.name
,
601 'parameter': param_refs
})
603 except AttributeError as e
:
607 prim
= src
.primitive_ref
609 raise NotImplementedError("{}: VNF config parameter {}"
610 "source support for config"
611 "primitive not yet supported".
612 format(vnfr
.name
, prim
))
613 except AttributeError as e
:
616 self
._log
.debug("VNF config parameter sources: {}".format(cm_srcs
))
617 cm_state
['config_parameter']['config_parameter_source'] = cm_srcs
620 reqs
= cparam
.config_parameter_request
621 except AttributeError as e
:
622 self
._log
.debug("VNFR {} has no requests defined".
628 self
._log
.debug("VNFR{}: request {}".
629 format(vnfr
['name'], req
.as_dict()))
631 for p
in req
.parameter
:
633 'config_primitive_ref': p
.config_primitive_name_ref
,
634 'parameter_ref': p
.config_primitive_parameter_ref
636 cm_reqs
.append({'name': req
.name
,
637 'parameter': param_refs
})
639 self
._log
.debug("VNF requests: {}".format(cm_reqs
))
640 cm_state
['config_parameter']['config_parameter_request'] = cm_reqs
642 # Publish all config parameter for the VNFRs
643 # yield from nsr_obj.publish_cm_state()
647 cparam_map
= nsd
.config_parameter_map
648 except AttributeError as e
:
649 self
._log
.warning("No config parameter map specified for nsr: {}".
650 format(nsr_obj
.nsr_name
))
652 for cp
in cparam_map
:
653 src_vnfr
= nsr_obj
.agent_nsr
.get_member_vnfr(
654 cp
.config_parameter_source
.member_vnf_index_ref
)
655 cm_state
= nsr_obj
.find_vnfr_cm_state(src_vnfr
.id)
657 raise ValueError("Config parameter sources are not defined "
658 "for VNF member {} ({})".
659 format(cp
.config_parameter_source
.member_vnf_index_ref
,
661 srcs
= cm_state
['config_parameter']['config_parameter_source']
663 src_attr
= cp
.config_parameter_source
.config_parameter_source_ref
666 if src
['name'] == src_attr
:
670 req_vnfr
= nsr_obj
.agent_nsr
.get_member_vnfr(
671 cp
.config_parameter_request
.member_vnf_index_ref
)
672 req_attr
= cp
.config_parameter_request
.config_parameter_request_ref
673 cm_state
= nsr_obj
.find_vnfr_cm_state(req_vnfr
.id)
675 cm_reqs
= cm_state
['config_parameter']['config_parameter_request']
676 except KeyError as e
:
677 raise ValueError("VNFR index {} ({}) has no requests defined".
678 format(cp
.config_parameter_reequest
.member_vnf_index_ref
,
681 for i
, item
in enumerate(cm_reqs
):
682 if item
['name'] == req_attr
:
683 item
['value'] = str(val
)
685 self
._log
.debug("Request in VNFR {}: {}".
686 format(req_vnfr
.name
, item
))
689 yield from self
.update_config_primitives(nsr_obj
)
691 # TODO: Confd crashing with the config-parameter publish
692 # So removing config-parameter and publishing cm-state
693 for vnfr
in nsr_obj
.vnfrs
:
694 # Get the cm state dict for this vnfr
695 cm_state
= nsr_obj
.find_vnfr_cm_state(vnfr
['id'])
696 del cm_state
['config_parameter']['config_parameter_source']
697 del cm_state
['config_parameter']['config_parameter_request']
699 # Publish resolved dependencies for the VNFRs
700 yield from nsr_obj
.publish_cm_state()
703 def config_NSR(self
, id, event
):
705 cmdts_obj
= self
.cmdts_obj
706 if event
== 'running':
707 self
._log
.info("Configure NSR running, id = %s", id)
711 if id not in self
._nsr
_dict
:
712 nsr_obj
= ConfigManagerNSR(self
._log
, self
._loop
, self
, self
._project
, id)
713 self
._nsr
_dict
[id] = nsr_obj
715 self
._log
.info("NSR(%s) is already initialized!", id)
716 nsr_obj
= self
._nsr
_dict
[id]
718 except Exception as e
:
719 self
._log
.error("Failed creating NSR object for (%s) as (%s)", id, str(e
))
722 # Try to configure this NSR only if not already processed
723 if nsr_obj
.cm_nsr
['state'] != nsr_obj
.state_to_string(conmanY
.RecordState
.INIT
):
724 self
._log
.debug("NSR(%s) is already processed, state=%s",
725 nsr_obj
.nsr_name
, nsr_obj
.cm_nsr
['state'])
726 # Publish again in case NSM restarted
727 yield from nsr_obj
.publish_cm_state()
731 nsr
= yield from cmdts_obj
.get_nsr(id)
732 self
._log
.debug("Full NSR : %s", nsr
)
733 if nsr
['operational_status'] != "running":
734 self
._log
.info("NSR(%s) is not ready yet!", nsr
['nsd_name_ref'])
738 # Create Agent NSR class
739 nsr_config
= yield from cmdts_obj
.get_nsr_config(id)
740 self
._log
.debug("NSR {} config: {}".format(id, nsr_config
))
742 if nsr_config
is None:
743 # The NST Terminate has been initiated before the configuration. Hence
744 # not proceeding with config.
745 self
._log
.warning("NSR - %s is deleted before Configuration. Not proceeding with configuration.", id)
748 nsr_obj
.agent_nsr
= riftcm_config_plugin
.RiftCMnsr(nsr
, nsr_config
,
751 unique_cfg_vnfr_list
= list()
752 unique_agent_vnfr_list
= list()
754 yield from nsr_obj
.update_ns_cm_state(conmanY
.RecordState
.RECEIVED
)
756 nsr_obj
.set_nsr_name(nsr
['name_ref'])
757 for const_vnfr
in nsr
['constituent_vnfr_ref']:
758 self
._log
.debug("Fetching VNFR (%s)", const_vnfr
['vnfr_id'])
759 vnfr_msg
= yield from cmdts_obj
.get_vnfr(const_vnfr
['vnfr_id'])
761 vnfr
= vnfr_msg
.as_dict()
762 self
._log
.info("create VNF:{}/{} operational status {}".format(nsr_obj
.nsr_name
, vnfr
['name'], vnfr
['operational_status']))
763 agent_vnfr
= yield from nsr_obj
.add_vnfr(vnfr
, vnfr_msg
)
764 method
= self
.get_config_method(vnfr
['vnf_configuration'])
765 if method
is not None:
766 unique_cfg_vnfr_list
.append(vnfr
)
767 unique_agent_vnfr_list
.append(agent_vnfr
)
770 # Set up the config agent based on the method
771 yield from self
.process_nsd_vnf_configuration(nsr_obj
, vnfr
)
773 self
._log
.warning("NSR %s, VNFR not found yet (%s)", nsr_obj
.nsr_name
, const_vnfr
['vnfr_id'])
775 # Process VNF config parameter
776 yield from self
.process_vnf_config_parameter(nsr_obj
)
778 # Invoke the config agent plugin
779 for agent_vnfr
in unique_agent_vnfr_list
:
780 yield from self
._config
_agent
_mgr
.invoke_config_agent_plugins(
781 'notify_create_vnfr',
785 except Exception as e
:
786 self
._log
.error("Failed processing NSR (%s) as (%s)", nsr_obj
.nsr_name
, str(e
))
787 self
._log
.exception(e
)
788 yield from nsr_obj
.update_ns_cm_state(conmanY
.RecordState
.CFG_PROCESS_FAILED
)
791 self
._log
.debug("Starting to configure each VNF")
794 for cfg_vnfr
in unique_cfg_vnfr_list
:
795 # Apply configuration
796 vnf_unique_name
= get_vnf_unique_name(
799 str(cfg_vnfr
['member_vnf_index_ref']),
802 # Find vnfr for this vnf_unique_name
803 if vnf_unique_name
not in nsr_obj
._vnfr
_dict
:
804 self
._log
.error("NS (%s) - Can not find VNF to be configured: %s", nsr_obj
.nsr_name
, vnf_unique_name
)
806 # Save this unique VNF's config input parameters
807 nsr_obj
.ConfigVNF(nsr_obj
._vnfr
_dict
[vnf_unique_name
])
809 # Now add the entire NS to the pending config list.
810 self
._log
.info("Scheduling NSR:{} configuration ".format(nsr_obj
.nsr_name
))
811 self
._parent
.add_to_pending(nsr_obj
, unique_cfg_vnfr_list
)
812 self
._parent
.add_nsr_obj(nsr_obj
)
814 except Exception as e
:
815 self
._log
.error("Failed processing input parameters for NS (%s) as %s", nsr_obj
.nsr_name
, str(e
))
816 self
._log
.exception(e
)
819 except Exception as e
:
820 self
._log
.exception(e
)
822 self
._log
.error("Failed to configure NS (%s) as (%s)", nsr_obj
.nsr_name
, str(e
))
823 yield from nsr_obj
.update_ns_cm_state(conmanY
.RecordState
.CFG_PROCESS_FAILED
)
826 elif event
== 'terminate':
827 self
._log
.info("Configure NSR terminate, id = %s", id)
828 nsr_obj
= self
._parent
.get_nsr_obj(id)
830 # Can be none if the terminate is called again due to DTS query
834 yield from self
.process_ns_terminate_config(nsr_obj
, self
._project
.name
)
835 except Exception as e
:
836 self
._log
.warn("Terminate config failed for NSR {}: {}".
838 self
._log
.exception(e
)
841 yield from nsr_obj
.update_ns_cm_state(conmanY
.RecordState
.TERMINATE
)
842 yield from self
.terminate_NSR(id)
843 except Exception as e
:
844 self
._log
.error("Terminate failed for NSR {}: {}".
846 self
._log
.exception(e
)
851 def terminate_NSR(self
, id):
852 if id not in self
._nsr
_dict
:
853 self
._log
.error("NSR(%s) does not exist!", id)
856 nsr_obj
= self
._nsr
_dict
[id]
858 # Remove this NSR if we have it on pending task list
859 for task
in self
.pending_tasks
:
860 if task
['nsrid'] == id:
861 self
.del_from_pending_tasks(task
)
863 # Remove any scheduled configuration event
864 for nsr_obj_p
in self
._parent
.pending_cfg
:
865 if nsr_obj_p
== nsr_obj
:
866 assert id == nsr_obj_p
._nsr
_id
867 # Mark this as being deleted so we do not try to reconfigure it
868 # if we are in cfg_delay (will wake up and continue to process otherwise)
869 nsr_obj_p
.being_deleted
= True
870 self
._log
.info("Removed scheduled configuration for NSR(%s)", nsr_obj
.nsr_name
)
872 # Call Config Agent to clean up for each VNF
873 for agent_vnfr
in nsr_obj
.agent_nsr
.vnfrs
:
874 yield from self
._config
_agent
_mgr
.invoke_config_agent_plugins(
875 'notify_terminate_vnfr',
879 self
._log
.info("NSR(%s/%s) is terminated", nsr_obj
.nsr_name
, id)
def delete_NSR(self, id):
    """Delete the NSR registered under ``id`` and clean up its state.

    Generator-based coroutine (delegates to ``nsr_obj.delete_cm_nsr()``).
    Steps, in order:

    * drop every pending task whose ``'nsrid'`` equals ``id``;
    * pop the NSR record from ``self._nsr_dict``;
    * remove its ``cm_nsr`` entry from ``self.cm_state['cm_nsr']``;
    * call ``self._parent.remove_nsr_obj(id)``;
    * publish the cm-nsr deletion;
    * terminate the NSR's config jobs through the RPC handler's job manager.

    Args:
        id: NSR id. The name shadows the builtin but is kept because it is
            part of the method's signature.

    Returns:
        None. An unknown ``id`` is logged at debug level and ignored.
    """
    if id not in self._nsr_dict:
        self._log.debug("NSR(%s) does not exist!", id)
        return

    # Remove this NSR if we have it on pending task list.
    # Iterate over a snapshot: del_from_pending_tasks() mutates
    # self.pending_tasks, and removing from a list while iterating it skips
    # the element that follows each removal (e.g. a 'running' task directly
    # followed by a 'terminate' task for the same NSR).
    for task in list(self.pending_tasks):
        if task['nsrid'] == id:
            self.del_from_pending_tasks(task)

    # Remove this object from global list
    nsr_obj = self._nsr_dict.pop(id, None)

    # Remove this NS cm-state from global status list
    self.cm_state['cm_nsr'].remove(nsr_obj.cm_nsr)

    self._parent.remove_nsr_obj(id)

    # publish delete cm-state (cm-nsr)
    yield from nsr_obj.delete_cm_nsr()

    # Deleting any config jobs for NSR.
    job_manager = self.riftcm_rpc_handler.job_manager.handler
    job_manager._terminate_nsr(id)

    #####################TBD###########################
    # yield from self._config_agent_mgr.invoke_config_agent_plugins('notify_terminate_ns', self.id)

    self._log.info("NSR(%s/%s) is deleted", nsr_obj.nsr_name, id)
913 def process_initial_config(self
, nsr_obj
, conf
, script
, vnfr_name
=None):
914 '''Apply the initial-config-primitives specified in NSD or VNFD'''
916 def get_input_file(parameters
):
919 # Add NSR name to file
920 inp
['nsr_name'] = nsr_obj
.nsr_name
922 # Add VNFR name if available
924 inp
['vnfr_name'] = vnfr_name
926 # Add parameters for initial config
927 inp
['parameter'] = {}
928 for parameter
in parameters
:
930 inp
['parameter'][parameter
['name']] = parameter
['value']
931 except KeyError as e
:
933 self
._log
.info("VNFR {} initial config parameter {} with no value: {}".
934 format(vnfr_name
, parameter
, e
))
936 self
._log
.info("NSR {} initial config parameter {} with no value: {}".
937 format(nsr_obj
.nsr_name
, parameter
, e
))
940 # Add config agents specific to each VNFR
941 inp
['config-agent'] = {}
942 for vnfr
in nsr_obj
.agent_nsr
.vnfrs
:
943 # Get the config agent for the VNFR
944 # If vnfr name is specified, add only CA specific to that
945 if (vnfr_name
is None) or \
946 (vnfr_name
== vnfr
.name
):
947 agent
= self
._config
_agent
_mgr
.get_vnfr_config_agent(vnfr
.vnfr_msg
)
949 if agent
.agent_type
!= riftcm_config_plugin
.DEFAULT_CAP_TYPE
:
950 inp
['config-agent'][vnfr
.member_vnf_index
] = agent
.agent_data
951 inp
['config-agent'][vnfr
.member_vnf_index
] \
952 ['service-name'] = agent
.get_service_name(vnfr
.id)
954 # Add vnfrs specific data
956 for vnfr
in nsr_obj
.vnfrs
:
959 v
['name'] = vnfr
['name']
960 v
['mgmt_ip_address'] = vnfr
['vnf_cfg']['mgmt_ip_address']
961 v
['mgmt_port'] = vnfr
['vnf_cfg']['port']
962 v
['datacenter'] = vnfr
['datacenter']
964 if 'dashboard_url' in vnfr
:
965 v
['dashboard_url'] = vnfr
['dashboard_url']
967 if 'connection_point' in vnfr
:
968 v
['connection_point'] = []
969 for cp
in vnfr
['connection_point']:
970 cp_info
= dict(name
=cp
['name'],
971 ip_address
=cp
['ip_address'],
972 mac_address
=cp
.get('mac_address', None),
973 connection_point_id
=cp
.get('connection_point_id',None))
975 if 'virtual_cps' in cp
:
976 cp_info
['virtual_cps'] = [ {k
:v
for k
,v
in vcp
.items()
977 if k
in ['ip_address', 'mac_address']}
978 for vcp
in cp
['virtual_cps'] ]
979 v
['connection_point'].append(cp_info
)
983 vdu_data
= [(vdu
.get('name',None), vdu
.get('management_ip',None), vdu
.get('vm_management_ip',None), vdu
.get('id',None))
984 for vdu
in vnfr
['vdur']]
986 v
['vdur'] = [ dict(zip(['name', 'management_ip', 'vm_management_ip', 'id', 'vdu_id_ref'] , data
)) for data
in vdu_data
]
988 inp
['vnfr'][vnfr
['member_vnf_index_ref']] = v
991 self
._log
.debug("Input data for {}: {}".
992 format((vnfr_name
if vnfr_name
else nsr_obj
.nsr_name
),
995 # Convert to YAML string
996 yaml_string
= yaml
.dump(inp
, default_flow_style
=False)
998 # Write the inputs as yaml file
1000 with tempfile
.NamedTemporaryFile(delete
=False) as tmp_file
:
1001 tmp_file
.write(yaml_string
.encode("UTF-8"))
1002 self
._log
.debug("Input file created for {}: {}".
1003 format((vnfr_name
if vnfr_name \
1004 else nsr_obj
.nsr_name
),
1007 return tmp_file
.name
1011 parameters
= conf
['parameter']
1012 except Exception as e
:
1013 self
._log
.debug("Parameter conf: {}, e: {}".
1016 inp_file
= get_input_file(parameters
)
1018 cmd
= "{0} {1}".format(script
, inp_file
)
1019 self
._log
.debug("Running the CMD: {}".format(cmd
))
1021 process
= yield from asyncio
.create_subprocess_shell(cmd
,
1023 stdout
=subprocess
.PIPE
,
1024 stderr
=subprocess
.PIPE
)
1025 stdout
, stderr
= yield from process
.communicate()
1026 rc
= yield from process
.wait()
1029 msg
= "NSR/VNFR {} initial config using {} failed with {}: {}". \
1030 format(vnfr_name
if vnfr_name
else nsr_obj
.nsr_name
,
1032 self
._log
.error(msg
)
1033 raise InitialConfigError(msg
)
1037 except Exception as e
:
1038 self
._log
.error("Error removing input file {}: {}".
1039 format(inp_file
, e
))
1041 def get_script_file(self
, script_name
, d_name
, d_id
, d_type
, project
=None):
1042 # Get the full path to the script
1043 script
= os
.path
.join(os
.getenv('RIFT_VAR_ROOT'),
1044 'launchpad/packages',
1046 project
if project
else "",
1051 self
._log
.debug("Checking for script at %s", script
)
1052 if not os
.path
.exists(script
):
1053 err_msg
= ("{} {}: Did not find script {} for config".
1054 format(d_type
, d_name
, script
))
1055 self
._log
.error(err_msg
)
1056 raise ScriptNotFoundError(err_msg
)
1058 # Seen cases in jenkins, where the script execution fails
1059 # with permission denied. Setting the permission on script
1060 # to make sure it has execute permission
1061 perm
= os
.stat(script
).st_mode
1062 if not (perm
& stat
.S_IXUSR
):
1063 self
._log
.warning("NSR/VNFR {} script {} " \
1064 "without execute permission: {}".
1065 format(d_name
, script
, perm
))
1066 os
.chmod(script
, perm | stat
.S_IXUSR
)
1070 def process_ns_initial_config(self
, nsr_obj
, project
=None):
1071 '''Apply the initial-service-primitives specified in NSD'''
1072 nsr
= yield from self
.cmdts_obj
.get_nsr(nsr_obj
.nsr_id
)
1073 self
._log
.debug("NS initial config: {}".format(nsr
))
1074 if 'initial_service_primitive' not in nsr
:
1077 nsd
= yield from self
.cmdts_obj
.get_nsd(nsr_obj
.nsr_id
)
1078 for conf
in nsr
['initial_service_primitive']:
1079 self
._log
.debug("NSR {} initial config: {}".
1080 format(nsr_obj
.nsr_name
, conf
))
1081 script
= self
.get_script_file(conf
['user_defined_script'],
1088 yield from self
.process_initial_config(nsr_obj
, conf
, script
)
1091 def process_vnf_initial_config(self
, nsr_obj
, vnfr
, project
=None):
1092 '''Apply the initial-config-primitives specified in VNFD'''
1093 vnfr_name
= vnfr
.name
1096 vnf_cfg
= vnfd
.vnf_configuration
1098 for conf
in vnf_cfg
.initial_config_primitive
:
1099 self
._log
.debug("VNFR {} initial config: {} for vnfd id {}".
1100 format(vnfr_name
, conf
, vnfd
.id))
1102 if not conf
.user_defined_script
:
1103 self
._log
.debug("VNFR {} did not find user defined script: {}".
1104 format(vnfr_name
, conf
))
1107 script
= self
.get_script_file(conf
.user_defined_script
,
1114 yield from self
.process_initial_config(nsr_obj
,
1117 vnfr_name
=vnfr_name
)
1120 def process_ns_terminate_config(self
, nsr_obj
, project
=None):
1121 '''Apply the terminate-service-primitives specified in NSD'''
1124 if 'terminate_service_primitive' not in nsr
:
1128 nsd
= nsr_obj
.agent_nsr
.nsd
1129 for conf
in nsr
['terminate_service_primitive']:
1130 self
._log
.debug("NSR {} terminate service: {}".
1131 format(nsr_obj
.nsr_name
, conf
))
1132 script
= self
.get_script_file(conf
['user_defined_script'],
1139 yield from self
.process_initial_config(nsr_obj
, conf
, script
)
1141 except Exception as e
:
1142 # Ignore any failures on terminate
1143 self
._log
.warning("NSR {} terminate config script {} failed: {}".
1144 format(nsr_obj
.nsr_name
, script
, e
))
1148 class ConfigManagerNSR(object):
1149 def __init__(self
, log
, loop
, parent
, project
, id):
1153 self
._vnfr
_dict
= {}
1156 self
._parent
= parent
1157 self
._project
= project
1158 self
._log
.info("Instantiated NSR entry for id=%s", id)
1159 self
.nsr_cfg_config_attributes_dict
= {}
1160 self
.vnf_config_attributes_dict
= {}
1161 self
.num_vnfs_to_cfg
= 0
1162 self
._vnfr
_list
= []
1163 self
.vnf_cfg_list
= []
1164 self
.this_nsr_dir
= None
1165 self
.being_deleted
= False
1166 self
.dts_obj
= self
._parent
.cmdts_obj
1168 # Initialize cm-state for this NS
1170 self
.cm_nsr
['cm_vnfr'] = []
1171 self
.cm_nsr
['id'] = id
1172 self
.cm_nsr
['state'] = self
.state_to_string(conmanY
.RecordState
.INIT
)
1173 self
.cm_nsr
['state_details'] = None
1175 self
.set_nsr_name('Not Set')
1177 # Add this NSR cm-state object to global cm-state
1178 parent
.cm_state
['cm_nsr'].append(self
.cm_nsr
)
1180 # Place holders for NSR & VNFR classes
1181 self
.agent_nsr
= None
def nsr_opdata_xpath(self):
    """Return the full xpath of this NSR's cm-state opdata entry.

    The NSR id is run through ``quoted_key`` and substituted as the
    ``rw-conman:id`` key of the cm-nsr list; the resulting path is then
    passed through ``self._project.add_project``.
    """
    qualify = self._project.add_project
    template = "D,/rw-conman:cm-state/rw-conman:cm-nsr[rw-conman:id={}]"
    nsr_key = quoted_key(self._nsr_id)
    return qualify(template.format(nsr_key))
1192 return self
._vnfr
_list
1203 def publish_cm_state(self
):
1204 ''' This function publishes cm_state for this NSR '''
1206 cm_state
= conmanY
.YangData_RwProject_Project_CmState()
1207 cm_state_nsr
= cm_state
.cm_nsr
.add()
1208 cm_state_nsr
.from_dict(self
.cm_nsr
)
1209 #with self._dts.transaction() as xact:
1210 yield from self
.dts_obj
.update(self
.nsr_opdata_xpath
, cm_state_nsr
)
1211 self
._log
.info("Published cm-state with xpath %s and nsr %s",
1212 self
.nsr_opdata_xpath
,
def delete_cm_nsr(self):
    """Delete the cm-nsr record of this NSR.

    Delegates to ``self.dts_obj.delete`` with this NSR's opdata xpath and
    logs that xpath once the delete call has completed.
    """
    dts = self.dts_obj
    yield from dts.delete(self.nsr_opdata_xpath)

    log = self._log
    log.info("Deleted cm-nsr with xpath %s", self.nsr_opdata_xpath)
def set_nsr_name(self, name):
    """Store *name* as this NSR's name and mirror it into the cm-state dict."""
    # Targets are assigned left to right: the attribute first, then the
    # 'name' entry of self.cm_nsr.
    self.nsr_name = self.cm_nsr['name'] = name
1227 def ConfigVNF(self
, vnfr
):
1229 vnf_cfg
= vnfr
['vnf_cfg']
1230 vnf_cm_state
= self
.find_or_create_vnfr_cm_state(vnf_cfg
)
1232 if (vnf_cm_state
['state'] == self
.state_to_string(conmanY
.RecordState
.READY_NO_CFG
)
1234 vnf_cm_state
['state'] == self
.state_to_string(conmanY
.RecordState
.READY
)):
1235 self
._log
.warning("NS/VNF (%s/%s) is already configured! Skipped.", self
.nsr_name
, vnfr
['name'])
1239 vnf_cm_state
['state'] = self
.state_to_string(conmanY
.RecordState
.CFG_PROCESS
)
1241 # Now translate the configuration for iP addresses
1243 # Add cp_dict members (TAGS) for this VNF
1244 self
._cp
_dict
['rw_mgmt_ip'] = vnf_cfg
['mgmt_ip_address']
1245 self
._cp
_dict
['rw_username'] = vnf_cfg
['username']
1246 self
._cp
_dict
['rw_password'] = vnf_cfg
['password']
1247 except Exception as e
:
1248 vnf_cm_state
['state'] = self
.state_to_string(conmanY
.RecordState
.CFG_PROCESS_FAILED
)
1249 self
._log
.error("Failed to set tags for VNF: %s with (%s)", log_this_vnf(vnf_cfg
), str(e
))
1252 self
._log
.info("Applying config to VNF: %s = %s!", log_this_vnf(vnf_cfg
), vnf_cfg
)
1254 self
._log
.debug("Scheduled configuration!")
1255 vnf_cm_state
['state'] = self
.state_to_string(conmanY
.RecordState
.CFG_SCHED
)
1256 except Exception as e
:
1257 self
._log
.error("Failed apply_vnf_config to VNF: %s as (%s)", log_this_vnf(vnf_cfg
), str(e
))
1258 vnf_cm_state
['state'] = self
.state_to_string(conmanY
.RecordState
.CFG_PROCESS_FAILED
)
1262 self
._log
.info("Adding NS Record for id=%s", id)
1265 def sample_cm_state(self
):
1272 'connection_point': [
1273 {'ip_address': '1.1.1.1', 'name': 'vnf1cp1'},
1274 {'ip_address': '1.1.1.2', 'name': 'vnf1cp2'}
1277 'mgmt_interface': {'ip_address': '7.1.1.1',
1279 'name': 'vnfrname1',
1283 'connection_point': [{'ip_address': '2.1.1.1', 'name': 'vnf2cp1'},
1284 {'ip_address': '2.1.1.2', 'name': 'vnf2cp2'}],
1286 'mgmt_interface': {'ip_address': '7.1.1.2',
1288 'name': 'vnfrname2',
1295 'states': 'Initialized, '
1298 def populate_cm_state_from_vnf_cfg(self
):
1299 # Fill in each VNFR from this nsr object
1300 vnfr_list
= self
._vnfr
_list
1301 for vnfr
in vnfr_list
:
1302 vnf_cfg
= vnfr
['vnf_cfg']
1303 vnf_cm_state
= self
.find_vnfr_cm_state(vnfr
['id'])
1306 # Fill in VNF management interface
1307 vnf_cm_state
['mgmt_interface']['ip_address'] = vnf_cfg
['mgmt_ip_address']
1308 vnf_cm_state
['mgmt_interface']['port'] = vnf_cfg
['port']
1310 # Fill in VNF configuration details
1311 vnf_cm_state
['cfg_type'] = vnf_cfg
['config_method']
1313 # Fill in each connection-point for this VNF
1314 if "connection_point" in vnfr
:
1315 cp_list
= vnfr
['connection_point']
1316 for cp_item_dict
in cp_list
:
1318 vnf_cm_state
['connection_point'].append(
1320 'name' : cp_item_dict
['name'],
1321 'ip_address' : cp_item_dict
['ip_address'],
1322 'connection_point_id' : cp_item_dict
['connection_point_id'],
1326 # Added to make mano_ut work
def state_to_string(self, state):
    """Translate a ``conmanY.RecordState`` value into its cm-state string.

    Raises:
        KeyError: if *state* is not one of the states listed below.
    """
    record_state = conmanY.RecordState
    names_by_state = dict((
        (record_state.INIT, "init"),
        (record_state.RECEIVED, "received"),
        (record_state.CFG_PROCESS, "cfg_process"),
        (record_state.CFG_PROCESS_FAILED, "cfg_process_failed"),
        (record_state.CFG_SCHED, "cfg_sched"),
        (record_state.CONNECTING, "connecting"),
        (record_state.FAILED_CONNECTION, "failed_connection"),
        (record_state.CFG_SEND, "cfg_send"),
        (record_state.CFG_FAILED, "cfg_failed"),
        (record_state.READY_NO_CFG, "ready_no_cfg"),
        (record_state.READY, "ready"),
        (record_state.TERMINATE, "terminate"),
    ))
    return names_by_state[state]
def find_vnfr_cm_state(self, id):
    """Look up the cm-state entry of the VNFR with the given id.

    Returns the first dict in ``self.cm_nsr['cm_vnfr']`` whose ``'id'``
    equals *id*, or ``None`` when the list is empty or has no such entry.
    """
    if not self.cm_nsr['cm_vnfr']:
        return None
    matches = (entry for entry in self.cm_nsr['cm_vnfr'] if entry['id'] == id)
    return next(matches, None)
1353 def find_or_create_vnfr_cm_state(self
, vnf_cfg
):
1354 vnfr
= vnf_cfg
['vnfr']
1355 vnf_cm_state
= self
.find_vnfr_cm_state(vnfr
['id'])
1357 if vnf_cm_state
is None:
1358 # Not found, Create and Initialize this VNF cm-state
1361 'name' : vnfr
['name'],
1362 'state' : self
.state_to_string(conmanY
.RecordState
.RECEIVED
),
1365 'ip_address' : vnf_cfg
['mgmt_ip_address'],
1366 'port' : vnf_cfg
['port'],
1368 'connection_point' : [],
1369 'config_parameter' :
1371 'config_parameter_source' : [],
1372 'config_parameter_request' : [],
1375 self
.cm_nsr
['cm_vnfr'].append(vnf_cm_state
)
1377 # Publish newly created cm-state
1383 def get_vnf_cm_state(self
, vnfr
):
1385 vnf_cm_state
= self
.find_vnfr_cm_state(vnfr
['id'])
1387 return vnf_cm_state
['state']
1391 def update_vnf_cm_state(self
, vnfr
, state
):
1393 vnf_cm_state
= self
.find_vnfr_cm_state(vnfr
['id'])
1394 if vnf_cm_state
is None:
1395 self
._log
.error("No opdata found for NS/VNF:%s/%s!",
1396 self
.nsr_name
, vnfr
['name'])
1399 if vnf_cm_state
['state'] != self
.state_to_string(state
):
1400 old_state
= vnf_cm_state
['state']
1401 vnf_cm_state
['state'] = self
.state_to_string(state
)
1403 yield from self
.publish_cm_state()
1404 self
._log
.info("VNF ({}/{}/{}) state change: {} -> {}"
1405 .format(self
.nsr_name
,
1407 vnfr
['member_vnf_index_ref'],
1409 vnf_cm_state
['state']))
1412 self
._log
.error("No VNFR supplied for state update (NS=%s)!",
def get_ns_cm_state(self):
    """Return the state string currently stored in this NS's cm-state dict."""
    current_state = self.cm_nsr['state']
    return current_state
1420 def update_ns_cm_state(self
, state
, state_details
=None):
1421 if self
.cm_nsr
['state'] != self
.state_to_string(state
):
1422 old_state
= self
.cm_nsr
['state']
1423 self
.cm_nsr
['state'] = self
.state_to_string(state
)
1424 self
.cm_nsr
['state_details'] = state_details
if state_details
is not None else None
1425 self
._log
.info("NS ({}) state change: {} -> {}"
1426 .format(self
.nsr_name
,
1428 self
.cm_nsr
['state']))
1430 yield from self
.publish_cm_state()
1433 def add_vnfr(self
, vnfr
, vnfr_msg
):
1436 def populate_subnets_from_vlr(id):
1438 # Populate cp_dict with VLR subnet info
1439 vlr
= yield from self
.dts_obj
.get_vlr(id)
1440 if vlr
is not None and 'assigned_subnet' in vlr
:
1441 subnet
= {vlr
.name
:vlr
.assigned_subnet
}
1442 self
._cp
_dict
[vnfr
['member_vnf_index_ref']].update(subnet
)
1443 self
._cp
_dict
.update(subnet
)
1444 self
._log
.debug("VNF:(%s) Updated assigned subnet = %s",
1445 vnfr
['name'], subnet
)
1446 except Exception as e
:
1447 self
._log
.error("VNF:(%s) VLR Error = %s",
1450 if vnfr
['id'] not in self
._vnfr
_dict
:
1451 self
._log
.info("NSR(%s) : Adding VNF Record for name=%s, id=%s", self
._nsr
_id
, vnfr
['name'], vnfr
['id'])
1452 # Add this vnfr to the list for show, or single traversal
1453 self
._vnfr
_list
.append(vnfr
)
1455 self
._log
.warning("NSR(%s) : VNF Record for name=%s, id=%s already exists, overwriting",
1456 self
._nsr
_id
, vnfr
['name'], vnfr
['id'])
1458 # Make vnfr available by id as well as by name
1459 unique_name
= get_vnf_unique_name(self
.nsr_name
, vnfr
['name'], vnfr
['member_vnf_index_ref'])
1460 self
._vnfr
_dict
[unique_name
] = vnfr
1461 self
._vnfr
_dict
[vnfr
['id']] = vnfr
1463 # Create vnf_cfg dictionary with default values
1467 'agent_vnfr' : self
.agent_nsr
.add_vnfr(vnfr
, vnfr_msg
),
1468 'nsr_name' : self
.nsr_name
,
1469 'nsr_id' : self
._nsr
_id
,
1470 'vnfr_name' : vnfr
['name'],
1471 'member_vnf_index' : vnfr
['member_vnf_index_ref'],
1473 'username' : '@rift',
1474 'password' : 'rift',
1475 'config_method' : 'None',
1476 'protocol' : 'None',
1477 'mgmt_ip_address' : '0.0.0.0',
1478 'cfg_file' : 'None',
1480 'script_type' : 'bash',
1483 ##########################
1484 # Update the mgmt ip address
1485 # In case the config method is none, this is not
1488 vnf_cfg
['mgmt_ip_address'] = vnfr_msg
.mgmt_interface
.ip_address
1489 vnf_cfg
['port'] = vnfr_msg
.mgmt_interface
.port
1490 except Exception as e
:
1492 "VNFR {}({}), unable to retrieve mgmt ip address: {}".
1493 format(vnfr
['name'], vnfr
['id'], e
))
1495 vnfr
['vnf_cfg'] = vnf_cfg
1496 self
.find_or_create_vnfr_cm_state(vnf_cfg
)
1499 Build the connection-points list for this VNF (self._cp_dict)
1501 # Populate global CP list self._cp_dict from VNFR
1503 if 'connection_point' in vnfr
:
1504 cp_list
= vnfr
['connection_point']
1506 self
._cp
_dict
[vnfr
['member_vnf_index_ref']] = {}
1508 for vdur
in vnfr
['vdur']:
1509 if 'internal_connection_point' in vdur
:
1510 cp_list
+= vdur
['internal_connection_point']
1512 for cp_item_dict
in cp_list
:
1513 if 'ip_address' not in cp_item_dict
:
1514 self
._log
.error("connection point {} doesnot have an ip address assigned ".
1515 format(cp_item_dict
['name']))
1517 # Populate global dictionary
1519 cp_item_dict
['name']
1520 ] = cp_item_dict
['ip_address']
1522 # Populate unique member specific dictionary
1524 vnfr
['member_vnf_index_ref']
1526 cp_item_dict
['name']
1527 ] = cp_item_dict
['ip_address']
1529 # Fill in the subnets from vlr
1530 if 'vlr_ref' in cp_item_dict
:
1531 ### HACK: Internal connection_point do not have VLR reference
1532 yield from populate_subnets_from_vlr(cp_item_dict
['vlr_ref'])
1534 if 'internal_vlr' in vnfr
:
1535 for ivlr
in vnfr
['internal_vlr']:
1536 yield from populate_subnets_from_vlr(ivlr
['vlr_ref'])
1539 vnf_cfg
['agent_vnfr']._vnfr
= vnfr
1540 return vnf_cfg
['agent_vnfr']
class XPaths(object):
    """Builders for the DTS xpaths used by the config manager.

    Each builder returns the xpath of a whole list when called without a
    key, and the xpath of a single list entry (key run through
    ``quoted_key``) when a key ``k`` is supplied.
    """

    @staticmethod
    def _keyed(base, key_name, k):
        """Append a ``[key_name=<quoted k>]`` predicate to *base* unless *k* is None."""
        if k is None:
            return base
        return base + "[{}={}]".format(key_name, quoted_key(k))

    @staticmethod
    def nsr_opdata(k=None):
        """Xpath of the NSR opdata list, or of the entry keyed by config ref *k*."""
        return XPaths._keyed("D,/nsr:ns-instance-opdata/nsr:nsr",
                             "nsr:ns-instance-config-ref", k)

    @staticmethod
    def nsd_msg(k=None):
        """Xpath of the project NSD catalog list, or of the NSD with id *k*.

        The base path is returned when *k* is None, matching every other
        builder in this class. The conditional must only govern the key
        predicate, never the whole concatenation, otherwise an empty string
        comes back for the un-keyed case.
        """
        return XPaths._keyed("C,/project-nsd:nsd-catalog/project-nsd:nsd",
                             "project-nsd:id", k)

    @staticmethod
    def vnfr_opdata(k=None):
        """Xpath of the VNFR catalog list, or of the VNFR with id *k*."""
        return XPaths._keyed("D,/vnfr:vnfr-catalog/vnfr:vnfr",
                             "vnfr:id", k)

    @staticmethod
    def vnfd_path(k=None):
        """Xpath of the VNFD catalog list, or of the VNFD with id *k*."""
        return XPaths._keyed("C,/vnfd:vnfd-catalog/vnfd:vnfd",
                             "vnfd:id", k)

    @staticmethod
    def config_agent(k=None):
        """Xpath of the config-agent account list, or of the account named *k*."""
        return XPaths._keyed("D,/rw-config-agent:config-agent/rw-config-agent:account",
                             "rw-config-agent:name", k)

    @staticmethod
    def nsr_config(k=None):
        """Xpath of the NSR config list, or of the NSR config with id *k*."""
        return XPaths._keyed("C,/nsr:ns-instance-config/nsr:nsr",
                             "nsr:id", k)

    @staticmethod
    def vlr(k=None):
        """Xpath of the VLR catalog list, or of the VLR with id *k*."""
        return XPaths._keyed("D,/vlr:vlr-catalog/vlr:vlr",
                             "vlr:id", k)
1579 class ConfigManagerDTS(object):
1580 ''' This class either reads from DTS or publishes to DTS '''
1582 def __init__(self
, log
, loop
, parent
, dts
, project
):
1585 self
._parent
= parent
1587 self
._project
= project
1590 def _read_dts(self
, path
, do_trace
=False):
1591 xpath
= self
._project
.add_project(path
)
1592 self
._log
.debug("_read_dts path = %s", xpath
)
1593 flags
= rwdts
.XactFlag
.MERGE
1594 flags
+= rwdts
.XactFlag
.TRACE
if do_trace
else 0
1595 res_iter
= yield from self
._dts
.query_read(
1602 result
= yield from i
1603 if result
is not None:
1604 results
.append(result
.result
)
1612 def get_xpath(self
, xpath
):
1613 self
._log
.debug("Attempting to get xpath: {}".format(xpath
))
1614 resp
= yield from self
._read
_dts
(xpath
, False)
1616 self
._log
.debug("Got DTS resp: {}".format(resp
[0]))
1621 def get_nsr(self
, id):
1622 self
._log
.debug("Attempting to get NSR: %s", id)
1623 nsrl
= yield from self
._read
_dts
(XPaths
.nsr_opdata(id), False)
1626 nsr
= nsrl
[0].as_dict()
1630 def get_nsr_config(self
, id):
1631 self
._log
.debug("Attempting to get config NSR: %s", id)
1632 nsrl
= yield from self
._read
_dts
(XPaths
.nsr_config(id), False)
1639 def get_nsd_msg(self
, id):
1640 self
._log
.debug("Attempting to get NSD: %s", id)
1641 nsdl
= yield from self
._read
_dts
(XPaths
.nsd_msg(id), False)
def get_nsd(self, nsr_id):
    """Fetch the NSD carried by the config record of the given NSR.

    Args:
        nsr_id: id of the NSR whose config record is read through
            ``self.get_nsr_config``.

    Returns:
        The ``nsd`` attribute of that NSR config record.

    NOTE(review): if ``get_nsr_config`` can yield ``None`` for an unknown
    NSR, the attribute access below raises AttributeError - confirm callers
    expect that.
    """
    # Log the NSR id that was actually requested (not the builtin ``id``).
    self._log.debug("Attempting to get NSD for NSR: %s", nsr_id)
    nsr_config = yield from self.get_nsr_config(nsr_id)
    nsd_msg = nsr_config.nsd
    return nsd_msg
1655 def get_vnfr(self
, id):
1656 self
._log
.debug("Attempting to get VNFR: %s", id)
1657 vnfrl
= yield from self
._read
_dts
(XPaths
.vnfr_opdata(id), do_trace
=False)
1664 def get_vnfd(self
, id):
1665 self
._log
.debug("Attempting to get VNFD: %s", XPaths
.vnfd_path(id))
1666 vnfdl
= yield from self
._read
_dts
(XPaths
.vnfd_path(id), do_trace
=False)
1673 def get_vlr(self
, id):
1674 self
._log
.debug("Attempting to get VLR subnet: %s", id)
1675 vlrl
= yield from self
._read
_dts
(XPaths
.vlr(id), do_trace
=False)
1682 def get_config_agents(self
, name
):
1683 self
._log
.debug("Attempting to get config_agents: %s", name
)
1684 cfgagentl
= yield from self
._read
_dts
(XPaths
.config_agent(name
), False)
def update(self, xpath, msg, flags=rwdts.XactFlag.REPLACE):
    """Update a cm-state (cm-nsr) record in DTS.

    Args:
        xpath: path of the record; it is passed through
            ``self._project.add_project`` before use.
        msg: message handed to the publisher handle for that path.
        flags: flags forwarded to ``update_element``
            (``rwdts.XactFlag.REPLACE`` by default).
    """
    full_path = self._project.add_project(xpath)
    log_debug = self._log.debug

    log_debug("Updating cm-state %s:%s dts_pub_hdl = %s", full_path, msg, self.dts_pub_hdl)
    publisher = self.dts_pub_hdl
    publisher.update_element(full_path, msg, flags)
    log_debug("Updated cm-state, %s:%s", full_path, msg)
def delete(self, xpath):
    """Delete a cm-nsr record in DTS, identified by its path only.

    Args:
        xpath: path of the record; it is passed through
            ``self._project.add_project`` before use.
    """
    full_path = self._project.add_project(xpath)
    log_debug = self._log.debug

    log_debug("Deleting cm-nsr %s dts_pub_hdl = %s", full_path, self.dts_pub_hdl)
    publisher = self.dts_pub_hdl
    publisher.delete_element(full_path)
    log_debug("Deleted cm-nsr, %s", full_path)
1709 yield from self
.register_to_publish()
1710 yield from self
.register_for_nsr()
def deregister(self):
    """Deregister the subscriber and publisher DTS handles, if present.

    Each of ``dts_reg_hdl`` and ``dts_pub_hdl`` that is truthy has its
    ``deregister()`` called and is then reset to ``None``; the subscriber
    handle is processed first.
    """
    project_name = self._project.name
    self._log.debug("De-registering conman config for project {}".format(project_name))

    for attr in ("dts_reg_hdl", "dts_pub_hdl"):
        handle = getattr(self, attr)
        if not handle:
            continue
        handle.deregister()
        setattr(self, attr, None)
1724 def register_to_publish(self
):
1725 ''' Register to DTS for publishing cm-state opdata '''
1727 xpath
= self
._project
.add_project("D,/rw-conman:cm-state/rw-conman:cm-nsr")
1728 self
._log
.debug("Registering to publish cm-state @ %s", xpath
)
1729 hdl
= rift
.tasklets
.DTS
.RegistrationHandler()
1730 with self
._dts
.group_create() as group
:
1731 self
.dts_pub_hdl
= group
.register(xpath
=xpath
,
1733 flags
=rwdts
.Flag
.PUBLISHER | rwdts
.Flag
.NO_PREP_READ
)
def nsr_xpath(self):
    """Return the NSR opdata list xpath after ``self._project.add_project``."""
    base_xpath = "D,/nsr:ns-instance-opdata/nsr:nsr"
    return self._project.add_project(base_xpath)
1740 def register_for_nsr(self
):
1741 """ Register for NSR changes """
1744 def on_prepare(xact_info
, query_action
, ks_path
, msg
):
1745 """ This NSR is created """
1746 self
._log
.debug("Received NSR instantiate on_prepare (%s:%s:%s)",
1751 if (query_action
== rwdts
.QueryAction
.UPDATE
or
1752 query_action
== rwdts
.QueryAction
.CREATE
):
1753 # Update Each NSR/VNFR state
1754 if msg
.operational_status
in ['running', 'terminate']:
1755 # Add to the task list
1756 self
._parent
.add_to_pending_tasks({
1757 'nsrid' : msg
.ns_instance_config_ref
,
1759 'event' : msg
.operational_status
,
1762 elif query_action
== rwdts
.QueryAction
.DELETE
:
1763 nsr_id
= msg
.ns_instance_config_ref
1764 self
._log
.debug("Got terminate for NSR id %s", nsr_id
)
1765 asyncio
.ensure_future(self
._parent
.delete_NSR(nsr_id
), loop
=self
._loop
)
1768 raise NotImplementedError(
1769 "%s action on cm-state not supported",
1772 xact_info
.respond_xpath(rwdts
.XactRspCode
.ACK
)
1775 handler
= rift
.tasklets
.DTS
.RegistrationHandler(on_prepare
=on_prepare
)
1776 self
.dts_reg_hdl
= yield from self
._dts
.register(self
.nsr_xpath
,
1777 flags
=rwdts
.Flag
.SUBSCRIBER | rwdts
.Flag
.DELTA_READY
,
1779 except Exception as e
:
1780 self
._log
.error("Failed to register for NSR changes as %s", str(e
))