update from RIFT as of 696b75d2fe9fb046261b08c616f1bcf6c0b54a9b second try
[osm/SO.git] / rwlaunchpad / plugins / rwmonparam / rift / tasklets / rwmonparam / nsr_core.py
index 78a3c8f..d94eb8d 100644 (file)
 @date 09-Jul-2016
 
 """
-
 import asyncio
+import collections
 import functools
+import gi
 import uuid
 
+import rift.tasklets
+
 from gi.repository import (RwDts as rwdts, NsrYang)
 import rift.mano.dts as mano_dts
+gi.require_version('RwKeyspec', '1.0')
+from gi.repository.RwKeyspec import quoted_key
 
 from . import aggregator as aggregator
 
@@ -37,68 +42,68 @@ class MissingValueField(Exception):
 
 class VnfrMonitoringParamSubscriber(mano_dts.AbstractOpdataSubscriber):
     """Registers for VNFR monitoring parameter changes.
-    
+
     Attributes:
         monp_id (str): Monitoring Param ID
         vnfr_id (str): VNFR ID
     """
-    def __init__(self, log, dts, loop, vnfr_id, monp_id, callback=None):
-        super().__init__(log, dts, loop, callback)
+    def __init__(self, log, dts, loop, project, vnfr_id, monp_id, callback=None):
+        super().__init__(log, dts, loop, project, callback)
         self.vnfr_id = vnfr_id
         self.monp_id = monp_id
 
     def get_xpath(self):
-        return("D,/vnfr:vnfr-catalog" +
-               "/vnfr:vnfr[vnfr:id='{}']".format(self.vnfr_id) +
+        return self.project.add_project(("D,/vnfr:vnfr-catalog" +
+               "/vnfr:vnfr[vnfr:id={}]".format(quoted_key(self.vnfr_id)) +
                "/vnfr:monitoring-param" +
-               "[vnfr:id='{}']".format(self.monp_id))
+               "[vnfr:id={}]".format(quoted_key(self.monp_id))))
 
 
 class NsrMonitoringParam():
     """Class that handles NS Mon-param data.
     """
-    MonParamMsg = NsrYang.YangData_Nsr_NsInstanceOpdata_Nsr_MonitoringParam
+    MonParamMsg = NsrYang.YangData_RwProject_Project_NsInstanceOpdata_Nsr_MonitoringParam
     MISSING = None
     DEFAULT_AGGREGATION_TYPE = "AVERAGE"
 
     @classmethod
-    def create_nsr_mon_params(cls, nsd, constituent_vnfrs, store):
+    def create_nsr_mon_params(cls, nsd, constituent_vnfrs, mon_param_project):
         """Convenience class that constructs NSMonitoringParam objects
-        
+
         Args:
-            nsd (RwNsdYang.YangData_Nsd_NsdCatalog_Nsd): Nsd object
+            nsd (RwNsdYang.YangData_RwProject_Project_NsdCatalog_Nsd): Nsd object
             constituent_vnfrs (list): List of constituent vnfr objects of NSR
-            store (SubscriberStore): Store object instance
-        
+            mon_param_project (MonParamProject): Store object instance
+
         Returns:
             list NsrMonitoringParam object.
 
         Also handles legacy NSD descriptor which has no mon-param defines. In
         such cases the mon-params are created from VNFD's mon-param config.
         """
-        MonParamMsg = NsrYang.YangData_Nsr_NsInstanceOpdata_Nsr_MonitoringParam
-
         mon_params = []
         for mon_param_msg in nsd.monitoring_param:
             mon_params.append(NsrMonitoringParam(
                     mon_param_msg,
-                    constituent_vnfrs
+                    constituent_vnfrs,
+                    mon_param_name=mon_param_msg.name
                     ))
 
         # Legacy Handling.
         # This indicates that the NSD had no mon-param config.
         if not nsd.monitoring_param:
             for vnfr in constituent_vnfrs:
-                vnfd = store.get_vnfd(vnfr.vnfd.id)
+                vnfd = mon_param_project.get_vnfd(vnfr.vnfd.id)
                 for monp in vnfd.monitoring_param:
                     mon_params.append(NsrMonitoringParam(
                         monp,
                         [vnfr],
-                        is_legacy=True))
+                        is_legacy=True,
+                        mon_param_name=monp.name))
 
         return mon_params
 
-    def __init__(self, monp_config, constituent_vnfrs, is_legacy=False):
+    def __init__(self, monp_config, constituent_vnfrs, is_legacy=False, mon_param_name=None):
         """
         Args:
             monp_config (GiObject): Config data to create the NSR mon-param msg
@@ -106,6 +111,7 @@ class NsrMonitoringParam():
             is_legacy (bool, optional): If set then the mon-param are created from
                 vnfd's config and not NSD's config.
         """
+        self._nsd_mon_param_msg = monp_config
         self._constituent_vnfr_map = {vnfr.id:vnfr for vnfr in constituent_vnfrs}
 
         # An internal store to hold the data
@@ -116,12 +122,42 @@ class NsrMonitoringParam():
         # create_nsr_mon_params() is already validating for 'is_legacy' by checking if
         # nsd is having 'monitoring_param'. So removing 'self.aggregation_type is None' check for is_legacy.
         self.is_legacy = is_legacy
+        self.mon_param_name = mon_param_name
 
         if not is_legacy:
-            self._msg = self._convert_nsd_msg(monp_config)
+            self._msg = self._convert_nsd_msg()
         else:
+            # TODO remove arg for consistency
             self._msg = self._convert_vnfd_msg(monp_config)
 
+    def add_vnfr(self, vnfr):
+        # If already added ignore
+        if vnfr.id in self._constituent_vnfr_map:
+            return
+
+        # Update the map
+        self._constituent_vnfr_map[vnfr.id] = vnfr
+
+        if not self.is_legacy:
+            self._msg = self._convert_nsd_msg()
+
+    def delete_vnfr(self, vnfr):
+        # Update the map
+        if vnfr.id in self._constituent_vnfr_map:
+            del self._constituent_vnfr_map[vnfr.id]
+
+            # Delete the value stores.
+            for vnfr_id, monp_id in list(self.vnfr_monparams.keys()):
+                if vnfr_id == vnfr.id:
+                    del self.vnfr_monparams[(vnfr_id, monp_id)]
+
+        if not self.is_legacy:
+            self._msg = self._convert_nsd_msg()
+
+    @property
+    def nsd_mon_param_msg(self):
+        return self._nsd_mon_param_msg
+
     @property
     def nsr_mon_param_msg(self):
         """Gi object msg"""
@@ -175,14 +211,6 @@ class NsrMonitoringParam():
 
         return None
 
-    def _constituent_vnfrs(self, constituent_vnfr_ids):
-        # Fetch the VNFRs
-        vnfr_map = {}
-        for constituent_vnfr in constituent_vnfr_ids:
-            vnfr_id = constituent_vnfr.vnfr_id
-            vnfr_map[vnfr_id] = self._store.get_vnfr(vnfr_id)
-
-        return vnfr_map
 
     def _extract_ui_elements(self, monp):
         ui_fields = ["group_tag", "description", "widget_type", "units", "value_type"]
@@ -191,34 +219,40 @@ class NsrMonitoringParam():
         return dict(zip(ui_fields, ui_data))
 
 
-    def _convert_nsd_msg(self, nsd_monp):
-        """Create initial msg without values"""
-        vnfd_to_vnfr = {vnfr.vnfd.id: vnfr_id
-                for vnfr_id, vnfr in self._constituent_vnfr_map.items()}
+    def _convert_nsd_msg(self):
+        """Create/update msg. This is also called when a new VNFR is added."""
+
+        # For a single VNFD there might be multiple vnfrs
+        vnfd_to_vnfr = collections.defaultdict(list)
+        for vnfr_id, vnfr in self._constituent_vnfr_map.items():
+            vnfd_to_vnfr[vnfr.vnfd.id].append(vnfr_id)
 
         # First, convert the monp param ref from vnfd to vnfr terms.
         vnfr_mon_param_ref = []
-        for vnfd_mon in nsd_monp.vnfd_monitoring_param:
-            vnfr_id = vnfd_to_vnfr[vnfd_mon.vnfd_id_ref]
+        for vnfd_mon in self.nsd_mon_param_msg.vnfd_monitoring_param:
+            vnfr_ids = vnfd_to_vnfr[vnfd_mon.vnfd_id_ref]
             monp_id = vnfd_mon.vnfd_monitoring_param_ref
 
-            self.vnfr_monparams[(vnfr_id, monp_id)] = self.MISSING
+            for vnfr_id in vnfr_ids:
+                key = (vnfr_id, monp_id)
+                if key not in self.vnfr_monparams:
+                    self.vnfr_monparams[key] = self.MISSING
 
-            vnfr_mon_param_ref.append({
-                'vnfr_id_ref': vnfr_id,
-                'vnfr_mon_param_ref': monp_id
-                })
+                vnfr_mon_param_ref.append({
+                    'vnfr_id_ref': vnfr_id,
+                    'vnfr_mon_param_ref': monp_id
+                    })
 
         monp_fields = {
                 # For now both the NSD and NSR's monp ID are same.
-                'id': nsd_monp.id,
-                'name': nsd_monp.name,
-                'nsd_mon_param_ref': nsd_monp.id,
+                'id': self.nsd_mon_param_msg.id,
+                'name': self.nsd_mon_param_msg.name,
+                'nsd_mon_param_ref': self.nsd_mon_param_msg.id,
                 'vnfr_mon_param_ref': vnfr_mon_param_ref,
-                'aggregation_type': nsd_monp.aggregation_type
+                'aggregation_type': self.nsd_mon_param_msg.aggregation_type
             }
 
-        ui_fields = self._extract_ui_elements(nsd_monp)
+        ui_fields = self._extract_ui_elements(self.nsd_mon_param_msg)
         monp_fields.update(ui_fields)
         monp = self.MonParamMsg.from_dict(monp_fields)
 
@@ -252,6 +286,7 @@ class NsrMonitoringParam():
             value (Tuple): (value_type, value)
         """
         self.vnfr_monparams[key] = value
+        
 
     def update_ns_value(self, value_field, value):
         """Updates the NS mon-param data with the aggregated value.
@@ -278,19 +313,20 @@ class NsrMonitoringParamPoller(mano_dts.DtsHandler):
     def from_handler(cls, handler, monp, callback):
         """Convenience class to build NsrMonitoringParamPoller object.
         """
-        return cls(handler.log, handler.dts, handler.loop, monp, callback)
+        return cls(handler.log, handler.dts, handler.loop, handler.project,
+                   monp, callback)
 
-    def __init__(self, log, dts, loop, monp, callback=None):
+    def __init__(self, log, dts, loop, project, monp, callback=None):
         """
         Args:
             monp (NsrMonitoringParam): Param object
             callback (None, optional): Callback to be triggered after value has
                 been aggregated.
         """
-        super().__init__(log, dts, loop)
+        super().__init__(log, dts, loop, project)
 
         self.monp = monp
-        self.subscribers = []
+        self.subscribers = {}
         self.callback = callback
         self._agg = None
 
@@ -310,7 +346,6 @@ class NsrMonitoringParamPoller(mano_dts.DtsHandler):
         """
         key = (vnfr_id, monp.id)
         value = NsrMonitoringParam.extract_value(monp)
-
         if not value:
             return
 
@@ -336,67 +371,176 @@ class NsrMonitoringParamPoller(mano_dts.DtsHandler):
         if self.callback:
             self.callback(self.monp.nsr_mon_param_msg)
 
+    @asyncio.coroutine
+    def create_pollers(self, create=False, register=False):
+        if (create):
+            for vnfr_id, monp_id in self.monp.vnfr_ids:
+                key = (vnfr_id, monp_id)
+                callback = functools.partial(self.update_value, vnfr_id=vnfr_id)
+                
+                # if the poller is already created, ignore
+                if key in self.subscribers:
+                    continue
+                
+                self.subscribers[key] = VnfrMonitoringParamSubscriber(
+                    self.loop,
+                    self.dts,
+                    self.loop,
+                    self.project,
+                    vnfr_id,
+                    monp_id,
+                    callback=callback)
+                
+                if register:
+                    yield from self.subscribers[key].register()
+        
+    @asyncio.coroutine
+    def update(self, vnfr):
+        self.monp.add_vnfr(vnfr)
+        yield from self.create_pollers(create=False, register=True)
+
+    @asyncio.coroutine
+    def delete(self, vnfr):
+        self.monp.delete_vnfr(vnfr)
+        for vnfr_id, monp_id in list(self.subscribers.keys()):
+            if vnfr_id != vnfr.id:
+                continue
+
+            key = (vnfr_id, monp_id)
+            sub = self.subscribers.pop(key)
+            sub.deregister()
+
+
     @asyncio.coroutine
     def register(self):
-        for vnfr_id, monp_id in self.monp.vnfr_ids:
-            callback = functools.partial(self.update_value, vnfr_id=vnfr_id)
-            self.subscribers.append(VnfrMonitoringParamSubscriber(
-                self.loop, self.dts, self.loop, vnfr_id, monp_id, callback=callback))
+        yield from self.create_pollers()
 
     @asyncio.coroutine
     def start(self):
-        for sub in self.subscribers:
+        for sub in self.subscribers.values():
             yield from sub.register()
 
     def stop(self):
-        for sub in self.subscribers:
+        for sub in self.subscribers.values():
             sub.deregister()
-
+    
+    def retrieve_data(self):
+        return self.monp.nsr_mon_param_msg
 
 class NsrMonitorDtsHandler(mano_dts.DtsHandler):
     """ NSR monitoring class """
 
-    def __init__(self, log, dts, loop, nsr, constituent_vnfrs, store):
+    def __init__(self, log, dts, loop, project, nsr, constituent_vnfrs):
         """
         Args:
-            nsr (RwNsrYang.YangData_Nsr_NsInstanceOpdata_Nsr): NSR object
+            nsr (RwNsrYang.YangData_RwProject_Project_NsInstanceOpdata_Nsr): NSR object
             constituent_vnfrs (list): list of VNFRs in NSR
-            store (SubscriberStore): Store instance
         """
-        super().__init__(log, dts, loop)
+        super().__init__(log, dts, loop, project)
 
         self.nsr = nsr
-        self.store = store
         self.constituent_vnfrs = constituent_vnfrs
+        self.dts_updates = dict()
+        self.dts_update_task = None
         self.mon_params_pollers = []
-
+        
+    def nsr_xpath(self):
+        return self.project.add_project("D,/nsr:ns-instance-opdata/nsr:nsr" +
+                "[nsr:ns-instance-config-ref={}]".format(quoted_key(self.nsr.ns_instance_config_ref)))
+    
     def xpath(self, param_id=None):
-        return ("D,/nsr:ns-instance-opdata/nsr:nsr" +
-            "[nsr:ns-instance-config-ref='{}']".format(self.nsr.ns_instance_config_ref) +
+        return self.project.add_project("D,/nsr:ns-instance-opdata/nsr:nsr" +
+            "[nsr:ns-instance-config-ref={}]".format(quoted_key(self.nsr.ns_instance_config_ref)) +
             "/nsr:monitoring-param" +
-            ("[nsr:id='{}']".format(param_id) if param_id else ""))
-
+            ("[nsr:id={}]".format(quoted_key(param_id)) if param_id else ""))
+        
     @asyncio.coroutine
     def register(self):
+        @asyncio.coroutine
+        def on_prepare(xact_info, query_action, ks_path, msg):
+            nsrmsg =None
+            xpath=None
+            if (self.reg_ready):
+                if (query_action ==  rwdts.QueryAction.READ):
+                    if (len(self.mon_params_pollers)):
+                        nsr_dict = {"ns_instance_config_ref": self.nsr.ns_instance_config_ref}
+                        nsrmsg = NsrYang.YangData_RwProject_Project_NsInstanceOpdata_Nsr. \
+                                 from_dict(nsr_dict)
+                        xpath = self.nsr_xpath()
+
+                        for poller in self.mon_params_pollers:
+                            mp_dict = \
+                                      NsrYang.YangData_RwProject_Project_NsInstanceOpdata_Nsr_MonitoringParam. \
+                                      from_dict(poller.retrieve_data().as_dict())
+                            nsrmsg.monitoring_param.append(mp_dict)
+
+            try:
+                xact_info.respond_xpath(rsp_code=rwdts.XactRspCode.ACK,
+                                        xpath=self.nsr_xpath(),
+                                        msg=nsrmsg)
+            except rift.tasklets.dts.ResponseError:
+                pass
+
+        @asyncio.coroutine
+        def on_ready(regh, status):
+            self.reg_ready = 1
+
+        handler = rift.tasklets.DTS.RegistrationHandler(on_prepare=on_prepare, on_ready=on_ready)
+        self.reg_ready = 0
+
         self.reg = yield from self.dts.register(xpath=self.xpath(),
-                  flags=rwdts.Flag.PUBLISHER|rwdts.Flag.CACHE|rwdts.Flag.NO_PREP_READ)
+                                                flags=rwdts.Flag.PUBLISHER,
+                                                handler=handler)
 
         assert self.reg is not None
 
+    @asyncio.coroutine
+    def nsr_monparam_update(self):
+        #check if the earlier xact is done or there is an xact
+        try:
+            if (len(self.dts_updates) == 0):
+                self.dts_update_task = None
+                return
+            nsr_dict = {"ns_instance_config_ref": self.nsr.ns_instance_config_ref}
+            nsrmsg = NsrYang.YangData_RwProject_Project_NsInstanceOpdata_Nsr.from_dict(nsr_dict)
+
+            for k,v in self.dts_updates.items():
+                mp_dict = NsrYang. \
+                          YangData_RwProject_Project_NsInstanceOpdata_Nsr_MonitoringParam. \
+                          from_dict(v.as_dict())
+                nsrmsg.monitoring_param.append(mp_dict)
+            self.dts_updates.clear()
+
+            yield from self.dts.query_update(self.nsr_xpath(), rwdts.XactFlag.ADVISE,
+                                             nsrmsg)
+
+            self.dts_update_task = None
+            if (len(self.dts_updates) == 0):
+                #schedule a DTS task to update the NSR again
+                self.add_dtsupdate_task()
+
+        except Exception as e:
+            self.log.exception("Exception updating NSR mon-param: %s", str(e))
+
+    def add_dtsupdate_task(self):
+        if (self.dts_update_task is None):
+            self.dts_update_task = asyncio.ensure_future(self.nsr_monparam_update(), loop=self.loop)
+        
     def callback(self, nsr_mon_param_msg):
         """Callback that triggers update.
         """
-        self.reg.update_element(
-                self.xpath(param_id=nsr_mon_param_msg.id),
-                nsr_mon_param_msg)
-
+        self.dts_updates[nsr_mon_param_msg.id] = nsr_mon_param_msg
+        #schedule a DTS task to update the NSR if one does not exist
+        self.add_dtsupdate_task()
+    
     @asyncio.coroutine
     def start(self):
-        nsd = self.store.get_nsd(self.nsr.nsd_ref)
+        nsd = self.project.get_nsd(self.nsr.nsd_ref)
+
         mon_params = NsrMonitoringParam.create_nsr_mon_params(
                 nsd,
                 self.constituent_vnfrs,
-                self.store)
+                self.project)
 
         for monp in mon_params:
             poller = NsrMonitoringParamPoller.from_handler(
@@ -408,6 +552,18 @@ class NsrMonitorDtsHandler(mano_dts.DtsHandler):
             yield from poller.register()
             yield from poller.start()
 
+    @asyncio.coroutine
+    def update(self, additional_vnfrs):
+        for vnfr in additional_vnfrs:
+            for poller in self.mon_params_pollers:
+                yield from poller.update(vnfr)
+
+    @asyncio.coroutine
+    def delete(self, deleted_vnfrs):
+        for vnfr in deleted_vnfrs:
+            for poller in self.mon_params_pollers:
+                yield from poller.delete(vnfr)
+
     def stop(self):
         self.deregister()
         for poller in self.mon_params_pollers:
@@ -419,3 +575,9 @@ class NsrMonitorDtsHandler(mano_dts.DtsHandler):
         if self.reg is not None:
             self.reg.deregister()
             self.reg = None
+
+    def apply_vnfr_mon(self, msg, vnfr_id):
+        """ Change in vnfr mon to ne applied"""
+        for poller in self.mon_params_pollers:
+            if (poller.monp.mon_param_name == msg.name):
+                poller.update_value(msg, rwdts.QueryAction.UPDATE, vnfr_id)