3 # Copyright 2016 RIFT.IO Inc
5 # Licensed under the Apache License, Version 2.0 (the "License");
6 # you may not use this file except in compliance with the License.
7 # You may obtain a copy of the License at
9 # http://www.apache.org/licenses/LICENSE-2.0
11 # Unless required by applicable law or agreed to in writing, software
12 # distributed under the License is distributed on an "AS IS" BASIS,
13 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 # See the License for the specific language governing permissions and
15 # limitations under the License.
24 from . import scaling_operation
25 from . import subscribers
as monp_subscriber
26 from gi
.repository
import RwDts
as rwdts
27 import rift
.mano
.dts
as subscriber
31 """Convenience class to hold the data for the sliding window size.
def __init__(self, threshold_time):
    """Create an empty sliding window.

    Args:
        threshold_time (int): window size in secs
    """
    self.threshold_time = threshold_time

    # Two parallel rows that start out with zero samples:
    #   row 0 -> contains a list of all timestamps
    #   row 1 -> contains a list of all values.
    self._series = numpy.array([[], []], dtype=numpy.int64)
def add_value(self, timestamp, value):
    """Record a sample and evict the samples that fell out of the window.

    Args:
        timestamp: Sample time; truncated to an integer with int().
        value: Sample value.
    """
    now = int(timestamp)
    sample = [[now], [value]]
    grown = numpy.append(self._series, sample, axis=1)

    # Row 0 holds the timestamps and row 1 the values. Keep only the columns
    # whose timestamp is at most threshold_time older than this sample.
    oldest_allowed = now - self.threshold_time
    in_window = grown[0] >= oldest_allowed
    self._series = grown[:, in_window]
61 return numpy
.average(self
._series
[1])
def is_window_full(self):
    """Verify if there is sufficient data for the current window.

    Returns:
        bool: True when at least two samples are held and the oldest and
        newest timestamps are threshold_time or more apart, else False.
    """
    timestamps = self._series[0]
    if len(timestamps) < 2:
        return False

    # NOTE(review): add_value() drops samples older than threshold_time, so
    # this span can reach threshold_time but apparently never exceed it.
    span = timestamps[-1] - timestamps[0]
    return bool(span >= self.threshold_time)
78 class ScalingCriteria
:
"""Delegate: callbacks triggered by ScalingCriteria
def threshold_out_breached(self, criteria_name, avg_value):
    """Called when the value has crossed the scale-out threshold.

    Args:
        criteria_name (str): Criteria name
        avg_value (float): The average value of the window.
    """
def threshold_in_breached(self, criteria_name, avg_value):
    """Called when the value has dropped below the scale-in threshold.

    Args:
        criteria_name (str): Criteria name
        avg_value (float): The average value of the window.
    """
123 monp_id (str): Monitoring parameter
124 scaling_criteria : Yang data model
125 window_size (int): Length of the window
126 delegate : ScalingCriteria.Delegate
134 self
.sampling_period
= sampling_period
135 self
.window_size
= window_size
136 self
.delegate
= delegate
137 self
.nsr_id
, self
.monp_id
= nsr_id
, monp_id
139 self
._scaling
_criteria
= scaling_criteria
140 self
._timeseries
= TimeSeries(self
.window_size
)
141 # Flag when set, triggers scale-in request.
142 self
._scl
_in
_limit
_enabled
= False
144 self
.nsr_monp_sub
= monp_subscriber
.NsrMonParamSubscriber(
151 callback
=self
.add_value
)
155 return self
._scaling
_criteria
.name
159 return self
._scaling
_criteria
.scale_in_threshold
163 return self
._scaling
_criteria
.scale_out_threshold
167 yield from self
.nsr_monp_sub
.register()
def deregister(self):
    """Deregister this criteria's monitoring-parameter subscriber."""
    subscription = self.nsr_monp_sub
    subscription.deregister()
def trigger_action(self, timestamp, avg):
    """Notify the delegate when the window average breaches a threshold.

    The comparison uses the average held by the time series; ``avg`` is only
    forwarded to the delegate callback.

    Args:
        timestamp : time in unix epoch (not read by this method)
        avg : Average of all the values in the window size.
    """
    window_avg = self._timeseries.average()

    if window_avg >= self.scale_out:
        self.log.info("Triggering a scaling-out request for the criteria {}".format(
            self.name))
        self.delegate.threshold_out_breached(self.name, avg)
        return

    if window_avg < self.scale_in:
        self.log.info("Triggering a scaling-in request for the criteria {}".format(
            self.name))
        self.delegate.threshold_in_breached(self.name, avg)
def add_value(self, monp, action):
    """Callback from NsrMonParamSubscriber

    Feeds the new sample into the sliding window and, once the window holds
    enough data and a delegate is set, checks the scaling thresholds.

    Args:
        monp : Yang model; its ``value_integer`` is used as the sample value
        action : rwdts.QueryAction
    """
    # A deleted mon-param carries no new sample.
    if action == rwdts.QueryAction.DELETE:
        return

    value = monp.value_integer
    timestamp = time.time()

    self._timeseries.add_value(timestamp, value)

    if not self._timeseries.is_window_full():
        return

    self.log.debug("Sufficient sampling data obtained for criteria {}."
                   "Checking the scaling condition for the criteria".format(
                       self.name))

    if not self.delegate:
        return

    # trigger_action() documents its second argument as the window average,
    # so hand it the average rather than the raw latest sample.
    self.trigger_action(timestamp, self._timeseries.average())
219 class ScalingPolicy(ScalingCriteria
.Delegate
):
def scale_in(self, scaling_group_name, nsr_id, instance_id):
    """Delegate called when all the criteria for scaling-in are met.

    Args:
        scaling_group_name (str): Scaling group ref of the policy
        nsr_id (str): Id of the NSR the policy belongs to
        instance_id: Instance to scale in. ScalingPolicy currently always
            passes 0 as a placeholder.
    """
def scale_out(self, scaling_group_name, nsr_id):
    """Delegate called when all the criteria for scaling-out are met.

    Args:
        scaling_group_name (str): Scaling group ref of the policy
        nsr_id (str): Id of the NSR the policy belongs to
    """
262 scaling_group_name (str): Scaling group ref
263 scaling_policy : Yang model
264 store (SubscriberStore): Subscriber store instance
265 delegate (None, optional): ScalingPolicy.Delegate
270 self
.project
= project
273 self
.scaling_group_name
= scaling_group_name
275 self
._scaling
_policy
= scaling_policy
276 self
.delegate
= delegate
279 self
.monp_sub
= monp_subscriber
.NsrMonParamSubscriber(
285 callback
=self
.handle_nsr_monp
)
287 self
.nsr_scale_sub
= monp_subscriber
.NsrScalingGroupRecordSubscriber(
293 self
.scaling_group_name
)
295 self
.criteria_store
= {}
297 # Timestamp at which the scale-in/scale-out request was generated.
298 self
._last
_triggered
_time
= None
299 self
.scale_in_status
= {cri
.name
: False for cri
in self
.scaling_criteria
}
300 self
.scale_out_status
= {cri
.name
: False for cri
in self
.scaling_criteria
}
301 self
.scale_out_count
= 0
def get_nsd_monp_cfg(self, nsr_monp):
    """Get the NSD's mon-param config.

    Args:
        nsr_monp: NSR mon-param whose ``nsd_mon_param_ref`` names the NSD
            monitoring param to look up.

    Returns:
        The first NSD monitoring param whose id matches, or None when there
        is no match.
    """
    descriptor = self.store.get_nsd(self.nsd_id)
    matches = (
        candidate
        for candidate in descriptor.monitoring_param
        if candidate.id == nsr_monp.nsd_mon_param_ref
    )
    return next(matches, None)
311 def handle_nsr_monp(self
, monp
, action
):
312 """Callback for NSR mon-param handler.
316 action : rwdts.QueryAction
320 if monp
.id in self
.criteria_store
:
323 nsd_monp
= self
.get_nsd_monp_cfg(monp
)
324 for cri
in self
.scaling_criteria
:
325 if cri
.ns_monitoring_param_ref
!= nsd_monp
.id:
328 # Create a criteria object as soon as the first monitoring data
330 self
.log
.debug("Created a ScalingCriteria monitor for {}".format(
333 criteria
= ScalingCriteria(
341 self
.threshold_time
, # window size
344 self
.criteria_store
[monp
.id] = criteria
348 yield from criteria
.register()
350 self
.loop
.create_task(task())
353 if monp
.id in self
.criteria_store
:
354 self
.criteria_store
[monp
.id].deregister()
355 del self
.criteria_store
[monp
.id]
357 if action
in [rwdts
.QueryAction
.CREATE
, rwdts
.QueryAction
.UPDATE
]:
359 elif action
== rwdts
.QueryAction
.DELETE
:
def scaling_criteria(self):
    """Return the scaling criteria entries of the wrapped scaling policy."""
    policy = self._scaling_policy
    return policy.scaling_criteria
def scale_in_op(self):
    """Return the operation for the policy's scale-in operation type.

    The operation is looked up with scaling_operation.get_operation() from
    the policy's ``scale_in_operation_type``.
    """
    op_type = self._scaling_policy.scale_in_operation_type
    operation = scaling_operation.get_operation(op_type)
    return operation
def scale_out_op(self):
    """Return the operation for the policy's scale-out operation type.

    The operation is looked up with scaling_operation.get_operation() from
    the policy's ``scale_out_operation_type``.
    """
    op_type = self._scaling_policy.scale_out_operation_type
    operation = scaling_operation.get_operation(op_type)
    return operation
379 return self
._scaling
_policy
.name
def threshold_time(self):
    """Return the threshold time configured on the scaling policy."""
    policy = self._scaling_policy
    return policy.threshold_time
def cooldown_time(self):
    """Return the cooldown time configured on the scaling policy."""
    policy = self._scaling_policy
    return policy.cooldown_time
391 yield from self
.monp_sub
.register()
392 yield from self
.nsr_scale_sub
.register()
def deregister(self):
    """Tear down every subscription owned by this policy.

    Deregisters the mon-param subscriber, every ScalingCriteria created for
    a monitoring param (each owns its own subscriber), and the scaling group
    record subscriber, so nothing registered by this policy stays behind.
    """
    self.monp_sub.deregister()

    # Each stored criteria registered its own subscriber when it was created.
    for criteria in list(self.criteria_store.values()):
        criteria.deregister()
    self.criteria_store.clear()

    # register() also registers this subscriber, so release it as well.
    self.nsr_scale_sub.deregister()
def _is_in_cooldown(self):
    """Verify if the current policy is in cooldown.

    Returns:
        bool: False when no scale action has been triggered yet, or when
        cooldown_time or more has elapsed since the last one; True otherwise.
    """
    last_trigger = self._last_triggered_time
    if not last_trigger:
        return False

    elapsed = time.time() - last_trigger
    return elapsed < self.cooldown_time
def can_trigger_action(self):
    """Tell whether a scale action may be triggered right now.

    Returns:
        bool: False (after logging a debug message) while the policy is in
        cooldown, True otherwise.
    """
    cooling_down = self._is_in_cooldown()
    if cooling_down:
        self.log.debug("In cooldown phase ignoring the scale action ")
    return not cooling_down
def threshold_in_breached(self, criteria_name, value):
    """Delegate callback when scale-in threshold is breached

    Marks the criteria as breached and, when the policy's scale-in operation
    over all criteria statuses is satisfied and a delegate is set, schedules
    the scale-in on the event loop.

    Args:
        criteria_name : Criteria name
        value : Average value
    """
    self.log.debug("Avg value {} has fallen below the threshold limit for "
                   "{}".format(value, criteria_name))

    if not self.can_trigger_action():
        return

    # Nothing to scale in unless an earlier scale-out is still outstanding.
    if self.scale_out_count < 1:
        self.log.debug('There is no scaled-out VNFs at this point. Hence ignoring the scale-in')
        return

    self.scale_in_status[criteria_name] = True
    # Report the scale-in operation and statuses (not the scale-out ones),
    # since those are what the check below evaluates.
    self.log.info("Applying {} operation to check if all criteria {} for"
                  " scale-in-threshold are met".format(
                      self.scale_in_op,
                      self.scale_in_status))

    statuses = self.scale_in_status.values()
    is_breached = self.scale_in_op(statuses)

    if is_breached and self.delegate:
        self.log.info("Triggering a scale-in action for policy {} as "
                      "all criteria have been met".format(self.name))

        def check_and_scale_in():
            # Placeholder instance id, kept to follow the existing
            # scale_in signature.
            instance_id = 0
            self._last_triggered_time = time.time()
            self.scale_out_count -= 1

            # Start the next evaluation round from a clean slate.
            self.scale_in_status = {cri.name: False for cri in self.scaling_criteria}
            self.delegate.scale_in(self.scaling_group_name, self.nsr_id, instance_id)

        # The helper never waits on anything, so a plain callback on the
        # next loop iteration is enough; no coroutine wrapper is needed.
        self.loop.call_soon(check_and_scale_in)
def threshold_out_breached(self, criteria_name, value):
    """Delegate callback when scale-out threshold is breached.

    Marks the criteria as breached and, when the policy's scale-out
    operation over all criteria statuses is satisfied and a delegate is set,
    records the trigger and asks the delegate to scale out.

    Args:
        criteria_name : Criteria name
        value : Average value
    """
    self.log.debug("Avg value {} has gone above the threshold limit for "
                   "{}".format(value, criteria_name))

    if not self.can_trigger_action():
        return

    self.scale_out_status[criteria_name] = True

    self.log.info("Applying {} operation to check if all criteria {} for"
                  " scale-out-threshold are met".format(
                      self.scale_out_op,
                      self.scale_out_status))

    breached = self.scale_out_op(self.scale_out_status.values())
    if not (breached and self.delegate):
        return

    self.log.info("Triggering a scale-out action for policy {} as "
                  "all criteria have been met".format(self.name))
    self._last_triggered_time = time.time()
    self.scale_out_count += 1

    # Reset every criteria flag before handing over to the delegate.
    self.scale_out_status = dict.fromkeys(
        (cri.name for cri in self.scaling_criteria), False)
    self.delegate.scale_out(self.scaling_group_name, self.nsr_id)