X-Git-Url: https://osm.etsi.org/gitweb/?p=osm%2FNBI.git;a=blobdiff_plain;f=osm_nbi%2Ftests%2Ftest.py;h=3033a346d8abd64a43ee9f8562ed15cacbddc8b0;hp=bcdfadf0d6ef21c1307325b0be004fae4e0c01b5;hb=0952a48159c11b6d31fff6617f04f06351ed79f3;hpb=b57758d4a3fd88baa348cfa0812e9f4a742761d1 diff --git a/osm_nbi/tests/test.py b/osm_nbi/tests/test.py index bcdfadf..3033a34 100755 --- a/osm_nbi/tests/test.py +++ b/osm_nbi/tests/test.py @@ -1,6 +1,19 @@ #! /usr/bin/python3 # -*- coding: utf-8 -*- +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. + import getopt import sys import requests @@ -10,7 +23,11 @@ import yaml # import json # import tarfile from time import sleep +from random import randint import os +from sys import stderr +from uuid import uuid4 +import re __author__ = "Alfonso Tierno, alfonso.tiernosepulveda@telefonica.com" __date__ = "$2018-03-01$" @@ -21,7 +38,7 @@ version_date = "Oct 2018" def usage(): print("Usage: ", sys.argv[0], "[options]") print(" Performs system tests over running NBI. It can be used for real OSM test using option '--test-osm'") - print(" If this is the case env variables 'OSMNBITEST_VIM_NAME' must be suplied to create a VIM if not exist " + print(" If this is the case env variables 'OSMNBITEST_VIM_NAME' must be supplied to create a VIM if not exist " "where deployment is done") print("OPTIONS") print(" -h|--help: shows this help") @@ -57,28 +74,22 @@ def usage(): r_header_json = {"Content-type": "application/json"} -headers_json = { - "Content-type": "application/json", - "Accept": "application/json", -} +headers_json = {"Content-type": "application/json", "Accept": "application/json"} r_header_yaml = {"Content-type": "application/yaml"} -headers_yaml = { - "Content-type": "application/yaml", - "Accept": "application/yaml", -} +headers_yaml = {"Content-type": "application/yaml", "Accept": "application/yaml"} r_header_text = {"Content-type": "text/plain"} r_header_octect = {"Content-type": "application/octet-stream"} -headers_text = { - "Accept": "text/plain", -} +headers_text = {"Accept": "text/plain,application/yaml"} r_header_zip = {"Content-type": "application/zip"} -headers_zip = { - "Accept": "application/zip", -} -headers_zip_yaml = { - "Accept": "application/yaml", "Content-type": "application/zip" -} - +headers_zip = {"Accept": "application/zip,application/yaml"} +headers_zip_yaml = {"Accept": "application/yaml", "Content-type": "application/zip"} +headers_zip_json = {"Accept": "application/json", "Content-type": "application/zip"} +headers_txt_json = {"Accept": "application/json", "Content-type": "text/plain"} +r_headers_yaml_location_vnfd = {"Location": "/vnfpkgm/v1/vnf_packages_content/", "Content-Type": "application/yaml"} +r_headers_yaml_location_nsd = {"Location": "/nsd/v1/ns_descriptors_content/", "Content-Type": "application/yaml"} +r_headers_yaml_location_nst = {"Location": "/nst/v1/netslice_templates_content", "Content-Type": "application/yaml"} +r_headers_yaml_location_nslcmop = {"Location": "nslcm/v1/ns_lcm_op_occs/", "Content-Type": "application/yaml"} +r_headers_yaml_location_nsilcmop = {"Location": "/osm/nsilcm/v1/nsi_lcm_op_occs/", "Content-Type": "application/yaml"} # test ones authorized test_authorized_list = ( @@ -114,22 +125,32 @@ class TestRest: self.project = project self.vim_id = None # contains ID of tests obtained from Location response header. "" key contains last obtained id - self.test_ids = {} - self.old_test_description = "" + self.last_id = "" + self.test_name = None + self.step = 0 # number of subtest under test + self.passed_tests = 0 + self.failed_tests = 0 + + def set_test_name(self, test_name): + self.test_name = test_name + self.step = 0 + self.last_id = "" def set_header(self, header): self.s.headers.update(header) + def set_tet_name(self, test_name): + self.test_name = test_name + def unset_header(self, key): if key in self.s.headers: del self.s.headers[key] - def test(self, name, description, method, url, headers, payload, expected_codes, expected_headers, - expected_payload, store_file=None): + def test(self, description, method, url, headers, payload, expected_codes, expected_headers, + expected_payload, store_file=None, pooling=False): """ Performs an http request and check http code response. Exit if different than allowed. It get the returned id that can be used by following test in the URL with {name} where name is the name of the test - :param name: short name of the test :param description: description of the test :param method: HTTP method: GET,PUT,POST,DELETE,... :param url: complete URL or relative URL @@ -139,6 +160,7 @@ class TestRest: :param expected_headers: expected response headers, dict with key values :param expected_payload: expected payload, 0 if empty, 'yaml', 'json', 'text', 'zip', 'octet-stream' :param store_file: filename to store content + :param pooling: if True do not count neither log this test. Because a pooling is done with many equal requests :return: requests response """ r = None @@ -151,16 +173,8 @@ class TestRest: elif not url.startswith("http"): url = self.url_base + url - var_start = url.find("<") + 1 - while var_start: - var_end = url.find(">", var_start) - if var_end == -1: - break - var_name = url[var_start:var_end] - if var_name in self.test_ids: - url = url[:var_start-1] + self.test_ids[var_name] + url[var_end+1:] - var_start += len(self.test_ids[var_name]) - var_start = url.find("<", var_start) + 1 + # replace url <> with the last ID + url = url.replace("<>", self.last_id) if payload: if isinstance(payload, str): if payload.startswith("@"): @@ -174,14 +188,25 @@ class TestRest: elif isinstance(payload, dict): payload = json.dumps(payload) - test_description = "Test {} {} {} {}".format(name, description, method, url) - if self.old_test_description != test_description: - self.old_test_description = test_description + if not pooling: + test_description = "Test {}{} {} {} {}".format(self.test_name, self.step, description, method, url) logger.warning(test_description) + self.step += 1 stream = False if expected_payload in ("zip", "octet-string") or store_file: stream = True - r = getattr(self.s, method.lower())(url, data=payload, headers=headers, verify=self.verify, stream=stream) + __retry = 0 + while True: + try: + r = getattr(self.s, method.lower())(url, data=payload, headers=headers, verify=self.verify, + stream=stream) + break + except requests.exceptions.ConnectionError as e: + if __retry == 2: + raise + logger.error("Exception {}. Retrying".format(e)) + __retry += 1 + if expected_payload in ("zip", "octet-string") or store_file: logger.debug("RX {}".format(r.status_code)) else: @@ -239,20 +264,30 @@ class TestRest: if location: _id = location[location.rfind("/") + 1:] if _id: - self.test_ids[name] = str(_id) - self.test_ids[""] = str(_id) # last id + self.last_id = str(_id) + if not pooling: + self.passed_tests += 1 return r except TestException as e: + self.failed_tests += 1 r_status_code = None r_text = None if r: r_status_code = r.status_code r_text = r.text logger.error("{} \nRX code{}: {}".format(e, r_status_code, r_text)) - exit(1) + return None + # exit(1) except IOError as e: - logger.error("Cannot open file {}".format(e)) - exit(1) + if store_file: + logger.error("Cannot open file {}: {}".format(store_file, e)) + else: + logger.error("Exception: {}".format(e), exc_info=True) + self.failed_tests += 1 + return None + # exit(1) + except requests.exceptions.RequestException as e: + logger.error("Exception: {}".format(e)) def get_autorization(self): # user=None, password=None, project=None): if self.token: # and self.user == user and self.password == password and self.project == project: @@ -260,16 +295,18 @@ class TestRest: # self.user = user # self.password = password # self.project = project - r = self.test("TOKEN", "Obtain token", "POST", "/admin/v1/tokens", headers_json, + r = self.test("Obtain token", "POST", "/admin/v1/tokens", headers_json, {"username": self.user, "password": self.password, "project_id": self.project}, (200, 201), r_header_json, "json") + if not r: + return response = r.json() self.token = response["id"] self.set_header({"Authorization": "Bearer {}".format(self.token)}) def remove_authorization(self): if self.token: - self.test("TOKEN_DEL", "Delete token", "DELETE", "/admin/v1/tokens/{}".format(self.token), headers_json, + self.test("Delete token", "DELETE", "/admin/v1/tokens/{}".format(self.token), headers_json, None, (200, 201, 204), None, None) self.token = None self.unset_header("Authorization") @@ -286,8 +323,10 @@ class TestRest: else: vim_name = "fakeVim" # Get VIM - r = self.test("_VIMGET1", "Get VIM ID", "GET", "/admin/v1/vim_accounts?name={}".format(vim_name), headers_json, + r = self.test("Get VIM ID", "GET", "/admin/v1/vim_accounts?name={}".format(vim_name), headers_json, None, 200, r_header_json, "json") + if not r: + return vims = r.json() if vims: return vims[0]["_id"] @@ -310,22 +349,104 @@ class TestRest: else: vim_data = "{schema_version: '1.0', name: fakeVim, vim_type: openstack, vim_url: 'http://10.11.12.13/fake'"\ ", vim_tenant_name: 'vimtenant', vim_user: vimuser, vim_password: vimpassword}" - r = self.test("_VIMGET2", "Create VIM", "POST", "/admin/v1/vim_accounts", headers_yaml, vim_data, - (201), {"Location": "/admin/v1/vim_accounts/", "Content-Type": "application/yaml"}, "yaml") - location = r.headers.get("Location") - return location[location.rfind("/") + 1:] + self.test("Create VIM", "POST", "/admin/v1/vim_accounts", headers_yaml, vim_data, + (201, 202), {"Location": "/admin/v1/vim_accounts/", "Content-Type": "application/yaml"}, "yaml") + return self.last_id + + def print_results(self): + print("\n\n\n--------------------------------------------") + print("TEST RESULTS: Total: {}, Passed: {}, Failed: {}".format(self.passed_tests + self.failed_tests, + self.passed_tests, self.failed_tests)) + print("--------------------------------------------") + + def wait_until_delete(self, url_op, timeout_delete): + """ + Make a pooling until topic is not present, because of deleted + :param url_op: + :param timeout_delete: + :return: + """ + description = "Wait to topic being deleted" + test_description = "Test {}{} {} {} {}".format(self.test_name, self.step, description, "GET", url_op) + logger.warning(test_description) + self.step += 1 + + wait = timeout_delete + while wait >= 0: + r = self.test(description, "GET", url_op, headers_yaml, None, (200, 404), None, r_header_yaml, "yaml", + pooling=True) + if not r: + return + if r.status_code == 404: + self.passed_tests += 1 + break + elif r.status_code == 200: + wait -= 5 + sleep(5) + else: + raise TestException("Topic is not deleted after {} seconds".format(timeout_delete)) + self.failed_tests += 1 + + def wait_operation_ready(self, ns_nsi, opp_id, timeout, expected_fail=False): + """ + Wait until nslcmop or nsilcmop finished + :param ns_nsi: "ns" o "nsi" + :param opp_id: Id o fthe operation + :param timeout: + :param expected_fail: + :return: None. Updates passed/failed_tests + """ + if ns_nsi == "ns": + url_op = "/nslcm/v1/ns_lcm_op_occs/{}".format(opp_id) + else: + url_op = "/nsilcm/v1/nsi_lcm_op_occs/{}".format(opp_id) + description = "Wait to {} lcm operation complete".format(ns_nsi) + test_description = "Test {}{} {} {} {}".format(self.test_name, self.step, description, "GET", url_op) + logger.warning(test_description) + self.step += 1 + wait = timeout + while wait >= 0: + r = self.test(description, "GET", url_op, headers_json, None, + 200, r_header_json, "json", pooling=True) + if not r: + return + nslcmop = r.json() + if "COMPLETED" in nslcmop["operationState"]: + if expected_fail: + logger.error("NS terminate has success, expecting failing: {}".format(nslcmop["detailed-status"])) + self.failed_tests += 1 + else: + self.passed_tests += 1 + break + elif "FAILED" in nslcmop["operationState"]: + if not expected_fail: + logger.error("NS terminate has failed: {}".format(nslcmop["detailed-status"])) + self.failed_tests += 1 + else: + self.passed_tests += 1 + break + + print(".", end="", file=stderr) + wait -= 10 + sleep(10) + else: + self.failed_tests += 1 + logger.error("NS instantiate is not terminate after {} seconds".format(timeout)) + return + print("", file=stderr) class TestNonAuthorized: - description = "test invalid URLs. methods and no authorization" + description = "Test invalid URLs. methods and no authorization" @staticmethod def run(engine, test_osm, manual_check, test_params=None): + engine.set_test_name("NonAuth") engine.remove_authorization() test_not_authorized_list = ( - ("NA1", "Invalid token", "GET", "/admin/v1/users", headers_json, None, 401, r_header_json, "json"), - ("NA2", "Invalid URL", "POST", "/admin/v1/nonexist", headers_yaml, None, 405, r_header_yaml, "yaml"), - ("NA3", "Invalid version", "DELETE", "/admin/v2/users", headers_yaml, None, 405, r_header_yaml, "yaml"), + ("Invalid token", "GET", "/admin/v1/users", headers_json, None, 401, r_header_json, "json"), + ("Invalid URL", "POST", "/admin/v1/nonexist", headers_yaml, None, 405, r_header_yaml, "yaml"), + ("Invalid version", "DELETE", "/admin/v2/users", headers_yaml, None, 405, r_header_yaml, "yaml"), ) for t in test_not_authorized_list: engine.test(*t) @@ -336,95 +457,461 @@ class TestUsersProjects: @staticmethod def run(engine, test_osm, manual_check, test_params=None): + engine.set_test_name("UserProject") + # backend = test_params.get("backend") if test_params else None # UNUSED + + # Initialisation + p1 = p2 = p3 = None + padmin = pbad = None + u1 = u2 = u3 = u4 = None + engine.get_autorization() - engine.test("PU1", "Create project non admin", "POST", "/admin/v1/projects", headers_json, {"name": "P1"}, - (201, 204), {"Location": "/admin/v1/projects/", "Content-Type": "application/json"}, "json") - engine.test("PU2", "Create project admin", "POST", "/admin/v1/projects", headers_json, + + res = engine.test("Create project non admin 1", "POST", "/admin/v1/projects", headers_json, {"name": "P1"}, + (201, 204), {"Location": "/admin/v1/projects/", "Content-Type": "application/json"}, "json") + p1 = engine.last_id if res else None + + res = engine.test("Create project admin", "POST", "/admin/v1/projects", headers_json, + {"name": "Padmin", "admin": True}, (201, 204), + {"Location": "/admin/v1/projects/", "Content-Type": "application/json"}, "json") + padmin = engine.last_id if res else None + + res = engine.test("Create project bad format", "POST", "/admin/v1/projects", headers_json, {"name": 1}, + (400, 422), r_header_json, "json") + pbad = engine.last_id if res else None + + res = engine.test("Get project admin role", "GET", "/admin/v1/roles?name=project_admin", headers_json, {}, + (200), {"Content-Type": "application/json"}, "json") + rpa = res.json()[0]["_id"] if res else None + res = engine.test("Get project user role", "GET", "/admin/v1/roles?name=project_user", headers_json, {}, + (200), {"Content-Type": "application/json"}, "json") + rpu = res.json()[0]["_id"] if res else None + res = engine.test("Get system admin role", "GET", "/admin/v1/roles?name=system_admin", headers_json, {}, + (200), {"Content-Type": "application/json"}, "json") + rsa = res.json()[0]["_id"] if res else None + + data = {"username": "U1", "password": "pw1"} + p2 = uuid4().hex + data["project_role_mappings"] = [ + {"project": p1, "role": rpa}, + {"project": p2, "role": rpa}, + {"project": padmin, "role": rpu} + ] + rc = 201 + xhd = {"Location": "/admin/v1/users/", "Content-Type": "application/json"} + res = engine.test("Create user with bad project and force", "POST", "/admin/v1/users?FORCE=True", headers_json, + data, rc, xhd, "json") + if res: + u1 = engine.last_id + else: + # User is created sometimes even though an exception is raised + res = engine.test("Get user U1", "GET", "/admin/v1/users?username=U1", headers_json, {}, + (200), {"Content-Type": "application/json"}, "json") + u1 = res.json()[0]["_id"] if res else None + + data = {"username": "U2", "password": "pw2"} + data["project_role_mappings"] = [{"project": p1, "role": rpa}, {"project": padmin, "role": rsa}] + res = engine.test("Create user 2", "POST", "/admin/v1/users", headers_json, + data, 201, {"Location": "/admin/v1/users/", "Content-Type": "application/json"}, "json") + u2 = engine.last_id if res else None + + if u1: + ftt = "project_role_mappings" + xpr = [{"project": p1, "role": rpa}, {"project": padmin, "role": rpu}] + data = {ftt: xpr} + engine.test("Edit user U1, delete P2 project", "PATCH", "/admin/v1/users/"+u1, headers_json, + data, 204, None, None) + res = engine.test("Check user U1, contains the right projects", "GET", "/admin/v1/users/"+u1, + headers_json, None, 200, None, json) + if res: + rj = res.json() + xpr[0]["project_name"] = "P1" + xpr[0]["role_name"] = "project_admin" + xpr[1]["project_name"] = "Padmin" + xpr[1]["role_name"] = "project_user" + ok = True + for pr in rj[ftt]: + if pr not in xpr: + ok = False + for pr in xpr: + if pr not in rj[ftt]: + ok = False + if not ok: + logger.error("User {} '{}' are different than expected '{}'. Edition was not done properly" + .format(ftt, rj[ftt], xpr)) + engine.failed_tests += 1 + + p2 = None # To prevent deletion attempts + + # Add a test of 'default project' for Keystone? + + if u2: + engine.test("Edit user U2, change password", "PUT", "/admin/v1/users/"+u2, headers_json, + {"password": "pw2_new"}, 204, None, None) + + if p1: + engine.test("Change to project P1 non existing", "POST", "/admin/v1/tokens/", headers_json, + {"project_id": p1}, 401, r_header_json, "json") + + if u2 and p1: + res = engine.test("Change to user U2 project P1", "POST", "/admin/v1/tokens", headers_json, + {"username": "U2", "password": "pw2_new", "project_id": "P1"}, (200, 201), + r_header_json, "json") + if res: + rj = res.json() + engine.set_header({"Authorization": "Bearer {}".format(rj["id"])}) + + engine.test("Edit user projects non admin", "PUT", "/admin/v1/users/U1", headers_json, + {"remove_project_role_mappings": [{"project": "P1", "role": None}]}, + 401, r_header_json, "json") + + res = engine.test("Add new project non admin", "POST", "/admin/v1/projects", headers_json, + {"name": "P2"}, 401, r_header_json, "json") + if res is None or res.status_code == 201: + # The project has been created even though it shouldn't + res = engine.test("Get project P2", "GET", "/admin/v1/projects/P2", headers_json, None, + 200, r_header_json, "json") + p2 = res.json()["_id"] if res else None + + if p1: + data = {"username": "U3", "password": "pw3"} + data["project_role_mappings"] = [{"project": p1, "role": rpu}] + res = engine.test("Add new user non admin", "POST", "/admin/v1/users", headers_json, + data, 401, r_header_json, "json") + if res is None or res.status_code == 201: + # The user has been created even though it shouldn't + res = engine.test("Get user U3", "GET", "/admin/v1/users/U3", headers_json, None, + 200, r_header_json, "json") + u3 = res.json()["_id"] if res else None + else: + u3 = None + + if padmin: + res = engine.test("Change to user U2 project Padmin", "POST", "/admin/v1/tokens", headers_json, + {"project_id": "Padmin"}, # Caused a Keystone authentication error + # {"username": "U2", "password": "pw2_new", "project_id": "Padmin"}, + (200, 201), r_header_json, "json") + if res: + rj = res.json() + engine.set_header({"Authorization": "Bearer {}".format(rj["id"])}) + + res = engine.test("Add new project admin", "POST", "/admin/v1/projects", headers_json, + {"name": "P3"}, (201, 204), + {"Location": "/admin/v1/projects/", "Content-Type": "application/json"}, + "json") + p3 = engine.last_id if res else None + + if p1: + data = {"username": "U4", "password": "pw4"} + data["project_role_mappings"] = [{"project": p1, "role": rpa}] + res = engine.test("Add new user admin", "POST", "/admin/v1/users", headers_json, + data, (201, 204), + {"Location": "/admin/v1/users/", "Content-Type": "application/json"}, + "json") + u4 = engine.last_id if res else None + else: + u4 = None + + if u4 and p3: + data = {"project_role_mappings": [{"project": p3, "role": rpa}]} + engine.test("Edit user projects admin", "PUT", "/admin/v1/users/U4", headers_json, + data, 204, None, None) + # Project is deleted even though it shouldn't - PROVISIONAL? + res = engine.test("Delete project P3 conflict", "DELETE", "/admin/v1/projects/"+p3, + headers_json, None, 409, None, None) + if res and res.status_code in (200, 204): + p3 = None + if p3: + res = engine.test("Delete project P3 forcing", "DELETE", + "/admin/v1/projects/"+p3+"?FORCE=True", headers_json, None, 204, + None, None) + if res and res.status_code in (200, 204): + p3 = None + + if u2: + res = engine.test("Delete user U2. Conflict deleting own user", "DELETE", + "/admin/v1/users/"+u2, headers_json, None, 409, r_header_json, "json") + if res is None or res.status_code in (200, 204): + u2 = None + if u4: + res = engine.test("Delete user U4", "DELETE", "/admin/v1/users/"+u4, headers_json, None, + 204, None, None) + if res and res.status_code in (200, 204): + u4 = None + if p3: + res = engine.test("Delete project P3", "DELETE", "/admin/v1/projects/"+p3, headers_json, + None, 204, None, None) + if res and res.status_code in (200, 204): + p3 = None + + if u3: + res = engine.test("Delete user U3", "DELETE", "/admin/v1/users/"+u3, headers_json, None, + 204, None, None) + if res: + u3 = None + + # change to admin + engine.remove_authorization() # To force get authorization + engine.get_autorization() + if u1: + engine.test("Delete user U1", "DELETE", "/admin/v1/users/"+u1, headers_json, None, 204, None, None) + if u2: + engine.test("Delete user U2", "DELETE", "/admin/v1/users/"+u2, headers_json, None, 204, None, None) + if u3: + engine.test("Delete user U3", "DELETE", "/admin/v1/users/"+u3, headers_json, None, 204, None, None) + if u4: + engine.test("Delete user U4", "DELETE", "/admin/v1/users/"+u4, headers_json, None, 204, None, None) + if p1: + engine.test("Delete project P1", "DELETE", "/admin/v1/projects/"+p1, headers_json, None, 204, None, None) + if p2: + engine.test("Delete project P2", "DELETE", "/admin/v1/projects/"+p2, headers_json, None, 204, None, None) + if p3: + engine.test("Delete project P3", "DELETE", "/admin/v1/projects/"+p3, headers_json, None, 204, None, None) + if padmin: + engine.test("Delete project Padmin", "DELETE", "/admin/v1/projects/"+padmin, headers_json, None, 204, + None, None) + if pbad: + engine.test("Delete bad project", "DELETE", "/admin/v1/projects/"+pbad, headers_json, None, 204, + None, None) + + # BEGIN New Tests - Addressing Projects/Users by Name/ID + pid1 = pid2 = None + uid1 = uid2 = None + res = engine.test("Create new project P1", "POST", "/admin/v1/projects", headers_json, {"name": "P1"}, + 201, {"Location": "/admin/v1/projects/", "Content-Type": "application/json"}, "json") + if res: + pid1 = res.json()["id"] + # print("# pid =", pid1) + res = engine.test("Create new project P2", "POST", "/admin/v1/projects", headers_json, {"name": "P2"}, + 201, {"Location": "/admin/v1/projects/", "Content-Type": "application/json"}, "json") + if res: + pid2 = res.json()["id"] + # print("# pid =", pid2) + data = {"username": "U1", "password": "pw1"} + data["project_role_mappings"] = [{"project": pid1, "role": rpu}] + res = engine.test("Create new user U1", "POST", "/admin/v1/users", headers_json, data, 201, + {"Location": "/admin/v1/users/", "Content-Type": "application/json"}, "json") + if res: + uid1 = res.json()["id"] + # print("# uid =", uid1) + data = {"username": "U2", "password": "pw2"} + data["project_role_mappings"] = [{"project": pid2, "role": rpu}] + res = engine.test("Create new user U2", "POST", "/admin/v1/users", headers_json, data, 201, + {"Location": "/admin/v1/users/", "Content-Type": "application/json"}, "json") + if res: + uid2 = res.json()["id"] + # print("# uid =", uid2) + if pid1: + engine.test("Get Project P1 by Name", "GET", "/admin/v1/projects/P1", headers_json, None, + 200, None, "json") + engine.test("Get Project P1 by ID", "GET", "/admin/v1/projects/"+pid1, headers_json, None, + 200, None, "json") + if uid1: + engine.test("Get User U1 by Name", "GET", "/admin/v1/users/U1", headers_json, None, 200, None, "json") + engine.test("Get User U1 by ID", "GET", "/admin/v1/users/"+uid1, headers_json, None, 200, None, "json") + if pid1: + res = engine.test("Rename Project P1 by Name", "PUT", "/admin/v1/projects/P1", headers_json, + {"name": "P3"}, 204, None, None) + if res: + engine.test("Get Project P1 by new Name", "GET", "/admin/v1/projects/P3", headers_json, None, + 200, None, "json") + if pid2: + res = engine.test("Rename Project P2 by ID", "PUT", "/admin/v1/projects/"+pid2, headers_json, + {"name": "P4"}, 204, None, None) + if res: + engine.test("Get Project P2 by new Name", "GET", "/admin/v1/projects/P4", headers_json, None, + 200, None, "json") + + if uid1: + res = engine.test("Rename User U1 by Name", "PUT", "/admin/v1/users/U1", headers_json, + {"username": "U3"}, 204, None, None) + if res: + engine.test("Get User U1 by new Name", "GET", "/admin/v1/users/U3", headers_json, None, + 200, None, "json") + + if uid2: + res = engine.test("Rename User U2 by ID", "PUT", "/admin/v1/users/"+uid2, headers_json, + {"username": "U4"}, 204, None, None) + if res: + engine.test("Get User U2 by new Name", "GET", "/admin/v1/users/U4", headers_json, None, + 200, None, "json") + if uid1: + res = engine.test("Delete User U1 by Name", "DELETE", "/admin/v1/users/U3", headers_json, None, + 204, None, None) + if res: + uid1 = None + + if uid2: + res = engine.test("Delete User U2 by ID", "DELETE", "/admin/v1/users/"+uid2, headers_json, None, + 204, None, None) + if res: + uid2 = None + + if pid1: + res = engine.test("Delete Project P1 by Name", "DELETE", "/admin/v1/projects/P3", headers_json, None, + 204, None, None) + if res: + pid1 = None + + if pid2: + res = engine.test("Delete Project P2 by ID", "DELETE", "/admin/v1/projects/"+pid2, headers_json, None, + 204, None, None) + if res: + pid2 = None + + # END New Tests - Addressing Projects/Users by Name + + # CLEANUP + if pid1: + engine.test("Delete Project P1", "DELETE", "/admin/v1/projects/"+pid1, headers_json, None, 204, None, None) + if pid2: + engine.test("Delete Project P2", "DELETE", "/admin/v1/projects/"+pid2, headers_json, None, 204, None, None) + if uid1: + engine.test("Delete User U1", "DELETE", "/admin/v1/users/"+uid1, headers_json, None, 204, None, None) + if uid2: + engine.test("Delete User U2", "DELETE", "/admin/v1/users/"+uid2, headers_json, None, 204, None, None) + + engine.remove_authorization() # To finish + + +class TestProjectsDescriptors: + description = "test descriptors visibility among projects" + + @staticmethod + def run(engine, test_osm, manual_check, test_params=None): + vnfd_ids = [] + engine.set_test_name("ProjectDescriptors") + engine.get_autorization() + + project_admin_id = None + res = engine.test("Get my project Padmin", "GET", "/admin/v1/projects/{}".format(engine.project), headers_json, + None, 200, r_header_json, "json") + if res: + response = res.json() + project_admin_id = response["_id"] + engine.test("Create project Padmin", "POST", "/admin/v1/projects", headers_json, {"name": "Padmin", "admin": True}, (201, 204), {"Location": "/admin/v1/projects/", "Content-Type": "application/json"}, "json") - engine.test("PU3", "Create project bad format", "POST", "/admin/v1/projects", headers_json, {"name": 1}, 422, - r_header_json, "json") - engine.test("PU4", "Create user with bad project", "POST", "/admin/v1/users", headers_json, - {"username": "U1", "projects": ["P1", "P2", "Padmin"], "password": "pw1"}, 409, - r_header_json, "json") - engine.test("PU5", "Create user with bad project and force", "POST", "/admin/v1/users?FORCE=True", headers_json, - {"username": "U1", "projects": ["P1", "P2", "Padmin"], "password": "pw1"}, 201, - {"Location": "/admin/v1/users/", "Content-Type": "application/json"}, "json") - engine.test("PU6", "Create user 2", "POST", "/admin/v1/users", headers_json, - {"username": "U2", "projects": ["P1"], "password": "pw2"}, 201, - {"Location": "/admin/v1/users/", "Content-Type": "application/json"}, "json") - - engine.test("PU7", "Edit user U1, delete P2 project", "PATCH", "/admin/v1/users/U1", headers_json, - {"projects": {"$'P2'": None}}, 204, None, None) - res = engine.test("PU1", "Check user U1, contains the right projects", "GET", "/admin/v1/users/U1", - headers_json, None, 200, None, json) - u1 = res.json() - # print(u1) - expected_projects = ["P1", "Padmin"] - if u1["projects"] != expected_projects: - raise TestException("User content projects '{}' different than expected '{}'. Edition has not done" - " properly".format(u1["projects"], expected_projects)) - - engine.test("PU8", "Edit user U1, set Padmin as default project", "PUT", "/admin/v1/users/U1", headers_json, - {"projects": {"$'Padmin'": None, "$+[0]": "Padmin"}}, 204, None, None) - res = engine.test("PU1", "Check user U1, contains the right projects", "GET", "/admin/v1/users/U1", - headers_json, None, 200, None, json) - u1 = res.json() - # print(u1) - expected_projects = ["Padmin", "P1"] - if u1["projects"] != expected_projects: - raise TestException("User content projects '{}' different than expected '{}'. Edition has not done" - " properly".format(u1["projects"], expected_projects)) - - engine.test("PU9", "Edit user U1, change password", "PATCH", "/admin/v1/users/U1", headers_json, - {"password": "pw1_new"}, 204, None, None) - - engine.test("PU10", "Change to project P1 non existing", "POST", "/admin/v1/tokens/", headers_json, - {"project_id": "P1"}, 401, r_header_json, "json") - - res = engine.test("PU1", "Change to user U1 project P1", "POST", "/admin/v1/tokens", headers_json, - {"username": "U1", "password": "pw1_new", "project_id": "P1"}, (200, 201), + engine.test("Create project P2", "POST", "/admin/v1/projects", headers_json, {"name": "P2"}, + (201, 204), {"Location": "/admin/v1/projects/", "Content-Type": "application/json"}, "json") + engine.test("Create project P3", "POST", "/admin/v1/projects", headers_json, {"name": "P3"}, + (201, 204), {"Location": "/admin/v1/projects/", "Content-Type": "application/json"}, "json") + + engine.test("Create user U1", "POST", "/admin/v1/users", headers_json, + {"username": "U1", "password": "pw1", + "project_role_mappings": [{"project": "Padmin", "role": "system_admin"}, + {"project": "P2", "role": "project_admin"}, + {"project": "P3", "role": "project_admin"}], + }, 201, {"Location": "/admin/v1/users/", "Content-Type": "application/json"}, "json") + + engine.test("Onboard VNFD id1", "POST", "/vnfpkgm/v1/vnf_packages_content?id=id1", headers_yaml, + TestDescriptors.vnfd_empty, 201, r_headers_yaml_location_vnfd, "yaml") + vnfd_ids.append(engine.last_id) + engine.test("Onboard VNFD id2 PUBLIC", "POST", "/vnfpkgm/v1/vnf_packages_content?id=id2&PUBLIC=TRUE", + headers_yaml, TestDescriptors.vnfd_empty, 201, r_headers_yaml_location_vnfd, "yaml") + vnfd_ids.append(engine.last_id) + engine.test("Onboard VNFD id3", "POST", "/vnfpkgm/v1/vnf_packages_content?id=id3&PUBLIC=FALSE", headers_yaml, + TestDescriptors.vnfd_empty, 201, r_headers_yaml_location_vnfd, "yaml") + vnfd_ids.append(engine.last_id) + + res = engine.test("Get VNFD descriptors", "GET", "/vnfpkgm/v1/vnf_packages?id=id1,id2,id3", + headers_json, None, 200, r_header_json, "json") + response = res.json() + if len(response) != 3: + logger.error("Only 3 vnfds should be present for project admin. {} listed".format(len(response))) + engine.failed_tests += 1 + + # Change to other project Padmin + res = engine.test("Change to user U1 project Padmin", "POST", "/admin/v1/tokens", headers_json, + {"username": "U1", "password": "pw1", "project_id": "Padmin"}, (200, 201), r_header_json, "json") + if res: + response = res.json() + engine.set_header({"Authorization": "Bearer {}".format(response["id"])}) + + # list vnfds + res = engine.test("List VNFD descriptors for Padmin", "GET", "/vnfpkgm/v1/vnf_packages", + headers_json, None, 200, r_header_json, "json") response = res.json() - engine.set_header({"Authorization": "Bearer {}".format(response["id"])}) - - engine.test("PU11", "Edit user projects non admin", "PUT", "/admin/v1/users/U1", headers_json, - {"projects": {"$'P1'": None}}, 401, r_header_json, "json") - engine.test("PU12", "Add new project non admin", "POST", "/admin/v1/projects", headers_json, - {"name": "P2"}, 401, r_header_json, "json") - engine.test("PU13", "Add new user non admin", "POST", "/admin/v1/users", headers_json, - {"username": "U3", "projects": ["P1"], "password": "pw3"}, 401, - r_header_json, "json") - - res = engine.test("PU14", "Change to user U1 project Padmin", "POST", "/admin/v1/tokens", headers_json, - {"project_id": "Padmin"}, (200, 201), r_header_json, "json") + if len(response) != 0: + logger.error("Only 0 vnfds should be present for project Padmin. {} listed".format(len(response))) + engine.failed_tests += 1 + + # list Public vnfds + res = engine.test("List VNFD public descriptors", "GET", "/vnfpkgm/v1/vnf_packages?PUBLIC=True", + headers_json, None, 200, r_header_json, "json") + response = res.json() + if len(response) != 1: + logger.error("Only 1 vnfds should be present for project Padmin. {} listed".format(len(response))) + engine.failed_tests += 1 + + # list vnfds belonging to project "admin" + res = engine.test("List VNFD of admin project", "GET", + "/vnfpkgm/v1/vnf_packages?ADMIN={}".format(project_admin_id), + headers_json, None, 200, r_header_json, "json") + if res: + response = res.json() + if len(response) != 3: + logger.error("Only 3 vnfds should be present for project Padmin. {} listed".format(len(response))) + engine.failed_tests += 1 + + # Get Public vnfds + engine.test("Get VNFD public descriptors", "GET", "/vnfpkgm/v1/vnf_packages/{}".format(vnfd_ids[1]), + headers_json, None, 200, r_header_json, "json") + # Edit not owned vnfd + engine.test("Edit VNFD ", "PATCH", "/vnfpkgm/v1/vnf_packages/{}".format(vnfd_ids[0]), + headers_yaml, '{name: pepe}', 404, r_header_yaml, "yaml") + + # Add to my catalog + engine.test("Add VNFD id2 to my catalog", "PATCH", "/vnfpkgm/v1/vnf_packages/{}?SET_PROJECT". + format(vnfd_ids[1]), headers_json, None, 204, None, 0) + + # Add a new vnfd + engine.test("Onboard VNFD id4", "POST", "/vnfpkgm/v1/vnf_packages_content?id=id4", headers_yaml, + TestDescriptors.vnfd_empty, 201, r_headers_yaml_location_vnfd, "yaml") + vnfd_ids.append(engine.last_id) + + # list vnfds + res = engine.test("List VNFD public descriptors", "GET", "/vnfpkgm/v1/vnf_packages", + headers_json, None, 200, r_header_json, "json") response = res.json() - engine.set_header({"Authorization": "Bearer {}".format(response["id"])}) + if len(response) != 2: + logger.error("Only 2 vnfds should be present for project Padmin. {} listed".format(len(response))) + engine.failed_tests += 1 - engine.test("PU15", "Add new project admin", "POST", "/admin/v1/projects", headers_json, {"name": "P2"}, - (201, 204), {"Location": "/admin/v1/projects/", "Content-Type": "application/json"}, "json") - engine.test("PU16", "Add new user U3 admin", "POST", "/admin/v1/users", - headers_json, {"username": "U3", "projects": ["P2"], "password": "pw3"}, (201, 204), - {"Location": "/admin/v1/users/", "Content-Type": "application/json"}, "json") - engine.test("PU17", "Edit user projects admin", "PUT", "/admin/v1/users/U3", headers_json, - {"projects": ["P2"]}, 204, None, None) - - engine.test("PU18", "Delete project P2 conflict", "DELETE", "/admin/v1/projects/P2", headers_json, None, 409, - r_header_json, "json") - engine.test("PU19", "Delete project P2 forcing", "DELETE", "/admin/v1/projects/P2?FORCE=True", headers_json, - None, 204, None, None) - - engine.test("PU20", "Delete user U1. Conflict deleting own user", "DELETE", "/admin/v1/users/U1", headers_json, - None, 409, r_header_json, "json") - engine.test("PU21", "Delete user U2", "DELETE", "/admin/v1/users/U2", headers_json, None, 204, None, None) - engine.test("PU22", "Delete user U3", "DELETE", "/admin/v1/users/U3", headers_json, None, 204, None, None) - # change to admin + if manual_check: + input('VNFDs have been omboarded. Perform manual check and press enter to resume') + + test_rest.test("Delete VNFD id2", "DELETE", "/vnfpkgm/v1/vnf_packages/{}".format(vnfd_ids[1]), + headers_yaml, None, 204, None, 0) + + # change to admin project engine.remove_authorization() # To force get authorization engine.get_autorization() - engine.test("PU23", "Delete user U1", "DELETE", "/admin/v1/users/U1", headers_json, None, 204, None, None) - engine.test("PU24", "Delete project P1", "DELETE", "/admin/v1/projects/P1", headers_json, None, 204, None, None) - engine.test("PU25", "Delete project Padmin", "DELETE", "/admin/v1/projects/Padmin", headers_json, None, 204, - None, None) + test_rest.test("Delete VNFD id1", "DELETE", "/vnfpkgm/v1/vnf_packages/{}".format(vnfd_ids[0]), + headers_yaml, None, 204, None, 0) + test_rest.test("Delete VNFD id2", "DELETE", "/vnfpkgm/v1/vnf_packages/{}".format(vnfd_ids[1]), + headers_yaml, None, 204, None, 0) + test_rest.test("Delete VNFD id3", "DELETE", "/vnfpkgm/v1/vnf_packages/{}".format(vnfd_ids[2]), + headers_yaml, None, 204, None, 0) + test_rest.test("Delete VNFD id4", "DELETE", "/vnfpkgm/v1/vnf_packages/{}".format(vnfd_ids[3]), + headers_yaml, None, 404, r_header_yaml, "yaml") + test_rest.test("Delete VNFD id4", "DELETE", "/vnfpkgm/v1/vnf_packages/{}?ADMIN".format(vnfd_ids[3]), + headers_yaml, None, 204, None, 0) + # Get Public vnfds + engine.test("Get VNFD deleted id1", "GET", "/vnfpkgm/v1/vnf_packages/{}?ADMIN".format(vnfd_ids[0]), + headers_json, None, 404, r_header_json, "json") + engine.test("Get VNFD deleted id2", "GET", "/vnfpkgm/v1/vnf_packages/{}?ADMIN".format(vnfd_ids[1]), + headers_json, None, 404, r_header_json, "json") + engine.test("Get VNFD deleted id3", "GET", "/vnfpkgm/v1/vnf_packages/{}?ADMIN".format(vnfd_ids[2]), + headers_json, None, 404, r_header_json, "json") + engine.test("Get VNFD deleted id4", "GET", "/vnfpkgm/v1/vnf_packages/{}?ADMIN".format(vnfd_ids[3]), + headers_json, None, 404, r_header_json, "json") + + engine.test("Delete user U1", "DELETE", "/admin/v1/users/U1", headers_json, None, 204, None, None) + engine.test("Delete project Padmin", "DELETE", "/admin/v1/projects/Padmin", headers_json, None, 204, None, None) + engine.test("Delete project P2", "DELETE", "/admin/v1/projects/P2", headers_json, None, 204, None, None) + engine.test("Delete project P3", "DELETE", "/admin/v1/projects/P3", headers_json, None, 204, None, None) class TestFakeVim: @@ -470,38 +957,30 @@ class TestFakeVim: vim_bad = self.vim.copy() vim_bad.pop("name") + engine.set_test_name("FakeVim") engine.get_autorization() - engine.test("FVIM1", "Create VIM", "POST", "/admin/v1/vim_accounts", headers_json, self.vim, (201, 204), + engine.test("Create VIM", "POST", "/admin/v1/vim_accounts", headers_json, self.vim, (201, 202), {"Location": "/admin/v1/vim_accounts/", "Content-Type": "application/json"}, "json") - engine.test("FVIM2", "Create VIM without name, bad schema", "POST", "/admin/v1/vim_accounts", headers_json, + vim_id = engine.last_id + engine.test("Create VIM without name, bad schema", "POST", "/admin/v1/vim_accounts", headers_json, vim_bad, 422, None, headers_json) - engine.test("FVIM3", "Create VIM name repeated", "POST", "/admin/v1/vim_accounts", headers_json, self.vim, + engine.test("Create VIM name repeated", "POST", "/admin/v1/vim_accounts", headers_json, self.vim, 409, None, headers_json) - engine.test("FVIM4", "Show VIMs", "GET", "/admin/v1/vim_accounts", headers_yaml, None, 200, r_header_yaml, + engine.test("Show VIMs", "GET", "/admin/v1/vim_accounts", headers_yaml, None, 200, r_header_yaml, "yaml") - engine.test("FVIM5", "Show VIM", "GET", "/admin/v1/vim_accounts/", headers_yaml, None, 200, + engine.test("Show VIM", "GET", "/admin/v1/vim_accounts/{}".format(vim_id), headers_yaml, None, 200, r_header_yaml, "yaml") if not test_osm: # delete with FORCE - engine.test("FVIM6", "Delete VIM", "DELETE", "/admin/v1/vim_accounts/?FORCE=True", headers_yaml, + engine.test("Delete VIM", "DELETE", "/admin/v1/vim_accounts/{}?FORCE=True".format(vim_id), headers_yaml, None, 202, None, 0) - engine.test("FVIM7", "Check VIM is deleted", "GET", "/admin/v1/vim_accounts/", headers_yaml, None, + engine.test("Check VIM is deleted", "GET", "/admin/v1/vim_accounts/{}".format(vim_id), headers_yaml, None, 404, r_header_yaml, "yaml") else: # delete and wait until is really deleted - engine.test("FVIM6", "Delete VIM", "DELETE", "/admin/v1/vim_accounts/", headers_yaml, None, 202, + engine.test("Delete VIM", "DELETE", "/admin/v1/vim_accounts/{}".format(vim_id), headers_yaml, None, 202, None, 0) - wait = timeout - while wait >= 0: - r = engine.test("FVIM7", "Check VIM is deleted", "GET", "/admin/v1/vim_accounts/", headers_yaml, - None, None, r_header_yaml, "yaml") - if r.status_code == 404: - break - elif r.status_code == 200: - wait -= 5 - sleep(5) - else: - raise TestException("Vim created at 'FVIM1' is not delete after {} seconds".format(timeout)) + engine.wait_until_delete("/admin/v1/vim_accounts/{}".format(vim_id), timeout) class TestVIMSDN(TestFakeVim): @@ -509,98 +988,105 @@ class TestVIMSDN(TestFakeVim): def __init__(self): TestFakeVim.__init__(self) + self.wim = { + "schema_version": "1.0", + "schema_type": "No idea", + "name": "myWim", + "description": "Descriptor name", + "wim_type": "odl", + "wim_url": "http://localhost:/wim", + "user": "user", + "password": "password", + "config": {"config_param": 1} + } def run(self, engine, test_osm, manual_check, test_params=None): + engine.set_test_name("VimSdn") engine.get_autorization() # Added SDN - engine.test("VIMSDN1", "Create SDN", "POST", "/admin/v1/sdns", headers_json, self.sdn, (201, 204), + engine.test("Create SDN", "POST", "/admin/v1/sdns", headers_json, self.sdn, (201, 202), {"Location": "/admin/v1/sdns/", "Content-Type": "application/json"}, "json") + sdnc_id = engine.last_id # sleep(5) # Edit SDN - engine.test("VIMSDN2", "Edit SDN", "PATCH", "/admin/v1/sdns/", headers_json, {"name": "new_sdn_name"}, - 204, None, None) + engine.test("Edit SDN", "PATCH", "/admin/v1/sdns/{}".format(sdnc_id), headers_json, {"name": "new_sdn_name"}, + (202, 204), None, None) # sleep(5) # VIM with SDN - self.vim["config"]["sdn-controller"] = engine.test_ids["VIMSDN1"] + self.vim["config"]["sdn-controller"] = sdnc_id self.vim["config"]["sdn-port-mapping"] = self.port_mapping - engine.test("VIMSDN3", "Create VIM", "POST", "/admin/v1/vim_accounts", headers_json, self.vim, (200, 204, 201), + engine.test("Create VIM", "POST", "/admin/v1/vim_accounts", headers_json, self.vim, (200, 202, 201), {"Location": "/admin/v1/vim_accounts/", "Content-Type": "application/json"}, "json"), + vim_id = engine.last_id self.port_mapping[0]["compute_node"] = "compute node XX" - engine.test("VIMSDN4", "Edit VIM change port-mapping", "PUT", "/admin/v1/vim_accounts/", headers_json, - {"config": {"sdn-port-mapping": self.port_mapping}}, 204, None, None) - engine.test("VIMSDN5", "Edit VIM remove port-mapping", "PUT", "/admin/v1/vim_accounts/", headers_json, - {"config": {"sdn-port-mapping": None}}, 204, None, None) + engine.test("Edit VIM change port-mapping", "PUT", "/admin/v1/vim_accounts/{}".format(vim_id), headers_json, + {"config": {"sdn-port-mapping": self.port_mapping}}, (202, 204), None, None) + engine.test("Edit VIM remove port-mapping", "PUT", "/admin/v1/vim_accounts/{}".format(vim_id), headers_json, + {"config": {"sdn-port-mapping": None}}, (202, 204), None, None) + + engine.test("Create WIM", "POST", "/admin/v1/wim_accounts", headers_json, self.wim, (200, 202, 201), + {"Location": "/admin/v1/wim_accounts/", "Content-Type": "application/json"}, "json"), + wim_id = engine.last_id if not test_osm: # delete with FORCE - engine.test("VIMSDN6", "Delete VIM remove port-mapping", "DELETE", - "/admin/v1/vim_accounts/?FORCE=True", headers_json, None, 202, None, 0) - engine.test("VIMSDN7", "Delete SDNC", "DELETE", "/admin/v1/sdns/?FORCE=True", headers_json, None, + engine.test("Delete VIM remove port-mapping", "DELETE", + "/admin/v1/vim_accounts/{}?FORCE=True".format(vim_id), headers_json, None, 202, None, 0) + engine.test("Delete SDNC", "DELETE", "/admin/v1/sdns/{}?FORCE=True".format(sdnc_id), headers_json, None, 202, None, 0) - engine.test("VIMSDN8", "Check VIM is deleted", "GET", "/admin/v1/vim_accounts/", headers_yaml, + engine.test("Delete WIM", "DELETE", + "/admin/v1/wim_accounts/{}?FORCE=True".format(wim_id), headers_json, None, 202, None, 0) + engine.test("Check VIM is deleted", "GET", "/admin/v1/vim_accounts/{}".format(vim_id), headers_yaml, None, 404, r_header_yaml, "yaml") - engine.test("VIMSDN9", "Check SDN is deleted", "GET", "/admin/v1/sdns/", headers_yaml, None, + engine.test("Check SDN is deleted", "GET", "/admin/v1/sdns/{}".format(sdnc_id), headers_yaml, None, 404, r_header_yaml, "yaml") + engine.test("Check WIM is deleted", "GET", "/admin/v1/wim_accounts/{}".format(wim_id), headers_yaml, + None, 404, r_header_yaml, "yaml") else: + if manual_check: + input('VIM, SDN, WIM has been deployed. Perform manual check and press enter to resume') # delete and wait until is really deleted - engine.test("VIMSDN6", "Delete VIM remove port-mapping", "DELETE", "/admin/v1/vim_accounts/", + engine.test("Delete VIM remove port-mapping", "DELETE", "/admin/v1/vim_accounts/{}".format(vim_id), headers_json, None, (202, 201, 204), None, 0) - engine.test("VIMSDN7", "Delete SDN", "DELETE", "/admin/v1/sdns/", headers_json, None, + engine.test("Delete SDN", "DELETE", "/admin/v1/sdns/{}".format(sdnc_id), headers_json, None, (202, 201, 204), None, 0) - wait = timeout - while wait >= 0: - r = engine.test("VIMSDN8", "Check VIM is deleted", "GET", "/admin/v1/vim_accounts/", - headers_yaml, None, None, r_header_yaml, "yaml") - if r.status_code == 404: - break - elif r.status_code == 200: - wait -= 5 - sleep(5) - else: - raise TestException("Vim created at 'VIMSDN3' is not delete after {} seconds".format(timeout)) - while wait >= 0: - r = engine.test("VIMSDN9", "Check SDNC is deleted", "GET", "/admin/v1/sdns/", - headers_yaml, None, None, r_header_yaml, "yaml") - if r.status_code == 404: - break - elif r.status_code == 200: - wait -= 5 - sleep(5) - else: - raise TestException("SDNC created at 'VIMSDN1' is not delete after {} seconds".format(timeout)) + engine.test("Delete VIM", "DELETE", "/admin/v1/wim_accounts/{}".format(wim_id), + headers_json, None, (202, 201, 204), None, 0) + engine.wait_until_delete("/admin/v1/vim_accounts/{}".format(vim_id), timeout) + engine.wait_until_delete("/admin/v1/sdns/{}".format(sdnc_id), timeout) + engine.wait_until_delete("/admin/v1/wim_accounts/{}".format(wim_id), timeout) class TestDeploy: description = "Base class for downloading descriptors from ETSI, onboard and deploy in real VIM" def __init__(self): - self.step = 0 + self.test_name = "DEPLOY" self.nsd_id = None self.vim_id = None - self.nsd_test = None - self.ns_test = None self.ns_id = None - self.vnfds_test = [] self.vnfds_id = [] self.descriptor_url = "https://osm-download.etsi.org/ftp/osm-3.0-three/2nd-hackfest/packages/" self.vnfd_filenames = ("cirros_vnf.tar.gz",) self.nsd_filename = "cirros_2vnf_ns.tar.gz" + self.descriptor_edit = None self.uses_configuration = False - self.uss = {} - self.passwds = {} - self.cmds = {} + self.users = {} + self.passwords = {} + self.commands = {} self.keys = {} self.timeout = 120 - self.passed_tests = 0 - self.total_tests = 0 + self.qforce = "" + self.ns_params = None + self.vnfr_ip_list = {} def create_descriptors(self, engine): temp_dir = os.path.dirname(os.path.abspath(__file__)) + "/temp/" if not os.path.exists(temp_dir): os.makedirs(temp_dir) - for vnfd_filename in self.vnfd_filenames: + for vnfd_index, vnfd_filename in enumerate(self.vnfd_filenames): if "/" in vnfd_filename: vnfd_filename_path = vnfd_filename if not os.path.exists(vnfd_filename_path): @@ -618,30 +1104,29 @@ class TestDeploy: headers = headers_yaml else: headers = headers_zip_yaml - if self.step % 2 == 0: + if randint(0, 1) == 0: # vnfd CREATE AND UPLOAD in one step: - test_name = "DEPLOY{}".format(self.step) - engine.test(test_name, "Onboard VNFD in one step", "POST", - "/vnfpkgm/v1/vnf_packages_content", headers, "@b" + vnfd_filename_path, 201, - {"Location": "/vnfpkgm/v1/vnf_packages_content/", "Content-Type": "application/yaml"}, yaml) - self.vnfds_test.append(test_name) - self.vnfds_id.append(engine.test_ids[test_name]) - self.step += 1 + engine.test("Onboard VNFD in one step", "POST", + "/vnfpkgm/v1/vnf_packages_content" + self.qforce, headers, "@b" + vnfd_filename_path, 201, + r_headers_yaml_location_vnfd, + "yaml") + self.vnfds_id.append(engine.last_id) else: # vnfd CREATE AND UPLOAD ZIP - test_name = "DEPLOY{}".format(self.step) - engine.test(test_name, "Onboard VNFD step 1", "POST", "/vnfpkgm/v1/vnf_packages", + engine.test("Onboard VNFD step 1", "POST", "/vnfpkgm/v1/vnf_packages", headers_json, None, 201, {"Location": "/vnfpkgm/v1/vnf_packages/", "Content-Type": "application/json"}, "json") - self.vnfds_test.append(test_name) - self.vnfds_id.append(engine.test_ids[test_name]) - self.step += 1 - # location = r.headers["Location"] - # vnfd_id = location[location.rfind("/")+1:] - engine.test("DEPLOY{}".format(self.step), "Onboard VNFD step 2 as ZIP", "PUT", - "/vnfpkgm/v1/vnf_packages/<>/package_content", + self.vnfds_id.append(engine.last_id) + engine.test("Onboard VNFD step 2 as ZIP", "PUT", + "/vnfpkgm/v1/vnf_packages/<>/package_content" + self.qforce, headers, "@b" + vnfd_filename_path, 204, None, 0) - self.step += 2 + + if self.descriptor_edit: + if "vnfd{}".format(vnfd_index) in self.descriptor_edit: + # Modify VNFD + engine.test("Edit VNFD ", "PATCH", + "/vnfpkgm/v1/vnf_packages/{}".format(self.vnfds_id[-1]), + headers_yaml, self.descriptor_edit["vnfd{}".format(vnfd_index)], 204, None, None) if "/" in self.nsd_filename: nsd_filename_path = self.nsd_filename @@ -661,172 +1146,140 @@ class TestDeploy: else: headers = headers_zip_yaml - self.nsd_test = "DEPLOY" + str(self.step) - if self.step % 2 == 0: + if randint(0, 1) == 0: # nsd CREATE AND UPLOAD in one step: - engine.test("DEPLOY{}".format(self.step), "Onboard NSD in one step", "POST", - "/nsd/v1/ns_descriptors_content", headers, "@b" + nsd_filename_path, 201, - {"Location": "/nsd/v1/ns_descriptors_content/", "Content-Type": "application/yaml"}, yaml) - self.step += 1 + engine.test("Onboard NSD in one step", "POST", + "/nsd/v1/ns_descriptors_content" + self.qforce, headers, "@b" + nsd_filename_path, 201, + r_headers_yaml_location_nsd, yaml) + self.nsd_id = engine.last_id else: # nsd CREATE AND UPLOAD ZIP - engine.test("DEPLOY{}".format(self.step), "Onboard NSD step 1", "POST", "/nsd/v1/ns_descriptors", + engine.test("Onboard NSD step 1", "POST", "/nsd/v1/ns_descriptors", headers_json, None, 201, {"Location": "/nsd/v1/ns_descriptors/", "Content-Type": "application/json"}, "json") - self.step += 1 - # location = r.headers["Location"] - # vnfd_id = location[location.rfind("/")+1:] - engine.test("DEPLOY{}".format(self.step), "Onboard NSD step 2 as ZIP", "PUT", - "/nsd/v1/ns_descriptors/<>/nsd_content", + self.nsd_id = engine.last_id + engine.test("Onboard NSD step 2 as ZIP", "PUT", + "/nsd/v1/ns_descriptors/<>/nsd_content" + self.qforce, headers, "@b" + nsd_filename_path, 204, None, 0) - self.step += 2 - self.nsd_id = engine.test_ids[self.nsd_test] + + if self.descriptor_edit and "nsd" in self.descriptor_edit: + # Modify NSD + engine.test("Edit NSD ", "PATCH", + "/nsd/v1/ns_descriptors/{}".format(self.nsd_id), + headers_yaml, self.descriptor_edit["nsd"], 204, None, None) def delete_descriptors(self, engine): # delete descriptors - engine.test("DEPLOY{}".format(self.step), "Delete NSSD SOL005", "DELETE", - "/nsd/v1/ns_descriptors/<{}>".format(self.nsd_test), + engine.test("Delete NSSD SOL005", "DELETE", + "/nsd/v1/ns_descriptors/{}".format(self.nsd_id), headers_yaml, None, 204, None, 0) - self.step += 1 - for vnfd_test in self.vnfds_test: - engine.test("DEPLOY{}".format(self.step), "Delete VNFD SOL005", "DELETE", - "/vnfpkgm/v1/vnf_packages/<{}>".format(vnfd_test), headers_yaml, None, 204, None, 0) - self.step += 1 + for vnfd_id in self.vnfds_id: + engine.test("Delete VNFD SOL005", "DELETE", + "/vnfpkgm/v1/vnf_packages/{}".format(vnfd_id), headers_yaml, None, 204, None, 0) def instantiate(self, engine, ns_data): ns_data_text = yaml.safe_dump(ns_data, default_flow_style=True, width=256) # create NS Two steps - r = engine.test("DEPLOY{}".format(self.step), "Create NS step 1", "POST", "/nslcm/v1/ns_instances", - headers_yaml, ns_data_text, 201, + r = engine.test("Create NS step 1", "POST", "/nslcm/v1/ns_instances", + headers_yaml, ns_data_text, (201, 202), {"Location": "nslcm/v1/ns_instances/", "Content-Type": "application/yaml"}, "yaml") - self.ns_test = "DEPLOY{}".format(self.step) - self.ns_id = engine.test_ids[self.ns_test] - engine.test_ids[self.ns_test] - self.step += 1 - r = engine.test("DEPLOY{}".format(self.step), "Instantiate NS step 2", "POST", - "/nslcm/v1/ns_instances/<{}>/instantiate".format(self.ns_test), headers_yaml, ns_data_text, - 201, {"Location": "nslcm/v1/ns_lcm_op_occs/", "Content-Type": "application/yaml"}, "yaml") - nslcmop_test = "DEPLOY{}".format(self.step) - self.step += 1 + if not r: + return + self.ns_id = engine.last_id + engine.test("Instantiate NS step 2", "POST", + "/nslcm/v1/ns_instances/{}/instantiate".format(self.ns_id), headers_yaml, ns_data_text, + (201, 202), r_headers_yaml_location_nslcmop, "yaml") + nslcmop_id = engine.last_id if test_osm: # Wait until status is Ok - wait = timeout_configure if self.uses_configuration else timeout_deploy - while wait >= 0: - r = engine.test("DEPLOY{}".format(self.step), "Wait until NS is deployed and configured", "GET", - "/nslcm/v1/ns_lcm_op_occs/<{}>".format(nslcmop_test), headers_json, None, - 200, r_header_json, "json") - nslcmop = r.json() - if "COMPLETED" in nslcmop["operationState"]: - break - elif "FAILED" in nslcmop["operationState"]: - raise TestException("NS instantiate has failed: {}".format(nslcmop["detailed-status"])) - wait -= 5 - sleep(5) - else: - raise TestException("NS instantiate is not done after {} seconds".format(timeout_deploy)) - self.step += 1 - - def _wait_nslcmop_ready(self, engine, nslcmop_test, timeout_deploy, expected_fail=False): - wait = timeout - while wait >= 0: - r = engine.test("DEPLOY{}".format(self.step), "Wait to ns lcm operation complete", "GET", - "/nslcm/v1/ns_lcm_op_occs/<{}>".format(nslcmop_test), headers_json, None, - 200, r_header_json, "json") - nslcmop = r.json() - if "COMPLETED" in nslcmop["operationState"]: - if expected_fail: - raise TestException("NS terminate has success, expecting failing: {}".format( - nslcmop["detailed-status"])) - break - elif "FAILED" in nslcmop["operationState"]: - if not expected_fail: - raise TestException("NS terminate has failed: {}".format(nslcmop["detailed-status"])) - break - wait -= 5 - sleep(5) - else: - raise TestException("NS instantiate is not terminate after {} seconds".format(timeout)) + timeout = timeout_configure if self.uses_configuration else timeout_deploy + engine.wait_operation_ready("ns", nslcmop_id, timeout) def terminate(self, engine): # remove deployment if test_osm: - r = engine.test("DEPLOY{}".format(self.step), "Terminate NS", "POST", - "/nslcm/v1/ns_instances/<{}>/terminate".format(self.ns_test), headers_yaml, None, - 201, {"Location": "nslcm/v1/ns_lcm_op_occs/", "Content-Type": "application/yaml"}, "yaml") - nslcmop2_test = "DEPLOY{}".format(self.step) - self.step += 1 + engine.test("Terminate NS", "POST", "/nslcm/v1/ns_instances/{}/terminate".format(self.ns_id), headers_yaml, + None, (201, 202), r_headers_yaml_location_nslcmop, "yaml") + nslcmop2_id = engine.last_id # Wait until status is Ok - self._wait_nslcmop_ready(engine, nslcmop2_test, timeout_deploy) + engine.wait_operation_ready("ns", nslcmop2_id, timeout_deploy) - r = engine.test("DEPLOY{}".format(self.step), "Delete NS", "DELETE", - "/nslcm/v1/ns_instances/<{}>".format(self.ns_test), headers_yaml, None, - 204, None, 0) - self.step += 1 + engine.test("Delete NS", "DELETE", "/nslcm/v1/ns_instances/{}".format(self.ns_id), headers_yaml, None, + 204, None, 0) else: - r = engine.test("DEPLOY{}".format(self.step), "Delete NS with FORCE", "DELETE", - "/nslcm/v1/ns_instances/<{}>?FORCE=True".format(self.ns_test), headers_yaml, None, - 204, None, 0) - self.step += 1 + engine.test("Delete NS with FORCE", "DELETE", "/nslcm/v1/ns_instances/{}?FORCE=True".format(self.ns_id), + headers_yaml, None, 204, None, 0) # check all it is deleted - r = engine.test("DEPLOY{}".format(self.step), "Check NS is deleted", "GET", - "/nslcm/v1/ns_instances/<{}>".format(self.ns_test), headers_yaml, None, - 404, None, "yaml") - self.step += 1 - r = engine.test("DEPLOY{}".format(self.step), "Check NSLCMOPs are deleted", "GET", - "/nslcm/v1/ns_lcm_op_occs?nsInstanceId=<{}>".format(self.ns_test), headers_json, None, + engine.test("Check NS is deleted", "GET", "/nslcm/v1/ns_instances/{}".format(self.ns_id), headers_yaml, None, + 404, None, "yaml") + r = engine.test("Check NSLCMOPs are deleted", "GET", + "/nslcm/v1/ns_lcm_op_occs?nsInstanceId={}".format(self.ns_id), headers_json, None, 200, None, "json") + if not r: + return nslcmops = r.json() if not isinstance(nslcmops, list) or nslcmops: - raise TestException("NS {} deleted but with ns_lcm_op_occ active: {}".format(self.ns_test, nslcmops)) + raise TestException("NS {} deleted but with ns_lcm_op_occ active: {}".format(self.ns_id, nslcmops)) def test_ns(self, engine, test_osm, commands=None, users=None, passwds=None, keys=None, timeout=0): - n = 0 - r = engine.test("TEST_NS{}".format(n), "GET VNFR_IDs", "GET", + r = engine.test("GET VNFR IDs", "GET", "/nslcm/v1/ns_instances/{}".format(self.ns_id), headers_json, None, 200, r_header_json, "json") - n += 1 + if not r: + return ns_data = r.json() vnfr_list = ns_data['constituent-vnfr-ref'] time = 0 + _commands = commands if commands is not None else self.commands + _users = users if users is not None else self.users + _passwds = passwds if passwds is not None else self.passwords + _keys = keys if keys is not None else self.keys + _timeout = timeout if timeout != 0 else self.timeout + # vnfr_list=[d8272263-6bd3-4680-84ca-6a4be23b3f2d, 88b22e2f-994a-4b61-94fd-4a3c90de3dc4] for vnfr_id in vnfr_list: - self.total_tests += 1 - r = engine.test("TEST_NS{}".format(n), "GET IP_ADDRESS OF VNFR", "GET", + r = engine.test("Get VNFR to get IP_ADDRESS", "GET", "/nslcm/v1/vnfrs/{}".format(vnfr_id), headers_json, None, 200, r_header_json, "json") - n += 1 + if not r: + continue vnfr_data = r.json() - if vnfr_data.get("ip-address"): - name = "TEST_NS{}".format(n) - description = "Run tests in VNFR with IP {}".format(vnfr_data['ip-address']) - n += 1 - test_description = "Test {} {}".format(name, description) - logger.warning(test_description) - vnf_index = str(vnfr_data["member-vnf-index-ref"]) - while timeout >= time: - result, message = self.do_checks([vnfr_data["ip-address"]], - vnf_index=vnfr_data["member-vnf-index-ref"], - commands=commands.get(vnf_index), user=users.get(vnf_index), - passwd=passwds.get(vnf_index), key=keys.get(vnf_index)) - if result == 1: - logger.warning(message) - break - elif result == 0: - time += 20 - sleep(20) - elif result == -1: - logger.critical(message) - break + vnf_index = str(vnfr_data["member-vnf-index-ref"]) + + ip_address = self.get_vnfr_ip(engine, vnf_index) + description = "Exec command='{}' at VNFR={} IP={}".format(_commands.get(vnf_index)[0], vnf_index, + ip_address) + engine.step += 1 + test_description = "{}{} {}".format(engine.test_name, engine.step, description) + logger.warning(test_description) + while _timeout >= time: + result, message = self.do_checks([ip_address], + vnf_index=vnfr_data["member-vnf-index-ref"], + commands=_commands.get(vnf_index), user=_users.get(vnf_index), + passwd=_passwds.get(vnf_index), key=_keys.get(vnf_index)) + if result == 1: + engine.passed_tests += 1 + logger.debug(message) + break + elif result == 0: + time += 20 + sleep(20) + elif result == -1: + engine.failed_tests += 1 + logger.error(message) + break else: time -= 20 - logger.critical(message) + engine.failed_tests += 1 + logger.error(message) else: - logger.critical("VNFR {} has not mgmt address. Check failed".format(vnfr_id)) + engine.failed_tests += 1 + logger.error("VNFR {} has not mgmt address. Check failed".format(vnf_index)) def do_checks(self, ip, vnf_index, commands=[], user=None, passwd=None, key=None): try: @@ -835,8 +1288,9 @@ class TestDeploy: from pssh.utils import load_private_key from ssh2 import exceptions as ssh2Exception except ImportError as e: - logger.critical("package or/and is not installed. Please add it with 'pip3 install " - "parallel-ssh' and/or 'pip3 install urllib3': {}".format(e)) + logger.critical("Package or/and is not installed. Please add them with 'pip3 install " + "parallel-ssh urllib3': {}".format(e)) + return -1, "install needed packages 'pip3 install parallel-ssh urllib3'" urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning) try: p_host = os.environ.get("PROXY_HOST") @@ -854,20 +1308,21 @@ class TestDeploy: output = client.run_command(cmd) client.join(output) if output[ip[0]].exit_code: - return -1, " VNFR {} could not be checked: {}".format(ip[0], output[ip[0]].stderr) + return -1, "VNFR {} command '{}' returns error: '{}'".format(ip[0], cmd, + "\n".join(output[ip[0]].stderr)) else: - self.passed_tests += 1 - return 1, " Test successful" + return 1, "VNFR {} command '{}' successful".format(ip[0], cmd) except (ssh2Exception.ChannelFailure, ssh2Exception.SocketDisconnectError, ssh2Exception.SocketTimeout, ssh2Exception.SocketRecvError) as e: return 0, "Timeout accessing the VNFR {}: {}".format(ip[0], str(e)) except Exception as e: return -1, "ERROR checking the VNFR {}: {}".format(ip[0], str(e)) - def aditional_operations(self, engine, test_osm, manual_check): + def additional_operations(self, engine, test_osm, manual_check): pass def run(self, engine, test_osm, manual_check, test_params=None): + engine.set_test_name(self.test_name) engine.get_autorization() nsname = os.environ.get("OSMNBITEST_NS_NAME", "OSMNBITEST") if test_params: @@ -883,26 +1338,48 @@ class TestDeploy: self.vim_id = engine.get_create_vim(test_osm) ns_data = {"nsDescription": "default description", "nsName": nsname, "nsdId": self.nsd_id, "vimAccountId": self.vim_id} + if self.ns_params: + ns_data.update(self.ns_params) if test_params and test_params.get("ns-config"): if isinstance(test_params["ns-config"], str): - ns_data.update(yaml.load(test_params["ns-config"])) + ns_data.update(yaml.load(test_params["ns-config"]), Loader=yaml.Loader) else: ns_data.update(test_params["ns-config"]) self.instantiate(engine, ns_data) if manual_check: input('NS has been deployed. Perform manual check and press enter to resume') - else: - self.test_ns(engine, test_osm, self.cmds, self.uss, self.pss, self.keys, self.timeout) - self.aditional_operations(engine, test_osm, manual_check) + if test_osm and self.commands: + self.test_ns(engine, test_osm) + self.additional_operations(engine, test_osm, manual_check) self.terminate(engine) self.delete_descriptors(engine) - self.print_results() - def print_results(self): - print("\n\n\n--------------------------------------------") - print("TEST RESULTS:\n PASSED TESTS: {} - TOTAL TESTS: {}".format(self.total_tests, self.passed_tests)) - print("--------------------------------------------") + def get_first_ip(self, ip_string): + # When using a floating IP, the vnfr_data['ip-address'] contains a semicolon-separated list of IP:s. + first_ip = ip_string.split(";")[0] if ip_string else "" + return first_ip + + def get_vnfr_ip(self, engine, vnfr_index_wanted): + # If the IP address list has been obtained before, it has been stored in 'vnfr_ip_list' + ip = self.vnfr_ip_list.get(vnfr_index_wanted, "") + if (ip): + return self.get_first_ip(ip) + r = engine.test("Get VNFR to get IP_ADDRESS", "GET", + "/nslcm/v1/vnfrs?member-vnf-index-ref={}&nsr-id-ref={}".format( + vnfr_index_wanted, self.ns_id), headers_json, None, + 200, r_header_json, "json") + if not r: + return "" + vnfr_data = r.json() + if not (vnfr_data and vnfr_data[0]): + return "" + # Store the IP (or list of IPs) in 'vnfr_ip_list' + ip_list = vnfr_data[0].get("ip-address", "") + if ip_list: + self.vnfr_ip_list[vnfr_index_wanted] = ip_list + ip = self.get_first_ip(ip_list) + return ip class TestDeployHackfestCirros(TestDeploy): @@ -910,11 +1387,35 @@ class TestDeployHackfestCirros(TestDeploy): def __init__(self): super().__init__() + self.test_name = "CIRROS" self.vnfd_filenames = ("cirros_vnf.tar.gz",) self.nsd_filename = "cirros_2vnf_ns.tar.gz" - self.cmds = {'1': ['ls -lrt', ], '2': ['ls -lrt', ]} - self.uss = {'1': "cirros", '2': "cirros"} - self.pss = {'1': "cubswin:)", '2': "cubswin:)"} + self.commands = {'1': ['ls -lrt', ], '2': ['ls -lrt', ]} + self.users = {'1': "cirros", '2': "cirros"} + self.passwords = {'1': "cubswin:)", '2': "cubswin:)"} + + def terminate(self, engine): + # Make a delete in one step, overriding the normal two step of TestDeploy that launched terminate and delete + if test_osm: + engine.test("Terminate and delete NS in one step", "DELETE", "/nslcm/v1/ns_instances_content/{}". + format(self.ns_id), headers_yaml, None, 202, None, "yaml") + + engine .wait_until_delete("/nslcm/v1/ns_instances/{}".format(self.ns_id), timeout_deploy) + else: + engine.test("Delete NS with FORCE", "DELETE", "/nslcm/v1/ns_instances/{}?FORCE=True".format(self.ns_id), + headers_yaml, None, 204, None, 0) + + # check all it is deleted + engine.test("Check NS is deleted", "GET", "/nslcm/v1/ns_instances/{}".format(self.ns_id), headers_yaml, None, + 404, None, "yaml") + r = engine.test("Check NSLCMOPs are deleted", "GET", + "/nslcm/v1/ns_lcm_op_occs?nsInstanceId={}".format(self.ns_id), headers_json, None, + 200, None, "json") + if not r: + return + nslcmops = r.json() + if not isinstance(nslcmops, list) or nslcmops: + raise TestException("NS {} deleted but with ns_lcm_op_occ active: {}".format(self.ns_id, nslcmops)) class TestDeployHackfest1(TestDeploy): @@ -922,11 +1423,12 @@ class TestDeployHackfest1(TestDeploy): def __init__(self): super().__init__() + self.test_name = "HACKFEST1-" self.vnfd_filenames = ("hackfest_1_vnfd.tar.gz",) self.nsd_filename = "hackfest_1_nsd.tar.gz" - # self.cmds = {'1': ['ls -lrt', ], '2': ['ls -lrt', ]} - # self.uss = {'1': "cirros", '2': "cirros"} - # self.pss = {'1': "cubswin:)", '2': "cubswin:)"} + # self.commands = {'1': ['ls -lrt', ], '2': ['ls -lrt', ]} + # self.users = {'1': "cirros", '2': "cirros"} + # self.passwords = {'1': "cubswin:)", '2': "cubswin:)"} class TestDeployHackfestCirrosScaling(TestDeploy): @@ -934,40 +1436,38 @@ class TestDeployHackfestCirrosScaling(TestDeploy): def __init__(self): super().__init__() + self.test_name = "CIRROS-SCALE" self.vnfd_filenames = ("cirros_vnf.tar.gz",) self.nsd_filename = "cirros_2vnf_ns.tar.gz" - - def create_descriptors(self, engine): - super().create_descriptors(engine) # Modify VNFD to add scaling and count=2 - payload = """ - vdu: - "$id: 'cirros_vnfd-VM'": - count: 2 - scaling-group-descriptor: - - name: "scale_cirros" - max-instance-count: 2 - vdu: - - vdu-id-ref: cirros_vnfd-VM - count: 2 - """ - engine.test("DEPLOY{}".format(self.step), "Edit VNFD ", "PATCH", - "/vnfpkgm/v1/vnf_packages/{}".format(self.vnfds_id[0]), - headers_yaml, payload, 204, None, None) - self.step += 1 + self.descriptor_edit = { + "vnfd0": { + "vdu": { + "$id: 'cirros_vnfd-VM'": {"count": 2} + }, + "scaling-group-descriptor": [{ + "name": "scale_cirros", + "max-instance-count": 2, + "vdu": [{ + "vdu-id-ref": "cirros_vnfd-VM", + "count": 2 + }] + }] + } + } - def aditional_operations(self, engine, test_osm, manual_check): + def additional_operations(self, engine, test_osm, manual_check): if not test_osm: return # 2 perform scale out twice payload = '{scaleType: SCALE_VNF, scaleVnfData: {scaleVnfType: SCALE_OUT, scaleByStepData: ' \ '{scaling-group-descriptor: scale_cirros, member-vnf-index: "1"}}}' for i in range(0, 2): - engine.test("DEPLOY{}".format(self.step), "Execute scale action over NS", "POST", - "/nslcm/v1/ns_instances/<{}>/scale".format(self.ns_test), headers_yaml, payload, - 201, {"Location": "nslcm/v1/ns_lcm_op_occs/", "Content-Type": "application/yaml"}, "yaml") - nslcmop2_scale_out = "DEPLOY{}".format(self.step) - self._wait_nslcmop_ready(engine, nslcmop2_scale_out, timeout_deploy) + engine.test("Execute scale action over NS", "POST", + "/nslcm/v1/ns_instances/{}/scale".format(self.ns_id), headers_yaml, payload, + (201, 202), r_headers_yaml_location_nslcmop, "yaml") + nslcmop2_scale_out = engine.last_id + engine.wait_operation_ready("ns", nslcmop2_scale_out, timeout_deploy) if manual_check: input('NS scale out done. Check that two more vdus are there') # TODO check automatic @@ -976,21 +1476,21 @@ class TestDeployHackfestCirrosScaling(TestDeploy): payload = '{scaleType: SCALE_VNF, scaleVnfData: {scaleVnfType: SCALE_IN, scaleByStepData: ' \ '{scaling-group-descriptor: scale_cirros, member-vnf-index: "1"}}}' for i in range(0, 2): - engine.test("DEPLOY{}".format(self.step), "Execute scale IN action over NS", "POST", - "/nslcm/v1/ns_instances/<{}>/scale".format(self.ns_test), headers_yaml, payload, - 201, {"Location": "nslcm/v1/ns_lcm_op_occs/", "Content-Type": "application/yaml"}, "yaml") - nslcmop2_scale_in = "DEPLOY{}".format(self.step) - self._wait_nslcmop_ready(engine, nslcmop2_scale_in, timeout_deploy) + engine.test("Execute scale IN action over NS", "POST", + "/nslcm/v1/ns_instances/{}/scale".format(self.ns_id), headers_yaml, payload, + (201, 202), r_headers_yaml_location_nslcmop, "yaml") + nslcmop2_scale_in = engine.last_id + engine.wait_operation_ready("ns", nslcmop2_scale_in, timeout_deploy) if manual_check: input('NS scale in done. Check that two less vdus are there') # TODO check automatic # perform scale in that must fail as reached limit - engine.test("DEPLOY{}".format(self.step), "Execute scale IN out of limit action over NS", "POST", - "/nslcm/v1/ns_instances/<{}>/scale".format(self.ns_test), headers_yaml, payload, - 201, {"Location": "nslcm/v1/ns_lcm_op_occs/", "Content-Type": "application/yaml"}, "yaml") - nslcmop2_scale_in = "DEPLOY{}".format(self.step) - self._wait_nslcmop_ready(engine, nslcmop2_scale_in, timeout_deploy, expected_fail=True) + engine.test("Execute scale IN out of limit action over NS", "POST", + "/nslcm/v1/ns_instances/{}/scale".format(self.ns_id), headers_yaml, payload, + (201, 202), r_headers_yaml_location_nslcmop, "yaml") + nslcmop2_scale_in = engine.last_id + engine.wait_operation_ready("ns", nslcmop2_scale_in, timeout_deploy, expected_fail=True) class TestDeployIpMac(TestDeploy): @@ -998,13 +1498,14 @@ class TestDeployIpMac(TestDeploy): def __init__(self): super().__init__() + self.test_name = "SetIpMac" self.vnfd_filenames = ("vnfd_2vdu_set_ip_mac2.yaml", "vnfd_2vdu_set_ip_mac.yaml") self.nsd_filename = "scenario_2vdu_set_ip_mac.yaml" self.descriptor_url = \ "https://osm.etsi.org/gitweb/?p=osm/RO.git;a=blob_plain;f=test/RO_tests/v3_2vdu_set_ip_mac/" - self.cmds = {'1': ['ls -lrt', ], '2': ['ls -lrt', ]} - self.uss = {'1': "osm", '2': "osm"} - self.pss = {'1': "osm4u", '2': "osm4u"} + self.commands = {'1': ['ls -lrt', ], '2': ['ls -lrt', ]} + self.users = {'1': "osm", '2': "osm"} + self.passwords = {'1': "osm4u", '2': "osm4u"} self.timeout = 360 def run(self, engine, test_osm, manual_check, test_params=None): @@ -1072,137 +1573,123 @@ class TestDeployHackfest4(TestDeploy): def __init__(self): super().__init__() + self.test_name = "HACKFEST4-" self.vnfd_filenames = ("hackfest_4_vnfd.tar.gz",) self.nsd_filename = "hackfest_4_nsd.tar.gz" self.uses_configuration = True - self.cmds = {'1': ['ls -lrt', ], '2': ['ls -lrt', ]} - self.uss = {'1': "ubuntu", '2': "ubuntu"} - self.pss = {'1': "osm4u", '2': "osm4u"} - - def create_descriptors(self, engine): - super().create_descriptors(engine) + self.commands = {'1': ['ls -lrt', ], '2': ['ls -lrt', ]} + self.users = {'1': "ubuntu", '2': "ubuntu"} + self.passwords = {'1': "osm4u", '2': "osm4u"} # Modify VNFD to add scaling - payload = """ - scaling-group-descriptor: - - name: "scale_dataVM" - max-instance-count: 10 - scaling-policy: - - name: "auto_cpu_util_above_threshold" - scaling-type: "automatic" - threshold-time: 0 - cooldown-time: 60 - scaling-criteria: - - name: "cpu_util_above_threshold" - scale-in-threshold: 15 - scale-in-relational-operation: "LE" - scale-out-threshold: 60 - scale-out-relational-operation: "GE" - vnf-monitoring-param-ref: "all_aaa_cpu_util" - vdu: - - vdu-id-ref: dataVM - count: 1 - scaling-config-action: - - trigger: post-scale-out - vnf-config-primitive-name-ref: touch - - trigger: pre-scale-in - vnf-config-primitive-name-ref: touch - vnf-configuration: - config-primitive: - - name: touch - parameter: - - name: filename - data-type: STRING - default-value: '/home/ubuntu/touched' - """ - engine.test("DEPLOY{}".format(self.step), "Edit VNFD ", "PATCH", - "/vnfpkgm/v1/vnf_packages/<{}>".format(self.vnfds_test[0]), headers_yaml, payload, 204, None, None) - self.step += 1 + # self.descriptor_edit = { + # "vnfd0": { + # 'vnf-configuration': { + # 'config-primitive': [{ + # 'name': 'touch', + # 'parameter': [{ + # 'name': 'filename', + # 'data-type': 'STRING', + # 'default-value': '/home/ubuntu/touched' + # }] + # }] + # }, + # 'scaling-group-descriptor': [{ + # 'name': 'scale_dataVM', + # 'scaling-policy': [{ + # 'threshold-time': 0, + # 'name': 'auto_cpu_util_above_threshold', + # 'scaling-type': 'automatic', + # 'scaling-criteria': [{ + # 'name': 'cpu_util_above_threshold', + # 'vnf-monitoring-param-ref': 'all_aaa_cpu_util', + # 'scale-out-relational-operation': 'GE', + # 'scale-in-threshold': 15, + # 'scale-out-threshold': 60, + # 'scale-in-relational-operation': 'LE' + # }], + # 'cooldown-time': 60 + # }], + # 'max-instance-count': 10, + # 'scaling-config-action': [ + # {'vnf-config-primitive-name-ref': 'touch', + # 'trigger': 'post-scale-out'}, + # {'vnf-config-primitive-name-ref': 'touch', + # 'trigger': 'pre-scale-in'} + # ], + # 'vdu': [{ + # 'vdu-id-ref': 'dataVM', + # 'count': 1 + # }] + # }] + # } + # } class TestDeployHackfest3Charmed(TestDeploy): - description = "Load and deploy Hackfest 3charmed_ns example. Modifies it for adding scaling and performs " \ - "primitive actions and scaling" + description = "Load and deploy Hackfest 3charmed_ns example" def __init__(self): super().__init__() + self.test_name = "HACKFEST3-" self.vnfd_filenames = ("hackfest_3charmed_vnfd.tar.gz",) self.nsd_filename = "hackfest_3charmed_nsd.tar.gz" self.uses_configuration = True - self.cmds = {'1': [''], '2': ['ls -lrt /home/ubuntu/first-touch', ]} - self.uss = {'1': "ubuntu", '2': "ubuntu"} - self.pss = {'1': "osm4u", '2': "osm4u"} - - # def create_descriptors(self, engine): - # super().create_descriptors(engine) - # # Modify VNFD to add scaling - # payload = """ - # scaling-group-descriptor: - # - name: "scale_dataVM" - # max-instance-count: 10 - # scaling-policy: - # - name: "auto_cpu_util_above_threshold" - # scaling-type: "automatic" - # threshold-time: 0 - # cooldown-time: 60 - # scaling-criteria: - # - name: "cpu_util_above_threshold" - # scale-in-threshold: 15 - # scale-in-relational-operation: "LE" - # scale-out-threshold: 60 - # scale-out-relational-operation: "GE" - # vnf-monitoring-param-ref: "all_aaa_cpu_util" - # vdu: - # - vdu-id-ref: dataVM - # count: 1 - # scaling-config-action: - # - trigger: post-scale-out - # vnf-config-primitive-name-ref: touch - # - trigger: pre-scale-in - # vnf-config-primitive-name-ref: touch - # vnf-configuration: - # config-primitive: - # - name: touch - # parameter: - # - name: filename - # data-type: STRING - # default-value: '/home/ubuntu/touched' - # """ - # engine.test("DEPLOY{}".format(self.step), "Edit VNFD ", "PATCH", - # "/vnfpkgm/v1/vnf_packages/<{}>".format(self.vnfds_test[0]), - # headers_yaml, payload, 200, - # r_header_yaml, yaml) - # self.vnfds_test.append("DEPLOY" + str(self.step)) - # self.step += 1 - - def aditional_operations(self, engine, test_osm, manual_check): + self.commands = {'1': ['ls -lrt /home/ubuntu/first-touch'], '2': ['ls -lrt /home/ubuntu/first-touch']} + self.users = {'1': "ubuntu", '2': "ubuntu"} + self.passwords = {'1': "osm4u", '2': "osm4u"} + self.descriptor_edit = { + "vnfd0": yaml.safe_load( + """ + vnf-configuration: + terminate-config-primitive: + - seq: '1' + name: touch + parameter: + - name: filename + value: '/home/ubuntu/last-touch1' + - seq: '3' + name: touch + parameter: + - name: filename + value: '/home/ubuntu/last-touch3' + - seq: '2' + name: touch + parameter: + - name: filename + value: '/home/ubuntu/last-touch2' + """) + } + + def additional_operations(self, engine, test_osm, manual_check): if not test_osm: return # 1 perform action + vnfr_index_selected = "2" payload = '{member_vnf_index: "2", primitive: touch, primitive_params: { filename: /home/ubuntu/OSMTESTNBI }}' - engine.test("DEPLOY{}".format(self.step), "Executer service primitive over NS", "POST", - "/nslcm/v1/ns_instances/<{}>/action".format(self.ns_test), headers_yaml, payload, - 201, {"Location": "nslcm/v1/ns_lcm_op_occs/", "Content-Type": "application/yaml"}, "yaml") - nslcmop2_action = "DEPLOY{}".format(self.step) - self.step += 1 + engine.test("Exec service primitive over NS", "POST", + "/nslcm/v1/ns_instances/{}/action".format(self.ns_id), headers_yaml, payload, + (201, 202), r_headers_yaml_location_nslcmop, "yaml") + nslcmop2_action = engine.last_id # Wait until status is Ok - self._wait_nslcmop_ready(engine, nslcmop2_action, timeout_deploy) + engine.wait_operation_ready("ns", nslcmop2_action, timeout_deploy) + vnfr_ip = self.get_vnfr_ip(engine, vnfr_index_selected) if manual_check: - input('NS service primitive has been executed. Check that file /home/ubuntu/OSMTESTNBI is present at ' - 'TODO_PUT_IP') - else: - cmds = {'1': [''], '2': ['ls -lrt /home/ubuntu/OSMTESTNBI', ]} - uss = {'1': "ubuntu", '2': "ubuntu"} - pss = {'1': "osm4u", '2': "osm4u"} - self.test_ns(engine, test_osm, cmds, uss, pss, self.keys, self.timeout) + input( + "NS service primitive has been executed." + "Check that file /home/ubuntu/OSMTESTNBI is present at {}". + format(vnfr_ip)) + if test_osm: + commands = {'1': [''], '2': ['ls -lrt /home/ubuntu/OSMTESTNBI', ]} + self.test_ns(engine, test_osm, commands=commands) # # 2 perform scale out # payload = '{scaleType: SCALE_VNF, scaleVnfData: {scaleVnfType: SCALE_OUT, scaleByStepData: ' \ # '{scaling-group-descriptor: scale_dataVM, member-vnf-index: "1"}}}' - # engine.test("DEPLOY{}".format(self.step), "Execute scale action over NS", "POST", - # "/nslcm/v1/ns_instances/<{}>/scale".format(self.ns_test), headers_yaml, payload, - # 201, {"Location": "nslcm/v1/ns_lcm_op_occs/", "Content-Type": "application/yaml"}, "yaml") - # nslcmop2_scale_out = "DEPLOY{}".format(self.step) - # self._wait_nslcmop_ready(engine, nslcmop2_scale_out, timeout_deploy) + # engine.test("Execute scale action over NS", "POST", + # "/nslcm/v1/ns_instances/{}/scale".format(self.ns_id), headers_yaml, payload, + # (201, 202), r_headers_yaml_location_nslcmop, "yaml") + # nslcmop2_scale_out = engine.last_id + # engine.wait_operation_ready("ns", nslcmop2_scale_out, timeout_deploy) # if manual_check: # input('NS scale out done. Check that file /home/ubuntu/touched is present and new VM is created') # # TODO check automatic @@ -1210,21 +1697,497 @@ class TestDeployHackfest3Charmed(TestDeploy): # # 2 perform scale in # payload = '{scaleType: SCALE_VNF, scaleVnfData: {scaleVnfType: SCALE_IN, scaleByStepData: ' \ # '{scaling-group-descriptor: scale_dataVM, member-vnf-index: "1"}}}' - # engine.test("DEPLOY{}".format(self.step), "Execute scale action over NS", "POST", - # "/nslcm/v1/ns_instances/<{}>/scale".format(self.ns_test), headers_yaml, payload, - # 201, {"Location": "nslcm/v1/ns_lcm_op_occs/", "Content-Type": "application/yaml"}, "yaml") - # nslcmop2_scale_in = "DEPLOY{}".format(self.step) - # self._wait_nslcmop_ready(engine, nslcmop2_scale_in, timeout_deploy) + # engine.test("Execute scale action over NS", "POST", + # "/nslcm/v1/ns_instances/{}/scale".format(self.ns_id), headers_yaml, payload, + # (201, 202), r_headers_yaml_location_nslcmop, "yaml") + # nslcmop2_scale_in = engine.last_id + # engine.wait_operation_ready("ns", nslcmop2_scale_in, timeout_deploy) # if manual_check: # input('NS scale in done. Check that file /home/ubuntu/touched is updated and new VM is deleted') # # TODO check automatic +class TestDeployHackfest3Charmed2(TestDeployHackfest3Charmed): + description = "Load and deploy Hackfest 3charmed_ns example modified version of descriptors to have dots in " \ + "ids and member-vnf-index." + + def __init__(self): + super().__init__() + self.test_name = "HACKFEST3v2-" + self.qforce = "?FORCE=True" + self.descriptor_edit = { + "vnfd0": { + "vdu": { + "$[0]": { + "interface": {"$[0]": {"external-connection-point-ref": "pdu-mgmt"}} + }, + "$[1]": None + }, + "vnf-configuration": None, + "connection-point": { + "$[0]": { + "id": "pdu-mgmt", + "name": "pdu-mgmt", + "short-name": "pdu-mgmt" + }, + "$[1]": None + }, + "mgmt-interface": {"cp": "pdu-mgmt"}, + "description": "A vnf single vdu to be used as PDU", + "id": "vdu-as-pdu", + "internal-vld": { + "$[0]": { + "id": "pdu_internal", + "name": "pdu_internal", + "internal-connection-point": {"$[1]": None}, + "short-name": "pdu_internal", + "type": "ELAN" + } + } + }, + + # Modify NSD accordingly + "nsd": { + "constituent-vnfd": { + "$[0]": {"vnfd-id-ref": "vdu-as-pdu"}, + "$[1]": None, + }, + "description": "A nsd to deploy the vnf to act as as PDU", + "id": "nsd-as-pdu", + "name": "nsd-as-pdu", + "short-name": "nsd-as-pdu", + "vld": { + "$[0]": { + "id": "mgmt_pdu", + "name": "mgmt_pdu", + "short-name": "mgmt_pdu", + "vnfd-connection-point-ref": { + "$[0]": { + "vnfd-connection-point-ref": "pdu-mgmt", + "vnfd-id-ref": "vdu-as-pdu", + }, + "$[1]": None + }, + "type": "ELAN" + }, + "$[1]": None, + } + } + } + + +class TestDeployHackfest3Charmed3(TestDeployHackfest3Charmed): + description = "Load and deploy Hackfest 3charmed_ns example modified version to test scaling and NS parameters" + + def __init__(self): + super().__init__() + self.test_name = "HACKFEST3v3-" + self.commands = {'1': ['ls -lrt /home/ubuntu/first-touch-1'], '2': ['ls -lrt /home/ubuntu/first-touch-2']} + self.descriptor_edit = { + "vnfd0": yaml.load( + """ + scaling-group-descriptor: + - name: "scale_dataVM" + max-instance-count: 10 + scaling-policy: + - name: "auto_cpu_util_above_threshold" + scaling-type: "automatic" + threshold-time: 0 + cooldown-time: 60 + scaling-criteria: + - name: "cpu_util_above_threshold" + scale-in-threshold: 15 + scale-in-relational-operation: "LE" + scale-out-threshold: 60 + scale-out-relational-operation: "GE" + vnf-monitoring-param-ref: "monitor1" + vdu: + - vdu-id-ref: dataVM + count: 1 + scaling-config-action: + - trigger: post-scale-out + vnf-config-primitive-name-ref: touch + - trigger: pre-scale-in + vnf-config-primitive-name-ref: touch + vdu: + "$id: dataVM": + monitoring-param: + - id: "dataVM_cpu_util" + nfvi-metric: "cpu_utilization" + + monitoring-param: + - id: "monitor1" + name: "monitor1" + aggregation-type: AVERAGE + vdu-monitoring-param: + vdu-ref: "dataVM" + vdu-monitoring-param-ref: "dataVM_cpu_util" + vnf-configuration: + initial-config-primitive: + "$[1]": + parameter: + "$[0]": + value: "" # default-value: /home/ubuntu/first-touch + config-primitive: + "$[0]": + parameter: + "$[0]": + default-value: "" + """, + Loader=yaml.Loader) + } + self.ns_params = { + "additionalParamsForVnf": [ + {"member-vnf-index": "1", "additionalParams": {"touch_filename": "/home/ubuntu/first-touch-1", + "touch_filename2": "/home/ubuntu/second-touch-1"}}, + {"member-vnf-index": "2", "additionalParams": {"touch_filename": "/home/ubuntu/first-touch-2", + "touch_filename2": "/home/ubuntu/second-touch-2"}}, + ] + } + + def additional_operations(self, engine, test_osm, manual_check): + super().additional_operations(engine, test_osm, manual_check) + if not test_osm: + return + + # 2 perform scale out + payload = '{scaleType: SCALE_VNF, scaleVnfData: {scaleVnfType: SCALE_OUT, scaleByStepData: ' \ + '{scaling-group-descriptor: scale_dataVM, member-vnf-index: "1"}}}' + engine.test("Execute scale action over NS", "POST", + "/nslcm/v1/ns_instances/{}/scale".format(self.ns_id), headers_yaml, payload, + (201, 202), r_headers_yaml_location_nslcmop, "yaml") + nslcmop2_scale_out = engine.last_id + engine.wait_operation_ready("ns", nslcmop2_scale_out, timeout_deploy) + if manual_check: + input('NS scale out done. Check that file /home/ubuntu/second-touch-1 is present and new VM is created') + if test_osm: + commands = {'1': ['ls -lrt /home/ubuntu/second-touch-1', ]} + self.test_ns(engine, test_osm, commands=commands) + # TODO check automatic connection to scaled VM + + # 2 perform scale in + payload = '{scaleType: SCALE_VNF, scaleVnfData: {scaleVnfType: SCALE_IN, scaleByStepData: ' \ + '{scaling-group-descriptor: scale_dataVM, member-vnf-index: "1"}}}' + engine.test("Execute scale action over NS", "POST", + "/nslcm/v1/ns_instances/{}/scale".format(self.ns_id), headers_yaml, payload, + (201, 202), r_headers_yaml_location_nslcmop, "yaml") + nslcmop2_scale_in = engine.last_id + engine.wait_operation_ready("ns", nslcmop2_scale_in, timeout_deploy) + if manual_check: + input('NS scale in done. Check that file /home/ubuntu/second-touch-1 is updated and new VM is deleted') + # TODO check automatic + + +class TestDeploySimpleCharm(TestDeploy): + description = "Deploy hackfest-4 hackfest_simplecharm example" + + def __init__(self): + super().__init__() + self.test_name = "HACKFEST-SIMPLE" + self.descriptor_url = "https://osm-download.etsi.org/ftp/osm-4.0-four/4th-hackfest/packages/" + self.vnfd_filenames = ("hackfest_simplecharm_vnf.tar.gz",) + self.nsd_filename = "hackfest_simplecharm_ns.tar.gz" + self.uses_configuration = True + self.commands = {'1': [''], '2': ['ls -lrt /home/ubuntu/first-touch', ]} + self.users = {'1': "ubuntu", '2': "ubuntu"} + self.passwords = {'1': "osm4u", '2': "osm4u"} + + +class TestDeploySimpleCharm2(TestDeploySimpleCharm): + description = "Deploy hackfest-4 hackfest_simplecharm example changing naming to contain dots on ids and " \ + "vnf-member-index" + + def __init__(self): + super().__init__() + self.test_name = "HACKFEST-SIMPLE2-" + self.qforce = "?FORCE=True" + self.descriptor_edit = { + "vnfd0": { + "id": "hackfest.simplecharm.vnf" + }, + + "nsd": { + "id": "hackfest.simplecharm.ns", + "constituent-vnfd": { + "$[0]": {"vnfd-id-ref": "hackfest.simplecharm.vnf", "member-vnf-index": "$1"}, + "$[1]": {"vnfd-id-ref": "hackfest.simplecharm.vnf", "member-vnf-index": "$2"}, + }, + "vld": { + "$[0]": { + "vnfd-connection-point-ref": {"$[0]": {"member-vnf-index-ref": "$1", + "vnfd-id-ref": "hackfest.simplecharm.vnf"}, + "$[1]": {"member-vnf-index-ref": "$2", + "vnfd-id-ref": "hackfest.simplecharm.vnf"}}, + }, + "$[1]": { + "vnfd-connection-point-ref": {"$[0]": {"member-vnf-index-ref": "$1", + "vnfd-id-ref": "hackfest.simplecharm.vnf"}, + "$[1]": {"member-vnf-index-ref": "$2", + "vnfd-id-ref": "hackfest.simplecharm.vnf"}}, + }, + } + } + } + + +class TestDeploySingleVdu(TestDeployHackfest3Charmed): + description = "Generate a single VDU base on editing Hackfest3Charmed descriptors and deploy" + + def __init__(self): + super().__init__() + self.test_name = "SingleVDU" + self.qforce = "?FORCE=True" + self.descriptor_edit = { + # Modify VNFD to remove one VDU + "vnfd0": { + "vdu": { + "$[0]": { + "interface": {"$[0]": {"external-connection-point-ref": "pdu-mgmt"}} + }, + "$[1]": None + }, + "vnf-configuration": None, + "connection-point": { + "$[0]": { + "id": "pdu-mgmt", + "name": "pdu-mgmt", + "short-name": "pdu-mgmt" + }, + "$[1]": None + }, + "mgmt-interface": {"cp": "pdu-mgmt"}, + "description": "A vnf single vdu to be used as PDU", + "id": "vdu-as-pdu", + "internal-vld": { + "$[0]": { + "id": "pdu_internal", + "name": "pdu_internal", + "internal-connection-point": {"$[1]": None}, + "short-name": "pdu_internal", + "type": "ELAN" + } + } + }, + + # Modify NSD accordingly + "nsd": { + "constituent-vnfd": { + "$[0]": {"vnfd-id-ref": "vdu-as-pdu"}, + "$[1]": None, + }, + "description": "A nsd to deploy the vnf to act as as PDU", + "id": "nsd-as-pdu", + "name": "nsd-as-pdu", + "short-name": "nsd-as-pdu", + "vld": { + "$[0]": { + "id": "mgmt_pdu", + "name": "mgmt_pdu", + "short-name": "mgmt_pdu", + "vnfd-connection-point-ref": { + "$[0]": { + "vnfd-connection-point-ref": "pdu-mgmt", + "vnfd-id-ref": "vdu-as-pdu", + }, + "$[1]": None + }, + "type": "ELAN" + }, + "$[1]": None, + } + } + } + + +class TestDeployHnfd(TestDeployHackfest3Charmed): + description = "Generate a HNFD base on editing Hackfest3Charmed descriptors and deploy" + + def __init__(self): + super().__init__() + self.test_name = "HNFD" + self.pduDeploy = TestDeploySingleVdu() + self.pdu_interface_0 = {} + self.pdu_interface_1 = {} + + self.pdu_id = None + # self.vnf_to_pdu = """ + # vdu: + # "$[0]": + # pdu-type: PDU-TYPE-1 + # interface: + # "$[0]": + # name: mgmt-iface + # "$[1]": + # name: pdu-iface-internal + # id: hfn1 + # description: HFND, one PDU + One VDU + # name: hfn1 + # short-name: hfn1 + # + # """ + + self.pdu_descriptor = { + "name": "my-PDU", + "type": "PDU-TYPE-1", + "vim_accounts": "to-override", + "interfaces": [ + { + "name": "mgmt-iface", + "mgmt": True, + "type": "overlay", + "ip-address": "to override", + "mac-address": "mac_address", + "vim-network-name": "mgmt", + }, + { + "name": "pdu-iface-internal", + "mgmt": False, + "type": "overlay", + "ip-address": "to override", + "mac-address": "mac_address", + "vim-network-name": "pdu_internal", # OSMNBITEST-PDU-pdu_internal + }, + ] + } + self.vnfd_filenames = ("hackfest_3charmed_vnfd.tar.gz", "hackfest_3charmed_vnfd.tar.gz") + + self.descriptor_edit = { + "vnfd0": { + "id": "hfnd1", + "name": "hfn1", + "short-name": "hfn1", + "vdu": { + "$[0]": { + "pdu-type": "PDU-TYPE-1", + "interface": { + "$[0]": {"name": "mgmt-iface"}, + "$[1]": {"name": "pdu-iface-internal"}, + } + } + } + }, + "nsd": { + "constituent-vnfd": { + "$[1]": {"vnfd-id-ref": "hfnd1"} + }, + "vld": { + "$[0]": {"vnfd-connection-point-ref": {"$[1]": {"vnfd-id-ref": "hfnd1"}}}, + "$[1]": {"vnfd-connection-point-ref": {"$[1]": {"vnfd-id-ref": "hfnd1"}}} + } + } + } + + def create_descriptors(self, engine): + super().create_descriptors(engine) + + # Create PDU + self.pdu_descriptor["interfaces"][0].update(self.pdu_interface_0) + self.pdu_descriptor["interfaces"][1].update(self.pdu_interface_1) + self.pdu_descriptor["vim_accounts"] = [self.vim_id] + # TODO get vim-network-name from vnfr.vld.name + self.pdu_descriptor["interfaces"][1]["vim-network-name"] = "{}-{}-{}".format( + os.environ.get("OSMNBITEST_NS_NAME", "OSMNBITEST"), + "PDU", self.pdu_descriptor["interfaces"][1]["vim-network-name"]) + engine.test("Onboard PDU descriptor", "POST", "/pdu/v1/pdu_descriptors", + {"Location": "/pdu/v1/pdu_descriptors/", "Content-Type": "application/yaml"}, self.pdu_descriptor, + 201, r_header_yaml, "yaml") + self.pdu_id = engine.last_id + + def run(self, engine, test_osm, manual_check, test_params=None): + engine.get_autorization() + engine.set_test_name(self.test_name) + nsname = os.environ.get("OSMNBITEST_NS_NAME", "OSMNBITEST") + + # create real VIM if not exist + self.vim_id = engine.get_create_vim(test_osm) + # instantiate PDU + self.pduDeploy.create_descriptors(engine) + self.pduDeploy.instantiate(engine, {"nsDescription": "to be used as PDU", "nsName": nsname + "-PDU", + "nsdId": self.pduDeploy.nsd_id, "vimAccountId": self.vim_id}) + if manual_check: + input('VNF to be used as PDU has been deployed. Perform manual check and press enter to resume') + if test_osm: + self.pduDeploy.test_ns(engine, test_osm) + + if test_osm: + r = engine.test("Get VNFR to obtain IP_ADDRESS", "GET", + "/nslcm/v1/vnfrs?nsr-id-ref={}".format(self.pduDeploy.ns_id), headers_json, None, + 200, r_header_json, "json") + if not r: + return + vnfr_data = r.json() + # print(vnfr_data) + + self.pdu_interface_0["ip-address"] = vnfr_data[0]["vdur"][0]["interfaces"][0].get("ip-address") + self.pdu_interface_1["ip-address"] = vnfr_data[0]["vdur"][0]["interfaces"][1].get("ip-address") + self.pdu_interface_0["mac-address"] = vnfr_data[0]["vdur"][0]["interfaces"][0].get("mac-address") + self.pdu_interface_1["mac-address"] = vnfr_data[0]["vdur"][0]["interfaces"][1].get("mac-address") + if not self.pdu_interface_0["ip-address"]: + raise TestException("Vnfr has not managment ip address") + else: + self.pdu_interface_0["ip-address"] = "192.168.10.10" + self.pdu_interface_1["ip-address"] = "192.168.11.10" + self.pdu_interface_0["mac-address"] = "52:33:44:55:66:13" + self.pdu_interface_1["mac-address"] = "52:33:44:55:66:14" + + self.create_descriptors(engine) + + ns_data = {"nsDescription": "default description", "nsName": nsname, "nsdId": self.nsd_id, + "vimAccountId": self.vim_id} + if test_params and test_params.get("ns-config"): + if isinstance(test_params["ns-config"], str): + ns_data.update(yaml.load(test_params["ns-config"]), Loader=yaml.Loader) + else: + ns_data.update(test_params["ns-config"]) + + self.instantiate(engine, ns_data) + if manual_check: + input('NS has been deployed. Perform manual check and press enter to resume') + if test_osm: + self.test_ns(engine, test_osm) + self.additional_operations(engine, test_osm, manual_check) + self.terminate(engine) + self.pduDeploy.terminate(engine) + self.delete_descriptors(engine) + self.pduDeploy.delete_descriptors(engine) + + def delete_descriptors(self, engine): + super().delete_descriptors(engine) + # delete pdu + engine.test("Delete PDU SOL005", "DELETE", + "/pdu/v1/pdu_descriptors/{}".format(self.pdu_id), + headers_yaml, None, 204, None, 0) + + class TestDescriptors: description = "Test VNFD, NSD, PDU descriptors CRUD and dependencies" + vnfd_empty = """vnfd:vnfd-catalog: + vnfd: + - name: prova + short-name: prova + id: prova + """ + vnfd_prova = """vnfd:vnfd-catalog: + vnfd: + - connection-point: + - name: cp_0h8m + type: VPORT + id: prova + name: prova + short-name: prova + vdu: + - id: vdu_z4bm + image: ubuntu + interface: + - external-connection-point-ref: cp_0h8m + name: eth0 + virtual-interface: + type: VIRTIO + name: vdu_z4bm + version: '1.0' + """ def __init__(self): - self.step = 0 self.vnfd_filename = "hackfest_3charmed_vnfd.tar.gz" self.nsd_filename = "hackfest_3charmed_nsd.tar.gz" self.descriptor_url = "https://osm-download.etsi.org/ftp/osm-3.0-three/2nd-hackfest/packages/" @@ -1232,6 +2195,7 @@ class TestDescriptors: self.nsd_id = None def run(self, engine, test_osm, manual_check, test_params=None): + engine.set_test_name("Descriptors") engine.get_autorization() temp_dir = os.path.dirname(os.path.abspath(__file__)) + "/temp/" if not os.path.exists(temp_dir): @@ -1251,114 +2215,903 @@ class TestDescriptors: vnfd_filename_path = temp_dir + self.vnfd_filename nsd_filename_path = temp_dir + self.nsd_filename - # vnfd CREATE AND UPLOAD in one step: - test_name = "DESCRIPTOR{}".format(self.step) - engine.test(test_name, "Onboard VNFD in one step", "POST", - "/vnfpkgm/v1/vnf_packages_content", headers_zip_yaml, "@b" + vnfd_filename_path, 201, - {"Location": "/vnfpkgm/v1/vnf_packages_content/", "Content-Type": "application/yaml"}, "yaml") - self.vnfd_id = engine.test_ids[test_name] - self.step += 1 + engine.test("Onboard empty VNFD in one step", "POST", "/vnfpkgm/v1/vnf_packages_content", headers_yaml, + self.vnfd_empty, 201, r_headers_yaml_location_vnfd, "yaml") + self.vnfd_id = engine.last_id + + # test bug 605 + engine.test("Upload invalid VNFD ", "PUT", "/vnfpkgm/v1/vnf_packages/{}/package_content".format(self.vnfd_id), + headers_yaml, self.vnfd_prova, 422, r_header_yaml, "yaml") + + engine.test("Upload VNFD {}".format(self.vnfd_filename), "PUT", + "/vnfpkgm/v1/vnf_packages/{}/package_content".format(self.vnfd_id), headers_zip_yaml, + "@b" + vnfd_filename_path, 204, None, 0) + + queries = ["mgmt-interface.cp=mgmt", "vdu.0.interface.0.external-connection-point-ref=mgmt", + "vdu.0.interface.1.internal-connection-point-ref=internal", + "internal-vld.0.internal-connection-point.0.id-ref=internal", + # Detection of duplicated VLD names in VNF Descriptors + # URL: internal-vld=[ + # {id: internal1, name: internal, type:ELAN, + # internal-connection-point: [{id-ref: mgmtVM-internal}, {id-ref: dataVM-internal}]}, + # {id: internal2, name: internal, type:ELAN, + # internal-connection-point: [{id-ref: mgmtVM-internal}, {id-ref: dataVM-internal}]} + # ] + "internal-vld=%5B%7Bid%3A%20internal1%2C%20name%3A%20internal%2C%20type%3A%20ELAN%2C%20" + "internal-connection-point%3A%20%5B%7Bid-ref%3A%20mgmtVM-internal%7D%2C%20%7Bid-ref%3A%20" + "dataVM-internal%7D%5D%7D%2C%20%7Bid%3A%20internal2%2C%20name%3A%20internal%2C%20type%3A%20" + "ELAN%2C%20internal-connection-point%3A%20%5B%7Bid-ref%3A%20mgmtVM-internal%7D%2C%20%7B" + "id-ref%3A%20dataVM-internal%7D%5D%7D%5D" + ] + for query in queries: + engine.test("Upload invalid VNFD ", "PUT", + "/vnfpkgm/v1/vnf_packages/{}/package_content?{}".format(self.vnfd_id, query), + headers_zip_yaml, "@b" + vnfd_filename_path, 422, r_header_yaml, "yaml") + + # test bug 605 + engine.test("Upload invalid VNFD ", "PUT", "/vnfpkgm/v1/vnf_packages/{}/package_content".format(self.vnfd_id), + headers_yaml, self.vnfd_prova, 422, r_header_yaml, "yaml") # get vnfd descriptor - engine.test("DESCRIPTOR" + str(self.step), "Get VNFD descriptor", "GET", - "/vnfpkgm/v1/vnf_packages/{}".format(self.vnfd_id), headers_yaml, None, 200, r_header_yaml, "yaml") - self.step += 1 + engine.test("Get VNFD descriptor", "GET", "/vnfpkgm/v1/vnf_packages/{}".format(self.vnfd_id), + headers_yaml, None, 200, r_header_yaml, "yaml") # get vnfd file descriptor - engine.test("DESCRIPTOR" + str(self.step), "Get VNFD file descriptor", "GET", - "/vnfpkgm/v1/vnf_packages/{}/vnfd".format(self.vnfd_id), headers_text, None, 200, - r_header_text, "text", temp_dir+"vnfd-yaml") - self.step += 1 + engine.test("Get VNFD file descriptor", "GET", "/vnfpkgm/v1/vnf_packages/{}/vnfd".format(self.vnfd_id), + headers_text, None, 200, r_header_text, "text", temp_dir+"vnfd-yaml") # TODO compare files: diff vnfd-yaml hackfest_3charmed_vnfd/hackfest_3charmed_vnfd.yaml # get vnfd zip file package - engine.test("DESCRIPTOR" + str(self.step), "Get VNFD zip package", "GET", + engine.test("Get VNFD zip package", "GET", "/vnfpkgm/v1/vnf_packages/{}/package_content".format(self.vnfd_id), headers_zip, None, 200, r_header_zip, "zip", temp_dir+"vnfd-zip") - self.step += 1 # TODO compare files: diff vnfd-zip hackfest_3charmed_vnfd.tar.gz # get vnfd artifact - engine.test("DESCRIPTOR" + str(self.step), "Get VNFD artifact package", "GET", + engine.test("Get VNFD artifact package", "GET", "/vnfpkgm/v1/vnf_packages/{}/artifacts/icons/osm.png".format(self.vnfd_id), headers_zip, None, 200, r_header_octect, "octet-string", temp_dir+"vnfd-icon") - self.step += 1 # TODO compare files: diff vnfd-icon hackfest_3charmed_vnfd/icons/osm.png # nsd CREATE AND UPLOAD in one step: - test_name = "DESCRIPTOR{}".format(self.step) - engine.test(test_name, "Onboard NSD in one step", "POST", - "/nsd/v1/ns_descriptors_content", headers_zip_yaml, "@b" + nsd_filename_path, 201, - {"Location": "/nsd/v1/ns_descriptors_content/", "Content-Type": "application/yaml"}, "yaml") - self.nsd_id = engine.test_ids[test_name] - self.step += 1 + engine.test("Onboard NSD in one step", "POST", "/nsd/v1/ns_descriptors_content", headers_zip_yaml, + "@b" + nsd_filename_path, 201, r_headers_yaml_location_nsd, "yaml") + self.nsd_id = engine.last_id + + queries = ["vld.0.vnfd-connection-point-ref.0.vnfd-id-ref=hf"] + for query in queries: + engine.test("Upload invalid NSD ", "PUT", + "/nsd/v1/ns_descriptors/{}/nsd_content?{}".format(self.nsd_id, query), + headers_zip_yaml, "@b" + nsd_filename_path, 422, r_header_yaml, "yaml") # get nsd descriptor - engine.test("DESCRIPTOR" + str(self.step), "Get NSD descriptor", "GET", - "/nsd/v1/ns_descriptors/{}".format(self.nsd_id), headers_yaml, None, 200, r_header_yaml, "yaml") - self.step += 1 + engine.test("Get NSD descriptor", "GET", "/nsd/v1/ns_descriptors/{}".format(self.nsd_id), headers_yaml, + None, 200, r_header_yaml, "yaml") # get nsd file descriptor - engine.test("DESCRIPTOR" + str(self.step), "Get NSD file descriptor", "GET", - "/nsd/v1/ns_descriptors/{}/nsd".format(self.nsd_id), headers_text, None, 200, - r_header_text, "text", temp_dir+"nsd-yaml") - self.step += 1 + engine.test("Get NSD file descriptor", "GET", "/nsd/v1/ns_descriptors/{}/nsd".format(self.nsd_id), headers_text, + None, 200, r_header_text, "text", temp_dir+"nsd-yaml") # TODO compare files: diff nsd-yaml hackfest_3charmed_nsd/hackfest_3charmed_nsd.yaml # get nsd zip file package - engine.test("DESCRIPTOR" + str(self.step), "Get NSD zip package", "GET", - "/nsd/v1/ns_descriptors/{}/nsd_content".format(self.nsd_id), headers_zip, None, 200, - r_header_zip, "zip", temp_dir+"nsd-zip") - self.step += 1 + engine.test("Get NSD zip package", "GET", "/nsd/v1/ns_descriptors/{}/nsd_content".format(self.nsd_id), + headers_zip, None, 200, r_header_zip, "zip", temp_dir+"nsd-zip") # TODO compare files: diff nsd-zip hackfest_3charmed_nsd.tar.gz # get nsd artifact - engine.test("DESCRIPTOR" + str(self.step), "Get NSD artifact package", "GET", + engine.test("Get NSD artifact package", "GET", "/nsd/v1/ns_descriptors/{}/artifacts/icons/osm.png".format(self.nsd_id), headers_zip, None, 200, r_header_octect, "octet-string", temp_dir+"nsd-icon") - self.step += 1 # TODO compare files: diff nsd-icon hackfest_3charmed_nsd/icons/osm.png # vnfd DELETE - test_rest.test("DESCRIPTOR" + str(self.step), "Delete VNFD conflict", "DELETE", - "/vnfpkgm/v1/vnf_packages/{}".format(self.vnfd_id), headers_yaml, None, 409, None, None) - self.step += 1 + test_rest.test("Delete VNFD conflict", "DELETE", "/vnfpkgm/v1/vnf_packages/{}".format(self.vnfd_id), + headers_yaml, None, 409, None, None) - test_rest.test("DESCRIPTOR" + str(self.step), "Delete VNFD force", "DELETE", - "/vnfpkgm/v1/vnf_packages/{}?FORCE=TRUE".format(self.vnfd_id), headers_yaml, None, 204, None, 0) - self.step += 1 + test_rest.test("Delete VNFD force", "DELETE", "/vnfpkgm/v1/vnf_packages/{}?FORCE=TRUE".format(self.vnfd_id), + headers_yaml, None, 204, None, 0) # nsd DELETE - test_rest.test("DESCRIPTOR" + str(self.step), "Delete NSD", "DELETE", - "/nsd/v1/ns_descriptors/{}".format(self.nsd_id), headers_yaml, None, 204, None, 0) - self.step += 1 + test_rest.test("Delete NSD", "DELETE", "/nsd/v1/ns_descriptors/{}".format(self.nsd_id), headers_yaml, None, 204, + None, 0) -class TestNstTemplates: +class TestNetSliceTemplates: description = "Upload a NST to OSM" def __init__(self): - self.nst_filenames = ("@./cirros_slice/cirros_slice.yaml") + self.vnfd_filename = ("@./slice_shared/vnfd/slice_shared_vnfd.yaml") + self.vnfd_filename_middle = ("@./slice_shared/vnfd/slice_shared_middle_vnfd.yaml") + self.nsd_filename = ("@./slice_shared/nsd/slice_shared_nsd.yaml") + self.nsd_filename_middle = ("@./slice_shared/nsd/slice_shared_middle_nsd.yaml") + self.nst_filenames = ("@./slice_shared/slice_shared_nstd.yaml") def run(self, engine, test_osm, manual_check, test_params=None): # nst CREATE + engine.set_test_name("NST step ") engine.get_autorization() - r = engine.test("NST", "Onboard NST", "POST", "/nst/v1/netslice_templates_content", headers_yaml, - self.nst_filenames, - 201, {"Location": "/nst/v1/netslice_templates_content", "Content-Type": "application/yaml"}, - "yaml") - location = r.headers["Location"] - nst_id = location[location.rfind("/")+1:] + temp_dir = os.path.dirname(os.path.abspath(__file__)) + "/temp/" + if not os.path.exists(temp_dir): + os.makedirs(temp_dir) + + # Onboard VNFDs + engine.test("Onboard edge VNFD", "POST", "/vnfpkgm/v1/vnf_packages_content", headers_yaml, + self.vnfd_filename, 201, r_headers_yaml_location_vnfd, "yaml") + self.vnfd_edge_id = engine.last_id + + engine.test("Onboard middle VNFD", "POST", "/vnfpkgm/v1/vnf_packages_content", headers_yaml, + self.vnfd_filename_middle, 201, r_headers_yaml_location_vnfd, "yaml") + self.vnfd_middle_id = engine.last_id + + # Onboard NSDs + engine.test("Onboard NSD edge", "POST", "/nsd/v1/ns_descriptors_content", headers_yaml, + self.nsd_filename, 201, r_headers_yaml_location_nsd, "yaml") + self.nsd_edge_id = engine.last_id + + engine.test("Onboard NSD middle", "POST", "/nsd/v1/ns_descriptors_content", headers_yaml, + self.nsd_filename_middle, 201, r_headers_yaml_location_nsd, "yaml") + self.nsd_middle_id = engine.last_id + + # Onboard NST + engine.test("Onboard NST", "POST", "/nst/v1/netslice_templates_content", headers_yaml, self.nst_filenames, + 201, r_headers_yaml_location_nst, "yaml") + nst_id = engine.last_id # nstd SHOW OSM format - r = engine.test("NST", "Show NSTD OSM format", "GET", - "/nst/v1/netslice_templates_content/{}".format(nst_id), headers_json, None, - 200, r_header_json, "json") + engine.test("Show NSTD OSM format", "GET", "/nst/v1/netslice_templates/{}".format(nst_id), headers_json, None, + 200, r_header_json, "json") # nstd DELETE - r = engine.test("NST", "Delete NSTD", "DELETE", - "/nst/v1/netslice_templates_content/{}".format(nst_id), headers_json, None, - 204, None, 0) + engine.test("Delete NSTD", "DELETE", "/nst/v1/netslice_templates/{}".format(nst_id), headers_json, None, + 204, None, 0) + + # NSDs DELETE + test_rest.test("Delete NSD middle", "DELETE", "/nsd/v1/ns_descriptors/{}".format(self.nsd_middle_id), + headers_json, None, 204, None, 0) + + test_rest.test("Delete NSD edge", "DELETE", "/nsd/v1/ns_descriptors/{}".format(self.nsd_edge_id), headers_json, + None, 204, None, 0) + + # VNFDs DELETE + test_rest.test("Delete VNFD edge", "DELETE", "/vnfpkgm/v1/vnf_packages/{}".format(self.vnfd_edge_id), + headers_yaml, None, 204, None, 0) + + test_rest.test("Delete VNFD middle", "DELETE", "/vnfpkgm/v1/vnf_packages/{}".format(self.vnfd_middle_id), + headers_yaml, None, 204, None, 0) + + +class TestNetSliceInstances: + ''' + Test procedure: + 1. Populate databases with VNFD, NSD, NST with the following scenario + +-----------------management-----------------+ + | | | + +--+---+ +----+----+ +---+--+ + | | | | | | + | edge +---data1----+ middle +---data2-----+ edge | + | | | | | | + +------+ +---------+ +------+ + shared-nss + 2. Create NSI-1 + 3. Instantiate NSI-1 + 4. Create NSI-2 + 5. Instantiate NSI-2 + Manual check - Are 2 slices instantiated correctly? + NSI-1 3 nss (2 nss-edges + 1 nss-middle) + NSI-2 2 nss (2 nss-edge sharing nss-middle) + 6. Terminate NSI-1 + 7. Delete NSI-1 + Manual check - Is slice NSI-1 deleted correctly? + NSI-2 with 2 nss-edge + 1 nss-middle (The one from NSI-1) + 8. Create NSI-3 + 9. Instantiate NSI-3 + Manual check - Is slice NSI-3 instantiated correctly? + NSI-3 reuse nss-middle. NSI-3 only create 2 nss-edge + 10. Delete NSI-2 + 11. Terminate NSI-2 + 12. Delete NSI-3 + 13. Terminate NSI-3 + Manual check - All cleaned correctly? + NSI-2 and NSI-3 were terminated and deleted + 14. Cleanup database + ''' + + description = "Upload a NST to OSM" + + def __init__(self): + self.vim_id = None + self.vnfd_filename = ("@./slice_shared/vnfd/slice_shared_vnfd.yaml") + self.vnfd_filename_middle = ("@./slice_shared/vnfd/slice_shared_middle_vnfd.yaml") + self.nsd_filename = ("@./slice_shared/nsd/slice_shared_nsd.yaml") + self.nsd_filename_middle = ("@./slice_shared/nsd/slice_shared_middle_nsd.yaml") + self.nst_filenames = ("@./slice_shared/slice_shared_nstd.yaml") + + def create_slice(self, engine, nsi_data, name): + ns_data_text = yaml.safe_dump(nsi_data, default_flow_style=True, width=256) + r = engine.test(name, "POST", "/nsilcm/v1/netslice_instances", + headers_yaml, ns_data_text, (201, 202), + {"Location": "nsilcm/v1/netslice_instances/", "Content-Type": "application/yaml"}, "yaml") + return r + + def instantiate_slice(self, engine, nsi_data, nsi_id, name): + ns_data_text = yaml.safe_dump(nsi_data, default_flow_style=True, width=256) + engine.test(name, "POST", + "/nsilcm/v1/netslice_instances/{}/instantiate".format(nsi_id), headers_yaml, ns_data_text, + (201, 202), r_headers_yaml_location_nsilcmop, "yaml") + + def terminate_slice(self, engine, nsi_id, name): + engine.test(name, "POST", "/nsilcm/v1/netslice_instances/{}/terminate".format(nsi_id), + headers_yaml, None, (201, 202), r_headers_yaml_location_nsilcmop, "yaml") + + def delete_slice(self, engine, nsi_id, name): + engine.test(name, "DELETE", "/nsilcm/v1/netslice_instances/{}".format(nsi_id), headers_yaml, None, + 204, None, 0) + + def run(self, engine, test_osm, manual_check, test_params=None): + # nst CREATE + engine.set_test_name("NSI") + engine.get_autorization() + + # Onboard VNFDs + engine.test("Onboard edge VNFD", "POST", "/vnfpkgm/v1/vnf_packages_content", headers_yaml, + self.vnfd_filename, 201, r_headers_yaml_location_vnfd, "yaml") + self.vnfd_edge_id = engine.last_id + + engine.test("Onboard middle VNFD", "POST", "/vnfpkgm/v1/vnf_packages_content", headers_yaml, + self.vnfd_filename_middle, 201, r_headers_yaml_location_vnfd, "yaml") + self.vnfd_middle_id = engine.last_id + + # Onboard NSDs + engine.test("Onboard NSD edge", "POST", "/nsd/v1/ns_descriptors_content", headers_yaml, + self.nsd_filename, 201, r_headers_yaml_location_nsd, "yaml") + self.nsd_edge_id = engine.last_id + + engine.test("Onboard NSD middle", "POST", "/nsd/v1/ns_descriptors_content", headers_yaml, + self.nsd_filename_middle, 201, r_headers_yaml_location_nsd, "yaml") + self.nsd_middle_id = engine.last_id + + # Onboard NST + engine.test("Onboard NST", "POST", "/nst/v1/netslice_templates_content", headers_yaml, self.nst_filenames, + 201, r_headers_yaml_location_nst, "yaml") + nst_id = engine.last_id + + self.vim_id = engine.get_create_vim(test_osm) + + # CREATE NSI-1 + ns_data = {'nsiName': 'Deploy-NSI-1', 'vimAccountId': self.vim_id, 'nstId': nst_id, 'nsiDescription': 'default'} + r = self.create_slice(engine, ns_data, "Create NSI-1 step 1") + if not r: + return + self.nsi_id1 = engine.last_id + + # INSTANTIATE NSI-1 + self.instantiate_slice(engine, ns_data, self.nsi_id1, "Instantiate NSI-1 step 2") + nsilcmop_id1 = engine.last_id + + # Waiting for NSI-1 + if test_osm: + engine.wait_operation_ready("nsi", nsilcmop_id1, timeout_deploy) + + # CREATE NSI-2 + ns_data = {'nsiName': 'Deploy-NSI-2', 'vimAccountId': self.vim_id, 'nstId': nst_id, 'nsiDescription': 'default'} + r = self.create_slice(engine, ns_data, "Create NSI-2 step 1") + if not r: + return + self.nsi_id2 = engine.last_id + + # INSTANTIATE NSI-2 + self.instantiate_slice(engine, ns_data, self.nsi_id2, "Instantiate NSI-2 step 2") + nsilcmop_id2 = engine.last_id + + # Waiting for NSI-2 + if test_osm: + engine.wait_operation_ready("nsi", nsilcmop_id2, timeout_deploy) + + if manual_check: + input('NSI-1 AND NSI-2 has been deployed. Perform manual check and press enter to resume') + + # TERMINATE NSI-1 + if test_osm: + self.terminate_slice(engine, self.nsi_id1, "Terminate NSI-1") + nsilcmop1_id = engine.last_id + + # Wait terminate NSI-1 + engine.wait_operation_ready("nsi", nsilcmop1_id, timeout_deploy) + + # DELETE NSI-1 + self.delete_slice(engine, self.nsi_id1, "Delete NS") + + if manual_check: + input('NSI-1 has been deleted. Perform manual check and press enter to resume') + + # CREATE NSI-3 + ns_data = {'nsiName': 'Deploy-NSI-3', 'vimAccountId': self.vim_id, 'nstId': nst_id, 'nsiDescription': 'default'} + r = self.create_slice(engine, ns_data, "Create NSI-3 step 1") + + if not r: + return + self.nsi_id3 = engine.last_id + + # INSTANTIATE NSI-3 + self.instantiate_slice(engine, ns_data, self.nsi_id3, "Instantiate NSI-3 step 2") + nsilcmop_id3 = engine.last_id + + # Wait Instantiate NSI-3 + if test_osm: + engine.wait_operation_ready("nsi", nsilcmop_id3, timeout_deploy) + + if manual_check: + input('NSI-3 has been deployed. Perform manual check and press enter to resume') + + # TERMINATE NSI-2 + if test_osm: + self.terminate_slice(engine, self.nsi_id2, "Terminate NSI-2") + nsilcmop2_id = engine.last_id + + # Wait terminate NSI-2 + engine.wait_operation_ready("nsi", nsilcmop2_id, timeout_deploy) + + # DELETE NSI-2 + self.delete_slice(engine, self.nsi_id2, "DELETE NSI-2") + + # TERMINATE NSI-3 + if test_osm: + self. terminate_slice(engine, self.nsi_id3, "Terminate NSI-3") + nsilcmop3_id = engine.last_id + + # Wait terminate NSI-3 + engine.wait_operation_ready("nsi", nsilcmop3_id, timeout_deploy) + + # DELETE NSI-3 + self.delete_slice(engine, self.nsi_id3, "DELETE NSI-3") + + if manual_check: + input('NSI-2 and NSI-3 has been deleted. Perform manual check and press enter to resume') + + # nstd DELETE + engine.test("Delete NSTD", "DELETE", "/nst/v1/netslice_templates/{}".format(nst_id), headers_json, None, + 204, None, 0) + + # NSDs DELETE + test_rest.test("Delete NSD middle", "DELETE", "/nsd/v1/ns_descriptors/{}".format(self.nsd_middle_id), + headers_json, None, 204, None, 0) + + test_rest.test("Delete NSD edge", "DELETE", "/nsd/v1/ns_descriptors/{}".format(self.nsd_edge_id), headers_json, + None, 204, None, 0) + + # VNFDs DELETE + test_rest.test("Delete VNFD edge", "DELETE", "/vnfpkgm/v1/vnf_packages/{}".format(self.vnfd_edge_id), + headers_yaml, None, 204, None, 0) + + test_rest.test("Delete VNFD middle", "DELETE", "/vnfpkgm/v1/vnf_packages/{}".format(self.vnfd_middle_id), + headers_yaml, None, 204, None, 0) + + +class TestAuthentication: + description = "Test Authentication" + + @staticmethod + def run(engine, test_osm, manual_check, test_params=None): + engine.set_test_name("Authentication") + # backend = test_params.get("backend") if test_params else None # UNUSED + + admin_project_id = test_project_id = None + project_admin_role_id = project_user_role_id = None + test_user_id = empty_user_id = None + default_role_id = empty_role_id = token_role_id = None + + engine.get_autorization() + + # GET + engine.test("Get tokens", "GET", "/admin/v1/tokens", headers_json, {}, + (200), {"Content-Type": "application/json"}, "json") + engine.test("Get projects", "GET", "/admin/v1/projects", headers_json, {}, + (200), {"Content-Type": "application/json"}, "json") + engine.test("Get users", "GET", "/admin/v1/users", headers_json, {}, + (200), {"Content-Type": "application/json"}, "json") + engine.test("Get roles", "GET", "/admin/v1/roles", headers_json, {}, + (200), {"Content-Type": "application/json"}, "json") + res = engine.test("Get admin project", "GET", "/admin/v1/projects?name=admin", headers_json, {}, + (200), {"Content-Type": "application/json"}, "json") + admin_project_id = res.json()[0]["_id"] if res else None + res = engine.test("Get project admin role", "GET", "/admin/v1/roles?name=project_admin", headers_json, {}, + (200), {"Content-Type": "application/json"}, "json") + project_admin_role_id = res.json()[0]["_id"] if res else None + res = engine.test("Get project user role", "GET", "/admin/v1/roles?name=project_user", headers_json, {}, + (200), {"Content-Type": "application/json"}, "json") + project_user_role_id = res.json()[0]["_id"] if res else None + + # POST + res = engine.test("Create test project", "POST", "/admin/v1/projects", headers_json, {"name": "test"}, + (201), {"Location": "/admin/v1/projects/", "Content-Type": "application/json"}, "json") + test_project_id = engine.last_id if res else None + res = engine.test("Create role without permissions", "POST", "/admin/v1/roles", headers_json, {"name": "empty"}, + (201), {"Content-Type": "application/json"}, "json") + empty_role_id = engine.last_id if res else None + res = engine.test("Create role with default permissions", "POST", "/admin/v1/roles", headers_json, + {"name": "default", "permissions": {"default": True}}, + (201), {"Location": "/admin/v1/roles/", "Content-Type": "application/json"}, "json") + default_role_id = engine.last_id if res else None + res = engine.test("Create role with token permissions", "POST", "/admin/v1/roles", headers_json, + {"name": "tokens", "permissions": {"tokens": True}}, # is default required ? + (201), {"Location": "/admin/v1/roles/", "Content-Type": "application/json"}, "json") + token_role_id = engine.last_id if res else None + pr = "project-role mappings" + res = engine.test("Create user without "+pr, "POST", "/admin/v1/users", headers_json, + {"username": "empty", "password": "empty"}, + 201, {"Content-Type": "application/json"}, "json") + empty_user_id = engine.last_id if res else None + if admin_project_id and test_project_id and project_admin_role_id and project_user_role_id: + data = {"username": "test", "password": "test"} + data["project_role_mappings"] = [ + {"project": test_project_id, "role": project_admin_role_id}, + {"project": admin_project_id, "role": project_user_role_id} + ] + res = engine.test("Create user with "+pr, "POST", "/admin/v1/users", headers_json, data, + (201), {"Content-Type": "application/json"}, "json") + test_user_id = engine.last_id if res else None + + # PUT + if test_user_id: + engine.test("Modify test user's password", "PUT", "/admin/v1/users/"+test_user_id, headers_json, + {"password": "password"}, + (204), {}, 0) + if empty_user_id and admin_project_id and test_project_id and project_admin_role_id and project_user_role_id: + data = {"project_role_mappings": [ + {"project": test_project_id, "role": project_admin_role_id}, + {"project": admin_project_id, "role": project_user_role_id} + ]} + engine.test("Modify empty user's "+pr, "PUT", "/admin/v1/users/"+empty_user_id, + headers_json, + data, + (204), {}, 0) + + # DELETE + if empty_user_id: + engine.test("Delete empty user", "DELETE", "/admin/v1/users/"+empty_user_id, headers_json, {}, + (204), {}, 0) + if test_user_id: + engine.test("Delete test user", "DELETE", "/admin/v1/users/"+test_user_id, headers_json, {}, + (204), {}, 0) + if empty_role_id: + engine.test("Delete empty role", "DELETE", "/admin/v1/roles/"+empty_role_id, headers_json, {}, + (204), {}, 0) + if default_role_id: + engine.test("Delete default role", "DELETE", "/admin/v1/roles/"+default_role_id, headers_json, {}, + (204), {}, 0) + if token_role_id: + engine.test("Delete token role", "DELETE", "/admin/v1/roles/"+token_role_id, headers_json, {}, + (204), {}, 0) + if test_project_id: + engine.test("Delete test project", "DELETE", "/admin/v1/projects/"+test_project_id, headers_json, {}, + (204), {}, 0) + + # END Tests + + engine.remove_authorization() # To finish + + +class TestNbiQuotas(): + description = "Test NBI Quotas" + + @staticmethod + def run(engine, test_osm, manual_check, test_params=None): + engine.set_test_name("NBI-Quotas_") + # backend = test_params.get("backend") if test_params else None # UNUSED + + test_username = "test-nbi-quotas" + test_password = "test-nbi-quotas" + test_project = "test-nbi-quotas" + + test_vim = "test-nbi-quotas" + test_wim = "test-nbi-quotas" + test_sdn = "test-nbi-quotas" + + test_user_id = None + test_project_id = None + + test_vim_ids = [] + test_wim_ids = [] + test_sdn_ids = [] + test_vnfd_ids = [] + test_nsd_ids = [] + test_nst_ids = [] + test_pdu_ids = [] + test_nsr_ids = [] + test_nsi_ids = [] + + # Save admin access data + admin_username = engine.user + admin_password = engine.password + admin_project = engine.project + + # Get admin access + engine.get_autorization() + admin_token = engine.last_id + + # Check that test project,user do not exist + res1 = engine.test("Check that test project doesn't exist", "GET", "/admin/v1/projects/"+test_project, + headers_json, {}, (404), {}, True) + res2 = engine.test("Check that test user doesn't exist", "GET", "/admin/v1/users/"+test_username, + headers_json, {}, (404), {}, True) + if None in [res1, res2]: + engine.remove_authorization() + logger.error("Test project and/or user already exist") + return + + # Create test project&user + res = engine.test("Create test project", "POST", "/admin/v1/projects", headers_json, + {"name": test_username, + "quotas": { + "vnfds": 2, + "nsds": 2, + "nsts": 1, + "pdus": 1, + "nsrs": 2, + "nsis": 1, + "vim_accounts": 1, + "wim_accounts": 1, + "sdns": 1, + } + }, + (201), r_header_json, "json") + test_project_id = engine.last_id if res else None + res = engine.test("Create test user", "POST", "/admin/v1/users", headers_json, + {"username": test_username, "password": test_password, + "project_role_mappings": [{"project": test_project, "role": "project_admin"}]}, + (201), r_header_json, "json") + test_user_id = engine.last_id if res else None + + if test_project_id and test_user_id: + + # Get user access + engine.token = None + engine.user = test_username + engine.password = test_password + engine.project = test_project + engine.get_autorization() + user_token = engine.last_id + + # Create test VIM + res = engine.test("Create test VIM", "POST", "/admin/v1/vim_accounts", headers_json, + {"name": test_vim, + "vim_type": "openvim", + "vim_user": test_username, + "vim_password": test_password, + "vim_tenant_name": test_project, + "vim_url": "https://0.0.0.0:0/v0.0", + }, + (202), r_header_json, "json") + test_vim_ids += [engine.last_id if res else None] + + res = engine.test("Try to create second test VIM", "POST", "/admin/v1/vim_accounts", headers_json, + {"name": test_vim + "_2", + "vim_type": "openvim", + "vim_user": test_username, + "vim_password": test_password, + "vim_tenant_name": test_project, + "vim_url": "https://0.0.0.0:0/v0.0", + }, + (422), r_header_json, "json") + test_vim_ids += [engine.last_id if res is None else None] + + res = engine.test("Try to create second test VIM with FORCE", + "POST", "/admin/v1/vim_accounts?FORCE", headers_json, + {"name": test_vim + "_3", + "vim_type": "openvim", + "vim_user": test_username, + "vim_password": test_password, + "vim_tenant_name": test_project, + "vim_url": "https://0.0.0.0:0/v0.0", + }, + (202), r_header_json, "json") + test_vim_ids += [engine.last_id if res else None] + + if test_vim_ids[0]: + + # Download descriptor files (if required) + test_dir = "/tmp/"+test_username+"/" + test_url = "https://osm-download.etsi.org/ftp/osm-6.0-six/7th-hackfest/packages/" + vnfd_filenames = ["slice_hackfest_vnfd.tar.gz", "slice_hackfest_middle_vnfd.tar.gz"] + nsd_filenames = ["slice_hackfest_nsd.tar.gz", "slice_hackfest_middle_nsd.tar.gz"] + nst_filenames = ["slice_hackfest_nstd.yaml"] + pdu_filenames = ["PDU_router.yaml"] + desc_filenames = vnfd_filenames + nsd_filenames + nst_filenames + pdu_filenames + if not os.path.exists(test_dir): + os.makedirs(test_dir) + for filename in desc_filenames: + if not os.path.exists(test_dir+filename): + res = requests.get(test_url+filename) + if res.status_code < 300: + with open(test_dir+filename, "wb") as file: + file.write(res.content) + + if all([os.path.exists(test_dir+p) for p in desc_filenames]): + + # Test VNFD Quotas + res = engine.test("Create test VNFD #1", "POST", "/vnfpkgm/v1/vnf_packages_content", + headers_zip_json, "@b"+test_dir+vnfd_filenames[0], + (201), r_header_json, "json") + test_vnfd_ids += [engine.last_id if res else None] + res = engine.test("Create test VNFD #2", "POST", "/vnfpkgm/v1/vnf_packages_content", + headers_zip_json, "@b"+test_dir+vnfd_filenames[1], + (201), r_header_json, "json") + test_vnfd_ids += [engine.last_id if res else None] + res = engine.test("Try to create extra test VNFD", "POST", + "/vnfpkgm/v1/vnf_packages_content", + headers_zip_json, "@b"+test_dir+vnfd_filenames[0], + (422), r_header_json, "json") + test_vnfd_ids += [engine.last_id if res is None else None] + res = engine.test("Try to create extra test VNFD with FORCE", + "POST", "/vnfpkgm/v1/vnf_packages_content?FORCE", + headers_zip_json, "@b"+test_dir+vnfd_filenames[0], + (201), r_header_json, "json") + test_vnfd_ids += [engine.last_id if res else None] + + # Remove extra VNFDs to prevent further errors + for i in [2, 3]: + if test_vnfd_ids[i]: + res = engine.test("Delete test VNFD #" + str(i), "DELETE", + "/vnfpkgm/v1/vnf_packages_content/"+test_vnfd_ids[i]+"?FORCE", + headers_json, {}, (204), {}, 0) + if res: + test_vnfd_ids[i] = None + + if test_vnfd_ids[0] and test_vnfd_ids[1]: + + # Test NSD Quotas + res = engine.test("Create test NSD #1", "POST", "/nsd/v1/ns_descriptors_content", + headers_zip_json, "@b"+test_dir+nsd_filenames[0], + (201), r_header_json, "json") + test_nsd_ids += [engine.last_id if res else None] + res = engine.test("Create test NSD #2", "POST", "/nsd/v1/ns_descriptors_content", + headers_zip_json, "@b"+test_dir+nsd_filenames[1], + (201), r_header_json, "json") + test_nsd_ids += [engine.last_id if res else None] + res = engine.test("Try to create extra test NSD", "POST", "/nsd/v1/ns_descriptors_content", + headers_zip_json, "@b"+test_dir+nsd_filenames[0], + (422), r_header_json, "json") + test_nsd_ids += [engine.last_id if res is None else None] + res = engine.test("Try to create extra test NSD with FORCE", + "POST", "/nsd/v1/ns_descriptors_content?FORCE", + headers_zip_json, "@b"+test_dir+nsd_filenames[0], + (201), r_header_json, "json") + test_nsd_ids += [engine.last_id if res else None] + + # Remove extra NSDs to prevent further errors + for i in [2, 3]: + if test_nsd_ids[i]: + res = engine.test("Delete test NSD #" + str(i), "DELETE", + "/nsd/v1/ns_descriptors_content/"+test_nsd_ids[i]+"?FORCE", + headers_json, {}, (204), {}, 0) + if res: + test_nsd_ids[i] = None + + if test_nsd_ids[0] and test_nsd_ids[1]: + + # Test NSR Quotas + res = engine.test("Create test NSR #1", "POST", "/nslcm/v1/ns_instances_content", + headers_json, + {"nsName": test_username+"_1", + "nsdId": test_nsd_ids[0], + "vimAccountId": test_vim_ids[0], + }, + (201), r_header_json, "json") + test_nsr_ids += [engine.last_id if res else None] + res = engine.test("Create test NSR #2", "POST", "/nslcm/v1/ns_instances_content", + headers_json, + {"nsName": test_username+"_2", + "nsdId": test_nsd_ids[1], + "vimAccountId": test_vim_ids[0], + }, + (201), r_header_json, "json") + test_nsr_ids += [engine.last_id if res else None] + res = engine.test("Try to create extra test NSR", "POST", "/nslcm/v1/ns_instances_content", + headers_json, + {"nsName": test_username+"_3", + "nsdId": test_nsd_ids[0], + "vimAccountId": test_vim_ids[0], + }, + (422), r_header_json, "json") + test_nsr_ids += [engine.last_id if res is None else None] + res = engine.test("Try to create test NSR with FORCE", "POST", + "/nslcm/v1/ns_instances_content?FORCE", headers_json, + {"nsName": test_username+"_4", + "nsdId": test_nsd_ids[0], + "vimAccountId": test_vim_ids[0], + }, + (201), r_header_json, "json") + test_nsr_ids += [engine.last_id if res else None] + + # Test NST Quotas + res = engine.test("Create test NST", "POST", "/nst/v1/netslice_templates_content", + headers_txt_json, "@b"+test_dir+nst_filenames[0], + (201), r_header_json, "json") + test_nst_ids += [engine.last_id if res else None] + res = engine.test("Try to create extra test NST", "POST", + "/nst/v1/netslice_templates_content", + headers_txt_json, "@b"+test_dir+nst_filenames[0], + (422), r_header_json, "json") + test_nst_ids += [engine.last_id if res is None else None] + res = engine.test("Try to create extra test NST with FORCE", "POST", + "/nst/v1/netslice_templates_content?FORCE", + headers_txt_json, "@b"+test_dir+nst_filenames[0], + (201), r_header_json, "json") + test_nst_ids += [engine.last_id if res else None] + + if test_nst_ids[0]: + # Remove NSR Quota + engine.set_header({"Authorization": "Bearer {}".format(admin_token)}) + res = engine.test("Remove NSR Quota", "PUT", "/admin/v1/projects/"+test_project_id, + headers_json, + {"quotas": {"nsrs": None}}, + (204), {}, 0) + engine.set_header({"Authorization": "Bearer {}".format(user_token)}) + if res: + # Test NSI Quotas + res = engine.test("Create test NSI", "POST", + "/nsilcm/v1/netslice_instances_content", headers_json, + {"nsiName": test_username, + "nstId": test_nst_ids[0], + "vimAccountId": test_vim_ids[0], + }, + (201), r_header_json, "json") + test_nsi_ids += [engine.last_id if res else None] + res = engine.test("Try to create extra test NSI", "POST", + "/nsilcm/v1/netslice_instances_content", headers_json, + {"nsiName": test_username, + "nstId": test_nst_ids[0], + "vimAccountId": test_vim_ids[0], + }, + (400), r_header_json, "json") + test_nsi_ids += [engine.last_id if res is None else None] + res = engine.test("Try to create extra test NSI with FORCE", "POST", + "/nsilcm/v1/netslice_instances_content?FORCE", headers_json, + {"nsiName": test_username, + "nstId": test_nst_ids[0], + "vimAccountId": test_vim_ids[0], + }, + (201), r_header_json, "json") + test_nsi_ids += [engine.last_id if res else None] + + # Test PDU Quotas + with open(test_dir+pdu_filenames[0], "rb") as file: + pdu_text = re.sub(r"ip-address: *\[[^\]]*\]", "ip-address: '0.0.0.0'", + file.read().decode("utf-8")) + with open(test_dir+pdu_filenames[0], "wb") as file: + file.write(pdu_text.encode("utf-8")) + res = engine.test("Create test PDU", "POST", "/pdu/v1/pdu_descriptors", + headers_yaml, "@b"+test_dir+pdu_filenames[0], + (201), r_header_yaml, "yaml") + test_pdu_ids += [engine.last_id if res else None] + res = engine.test("Try to create extra test PDU", "POST", "/pdu/v1/pdu_descriptors", + headers_yaml, "@b"+test_dir+pdu_filenames[0], + (422), r_header_yaml, "yaml") + test_pdu_ids += [engine.last_id if res is None else None] + res = engine.test("Try to create extra test PDU with FORCE", "POST", + "/pdu/v1/pdu_descriptors?FORCE", + headers_yaml, "@b"+test_dir+pdu_filenames[0], + (201), r_header_yaml, "yaml") + test_pdu_ids += [engine.last_id if res else None] + + # Cleanup + for i, id in enumerate(test_nsi_ids): + if id: + engine.test("Delete test NSI #"+str(i), "DELETE", + "/nsilcm/v1/netslice_instances_content/"+id+"?FORCE", + headers_json, {}, (204), {}, 0) + for i, id in enumerate(test_nsr_ids): + if id: + engine.test("Delete test NSR #"+str(i), "DELETE", + "/nslcm/v1/ns_instances_content/"+id+"?FORCE", + headers_json, {}, (204), {}, 0) + for i, id in enumerate(test_nst_ids): + if id: + engine.test("Delete test NST #"+str(i), "DELETE", + "/nst/v1/netslice_templates_content/"+id+"?FORCE", + headers_json, {}, (204), {}, 0) + for i, id in enumerate(test_nsd_ids): + if id: + engine.test("Delete test NSD #"+str(i), "DELETE", + "/nsd/v1/ns_descriptors_content/"+id+"?FORCE", + headers_json, {}, (204), {}, 0) + for i, id in enumerate(test_vnfd_ids): + if id: + engine.test("Delete test VNFD #"+str(i), "DELETE", + "/vnfpkgm/v1/vnf_packages_content/"+id+"?FORCE", + headers_json, {}, (204), {}, 0) + for i, id in enumerate(test_pdu_ids): + if id: + engine.test("Delete test PDU #"+str(i), "DELETE", + "/pdu/v1/pdu_descriptors/"+id+"?FORCE", + headers_json, {}, (204), {}, 0) + + # END Test NBI Quotas + + # Test WIM Quotas + res = engine.test("Create test WIM", "POST", "/admin/v1/wim_accounts", headers_json, + {"name": test_wim, + "wim_type": "onos", + "wim_url": "https://0.0.0.0:0/v0.0", + }, + (202), r_header_json, "json") + test_wim_ids += [engine.last_id if res else None] + res = engine.test("Try to create second test WIM", "POST", "/admin/v1/wim_accounts", headers_json, + {"name": test_wim + "_2", + "wim_type": "onos", + "wim_url": "https://0.0.0.0:0/v0.0", + }, + (422), r_header_json, "json") + test_wim_ids += [engine.last_id if res is None else None] + res = engine.test("Try to create second test WIM with FORCE", "POST", "/admin/v1/wim_accounts?FORCE", + headers_json, + {"name": test_wim + "_3", + "wim_type": "onos", + "wim_url": "https://0.0.0.0:0/v0.0", + }, + (202), r_header_json, "json") + test_wim_ids += [engine.last_id if res else None] + + # Test SDN Quotas + res = engine.test("Create test SDN", "POST", "/admin/v1/sdns", headers_json, + {"name": test_sdn, + "type": "onos", + "ip": "0.0.0.0", + "port": 9999, + "dpid": "00:00:00:00:00:00:00:00", + }, + (202), r_header_json, "json") + test_sdn_ids += [engine.last_id if res else None] + res = engine.test("Try to create second test SDN", "POST", "/admin/v1/sdns", headers_json, + {"name": test_sdn + "_2", + "type": "onos", + "ip": "0.0.0.0", + "port": 9999, + "dpid": "00:00:00:00:00:00:00:00", + }, + (422), r_header_json, "json") + test_sdn_ids += [engine.last_id if res is None else None] + res = engine.test("Try to create second test SDN with FORCE", "POST", "/admin/v1/sdns?FORCE", headers_json, + {"name": test_sdn + "_3", + "type": "onos", + "ip": "0.0.0.0", + "port": 9999, + "dpid": "00:00:00:00:00:00:00:00", + }, + (202), r_header_json, "json") + test_sdn_ids += [engine.last_id if res else None] + + # Cleanup + for i, id in enumerate(test_vim_ids): + if id: + engine.test("Delete test VIM #"+str(i), "DELETE", "/admin/v1/vim_accounts/"+id+"?FORCE", + headers_json, {}, (202), {}, 0) + for i, id in enumerate(test_wim_ids): + if id: + engine.test("Delete test WIM #"+str(i), "DELETE", "/admin/v1/wim_accounts/"+id+"?FORCE", + headers_json, {}, (202), {}, 0) + for i, id in enumerate(test_sdn_ids): + if id: + engine.test("Delete test SDN #"+str(i), "DELETE", "/admin/v1/sdns/"+id+"?FORCE", + headers_json, {}, (202), {}, 0) + + # Release user access + engine.remove_authorization() + + # Cleanup + engine.user = admin_username + engine.password = admin_password + engine.project = admin_project + engine.get_autorization() + if test_user_id: + engine.test("Delete test user", "DELETE", "/admin/v1/users/"+test_user_id+"?FORCE", + headers_json, {}, (204), {}, 0) + if test_project_id: + engine.test("Delete test project", "DELETE", "/admin/v1/projects/"+test_project_id+"?FORCE", + headers_json, {}, (204), {}, 0) + engine.remove_authorization() + + # END class TestNbiQuotas if __name__ == "__main__": @@ -1374,28 +3127,39 @@ if __name__ == "__main__": opts, args = getopt.getopt(sys.argv[1:], "hvu:p:", ["url=", "user=", "password=", "help", "version", "verbose", "no-verbose", "project=", "insecure", "timeout", "timeout-deploy", "timeout-configure", - "test=", "list", "test-osm", "manual-check", "params="]) + "test=", "list", "test-osm", "manual-check", "params=", 'fail-fast']) url = "https://localhost:9999/osm" user = password = project = "admin" test_osm = False manual_check = False verbose = 0 verify = True + fail_fast = False test_classes = { "NonAuthorized": TestNonAuthorized, "FakeVIM": TestFakeVim, - "TestUsersProjects": TestUsersProjects, + "Users-Projects": TestUsersProjects, + "Projects-Descriptors": TestProjectsDescriptors, "VIM-SDN": TestVIMSDN, "Deploy-Custom": TestDeploy, "Deploy-Hackfest-Cirros": TestDeployHackfestCirros, "Deploy-Hackfest-Cirros-Scaling": TestDeployHackfestCirrosScaling, "Deploy-Hackfest-3Charmed": TestDeployHackfest3Charmed, + "Deploy-Hackfest-3Charmed2": TestDeployHackfest3Charmed2, + "Deploy-Hackfest-3Charmed3": TestDeployHackfest3Charmed3, "Deploy-Hackfest-4": TestDeployHackfest4, "Deploy-CirrosMacIp": TestDeployIpMac, - "TestDescriptors": TestDescriptors, - "TestDeployHackfest1": TestDeployHackfest1, + "Descriptors": TestDescriptors, + "Deploy-Hackfest1": TestDeployHackfest1, # "Deploy-MultiVIM": TestDeployMultiVIM, - "Upload-Slice-Template": TestNstTemplates, + "Deploy-SingleVdu": TestDeploySingleVdu, + "Deploy-Hnfd": TestDeployHnfd, + "Upload-Slice-Template": TestNetSliceTemplates, + "Deploy-Slice-Instance": TestNetSliceInstances, + "Deploy-SimpleCharm": TestDeploySimpleCharm, + "Deploy-SimpleCharm2": TestDeploySimpleCharm2, + "Authentication": TestAuthentication, + "NBI-Quotas": TestNbiQuotas, } test_to_do = [] test_params = {} @@ -1406,8 +3170,8 @@ if __name__ == "__main__": print("test version " + __version__ + ' ' + version_date) exit() elif o == "--list": - for test, test_class in test_classes.items(): - print("{:20} {}".format(test + ":", test_class.description)) + for test, test_class in sorted(test_classes.items()): + print("{:32} {}".format(test + ":", test_class.description)) exit() elif o in ("-v", "--verbose"): verbose += 1 @@ -1428,6 +3192,8 @@ if __name__ == "__main__": password = a elif o == "--project": project = a + elif o == "--fail-fast": + fail_fast = True elif o == "--test": # print("asdfadf", o, a, a.split(",")) for _test in a.split(","): @@ -1464,16 +3230,18 @@ if __name__ == "__main__": if test_to_do: text_index = 0 for test in test_to_do: + if fail_fast and test_rest.failed_tests: + break text_index += 1 test_class = test_classes[test] test_class().run(test_rest, test_osm, manual_check, test_params.get(text_index)) else: - for test, test_class in test_classes.items(): + for test, test_class in sorted(test_classes.items()): + if fail_fast and test_rest.failed_tests: + break test_class().run(test_rest, test_osm, manual_check, test_params.get(0)) - exit(0) - - # get token - print("PASS") + test_rest.print_results() + exit(1 if test_rest.failed_tests else 0) except TestException as e: logger.error(test + "Test {} Exception: {}".format(test, str(e)))