X-Git-Url: https://osm.etsi.org/gitweb/?a=blobdiff_plain;f=rwlaunchpad%2Fplugins%2Frwautoscaler%2Frift%2Ftasklets%2Frwautoscaler%2Fengine.py;fp=rwlaunchpad%2Fplugins%2Frwautoscaler%2Frift%2Ftasklets%2Frwautoscaler%2Fengine.py;h=3bd2645aeb390746fcbf31d24b4a18f0fad50d0f;hb=4870d0ee29789b859931e4e2c73e13dcb29537d5;hp=d71aefcf1fa114104035f4f1f7addc26bb09167d;hpb=6f1a3fe149e4a6b9803382cb299c902f4cf58ec9;p=osm%2FSO.git diff --git a/rwlaunchpad/plugins/rwautoscaler/rift/tasklets/rwautoscaler/engine.py b/rwlaunchpad/plugins/rwautoscaler/rift/tasklets/rwautoscaler/engine.py index d71aefcf..3bd2645a 100644 --- a/rwlaunchpad/plugins/rwautoscaler/rift/tasklets/rwautoscaler/engine.py +++ b/rwlaunchpad/plugins/rwautoscaler/rift/tasklets/rwautoscaler/engine.py @@ -1,5 +1,5 @@ -# +# # Copyright 2016 RIFT.IO Inc # # Licensed under the Apache License, Version 2.0 (the "License"); @@ -38,7 +38,8 @@ class TimeSeries: # 0 -> contains a list of all timestamps # 1 -> contains a list of all values. - self._series = numpy.empty(shape=(2, 1), dtype='int64') + # self._series = numpy.empty(shape=(2, 1), dtype='int64') + self._series = numpy.array([[],[]], dtype='int64') self.threshold_time = threshold_time def add_value(self, timestamp, value): @@ -62,7 +63,7 @@ class TimeSeries: def is_window_full(self): """Verify if there is sufficient data for the current window. """ - if len(self._series[0]) <= 2: + if len(self._series[0]) < 2: return False start_time = self._series[0][0] @@ -106,6 +107,7 @@ class ScalingCriteria: log, dts, loop, + project, nsr_id, monp_id, scaling_criteria, @@ -143,6 +145,7 @@ class ScalingCriteria: self.log, self.dts, self.loop, + project, self.nsr_id, self.monp_id, callback=self.add_value) @@ -175,12 +178,13 @@ class ScalingCriteria: """ if self._timeseries.average() >= self.scale_out: - # Enable the scale in limit, only when a scale-out has happened. - self._scl_in_limit_enabled = True + self.log.info("Triggering a scaling-out request for the criteria {}".format( + self.name)) self.delegate.threshold_out_breached(self.name, avg) - elif self._timeseries.average() < self.scale_in and self._scl_in_limit_enabled: - self._scl_in_limit_enabled = False + elif self._timeseries.average() < self.scale_in : + self.log.info("Triggering a scaling-in request for the criteria {}".format( + self.name)) self.delegate.threshold_in_breached(self.name, avg) @@ -202,6 +206,10 @@ class ScalingCriteria: if not self._timeseries.is_window_full(): return + self.log.debug("Sufficient sampling data obtained for criteria {}." + "Checking the scaling condition for the criteria".format( + self.name)) + if not self.delegate: return @@ -211,7 +219,7 @@ class ScalingCriteria: class ScalingPolicy(ScalingCriteria.Delegate): class Delegate: @abc.abstractmethod - def scale_in(self, scaling_group_name, nsr_id): + def scale_in(self, scaling_group_name, nsr_id, instance_id): """Delegate called when all the criteria for scaling-in are met. Args: @@ -236,6 +244,7 @@ class ScalingPolicy(ScalingCriteria.Delegate): log, dts, loop, + project, nsr_id, nsd_id, scaling_group_name, @@ -258,6 +267,7 @@ class ScalingPolicy(ScalingCriteria.Delegate): self.loop = loop self.log = log self.dts = dts + self.project = project self.nsd_id = nsd_id self.nsr_id = nsr_id self.scaling_group_name = scaling_group_name @@ -270,15 +280,25 @@ class ScalingPolicy(ScalingCriteria.Delegate): self.log, self.dts, self.loop, + self.project, self.nsr_id, callback=self.handle_nsr_monp) + self.nsr_scale_sub = monp_subscriber.NsrScalingGroupRecordSubscriber( + self.log, + self.dts, + self.loop, + self.project, + self.nsr_id, + self.scaling_group_name) + self.criteria_store = {} # Timestamp at which the scale-in/scale-out request was generated. self._last_triggered_time = None self.scale_in_status = {cri.name: False for cri in self.scaling_criteria} self.scale_out_status = {cri.name: False for cri in self.scaling_criteria} + self.scale_out_count = 0 def get_nsd_monp_cfg(self, nsr_monp): """Get the NSD's mon-param config. @@ -294,7 +314,7 @@ class ScalingPolicy(ScalingCriteria.Delegate): Args: monp : Yang Model action : rwdts.QueryAction - + """ def handle_create(): if monp.id in self.criteria_store: @@ -307,10 +327,14 @@ class ScalingPolicy(ScalingCriteria.Delegate): # Create a criteria object as soon as the first monitoring data # is published. + self.log.debug("Created a ScalingCriteria monitor for {}".format( + cri.as_dict())) + criteria = ScalingCriteria( self.log, self.dts, self.loop, + self.project, self.nsr_id, monp.id, cri, @@ -365,6 +389,7 @@ class ScalingPolicy(ScalingCriteria.Delegate): @asyncio.coroutine def register(self): yield from self.monp_sub.register() + yield from self.nsr_scale_sub.register() def deregister(self): self.monp_sub.deregister() @@ -380,6 +405,14 @@ class ScalingPolicy(ScalingCriteria.Delegate): return True + def can_trigger_action(self): + if self._is_in_cooldown(): + self.log.debug("In cooldown phase ignoring the scale action ") + return False + + return True + + def threshold_in_breached(self, criteria_name, value): """Delegate callback when scale-in threshold is breached @@ -387,19 +420,46 @@ class ScalingPolicy(ScalingCriteria.Delegate): criteria_name : Criteria name value : Average value """ - if self._is_in_cooldown(): + self.log.debug("Avg value {} has fallen below the threshold limit for " + "{}".format(value, criteria_name)) + + if not self.can_trigger_action(): + return + + if self.scale_out_count < 1: + self.log.debug('There is no scaled-out VNFs at this point. Hence ignoring the scale-in') return self.scale_in_status[criteria_name] = True + self.log.info("Applying {} operation to check if all criteria {} for" + " scale-in-threshold are met".format( + self.scale_out_op, + self.scale_out_status)) statuses = self.scale_in_status.values() is_breached = self.scale_in_op(statuses) if is_breached and self.delegate: - self._last_triggered_time = time.time() - # Reset all statuses - self.scale_in_status = {cri.name: False for cri in self.scaling_criteria} - self.delegate.scale_in(self.scaling_group_name, self.nsr_id) + self.log.info("Triggering a scale-in action for policy {} as " + "all criteria have been met".format(self.name)) + + @asyncio.coroutine + def check_and_scale_in(): + # data = yield from self.nsr_scale_sub.data() + # if len(data) <= 1: + # return + + # # Get an instance ID + # instance_id = data[-1].instance_id + + instance_id = 0 #assigning a value to follow existing scale_in signature + self._last_triggered_time = time.time() + self.scale_out_count -= 1 + # Reset all statuses + self.scale_in_status = {cri.name: False for cri in self.scaling_criteria} + self.delegate.scale_in(self.scaling_group_name, self.nsr_id, instance_id) + + self.loop.create_task(check_and_scale_in()) def threshold_out_breached(self, criteria_name, value): """Delegate callback when scale-out threshold is breached. @@ -407,16 +467,27 @@ class ScalingPolicy(ScalingCriteria.Delegate): criteria_name : Criteria name value : Average value """ - if self._is_in_cooldown(): + self.log.debug("Avg value {} has gone above the threshold limit for " + "{}".format(value, criteria_name)) + + if not self.can_trigger_action(): return self.scale_out_status[criteria_name] = True + self.log.info("Applying {} operation to check if all criteria {} for" + " scale-out-threshold are met".format( + self.scale_out_op, + self.scale_out_status)) + statuses = self.scale_out_status.values() is_breached = self.scale_out_op(statuses) if is_breached and self.delegate: + self.log.info("Triggering a scale-out action for policy {} as " + "all criteria have been met".format(self.name)) self._last_triggered_time = time.time() + self.scale_out_count += 1 # Reset all statuses self.scale_out_status = {cri.name: False for cri in self.scaling_criteria} self.delegate.scale_out(self.scaling_group_name, self.nsr_id)