fix flake8 tests
[osm/NBI.git] / osm_nbi / test / test.py
diff --git a/osm_nbi/test/test.py b/osm_nbi/test/test.py
deleted file mode 100755 (executable)
index 88d57e9..0000000
+++ /dev/null
@@ -1,424 +0,0 @@
-#! /usr/bin/python3
-# -*- coding: utf-8 -*-
-
-import getopt
-import sys
-import requests
-import json
-import logging
-import yaml
-# import json
-import tarfile
-from os import makedirs
-
-__author__ = "Alfonso Tierno, alfonso.tiernosepulveda@telefonica.com"
-__date__ = "$2018-03-01$"
-__version__ = "0.1"
-version_date = "Mar 2018"
-
-
-def usage():
-    print("Usage: ", sys.argv[0], "[options]")
-    print("      --version: prints current version")
-    print("      -f|--file FILE: file to be sent")
-    print("      -h|--help: shows this help")
-    print("      -u|--url URL: complete server URL")
-    print("      -s|--chunk-size SIZE: size of chunks, by default 1000")
-    print("      -t|--token TOKEN: Authorizaton token, previously obtained from server")
-    print("      -v|--verbose print debug information, can be used several times")
-    return
-
-
-r_header_json = {"Content-type": "application/json"}
-headers_json = {
-    "Content-type": "application/json",
-    "Accept": "application/json",
-}
-r_header_yaml = {"Content-type": "application/yaml"}
-headers_yaml = {
-    "Content-type": "application/yaml",
-    "Accept": "application/yaml",
-}
-r_header_text = {"Content-type": "text/plain"}
-r_header_octect = {"Content-type": "application/octet-stream"}
-headers_text = {
-    "Accept": "text/plain",
-}
-r_header_zip = {"Content-type": "application/zip"}
-headers_zip = {
-    "Accept": "application/zip",
-}
-# test without authorization
-test_not_authorized_list = (
-    ("NA1", "Invalid token", "GET", "/admin/v1/users", headers_json, None, 401, r_header_json, "json"),
-    ("NA2", "Invalid URL", "POST", "/admin/v1/nonexist", headers_yaml, None, 405, r_header_yaml, "yaml"),
-    ("NA3", "Invalid version", "DELETE", "/admin/v2/users", headers_yaml, None, 405, r_header_yaml, "yaml"),
-)
-
-# test ones authorized
-test_authorized_list = (
-    ("AU1", "Invalid vnfd id", "GET", "/vnfpkgm/v1/vnf_packages/non-existing-id",
-     headers_json, None, 404, r_header_json, "json"),
-    ("AU2", "Invalid nsd id", "GET", "/nsd/v1/ns_descriptors/non-existing-id",
-     headers_yaml, None, 404, r_header_yaml, "yaml"),
-    ("AU3", "Invalid nsd id", "DELETE", "/nsd/v1/ns_descriptors_content/non-existing-id",
-     headers_yaml, None, 404, r_header_yaml, "yaml"),
-)
-
-vim = {
-    "schema_version": "1.0",
-    "schema_type": "No idea",
-    "name": "myVim",
-    "description": "Descriptor name",
-    "vim_type": "openstack",
-    "vim_url": "http://localhost:/vim",
-    "vim_tenant_name": "vimTenant",
-    "vim_user": "user",
-    "vim_password": "password",
-    "config": {"config_param": 1}
-}
-
-vim_bad = vim.copy()
-vim_bad.pop("name")
-
-test_admin_list1 = (
-    ("VIM1", "Create VIM", "POST", "/admin/v1/vim_accounts", headers_json, vim, (201, 204),
-     {"Location": "/admin/v1/vim_accounts/", "Content-Type": "application/json"}, "json"),
-    ("VIM2", "Create VIM bad schema", "POST", "/admin/v1/vim_accounts", headers_json, vim_bad, 422, None, headers_json),
-    ("VIM2", "Create VIM name repeated", "POST", "/admin/v1/vim_accounts", headers_json, vim, 409, None, headers_json),
-    ("VIM4", "Show VIMs", "GET", "/admin/v1/vim_accounts", headers_yaml, None, 200, r_header_yaml, "yaml"),
-    ("VIM5", "Show VIM", "GET", "/admin/v1/vim_accounts/{VIM1}", headers_yaml, None, 200, r_header_yaml, "yaml"),
-    ("VIM6", "Delete VIM", "DELETE", "/admin/v1/vim_accounts/{VIM1}", headers_yaml, None, 202, None, 0),
-)
-
-
-class TestException(Exception):
-    pass
-
-
-class TestRest:
-    def __init__(self, url_base, header_base={}, verify=False):
-        self.url_base = url_base
-        self.header_base = header_base
-        self.s = requests.session()
-        self.s.headers = header_base
-        self.verify = verify
-        # contains ID of tests obtained from Location response header. "" key contains last obtained id
-        self.test_ids = {}
-
-    def set_header(self, header):
-        self.s.headers.update(header)
-
-    def test(self, name, description, method, url, headers, payload, expected_codes, expected_headers,
-             expected_payload):
-        """
-        Performs an http request and check http code response. Exit if different than allowed. It get the returned id
-        that can be used by following test in the URL with {name} where name is the name of the test
-        :param name:  short name of the test
-        :param description:  description of the test
-        :param method: HTTP method: GET,PUT,POST,DELETE,...
-        :param url: complete URL or relative URL
-        :param headers: request headers to add to the base headers
-        :param payload: Can be a dict, transformed to json, a text or a file if starts with '@'
-        :param expected_codes: expected response codes, can be int, int tuple or int range
-        :param expected_headers: expected response headers, dict with key values
-        :param expected_payload: expected payload, 0 if empty, 'yaml', 'json', 'text', 'zip'
-        :return: requests response
-        """
-        try:
-            if not self.s:
-                self.s = requests.session()
-            # URL
-            if not url:
-                url = self.url_base
-            elif not url.startswith("http"):
-                url = self.url_base + url
-
-            var_start = url.find("{") + 1
-            while var_start:
-                var_end = url.find("}", var_start)
-                if var_end == -1:
-                    break
-                var_name = url[var_start:var_end]
-                if var_name in self.test_ids:
-                    url = url[:var_start-1] + self.test_ids[var_name] + url[var_end+1:]
-                    var_start += len(self.test_ids[var_name])
-                var_start = url.find("{", var_start) + 1
-            if payload:
-                if isinstance(payload, str):
-                    if payload.startswith("@"):
-                        mode = "r"
-                        file_name = payload[1:]
-                        if payload.startswith("@b"):
-                            mode = "rb"
-                            file_name = payload[2:]
-                        with open(file_name, mode) as f:
-                            payload = f.read()
-                elif isinstance(payload, dict):
-                    payload = json.dumps(payload)
-
-            test = "Test {} {} {} {}".format(name, description, method, url)
-            logger.warning(test)
-            stream = False
-            # if expected_payload == "zip":
-            #     stream = True
-            r = getattr(self.s, method.lower())(url, data=payload, headers=headers, verify=self.verify, stream=stream)
-            logger.debug("RX {}: {}".format(r.status_code, r.text))
-
-            # check response
-            if expected_codes:
-                if isinstance(expected_codes, int):
-                    expected_codes = (expected_codes,)
-                if r.status_code not in expected_codes:
-                    raise TestException(
-                        "Got status {}. Expected {}. {}".format(r.status_code, expected_codes, r.text))
-
-            if expected_headers:
-                for header_key, header_val in expected_headers.items():
-                    if header_key.lower() not in r.headers:
-                        raise TestException("Header {} not present".format(header_key))
-                    if header_val and header_val.lower() not in r.headers[header_key]:
-                        raise TestException("Header {} does not contain {} but {}".format(header_key, header_val,
-                                            r.headers[header_key]))
-
-            if expected_payload is not None:
-                if expected_payload == 0 and len(r.content) > 0:
-                    raise TestException("Expected empty payload")
-                elif expected_payload == "json":
-                    try:
-                        r.json()
-                    except Exception as e:
-                        raise TestException("Expected json response payload, but got Exception {}".format(e))
-                elif expected_payload == "yaml":
-                    try:
-                        yaml.safe_load(r.text)
-                    except Exception as e:
-                        raise TestException("Expected yaml response payload, but got Exception {}".format(e))
-                elif expected_payload == "zip":
-                    if len(r.content) == 0:
-                        raise TestException("Expected some response payload, but got empty")
-                    # try:
-                    #     tar = tarfile.open(None, 'r:gz', fileobj=r.raw)
-                    #     for tarinfo in tar:
-                    #         tarname = tarinfo.name
-                    #         print(tarname)
-                    # except Exception as e:
-                    #     raise TestException("Expected zip response payload, but got Exception {}".format(e))
-                elif expected_payload == "text":
-                    if len(r.content) == 0:
-                        raise TestException("Expected some response payload, but got empty")
-                    # r.text
-            location = r.headers.get("Location")
-            if location:
-                _id = location[location.rfind("/") + 1:]
-                if _id:
-                    self.test_ids[name] = str(_id)
-                    self.test_ids[""] = str(_id)  # last id
-            return r
-        except TestException as e:
-            logger.error("{} \nRX code{}: {}".format(e, r.status_code, r.text))
-            exit(1)
-        except IOError as e:
-            logger.error("Cannot open file {}".format(e))
-            exit(1)
-
-
-if __name__ == "__main__":
-    global logger
-    test = ""
-
-    # Disable warnings from self-signed certificates.
-    requests.packages.urllib3.disable_warnings()
-    try:
-        logging.basicConfig(format="%(levelname)s %(message)s", level=logging.ERROR)
-        logger = logging.getLogger('NBI')
-        # load parameters and configuration
-        opts, args = getopt.getopt(sys.argv[1:], "hvu:p:",
-                                   ["url=", "user=", "password=", "help", "version", "verbose", "project=", "insecure"])
-        url = "https://localhost:9999/osm"
-        user = password = project = "admin"
-        verbose = 0
-        verify = True
-
-        for o, a in opts:
-            if o == "--version":
-                print("test version " + __version__ + ' ' + version_date)
-                sys.exit()
-            elif o in ("-v", "--verbose"):
-                verbose += 1
-            elif o == "no-verbose":
-                verbose = -1
-            elif o in ("-h", "--help"):
-                usage()
-                sys.exit()
-            elif o == "--url":
-                url = a
-            elif o in ("-u", "--user"):
-                user = a
-            elif o in ("-p", "--password"):
-                password = a
-            elif o == "--project":
-                project = a
-            elif o == "--insecure":
-                verify = False
-            else:
-                assert False, "Unhandled option"
-        if verbose == 0:
-            logger.setLevel(logging.WARNING)
-        elif verbose > 1:
-            logger.setLevel(logging.DEBUG)
-        else:
-            logger.setLevel(logging.ERROR)
-
-        test_rest = TestRest(url)
-
-        # tests without authorization
-        for t in test_not_authorized_list:
-            test_rest.test(*t)
-
-        # get token
-        r = test_rest.test("token1", "Obtain token", "POST", "/admin/v1/tokens", headers_json,
-                           {"username": user, "password": password, "project_id": project},
-                           (200, 201), {"Content-Type": "application/json"}, "json")
-        response = r.json()
-        token = response["id"]
-        test_rest.set_header({"Authorization": "Bearer {}".format(token)})
-
-        # tests once authorized
-        for t in test_authorized_list:
-            test_rest.test(*t)
-
-        # tests admin
-        for t in test_admin_list1:
-            test_rest.test(*t)
-
-        # vnfd CREATE
-        r = test_rest.test("VNFD1", "Onboard VNFD step 1", "POST", "/vnfpkgm/v1/vnf_packages", headers_json, None,
-                           201, {"Location": "/vnfpkgm/v1/vnf_packages/", "Content-Type": "application/json"}, "json")
-        location = r.headers["Location"]
-        vnfd_id = location[location.rfind("/")+1:]
-        # print(location, vnfd_id)
-
-        # vnfd UPLOAD test
-        r = test_rest.test("VNFD2", "Onboard VNFD step 2 as TEXT", "PUT",
-                           "/vnfpkgm/v1/vnf_packages/{}/package_content".format(vnfd_id),
-                           r_header_text, "@./cirros_vnf/cirros_vnfd.yaml", 204, None, 0)
-
-        # vnfd SHOW OSM format
-        r = test_rest.test("VNFD3", "Show VNFD OSM format", "GET",
-                           "/vnfpkgm/v1/vnf_packages_content/{}".format(vnfd_id),
-                           headers_json, None, 200, r_header_json, "json")
-
-        # vnfd SHOW text
-        r = test_rest.test("VNFD4", "Show VNFD SOL005 text", "GET",
-                           "/vnfpkgm/v1/vnf_packages/{}/package_content".format(vnfd_id),
-                           headers_text, None, 200, r_header_text, "text")
-
-        # vnfd UPLOAD ZIP
-        makedirs("temp", exist_ok=True)
-        tar = tarfile.open("temp/cirros_vnf.tar.gz", "w:gz")
-        tar.add("cirros_vnf")
-        tar.close()
-        r = test_rest.test("VNFD5", "Onboard VNFD step 3 replace with ZIP", "PUT",
-                           "/vnfpkgm/v1/vnf_packages/{}/package_content".format(vnfd_id),
-                           r_header_zip, "@b./temp/cirros_vnf.tar.gz", 204, None, 0)
-
-        # vnfd SHOW OSM format
-        r = test_rest.test("VNFD6", "Show VNFD OSM format", "GET",
-                           "/vnfpkgm/v1/vnf_packages_content/{}".format(vnfd_id),
-                           headers_json, None, 200, r_header_json, "json")
-
-        # vnfd SHOW zip
-        r = test_rest.test("VNFD7", "Show VNFD SOL005 zip", "GET",
-                           "/vnfpkgm/v1/vnf_packages/{}/package_content".format(vnfd_id),
-                           headers_zip, None, 200, r_header_zip, "zip")
-        # vnfd SHOW descriptor
-        r = test_rest.test("VNFD8", "Show VNFD descriptor", "GET",
-                           "/vnfpkgm/v1/vnf_packages/{}/vnfd".format(vnfd_id),
-                           headers_text, None, 200, r_header_text, "text")
-        # vnfd SHOW actifact
-        r = test_rest.test("VNFD9", "Show VNFD artifact", "GET",
-                           "/vnfpkgm/v1/vnf_packages/{}/artifacts/icons/cirros-64.png".format(vnfd_id),
-                           headers_text, None, 200, r_header_octect, "text")
-
-        # # vnfd DELETE
-        # r = test_rest.test("VNFD10", "Delete VNFD SOL005 text", "DELETE",
-        # "/vnfpkgm/v1/vnf_packages/{}".format(vnfd_id),
-        #                    headers_yaml, None, 204, None, 0)
-
-        # nsd CREATE
-        r = test_rest.test("NSD1", "Onboard NSD step 1", "POST", "/nsd/v1/ns_descriptors", headers_json, None,
-                           201, {"Location": "/nsd/v1/ns_descriptors/", "Content-Type": "application/json"}, "json")
-        location = r.headers["Location"]
-        nsd_id = location[location.rfind("/")+1:]
-        # print(location, nsd_id)
-
-        # nsd UPLOAD test
-        r = test_rest.test("NSD2", "Onboard NSD with missing vnfd", "PUT",
-                           "/nsd/v1/ns_descriptors/{}/nsd_content?constituent-vnfd.0.vnfd-id-ref"
-                           "=NONEXISTING-VNFD".format(nsd_id),
-                           r_header_text, "@./cirros_ns/cirros_nsd.yaml", 409, r_header_yaml, "yaml")
-
-        # # VNF_CREATE
-        # r = test_rest.test("VNFD5", "Onboard VNFD step 3 replace with ZIP", "PUT",
-        # "/vnfpkgm/v1/vnf_packages/{}/package_content".format(vnfd_id),
-        #                    r_header_zip, "@b./temp/cirros_vnf.tar.gz", 204, None, 0)
-
-        r = test_rest.test("NSD2", "Onboard NSD step 2 as TEXT", "PUT",
-                           "/nsd/v1/ns_descriptors/{}/nsd_content".format(nsd_id),
-                           r_header_text, "@./cirros_ns/cirros_nsd.yaml", 204, None, 0)
-
-        # nsd SHOW OSM format
-        r = test_rest.test("NSD3", "Show NSD OSM format", "GET", "/nsd/v1/ns_descriptors_content/{}".format(nsd_id),
-                           headers_json, None, 200, r_header_json, "json")
-
-        # nsd SHOW text
-        r = test_rest.test("NSD4", "Show NSD SOL005 text", "GET",
-                           "/nsd/v1/ns_descriptors/{}/nsd_content".format(nsd_id),
-                           headers_text, None, 200, r_header_text, "text")
-
-        # nsd UPLOAD ZIP
-        makedirs("temp", exist_ok=True)
-        tar = tarfile.open("temp/cirros_ns.tar.gz", "w:gz")
-        tar.add("cirros_ns")
-        tar.close()
-        r = test_rest.test("NSD5", "Onboard NSD step 3 replace with ZIP", "PUT",
-                           "/nsd/v1/ns_descriptors/{}/nsd_content".format(nsd_id),
-                           r_header_zip, "@b./temp/cirros_ns.tar.gz", 204, None, 0)
-
-        # nsd SHOW OSM format
-        r = test_rest.test("NSD6", "Show NSD OSM format", "GET", "/nsd/v1/ns_descriptors_content/{}".format(nsd_id),
-                           headers_json, None, 200, r_header_json, "json")
-
-        # nsd SHOW zip
-        r = test_rest.test("NSD7", "Show NSD SOL005 zip", "GET", "/nsd/v1/ns_descriptors/{}/nsd_content".format(nsd_id),
-                           headers_zip, None, 200, r_header_zip, "zip")
-
-        # nsd SHOW descriptor
-        r = test_rest.test("NSD8", "Show NSD descriptor", "GET", "/nsd/v1/ns_descriptors/{}/nsd".format(nsd_id),
-                           headers_text, None, 200, r_header_text, "text")
-        # nsd SHOW actifact
-        r = test_rest.test("NSD9", "Show NSD artifact", "GET",
-                           "/nsd/v1/ns_descriptors/{}/artifacts/icons/osm_2x.png".format(nsd_id),
-                           headers_text, None, 200, r_header_octect, "text")
-
-        # vnfd DELETE
-        r = test_rest.test("VNFD10", "Delete VNFD conflict", "DELETE", "/vnfpkgm/v1/vnf_packages/{}".format(vnfd_id),
-                           headers_yaml, None, 409, r_header_yaml, "yaml")
-
-        # nsd DELETE
-        r = test_rest.test("NSD10", "Delete NSD SOL005 text", "DELETE", "/nsd/v1/ns_descriptors/{}".format(nsd_id),
-                           headers_yaml, None, 204, None, 0)
-
-        # vnfd DELETE
-        r = test_rest.test("VNFD10", "Delete VNFD SOL005 text", "DELETE", "/vnfpkgm/v1/vnf_packages/{}".format(vnfd_id),
-                           headers_yaml, None, 204, None, 0)
-
-        print("PASS")
-
-    except Exception as e:
-        if test:
-            logger.error(test + " Exception: " + str(e))
-            exit(1)
-        else:
-            logger.critical(test + " Exception: " + str(e), exc_info=True)