}
# test without authorization
test_not_authorized_list = (
- ("Invalid token", "GET", "/admin/v1/users", headers_json, None, 401, r_header_json, "json"),
- ("Invalid URL", "POST", "/admin/v1/nonexist", headers_yaml, None, 405, r_header_yaml, "yaml"),
- ("Invalid version", "DELETE", "/admin/v2/users", headers_yaml, None, 405, r_header_yaml, "yaml"),
+ ("NA1", "Invalid token", "GET", "/admin/v1/users", headers_json, None, 401, r_header_json, "json"),
+ ("NA2", "Invalid URL", "POST", "/admin/v1/nonexist", headers_yaml, None, 405, r_header_yaml, "yaml"),
+ ("NA3", "Invalid version", "DELETE", "/admin/v2/users", headers_yaml, None, 405, r_header_yaml, "yaml"),
)
# test ones authorized
test_authorized_list = (
- ("Invalid vnfd id", "GET", "/vnfpkgm/v1/vnf_packages/non-existing-id", headers_json, None, 404, r_header_json, "json"),
- ("Invalid nsd id", "GET", "/nsd/v1/ns_descriptors/non-existing-id", headers_yaml, None, 404, r_header_yaml, "yaml"),
- ("Invalid nsd id", "DELETE", "/nsd/v1/ns_descriptors_content/non-existing-id", headers_yaml, None, 404, r_header_yaml, "yaml"),
+ ("AU1", "Invalid vnfd id", "GET", "/vnfpkgm/v1/vnf_packages/non-existing-id", headers_json, None, 404, r_header_json, "json"),
+ ("AU2","Invalid nsd id", "GET", "/nsd/v1/ns_descriptors/non-existing-id", headers_yaml, None, 404, r_header_yaml, "yaml"),
+ ("AU3","Invalid nsd id", "DELETE", "/nsd/v1/ns_descriptors_content/non-existing-id", headers_yaml, None, 404, r_header_yaml, "yaml"),
+)
+
+vim = {
+ "schema_version": "1.0",
+ "schema_type": "No idea",
+ "name": "myVim",
+ "description": "Descriptor name",
+ "vim_type": "openstack",
+ "vim_url": "http://localhost:/vim",
+ "vim_tenant_name": "vimTenant",
+ "vim_user": "user",
+ "vim_password": "password",
+ "config": {"config_param": 1}
+}
+
+vim_bad = vim.copy()
+vim_bad.pop("name")
+
+test_admin_list1 = (
+ ("VIM1", "Create VIM", "POST", "/admin/v1/vim_accounts", headers_json, vim, (201, 204), {"Location": "/admin/v1/vim_accounts/", "Content-Type": "application/json"}, "json"),
+ ("VIM2", "Create VIM bad schema", "POST", "/admin/v1/vim_accounts", headers_json, vim_bad, 422, None, headers_json),
+ ("VIM2", "Create VIM name repeated", "POST", "/admin/v1/vim_accounts", headers_json, vim, 409, None, headers_json),
+ ("VIM4", "Show VIMs", "GET", "/admin/v1/vim_accounts", headers_yaml, None, 200, r_header_yaml, "yaml"),
+ ("VIM5", "Show VIM", "GET", "/admin/v1/vim_accounts/{VIM1}", headers_yaml, None, 200, r_header_yaml, "yaml"),
+ ("VIM6", "Delete VIM", "DELETE", "/admin/v1/vim_accounts/{VIM1}", headers_yaml, None, 202, None, 0),
)
class TestException(Exception):
self.s = requests.session()
self.s.headers = header_base
self.verify = verify
+ # contains ID of tests obtained from Location response header. "" key contains last obtained id
+ self.test_ids = {}
def set_header(self, header):
self.s.headers.update(header)
- def test(self, name, method, url, headers, payload, expected_codes, expected_headers, expected_payload):
+ def test(self, name, description, method, url, headers, payload, expected_codes, expected_headers, expected_payload):
"""
- Performs an http request and check http code response. Exit if different than allowed
- :param name: name of the test
+ Performs an http request and check http code response. Exit if different than allowed. It get the returned id
+ that can be used by following test in the URL with {name} where name is the name of the test
+ :param name: short name of the test
+ :param description: description of the test
:param method: HTTP method: GET,PUT,POST,DELETE,...
:param url: complete URL or relative URL
:param headers: request headers to add to the base headers
:param expected_codes: expected response codes, can be int, int tuple or int range
:param expected_headers: expected response headers, dict with key values
:param expected_payload: expected payload, 0 if empty, 'yaml', 'json', 'text', 'zip'
- :return:
+ :return: requests response
"""
try:
if not self.s:
self.s = requests.session()
+ # URL
if not url:
url = self.url_base
elif not url.startswith("http"):
url = self.url_base + url
+
+ var_start = url.find("{") + 1
+ while var_start:
+ var_end = url.find("}", var_start)
+ if var_end == -1:
+ break
+ var_name = url[var_start:var_end]
+ if var_name in self.test_ids:
+ url = url[:var_start-1] + self.test_ids[var_name] + url[var_end+1:]
+ var_start += len(self.test_ids[var_name])
+ var_start = url.find("{", var_start) + 1
if payload:
if isinstance(payload, str):
if payload.startswith("@"):
elif isinstance(payload, dict):
payload = json.dumps(payload)
- test = "Test {} {} {}".format(name, method, url)
+ test = "Test {} {} {} {}".format(name, description, method, url)
logger.warning(test)
stream = False
# if expected_payload == "zip":
if len(r.content) == 0:
raise TestException("Expected some response payload, but got empty")
#r.text
+ location = r.headers.get("Location")
+ if location:
+ _id = location[location.rfind("/") + 1:]
+ if _id:
+ self.test_ids[name] = str(_id)
+ self.test_ids[""] = str(_id) # last id
return r
except TestException as e:
logger.error("{} \nRX code{}: {}".format(e, r.status_code, r.text))
if __name__ == "__main__":
global logger
test = ""
+
+ # Disable warnings from self-signed certificates.
+ requests.packages.urllib3.disable_warnings()
try:
logging.basicConfig(format="%(levelname)s %(message)s", level=logging.ERROR)
logger = logging.getLogger('NBI')
test_rest.test(*t)
# get token
- r = test_rest.test("Obtain token", "POST", "/admin/v1/tokens", headers_json,
+ r = test_rest.test("token1", "Obtain token", "POST", "/admin/v1/tokens", headers_json,
{"username": user, "password": password, "project_id": project},
(200, 201), {"Content-Type": "application/json"}, "json")
response = r.json()
for t in test_authorized_list:
test_rest.test(*t)
+ # tests admin
+ for t in test_admin_list1:
+ test_rest.test(*t)
+
+
# nsd CREATE
- r = test_rest.test("Onboard NSD step 1", "POST", "/nsd/v1/ns_descriptors", headers_json, None,
+ r = test_rest.test("NSD1", "Onboard NSD step 1", "POST", "/nsd/v1/ns_descriptors", headers_json, None,
201, {"Location": "/nsd/v1/ns_descriptors/", "Content-Type": "application/json"}, "json")
location = r.headers["Location"]
nsd_id = location[location.rfind("/")+1:]
# print(location, nsd_id)
# nsd UPLOAD test
- r = test_rest.test("Onboard NSD step 2 as TEXT", "PUT", "/nsd/v1/ns_descriptors/{}/nsd_content".format(nsd_id),
+ r = test_rest.test("NSD2", "Onboard NSD step 2 as TEXT", "PUT", "/nsd/v1/ns_descriptors/{}/nsd_content".format(nsd_id),
r_header_text, "@./cirros_ns/cirros_nsd.yaml", 204, None, 0)
# nsd SHOW OSM format
- r = test_rest.test("Show NSD OSM format", "GET", "/nsd/v1/ns_descriptors_content/{}".format(nsd_id),
+ r = test_rest.test("NSD3", "Show NSD OSM format", "GET", "/nsd/v1/ns_descriptors_content/{}".format(nsd_id),
headers_json, None, 200, r_header_json, "json")
# nsd SHOW text
- r = test_rest.test("Show NSD SOL005 text", "GET", "/nsd/v1/ns_descriptors/{}/nsd_content".format(nsd_id),
+ r = test_rest.test("NSD4", "Show NSD SOL005 text", "GET", "/nsd/v1/ns_descriptors/{}/nsd_content".format(nsd_id),
headers_text, None, 200, r_header_text, "text")
# nsd UPLOAD ZIP
tar = tarfile.open("temp/cirros_ns.tar.gz", "w:gz")
tar.add("cirros_ns")
tar.close()
- r = test_rest.test("Onboard NSD step 3 replace with ZIP", "PUT", "/nsd/v1/ns_descriptors/{}/nsd_content".format(nsd_id),
+ r = test_rest.test("NSD5", "Onboard NSD step 3 replace with ZIP", "PUT", "/nsd/v1/ns_descriptors/{}/nsd_content".format(nsd_id),
r_header_zip, "@b./temp/cirros_ns.tar.gz", 204, None, 0)
# nsd SHOW OSM format
- r = test_rest.test("Show NSD OSM format", "GET", "/nsd/v1/ns_descriptors_content/{}".format(nsd_id),
+ r = test_rest.test("NSD6", "Show NSD OSM format", "GET", "/nsd/v1/ns_descriptors_content/{}".format(nsd_id),
headers_json, None, 200, r_header_json, "json")
# nsd SHOW zip
- r = test_rest.test("Show NSD SOL005 zip", "GET", "/nsd/v1/ns_descriptors/{}/nsd_content".format(nsd_id),
+ r = test_rest.test("NSD7", "Show NSD SOL005 zip", "GET", "/nsd/v1/ns_descriptors/{}/nsd_content".format(nsd_id),
headers_zip, None, 200, r_header_zip, "zip")
# nsd SHOW descriptor
- r = test_rest.test("Show NSD descriptor", "GET", "/nsd/v1/ns_descriptors/{}/nsd".format(nsd_id),
+ r = test_rest.test("NSD8", "Show NSD descriptor", "GET", "/nsd/v1/ns_descriptors/{}/nsd".format(nsd_id),
headers_text, None, 200, r_header_text, "text")
# nsd SHOW actifact
- r = test_rest.test("Show NSD artifact", "GET", "/nsd/v1/ns_descriptors/{}/artifacts/icons/osm_2x.png".format(nsd_id),
+ r = test_rest.test("NSD9", "Show NSD artifact", "GET", "/nsd/v1/ns_descriptors/{}/artifacts/icons/osm_2x.png".format(nsd_id),
headers_text, None, 200, r_header_octect, "text")
# nsd DELETE
- r = test_rest.test("Delete NSD SOL005 text", "DELETE", "/nsd/v1/ns_descriptors/{}".format(nsd_id),
+ r = test_rest.test("NSD10", "Delete NSD SOL005 text", "DELETE", "/nsd/v1/ns_descriptors/{}".format(nsd_id),
headers_yaml, None, 204, None, 0)
# vnfd CREATE
- r = test_rest.test("Onboard VNFD step 1", "POST", "/vnfpkgm/v1/vnf_packages", headers_json, None,
+ r = test_rest.test("VNFD1", "Onboard VNFD step 1", "POST", "/vnfpkgm/v1/vnf_packages", headers_json, None,
201, {"Location": "/vnfpkgm/v1/vnf_packages/", "Content-Type": "application/json"}, "json")
location = r.headers["Location"]
vnfd_id = location[location.rfind("/")+1:]
# print(location, vnfd_id)
# vnfd UPLOAD test
- r = test_rest.test("Onboard VNFD step 2 as TEXT", "PUT", "/vnfpkgm/v1/vnf_packages/{}/package_content".format(vnfd_id),
+ r = test_rest.test("VNFD2", "Onboard VNFD step 2 as TEXT", "PUT", "/vnfpkgm/v1/vnf_packages/{}/package_content".format(vnfd_id),
r_header_text, "@./cirros_vnf/cirros_vnfd.yaml", 204, None, 0)
# vnfd SHOW OSM format
- r = test_rest.test("Show VNFD OSM format", "GET", "/vnfpkgm/v1/vnf_packages_content/{}".format(vnfd_id),
+ r = test_rest.test("VNFD3", "Show VNFD OSM format", "GET", "/vnfpkgm/v1/vnf_packages_content/{}".format(vnfd_id),
headers_json, None, 200, r_header_json, "json")
# vnfd SHOW text
- r = test_rest.test("Show VNFD SOL005 text", "GET", "/vnfpkgm/v1/vnf_packages/{}/package_content".format(vnfd_id),
+ r = test_rest.test("VNFD4", "Show VNFD SOL005 text", "GET", "/vnfpkgm/v1/vnf_packages/{}/package_content".format(vnfd_id),
headers_text, None, 200, r_header_text, "text")
# vnfd UPLOAD ZIP
tar = tarfile.open("temp/cirros_vnf.tar.gz", "w:gz")
tar.add("cirros_vnf")
tar.close()
- r = test_rest.test("Onboard VNFD step 3 replace with ZIP", "PUT", "/vnfpkgm/v1/vnf_packages/{}/package_content".format(vnfd_id),
+ r = test_rest.test("VNFD5", "Onboard VNFD step 3 replace with ZIP", "PUT", "/vnfpkgm/v1/vnf_packages/{}/package_content".format(vnfd_id),
r_header_zip, "@b./temp/cirros_vnf.tar.gz", 204, None, 0)
# vnfd SHOW OSM format
- r = test_rest.test("Show VNFD OSM format", "GET", "/vnfpkgm/v1/vnf_packages_content/{}".format(vnfd_id),
+ r = test_rest.test("VNFD6", "Show VNFD OSM format", "GET", "/vnfpkgm/v1/vnf_packages_content/{}".format(vnfd_id),
headers_json, None, 200, r_header_json, "json")
# vnfd SHOW zip
- r = test_rest.test("Show VNFD SOL005 zip", "GET", "/vnfpkgm/v1/vnf_packages/{}/package_content".format(vnfd_id),
+ r = test_rest.test("VNFD7", "Show VNFD SOL005 zip", "GET", "/vnfpkgm/v1/vnf_packages/{}/package_content".format(vnfd_id),
headers_zip, None, 200, r_header_zip, "zip")
# vnfd SHOW descriptor
- r = test_rest.test("Show VNFD descriptor", "GET", "/vnfpkgm/v1/vnf_packages/{}/vnfd".format(vnfd_id),
+ r = test_rest.test("VNFD8", "Show VNFD descriptor", "GET", "/vnfpkgm/v1/vnf_packages/{}/vnfd".format(vnfd_id),
headers_text, None, 200, r_header_text, "text")
# vnfd SHOW actifact
- r = test_rest.test("Show VNFD artifact", "GET", "/vnfpkgm/v1/vnf_packages/{}/artifacts/icons/cirros-64.png".format(vnfd_id),
+ r = test_rest.test("VNFD9", "Show VNFD artifact", "GET", "/vnfpkgm/v1/vnf_packages/{}/artifacts/icons/cirros-64.png".format(vnfd_id),
headers_text, None, 200, r_header_octect, "text")
# vnfd DELETE
- r = test_rest.test("Delete VNFD SOL005 text", "DELETE", "/vnfpkgm/v1/vnf_packages/{}".format(vnfd_id),
+ r = test_rest.test("VNFD10", "Delete VNFD SOL005 text", "DELETE", "/vnfpkgm/v1/vnf_packages/{}".format(vnfd_id),
headers_yaml, None, 204, None, 0)
print("PASS")