3 # Copyright 2016 RIFT.IO Inc
5 # Licensed under the Apache License, Version 2.0 (the "License");
6 # you may not use this file except in compliance with the License.
7 # You may obtain a copy of the License at
9 # http://www.apache.org/licenses/LICENSE-2.0
11 # Unless required by applicable law or agreed to in writing, software
12 # distributed under the License is distributed on an "AS IS" BASIS,
13 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 # See the License for the specific language governing permissions and
15 # limitations under the License.
19 @author Varun Prasad (varun.prasad@riftio.com)
28 from gi
.repository
import (RwDts
as rwdts
, NsrYang
)
29 import rift
.mano
.dts
as mano_dts
31 from . import aggregator
as aggregator
34 class MissingValueField(Exception):
38 class VnfrMonitoringParamSubscriber(mano_dts
.AbstractOpdataSubscriber
):
39 """Registers for VNFR monitoring parameter changes.
42 monp_id (str): Monitoring Param ID
43 vnfr_id (str): VNFR ID
45 def __init__(self
, log
, dts
, loop
, vnfr_id
, monp_id
, callback
=None):
46 super().__init
__(log
, dts
, loop
, callback
)
47 self
.vnfr_id
= vnfr_id
48 self
.monp_id
= monp_id
51 return("D,/vnfr:vnfr-catalog" +
52 "/vnfr:vnfr[vnfr:id='{}']".format(self
.vnfr_id
) +
53 "/vnfr:monitoring-param" +
54 "[vnfr:id='{}']".format(self
.monp_id
))
57 class NsrMonitoringParam():
58 """Class that handles NS Mon-param data.
60 MonParamMsg
= NsrYang
.YangData_Nsr_NsInstanceOpdata_Nsr_MonitoringParam
62 DEFAULT_AGGREGATION_TYPE
= "AVERAGE"
65 def create_nsr_mon_params(cls
, nsd
, constituent_vnfrs
, store
):
66 """Convenience class that constructs NSMonitoringParam objects
69 nsd (RwNsdYang.YangData_Nsd_NsdCatalog_Nsd): Nsd object
70 constituent_vnfrs (list): List of constituent vnfr objects of NSR
71 store (SubscriberStore): Store object instance
74 list NsrMonitoringParam object.
76 Also handles legacy NSD descriptor which has no mon-param defines. In
77 such cases the mon-params are created from VNFD's mon-param config.
79 MonParamMsg
= NsrYang
.YangData_Nsr_NsInstanceOpdata_Nsr_MonitoringParam
82 for mon_param_msg
in nsd
.monitoring_param
:
83 mon_params
.append(NsrMonitoringParam(
89 # This indicates that the NSD had no mon-param config.
90 if not nsd
.monitoring_param
:
91 for vnfr
in constituent_vnfrs
:
92 vnfd
= store
.get_vnfd(vnfr
.vnfd_ref
)
93 for monp
in vnfd
.monitoring_param
:
94 mon_params
.append(NsrMonitoringParam(
101 def __init__(self
, monp_config
, constituent_vnfrs
, is_legacy
=False):
104 monp_config (GiObject): Config data to create the NSR mon-param msg
105 constituent_vnfrs (list): List of VNFRs that may contain the mon-param
106 is_legacy (bool, optional): If set then the mon-param are created from
107 vnfd's config and not NSD's config.
109 self
._constituent
_vnfr
_map
= {vnfr
.id:vnfr
for vnfr
in constituent_vnfrs
}
111 # An internal store to hold the data
112 # Key => (vnfr_id, monp_id)
113 # value => (value_type, value)
114 self
.vnfr_monparams
= {}
117 self
._msg
= self
._convert
_nsd
_msg
(monp_config
)
119 self
._msg
= self
._convert
_vnfd
_msg
(monp_config
)
122 def nsr_mon_param_msg(self
):
129 return list(self
.vnfr_monparams
.keys())
132 def vnfr_values(self
):
134 return list(self
.vnfr_monparams
.values())
138 """Flag which indicates if all of the constituent vnfr values are
139 available to perform the aggregation"""
140 return (self
.MISSING
not in self
.vnfr_values
)
143 def aggregation_type(self
):
144 """Aggregation type"""
145 return self
.nsr_mon_param_msg
.aggregation_type
149 return (self
.aggregation_type
is None)
152 def extract_value(cls
, monp
):
153 """Class method to extract the value type and value from the
157 monp (GiObject): Mon param msg
160 Tuple: (value type, value)
163 MissingValueField: Raised if no valid field are available.
165 if monp
.has_field("value_integer"):
166 return ("value_integer", monp
.value_integer
)
167 elif monp
.has_field("value_decimal"):
168 return ("value_decimal", monp
.value_decimal
)
169 elif monp
.has_field("value_string"):
170 return ("value_string", monp
.value_string
)
174 def _constituent_vnfrs(self
, constituent_vnfr_ids
):
177 for constituent_vnfr
in constituent_vnfr_ids
:
178 vnfr_id
= constituent_vnfr
.vnfr_id
179 vnfr_map
[vnfr_id
] = self
._store
.get_vnfr(vnfr_id
)
183 def _extract_ui_elements(self
, monp
):
184 ui_fields
= ["group_tag", "description", "widget_type", "units", "value_type"]
185 ui_data
= [getattr(monp
, ui_field
) for ui_field
in ui_fields
]
187 return dict(zip(ui_fields
, ui_data
))
190 def _convert_nsd_msg(self
, nsd_monp
):
191 """Create initial msg without values"""
192 vnfd_to_vnfr
= {vnfr
.vnfd_ref
: vnfr_id
193 for vnfr_id
, vnfr
in self
._constituent
_vnfr
_map
.items()}
195 # First, convert the monp param ref from vnfd to vnfr terms.
196 vnfr_mon_param_ref
= []
197 for vnfd_mon
in nsd_monp
.vnfd_monitoring_param
:
198 vnfr_id
= vnfd_to_vnfr
[vnfd_mon
.vnfd_id_ref
]
199 monp_id
= vnfd_mon
.vnfd_monitoring_param_ref
201 self
.vnfr_monparams
[(vnfr_id
, monp_id
)] = self
.MISSING
203 vnfr_mon_param_ref
.append({
204 'vnfr_id_ref': vnfr_id
,
205 'vnfr_mon_param_ref': monp_id
209 # For now both the NSD and NSR's monp ID are same.
211 'name': nsd_monp
.name
,
212 'nsd_mon_param_ref': nsd_monp
.id,
213 'vnfr_mon_param_ref': vnfr_mon_param_ref
,
214 'aggregation_type': nsd_monp
.aggregation_type
217 ui_fields
= self
._extract
_ui
_elements
(nsd_monp
)
218 monp_fields
.update(ui_fields
)
219 monp
= self
.MonParamMsg
.from_dict(monp_fields
)
223 def _convert_vnfd_msg(self
, vnfd_monp
):
225 vnfr
= list(self
._constituent
_vnfr
_map
.values())[0]
226 self
.vnfr_monparams
[(vnfr
.id, vnfd_monp
.id)] = self
.MISSING
229 'id': str(uuid
.uuid1()),
230 'name': vnfd_monp
.name
,
231 'vnfr_mon_param_ref': [{
232 'vnfr_id_ref': vnfr
.id,
233 'vnfr_mon_param_ref': vnfd_monp
.id
237 ui_fields
= self
._extract
_ui
_elements
(vnfd_monp
)
238 monp_data
.update(ui_fields
)
239 monp
= self
.MonParamMsg
.from_dict(monp_data
)
243 def update_vnfr_value(self
, key
, value
):
244 """Update the internal store
247 key (Tuple): (vnfr_id, monp_id)
248 value (Tuple): (value_type, value)
250 self
.vnfr_monparams
[key
] = value
252 def update_ns_value(self
, value_field
, value
):
253 """Updates the NS mon-param data with the aggregated value.
256 value_field (str): Value field in NSR
257 value : Aggregated value
259 setattr(self
.nsr_mon_param_msg
, value_field
, value
)
262 class NsrMonitoringParamPoller(mano_dts
.DtsHandler
):
263 """Handler responsible for publishing NS level monitoring
267 1. Created subscribers for each vnfr's monitoring parameter
268 2. Accumulates the VNFR's value into the NsrMonitoringParam's internal
270 3. Once all values are available, aggregate the value and triggers
271 callback notification to the subscribers.
274 def from_handler(cls
, handler
, monp
, callback
):
275 """Convenience class to build NsrMonitoringParamPoller object.
277 return cls(handler
.log
, handler
.dts
, handler
.loop
, monp
, callback
)
279 def __init__(self
, log
, dts
, loop
, monp
, callback
=None):
282 monp (NsrMonitoringParam): Param object
283 callback (None, optional): Callback to be triggered after value has
286 super().__init
__(log
, dts
, loop
)
289 self
.subscribers
= []
290 self
.callback
= callback
293 def make_aggregator(self
, field_types
):
295 self
._agg
= aggregator
.make_aggregator(field_types
)
299 def update_value(self
, monp
, action
, vnfr_id
):
300 """Callback that gets triggered when VNFR's mon param changes.
303 monp (Gi Object): Gi object msg
304 action (rwdts.QueryAction)): Action type
305 vnfr_id (str): Vnfr ID
307 key
= (vnfr_id
, monp
.id)
308 value
= NsrMonitoringParam
.extract_value(monp
)
313 # Accumulate the value
314 self
.monp
.update_vnfr_value(key
, value
)
316 # If all values are not available, then don't start
317 # the aggregation process.
318 if not self
.monp
.is_ready
:
321 if self
.monp
.is_legacy
:
322 # If no monp are specified then copy over the vnfr's monp data
323 value_field
, value
= value
325 field_types
, values
= zip(*self
.monp
.vnfr_values
)
327 value_field
, value
= self
.make_aggregator(field_types
).aggregate(
328 self
.monp
.aggregation_type
,
331 self
.monp
.update_ns_value(value_field
, value
)
333 self
.callback(self
.monp
.nsr_mon_param_msg
)
337 for vnfr_id
, monp_id
in self
.monp
.vnfr_ids
:
338 callback
= functools
.partial(self
.update_value
, vnfr_id
=vnfr_id
)
339 self
.subscribers
.append(VnfrMonitoringParamSubscriber(
340 self
.loop
, self
.dts
, self
.loop
, vnfr_id
, monp_id
, callback
=callback
))
344 for sub
in self
.subscribers
:
345 yield from sub
.register()
348 for sub
in self
.subscribers
:
352 class NsrMonitorDtsHandler(mano_dts
.DtsHandler
):
353 """ NSR monitoring class """
355 def __init__(self
, log
, dts
, loop
, nsr
, constituent_vnfrs
, store
):
358 nsr (RwNsrYang.YangData_Nsr_NsInstanceOpdata_Nsr): NSR object
359 constituent_vnfrs (list): list of VNFRs in NSR
360 store (SubscriberStore): Store instance
362 super().__init
__(log
, dts
, loop
)
366 self
.constituent_vnfrs
= constituent_vnfrs
367 self
.mon_params_pollers
= []
369 def xpath(self
, param_id
=None):
370 return ("D,/nsr:ns-instance-opdata/nsr:nsr" +
371 "[nsr:ns-instance-config-ref='{}']".format(self
.nsr
.ns_instance_config_ref
) +
372 "/nsr:monitoring-param" +
373 ("[nsr:id='{}']".format(param_id
) if param_id
else ""))
377 self
.reg
= yield from self
.dts
.register(xpath
=self
.xpath(),
378 flags
=rwdts
.Flag
.PUBLISHER|rwdts
.Flag
.CACHE|rwdts
.Flag
.NO_PREP_READ
)
380 assert self
.reg
is not None
382 def callback(self
, nsr_mon_param_msg
):
383 """Callback that triggers update.
385 self
.reg
.update_element(
386 self
.xpath(param_id
=nsr_mon_param_msg
.id),
391 nsd
= self
.store
.get_nsd(self
.nsr
.nsd_ref
)
392 mon_params
= NsrMonitoringParam
.create_nsr_mon_params(
394 self
.constituent_vnfrs
,
397 for monp
in mon_params
:
398 poller
= NsrMonitoringParamPoller
.from_handler(
401 callback
=self
.callback
)
403 self
.mon_params_pollers
.append(poller
)
404 yield from poller
.register()
405 yield from poller
.start()
409 for poller
in self
.mon_params_pollers
:
413 def deregister(self
):
414 """ de-register with dts """
415 if self
.reg
is not None:
416 self
.reg
.deregister()