3 # Copyright 2016 RIFT.IO Inc
5 # Licensed under the Apache License, Version 2.0 (the "License");
6 # you may not use this file except in compliance with the License.
7 # You may obtain a copy of the License at
9 # http://www.apache.org/licenses/LICENSE-2.0
11 # Unless required by applicable law or agreed to in writing, software
12 # distributed under the License is distributed on an "AS IS" BASIS,
13 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 # See the License for the specific language governing permissions and
15 # limitations under the License.
20 ==================================================
23 --------------------------------------------------
25 The monitoring tasklet consists of several types of data that are associated
26 with one another. The highest level data are the cloud accounts. These objects
27 contain authentication information that is used to retrieve metrics as well as
28 the provider (and hence the available data source platforms).
30 Each cloud account is associated with an NfviMetricsPlugin. This is a
31 one-to-one relationship. The plugin is the interface to the data source that
32 will actually provide the NFVI metrics.
34 Each cloud account is also associated with several VNFRs. Each VNFR, in turn,
35 contains several VDURs. The VDURs represent the level that the NFVI metrics are
36 collected at. However, it is important that the relationships among all these
37 different objects are carefully managed.
40 CloudAccount -------------- NfviMetricsPlugin
58 --------------------------------------------------
60 The monitoring tasklet (the MonitorTasklet class) is primarily responsible for
61 the communicating between DTS and the application (the Monitor class), which
62 provides the logic for managing and interacting with the data model (see
68 import concurrent
.futures
72 import tornado
.httpserver
75 gi
.require_version('RwDts', '1.0')
76 gi
.require_version('RwLog', '1.0')
77 gi
.require_version('RwMonitorYang', '1.0')
78 gi
.require_version('RwLaunchpadYang', '1.0')
79 gi
.require_version('RwNsrYang', '1.0')
80 gi
.require_version('RwVnfrYang', '1.0')
81 gi
.require_version('RwLaunchpadYang', '1.0')
82 from gi
.repository
import (
85 RwMonitorYang
as rwmonitor
,
92 import rift
.mano
.cloud
93 from rift
.mano
.utils
.project
import (
101 class DtsHandler(object):
102 def __init__(self
, project
):
104 self
.project
= project
108 return self
.project
._log
112 return self
.project
._log
_hdl
116 return self
.project
._dts
120 return self
.project
._loop
124 return self
.__class
__.__name
__
126 class VnfrCatalogSubscriber(DtsHandler
):
127 XPATH
= "D,/vnfr:vnfr-catalog/vnfr:vnfr"
132 def on_prepare(xact_info
, action
, ks_path
, msg
):
137 if action
== rwdts
.QueryAction
.CREATE
:
138 self
.tasklet
.on_vnfr_create(msg
)
140 elif action
== rwdts
.QueryAction
.UPDATE
:
141 self
.tasklet
.on_vnfr_update(msg
)
143 elif action
== rwdts
.QueryAction
.DELETE
:
144 self
.tasklet
.on_vnfr_delete(msg
)
146 except Exception as e
:
147 self
.log
.exception(e
)
150 xact_info
.respond_xpath(rwdts
.XactRspCode
.ACK
)
152 handler
= rift
.tasklets
.DTS
.RegistrationHandler(
153 on_prepare
=on_prepare
,
156 with self
.dts
.group_create() as group
:
158 xpath
=self
.project
.add_project(VnfrCatalogSubscriber
.XPATH
),
159 flags
=rwdts
.Flag
.SUBSCRIBER
,
164 class NsInstanceConfigSubscriber(DtsHandler
):
165 XPATH
= "C,/nsr:ns-instance-config"
169 def on_apply(dts
, acg
, xact
, action
, _
):
170 xact_config
= list(self
.reg
.get_xact_elements(xact
))
171 for config
in xact_config
:
172 self
.tasklet
.on_ns_instance_config_update(config
)
174 acg_handler
= rift
.tasklets
.AppConfGroup
.Handler(
178 with self
.dts
.appconf_group_create(acg_handler
) as acg
:
179 self
.reg
= acg
.register(
180 xpath
=self
.project
.add_project(NsInstanceConfigSubscriber
.XPATH
),
181 flags
=rwdts
.Flag
.SUBSCRIBER
,
185 class CloudAccountDtsHandler(DtsHandler
):
186 def __init__(self
, project
):
187 super().__init
__(project
)
188 self
._cloud
_cfg
_subscriber
= None
191 self
.log
.debug("creating cloud account config handler")
192 self
._cloud
_cfg
_subscriber
= rift
.mano
.cloud
.CloudAccountConfigSubscriber(
193 self
.dts
, self
.log
, self
.log_hdl
, self
.project
,
194 rift
.mano
.cloud
.CloudAccountConfigCallbacks(
195 on_add_apply
=self
.tasklet
.on_cloud_account_create
,
196 on_delete_apply
=self
.tasklet
.on_cloud_account_delete
,
199 self
._cloud
_cfg
_subscriber
.register()
202 class VdurNfviMetricsPublisher(DtsHandler
):
204 A VdurNfviMetricsPublisher is responsible for publishing the NFVI metrics
208 XPATH
= "D,/vnfr:vnfr-catalog/vnfr:vnfr[vnfr:id='{}']/vnfr:vdur[vnfr:id='{}']/rw-vnfr:nfvi-metrics"
210 # This timeout defines the length of time the publisher will wait for a
211 # request to a data source to complete. If the request cannot be completed
212 # before timing out, the current data will be published instead.
215 def __init__(self
, project
, vnfr
, vdur
):
216 """Create an instance of VdurNvfiPublisher
219 tasklet - the tasklet
220 vnfr - the VNFR that contains the VDUR
221 vdur - the VDUR of the VDU whose metrics are published
224 super().__init
__(project
)
229 self
._xpath
= project
.add_project(VdurNfviMetricsPublisher
.XPATH
.format(vnfr
.id, vdur
.id))
231 self
._deregistered
= asyncio
.Event(loop
=self
.loop
)
235 """The VNFR associated with this publisher"""
240 """The VDUR associated with this publisher"""
245 """The VIM ID of the VDUR associated with this publisher"""
246 return self
._vdur
.vim_id
250 """The XPATH that the metrics are published on"""
254 def dts_on_prepare(self
, xact_info
, action
, ks_path
, msg
):
255 """Handles the DTS on_prepare callback"""
256 self
.log
.debug("{}:dts_on_prepare".format(self
.classname
))
258 if action
== rwdts
.QueryAction
.READ
:
259 # If the publisher has been deregistered, the xpath element has
260 # been deleted. So we do not want to publish the metrics and
261 # re-created the element.
262 if not self
._deregistered
.is_set():
263 metrics
= self
.tasklet
.on_retrieve_nfvi_metrics(self
.vdur
.id)
264 xact_info
.respond_xpath(
265 rwdts
.XactRspCode
.MORE
,
270 xact_info
.respond_xpath(rwdts
.XactRspCode
.ACK
, self
.xpath
)
274 """Register the publisher with DTS"""
275 self
._handle
= yield from self
.dts
.register(
277 handler
=rift
.tasklets
.DTS
.RegistrationHandler(
278 on_prepare
=self
.dts_on_prepare
,
280 flags
=rwdts
.Flag
.PUBLISHER
,
283 def deregister(self
):
284 """Deregister the publisher from DTS"""
285 # Mark the publisher for deregistration. This prevents the publisher
286 # from creating an element after it has been deleted.
287 self
._deregistered
.set()
289 # Now that we are done with the registration handle, delete the element
290 # and tell DTS to deregister it
291 self
._handle
.delete_element(self
.xpath
)
292 self
._handle
.deregister()
296 class LaunchpadConfigDtsSubscriber(DtsHandler
):
298 This class subscribes to the launchpad configuration and alerts the tasklet
299 to any relevant changes.
305 def apply_config(dts
, acg
, xact
, action
, _
):
306 if xact
.xact
is None:
307 # When RIFT first comes up, an INSTALL is called with the current config
308 # Since confd doesn't actally persist data this never has any data so
310 self
.log
.debug("No xact handle. Skipping apply config")
314 cfg
= list(self
.reg
.get_xact_elements(xact
))[0]
315 if cfg
.public_ip
!= self
.tasklet
.public_ip
:
316 yield from self
.tasklet
.on_public_ip(cfg
.public_ip
)
318 except Exception as e
:
319 self
.log
.exception(e
)
322 acg_handler
= rift
.tasklets
.AppConfGroup
.Handler(
323 on_apply
=apply_config
,
326 with self
.dts
.appconf_group_create(acg_handler
) as acg
:
327 self
.reg
= acg
.register(
328 xpath
=self
.project
.add_project("C,/rw-launchpad:launchpad-config"),
329 flags
=rwdts
.Flag
.SUBSCRIBER
,
332 except Exception as e
:
333 self
.log
.exception(e
)
336 class CreateAlarmRPC(DtsHandler
):
338 This class is used to listen for RPC calls to /vnfr:create-alarm, and pass
339 them on to the tasklet.
342 def __init__(self
, project
):
343 super().__init
__(project
)
348 """Register this handler with DTS"""
350 def on_prepare(xact_info
, action
, ks_path
, msg
):
353 if not self
.project
.rpc_check(msg
, xact_info
=xact_info
):
356 response
= VnfrYang
.YangOutput_Vnfr_CreateAlarm()
357 response
.alarm_id
= yield from self
.tasklet
.on_create_alarm(
363 xact_info
.respond_xpath(
364 rwdts
.XactRspCode
.ACK
,
365 "O,/vnfr:create-alarm",
369 except Exception as e
:
370 self
.log
.exception(e
)
371 xact_info
.respond_xpath(rwdts
.XactRspCode
.NACK
)
373 self
._handle
= yield from self
.dts
.register(
374 xpath
="I,/vnfr:create-alarm",
375 handler
=rift
.tasklets
.DTS
.RegistrationHandler(
376 on_prepare
=on_prepare
378 flags
=rwdts
.Flag
.PUBLISHER
,
381 def deregister(self
):
382 """Deregister this handler"""
383 self
._handle
.deregister()
387 class DestroyAlarmRPC(DtsHandler
):
389 This class is used to listen for RPC calls to /vnfr:destroy-alarm, and pass
390 them on to the tasklet.
393 def __init__(self
, project
):
394 super().__init
__(project
)
399 """Register this handler with DTS"""
401 def on_prepare(xact_info
, action
, ks_path
, msg
):
403 if not self
.project
.rpc_check(msg
, xact_info
=xact_info
):
406 yield from self
.tasklet
.on_destroy_alarm(
411 xact_info
.respond_xpath(
412 rwdts
.XactRspCode
.ACK
,
413 "O,/vnfr:destroy-alarm"
416 except Exception as e
:
417 self
.log
.exception(e
)
418 xact_info
.respond_xpath(rwdts
.XactRspCode
.NACK
)
420 self
._handle
= yield from self
.dts
.register(
421 xpath
="I,/vnfr:destroy-alarm",
422 handler
=rift
.tasklets
.DTS
.RegistrationHandler(
423 on_prepare
=on_prepare
425 flags
=rwdts
.Flag
.PUBLISHER
,
428 def deregister(self
):
429 """Deregister this handler"""
430 self
._handle
.deregister()
434 class Delegate(object):
436 This class is used to delegate calls to collections of listener objects.
437 The listeners are expected to conform to the required function arguments,
438 but this is not enforced by the Delegate class itself.
442 self
._listeners
= list()
444 def __call__(self
, *args
, **kwargs
):
445 """Delegate the call to the registered listeners"""
446 for listener
in self
._listeners
:
447 listener(*args
, **kwargs
)
449 def register(self
, listener
):
450 """Register a listener
453 listener - an object that function calls will be delegated to
456 self
._listeners
.append(listener
)
459 class WebhookHandler(tornado
.web
.RequestHandler
):
462 return self
.application
.tasklet
.log
464 def options(self
, *args
, **kargs
):
467 def set_default_headers(self
):
468 self
.set_header('Access-Control-Allow-Origin', '*')
469 self
.set_header('Access-Control-Allow-Headers', 'Content-Type, Cache-Control, Accept, X-Requested-With, Authorization')
470 self
.set_header('Access-Control-Allow-Methods', 'POST')
472 def post(self
, action
, vim_id
):
476 class WebhookApplication(tornado
.web
.Application
):
477 DEFAULT_WEBHOOK_PORT
= 4568
479 def __init__(self
, tasklet
):
480 self
.tasklet
= tasklet
483 (r
"/([^/]+)/([^/]+)/?", WebhookHandler
),
487 class MonitorProject(ManoProject
):
489 def __init__(self
, name
, tasklet
, **kw
):
490 super(MonitorProject
, self
).__init
__(log
, name
)
491 self
._tasklet
= tasklet
492 self
._log
_hdl
= tasklet
.log_hdl
493 self
._dts
= tasklet
.dts
494 self
._loop
= tasklet
.loop
496 self
.vnfr_subscriber
= VnfrCatalogSubscriber(self
)
497 self
.cloud_cfg_subscriber
= CloudAccountDtsHandler(self
)
498 self
.ns_instance_config_subscriber
= NsInstanceConfigSubscriber(self
)
499 self
.launchpad_cfg_subscriber
= LaunchpadConfigDtsSubscriber(self
)
501 self
.config
= core
.InstanceConfiguration()
502 self
.config
.polling_period
= MonitorTasklet
.DEFAULT_POLLING_PERIOD
504 self
.monitor
= core
.Monitor(self
.loop
, self
.log
, self
.config
, self
)
505 self
.vdur_handlers
= dict()
507 self
.create_alarm_rpc
= CreateAlarmRPC(self
)
508 self
.destroy_alarm_rpc
= DestroyAlarmRPC(self
)
512 self
.log
.debug("creating cloud account handler")
513 self
.cloud_cfg_subscriber
.register()
515 self
.log
.debug("creating launchpad config subscriber")
516 yield from self
.launchpad_cfg_subscriber
.register()
518 self
.log
.debug("creating NS instance config subscriber")
519 yield from self
.ns_instance_config_subscriber
.register()
521 self
.log
.debug("creating vnfr subscriber")
522 yield from self
.vnfr_subscriber
.register()
524 self
.log
.debug("creating create-alarm rpc handler")
525 yield from self
.create_alarm_rpc
.register()
527 self
.log
.debug("creating destroy-alarm rpc handler")
528 yield from self
.destroy_alarm_rpc
.register()
532 def polling_period(self
):
533 return self
.config
.polling_period
537 """The public IP of the launchpad"""
538 return self
.config
.public_ip
540 def on_ns_instance_config_update(self
, config
):
541 """Update configuration information
544 config - an NsInstanceConfig object
547 if config
.nfvi_polling_period
is not None:
548 self
.config
.polling_period
= config
.nfvi_polling_period
550 def on_cloud_account_create(self
, account
):
551 self
.monitor
.add_cloud_account(account
.cal_account_msg
)
553 def on_cloud_account_delete(self
, account_name
):
554 self
.monitor
.remove_cloud_account(account_name
)
556 def on_vnfr_create(self
, vnfr
):
557 if not self
.monitor
.nfvi_metrics_available(vnfr
.cloud_account
):
558 msg
= "NFVI metrics unavailable for {}"
559 self
.log
.warning(msg
.format(vnfr
.cloud_account
))
562 self
.monitor
.add_vnfr(vnfr
)
564 # Create NFVI handlers for VDURs
565 for vdur
in vnfr
.vdur
:
566 if vdur
.vim_id
is not None:
567 coro
= self
.register_vdur_nfvi_handler(vnfr
, vdur
)
568 self
.loop
.create_task(coro
)
570 def on_vnfr_update(self
, vnfr
):
571 if not self
.monitor
.nfvi_metrics_available(vnfr
.cloud_account
):
572 msg
= "NFVI metrics unavailable for {}"
573 self
.log
.warning(msg
.format(vnfr
.cloud_account
))
576 self
.monitor
.update_vnfr(vnfr
)
578 # TODO handle the removal of vdurs
579 for vdur
in vnfr
.vdur
:
580 if vdur
.vim_id
is not None:
581 coro
= self
.register_vdur_nfvi_handler(vnfr
, vdur
)
582 self
.loop
.create_task(coro
)
584 def on_vnfr_delete(self
, vnfr
):
585 self
.monitor
.remove_vnfr(vnfr
.id)
587 # Delete any NFVI handlers associated with the VNFR
588 for vdur
in vnfr
.vdur
:
589 self
.deregister_vdur_nfvi_handler(vdur
.id)
591 def on_retrieve_nfvi_metrics(self
, vdur_id
):
592 return self
.monitor
.retrieve_nfvi_metrics(vdur_id
)
595 def register_vdur_nfvi_handler(self
, vnfr
, vdur
):
596 if vdur
.vim_id
is None:
599 if vdur
.operational_status
!= "running":
602 if vdur
.id not in self
.vdur_handlers
:
603 publisher
= VdurNfviMetricsPublisher(self
, vnfr
, vdur
)
604 yield from publisher
.register()
605 self
.vdur_handlers
[vdur
.id] = publisher
607 def deregister_vdur_nfvi_handler(self
, vdur_id
):
608 if vdur_id
in self
.vdur_handlers
:
609 handler
= self
.vdur_handlers
[vdur_id
]
611 del self
.vdur_handlers
[vdur_id
]
615 def on_create_alarm(self
, account
, vdur_id
, alarm
):
616 """Creates an alarm and returns an alarm ID
619 account - a name of the cloud account used to authenticate the
621 vdur_id - the identifier of VDUR to create the alarm for
622 alarm - a structure defining the alarm that should be created
625 An identifier specific to the created alarm
628 return (yield from self
.monitor
.create_alarm(account
, vdur_id
, alarm
))
631 def on_destroy_alarm(self
, account
, alarm_id
):
632 """Destroys an alarm with the specified identifier
635 account - the name of the cloud account used to authenticate the
636 destruction of the alarm
637 alarm_id - the identifier of the alarm to destroy
640 yield from self
.monitor
.destroy_alarm(account
, alarm_id
)
643 class MonitorTasklet(rift
.tasklets
.Tasklet
):
645 The MonitorTasklet provides a interface for DTS to interact with an
646 instance of the Monitor class. This allows the Monitor class to remain
650 DEFAULT_POLLING_PERIOD
= 1.0
652 def __init__(self
, *args
, **kwargs
):
654 super().__init
__(*args
, **kwargs
)
655 self
.rwlog
.set_category("rw-monitor-log")
657 self
._project
_handler
= None
662 except Exception as e
:
663 self
.log
.exception(e
)
667 self
.log
.info("Starting MonitoringTasklet")
669 self
.log
.debug("Registering with dts")
670 self
.dts
= rift
.tasklets
.DTS(
672 RwLaunchpadYang
.get_schema(),
674 self
.on_dts_state_change
677 self
.log
.debug("Created DTS Api GI Object: %s", self
.dts
)
682 except Exception as e
:
683 self
.log
.exception(e
)
687 self
.log
.debug("creating webhook server")
688 loop
= rift
.tasklets
.tornado
.TaskletAsyncIOLoop(asyncio_loop
=self
.loop
)
689 self
.webhooks
= WebhookApplication(self
)
690 self
.server
= tornado
.httpserver
.HTTPServer(
696 def on_public_ip(self
, ip
):
697 """Store the public IP of the launchpad
700 ip - a string containing the public IP address of the launchpad
703 self
.config
.public_ip
= ip
707 self
.webhooks
.listen(WebhookApplication
.DEFAULT_WEBHOOK_PORT
)
709 def on_instance_started(self
):
710 self
.log
.debug("Got instance started callback")
713 def on_dts_state_change(self
, state
):
714 """Handle DTS state change
716 Take action according to current DTS state to transition application
717 into the corresponding application state
720 state - current dts state
724 rwdts
.State
.INIT
: rwdts
.State
.REGN_COMPLETE
,
725 rwdts
.State
.CONFIG
: rwdts
.State
.RUN
,
729 rwdts
.State
.INIT
: self
.init
,
730 rwdts
.State
.RUN
: self
.run
,
733 # Transition application to next state
734 handler
= handlers
.get(state
, None)
735 if handler
is not None:
738 # Transition dts to next state
739 next_state
= switch
.get(state
, None)
740 if next_state
is not None:
741 self
.dts
.handle
.set_state(next_state
)