X-Git-Url: https://osm.etsi.org/gitweb/?a=blobdiff_plain;f=rwlaunchpad%2Fplugins%2Frwmonparam%2Frift%2Ftasklets%2Frwmonparam%2Frwmonparam.py;h=0cb3e94ee651879a6b41f13d8368e4bf82ba2941;hb=f314b4af9744068a7ed7a6a6314220c3aa857523;hp=04e03061bb42d6ab16e7e4a28ba677fc6cdb1473;hpb=7203e6545b8957eef84f60845285b3256269637e;p=osm%2FSO.git diff --git a/rwlaunchpad/plugins/rwmonparam/rift/tasklets/rwmonparam/rwmonparam.py b/rwlaunchpad/plugins/rwmonparam/rift/tasklets/rwmonparam/rwmonparam.py index 04e03061..0cb3e94e 100644 --- a/rwlaunchpad/plugins/rwmonparam/rift/tasklets/rwmonparam/rwmonparam.py +++ b/rwlaunchpad/plugins/rwmonparam/rift/tasklets/rwmonparam/rwmonparam.py @@ -1,5 +1,5 @@ """ -# +# # Copyright 2016 RIFT.IO Inc # # Licensed under the Apache License, Version 2.0 (the "License"); @@ -30,106 +30,79 @@ gi.require_version('RwLaunchpadYang', '1.0') from gi.repository import ( RwDts as rwdts, RwLaunchpadYang, + NsrYang, ProtobufC) import rift.mano.cloud import rift.mano.dts as subscriber import rift.tasklets - +import concurrent.futures +from rift.mano.utils.project import ( + ManoProject, + ProjectHandler, + ) from . import vnfr_core from . import nsr_core -class MonitoringParameterTasklet(rift.tasklets.Tasklet): - """The main task of this Tasklet is to listen for VNFR changes and once the - VNFR hits the running state, triggers the monitor. - """ - def __init__(self, *args, **kwargs): - try: - super().__init__(*args, **kwargs) - self.rwlog.set_category("rw-monitor-log") - except Exception as e: - self.log.exception(e) +class MonParamProject(ManoProject): + + def __init__(self, name, tasklet, **kw): + super(MonParamProject, self).__init__(tasklet.log, name) + self.update(tasklet) self.vnfr_subscriber = None - self.store = None self.vnfr_monitors = {} self.nsr_monitors = {} + self.executor = concurrent.futures.ThreadPoolExecutor(max_workers=1) # Needs to be moved to store once the DTS bug is resolved + # Gather all VNFRs self.vnfrs = {} - def start(self): - super().start() - - self.log.info("Starting MonitoringParameterTasklet") - self.log.debug("Registering with dts") - - self.dts = rift.tasklets.DTS( - self.tasklet_info, - RwLaunchpadYang.get_schema(), - self.loop, - self.on_dts_state_change - ) - - self.vnfr_subscriber = subscriber.VnfrCatalogSubscriber.from_tasklet( + self.vnfr_subscriber = subscriber.VnfrCatalogSubscriber.from_project( self, callback=self.handle_vnfr) - self.nsr_subsriber = subscriber.NsrCatalogSubscriber.from_tasklet( + self.nsr_subsriber = subscriber.NsrCatalogSubscriber.from_project( self, callback=self.handle_nsr) - self.store = subscriber.SubscriberStore.from_tasklet(self) + self._nsd_subscriber = subscriber.NsdCatalogSubscriber.from_project(self) + self._vnfd_subscriber = subscriber.VnfdCatalogSubscriber.from_project(self) self.log.debug("Created DTS Api GI Object: %s", self.dts) - def stop(self): - try: - self.dts.deinit() - except Exception as e: - self.log.exception(e) - @asyncio.coroutine - def init(self): + def register (self): self.log.debug("creating vnfr subscriber") - yield from self.store.register() + yield from self._nsd_subscriber.register() + yield from self._vnfd_subscriber.register() yield from self.vnfr_subscriber.register() yield from self.nsr_subsriber.register() - @asyncio.coroutine - def run(self): - pass - @asyncio.coroutine - def on_dts_state_change(self, state): - """Handle DTS state change + def deregister(self): + self.log.debug("De-register vnfr project {}".format(self.name)) + self._nsd_subscriber.deregister() + self._vnfd_subscriber.deregister() + self.vnfr_subscriber.deregister() + self.nsr_subsriber.deregister() - Take action according to current DTS state to transition application - into the corresponding application state - - Arguments - state - current dts state + def _unwrap(self, values, id_name): + try: + return values[0] + except KeyError: + self.log.exception("Unable to find the object with the given " + "ID {}".format(id_name)) - """ - switch = { - rwdts.State.INIT: rwdts.State.REGN_COMPLETE, - rwdts.State.CONFIG: rwdts.State.RUN, - } + def get_vnfd(self, vnfd_id): + values = [vnfd for vnfd in list(self._vnfd_subscriber.reg.get_xact_elements()) if vnfd.id == vnfd_id] + return self._unwrap(values, vnfd_id) - handlers = { - rwdts.State.INIT: self.init, - rwdts.State.RUN: self.run, - } + def get_nsd(self, nsd_id): + values = [nsd for nsd in list(self._nsd_subscriber.reg.get_xact_elements()) if nsd.id == nsd_id] + return self._unwrap(values, nsd_id) - # Transition application to next state - handler = handlers.get(state, None) - if handler is not None: - yield from handler() - - # Transition dts to next state - next_state = switch.get(state, None) - if next_state is not None: - self.dts.handle.set_state(next_state) def handle_vnfr(self, vnfr, action): """Starts a monitoring parameter job for every VNFR that reaches @@ -142,12 +115,15 @@ class MonitoringParameterTasklet(rift.tasklets.Tasklet): def vnfr_create(): # if vnfr.operational_status == "running" and vnfr.id not in self.vnfr_monitors: - if vnfr.config_status == "configured" and vnfr.id not in self.vnfr_monitors: + vnfr_status = (vnfr.operational_status == "running" and + vnfr.config_status in ["configured", "config_not_needed"]) + + if vnfr_status and vnfr.id not in self.vnfr_monitors: vnf_mon = vnfr_core.VnfMonitorDtsHandler.from_vnf_data( self, vnfr, - self.store.get_vnfd(vnfr.vnfd.id)) + self.get_vnfd(vnfr.vnfd.id)) self.vnfr_monitors[vnfr.id] = vnf_mon self.vnfrs[vnfr.id] = vnfr @@ -155,7 +131,10 @@ class MonitoringParameterTasklet(rift.tasklets.Tasklet): @asyncio.coroutine def task(): yield from vnf_mon.register() + if vnfr.nsr_id_ref in self.nsr_monitors: + vnf_mon.update_nsr_mon(self.nsr_monitors[vnfr.nsr_id_ref]) vnf_mon.start() + #self.update_nsrs(vnfr, action) self.loop.create_task(task()) @@ -166,47 +145,94 @@ class MonitoringParameterTasklet(rift.tasklets.Tasklet): vnf_mon = self.vnfr_monitors.pop(vnfr.id) vnf_mon.stop() self.vnfrs.pop(vnfr.id) + #self.update_nsrs(vnfr, action) if action in [rwdts.QueryAction.CREATE, rwdts.QueryAction.UPDATE]: vnfr_create() elif action == rwdts.QueryAction.DELETE: vnfr_delete() + def update_nsrs(self, vnfr, action): + if vnfr.nsr_id_ref not in self.nsr_monitors: + return + + monitor = self.nsr_monitors[vnfr.nsr_id_ref] + + if action in [rwdts.QueryAction.CREATE, rwdts.QueryAction.UPDATE]: + @asyncio.coroutine + def update_vnfr(): + yield from monitor.update([vnfr]) + + self.loop.create_task(update_vnfr()) + elif action == rwdts.QueryAction.DELETE: + @asyncio.coroutine + def delete_vnfr(): + try: + yield from monitor.delete([vnfr]) + except Exception as e: + self.log.exception(str(e)) + + self.loop.create_task(delete_vnfr()) + + def handle_nsr(self, nsr, action): """Callback for NSR opdata changes. Creates a publisher for every NS that moves to config state. Args: - nsr (RwNsrYang.YangData_Nsr_NsInstanceOpdata_Nsr): Ns Opdata + nsr (RwNsrYang.YangData_RwProject_Project_NsInstanceOpdata_Nsr): Ns Opdata action (rwdts.QueryAction): Action type of the change. """ + def nsr_create(): - # if nsr.operational_status == "running" and nsr.ns_instance_config_ref not in self.nsr_monitors: - if nsr.config_status == "configured" and nsr.ns_instance_config_ref not in self.nsr_monitors: - nsr_mon = nsr_core.NsrMonitorDtsHandler( - self.log, - self.dts, - self.loop, - nsr, - list(self.vnfrs.values()), - self.store - ) - - self.nsr_monitors[nsr.ns_instance_config_ref] = nsr_mon + # TODO clean up the if-else mess, exception - @asyncio.coroutine - def task(): - yield from nsr_mon.register() - yield from nsr_mon.start() + success_state = (nsr.operational_status == "running" and + nsr.config_status == "configured") - self.loop.create_task(task()) + if not success_state: + return + + if nsr.ns_instance_config_ref in self.nsr_monitors: + return + constituent_vnfrs = [] + for vnfr_id in nsr.constituent_vnfr_ref: + if (vnfr_id.vnfr_id in self.vnfrs): + vnfr_obj = self.vnfrs[vnfr_id.vnfr_id] + constituent_vnfrs.append(vnfr_obj) + else: + pass + + nsr_mon = nsr_core.NsrMonitorDtsHandler( + self.log, + self.dts, + self.loop, + self, + nsr, + constituent_vnfrs + ) + for vnfr_id in nsr.constituent_vnfr_ref: + if vnfr_id.vnfr_id in self.vnfr_monitors: + self.vnfr_monitors[vnfr_id.vnfr_id].update_nsr_mon(nsr_mon) + + self.nsr_monitors[nsr.ns_instance_config_ref] = nsr_mon + + + @asyncio.coroutine + def task(): + try: + yield from nsr_mon.register() + yield from nsr_mon.start() + except Exception as e: + self.log.exception(e) + + self.loop.create_task(task()) def nsr_delete(): if nsr.ns_instance_config_ref in self.nsr_monitors: - # if vnfr.operational_status == "running" and vnfr.id in self.vnfr_monitors: nsr_mon = self.nsr_monitors.pop(nsr.ns_instance_config_ref) nsr_mon.stop() @@ -214,3 +240,78 @@ class MonitoringParameterTasklet(rift.tasklets.Tasklet): nsr_create() elif action == rwdts.QueryAction.DELETE: nsr_delete() + + +class MonitoringParameterTasklet(rift.tasklets.Tasklet): + """The main task of this Tasklet is to listen for VNFR changes and once the + VNFR hits the running state, triggers the monitor. + """ + def __init__(self, *args, **kwargs): + try: + super().__init__(*args, **kwargs) + self.rwlog.set_category("rw-monitor-log") + except Exception as e: + self.log.exception(e) + + self._project_handler = None + self.projects = {} + + def start(self): + super().start() + + self.log.info("Starting MonitoringParameterTasklet") + self.log.debug("Registering with dts") + + self.dts = rift.tasklets.DTS( + self.tasklet_info, + NsrYang.get_schema(), + self.loop, + self.on_dts_state_change + ) + + def stop(self): + try: + self.dts.deinit() + except Exception as e: + self.log.exception(e) + + @asyncio.coroutine + def init(self): + self.log.debug("creating project handler") + self.project_handler = ProjectHandler(self, MonParamProject) + self.project_handler.register() + + @asyncio.coroutine + def run(self): + pass + + @asyncio.coroutine + def on_dts_state_change(self, state): + """Handle DTS state change + + Take action according to current DTS state to transition application + into the corresponding application state + + Arguments + state - current dts state + + """ + switch = { + rwdts.State.INIT: rwdts.State.REGN_COMPLETE, + rwdts.State.CONFIG: rwdts.State.RUN, + } + + handlers = { + rwdts.State.INIT: self.init, + rwdts.State.RUN: self.run, + } + + # Transition application to next state + handler = handlers.get(state, None) + if handler is not None: + yield from handler() + + # Transition dts to next state + next_state = switch.get(state, None) + if next_state is not None: + self.dts.handle.set_state(next_state)