X-Git-Url: https://osm.etsi.org/gitweb/?a=blobdiff_plain;f=rwlaunchpad%2Fplugins%2Frwmonparam%2Frift%2Ftasklets%2Frwmonparam%2Fvnfr_core.py;fp=rwlaunchpad%2Fplugins%2Frwmonparam%2Frift%2Ftasklets%2Frwmonparam%2Fvnfr_core.py;h=78bfd2d6249e22f70b3aca0612bd9c9d4eb517b8;hb=4870d0ee29789b859931e4e2c73e13dcb29537d5;hp=6dc3a25b908bfa94fe2c7fc495a949f3aeecbb4d;hpb=6f1a3fe149e4a6b9803382cb299c902f4cf58ec9;p=osm%2FSO.git diff --git a/rwlaunchpad/plugins/rwmonparam/rift/tasklets/rwmonparam/vnfr_core.py b/rwlaunchpad/plugins/rwmonparam/rift/tasklets/rwmonparam/vnfr_core.py index 6dc3a25b..78bfd2d6 100644 --- a/rwlaunchpad/plugins/rwmonparam/rift/tasklets/rwmonparam/vnfr_core.py +++ b/rwlaunchpad/plugins/rwmonparam/rift/tasklets/rwmonparam/vnfr_core.py @@ -16,18 +16,17 @@ # import asyncio -import logging import collections import concurrent -import types - +import gi +import logging import requests import requests.auth import tornado.escape +import types from requests.packages.urllib3.exceptions import InsecureRequestWarning -import gi gi.require_version('RwDts', '1.0') import rift.tasklets from gi.repository import ( @@ -37,6 +36,9 @@ from gi.repository import ( import rift.mano.dts as mano_dts import rwlogger import xmltodict, json +gi.require_version('RwKeyspec', '1.0') +from gi.repository.RwKeyspec import quoted_key + class MonitoringParamError(Exception): """Monitoring Parameter error""" @@ -226,12 +228,13 @@ class HTTPBasicAuth(object): class HTTPEndpoint(object): - def __init__(self, log, loop, ip_address, ep_msg): + def __init__(self, log, loop, ip_address, ep_msg, executor=None): self._log = log self._loop = loop self._ip_address = ip_address self._ep_msg = ep_msg - + self._executor = executor + # This is to suppress HTTPS related warning as we do not support # certificate verification yet requests.packages.urllib3.disable_warnings(InsecureRequestWarning) @@ -269,6 +272,12 @@ class HTTPEndpoint(object): return self._ep_msg.method return "GET" + @property + def query_data(self): + if self._ep_msg.has_field("data"): + return self._ep_msg.data + return None + @property def username(self): if self._ep_msg.has_field("username"): @@ -320,9 +329,10 @@ class HTTPEndpoint(object): def _poll(self): try: resp = self._session.request( - self.method, self.url, timeout=10, auth=self.auth, - headers=self.headers, verify=False - ) + self.method, self.url, timeout=10, auth=self.auth, + headers=self.headers, verify=False, data=self.query_data + ) + resp.raise_for_status() except requests.exceptions.RequestException as e: msg = "Got HTTP error when request monitoring method {} from url {}: {}".format( @@ -338,11 +348,17 @@ class HTTPEndpoint(object): @asyncio.coroutine def poll(self): try: - with concurrent.futures.ThreadPoolExecutor(1) as executor: - resp = yield from self._loop.run_in_executor( + if (self._executor is None): + with concurrent.futures.ThreadPoolExecutor(1) as executor: + resp = yield from self._loop.run_in_executor( executor, self._poll, - ) + ) + else: + resp = yield from self._loop.run_in_executor( + self._executor, + self._poll, + ) except MonitoringParamError as e: msg = "Caught exception when polling http endpoint: %s" % str(e) @@ -464,7 +480,7 @@ class EndpointMonParamsPoller(object): self._on_update_cb = on_update_cb self._poll_task = None - + @property def poll_interval(self): return self._endpoint.poll_interval @@ -481,9 +497,9 @@ class EndpointMonParamsPoller(object): def _apply_response_to_mon_params(self, response_msg): for mon_param in self._mon_params: mon_param.extract_value_from_response(response_msg) - + self._notify_subscriber() - + @asyncio.coroutine def _poll_loop(self): self._log.debug("Starting http endpoint %s poll loop", self._endpoint.url) @@ -491,6 +507,8 @@ class EndpointMonParamsPoller(object): try: response = yield from self._endpoint.poll() self._apply_response_to_mon_params(response) + except MonitoringParamError as e: + pass except concurrent.futures.CancelledError as e: return @@ -513,14 +531,18 @@ class EndpointMonParamsPoller(object): self._poll_task = None + def retrieve(self, xact_info, ks_path, send_handler): + send_handler(xact_info, self._get_mon_param_msgs()) + class VnfMonitoringParamsController(object): def __init__(self, log, loop, vnfr_id, management_ip, http_endpoint_msgs, monitoring_param_msgs, - on_update_cb=None): + on_update_cb=None, executor=None): self._log = log self._loop = loop self._vnfr_id = vnfr_id + self._executor = executor self._management_ip = management_ip self._http_endpoint_msgs = http_endpoint_msgs self._monitoring_param_msgs = monitoring_param_msgs @@ -533,16 +555,15 @@ class VnfMonitoringParamsController(object): self._endpoints, self._mon_params ) self._endpoint_pollers = self._create_endpoint_pollers(self._endpoint_mon_param_map) - + def _create_endpoints(self): path_endpoint_map = {} for ep_msg in self._http_endpoint_msgs: - endpoint = HTTPEndpoint( - self._log, - self._loop, - self._management_ip, - ep_msg, - ) + endpoint = HTTPEndpoint(self._log, + self._loop, + self._management_ip, + ep_msg,self._executor) + path_endpoint_map[endpoint.path] = endpoint return path_endpoint_map @@ -576,9 +597,8 @@ class VnfMonitoringParamsController(object): mon_params, self._on_update_cb ) - pollers.append(poller) - + return pollers @property @@ -609,36 +629,41 @@ class VnfMonitoringParamsController(object): for poller in self._endpoint_pollers: poller.stop() - + def retrieve(self, xact_info, ks_path, send_handler): + """Retrieve Monitoring params information """ + for poller in self._endpoint_pollers: + poller.retrieve(xact_info, ks_path, send_handler) + class VnfMonitorDtsHandler(mano_dts.DtsHandler): """ VNF monitoring class """ # List of list: So we need to register for the list in the deepest level XPATH = "D,/vnfr:vnfr-catalog/vnfr:vnfr/vnfr:monitoring-param" @classmethod - def from_vnf_data(cls, tasklet, vnfr_msg, vnfd_msg): - handler = cls(tasklet.log, tasklet.dts, tasklet.loop, + def from_vnf_data(cls, project, vnfr_msg, vnfd_msg): + handler = cls(project.log, project.dts, project.loop, project, vnfr_msg.id, vnfr_msg.mgmt_interface.ip_address, - vnfd_msg.monitoring_param, vnfd_msg.http_endpoint) + vnfd_msg.monitoring_param, vnfd_msg.http_endpoint) return handler - def __init__(self, log, dts, loop, vnfr_id, mgmt_ip, params, endpoints): - super().__init__(log, dts, loop) + def __init__(self, log, dts, loop, project, vnfr_id, mgmt_ip, params, endpoints, executor=None): + super().__init__(log, dts, loop, project) self._mgmt_ip = mgmt_ip self._vnfr_id = vnfr_id - + self._executor = executor + mon_params = [] for mon_param in params: - param = VnfrYang.YangData_Vnfr_VnfrCatalog_Vnfr_MonitoringParam.from_dict( + param = VnfrYang.YangData_RwProject_Project_VnfrCatalog_Vnfr_MonitoringParam.from_dict( mon_param.as_dict() ) mon_params.append(param) http_endpoints = [] for endpoint in endpoints: - endpoint = VnfrYang.YangData_Vnfr_VnfrCatalog_Vnfr_HttpEndpoint.from_dict( + endpoint = VnfrYang.YangData_RwProject_Project_VnfrCatalog_Vnfr_HttpEndpoint.from_dict( endpoint.as_dict() ) http_endpoints.append(endpoint) @@ -648,23 +673,33 @@ class VnfMonitorDtsHandler(mano_dts.DtsHandler): self.log.debug(" - Monitoring Params: %s", mon_params) self._mon_param_controller = VnfMonitoringParamsController( - self.log, - self.loop, - self._vnfr_id, - self._mgmt_ip, - http_endpoints, - mon_params, - self.on_update_mon_params - ) + self.log, + self.loop, + self._vnfr_id, + self._mgmt_ip, + http_endpoints, + mon_params, + on_update_cb = self.on_update_mon_params, + executor=self._executor, + ) + self._nsr_mon = None def on_update_mon_params(self, mon_param_msgs): for param_msg in mon_param_msgs: - self.reg.update_element( - self.xpath(param_msg.id), - param_msg, - rwdts.XactFlag.ADVISE - ) - + #self.reg.update_element( + # self.xpath(param_msg.id), + # param_msg, + # rwdts.XactFlag.ADVISE + # ) + if (self._nsr_mon is not None): + self._nsr_mon.apply_vnfr_mon(param_msg, self._vnfr_id) + + def update_dts_read(self, xact_info, mon_param_msgs): + for param_msg in mon_param_msgs: + xact_info.respond_xpath(rsp_code=rwdts.XactRspCode.MORE, + xpath=self.xpath(param_msg.id), + msg=param_msg) + def start(self): self._mon_param_controller.start() @@ -674,10 +709,10 @@ class VnfMonitorDtsHandler(mano_dts.DtsHandler): def xpath(self, param_id=None): """ Monitoring params xpath """ - return("D,/vnfr:vnfr-catalog" + - "/vnfr:vnfr[vnfr:id='{}']".format(self._vnfr_id) + + return self.project.add_project(("D,/vnfr:vnfr-catalog" + + "/vnfr:vnfr[vnfr:id={}]".format(quoted_key(self._vnfr_id)) + "/vnfr:monitoring-param" + - ("[vnfr:id='{}']".format(param_id) if param_id else "")) + ("[vnfr:id={}]".format(quoted_key(param_id)) if param_id else ""))) @property def msg(self): @@ -686,13 +721,26 @@ class VnfMonitorDtsHandler(mano_dts.DtsHandler): def __del__(self): self.stop() - + @asyncio.coroutine def register(self): """ Register with dts """ - + @asyncio.coroutine + def on_prepare(xact_info, query_action, ks_path, msg): + if (self.reg_ready): + if (query_action == rwdts.QueryAction.READ): + self._mon_param_controller.retrieve(xact_info, ks_path, self.update_dts_read) + + xact_info.respond_xpath(rwdts.XactRspCode.ACK) + @asyncio.coroutine + def on_ready(regh, status): + self.reg_ready = 1 + + handler = rift.tasklets.DTS.RegistrationHandler(on_prepare=on_prepare, on_ready=on_ready) + self.reg_ready = 0 self.reg = yield from self.dts.register(xpath=self.xpath(), - flags=rwdts.Flag.PUBLISHER|rwdts.Flag.CACHE|rwdts.Flag.NO_PREP_READ) + flags=rwdts.Flag.PUBLISHER, + handler=handler) assert self.reg is not None @@ -705,3 +753,8 @@ class VnfMonitorDtsHandler(mano_dts.DtsHandler): self.reg.deregister() self.reg = None self._vnfr = None + + def update_nsr_mon(self, nsr_mon): + """ update nsr mon """ + self._nsr_mon = nsr_mon +