2 # -*- coding: utf-8 -*-
5 # Copyright 2015 Telefónica Investigación y Desarrollo, S.A.U.
6 # This file is part of openmano
9 # Licensed under the Apache License, Version 2.0 (the "License"); you may
10 # not use this file except in compliance with the License. You may obtain
11 # a copy of the License at
13 # http://www.apache.org/licenses/LICENSE-2.0
15 # Unless required by applicable law or agreed to in writing, software
16 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
17 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
18 # License for the specific language governing permissions and limitations
21 # For those usages not covered by the Apache License, Version 2.0 please
22 # contact with: nfvlabs@tid.es
26 Module for testing openmano functionality. It uses openmanoclient.py for invoking openmano
28 __author__
="Pablo Montes"
29 __date__
="$16-Feb-2017 17:08:16$"
31 version_date
="Feb 2017"
36 from optparse
import OptionParser
49 global scenario_test_folder
50 global test_image_name
51 global management_network
55 All unittest classes for code based tests must have prefix 'test_' in order to be taken into account for tests
57 class test_tenant_operations(unittest
.TestCase
):
64 logger
.info("{}. {}".format(test_number
, cls
.__name
__))
67 def tearDownClass(cls
):
68 globals().__setitem
__('test_number', globals().__getitem
__('test_number') + 1)
71 exec_info
= sys
.exc_info()
72 if exec_info
== (None, None, None):
73 logger
.info(self
.__class
__.test_text
+" -> TEST OK")
75 logger
.warning(self
.__class
__.test_text
+" -> TEST NOK")
76 error_trace
= traceback
.format_exception(exec_info
[0], exec_info
[1], exec_info
[2])
78 for line
in error_trace
:
80 logger
.critical("{}".format(msg
))
82 def test_000_create_RO_tenant(self
):
83 self
.__class
__.tenant_name
= _get_random_string(20)
84 self
.__class
__.test_text
= "{}.{}. TEST {}".format(test_number
, self
.__class
__.test_index
,
85 inspect
.currentframe().f_code
.co_name
)
86 self
.__class
__.test_index
+= 1
87 tenant
= client
.create_tenant(name
=self
.__class
__.tenant_name
, description
=self
.__class
__.tenant_name
)
88 logger
.debug("{}".format(tenant
))
89 self
.assertEqual(tenant
.get('tenant', {}).get('name', ''), self
.__class
__.tenant_name
)
91 def test_010_list_RO_tenant(self
):
92 self
.__class
__.test_text
= "{}.{}. TEST {}".format(test_number
, self
.__class
__.test_index
,
93 inspect
.currentframe().f_code
.co_name
)
94 self
.__class
__.test_index
+= 1
95 tenant
= client
.get_tenant(name
=self
.__class
__.tenant_name
)
96 logger
.debug("{}".format(tenant
))
97 self
.assertEqual(tenant
.get('tenant', {}).get('name', ''), self
.__class
__.tenant_name
)
99 def test_020_delete_RO_tenant(self
):
100 self
.__class
__.test_text
= "{}.{}. TEST {}".format(test_number
, self
.__class
__.test_index
,
101 inspect
.currentframe().f_code
.co_name
)
102 self
.__class
__.test_index
+= 1
103 tenant
= client
.delete_tenant(name
=self
.__class
__.tenant_name
)
104 logger
.debug("{}".format(tenant
))
105 assert('deleted' in tenant
.get('result',""))
107 class test_datacenter_operations(unittest
.TestCase
):
109 datacenter_name
= None
114 logger
.info("{}. {}".format(test_number
, cls
.__name
__))
117 def tearDownClass(cls
):
118 globals().__setitem
__('test_number', globals().__getitem
__('test_number') + 1)
121 exec_info
= sys
.exc_info()
122 if exec_info
== (None, None, None):
123 logger
.info(self
.__class
__.test_text
+" -> TEST OK")
125 logger
.warning(self
.__class
__.test_text
+" -> TEST NOK")
126 error_trace
= traceback
.format_exception(exec_info
[0], exec_info
[1], exec_info
[2])
128 for line
in error_trace
:
130 logger
.critical("{}".format(msg
))
132 def test_000_create_datacenter(self
):
133 self
.__class
__.test_text
= "{}.{}. TEST {}".format(test_number
, self
.__class
__.test_index
,
134 inspect
.currentframe().f_code
.co_name
)
135 self
.__class
__.datacenter_name
= _get_random_string(20)
136 self
.__class
__.test_index
+= 1
137 self
.datacenter
= client
.create_datacenter(name
=self
.__class
__.datacenter_name
, vim_url
="http://fakeurl/fake")
138 logger
.debug("{}".format(self
.datacenter
))
139 self
.assertEqual (self
.datacenter
.get('datacenter', {}).get('name',''), self
.__class
__.datacenter_name
)
141 def test_010_list_datacenter(self
):
142 self
.__class
__.test_text
= "{}.{}. TEST {}".format(test_number
, self
.__class
__.test_index
,
143 inspect
.currentframe().f_code
.co_name
)
145 self
.__class
__.test_index
+= 1
146 self
.datacenter
= client
.get_datacenter(all_tenants
=True, name
=self
.__class
__.datacenter_name
)
147 logger
.debug("{}".format(self
.datacenter
))
148 self
.assertEqual (self
.datacenter
.get('datacenter', {}).get('name', ''), self
.__class
__.datacenter_name
)
150 def test_020_attach_datacenter(self
):
151 self
.__class
__.test_text
= "{}.{}. TEST {}".format(test_number
, self
.__class
__.test_index
,
152 inspect
.currentframe().f_code
.co_name
)
154 self
.__class
__.test_index
+= 1
155 self
.datacenter
= client
.attach_datacenter(name
=self
.__class
__.datacenter_name
, vim_tenant_name
='fake')
156 logger
.debug("{}".format(self
.datacenter
))
157 assert ('vim_tenants' in self
.datacenter
.get('datacenter', {}))
159 def test_030_list_attached_datacenter(self
):
160 self
.__class
__.test_text
= "{}.{}. TEST {}".format(test_number
, self
.__class
__.test_index
,
161 inspect
.currentframe().f_code
.co_name
)
163 self
.__class
__.test_index
+= 1
164 self
.datacenter
= client
.get_datacenter(all_tenants
=False, name
=self
.__class
__.datacenter_name
)
165 logger
.debug("{}".format(self
.datacenter
))
166 self
.assertEqual (self
.datacenter
.get('datacenter', {}).get('name', ''), self
.__class
__.datacenter_name
)
168 def test_040_detach_datacenter(self
):
169 self
.__class
__.test_text
= "{}.{}. TEST {}".format(test_number
, self
.__class
__.test_index
,
170 inspect
.currentframe().f_code
.co_name
)
172 self
.__class
__.test_index
+= 1
173 self
.datacenter
= client
.detach_datacenter(name
=self
.__class
__.datacenter_name
)
174 logger
.debug("{}".format(self
.datacenter
))
175 assert ('detached' in self
.datacenter
.get('result', ""))
177 def test_050_delete_datacenter(self
):
178 self
.__class
__.test_text
= "{}.{}. TEST {}".format(test_number
, self
.__class
__.test_index
,
179 inspect
.currentframe().f_code
.co_name
)
181 self
.__class
__.test_index
+= 1
182 self
.datacenter
= client
.delete_datacenter(name
=self
.__class
__.datacenter_name
)
183 logger
.debug("{}".format(self
.datacenter
))
184 assert('deleted' in self
.datacenter
.get('result',""))
186 class test_VIM_network_operations(unittest
.TestCase
):
188 vim_network_name
= None
190 vim_network_uuid
= None
194 logger
.info("{}. {}".format(test_number
, cls
.__name
__))
197 def tearDownClass(cls
):
198 globals().__setitem
__('test_number', globals().__getitem
__('test_number') + 1)
201 exec_info
= sys
.exc_info()
202 if exec_info
== (None, None, None):
203 logger
.info(self
.__class
__.test_text
+ " -> TEST OK")
205 logger
.warning(self
.__class
__.test_text
+ " -> TEST NOK")
206 error_trace
= traceback
.format_exception(exec_info
[0], exec_info
[1], exec_info
[2])
208 for line
in error_trace
:
210 logger
.critical("{}".format(msg
))
212 def test_000_create_VIM_network(self
):
213 self
.__class
__.test_text
= "{}.{}. TEST {}".format(test_number
, self
.__class
__.test_index
,
214 inspect
.currentframe().f_code
.co_name
)
215 self
.__class
__.vim_network_name
= _get_random_string(20)
216 self
.__class
__.test_index
+= 1
217 network
= client
.vim_action("create", "networks", name
=self
.__class
__.vim_network_name
)
218 logger
.debug("{}".format(network
))
219 self
.__class
__.vim_network_uuid
= network
["network"]["id"]
220 self
.assertEqual(network
.get('network', {}).get('name', ''), self
.__class
__.vim_network_name
)
222 def test_010_list_VIM_networks(self
):
223 self
.__class
__.test_text
= "{}.{}. TEST {}".format(test_number
, self
.__class
__.test_index
,
224 inspect
.currentframe().f_code
.co_name
)
225 self
.__class
__.test_index
+= 1
226 networks
= client
.vim_action("list", "networks")
227 logger
.debug("{}".format(networks
))
229 def test_020_get_VIM_network_by_uuid(self
):
230 self
.__class
__.test_text
= "{}.{}. TEST {}".format(test_number
, self
.__class
__.test_index
,
231 inspect
.currentframe().f_code
.co_name
)
233 self
.__class
__.test_index
+= 1
234 network
= client
.vim_action("show", "networks", uuid
=self
.__class
__.vim_network_uuid
)
235 logger
.debug("{}".format(network
))
236 self
.assertEqual(network
.get('network', {}).get('name', ''), self
.__class
__.vim_network_name
)
238 def test_030_delete_VIM_network_by_uuid(self
):
239 self
.__class
__.test_text
= "{}.{}. TEST {}".format(test_number
, self
.__class
__.test_index
,
240 inspect
.currentframe().f_code
.co_name
)
242 self
.__class
__.test_index
+= 1
243 network
= client
.vim_action("delete", "networks", uuid
=self
.__class
__.vim_network_uuid
)
244 logger
.debug("{}".format(network
))
245 assert ('deleted' in network
.get('result', ""))
247 class test_VIM_image_operations(unittest
.TestCase
):
253 logger
.info("{}. {}".format(test_number
, cls
.__name
__))
256 def tearDownClass(cls
):
257 globals().__setitem
__('test_number', globals().__getitem
__('test_number') + 1)
260 exec_info
= sys
.exc_info()
261 if exec_info
== (None, None, None):
262 logger
.info(self
.__class
__.test_text
+ " -> TEST OK")
264 logger
.warning(self
.__class
__.test_text
+ " -> TEST NOK")
265 error_trace
= traceback
.format_exception(exec_info
[0], exec_info
[1], exec_info
[2])
267 for line
in error_trace
:
269 logger
.critical("{}".format(msg
))
271 def test_000_list_VIM_images(self
):
272 self
.__class
__.test_text
= "{}.{}. TEST {}".format(test_number
, self
.__class
__.test_index
,
273 inspect
.currentframe().f_code
.co_name
)
274 self
.__class
__.test_index
+= 1
275 images
= client
.vim_action("list", "images")
276 logger
.debug("{}".format(images
))
279 The following is a non critical test that will fail most of the times.
280 In case of OpenStack datacenter these tests will only success if RO has access to the admin endpoint
281 This test will only be executed in case it is specifically requested by the user
283 class test_VIM_tenant_operations(unittest
.TestCase
):
285 vim_tenant_name
= None
287 vim_tenant_uuid
= None
291 logger
.info("{}. {}".format(test_number
, cls
.__name
__))
292 logger
.warning("In case of OpenStack datacenter these tests will only success "
293 "if RO has access to the admin endpoint")
296 def tearDownClass(cls
):
297 globals().__setitem
__('test_number', globals().__getitem
__('test_number') + 1)
300 exec_info
= sys
.exc_info()
301 if exec_info
== (None, None, None):
302 logger
.info(self
.__class
__.test_text
+ " -> TEST OK")
304 logger
.warning(self
.__class
__.test_text
+ " -> TEST NOK")
305 error_trace
= traceback
.format_exception(exec_info
[0], exec_info
[1], exec_info
[2])
307 for line
in error_trace
:
309 logger
.critical("{}".format(msg
))
311 def test_000_create_VIM_tenant(self
):
312 self
.__class
__.test_text
= "{}.{}. TEST {}".format(test_number
, self
.__class
__.test_index
,
313 inspect
.currentframe().f_code
.co_name
)
314 self
.__class
__.vim_tenant_name
= _get_random_string(20)
315 self
.__class
__.test_index
+= 1
316 tenant
= client
.vim_action("create", "tenants", name
=self
.__class
__.vim_tenant_name
)
317 logger
.debug("{}".format(tenant
))
318 self
.__class
__.vim_tenant_uuid
= tenant
["tenant"]["id"]
319 self
.assertEqual(tenant
.get('tenant', {}).get('name', ''), self
.__class
__.vim_tenant_name
)
321 def test_010_list_VIM_tenants(self
):
322 self
.__class
__.test_text
= "{}.{}. TEST {}".format(test_number
, self
.__class
__.test_index
,
323 inspect
.currentframe().f_code
.co_name
)
324 self
.__class
__.test_index
+= 1
325 tenants
= client
.vim_action("list", "tenants")
326 logger
.debug("{}".format(tenants
))
328 def test_020_get_VIM_tenant_by_uuid(self
):
329 self
.__class
__.test_text
= "{}.{}. TEST {}".format(test_number
, self
.__class
__.test_index
,
330 inspect
.currentframe().f_code
.co_name
)
332 self
.__class
__.test_index
+= 1
333 tenant
= client
.vim_action("show", "tenants", uuid
=self
.__class
__.vim_tenant_uuid
)
334 logger
.debug("{}".format(tenant
))
335 self
.assertEqual(tenant
.get('tenant', {}).get('name', ''), self
.__class
__.vim_tenant_name
)
337 def test_030_delete_VIM_tenant_by_uuid(self
):
338 self
.__class
__.test_text
= "{}.{}. TEST {}".format(test_number
, self
.__class
__.test_index
,
339 inspect
.currentframe().f_code
.co_name
)
341 self
.__class
__.test_index
+= 1
342 tenant
= client
.vim_action("delete", "tenants", uuid
=self
.__class
__.vim_tenant_uuid
)
343 logger
.debug("{}".format(tenant
))
344 assert ('deleted' in tenant
.get('result', ""))
348 The following unittest class does not have the 'test_' on purpose. This test is the one used for the
349 scenario based tests.
351 class descriptor_based_scenario_test(unittest
.TestCase
):
354 scenario_test_path
= None
356 instance_scenario_uuid
= None
362 cls
.to_delete_list
= []
363 cls
.scenario_test_path
= test_directory
+ '/' + scenario_test_folder
364 logger
.info("{}. {} {}".format(test_number
, cls
.__name
__, scenario_test_folder
))
367 def tearDownClass(cls
):
368 globals().__setitem
__('test_number', globals().__getitem
__('test_number') + 1)
371 exec_info
= sys
.exc_info()
372 if exec_info
== (None, None, None):
373 logger
.info(self
.__class
__.test_text
+ " -> TEST OK")
375 logger
.warning(self
.__class
__.test_text
+ " -> TEST NOK")
376 error_trace
= traceback
.format_exception(exec_info
[0], exec_info
[1], exec_info
[2])
378 for line
in error_trace
:
380 logger
.critical("{}".format(msg
))
383 def test_000_load_scenario(self
):
384 self
.__class
__.test_text
= "{}.{}. TEST {} {}".format(test_number
, self
.__class
__.test_index
,
385 inspect
.currentframe().f_code
.co_name
,
386 scenario_test_folder
)
387 self
.__class
__.test_index
+= 1
388 vnfd_files
= glob
.glob(self
.__class
__.scenario_test_path
+'/vnfd_*.yaml')
389 scenario_file
= glob
.glob(self
.__class
__.scenario_test_path
+ '/scenario_*.yaml')
390 if len(vnfd_files
) == 0 or len(scenario_file
) > 1:
391 raise Exception('Test '+scenario_test_folder
+' not valid. It must contain an scenario file and at least one'
395 for vnfd
in vnfd_files
:
396 with
open(vnfd
, 'r') as stream
:
397 vnf_descriptor
= yaml
.load(stream
)
399 vnfc_list
= vnf_descriptor
['vnf']['VNFC']
400 for vnfc
in vnfc_list
:
401 vnfc
['image name'] = test_image_name
402 devices
= vnfc
.get('devices',[])
403 for device
in devices
:
404 if device
['type'] == 'disk' and 'image name' in device
:
405 device
['image name'] = test_image_name
407 logger
.debug("VNF descriptor: {}".format(vnf_descriptor
))
408 vnf
= client
.create_vnf(descriptor
=vnf_descriptor
)
410 self
.__class
__.to_delete_list
.insert(0, {"item": "vnf", "function": client
.delete_vnf
,
411 "params": {"uuid": vnf
['vnf']['uuid']}})
413 #load the scenario definition
414 with
open(scenario_file
[0], 'r') as stream
:
415 scenario_descriptor
= yaml
.load(stream
)
416 networks
= scenario_descriptor
['scenario']['networks']
417 networks
[management_network
] = networks
.pop('mgmt')
418 logger
.debug("Scenario descriptor: {}".format(scenario_descriptor
))
419 scenario
= client
.create_scenario(descriptor
=scenario_descriptor
)
420 logger
.debug(scenario
)
421 self
.__class
__.to_delete_list
.insert(0,{"item": "scenario", "function": client
.delete_scenario
,
422 "params":{"uuid": scenario
['scenario']['uuid']} })
423 self
.__class
__.scenario_uuid
= scenario
['scenario']['uuid']
425 def test_010_instantiate_scenario(self
):
426 self
.__class
__.test_text
= "{}.{}. TEST {} {}".format(test_number
, self
.__class
__.test_index
,
427 inspect
.currentframe().f_code
.co_name
,
428 scenario_test_folder
)
429 self
.__class
__.test_index
+= 1
431 instance
= client
.create_instance(scenario_id
=self
.__class
__.scenario_uuid
, name
=self
.__class
__.test_text
)
432 logger
.debug(instance
)
433 self
.__class
__.to_delete_list
.insert(0, {"item": "instance", "function": client
.delete_instance
,
434 "params": {"uuid": instance
['uuid']}})
436 def test_020_clean_deployment(self
):
437 self
.__class
__.test_text
= "{}.{}. TEST {} {}".format(test_number
, self
.__class
__.test_index
,
438 inspect
.currentframe().f_code
.co_name
,
439 scenario_test_folder
)
440 self
.__class
__.test_index
+= 1
441 #At the moment if you delete an scenario right after creating it, in openstack datacenters
442 #sometimes scenario ports get orphaned. This sleep is just a dirty workaround
444 for item
in self
.__class
__.to_delete_list
:
445 response
= item
["function"](**item
["params"])
446 logger
.debug(response
)
448 def _get_random_string(maxLength
):
449 '''generates a string with random characters string.letters and string.digits
450 with a random length up to maxLength characters. If maxLength is <15 it will be changed automatically to 15
454 minLength
= min_string
- len(prefix
)
455 if maxLength
< min_string
: maxLength
= min_string
456 maxLength
-= len(prefix
)
457 length
= random
.randint(minLength
,maxLength
)
458 return 'testing_'+"".join([random
.choice(string
.letters
+string
.digits
) for i
in xrange(length
)])
460 if __name__
=="__main__":
461 sys
.path
.append(os
.path
.dirname(os
.path
.dirname(os
.path
.abspath(__file__
))))
462 import openmanoclient
464 parser
= OptionParser()
467 parser
.add_option("-v",'--version', help='Show current version', dest
='version', action
="store_true", default
=False)
468 parser
.add_option('--debug', help='Set logs to debug level', dest
='debug', action
="store_true", default
=False)
469 parser
.add_option('--failed', help='Set logs to show only failed tests. --debug disables this option',
470 dest
='failed', action
="store_true", default
=False)
471 parser
.add_option('-u', '--url', dest
='endpoint_url', help='Set the openmano server url. By default '
472 'http://localhost:9090/openmano',
473 default
='http://localhost:9090/openmano')
474 default_logger_file
= os
.path
.dirname(__file__
)+'/'+os
.path
.splitext(os
.path
.basename(__file__
))[0]+'.log'
475 parser
.add_option('--logger_file', dest
='logger_file', help='Set the logger file. By default '+default_logger_file
,
476 default
=default_logger_file
)
477 parser
.add_option('--list-tests', help='List all available tests', dest
='list-tests', action
="store_true",
479 parser
.add_option('--test', '--tests', help='Specify the tests to run', dest
='tests', default
=None)
482 parser
.add_option("-t", '--tenant', dest
='tenant_name', help='MANDATORY. Set the tenant name to test')
483 parser
.add_option('-d', '--datacenter', dest
='datacenter_name', help='MANDATORY, Set the datacenter name to test')
484 parser
.add_option("-i", '--image-name', dest
='image-name', help='MANDATORY. Image name of an Ubuntu 16.04 image '
485 'that will be used for testing available in the '
487 parser
.add_option("-n", '--mgmt-net-name', dest
='mgmt-net', help='MANDATORY. Set the tenant name to test')
489 (options
, args
) = parser
.parse_args()
491 # default logger level is INFO. Options --debug and --failed override this, being --debug prioritary
492 logger_level
= 'INFO'
493 if options
.__dict
__['debug']:
494 logger_level
= 'DEBUG'
495 elif options
.__dict
__['failed']:
496 logger_level
= 'WARNING'
497 logger_name
= os
.path
.basename(__file__
)
498 logger
= logging
.getLogger(logger_name
)
499 logger
.setLevel(logger_level
)
501 # Configure a logging handler to store in a logging file
502 fileHandler
= logging
.FileHandler(options
.__dict
__['logger_file'])
503 formatter_fileHandler
= logging
.Formatter('%(asctime)s %(name)s %(levelname)s: %(message)s')
504 fileHandler
.setFormatter(formatter_fileHandler
)
505 logger
.addHandler(fileHandler
)
507 # Configure a handler to print to stdout
508 consoleHandler
= logging
.StreamHandler(sys
.stdout
)
509 formatter_consoleHandler
= logging
.Formatter('%(message)s')
510 consoleHandler
.setFormatter(formatter_consoleHandler
)
511 logger
.addHandler(consoleHandler
)
513 logger
.debug('Program started with the following arguments: ' + str(options
.__dict
__))
515 #If version is required print it and exit
516 if options
.__dict
__['version']:
517 logger
.info("{}".format((sys
.argv
[0], __version__
+" version", version_date
)))
518 logger
.info ("(c) Copyright Telefonica")
521 test_directory
= os
.path
.dirname(__file__
) + "/RO_tests"
522 test_directory_content
= os
.listdir(test_directory
)
523 clsmembers
= inspect
.getmembers(sys
.modules
[__name__
], inspect
.isclass
)
525 # If only want to obtain a tests list print it and exit
526 if options
.__dict
__['list-tests']:
528 for cls
in clsmembers
:
529 if cls
[0].startswith('test_'):
530 tests_names
.append(cls
[0])
532 msg
= "The code based tests are:\n\t" + ', '.join(sorted(tests_names
))+'\n'+\
533 "The descriptor based tests are:\n\t"+ ', '.join(sorted(test_directory_content
))+'\n'+\
534 "NOTE: The test test_VIM_tenant_operations will fail in case the used datacenter is type OpenStack " \
535 "unless RO has access to the admin endpoint. Therefore this test is excluded by default"
540 #Make sure required arguments are present
541 required
= "tenant_name datacenter_name image-name mgmt-net".split()
544 if options
.__dict
__[r
] is None:
545 print "ERROR: parameter "+r
+" is required"
551 # set test image name and management network
552 test_image_name
= options
.__dict
__['image-name']
553 management_network
= options
.__dict
__['mgmt-net']
555 #Create the list of tests to be run
556 descriptor_based_tests
= []
557 code_based_tests
= []
558 if options
.__dict
__['tests'] != None:
559 tests
= sorted(options
.__dict
__['tests'].split(','))
561 matches_code_based_tests
= [item
for item
in clsmembers
if item
[0] == test
]
562 if test
in test_directory_content
:
563 descriptor_based_tests
.append(test
)
564 elif len(matches_code_based_tests
) > 0:
565 code_based_tests
.append(matches_code_based_tests
[0][1])
567 logger
.critical("Test {} is not among the possible ones".format(test
))
571 descriptor_based_tests
= test_directory_content
572 for cls
in clsmembers
:
573 #We exclude 'test_VIM_tenant_operations' unless it is specifically requested by the user
574 if cls
[0].startswith('test_') and cls
[0] != 'test_VIM_tenant_operations':
575 code_based_tests
.append(cls
[1])
577 logger
.debug("descriptor_based_tests to be executed: {}".format(descriptor_based_tests
))
578 logger
.debug("code_based_tests to be executed: {}".format(code_based_tests
))
580 # import openmanoclient from relative path
581 client
= openmanoclient
.openmanoclient(
582 endpoint_url
=options
.__dict
__['endpoint_url'],
583 tenant_name
=options
.__dict
__['tenant_name'],
584 datacenter_name
= options
.__dict
__['datacenter_name'],
585 debug
= options
.__dict
__['debug'], logger
= logger_name
)
587 # TextTestRunner stream is set to /dev/null in order to avoid the method to directly print the result of tests.
588 # This is handled in the tests using logging.
589 stream
= open('/dev/null', 'w')
594 #Run code based tests
595 basic_tests_suite
= unittest
.TestSuite()
596 for test
in code_based_tests
:
597 basic_tests_suite
.addTest(unittest
.makeSuite(test
))
598 result
= unittest
.TextTestRunner(stream
=stream
).run(basic_tests_suite
)
599 executed
+= result
.testsRun
600 failed
+= len(result
.failures
) + len(result
.errors
)
601 if len(result
.failures
) > 0:
602 logger
.debug("failures : {}".format(result
.failures
))
603 if len(result
.errors
) > 0:
604 logger
.debug("errors : {}".format(result
.errors
))
606 # Additionally to the previous tests, scenario based tests will be executed.
607 # This scenario based tests are defined as directories inside the directory defined in 'test_directory'
608 for test
in descriptor_based_tests
:
609 scenario_test_folder
= test
610 test_suite
= unittest
.TestSuite()
611 test_suite
.addTest(unittest
.makeSuite(descriptor_based_scenario_test
))
612 result
= unittest
.TextTestRunner(stream
=stream
).run(test_suite
)
613 executed
+= result
.testsRun
614 failed
+= len(result
.failures
) + len(result
.errors
)
615 if len(result
.failures
) > 0:
616 logger
.debug("failures : {}".format(result
.failures
))
617 if len(result
.errors
) > 0:
618 logger
.debug("errors : {}".format(result
.errors
))
621 logger
.warning("Total number of tests: {}; Total number of failures/errors: {}".format(executed
, failed
))