update from RIFT as of 696b75d2fe9fb046261b08c616f1bcf6c0b54a9b second try
[osm/SO.git] / rwlaunchpad / plugins / rwmonparam / rift / tasklets / rwmonparam / vnfr_core.py
1
2 #
3 # Copyright 2016 RIFT.IO Inc
4 #
5 # Licensed under the Apache License, Version 2.0 (the "License");
6 # you may not use this file except in compliance with the License.
7 # You may obtain a copy of the License at
8 #
9 # http://www.apache.org/licenses/LICENSE-2.0
10 #
11 # Unless required by applicable law or agreed to in writing, software
12 # distributed under the License is distributed on an "AS IS" BASIS,
13 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 # See the License for the specific language governing permissions and
15 # limitations under the License.
16 #
17
18 import asyncio
19 import collections
20 import concurrent
21 import gi
22 import logging
23 import requests
24 import requests.auth
25 import tornado.escape
26 import types
27
28 from requests.packages.urllib3.exceptions import InsecureRequestWarning
29
30 gi.require_version('RwDts', '1.0')
31 import rift.tasklets
32 from gi.repository import (
33 RwDts as rwdts,
34 VnfrYang
35 )
36 import rift.mano.dts as mano_dts
37 import rwlogger
38 import xmltodict, json
39 gi.require_version('RwKeyspec', '1.0')
40 from gi.repository.RwKeyspec import quoted_key
41
42
class MonitoringParamError(Exception):
    """Error raised while polling, extracting or converting a monitoring parameter."""
46
47
class JsonPathValueQuerier(object):
    """Pull a single value out of a JSON response with a jsonpath_rw expression."""

    def __init__(self, log, json_path):
        """Store the path and try to compile it with jsonpath_rw.

        A failure to import jsonpath_rw or to parse the expression is only
        logged here; query() raises MonitoringParamError later because no
        parser is available.
        """
        self._log = log
        self._json_path = json_path

        try:
            import jsonpath_rw
            compiled = jsonpath_rw.parse(json_path)
        except Exception as exc:
            self._log.error("Could not create json_path parser: %s", str(exc))
            compiled = None

        self._json_path_expr = compiled

    def query(self, json_msg):
        """Return the first value the json path matches in json_msg.

        Raises MonitoringParamError when the message is not valid JSON, when
        no parser could be created, when the search itself fails or when
        nothing matches.
        """
        try:
            decoded = tornado.escape.json_decode(json_msg)
        except ValueError as exc:
            self._log.warning("Failed to convert response into json")
            raise MonitoringParamError(exc)

        expr = self._json_path_expr
        if expr is None:
            raise MonitoringParamError(
                "Parser not created. Unable to extract value from %s" % json_msg
            )

        try:
            found = [match.value for match in expr.find(decoded)]
        except Exception as exc:
            raise MonitoringParamError(
                "Failed to run find using json_path (%s) against json_msg: %s" %
                (self._json_path, str(exc))
            )

        if not found:
            raise MonitoringParamError(
                "No values found from json_path (%s)" % self._json_path
            )

        if len(found) > 1:
            self._log.debug("Got multiple values from json_path (%s). Only returning the first.",
                            self._json_path)

        return found[0]
92
93
class ObjectPathValueQuerier(object):
    """Extract a single value from a JSON response using an objectpath query."""

    def __init__(self, log, object_path):
        """
        Arguments:
            log - logger used for warning/error/debug output
            object_path - objectpath expression handed to Tree.execute()
        """
        self._log = log
        self._object_path = object_path
        self._object_path_expr = None

    def query(self, object_msg):
        """Run the object path against object_msg and return one value.

        Arguments:
            object_msg - JSON text of the response

        Returns:
            The result of the query.  A generator result is reduced to its
            first item and a list/tuple result to its first element.

        Raises:
            MonitoringParamError - the message is not valid JSON, the
                objectpath tree cannot be built (including the objectpath
                library being unavailable), the query fails, or the result
                is empty.
        """
        try:
            object_dict = tornado.escape.json_decode(object_msg)
        except ValueError as e:
            msg = "Failed to convert response into object"
            self._log.warning(msg)
            raise MonitoringParamError(e) from e

        try:
            # Imported lazily and inside the try so that a missing library is
            # reported as MonitoringParamError, like every other failure here,
            # instead of escaping as ImportError.
            import objectpath
            tree = objectpath.Tree(object_dict)
        except Exception as e:
            # Format the message; the previous code built a (fmt, arg) tuple
            # and logged/raised that tuple.
            msg = "Could not create objectpath tree: %s" % str(e)
            self._log.error(msg)
            raise MonitoringParamError(msg) from e

        try:
            value = tree.execute(self._object_path)
        except Exception as e:
            raise MonitoringParamError(
                "Failed to run execute object_path (%s) against object_msg: %s" %
                (self._object_path, str(e))
            ) from e

        if isinstance(value, types.GeneratorType):
            try:
                # An exhausted generator raises StopIteration, which is
                # reported as an extraction failure below.
                value = next(value)
            except Exception as e:
                raise MonitoringParamError(
                    "Failed to get value from objectpath %s execute generator: %s" %
                    (self._object_path, str(e))
                ) from e

        if isinstance(value, (list, tuple)):
            if len(value) == 0:
                raise MonitoringParamError(
                    "No values found from object_path (%s)" % self._object_path
                )

            elif len(value) > 1:
                self._log.debug(
                    "Got multiple values from object_path (%s). "
                    "Only returning the first.", self._object_path
                )

            # Only take the first element
            value = value[0]

        return value
149
150
class JsonKeyValueQuerier(object):
    """Extract the value stored under a fixed top-level key of a JSON object."""

    def __init__(self, log, key):
        """
        Arguments:
            log - logger used for warnings
            key - name of the top-level key to read from each response
        """
        self._log = log
        self._key = key

    def query(self, json_msg):
        """Return the value stored under the configured key in json_msg.

        Arguments:
            json_msg - JSON text of the response

        Raises:
            MonitoringParamError - the message is not valid JSON, its top
                level is not a JSON object, or the key is missing.
        """
        try:
            json_dict = tornado.escape.json_decode(json_msg)
        except ValueError as e:
            msg = "Failed to convert response into json"
            self._log.warning(msg)
            raise MonitoringParamError(e) from e

        # A response whose top level is a number, null, list or string would
        # otherwise raise TypeError on the membership test or the subscript;
        # report it as a missing key instead.
        if not isinstance(json_dict, dict) or self._key not in json_dict:
            msg = "Did not find '{}' key in response: {}".format(
                self._key, json_dict
            )
            self._log.warning(msg)
            raise MonitoringParamError(msg)

        return json_dict[self._key]
174
175
class ValueConverter(object):
    """Convert an extracted value to the type named by a monitoring parameter.

    Supported value types are "INT", "DECIMAL" and "STRING".
    """

    def __init__(self, value_type):
        """
        Arguments:
            value_type - one of "INT", "DECIMAL" or "STRING"
        """
        self._value_type = value_type

    def _convert_int(self, value):
        """Return value as an int; raise MonitoringParamError if impossible."""
        if isinstance(value, int):
            return value

        try:
            return int(value)
        except (ValueError, TypeError) as e:
            # The message is formatted here: exceptions do not apply
            # logging-style "%s" arguments on their own.
            raise MonitoringParamError(
                "Could not convert value into integer: %s" % str(e)
            ) from e

    def _convert_text(self, value):
        """Return value as a str; raise MonitoringParamError if impossible."""
        if isinstance(value, str):
            return value

        try:
            return str(value)
        except (ValueError, TypeError) as e:
            raise MonitoringParamError(
                "Could not convert value into string: %s" % str(e)
            ) from e

    def _convert_decimal(self, value):
        """Return value as a float; raise MonitoringParamError if impossible."""
        if isinstance(value, float):
            return value

        try:
            return float(value)
        except (ValueError, TypeError) as e:
            raise MonitoringParamError(
                "Could not convert value into decimal: %s" % str(e)
            ) from e

    def convert(self, value):
        """Convert value according to the configured value type.

        Raises:
            MonitoringParamError - the value cannot be converted or the
                configured value type is not one of the supported ones.
        """
        if self._value_type == "INT":
            return self._convert_int(value)
        elif self._value_type == "DECIMAL":
            return self._convert_decimal(value)
        elif self._value_type == "STRING":
            return self._convert_text(value)
        else:
            raise MonitoringParamError(
                "Unknown value type: %s" % (self._value_type,)
            )
222
223
class HTTPBasicAuth(object):
    """Plain holder for a username/password pair."""

    def __init__(self, username, password):
        self.username, self.password = username, password
228
229
class HTTPEndpoint(object):
    """A single HTTP(S) endpoint, described by an endpoint message, that can be polled.

    The blocking request is issued through a requests session from an
    executor thread so that poll() can be awaited from the event loop.
    """

    def __init__(self, log, loop, ip_address, ep_msg, executor=None):
        self._log = log
        self._loop = loop
        self._ip_address = ip_address
        self._ep_msg = ep_msg
        self._executor = executor

        # Certificate verification is not supported yet, so silence the
        # HTTPS warning urllib3 would otherwise emit.
        requests.packages.urllib3.disable_warnings(InsecureRequestWarning)
        self._session = requests.Session()

        # Both are built lazily by the matching properties.
        self._auth = None
        self._headers = None

    @property
    def poll_interval(self):
        """Polling interval in seconds, as given by the endpoint message."""
        return self._ep_msg.polling_interval_secs

    @property
    def ip_address(self):
        return self._ip_address

    @property
    def port(self):
        return self._ep_msg.port

    @property
    def protocol(self):
        """"https" only when the https field is present and True, else "http"."""
        ep_msg = self._ep_msg
        secure = ep_msg.has_field("https") and ep_msg.https is True
        return "https" if secure else "http"

    @property
    def path(self):
        return self._ep_msg.path

    @property
    def method(self):
        """HTTP method from the message, defaulting to "GET"."""
        ep_msg = self._ep_msg
        return ep_msg.method if ep_msg.has_field("method") else "GET"

    @property
    def query_data(self):
        """Request body from the message, or None when not set."""
        ep_msg = self._ep_msg
        return ep_msg.data if ep_msg.has_field("data") else None

    @property
    def username(self):
        ep_msg = self._ep_msg
        return ep_msg.username if ep_msg.has_field("username") else None

    @property
    def headers(self):
        """Header dict built once from the entries that have both key and value."""
        if self._headers is None:
            self._headers = {
                entry.key: entry.value
                for entry in self._ep_msg.headers
                if entry.has_field("key") and entry.has_field("value")
            }

        return self._headers

    @property
    def password(self):
        ep_msg = self._ep_msg
        return ep_msg.password if ep_msg.has_field("password") else None

    @property
    def auth(self):
        """Basic-auth object, created on first use when both credentials are set."""
        if self._auth is not None:
            return self._auth

        if self.username is None or self.password is None:
            return self._auth

        self._auth = requests.auth.HTTPBasicAuth(
            self.username,
            self.password,
        )
        return self._auth

    @property
    def url(self):
        """Full URL assembled from protocol, address, port and path."""
        return "{protocol}://{ip_address}:{port}/{path}".format(
            protocol=self.protocol,
            ip_address=self.ip_address,
            port=self.port,
            path=self.path.lstrip("/"),
        )

    def _poll(self):
        """Issue the blocking request and return the response text.

        Any requests-level failure, including a non-success status, is
        logged and re-raised as MonitoringParamError.
        """
        try:
            reply = self._session.request(
                self.method, self.url, timeout=10, auth=self.auth,
                headers=self.headers, verify=False, data=self.query_data
            )
            reply.raise_for_status()
        except requests.exceptions.RequestException as exc:
            text = "Got HTTP error when request monitoring method {} from url {}: {}".format(
                self.method,
                self.url,
                str(exc),
            )
            self._log.warning(text)
            raise MonitoringParamError(text)

        return reply.text

    @asyncio.coroutine
    def poll(self):
        """Run _poll() in an executor and return the response text.

        The executor given at construction is used when there is one;
        otherwise a single-worker thread pool is created for this call.
        """
        try:
            if self._executor is not None:
                resp = yield from self._loop.run_in_executor(
                    self._executor,
                    self._poll,
                )
            else:
                with concurrent.futures.ThreadPoolExecutor(1) as one_shot:
                    resp = yield from self._loop.run_in_executor(
                        one_shot,
                        self._poll,
                    )
        except MonitoringParamError as exc:
            text = "Caught exception when polling http endpoint: %s" % str(exc)
            self._log.warning(text)
            raise MonitoringParamError(text)

        self._log.debug("Got response from http endpoint (%s): %s",
                        self.url, resp)

        return resp
372
373
class MonitoringParam(object):
    """A single monitoring parameter and the value last extracted for it.

    Wraps the VNFR monitoring-param message, selects the querier matching
    the message's json_query_method and converts extracted values to the
    message's value_type.
    """

    def __init__(self, log, vnfr_mon_param_msg):
        """
        Arguments:
            log - logger
            vnfr_mon_param_msg - VNFR monitoring-param message

        Raises:
            ValueError - the query method is unknown or its required query
                parameter is missing from the message.
        """
        self._log = log
        self._vnfr_mon_param_msg = vnfr_mon_param_msg

        self._current_value = None

        self._json_querier = self._create_json_querier()
        self._value_converter = ValueConverter(self.value_type)

    def _create_json_querier(self):
        """Build the querier selected by the message's json_query_method."""
        if self.msg.json_query_method == "NAMEKEY":
            return JsonKeyValueQuerier(self._log, self.msg.name)
        elif self.msg.json_query_method == "JSONPATH":
            if not self.msg.json_query_params.has_field("json_path"):
                # Format once so the raised ValueError carries the message
                # details instead of a bare "%s" placeholder.
                msg = ("JSONPATH query_method requires json_query_params.json_path "
                       "to be filled in %s" % (self.msg,))
                self._log.error(msg)
                raise ValueError(msg)
            return JsonPathValueQuerier(self._log, self.msg.json_query_params.json_path)
        elif self.msg.json_query_method == "OBJECTPATH":
            if not self.msg.json_query_params.has_field("object_path"):
                msg = ("OBJECTPATH query_method requires json_query_params.object_path "
                       "to be filled in %s" % (self.msg,))
                self._log.error(msg)
                raise ValueError(msg)
            return ObjectPathValueQuerier(self._log, self.msg.json_query_params.object_path)
        else:
            msg = "Unknown JSON query method: %s" % self.json_query_method
            self._log.error(msg)
            raise ValueError(msg)

    @property
    def current_value(self):
        """Last successfully extracted and converted value, or None."""
        return self._current_value

    @property
    def msg(self):
        """The wrapped message, updated in place with the current value.

        When a value has been extracted it is written to value_integer,
        value_decimal or value_string according to value_type before the
        message is returned.
        """
        msg = self._vnfr_mon_param_msg
        value_type = msg.value_type

        if self._current_value is None:
            return msg

        if value_type == "INT":
            msg.value_integer = self._current_value

        elif value_type == "DECIMAL":
            msg.value_decimal = self._current_value

        elif value_type == "STRING":
            msg.value_string = self._current_value

        else:
            self._log.debug("Unknown value_type: %s", value_type)

        return msg

    @property
    def path(self):
        """Value of the message's http_endpoint_ref field."""
        return self.msg.http_endpoint_ref

    @property
    def value_type(self):
        return self.msg.value_type

    @property
    def json_query_method(self):
        return self.msg.json_query_method

    @property
    def json_path(self):
        # The json path lives under json_query_params, the container the
        # rest of this class reads; "json_path_params" is not used anywhere.
        return self.msg.json_query_params.json_path

    @property
    def name(self):
        return self.msg.name

    def extract_value_from_response(self, response_msg):
        """Extract this parameter's value from an endpoint response.

        The response is first tried as XML and, if that parses, re-encoded
        as JSON before being handed to the querier.  On success the
        converted value becomes the current value; on a
        MonitoringParamError the failure is logged and the previous value
        is kept.
        """
        if self._json_querier is None:
            self._log.warning("json querier is not created. Cannot extract value form response.")
            return

        try:
            xml_data = xmltodict.parse(response_msg)
            response_msg = json.dumps(xml_data)
        except Exception:
            # Deliberate best effort: a response that is not XML is passed
            # to the querier unchanged.
            pass

        try:
            value = self._json_querier.query(response_msg)
            converted_value = self._value_converter.convert(value)
        except MonitoringParamError as e:
            self._log.warning("Failed to extract value from json response: %s", str(e))
            return
        else:
            self._current_value = converted_value
470
471
class EndpointMonParamsPoller(object):
    """Periodically poll one endpoint and feed the response to its monitoring params."""

    REQUEST_TIMEOUT_SECS = 10

    def __init__(self, log, loop, endpoint, mon_params, on_update_cb=None):
        """
        Arguments:
            log - logger
            loop - event loop on which the poll task is created
            endpoint - endpoint object providing poll(), url and poll_interval
            mon_params - monitoring params that read from this endpoint
            on_update_cb - optional callable invoked with the list of
                monitoring param messages after each applied response
        """
        self._log = log
        self._loop = loop
        self._endpoint = endpoint
        self._mon_params = mon_params
        self._on_update_cb = on_update_cb

        self._poll_task = None

    @property
    def poll_interval(self):
        """Polling interval of the endpoint."""
        return self._endpoint.poll_interval

    def _get_mon_param_msgs(self):
        """Return the current message of every monitoring param."""
        return [mon_param.msg for mon_param in self._mon_params]

    def _notify_subscriber(self):
        """Hand the current messages to the update callback, if one is set."""
        if self._on_update_cb is None:
            return

        self._on_update_cb(self._get_mon_param_msgs())

    def _apply_response_to_mon_params(self, response_msg):
        """Let each monitoring param extract its value, then notify the subscriber."""
        for mon_param in self._mon_params:
            mon_param.extract_value_from_response(response_msg)

        self._notify_subscriber()

    @asyncio.coroutine
    def _poll_loop(self):
        """Poll the endpoint forever, sleeping poll_interval between rounds.

        The loop ends only on cancellation.  A failed round is skipped and
        polling continues after the usual sleep.
        """
        self._log.debug("Starting http endpoint %s poll loop", self._endpoint.url)
        while True:
            try:
                response = yield from self._endpoint.poll()
                self._apply_response_to_mon_params(response)
            except (asyncio.CancelledError, concurrent.futures.CancelledError):
                # Both names are listed because they are the same class on
                # older Pythons and distinct classes from Python 3.8 on.
                return
            except MonitoringParamError:
                # Skip this round and retry after the sleep below.
                pass
            except Exception as e:
                # Without this an unexpected error (bad response shape,
                # failing update callback, ...) would silently end the
                # task and stop polling for good.
                self._log.error(
                    "Unexpected error while polling http endpoint %s: %s",
                    self._endpoint.url, str(e)
                )

            yield from asyncio.sleep(self.poll_interval, loop=self._loop)

    def start(self):
        """Create the poll task unless one already exists."""
        self._log.debug("Got start request for endpoint poller: %s",
                        self._endpoint.url)
        if self._poll_task is not None:
            return
        self._poll_task = self._loop.create_task(self._poll_loop())

    def stop(self):
        """Cancel the poll task, if any, and forget it."""
        self._log.debug("Got stop request for endpoint poller: %s",
                        self._endpoint.url)
        if self._poll_task is None:
            return

        self._poll_task.cancel()

        self._poll_task = None

    def retrieve(self, xact_info, ks_path, send_handler):
        """Pass the current monitoring param messages to send_handler.

        ks_path is accepted for interface compatibility and not used.
        """
        send_handler(xact_info, self._get_mon_param_msgs())
536
537
class VnfMonitoringParamsController(object):
    """Builds and drives the endpoint pollers for one VNF's monitoring params.

    Endpoints are keyed by their path and monitoring params by their id;
    each param is attached to the endpoint whose path equals the param's
    path, and one poller is created per endpoint that has params.
    """

    def __init__(self, log, loop, vnfr_id, management_ip,
                 http_endpoint_msgs, monitoring_param_msgs,
                 on_update_cb=None, executor=None):
        self._log = log
        self._loop = loop
        self._vnfr_id = vnfr_id
        self._executor = executor
        self._management_ip = management_ip
        self._http_endpoint_msgs = http_endpoint_msgs
        self._monitoring_param_msgs = monitoring_param_msgs

        self._on_update_cb = on_update_cb
        self._endpoints = self._create_endpoints()
        self._mon_params = self._create_mon_params()

        self._endpoint_mon_param_map = self._create_endpoint_mon_param_map(
            self._endpoints, self._mon_params
        )
        self._endpoint_pollers = self._create_endpoint_pollers(self._endpoint_mon_param_map)

    def _create_endpoints(self):
        """Return a dict mapping endpoint path to a new HTTPEndpoint."""
        by_path = {}
        for ep_msg in self._http_endpoint_msgs:
            created = HTTPEndpoint(
                self._log, self._loop, self._management_ip, ep_msg, self._executor
            )
            by_path[created.path] = created

        return by_path

    def _create_mon_params(self):
        """Return a dict mapping monitoring param id to a new MonitoringParam."""
        by_id = {}
        for mp_msg in self._monitoring_param_msgs:
            by_id[mp_msg.id] = MonitoringParam(self._log, mp_msg)

        return by_id

    def _create_endpoint_mon_param_map(self, endpoints, mon_params):
        """Group the monitoring params by the endpoint their path refers to."""
        grouped = collections.defaultdict(list)
        for param in mon_params.values():
            grouped[endpoints[param.path]].append(param)

        return grouped

    def _create_endpoint_pollers(self, ep_mp_map):
        """Create one poller per (endpoint, params) entry of the map."""
        return [
            EndpointMonParamsPoller(
                self._log, self._loop, endpoint, params, self._on_update_cb
            )
            for endpoint, params in ep_mp_map.items()
        ]

    @property
    def msgs(self):
        """Current message of every monitoring param."""
        return [param.msg for param in self.mon_params]

    @property
    def mon_params(self):
        return list(self._mon_params.values())

    @property
    def endpoints(self):
        return list(self._endpoints.values())

    def start(self):
        """ Start monitoring """
        self._log.debug("Starting monitoring of VNF id: %s", self._vnfr_id)
        for each in self._endpoint_pollers:
            each.start()

    def stop(self):
        """ Stop monitoring """
        self._log.debug("Stopping monitoring of VNF id: %s", self._vnfr_id)
        for each in self._endpoint_pollers:
            each.stop()

    def retrieve(self, xact_info, ks_path, send_handler):
        """Ask every poller to send its monitoring params through send_handler."""
        for each in self._endpoint_pollers:
            each.retrieve(xact_info, ks_path, send_handler)
636
class VnfMonitorDtsHandler(mano_dts.DtsHandler):
    """DTS handler exposing the monitoring params of one VNFR."""
    # The monitoring params are a list nested inside a list, so the
    # registration targets the innermost list.
    XPATH = "D,/vnfr:vnfr-catalog/vnfr:vnfr/vnfr:monitoring-param"

    @classmethod
    def from_vnf_data(cls, project, vnfr_msg, vnfd_msg):
        """Build a handler from a project plus VNFR and VNFD messages."""
        return cls(project.log, project.dts, project.loop, project,
                   vnfr_msg.id, vnfr_msg.mgmt_interface.ip_address,
                   vnfd_msg.monitoring_param, vnfd_msg.http_endpoint)

    def __init__(self, log, dts, loop, project, vnfr_id, mgmt_ip, params, endpoints, executor=None):
        super().__init__(log, dts, loop, project)

        self._mgmt_ip = mgmt_ip
        self._vnfr_id = vnfr_id
        self._executor = executor

        # Re-create each given message as its VNFR-catalog counterpart.
        param_cls = VnfrYang.YangData_RwProject_Project_VnfrCatalog_Vnfr_MonitoringParam
        mon_params = [param_cls.from_dict(given.as_dict()) for given in params]

        endpoint_cls = VnfrYang.YangData_RwProject_Project_VnfrCatalog_Vnfr_HttpEndpoint
        http_endpoints = [endpoint_cls.from_dict(given.as_dict()) for given in endpoints]

        self.log.debug("Creating monitoring param controller")
        self.log.debug(" - Endpoints: %s", http_endpoints)
        self.log.debug(" - Monitoring Params: %s", mon_params)

        self._mon_param_controller = VnfMonitoringParamsController(
            self.log,
            self.loop,
            self._vnfr_id,
            self._mgmt_ip,
            http_endpoints,
            mon_params,
            on_update_cb=self.on_update_mon_params,
            executor=self._executor,
        )
        self._nsr_mon = None

    def on_update_mon_params(self, mon_param_msgs):
        """Forward each updated monitoring param to the NSR monitor, if set.

        The per-parameter DTS update (reg.update_element with the ADVISE
        flag) that used to sit here is disabled.
        """
        for param_msg in mon_param_msgs:
            if self._nsr_mon is None:
                continue
            self._nsr_mon.apply_vnfr_mon(param_msg, self._vnfr_id)

    def update_dts_read(self, xact_info, mon_param_msgs):
        """Respond to a DTS read with one MORE response per monitoring param."""
        for param_msg in mon_param_msgs:
            xact_info.respond_xpath(rsp_code=rwdts.XactRspCode.MORE,
                                    xpath=self.xpath(param_msg.id),
                                    msg=param_msg)

    def start(self):
        self._mon_param_controller.start()

    def stop(self):
        self.deregister()
        self._mon_param_controller.stop()

    def xpath(self, param_id=None):
        """ Monitoring params xpath """
        pieces = [
            "D,/vnfr:vnfr-catalog",
            "/vnfr:vnfr[vnfr:id={}]".format(quoted_key(self._vnfr_id)),
            "/vnfr:monitoring-param",
        ]
        if param_id:
            pieces.append("[vnfr:id={}]".format(quoted_key(param_id)))

        return self.project.add_project("".join(pieces))

    @property
    def msg(self):
        """ The messages of all monitoring params """
        return self._mon_param_controller.msgs

    def __del__(self):
        self.stop()

    @asyncio.coroutine
    def register(self):
        """ Register with dts """
        @asyncio.coroutine
        def on_prepare(xact_info, query_action, ks_path, msg):
            # Reads are only served once the registration is ready; the
            # transaction is acknowledged either way.
            if self.reg_ready and query_action == rwdts.QueryAction.READ:
                self._mon_param_controller.retrieve(xact_info, ks_path, self.update_dts_read)

            xact_info.respond_xpath(rwdts.XactRspCode.ACK)

        @asyncio.coroutine
        def on_ready(regh, status):
            self.reg_ready = 1

        handler = rift.tasklets.DTS.RegistrationHandler(on_prepare=on_prepare, on_ready=on_ready)
        self.reg_ready = 0
        self.reg = yield from self.dts.register(xpath=self.xpath(),
                                                flags=rwdts.Flag.PUBLISHER,
                                                handler=handler)

        assert self.reg is not None

    def deregister(self):
        """ de-register with dts """
        if self.reg is not None:
            self.log.debug("Deregistering path %s, regh = %s",
                           VnfMonitorDtsHandler.XPATH,
                           self.reg)
            self.reg.deregister()
            self.reg = None
            self._vnfr = None

    def update_nsr_mon(self, nsr_mon):
        """ update nsr mon """
        self._nsr_mon = nsr_mon
760