update from RIFT as of 696b75d2fe9fb046261b08c616f1bcf6c0b54a9b second try
[osm/SO.git] / rwlaunchpad / plugins / rwmonitor / rift / tasklets / rwmonitor / tasklet.py
index 4ab351e..c5caf9f 100644 (file)
@@ -1,5 +1,5 @@
 
-# 
+#
 #   Copyright 2016 RIFT.IO Inc
 #
 #   Licensed under the Apache License, Version 2.0 (the "License");
@@ -66,18 +66,18 @@ above).
 
 import asyncio
 import concurrent.futures
+import gi
 import time
-
-import tornado.web
 import tornado.httpserver
+import tornado.web
 
-import gi
 gi.require_version('RwDts', '1.0')
 gi.require_version('RwLog', '1.0')
 gi.require_version('RwMonitorYang', '1.0')
 gi.require_version('RwLaunchpadYang', '1.0')
 gi.require_version('RwNsrYang', '1.0')
 gi.require_version('RwVnfrYang', '1.0')
+gi.require_version('rwlib', '1.0')
 gi.require_version('RwLaunchpadYang', '1.0')
 from gi.repository import (
     RwDts as rwdts,
@@ -87,33 +87,40 @@ from gi.repository import (
     RwVnfrYang,
     VnfrYang,
 )
-
+import gi.repository.rwlib as rwlib
 import rift.tasklets
 import rift.mano.cloud
+from rift.mano.utils.project import (
+    ManoProject,
+    ProjectHandler,
+    )
+
+gi.require_version('RwKeyspec', '1.0')
+from gi.repository.RwKeyspec import quoted_key
 
 from . import core
 
 
 class DtsHandler(object):
-    def __init__(self, tasklet):
+    def __init__(self, project):
         self.reg = None
-        self.tasklet = tasklet
+        self.project = project
 
     @property
     def log(self):
-        return self.tasklet.log
+        return self.project._log
 
     @property
     def log_hdl(self):
-        return self.tasklet.log_hdl
+        return self.project._log_hdl
 
     @property
     def dts(self):
-        return self.tasklet.dts
+        return self.project._dts
 
     @property
     def loop(self):
-        return self.tasklet.loop
+        return self.project._loop
 
     @property
     def classname(self):
@@ -151,7 +158,7 @@ class VnfrCatalogSubscriber(DtsHandler):
 
         with self.dts.group_create() as group:
             group.register(
-                    xpath=VnfrCatalogSubscriber.XPATH,
+                    xpath=self.project.add_project(VnfrCatalogSubscriber.XPATH),
                     flags=rwdts.Flag.SUBSCRIBER,
                     handler=handler,
                     )
@@ -173,20 +180,20 @@ class NsInstanceConfigSubscriber(DtsHandler):
 
         with self.dts.appconf_group_create(acg_handler) as acg:
             self.reg = acg.register(
-                    xpath=NsInstanceConfigSubscriber.XPATH,
+                    xpath=self.project.add_project(NsInstanceConfigSubscriber.XPATH),
                     flags=rwdts.Flag.SUBSCRIBER,
                     )
 
 
 class CloudAccountDtsHandler(DtsHandler):
-    def __init__(self, tasklet):
-        super().__init__(tasklet)
+    def __init__(self, project):
+        super().__init__(project)
         self._cloud_cfg_subscriber = None
 
     def register(self):
         self.log.debug("creating cloud account config handler")
         self._cloud_cfg_subscriber = rift.mano.cloud.CloudAccountConfigSubscriber(
-               self.dts, self.log, self.log_hdl,
+               self.dts, self.log, self.log_hdl, self.project,
                rift.mano.cloud.CloudAccountConfigCallbacks(
                    on_add_apply=self.tasklet.on_cloud_account_create,
                    on_delete_apply=self.tasklet.on_cloud_account_delete,
@@ -201,14 +208,14 @@ class VdurNfviMetricsPublisher(DtsHandler):
     from a single VDU.
     """
 
-    XPATH = "D,/vnfr:vnfr-catalog/vnfr:vnfr[vnfr:id='{}']/vnfr:vdur[vnfr:id='{}']/rw-vnfr:nfvi-metrics"
+    XPATH = "D,/vnfr:vnfr-catalog/vnfr:vnfr[vnfr:id={}]/vnfr:vdur[vnfr:id={}]/rw-vnfr:nfvi-metrics"
 
     # This timeout defines the length of time the publisher will wait for a
     # request to a data source to complete. If the request cannot be completed
     # before timing out, the current data will be published instead.
     TIMEOUT = 2.0
 
-    def __init__(self, tasklet, vnfr, vdur):
+    def __init__(self, project, vnfr, vdur):
         """Create an instance of VdurNvfiPublisher
 
         Arguments:
@@ -217,12 +224,12 @@ class VdurNfviMetricsPublisher(DtsHandler):
             vdur    - the VDUR of the VDU whose metrics are published
 
         """
-        super().__init__(tasklet)
+        super().__init__(project)
         self._vnfr = vnfr
         self._vdur = vdur
 
         self._handle = None
-        self._xpath = VdurNfviMetricsPublisher.XPATH.format(vnfr.id, vdur.id)
+        self._xpath = project.add_project(VdurNfviMetricsPublisher.XPATH.format(quoted_key(vnfr.id), quoted_key(vdur.id)))
 
         self._deregistered = asyncio.Event(loop=self.loop)
 
@@ -321,7 +328,7 @@ class LaunchpadConfigDtsSubscriber(DtsHandler):
 
             with self.dts.appconf_group_create(acg_handler) as acg:
                 self.reg = acg.register(
-                        xpath="C,/rw-launchpad:launchpad-config",
+                        xpath=self.project.add_project("C,/rw-launchpad:launchpad-config"),
                         flags=rwdts.Flag.SUBSCRIBER,
                         )
 
@@ -335,8 +342,8 @@ class CreateAlarmRPC(DtsHandler):
     them on to the tasklet.
     """
 
-    def __init__(self, tasklet):
-        super().__init__(tasklet)
+    def __init__(self, project):
+        super().__init__(project)
         self._handle = None
 
     @asyncio.coroutine
@@ -345,6 +352,10 @@ class CreateAlarmRPC(DtsHandler):
         @asyncio.coroutine
         def on_prepare(xact_info, action, ks_path, msg):
             try:
+
+                if not self.project.rpc_check(msg, xact_info=xact_info):
+                    return
+
                 response = VnfrYang.YangOutput_Vnfr_CreateAlarm()
                 response.alarm_id = yield from self.tasklet.on_create_alarm(
                         msg.cloud_account,
@@ -382,8 +393,8 @@ class DestroyAlarmRPC(DtsHandler):
     them on to the tasklet.
     """
 
-    def __init__(self, tasklet):
-        super().__init__(tasklet)
+    def __init__(self, project):
+        super().__init__(project)
         self._handle = None
 
     @asyncio.coroutine
@@ -392,6 +403,9 @@ class DestroyAlarmRPC(DtsHandler):
         @asyncio.coroutine
         def on_prepare(xact_info, action, ks_path, msg):
             try:
+                if not self.project.rpc_check(msg, xact_info=xact_info):
+                    return
+
                 yield from self.tasklet.on_destroy_alarm(
                         msg.cloud_account,
                         msg.alarm_id,
@@ -473,70 +487,31 @@ class WebhookApplication(tornado.web.Application):
                 ])
 
 
-class MonitorTasklet(rift.tasklets.Tasklet):
-    """
-    The MonitorTasklet provides a interface for DTS to interact with an
-    instance of the Monitor class. This allows the Monitor class to remain
-    independent of DTS.
-    """
+class MonitorProject(ManoProject):
 
-    DEFAULT_POLLING_PERIOD = 1.0
+    def __init__(self, name, tasklet, **kw):
+        super(MonitorProject, self).__init__(log, name)
+        self._tasklet = tasklet
+        self._log_hdl = tasklet.log_hdl
+        self._dts = tasklet.dts
+        self._loop = tasklet.loop
 
-    def __init__(self, *args, **kwargs):
-        try:
-            super().__init__(*args, **kwargs)
-            self.rwlog.set_category("rw-monitor-log")
+        self.vnfr_subscriber = VnfrCatalogSubscriber(self)
+        self.cloud_cfg_subscriber = CloudAccountDtsHandler(self)
+        self.ns_instance_config_subscriber = NsInstanceConfigSubscriber(self)
+        self.launchpad_cfg_subscriber = LaunchpadConfigDtsSubscriber(self)
 
-            self.vnfr_subscriber = VnfrCatalogSubscriber(self)
-            self.cloud_cfg_subscriber = CloudAccountDtsHandler(self)
-            self.ns_instance_config_subscriber = NsInstanceConfigSubscriber(self)
-            self.launchpad_cfg_subscriber = LaunchpadConfigDtsSubscriber(self)
+        self.config = core.InstanceConfiguration()
+        self.config.polling_period = MonitorTasklet.DEFAULT_POLLING_PERIOD
 
-            self.config = core.InstanceConfiguration()
-            self.config.polling_period = MonitorTasklet.DEFAULT_POLLING_PERIOD
+        self.monitor = core.Monitor(self.loop, self.log, self.config, self)
+        self.vdur_handlers = dict()
 
-            self.monitor = core.Monitor(self.loop, self.log, self.config)
-            self.vdur_handlers = dict()
-
-            self.webhooks = None
-            self.create_alarm_rpc = CreateAlarmRPC(self)
-            self.destroy_alarm_rpc = DestroyAlarmRPC(self)
-
-
-        except Exception as e:
-            self.log.exception(e)
-
-    @property
-    def polling_period(self):
-        return self.config.polling_period
-
-    @property
-    def public_ip(self):
-        """The public IP of the launchpad"""
-        return self.config.public_ip
-
-    def start(self):
-        super().start()
-        self.log.info("Starting MonitoringTasklet")
-
-        self.log.debug("Registering with dts")
-        self.dts = rift.tasklets.DTS(
-                self.tasklet_info,
-                RwLaunchpadYang.get_schema(),
-                self.loop,
-                self.on_dts_state_change
-                )
-
-        self.log.debug("Created DTS Api GI Object: %s", self.dts)
-
-    def stop(self):
-      try:
-          self.dts.deinit()
-      except Exception as e:
-          self.log.exception(e)
+        self.create_alarm_rpc = CreateAlarmRPC(self)
+        self.destroy_alarm_rpc = DestroyAlarmRPC(self)
 
     @asyncio.coroutine
-    def init(self):
+    def register (self):
         self.log.debug("creating cloud account handler")
         self.cloud_cfg_subscriber.register()
 
@@ -555,23 +530,15 @@ class MonitorTasklet(rift.tasklets.Tasklet):
         self.log.debug("creating destroy-alarm rpc handler")
         yield from self.destroy_alarm_rpc.register()
 
-        self.log.debug("creating webhook server")
-        loop = rift.tasklets.tornado.TaskletAsyncIOLoop(asyncio_loop=self.loop)
-        self.webhooks = WebhookApplication(self)
-        self.server = tornado.httpserver.HTTPServer(
-            self.webhooks,
-            io_loop=loop,
-        )
 
-    @asyncio.coroutine
-    def on_public_ip(self, ip):
-        """Store the public IP of the launchpad
-
-        Arguments:
-            ip - a string containing the public IP address of the launchpad
+    @property
+    def polling_period(self):
+        return self.config.polling_period
 
-        """
-        self.config.public_ip = ip
+    @property
+    def public_ip(self):
+        """The public IP of the launchpad"""
+        return self.config.public_ip
 
     def on_ns_instance_config_update(self, config):
         """Update configuration information
@@ -589,48 +556,16 @@ class MonitorTasklet(rift.tasklets.Tasklet):
     def on_cloud_account_delete(self, account_name):
         self.monitor.remove_cloud_account(account_name)
 
-    @asyncio.coroutine
-    def run(self):
-        self.webhooks.listen(WebhookApplication.DEFAULT_WEBHOOK_PORT)
-
-    def on_instance_started(self):
-        self.log.debug("Got instance started callback")
-
-    @asyncio.coroutine
-    def on_dts_state_change(self, state):
-        """Handle DTS state change
-
-        Take action according to current DTS state to transition application
-        into the corresponding application state
-
-        Arguments
-            state - current dts state
-
-        """
-        switch = {
-            rwdts.State.INIT: rwdts.State.REGN_COMPLETE,
-            rwdts.State.CONFIG: rwdts.State.RUN,
-        }
-
-        handlers = {
-            rwdts.State.INIT: self.init,
-            rwdts.State.RUN: self.run,
-        }
-
-        # Transition application to next state
-        handler = handlers.get(state, None)
-        if handler is not None:
-            yield from handler()
-
-        # Transition dts to next state
-        next_state = switch.get(state, None)
-        if next_state is not None:
-            self.dts.handle.set_state(next_state)
-
     def on_vnfr_create(self, vnfr):
-        if not self.monitor.nfvi_metrics_available(vnfr.cloud_account):
+        try:
+            acc = vnfr.cloud_account
+        except AttributeError as e:
+            self.log.warning("NFVI metrics not supported")
+            return
+
+        if not self.monitor.nfvi_metrics_available(acc):
             msg = "NFVI metrics unavailable for {}"
-            self.log.warning(msg.format(vnfr.cloud_account))
+            self.log.warning(msg.format(acc))
             return
 
         self.monitor.add_vnfr(vnfr)
@@ -642,6 +577,12 @@ class MonitorTasklet(rift.tasklets.Tasklet):
                 self.loop.create_task(coro)
 
     def on_vnfr_update(self, vnfr):
+        try:
+            acc = vnfr.cloud_account
+        except AttributeError as e:
+            self.log.warning("NFVI metrics not supported")
+            return
+
         if not self.monitor.nfvi_metrics_available(vnfr.cloud_account):
             msg = "NFVI metrics unavailable for {}"
             self.log.warning(msg.format(vnfr.cloud_account))
@@ -712,3 +653,115 @@ class MonitorTasklet(rift.tasklets.Tasklet):
 
         """
         yield from self.monitor.destroy_alarm(account, alarm_id)
+
+    @asyncio.coroutine
+    def delete_prepare(self):
+        # Check if any cloud accounts present
+        if self.cloud_cfg_subscriber and self.cloud_cfg_subscriber._cloud_cfg_subscriber.accounts:
+            return False
+        return True
+
+
+class MonitorTasklet(rift.tasklets.Tasklet):
+    """
+    The MonitorTasklet provides a interface for DTS to interact with an
+    instance of the Monitor class. This allows the Monitor class to remain
+    independent of DTS.
+    """
+
+    DEFAULT_POLLING_PERIOD = 1.0
+
+    def __init__(self, *args, **kwargs):
+        try:
+            super().__init__(*args, **kwargs)
+            self.rwlog.set_category("rw-monitor-log")
+
+            self._project_handler = None
+            self.projects = {}
+
+            self.webhooks = None
+
+        except Exception as e:
+            self.log.exception(e)
+
+    def start(self):
+        super().start()
+        self.log.info("Starting MonitoringTasklet")
+
+        self.log.debug("Registering with dts")
+        self.dts = rift.tasklets.DTS(
+                self.tasklet_info,
+                RwLaunchpadYang.get_schema(),
+                self.loop,
+                self.on_dts_state_change
+                )
+
+        self.log.debug("Created DTS Api GI Object: %s", self.dts)
+
+    def stop(self):
+      try:
+          self.dts.deinit()
+      except Exception as e:
+          self.log.exception(e)
+
+    @asyncio.coroutine
+    def init(self):
+        self.log.debug("creating webhook server")
+        loop = rift.tasklets.tornado.TaskletAsyncIOLoop(asyncio_loop=self.loop)
+        self.webhooks = WebhookApplication(self)
+        self.server = tornado.httpserver.HTTPServer(
+            self.webhooks,
+            io_loop=loop,
+        )
+
+    @asyncio.coroutine
+    def on_public_ip(self, ip):
+        """Store the public IP of the launchpad
+
+        Arguments:
+            ip - a string containing the public IP address of the launchpad
+
+        """
+        self.config.public_ip = ip
+
+    @asyncio.coroutine
+    def run(self):
+        address = rwlib.getenv("RWVM_INTERNAL_IPADDR")
+        if (address is None):
+            address="" 
+        self.webhooks.listen(WebhookApplication.DEFAULT_WEBHOOK_PORT, address=address)
+
+    def on_instance_started(self):
+        self.log.debug("Got instance started callback")
+
+    @asyncio.coroutine
+    def on_dts_state_change(self, state):
+        """Handle DTS state change
+
+        Take action according to current DTS state to transition application
+        into the corresponding application state
+
+        Arguments
+            state - current dts state
+
+        """
+        switch = {
+            rwdts.State.INIT: rwdts.State.REGN_COMPLETE,
+            rwdts.State.CONFIG: rwdts.State.RUN,
+        }
+
+        handlers = {
+            rwdts.State.INIT: self.init,
+            rwdts.State.RUN: self.run,
+        }
+
+        # Transition application to next state
+        handler = handlers.get(state, None)
+        if handler is not None:
+            yield from handler()
+
+        # Transition dts to next state
+        next_state = switch.get(state, None)
+        if next_state is not None:
+            self.dts.handle.set_state(next_state)
+