3 # Copyright 2016 RIFT.IO Inc
5 # Licensed under the Apache License, Version 2.0 (the "License");
6 # you may not use this file except in compliance with the License.
7 # You may obtain a copy of the License at
9 # http://www.apache.org/licenses/LICENSE-2.0
11 # Unless required by applicable law or agreed to in writing, software
12 # distributed under the License is distributed on an "AS IS" BASIS,
13 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 # See the License for the specific language governing permissions and
15 # limitations under the License.
28 from requests
.packages
.urllib3
.exceptions
import InsecureRequestWarning
31 gi
.require_version('RwDts', '1.0')
33 from gi
.repository
import (
37 import rift
.mano
.dts
as mano_dts
39 import xmltodict
, json
class MonitoringParamError(Exception):
    """Error raised while polling, querying or converting a monitoring
    parameter value."""
class JsonPathValueQuerier(object):
    """Pull a single value out of a JSON document with a JSONPath query."""

    def __init__(self, log, json_path):
        """Remember the query and try to compile it.

        A compile failure is only logged; the compiled expression then
        stays None and query() reports the problem when it is called.
        """
        self._log = log
        self._json_path = json_path
        self._json_path_expr = None

        try:
            # Imported here so a missing/broken jsonpath_rw is handled the
            # same way as an invalid expression.
            import jsonpath_rw
            self._json_path_expr = jsonpath_rw.parse(self._json_path)
        except Exception as e:
            self._log.error("Could not create json_path parser: %s", str(e))

    def query(self, json_msg):
        """Return the first value in json_msg matched by the JSONPath.

        Raises MonitoringParamError when json_msg is not valid JSON, when
        the parser was never created, when the search itself fails, or
        when nothing matches.
        """
        try:
            json_dict = tornado.escape.json_decode(json_msg)
        except ValueError as e:
            msg = "Failed to convert response into json"
            self._log.warning(msg)
            raise MonitoringParamError(e)

        parser = self._json_path_expr
        if parser is None:
            raise MonitoringParamError(
                "Parser not created. Unable to extract value from %s" % json_msg
            )

        values = []
        try:
            for match in parser.find(json_dict):
                values.append(match.value)
        except Exception as e:
            raise MonitoringParamError(
                "Failed to run find using json_path (%s) against json_msg: %s" %
                (self._json_path, str(e))
            )

        if not values:
            raise MonitoringParamError(
                "No values found from json_path (%s)" % self._json_path
            )

        if len(values) > 1:
            self._log.debug("Got multiple values from json_path (%s). Only returning the first.",
                            self._json_path)

        return values[0]
class ObjectPathValueQuerier(object):
    """Pull a single value out of a JSON document with an ObjectPath query."""

    def __init__(self, log, object_path):
        """Store the logger and the ObjectPath query string."""
        self._log = log
        self._object_path = object_path
        self._object_path_expr = None

    def query(self, object_msg):
        """Return the value selected by the ObjectPath query in object_msg.

        When the query yields a generator its first item is used; when the
        result is a list or tuple only its first element is returned.

        Raises MonitoringParamError when object_msg is not valid JSON, when
        the objectpath tree cannot be built, when executing the query
        fails, or when the result is an empty sequence.
        """
        try:
            object_dict = tornado.escape.json_decode(object_msg)
        except ValueError as e:
            msg = "Failed to convert response into object"
            self._log.warning(msg)
            raise MonitoringParamError(e) from e

        try:
            # Local import: keeps this block self-contained and reports a
            # missing objectpath package as a MonitoringParamError.
            import objectpath
            tree = objectpath.Tree(object_dict)
        except Exception as e:
            # Format the text here; the previous code built a
            # (format, argument) tuple and raised/logged that tuple.
            msg = "Could not create objectpath tree: %s" % str(e)
            self._log.error(msg)
            raise MonitoringParamError(msg) from e

        try:
            value = tree.execute(self._object_path)
        except Exception as e:
            raise MonitoringParamError(
                "Failed to run execute object_path (%s) against object_msg: %s" %
                (self._object_path, str(e))
            ) from e

        if isinstance(value, types.GeneratorType):
            try:
                # An exhausted generator raises StopIteration, which is
                # reported like any other failure below.
                value = next(value)
            except Exception as e:
                raise MonitoringParamError(
                    "Failed to get value from objectpath %s execute generator: %s" %
                    (self._object_path, str(e))
                ) from e

        if isinstance(value, (list, tuple)):
            if len(value) == 0:
                raise MonitoringParamError(
                    "No values found from object_path (%s)" % self._object_path
                )

            if len(value) > 1:
                self._log.debug(
                    "Got multiple values from object_path (%s). "
                    "Only returning the first.", self._object_path
                )

            # Only take the first element
            value = value[0]

        return value
class JsonKeyValueQuerier(object):
    """Look up one top-level key in a JSON response."""

    def __init__(self, log, key):
        """Store the logger and the key to look up."""
        self._log, self._key = log, key

    def query(self, json_msg):
        """Return the value stored under the configured key in json_msg.

        Raises MonitoringParamError when json_msg is not valid JSON or the
        key is absent from the decoded document.
        """
        try:
            json_dict = tornado.escape.json_decode(json_msg)
        except ValueError as e:
            msg = "Failed to convert response into json"
            self._log.warning(msg)
            raise MonitoringParamError(e)

        if self._key in json_dict:
            return json_dict[self._key]

        msg = "Did not find '{}' key in response: {}".format(
            self._key, json_dict
        )
        self._log.warning(msg)
        raise MonitoringParamError(msg)
class ValueConverter(object):
    """Coerce a queried value to the configured value type.

    Supported value types are "INT", "DECIMAL" and "STRING".
    """

    def __init__(self, value_type):
        """Store the value type name used by convert()."""
        self._value_type = value_type

    def _convert_int(self, value):
        """Return value as an int; raise MonitoringParamError on failure."""
        if isinstance(value, int):
            return value

        try:
            return int(value)
        except (ValueError, TypeError) as e:
            # Interpolate the message; Exception does not apply
            # logging-style "%s" arguments.
            raise MonitoringParamError(
                "Could not convert value into integer: %s" % str(e)
            ) from e

    def _convert_text(self, value):
        """Return value as a str; raise MonitoringParamError on failure."""
        if isinstance(value, str):
            return value

        try:
            return str(value)
        except (ValueError, TypeError) as e:
            raise MonitoringParamError(
                "Could not convert value into string: %s" % str(e)
            ) from e

    def _convert_decimal(self, value):
        """Return value as a float; raise MonitoringParamError on failure."""
        if isinstance(value, float):
            return value

        try:
            return float(value)
        except (ValueError, TypeError) as e:
            # The message previously said "string" (copied from
            # _convert_text).
            raise MonitoringParamError(
                "Could not convert value into decimal: %s" % str(e)
            ) from e

    def convert(self, value):
        """Convert value according to the configured value type.

        Raises MonitoringParamError when the conversion fails or the
        configured value type is not one of the supported names.
        """
        if self._value_type == "INT":
            return self._convert_int(value)
        elif self._value_type == "DECIMAL":
            return self._convert_decimal(value)
        elif self._value_type == "STRING":
            return self._convert_text(value)

        raise MonitoringParamError(
            "Unknown value type: %s" % (self._value_type,)
        )
class HTTPBasicAuth(object):
    """Plain holder for a username/password pair."""

    def __init__(self, username, password):
        """Store the credentials as public attributes."""
        self.username, self.password = username, password
228 class HTTPEndpoint(object):
229 def __init__(self
, log
, loop
, ip_address
, ep_msg
):
232 self
._ip
_address
= ip_address
233 self
._ep
_msg
= ep_msg
235 # This is to suppress HTTPS related warning as we do not support
236 # certificate verification yet
237 requests
.packages
.urllib3
.disable_warnings(InsecureRequestWarning
)
238 self
._session
= requests
.Session()
243 def poll_interval(self
):
244 return self
._ep
_msg
.polling_interval_secs
247 def ip_address(self
):
248 return self
._ip
_address
252 return self
._ep
_msg
.port
256 if self
._ep
_msg
.has_field("https"):
257 if self
._ep
_msg
.https
is True:
264 return self
._ep
_msg
.path
268 if self
._ep
_msg
.has_field("method"):
269 return self
._ep
_msg
.method
274 if self
._ep
_msg
.has_field("username"):
275 return self
._ep
_msg
.username
281 if self
._headers
is None:
283 for header
in self
._ep
_msg
.headers
:
284 if header
.has_field("key") and header
.has_field("value"):
285 headers
[header
.key
] = header
.value
287 self
._headers
= headers
293 if self
._ep
_msg
.has_field("password"):
294 return self
._ep
_msg
.password
300 if self
._auth
is None:
301 if self
.username
is not None and self
.password
is not None:
302 self
._auth
= requests
.auth
.HTTPBasicAuth(
311 url
= "{protocol}://{ip_address}:{port}/{path}".format(
312 protocol
=self
.protocol
,
313 ip_address
=self
.ip_address
,
315 path
=self
.path
.lstrip("/"),
322 resp
= self
._session
.request(
323 self
.method
, self
.url
, timeout
=10, auth
=self
.auth
,
324 headers
=self
.headers
, verify
=False
326 resp
.raise_for_status()
327 except requests
.exceptions
.RequestException
as e
:
328 msg
= "Got HTTP error when request monitoring method {} from url {}: {}".format(
333 self
._log
.warning(msg
)
334 raise MonitoringParamError(msg
)
341 with concurrent
.futures
.ThreadPoolExecutor(1) as executor
:
342 resp
= yield from self
._loop
.run_in_executor(
347 except MonitoringParamError
as e
:
348 msg
= "Caught exception when polling http endpoint: %s" % str(e
)
349 self
._log
.warning(msg
)
350 raise MonitoringParamError(msg
)
352 self
._log
.debug("Got response from http endpoint (%s): %s",
358 class MonitoringParam(object):
359 def __init__(self
, log
, vnfr_mon_param_msg
):
361 self
._vnfr
_mon
_param
_msg
= vnfr_mon_param_msg
363 self
._current
_value
= None
365 self
._json
_querier
= self
._create
_json
_querier
()
366 self
._value
_converter
= ValueConverter(self
.value_type
)
368 def _create_json_querier(self
):
369 if self
.msg
.json_query_method
== "NAMEKEY":
370 return JsonKeyValueQuerier(self
._log
, self
.msg
.name
)
371 elif self
.msg
.json_query_method
== "JSONPATH":
372 if not self
.msg
.json_query_params
.has_field("json_path"):
373 msg
= "JSONPATH query_method requires json_query_params.json_path to be filled in %s"
374 self
._log
.error(msg
, self
.msg
)
375 raise ValueError(msg
)
376 return JsonPathValueQuerier(self
._log
, self
.msg
.json_query_params
.json_path
)
377 elif self
.msg
.json_query_method
== "OBJECTPATH":
378 if not self
.msg
.json_query_params
.has_field("object_path"):
379 msg
= "OBJECTPATH query_method requires json_query_params.object_path to be filled in %s"
380 self
._log
.error(msg
, self
.msg
)
381 raise ValueError(msg
)
382 return ObjectPathValueQuerier(self
._log
, self
.msg
.json_query_params
.object_path
)
384 msg
= "Unknown JSON query method: %s" % self
.json_query_method
386 raise ValueError(msg
)
389 def current_value(self
):
390 return self
._current
_value
394 msg
= self
._vnfr
_mon
_param
_msg
395 value_type
= msg
.value_type
397 if self
._current
_value
is None:
400 if value_type
== "INT":
401 msg
.value_integer
= self
._current
_value
403 elif value_type
== "DECIMAL":
404 msg
.value_decimal
= self
._current
_value
406 elif value_type
== "STRING":
407 msg
.value_string
= self
._current
_value
410 self
._log
.debug("Unknown value_type: %s", value_type
)
416 return self
.msg
.http_endpoint_ref
419 def value_type(self
):
420 return self
.msg
.value_type
423 def json_query_method(self
):
424 return self
.msg
.json_query_method
428 return self
.msg
.json_path_params
.json_path
434 def extract_value_from_response(self
, response_msg
):
435 if self
._json
_querier
is None:
436 self
._log
.warning("json querier is not created. Cannot extract value form response.")
440 xml_data
= xmltodict
.parse(response_msg
)
441 json_msg
=json
.dumps(xml_data
)
442 response_msg
= json_msg
443 except Exception as e
:
447 value
= self
._json
_querier
.query(response_msg
)
448 converted_value
= self
._value
_converter
.convert(value
)
449 except MonitoringParamError
as e
:
450 self
._log
.warning("Failed to extract value from json response: %s", str(e
))
453 self
._current
_value
= converted_value
456 class EndpointMonParamsPoller(object):
457 REQUEST_TIMEOUT_SECS
= 10
459 def __init__(self
, log
, loop
, endpoint
, mon_params
, on_update_cb
=None):
462 self
._endpoint
= endpoint
463 self
._mon
_params
= mon_params
464 self
._on
_update
_cb
= on_update_cb
466 self
._poll
_task
= None
469 def poll_interval(self
):
470 return self
._endpoint
.poll_interval
472 def _get_mon_param_msgs(self
):
473 return [mon_param
.msg
for mon_param
in self
._mon
_params
]
475 def _notify_subscriber(self
):
476 if self
._on
_update
_cb
is None:
479 self
._on
_update
_cb
(self
._get
_mon
_param
_msgs
())
481 def _apply_response_to_mon_params(self
, response_msg
):
482 for mon_param
in self
._mon
_params
:
483 mon_param
.extract_value_from_response(response_msg
)
485 self
._notify
_subscriber
()
488 def _poll_loop(self
):
489 self
._log
.debug("Starting http endpoint %s poll loop", self
._endpoint
.url
)
492 response
= yield from self
._endpoint
.poll()
493 self
._apply
_response
_to
_mon
_params
(response
)
494 except concurrent
.futures
.CancelledError
as e
:
497 yield from asyncio
.sleep(self
.poll_interval
, loop
=self
._loop
)
500 self
._log
.debug("Got start request for endpoint poller: %s",
502 if self
._poll
_task
is not None:
504 self
._poll
_task
= self
._loop
.create_task(self
._poll
_loop
())
507 self
._log
.debug("Got stop request for endpoint poller: %s",
509 if self
._poll
_task
is None:
512 self
._poll
_task
.cancel()
514 self
._poll
_task
= None
517 class VnfMonitoringParamsController(object):
518 def __init__(self
, log
, loop
, vnfr_id
, management_ip
,
519 http_endpoint_msgs
, monitoring_param_msgs
,
523 self
._vnfr
_id
= vnfr_id
524 self
._management
_ip
= management_ip
525 self
._http
_endpoint
_msgs
= http_endpoint_msgs
526 self
._monitoring
_param
_msgs
= monitoring_param_msgs
528 self
._on
_update
_cb
= on_update_cb
529 self
._endpoints
= self
._create
_endpoints
()
530 self
._mon
_params
= self
._create
_mon
_params
()
532 self
._endpoint
_mon
_param
_map
= self
._create
_endpoint
_mon
_param
_map
(
533 self
._endpoints
, self
._mon
_params
535 self
._endpoint
_pollers
= self
._create
_endpoint
_pollers
(self
._endpoint
_mon
_param
_map
)
537 def _create_endpoints(self
):
538 path_endpoint_map
= {}
539 for ep_msg
in self
._http
_endpoint
_msgs
:
540 endpoint
= HTTPEndpoint(
546 path_endpoint_map
[endpoint
.path
] = endpoint
548 return path_endpoint_map
550 def _create_mon_params(self
):
552 for mp_msg
in self
._monitoring
_param
_msgs
:
553 mon_params
[mp_msg
.id] = MonitoringParam(
560 def _create_endpoint_mon_param_map(self
, endpoints
, mon_params
):
561 ep_mp_map
= collections
.defaultdict(list)
562 for mp
in mon_params
.values():
563 endpoint
= endpoints
[mp
.path
]
564 ep_mp_map
[endpoint
].append(mp
)
568 def _create_endpoint_pollers(self
, ep_mp_map
):
571 for endpoint
, mon_params
in ep_mp_map
.items():
572 poller
= EndpointMonParamsPoller(
580 pollers
.append(poller
)
587 for mp
in self
.mon_params
:
593 def mon_params(self
):
594 return list(self
._mon
_params
.values())
598 return list(self
._endpoints
.values())
601 """ Start monitoring """
602 self
._log
.debug("Starting monitoring of VNF id: %s", self
._vnfr
_id
)
603 for poller
in self
._endpoint
_pollers
:
607 """ Stop monitoring """
608 self
._log
.debug("Stopping monitoring of VNF id: %s", self
._vnfr
_id
)
609 for poller
in self
._endpoint
_pollers
:
613 class VnfMonitorDtsHandler(mano_dts
.DtsHandler
):
614 """ VNF monitoring class """
615 # List of list: So we need to register for the list in the deepest level
616 XPATH
= "D,/vnfr:vnfr-catalog/vnfr:vnfr/vnfr:monitoring-param"
619 def from_vnf_data(cls
, tasklet
, vnfr_msg
, vnfd_msg
):
620 handler
= cls(tasklet
.log
, tasklet
.dts
, tasklet
.loop
,
621 vnfr_msg
.id, vnfr_msg
.mgmt_interface
.ip_address
,
622 vnfd_msg
.monitoring_param
, vnfd_msg
.http_endpoint
)
626 def __init__(self
, log
, dts
, loop
, vnfr_id
, mgmt_ip
, params
, endpoints
):
627 super().__init
__(log
, dts
, loop
)
629 self
._mgmt
_ip
= mgmt_ip
630 self
._vnfr
_id
= vnfr_id
633 for mon_param
in params
:
634 param
= VnfrYang
.YangData_Vnfr_VnfrCatalog_Vnfr_MonitoringParam
.from_dict(
637 mon_params
.append(param
)
640 for endpoint
in endpoints
:
641 endpoint
= VnfrYang
.YangData_Vnfr_VnfrCatalog_Vnfr_HttpEndpoint
.from_dict(
644 http_endpoints
.append(endpoint
)
646 self
.log
.debug("Creating monitoring param controller")
647 self
.log
.debug(" - Endpoints: %s", http_endpoints
)
648 self
.log
.debug(" - Monitoring Params: %s", mon_params
)
650 self
._mon
_param
_controller
= VnfMonitoringParamsController(
657 self
.on_update_mon_params
660 def on_update_mon_params(self
, mon_param_msgs
):
661 for param_msg
in mon_param_msgs
:
662 self
.reg
.update_element(
663 self
.xpath(param_msg
.id),
665 rwdts
.XactFlag
.ADVISE
669 self
._mon
_param
_controller
.start()
673 self
._mon
_param
_controller
.stop()
675 def xpath(self
, param_id
=None):
676 """ Monitoring params xpath """
677 return("D,/vnfr:vnfr-catalog" +
678 "/vnfr:vnfr[vnfr:id='{}']".format(self
._vnfr
_id
) +
679 "/vnfr:monitoring-param" +
680 ("[vnfr:id='{}']".format(param_id
) if param_id
else ""))
684 """ The message with the monitoing params """
685 return self
._mon
_param
_controller
.msgs
692 """ Register with dts """
694 self
.reg
= yield from self
.dts
.register(xpath
=self
.xpath(),
695 flags
=rwdts
.Flag
.PUBLISHER|rwdts
.Flag
.CACHE|rwdts
.Flag
.NO_PREP_READ
)
697 assert self
.reg
is not None
699 def deregister(self
):
700 """ de-register with dts """
701 if self
.reg
is not None:
702 self
.log
.debug("Deregistering path %s, regh = %s",
703 VnfMonitorDtsHandler
.XPATH
,
705 self
.reg
.deregister()