Merge "Revert "Functional spec for cloud-init support""
[osm/SO.git] / rwlaunchpad / plugins / rwmonparam / rift / tasklets / rwmonparam / vnfr_core.py
1
2 #
3 # Copyright 2016 RIFT.IO Inc
4 #
5 # Licensed under the Apache License, Version 2.0 (the "License");
6 # you may not use this file except in compliance with the License.
7 # You may obtain a copy of the License at
8 #
9 # http://www.apache.org/licenses/LICENSE-2.0
10 #
11 # Unless required by applicable law or agreed to in writing, software
12 # distributed under the License is distributed on an "AS IS" BASIS,
13 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 # See the License for the specific language governing permissions and
15 # limitations under the License.
16 #
17
18 import asyncio
19 import logging
20 import collections
21 import concurrent
22 import types
23
24 import requests
25 import requests.auth
26 import tornado.escape
27
28 from requests.packages.urllib3.exceptions import InsecureRequestWarning
29
30 import gi
31 gi.require_version('RwDts', '1.0')
32 import rift.tasklets
33 from gi.repository import (
34 RwDts as rwdts,
35 VnfrYang
36 )
37 import rift.mano.dts as mano_dts
38 import rwlogger
39
40
class MonitoringParamError(Exception):
    """Error raised while polling, extracting or converting a monitoring parameter."""
44
45
class JsonPathValueQuerier(object):
    """Extract a single value from a JSON document with a JSONPath expression.

    The expression is parsed once, at construction time, using ``jsonpath_rw``.
    If the import or the parse fails the error is only logged; every later
    call to :meth:`query` then raises :class:`MonitoringParamError`.
    """

    def __init__(self, log, json_path):
        self._log = log
        self._json_path = json_path
        self._json_path_expr = None

        # Imported here, inside the try, so an import failure is logged
        # instead of propagating out of the constructor.
        try:
            import jsonpath_rw
            self._json_path_expr = jsonpath_rw.parse(self._json_path)
        except Exception as e:
            self._log.error("Could not create json_path parser: %s", str(e))

    def query(self, json_msg):
        """Return the first value matched by the JSONPath in ``json_msg``.

        Args:
            json_msg: JSON text to decode and search.

        Raises:
            MonitoringParamError: if the text is not valid JSON, the parser
                was never created, the search fails, or nothing matches.
        """
        try:
            decoded = tornado.escape.json_decode(json_msg)
        except ValueError as e:
            self._log.warning("Failed to convert response into json")
            raise MonitoringParamError(e)

        parser = self._json_path_expr
        if parser is None:
            raise MonitoringParamError(
                "Parser not created. Unable to extract value from %s" % json_msg
            )

        try:
            found = [match.value for match in parser.find(decoded)]
        except Exception as e:
            raise MonitoringParamError(
                "Failed to run find using json_path (%s) against json_msg: %s" %
                (self._json_path, str(e))
            )

        if not found:
            raise MonitoringParamError(
                "No values found from json_path (%s)" % self._json_path
            )

        if len(found) > 1:
            self._log.debug("Got multiple values from json_path (%s). Only returning the first.",
                            self._json_path)

        return found[0]
90
91
class ObjectPathValueQuerier(object):
    """Extract a single value from a JSON document with an ObjectPath query."""

    def __init__(self, log, object_path):
        self._log = log
        self._object_path = object_path
        # Never assigned anything else in this class; kept for compatibility.
        self._object_path_expr = None

    def query(self, object_msg):
        """Run the ObjectPath query against ``object_msg`` and return one value.

        Args:
            object_msg: JSON text to decode and query.

        Returns:
            The query result. A generator result is advanced once; a list or
            tuple result is reduced to its first element.

        Raises:
            MonitoringParamError: if the text is not valid JSON, the tree
                cannot be built, the query fails, or the result is empty.
        """
        try:
            object_dict = tornado.escape.json_decode(object_msg)
        except ValueError as e:
            msg = "Failed to convert response into object"
            self._log.warning(msg)
            raise MonitoringParamError(e)

        import objectpath
        try:
            tree = objectpath.Tree(object_dict)
        except Exception as e:
            # Format the message here: the original built a
            # ("...%s", str(e)) tuple, so the log and the exception both
            # carried a tuple instead of readable text.
            msg = "Could not create objectpath tree: %s" % str(e)
            self._log.error(msg)
            raise MonitoringParamError(msg)

        try:
            value = tree.execute(self._object_path)
        except Exception as e:
            raise MonitoringParamError(
                "Failed to run execute object_path (%s) against object_msg: %s" %
                (self._object_path, str(e))
            )

        if isinstance(value, types.GeneratorType):
            # An exhausted generator raises StopIteration, which is wrapped
            # like any other failure.
            try:
                value = next(value)
            except Exception as e:
                raise MonitoringParamError(
                    "Failed to get value from objectpath %s execute generator: %s" %
                    (self._object_path, str(e))
                )

        if isinstance(value, (list, tuple)):
            if len(value) == 0:
                raise MonitoringParamError(
                    "No values found from object_path (%s)" % self._object_path
                )

            elif len(value) > 1:
                self._log.debug(
                    "Got multiple values from object_path (%s). "
                    "Only returning the first.", self._object_path
                )

            # Only take the first element
            value = value[0]

        return value
147
148
class JsonKeyValueQuerier(object):
    """Look up one top-level key in a JSON response."""

    def __init__(self, log, key):
        self._log = log
        self._key = key

    def query(self, json_msg):
        """Decode ``json_msg`` and return the value stored under the key.

        Raises:
            MonitoringParamError: if the text is not valid JSON or the key
                is not present in the decoded document.
        """
        try:
            decoded = tornado.escape.json_decode(json_msg)
        except ValueError as e:
            self._log.warning("Failed to convert response into json")
            raise MonitoringParamError(e)

        if self._key in decoded:
            return decoded[self._key]

        error_text = "Did not find '{}' key in response: {}".format(
            self._key, decoded
        )
        self._log.warning(error_text)
        raise MonitoringParamError(error_text)
172
173
class ValueConverter(object):
    """Convert an extracted value to the type named by ``value_type``.

    ``value_type`` is one of the strings ``"INT"``, ``"DECIMAL"`` or
    ``"STRING"``; any other value makes :meth:`convert` raise.
    """

    def __init__(self, value_type):
        self._value_type = value_type

    def _convert_int(self, value):
        """Return ``value`` as an int, or raise MonitoringParamError."""
        if isinstance(value, int):
            return value

        try:
            return int(value)
        except (ValueError, TypeError) as e:
            # Interpolate explicitly: exceptions do not apply logging-style
            # "%s" arguments, so the original message was never formatted.
            raise MonitoringParamError(
                "Could not convert value into integer: %s" % str(e)
            )

    def _convert_text(self, value):
        """Return ``value`` as a str, or raise MonitoringParamError."""
        if isinstance(value, str):
            return value

        try:
            return str(value)
        except (ValueError, TypeError) as e:
            raise MonitoringParamError(
                "Could not convert value into string: %s" % str(e)
            )

    def _convert_decimal(self, value):
        """Return ``value`` as a float, or raise MonitoringParamError."""
        if isinstance(value, float):
            return value

        try:
            return float(value)
        except (ValueError, TypeError) as e:
            # The original message said "string" here, a copy/paste slip.
            raise MonitoringParamError(
                "Could not convert value into decimal: %s" % str(e)
            )

    def convert(self, value):
        """Convert ``value`` according to the configured value type.

        Raises:
            MonitoringParamError: if the conversion fails or the configured
                value type is not INT, DECIMAL or STRING.
        """
        if self._value_type == "INT":
            return self._convert_int(value)
        elif self._value_type == "DECIMAL":
            return self._convert_decimal(value)
        elif self._value_type == "STRING":
            return self._convert_text(value)
        else:
            raise MonitoringParamError(
                "Unknown value type: %s" % (self._value_type,)
            )
220
221
class HTTPBasicAuth(object):
    """Simple holder for a username/password pair."""

    def __init__(self, username, password):
        self.username, self.password = username, password
226
227
class HTTPEndpoint(object):
    """HTTP(S) endpoint of a VNF that can be polled for its response text.

    All connection settings (port, path, method, credentials, headers,
    polling interval) are read from the endpoint message ``ep_msg``; the
    host is the ``ip_address`` given at construction.
    """

    def __init__(self, log, loop, ip_address, ep_msg):
        self._log = log
        self._loop = loop
        self._ip_address = ip_address
        self._ep_msg = ep_msg

        # Certificate verification is not supported yet, so silence the
        # HTTPS warning that unverified requests would otherwise trigger.
        requests.packages.urllib3.disable_warnings(InsecureRequestWarning)
        self._session = requests.Session()

        # Both are built lazily by their properties and then cached.
        self._auth = None
        self._headers = None

    @property
    def poll_interval(self):
        """Polling interval in seconds, taken from the endpoint message."""
        return self._ep_msg.polling_interval_secs

    @property
    def ip_address(self):
        """Address the endpoint is reached on."""
        return self._ip_address

    @property
    def port(self):
        """Port taken from the endpoint message."""
        return self._ep_msg.port

    @property
    def protocol(self):
        """``"https"`` only when the message sets https to True, else ``"http"``."""
        uses_https = self._ep_msg.has_field("https") and self._ep_msg.https is True
        return "https" if uses_https else "http"

    @property
    def path(self):
        """URL path taken from the endpoint message."""
        return self._ep_msg.path

    @property
    def method(self):
        """HTTP method from the message, defaulting to ``"GET"``."""
        return self._ep_msg.method if self._ep_msg.has_field("method") else "GET"

    @property
    def username(self):
        """Username from the message, or None when not set."""
        return self._ep_msg.username if self._ep_msg.has_field("username") else None

    @property
    def headers(self):
        """Dict of request headers; entries missing key or value are skipped."""
        if self._headers is None:
            self._headers = {
                entry.key: entry.value
                for entry in self._ep_msg.headers
                if entry.has_field("key") and entry.has_field("value")
            }

        return self._headers

    @property
    def password(self):
        """Password from the message, or None when not set."""
        return self._ep_msg.password if self._ep_msg.has_field("password") else None

    @property
    def auth(self):
        """Basic-auth object, or None unless both username and password are set."""
        if (self._auth is None
                and self.username is not None
                and self.password is not None):
            self._auth = requests.auth.HTTPBasicAuth(
                self.username,
                self.password,
            )

        return self._auth

    @property
    def url(self):
        """Full request URL; leading slashes of the path are stripped."""
        return "{protocol}://{ip_address}:{port}/{path}".format(
            protocol=self.protocol,
            ip_address=self.ip_address,
            port=self.port,
            path=self.path.lstrip("/"),
        )

    def _poll(self):
        """Issue one blocking request and return the response body text.

        Raises:
            MonitoringParamError: on any ``requests`` error, including a
                non-success status reported by ``raise_for_status``.
        """
        try:
            response = self._session.request(
                self.method, self.url, timeout=10, auth=self.auth,
                headers=self.headers, verify=False
            )
            response.raise_for_status()
        except requests.exceptions.RequestException as e:
            error_text = "Got HTTP error when request monitoring method {} from url {}: {}".format(
                self.method,
                self.url,
                str(e),
            )
            self._log.warning(error_text)
            raise MonitoringParamError(error_text)

        return response.text

    @asyncio.coroutine
    def poll(self):
        """Coroutine: run :meth:`_poll` in a worker thread and return its text.

        Raises:
            MonitoringParamError: re-raised with extra context when the
                blocking poll fails.
        """
        try:
            # A one-thread executor created per call; the blocking request
            # runs there so the event loop is not blocked.
            with concurrent.futures.ThreadPoolExecutor(1) as pool:
                body = yield from self._loop.run_in_executor(
                    pool,
                    self._poll,
                )

        except MonitoringParamError as e:
            error_text = "Caught exception when polling http endpoint: %s" % str(e)
            self._log.warning(error_text)
            raise MonitoringParamError(error_text)

        self._log.debug("Got response from http endpoint (%s): %s",
                        self.url, body)

        return body
356
357
class MonitoringParam(object):
    """Hold one VNFR monitoring-parameter message and its latest value.

    Builds the querier selected by the message's ``json_query_method`` and a
    :class:`ValueConverter` for its ``value_type``, then uses both to pull a
    value out of each endpoint response.
    """

    def __init__(self, log, vnfr_mon_param_msg):
        self._log = log
        self._vnfr_mon_param_msg = vnfr_mon_param_msg

        # Set before anything reads self.msg, which checks this attribute.
        self._current_value = None

        self._json_querier = self._create_json_querier()
        self._value_converter = ValueConverter(self.value_type)

    def _create_json_querier(self):
        """Return the querier matching the message's ``json_query_method``.

        Raises:
            ValueError: if the method is unknown, or the query parameter the
                method needs is not set on the message.
        """
        method = self.msg.json_query_method

        if method == "NAMEKEY":
            return JsonKeyValueQuerier(self._log, self.msg.name)

        if method == "JSONPATH":
            if not self.msg.json_query_params.has_field("json_path"):
                # Format before raising: the original raised the template
                # with its "%s" placeholder still in it.
                msg = (
                    "JSONPATH query_method requires json_query_params.json_path "
                    "to be filled in %s" % (self.msg,)
                )
                self._log.error(msg)
                raise ValueError(msg)
            return JsonPathValueQuerier(self._log, self.msg.json_query_params.json_path)

        if method == "OBJECTPATH":
            if not self.msg.json_query_params.has_field("object_path"):
                msg = (
                    "OBJECTPATH query_method requires json_query_params.object_path "
                    "to be filled in %s" % (self.msg,)
                )
                self._log.error(msg)
                raise ValueError(msg)
            return ObjectPathValueQuerier(self._log, self.msg.json_query_params.object_path)

        msg = "Unknown JSON query method: %s" % self.json_query_method
        self._log.error(msg)
        raise ValueError(msg)

    @property
    def current_value(self):
        """Most recently extracted value, or None if none was extracted yet."""
        return self._current_value

    @property
    def msg(self):
        """The wrapped message, updated in place with the current value.

        The current value is written to ``value_integer``, ``value_decimal``
        or ``value_string`` according to the message's ``value_type``. When
        no value has been extracted the message is returned untouched.
        """
        msg = self._vnfr_mon_param_msg
        value_type = msg.value_type

        if self._current_value is None:
            return msg

        if value_type == "INT":
            msg.value_integer = self._current_value

        elif value_type == "DECIMAL":
            msg.value_decimal = self._current_value

        elif value_type == "STRING":
            msg.value_string = self._current_value

        else:
            self._log.debug("Unknown value_type: %s", value_type)

        return msg

    @property
    def path(self):
        """The message's ``http_endpoint_ref``."""
        return self.msg.http_endpoint_ref

    @property
    def value_type(self):
        """The message's ``value_type``."""
        return self.msg.value_type

    @property
    def json_query_method(self):
        """The message's ``json_query_method``."""
        return self.msg.json_query_method

    @property
    def json_path(self):
        """The JSONPath expression stored under ``json_query_params``."""
        # The original read msg.json_path_params, a name used nowhere else
        # in this class; _create_json_querier reads json_query_params.
        return self.msg.json_query_params.json_path

    @property
    def name(self):
        """The message's ``name``."""
        return self.msg.name

    def extract_value_from_response(self, response_msg):
        """Query and convert a value from ``response_msg`` and store it.

        Failures are logged as warnings and leave the current value as is;
        nothing is raised to the caller for a MonitoringParamError.
        """
        if self._json_querier is None:
            self._log.warning("json querier is not created. Cannot extract value form response.")
            return

        try:
            value = self._json_querier.query(response_msg)
            converted_value = self._value_converter.convert(value)
        except MonitoringParamError as e:
            self._log.warning("Failed to extract value from json response: %s", str(e))
            return
        else:
            self._current_value = converted_value
447
448
class EndpointMonParamsPoller(object):
    """Periodically poll one endpoint and feed the response to its params.

    Each response is passed to every monitoring parameter attached to the
    endpoint, then the optional ``on_update_cb`` is called with the list of
    parameter messages.
    """

    REQUEST_TIMEOUT_SECS = 10

    def __init__(self, log, loop, endpoint, mon_params, on_update_cb=None):
        self._log = log
        self._loop = loop
        self._endpoint = endpoint
        self._mon_params = mon_params
        self._on_update_cb = on_update_cb

        # Task running _poll_loop; None while the poller is stopped.
        self._poll_task = None

    @property
    def poll_interval(self):
        """Polling interval of the underlying endpoint."""
        return self._endpoint.poll_interval

    def _get_mon_param_msgs(self):
        """Return the message of every monitoring parameter."""
        return [mon_param.msg for mon_param in self._mon_params]

    def _notify_subscriber(self):
        """Call the update callback, if one was supplied."""
        if self._on_update_cb is None:
            return

        self._on_update_cb(self._get_mon_param_msgs())

    def _apply_response_to_mon_params(self, response_msg):
        """Let every parameter extract its value, then notify the subscriber."""
        for mon_param in self._mon_params:
            mon_param.extract_value_from_response(response_msg)

        self._notify_subscriber()

    @asyncio.coroutine
    def _poll_loop(self):
        """Coroutine: poll the endpoint forever, sleeping between polls.

        Returns when the poll is cancelled. A failed poll is logged and
        retried after the normal interval.
        """
        self._log.debug("Starting http endpoint %s poll loop", self._endpoint.url)
        while True:
            try:
                response = yield from self._endpoint.poll()
                self._apply_response_to_mon_params(response)
            except concurrent.futures.CancelledError:
                return
            except MonitoringParamError as e:
                # The endpoint raises this on any HTTP failure. Letting it
                # escape would end the task and stop polling for good, so
                # log it and try again after the usual sleep.
                self._log.warning(
                    "Polling http endpoint %s failed, will retry: %s",
                    self._endpoint.url, str(e)
                )

            yield from asyncio.sleep(self.poll_interval, loop=self._loop)

    def start(self):
        """Create the poll task; does nothing if one already exists."""
        self._log.debug("Got start request for endpoint poller: %s",
                        self._endpoint.url)
        if self._poll_task is not None:
            return
        self._poll_task = self._loop.create_task(self._poll_loop())

    def stop(self):
        """Cancel the poll task; does nothing if the poller is not running."""
        self._log.debug("Got stop request for endpoint poller: %s",
                        self._endpoint.url)
        if self._poll_task is None:
            return

        self._poll_task.cancel()

        self._poll_task = None
508
509
class VnfMonitoringParamsController(object):
    """Own the endpoints, monitoring parameters and pollers of one VNF.

    Endpoints are keyed by their path and monitoring parameters by their
    message id. Every parameter is attached to the endpoint whose path
    equals the parameter's ``path``, and one poller is built per endpoint
    that has at least one parameter.
    """

    def __init__(self, log, loop, vnfr_id, management_ip,
                 http_endpoint_msgs, monitoring_param_msgs,
                 on_update_cb=None):
        self._log = log
        self._loop = loop
        self._vnfr_id = vnfr_id
        self._management_ip = management_ip
        self._http_endpoint_msgs = http_endpoint_msgs
        self._monitoring_param_msgs = monitoring_param_msgs

        self._on_update_cb = on_update_cb
        self._endpoints = self._create_endpoints()
        self._mon_params = self._create_mon_params()

        self._endpoint_mon_param_map = self._create_endpoint_mon_param_map(
            self._endpoints, self._mon_params
        )
        self._endpoint_pollers = self._create_endpoint_pollers(self._endpoint_mon_param_map)

    def _create_endpoints(self):
        """Build an HTTPEndpoint per endpoint message, keyed by its path."""
        built = (
            HTTPEndpoint(self._log, self._loop, self._management_ip, ep_msg)
            for ep_msg in self._http_endpoint_msgs
        )
        # A later endpoint with the same path replaces an earlier one.
        return {endpoint.path: endpoint for endpoint in built}

    def _create_mon_params(self):
        """Build a MonitoringParam per message, keyed by the message id."""
        return {
            mp_msg.id: MonitoringParam(self._log, mp_msg)
            for mp_msg in self._monitoring_param_msgs
        }

    def _create_endpoint_mon_param_map(self, endpoints, mon_params):
        """Group the monitoring parameters by the endpoint they reference.

        A parameter whose path matches no endpoint raises KeyError.
        """
        grouped = collections.defaultdict(list)
        for param in mon_params.values():
            grouped[endpoints[param.path]].append(param)

        return grouped

    def _create_endpoint_pollers(self, ep_mp_map):
        """Build one poller for every endpoint in ``ep_mp_map``."""
        return [
            EndpointMonParamsPoller(
                self._log,
                self._loop,
                endpoint,
                params,
                self._on_update_cb
            )
            for endpoint, params in ep_mp_map.items()
        ]

    @property
    def msgs(self):
        """List with the message of every monitoring parameter."""
        return [param.msg for param in self.mon_params]

    @property
    def mon_params(self):
        """List of all monitoring parameters."""
        return list(self._mon_params.values())

    @property
    def endpoints(self):
        """List of all endpoints."""
        return list(self._endpoints.values())

    def start(self):
        """ Start monitoring """
        self._log.debug("Starting monitoring of VNF id: %s", self._vnfr_id)
        for poller in self._endpoint_pollers:
            poller.start()

    def stop(self):
        """ Stop monitoring """
        self._log.debug("Stopping monitoring of VNF id: %s", self._vnfr_id)
        for poller in self._endpoint_pollers:
            poller.stop()
604
605
class VnfMonitorDtsHandler(mano_dts.DtsHandler):
    """ VNF monitoring class """
    # List of list: So we need to register for the list in the deepest level
    XPATH = "D,/vnfr:vnfr-catalog/vnfr:vnfr/vnfr:monitoring-param"

    @classmethod
    def from_vnf_data(cls, tasklet, vnfr_msg, vnfd_msg):
        """Build a handler from a tasklet and a VNFR/VNFD message pair."""
        return cls(
            tasklet.log,
            tasklet.dts,
            tasklet.loop,
            vnfr_msg.id,
            vnfr_msg.mgmt_interface.ip_address,
            vnfd_msg.monitoring_param,
            vnfd_msg.http_endpoint,
        )

    def __init__(self, log, dts, loop, vnfr_id, mgmt_ip, params, endpoints):
        """Convert the given params/endpoints to VNFR messages and build the controller.

        ``self.log`` and ``self.loop`` used below are presumably provided by
        the DtsHandler base class (not visible here).
        """
        super().__init__(log, dts, loop)

        self._mgmt_ip = mgmt_ip
        self._vnfr_id = vnfr_id

        # Rebuild each input message as the corresponding VNFR message type.
        param_cls = VnfrYang.YangData_Vnfr_VnfrCatalog_Vnfr_MonitoringParam
        mon_params = [param_cls.from_dict(item.as_dict()) for item in params]

        endpoint_cls = VnfrYang.YangData_Vnfr_VnfrCatalog_Vnfr_HttpEndpoint
        http_endpoints = [endpoint_cls.from_dict(item.as_dict()) for item in endpoints]

        self.log.debug("Creating monitoring param controller")
        self.log.debug(" - Endpoints: %s", http_endpoints)
        self.log.debug(" - Monitoring Params: %s", mon_params)

        self._mon_param_controller = VnfMonitoringParamsController(
            self.log,
            self.loop,
            self._vnfr_id,
            self._mgmt_ip,
            http_endpoints,
            mon_params,
            self.on_update_mon_params
        )

    def on_update_mon_params(self, mon_param_msgs):
        """Publish every updated monitoring-param message through the registration."""
        for updated in mon_param_msgs:
            self.reg.update_element(
                self.xpath(updated.id),
                updated,
                rwdts.XactFlag.ADVISE
            )

    def start(self):
        """Start the monitoring param controller."""
        self._mon_param_controller.start()

    def stop(self):
        """Deregister from DTS, then stop the monitoring param controller."""
        self.deregister()
        self._mon_param_controller.stop()

    def xpath(self, param_id=None):
        """ Monitoring params xpath """
        path = ("D,/vnfr:vnfr-catalog" +
                "/vnfr:vnfr[vnfr:id='{}']".format(self._vnfr_id) +
                "/vnfr:monitoring-param")
        if param_id:
            # Narrow the path to a single monitoring param.
            path += "[vnfr:id='{}']".format(param_id)
        return path

    @property
    def msg(self):
        """ The message with the monitoring params """
        return self._mon_param_controller.msgs

    def __del__(self):
        self.stop()

    @asyncio.coroutine
    def register(self):
        """ Register with dts """

        publisher_flags = rwdts.Flag.PUBLISHER|rwdts.Flag.CACHE|rwdts.Flag.NO_PREP_READ
        self.reg = yield from self.dts.register(xpath=self.xpath(),
                                                flags=publisher_flags)

        assert self.reg is not None

    def deregister(self):
        """ de-register with dts """
        if self.reg is not None:
            self.log.debug("Deregistering path %s, regh = %s",
                           VnfMonitorDtsHandler.XPATH,
                           self.reg)
            self.reg.deregister()
            self.reg = None
            self._vnfr = None