-#
+#
# Copyright 2016 RIFT.IO Inc
#
# Licensed under the Apache License, Version 2.0 (the "License");
import asyncio
import concurrent.futures
+import gi
import time
-
-import tornado.web
import tornado.httpserver
+import tornado.web
-import gi
gi.require_version('RwDts', '1.0')
gi.require_version('RwLog', '1.0')
gi.require_version('RwMonitorYang', '1.0')
gi.require_version('RwLaunchpadYang', '1.0')
gi.require_version('RwNsrYang', '1.0')
gi.require_version('RwVnfrYang', '1.0')
+gi.require_version('rwlib', '1.0')
gi.require_version('RwLaunchpadYang', '1.0')
from gi.repository import (
RwDts as rwdts,
RwVnfrYang,
VnfrYang,
)
-
+import gi.repository.rwlib as rwlib
import rift.tasklets
import rift.mano.cloud
+from rift.mano.utils.project import (
+ ManoProject,
+ ProjectHandler,
+ )
+
+gi.require_version('RwKeyspec', '1.0')
+from gi.repository.RwKeyspec import quoted_key
from . import core
class DtsHandler(object):
- def __init__(self, tasklet):
+ def __init__(self, project):
self.reg = None
- self.tasklet = tasklet
+ self.project = project
@property
def log(self):
- return self.tasklet.log
+ return self.project._log
@property
def log_hdl(self):
- return self.tasklet.log_hdl
+ return self.project._log_hdl
@property
def dts(self):
- return self.tasklet.dts
+ return self.project._dts
@property
def loop(self):
- return self.tasklet.loop
+ return self.project._loop
@property
def classname(self):
with self.dts.group_create() as group:
group.register(
- xpath=VnfrCatalogSubscriber.XPATH,
+ xpath=self.project.add_project(VnfrCatalogSubscriber.XPATH),
flags=rwdts.Flag.SUBSCRIBER,
handler=handler,
)
with self.dts.appconf_group_create(acg_handler) as acg:
self.reg = acg.register(
- xpath=NsInstanceConfigSubscriber.XPATH,
+ xpath=self.project.add_project(NsInstanceConfigSubscriber.XPATH),
flags=rwdts.Flag.SUBSCRIBER,
)
class CloudAccountDtsHandler(DtsHandler):
- def __init__(self, tasklet):
- super().__init__(tasklet)
+ def __init__(self, project):
+ super().__init__(project)
self._cloud_cfg_subscriber = None
def register(self):
self.log.debug("creating cloud account config handler")
self._cloud_cfg_subscriber = rift.mano.cloud.CloudAccountConfigSubscriber(
- self.dts, self.log, self.log_hdl,
+ self.dts, self.log, self.log_hdl, self.project,
rift.mano.cloud.CloudAccountConfigCallbacks(
on_add_apply=self.tasklet.on_cloud_account_create,
on_delete_apply=self.tasklet.on_cloud_account_delete,
from a single VDU.
"""
- XPATH = "D,/vnfr:vnfr-catalog/vnfr:vnfr[vnfr:id='{}']/vnfr:vdur[vnfr:id='{}']/rw-vnfr:nfvi-metrics"
+ XPATH = "D,/vnfr:vnfr-catalog/vnfr:vnfr[vnfr:id={}]/vnfr:vdur[vnfr:id={}]/rw-vnfr:nfvi-metrics"
# This timeout defines the length of time the publisher will wait for a
# request to a data source to complete. If the request cannot be completed
# before timing out, the current data will be published instead.
TIMEOUT = 2.0
- def __init__(self, tasklet, vnfr, vdur):
+ def __init__(self, project, vnfr, vdur):
"""Create an instance of VdurNvfiPublisher
Arguments:
vdur - the VDUR of the VDU whose metrics are published
"""
- super().__init__(tasklet)
+ super().__init__(project)
self._vnfr = vnfr
self._vdur = vdur
self._handle = None
- self._xpath = VdurNfviMetricsPublisher.XPATH.format(vnfr.id, vdur.id)
+ self._xpath = project.add_project(VdurNfviMetricsPublisher.XPATH.format(quoted_key(vnfr.id), quoted_key(vdur.id)))
self._deregistered = asyncio.Event(loop=self.loop)
with self.dts.appconf_group_create(acg_handler) as acg:
self.reg = acg.register(
- xpath="C,/rw-launchpad:launchpad-config",
+ xpath=self.project.add_project("C,/rw-launchpad:launchpad-config"),
flags=rwdts.Flag.SUBSCRIBER,
)
them on to the tasklet.
"""
- def __init__(self, tasklet):
- super().__init__(tasklet)
+ def __init__(self, project):
+ super().__init__(project)
self._handle = None
@asyncio.coroutine
@asyncio.coroutine
def on_prepare(xact_info, action, ks_path, msg):
try:
+
+ if not self.project.rpc_check(msg, xact_info=xact_info):
+ return
+
response = VnfrYang.YangOutput_Vnfr_CreateAlarm()
response.alarm_id = yield from self.tasklet.on_create_alarm(
msg.cloud_account,
them on to the tasklet.
"""
- def __init__(self, tasklet):
- super().__init__(tasklet)
+ def __init__(self, project):
+ super().__init__(project)
self._handle = None
@asyncio.coroutine
@asyncio.coroutine
def on_prepare(xact_info, action, ks_path, msg):
try:
+ if not self.project.rpc_check(msg, xact_info=xact_info):
+ return
+
yield from self.tasklet.on_destroy_alarm(
msg.cloud_account,
msg.alarm_id,
])
-class MonitorTasklet(rift.tasklets.Tasklet):
- """
- The MonitorTasklet provides a interface for DTS to interact with an
- instance of the Monitor class. This allows the Monitor class to remain
- independent of DTS.
- """
+class MonitorProject(ManoProject):
- DEFAULT_POLLING_PERIOD = 1.0
+ def __init__(self, name, tasklet, **kw):
+ super(MonitorProject, self).__init__(log, name)
+ self._tasklet = tasklet
+ self._log_hdl = tasklet.log_hdl
+ self._dts = tasklet.dts
+ self._loop = tasklet.loop
- def __init__(self, *args, **kwargs):
- try:
- super().__init__(*args, **kwargs)
- self.rwlog.set_category("rw-monitor-log")
+ self.vnfr_subscriber = VnfrCatalogSubscriber(self)
+ self.cloud_cfg_subscriber = CloudAccountDtsHandler(self)
+ self.ns_instance_config_subscriber = NsInstanceConfigSubscriber(self)
+ self.launchpad_cfg_subscriber = LaunchpadConfigDtsSubscriber(self)
- self.vnfr_subscriber = VnfrCatalogSubscriber(self)
- self.cloud_cfg_subscriber = CloudAccountDtsHandler(self)
- self.ns_instance_config_subscriber = NsInstanceConfigSubscriber(self)
- self.launchpad_cfg_subscriber = LaunchpadConfigDtsSubscriber(self)
+ self.config = core.InstanceConfiguration()
+ self.config.polling_period = MonitorTasklet.DEFAULT_POLLING_PERIOD
- self.config = core.InstanceConfiguration()
- self.config.polling_period = MonitorTasklet.DEFAULT_POLLING_PERIOD
+ self.monitor = core.Monitor(self.loop, self.log, self.config, self)
+ self.vdur_handlers = dict()
- self.monitor = core.Monitor(self.loop, self.log, self.config)
- self.vdur_handlers = dict()
-
- self.webhooks = None
- self.create_alarm_rpc = CreateAlarmRPC(self)
- self.destroy_alarm_rpc = DestroyAlarmRPC(self)
-
-
- except Exception as e:
- self.log.exception(e)
-
- @property
- def polling_period(self):
- return self.config.polling_period
-
- @property
- def public_ip(self):
- """The public IP of the launchpad"""
- return self.config.public_ip
-
- def start(self):
- super().start()
- self.log.info("Starting MonitoringTasklet")
-
- self.log.debug("Registering with dts")
- self.dts = rift.tasklets.DTS(
- self.tasklet_info,
- RwLaunchpadYang.get_schema(),
- self.loop,
- self.on_dts_state_change
- )
-
- self.log.debug("Created DTS Api GI Object: %s", self.dts)
-
- def stop(self):
- try:
- self.dts.deinit()
- except Exception as e:
- self.log.exception(e)
+ self.create_alarm_rpc = CreateAlarmRPC(self)
+ self.destroy_alarm_rpc = DestroyAlarmRPC(self)
@asyncio.coroutine
- def init(self):
+ def register (self):
self.log.debug("creating cloud account handler")
self.cloud_cfg_subscriber.register()
self.log.debug("creating destroy-alarm rpc handler")
yield from self.destroy_alarm_rpc.register()
- self.log.debug("creating webhook server")
- loop = rift.tasklets.tornado.TaskletAsyncIOLoop(asyncio_loop=self.loop)
- self.webhooks = WebhookApplication(self)
- self.server = tornado.httpserver.HTTPServer(
- self.webhooks,
- io_loop=loop,
- )
- @asyncio.coroutine
- def on_public_ip(self, ip):
- """Store the public IP of the launchpad
-
- Arguments:
- ip - a string containing the public IP address of the launchpad
+ @property
+ def polling_period(self):
+ return self.config.polling_period
- """
- self.config.public_ip = ip
+ @property
+ def public_ip(self):
+ """The public IP of the launchpad"""
+ return self.config.public_ip
def on_ns_instance_config_update(self, config):
"""Update configuration information
def on_cloud_account_delete(self, account_name):
self.monitor.remove_cloud_account(account_name)
- @asyncio.coroutine
- def run(self):
- self.webhooks.listen(WebhookApplication.DEFAULT_WEBHOOK_PORT)
-
- def on_instance_started(self):
- self.log.debug("Got instance started callback")
-
- @asyncio.coroutine
- def on_dts_state_change(self, state):
- """Handle DTS state change
-
- Take action according to current DTS state to transition application
- into the corresponding application state
-
- Arguments
- state - current dts state
-
- """
- switch = {
- rwdts.State.INIT: rwdts.State.REGN_COMPLETE,
- rwdts.State.CONFIG: rwdts.State.RUN,
- }
-
- handlers = {
- rwdts.State.INIT: self.init,
- rwdts.State.RUN: self.run,
- }
-
- # Transition application to next state
- handler = handlers.get(state, None)
- if handler is not None:
- yield from handler()
-
- # Transition dts to next state
- next_state = switch.get(state, None)
- if next_state is not None:
- self.dts.handle.set_state(next_state)
-
def on_vnfr_create(self, vnfr):
- if not self.monitor.nfvi_metrics_available(vnfr.cloud_account):
+ try:
+ acc = vnfr.cloud_account
+ except AttributeError as e:
+ self.log.warning("NFVI metrics not supported")
+ return
+
+ if not self.monitor.nfvi_metrics_available(acc):
msg = "NFVI metrics unavailable for {}"
- self.log.warning(msg.format(vnfr.cloud_account))
+ self.log.warning(msg.format(acc))
return
self.monitor.add_vnfr(vnfr)
self.loop.create_task(coro)
def on_vnfr_update(self, vnfr):
+ try:
+ acc = vnfr.cloud_account
+ except AttributeError as e:
+ self.log.warning("NFVI metrics not supported")
+ return
+
if not self.monitor.nfvi_metrics_available(vnfr.cloud_account):
msg = "NFVI metrics unavailable for {}"
self.log.warning(msg.format(vnfr.cloud_account))
"""
yield from self.monitor.destroy_alarm(account, alarm_id)
+
+ @asyncio.coroutine
+ def delete_prepare(self):
+ # Check if any cloud accounts present
+ if self.cloud_cfg_subscriber and self.cloud_cfg_subscriber._cloud_cfg_subscriber.accounts:
+ return False
+ return True
+
+
+class MonitorTasklet(rift.tasklets.Tasklet):
+ """
+ The MonitorTasklet provides a interface for DTS to interact with an
+ instance of the Monitor class. This allows the Monitor class to remain
+ independent of DTS.
+ """
+
+ DEFAULT_POLLING_PERIOD = 1.0
+
+ def __init__(self, *args, **kwargs):
+ try:
+ super().__init__(*args, **kwargs)
+ self.rwlog.set_category("rw-monitor-log")
+
+ self._project_handler = None
+ self.projects = {}
+
+ self.webhooks = None
+
+ except Exception as e:
+ self.log.exception(e)
+
+ def start(self):
+ super().start()
+ self.log.info("Starting MonitoringTasklet")
+
+ self.log.debug("Registering with dts")
+ self.dts = rift.tasklets.DTS(
+ self.tasklet_info,
+ RwLaunchpadYang.get_schema(),
+ self.loop,
+ self.on_dts_state_change
+ )
+
+ self.log.debug("Created DTS Api GI Object: %s", self.dts)
+
+ def stop(self):
+ try:
+ self.dts.deinit()
+ except Exception as e:
+ self.log.exception(e)
+
+ @asyncio.coroutine
+ def init(self):
+ self.log.debug("creating webhook server")
+ loop = rift.tasklets.tornado.TaskletAsyncIOLoop(asyncio_loop=self.loop)
+ self.webhooks = WebhookApplication(self)
+ self.server = tornado.httpserver.HTTPServer(
+ self.webhooks,
+ io_loop=loop,
+ )
+
+ @asyncio.coroutine
+ def on_public_ip(self, ip):
+ """Store the public IP of the launchpad
+
+ Arguments:
+ ip - a string containing the public IP address of the launchpad
+
+ """
+ self.config.public_ip = ip
+
+ @asyncio.coroutine
+ def run(self):
+ address = rwlib.getenv("RWVM_INTERNAL_IPADDR")
+ if (address is None):
+ address=""
+ self.webhooks.listen(WebhookApplication.DEFAULT_WEBHOOK_PORT, address=address)
+
+ def on_instance_started(self):
+ self.log.debug("Got instance started callback")
+
+ @asyncio.coroutine
+ def on_dts_state_change(self, state):
+ """Handle DTS state change
+
+ Take action according to current DTS state to transition application
+ into the corresponding application state
+
+ Arguments
+ state - current dts state
+
+ """
+ switch = {
+ rwdts.State.INIT: rwdts.State.REGN_COMPLETE,
+ rwdts.State.CONFIG: rwdts.State.RUN,
+ }
+
+ handlers = {
+ rwdts.State.INIT: self.init,
+ rwdts.State.RUN: self.run,
+ }
+
+ # Transition application to next state
+ handler = handlers.get(state, None)
+ if handler is not None:
+ yield from handler()
+
+ # Transition dts to next state
+ next_state = switch.get(state, None)
+ if next_state is not None:
+ self.dts.handle.set_state(next_state)
+