X-Git-Url: https://osm.etsi.org/gitweb/?a=blobdiff_plain;f=osm_nbi%2Ftest%2Ftest.py;fp=osm_nbi%2Ftest%2Ftest.py;h=0000000000000000000000000000000000000000;hb=e128118419c3bb413de4cff8445b9b6c5598a5ed;hp=88d57e94ed11865e189905f33e7913b1f50db56b;hpb=0e96a88ce78692b6e4dc4f5a43ade2ccf7fca195;p=osm%2FNBI.git diff --git a/osm_nbi/test/test.py b/osm_nbi/test/test.py deleted file mode 100755 index 88d57e9..0000000 --- a/osm_nbi/test/test.py +++ /dev/null @@ -1,424 +0,0 @@ -#! /usr/bin/python3 -# -*- coding: utf-8 -*- - -import getopt -import sys -import requests -import json -import logging -import yaml -# import json -import tarfile -from os import makedirs - -__author__ = "Alfonso Tierno, alfonso.tiernosepulveda@telefonica.com" -__date__ = "$2018-03-01$" -__version__ = "0.1" -version_date = "Mar 2018" - - -def usage(): - print("Usage: ", sys.argv[0], "[options]") - print(" --version: prints current version") - print(" -f|--file FILE: file to be sent") - print(" -h|--help: shows this help") - print(" -u|--url URL: complete server URL") - print(" -s|--chunk-size SIZE: size of chunks, by default 1000") - print(" -t|--token TOKEN: Authorizaton token, previously obtained from server") - print(" -v|--verbose print debug information, can be used several times") - return - - -r_header_json = {"Content-type": "application/json"} -headers_json = { - "Content-type": "application/json", - "Accept": "application/json", -} -r_header_yaml = {"Content-type": "application/yaml"} -headers_yaml = { - "Content-type": "application/yaml", - "Accept": "application/yaml", -} -r_header_text = {"Content-type": "text/plain"} -r_header_octect = {"Content-type": "application/octet-stream"} -headers_text = { - "Accept": "text/plain", -} -r_header_zip = {"Content-type": "application/zip"} -headers_zip = { - "Accept": "application/zip", -} -# test without authorization -test_not_authorized_list = ( - ("NA1", "Invalid token", "GET", "/admin/v1/users", headers_json, None, 401, r_header_json, "json"), - ("NA2", "Invalid URL", "POST", "/admin/v1/nonexist", headers_yaml, None, 405, r_header_yaml, "yaml"), - ("NA3", "Invalid version", "DELETE", "/admin/v2/users", headers_yaml, None, 405, r_header_yaml, "yaml"), -) - -# test ones authorized -test_authorized_list = ( - ("AU1", "Invalid vnfd id", "GET", "/vnfpkgm/v1/vnf_packages/non-existing-id", - headers_json, None, 404, r_header_json, "json"), - ("AU2", "Invalid nsd id", "GET", "/nsd/v1/ns_descriptors/non-existing-id", - headers_yaml, None, 404, r_header_yaml, "yaml"), - ("AU3", "Invalid nsd id", "DELETE", "/nsd/v1/ns_descriptors_content/non-existing-id", - headers_yaml, None, 404, r_header_yaml, "yaml"), -) - -vim = { - "schema_version": "1.0", - "schema_type": "No idea", - "name": "myVim", - "description": "Descriptor name", - "vim_type": "openstack", - "vim_url": "http://localhost:/vim", - "vim_tenant_name": "vimTenant", - "vim_user": "user", - "vim_password": "password", - "config": {"config_param": 1} -} - -vim_bad = vim.copy() -vim_bad.pop("name") - -test_admin_list1 = ( - ("VIM1", "Create VIM", "POST", "/admin/v1/vim_accounts", headers_json, vim, (201, 204), - {"Location": "/admin/v1/vim_accounts/", "Content-Type": "application/json"}, "json"), - ("VIM2", "Create VIM bad schema", "POST", "/admin/v1/vim_accounts", headers_json, vim_bad, 422, None, headers_json), - ("VIM2", "Create VIM name repeated", "POST", "/admin/v1/vim_accounts", headers_json, vim, 409, None, headers_json), - ("VIM4", "Show VIMs", "GET", "/admin/v1/vim_accounts", headers_yaml, None, 200, r_header_yaml, "yaml"), - ("VIM5", "Show VIM", "GET", "/admin/v1/vim_accounts/{VIM1}", headers_yaml, None, 200, r_header_yaml, "yaml"), - ("VIM6", "Delete VIM", "DELETE", "/admin/v1/vim_accounts/{VIM1}", headers_yaml, None, 202, None, 0), -) - - -class TestException(Exception): - pass - - -class TestRest: - def __init__(self, url_base, header_base={}, verify=False): - self.url_base = url_base - self.header_base = header_base - self.s = requests.session() - self.s.headers = header_base - self.verify = verify - # contains ID of tests obtained from Location response header. "" key contains last obtained id - self.test_ids = {} - - def set_header(self, header): - self.s.headers.update(header) - - def test(self, name, description, method, url, headers, payload, expected_codes, expected_headers, - expected_payload): - """ - Performs an http request and check http code response. Exit if different than allowed. It get the returned id - that can be used by following test in the URL with {name} where name is the name of the test - :param name: short name of the test - :param description: description of the test - :param method: HTTP method: GET,PUT,POST,DELETE,... - :param url: complete URL or relative URL - :param headers: request headers to add to the base headers - :param payload: Can be a dict, transformed to json, a text or a file if starts with '@' - :param expected_codes: expected response codes, can be int, int tuple or int range - :param expected_headers: expected response headers, dict with key values - :param expected_payload: expected payload, 0 if empty, 'yaml', 'json', 'text', 'zip' - :return: requests response - """ - try: - if not self.s: - self.s = requests.session() - # URL - if not url: - url = self.url_base - elif not url.startswith("http"): - url = self.url_base + url - - var_start = url.find("{") + 1 - while var_start: - var_end = url.find("}", var_start) - if var_end == -1: - break - var_name = url[var_start:var_end] - if var_name in self.test_ids: - url = url[:var_start-1] + self.test_ids[var_name] + url[var_end+1:] - var_start += len(self.test_ids[var_name]) - var_start = url.find("{", var_start) + 1 - if payload: - if isinstance(payload, str): - if payload.startswith("@"): - mode = "r" - file_name = payload[1:] - if payload.startswith("@b"): - mode = "rb" - file_name = payload[2:] - with open(file_name, mode) as f: - payload = f.read() - elif isinstance(payload, dict): - payload = json.dumps(payload) - - test = "Test {} {} {} {}".format(name, description, method, url) - logger.warning(test) - stream = False - # if expected_payload == "zip": - # stream = True - r = getattr(self.s, method.lower())(url, data=payload, headers=headers, verify=self.verify, stream=stream) - logger.debug("RX {}: {}".format(r.status_code, r.text)) - - # check response - if expected_codes: - if isinstance(expected_codes, int): - expected_codes = (expected_codes,) - if r.status_code not in expected_codes: - raise TestException( - "Got status {}. Expected {}. {}".format(r.status_code, expected_codes, r.text)) - - if expected_headers: - for header_key, header_val in expected_headers.items(): - if header_key.lower() not in r.headers: - raise TestException("Header {} not present".format(header_key)) - if header_val and header_val.lower() not in r.headers[header_key]: - raise TestException("Header {} does not contain {} but {}".format(header_key, header_val, - r.headers[header_key])) - - if expected_payload is not None: - if expected_payload == 0 and len(r.content) > 0: - raise TestException("Expected empty payload") - elif expected_payload == "json": - try: - r.json() - except Exception as e: - raise TestException("Expected json response payload, but got Exception {}".format(e)) - elif expected_payload == "yaml": - try: - yaml.safe_load(r.text) - except Exception as e: - raise TestException("Expected yaml response payload, but got Exception {}".format(e)) - elif expected_payload == "zip": - if len(r.content) == 0: - raise TestException("Expected some response payload, but got empty") - # try: - # tar = tarfile.open(None, 'r:gz', fileobj=r.raw) - # for tarinfo in tar: - # tarname = tarinfo.name - # print(tarname) - # except Exception as e: - # raise TestException("Expected zip response payload, but got Exception {}".format(e)) - elif expected_payload == "text": - if len(r.content) == 0: - raise TestException("Expected some response payload, but got empty") - # r.text - location = r.headers.get("Location") - if location: - _id = location[location.rfind("/") + 1:] - if _id: - self.test_ids[name] = str(_id) - self.test_ids[""] = str(_id) # last id - return r - except TestException as e: - logger.error("{} \nRX code{}: {}".format(e, r.status_code, r.text)) - exit(1) - except IOError as e: - logger.error("Cannot open file {}".format(e)) - exit(1) - - -if __name__ == "__main__": - global logger - test = "" - - # Disable warnings from self-signed certificates. - requests.packages.urllib3.disable_warnings() - try: - logging.basicConfig(format="%(levelname)s %(message)s", level=logging.ERROR) - logger = logging.getLogger('NBI') - # load parameters and configuration - opts, args = getopt.getopt(sys.argv[1:], "hvu:p:", - ["url=", "user=", "password=", "help", "version", "verbose", "project=", "insecure"]) - url = "https://localhost:9999/osm" - user = password = project = "admin" - verbose = 0 - verify = True - - for o, a in opts: - if o == "--version": - print("test version " + __version__ + ' ' + version_date) - sys.exit() - elif o in ("-v", "--verbose"): - verbose += 1 - elif o == "no-verbose": - verbose = -1 - elif o in ("-h", "--help"): - usage() - sys.exit() - elif o == "--url": - url = a - elif o in ("-u", "--user"): - user = a - elif o in ("-p", "--password"): - password = a - elif o == "--project": - project = a - elif o == "--insecure": - verify = False - else: - assert False, "Unhandled option" - if verbose == 0: - logger.setLevel(logging.WARNING) - elif verbose > 1: - logger.setLevel(logging.DEBUG) - else: - logger.setLevel(logging.ERROR) - - test_rest = TestRest(url) - - # tests without authorization - for t in test_not_authorized_list: - test_rest.test(*t) - - # get token - r = test_rest.test("token1", "Obtain token", "POST", "/admin/v1/tokens", headers_json, - {"username": user, "password": password, "project_id": project}, - (200, 201), {"Content-Type": "application/json"}, "json") - response = r.json() - token = response["id"] - test_rest.set_header({"Authorization": "Bearer {}".format(token)}) - - # tests once authorized - for t in test_authorized_list: - test_rest.test(*t) - - # tests admin - for t in test_admin_list1: - test_rest.test(*t) - - # vnfd CREATE - r = test_rest.test("VNFD1", "Onboard VNFD step 1", "POST", "/vnfpkgm/v1/vnf_packages", headers_json, None, - 201, {"Location": "/vnfpkgm/v1/vnf_packages/", "Content-Type": "application/json"}, "json") - location = r.headers["Location"] - vnfd_id = location[location.rfind("/")+1:] - # print(location, vnfd_id) - - # vnfd UPLOAD test - r = test_rest.test("VNFD2", "Onboard VNFD step 2 as TEXT", "PUT", - "/vnfpkgm/v1/vnf_packages/{}/package_content".format(vnfd_id), - r_header_text, "@./cirros_vnf/cirros_vnfd.yaml", 204, None, 0) - - # vnfd SHOW OSM format - r = test_rest.test("VNFD3", "Show VNFD OSM format", "GET", - "/vnfpkgm/v1/vnf_packages_content/{}".format(vnfd_id), - headers_json, None, 200, r_header_json, "json") - - # vnfd SHOW text - r = test_rest.test("VNFD4", "Show VNFD SOL005 text", "GET", - "/vnfpkgm/v1/vnf_packages/{}/package_content".format(vnfd_id), - headers_text, None, 200, r_header_text, "text") - - # vnfd UPLOAD ZIP - makedirs("temp", exist_ok=True) - tar = tarfile.open("temp/cirros_vnf.tar.gz", "w:gz") - tar.add("cirros_vnf") - tar.close() - r = test_rest.test("VNFD5", "Onboard VNFD step 3 replace with ZIP", "PUT", - "/vnfpkgm/v1/vnf_packages/{}/package_content".format(vnfd_id), - r_header_zip, "@b./temp/cirros_vnf.tar.gz", 204, None, 0) - - # vnfd SHOW OSM format - r = test_rest.test("VNFD6", "Show VNFD OSM format", "GET", - "/vnfpkgm/v1/vnf_packages_content/{}".format(vnfd_id), - headers_json, None, 200, r_header_json, "json") - - # vnfd SHOW zip - r = test_rest.test("VNFD7", "Show VNFD SOL005 zip", "GET", - "/vnfpkgm/v1/vnf_packages/{}/package_content".format(vnfd_id), - headers_zip, None, 200, r_header_zip, "zip") - # vnfd SHOW descriptor - r = test_rest.test("VNFD8", "Show VNFD descriptor", "GET", - "/vnfpkgm/v1/vnf_packages/{}/vnfd".format(vnfd_id), - headers_text, None, 200, r_header_text, "text") - # vnfd SHOW actifact - r = test_rest.test("VNFD9", "Show VNFD artifact", "GET", - "/vnfpkgm/v1/vnf_packages/{}/artifacts/icons/cirros-64.png".format(vnfd_id), - headers_text, None, 200, r_header_octect, "text") - - # # vnfd DELETE - # r = test_rest.test("VNFD10", "Delete VNFD SOL005 text", "DELETE", - # "/vnfpkgm/v1/vnf_packages/{}".format(vnfd_id), - # headers_yaml, None, 204, None, 0) - - # nsd CREATE - r = test_rest.test("NSD1", "Onboard NSD step 1", "POST", "/nsd/v1/ns_descriptors", headers_json, None, - 201, {"Location": "/nsd/v1/ns_descriptors/", "Content-Type": "application/json"}, "json") - location = r.headers["Location"] - nsd_id = location[location.rfind("/")+1:] - # print(location, nsd_id) - - # nsd UPLOAD test - r = test_rest.test("NSD2", "Onboard NSD with missing vnfd", "PUT", - "/nsd/v1/ns_descriptors/{}/nsd_content?constituent-vnfd.0.vnfd-id-ref" - "=NONEXISTING-VNFD".format(nsd_id), - r_header_text, "@./cirros_ns/cirros_nsd.yaml", 409, r_header_yaml, "yaml") - - # # VNF_CREATE - # r = test_rest.test("VNFD5", "Onboard VNFD step 3 replace with ZIP", "PUT", - # "/vnfpkgm/v1/vnf_packages/{}/package_content".format(vnfd_id), - # r_header_zip, "@b./temp/cirros_vnf.tar.gz", 204, None, 0) - - r = test_rest.test("NSD2", "Onboard NSD step 2 as TEXT", "PUT", - "/nsd/v1/ns_descriptors/{}/nsd_content".format(nsd_id), - r_header_text, "@./cirros_ns/cirros_nsd.yaml", 204, None, 0) - - # nsd SHOW OSM format - r = test_rest.test("NSD3", "Show NSD OSM format", "GET", "/nsd/v1/ns_descriptors_content/{}".format(nsd_id), - headers_json, None, 200, r_header_json, "json") - - # nsd SHOW text - r = test_rest.test("NSD4", "Show NSD SOL005 text", "GET", - "/nsd/v1/ns_descriptors/{}/nsd_content".format(nsd_id), - headers_text, None, 200, r_header_text, "text") - - # nsd UPLOAD ZIP - makedirs("temp", exist_ok=True) - tar = tarfile.open("temp/cirros_ns.tar.gz", "w:gz") - tar.add("cirros_ns") - tar.close() - r = test_rest.test("NSD5", "Onboard NSD step 3 replace with ZIP", "PUT", - "/nsd/v1/ns_descriptors/{}/nsd_content".format(nsd_id), - r_header_zip, "@b./temp/cirros_ns.tar.gz", 204, None, 0) - - # nsd SHOW OSM format - r = test_rest.test("NSD6", "Show NSD OSM format", "GET", "/nsd/v1/ns_descriptors_content/{}".format(nsd_id), - headers_json, None, 200, r_header_json, "json") - - # nsd SHOW zip - r = test_rest.test("NSD7", "Show NSD SOL005 zip", "GET", "/nsd/v1/ns_descriptors/{}/nsd_content".format(nsd_id), - headers_zip, None, 200, r_header_zip, "zip") - - # nsd SHOW descriptor - r = test_rest.test("NSD8", "Show NSD descriptor", "GET", "/nsd/v1/ns_descriptors/{}/nsd".format(nsd_id), - headers_text, None, 200, r_header_text, "text") - # nsd SHOW actifact - r = test_rest.test("NSD9", "Show NSD artifact", "GET", - "/nsd/v1/ns_descriptors/{}/artifacts/icons/osm_2x.png".format(nsd_id), - headers_text, None, 200, r_header_octect, "text") - - # vnfd DELETE - r = test_rest.test("VNFD10", "Delete VNFD conflict", "DELETE", "/vnfpkgm/v1/vnf_packages/{}".format(vnfd_id), - headers_yaml, None, 409, r_header_yaml, "yaml") - - # nsd DELETE - r = test_rest.test("NSD10", "Delete NSD SOL005 text", "DELETE", "/nsd/v1/ns_descriptors/{}".format(nsd_id), - headers_yaml, None, 204, None, 0) - - # vnfd DELETE - r = test_rest.test("VNFD10", "Delete VNFD SOL005 text", "DELETE", "/vnfpkgm/v1/vnf_packages/{}".format(vnfd_id), - headers_yaml, None, 204, None, 0) - - print("PASS") - - except Exception as e: - if test: - logger.error(test + " Exception: " + str(e)) - exit(1) - else: - logger.critical(test + " Exception: " + str(e), exc_info=True)