2 # -*- coding: utf-8 -*-
6 # This file is part of openmano
9 # Licensed under the Apache License, Version 2.0 (the "License"); you may
10 # not use this file except in compliance with the License. You may obtain
11 # a copy of the License at
13 # http://www.apache.org/licenses/LICENSE-2.0
15 # Unless required by applicable law or agreed to in writing, software
16 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
17 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
18 # License for the specific language governing permissions and limitations
24 Module for testing openmano functionality. It uses openmanoclient.py for invoking openmano
26 __author__
= "Pablo Montes, Alfonso Tierno"
27 __date__
= "$16-Feb-2017 17:08:16$"
29 version_date
= "Jun 2017"
33 from argparse
import ArgumentParser
44 from pyvcloud
.vcloudair
import VCA
47 global test_config
# used for global variables with the test configuration
50 class test_base(unittest
.TestCase
):
56 logger
.info("{}. {}".format(test_config
["test_number"], cls
.__name
__))
59 def tearDownClass(cls
):
60 test_config
["test_number"] += 1
63 exec_info
= sys
.exc_info()
64 if exec_info
== (None, None, None):
65 logger
.info(self
.__class
__.test_text
+" -> TEST OK")
67 logger
.warning(self
.__class
__.test_text
+" -> TEST NOK")
68 logger
.critical("Traceback error",exc_info
=True)
71 def check_instance_scenario_active(uuid
):
72 instance
= test_config
["client"].get_instance(uuid
=uuid
)
74 for net
in instance
['nets']:
75 status
= net
['status']
76 if status
!= 'ACTIVE':
77 return (False, status
)
79 for vnf
in instance
['vnfs']:
82 if status
!= 'ACTIVE':
83 return (False, status
)
90 All unittest classes for code-based tests must have the prefix 'test_' in order to be taken into account for tests.
class test_VIM_datacenter_tenant_operations(test_base):
    """Create, look up and delete an RO tenant through ``test_config["client"]``.

    The methods are numbered (000, 010, 020), presumably so that unittest's
    default alphabetical ordering runs them as create -> list -> delete.
    They share the generated tenant name through the class attribute
    ``tenant_name``.
    """

    # Random name generated by test_000 and reused by the later tests.
    tenant_name = None

    def test_000_create_RO_tenant(self):
        """Create a tenant with a random name and check the name echoed back."""
        cls = self.__class__
        cls.tenant_name = _get_random_string(20)
        # Label for this test; it is stored on the class, not the instance.
        cls.test_text = "{}.{}. TEST {}".format(test_config["test_number"], cls.test_index,
                                                inspect.currentframe().f_code.co_name)
        cls.test_index += 1
        tenant = test_config["client"].create_tenant(name=cls.tenant_name,
                                                     description=cls.tenant_name)
        logger.debug("%s", tenant)
        self.assertEqual(tenant.get('tenant', {}).get('name', ''), cls.tenant_name)

    def test_010_list_RO_tenant(self):
        """Fetch the tenant by name and check that the same name is returned."""
        cls = self.__class__
        cls.test_text = "{}.{}. TEST {}".format(test_config["test_number"], cls.test_index,
                                                inspect.currentframe().f_code.co_name)
        cls.test_index += 1
        tenant = test_config["client"].get_tenant(name=cls.tenant_name)
        logger.debug("%s", tenant)
        self.assertEqual(tenant.get('tenant', {}).get('name', ''), cls.tenant_name)

    def test_020_delete_RO_tenant(self):
        """Delete the tenant by name and check that the result mentions 'deleted'."""
        cls = self.__class__
        cls.test_text = "{}.{}. TEST {}".format(test_config["test_number"], cls.test_index,
                                                inspect.currentframe().f_code.co_name)
        cls.test_index += 1
        tenant = test_config["client"].delete_tenant(name=cls.tenant_name)
        logger.debug("%s", tenant)
        # assertIn rather than a bare assert: it is not stripped under -O and
        # reports both operands when the check fails.
        self.assertIn('deleted', tenant.get('result', ""))
class test_VIM_datacenter_operations(test_base):
    """Exercise the datacenter life cycle through ``test_config["client"]``.

    The numbered methods cover create, list, attach, list-attached, detach
    and delete. They share the generated datacenter name through the class
    attribute ``datacenter_name``.
    """

    # Random name generated by test_000 and reused by the later tests.
    datacenter_name = None

    def test_000_create_datacenter(self):
        """Create a datacenter with a random name and a fake VIM URL."""
        cls = self.__class__
        # Label for this test; it is stored on the class, not the instance.
        cls.test_text = "{}.{}. TEST {}".format(test_config["test_number"], cls.test_index,
                                                inspect.currentframe().f_code.co_name)
        cls.datacenter_name = _get_random_string(20)
        cls.test_index += 1
        self.datacenter = test_config["client"].create_datacenter(name=cls.datacenter_name,
                                                                  vim_url="http://fakeurl/fake")
        logger.debug("%s", self.datacenter)
        self.assertEqual(self.datacenter.get('datacenter', {}).get('name', ''), cls.datacenter_name)

    def test_010_list_datacenter(self):
        """Look the datacenter up by name across all tenants."""
        cls = self.__class__
        cls.test_text = "{}.{}. TEST {}".format(test_config["test_number"], cls.test_index,
                                                inspect.currentframe().f_code.co_name)
        cls.test_index += 1
        self.datacenter = test_config["client"].get_datacenter(all_tenants=True, name=cls.datacenter_name)
        logger.debug("%s", self.datacenter)
        self.assertEqual(self.datacenter.get('datacenter', {}).get('name', ''), cls.datacenter_name)

    def test_020_attach_datacenter(self):
        """Attach the datacenter using the VIM tenant name 'fake'."""
        cls = self.__class__
        cls.test_text = "{}.{}. TEST {}".format(test_config["test_number"], cls.test_index,
                                                inspect.currentframe().f_code.co_name)
        cls.test_index += 1
        self.datacenter = test_config["client"].attach_datacenter(name=cls.datacenter_name,
                                                                  vim_tenant_name='fake')
        logger.debug("%s", self.datacenter)
        # assertIn rather than a bare assert: it is not stripped under -O and
        # reports both operands when the check fails.
        self.assertIn('vim_tenants', self.datacenter.get('datacenter', {}))

    def test_030_list_attached_datacenter(self):
        """Look the datacenter up by name with all_tenants=False."""
        cls = self.__class__
        cls.test_text = "{}.{}. TEST {}".format(test_config["test_number"], cls.test_index,
                                                inspect.currentframe().f_code.co_name)
        cls.test_index += 1
        self.datacenter = test_config["client"].get_datacenter(all_tenants=False, name=cls.datacenter_name)
        logger.debug("%s", self.datacenter)
        self.assertEqual(self.datacenter.get('datacenter', {}).get('name', ''), cls.datacenter_name)

    def test_040_detach_datacenter(self):
        """Detach the datacenter and check that the result mentions 'detached'."""
        cls = self.__class__
        cls.test_text = "{}.{}. TEST {}".format(test_config["test_number"], cls.test_index,
                                                inspect.currentframe().f_code.co_name)
        cls.test_index += 1
        self.datacenter = test_config["client"].detach_datacenter(name=cls.datacenter_name)
        logger.debug("%s", self.datacenter)
        self.assertIn('detached', self.datacenter.get('result', ""))

    def test_050_delete_datacenter(self):
        """Delete the datacenter and check that the result mentions 'deleted'."""
        cls = self.__class__
        cls.test_text = "{}.{}. TEST {}".format(test_config["test_number"], cls.test_index,
                                                inspect.currentframe().f_code.co_name)
        cls.test_index += 1
        self.datacenter = test_config["client"].delete_datacenter(name=cls.datacenter_name)
        logger.debug("%s", self.datacenter)
        self.assertIn('deleted', self.datacenter.get('result', ""))
class test_VIM_network_operations(test_base):
    """Create, list, show and delete a VIM network via ``client.vim_action``.

    The generated network name and the id returned on creation are kept in
    class attributes so the later numbered tests can refer to the same
    network.
    """

    # Set by test_000 and reused by the show/delete tests.
    vim_network_name = None
    vim_network_uuid = None

    def test_000_create_VIM_network(self):
        """Create a network with a random name and remember its id."""
        cls = self.__class__
        # Label for this test; it is stored on the class, not the instance.
        cls.test_text = "{}.{}. TEST {}".format(test_config["test_number"], cls.test_index,
                                                inspect.currentframe().f_code.co_name)
        cls.vim_network_name = _get_random_string(20)
        cls.test_index += 1
        network = test_config["client"].vim_action("create", "networks", name=cls.vim_network_name)
        logger.debug("%s", network)
        cls.vim_network_uuid = network["network"]["id"]
        self.assertEqual(network.get('network', {}).get('name', ''), cls.vim_network_name)

    def test_010_list_VIM_networks(self):
        """List the VIM networks; only checks that the call does not raise."""
        cls = self.__class__
        cls.test_text = "{}.{}. TEST {}".format(test_config["test_number"], cls.test_index,
                                                inspect.currentframe().f_code.co_name)
        cls.test_index += 1
        networks = test_config["client"].vim_action("list", "networks")
        logger.debug("%s", networks)

    def test_020_get_VIM_network_by_uuid(self):
        """Show the network by the stored id and compare the returned name."""
        cls = self.__class__
        cls.test_text = "{}.{}. TEST {}".format(test_config["test_number"], cls.test_index,
                                                inspect.currentframe().f_code.co_name)
        cls.test_index += 1
        network = test_config["client"].vim_action("show", "networks", uuid=cls.vim_network_uuid)
        logger.debug("%s", network)
        self.assertEqual(network.get('network', {}).get('name', ''), cls.vim_network_name)

    def test_030_delete_VIM_network_by_uuid(self):
        """Delete the network by the stored id and check the result text."""
        cls = self.__class__
        cls.test_text = "{}.{}. TEST {}".format(test_config["test_number"], cls.test_index,
                                                inspect.currentframe().f_code.co_name)
        cls.test_index += 1
        network = test_config["client"].vim_action("delete", "networks", uuid=cls.vim_network_uuid)
        logger.debug("%s", network)
        # assertIn rather than a bare assert: it is not stripped under -O and
        # reports both operands when the check fails.
        self.assertIn('deleted', network.get('result', ""))
class test_VIM_image_operations(test_base):
    """Smoke test for listing VIM images via ``client.vim_action``."""

    def test_000_list_VIM_images(self):
        """List the VIM images and log the answer; nothing is asserted."""
        klass = self.__class__
        current_name = inspect.currentframe().f_code.co_name
        klass.test_text = "{}.{}. TEST {}".format(test_config["test_number"], klass.test_index, current_name)
        klass.test_index += 1
        images = test_config["client"].vim_action("list", "images")
        logger.debug("{}".format(images))
232 The following is a non-critical test that will fail most of the time.
233 In the case of an OpenStack datacenter, these tests will only succeed if RO has access to the admin endpoint.
234 This test will only be executed if it is specifically requested by the user.
class test_VIM_tenant_operations(test_base):
    """Create, list, show and delete a VIM tenant via ``client.vim_action``.

    The generated tenant name and the id returned on creation are kept in
    class attributes so the later numbered tests can refer to the same
    tenant.
    """

    # Set by test_000 and reused by the show/delete tests.
    vim_tenant_name = None
    vim_tenant_uuid = None

    @classmethod
    def setUpClass(cls):
        """Run the base class set-up, then warn about the admin-endpoint requirement."""
        # unittest requires setUpClass to be a classmethod, so the parent hook
        # is reached through super(); calling test_base.setUpClass(cls) would
        # pass the class twice and raise TypeError.
        super(test_VIM_tenant_operations, cls).setUpClass()
        logger.warning("In case of OpenStack datacenter these tests will only success "
                       "if RO has access to the admin endpoint")

    def test_000_create_VIM_tenant(self):
        """Create a tenant with a random name and remember its id."""
        cls = self.__class__
        # Label for this test; it is stored on the class, not the instance.
        cls.test_text = "{}.{}. TEST {}".format(test_config["test_number"], cls.test_index,
                                                inspect.currentframe().f_code.co_name)
        cls.vim_tenant_name = _get_random_string(20)
        cls.test_index += 1
        tenant = test_config["client"].vim_action("create", "tenants", name=cls.vim_tenant_name)
        logger.debug("%s", tenant)
        cls.vim_tenant_uuid = tenant["tenant"]["id"]
        self.assertEqual(tenant.get('tenant', {}).get('name', ''), cls.vim_tenant_name)

    def test_010_list_VIM_tenants(self):
        """List the VIM tenants; only checks that the call does not raise."""
        cls = self.__class__
        cls.test_text = "{}.{}. TEST {}".format(test_config["test_number"], cls.test_index,
                                                inspect.currentframe().f_code.co_name)
        cls.test_index += 1
        tenants = test_config["client"].vim_action("list", "tenants")
        logger.debug("%s", tenants)

    def test_020_get_VIM_tenant_by_uuid(self):
        """Show the tenant by the stored id and compare the returned name."""
        cls = self.__class__
        cls.test_text = "{}.{}. TEST {}".format(test_config["test_number"], cls.test_index,
                                                inspect.currentframe().f_code.co_name)
        cls.test_index += 1
        tenant = test_config["client"].vim_action("show", "tenants", uuid=cls.vim_tenant_uuid)
        logger.debug("%s", tenant)
        self.assertEqual(tenant.get('tenant', {}).get('name', ''), cls.vim_tenant_name)

    def test_030_delete_VIM_tenant_by_uuid(self):
        """Delete the tenant by the stored id and check the result text."""
        cls = self.__class__
        cls.test_text = "{}.{}. TEST {}".format(test_config["test_number"], cls.test_index,
                                                inspect.currentframe().f_code.co_name)
        cls.test_index += 1
        tenant = test_config["client"].vim_action("delete", "tenants", uuid=cls.vim_tenant_uuid)
        logger.debug("%s", tenant)
        # assertIn rather than a bare assert: it is not stripped under -O and
        # reports both operands when the check fails.
        self.assertIn('deleted', tenant.get('result', ""))
281 class test_vimconn_connect(test_base
):
286 # def setUpClass(cls):
287 # logger.info("{}. {}".format(test_config["test_number"], cls.__name__))
290 # def tearDownClass(cls):
291 # test_config["test_number"] += 1
293 # def tearDown(self):
294 # exec_info = sys.exc_info()
295 # if exec_info == (None, None, None):
296 # logger.info(self.__class__.test_text+" -> TEST OK")
298 # logger.warning(self.__class__.test_text+" -> TEST NOK")
299 # logger.critical("Traceback error",exc_info=True)
def test_000_connect(self):
    """Check that connecting to a 'vmware' VIM yields a VCA object.

    For any other configured ``vimtype`` nothing is verified.
    """
    klass = self.__class__
    current_name = inspect.currentframe().f_code.co_name
    klass.test_text = "{}.{}. TEST {}".format(test_config["test_number"],
                                              klass.test_index,
                                              current_name)
    klass.test_index += 1

    # Only the vmware connector is exercised here.
    if test_config['vimtype'] != 'vmware':
        return

    vca_object = test_config["vim_conn"].connect()
    logger.debug("{}".format(vca_object))
    self.assertIsInstance(vca_object, VCA)
313 class test_vimconn_new_network(test_base
):
319 # def setUpClass(cls):
320 # logger.info("{}. {}".format(test_config["test_number"], cls.__name__))
323 # def tearDownClass(cls):
324 # test_config["test_number"] += 1
326 # def tearDown(self):
327 # exec_info = sys.exc_info()
328 # if exec_info == (None, None, None):
329 # logger.info(self.__class__.test_text+" -> TEST OK")
331 # logger.warning(self.__class__.test_text+" -> TEST NOK")
332 # logger.critical("Traceback error",exc_info=True)
334 def test_000_new_network(self
):
335 self
.__class
__.network_name
= _get_random_string(20)
336 network_type
= 'bridge'
338 self
.__class
__.test_text
= "{}.{}. TEST {}".format(test_config
["test_number"],
339 self
.__class
__.test_index
, inspect
.currentframe().f_code
.co_name
)
340 self
.__class
__.test_index
+= 1
342 network
= test_config
["vim_conn"].new_network(net_name
=self
.__class
__.network_name
,
343 net_type
=network_type
)
344 self
.__class
__.network_id
= network
345 logger
.debug("{}".format(network
))
347 network_list
= test_config
["vim_conn"].get_vcd_network_list()
348 for net
in network_list
:
349 if self
.__class
__.network_name
in net
.get('name'):
350 self
.assertIn(self
.__class
__.network_name
, net
.get('name'))
351 self
.assertEqual(net
.get('type'), network_type
)
353 # Deleting created network
354 result
= test_config
["vim_conn"].delete_network(self
.__class
__.network_id
)
356 logger
.info("Network id {} sucessfully deleted".format(self
.__class
__.network_id
))
358 logger
.info("Failed to delete network id {}".format(self
.__class
__.network_id
))
360 def test_010_new_network_by_types(self
):
362 network_types
= ['data','bridge','mgmt']
363 self
.__class
__.test_text
= "{}.{}. TEST {}".format(test_config
["test_number"],
364 self
.__class
__.test_index
,
365 inspect
.currentframe().f_code
.co_name
)
366 self
.__class
__.test_index
+= 1
367 for net_type
in network_types
:
368 self
.__class
__.network_name
= _get_random_string(20)
369 network_id
= test_config
["vim_conn"].new_network(net_name
=self
.__class
__.network_name
,
372 delete_net_ids
.append(network_id
)
373 logger
.debug("{}".format(network_id
))
375 network_list
= test_config
["vim_conn"].get_vcd_network_list()
376 for net
in network_list
:
377 if self
.__class
__.network_name
in net
.get('name'):
378 self
.assertIn(self
.__class
__.network_name
, net
.get('name'))
379 if net_type
in net
.get('type'):
380 self
.assertEqual(net
.get('type'), net_type
)
382 self
.assertNotEqual(net
.get('type'), net_type
)
384 # Deleting created network
385 for net_id
in delete_net_ids
:
386 result
= test_config
["vim_conn"].delete_network(net_id
)
388 logger
.info("Network id {} sucessfully deleted".format(net_id
))
390 logger
.info("Failed to delete network id {}".format(net_id
))
392 def test_020_new_network_by_ipprofile(self
):
393 test_directory_content
= os
.listdir(test_config
["test_directory"])
395 for dir_name
in test_directory_content
:
396 if dir_name
== 'simple_multi_vnfc':
397 self
.__class
__.scenario_test_path
= test_config
["test_directory"] + '/'+ dir_name
398 vnfd_files
= glob
.glob(self
.__class
__.scenario_test_path
+'/vnfd_*.yaml')
401 for vnfd
in vnfd_files
:
402 with
open(vnfd
, 'r') as stream
:
403 vnf_descriptor
= yaml
.load(stream
)
405 internal_connections_list
= vnf_descriptor
['vnf']['internal-connections']
406 for item
in internal_connections_list
:
407 if 'ip-profile' in item
:
408 version
= item
['ip-profile']['ip-version']
409 dhcp_count
= item
['ip-profile']['dhcp']['count']
410 dhcp_enabled
= item
['ip-profile']['dhcp']['enabled']
412 self
.__class
__.network_name
= _get_random_string(20)
413 ip_profile
= {'dhcp_count': dhcp_count
,
414 'dhcp_enabled': dhcp_enabled
,
415 'ip_version': version
417 self
.__class
__.test_text
= "{}.{}. TEST {}".format(test_config
["test_number"],
418 self
.__class
__.test_index
,
419 inspect
.currentframe().f_code
.co_name
)
420 self
.__class
__.test_index
+= 1
421 network
= test_config
["vim_conn"].new_network(net_name
=self
.__class
__.network_name
,
423 ip_profile
=ip_profile
)
424 self
.__class
__.network_id
= network
425 logger
.debug("{}".format(network
))
427 network_list
= test_config
["vim_conn"].get_vcd_network_list()
428 for net
in network_list
:
429 if self
.__class
__.network_name
in net
.get('name'):
430 self
.assertIn(self
.__class
__.network_name
, net
.get('name'))
432 # Deleting created network
433 result
= test_config
["vim_conn"].delete_network(self
.__class
__.network_id
)
435 logger
.info("Network id {} sucessfully deleted".format(self
.__class
__.network_id
))
437 logger
.info("Failed to delete network id {}".format(self
.__class
__.network_id
))
439 def test_030_new_network_by_isshared(self
):
440 self
.__class
__.network_name
= _get_random_string(20)
442 self
.__class
__.test_text
= "{}.{}. TEST {}".format(test_config
["test_number"],
443 self
.__class
__.test_index
,
444 inspect
.currentframe().f_code
.co_name
)
445 self
.__class
__.test_index
+= 1
446 network
= test_config
["vim_conn"].new_network(net_name
=self
.__class
__.network_name
,
449 self
.__class
__.network_id
= network
450 logger
.debug("{}".format(network
))
452 network_list
= test_config
["vim_conn"].get_vcd_network_list()
453 for net
in network_list
:
454 if self
.__class
__.network_name
in net
.get('name'):
455 self
.assertIn(self
.__class
__.network_name
, net
.get('name'))
456 self
.assertEqual(net
.get('shared'), shared
)
458 # Deleting created network
459 result
= test_config
["vim_conn"].delete_network(self
.__class
__.network_id
)
461 logger
.info("Network id {} sucessfully deleted".format(self
.__class
__.network_id
))
463 logger
.info("Failed to delete network id {}".format(self
.__class
__.network_id
))
465 def test_040_new_network_by_negative(self
):
466 self
.__class
__.network_name
= _get_random_string(20)
467 self
.__class
__.test_text
= "{}.{}. TEST {}".format(test_config
["test_number"],
468 self
.__class
__.test_index
,
469 inspect
.currentframe().f_code
.co_name
)
470 self
.__class
__.test_index
+= 1
471 network
= test_config
["vim_conn"].new_network(net_name
=self
.__class
__.network_name
,
472 net_type
='unknowntype')
473 self
.__class
__.network_id
= network
474 logger
.debug("{}".format(network
))
475 network_list
= test_config
["vim_conn"].get_vcd_network_list()
476 for net
in network_list
:
477 if self
.__class
__.network_name
in net
.get('name'):
478 self
.assertIn(self
.__class
__.network_name
, net
.get('name'))
480 # Deleting created network
481 result
= test_config
["vim_conn"].delete_network(self
.__class
__.network_id
)
483 logger
.info("Network id {} sucessfully deleted".format(self
.__class
__.network_id
))
485 logger
.info("Failed to delete network id {}".format(self
.__class
__.network_id
))
487 class test_vimconn_get_network_list(test_base
):
493 # def setUpClass(cls):
494 # logger.info("{}. {}".format(test_config["test_number"], cls.__name__))
497 # def tearDownClass(cls):
498 # test_config["test_number"] += 1
501 # creating new network
502 self
.__class
__.network_name
= _get_random_string(20)
503 self
.__class
__.net_type
= 'bridge'
504 network
= test_config
["vim_conn"].new_network(net_name
=self
.__class
__.network_name
,
505 net_type
=self
.__class
__.net_type
)
506 self
.__class
__.network_id
= network
507 logger
.debug("{}".format(network
))
510 test_base
.tearDown(self
)
511 # exec_info = sys.exc_info()
512 # if exec_info == (None, None, None):
513 # logger.info(self.__class__.test_text+" -> TEST OK")
515 # logger.warning(self.__class__.test_text+" -> TEST NOK")
516 # logger.critical("Traceback error",exc_info=True)
518 # Deleting created network
519 result
= test_config
["vim_conn"].delete_network(self
.__class
__.network_id
)
521 logger
.info("Network id {} sucessfully deleted".format(self
.__class
__.network_id
))
523 logger
.info("Failed to delete network id {}".format(self
.__class
__.network_id
))
def test_000_get_network_list(self):
    """Check that the unfiltered network list contains the test network.

    Every listed network whose name contains the class attribute
    ``network_name`` must have the expected type, be ACTIVE and not be
    shared. At least one such network must be present.
    """
    cls = self.__class__
    cls.test_text = "{}.{}. TEST {}".format(test_config["test_number"],
                                            cls.test_index,
                                            inspect.currentframe().f_code.co_name)
    cls.test_index += 1

    found = False
    for net in test_config["vim_conn"].get_network_list():
        # Substring match, as in the original check on the listed name.
        if cls.network_name in net.get('name'):
            found = True
            self.assertEqual(net.get('type'), cls.net_type)
            self.assertEqual(net.get('status'), 'ACTIVE')
            self.assertEqual(net.get('shared'), False)

    # Without this check the test would pass even if the network was
    # missing from the list altogether.
    self.assertTrue(found, "network {} not found in network list".format(cls.network_name))
def test_010_get_network_list_by_name(self):
    """Check that filtering the network list by name returns the test network.

    The VIM-side name is resolved from the class attribute ``network_id``
    and used as the ``name`` filter. At least one returned entry must match.
    """
    cls = self.__class__
    cls.test_text = "{}.{}. TEST {}".format(test_config["test_number"],
                                            cls.test_index,
                                            inspect.currentframe().f_code.co_name)
    cls.test_index += 1

    network_name = test_config['vim_conn'].get_network_name_by_id(cls.network_id)

    # Find the network in the list by its name.
    found = False
    for list_item in test_config["vim_conn"].get_network_list({'name': network_name}):
        if cls.network_name in list_item.get('name'):
            found = True
            self.assertEqual(network_name, list_item.get('name'))
            self.assertEqual(list_item.get('type'), cls.net_type)
            self.assertEqual(list_item.get('status'), 'ACTIVE')

    # Without this check an empty or non-matching answer would pass.
    self.assertTrue(found, "network {} not found when filtering by name".format(network_name))
def test_020_get_network_list_by_id(self):
    """Check that filtering the network list by id returns the test network.

    The class attribute ``network_id`` is used as the ``id`` filter.
    At least one returned entry must match.
    """
    cls = self.__class__
    cls.test_text = "{}.{}. TEST {}".format(test_config["test_number"],
                                            cls.test_index,
                                            inspect.currentframe().f_code.co_name)
    cls.test_index += 1

    # Find the network in the list by its id.
    found = False
    for list_item in test_config["vim_conn"].get_network_list({'id': cls.network_id}):
        if cls.network_id in list_item.get('id'):
            found = True
            self.assertEqual(cls.network_id, list_item.get('id'))
            self.assertEqual(list_item.get('type'), cls.net_type)
            self.assertEqual(list_item.get('status'), 'ACTIVE')

    # Without this check an empty or non-matching answer would pass.
    self.assertTrue(found, "network id {} not found when filtering by id".format(cls.network_id))
569 def test_030_get_network_list_by_shared(self
):
571 self
.__class
__.test_text
= "{}.{}. TEST {}".format(test_config
["test_number"],
572 self
.__class
__.test_index
,
573 inspect
.currentframe().f_code
.co_name
)
574 self
.__class
__.test_index
+= 1
576 network_name
= test_config
['vim_conn'].get_network_name_by_id(self
.__class
__.network_id
)
577 # find network from list by it's shared value
578 new_network_list
= test_config
["vim_conn"].get_network_list({'shared':Shared
,
579 'name':network_name
})
580 for list_item
in new_network_list
:
581 if list_item
.get('shared') == Shared
:
582 self
.assertEqual(list_item
.get('shared'), Shared
)
583 self
.assertEqual(list_item
.get('type'), self
.__class
__.net_type
)
584 self
.assertEqual(network_name
, list_item
.get('name'))
def test_040_get_network_list_by_tenant_id(self):
    """Filter the network list by tenant id and network name.

    For each VIM tenant whose name equals ``test_config['tenant']``, every
    network returned for that tenant id and the test network's name is
    checked for tenant id, name, type and ACTIVE status.
    """
    klass = self.__class__
    current_name = inspect.currentframe().f_code.co_name
    klass.test_text = "{}.{}. TEST {}".format(test_config["test_number"],
                                              klass.test_index,
                                              current_name)
    klass.test_index += 1

    tenant_list = test_config["vim_conn"].get_tenant_list()
    network_name = test_config['vim_conn'].get_network_name_by_id(klass.network_id)

    for tenant_item in tenant_list:
        # Skip tenants other than the configured one.
        if test_config['tenant'] != tenant_item.get('name'):
            continue

        # Find the network in the list by its tenant id.
        tenant_id = tenant_item.get('id')
        filter_dict = {'tenant_id': tenant_id,
                       'name': network_name}
        for list_item in test_config["vim_conn"].get_network_list(filter_dict):
            self.assertEqual(tenant_id, list_item.get('tenant_id'))
            self.assertEqual(network_name, list_item.get('name'))
            self.assertEqual(list_item.get('type'), klass.net_type)
            self.assertEqual(list_item.get('status'), 'ACTIVE')
607 def test_050_get_network_list_by_status(self
):
608 self
.__class
__.test_text
= "{}.{}. TEST {}".format(test_config
["test_number"],
609 self
.__class
__.test_index
,
610 inspect
.currentframe().f_code
.co_name
)
611 self
.__class
__.test_index
+= 1
614 network_name
= test_config
['vim_conn'].get_network_name_by_id(self
.__class
__.network_id
)
616 # find network from list by it's status
617 new_network_list
= test_config
["vim_conn"].get_network_list({'status':status
,
618 'name': network_name
})
619 for list_item
in new_network_list
:
620 self
.assertIn(self
.__class
__.network_name
, list_item
.get('name'))
621 self
.assertEqual(list_item
.get('type'), self
.__class
__.net_type
)
622 self
.assertEqual(list_item
.get('status'), status
)
def test_060_get_network_list_by_negative(self):
    """Filtering by the name 'unknown_name' must return an empty list."""
    klass = self.__class__
    current_name = inspect.currentframe().f_code.co_name
    klass.test_text = "{}.{}. TEST {}".format(test_config["test_number"],
                                              klass.test_index,
                                              current_name)
    klass.test_index += 1

    name_filter = {'name': 'unknown_name'}
    self.assertEqual(test_config["vim_conn"].get_network_list(name_filter), [])
633 class test_vimconn_get_network(test_base
):
639 # def setUpClass(cls):
640 # logger.info("{}. {}".format(test_config["test_number"], cls.__name__))
643 # def tearDownClass(cls):
644 # test_config["test_number"] += 1
647 # creating new network
648 self
.__class
__.network_name
= _get_random_string(20)
649 self
.__class
__.net_type
= 'bridge'
650 network
= test_config
["vim_conn"].new_network(net_name
=self
.__class
__.network_name
,
651 net_type
=self
.__class
__.net_type
)
652 self
.__class
__.network_id
= network
653 logger
.debug("{}".format(network
))
656 test_base
.tearDown(self
)
657 # exec_info = sys.exc_info()
658 # if exec_info == (None, None, None):
659 # logger.info(self.__class__.test_text+" -> TEST OK")
661 # logger.warning(self.__class__.test_text+" -> TEST NOK")
662 # logger.critical("Traceback error",exc_info=True)
664 # Deleting created network
665 result
= test_config
["vim_conn"].delete_network(self
.__class
__.network_id
)
667 logger
.info("Network id {} sucessfully deleted".format(self
.__class
__.network_id
))
669 logger
.info("Failed to delete network id {}".format(self
.__class
__.network_id
))
def test_000_get_network(self):
    """Fetch the test network by id and check status, name, type and id."""
    klass = self.__class__
    current_name = inspect.currentframe().f_code.co_name
    klass.test_text = "{}.{}. TEST {}".format(test_config["test_number"],
                                              klass.test_index,
                                              current_name)
    klass.test_index += 1

    vim_conn = test_config["vim_conn"]
    info = vim_conn.get_network(klass.network_id)

    self.assertEqual(info.get('status'), 'ACTIVE')
    # The returned name only has to contain the requested name.
    self.assertIn(klass.network_name, info.get('name'))
    self.assertEqual(info.get('type'), klass.net_type)
    self.assertEqual(info.get('id'), klass.network_id)
def test_010_get_network_negative(self):
    """Fetching a network by a freshly generated random UUID must return {}."""
    missing_id = str(uuid.uuid4())

    klass = self.__class__
    current_name = inspect.currentframe().f_code.co_name
    klass.test_text = "{}.{}. TEST {}".format(test_config["test_number"],
                                              klass.test_index,
                                              current_name)
    klass.test_index += 1

    vim_conn = test_config["vim_conn"]
    self.assertEqual(vim_conn.get_network(missing_id), {})
693 class test_vimconn_delete_network(test_base
):
699 # def setUpClass(cls):
700 # logger.info("{}. {}".format(test_config["test_number"], cls.__name__))
703 # def tearDownClass(cls):
704 # test_config["test_number"] += 1
706 # def tearDown(self):
707 # exec_info = sys.exc_info()
708 # if exec_info == (None, None, None):
709 # logger.info(self.__class__.test_text+" -> TEST OK")
711 # logger.warning(self.__class__.test_text+" -> TEST NOK")
712 # logger.critical("Traceback error",exc_info=True)
714 def test_000_delete_network(self
):
716 self
.__class
__.network_name
= _get_random_string(20)
717 self
.__class
__.net_type
= 'bridge'
718 network
= test_config
["vim_conn"].new_network(net_name
=self
.__class
__.network_name
,
719 net_type
=self
.__class
__.net_type
)
720 self
.__class
__.network_id
= network
721 logger
.debug("{}".format(network
))
723 self
.__class
__.test_text
= "{}.{}. TEST {}".format(test_config
["test_number"],
724 self
.__class
__.test_index
,
725 inspect
.currentframe().f_code
.co_name
)
726 self
.__class
__.test_index
+= 1
728 result
= test_config
["vim_conn"].delete_network(self
.__class
__.network_id
)
730 logger
.info("Network id {} sucessfully deleted".format(self
.__class
__.network_id
))
732 logger
.info("Failed to delete network id {}".format(self
.__class
__.network_id
))
734 # after deleting network we check in network list
735 network_list
= test_config
["vim_conn"].get_network_list({ 'id':self
.__class
__.network_id
})
736 self
.assertEqual(network_list
, [])
def test_010_delete_network_negative(self):
    """Deleting a network by a random UUID must raise with http_code 400."""
    missing_id = str(uuid.uuid4())

    klass = self.__class__
    current_name = inspect.currentframe().f_code.co_name
    klass.test_text = "{}.{}. TEST {}".format(test_config["test_number"],
                                              klass.test_index,
                                              current_name)
    klass.test_index += 1

    vim_conn = test_config["vim_conn"]
    with self.assertRaises(Exception) as context:
        vim_conn.delete_network(missing_id)

    raised = context.exception
    self.assertEqual(raised.http_code, 400)
751 class test_vimconn_get_flavor(test_base
):
756 # def setUpClass(cls):
757 # logger.info("{}. {}".format(test_config["test_number"], cls.__name__))
760 # def tearDownClass(cls):
761 # test_config["test_number"] += 1
763 # def tearDown(self):
764 # exec_info = sys.exc_info()
765 # if exec_info == (None, None, None):
766 # logger.info(self.__class__.test_text+" -> TEST OK")
768 # logger.warning(self.__class__.test_text+" -> TEST NOK")
769 # logger.critical("Traceback error",exc_info=True)
771 def test_000_get_flavor(self
):
772 test_directory_content
= os
.listdir(test_config
["test_directory"])
774 for dir_name
in test_directory_content
:
775 if dir_name
== 'simple_linux':
776 self
.__class
__.scenario_test_path
= test_config
["test_directory"] + '/'+ dir_name
777 vnfd_files
= glob
.glob(self
.__class
__.scenario_test_path
+'/vnfd_*.yaml')
780 for vnfd
in vnfd_files
:
781 with
open(vnfd
, 'r') as stream
:
782 vnf_descriptor
= yaml
.load(stream
)
784 vnfc_list
= vnf_descriptor
['vnf']['VNFC']
785 for item
in vnfc_list
:
786 if 'ram' in item
and 'vcpus' in item
and 'disk' in item
:
788 vcpus
= item
['vcpus']
791 flavor_data
= {'ram': ram
,
796 self
.__class
__.test_text
= "{}.{}. TEST {}".format(test_config
["test_number"],
797 self
.__class
__.test_index
,
798 inspect
.currentframe().f_code
.co_name
)
799 self
.__class
__.test_index
+= 1
801 flavor_id
= test_config
["vim_conn"].new_flavor(flavor_data
)
803 result
= test_config
["vim_conn"].get_flavor(flavor_id
)
804 self
.assertEqual(ram
, result
['ram'])
805 self
.assertEqual(vcpus
, result
['vcpus'])
806 self
.assertEqual(disk
, result
['disk'])
809 result
= test_config
["vim_conn"].delete_flavor(flavor_id
)
811 logger
.info("Flavor id {} sucessfully deleted".format(result
))
813 logger
.info("Failed to delete flavor id {}".format(result
))
def test_010_get_flavor_negative(self):
    """Fetching a flavor by a random UUID must raise with http_code 404."""
    missing_flavor_id = str(uuid.uuid4())

    klass = self.__class__
    current_name = inspect.currentframe().f_code.co_name
    klass.test_text = "{}.{}. TEST {}".format(test_config["test_number"],
                                              klass.test_index,
                                              current_name)
    klass.test_index += 1

    vim_conn = test_config["vim_conn"]
    with self.assertRaises(Exception) as context:
        vim_conn.get_flavor(missing_flavor_id)

    raised = context.exception
    self.assertEqual(raised.http_code, 404)
831 The following unittest class does not have the 'test_' prefix on purpose. This test is the one used for the
832 scenario-based tests.
834 class descriptor_based_scenario_test(test_base
):
836 scenario_test_path
= None
838 instance_scenario_uuid
= None
844 cls
.to_delete_list
= []
845 cls
.scenario_test_path
= test_config
["test_directory"] + '/' + test_config
["test_folder"]
846 logger
.info("{}. {} {}".format(test_config
["test_number"], cls
.__name
__, test_config
["test_folder"]))
def tearDownClass(cls):
    '''Increment the shared 'test_number' counter kept in test_config.'''
    # test_number is the leading index of the log lines and test texts built by this class
    test_config["test_number"] += 1
def test_000_load_scenario(self):
    '''Load the VNF and scenario descriptors of the current test folder into openmano.

    Every 'vnfd_*.yaml' file found at scenario_test_path is patched so that all its VNFCs
    (and their disk devices that already declare an 'image name') use the image given in
    test_config["image_name"], and it is then created through the openmano client.
    Afterwards the single 'scenario_*.yaml' file is loaded, its 'mgmt' network is renamed
    to test_config["mgmt_net"] and the scenario is created.

    Each created item is inserted at the front of to_delete_list together with the client
    function and parameters needed to delete it, so the list ends up in reverse creation order.
    The uuid of the created scenario is stored in the class attribute scenario_uuid.

    :raises Exception: if the test folder does not contain at least one vnfd file and
        exactly one scenario file
    '''
    test_class = self.__class__
    test_class.test_text = "{}.{}. TEST {} {}".format(test_config["test_number"], test_class.test_index,
                                                      inspect.currentframe().f_code.co_name,
                                                      test_config["test_folder"])
    test_class.test_index += 1
    vnfd_files = glob.glob(test_class.scenario_test_path + '/vnfd_*.yaml')
    scenario_file = glob.glob(test_class.scenario_test_path + '/scenario_*.yaml')
    # Exactly one scenario file is required. Accepting zero files would only fail later with an
    # IndexError on scenario_file[0], once the VNFs have already been created at the server.
    if not vnfd_files or len(scenario_file) != 1:
        raise Exception("Test '{}' not valid. It must contain an scenario file and at least one vnfd file'".format(
            test_config["test_folder"]))

    # load all vnfd
    for vnfd in vnfd_files:
        # NOTE(review): yaml.load without an explicit Loader can build arbitrary python objects;
        # consider yaml.safe_load if the descriptors never need custom tags.
        with open(vnfd, 'r') as stream:
            vnf_descriptor = yaml.load(stream)

        # force the image under test on every VNFC and on the disk devices that declare an image
        for vnfc in vnf_descriptor['vnf']['VNFC']:
            vnfc['image name'] = test_config["image_name"]
            for device in vnfc.get('devices', []):
                if device['type'] == 'disk' and 'image name' in device:
                    device['image name'] = test_config["image_name"]

        logger.debug("VNF descriptor: {}".format(vnf_descriptor))
        vnf = test_config["client"].create_vnf(descriptor=vnf_descriptor)
        logger.debug(vnf)
        test_class.to_delete_list.insert(0, {"item": "vnf", "function": test_config["client"].delete_vnf,
                                             "params": {"uuid": vnf['vnf']['uuid']}})

    # load the scenario definition
    with open(scenario_file[0], 'r') as stream:
        scenario_descriptor = yaml.load(stream)
    # the descriptor names its management network 'mgmt'; rename it to the one requested for the test
    networks = scenario_descriptor['scenario']['networks']
    networks[test_config["mgmt_net"]] = networks.pop('mgmt')
    logger.debug("Scenario descriptor: {}".format(scenario_descriptor))
    scenario = test_config["client"].create_scenario(descriptor=scenario_descriptor)
    logger.debug(scenario)
    test_class.to_delete_list.insert(0, {"item": "scenario", "function": test_config["client"].delete_scenario,
                                         "params": {"uuid": scenario['scenario']['uuid']}})
    test_class.scenario_uuid = scenario['scenario']['uuid']
def test_010_instantiate_scenario(self):
    '''Create an instance of the scenario whose uuid is stored in scenario_uuid.

    The instance is named with the test text. Its uuid is stored in the class attribute
    instance_scenario_uuid, and an entry able to delete the instance is inserted at the
    front of to_delete_list.
    '''
    klass = self.__class__
    klass.test_text = "{}.{}. TEST {} {}".format(
        test_config["test_number"],
        klass.test_index,
        inspect.currentframe().f_code.co_name,
        test_config["test_folder"])
    klass.test_index += 1

    created = test_config["client"].create_instance(scenario_id=klass.scenario_uuid, name=klass.test_text)
    klass.instance_scenario_uuid = created['uuid']
    logger.debug(created)

    # registered first in the list so that it is found before the scenario and the vnfs
    cleanup_entry = {
        "item": "instance",
        "function": test_config["client"].delete_instance,
        "params": {"uuid": created['uuid']},
    }
    klass.to_delete_list.insert(0, cleanup_entry)
907 def test_020_check_deployent(self
):
908 self
.__class
__.test_text
= "{}.{}. TEST {} {}".format(test_config
["test_number"], self
.__class
__.test_index
,
909 inspect
.currentframe().f_code
.co_name
,
910 test_config
["test_folder"])
911 self
.__class
__.test_index
+= 1
913 if test_config
["manual"]:
914 raw_input('Scenario has been deployed. Perform manual check and press any key to resume')
917 keep_waiting
= test_config
["timeout"]
918 instance_active
= False
920 result
= check_instance_scenario_active(self
.__class
__.instance_scenario_uuid
)
923 elif 'ERROR' in result
[1]:
924 msg
= 'Got error while waiting for the instance to get active: '+result
[1]
928 if keep_waiting
>= 5:
931 elif keep_waiting
> 0:
932 time
.sleep(keep_waiting
)
935 msg
= 'Timeout reached while waiting instance scenario to get active'
939 def test_030_clean_deployment(self
):
940 self
.__class
__.test_text
= "{}.{}. TEST {} {}".format(test_config
["test_number"], self
.__class
__.test_index
,
941 inspect
.currentframe().f_code
.co_name
,
942 test_config
["test_folder"])
943 self
.__class
__.test_index
+= 1
# At the moment, if a scenario is deleted right after being created, its ports sometimes get
# orphaned in openstack datacenters. This sleep is just a dirty workaround
947 for item
in self
.__class
__.to_delete_list
:
948 response
= item
["function"](**item
["params"])
949 logger
.debug(response
)
952 def _get_random_string(maxLength
):
953 '''generates a string with random characters string.letters and string.digits
954 with a random length up to maxLength characters. If maxLength is <15 it will be changed automatically to 15
958 minLength
= min_string
- len(prefix
)
959 if maxLength
< min_string
: maxLength
= min_string
960 maxLength
-= len(prefix
)
961 length
= random
.randint(minLength
,maxLength
)
962 return 'testing_'+"".join([random
.choice(string
.letters
+string
.digits
) for i
in xrange(length
)])
965 def test_vimconnector(args
):
967 sys
.path
.append(os
.path
.dirname(os
.path
.dirname(os
.path
.abspath(__file__
))) + "/osm_ro")
968 test_config
['vimtype'] = args
.vimtype
969 if args
.vimtype
== "vmware":
970 import vimconn_vmware
as vim
972 test_config
["test_directory"] = os
.path
.dirname(__file__
) + "/RO_tests"
974 tenant_name
= args
.tenant_name
975 test_config
['tenant'] = tenant_name
976 config_params
= json
.loads(args
.config_param
)
977 org_name
= config_params
.get('orgname')
978 org_user
= config_params
.get('user')
979 org_passwd
= config_params
.get('passwd')
980 vim_url
= args
.endpoint_url
982 # vmware connector obj
983 test_config
['vim_conn'] = vim
.vimconnector(name
=org_name
, tenant_name
=tenant_name
, user
=org_user
,passwd
=org_passwd
, url
=vim_url
, config
=config_params
)
985 elif args
.vimtype
== "aws":
986 import vimconn_aws
as vim
987 elif args
.vimtype
== "openstack":
988 import vimconn_openstack
as vim
989 elif args
.vimtype
== "openvim":
990 import vimconn_openvim
as vim
992 logger
.critical("vimtype '{}' not supported".format(args
.vimtype
))
996 clsmembers
= inspect
.getmembers(sys
.modules
[__name__
], inspect
.isclass
)
997 # If only want to obtain a tests list print it and exit
1000 for cls
in clsmembers
:
1001 if cls
[0].startswith('test_vimconnector'):
1002 tests_names
.append(cls
[0])
1004 msg
= "The 'vim' set tests are:\n\t" + ', '.join(sorted(tests_names
))
1009 # Create the list of tests to be run
1010 code_based_tests
= []
1012 for test
in args
.tests
:
1013 for t
in test
.split(','):
1014 matches_code_based_tests
= [item
for item
in clsmembers
if item
[0] == t
]
1015 if len(matches_code_based_tests
) > 0:
1016 code_based_tests
.append(matches_code_based_tests
[0][1])
1018 logger
.critical("Test '{}' is not among the possible ones".format(t
))
1020 if not code_based_tests
:
1022 for cls
in clsmembers
:
# By default, run every class whose name starts with 'test_vimconnector'
1024 if cls
[0].startswith('test_vimconnector'):
1025 code_based_tests
.append(cls
[1])
1027 logger
.debug("tests to be executed: {}".format(code_based_tests
))
1029 # TextTestRunner stream is set to /dev/null in order to avoid the method to directly print the result of tests.
1030 # This is handled in the tests using logging.
1031 stream
= open('/dev/null', 'w')
1033 # Run code based tests
1034 basic_tests_suite
= unittest
.TestSuite()
1035 for test
in code_based_tests
:
1036 basic_tests_suite
.addTest(unittest
.makeSuite(test
))
1037 result
= unittest
.TextTestRunner(stream
=stream
, failfast
=failfast
).run(basic_tests_suite
)
1038 executed
+= result
.testsRun
1039 failed
+= len(result
.failures
) + len(result
.errors
)
1040 if failfast
and failed
:
1042 if len(result
.failures
) > 0:
1043 logger
.debug("failures : {}".format(result
.failures
))
1044 if len(result
.errors
) > 0:
1045 logger
.debug("errors : {}".format(result
.errors
))
1046 return executed
, failed
1051 sys
.path
.append(os
.path
.dirname(os
.path
.dirname(os
.path
.abspath(__file__
))) + "/osm_ro")
1052 import openmanoclient
1055 test_config
["client"] = openmanoclient
.openmanoclient(
1056 endpoint_url
=args
.endpoint_url
,
1057 tenant_name
=args
.tenant_name
,
1058 datacenter_name
=args
.datacenter
,
1059 debug
=args
.debug
, logger
=test_config
["logger_name"])
1060 clsmembers
= inspect
.getmembers(sys
.modules
[__name__
], inspect
.isclass
)
1061 # If only want to obtain a tests list print it and exit
1064 for cls
in clsmembers
:
1065 if cls
[0].startswith('test_VIM'):
1066 tests_names
.append(cls
[0])
1068 msg
= "The 'vim' set tests are:\n\t" + ', '.join(sorted(tests_names
)) +\
1069 "\nNOTE: The test test_VIM_tenant_operations will fail in case the used datacenter is type OpenStack " \
1070 "unless RO has access to the admin endpoint. Therefore this test is excluded by default"
1075 # Create the list of tests to be run
1076 code_based_tests
= []
1078 for test
in args
.tests
:
1079 for t
in test
.split(','):
1080 matches_code_based_tests
= [item
for item
in clsmembers
if item
[0] == t
]
1081 if len(matches_code_based_tests
) > 0:
1082 code_based_tests
.append(matches_code_based_tests
[0][1])
1084 logger
.critical("Test '{}' is not among the possible ones".format(t
))
1086 if not code_based_tests
:
1088 for cls
in clsmembers
:
1089 # We exclude 'test_VIM_tenant_operations' unless it is specifically requested by the user
1090 if cls
[0].startswith('test_VIM') and cls
[0] != 'test_VIM_tenant_operations':
1091 code_based_tests
.append(cls
[1])
1093 logger
.debug("tests to be executed: {}".format(code_based_tests
))
1095 # TextTestRunner stream is set to /dev/null in order to avoid the method to directly print the result of tests.
1096 # This is handled in the tests using logging.
1097 stream
= open('/dev/null', 'w')
1099 # Run code based tests
1100 basic_tests_suite
= unittest
.TestSuite()
1101 for test
in code_based_tests
:
1102 basic_tests_suite
.addTest(unittest
.makeSuite(test
))
1103 result
= unittest
.TextTestRunner(stream
=stream
, failfast
=failfast
).run(basic_tests_suite
)
1104 executed
+= result
.testsRun
1105 failed
+= len(result
.failures
) + len(result
.errors
)
1106 if failfast
and failed
:
1108 if len(result
.failures
) > 0:
1109 logger
.debug("failures : {}".format(result
.failures
))
1110 if len(result
.errors
) > 0:
1111 logger
.debug("errors : {}".format(result
.errors
))
1112 return executed
, failed
1115 def test_deploy(args
):
1117 sys
.path
.append(os
.path
.dirname(os
.path
.dirname(os
.path
.abspath(__file__
))) + "/osm_ro")
1118 import openmanoclient
1121 test_config
["test_directory"] = os
.path
.dirname(__file__
) + "/RO_tests"
1122 test_config
["image_name"] = args
.image_name
1123 test_config
["mgmt_net"] = args
.mgmt_net
1124 test_config
["manual"] = args
.manual
1125 test_directory_content
= os
.listdir(test_config
["test_directory"])
1126 # If only want to obtain a tests list print it and exit
1128 msg
= "the 'deploy' set tests are:\n\t" + ', '.join(sorted(test_directory_content
))
1133 descriptor_based_tests
= []
1134 # Create the list of tests to be run
1135 code_based_tests
= []
1137 for test
in args
.tests
:
1138 for t
in test
.split(','):
1139 if t
in test_directory_content
:
1140 descriptor_based_tests
.append(t
)
1142 logger
.critical("Test '{}' is not among the possible ones".format(t
))
1144 if not descriptor_based_tests
:
1146 descriptor_based_tests
= test_directory_content
1148 logger
.debug("tests to be executed: {}".format(code_based_tests
))
# Create the openmano client (stored at test_config["client"]) used by the scenario based tests
1151 test_config
["client"] = openmanoclient
.openmanoclient(
1152 endpoint_url
=args
.endpoint_url
,
1153 tenant_name
=args
.tenant_name
,
1154 datacenter_name
=args
.datacenter
,
1155 debug
=args
.debug
, logger
=test_config
["logger_name"])
1157 # TextTestRunner stream is set to /dev/null in order to avoid the method to directly print the result of tests.
1158 # This is handled in the tests using logging.
1159 stream
= open('/dev/null', 'w')
# The scenario based tests are defined as directories inside the directory set at test_config["test_directory"]
1161 for test
in descriptor_based_tests
:
1162 test_config
["test_folder"] = test
1163 test_suite
= unittest
.TestSuite()
1164 test_suite
.addTest(unittest
.makeSuite(descriptor_based_scenario_test
))
1165 result
= unittest
.TextTestRunner(stream
=stream
, failfast
=False).run(test_suite
)
1166 executed
+= result
.testsRun
1167 failed
+= len(result
.failures
) + len(result
.errors
)
1168 if failfast
and failed
:
1170 if len(result
.failures
) > 0:
1171 logger
.debug("failures : {}".format(result
.failures
))
1172 if len(result
.errors
) > 0:
1173 logger
.debug("errors : {}".format(result
.errors
))
1175 return executed
, failed
1177 if __name__
=="__main__":
1179 parser
= ArgumentParser(description
='Test RO module')
1180 parser
.add_argument('-v','--version', action
='version', help="Show current version",
1181 version
='%(prog)s version ' + __version__
+ ' ' + version_date
)
1184 parent_parser
= ArgumentParser(add_help
=False)
1185 parent_parser
.add_argument('--failfast', help='Stop when a test fails rather than execute all tests',
1186 dest
='failfast', action
="store_true", default
=False)
1187 parent_parser
.add_argument('--failed', help='Set logs to show only failed tests. --debug disables this option',
1188 dest
='failed', action
="store_true", default
=False)
1189 default_logger_file
= os
.path
.dirname(__file__
)+'/'+os
.path
.splitext(os
.path
.basename(__file__
))[0]+'.log'
1190 parent_parser
.add_argument('--list-tests', help='List all available tests', dest
='list_tests', action
="store_true",
1192 parent_parser
.add_argument('--logger_file', dest
='logger_file', default
=default_logger_file
,
1193 help='Set the logger file. By default '+default_logger_file
)
1194 parent_parser
.add_argument("-t", '--tenant', dest
='tenant_name', default
="osm",
1195 help="Set the openmano tenant to use for the test. By default 'osm'")
1196 parent_parser
.add_argument('--debug', help='Set logs to debug level', dest
='debug', action
="store_true")
1197 parent_parser
.add_argument('--timeout', help='Specify the instantiation timeout in seconds. By default 300',
1198 dest
='timeout', type=int, default
=300)
1199 parent_parser
.add_argument('--test', '--tests', help='Specify the tests to run', dest
='tests', action
="append")
1201 subparsers
= parser
.add_subparsers(help='test sets')
1203 # Deployment test set
1204 # -------------------
1205 deploy_parser
= subparsers
.add_parser('deploy', parents
=[parent_parser
],
1206 help="test deployment using descriptors at RO_test folder ")
1207 deploy_parser
.set_defaults(func
=test_deploy
)
1209 # Mandatory arguments
1210 mandatory_arguments
= deploy_parser
.add_argument_group('mandatory arguments')
1211 mandatory_arguments
.add_argument('-d', '--datacenter', required
=True, help='Set the datacenter to test')
1212 mandatory_arguments
.add_argument("-i", '--image-name', required
=True, dest
="image_name",
1213 help='Image name available at datacenter used for the tests')
1214 mandatory_arguments
.add_argument("-n", '--mgmt-net-name', required
=True, dest
='mgmt_net',
1215 help='Set the vim management network to use for tests')
1217 # Optional arguments
1218 deploy_parser
.add_argument('-m', '--manual-check', dest
='manual', action
="store_true", default
=False,
1219 help='Pause execution once deployed to allow manual checking of the '
1220 'deployed instance scenario')
1221 deploy_parser
.add_argument('-u', '--url', dest
='endpoint_url', default
='http://localhost:9090/openmano',
1222 help="Set the openmano server url. By default 'http://localhost:9090/openmano'")
1225 # -------------------
1226 vimconn_parser
= subparsers
.add_parser('vimconn', parents
=[parent_parser
], help="test vimconnector plugin")
1227 vimconn_parser
.set_defaults(func
=test_vimconnector
)
1228 # Mandatory arguments
1229 mandatory_arguments
= vimconn_parser
.add_argument_group('mandatory arguments')
1230 mandatory_arguments
.add_argument('--vimtype', choices
=['vmware', 'aws', 'openstack', 'openvim'], required
=True,
1231 help='Set the vimconnector type to test')
1232 mandatory_arguments
.add_argument('-c', '--config', dest
='config_param', required
=True,
1233 help='Set the vimconnector specific config parameters in dictionary format')
1234 mandatory_arguments
.add_argument('-u', '--url', dest
='endpoint_url',required
=True, help="Set the vim connector url or Host IP")
1235 # Optional arguments
1236 # TODO add optional arguments for vimconn tests
1237 # vimconn_parser.add_argument("-i", '--image-name', dest='image_name', help='<HELP>'))
1239 # Datacenter test set
1240 # -------------------
1241 vimconn_parser
= subparsers
.add_parser('vim', parents
=[parent_parser
], help="test vim")
1242 vimconn_parser
.set_defaults(func
=test_vim
)
1244 # Mandatory arguments
1245 mandatory_arguments
= vimconn_parser
.add_argument_group('mandatory arguments')
1246 mandatory_arguments
.add_argument('-d', '--datacenter', required
=True, help='Set the datacenter to test')
1248 # Optional arguments
1249 vimconn_parser
.add_argument('-u', '--url', dest
='endpoint_url', default
='http://localhost:9090/openmano',
1250 help="Set the openmano server url. By default 'http://localhost:9090/openmano'")
1252 argcomplete
.autocomplete(parser
)
1253 args
= parser
.parse_args()
# Default logger level is INFO. Options --debug and --failed override it; --debug takes priority
1258 logger_level
= 'INFO'
1260 logger_level
= 'DEBUG'
1262 logger_level
= 'WARNING'
1263 logger_name
= os
.path
.basename(__file__
)
1264 test_config
["logger_name"] = logger_name
1265 logger
= logging
.getLogger(logger_name
)
1266 logger
.setLevel(logger_level
)
1267 failfast
= args
.failfast
1269 # Configure a logging handler to store in a logging file
1270 if args
.logger_file
:
1271 fileHandler
= logging
.FileHandler(args
.logger_file
)
1272 formatter_fileHandler
= logging
.Formatter('%(asctime)s %(name)s %(levelname)s: %(message)s')
1273 fileHandler
.setFormatter(formatter_fileHandler
)
1274 logger
.addHandler(fileHandler
)
1276 # Configure a handler to print to stdout
1277 consoleHandler
= logging
.StreamHandler(sys
.stdout
)
1278 formatter_consoleHandler
= logging
.Formatter('%(asctime)s %(name)s %(levelname)s: %(message)s')
1279 consoleHandler
.setFormatter(formatter_consoleHandler
)
1280 logger
.addHandler(consoleHandler
)
1282 logger
.debug('Program started with the following arguments: ' + str(args
))
1284 # set test config parameters
1285 test_config
["timeout"] = args
.timeout
1286 test_config
["test_number"] = 1
1288 executed
, failed
= args
.func(args
)
1291 logger
.warning("Total number of tests: {}; Total number of failures/errors: {}".format(executed
, failed
))
1292 sys
.exit(1 if failed
else 0)