RIFT OSM R1 Initial Submission
[osm/SO.git] / rwlaunchpad / test / utest_rwmonitor.py
diff --git a/rwlaunchpad/test/utest_rwmonitor.py b/rwlaunchpad/test/utest_rwmonitor.py
new file mode 100755 (executable)
index 0000000..46c33b3
--- /dev/null
@@ -0,0 +1,873 @@
+#!/usr/bin/env python3
+
+# 
+#   Copyright 2016 RIFT.IO Inc
+#
+#   Licensed under the Apache License, Version 2.0 (the "License");
+#   you may not use this file except in compliance with the License.
+#   You may obtain a copy of the License at
+#
+#       http://www.apache.org/licenses/LICENSE-2.0
+#
+#   Unless required by applicable law or agreed to in writing, software
+#   distributed under the License is distributed on an "AS IS" BASIS,
+#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#   See the License for the specific language governing permissions and
+#   limitations under the License.
+#
+
+
+import argparse
+import asyncio
+import concurrent.futures
+import logging
+import os
+import sys
+import time
+import unittest
+import uuid
+import xmlrunner
+
+import gi
+gi.require_version('NsrYang', '1.0')
+gi.require_version('RwcalYang', '1.0')
+gi.require_version('RwmonYang', '1.0')
+gi.require_version('RwVnfrYang', '1.0')
+gi.require_version('RwTypes', '1.0')
+gi.require_version('RwMon', '1.0')
+
+from gi.repository import (
+        NsrYang,
+        RwTypes,
+        RwVnfrYang,
+        RwcalYang,
+        RwmonYang,
+        VnfrYang,
+        )
+
+from rift.tasklets.rwmonitor.core import (
+        AccountAlreadyRegisteredError,
+        AccountInUseError,
+        InstanceConfiguration,
+        Monitor,
+        NfviInterface,
+        NfviMetrics,
+        NfviMetricsCache,
+        NfviMetricsPluginManager,
+        PluginFactory,
+        PluginNotSupportedError,
+        PluginUnavailableError,
+        UnknownAccountError,
+        )
+import rw_peas
+
+
+class wait_for_pending_tasks(object):
+    """
+    This class defines a decorator that can be used to ensure that any asyncio
+    tasks created as a side-effect of coroutine are allowed to come to
+    completion.
+    """
+
+    def __init__(self, loop, timeout=1):
+        self.loop = loop
+        self.timeout = timeout
+
+    def __call__(self, coro):
+        @asyncio.coroutine
+        def impl():
+            original = self.pending_tasks()
+            result = yield from coro()
+
+            current = self.pending_tasks()
+            remaining = current - original
+
+            if remaining:
+                yield from asyncio.wait(
+                        remaining,
+                        timeout=self.timeout,
+                        loop=self.loop,
+                        )
+
+            return result
+
+        return impl
+
+    def pending_tasks(self):
+        return {t for t in asyncio.Task.all_tasks(loop=self.loop) if not t.done()}
+
+
+class MockTasklet(object):
+    def __init__(self, dts, log, loop, records):
+        self.dts = dts
+        self.log = log
+        self.loop = loop
+        self.records = records
+        self.polling_period = 0
+        self.executor = concurrent.futures.ThreadPoolExecutor(max_workers=16)
+
+
+def make_nsr(ns_instance_config_ref=str(uuid.uuid4())):
+    nsr = NsrYang.YangData_Nsr_NsInstanceOpdata_Nsr()
+    nsr.ns_instance_config_ref = ns_instance_config_ref
+    return nsr
+
+def make_vnfr(id=str(uuid.uuid4())):
+    vnfr = VnfrYang.YangData_Vnfr_VnfrCatalog_Vnfr()
+    vnfr.id = id
+    return vnfr
+
+def make_vdur(id=str(uuid.uuid4()), vim_id=str(uuid.uuid4())):
+    vdur = VnfrYang.YangData_Vnfr_VnfrCatalog_Vnfr_Vdur()
+    vdur.id = id
+    vdur.vim_id = vim_id
+    return vdur
+
+
+class TestNfviMetricsCache(unittest.TestCase):
+    class Plugin(object):
+        def nfvi_metrics_available(self, cloud_account):
+            return True
+
+        def nfvi_metrics(self, account, vim_id):
+            metrics = RwmonYang.NfviMetrics()
+            metrics.vcpu.utilization = 0.5
+            return metrics
+
+    def setUp(self):
+        self.loop = asyncio.new_event_loop()
+        self.logger = logging.getLogger('test-logger')
+
+        self.account = RwcalYang.CloudAccount(
+                name='test-cloud-account',
+                account_type="mock",
+                )
+
+        self.plugin_manager = NfviMetricsPluginManager(self.logger)
+        self.plugin_manager.register(self.account, "mock")
+
+        mock = self.plugin_manager.plugin(self.account.name)
+        mock.set_impl(TestNfviMetricsCache.Plugin())
+
+        self.vdur = VnfrYang.YangData_Vnfr_VnfrCatalog_Vnfr_Vdur()
+        self.vdur.id = "test-vdur-id"
+        self.vdur.vim_id = "test-vim-id"
+        self.vdur.vm_flavor.vcpu_count = 4
+        self.vdur.vm_flavor.memory_mb = 1
+        self.vdur.vm_flavor.storage_gb = 1
+
+    def test_create_destroy_entry(self):
+        cache = NfviMetricsCache(self.logger, self.loop, self.plugin_manager)
+        self.assertEqual(len(cache._nfvi_metrics), 0)
+
+        cache.create_entry(self.account, self.vdur)
+        self.assertEqual(len(cache._nfvi_metrics), 1)
+
+        cache.destroy_entry(self.vdur.id)
+        self.assertEqual(len(cache._nfvi_metrics), 0)
+
+    def test_retrieve(self):
+        NfviMetrics.SAMPLE_INTERVAL = 1
+
+        cache = NfviMetricsCache(self.logger, self.loop, self.plugin_manager)
+        cache.create_entry(self.account, self.vdur)
+
+        @wait_for_pending_tasks(self.loop)
+        @asyncio.coroutine
+        def retrieve_metrics():
+            metrics = cache.retrieve("test-vim-id")
+            self.assertEqual(metrics.vcpu.utilization, 0.0)
+
+            yield from asyncio.sleep(NfviMetrics.SAMPLE_INTERVAL, loop=self.loop)
+
+            metrics = cache.retrieve("test-vim-id")
+            self.assertEqual(metrics.vcpu.utilization, 0.5)
+
+        self.loop.run_until_complete(retrieve_metrics())
+
+    def test_id_mapping(self):
+        cache = NfviMetricsCache(self.logger, self.loop, self.plugin_manager)
+
+        cache.create_entry(self.account, self.vdur)
+
+        self.assertEqual(cache.to_vim_id(self.vdur.id), self.vdur.vim_id)
+        self.assertEqual(cache.to_vdur_id(self.vdur.vim_id), self.vdur.id)
+        self.assertTrue(cache.contains_vdur_id(self.vdur.id))
+        self.assertTrue(cache.contains_vim_id(self.vdur.vim_id))
+
+        cache.destroy_entry(self.vdur.id)
+
+        self.assertFalse(cache.contains_vdur_id(self.vdur.id))
+        self.assertFalse(cache.contains_vim_id(self.vdur.vim_id))
+
+
+class TestNfviMetrics(unittest.TestCase):
+    class Plugin(object):
+        def nfvi_metrics_available(self, cloud_account):
+            return True
+
+        def nfvi_metrics(self, account, vim_id):
+            metrics = RwVnfrYang.YangData_Vnfr_VnfrCatalog_Vnfr_Vdur_NfviMetrics()
+            metrics.vcpu.utilization = 0.5
+            return None, metrics
+
+    def setUp(self):
+        self.loop = asyncio.new_event_loop()
+        self.account = RwcalYang.CloudAccount(
+                name='test-cloud-account',
+                account_type="mock",
+                )
+
+        self.plugin = TestNfviMetrics.Plugin()
+        self.logger = logging.getLogger('test-logger')
+
+        self.vdur = make_vdur()
+        self.vdur.vm_flavor.vcpu_count = 4
+        self.vdur.vm_flavor.memory_mb = 100
+        self.vdur.vm_flavor.storage_gb = 2
+        self.vdur.vim_id = 'test-vim-id'
+
+    def test_update(self):
+        nfvi_metrics = NfviMetrics(
+                self.logger,
+                self.loop,
+                self.account,
+                self.plugin,
+                self.vdur,
+                )
+
+        # Reduce the SAMPLE_INTERVAL so that the test does not take a long time
+        nfvi_metrics.SAMPLE_INTERVAL = 1
+
+        # The metrics have never been retrieved so they should be updated
+        self.assertTrue(nfvi_metrics.should_update())
+
+        # The metrics return will be empty because the cache version is empty.
+        # However, this trigger an update to retrieve metrics from the plugin.
+        metrics = nfvi_metrics.retrieve()
+        self.assertEqual(metrics.vcpu.utilization, 0.0)
+
+        # An update has been trigger by the retrieve call so additional updates
+        # should not happen
+        self.assertFalse(nfvi_metrics.should_update())
+        self.assertFalse(nfvi_metrics._updating.done())
+
+        # Allow the event loop to run until the update is complete
+        @asyncio.coroutine
+        @wait_for_pending_tasks(self.loop)
+        def wait_for_update():
+            yield from asyncio.wait_for(
+                    nfvi_metrics._updating,
+                    timeout=2,
+                    loop=self.loop,
+                    )
+
+        self.loop.run_until_complete(wait_for_update())
+
+        # Check that we have a new metrics object
+        metrics = nfvi_metrics.retrieve()
+        self.assertEqual(metrics.vcpu.utilization, 0.5)
+
+        # We have just updated the metrics so it should be unnecessary to update
+        # right now
+        self.assertFalse(nfvi_metrics.should_update())
+        self.assertTrue(nfvi_metrics._updating.done())
+
+        # Wait an amount of time equal to the SAMPLE_INTERVAL. This ensures
+        # that the metrics that were just retrieved become stale...
+        time.sleep(NfviMetrics.SAMPLE_INTERVAL)
+
+        # ...now it is time to update again
+        self.assertTrue(nfvi_metrics.should_update())
+
+
+class TestNfviInterface(unittest.TestCase):
+    class NfviPluginImpl(object):
+        def __init__(self):
+            self._alarms = set()
+
+        def nfvi_metrics(self, account, vm_id):
+            return rwmon.NfviMetrics()
+
+        def nfvi_metrics_available(self, account):
+            return True
+
+        def alarm_create(self, account, vim_id, alarm):
+            alarm.alarm_id = str(uuid.uuid4())
+            self._alarms.add(alarm.alarm_id)
+            return RwTypes.RwStatus.SUCCESS
+
+        def alarm_delete(self, account, alarm_id):
+            self._alarms.remove(alarm_id)
+            return RwTypes.RwStatus.SUCCESS
+
+    def setUp(self):
+        self.loop = asyncio.new_event_loop()
+        self.logger = logging.getLogger('test-logger')
+
+        self.account = RwcalYang.CloudAccount(
+                name='test-cloud-account',
+                account_type="mock",
+                )
+
+        # Define the VDUR to avoid division by zero
+        self.vdur = make_vdur()
+        self.vdur.vm_flavor.vcpu_count = 4
+        self.vdur.vm_flavor.memory_mb = 100
+        self.vdur.vm_flavor.storage_gb = 2
+        self.vdur.vim_id = 'test-vim-id'
+
+        self.plugin_manager = NfviMetricsPluginManager(self.logger)
+        self.plugin_manager.register(self.account, "mock")
+
+        self.cache = NfviMetricsCache(
+                self.logger,
+                self.loop,
+                self.plugin_manager,
+                )
+
+        self.nfvi_interface = NfviInterface(
+                self.loop,
+                self.logger,
+                self.plugin_manager,
+                self.cache
+                )
+
+    def test_nfvi_metrics_available(self):
+        self.assertTrue(self.nfvi_interface.nfvi_metrics_available(self.account))
+
+    def test_retrieve(self):
+        pass
+
+    def test_alarm_create_and_destroy(self):
+        alarm = VnfrYang.YangData_Vnfr_VnfrCatalog_Vnfr_Vdur_Alarms()
+        alarm.name = "test-alarm"
+        alarm.description = "test-description"
+        alarm.vdur_id = "test-vdur-id"
+        alarm.metric = "CPU_UTILIZATION"
+        alarm.statistic = "MINIMUM"
+        alarm.operation = "GT"
+        alarm.value = 0.1
+        alarm.period = 10
+        alarm.evaluations = 1
+
+        plugin_impl = TestNfviInterface.NfviPluginImpl()
+        plugin = self.plugin_manager.plugin(self.account.name)
+        plugin.set_impl(plugin_impl)
+
+        self.assertEqual(len(plugin_impl._alarms), 0)
+
+        @asyncio.coroutine
+        @wait_for_pending_tasks(self.loop)
+        def wait_for_create():
+            coro = self.nfvi_interface.alarm_create(
+                    self.account,
+                    "test-vim-id",
+                    alarm,
+                    )
+            yield from asyncio.wait_for(
+                    coro,
+                    timeout=2,
+                    loop=self.loop,
+                    )
+
+        self.loop.run_until_complete(wait_for_create())
+        self.assertEqual(len(plugin_impl._alarms), 1)
+        self.assertTrue(alarm.alarm_id is not None)
+
+        @asyncio.coroutine
+        @wait_for_pending_tasks(self.loop)
+        def wait_for_destroy():
+            coro = self.nfvi_interface.alarm_destroy(
+                    self.account,
+                    alarm.alarm_id,
+                    )
+            yield from asyncio.wait_for(
+                    coro,
+                    timeout=2,
+                    loop=self.loop,
+                    )
+
+        self.loop.run_until_complete(wait_for_destroy())
+        self.assertEqual(len(plugin_impl._alarms), 0)
+
+
+class TestVdurNfviMetrics(unittest.TestCase):
+    def setUp(self):
+        # Reduce the sample interval so that test run quickly
+        NfviMetrics.SAMPLE_INTERVAL = 0.1
+
+        # Create a mock plugin to define the metrics retrieved. The plugin will
+        # return a VCPU utilization of 0.5.
+        class MockPlugin(object):
+            def __init__(self):
+                self.metrics = RwmonYang.NfviMetrics()
+
+            def nfvi_metrics(self, account, vim_id):
+                self.metrics.vcpu.utilization = 0.5
+                return self.metrics
+
+        self.loop = asyncio.get_event_loop()
+        self.logger = logging.getLogger('test-logger')
+
+        self.account = RwcalYang.CloudAccount(
+                name='test-cloud-account',
+                account_type="mock",
+                )
+
+        # Define the VDUR to avoid division by zero
+        vdur = make_vdur()
+        vdur.vm_flavor.vcpu_count = 4
+        vdur.vm_flavor.memory_mb = 100
+        vdur.vm_flavor.storage_gb = 2
+        vdur.vim_id = 'test-vim-id'
+
+        # Instantiate the mock plugin
+        self.plugin_manager = NfviMetricsPluginManager(self.logger)
+        self.plugin_manager.register(self.account, "mock")
+
+        self.plugin = self.plugin_manager.plugin(self.account.name)
+        self.plugin.set_impl(MockPlugin())
+
+        self.cache = NfviMetricsCache(
+                self.logger,
+                self.loop,
+                self.plugin_manager,
+                )
+
+        self.manager = NfviInterface(
+                self.loop,
+                self.logger,
+                self.plugin_manager,
+                self.cache,
+                )
+
+        self.metrics = NfviMetrics(
+                self.logger,
+                self.loop,
+                self.account,
+                self.plugin,
+                vdur,
+                )
+
+    def test_retrieval(self):
+        metrics_a = None
+        metrics_b = None
+
+        # Define a coroutine that can be added to the asyncio event loop
+        @asyncio.coroutine
+        def update():
+            # Output from the metrics calls with be written to these nonlocal
+            # variables
+            nonlocal metrics_a
+            nonlocal metrics_b
+
+            # This first call will return the current metrics values and
+            # schedule a request to the NFVI to retrieve metrics from the data
+            # source. All metrics will be zero at this point.
+            metrics_a = self.metrics.retrieve()
+
+            # Wait for the scheduled update to take effect
+            yield from asyncio.sleep(0.2, loop=self.loop)
+
+            # Retrieve the updated metrics
+            metrics_b = self.metrics.retrieve()
+
+        self.loop.run_until_complete(update())
+
+        # Check that the metrics returned indicate that the plugin was queried
+        # and returned the appropriate value, i.e. 0.5 utilization
+        self.assertEqual(0.0, metrics_a.vcpu.utilization)
+        self.assertEqual(0.5, metrics_b.vcpu.utilization)
+
+
+class TestNfviMetricsPluginManager(unittest.TestCase):
+    def setUp(self):
+        self.logger = logging.getLogger('test-logger')
+        self.plugins = NfviMetricsPluginManager(self.logger)
+        self.account = RwcalYang.CloudAccount(
+                name='test-cloud-account',
+                account_type="mock",
+                )
+
+    def test_mock_plugin(self):
+        # Register an account name with a mock plugin. If successful, the
+        # plugin manager should return a non-None object.
+        self.plugins.register(self.account, 'mock')
+        self.assertIsNotNone(self.plugins.plugin(self.account.name))
+
+        # Now unregister the cloud account
+        self.plugins.unregister(self.account.name)
+
+        # Trying to retrieve a plugin for a cloud account that has not been
+        # registered with the manager is expected to raise an exception.
+        with self.assertRaises(KeyError):
+            self.plugins.plugin(self.account.name)
+
+    def test_multiple_registration(self):
+        self.plugins.register(self.account, 'mock')
+
+        # Attempting to register the account with another type of plugin will
+        # also cause an exception to be raised.
+        with self.assertRaises(AccountAlreadyRegisteredError):
+            self.plugins.register(self.account, 'mock')
+
+        # Attempting to register the account with 'openstack' again with cause
+        # an exception to be raised.
+        with self.assertRaises(AccountAlreadyRegisteredError):
+            self.plugins.register(self.account, 'openstack')
+
+    def test_unsupported_plugin(self):
+        # If an attempt is made to register a cloud account with an unknown
+        # type of plugin, a PluginNotSupportedError should be raised.
+        with self.assertRaises(PluginNotSupportedError):
+            self.plugins.register(self.account, 'unsupported-plugin')
+
+    def test_anavailable_plugin(self):
+        # Create a factory that always raises PluginUnavailableError
+        class UnavailablePluginFactory(PluginFactory):
+            PLUGIN_NAME = "unavailable-plugin"
+
+            def create(self, cloud_account):
+                raise PluginUnavailableError()
+
+        # Register the factory
+        self.plugins.register_plugin_factory(UnavailablePluginFactory())
+
+        # Ensure that the correct exception propagates when the cloud account
+        # is registered.
+        with self.assertRaises(PluginUnavailableError):
+            self.plugins.register(self.account, "unavailable-plugin")
+
+
+class TestMonitor(unittest.TestCase):
+    """
+    The Monitor class is the implementation that is called by the
+    MonitorTasklet. It provides the unified interface for controlling and
+    querying the monitoring functionality.
+    """
+
+    def setUp(self):
+        # Reduce the sample interval so that test run quickly
+        NfviMetrics.SAMPLE_INTERVAL = 0.1
+
+        self.loop = asyncio.get_event_loop()
+        self.logger = logging.getLogger('test-logger')
+        self.config = InstanceConfiguration()
+        self.monitor = Monitor(self.loop, self.logger, self.config)
+
+        self.account = RwcalYang.CloudAccount(
+                name='test-cloud-account',
+                account_type="mock",
+                )
+
+    def test_instance_config(self):
+        """
+        Configuration data for an instance is pass to the Monitor when it is
+        created. The data is passed in the InstanceConfiguration object. This
+        object is typically shared between the tasklet and the monitor, and
+        provides a way for the tasklet to update the configuration of the
+        monitor.
+        """
+        self.assertTrue(hasattr(self.monitor._config, "polling_period"))
+        self.assertTrue(hasattr(self.monitor._config, "min_cache_lifetime"))
+        self.assertTrue(hasattr(self.monitor._config, "max_polling_frequency"))
+
+    def test_monitor_cloud_accounts(self):
+        """
+        This test checks the cloud accounts are correctly added and deleted,
+        and that the correct exceptions are raised on duplicate adds or
+        deletes.
+
+        """
+        # Add the cloud account to the monitor
+        self.monitor.add_cloud_account(self.account)
+        self.assertIn(self.account.name, self.monitor._cloud_accounts)
+
+        # Add the cloud account to the monitor again
+        with self.assertRaises(AccountAlreadyRegisteredError):
+            self.monitor.add_cloud_account(self.account)
+
+        # Delete the cloud account
+        self.monitor.remove_cloud_account(self.account.name)
+        self.assertNotIn(self.account.name, self.monitor._cloud_accounts)
+
+        # Delete the cloud account again
+        with self.assertRaises(UnknownAccountError):
+            self.monitor.remove_cloud_account(self.account.name)
+
+    def test_monitor_cloud_accounts_illegal_removal(self):
+        """
+        A cloud account may not be removed while there are plugins or records
+        that are associated with it. Attempting to delete such a cloud account
+        will raise an exception.
+        """
+        # Add the cloud account to the monitor
+        self.monitor.add_cloud_account(self.account)
+
+        # Create a VNFR associated with the cloud account
+        vnfr = RwVnfrYang.YangData_Vnfr_VnfrCatalog_Vnfr()
+        vnfr.cloud_account = self.account.name
+        vnfr.id = 'test-vnfr-id'
+
+        # Add a VDUR to the VNFR
+        vdur = vnfr.vdur.add()
+        vdur.vim_id = 'test-vim-id-1'
+        vdur.id = 'test-vdur-id-1'
+
+        # Now add the VNFR to the monitor
+        self.monitor.add_vnfr(vnfr)
+
+        # Check that the monitor contains the VNFR, VDUR, and metrics
+        self.assertTrue(self.monitor.is_registered_vdur(vdur.id))
+        self.assertTrue(self.monitor.is_registered_vnfr(vnfr.id))
+        self.assertEqual(1, len(self.monitor.metrics))
+
+        # Deleting the cloud account now should raise an exception because the
+        # VNFR and VDUR are associated with the cloud account.
+        with self.assertRaises(AccountInUseError):
+            self.monitor.remove_cloud_account(self.account.name)
+
+        # Now remove the VNFR from the monitor
+        self.monitor.remove_vnfr(vnfr.id)
+        self.assertFalse(self.monitor.is_registered_vdur(vdur.id))
+        self.assertFalse(self.monitor.is_registered_vnfr(vnfr.id))
+        self.assertEqual(0, len(self.monitor.metrics))
+
+        # Safely delete the cloud account
+        self.monitor.remove_cloud_account(self.account.name)
+
+    def test_vdur_registration(self):
+        """
+        When a VDUR is registered with the Monitor it is registered with the
+        VdurNfviMetricsManager. Thus it is assigned a plugin that can be used
+        to retrieve the NFVI metrics associated with the VDU.
+        """
+        # Define the VDUR to be registered
+        vdur = VnfrYang.YangData_Vnfr_VnfrCatalog_Vnfr_Vdur()
+        vdur.vm_flavor.vcpu_count = 4
+        vdur.vm_flavor.memory_mb = 100
+        vdur.vm_flavor.storage_gb = 2
+        vdur.vim_id = 'test-vim-id'
+        vdur.id = 'test-vdur-id'
+
+        # Before registering the VDUR, the cloud account needs to be added to
+        # the monitor.
+        self.monitor.add_cloud_account(self.account)
+
+        # Register the VDUR with the monitor
+        self.monitor.add_vdur(self.account, vdur)
+        self.assertTrue(self.monitor.is_registered_vdur(vdur.id))
+
+        # Check that the VDUR has been added to the metrics cache
+        self.assertTrue(self.monitor.cache.contains_vdur_id(vdur.id))
+
+        # Unregister the VDUR
+        self.monitor.remove_vdur(vdur.id)
+        self.assertFalse(self.monitor.is_registered_vdur(vdur.id))
+
+        # Check that the VDUR has been removed from the metrics cache
+        self.assertFalse(self.monitor.cache.contains_vdur_id(vdur.id))
+
+    def test_vnfr_add_update_delete(self):
+        """
+        When a VNFR is added to the Monitor a record is created of the
+        relationship between the VNFR and any VDURs that it contains. Each VDUR
+        is then registered with the VdurNfviMetricsManager. A VNFR can also be
+        updated so that it contains more of less VDURs. Any VDURs that are
+        added to the VNFR are registered with the NdurNfviMetricsManager, and
+        any that are removed are unregistered. When a VNFR is deleted, all of
+        the VDURs contained in the VNFR are unregistered.
+        """
+        # Define the VDUR to be registered
+        vdur = RwVnfrYang.YangData_Vnfr_VnfrCatalog_Vnfr_Vdur()
+        vdur.vim_id = 'test-vim-id-1'
+        vdur.id = 'test-vdur-id-1'
+
+        vnfr = RwVnfrYang.YangData_Vnfr_VnfrCatalog_Vnfr()
+        vnfr.cloud_account = self.account.name
+        vnfr.id = 'test-vnfr-id'
+
+        vnfr.vdur.append(vdur)
+
+        self.monitor.add_cloud_account(self.account)
+
+        # Add the VNFR to the monitor. This will also register VDURs contained
+        # in the VNFR with the monitor.
+        self.monitor.add_vnfr(vnfr)
+        self.assertTrue(self.monitor.is_registered_vdur('test-vdur-id-1'))
+
+        # Add another VDUR to the VNFR and update the monitor. Both VDURs
+        # should now be registered
+        vdur = RwVnfrYang.YangData_Vnfr_VnfrCatalog_Vnfr_Vdur()
+        vdur.vim_id = 'test-vim-id-2'
+        vdur.id = 'test-vdur-id-2'
+
+        vnfr.vdur.append(vdur)
+
+        self.monitor.update_vnfr(vnfr)
+        self.assertTrue(self.monitor.is_registered_vdur('test-vdur-id-1'))
+        self.assertTrue(self.monitor.is_registered_vdur('test-vdur-id-2'))
+
+        # Delete the VNFR from the monitor. This should remove the VNFR and all
+        # of the associated VDURs from the monitor.
+        self.monitor.remove_vnfr(vnfr.id)
+        self.assertFalse(self.monitor.is_registered_vnfr('test-vnfr-id'))
+        self.assertFalse(self.monitor.is_registered_vdur('test-vdur-id-1'))
+        self.assertFalse(self.monitor.is_registered_vdur('test-vdur-id-2'))
+
+        with self.assertRaises(KeyError):
+            self.monitor.retrieve_nfvi_metrics('test-vdur-id-1')
+
+        with self.assertRaises(KeyError):
+            self.monitor.retrieve_nfvi_metrics('test-vdur-id-2')
+
+    def test_complete(self):
+        """
+        This test simulates the addition of a VNFR to the Monitor (along with
+        updates), and retrieves NFVI metrics from the VDUR. The VNFR is then
+        deleted, which should result in a cleanup of all the data in the
+        Monitor.
+        """
+        # Create the VNFR
+        vnfr = RwVnfrYang.YangData_Vnfr_VnfrCatalog_Vnfr()
+        vnfr.cloud_account = self.account.name
+        vnfr.id = 'test-vnfr-id'
+
+        # Create 2 VDURs
+        vdur = vnfr.vdur.add()
+        vdur.id = 'test-vdur-id-1'
+        vdur.vim_id = 'test-vim-id-1'
+        vdur.vm_flavor.vcpu_count = 4
+        vdur.vm_flavor.memory_mb = 100
+        vdur.vm_flavor.storage_gb = 2
+
+        vdur = vnfr.vdur.add()
+        vdur.id = 'test-vdur-id-2'
+        vdur.vim_id = 'test-vim-id-2'
+        vdur.vm_flavor.vcpu_count = 4
+        vdur.vm_flavor.memory_mb = 100
+        vdur.vm_flavor.storage_gb = 2
+
+        class MockPlugin(object):
+            def __init__(self):
+                self._metrics = dict()
+                self._metrics['test-vim-id-1'] = RwmonYang.NfviMetrics()
+                self._metrics['test-vim-id-2'] = RwmonYang.NfviMetrics()
+
+            def nfvi_metrics(self, account, vim_id):
+                metrics = self._metrics[vim_id]
+
+                if vim_id == 'test-vim-id-1':
+                    metrics.memory.used += 1000
+                else:
+                    metrics.memory.used += 2000
+
+                return metrics
+
+        class MockFactory(PluginFactory):
+            PLUGIN_NAME = "mock"
+
+            def create(self, cloud_account):
+                plugin = rw_peas.PeasPlugin("rwmon_mock", 'RwMon-1.0')
+                impl = plugin.get_interface("Monitoring")
+                impl.set_impl(MockPlugin())
+                return impl
+
+        # Modify the mock plugin factory
+        self.monitor._nfvi_plugins._factories["mock"] = MockFactory()
+
+        # Add the cloud account the monitor
+        self.monitor.add_cloud_account(self.account)
+
+        # Add the VNFR to the monitor.
+        self.monitor.add_vnfr(vnfr)
+
+        @wait_for_pending_tasks(self.loop)
+        @asyncio.coroutine
+        def call1():
+            # call #1 (time = 0.00s)
+            # The metrics for these VDURs have not been populated yet so a
+            # default metrics object (all zeros) is returned, and a request is
+            # scheduled with the data source to retrieve the metrics.
+            metrics1 = self.monitor.retrieve_nfvi_metrics('test-vdur-id-1')
+            metrics2 = self.monitor.retrieve_nfvi_metrics('test-vdur-id-2')
+
+            self.assertEqual(0, metrics1.memory.used)
+            self.assertEqual(0, metrics2.memory.used)
+
+        self.loop.run_until_complete(call1())
+
+        @wait_for_pending_tasks(self.loop)
+        @asyncio.coroutine
+        def call2():
+            # call #2 (wait 0.05s)
+            # The metrics have been populated with data from the data source
+            # due to the request made during call #1.
+            yield from asyncio.sleep(0.05)
+
+            metrics1 = self.monitor.retrieve_nfvi_metrics('test-vdur-id-1')
+            metrics2 = self.monitor.retrieve_nfvi_metrics('test-vdur-id-2')
+
+            self.assertEqual(1000, metrics1.memory.used)
+            self.assertEqual(2000, metrics2.memory.used)
+
+        self.loop.run_until_complete(call2())
+
+        @wait_for_pending_tasks(self.loop)
+        @asyncio.coroutine
+        def call3():
+            # call #3 (wait 0.50s)
+            # This call exceeds 0.1s (the sample interval of the plugin)
+            # from when the data was retrieved. The cached metrics are
+            # immediately returned, but a request is made to the data source to
+            # refresh these metrics.
+            yield from asyncio.sleep(0.10)
+
+            metrics1 = self.monitor.retrieve_nfvi_metrics('test-vdur-id-1')
+            metrics2 = self.monitor.retrieve_nfvi_metrics('test-vdur-id-2')
+
+            self.assertEqual(1000, metrics1.memory.used)
+            self.assertEqual(2000, metrics2.memory.used)
+
+        self.loop.run_until_complete(call3())
+
+        @wait_for_pending_tasks(self.loop)
+        @asyncio.coroutine
+        def call4():
+            # call #4 (wait 1.00s)
+            # The metrics retrieved differ from those in call #3 because the
+            # cached metrics have been updated.
+            yield from asyncio.sleep(0.10)
+            metrics1 = self.monitor.retrieve_nfvi_metrics('test-vdur-id-1')
+            metrics2 = self.monitor.retrieve_nfvi_metrics('test-vdur-id-2')
+
+            self.assertEqual(2000, metrics1.memory.used)
+            self.assertEqual(4000, metrics2.memory.used)
+
+        self.loop.run_until_complete(call4())
+
+
+def main(argv=sys.argv[1:]):
+    logging.basicConfig(format='TEST %(message)s')
+
+    parser = argparse.ArgumentParser()
+    parser.add_argument('-v', '--verbose', action='store_true')
+
+    args = parser.parse_args(argv)
+
+    # Set the global logging level
+    logging.getLogger().setLevel(logging.DEBUG if args.verbose else logging.ERROR)
+
+    # Set the logger in this test to use a null handler
+    logging.getLogger('test-logger').addHandler(logging.NullHandler())
+
+    # The unittest framework requires a program name, so use the name of this
+    # file instead (we do not want to have to pass a fake program name to main
+    # when this is called from the interpreter).
+    unittest.main(argv=[__file__] + argv,
+            testRunner=xmlrunner.XMLTestRunner(
+                output=os.environ["RIFT_MODULE_TEST"]))
+
+if __name__ == '__main__':
+    main()