Merge changes I9a6c5927,I055eabb8,I81efc98d,Icc5b7830
[osm/SO.git] / rwlaunchpad / plugins / rwmonparam / rift / tasklets / rwmonparam / nsr_core.py
1 """
2 #
3 # Copyright 2016 RIFT.IO Inc
4 #
5 # Licensed under the Apache License, Version 2.0 (the "License");
6 # you may not use this file except in compliance with the License.
7 # You may obtain a copy of the License at
8 #
9 # http://www.apache.org/licenses/LICENSE-2.0
10 #
11 # Unless required by applicable law or agreed to in writing, software
12 # distributed under the License is distributed on an "AS IS" BASIS,
13 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 # See the License for the specific language governing permissions and
15 # limitations under the License.
16 #
17
18 @file nsr_core.py
19 @author Varun Prasad (varun.prasad@riftio.com)
20 @date 09-Jul-2016
21
22 """
23
24 import asyncio
25 import functools
26 import uuid
27
28 from gi.repository import (RwDts as rwdts, NsrYang)
29 import rift.mano.dts as mano_dts
30
31 from . import aggregator as aggregator
32
33
34 class MissingValueField(Exception):
35 pass
36
37
38 class VnfrMonitoringParamSubscriber(mano_dts.AbstractOpdataSubscriber):
39 """Registers for VNFR monitoring parameter changes.
40
41 Attributes:
42 monp_id (str): Monitoring Param ID
43 vnfr_id (str): VNFR ID
44 """
45 def __init__(self, log, dts, loop, vnfr_id, monp_id, callback=None):
46 super().__init__(log, dts, loop, callback)
47 self.vnfr_id = vnfr_id
48 self.monp_id = monp_id
49
50 def get_xpath(self):
51 return("D,/vnfr:vnfr-catalog" +
52 "/vnfr:vnfr[vnfr:id='{}']".format(self.vnfr_id) +
53 "/vnfr:monitoring-param" +
54 "[vnfr:id='{}']".format(self.monp_id))
55
56
57 class NsrMonitoringParam():
58 """Class that handles NS Mon-param data.
59 """
60 MonParamMsg = NsrYang.YangData_Nsr_NsInstanceOpdata_Nsr_MonitoringParam
61 MISSING = None
62 DEFAULT_AGGREGATION_TYPE = "AVERAGE"
63
64 @classmethod
65 def create_nsr_mon_params(cls, nsd, constituent_vnfrs, store):
66 """Convenience class that constructs NSMonitoringParam objects
67
68 Args:
69 nsd (RwNsdYang.YangData_Nsd_NsdCatalog_Nsd): Nsd object
70 constituent_vnfrs (list): List of constituent vnfr objects of NSR
71 store (SubscriberStore): Store object instance
72
73 Returns:
74 list NsrMonitoringParam object.
75
76 Also handles legacy NSD descriptor which has no mon-param defines. In
77 such cases the mon-params are created from VNFD's mon-param config.
78 """
79 MonParamMsg = NsrYang.YangData_Nsr_NsInstanceOpdata_Nsr_MonitoringParam
80
81 mon_params = []
82 for mon_param_msg in nsd.monitoring_param:
83 mon_params.append(NsrMonitoringParam(
84 mon_param_msg,
85 constituent_vnfrs
86 ))
87
88 # Legacy Handling.
89 # This indicates that the NSD had no mon-param config.
90 if not nsd.monitoring_param:
91 for vnfr in constituent_vnfrs:
92 vnfd = store.get_vnfd(vnfr.vnfd_ref)
93 for monp in vnfd.monitoring_param:
94 mon_params.append(NsrMonitoringParam(
95 monp,
96 [vnfr],
97 is_legacy=True))
98
99 return mon_params
100
101 def __init__(self, monp_config, constituent_vnfrs, is_legacy=False):
102 """
103 Args:
104 monp_config (GiObject): Config data to create the NSR mon-param msg
105 constituent_vnfrs (list): List of VNFRs that may contain the mon-param
106 is_legacy (bool, optional): If set then the mon-param are created from
107 vnfd's config and not NSD's config.
108 """
109 self._constituent_vnfr_map = {vnfr.id:vnfr for vnfr in constituent_vnfrs}
110
111 # An internal store to hold the data
112 # Key => (vnfr_id, monp_id)
113 # value => (value_type, value)
114 self.vnfr_monparams = {}
115
116 # create_nsr_mon_params() is already validating for 'is_legacy' by checking if
117 # nsd is having 'monitoring_param'. So removing 'self.aggregation_type is None' check for is_legacy.
118 self.is_legacy = is_legacy
119
120 if not is_legacy:
121 self._msg = self._convert_nsd_msg(monp_config)
122 else:
123 self._msg = self._convert_vnfd_msg(monp_config)
124
125 @property
126 def nsr_mon_param_msg(self):
127 """Gi object msg"""
128 return self._msg
129
130 @property
131 def vnfr_ids(self):
132 """Store Keys"""
133 return list(self.vnfr_monparams.keys())
134
135 @property
136 def vnfr_values(self):
137 """Store values"""
138 return list(self.vnfr_monparams.values())
139
140 @property
141 def is_ready(self):
142 """Flag which indicates if all of the constituent vnfr values are
143 available to perform the aggregation"""
144 return (self.MISSING not in self.vnfr_values)
145
146 @property
147 def aggregation_type(self):
148 """Aggregation type"""
149 return self.nsr_mon_param_msg.aggregation_type
150
151 # @property
152 # def is_legacy(self):
153 # return (self.aggregation_type is None)
154
155 @classmethod
156 def extract_value(cls, monp):
157 """Class method to extract the value type and value from the
158 mon-param gi message
159
160 Args:
161 monp (GiObject): Mon param msg
162
163 Returns:
164 Tuple: (value type, value)
165
166 Raises:
167 MissingValueField: Raised if no valid field are available.
168 """
169 if monp.has_field("value_integer"):
170 return ("value_integer", monp.value_integer)
171 elif monp.has_field("value_decimal"):
172 return ("value_decimal", monp.value_decimal)
173 elif monp.has_field("value_string"):
174 return ("value_string", monp.value_string)
175
176 return None
177
178 def _constituent_vnfrs(self, constituent_vnfr_ids):
179 # Fetch the VNFRs
180 vnfr_map = {}
181 for constituent_vnfr in constituent_vnfr_ids:
182 vnfr_id = constituent_vnfr.vnfr_id
183 vnfr_map[vnfr_id] = self._store.get_vnfr(vnfr_id)
184
185 return vnfr_map
186
187 def _extract_ui_elements(self, monp):
188 ui_fields = ["group_tag", "description", "widget_type", "units", "value_type"]
189 ui_data = [getattr(monp, ui_field) for ui_field in ui_fields]
190
191 return dict(zip(ui_fields, ui_data))
192
193
194 def _convert_nsd_msg(self, nsd_monp):
195 """Create initial msg without values"""
196 vnfd_to_vnfr = {vnfr.vnfd_ref: vnfr_id
197 for vnfr_id, vnfr in self._constituent_vnfr_map.items()}
198
199 # First, convert the monp param ref from vnfd to vnfr terms.
200 vnfr_mon_param_ref = []
201 for vnfd_mon in nsd_monp.vnfd_monitoring_param:
202 vnfr_id = vnfd_to_vnfr[vnfd_mon.vnfd_id_ref]
203 monp_id = vnfd_mon.vnfd_monitoring_param_ref
204
205 self.vnfr_monparams[(vnfr_id, monp_id)] = self.MISSING
206
207 vnfr_mon_param_ref.append({
208 'vnfr_id_ref': vnfr_id,
209 'vnfr_mon_param_ref': monp_id
210 })
211
212 monp_fields = {
213 # For now both the NSD and NSR's monp ID are same.
214 'id': nsd_monp.id,
215 'name': nsd_monp.name,
216 'nsd_mon_param_ref': nsd_monp.id,
217 'vnfr_mon_param_ref': vnfr_mon_param_ref,
218 'aggregation_type': nsd_monp.aggregation_type
219 }
220
221 ui_fields = self._extract_ui_elements(nsd_monp)
222 monp_fields.update(ui_fields)
223 monp = self.MonParamMsg.from_dict(monp_fields)
224
225 return monp
226
227 def _convert_vnfd_msg(self, vnfd_monp):
228
229 vnfr = list(self._constituent_vnfr_map.values())[0]
230 self.vnfr_monparams[(vnfr.id, vnfd_monp.id)] = self.MISSING
231
232 monp_data = {
233 'id': str(uuid.uuid1()),
234 'name': vnfd_monp.name,
235 'vnfr_mon_param_ref': [{
236 'vnfr_id_ref': vnfr.id,
237 'vnfr_mon_param_ref': vnfd_monp.id
238 }]
239 }
240
241 ui_fields = self._extract_ui_elements(vnfd_monp)
242 monp_data.update(ui_fields)
243 monp = self.MonParamMsg.from_dict(monp_data)
244
245 return monp
246
247 def update_vnfr_value(self, key, value):
248 """Update the internal store
249
250 Args:
251 key (Tuple): (vnfr_id, monp_id)
252 value (Tuple): (value_type, value)
253 """
254 self.vnfr_monparams[key] = value
255
256 def update_ns_value(self, value_field, value):
257 """Updates the NS mon-param data with the aggregated value.
258
259 Args:
260 value_field (str): Value field in NSR
261 value : Aggregated value
262 """
263 setattr(self.nsr_mon_param_msg, value_field, value)
264
265
266 class NsrMonitoringParamPoller(mano_dts.DtsHandler):
267 """Handler responsible for publishing NS level monitoring
268 parameters.
269
270 Design:
271 1. Created subscribers for each vnfr's monitoring parameter
272 2. Accumulates the VNFR's value into the NsrMonitoringParam's internal
273 store.
274 3. Once all values are available, aggregate the value and triggers
275 callback notification to the subscribers.
276 """
277 @classmethod
278 def from_handler(cls, handler, monp, callback):
279 """Convenience class to build NsrMonitoringParamPoller object.
280 """
281 return cls(handler.log, handler.dts, handler.loop, monp, callback)
282
283 def __init__(self, log, dts, loop, monp, callback=None):
284 """
285 Args:
286 monp (NsrMonitoringParam): Param object
287 callback (None, optional): Callback to be triggered after value has
288 been aggregated.
289 """
290 super().__init__(log, dts, loop)
291
292 self.monp = monp
293 self.subscribers = []
294 self.callback = callback
295 self._agg = None
296
297 def make_aggregator(self, field_types):
298 if not self._agg:
299 self._agg = aggregator.make_aggregator(field_types)
300 return self._agg
301
302
303 def update_value(self, monp, action, vnfr_id):
304 """Callback that gets triggered when VNFR's mon param changes.
305
306 Args:
307 monp (Gi Object): Gi object msg
308 action (rwdts.QueryAction)): Action type
309 vnfr_id (str): Vnfr ID
310 """
311 key = (vnfr_id, monp.id)
312 value = NsrMonitoringParam.extract_value(monp)
313
314 if not value:
315 return
316
317 # Accumulate the value
318 self.monp.update_vnfr_value(key, value)
319
320 # If all values are not available, then don't start
321 # the aggregation process.
322 if not self.monp.is_ready:
323 return
324
325 if self.monp.is_legacy:
326 # If no monp are specified then copy over the vnfr's monp data
327 value_field, value = value
328 else:
329 field_types, values = zip(*self.monp.vnfr_values)
330
331 value_field, value = self.make_aggregator(field_types).aggregate(
332 self.monp.aggregation_type,
333 values)
334
335 self.monp.update_ns_value(value_field, value)
336 if self.callback:
337 self.callback(self.monp.nsr_mon_param_msg)
338
339 @asyncio.coroutine
340 def register(self):
341 for vnfr_id, monp_id in self.monp.vnfr_ids:
342 callback = functools.partial(self.update_value, vnfr_id=vnfr_id)
343 self.subscribers.append(VnfrMonitoringParamSubscriber(
344 self.loop, self.dts, self.loop, vnfr_id, monp_id, callback=callback))
345
346 @asyncio.coroutine
347 def start(self):
348 for sub in self.subscribers:
349 yield from sub.register()
350
351 def stop(self):
352 for sub in self.subscribers:
353 sub.deregister()
354
355
356 class NsrMonitorDtsHandler(mano_dts.DtsHandler):
357 """ NSR monitoring class """
358
359 def __init__(self, log, dts, loop, nsr, constituent_vnfrs, store):
360 """
361 Args:
362 nsr (RwNsrYang.YangData_Nsr_NsInstanceOpdata_Nsr): NSR object
363 constituent_vnfrs (list): list of VNFRs in NSR
364 store (SubscriberStore): Store instance
365 """
366 super().__init__(log, dts, loop)
367
368 self.nsr = nsr
369 self.store = store
370 self.constituent_vnfrs = constituent_vnfrs
371 self.mon_params_pollers = []
372
373 def xpath(self, param_id=None):
374 return ("D,/nsr:ns-instance-opdata/nsr:nsr" +
375 "[nsr:ns-instance-config-ref='{}']".format(self.nsr.ns_instance_config_ref) +
376 "/nsr:monitoring-param" +
377 ("[nsr:id='{}']".format(param_id) if param_id else ""))
378
379 @asyncio.coroutine
380 def register(self):
381 self.reg = yield from self.dts.register(xpath=self.xpath(),
382 flags=rwdts.Flag.PUBLISHER|rwdts.Flag.CACHE|rwdts.Flag.NO_PREP_READ)
383
384 assert self.reg is not None
385
386 def callback(self, nsr_mon_param_msg):
387 """Callback that triggers update.
388 """
389 self.reg.update_element(
390 self.xpath(param_id=nsr_mon_param_msg.id),
391 nsr_mon_param_msg)
392
393 @asyncio.coroutine
394 def start(self):
395 nsd = self.store.get_nsd(self.nsr.nsd_ref)
396 mon_params = NsrMonitoringParam.create_nsr_mon_params(
397 nsd,
398 self.constituent_vnfrs,
399 self.store)
400
401 for monp in mon_params:
402 poller = NsrMonitoringParamPoller.from_handler(
403 self,
404 monp,
405 callback=self.callback)
406
407 self.mon_params_pollers.append(poller)
408 yield from poller.register()
409 yield from poller.start()
410
411 def stop(self):
412 self.deregister()
413 for poller in self.mon_params_pollers:
414 poller.stop()
415
416
417 def deregister(self):
418 """ de-register with dts """
419 if self.reg is not None:
420 self.reg.deregister()
421 self.reg = None