X-Git-Url: https://osm.etsi.org/gitweb/?a=blobdiff_plain;f=rwcm%2Fplugins%2Frwconman%2Frift%2Ftasklets%2Frwconmantasklet%2Frwconmantasklet.py;h=3c0cd48f026d9b5e7b6f6169649cb6b97ec1ec0c;hb=refs%2Fchanges%2F77%2F5477%2F1;hp=4e92b6c1fe9201a35cbe4ed7d3e74e2b8fb85014;hpb=6f1a3fe149e4a6b9803382cb299c902f4cf58ec9;p=osm%2FSO.git diff --git a/rwcm/plugins/rwconman/rift/tasklets/rwconmantasklet/rwconmantasklet.py b/rwcm/plugins/rwconman/rift/tasklets/rwconmantasklet/rwconmantasklet.py index 4e92b6c1..3c0cd48f 100755 --- a/rwcm/plugins/rwconman/rift/tasklets/rwconmantasklet/rwconmantasklet.py +++ b/rwcm/plugins/rwconman/rift/tasklets/rwconmantasklet/rwconmantasklet.py @@ -1,6 +1,6 @@ -# -# Copyright 2016 RIFT.IO Inc +# +# Copyright 2016-2017 RIFT.IO Inc # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -18,15 +18,15 @@ ''' This file - ConfigManagerTasklet() | ++ +| +ConfigManagerProject() +| +--|--> ConfigurationManager() | +--> rwconman_config.py - ConfigManagerConfig() - | | - | +--> ConfigManagerNSR() - | - +--> rwconman_events.py - ConfigManagerEvents() - | - +--> ConfigManagerROif() + | + +--> ConfigManagerNSR() ''' @@ -44,9 +44,12 @@ from gi.repository import ( ) import rift.tasklets +from rift.mano.utils.project import ( + ManoProject, + ProjectHandler, + ) from . import rwconman_config as Config -from . import rwconman_events as Event def log_this_vnf(vnf_cfg): log_vnf = "" @@ -60,21 +63,21 @@ def log_this_vnf(vnf_cfg): return log_vnf class ConfigurationManager(object): - def __init__(self, log, loop, dts): + def __init__(self, log, loop, dts, project): self._log = log self._loop = loop self._dts = dts + self._project = project + self.cfg_sleep = True - self.cfg_dir = os.path.join(os.environ["RIFT_INSTALL"], "etc/conman") self._config = Config.ConfigManagerConfig(self._dts, self._log, self._loop, self) - self._event = Event.ConfigManagerEvents(self._dts, self._log, self._loop, self) self.pending_cfg = [] self.pending_tasks = {} self._nsr_objs = {} + self._task = None # The configuration_handler task self._handlers = [ - self._config, - self._event, + self._config ] @@ -89,7 +92,7 @@ class ConfigurationManager(object): self._log.info("Updating cm-state for NS(%s) to:%s", nsr_obj.nsr_name, state) yield from nsr_obj.update_ns_cm_state(state) - def add_to_pending(self, nsr_obj): + def add_to_pending(self, nsr_obj, cfg_vnfr_list): if (nsr_obj not in self.pending_cfg and nsr_obj.cm_nsr['state'] == nsr_obj.state_to_string(conmanY.RecordState.RECEIVED)): @@ -97,17 +100,9 @@ class ConfigurationManager(object): self._log.info("Adding NS={} to pending config list" .format(nsr_obj.nsr_name)) - # Build the list - nsr_obj.vnf_cfg_list = [] - # Sort all the VNF by their configuration attribute priority - sorted_dict = dict(sorted(nsr_obj.nsr_cfg_config_attributes_dict.items())) - for config_attributes_dict in sorted_dict.values(): - # Iterate through each priority level - for config_priority in config_attributes_dict: - # Iterate through each vnfr at this priority level - vnfr = nsr_obj._vnfr_dict[config_priority['id']] - self._log.debug("Adding VNF:(%s) to pending cfg list", log_this_vnf(vnfr['vnf_cfg'])) - nsr_obj.vnf_cfg_list.append(vnfr['vnf_cfg']) + for cfg_vnfr in cfg_vnfr_list: + self._log.debug("Adding VNF:(%s) to pending cfg list", log_this_vnf(cfg_vnfr['vnf_cfg'])) + nsr_obj.vnf_cfg_list.append(cfg_vnfr['vnf_cfg']) self.pending_cfg.append(nsr_obj) def add_nsr_obj(self, nsr_obj): @@ -119,6 +114,9 @@ class ConfigurationManager(object): del self._nsr_objs[nsr_id] def get_nsr_obj(self, nsr_id): + if nsr_id not in self._nsr_objs: + self._log.info("NSR %s not found", nsr_id) + return None self._log.debug("Returning nsr_obj (%s) from Configuration Manager", self._nsr_objs[nsr_id]) return self._nsr_objs.get(nsr_id) @@ -146,12 +144,13 @@ class ConfigurationManager(object): done)) if done: - self._log.warn("Apply initial config on VNFR {}". + self._log.debug("Apply initial config on VNFR {}". format(log_this_vnf(vnf_cfg))) try: yield from nsr_obj.parent.process_vnf_initial_config( nsr_obj, - agent_vnfr.vnfr_msg) + agent_vnfr.vnfr_msg, + self._project.name) yield from self.update_vnf_state(vnf_cfg, conmanY.RecordState.READY) @@ -162,6 +161,7 @@ class ConfigurationManager(object): conmanY.RecordState.CFG_FAILED) else: + self._log.debug("Getting config status {}".format(log_this_vnf(vnf_cfg))) # Check to see if the VNF configure failed status = yield from self._config._config_agent_mgr.invoke_config_agent_plugins( 'get_config_status', @@ -215,7 +215,7 @@ class ConfigurationManager(object): nsr_obj.nsr_failed = False self._log.debug("Apply initial config on NSR {}".format(nsr_obj.nsr_name)) try: - yield from nsr_obj.parent.process_ns_initial_config(nsr_obj) + yield from nsr_obj.parent.process_ns_initial_config(nsr_obj, self._project.name) except Exception as e: nsr_obj.nsr_failed = True self._log.exception(e) @@ -228,71 +228,74 @@ class ConfigurationManager(object): yield from nsr_obj.update_ns_cm_state(conmanY.RecordState.CFG_FAILED) return ret_status - # Basically, this loop will never end. - while True: - # Check the pending tasks are complete - # Store a list of tasks that are completed and - # remove from the pending_tasks list outside loop - ids = [] - for nsr_id, task in self.pending_tasks.items(): - if task.done(): - ids.append(nsr_id) - e = task.exception() - if e: - self._log.error("Exception in configuring nsr {}: {}". - format(nsr_id, e)) - nsr_obj = self.get_nsr_obj(nsr_id) - if nsr_obj: - yield from nsr_obj.update_ns_cm_state(conmanY.RecordState.CFG_FAILED, str(e)) - + try: + # Basically, this loop will never end. + while True: + # Check the pending tasks are complete + # Store a list of tasks that are completed and + # remove from the pending_tasks list outside loop + ids = [] + for nsr_id, task in self.pending_tasks.items(): + if task.done(): + ids.append(nsr_id) + e = task.exception() + if e: + self._log.error("Exception in configuring nsr {}: {}". + format(nsr_id, e)) + nsr_obj = self.get_nsr_obj(nsr_id) + if nsr_obj: + yield from nsr_obj.update_ns_cm_state(conmanY.RecordState.CFG_FAILED, str(e)) + + else: + rc = task.result() + self._log.debug("NSR {} configured: {}".format(nsr_id, rc)) else: - rc = task.result() - self._log.debug("NSR {} configured: {}".format(nsr_id, rc)) - else: - self._log.debug("NSR {} still configuring".format(nsr_id)) - - # Remove the completed tasks - for nsr_id in ids: - self.pending_tasks.pop(nsr_id) - - # TODO (pjoseph): Fix this - # Sleep before processing any NS (Why are we getting multiple NSR running DTS updates?) - # If the sleep is not 10 seconds it does not quite work, NSM is marking it 'running' - # wrongfully 10 seconds in advance? - yield from asyncio.sleep(10, loop=self._loop) - - if self.pending_cfg: - # get first NS, pending_cfg is nsr_obj list - nsr_obj = self.pending_cfg[0] - nsr_done = False - if nsr_obj.being_deleted is False: - # Process this NS, returns back same obj is successfull or exceeded retries - try: - self._log.info("Processing NSR:{}".format(nsr_obj.nsr_name)) - - # Check if we already have a task running for this NSR - # Case where we are still configuring and terminate is called - if nsr_obj.nsr_id in self.pending_tasks: - self._log.error("NSR {} in state {} has a configure task running.". - format(nsr_obj.nsr_name, nsr_obj.get_ns_cm_state())) - # Terminate the task for this NSR - self.pending_tasks[nsr_obj.nsr_id].cancel() - - yield from self.update_ns_state(nsr_obj, conmanY.RecordState.CFG_PROCESS) - - # Call in a separate thread - self.pending_tasks[nsr_obj.nsr_id] = \ - self._loop.create_task( - process_nsr_obj(nsr_obj) - ) - - # Remove this nsr_obj - self.pending_cfg.remove(nsr_obj) - - except Exception as e: - self._log.error("Failed to process NSR as %s", str(e)) - self._log.exception(e) - + self._log.debug("NSR {} still configuring".format(nsr_id)) + + # Remove the completed tasks + for nsr_id in ids: + self.pending_tasks.pop(nsr_id) + + # TODO (pjoseph): Fix this + # Sleep before processing any NS (Why are we getting multiple NSR running DTS updates?) + # If the sleep is not 10 seconds it does not quite work, NSM is marking it 'running' + # wrongfully 10 seconds in advance? + yield from asyncio.sleep(10, loop=self._loop) + + if self.pending_cfg: + # get first NS, pending_cfg is nsr_obj list + nsr_obj = self.pending_cfg[0] + nsr_done = False + if nsr_obj.being_deleted is False: + # Process this NS, returns back same obj is successfull or exceeded retries + try: + self._log.info("Processing NSR:{}".format(nsr_obj.nsr_name)) + + # Check if we already have a task running for this NSR + # Case where we are still configuring and terminate is called + if nsr_obj.nsr_id in self.pending_tasks: + self._log.error("NSR {} in state {} has a configure task running.". + format(nsr_obj.nsr_name, nsr_obj.get_ns_cm_state())) + # Terminate the task for this NSR + self.pending_tasks[nsr_obj.nsr_id].cancel() + + yield from self.update_ns_state(nsr_obj, conmanY.RecordState.CFG_PROCESS) + + # Call in a separate thread + self.pending_tasks[nsr_obj.nsr_id] = \ + self._loop.create_task( + process_nsr_obj(nsr_obj) + ) + + # Remove this nsr_obj + self.pending_cfg.remove(nsr_obj) + + except Exception as e: + self._log.error("Failed to process NSR as %s", str(e)) + self._log.exception(e) + + except asyncio.CancelledError as e: + self._log.debug("Stopped configuration handler for project {}".format(self._project)) @asyncio.coroutine def register(self): @@ -300,7 +303,37 @@ class ConfigurationManager(object): for reg in self._handlers: yield from reg.register() - asyncio.ensure_future(self.configuration_handler(), loop=self._loop) + self._task = asyncio.ensure_future(self.configuration_handler(), loop=self._loop) + + def deregister(self): + self._log.debug("De-register conman for project {}".format(self._project.name)) + self._task.cancel() + + for reg in self._handlers: + reg.deregister() + + +class ConfigManagerProject(ManoProject): + + def __init__(self, name, tasklet, **kw): + super(ConfigManagerProject, self).__init__(tasklet.log, name) + self.update(tasklet) + + self._con_man = None + + @asyncio.coroutine + def register (self): + self._log.info("Initializing the Configuration-Manager tasklet") + self._con_man = ConfigurationManager(self.log, + self.loop, + self._dts, + self,) + yield from self._con_man.register() + + def deregister(self): + self._log.debug("De-register project {}".format(self.name)) + self._con_man.deregister() + class ConfigManagerTasklet(rift.tasklets.Tasklet): def __init__(self, *args, **kwargs): @@ -308,7 +341,13 @@ class ConfigManagerTasklet(rift.tasklets.Tasklet): self.rwlog.set_category("rw-conman-log") self._dts = None - self._con_man = None + + self.project_handler = None + self.projects = {} + + @property + def dts(self): + return self._dts def start(self): super(ConfigManagerTasklet, self).start() @@ -327,11 +366,9 @@ class ConfigManagerTasklet(rift.tasklets.Tasklet): @asyncio.coroutine def init(self): - self._log.info("Initializing the Configuration-Manager tasklet") - self._con_man = ConfigurationManager(self.log, - self.loop, - self._dts) - yield from self._con_man.register() + self.log.debug("creating project handler") + self.project_handler = ProjectHandler(self, ConfigManagerProject) + self.project_handler.register() @asyncio.coroutine def run(self):