3 # Copyright 2016 RIFT.IO Inc
5 # Licensed under the Apache License, Version 2.0 (the "License");
6 # you may not use this file except in compliance with the License.
7 # You may obtain a copy of the License at
9 # http://www.apache.org/licenses/LICENSE-2.0
11 # Unless required by applicable law or agreed to in writing, software
12 # distributed under the License is distributed on an "AS IS" BASIS,
13 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 # See the License for the specific language governing permissions and
15 # limitations under the License.
20 ==================================================
23 --------------------------------------------------
25 The monitoring tasklet consists of several types of data that are associated
26 with one another. The highest level data are the cloud accounts. These objects
27 contain authentication information that is used to retrieve metrics as well as
28 the provider (and hence the available data source platforms).
30 Each cloud account is associated with an NfviMetricsPlugin. This is a
31 one-to-one relationship. The plugin is the interface to the data source that
32 will actually provide the NFVI metrics.
34 Each cloud account is also associated with several VNFRs. Each VNFR, in turn,
35 contains several VDURs. The VDURs represent the level that the NFVI metrics are
36 collected at. However, it is important that the relationships among all these
37 different objects are carefully managed.
40 CloudAccount -------------- NfviMetricsPlugin
58 --------------------------------------------------
The monitoring tasklet (the MonitorTasklet class) is primarily responsible for
the communication between DTS and the application (the Monitor class), which
62 provides the logic for managing and interacting with the data model (see
68 import concurrent
.futures
72 import tornado
.httpserver
75 gi
.require_version('RwDts', '1.0')
76 gi
.require_version('RwLog', '1.0')
77 gi
.require_version('RwMonitorYang', '1.0')
78 gi
.require_version('RwLaunchpadYang', '1.0')
79 gi
.require_version('RwNsrYang', '1.0')
80 gi
.require_version('RwVnfrYang', '1.0')
81 gi
.require_version('RwLaunchpadYang', '1.0')
82 from gi
.repository
import (
85 RwMonitorYang
as rwmonitor
,
92 import rift
.mano
.cloud
97 class DtsHandler(object):
98 def __init__(self
, tasklet
):
100 self
.tasklet
= tasklet
104 return self
.tasklet
.log
108 return self
.tasklet
.log_hdl
112 return self
.tasklet
.dts
116 return self
.tasklet
.loop
120 return self
.__class
__.__name
__
122 class VnfrCatalogSubscriber(DtsHandler
):
123 XPATH
= "D,/vnfr:vnfr-catalog/vnfr:vnfr"
128 def on_prepare(xact_info
, action
, ks_path
, msg
):
133 if action
== rwdts
.QueryAction
.CREATE
:
134 self
.tasklet
.on_vnfr_create(msg
)
136 elif action
== rwdts
.QueryAction
.UPDATE
:
137 self
.tasklet
.on_vnfr_update(msg
)
139 elif action
== rwdts
.QueryAction
.DELETE
:
140 self
.tasklet
.on_vnfr_delete(msg
)
142 except Exception as e
:
143 self
.log
.exception(e
)
146 xact_info
.respond_xpath(rwdts
.XactRspCode
.ACK
)
148 handler
= rift
.tasklets
.DTS
.RegistrationHandler(
149 on_prepare
=on_prepare
,
152 with self
.dts
.group_create() as group
:
154 xpath
=VnfrCatalogSubscriber
.XPATH
,
155 flags
=rwdts
.Flag
.SUBSCRIBER
,
160 class NsInstanceConfigSubscriber(DtsHandler
):
161 XPATH
= "C,/nsr:ns-instance-config"
165 def on_apply(dts
, acg
, xact
, action
, _
):
166 xact_config
= list(self
.reg
.get_xact_elements(xact
))
167 for config
in xact_config
:
168 self
.tasklet
.on_ns_instance_config_update(config
)
170 acg_handler
= rift
.tasklets
.AppConfGroup
.Handler(
174 with self
.dts
.appconf_group_create(acg_handler
) as acg
:
175 self
.reg
= acg
.register(
176 xpath
=NsInstanceConfigSubscriber
.XPATH
,
177 flags
=rwdts
.Flag
.SUBSCRIBER
,
181 class CloudAccountDtsHandler(DtsHandler
):
def __init__(self, tasklet):
    """Create the cloud account handler

    Arguments:
        tasklet - the tasklet, handed on to the DtsHandler base class

    """
    super().__init__(tasklet)

    # Placeholder only; the cloud account config subscriber is assigned to
    # this attribute elsewhere in the class.
    self._cloud_cfg_subscriber = None
187 self
.log
.debug("creating cloud account config handler")
188 self
._cloud
_cfg
_subscriber
= rift
.mano
.cloud
.CloudAccountConfigSubscriber(
189 self
.dts
, self
.log
, self
.log_hdl
,
190 rift
.mano
.cloud
.CloudAccountConfigCallbacks(
191 on_add_apply
=self
.tasklet
.on_cloud_account_create
,
192 on_delete_apply
=self
.tasklet
.on_cloud_account_delete
,
195 self
._cloud
_cfg
_subscriber
.register()
198 class VdurNfviMetricsPublisher(DtsHandler
):
200 A VdurNfviMetricsPublisher is responsible for publishing the NFVI metrics
204 XPATH
= "D,/vnfr:vnfr-catalog/vnfr:vnfr[vnfr:id='{}']/vnfr:vdur[vnfr:id='{}']/rw-vnfr:nfvi-metrics"
206 # This timeout defines the length of time the publisher will wait for a
207 # request to a data source to complete. If the request cannot be completed
208 # before timing out, the current data will be published instead.
211 def __init__(self
, tasklet
, vnfr
, vdur
):
"""Create an instance of VdurNfviMetricsPublisher
215 tasklet - the tasklet
216 vnfr - the VNFR that contains the VDUR
217 vdur - the VDUR of the VDU whose metrics are published
220 super().__init
__(tasklet
)
225 self
._xpath
= VdurNfviMetricsPublisher
.XPATH
.format(vnfr
.id, vdur
.id)
227 self
._deregistered
= asyncio
.Event(loop
=self
.loop
)
231 """The VNFR associated with this publisher"""
236 """The VDUR associated with this publisher"""
241 """The VIM ID of the VDUR associated with this publisher"""
242 return self
._vdur
.vim_id
246 """The XPATH that the metrics are published on"""
250 def dts_on_prepare(self
, xact_info
, action
, ks_path
, msg
):
251 """Handles the DTS on_prepare callback"""
252 self
.log
.debug("{}:dts_on_prepare".format(self
.classname
))
254 if action
== rwdts
.QueryAction
.READ
:
# If the publisher has been deregistered, the xpath element has
# been deleted. So we do not want to publish the metrics and
# re-create the element.
258 if not self
._deregistered
.is_set():
259 metrics
= self
.tasklet
.on_retrieve_nfvi_metrics(self
.vdur
.id)
260 xact_info
.respond_xpath(
261 rwdts
.XactRspCode
.MORE
,
266 xact_info
.respond_xpath(rwdts
.XactRspCode
.ACK
, self
.xpath
)
270 """Register the publisher with DTS"""
271 self
._handle
= yield from self
.dts
.register(
273 handler
=rift
.tasklets
.DTS
.RegistrationHandler(
274 on_prepare
=self
.dts_on_prepare
,
276 flags
=rwdts
.Flag
.PUBLISHER
,
def deregister(self):
    """Deregister the publisher from DTS

    The deregistration flag is raised before the registration handle is
    touched, so that the READ path (which checks the flag) stops publishing
    and cannot re-create the element once it has been deleted.

    """
    # Step 1: flag the publisher as deregistered.
    self._deregistered.set()

    # Step 2: the registration handle is no longer needed; remove the
    # published element and then ask DTS to drop the registration.
    registration = self._handle
    registration.delete_element(self.xpath)
    registration.deregister()
292 class LaunchpadConfigDtsSubscriber(DtsHandler
):
294 This class subscribes to the launchpad configuration and alerts the tasklet
295 to any relevant changes.
301 def apply_config(dts
, acg
, xact
, action
, _
):
302 if xact
.xact
is None:
# When RIFT first comes up, an INSTALL is called with the current config
# Since confd doesn't actually persist data this never has any data so
306 self
.log
.debug("No xact handle. Skipping apply config")
310 cfg
= list(self
.reg
.get_xact_elements(xact
))[0]
311 if cfg
.public_ip
!= self
.tasklet
.public_ip
:
312 yield from self
.tasklet
.on_public_ip(cfg
.public_ip
)
314 except Exception as e
:
315 self
.log
.exception(e
)
318 acg_handler
= rift
.tasklets
.AppConfGroup
.Handler(
319 on_apply
=apply_config
,
322 with self
.dts
.appconf_group_create(acg_handler
) as acg
:
323 self
.reg
= acg
.register(
324 xpath
="C,/rw-launchpad:launchpad-config",
325 flags
=rwdts
.Flag
.SUBSCRIBER
,
328 except Exception as e
:
329 self
.log
.exception(e
)
332 class CreateAlarmRPC(DtsHandler
):
334 This class is used to listen for RPC calls to /vnfr:create-alarm, and pass
335 them on to the tasklet.
338 def __init__(self
, tasklet
):
339 super().__init
__(tasklet
)
344 """Register this handler with DTS"""
346 def on_prepare(xact_info
, action
, ks_path
, msg
):
348 response
= VnfrYang
.YangOutput_Vnfr_CreateAlarm()
349 response
.alarm_id
= yield from self
.tasklet
.on_create_alarm(
355 xact_info
.respond_xpath(
356 rwdts
.XactRspCode
.ACK
,
357 "O,/vnfr:create-alarm",
361 except Exception as e
:
362 self
.log
.exception(e
)
363 xact_info
.respond_xpath(rwdts
.XactRspCode
.NACK
)
365 self
._handle
= yield from self
.dts
.register(
366 xpath
="I,/vnfr:create-alarm",
367 handler
=rift
.tasklets
.DTS
.RegistrationHandler(
368 on_prepare
=on_prepare
370 flags
=rwdts
.Flag
.PUBLISHER
,
def deregister(self):
    """Deregister the create-alarm handler from DTS"""
    registration = self._handle
    registration.deregister()
379 class DestroyAlarmRPC(DtsHandler
):
381 This class is used to listen for RPC calls to /vnfr:destroy-alarm, and pass
382 them on to the tasklet.
385 def __init__(self
, tasklet
):
386 super().__init
__(tasklet
)
391 """Register this handler with DTS"""
393 def on_prepare(xact_info
, action
, ks_path
, msg
):
395 yield from self
.tasklet
.on_destroy_alarm(
400 xact_info
.respond_xpath(
401 rwdts
.XactRspCode
.ACK
,
402 "O,/vnfr:destroy-alarm"
405 except Exception as e
:
406 self
.log
.exception(e
)
407 xact_info
.respond_xpath(rwdts
.XactRspCode
.NACK
)
409 self
._handle
= yield from self
.dts
.register(
410 xpath
="I,/vnfr:destroy-alarm",
411 handler
=rift
.tasklets
.DTS
.RegistrationHandler(
412 on_prepare
=on_prepare
414 flags
=rwdts
.Flag
.PUBLISHER
,
def deregister(self):
    """Deregister the destroy-alarm handler from DTS"""
    registration = self._handle
    registration.deregister()
class Delegate(object):
    """
    Fans a single call out to a collection of listener objects.

    Every registered listener is invoked with exactly the arguments that the
    Delegate itself was called with. Listeners are expected to accept those
    arguments, but the Delegate class does not enforce this.
    """

    def __init__(self):
        # Listeners are kept, and later invoked, in registration order
        self._listeners = []

    def __call__(self, *args, **kwargs):
        """Forward the call, unchanged, to each registered listener"""
        for callback in self._listeners:
            callback(*args, **kwargs)

    def register(self, listener):
        """Add a listener to the delegate

        Arguments:
            listener - a callable that future calls will be delegated to

        """
        self._listeners.append(listener)
448 class WebhookHandler(tornado
.web
.RequestHandler
):
451 return self
.application
.tasklet
.log
453 def options(self
, *args
, **kargs
):
def set_default_headers(self):
    """Set the Access-Control-* headers on the response"""
    access_control = (
        ('Access-Control-Allow-Origin', '*'),
        ('Access-Control-Allow-Headers', 'Content-Type, Cache-Control, Accept, X-Requested-With, Authorization'),
        ('Access-Control-Allow-Methods', 'POST'),
    )

    for header, value in access_control:
        self.set_header(header, value)
461 def post(self
, action
, vim_id
):
465 class WebhookApplication(tornado
.web
.Application
):
466 DEFAULT_WEBHOOK_PORT
= 4568
468 def __init__(self
, tasklet
):
469 self
.tasklet
= tasklet
472 (r
"/([^/]+)/([^/]+)/?", WebhookHandler
),
476 class MonitorTasklet(rift
.tasklets
.Tasklet
):
The MonitorTasklet provides an interface for DTS to interact with an
instance of the Monitor class. This allows the Monitor class to remain
483 DEFAULT_POLLING_PERIOD
= 1.0
485 def __init__(self
, *args
, **kwargs
):
487 super().__init
__(*args
, **kwargs
)
488 self
.rwlog
.set_category("rw-monitor-log")
490 self
.vnfr_subscriber
= VnfrCatalogSubscriber(self
)
491 self
.cloud_cfg_subscriber
= CloudAccountDtsHandler(self
)
492 self
.ns_instance_config_subscriber
= NsInstanceConfigSubscriber(self
)
493 self
.launchpad_cfg_subscriber
= LaunchpadConfigDtsSubscriber(self
)
495 self
.config
= core
.InstanceConfiguration()
496 self
.config
.polling_period
= MonitorTasklet
.DEFAULT_POLLING_PERIOD
498 self
.monitor
= core
.Monitor(self
.loop
, self
.log
, self
.config
)
499 self
.vdur_handlers
= dict()
502 self
.create_alarm_rpc
= CreateAlarmRPC(self
)
503 self
.destroy_alarm_rpc
= DestroyAlarmRPC(self
)
506 except Exception as e
:
507 self
.log
.exception(e
)
def polling_period(self):
    """The polling period stored in the tasklet configuration"""
    settings = self.config
    return settings.polling_period
515 """The public IP of the launchpad"""
516 return self
.config
.public_ip
520 self
.log
.info("Starting MonitoringTasklet")
522 self
.log
.debug("Registering with dts")
523 self
.dts
= rift
.tasklets
.DTS(
525 RwLaunchpadYang
.get_schema(),
527 self
.on_dts_state_change
530 self
.log
.debug("Created DTS Api GI Object: %s", self
.dts
)
535 except Exception as e
:
536 self
.log
.exception(e
)
540 self
.log
.debug("creating cloud account handler")
541 self
.cloud_cfg_subscriber
.register()
543 self
.log
.debug("creating launchpad config subscriber")
544 yield from self
.launchpad_cfg_subscriber
.register()
546 self
.log
.debug("creating NS instance config subscriber")
547 yield from self
.ns_instance_config_subscriber
.register()
549 self
.log
.debug("creating vnfr subscriber")
550 yield from self
.vnfr_subscriber
.register()
552 self
.log
.debug("creating create-alarm rpc handler")
553 yield from self
.create_alarm_rpc
.register()
555 self
.log
.debug("creating destroy-alarm rpc handler")
556 yield from self
.destroy_alarm_rpc
.register()
558 self
.log
.debug("creating webhook server")
559 loop
= rift
.tasklets
.tornado
.TaskletAsyncIOLoop(asyncio_loop
=self
.loop
)
560 self
.webhooks
= WebhookApplication(self
)
561 self
.server
= tornado
.httpserver
.HTTPServer(
def on_public_ip(self, ip):
    """Record the launchpad's public IP in the tasklet configuration

    Arguments:
        ip - a string containing the public IP address of the launchpad

    """
    settings = self.config
    settings.public_ip = ip
def on_ns_instance_config_update(self, config):
    """Apply an NS instance configuration change

    Only the NFVI polling period is taken from the configuration, and only
    when it is set; otherwise the current polling period is left untouched.

    Arguments:
        config - an NsInstanceConfig object

    """
    period = config.nfvi_polling_period
    if period is None:
        return

    self.config.polling_period = period
def on_cloud_account_create(self, account):
    """Add a newly created cloud account to the monitor

    Arguments:
        account - the cloud account; its cal_account_msg is what gets
                  passed to the monitor

    """
    cal_account = account.cal_account_msg
    self.monitor.add_cloud_account(cal_account)
def on_cloud_account_delete(self, account_name):
    """Remove a deleted cloud account from the monitor

    Arguments:
        account_name - the name of the cloud account to remove

    """
    monitor = self.monitor
    monitor.remove_cloud_account(account_name)
594 self
.webhooks
.listen(WebhookApplication
.DEFAULT_WEBHOOK_PORT
)
def on_instance_started(self):
    """Log that the instance-started callback was received"""
    logger = self.log
    logger.debug("Got instance started callback")
600 def on_dts_state_change(self
, state
):
601 """Handle DTS state change
603 Take action according to current DTS state to transition application
604 into the corresponding application state
607 state - current dts state
611 rwdts
.State
.INIT
: rwdts
.State
.REGN_COMPLETE
,
612 rwdts
.State
.CONFIG
: rwdts
.State
.RUN
,
616 rwdts
.State
.INIT
: self
.init
,
617 rwdts
.State
.RUN
: self
.run
,
620 # Transition application to next state
621 handler
= handlers
.get(state
, None)
622 if handler
is not None:
625 # Transition dts to next state
626 next_state
= switch
.get(state
, None)
627 if next_state
is not None:
628 self
.dts
.handle
.set_state(next_state
)
630 def on_vnfr_create(self
, vnfr
):
631 if not self
.monitor
.nfvi_metrics_available(vnfr
.cloud_account
):
632 msg
= "NFVI metrics unavailable for {}"
633 self
.log
.warning(msg
.format(vnfr
.cloud_account
))
636 self
.monitor
.add_vnfr(vnfr
)
638 # Create NFVI handlers for VDURs
639 for vdur
in vnfr
.vdur
:
640 if vdur
.vim_id
is not None:
641 coro
= self
.register_vdur_nfvi_handler(vnfr
, vdur
)
642 self
.loop
.create_task(coro
)
644 def on_vnfr_update(self
, vnfr
):
645 if not self
.monitor
.nfvi_metrics_available(vnfr
.cloud_account
):
646 msg
= "NFVI metrics unavailable for {}"
647 self
.log
.warning(msg
.format(vnfr
.cloud_account
))
650 self
.monitor
.update_vnfr(vnfr
)
652 # TODO handle the removal of vdurs
653 for vdur
in vnfr
.vdur
:
654 if vdur
.vim_id
is not None:
655 coro
= self
.register_vdur_nfvi_handler(vnfr
, vdur
)
656 self
.loop
.create_task(coro
)
def on_vnfr_delete(self, vnfr):
    """Drop a deleted VNFR from the monitor and tear down its handlers

    Arguments:
        vnfr - the VNFR that has been deleted

    """
    self.monitor.remove_vnfr(vnfr.id)

    # Each VDUR of the VNFR may have an NFVI handler; deregister them all
    for vdur_id in (vdur.id for vdur in vnfr.vdur):
        self.deregister_vdur_nfvi_handler(vdur_id)
def on_retrieve_nfvi_metrics(self, vdur_id):
    """Fetch the NFVI metrics of a VDUR from the monitor

    Arguments:
        vdur_id - the identifier of the VDUR whose metrics are wanted

    Returns:
        Whatever the monitor returns for the VDUR

    """
    metrics = self.monitor.retrieve_nfvi_metrics(vdur_id)
    return metrics
669 def register_vdur_nfvi_handler(self
, vnfr
, vdur
):
670 if vdur
.vim_id
is None:
673 if vdur
.operational_status
!= "running":
676 if vdur
.id not in self
.vdur_handlers
:
677 publisher
= VdurNfviMetricsPublisher(self
, vnfr
, vdur
)
678 yield from publisher
.register()
679 self
.vdur_handlers
[vdur
.id] = publisher
681 def deregister_vdur_nfvi_handler(self
, vdur_id
):
682 if vdur_id
in self
.vdur_handlers
:
683 handler
= self
.vdur_handlers
[vdur_id
]
685 del self
.vdur_handlers
[vdur_id
]
def on_create_alarm(self, account, vdur_id, alarm):
    """Create an alarm through the monitor and return its identifier

    Arguments:
        account - a name of the cloud account used to authenticate the
                  creation of the alarm
        vdur_id - the identifier of VDUR to create the alarm for
        alarm   - a structure defining the alarm that should be created

    Returns:
        An identifier specific to the created alarm

    """
    alarm_id = yield from self.monitor.create_alarm(account, vdur_id, alarm)
    return alarm_id
def on_destroy_alarm(self, account, alarm_id):
    """Destroy the alarm with the given identifier through the monitor

    Arguments:
        account  - the name of the cloud account used to authenticate the
                   destruction of the alarm
        alarm_id - the identifier of the alarm to destroy

    """
    monitor = self.monitor
    yield from monitor.destroy_alarm(account, alarm_id)