update from RIFT as of 696b75d2fe9fb046261b08c616f1bcf6c0b54a9b second try
[osm/SO.git] / rwlaunchpad / plugins / rwautoscaler / rift / tasklets / rwautoscaler / rwautoscaler.py
index affa579..1741a58 100644 (file)
@@ -1,6 +1,6 @@
 """
-# 
-#   Copyright 2016 RIFT.IO Inc
+#
+#   Copyright 2016-2017 RIFT.IO Inc
 #
 #   Licensed under the Apache License, Version 2.0 (the "License");
 #   you may not use this file except in compliance with the License.
@@ -39,97 +39,46 @@ from gi.repository import (
 import rift.mano.cloud
 import rift.mano.dts as subscriber
 import rift.tasklets
+from rift.mano.utils.project import (
+    ManoProject,
+    ProjectHandler,
+    )
 
+class AutoScalerProject(ManoProject, engine.ScalingPolicy.Delegate):
 
+    def __init__(self, name, tasklet, **kw):
+        super(AutoScalerProject, self).__init__(tasklet.log, name)
+        self.update(tasklet)
 
-class AutoScalerTasklet(rift.tasklets.Tasklet, engine.ScalingPolicy.Delegate):
-    """The main task of this Tasklet is to listen for NSR changes and once the
-    NSR is configured, ScalingPolicy is created.
-    """
-    def __init__(self, *args, **kwargs):
-
-        try:
-            super().__init__(*args, **kwargs)
-            self.store = None
-            self.monparam_store = None
-
-            self.nsr_sub = None
-            self.nsr_monp_subscribers = {}
-            self.instance_id_store = collections.defaultdict(list)
-
-        except Exception as e:
-            self.log.exception(e)
-
-    def start(self):
-        super().start()
-
-        self.log.debug("Registering with dts")
-
-        self.dts = rift.tasklets.DTS(
-                self.tasklet_info,
-                RwLaunchpadYang.get_schema(),
-                self.loop,
-                self.on_dts_state_change
-                )
+        self.store = None
+        self.monparam_store = None
+        self.nsr_sub = None
+        self.nsr_monp_subscribers = {}
+        self.instance_id_store = collections.defaultdict(list)
 
-        self.store = subscriber.SubscriberStore.from_tasklet(self)
-        self.nsr_sub = subscriber.NsrCatalogSubscriber(self.log, self.dts, self.loop, self.handle_nsr)
+        self.store = subscriber.SubscriberStore.from_project(self)
+        self.nsr_sub = subscriber.NsrCatalogSubscriber(self.log, self.dts, self.loop,
+                                                       self, self.handle_nsr)
 
-        self.log.debug("Created DTS Api GI Object: %s", self.dts)
+    def deregister(self):
+        self.log.debug("De-register project {}".format(self.name))
+        self.nsr_sub.deregister()
+        self.store.deregister()
 
-    def stop(self):
-        try:
-            self.dts.deinit()
-        except Exception as e:
-            self.log.exception(e)
 
     @asyncio.coroutine
-    def init(self):
+    def register (self):
         self.log.debug("creating vnfr subscriber")
         yield from self.store.register()
         yield from self.nsr_sub.register()
 
-    @asyncio.coroutine
-    def run(self):
-        pass
-
-    @asyncio.coroutine
-    def on_dts_state_change(self, state):
-        """Handle DTS state change
-
-        Take action according to current DTS state to transition application
-        into the corresponding application state
-
-        Arguments
-            state - current dts state
-
-        """
-        switch = {
-            rwdts.State.INIT: rwdts.State.REGN_COMPLETE,
-            rwdts.State.CONFIG: rwdts.State.RUN,
-        }
-
-        handlers = {
-            rwdts.State.INIT: self.init,
-            rwdts.State.RUN: self.run,
-        }
-
-        # Transition application to next state
-        handler = handlers.get(state, None)
-        if handler is not None:
-            yield from handler()
-
-        # Transition dts to next state
-        next_state = switch.get(state, None)
-        if next_state is not None:
-            self.dts.handle.set_state(next_state)
-
-    def scale_in(self, scaling_group_name, nsr_id):
+    def scale_in(self, scaling_group_name, nsr_id, instance_id):
         """Delegate callback
 
         Args:
             scaling_group_name (str): Scaling group name to be scaled in
             nsr_id (str): NSR id
+            instance_id (str): Instance id of the scaling group
 
         """
         self.log.info("Sending a scaling-in request for {} in NSR: {}".format(
@@ -138,12 +87,14 @@ class AutoScalerTasklet(rift.tasklets.Tasklet, engine.ScalingPolicy.Delegate):
 
         @asyncio.coroutine
         def _scale_in():
-            instance_id = self.instance_id_store[(scaling_group_name, nsr_id)].pop()
 
+            # Purposely ignore passed instance_id
+            instance_id_ = self.instance_id_store[(scaling_group_name, nsr_id)].pop()
             # Trigger an rpc
             rpc_ip = NsrYang.YangInput_Nsr_ExecScaleIn.from_dict({
+                'project_name': self.name,
                 'nsr_id_ref': nsr_id,
-                'instance_id': instance_id,
+                'instance_id': instance_id_,
                 'scaling_group_name_ref': scaling_group_name})
 
             rpc_out = yield from self.dts.query_rpc(
@@ -151,7 +102,9 @@ class AutoScalerTasklet(rift.tasklets.Tasklet, engine.ScalingPolicy.Delegate):
                         0,
                         rpc_ip)
 
-        self.loop.create_task(_scale_in())
+        # Check for existing scaled-out VNFs if any.
+        if len(self.instance_id_store):
+            self.loop.create_task(_scale_in())
 
     def scale_out(self, scaling_group_name, nsr_id):
         """Delegate callback for scale out requests
@@ -168,6 +121,7 @@ class AutoScalerTasklet(rift.tasklets.Tasklet, engine.ScalingPolicy.Delegate):
         def _scale_out():
             # Trigger an rpc
             rpc_ip = NsrYang.YangInput_Nsr_ExecScaleOut.from_dict({
+                'project_name': self.name,
                 'nsr_id_ref': nsr_id ,
                 'scaling_group_name_ref': scaling_group_name})
 
@@ -191,7 +145,7 @@ class AutoScalerTasklet(rift.tasklets.Tasklet, engine.ScalingPolicy.Delegate):
         NS that moves to config state.
 
         Args:
-            nsr (RwNsrYang.YangData_Nsr_NsInstanceOpdata_Nsr): Ns Opdata
+            nsr (RwNsrYang.YangData_RwProject_Project_NsInstanceOpdata_Nsr): Ns Opdata
             action (rwdts.QueryAction): Action type of the change.
         """
         def nsr_create():
@@ -199,12 +153,15 @@ class AutoScalerTasklet(rift.tasklets.Tasklet, engine.ScalingPolicy.Delegate):
                 nsr_id = nsr.ns_instance_config_ref
                 self.nsr_monp_subscribers[nsr_id] = []
                 nsd = self.store.get_nsd(nsr.nsd_ref)
+                self.log.debug ("Creating a scaling policy monitor for NSR: {}".format(
+                    nsr_id))
+
                 @asyncio.coroutine
                 def task():
                     for scaling_group in nsd.scaling_group_descriptor:
                         for policy_cfg in scaling_group.scaling_policy:
                             policy = engine.ScalingPolicy(
-                                self.log, self.dts, self.loop,
+                                self.log, self.dts, self.loop, self,
                                 nsr.ns_instance_config_ref,
                                 nsr.nsd_ref,
                                 scaling_group.name,
@@ -213,6 +170,9 @@ class AutoScalerTasklet(rift.tasklets.Tasklet, engine.ScalingPolicy.Delegate):
                                 delegate=self)
                             self.nsr_monp_subscribers[nsr_id].append(policy)
                             yield from policy.register()
+                    self.log.debug ("Started a scaling policy monitor for NSR: {}".format(
+                        nsr_id))
+
 
                 self.loop.create_task(task())
 
@@ -223,8 +183,90 @@ class AutoScalerTasklet(rift.tasklets.Tasklet, engine.ScalingPolicy.Delegate):
                 for policy in policies:
                     policy.deregister()
                 del self.nsr_monp_subscribers[nsr.ns_instance_config_ref]
+                self.log.debug ("Deleted the scaling policy monitor for NSD: {}".format(
+                    nsr.ns_instance_config_ref))
+
 
         if action in [rwdts.QueryAction.CREATE, rwdts.QueryAction.UPDATE]:
             nsr_create()
         elif action == rwdts.QueryAction.DELETE:
             nsr_delete()
+
+
+class AutoScalerTasklet(rift.tasklets.Tasklet):
+    """The main task of this Tasklet is to listen for NSR changes and once the
+    NSR is configured, ScalingPolicy is created.
+    """
+    def __init__(self, *args, **kwargs):
+
+        try:
+            super().__init__(*args, **kwargs)
+            self.rwlog.set_category("rw-mano-log")
+
+            self._project_handler = None
+            self.projects = {}
+
+        except Exception as e:
+            self.log.exception(e)
+
+    def start(self):
+        super().start()
+
+        self.log.debug("Registering with dts")
+
+        self.dts = rift.tasklets.DTS(
+                self.tasklet_info,
+                RwLaunchpadYang.get_schema(),
+                self.loop,
+                self.on_dts_state_change
+                )
+
+        self.log.debug("Created DTS Api GI Object: %s", self.dts)
+
+    def stop(self):
+        try:
+            self.dts.deinit()
+        except Exception as e:
+            self.log.exception(e)
+
+    @asyncio.coroutine
+    def init(self):
+        self.log.debug("creating project handler")
+        self.project_handler = ProjectHandler(self, AutoScalerProject)
+        self.project_handler.register()
+
+    @asyncio.coroutine
+    def run(self):
+        pass
+
+    @asyncio.coroutine
+    def on_dts_state_change(self, state):
+        """Handle DTS state change
+
+        Take action according to current DTS state to transition application
+        into the corresponding application state
+
+        Arguments
+            state - current dts state
+
+        """
+        switch = {
+            rwdts.State.INIT: rwdts.State.REGN_COMPLETE,
+            rwdts.State.CONFIG: rwdts.State.RUN,
+        }
+
+        handlers = {
+            rwdts.State.INIT: self.init,
+            rwdts.State.RUN: self.run,
+        }
+
+        # Transition application to next state
+        handler = handlers.get(state, None)
+        if handler is not None:
+            yield from handler()
+
+        # Transition dts to next state
+        next_state = switch.get(state, None)
+        if next_state is not None:
+            self.dts.handle.set_state(next_state)
+