X-Git-Url: https://osm.etsi.org/gitweb/?a=blobdiff_plain;f=rwlaunchpad%2Fplugins%2Frwmonparam%2Frift%2Ftasklets%2Frwmonparam%2Frwmonparam.py;fp=rwlaunchpad%2Fplugins%2Frwmonparam%2Frift%2Ftasklets%2Frwmonparam%2Frwmonparam.py;h=d0f31e34f3de7508b441ba26b46ac83f633ea643;hb=6f07e6f33f751ab4ffe624f6037f887b243bece2;hp=0000000000000000000000000000000000000000;hpb=72a563886272088feb7cb52e4aafbe6d2c580ff9;p=osm%2FSO.git diff --git a/rwlaunchpad/plugins/rwmonparam/rift/tasklets/rwmonparam/rwmonparam.py b/rwlaunchpad/plugins/rwmonparam/rift/tasklets/rwmonparam/rwmonparam.py new file mode 100644 index 00000000..d0f31e34 --- /dev/null +++ b/rwlaunchpad/plugins/rwmonparam/rift/tasklets/rwmonparam/rwmonparam.py @@ -0,0 +1,216 @@ +""" +# +# Copyright 2016 RIFT.IO Inc +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +@file rwmonparam.py +@author Varun Prasad (varun.prasad@riftio.com) +@date 01-Jul-2016 + +""" + +import asyncio + +import gi +gi.require_version('RwDts', '1.0') +gi.require_version('RwLaunchpadYang', '1.0') + +from gi.repository import ( + RwDts as rwdts, + RwLaunchpadYang, + ProtobufC) +import rift.mano.cloud +import rift.mano.dts as subscriber +import rift.tasklets + +from . import vnfr_core +from . import nsr_core + + +class MonitoringParameterTasklet(rift.tasklets.Tasklet): + """The main task of this Tasklet is to listen for VNFR changes and once the + VNFR hits the running state, triggers the monitor. + """ + def __init__(self, *args, **kwargs): + try: + super().__init__(*args, **kwargs) + self.rwlog.set_category("rw-monitor-log") + except Exception as e: + self.log.exception(e) + + self.vnfr_subscriber = None + self.store = None + + self.vnfr_monitors = {} + self.nsr_monitors = {} + + # Needs to be moved to store once the DTS bug is resolved + self.vnfrs = {} + + def start(self): + super().start() + + self.log.info("Starting MonitoringParameterTasklet") + self.log.debug("Registering with dts") + + self.dts = rift.tasklets.DTS( + self.tasklet_info, + RwLaunchpadYang.get_schema(), + self.loop, + self.on_dts_state_change + ) + + self.vnfr_subscriber = subscriber.VnfrCatalogSubscriber.from_tasklet( + self, + callback=self.handle_vnfr) + self.nsr_subsriber = subscriber.NsrCatalogSubscriber.from_tasklet( + self, + callback=self.handle_nsr) + + self.store = subscriber.SubscriberStore.from_tasklet(self) + + self.log.debug("Created DTS Api GI Object: %s", self.dts) + + def stop(self): + try: + self.dts.deinit() + except Exception as e: + self.log.exception(e) + + @asyncio.coroutine + def init(self): + self.log.debug("creating vnfr subscriber") + yield from self.store.register() + yield from self.vnfr_subscriber.register() + yield from self.nsr_subsriber.register() + + @asyncio.coroutine + def run(self): + pass + + @asyncio.coroutine + def on_dts_state_change(self, state): + """Handle DTS state change + + Take action according to current DTS state to transition application + into the corresponding application state + + Arguments + state - current dts state + + """ + switch = { + rwdts.State.INIT: rwdts.State.REGN_COMPLETE, + rwdts.State.CONFIG: rwdts.State.RUN, + } + + handlers = { + rwdts.State.INIT: self.init, + rwdts.State.RUN: self.run, + } + + # Transition application to next state + handler = handlers.get(state, None) + if handler is not None: + yield from handler() + + # Transition dts to next state + next_state = switch.get(state, None) + if next_state is not None: + self.dts.handle.set_state(next_state) + + def handle_vnfr(self, vnfr, action): + """Starts a monitoring parameter job for every VNFR that reaches + running state + + Args: + vnfr (GiOBject): VNFR Gi object message from DTS + delete_mode (bool, optional): if set, stops and removes the monitor. + """ + + def vnfr_create(): + # if vnfr.operational_status == "running" and vnfr.id not in self.vnfr_monitors: + if vnfr.config_status == "configured" and vnfr.id not in self.vnfr_monitors: + + vnf_mon = vnfr_core.VnfMonitorDtsHandler.from_vnf_data( + self, + vnfr, + self.store.get_vnfd(vnfr.vnfd_ref)) + + self.vnfr_monitors[vnfr.id] = vnf_mon + self.vnfrs[vnfr.id] = vnfr + + @asyncio.coroutine + def task(): + yield from vnf_mon.register() + vnf_mon.start() + + self.loop.create_task(task()) + + + def vnfr_delete(): + if vnfr.id in self.vnfr_monitors: + self.log.debug("VNFR %s deleted: Stopping vnfr monitoring", vnfr.id) + vnf_mon = self.vnfr_monitors.pop(vnfr.id) + vnf_mon.stop() + self.vnfrs.pop(vnfr.id) + + if action in [rwdts.QueryAction.CREATE, rwdts.QueryAction.UPDATE]: + vnfr_create() + elif action == rwdts.QueryAction.DELETE: + vnfr_delete() + + + def handle_nsr(self, nsr, action): + """Callback for NSR opdata changes. Creates a publisher for every + NS that moves to config state. + + Args: + nsr (RwNsrYang.YangData_Nsr_NsInstanceOpdata_Nsr): Ns Opdata + action (rwdts.QueryAction): Action type of the change. + """ + def nsr_create(): + # if nsr.operational_status == "running" and nsr.ns_instance_config_ref not in self.nsr_monitors: + if nsr.config_status == "configured" and nsr.ns_instance_config_ref not in self.nsr_monitors: + nsr_mon = nsr_core.NsrMonitorDtsHandler( + self.log, + self.dts, + self.loop, + nsr, + list(self.vnfrs.values()), + self.store + ) + + self.nsr_monitors[nsr.ns_instance_config_ref] = nsr_mon + + @asyncio.coroutine + def task(): + yield from nsr_mon.register() + yield from nsr_mon.start() + + self.loop.create_task(task()) + + + + def nsr_delete(): + if nsr.ns_instance_config_ref in self.nsr_monitors: + # if vnfr.operational_status == "running" and vnfr.id in self.vnfr_monitors: + nsr_mon = self.nsr_monitors.pop(nsr.ns_instance_config_ref) + nsr_mon.stop() + + if action in [rwdts.QueryAction.CREATE, rwdts.QueryAction.UPDATE]: + nsr_create() + elif action == rwdts.QueryAction.DELETE: + nsr_delete()