#
import asyncio
-import logging
import collections
import concurrent
-import types
-
+import gi
+import logging
import requests
import requests.auth
import tornado.escape
+import types
from requests.packages.urllib3.exceptions import InsecureRequestWarning
-import gi
gi.require_version('RwDts', '1.0')
import rift.tasklets
from gi.repository import (
)
import rift.mano.dts as mano_dts
import rwlogger
+import xmltodict, json
+gi.require_version('RwKeyspec', '1.0')
+from gi.repository.RwKeyspec import quoted_key
class MonitoringParamError(Exception):
class HTTPEndpoint(object):
- def __init__(self, log, loop, ip_address, ep_msg):
+ def __init__(self, log, loop, ip_address, ep_msg, executor=None):
self._log = log
self._loop = loop
self._ip_address = ip_address
self._ep_msg = ep_msg
-
+ self._executor = executor
+
# This is to suppress HTTPS related warning as we do not support
# certificate verification yet
requests.packages.urllib3.disable_warnings(InsecureRequestWarning)
return self._ep_msg.method
return "GET"
+ @property
+ def query_data(self):
+ if self._ep_msg.has_field("data"):
+ return self._ep_msg.data
+ return None
+
@property
def username(self):
if self._ep_msg.has_field("username"):
def _poll(self):
try:
resp = self._session.request(
- self.method, self.url, timeout=10, auth=self.auth,
- headers=self.headers, verify=False
- )
+ self.method, self.url, timeout=10, auth=self.auth,
+ headers=self.headers, verify=False, data=self.query_data
+ )
+
resp.raise_for_status()
except requests.exceptions.RequestException as e:
msg = "Got HTTP error when request monitoring method {} from url {}: {}".format(
@asyncio.coroutine
def poll(self):
try:
- with concurrent.futures.ThreadPoolExecutor(1) as executor:
- resp = yield from self._loop.run_in_executor(
+ if (self._executor is None):
+ with concurrent.futures.ThreadPoolExecutor(1) as executor:
+ resp = yield from self._loop.run_in_executor(
executor,
self._poll,
- )
+ )
+ else:
+ resp = yield from self._loop.run_in_executor(
+ self._executor,
+ self._poll,
+ )
except MonitoringParamError as e:
msg = "Caught exception when polling http endpoint: %s" % str(e)
self._log.warning("json querier is not created. Cannot extract value form response.")
return
+ try:
+ xml_data = xmltodict.parse(response_msg)
+ json_msg=json.dumps(xml_data)
+ response_msg = json_msg
+ except Exception as e:
+ pass
+
try:
value = self._json_querier.query(response_msg)
converted_value = self._value_converter.convert(value)
self._on_update_cb = on_update_cb
self._poll_task = None
-
+
@property
def poll_interval(self):
return self._endpoint.poll_interval
def _apply_response_to_mon_params(self, response_msg):
for mon_param in self._mon_params:
mon_param.extract_value_from_response(response_msg)
-
+
self._notify_subscriber()
-
+
@asyncio.coroutine
def _poll_loop(self):
self._log.debug("Starting http endpoint %s poll loop", self._endpoint.url)
try:
response = yield from self._endpoint.poll()
self._apply_response_to_mon_params(response)
+ except MonitoringParamError as e:
+ pass
except concurrent.futures.CancelledError as e:
return
self._poll_task = None
+ def retrieve(self, xact_info, ks_path, send_handler):
+ send_handler(xact_info, self._get_mon_param_msgs())
+
class VnfMonitoringParamsController(object):
def __init__(self, log, loop, vnfr_id, management_ip,
http_endpoint_msgs, monitoring_param_msgs,
- on_update_cb=None):
+ on_update_cb=None, executor=None):
self._log = log
self._loop = loop
self._vnfr_id = vnfr_id
+ self._executor = executor
self._management_ip = management_ip
self._http_endpoint_msgs = http_endpoint_msgs
self._monitoring_param_msgs = monitoring_param_msgs
self._endpoints, self._mon_params
)
self._endpoint_pollers = self._create_endpoint_pollers(self._endpoint_mon_param_map)
-
+
def _create_endpoints(self):
path_endpoint_map = {}
for ep_msg in self._http_endpoint_msgs:
- endpoint = HTTPEndpoint(
- self._log,
- self._loop,
- self._management_ip,
- ep_msg,
- )
+ endpoint = HTTPEndpoint(self._log,
+ self._loop,
+ self._management_ip,
+ ep_msg,self._executor)
+
path_endpoint_map[endpoint.path] = endpoint
return path_endpoint_map
mon_params,
self._on_update_cb
)
-
pollers.append(poller)
-
+
return pollers
@property
for poller in self._endpoint_pollers:
poller.stop()
-
+ def retrieve(self, xact_info, ks_path, send_handler):
+ """Retrieve Monitoring params information """
+ for poller in self._endpoint_pollers:
+ poller.retrieve(xact_info, ks_path, send_handler)
+
class VnfMonitorDtsHandler(mano_dts.DtsHandler):
""" VNF monitoring class """
# List of list: So we need to register for the list in the deepest level
XPATH = "D,/vnfr:vnfr-catalog/vnfr:vnfr/vnfr:monitoring-param"
@classmethod
- def from_vnf_data(cls, tasklet, vnfr_msg, vnfd_msg):
- handler = cls(tasklet.log, tasklet.dts, tasklet.loop,
+ def from_vnf_data(cls, project, vnfr_msg, vnfd_msg):
+ handler = cls(project.log, project.dts, project.loop, project,
vnfr_msg.id, vnfr_msg.mgmt_interface.ip_address,
- vnfd_msg.monitoring_param, vnfd_msg.http_endpoint)
+ vnfd_msg.monitoring_param, vnfd_msg.http_endpoint)
return handler
- def __init__(self, log, dts, loop, vnfr_id, mgmt_ip, params, endpoints):
- super().__init__(log, dts, loop)
+ def __init__(self, log, dts, loop, project, vnfr_id, mgmt_ip, params, endpoints, executor=None):
+ super().__init__(log, dts, loop, project)
self._mgmt_ip = mgmt_ip
self._vnfr_id = vnfr_id
-
+ self._executor = executor
+
mon_params = []
for mon_param in params:
- param = VnfrYang.YangData_Vnfr_VnfrCatalog_Vnfr_MonitoringParam.from_dict(
+ param = VnfrYang.YangData_RwProject_Project_VnfrCatalog_Vnfr_MonitoringParam.from_dict(
mon_param.as_dict()
)
mon_params.append(param)
http_endpoints = []
for endpoint in endpoints:
- endpoint = VnfrYang.YangData_Vnfr_VnfrCatalog_Vnfr_HttpEndpoint.from_dict(
+ endpoint = VnfrYang.YangData_RwProject_Project_VnfrCatalog_Vnfr_HttpEndpoint.from_dict(
endpoint.as_dict()
)
http_endpoints.append(endpoint)
self.log.debug(" - Monitoring Params: %s", mon_params)
self._mon_param_controller = VnfMonitoringParamsController(
- self.log,
- self.loop,
- self._vnfr_id,
- self._mgmt_ip,
- http_endpoints,
- mon_params,
- self.on_update_mon_params
- )
+ self.log,
+ self.loop,
+ self._vnfr_id,
+ self._mgmt_ip,
+ http_endpoints,
+ mon_params,
+ on_update_cb = self.on_update_mon_params,
+ executor=self._executor,
+ )
+ self._nsr_mon = None
def on_update_mon_params(self, mon_param_msgs):
for param_msg in mon_param_msgs:
- self.reg.update_element(
- self.xpath(param_msg.id),
- param_msg,
- rwdts.XactFlag.ADVISE
- )
-
+ #self.reg.update_element(
+ # self.xpath(param_msg.id),
+ # param_msg,
+ # rwdts.XactFlag.ADVISE
+ # )
+ if (self._nsr_mon is not None):
+ self._nsr_mon.apply_vnfr_mon(param_msg, self._vnfr_id)
+
+ def update_dts_read(self, xact_info, mon_param_msgs):
+ for param_msg in mon_param_msgs:
+ xact_info.respond_xpath(rsp_code=rwdts.XactRspCode.MORE,
+ xpath=self.xpath(param_msg.id),
+ msg=param_msg)
+
def start(self):
self._mon_param_controller.start()
def xpath(self, param_id=None):
""" Monitoring params xpath """
- return("D,/vnfr:vnfr-catalog" +
- "/vnfr:vnfr[vnfr:id='{}']".format(self._vnfr_id) +
+ return self.project.add_project(("D,/vnfr:vnfr-catalog" +
+ "/vnfr:vnfr[vnfr:id={}]".format(quoted_key(self._vnfr_id)) +
"/vnfr:monitoring-param" +
- ("[vnfr:id='{}']".format(param_id) if param_id else ""))
+ ("[vnfr:id={}]".format(quoted_key(param_id)) if param_id else "")))
@property
def msg(self):
def __del__(self):
self.stop()
-
+
@asyncio.coroutine
def register(self):
""" Register with dts """
-
+ @asyncio.coroutine
+ def on_prepare(xact_info, query_action, ks_path, msg):
+ if (self.reg_ready):
+ if (query_action == rwdts.QueryAction.READ):
+ self._mon_param_controller.retrieve(xact_info, ks_path, self.update_dts_read)
+
+ xact_info.respond_xpath(rwdts.XactRspCode.ACK)
+ @asyncio.coroutine
+ def on_ready(regh, status):
+ self.reg_ready = 1
+
+ handler = rift.tasklets.DTS.RegistrationHandler(on_prepare=on_prepare, on_ready=on_ready)
+ self.reg_ready = 0
self.reg = yield from self.dts.register(xpath=self.xpath(),
- flags=rwdts.Flag.PUBLISHER|rwdts.Flag.CACHE|rwdts.Flag.NO_PREP_READ)
+ flags=rwdts.Flag.PUBLISHER,
+ handler=handler)
assert self.reg is not None
self.reg.deregister()
self.reg = None
self._vnfr = None
+
+ def update_nsr_mon(self, nsr_mon):
+ """ update nsr mon """
+ self._nsr_mon = nsr_mon
+