3 # Copyright 2016 RIFT.IO Inc
5 # Licensed under the Apache License, Version 2.0 (the "License");
6 # you may not use this file except in compliance with the License.
7 # You may obtain a copy of the License at
9 # http://www.apache.org/licenses/LICENSE-2.0
11 # Unless required by applicable law or agreed to in writing, software
12 # distributed under the License is distributed on an "AS IS" BASIS,
13 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 # See the License for the specific language governing permissions and
15 # limitations under the License.
20 ==================================================
23 --------------------------------------------------
25 The monitoring tasklet consists of several types of data that are associated
26 with one another. The highest level data are the cloud accounts. These objects
27 contain authentication information that is used to retrieve metrics as well as
28 the provider (and hence the available data source platforms).
30 Each cloud account is associated with an NfviMetricsPlugin. This is a
31 one-to-one relationship. The plugin is the interface to the data source that
32 will actually provide the NFVI metrics.
34 Each cloud account is also associated with several VNFRs. Each VNFR, in turn,
35 contains several VDURs. The VDURs represent the level that the NFVI metrics are
36 collected at. However, it is important that the relationships among all these
37 different objects are carefully managed.
40 CloudAccount -------------- NfviMetricsPlugin
58 --------------------------------------------------
60 The monitoring tasklet (the MonitorTasklet class) is primarily responsible for
61 the communicating between DTS and the application (the Monitor class), which
62 provides the logic for managing and interacting with the data model (see
68 import concurrent
.futures
71 import tornado
.httpserver
74 gi
.require_version('RwDts', '1.0')
75 gi
.require_version('RwLog', '1.0')
76 gi
.require_version('RwMonitorYang', '1.0')
77 gi
.require_version('RwLaunchpadYang', '1.0')
78 gi
.require_version('RwNsrYang', '1.0')
79 gi
.require_version('RwVnfrYang', '1.0')
80 gi
.require_version('rwlib', '1.0')
81 gi
.require_version('RwLaunchpadYang', '1.0')
82 from gi
.repository
import (
85 RwMonitorYang
as rwmonitor
,
90 import gi
.repository
.rwlib
as rwlib
92 import rift
.mano
.cloud
93 from rift
.mano
.utils
.project
import (
98 gi
.require_version('RwKeyspec', '1.0')
99 from gi
.repository
.RwKeyspec
import quoted_key
104 class DtsHandler(object):
105 def __init__(self
, project
):
107 self
.project
= project
111 return self
.project
._log
115 return self
.project
._log
_hdl
119 return self
.project
._dts
123 return self
.project
._loop
127 return self
.__class
__.__name
__
129 class VnfrCatalogSubscriber(DtsHandler
):
130 XPATH
= "D,/vnfr:vnfr-catalog/vnfr:vnfr"
135 def on_prepare(xact_info
, action
, ks_path
, msg
):
140 if action
== rwdts
.QueryAction
.CREATE
:
141 self
.tasklet
.on_vnfr_create(msg
)
143 elif action
== rwdts
.QueryAction
.UPDATE
:
144 self
.tasklet
.on_vnfr_update(msg
)
146 elif action
== rwdts
.QueryAction
.DELETE
:
147 self
.tasklet
.on_vnfr_delete(msg
)
149 except Exception as e
:
150 self
.log
.exception(e
)
153 xact_info
.respond_xpath(rwdts
.XactRspCode
.ACK
)
155 handler
= rift
.tasklets
.DTS
.RegistrationHandler(
156 on_prepare
=on_prepare
,
159 with self
.dts
.group_create() as group
:
161 xpath
=self
.project
.add_project(VnfrCatalogSubscriber
.XPATH
),
162 flags
=rwdts
.Flag
.SUBSCRIBER
,
167 class NsInstanceConfigSubscriber(DtsHandler
):
168 XPATH
= "C,/nsr:ns-instance-config"
172 def on_apply(dts
, acg
, xact
, action
, _
):
173 xact_config
= list(self
.reg
.get_xact_elements(xact
))
174 for config
in xact_config
:
175 self
.tasklet
.on_ns_instance_config_update(config
)
177 acg_handler
= rift
.tasklets
.AppConfGroup
.Handler(
181 with self
.dts
.appconf_group_create(acg_handler
) as acg
:
182 self
.reg
= acg
.register(
183 xpath
=self
.project
.add_project(NsInstanceConfigSubscriber
.XPATH
),
184 flags
=rwdts
.Flag
.SUBSCRIBER
,
188 class CloudAccountDtsHandler(DtsHandler
):
189 def __init__(self
, project
):
190 super().__init
__(project
)
191 self
._cloud
_cfg
_subscriber
= None
194 self
.log
.debug("creating cloud account config handler")
195 self
._cloud
_cfg
_subscriber
= rift
.mano
.cloud
.CloudAccountConfigSubscriber(
196 self
.dts
, self
.log
, self
.log_hdl
, self
.project
,
197 rift
.mano
.cloud
.CloudAccountConfigCallbacks(
198 on_add_apply
=self
.tasklet
.on_cloud_account_create
,
199 on_delete_apply
=self
.tasklet
.on_cloud_account_delete
,
202 self
._cloud
_cfg
_subscriber
.register()
205 class VdurNfviMetricsPublisher(DtsHandler
):
207 A VdurNfviMetricsPublisher is responsible for publishing the NFVI metrics
211 XPATH
= "D,/vnfr:vnfr-catalog/vnfr:vnfr[vnfr:id={}]/vnfr:vdur[vnfr:id={}]/rw-vnfr:nfvi-metrics"
213 # This timeout defines the length of time the publisher will wait for a
214 # request to a data source to complete. If the request cannot be completed
215 # before timing out, the current data will be published instead.
218 def __init__(self
, project
, vnfr
, vdur
):
219 """Create an instance of VdurNvfiPublisher
222 tasklet - the tasklet
223 vnfr - the VNFR that contains the VDUR
224 vdur - the VDUR of the VDU whose metrics are published
227 super().__init
__(project
)
232 self
._xpath
= project
.add_project(VdurNfviMetricsPublisher
.XPATH
.format(quoted_key(vnfr
.id), quoted_key(vdur
.id)))
234 self
._deregistered
= asyncio
.Event(loop
=self
.loop
)
238 """The VNFR associated with this publisher"""
243 """The VDUR associated with this publisher"""
248 """The VIM ID of the VDUR associated with this publisher"""
249 return self
._vdur
.vim_id
253 """The XPATH that the metrics are published on"""
257 def dts_on_prepare(self
, xact_info
, action
, ks_path
, msg
):
258 """Handles the DTS on_prepare callback"""
259 self
.log
.debug("{}:dts_on_prepare".format(self
.classname
))
261 if action
== rwdts
.QueryAction
.READ
:
262 # If the publisher has been deregistered, the xpath element has
263 # been deleted. So we do not want to publish the metrics and
264 # re-created the element.
265 if not self
._deregistered
.is_set():
266 metrics
= self
.tasklet
.on_retrieve_nfvi_metrics(self
.vdur
.id)
267 xact_info
.respond_xpath(
268 rwdts
.XactRspCode
.MORE
,
273 xact_info
.respond_xpath(rwdts
.XactRspCode
.ACK
, self
.xpath
)
277 """Register the publisher with DTS"""
278 self
._handle
= yield from self
.dts
.register(
280 handler
=rift
.tasklets
.DTS
.RegistrationHandler(
281 on_prepare
=self
.dts_on_prepare
,
283 flags
=rwdts
.Flag
.PUBLISHER
,
286 def deregister(self
):
287 """Deregister the publisher from DTS"""
288 # Mark the publisher for deregistration. This prevents the publisher
289 # from creating an element after it has been deleted.
290 self
._deregistered
.set()
292 # Now that we are done with the registration handle, delete the element
293 # and tell DTS to deregister it
294 self
._handle
.delete_element(self
.xpath
)
295 self
._handle
.deregister()
299 class LaunchpadConfigDtsSubscriber(DtsHandler
):
301 This class subscribes to the launchpad configuration and alerts the tasklet
302 to any relevant changes.
308 def apply_config(dts
, acg
, xact
, action
, _
):
309 if xact
.xact
is None:
310 # When RIFT first comes up, an INSTALL is called with the current config
311 # Since confd doesn't actally persist data this never has any data so
313 self
.log
.debug("No xact handle. Skipping apply config")
317 cfg
= list(self
.reg
.get_xact_elements(xact
))[0]
318 if cfg
.public_ip
!= self
.tasklet
.public_ip
:
319 yield from self
.tasklet
.on_public_ip(cfg
.public_ip
)
321 except Exception as e
:
322 self
.log
.exception(e
)
325 acg_handler
= rift
.tasklets
.AppConfGroup
.Handler(
326 on_apply
=apply_config
,
329 with self
.dts
.appconf_group_create(acg_handler
) as acg
:
330 self
.reg
= acg
.register(
331 xpath
=self
.project
.add_project("C,/rw-launchpad:launchpad-config"),
332 flags
=rwdts
.Flag
.SUBSCRIBER
,
335 except Exception as e
:
336 self
.log
.exception(e
)
339 class CreateAlarmRPC(DtsHandler
):
341 This class is used to listen for RPC calls to /vnfr:create-alarm, and pass
342 them on to the tasklet.
345 def __init__(self
, project
):
346 super().__init
__(project
)
351 """Register this handler with DTS"""
353 def on_prepare(xact_info
, action
, ks_path
, msg
):
356 if not self
.project
.rpc_check(msg
, xact_info
=xact_info
):
359 response
= VnfrYang
.YangOutput_Vnfr_CreateAlarm()
360 response
.alarm_id
= yield from self
.tasklet
.on_create_alarm(
366 xact_info
.respond_xpath(
367 rwdts
.XactRspCode
.ACK
,
368 "O,/vnfr:create-alarm",
372 except Exception as e
:
373 self
.log
.exception(e
)
374 xact_info
.respond_xpath(rwdts
.XactRspCode
.NACK
)
376 self
._handle
= yield from self
.dts
.register(
377 xpath
="I,/vnfr:create-alarm",
378 handler
=rift
.tasklets
.DTS
.RegistrationHandler(
379 on_prepare
=on_prepare
381 flags
=rwdts
.Flag
.PUBLISHER
,
384 def deregister(self
):
385 """Deregister this handler"""
386 self
._handle
.deregister()
390 class DestroyAlarmRPC(DtsHandler
):
392 This class is used to listen for RPC calls to /vnfr:destroy-alarm, and pass
393 them on to the tasklet.
396 def __init__(self
, project
):
397 super().__init
__(project
)
402 """Register this handler with DTS"""
404 def on_prepare(xact_info
, action
, ks_path
, msg
):
406 if not self
.project
.rpc_check(msg
, xact_info
=xact_info
):
409 yield from self
.tasklet
.on_destroy_alarm(
414 xact_info
.respond_xpath(
415 rwdts
.XactRspCode
.ACK
,
416 "O,/vnfr:destroy-alarm"
419 except Exception as e
:
420 self
.log
.exception(e
)
421 xact_info
.respond_xpath(rwdts
.XactRspCode
.NACK
)
423 self
._handle
= yield from self
.dts
.register(
424 xpath
="I,/vnfr:destroy-alarm",
425 handler
=rift
.tasklets
.DTS
.RegistrationHandler(
426 on_prepare
=on_prepare
428 flags
=rwdts
.Flag
.PUBLISHER
,
431 def deregister(self
):
432 """Deregister this handler"""
433 self
._handle
.deregister()
437 class Delegate(object):
439 This class is used to delegate calls to collections of listener objects.
440 The listeners are expected to conform to the required function arguments,
441 but this is not enforced by the Delegate class itself.
445 self
._listeners
= list()
447 def __call__(self
, *args
, **kwargs
):
448 """Delegate the call to the registered listeners"""
449 for listener
in self
._listeners
:
450 listener(*args
, **kwargs
)
452 def register(self
, listener
):
453 """Register a listener
456 listener - an object that function calls will be delegated to
459 self
._listeners
.append(listener
)
462 class WebhookHandler(tornado
.web
.RequestHandler
):
465 return self
.application
.tasklet
.log
467 def options(self
, *args
, **kargs
):
470 def set_default_headers(self
):
471 self
.set_header('Access-Control-Allow-Origin', '*')
472 self
.set_header('Access-Control-Allow-Headers', 'Content-Type, Cache-Control, Accept, X-Requested-With, Authorization')
473 self
.set_header('Access-Control-Allow-Methods', 'POST')
475 def post(self
, action
, vim_id
):
479 class WebhookApplication(tornado
.web
.Application
):
480 DEFAULT_WEBHOOK_PORT
= 4568
482 def __init__(self
, tasklet
):
483 self
.tasklet
= tasklet
486 (r
"/([^/]+)/([^/]+)/?", WebhookHandler
),
490 class MonitorProject(ManoProject
):
492 def __init__(self
, name
, tasklet
, **kw
):
493 super(MonitorProject
, self
).__init
__(log
, name
)
494 self
._tasklet
= tasklet
495 self
._log
_hdl
= tasklet
.log_hdl
496 self
._dts
= tasklet
.dts
497 self
._loop
= tasklet
.loop
499 self
.vnfr_subscriber
= VnfrCatalogSubscriber(self
)
500 self
.cloud_cfg_subscriber
= CloudAccountDtsHandler(self
)
501 self
.ns_instance_config_subscriber
= NsInstanceConfigSubscriber(self
)
502 self
.launchpad_cfg_subscriber
= LaunchpadConfigDtsSubscriber(self
)
504 self
.config
= core
.InstanceConfiguration()
505 self
.config
.polling_period
= MonitorTasklet
.DEFAULT_POLLING_PERIOD
507 self
.monitor
= core
.Monitor(self
.loop
, self
.log
, self
.config
, self
)
508 self
.vdur_handlers
= dict()
510 self
.create_alarm_rpc
= CreateAlarmRPC(self
)
511 self
.destroy_alarm_rpc
= DestroyAlarmRPC(self
)
515 self
.log
.debug("creating cloud account handler")
516 self
.cloud_cfg_subscriber
.register()
518 self
.log
.debug("creating launchpad config subscriber")
519 yield from self
.launchpad_cfg_subscriber
.register()
521 self
.log
.debug("creating NS instance config subscriber")
522 yield from self
.ns_instance_config_subscriber
.register()
524 self
.log
.debug("creating vnfr subscriber")
525 yield from self
.vnfr_subscriber
.register()
527 self
.log
.debug("creating create-alarm rpc handler")
528 yield from self
.create_alarm_rpc
.register()
530 self
.log
.debug("creating destroy-alarm rpc handler")
531 yield from self
.destroy_alarm_rpc
.register()
535 def polling_period(self
):
536 return self
.config
.polling_period
540 """The public IP of the launchpad"""
541 return self
.config
.public_ip
543 def on_ns_instance_config_update(self
, config
):
544 """Update configuration information
547 config - an NsInstanceConfig object
550 if config
.nfvi_polling_period
is not None:
551 self
.config
.polling_period
= config
.nfvi_polling_period
553 def on_cloud_account_create(self
, account
):
554 self
.monitor
.add_cloud_account(account
.cal_account_msg
)
556 def on_cloud_account_delete(self
, account_name
):
557 self
.monitor
.remove_cloud_account(account_name
)
559 def on_vnfr_create(self
, vnfr
):
561 acc
= vnfr
.cloud_account
562 except AttributeError as e
:
563 self
.log
.warning("NFVI metrics not supported")
566 if not self
.monitor
.nfvi_metrics_available(acc
):
567 msg
= "NFVI metrics unavailable for {}"
568 self
.log
.warning(msg
.format(acc
))
571 self
.monitor
.add_vnfr(vnfr
)
573 # Create NFVI handlers for VDURs
574 for vdur
in vnfr
.vdur
:
575 if vdur
.vim_id
is not None:
576 coro
= self
.register_vdur_nfvi_handler(vnfr
, vdur
)
577 self
.loop
.create_task(coro
)
579 def on_vnfr_update(self
, vnfr
):
581 acc
= vnfr
.cloud_account
582 except AttributeError as e
:
583 self
.log
.warning("NFVI metrics not supported")
586 if not self
.monitor
.nfvi_metrics_available(vnfr
.cloud_account
):
587 msg
= "NFVI metrics unavailable for {}"
588 self
.log
.warning(msg
.format(vnfr
.cloud_account
))
591 self
.monitor
.update_vnfr(vnfr
)
593 # TODO handle the removal of vdurs
594 for vdur
in vnfr
.vdur
:
595 if vdur
.vim_id
is not None:
596 coro
= self
.register_vdur_nfvi_handler(vnfr
, vdur
)
597 self
.loop
.create_task(coro
)
599 def on_vnfr_delete(self
, vnfr
):
600 self
.monitor
.remove_vnfr(vnfr
.id)
602 # Delete any NFVI handlers associated with the VNFR
603 for vdur
in vnfr
.vdur
:
604 self
.deregister_vdur_nfvi_handler(vdur
.id)
606 def on_retrieve_nfvi_metrics(self
, vdur_id
):
607 return self
.monitor
.retrieve_nfvi_metrics(vdur_id
)
610 def register_vdur_nfvi_handler(self
, vnfr
, vdur
):
611 if vdur
.vim_id
is None:
614 if vdur
.operational_status
!= "running":
617 if vdur
.id not in self
.vdur_handlers
:
618 publisher
= VdurNfviMetricsPublisher(self
, vnfr
, vdur
)
619 yield from publisher
.register()
620 self
.vdur_handlers
[vdur
.id] = publisher
622 def deregister_vdur_nfvi_handler(self
, vdur_id
):
623 if vdur_id
in self
.vdur_handlers
:
624 handler
= self
.vdur_handlers
[vdur_id
]
626 del self
.vdur_handlers
[vdur_id
]
630 def on_create_alarm(self
, account
, vdur_id
, alarm
):
631 """Creates an alarm and returns an alarm ID
634 account - a name of the cloud account used to authenticate the
636 vdur_id - the identifier of VDUR to create the alarm for
637 alarm - a structure defining the alarm that should be created
640 An identifier specific to the created alarm
643 return (yield from self
.monitor
.create_alarm(account
, vdur_id
, alarm
))
646 def on_destroy_alarm(self
, account
, alarm_id
):
647 """Destroys an alarm with the specified identifier
650 account - the name of the cloud account used to authenticate the
651 destruction of the alarm
652 alarm_id - the identifier of the alarm to destroy
655 yield from self
.monitor
.destroy_alarm(account
, alarm_id
)
658 def delete_prepare(self
):
659 # Check if any cloud accounts present
660 if self
.cloud_cfg_subscriber
and self
.cloud_cfg_subscriber
._cloud
_cfg
_subscriber
.accounts
:
665 class MonitorTasklet(rift
.tasklets
.Tasklet
):
667 The MonitorTasklet provides a interface for DTS to interact with an
668 instance of the Monitor class. This allows the Monitor class to remain
672 DEFAULT_POLLING_PERIOD
= 1.0
674 def __init__(self
, *args
, **kwargs
):
676 super().__init
__(*args
, **kwargs
)
677 self
.rwlog
.set_category("rw-monitor-log")
679 self
._project
_handler
= None
684 except Exception as e
:
685 self
.log
.exception(e
)
689 self
.log
.info("Starting MonitoringTasklet")
691 self
.log
.debug("Registering with dts")
692 self
.dts
= rift
.tasklets
.DTS(
694 RwLaunchpadYang
.get_schema(),
696 self
.on_dts_state_change
699 self
.log
.debug("Created DTS Api GI Object: %s", self
.dts
)
704 except Exception as e
:
705 self
.log
.exception(e
)
709 self
.log
.debug("creating webhook server")
710 loop
= rift
.tasklets
.tornado
.TaskletAsyncIOLoop(asyncio_loop
=self
.loop
)
711 self
.webhooks
= WebhookApplication(self
)
712 self
.server
= tornado
.httpserver
.HTTPServer(
718 def on_public_ip(self
, ip
):
719 """Store the public IP of the launchpad
722 ip - a string containing the public IP address of the launchpad
725 self
.config
.public_ip
= ip
729 address
= rwlib
.getenv("RWVM_INTERNAL_IPADDR")
730 if (address
is None):
732 self
.webhooks
.listen(WebhookApplication
.DEFAULT_WEBHOOK_PORT
, address
=address
)
734 def on_instance_started(self
):
735 self
.log
.debug("Got instance started callback")
738 def on_dts_state_change(self
, state
):
739 """Handle DTS state change
741 Take action according to current DTS state to transition application
742 into the corresponding application state
745 state - current dts state
749 rwdts
.State
.INIT
: rwdts
.State
.REGN_COMPLETE
,
750 rwdts
.State
.CONFIG
: rwdts
.State
.RUN
,
754 rwdts
.State
.INIT
: self
.init
,
755 rwdts
.State
.RUN
: self
.run
,
758 # Transition application to next state
759 handler
= handlers
.get(state
, None)
760 if handler
is not None:
763 # Transition dts to next state
764 next_state
= switch
.get(state
, None)
765 if next_state
is not None:
766 self
.dts
.handle
.set_state(next_state
)