--- /dev/null
+#!/usr/bin/env python3
+
+#
+# Copyright 2016 RIFT.IO Inc
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+
+import argparse
+import asyncio
+import concurrent.futures
+import logging
+import os
+import sys
+import time
+import unittest
+import uuid
+import xmlrunner
+
+import gi
+gi.require_version('NsrYang', '1.0')
+gi.require_version('RwcalYang', '1.0')
+gi.require_version('RwmonYang', '1.0')
+gi.require_version('RwVnfrYang', '1.0')
+gi.require_version('RwTypes', '1.0')
+gi.require_version('RwMon', '1.0')
+
+from gi.repository import (
+ NsrYang,
+ RwTypes,
+ RwVnfrYang,
+ RwcalYang,
+ RwmonYang,
+ VnfrYang,
+ )
+
+from rift.tasklets.rwmonitor.core import (
+ AccountAlreadyRegisteredError,
+ AccountInUseError,
+ InstanceConfiguration,
+ Monitor,
+ NfviInterface,
+ NfviMetrics,
+ NfviMetricsCache,
+ NfviMetricsPluginManager,
+ PluginFactory,
+ PluginNotSupportedError,
+ PluginUnavailableError,
+ UnknownAccountError,
+ )
+import rw_peas
+
+
+class wait_for_pending_tasks(object):
+ """
+ This class defines a decorator that can be used to ensure that any asyncio
+ tasks created as a side-effect of coroutine are allowed to come to
+ completion.
+ """
+
+ def __init__(self, loop, timeout=1):
+ self.loop = loop
+ self.timeout = timeout
+
+ def __call__(self, coro):
+ @asyncio.coroutine
+ def impl():
+ original = self.pending_tasks()
+ result = yield from coro()
+
+ current = self.pending_tasks()
+ remaining = current - original
+
+ if remaining:
+ yield from asyncio.wait(
+ remaining,
+ timeout=self.timeout,
+ loop=self.loop,
+ )
+
+ return result
+
+ return impl
+
+ def pending_tasks(self):
+ return {t for t in asyncio.Task.all_tasks(loop=self.loop) if not t.done()}
+
+
+class MockTasklet(object):
+ def __init__(self, dts, log, loop, records):
+ self.dts = dts
+ self.log = log
+ self.loop = loop
+ self.records = records
+ self.polling_period = 0
+ self.executor = concurrent.futures.ThreadPoolExecutor(max_workers=16)
+
+
+def make_nsr(ns_instance_config_ref=str(uuid.uuid4())):
+ nsr = NsrYang.YangData_Nsr_NsInstanceOpdata_Nsr()
+ nsr.ns_instance_config_ref = ns_instance_config_ref
+ return nsr
+
+def make_vnfr(id=str(uuid.uuid4())):
+ vnfr = VnfrYang.YangData_Vnfr_VnfrCatalog_Vnfr()
+ vnfr.id = id
+ return vnfr
+
+def make_vdur(id=str(uuid.uuid4()), vim_id=str(uuid.uuid4())):
+ vdur = VnfrYang.YangData_Vnfr_VnfrCatalog_Vnfr_Vdur()
+ vdur.id = id
+ vdur.vim_id = vim_id
+ return vdur
+
+
+class TestNfviMetricsCache(unittest.TestCase):
+ class Plugin(object):
+ def nfvi_metrics_available(self, cloud_account):
+ return True
+
+ def nfvi_metrics(self, account, vim_id):
+ metrics = RwmonYang.NfviMetrics()
+ metrics.vcpu.utilization = 0.5
+ return metrics
+
+ def setUp(self):
+ self.loop = asyncio.new_event_loop()
+ self.logger = logging.getLogger('test-logger')
+
+ self.account = RwcalYang.CloudAccount(
+ name='test-cloud-account',
+ account_type="mock",
+ )
+
+ self.plugin_manager = NfviMetricsPluginManager(self.logger)
+ self.plugin_manager.register(self.account, "mock")
+
+ mock = self.plugin_manager.plugin(self.account.name)
+ mock.set_impl(TestNfviMetricsCache.Plugin())
+
+ self.vdur = VnfrYang.YangData_Vnfr_VnfrCatalog_Vnfr_Vdur()
+ self.vdur.id = "test-vdur-id"
+ self.vdur.vim_id = "test-vim-id"
+ self.vdur.vm_flavor.vcpu_count = 4
+ self.vdur.vm_flavor.memory_mb = 1
+ self.vdur.vm_flavor.storage_gb = 1
+
+ def test_create_destroy_entry(self):
+ cache = NfviMetricsCache(self.logger, self.loop, self.plugin_manager)
+ self.assertEqual(len(cache._nfvi_metrics), 0)
+
+ cache.create_entry(self.account, self.vdur)
+ self.assertEqual(len(cache._nfvi_metrics), 1)
+
+ cache.destroy_entry(self.vdur.id)
+ self.assertEqual(len(cache._nfvi_metrics), 0)
+
+ def test_retrieve(self):
+ NfviMetrics.SAMPLE_INTERVAL = 1
+
+ cache = NfviMetricsCache(self.logger, self.loop, self.plugin_manager)
+ cache.create_entry(self.account, self.vdur)
+
+ @wait_for_pending_tasks(self.loop)
+ @asyncio.coroutine
+ def retrieve_metrics():
+ metrics = cache.retrieve("test-vim-id")
+ self.assertEqual(metrics.vcpu.utilization, 0.0)
+
+ yield from asyncio.sleep(NfviMetrics.SAMPLE_INTERVAL, loop=self.loop)
+
+ metrics = cache.retrieve("test-vim-id")
+ self.assertEqual(metrics.vcpu.utilization, 0.5)
+
+ self.loop.run_until_complete(retrieve_metrics())
+
+ def test_id_mapping(self):
+ cache = NfviMetricsCache(self.logger, self.loop, self.plugin_manager)
+
+ cache.create_entry(self.account, self.vdur)
+
+ self.assertEqual(cache.to_vim_id(self.vdur.id), self.vdur.vim_id)
+ self.assertEqual(cache.to_vdur_id(self.vdur.vim_id), self.vdur.id)
+ self.assertTrue(cache.contains_vdur_id(self.vdur.id))
+ self.assertTrue(cache.contains_vim_id(self.vdur.vim_id))
+
+ cache.destroy_entry(self.vdur.id)
+
+ self.assertFalse(cache.contains_vdur_id(self.vdur.id))
+ self.assertFalse(cache.contains_vim_id(self.vdur.vim_id))
+
+
+class TestNfviMetrics(unittest.TestCase):
+ class Plugin(object):
+ def nfvi_metrics_available(self, cloud_account):
+ return True
+
+ def nfvi_metrics(self, account, vim_id):
+ metrics = RwVnfrYang.YangData_Vnfr_VnfrCatalog_Vnfr_Vdur_NfviMetrics()
+ metrics.vcpu.utilization = 0.5
+ return None, metrics
+
+ def setUp(self):
+ self.loop = asyncio.new_event_loop()
+ self.account = RwcalYang.CloudAccount(
+ name='test-cloud-account',
+ account_type="mock",
+ )
+
+ self.plugin = TestNfviMetrics.Plugin()
+ self.logger = logging.getLogger('test-logger')
+
+ self.vdur = make_vdur()
+ self.vdur.vm_flavor.vcpu_count = 4
+ self.vdur.vm_flavor.memory_mb = 100
+ self.vdur.vm_flavor.storage_gb = 2
+ self.vdur.vim_id = 'test-vim-id'
+
+ def test_update(self):
+ nfvi_metrics = NfviMetrics(
+ self.logger,
+ self.loop,
+ self.account,
+ self.plugin,
+ self.vdur,
+ )
+
+ # Reduce the SAMPLE_INTERVAL so that the test does not take a long time
+ nfvi_metrics.SAMPLE_INTERVAL = 1
+
+ # The metrics have never been retrieved so they should be updated
+ self.assertTrue(nfvi_metrics.should_update())
+
+ # The metrics return will be empty because the cache version is empty.
+ # However, this trigger an update to retrieve metrics from the plugin.
+ metrics = nfvi_metrics.retrieve()
+ self.assertEqual(metrics.vcpu.utilization, 0.0)
+
+ # An update has been trigger by the retrieve call so additional updates
+ # should not happen
+ self.assertFalse(nfvi_metrics.should_update())
+ self.assertFalse(nfvi_metrics._updating.done())
+
+ # Allow the event loop to run until the update is complete
+ @asyncio.coroutine
+ @wait_for_pending_tasks(self.loop)
+ def wait_for_update():
+ yield from asyncio.wait_for(
+ nfvi_metrics._updating,
+ timeout=2,
+ loop=self.loop,
+ )
+
+ self.loop.run_until_complete(wait_for_update())
+
+ # Check that we have a new metrics object
+ metrics = nfvi_metrics.retrieve()
+ self.assertEqual(metrics.vcpu.utilization, 0.5)
+
+ # We have just updated the metrics so it should be unnecessary to update
+ # right now
+ self.assertFalse(nfvi_metrics.should_update())
+ self.assertTrue(nfvi_metrics._updating.done())
+
+ # Wait an amount of time equal to the SAMPLE_INTERVAL. This ensures
+ # that the metrics that were just retrieved become stale...
+ time.sleep(NfviMetrics.SAMPLE_INTERVAL)
+
+ # ...now it is time to update again
+ self.assertTrue(nfvi_metrics.should_update())
+
+
+class TestNfviInterface(unittest.TestCase):
+ class NfviPluginImpl(object):
+ def __init__(self):
+ self._alarms = set()
+
+ def nfvi_metrics(self, account, vm_id):
+ return rwmon.NfviMetrics()
+
+ def nfvi_metrics_available(self, account):
+ return True
+
+ def alarm_create(self, account, vim_id, alarm):
+ alarm.alarm_id = str(uuid.uuid4())
+ self._alarms.add(alarm.alarm_id)
+ return RwTypes.RwStatus.SUCCESS
+
+ def alarm_delete(self, account, alarm_id):
+ self._alarms.remove(alarm_id)
+ return RwTypes.RwStatus.SUCCESS
+
+ def setUp(self):
+ self.loop = asyncio.new_event_loop()
+ self.logger = logging.getLogger('test-logger')
+
+ self.account = RwcalYang.CloudAccount(
+ name='test-cloud-account',
+ account_type="mock",
+ )
+
+ # Define the VDUR to avoid division by zero
+ self.vdur = make_vdur()
+ self.vdur.vm_flavor.vcpu_count = 4
+ self.vdur.vm_flavor.memory_mb = 100
+ self.vdur.vm_flavor.storage_gb = 2
+ self.vdur.vim_id = 'test-vim-id'
+
+ self.plugin_manager = NfviMetricsPluginManager(self.logger)
+ self.plugin_manager.register(self.account, "mock")
+
+ self.cache = NfviMetricsCache(
+ self.logger,
+ self.loop,
+ self.plugin_manager,
+ )
+
+ self.nfvi_interface = NfviInterface(
+ self.loop,
+ self.logger,
+ self.plugin_manager,
+ self.cache
+ )
+
+ def test_nfvi_metrics_available(self):
+ self.assertTrue(self.nfvi_interface.nfvi_metrics_available(self.account))
+
+ def test_retrieve(self):
+ pass
+
+ def test_alarm_create_and_destroy(self):
+ alarm = VnfrYang.YangData_Vnfr_VnfrCatalog_Vnfr_Vdur_Alarms()
+ alarm.name = "test-alarm"
+ alarm.description = "test-description"
+ alarm.vdur_id = "test-vdur-id"
+ alarm.metric = "CPU_UTILIZATION"
+ alarm.statistic = "MINIMUM"
+ alarm.operation = "GT"
+ alarm.value = 0.1
+ alarm.period = 10
+ alarm.evaluations = 1
+
+ plugin_impl = TestNfviInterface.NfviPluginImpl()
+ plugin = self.plugin_manager.plugin(self.account.name)
+ plugin.set_impl(plugin_impl)
+
+ self.assertEqual(len(plugin_impl._alarms), 0)
+
+ @asyncio.coroutine
+ @wait_for_pending_tasks(self.loop)
+ def wait_for_create():
+ coro = self.nfvi_interface.alarm_create(
+ self.account,
+ "test-vim-id",
+ alarm,
+ )
+ yield from asyncio.wait_for(
+ coro,
+ timeout=2,
+ loop=self.loop,
+ )
+
+ self.loop.run_until_complete(wait_for_create())
+ self.assertEqual(len(plugin_impl._alarms), 1)
+ self.assertTrue(alarm.alarm_id is not None)
+
+ @asyncio.coroutine
+ @wait_for_pending_tasks(self.loop)
+ def wait_for_destroy():
+ coro = self.nfvi_interface.alarm_destroy(
+ self.account,
+ alarm.alarm_id,
+ )
+ yield from asyncio.wait_for(
+ coro,
+ timeout=2,
+ loop=self.loop,
+ )
+
+ self.loop.run_until_complete(wait_for_destroy())
+ self.assertEqual(len(plugin_impl._alarms), 0)
+
+
+class TestVdurNfviMetrics(unittest.TestCase):
+ def setUp(self):
+ # Reduce the sample interval so that test run quickly
+ NfviMetrics.SAMPLE_INTERVAL = 0.1
+
+ # Create a mock plugin to define the metrics retrieved. The plugin will
+ # return a VCPU utilization of 0.5.
+ class MockPlugin(object):
+ def __init__(self):
+ self.metrics = RwmonYang.NfviMetrics()
+
+ def nfvi_metrics(self, account, vim_id):
+ self.metrics.vcpu.utilization = 0.5
+ return self.metrics
+
+ self.loop = asyncio.get_event_loop()
+ self.logger = logging.getLogger('test-logger')
+
+ self.account = RwcalYang.CloudAccount(
+ name='test-cloud-account',
+ account_type="mock",
+ )
+
+ # Define the VDUR to avoid division by zero
+ vdur = make_vdur()
+ vdur.vm_flavor.vcpu_count = 4
+ vdur.vm_flavor.memory_mb = 100
+ vdur.vm_flavor.storage_gb = 2
+ vdur.vim_id = 'test-vim-id'
+
+ # Instantiate the mock plugin
+ self.plugin_manager = NfviMetricsPluginManager(self.logger)
+ self.plugin_manager.register(self.account, "mock")
+
+ self.plugin = self.plugin_manager.plugin(self.account.name)
+ self.plugin.set_impl(MockPlugin())
+
+ self.cache = NfviMetricsCache(
+ self.logger,
+ self.loop,
+ self.plugin_manager,
+ )
+
+ self.manager = NfviInterface(
+ self.loop,
+ self.logger,
+ self.plugin_manager,
+ self.cache,
+ )
+
+ self.metrics = NfviMetrics(
+ self.logger,
+ self.loop,
+ self.account,
+ self.plugin,
+ vdur,
+ )
+
+ def test_retrieval(self):
+ metrics_a = None
+ metrics_b = None
+
+ # Define a coroutine that can be added to the asyncio event loop
+ @asyncio.coroutine
+ def update():
+ # Output from the metrics calls with be written to these nonlocal
+ # variables
+ nonlocal metrics_a
+ nonlocal metrics_b
+
+ # This first call will return the current metrics values and
+ # schedule a request to the NFVI to retrieve metrics from the data
+ # source. All metrics will be zero at this point.
+ metrics_a = self.metrics.retrieve()
+
+ # Wait for the scheduled update to take effect
+ yield from asyncio.sleep(0.2, loop=self.loop)
+
+ # Retrieve the updated metrics
+ metrics_b = self.metrics.retrieve()
+
+ self.loop.run_until_complete(update())
+
+ # Check that the metrics returned indicate that the plugin was queried
+ # and returned the appropriate value, i.e. 0.5 utilization
+ self.assertEqual(0.0, metrics_a.vcpu.utilization)
+ self.assertEqual(0.5, metrics_b.vcpu.utilization)
+
+
+class TestNfviMetricsPluginManager(unittest.TestCase):
+ def setUp(self):
+ self.logger = logging.getLogger('test-logger')
+ self.plugins = NfviMetricsPluginManager(self.logger)
+ self.account = RwcalYang.CloudAccount(
+ name='test-cloud-account',
+ account_type="mock",
+ )
+
+ def test_mock_plugin(self):
+ # Register an account name with a mock plugin. If successful, the
+ # plugin manager should return a non-None object.
+ self.plugins.register(self.account, 'mock')
+ self.assertIsNotNone(self.plugins.plugin(self.account.name))
+
+ # Now unregister the cloud account
+ self.plugins.unregister(self.account.name)
+
+ # Trying to retrieve a plugin for a cloud account that has not been
+ # registered with the manager is expected to raise an exception.
+ with self.assertRaises(KeyError):
+ self.plugins.plugin(self.account.name)
+
+ def test_multiple_registration(self):
+ self.plugins.register(self.account, 'mock')
+
+ # Attempting to register the account with another type of plugin will
+ # also cause an exception to be raised.
+ with self.assertRaises(AccountAlreadyRegisteredError):
+ self.plugins.register(self.account, 'mock')
+
+ # Attempting to register the account with 'openstack' again with cause
+ # an exception to be raised.
+ with self.assertRaises(AccountAlreadyRegisteredError):
+ self.plugins.register(self.account, 'openstack')
+
+ def test_unsupported_plugin(self):
+ # If an attempt is made to register a cloud account with an unknown
+ # type of plugin, a PluginNotSupportedError should be raised.
+ with self.assertRaises(PluginNotSupportedError):
+ self.plugins.register(self.account, 'unsupported-plugin')
+
+ def test_anavailable_plugin(self):
+ # Create a factory that always raises PluginUnavailableError
+ class UnavailablePluginFactory(PluginFactory):
+ PLUGIN_NAME = "unavailable-plugin"
+
+ def create(self, cloud_account):
+ raise PluginUnavailableError()
+
+ # Register the factory
+ self.plugins.register_plugin_factory(UnavailablePluginFactory())
+
+ # Ensure that the correct exception propagates when the cloud account
+ # is registered.
+ with self.assertRaises(PluginUnavailableError):
+ self.plugins.register(self.account, "unavailable-plugin")
+
+
+class TestMonitor(unittest.TestCase):
+ """
+ The Monitor class is the implementation that is called by the
+ MonitorTasklet. It provides the unified interface for controlling and
+ querying the monitoring functionality.
+ """
+
+ def setUp(self):
+ # Reduce the sample interval so that test run quickly
+ NfviMetrics.SAMPLE_INTERVAL = 0.1
+
+ self.loop = asyncio.get_event_loop()
+ self.logger = logging.getLogger('test-logger')
+ self.config = InstanceConfiguration()
+ self.monitor = Monitor(self.loop, self.logger, self.config)
+
+ self.account = RwcalYang.CloudAccount(
+ name='test-cloud-account',
+ account_type="mock",
+ )
+
+ def test_instance_config(self):
+ """
+ Configuration data for an instance is pass to the Monitor when it is
+ created. The data is passed in the InstanceConfiguration object. This
+ object is typically shared between the tasklet and the monitor, and
+ provides a way for the tasklet to update the configuration of the
+ monitor.
+ """
+ self.assertTrue(hasattr(self.monitor._config, "polling_period"))
+ self.assertTrue(hasattr(self.monitor._config, "min_cache_lifetime"))
+ self.assertTrue(hasattr(self.monitor._config, "max_polling_frequency"))
+
+ def test_monitor_cloud_accounts(self):
+ """
+ This test checks the cloud accounts are correctly added and deleted,
+ and that the correct exceptions are raised on duplicate adds or
+ deletes.
+
+ """
+ # Add the cloud account to the monitor
+ self.monitor.add_cloud_account(self.account)
+ self.assertIn(self.account.name, self.monitor._cloud_accounts)
+
+ # Add the cloud account to the monitor again
+ with self.assertRaises(AccountAlreadyRegisteredError):
+ self.monitor.add_cloud_account(self.account)
+
+ # Delete the cloud account
+ self.monitor.remove_cloud_account(self.account.name)
+ self.assertNotIn(self.account.name, self.monitor._cloud_accounts)
+
+ # Delete the cloud account again
+ with self.assertRaises(UnknownAccountError):
+ self.monitor.remove_cloud_account(self.account.name)
+
+ def test_monitor_cloud_accounts_illegal_removal(self):
+ """
+ A cloud account may not be removed while there are plugins or records
+ that are associated with it. Attempting to delete such a cloud account
+ will raise an exception.
+ """
+ # Add the cloud account to the monitor
+ self.monitor.add_cloud_account(self.account)
+
+ # Create a VNFR associated with the cloud account
+ vnfr = RwVnfrYang.YangData_Vnfr_VnfrCatalog_Vnfr()
+ vnfr.cloud_account = self.account.name
+ vnfr.id = 'test-vnfr-id'
+
+ # Add a VDUR to the VNFR
+ vdur = vnfr.vdur.add()
+ vdur.vim_id = 'test-vim-id-1'
+ vdur.id = 'test-vdur-id-1'
+
+ # Now add the VNFR to the monitor
+ self.monitor.add_vnfr(vnfr)
+
+ # Check that the monitor contains the VNFR, VDUR, and metrics
+ self.assertTrue(self.monitor.is_registered_vdur(vdur.id))
+ self.assertTrue(self.monitor.is_registered_vnfr(vnfr.id))
+ self.assertEqual(1, len(self.monitor.metrics))
+
+ # Deleting the cloud account now should raise an exception because the
+ # VNFR and VDUR are associated with the cloud account.
+ with self.assertRaises(AccountInUseError):
+ self.monitor.remove_cloud_account(self.account.name)
+
+ # Now remove the VNFR from the monitor
+ self.monitor.remove_vnfr(vnfr.id)
+ self.assertFalse(self.monitor.is_registered_vdur(vdur.id))
+ self.assertFalse(self.monitor.is_registered_vnfr(vnfr.id))
+ self.assertEqual(0, len(self.monitor.metrics))
+
+ # Safely delete the cloud account
+ self.monitor.remove_cloud_account(self.account.name)
+
+ def test_vdur_registration(self):
+ """
+ When a VDUR is registered with the Monitor it is registered with the
+ VdurNfviMetricsManager. Thus it is assigned a plugin that can be used
+ to retrieve the NFVI metrics associated with the VDU.
+ """
+ # Define the VDUR to be registered
+ vdur = VnfrYang.YangData_Vnfr_VnfrCatalog_Vnfr_Vdur()
+ vdur.vm_flavor.vcpu_count = 4
+ vdur.vm_flavor.memory_mb = 100
+ vdur.vm_flavor.storage_gb = 2
+ vdur.vim_id = 'test-vim-id'
+ vdur.id = 'test-vdur-id'
+
+ # Before registering the VDUR, the cloud account needs to be added to
+ # the monitor.
+ self.monitor.add_cloud_account(self.account)
+
+ # Register the VDUR with the monitor
+ self.monitor.add_vdur(self.account, vdur)
+ self.assertTrue(self.monitor.is_registered_vdur(vdur.id))
+
+ # Check that the VDUR has been added to the metrics cache
+ self.assertTrue(self.monitor.cache.contains_vdur_id(vdur.id))
+
+ # Unregister the VDUR
+ self.monitor.remove_vdur(vdur.id)
+ self.assertFalse(self.monitor.is_registered_vdur(vdur.id))
+
+ # Check that the VDUR has been removed from the metrics cache
+ self.assertFalse(self.monitor.cache.contains_vdur_id(vdur.id))
+
+ def test_vnfr_add_update_delete(self):
+ """
+ When a VNFR is added to the Monitor a record is created of the
+ relationship between the VNFR and any VDURs that it contains. Each VDUR
+ is then registered with the VdurNfviMetricsManager. A VNFR can also be
+ updated so that it contains more of less VDURs. Any VDURs that are
+ added to the VNFR are registered with the NdurNfviMetricsManager, and
+ any that are removed are unregistered. When a VNFR is deleted, all of
+ the VDURs contained in the VNFR are unregistered.
+ """
+ # Define the VDUR to be registered
+ vdur = RwVnfrYang.YangData_Vnfr_VnfrCatalog_Vnfr_Vdur()
+ vdur.vim_id = 'test-vim-id-1'
+ vdur.id = 'test-vdur-id-1'
+
+ vnfr = RwVnfrYang.YangData_Vnfr_VnfrCatalog_Vnfr()
+ vnfr.cloud_account = self.account.name
+ vnfr.id = 'test-vnfr-id'
+
+ vnfr.vdur.append(vdur)
+
+ self.monitor.add_cloud_account(self.account)
+
+ # Add the VNFR to the monitor. This will also register VDURs contained
+ # in the VNFR with the monitor.
+ self.monitor.add_vnfr(vnfr)
+ self.assertTrue(self.monitor.is_registered_vdur('test-vdur-id-1'))
+
+ # Add another VDUR to the VNFR and update the monitor. Both VDURs
+ # should now be registered
+ vdur = RwVnfrYang.YangData_Vnfr_VnfrCatalog_Vnfr_Vdur()
+ vdur.vim_id = 'test-vim-id-2'
+ vdur.id = 'test-vdur-id-2'
+
+ vnfr.vdur.append(vdur)
+
+ self.monitor.update_vnfr(vnfr)
+ self.assertTrue(self.monitor.is_registered_vdur('test-vdur-id-1'))
+ self.assertTrue(self.monitor.is_registered_vdur('test-vdur-id-2'))
+
+ # Delete the VNFR from the monitor. This should remove the VNFR and all
+ # of the associated VDURs from the monitor.
+ self.monitor.remove_vnfr(vnfr.id)
+ self.assertFalse(self.monitor.is_registered_vnfr('test-vnfr-id'))
+ self.assertFalse(self.monitor.is_registered_vdur('test-vdur-id-1'))
+ self.assertFalse(self.monitor.is_registered_vdur('test-vdur-id-2'))
+
+ with self.assertRaises(KeyError):
+ self.monitor.retrieve_nfvi_metrics('test-vdur-id-1')
+
+ with self.assertRaises(KeyError):
+ self.monitor.retrieve_nfvi_metrics('test-vdur-id-2')
+
+ def test_complete(self):
+ """
+ This test simulates the addition of a VNFR to the Monitor (along with
+ updates), and retrieves NFVI metrics from the VDUR. The VNFR is then
+ deleted, which should result in a cleanup of all the data in the
+ Monitor.
+ """
+ # Create the VNFR
+ vnfr = RwVnfrYang.YangData_Vnfr_VnfrCatalog_Vnfr()
+ vnfr.cloud_account = self.account.name
+ vnfr.id = 'test-vnfr-id'
+
+ # Create 2 VDURs
+ vdur = vnfr.vdur.add()
+ vdur.id = 'test-vdur-id-1'
+ vdur.vim_id = 'test-vim-id-1'
+ vdur.vm_flavor.vcpu_count = 4
+ vdur.vm_flavor.memory_mb = 100
+ vdur.vm_flavor.storage_gb = 2
+
+ vdur = vnfr.vdur.add()
+ vdur.id = 'test-vdur-id-2'
+ vdur.vim_id = 'test-vim-id-2'
+ vdur.vm_flavor.vcpu_count = 4
+ vdur.vm_flavor.memory_mb = 100
+ vdur.vm_flavor.storage_gb = 2
+
+ class MockPlugin(object):
+ def __init__(self):
+ self._metrics = dict()
+ self._metrics['test-vim-id-1'] = RwmonYang.NfviMetrics()
+ self._metrics['test-vim-id-2'] = RwmonYang.NfviMetrics()
+
+ def nfvi_metrics(self, account, vim_id):
+ metrics = self._metrics[vim_id]
+
+ if vim_id == 'test-vim-id-1':
+ metrics.memory.used += 1000
+ else:
+ metrics.memory.used += 2000
+
+ return metrics
+
+ class MockFactory(PluginFactory):
+ PLUGIN_NAME = "mock"
+
+ def create(self, cloud_account):
+ plugin = rw_peas.PeasPlugin("rwmon_mock", 'RwMon-1.0')
+ impl = plugin.get_interface("Monitoring")
+ impl.set_impl(MockPlugin())
+ return impl
+
+ # Modify the mock plugin factory
+ self.monitor._nfvi_plugins._factories["mock"] = MockFactory()
+
+ # Add the cloud account the monitor
+ self.monitor.add_cloud_account(self.account)
+
+ # Add the VNFR to the monitor.
+ self.monitor.add_vnfr(vnfr)
+
+ @wait_for_pending_tasks(self.loop)
+ @asyncio.coroutine
+ def call1():
+ # call #1 (time = 0.00s)
+ # The metrics for these VDURs have not been populated yet so a
+ # default metrics object (all zeros) is returned, and a request is
+ # scheduled with the data source to retrieve the metrics.
+ metrics1 = self.monitor.retrieve_nfvi_metrics('test-vdur-id-1')
+ metrics2 = self.monitor.retrieve_nfvi_metrics('test-vdur-id-2')
+
+ self.assertEqual(0, metrics1.memory.used)
+ self.assertEqual(0, metrics2.memory.used)
+
+ self.loop.run_until_complete(call1())
+
+ @wait_for_pending_tasks(self.loop)
+ @asyncio.coroutine
+ def call2():
+ # call #2 (wait 0.05s)
+ # The metrics have been populated with data from the data source
+ # due to the request made during call #1.
+ yield from asyncio.sleep(0.05)
+
+ metrics1 = self.monitor.retrieve_nfvi_metrics('test-vdur-id-1')
+ metrics2 = self.monitor.retrieve_nfvi_metrics('test-vdur-id-2')
+
+ self.assertEqual(1000, metrics1.memory.used)
+ self.assertEqual(2000, metrics2.memory.used)
+
+ self.loop.run_until_complete(call2())
+
+ @wait_for_pending_tasks(self.loop)
+ @asyncio.coroutine
+ def call3():
+ # call #3 (wait 0.50s)
+ # This call exceeds 0.1s (the sample interval of the plugin)
+ # from when the data was retrieved. The cached metrics are
+ # immediately returned, but a request is made to the data source to
+ # refresh these metrics.
+ yield from asyncio.sleep(0.10)
+
+ metrics1 = self.monitor.retrieve_nfvi_metrics('test-vdur-id-1')
+ metrics2 = self.monitor.retrieve_nfvi_metrics('test-vdur-id-2')
+
+ self.assertEqual(1000, metrics1.memory.used)
+ self.assertEqual(2000, metrics2.memory.used)
+
+ self.loop.run_until_complete(call3())
+
+ @wait_for_pending_tasks(self.loop)
+ @asyncio.coroutine
+ def call4():
+ # call #4 (wait 1.00s)
+ # The metrics retrieved differ from those in call #3 because the
+ # cached metrics have been updated.
+ yield from asyncio.sleep(0.10)
+ metrics1 = self.monitor.retrieve_nfvi_metrics('test-vdur-id-1')
+ metrics2 = self.monitor.retrieve_nfvi_metrics('test-vdur-id-2')
+
+ self.assertEqual(2000, metrics1.memory.used)
+ self.assertEqual(4000, metrics2.memory.used)
+
+ self.loop.run_until_complete(call4())
+
+
+def main(argv=sys.argv[1:]):
+ logging.basicConfig(format='TEST %(message)s')
+
+ parser = argparse.ArgumentParser()
+ parser.add_argument('-v', '--verbose', action='store_true')
+
+ args = parser.parse_args(argv)
+
+ # Set the global logging level
+ logging.getLogger().setLevel(logging.DEBUG if args.verbose else logging.ERROR)
+
+ # Set the logger in this test to use a null handler
+ logging.getLogger('test-logger').addHandler(logging.NullHandler())
+
+ # The unittest framework requires a program name, so use the name of this
+ # file instead (we do not want to have to pass a fake program name to main
+ # when this is called from the interpreter).
+ unittest.main(argv=[__file__] + argv,
+ testRunner=xmlrunner.XMLTestRunner(
+ output=os.environ["RIFT_MODULE_TEST"]))
+
+if __name__ == '__main__':
+ main()