3 # Copyright 2016 RIFT.IO Inc
5 # Licensed under the Apache License, Version 2.0 (the "License");
6 # you may not use this file except in compliance with the License.
7 # You may obtain a copy of the License at
9 # http://www.apache.org/licenses/LICENSE-2.0
11 # Unless required by applicable law or agreed to in writing, software
12 # distributed under the License is distributed on an "AS IS" BASIS,
13 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 # See the License for the specific language governing permissions and
15 # limitations under the License.
24 from . import scaling_operation
25 from . import subscribers
as monp_subscriber
26 from gi
.repository
import RwDts
as rwdts
27 import rift
.mano
.dts
as subscriber
31 """Convenience class to hold the data for the sliding window size.
def __init__(self, threshold_time):
    """Create an empty sliding-window series.

    Args:
        threshold_time (int): window size in secs
    """
    # Row 0 -> contains all timestamps
    # Row 1 -> contains all values.
    #
    # Start with zero columns. numpy.empty() does not initialise the memory
    # it returns, so a (2, 1) array would seed the series with one garbage
    # sample; if its "timestamp" happened to be large enough it would pass
    # the staleness filter and pollute both the average and the window span.
    self._series = numpy.empty(shape=(2, 0), dtype='int64')
    self.threshold_time = threshold_time
def add_value(self, timestamp, value):
    """Record one sample and discard samples that left the window.

    Args:
        timestamp: sample time; truncated with int() before it is stored
        value: sample value
    """
    now = int(timestamp)

    # Row 0 holds timestamps, row 1 holds values: the new sample is added
    # as one more column.
    grown = numpy.concatenate((self._series, [[now], [value]]), axis=1)

    # Drop off stale values: keep only the columns whose timestamp is not
    # older than the window that ends at the sample just added.
    oldest_allowed = now - self.threshold_time
    still_fresh = grown[0] >= oldest_allowed
    self._series = grown[:, still_fresh]
60 return numpy
.average(self
._series
[1])
def is_window_full(self):
    """Verify if there is sufficient data for the current window.

    Returns:
        bool: True when more than two samples are held and the first and
        last timestamps are at least threshold_time apart, else False.
    """
    stamps = self._series[0]

    # Two samples or fewer are never treated as a full window.
    if len(stamps) <= 2:
        return False

    covered = stamps[-1] - stamps[0]
    return bool(covered >= self.threshold_time)
77 class ScalingCriteria
:
79 """Delegate: callbacks triggered by ScalingCriteris
def threshold_out_breached(self, criteria_name, avg_value):
    """Called when the value has crossed the scale-out-threshold.

    Delegate hook; this base version does nothing.

    Args:
        criteria_name (str): Criteria name
        avg_value (float): The average value of the window.
    """
    return None
def threshold_in_breached(self, criteria_name, avg_value):
    """Called when the value has dropped below the scale-in-threshold.

    Delegate hook; this base version does nothing.

    Args:
        criteria_name (str): Criteria name
        avg_value (float): The average value of the window.
    """
    return None
121 monp_id (str): Monitoring parameter
122 scaling_criteria : Yang data model
123 window_size (int): Length of the window
124 delegate : ScalingCriteria.Delegate
132 self
.sampling_period
= sampling_period
133 self
.window_size
= window_size
134 self
.delegate
= delegate
135 self
.nsr_id
, self
.monp_id
= nsr_id
, monp_id
137 self
._scaling
_criteria
= scaling_criteria
138 self
._timeseries
= TimeSeries(self
.window_size
)
139 # Flag when set, triggers scale-in request.
140 self
._scl
_in
_limit
_enabled
= False
142 self
.nsr_monp_sub
= monp_subscriber
.NsrMonParamSubscriber(
148 callback
=self
.add_value
)
152 return self
._scaling
_criteria
.name
156 return self
._scaling
_criteria
.scale_in_threshold
160 return self
._scaling
_criteria
.scale_out_threshold
164 yield from self
.nsr_monp_sub
.register()
def deregister(self):
    """Deregister the monitoring-parameter subscriber held by this criteria."""
    subscription = self.nsr_monp_sub
    subscription.deregister()
def trigger_action(self, timestamp, avg):
    """Triggers the scale out/in

    The decision is made on the current average of the time series; `avg`
    is only forwarded to the delegate callbacks.

    Args:
        timestamp : time in unix epoch (not used by this method)
        avg : Average of all the values in the window size.
    """
    window_avg = self._timeseries.average()

    if window_avg >= self.scale_out:
        # Enable the scale in limit, only when a scale-out has happened.
        self._scl_in_limit_enabled = True
        self.delegate.threshold_out_breached(self.name, avg)
        return

    # Scale-in is reported at most once per preceding scale-out: the flag
    # is cleared as soon as the breach is handed to the delegate.
    if window_avg < self.scale_in and self._scl_in_limit_enabled:
        self._scl_in_limit_enabled = False
        self.delegate.threshold_in_breached(self.name, avg)
def add_value(self, monp, action):
    """Callback from NsrMonParamSubscriber

    Records the sample in the time series and, once the window holds enough
    data and a delegate is set, evaluates the thresholds.

    Args:
        monp : monitoring parameter sample; its `value_integer` is recorded
        action : rwdts.QueryAction
    """
    # A deleted monitoring parameter carries no sample to record.
    if action == rwdts.QueryAction.DELETE:
        return

    value = monp.value_integer
    timestamp = time.time()

    self._timeseries.add_value(timestamp, value)

    if not self._timeseries.is_window_full():
        return

    if not self.delegate:
        return

    # trigger_action() and the delegate callbacks document their value
    # argument as the window average, so report the average rather than the
    # latest raw sample.
    self.trigger_action(timestamp, self._timeseries.average())
211 class ScalingPolicy(ScalingCriteria
.Delegate
):
def scale_in(self, scaling_group_name, nsr_id):
    """Delegate called when all the criteria for scaling-in are met.

    Delegate hook; this base version does nothing.

    Args:
        scaling_group_name (str): scaling group the request is for
        nsr_id (str): id of the NSR the request is for
    """
    return None
def scale_out(self, scaling_group_name, nsr_id):
    """Delegate called when all the criteria for scaling-out are met.

    Delegate hook; this base version does nothing.

    Args:
        scaling_group_name (str): scaling group the request is for
        nsr_id (str): id of the NSR the request is for
    """
    return None
253 scaling_group_name (str): Scaling group ref
254 scaling_policy : Yang model
255 store (SubscriberStore): Subscriber store instance
256 delegate (None, optional): ScalingPolicy.Delegate
263 self
.scaling_group_name
= scaling_group_name
265 self
._scaling
_policy
= scaling_policy
266 self
.delegate
= delegate
269 self
.monp_sub
= monp_subscriber
.NsrMonParamSubscriber(
274 callback
=self
.handle_nsr_monp
)
276 self
.criteria_store
= {}
278 # Timestamp at which the scale-in/scale-out request was generated.
279 self
._last
_triggered
_time
= None
280 self
.scale_in_status
= {cri
.name
: False for cri
in self
.scaling_criteria
}
281 self
.scale_out_status
= {cri
.name
: False for cri
in self
.scaling_criteria
}
def get_nsd_monp_cfg(self, nsr_monp):
    """Get the NSD's mon-param config.

    Args:
        nsr_monp: NSR monitoring parameter; its `nsd_mon_param_ref` is
            compared against the ids of the NSD's monitoring params.

    Returns:
        The first entry of the NSD's `monitoring_param` whose id matches,
        or None when there is no match.
    """
    descriptor = self.store.get_nsd(self.nsd_id)
    matches = (
        candidate
        for candidate in descriptor.monitoring_param
        if candidate.id == nsr_monp.nsd_mon_param_ref
    )
    return next(matches, None)
291 def handle_nsr_monp(self
, monp
, action
):
292 """Callback for NSR mon-param handler.
296 action : rwdts.QueryAction
300 if monp
.id in self
.criteria_store
:
303 nsd_monp
= self
.get_nsd_monp_cfg(monp
)
304 for cri
in self
.scaling_criteria
:
305 if cri
.ns_monitoring_param_ref
!= nsd_monp
.id:
308 # Create a criteria object as soon as the first monitoring data
310 criteria
= ScalingCriteria(
317 self
.threshold_time
, # window size
320 self
.criteria_store
[monp
.id] = criteria
324 yield from criteria
.register()
326 self
.loop
.create_task(task())
329 if monp
.id in self
.criteria_store
:
330 self
.criteria_store
[monp
.id].deregister()
331 del self
.criteria_store
[monp
.id]
333 if action
in [rwdts
.QueryAction
.CREATE
, rwdts
.QueryAction
.UPDATE
]:
335 elif action
== rwdts
.QueryAction
.DELETE
:
def scaling_criteria(self):
    """Return the `scaling_criteria` entries of the wrapped scaling policy."""
    policy = self._scaling_policy
    return policy.scaling_criteria
def scale_in_op(self):
    """Return the operation for the policy's `scale_in_operation_type`.

    The lookup is delegated to scaling_operation.get_operation().
    """
    return scaling_operation.get_operation(
        self._scaling_policy.scale_in_operation_type)
def scale_out_op(self):
    """Return the operation for the policy's `scale_out_operation_type`.

    The lookup is delegated to scaling_operation.get_operation().
    """
    return scaling_operation.get_operation(
        self._scaling_policy.scale_out_operation_type)
355 return self
._scaling
_policy
.name
def threshold_time(self):
    """Return the `threshold_time` configured on the wrapped scaling policy."""
    policy = self._scaling_policy
    return policy.threshold_time
def cooldown_time(self):
    """Return the `cooldown_time` configured on the wrapped scaling policy."""
    policy = self._scaling_policy
    return policy.cooldown_time
367 yield from self
.monp_sub
.register()
def deregister(self):
    """Deregister the monitoring-parameter subscriber held by this policy."""
    subscription = self.monp_sub
    subscription.deregister()
def _is_in_cooldown(self):
    """Verify if the current policy is in cooldown.

    Returns:
        bool: False when no request has been triggered yet or when at least
        cooldown_time has elapsed since the last one; True otherwise.
    """
    last_fired = self._last_triggered_time
    if not last_fired:
        return False

    elapsed = time.time() - last_fired
    return not (elapsed >= self.cooldown_time)
def threshold_in_breached(self, criteria_name, value):
    """Delegate callback when scale-in threshold is breached

    Marks the criteria as breached, combines all scale-in statuses with the
    policy's scale-in operation and, when the combination holds and a
    delegate is set, requests a scale-in.

    Args:
        criteria_name : Criteria name
        value : Average value (not used by this method)
    """
    # Breaches reported during the cooldown period are ignored.
    if self._is_in_cooldown():
        return

    self.scale_in_status[criteria_name] = True

    combined = self.scale_in_op(self.scale_in_status.values())
    if not (combined and self.delegate):
        return

    self._last_triggered_time = time.time()
    # Start from a clean slate for the next round of breaches.
    self.scale_in_status = dict.fromkeys(
        (cri.name for cri in self.scaling_criteria), False)
    self.delegate.scale_in(self.scaling_group_name, self.nsr_id)
def threshold_out_breached(self, criteria_name, value):
    """Delegate callback when scale-out threshold is breached.

    Marks the criteria as breached, combines all scale-out statuses with
    the policy's scale-out operation and, when the combination holds and a
    delegate is set, requests a scale-out.

    Args:
        criteria_name : Criteria name
        value : Average value (not used by this method)
    """
    # Breaches reported during the cooldown period are ignored.
    if self._is_in_cooldown():
        return

    self.scale_out_status[criteria_name] = True

    combined = self.scale_out_op(self.scale_out_status.values())
    if not (combined and self.delegate):
        return

    self._last_triggered_time = time.time()
    # Start from a clean slate for the next round of breaches.
    self.scale_out_status = dict.fromkeys(
        (cri.name for cri in self.scaling_criteria), False)
    self.delegate.scale_out(self.scaling_group_name, self.nsr_id)