3 # Copyright 2016 RIFT.IO Inc
5 # Licensed under the Apache License, Version 2.0 (the "License");
6 # you may not use this file except in compliance with the License.
7 # You may obtain a copy of the License at
9 # http://www.apache.org/licenses/LICENSE-2.0
11 # Unless required by applicable law or agreed to in writing, software
12 # distributed under the License is distributed on an "AS IS" BASIS,
13 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 # See the License for the specific language governing permissions and
15 # limitations under the License.
19 @author Varun Prasad (varun.prasad@riftio.com)
31 from gi
.repository
import (RwDts
as rwdts
, NsrYang
)
32 import rift
.mano
.dts
as mano_dts
33 gi
.require_version('RwKeyspec', '1.0')
34 from gi
.repository
.RwKeyspec
import quoted_key
36 from . import aggregator
as aggregator
39 class MissingValueField(Exception):
43 class VnfrMonitoringParamSubscriber(mano_dts
.AbstractOpdataSubscriber
):
44 """Registers for VNFR monitoring parameter changes.
47 monp_id (str): Monitoring Param ID
48 vnfr_id (str): VNFR ID
def __init__(self, log, dts, loop, project, vnfr_id, monp_id, callback=None):
    """Set up a subscriber for a single VNFR monitoring parameter.

    Args:
        log: Forwarded unchanged to the parent initializer.
        dts: Forwarded unchanged to the parent initializer.
        loop: Forwarded unchanged to the parent initializer.
        project: Forwarded unchanged to the parent initializer.
        vnfr_id (str): VNFR ID
        monp_id (str): Monitoring Param ID
        callback (None, optional): Forwarded unchanged to the parent
            initializer.
    """
    super().__init__(log, dts, loop, project, callback)

    # Both IDs are later interpolated into the keyed path built by this class.
    self.vnfr_id, self.monp_id = vnfr_id, monp_id
56 return self
.project
.add_project(("D,/vnfr:vnfr-catalog" +
57 "/vnfr:vnfr[vnfr:id={}]".format(quoted_key(self
.vnfr_id
)) +
58 "/vnfr:monitoring-param" +
59 "[vnfr:id={}]".format(quoted_key(self
.monp_id
))))
62 class NsrMonitoringParam():
63 """Class that handles NS Mon-param data.
65 MonParamMsg
= NsrYang
.YangData_RwProject_Project_NsInstanceOpdata_Nsr_MonitoringParam
67 DEFAULT_AGGREGATION_TYPE
= "AVERAGE"
70 def create_nsr_mon_params(cls
, nsd
, constituent_vnfrs
, mon_param_project
):
71 """Convenience class that constructs NSMonitoringParam objects
74 nsd (RwNsdYang.YangData_RwProject_Project_NsdCatalog_Nsd): Nsd object
75 constituent_vnfrs (list): List of constituent vnfr objects of NSR
76 mon_param_project (MonParamProject): Store object instance
79 list NsrMonitoringParam object.
81 Also handles legacy NSD descriptor which has no mon-param defines. In
82 such cases the mon-params are created from VNFD's mon-param config.
85 for mon_param_msg
in nsd
.monitoring_param
:
86 mon_params
.append(NsrMonitoringParam(
89 mon_param_name
=mon_param_msg
.name
93 # This indicates that the NSD had no mon-param config.
94 if not nsd
.monitoring_param
:
95 for vnfr
in constituent_vnfrs
:
96 vnfd
= mon_param_project
.get_vnfd(vnfr
.vnfd
.id)
97 for monp
in vnfd
.monitoring_param
:
98 mon_params
.append(NsrMonitoringParam(
102 mon_param_name
=monp
.name
))
106 def __init__(self
, monp_config
, constituent_vnfrs
, is_legacy
=False, mon_param_name
=None):
109 monp_config (GiObject): Config data to create the NSR mon-param msg
110 constituent_vnfrs (list): List of VNFRs that may contain the mon-param
111 is_legacy (bool, optional): If set then the mon-param are created from
112 vnfd's config and not NSD's config.
114 self
._nsd
_mon
_param
_msg
= monp_config
115 self
._constituent
_vnfr
_map
= {vnfr
.id:vnfr
for vnfr
in constituent_vnfrs
}
117 # An internal store to hold the data
118 # Key => (vnfr_id, monp_id)
119 # value => (value_type, value)
120 self
.vnfr_monparams
= {}
122 # create_nsr_mon_params() is already validating for 'is_legacy' by checking if
123 # nsd is having 'monitoring_param'. So removing 'self.aggregation_type is None' check for is_legacy.
124 self
.is_legacy
= is_legacy
125 self
.mon_param_name
= mon_param_name
128 self
._msg
= self
._convert
_nsd
_msg
()
130 # TODO remove arg for consistency
131 self
._msg
= self
._convert
_vnfd
_msg
(monp_config
)
def add_vnfr(self, vnfr):
    """Start tracking an additional constituent VNFR.

    A VNFR whose id is already present in the constituent map is ignored.
    Otherwise it is stored and, for a non-legacy mon-param, the NSR
    message is rebuilt.

    Args:
        vnfr: VNFR object; only its ``id`` attribute is read here.
    """
    known_vnfrs = self._constituent_vnfr_map
    if vnfr.id not in known_vnfrs:
        # Update the map
        known_vnfrs[vnfr.id] = vnfr

        if not self.is_legacy:
            self._msg = self._convert_nsd_msg()
def delete_vnfr(self, vnfr):
    """Stop tracking a constituent VNFR and drop its stored values.

    Removes the VNFR from the constituent map (if present), deletes every
    value-store entry keyed by that VNFR's id and, for a non-legacy
    mon-param, rebuilds the NSR message.

    Args:
        vnfr: VNFR object; only its ``id`` attribute is read here.
    """
    # Update the map; an unknown id is silently ignored.
    self._constituent_vnfr_map.pop(vnfr.id, None)

    # Delete the value stores. The keys are collected first so the store
    # is not modified while it is being iterated.
    stale_keys = [
        (stored_vnfr_id, stored_monp_id)
        for stored_vnfr_id, stored_monp_id in self.vnfr_monparams
        if stored_vnfr_id == vnfr.id
    ]
    for stale_key in stale_keys:
        del self.vnfr_monparams[stale_key]

    if not self.is_legacy:
        self._msg = self._convert_nsd_msg()
def nsd_mon_param_msg(self):
    """Return the config message this mon-param was constructed from."""
    config_msg = self._nsd_mon_param_msg
    return config_msg
162 def nsr_mon_param_msg(self
):
169 return list(self
.vnfr_monparams
.keys())
172 def vnfr_values(self
):
174 return list(self
.vnfr_monparams
.values())
178 """Flag which indicates if all of the constituent vnfr values are
179 available to perform the aggregation"""
180 return (self
.MISSING
not in self
.vnfr_values
)
def aggregation_type(self):
    """Aggregation type carried by the NSR mon-param message."""
    nsr_msg = self.nsr_mon_param_msg
    return nsr_msg.aggregation_type
188 # def is_legacy(self):
189 # return (self.aggregation_type is None)
192 def extract_value(cls
, monp
):
193 """Class method to extract the value type and value from the
197 monp (GiObject): Mon param msg
200 Tuple: (value type, value)
203 MissingValueField: Raised if no valid field are available.
205 if monp
.has_field("value_integer"):
206 return ("value_integer", monp
.value_integer
)
207 elif monp
.has_field("value_decimal"):
208 return ("value_decimal", monp
.value_decimal
)
209 elif monp
.has_field("value_string"):
210 return ("value_string", monp
.value_string
)
215 def _extract_ui_elements(self
, monp
):
216 ui_fields
= ["group_tag", "description", "widget_type", "units", "value_type"]
217 ui_data
= [getattr(monp
, ui_field
) for ui_field
in ui_fields
]
219 return dict(zip(ui_fields
, ui_data
))
222 def _convert_nsd_msg(self
):
223 """Create/update msg. This is also called when a new VNFR is added."""
225 # For a single VNFD there might be multiple vnfrs
226 vnfd_to_vnfr
= collections
.defaultdict(list)
227 for vnfr_id
, vnfr
in self
._constituent
_vnfr
_map
.items():
228 vnfd_to_vnfr
[vnfr
.vnfd
.id].append(vnfr_id
)
230 # First, convert the monp param ref from vnfd to vnfr terms.
231 vnfr_mon_param_ref
= []
232 for vnfd_mon
in self
.nsd_mon_param_msg
.vnfd_monitoring_param
:
233 vnfr_ids
= vnfd_to_vnfr
[vnfd_mon
.vnfd_id_ref
]
234 monp_id
= vnfd_mon
.vnfd_monitoring_param_ref
236 for vnfr_id
in vnfr_ids
:
237 key
= (vnfr_id
, monp_id
)
238 if key
not in self
.vnfr_monparams
:
239 self
.vnfr_monparams
[key
] = self
.MISSING
241 vnfr_mon_param_ref
.append({
242 'vnfr_id_ref': vnfr_id
,
243 'vnfr_mon_param_ref': monp_id
247 # For now both the NSD and NSR's monp ID are same.
248 'id': self
.nsd_mon_param_msg
.id,
249 'name': self
.nsd_mon_param_msg
.name
,
250 'nsd_mon_param_ref': self
.nsd_mon_param_msg
.id,
251 'vnfr_mon_param_ref': vnfr_mon_param_ref
,
252 'aggregation_type': self
.nsd_mon_param_msg
.aggregation_type
255 ui_fields
= self
._extract
_ui
_elements
(self
.nsd_mon_param_msg
)
256 monp_fields
.update(ui_fields
)
257 monp
= self
.MonParamMsg
.from_dict(monp_fields
)
261 def _convert_vnfd_msg(self
, vnfd_monp
):
263 vnfr
= list(self
._constituent
_vnfr
_map
.values())[0]
264 self
.vnfr_monparams
[(vnfr
.id, vnfd_monp
.id)] = self
.MISSING
267 'id': str(uuid
.uuid1()),
268 'name': vnfd_monp
.name
,
269 'vnfr_mon_param_ref': [{
270 'vnfr_id_ref': vnfr
.id,
271 'vnfr_mon_param_ref': vnfd_monp
.id
275 ui_fields
= self
._extract
_ui
_elements
(vnfd_monp
)
276 monp_data
.update(ui_fields
)
277 monp
= self
.MonParamMsg
.from_dict(monp_data
)
def update_vnfr_value(self, key, value):
    """Record the latest value for one VNFR mon-param in the internal store.

    Args:
        key (Tuple): (vnfr_id, monp_id)
        value (Tuple): (value_type, value)
    """
    self.vnfr_monparams.update({key: value})
def update_ns_value(self, value_field, value):
    """Write the aggregated value into the NS mon-param message.

    Args:
        value_field (str): Value field in NSR
        value : Aggregated value
    """
    target_msg = self.nsr_mon_param_msg
    setattr(target_msg, value_field, value)
301 class NsrMonitoringParamPoller(mano_dts
.DtsHandler
):
302 """Handler responsible for publishing NS level monitoring
306 1. Created subscribers for each vnfr's monitoring parameter
307 2. Accumulates the VNFR's value into the NsrMonitoringParam's internal
309 3. Once all values are available, aggregate the value and triggers
310 callback notification to the subscribers.
313 def from_handler(cls
, handler
, monp
, callback
):
314 """Convenience class to build NsrMonitoringParamPoller object.
316 return cls(handler
.log
, handler
.dts
, handler
.loop
, handler
.project
,
319 def __init__(self
, log
, dts
, loop
, project
, monp
, callback
=None):
322 monp (NsrMonitoringParam): Param object
323 callback (None, optional): Callback to be triggered after value has
326 super().__init
__(log
, dts
, loop
, project
)
329 self
.subscribers
= {}
330 self
.callback
= callback
333 def make_aggregator(self
, field_types
):
335 self
._agg
= aggregator
.make_aggregator(field_types
)
339 def update_value(self
, monp
, action
, vnfr_id
):
340 """Callback that gets triggered when VNFR's mon param changes.
343 monp (Gi Object): Gi object msg
344 action (rwdts.QueryAction)): Action type
345 vnfr_id (str): Vnfr ID
347 key
= (vnfr_id
, monp
.id)
348 value
= NsrMonitoringParam
.extract_value(monp
)
352 # Accumulate the value
353 self
.monp
.update_vnfr_value(key
, value
)
355 # If all values are not available, then don't start
356 # the aggregation process.
357 if not self
.monp
.is_ready
:
360 if self
.monp
.is_legacy
:
361 # If no monp are specified then copy over the vnfr's monp data
362 value_field
, value
= value
364 field_types
, values
= zip(*self
.monp
.vnfr_values
)
366 value_field
, value
= self
.make_aggregator(field_types
).aggregate(
367 self
.monp
.aggregation_type
,
370 self
.monp
.update_ns_value(value_field
, value
)
372 self
.callback(self
.monp
.nsr_mon_param_msg
)
375 def create_pollers(self
, create
=False, register
=False):
377 for vnfr_id
, monp_id
in self
.monp
.vnfr_ids
:
378 key
= (vnfr_id
, monp_id
)
379 callback
= functools
.partial(self
.update_value
, vnfr_id
=vnfr_id
)
381 # if the poller is already created, ignore
382 if key
in self
.subscribers
:
385 self
.subscribers
[key
] = VnfrMonitoringParamSubscriber(
395 yield from self
.subscribers
[key
].register()
def update(self, vnfr):
    """Add a VNFR to the mon-param, then delegate to ``create_pollers``.

    Args:
        vnfr: VNFR object handed to the mon-param's ``add_vnfr``.
    """
    mon_param = self.monp
    mon_param.add_vnfr(vnfr)
    yield from self.create_pollers(register=True, create=False)
403 def delete(self
, vnfr
):
404 self
.monp
.delete_vnfr(vnfr
)
405 for vnfr_id
, monp_id
in list(self
.subscribers
.keys()):
406 if vnfr_id
!= vnfr
.id:
409 key
= (vnfr_id
, monp_id
)
410 sub
= self
.subscribers
.pop(key
)
416 yield from self
.create_pollers()
420 for sub
in self
.subscribers
.values():
421 yield from sub
.register()
424 for sub
in self
.subscribers
.values():
def retrieve_data(self):
    """Return the NSR mon-param message held by this poller's mon-param."""
    mon_param = self.monp
    return mon_param.nsr_mon_param_msg
430 class NsrMonitorDtsHandler(mano_dts
.DtsHandler
):
431 """ NSR monitoring class """
433 def __init__(self
, log
, dts
, loop
, project
, nsr
, constituent_vnfrs
):
436 nsr (RwNsrYang.YangData_RwProject_Project_NsInstanceOpdata_Nsr): NSR object
437 constituent_vnfrs (list): list of VNFRs in NSR
439 super().__init
__(log
, dts
, loop
, project
)
442 self
.constituent_vnfrs
= constituent_vnfrs
443 self
.dts_updates
= dict()
444 self
.dts_update_task
= None
445 self
.mon_params_pollers
= []
448 return self
.project
.add_project("D,/nsr:ns-instance-opdata/nsr:nsr" +
449 "[nsr:ns-instance-config-ref={}]".format(quoted_key(self
.nsr
.ns_instance_config_ref
)))
def xpath(self, param_id=None):
    """Build the project-scoped path of this NSR's monitoring-param list.

    Args:
        param_id (None, optional): When truthy, a ``[nsr:id=...]`` key
            predicate for this id is appended so that the path addresses
            a single monitoring-param entry.

    Returns:
        Whatever ``self.project.add_project`` returns for the built path.
    """
    config_ref = quoted_key(self.nsr.ns_instance_config_ref)
    path = ("D,/nsr:ns-instance-opdata/nsr:nsr" +
            "[nsr:ns-instance-config-ref={}]".format(config_ref) +
            "/nsr:monitoring-param")

    if param_id:
        path += "[nsr:id={}]".format(quoted_key(param_id))

    return self.project.add_project(path)
460 def on_prepare(xact_info
, query_action
, ks_path
, msg
):
464 if (query_action
== rwdts
.QueryAction
.READ
):
465 if (len(self
.mon_params_pollers
)):
466 nsr_dict
= {"ns_instance_config_ref": self
.nsr
.ns_instance_config_ref
}
467 nsrmsg
= NsrYang
.YangData_RwProject_Project_NsInstanceOpdata_Nsr
. \
469 xpath
= self
.nsr_xpath()
471 for poller
in self
.mon_params_pollers
:
473 NsrYang
.YangData_RwProject_Project_NsInstanceOpdata_Nsr_MonitoringParam
. \
474 from_dict(poller
.retrieve_data().as_dict())
475 nsrmsg
.monitoring_param
.append(mp_dict
)
478 xact_info
.respond_xpath(rsp_code
=rwdts
.XactRspCode
.ACK
,
479 xpath
=self
.nsr_xpath(),
481 except rift
.tasklets
.dts
.ResponseError
:
485 def on_ready(regh
, status
):
488 handler
= rift
.tasklets
.DTS
.RegistrationHandler(on_prepare
=on_prepare
, on_ready
=on_ready
)
491 self
.reg
= yield from self
.dts
.register(xpath
=self
.xpath(),
492 flags
=rwdts
.Flag
.PUBLISHER
,
495 assert self
.reg
is not None
498 def nsr_monparam_update(self
):
499 #check if the earlier xact is done or there is an xact
501 if (len(self
.dts_updates
) == 0):
502 self
.dts_update_task
= None
504 nsr_dict
= {"ns_instance_config_ref": self
.nsr
.ns_instance_config_ref
}
505 nsrmsg
= NsrYang
.YangData_RwProject_Project_NsInstanceOpdata_Nsr
.from_dict(nsr_dict
)
507 for k
,v
in self
.dts_updates
.items():
509 YangData_RwProject_Project_NsInstanceOpdata_Nsr_MonitoringParam
. \
510 from_dict(v
.as_dict())
511 nsrmsg
.monitoring_param
.append(mp_dict
)
512 self
.dts_updates
.clear()
514 yield from self
.dts
.query_update(self
.nsr_xpath(), rwdts
.XactFlag
.ADVISE
,
517 self
.dts_update_task
= None
518 if (len(self
.dts_updates
) == 0):
519 #schedule a DTS task to update the NSR again
520 self
.add_dtsupdate_task()
522 except Exception as e
:
523 self
.log
.exception("Exception updating NSR mon-param: %s", str(e
))
def add_dtsupdate_task(self):
    """Schedule ``nsr_monparam_update`` unless a task is already recorded.

    The scheduled future is kept in ``self.dts_update_task``; while that
    attribute is not None this call does nothing.
    """
    if self.dts_update_task is not None:
        return

    self.dts_update_task = asyncio.ensure_future(
        self.nsr_monparam_update(),
        loop=self.loop,
    )
def callback(self, nsr_mon_param_msg):
    """Queue an NSR mon-param message and trigger an update.

    Args:
        nsr_mon_param_msg: Message to publish; it is stored under its
            ``id``, replacing any message already queued for that id.
    """
    pending_updates = self.dts_updates
    pending_updates[nsr_mon_param_msg.id] = nsr_mon_param_msg

    # Schedule a DTS task to update the NSR if one does not exist.
    self.add_dtsupdate_task()
538 nsd
= self
.project
.get_nsd(self
.nsr
.nsd_ref
)
540 mon_params
= NsrMonitoringParam
.create_nsr_mon_params(
542 self
.constituent_vnfrs
,
545 for monp
in mon_params
:
546 poller
= NsrMonitoringParamPoller
.from_handler(
549 callback
=self
.callback
)
551 self
.mon_params_pollers
.append(poller
)
552 yield from poller
.register()
553 yield from poller
.start()
def update(self, additional_vnfrs):
    """Propagate newly added VNFRs to every mon-param poller.

    Each VNFR is handed to all pollers before moving on to the next VNFR.

    Args:
        additional_vnfrs: Iterable of VNFR objects to add.
    """
    for new_vnfr in additional_vnfrs:
        for mon_poller in self.mon_params_pollers:
            yield from mon_poller.update(new_vnfr)
def delete(self, deleted_vnfrs):
    """Propagate removed VNFRs to every mon-param poller.

    Each VNFR is handed to all pollers before moving on to the next VNFR.

    Args:
        deleted_vnfrs: Iterable of VNFR objects to remove.
    """
    for removed_vnfr in deleted_vnfrs:
        for mon_poller in self.mon_params_pollers:
            yield from mon_poller.delete(removed_vnfr)
569 for poller
in self
.mon_params_pollers
:
573 def deregister(self
):
574 """ de-register with dts """
575 if self
.reg
is not None:
576 self
.reg
.deregister()
def apply_vnfr_mon(self, msg, vnfr_id):
    """Apply a changed VNFR mon-param to the matching pollers.

    A poller matches when its mon-param's ``mon_param_name`` equals
    ``msg.name``; each match receives the message as an UPDATE action.

    Args:
        msg: VNFR mon-param message; its ``name`` selects the pollers.
        vnfr_id: ID of the VNFR that reported the change.
    """
    for mon_poller in self.mon_params_pollers:
        if mon_poller.monp.mon_param_name != msg.name:
            continue
        mon_poller.update_value(msg, rwdts.QueryAction.UPDATE, vnfr_id)