d71aefcf1fa114104035f4f1f7addc26bb09167d
[osm/SO.git] / rwlaunchpad / plugins / rwautoscaler / rift / tasklets / rwautoscaler / engine.py
1
2 #
3 # Copyright 2016 RIFT.IO Inc
4 #
5 # Licensed under the Apache License, Version 2.0 (the "License");
6 # you may not use this file except in compliance with the License.
7 # You may obtain a copy of the License at
8 #
9 # http://www.apache.org/licenses/LICENSE-2.0
10 #
11 # Unless required by applicable law or agreed to in writing, software
12 # distributed under the License is distributed on an "AS IS" BASIS,
13 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 # See the License for the specific language governing permissions and
15 # limitations under the License.
16 #
17
18 import abc
19 import asyncio
20 import time
21
22 import numpy
23
24 from . import scaling_operation
25 from . import subscribers as monp_subscriber
26 from gi.repository import RwDts as rwdts
27 import rift.mano.dts as subscriber
28
29
class TimeSeries:
    """Sliding window of (timestamp, value) samples.

    The samples are kept in a 2 x N numpy array:
        row 0 -> timestamps (truncated to whole seconds by add_value)
        row 1 -> values
    Samples older than ``threshold_time`` seconds, measured from the most
    recently added timestamp, are discarded on every insertion.
    """
    def __init__(self, threshold_time):
        """
        Args:
            threshold_time (int): window size in secs
        """
        # Start with zero columns. numpy.empty() does not initialise the
        # memory it returns, so pre-allocating a column would inject an
        # arbitrary (timestamp, value) sample into the window.
        self._series = numpy.empty(shape=(2, 0), dtype='int64')
        self.threshold_time = threshold_time

    def add_value(self, timestamp, value):
        """Append a sample and drop every sample that left the window.

        Args:
            timestamp (int or float): Sample time in secs, truncated to int
            value: Sample value
        """
        timestamp = int(timestamp)

        self._series = numpy.append(
                self._series,
                [[timestamp], [value]],
                axis=1)

        # Keep only the columns whose timestamp (row 0) is still inside the
        # window that ends at the sample just added.
        in_window = self._series[0] >= (timestamp - self.threshold_time)
        self._series = self._series[:, in_window]

    def average(self):
        """Return the average of all the values currently in the window."""
        return numpy.average(self._series[1])

    def is_window_full(self):
        """Verify if there is sufficient data for the current window.

        Returns:
            bool: True when more than two samples are held and the time
                between the oldest and the newest of them is at least
                ``threshold_time`` secs.
        """
        timestamps = self._series[0]
        if len(timestamps) <= 2:
            return False

        elapsed = timestamps[-1] - timestamps[0]
        return bool(elapsed >= self.threshold_time)
75
76
class ScalingCriteria:
    """Tracks one monitoring parameter against a scaling criteria.

    Every value published for the monitoring parameter is added to a sliding
    window; once the window is full its average is compared with the
    scale-out / scale-in thresholds and the delegate is notified.
    """

    class Delegate:
        """Delegate: callbacks triggered by ScalingCriteria

        NOTE: this class does not use abc.ABCMeta, so the abstractmethod
        markers below are informational and are not enforced by Python.
        """
        @abc.abstractmethod
        def threshold_out_breached(self, criteria_name, avg_value):
            """Called when the value has crossed the scale-out-threshold

            Args:
                criteria_name (str): Criteria name
                avg_value (float): The average value of the window.

            """
            pass

        @abc.abstractmethod
        def threshold_in_breached(self, criteria_name, avg_value):
            """Called when the value has dropped below the scale-in-threshold

            Args:
                criteria_name (str): Criteria name
                avg_value (float): The average value of the window.

            """
            pass

    def __init__(
            self,
            log,
            dts,
            loop,
            nsr_id,
            monp_id,
            scaling_criteria,
            window_size,
            sampling_period=1,
            delegate=None):
        """
        Args:
            log : Log
            dts : DTS handle
            loop : Event Handle
            nsr_id (str): NSR ID
            monp_id (str): Monitoring parameter
            scaling_criteria : Yang data model
            window_size (int): Length of the window
            sampling_period (int, optional): Stored on the instance; not
                otherwise used by this class.
            delegate : ScalingCriteria.Delegate
        """
        self.log = log
        self.dts = dts
        self.loop = loop
        self.sampling_period = sampling_period
        self.window_size = window_size
        self.delegate = delegate
        self.nsr_id, self.monp_id = nsr_id, monp_id

        self._scaling_criteria = scaling_criteria
        self._timeseries = TimeSeries(self.window_size)
        # Flag when set, triggers scale-in request.
        self._scl_in_limit_enabled = False

        # Every value published for the monitoring parameter is delivered to
        # add_value().
        self.nsr_monp_sub = monp_subscriber.NsrMonParamSubscriber(
                self.log,
                self.dts,
                self.loop,
                self.nsr_id,
                self.monp_id,
                callback=self.add_value)

    @property
    def name(self):
        """Name of the scaling criteria (from the Yang model)."""
        return self._scaling_criteria.name

    @property
    def scale_in(self):
        """Scale-in threshold (from the Yang model)."""
        return self._scaling_criteria.scale_in_threshold

    @property
    def scale_out(self):
        """Scale-out threshold (from the Yang model)."""
        return self._scaling_criteria.scale_out_threshold

    def register(self):
        """Register the monitoring parameter subscriber.

        Returns:
            Whatever the subscriber's register() returns, to be driven by the
            caller with ``yield from`` / ``await`` or scheduled on the loop.

        The result is handed back directly rather than being wrapped with
        ``asyncio.coroutine``: that decorator was removed in Python 3.11 and
        its use in the class body prevents the module from importing there.
        Assumes the subscriber's register() returns a coroutine -- verify
        against the subscribers module.
        """
        return self.nsr_monp_sub.register()

    def deregister(self):
        """Deregister the monitoring parameter subscriber."""
        self.nsr_monp_sub.deregister()

    def trigger_action(self, timestamp, avg):
        """Triggers the scale out/in

        The decision is taken on the current window average; ``avg`` is the
        value reported to the delegate.

        Args:
            timestamp : time in unix epoch
            avg : Average of all the values in the window size.

        """
        if not self.delegate:
            # Nobody to notify; leave the scale-in flag untouched.
            return

        window_avg = self._timeseries.average()

        if window_avg >= self.scale_out:
            # Enable the scale in limit, only when a scale-out has happened.
            self._scl_in_limit_enabled = True
            self.delegate.threshold_out_breached(self.name, avg)

        elif window_avg < self.scale_in and self._scl_in_limit_enabled:
            self._scl_in_limit_enabled = False
            self.delegate.threshold_in_breached(self.name, avg)

    def add_value(self, monp, action):
        """Callback from NsrMonParamSubscriber

        Adds the published value to the window, stamped with the current
        time, and triggers the scaling decision once the window is full.

        Args:
            monp : Yang model
            action : rwdts.QueryAction
        """
        if action == rwdts.QueryAction.DELETE:
            return

        timestamp = time.time()
        self._timeseries.add_value(timestamp, monp.value_integer)

        if not self._timeseries.is_window_full():
            return

        if not self.delegate:
            return

        # Report the window average (not the latest raw sample) so that the
        # delegate receives the value its contract documents.
        self.trigger_action(timestamp, self._timeseries.average())
209
210
class ScalingPolicy(ScalingCriteria.Delegate):
    """Scaling policy of a scaling group of an NSR.

    Subscribes to the NSR's monitoring parameters, creates a ScalingCriteria
    for each parameter referenced by the policy, and combines the breach
    notifications of those criteria (using the policy's scale-in / scale-out
    operation) to decide when the delegate must scale in or out.
    """

    class Delegate:
        """Delegate: callbacks triggered by ScalingPolicy

        NOTE: this class does not use abc.ABCMeta, so the abstractmethod
        markers below are informational and are not enforced by Python.
        """
        @abc.abstractmethod
        def scale_in(self, scaling_group_name, nsr_id):
            """Delegate called when all the criteria for scaling-in are met.

            Args:
                scaling_group_name (str): Scaling group ref
                nsr_id (str): NSR id

            """
            pass

        @abc.abstractmethod
        def scale_out(self, scaling_group_name, nsr_id):
            """Delegate called when all the criteria for scaling-out are met.

            Args:
                scaling_group_name (str): Scaling group ref
                nsr_id (str): NSR id
            """
            pass

    def __init__(
            self,
            log,
            dts,
            loop,
            nsr_id,
            nsd_id,
            scaling_group_name,
            scaling_policy,
            store,
            delegate=None):
        """

        Args:
            log : Log
            dts : DTS handle
            loop : Event loop
            nsr_id (str): NSR id
            nsd_id (str): NSD id
            scaling_group_name (str): Scaling group ref
            scaling_policy : Yang model
            store (SubscriberStore): Subscriber store instance
            delegate (None, optional): ScalingPolicy.Delegate
        """
        self.loop = loop
        self.log = log
        self.dts = dts
        self.nsd_id = nsd_id
        self.nsr_id = nsr_id
        self.scaling_group_name = scaling_group_name

        self._scaling_policy = scaling_policy
        self.delegate = delegate
        self.store = store

        # Subscribed without a monitoring parameter id; every notification
        # is routed to handle_nsr_monp().
        self.monp_sub = monp_subscriber.NsrMonParamSubscriber(
                self.log,
                self.dts,
                self.loop,
                self.nsr_id,
                callback=self.handle_nsr_monp)

        # NSR monitoring parameter id -> ScalingCriteria
        self.criteria_store = {}

        # Timestamp at which the scale-in/scale-out request was generated.
        self._last_triggered_time = None
        self.scale_in_status = self._new_status()
        self.scale_out_status = self._new_status()

    def _new_status(self):
        """Return a status map with every criteria of the policy unset."""
        return {cri.name: False for cri in self.scaling_criteria}

    def get_nsd_monp_cfg(self, nsr_monp):
        """Get the NSD's mon-param config.

        Args:
            nsr_monp : NSR monitoring parameter (Yang model)

        Returns:
            The NSD monitoring parameter whose id matches
            ``nsr_monp.nsd_mon_param_ref``, or None when there is no match.
        """
        nsd = self.store.get_nsd(self.nsd_id)
        if nsd is None:
            # Presumably the store returns None for an unknown NSD id --
            # guard defensively; verify against SubscriberStore.
            return None

        for monp in nsd.monitoring_param:
            if monp.id == nsr_monp.nsd_mon_param_ref:
                return monp

        return None

    def _handle_monp_create(self, monp):
        """Create and register the criteria tracking a new NSR mon-param."""
        if monp.id in self.criteria_store:
            return

        nsd_monp = self.get_nsd_monp_cfg(monp)
        if nsd_monp is None:
            # No NSD configuration for this parameter: nothing to compare
            # the policy's criteria against.
            return

        for cri in self.scaling_criteria:
            if cri.ns_monitoring_param_ref != nsd_monp.id:
                continue

            # Create a criteria object as soon as the first monitoring data
            # is published.
            criteria = ScalingCriteria(
                    self.log,
                    self.dts,
                    self.loop,
                    self.nsr_id,
                    monp.id,
                    cri,
                    self.threshold_time,  # window size
                    delegate=self)

            self.criteria_store[monp.id] = criteria

            # register() is called here, while ``criteria`` still refers to
            # the object just created. Deferring the call to a nested
            # coroutine would look the name up only when the task runs, by
            # which time the loop may have rebound it.
            self.loop.create_task(criteria.register())

    def _handle_monp_delete(self, monp):
        """Deregister and forget the criteria of a deleted NSR mon-param."""
        criteria = self.criteria_store.get(monp.id)
        if criteria is None:
            return

        criteria.deregister()
        del self.criteria_store[monp.id]

    def handle_nsr_monp(self, monp, action):
        """Callback for NSR mon-param handler.

        Args:
            monp : Yang Model
            action : rwdts.QueryAction

        """
        if action in (rwdts.QueryAction.CREATE, rwdts.QueryAction.UPDATE):
            self._handle_monp_create(monp)
        elif action == rwdts.QueryAction.DELETE:
            self._handle_monp_delete(monp)

    @property
    def scaling_criteria(self):
        """Scaling criteria entries of the policy (from the Yang model)."""
        return self._scaling_policy.scaling_criteria

    @property
    def scale_in_op(self):
        """Operation combining the per-criteria scale-in statuses."""
        optype = self._scaling_policy.scale_in_operation_type
        return scaling_operation.get_operation(optype)

    @property
    def scale_out_op(self):
        """Operation combining the per-criteria scale-out statuses."""
        optype = self._scaling_policy.scale_out_operation_type
        return scaling_operation.get_operation(optype)

    @property
    def name(self):
        """Name of the scaling policy (from the Yang model)."""
        return self._scaling_policy.name

    @property
    def threshold_time(self):
        """Threshold time of the policy; used as the criteria window size."""
        return self._scaling_policy.threshold_time

    @property
    def cooldown_time(self):
        """Cooldown time of the policy (from the Yang model)."""
        return self._scaling_policy.cooldown_time

    def register(self):
        """Register the NSR monitoring parameter subscriber.

        Returns:
            Whatever the subscriber's register() returns, to be driven by the
            caller with ``yield from`` / ``await`` or scheduled on the loop.

        The result is handed back directly rather than being wrapped with
        ``asyncio.coroutine``: that decorator was removed in Python 3.11 and
        its use in the class body prevents the module from importing there.
        Assumes the subscriber's register() returns a coroutine -- verify
        against the subscribers module.
        """
        return self.monp_sub.register()

    def deregister(self):
        """Deregister the NSR monitoring parameter subscriber."""
        self.monp_sub.deregister()

    def _is_in_cooldown(self):
        """Verify if the current policy is in cooldown.

        Returns:
            bool: True when a scaling request was generated less than
                ``cooldown_time`` secs ago.
        """
        if self._last_triggered_time is None:
            return False

        elapsed = time.time() - self._last_triggered_time
        return elapsed < self.cooldown_time

    def threshold_in_breached(self, criteria_name, value):
        """Delegate callback when scale-in threshold is breached

        Args:
            criteria_name : Criteria name
            value : Average value
        """
        if self._is_in_cooldown():
            return

        self.scale_in_status[criteria_name] = True

        statuses = self.scale_in_status.values()
        is_breached = self.scale_in_op(statuses)

        if is_breached and self.delegate:
            self._last_triggered_time = time.time()
            # Reset all statuses
            self.scale_in_status = self._new_status()
            self.delegate.scale_in(self.scaling_group_name, self.nsr_id)

    def threshold_out_breached(self, criteria_name, value):
        """Delegate callback when scale-out threshold is breached.

        Args:
            criteria_name : Criteria name
            value : Average value
        """
        if self._is_in_cooldown():
            return

        self.scale_out_status[criteria_name] = True

        statuses = self.scale_out_status.values()
        is_breached = self.scale_out_op(statuses)

        if is_breached and self.delegate:
            self._last_triggered_time = time.time()
            # Reset all statuses
            self.scale_out_status = self._new_status()
            self.delegate.scale_out(self.scaling_group_name, self.nsr_id)