-#
+#
# Copyright 2016 RIFT.IO Inc
#
# Licensed under the Apache License, Version 2.0 (the "License");
# 0 -> contains a list of all timestamps
# 1 -> contains a list of all values.
- self._series = numpy.empty(shape=(2, 1), dtype='int64')
+ # self._series = numpy.empty(shape=(2, 1), dtype='int64')
+ self._series = numpy.array([[],[]], dtype='int64')
self.threshold_time = threshold_time
def add_value(self, timestamp, value):
def is_window_full(self):
"""Verify if there is sufficient data for the current window.
"""
- if len(self._series[0]) <= 2:
+ if len(self._series[0]) < 2:
return False
start_time = self._series[0][0]
log,
dts,
loop,
+ project,
nsr_id,
monp_id,
scaling_criteria,
self.log,
self.dts,
self.loop,
+ project,
self.nsr_id,
self.monp_id,
callback=self.add_value)
"""
if self._timeseries.average() >= self.scale_out:
- # Enable the scale in limit, only when a scale-out has happened.
- self._scl_in_limit_enabled = True
+ self.log.info("Triggering a scaling-out request for the criteria {}".format(
+ self.name))
self.delegate.threshold_out_breached(self.name, avg)
- elif self._timeseries.average() < self.scale_in and self._scl_in_limit_enabled:
- self._scl_in_limit_enabled = False
+ elif self._timeseries.average() < self.scale_in :
+ self.log.info("Triggering a scaling-in request for the criteria {}".format(
+ self.name))
self.delegate.threshold_in_breached(self.name, avg)
if not self._timeseries.is_window_full():
return
+ self.log.debug("Sufficient sampling data obtained for criteria {}."
+ "Checking the scaling condition for the criteria".format(
+ self.name))
+
if not self.delegate:
return
class ScalingPolicy(ScalingCriteria.Delegate):
class Delegate:
@abc.abstractmethod
- def scale_in(self, scaling_group_name, nsr_id):
+ def scale_in(self, scaling_group_name, nsr_id, instance_id):
"""Delegate called when all the criteria for scaling-in are met.
Args:
log,
dts,
loop,
+ project,
nsr_id,
nsd_id,
scaling_group_name,
self.loop = loop
self.log = log
self.dts = dts
+ self.project = project
self.nsd_id = nsd_id
self.nsr_id = nsr_id
self.scaling_group_name = scaling_group_name
self.log,
self.dts,
self.loop,
+ self.project,
self.nsr_id,
callback=self.handle_nsr_monp)
+ self.nsr_scale_sub = monp_subscriber.NsrScalingGroupRecordSubscriber(
+ self.log,
+ self.dts,
+ self.loop,
+ self.project,
+ self.nsr_id,
+ self.scaling_group_name)
+
self.criteria_store = {}
# Timestamp at which the scale-in/scale-out request was generated.
self._last_triggered_time = None
self.scale_in_status = {cri.name: False for cri in self.scaling_criteria}
self.scale_out_status = {cri.name: False for cri in self.scaling_criteria}
+ self.scale_out_count = 0
def get_nsd_monp_cfg(self, nsr_monp):
"""Get the NSD's mon-param config.
Args:
monp : Yang Model
action : rwdts.QueryAction
-
+
"""
def handle_create():
if monp.id in self.criteria_store:
# Create a criteria object as soon as the first monitoring data
# is published.
+ self.log.debug("Created a ScalingCriteria monitor for {}".format(
+ cri.as_dict()))
+
criteria = ScalingCriteria(
self.log,
self.dts,
self.loop,
+ self.project,
self.nsr_id,
monp.id,
cri,
@asyncio.coroutine
def register(self):
yield from self.monp_sub.register()
+ yield from self.nsr_scale_sub.register()
def deregister(self):
self.monp_sub.deregister()
return True
+ def can_trigger_action(self):
+ if self._is_in_cooldown():
+ self.log.debug("In cooldown phase ignoring the scale action ")
+ return False
+
+ return True
+
+
def threshold_in_breached(self, criteria_name, value):
"""Delegate callback when scale-in threshold is breached
criteria_name : Criteria name
value : Average value
"""
- if self._is_in_cooldown():
+ self.log.debug("Avg value {} has fallen below the threshold limit for "
+ "{}".format(value, criteria_name))
+
+ if not self.can_trigger_action():
+ return
+
+ if self.scale_out_count < 1:
+ self.log.debug('There is no scaled-out VNFs at this point. Hence ignoring the scale-in')
return
self.scale_in_status[criteria_name] = True
+ self.log.info("Applying {} operation to check if all criteria {} for"
+ " scale-in-threshold are met".format(
+ self.scale_out_op,
+ self.scale_out_status))
statuses = self.scale_in_status.values()
is_breached = self.scale_in_op(statuses)
if is_breached and self.delegate:
- self._last_triggered_time = time.time()
- # Reset all statuses
- self.scale_in_status = {cri.name: False for cri in self.scaling_criteria}
- self.delegate.scale_in(self.scaling_group_name, self.nsr_id)
+ self.log.info("Triggering a scale-in action for policy {} as "
+ "all criteria have been met".format(self.name))
+
+ @asyncio.coroutine
+ def check_and_scale_in():
+ # data = yield from self.nsr_scale_sub.data()
+ # if len(data) <= 1:
+ # return
+
+ # # Get an instance ID
+ # instance_id = data[-1].instance_id
+
+ instance_id = 0 #assigning a value to follow existing scale_in signature
+ self._last_triggered_time = time.time()
+ self.scale_out_count -= 1
+ # Reset all statuses
+ self.scale_in_status = {cri.name: False for cri in self.scaling_criteria}
+ self.delegate.scale_in(self.scaling_group_name, self.nsr_id, instance_id)
+
+ self.loop.create_task(check_and_scale_in())
def threshold_out_breached(self, criteria_name, value):
"""Delegate callback when scale-out threshold is breached.
criteria_name : Criteria name
value : Average value
"""
- if self._is_in_cooldown():
+ self.log.debug("Avg value {} has gone above the threshold limit for "
+ "{}".format(value, criteria_name))
+
+ if not self.can_trigger_action():
return
self.scale_out_status[criteria_name] = True
+ self.log.info("Applying {} operation to check if all criteria {} for"
+ " scale-out-threshold are met".format(
+ self.scale_out_op,
+ self.scale_out_status))
+
statuses = self.scale_out_status.values()
is_breached = self.scale_out_op(statuses)
if is_breached and self.delegate:
+ self.log.info("Triggering a scale-out action for policy {} as "
+ "all criteria have been met".format(self.name))
self._last_triggered_time = time.time()
+ self.scale_out_count += 1
# Reset all statuses
self.scale_out_status = {cri.name: False for cri in self.scaling_criteria}
self.delegate.scale_out(self.scaling_group_name, self.nsr_id)