#
# Copyright 2016 RIFT.IO Inc
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
"""
@author Varun Prasad (varun.prasad@riftio.com)
"""
import asyncio
import functools
import uuid

from gi.repository import (RwDts as rwdts, NsrYang)
import rift.mano.dts as mano_dts

from . import aggregator as aggregator
class MissingValueField(Exception):
    """Signals that a monitoring-param message carries no value field."""
class VnfrMonitoringParamSubscriber(mano_dts.AbstractOpdataSubscriber):
    """Registers for VNFR monitoring parameter changes.

    Attributes:
        monp_id (str): Monitoring Param ID
        vnfr_id (str): VNFR ID
    """

    def __init__(self, log, dts, loop, vnfr_id, monp_id, callback=None):
        """
        Args:
            log: Logger handed to the base subscriber.
            dts: DTS handle handed to the base subscriber.
            loop: Event loop handed to the base subscriber.
            vnfr_id (str): VNFR ID
            monp_id (str): Monitoring Param ID
            callback (None, optional): Forwarded unchanged to the base
                subscriber.
        """
        super().__init__(log, dts, loop, callback)
        self.vnfr_id = vnfr_id
        self.monp_id = monp_id

    # NOTE(review): the method header was not visible in the extracted
    # source; the name is assumed to be the base class' xpath hook - confirm.
    def get_xpath(self):
        """Return the opdata xpath of this VNFR's monitoring param."""
        return (
            "D,/vnfr:vnfr-catalog"
            "/vnfr:vnfr[vnfr:id='{}']"
            "/vnfr:monitoring-param"
            "[vnfr:id='{}']"
        ).format(self.vnfr_id, self.monp_id)
class NsrMonitoringParam():
    """Class that handles NS Mon-param data.

    Attributes:
        vnfr_monparams (dict): Internal store,
            (vnfr_id, monp_id) => (value_type, value). A key holds MISSING
            until update_vnfr_value() is called for it.
        is_legacy (bool): True when the param was created from a VNFD's
            mon-param config instead of the NSD's.
    """

    MonParamMsg = NsrYang.YangData_Nsr_NsInstanceOpdata_Nsr_MonitoringParam
    # Placeholder stored for every VNFR mon-param that has not reported yet.
    # NOTE(review): the definition was not visible in the extracted source;
    # None is assumed - confirm.
    MISSING = None
    DEFAULT_AGGREGATION_TYPE = "AVERAGE"

    @classmethod
    def create_nsr_mon_params(cls, nsd, constituent_vnfrs, store):
        """Convenience method that constructs NsrMonitoringParam objects.

        Also handles legacy NSD descriptor which has no mon-param defines. In
        such cases the mon-params are created from VNFD's mon-param config.

        Args:
            nsd (RwNsdYang.YangData_Nsd_NsdCatalog_Nsd): Nsd object
            constituent_vnfrs (list): List of constituent vnfr objects of NSR
            store (SubscriberStore): Store object instance

        Returns:
            list: NsrMonitoringParam objects.
        """
        if nsd.monitoring_param:
            return [cls(mon_param_msg, constituent_vnfrs)
                    for mon_param_msg in nsd.monitoring_param]

        # The NSD had no mon-param config: build one legacy param per VNFD
        # mon-param, each bound to the single VNFR it belongs to.
        mon_params = []
        for vnfr in constituent_vnfrs:
            vnfd = store.get_vnfd(vnfr.vnfd.id)
            for monp in vnfd.monitoring_param:
                mon_params.append(cls(monp, [vnfr], is_legacy=True))

        return mon_params

    def __init__(self, monp_config, constituent_vnfrs, is_legacy=False):
        """
        Args:
            monp_config (GiObject): Config data to create the NSR mon-param msg
            constituent_vnfrs (list): List of VNFRs that may contain the mon-param
            is_legacy (bool, optional): If set then the mon-param are created from
                vnfd's config and not NSD's config.
        """
        self._constituent_vnfr_map = {vnfr.id: vnfr for vnfr in constituent_vnfrs}

        # An internal store to hold the data
        # Key => (vnfr_id, monp_id)
        # value => (value_type, value)
        self.vnfr_monparams = {}

        # create_nsr_mon_params() already decides 'is_legacy' by checking
        # whether the nsd has 'monitoring_param', so it is stored as given
        # instead of being derived from the aggregation type.
        self.is_legacy = is_legacy

        if is_legacy:
            self._msg = self._convert_vnfd_msg(monp_config)
        else:
            self._msg = self._convert_nsd_msg(monp_config)

    @property
    def nsr_mon_param_msg(self):
        """Gi object msg"""
        return self._msg

    @property
    def vnfr_ids(self):
        """Store keys, as a list of (vnfr_id, monp_id) tuples"""
        return list(self.vnfr_monparams.keys())

    @property
    def vnfr_values(self):
        """Store values, as a list"""
        return list(self.vnfr_monparams.values())

    @property
    def is_ready(self):
        """Flag which indicates if all of the constituent vnfr values are
        available to perform the aggregation"""
        return self.MISSING not in self.vnfr_values

    @property
    def aggregation_type(self):
        """Aggregation type"""
        return self.nsr_mon_param_msg.aggregation_type

    @classmethod
    def extract_value(cls, monp):
        """Class method to extract the value type and value from the
        mon-param gi message.

        Args:
            monp (GiObject): Mon param msg

        Returns:
            Tuple: (value type, value) for the first of value_integer,
            value_decimal, value_string that is set; None when none is set.
        """
        for field in ("value_integer", "value_decimal", "value_string"):
            if monp.has_field(field):
                return (field, getattr(monp, field))

        return None

    def _constituent_vnfrs(self, constituent_vnfr_ids):
        """Map each constituent's vnfr_id to the VNFR fetched from the store.

        NOTE(review): reads self._store, which is not assigned anywhere in
        the visible part of this class - confirm this helper is still used.
        """
        vnfr_map = {}
        for constituent_vnfr in constituent_vnfr_ids:
            vnfr_id = constituent_vnfr.vnfr_id
            vnfr_map[vnfr_id] = self._store.get_vnfr(vnfr_id)

        return vnfr_map

    def _extract_ui_elements(self, monp):
        """Copy the UI related fields of monp into a dict keyed by field name."""
        ui_fields = ["group_tag", "description", "widget_type", "units", "value_type"]
        return {ui_field: getattr(monp, ui_field) for ui_field in ui_fields}

    def _convert_nsd_msg(self, nsd_monp):
        """Create initial msg without values"""
        vnfd_to_vnfr = {vnfr.vnfd.id: vnfr_id
                        for vnfr_id, vnfr in self._constituent_vnfr_map.items()}

        # First, convert the monp param ref from vnfd to vnfr terms.
        vnfr_mon_param_ref = []
        for vnfd_mon in nsd_monp.vnfd_monitoring_param:
            vnfr_id = vnfd_to_vnfr[vnfd_mon.vnfd_id_ref]
            monp_id = vnfd_mon.vnfd_monitoring_param_ref

            # No value has been received for this VNFR mon-param yet.
            self.vnfr_monparams[(vnfr_id, monp_id)] = self.MISSING

            vnfr_mon_param_ref.append({
                'vnfr_id_ref': vnfr_id,
                'vnfr_mon_param_ref': monp_id,
            })

        monp_fields = {
            # For now both the NSD and NSR's monp ID are same.
            'id': nsd_monp.id,
            'name': nsd_monp.name,
            'nsd_mon_param_ref': nsd_monp.id,
            'vnfr_mon_param_ref': vnfr_mon_param_ref,
            'aggregation_type': nsd_monp.aggregation_type,
        }
        monp_fields.update(self._extract_ui_elements(nsd_monp))

        return self.MonParamMsg.from_dict(monp_fields)

    def _convert_vnfd_msg(self, vnfd_monp):
        """Create initial msg without values from a VNFD mon-param (legacy)"""
        # Only the first constituent VNFR is consulted; the legacy path in
        # create_nsr_mon_params() passes exactly one.
        vnfr = list(self._constituent_vnfr_map.values())[0]
        self.vnfr_monparams[(vnfr.id, vnfd_monp.id)] = self.MISSING

        monp_data = {
            'id': str(uuid.uuid1()),
            'name': vnfd_monp.name,
            'vnfr_mon_param_ref': [{
                'vnfr_id_ref': vnfr.id,
                'vnfr_mon_param_ref': vnfd_monp.id,
            }],
        }
        monp_data.update(self._extract_ui_elements(vnfd_monp))

        return self.MonParamMsg.from_dict(monp_data)

    def update_vnfr_value(self, key, value):
        """Update the internal store

        Args:
            key (Tuple): (vnfr_id, monp_id)
            value (Tuple): (value_type, value)
        """
        self.vnfr_monparams[key] = value

    def update_ns_value(self, value_field, value):
        """Updates the NS mon-param data with the aggregated value.

        Args:
            value_field (str): Value field in NSR
            value : Aggregated value
        """
        setattr(self.nsr_mon_param_msg, value_field, value)
class NsrMonitoringParamPoller(mano_dts.DtsHandler):
    """Handler responsible for publishing NS level monitoring
    parameters.

    Design:
        1. Creates subscribers for each vnfr's monitoring parameter
        2. Accumulates the VNFR's value into the NsrMonitoringParam's internal
           store.
        3. Once all values are available, aggregates the value and triggers
           callback notification to the subscribers.
    """

    @classmethod
    def from_handler(cls, handler, monp, callback):
        """Convenience method to build NsrMonitoringParamPoller object.

        The log, dts and loop objects are taken from ``handler``.
        """
        return cls(handler.log, handler.dts, handler.loop, monp, callback)

    def __init__(self, log, dts, loop, monp, callback=None):
        """
        Args:
            monp (NsrMonitoringParam): Param object
            callback (None, optional): Callback to be triggered after value has
                been aggregated.
        """
        super().__init__(log, dts, loop)

        self.monp = monp
        self.subscribers = []
        self.callback = callback
        # Built on first use by make_aggregator() and reused afterwards.
        self._agg = None

    def make_aggregator(self, field_types):
        """Return the aggregator, creating it from field_types on first use."""
        if self._agg is None:
            self._agg = aggregator.make_aggregator(field_types)
        return self._agg

    def update_value(self, monp, action, vnfr_id):
        """Callback that gets triggered when VNFR's mon param changes.

        Args:
            monp (Gi Object): Gi object msg
            action (rwdts.QueryAction)): Action type
            vnfr_id (str): Vnfr ID
        """
        key = (vnfr_id, monp.id)
        value = NsrMonitoringParam.extract_value(monp)

        # extract_value() yields None when the message carries no value
        # field; there is nothing to accumulate in that case.
        if value is None:
            return

        # Accumulate the value
        self.monp.update_vnfr_value(key, value)

        # If all values are not available, then don't start
        # the aggregation process.
        if not self.monp.is_ready:
            return

        if self.monp.is_legacy:
            # If no monp are specified then copy over the vnfr's monp data
            value_field, ns_value = value
        else:
            field_types, values = zip(*self.monp.vnfr_values)

            value_field, ns_value = self.make_aggregator(field_types).aggregate(
                self.monp.aggregation_type,
                values)

        self.monp.update_ns_value(value_field, ns_value)

        # callback defaults to None, so only notify when one was supplied.
        if self.callback:
            self.callback(self.monp.nsr_mon_param_msg)

    # The body has no yield, yet callers do `yield from poller.register()`;
    # the coroutine decorator is what makes that delegation valid.
    @asyncio.coroutine
    def register(self):
        """Create one subscriber per (vnfr_id, monp_id) key of the param."""
        for vnfr_id, monp_id in self.monp.vnfr_ids:
            callback = functools.partial(self.update_value, vnfr_id=vnfr_id)
            # The first positional argument is the logger (it used to be
            # passed the event loop by mistake).
            self.subscribers.append(VnfrMonitoringParamSubscriber(
                self.log, self.dts, self.loop, vnfr_id, monp_id,
                callback=callback))

    @asyncio.coroutine
    def start(self):
        """Register every subscriber created by register()."""
        for sub in self.subscribers:
            yield from sub.register()

    # NOTE(review): the method header and loop body were not visible in the
    # extracted source; the name and the deregister() call are assumed.
    def stop(self):
        """Deregister every subscriber."""
        for sub in self.subscribers:
            sub.deregister()
class NsrMonitorDtsHandler(mano_dts.DtsHandler):
    """ NSR monitoring class """

    def __init__(self, log, dts, loop, nsr, constituent_vnfrs, store):
        """
        Args:
            nsr (RwNsrYang.YangData_Nsr_NsInstanceOpdata_Nsr): NSR object
            constituent_vnfrs (list): list of VNFRs in NSR
            store (SubscriberStore): Store instance
        """
        super().__init__(log, dts, loop)

        self.nsr = nsr
        self.store = store
        self.constituent_vnfrs = constituent_vnfrs
        self.mon_params_pollers = []
        # Set explicitly so deregister() is safe before register() has run.
        self.reg = None

    def xpath(self, param_id=None):
        """Return the opdata xpath of this NSR's monitoring params.

        Args:
            param_id (None, optional): When truthy, the path is narrowed to
                the monitoring param with that id.
        """
        path = ("D,/nsr:ns-instance-opdata/nsr:nsr" +
                "[nsr:ns-instance-config-ref='{}']".format(
                    self.nsr.ns_instance_config_ref) +
                "/nsr:monitoring-param")
        if param_id:
            path += "[nsr:id='{}']".format(param_id)
        return path

    # NOTE(review): the headers of register(), start() and stop() were not
    # visible in the extracted source; the names mirror the poller's API.
    @asyncio.coroutine
    def register(self):
        """Register with dts as publisher of the monitoring-param path."""
        self.reg = yield from self.dts.register(
            xpath=self.xpath(),
            flags=rwdts.Flag.PUBLISHER | rwdts.Flag.CACHE | rwdts.Flag.NO_PREP_READ)

        assert self.reg is not None

    def callback(self, nsr_mon_param_msg):
        """Callback that triggers update.

        Args:
            nsr_mon_param_msg: NS mon-param msg; its id selects the element
                that is updated.
        """
        self.reg.update_element(
            self.xpath(param_id=nsr_mon_param_msg.id),
            nsr_mon_param_msg)

    @asyncio.coroutine
    def start(self):
        """Create, register and start one poller per NS monitoring param."""
        nsd = self.store.get_nsd(self.nsr.nsd_ref)
        mon_params = NsrMonitoringParam.create_nsr_mon_params(
            nsd,
            self.constituent_vnfrs,
            self.store)

        for monp in mon_params:
            poller = NsrMonitoringParamPoller.from_handler(
                self,
                monp,
                callback=self.callback)

            self.mon_params_pollers.append(poller)
            yield from poller.register()
            yield from poller.start()

    def stop(self):
        """Deregister from dts and stop every poller."""
        self.deregister()
        for poller in self.mon_params_pollers:
            poller.stop()

    def deregister(self):
        """ de-register with dts """
        if self.reg is not None:
            self.reg.deregister()
            # Drop the handle so a second call is a no-op.
            self.reg = None