update from RIFT as of 696b75d2fe9fb046261b08c616f1bcf6c0b54a9b second try
[osm/SO.git] / rwlaunchpad / plugins / rwautoscaler / rift / tasklets / rwautoscaler / engine.py
index d71aefc..3bd2645 100644 (file)
@@ -1,5 +1,5 @@
 
-# 
+#
 #   Copyright 2016 RIFT.IO Inc
 #
 #   Licensed under the Apache License, Version 2.0 (the "License");
@@ -38,7 +38,8 @@ class TimeSeries:
 
         # 0 -> contains a list of all timestamps
         # 1 -> contains a list of all values.
-        self._series = numpy.empty(shape=(2, 1), dtype='int64')
+        # self._series = numpy.empty(shape=(2, 1), dtype='int64')
+        self._series = numpy.array([[],[]], dtype='int64')
         self.threshold_time = threshold_time
 
     def add_value(self, timestamp, value):
@@ -62,7 +63,7 @@ class TimeSeries:
     def is_window_full(self):
         """Verify if there is sufficient data for the current window.
         """
-        if len(self._series[0]) <= 2:
+        if len(self._series[0]) < 2:
             return False
 
         start_time = self._series[0][0]
@@ -106,6 +107,7 @@ class ScalingCriteria:
             log,
             dts,
             loop,
+            project,
             nsr_id,
             monp_id,
             scaling_criteria,
@@ -143,6 +145,7 @@ class ScalingCriteria:
                 self.log,
                 self.dts,
                 self.loop,
+                project,
                 self.nsr_id,
                 self.monp_id,
                 callback=self.add_value)
@@ -175,12 +178,13 @@ class ScalingCriteria:
 
         """
         if self._timeseries.average() >= self.scale_out:
-            # Enable the scale in limit, only when a scale-out has happened.
-            self._scl_in_limit_enabled = True
+            self.log.info("Triggering a scaling-out request for the criteria {}".format(
+                self.name))
             self.delegate.threshold_out_breached(self.name, avg)
 
-        elif self._timeseries.average() < self.scale_in and self._scl_in_limit_enabled:
-            self._scl_in_limit_enabled = False
+        elif self._timeseries.average() < self.scale_in :
+            self.log.info("Triggering a scaling-in request for the criteria {}".format(
+                self.name))
             self.delegate.threshold_in_breached(self.name, avg)
 
 
@@ -202,6 +206,10 @@ class ScalingCriteria:
         if not self._timeseries.is_window_full():
             return
 
+        self.log.debug("Sufficient sampling data obtained for criteria {}."
+                       "Checking the scaling condition for the criteria".format(
+                           self.name))
+
         if not self.delegate:
             return
 
@@ -211,7 +219,7 @@ class ScalingCriteria:
 class ScalingPolicy(ScalingCriteria.Delegate):
     class Delegate:
         @abc.abstractmethod
-        def scale_in(self, scaling_group_name, nsr_id):
+        def scale_in(self, scaling_group_name, nsr_id, instance_id):
             """Delegate called when all the criteria for scaling-in are met.
 
             Args:
@@ -236,6 +244,7 @@ class ScalingPolicy(ScalingCriteria.Delegate):
             log,
             dts,
             loop,
+            project,
             nsr_id,
             nsd_id,
             scaling_group_name,
@@ -258,6 +267,7 @@ class ScalingPolicy(ScalingCriteria.Delegate):
         self.loop = loop
         self.log = log
         self.dts = dts
+        self.project = project
         self.nsd_id = nsd_id
         self.nsr_id = nsr_id
         self.scaling_group_name = scaling_group_name
@@ -270,15 +280,25 @@ class ScalingPolicy(ScalingCriteria.Delegate):
                                 self.log,
                                 self.dts,
                                 self.loop,
+                                self.project,
                                 self.nsr_id,
                                 callback=self.handle_nsr_monp)
 
+        self.nsr_scale_sub = monp_subscriber.NsrScalingGroupRecordSubscriber(
+                                self.log,
+                                self.dts,
+                                self.loop,
+                                self.project,
+                                self.nsr_id,
+                                self.scaling_group_name)
+
         self.criteria_store = {}
 
         # Timestamp at which the scale-in/scale-out request was generated.
         self._last_triggered_time = None
         self.scale_in_status = {cri.name: False for cri in self.scaling_criteria}
         self.scale_out_status = {cri.name: False for cri in self.scaling_criteria}
+        self.scale_out_count = 0
 
     def get_nsd_monp_cfg(self, nsr_monp):
         """Get the NSD's mon-param config.
@@ -294,7 +314,7 @@ class ScalingPolicy(ScalingCriteria.Delegate):
         Args:
             monp : Yang Model
             action : rwdts.QueryAction
-        
+
         """
         def handle_create():
             if monp.id in self.criteria_store:
@@ -307,10 +327,14 @@ class ScalingPolicy(ScalingCriteria.Delegate):
 
                 # Create a criteria object as soon as the first monitoring data
                 # is published.
+                self.log.debug("Created a ScalingCriteria monitor for {}".format(
+                    cri.as_dict()))
+
                 criteria = ScalingCriteria(
                         self.log,
                         self.dts,
                         self.loop,
+                        self.project,
                         self.nsr_id,
                         monp.id,
                         cri,
@@ -365,6 +389,7 @@ class ScalingPolicy(ScalingCriteria.Delegate):
     @asyncio.coroutine
     def register(self):
         yield from self.monp_sub.register()
+        yield from self.nsr_scale_sub.register()
 
     def deregister(self):
         self.monp_sub.deregister()
@@ -380,6 +405,14 @@ class ScalingPolicy(ScalingCriteria.Delegate):
 
         return True
 
+    def can_trigger_action(self):
+        if self._is_in_cooldown():
+            self.log.debug("In cooldown phase ignoring the scale action ")
+            return False
+
+        return True
+
+
     def threshold_in_breached(self, criteria_name, value):
         """Delegate callback when scale-in threshold is breached
 
@@ -387,19 +420,46 @@ class ScalingPolicy(ScalingCriteria.Delegate):
             criteria_name : Criteria name
             value : Average value
         """
-        if self._is_in_cooldown():
+        self.log.debug("Avg value {} has fallen below the threshold limit for "
+                      "{}".format(value, criteria_name))
+
+        if not self.can_trigger_action():
+            return
+
+        if self.scale_out_count < 1:
+            self.log.debug('There is no scaled-out VNFs at this point. Hence ignoring the scale-in')
             return
 
         self.scale_in_status[criteria_name] = True
+        self.log.info("Applying {} operation to check if all criteria {} for"
+                      " scale-in-threshold are met".format(
+                          self.scale_out_op,
+                          self.scale_out_status))
 
         statuses = self.scale_in_status.values()
         is_breached = self.scale_in_op(statuses)
 
         if is_breached and self.delegate:
-            self._last_triggered_time = time.time()
-            # Reset all statuses
-            self.scale_in_status = {cri.name: False for cri in self.scaling_criteria}
-            self.delegate.scale_in(self.scaling_group_name, self.nsr_id)
+            self.log.info("Triggering a scale-in action for policy {} as "
+                           "all criteria have been met".format(self.name))
+
+            @asyncio.coroutine
+            def check_and_scale_in():
+                # data = yield from self.nsr_scale_sub.data()
+                # if len(data) <= 1:
+                #     return
+
+                # # Get an instance ID
+                # instance_id = data[-1].instance_id
+
+                instance_id = 0     #assigning a value to follow existing scale_in signature
+                self._last_triggered_time = time.time()
+                self.scale_out_count -= 1
+                # Reset all statuses
+                self.scale_in_status = {cri.name: False for cri in self.scaling_criteria}
+                self.delegate.scale_in(self.scaling_group_name, self.nsr_id, instance_id)
+
+            self.loop.create_task(check_and_scale_in())
 
     def threshold_out_breached(self, criteria_name, value):
         """Delegate callback when scale-out threshold is breached.
@@ -407,16 +467,27 @@ class ScalingPolicy(ScalingCriteria.Delegate):
             criteria_name : Criteria name
             value : Average value
         """
-        if self._is_in_cooldown():
+        self.log.debug("Avg value {} has gone above the threshold limit for "
+                      "{}".format(value, criteria_name))
+
+        if not self.can_trigger_action():
             return
 
         self.scale_out_status[criteria_name] = True
 
+        self.log.info("Applying {} operation to check if all criteria {} for"
+                      " scale-out-threshold are met".format(
+                          self.scale_out_op,
+                          self.scale_out_status))
+
         statuses = self.scale_out_status.values()
         is_breached = self.scale_out_op(statuses)
 
         if is_breached and self.delegate:
+            self.log.info("Triggering a scale-out action for policy {} as "
+                           "all criteria have been met".format(self.name))
             self._last_triggered_time = time.time()
+            self.scale_out_count += 1
             # Reset all statuses
             self.scale_out_status = {cri.name: False for cri in self.scaling_criteria}
             self.delegate.scale_out(self.scaling_group_name, self.nsr_id)