Merge "Revert "Functional spec for cloud-init support""
[osm/SO.git] / rwlaunchpad / plugins / rwmonparam / rift / tasklets / rwmonparam / nsr_core.py
1 """
2 #
3 # Copyright 2016 RIFT.IO Inc
4 #
5 # Licensed under the Apache License, Version 2.0 (the "License");
6 # you may not use this file except in compliance with the License.
7 # You may obtain a copy of the License at
8 #
9 # http://www.apache.org/licenses/LICENSE-2.0
10 #
11 # Unless required by applicable law or agreed to in writing, software
12 # distributed under the License is distributed on an "AS IS" BASIS,
13 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 # See the License for the specific language governing permissions and
15 # limitations under the License.
16 #
17
18 @file nsr_core.py
19 @author Varun Prasad (varun.prasad@riftio.com)
20 @date 09-Jul-2016
21
22 """
23
24 import asyncio
25 import functools
26 import uuid
27
28 from gi.repository import (RwDts as rwdts, NsrYang)
29 import rift.mano.dts as mano_dts
30
31 from . import aggregator as aggregator
32
33
class MissingValueField(Exception):
    """Error type for a mon-param message that has no valid value field.

    NOTE(review): nothing in this module currently raises it;
    NsrMonitoringParam.extract_value returns None in that situation.
    """
36
37
class VnfrMonitoringParamSubscriber(mano_dts.AbstractOpdataSubscriber):
    """Subscriber bound to one monitoring parameter of one VNFR.

    Attributes:
        monp_id (str): Monitoring Param ID
        vnfr_id (str): VNFR ID
    """
    def __init__(self, log, dts, loop, vnfr_id, monp_id, callback=None):
        """
        Args:
            log: Log handle, forwarded to the base subscriber
            dts: DTS handle, forwarded to the base subscriber
            loop: Event loop, forwarded to the base subscriber
            vnfr_id (str): VNFR ID used as key in the xpath
            monp_id (str): Monitoring Param ID used as key in the xpath
            callback (optional): Forwarded to the base subscriber
        """
        super().__init__(log, dts, loop, callback)
        self.monp_id = monp_id
        self.vnfr_id = vnfr_id

    def get_xpath(self):
        """Return the xpath of the VNFR monitoring-param entry identified by
        (vnfr_id, monp_id)."""
        vnfr_key = "/vnfr:vnfr[vnfr:id='{}']".format(self.vnfr_id)
        monp_key = "[vnfr:id='{}']".format(self.monp_id)

        return "".join((
            "D,/vnfr:vnfr-catalog",
            vnfr_key,
            "/vnfr:monitoring-param",
            monp_key,
        ))
55
56
class NsrMonitoringParam():
    """Class that handles NS Mon-param data.

    Wraps the NSR monitoring-param Gi message together with an internal
    store of the latest value reported by every referenced VNFR mon-param.
    """
    MonParamMsg = NsrYang.YangData_Nsr_NsInstanceOpdata_Nsr_MonitoringParam
    # Marker stored for a VNFR mon-param whose value has not arrived yet.
    MISSING = None
    DEFAULT_AGGREGATION_TYPE = "AVERAGE"

    @classmethod
    def create_nsr_mon_params(cls, nsd, constituent_vnfrs, store):
        """Convenience factory that constructs NsrMonitoringParam objects.

        Args:
            nsd (RwNsdYang.YangData_Nsd_NsdCatalog_Nsd): Nsd object
            constituent_vnfrs (list): List of constituent vnfr objects of NSR
            store (SubscriberStore): Store object instance

        Returns:
            list: NsrMonitoringParam objects.

        Also handles legacy NSD descriptor which has no mon-param defines. In
        such cases the mon-params are created from VNFD's mon-param config.
        """
        mon_params = [
            cls(mon_param_msg, constituent_vnfrs, store=store)
            for mon_param_msg in nsd.monitoring_param
        ]

        # Legacy Handling.
        # This indicates that the NSD had no mon-param config.
        if not nsd.monitoring_param:
            for vnfr in constituent_vnfrs:
                vnfd = store.get_vnfd(vnfr.vnfd_ref)
                for monp in vnfd.monitoring_param:
                    mon_params.append(cls(
                        monp,
                        [vnfr],
                        is_legacy=True,
                        store=store))

        return mon_params

    def __init__(self, monp_config, constituent_vnfrs, is_legacy=False,
                 store=None):
        """
        Args:
            monp_config (GiObject): Config data to create the NSR mon-param msg
            constituent_vnfrs (list): List of VNFRs that may contain the mon-param
            is_legacy (bool, optional): If set then the mon-param are created from
                vnfd's config and not NSD's config.
            store (SubscriberStore, optional): Store instance; only needed by
                _constituent_vnfrs.
        """
        self._constituent_vnfr_map = {vnfr.id: vnfr for vnfr in constituent_vnfrs}
        self._store = store

        # An internal store to hold the data
        # Key => (vnfr_id, monp_id)
        # value => (value_type, value)
        self.vnfr_monparams = {}

        if not is_legacy:
            self._msg = self._convert_nsd_msg(monp_config)
        else:
            self._msg = self._convert_vnfd_msg(monp_config)

    @property
    def nsr_mon_param_msg(self):
        """Gi object msg"""
        return self._msg

    @property
    def vnfr_ids(self):
        """Store keys, i.e. a list of (vnfr_id, monp_id) tuples"""
        return list(self.vnfr_monparams.keys())

    @property
    def vnfr_values(self):
        """Store values, i.e. a list of (value_type, value) tuples or MISSING"""
        return list(self.vnfr_monparams.values())

    @property
    def is_ready(self):
        """Flag which indicates if all of the constituent vnfr values are
        available to perform the aggregation"""
        return (self.MISSING not in self.vnfr_values)

    @property
    def aggregation_type(self):
        """Aggregation type"""
        return self.nsr_mon_param_msg.aggregation_type

    @property
    def is_legacy(self):
        """True when the NSR message carries no aggregation type."""
        return (self.aggregation_type is None)

    @classmethod
    def extract_value(cls, monp):
        """Class method to extract the value type and value from the
        mon-param gi message

        The fields are probed in the order value_integer, value_decimal,
        value_string and the first one present wins.

        Args:
            monp (GiObject): Mon param msg

        Returns:
            Tuple: (value type, value), or None if none of the value fields
            is present. Callers rely on the None return, so no exception is
            raised here.
        """
        if monp.has_field("value_integer"):
            return ("value_integer", monp.value_integer)
        elif monp.has_field("value_decimal"):
            return ("value_decimal", monp.value_decimal)
        elif monp.has_field("value_string"):
            return ("value_string", monp.value_string)

        return None

    def _constituent_vnfrs(self, constituent_vnfr_ids):
        """Fetch the VNFRs for the given constituent references from the store.

        Args:
            constituent_vnfr_ids (iterable): Objects exposing a vnfr_id attribute

        Returns:
            dict: vnfr_id => VNFR as returned by the store's get_vnfr

        Raises:
            AttributeError: If the object was constructed without a store.
        """
        if self._store is None:
            raise AttributeError(
                "NsrMonitoringParam was created without a store; "
                "cannot look up VNFRs")

        vnfr_map = {}
        for constituent_vnfr in constituent_vnfr_ids:
            vnfr_id = constituent_vnfr.vnfr_id
            vnfr_map[vnfr_id] = self._store.get_vnfr(vnfr_id)

        return vnfr_map

    def _extract_ui_elements(self, monp):
        """Return a dict with the UI related fields copied from monp."""
        ui_fields = ["group_tag", "description", "widget_type", "units", "value_type"]

        return {ui_field: getattr(monp, ui_field) for ui_field in ui_fields}

    def _convert_nsd_msg(self, nsd_monp):
        """Create initial msg without values

        Every referenced (vnfr_id, monp_id) pair is registered in the
        internal store as MISSING.

        Raises:
            KeyError: If the NSD mon-param references a VNFD that none of the
                constituent VNFRs points to.
        """
        # NOTE(review): VNFRs sharing the same vnfd_ref collapse into a single
        # entry here; confirm whether an NSR can hold several VNFRs per VNFD.
        vnfd_to_vnfr = {vnfr.vnfd_ref: vnfr_id
                        for vnfr_id, vnfr in self._constituent_vnfr_map.items()}

        # First, convert the monp param ref from vnfd to vnfr terms.
        vnfr_mon_param_ref = []
        for vnfd_mon in nsd_monp.vnfd_monitoring_param:
            vnfr_id = vnfd_to_vnfr[vnfd_mon.vnfd_id_ref]
            monp_id = vnfd_mon.vnfd_monitoring_param_ref

            self.vnfr_monparams[(vnfr_id, monp_id)] = self.MISSING

            vnfr_mon_param_ref.append({
                'vnfr_id_ref': vnfr_id,
                'vnfr_mon_param_ref': monp_id
                })

        monp_fields = {
                # For now both the NSD and NSR's monp ID are same.
                'id': nsd_monp.id,
                'name': nsd_monp.name,
                'nsd_mon_param_ref': nsd_monp.id,
                'vnfr_mon_param_ref': vnfr_mon_param_ref,
                'aggregation_type': nsd_monp.aggregation_type
            }

        ui_fields = self._extract_ui_elements(nsd_monp)
        monp_fields.update(ui_fields)
        monp = self.MonParamMsg.from_dict(monp_fields)

        return monp

    def _convert_vnfd_msg(self, vnfd_monp):
        """Create the initial msg (without values) from a VNFD mon-param.

        Used for the legacy path: the message references the first
        constituent VNFR only, gets a freshly generated id and carries no
        aggregation type.
        """
        vnfr = list(self._constituent_vnfr_map.values())[0]
        self.vnfr_monparams[(vnfr.id, vnfd_monp.id)] = self.MISSING

        monp_data = {
                'id': str(uuid.uuid1()),
                'name': vnfd_monp.name,
                'vnfr_mon_param_ref': [{
                    'vnfr_id_ref': vnfr.id,
                    'vnfr_mon_param_ref': vnfd_monp.id
                    }]
                }

        ui_fields = self._extract_ui_elements(vnfd_monp)
        monp_data.update(ui_fields)
        monp = self.MonParamMsg.from_dict(monp_data)

        return monp

    def update_vnfr_value(self, key, value):
        """Update the internal store

        Args:
            key (Tuple): (vnfr_id, monp_id)
            value (Tuple): (value_type, value)
        """
        self.vnfr_monparams[key] = value

    def update_ns_value(self, value_field, value):
        """Updates the NS mon-param data with the aggregated value.

        Args:
            value_field (str): Value field in NSR
            value : Aggregated value
        """
        setattr(self.nsr_mon_param_msg, value_field, value)
260
261
class NsrMonitoringParamPoller(mano_dts.DtsHandler):
    """Handler responsible for publishing NS level monitoring
    parameters.

    Design:
        1. Created subscribers for each vnfr's monitoring parameter
        2. Accumulates the VNFR's value into the NsrMonitoringParam's internal
            store.
        3. Once all values are available, aggregate the value and triggers
            callback notification to the subscribers.
    """
    @classmethod
    def from_handler(cls, handler, monp, callback):
        """Convenience factory that builds a NsrMonitoringParamPoller from
        the log, dts and loop of an existing handler.
        """
        return cls(handler.log, handler.dts, handler.loop, monp, callback)

    def __init__(self, log, dts, loop, monp, callback=None):
        """
        Args:
            monp (NsrMonitoringParam): Param object
            callback (None, optional): Callback to be triggered after value has
                been aggregated.
        """
        super().__init__(log, dts, loop)

        self.monp = monp
        self.subscribers = []
        self.callback = callback
        # Aggregator instance, created lazily by make_aggregator.
        self._agg = None

    def make_aggregator(self, field_types):
        """Return the aggregator, creating it on first use.

        The aggregator is built from the field_types of the first call and
        reused afterwards; later field_types are ignored.
        """
        if not self._agg:
            self._agg = aggregator.make_aggregator(field_types)
        return self._agg

    def update_value(self, monp, action, vnfr_id):
        """Callback that gets triggered when VNFR's mon param changes.

        Args:
            monp (Gi Object): Gi object msg
            action (rwdts.QueryAction)): Action type
            vnfr_id (str): Vnfr ID
        """
        key = (vnfr_id, monp.id)
        value = NsrMonitoringParam.extract_value(monp)

        # Nothing to record when the message carries no value field.
        if not value:
            return

        # Accumulate the value
        self.monp.update_vnfr_value(key, value)

        # If all values are not available, then don't start
        # the aggregation process.
        if not self.monp.is_ready:
            return

        if self.monp.is_legacy:
            # If no monp are specified then copy over the vnfr's monp data
            value_field, value = value
        else:
            field_types, values = zip(*self.monp.vnfr_values)

            value_field, value = self.make_aggregator(field_types).aggregate(
                    self.monp.aggregation_type,
                    values)

        self.monp.update_ns_value(value_field, value)
        if self.callback:
            self.callback(self.monp.nsr_mon_param_msg)

    @asyncio.coroutine
    def register(self):
        """Create one VNFR mon-param subscriber per (vnfr_id, monp_id) pair.

        The subscribers are only created here; start() registers them.
        """
        for vnfr_id, monp_id in self.monp.vnfr_ids:
            # Bind the vnfr_id so update_value knows which VNFR reported.
            callback = functools.partial(self.update_value, vnfr_id=vnfr_id)
            # The subscriber signature is (log, dts, loop, ...): the first
            # positional argument must be the logger, not the event loop.
            self.subscribers.append(VnfrMonitoringParamSubscriber(
                self.log, self.dts, self.loop, vnfr_id, monp_id, callback=callback))

    @asyncio.coroutine
    def start(self):
        """Register every subscriber created by register()."""
        for sub in self.subscribers:
            yield from sub.register()

    def stop(self):
        """Deregister every subscriber."""
        for sub in self.subscribers:
            sub.deregister()
350
351
class NsrMonitorDtsHandler(mano_dts.DtsHandler):
    """Publishes the NS level monitoring params of a single NSR."""

    def __init__(self, log, dts, loop, nsr, constituent_vnfrs, store):
        """
        Args:
            nsr (RwNsrYang.YangData_Nsr_NsInstanceOpdata_Nsr): NSR object
            constituent_vnfrs (list): list of VNFRs in NSR
            store (SubscriberStore): Store instance
        """
        super().__init__(log, dts, loop)

        self.nsr = nsr
        self.store = store
        self.constituent_vnfrs = constituent_vnfrs
        # One NsrMonitoringParamPoller per NS mon-param, filled by start().
        self.mon_params_pollers = []

    def xpath(self, param_id=None):
        """Return the monitoring-param xpath of this NSR.

        Args:
            param_id (optional): When truthy, the path is narrowed down to
                the mon-param with this id.
        """
        nsr_key = "[nsr:ns-instance-config-ref='{}']".format(
                self.nsr.ns_instance_config_ref)
        path = ("D,/nsr:ns-instance-opdata/nsr:nsr" +
                nsr_key +
                "/nsr:monitoring-param")

        if param_id:
            path += "[nsr:id='{}']".format(param_id)

        return path

    @asyncio.coroutine
    def register(self):
        """Register with DTS as publisher for the mon-param xpath."""
        publisher_flags = (rwdts.Flag.PUBLISHER |
                           rwdts.Flag.CACHE |
                           rwdts.Flag.NO_PREP_READ)
        self.reg = yield from self.dts.register(
                xpath=self.xpath(),
                flags=publisher_flags)

        assert self.reg is not None

    def callback(self, nsr_mon_param_msg):
        """Push the given NSR mon-param msg through the DTS registration.
        """
        element_xpath = self.xpath(param_id=nsr_mon_param_msg.id)
        self.reg.update_element(element_xpath, nsr_mon_param_msg)

    @asyncio.coroutine
    def start(self):
        """Build the NS mon-params from the NSD and start a poller for each
        of them."""
        nsd = self.store.get_nsd(self.nsr.nsd_ref)
        ns_mon_params = NsrMonitoringParam.create_nsr_mon_params(
                nsd,
                self.constituent_vnfrs,
                self.store)

        for ns_monp in ns_mon_params:
            monp_poller = NsrMonitoringParamPoller.from_handler(
                    self,
                    ns_monp,
                    callback=self.callback)

            self.mon_params_pollers.append(monp_poller)
            yield from monp_poller.register()
            yield from monp_poller.start()

    def stop(self):
        """Deregister from DTS, then stop all the pollers."""
        self.deregister()
        for monp_poller in self.mon_params_pollers:
            monp_poller.stop()

    def deregister(self):
        """ de-register with dts """
        if self.reg is None:
            return

        self.reg.deregister()
        self.reg = None