update from RIFT as of 696b75d2fe9fb046261b08c616f1bcf6c0b54a9b second try
[osm/SO.git] / rwlaunchpad / plugins / rwmonparam / rift / tasklets / rwmonparam / vnfr_core.py
index 6dc3a25..78bfd2d 100644 (file)
 #
 
 import asyncio
-import logging
 import collections
 import concurrent
-import types
-
+import gi
+import logging
 import requests
 import requests.auth
 import tornado.escape
+import types
 
 from requests.packages.urllib3.exceptions import InsecureRequestWarning
 
-import gi
 gi.require_version('RwDts', '1.0')
 import rift.tasklets
 from gi.repository import (
@@ -37,6 +36,9 @@ from gi.repository import (
 import rift.mano.dts as mano_dts
 import rwlogger
 import xmltodict, json
+gi.require_version('RwKeyspec', '1.0')
+from gi.repository.RwKeyspec import quoted_key
+
 
 class MonitoringParamError(Exception):
     """Monitoring Parameter error"""
@@ -226,12 +228,13 @@ class HTTPBasicAuth(object):
 
 
 class HTTPEndpoint(object):
-    def __init__(self, log, loop, ip_address, ep_msg):
+    def __init__(self, log, loop, ip_address, ep_msg, executor=None):
         self._log = log
         self._loop = loop
         self._ip_address = ip_address
         self._ep_msg = ep_msg
-
+        self._executor = executor
+        
         # This is to suppress HTTPS related warning as we do not support
         # certificate verification yet
         requests.packages.urllib3.disable_warnings(InsecureRequestWarning)
@@ -269,6 +272,12 @@ class HTTPEndpoint(object):
            return self._ep_msg.method
         return "GET"
 
+    @property
+    def query_data(self):
+        if self._ep_msg.has_field("data"):
+           return self._ep_msg.data
+        return None
+
     @property
     def username(self):
         if self._ep_msg.has_field("username"):
@@ -320,9 +329,10 @@ class HTTPEndpoint(object):
     def _poll(self):
         try:
             resp = self._session.request(
-                    self.method, self.url, timeout=10, auth=self.auth,
-                    headers=self.headers, verify=False
-                    )
+                      self.method, self.url, timeout=10, auth=self.auth,
+                      headers=self.headers, verify=False, data=self.query_data
+                      )
+               
             resp.raise_for_status()
         except requests.exceptions.RequestException as e:
             msg = "Got HTTP error when request monitoring method {} from url {}: {}".format(
@@ -338,11 +348,17 @@ class HTTPEndpoint(object):
     @asyncio.coroutine
     def poll(self):
         try:
-            with concurrent.futures.ThreadPoolExecutor(1) as executor:
-                resp = yield from self._loop.run_in_executor(
+            if (self._executor is None):
+                with concurrent.futures.ThreadPoolExecutor(1) as executor:
+                    resp = yield from self._loop.run_in_executor(
                         executor,
                         self._poll,
-                        )
+                    )
+            else:
+                resp = yield from self._loop.run_in_executor(
+                    self._executor,
+                    self._poll,
+                )
 
         except MonitoringParamError as e:
             msg = "Caught exception when polling http endpoint: %s" % str(e)
@@ -464,7 +480,7 @@ class EndpointMonParamsPoller(object):
         self._on_update_cb = on_update_cb
 
         self._poll_task = None
-
+    
     @property
     def poll_interval(self):
         return self._endpoint.poll_interval
@@ -481,9 +497,9 @@ class EndpointMonParamsPoller(object):
     def _apply_response_to_mon_params(self, response_msg):
         for mon_param in self._mon_params:
             mon_param.extract_value_from_response(response_msg)
-
+        
         self._notify_subscriber()
-
+    
     @asyncio.coroutine
     def _poll_loop(self):
         self._log.debug("Starting http endpoint %s poll loop", self._endpoint.url)
@@ -491,6 +507,8 @@ class EndpointMonParamsPoller(object):
             try:
                 response = yield from self._endpoint.poll()
                 self._apply_response_to_mon_params(response)
+            except MonitoringParamError as e:
+                pass
             except concurrent.futures.CancelledError as e:
                 return
 
@@ -513,14 +531,18 @@ class EndpointMonParamsPoller(object):
 
         self._poll_task = None
 
+    def retrieve(self, xact_info, ks_path, send_handler):
+        send_handler(xact_info, self._get_mon_param_msgs())
 
+        
 class VnfMonitoringParamsController(object):
     def __init__(self, log, loop, vnfr_id, management_ip,
                  http_endpoint_msgs, monitoring_param_msgs,
-                 on_update_cb=None):
+                 on_update_cb=None, executor=None):
         self._log = log
         self._loop = loop
         self._vnfr_id = vnfr_id
+        self._executor = executor
         self._management_ip = management_ip
         self._http_endpoint_msgs = http_endpoint_msgs
         self._monitoring_param_msgs = monitoring_param_msgs
@@ -533,16 +555,15 @@ class VnfMonitoringParamsController(object):
                 self._endpoints, self._mon_params
                 )
         self._endpoint_pollers = self._create_endpoint_pollers(self._endpoint_mon_param_map)
-
+    
     def _create_endpoints(self):
         path_endpoint_map = {}
         for ep_msg in self._http_endpoint_msgs:
-            endpoint = HTTPEndpoint(
-                    self._log,
-                    self._loop,
-                    self._management_ip,
-                    ep_msg,
-                    )
+            endpoint = HTTPEndpoint(self._log,
+                                    self._loop,
+                                    self._management_ip,
+                                    ep_msg,self._executor)
+                
             path_endpoint_map[endpoint.path] = endpoint
 
         return path_endpoint_map
@@ -576,9 +597,8 @@ class VnfMonitoringParamsController(object):
                     mon_params,
                     self._on_update_cb
                     )
-
             pollers.append(poller)
-
+            
         return pollers
 
     @property
@@ -609,36 +629,41 @@ class VnfMonitoringParamsController(object):
         for poller in self._endpoint_pollers:
             poller.stop()
 
-
+    def retrieve(self, xact_info, ks_path, send_handler):
+        """Retrieve Monitoring params information """
+        for poller in self._endpoint_pollers:
+            poller.retrieve(xact_info, ks_path, send_handler)
+            
 class VnfMonitorDtsHandler(mano_dts.DtsHandler):
     """ VNF monitoring class """
     # List of list: So we need to register for the list in the deepest level
     XPATH = "D,/vnfr:vnfr-catalog/vnfr:vnfr/vnfr:monitoring-param"
 
     @classmethod
-    def from_vnf_data(cls, tasklet, vnfr_msg, vnfd_msg):
-        handler = cls(tasklet.log, tasklet.dts, tasklet.loop,
+    def from_vnf_data(cls, project, vnfr_msg, vnfd_msg):
+        handler = cls(project.log, project.dts, project.loop, project,
                 vnfr_msg.id, vnfr_msg.mgmt_interface.ip_address,
-                vnfd_msg.monitoring_param, vnfd_msg.http_endpoint)
+                      vnfd_msg.monitoring_param, vnfd_msg.http_endpoint)
 
         return handler
 
-    def __init__(self, log, dts, loop, vnfr_id, mgmt_ip, params, endpoints):
-        super().__init__(log, dts, loop)
+    def __init__(self, log, dts, loop, project, vnfr_id, mgmt_ip, params, endpoints, executor=None):
+        super().__init__(log, dts, loop, project)
 
         self._mgmt_ip = mgmt_ip
         self._vnfr_id = vnfr_id
-
+        self._executor = executor
+        
         mon_params = []
         for mon_param in params:
-            param = VnfrYang.YangData_Vnfr_VnfrCatalog_Vnfr_MonitoringParam.from_dict(
+            param = VnfrYang.YangData_RwProject_Project_VnfrCatalog_Vnfr_MonitoringParam.from_dict(
                     mon_param.as_dict()
                     )
             mon_params.append(param)
 
         http_endpoints = []
         for endpoint in endpoints:
-            endpoint = VnfrYang.YangData_Vnfr_VnfrCatalog_Vnfr_HttpEndpoint.from_dict(
+            endpoint = VnfrYang.YangData_RwProject_Project_VnfrCatalog_Vnfr_HttpEndpoint.from_dict(
                     endpoint.as_dict()
                     )
             http_endpoints.append(endpoint)
@@ -648,23 +673,33 @@ class VnfMonitorDtsHandler(mano_dts.DtsHandler):
         self.log.debug(" - Monitoring Params: %s", mon_params)
 
         self._mon_param_controller = VnfMonitoringParamsController(
-                self.log,
-                self.loop,
-                self._vnfr_id,
-                self._mgmt_ip,
-                http_endpoints,
-                mon_params,
-                self.on_update_mon_params
-                )
+            self.log,
+            self.loop,
+            self._vnfr_id,
+            self._mgmt_ip,
+            http_endpoints,
+            mon_params,
+            on_update_cb = self.on_update_mon_params,
+            executor=self._executor,
+        )
+        self._nsr_mon = None
 
     def on_update_mon_params(self, mon_param_msgs):
         for param_msg in mon_param_msgs:
-            self.reg.update_element(
-                    self.xpath(param_msg.id),
-                    param_msg,
-                    rwdts.XactFlag.ADVISE
-                   )
-
+            #self.reg.update_element(
+            #       self.xpath(param_msg.id),
+            #      param_msg,
+            #     rwdts.XactFlag.ADVISE
+            #   )
+            if (self._nsr_mon is not None):
+                self._nsr_mon.apply_vnfr_mon(param_msg, self._vnfr_id)
+    
+    def update_dts_read(self, xact_info, mon_param_msgs):
+        for param_msg in mon_param_msgs:
+           xact_info.respond_xpath(rsp_code=rwdts.XactRspCode.MORE,
+                                   xpath=self.xpath(param_msg.id),
+                                   msg=param_msg)
+    
     def start(self):
         self._mon_param_controller.start()
 
@@ -674,10 +709,10 @@ class VnfMonitorDtsHandler(mano_dts.DtsHandler):
 
     def xpath(self, param_id=None):
         """ Monitoring params xpath """
-        return("D,/vnfr:vnfr-catalog" +
-               "/vnfr:vnfr[vnfr:id='{}']".format(self._vnfr_id) +
+        return self.project.add_project(("D,/vnfr:vnfr-catalog" +
+               "/vnfr:vnfr[vnfr:id={}]".format(quoted_key(self._vnfr_id)) +
                "/vnfr:monitoring-param" +
-               ("[vnfr:id='{}']".format(param_id) if param_id else ""))
+               ("[vnfr:id={}]".format(quoted_key(param_id)) if param_id else "")))
 
     @property
     def msg(self):
@@ -686,13 +721,26 @@ class VnfMonitorDtsHandler(mano_dts.DtsHandler):
 
     def __del__(self):
         self.stop()
-
+    
     @asyncio.coroutine
     def register(self):
         """ Register with dts """
-
+        @asyncio.coroutine
+        def on_prepare(xact_info, query_action, ks_path, msg):
+            if (self.reg_ready):
+                if (query_action ==  rwdts.QueryAction.READ):
+                    self._mon_param_controller.retrieve(xact_info, ks_path, self.update_dts_read)
+                
+            xact_info.respond_xpath(rwdts.XactRspCode.ACK)
+        @asyncio.coroutine
+        def on_ready(regh, status):
+            self.reg_ready = 1
+        
+        handler = rift.tasklets.DTS.RegistrationHandler(on_prepare=on_prepare, on_ready=on_ready)
+        self.reg_ready = 0
         self.reg = yield from self.dts.register(xpath=self.xpath(),
-                  flags=rwdts.Flag.PUBLISHER|rwdts.Flag.CACHE|rwdts.Flag.NO_PREP_READ)
+                                                flags=rwdts.Flag.PUBLISHER,
+                                                handler=handler)
 
         assert self.reg is not None
 
@@ -705,3 +753,8 @@ class VnfMonitorDtsHandler(mano_dts.DtsHandler):
             self.reg.deregister()
             self.reg = None
             self._vnfr = None
+
+    def update_nsr_mon(self, nsr_mon):
+        """ update nsr mon """
+        self._nsr_mon = nsr_mon
+