DEBIAN_FRONTEND=noninteractive apt-get -y install python-pip libmysqlclient-dev libssl-dev libffi-dev && \
DEBIAN_FRONTEND=noninteractive pip install --upgrade pip && \
DEBIAN_FRONTEND=noninteractive pip install --upgrade setuptools && \
- DEBIAN_FRONTEND=noninteractive apt-get -y install python-argcomplete python-boto python-bottle python-jsonschema python-logutils python-cinderclient python-glanceclient python-keystoneclient python-neutronclient python-novaclient python-mysqldb
+ DEBIAN_FRONTEND=noninteractive apt-get -y install python-argcomplete python-boto python-bottle python-jsonschema python-logutils python-cinderclient python-glanceclient python-keystoneclient python-neutronclient python-novaclient python-openstackclient python-mysqldb
for scenario_net in scenarioDict['nets']:
if net_name == scenario_net["name"]:
if 'ip-profile' in net_instance_desc:
- ipprofile = net_instance_desc['ip-profile']
- ipprofile['subnet_address'] = ipprofile.pop('subnet-address',None)
- ipprofile['ip_version'] = ipprofile.pop('ip-version','IPv4')
- ipprofile['gateway_address'] = ipprofile.pop('gateway-address',None)
- ipprofile['dns_address'] = ipprofile.pop('dns-address',None)
- if 'dhcp' in ipprofile:
- ipprofile['dhcp_start_address'] = ipprofile['dhcp'].get('start-address',None)
- ipprofile['dhcp_enabled'] = ipprofile['dhcp'].get('enabled',True)
- ipprofile['dhcp_count'] = ipprofile['dhcp'].get('count',None)
- del ipprofile['dhcp']
+ # translate from input format to database format
+ ipprofile_in = net_instance_desc['ip-profile']
+ ipprofile_db = {}
+ ipprofile_db['subnet_address'] = ipprofile_in.get('subnet-address')
+ ipprofile_db['ip_version'] = ipprofile_in.get('ip-version', 'IPv4')
+ ipprofile_db['gateway_address'] = ipprofile_in.get('gateway-address')
+ ipprofile_db['dns_address'] = ipprofile_in.get('dns-address')
+ if isinstance(ipprofile_db['dns_address'], (list, tuple)):
+ ipprofile_db['dns_address'] = ";".join(ipprofile_db['dns_address'])
+ if 'dhcp' in ipprofile_in:
+ ipprofile_db['dhcp_start_address'] = ipprofile_in['dhcp'].get('start-address')
+ ipprofile_db['dhcp_enabled'] = ipprofile_in['dhcp'].get('enabled', True)
+ ipprofile_db['dhcp_count'] = ipprofile_in['dhcp'].get('count' )
if 'ip_profile' not in scenario_net:
- scenario_net['ip_profile'] = ipprofile
+ scenario_net['ip_profile'] = ipprofile_db
else:
- update(scenario_net['ip_profile'],ipprofile)
+ update(scenario_net['ip_profile'], ipprofile_db)
for interface in net_instance_desc.get('interfaces', () ):
if 'ip_address' in interface:
for vnf in scenarioDict['vnfs']:
created_time += 0.00001
db_base._convert_bandwidth(dataiface, logger=self.logger)
dataifacesDict[vm['name']][dataiface['name']] = {}
- dataifacesDict[vm['name']][dataiface['name']]['vpci'] = dataiface['vpci']
+ dataifacesDict[vm['name']][dataiface['name']]['vpci'] = dataiface.get('vpci')
dataifacesDict[vm['name']][dataiface['name']]['bw'] = dataiface['bandwidth']
dataifacesDict[vm['name']][dataiface['name']]['model'] = "PF" if dataiface[
'dedicated'] == "yes" else (
created_time += 0.00001
db_base._convert_bandwidth(dataiface, logger=self.logger)
ifaceDict = {}
- ifaceDict['vpci'] = dataiface['vpci']
+ ifaceDict['vpci'] = dataiface.get('vpci')
ifaceDict['bw'] = dataiface['bandwidth']
ifaceDict['model'] = "PF" if dataiface['dedicated'] == "yes" else \
("VF" if dataiface['dedicated'] == "no" else "VFnotShared")
"$schema": "http://json-schema.org/draft-04/schema#",
"type":"object",
"properties":{
- "ip-version": {"type":"string", "enum":["IPv4","IPv6"]},
+ "ip-version": {"type": "string", "enum": ["IPv4","IPv6"]},
"subnet-address": ip_prefix_schema,
"gateway-address": ip_schema,
- "dns-address": ip_schema,
+ "dns-address": {"oneOf": [ip_schema, # for backward compatibility
+ {"type": "array", "items": ip_schema}]},
"dhcp": dhcp_schema
},
}
sdn_net_id = None
sdn_controller = self.vim.config.get('sdn-controller')
if sdn_controller and (net_type == "data" or net_type == "ptp"):
- network = {"name": net_name, "type": net_type}
+ network = {"name": net_name, "type": net_type, "region": self.vim["config"]["datacenter_id"]}
vim_net = self.vim.get_network(net_id)
if vim_net.get('encapsulation') != 'vlan':
except IOError as e:
raise vimconn.vimconnException("Error reading file '{}': {}".format(flavor_data[1:], e))
elif isinstance(flavor_data, dict):
- self.flavor_data = flavor_data
+ self.flavor_info = flavor_data
self.logger = logging.getLogger('openmano.vim.aws')
if log_level:
import yaml
import random
-from novaclient import client as nClient_v2, exceptions as nvExceptions
-from novaclient import api_versions
-import keystoneclient.v2_0.client as ksClient_v2
-from novaclient.v2.client import Client as nClient
-import keystoneclient.v3.client as ksClient
+from novaclient import client as nClient, exceptions as nvExceptions
+from keystoneauth1.identity import v2, v3
+from keystoneauth1 import session
import keystoneclient.exceptions as ksExceptions
-import glanceclient.v2.client as glClient
+from glanceclient import client as glClient
import glanceclient.client as gl1Client
import glanceclient.exc as gl1Exceptions
-import cinderclient.v2.client as cClient_v2
+from cinderclient import client as cClient
from httplib import HTTPException
-from neutronclient.neutron import client as neClient_v2
-from neutronclient.v2_0 import client as neClient
+from neutronclient.neutron import client as neClient
from neutronclient.common import exceptions as neExceptions
from requests.exceptions import ConnectionError
'url' is the keystone authorization url,
'url_admin' is not use
'''
- self.osc_api_version = 'v2.0'
- if config.get('APIversion') == 'v3.3':
- self.osc_api_version = 'v3.3'
- vimconn.vimconnector.__init__(self, uuid, name, tenant_id, tenant_name, url, url_admin, user, passwd, log_level, config)
+ self.osc_api_version = config.get('APIversion')
+ if self.osc_api_version != 'v3.3' and self.osc_api_version != 'v2.0' and self.osc_api_version:
+ raise vimconn.vimconnException("Invalid value '{}' for config:APIversion. "
+ "Allowed values are 'v3.3' or 'v2.0'".format(self.osc_api_version))
+ vimconn.vimconnector.__init__(self, uuid, name, tenant_id, tenant_name, url, url_admin, user, passwd, log_level,
+ config)
- self.persistent_info = persistent_info
- self.k_creds={}
- self.n_creds={}
- if self.config.get("insecure"):
- self.k_creds["insecure"] = True
- self.n_creds["insecure"] = True
+ self.insecure = self.config.get("insecure", False)
if not url:
raise TypeError, 'url param can not be NoneType'
- self.k_creds['auth_url'] = url
- self.n_creds['auth_url'] = url
- if tenant_name:
- self.k_creds['tenant_name'] = tenant_name
- self.n_creds['project_id'] = tenant_name
- if tenant_id:
- self.k_creds['tenant_id'] = tenant_id
- self.n_creds['tenant_id'] = tenant_id
- if user:
- self.k_creds['username'] = user
- self.n_creds['username'] = user
- if passwd:
- self.k_creds['password'] = passwd
- self.n_creds['api_key'] = passwd
- if self.osc_api_version == 'v3.3':
- self.k_creds['project_name'] = tenant_name
- self.k_creds['project_id'] = tenant_id
- if config.get('region_name'):
- self.k_creds['region_name'] = config.get('region_name')
- self.n_creds['region_name'] = config.get('region_name')
+ self.auth_url = url
+ self.tenant_name = tenant_name
+ self.tenant_id = tenant_id
+ self.user = user
+ self.passwd = passwd
+ self.persistent_info = persistent_info
+ self.session = persistent_info.get('session', {'reload_client': True})
+ self.nova = self.session.get('nova')
+ self.neutron = self.session.get('neutron')
+ self.cinder = self.session.get('cinder')
+ self.glance = self.session.get('glance')
- self.reload_client = True
self.logger = logging.getLogger('openmano.vim.openstack')
if log_level:
self.logger.setLevel( getattr(logging, log_level) )
def __setitem__(self,index, value):
- '''Set individuals parameters
+ '''Set individuals parameters
Throw TypeError, KeyError
'''
- if index=='tenant_id':
- self.reload_client=True
- self.tenant_id = value
- if self.osc_api_version == 'v3.3':
- if value:
- self.k_creds['project_id'] = value
- self.n_creds['project_id'] = value
- else:
- del self.k_creds['project_id']
- del self.n_creds['project_id']
- else:
- if value:
- self.k_creds['tenant_id'] = value
- self.n_creds['tenant_id'] = value
- else:
- del self.k_creds['tenant_id']
- del self.n_creds['tenant_id']
- elif index=='tenant_name':
- self.reload_client=True
- self.tenant_name = value
- if self.osc_api_version == 'v3.3':
- if value:
- self.k_creds['project_name'] = value
- self.n_creds['project_name'] = value
- else:
- del self.k_creds['project_name']
- del self.n_creds['project_name']
- else:
- if value:
- self.k_creds['tenant_name'] = value
- self.n_creds['project_id'] = value
- else:
- del self.k_creds['tenant_name']
- del self.n_creds['project_id']
- elif index=='user':
- self.reload_client=True
- self.user = value
- if value:
- self.k_creds['username'] = value
- self.n_creds['username'] = value
- else:
- del self.k_creds['username']
- del self.n_creds['username']
- elif index=='passwd':
- self.reload_client=True
- self.passwd = value
- if value:
- self.k_creds['password'] = value
- self.n_creds['api_key'] = value
- else:
- del self.k_creds['password']
- del self.n_creds['api_key']
- elif index=='url':
- self.reload_client=True
- self.url = value
- if value:
- self.k_creds['auth_url'] = value
- self.n_creds['auth_url'] = value
- else:
- raise TypeError, 'url param can not be NoneType'
- else:
- vimconn.vimconnector.__setitem__(self,index, value)
+ self.session['reload_client'] = True
+ vimconn.vimconnector.__setitem__(self,index, value)
def _reload_connection(self):
'''Called before any operation, it check if credentials has changed
Throw keystoneclient.apiclient.exceptions.AuthorizationFailure
'''
#TODO control the timing and possible token timeout, but it seams that python client does this task for us :-)
- if self.reload_client:
- #test valid params
- if len(self.n_creds) <4:
- raise ksExceptions.ClientException("Not enough parameters to connect to openstack")
- if self.osc_api_version == 'v3.3':
- self.nova = nClient(api_version=api_versions.APIVersion(version_str='2.0'), **self.n_creds)
- #TODO To be updated for v3
- #self.cinder = cClient.Client(**self.n_creds)
- self.keystone = ksClient.Client(**self.k_creds)
- self.ne_endpoint=self.keystone.service_catalog.url_for(service_type='network', endpoint_type='publicURL')
- self.neutron = neClient.Client(api_version=api_versions.APIVersion(version_str='2.0'), endpoint_url=self.ne_endpoint, token=self.keystone.auth_token, **self.k_creds)
+ if self.session['reload_client']:
+ if self.osc_api_version == 'v3.3' or self.osc_api_version == '3' or \
+ (not self.osc_api_version and self.auth_url.split("/")[-1] == "v3"):
+ auth = v3.Password(auth_url=self.auth_url,
+ username=self.user,
+ password=self.passwd,
+ project_name=self.tenant_name,
+ project_id=self.tenant_id,
+ project_domain_id=self.config.get('project_domain_id', 'default'),
+ user_domain_id=self.config.get('user_domain_id', 'default'))
else:
- self.nova = nClient_v2.Client(version='2', **self.n_creds)
- self.cinder = cClient_v2.Client(**self.n_creds)
- self.keystone = ksClient_v2.Client(**self.k_creds)
- self.ne_endpoint=self.keystone.service_catalog.url_for(service_type='network', endpoint_type='publicURL')
- self.neutron = neClient_v2.Client('2.0', endpoint_url=self.ne_endpoint, token=self.keystone.auth_token, **self.k_creds)
- self.glance_endpoint = self.keystone.service_catalog.url_for(service_type='image', endpoint_type='publicURL')
- self.glance = glClient.Client(self.glance_endpoint, token=self.keystone.auth_token, **self.k_creds) #TODO check k_creds vs n_creds
- self.reload_client = False
+ auth = v2.Password(auth_url=self.auth_url,
+ username=self.user,
+ password=self.passwd,
+ tenant_name=self.tenant_name,
+ tenant_id=self.tenant_id)
+ sess = session.Session(auth=auth, verify=not self.insecure)
+ self.nova = self.session['nova'] = nClient.Client("2.1", session=sess)
+ self.neutron = self.session['neutron'] = neClient.Client('2.0', session=sess)
+ self.cinder = self.session['cinder'] = cClient.Client(2, session=sess)
+ self.glance = self.session['glance'] = glClient.Client(2, session=sess)
+ self.session['reload_client'] = False
+ self.persistent_info['session'] = self.session
def __net_os2mano(self, net_list_dict):
'''Transform the net openstack format to mano format
if 'gateway_address' in ip_profile:
subnet['gateway_ip'] = ip_profile['gateway_address']
if ip_profile.get('dns_address'):
- #TODO: manage dns_address as a list of addresses separated by commas
- subnet['dns_nameservers'] = []
- subnet['dns_nameservers'].append(ip_profile['dns_address'])
+ subnet['dns_nameservers'] = ip_profile['dns_address'].split(";")
if 'dhcp_enabled' in ip_profile:
subnet['enable_dhcp'] = False if ip_profile['dhcp_enabled']=="false" else True
if 'dhcp_start_address' in ip_profile:
def get_flavor_id_from_data(self, flavor_dict):
"""Obtain flavor id that match the flavor description
Returns the flavor_id or raises a vimconnNotFoundException
+ flavor_dict: contains the required ram, vcpus, disk
+ If 'use_existing_flavors' is set to True at config, the closer flavor that provides same or more ram, vcpus
+ and disk is returned. Otherwise a flavor with exactly same ram, vcpus and disk is returned or a
+ vimconnNotFoundException is raised
"""
+ exact_match = False if self.config.get('use_existing_flavors') else True
try:
self._reload_connection()
- numa=None
- numas = flavor_dict.get("extended",{}).get("numas")
+ flavor_candidate_id = None
+ flavor_candidate_data = (10000, 10000, 10000)
+ flavor_target = (flavor_dict["ram"], flavor_dict["vcpus"], flavor_dict["disk"])
+ # numa=None
+ numas = flavor_dict.get("extended", {}).get("numas")
if numas:
#TODO
raise vimconn.vimconnNotFoundException("Flavor with EPA still not implemted")
epa = flavor.get_keys()
if epa:
continue
- #TODO
- if flavor.ram != flavor_dict["ram"]:
- continue
- if flavor.vcpus != flavor_dict["vcpus"]:
- continue
- if flavor.disk != flavor_dict["disk"]:
- continue
- return flavor.id
+ # TODO
+ flavor_data = (flavor.ram, flavor.vcpus, flavor.disk)
+ if flavor_data == flavor_target:
+ return flavor.id
+ elif not exact_match and flavor_target < flavor_data < flavor_candidate_data:
+ flavor_candidate_id = flavor.id
+ flavor_candidate_data = flavor_data
+ if not exact_match and flavor_candidate_id:
+ return flavor_candidate_id
raise vimconn.vimconnNotFoundException("Cannot find any flavor matching '{}'".format(str(flavor_dict)))
except (nvExceptions.NotFound, nvExceptions.ClientException, ksExceptions.ClientException, ConnectionError) as e:
self._format_exception(e)
metadata: metadata of the image
Returns the image_id
'''
+ # ALF TODO: revise and change for the new method or session
#using version 1 of glance client
glancev1 = gl1Client.Client('1',self.glance_endpoint, token=self.keystone.auth_token, **self.k_creds) #TODO check k_creds vs n_creds
retry=0
'''Returns the flavor identifier'''
try:
new_flavor_dict = flavor_data.copy()
+ for device in new_flavor_dict.get('extended', {}).get('devices', ()):
+ if 'image name' in device:
+ del device['image name']
new_flavor_dict["name"] = flavor_data["name"][:64]
self._get_my_tenant()
payload_req = json.dumps({'flavor': new_flavor_dict})
# -*- coding: utf-8 -*-
##
-# Copyright 2015 Telefónica Investigación y Desarrollo, S.A.U.
-# This file is part of openmano
+# Copyright 2016-2017 VMware Inc.
+# This file is part of ETSI OSM
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# under the License.
#
# For those usages not covered by the Apache License, Version 2.0 please
-# contact with: nfvlabs@tid.es
+# contact: osslegalrouting@vmware.com
##
"""
#Unused in case of Underlay (data/ptp) network interface.
fence_mode="bridged"
is_inherited='false'
+ dns_list = dns_address.split(";")
+ dns1 = dns_list[0]
+ dns2_text = ""
+ if len(dns_list) >= 2:
+ dns2_text = "\n <Dns2>{}</Dns2>\n".format(dns_list[1])
data = """ <OrgVdcNetwork name="{0:s}" xmlns="http://www.vmware.com/vcloud/v1.5">
<Description>Openmano created</Description>
<Configuration>
<IsInherited>{1:s}</IsInherited>
<Gateway>{2:s}</Gateway>
<Netmask>{3:s}</Netmask>
- <Dns1>{4:s}</Dns1>
- <IsEnabled>{5:s}</IsEnabled>
+ <Dns1>{4:s}</Dns1>{5:s}
+ <IsEnabled>{6:s}</IsEnabled>
<IpRanges>
<IpRange>
- <StartAddress>{6:s}</StartAddress>
- <EndAddress>{7:s}</EndAddress>
+ <StartAddress>{7:s}</StartAddress>
+ <EndAddress>{8:s}</EndAddress>
</IpRange>
</IpRanges>
</IpScope>
</IpScopes>
- <ParentNetwork href="{8:s}"/>
- <FenceMode>{9:s}</FenceMode>
+ <ParentNetwork href="{9:s}"/>
+ <FenceMode>{10:s}</FenceMode>
</Configuration>
- <IsShared>{10:s}</IsShared>
+ <IsShared>{11:s}</IsShared>
</OrgVdcNetwork> """.format(escape(network_name), is_inherited, gateway_address,
- subnet_address, dns_address, dhcp_enabled,
+ subnet_address, dns1, dns2_text, dhcp_enabled,
dhcp_start_address, dhcp_end_address, available_networks,
fence_mode, isshared)
argcomplete
requests
logutils
+python-openstackclient
python-novaclient
python-keystoneclient
python-glanceclient
[ "$_DISTRO" == "CentOS" -o "$_DISTRO" == "Red" ] && install_packages "python-boto" #TODO check if at Centos it exists with this name, or PIP should be used
# install openstack client needed for using openstack as a VIM
- [ "$_DISTRO" == "Ubuntu" ] && install_packages "python-novaclient python-keystoneclient python-glanceclient python-neutronclient python-cinderclient"
- [ "$_DISTRO" == "CentOS" -o "$_DISTRO" == "Red" ] && install_packages "python-devel" && easy_install python-novaclient python-keystoneclient python-glanceclient python-neutronclient python-cinderclient #TODO revise if gcc python-pip is needed
+ [ "$_DISTRO" == "Ubuntu" ] && install_packages "python-novaclient python-keystoneclient python-glanceclient "\
+ "python-neutronclient python-cinderclient python-openstackclient"
+ [ "$_DISTRO" == "CentOS" -o "$_DISTRO" == "Red" ] && install_packages "python-devel" && easy_install \
+ python-novaclient python-keystoneclient python-glanceclient python-neutronclient python-cinderclient \
+ python-openstackclient #TODO revise if gcc python-pip is needed
fi # [[ -z "$NO_PACKAGES" ]]
if [[ -z $NOCLONE ]]; then
"argcomplete",
"requests",
"logutils",
+ "python-openstackclient",
"python-novaclient",
"python-keystoneclient",
"python-glanceclient",
Suite: xenial
XS-Python-Version: >= 2.7
Maintainer: Gerardo Garcia <gerardo.garciadeblas@telefonica.com>
-Depends: python-pip, libmysqlclient-dev, libssl-dev, libffi-dev, python-argcomplete, python-boto, python-bottle, python-jsonschema, python-logutils, python-cinderclient, python-glanceclient, python-keystoneclient, python-neutronclient, python-novaclient, python-mysqldb
+Depends: python-pip, libmysqlclient-dev, libssl-dev, libffi-dev, python-argcomplete, python-boto, python-bottle, python-jsonschema, python-logutils, python-cinderclient, python-glanceclient, python-keystoneclient, python-neutronclient, python-novaclient, python-openstackclient, python-mysqldb
'''
Module for testing openmano functionality. It uses openmanoclient.py for invoking openmano
'''
-__author__="Pablo Montes, Alfonso Tierno"
-__date__ ="$16-Feb-2017 17:08:16$"
-__version__="0.0.3"
-version_date="May 2017"
+__author__ = "Pablo Montes, Alfonso Tierno"
+__date__ = "$16-Feb-2017 17:08:16$"
+__version__ = "0.0.4"
+version_date = "Jun 2017"
import logging
import os
global test_config # used for global variables with the test configuration
test_config = {}
+class test_base(unittest.TestCase):
+ test_index = 1
+ test_text = None
+
+ @classmethod
+ def setUpClass(cls):
+ logger.info("{}. {}".format(test_config["test_number"], cls.__name__))
+
+ @classmethod
+ def tearDownClass(cls):
+ test_config["test_number"] += 1
+
+ def tearDown(self):
+ exec_info = sys.exc_info()
+ if exec_info == (None, None, None):
+ logger.info(self.__class__.test_text+" -> TEST OK")
+ else:
+ logger.warning(self.__class__.test_text+" -> TEST NOK")
+ logger.critical("Traceback error",exc_info=True)
+
def check_instance_scenario_active(uuid):
instance = test_config["client"].get_instance(uuid=uuid)
IMPORTANT NOTE
All unittest classes for code based tests must have prefix 'test_' in order to be taken into account for tests
'''
-class test_VIM_datacenter_tenant_operations(unittest.TestCase):
- test_index = 1
+class test_VIM_datacenter_tenant_operations(test_base):
tenant_name = None
- test_text = None
-
- @classmethod
- def setUpClass(cls):
- logger.info("{}. {}".format(test_config["test_number"], cls.__name__))
-
- @classmethod
- def tearDownClass(cls):
- test_config["test_number"] += 1
-
- def tearDown(self):
- exec_info = sys.exc_info()
- if exec_info == (None, None, None):
- logger.info(self.__class__.test_text+" -> TEST OK")
- else:
- logger.warning(self.__class__.test_text+" -> TEST NOK")
- error_trace = traceback.format_exception(exec_info[0], exec_info[1], exec_info[2])
- msg = ""
- for line in error_trace:
- msg = msg + line
- logger.critical("{}".format(msg))
def test_000_create_RO_tenant(self):
self.__class__.tenant_name = _get_random_string(20)
assert('deleted' in tenant.get('result',""))
-class test_VIM_datacenter_operations(unittest.TestCase):
- test_index = 1
+class test_VIM_datacenter_operations(test_base):
datacenter_name = None
- test_text = None
-
- @classmethod
- def setUpClass(cls):
- logger.info("{}. {}".format(test_config["test_number"], cls.__name__))
-
- @classmethod
- def tearDownClass(cls):
- test_config["test_number"] += 1
-
- def tearDown(self):
- exec_info = sys.exc_info()
- if exec_info == (None, None, None):
- logger.info(self.__class__.test_text+" -> TEST OK")
- else:
- logger.warning(self.__class__.test_text+" -> TEST NOK")
- error_trace = traceback.format_exception(exec_info[0], exec_info[1], exec_info[2])
- msg = ""
- for line in error_trace:
- msg = msg + line
- logger.critical("{}".format(msg))
def test_000_create_datacenter(self):
self.__class__.test_text = "{}.{}. TEST {}".format(test_config["test_number"], self.__class__.test_index,
assert('deleted' in self.datacenter.get('result',""))
-class test_VIM_network_operations(unittest.TestCase):
- test_index = 1
+class test_VIM_network_operations(test_base):
vim_network_name = None
- test_text = None
vim_network_uuid = None
- @classmethod
- def setUpClass(cls):
- logger.info("{}. {}".format(test_config["test_number"], cls.__name__))
-
- @classmethod
- def tearDownClass(cls):
- test_config["test_number"] += 1
-
- def tearDown(self):
- exec_info = sys.exc_info()
- if exec_info == (None, None, None):
- logger.info(self.__class__.test_text + " -> TEST OK")
- else:
- logger.warning(self.__class__.test_text + " -> TEST NOK")
- error_trace = traceback.format_exception(exec_info[0], exec_info[1], exec_info[2])
- msg = ""
- for line in error_trace:
- msg = msg + line
- logger.critical("{}".format(msg))
-
def test_000_create_VIM_network(self):
self.__class__.test_text = "{}.{}. TEST {}".format(test_config["test_number"], self.__class__.test_index,
inspect.currentframe().f_code.co_name)
assert ('deleted' in network.get('result', ""))
-class test_VIM_image_operations(unittest.TestCase):
- test_index = 1
- test_text = None
-
- @classmethod
- def setUpClass(cls):
- logger.info("{}. {}".format(test_config["test_number"], cls.__name__))
-
- @classmethod
- def tearDownClass(cls):
- test_config["test_number"] += 1
-
- def tearDown(self):
- exec_info = sys.exc_info()
- if exec_info == (None, None, None):
- logger.info(self.__class__.test_text + " -> TEST OK")
- else:
- logger.warning(self.__class__.test_text + " -> TEST NOK")
- error_trace = traceback.format_exception(exec_info[0], exec_info[1], exec_info[2])
- msg = ""
- for line in error_trace:
- msg = msg + line
- logger.critical("{}".format(msg))
+class test_VIM_image_operations(test_base):
def test_000_list_VIM_images(self):
self.__class__.test_text = "{}.{}. TEST {}".format(test_config["test_number"], self.__class__.test_index,
In case of OpenStack datacenter these tests will only success if RO has access to the admin endpoint
This test will only be executed in case it is specifically requested by the user
'''
-class test_VIM_tenant_operations(unittest.TestCase):
- test_index = 1
+class test_VIM_tenant_operations(test_base):
vim_tenant_name = None
- test_text = None
vim_tenant_uuid = None
@classmethod
def setUpClass(cls):
- logger.info("{}. {}".format(test_config["test_number"], cls.__name__))
+ test_base.setUpClass(cls)
logger.warning("In case of OpenStack datacenter these tests will only success "
"if RO has access to the admin endpoint")
- @classmethod
- def tearDownClass(cls):
- test_config["test_number"] += 1
-
- def tearDown(self):
- exec_info = sys.exc_info()
- if exec_info == (None, None, None):
- logger.info(self.__class__.test_text + " -> TEST OK")
- else:
- logger.warning(self.__class__.test_text + " -> TEST NOK")
- error_trace = traceback.format_exception(exec_info[0], exec_info[1], exec_info[2])
- msg = ""
- for line in error_trace:
- msg = msg + line
- logger.critical("{}".format(msg))
-
def test_000_create_VIM_tenant(self):
self.__class__.test_text = "{}.{}. TEST {}".format(test_config["test_number"], self.__class__.test_index,
inspect.currentframe().f_code.co_name)
logger.debug("{}".format(tenant))
assert ('deleted' in tenant.get('result', ""))
-class test_vimconn_connect(unittest.TestCase):
- test_index = 1
- test_text = None
+class test_vimconn_connect(test_base):
+ # test_index = 1
+ # test_text = None
- @classmethod
- def setUpClass(cls):
- logger.info("{}. {}".format(test_config["test_number"], cls.__name__))
+ # @classmethod
+ # def setUpClass(cls):
+ # logger.info("{}. {}".format(test_config["test_number"], cls.__name__))
- @classmethod
- def tearDownClass(cls):
- test_config["test_number"] += 1
+ # @classmethod
+ # def tearDownClass(cls):
+ # test_config["test_number"] += 1
- def tearDown(self):
- exec_info = sys.exc_info()
- if exec_info == (None, None, None):
- logger.info(self.__class__.test_text+" -> TEST OK")
- else:
- logger.warning(self.__class__.test_text+" -> TEST NOK")
- logger.critical("Traceback error",exc_info=True)
+ # def tearDown(self):
+ # exec_info = sys.exc_info()
+ # if exec_info == (None, None, None):
+ # logger.info(self.__class__.test_text+" -> TEST OK")
+ # else:
+ # logger.warning(self.__class__.test_text+" -> TEST NOK")
+ # logger.critical("Traceback error",exc_info=True)
def test_000_connect(self):
self.__class__.test_text = "{}.{}. TEST {}".format(test_config["test_number"],
self.assertIsInstance(vca_object, VCA)
-class test_vimconn_new_network(unittest.TestCase):
- test_index = 1
+class test_vimconn_new_network(test_base):
+ # test_index = 1
network_name = None
- test_text = None
+ # test_text = None
- @classmethod
- def setUpClass(cls):
- logger.info("{}. {}".format(test_config["test_number"], cls.__name__))
+ # @classmethod
+ # def setUpClass(cls):
+ # logger.info("{}. {}".format(test_config["test_number"], cls.__name__))
- @classmethod
- def tearDownClass(cls):
- test_config["test_number"] += 1
+ # @classmethod
+ # def tearDownClass(cls):
+ # test_config["test_number"] += 1
- def tearDown(self):
- exec_info = sys.exc_info()
- if exec_info == (None, None, None):
- logger.info(self.__class__.test_text+" -> TEST OK")
- else:
- logger.warning(self.__class__.test_text+" -> TEST NOK")
- logger.critical("Traceback error",exc_info=True)
+ # def tearDown(self):
+ # exec_info = sys.exc_info()
+ # if exec_info == (None, None, None):
+ # logger.info(self.__class__.test_text+" -> TEST OK")
+ # else:
+ # logger.warning(self.__class__.test_text+" -> TEST NOK")
+ # logger.critical("Traceback error",exc_info=True)
def test_000_new_network(self):
self.__class__.network_name = _get_random_string(20)
else:
logger.info("Failed to delete network id {}".format(self.__class__.network_id))
-class test_vimconn_get_network_list(unittest.TestCase):
- test_index = 1
+class test_vimconn_get_network_list(test_base):
+ # test_index = 1
network_name = None
- test_text = None
- @classmethod
- def setUpClass(cls):
- logger.info("{}. {}".format(test_config["test_number"], cls.__name__))
+ # test_text = None
+ # @classmethod
+ # def setUpClass(cls):
+ # logger.info("{}. {}".format(test_config["test_number"], cls.__name__))
- @classmethod
- def tearDownClass(cls):
- test_config["test_number"] += 1
+ # @classmethod
+ # def tearDownClass(cls):
+ # test_config["test_number"] += 1
def setUp(self):
# creating new network
logger.debug("{}".format(network))
def tearDown(self):
- exec_info = sys.exc_info()
- if exec_info == (None, None, None):
- logger.info(self.__class__.test_text+" -> TEST OK")
- else:
- logger.warning(self.__class__.test_text+" -> TEST NOK")
- logger.critical("Traceback error",exc_info=True)
+ test_base.tearDown(self)
+ # exec_info = sys.exc_info()
+ # if exec_info == (None, None, None):
+ # logger.info(self.__class__.test_text+" -> TEST OK")
+ # else:
+ # logger.warning(self.__class__.test_text+" -> TEST NOK")
+ # logger.critical("Traceback error",exc_info=True)
# Deleting created network
result = test_config["vim_conn"].delete_network(self.__class__.network_id)
network_list = test_config["vim_conn"].get_network_list({'name': 'unknown_name'})
self.assertEqual(network_list, [])
-class test_vimconn_get_network(unittest.TestCase):
- test_index = 1
+class test_vimconn_get_network(test_base):
+ # test_index = 1
network_name = None
- test_text = None
+ # test_text = None
- @classmethod
- def setUpClass(cls):
- logger.info("{}. {}".format(test_config["test_number"], cls.__name__))
+ # @classmethod
+ # def setUpClass(cls):
+ # logger.info("{}. {}".format(test_config["test_number"], cls.__name__))
- @classmethod
- def tearDownClass(cls):
- test_config["test_number"] += 1
+ # @classmethod
+ # def tearDownClass(cls):
+ # test_config["test_number"] += 1
def setUp(self):
# creating new network
logger.debug("{}".format(network))
def tearDown(self):
- exec_info = sys.exc_info()
- if exec_info == (None, None, None):
- logger.info(self.__class__.test_text+" -> TEST OK")
- else:
- logger.warning(self.__class__.test_text+" -> TEST NOK")
- logger.critical("Traceback error",exc_info=True)
+ test_base.tearDown(self)
+ # exec_info = sys.exc_info()
+ # if exec_info == (None, None, None):
+ # logger.info(self.__class__.test_text+" -> TEST OK")
+ # else:
+ # logger.warning(self.__class__.test_text+" -> TEST NOK")
+ # logger.critical("Traceback error",exc_info=True)
# Deleting created network
result = test_config["vim_conn"].delete_network(self.__class__.network_id)
network_info = test_config["vim_conn"].get_network(Non_exist_id)
self.assertEqual(network_info, {})
-class test_vimconn_delete_network(unittest.TestCase):
- test_index = 1
+class test_vimconn_delete_network(test_base):
+ # test_index = 1
network_name = None
- test_text = None
+ # test_text = None
- @classmethod
- def setUpClass(cls):
- logger.info("{}. {}".format(test_config["test_number"], cls.__name__))
+ # @classmethod
+ # def setUpClass(cls):
+ # logger.info("{}. {}".format(test_config["test_number"], cls.__name__))
- @classmethod
- def tearDownClass(cls):
- test_config["test_number"] += 1
+ # @classmethod
+ # def tearDownClass(cls):
+ # test_config["test_number"] += 1
- def tearDown(self):
- exec_info = sys.exc_info()
- if exec_info == (None, None, None):
- logger.info(self.__class__.test_text+" -> TEST OK")
- else:
- logger.warning(self.__class__.test_text+" -> TEST NOK")
- logger.critical("Traceback error",exc_info=True)
+ # def tearDown(self):
+ # exec_info = sys.exc_info()
+ # if exec_info == (None, None, None):
+ # logger.info(self.__class__.test_text+" -> TEST OK")
+ # else:
+ # logger.warning(self.__class__.test_text+" -> TEST NOK")
+ # logger.critical("Traceback error",exc_info=True)
def test_000_delete_network(self):
# Creating network
self.assertEqual((context.exception).http_code, 400)
-class test_vimconn_get_flavor(unittest.TestCase):
- test_index = 1
- test_text = None
+class test_vimconn_get_flavor(test_base):
+ # test_index = 1
+ # test_text = None
- @classmethod
- def setUpClass(cls):
- logger.info("{}. {}".format(test_config["test_number"], cls.__name__))
+ # @classmethod
+ # def setUpClass(cls):
+ # logger.info("{}. {}".format(test_config["test_number"], cls.__name__))
- @classmethod
- def tearDownClass(cls):
- test_config["test_number"] += 1
+ # @classmethod
+ # def tearDownClass(cls):
+ # test_config["test_number"] += 1
- def tearDown(self):
- exec_info = sys.exc_info()
- if exec_info == (None, None, None):
- logger.info(self.__class__.test_text+" -> TEST OK")
- else:
- logger.warning(self.__class__.test_text+" -> TEST NOK")
- logger.critical("Traceback error",exc_info=True)
+ # def tearDown(self):
+ # exec_info = sys.exc_info()
+ # if exec_info == (None, None, None):
+ # logger.info(self.__class__.test_text+" -> TEST OK")
+ # else:
+ # logger.warning(self.__class__.test_text+" -> TEST NOK")
+ # logger.critical("Traceback error",exc_info=True)
def test_000_get_flavor(self):
test_directory_content = os.listdir(test_config["test_directory"])
The following unittest class does not have the 'test_' on purpose. This test is the one used for the
scenario based tests.
'''
-class descriptor_based_scenario_test(unittest.TestCase):
+class descriptor_based_scenario_test(test_base):
test_index = 0
- test_text = None
scenario_test_path = None
scenario_uuid = None
instance_scenario_uuid = None
def tearDownClass(cls):
test_config["test_number"] += 1
- def tearDown(self):
- exec_info = sys.exc_info()
- if exec_info == (None, None, None):
- logger.info(self.__class__.test_text + " -> TEST OK")
- else:
- logger.warning(self.__class__.test_text + " -> TEST NOK")
- error_trace = traceback.format_exception(exec_info[0], exec_info[1], exec_info[2])
- msg = ""
- for line in error_trace:
- msg = msg + line
- logger.critical("{}".format(msg))
-
-
def test_000_load_scenario(self):
self.__class__.test_text = "{}.{}. TEST {} {}".format(test_config["test_number"], self.__class__.test_index,
inspect.currentframe().f_code.co_name,
test_directory_content = os.listdir(test_config["test_directory"])
# If only want to obtain a tests list print it and exit
if args.list_tests:
- msg = "he 'deploy' set tests are:\n\t" + ', '.join(sorted(test_directory_content))
+ msg = "the 'deploy' set tests are:\n\t" + ', '.join(sorted(test_directory_content))
print(msg)
- logger.info(msg)
+ # logger.info(msg)
sys.exit(0)
descriptor_based_tests = []