X-Git-Url: https://osm.etsi.org/gitweb/?a=blobdiff_plain;f=rwcm%2Fplugins%2Frwconman%2Frift%2Ftasklets%2Frwconmantasklet%2Frwconmantasklet.py;h=e260b24505cdc22a40209e5c7638dbd8be9528e8;hb=9ad945aab0b5a992e1df860bede8ecc9b143470e;hp=7ea73c49c7a55fae1b5e26a5df2663083f9feef6;hpb=6f07e6f33f751ab4ffe624f6037f887b243bece2;p=osm%2FSO.git diff --git a/rwcm/plugins/rwconman/rift/tasklets/rwconmantasklet/rwconmantasklet.py b/rwcm/plugins/rwconman/rift/tasklets/rwconmantasklet/rwconmantasklet.py index 7ea73c49..e260b245 100755 --- a/rwcm/plugins/rwconman/rift/tasklets/rwconmantasklet/rwconmantasklet.py +++ b/rwcm/plugins/rwconman/rift/tasklets/rwconmantasklet/rwconmantasklet.py @@ -1,6 +1,6 @@ -# -# Copyright 2016 RIFT.IO Inc +# +# Copyright 2016-2017 RIFT.IO Inc # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -18,6 +18,10 @@ ''' This file - ConfigManagerTasklet() | ++ +| +ConfigManagerProject() +| +--|--> ConfigurationManager() | +--> rwconman_config.py - ConfigManagerConfig() @@ -44,6 +48,10 @@ from gi.repository import ( ) import rift.tasklets +from rift.mano.utils.project import ( + ManoProject, + ProjectHandler, + ) from . import rwconman_config as Config from . import rwconman_events as Event @@ -60,10 +68,12 @@ def log_this_vnf(vnf_cfg): return log_vnf class ConfigurationManager(object): - def __init__(self, log, loop, dts): + def __init__(self, log, loop, dts, project): self._log = log self._loop = loop self._dts = dts + self._project = project + self.cfg_sleep = True self.cfg_dir = os.path.join(os.environ["RIFT_INSTALL"], "etc/conman") self._config = Config.ConfigManagerConfig(self._dts, self._log, self._loop, self) @@ -71,6 +81,7 @@ class ConfigurationManager(object): self.pending_cfg = [] self.pending_tasks = {} self._nsr_objs = {} + self._task = None # The configuration_handler task self._handlers = [ self._config, @@ -146,7 +157,20 @@ class ConfigurationManager(object): done)) if done: - yield from self.update_vnf_state(vnf_cfg, conmanY.RecordState.READY) + self._log.warn("Apply initial config on VNFR {}". + format(log_this_vnf(vnf_cfg))) + try: + yield from nsr_obj.parent.process_vnf_initial_config( + nsr_obj, + agent_vnfr.vnfr_msg) + yield from self.update_vnf_state(vnf_cfg, + conmanY.RecordState.READY) + + except Exception as e: + nsr_obj.vnf_failed = True + self._log.exception(e) + yield from self.update_vnf_state(vnf_cfg, + conmanY.RecordState.CFG_FAILED) else: # Check to see if the VNF configure failed @@ -163,6 +187,7 @@ class ConfigurationManager(object): self._log.error("Failed to apply configuration for VNF = {}" .format(log_this_vnf(vnf_cfg))) + return done @asyncio.coroutine @@ -214,71 +239,74 @@ class ConfigurationManager(object): yield from nsr_obj.update_ns_cm_state(conmanY.RecordState.CFG_FAILED) return ret_status - # Basically, this loop will never end. - while True: - # Check the pending tasks are complete - # Store a list of tasks that are completed and - # remove from the pending_tasks list outside loop - ids = [] - for nsr_id, task in self.pending_tasks.items(): - if task.done(): - ids.append(nsr_id) - e = task.exception() - if e: - self._log.error("Exception in configuring nsr {}: {}". - format(nsr_id, e)) - nsr_obj = self.get_nsr_obj(nsr_id) - if nsr_obj: - yield from nsr_obj.update_ns_cm_state(conmanY.RecordState.CFG_FAILED, str(e)) - + try: + # Basically, this loop will never end. + while True: + # Check the pending tasks are complete + # Store a list of tasks that are completed and + # remove from the pending_tasks list outside loop + ids = [] + for nsr_id, task in self.pending_tasks.items(): + if task.done(): + ids.append(nsr_id) + e = task.exception() + if e: + self._log.error("Exception in configuring nsr {}: {}". + format(nsr_id, e)) + nsr_obj = self.get_nsr_obj(nsr_id) + if nsr_obj: + yield from nsr_obj.update_ns_cm_state(conmanY.RecordState.CFG_FAILED, str(e)) + + else: + rc = task.result() + self._log.debug("NSR {} configured: {}".format(nsr_id, rc)) else: - rc = task.result() - self._log.debug("NSR {} configured: {}".format(nsr_id, rc)) - else: - self._log.debug("NSR {} still configuring".format(nsr_id)) - - # Remove the completed tasks - for nsr_id in ids: - self.pending_tasks.pop(nsr_id) - - # TODO (pjoseph): Fix this - # Sleep before processing any NS (Why are we getting multiple NSR running DTS updates?) - # If the sleep is not 10 seconds it does not quite work, NSM is marking it 'running' - # wrongfully 10 seconds in advance? - yield from asyncio.sleep(10, loop=self._loop) - - if self.pending_cfg: - # get first NS, pending_cfg is nsr_obj list - nsr_obj = self.pending_cfg[0] - nsr_done = False - if nsr_obj.being_deleted is False: - # Process this NS, returns back same obj is successfull or exceeded retries - try: - self._log.info("Processing NSR:{}".format(nsr_obj.nsr_name)) - - # Check if we already have a task running for this NSR - # Case where we are still configuring and terminate is called - if nsr_obj.nsr_id in self.pending_tasks: - self._log.error("NSR {} in state {} has a configure task running.". - format(nsr_obj.nsr_name, nsr_obj.get_ns_cm_state())) - # Terminate the task for this NSR - self.pending_tasks[nsr_obj.nsr_id].cancel() - - yield from self.update_ns_state(nsr_obj, conmanY.RecordState.CFG_PROCESS) - - # Call in a separate thread - self.pending_tasks[nsr_obj.nsr_id] = \ - self._loop.create_task( - process_nsr_obj(nsr_obj) - ) - - # Remove this nsr_obj - self.pending_cfg.remove(nsr_obj) - - except Exception as e: - self._log.error("Failed to process NSR as %s", str(e)) - self._log.exception(e) - + self._log.debug("NSR {} still configuring".format(nsr_id)) + + # Remove the completed tasks + for nsr_id in ids: + self.pending_tasks.pop(nsr_id) + + # TODO (pjoseph): Fix this + # Sleep before processing any NS (Why are we getting multiple NSR running DTS updates?) + # If the sleep is not 10 seconds it does not quite work, NSM is marking it 'running' + # wrongfully 10 seconds in advance? + yield from asyncio.sleep(10, loop=self._loop) + + if self.pending_cfg: + # get first NS, pending_cfg is nsr_obj list + nsr_obj = self.pending_cfg[0] + nsr_done = False + if nsr_obj.being_deleted is False: + # Process this NS, returns back same obj is successfull or exceeded retries + try: + self._log.info("Processing NSR:{}".format(nsr_obj.nsr_name)) + + # Check if we already have a task running for this NSR + # Case where we are still configuring and terminate is called + if nsr_obj.nsr_id in self.pending_tasks: + self._log.error("NSR {} in state {} has a configure task running.". + format(nsr_obj.nsr_name, nsr_obj.get_ns_cm_state())) + # Terminate the task for this NSR + self.pending_tasks[nsr_obj.nsr_id].cancel() + + yield from self.update_ns_state(nsr_obj, conmanY.RecordState.CFG_PROCESS) + + # Call in a separate thread + self.pending_tasks[nsr_obj.nsr_id] = \ + self._loop.create_task( + process_nsr_obj(nsr_obj) + ) + + # Remove this nsr_obj + self.pending_cfg.remove(nsr_obj) + + except Exception as e: + self._log.error("Failed to process NSR as %s", str(e)) + self._log.exception(e) + + except asyncio.CancelledError as e: + self._log.debug("Stopped configuration handler for project {}".format(self._project)) @asyncio.coroutine def register(self): @@ -286,7 +314,37 @@ class ConfigurationManager(object): for reg in self._handlers: yield from reg.register() - asyncio.ensure_future(self.configuration_handler(), loop=self._loop) + self._task = asyncio.ensure_future(self.configuration_handler(), loop=self._loop) + + def deregister(self): + self._log.debug("De-register conman for project {}".format(self._project.name)) + self._task.cancel() + + for reg in self._handlers: + reg.deregister() + + +class ConfigManagerProject(ManoProject): + + def __init__(self, name, tasklet, **kw): + super(ConfigManagerProject, self).__init__(tasklet.log, name) + self.update(tasklet) + + self._con_man = None + + @asyncio.coroutine + def register (self): + self._log.info("Initializing the Configuration-Manager tasklet") + self._con_man = ConfigurationManager(self.log, + self.loop, + self._dts, + self,) + yield from self._con_man.register() + + def deregister(self): + self._log.debug("De-register project {}".format(self.name)) + self._con_man.deregister() + class ConfigManagerTasklet(rift.tasklets.Tasklet): def __init__(self, *args, **kwargs): @@ -294,7 +352,13 @@ class ConfigManagerTasklet(rift.tasklets.Tasklet): self.rwlog.set_category("rw-conman-log") self._dts = None - self._con_man = None + + self.project_handler = None + self.projects = {} + + @property + def dts(self): + return self._dts def start(self): super(ConfigManagerTasklet, self).start() @@ -313,11 +377,9 @@ class ConfigManagerTasklet(rift.tasklets.Tasklet): @asyncio.coroutine def init(self): - self._log.info("Initializing the Configuration-Manager tasklet") - self._con_man = ConfigurationManager(self.log, - self.loop, - self._dts) - yield from self._con_man.register() + self.log.debug("creating project handler") + self.project_handler = ProjectHandler(self, ConfigManagerProject) + self.project_handler.register() @asyncio.coroutine def run(self):