update from RIFT as of 696b75d2fe9fb046261b08c616f1bcf6c0b54a9b second try
[osm/SO.git] / rwlaunchpad / plugins / rwmonparam / rift / tasklets / rwmonparam / nsr_core.py
1 """
2 #
3 # Copyright 2016 RIFT.IO Inc
4 #
5 # Licensed under the Apache License, Version 2.0 (the "License");
6 # you may not use this file except in compliance with the License.
7 # You may obtain a copy of the License at
8 #
9 # http://www.apache.org/licenses/LICENSE-2.0
10 #
11 # Unless required by applicable law or agreed to in writing, software
12 # distributed under the License is distributed on an "AS IS" BASIS,
13 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 # See the License for the specific language governing permissions and
15 # limitations under the License.
16 #
17
18 @file nsr_core.py
19 @author Varun Prasad (varun.prasad@riftio.com)
20 @date 09-Jul-2016
21
22 """
23 import asyncio
24 import collections
25 import functools
26 import gi
27 import uuid
28
29 import rift.tasklets
30
31 from gi.repository import (RwDts as rwdts, NsrYang)
32 import rift.mano.dts as mano_dts
33 gi.require_version('RwKeyspec', '1.0')
34 from gi.repository.RwKeyspec import quoted_key
35
36 from . import aggregator as aggregator
37
38
39 class MissingValueField(Exception):
40 pass
41
42
43 class VnfrMonitoringParamSubscriber(mano_dts.AbstractOpdataSubscriber):
44 """Registers for VNFR monitoring parameter changes.
45
46 Attributes:
47 monp_id (str): Monitoring Param ID
48 vnfr_id (str): VNFR ID
49 """
50 def __init__(self, log, dts, loop, project, vnfr_id, monp_id, callback=None):
51 super().__init__(log, dts, loop, project, callback)
52 self.vnfr_id = vnfr_id
53 self.monp_id = monp_id
54
55 def get_xpath(self):
56 return self.project.add_project(("D,/vnfr:vnfr-catalog" +
57 "/vnfr:vnfr[vnfr:id={}]".format(quoted_key(self.vnfr_id)) +
58 "/vnfr:monitoring-param" +
59 "[vnfr:id={}]".format(quoted_key(self.monp_id))))
60
61
62 class NsrMonitoringParam():
63 """Class that handles NS Mon-param data.
64 """
65 MonParamMsg = NsrYang.YangData_RwProject_Project_NsInstanceOpdata_Nsr_MonitoringParam
66 MISSING = None
67 DEFAULT_AGGREGATION_TYPE = "AVERAGE"
68
69 @classmethod
70 def create_nsr_mon_params(cls, nsd, constituent_vnfrs, mon_param_project):
71 """Convenience class that constructs NSMonitoringParam objects
72
73 Args:
74 nsd (RwNsdYang.YangData_RwProject_Project_NsdCatalog_Nsd): Nsd object
75 constituent_vnfrs (list): List of constituent vnfr objects of NSR
76 mon_param_project (MonParamProject): Store object instance
77
78 Returns:
79 list NsrMonitoringParam object.
80
81 Also handles legacy NSD descriptor which has no mon-param defines. In
82 such cases the mon-params are created from VNFD's mon-param config.
83 """
84 mon_params = []
85 for mon_param_msg in nsd.monitoring_param:
86 mon_params.append(NsrMonitoringParam(
87 mon_param_msg,
88 constituent_vnfrs,
89 mon_param_name=mon_param_msg.name
90 ))
91
92 # Legacy Handling.
93 # This indicates that the NSD had no mon-param config.
94 if not nsd.monitoring_param:
95 for vnfr in constituent_vnfrs:
96 vnfd = mon_param_project.get_vnfd(vnfr.vnfd.id)
97 for monp in vnfd.monitoring_param:
98 mon_params.append(NsrMonitoringParam(
99 monp,
100 [vnfr],
101 is_legacy=True,
102 mon_param_name=monp.name))
103
104 return mon_params
105
106 def __init__(self, monp_config, constituent_vnfrs, is_legacy=False, mon_param_name=None):
107 """
108 Args:
109 monp_config (GiObject): Config data to create the NSR mon-param msg
110 constituent_vnfrs (list): List of VNFRs that may contain the mon-param
111 is_legacy (bool, optional): If set then the mon-param are created from
112 vnfd's config and not NSD's config.
113 """
114 self._nsd_mon_param_msg = monp_config
115 self._constituent_vnfr_map = {vnfr.id:vnfr for vnfr in constituent_vnfrs}
116
117 # An internal store to hold the data
118 # Key => (vnfr_id, monp_id)
119 # value => (value_type, value)
120 self.vnfr_monparams = {}
121
122 # create_nsr_mon_params() is already validating for 'is_legacy' by checking if
123 # nsd is having 'monitoring_param'. So removing 'self.aggregation_type is None' check for is_legacy.
124 self.is_legacy = is_legacy
125 self.mon_param_name = mon_param_name
126
127 if not is_legacy:
128 self._msg = self._convert_nsd_msg()
129 else:
130 # TODO remove arg for consistency
131 self._msg = self._convert_vnfd_msg(monp_config)
132
133 def add_vnfr(self, vnfr):
134 # If already added ignore
135 if vnfr.id in self._constituent_vnfr_map:
136 return
137
138 # Update the map
139 self._constituent_vnfr_map[vnfr.id] = vnfr
140
141 if not self.is_legacy:
142 self._msg = self._convert_nsd_msg()
143
144 def delete_vnfr(self, vnfr):
145 # Update the map
146 if vnfr.id in self._constituent_vnfr_map:
147 del self._constituent_vnfr_map[vnfr.id]
148
149 # Delete the value stores.
150 for vnfr_id, monp_id in list(self.vnfr_monparams.keys()):
151 if vnfr_id == vnfr.id:
152 del self.vnfr_monparams[(vnfr_id, monp_id)]
153
154 if not self.is_legacy:
155 self._msg = self._convert_nsd_msg()
156
157 @property
158 def nsd_mon_param_msg(self):
159 return self._nsd_mon_param_msg
160
161 @property
162 def nsr_mon_param_msg(self):
163 """Gi object msg"""
164 return self._msg
165
166 @property
167 def vnfr_ids(self):
168 """Store Keys"""
169 return list(self.vnfr_monparams.keys())
170
171 @property
172 def vnfr_values(self):
173 """Store values"""
174 return list(self.vnfr_monparams.values())
175
176 @property
177 def is_ready(self):
178 """Flag which indicates if all of the constituent vnfr values are
179 available to perform the aggregation"""
180 return (self.MISSING not in self.vnfr_values)
181
182 @property
183 def aggregation_type(self):
184 """Aggregation type"""
185 return self.nsr_mon_param_msg.aggregation_type
186
187 # @property
188 # def is_legacy(self):
189 # return (self.aggregation_type is None)
190
191 @classmethod
192 def extract_value(cls, monp):
193 """Class method to extract the value type and value from the
194 mon-param gi message
195
196 Args:
197 monp (GiObject): Mon param msg
198
199 Returns:
200 Tuple: (value type, value)
201
202 Raises:
203 MissingValueField: Raised if no valid field are available.
204 """
205 if monp.has_field("value_integer"):
206 return ("value_integer", monp.value_integer)
207 elif monp.has_field("value_decimal"):
208 return ("value_decimal", monp.value_decimal)
209 elif monp.has_field("value_string"):
210 return ("value_string", monp.value_string)
211
212 return None
213
214
215 def _extract_ui_elements(self, monp):
216 ui_fields = ["group_tag", "description", "widget_type", "units", "value_type"]
217 ui_data = [getattr(monp, ui_field) for ui_field in ui_fields]
218
219 return dict(zip(ui_fields, ui_data))
220
221
222 def _convert_nsd_msg(self):
223 """Create/update msg. This is also called when a new VNFR is added."""
224
225 # For a single VNFD there might be multiple vnfrs
226 vnfd_to_vnfr = collections.defaultdict(list)
227 for vnfr_id, vnfr in self._constituent_vnfr_map.items():
228 vnfd_to_vnfr[vnfr.vnfd.id].append(vnfr_id)
229
230 # First, convert the monp param ref from vnfd to vnfr terms.
231 vnfr_mon_param_ref = []
232 for vnfd_mon in self.nsd_mon_param_msg.vnfd_monitoring_param:
233 vnfr_ids = vnfd_to_vnfr[vnfd_mon.vnfd_id_ref]
234 monp_id = vnfd_mon.vnfd_monitoring_param_ref
235
236 for vnfr_id in vnfr_ids:
237 key = (vnfr_id, monp_id)
238 if key not in self.vnfr_monparams:
239 self.vnfr_monparams[key] = self.MISSING
240
241 vnfr_mon_param_ref.append({
242 'vnfr_id_ref': vnfr_id,
243 'vnfr_mon_param_ref': monp_id
244 })
245
246 monp_fields = {
247 # For now both the NSD and NSR's monp ID are same.
248 'id': self.nsd_mon_param_msg.id,
249 'name': self.nsd_mon_param_msg.name,
250 'nsd_mon_param_ref': self.nsd_mon_param_msg.id,
251 'vnfr_mon_param_ref': vnfr_mon_param_ref,
252 'aggregation_type': self.nsd_mon_param_msg.aggregation_type
253 }
254
255 ui_fields = self._extract_ui_elements(self.nsd_mon_param_msg)
256 monp_fields.update(ui_fields)
257 monp = self.MonParamMsg.from_dict(monp_fields)
258
259 return monp
260
261 def _convert_vnfd_msg(self, vnfd_monp):
262
263 vnfr = list(self._constituent_vnfr_map.values())[0]
264 self.vnfr_monparams[(vnfr.id, vnfd_monp.id)] = self.MISSING
265
266 monp_data = {
267 'id': str(uuid.uuid1()),
268 'name': vnfd_monp.name,
269 'vnfr_mon_param_ref': [{
270 'vnfr_id_ref': vnfr.id,
271 'vnfr_mon_param_ref': vnfd_monp.id
272 }]
273 }
274
275 ui_fields = self._extract_ui_elements(vnfd_monp)
276 monp_data.update(ui_fields)
277 monp = self.MonParamMsg.from_dict(monp_data)
278
279 return monp
280
281 def update_vnfr_value(self, key, value):
282 """Update the internal store
283
284 Args:
285 key (Tuple): (vnfr_id, monp_id)
286 value (Tuple): (value_type, value)
287 """
288 self.vnfr_monparams[key] = value
289
290
291 def update_ns_value(self, value_field, value):
292 """Updates the NS mon-param data with the aggregated value.
293
294 Args:
295 value_field (str): Value field in NSR
296 value : Aggregated value
297 """
298 setattr(self.nsr_mon_param_msg, value_field, value)
299
300
301 class NsrMonitoringParamPoller(mano_dts.DtsHandler):
302 """Handler responsible for publishing NS level monitoring
303 parameters.
304
305 Design:
306 1. Created subscribers for each vnfr's monitoring parameter
307 2. Accumulates the VNFR's value into the NsrMonitoringParam's internal
308 store.
309 3. Once all values are available, aggregate the value and triggers
310 callback notification to the subscribers.
311 """
312 @classmethod
313 def from_handler(cls, handler, monp, callback):
314 """Convenience class to build NsrMonitoringParamPoller object.
315 """
316 return cls(handler.log, handler.dts, handler.loop, handler.project,
317 monp, callback)
318
319 def __init__(self, log, dts, loop, project, monp, callback=None):
320 """
321 Args:
322 monp (NsrMonitoringParam): Param object
323 callback (None, optional): Callback to be triggered after value has
324 been aggregated.
325 """
326 super().__init__(log, dts, loop, project)
327
328 self.monp = monp
329 self.subscribers = {}
330 self.callback = callback
331 self._agg = None
332
333 def make_aggregator(self, field_types):
334 if not self._agg:
335 self._agg = aggregator.make_aggregator(field_types)
336 return self._agg
337
338
339 def update_value(self, monp, action, vnfr_id):
340 """Callback that gets triggered when VNFR's mon param changes.
341
342 Args:
343 monp (Gi Object): Gi object msg
344 action (rwdts.QueryAction)): Action type
345 vnfr_id (str): Vnfr ID
346 """
347 key = (vnfr_id, monp.id)
348 value = NsrMonitoringParam.extract_value(monp)
349 if not value:
350 return
351
352 # Accumulate the value
353 self.monp.update_vnfr_value(key, value)
354
355 # If all values are not available, then don't start
356 # the aggregation process.
357 if not self.monp.is_ready:
358 return
359
360 if self.monp.is_legacy:
361 # If no monp are specified then copy over the vnfr's monp data
362 value_field, value = value
363 else:
364 field_types, values = zip(*self.monp.vnfr_values)
365
366 value_field, value = self.make_aggregator(field_types).aggregate(
367 self.monp.aggregation_type,
368 values)
369
370 self.monp.update_ns_value(value_field, value)
371 if self.callback:
372 self.callback(self.monp.nsr_mon_param_msg)
373
374 @asyncio.coroutine
375 def create_pollers(self, create=False, register=False):
376 if (create):
377 for vnfr_id, monp_id in self.monp.vnfr_ids:
378 key = (vnfr_id, monp_id)
379 callback = functools.partial(self.update_value, vnfr_id=vnfr_id)
380
381 # if the poller is already created, ignore
382 if key in self.subscribers:
383 continue
384
385 self.subscribers[key] = VnfrMonitoringParamSubscriber(
386 self.loop,
387 self.dts,
388 self.loop,
389 self.project,
390 vnfr_id,
391 monp_id,
392 callback=callback)
393
394 if register:
395 yield from self.subscribers[key].register()
396
397 @asyncio.coroutine
398 def update(self, vnfr):
399 self.monp.add_vnfr(vnfr)
400 yield from self.create_pollers(create=False, register=True)
401
402 @asyncio.coroutine
403 def delete(self, vnfr):
404 self.monp.delete_vnfr(vnfr)
405 for vnfr_id, monp_id in list(self.subscribers.keys()):
406 if vnfr_id != vnfr.id:
407 continue
408
409 key = (vnfr_id, monp_id)
410 sub = self.subscribers.pop(key)
411 sub.deregister()
412
413
414 @asyncio.coroutine
415 def register(self):
416 yield from self.create_pollers()
417
418 @asyncio.coroutine
419 def start(self):
420 for sub in self.subscribers.values():
421 yield from sub.register()
422
423 def stop(self):
424 for sub in self.subscribers.values():
425 sub.deregister()
426
427 def retrieve_data(self):
428 return self.monp.nsr_mon_param_msg
429
430 class NsrMonitorDtsHandler(mano_dts.DtsHandler):
431 """ NSR monitoring class """
432
433 def __init__(self, log, dts, loop, project, nsr, constituent_vnfrs):
434 """
435 Args:
436 nsr (RwNsrYang.YangData_RwProject_Project_NsInstanceOpdata_Nsr): NSR object
437 constituent_vnfrs (list): list of VNFRs in NSR
438 """
439 super().__init__(log, dts, loop, project)
440
441 self.nsr = nsr
442 self.constituent_vnfrs = constituent_vnfrs
443 self.dts_updates = dict()
444 self.dts_update_task = None
445 self.mon_params_pollers = []
446
447 def nsr_xpath(self):
448 return self.project.add_project("D,/nsr:ns-instance-opdata/nsr:nsr" +
449 "[nsr:ns-instance-config-ref={}]".format(quoted_key(self.nsr.ns_instance_config_ref)))
450
451 def xpath(self, param_id=None):
452 return self.project.add_project("D,/nsr:ns-instance-opdata/nsr:nsr" +
453 "[nsr:ns-instance-config-ref={}]".format(quoted_key(self.nsr.ns_instance_config_ref)) +
454 "/nsr:monitoring-param" +
455 ("[nsr:id={}]".format(quoted_key(param_id)) if param_id else ""))
456
457 @asyncio.coroutine
458 def register(self):
459 @asyncio.coroutine
460 def on_prepare(xact_info, query_action, ks_path, msg):
461 nsrmsg =None
462 xpath=None
463 if (self.reg_ready):
464 if (query_action == rwdts.QueryAction.READ):
465 if (len(self.mon_params_pollers)):
466 nsr_dict = {"ns_instance_config_ref": self.nsr.ns_instance_config_ref}
467 nsrmsg = NsrYang.YangData_RwProject_Project_NsInstanceOpdata_Nsr. \
468 from_dict(nsr_dict)
469 xpath = self.nsr_xpath()
470
471 for poller in self.mon_params_pollers:
472 mp_dict = \
473 NsrYang.YangData_RwProject_Project_NsInstanceOpdata_Nsr_MonitoringParam. \
474 from_dict(poller.retrieve_data().as_dict())
475 nsrmsg.monitoring_param.append(mp_dict)
476
477 try:
478 xact_info.respond_xpath(rsp_code=rwdts.XactRspCode.ACK,
479 xpath=self.nsr_xpath(),
480 msg=nsrmsg)
481 except rift.tasklets.dts.ResponseError:
482 pass
483
484 @asyncio.coroutine
485 def on_ready(regh, status):
486 self.reg_ready = 1
487
488 handler = rift.tasklets.DTS.RegistrationHandler(on_prepare=on_prepare, on_ready=on_ready)
489 self.reg_ready = 0
490
491 self.reg = yield from self.dts.register(xpath=self.xpath(),
492 flags=rwdts.Flag.PUBLISHER,
493 handler=handler)
494
495 assert self.reg is not None
496
497 @asyncio.coroutine
498 def nsr_monparam_update(self):
499 #check if the earlier xact is done or there is an xact
500 try:
501 if (len(self.dts_updates) == 0):
502 self.dts_update_task = None
503 return
504 nsr_dict = {"ns_instance_config_ref": self.nsr.ns_instance_config_ref}
505 nsrmsg = NsrYang.YangData_RwProject_Project_NsInstanceOpdata_Nsr.from_dict(nsr_dict)
506
507 for k,v in self.dts_updates.items():
508 mp_dict = NsrYang. \
509 YangData_RwProject_Project_NsInstanceOpdata_Nsr_MonitoringParam. \
510 from_dict(v.as_dict())
511 nsrmsg.monitoring_param.append(mp_dict)
512 self.dts_updates.clear()
513
514 yield from self.dts.query_update(self.nsr_xpath(), rwdts.XactFlag.ADVISE,
515 nsrmsg)
516
517 self.dts_update_task = None
518 if (len(self.dts_updates) == 0):
519 #schedule a DTS task to update the NSR again
520 self.add_dtsupdate_task()
521
522 except Exception as e:
523 self.log.exception("Exception updating NSR mon-param: %s", str(e))
524
525 def add_dtsupdate_task(self):
526 if (self.dts_update_task is None):
527 self.dts_update_task = asyncio.ensure_future(self.nsr_monparam_update(), loop=self.loop)
528
529 def callback(self, nsr_mon_param_msg):
530 """Callback that triggers update.
531 """
532 self.dts_updates[nsr_mon_param_msg.id] = nsr_mon_param_msg
533 #schedule a DTS task to update the NSR if one does not exist
534 self.add_dtsupdate_task()
535
536 @asyncio.coroutine
537 def start(self):
538 nsd = self.project.get_nsd(self.nsr.nsd_ref)
539
540 mon_params = NsrMonitoringParam.create_nsr_mon_params(
541 nsd,
542 self.constituent_vnfrs,
543 self.project)
544
545 for monp in mon_params:
546 poller = NsrMonitoringParamPoller.from_handler(
547 self,
548 monp,
549 callback=self.callback)
550
551 self.mon_params_pollers.append(poller)
552 yield from poller.register()
553 yield from poller.start()
554
555 @asyncio.coroutine
556 def update(self, additional_vnfrs):
557 for vnfr in additional_vnfrs:
558 for poller in self.mon_params_pollers:
559 yield from poller.update(vnfr)
560
561 @asyncio.coroutine
562 def delete(self, deleted_vnfrs):
563 for vnfr in deleted_vnfrs:
564 for poller in self.mon_params_pollers:
565 yield from poller.delete(vnfr)
566
567 def stop(self):
568 self.deregister()
569 for poller in self.mon_params_pollers:
570 poller.stop()
571
572
573 def deregister(self):
574 """ de-register with dts """
575 if self.reg is not None:
576 self.reg.deregister()
577 self.reg = None
578
579 def apply_vnfr_mon(self, msg, vnfr_id):
580 """ Change in vnfr mon to ne applied"""
581 for poller in self.mon_params_pollers:
582 if (poller.monp.mon_param_name == msg.name):
583 poller.update_value(msg, rwdts.QueryAction.UPDATE, vnfr_id)