X-Git-Url: https://osm.etsi.org/gitweb/?p=osm%2FNBI.git;a=blobdiff_plain;f=osm_nbi%2Ftests%2Ftest.py;h=0c08c22dadf80398ab30df953e106f1c1fdc4c0d;hp=54aa0a30e8b3367b54ed445d75ab43eff347e383;hb=ba0dbed4d7fe4cd1ee800a4a3402157224fd7aff;hpb=7ce1db93057aff3f9679f45ac15c9862e8ac6d2a diff --git a/osm_nbi/tests/test.py b/osm_nbi/tests/test.py index 54aa0a3..0c08c22 100755 --- a/osm_nbi/tests/test.py +++ b/osm_nbi/tests/test.py @@ -14,8 +14,8 @@ import os __author__ = "Alfonso Tierno, alfonso.tiernosepulveda@telefonica.com" __date__ = "$2018-03-01$" -__version__ = "0.2" -version_date = "Jul 2018" +__version__ = "0.3" +version_date = "Oct 2018" def usage(): @@ -120,8 +120,12 @@ class TestRest: def set_header(self, header): self.s.headers.update(header) + def unset_header(self, key): + if key in self.s.headers: + del self.s.headers[key] + def test(self, name, description, method, url, headers, payload, expected_codes, expected_headers, - expected_payload): + expected_payload, store_file=None): """ Performs an http request and check http code response. Exit if different than allowed. It get the returned id that can be used by following test in the URL with {name} where name is the name of the test @@ -133,7 +137,8 @@ class TestRest: :param payload: Can be a dict, transformed to json, a text or a file if starts with '@' :param expected_codes: expected response codes, can be int, int tuple or int range :param expected_headers: expected response headers, dict with key values - :param expected_payload: expected payload, 0 if empty, 'yaml', 'json', 'text', 'zip' + :param expected_payload: expected payload, 0 if empty, 'yaml', 'json', 'text', 'zip', 'octet-stream' + :param store_file: filename to store content :return: requests response """ r = None @@ -174,10 +179,13 @@ class TestRest: self.old_test_description = test_description logger.warning(test_description) stream = False - # if expected_payload == "zip": - # stream = True + if expected_payload in ("zip", "octet-string") or store_file: + stream = True r = getattr(self.s, method.lower())(url, data=payload, headers=headers, verify=self.verify, stream=stream) - logger.debug("RX {}: {}".format(r.status_code, r.text)) + if expected_payload in ("zip", "octet-string") or store_file: + logger.debug("RX {}".format(r.status_code)) + else: + logger.debug("RX {}: {}".format(r.status_code, r.text)) # check response if expected_codes: @@ -208,7 +216,7 @@ class TestRest: yaml.safe_load(r.text) except Exception as e: raise TestException("Expected yaml response payload, but got Exception {}".format(e)) - elif expected_payload == "zip": + elif expected_payload in ("zip", "octet-string"): if len(r.content) == 0: raise TestException("Expected some response payload, but got empty") # try: @@ -222,6 +230,11 @@ class TestRest: if len(r.content) == 0: raise TestException("Expected some response payload, but got empty") # r.text + if store_file: + with open(store_file, 'wb') as fd: + for chunk in r.iter_content(chunk_size=128): + fd.write(chunk) + location = r.headers.get("Location") if location: _id = location[location.rfind("/") + 1:] @@ -249,11 +262,18 @@ class TestRest: # self.project = project r = self.test("TOKEN", "Obtain token", "POST", "/admin/v1/tokens", headers_json, {"username": self.user, "password": self.password, "project_id": self.project}, - (200, 201), {"Content-Type": "application/json"}, "json") + (200, 201), r_header_json, "json") response = r.json() self.token = response["id"] self.set_header({"Authorization": "Bearer {}".format(self.token)}) + def remove_authorization(self): + if self.token: + self.test("TOKEN_DEL", "Delete token", "DELETE", "/admin/v1/tokens/{}".format(self.token), headers_json, + None, (200, 201, 204), None, None) + self.token = None + self.unset_header("Authorization") + def get_create_vim(self, test_osm): if self.vim_id: return self.vim_id @@ -300,7 +320,8 @@ class TestNonAuthorized: description = "test invalid URLs. methods and no authorization" @staticmethod - def run(engine, test_osm, manual_check): + def run(engine, test_osm, manual_check, test_params=None): + engine.remove_authorization() test_not_authorized_list = ( ("NA1", "Invalid token", "GET", "/admin/v1/users", headers_json, None, 401, r_header_json, "json"), ("NA2", "Invalid URL", "POST", "/admin/v1/nonexist", headers_yaml, None, 405, r_header_yaml, "yaml"), @@ -310,6 +331,102 @@ class TestNonAuthorized: engine.test(*t) +class TestUsersProjects: + description = "test project and user creation" + + @staticmethod + def run(engine, test_osm, manual_check, test_params=None): + engine.get_autorization() + engine.test("PU1", "Create project non admin", "POST", "/admin/v1/projects", headers_json, {"name": "P1"}, + (201, 204), {"Location": "/admin/v1/projects/", "Content-Type": "application/json"}, "json") + engine.test("PU2", "Create project admin", "POST", "/admin/v1/projects", headers_json, + {"name": "Padmin", "admin": True}, (201, 204), + {"Location": "/admin/v1/projects/", "Content-Type": "application/json"}, "json") + engine.test("PU3", "Create project bad format", "POST", "/admin/v1/projects", headers_json, {"name": 1}, 422, + r_header_json, "json") + engine.test("PU4", "Create user with bad project", "POST", "/admin/v1/users", headers_json, + {"username": "U1", "projects": ["P1", "P2", "Padmin"], "password": "pw1"}, 409, + r_header_json, "json") + engine.test("PU5", "Create user with bad project and force", "POST", "/admin/v1/users?FORCE=True", headers_json, + {"username": "U1", "projects": ["P1", "P2", "Padmin"], "password": "pw1"}, 201, + {"Location": "/admin/v1/users/", "Content-Type": "application/json"}, "json") + engine.test("PU6", "Create user 2", "POST", "/admin/v1/users", headers_json, + {"username": "U2", "projects": ["P1"], "password": "pw2"}, 201, + {"Location": "/admin/v1/users/", "Content-Type": "application/json"}, "json") + + engine.test("PU7", "Edit user U1, delete P2 project", "PATCH", "/admin/v1/users/U1", headers_json, + {"projects": {"$'P2'": None}}, 204, None, None) + res = engine.test("PU1", "Check user U1, contains the right projects", "GET", "/admin/v1/users/U1", + headers_json, None, 200, None, json) + u1 = res.json() + # print(u1) + expected_projects = ["P1", "Padmin"] + if u1["projects"] != expected_projects: + raise TestException("User content projects '{}' different than expected '{}'. Edition has not done" + " properly".format(u1["projects"], expected_projects)) + + engine.test("PU8", "Edit user U1, set Padmin as default project", "PUT", "/admin/v1/users/U1", headers_json, + {"projects": {"$'Padmin'": None, "$+[0]": "Padmin"}}, 204, None, None) + res = engine.test("PU1", "Check user U1, contains the right projects", "GET", "/admin/v1/users/U1", + headers_json, None, 200, None, json) + u1 = res.json() + # print(u1) + expected_projects = ["Padmin", "P1"] + if u1["projects"] != expected_projects: + raise TestException("User content projects '{}' different than expected '{}'. Edition has not done" + " properly".format(u1["projects"], expected_projects)) + + engine.test("PU9", "Edit user U1, change password", "PATCH", "/admin/v1/users/U1", headers_json, + {"password": "pw1_new"}, 204, None, None) + + engine.test("PU10", "Change to project P1 non existing", "POST", "/admin/v1/tokens/", headers_json, + {"project_id": "P1"}, 401, r_header_json, "json") + + res = engine.test("PU1", "Change to user U1 project P1", "POST", "/admin/v1/tokens", headers_json, + {"username": "U1", "password": "pw1_new", "project_id": "P1"}, (200, 201), + r_header_json, "json") + response = res.json() + engine.set_header({"Authorization": "Bearer {}".format(response["id"])}) + + engine.test("PU11", "Edit user projects non admin", "PUT", "/admin/v1/users/U1", headers_json, + {"projects": {"$'P1'": None}}, 401, r_header_json, "json") + engine.test("PU12", "Add new project non admin", "POST", "/admin/v1/projects", headers_json, + {"name": "P2"}, 401, r_header_json, "json") + engine.test("PU13", "Add new user non admin", "POST", "/admin/v1/users", headers_json, + {"username": "U3", "projects": ["P1"], "password": "pw3"}, 401, + r_header_json, "json") + + res = engine.test("PU14", "Change to user U1 project Padmin", "POST", "/admin/v1/tokens", headers_json, + {"project_id": "Padmin"}, (200, 201), r_header_json, "json") + response = res.json() + engine.set_header({"Authorization": "Bearer {}".format(response["id"])}) + + engine.test("PU15", "Add new project admin", "POST", "/admin/v1/projects", headers_json, {"name": "P2"}, + (201, 204), {"Location": "/admin/v1/projects/", "Content-Type": "application/json"}, "json") + engine.test("PU16", "Add new user U3 admin", "POST", "/admin/v1/users", + headers_json, {"username": "U3", "projects": ["P2"], "password": "pw3"}, (201, 204), + {"Location": "/admin/v1/users/", "Content-Type": "application/json"}, "json") + engine.test("PU17", "Edit user projects admin", "PUT", "/admin/v1/users/U3", headers_json, + {"projects": ["P2"]}, 204, None, None) + + engine.test("PU18", "Delete project P2 conflict", "DELETE", "/admin/v1/projects/P2", headers_json, None, 409, + r_header_json, "json") + engine.test("PU19", "Delete project P2 forcing", "DELETE", "/admin/v1/projects/P2?FORCE=True", headers_json, + None, 204, None, None) + + engine.test("PU20", "Delete user U1. Conflict deleting own user", "DELETE", "/admin/v1/users/U1", headers_json, + None, 409, r_header_json, "json") + engine.test("PU21", "Delete user U2", "DELETE", "/admin/v1/users/U2", headers_json, None, 204, None, None) + engine.test("PU22", "Delete user U3", "DELETE", "/admin/v1/users/U3", headers_json, None, 204, None, None) + # change to admin + engine.remove_authorization() # To force get authorization + engine.get_autorization() + engine.test("PU23", "Delete user U1", "DELETE", "/admin/v1/users/U1", headers_json, None, 204, None, None) + engine.test("PU24", "Delete project P1", "DELETE", "/admin/v1/projects/P1", headers_json, None, 204, None, None) + engine.test("PU25", "Delete project Padmin", "DELETE", "/admin/v1/projects/Padmin", headers_json, None, 204, + None, None) + + class TestFakeVim: description = "Creates/edit/delete fake VIMs and SDN controllers" @@ -348,7 +465,7 @@ class TestFakeVim: ]} ] - def run(self, engine, test_osm, manual_check): + def run(self, engine, test_osm, manual_check, test_params=None): vim_bad = self.vim.copy() vim_bad.pop("name") @@ -393,33 +510,66 @@ class TestVIMSDN(TestFakeVim): def __init__(self): TestFakeVim.__init__(self) - def run(self, engine, test_osm, manual_check): + def run(self, engine, test_osm, manual_check, test_params=None): engine.get_autorization() # Added SDN - engine.test("SDN1", "Create SDN", "POST", "/admin/v1/sdns", headers_json, self.sdn, (201, 204), + engine.test("VIMSDN1", "Create SDN", "POST", "/admin/v1/sdns", headers_json, self.sdn, (201, 204), {"Location": "/admin/v1/sdns/", "Content-Type": "application/json"}, "json") - sleep(5) + # sleep(5) # Edit SDN - engine.test("SDN1", "Edit SDN", "PATCH", "/admin/v1/sdns/", headers_json, {"name": "new_sdn_name"}, - (200, 201, 204), r_header_json, "json") - sleep(5) + engine.test("VIMSDN2", "Edit SDN", "PATCH", "/admin/v1/sdns/", headers_json, {"name": "new_sdn_name"}, + 204, None, None) + # sleep(5) # VIM with SDN - self.vim["config"]["sdn-controller"] = engine.test_ids["SDN1"] + self.vim["config"]["sdn-controller"] = engine.test_ids["VIMSDN1"] self.vim["config"]["sdn-port-mapping"] = self.port_mapping - engine.test("VIM2", "Create VIM", "POST", "/admin/v1/vim_accounts", headers_json, self.vim, (200, 204, 201), + engine.test("VIMSDN3", "Create VIM", "POST", "/admin/v1/vim_accounts", headers_json, self.vim, (200, 204, 201), {"Location": "/admin/v1/vim_accounts/", "Content-Type": "application/json"}, "json"), self.port_mapping[0]["compute_node"] = "compute node XX" - engine.test("VIMSDN2", "Edit VIM change port-mapping", "PUT", "/admin/v1/vim_accounts/", headers_json, - {"config": {"sdn-port-mapping": self.port_mapping}}, - (200, 201, 204), {"Content-Type": "application/json"}, "json") - engine.test("VIMSDN3", "Edit VIM remove port-mapping", "PUT", "/admin/v1/vim_accounts/", headers_json, - {"config": {"sdn-port-mapping": None}}, - (200, 201, 204), {"Content-Type": "application/json"}, "json") - engine.test("VIMSDN4", "Delete VIM remove port-mapping", "DELETE", "/admin/v1/vim_accounts/", - headers_json, None, (202, 201, 204), None, 0) - engine.test("VIMSDN5", "Delete SDN", "DELETE", "/admin/v1/sdns/", headers_json, None, - (202, 201, 204), None, 0) + engine.test("VIMSDN4", "Edit VIM change port-mapping", "PUT", "/admin/v1/vim_accounts/", headers_json, + {"config": {"sdn-port-mapping": self.port_mapping}}, 204, None, None) + engine.test("VIMSDN5", "Edit VIM remove port-mapping", "PUT", "/admin/v1/vim_accounts/", headers_json, + {"config": {"sdn-port-mapping": None}}, 204, None, None) + + if not test_osm: + # delete with FORCE + engine.test("VIMSDN6", "Delete VIM remove port-mapping", "DELETE", + "/admin/v1/vim_accounts/?FORCE=True", headers_json, None, 202, None, 0) + engine.test("VIMSDN7", "Delete SDNC", "DELETE", "/admin/v1/sdns/?FORCE=True", headers_json, None, + 202, None, 0) + + engine.test("VIMSDN8", "Check VIM is deleted", "GET", "/admin/v1/vim_accounts/", headers_yaml, + None, 404, r_header_yaml, "yaml") + engine.test("VIMSDN9", "Check SDN is deleted", "GET", "/admin/v1/sdns/", headers_yaml, None, + 404, r_header_yaml, "yaml") + else: + # delete and wait until is really deleted + engine.test("VIMSDN6", "Delete VIM remove port-mapping", "DELETE", "/admin/v1/vim_accounts/", + headers_json, None, (202, 201, 204), None, 0) + engine.test("VIMSDN7", "Delete SDN", "DELETE", "/admin/v1/sdns/", headers_json, None, + (202, 201, 204), None, 0) + wait = timeout + while wait >= 0: + r = engine.test("VIMSDN8", "Check VIM is deleted", "GET", "/admin/v1/vim_accounts/", + headers_yaml, None, None, r_header_yaml, "yaml") + if r.status_code == 404: + break + elif r.status_code == 200: + wait -= 5 + sleep(5) + else: + raise TestException("Vim created at 'VIMSDN3' is not delete after {} seconds".format(timeout)) + while wait >= 0: + r = engine.test("VIMSDN9", "Check SDNC is deleted", "GET", "/admin/v1/sdns/", + headers_yaml, None, None, r_header_yaml, "yaml") + if r.status_code == 404: + break + elif r.status_code == 200: + wait -= 5 + sleep(5) + else: + raise TestException("SDNC created at 'VIMSDN1' is not delete after {} seconds".format(timeout)) class TestDeploy: @@ -431,14 +581,23 @@ class TestDeploy: self.vim_id = None self.nsd_test = None self.ns_test = None + self.ns_id = None self.vnfds_test = [] + self.vnfds_id = [] self.descriptor_url = "https://osm-download.etsi.org/ftp/osm-3.0-three/2nd-hackfest/packages/" self.vnfd_filenames = ("cirros_vnf.tar.gz",) self.nsd_filename = "cirros_2vnf_ns.tar.gz" self.uses_configuration = False + self.uss = {} + self.passwds = {} + self.cmds = {} + self.keys = {} + self.timeout = 120 + self.passed_tests = 0 + self.total_tests = 0 def create_descriptors(self, engine): - temp_dir = os.path.dirname(__file__) + "/temp/" + temp_dir = os.path.dirname(os.path.abspath(__file__)) + "/temp/" if not os.path.exists(temp_dir): os.makedirs(temp_dir) for vnfd_filename in self.vnfd_filenames: @@ -461,17 +620,21 @@ class TestDeploy: headers = headers_zip_yaml if self.step % 2 == 0: # vnfd CREATE AND UPLOAD in one step: - engine.test("DEPLOY{}".format(self.step), "Onboard VNFD in one step", "POST", + test_name = "DEPLOY{}".format(self.step) + engine.test(test_name, "Onboard VNFD in one step", "POST", "/vnfpkgm/v1/vnf_packages_content", headers, "@b" + vnfd_filename_path, 201, {"Location": "/vnfpkgm/v1/vnf_packages_content/", "Content-Type": "application/yaml"}, yaml) - self.vnfds_test.append("DEPLOY" + str(self.step)) + self.vnfds_test.append(test_name) + self.vnfds_id.append(engine.test_ids[test_name]) self.step += 1 else: # vnfd CREATE AND UPLOAD ZIP - engine.test("DEPLOY{}".format(self.step), "Onboard VNFD step 1", "POST", "/vnfpkgm/v1/vnf_packages", + test_name = "DEPLOY{}".format(self.step) + engine.test(test_name, "Onboard VNFD step 1", "POST", "/vnfpkgm/v1/vnf_packages", headers_json, None, 201, {"Location": "/vnfpkgm/v1/vnf_packages/", "Content-Type": "application/json"}, "json") - self.vnfds_test.append("DEPLOY" + str(self.step)) + self.vnfds_test.append(test_name) + self.vnfds_id.append(engine.test_ids[test_name]) self.step += 1 # location = r.headers["Location"] # vnfd_id = location[location.rfind("/")+1:] @@ -537,6 +700,7 @@ class TestDeploy: headers_yaml, ns_data_text, 201, {"Location": "nslcm/v1/ns_instances/", "Content-Type": "application/yaml"}, "yaml") self.ns_test = "DEPLOY{}".format(self.step) + self.ns_id = engine.test_ids[self.ns_test] engine.test_ids[self.ns_test] self.step += 1 r = engine.test("DEPLOY{}".format(self.step), "Instantiate NS step 2", "POST", @@ -563,7 +727,7 @@ class TestDeploy: raise TestException("NS instantiate is not done after {} seconds".format(timeout_deploy)) self.step += 1 - def _wait_nslcmop_ready(self, engine, nslcmop_test, timeout_deploy): + def _wait_nslcmop_ready(self, engine, nslcmop_test, timeout_deploy, expected_fail=False): wait = timeout while wait >= 0: r = engine.test("DEPLOY{}".format(self.step), "Wait to ns lcm operation complete", "GET", @@ -571,9 +735,14 @@ class TestDeploy: 200, r_header_json, "json") nslcmop = r.json() if "COMPLETED" in nslcmop["operationState"]: + if expected_fail: + raise TestException("NS terminate has success, expecting failing: {}".format( + nslcmop["detailed-status"])) break elif "FAILED" in nslcmop["operationState"]: - raise TestException("NS terminate has failed: {}".format(nslcmop["detailed-status"])) + if not expected_fail: + raise TestException("NS terminate has failed: {}".format(nslcmop["detailed-status"])) + break wait -= 5 sleep(5) else: @@ -612,8 +781,88 @@ class TestDeploy: if not isinstance(nslcmops, list) or nslcmops: raise TestException("NS {} deleted but with ns_lcm_op_occ active: {}".format(self.ns_test, nslcmops)) - def test_ns(self, engine, test_osm): - pass + def test_ns(self, engine, test_osm, commands=None, users=None, passwds=None, keys=None, timeout=0): + + n = 0 + r = engine.test("TEST_NS{}".format(n), "GET VNFR_IDs", "GET", + "/nslcm/v1/ns_instances/{}".format(self.ns_id), headers_json, None, + 200, r_header_json, "json") + n += 1 + ns_data = r.json() + + vnfr_list = ns_data['constituent-vnfr-ref'] + time = 0 + + for vnfr_id in vnfr_list: + self.total_tests += 1 + r = engine.test("TEST_NS{}".format(n), "GET IP_ADDRESS OF VNFR", "GET", + "/nslcm/v1/vnfrs/{}".format(vnfr_id), headers_json, None, + 200, r_header_json, "json") + n += 1 + vnfr_data = r.json() + + if vnfr_data.get("ip-address"): + name = "TEST_NS{}".format(n) + description = "Run tests in VNFR with IP {}".format(vnfr_data['ip-address']) + n += 1 + test_description = "Test {} {}".format(name, description) + logger.warning(test_description) + vnf_index = str(vnfr_data["member-vnf-index-ref"]) + while timeout >= time: + result, message = self.do_checks([vnfr_data["ip-address"]], + vnf_index=vnfr_data["member-vnf-index-ref"], + commands=commands.get(vnf_index), user=users.get(vnf_index), + passwd=passwds.get(vnf_index), key=keys.get(vnf_index)) + if result == 1: + logger.warning(message) + break + elif result == 0: + time += 20 + sleep(20) + elif result == -1: + logger.critical(message) + break + else: + time -= 20 + logger.critical(message) + else: + logger.critical("VNFR {} has not mgmt address. Check failed".format(vnfr_id)) + + def do_checks(self, ip, vnf_index, commands=[], user=None, passwd=None, key=None): + try: + import urllib3 + from pssh.clients import ParallelSSHClient + from pssh.utils import load_private_key + from ssh2 import exceptions as ssh2Exception + except ImportError as e: + logger.critical("package or/and is not installed. Please add it with 'pip3 install " + "parallel-ssh' and/or 'pip3 install urllib3': {}".format(e)) + urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning) + try: + p_host = os.environ.get("PROXY_HOST") + p_user = os.environ.get("PROXY_USER") + p_password = os.environ.get("PROXY_PASSWD") + + if key: + pkey = load_private_key(key) + else: + pkey = None + + client = ParallelSSHClient(ip, user=user, password=passwd, pkey=pkey, proxy_host=p_host, + proxy_user=p_user, proxy_password=p_password, timeout=10, num_retries=0) + for cmd in commands: + output = client.run_command(cmd) + client.join(output) + if output[ip[0]].exit_code: + return -1, " VNFR {} could not be checked: {}".format(ip[0], output[ip[0]].stderr) + else: + self.passed_tests += 1 + return 1, " Test successful" + except (ssh2Exception.ChannelFailure, ssh2Exception.SocketDisconnectError, ssh2Exception.SocketTimeout, + ssh2Exception.SocketRecvError) as e: + return 0, "Timeout accessing the VNFR {}: {}".format(ip[0], str(e)) + except Exception as e: + return -1, "ERROR checking the VNFR {}: {}".format(ip[0], str(e)) def aditional_operations(self, engine, test_osm, manual_check): pass @@ -644,10 +893,16 @@ class TestDeploy: if manual_check: input('NS has been deployed. Perform manual check and press enter to resume') else: - self.test_ns(engine, test_osm) + self.test_ns(engine, test_osm, self.cmds, self.uss, self.pss, self.keys, self.timeout) self.aditional_operations(engine, test_osm, manual_check) self.terminate(engine) self.delete_descriptors(engine) + self.print_results() + + def print_results(self): + print("\n\n\n--------------------------------------------") + print("TEST RESULTS:\n PASSED TESTS: {} - TOTAL TESTS: {}".format(self.total_tests, self.passed_tests)) + print("--------------------------------------------") class TestDeployHackfestCirros(TestDeploy): @@ -657,9 +912,85 @@ class TestDeployHackfestCirros(TestDeploy): super().__init__() self.vnfd_filenames = ("cirros_vnf.tar.gz",) self.nsd_filename = "cirros_2vnf_ns.tar.gz" + self.cmds = {'1': ['ls -lrt', ], '2': ['ls -lrt', ]} + self.uss = {'1': "cirros", '2': "cirros"} + self.pss = {'1': "cubswin:)", '2': "cubswin:)"} - def run(self, engine, test_osm, manual_check, test_params=None): - super().run(engine, test_osm, manual_check, test_params) + +class TestDeployHackfest1(TestDeploy): + description = "Load and deploy Hackfest_1_vnfd example" + + def __init__(self): + super().__init__() + self.vnfd_filenames = ("hackfest_1_vnfd.tar.gz",) + self.nsd_filename = "hackfest_1_nsd.tar.gz" + # self.cmds = {'1': ['ls -lrt', ], '2': ['ls -lrt', ]} + # self.uss = {'1': "cirros", '2': "cirros"} + # self.pss = {'1': "cubswin:)", '2': "cubswin:)"} + + +class TestDeployHackfestCirrosScaling(TestDeploy): + description = "Load and deploy Hackfest cirros_2vnf_ns example with scaling modifications" + + def __init__(self): + super().__init__() + self.vnfd_filenames = ("cirros_vnf.tar.gz",) + self.nsd_filename = "cirros_2vnf_ns.tar.gz" + + def create_descriptors(self, engine): + super().create_descriptors(engine) + # Modify VNFD to add scaling and count=2 + payload = """ + vdu: + "$id: 'cirros_vnfd-VM'": + count: 2 + scaling-group-descriptor: + - name: "scale_cirros" + max-instance-count: 2 + vdu: + - vdu-id-ref: cirros_vnfd-VM + count: 2 + """ + engine.test("DEPLOY{}".format(self.step), "Edit VNFD ", "PATCH", + "/vnfpkgm/v1/vnf_packages/{}".format(self.vnfds_id[0]), + headers_yaml, payload, 204, None, None) + self.step += 1 + + def aditional_operations(self, engine, test_osm, manual_check): + if not test_osm: + return + # 2 perform scale out twice + payload = '{scaleType: SCALE_VNF, scaleVnfData: {scaleVnfType: SCALE_OUT, scaleByStepData: ' \ + '{scaling-group-descriptor: scale_cirros, member-vnf-index: "1"}}}' + for i in range(0, 2): + engine.test("DEPLOY{}".format(self.step), "Execute scale action over NS", "POST", + "/nslcm/v1/ns_instances/<{}>/scale".format(self.ns_test), headers_yaml, payload, + 201, {"Location": "nslcm/v1/ns_lcm_op_occs/", "Content-Type": "application/yaml"}, "yaml") + nslcmop2_scale_out = "DEPLOY{}".format(self.step) + self._wait_nslcmop_ready(engine, nslcmop2_scale_out, timeout_deploy) + if manual_check: + input('NS scale out done. Check that two more vdus are there') + # TODO check automatic + + # 2 perform scale in + payload = '{scaleType: SCALE_VNF, scaleVnfData: {scaleVnfType: SCALE_IN, scaleByStepData: ' \ + '{scaling-group-descriptor: scale_cirros, member-vnf-index: "1"}}}' + for i in range(0, 2): + engine.test("DEPLOY{}".format(self.step), "Execute scale IN action over NS", "POST", + "/nslcm/v1/ns_instances/<{}>/scale".format(self.ns_test), headers_yaml, payload, + 201, {"Location": "nslcm/v1/ns_lcm_op_occs/", "Content-Type": "application/yaml"}, "yaml") + nslcmop2_scale_in = "DEPLOY{}".format(self.step) + self._wait_nslcmop_ready(engine, nslcmop2_scale_in, timeout_deploy) + if manual_check: + input('NS scale in done. Check that two less vdus are there') + # TODO check automatic + + # perform scale in that must fail as reached limit + engine.test("DEPLOY{}".format(self.step), "Execute scale IN out of limit action over NS", "POST", + "/nslcm/v1/ns_instances/<{}>/scale".format(self.ns_test), headers_yaml, payload, + 201, {"Location": "nslcm/v1/ns_lcm_op_occs/", "Content-Type": "application/yaml"}, "yaml") + nslcmop2_scale_in = "DEPLOY{}".format(self.step) + self._wait_nslcmop_ready(engine, nslcmop2_scale_in, timeout_deploy, expected_fail=True) class TestDeployIpMac(TestDeploy): @@ -671,6 +1002,10 @@ class TestDeployIpMac(TestDeploy): self.nsd_filename = "scenario_2vdu_set_ip_mac.yaml" self.descriptor_url = \ "https://osm.etsi.org/gitweb/?p=osm/RO.git;a=blob_plain;f=test/RO_tests/v3_2vdu_set_ip_mac/" + self.cmds = {'1': ['ls -lrt', ], '2': ['ls -lrt', ]} + self.uss = {'1': "osm", '2': "osm"} + self.pss = {'1': "osm4u", '2': "osm4u"} + self.timeout = 360 def run(self, engine, test_osm, manual_check, test_params=None): # super().run(engine, test_osm, manual_check, test_params) @@ -719,7 +1054,7 @@ class TestDeployIpMac(TestDeploy): "interface": [ { "name": "iface21", - "ip-address": "10.31.31.21", + "ip-address": "10.31.31.22", "mac-address": "52:33:44:55:66:21" }, ], @@ -728,6 +1063,7 @@ class TestDeployIpMac(TestDeploy): }, ] } + super().run(engine, test_osm, manual_check, test_params={"ns-config": instantiation_params}) @@ -739,6 +1075,9 @@ class TestDeployHackfest4(TestDeploy): self.vnfd_filenames = ("hackfest_4_vnfd.tar.gz",) self.nsd_filename = "hackfest_4_nsd.tar.gz" self.uses_configuration = True + self.cmds = {'1': ['ls -lrt', ], '2': ['ls -lrt', ]} + self.uss = {'1': "ubuntu", '2': "ubuntu"} + self.pss = {'1': "osm4u", '2': "osm4u"} def create_descriptors(self, engine): super().create_descriptors(engine) @@ -776,15 +1115,9 @@ class TestDeployHackfest4(TestDeploy): default-value: '/home/ubuntu/touched' """ engine.test("DEPLOY{}".format(self.step), "Edit VNFD ", "PATCH", - "/vnfpkgm/v1/vnf_packages/<{}>".format(self.vnfds_test[0]), - headers_yaml, payload, 200, - r_header_yaml, yaml) - self.vnfds_test.append("DEPLOY" + str(self.step)) + "/vnfpkgm/v1/vnf_packages/<{}>".format(self.vnfds_test[0]), headers_yaml, payload, 204, None, None) self.step += 1 - def run(self, engine, test_osm, manual_check, test_params=None): - super().run(engine, test_osm, manual_check, test_params) - class TestDeployHackfest3Charmed(TestDeploy): description = "Load and deploy Hackfest 3charmed_ns example. Modifies it for adding scaling and performs " \ @@ -795,6 +1128,9 @@ class TestDeployHackfest3Charmed(TestDeploy): self.vnfd_filenames = ("hackfest_3charmed_vnfd.tar.gz",) self.nsd_filename = "hackfest_3charmed_nsd.tar.gz" self.uses_configuration = True + self.cmds = {'1': [''], '2': ['ls -lrt /home/ubuntu/first-touch', ]} + self.uss = {'1': "ubuntu", '2': "ubuntu"} + self.pss = {'1': "osm4u", '2': "osm4u"} # def create_descriptors(self, engine): # super().create_descriptors(engine) @@ -853,7 +1189,11 @@ class TestDeployHackfest3Charmed(TestDeploy): if manual_check: input('NS service primitive has been executed. Check that file /home/ubuntu/OSMTESTNBI is present at ' 'TODO_PUT_IP') - # TODO check automatic + else: + cmds = {'1': [''], '2': ['ls -lrt /home/ubuntu/OSMTESTNBI', ]} + uss = {'1': "ubuntu", '2': "ubuntu"} + pss = {'1': "osm4u", '2': "osm4u"} + self.test_ns(engine, test_osm, cmds, uss, pss, self.keys, self.timeout) # # 2 perform scale out # payload = '{scaleType: SCALE_VNF, scaleVnfData: {scaleVnfType: SCALE_OUT, scaleByStepData: ' \ @@ -879,8 +1219,119 @@ class TestDeployHackfest3Charmed(TestDeploy): # input('NS scale in done. Check that file /home/ubuntu/touched is updated and new VM is deleted') # # TODO check automatic + +class TestDescriptors: + description = "Test VNFD, NSD, PDU descriptors CRUD and dependencies" + + def __init__(self): + self.step = 0 + self.vnfd_filename = "hackfest_3charmed_vnfd.tar.gz" + self.nsd_filename = "hackfest_3charmed_nsd.tar.gz" + self.descriptor_url = "https://osm-download.etsi.org/ftp/osm-3.0-three/2nd-hackfest/packages/" + self.vnfd_id = None + self.nsd_id = None + def run(self, engine, test_osm, manual_check, test_params=None): - super().run(engine, test_osm, manual_check, test_params) + engine.get_autorization() + temp_dir = os.path.dirname(os.path.abspath(__file__)) + "/temp/" + if not os.path.exists(temp_dir): + os.makedirs(temp_dir) + + # download files + for filename in (self.vnfd_filename, self.nsd_filename): + filename_path = temp_dir + filename + if not os.path.exists(filename_path): + with open(filename_path, "wb") as file: + response = requests.get(self.descriptor_url + filename) + if response.status_code >= 300: + raise TestException("Error downloading descriptor from '{}': {}".format( + self.descriptor_url + filename, response.status_code)) + file.write(response.content) + + vnfd_filename_path = temp_dir + self.vnfd_filename + nsd_filename_path = temp_dir + self.nsd_filename + + # vnfd CREATE AND UPLOAD in one step: + test_name = "DESCRIPTOR{}".format(self.step) + engine.test(test_name, "Onboard VNFD in one step", "POST", + "/vnfpkgm/v1/vnf_packages_content", headers_zip_yaml, "@b" + vnfd_filename_path, 201, + {"Location": "/vnfpkgm/v1/vnf_packages_content/", "Content-Type": "application/yaml"}, "yaml") + self.vnfd_id = engine.test_ids[test_name] + self.step += 1 + + # get vnfd descriptor + engine.test("DESCRIPTOR" + str(self.step), "Get VNFD descriptor", "GET", + "/vnfpkgm/v1/vnf_packages/{}".format(self.vnfd_id), headers_yaml, None, 200, r_header_yaml, "yaml") + self.step += 1 + + # get vnfd file descriptor + engine.test("DESCRIPTOR" + str(self.step), "Get VNFD file descriptor", "GET", + "/vnfpkgm/v1/vnf_packages/{}/vnfd".format(self.vnfd_id), headers_text, None, 200, + r_header_text, "text", temp_dir+"vnfd-yaml") + self.step += 1 + # TODO compare files: diff vnfd-yaml hackfest_3charmed_vnfd/hackfest_3charmed_vnfd.yaml + + # get vnfd zip file package + engine.test("DESCRIPTOR" + str(self.step), "Get VNFD zip package", "GET", + "/vnfpkgm/v1/vnf_packages/{}/package_content".format(self.vnfd_id), headers_zip, None, 200, + r_header_zip, "zip", temp_dir+"vnfd-zip") + self.step += 1 + # TODO compare files: diff vnfd-zip hackfest_3charmed_vnfd.tar.gz + + # get vnfd artifact + engine.test("DESCRIPTOR" + str(self.step), "Get VNFD artifact package", "GET", + "/vnfpkgm/v1/vnf_packages/{}/artifacts/icons/osm.png".format(self.vnfd_id), headers_zip, None, 200, + r_header_octect, "octet-string", temp_dir+"vnfd-icon") + self.step += 1 + # TODO compare files: diff vnfd-icon hackfest_3charmed_vnfd/icons/osm.png + + # nsd CREATE AND UPLOAD in one step: + test_name = "DESCRIPTOR{}".format(self.step) + engine.test(test_name, "Onboard NSD in one step", "POST", + "/nsd/v1/ns_descriptors_content", headers_zip_yaml, "@b" + nsd_filename_path, 201, + {"Location": "/nsd/v1/ns_descriptors_content/", "Content-Type": "application/yaml"}, "yaml") + self.nsd_id = engine.test_ids[test_name] + self.step += 1 + + # get nsd descriptor + engine.test("DESCRIPTOR" + str(self.step), "Get NSD descriptor", "GET", + "/nsd/v1/ns_descriptors/{}".format(self.nsd_id), headers_yaml, None, 200, r_header_yaml, "yaml") + self.step += 1 + + # get nsd file descriptor + engine.test("DESCRIPTOR" + str(self.step), "Get NSD file descriptor", "GET", + "/nsd/v1/ns_descriptors/{}/nsd".format(self.nsd_id), headers_text, None, 200, + r_header_text, "text", temp_dir+"nsd-yaml") + self.step += 1 + # TODO compare files: diff nsd-yaml hackfest_3charmed_nsd/hackfest_3charmed_nsd.yaml + + # get nsd zip file package + engine.test("DESCRIPTOR" + str(self.step), "Get NSD zip package", "GET", + "/nsd/v1/ns_descriptors/{}/nsd_content".format(self.nsd_id), headers_zip, None, 200, + r_header_zip, "zip", temp_dir+"nsd-zip") + self.step += 1 + # TODO compare files: diff nsd-zip hackfest_3charmed_nsd.tar.gz + + # get nsd artifact + engine.test("DESCRIPTOR" + str(self.step), "Get NSD artifact package", "GET", + "/nsd/v1/ns_descriptors/{}/artifacts/icons/osm.png".format(self.nsd_id), headers_zip, None, 200, + r_header_octect, "octet-string", temp_dir+"nsd-icon") + self.step += 1 + # TODO compare files: diff nsd-icon hackfest_3charmed_nsd/icons/osm.png + + # vnfd DELETE + test_rest.test("DESCRIPTOR" + str(self.step), "Delete VNFD conflict", "DELETE", + "/vnfpkgm/v1/vnf_packages/{}".format(self.vnfd_id), headers_yaml, None, 409, None, None) + self.step += 1 + + test_rest.test("DESCRIPTOR" + str(self.step), "Delete VNFD force", "DELETE", + "/vnfpkgm/v1/vnf_packages/{}?FORCE=TRUE".format(self.vnfd_id), headers_yaml, None, 204, None, 0) + self.step += 1 + + # nsd DELETE + test_rest.test("DESCRIPTOR" + str(self.step), "Delete NSD", "DELETE", + "/nsd/v1/ns_descriptors/{}".format(self.nsd_id), headers_yaml, None, 204, None, 0) + self.step += 1 if __name__ == "__main__": @@ -906,12 +1357,17 @@ if __name__ == "__main__": test_classes = { "NonAuthorized": TestNonAuthorized, "FakeVIM": TestFakeVim, + "TestUsersProjects": TestUsersProjects, "VIM-SDN": TestVIMSDN, "Deploy-Custom": TestDeploy, "Deploy-Hackfest-Cirros": TestDeployHackfestCirros, + "Deploy-Hackfest-Cirros-Scaling": TestDeployHackfestCirrosScaling, "Deploy-Hackfest-3Charmed": TestDeployHackfest3Charmed, "Deploy-Hackfest-4": TestDeployHackfest4, "Deploy-CirrosMacIp": TestDeployIpMac, + "TestDescriptors": TestDescriptors, + "TestDeployHackfest1": TestDeployHackfest1, + # "Deploy-MultiVIM": TestDeployMultiVIM, } test_to_do = [] test_params = {} @@ -989,136 +1445,6 @@ if __name__ == "__main__": exit(0) # get token - - # # tests once authorized - # for t in test_authorized_list: - # test_rest.test(*t) - # - # # tests admin - # for t in test_admin_list1: - # test_rest.test(*t) - # - # # vnfd CREATE - # r = test_rest.test("VNFD1", "Onboard VNFD step 1", "POST", "/vnfpkgm/v1/vnf_packages", headers_json, None, - # 201, {"Location": "/vnfpkgm/v1/vnf_packages/", "Content-Type": "application/json"}, "json") - # location = r.headers["Location"] - # vnfd_id = location[location.rfind("/")+1:] - # # print(location, vnfd_id) - # - # # vnfd UPLOAD test - # r = test_rest.test("VNFD2", "Onboard VNFD step 2 as TEXT", "PUT", - # "/vnfpkgm/v1/vnf_packages/{}/package_content".format(vnfd_id), - # r_header_text, "@./cirros_vnf/cirros_vnfd.yaml", 204, None, 0) - # - # # vnfd SHOW OSM format - # r = test_rest.test("VNFD3", "Show VNFD OSM format", "GET", - # "/vnfpkgm/v1/vnf_packages_content/{}".format(vnfd_id), - # headers_json, None, 200, r_header_json, "json") - # - # # vnfd SHOW text - # r = test_rest.test("VNFD4", "Show VNFD SOL005 text", "GET", - # "/vnfpkgm/v1/vnf_packages/{}/package_content".format(vnfd_id), - # headers_text, None, 200, r_header_text, "text") - # - # # vnfd UPLOAD ZIP - # makedirs("temp", exist_ok=True) - # tar = tarfile.open("temp/cirros_vnf.tar.gz", "w:gz") - # tar.add("cirros_vnf") - # tar.close() - # r = test_rest.test("VNFD5", "Onboard VNFD step 3 replace with ZIP", "PUT", - # "/vnfpkgm/v1/vnf_packages/{}/package_content".format(vnfd_id), - # r_header_zip, "@b./temp/cirros_vnf.tar.gz", 204, None, 0) - # - # # vnfd SHOW OSM format - # r = test_rest.test("VNFD6", "Show VNFD OSM format", "GET", - # "/vnfpkgm/v1/vnf_packages_content/{}".format(vnfd_id), - # headers_json, None, 200, r_header_json, "json") - # - # # vnfd SHOW zip - # r = test_rest.test("VNFD7", "Show VNFD SOL005 zip", "GET", - # "/vnfpkgm/v1/vnf_packages/{}/package_content".format(vnfd_id), - # headers_zip, None, 200, r_header_zip, "zip") - # # vnfd SHOW descriptor - # r = test_rest.test("VNFD8", "Show VNFD descriptor", "GET", - # "/vnfpkgm/v1/vnf_packages/{}/vnfd".format(vnfd_id), - # headers_text, None, 200, r_header_text, "text") - # # vnfd SHOW actifact - # r = test_rest.test("VNFD9", "Show VNFD artifact", "GET", - # "/vnfpkgm/v1/vnf_packages/{}/artifacts/icons/cirros-64.png".format(vnfd_id), - # headers_text, None, 200, r_header_octect, "text") - # - # # # vnfd DELETE - # # r = test_rest.test("VNFD10", "Delete VNFD SOL005 text", "DELETE", - # # "/vnfpkgm/v1/vnf_packages/{}".format(vnfd_id), - # # headers_yaml, None, 204, None, 0) - # - # # nsd CREATE - # r = test_rest.test("NSD1", "Onboard NSD step 1", "POST", "/nsd/v1/ns_descriptors", headers_json, None, - # 201, {"Location": "/nsd/v1/ns_descriptors/", "Content-Type": "application/json"}, "json") - # location = r.headers["Location"] - # nsd_id = location[location.rfind("/")+1:] - # # print(location, nsd_id) - # - # # nsd UPLOAD test - # r = test_rest.test("NSD2", "Onboard NSD with missing vnfd", "PUT", - # "/nsd/v1/ns_descriptors/<>/nsd_content?constituent-vnfd.0.vnfd-id-ref" - # "=NONEXISTING-VNFD".format(nsd_id), - # r_header_text, "@./cirros_ns/cirros_nsd.yaml", 409, r_header_yaml, "yaml") - # - # # # VNF_CREATE - # # r = test_rest.test("VNFD5", "Onboard VNFD step 3 replace with ZIP", "PUT", - # # "/vnfpkgm/v1/vnf_packages/{}/package_content".format(vnfd_id), - # # r_header_zip, "@b./temp/cirros_vnf.tar.gz", 204, None, 0) - # - # r = test_rest.test("NSD2", "Onboard NSD step 2 as TEXT", "PUT", - # "/nsd/v1/ns_descriptors/{}/nsd_content".format(nsd_id), - # r_header_text, "@./cirros_ns/cirros_nsd.yaml", 204, None, 0) - # - # # nsd SHOW OSM format - # r = test_rest.test("NSD3", "Show NSD OSM format", "GET", "/nsd/v1/ns_descriptors_content/{}".format(nsd_id), - # headers_json, None, 200, r_header_json, "json") - # - # # nsd SHOW text - # r = test_rest.test("NSD4", "Show NSD SOL005 text", "GET", - # "/nsd/v1/ns_descriptors/{}/nsd_content".format(nsd_id), - # headers_text, None, 200, r_header_text, "text") - # - # # nsd UPLOAD ZIP - # makedirs("temp", exist_ok=True) - # tar = tarfile.open("temp/cirros_ns.tar.gz", "w:gz") - # tar.add("cirros_ns") - # tar.close() - # r = test_rest.test("NSD5", "Onboard NSD step 3 replace with ZIP", "PUT", - # "/nsd/v1/ns_descriptors/{}/nsd_content".format(nsd_id), - # r_header_zip, "@b./temp/cirros_ns.tar.gz", 204, None, 0) - # - # # nsd SHOW OSM format - # r = test_rest.test("NSD6", "Show NSD OSM format", "GET", "/nsd/v1/ns_descriptors_content/{}".format(nsd_id), - # headers_json, None, 200, r_header_json, "json") - # - # # nsd SHOW zip - # r = test_rest.test("NSD7","Show NSD SOL005 zip","GET", "/nsd/v1/ns_descriptors/{}/nsd_content".format(nsd_id), - # headers_zip, None, 200, r_header_zip, "zip") - # - # # nsd SHOW descriptor - # r = test_rest.test("NSD8", "Show NSD descriptor", "GET", "/nsd/v1/ns_descriptors/{}/nsd".format(nsd_id), - # headers_text, None, 200, r_header_text, "text") - # # nsd SHOW actifact - # r = test_rest.test("NSD9", "Show NSD artifact", "GET", - # "/nsd/v1/ns_descriptors/{}/artifacts/icons/osm_2x.png".format(nsd_id), - # headers_text, None, 200, r_header_octect, "text") - # - # # vnfd DELETE - # r = test_rest.test("VNFD10", "Delete VNFD conflict", "DELETE", "/vnfpkgm/v1/vnf_packages/{}".format(vnfd_id), - # headers_yaml, None, 409, r_header_yaml, "yaml") - # - # # nsd DELETE - # r = test_rest.test("NSD10", "Delete NSD SOL005 text", "DELETE", "/nsd/v1/ns_descriptors/{}".format(nsd_id), - # headers_yaml, None, 204, None, 0) - # - # # vnfd DELETE - # r = test_rest.test("VNFD10","Delete VNFD SOL005 text","DELETE", "/vnfpkgm/v1/vnf_packages/{}".format(vnfd_id), - # headers_yaml, None, 204, None, 0) print("PASS") except TestException as e: