X-Git-Url: https://osm.etsi.org/gitweb/?a=blobdiff_plain;f=rwcm%2Fplugins%2Frwconman%2Frift%2Ftasklets%2Frwconmantasklet%2Frwconman_config.py;h=d74f695e695eabaabea1334adcec05959161e845;hb=a3bb91f092d378448cb870eccd45d43865de143c;hp=4848e9ee10636c22ba730c2cf00b0dcb0a532eb8;hpb=49868d2c71eb364cee9707515be6841a568dad40;p=osm%2FSO.git diff --git a/rwcm/plugins/rwconman/rift/tasklets/rwconmantasklet/rwconman_config.py b/rwcm/plugins/rwconman/rift/tasklets/rwconmantasklet/rwconman_config.py index 4848e9ee..d74f695e 100644 --- a/rwcm/plugins/rwconman/rift/tasklets/rwconmantasklet/rwconman_config.py +++ b/rwcm/plugins/rwconman/rift/tasklets/rwconmantasklet/rwconman_config.py @@ -127,13 +127,16 @@ class ConfigManagerConfig(object): self._log = log self._loop = loop self._parent = parent + self._project = parent._project + self._nsr_dict = {} self.pending_cfg = {} self.terminate_cfg = {} self.pending_tasks = [] # User for NSRid get retry # (mainly excercised at restart case) - self._config_xpath = "C,/cm-config" - self._opdata_xpath = "D,/rw-conman:cm-state" + + self._config_xpath = self._project.add_project("C,/rw-conman:cm-config") + self._opdata_xpath = self._project.add_project("D,/rw-conman:cm-state") self.cm_config = conmanY.SoConfig() # RO specific configuration @@ -147,7 +150,7 @@ class ConfigManagerConfig(object): self.cm_state['states'] = "Initialized" # Initialize objects to register - self.cmdts_obj = ConfigManagerDTS(self._log, self._loop, self, self._dts) + self.cmdts_obj = ConfigManagerDTS(self._log, self._loop, self, self._dts, self._project) self._config_agent_mgr = conagent.RiftCMConfigAgent( self._dts, self._log, @@ -157,10 +160,11 @@ class ConfigManagerConfig(object): self.reg_handles = [ self.cmdts_obj, self._config_agent_mgr, - RiftCM_rpc.RiftCMRPCHandler(self._dts, self._log, self._loop, + RiftCM_rpc.RiftCMRPCHandler(self._dts, self._log, self._loop, self._project, PretendNsm( self._dts, self._log, self._loop, self)), ] + self._op_reg = None def is_nsr_valid(self, nsr_id): if nsr_id in self._nsr_dict: @@ -241,7 +245,19 @@ class ConfigManagerConfig(object): # Initialize all handles that needs to be registered for reg in self.reg_handles: yield from reg.register() - + + def deregister(self): + # De-register all reg handles + self._log.debug("De-register ConfigManagerConfig for project {}". + format(self._project)) + + for reg in self.reg_handles: + reg.deregister() + reg = None + + self._op_reg.delete_element(self._opdata_xpath) + self._op_reg.deregister() + @asyncio.coroutine def register_cm_state_opdata(self): @@ -285,9 +301,9 @@ class ConfigManagerConfig(object): try: handler=rift.tasklets.DTS.RegistrationHandler(on_prepare=on_prepare) - yield from self._dts.register(xpath=self._opdata_xpath, - handler=handler, - flags=rwdts.Flag.PUBLISHER) + self._op_reg = yield from self._dts.register(xpath=self._opdata_xpath, + handler=handler, + flags=rwdts.Flag.PUBLISHER) self._log.info("Successfully registered for opdata(%s)", self._opdata_xpath) except Exception as e: self._log.error("Failed to register for opdata as (%s)", e) @@ -301,7 +317,7 @@ class ConfigManagerConfig(object): if method in vnf_config: return method return None - + def get_cfg_file_extension(method, configuration_options): ext_dict = { "netconf" : "xml", @@ -473,7 +489,7 @@ class ConfigManagerConfig(object): try: if id not in nsr_dict: - nsr_obj = ConfigManagerNSR(self._log, self._loop, self, id) + nsr_obj = ConfigManagerNSR(self._log, self._loop, self, self._project, id) nsr_dict[id] = nsr_obj else: self._log.info("NSR(%s) is already initialized!", id) @@ -486,7 +502,7 @@ class ConfigManagerConfig(object): if nsr_obj.cm_nsr['state'] != nsr_obj.state_to_string(conmanY.RecordState.INIT): self._log.debug("NSR(%s) is already processed, state=%s", nsr_obj.nsr_name, nsr_obj.cm_nsr['state']) - yield from nsr_obj.publish_cm_state() + # yield from nsr_obj.publish_cm_state() return True cmdts_obj = self.cmdts_obj @@ -502,14 +518,14 @@ class ConfigManagerConfig(object): # Create Agent NSR class nsr_config = yield from cmdts_obj.get_nsr_config(id) self._log.debug("NSR {} config: {}".format(id, nsr_config)) - nsr_obj.agent_nsr = riftcm_config_plugin.RiftCMnsr(nsr, nsr_config) + nsr_obj.agent_nsr = riftcm_config_plugin.RiftCMnsr(nsr, nsr_config, self._project) try: yield from nsr_obj.update_ns_cm_state(conmanY.RecordState.RECEIVED) # Parse NSR if nsr is not None: - nsr_obj.set_nsr_name(nsr['nsd_name_ref']) + nsr_obj.set_nsr_name(nsr['name_ref']) nsr_dir = os.path.join(self._parent.cfg_dir, nsr_obj.nsr_name) self._log.info("Checking NS config directory: %s", nsr_dir) if not os.path.isdir(nsr_dir): @@ -519,7 +535,7 @@ class ConfigManagerConfig(object): # return nsr_obj.set_config_dir(self) - + for const_vnfr in nsr['constituent_vnfr_ref']: self._log.debug("Fetching VNFR (%s)", const_vnfr['vnfr_id']) vnfr_msg = yield from cmdts_obj.get_vnfr(const_vnfr['vnfr_id']) @@ -624,49 +640,54 @@ class ConfigManagerConfig(object): def terminate_NSR(self, id): nsr_dict = self._nsr_dict if id not in nsr_dict: - self._log.error("NSR(%s) does not exist!", id) + self._log.debug("NSR(%s) does not exist!", id) return else: - # Remove this NSR if we have it on pending task list - for task in self.pending_tasks: - if task['nsrid'] == id: - self.del_from_pending_tasks(task) + try: + # Remove this NSR if we have it on pending task list + for task in self.pending_tasks: + if task['nsrid'] == id: + self.del_from_pending_tasks(task) - # Remove this object from global list - nsr_obj = nsr_dict.pop(id, None) + # Remove this object from global list + nsr_obj = nsr_dict.pop(id, None) - # Remove this NS cm-state from global status list - self.cm_state['cm_nsr'].remove(nsr_obj.cm_nsr) + # Remove this NS cm-state from global status list + self.cm_state['cm_nsr'].remove(nsr_obj.cm_nsr) - # Also remove any scheduled configuration event - for nsr_obj_p in self._parent.pending_cfg: - if nsr_obj_p == nsr_obj: - assert id == nsr_obj_p._nsr_id - #self._parent.pending_cfg.remove(nsr_obj_p) - # Mark this as being deleted so we do not try to configure it if we are in cfg_delay (will wake up and continue to process otherwise) - nsr_obj_p.being_deleted = True - self._log.info("Removed scheduled configuration for NSR(%s)", nsr_obj.nsr_name) + # Also remove any scheduled configuration event + for nsr_obj_p in self._parent.pending_cfg: + if nsr_obj_p == nsr_obj: + assert id == nsr_obj_p._nsr_id + #self._parent.pending_cfg.remove(nsr_obj_p) + # Mark this as being deleted so we do not try to configure + # it if we are in cfg_delay (will wake up and continue to process otherwise) + nsr_obj_p.being_deleted = True + self._log.info("Removed scheduled configuration for NSR(%s)", nsr_obj.nsr_name) - self._parent.remove_nsr_obj(id) + self._parent.remove_nsr_obj(id) - # Call Config Agent to clean up for each VNF - for agent_vnfr in nsr_obj.agent_nsr.vnfrs: - yield from self._config_agent_mgr.invoke_config_agent_plugins( - 'notify_terminate_vnfr', - nsr_obj.agent_nsr, - agent_vnfr) + # Call Config Agent to clean up for each VNF + for agent_vnfr in nsr_obj.agent_nsr.vnfrs: + yield from self._config_agent_mgr.invoke_config_agent_plugins( + 'notify_terminate_vnfr', + nsr_obj.agent_nsr, + agent_vnfr) - # publish delete cm-state (cm-nsr) - yield from nsr_obj.delete_cm_nsr() + # publish delete cm-state (cm-nsr) + yield from nsr_obj.delete_cm_nsr() - #####################TBD########################### - # yield from self._config_agent_mgr.invoke_config_agent_plugins('notify_terminate_ns', self.id) + #####################TBD########################### + # yield from self._config_agent_mgr.invoke_config_agent_plugins('notify_terminate_ns', self.id) - self._log.info("NSR(%s/%s) is deleted", nsr_obj.nsr_name, id) + self._log.info("NSR(%s/%s) is deleted", nsr_obj.nsr_name, id) + + except Exception as e: + self._log.exception("Terminate NSR exception: {}".format(e)) @asyncio.coroutine - def process_ns_initial_config(self, nsr_obj): - '''Apply the initial-config-primitives specified in NSD''' + def process_initial_config(self, nsr_obj, conf, script, vnfr_name=None): + '''Apply the initial-config-primitives specified in NSD or VNFD''' def get_input_file(parameters): inp = {} @@ -674,9 +695,9 @@ class ConfigManagerConfig(object): # Add NSR name to file inp['nsr_name'] = nsr_obj.nsr_name - # TODO (pjoseph): Add config agents, we need to identify which all - # config agents are required from this NS and provide only those - inp['config-agent'] = {} + # Add VNFR name if available + if vnfr_name: + inp['vnfr_name'] = vnfr_name # Add parameters for initial config inp['parameter'] = {} @@ -684,10 +705,28 @@ class ConfigManagerConfig(object): try: inp['parameter'][parameter['name']] = parameter['value'] except KeyError as e: - self._log.info("NSR {} initial config parameter {} with no value: {}". - format(nsr_obj.nsr_name, parameter, e)) + if vnfr_name: + self._log.info("VNFR {} initial config parameter {} with no value: {}". + format(vnfr_name, parameter, e)) + else: + self._log.info("NSR {} initial config parameter {} with no value: {}". + format(nsr_obj.nsr_name, parameter, e)) + # Add config agents specific to each VNFR + inp['config-agent'] = {} + for vnfr in nsr_obj.agent_nsr.vnfrs: + # Get the config agent for the VNFR + # If vnfr name is specified, add only CA specific to that + if (vnfr_name is None) or \ + (vnfr_name == vnfr.name): + agent = self._config_agent_mgr.get_vnfr_config_agent(vnfr.vnfr_msg) + if agent: + if agent.agent_type != riftcm_config_plugin.DEFAULT_CAP_TYPE: + inp['config-agent'][vnfr.member_vnf_index] = agent.agent_data + inp['config-agent'][vnfr.member_vnf_index] \ + ['service-name'] = agent.get_service_name(vnfr.id) + # Add vnfrs specific data inp['vnfr'] = {} for vnfr in nsr_obj.vnfrs: @@ -711,17 +750,21 @@ class ConfigManagerConfig(object): ) v['vdur'] = [] - vdu_data = [(vdu['name'], vdu['management_ip'], vdu['vm_management_ip'], vdu['id']) - for vdu in vnfr['vdur']] - - for data in vdu_data: - data = dict(zip(['name', 'management_ip', 'vm_management_ip', 'id'] , data)) - v['vdur'].append(data) + vdu_data = [] + for vdu in vnfr['vdur']: + d = {} + for k in ['name','management_ip', 'vm_management_ip', 'id', 'vdu_id_ref']: + if k in vdu: + d[k] = vdu[k] + vdu_data.append(d) + v['vdur'] = vdu_data inp['vnfr'][vnfr['member_vnf_index_ref']] = v - self._log.debug("Input data for NSR {}: {}". - format(nsr_obj.nsr_name, inp)) + + self._log.debug("Input data for {}: {}". + format((vnfr_name if vnfr_name else nsr_obj.nsr_name), + inp)) # Convert to YAML string yaml_string = yaml.dump(inp, default_flow_style=False) @@ -730,95 +773,129 @@ class ConfigManagerConfig(object): tmp_file = None with tempfile.NamedTemporaryFile(delete=False) as tmp_file: tmp_file.write(yaml_string.encode("UTF-8")) - self._log.debug("Input file created for NSR {}: {}". - format(nsr_obj.nsr_name, tmp_file.name)) + self._log.debug("Input file created for {}: {}". + format((vnfr_name if vnfr_name \ + else nsr_obj.nsr_name), + tmp_file.name)) return tmp_file.name - def get_script_file(script_name, nsd_name, nsd_id): - # Get the full path to the script - script = '' - # If script name starts with /, assume it is full path - if script_name[0] == '/': - # The script has full path, use as is - script = script_name - else: - script = os.path.join(os.environ['RIFT_ARTIFACTS'], - 'launchpad/packages/nsd', - nsd_id, - nsd_name, - 'scripts', - script_name) - self._log.debug("Checking for script at %s", script) - if not os.path.exists(script): - self._log.debug("Did not find script %s", script) - script = os.path.join(os.environ['RIFT_INSTALL'], - 'usr/bin', - script_name) - - # Seen cases in jenkins, where the script execution fails - # with permission denied. Setting the permission on script - # to make sure it has execute permission - perm = os.stat(script).st_mode - if not (perm & stat.S_IXUSR): - self._log.warn("NSR {} initial config script {} " \ - "without execute permission: {}". - format(nsr_id, script, perm)) - os.chmod(script, perm | stat.S_IXUSR) - return script - - nsr_id = nsr_obj.nsr_id - nsr_name = nsr_obj.nsr_name - self._log.debug("Apply initial config for NSR {}({})". - format(nsr_name, nsr_id)) - - # Fetch NSR - nsr = yield from self.cmdts_obj.get_nsr(nsr_id) + parameters = [] + try: + parameters = conf['parameter'] + except Exception as e: + self._log.debug("Parameter conf: {}, e: {}". + format(conf, e)) + + inp_file = get_input_file(parameters) + + cmd = "{0} {1}".format(script, inp_file) + self._log.debug("Running the CMD: {}".format(cmd)) + + process = yield from asyncio.create_subprocess_shell(cmd, + loop=self._loop, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE) + stdout, stderr = yield from process.communicate() + rc = yield from process.wait() + + if rc: + msg = "NSR/VNFR {} initial config using {} failed with {}: {}". \ + format(vnfr_name if vnfr_name else nsr_obj.nsr_name, + script, rc, stderr) + self._log.error(msg) + raise InitialConfigError(msg) + + try: + os.remove(inp_file) + except Exception as e: + self._log.debug("Error removing input file {}: {}". + format(inp_file, e)) + + def get_script_file(self, script_name, d_name, d_id, d_type): + # Get the full path to the script + script = '' + # If script name starts with /, assume it is full path + if script_name[0] == '/': + # The script has full path, use as is + script = script_name + else: + script = os.path.join(os.environ['RIFT_ARTIFACTS'], + 'launchpad/packages', + d_type, + d_id, + d_name, + 'scripts', + script_name) + self._log.debug("Checking for script at %s", script) + if not os.path.exists(script): + self._log.warning("Did not find script %s", script) + script = os.path.join(os.environ['RIFT_INSTALL'], + 'usr/bin', + script_name) + + # Seen cases in jenkins, where the script execution fails + # with permission denied. Setting the permission on script + # to make sure it has execute permission + perm = os.stat(script).st_mode + if not (perm & stat.S_IXUSR): + self._log.warning("NSR/VNFR {} initial config script {} " \ + "without execute permission: {}". + format(d_name, script, perm)) + os.chmod(script, perm | stat.S_IXUSR) + return script + + @asyncio.coroutine + def process_ns_initial_config(self, nsr_obj): + '''Apply the initial-config-primitives specified in NSD''' + + nsr = yield from self.cmdts_obj.get_nsr(nsr_obj.nsr_id) + if 'initial_config_primitive' not in nsr: + return + if nsr is not None: - nsd = yield from self.cmdts_obj.get_nsd(nsr_id) + nsd = yield from self.cmdts_obj.get_nsd(nsr_obj.nsr_id) + for conf in nsr['initial_config_primitive']: + self._log.debug("NSR {} initial config: {}". + format(nsr_obj.nsr_name, conf)) + script = self.get_script_file(conf['user_defined_script'], + nsd.name, + nsd.id, + 'nsd') - try: - # Check if initial config is present - # TODO (pjoseph): Sort based on seq - for conf in nsr['initial_config_primitive']: - self._log.debug("Parameter conf: {}". - format(conf)) + yield from self.process_initial_config(nsr_obj, conf, script) - parameters = [] - try: - parameters = conf['parameter'] - except Exception as e: - self._log.debug("Parameter conf: {}, e: {}". - format(conf, e)) - pass - - inp_file = get_input_file(parameters) - - script = get_script_file(conf['user_defined_script'], - nsd.name, - nsd.id) - - cmd = "{0} {1}".format(script, inp_file) - self._log.debug("Running the CMD: {}".format(cmd)) - - process = yield from asyncio. \ - create_subprocess_shell(cmd, loop=self._loop) - yield from process.wait() - if process.returncode: - msg = "NSR {} initial config using {} failed with {}". \ - format(nsr_name, script, process.returncode) - self._log.error(msg) - raise InitialConfigError(msg) - else: - os.remove(inp_file) + @asyncio.coroutine + def process_vnf_initial_config(self, nsr_obj, vnfr): + '''Apply the initial-config-primitives specified in VNFD''' + + vnfr_name = vnfr.name + + vnfd = vnfr.vnfd + vnf_cfg = vnfd.vnf_configuration + + for conf in vnf_cfg.initial_config_primitive: + self._log.debug("VNFR {} initial config: {}". + format(vnfr_name, conf)) - except KeyError as e: - self._log.debug("Did not find initial config {}". - format(e)) + if not conf.user_defined_script: + self._log.debug("VNFR {} did not fine user defined script: {}". + format(vnfr_name, conf)) + continue + + script = self.get_script_file(conf.user_defined_script, + vnfd.name, + vnfd.id, + 'vnfd') + + yield from self.process_initial_config(nsr_obj, + conf.as_dict(), + script, + vnfr_name=vnfr_name) class ConfigManagerNSR(object): - def __init__(self, log, loop, parent, id): + def __init__(self, log, loop, parent, project, id): self._log = log self._loop = loop self._rwcal = None @@ -826,6 +903,7 @@ class ConfigManagerNSR(object): self._cp_dict = {} self._nsr_id = id self._parent = parent + self._project = project self._log.info("Instantiated NSR entry for id=%s", id) self.nsr_cfg_config_attributes_dict = {} self.vnf_config_attributes_dict = {} @@ -854,10 +932,9 @@ class ConfigManagerNSR(object): @property def nsr_opdata_xpath(self): ''' Returns full xpath for this NSR cm-state opdata ''' - return( - "D,/rw-conman:cm-state" + - "/rw-conman:cm-nsr[rw-conman:id='{}']" - ).format(self._nsr_id) + return self._project.add_project(( + "D,/rw-conman:cm-state/rw-conman:cm-nsr[rw-conman:id='{}']" + ).format(self._nsr_id)) @property def vnfrs(self): @@ -1284,14 +1361,19 @@ class XPaths(object): @staticmethod def nsd_msg(k=None): - return ("C,/nsd:nsd-catalog/nsd:nsd" + - "[nsd:id = '{}']".format(k) if k is not None else "") + return ("C,/project-nsd:nsd-catalog/project-nsd:nsd" + + "[project-nsd:id = '{}']".format(k) if k is not None else "") @staticmethod def vnfr_opdata(k=None): return ("D,/vnfr:vnfr-catalog/vnfr:vnfr" + ("[vnfr:id='{}']".format(k) if k is not None else "")) + @staticmethod + def vnfd(k=None): + return ("C,/project-vnfd:vnfd-catalog/project-vnfd:vnfd" + + ("[project-vnfd:id='{}']".format(k) if k is not None else "")) + @staticmethod def config_agent(k=None): return ("D,/rw-config-agent:config-agent/rw-config-agent:account" + @@ -1308,14 +1390,16 @@ class XPaths(object): class ConfigManagerDTS(object): ''' This class either reads from DTS or publishes to DTS ''' - def __init__(self, log, loop, parent, dts): + def __init__(self, log, loop, parent, dts, project): self._log = log self._loop = loop self._parent = parent self._dts = dts + self._project = project @asyncio.coroutine - def _read_dts(self, xpath, do_trace=False): + def _read_dts(self, path, do_trace=False): + xpath = self._project.add_project(path) self._log.debug("_read_dts path = %s", xpath) flags = rwdts.XactFlag.MERGE res_iter = yield from self._dts.query_read( @@ -1377,6 +1461,15 @@ class ConfigManagerDTS(object): vnfr_msg = vnfrl[0] return vnfr_msg + @asyncio.coroutine + def get_vnfd(self, vnfd_id): + self._log.debug("Attempting to get VNFD: %s", vnfd_id) + vnfdl = yield from self._read_dts(XPaths.vnfd(vnfd_id), do_trace=False) + vnfd_msg = None + if len(vnfdl) > 0: + vnfd_msg = vnfdl[0] + return vnfd_msg + @asyncio.coroutine def get_vlr(self, id): self._log.debug("Attempting to get VLR subnet: %s", id) @@ -1393,19 +1486,21 @@ class ConfigManagerDTS(object): return cfgagentl @asyncio.coroutine - def update(self, path, msg, flags=rwdts.XactFlag.REPLACE): + def update(self, xpath, msg, flags=rwdts.XactFlag.REPLACE): """ Update a cm-state (cm-nsr) record in DTS with the path and message """ + path = self._project.add_project(xpath) self._log.debug("Updating cm-state %s:%s dts_pub_hdl = %s", path, msg, self.dts_pub_hdl) self.dts_pub_hdl.update_element(path, msg, flags) self._log.debug("Updated cm-state, %s:%s", path, msg) @asyncio.coroutine - def delete(self, path): + def delete(self, xpath): """ Delete cm-nsr record in DTS with the path only """ + path = self._project.add_project(xpath) self._log.debug("Deleting cm-nsr %s dts_pub_hdl = %s", path, self.dts_pub_hdl) self.dts_pub_hdl.delete_element(path) self._log.debug("Deleted cm-nsr, %s", path) @@ -1414,12 +1509,23 @@ class ConfigManagerDTS(object): def register(self): yield from self.register_to_publish() yield from self.register_for_nsr() - + + def deregister(self): + self._log.debug("De-registering conman config for project {}". + format(self._project.name)) + if self.dts_reg_hdl: + self.dts_reg_hdl.deregister() + self.dts_reg_hdl = None + + if self.dts_pub_hdl: + self.dts_pub_hdl.deregister() + self.dts_pub_hdl = None + @asyncio.coroutine def register_to_publish(self): ''' Register to DTS for publishing cm-state opdata ''' - xpath = "D,/rw-conman:cm-state/rw-conman:cm-nsr" + xpath = self._project.add_project("D,/rw-conman:cm-state/rw-conman:cm-nsr") self._log.debug("Registering to publish cm-state @ %s", xpath) hdl = rift.tasklets.DTS.RegistrationHandler() with self._dts.group_create() as group: @@ -1429,7 +1535,7 @@ class ConfigManagerDTS(object): @property def nsr_xpath(self): - return "D,/nsr:ns-instance-opdata/nsr:nsr" + return self._project.add_project("D,/nsr:ns-instance-opdata/nsr:nsr") @asyncio.coroutine def register_for_nsr(self):