def _format_exception(self, exception):
'''Transform a keystone, nova, neutron exception into a vimconn exception'''
- if isinstance(exception, (HTTPException, gl1Exceptions.HTTPException, gl1Exceptions.CommunicationError,
- ConnectionError, ksExceptions.ConnectionError, neExceptions.ConnectionFailed
- )):
- raise vimconn.vimconnConnectionException(type(exception).__name__ + ": " + str(exception))
- elif isinstance(exception, (neExceptions.NetworkNotFoundClient, nvExceptions.NotFound)):
+ if isinstance(exception, (neExceptions.NetworkNotFoundClient, nvExceptions.NotFound, ksExceptions.NotFound, gl1Exceptions.HTTPNotFound)):
raise vimconn.vimconnNotFoundException(type(exception).__name__ + ": " + str(exception))
- elif isinstance(exception, (KeyError, nvExceptions.BadRequest)):
+ elif isinstance(exception, (HTTPException, gl1Exceptions.HTTPException, gl1Exceptions.CommunicationError,
+ ConnectionError, ksExceptions.ConnectionError, neExceptions.ConnectionFailed)):
+ raise vimconn.vimconnConnectionException(type(exception).__name__ + ": " + str(exception))
+ elif isinstance(exception, (KeyError, nvExceptions.BadRequest, ksExceptions.BadRequest)):
raise vimconn.vimconnException(type(exception).__name__ + ": " + str(exception))
elif isinstance(exception, (nvExceptions.ClientException, ksExceptions.ClientException,
neExceptions.NeutronException)):
else:
project = self.keystone.tenants.create(tenant_name, tenant_description)
return project.id
- except (ksExceptions.ConnectionError, ksExceptions.ClientException, ConnectionError) as e:
+ except (ksExceptions.ConnectionError, ksExceptions.ClientException, ksExceptions.BadRequest, ConnectionError) as e:
self._format_exception(e)
def delete_tenant(self, tenant_id):
else:
self.keystone.tenants.delete(tenant_id)
return tenant_id
- except (ksExceptions.ConnectionError, ksExceptions.ClientException, ConnectionError) as e:
+ except (ksExceptions.ConnectionError, ksExceptions.ClientException, ksExceptions.NotFound, ConnectionError) as e:
self._format_exception(e)
def new_network(self,net_name, net_type, ip_profile=None, shared=False, vlan=None):
else:
disk_format="raw"
self.logger.debug("new_image: '%s' loading from '%s'", image_dict['name'], image_dict['location'])
- new_image = self.glance.images.create(name=image_dict['name'])
+ if self.vim_type == "VIO":
+ container_format = "bare"
+ if 'container_format' in image_dict:
+ container_format = image_dict['container_format']
+ new_image = self.glance.images.create(name=image_dict['name'], container_format=container_format,
+ disk_format=disk_format)
+ else:
+ new_image = self.glance.images.create(name=image_dict['name'])
if image_dict['location'].startswith("http"):
# TODO there is not a method to direct download. It must be downloaded locally with requests
raise vimconn.vimconnNotImplemented("Cannot create image from URL")
#new_image = self.glancev1.images.create(name=image_dict['name'], is_public=image_dict.get('public',"yes")=="yes",
# container_format="bare", data=fimage, disk_format=disk_format)
metadata_to_load = image_dict.get('metadata')
- #TODO location is a reserved word for current openstack versions. Use another word
- metadata_to_load['location'] = image_dict['location']
+ # TODO location is a reserved word for current openstack versions. fixed for VIO please check for openstack
+ if self.vim_type == "VIO":
+ metadata_to_load['upload_location'] = image_dict['location']
+ else:
+ metadata_to_load['location'] = image_dict['location']
self.glance.images.update(new_image.id, **metadata_to_load)
return new_image.id
except (nvExceptions.Conflict, ksExceptions.ClientException, nvExceptions.ClientException) as e:
self._reload_connection()
self.glance.images.delete(image_id)
return image_id
- except (nvExceptions.NotFound, ksExceptions.ClientException, nvExceptions.ClientException, gl1Exceptions.CommunicationError, ConnectionError) as e: #TODO remove
+ except (nvExceptions.NotFound, ksExceptions.ClientException, nvExceptions.ClientException, gl1Exceptions.CommunicationError, gl1Exceptions.HTTPNotFound, ConnectionError) as e: #TODO remove
self._format_exception(e)
def get_image_id_from_path(self, path):
image_path = test_config['image_path']
if image_path:
- self.__class__.image_id = test_config["vim_conn"].new_image({ 'name': 'TestImage', 'location' : image_path })
+ self.__class__.image_id = test_config["vim_conn"].new_image({ 'name': 'TestImage', 'location' : image_path, 'metadata': {'upload_location':None} })
time.sleep(20)
- self.assertEqual(type(self.__class__.image_id),str)
- self.assertIsInstance(uuid.UUID(self.__class__.image_id),uuid.UUID)
+
+ self.assertIsInstance(self.__class__.image_id, (str, unicode))
+ self.assertIsInstance(uuid.UUID(self.__class__.image_id), uuid.UUID)
else:
self.skipTest("Skipping test as image file not present at RO container")
self.__class__.test_index += 1
with self.assertRaises(Exception) as context:
- test_config["vim_conn"].new_image({ 'name': 'TestImage', 'location' : Non_exist_image_path })
+ test_config["vim_conn"].new_image({ 'name': 'TestImage', 'location' : Non_exist_image_path})
self.assertEqual((context.exception).http_code, 400)
self.__class__.test_index += 1
image_id = test_config["vim_conn"].delete_image(self.__class__.image_id)
- self.assertEqual(type(image_id),str)
+
+ self.assertIsInstance(image_id, (str, unicode))
def test_030_delete_image_negative(self):
Non_exist_image_id = str(uuid.uuid4())
if 'name' in item:
self.__class__.image_name = item['name']
self.__class__.image_id = item['id']
- self.assertEqual(type(self.__class__.image_name),str)
- self.assertEqual(type(self.__class__.image_id),str)
+ self.assertIsInstance(self.__class__.image_name, (str, unicode))
+ self.assertIsInstance(self.__class__.image_id, (str, unicode))
def test_010_get_image_list_by_name(self):
self.__class__.test_text = "{}.{}. TEST {}".format(test_config["test_number"],
image_list = test_config["vim_conn"].get_image_list({'name': self.__class__.image_name})
for item in image_list:
- self.assertEqual(type(item['id']), str)
+ self.assertIsInstance(item['id'], (str, unicode))
+ self.assertIsInstance(item['name'], (str, unicode))
self.assertEqual(item['id'], self.__class__.image_id)
- self.assertEqual(type(item['name']), str)
self.assertEqual(item['name'], self.__class__.image_name)
def test_020_get_image_list_by_id(self):
filter_image_list = test_config["vim_conn"].get_image_list({'id': self.__class__.image_id})
for item1 in filter_image_list:
- self.assertEqual(type(item1.get('id')), str)
- self.assertEqual(item1.get('id'), self.__class__.image_id)
- self.assertEqual(type(item1.get('name')), str)
- self.assertEqual(item1.get('name'), self.__class__.image_name)
+ self.assertIsInstance(item1['id'], (str, unicode))
+ self.assertIsInstance(item1['name'], (str, unicode))
+ self.assertEqual(item1['id'], self.__class__.image_id)
+ self.assertEqual(item1['name'], self.__class__.image_name)
def test_030_get_image_list_negative(self):
Non_exist_image_id = uuid.uuid4()
for item in tenant_list:
if test_config['tenant'] == item['name']:
self.__class__.tenant_id = item['id']
- self.assertEqual(type(item['name']), str)
- self.assertEqual(type(item['id']), str)
+ self.assertIsInstance(item['name'], (str, unicode))
+ self.assertIsInstance(item['id'], (str, unicode))
def test_010_get_tenant_list_by_id(self):
self.__class__.test_text = "{}.{}. TEST {}".format(test_config["test_number"],
filter_tenant_list = test_config["vim_conn"].get_tenant_list({'id': self.__class__.tenant_id})
for item in filter_tenant_list:
- self.assertEqual(type(item['id']), str)
+ self.assertIsInstance(item['id'], (str, unicode))
self.assertEqual(item['id'], self.__class__.tenant_id)
def test_020_get_tenant_list_by_name(self):
filter_tenant_list = test_config["vim_conn"].get_tenant_list({'name': test_config['tenant']})
for item in filter_tenant_list:
- self.assertEqual(type(item['name']), str)
+ self.assertIsInstance(item['name'], (str, unicode))
self.assertEqual(item['name'], test_config['tenant'])
def test_030_get_tenant_list_by_name_and_id(self):
'id': self.__class__.tenant_id})
for item in filter_tenant_list:
- self.assertEqual(type(item['name']), str)
- self.assertEqual(type(item['id']), str)
+ self.assertIsInstance(item['name'], (str, unicode))
+ self.assertIsInstance(item['id'], (str, unicode))
self.assertEqual(item['name'], test_config['tenant'])
self.assertEqual(item['id'], self.__class__.tenant_id)
self.assertEqual(filter_tenant_list, [])
+
class test_vimconn_new_tenant(test_base):
tenant_id = None
inspect.currentframe().f_code.co_name)
self.__class__.test_index += 1
- self.__class__.tenant_id = test_config["vim_conn"].new_tenant(tenant_name)
+ self.__class__.tenant_id = test_config["vim_conn"].new_tenant(tenant_name, "")
time.sleep(15)
- self.assertEqual(type(self.__class__.tenant_id), str)
+ self.assertIsInstance(self.__class__.tenant_id, (str, unicode))
+
def test_010_new_tenant_negative(self):
Invalid_tenant_name = 10121
self.__class__.test_index += 1
with self.assertRaises(Exception) as context:
- test_config["vim_conn"].new_tenant(Invalid_tenant_name)
+ test_config["vim_conn"].new_tenant(Invalid_tenant_name, "")
self.assertEqual((context.exception).http_code, 400)
+
def test_020_delete_tenant(self):
self.__class__.test_text = "{}.{}. TEST {}".format(test_config["test_number"],
self.__class__.test_index,
self.__class__.test_index += 1
tenant_id = test_config["vim_conn"].delete_tenant(self.__class__.tenant_id)
- self.assertEqual(type(tenant_id), str)
+
+ self.assertIsInstance(tenant_id, (str, unicode))
def test_030_delete_tenant_negative(self):
Non_exist_tenant_name = 'Test_30_tenant'