-#
-# Copyright 2016 RIFT.IO Inc
+#
+# Copyright 2016-2017 RIFT.IO Inc
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
'''
This file - ConfigManagerTasklet()
|
++
+|
+ConfigManagerProject()
+|
+--|--> ConfigurationManager()
|
+--> rwconman_config.py - ConfigManagerConfig()
- | |
- | +--> ConfigManagerNSR()
- |
- +--> rwconman_events.py - ConfigManagerEvents()
- |
- +--> ConfigManagerROif()
+ |
+ +--> ConfigManagerNSR()
'''
)
import rift.tasklets
+from rift.mano.utils.project import (
+ ManoProject,
+ ProjectHandler,
+ )
from . import rwconman_config as Config
-from . import rwconman_events as Event
def log_this_vnf(vnf_cfg):
log_vnf = ""
return log_vnf
class ConfigurationManager(object):
- def __init__(self, log, loop, dts):
+ def __init__(self, log, loop, dts, project):
self._log = log
self._loop = loop
self._dts = dts
+ self._project = project
+
self.cfg_sleep = True
- self.cfg_dir = os.path.join(os.environ["RIFT_INSTALL"], "etc/conman")
self._config = Config.ConfigManagerConfig(self._dts, self._log, self._loop, self)
- self._event = Event.ConfigManagerEvents(self._dts, self._log, self._loop, self)
self.pending_cfg = []
self.pending_tasks = {}
self._nsr_objs = {}
+ self._task = None # The configuration_handler task
self._handlers = [
- self._config,
- self._event,
+ self._config
]
self._log.info("Updating cm-state for NS(%s) to:%s", nsr_obj.nsr_name, state)
yield from nsr_obj.update_ns_cm_state(state)
- def add_to_pending(self, nsr_obj):
+ def add_to_pending(self, nsr_obj, cfg_vnfr_list):
if (nsr_obj not in self.pending_cfg and
nsr_obj.cm_nsr['state'] == nsr_obj.state_to_string(conmanY.RecordState.RECEIVED)):
self._log.info("Adding NS={} to pending config list"
.format(nsr_obj.nsr_name))
- # Build the list
- nsr_obj.vnf_cfg_list = []
- # Sort all the VNF by their configuration attribute priority
- sorted_dict = dict(sorted(nsr_obj.nsr_cfg_config_attributes_dict.items()))
- for config_attributes_dict in sorted_dict.values():
- # Iterate through each priority level
- for config_priority in config_attributes_dict:
- # Iterate through each vnfr at this priority level
- vnfr = nsr_obj._vnfr_dict[config_priority['id']]
- self._log.debug("Adding VNF:(%s) to pending cfg list", log_this_vnf(vnfr['vnf_cfg']))
- nsr_obj.vnf_cfg_list.append(vnfr['vnf_cfg'])
+ for cfg_vnfr in cfg_vnfr_list:
+ self._log.debug("Adding VNF:(%s) to pending cfg list", log_this_vnf(cfg_vnfr['vnf_cfg']))
+ nsr_obj.vnf_cfg_list.append(cfg_vnfr['vnf_cfg'])
self.pending_cfg.append(nsr_obj)
def add_nsr_obj(self, nsr_obj):
del self._nsr_objs[nsr_id]
def get_nsr_obj(self, nsr_id):
+ if nsr_id not in self._nsr_objs:
+ self._log.info("NSR %s not found", nsr_id)
+ return None
self._log.debug("Returning nsr_obj (%s) from Configuration Manager", self._nsr_objs[nsr_id])
return self._nsr_objs.get(nsr_id)
done))
if done:
- yield from self.update_vnf_state(vnf_cfg, conmanY.RecordState.READY)
+ self._log.debug("Apply initial config on VNFR {}".
+ format(log_this_vnf(vnf_cfg)))
+ try:
+ yield from nsr_obj.parent.process_vnf_initial_config(
+ nsr_obj,
+ agent_vnfr.vnfr_msg,
+ self._project.name)
+ yield from self.update_vnf_state(vnf_cfg,
+ conmanY.RecordState.READY)
+
+ except Exception as e:
+ nsr_obj.vnf_failed = True
+ self._log.exception(e)
+ yield from self.update_vnf_state(vnf_cfg,
+ conmanY.RecordState.CFG_FAILED)
else:
+ self._log.debug("Getting config status {}".format(log_this_vnf(vnf_cfg)))
# Check to see if the VNF configure failed
status = yield from self._config._config_agent_mgr.invoke_config_agent_plugins(
'get_config_status',
self._log.error("Failed to apply configuration for VNF = {}"
.format(log_this_vnf(vnf_cfg)))
+
return done
@asyncio.coroutine
nsr_obj.nsr_failed = False
self._log.debug("Apply initial config on NSR {}".format(nsr_obj.nsr_name))
try:
- yield from nsr_obj.parent.process_ns_initial_config(nsr_obj)
+ yield from nsr_obj.parent.process_ns_initial_config(nsr_obj, self._project.name)
except Exception as e:
nsr_obj.nsr_failed = True
self._log.exception(e)
yield from nsr_obj.update_ns_cm_state(conmanY.RecordState.CFG_FAILED)
return ret_status
- # Basically, this loop will never end.
- while True:
- # Check the pending tasks are complete
- # Store a list of tasks that are completed and
- # remove from the pending_tasks list outside loop
- ids = []
- for nsr_id, task in self.pending_tasks.items():
- if task.done():
- ids.append(nsr_id)
- e = task.exception()
- if e:
- self._log.error("Exception in configuring nsr {}: {}".
- format(nsr_id, e))
- nsr_obj = self.get_nsr_obj(nsr_id)
- if nsr_obj:
- yield from nsr_obj.update_ns_cm_state(conmanY.RecordState.CFG_FAILED, str(e))
-
+ try:
+ # Basically, this loop will never end.
+ while True:
+ # Check the pending tasks are complete
+ # Store a list of tasks that are completed and
+ # remove from the pending_tasks list outside loop
+ ids = []
+ for nsr_id, task in self.pending_tasks.items():
+ if task.done():
+ ids.append(nsr_id)
+ e = task.exception()
+ if e:
+ self._log.error("Exception in configuring nsr {}: {}".
+ format(nsr_id, e))
+ nsr_obj = self.get_nsr_obj(nsr_id)
+ if nsr_obj:
+ yield from nsr_obj.update_ns_cm_state(conmanY.RecordState.CFG_FAILED, str(e))
+
+ else:
+ rc = task.result()
+ self._log.debug("NSR {} configured: {}".format(nsr_id, rc))
else:
- rc = task.result()
- self._log.debug("NSR {} configured: {}".format(nsr_id, rc))
- else:
- self._log.debug("NSR {} still configuring".format(nsr_id))
-
- # Remove the completed tasks
- for nsr_id in ids:
- self.pending_tasks.pop(nsr_id)
-
- # TODO (pjoseph): Fix this
- # Sleep before processing any NS (Why are we getting multiple NSR running DTS updates?)
- # If the sleep is not 10 seconds it does not quite work, NSM is marking it 'running'
- # wrongfully 10 seconds in advance?
- yield from asyncio.sleep(10, loop=self._loop)
-
- if self.pending_cfg:
- # get first NS, pending_cfg is nsr_obj list
- nsr_obj = self.pending_cfg[0]
- nsr_done = False
- if nsr_obj.being_deleted is False:
- # Process this NS, returns back same obj is successfull or exceeded retries
- try:
- self._log.info("Processing NSR:{}".format(nsr_obj.nsr_name))
-
- # Check if we already have a task running for this NSR
- # Case where we are still configuring and terminate is called
- if nsr_obj.nsr_id in self.pending_tasks:
- self._log.error("NSR {} in state {} has a configure task running.".
- format(nsr_obj.nsr_name, nsr_obj.get_ns_cm_state()))
- # Terminate the task for this NSR
- self.pending_tasks[nsr_obj.nsr_id].cancel()
-
- yield from self.update_ns_state(nsr_obj, conmanY.RecordState.CFG_PROCESS)
-
- # Call in a separate thread
- self.pending_tasks[nsr_obj.nsr_id] = \
- self._loop.create_task(
- process_nsr_obj(nsr_obj)
- )
-
- # Remove this nsr_obj
- self.pending_cfg.remove(nsr_obj)
-
- except Exception as e:
- self._log.error("Failed to process NSR as %s", str(e))
- self._log.exception(e)
-
+ self._log.debug("NSR {} still configuring".format(nsr_id))
+
+ # Remove the completed tasks
+ for nsr_id in ids:
+ self.pending_tasks.pop(nsr_id)
+
+ # TODO (pjoseph): Fix this
+ # Sleep before processing any NS (Why are we getting multiple NSR running DTS updates?)
+ # If the sleep is not 10 seconds it does not quite work, NSM is marking it 'running'
+ # wrongfully 10 seconds in advance?
+ yield from asyncio.sleep(10, loop=self._loop)
+
+ if self.pending_cfg:
+ # get first NS, pending_cfg is nsr_obj list
+ nsr_obj = self.pending_cfg[0]
+ nsr_done = False
+ if nsr_obj.being_deleted is False:
+ # Process this NS, returns back same obj is successfull or exceeded retries
+ try:
+ self._log.info("Processing NSR:{}".format(nsr_obj.nsr_name))
+
+ # Check if we already have a task running for this NSR
+ # Case where we are still configuring and terminate is called
+ if nsr_obj.nsr_id in self.pending_tasks:
+ self._log.error("NSR {} in state {} has a configure task running.".
+ format(nsr_obj.nsr_name, nsr_obj.get_ns_cm_state()))
+ # Terminate the task for this NSR
+ self.pending_tasks[nsr_obj.nsr_id].cancel()
+
+ yield from self.update_ns_state(nsr_obj, conmanY.RecordState.CFG_PROCESS)
+
+ # Call in a separate thread
+ self.pending_tasks[nsr_obj.nsr_id] = \
+ self._loop.create_task(
+ process_nsr_obj(nsr_obj)
+ )
+
+ # Remove this nsr_obj
+ self.pending_cfg.remove(nsr_obj)
+
+ except Exception as e:
+ self._log.error("Failed to process NSR as %s", str(e))
+ self._log.exception(e)
+
+ except asyncio.CancelledError as e:
+ self._log.debug("Stopped configuration handler for project {}".format(self._project))
@asyncio.coroutine
def register(self):
for reg in self._handlers:
yield from reg.register()
- asyncio.ensure_future(self.configuration_handler(), loop=self._loop)
+ self._task = asyncio.ensure_future(self.configuration_handler(), loop=self._loop)
+
+ def deregister(self):
+ self._log.debug("De-register conman for project {}".format(self._project.name))
+ self._task.cancel()
+
+ for reg in self._handlers:
+ reg.deregister()
+
+
+class ConfigManagerProject(ManoProject):
+
+ def __init__(self, name, tasklet, **kw):
+ super(ConfigManagerProject, self).__init__(tasklet.log, name)
+ self.update(tasklet)
+
+ self._con_man = None
+
+ @asyncio.coroutine
+ def register (self):
+ self._log.info("Initializing the Configuration-Manager tasklet")
+ self._con_man = ConfigurationManager(self.log,
+ self.loop,
+ self._dts,
+ self,)
+ yield from self._con_man.register()
+
+ def deregister(self):
+ self._log.debug("De-register project {}".format(self.name))
+ self._con_man.deregister()
+
class ConfigManagerTasklet(rift.tasklets.Tasklet):
def __init__(self, *args, **kwargs):
self.rwlog.set_category("rw-conman-log")
self._dts = None
- self._con_man = None
+
+ self.project_handler = None
+ self.projects = {}
+
+ @property
+ def dts(self):
+ return self._dts
def start(self):
super(ConfigManagerTasklet, self).start()
@asyncio.coroutine
def init(self):
- self._log.info("Initializing the Configuration-Manager tasklet")
- self._con_man = ConfigurationManager(self.log,
- self.loop,
- self._dts)
- yield from self._con_man.register()
+ self.log.debug("creating project handler")
+ self.project_handler = ProjectHandler(self, ConfigManagerProject)
+ self.project_handler.register()
@asyncio.coroutine
def run(self):