update from RIFT as of 696b75d2fe9fb046261b08c616f1bcf6c0b54a9b second try
[osm/SO.git] / rwlaunchpad / plugins / rwmonparam / rift / tasklets / rwmonparam / rwmonparam.py
index d0f31e3..0cb3e94 100644 (file)
@@ -1,5 +1,5 @@
 """
-# 
+#
 #   Copyright 2016 RIFT.IO Inc
 #
 #   Licensed under the Apache License, Version 2.0 (the "License");
@@ -30,106 +30,79 @@ gi.require_version('RwLaunchpadYang', '1.0')
 from gi.repository import (
         RwDts as rwdts,
         RwLaunchpadYang,
+        NsrYang,
         ProtobufC)
 import rift.mano.cloud
 import rift.mano.dts as subscriber
 import rift.tasklets
-
+import concurrent.futures
+from rift.mano.utils.project import (
+    ManoProject,
+    ProjectHandler,
+    )
 from . import vnfr_core
 from . import nsr_core
 
 
-class MonitoringParameterTasklet(rift.tasklets.Tasklet):
-    """The main task of this Tasklet is to listen for VNFR changes and once the
-    VNFR hits the running state, triggers the monitor.
-    """
-    def __init__(self, *args, **kwargs):
-        try:
-            super().__init__(*args, **kwargs)
-            self.rwlog.set_category("rw-monitor-log")
-        except Exception as e:
-            self.log.exception(e)
+class MonParamProject(ManoProject):
+
+    def __init__(self, name, tasklet, **kw):
+        super(MonParamProject, self).__init__(tasklet.log, name)
+        self.update(tasklet)
 
         self.vnfr_subscriber = None
-        self.store = None
 
         self.vnfr_monitors = {}
         self.nsr_monitors = {}
+        self.executor = concurrent.futures.ThreadPoolExecutor(max_workers=1)
 
         # Needs to be moved to store once the DTS bug is resolved
+        # Gather all VNFRs
         self.vnfrs = {}
 
-    def start(self):
-        super().start()
-
-        self.log.info("Starting MonitoringParameterTasklet")
-        self.log.debug("Registering with dts")
-
-        self.dts = rift.tasklets.DTS(
-                self.tasklet_info,
-                RwLaunchpadYang.get_schema(),
-                self.loop,
-                self.on_dts_state_change
-                )
-
-        self.vnfr_subscriber = subscriber.VnfrCatalogSubscriber.from_tasklet(
+        self.vnfr_subscriber = subscriber.VnfrCatalogSubscriber.from_project(
                 self,
                 callback=self.handle_vnfr)
-        self.nsr_subsriber = subscriber.NsrCatalogSubscriber.from_tasklet(
+        self.nsr_subsriber = subscriber.NsrCatalogSubscriber.from_project(
                 self,
                 callback=self.handle_nsr)
 
-        self.store = subscriber.SubscriberStore.from_tasklet(self)
+        self._nsd_subscriber = subscriber.NsdCatalogSubscriber.from_project(self)
+        self._vnfd_subscriber = subscriber.VnfdCatalogSubscriber.from_project(self)
 
         self.log.debug("Created DTS Api GI Object: %s", self.dts)
 
-    def stop(self):
-      try:
-          self.dts.deinit()
-      except Exception as e:
-          self.log.exception(e)
-
     @asyncio.coroutine
-    def init(self):
+    def register (self):
         self.log.debug("creating vnfr subscriber")
-        yield from self.store.register()
+        yield from self._nsd_subscriber.register()
+        yield from self._vnfd_subscriber.register()
         yield from self.vnfr_subscriber.register()
         yield from self.nsr_subsriber.register()
 
-    @asyncio.coroutine
-    def run(self):
-        pass
 
-    @asyncio.coroutine
-    def on_dts_state_change(self, state):
-        """Handle DTS state change
+    def deregister(self):
+        self.log.debug("De-register vnfr project {}".format(self.name))
+        self._nsd_subscriber.deregister()
+        self._vnfd_subscriber.deregister()
+        self.vnfr_subscriber.deregister()
+        self.nsr_subsriber.deregister()
 
-        Take action according to current DTS state to transition application
-        into the corresponding application state
-
-        Arguments
-            state - current dts state
+    def _unwrap(self, values, id_name):
+        try:
+            return values[0]
+        except KeyError:
+            self.log.exception("Unable to find the object with the given "
+                "ID {}".format(id_name))
 
-        """
-        switch = {
-            rwdts.State.INIT: rwdts.State.REGN_COMPLETE,
-            rwdts.State.CONFIG: rwdts.State.RUN,
-        }
+    def get_vnfd(self, vnfd_id):
+        values = [vnfd for vnfd in list(self._vnfd_subscriber.reg.get_xact_elements()) if vnfd.id == vnfd_id]
+        return self._unwrap(values, vnfd_id)
 
-        handlers = {
-            rwdts.State.INIT: self.init,
-            rwdts.State.RUN: self.run,
-        }
+    def get_nsd(self, nsd_id):
+        values = [nsd for nsd in list(self._nsd_subscriber.reg.get_xact_elements()) if nsd.id == nsd_id]
+        return self._unwrap(values, nsd_id)
 
-        # Transition application to next state
-        handler = handlers.get(state, None)
-        if handler is not None:
-            yield from handler()
-
-        # Transition dts to next state
-        next_state = switch.get(state, None)
-        if next_state is not None:
-            self.dts.handle.set_state(next_state)
 
     def handle_vnfr(self, vnfr, action):
         """Starts a monitoring parameter job for every VNFR that reaches
@@ -142,12 +115,15 @@ class MonitoringParameterTasklet(rift.tasklets.Tasklet):
 
         def vnfr_create():
             # if vnfr.operational_status == "running" and vnfr.id not in self.vnfr_monitors:
-            if vnfr.config_status == "configured" and vnfr.id not in self.vnfr_monitors:
+            vnfr_status = (vnfr.operational_status == "running" and
+                           vnfr.config_status in ["configured", "config_not_needed"])
+
+            if vnfr_status and vnfr.id not in self.vnfr_monitors:
 
                 vnf_mon = vnfr_core.VnfMonitorDtsHandler.from_vnf_data(
                         self,
                         vnfr,
-                        self.store.get_vnfd(vnfr.vnfd_ref))
+                        self.get_vnfd(vnfr.vnfd.id))
 
                 self.vnfr_monitors[vnfr.id] = vnf_mon
                 self.vnfrs[vnfr.id] = vnfr
@@ -155,7 +131,10 @@ class MonitoringParameterTasklet(rift.tasklets.Tasklet):
                 @asyncio.coroutine
                 def task():
                     yield from vnf_mon.register()
+                    if vnfr.nsr_id_ref in self.nsr_monitors:
+                        vnf_mon.update_nsr_mon(self.nsr_monitors[vnfr.nsr_id_ref])
                     vnf_mon.start()
+                    #self.update_nsrs(vnfr, action)
 
                 self.loop.create_task(task())
 
@@ -166,47 +145,94 @@ class MonitoringParameterTasklet(rift.tasklets.Tasklet):
                 vnf_mon = self.vnfr_monitors.pop(vnfr.id)
                 vnf_mon.stop()
                 self.vnfrs.pop(vnfr.id)
+                #self.update_nsrs(vnfr, action)
 
         if action in [rwdts.QueryAction.CREATE, rwdts.QueryAction.UPDATE]:
             vnfr_create()
         elif action == rwdts.QueryAction.DELETE:
             vnfr_delete()
 
+    def update_nsrs(self, vnfr, action):
+        if vnfr.nsr_id_ref not in self.nsr_monitors:
+            return
+
+        monitor = self.nsr_monitors[vnfr.nsr_id_ref]
+
+        if action in [rwdts.QueryAction.CREATE, rwdts.QueryAction.UPDATE]:
+            @asyncio.coroutine
+            def update_vnfr():
+                yield from monitor.update([vnfr])
+
+            self.loop.create_task(update_vnfr())
+        elif action == rwdts.QueryAction.DELETE:
+            @asyncio.coroutine
+            def delete_vnfr():
+                try:
+                    yield from monitor.delete([vnfr])
+                except Exception as e:
+                    self.log.exception(str(e))
+
+            self.loop.create_task(delete_vnfr())
+
+
 
     def handle_nsr(self, nsr, action):
         """Callback for NSR opdata changes. Creates a publisher for every
         NS that moves to config state.
 
         Args:
-            nsr (RwNsrYang.YangData_Nsr_NsInstanceOpdata_Nsr): Ns Opdata
+            nsr (RwNsrYang.YangData_RwProject_Project_NsInstanceOpdata_Nsr): Ns Opdata
             action (rwdts.QueryAction): Action type of the change.
         """
+
         def nsr_create():
-            # if nsr.operational_status == "running" and nsr.ns_instance_config_ref not in self.nsr_monitors:
-            if nsr.config_status == "configured" and nsr.ns_instance_config_ref not in self.nsr_monitors:
-                nsr_mon = nsr_core.NsrMonitorDtsHandler(
-                        self.log,
-                        self.dts,
-                        self.loop,
-                        nsr,
-                        list(self.vnfrs.values()),
-                        self.store
-                        )
-
-                self.nsr_monitors[nsr.ns_instance_config_ref] = nsr_mon
+            # TODO clean up the if-else mess, exception
 
-                @asyncio.coroutine
-                def task():
-                    yield from nsr_mon.register()
-                    yield from nsr_mon.start()
+            success_state = (nsr.operational_status == "running" and
+                    nsr.config_status == "configured")
 
-                self.loop.create_task(task())
+            if not success_state:
+                return
+
+            if nsr.ns_instance_config_ref in self.nsr_monitors:
+                return
 
+            constituent_vnfrs = []
 
+            for vnfr_id in nsr.constituent_vnfr_ref:
+                if (vnfr_id.vnfr_id in self.vnfrs):
+                    vnfr_obj = self.vnfrs[vnfr_id.vnfr_id]
+                    constituent_vnfrs.append(vnfr_obj)
+                else:
+                    pass
+
+            nsr_mon = nsr_core.NsrMonitorDtsHandler(
+                self.log,
+                self.dts,
+                self.loop,
+                self,
+                nsr,
+                constituent_vnfrs
+            )
+            for vnfr_id in nsr.constituent_vnfr_ref:
+                if vnfr_id.vnfr_id in self.vnfr_monitors:
+                     self.vnfr_monitors[vnfr_id.vnfr_id].update_nsr_mon(nsr_mon)
+
+            self.nsr_monitors[nsr.ns_instance_config_ref] = nsr_mon
+
+
+            @asyncio.coroutine
+            def task():
+                try:
+                    yield from nsr_mon.register()
+                    yield from nsr_mon.start()
+                except Exception as e:
+                    self.log.exception(e)
+
+            self.loop.create_task(task())
 
         def nsr_delete():
             if nsr.ns_instance_config_ref in self.nsr_monitors:
-            # if vnfr.operational_status == "running" and vnfr.id in self.vnfr_monitors:
                 nsr_mon = self.nsr_monitors.pop(nsr.ns_instance_config_ref)
                 nsr_mon.stop()
 
@@ -214,3 +240,78 @@ class MonitoringParameterTasklet(rift.tasklets.Tasklet):
             nsr_create()
         elif action == rwdts.QueryAction.DELETE:
             nsr_delete()
+
+
+class MonitoringParameterTasklet(rift.tasklets.Tasklet):
+    """The main task of this Tasklet is to listen for VNFR changes and once the
+    VNFR hits the running state, triggers the monitor.
+    """
+    def __init__(self, *args, **kwargs):
+        try:
+            super().__init__(*args, **kwargs)
+            self.rwlog.set_category("rw-monitor-log")
+        except Exception as e:
+            self.log.exception(e)
+
+        self._project_handler = None
+        self.projects = {}
+
+    def start(self):
+        super().start()
+
+        self.log.info("Starting MonitoringParameterTasklet")
+        self.log.debug("Registering with dts")
+
+        self.dts = rift.tasklets.DTS(
+                self.tasklet_info,
+                NsrYang.get_schema(),
+                self.loop,
+                self.on_dts_state_change
+                )
+
+    def stop(self):
+      try:
+          self.dts.deinit()
+      except Exception as e:
+          self.log.exception(e)
+
+    @asyncio.coroutine
+    def init(self):
+        self.log.debug("creating project handler")
+        self.project_handler = ProjectHandler(self, MonParamProject)
+        self.project_handler.register()
+
+    @asyncio.coroutine
+    def run(self):
+        pass
+
+    @asyncio.coroutine
+    def on_dts_state_change(self, state):
+        """Handle DTS state change
+
+        Take action according to current DTS state to transition application
+        into the corresponding application state
+
+        Arguments
+            state - current dts state
+
+        """
+        switch = {
+            rwdts.State.INIT: rwdts.State.REGN_COMPLETE,
+            rwdts.State.CONFIG: rwdts.State.RUN,
+        }
+
+        handlers = {
+            rwdts.State.INIT: self.init,
+            rwdts.State.RUN: self.run,
+        }
+
+        # Transition application to next state
+        handler = handlers.get(state, None)
+        if handler is not None:
+            yield from handler()
+
+        # Transition dts to next state
+        next_state = switch.get(state, None)
+        if next_state is not None:
+            self.dts.handle.set_state(next_state)