3 # Copyright 2016 RIFT.IO Inc
5 # Licensed under the Apache License, Version 2.0 (the "License");
6 # you may not use this file except in compliance with the License.
7 # You may obtain a copy of the License at
9 # http://www.apache.org/licenses/LICENSE-2.0
11 # Unless required by applicable law or agreed to in writing, software
12 # distributed under the License is distributed on an "AS IS" BASIS,
13 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 # See the License for the specific language governing permissions and
15 # limitations under the License.
28 from requests
.packages
.urllib3
.exceptions
import InsecureRequestWarning
30 gi
.require_version('RwDts', '1.0')
32 from gi
.repository
import (
36 import rift
.mano
.dts
as mano_dts
38 import xmltodict
, json
39 gi
.require_version('RwKeyspec', '1.0')
40 from gi
.repository
.RwKeyspec
import quoted_key
class MonitoringParamError(Exception):
    """Error raised when a monitoring parameter cannot be polled, extracted or converted."""
class JsonPathValueQuerier(object):
    """Pull a single value out of a JSON response using a jsonpath expression."""

    def __init__(self, log, json_path):
        """Store the expression and try to compile it once.

        Args:
            log: logger used for error/warning/debug output.
            json_path: jsonpath expression string handed to ``jsonpath_rw.parse``.

        A compile failure is only logged; ``query`` then raises
        MonitoringParamError because no parser is available.
        """
        self._log = log
        self._json_path = json_path
        self._json_path_expr = None

        try:
            # Function-scope import: an import failure is reported through the
            # same error path as an invalid expression.
            import jsonpath_rw
            self._json_path_expr = jsonpath_rw.parse(self._json_path)
        except Exception as e:
            self._log.error("Could not create json_path parser: %s", str(e))

    def query(self, json_msg):
        """Return the first value matched by the jsonpath in ``json_msg``.

        Args:
            json_msg: JSON text to decode and search.

        Returns:
            The first matched value (a debug line is logged when more than
            one value matched).

        Raises:
            MonitoringParamError: the text is not valid JSON, the parser was
                never created, the lookup failed, or nothing matched.
        """
        try:
            json_dict = tornado.escape.json_decode(json_msg)
        except ValueError as e:
            msg = "Failed to convert response into json"
            self._log.warning(msg)
            # Chain the decode error so the root cause stays in the traceback.
            raise MonitoringParamError(e) from e

        if self._json_path_expr is None:
            raise MonitoringParamError(
                "Parser not created. Unable to extract value from %s" % json_msg
            )

        try:
            matches = self._json_path_expr.find(json_dict)
            values = [m.value for m in matches]
        except Exception as e:
            raise MonitoringParamError(
                "Failed to run find using json_path (%s) against json_msg: %s" %
                (self._json_path, str(e))
            ) from e

        if not values:
            raise MonitoringParamError(
                "No values found from json_path (%s)" % self._json_path
            )

        if len(values) > 1:
            self._log.debug("Got multiple values from json_path (%s). Only returning the first.",
                            self._json_path)

        return values[0]
class ObjectPathValueQuerier(object):
    """Pull a single value out of a JSON response using an objectpath query."""

    def __init__(self, log, object_path):
        """
        Args:
            log: logger used for error/warning/debug output.
            object_path: objectpath query string executed against each response.
        """
        self._log = log
        self._object_path = object_path
        self._object_path_expr = None

    def query(self, object_msg):
        """Execute the objectpath query against ``object_msg``.

        Args:
            object_msg: JSON text to decode and search.

        Returns:
            The query result. A generator result is advanced once; a
            list/tuple result is reduced to its first element.

        Raises:
            MonitoringParamError: the text is not valid JSON, the objectpath
                tree could not be built, the query failed, or the result
                was empty.
        """
        try:
            object_dict = tornado.escape.json_decode(object_msg)
        except ValueError as e:
            msg = "Failed to convert response into object"
            self._log.warning(msg)
            raise MonitoringParamError(e) from e

        try:
            # Function-scope import: an import failure is reported the same
            # way as a tree-construction failure.
            import objectpath
            tree = objectpath.Tree(object_dict)
        except Exception as e:
            # Format the message; a trailing ", str(e)" would build a tuple.
            msg = "Could not create objectpath tree: %s" % str(e)
            self._log.error(msg)
            raise MonitoringParamError(msg) from e

        try:
            value = tree.execute(self._object_path)
        except Exception as e:
            raise MonitoringParamError(
                "Failed to run execute object_path (%s) against object_msg: %s" %
                (self._object_path, str(e))
            ) from e

        if isinstance(value, types.GeneratorType):
            try:
                # An exhausted generator raises StopIteration, handled below.
                value = next(value)
            except Exception as e:
                raise MonitoringParamError(
                    "Failed to get value from objectpath %s execute generator: %s" %
                    (self._object_path, str(e))
                ) from e

        if isinstance(value, (list, tuple)):
            if not value:
                raise MonitoringParamError(
                    "No values found from object_path (%s)" % self._object_path
                )

            if len(value) > 1:
                self._log.debug(
                    "Got multiple values from object_path (%s). "
                    "Only returning the first.", self._object_path
                )

            # Only take the first element
            value = value[0]

        return value
class JsonKeyValueQuerier(object):
    """Pull the value stored under one top-level key of a JSON response."""

    def __init__(self, log, key):
        """
        Args:
            log: logger used for warnings.
            key: name of the top-level key to read from each response.
        """
        self._log = log
        self._key = key

    def query(self, json_msg):
        """Return the value stored under the configured key in ``json_msg``.

        Args:
            json_msg: JSON text to decode.

        Returns:
            The value found under the key.

        Raises:
            MonitoringParamError: the text is not valid JSON, the decoded
                document is not an object, or the key is absent.
        """
        try:
            json_dict = tornado.escape.json_decode(json_msg)
        except ValueError as e:
            msg = "Failed to convert response into json"
            self._log.warning(msg)
            raise MonitoringParamError(e) from e

        # A JSON list/string/number cannot be indexed by key; report it as a
        # missing key instead of letting a TypeError escape.
        if not isinstance(json_dict, dict) or self._key not in json_dict:
            msg = "Did not find '{}' key in response: {}".format(
                self._key, json_dict
            )
            self._log.warning(msg)
            raise MonitoringParamError(msg)

        return json_dict[self._key]
class ValueConverter(object):
    """Coerce an extracted value to the declared monitoring-param type."""

    def __init__(self, value_type):
        """
        Args:
            value_type: one of "INT", "DECIMAL" or "STRING"; any other value
                makes ``convert`` raise MonitoringParamError.
        """
        self._value_type = value_type

    def _convert_int(self, value):
        """Return ``value`` as an int; raise MonitoringParamError if impossible."""
        if isinstance(value, int):
            return value

        try:
            return int(value)
        except (ValueError, TypeError) as e:
            # Format the message here: an exception constructor does not
            # apply %-formatting to its arguments.
            raise MonitoringParamError(
                "Could not convert value into integer: %s" % str(e)
            ) from e

    def _convert_text(self, value):
        """Return ``value`` as a str; raise MonitoringParamError if impossible."""
        if isinstance(value, str):
            return value

        try:
            return str(value)
        except (ValueError, TypeError) as e:
            raise MonitoringParamError(
                "Could not convert value into string: %s" % str(e)
            ) from e

    def _convert_decimal(self, value):
        """Return ``value`` as a float; raise MonitoringParamError if impossible."""
        if isinstance(value, float):
            return value

        try:
            return float(value)
        except (ValueError, TypeError) as e:
            raise MonitoringParamError(
                "Could not convert value into decimal: %s" % str(e)
            ) from e

    def convert(self, value):
        """Convert ``value`` according to the configured value type.

        Raises:
            MonitoringParamError: the value cannot be converted, or the
                configured value type is not INT/DECIMAL/STRING.
        """
        if self._value_type == "INT":
            return self._convert_int(value)
        elif self._value_type == "DECIMAL":
            return self._convert_decimal(value)
        elif self._value_type == "STRING":
            return self._convert_text(value)

        raise MonitoringParamError("Unknown value type: %s" % (self._value_type,))
class HTTPBasicAuth(object):
    """Plain holder for a username/password pair."""

    def __init__(self, username, password):
        self.username, self.password = username, password
class HTTPEndpoint(object):
    """One HTTP(S) endpoint that is polled for monitoring data."""

    def __init__(self, log, loop, ip_address, ep_msg, executor=None):
        """
        Args:
            log: logger.
            loop: event loop whose ``run_in_executor`` runs the blocking request.
            ip_address: address the request URL is built from.
            ep_msg: endpoint message; read through ``has_field`` plus its
                port, path, https, method, data, username, password, headers
                and polling_interval_secs attributes.
            executor: optional executor for the blocking request; when None a
                single-thread pool is created for each poll.
        """
        self._log = log
        self._loop = loop
        self._ip_address = ip_address
        self._ep_msg = ep_msg
        self._executor = executor

        # This is to suppress HTTPS related warning as we do not support
        # certificate verification yet
        requests.packages.urllib3.disable_warnings(InsecureRequestWarning)
        self._session = requests.Session()

        # Lazily built caches behind the ``auth`` and ``headers`` properties.
        self._auth = None
        self._headers = None

    @property
    def poll_interval(self):
        """Polling interval taken from the endpoint message."""
        return self._ep_msg.polling_interval_secs

    @property
    def ip_address(self):
        """Address used to build the URL."""
        return self._ip_address

    @property
    def port(self):
        """Port taken from the endpoint message."""
        return self._ep_msg.port

    @property
    def protocol(self):
        """"https" when the message explicitly sets https to True, else "http"."""
        if self._ep_msg.has_field("https") and self._ep_msg.https is True:
            return "https"

        return "http"

    @property
    def path(self):
        """URL path taken from the endpoint message."""
        return self._ep_msg.path

    @property
    def method(self):
        """HTTP method from the message when set."""
        if self._ep_msg.has_field("method"):
            return self._ep_msg.method

        # NOTE(review): fallback assumed to be GET - confirm against the model default.
        return "GET"

    @property
    def query_data(self):
        """Request body from the message, or None when not set."""
        if self._ep_msg.has_field("data"):
            return self._ep_msg.data

        return None

    @property
    def username(self):
        """Username from the message, or None when not set."""
        if self._ep_msg.has_field("username"):
            return self._ep_msg.username

        return None

    @property
    def headers(self):
        """Dict of request headers, built once from fully specified header entries."""
        if self._headers is None:
            # Entries missing either key or value are skipped.
            self._headers = {
                header.key: header.value
                for header in self._ep_msg.headers
                if header.has_field("key") and header.has_field("value")
            }

        return self._headers

    @property
    def password(self):
        """Password from the message, or None when not set."""
        if self._ep_msg.has_field("password"):
            return self._ep_msg.password

        return None

    @property
    def auth(self):
        """Basic-auth object, created once when both username and password exist."""
        if self._auth is None:
            if self.username is not None and self.password is not None:
                self._auth = requests.auth.HTTPBasicAuth(
                    self.username,
                    self.password,
                )

        return self._auth

    @property
    def url(self):
        """Full request URL built from protocol, address, port and path."""
        url = "{protocol}://{ip_address}:{port}/{path}".format(
            protocol=self.protocol,
            ip_address=self.ip_address,
            port=self.port,
            # The template already supplies the separating slash.
            path=self.path.lstrip("/"),
        )

        return url

    def _poll(self):
        """Issue the blocking HTTP request (runs in an executor thread).

        Raises:
            MonitoringParamError: any requests-level failure, including a
                non-success status reported by ``raise_for_status``.
        """
        try:
            resp = self._session.request(
                self.method, self.url, timeout=10, auth=self.auth,
                headers=self.headers, verify=False, data=self.query_data
            )

            resp.raise_for_status()
        except requests.exceptions.RequestException as e:
            msg = "Got HTTP error when request monitoring method {} from url {}: {}".format(
                self.method,
                self.url,
                str(e),
            )
            self._log.warning(msg)
            # Chain so the underlying requests error is not lost.
            raise MonitoringParamError(msg) from e

        # NOTE(review): response body assumed to be returned as text, since
        # consumers parse the result as XML/JSON text - confirm.
        return resp.text

    @asyncio.coroutine
    def poll(self):
        """Run one request off the event loop and return its response.

        Raises:
            MonitoringParamError: the request failed.
        """
        try:
            if self._executor is None:
                with concurrent.futures.ThreadPoolExecutor(1) as executor:
                    resp = yield from self._loop.run_in_executor(
                        executor,
                        self._poll,
                    )
            else:
                resp = yield from self._loop.run_in_executor(
                    self._executor,
                    self._poll,
                )

        except MonitoringParamError as e:
            msg = "Caught exception when polling http endpoint: %s" % str(e)
            self._log.warning(msg)
            raise MonitoringParamError(msg) from e

        self._log.debug("Got response from http endpoint (%s): %s",
                        self.url, resp)

        return resp
class MonitoringParam(object):
    """One monitoring parameter: knows how to extract and hold its current value."""

    def __init__(self, log, vnfr_mon_param_msg):
        """
        Args:
            log: logger.
            vnfr_mon_param_msg: monitoring-param message; read for value_type,
                json_query_method, json_query_params, name and
                http_endpoint_ref, and updated with the current value.

        Raises:
            ValueError: the message's JSON query configuration is unusable.
        """
        self._log = log
        self._vnfr_mon_param_msg = vnfr_mon_param_msg

        self._current_value = None

        self._json_querier = self._create_json_querier()
        self._value_converter = ValueConverter(self.value_type)

    def _create_json_querier(self):
        """Build the querier matching the message's json_query_method.

        Raises:
            ValueError: the method is unknown, or the path it requires is
                not filled in.
        """
        if self.msg.json_query_method == "NAMEKEY":
            return JsonKeyValueQuerier(self._log, self.msg.name)
        elif self.msg.json_query_method == "JSONPATH":
            if not self.msg.json_query_params.has_field("json_path"):
                # Format once so the log line and the exception carry the same text.
                msg = "JSONPATH query_method requires json_query_params.json_path to be filled in %s" % (self.msg,)
                self._log.error(msg)
                raise ValueError(msg)
            return JsonPathValueQuerier(self._log, self.msg.json_query_params.json_path)
        elif self.msg.json_query_method == "OBJECTPATH":
            if not self.msg.json_query_params.has_field("object_path"):
                msg = "OBJECTPATH query_method requires json_query_params.object_path to be filled in %s" % (self.msg,)
                self._log.error(msg)
                raise ValueError(msg)
            return ObjectPathValueQuerier(self._log, self.msg.json_query_params.object_path)

        msg = "Unknown JSON query method: %s" % self.json_query_method
        self._log.error(msg)
        raise ValueError(msg)

    @property
    def current_value(self):
        """Last successfully extracted and converted value, or None."""
        return self._current_value

    @property
    def msg(self):
        """The underlying message, with the current value written into the
        field matching its value_type (left untouched while no value exists)."""
        msg = self._vnfr_mon_param_msg
        value_type = msg.value_type

        if self._current_value is None:
            return msg

        if value_type == "INT":
            msg.value_integer = self._current_value

        elif value_type == "DECIMAL":
            msg.value_decimal = self._current_value

        elif value_type == "STRING":
            msg.value_string = self._current_value

        else:
            self._log.debug("Unknown value_type: %s", value_type)

        return msg

    @property
    def path(self):
        """Reference to the HTTP endpoint this parameter is read from."""
        return self.msg.http_endpoint_ref

    @property
    def value_type(self):
        """Declared value type of the parameter."""
        return self.msg.value_type

    @property
    def json_query_method(self):
        """Configured JSON query method."""
        return self.msg.json_query_method

    @property
    def json_path(self):
        """Configured jsonpath expression."""
        # Read from json_query_params, the container used everywhere else in this class.
        return self.msg.json_query_params.json_path

    def extract_value_from_response(self, response_msg):
        """Update the current value from one endpoint response.

        Args:
            response_msg: response text; XML is converted to JSON first,
                anything else is handed to the querier unchanged.

        Extraction or conversion failures are logged and leave the
        previous value in place.
        """
        if self._json_querier is None:
            self._log.warning("json querier is not created. Cannot extract value form response.")
            return

        try:
            xml_data = xmltodict.parse(response_msg)
            json_msg = json.dumps(xml_data)
            response_msg = json_msg
        except Exception:
            # Best effort: a response that is not XML is passed on as-is.
            pass

        try:
            value = self._json_querier.query(response_msg)
            converted_value = self._value_converter.convert(value)
        except MonitoringParamError as e:
            self._log.warning("Failed to extract value from json response: %s", str(e))
            return

        self._current_value = converted_value
class EndpointMonParamsPoller(object):
    """Periodically polls one endpoint and feeds the response to its monitoring params."""

    REQUEST_TIMEOUT_SECS = 10

    def __init__(self, log, loop, endpoint, mon_params, on_update_cb=None):
        """
        Args:
            log: logger.
            loop: event loop that runs the poll task.
            endpoint: object exposing ``poll()``, ``poll_interval`` and ``url``.
            mon_params: parameters updated from every response of ``endpoint``.
            on_update_cb: optional callable receiving the list of parameter
                messages after each response has been applied.
        """
        self._log = log
        self._loop = loop
        self._endpoint = endpoint
        self._mon_params = mon_params
        self._on_update_cb = on_update_cb

        self._poll_task = None

    @property
    def poll_interval(self):
        """Polling interval of the underlying endpoint."""
        return self._endpoint.poll_interval

    def _get_mon_param_msgs(self):
        """Current message of every monitoring param of this endpoint."""
        return [mon_param.msg for mon_param in self._mon_params]

    def _notify_subscriber(self):
        """Hand the current messages to the update callback, if one was given."""
        if self._on_update_cb is None:
            return

        self._on_update_cb(self._get_mon_param_msgs())

    def _apply_response_to_mon_params(self, response_msg):
        """Let every param extract its value from the response, then notify."""
        for mon_param in self._mon_params:
            mon_param.extract_value_from_response(response_msg)

        self._notify_subscriber()

    @asyncio.coroutine
    def _poll_loop(self):
        """Poll, apply, sleep - until the task is cancelled."""
        self._log.debug("Starting http endpoint %s poll loop", self._endpoint.url)
        while True:
            try:
                response = yield from self._endpoint.poll()
                self._apply_response_to_mon_params(response)
            except MonitoringParamError:
                # The endpoint logs the failure before raising; retry after the sleep.
                pass
            except (asyncio.CancelledError, concurrent.futures.CancelledError):
                # From Python 3.8 asyncio's CancelledError is a separate class
                # from the concurrent.futures one, so both are listed.
                return

            # NOTE(review): the explicit ``loop`` argument is not accepted by
            # asyncio.sleep on Python 3.10+.
            yield from asyncio.sleep(self.poll_interval, loop=self._loop)

    def start(self):
        """Create the poll task unless one already exists."""
        self._log.debug("Got start request for endpoint poller: %s",
                        self._endpoint.url)
        if self._poll_task is not None:
            return

        self._poll_task = self._loop.create_task(self._poll_loop())

    def stop(self):
        """Cancel and forget the poll task, if one exists."""
        self._log.debug("Got stop request for endpoint poller: %s",
                        self._endpoint.url)
        if self._poll_task is None:
            return

        self._poll_task.cancel()

        self._poll_task = None

    def retrieve(self, xact_info, ks_path, send_handler):
        """Pass the current parameter messages to ``send_handler(xact_info, msgs)``."""
        send_handler(xact_info, self._get_mon_param_msgs())
class VnfMonitoringParamsController(object):
    """Builds the endpoints, monitoring params and pollers of one VNF and drives them."""

    def __init__(self, log, loop, vnfr_id, management_ip,
                 http_endpoint_msgs, monitoring_param_msgs,
                 on_update_cb=None, executor=None):
        """
        Args:
            log: logger.
            loop: event loop handed to endpoints and pollers.
            vnfr_id: identifier used in log messages.
            management_ip: address every endpoint is created with.
            http_endpoint_msgs: endpoint messages, one HTTPEndpoint each.
            monitoring_param_msgs: parameter messages, one MonitoringParam each.
            on_update_cb: optional callback handed to every poller.
            executor: optional executor handed to every endpoint.
        """
        self._log = log
        self._loop = loop
        self._vnfr_id = vnfr_id
        self._executor = executor
        self._management_ip = management_ip
        self._http_endpoint_msgs = http_endpoint_msgs
        self._monitoring_param_msgs = monitoring_param_msgs

        self._on_update_cb = on_update_cb
        self._endpoints = self._create_endpoints()
        self._mon_params = self._create_mon_params()

        self._endpoint_mon_param_map = self._create_endpoint_mon_param_map(
            self._endpoints, self._mon_params
        )
        self._endpoint_pollers = self._create_endpoint_pollers(self._endpoint_mon_param_map)

    def _create_endpoints(self):
        """Return a dict mapping each endpoint's path to its HTTPEndpoint."""
        created = (
            HTTPEndpoint(self._log,
                         self._loop,
                         self._management_ip,
                         ep_msg, self._executor)
            for ep_msg in self._http_endpoint_msgs
        )
        return {endpoint.path: endpoint for endpoint in created}

    def _create_mon_params(self):
        """Return a dict mapping each parameter id to its MonitoringParam."""
        return {
            mp_msg.id: MonitoringParam(self._log, mp_msg)
            for mp_msg in self._monitoring_param_msgs
        }

    def _create_endpoint_mon_param_map(self, endpoints, mon_params):
        """Group the monitoring params by the endpoint their ``path`` refers to."""
        grouped = collections.defaultdict(list)
        for mp in mon_params.values():
            grouped[endpoints[mp.path]].append(mp)

        return grouped

    def _create_endpoint_pollers(self, ep_mp_map):
        """Return one poller per (endpoint, params) group."""
        return [
            EndpointMonParamsPoller(
                self._log,
                self._loop,
                endpoint,
                mon_params,
                self._on_update_cb,
            )
            for endpoint, mon_params in ep_mp_map.items()
        ]

    @property
    def msgs(self):
        """Current message of every monitoring param."""
        return [mp.msg for mp in self.mon_params]

    @property
    def mon_params(self):
        """List of all monitoring params."""
        return list(self._mon_params.values())

    @property
    def endpoints(self):
        """List of all endpoints."""
        return list(self._endpoints.values())

    def start(self):
        """ Start monitoring """
        self._log.debug("Starting monitoring of VNF id: %s", self._vnfr_id)
        for poller in self._endpoint_pollers:
            poller.start()

    def stop(self):
        """ Stop monitoring """
        self._log.debug("Stopping monitoring of VNF id: %s", self._vnfr_id)
        for poller in self._endpoint_pollers:
            poller.stop()

    def retrieve(self, xact_info, ks_path, send_handler):
        """Retrieve Monitoring params information """
        for poller in self._endpoint_pollers:
            poller.retrieve(xact_info, ks_path, send_handler)
637 class VnfMonitorDtsHandler(mano_dts
.DtsHandler
):
638 """ VNF monitoring class """
639 # List of list: So we need to register for the list in the deepest level
640 XPATH
= "D,/vnfr:vnfr-catalog/vnfr:vnfr/vnfr:monitoring-param"
643 def from_vnf_data(cls
, project
, vnfr_msg
, vnfd_msg
):
644 handler
= cls(project
.log
, project
.dts
, project
.loop
, project
,
645 vnfr_msg
.id, vnfr_msg
.mgmt_interface
.ip_address
,
646 vnfd_msg
.monitoring_param
, vnfd_msg
.http_endpoint
)
650 def __init__(self
, log
, dts
, loop
, project
, vnfr_id
, mgmt_ip
, params
, endpoints
, executor
=None):
651 super().__init
__(log
, dts
, loop
, project
)
653 self
._mgmt
_ip
= mgmt_ip
654 self
._vnfr
_id
= vnfr_id
655 self
._executor
= executor
658 for mon_param
in params
:
659 param
= VnfrYang
.YangData_RwProject_Project_VnfrCatalog_Vnfr_MonitoringParam
.from_dict(
662 mon_params
.append(param
)
665 for endpoint
in endpoints
:
666 endpoint
= VnfrYang
.YangData_RwProject_Project_VnfrCatalog_Vnfr_HttpEndpoint
.from_dict(
669 http_endpoints
.append(endpoint
)
671 self
.log
.debug("Creating monitoring param controller")
672 self
.log
.debug(" - Endpoints: %s", http_endpoints
)
673 self
.log
.debug(" - Monitoring Params: %s", mon_params
)
675 self
._mon
_param
_controller
= VnfMonitoringParamsController(
682 on_update_cb
= self
.on_update_mon_params
,
683 executor
=self
._executor
,
def on_update_mon_params(self, mon_param_msgs):
    """Forward each updated monitoring-param message to the NSR monitor, when one is set."""
    for param_msg in mon_param_msgs:
        # Without an NSR monitor there is nothing to forward to.
        if self._nsr_mon is None:
            continue
        self._nsr_mon.apply_vnfr_mon(param_msg, self._vnfr_id)
697 def update_dts_read(self
, xact_info
, mon_param_msgs
):
698 for param_msg
in mon_param_msgs
:
699 xact_info
.respond_xpath(rsp_code
=rwdts
.XactRspCode
.MORE
,
700 xpath
=self
.xpath(param_msg
.id),
704 self
._mon
_param
_controller
.start()
708 self
._mon
_param
_controller
.stop()
def xpath(self, param_id=None):
    """ Monitoring params xpath """
    add_project = self.project.add_project
    vnfr_part = "/vnfr:vnfr[vnfr:id={}]".format(quoted_key(self._vnfr_id))
    # The parameter key predicate is appended only for a truthy param_id.
    param_part = "[vnfr:id={}]".format(quoted_key(param_id)) if param_id else ""
    return add_project(
        "D,/vnfr:vnfr-catalog" + vnfr_part + "/vnfr:monitoring-param" + param_part
    )
719 """ The message with the monitoing params """
720 return self
._mon
_param
_controller
.msgs
727 """ Register with dts """
729 def on_prepare(xact_info
, query_action
, ks_path
, msg
):
731 if (query_action
== rwdts
.QueryAction
.READ
):
732 self
._mon
_param
_controller
.retrieve(xact_info
, ks_path
, self
.update_dts_read
)
734 xact_info
.respond_xpath(rwdts
.XactRspCode
.ACK
)
736 def on_ready(regh
, status
):
739 handler
= rift
.tasklets
.DTS
.RegistrationHandler(on_prepare
=on_prepare
, on_ready
=on_ready
)
741 self
.reg
= yield from self
.dts
.register(xpath
=self
.xpath(),
742 flags
=rwdts
.Flag
.PUBLISHER
,
745 assert self
.reg
is not None
747 def deregister(self
):
748 """ de-register with dts """
749 if self
.reg
is not None:
750 self
.log
.debug("Deregistering path %s, regh = %s",
751 VnfMonitorDtsHandler
.XPATH
,
753 self
.reg
.deregister()
def update_nsr_mon(self, nsr_mon):
    """Remember the NSR monitor object on this handler (stored as ``_nsr_mon``)."""
    self._nsr_mon = nsr_mon