6dc3a25b908bfa94fe2c7fc495a949f3aeecbb4d
[osm/SO.git] / rwlaunchpad / plugins / rwmonparam / rift / tasklets / rwmonparam / vnfr_core.py
1
2 #
3 # Copyright 2016 RIFT.IO Inc
4 #
5 # Licensed under the Apache License, Version 2.0 (the "License");
6 # you may not use this file except in compliance with the License.
7 # You may obtain a copy of the License at
8 #
9 # http://www.apache.org/licenses/LICENSE-2.0
10 #
11 # Unless required by applicable law or agreed to in writing, software
12 # distributed under the License is distributed on an "AS IS" BASIS,
13 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 # See the License for the specific language governing permissions and
15 # limitations under the License.
16 #
17
18 import asyncio
19 import logging
20 import collections
21 import concurrent
22 import types
23
24 import requests
25 import requests.auth
26 import tornado.escape
27
28 from requests.packages.urllib3.exceptions import InsecureRequestWarning
29
30 import gi
31 gi.require_version('RwDts', '1.0')
32 import rift.tasklets
33 from gi.repository import (
34 RwDts as rwdts,
35 VnfrYang
36 )
37 import rift.mano.dts as mano_dts
38 import rwlogger
39 import xmltodict, json
40
41 class MonitoringParamError(Exception):
42 """Monitoring Parameter error"""
43 pass
44
45
46 class JsonPathValueQuerier(object):
47 def __init__(self, log, json_path):
48 self._log = log
49 self._json_path = json_path
50 self._json_path_expr = None
51
52 try:
53 import jsonpath_rw
54 self._json_path_expr = jsonpath_rw.parse(self._json_path)
55 except Exception as e:
56 self._log.error("Could not create json_path parser: %s", str(e))
57
58 def query(self, json_msg):
59 try:
60 json_dict = tornado.escape.json_decode(json_msg)
61 except ValueError as e:
62 msg = "Failed to convert response into json"
63 self._log.warning(msg)
64 raise MonitoringParamError(e)
65
66 if self._json_path_expr is None:
67 raise MonitoringParamError(
68 "Parser not created. Unable to extract value from %s" % json_msg
69 )
70
71 try:
72 matches = self._json_path_expr.find(json_dict)
73 values = [m.value for m in matches]
74 except Exception as e:
75 raise MonitoringParamError(
76 "Failed to run find using json_path (%s) against json_msg: %s" %
77 (self._json_path, str(e))
78 )
79
80 if len(values) == 0:
81 raise MonitoringParamError(
82 "No values found from json_path (%s)" % self._json_path
83 )
84
85 if len(values) > 1:
86 self._log.debug("Got multiple values from json_path (%s). Only returning the first.",
87 self._json_path)
88
89 return values[0]
90
91
92 class ObjectPathValueQuerier(object):
93 def __init__(self, log, object_path):
94 self._log = log
95 self._object_path = object_path
96 self._object_path_expr = None
97
98 def query(self, object_msg):
99 try:
100 object_dict = tornado.escape.json_decode(object_msg)
101 except ValueError as e:
102 msg = "Failed to convert response into object"
103 self._log.warning(msg)
104 raise MonitoringParamError(e)
105
106 import objectpath
107 try:
108 tree = objectpath.Tree(object_dict)
109 except Exception as e:
110 msg = "Could not create objectpath tree: %s", str(e)
111 self._log.error(msg)
112 raise MonitoringParamError(msg)
113
114 try:
115 value = tree.execute(self._object_path)
116 except Exception as e:
117 raise MonitoringParamError(
118 "Failed to run execute object_path (%s) against object_msg: %s" %
119 (self._object_path, str(e))
120 )
121
122 if isinstance(value, types.GeneratorType):
123 try:
124 value = next(value)
125 except Exception as e:
126 raise MonitoringParamError(
127 "Failed to get value from objectpath %s execute generator: %s" %
128 (self._object_path, str(e))
129 )
130
131 if isinstance(value, (list, tuple)):
132 if len(value) == 0:
133 raise MonitoringParamError(
134 "No values found from object_path (%s)" % self._object_path
135 )
136
137 elif len(value) > 1:
138 self._log.debug(
139 "Got multiple values from object_path (%s). "
140 "Only returning the first.", self._object_path
141 )
142
143 # Only take the first element
144 value = value[0]
145
146 return value
147
148
149 class JsonKeyValueQuerier(object):
150 def __init__(self, log, key):
151 self._log = log
152 self._key = key
153
154 def query(self, json_msg):
155 try:
156 json_dict = tornado.escape.json_decode(json_msg)
157 except ValueError as e:
158 msg = "Failed to convert response into json"
159 self._log.warning(msg)
160 raise MonitoringParamError(e)
161
162 if self._key not in json_dict:
163 msg = "Did not find '{}' key in response: {}".format(
164 self._key, json_dict
165 )
166 self._log.warning(msg)
167 raise MonitoringParamError(msg)
168
169 value = json_dict[self._key]
170
171 return value
172
173
174 class ValueConverter(object):
175 def __init__(self, value_type):
176 self._value_type = value_type
177
178 def _convert_int(self, value):
179 if isinstance(value, int):
180 return value
181
182 try:
183 return int(value)
184 except (ValueError, TypeError) as e:
185 raise MonitoringParamError(
186 "Could not convert value into integer: %s", str(e)
187 )
188
189 def _convert_text(self, value):
190 if isinstance(value, str):
191 return value
192
193 try:
194 return str(value)
195 except (ValueError, TypeError) as e:
196 raise MonitoringParamError(
197 "Could not convert value into string: %s", str(e)
198 )
199
200 def _convert_decimal(self, value):
201 if isinstance(value, float):
202 return value
203
204 try:
205 return float(value)
206 except (ValueError, TypeError) as e:
207 raise MonitoringParamError(
208 "Could not convert value into string: %s", str(e)
209 )
210
211 def convert(self, value):
212 if self._value_type == "INT":
213 return self._convert_int(value)
214 elif self._value_type == "DECIMAL":
215 return self._convert_decimal(value)
216 elif self._value_type == "STRING":
217 return self._convert_text(value)
218 else:
219 raise MonitoringParamError("Unknown value type: %s", self._value_type)
220
221
222 class HTTPBasicAuth(object):
223 def __init__(self, username, password):
224 self.username = username
225 self.password = password
226
227
228 class HTTPEndpoint(object):
229 def __init__(self, log, loop, ip_address, ep_msg):
230 self._log = log
231 self._loop = loop
232 self._ip_address = ip_address
233 self._ep_msg = ep_msg
234
235 # This is to suppress HTTPS related warning as we do not support
236 # certificate verification yet
237 requests.packages.urllib3.disable_warnings(InsecureRequestWarning)
238 self._session = requests.Session()
239 self._auth = None
240 self._headers = None
241
242 @property
243 def poll_interval(self):
244 return self._ep_msg.polling_interval_secs
245
246 @property
247 def ip_address(self):
248 return self._ip_address
249
250 @property
251 def port(self):
252 return self._ep_msg.port
253
254 @property
255 def protocol(self):
256 if self._ep_msg.has_field("https"):
257 if self._ep_msg.https is True:
258 return "https"
259
260 return "http"
261
262 @property
263 def path(self):
264 return self._ep_msg.path
265
266 @property
267 def method(self):
268 if self._ep_msg.has_field("method"):
269 return self._ep_msg.method
270 return "GET"
271
272 @property
273 def username(self):
274 if self._ep_msg.has_field("username"):
275 return self._ep_msg.username
276
277 return None
278
279 @property
280 def headers(self):
281 if self._headers is None:
282 headers = {}
283 for header in self._ep_msg.headers:
284 if header.has_field("key") and header.has_field("value"):
285 headers[header.key] = header.value
286
287 self._headers = headers
288
289 return self._headers
290
291 @property
292 def password(self):
293 if self._ep_msg.has_field("password"):
294 return self._ep_msg.password
295
296 return None
297
298 @property
299 def auth(self):
300 if self._auth is None:
301 if self.username is not None and self.password is not None:
302 self._auth = requests.auth.HTTPBasicAuth(
303 self.username,
304 self.password,
305 )
306
307 return self._auth
308
309 @property
310 def url(self):
311 url = "{protocol}://{ip_address}:{port}/{path}".format(
312 protocol=self.protocol,
313 ip_address=self.ip_address,
314 port=self.port,
315 path=self.path.lstrip("/"),
316 )
317
318 return url
319
320 def _poll(self):
321 try:
322 resp = self._session.request(
323 self.method, self.url, timeout=10, auth=self.auth,
324 headers=self.headers, verify=False
325 )
326 resp.raise_for_status()
327 except requests.exceptions.RequestException as e:
328 msg = "Got HTTP error when request monitoring method {} from url {}: {}".format(
329 self.method,
330 self.url,
331 str(e),
332 )
333 self._log.warning(msg)
334 raise MonitoringParamError(msg)
335
336 return resp.text
337
338 @asyncio.coroutine
339 def poll(self):
340 try:
341 with concurrent.futures.ThreadPoolExecutor(1) as executor:
342 resp = yield from self._loop.run_in_executor(
343 executor,
344 self._poll,
345 )
346
347 except MonitoringParamError as e:
348 msg = "Caught exception when polling http endpoint: %s" % str(e)
349 self._log.warning(msg)
350 raise MonitoringParamError(msg)
351
352 self._log.debug("Got response from http endpoint (%s): %s",
353 self.url, resp)
354
355 return resp
356
357
358 class MonitoringParam(object):
359 def __init__(self, log, vnfr_mon_param_msg):
360 self._log = log
361 self._vnfr_mon_param_msg = vnfr_mon_param_msg
362
363 self._current_value = None
364
365 self._json_querier = self._create_json_querier()
366 self._value_converter = ValueConverter(self.value_type)
367
368 def _create_json_querier(self):
369 if self.msg.json_query_method == "NAMEKEY":
370 return JsonKeyValueQuerier(self._log, self.msg.name)
371 elif self.msg.json_query_method == "JSONPATH":
372 if not self.msg.json_query_params.has_field("json_path"):
373 msg = "JSONPATH query_method requires json_query_params.json_path to be filled in %s"
374 self._log.error(msg, self.msg)
375 raise ValueError(msg)
376 return JsonPathValueQuerier(self._log, self.msg.json_query_params.json_path)
377 elif self.msg.json_query_method == "OBJECTPATH":
378 if not self.msg.json_query_params.has_field("object_path"):
379 msg = "OBJECTPATH query_method requires json_query_params.object_path to be filled in %s"
380 self._log.error(msg, self.msg)
381 raise ValueError(msg)
382 return ObjectPathValueQuerier(self._log, self.msg.json_query_params.object_path)
383 else:
384 msg = "Unknown JSON query method: %s" % self.json_query_method
385 self._log.error(msg)
386 raise ValueError(msg)
387
388 @property
389 def current_value(self):
390 return self._current_value
391
392 @property
393 def msg(self):
394 msg = self._vnfr_mon_param_msg
395 value_type = msg.value_type
396
397 if self._current_value is None:
398 return msg
399
400 if value_type == "INT":
401 msg.value_integer = self._current_value
402
403 elif value_type == "DECIMAL":
404 msg.value_decimal = self._current_value
405
406 elif value_type == "STRING":
407 msg.value_string = self._current_value
408
409 else:
410 self._log.debug("Unknown value_type: %s", value_type)
411
412 return msg
413
414 @property
415 def path(self):
416 return self.msg.http_endpoint_ref
417
418 @property
419 def value_type(self):
420 return self.msg.value_type
421
422 @property
423 def json_query_method(self):
424 return self.msg.json_query_method
425
426 @property
427 def json_path(self):
428 return self.msg.json_path_params.json_path
429
430 @property
431 def name(self):
432 return self.msg.name
433
434 def extract_value_from_response(self, response_msg):
435 if self._json_querier is None:
436 self._log.warning("json querier is not created. Cannot extract value form response.")
437 return
438
439 try:
440 xml_data = xmltodict.parse(response_msg)
441 json_msg=json.dumps(xml_data)
442 response_msg = json_msg
443 except Exception as e:
444 pass
445
446 try:
447 value = self._json_querier.query(response_msg)
448 converted_value = self._value_converter.convert(value)
449 except MonitoringParamError as e:
450 self._log.warning("Failed to extract value from json response: %s", str(e))
451 return
452 else:
453 self._current_value = converted_value
454
455
456 class EndpointMonParamsPoller(object):
457 REQUEST_TIMEOUT_SECS = 10
458
459 def __init__(self, log, loop, endpoint, mon_params, on_update_cb=None):
460 self._log = log
461 self._loop = loop
462 self._endpoint = endpoint
463 self._mon_params = mon_params
464 self._on_update_cb = on_update_cb
465
466 self._poll_task = None
467
468 @property
469 def poll_interval(self):
470 return self._endpoint.poll_interval
471
472 def _get_mon_param_msgs(self):
473 return [mon_param.msg for mon_param in self._mon_params]
474
475 def _notify_subscriber(self):
476 if self._on_update_cb is None:
477 return
478
479 self._on_update_cb(self._get_mon_param_msgs())
480
481 def _apply_response_to_mon_params(self, response_msg):
482 for mon_param in self._mon_params:
483 mon_param.extract_value_from_response(response_msg)
484
485 self._notify_subscriber()
486
487 @asyncio.coroutine
488 def _poll_loop(self):
489 self._log.debug("Starting http endpoint %s poll loop", self._endpoint.url)
490 while True:
491 try:
492 response = yield from self._endpoint.poll()
493 self._apply_response_to_mon_params(response)
494 except concurrent.futures.CancelledError as e:
495 return
496
497 yield from asyncio.sleep(self.poll_interval, loop=self._loop)
498
499 def start(self):
500 self._log.debug("Got start request for endpoint poller: %s",
501 self._endpoint.url)
502 if self._poll_task is not None:
503 return
504 self._poll_task = self._loop.create_task(self._poll_loop())
505
506 def stop(self):
507 self._log.debug("Got stop request for endpoint poller: %s",
508 self._endpoint.url)
509 if self._poll_task is None:
510 return
511
512 self._poll_task.cancel()
513
514 self._poll_task = None
515
516
517 class VnfMonitoringParamsController(object):
518 def __init__(self, log, loop, vnfr_id, management_ip,
519 http_endpoint_msgs, monitoring_param_msgs,
520 on_update_cb=None):
521 self._log = log
522 self._loop = loop
523 self._vnfr_id = vnfr_id
524 self._management_ip = management_ip
525 self._http_endpoint_msgs = http_endpoint_msgs
526 self._monitoring_param_msgs = monitoring_param_msgs
527
528 self._on_update_cb = on_update_cb
529 self._endpoints = self._create_endpoints()
530 self._mon_params = self._create_mon_params()
531
532 self._endpoint_mon_param_map = self._create_endpoint_mon_param_map(
533 self._endpoints, self._mon_params
534 )
535 self._endpoint_pollers = self._create_endpoint_pollers(self._endpoint_mon_param_map)
536
537 def _create_endpoints(self):
538 path_endpoint_map = {}
539 for ep_msg in self._http_endpoint_msgs:
540 endpoint = HTTPEndpoint(
541 self._log,
542 self._loop,
543 self._management_ip,
544 ep_msg,
545 )
546 path_endpoint_map[endpoint.path] = endpoint
547
548 return path_endpoint_map
549
550 def _create_mon_params(self):
551 mon_params = {}
552 for mp_msg in self._monitoring_param_msgs:
553 mon_params[mp_msg.id] = MonitoringParam(
554 self._log,
555 mp_msg,
556 )
557
558 return mon_params
559
560 def _create_endpoint_mon_param_map(self, endpoints, mon_params):
561 ep_mp_map = collections.defaultdict(list)
562 for mp in mon_params.values():
563 endpoint = endpoints[mp.path]
564 ep_mp_map[endpoint].append(mp)
565
566 return ep_mp_map
567
568 def _create_endpoint_pollers(self, ep_mp_map):
569 pollers = []
570
571 for endpoint, mon_params in ep_mp_map.items():
572 poller = EndpointMonParamsPoller(
573 self._log,
574 self._loop,
575 endpoint,
576 mon_params,
577 self._on_update_cb
578 )
579
580 pollers.append(poller)
581
582 return pollers
583
584 @property
585 def msgs(self):
586 msgs = []
587 for mp in self.mon_params:
588 msgs.append(mp.msg)
589
590 return msgs
591
592 @property
593 def mon_params(self):
594 return list(self._mon_params.values())
595
596 @property
597 def endpoints(self):
598 return list(self._endpoints.values())
599
600 def start(self):
601 """ Start monitoring """
602 self._log.debug("Starting monitoring of VNF id: %s", self._vnfr_id)
603 for poller in self._endpoint_pollers:
604 poller.start()
605
606 def stop(self):
607 """ Stop monitoring """
608 self._log.debug("Stopping monitoring of VNF id: %s", self._vnfr_id)
609 for poller in self._endpoint_pollers:
610 poller.stop()
611
612
613 class VnfMonitorDtsHandler(mano_dts.DtsHandler):
614 """ VNF monitoring class """
615 # List of list: So we need to register for the list in the deepest level
616 XPATH = "D,/vnfr:vnfr-catalog/vnfr:vnfr/vnfr:monitoring-param"
617
618 @classmethod
619 def from_vnf_data(cls, tasklet, vnfr_msg, vnfd_msg):
620 handler = cls(tasklet.log, tasklet.dts, tasklet.loop,
621 vnfr_msg.id, vnfr_msg.mgmt_interface.ip_address,
622 vnfd_msg.monitoring_param, vnfd_msg.http_endpoint)
623
624 return handler
625
626 def __init__(self, log, dts, loop, vnfr_id, mgmt_ip, params, endpoints):
627 super().__init__(log, dts, loop)
628
629 self._mgmt_ip = mgmt_ip
630 self._vnfr_id = vnfr_id
631
632 mon_params = []
633 for mon_param in params:
634 param = VnfrYang.YangData_Vnfr_VnfrCatalog_Vnfr_MonitoringParam.from_dict(
635 mon_param.as_dict()
636 )
637 mon_params.append(param)
638
639 http_endpoints = []
640 for endpoint in endpoints:
641 endpoint = VnfrYang.YangData_Vnfr_VnfrCatalog_Vnfr_HttpEndpoint.from_dict(
642 endpoint.as_dict()
643 )
644 http_endpoints.append(endpoint)
645
646 self.log.debug("Creating monitoring param controller")
647 self.log.debug(" - Endpoints: %s", http_endpoints)
648 self.log.debug(" - Monitoring Params: %s", mon_params)
649
650 self._mon_param_controller = VnfMonitoringParamsController(
651 self.log,
652 self.loop,
653 self._vnfr_id,
654 self._mgmt_ip,
655 http_endpoints,
656 mon_params,
657 self.on_update_mon_params
658 )
659
660 def on_update_mon_params(self, mon_param_msgs):
661 for param_msg in mon_param_msgs:
662 self.reg.update_element(
663 self.xpath(param_msg.id),
664 param_msg,
665 rwdts.XactFlag.ADVISE
666 )
667
668 def start(self):
669 self._mon_param_controller.start()
670
671 def stop(self):
672 self.deregister()
673 self._mon_param_controller.stop()
674
675 def xpath(self, param_id=None):
676 """ Monitoring params xpath """
677 return("D,/vnfr:vnfr-catalog" +
678 "/vnfr:vnfr[vnfr:id='{}']".format(self._vnfr_id) +
679 "/vnfr:monitoring-param" +
680 ("[vnfr:id='{}']".format(param_id) if param_id else ""))
681
682 @property
683 def msg(self):
684 """ The message with the monitoing params """
685 return self._mon_param_controller.msgs
686
687 def __del__(self):
688 self.stop()
689
690 @asyncio.coroutine
691 def register(self):
692 """ Register with dts """
693
694 self.reg = yield from self.dts.register(xpath=self.xpath(),
695 flags=rwdts.Flag.PUBLISHER|rwdts.Flag.CACHE|rwdts.Flag.NO_PREP_READ)
696
697 assert self.reg is not None
698
699 def deregister(self):
700 """ de-register with dts """
701 if self.reg is not None:
702 self.log.debug("Deregistering path %s, regh = %s",
703 VnfMonitorDtsHandler.XPATH,
704 self.reg)
705 self.reg.deregister()
706 self.reg = None
707 self._vnfr = None