X-Git-Url: https://osm.etsi.org/gitweb/?a=blobdiff_plain;f=rwlaunchpad%2Fplugins%2Frwmonitor%2Frift%2Ftasklets%2Frwmonitor%2Ftasklet.py;h=c5caf9f74d03950b84de92449d3b373afc74c970;hb=f314b4af9744068a7ed7a6a6314220c3aa857523;hp=4ab351e7157fcc3b79a01130f670b123570fae00;hpb=6f1a3fe149e4a6b9803382cb299c902f4cf58ec9;p=osm%2FSO.git diff --git a/rwlaunchpad/plugins/rwmonitor/rift/tasklets/rwmonitor/tasklet.py b/rwlaunchpad/plugins/rwmonitor/rift/tasklets/rwmonitor/tasklet.py index 4ab351e7..c5caf9f7 100644 --- a/rwlaunchpad/plugins/rwmonitor/rift/tasklets/rwmonitor/tasklet.py +++ b/rwlaunchpad/plugins/rwmonitor/rift/tasklets/rwmonitor/tasklet.py @@ -1,5 +1,5 @@ -# +# # Copyright 2016 RIFT.IO Inc # # Licensed under the Apache License, Version 2.0 (the "License"); @@ -66,18 +66,18 @@ above). import asyncio import concurrent.futures +import gi import time - -import tornado.web import tornado.httpserver +import tornado.web -import gi gi.require_version('RwDts', '1.0') gi.require_version('RwLog', '1.0') gi.require_version('RwMonitorYang', '1.0') gi.require_version('RwLaunchpadYang', '1.0') gi.require_version('RwNsrYang', '1.0') gi.require_version('RwVnfrYang', '1.0') +gi.require_version('rwlib', '1.0') gi.require_version('RwLaunchpadYang', '1.0') from gi.repository import ( RwDts as rwdts, @@ -87,33 +87,40 @@ from gi.repository import ( RwVnfrYang, VnfrYang, ) - +import gi.repository.rwlib as rwlib import rift.tasklets import rift.mano.cloud +from rift.mano.utils.project import ( + ManoProject, + ProjectHandler, + ) + +gi.require_version('RwKeyspec', '1.0') +from gi.repository.RwKeyspec import quoted_key from . import core class DtsHandler(object): - def __init__(self, tasklet): + def __init__(self, project): self.reg = None - self.tasklet = tasklet + self.project = project @property def log(self): - return self.tasklet.log + return self.project._log @property def log_hdl(self): - return self.tasklet.log_hdl + return self.project._log_hdl @property def dts(self): - return self.tasklet.dts + return self.project._dts @property def loop(self): - return self.tasklet.loop + return self.project._loop @property def classname(self): @@ -151,7 +158,7 @@ class VnfrCatalogSubscriber(DtsHandler): with self.dts.group_create() as group: group.register( - xpath=VnfrCatalogSubscriber.XPATH, + xpath=self.project.add_project(VnfrCatalogSubscriber.XPATH), flags=rwdts.Flag.SUBSCRIBER, handler=handler, ) @@ -173,20 +180,20 @@ class NsInstanceConfigSubscriber(DtsHandler): with self.dts.appconf_group_create(acg_handler) as acg: self.reg = acg.register( - xpath=NsInstanceConfigSubscriber.XPATH, + xpath=self.project.add_project(NsInstanceConfigSubscriber.XPATH), flags=rwdts.Flag.SUBSCRIBER, ) class CloudAccountDtsHandler(DtsHandler): - def __init__(self, tasklet): - super().__init__(tasklet) + def __init__(self, project): + super().__init__(project) self._cloud_cfg_subscriber = None def register(self): self.log.debug("creating cloud account config handler") self._cloud_cfg_subscriber = rift.mano.cloud.CloudAccountConfigSubscriber( - self.dts, self.log, self.log_hdl, + self.dts, self.log, self.log_hdl, self.project, rift.mano.cloud.CloudAccountConfigCallbacks( on_add_apply=self.tasklet.on_cloud_account_create, on_delete_apply=self.tasklet.on_cloud_account_delete, @@ -201,14 +208,14 @@ class VdurNfviMetricsPublisher(DtsHandler): from a single VDU. """ - XPATH = "D,/vnfr:vnfr-catalog/vnfr:vnfr[vnfr:id='{}']/vnfr:vdur[vnfr:id='{}']/rw-vnfr:nfvi-metrics" + XPATH = "D,/vnfr:vnfr-catalog/vnfr:vnfr[vnfr:id={}]/vnfr:vdur[vnfr:id={}]/rw-vnfr:nfvi-metrics" # This timeout defines the length of time the publisher will wait for a # request to a data source to complete. If the request cannot be completed # before timing out, the current data will be published instead. TIMEOUT = 2.0 - def __init__(self, tasklet, vnfr, vdur): + def __init__(self, project, vnfr, vdur): """Create an instance of VdurNvfiPublisher Arguments: @@ -217,12 +224,12 @@ class VdurNfviMetricsPublisher(DtsHandler): vdur - the VDUR of the VDU whose metrics are published """ - super().__init__(tasklet) + super().__init__(project) self._vnfr = vnfr self._vdur = vdur self._handle = None - self._xpath = VdurNfviMetricsPublisher.XPATH.format(vnfr.id, vdur.id) + self._xpath = project.add_project(VdurNfviMetricsPublisher.XPATH.format(quoted_key(vnfr.id), quoted_key(vdur.id))) self._deregistered = asyncio.Event(loop=self.loop) @@ -321,7 +328,7 @@ class LaunchpadConfigDtsSubscriber(DtsHandler): with self.dts.appconf_group_create(acg_handler) as acg: self.reg = acg.register( - xpath="C,/rw-launchpad:launchpad-config", + xpath=self.project.add_project("C,/rw-launchpad:launchpad-config"), flags=rwdts.Flag.SUBSCRIBER, ) @@ -335,8 +342,8 @@ class CreateAlarmRPC(DtsHandler): them on to the tasklet. """ - def __init__(self, tasklet): - super().__init__(tasklet) + def __init__(self, project): + super().__init__(project) self._handle = None @asyncio.coroutine @@ -345,6 +352,10 @@ class CreateAlarmRPC(DtsHandler): @asyncio.coroutine def on_prepare(xact_info, action, ks_path, msg): try: + + if not self.project.rpc_check(msg, xact_info=xact_info): + return + response = VnfrYang.YangOutput_Vnfr_CreateAlarm() response.alarm_id = yield from self.tasklet.on_create_alarm( msg.cloud_account, @@ -382,8 +393,8 @@ class DestroyAlarmRPC(DtsHandler): them on to the tasklet. """ - def __init__(self, tasklet): - super().__init__(tasklet) + def __init__(self, project): + super().__init__(project) self._handle = None @asyncio.coroutine @@ -392,6 +403,9 @@ class DestroyAlarmRPC(DtsHandler): @asyncio.coroutine def on_prepare(xact_info, action, ks_path, msg): try: + if not self.project.rpc_check(msg, xact_info=xact_info): + return + yield from self.tasklet.on_destroy_alarm( msg.cloud_account, msg.alarm_id, @@ -473,70 +487,31 @@ class WebhookApplication(tornado.web.Application): ]) -class MonitorTasklet(rift.tasklets.Tasklet): - """ - The MonitorTasklet provides a interface for DTS to interact with an - instance of the Monitor class. This allows the Monitor class to remain - independent of DTS. - """ +class MonitorProject(ManoProject): - DEFAULT_POLLING_PERIOD = 1.0 + def __init__(self, name, tasklet, **kw): + super(MonitorProject, self).__init__(log, name) + self._tasklet = tasklet + self._log_hdl = tasklet.log_hdl + self._dts = tasklet.dts + self._loop = tasklet.loop - def __init__(self, *args, **kwargs): - try: - super().__init__(*args, **kwargs) - self.rwlog.set_category("rw-monitor-log") + self.vnfr_subscriber = VnfrCatalogSubscriber(self) + self.cloud_cfg_subscriber = CloudAccountDtsHandler(self) + self.ns_instance_config_subscriber = NsInstanceConfigSubscriber(self) + self.launchpad_cfg_subscriber = LaunchpadConfigDtsSubscriber(self) - self.vnfr_subscriber = VnfrCatalogSubscriber(self) - self.cloud_cfg_subscriber = CloudAccountDtsHandler(self) - self.ns_instance_config_subscriber = NsInstanceConfigSubscriber(self) - self.launchpad_cfg_subscriber = LaunchpadConfigDtsSubscriber(self) + self.config = core.InstanceConfiguration() + self.config.polling_period = MonitorTasklet.DEFAULT_POLLING_PERIOD - self.config = core.InstanceConfiguration() - self.config.polling_period = MonitorTasklet.DEFAULT_POLLING_PERIOD + self.monitor = core.Monitor(self.loop, self.log, self.config, self) + self.vdur_handlers = dict() - self.monitor = core.Monitor(self.loop, self.log, self.config) - self.vdur_handlers = dict() - - self.webhooks = None - self.create_alarm_rpc = CreateAlarmRPC(self) - self.destroy_alarm_rpc = DestroyAlarmRPC(self) - - - except Exception as e: - self.log.exception(e) - - @property - def polling_period(self): - return self.config.polling_period - - @property - def public_ip(self): - """The public IP of the launchpad""" - return self.config.public_ip - - def start(self): - super().start() - self.log.info("Starting MonitoringTasklet") - - self.log.debug("Registering with dts") - self.dts = rift.tasklets.DTS( - self.tasklet_info, - RwLaunchpadYang.get_schema(), - self.loop, - self.on_dts_state_change - ) - - self.log.debug("Created DTS Api GI Object: %s", self.dts) - - def stop(self): - try: - self.dts.deinit() - except Exception as e: - self.log.exception(e) + self.create_alarm_rpc = CreateAlarmRPC(self) + self.destroy_alarm_rpc = DestroyAlarmRPC(self) @asyncio.coroutine - def init(self): + def register (self): self.log.debug("creating cloud account handler") self.cloud_cfg_subscriber.register() @@ -555,23 +530,15 @@ class MonitorTasklet(rift.tasklets.Tasklet): self.log.debug("creating destroy-alarm rpc handler") yield from self.destroy_alarm_rpc.register() - self.log.debug("creating webhook server") - loop = rift.tasklets.tornado.TaskletAsyncIOLoop(asyncio_loop=self.loop) - self.webhooks = WebhookApplication(self) - self.server = tornado.httpserver.HTTPServer( - self.webhooks, - io_loop=loop, - ) - @asyncio.coroutine - def on_public_ip(self, ip): - """Store the public IP of the launchpad - - Arguments: - ip - a string containing the public IP address of the launchpad + @property + def polling_period(self): + return self.config.polling_period - """ - self.config.public_ip = ip + @property + def public_ip(self): + """The public IP of the launchpad""" + return self.config.public_ip def on_ns_instance_config_update(self, config): """Update configuration information @@ -589,48 +556,16 @@ class MonitorTasklet(rift.tasklets.Tasklet): def on_cloud_account_delete(self, account_name): self.monitor.remove_cloud_account(account_name) - @asyncio.coroutine - def run(self): - self.webhooks.listen(WebhookApplication.DEFAULT_WEBHOOK_PORT) - - def on_instance_started(self): - self.log.debug("Got instance started callback") - - @asyncio.coroutine - def on_dts_state_change(self, state): - """Handle DTS state change - - Take action according to current DTS state to transition application - into the corresponding application state - - Arguments - state - current dts state - - """ - switch = { - rwdts.State.INIT: rwdts.State.REGN_COMPLETE, - rwdts.State.CONFIG: rwdts.State.RUN, - } - - handlers = { - rwdts.State.INIT: self.init, - rwdts.State.RUN: self.run, - } - - # Transition application to next state - handler = handlers.get(state, None) - if handler is not None: - yield from handler() - - # Transition dts to next state - next_state = switch.get(state, None) - if next_state is not None: - self.dts.handle.set_state(next_state) - def on_vnfr_create(self, vnfr): - if not self.monitor.nfvi_metrics_available(vnfr.cloud_account): + try: + acc = vnfr.cloud_account + except AttributeError as e: + self.log.warning("NFVI metrics not supported") + return + + if not self.monitor.nfvi_metrics_available(acc): msg = "NFVI metrics unavailable for {}" - self.log.warning(msg.format(vnfr.cloud_account)) + self.log.warning(msg.format(acc)) return self.monitor.add_vnfr(vnfr) @@ -642,6 +577,12 @@ class MonitorTasklet(rift.tasklets.Tasklet): self.loop.create_task(coro) def on_vnfr_update(self, vnfr): + try: + acc = vnfr.cloud_account + except AttributeError as e: + self.log.warning("NFVI metrics not supported") + return + if not self.monitor.nfvi_metrics_available(vnfr.cloud_account): msg = "NFVI metrics unavailable for {}" self.log.warning(msg.format(vnfr.cloud_account)) @@ -712,3 +653,115 @@ class MonitorTasklet(rift.tasklets.Tasklet): """ yield from self.monitor.destroy_alarm(account, alarm_id) + + @asyncio.coroutine + def delete_prepare(self): + # Check if any cloud accounts present + if self.cloud_cfg_subscriber and self.cloud_cfg_subscriber._cloud_cfg_subscriber.accounts: + return False + return True + + +class MonitorTasklet(rift.tasklets.Tasklet): + """ + The MonitorTasklet provides a interface for DTS to interact with an + instance of the Monitor class. This allows the Monitor class to remain + independent of DTS. + """ + + DEFAULT_POLLING_PERIOD = 1.0 + + def __init__(self, *args, **kwargs): + try: + super().__init__(*args, **kwargs) + self.rwlog.set_category("rw-monitor-log") + + self._project_handler = None + self.projects = {} + + self.webhooks = None + + except Exception as e: + self.log.exception(e) + + def start(self): + super().start() + self.log.info("Starting MonitoringTasklet") + + self.log.debug("Registering with dts") + self.dts = rift.tasklets.DTS( + self.tasklet_info, + RwLaunchpadYang.get_schema(), + self.loop, + self.on_dts_state_change + ) + + self.log.debug("Created DTS Api GI Object: %s", self.dts) + + def stop(self): + try: + self.dts.deinit() + except Exception as e: + self.log.exception(e) + + @asyncio.coroutine + def init(self): + self.log.debug("creating webhook server") + loop = rift.tasklets.tornado.TaskletAsyncIOLoop(asyncio_loop=self.loop) + self.webhooks = WebhookApplication(self) + self.server = tornado.httpserver.HTTPServer( + self.webhooks, + io_loop=loop, + ) + + @asyncio.coroutine + def on_public_ip(self, ip): + """Store the public IP of the launchpad + + Arguments: + ip - a string containing the public IP address of the launchpad + + """ + self.config.public_ip = ip + + @asyncio.coroutine + def run(self): + address = rwlib.getenv("RWVM_INTERNAL_IPADDR") + if (address is None): + address="" + self.webhooks.listen(WebhookApplication.DEFAULT_WEBHOOK_PORT, address=address) + + def on_instance_started(self): + self.log.debug("Got instance started callback") + + @asyncio.coroutine + def on_dts_state_change(self, state): + """Handle DTS state change + + Take action according to current DTS state to transition application + into the corresponding application state + + Arguments + state - current dts state + + """ + switch = { + rwdts.State.INIT: rwdts.State.REGN_COMPLETE, + rwdts.State.CONFIG: rwdts.State.RUN, + } + + handlers = { + rwdts.State.INIT: self.init, + rwdts.State.RUN: self.run, + } + + # Transition application to next state + handler = handlers.get(state, None) + if handler is not None: + yield from handler() + + # Transition dts to next state + next_state = switch.get(state, None) + if next_state is not None: + self.dts.handle.set_state(next_state) +