New feature: Code changes for project support
[osm/SO.git] / rwlaunchpad / plugins / rwmonitor / rift / tasklets / rwmonitor / tasklet.py
index 4ab351e..b9173e6 100644 (file)
@@ -90,30 +90,34 @@ from gi.repository import (
 
 import rift.tasklets
 import rift.mano.cloud
+from rift.mano.utils.project import (
+    ManoProject,
+    ProjectHandler,
+    )
 
 from . import core
 
 
 class DtsHandler(object):
-    def __init__(self, tasklet):
+    def __init__(self, project):
         self.reg = None
-        self.tasklet = tasklet
+        self.project = project
 
     @property
     def log(self):
-        return self.tasklet.log
+        return self.project._log
 
     @property
     def log_hdl(self):
-        return self.tasklet.log_hdl
+        return self.project._log_hdl
 
     @property
     def dts(self):
-        return self.tasklet.dts
+        return self.project._dts
 
     @property
     def loop(self):
-        return self.tasklet.loop
+        return self.project._loop
 
     @property
     def classname(self):
@@ -151,7 +155,7 @@ class VnfrCatalogSubscriber(DtsHandler):
 
         with self.dts.group_create() as group:
             group.register(
-                    xpath=VnfrCatalogSubscriber.XPATH,
+                    xpath=self.project.add_project(VnfrCatalogSubscriber.XPATH),
                     flags=rwdts.Flag.SUBSCRIBER,
                     handler=handler,
                     )
@@ -173,20 +177,20 @@ class NsInstanceConfigSubscriber(DtsHandler):
 
         with self.dts.appconf_group_create(acg_handler) as acg:
             self.reg = acg.register(
-                    xpath=NsInstanceConfigSubscriber.XPATH,
+                    xpath=self.project.add_project(NsInstanceConfigSubscriber.XPATH),
                     flags=rwdts.Flag.SUBSCRIBER,
                     )
 
 
 class CloudAccountDtsHandler(DtsHandler):
-    def __init__(self, tasklet):
-        super().__init__(tasklet)
+    def __init__(self, project):
+        super().__init__(project)
         self._cloud_cfg_subscriber = None
 
     def register(self):
         self.log.debug("creating cloud account config handler")
         self._cloud_cfg_subscriber = rift.mano.cloud.CloudAccountConfigSubscriber(
-               self.dts, self.log, self.log_hdl,
+               self.dts, self.log, self.log_hdl, self.project,
                rift.mano.cloud.CloudAccountConfigCallbacks(
                    on_add_apply=self.tasklet.on_cloud_account_create,
                    on_delete_apply=self.tasklet.on_cloud_account_delete,
@@ -208,7 +212,7 @@ class VdurNfviMetricsPublisher(DtsHandler):
     # before timing out, the current data will be published instead.
     TIMEOUT = 2.0
 
-    def __init__(self, tasklet, vnfr, vdur):
+    def __init__(self, project, vnfr, vdur):
         """Create an instance of VdurNvfiPublisher
 
         Arguments:
@@ -217,12 +221,12 @@ class VdurNfviMetricsPublisher(DtsHandler):
             vdur    - the VDUR of the VDU whose metrics are published
 
         """
-        super().__init__(tasklet)
+        super().__init__(project)
         self._vnfr = vnfr
         self._vdur = vdur
 
         self._handle = None
-        self._xpath = VdurNfviMetricsPublisher.XPATH.format(vnfr.id, vdur.id)
+        self._xpath = project.add_project(VdurNfviMetricsPublisher.XPATH.format(vnfr.id, vdur.id))
 
         self._deregistered = asyncio.Event(loop=self.loop)
 
@@ -321,7 +325,7 @@ class LaunchpadConfigDtsSubscriber(DtsHandler):
 
             with self.dts.appconf_group_create(acg_handler) as acg:
                 self.reg = acg.register(
-                        xpath="C,/rw-launchpad:launchpad-config",
+                        xpath=self.project.add_project("C,/rw-launchpad:launchpad-config"),
                         flags=rwdts.Flag.SUBSCRIBER,
                         )
 
@@ -335,8 +339,8 @@ class CreateAlarmRPC(DtsHandler):
     them on to the tasklet.
     """
 
-    def __init__(self, tasklet):
-        super().__init__(tasklet)
+    def __init__(self, project):
+        super().__init__(project)
         self._handle = None
 
     @asyncio.coroutine
@@ -345,6 +349,10 @@ class CreateAlarmRPC(DtsHandler):
         @asyncio.coroutine
         def on_prepare(xact_info, action, ks_path, msg):
             try:
+
+                if not self.project.rpc_check(msg, xact_info=xact_info):
+                    return
+
                 response = VnfrYang.YangOutput_Vnfr_CreateAlarm()
                 response.alarm_id = yield from self.tasklet.on_create_alarm(
                         msg.cloud_account,
@@ -382,8 +390,8 @@ class DestroyAlarmRPC(DtsHandler):
     them on to the tasklet.
     """
 
-    def __init__(self, tasklet):
-        super().__init__(tasklet)
+    def __init__(self, project):
+        super().__init__(project)
         self._handle = None
 
     @asyncio.coroutine
@@ -392,6 +400,9 @@ class DestroyAlarmRPC(DtsHandler):
         @asyncio.coroutine
         def on_prepare(xact_info, action, ks_path, msg):
             try:
+                if not self.project.rpc_check(msg, xact_info=xact_info):
+                    return
+
                 yield from self.tasklet.on_destroy_alarm(
                         msg.cloud_account,
                         msg.alarm_id,
@@ -473,70 +484,31 @@ class WebhookApplication(tornado.web.Application):
                 ])
 
 
-class MonitorTasklet(rift.tasklets.Tasklet):
-    """
-    The MonitorTasklet provides a interface for DTS to interact with an
-    instance of the Monitor class. This allows the Monitor class to remain
-    independent of DTS.
-    """
-
-    DEFAULT_POLLING_PERIOD = 1.0
-
-    def __init__(self, *args, **kwargs):
-        try:
-            super().__init__(*args, **kwargs)
-            self.rwlog.set_category("rw-monitor-log")
-
-            self.vnfr_subscriber = VnfrCatalogSubscriber(self)
-            self.cloud_cfg_subscriber = CloudAccountDtsHandler(self)
-            self.ns_instance_config_subscriber = NsInstanceConfigSubscriber(self)
-            self.launchpad_cfg_subscriber = LaunchpadConfigDtsSubscriber(self)
+class MonitorProject(ManoProject):
 
-            self.config = core.InstanceConfiguration()
-            self.config.polling_period = MonitorTasklet.DEFAULT_POLLING_PERIOD
+    def __init__(self, name, tasklet, **kw):
+        super(MonitorProject, self).__init__(log, name)
+        self._tasklet = tasklet
+        self._log_hdl = tasklet.log_hdl
+        self._dts = tasklet.dts
+        self._loop = tasklet.loop
 
-            self.monitor = core.Monitor(self.loop, self.log, self.config)
-            self.vdur_handlers = dict()
+        self.vnfr_subscriber = VnfrCatalogSubscriber(self)
+        self.cloud_cfg_subscriber = CloudAccountDtsHandler(self)
+        self.ns_instance_config_subscriber = NsInstanceConfigSubscriber(self)
+        self.launchpad_cfg_subscriber = LaunchpadConfigDtsSubscriber(self)
 
-            self.webhooks = None
-            self.create_alarm_rpc = CreateAlarmRPC(self)
-            self.destroy_alarm_rpc = DestroyAlarmRPC(self)
+        self.config = core.InstanceConfiguration()
+        self.config.polling_period = MonitorTasklet.DEFAULT_POLLING_PERIOD
 
+        self.monitor = core.Monitor(self.loop, self.log, self.config, self)
+        self.vdur_handlers = dict()
 
-        except Exception as e:
-            self.log.exception(e)
-
-    @property
-    def polling_period(self):
-        return self.config.polling_period
-
-    @property
-    def public_ip(self):
-        """The public IP of the launchpad"""
-        return self.config.public_ip
-
-    def start(self):
-        super().start()
-        self.log.info("Starting MonitoringTasklet")
-
-        self.log.debug("Registering with dts")
-        self.dts = rift.tasklets.DTS(
-                self.tasklet_info,
-                RwLaunchpadYang.get_schema(),
-                self.loop,
-                self.on_dts_state_change
-                )
-
-        self.log.debug("Created DTS Api GI Object: %s", self.dts)
-
-    def stop(self):
-      try:
-          self.dts.deinit()
-      except Exception as e:
-          self.log.exception(e)
+        self.create_alarm_rpc = CreateAlarmRPC(self)
+        self.destroy_alarm_rpc = DestroyAlarmRPC(self)
 
     @asyncio.coroutine
-    def init(self):
+    def register (self):
         self.log.debug("creating cloud account handler")
         self.cloud_cfg_subscriber.register()
 
@@ -555,23 +527,15 @@ class MonitorTasklet(rift.tasklets.Tasklet):
         self.log.debug("creating destroy-alarm rpc handler")
         yield from self.destroy_alarm_rpc.register()
 
-        self.log.debug("creating webhook server")
-        loop = rift.tasklets.tornado.TaskletAsyncIOLoop(asyncio_loop=self.loop)
-        self.webhooks = WebhookApplication(self)
-        self.server = tornado.httpserver.HTTPServer(
-            self.webhooks,
-            io_loop=loop,
-        )
-
-    @asyncio.coroutine
-    def on_public_ip(self, ip):
-        """Store the public IP of the launchpad
 
-        Arguments:
-            ip - a string containing the public IP address of the launchpad
+    @property
+    def polling_period(self):
+        return self.config.polling_period
 
-        """
-        self.config.public_ip = ip
+    @property
+    def public_ip(self):
+        """The public IP of the launchpad"""
+        return self.config.public_ip
 
     def on_ns_instance_config_update(self, config):
         """Update configuration information
@@ -589,44 +553,6 @@ class MonitorTasklet(rift.tasklets.Tasklet):
     def on_cloud_account_delete(self, account_name):
         self.monitor.remove_cloud_account(account_name)
 
-    @asyncio.coroutine
-    def run(self):
-        self.webhooks.listen(WebhookApplication.DEFAULT_WEBHOOK_PORT)
-
-    def on_instance_started(self):
-        self.log.debug("Got instance started callback")
-
-    @asyncio.coroutine
-    def on_dts_state_change(self, state):
-        """Handle DTS state change
-
-        Take action according to current DTS state to transition application
-        into the corresponding application state
-
-        Arguments
-            state - current dts state
-
-        """
-        switch = {
-            rwdts.State.INIT: rwdts.State.REGN_COMPLETE,
-            rwdts.State.CONFIG: rwdts.State.RUN,
-        }
-
-        handlers = {
-            rwdts.State.INIT: self.init,
-            rwdts.State.RUN: self.run,
-        }
-
-        # Transition application to next state
-        handler = handlers.get(state, None)
-        if handler is not None:
-            yield from handler()
-
-        # Transition dts to next state
-        next_state = switch.get(state, None)
-        if next_state is not None:
-            self.dts.handle.set_state(next_state)
-
     def on_vnfr_create(self, vnfr):
         if not self.monitor.nfvi_metrics_available(vnfr.cloud_account):
             msg = "NFVI metrics unavailable for {}"
@@ -712,3 +638,105 @@ class MonitorTasklet(rift.tasklets.Tasklet):
 
         """
         yield from self.monitor.destroy_alarm(account, alarm_id)
+
+
+class MonitorTasklet(rift.tasklets.Tasklet):
+    """
+    The MonitorTasklet provides a interface for DTS to interact with an
+    instance of the Monitor class. This allows the Monitor class to remain
+    independent of DTS.
+    """
+
+    DEFAULT_POLLING_PERIOD = 1.0
+
+    def __init__(self, *args, **kwargs):
+        try:
+            super().__init__(*args, **kwargs)
+            self.rwlog.set_category("rw-monitor-log")
+
+            self._project_handler = None
+            self.projects = {}
+
+            self.webhooks = None
+
+        except Exception as e:
+            self.log.exception(e)
+
+    def start(self):
+        super().start()
+        self.log.info("Starting MonitoringTasklet")
+
+        self.log.debug("Registering with dts")
+        self.dts = rift.tasklets.DTS(
+                self.tasklet_info,
+                RwLaunchpadYang.get_schema(),
+                self.loop,
+                self.on_dts_state_change
+                )
+
+        self.log.debug("Created DTS Api GI Object: %s", self.dts)
+
+    def stop(self):
+      try:
+          self.dts.deinit()
+      except Exception as e:
+          self.log.exception(e)
+
+    @asyncio.coroutine
+    def init(self):
+        self.log.debug("creating webhook server")
+        loop = rift.tasklets.tornado.TaskletAsyncIOLoop(asyncio_loop=self.loop)
+        self.webhooks = WebhookApplication(self)
+        self.server = tornado.httpserver.HTTPServer(
+            self.webhooks,
+            io_loop=loop,
+        )
+
+    @asyncio.coroutine
+    def on_public_ip(self, ip):
+        """Store the public IP of the launchpad
+
+        Arguments:
+            ip - a string containing the public IP address of the launchpad
+
+        """
+        self.config.public_ip = ip
+
+    @asyncio.coroutine
+    def run(self):
+        self.webhooks.listen(WebhookApplication.DEFAULT_WEBHOOK_PORT)
+
+    def on_instance_started(self):
+        self.log.debug("Got instance started callback")
+
+    @asyncio.coroutine
+    def on_dts_state_change(self, state):
+        """Handle DTS state change
+
+        Take action according to current DTS state to transition application
+        into the corresponding application state
+
+        Arguments
+            state - current dts state
+
+        """
+        switch = {
+            rwdts.State.INIT: rwdts.State.REGN_COMPLETE,
+            rwdts.State.CONFIG: rwdts.State.RUN,
+        }
+
+        handlers = {
+            rwdts.State.INIT: self.init,
+            rwdts.State.RUN: self.run,
+        }
+
+        # Transition application to next state
+        handler = handlers.get(state, None)
+        if handler is not None:
+            yield from handler()
+
+        # Transition dts to next state
+        next_state = switch.get(state, None)
+        if next_state is not None:
+            self.dts.handle.set_state(next_state)
+