update from RIFT as of 696b75d2fe9fb046261b08c616f1bcf6c0b54a9b second try
[osm/SO.git] / rwlaunchpad / plugins / rwmonitor / rift / tasklets / rwmonitor / tasklet.py
1
2 #
3 # Copyright 2016 RIFT.IO Inc
4 #
5 # Licensed under the Apache License, Version 2.0 (the "License");
6 # you may not use this file except in compliance with the License.
7 # You may obtain a copy of the License at
8 #
9 # http://www.apache.org/licenses/LICENSE-2.0
10 #
11 # Unless required by applicable law or agreed to in writing, software
12 # distributed under the License is distributed on an "AS IS" BASIS,
13 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 # See the License for the specific language governing permissions and
15 # limitations under the License.
16 #
17
18 """
19 NFVI MONITORING
20 ==================================================
21
22 Data Model
23 --------------------------------------------------
24
25 The monitoring tasklet consists of several types of data that are associated
26 with one another. The highest level data are the cloud accounts. These objects
27 contain authentication information that is used to retrieve metrics as well as
28 the provider (and hence the available data source platforms).
29
30 Each cloud account is associated with an NfviMetricsPlugin. This is a
31 one-to-one relationship. The plugin is the interface to the data source that
32 will actually provide the NFVI metrics.
33
34 Each cloud account is also associated with several VNFRs. Each VNFR, in turn,
35 contains several VDURs. The VDURs represent the level that the NFVI metrics are
36 collected at. However, it is important that the relationships among all these
37 different objects are carefully managed.
38
39
40     CloudAccount -------------- NfviMetricsPlugin
41          / \
42         /   \
43        / ... \
44       /       \
45     VNFR     VNFR
46               /\
47              /  \
48             /    \
49            / .... \
50           /        \
51         VDUR      VDUR
52          |         |
53          |         |
54       Metrics   Metrics
55
56
57 Monitoring Tasklet
58 --------------------------------------------------
59
60 The monitoring tasklet (the MonitorTasklet class) is primarily responsible for
61 the communication between DTS and the application (the Monitor class), which
62 provides the logic for managing and interacting with the data model (see
63 above).
64
65 """
66
67 import asyncio
68 import concurrent.futures
69 import gi
70 import time
71 import tornado.httpserver
72 import tornado.web
73
74 gi.require_version('RwDts', '1.0')
75 gi.require_version('RwLog', '1.0')
76 gi.require_version('RwMonitorYang', '1.0')
77 gi.require_version('RwLaunchpadYang', '1.0')
78 gi.require_version('RwNsrYang', '1.0')
79 gi.require_version('RwVnfrYang', '1.0')
80 gi.require_version('rwlib', '1.0')
81 gi.require_version('RwLaunchpadYang', '1.0')
82 from gi.repository import (
83 RwDts as rwdts,
84 RwLog as rwlog,
85 RwMonitorYang as rwmonitor,
86 RwLaunchpadYang,
87 RwVnfrYang,
88 VnfrYang,
89 )
90 import gi.repository.rwlib as rwlib
91 import rift.tasklets
92 import rift.mano.cloud
93 from rift.mano.utils.project import (
94 ManoProject,
95 ProjectHandler,
96 )
97
98 gi.require_version('RwKeyspec', '1.0')
99 from gi.repository.RwKeyspec import quoted_key
100
101 from . import core
102
103
class DtsHandler(object):
    """Common base class for the DTS handlers owned by a project.

    A handler keeps a reference to its project and exposes the project's
    logger, log handle, DTS handle and event loop as read-only properties.
    """

    def __init__(self, project):
        """Bind the handler to a project.

        Arguments:
            project - the project whose resources this handler uses

        """
        self.project = project
        # Registration handle; subclasses assign it when they register.
        self.reg = None

    @property
    def log(self):
        """The logger of the owning project"""
        return self.project._log

    @property
    def log_hdl(self):
        """The log handle of the owning project"""
        return self.project._log_hdl

    @property
    def dts(self):
        """The DTS handle of the owning project"""
        return self.project._dts

    @property
    def loop(self):
        """The event loop of the owning project"""
        return self.project._loop

    @property
    def classname(self):
        """The name of the concrete handler class"""
        return type(self).__name__
128
class VnfrCatalogSubscriber(DtsHandler):
    """
    This class subscribes to the VNFR catalog and passes create, update and
    delete notifications on to the owning project.
    """

    XPATH = "D,/vnfr:vnfr-catalog/vnfr:vnfr"

    @asyncio.coroutine
    def register(self):
        """Register the subscriber with DTS on the project-scoped xpath"""

        @asyncio.coroutine
        def on_prepare(xact_info, action, ks_path, msg):
            try:
                if msg is None:
                    return

                # The on_vnfr_* callbacks are defined on the project. A
                # handler has no 'tasklet' attribute, so going through
                # self.tasklet raised an AttributeError that was swallowed
                # below and the VNFR event was silently dropped.
                if action == rwdts.QueryAction.CREATE:
                    self.project.on_vnfr_create(msg)

                elif action == rwdts.QueryAction.UPDATE:
                    self.project.on_vnfr_update(msg)

                elif action == rwdts.QueryAction.DELETE:
                    self.project.on_vnfr_delete(msg)

            except Exception as e:
                self.log.exception(e)

            finally:
                # Always acknowledge, even when the callback failed
                xact_info.respond_xpath(rwdts.XactRspCode.ACK)

        handler = rift.tasklets.DTS.RegistrationHandler(
            on_prepare=on_prepare,
        )

        with self.dts.group_create() as group:
            group.register(
                xpath=self.project.add_project(VnfrCatalogSubscriber.XPATH),
                flags=rwdts.Flag.SUBSCRIBER,
                handler=handler,
            )
165
166
class NsInstanceConfigSubscriber(DtsHandler):
    """
    This class subscribes to the NS instance configuration and passes each
    configuration element of an applied transaction on to the owning project.
    """

    XPATH = "C,/nsr:ns-instance-config"

    @asyncio.coroutine
    def register(self):
        """Register the subscriber with DTS through an app-conf group"""

        def on_apply(dts, acg, xact, action, _):
            # on_ns_instance_config_update is defined on the project; a
            # handler has no 'tasklet' attribute to go through.
            for config in self.reg.get_xact_elements(xact):
                self.project.on_ns_instance_config_update(config)

        acg_handler = rift.tasklets.AppConfGroup.Handler(
            on_apply=on_apply,
        )

        with self.dts.appconf_group_create(acg_handler) as acg:
            self.reg = acg.register(
                xpath=self.project.add_project(NsInstanceConfigSubscriber.XPATH),
                flags=rwdts.Flag.SUBSCRIBER,
            )
186
187
class CloudAccountDtsHandler(DtsHandler):
    """
    This class wraps a cloud account config subscriber and routes its
    add/delete notifications to the owning project.
    """

    def __init__(self, project):
        super().__init__(project)
        # Created in register(); None until then
        self._cloud_cfg_subscriber = None

    def register(self):
        """Create the cloud account config subscriber and register it"""
        self.log.debug("creating cloud account config handler")
        self._cloud_cfg_subscriber = rift.mano.cloud.CloudAccountConfigSubscriber(
            self.dts, self.log, self.log_hdl, self.project,
            rift.mano.cloud.CloudAccountConfigCallbacks(
                # The callbacks are defined on the project; a handler has no
                # 'tasklet' attribute, so self.tasklet raised AttributeError
                # as soon as register() was called.
                on_add_apply=self.project.on_cloud_account_create,
                on_delete_apply=self.project.on_cloud_account_delete,
            )
        )
        self._cloud_cfg_subscriber.register()
203
204
class VdurNfviMetricsPublisher(DtsHandler):
    """
    A VdurNfviMetricsPublisher is responsible for publishing the NFVI metrics
    from a single VDU.
    """

    XPATH = "D,/vnfr:vnfr-catalog/vnfr:vnfr[vnfr:id={}]/vnfr:vdur[vnfr:id={}]/rw-vnfr:nfvi-metrics"

    # This timeout defines the length of time the publisher will wait for a
    # request to a data source to complete. If the request cannot be completed
    # before timing out, the current data will be published instead.
    TIMEOUT = 2.0

    def __init__(self, project, vnfr, vdur):
        """Create an instance of VdurNfviMetricsPublisher

        Arguments:
            project - the project that owns this publisher
            vnfr    - the VNFR that contains the VDUR
            vdur    - the VDUR of the VDU whose metrics are published

        """
        super().__init__(project)
        self._vnfr = vnfr
        self._vdur = vdur

        self._handle = None
        self._xpath = project.add_project(
            VdurNfviMetricsPublisher.XPATH.format(
                quoted_key(vnfr.id),
                quoted_key(vdur.id),
            )
        )

        self._deregistered = asyncio.Event(loop=self.loop)

    @property
    def vnfr(self):
        """The VNFR associated with this publisher"""
        return self._vnfr

    @property
    def vdur(self):
        """The VDUR associated with this publisher"""
        return self._vdur

    @property
    def vim_id(self):
        """The VIM ID of the VDUR associated with this publisher"""
        return self._vdur.vim_id

    @property
    def xpath(self):
        """The XPATH that the metrics are published on"""
        return self._xpath

    @asyncio.coroutine
    def dts_on_prepare(self, xact_info, action, ks_path, msg):
        """Handles the DTS on_prepare callback"""
        self.log.debug("{}:dts_on_prepare".format(self.classname))

        if action == rwdts.QueryAction.READ:
            # If the publisher has been deregistered, the xpath element has
            # been deleted. So we do not want to publish the metrics and
            # re-create the element.
            if not self._deregistered.is_set():
                # The metrics lookup is defined on the project; a handler has
                # no 'tasklet' attribute to go through.
                metrics = self.project.on_retrieve_nfvi_metrics(self.vdur.id)
                xact_info.respond_xpath(
                    rwdts.XactRspCode.MORE,
                    self.xpath,
                    metrics,
                )

        xact_info.respond_xpath(rwdts.XactRspCode.ACK, self.xpath)

    @asyncio.coroutine
    def register(self):
        """Register the publisher with DTS"""
        self._handle = yield from self.dts.register(
            xpath=self.xpath,
            handler=rift.tasklets.DTS.RegistrationHandler(
                on_prepare=self.dts_on_prepare,
            ),
            flags=rwdts.Flag.PUBLISHER,
        )

    def deregister(self):
        """Deregister the publisher from DTS"""
        # Mark the publisher for deregistration. This prevents the publisher
        # from creating an element after it has been deleted.
        self._deregistered.set()

        # Nothing to release if the publisher was never registered or has
        # already been deregistered.
        if self._handle is None:
            return

        # Now that we are done with the registration handle, delete the element
        # and tell DTS to deregister it
        self._handle.delete_element(self.xpath)
        self._handle.deregister()
        self._handle = None
297
298
class LaunchpadConfigDtsSubscriber(DtsHandler):
    """
    This class subscribes to the launchpad configuration and records any
    relevant changes on the owning project.
    """

    @asyncio.coroutine
    def register(self):
        """Register the subscriber with DTS through an app-conf group"""

        @asyncio.coroutine
        def apply_config(dts, acg, xact, action, _):
            if xact.xact is None:
                # When RIFT first comes up, an INSTALL is called with the current config
                # Since confd doesn't actually persist data this never has any data so
                # skip this for now.
                self.log.debug("No xact handle. Skipping apply config")
                return

            try:
                # Only the first element is of interest; an empty transaction
                # is skipped instead of raising IndexError.
                cfg = next(iter(self.reg.get_xact_elements(xact)), None)
                if cfg is None:
                    self.log.debug("No launchpad config in xact. Skipping apply config")
                    return

                # The configuration (and the public_ip property reading it)
                # lives on the project; a handler has no 'tasklet' attribute,
                # so going through self.tasklet raised AttributeError and the
                # new address was never stored.
                if cfg.public_ip != self.project.public_ip:
                    self.project.config.public_ip = cfg.public_ip

            except Exception as e:
                self.log.exception(e)

        try:
            acg_handler = rift.tasklets.AppConfGroup.Handler(
                on_apply=apply_config,
            )

            with self.dts.appconf_group_create(acg_handler) as acg:
                self.reg = acg.register(
                    xpath=self.project.add_project("C,/rw-launchpad:launchpad-config"),
                    flags=rwdts.Flag.SUBSCRIBER,
                )

        except Exception as e:
            self.log.exception(e)
337
338
class CreateAlarmRPC(DtsHandler):
    """
    This class is used to listen for RPC calls to /vnfr:create-alarm, and pass
    them on to the owning project.
    """

    def __init__(self, project):
        super().__init__(project)
        self._handle = None

    @asyncio.coroutine
    def register(self):
        """Register this handler with DTS"""

        @asyncio.coroutine
        def on_prepare(xact_info, action, ks_path, msg):
            try:
                # NOTE(review): rpc_check receives xact_info, so it presumably
                # answers rejected requests itself -- confirm against
                # ManoProject.
                if not self.project.rpc_check(msg, xact_info=xact_info):
                    return

                response = VnfrYang.YangOutput_Vnfr_CreateAlarm()
                # on_create_alarm is defined on the project; a handler has no
                # 'tasklet' attribute, so self.tasklet made every request
                # end in the NACK branch below.
                response.alarm_id = yield from self.project.on_create_alarm(
                    msg.cloud_account,
                    msg.vdur_id,
                    msg.alarm,
                )

                xact_info.respond_xpath(
                    rwdts.XactRspCode.ACK,
                    "O,/vnfr:create-alarm",
                    response,
                )

            except Exception as e:
                self.log.exception(e)
                xact_info.respond_xpath(rwdts.XactRspCode.NACK)

        self._handle = yield from self.dts.register(
            xpath="I,/vnfr:create-alarm",
            handler=rift.tasklets.DTS.RegistrationHandler(
                on_prepare=on_prepare
            ),
            flags=rwdts.Flag.PUBLISHER,
        )

    def deregister(self):
        """Deregister this handler"""
        # Nothing to do if the handler was never registered or has already
        # been deregistered.
        if self._handle is None:
            return

        self._handle.deregister()
        self._handle = None
388
389
class DestroyAlarmRPC(DtsHandler):
    """
    This class is used to listen for RPC calls to /vnfr:destroy-alarm, and pass
    them on to the owning project.
    """

    def __init__(self, project):
        super().__init__(project)
        self._handle = None

    @asyncio.coroutine
    def register(self):
        """Register this handler with DTS"""

        @asyncio.coroutine
        def on_prepare(xact_info, action, ks_path, msg):
            try:
                # NOTE(review): rpc_check receives xact_info, so it presumably
                # answers rejected requests itself -- confirm against
                # ManoProject.
                if not self.project.rpc_check(msg, xact_info=xact_info):
                    return

                # on_destroy_alarm is defined on the project; a handler has no
                # 'tasklet' attribute, so self.tasklet made every request
                # end in the NACK branch below.
                yield from self.project.on_destroy_alarm(
                    msg.cloud_account,
                    msg.alarm_id,
                )

                xact_info.respond_xpath(
                    rwdts.XactRspCode.ACK,
                    "O,/vnfr:destroy-alarm"
                )

            except Exception as e:
                self.log.exception(e)
                xact_info.respond_xpath(rwdts.XactRspCode.NACK)

        self._handle = yield from self.dts.register(
            xpath="I,/vnfr:destroy-alarm",
            handler=rift.tasklets.DTS.RegistrationHandler(
                on_prepare=on_prepare
            ),
            flags=rwdts.Flag.PUBLISHER,
        )

    def deregister(self):
        """Deregister this handler"""
        # Nothing to do if the handler was never registered or has already
        # been deregistered.
        if self._handle is None:
            return

        self._handle.deregister()
        self._handle = None
435
436
class Delegate(object):
    """
    A callable that fans a single call out to a collection of listeners.

    Listeners are invoked in registration order with exactly the arguments
    the delegate was called with. Whether a listener accepts those arguments
    is not checked here.
    """

    def __init__(self):
        self._listeners = []

    def __call__(self, *args, **kwargs):
        """Invoke every registered listener with the given arguments"""
        for callback in self._listeners:
            callback(*args, **kwargs)

    def register(self, listener):
        """Add a listener to the end of the collection

        Arguments:
            listener - a callable that calls on this delegate are forwarded to

        """
        self._listeners.append(listener)
460
461
class WebhookHandler(tornado.web.RequestHandler):
    """Request handler for the webhook endpoint.

    OPTIONS and POST requests are accepted but no action is taken; the
    handler only sets the CORS response headers.
    """

    @property
    def log(self):
        """The logger of the tasklet that owns the web application"""
        tasklet = self.application.tasklet
        return tasklet.log

    def options(self, *args, **kargs):
        """Accept OPTIONS requests without doing anything"""

    def set_default_headers(self):
        """Set the CORS headers on the response"""
        cors_headers = (
            ('Access-Control-Allow-Origin', '*'),
            ('Access-Control-Allow-Headers', 'Content-Type, Cache-Control, Accept, X-Requested-With, Authorization'),
            ('Access-Control-Allow-Methods', 'POST'),
        )
        for header, value in cors_headers:
            self.set_header(header, value)

    def post(self, action, vim_id):
        """Accept POST requests without doing anything"""
477
478
class WebhookApplication(tornado.web.Application):
    """Tornado application that routes two-segment paths to WebhookHandler"""

    DEFAULT_WEBHOOK_PORT = 4568

    def __init__(self, tasklet):
        """Create the application

        Arguments:
            tasklet - the tasklet that owns this application

        """
        # Set before the base initializer runs, as in the route table below
        # the handlers reach the tasklet through the application.
        self.tasklet = tasklet

        # Two non-empty path segments with an optional trailing slash; the
        # segments are passed to the handler as positional arguments.
        routes = [
            (r"/([^/]+)/([^/]+)/?", WebhookHandler),
        ]
        super().__init__(routes)
488
489
class MonitorProject(ManoProject):
    """
    A MonitorProject holds the monitoring state of a single project: the DTS
    handlers, the instance configuration, the Monitor object and the NFVI
    metrics publishers of the project's VDURs.
    """

    def __init__(self, name, tasklet, **kw):
        """Create an instance of MonitorProject

        Arguments:
            name    - the name of the project
            tasklet - the tasklet that owns the project; it provides the
                      logger, log handle, DTS handle and event loop
            kw      - accepted but not used

        """
        # The logger has to come from the tasklet: there is no 'log' name in
        # this scope, so passing a bare 'log' raised a NameError.
        super(MonitorProject, self).__init__(tasklet.log, name)
        self._tasklet = tasklet
        self._log_hdl = tasklet.log_hdl
        self._dts = tasklet.dts
        self._loop = tasklet.loop

        self.vnfr_subscriber = VnfrCatalogSubscriber(self)
        self.cloud_cfg_subscriber = CloudAccountDtsHandler(self)
        self.ns_instance_config_subscriber = NsInstanceConfigSubscriber(self)
        self.launchpad_cfg_subscriber = LaunchpadConfigDtsSubscriber(self)

        self.config = core.InstanceConfiguration()
        self.config.polling_period = MonitorTasklet.DEFAULT_POLLING_PERIOD

        self.monitor = core.Monitor(self.loop, self.log, self.config, self)
        # Maps a VDUR ID to the publisher registered for it
        self.vdur_handlers = dict()

        self.create_alarm_rpc = CreateAlarmRPC(self)
        self.destroy_alarm_rpc = DestroyAlarmRPC(self)

    @asyncio.coroutine
    def register(self):
        """Register all of the project's DTS handlers"""
        self.log.debug("creating cloud account handler")
        # Not a coroutine, unlike the other handlers' register()
        self.cloud_cfg_subscriber.register()

        self.log.debug("creating launchpad config subscriber")
        yield from self.launchpad_cfg_subscriber.register()

        self.log.debug("creating NS instance config subscriber")
        yield from self.ns_instance_config_subscriber.register()

        self.log.debug("creating vnfr subscriber")
        yield from self.vnfr_subscriber.register()

        self.log.debug("creating create-alarm rpc handler")
        yield from self.create_alarm_rpc.register()

        self.log.debug("creating destroy-alarm rpc handler")
        yield from self.destroy_alarm_rpc.register()

    @property
    def polling_period(self):
        """The NFVI polling period from the instance configuration"""
        return self.config.polling_period

    @property
    def public_ip(self):
        """The public IP of the launchpad"""
        return self.config.public_ip

    def on_ns_instance_config_update(self, config):
        """Update configuration information

        Arguments:
            config - an NsInstanceConfig object

        """
        if config.nfvi_polling_period is not None:
            self.config.polling_period = config.nfvi_polling_period

    def on_cloud_account_create(self, account):
        """Add a cloud account to the monitor"""
        self.monitor.add_cloud_account(account.cal_account_msg)

    def on_cloud_account_delete(self, account_name):
        """Remove the named cloud account from the monitor"""
        self.monitor.remove_cloud_account(account_name)

    def _nfvi_metrics_usable(self, vnfr):
        """Return True if NFVI metrics can be collected for the VNFR, logging a warning otherwise"""
        try:
            acc = vnfr.cloud_account
        except AttributeError:
            self.log.warning("NFVI metrics not supported")
            return False

        if not self.monitor.nfvi_metrics_available(acc):
            msg = "NFVI metrics unavailable for {}"
            self.log.warning(msg.format(acc))
            return False

        return True

    def _schedule_vdur_nfvi_handlers(self, vnfr):
        """Schedule the registration of an NFVI handler for each VDUR that has a VIM ID"""
        for vdur in vnfr.vdur:
            if vdur.vim_id is not None:
                coro = self.register_vdur_nfvi_handler(vnfr, vdur)
                self.loop.create_task(coro)

    def on_vnfr_create(self, vnfr):
        """Add a VNFR to the monitor and create NFVI handlers for its VDURs"""
        if not self._nfvi_metrics_usable(vnfr):
            return

        self.monitor.add_vnfr(vnfr)

        # Create NFVI handlers for VDURs
        self._schedule_vdur_nfvi_handlers(vnfr)

    def on_vnfr_update(self, vnfr):
        """Update a VNFR in the monitor and create NFVI handlers for its VDURs"""
        if not self._nfvi_metrics_usable(vnfr):
            return

        self.monitor.update_vnfr(vnfr)

        # TODO handle the removal of vdurs
        self._schedule_vdur_nfvi_handlers(vnfr)

    def on_vnfr_delete(self, vnfr):
        """Remove a VNFR from the monitor along with its NFVI handlers"""
        self.monitor.remove_vnfr(vnfr.id)

        # Delete any NFVI handlers associated with the VNFR
        for vdur in vnfr.vdur:
            self.deregister_vdur_nfvi_handler(vdur.id)

    def on_retrieve_nfvi_metrics(self, vdur_id):
        """Return the NFVI metrics the monitor holds for the VDUR"""
        return self.monitor.retrieve_nfvi_metrics(vdur_id)

    @asyncio.coroutine
    def register_vdur_nfvi_handler(self, vnfr, vdur):
        """Register an NFVI metrics publisher for a running VDUR

        Nothing is done if the VDUR has no VIM ID, is not in the "running"
        state, or already has a publisher.

        Arguments:
            vnfr - the VNFR that contains the VDUR
            vdur - the VDUR to publish metrics for

        """
        if vdur.vim_id is None:
            return

        if vdur.operational_status != "running":
            return

        if vdur.id not in self.vdur_handlers:
            publisher = VdurNfviMetricsPublisher(self, vnfr, vdur)
            yield from publisher.register()
            self.vdur_handlers[vdur.id] = publisher

    def deregister_vdur_nfvi_handler(self, vdur_id):
        """Deregister and forget the publisher of the VDUR, if there is one"""
        handler = self.vdur_handlers.pop(vdur_id, None)
        if handler is not None:
            handler.deregister()

    @asyncio.coroutine
    def on_create_alarm(self, account, vdur_id, alarm):
        """Creates an alarm and returns an alarm ID

        Arguments:
            account - a name of the cloud account used to authenticate the
                      creation of an alarm
            vdur_id - the identifier of VDUR to create the alarm for
            alarm   - a structure defining the alarm that should be created

        Returns:
            An identifier specific to the created alarm

        """
        return (yield from self.monitor.create_alarm(account, vdur_id, alarm))

    @asyncio.coroutine
    def on_destroy_alarm(self, account, alarm_id):
        """Destroys an alarm with the specified identifier

        Arguments:
            account  - the name of the cloud account used to authenticate the
                       destruction of the alarm
            alarm_id - the identifier of the alarm to destroy

        """
        yield from self.monitor.destroy_alarm(account, alarm_id)

    @asyncio.coroutine
    def delete_prepare(self):
        """Report whether any cloud accounts are still present

        Returns:
            False while the cloud account subscriber holds accounts, True
            otherwise

        """
        handler = self.cloud_cfg_subscriber
        # The inner subscriber only exists once the handler has been
        # registered; before that there cannot be any accounts.
        subscriber = handler._cloud_cfg_subscriber if handler else None
        if subscriber is not None and subscriber.accounts:
            return False
        return True
663
664
class MonitorTasklet(rift.tasklets.Tasklet):
    """
    The MonitorTasklet provides a interface for DTS to interact with an
    instance of the Monitor class. This allows the Monitor class to remain
    independent of DTS.
    """

    DEFAULT_POLLING_PERIOD = 1.0

    def __init__(self, *args, **kwargs):
        try:
            super().__init__(*args, **kwargs)
            self.rwlog.set_category("rw-monitor-log")

            # NOTE(review): nothing in the visible code assigns a project
            # handler or adds entries to 'projects' -- confirm where the
            # projects are created.
            self._project_handler = None
            self.projects = {}

            self.webhooks = None

        except Exception as e:
            self.log.exception(e)

    def start(self):
        """Start the tasklet and create the DTS API object"""
        super().start()
        self.log.info("Starting MonitoringTasklet")

        self.log.debug("Registering with dts")
        self.dts = rift.tasklets.DTS(
            self.tasklet_info,
            RwLaunchpadYang.get_schema(),
            self.loop,
            self.on_dts_state_change
        )

        self.log.debug("Created DTS Api GI Object: %s", self.dts)

    def stop(self):
        """Deinitialize DTS, logging any failure instead of raising it"""
        try:
            self.dts.deinit()
        except Exception as e:
            self.log.exception(e)

    @asyncio.coroutine
    def init(self):
        """Create the webhook application and its HTTP server"""
        self.log.debug("creating webhook server")
        loop = rift.tasklets.tornado.TaskletAsyncIOLoop(asyncio_loop=self.loop)
        self.webhooks = WebhookApplication(self)
        self.server = tornado.httpserver.HTTPServer(
            self.webhooks,
            io_loop=loop,
        )

    @asyncio.coroutine
    def on_public_ip(self, ip):
        """Store the public IP of the launchpad

        The tasklet has no 'config' attribute of its own (writing to
        self.config raised an AttributeError); the instance configuration is
        held by the projects, so the address is stored on every project in
        self.projects.

        Arguments:
            ip - a string containing the public IP address of the launchpad

        """
        # NOTE(review): assumes every value in self.projects exposes a
        # 'config' attribute as MonitorProject does -- confirm.
        for project in self.projects.values():
            project.config.public_ip = ip

    @asyncio.coroutine
    def run(self):
        """Start listening for webhook requests"""
        address = rwlib.getenv("RWVM_INTERNAL_IPADDR")
        if address is None:
            # No internal address configured; fall back to the empty string
            address = ""
        self.webhooks.listen(WebhookApplication.DEFAULT_WEBHOOK_PORT, address=address)

    def on_instance_started(self):
        """Log that the instance started callback was received"""
        self.log.debug("Got instance started callback")

    @asyncio.coroutine
    def on_dts_state_change(self, state):
        """Handle DTS state change

        Take action according to current DTS state to transition application
        into the corresponding application state

        Arguments
            state - current dts state

        """
        switch = {
            rwdts.State.INIT: rwdts.State.REGN_COMPLETE,
            rwdts.State.CONFIG: rwdts.State.RUN,
        }

        handlers = {
            rwdts.State.INIT: self.init,
            rwdts.State.RUN: self.run,
        }

        # Transition application to next state
        handler = handlers.get(state)
        if handler is not None:
            yield from handler()

        # Transition dts to next state
        next_state = switch.get(state)
        if next_state is not None:
            self.dts.handle.set_state(next_state)
767