--- /dev/null
+
+#
+# Copyright 2016 RIFT.IO Inc
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+import asyncio
+import logging
+import collections
+import concurrent
+import types
+
+import requests
+import requests.auth
+import tornado.escape
+
+from requests.packages.urllib3.exceptions import InsecureRequestWarning
+
+import gi
+gi.require_version('RwDts', '1.0')
+import rift.tasklets
+from gi.repository import (
+ RwDts as rwdts,
+ VnfrYang
+ )
+import rift.mano.dts as mano_dts
+import rwlogger
+
+
+class MonitoringParamError(Exception):
+ """Monitoring Parameter error"""
+ pass
+
+
+class JsonPathValueQuerier(object):
+ def __init__(self, log, json_path):
+ self._log = log
+ self._json_path = json_path
+ self._json_path_expr = None
+
+ try:
+ import jsonpath_rw
+ self._json_path_expr = jsonpath_rw.parse(self._json_path)
+ except Exception as e:
+ self._log.error("Could not create json_path parser: %s", str(e))
+
+ def query(self, json_msg):
+ try:
+ json_dict = tornado.escape.json_decode(json_msg)
+ except ValueError as e:
+ msg = "Failed to convert response into json"
+ self._log.warning(msg)
+ raise MonitoringParamError(e)
+
+ if self._json_path_expr is None:
+ raise MonitoringParamError(
+ "Parser not created. Unable to extract value from %s" % json_msg
+ )
+
+ try:
+ matches = self._json_path_expr.find(json_dict)
+ values = [m.value for m in matches]
+ except Exception as e:
+ raise MonitoringParamError(
+ "Failed to run find using json_path (%s) against json_msg: %s" %
+ (self._json_path, str(e))
+ )
+
+ if len(values) == 0:
+ raise MonitoringParamError(
+ "No values found from json_path (%s)" % self._json_path
+ )
+
+ if len(values) > 1:
+ self._log.debug("Got multiple values from json_path (%s). Only returning the first.",
+ self._json_path)
+
+ return values[0]
+
+
+class ObjectPathValueQuerier(object):
+ def __init__(self, log, object_path):
+ self._log = log
+ self._object_path = object_path
+ self._object_path_expr = None
+
+ def query(self, object_msg):
+ try:
+ object_dict = tornado.escape.json_decode(object_msg)
+ except ValueError as e:
+ msg = "Failed to convert response into object"
+ self._log.warning(msg)
+ raise MonitoringParamError(e)
+
+ import objectpath
+ try:
+ tree = objectpath.Tree(object_dict)
+ except Exception as e:
+ msg = "Could not create objectpath tree: %s", str(e)
+ self._log.error(msg)
+ raise MonitoringParamError(msg)
+
+ try:
+ value = tree.execute(self._object_path)
+ except Exception as e:
+ raise MonitoringParamError(
+ "Failed to run execute object_path (%s) against object_msg: %s" %
+ (self._object_path, str(e))
+ )
+
+ if isinstance(value, types.GeneratorType):
+ try:
+ value = next(value)
+ except Exception as e:
+ raise MonitoringParamError(
+ "Failed to get value from objectpath %s execute generator: %s" %
+ (self._object_path, str(e))
+ )
+
+ if isinstance(value, (list, tuple)):
+ if len(value) == 0:
+ raise MonitoringParamError(
+ "No values found from object_path (%s)" % self._object_path
+ )
+
+ elif len(value) > 1:
+ self._log.debug(
+ "Got multiple values from object_path (%s). "
+ "Only returning the first.", self._object_path
+ )
+
+ # Only take the first element
+ value = value[0]
+
+ return value
+
+
+class JsonKeyValueQuerier(object):
+ def __init__(self, log, key):
+ self._log = log
+ self._key = key
+
+ def query(self, json_msg):
+ try:
+ json_dict = tornado.escape.json_decode(json_msg)
+ except ValueError as e:
+ msg = "Failed to convert response into json"
+ self._log.warning(msg)
+ raise MonitoringParamError(e)
+
+ if self._key not in json_dict:
+ msg = "Did not find '{}' key in response: {}".format(
+ self._key, json_dict
+ )
+ self._log.warning(msg)
+ raise MonitoringParamError(msg)
+
+ value = json_dict[self._key]
+
+ return value
+
+
+class ValueConverter(object):
+ def __init__(self, value_type):
+ self._value_type = value_type
+
+ def _convert_int(self, value):
+ if isinstance(value, int):
+ return value
+
+ try:
+ return int(value)
+ except (ValueError, TypeError) as e:
+ raise MonitoringParamError(
+ "Could not convert value into integer: %s", str(e)
+ )
+
+ def _convert_text(self, value):
+ if isinstance(value, str):
+ return value
+
+ try:
+ return str(value)
+ except (ValueError, TypeError) as e:
+ raise MonitoringParamError(
+ "Could not convert value into string: %s", str(e)
+ )
+
+ def _convert_decimal(self, value):
+ if isinstance(value, float):
+ return value
+
+ try:
+ return float(value)
+ except (ValueError, TypeError) as e:
+ raise MonitoringParamError(
+ "Could not convert value into string: %s", str(e)
+ )
+
+ def convert(self, value):
+ if self._value_type == "INT":
+ return self._convert_int(value)
+ elif self._value_type == "DECIMAL":
+ return self._convert_decimal(value)
+ elif self._value_type == "STRING":
+ return self._convert_text(value)
+ else:
+ raise MonitoringParamError("Unknown value type: %s", self._value_type)
+
+
+class HTTPBasicAuth(object):
+ def __init__(self, username, password):
+ self.username = username
+ self.password = password
+
+
+class HTTPEndpoint(object):
+ def __init__(self, log, loop, ip_address, ep_msg):
+ self._log = log
+ self._loop = loop
+ self._ip_address = ip_address
+ self._ep_msg = ep_msg
+
+ # This is to suppress HTTPS related warning as we do not support
+ # certificate verification yet
+ requests.packages.urllib3.disable_warnings(InsecureRequestWarning)
+ self._session = requests.Session()
+ self._auth = None
+ self._headers = None
+
+ @property
+ def poll_interval(self):
+ return self._ep_msg.polling_interval_secs
+
+ @property
+ def ip_address(self):
+ return self._ip_address
+
+ @property
+ def port(self):
+ return self._ep_msg.port
+
+ @property
+ def protocol(self):
+ if self._ep_msg.has_field("https"):
+ if self._ep_msg.https is True:
+ return "https"
+
+ return "http"
+
+ @property
+ def path(self):
+ return self._ep_msg.path
+
+ @property
+ def method(self):
+ if self._ep_msg.has_field("method"):
+ return self._ep_msg.method
+ return "GET"
+
+ @property
+ def username(self):
+ if self._ep_msg.has_field("username"):
+ return self._ep_msg.username
+
+ return None
+
+ @property
+ def headers(self):
+ if self._headers is None:
+ headers = {}
+ for header in self._ep_msg.headers:
+ if header.has_field("key") and header.has_field("value"):
+ headers[header.key] = header.value
+
+ self._headers = headers
+
+ return self._headers
+
+ @property
+ def password(self):
+ if self._ep_msg.has_field("password"):
+ return self._ep_msg.password
+
+ return None
+
+ @property
+ def auth(self):
+ if self._auth is None:
+ if self.username is not None and self.password is not None:
+ self._auth = requests.auth.HTTPBasicAuth(
+ self.username,
+ self.password,
+ )
+
+ return self._auth
+
+ @property
+ def url(self):
+ url = "{protocol}://{ip_address}:{port}/{path}".format(
+ protocol=self.protocol,
+ ip_address=self.ip_address,
+ port=self.port,
+ path=self.path.lstrip("/"),
+ )
+
+ return url
+
+ def _poll(self):
+ try:
+ resp = self._session.request(
+ self.method, self.url, timeout=10, auth=self.auth,
+ headers=self.headers, verify=False
+ )
+ resp.raise_for_status()
+ except requests.exceptions.RequestException as e:
+ msg = "Got HTTP error when request monitoring method {} from url {}: {}".format(
+ self.method,
+ self.url,
+ str(e),
+ )
+ self._log.warning(msg)
+ raise MonitoringParamError(msg)
+
+ return resp.text
+
+ @asyncio.coroutine
+ def poll(self):
+ try:
+ with concurrent.futures.ThreadPoolExecutor(1) as executor:
+ resp = yield from self._loop.run_in_executor(
+ executor,
+ self._poll,
+ )
+
+ except MonitoringParamError as e:
+ msg = "Caught exception when polling http endpoint: %s" % str(e)
+ self._log.warning(msg)
+ raise MonitoringParamError(msg)
+
+ self._log.debug("Got response from http endpoint (%s): %s",
+ self.url, resp)
+
+ return resp
+
+
+class MonitoringParam(object):
+ def __init__(self, log, vnfr_mon_param_msg):
+ self._log = log
+ self._vnfr_mon_param_msg = vnfr_mon_param_msg
+
+ self._current_value = None
+
+ self._json_querier = self._create_json_querier()
+ self._value_converter = ValueConverter(self.value_type)
+
+ def _create_json_querier(self):
+ if self.msg.json_query_method == "NAMEKEY":
+ return JsonKeyValueQuerier(self._log, self.msg.name)
+ elif self.msg.json_query_method == "JSONPATH":
+ if not self.msg.json_query_params.has_field("json_path"):
+ msg = "JSONPATH query_method requires json_query_params.json_path to be filled in %s"
+ self._log.error(msg, self.msg)
+ raise ValueError(msg)
+ return JsonPathValueQuerier(self._log, self.msg.json_query_params.json_path)
+ elif self.msg.json_query_method == "OBJECTPATH":
+ if not self.msg.json_query_params.has_field("object_path"):
+ msg = "OBJECTPATH query_method requires json_query_params.object_path to be filled in %s"
+ self._log.error(msg, self.msg)
+ raise ValueError(msg)
+ return ObjectPathValueQuerier(self._log, self.msg.json_query_params.object_path)
+ else:
+ msg = "Unknown JSON query method: %s" % self.json_query_method
+ self._log.error(msg)
+ raise ValueError(msg)
+
+ @property
+ def current_value(self):
+ return self._current_value
+
+ @property
+ def msg(self):
+ msg = self._vnfr_mon_param_msg
+ value_type = msg.value_type
+
+ if self._current_value is None:
+ return msg
+
+ if value_type == "INT":
+ msg.value_integer = self._current_value
+
+ elif value_type == "DECIMAL":
+ msg.value_decimal = self._current_value
+
+ elif value_type == "STRING":
+ msg.value_string = self._current_value
+
+ else:
+ self._log.debug("Unknown value_type: %s", value_type)
+
+ return msg
+
+ @property
+ def path(self):
+ return self.msg.http_endpoint_ref
+
+ @property
+ def value_type(self):
+ return self.msg.value_type
+
+ @property
+ def json_query_method(self):
+ return self.msg.json_query_method
+
+ @property
+ def json_path(self):
+ return self.msg.json_path_params.json_path
+
+ @property
+ def name(self):
+ return self.msg.name
+
+ def extract_value_from_response(self, response_msg):
+ if self._json_querier is None:
+ self._log.warning("json querier is not created. Cannot extract value form response.")
+ return
+
+ try:
+ value = self._json_querier.query(response_msg)
+ converted_value = self._value_converter.convert(value)
+ except MonitoringParamError as e:
+ self._log.warning("Failed to extract value from json response: %s", str(e))
+ return
+ else:
+ self._current_value = converted_value
+
+
+class EndpointMonParamsPoller(object):
+ REQUEST_TIMEOUT_SECS = 10
+
+ def __init__(self, log, loop, endpoint, mon_params, on_update_cb=None):
+ self._log = log
+ self._loop = loop
+ self._endpoint = endpoint
+ self._mon_params = mon_params
+ self._on_update_cb = on_update_cb
+
+ self._poll_task = None
+
+ @property
+ def poll_interval(self):
+ return self._endpoint.poll_interval
+
+ def _get_mon_param_msgs(self):
+ return [mon_param.msg for mon_param in self._mon_params]
+
+ def _notify_subscriber(self):
+ if self._on_update_cb is None:
+ return
+
+ self._on_update_cb(self._get_mon_param_msgs())
+
+ def _apply_response_to_mon_params(self, response_msg):
+ for mon_param in self._mon_params:
+ mon_param.extract_value_from_response(response_msg)
+
+ self._notify_subscriber()
+
+ @asyncio.coroutine
+ def _poll_loop(self):
+ self._log.debug("Starting http endpoint %s poll loop", self._endpoint.url)
+ while True:
+ try:
+ response = yield from self._endpoint.poll()
+ self._apply_response_to_mon_params(response)
+ except concurrent.futures.CancelledError as e:
+ return
+
+ yield from asyncio.sleep(self.poll_interval, loop=self._loop)
+
+ def start(self):
+ self._log.debug("Got start request for endpoint poller: %s",
+ self._endpoint.url)
+ if self._poll_task is not None:
+ return
+ self._poll_task = self._loop.create_task(self._poll_loop())
+
+ def stop(self):
+ self._log.debug("Got stop request for endpoint poller: %s",
+ self._endpoint.url)
+ if self._poll_task is None:
+ return
+
+ self._poll_task.cancel()
+
+ self._poll_task = None
+
+
+class VnfMonitoringParamsController(object):
+ def __init__(self, log, loop, vnfr_id, management_ip,
+ http_endpoint_msgs, monitoring_param_msgs,
+ on_update_cb=None):
+ self._log = log
+ self._loop = loop
+ self._vnfr_id = vnfr_id
+ self._management_ip = management_ip
+ self._http_endpoint_msgs = http_endpoint_msgs
+ self._monitoring_param_msgs = monitoring_param_msgs
+
+ self._on_update_cb = on_update_cb
+ self._endpoints = self._create_endpoints()
+ self._mon_params = self._create_mon_params()
+
+ self._endpoint_mon_param_map = self._create_endpoint_mon_param_map(
+ self._endpoints, self._mon_params
+ )
+ self._endpoint_pollers = self._create_endpoint_pollers(self._endpoint_mon_param_map)
+
+ def _create_endpoints(self):
+ path_endpoint_map = {}
+ for ep_msg in self._http_endpoint_msgs:
+ endpoint = HTTPEndpoint(
+ self._log,
+ self._loop,
+ self._management_ip,
+ ep_msg,
+ )
+ path_endpoint_map[endpoint.path] = endpoint
+
+ return path_endpoint_map
+
+ def _create_mon_params(self):
+ mon_params = {}
+ for mp_msg in self._monitoring_param_msgs:
+ mon_params[mp_msg.id] = MonitoringParam(
+ self._log,
+ mp_msg,
+ )
+
+ return mon_params
+
+ def _create_endpoint_mon_param_map(self, endpoints, mon_params):
+ ep_mp_map = collections.defaultdict(list)
+ for mp in mon_params.values():
+ endpoint = endpoints[mp.path]
+ ep_mp_map[endpoint].append(mp)
+
+ return ep_mp_map
+
+ def _create_endpoint_pollers(self, ep_mp_map):
+ pollers = []
+
+ for endpoint, mon_params in ep_mp_map.items():
+ poller = EndpointMonParamsPoller(
+ self._log,
+ self._loop,
+ endpoint,
+ mon_params,
+ self._on_update_cb
+ )
+
+ pollers.append(poller)
+
+ return pollers
+
+ @property
+ def msgs(self):
+ msgs = []
+ for mp in self.mon_params:
+ msgs.append(mp.msg)
+
+ return msgs
+
+ @property
+ def mon_params(self):
+ return list(self._mon_params.values())
+
+ @property
+ def endpoints(self):
+ return list(self._endpoints.values())
+
+ def start(self):
+ """ Start monitoring """
+ self._log.debug("Starting monitoring of VNF id: %s", self._vnfr_id)
+ for poller in self._endpoint_pollers:
+ poller.start()
+
+ def stop(self):
+ """ Stop monitoring """
+ self._log.debug("Stopping monitoring of VNF id: %s", self._vnfr_id)
+ for poller in self._endpoint_pollers:
+ poller.stop()
+
+
+class VnfMonitorDtsHandler(mano_dts.DtsHandler):
+ """ VNF monitoring class """
+ # List of list: So we need to register for the list in the deepest level
+ XPATH = "D,/vnfr:vnfr-catalog/vnfr:vnfr/vnfr:monitoring-param"
+
+ @classmethod
+ def from_vnf_data(cls, tasklet, vnfr_msg, vnfd_msg):
+ handler = cls(tasklet.log, tasklet.dts, tasklet.loop,
+ vnfr_msg.id, vnfr_msg.mgmt_interface.ip_address,
+ vnfd_msg.monitoring_param, vnfd_msg.http_endpoint)
+
+ return handler
+
+ def __init__(self, log, dts, loop, vnfr_id, mgmt_ip, params, endpoints):
+ super().__init__(log, dts, loop)
+
+ self._mgmt_ip = mgmt_ip
+ self._vnfr_id = vnfr_id
+
+ mon_params = []
+ for mon_param in params:
+ param = VnfrYang.YangData_Vnfr_VnfrCatalog_Vnfr_MonitoringParam.from_dict(
+ mon_param.as_dict()
+ )
+ mon_params.append(param)
+
+ http_endpoints = []
+ for endpoint in endpoints:
+ endpoint = VnfrYang.YangData_Vnfr_VnfrCatalog_Vnfr_HttpEndpoint.from_dict(
+ endpoint.as_dict()
+ )
+ http_endpoints.append(endpoint)
+
+ self.log.debug("Creating monitoring param controller")
+ self.log.debug(" - Endpoints: %s", http_endpoints)
+ self.log.debug(" - Monitoring Params: %s", mon_params)
+
+ self._mon_param_controller = VnfMonitoringParamsController(
+ self.log,
+ self.loop,
+ self._vnfr_id,
+ self._mgmt_ip,
+ http_endpoints,
+ mon_params,
+ self.on_update_mon_params
+ )
+
+ def on_update_mon_params(self, mon_param_msgs):
+ for param_msg in mon_param_msgs:
+ self.reg.update_element(
+ self.xpath(param_msg.id),
+ param_msg,
+ rwdts.XactFlag.ADVISE
+ )
+
+ def start(self):
+ self._mon_param_controller.start()
+
+ def stop(self):
+ self.deregister()
+ self._mon_param_controller.stop()
+
+ def xpath(self, param_id=None):
+ """ Monitoring params xpath """
+ return("D,/vnfr:vnfr-catalog" +
+ "/vnfr:vnfr[vnfr:id='{}']".format(self._vnfr_id) +
+ "/vnfr:monitoring-param" +
+ ("[vnfr:id='{}']".format(param_id) if param_id else ""))
+
+ @property
+ def msg(self):
+ """ The message with the monitoing params """
+ return self._mon_param_controller.msgs
+
+ def __del__(self):
+ self.stop()
+
+ @asyncio.coroutine
+ def register(self):
+ """ Register with dts """
+
+ self.reg = yield from self.dts.register(xpath=self.xpath(),
+ flags=rwdts.Flag.PUBLISHER|rwdts.Flag.CACHE|rwdts.Flag.NO_PREP_READ)
+
+ assert self.reg is not None
+
+ def deregister(self):
+ """ de-register with dts """
+ if self.reg is not None:
+ self.log.debug("Deregistering path %s, regh = %s",
+ VnfMonitorDtsHandler.XPATH,
+ self.reg)
+ self.reg.deregister()
+ self.reg = None
+ self._vnfr = None