update from RIFT as of 696b75d2fe9fb046261b08c616f1bcf6c0b54a9b second try
[osm/SO.git] / rwlaunchpad / plugins / rwautoscaler / rift / tasklets / rwautoscaler / engine.py
1
2 #
3 # Copyright 2016 RIFT.IO Inc
4 #
5 # Licensed under the Apache License, Version 2.0 (the "License");
6 # you may not use this file except in compliance with the License.
7 # You may obtain a copy of the License at
8 #
9 # http://www.apache.org/licenses/LICENSE-2.0
10 #
11 # Unless required by applicable law or agreed to in writing, software
12 # distributed under the License is distributed on an "AS IS" BASIS,
13 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 # See the License for the specific language governing permissions and
15 # limitations under the License.
16 #
17
18 import abc
19 import asyncio
20 import time
21
22 import numpy
23
24 from . import scaling_operation
25 from . import subscribers as monp_subscriber
26 from gi.repository import RwDts as rwdts
27 import rift.mano.dts as subscriber
28
29
class TimeSeries:
    """Sliding window of (timestamp, value) samples.

    Every insertion discards the samples that are more than
    ``threshold_time`` seconds older than the sample being added.
    """

    def __init__(self, threshold_time):
        """
        Args:
            threshold_time (int): window size in secs
        """
        self.threshold_time = threshold_time

        # 2 x N array: row 0 holds the timestamps, row 1 the matching values.
        self._series = numpy.array([[], []], dtype='int64')

    def add_value(self, timestamp, value):
        """Append a sample and prune everything that fell out of the window.

        Args:
            timestamp : Sample time; truncated to an integer before storing
            value : Sample value
        """
        timestamp = int(timestamp)

        sample = [[timestamp], [value]]
        extended = numpy.concatenate((self._series, sample), axis=1)

        # Keep only the columns whose timestamp is still inside the window
        # that ends at the sample just added.
        oldest_allowed = timestamp - self.threshold_time
        in_window = extended[0] >= oldest_allowed
        self._series = extended[:, in_window]

    def average(self):
        """Return the average of the values currently held in the window."""
        values = self._series[1]
        return numpy.average(values)

    def is_window_full(self):
        """Verify if there is sufficient data for the current window.

        Returns:
            bool: True when at least two samples are held and the oldest and
            newest of them are at least ``threshold_time`` apart.
        """
        # NOTE(review): add_value() prunes samples older than threshold_time,
        # so the span can only ever equal threshold_time here - confirm that
        # irregularly spaced samples are not expected.
        stamps = self._series[0]
        if len(stamps) < 2:
            return False

        span = stamps[-1] - stamps[0]
        return bool(span >= self.threshold_time)
76
77
class ScalingCriteria:
    """Evaluates one scaling criteria against an NSR monitoring parameter.

    Samples delivered by the NsrMonParamSubscriber are collected in a
    TimeSeries. Once the window holds sufficient data, the window average is
    compared against the criteria's thresholds and the delegate is notified.
    """

    class Delegate:
        """Delegate: callbacks triggered by ScalingCriteria
        """
        @abc.abstractmethod
        def threshold_out_breached(self, criteria_name, avg_value):
            """Called when the value has crossed the scale-out-threshold

            Args:
                criteria_name (str): Criteria name
                avg_value (float): The average value of the window.

            """
            pass

        @abc.abstractmethod
        def threshold_in_breached(self, criteria_name, avg_value):
            """Called when the value drops below the scale-in-threshold

            Args:
                criteria_name (str): Criteria name
                avg_value (float): The average value of the window.

            """

            pass

    def __init__(
            self,
            log,
            dts,
            loop,
            project,
            nsr_id,
            monp_id,
            scaling_criteria,
            window_size,
            sampling_period=1,
            delegate=None):
        """
        Args:
            log : Log
            dts : DTS handle
            loop : Event Handle
            project : Project handle, passed through to the subscriber
            nsr_id (str): NSR ID
            monp_id (str): Monitoring parameter
            scaling_criteria : Yang data model
            window_size (int): Length of the window
            sampling_period (int, optional): Stored on the instance; not
                used by this class itself
            delegate : ScalingCriteria.Delegate

        Note:

        """
        self.log = log
        self.dts = dts
        self.loop = loop
        self.sampling_period = sampling_period
        self.window_size = window_size
        self.delegate = delegate
        self.nsr_id, self.monp_id = nsr_id, monp_id

        self._scaling_criteria = scaling_criteria
        self._timeseries = TimeSeries(self.window_size)
        # Flag when set, triggers scale-in request.
        self._scl_in_limit_enabled = False

        self.nsr_monp_sub = monp_subscriber.NsrMonParamSubscriber(
                self.log,
                self.dts,
                self.loop,
                project,
                self.nsr_id,
                self.monp_id,
                callback=self.add_value)

    @property
    def name(self):
        """Name of the criteria, taken from the Yang model."""
        return self._scaling_criteria.name

    @property
    def scale_in(self):
        """Scale-in threshold, taken from the Yang model."""
        return self._scaling_criteria.scale_in_threshold

    @property
    def scale_out(self):
        """Scale-out threshold, taken from the Yang model."""
        return self._scaling_criteria.scale_out_threshold

    @asyncio.coroutine
    def register(self):
        """Register the monitoring-parameter subscriber."""
        yield from self.nsr_monp_sub.register()

    def deregister(self):
        """Deregister the monitoring-parameter subscriber."""
        self.nsr_monp_sub.deregister()

    def trigger_action(self, timestamp, avg):
        """Triggers the scale out/in

        Both the decision and the value reported to the delegate use the
        average recomputed from the time series, so the delegate always
        receives the real window average.

        Args:
            timestamp : time in unix epoch
            avg : Average of all the values in the window size. Kept for
                interface compatibility; the time series is authoritative.

        """
        if not self.delegate:
            # Nobody to notify.
            return

        window_avg = self._timeseries.average()

        if window_avg >= self.scale_out:
            self.log.info("Triggering a scaling-out request for the criteria {}".format(
                self.name))
            self.delegate.threshold_out_breached(self.name, window_avg)

        elif window_avg < self.scale_in:
            self.log.info("Triggering a scaling-in request for the criteria {}".format(
                self.name))
            self.delegate.threshold_in_breached(self.name, window_avg)

    def add_value(self, monp, action):
        """Callback from NsrMonParamSubscriber

        Args:
            monp : Yang model
            action : rwdts.QueryAction
        """
        if action == rwdts.QueryAction.DELETE:
            return

        value = monp.value_integer
        timestamp = time.time()

        self._timeseries.add_value(timestamp, value)

        if not self._timeseries.is_window_full():
            return

        self.log.debug("Sufficient sampling data obtained for criteria {}."
                       "Checking the scaling condition for the criteria".format(
                           self.name))

        if not self.delegate:
            return

        # Hand over the window average (not the latest raw sample), as the
        # trigger_action/Delegate contract documents.
        self.trigger_action(timestamp, self._timeseries.average())
217
218
class ScalingPolicy(ScalingCriteria.Delegate):
    """Combines the scaling criteria of one policy into scale-in/out requests.

    A ScalingCriteria monitor is created per NSR monitoring parameter that a
    criteria of the policy refers to. The per-criteria breach notifications
    are folded with the policy's scale-in/scale-out operation and, when the
    result is positive and the policy is not in cooldown, forwarded to the
    ScalingPolicy.Delegate.
    """

    class Delegate:
        @abc.abstractmethod
        def scale_in(self, scaling_group_name, nsr_id, instance_id):
            """Delegate called when all the criteria for scaling-in are met.

            Args:
                scaling_group_name (str): Description
                nsr_id (str): Description
                instance_id : Instance to scale in

            """
            pass

        @abc.abstractmethod
        def scale_out(self, scaling_group_name, nsr_id):
            """Delegate called when all the criteria for scaling-out are met.

            Args:
                scaling_group_name (str): Description
                nsr_id (str): Description
            """
            pass

    def __init__(
            self,
            log,
            dts,
            loop,
            project,
            nsr_id,
            nsd_id,
            scaling_group_name,
            scaling_policy,
            store,
            delegate=None):
        """

        Args:
            log : Log
            dts : DTS handle
            loop : Event loop
            project : Project handle, passed through to the subscribers
            nsr_id (str): NSR id
            nsd_id (str): NSD id
            scaling_group_name (str): Scaling group ref
            scaling_policy : Yang model
            store (SubscriberStore): Subscriber store instance
            delegate (None, optional): ScalingPolicy.Delegate
        """
        self.loop = loop
        self.log = log
        self.dts = dts
        self.project = project
        self.nsd_id = nsd_id
        self.nsr_id = nsr_id
        self.scaling_group_name = scaling_group_name

        self._scaling_policy = scaling_policy
        self.delegate = delegate
        self.store = store

        self.monp_sub = monp_subscriber.NsrMonParamSubscriber(
                self.log,
                self.dts,
                self.loop,
                self.project,
                self.nsr_id,
                callback=self.handle_nsr_monp)

        self.nsr_scale_sub = monp_subscriber.NsrScalingGroupRecordSubscriber(
                self.log,
                self.dts,
                self.loop,
                self.project,
                self.nsr_id,
                self.scaling_group_name)

        # monp id -> ScalingCriteria monitoring that parameter
        self.criteria_store = {}

        # Timestamp at which the scale-in/scale-out request was generated.
        self._last_triggered_time = None
        self.scale_in_status = {cri.name: False for cri in self.scaling_criteria}
        self.scale_out_status = {cri.name: False for cri in self.scaling_criteria}
        self.scale_out_count = 0

    def get_nsd_monp_cfg(self, nsr_monp):
        """Get the NSD's mon-param config.

        Returns:
            The NSD monitoring-param whose id matches
            nsr_monp.nsd_mon_param_ref, or None when there is no match.
        """
        nsd = self.store.get_nsd(self.nsd_id)
        for monp in nsd.monitoring_param:
            if monp.id == nsr_monp.nsd_mon_param_ref:
                return monp

        return None

    def handle_nsr_monp(self, monp, action):
        """Callback for NSR mon-param handler.

        Args:
            monp : Yang Model
            action : rwdts.QueryAction

        """
        def handle_create():
            if monp.id in self.criteria_store:
                return

            nsd_monp = self.get_nsd_monp_cfg(monp)
            if nsd_monp is None:
                # No NSD config for this parameter, so no criteria can
                # reference it.
                self.log.debug("No NSD mon-param config found for {}".format(
                    monp.id))
                return

            for cri in self.scaling_criteria:
                if cri.ns_monitoring_param_ref != nsd_monp.id:
                    continue

                # Create a criteria object as soon as the first monitoring data
                # is published.
                self.log.debug("Created a ScalingCriteria monitor for {}".format(
                    cri.as_dict()))

                criteria = ScalingCriteria(
                        self.log,
                        self.dts,
                        self.loop,
                        self.project,
                        self.nsr_id,
                        monp.id,
                        cri,
                        self.threshold_time,  # window size
                        delegate=self)

                self.criteria_store[monp.id] = criteria

                # Schedule the registration of this very object; a closure
                # over the loop variable would only be resolved when the task
                # runs, after the loop may have rebound it.
                self.loop.create_task(criteria.register())

        def handle_delete():
            if monp.id in self.criteria_store:
                self.criteria_store[monp.id].deregister()
                del self.criteria_store[monp.id]

        if action in [rwdts.QueryAction.CREATE, rwdts.QueryAction.UPDATE]:
            handle_create()
        elif action == rwdts.QueryAction.DELETE:
            handle_delete()

    @property
    def scaling_criteria(self):
        """Scaling criteria entries of the policy's Yang model."""
        return self._scaling_policy.scaling_criteria

    @property
    def scale_in_op(self):
        """Operation used to combine the per-criteria scale-in statuses."""
        optype = self._scaling_policy.scale_in_operation_type
        return scaling_operation.get_operation(optype)

    @property
    def scale_out_op(self):
        """Operation used to combine the per-criteria scale-out statuses."""
        optype = self._scaling_policy.scale_out_operation_type
        return scaling_operation.get_operation(optype)

    @property
    def name(self):
        """Name of the policy, taken from the Yang model."""
        return self._scaling_policy.name

    @property
    def threshold_time(self):
        """Threshold time of the policy; used as the criteria window size."""
        return self._scaling_policy.threshold_time

    @property
    def cooldown_time(self):
        """Cooldown time of the policy, compared against elapsed seconds."""
        return self._scaling_policy.cooldown_time

    @asyncio.coroutine
    def register(self):
        """Register the mon-param and scaling-group-record subscribers."""
        yield from self.monp_sub.register()
        yield from self.nsr_scale_sub.register()

    def deregister(self):
        """Deregister the mon-param subscriber and all criteria monitors."""
        self.monp_sub.deregister()

        # The criteria monitors hold their own subscribers; without this they
        # would keep delivering samples to a deregistered policy.
        for criteria in self.criteria_store.values():
            criteria.deregister()
        self.criteria_store.clear()

        # NOTE(review): nsr_scale_sub is registered in register() but is not
        # deregistered here - confirm whether it exposes deregister().

    def _is_in_cooldown(self):
        """Verify if the current policy is in cooldown.
        """
        if not self._last_triggered_time:
            return False

        if (time.time() - self._last_triggered_time) >= self.cooldown_time:
            return False

        return True

    def can_trigger_action(self):
        """Return True when a scale action may be issued (not in cooldown)."""
        if self._is_in_cooldown():
            self.log.debug("In cooldown phase ignoring the scale action ")
            return False

        return True

    def threshold_in_breached(self, criteria_name, value):
        """Delegate callback when scale-in threshold is breached

        Args:
            criteria_name : Criteria name
            value : Average value
        """
        self.log.debug("Avg value {} has fallen below the threshold limit for "
                       "{}".format(value, criteria_name))

        if not self.can_trigger_action():
            return

        if self.scale_out_count < 1:
            self.log.debug('There is no scaled-out VNFs at this point. Hence ignoring the scale-in')
            return

        self.scale_in_status[criteria_name] = True
        self.log.info("Applying {} operation to check if all criteria {} for"
                      " scale-in-threshold are met".format(
                          self.scale_in_op,
                          self.scale_in_status))

        statuses = self.scale_in_status.values()
        is_breached = self.scale_in_op(statuses)

        if not (is_breached and self.delegate):
            return

        self.log.info("Triggering a scale-in action for policy {} as "
                      "all criteria have been met".format(self.name))

        # Update the bookkeeping right away (as threshold_out_breached does):
        # if it waited for the scheduled task, another breach arriving first
        # would pass the cooldown check and schedule a second scale-in.
        self._last_triggered_time = time.time()
        self.scale_out_count -= 1
        # Reset all statuses
        self.scale_in_status = {cri.name: False for cri in self.scaling_criteria}

        # Placeholder value to follow the existing scale_in signature.
        instance_id = 0

        @asyncio.coroutine
        def check_and_scale_in():
            self.delegate.scale_in(self.scaling_group_name, self.nsr_id, instance_id)

        self.loop.create_task(check_and_scale_in())

    def threshold_out_breached(self, criteria_name, value):
        """Delegate callback when scale-out threshold is breached.
        Args:
            criteria_name : Criteria name
            value : Average value
        """
        self.log.debug("Avg value {} has gone above the threshold limit for "
                       "{}".format(value, criteria_name))

        if not self.can_trigger_action():
            return

        self.scale_out_status[criteria_name] = True

        self.log.info("Applying {} operation to check if all criteria {} for"
                      " scale-out-threshold are met".format(
                          self.scale_out_op,
                          self.scale_out_status))

        statuses = self.scale_out_status.values()
        is_breached = self.scale_out_op(statuses)

        if is_breached and self.delegate:
            self.log.info("Triggering a scale-out action for policy {} as "
                          "all criteria have been met".format(self.name))
            self._last_triggered_time = time.time()
            self.scale_out_count += 1
            # Reset all statuses
            self.scale_out_status = {cri.name: False for cri in self.scaling_criteria}
            self.delegate.scale_out(self.scaling_group_name, self.nsr_id)