Merge from OSM SO master
[osm/SO.git] / rwlaunchpad / plugins / rwmonparam / rift / tasklets / rwmonparam / nsr_core.py
1 """
2 #
3 # Copyright 2016 RIFT.IO Inc
4 #
5 # Licensed under the Apache License, Version 2.0 (the "License");
6 # you may not use this file except in compliance with the License.
7 # You may obtain a copy of the License at
8 #
9 # http://www.apache.org/licenses/LICENSE-2.0
10 #
11 # Unless required by applicable law or agreed to in writing, software
12 # distributed under the License is distributed on an "AS IS" BASIS,
13 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 # See the License for the specific language governing permissions and
15 # limitations under the License.
16 #
17
18 @file nsr_core.py
19 @author Varun Prasad (varun.prasad@riftio.com)
20 @date 09-Jul-2016
21
22 """
23
24 import asyncio
25 import functools
26 import uuid
27
28 from gi.repository import (RwDts as rwdts, NsrYang)
29 import rift.mano.dts as mano_dts
30
31 from . import aggregator as aggregator
32
33
34 class MissingValueField(Exception):
35 pass
36
37
38 class VnfrMonitoringParamSubscriber(mano_dts.AbstractOpdataSubscriber):
39 """Registers for VNFR monitoring parameter changes.
40
41 Attributes:
42 monp_id (str): Monitoring Param ID
43 vnfr_id (str): VNFR ID
44 """
45 def __init__(self, log, dts, loop, project, vnfr_id, monp_id, callback=None):
46 super().__init__(log, dts, loop, project, callback)
47 self.vnfr_id = vnfr_id
48 self.monp_id = monp_id
49
50 def get_xpath(self):
51 return self.project.add_project(("D,/vnfr:vnfr-catalog" +
52 "/vnfr:vnfr[vnfr:id='{}']".format(self.vnfr_id) +
53 "/vnfr:monitoring-param" +
54 "[vnfr:id='{}']".format(self.monp_id)))
55
56
57 class NsrMonitoringParam():
58 """Class that handles NS Mon-param data.
59 """
60 MonParamMsg = NsrYang.YangData_RwProject_Project_NsInstanceOpdata_Nsr_MonitoringParam
61 MISSING = None
62 DEFAULT_AGGREGATION_TYPE = "AVERAGE"
63
64 @classmethod
65 def create_nsr_mon_params(cls, nsd, constituent_vnfrs, store):
66 """Convenience class that constructs NSMonitoringParam objects
67
68 Args:
69 nsd (RwNsdYang.YangData_RwProject_Project_NsdCatalog_Nsd): Nsd object
70 constituent_vnfrs (list): List of constituent vnfr objects of NSR
71 store (SubscriberStore): Store object instance
72
73 Returns:
74 list NsrMonitoringParam object.
75
76 Also handles legacy NSD descriptor which has no mon-param defines. In
77 such cases the mon-params are created from VNFD's mon-param config.
78 """
79 mon_params = []
80 for mon_param_msg in nsd.monitoring_param:
81 mon_params.append(NsrMonitoringParam(
82 mon_param_msg,
83 constituent_vnfrs
84 ))
85
86 # Legacy Handling.
87 # This indicates that the NSD had no mon-param config.
88 if not nsd.monitoring_param:
89 for vnfr in constituent_vnfrs:
90 vnfd = store.get_vnfd(vnfr.vnfd.id)
91 for monp in vnfd.monitoring_param:
92 mon_params.append(NsrMonitoringParam(
93 monp,
94 [vnfr],
95 is_legacy=True))
96
97 return mon_params
98
99 def __init__(self, monp_config, constituent_vnfrs, is_legacy=False):
100 """
101 Args:
102 monp_config (GiObject): Config data to create the NSR mon-param msg
103 constituent_vnfrs (list): List of VNFRs that may contain the mon-param
104 is_legacy (bool, optional): If set then the mon-param are created from
105 vnfd's config and not NSD's config.
106 """
107 self._constituent_vnfr_map = {vnfr.id:vnfr for vnfr in constituent_vnfrs}
108
109 # An internal store to hold the data
110 # Key => (vnfr_id, monp_id)
111 # value => (value_type, value)
112 self.vnfr_monparams = {}
113
114 # create_nsr_mon_params() is already validating for 'is_legacy' by checking if
115 # nsd is having 'monitoring_param'. So removing 'self.aggregation_type is None' check for is_legacy.
116 self.is_legacy = is_legacy
117
118 if not is_legacy:
119 self._msg = self._convert_nsd_msg(monp_config)
120 else:
121 self._msg = self._convert_vnfd_msg(monp_config)
122
123 @property
124 def nsr_mon_param_msg(self):
125 """Gi object msg"""
126 return self._msg
127
128 @property
129 def vnfr_ids(self):
130 """Store Keys"""
131 return list(self.vnfr_monparams.keys())
132
133 @property
134 def vnfr_values(self):
135 """Store values"""
136 return list(self.vnfr_monparams.values())
137
138 @property
139 def is_ready(self):
140 """Flag which indicates if all of the constituent vnfr values are
141 available to perform the aggregation"""
142 return (self.MISSING not in self.vnfr_values)
143
144 @property
145 def aggregation_type(self):
146 """Aggregation type"""
147 return self.nsr_mon_param_msg.aggregation_type
148
149 # @property
150 # def is_legacy(self):
151 # return (self.aggregation_type is None)
152
153 @classmethod
154 def extract_value(cls, monp):
155 """Class method to extract the value type and value from the
156 mon-param gi message
157
158 Args:
159 monp (GiObject): Mon param msg
160
161 Returns:
162 Tuple: (value type, value)
163
164 Raises:
165 MissingValueField: Raised if no valid field are available.
166 """
167 if monp.has_field("value_integer"):
168 return ("value_integer", monp.value_integer)
169 elif monp.has_field("value_decimal"):
170 return ("value_decimal", monp.value_decimal)
171 elif monp.has_field("value_string"):
172 return ("value_string", monp.value_string)
173
174 return None
175
176 def _constituent_vnfrs(self, constituent_vnfr_ids):
177 # Fetch the VNFRs
178 vnfr_map = {}
179 for constituent_vnfr in constituent_vnfr_ids:
180 vnfr_id = constituent_vnfr.vnfr_id
181 vnfr_map[vnfr_id] = self._store.get_vnfr(vnfr_id)
182
183 return vnfr_map
184
185 def _extract_ui_elements(self, monp):
186 ui_fields = ["group_tag", "description", "widget_type", "units", "value_type"]
187 ui_data = [getattr(monp, ui_field) for ui_field in ui_fields]
188
189 return dict(zip(ui_fields, ui_data))
190
191
192 def _convert_nsd_msg(self, nsd_monp):
193 """Create initial msg without values"""
194 vnfd_to_vnfr = {vnfr.vnfd.id: vnfr_id
195 for vnfr_id, vnfr in self._constituent_vnfr_map.items()}
196
197 # First, convert the monp param ref from vnfd to vnfr terms.
198 vnfr_mon_param_ref = []
199 for vnfd_mon in nsd_monp.vnfd_monitoring_param:
200 vnfr_id = vnfd_to_vnfr[vnfd_mon.vnfd_id_ref]
201 monp_id = vnfd_mon.vnfd_monitoring_param_ref
202
203 self.vnfr_monparams[(vnfr_id, monp_id)] = self.MISSING
204
205 vnfr_mon_param_ref.append({
206 'vnfr_id_ref': vnfr_id,
207 'vnfr_mon_param_ref': monp_id
208 })
209
210 monp_fields = {
211 # For now both the NSD and NSR's monp ID are same.
212 'id': nsd_monp.id,
213 'name': nsd_monp.name,
214 'nsd_mon_param_ref': nsd_monp.id,
215 'vnfr_mon_param_ref': vnfr_mon_param_ref,
216 'aggregation_type': nsd_monp.aggregation_type
217 }
218
219 ui_fields = self._extract_ui_elements(nsd_monp)
220 monp_fields.update(ui_fields)
221 monp = self.MonParamMsg.from_dict(monp_fields)
222
223 return monp
224
225 def _convert_vnfd_msg(self, vnfd_monp):
226
227 vnfr = list(self._constituent_vnfr_map.values())[0]
228 self.vnfr_monparams[(vnfr.id, vnfd_monp.id)] = self.MISSING
229
230 monp_data = {
231 'id': str(uuid.uuid1()),
232 'name': vnfd_monp.name,
233 'vnfr_mon_param_ref': [{
234 'vnfr_id_ref': vnfr.id,
235 'vnfr_mon_param_ref': vnfd_monp.id
236 }]
237 }
238
239 ui_fields = self._extract_ui_elements(vnfd_monp)
240 monp_data.update(ui_fields)
241 monp = self.MonParamMsg.from_dict(monp_data)
242
243 return monp
244
245 def update_vnfr_value(self, key, value):
246 """Update the internal store
247
248 Args:
249 key (Tuple): (vnfr_id, monp_id)
250 value (Tuple): (value_type, value)
251 """
252 self.vnfr_monparams[key] = value
253
254 def update_ns_value(self, value_field, value):
255 """Updates the NS mon-param data with the aggregated value.
256
257 Args:
258 value_field (str): Value field in NSR
259 value : Aggregated value
260 """
261 setattr(self.nsr_mon_param_msg, value_field, value)
262
263
264 class NsrMonitoringParamPoller(mano_dts.DtsHandler):
265 """Handler responsible for publishing NS level monitoring
266 parameters.
267
268 Design:
269 1. Created subscribers for each vnfr's monitoring parameter
270 2. Accumulates the VNFR's value into the NsrMonitoringParam's internal
271 store.
272 3. Once all values are available, aggregate the value and triggers
273 callback notification to the subscribers.
274 """
275 @classmethod
276 def from_handler(cls, handler, monp, callback):
277 """Convenience class to build NsrMonitoringParamPoller object.
278 """
279 return cls(handler.log, handler.dts, handler.loop, handler.project,
280 monp, callback)
281
282 def __init__(self, log, dts, loop, project, monp, callback=None):
283 """
284 Args:
285 monp (NsrMonitoringParam): Param object
286 callback (None, optional): Callback to be triggered after value has
287 been aggregated.
288 """
289 super().__init__(log, dts, loop, project)
290
291 self.monp = monp
292 self.subscribers = []
293 self.callback = callback
294 self._agg = None
295
296 def make_aggregator(self, field_types):
297 if not self._agg:
298 self._agg = aggregator.make_aggregator(field_types)
299 return self._agg
300
301
302 def update_value(self, monp, action, vnfr_id):
303 """Callback that gets triggered when VNFR's mon param changes.
304
305 Args:
306 monp (Gi Object): Gi object msg
307 action (rwdts.QueryAction)): Action type
308 vnfr_id (str): Vnfr ID
309 """
310 key = (vnfr_id, monp.id)
311 value = NsrMonitoringParam.extract_value(monp)
312
313 if not value:
314 return
315
316 # Accumulate the value
317 self.monp.update_vnfr_value(key, value)
318
319 # If all values are not available, then don't start
320 # the aggregation process.
321 if not self.monp.is_ready:
322 return
323
324 if self.monp.is_legacy:
325 # If no monp are specified then copy over the vnfr's monp data
326 value_field, value = value
327 else:
328 field_types, values = zip(*self.monp.vnfr_values)
329
330 value_field, value = self.make_aggregator(field_types).aggregate(
331 self.monp.aggregation_type,
332 values)
333
334 self.monp.update_ns_value(value_field, value)
335 if self.callback:
336 self.callback(self.monp.nsr_mon_param_msg)
337
338 @asyncio.coroutine
339 def register(self):
340 for vnfr_id, monp_id in self.monp.vnfr_ids:
341 callback = functools.partial(self.update_value, vnfr_id=vnfr_id)
342 self.subscribers.append(VnfrMonitoringParamSubscriber(
343 self.loop, self.dts, self.loop, self.project,
344 vnfr_id, monp_id, callback=callback))
345
346 @asyncio.coroutine
347 def start(self):
348 for sub in self.subscribers:
349 yield from sub.register()
350
351 def stop(self):
352 for sub in self.subscribers:
353 sub.deregister()
354
355
356 class NsrMonitorDtsHandler(mano_dts.DtsHandler):
357 """ NSR monitoring class """
358
359 def __init__(self, log, dts, loop, project, nsr, constituent_vnfrs, store):
360 """
361 Args:
362 nsr (RwNsrYang.YangData_RwProject_Project_NsInstanceOpdata_Nsr): NSR object
363 constituent_vnfrs (list): list of VNFRs in NSR
364 store (SubscriberStore): Store instance
365 """
366 super().__init__(log, dts, loop, project)
367
368 self.nsr = nsr
369 self.store = store
370 self.constituent_vnfrs = constituent_vnfrs
371 self.mon_params_pollers = []
372
373 def xpath(self, param_id=None):
374 return self.project.add_project("D,/nsr:ns-instance-opdata/nsr:nsr" +
375 "[nsr:ns-instance-config-ref='{}']".format(self.nsr.ns_instance_config_ref) +
376 "/nsr:monitoring-param" +
377 ("[nsr:id='{}']".format(param_id) if param_id else ""))
378
379 @asyncio.coroutine
380 def register(self):
381 self.reg = yield from self.dts.register(xpath=self.xpath(),
382 flags=rwdts.Flag.PUBLISHER|rwdts.Flag.CACHE|rwdts.Flag.NO_PREP_READ)
383
384 assert self.reg is not None
385
386 def callback(self, nsr_mon_param_msg):
387 """Callback that triggers update.
388 """
389 self.reg.update_element(
390 self.xpath(param_id=nsr_mon_param_msg.id),
391 nsr_mon_param_msg)
392
393 @asyncio.coroutine
394 def start(self):
395 nsd = self.store.get_nsd(self.nsr.nsd_ref)
396 mon_params = NsrMonitoringParam.create_nsr_mon_params(
397 nsd,
398 self.constituent_vnfrs,
399 self.store)
400
401 for monp in mon_params:
402 poller = NsrMonitoringParamPoller.from_handler(
403 self,
404 monp,
405 callback=self.callback)
406
407 self.mon_params_pollers.append(poller)
408 yield from poller.register()
409 yield from poller.start()
410
411 def stop(self):
412 self.deregister()
413 for poller in self.mon_params_pollers:
414 poller.stop()
415
416
417 def deregister(self):
418 """ de-register with dts """
419 if self.reg is not None:
420 self.reg.deregister()
421 self.reg = None