#!/usr/bin/env python3
-#
+
+#
# Copyright 2016 RIFT.IO Inc
#
# Licensed under the Apache License, Version 2.0 (the "License");
import argparse
import asyncio
+import gi
+import logging
import os
+import random
import sys
import unittest
-import random
-
import xmlrunner
+
import unittest.mock as mock
import rift.test.dts
import rift.tasklets.rwautoscaler.engine as engine
-import gi
gi.require_version('RwDtsYang', '1.0')
from gi.repository import (
RwNsrYang,
NsrYang,
- NsdYang,
+ ProjectNsdYang as NsdYang,
RwLaunchpadYang as launchpadyang,
RwVnfrYang,
- RwVnfdYang,
- RwNsdYang,
+ RwProjectVnfdYang as RwVnfdYang,
+ RwProjectNsdYang as RwNsdYang,
VnfrYang
)
+gi.require_version('RwKeyspec', '1.0')
+from gi.repository.RwKeyspec import quoted_key
-ScalingCriteria = NsdYang.YangData_Nsd_NsdCatalog_Nsd_ScalingGroupDescriptor_ScalingPolicy_ScalingCriteria
-ScalingPolicy = NsdYang.YangData_Nsd_NsdCatalog_Nsd_ScalingGroupDescriptor_ScalingPolicy
+ScalingCriteria = NsdYang.YangData_RwProject_Project_NsdCatalog_Nsd_ScalingGroupDescriptor_ScalingPolicy_ScalingCriteria
+ScalingPolicy = NsdYang.YangData_RwProject_Project_NsdCatalog_Nsd_ScalingGroupDescriptor_ScalingPolicy
class MockDelegate(engine.ScalingCriteria.Delegate):
def __init__(self, aggregation_type="AVERAGE", legacy=False):
self.aggregation_type = aggregation_type
self.legacy = legacy
- self.threshold_time = 3
+ self.threshold_time = 2
def __call__(self):
store = mock.MagicMock()
- mock_vnfd = RwVnfdYang.YangData_Vnfd_VnfdCatalog_Vnfd.from_dict({
+ mock_vnfd = RwVnfdYang.YangData_RwProject_Project_VnfdCatalog_Vnfd.from_dict({
'id': "1",
'monitoring_param': [
{'description': 'no of ping requests',
store.get_vnfd = mock.MagicMock(return_value=mock_vnfd)
- mock_vnfr = RwVnfrYang.YangData_Vnfr_VnfrCatalog_Vnfr.from_dict({'id': '1'})
- mock_vnfr.vnfd = VnfrYang.YangData_Vnfr_VnfrCatalog_Vnfr_Vnfd.from_dict({'id': '1'})
+ mock_vnfr = RwVnfrYang.YangData_RwProject_Project_VnfrCatalog_Vnfr.from_dict({'id': '1'})
+ mock_vnfr.vnfd = VnfrYang.YangData_RwProject_Project_VnfrCatalog_Vnfr_Vnfd.from_dict({'id': '1'})
store.get_vnfr = mock.MagicMock(return_value=mock_vnfr)
- mock_nsr = RwNsrYang.YangData_Nsr_NsInstanceOpdata_Nsr.from_dict({
+ mock_nsr = RwNsrYang.YangData_RwProject_Project_NsInstanceOpdata_Nsr.from_dict({
'ns_instance_config_ref': "1",
'name_ref': "Foo",
'nsd_ref': '1',
scale_in_val = 100
scale_out_val = 200
- mock_nsd = RwNsdYang.YangData_Nsd_NsdCatalog_Nsd.from_dict({
+ mock_nsd = RwNsdYang.YangData_RwProject_Project_NsdCatalog_Nsd.from_dict({
'id': '1',
+ 'name': 'mock',
+ 'short_name': 'm',
'monitoring_param': (monp_cfg if not self.legacy else []),
'constituent_vnfd': [{'member_vnf_index': 1,
'start_by_default': True,
def _populate_mock_values(self, criterias, nsr_id, floor, ceil):
# Mock publish
# Verify Scale in AND operator
- NsMonParam = NsrYang.YangData_Nsr_NsInstanceOpdata_Nsr_MonitoringParam
+ NsMonParam = NsrYang.YangData_RwProject_Project_NsInstanceOpdata_Nsr_MonitoringParam
publisher = rift.test.dts.DescriptorPublisher(self.log, self.dts, self.loop)
for criteria in criterias:
monp_id = criteria.ns_monitoring_param_ref
- w_xpath = "D,/nsr:ns-instance-opdata/nsr:nsr"
- w_xpath = w_xpath + "[nsr:ns-instance-config-ref='{}']/nsr:monitoring-param".format(nsr_id)
- xpath = w_xpath + "[nsr:id ='{}']".format(monp_id)
+ w_xpath = "D,/rw-project:project/nsr:ns-instance-opdata/nsr:nsr"
+ w_xpath = w_xpath + "[nsr:ns-instance-config-ref={}]/nsr:monitoring-param".format(quoted_key(nsr_id))
+ xpath = w_xpath + "[nsr:id={}]".format(quoted_key(monp_id))
- for i in range(self.mock_store.threshold_time + 1):
+ for i in range(self.mock_store.threshold_time + 2):
value = random.randint(floor, ceil)
monp = NsMonParam.from_dict({
yield from self._populate_mock_values(policy.scaling_criteria, nsr_id, floor, ceil)
assert mock_delegate.scale_in_called == 0
- # Test 2: AND operation
+ # Test 2: AND operation
yield from scale_out(policy)
yield from self._populate_mock_values(policy.scaling_criteria, nsr_id, floor, ceil)
assert mock_delegate.scale_in_called == 1
assert mock_delegate.scale_in_called == 1
@rift.test.dts.async_test
- def _test_scale_out(self):
+ def test_scale_out(self):
""" Tests scale out
Asserts:
1. Scale out
2. Scale out doesn't happen during cooldown
- 3. AND operation
+ 3. AND operation
4. OR operation.
"""
store = self.mock_store()
def main():
+ logging.basicConfig(format='TEST %(message)s')
runner = xmlrunner.XMLTestRunner(output=os.environ["RIFT_MODULE_TEST"])
parser = argparse.ArgumentParser()
if args.no_runner:
runner = None
+ # Set the global logging level
+ logging.getLogger().setLevel(logging.DEBUG if args.verbose else logging.ERROR)
+
unittest.main(testRunner=runner, argv=[sys.argv[0]] + unittest_args)