Merge from OSM SO master
[osm/SO.git] / rwlaunchpad / plugins / rwautoscaler / rift / tasklets / rwautoscaler / engine.py
1
2 #
3 # Copyright 2016 RIFT.IO Inc
4 #
5 # Licensed under the Apache License, Version 2.0 (the "License");
6 # you may not use this file except in compliance with the License.
7 # You may obtain a copy of the License at
8 #
9 # http://www.apache.org/licenses/LICENSE-2.0
10 #
11 # Unless required by applicable law or agreed to in writing, software
12 # distributed under the License is distributed on an "AS IS" BASIS,
13 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 # See the License for the specific language governing permissions and
15 # limitations under the License.
16 #
17
18 import abc
19 import asyncio
20 import time
21
22 import numpy
23
24 from . import scaling_operation
25 from . import subscribers as monp_subscriber
26 from gi.repository import RwDts as rwdts
27 import rift.mano.dts as subscriber
28
29
class TimeSeries:
    """Convenience class to hold the data for the sliding window size.

    Samples are kept in a 2 x N numpy array: row 0 holds the timestamps
    (truncated to whole seconds) and row 1 holds the matching values.
    Every call to add_value() drops the samples that are older than
    ``threshold_time`` seconds relative to the sample being added.
    """

    def __init__(self, threshold_time):
        """
        Args:
            threshold_time (int): window size in secs
        """
        # 0 -> contains a list of all timestamps
        # 1 -> contains a list of all values.
        #
        # Start with zero columns. numpy.empty() does not initialise its
        # memory, so a (2, 1) array would seed the window with one garbage
        # (timestamp, value) sample.
        self._series = numpy.empty(shape=(2, 0), dtype='int64')
        self.threshold_time = threshold_time

    def add_value(self, timestamp, value):
        """Append a sample and discard the ones that left the window.

        Args:
            timestamp (float): Sample time in unix epoch, truncated to int.
            value : Sample value.
        """
        timestamp = int(timestamp)

        self._series = numpy.append(
            self._series,
            [[timestamp], [value]],
            axis=1)

        # Keep only the columns whose timestamp (row 0) is still inside the
        # window that ends at the sample just added.
        in_window = self._series[0] >= (timestamp - self.threshold_time)
        self._series = self._series[:, in_window]

    def average(self):
        """Return the mean of the values currently in the window.

        Returns:
            float: The mean, or NaN when no sample has been added yet.
        """
        if self._series.shape[1] == 0:
            # numpy.average() on an empty slice also yields NaN, but emits a
            # RuntimeWarning on the way.
            return float('nan')

        return numpy.average(self._series[1])

    def is_window_full(self):
        """Verify if there is sufficient data for the current window.

        Returns:
            bool: True when more than two samples are held and the oldest
                and newest of them are at least ``threshold_time`` apart.
        """
        # NOTE(review): add_value() prunes everything older than
        # threshold_time, so the span can never exceed threshold_time; the
        # window is reported full only when a retained sample is exactly
        # threshold_time (in whole seconds) older than the newest one.
        if len(self._series[0]) <= 2:
            return False

        start_time = self._series[0][0]
        end_time = self._series[0][-1]

        return bool((end_time - start_time) >= self.threshold_time)
75
76
class ScalingCriteria:
    """Watches one NSR monitoring parameter against a scaling criteria.

    Every value published for the monitoring parameter is pushed into a
    sliding window (TimeSeries). Once the window is full, its average is
    compared with the criteria's scale-out / scale-in thresholds and the
    delegate is notified when one of them is breached.
    """

    class Delegate:
        """Delegate: callbacks triggered by ScalingCriteria

        NOTE: this class does not use abc.ABCMeta, so the abstractmethod
        markers below are informational and not enforced at instantiation.
        """
        @abc.abstractmethod
        def threshold_out_breached(self, criteria_name, avg_value):
            """Called when the value has crossed the scale-out-threshold

            Args:
                criteria_name (str): Criteria name
                avg_value (float): The average value of the window.

            """
            pass

        @abc.abstractmethod
        def threshold_in_breached(self, criteria_name, avg_value):
            """Called when the value has dropped below the scale-in-threshold

            Args:
                criteria_name (str): Criteria name
                avg_value (float): The average value of the window.

            """
            pass

    def __init__(
            self,
            log,
            dts,
            loop,
            project,
            nsr_id,
            monp_id,
            scaling_criteria,
            window_size,
            sampling_period=1,
            delegate=None):
        """
        Args:
            log : Log
            dts : DTS handle
            loop : Event Handle
            project : Project handle, passed through to the
                NsrMonParamSubscriber
            nsr_id (str): NSR ID
            monp_id (str): Monitoring parameter
            scaling_criteria : Yang data model
            window_size (int): Length of the window
            sampling_period (int, optional): Stored on the instance; not
                otherwise used by this class.
            delegate : ScalingCriteria.Delegate
        """
        self.log = log
        self.dts = dts
        self.loop = loop
        self.sampling_period = sampling_period
        self.window_size = window_size
        self.delegate = delegate
        self.nsr_id, self.monp_id = nsr_id, monp_id

        self._scaling_criteria = scaling_criteria
        self._timeseries = TimeSeries(self.window_size)
        # Flag when set, triggers scale-in request.
        self._scl_in_limit_enabled = False

        self.nsr_monp_sub = monp_subscriber.NsrMonParamSubscriber(
            self.log,
            self.dts,
            self.loop,
            project,
            self.nsr_id,
            self.monp_id,
            callback=self.add_value)

    @property
    def name(self):
        """Name of the scaling criteria (from the Yang model)."""
        return self._scaling_criteria.name

    @property
    def scale_in(self):
        """Scale-in threshold (from the Yang model)."""
        return self._scaling_criteria.scale_in_threshold

    @property
    def scale_out(self):
        """Scale-out threshold (from the Yang model)."""
        return self._scaling_criteria.scale_out_threshold

    # NOTE: asyncio.coroutine (generator-based coroutines) was removed in
    # Python 3.11; this file targets the older asyncio API.
    @asyncio.coroutine
    def register(self):
        """Register the underlying monitoring-param subscriber."""
        yield from self.nsr_monp_sub.register()

    def deregister(self):
        """Deregister the underlying monitoring-param subscriber."""
        self.nsr_monp_sub.deregister()

    def trigger_action(self, timestamp, avg):
        """Triggers the scale out/in

        The decision is taken on the current window average; ``avg`` is the
        value reported to the delegate.

        Args:
            timestamp : time in unix epoch
            avg : Average of all the values in the window size.

        """
        window_avg = self._timeseries.average()

        if window_avg >= self.scale_out:
            # Enable the scale in limit, only when a scale-out has happened.
            self._scl_in_limit_enabled = True
            self.delegate.threshold_out_breached(self.name, avg)

        elif window_avg < self.scale_in and self._scl_in_limit_enabled:
            self._scl_in_limit_enabled = False
            self.delegate.threshold_in_breached(self.name, avg)

    def _process_sample(self, timestamp, value):
        """Add one sample to the window and notify the delegate if needed.

        Args:
            timestamp : time in unix epoch
            value : Sample value
        """
        self._timeseries.add_value(timestamp, value)

        if not self._timeseries.is_window_full():
            return

        if not self.delegate:
            return

        # Report the window average, as documented by the delegate API,
        # rather than the latest raw sample.
        self.trigger_action(timestamp, self._timeseries.average())

    def add_value(self, monp, action):
        """Callback from NsrMonParamSubscriber

        Args:
            monp : Yang model
            action : rwdts.QueryAction
        """
        if action == rwdts.QueryAction.DELETE:
            return

        # NOTE(review): only the integer value of the monitoring param is
        # read here; confirm decimal-typed params are not expected.
        self._process_sample(time.time(), monp.value_integer)
211
212
class ScalingPolicy(ScalingCriteria.Delegate):
    """Aggregates the scaling criteria of one scaling policy.

    A ScalingCriteria object is created for every NSR monitoring parameter
    referenced by the policy. Their threshold callbacks are combined with
    the policy's scale-in / scale-out operation, and the policy delegate is
    asked to scale when the combined result is met and the policy is not in
    cooldown.
    """

    class Delegate:
        @abc.abstractmethod
        def scale_in(self, scaling_group_name, nsr_id):
            """Delegate called when all the criteria for scaling-in are met.

            Args:
                scaling_group_name (str): Scaling group name
                nsr_id (str): NSR id

            """
            pass

        @abc.abstractmethod
        def scale_out(self, scaling_group_name, nsr_id):
            """Delegate called when all the criteria for scaling-out are met.

            Args:
                scaling_group_name (str): Scaling group name
                nsr_id (str): NSR id
            """
            pass

    def __init__(
            self,
            log,
            dts,
            loop,
            project,
            nsr_id,
            nsd_id,
            scaling_group_name,
            scaling_policy,
            store,
            delegate=None):
        """

        Args:
            log : Log
            dts : DTS handle
            loop : Event loop
            project : Project handle, passed through to the subscribers
            nsr_id (str): NSR id
            nsd_id (str): NSD id
            scaling_group_name (str): Scaling group ref
            scaling_policy : Yang model
            store (SubscriberStore): Subscriber store instance
            delegate (None, optional): ScalingPolicy.Delegate
        """
        self.loop = loop
        self.log = log
        self.dts = dts
        self.project = project
        self.nsd_id = nsd_id
        self.nsr_id = nsr_id
        self.scaling_group_name = scaling_group_name

        self._scaling_policy = scaling_policy
        self.delegate = delegate
        self.store = store

        self.monp_sub = monp_subscriber.NsrMonParamSubscriber(
            self.log,
            self.dts,
            self.loop,
            self.project,
            self.nsr_id,
            callback=self.handle_nsr_monp)

        # NSR mon-param id -> ScalingCriteria
        self.criteria_store = {}

        # Timestamp at which the scale-in/scale-out request was generated.
        self._last_triggered_time = None
        self.scale_in_status = {cri.name: False for cri in self.scaling_criteria}
        self.scale_out_status = {cri.name: False for cri in self.scaling_criteria}

    def get_nsd_monp_cfg(self, nsr_monp):
        """Get the NSD's mon-param config.

        Args:
            nsr_monp : NSR monitoring param (Yang model)

        Returns:
            The NSD monitoring param referenced by ``nsr_monp``, or None
            when the NSD or the referenced param cannot be found.
        """
        nsd = self.store.get_nsd(self.nsd_id)
        if nsd is None:
            return None

        for monp in nsd.monitoring_param:
            if monp.id == nsr_monp.nsd_mon_param_ref:
                return monp

        return None

    def handle_nsr_monp(self, monp, action):
        """Callback for NSR mon-param handler.

        Args:
            monp : Yang Model
            action : rwdts.QueryAction

        """
        def handle_create():
            if monp.id in self.criteria_store:
                return

            nsd_monp = self.get_nsd_monp_cfg(monp)
            if nsd_monp is None:
                # No NSD config for this mon-param: nothing can refer to it.
                return

            for cri in self.scaling_criteria:
                if cri.ns_monitoring_param_ref != nsd_monp.id:
                    continue

                # Create a criteria object as soon as the first monitoring data
                # is published.
                criteria = ScalingCriteria(
                    self.log,
                    self.dts,
                    self.loop,
                    self.project,
                    self.nsr_id,
                    monp.id,
                    cri,
                    self.threshold_time,  # window size
                    delegate=self)

                # NOTE(review): if several criteria reference the same
                # mon-param, only the last one stays reachable here (and can
                # be deregistered later) -- confirm whether that can happen.
                self.criteria_store[monp.id] = criteria

                # Schedule the coroutine object of *this* criteria. A nested
                # coroutine function closing over the loop variable would
                # only look it up when the task runs, i.e. after the loop has
                # moved on to the last matching criteria.
                self.loop.create_task(criteria.register())

        def handle_delete():
            if monp.id in self.criteria_store:
                self.criteria_store[monp.id].deregister()
                del self.criteria_store[monp.id]

        if action in [rwdts.QueryAction.CREATE, rwdts.QueryAction.UPDATE]:
            handle_create()
        elif action == rwdts.QueryAction.DELETE:
            handle_delete()

    @property
    def scaling_criteria(self):
        """Scaling criteria entries of the policy (from the Yang model)."""
        return self._scaling_policy.scaling_criteria

    @property
    def scale_in_op(self):
        """Operation used to combine the per-criteria scale-in statuses."""
        optype = self._scaling_policy.scale_in_operation_type
        return scaling_operation.get_operation(optype)

    @property
    def scale_out_op(self):
        """Operation used to combine the per-criteria scale-out statuses."""
        optype = self._scaling_policy.scale_out_operation_type
        return scaling_operation.get_operation(optype)

    @property
    def name(self):
        """Name of the scaling policy (from the Yang model)."""
        return self._scaling_policy.name

    @property
    def threshold_time(self):
        """Threshold time of the policy; used as the criteria window size."""
        return self._scaling_policy.threshold_time

    @property
    def cooldown_time(self):
        """Cooldown time of the policy (from the Yang model)."""
        return self._scaling_policy.cooldown_time

    # NOTE: asyncio.coroutine (generator-based coroutines) was removed in
    # Python 3.11; this file targets the older asyncio API.
    @asyncio.coroutine
    def register(self):
        """Register the NSR monitoring-param subscriber."""
        yield from self.monp_sub.register()

    def deregister(self):
        """Deregister the NSR monitoring-param subscriber."""
        self.monp_sub.deregister()

    def _is_in_cooldown(self):
        """Verify if the current policy is in cooldown.

        Returns:
            bool: True when a scale request was triggered less than
                ``cooldown_time`` seconds ago.
        """
        if not self._last_triggered_time:
            return False

        if (time.time() - self._last_triggered_time) >= self.cooldown_time:
            return False

        return True

    def threshold_in_breached(self, criteria_name, value):
        """Delegate callback when scale-in threshold is breached

        Args:
            criteria_name : Criteria name
            value : Average value
        """
        if self._is_in_cooldown():
            return

        self.scale_in_status[criteria_name] = True

        statuses = self.scale_in_status.values()
        is_breached = self.scale_in_op(statuses)

        if is_breached and self.delegate:
            self._last_triggered_time = time.time()
            # Reset all statuses
            self.scale_in_status = {cri.name: False for cri in self.scaling_criteria}
            self.delegate.scale_in(self.scaling_group_name, self.nsr_id)

    def threshold_out_breached(self, criteria_name, value):
        """Delegate callback when scale-out threshold is breached.

        Args:
            criteria_name : Criteria name
            value : Average value
        """
        if self._is_in_cooldown():
            return

        self.scale_out_status[criteria_name] = True

        statuses = self.scale_out_status.values()
        is_breached = self.scale_out_op(statuses)

        if is_breached and self.delegate:
            self._last_triggered_time = time.time()
            # Reset all statuses
            self.scale_out_status = {cri.name: False for cri in self.scaling_criteria}
            self.delegate.scale_out(self.scaling_group_name, self.nsr_id)