3 # Copyright 2016 RIFT.IO Inc
5 # Licensed under the Apache License, Version 2.0 (the "License");
6 # you may not use this file except in compliance with the License.
7 # You may obtain a copy of the License at
9 # http://www.apache.org/licenses/LICENSE-2.0
11 # Unless required by applicable law or agreed to in writing, software
12 # distributed under the License is distributed on an "AS IS" BASIS,
13 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 # See the License for the specific language governing permissions and
15 # limitations under the License.
19 @author Varun Prasad (varun.prasad@riftio.com)
28 from gi
.repository
import (RwDts
as rwdts
, NsrYang
)
29 import rift
.mano
.dts
as mano_dts
31 from . import aggregator
as aggregator
34 class MissingValueField(Exception):
38 class VnfrMonitoringParamSubscriber(mano_dts
.AbstractOpdataSubscriber
):
39 """Registers for VNFR monitoring parameter changes.
42 monp_id (str): Monitoring Param ID
43 vnfr_id (str): VNFR ID
def __init__(self, log, dts, loop, project, vnfr_id, monp_id, callback=None):
    """Bind the subscriber to a single VNFR monitoring parameter.

    Args:
        log, dts, loop, project, callback: Passed through unchanged to the
            parent class initializer.
        vnfr_id (str): VNFR ID
        monp_id (str): Monitoring Param ID
    """
    super().__init__(log, dts, loop, project, callback)
    # Remember which (VNFR, mon-param) pair this subscriber is for.
    self.vnfr_id, self.monp_id = vnfr_id, monp_id
51 return self
.project
.add_project(("D,/vnfr:vnfr-catalog" +
52 "/vnfr:vnfr[vnfr:id='{}']".format(self
.vnfr_id
) +
53 "/vnfr:monitoring-param" +
54 "[vnfr:id='{}']".format(self
.monp_id
)))
57 class NsrMonitoringParam():
58 """Class that handles NS Mon-param data.
60 MonParamMsg
= NsrYang
.YangData_RwProject_Project_NsInstanceOpdata_Nsr_MonitoringParam
62 DEFAULT_AGGREGATION_TYPE
= "AVERAGE"
65 def create_nsr_mon_params(cls
, nsd
, constituent_vnfrs
, store
):
66 """Convenience class that constructs NSMonitoringParam objects
69 nsd (RwNsdYang.YangData_RwProject_Project_NsdCatalog_Nsd): Nsd object
70 constituent_vnfrs (list): List of constituent vnfr objects of NSR
71 store (SubscriberStore): Store object instance
74 list NsrMonitoringParam object.
76 Also handles legacy NSD descriptor which has no mon-param defines. In
77 such cases the mon-params are created from VNFD's mon-param config.
80 for mon_param_msg
in nsd
.monitoring_param
:
81 mon_params
.append(NsrMonitoringParam(
87 # This indicates that the NSD had no mon-param config.
88 if not nsd
.monitoring_param
:
89 for vnfr
in constituent_vnfrs
:
90 vnfd
= store
.get_vnfd(vnfr
.vnfd
.id)
91 for monp
in vnfd
.monitoring_param
:
92 mon_params
.append(NsrMonitoringParam(
99 def __init__(self
, monp_config
, constituent_vnfrs
, is_legacy
=False):
102 monp_config (GiObject): Config data to create the NSR mon-param msg
103 constituent_vnfrs (list): List of VNFRs that may contain the mon-param
104 is_legacy (bool, optional): If set then the mon-param are created from
105 vnfd's config and not NSD's config.
107 self
._constituent
_vnfr
_map
= {vnfr
.id:vnfr
for vnfr
in constituent_vnfrs
}
109 # An internal store to hold the data
110 # Key => (vnfr_id, monp_id)
111 # value => (value_type, value)
112 self
.vnfr_monparams
= {}
114 # create_nsr_mon_params() is already validating for 'is_legacy' by checking if
115 # nsd is having 'monitoring_param'. So removing 'self.aggregation_type is None' check for is_legacy.
116 self
.is_legacy
= is_legacy
119 self
._msg
= self
._convert
_nsd
_msg
(monp_config
)
121 self
._msg
= self
._convert
_vnfd
_msg
(monp_config
)
124 def nsr_mon_param_msg(self
):
131 return list(self
.vnfr_monparams
.keys())
def vnfr_values(self):
    """Return the values currently held in ``vnfr_monparams`` as a new list."""
    stored = self.vnfr_monparams
    return list(stored.values())
140 """Flag which indicates if all of the constituent vnfr values are
141 available to perform the aggregation"""
142 return (self
.MISSING
not in self
.vnfr_values
)
def aggregation_type(self):
    """Aggregation type read from the NSR mon-param message."""
    msg = self.nsr_mon_param_msg
    return msg.aggregation_type
150 # def is_legacy(self):
151 # return (self.aggregation_type is None)
154 def extract_value(cls
, monp
):
155 """Class method to extract the value type and value from the
159 monp (GiObject): Mon param msg
162 Tuple: (value type, value)
165 MissingValueField: Raised if no valid field are available.
167 if monp
.has_field("value_integer"):
168 return ("value_integer", monp
.value_integer
)
169 elif monp
.has_field("value_decimal"):
170 return ("value_decimal", monp
.value_decimal
)
171 elif monp
.has_field("value_string"):
172 return ("value_string", monp
.value_string
)
176 def _constituent_vnfrs(self
, constituent_vnfr_ids
):
179 for constituent_vnfr
in constituent_vnfr_ids
:
180 vnfr_id
= constituent_vnfr
.vnfr_id
181 vnfr_map
[vnfr_id
] = self
._store
.get_vnfr(vnfr_id
)
185 def _extract_ui_elements(self
, monp
):
186 ui_fields
= ["group_tag", "description", "widget_type", "units", "value_type"]
187 ui_data
= [getattr(monp
, ui_field
) for ui_field
in ui_fields
]
189 return dict(zip(ui_fields
, ui_data
))
192 def _convert_nsd_msg(self
, nsd_monp
):
193 """Create initial msg without values"""
194 vnfd_to_vnfr
= {vnfr
.vnfd
.id: vnfr_id
195 for vnfr_id
, vnfr
in self
._constituent
_vnfr
_map
.items()}
197 # First, convert the monp param ref from vnfd to vnfr terms.
198 vnfr_mon_param_ref
= []
199 for vnfd_mon
in nsd_monp
.vnfd_monitoring_param
:
200 vnfr_id
= vnfd_to_vnfr
[vnfd_mon
.vnfd_id_ref
]
201 monp_id
= vnfd_mon
.vnfd_monitoring_param_ref
203 self
.vnfr_monparams
[(vnfr_id
, monp_id
)] = self
.MISSING
205 vnfr_mon_param_ref
.append({
206 'vnfr_id_ref': vnfr_id
,
207 'vnfr_mon_param_ref': monp_id
211 # For now both the NSD and NSR's monp ID are same.
213 'name': nsd_monp
.name
,
214 'nsd_mon_param_ref': nsd_monp
.id,
215 'vnfr_mon_param_ref': vnfr_mon_param_ref
,
216 'aggregation_type': nsd_monp
.aggregation_type
219 ui_fields
= self
._extract
_ui
_elements
(nsd_monp
)
220 monp_fields
.update(ui_fields
)
221 monp
= self
.MonParamMsg
.from_dict(monp_fields
)
225 def _convert_vnfd_msg(self
, vnfd_monp
):
227 vnfr
= list(self
._constituent
_vnfr
_map
.values())[0]
228 self
.vnfr_monparams
[(vnfr
.id, vnfd_monp
.id)] = self
.MISSING
231 'id': str(uuid
.uuid1()),
232 'name': vnfd_monp
.name
,
233 'vnfr_mon_param_ref': [{
234 'vnfr_id_ref': vnfr
.id,
235 'vnfr_mon_param_ref': vnfd_monp
.id
239 ui_fields
= self
._extract
_ui
_elements
(vnfd_monp
)
240 monp_data
.update(ui_fields
)
241 monp
= self
.MonParamMsg
.from_dict(monp_data
)
def update_vnfr_value(self, key, value):
    """Record the value for one entry of the internal store.

    Args:
        key (Tuple): (vnfr_id, monp_id)
        value (Tuple): (value_type, value)
    """
    self.vnfr_monparams.update({key: value})
def update_ns_value(self, value_field, value):
    """Write the aggregated value onto the NS mon-param message.

    Args:
        value_field (str): Value field in NSR
        value : Aggregated value
    """
    target_msg = self.nsr_mon_param_msg
    setattr(target_msg, value_field, value)
264 class NsrMonitoringParamPoller(mano_dts
.DtsHandler
):
265 """Handler responsible for publishing NS level monitoring
269 1. Created subscribers for each vnfr's monitoring parameter
270 2. Accumulates the VNFR's value into the NsrMonitoringParam's internal
272 3. Once all values are available, aggregate the value and triggers
273 callback notification to the subscribers.
276 def from_handler(cls
, handler
, monp
, callback
):
277 """Convenience class to build NsrMonitoringParamPoller object.
279 return cls(handler
.log
, handler
.dts
, handler
.loop
, handler
.project
,
282 def __init__(self
, log
, dts
, loop
, project
, monp
, callback
=None):
285 monp (NsrMonitoringParam): Param object
286 callback (None, optional): Callback to be triggered after value has
289 super().__init
__(log
, dts
, loop
, project
)
292 self
.subscribers
= []
293 self
.callback
= callback
296 def make_aggregator(self
, field_types
):
298 self
._agg
= aggregator
.make_aggregator(field_types
)
302 def update_value(self
, monp
, action
, vnfr_id
):
303 """Callback that gets triggered when VNFR's mon param changes.
306 monp (Gi Object): Gi object msg
307 action (rwdts.QueryAction)): Action type
308 vnfr_id (str): Vnfr ID
310 key
= (vnfr_id
, monp
.id)
311 value
= NsrMonitoringParam
.extract_value(monp
)
316 # Accumulate the value
317 self
.monp
.update_vnfr_value(key
, value
)
319 # If all values are not available, then don't start
320 # the aggregation process.
321 if not self
.monp
.is_ready
:
324 if self
.monp
.is_legacy
:
325 # If no monp are specified then copy over the vnfr's monp data
326 value_field
, value
= value
328 field_types
, values
= zip(*self
.monp
.vnfr_values
)
330 value_field
, value
= self
.make_aggregator(field_types
).aggregate(
331 self
.monp
.aggregation_type
,
334 self
.monp
.update_ns_value(value_field
, value
)
336 self
.callback(self
.monp
.nsr_mon_param_msg
)
# Create one subscriber per (vnfr_id, monp_id) pair tracked by the mon-param.
# Each subscriber reports back through update_value with its VNFR ID
# pre-bound as a keyword argument.
for vnfr_id, monp_id in self.monp.vnfr_ids:
    callback = functools.partial(self.update_value, vnfr_id=vnfr_id)
    # VnfrMonitoringParamSubscriber takes (log, dts, loop, project, ...):
    # the first argument must be the logger, not the event loop.
    subscriber = VnfrMonitoringParamSubscriber(
        self.log, self.dts, self.loop, self.project,
        vnfr_id, monp_id, callback=callback)
    self.subscribers.append(subscriber)
348 for sub
in self
.subscribers
:
349 yield from sub
.register()
352 for sub
in self
.subscribers
:
356 class NsrMonitorDtsHandler(mano_dts
.DtsHandler
):
357 """ NSR monitoring class """
359 def __init__(self
, log
, dts
, loop
, project
, nsr
, constituent_vnfrs
, store
):
362 nsr (RwNsrYang.YangData_RwProject_Project_NsInstanceOpdata_Nsr): NSR object
363 constituent_vnfrs (list): list of VNFRs in NSR
364 store (SubscriberStore): Store instance
366 super().__init
__(log
, dts
, loop
, project
)
370 self
.constituent_vnfrs
= constituent_vnfrs
371 self
.mon_params_pollers
= []
def xpath(self, param_id=None):
    """Build the project-qualified xpath of this NSR's monitoring params.

    Args:
        param_id (None, optional): When truthy, an ``[nsr:id='...']``
            predicate for this ID is appended to the path.

    Returns:
        Whatever ``self.project.add_project`` returns for the assembled path.
    """
    segments = [
        "D,/nsr:ns-instance-opdata/nsr:nsr",
        "[nsr:ns-instance-config-ref='{}']".format(self.nsr.ns_instance_config_ref),
        "/nsr:monitoring-param",
    ]
    if param_id:
        segments.append("[nsr:id='{}']".format(param_id))
    return self.project.add_project("".join(segments))
381 self
.reg
= yield from self
.dts
.register(xpath
=self
.xpath(),
382 flags
=rwdts
.Flag
.PUBLISHER|rwdts
.Flag
.CACHE|rwdts
.Flag
.NO_PREP_READ
)
384 assert self
.reg
is not None
386 def callback(self
, nsr_mon_param_msg
):
387 """Callback that triggers update.
389 self
.reg
.update_element(
390 self
.xpath(param_id
=nsr_mon_param_msg
.id),
395 nsd
= self
.store
.get_nsd(self
.nsr
.nsd_ref
)
396 mon_params
= NsrMonitoringParam
.create_nsr_mon_params(
398 self
.constituent_vnfrs
,
401 for monp
in mon_params
:
402 poller
= NsrMonitoringParamPoller
.from_handler(
405 callback
=self
.callback
)
407 self
.mon_params_pollers
.append(poller
)
408 yield from poller
.register()
409 yield from poller
.start()
413 for poller
in self
.mon_params_pollers
:
def deregister(self):
    """ de-register with dts """
    handle = self.reg
    if handle is None:
        # Nothing was registered, so there is nothing to release.
        return
    handle.deregister()