4ab351e7157fcc3b79a01130f670b123570fae00
[osm/SO.git] / rwlaunchpad / plugins / rwmonitor / rift / tasklets / rwmonitor / tasklet.py
1
2 #
3 # Copyright 2016 RIFT.IO Inc
4 #
5 # Licensed under the Apache License, Version 2.0 (the "License");
6 # you may not use this file except in compliance with the License.
7 # You may obtain a copy of the License at
8 #
9 # http://www.apache.org/licenses/LICENSE-2.0
10 #
11 # Unless required by applicable law or agreed to in writing, software
12 # distributed under the License is distributed on an "AS IS" BASIS,
13 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 # See the License for the specific language governing permissions and
15 # limitations under the License.
16 #
17
18 """
19 NFVI MONITORING
20 ==================================================
21
22 Data Model
23 --------------------------------------------------
24
25 The monitoring tasklet consists of several types of data that are associated
26 with one another. The highest level data are the cloud accounts. These objects
27 contain authentication information that is used to retrieve metrics as well as
28 the provider (and hence the available data source platforms).
29
30 Each cloud account is associated with an NfviMetricsPlugin. This is a
31 one-to-one relationship. The plugin is the interface to the data source that
32 will actually provide the NFVI metrics.
33
34 Each cloud account is also associated with several VNFRs. Each VNFR, in turn,
35 contains several VDURs. The VDURs represent the level that the NFVI metrics are
36 collected at. However, it is important that the relationships among all these
37 different objects are carefully managed.
38
39
40 CloudAccount -------------- NfviMetricsPlugin
41 / \
42 / \
43 / ... \
44 / \
45 VNFR VNFR
46 /\
47 / \
48 / \
49 / .... \
50 / \
51 VDUR VDUR
52 | |
53 | |
54 Metrics Metrics
55
56
57 Monitoring Tasklet
58 --------------------------------------------------
59
60 The monitoring tasklet (the MonitorTasklet class) is primarily responsible for
61 the communicating between DTS and the application (the Monitor class), which
62 provides the logic for managing and interacting with the data model (see
63 above).
64
65 """
66
67 import asyncio
68 import concurrent.futures
69 import time
70
71 import tornado.web
72 import tornado.httpserver
73
74 import gi
75 gi.require_version('RwDts', '1.0')
76 gi.require_version('RwLog', '1.0')
77 gi.require_version('RwMonitorYang', '1.0')
78 gi.require_version('RwLaunchpadYang', '1.0')
79 gi.require_version('RwNsrYang', '1.0')
80 gi.require_version('RwVnfrYang', '1.0')
81 gi.require_version('RwLaunchpadYang', '1.0')
82 from gi.repository import (
83 RwDts as rwdts,
84 RwLog as rwlog,
85 RwMonitorYang as rwmonitor,
86 RwLaunchpadYang,
87 RwVnfrYang,
88 VnfrYang,
89 )
90
91 import rift.tasklets
92 import rift.mano.cloud
93
94 from . import core
95
96
97 class DtsHandler(object):
98 def __init__(self, tasklet):
99 self.reg = None
100 self.tasklet = tasklet
101
102 @property
103 def log(self):
104 return self.tasklet.log
105
106 @property
107 def log_hdl(self):
108 return self.tasklet.log_hdl
109
110 @property
111 def dts(self):
112 return self.tasklet.dts
113
114 @property
115 def loop(self):
116 return self.tasklet.loop
117
118 @property
119 def classname(self):
120 return self.__class__.__name__
121
122 class VnfrCatalogSubscriber(DtsHandler):
123 XPATH = "D,/vnfr:vnfr-catalog/vnfr:vnfr"
124
125 @asyncio.coroutine
126 def register(self):
127 @asyncio.coroutine
128 def on_prepare(xact_info, action, ks_path, msg):
129 try:
130 if msg is None:
131 return
132
133 if action == rwdts.QueryAction.CREATE:
134 self.tasklet.on_vnfr_create(msg)
135
136 elif action == rwdts.QueryAction.UPDATE:
137 self.tasklet.on_vnfr_update(msg)
138
139 elif action == rwdts.QueryAction.DELETE:
140 self.tasklet.on_vnfr_delete(msg)
141
142 except Exception as e:
143 self.log.exception(e)
144
145 finally:
146 xact_info.respond_xpath(rwdts.XactRspCode.ACK)
147
148 handler = rift.tasklets.DTS.RegistrationHandler(
149 on_prepare=on_prepare,
150 )
151
152 with self.dts.group_create() as group:
153 group.register(
154 xpath=VnfrCatalogSubscriber.XPATH,
155 flags=rwdts.Flag.SUBSCRIBER,
156 handler=handler,
157 )
158
159
160 class NsInstanceConfigSubscriber(DtsHandler):
161 XPATH = "C,/nsr:ns-instance-config"
162
163 @asyncio.coroutine
164 def register(self):
165 def on_apply(dts, acg, xact, action, _):
166 xact_config = list(self.reg.get_xact_elements(xact))
167 for config in xact_config:
168 self.tasklet.on_ns_instance_config_update(config)
169
170 acg_handler = rift.tasklets.AppConfGroup.Handler(
171 on_apply=on_apply,
172 )
173
174 with self.dts.appconf_group_create(acg_handler) as acg:
175 self.reg = acg.register(
176 xpath=NsInstanceConfigSubscriber.XPATH,
177 flags=rwdts.Flag.SUBSCRIBER,
178 )
179
180
181 class CloudAccountDtsHandler(DtsHandler):
182 def __init__(self, tasklet):
183 super().__init__(tasklet)
184 self._cloud_cfg_subscriber = None
185
186 def register(self):
187 self.log.debug("creating cloud account config handler")
188 self._cloud_cfg_subscriber = rift.mano.cloud.CloudAccountConfigSubscriber(
189 self.dts, self.log, self.log_hdl,
190 rift.mano.cloud.CloudAccountConfigCallbacks(
191 on_add_apply=self.tasklet.on_cloud_account_create,
192 on_delete_apply=self.tasklet.on_cloud_account_delete,
193 )
194 )
195 self._cloud_cfg_subscriber.register()
196
197
198 class VdurNfviMetricsPublisher(DtsHandler):
199 """
200 A VdurNfviMetricsPublisher is responsible for publishing the NFVI metrics
201 from a single VDU.
202 """
203
204 XPATH = "D,/vnfr:vnfr-catalog/vnfr:vnfr[vnfr:id='{}']/vnfr:vdur[vnfr:id='{}']/rw-vnfr:nfvi-metrics"
205
206 # This timeout defines the length of time the publisher will wait for a
207 # request to a data source to complete. If the request cannot be completed
208 # before timing out, the current data will be published instead.
209 TIMEOUT = 2.0
210
211 def __init__(self, tasklet, vnfr, vdur):
212 """Create an instance of VdurNvfiPublisher
213
214 Arguments:
215 tasklet - the tasklet
216 vnfr - the VNFR that contains the VDUR
217 vdur - the VDUR of the VDU whose metrics are published
218
219 """
220 super().__init__(tasklet)
221 self._vnfr = vnfr
222 self._vdur = vdur
223
224 self._handle = None
225 self._xpath = VdurNfviMetricsPublisher.XPATH.format(vnfr.id, vdur.id)
226
227 self._deregistered = asyncio.Event(loop=self.loop)
228
229 @property
230 def vnfr(self):
231 """The VNFR associated with this publisher"""
232 return self._vnfr
233
234 @property
235 def vdur(self):
236 """The VDUR associated with this publisher"""
237 return self._vdur
238
239 @property
240 def vim_id(self):
241 """The VIM ID of the VDUR associated with this publisher"""
242 return self._vdur.vim_id
243
244 @property
245 def xpath(self):
246 """The XPATH that the metrics are published on"""
247 return self._xpath
248
249 @asyncio.coroutine
250 def dts_on_prepare(self, xact_info, action, ks_path, msg):
251 """Handles the DTS on_prepare callback"""
252 self.log.debug("{}:dts_on_prepare".format(self.classname))
253
254 if action == rwdts.QueryAction.READ:
255 # If the publisher has been deregistered, the xpath element has
256 # been deleted. So we do not want to publish the metrics and
257 # re-created the element.
258 if not self._deregistered.is_set():
259 metrics = self.tasklet.on_retrieve_nfvi_metrics(self.vdur.id)
260 xact_info.respond_xpath(
261 rwdts.XactRspCode.MORE,
262 self.xpath,
263 metrics,
264 )
265
266 xact_info.respond_xpath(rwdts.XactRspCode.ACK, self.xpath)
267
268 @asyncio.coroutine
269 def register(self):
270 """Register the publisher with DTS"""
271 self._handle = yield from self.dts.register(
272 xpath=self.xpath,
273 handler=rift.tasklets.DTS.RegistrationHandler(
274 on_prepare=self.dts_on_prepare,
275 ),
276 flags=rwdts.Flag.PUBLISHER,
277 )
278
279 def deregister(self):
280 """Deregister the publisher from DTS"""
281 # Mark the publisher for deregistration. This prevents the publisher
282 # from creating an element after it has been deleted.
283 self._deregistered.set()
284
285 # Now that we are done with the registration handle, delete the element
286 # and tell DTS to deregister it
287 self._handle.delete_element(self.xpath)
288 self._handle.deregister()
289 self._handle = None
290
291
292 class LaunchpadConfigDtsSubscriber(DtsHandler):
293 """
294 This class subscribes to the launchpad configuration and alerts the tasklet
295 to any relevant changes.
296 """
297
298 @asyncio.coroutine
299 def register(self):
300 @asyncio.coroutine
301 def apply_config(dts, acg, xact, action, _):
302 if xact.xact is None:
303 # When RIFT first comes up, an INSTALL is called with the current config
304 # Since confd doesn't actally persist data this never has any data so
305 # skip this for now.
306 self.log.debug("No xact handle. Skipping apply config")
307 return
308
309 try:
310 cfg = list(self.reg.get_xact_elements(xact))[0]
311 if cfg.public_ip != self.tasklet.public_ip:
312 yield from self.tasklet.on_public_ip(cfg.public_ip)
313
314 except Exception as e:
315 self.log.exception(e)
316
317 try:
318 acg_handler = rift.tasklets.AppConfGroup.Handler(
319 on_apply=apply_config,
320 )
321
322 with self.dts.appconf_group_create(acg_handler) as acg:
323 self.reg = acg.register(
324 xpath="C,/rw-launchpad:launchpad-config",
325 flags=rwdts.Flag.SUBSCRIBER,
326 )
327
328 except Exception as e:
329 self.log.exception(e)
330
331
332 class CreateAlarmRPC(DtsHandler):
333 """
334 This class is used to listen for RPC calls to /vnfr:create-alarm, and pass
335 them on to the tasklet.
336 """
337
338 def __init__(self, tasklet):
339 super().__init__(tasklet)
340 self._handle = None
341
342 @asyncio.coroutine
343 def register(self):
344 """Register this handler with DTS"""
345 @asyncio.coroutine
346 def on_prepare(xact_info, action, ks_path, msg):
347 try:
348 response = VnfrYang.YangOutput_Vnfr_CreateAlarm()
349 response.alarm_id = yield from self.tasklet.on_create_alarm(
350 msg.cloud_account,
351 msg.vdur_id,
352 msg.alarm,
353 )
354
355 xact_info.respond_xpath(
356 rwdts.XactRspCode.ACK,
357 "O,/vnfr:create-alarm",
358 response,
359 )
360
361 except Exception as e:
362 self.log.exception(e)
363 xact_info.respond_xpath(rwdts.XactRspCode.NACK)
364
365 self._handle = yield from self.dts.register(
366 xpath="I,/vnfr:create-alarm",
367 handler=rift.tasklets.DTS.RegistrationHandler(
368 on_prepare=on_prepare
369 ),
370 flags=rwdts.Flag.PUBLISHER,
371 )
372
373 def deregister(self):
374 """Deregister this handler"""
375 self._handle.deregister()
376 self._handle = None
377
378
379 class DestroyAlarmRPC(DtsHandler):
380 """
381 This class is used to listen for RPC calls to /vnfr:destroy-alarm, and pass
382 them on to the tasklet.
383 """
384
385 def __init__(self, tasklet):
386 super().__init__(tasklet)
387 self._handle = None
388
389 @asyncio.coroutine
390 def register(self):
391 """Register this handler with DTS"""
392 @asyncio.coroutine
393 def on_prepare(xact_info, action, ks_path, msg):
394 try:
395 yield from self.tasklet.on_destroy_alarm(
396 msg.cloud_account,
397 msg.alarm_id,
398 )
399
400 xact_info.respond_xpath(
401 rwdts.XactRspCode.ACK,
402 "O,/vnfr:destroy-alarm"
403 )
404
405 except Exception as e:
406 self.log.exception(e)
407 xact_info.respond_xpath(rwdts.XactRspCode.NACK)
408
409 self._handle = yield from self.dts.register(
410 xpath="I,/vnfr:destroy-alarm",
411 handler=rift.tasklets.DTS.RegistrationHandler(
412 on_prepare=on_prepare
413 ),
414 flags=rwdts.Flag.PUBLISHER,
415 )
416
417 def deregister(self):
418 """Deregister this handler"""
419 self._handle.deregister()
420 self._handle = None
421
422
423 class Delegate(object):
424 """
425 This class is used to delegate calls to collections of listener objects.
426 The listeners are expected to conform to the required function arguments,
427 but this is not enforced by the Delegate class itself.
428 """
429
430 def __init__(self):
431 self._listeners = list()
432
433 def __call__(self, *args, **kwargs):
434 """Delegate the call to the registered listeners"""
435 for listener in self._listeners:
436 listener(*args, **kwargs)
437
438 def register(self, listener):
439 """Register a listener
440
441 Arguments:
442 listener - an object that function calls will be delegated to
443
444 """
445 self._listeners.append(listener)
446
447
448 class WebhookHandler(tornado.web.RequestHandler):
449 @property
450 def log(self):
451 return self.application.tasklet.log
452
453 def options(self, *args, **kargs):
454 pass
455
456 def set_default_headers(self):
457 self.set_header('Access-Control-Allow-Origin', '*')
458 self.set_header('Access-Control-Allow-Headers', 'Content-Type, Cache-Control, Accept, X-Requested-With, Authorization')
459 self.set_header('Access-Control-Allow-Methods', 'POST')
460
461 def post(self, action, vim_id):
462 pass
463
464
465 class WebhookApplication(tornado.web.Application):
466 DEFAULT_WEBHOOK_PORT = 4568
467
468 def __init__(self, tasklet):
469 self.tasklet = tasklet
470
471 super().__init__([
472 (r"/([^/]+)/([^/]+)/?", WebhookHandler),
473 ])
474
475
476 class MonitorTasklet(rift.tasklets.Tasklet):
477 """
478 The MonitorTasklet provides a interface for DTS to interact with an
479 instance of the Monitor class. This allows the Monitor class to remain
480 independent of DTS.
481 """
482
483 DEFAULT_POLLING_PERIOD = 1.0
484
485 def __init__(self, *args, **kwargs):
486 try:
487 super().__init__(*args, **kwargs)
488 self.rwlog.set_category("rw-monitor-log")
489
490 self.vnfr_subscriber = VnfrCatalogSubscriber(self)
491 self.cloud_cfg_subscriber = CloudAccountDtsHandler(self)
492 self.ns_instance_config_subscriber = NsInstanceConfigSubscriber(self)
493 self.launchpad_cfg_subscriber = LaunchpadConfigDtsSubscriber(self)
494
495 self.config = core.InstanceConfiguration()
496 self.config.polling_period = MonitorTasklet.DEFAULT_POLLING_PERIOD
497
498 self.monitor = core.Monitor(self.loop, self.log, self.config)
499 self.vdur_handlers = dict()
500
501 self.webhooks = None
502 self.create_alarm_rpc = CreateAlarmRPC(self)
503 self.destroy_alarm_rpc = DestroyAlarmRPC(self)
504
505
506 except Exception as e:
507 self.log.exception(e)
508
509 @property
510 def polling_period(self):
511 return self.config.polling_period
512
513 @property
514 def public_ip(self):
515 """The public IP of the launchpad"""
516 return self.config.public_ip
517
518 def start(self):
519 super().start()
520 self.log.info("Starting MonitoringTasklet")
521
522 self.log.debug("Registering with dts")
523 self.dts = rift.tasklets.DTS(
524 self.tasklet_info,
525 RwLaunchpadYang.get_schema(),
526 self.loop,
527 self.on_dts_state_change
528 )
529
530 self.log.debug("Created DTS Api GI Object: %s", self.dts)
531
532 def stop(self):
533 try:
534 self.dts.deinit()
535 except Exception as e:
536 self.log.exception(e)
537
538 @asyncio.coroutine
539 def init(self):
540 self.log.debug("creating cloud account handler")
541 self.cloud_cfg_subscriber.register()
542
543 self.log.debug("creating launchpad config subscriber")
544 yield from self.launchpad_cfg_subscriber.register()
545
546 self.log.debug("creating NS instance config subscriber")
547 yield from self.ns_instance_config_subscriber.register()
548
549 self.log.debug("creating vnfr subscriber")
550 yield from self.vnfr_subscriber.register()
551
552 self.log.debug("creating create-alarm rpc handler")
553 yield from self.create_alarm_rpc.register()
554
555 self.log.debug("creating destroy-alarm rpc handler")
556 yield from self.destroy_alarm_rpc.register()
557
558 self.log.debug("creating webhook server")
559 loop = rift.tasklets.tornado.TaskletAsyncIOLoop(asyncio_loop=self.loop)
560 self.webhooks = WebhookApplication(self)
561 self.server = tornado.httpserver.HTTPServer(
562 self.webhooks,
563 io_loop=loop,
564 )
565
566 @asyncio.coroutine
567 def on_public_ip(self, ip):
568 """Store the public IP of the launchpad
569
570 Arguments:
571 ip - a string containing the public IP address of the launchpad
572
573 """
574 self.config.public_ip = ip
575
576 def on_ns_instance_config_update(self, config):
577 """Update configuration information
578
579 Arguments:
580 config - an NsInstanceConfig object
581
582 """
583 if config.nfvi_polling_period is not None:
584 self.config.polling_period = config.nfvi_polling_period
585
586 def on_cloud_account_create(self, account):
587 self.monitor.add_cloud_account(account.cal_account_msg)
588
589 def on_cloud_account_delete(self, account_name):
590 self.monitor.remove_cloud_account(account_name)
591
592 @asyncio.coroutine
593 def run(self):
594 self.webhooks.listen(WebhookApplication.DEFAULT_WEBHOOK_PORT)
595
596 def on_instance_started(self):
597 self.log.debug("Got instance started callback")
598
599 @asyncio.coroutine
600 def on_dts_state_change(self, state):
601 """Handle DTS state change
602
603 Take action according to current DTS state to transition application
604 into the corresponding application state
605
606 Arguments
607 state - current dts state
608
609 """
610 switch = {
611 rwdts.State.INIT: rwdts.State.REGN_COMPLETE,
612 rwdts.State.CONFIG: rwdts.State.RUN,
613 }
614
615 handlers = {
616 rwdts.State.INIT: self.init,
617 rwdts.State.RUN: self.run,
618 }
619
620 # Transition application to next state
621 handler = handlers.get(state, None)
622 if handler is not None:
623 yield from handler()
624
625 # Transition dts to next state
626 next_state = switch.get(state, None)
627 if next_state is not None:
628 self.dts.handle.set_state(next_state)
629
630 def on_vnfr_create(self, vnfr):
631 if not self.monitor.nfvi_metrics_available(vnfr.cloud_account):
632 msg = "NFVI metrics unavailable for {}"
633 self.log.warning(msg.format(vnfr.cloud_account))
634 return
635
636 self.monitor.add_vnfr(vnfr)
637
638 # Create NFVI handlers for VDURs
639 for vdur in vnfr.vdur:
640 if vdur.vim_id is not None:
641 coro = self.register_vdur_nfvi_handler(vnfr, vdur)
642 self.loop.create_task(coro)
643
644 def on_vnfr_update(self, vnfr):
645 if not self.monitor.nfvi_metrics_available(vnfr.cloud_account):
646 msg = "NFVI metrics unavailable for {}"
647 self.log.warning(msg.format(vnfr.cloud_account))
648 return
649
650 self.monitor.update_vnfr(vnfr)
651
652 # TODO handle the removal of vdurs
653 for vdur in vnfr.vdur:
654 if vdur.vim_id is not None:
655 coro = self.register_vdur_nfvi_handler(vnfr, vdur)
656 self.loop.create_task(coro)
657
658 def on_vnfr_delete(self, vnfr):
659 self.monitor.remove_vnfr(vnfr.id)
660
661 # Delete any NFVI handlers associated with the VNFR
662 for vdur in vnfr.vdur:
663 self.deregister_vdur_nfvi_handler(vdur.id)
664
665 def on_retrieve_nfvi_metrics(self, vdur_id):
666 return self.monitor.retrieve_nfvi_metrics(vdur_id)
667
668 @asyncio.coroutine
669 def register_vdur_nfvi_handler(self, vnfr, vdur):
670 if vdur.vim_id is None:
671 return
672
673 if vdur.operational_status != "running":
674 return
675
676 if vdur.id not in self.vdur_handlers:
677 publisher = VdurNfviMetricsPublisher(self, vnfr, vdur)
678 yield from publisher.register()
679 self.vdur_handlers[vdur.id] = publisher
680
681 def deregister_vdur_nfvi_handler(self, vdur_id):
682 if vdur_id in self.vdur_handlers:
683 handler = self.vdur_handlers[vdur_id]
684
685 del self.vdur_handlers[vdur_id]
686 handler.deregister()
687
688 @asyncio.coroutine
689 def on_create_alarm(self, account, vdur_id, alarm):
690 """Creates an alarm and returns an alarm ID
691
692 Arguments:
693 account - a name of the cloud account used to authenticate the
694 creation of an alarm
695 vdur_id - the identifier of VDUR to create the alarm for
696 alarm - a structure defining the alarm that should be created
697
698 Returns:
699 An identifier specific to the created alarm
700
701 """
702 return (yield from self.monitor.create_alarm(account, vdur_id, alarm))
703
704 @asyncio.coroutine
705 def on_destroy_alarm(self, account, alarm_id):
706 """Destroys an alarm with the specified identifier
707
708 Arguments:
709 account - the name of the cloud account used to authenticate the
710 destruction of the alarm
711 alarm_id - the identifier of the alarm to destroy
712
713 """
714 yield from self.monitor.destroy_alarm(account, alarm_id)