X-Git-Url: https://osm.etsi.org/gitweb/?a=blobdiff_plain;f=rwlaunchpad%2Fplugins%2Frwmonparam%2Frift%2Ftasklets%2Frwmonparam%2Fnsr_core.py;h=d94eb8d365ae1926524bdca756afb3155fea3db0;hb=f314b4af9744068a7ed7a6a6314220c3aa857523;hp=78a3c8fc14a8c269f9665a5746bfd3b98d4ea4b0;hpb=6f1a3fe149e4a6b9803382cb299c902f4cf58ec9;p=osm%2FSO.git diff --git a/rwlaunchpad/plugins/rwmonparam/rift/tasklets/rwmonparam/nsr_core.py b/rwlaunchpad/plugins/rwmonparam/rift/tasklets/rwmonparam/nsr_core.py index 78a3c8fc..d94eb8d3 100644 --- a/rwlaunchpad/plugins/rwmonparam/rift/tasklets/rwmonparam/nsr_core.py +++ b/rwlaunchpad/plugins/rwmonparam/rift/tasklets/rwmonparam/nsr_core.py @@ -20,13 +20,18 @@ @date 09-Jul-2016 """ - import asyncio +import collections import functools +import gi import uuid +import rift.tasklets + from gi.repository import (RwDts as rwdts, NsrYang) import rift.mano.dts as mano_dts +gi.require_version('RwKeyspec', '1.0') +from gi.repository.RwKeyspec import quoted_key from . import aggregator as aggregator @@ -37,68 +42,68 @@ class MissingValueField(Exception): class VnfrMonitoringParamSubscriber(mano_dts.AbstractOpdataSubscriber): """Registers for VNFR monitoring parameter changes. - + Attributes: monp_id (str): Monitoring Param ID vnfr_id (str): VNFR ID """ - def __init__(self, log, dts, loop, vnfr_id, monp_id, callback=None): - super().__init__(log, dts, loop, callback) + def __init__(self, log, dts, loop, project, vnfr_id, monp_id, callback=None): + super().__init__(log, dts, loop, project, callback) self.vnfr_id = vnfr_id self.monp_id = monp_id def get_xpath(self): - return("D,/vnfr:vnfr-catalog" + - "/vnfr:vnfr[vnfr:id='{}']".format(self.vnfr_id) + + return self.project.add_project(("D,/vnfr:vnfr-catalog" + + "/vnfr:vnfr[vnfr:id={}]".format(quoted_key(self.vnfr_id)) + "/vnfr:monitoring-param" + - "[vnfr:id='{}']".format(self.monp_id)) + "[vnfr:id={}]".format(quoted_key(self.monp_id)))) class NsrMonitoringParam(): """Class that handles NS Mon-param data. """ - MonParamMsg = NsrYang.YangData_Nsr_NsInstanceOpdata_Nsr_MonitoringParam + MonParamMsg = NsrYang.YangData_RwProject_Project_NsInstanceOpdata_Nsr_MonitoringParam MISSING = None DEFAULT_AGGREGATION_TYPE = "AVERAGE" @classmethod - def create_nsr_mon_params(cls, nsd, constituent_vnfrs, store): + def create_nsr_mon_params(cls, nsd, constituent_vnfrs, mon_param_project): """Convenience class that constructs NSMonitoringParam objects - + Args: - nsd (RwNsdYang.YangData_Nsd_NsdCatalog_Nsd): Nsd object + nsd (RwNsdYang.YangData_RwProject_Project_NsdCatalog_Nsd): Nsd object constituent_vnfrs (list): List of constituent vnfr objects of NSR - store (SubscriberStore): Store object instance - + mon_param_project (MonParamProject): Store object instance + Returns: list NsrMonitoringParam object. Also handles legacy NSD descriptor which has no mon-param defines. In such cases the mon-params are created from VNFD's mon-param config. """ - MonParamMsg = NsrYang.YangData_Nsr_NsInstanceOpdata_Nsr_MonitoringParam - mon_params = [] for mon_param_msg in nsd.monitoring_param: mon_params.append(NsrMonitoringParam( mon_param_msg, - constituent_vnfrs + constituent_vnfrs, + mon_param_name=mon_param_msg.name )) # Legacy Handling. # This indicates that the NSD had no mon-param config. if not nsd.monitoring_param: for vnfr in constituent_vnfrs: - vnfd = store.get_vnfd(vnfr.vnfd.id) + vnfd = mon_param_project.get_vnfd(vnfr.vnfd.id) for monp in vnfd.monitoring_param: mon_params.append(NsrMonitoringParam( monp, [vnfr], - is_legacy=True)) + is_legacy=True, + mon_param_name=monp.name)) return mon_params - def __init__(self, monp_config, constituent_vnfrs, is_legacy=False): + def __init__(self, monp_config, constituent_vnfrs, is_legacy=False, mon_param_name=None): """ Args: monp_config (GiObject): Config data to create the NSR mon-param msg @@ -106,6 +111,7 @@ class NsrMonitoringParam(): is_legacy (bool, optional): If set then the mon-param are created from vnfd's config and not NSD's config. """ + self._nsd_mon_param_msg = monp_config self._constituent_vnfr_map = {vnfr.id:vnfr for vnfr in constituent_vnfrs} # An internal store to hold the data @@ -116,12 +122,42 @@ class NsrMonitoringParam(): # create_nsr_mon_params() is already validating for 'is_legacy' by checking if # nsd is having 'monitoring_param'. So removing 'self.aggregation_type is None' check for is_legacy. self.is_legacy = is_legacy + self.mon_param_name = mon_param_name if not is_legacy: - self._msg = self._convert_nsd_msg(monp_config) + self._msg = self._convert_nsd_msg() else: + # TODO remove arg for consistency self._msg = self._convert_vnfd_msg(monp_config) + def add_vnfr(self, vnfr): + # If already added ignore + if vnfr.id in self._constituent_vnfr_map: + return + + # Update the map + self._constituent_vnfr_map[vnfr.id] = vnfr + + if not self.is_legacy: + self._msg = self._convert_nsd_msg() + + def delete_vnfr(self, vnfr): + # Update the map + if vnfr.id in self._constituent_vnfr_map: + del self._constituent_vnfr_map[vnfr.id] + + # Delete the value stores. + for vnfr_id, monp_id in list(self.vnfr_monparams.keys()): + if vnfr_id == vnfr.id: + del self.vnfr_monparams[(vnfr_id, monp_id)] + + if not self.is_legacy: + self._msg = self._convert_nsd_msg() + + @property + def nsd_mon_param_msg(self): + return self._nsd_mon_param_msg + @property def nsr_mon_param_msg(self): """Gi object msg""" @@ -175,14 +211,6 @@ class NsrMonitoringParam(): return None - def _constituent_vnfrs(self, constituent_vnfr_ids): - # Fetch the VNFRs - vnfr_map = {} - for constituent_vnfr in constituent_vnfr_ids: - vnfr_id = constituent_vnfr.vnfr_id - vnfr_map[vnfr_id] = self._store.get_vnfr(vnfr_id) - - return vnfr_map def _extract_ui_elements(self, monp): ui_fields = ["group_tag", "description", "widget_type", "units", "value_type"] @@ -191,34 +219,40 @@ class NsrMonitoringParam(): return dict(zip(ui_fields, ui_data)) - def _convert_nsd_msg(self, nsd_monp): - """Create initial msg without values""" - vnfd_to_vnfr = {vnfr.vnfd.id: vnfr_id - for vnfr_id, vnfr in self._constituent_vnfr_map.items()} + def _convert_nsd_msg(self): + """Create/update msg. This is also called when a new VNFR is added.""" + + # For a single VNFD there might be multiple vnfrs + vnfd_to_vnfr = collections.defaultdict(list) + for vnfr_id, vnfr in self._constituent_vnfr_map.items(): + vnfd_to_vnfr[vnfr.vnfd.id].append(vnfr_id) # First, convert the monp param ref from vnfd to vnfr terms. vnfr_mon_param_ref = [] - for vnfd_mon in nsd_monp.vnfd_monitoring_param: - vnfr_id = vnfd_to_vnfr[vnfd_mon.vnfd_id_ref] + for vnfd_mon in self.nsd_mon_param_msg.vnfd_monitoring_param: + vnfr_ids = vnfd_to_vnfr[vnfd_mon.vnfd_id_ref] monp_id = vnfd_mon.vnfd_monitoring_param_ref - self.vnfr_monparams[(vnfr_id, monp_id)] = self.MISSING + for vnfr_id in vnfr_ids: + key = (vnfr_id, monp_id) + if key not in self.vnfr_monparams: + self.vnfr_monparams[key] = self.MISSING - vnfr_mon_param_ref.append({ - 'vnfr_id_ref': vnfr_id, - 'vnfr_mon_param_ref': monp_id - }) + vnfr_mon_param_ref.append({ + 'vnfr_id_ref': vnfr_id, + 'vnfr_mon_param_ref': monp_id + }) monp_fields = { # For now both the NSD and NSR's monp ID are same. - 'id': nsd_monp.id, - 'name': nsd_monp.name, - 'nsd_mon_param_ref': nsd_monp.id, + 'id': self.nsd_mon_param_msg.id, + 'name': self.nsd_mon_param_msg.name, + 'nsd_mon_param_ref': self.nsd_mon_param_msg.id, 'vnfr_mon_param_ref': vnfr_mon_param_ref, - 'aggregation_type': nsd_monp.aggregation_type + 'aggregation_type': self.nsd_mon_param_msg.aggregation_type } - ui_fields = self._extract_ui_elements(nsd_monp) + ui_fields = self._extract_ui_elements(self.nsd_mon_param_msg) monp_fields.update(ui_fields) monp = self.MonParamMsg.from_dict(monp_fields) @@ -252,6 +286,7 @@ class NsrMonitoringParam(): value (Tuple): (value_type, value) """ self.vnfr_monparams[key] = value + def update_ns_value(self, value_field, value): """Updates the NS mon-param data with the aggregated value. @@ -278,19 +313,20 @@ class NsrMonitoringParamPoller(mano_dts.DtsHandler): def from_handler(cls, handler, monp, callback): """Convenience class to build NsrMonitoringParamPoller object. """ - return cls(handler.log, handler.dts, handler.loop, monp, callback) + return cls(handler.log, handler.dts, handler.loop, handler.project, + monp, callback) - def __init__(self, log, dts, loop, monp, callback=None): + def __init__(self, log, dts, loop, project, monp, callback=None): """ Args: monp (NsrMonitoringParam): Param object callback (None, optional): Callback to be triggered after value has been aggregated. """ - super().__init__(log, dts, loop) + super().__init__(log, dts, loop, project) self.monp = monp - self.subscribers = [] + self.subscribers = {} self.callback = callback self._agg = None @@ -310,7 +346,6 @@ class NsrMonitoringParamPoller(mano_dts.DtsHandler): """ key = (vnfr_id, monp.id) value = NsrMonitoringParam.extract_value(monp) - if not value: return @@ -336,67 +371,176 @@ class NsrMonitoringParamPoller(mano_dts.DtsHandler): if self.callback: self.callback(self.monp.nsr_mon_param_msg) + @asyncio.coroutine + def create_pollers(self, create=False, register=False): + if (create): + for vnfr_id, monp_id in self.monp.vnfr_ids: + key = (vnfr_id, monp_id) + callback = functools.partial(self.update_value, vnfr_id=vnfr_id) + + # if the poller is already created, ignore + if key in self.subscribers: + continue + + self.subscribers[key] = VnfrMonitoringParamSubscriber( + self.loop, + self.dts, + self.loop, + self.project, + vnfr_id, + monp_id, + callback=callback) + + if register: + yield from self.subscribers[key].register() + + @asyncio.coroutine + def update(self, vnfr): + self.monp.add_vnfr(vnfr) + yield from self.create_pollers(create=False, register=True) + + @asyncio.coroutine + def delete(self, vnfr): + self.monp.delete_vnfr(vnfr) + for vnfr_id, monp_id in list(self.subscribers.keys()): + if vnfr_id != vnfr.id: + continue + + key = (vnfr_id, monp_id) + sub = self.subscribers.pop(key) + sub.deregister() + + @asyncio.coroutine def register(self): - for vnfr_id, monp_id in self.monp.vnfr_ids: - callback = functools.partial(self.update_value, vnfr_id=vnfr_id) - self.subscribers.append(VnfrMonitoringParamSubscriber( - self.loop, self.dts, self.loop, vnfr_id, monp_id, callback=callback)) + yield from self.create_pollers() @asyncio.coroutine def start(self): - for sub in self.subscribers: + for sub in self.subscribers.values(): yield from sub.register() def stop(self): - for sub in self.subscribers: + for sub in self.subscribers.values(): sub.deregister() - + + def retrieve_data(self): + return self.monp.nsr_mon_param_msg class NsrMonitorDtsHandler(mano_dts.DtsHandler): """ NSR monitoring class """ - def __init__(self, log, dts, loop, nsr, constituent_vnfrs, store): + def __init__(self, log, dts, loop, project, nsr, constituent_vnfrs): """ Args: - nsr (RwNsrYang.YangData_Nsr_NsInstanceOpdata_Nsr): NSR object + nsr (RwNsrYang.YangData_RwProject_Project_NsInstanceOpdata_Nsr): NSR object constituent_vnfrs (list): list of VNFRs in NSR - store (SubscriberStore): Store instance """ - super().__init__(log, dts, loop) + super().__init__(log, dts, loop, project) self.nsr = nsr - self.store = store self.constituent_vnfrs = constituent_vnfrs + self.dts_updates = dict() + self.dts_update_task = None self.mon_params_pollers = [] - + + def nsr_xpath(self): + return self.project.add_project("D,/nsr:ns-instance-opdata/nsr:nsr" + + "[nsr:ns-instance-config-ref={}]".format(quoted_key(self.nsr.ns_instance_config_ref))) + def xpath(self, param_id=None): - return ("D,/nsr:ns-instance-opdata/nsr:nsr" + - "[nsr:ns-instance-config-ref='{}']".format(self.nsr.ns_instance_config_ref) + + return self.project.add_project("D,/nsr:ns-instance-opdata/nsr:nsr" + + "[nsr:ns-instance-config-ref={}]".format(quoted_key(self.nsr.ns_instance_config_ref)) + "/nsr:monitoring-param" + - ("[nsr:id='{}']".format(param_id) if param_id else "")) - + ("[nsr:id={}]".format(quoted_key(param_id)) if param_id else "")) + @asyncio.coroutine def register(self): + @asyncio.coroutine + def on_prepare(xact_info, query_action, ks_path, msg): + nsrmsg =None + xpath=None + if (self.reg_ready): + if (query_action == rwdts.QueryAction.READ): + if (len(self.mon_params_pollers)): + nsr_dict = {"ns_instance_config_ref": self.nsr.ns_instance_config_ref} + nsrmsg = NsrYang.YangData_RwProject_Project_NsInstanceOpdata_Nsr. \ + from_dict(nsr_dict) + xpath = self.nsr_xpath() + + for poller in self.mon_params_pollers: + mp_dict = \ + NsrYang.YangData_RwProject_Project_NsInstanceOpdata_Nsr_MonitoringParam. \ + from_dict(poller.retrieve_data().as_dict()) + nsrmsg.monitoring_param.append(mp_dict) + + try: + xact_info.respond_xpath(rsp_code=rwdts.XactRspCode.ACK, + xpath=self.nsr_xpath(), + msg=nsrmsg) + except rift.tasklets.dts.ResponseError: + pass + + @asyncio.coroutine + def on_ready(regh, status): + self.reg_ready = 1 + + handler = rift.tasklets.DTS.RegistrationHandler(on_prepare=on_prepare, on_ready=on_ready) + self.reg_ready = 0 + self.reg = yield from self.dts.register(xpath=self.xpath(), - flags=rwdts.Flag.PUBLISHER|rwdts.Flag.CACHE|rwdts.Flag.NO_PREP_READ) + flags=rwdts.Flag.PUBLISHER, + handler=handler) assert self.reg is not None + @asyncio.coroutine + def nsr_monparam_update(self): + #check if the earlier xact is done or there is an xact + try: + if (len(self.dts_updates) == 0): + self.dts_update_task = None + return + nsr_dict = {"ns_instance_config_ref": self.nsr.ns_instance_config_ref} + nsrmsg = NsrYang.YangData_RwProject_Project_NsInstanceOpdata_Nsr.from_dict(nsr_dict) + + for k,v in self.dts_updates.items(): + mp_dict = NsrYang. \ + YangData_RwProject_Project_NsInstanceOpdata_Nsr_MonitoringParam. \ + from_dict(v.as_dict()) + nsrmsg.monitoring_param.append(mp_dict) + self.dts_updates.clear() + + yield from self.dts.query_update(self.nsr_xpath(), rwdts.XactFlag.ADVISE, + nsrmsg) + + self.dts_update_task = None + if (len(self.dts_updates) == 0): + #schedule a DTS task to update the NSR again + self.add_dtsupdate_task() + + except Exception as e: + self.log.exception("Exception updating NSR mon-param: %s", str(e)) + + def add_dtsupdate_task(self): + if (self.dts_update_task is None): + self.dts_update_task = asyncio.ensure_future(self.nsr_monparam_update(), loop=self.loop) + def callback(self, nsr_mon_param_msg): """Callback that triggers update. """ - self.reg.update_element( - self.xpath(param_id=nsr_mon_param_msg.id), - nsr_mon_param_msg) - + self.dts_updates[nsr_mon_param_msg.id] = nsr_mon_param_msg + #schedule a DTS task to update the NSR if one does not exist + self.add_dtsupdate_task() + @asyncio.coroutine def start(self): - nsd = self.store.get_nsd(self.nsr.nsd_ref) + nsd = self.project.get_nsd(self.nsr.nsd_ref) + mon_params = NsrMonitoringParam.create_nsr_mon_params( nsd, self.constituent_vnfrs, - self.store) + self.project) for monp in mon_params: poller = NsrMonitoringParamPoller.from_handler( @@ -408,6 +552,18 @@ class NsrMonitorDtsHandler(mano_dts.DtsHandler): yield from poller.register() yield from poller.start() + @asyncio.coroutine + def update(self, additional_vnfrs): + for vnfr in additional_vnfrs: + for poller in self.mon_params_pollers: + yield from poller.update(vnfr) + + @asyncio.coroutine + def delete(self, deleted_vnfrs): + for vnfr in deleted_vnfrs: + for poller in self.mon_params_pollers: + yield from poller.delete(vnfr) + def stop(self): self.deregister() for poller in self.mon_params_pollers: @@ -419,3 +575,9 @@ class NsrMonitorDtsHandler(mano_dts.DtsHandler): if self.reg is not None: self.reg.deregister() self.reg = None + + def apply_vnfr_mon(self, msg, vnfr_id): + """ Change in vnfr mon to ne applied""" + for poller in self.mon_params_pollers: + if (poller.monp.mon_param_name == msg.name): + poller.update_value(msg, rwdts.QueryAction.UPDATE, vnfr_id)