#
##
-'''
+"""
Module for testing openmano functionality. It uses openmanoclient.py for invoking openmano
-'''
-__author__ = "Pablo Montes, Alfonso Tierno"
-__date__ = "$16-Feb-2017 17:08:16$"
-__version__ = "0.0.4"
-version_date = "Jun 2017"
+"""
import logging
import os
-from argparse import ArgumentParser
import argcomplete
import unittest
import string
import inspect
import random
-import traceback
+# import traceback
import glob
import yaml
import sys
import time
-from pyvcloud.vcloudair import VCA
import uuid
import json
+from argparse import ArgumentParser
+
+__author__ = "Pablo Montes, Alfonso Tierno"
+__date__ = "$16-Feb-2017 17:08:16$"
+__version__ = "0.1.0"
+version_date = "Oct 2017"
+
+test_config = {} # used for global variables with the test configuration
-global test_config # used for global variables with the test configuration
-test_config = {}
class test_base(unittest.TestCase):
test_index = 1
self.datacenter = test_config["client"].attach_datacenter(name=self.__class__.datacenter_name,
vim_tenant_name='fake')
logger.debug("{}".format(self.datacenter))
- assert ('vim_tenants' in self.datacenter.get('datacenter', {}))
+ assert ('uuid' in self.datacenter.get('datacenter', {}))
def test_030_list_attached_datacenter(self):
self.__class__.test_text = "{}.{}. TEST {}".format(test_config["test_number"], self.__class__.test_index,
logger.debug("{}".format(tenant))
assert ('deleted' in tenant.get('result', ""))
+
class test_vimconn_connect(test_base):
def test_000_connect(self):
if test_config['vimtype'] == 'vmware':
vca_object = test_config["vim_conn"].connect()
logger.debug("{}".format(vca_object))
- self.assertIsInstance(vca_object, VCA)
-
+ self.assertIsNotNone(vca_object)
class test_vimconn_new_network(test_base):
network_name = None
self.__class__.test_index,
inspect.currentframe().f_code.co_name)
self.__class__.test_index += 1
+ with self.assertRaises(Exception) as context:
+ test_config["vim_conn"].get_network(Non_exist_id)
- network_info = test_config["vim_conn"].get_network(Non_exist_id)
- self.assertEqual(network_info, {})
+ self.assertEqual((context.exception).http_code, 404)
class test_vimconn_delete_network(test_base):
network_name = None
image_path = test_config['image_path']
if image_path:
- image_id = test_config["vim_conn"].new_image({ 'name': 'TestImage', 'location' : image_path })
+ self.__class__.image_id = test_config["vim_conn"].new_image({ 'name': 'TestImage', 'location' : image_path })
time.sleep(20)
- self.assertEqual(type(image_id),str)
- self.assertIsInstance(uuid.UUID(image_id),uuid.UUID)
+ self.assertEqual(type(self.__class__.image_id),str)
+ self.assertIsInstance(uuid.UUID(self.__class__.image_id),uuid.UUID)
else:
self.skipTest("Skipping test as image file not present at RO container")
net_list = [{'use': self.__class__.net_type, 'name': name, 'floating_ip': False, 'vpci': vpci, 'port_security': True, 'type': 'virtual', 'net_id': self.__class__.network_id}]
- self.__class__.instance_id = test_config["vim_conn"].new_vminstance(name='Test1_vm', image_id=self.__class__.image_id, flavor_id=flavor_id, net_list=net_list)
+ self.__class__.instance_id, _ = test_config["vim_conn"].new_vminstance(name='Test1_vm', image_id=self.__class__.image_id, flavor_id=flavor_id, net_list=net_list)
self.assertEqual(type(self.__class__.instance_id),str)
net_list = [{'use': self.__class__.net_type, 'name': name, 'floating_ip': False, 'port_security': True, 'model': model_name, 'type': 'virtual', 'net_id': self.__class__.network_id}]
- instance_id = test_config["vim_conn"].new_vminstance(name='Test1_vm', image_id=self.__class__.image_id,
+ instance_id, _ = test_config["vim_conn"].new_vminstance(name='Test1_vm', image_id=self.__class__.image_id,
flavor_id=flavor_id,
net_list=net_list)
self.assertEqual(type(instance_id),str)
net_list = [{'use': net_use, 'name': name, 'floating_ip': False, 'port_security': True, 'type': 'virtual', 'net_id': self.__class__.network_id}]
- instance_id = test_config["vim_conn"].new_vminstance(name='Test1_vm', image_id=self.__class__.image_id,
+ instance_id, _ = test_config["vim_conn"].new_vminstance(name='Test1_vm', image_id=self.__class__.image_id,
flavor_id=flavor_id,
net_list=net_list)
self.assertEqual(type(instance_id),str)
net_list = [{'use': self.__class__.net_type, 'name': name, 'floating_ip': False, 'port_security': True, 'type': _type, 'net_id': self.__class__.network_id}]
- instance_id = test_config["vim_conn"].new_vminstance(name='Test1_vm', image_id=self.__class__.image_id,
+ instance_id, _ = test_config["vim_conn"].new_vminstance(name='Test1_vm', image_id=self.__class__.image_id,
flavor_id=flavor_id,
net_list=net_list)
self.assertEqual(type(instance_id),str)
net_list = [{'use': self.__class__.net_type, 'name': name, 'floating_ip': False, 'port_security': True, 'type': 'virtual', 'net_id': self.__class__.network_id}]
- instance_id = test_config["vim_conn"].new_vminstance(name='Cloud_vm', image_id=self.__class__.image_id,
+ instance_id, _ = test_config["vim_conn"].new_vminstance(name='Cloud_vm', image_id=self.__class__.image_id,
flavor_id=flavor_id,
net_list=net_list,
cloud_config=cloud_data)
net_list = [{'use': self.__class__.net_type, 'name': name, 'floating_ip': False, 'port_security': True, 'type': 'virtual', 'net_id': self.__class__.network_id}]
- instance_id = test_config["vim_conn"].new_vminstance(name='VM_test1', image_id=self.__class__.image_id,
+ instance_id, _ = test_config["vim_conn"].new_vminstance(name='VM_test1', image_id=self.__class__.image_id,
flavor_id=flavor_id,
net_list=net_list,
disk_list=device_data)
self.assertEqual((context.exception).http_code, 404)
+
'''
IMPORTANT NOTE
The following unittest class does not have the 'test_' on purpose. This test is the one used for the
class descriptor_based_scenario_test(test_base):
test_index = 0
scenario_test_path = None
- scenario_uuid = None
- instance_scenario_uuid = None
- to_delete_list = []
@classmethod
def setUpClass(cls):
cls.test_index = 1
cls.to_delete_list = []
+ cls.scenario_uuids = []
+ cls.instance_scenario_uuids = []
cls.scenario_test_path = test_config["test_directory"] + '/' + test_config["test_folder"]
logger.info("{}. {} {}".format(test_config["test_number"], cls.__name__, test_config["test_folder"]))
inspect.currentframe().f_code.co_name,
test_config["test_folder"])
self.__class__.test_index += 1
- vnfd_files = glob.glob(self.__class__.scenario_test_path+'/vnfd_*.yaml')
+ # load VNFD and NSD
+ descriptor_files = glob.glob(self.__class__.scenario_test_path+'/*.yaml')
+ vnf_descriptors = []
+ scenario_descriptors = []
+ for descriptor_file in descriptor_files:
+ with open(descriptor_file, 'r') as stream:
+ descriptor = yaml.load(stream)
+ if "vnf" in descriptor or "vnfd:vnfd-catalog" in descriptor or "vnfd-catalog" in descriptor:
+ vnf_descriptors.append(descriptor)
+ else:
+ scenario_descriptors.append(descriptor)
+
scenario_file = glob.glob(self.__class__.scenario_test_path + '/scenario_*.yaml')
- if len(vnfd_files) == 0 or len(scenario_file) > 1:
+ if not vnf_descriptors or not scenario_descriptors or len(scenario_descriptors) > 1:
raise Exception("Test '{}' not valid. It must contain an scenario file and at least one vnfd file'".format(
test_config["test_folder"]))
- #load all vnfd
- for vnfd in vnfd_files:
- with open(vnfd, 'r') as stream:
- vnf_descriptor = yaml.load(stream)
-
- vnfc_list = vnf_descriptor['vnf']['VNFC']
- for vnfc in vnfc_list:
- vnfc['image name'] = test_config["image_name"]
- devices = vnfc.get('devices',[])
- for device in devices:
- if device['type'] == 'disk' and 'image name' in device:
- device['image name'] = test_config["image_name"]
-
+ # load all vnfd
+ for vnf_descriptor in vnf_descriptors:
logger.debug("VNF descriptor: {}".format(vnf_descriptor))
- vnf = test_config["client"].create_vnf(descriptor=vnf_descriptor)
+ vnf = test_config["client"].create_vnf(descriptor=vnf_descriptor, image_name=test_config["image_name"])
logger.debug(vnf)
+ if 'vnf' in vnf:
+ vnf_uuid = vnf['vnf']['uuid']
+ else:
+ vnf_uuid = vnf['vnfd'][0]['uuid']
self.__class__.to_delete_list.insert(0, {"item": "vnf", "function": test_config["client"].delete_vnf,
- "params": {"uuid": vnf['vnf']['uuid']}})
-
- #load the scenario definition
- with open(scenario_file[0], 'r') as stream:
- scenario_descriptor = yaml.load(stream)
- networks = scenario_descriptor['scenario']['networks']
- networks[test_config["mgmt_net"]] = networks.pop('mgmt')
- logger.debug("Scenario descriptor: {}".format(scenario_descriptor))
- scenario = test_config["client"].create_scenario(descriptor=scenario_descriptor)
- logger.debug(scenario)
- self.__class__.to_delete_list.insert(0,{"item": "scenario", "function": test_config["client"].delete_scenario,
- "params":{"uuid": scenario['scenario']['uuid']} })
- self.__class__.scenario_uuid = scenario['scenario']['uuid']
+ "params": {"uuid": vnf_uuid}})
+
+ # load the scenario definition
+ for scenario_descriptor in scenario_descriptors:
+ # networks = scenario_descriptor['scenario']['networks']
+ # networks[test_config["mgmt_net"]] = networks.pop('mgmt')
+ logger.debug("Scenario descriptor: {}".format(scenario_descriptor))
+ scenario = test_config["client"].create_scenario(descriptor=scenario_descriptor)
+ logger.debug(scenario)
+ if 'scenario' in scenario:
+ scenario_uuid = scenario['scenario']['uuid']
+ else:
+ scenario_uuid = scenario['nsd'][0]['uuid']
+ self.__class__.to_delete_list.insert(0, {"item": "scenario",
+ "function": test_config["client"].delete_scenario,
+ "params": {"uuid": scenario_uuid}})
+ self.__class__.scenario_uuids.append(scenario_uuid)
def test_010_instantiate_scenario(self):
self.__class__.test_text = "{}.{}. TEST {} {}".format(test_config["test_number"], self.__class__.test_index,
inspect.currentframe().f_code.co_name,
test_config["test_folder"])
self.__class__.test_index += 1
-
- instance = test_config["client"].create_instance(scenario_id=self.__class__.scenario_uuid,
- name=self.__class__.test_text)
- self.__class__.instance_scenario_uuid = instance['uuid']
- logger.debug(instance)
- self.__class__.to_delete_list.insert(0, {"item": "instance", "function": test_config["client"].delete_instance,
- "params": {"uuid": instance['uuid']}})
+ for scenario_uuid in self.__class__.scenario_uuids:
+ instance_descriptor = {
+ "instance":{
+ "name": self.__class__.test_text,
+ "scenario": scenario_uuid,
+ "networks":{
+ "mgmt": {"sites": [ { "netmap-use": test_config["mgmt_net"]} ]}
+ }
+ }
+ }
+ instance = test_config["client"].create_instance(instance_descriptor)
+ self.__class__.instance_scenario_uuids.append(instance['uuid'])
+ logger.debug(instance)
+ self.__class__.to_delete_list.insert(0, {"item": "instance",
+ "function": test_config["client"].delete_instance,
+ "params": {"uuid": instance['uuid']}})
def test_020_check_deployent(self):
self.__class__.test_text = "{}.{}. TEST {} {}".format(test_config["test_number"], self.__class__.test_index,
return
keep_waiting = test_config["timeout"]
- instance_active = False
- while True:
- result = check_instance_scenario_active(self.__class__.instance_scenario_uuid)
- if result[0]:
- break
- elif 'ERROR' in result[1]:
- msg = 'Got error while waiting for the instance to get active: '+result[1]
- logging.error(msg)
- raise Exception(msg)
+ pending_instance_scenario_uuids = list(self.__class__.instance_scenario_uuids) # make a copy
+ while pending_instance_scenario_uuids:
+ index = 0
+ while index < len(pending_instance_scenario_uuids):
+ result = check_instance_scenario_active(pending_instance_scenario_uuids[index])
+ if result[0]:
+ del pending_instance_scenario_uuids[index]
+ break
+ elif 'ERROR' in result[1]:
+ msg = 'Got error while waiting for the instance to get active: '+result[1]
+ logging.error(msg)
+ raise Exception(msg)
+ index += 1
if keep_waiting >= 5:
time.sleep(5)
return executed, failed
+def test_wim(args):
+ global test_config
+ sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))) + "/osm_ro")
+ import openmanoclient
+ executed = 0
+ failed = 0
+ test_config["client"] = openmanoclient.openmanoclient(
+ endpoint_url=args.endpoint_url,
+ tenant_name=args.tenant_name,
+ datacenter_name=args.datacenter,
+ debug=args.debug, logger=test_config["logger_name"])
+ clsmembers = inspect.getmembers(sys.modules[__name__], inspect.isclass)
+ # If only want to obtain a tests list print it and exit
+ if args.list_tests:
+ tests_names = []
+ for cls in clsmembers:
+ if cls[0].startswith('test_WIM'):
+ tests_names.append(cls[0])
+
+ msg = "The 'wim' set tests are:\n\t" + ', '.join(sorted(tests_names)) +\
+ "\nNOTE: The test test_VIM_tenant_operations will fail in case the used datacenter is type OpenStack " \
+ "unless RO has access to the admin endpoint. Therefore this test is excluded by default"
+ print(msg)
+ logger.info(msg)
+ sys.exit(0)
+
+ # Create the list of tests to be run
+ code_based_tests = []
+ if args.tests:
+ for test in args.tests:
+ for t in test.split(','):
+ matches_code_based_tests = [item for item in clsmembers if item[0] == t]
+ if len(matches_code_based_tests) > 0:
+ code_based_tests.append(matches_code_based_tests[0][1])
+ else:
+ logger.critical("Test '{}' is not among the possible ones".format(t))
+ sys.exit(1)
+ if not code_based_tests:
+ # include all tests
+ for cls in clsmembers:
+ # We exclude 'test_VIM_tenant_operations' unless it is specifically requested by the user
+ if cls[0].startswith('test_VIM') and cls[0] != 'test_VIM_tenant_operations':
+ code_based_tests.append(cls[1])
+
+ logger.debug("tests to be executed: {}".format(code_based_tests))
+
+ # TextTestRunner stream is set to /dev/null in order to avoid the method to directly print the result of tests.
+ # This is handled in the tests using logging.
+ stream = open('/dev/null', 'w')
+
+ # Run code based tests
+ basic_tests_suite = unittest.TestSuite()
+ for test in code_based_tests:
+ basic_tests_suite.addTest(unittest.makeSuite(test))
+ result = unittest.TextTestRunner(stream=stream, failfast=failfast).run(basic_tests_suite)
+ executed += result.testsRun
+ failed += len(result.failures) + len(result.errors)
+ if failfast and failed:
+ sys.exit(1)
+ if len(result.failures) > 0:
+ logger.debug("failures : {}".format(result.failures))
+ if len(result.errors) > 0:
+ logger.debug("errors : {}".format(result.errors))
+ return executed, failed
+
+
def test_deploy(args):
global test_config
sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))) + "/osm_ro")
vimconn_parser.add_argument('-u', '--url', dest='endpoint_url', default='http://localhost:9090/openmano',
help="Set the openmano server url. By default 'http://localhost:9090/openmano'")
+ # WIM test set
+ # -------------------
+ vimconn_parser = subparsers.add_parser('wim', parents=[parent_parser], help="test wim")
+ vimconn_parser.set_defaults(func=test_wim)
+
+ # Mandatory arguments
+ mandatory_arguments = vimconn_parser.add_argument_group('mandatory arguments')
+ mandatory_arguments.add_argument('-d', '--datacenter', required=True, help='Set the datacenter to test')
+
+ # Optional arguments
+ vimconn_parser.add_argument('-u', '--url', dest='endpoint_url', default='http://localhost:9090/openmano',
+ help="Set the openmano server url. By default 'http://localhost:9090/openmano'")
+
argcomplete.autocomplete(parser)
args = parser.parse_args()
# print str(args)