blob: b69815f45e7d22da441f73bef728ccb8790b59f3 [file] [log] [blame]
#!/usr/bin/env python3
#
# Copyright 2016 RIFT.IO Inc
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
import argparse
import asyncio
import concurrent.futures
import logging
import os
import sys
import time
import unittest
import uuid
import xmlrunner
import gi
gi.require_version('NsrYang', '1.0')
gi.require_version('RwcalYang', '1.0')
gi.require_version('RwmonYang', '1.0')
gi.require_version('RwVnfrYang', '1.0')
gi.require_version('RwTypes', '1.0')
gi.require_version('RwMon', '1.0')
from gi.repository import (
NsrYang,
RwTypes,
RwVnfrYang,
RwcalYang,
RwmonYang,
VnfrYang,
)
from rift.tasklets.rwmonitor.core import (
AccountAlreadyRegisteredError,
AccountInUseError,
InstanceConfiguration,
Monitor,
NfviInterface,
NfviMetrics,
NfviMetricsCache,
NfviMetricsPluginManager,
PluginFactory,
PluginNotSupportedError,
PluginUnavailableError,
UnknownAccountError,
)
import rw_peas
from rift.mano.utils.project import ManoProject, DEFAULT_PROJECT
class wait_for_pending_tasks(object):
    """
    Decorator that gives asyncio tasks spawned as a side-effect of a coroutine
    the chance to finish before the decorated coroutine returns.

    The decorator is constructed with the event loop to inspect and a timeout
    (passed to asyncio.wait) that bounds the wait for the spawned tasks. It is
    applied to a coroutine function that takes no arguments.
    """

    def __init__(self, loop, timeout=1):
        self.timeout = timeout
        self.loop = loop

    def pending_tasks(self):
        """
        Return the set of tasks on the loop that are not done yet.
        """
        unfinished = set()
        for task in asyncio.Task.all_tasks(loop=self.loop):
            if not task.done():
                unfinished.add(task)
        return unfinished

    def __call__(self, coro):
        @asyncio.coroutine
        def impl():
            # Snapshot the unfinished tasks before running the coroutine so
            # that only tasks created while it runs are waited on.
            before = self.pending_tasks()
            result = yield from coro()

            spawned = self.pending_tasks() - before
            if spawned:
                yield from asyncio.wait(
                    spawned,
                    loop=self.loop,
                    timeout=self.timeout,
                )

            return result

        return impl
class MockTasklet(object):
    """
    Minimal tasklet stand-in that stores the collaborators it is given and
    owns a thread pool executor.
    """

    def __init__(self, dts, log, loop, records):
        pool = concurrent.futures.ThreadPoolExecutor(max_workers=16)

        self.dts, self.log, self.loop = dts, log, loop
        self.records = records
        self.polling_period = 0
        self.executor = pool
def make_nsr(ns_instance_config_ref=None):
    """
    Create an NSR object that refers to an NS instance config.

    Arguments:
        ns_instance_config_ref - the value stored on the NSR. When omitted
                                 (or None) a new random UUID string is
                                 generated for this call.

    Returns:
        the newly created NSR object
    """
    # A default written as str(uuid.uuid4()) in the signature is evaluated
    # only once, when the function is defined, so every call would share the
    # same identifier. Generate it per call instead.
    if ns_instance_config_ref is None:
        ns_instance_config_ref = str(uuid.uuid4())

    nsr = NsrYang.YangData_RwProject_Project_NsInstanceOpdata_Nsr()
    nsr.ns_instance_config_ref = ns_instance_config_ref
    return nsr
def make_vnfr(id=None):
    """
    Create a VNFR object with the given identifier.

    Arguments:
        id - the identifier stored on the VNFR. When omitted (or None) a new
             random UUID string is generated for this call.

    Returns:
        the newly created VNFR object
    """
    # A default written as str(uuid.uuid4()) in the signature is evaluated
    # only once, when the function is defined, so every call would share the
    # same identifier. Generate it per call instead.
    if id is None:
        id = str(uuid.uuid4())

    vnfr = VnfrYang.YangData_RwProject_Project_VnfrCatalog_Vnfr()
    vnfr.id = id
    return vnfr
def make_vdur(id=None, vim_id=None):
    """
    Create a VDUR object with the given identifiers.

    Arguments:
        id     - the identifier stored on the VDUR. When omitted (or None) a
                 new random UUID string is generated for this call.
        vim_id - the VIM identifier stored on the VDUR. When omitted (or
                 None) a new random UUID string is generated for this call.

    Returns:
        the newly created VDUR object
    """
    # Defaults written as str(uuid.uuid4()) in the signature are evaluated
    # only once, when the function is defined, so every call would share the
    # same identifiers. Generate them per call instead.
    if id is None:
        id = str(uuid.uuid4())
    if vim_id is None:
        vim_id = str(uuid.uuid4())

    vdur = VnfrYang.YangData_RwProject_Project_VnfrCatalog_Vnfr_Vdur()
    vdur.id = id
    vdur.vim_id = vim_id
    return vdur
class TestNfviMetricsCache(unittest.TestCase):
    """
    Tests for NfviMetricsCache: creating and destroying entries, retrieving
    metrics through the cache, and mapping between VDUR ids and VIM ids.
    """

    class Plugin(object):
        """
        Plugin implementation that always reports a VCPU utilization of 0.5.
        """

        def nfvi_metrics_available(self, cloud_account):
            return True

        def nfvi_metrics(self, account, vim_id):
            metrics = RwmonYang.YangData_RwProject_Project_NfviMetrics()
            metrics.vcpu.utilization = 0.5
            return metrics

    def setUp(self):
        self.loop = asyncio.new_event_loop()
        # Each test gets its own event loop, so close it when the test ends
        # rather than leaving it open for the rest of the run.
        self.addCleanup(self.loop.close)

        self.logger = logging.getLogger('test-logger')

        self.account = RwcalYang.YangData_RwProject_Project_CloudAccounts_CloudAccountList(
            name='test-cloud-account',
            account_type="mock",
        )

        # Register the account with the mock plugin and replace the plugin
        # implementation with the one defined in this test case.
        self.plugin_manager = NfviMetricsPluginManager(self.logger)
        self.plugin_manager.register(self.account, "mock")

        mock = self.plugin_manager.plugin(self.account.name)
        mock.set_impl(TestNfviMetricsCache.Plugin())

        self.vdur = VnfrYang.YangData_RwProject_Project_VnfrCatalog_Vnfr_Vdur()
        self.vdur.id = "test-vdur-id"
        self.vdur.vim_id = "test-vim-id"
        self.vdur.vm_flavor.vcpu_count = 4
        self.vdur.vm_flavor.memory_mb = 1
        self.vdur.vm_flavor.storage_gb = 1

    def test_create_destroy_entry(self):
        cache = NfviMetricsCache(self.logger, self.loop, self.plugin_manager)
        self.assertEqual(len(cache._nfvi_metrics), 0)

        cache.create_entry(self.account, self.vdur)
        self.assertEqual(len(cache._nfvi_metrics), 1)

        cache.destroy_entry(self.vdur.id)
        self.assertEqual(len(cache._nfvi_metrics), 0)

    def test_retrieve(self):
        # SAMPLE_INTERVAL is set on the NfviMetrics class itself, so the
        # change is visible to every other test in the process. Put the
        # previous value back once this test is finished.
        self.addCleanup(
            setattr,
            NfviMetrics,
            "SAMPLE_INTERVAL",
            NfviMetrics.SAMPLE_INTERVAL,
        )
        NfviMetrics.SAMPLE_INTERVAL = 1

        cache = NfviMetricsCache(self.logger, self.loop, self.plugin_manager)
        cache.create_entry(self.account, self.vdur)

        @wait_for_pending_tasks(self.loop)
        @asyncio.coroutine
        def retrieve_metrics():
            # The first retrieval is expected to return the initial (zero)
            # utilization.
            metrics = cache.retrieve("test-vim-id")
            self.assertEqual(metrics.vcpu.utilization, 0.0)

            yield from asyncio.sleep(NfviMetrics.SAMPLE_INTERVAL, loop=self.loop)

            # After waiting for the sample interval the value reported by the
            # plugin (0.5) is expected.
            metrics = cache.retrieve("test-vim-id")
            self.assertEqual(metrics.vcpu.utilization, 0.5)

        self.loop.run_until_complete(retrieve_metrics())

    def test_id_mapping(self):
        cache = NfviMetricsCache(self.logger, self.loop, self.plugin_manager)

        cache.create_entry(self.account, self.vdur)

        self.assertEqual(cache.to_vim_id(self.vdur.id), self.vdur.vim_id)
        self.assertEqual(cache.to_vdur_id(self.vdur.vim_id), self.vdur.id)
        self.assertTrue(cache.contains_vdur_id(self.vdur.id))
        self.assertTrue(cache.contains_vim_id(self.vdur.vim_id))

        cache.destroy_entry(self.vdur.id)

        self.assertFalse(cache.contains_vdur_id(self.vdur.id))
        self.assertFalse(cache.contains_vim_id(self.vdur.vim_id))
class TestNfviMetrics(unittest.TestCase):
    """
    Tests for the update behaviour of an NfviMetrics object: when an update
    is due, and what retrieve() returns before and after an update.
    """

    class Plugin(object):
        """
        Plugin implementation that always reports a VCPU utilization of 0.5.
        """

        def nfvi_metrics_available(self, cloud_account):
            return True

        def nfvi_metrics(self, account, vim_id):
            metrics = RwVnfrYang.YangData_RwProject_Project_VnfrCatalog_Vnfr_Vdur_NfviMetrics()
            metrics.vcpu.utilization = 0.5
            return None, metrics

    def setUp(self):
        self.loop = asyncio.new_event_loop()
        # Each test gets its own event loop, so close it when the test ends
        # rather than leaving it open for the rest of the run.
        self.addCleanup(self.loop.close)

        self.account = RwcalYang.YangData_RwProject_Project_CloudAccounts_CloudAccountList(
            name='test-cloud-account',
            account_type="mock",
        )

        self.plugin = TestNfviMetrics.Plugin()
        self.logger = logging.getLogger('test-logger')

        self.vdur = make_vdur()
        self.vdur.vm_flavor.vcpu_count = 4
        self.vdur.vm_flavor.memory_mb = 100
        self.vdur.vm_flavor.storage_gb = 2
        self.vdur.vim_id = 'test-vim-id'

    def test_update(self):
        nfvi_metrics = NfviMetrics(
            self.logger,
            self.loop,
            self.account,
            self.plugin,
            self.vdur,
        )

        # Reduce the SAMPLE_INTERVAL so that the test does not take a long time
        nfvi_metrics.SAMPLE_INTERVAL = 1

        # The metrics have never been retrieved so they should be updated
        self.assertTrue(nfvi_metrics.should_update())

        # The metrics returned will be empty because the cached version is
        # empty. However, this triggers an update to retrieve metrics from the
        # plugin.
        metrics = nfvi_metrics.retrieve()
        self.assertEqual(metrics.vcpu.utilization, 0.0)

        # An update has been triggered by the retrieve call so additional
        # updates should not happen
        self.assertFalse(nfvi_metrics.should_update())
        self.assertFalse(nfvi_metrics._updating.done())

        # Allow the event loop to run until the update is complete
        @asyncio.coroutine
        @wait_for_pending_tasks(self.loop)
        def wait_for_update():
            yield from asyncio.wait_for(
                nfvi_metrics._updating,
                timeout=2,
                loop=self.loop,
            )

        self.loop.run_until_complete(wait_for_update())

        # Check that we have a new metrics object
        metrics = nfvi_metrics.retrieve()
        self.assertEqual(metrics.vcpu.utilization, 0.5)

        # We have just updated the metrics so it should be unnecessary to
        # update right now
        self.assertFalse(nfvi_metrics.should_update())
        self.assertTrue(nfvi_metrics._updating.done())

        # Wait an amount of time equal to the SAMPLE_INTERVAL. This ensures
        # that the metrics that were just retrieved become stale...
        # NOTE(review): the sleep uses the class-level value while the
        # override above was applied to the instance -- confirm which of the
        # two should_update() actually consults.
        time.sleep(NfviMetrics.SAMPLE_INTERVAL)

        # ...now it is time to update again
        self.assertTrue(nfvi_metrics.should_update())
class TestNfviInterface(unittest.TestCase):
    """
    Tests for NfviInterface: metrics availability and (currently skipped)
    alarm creation and destruction.
    """

    class NfviPluginImpl(object):
        """
        Plugin implementation that records the ids of the alarms it creates
        in a set so that the tests can inspect them.
        """

        def __init__(self):
            self._alarms = set()

        def nfvi_metrics(self, account, vm_id):
            # Use the RwmonYang module imported by this file; a bare 'rwmon'
            # name would raise a NameError when this method is called.
            return RwmonYang.YangData_RwProject_Project_NfviMetrics()

        def nfvi_metrics_available(self, account):
            return True

        def alarm_create(self, account, vim_id, alarm):
            # Assign a fresh id to the alarm and remember it
            alarm.alarm_id = str(uuid.uuid4())
            self._alarms.add(alarm.alarm_id)
            return RwTypes.RwStatus.SUCCESS

        def alarm_delete(self, account, alarm_id):
            # set.remove raises KeyError if the alarm id was never created
            self._alarms.remove(alarm_id)
            return RwTypes.RwStatus.SUCCESS

    def setUp(self):
        self.loop = asyncio.new_event_loop()
        # Each test gets its own event loop, so close it when the test ends
        # rather than leaving it open for the rest of the run.
        self.addCleanup(self.loop.close)

        self.logger = logging.getLogger('test-logger')

        self.account = RwcalYang.YangData_RwProject_Project_CloudAccounts_CloudAccountList(
            name='test-cloud-account',
            account_type="mock",
        )

        # Define the VDUR to avoid division by zero
        self.vdur = make_vdur()
        self.vdur.vm_flavor.vcpu_count = 4
        self.vdur.vm_flavor.memory_mb = 100
        self.vdur.vm_flavor.storage_gb = 2
        self.vdur.vim_id = 'test-vim-id'

        self.plugin_manager = NfviMetricsPluginManager(self.logger)
        self.plugin_manager.register(self.account, "mock")

        self.cache = NfviMetricsCache(
            self.logger,
            self.loop,
            self.plugin_manager,
        )

        self.nfvi_interface = NfviInterface(
            self.loop,
            self.logger,
            self.plugin_manager,
            self.cache
        )

    def test_nfvi_metrics_available(self):
        self.assertTrue(self.nfvi_interface.nfvi_metrics_available(self.account))

    def test_retrieve(self):
        # Placeholder: no retrieval behaviour is exercised here yet
        pass

    @unittest.skip("Alarms are being disabled in monitor")
    def test_alarm_create_and_destroy(self):
        alarm = VnfrYang.YangData_RwProject_Project_VnfrCatalog_Vnfr_Vdur_Alarms()
        alarm.name = "test-alarm"
        alarm.description = "test-description"
        alarm.vdur_id = "test-vdur-id"
        alarm.metric = "CPU_UTILIZATION"
        alarm.statistic = "MINIMUM"
        alarm.operation = "GT"
        alarm.value = 0.1
        alarm.period = 10
        alarm.evaluations = 1

        # Install the alarm-tracking implementation on the registered plugin
        plugin_impl = TestNfviInterface.NfviPluginImpl()
        plugin = self.plugin_manager.plugin(self.account.name)
        plugin.set_impl(plugin_impl)

        self.assertEqual(len(plugin_impl._alarms), 0)

        @asyncio.coroutine
        @wait_for_pending_tasks(self.loop)
        def wait_for_create():
            coro = self.nfvi_interface.alarm_create(
                self.account,
                "test-vim-id",
                alarm,
            )
            yield from asyncio.wait_for(
                coro,
                timeout=2,
                loop=self.loop,
            )

        self.loop.run_until_complete(wait_for_create())
        self.assertEqual(len(plugin_impl._alarms), 1)
        self.assertTrue(alarm.alarm_id is not None)

        @asyncio.coroutine
        @wait_for_pending_tasks(self.loop)
        def wait_for_destroy():
            coro = self.nfvi_interface.alarm_destroy(
                self.account,
                alarm.alarm_id,
            )
            yield from asyncio.wait_for(
                coro,
                timeout=2,
                loop=self.loop,
            )

        self.loop.run_until_complete(wait_for_destroy())
        self.assertEqual(len(plugin_impl._alarms), 0)
class TestVdurNfviMetrics(unittest.TestCase):
    """
    Checks that an NfviMetrics object backed by a mock plugin first returns
    zeroed metrics and later the values reported by the plugin.
    """

    def setUp(self):
        # Use a short sample interval so that the tests run quickly
        NfviMetrics.SAMPLE_INTERVAL = 0.1

        # The mock plugin defines the metrics that are retrieved; it always
        # reports a VCPU utilization of 0.5.
        class MockPlugin(object):
            def __init__(self):
                self.metrics = RwmonYang.YangData_RwProject_Project_NfviMetrics()

            def nfvi_metrics(self, account, vim_id):
                self.metrics.vcpu.utilization = 0.5
                return self.metrics

        self.loop = asyncio.get_event_loop()
        self.logger = logging.getLogger('test-logger')

        self.account = RwcalYang.YangData_RwProject_Project_CloudAccounts_CloudAccountList(
            name='test-cloud-account',
            account_type="mock",
        )

        # Give the VDUR a non-zero flavor to avoid division by zero
        record = make_vdur()
        record.vm_flavor.vcpu_count = 4
        record.vm_flavor.memory_mb = 100
        record.vm_flavor.storage_gb = 2
        record.vim_id = 'test-vim-id'

        # Register the account and swap in the mock implementation
        self.plugin_manager = NfviMetricsPluginManager(self.logger)
        self.plugin_manager.register(self.account, "mock")
        self.plugin = self.plugin_manager.plugin(self.account.name)
        self.plugin.set_impl(MockPlugin())

        self.cache = NfviMetricsCache(
            self.logger,
            self.loop,
            self.plugin_manager,
        )

        self.manager = NfviInterface(
            self.loop,
            self.logger,
            self.plugin_manager,
            self.cache,
        )

        self.metrics = NfviMetrics(
            self.logger,
            self.loop,
            self.account,
            self.plugin,
            record,
        )

    def test_retrieval(self):
        # The coroutine hands both samples back as its result so that they
        # can be checked once the event loop has finished running it.
        @asyncio.coroutine
        def update():
            # The first call returns the current values, which are expected
            # to be zero at this point, and schedules a request to the NFVI
            # data source.
            initial = self.metrics.retrieve()

            # Wait for the scheduled update to take effect
            yield from asyncio.sleep(0.2, loop=self.loop)

            # Retrieve the updated metrics
            refreshed = self.metrics.retrieve()
            return initial, refreshed

        metrics_a, metrics_b = self.loop.run_until_complete(update())

        # The second sample should show that the plugin was queried and
        # returned the appropriate value, i.e. 0.5 utilization
        self.assertEqual(0.0, metrics_a.vcpu.utilization)
        self.assertEqual(0.5, metrics_b.vcpu.utilization)
class TestNfviMetricsPluginManager(unittest.TestCase):
    """
    Tests for registering and unregistering cloud accounts with the
    NfviMetricsPluginManager, and for the errors raised on misuse.
    """

    def setUp(self):
        self.logger = logging.getLogger('test-logger')
        self.plugins = NfviMetricsPluginManager(self.logger)
        self.account = RwcalYang.YangData_RwProject_Project_CloudAccounts_CloudAccountList(
            name='test-cloud-account',
            account_type="mock",
        )

    def test_mock_plugin(self):
        # Registering an account with the mock plugin should make a non-None
        # plugin object available under the account name.
        self.plugins.register(self.account, 'mock')
        registered = self.plugins.plugin(self.account.name)
        self.assertIsNotNone(registered)

        # Now unregister the cloud account
        self.plugins.unregister(self.account.name)

        # Looking up a plugin for an account that is not registered with the
        # manager is expected to raise an exception.
        self.assertRaises(KeyError, self.plugins.plugin, self.account.name)

    def test_multiple_registration(self):
        self.plugins.register(self.account, 'mock')

        # Registering the account with the same plugin type a second time
        # raises an exception...
        self.assertRaises(
            AccountAlreadyRegisteredError,
            self.plugins.register,
            self.account,
            'mock',
        )

        # ...and so does registering it with another type of plugin.
        self.assertRaises(
            AccountAlreadyRegisteredError,
            self.plugins.register,
            self.account,
            'openstack',
        )

    def test_unsupported_plugin(self):
        # Registering a cloud account with an unknown type of plugin should
        # raise a PluginNotSupportedError.
        self.assertRaises(
            PluginNotSupportedError,
            self.plugins.register,
            self.account,
            'unsupported-plugin',
        )

    def test_anavailable_plugin(self):
        # A factory whose create() always raises PluginUnavailableError
        class UnavailablePluginFactory(PluginFactory):
            PLUGIN_NAME = "unavailable-plugin"

            def create(self, cloud_account):
                raise PluginUnavailableError()

        # Register the factory
        factory = UnavailablePluginFactory()
        self.plugins.register_plugin_factory(factory)

        # The exception should propagate when the cloud account is registered
        # with that plugin type.
        self.assertRaises(
            PluginUnavailableError,
            self.plugins.register,
            self.account,
            "unavailable-plugin",
        )
class TestMonitor(unittest.TestCase):
    """
    Tests for the Monitor class, the implementation that is called by the
    MonitorTasklet and that provides the unified interface for controlling
    and querying the monitoring functionality.
    """

    def setUp(self):
        # Use a short sample interval so that the tests run quickly
        NfviMetrics.SAMPLE_INTERVAL = 0.1

        self.loop = asyncio.get_event_loop()
        self.logger = logging.getLogger('test-logger')
        self.project = ManoProject(self.logger, name=DEFAULT_PROJECT)
        self.config = InstanceConfiguration()
        self.monitor = Monitor(self.loop, self.logger, self.config, self.project)

        self.account = RwcalYang.YangData_RwProject_Project_CloudAccounts_CloudAccountList(
            name='test-cloud-account',
            account_type="mock",
        )

    def test_instance_config(self):
        """
        Configuration data for an instance is passed to the Monitor when it
        is created, in the form of an InstanceConfiguration object. This test
        checks that the configuration held by the monitor exposes the
        expected attributes.
        """
        expected = (
            "polling_period",
            "min_cache_lifetime",
            "max_polling_frequency",
        )
        for attribute in expected:
            self.assertTrue(hasattr(self.monitor._config, attribute))

    def test_monitor_cloud_accounts(self):
        """
        Checks that cloud accounts are correctly added and deleted, and that
        the correct exceptions are raised on duplicate adds or deletes.
        """
        name = self.account.name

        # Add the cloud account to the monitor
        self.monitor.add_cloud_account(self.account)
        self.assertIn(name, self.monitor._cloud_accounts)

        # Adding the same cloud account again is an error
        self.assertRaises(
            AccountAlreadyRegisteredError,
            self.monitor.add_cloud_account,
            self.account,
        )

        # Delete the cloud account
        self.monitor.remove_cloud_account(name)
        self.assertNotIn(name, self.monitor._cloud_accounts)

        # Deleting the cloud account again is an error
        self.assertRaises(
            UnknownAccountError,
            self.monitor.remove_cloud_account,
            name,
        )

    def test_monitor_cloud_accounts_illegal_removal(self):
        """
        A cloud account may not be removed while there are records associated
        with it. Attempting to delete such a cloud account raises an
        exception.
        """
        # Add the cloud account to the monitor
        self.monitor.add_cloud_account(self.account)

        # Create a VNFR associated with the cloud account
        record = RwVnfrYang.YangData_RwProject_Project_VnfrCatalog_Vnfr()
        record.datacenter = self.account.name
        record.id = 'test-vnfr-id'

        # Add a VDUR to the VNFR
        unit = record.vdur.add()
        unit.vim_id = 'test-vim-id-1'
        unit.id = 'test-vdur-id-1'

        # Now add the VNFR to the monitor
        self.monitor.add_vnfr(record)

        # The monitor should now contain the VNFR, VDUR, and metrics
        self.assertTrue(self.monitor.is_registered_vdur(unit.id))
        self.assertTrue(self.monitor.is_registered_vnfr(record.id))
        self.assertEqual(1, len(self.monitor.metrics))

        # Deleting the cloud account now should raise an exception because
        # the VNFR and VDUR are associated with the cloud account.
        self.assertRaises(
            AccountInUseError,
            self.monitor.remove_cloud_account,
            self.account.name,
        )

        # Now remove the VNFR from the monitor
        self.monitor.remove_vnfr(record.id)
        self.assertFalse(self.monitor.is_registered_vdur(unit.id))
        self.assertFalse(self.monitor.is_registered_vnfr(record.id))
        self.assertEqual(0, len(self.monitor.metrics))

        # The cloud account can now be deleted safely
        self.monitor.remove_cloud_account(self.account.name)

    def test_vdur_registration(self):
        """
        Registers a VDUR with the Monitor and checks that it is added to the
        metrics cache, then unregisters it and checks that it is removed from
        the cache again.
        """
        # Define the VDUR to be registered
        unit = VnfrYang.YangData_RwProject_Project_VnfrCatalog_Vnfr_Vdur()
        unit.vm_flavor.vcpu_count = 4
        unit.vm_flavor.memory_mb = 100
        unit.vm_flavor.storage_gb = 2
        unit.vim_id = 'test-vim-id'
        unit.id = 'test-vdur-id'

        # The cloud account needs to be added to the monitor before the VDUR
        # is registered.
        self.monitor.add_cloud_account(self.account)

        # Register the VDUR with the monitor
        self.monitor.add_vdur(self.account, unit)
        self.assertTrue(self.monitor.is_registered_vdur(unit.id))

        # The VDUR should have been added to the metrics cache
        self.assertTrue(self.monitor.cache.contains_vdur_id(unit.id))

        # Unregister the VDUR
        self.monitor.remove_vdur(unit.id)
        self.assertFalse(self.monitor.is_registered_vdur(unit.id))

        # The VDUR should have been removed from the metrics cache
        self.assertFalse(self.monitor.cache.contains_vdur_id(unit.id))

    def test_vnfr_add_update_delete(self):
        """
        Adds a VNFR containing one VDUR to the Monitor, updates the VNFR so
        that it contains a second VDUR, and finally deletes the VNFR. The
        VDURs are expected to be registered after the add and the update, and
        unregistered once the VNFR has been deleted.
        """
        # Define the VDUR to be registered
        first_unit = RwVnfrYang.YangData_RwProject_Project_VnfrCatalog_Vnfr_Vdur()
        first_unit.vim_id = 'test-vim-id-1'
        first_unit.id = 'test-vdur-id-1'

        record = RwVnfrYang.YangData_RwProject_Project_VnfrCatalog_Vnfr()
        record.datacenter = self.account.name
        record.id = 'test-vnfr-id'
        record.vdur.append(first_unit)

        self.monitor.add_cloud_account(self.account)

        # Adding the VNFR also registers the VDURs it contains
        self.monitor.add_vnfr(record)
        self.assertTrue(self.monitor.is_registered_vdur('test-vdur-id-1'))

        # Add another VDUR to the VNFR and update the monitor. Both VDURs
        # should now be registered
        second_unit = RwVnfrYang.YangData_RwProject_Project_VnfrCatalog_Vnfr_Vdur()
        second_unit.vim_id = 'test-vim-id-2'
        second_unit.id = 'test-vdur-id-2'
        record.vdur.append(second_unit)

        self.monitor.update_vnfr(record)
        self.assertTrue(self.monitor.is_registered_vdur('test-vdur-id-1'))
        self.assertTrue(self.monitor.is_registered_vdur('test-vdur-id-2'))

        # Deleting the VNFR should remove it and all of the associated VDURs
        # from the monitor.
        self.monitor.remove_vnfr(record.id)
        self.assertFalse(self.monitor.is_registered_vnfr('test-vnfr-id'))
        self.assertFalse(self.monitor.is_registered_vdur('test-vdur-id-1'))
        self.assertFalse(self.monitor.is_registered_vdur('test-vdur-id-2'))

        # Metrics can no longer be retrieved for either VDUR
        self.assertRaises(
            KeyError,
            self.monitor.retrieve_nfvi_metrics,
            'test-vdur-id-1',
        )
        self.assertRaises(
            KeyError,
            self.monitor.retrieve_nfvi_metrics,
            'test-vdur-id-2',
        )

    def test_complete(self):
        """
        Simulates the addition of a VNFR with two VDURs to the Monitor and
        retrieves the NFVI metrics of the VDURs several times, checking the
        memory usage reported at each step against the values produced by a
        mock plugin.
        """
        # Create the VNFR
        record = RwVnfrYang.YangData_RwProject_Project_VnfrCatalog_Vnfr()
        record.datacenter = self.account.name
        record.id = 'test-vnfr-id'

        # Create 2 VDURs that differ only in their identifiers
        identifiers = (
            ('test-vdur-id-1', 'test-vim-id-1'),
            ('test-vdur-id-2', 'test-vim-id-2'),
        )
        for vdur_id, vim_id in identifiers:
            unit = record.vdur.add()
            unit.id = vdur_id
            unit.vim_id = vim_id
            unit.vm_flavor.vcpu_count = 4
            unit.vm_flavor.memory_mb = 100
            unit.vm_flavor.storage_gb = 2

        class MockPlugin(object):
            def __init__(self):
                self._metrics = dict()
                self._metrics['test-vim-id-1'] = RwmonYang.YangData_RwProject_Project_NfviMetrics()
                self._metrics['test-vim-id-2'] = RwmonYang.YangData_RwProject_Project_NfviMetrics()

            def nfvi_metrics(self, account, vim_id):
                # Every query increases the used memory of the VDU: by 1000
                # for the first VDU and by 2000 for any other.
                metrics = self._metrics[vim_id]
                increment = 1000 if vim_id == 'test-vim-id-1' else 2000
                metrics.memory.used += increment
                return metrics

        class MockFactory(PluginFactory):
            PLUGIN_NAME = "mock"

            def create(self, cloud_account):
                plugin = rw_peas.PeasPlugin("rwmon_mock", 'RwMon-1.0')
                impl = plugin.get_interface("Monitoring")
                impl.set_impl(MockPlugin())
                return impl

        # Replace the mock plugin factory used by the monitor
        self.monitor._nfvi_plugins._factories["mock"] = MockFactory()

        # Add the cloud account to the monitor
        self.monitor.add_cloud_account(self.account)

        # Add the VNFR to the monitor.
        self.monitor.add_vnfr(record)

        @wait_for_pending_tasks(self.loop)
        @asyncio.coroutine
        def call1():
            # call #1 (no wait)
            # The metrics for these VDURs have not been populated yet, so the
            # values are expected to be zero.
            first = self.monitor.retrieve_nfvi_metrics('test-vdur-id-1')
            second = self.monitor.retrieve_nfvi_metrics('test-vdur-id-2')
            self.assertEqual(0, first.memory.used)
            self.assertEqual(0, second.memory.used)

        self.loop.run_until_complete(call1())

        @wait_for_pending_tasks(self.loop)
        @asyncio.coroutine
        def call2():
            # call #2 (wait 0.05s)
            # The metrics are expected to hold the values produced by the
            # first query of the mock plugin.
            yield from asyncio.sleep(0.05)
            first = self.monitor.retrieve_nfvi_metrics('test-vdur-id-1')
            second = self.monitor.retrieve_nfvi_metrics('test-vdur-id-2')
            self.assertEqual(1000, first.memory.used)
            self.assertEqual(2000, second.memory.used)

        self.loop.run_until_complete(call2())

        @wait_for_pending_tasks(self.loop)
        @asyncio.coroutine
        def call3():
            # call #3 (wait 0.10s)
            # The wait matches the 0.1s sample interval set in setUp. The
            # values are expected to be unchanged from call #2.
            yield from asyncio.sleep(0.10)
            first = self.monitor.retrieve_nfvi_metrics('test-vdur-id-1')
            second = self.monitor.retrieve_nfvi_metrics('test-vdur-id-2')
            self.assertEqual(1000, first.memory.used)
            self.assertEqual(2000, second.memory.used)

        self.loop.run_until_complete(call3())

        @wait_for_pending_tasks(self.loop)
        @asyncio.coroutine
        def call4():
            # call #4 (wait 0.10s)
            # The metrics retrieved are expected to differ from those in
            # call #3: they reflect a second query of the mock plugin.
            yield from asyncio.sleep(0.10)
            first = self.monitor.retrieve_nfvi_metrics('test-vdur-id-1')
            second = self.monitor.retrieve_nfvi_metrics('test-vdur-id-2')
            self.assertEqual(2000, first.memory.used)
            self.assertEqual(4000, second.memory.used)

        self.loop.run_until_complete(call4())
def main(argv=sys.argv[1:]):
    """
    Configure logging and run the unit tests in this file with the XML test
    runner.

    Arguments:
        argv - the command line arguments (without the program name). The
               -v/--verbose flag selects DEBUG rather than ERROR as the
               global logging level.

    The XML reports are written to the location named by the
    RIFT_MODULE_TEST environment variable, which must be set.
    """
    logging.basicConfig(format='TEST %(message)s')

    parser = argparse.ArgumentParser()
    parser.add_argument('-v', '--verbose', action='store_true')
    options = parser.parse_args(argv)

    # Set the global logging level
    if options.verbose:
        level = logging.DEBUG
    else:
        level = logging.ERROR
    logging.getLogger().setLevel(level)

    # The logger used by the tests gets a null handler
    logging.getLogger('test-logger').addHandler(logging.NullHandler())

    # The unittest framework requires a program name, so the name of this
    # file is used (callers should not have to pass a fake program name to
    # main when it is called from the interpreter).
    unittest_argv = [__file__] + argv
    runner = xmlrunner.XMLTestRunner(output=os.environ["RIFT_MODULE_TEST"])
    unittest.main(argv=unittest_argv, testRunner=runner)


if __name__ == '__main__':
    main()