2 # -*- coding: utf-8 -*-
6 # This file is part of openmano
9 # Licensed under the Apache License, Version 2.0 (the "License"); you may
10 # not use this file except in compliance with the License. You may obtain
11 # a copy of the License at
13 # http://www.apache.org/licenses/LICENSE-2.0
15 # Unless required by applicable law or agreed to in writing, software
16 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
17 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
18 # License for the specific language governing permissions and limitations
24 Module for testing openmano functionality. It uses openmanoclient.py for invoking openmano
26 __author__
="Pablo Montes, Alfonso Tierno"
27 __date__
="$16-Feb-2017 17:08:16$"
29 version_date
="May 2017"
33 from argparse
import ArgumentParser
44 from pyvcloud
.vcloudair
import VCA
46 global test_config
# used for global variables with the test configuration
# Fetch the instance scenario identified by `uuid` through test_config["client"]
# and report the first non-'ACTIVE' status found as a (False, status) tuple.
# NOTE(review): this view of the function is incomplete -- the statements between
# the `for vnf` header and the second status check, and the final return for the
# all-active case, are not visible here; confirm against the full file.
50 def check_instance_scenario_active(uuid
):
51 instance
= test_config
["client"].get_instance(uuid
=uuid
)
# Every network of the instance must report 'ACTIVE'.
53 for net
in instance
['nets']:
54 status
= net
['status']
55 if status
!= 'ACTIVE':
56 return (False, status
)
# Same check while walking the instance's VNFs (presumably per VM -- TODO confirm).
58 for vnf
in instance
['vnfs']:
61 if status
!= 'ACTIVE':
62 return (False, status
)
69 All unittest classes for code based tests must have the prefix 'test_' in order to be taken into account for tests
71 class test_VIM_datacenter_tenant_operations(unittest
.TestCase
):
78 logger
.info("{}. {}".format(test_config
["test_number"], cls
.__name
__))
81 def tearDownClass(cls
):
82 test_config
["test_number"] += 1
85 exec_info
= sys
.exc_info()
86 if exec_info
== (None, None, None):
87 logger
.info(self
.__class
__.test_text
+" -> TEST OK")
89 logger
.warning(self
.__class
__.test_text
+" -> TEST NOK")
90 error_trace
= traceback
.format_exception(exec_info
[0], exec_info
[1], exec_info
[2])
92 for line
in error_trace
:
94 logger
.critical("{}".format(msg
))
def test_000_create_RO_tenant(self):
    # Create an RO tenant with a random name and check that the name echoed
    # back by the client matches. The name is stored on the class because the
    # list/delete tests of this class read it from there.
    cls = self.__class__
    cls.tenant_name = _get_random_string(20)
    # co_name must be read here, in the test's own frame.
    cls.test_text = "{}.{}. TEST {}".format(
        test_config["test_number"], cls.test_index, inspect.currentframe().f_code.co_name)
    cls.test_index += 1

    client = test_config["client"]
    tenant = client.create_tenant(name=cls.tenant_name, description=cls.tenant_name)
    logger.debug("{}".format(tenant))
    created_name = tenant.get('tenant', {}).get('name', '')
    self.assertEqual(created_name, cls.tenant_name)
def test_010_list_RO_tenant(self):
    # Look up, by name, the RO tenant whose name is stored on the class and
    # check that the returned record carries the same name.
    cls = self.__class__
    cls.test_text = "{}.{}. TEST {}".format(
        test_config["test_number"], cls.test_index, inspect.currentframe().f_code.co_name)
    cls.test_index += 1

    client = test_config["client"]
    tenant = client.get_tenant(name=cls.tenant_name)
    logger.debug("{}".format(tenant))
    reported_name = tenant.get('tenant', {}).get('name', '')
    self.assertEqual(reported_name, cls.tenant_name)
def test_020_delete_RO_tenant(self):
    # Delete, by name, the RO tenant whose name is stored on the class and
    # check that the client's 'result' text mentions 'deleted'.
    cls = self.__class__
    cls.test_text = "{}.{}. TEST {}".format(
        test_config["test_number"], cls.test_index, inspect.currentframe().f_code.co_name)
    cls.test_index += 1

    tenant = test_config["client"].delete_tenant(name=cls.tenant_name)
    logger.debug("{}".format(tenant))
    # assertIn rather than a bare assert: it is not stripped under `python -O`
    # and the failure message shows both operands.
    self.assertIn('deleted', tenant.get('result', ""))
123 class test_VIM_datacenter_operations(unittest
.TestCase
):
125 datacenter_name
= None
130 logger
.info("{}. {}".format(test_config
["test_number"], cls
.__name
__))
133 def tearDownClass(cls
):
134 test_config
["test_number"] += 1
137 exec_info
= sys
.exc_info()
138 if exec_info
== (None, None, None):
139 logger
.info(self
.__class
__.test_text
+" -> TEST OK")
141 logger
.warning(self
.__class
__.test_text
+" -> TEST NOK")
142 error_trace
= traceback
.format_exception(exec_info
[0], exec_info
[1], exec_info
[2])
144 for line
in error_trace
:
146 logger
.critical("{}".format(msg
))
def test_000_create_datacenter(self):
    # Register a datacenter with a random name and a fake VIM URL, then check
    # the name echoed back. The name is kept on the class because the other
    # tests of this class read it from there.
    cls = self.__class__
    cls.test_text = "{}.{}. TEST {}".format(
        test_config["test_number"], cls.test_index, inspect.currentframe().f_code.co_name)
    cls.datacenter_name = _get_random_string(20)
    cls.test_index += 1

    client = test_config["client"]
    self.datacenter = client.create_datacenter(name=cls.datacenter_name,
                                               vim_url="http://fakeurl/fake")
    logger.debug("{}".format(self.datacenter))
    created_name = self.datacenter.get('datacenter', {}).get('name', '')
    self.assertEqual(created_name, cls.datacenter_name)
def test_010_list_datacenter(self):
    # Fetch the datacenter by name with all_tenants=True and check the name
    # of the returned record.
    cls = self.__class__
    cls.test_text = "{}.{}. TEST {}".format(
        test_config["test_number"], cls.test_index, inspect.currentframe().f_code.co_name)
    cls.test_index += 1

    client = test_config["client"]
    self.datacenter = client.get_datacenter(all_tenants=True, name=cls.datacenter_name)
    logger.debug("{}".format(self.datacenter))
    reported_name = self.datacenter.get('datacenter', {}).get('name', '')
    self.assertEqual(reported_name, cls.datacenter_name)
def test_020_attach_datacenter(self):
    # Attach the datacenter (by name) using the VIM tenant name 'fake' and
    # check that the returned datacenter record contains a 'vim_tenants' key.
    cls = self.__class__
    cls.test_text = "{}.{}. TEST {}".format(
        test_config["test_number"], cls.test_index, inspect.currentframe().f_code.co_name)
    cls.test_index += 1

    self.datacenter = test_config["client"].attach_datacenter(name=cls.datacenter_name,
                                                              vim_tenant_name='fake')
    logger.debug("{}".format(self.datacenter))
    # assertIn rather than a bare assert: it is not stripped under `python -O`
    # and the failure message shows both operands.
    self.assertIn('vim_tenants', self.datacenter.get('datacenter', {}))
def test_030_list_attached_datacenter(self):
    # Fetch the datacenter by name with all_tenants=False and check the name
    # of the returned record.
    cls = self.__class__
    cls.test_text = "{}.{}. TEST {}".format(
        test_config["test_number"], cls.test_index, inspect.currentframe().f_code.co_name)
    cls.test_index += 1

    client = test_config["client"]
    self.datacenter = client.get_datacenter(all_tenants=False, name=cls.datacenter_name)
    logger.debug("{}".format(self.datacenter))
    reported_name = self.datacenter.get('datacenter', {}).get('name', '')
    self.assertEqual(reported_name, cls.datacenter_name)
def test_040_detach_datacenter(self):
    # Detach the datacenter (by name) and check that the client's 'result'
    # text mentions 'detached'.
    cls = self.__class__
    cls.test_text = "{}.{}. TEST {}".format(
        test_config["test_number"], cls.test_index, inspect.currentframe().f_code.co_name)
    cls.test_index += 1

    self.datacenter = test_config["client"].detach_datacenter(name=cls.datacenter_name)
    logger.debug("{}".format(self.datacenter))
    # assertIn rather than a bare assert: it is not stripped under `python -O`
    # and the failure message shows both operands.
    self.assertIn('detached', self.datacenter.get('result', ""))
def test_050_delete_datacenter(self):
    # Delete the datacenter (by name) and check that the client's 'result'
    # text mentions 'deleted'.
    cls = self.__class__
    cls.test_text = "{}.{}. TEST {}".format(
        test_config["test_number"], cls.test_index, inspect.currentframe().f_code.co_name)
    cls.test_index += 1

    self.datacenter = test_config["client"].delete_datacenter(name=cls.datacenter_name)
    logger.debug("{}".format(self.datacenter))
    # assertIn rather than a bare assert: it is not stripped under `python -O`
    # and the failure message shows both operands.
    self.assertIn('deleted', self.datacenter.get('result', ""))
205 class test_VIM_network_operations(unittest
.TestCase
):
207 vim_network_name
= None
209 vim_network_uuid
= None
213 logger
.info("{}. {}".format(test_config
["test_number"], cls
.__name
__))
216 def tearDownClass(cls
):
217 test_config
["test_number"] += 1
220 exec_info
= sys
.exc_info()
221 if exec_info
== (None, None, None):
222 logger
.info(self
.__class
__.test_text
+ " -> TEST OK")
224 logger
.warning(self
.__class
__.test_text
+ " -> TEST NOK")
225 error_trace
= traceback
.format_exception(exec_info
[0], exec_info
[1], exec_info
[2])
227 for line
in error_trace
:
229 logger
.critical("{}".format(msg
))
def test_000_create_VIM_network(self):
    # Create a VIM network with a random name through vim_action, remember
    # its id on the class, and check the name echoed back.
    cls = self.__class__
    cls.test_text = "{}.{}. TEST {}".format(
        test_config["test_number"], cls.test_index, inspect.currentframe().f_code.co_name)
    cls.vim_network_name = _get_random_string(20)
    cls.test_index += 1

    client = test_config["client"]
    network = client.vim_action("create", "networks", name=cls.vim_network_name)
    logger.debug("{}".format(network))
    # Stored before asserting so the uuid is kept even if the name check fails.
    cls.vim_network_uuid = network["network"]["id"]
    created_name = network.get('network', {}).get('name', '')
    self.assertEqual(created_name, cls.vim_network_name)
def test_010_list_VIM_networks(self):
    # List the VIM networks through vim_action and log the result; no
    # assertion is made on the content, so this only fails if the call raises.
    cls = self.__class__
    cls.test_text = "{}.{}. TEST {}".format(
        test_config["test_number"], cls.test_index, inspect.currentframe().f_code.co_name)
    cls.test_index += 1

    client = test_config["client"]
    networks = client.vim_action("list", "networks")
    logger.debug("{}".format(networks))
def test_020_get_VIM_network_by_uuid(self):
    # Show the VIM network whose uuid is stored on the class and check that
    # its name matches the stored network name.
    cls = self.__class__
    cls.test_text = "{}.{}. TEST {}".format(
        test_config["test_number"], cls.test_index, inspect.currentframe().f_code.co_name)
    cls.test_index += 1

    client = test_config["client"]
    network = client.vim_action("show", "networks", uuid=cls.vim_network_uuid)
    logger.debug("{}".format(network))
    reported_name = network.get('network', {}).get('name', '')
    self.assertEqual(reported_name, cls.vim_network_name)
def test_030_delete_VIM_network_by_uuid(self):
    # Delete the VIM network whose uuid is stored on the class and check that
    # the 'result' text mentions 'deleted'.
    cls = self.__class__
    cls.test_text = "{}.{}. TEST {}".format(
        test_config["test_number"], cls.test_index, inspect.currentframe().f_code.co_name)
    cls.test_index += 1

    network = test_config["client"].vim_action("delete", "networks", uuid=cls.vim_network_uuid)
    logger.debug("{}".format(network))
    # assertIn rather than a bare assert: it is not stripped under `python -O`
    # and the failure message shows both operands.
    self.assertIn('deleted', network.get('result', ""))
267 class test_VIM_image_operations(unittest
.TestCase
):
273 logger
.info("{}. {}".format(test_config
["test_number"], cls
.__name
__))
276 def tearDownClass(cls
):
277 test_config
["test_number"] += 1
280 exec_info
= sys
.exc_info()
281 if exec_info
== (None, None, None):
282 logger
.info(self
.__class
__.test_text
+ " -> TEST OK")
284 logger
.warning(self
.__class
__.test_text
+ " -> TEST NOK")
285 error_trace
= traceback
.format_exception(exec_info
[0], exec_info
[1], exec_info
[2])
287 for line
in error_trace
:
289 logger
.critical("{}".format(msg
))
def test_000_list_VIM_images(self):
    # List the VIM images through vim_action and log the result; no assertion
    # is made on the content, so this only fails if the call raises.
    cls = self.__class__
    cls.test_text = "{}.{}. TEST {}".format(
        test_config["test_number"], cls.test_index, inspect.currentframe().f_code.co_name)
    cls.test_index += 1

    client = test_config["client"]
    images = client.vim_action("list", "images")
    logger.debug("{}".format(images))
299 The following is a non-critical test that will fail most of the time.
300 In case of OpenStack datacenter these tests will only succeed if RO has access to the admin endpoint
301 This test will only be executed in case it is specifically requested by the user
303 class test_VIM_tenant_operations(unittest
.TestCase
):
305 vim_tenant_name
= None
307 vim_tenant_uuid
= None
311 logger
.info("{}. {}".format(test_config
["test_number"], cls
.__name
__))
312 logger
.warning("In case of OpenStack datacenter these tests will only success "
313 "if RO has access to the admin endpoint")
316 def tearDownClass(cls
):
317 test_config
["test_number"] += 1
320 exec_info
= sys
.exc_info()
321 if exec_info
== (None, None, None):
322 logger
.info(self
.__class
__.test_text
+ " -> TEST OK")
324 logger
.warning(self
.__class
__.test_text
+ " -> TEST NOK")
325 error_trace
= traceback
.format_exception(exec_info
[0], exec_info
[1], exec_info
[2])
327 for line
in error_trace
:
329 logger
.critical("{}".format(msg
))
def test_000_create_VIM_tenant(self):
    # Create a VIM tenant with a random name through vim_action, remember its
    # id on the class, and check the name echoed back.
    cls = self.__class__
    cls.test_text = "{}.{}. TEST {}".format(
        test_config["test_number"], cls.test_index, inspect.currentframe().f_code.co_name)
    cls.vim_tenant_name = _get_random_string(20)
    cls.test_index += 1

    client = test_config["client"]
    tenant = client.vim_action("create", "tenants", name=cls.vim_tenant_name)
    logger.debug("{}".format(tenant))
    # Stored before asserting so the uuid is kept even if the name check fails.
    cls.vim_tenant_uuid = tenant["tenant"]["id"]
    created_name = tenant.get('tenant', {}).get('name', '')
    self.assertEqual(created_name, cls.vim_tenant_name)
def test_010_list_VIM_tenants(self):
    # List the VIM tenants through vim_action and log the result; no assertion
    # is made on the content, so this only fails if the call raises.
    cls = self.__class__
    cls.test_text = "{}.{}. TEST {}".format(
        test_config["test_number"], cls.test_index, inspect.currentframe().f_code.co_name)
    cls.test_index += 1

    client = test_config["client"]
    tenants = client.vim_action("list", "tenants")
    logger.debug("{}".format(tenants))
def test_020_get_VIM_tenant_by_uuid(self):
    # Show the VIM tenant whose uuid is stored on the class and check that its
    # name matches the stored tenant name.
    cls = self.__class__
    cls.test_text = "{}.{}. TEST {}".format(
        test_config["test_number"], cls.test_index, inspect.currentframe().f_code.co_name)
    cls.test_index += 1

    client = test_config["client"]
    tenant = client.vim_action("show", "tenants", uuid=cls.vim_tenant_uuid)
    logger.debug("{}".format(tenant))
    reported_name = tenant.get('tenant', {}).get('name', '')
    self.assertEqual(reported_name, cls.vim_tenant_name)
def test_030_delete_VIM_tenant_by_uuid(self):
    # Delete the VIM tenant whose uuid is stored on the class and check that
    # the 'result' text mentions 'deleted'.
    cls = self.__class__
    cls.test_text = "{}.{}. TEST {}".format(
        test_config["test_number"], cls.test_index, inspect.currentframe().f_code.co_name)
    cls.test_index += 1

    tenant = test_config["client"].vim_action("delete", "tenants", uuid=cls.vim_tenant_uuid)
    logger.debug("{}".format(tenant))
    # assertIn rather than a bare assert: it is not stripped under `python -O`
    # and the failure message shows both operands.
    self.assertIn('deleted', tenant.get('result', ""))
366 class test_vimconn_connect(unittest
.TestCase
):
372 logger
.info("{}. {}".format(test_config
["test_number"], cls
.__name
__))
375 def tearDownClass(cls
):
376 test_config
["test_number"] += 1
379 exec_info
= sys
.exc_info()
380 if exec_info
== (None, None, None):
381 logger
.info(self
.__class
__.test_text
+" -> TEST OK")
383 logger
.warning(self
.__class
__.test_text
+" -> TEST NOK")
384 logger
.critical("Traceback error",exc_info
=True)
def test_000_connect(self):
    # For the 'vmware' vimtype, call connect() on the configured VIM connector
    # and check that it returns a VCA instance. Other vimtypes do nothing
    # beyond the bookkeeping of test_text / test_index.
    cls = self.__class__
    cls.test_text = "{}.{}. TEST {}".format(
        test_config["test_number"], cls.test_index, inspect.currentframe().f_code.co_name)
    cls.test_index += 1

    if test_config['vimtype'] != 'vmware':
        return

    vca_object = test_config["vim_conn"].connect()
    logger.debug("{}".format(vca_object))
    self.assertIsInstance(vca_object, VCA)
398 class test_vimconn_new_network(unittest
.TestCase
):
405 logger
.info("{}. {}".format(test_config
["test_number"], cls
.__name
__))
408 def tearDownClass(cls
):
409 test_config
["test_number"] += 1
412 exec_info
= sys
.exc_info()
413 if exec_info
== (None, None, None):
414 logger
.info(self
.__class
__.test_text
+" -> TEST OK")
416 logger
.warning(self
.__class
__.test_text
+" -> TEST NOK")
417 logger
.critical("Traceback error",exc_info
=True)
419 def test_000_new_network(self
):
420 self
.__class
__.network_name
= _get_random_string(20)
421 network_type
= 'bridge'
423 self
.__class
__.test_text
= "{}.{}. TEST {}".format(test_config
["test_number"],
424 self
.__class
__.test_index
, inspect
.currentframe().f_code
.co_name
)
425 self
.__class
__.test_index
+= 1
427 network
= test_config
["vim_conn"].new_network(net_name
=self
.__class
__.network_name
,
428 net_type
=network_type
)
429 self
.__class
__.network_id
= network
430 logger
.debug("{}".format(network
))
432 network_list
= test_config
["vim_conn"].get_vcd_network_list()
433 for net
in network_list
:
434 if self
.__class
__.network_name
in net
.get('name'):
435 self
.assertIn(self
.__class
__.network_name
, net
.get('name'))
436 self
.assertEqual(net
.get('type'), network_type
)
438 # Deleting created network
439 result
= test_config
["vim_conn"].delete_network(self
.__class
__.network_id
)
441 logger
.info("Network id {} sucessfully deleted".format(self
.__class
__.network_id
))
443 logger
.info("Failed to delete network id {}".format(self
.__class
__.network_id
))
445 def test_010_new_network_by_types(self
):
447 network_types
= ['data','bridge','mgmt']
448 self
.__class
__.test_text
= "{}.{}. TEST {}".format(test_config
["test_number"],
449 self
.__class
__.test_index
,
450 inspect
.currentframe().f_code
.co_name
)
451 self
.__class
__.test_index
+= 1
452 for net_type
in network_types
:
453 self
.__class
__.network_name
= _get_random_string(20)
454 network_id
= test_config
["vim_conn"].new_network(net_name
=self
.__class
__.network_name
,
457 delete_net_ids
.append(network_id
)
458 logger
.debug("{}".format(network_id
))
460 network_list
= test_config
["vim_conn"].get_vcd_network_list()
461 for net
in network_list
:
462 if self
.__class
__.network_name
in net
.get('name'):
463 self
.assertIn(self
.__class
__.network_name
, net
.get('name'))
464 if net_type
in net
.get('type'):
465 self
.assertEqual(net
.get('type'), net_type
)
467 self
.assertNotEqual(net
.get('type'), net_type
)
469 # Deleting created network
470 for net_id
in delete_net_ids
:
471 result
= test_config
["vim_conn"].delete_network(net_id
)
473 logger
.info("Network id {} sucessfully deleted".format(net_id
))
475 logger
.info("Failed to delete network id {}".format(net_id
))
477 def test_020_new_network_by_ipprofile(self
):
478 test_directory_content
= os
.listdir(test_config
["test_directory"])
480 for dir_name
in test_directory_content
:
481 if dir_name
== 'simple_multi_vnfc':
482 self
.__class
__.scenario_test_path
= test_config
["test_directory"] + '/'+ dir_name
483 vnfd_files
= glob
.glob(self
.__class
__.scenario_test_path
+'/vnfd_*.yaml')
486 for vnfd
in vnfd_files
:
487 with
open(vnfd
, 'r') as stream
:
488 vnf_descriptor
= yaml
.load(stream
)
490 internal_connections_list
= vnf_descriptor
['vnf']['internal-connections']
491 for item
in internal_connections_list
:
492 if 'ip-profile' in item
:
493 version
= item
['ip-profile']['ip-version']
494 dhcp_count
= item
['ip-profile']['dhcp']['count']
495 dhcp_enabled
= item
['ip-profile']['dhcp']['enabled']
497 self
.__class
__.network_name
= _get_random_string(20)
498 ip_profile
= {'dhcp_count': dhcp_count
,
499 'dhcp_enabled': dhcp_enabled
,
500 'ip_version': version
502 self
.__class
__.test_text
= "{}.{}. TEST {}".format(test_config
["test_number"],
503 self
.__class
__.test_index
,
504 inspect
.currentframe().f_code
.co_name
)
505 self
.__class
__.test_index
+= 1
506 network
= test_config
["vim_conn"].new_network(net_name
=self
.__class
__.network_name
,
508 ip_profile
=ip_profile
)
509 self
.__class
__.network_id
= network
510 logger
.debug("{}".format(network
))
512 network_list
= test_config
["vim_conn"].get_vcd_network_list()
513 for net
in network_list
:
514 if self
.__class
__.network_name
in net
.get('name'):
515 self
.assertIn(self
.__class
__.network_name
, net
.get('name'))
517 # Deleting created network
518 result
= test_config
["vim_conn"].delete_network(self
.__class
__.network_id
)
520 logger
.info("Network id {} sucessfully deleted".format(self
.__class
__.network_id
))
522 logger
.info("Failed to delete network id {}".format(self
.__class
__.network_id
))
524 def test_030_new_network_by_isshared(self
):
525 self
.__class
__.network_name
= _get_random_string(20)
527 self
.__class
__.test_text
= "{}.{}. TEST {}".format(test_config
["test_number"],
528 self
.__class
__.test_index
,
529 inspect
.currentframe().f_code
.co_name
)
530 self
.__class
__.test_index
+= 1
531 network
= test_config
["vim_conn"].new_network(net_name
=self
.__class
__.network_name
,
534 self
.__class
__.network_id
= network
535 logger
.debug("{}".format(network
))
537 network_list
= test_config
["vim_conn"].get_vcd_network_list()
538 for net
in network_list
:
539 if self
.__class
__.network_name
in net
.get('name'):
540 self
.assertIn(self
.__class
__.network_name
, net
.get('name'))
541 self
.assertEqual(net
.get('shared'), shared
)
543 # Deleting created network
544 result
= test_config
["vim_conn"].delete_network(self
.__class
__.network_id
)
546 logger
.info("Network id {} sucessfully deleted".format(self
.__class
__.network_id
))
548 logger
.info("Failed to delete network id {}".format(self
.__class
__.network_id
))
550 def test_040_new_network_by_negative(self
):
551 self
.__class
__.network_name
= _get_random_string(20)
552 self
.__class
__.test_text
= "{}.{}. TEST {}".format(test_config
["test_number"],
553 self
.__class
__.test_index
,
554 inspect
.currentframe().f_code
.co_name
)
555 self
.__class
__.test_index
+= 1
556 network
= test_config
["vim_conn"].new_network(net_name
=self
.__class
__.network_name
,
557 net_type
='unknowntype')
558 self
.__class
__.network_id
= network
559 logger
.debug("{}".format(network
))
560 network_list
= test_config
["vim_conn"].get_vcd_network_list()
561 for net
in network_list
:
562 if self
.__class
__.network_name
in net
.get('name'):
563 self
.assertIn(self
.__class
__.network_name
, net
.get('name'))
565 # Deleting created network
566 result
= test_config
["vim_conn"].delete_network(self
.__class
__.network_id
)
568 logger
.info("Network id {} sucessfully deleted".format(self
.__class
__.network_id
))
570 logger
.info("Failed to delete network id {}".format(self
.__class
__.network_id
))
572 class test_vimconn_get_network_list(unittest
.TestCase
):
579 logger
.info("{}. {}".format(test_config
["test_number"], cls
.__name
__))
582 def tearDownClass(cls
):
583 test_config
["test_number"] += 1
586 # creating new network
587 self
.__class
__.network_name
= _get_random_string(20)
588 self
.__class
__.net_type
= 'bridge'
589 network
= test_config
["vim_conn"].new_network(net_name
=self
.__class
__.network_name
,
590 net_type
=self
.__class
__.net_type
)
591 self
.__class
__.network_id
= network
592 logger
.debug("{}".format(network
))
595 exec_info
= sys
.exc_info()
596 if exec_info
== (None, None, None):
597 logger
.info(self
.__class
__.test_text
+" -> TEST OK")
599 logger
.warning(self
.__class
__.test_text
+" -> TEST NOK")
600 logger
.critical("Traceback error",exc_info
=True)
602 # Deleting created network
603 result
= test_config
["vim_conn"].delete_network(self
.__class
__.network_id
)
605 logger
.info("Network id {} sucessfully deleted".format(self
.__class
__.network_id
))
607 logger
.info("Failed to delete network id {}".format(self
.__class
__.network_id
))
def test_000_get_network_list(self):
    # Fetch the unfiltered network list from the VIM connector and, for every
    # entry whose name contains the class's network name, check its type,
    # status and shared flag.
    # NOTE(review): nothing fails if no entry matches -- confirm that is intended.
    cls = self.__class__
    cls.test_text = "{}.{}. TEST {}".format(
        test_config["test_number"], cls.test_index, inspect.currentframe().f_code.co_name)
    cls.test_index += 1

    vim_conn = test_config["vim_conn"]
    for net in vim_conn.get_network_list():
        if cls.network_name not in net.get('name'):
            continue
        self.assertIn(cls.network_name, net.get('name'))
        self.assertEqual(net.get('type'), cls.net_type)
        self.assertEqual(net.get('status'), 'ACTIVE')
        self.assertEqual(net.get('shared'), False)
def test_010_get_network_list_by_name(self):
    # Resolve the class network's name from its id, query the network list
    # filtered by that name, and check name, type and status of every entry
    # whose name contains the class's network name.
    cls = self.__class__
    cls.test_text = "{}.{}. TEST {}".format(
        test_config["test_number"], cls.test_index, inspect.currentframe().f_code.co_name)
    cls.test_index += 1

    network_name = test_config['vim_conn'].get_network_name_by_id(cls.network_id)

    # find network from list by its name
    name_filter = {'name': network_name}
    for list_item in test_config["vim_conn"].get_network_list(name_filter):
        if cls.network_name not in list_item.get('name'):
            continue
        self.assertEqual(network_name, list_item.get('name'))
        self.assertEqual(list_item.get('type'), cls.net_type)
        self.assertEqual(list_item.get('status'), 'ACTIVE')
def test_020_get_network_list_by_id(self):
    # Query the network list filtered by the class network's id and check id,
    # type and status of every entry whose id contains that id.
    cls = self.__class__
    cls.test_text = "{}.{}. TEST {}".format(
        test_config["test_number"], cls.test_index, inspect.currentframe().f_code.co_name)
    cls.test_index += 1

    # find network from list by its id
    id_filter = {'id': cls.network_id}
    for list_item in test_config["vim_conn"].get_network_list(id_filter):
        if cls.network_id not in list_item.get('id'):
            continue
        self.assertEqual(cls.network_id, list_item.get('id'))
        self.assertEqual(list_item.get('type'), cls.net_type)
        self.assertEqual(list_item.get('status'), 'ACTIVE')
653 def test_030_get_network_list_by_shared(self
):
655 self
.__class
__.test_text
= "{}.{}. TEST {}".format(test_config
["test_number"],
656 self
.__class
__.test_index
,
657 inspect
.currentframe().f_code
.co_name
)
658 self
.__class
__.test_index
+= 1
660 network_name
= test_config
['vim_conn'].get_network_name_by_id(self
.__class
__.network_id
)
661 # find network from list by it's shared value
662 new_network_list
= test_config
["vim_conn"].get_network_list({'shared':Shared
,
663 'name':network_name
})
664 for list_item
in new_network_list
:
665 if list_item
.get('shared') == Shared
:
666 self
.assertEqual(list_item
.get('shared'), Shared
)
667 self
.assertEqual(list_item
.get('type'), self
.__class
__.net_type
)
668 self
.assertEqual(network_name
, list_item
.get('name'))
def test_040_get_network_list_by_tenant_id(self):
    # For each VIM tenant whose name equals test_config['tenant'], query the
    # network list filtered by that tenant's id and the class network's name,
    # then check tenant id, name, type and status of every returned entry.
    cls = self.__class__
    cls.test_text = "{}.{}. TEST {}".format(
        test_config["test_number"], cls.test_index, inspect.currentframe().f_code.co_name)
    cls.test_index += 1

    tenant_list = test_config["vim_conn"].get_tenant_list()
    network_name = test_config['vim_conn'].get_network_name_by_id(cls.network_id)

    for tenant_item in tenant_list:
        if test_config['tenant'] != tenant_item.get('name'):
            continue
        # find network from list by its tenant id
        tenant_id = tenant_item.get('id')
        tenant_filter = {'tenant_id': tenant_id, 'name': network_name}
        for list_item in test_config["vim_conn"].get_network_list(tenant_filter):
            self.assertEqual(tenant_id, list_item.get('tenant_id'))
            self.assertEqual(network_name, list_item.get('name'))
            self.assertEqual(list_item.get('type'), cls.net_type)
            self.assertEqual(list_item.get('status'), 'ACTIVE')
691 def test_050_get_network_list_by_status(self
):
692 self
.__class
__.test_text
= "{}.{}. TEST {}".format(test_config
["test_number"],
693 self
.__class
__.test_index
,
694 inspect
.currentframe().f_code
.co_name
)
695 self
.__class
__.test_index
+= 1
698 network_name
= test_config
['vim_conn'].get_network_name_by_id(self
.__class
__.network_id
)
700 # find network from list by it's status
701 new_network_list
= test_config
["vim_conn"].get_network_list({'status':status
,
702 'name': network_name
})
703 for list_item
in new_network_list
:
704 self
.assertIn(self
.__class
__.network_name
, list_item
.get('name'))
705 self
.assertEqual(list_item
.get('type'), self
.__class
__.net_type
)
706 self
.assertEqual(list_item
.get('status'), status
)
def test_060_get_network_list_by_negative(self):
    # Negative case: filtering the network list by the name 'unknown_name'
    # must yield an empty list.
    cls = self.__class__
    cls.test_text = "{}.{}. TEST {}".format(
        test_config["test_number"], cls.test_index, inspect.currentframe().f_code.co_name)
    cls.test_index += 1

    unknown_filter = {'name': 'unknown_name'}
    network_list = test_config["vim_conn"].get_network_list(unknown_filter)
    self.assertEqual(network_list, [])
720 The following unittest class does not have the 'test_' prefix on purpose. This test is the one used for the
721 scenario based tests.
723 class descriptor_based_scenario_test(unittest
.TestCase
):
726 scenario_test_path
= None
728 instance_scenario_uuid
= None
734 cls
.to_delete_list
= []
735 cls
.scenario_test_path
= test_config
["test_directory"] + '/' + test_config
["test_folder"]
736 logger
.info("{}. {} {}".format(test_config
["test_number"], cls
.__name
__, test_config
["test_folder"]))
739 def tearDownClass(cls
):
740 test_config
["test_number"] += 1
743 exec_info
= sys
.exc_info()
744 if exec_info
== (None, None, None):
745 logger
.info(self
.__class
__.test_text
+ " -> TEST OK")
747 logger
.warning(self
.__class
__.test_text
+ " -> TEST NOK")
748 error_trace
= traceback
.format_exception(exec_info
[0], exec_info
[1], exec_info
[2])
750 for line
in error_trace
:
752 logger
.critical("{}".format(msg
))
def test_000_load_scenario(self):
    # Load every vnfd_*.yaml and the single scenario_*.yaml found in the
    # class's scenario_test_path, create them through the client, and push a
    # matching delete action at the front of to_delete_list for each.
    # Raises Exception when the folder has no VNF descriptor or does not hold
    # exactly one scenario file.
    cls = self.__class__
    cls.test_text = "{}.{}. TEST {} {}".format(
        test_config["test_number"], cls.test_index, inspect.currentframe().f_code.co_name,
        test_config["test_folder"])
    cls.test_index += 1

    vnfd_files = glob.glob(cls.scenario_test_path + '/vnfd_*.yaml')
    scenario_file = glob.glob(cls.scenario_test_path + '/scenario_*.yaml')
    # Exactly one scenario file is required: the previous `> 1` check let an
    # empty list through and failed later with IndexError on scenario_file[0],
    # after the VNFs had already been created.
    if not vnfd_files or len(scenario_file) != 1:
        raise Exception("Test '{}' not valid. It must contain an scenario file and at least one vnfd file'".format(
            test_config["test_folder"]))

    client = test_config["client"]
    for vnfd in vnfd_files:
        # NOTE(review): yaml.load without an explicit Loader can build arbitrary
        # objects; presumably fine for local test descriptors, but consider
        # yaml.safe_load.
        with open(vnfd, 'r') as stream:
            vnf_descriptor = yaml.load(stream)

        # Force the configured image on every VNFC and on its disk devices
        # that already declare an 'image name'.
        for vnfc in vnf_descriptor['vnf']['VNFC']:
            vnfc['image name'] = test_config["image_name"]
            for device in vnfc.get('devices', []):
                if device['type'] == 'disk' and 'image name' in device:
                    device['image name'] = test_config["image_name"]

        logger.debug("VNF descriptor: {}".format(vnf_descriptor))
        vnf = client.create_vnf(descriptor=vnf_descriptor)
        # Inserted at the front so the most recently created item is deleted first.
        cls.to_delete_list.insert(0, {"item": "vnf", "function": client.delete_vnf,
                                      "params": {"uuid": vnf['vnf']['uuid']}})

    # load the scenario definition
    with open(scenario_file[0], 'r') as stream:
        scenario_descriptor = yaml.load(stream)
    # Rename the descriptor's 'mgmt' network to the configured management network.
    networks = scenario_descriptor['scenario']['networks']
    networks[test_config["mgmt_net"]] = networks.pop('mgmt')
    logger.debug("Scenario descriptor: {}".format(scenario_descriptor))
    scenario = client.create_scenario(descriptor=scenario_descriptor)
    logger.debug(scenario)
    cls.to_delete_list.insert(0, {"item": "scenario", "function": client.delete_scenario,
                                  "params": {"uuid": scenario['scenario']['uuid']}})
    cls.scenario_uuid = scenario['scenario']['uuid']
def test_010_instantiate_scenario(self):
    # Instantiate the scenario whose uuid is stored on the class, using the
    # test text as the instance name, remember the instance uuid on the class
    # and push its delete action at the front of to_delete_list.
    cls = self.__class__
    cls.test_text = "{}.{}. TEST {} {}".format(
        test_config["test_number"], cls.test_index, inspect.currentframe().f_code.co_name,
        test_config["test_folder"])
    cls.test_index += 1

    client = test_config["client"]
    instance = client.create_instance(scenario_id=cls.scenario_uuid, name=cls.test_text)
    cls.instance_scenario_uuid = instance['uuid']
    logger.debug(instance)
    delete_action = {"item": "instance", "function": client.delete_instance,
                     "params": {"uuid": instance['uuid']}}
    cls.to_delete_list.insert(0, delete_action)
810 def test_020_check_deployent(self
):
811 self
.__class
__.test_text
= "{}.{}. TEST {} {}".format(test_config
["test_number"], self
.__class
__.test_index
,
812 inspect
.currentframe().f_code
.co_name
,
813 test_config
["test_folder"])
814 self
.__class
__.test_index
+= 1
816 if test_config
["manual"]:
817 raw_input('Scenario has been deployed. Perform manual check and press any key to resume')
820 keep_waiting
= test_config
["timeout"]
821 instance_active
= False
823 result
= check_instance_scenario_active(self
.__class
__.instance_scenario_uuid
)
826 elif 'ERROR' in result
[1]:
827 msg
= 'Got error while waiting for the instance to get active: '+result
[1]
831 if keep_waiting
>= 5:
834 elif keep_waiting
> 0:
835 time
.sleep(keep_waiting
)
838 msg
= 'Timeout reached while waiting instance scenario to get active'
def test_030_clean_deployment(self):
    # Delete everything registered in the class attribute 'to_delete_list', in list order.
    # Each entry is a dict holding the callable under "function" and its keyword arguments
    # under "params"; the response of every call is logged at debug level.
    test_class = self.__class__
    test_class.test_text = "{}.{}. TEST {} {}".format(
        test_config["test_number"],
        test_class.test_index,
        inspect.currentframe().f_code.co_name,
        test_config["test_folder"])
    test_class.test_index += 1

    # At the moment if you delete a scenario right after creating it, in openstack datacenters
    # sometimes scenario ports get orphaned. This sleep is just a dirty workaround.
    # NOTE(review): the delay value is not visible in the reviewed excerpt; 5 seconds is assumed,
    # confirm against the repository.
    settle_seconds = 5
    time.sleep(settle_seconds)

    for item in test_class.to_delete_list:
        response = item["function"](**item["params"])
        logger.debug(response)
def _get_random_string(maxLength):
    '''Generate the string 'testing_' followed by random ASCII letters and digits.

    The total length, prefix included, is chosen at random between 15 and maxLength characters.
    If maxLength is < 15 it is changed automatically to 15, so the result is exactly 15 long.

    :param maxLength: maximum total length of the generated string
    :return: the generated string
    '''
    prefix = 'testing_'
    min_string = 15
    max_total = max(maxLength, min_string)
    length = random.randint(min_string - len(prefix), max_total - len(prefix))
    # ascii_letters (unlike string.letters) is locale independent and exists in Python 2 and 3
    alphabet = string.ascii_letters + string.digits
    return prefix + "".join(random.choice(alphabet) for _ in range(length))
def test_vimconnector(args):
    '''Run the 'vimconn' test set: unittest classes of this module named 'test_vimconnector*'.

    Imports the connector module matching args.vimtype from the sibling 'osm_ro' folder. For
    'vmware' a connector object is also created from args.config_param (a JSON string) and
    stored in test_config['vim_conn'].

    Relies on the module-level names 'logger', 'failfast' and 'test_config' set by the
    __main__ block.

    :param args: parsed command line arguments (vimtype, tenant_name, config_param,
        endpoint_url, list_tests, tests)
    :return: tuple (executed, failed) with the number of tests run and of failures plus errors
    '''
    sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))) + "/osm_ro")
    test_config['vimtype'] = args.vimtype
    if args.vimtype == "vmware":
        import vimconn_vmware as vim

        # abspath: a bare dirname(__file__) is '' when the script is started from its own folder
        test_config["test_directory"] = os.path.dirname(os.path.abspath(__file__)) + "/RO_tests"

        tenant_name = args.tenant_name
        test_config['tenant'] = tenant_name
        config_params = json.loads(args.config_param)
        org_name = config_params.get('orgname')
        org_user = config_params.get('user')
        org_passwd = config_params.get('passwd')
        vim_url = args.endpoint_url

        # vmware connector obj
        test_config['vim_conn'] = vim.vimconnector(name=org_name, tenant_name=tenant_name, user=org_user,
                                                   passwd=org_passwd, url=vim_url, config=config_params)

    elif args.vimtype == "aws":
        import vimconn_aws as vim
    elif args.vimtype == "openstack":
        import vimconn_openstack as vim
    elif args.vimtype == "openvim":
        import vimconn_openvim as vim
    else:
        logger.critical("vimtype '{}' not supported".format(args.vimtype))
        sys.exit(1)

    clsmembers = inspect.getmembers(sys.modules[__name__], inspect.isclass)
    # If only want to obtain a tests list print it and exit
    if args.list_tests:
        tests_names = [name for name, _ in clsmembers if name.startswith('test_vimconnector')]
        msg = "The 'vimconn' set tests are:\n\t" + ', '.join(sorted(tests_names))
        print(msg)
        logger.info(msg)
        sys.exit(0)

    # Create the list of tests to be run
    code_based_tests = []
    if args.tests:
        for test in args.tests:
            for t in test.split(','):
                matches_code_based_tests = [item for item in clsmembers if item[0] == t]
                if matches_code_based_tests:
                    code_based_tests.append(matches_code_based_tests[0][1])
                else:
                    logger.critical("Test '{}' is not among the possible ones".format(t))
                    sys.exit(1)
    if not code_based_tests:
        # No explicit selection: include every class of this test set
        code_based_tests = [klass for name, klass in clsmembers if name.startswith('test_vimconnector')]

    logger.debug("tests to be executed: {}".format(code_based_tests))

    # Run code based tests
    basic_tests_suite = unittest.TestSuite()
    for test in code_based_tests:
        basic_tests_suite.addTest(unittest.makeSuite(test))

    # TextTestRunner stream is set to the null device in order to avoid the method to directly print the result
    # of tests. This is handled in the tests using logging.
    stream = open(os.devnull, 'w')
    try:
        result = unittest.TextTestRunner(stream=stream, failfast=failfast).run(basic_tests_suite)
    finally:
        stream.close()

    executed = result.testsRun
    failed = len(result.failures) + len(result.errors)
    if failfast and failed:
        sys.exit(1)
    if result.failures:
        logger.debug("failures : {}".format(result.failures))
    if result.errors:
        logger.debug("errors : {}".format(result.errors))
    return executed, failed
def test_vim(args):
    '''Run the 'vim' test set: unittest classes of this module named 'test_VIM*'.

    Creates an openmano client from args and stores it in test_config["client"]. Unless classes
    are requested explicitly with args.tests, 'test_VIM_tenant_operations' is left out.

    Relies on the module-level names 'logger', 'failfast' and 'test_config' set by the
    __main__ block.

    :param args: parsed command line arguments (endpoint_url, tenant_name, datacenter, debug,
        list_tests, tests)
    :return: tuple (executed, failed) with the number of tests run and of failures plus errors
    '''
    # import openmanoclient from relative path
    sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))) + "/osm_ro")
    import openmanoclient

    test_config["client"] = openmanoclient.openmanoclient(
        endpoint_url=args.endpoint_url,
        tenant_name=args.tenant_name,
        datacenter_name=args.datacenter,
        debug=args.debug, logger=test_config["logger_name"])
    clsmembers = inspect.getmembers(sys.modules[__name__], inspect.isclass)
    # If only want to obtain a tests list print it and exit
    if args.list_tests:
        tests_names = [name for name, _ in clsmembers if name.startswith('test_VIM')]
        msg = "The 'vim' set tests are:\n\t" + ', '.join(sorted(tests_names)) +\
              "\nNOTE: The test test_VIM_tenant_operations will fail in case the used datacenter is type OpenStack " \
              "unless RO has access to the admin endpoint. Therefore this test is excluded by default"
        print(msg)
        logger.info(msg)
        sys.exit(0)

    # Create the list of tests to be run
    code_based_tests = []
    if args.tests:
        for test in args.tests:
            for t in test.split(','):
                matches_code_based_tests = [item for item in clsmembers if item[0] == t]
                if matches_code_based_tests:
                    code_based_tests.append(matches_code_based_tests[0][1])
                else:
                    logger.critical("Test '{}' is not among the possible ones".format(t))
                    sys.exit(1)
    if not code_based_tests:
        # No explicit selection: include all tests.
        # We exclude 'test_VIM_tenant_operations' unless it is specifically requested by the user
        code_based_tests = [klass for name, klass in clsmembers
                            if name.startswith('test_VIM') and name != 'test_VIM_tenant_operations']

    logger.debug("tests to be executed: {}".format(code_based_tests))

    # Run code based tests
    basic_tests_suite = unittest.TestSuite()
    for test in code_based_tests:
        basic_tests_suite.addTest(unittest.makeSuite(test))

    # TextTestRunner stream is set to the null device in order to avoid the method to directly print the result
    # of tests. This is handled in the tests using logging.
    stream = open(os.devnull, 'w')
    try:
        result = unittest.TextTestRunner(stream=stream, failfast=failfast).run(basic_tests_suite)
    finally:
        stream.close()

    executed = result.testsRun
    failed = len(result.failures) + len(result.errors)
    if failfast and failed:
        sys.exit(1)
    if result.failures:
        logger.debug("failures : {}".format(result.failures))
    if result.errors:
        logger.debug("errors : {}".format(result.errors))
    return executed, failed
def test_deploy(args):
    '''Run the 'deploy' test set: one descriptor based scenario test per folder under 'RO_tests'.

    Every entry of the 'RO_tests' directory located next to this file is a possible test. For
    each selected entry test_config["test_folder"] is set and the unittest class
    descriptor_based_scenario_test is run once.

    Relies on the module-level names 'logger', 'failfast' and 'test_config' set by the
    __main__ block.

    :param args: parsed command line arguments (image_name, mgmt_net, manual, endpoint_url,
        tenant_name, datacenter, debug, list_tests, tests)
    :return: tuple (executed, failed) with the number of tests run and of failures plus errors
    '''
    # import openmanoclient from relative path
    sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))) + "/osm_ro")
    import openmanoclient

    executed = 0
    failed = 0
    # abspath: a bare dirname(__file__) is '' when the script is started from its own folder
    test_config["test_directory"] = os.path.dirname(os.path.abspath(__file__)) + "/RO_tests"
    test_config["image_name"] = args.image_name
    test_config["mgmt_net"] = args.mgmt_net
    test_config["manual"] = args.manual
    test_directory_content = os.listdir(test_config["test_directory"])
    # If only want to obtain a tests list print it and exit
    if args.list_tests:
        msg = "The 'deploy' set tests are:\n\t" + ', '.join(sorted(test_directory_content))
        print(msg)
        logger.info(msg)
        sys.exit(0)

    # Create the list of tests to be run
    descriptor_based_tests = []
    if args.tests:
        for test in args.tests:
            for t in test.split(','):
                if t in test_directory_content:
                    descriptor_based_tests.append(t)
                else:
                    logger.critical("Test '{}' is not among the possible ones".format(t))
                    sys.exit(1)
    if not descriptor_based_tests:
        # No explicit selection: include all tests, sorted because os.listdir order is arbitrary
        descriptor_based_tests = sorted(test_directory_content)

    logger.debug("tests to be executed: {}".format(descriptor_based_tests))

    test_config["client"] = openmanoclient.openmanoclient(
        endpoint_url=args.endpoint_url,
        tenant_name=args.tenant_name,
        datacenter_name=args.datacenter,
        debug=args.debug, logger=test_config["logger_name"])

    # TextTestRunner stream is set to the null device in order to avoid the method to directly print the result
    # of tests. This is handled in the tests using logging.
    stream = open(os.devnull, 'w')
    try:
        # This scenario based tests are defined as directories inside the directory defined in 'test_directory'
        for test in descriptor_based_tests:
            test_config["test_folder"] = test
            test_suite = unittest.TestSuite()
            test_suite.addTest(unittest.makeSuite(descriptor_based_scenario_test))
            # failfast is kept off inside one scenario; presumably so that its clean-up test
            # still runs after a failed step (TODO confirm). --failfast stops between scenarios.
            result = unittest.TextTestRunner(stream=stream, failfast=False).run(test_suite)
            executed += result.testsRun
            failed += len(result.failures) + len(result.errors)
            if failfast and failed:
                sys.exit(1)
            if result.failures:
                logger.debug("failures : {}".format(result.failures))
            if result.errors:
                logger.debug("errors : {}".format(result.errors))
    finally:
        stream.close()

    return executed, failed
if __name__=="__main__":
    # Command line entry point: build the argument parser (one sub-command per test set),
    # configure logging, run the selected test set and exit with 1 if any test failed.

    parser = ArgumentParser(description='Test RO module')
    parser.add_argument('-v','--version', action='version', help="Show current version",
                        version='%(prog)s version ' + __version__ + ' ' + version_date)

    # Options shared by every test set
    parent_parser = ArgumentParser(add_help=False)
    parent_parser.add_argument('--failfast', help='Stop when a test fails rather than execute all tests',
                               dest='failfast', action="store_true", default=False)
    parent_parser.add_argument('--failed', help='Set logs to show only failed tests. --debug disables this option',
                               dest='failed', action="store_true", default=False)
    # Log file next to this script; abspath keeps the directory part non-empty when the script
    # is started from its own folder (otherwise the path would be '/<name>.log')
    default_logger_file = os.path.splitext(os.path.abspath(__file__))[0]+'.log'
    parent_parser.add_argument('--list-tests', help='List all available tests', dest='list_tests', action="store_true",
                               default=False)
    parent_parser.add_argument('--logger_file', dest='logger_file', default=default_logger_file,
                               help='Set the logger file. By default '+default_logger_file)
    parent_parser.add_argument("-t", '--tenant', dest='tenant_name', default="osm",
                               help="Set the openmano tenant to use for the test. By default 'osm'")
    parent_parser.add_argument('--debug', help='Set logs to debug level', dest='debug', action="store_true")
    parent_parser.add_argument('--timeout', help='Specify the instantiation timeout in seconds. By default 300',
                               dest='timeout', type=int, default=300)
    parent_parser.add_argument('--test', '--tests', help='Specify the tests to run', dest='tests', action="append")

    subparsers = parser.add_subparsers(help='test sets')

    # Deployment test set
    # -------------------
    deploy_parser = subparsers.add_parser('deploy', parents=[parent_parser],
                                          help="test deployment using descriptors at RO_test folder ")
    deploy_parser.set_defaults(func=test_deploy)

    # Mandatory arguments
    mandatory_arguments = deploy_parser.add_argument_group('mandatory arguments')
    mandatory_arguments.add_argument('-d', '--datacenter', required=True, help='Set the datacenter to test')
    mandatory_arguments.add_argument("-i", '--image-name', required=True, dest="image_name",
                                     help='Image name available at datacenter used for the tests')
    mandatory_arguments.add_argument("-n", '--mgmt-net-name', required=True, dest='mgmt_net',
                                     help='Set the vim management network to use for tests')

    # Optional arguments
    deploy_parser.add_argument('-m', '--manual-check', dest='manual', action="store_true", default=False,
                               help='Pause execution once deployed to allow manual checking of the '
                                    'deployed instance scenario')
    deploy_parser.add_argument('-u', '--url', dest='endpoint_url', default='http://localhost:9090/openmano',
                               help="Set the openmano server url. By default 'http://localhost:9090/openmano'")

    # Vimconn test set
    # -------------------
    vimconn_parser = subparsers.add_parser('vimconn', parents=[parent_parser], help="test vimconnector plugin")
    vimconn_parser.set_defaults(func=test_vimconnector)
    # Mandatory arguments
    mandatory_arguments = vimconn_parser.add_argument_group('mandatory arguments')
    mandatory_arguments.add_argument('--vimtype', choices=['vmware', 'aws', 'openstack', 'openvim'], required=True,
                                     help='Set the vimconnector type to test')
    mandatory_arguments.add_argument('-c', '--config', dest='config_param', required=True,
                                     help='Set the vimconnector specific config parameters in dictionary format')
    mandatory_arguments.add_argument('-u', '--url', dest='endpoint_url',required=True, help="Set the vim connector url or Host IP")
    # Optional arguments
    # TODO add optional arguments for vimconn tests
    # vimconn_parser.add_argument("-i", '--image-name', dest='image_name', help='<HELP>'))

    # Datacenter test set
    # -------------------
    vim_parser = subparsers.add_parser('vim', parents=[parent_parser], help="test vim")
    vim_parser.set_defaults(func=test_vim)

    # Mandatory arguments
    mandatory_arguments = vim_parser.add_argument_group('mandatory arguments')
    mandatory_arguments.add_argument('-d', '--datacenter', required=True, help='Set the datacenter to test')

    # Optional arguments
    vim_parser.add_argument('-u', '--url', dest='endpoint_url', default='http://localhost:9090/openmano',
                            help="Set the openmano server url. By default 'http://localhost:9090/openmano'")

    argcomplete.autocomplete(parser)
    args = parser.parse_args()

    # Shared configuration read by the test classes and the test set functions
    test_config = {}

    # default logger level is INFO. Options --debug and --failed override this, being --debug prioritary
    logger_level = 'INFO'
    if args.debug:
        logger_level = 'DEBUG'
    elif args.failed:
        logger_level = 'WARNING'
    logger_name = os.path.basename(__file__)
    test_config["logger_name"] = logger_name
    logger = logging.getLogger(logger_name)
    logger.setLevel(logger_level)
    failfast = args.failfast

    # Configure a logging handler to store in a logging file
    if args.logger_file:
        fileHandler = logging.FileHandler(args.logger_file)
        formatter_fileHandler = logging.Formatter('%(asctime)s %(name)s %(levelname)s: %(message)s')
        fileHandler.setFormatter(formatter_fileHandler)
        logger.addHandler(fileHandler)

    # Configure a handler to print to stdout
    consoleHandler = logging.StreamHandler(sys.stdout)
    formatter_consoleHandler = logging.Formatter('%(asctime)s %(name)s %(levelname)s: %(message)s')
    consoleHandler.setFormatter(formatter_consoleHandler)
    logger.addHandler(consoleHandler)

    logger.debug('Program started with the following arguments: ' + str(args))

    # set test config parameters
    test_config["timeout"] = args.timeout
    test_config["test_number"] = 1

    # Run the test set selected through the sub-command
    executed, failed = args.func(args)

    # Log summary; warning level so that it is still shown with --failed
    logger.warning("Total number of tests: {}; Total number of failures/errors: {}".format(executed, failed))
    sys.exit(1 if failed else 0)