Merge from OSM SO master
[osm/SO.git] / rwlaunchpad / plugins / rwmonitor / rift / tasklets / rwmonitor / tasklet.py
1
2 #
3 # Copyright 2016 RIFT.IO Inc
4 #
5 # Licensed under the Apache License, Version 2.0 (the "License");
6 # you may not use this file except in compliance with the License.
7 # You may obtain a copy of the License at
8 #
9 # http://www.apache.org/licenses/LICENSE-2.0
10 #
11 # Unless required by applicable law or agreed to in writing, software
12 # distributed under the License is distributed on an "AS IS" BASIS,
13 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 # See the License for the specific language governing permissions and
15 # limitations under the License.
16 #
17
18 """
19 NFVI MONITORING
20 ==================================================
21
22 Data Model
23 --------------------------------------------------
24
25 The monitoring tasklet consists of several types of data that are associated
26 with one another. The highest level data are the cloud accounts. These objects
27 contain authentication information that is used to retrieve metrics as well as
28 the provider (and hence the available data source platforms).
29
30 Each cloud account is associated with an NfviMetricsPlugin. This is a
31 one-to-one relationship. The plugin is the interface to the data source that
32 will actually provide the NFVI metrics.
33
34 Each cloud account is also associated with several VNFRs. Each VNFR, in turn,
35 contains several VDURs. The VDURs represent the level that the NFVI metrics are
36 collected at. However, it is important that the relationships among all these
37 different objects are carefully managed.
38
39
    CloudAccount -------------- NfviMetricsPlugin
         / \
        /   \
       / ... \
      /       \
    VNFR     VNFR
              /\
             /  \
            /    \
           / .... \
          /        \
        VDUR      VDUR
         |          |
         |          |
      Metrics    Metrics
55
56
57 Monitoring Tasklet
58 --------------------------------------------------
59
The monitoring tasklet (the MonitorTasklet class), together with its per-project
MonitorProject instances, is primarily responsible for communicating between
DTS and the application (the Monitor class), which provides the logic for
managing and interacting with the data model (see above).
64
65 """
66
67 import asyncio
68 import concurrent.futures
69 import time
70
71 import tornado.web
72 import tornado.httpserver
73
74 import gi
75 gi.require_version('RwDts', '1.0')
76 gi.require_version('RwLog', '1.0')
77 gi.require_version('RwMonitorYang', '1.0')
78 gi.require_version('RwLaunchpadYang', '1.0')
79 gi.require_version('RwNsrYang', '1.0')
80 gi.require_version('RwVnfrYang', '1.0')
81 gi.require_version('RwLaunchpadYang', '1.0')
82 from gi.repository import (
83 RwDts as rwdts,
84 RwLog as rwlog,
85 RwMonitorYang as rwmonitor,
86 RwLaunchpadYang,
87 RwVnfrYang,
88 VnfrYang,
89 )
90
91 import rift.tasklets
92 import rift.mano.cloud
93 from rift.mano.utils.project import (
94 ManoProject,
95 ProjectHandler,
96 )
97
98 from . import core
99
100
class DtsHandler(object):
    """Base class for DTS handlers that belong to a single project.

    The handler keeps a reference to its owning project and exposes that
    project's logger, log handle, DTS API object and event loop through
    read-only properties, so subclasses never touch the project's private
    attributes directly.
    """

    def __init__(self, project):
        # Owning project; every property below reads from it.
        self.project = project
        # Registration handle; stays None unless a subclass stores one.
        self.reg = None

    @property
    def log(self):
        """The owning project's logger"""
        return self.project._log

    @property
    def log_hdl(self):
        """The owning project's log handle"""
        return self.project._log_hdl

    @property
    def dts(self):
        """The owning project's DTS API object"""
        return self.project._dts

    @property
    def loop(self):
        """The owning project's event loop"""
        return self.project._loop

    @property
    def classname(self):
        """Name of the concrete handler class (used in log messages)"""
        return type(self).__name__
125
class VnfrCatalogSubscriber(DtsHandler):
    """Subscribes to the project's VNFR catalog.

    CREATE, UPDATE and DELETE queries on a VNFR are forwarded to the owning
    project's on_vnfr_create / on_vnfr_update / on_vnfr_delete callbacks.
    """

    XPATH = "D,/vnfr:vnfr-catalog/vnfr:vnfr"

    @asyncio.coroutine
    def register(self):
        """Register the VNFR catalog subscriber with DTS"""

        @asyncio.coroutine
        def on_prepare(xact_info, action, ks_path, msg):
            try:
                if msg is None:
                    return

                # The VNFR callbacks are defined on the project (DtsHandler
                # has no 'tasklet' attribute).
                if action == rwdts.QueryAction.CREATE:
                    self.project.on_vnfr_create(msg)

                elif action == rwdts.QueryAction.UPDATE:
                    self.project.on_vnfr_update(msg)

                elif action == rwdts.QueryAction.DELETE:
                    self.project.on_vnfr_delete(msg)

            except Exception as e:
                self.log.exception(e)

            finally:
                # Every prepare is ACKed, including empty messages and
                # callbacks that raised.
                xact_info.respond_xpath(rwdts.XactRspCode.ACK)

        handler = rift.tasklets.DTS.RegistrationHandler(
            on_prepare=on_prepare,
        )

        with self.dts.group_create() as group:
            # Keep the registration like the other subscribers do.
            self.reg = group.register(
                xpath=self.project.add_project(VnfrCatalogSubscriber.XPATH),
                flags=rwdts.Flag.SUBSCRIBER,
                handler=handler,
            )
162
163
class NsInstanceConfigSubscriber(DtsHandler):
    """Subscribes to NS instance configuration for the project.

    Each element of an applied transaction is forwarded to the owning
    project's on_ns_instance_config_update callback.
    """

    XPATH = "C,/nsr:ns-instance-config"

    @asyncio.coroutine
    def register(self):
        """Register the NS instance config subscriber with DTS"""

        def on_apply(dts, acg, xact, action, _):
            xact_config = list(self.reg.get_xact_elements(xact))
            for config in xact_config:
                # The callback is defined on the project (DtsHandler has no
                # 'tasklet' attribute).
                self.project.on_ns_instance_config_update(config)

        acg_handler = rift.tasklets.AppConfGroup.Handler(
            on_apply=on_apply,
        )

        with self.dts.appconf_group_create(acg_handler) as acg:
            self.reg = acg.register(
                xpath=self.project.add_project(NsInstanceConfigSubscriber.XPATH),
                flags=rwdts.Flag.SUBSCRIBER,
            )
183
184
class CloudAccountDtsHandler(DtsHandler):
    """Subscribes to cloud account configuration for the project."""

    def __init__(self, project):
        super().__init__(project)
        self._cloud_cfg_subscriber = None

    def register(self):
        """Create and register the cloud account config subscriber.

        Added accounts are forwarded to the project's on_cloud_account_create
        callback and deleted accounts to on_cloud_account_delete.
        """
        self.log.debug("creating cloud account config handler")
        # The callbacks are defined on the project (DtsHandler has no
        # 'tasklet' attribute).
        callbacks = rift.mano.cloud.CloudAccountConfigCallbacks(
            on_add_apply=self.project.on_cloud_account_create,
            on_delete_apply=self.project.on_cloud_account_delete,
        )
        self._cloud_cfg_subscriber = rift.mano.cloud.CloudAccountConfigSubscriber(
            self.dts, self.log, self.log_hdl, self.project, callbacks,
        )
        self._cloud_cfg_subscriber.register()
200
201
class VdurNfviMetricsPublisher(DtsHandler):
    """
    A VdurNfviMetricsPublisher is responsible for publishing the NFVI metrics
    from a single VDU.
    """

    XPATH = "D,/vnfr:vnfr-catalog/vnfr:vnfr[vnfr:id='{}']/vnfr:vdur[vnfr:id='{}']/rw-vnfr:nfvi-metrics"

    # This timeout defines the length of time the publisher will wait for a
    # request to a data source to complete. If the request cannot be completed
    # before timing out, the current data will be published instead.
    # NOTE(review): not referenced anywhere in this class.
    TIMEOUT = 2.0

    def __init__(self, project, vnfr, vdur):
        """Create an instance of VdurNfviMetricsPublisher

        Arguments:
            project - the project that owns this publisher; it supplies the
                      logger, DTS object and loop, and the
                      on_retrieve_nfvi_metrics callback
            vnfr    - the VNFR that contains the VDUR
            vdur    - the VDUR of the VDU whose metrics are published

        """
        super().__init__(project)
        self._vnfr = vnfr
        self._vdur = vdur

        self._handle = None
        self._xpath = project.add_project(VdurNfviMetricsPublisher.XPATH.format(vnfr.id, vdur.id))

        self._deregistered = asyncio.Event(loop=self.loop)

    @property
    def vnfr(self):
        """The VNFR associated with this publisher"""
        return self._vnfr

    @property
    def vdur(self):
        """The VDUR associated with this publisher"""
        return self._vdur

    @property
    def vim_id(self):
        """The VIM ID of the VDUR associated with this publisher"""
        return self._vdur.vim_id

    @property
    def xpath(self):
        """The XPATH that the metrics are published on"""
        return self._xpath

    @asyncio.coroutine
    def dts_on_prepare(self, xact_info, action, ks_path, msg):
        """Handles the DTS on_prepare callback

        On a READ, the VDU's current metrics are returned (unless the
        publisher has been deregistered). Every query is then ACKed on the
        publisher's xpath.
        """
        self.log.debug("{}:dts_on_prepare".format(self.classname))

        if action == rwdts.QueryAction.READ:
            # If the publisher has been deregistered, the xpath element has
            # been deleted. So we do not want to publish the metrics and
            # re-create the element.
            if not self._deregistered.is_set():
                # The metrics callback is defined on the project (DtsHandler
                # has no 'tasklet' attribute).
                metrics = self.project.on_retrieve_nfvi_metrics(self.vdur.id)
                xact_info.respond_xpath(
                    rwdts.XactRspCode.MORE,
                    self.xpath,
                    metrics,
                )

        xact_info.respond_xpath(rwdts.XactRspCode.ACK, self.xpath)

    @asyncio.coroutine
    def register(self):
        """Register the publisher with DTS"""
        self._handle = yield from self.dts.register(
            xpath=self.xpath,
            handler=rift.tasklets.DTS.RegistrationHandler(
                on_prepare=self.dts_on_prepare,
            ),
            flags=rwdts.Flag.PUBLISHER,
        )

    def deregister(self):
        """Deregister the publisher from DTS

        Safe to call more than once, or before register(); calls without
        an active registration handle only mark the publisher deregistered.
        """
        # Mark the publisher for deregistration. This prevents the publisher
        # from creating an element after it has been deleted.
        self._deregistered.set()

        if self._handle is None:
            return

        # Now that we are done with the registration handle, delete the element
        # and tell DTS to deregister it
        handle, self._handle = self._handle, None
        handle.delete_element(self.xpath)
        handle.deregister()
294
295
class LaunchpadConfigDtsSubscriber(DtsHandler):
    """
    This class subscribes to the launchpad configuration and records relevant
    changes (currently the public IP) in the owning project's configuration.
    """

    @asyncio.coroutine
    def register(self):
        """Register the launchpad config subscriber with DTS"""

        @asyncio.coroutine
        def apply_config(dts, acg, xact, action, _):
            if xact.xact is None:
                # When RIFT first comes up, an INSTALL is called with the current config
                # Since confd doesn't actually persist data this never has any data so
                # skip this for now.
                self.log.debug("No xact handle. Skipping apply config")
                return

            try:
                elements = list(self.reg.get_xact_elements(xact))
                if not elements:
                    self.log.debug("No launchpad config in xact. Skipping apply config")
                    return

                cfg = elements[0]
                # The public IP lives in the project's instance configuration
                # (it is read back through the project's public_ip property).
                # DtsHandler has no 'tasklet' attribute to delegate to.
                if cfg.public_ip != self.project.public_ip:
                    self.project.config.public_ip = cfg.public_ip

            except Exception as e:
                self.log.exception(e)

        try:
            acg_handler = rift.tasklets.AppConfGroup.Handler(
                on_apply=apply_config,
            )

            with self.dts.appconf_group_create(acg_handler) as acg:
                self.reg = acg.register(
                    xpath=self.project.add_project("C,/rw-launchpad:launchpad-config"),
                    flags=rwdts.Flag.SUBSCRIBER,
                )

        except Exception as e:
            self.log.exception(e)
334
335
class CreateAlarmRPC(DtsHandler):
    """
    This class is used to listen for RPC calls to /vnfr:create-alarm, and pass
    them on to the owning project.
    """

    def __init__(self, project):
        super().__init__(project)
        self._handle = None

    @asyncio.coroutine
    def register(self):
        """Register this handler with DTS"""
        @asyncio.coroutine
        def on_prepare(xact_info, action, ks_path, msg):
            try:
                # rpc_check receives xact_info; presumably it responds itself
                # when the RPC is not for this project - TODO confirm.
                if not self.project.rpc_check(msg, xact_info=xact_info):
                    return

                response = VnfrYang.YangOutput_Vnfr_CreateAlarm()
                # The alarm callback is defined on the project (DtsHandler
                # has no 'tasklet' attribute).
                response.alarm_id = yield from self.project.on_create_alarm(
                    msg.cloud_account,
                    msg.vdur_id,
                    msg.alarm,
                )

                xact_info.respond_xpath(
                    rwdts.XactRspCode.ACK,
                    "O,/vnfr:create-alarm",
                    response,
                )

            except Exception as e:
                self.log.exception(e)
                xact_info.respond_xpath(rwdts.XactRspCode.NACK)

        self._handle = yield from self.dts.register(
            xpath="I,/vnfr:create-alarm",
            handler=rift.tasklets.DTS.RegistrationHandler(
                on_prepare=on_prepare
            ),
            flags=rwdts.Flag.PUBLISHER,
        )

    def deregister(self):
        """Deregister this handler (no-op when it is not registered)"""
        if self._handle is None:
            return

        self._handle.deregister()
        self._handle = None
385
386
class DestroyAlarmRPC(DtsHandler):
    """
    This class is used to listen for RPC calls to /vnfr:destroy-alarm, and pass
    them on to the owning project.
    """

    def __init__(self, project):
        super().__init__(project)
        self._handle = None

    @asyncio.coroutine
    def register(self):
        """Register this handler with DTS"""
        @asyncio.coroutine
        def on_prepare(xact_info, action, ks_path, msg):
            try:
                # rpc_check receives xact_info; presumably it responds itself
                # when the RPC is not for this project - TODO confirm.
                if not self.project.rpc_check(msg, xact_info=xact_info):
                    return

                # The alarm callback is defined on the project (DtsHandler
                # has no 'tasklet' attribute).
                yield from self.project.on_destroy_alarm(
                    msg.cloud_account,
                    msg.alarm_id,
                )

                xact_info.respond_xpath(
                    rwdts.XactRspCode.ACK,
                    "O,/vnfr:destroy-alarm"
                )

            except Exception as e:
                self.log.exception(e)
                xact_info.respond_xpath(rwdts.XactRspCode.NACK)

        self._handle = yield from self.dts.register(
            xpath="I,/vnfr:destroy-alarm",
            handler=rift.tasklets.DTS.RegistrationHandler(
                on_prepare=on_prepare
            ),
            flags=rwdts.Flag.PUBLISHER,
        )

    def deregister(self):
        """Deregister this handler (no-op when it is not registered)"""
        if self._handle is None:
            return

        self._handle.deregister()
        self._handle = None
432
433
class Delegate(object):
    """
    Fan a single call out to every registered listener.

    Listeners are invoked in registration order with exactly the positional
    and keyword arguments the Delegate was called with. Their return values
    are discarded, and the Delegate does not check that a listener accepts
    the arguments it is given.
    """

    def __init__(self):
        self._listeners = []

    def __call__(self, *args, **kwargs):
        """Invoke every registered listener with the given arguments"""
        for callback in self._listeners:
            callback(*args, **kwargs)

    def register(self, listener):
        """Add a listener to the end of the call list

        Arguments:
            listener - a callable that calls to this Delegate are forwarded to;
                       registering the same callable twice makes it run twice

        """
        self._listeners.append(listener)
457
458
class WebhookHandler(tornado.web.RequestHandler):
    """Tornado request handler for the monitor's webhook endpoint.

    The URL pattern that routes here captures two path segments, which
    tornado passes to post() as ``action`` and ``vim_id``.
    """

    @property
    def log(self):
        """Logger of the tasklet that owns the web application"""
        return self.application.tasklet.log

    def options(self, *args, **kargs):
        """Answer OPTIONS requests (presumably CORS preflight) with an empty body"""
        pass

    def set_default_headers(self):
        """Allow cross-origin POST requests from any origin"""
        cors_headers = (
            ('Access-Control-Allow-Origin', '*'),
            ('Access-Control-Allow-Headers', 'Content-Type, Cache-Control, Accept, X-Requested-With, Authorization'),
            ('Access-Control-Allow-Methods', 'POST'),
        )
        for header_name, header_value in cors_headers:
            self.set_header(header_name, header_value)

    def post(self, action, vim_id):
        """Handle a webhook POST; currently a stub that does nothing"""
        pass
474
475
class WebhookApplication(tornado.web.Application):
    """Tornado application serving the monitor's webhook endpoint.

    A single route maps ``/<action>/<vim_id>`` (with an optional trailing
    slash) to WebhookHandler. The owning tasklet is stored on the
    application so that handlers can reach it.
    """

    DEFAULT_WEBHOOK_PORT = 4568

    def __init__(self, tasklet):
        self.tasklet = tasklet

        routes = [
            (r"/([^/]+)/([^/]+)/?", WebhookHandler),
        ]
        super().__init__(routes)
485
486
class MonitorProject(ManoProject):
    """Per-project NFVI monitoring state and DTS handlers.

    A MonitorProject owns the project's DTS subscribers and RPC handlers, its
    instance configuration, the core.Monitor that manages cloud accounts and
    VNFRs, and one VdurNfviMetricsPublisher per monitored VDUR. The DTS
    handlers call back into the on_* methods of this class.
    """

    def __init__(self, name, tasklet, **kw):
        # Pass the tasklet's logger; there is no module-level 'log' name.
        super(MonitorProject, self).__init__(tasklet.log, name)
        self._tasklet = tasklet
        self._log_hdl = tasklet.log_hdl
        self._dts = tasklet.dts
        self._loop = tasklet.loop

        self.vnfr_subscriber = VnfrCatalogSubscriber(self)
        self.cloud_cfg_subscriber = CloudAccountDtsHandler(self)
        self.ns_instance_config_subscriber = NsInstanceConfigSubscriber(self)
        self.launchpad_cfg_subscriber = LaunchpadConfigDtsSubscriber(self)

        self.config = core.InstanceConfiguration()
        self.config.polling_period = MonitorTasklet.DEFAULT_POLLING_PERIOD

        self.monitor = core.Monitor(self.loop, self.log, self.config, self)
        # VDUR id -> registered VdurNfviMetricsPublisher
        self.vdur_handlers = dict()
        # VDUR ids whose publisher registration is still in progress
        self._vdur_registering = set()

        self.create_alarm_rpc = CreateAlarmRPC(self)
        self.destroy_alarm_rpc = DestroyAlarmRPC(self)

    @asyncio.coroutine
    def register (self):
        """Register all of the project's DTS handlers"""
        self.log.debug("creating cloud account handler")
        self.cloud_cfg_subscriber.register()

        self.log.debug("creating launchpad config subscriber")
        yield from self.launchpad_cfg_subscriber.register()

        self.log.debug("creating NS instance config subscriber")
        yield from self.ns_instance_config_subscriber.register()

        self.log.debug("creating vnfr subscriber")
        yield from self.vnfr_subscriber.register()

        self.log.debug("creating create-alarm rpc handler")
        yield from self.create_alarm_rpc.register()

        self.log.debug("creating destroy-alarm rpc handler")
        yield from self.destroy_alarm_rpc.register()

    @property
    def polling_period(self):
        """The NFVI polling period from the instance configuration"""
        return self.config.polling_period

    @property
    def public_ip(self):
        """The public IP of the launchpad"""
        return self.config.public_ip

    def on_ns_instance_config_update(self, config):
        """Update configuration information

        Arguments:
            config - an NsInstanceConfig object; only nfvi_polling_period is
                     used, and only when it is not None

        """
        if config.nfvi_polling_period is not None:
            self.config.polling_period = config.nfvi_polling_period

    def on_cloud_account_create(self, account):
        """Add a cloud account to the monitor

        Arguments:
            account - a cloud account object exposing ``cal_account_msg``

        """
        self.monitor.add_cloud_account(account.cal_account_msg)

    def on_cloud_account_delete(self, account_name):
        """Remove the named cloud account from the monitor"""
        self.monitor.remove_cloud_account(account_name)

    def _nfvi_metrics_available(self, vnfr):
        """Return True if the VNFR's cloud account provides NFVI metrics.

        Logs a warning and returns False otherwise.
        """
        if self.monitor.nfvi_metrics_available(vnfr.cloud_account):
            return True

        msg = "NFVI metrics unavailable for {}"
        self.log.warning(msg.format(vnfr.cloud_account))
        return False

    def _schedule_vdur_nfvi_handlers(self, vnfr):
        """Schedule publisher registration for every VDUR that has a VIM ID"""
        for vdur in vnfr.vdur:
            if vdur.vim_id is not None:
                coro = self.register_vdur_nfvi_handler(vnfr, vdur)
                self.loop.create_task(coro)

    def on_vnfr_create(self, vnfr):
        """Start monitoring a newly created VNFR and its VDURs"""
        if not self._nfvi_metrics_available(vnfr):
            return

        self.monitor.add_vnfr(vnfr)

        # Create NFVI handlers for VDURs
        self._schedule_vdur_nfvi_handlers(vnfr)

    def on_vnfr_update(self, vnfr):
        """Update a monitored VNFR and register publishers for new VDURs"""
        if not self._nfvi_metrics_available(vnfr):
            return

        self.monitor.update_vnfr(vnfr)

        # TODO handle the removal of vdurs
        self._schedule_vdur_nfvi_handlers(vnfr)

    def on_vnfr_delete(self, vnfr):
        """Stop monitoring a VNFR and deregister its VDUR publishers"""
        self.monitor.remove_vnfr(vnfr.id)

        # Delete any NFVI handlers associated with the VNFR
        for vdur in vnfr.vdur:
            self.deregister_vdur_nfvi_handler(vdur.id)

    def on_retrieve_nfvi_metrics(self, vdur_id):
        """Return the monitor's current NFVI metrics for a VDUR"""
        return self.monitor.retrieve_nfvi_metrics(vdur_id)

    @asyncio.coroutine
    def register_vdur_nfvi_handler(self, vnfr, vdur):
        """Register a metrics publisher for a running VDUR, at most once

        Does nothing if the VDUR has no VIM ID, is not "running", already
        has a publisher, or has a registration in progress.
        """
        if vdur.vim_id is None:
            return

        if vdur.operational_status != "running":
            return

        if vdur.id in self.vdur_handlers or vdur.id in self._vdur_registering:
            return

        # Mark the VDUR before yielding. Otherwise several tasks scheduled
        # from back-to-back CREATE/UPDATE notifications could each pass the
        # check above and register duplicate publishers on the same xpath.
        self._vdur_registering.add(vdur.id)
        try:
            publisher = VdurNfviMetricsPublisher(self, vnfr, vdur)
            yield from publisher.register()
        finally:
            self._vdur_registering.discard(vdur.id)

        self.vdur_handlers[vdur.id] = publisher

    def deregister_vdur_nfvi_handler(self, vdur_id):
        """Deregister and forget the publisher for a VDUR, if there is one"""
        handler = self.vdur_handlers.pop(vdur_id, None)
        if handler is not None:
            handler.deregister()

    @asyncio.coroutine
    def on_create_alarm(self, account, vdur_id, alarm):
        """Creates an alarm and returns an alarm ID

        Arguments:
            account - a name of the cloud account used to authenticate the
                      creation of an alarm
            vdur_id - the identifier of VDUR to create the alarm for
            alarm   - a structure defining the alarm that should be created

        Returns:
            An identifier specific to the created alarm

        """
        return (yield from self.monitor.create_alarm(account, vdur_id, alarm))

    @asyncio.coroutine
    def on_destroy_alarm(self, account, alarm_id):
        """Destroys an alarm with the specified identifier

        Arguments:
            account  - the name of the cloud account used to authenticate the
                       destruction of the alarm
            alarm_id - the identifier of the alarm to destroy

        """
        yield from self.monitor.destroy_alarm(account, alarm_id)
641
642
class MonitorTasklet(rift.tasklets.Tasklet):
    """
    The MonitorTasklet provides an interface for DTS to interact with an
    instance of the Monitor class. This allows the Monitor class to remain
    independent of DTS.
    """

    DEFAULT_POLLING_PERIOD = 1.0

    def __init__(self, *args, **kwargs):
        try:
            super().__init__(*args, **kwargs)
            self.rwlog.set_category("rw-monitor-log")

            # NOTE(review): nothing in this class creates a ProjectHandler or
            # adds entries to self.projects - confirm where projects are made.
            self._project_handler = None
            self.projects = {}

            # Web application and HTTP server; both are built in init()
            self.webhooks = None
            self.server = None

        except Exception as e:
            self.log.exception(e)

    def start(self):
        """Start the tasklet and create its DTS API object"""
        super().start()
        self.log.info("Starting MonitoringTasklet")

        self.log.debug("Registering with dts")
        self.dts = rift.tasklets.DTS(
            self.tasklet_info,
            RwLaunchpadYang.get_schema(),
            self.loop,
            self.on_dts_state_change
        )

        self.log.debug("Created DTS Api GI Object: %s", self.dts)

    def stop(self):
        """Deinitialise DTS, logging (not raising) any failure"""
        try:
            self.dts.deinit()
        except Exception as e:
            self.log.exception(e)

    @asyncio.coroutine
    def init(self):
        """Build the webhook application and an HTTP server on the tasklet loop"""
        self.log.debug("creating webhook server")
        loop = rift.tasklets.tornado.TaskletAsyncIOLoop(asyncio_loop=self.loop)
        self.webhooks = WebhookApplication(self)
        self.server = tornado.httpserver.HTTPServer(
            self.webhooks,
            io_loop=loop,
        )

    @asyncio.coroutine
    def on_public_ip(self, ip):
        """Store the public IP of the launchpad

        The tasklet holds no instance configuration of its own. The IP is
        written into the configuration of every project in self.projects.

        Arguments:
            ip - a string containing the public IP address of the launchpad

        """
        # NOTE(review): assumes the values of self.projects are MonitorProject
        # instances (which own a 'config'); confirm where projects are added.
        for project in self.projects.values():
            project.config.public_ip = ip

    @asyncio.coroutine
    def run(self):
        """Start serving webhooks on DEFAULT_WEBHOOK_PORT"""
        # Listen on the HTTPServer built in init(), which is bound to the
        # tasklet's asyncio loop. Application.listen() would construct a
        # second, separate HTTPServer and leave self.server unused.
        self.server.listen(WebhookApplication.DEFAULT_WEBHOOK_PORT)

    def on_instance_started(self):
        """Log the instance-started callback"""
        self.log.debug("Got instance started callback")

    @asyncio.coroutine
    def on_dts_state_change(self, state):
        """Handle DTS state change

        Take action according to current DTS state to transition application
        into the corresponding application state

        Arguments
            state - current dts state

        """
        switch = {
            rwdts.State.INIT: rwdts.State.REGN_COMPLETE,
            rwdts.State.CONFIG: rwdts.State.RUN,
        }

        handlers = {
            rwdts.State.INIT: self.init,
            rwdts.State.RUN: self.run,
        }

        # Transition application to next state
        handler = handlers.get(state, None)
        if handler is not None:
            yield from handler()

        # Transition dts to next state
        next_state = switch.get(state, None)
        if next_state is not None:
            self.dts.handle.set_state(next_state)
742