X-Git-Url: https://osm.etsi.org/gitweb/?a=blobdiff_plain;f=rwcm%2Fplugins%2Frwconman%2Frift%2Ftasklets%2Frwconmantasklet%2Frwconman_config.py;h=d74f695e695eabaabea1334adcec05959161e845;hb=a3bb91f092d378448cb870eccd45d43865de143c;hp=8b47e6bd630421af51fe8d9d871137602ca902cb;hpb=8a13f9e5bc11c0de12be2907210ddd0fe724b688;p=osm%2FSO.git diff --git a/rwcm/plugins/rwconman/rift/tasklets/rwconmantasklet/rwconman_config.py b/rwcm/plugins/rwconman/rift/tasklets/rwconmantasklet/rwconman_config.py index 8b47e6bd..d74f695e 100644 --- a/rwcm/plugins/rwconman/rift/tasklets/rwconmantasklet/rwconman_config.py +++ b/rwcm/plugins/rwconman/rift/tasklets/rwconmantasklet/rwconman_config.py @@ -127,13 +127,16 @@ class ConfigManagerConfig(object): self._log = log self._loop = loop self._parent = parent + self._project = parent._project + self._nsr_dict = {} self.pending_cfg = {} self.terminate_cfg = {} self.pending_tasks = [] # User for NSRid get retry # (mainly excercised at restart case) - self._config_xpath = "C,/cm-config" - self._opdata_xpath = "D,/rw-conman:cm-state" + + self._config_xpath = self._project.add_project("C,/rw-conman:cm-config") + self._opdata_xpath = self._project.add_project("D,/rw-conman:cm-state") self.cm_config = conmanY.SoConfig() # RO specific configuration @@ -147,7 +150,7 @@ class ConfigManagerConfig(object): self.cm_state['states'] = "Initialized" # Initialize objects to register - self.cmdts_obj = ConfigManagerDTS(self._log, self._loop, self, self._dts) + self.cmdts_obj = ConfigManagerDTS(self._log, self._loop, self, self._dts, self._project) self._config_agent_mgr = conagent.RiftCMConfigAgent( self._dts, self._log, @@ -157,10 +160,11 @@ class ConfigManagerConfig(object): self.reg_handles = [ self.cmdts_obj, self._config_agent_mgr, - RiftCM_rpc.RiftCMRPCHandler(self._dts, self._log, self._loop, + RiftCM_rpc.RiftCMRPCHandler(self._dts, self._log, self._loop, self._project, PretendNsm( self._dts, self._log, self._loop, self)), ] + self._op_reg = None def is_nsr_valid(self, nsr_id): if nsr_id in self._nsr_dict: @@ -241,7 +245,19 @@ class ConfigManagerConfig(object): # Initialize all handles that needs to be registered for reg in self.reg_handles: yield from reg.register() - + + def deregister(self): + # De-register all reg handles + self._log.debug("De-register ConfigManagerConfig for project {}". + format(self._project)) + + for reg in self.reg_handles: + reg.deregister() + reg = None + + self._op_reg.delete_element(self._opdata_xpath) + self._op_reg.deregister() + @asyncio.coroutine def register_cm_state_opdata(self): @@ -285,9 +301,9 @@ class ConfigManagerConfig(object): try: handler=rift.tasklets.DTS.RegistrationHandler(on_prepare=on_prepare) - yield from self._dts.register(xpath=self._opdata_xpath, - handler=handler, - flags=rwdts.Flag.PUBLISHER) + self._op_reg = yield from self._dts.register(xpath=self._opdata_xpath, + handler=handler, + flags=rwdts.Flag.PUBLISHER) self._log.info("Successfully registered for opdata(%s)", self._opdata_xpath) except Exception as e: self._log.error("Failed to register for opdata as (%s)", e) @@ -301,7 +317,7 @@ class ConfigManagerConfig(object): if method in vnf_config: return method return None - + def get_cfg_file_extension(method, configuration_options): ext_dict = { "netconf" : "xml", @@ -473,7 +489,7 @@ class ConfigManagerConfig(object): try: if id not in nsr_dict: - nsr_obj = ConfigManagerNSR(self._log, self._loop, self, id) + nsr_obj = ConfigManagerNSR(self._log, self._loop, self, self._project, id) nsr_dict[id] = nsr_obj else: self._log.info("NSR(%s) is already initialized!", id) @@ -486,7 +502,7 @@ class ConfigManagerConfig(object): if nsr_obj.cm_nsr['state'] != nsr_obj.state_to_string(conmanY.RecordState.INIT): self._log.debug("NSR(%s) is already processed, state=%s", nsr_obj.nsr_name, nsr_obj.cm_nsr['state']) - yield from nsr_obj.publish_cm_state() + # yield from nsr_obj.publish_cm_state() return True cmdts_obj = self.cmdts_obj @@ -502,7 +518,7 @@ class ConfigManagerConfig(object): # Create Agent NSR class nsr_config = yield from cmdts_obj.get_nsr_config(id) self._log.debug("NSR {} config: {}".format(id, nsr_config)) - nsr_obj.agent_nsr = riftcm_config_plugin.RiftCMnsr(nsr, nsr_config) + nsr_obj.agent_nsr = riftcm_config_plugin.RiftCMnsr(nsr, nsr_config, self._project) try: yield from nsr_obj.update_ns_cm_state(conmanY.RecordState.RECEIVED) @@ -519,7 +535,7 @@ class ConfigManagerConfig(object): # return nsr_obj.set_config_dir(self) - + for const_vnfr in nsr['constituent_vnfr_ref']: self._log.debug("Fetching VNFR (%s)", const_vnfr['vnfr_id']) vnfr_msg = yield from cmdts_obj.get_vnfr(const_vnfr['vnfr_id']) @@ -624,45 +640,50 @@ class ConfigManagerConfig(object): def terminate_NSR(self, id): nsr_dict = self._nsr_dict if id not in nsr_dict: - self._log.error("NSR(%s) does not exist!", id) + self._log.debug("NSR(%s) does not exist!", id) return else: - # Remove this NSR if we have it on pending task list - for task in self.pending_tasks: - if task['nsrid'] == id: - self.del_from_pending_tasks(task) + try: + # Remove this NSR if we have it on pending task list + for task in self.pending_tasks: + if task['nsrid'] == id: + self.del_from_pending_tasks(task) - # Remove this object from global list - nsr_obj = nsr_dict.pop(id, None) + # Remove this object from global list + nsr_obj = nsr_dict.pop(id, None) - # Remove this NS cm-state from global status list - self.cm_state['cm_nsr'].remove(nsr_obj.cm_nsr) + # Remove this NS cm-state from global status list + self.cm_state['cm_nsr'].remove(nsr_obj.cm_nsr) - # Also remove any scheduled configuration event - for nsr_obj_p in self._parent.pending_cfg: - if nsr_obj_p == nsr_obj: - assert id == nsr_obj_p._nsr_id - #self._parent.pending_cfg.remove(nsr_obj_p) - # Mark this as being deleted so we do not try to configure it if we are in cfg_delay (will wake up and continue to process otherwise) - nsr_obj_p.being_deleted = True - self._log.info("Removed scheduled configuration for NSR(%s)", nsr_obj.nsr_name) + # Also remove any scheduled configuration event + for nsr_obj_p in self._parent.pending_cfg: + if nsr_obj_p == nsr_obj: + assert id == nsr_obj_p._nsr_id + #self._parent.pending_cfg.remove(nsr_obj_p) + # Mark this as being deleted so we do not try to configure + # it if we are in cfg_delay (will wake up and continue to process otherwise) + nsr_obj_p.being_deleted = True + self._log.info("Removed scheduled configuration for NSR(%s)", nsr_obj.nsr_name) - self._parent.remove_nsr_obj(id) + self._parent.remove_nsr_obj(id) - # Call Config Agent to clean up for each VNF - for agent_vnfr in nsr_obj.agent_nsr.vnfrs: - yield from self._config_agent_mgr.invoke_config_agent_plugins( - 'notify_terminate_vnfr', - nsr_obj.agent_nsr, - agent_vnfr) + # Call Config Agent to clean up for each VNF + for agent_vnfr in nsr_obj.agent_nsr.vnfrs: + yield from self._config_agent_mgr.invoke_config_agent_plugins( + 'notify_terminate_vnfr', + nsr_obj.agent_nsr, + agent_vnfr) - # publish delete cm-state (cm-nsr) - yield from nsr_obj.delete_cm_nsr() + # publish delete cm-state (cm-nsr) + yield from nsr_obj.delete_cm_nsr() - #####################TBD########################### - # yield from self._config_agent_mgr.invoke_config_agent_plugins('notify_terminate_ns', self.id) + #####################TBD########################### + # yield from self._config_agent_mgr.invoke_config_agent_plugins('notify_terminate_ns', self.id) - self._log.info("NSR(%s/%s) is deleted", nsr_obj.nsr_name, id) + self._log.info("NSR(%s/%s) is deleted", nsr_obj.nsr_name, id) + + except Exception as e: + self._log.exception("Terminate NSR exception: {}".format(e)) @asyncio.coroutine def process_initial_config(self, nsr_obj, conf, script, vnfr_name=None): @@ -678,10 +699,6 @@ class ConfigManagerConfig(object): if vnfr_name: inp['vnfr_name'] = vnfr_name - # TODO (pjoseph): Add config agents, we need to identify which all - # config agents are required from this NS and provide only those - inp['config-agent'] = {} - # Add parameters for initial config inp['parameter'] = {} for parameter in parameters: @@ -696,6 +713,20 @@ class ConfigManagerConfig(object): format(nsr_obj.nsr_name, parameter, e)) + # Add config agents specific to each VNFR + inp['config-agent'] = {} + for vnfr in nsr_obj.agent_nsr.vnfrs: + # Get the config agent for the VNFR + # If vnfr name is specified, add only CA specific to that + if (vnfr_name is None) or \ + (vnfr_name == vnfr.name): + agent = self._config_agent_mgr.get_vnfr_config_agent(vnfr.vnfr_msg) + if agent: + if agent.agent_type != riftcm_config_plugin.DEFAULT_CAP_TYPE: + inp['config-agent'][vnfr.member_vnf_index] = agent.agent_data + inp['config-agent'][vnfr.member_vnf_index] \ + ['service-name'] = agent.get_service_name(vnfr.id) + # Add vnfrs specific data inp['vnfr'] = {} for vnfr in nsr_obj.vnfrs: @@ -722,14 +753,15 @@ class ConfigManagerConfig(object): vdu_data = [] for vdu in vnfr['vdur']: d = {} - for k in ['name','management_ip', 'vm_management_ip', 'id']: + for k in ['name','management_ip', 'vm_management_ip', 'id', 'vdu_id_ref']: if k in vdu: d[k] = vdu[k] vdu_data.append(d) - v['vdur'].append(vdu_data) + v['vdur'] = vdu_data inp['vnfr'][vnfr['member_vnf_index_ref']] = v + self._log.debug("Input data for {}: {}". format((vnfr_name if vnfr_name else nsr_obj.nsr_name), inp)) @@ -761,18 +793,24 @@ class ConfigManagerConfig(object): self._log.debug("Running the CMD: {}".format(cmd)) process = yield from asyncio.create_subprocess_shell(cmd, - loop=self._loop) - yield from process.wait() - - if process.returncode: - msg = "NSR/VNFR {} initial config using {} failed with {}". \ + loop=self._loop, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE) + stdout, stderr = yield from process.communicate() + rc = yield from process.wait() + + if rc: + msg = "NSR/VNFR {} initial config using {} failed with {}: {}". \ format(vnfr_name if vnfr_name else nsr_obj.nsr_name, - script, process.returncode) + script, rc, stderr) self._log.error(msg) raise InitialConfigError(msg) - else: - # os.remove(inp_file) - pass + + try: + os.remove(inp_file) + except Exception as e: + self._log.debug("Error removing input file {}: {}". + format(inp_file, e)) def get_script_file(self, script_name, d_name, d_id, d_type): # Get the full path to the script @@ -791,7 +829,7 @@ class ConfigManagerConfig(object): script_name) self._log.debug("Checking for script at %s", script) if not os.path.exists(script): - self._log.debug("Did not find script %s", script) + self._log.warning("Did not find script %s", script) script = os.path.join(os.environ['RIFT_INSTALL'], 'usr/bin', script_name) @@ -801,9 +839,9 @@ class ConfigManagerConfig(object): # to make sure it has execute permission perm = os.stat(script).st_mode if not (perm & stat.S_IXUSR): - self._log.warn("NSR/VNFR {} initial config script {} " \ - "without execute permission: {}". - format(d_name, script, perm)) + self._log.warning("NSR/VNFR {} initial config script {} " \ + "without execute permission: {}". + format(d_name, script, perm)) os.chmod(script, perm | stat.S_IXUSR) return script @@ -846,8 +884,8 @@ class ConfigManagerConfig(object): continue script = self.get_script_file(conf.user_defined_script, - vnfd.id, vnfd.name, + vnfd.id, 'vnfd') yield from self.process_initial_config(nsr_obj, @@ -857,7 +895,7 @@ class ConfigManagerConfig(object): class ConfigManagerNSR(object): - def __init__(self, log, loop, parent, id): + def __init__(self, log, loop, parent, project, id): self._log = log self._loop = loop self._rwcal = None @@ -865,6 +903,7 @@ class ConfigManagerNSR(object): self._cp_dict = {} self._nsr_id = id self._parent = parent + self._project = project self._log.info("Instantiated NSR entry for id=%s", id) self.nsr_cfg_config_attributes_dict = {} self.vnf_config_attributes_dict = {} @@ -893,10 +932,9 @@ class ConfigManagerNSR(object): @property def nsr_opdata_xpath(self): ''' Returns full xpath for this NSR cm-state opdata ''' - return( - "D,/rw-conman:cm-state" + - "/rw-conman:cm-nsr[rw-conman:id='{}']" - ).format(self._nsr_id) + return self._project.add_project(( + "D,/rw-conman:cm-state/rw-conman:cm-nsr[rw-conman:id='{}']" + ).format(self._nsr_id)) @property def vnfrs(self): @@ -1323,8 +1361,8 @@ class XPaths(object): @staticmethod def nsd_msg(k=None): - return ("C,/nsd:nsd-catalog/nsd:nsd" + - "[nsd:id = '{}']".format(k) if k is not None else "") + return ("C,/project-nsd:nsd-catalog/project-nsd:nsd" + + "[project-nsd:id = '{}']".format(k) if k is not None else "") @staticmethod def vnfr_opdata(k=None): @@ -1333,8 +1371,8 @@ class XPaths(object): @staticmethod def vnfd(k=None): - return ("C,/vnfd:vnfd-catalog/vnfd:vnfd" + - ("[vnfd:id='{}']".format(k) if k is not None else "")) + return ("C,/project-vnfd:vnfd-catalog/project-vnfd:vnfd" + + ("[project-vnfd:id='{}']".format(k) if k is not None else "")) @staticmethod def config_agent(k=None): @@ -1352,14 +1390,16 @@ class XPaths(object): class ConfigManagerDTS(object): ''' This class either reads from DTS or publishes to DTS ''' - def __init__(self, log, loop, parent, dts): + def __init__(self, log, loop, parent, dts, project): self._log = log self._loop = loop self._parent = parent self._dts = dts + self._project = project @asyncio.coroutine - def _read_dts(self, xpath, do_trace=False): + def _read_dts(self, path, do_trace=False): + xpath = self._project.add_project(path) self._log.debug("_read_dts path = %s", xpath) flags = rwdts.XactFlag.MERGE res_iter = yield from self._dts.query_read( @@ -1446,19 +1486,21 @@ class ConfigManagerDTS(object): return cfgagentl @asyncio.coroutine - def update(self, path, msg, flags=rwdts.XactFlag.REPLACE): + def update(self, xpath, msg, flags=rwdts.XactFlag.REPLACE): """ Update a cm-state (cm-nsr) record in DTS with the path and message """ + path = self._project.add_project(xpath) self._log.debug("Updating cm-state %s:%s dts_pub_hdl = %s", path, msg, self.dts_pub_hdl) self.dts_pub_hdl.update_element(path, msg, flags) self._log.debug("Updated cm-state, %s:%s", path, msg) @asyncio.coroutine - def delete(self, path): + def delete(self, xpath): """ Delete cm-nsr record in DTS with the path only """ + path = self._project.add_project(xpath) self._log.debug("Deleting cm-nsr %s dts_pub_hdl = %s", path, self.dts_pub_hdl) self.dts_pub_hdl.delete_element(path) self._log.debug("Deleted cm-nsr, %s", path) @@ -1467,12 +1509,23 @@ class ConfigManagerDTS(object): def register(self): yield from self.register_to_publish() yield from self.register_for_nsr() - + + def deregister(self): + self._log.debug("De-registering conman config for project {}". + format(self._project.name)) + if self.dts_reg_hdl: + self.dts_reg_hdl.deregister() + self.dts_reg_hdl = None + + if self.dts_pub_hdl: + self.dts_pub_hdl.deregister() + self.dts_pub_hdl = None + @asyncio.coroutine def register_to_publish(self): ''' Register to DTS for publishing cm-state opdata ''' - xpath = "D,/rw-conman:cm-state/rw-conman:cm-nsr" + xpath = self._project.add_project("D,/rw-conman:cm-state/rw-conman:cm-nsr") self._log.debug("Registering to publish cm-state @ %s", xpath) hdl = rift.tasklets.DTS.RegistrationHandler() with self._dts.group_create() as group: @@ -1482,7 +1535,7 @@ class ConfigManagerDTS(object): @property def nsr_xpath(self): - return "D,/nsr:ns-instance-opdata/nsr:nsr" + return self._project.add_project("D,/nsr:ns-instance-opdata/nsr:nsr") @asyncio.coroutine def register_for_nsr(self):