blob: 3033a346d8abd64a43ee9f8562ed15cacbddc8b0 [file] [log] [blame]
#! /usr/bin/python3
# -*- coding: utf-8 -*-
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import getopt
import sys
import requests
import json
import logging
import yaml
# import json
# import tarfile
from time import sleep
from random import randint
import os
from sys import stderr
from uuid import uuid4
import re
__author__ = "Alfonso Tierno, alfonso.tiernosepulveda@telefonica.com"
__date__ = "$2018-03-01$"
__version__ = "0.3"
version_date = "Oct 2018"
def usage():
print("Usage: ", sys.argv[0], "[options]")
print(" Performs system tests over running NBI. It can be used for real OSM test using option '--test-osm'")
print(" If this is the case env variables 'OSMNBITEST_VIM_NAME' must be supplied to create a VIM if not exist "
"where deployment is done")
print("OPTIONS")
print(" -h|--help: shows this help")
print(" --insecure: Allows non trusted https NBI server")
print(" --list: list available tests")
print(" --manual-check: Deployment tests stop after deployed to allow manual inspection. Only make sense with "
"'--test-osm'")
print(" -p|--password PASSWORD: NBI access password. 'admin' by default")
print(" ---project PROJECT: NBI access project. 'admin' by default")
print(" --test TEST[,...]: Execute only a test or a comma separated list of tests")
print(" --params key=val: params to the previous test. key can be vnfd-files, nsd-file, ns-name, ns-config")
print(" --test-osm: If missing this test is intended for NBI only, no other OSM components are expected. Use "
"this flag to test the system. LCM and RO components are expected to be up and running")
print(" --timeout TIMEOUT: General NBI timeout, by default {}s".format(timeout))
print(" --timeout-deploy TIMEOUT: Timeout used for getting NS deployed, by default {}s".format(timeout_deploy))
print(" --timeout-configure TIMEOUT: Timeout used for getting NS deployed and configured,"
" by default {}s".format(timeout_configure))
print(" -u|--user USERNAME: NBI access username. 'admin' by default")
print(" --url URL: complete NBI server URL. 'https//localhost:9999/osm' by default")
print(" -v|--verbose print debug information, can be used several times")
print(" --no-verbose remove verbosity")
print(" --version: prints current version")
print("ENV variables used for real deployment tests with option osm-test.")
print(" export OSMNBITEST_VIM_NAME=vim-name")
print(" export OSMNBITEST_VIM_URL=vim-url")
print(" export OSMNBITEST_VIM_TYPE=vim-type")
print(" export OSMNBITEST_VIM_TENANT=vim-tenant")
print(" export OSMNBITEST_VIM_USER=vim-user")
print(" export OSMNBITEST_VIM_PASSWORD=vim-password")
print(" export OSMNBITEST_VIM_CONFIG=\"vim-config\"")
print(" export OSMNBITEST_NS_NAME=\"vim-config\"")
return
r_header_json = {"Content-type": "application/json"}
headers_json = {"Content-type": "application/json", "Accept": "application/json"}
r_header_yaml = {"Content-type": "application/yaml"}
headers_yaml = {"Content-type": "application/yaml", "Accept": "application/yaml"}
r_header_text = {"Content-type": "text/plain"}
r_header_octect = {"Content-type": "application/octet-stream"}
headers_text = {"Accept": "text/plain,application/yaml"}
r_header_zip = {"Content-type": "application/zip"}
headers_zip = {"Accept": "application/zip,application/yaml"}
headers_zip_yaml = {"Accept": "application/yaml", "Content-type": "application/zip"}
headers_zip_json = {"Accept": "application/json", "Content-type": "application/zip"}
headers_txt_json = {"Accept": "application/json", "Content-type": "text/plain"}
r_headers_yaml_location_vnfd = {"Location": "/vnfpkgm/v1/vnf_packages_content/", "Content-Type": "application/yaml"}
r_headers_yaml_location_nsd = {"Location": "/nsd/v1/ns_descriptors_content/", "Content-Type": "application/yaml"}
r_headers_yaml_location_nst = {"Location": "/nst/v1/netslice_templates_content", "Content-Type": "application/yaml"}
r_headers_yaml_location_nslcmop = {"Location": "nslcm/v1/ns_lcm_op_occs/", "Content-Type": "application/yaml"}
r_headers_yaml_location_nsilcmop = {"Location": "/osm/nsilcm/v1/nsi_lcm_op_occs/", "Content-Type": "application/yaml"}
# test ones authorized
test_authorized_list = (
("AU1", "Invalid vnfd id", "GET", "/vnfpkgm/v1/vnf_packages/non-existing-id",
headers_json, None, 404, r_header_json, "json"),
("AU2", "Invalid nsd id", "GET", "/nsd/v1/ns_descriptors/non-existing-id",
headers_yaml, None, 404, r_header_yaml, "yaml"),
("AU3", "Invalid nsd id", "DELETE", "/nsd/v1/ns_descriptors_content/non-existing-id",
headers_yaml, None, 404, r_header_yaml, "yaml"),
)
timeout = 120 # general timeout
timeout_deploy = 60*10 # timeout for NS deploying without charms
timeout_configure = 60*20 # timeout for NS deploying and configuring
class TestException(Exception):
pass
class TestRest:
def __init__(self, url_base, header_base=None, verify=False, user="admin", password="admin", project="admin"):
self.url_base = url_base
if header_base is None:
self.header_base = {}
else:
self.header_base = header_base.copy()
self.s = requests.session()
self.s.headers = self.header_base
self.verify = verify
self.token = False
self.user = user
self.password = password
self.project = project
self.vim_id = None
# contains ID of tests obtained from Location response header. "" key contains last obtained id
self.last_id = ""
self.test_name = None
self.step = 0 # number of subtest under test
self.passed_tests = 0
self.failed_tests = 0
def set_test_name(self, test_name):
self.test_name = test_name
self.step = 0
self.last_id = ""
def set_header(self, header):
self.s.headers.update(header)
def set_tet_name(self, test_name):
self.test_name = test_name
def unset_header(self, key):
if key in self.s.headers:
del self.s.headers[key]
def test(self, description, method, url, headers, payload, expected_codes, expected_headers,
expected_payload, store_file=None, pooling=False):
"""
Performs an http request and check http code response. Exit if different than allowed. It get the returned id
that can be used by following test in the URL with {name} where name is the name of the test
:param description: description of the test
:param method: HTTP method: GET,PUT,POST,DELETE,...
:param url: complete URL or relative URL
:param headers: request headers to add to the base headers
:param payload: Can be a dict, transformed to json, a text or a file if starts with '@'
:param expected_codes: expected response codes, can be int, int tuple or int range
:param expected_headers: expected response headers, dict with key values
:param expected_payload: expected payload, 0 if empty, 'yaml', 'json', 'text', 'zip', 'octet-stream'
:param store_file: filename to store content
:param pooling: if True do not count neither log this test. Because a pooling is done with many equal requests
:return: requests response
"""
r = None
try:
if not self.s:
self.s = requests.session()
# URL
if not url:
url = self.url_base
elif not url.startswith("http"):
url = self.url_base + url
# replace url <> with the last ID
url = url.replace("<>", self.last_id)
if payload:
if isinstance(payload, str):
if payload.startswith("@"):
mode = "r"
file_name = payload[1:]
if payload.startswith("@b"):
mode = "rb"
file_name = payload[2:]
with open(file_name, mode) as f:
payload = f.read()
elif isinstance(payload, dict):
payload = json.dumps(payload)
if not pooling:
test_description = "Test {}{} {} {} {}".format(self.test_name, self.step, description, method, url)
logger.warning(test_description)
self.step += 1
stream = False
if expected_payload in ("zip", "octet-string") or store_file:
stream = True
__retry = 0
while True:
try:
r = getattr(self.s, method.lower())(url, data=payload, headers=headers, verify=self.verify,
stream=stream)
break
except requests.exceptions.ConnectionError as e:
if __retry == 2:
raise
logger.error("Exception {}. Retrying".format(e))
__retry += 1
if expected_payload in ("zip", "octet-string") or store_file:
logger.debug("RX {}".format(r.status_code))
else:
logger.debug("RX {}: {}".format(r.status_code, r.text))
# check response
if expected_codes:
if isinstance(expected_codes, int):
expected_codes = (expected_codes,)
if r.status_code not in expected_codes:
raise TestException(
"Got status {}. Expected {}. {}".format(r.status_code, expected_codes, r.text))
if expected_headers:
for header_key, header_val in expected_headers.items():
if header_key.lower() not in r.headers:
raise TestException("Header {} not present".format(header_key))
if header_val and header_val.lower() not in r.headers[header_key]:
raise TestException("Header {} does not contain {} but {}".format(header_key, header_val,
r.headers[header_key]))
if expected_payload is not None:
if expected_payload == 0 and len(r.content) > 0:
raise TestException("Expected empty payload")
elif expected_payload == "json":
try:
r.json()
except Exception as e:
raise TestException("Expected json response payload, but got Exception {}".format(e))
elif expected_payload == "yaml":
try:
yaml.safe_load(r.text)
except Exception as e:
raise TestException("Expected yaml response payload, but got Exception {}".format(e))
elif expected_payload in ("zip", "octet-string"):
if len(r.content) == 0:
raise TestException("Expected some response payload, but got empty")
# try:
# tar = tarfile.open(None, 'r:gz', fileobj=r.raw)
# for tarinfo in tar:
# tarname = tarinfo.name
# print(tarname)
# except Exception as e:
# raise TestException("Expected zip response payload, but got Exception {}".format(e))
elif expected_payload == "text":
if len(r.content) == 0:
raise TestException("Expected some response payload, but got empty")
# r.text
if store_file:
with open(store_file, 'wb') as fd:
for chunk in r.iter_content(chunk_size=128):
fd.write(chunk)
location = r.headers.get("Location")
if location:
_id = location[location.rfind("/") + 1:]
if _id:
self.last_id = str(_id)
if not pooling:
self.passed_tests += 1
return r
except TestException as e:
self.failed_tests += 1
r_status_code = None
r_text = None
if r:
r_status_code = r.status_code
r_text = r.text
logger.error("{} \nRX code{}: {}".format(e, r_status_code, r_text))
return None
# exit(1)
except IOError as e:
if store_file:
logger.error("Cannot open file {}: {}".format(store_file, e))
else:
logger.error("Exception: {}".format(e), exc_info=True)
self.failed_tests += 1
return None
# exit(1)
except requests.exceptions.RequestException as e:
logger.error("Exception: {}".format(e))
def get_autorization(self): # user=None, password=None, project=None):
if self.token: # and self.user == user and self.password == password and self.project == project:
return
# self.user = user
# self.password = password
# self.project = project
r = self.test("Obtain token", "POST", "/admin/v1/tokens", headers_json,
{"username": self.user, "password": self.password, "project_id": self.project},
(200, 201), r_header_json, "json")
if not r:
return
response = r.json()
self.token = response["id"]
self.set_header({"Authorization": "Bearer {}".format(self.token)})
def remove_authorization(self):
if self.token:
self.test("Delete token", "DELETE", "/admin/v1/tokens/{}".format(self.token), headers_json,
None, (200, 201, 204), None, None)
self.token = None
self.unset_header("Authorization")
def get_create_vim(self, test_osm):
if self.vim_id:
return self.vim_id
self.get_autorization()
if test_osm:
vim_name = os.environ.get("OSMNBITEST_VIM_NAME")
if not vim_name:
raise TestException(
"Needed to define OSMNBITEST_VIM_XXX variables to create a real VIM for deployment")
else:
vim_name = "fakeVim"
# Get VIM
r = self.test("Get VIM ID", "GET", "/admin/v1/vim_accounts?name={}".format(vim_name), headers_json,
None, 200, r_header_json, "json")
if not r:
return
vims = r.json()
if vims:
return vims[0]["_id"]
# Add VIM
if test_osm:
# check needed environ parameters:
if not os.environ.get("OSMNBITEST_VIM_URL") or not os.environ.get("OSMNBITEST_VIM_TENANT"):
raise TestException("Env OSMNBITEST_VIM_URL and OSMNBITEST_VIM_TENANT are needed for create a real VIM"
" to deploy on whit the --test-osm option")
vim_data = "{{schema_version: '1.0', name: '{}', vim_type: {}, vim_url: '{}', vim_tenant_name: '{}', "\
"vim_user: {}, vim_password: {}".format(vim_name,
os.environ.get("OSMNBITEST_VIM_TYPE", "openstack"),
os.environ.get("OSMNBITEST_VIM_URL"),
os.environ.get("OSMNBITEST_VIM_TENANT"),
os.environ.get("OSMNBITEST_VIM_USER"),
os.environ.get("OSMNBITEST_VIM_PASSWORD"))
if os.environ.get("OSMNBITEST_VIM_CONFIG"):
vim_data += " ,config: {}".format(os.environ.get("OSMNBITEST_VIM_CONFIG"))
vim_data += "}"
else:
vim_data = "{schema_version: '1.0', name: fakeVim, vim_type: openstack, vim_url: 'http://10.11.12.13/fake'"\
", vim_tenant_name: 'vimtenant', vim_user: vimuser, vim_password: vimpassword}"
self.test("Create VIM", "POST", "/admin/v1/vim_accounts", headers_yaml, vim_data,
(201, 202), {"Location": "/admin/v1/vim_accounts/", "Content-Type": "application/yaml"}, "yaml")
return self.last_id
def print_results(self):
print("\n\n\n--------------------------------------------")
print("TEST RESULTS: Total: {}, Passed: {}, Failed: {}".format(self.passed_tests + self.failed_tests,
self.passed_tests, self.failed_tests))
print("--------------------------------------------")
def wait_until_delete(self, url_op, timeout_delete):
"""
Make a pooling until topic is not present, because of deleted
:param url_op:
:param timeout_delete:
:return:
"""
description = "Wait to topic being deleted"
test_description = "Test {}{} {} {} {}".format(self.test_name, self.step, description, "GET", url_op)
logger.warning(test_description)
self.step += 1
wait = timeout_delete
while wait >= 0:
r = self.test(description, "GET", url_op, headers_yaml, None, (200, 404), None, r_header_yaml, "yaml",
pooling=True)
if not r:
return
if r.status_code == 404:
self.passed_tests += 1
break
elif r.status_code == 200:
wait -= 5
sleep(5)
else:
raise TestException("Topic is not deleted after {} seconds".format(timeout_delete))
self.failed_tests += 1
def wait_operation_ready(self, ns_nsi, opp_id, timeout, expected_fail=False):
"""
Wait until nslcmop or nsilcmop finished
:param ns_nsi: "ns" o "nsi"
:param opp_id: Id o fthe operation
:param timeout:
:param expected_fail:
:return: None. Updates passed/failed_tests
"""
if ns_nsi == "ns":
url_op = "/nslcm/v1/ns_lcm_op_occs/{}".format(opp_id)
else:
url_op = "/nsilcm/v1/nsi_lcm_op_occs/{}".format(opp_id)
description = "Wait to {} lcm operation complete".format(ns_nsi)
test_description = "Test {}{} {} {} {}".format(self.test_name, self.step, description, "GET", url_op)
logger.warning(test_description)
self.step += 1
wait = timeout
while wait >= 0:
r = self.test(description, "GET", url_op, headers_json, None,
200, r_header_json, "json", pooling=True)
if not r:
return
nslcmop = r.json()
if "COMPLETED" in nslcmop["operationState"]:
if expected_fail:
logger.error("NS terminate has success, expecting failing: {}".format(nslcmop["detailed-status"]))
self.failed_tests += 1
else:
self.passed_tests += 1
break
elif "FAILED" in nslcmop["operationState"]:
if not expected_fail:
logger.error("NS terminate has failed: {}".format(nslcmop["detailed-status"]))
self.failed_tests += 1
else:
self.passed_tests += 1
break
print(".", end="", file=stderr)
wait -= 10
sleep(10)
else:
self.failed_tests += 1
logger.error("NS instantiate is not terminate after {} seconds".format(timeout))
return
print("", file=stderr)
class TestNonAuthorized:
description = "Test invalid URLs. methods and no authorization"
@staticmethod
def run(engine, test_osm, manual_check, test_params=None):
engine.set_test_name("NonAuth")
engine.remove_authorization()
test_not_authorized_list = (
("Invalid token", "GET", "/admin/v1/users", headers_json, None, 401, r_header_json, "json"),
("Invalid URL", "POST", "/admin/v1/nonexist", headers_yaml, None, 405, r_header_yaml, "yaml"),
("Invalid version", "DELETE", "/admin/v2/users", headers_yaml, None, 405, r_header_yaml, "yaml"),
)
for t in test_not_authorized_list:
engine.test(*t)
class TestUsersProjects:
description = "test project and user creation"
@staticmethod
def run(engine, test_osm, manual_check, test_params=None):
engine.set_test_name("UserProject")
# backend = test_params.get("backend") if test_params else None # UNUSED
# Initialisation
p1 = p2 = p3 = None
padmin = pbad = None
u1 = u2 = u3 = u4 = None
engine.get_autorization()
res = engine.test("Create project non admin 1", "POST", "/admin/v1/projects", headers_json, {"name": "P1"},
(201, 204), {"Location": "/admin/v1/projects/", "Content-Type": "application/json"}, "json")
p1 = engine.last_id if res else None
res = engine.test("Create project admin", "POST", "/admin/v1/projects", headers_json,
{"name": "Padmin", "admin": True}, (201, 204),
{"Location": "/admin/v1/projects/", "Content-Type": "application/json"}, "json")
padmin = engine.last_id if res else None
res = engine.test("Create project bad format", "POST", "/admin/v1/projects", headers_json, {"name": 1},
(400, 422), r_header_json, "json")
pbad = engine.last_id if res else None
res = engine.test("Get project admin role", "GET", "/admin/v1/roles?name=project_admin", headers_json, {},
(200), {"Content-Type": "application/json"}, "json")
rpa = res.json()[0]["_id"] if res else None
res = engine.test("Get project user role", "GET", "/admin/v1/roles?name=project_user", headers_json, {},
(200), {"Content-Type": "application/json"}, "json")
rpu = res.json()[0]["_id"] if res else None
res = engine.test("Get system admin role", "GET", "/admin/v1/roles?name=system_admin", headers_json, {},
(200), {"Content-Type": "application/json"}, "json")
rsa = res.json()[0]["_id"] if res else None
data = {"username": "U1", "password": "pw1"}
p2 = uuid4().hex
data["project_role_mappings"] = [
{"project": p1, "role": rpa},
{"project": p2, "role": rpa},
{"project": padmin, "role": rpu}
]
rc = 201
xhd = {"Location": "/admin/v1/users/", "Content-Type": "application/json"}
res = engine.test("Create user with bad project and force", "POST", "/admin/v1/users?FORCE=True", headers_json,
data, rc, xhd, "json")
if res:
u1 = engine.last_id
else:
# User is created sometimes even though an exception is raised
res = engine.test("Get user U1", "GET", "/admin/v1/users?username=U1", headers_json, {},
(200), {"Content-Type": "application/json"}, "json")
u1 = res.json()[0]["_id"] if res else None
data = {"username": "U2", "password": "pw2"}
data["project_role_mappings"] = [{"project": p1, "role": rpa}, {"project": padmin, "role": rsa}]
res = engine.test("Create user 2", "POST", "/admin/v1/users", headers_json,
data, 201, {"Location": "/admin/v1/users/", "Content-Type": "application/json"}, "json")
u2 = engine.last_id if res else None
if u1:
ftt = "project_role_mappings"
xpr = [{"project": p1, "role": rpa}, {"project": padmin, "role": rpu}]
data = {ftt: xpr}
engine.test("Edit user U1, delete P2 project", "PATCH", "/admin/v1/users/"+u1, headers_json,
data, 204, None, None)
res = engine.test("Check user U1, contains the right projects", "GET", "/admin/v1/users/"+u1,
headers_json, None, 200, None, json)
if res:
rj = res.json()
xpr[0]["project_name"] = "P1"
xpr[0]["role_name"] = "project_admin"
xpr[1]["project_name"] = "Padmin"
xpr[1]["role_name"] = "project_user"
ok = True
for pr in rj[ftt]:
if pr not in xpr:
ok = False
for pr in xpr:
if pr not in rj[ftt]:
ok = False
if not ok:
logger.error("User {} '{}' are different than expected '{}'. Edition was not done properly"
.format(ftt, rj[ftt], xpr))
engine.failed_tests += 1
p2 = None # To prevent deletion attempts
# Add a test of 'default project' for Keystone?
if u2:
engine.test("Edit user U2, change password", "PUT", "/admin/v1/users/"+u2, headers_json,
{"password": "pw2_new"}, 204, None, None)
if p1:
engine.test("Change to project P1 non existing", "POST", "/admin/v1/tokens/", headers_json,
{"project_id": p1}, 401, r_header_json, "json")
if u2 and p1:
res = engine.test("Change to user U2 project P1", "POST", "/admin/v1/tokens", headers_json,
{"username": "U2", "password": "pw2_new", "project_id": "P1"}, (200, 201),
r_header_json, "json")
if res:
rj = res.json()
engine.set_header({"Authorization": "Bearer {}".format(rj["id"])})
engine.test("Edit user projects non admin", "PUT", "/admin/v1/users/U1", headers_json,
{"remove_project_role_mappings": [{"project": "P1", "role": None}]},
401, r_header_json, "json")
res = engine.test("Add new project non admin", "POST", "/admin/v1/projects", headers_json,
{"name": "P2"}, 401, r_header_json, "json")
if res is None or res.status_code == 201:
# The project has been created even though it shouldn't
res = engine.test("Get project P2", "GET", "/admin/v1/projects/P2", headers_json, None,
200, r_header_json, "json")
p2 = res.json()["_id"] if res else None
if p1:
data = {"username": "U3", "password": "pw3"}
data["project_role_mappings"] = [{"project": p1, "role": rpu}]
res = engine.test("Add new user non admin", "POST", "/admin/v1/users", headers_json,
data, 401, r_header_json, "json")
if res is None or res.status_code == 201:
# The user has been created even though it shouldn't
res = engine.test("Get user U3", "GET", "/admin/v1/users/U3", headers_json, None,
200, r_header_json, "json")
u3 = res.json()["_id"] if res else None
else:
u3 = None
if padmin:
res = engine.test("Change to user U2 project Padmin", "POST", "/admin/v1/tokens", headers_json,
{"project_id": "Padmin"}, # Caused a Keystone authentication error
# {"username": "U2", "password": "pw2_new", "project_id": "Padmin"},
(200, 201), r_header_json, "json")
if res:
rj = res.json()
engine.set_header({"Authorization": "Bearer {}".format(rj["id"])})
res = engine.test("Add new project admin", "POST", "/admin/v1/projects", headers_json,
{"name": "P3"}, (201, 204),
{"Location": "/admin/v1/projects/", "Content-Type": "application/json"},
"json")
p3 = engine.last_id if res else None
if p1:
data = {"username": "U4", "password": "pw4"}
data["project_role_mappings"] = [{"project": p1, "role": rpa}]
res = engine.test("Add new user admin", "POST", "/admin/v1/users", headers_json,
data, (201, 204),
{"Location": "/admin/v1/users/", "Content-Type": "application/json"},
"json")
u4 = engine.last_id if res else None
else:
u4 = None
if u4 and p3:
data = {"project_role_mappings": [{"project": p3, "role": rpa}]}
engine.test("Edit user projects admin", "PUT", "/admin/v1/users/U4", headers_json,
data, 204, None, None)
# Project is deleted even though it shouldn't - PROVISIONAL?
res = engine.test("Delete project P3 conflict", "DELETE", "/admin/v1/projects/"+p3,
headers_json, None, 409, None, None)
if res and res.status_code in (200, 204):
p3 = None
if p3:
res = engine.test("Delete project P3 forcing", "DELETE",
"/admin/v1/projects/"+p3+"?FORCE=True", headers_json, None, 204,
None, None)
if res and res.status_code in (200, 204):
p3 = None
if u2:
res = engine.test("Delete user U2. Conflict deleting own user", "DELETE",
"/admin/v1/users/"+u2, headers_json, None, 409, r_header_json, "json")
if res is None or res.status_code in (200, 204):
u2 = None
if u4:
res = engine.test("Delete user U4", "DELETE", "/admin/v1/users/"+u4, headers_json, None,
204, None, None)
if res and res.status_code in (200, 204):
u4 = None
if p3:
res = engine.test("Delete project P3", "DELETE", "/admin/v1/projects/"+p3, headers_json,
None, 204, None, None)
if res and res.status_code in (200, 204):
p3 = None
if u3:
res = engine.test("Delete user U3", "DELETE", "/admin/v1/users/"+u3, headers_json, None,
204, None, None)
if res:
u3 = None
# change to admin
engine.remove_authorization() # To force get authorization
engine.get_autorization()
if u1:
engine.test("Delete user U1", "DELETE", "/admin/v1/users/"+u1, headers_json, None, 204, None, None)
if u2:
engine.test("Delete user U2", "DELETE", "/admin/v1/users/"+u2, headers_json, None, 204, None, None)
if u3:
engine.test("Delete user U3", "DELETE", "/admin/v1/users/"+u3, headers_json, None, 204, None, None)
if u4:
engine.test("Delete user U4", "DELETE", "/admin/v1/users/"+u4, headers_json, None, 204, None, None)
if p1:
engine.test("Delete project P1", "DELETE", "/admin/v1/projects/"+p1, headers_json, None, 204, None, None)
if p2:
engine.test("Delete project P2", "DELETE", "/admin/v1/projects/"+p2, headers_json, None, 204, None, None)
if p3:
engine.test("Delete project P3", "DELETE", "/admin/v1/projects/"+p3, headers_json, None, 204, None, None)
if padmin:
engine.test("Delete project Padmin", "DELETE", "/admin/v1/projects/"+padmin, headers_json, None, 204,
None, None)
if pbad:
engine.test("Delete bad project", "DELETE", "/admin/v1/projects/"+pbad, headers_json, None, 204,
None, None)
# BEGIN New Tests - Addressing Projects/Users by Name/ID
pid1 = pid2 = None
uid1 = uid2 = None
res = engine.test("Create new project P1", "POST", "/admin/v1/projects", headers_json, {"name": "P1"},
201, {"Location": "/admin/v1/projects/", "Content-Type": "application/json"}, "json")
if res:
pid1 = res.json()["id"]
# print("# pid =", pid1)
res = engine.test("Create new project P2", "POST", "/admin/v1/projects", headers_json, {"name": "P2"},
201, {"Location": "/admin/v1/projects/", "Content-Type": "application/json"}, "json")
if res:
pid2 = res.json()["id"]
# print("# pid =", pid2)
data = {"username": "U1", "password": "pw1"}
data["project_role_mappings"] = [{"project": pid1, "role": rpu}]
res = engine.test("Create new user U1", "POST", "/admin/v1/users", headers_json, data, 201,
{"Location": "/admin/v1/users/", "Content-Type": "application/json"}, "json")
if res:
uid1 = res.json()["id"]
# print("# uid =", uid1)
data = {"username": "U2", "password": "pw2"}
data["project_role_mappings"] = [{"project": pid2, "role": rpu}]
res = engine.test("Create new user U2", "POST", "/admin/v1/users", headers_json, data, 201,
{"Location": "/admin/v1/users/", "Content-Type": "application/json"}, "json")
if res:
uid2 = res.json()["id"]
# print("# uid =", uid2)
if pid1:
engine.test("Get Project P1 by Name", "GET", "/admin/v1/projects/P1", headers_json, None,
200, None, "json")
engine.test("Get Project P1 by ID", "GET", "/admin/v1/projects/"+pid1, headers_json, None,
200, None, "json")
if uid1:
engine.test("Get User U1 by Name", "GET", "/admin/v1/users/U1", headers_json, None, 200, None, "json")
engine.test("Get User U1 by ID", "GET", "/admin/v1/users/"+uid1, headers_json, None, 200, None, "json")
if pid1:
res = engine.test("Rename Project P1 by Name", "PUT", "/admin/v1/projects/P1", headers_json,
{"name": "P3"}, 204, None, None)
if res:
engine.test("Get Project P1 by new Name", "GET", "/admin/v1/projects/P3", headers_json, None,
200, None, "json")
if pid2:
res = engine.test("Rename Project P2 by ID", "PUT", "/admin/v1/projects/"+pid2, headers_json,
{"name": "P4"}, 204, None, None)
if res:
engine.test("Get Project P2 by new Name", "GET", "/admin/v1/projects/P4", headers_json, None,
200, None, "json")
if uid1:
res = engine.test("Rename User U1 by Name", "PUT", "/admin/v1/users/U1", headers_json,
{"username": "U3"}, 204, None, None)
if res:
engine.test("Get User U1 by new Name", "GET", "/admin/v1/users/U3", headers_json, None,
200, None, "json")
if uid2:
res = engine.test("Rename User U2 by ID", "PUT", "/admin/v1/users/"+uid2, headers_json,
{"username": "U4"}, 204, None, None)
if res:
engine.test("Get User U2 by new Name", "GET", "/admin/v1/users/U4", headers_json, None,
200, None, "json")
if uid1:
res = engine.test("Delete User U1 by Name", "DELETE", "/admin/v1/users/U3", headers_json, None,
204, None, None)
if res:
uid1 = None
if uid2:
res = engine.test("Delete User U2 by ID", "DELETE", "/admin/v1/users/"+uid2, headers_json, None,
204, None, None)
if res:
uid2 = None
if pid1:
res = engine.test("Delete Project P1 by Name", "DELETE", "/admin/v1/projects/P3", headers_json, None,
204, None, None)
if res:
pid1 = None
if pid2:
res = engine.test("Delete Project P2 by ID", "DELETE", "/admin/v1/projects/"+pid2, headers_json, None,
204, None, None)
if res:
pid2 = None
# END New Tests - Addressing Projects/Users by Name
# CLEANUP
if pid1:
engine.test("Delete Project P1", "DELETE", "/admin/v1/projects/"+pid1, headers_json, None, 204, None, None)
if pid2:
engine.test("Delete Project P2", "DELETE", "/admin/v1/projects/"+pid2, headers_json, None, 204, None, None)
if uid1:
engine.test("Delete User U1", "DELETE", "/admin/v1/users/"+uid1, headers_json, None, 204, None, None)
if uid2:
engine.test("Delete User U2", "DELETE", "/admin/v1/users/"+uid2, headers_json, None, 204, None, None)
engine.remove_authorization() # To finish
class TestProjectsDescriptors:
description = "test descriptors visibility among projects"
@staticmethod
def run(engine, test_osm, manual_check, test_params=None):
vnfd_ids = []
engine.set_test_name("ProjectDescriptors")
engine.get_autorization()
project_admin_id = None
res = engine.test("Get my project Padmin", "GET", "/admin/v1/projects/{}".format(engine.project), headers_json,
None, 200, r_header_json, "json")
if res:
response = res.json()
project_admin_id = response["_id"]
engine.test("Create project Padmin", "POST", "/admin/v1/projects", headers_json,
{"name": "Padmin", "admin": True}, (201, 204),
{"Location": "/admin/v1/projects/", "Content-Type": "application/json"}, "json")
engine.test("Create project P2", "POST", "/admin/v1/projects", headers_json, {"name": "P2"},
(201, 204), {"Location": "/admin/v1/projects/", "Content-Type": "application/json"}, "json")
engine.test("Create project P3", "POST", "/admin/v1/projects", headers_json, {"name": "P3"},
(201, 204), {"Location": "/admin/v1/projects/", "Content-Type": "application/json"}, "json")
engine.test("Create user U1", "POST", "/admin/v1/users", headers_json,
{"username": "U1", "password": "pw1",
"project_role_mappings": [{"project": "Padmin", "role": "system_admin"},
{"project": "P2", "role": "project_admin"},
{"project": "P3", "role": "project_admin"}],
}, 201, {"Location": "/admin/v1/users/", "Content-Type": "application/json"}, "json")
engine.test("Onboard VNFD id1", "POST", "/vnfpkgm/v1/vnf_packages_content?id=id1", headers_yaml,
TestDescriptors.vnfd_empty, 201, r_headers_yaml_location_vnfd, "yaml")
vnfd_ids.append(engine.last_id)
engine.test("Onboard VNFD id2 PUBLIC", "POST", "/vnfpkgm/v1/vnf_packages_content?id=id2&PUBLIC=TRUE",
headers_yaml, TestDescriptors.vnfd_empty, 201, r_headers_yaml_location_vnfd, "yaml")
vnfd_ids.append(engine.last_id)
engine.test("Onboard VNFD id3", "POST", "/vnfpkgm/v1/vnf_packages_content?id=id3&PUBLIC=FALSE", headers_yaml,
TestDescriptors.vnfd_empty, 201, r_headers_yaml_location_vnfd, "yaml")
vnfd_ids.append(engine.last_id)
res = engine.test("Get VNFD descriptors", "GET", "/vnfpkgm/v1/vnf_packages?id=id1,id2,id3",
headers_json, None, 200, r_header_json, "json")
response = res.json()
if len(response) != 3:
logger.error("Only 3 vnfds should be present for project admin. {} listed".format(len(response)))
engine.failed_tests += 1
# Change to other project Padmin
res = engine.test("Change to user U1 project Padmin", "POST", "/admin/v1/tokens", headers_json,
{"username": "U1", "password": "pw1", "project_id": "Padmin"}, (200, 201),
r_header_json, "json")
if res:
response = res.json()
engine.set_header({"Authorization": "Bearer {}".format(response["id"])})
# list vnfds
res = engine.test("List VNFD descriptors for Padmin", "GET", "/vnfpkgm/v1/vnf_packages",
headers_json, None, 200, r_header_json, "json")
response = res.json()
if len(response) != 0:
logger.error("Only 0 vnfds should be present for project Padmin. {} listed".format(len(response)))
engine.failed_tests += 1
# list Public vnfds
res = engine.test("List VNFD public descriptors", "GET", "/vnfpkgm/v1/vnf_packages?PUBLIC=True",
headers_json, None, 200, r_header_json, "json")
response = res.json()
if len(response) != 1:
logger.error("Only 1 vnfds should be present for project Padmin. {} listed".format(len(response)))
engine.failed_tests += 1
# list vnfds belonging to project "admin"
res = engine.test("List VNFD of admin project", "GET",
"/vnfpkgm/v1/vnf_packages?ADMIN={}".format(project_admin_id),
headers_json, None, 200, r_header_json, "json")
if res:
response = res.json()
if len(response) != 3:
logger.error("Only 3 vnfds should be present for project Padmin. {} listed".format(len(response)))
engine.failed_tests += 1
# Get Public vnfds
engine.test("Get VNFD public descriptors", "GET", "/vnfpkgm/v1/vnf_packages/{}".format(vnfd_ids[1]),
headers_json, None, 200, r_header_json, "json")
# Edit not owned vnfd
engine.test("Edit VNFD ", "PATCH", "/vnfpkgm/v1/vnf_packages/{}".format(vnfd_ids[0]),
headers_yaml, '{name: pepe}', 404, r_header_yaml, "yaml")
# Add to my catalog
engine.test("Add VNFD id2 to my catalog", "PATCH", "/vnfpkgm/v1/vnf_packages/{}?SET_PROJECT".
format(vnfd_ids[1]), headers_json, None, 204, None, 0)
# Add a new vnfd
engine.test("Onboard VNFD id4", "POST", "/vnfpkgm/v1/vnf_packages_content?id=id4", headers_yaml,
TestDescriptors.vnfd_empty, 201, r_headers_yaml_location_vnfd, "yaml")
vnfd_ids.append(engine.last_id)
# list vnfds
res = engine.test("List VNFD public descriptors", "GET", "/vnfpkgm/v1/vnf_packages",
headers_json, None, 200, r_header_json, "json")
response = res.json()
if len(response) != 2:
logger.error("Only 2 vnfds should be present for project Padmin. {} listed".format(len(response)))
engine.failed_tests += 1
if manual_check:
input('VNFDs have been omboarded. Perform manual check and press enter to resume')
test_rest.test("Delete VNFD id2", "DELETE", "/vnfpkgm/v1/vnf_packages/{}".format(vnfd_ids[1]),
headers_yaml, None, 204, None, 0)
# change to admin project
engine.remove_authorization() # To force get authorization
engine.get_autorization()
test_rest.test("Delete VNFD id1", "DELETE", "/vnfpkgm/v1/vnf_packages/{}".format(vnfd_ids[0]),
headers_yaml, None, 204, None, 0)
test_rest.test("Delete VNFD id2", "DELETE", "/vnfpkgm/v1/vnf_packages/{}".format(vnfd_ids[1]),
headers_yaml, None, 204, None, 0)
test_rest.test("Delete VNFD id3", "DELETE", "/vnfpkgm/v1/vnf_packages/{}".format(vnfd_ids[2]),
headers_yaml, None, 204, None, 0)
test_rest.test("Delete VNFD id4", "DELETE", "/vnfpkgm/v1/vnf_packages/{}".format(vnfd_ids[3]),
headers_yaml, None, 404, r_header_yaml, "yaml")
test_rest.test("Delete VNFD id4", "DELETE", "/vnfpkgm/v1/vnf_packages/{}?ADMIN".format(vnfd_ids[3]),
headers_yaml, None, 204, None, 0)
# Get Public vnfds
engine.test("Get VNFD deleted id1", "GET", "/vnfpkgm/v1/vnf_packages/{}?ADMIN".format(vnfd_ids[0]),
headers_json, None, 404, r_header_json, "json")
engine.test("Get VNFD deleted id2", "GET", "/vnfpkgm/v1/vnf_packages/{}?ADMIN".format(vnfd_ids[1]),
headers_json, None, 404, r_header_json, "json")
engine.test("Get VNFD deleted id3", "GET", "/vnfpkgm/v1/vnf_packages/{}?ADMIN".format(vnfd_ids[2]),
headers_json, None, 404, r_header_json, "json")
engine.test("Get VNFD deleted id4", "GET", "/vnfpkgm/v1/vnf_packages/{}?ADMIN".format(vnfd_ids[3]),
headers_json, None, 404, r_header_json, "json")
engine.test("Delete user U1", "DELETE", "/admin/v1/users/U1", headers_json, None, 204, None, None)
engine.test("Delete project Padmin", "DELETE", "/admin/v1/projects/Padmin", headers_json, None, 204, None, None)
engine.test("Delete project P2", "DELETE", "/admin/v1/projects/P2", headers_json, None, 204, None, None)
engine.test("Delete project P3", "DELETE", "/admin/v1/projects/P3", headers_json, None, 204, None, None)
class TestFakeVim:
description = "Creates/edit/delete fake VIMs and SDN controllers"
def __init__(self):
self.vim = {
"schema_version": "1.0",
"schema_type": "No idea",
"name": "myVim",
"description": "Descriptor name",
"vim_type": "openstack",
"vim_url": "http://localhost:/vim",
"vim_tenant_name": "vimTenant",
"vim_user": "user",
"vim_password": "password",
"config": {"config_param": 1}
}
self.sdn = {
"name": "sdn-name",
"description": "sdn-description",
"dpid": "50:50:52:54:00:94:21:21",
"ip": "192.168.15.17",
"port": 8080,
"type": "opendaylight",
"version": "3.5.6",
"user": "user",
"password": "passwd"
}
self.port_mapping = [
{"compute_node": "compute node 1",
"ports": [{"pci": "0000:81:00.0", "switch_port": "port-2/1", "switch_mac": "52:54:00:94:21:21"},
{"pci": "0000:81:00.1", "switch_port": "port-2/2", "switch_mac": "52:54:00:94:21:22"}
]},
{"compute_node": "compute node 2",
"ports": [{"pci": "0000:81:00.0", "switch_port": "port-2/3", "switch_mac": "52:54:00:94:21:23"},
{"pci": "0000:81:00.1", "switch_port": "port-2/4", "switch_mac": "52:54:00:94:21:24"}
]}
]
def run(self, engine, test_osm, manual_check, test_params=None):
vim_bad = self.vim.copy()
vim_bad.pop("name")
engine.set_test_name("FakeVim")
engine.get_autorization()
engine.test("Create VIM", "POST", "/admin/v1/vim_accounts", headers_json, self.vim, (201, 202),
{"Location": "/admin/v1/vim_accounts/", "Content-Type": "application/json"}, "json")
vim_id = engine.last_id
engine.test("Create VIM without name, bad schema", "POST", "/admin/v1/vim_accounts", headers_json,
vim_bad, 422, None, headers_json)
engine.test("Create VIM name repeated", "POST", "/admin/v1/vim_accounts", headers_json, self.vim,
409, None, headers_json)
engine.test("Show VIMs", "GET", "/admin/v1/vim_accounts", headers_yaml, None, 200, r_header_yaml,
"yaml")
engine.test("Show VIM", "GET", "/admin/v1/vim_accounts/{}".format(vim_id), headers_yaml, None, 200,
r_header_yaml, "yaml")
if not test_osm:
# delete with FORCE
engine.test("Delete VIM", "DELETE", "/admin/v1/vim_accounts/{}?FORCE=True".format(vim_id), headers_yaml,
None, 202, None, 0)
engine.test("Check VIM is deleted", "GET", "/admin/v1/vim_accounts/{}".format(vim_id), headers_yaml, None,
404, r_header_yaml, "yaml")
else:
# delete and wait until is really deleted
engine.test("Delete VIM", "DELETE", "/admin/v1/vim_accounts/{}".format(vim_id), headers_yaml, None, 202,
None, 0)
engine.wait_until_delete("/admin/v1/vim_accounts/{}".format(vim_id), timeout)
class TestVIMSDN(TestFakeVim):
description = "Creates VIM with SDN editing SDN controllers and port_mapping"
def __init__(self):
TestFakeVim.__init__(self)
self.wim = {
"schema_version": "1.0",
"schema_type": "No idea",
"name": "myWim",
"description": "Descriptor name",
"wim_type": "odl",
"wim_url": "http://localhost:/wim",
"user": "user",
"password": "password",
"config": {"config_param": 1}
}
def run(self, engine, test_osm, manual_check, test_params=None):
engine.set_test_name("VimSdn")
engine.get_autorization()
# Added SDN
engine.test("Create SDN", "POST", "/admin/v1/sdns", headers_json, self.sdn, (201, 202),
{"Location": "/admin/v1/sdns/", "Content-Type": "application/json"}, "json")
sdnc_id = engine.last_id
# sleep(5)
# Edit SDN
engine.test("Edit SDN", "PATCH", "/admin/v1/sdns/{}".format(sdnc_id), headers_json, {"name": "new_sdn_name"},
(202, 204), None, None)
# sleep(5)
# VIM with SDN
self.vim["config"]["sdn-controller"] = sdnc_id
self.vim["config"]["sdn-port-mapping"] = self.port_mapping
engine.test("Create VIM", "POST", "/admin/v1/vim_accounts", headers_json, self.vim, (200, 202, 201),
{"Location": "/admin/v1/vim_accounts/", "Content-Type": "application/json"}, "json"),
vim_id = engine.last_id
self.port_mapping[0]["compute_node"] = "compute node XX"
engine.test("Edit VIM change port-mapping", "PUT", "/admin/v1/vim_accounts/{}".format(vim_id), headers_json,
{"config": {"sdn-port-mapping": self.port_mapping}}, (202, 204), None, None)
engine.test("Edit VIM remove port-mapping", "PUT", "/admin/v1/vim_accounts/{}".format(vim_id), headers_json,
{"config": {"sdn-port-mapping": None}}, (202, 204), None, None)
engine.test("Create WIM", "POST", "/admin/v1/wim_accounts", headers_json, self.wim, (200, 202, 201),
{"Location": "/admin/v1/wim_accounts/", "Content-Type": "application/json"}, "json"),
wim_id = engine.last_id
if not test_osm:
# delete with FORCE
engine.test("Delete VIM remove port-mapping", "DELETE",
"/admin/v1/vim_accounts/{}?FORCE=True".format(vim_id), headers_json, None, 202, None, 0)
engine.test("Delete SDNC", "DELETE", "/admin/v1/sdns/{}?FORCE=True".format(sdnc_id), headers_json, None,
202, None, 0)
engine.test("Delete WIM", "DELETE",
"/admin/v1/wim_accounts/{}?FORCE=True".format(wim_id), headers_json, None, 202, None, 0)
engine.test("Check VIM is deleted", "GET", "/admin/v1/vim_accounts/{}".format(vim_id), headers_yaml,
None, 404, r_header_yaml, "yaml")
engine.test("Check SDN is deleted", "GET", "/admin/v1/sdns/{}".format(sdnc_id), headers_yaml, None,
404, r_header_yaml, "yaml")
engine.test("Check WIM is deleted", "GET", "/admin/v1/wim_accounts/{}".format(wim_id), headers_yaml,
None, 404, r_header_yaml, "yaml")
else:
if manual_check:
input('VIM, SDN, WIM has been deployed. Perform manual check and press enter to resume')
# delete and wait until is really deleted
engine.test("Delete VIM remove port-mapping", "DELETE", "/admin/v1/vim_accounts/{}".format(vim_id),
headers_json, None, (202, 201, 204), None, 0)
engine.test("Delete SDN", "DELETE", "/admin/v1/sdns/{}".format(sdnc_id), headers_json, None,
(202, 201, 204), None, 0)
engine.test("Delete VIM", "DELETE", "/admin/v1/wim_accounts/{}".format(wim_id),
headers_json, None, (202, 201, 204), None, 0)
engine.wait_until_delete("/admin/v1/vim_accounts/{}".format(vim_id), timeout)
engine.wait_until_delete("/admin/v1/sdns/{}".format(sdnc_id), timeout)
engine.wait_until_delete("/admin/v1/wim_accounts/{}".format(wim_id), timeout)
class TestDeploy:
description = "Base class for downloading descriptors from ETSI, onboard and deploy in real VIM"
def __init__(self):
self.test_name = "DEPLOY"
self.nsd_id = None
self.vim_id = None
self.ns_id = None
self.vnfds_id = []
self.descriptor_url = "https://osm-download.etsi.org/ftp/osm-3.0-three/2nd-hackfest/packages/"
self.vnfd_filenames = ("cirros_vnf.tar.gz",)
self.nsd_filename = "cirros_2vnf_ns.tar.gz"
self.descriptor_edit = None
self.uses_configuration = False
self.users = {}
self.passwords = {}
self.commands = {}
self.keys = {}
self.timeout = 120
self.qforce = ""
self.ns_params = None
self.vnfr_ip_list = {}
def create_descriptors(self, engine):
temp_dir = os.path.dirname(os.path.abspath(__file__)) + "/temp/"
if not os.path.exists(temp_dir):
os.makedirs(temp_dir)
for vnfd_index, vnfd_filename in enumerate(self.vnfd_filenames):
if "/" in vnfd_filename:
vnfd_filename_path = vnfd_filename
if not os.path.exists(vnfd_filename_path):
raise TestException("File '{}' does not exist".format(vnfd_filename_path))
else:
vnfd_filename_path = temp_dir + vnfd_filename
if not os.path.exists(vnfd_filename_path):
with open(vnfd_filename_path, "wb") as file:
response = requests.get(self.descriptor_url + vnfd_filename)
if response.status_code >= 300:
raise TestException("Error downloading descriptor from '{}': {}".format(
self.descriptor_url + vnfd_filename, response.status_code))
file.write(response.content)
if vnfd_filename_path.endswith(".yaml"):
headers = headers_yaml
else:
headers = headers_zip_yaml
if randint(0, 1) == 0:
# vnfd CREATE AND UPLOAD in one step:
engine.test("Onboard VNFD in one step", "POST",
"/vnfpkgm/v1/vnf_packages_content" + self.qforce, headers, "@b" + vnfd_filename_path, 201,
r_headers_yaml_location_vnfd,
"yaml")
self.vnfds_id.append(engine.last_id)
else:
# vnfd CREATE AND UPLOAD ZIP
engine.test("Onboard VNFD step 1", "POST", "/vnfpkgm/v1/vnf_packages",
headers_json, None, 201,
{"Location": "/vnfpkgm/v1/vnf_packages/", "Content-Type": "application/json"}, "json")
self.vnfds_id.append(engine.last_id)
engine.test("Onboard VNFD step 2 as ZIP", "PUT",
"/vnfpkgm/v1/vnf_packages/<>/package_content" + self.qforce,
headers, "@b" + vnfd_filename_path, 204, None, 0)
if self.descriptor_edit:
if "vnfd{}".format(vnfd_index) in self.descriptor_edit:
# Modify VNFD
engine.test("Edit VNFD ", "PATCH",
"/vnfpkgm/v1/vnf_packages/{}".format(self.vnfds_id[-1]),
headers_yaml, self.descriptor_edit["vnfd{}".format(vnfd_index)], 204, None, None)
if "/" in self.nsd_filename:
nsd_filename_path = self.nsd_filename
if not os.path.exists(nsd_filename_path):
raise TestException("File '{}' does not exist".format(nsd_filename_path))
else:
nsd_filename_path = temp_dir + self.nsd_filename
if not os.path.exists(nsd_filename_path):
with open(nsd_filename_path, "wb") as file:
response = requests.get(self.descriptor_url + self.nsd_filename)
if response.status_code >= 300:
raise TestException("Error downloading descriptor from '{}': {}".format(
self.descriptor_url + self.nsd_filename, response.status_code))
file.write(response.content)
if nsd_filename_path.endswith(".yaml"):
headers = headers_yaml
else:
headers = headers_zip_yaml
if randint(0, 1) == 0:
# nsd CREATE AND UPLOAD in one step:
engine.test("Onboard NSD in one step", "POST",
"/nsd/v1/ns_descriptors_content" + self.qforce, headers, "@b" + nsd_filename_path, 201,
r_headers_yaml_location_nsd, yaml)
self.nsd_id = engine.last_id
else:
# nsd CREATE AND UPLOAD ZIP
engine.test("Onboard NSD step 1", "POST", "/nsd/v1/ns_descriptors",
headers_json, None, 201,
{"Location": "/nsd/v1/ns_descriptors/", "Content-Type": "application/json"}, "json")
self.nsd_id = engine.last_id
engine.test("Onboard NSD step 2 as ZIP", "PUT",
"/nsd/v1/ns_descriptors/<>/nsd_content" + self.qforce,
headers, "@b" + nsd_filename_path, 204, None, 0)
if self.descriptor_edit and "nsd" in self.descriptor_edit:
# Modify NSD
engine.test("Edit NSD ", "PATCH",
"/nsd/v1/ns_descriptors/{}".format(self.nsd_id),
headers_yaml, self.descriptor_edit["nsd"], 204, None, None)
def delete_descriptors(self, engine):
# delete descriptors
engine.test("Delete NSSD SOL005", "DELETE",
"/nsd/v1/ns_descriptors/{}".format(self.nsd_id),
headers_yaml, None, 204, None, 0)
for vnfd_id in self.vnfds_id:
engine.test("Delete VNFD SOL005", "DELETE",
"/vnfpkgm/v1/vnf_packages/{}".format(vnfd_id), headers_yaml, None, 204, None, 0)
def instantiate(self, engine, ns_data):
ns_data_text = yaml.safe_dump(ns_data, default_flow_style=True, width=256)
# create NS Two steps
r = engine.test("Create NS step 1", "POST", "/nslcm/v1/ns_instances",
headers_yaml, ns_data_text, (201, 202),
{"Location": "nslcm/v1/ns_instances/", "Content-Type": "application/yaml"}, "yaml")
if not r:
return
self.ns_id = engine.last_id
engine.test("Instantiate NS step 2", "POST",
"/nslcm/v1/ns_instances/{}/instantiate".format(self.ns_id), headers_yaml, ns_data_text,
(201, 202), r_headers_yaml_location_nslcmop, "yaml")
nslcmop_id = engine.last_id
if test_osm:
# Wait until status is Ok
timeout = timeout_configure if self.uses_configuration else timeout_deploy
engine.wait_operation_ready("ns", nslcmop_id, timeout)
def terminate(self, engine):
# remove deployment
if test_osm:
engine.test("Terminate NS", "POST", "/nslcm/v1/ns_instances/{}/terminate".format(self.ns_id), headers_yaml,
None, (201, 202), r_headers_yaml_location_nslcmop, "yaml")
nslcmop2_id = engine.last_id
# Wait until status is Ok
engine.wait_operation_ready("ns", nslcmop2_id, timeout_deploy)
engine.test("Delete NS", "DELETE", "/nslcm/v1/ns_instances/{}".format(self.ns_id), headers_yaml, None,
204, None, 0)
else:
engine.test("Delete NS with FORCE", "DELETE", "/nslcm/v1/ns_instances/{}?FORCE=True".format(self.ns_id),
headers_yaml, None, 204, None, 0)
# check all it is deleted
engine.test("Check NS is deleted", "GET", "/nslcm/v1/ns_instances/{}".format(self.ns_id), headers_yaml, None,
404, None, "yaml")
r = engine.test("Check NSLCMOPs are deleted", "GET",
"/nslcm/v1/ns_lcm_op_occs?nsInstanceId={}".format(self.ns_id), headers_json, None,
200, None, "json")
if not r:
return
nslcmops = r.json()
if not isinstance(nslcmops, list) or nslcmops:
raise TestException("NS {} deleted but with ns_lcm_op_occ active: {}".format(self.ns_id, nslcmops))
def test_ns(self, engine, test_osm, commands=None, users=None, passwds=None, keys=None, timeout=0):
r = engine.test("GET VNFR IDs", "GET",
"/nslcm/v1/ns_instances/{}".format(self.ns_id), headers_json, None,
200, r_header_json, "json")
if not r:
return
ns_data = r.json()
vnfr_list = ns_data['constituent-vnfr-ref']
time = 0
_commands = commands if commands is not None else self.commands
_users = users if users is not None else self.users
_passwds = passwds if passwds is not None else self.passwords
_keys = keys if keys is not None else self.keys
_timeout = timeout if timeout != 0 else self.timeout
# vnfr_list=[d8272263-6bd3-4680-84ca-6a4be23b3f2d, 88b22e2f-994a-4b61-94fd-4a3c90de3dc4]
for vnfr_id in vnfr_list:
r = engine.test("Get VNFR to get IP_ADDRESS", "GET",
"/nslcm/v1/vnfrs/{}".format(vnfr_id), headers_json, None,
200, r_header_json, "json")
if not r:
continue
vnfr_data = r.json()
vnf_index = str(vnfr_data["member-vnf-index-ref"])
ip_address = self.get_vnfr_ip(engine, vnf_index)
description = "Exec command='{}' at VNFR={} IP={}".format(_commands.get(vnf_index)[0], vnf_index,
ip_address)
engine.step += 1
test_description = "{}{} {}".format(engine.test_name, engine.step, description)
logger.warning(test_description)
while _timeout >= time:
result, message = self.do_checks([ip_address],
vnf_index=vnfr_data["member-vnf-index-ref"],
commands=_commands.get(vnf_index), user=_users.get(vnf_index),
passwd=_passwds.get(vnf_index), key=_keys.get(vnf_index))
if result == 1:
engine.passed_tests += 1
logger.debug(message)
break
elif result == 0:
time += 20
sleep(20)
elif result == -1:
engine.failed_tests += 1
logger.error(message)
break
else:
time -= 20
engine.failed_tests += 1
logger.error(message)
else:
engine.failed_tests += 1
logger.error("VNFR {} has not mgmt address. Check failed".format(vnf_index))
def do_checks(self, ip, vnf_index, commands=[], user=None, passwd=None, key=None):
try:
import urllib3
from pssh.clients import ParallelSSHClient
from pssh.utils import load_private_key
from ssh2 import exceptions as ssh2Exception
except ImportError as e:
logger.critical("Package <pssh> or/and <urllib3> is not installed. Please add them with 'pip3 install "
"parallel-ssh urllib3': {}".format(e))
return -1, "install needed packages 'pip3 install parallel-ssh urllib3'"
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
try:
p_host = os.environ.get("PROXY_HOST")
p_user = os.environ.get("PROXY_USER")
p_password = os.environ.get("PROXY_PASSWD")
if key:
pkey = load_private_key(key)
else:
pkey = None
client = ParallelSSHClient(ip, user=user, password=passwd, pkey=pkey, proxy_host=p_host,
proxy_user=p_user, proxy_password=p_password, timeout=10, num_retries=0)
for cmd in commands:
output = client.run_command(cmd)
client.join(output)
if output[ip[0]].exit_code:
return -1, "VNFR {} command '{}' returns error: '{}'".format(ip[0], cmd,
"\n".join(output[ip[0]].stderr))
else:
return 1, "VNFR {} command '{}' successful".format(ip[0], cmd)
except (ssh2Exception.ChannelFailure, ssh2Exception.SocketDisconnectError, ssh2Exception.SocketTimeout,
ssh2Exception.SocketRecvError) as e:
return 0, "Timeout accessing the VNFR {}: {}".format(ip[0], str(e))
except Exception as e:
return -1, "ERROR checking the VNFR {}: {}".format(ip[0], str(e))
def additional_operations(self, engine, test_osm, manual_check):
pass
def run(self, engine, test_osm, manual_check, test_params=None):
engine.set_test_name(self.test_name)
engine.get_autorization()
nsname = os.environ.get("OSMNBITEST_NS_NAME", "OSMNBITEST")
if test_params:
if "vnfd-files" in test_params:
self.vnfd_filenames = test_params["vnfd-files"].split(",")
if "nsd-file" in test_params:
self.nsd_filename = test_params["nsd-file"]
if test_params.get("ns-name"):
nsname = test_params["ns-name"]
self.create_descriptors(engine)
# create real VIM if not exist
self.vim_id = engine.get_create_vim(test_osm)
ns_data = {"nsDescription": "default description", "nsName": nsname, "nsdId": self.nsd_id,
"vimAccountId": self.vim_id}
if self.ns_params:
ns_data.update(self.ns_params)
if test_params and test_params.get("ns-config"):
if isinstance(test_params["ns-config"], str):
ns_data.update(yaml.load(test_params["ns-config"]), Loader=yaml.Loader)
else:
ns_data.update(test_params["ns-config"])
self.instantiate(engine, ns_data)
if manual_check:
input('NS has been deployed. Perform manual check and press enter to resume')
if test_osm and self.commands:
self.test_ns(engine, test_osm)
self.additional_operations(engine, test_osm, manual_check)
self.terminate(engine)
self.delete_descriptors(engine)
def get_first_ip(self, ip_string):
# When using a floating IP, the vnfr_data['ip-address'] contains a semicolon-separated list of IP:s.
first_ip = ip_string.split(";")[0] if ip_string else ""
return first_ip
def get_vnfr_ip(self, engine, vnfr_index_wanted):
# If the IP address list has been obtained before, it has been stored in 'vnfr_ip_list'
ip = self.vnfr_ip_list.get(vnfr_index_wanted, "")
if (ip):
return self.get_first_ip(ip)
r = engine.test("Get VNFR to get IP_ADDRESS", "GET",
"/nslcm/v1/vnfrs?member-vnf-index-ref={}&nsr-id-ref={}".format(
vnfr_index_wanted, self.ns_id), headers_json, None,
200, r_header_json, "json")
if not r:
return ""
vnfr_data = r.json()
if not (vnfr_data and vnfr_data[0]):
return ""
# Store the IP (or list of IPs) in 'vnfr_ip_list'
ip_list = vnfr_data[0].get("ip-address", "")
if ip_list:
self.vnfr_ip_list[vnfr_index_wanted] = ip_list
ip = self.get_first_ip(ip_list)
return ip
class TestDeployHackfestCirros(TestDeploy):
description = "Load and deploy Hackfest cirros_2vnf_ns example"
def __init__(self):
super().__init__()
self.test_name = "CIRROS"
self.vnfd_filenames = ("cirros_vnf.tar.gz",)
self.nsd_filename = "cirros_2vnf_ns.tar.gz"
self.commands = {'1': ['ls -lrt', ], '2': ['ls -lrt', ]}
self.users = {'1': "cirros", '2': "cirros"}
self.passwords = {'1': "cubswin:)", '2': "cubswin:)"}
def terminate(self, engine):
# Make a delete in one step, overriding the normal two step of TestDeploy that launched terminate and delete
if test_osm:
engine.test("Terminate and delete NS in one step", "DELETE", "/nslcm/v1/ns_instances_content/{}".
format(self.ns_id), headers_yaml, None, 202, None, "yaml")
engine .wait_until_delete("/nslcm/v1/ns_instances/{}".format(self.ns_id), timeout_deploy)
else:
engine.test("Delete NS with FORCE", "DELETE", "/nslcm/v1/ns_instances/{}?FORCE=True".format(self.ns_id),
headers_yaml, None, 204, None, 0)
# check all it is deleted
engine.test("Check NS is deleted", "GET", "/nslcm/v1/ns_instances/{}".format(self.ns_id), headers_yaml, None,
404, None, "yaml")
r = engine.test("Check NSLCMOPs are deleted", "GET",
"/nslcm/v1/ns_lcm_op_occs?nsInstanceId={}".format(self.ns_id), headers_json, None,
200, None, "json")
if not r:
return
nslcmops = r.json()
if not isinstance(nslcmops, list) or nslcmops:
raise TestException("NS {} deleted but with ns_lcm_op_occ active: {}".format(self.ns_id, nslcmops))
class TestDeployHackfest1(TestDeploy):
description = "Load and deploy Hackfest_1_vnfd example"
def __init__(self):
super().__init__()
self.test_name = "HACKFEST1-"
self.vnfd_filenames = ("hackfest_1_vnfd.tar.gz",)
self.nsd_filename = "hackfest_1_nsd.tar.gz"
# self.commands = {'1': ['ls -lrt', ], '2': ['ls -lrt', ]}
# self.users = {'1': "cirros", '2': "cirros"}
# self.passwords = {'1': "cubswin:)", '2': "cubswin:)"}
class TestDeployHackfestCirrosScaling(TestDeploy):
description = "Load and deploy Hackfest cirros_2vnf_ns example with scaling modifications"
def __init__(self):
super().__init__()
self.test_name = "CIRROS-SCALE"
self.vnfd_filenames = ("cirros_vnf.tar.gz",)
self.nsd_filename = "cirros_2vnf_ns.tar.gz"
# Modify VNFD to add scaling and count=2
self.descriptor_edit = {
"vnfd0": {
"vdu": {
"$id: 'cirros_vnfd-VM'": {"count": 2}
},
"scaling-group-descriptor": [{
"name": "scale_cirros",
"max-instance-count": 2,
"vdu": [{
"vdu-id-ref": "cirros_vnfd-VM",
"count": 2
}]
}]
}
}
def additional_operations(self, engine, test_osm, manual_check):
if not test_osm:
return
# 2 perform scale out twice
payload = '{scaleType: SCALE_VNF, scaleVnfData: {scaleVnfType: SCALE_OUT, scaleByStepData: ' \
'{scaling-group-descriptor: scale_cirros, member-vnf-index: "1"}}}'
for i in range(0, 2):
engine.test("Execute scale action over NS", "POST",
"/nslcm/v1/ns_instances/{}/scale".format(self.ns_id), headers_yaml, payload,
(201, 202), r_headers_yaml_location_nslcmop, "yaml")
nslcmop2_scale_out = engine.last_id
engine.wait_operation_ready("ns", nslcmop2_scale_out, timeout_deploy)
if manual_check:
input('NS scale out done. Check that two more vdus are there')
# TODO check automatic
# 2 perform scale in
payload = '{scaleType: SCALE_VNF, scaleVnfData: {scaleVnfType: SCALE_IN, scaleByStepData: ' \
'{scaling-group-descriptor: scale_cirros, member-vnf-index: "1"}}}'
for i in range(0, 2):
engine.test("Execute scale IN action over NS", "POST",
"/nslcm/v1/ns_instances/{}/scale".format(self.ns_id), headers_yaml, payload,
(201, 202), r_headers_yaml_location_nslcmop, "yaml")
nslcmop2_scale_in = engine.last_id
engine.wait_operation_ready("ns", nslcmop2_scale_in, timeout_deploy)
if manual_check:
input('NS scale in done. Check that two less vdus are there')
# TODO check automatic
# perform scale in that must fail as reached limit
engine.test("Execute scale IN out of limit action over NS", "POST",
"/nslcm/v1/ns_instances/{}/scale".format(self.ns_id), headers_yaml, payload,
(201, 202), r_headers_yaml_location_nslcmop, "yaml")
nslcmop2_scale_in = engine.last_id
engine.wait_operation_ready("ns", nslcmop2_scale_in, timeout_deploy, expected_fail=True)
class TestDeployIpMac(TestDeploy):
description = "Load and deploy descriptor examples setting mac, ip address at descriptor and instantiate params"
def __init__(self):
super().__init__()
self.test_name = "SetIpMac"
self.vnfd_filenames = ("vnfd_2vdu_set_ip_mac2.yaml", "vnfd_2vdu_set_ip_mac.yaml")
self.nsd_filename = "scenario_2vdu_set_ip_mac.yaml"
self.descriptor_url = \
"https://osm.etsi.org/gitweb/?p=osm/RO.git;a=blob_plain;f=test/RO_tests/v3_2vdu_set_ip_mac/"
self.commands = {'1': ['ls -lrt', ], '2': ['ls -lrt', ]}
self.users = {'1': "osm", '2': "osm"}
self.passwords = {'1': "osm4u", '2': "osm4u"}
self.timeout = 360
def run(self, engine, test_osm, manual_check, test_params=None):
# super().run(engine, test_osm, manual_check, test_params)
# run again setting IPs with instantiate parameters
instantiation_params = {
"vnf": [
{
"member-vnf-index": "1",
"internal-vld": [
{
"name": "internal_vld1", # net_internal
"ip-profile": {
"ip-version": "ipv4",
"subnet-address": "10.9.8.0/24",
"dhcp-params": {"count": 100, "start-address": "10.9.8.100"}
},
"internal-connection-point": [
{
"id-ref": "eth2",
"ip-address": "10.9.8.2",
},
{
"id-ref": "eth3",
"ip-address": "10.9.8.3",
}
]
},
],
"vdu": [
{
"id": "VM1",
"interface": [
# {
# "name": "iface11",
# "floating-ip-required": True,
# },
{
"name": "iface13",
"mac-address": "52:33:44:55:66:13"
},
],
},
{
"id": "VM2",
"interface": [
{
"name": "iface21",
"ip-address": "10.31.31.22",
"mac-address": "52:33:44:55:66:21"
},
],
},
]
},
]
}
super().run(engine, test_osm, manual_check, test_params={"ns-config": instantiation_params})
class TestDeployHackfest4(TestDeploy):
description = "Load and deploy Hackfest 4 example."
def __init__(self):
super().__init__()
self.test_name = "HACKFEST4-"
self.vnfd_filenames = ("hackfest_4_vnfd.tar.gz",)
self.nsd_filename = "hackfest_4_nsd.tar.gz"
self.uses_configuration = True
self.commands = {'1': ['ls -lrt', ], '2': ['ls -lrt', ]}
self.users = {'1': "ubuntu", '2': "ubuntu"}
self.passwords = {'1': "osm4u", '2': "osm4u"}
# Modify VNFD to add scaling
# self.descriptor_edit = {
# "vnfd0": {
# 'vnf-configuration': {
# 'config-primitive': [{
# 'name': 'touch',
# 'parameter': [{
# 'name': 'filename',
# 'data-type': 'STRING',
# 'default-value': '/home/ubuntu/touched'
# }]
# }]
# },
# 'scaling-group-descriptor': [{
# 'name': 'scale_dataVM',
# 'scaling-policy': [{
# 'threshold-time': 0,
# 'name': 'auto_cpu_util_above_threshold',
# 'scaling-type': 'automatic',
# 'scaling-criteria': [{
# 'name': 'cpu_util_above_threshold',
# 'vnf-monitoring-param-ref': 'all_aaa_cpu_util',
# 'scale-out-relational-operation': 'GE',
# 'scale-in-threshold': 15,
# 'scale-out-threshold': 60,
# 'scale-in-relational-operation': 'LE'
# }],
# 'cooldown-time': 60
# }],
# 'max-instance-count': 10,
# 'scaling-config-action': [
# {'vnf-config-primitive-name-ref': 'touch',
# 'trigger': 'post-scale-out'},
# {'vnf-config-primitive-name-ref': 'touch',
# 'trigger': 'pre-scale-in'}
# ],
# 'vdu': [{
# 'vdu-id-ref': 'dataVM',
# 'count': 1
# }]
# }]
# }
# }
class TestDeployHackfest3Charmed(TestDeploy):
description = "Load and deploy Hackfest 3charmed_ns example"
def __init__(self):
super().__init__()
self.test_name = "HACKFEST3-"
self.vnfd_filenames = ("hackfest_3charmed_vnfd.tar.gz",)
self.nsd_filename = "hackfest_3charmed_nsd.tar.gz"
self.uses_configuration = True
self.commands = {'1': ['ls -lrt /home/ubuntu/first-touch'], '2': ['ls -lrt /home/ubuntu/first-touch']}
self.users = {'1': "ubuntu", '2': "ubuntu"}
self.passwords = {'1': "osm4u", '2': "osm4u"}
self.descriptor_edit = {
"vnfd0": yaml.safe_load(
"""
vnf-configuration:
terminate-config-primitive:
- seq: '1'
name: touch
parameter:
- name: filename
value: '/home/ubuntu/last-touch1'
- seq: '3'
name: touch
parameter:
- name: filename
value: '/home/ubuntu/last-touch3'
- seq: '2'
name: touch
parameter:
- name: filename
value: '/home/ubuntu/last-touch2'
""")
}
def additional_operations(self, engine, test_osm, manual_check):
if not test_osm:
return
# 1 perform action
vnfr_index_selected = "2"
payload = '{member_vnf_index: "2", primitive: touch, primitive_params: { filename: /home/ubuntu/OSMTESTNBI }}'
engine.test("Exec service primitive over NS", "POST",
"/nslcm/v1/ns_instances/{}/action".format(self.ns_id), headers_yaml, payload,
(201, 202), r_headers_yaml_location_nslcmop, "yaml")
nslcmop2_action = engine.last_id
# Wait until status is Ok
engine.wait_operation_ready("ns", nslcmop2_action, timeout_deploy)
vnfr_ip = self.get_vnfr_ip(engine, vnfr_index_selected)
if manual_check:
input(
"NS service primitive has been executed."
"Check that file /home/ubuntu/OSMTESTNBI is present at {}".
format(vnfr_ip))
if test_osm:
commands = {'1': [''], '2': ['ls -lrt /home/ubuntu/OSMTESTNBI', ]}
self.test_ns(engine, test_osm, commands=commands)
# # 2 perform scale out
# payload = '{scaleType: SCALE_VNF, scaleVnfData: {scaleVnfType: SCALE_OUT, scaleByStepData: ' \
# '{scaling-group-descriptor: scale_dataVM, member-vnf-index: "1"}}}'
# engine.test("Execute scale action over NS", "POST",
# "/nslcm/v1/ns_instances/{}/scale".format(self.ns_id), headers_yaml, payload,
# (201, 202), r_headers_yaml_location_nslcmop, "yaml")
# nslcmop2_scale_out = engine.last_id
# engine.wait_operation_ready("ns", nslcmop2_scale_out, timeout_deploy)
# if manual_check:
# input('NS scale out done. Check that file /home/ubuntu/touched is present and new VM is created')
# # TODO check automatic
#
# # 2 perform scale in
# payload = '{scaleType: SCALE_VNF, scaleVnfData: {scaleVnfType: SCALE_IN, scaleByStepData: ' \
# '{scaling-group-descriptor: scale_dataVM, member-vnf-index: "1"}}}'
# engine.test("Execute scale action over NS", "POST",
# "/nslcm/v1/ns_instances/{}/scale".format(self.ns_id), headers_yaml, payload,
# (201, 202), r_headers_yaml_location_nslcmop, "yaml")
# nslcmop2_scale_in = engine.last_id
# engine.wait_operation_ready("ns", nslcmop2_scale_in, timeout_deploy)
# if manual_check:
# input('NS scale in done. Check that file /home/ubuntu/touched is updated and new VM is deleted')
# # TODO check automatic
class TestDeployHackfest3Charmed2(TestDeployHackfest3Charmed):
description = "Load and deploy Hackfest 3charmed_ns example modified version of descriptors to have dots in " \
"ids and member-vnf-index."
def __init__(self):
super().__init__()
self.test_name = "HACKFEST3v2-"
self.qforce = "?FORCE=True"
self.descriptor_edit = {
"vnfd0": {
"vdu": {
"$[0]": {
"interface": {"$[0]": {"external-connection-point-ref": "pdu-mgmt"}}
},
"$[1]": None
},
"vnf-configuration": None,
"connection-point": {
"$[0]": {
"id": "pdu-mgmt",
"name": "pdu-mgmt",
"short-name": "pdu-mgmt"
},
"$[1]": None
},
"mgmt-interface": {"cp": "pdu-mgmt"},
"description": "A vnf single vdu to be used as PDU",
"id": "vdu-as-pdu",
"internal-vld": {
"$[0]": {
"id": "pdu_internal",
"name": "pdu_internal",
"internal-connection-point": {"$[1]": None},
"short-name": "pdu_internal",
"type": "ELAN"
}
}
},
# Modify NSD accordingly
"nsd": {
"constituent-vnfd": {
"$[0]": {"vnfd-id-ref": "vdu-as-pdu"},
"$[1]": None,
},
"description": "A nsd to deploy the vnf to act as as PDU",
"id": "nsd-as-pdu",
"name": "nsd-as-pdu",
"short-name": "nsd-as-pdu",
"vld": {
"$[0]": {
"id": "mgmt_pdu",
"name": "mgmt_pdu",
"short-name": "mgmt_pdu",
"vnfd-connection-point-ref": {
"$[0]": {
"vnfd-connection-point-ref": "pdu-mgmt",
"vnfd-id-ref": "vdu-as-pdu",
},
"$[1]": None
},
"type": "ELAN"
},
"$[1]": None,
}
}
}
class TestDeployHackfest3Charmed3(TestDeployHackfest3Charmed):
description = "Load and deploy Hackfest 3charmed_ns example modified version to test scaling and NS parameters"
def __init__(self):
super().__init__()
self.test_name = "HACKFEST3v3-"
self.commands = {'1': ['ls -lrt /home/ubuntu/first-touch-1'], '2': ['ls -lrt /home/ubuntu/first-touch-2']}
self.descriptor_edit = {
"vnfd0": yaml.load(
"""
scaling-group-descriptor:
- name: "scale_dataVM"
max-instance-count: 10
scaling-policy:
- name: "auto_cpu_util_above_threshold"
scaling-type: "automatic"
threshold-time: 0
cooldown-time: 60
scaling-criteria:
- name: "cpu_util_above_threshold"
scale-in-threshold: 15
scale-in-relational-operation: "LE"
scale-out-threshold: 60
scale-out-relational-operation: "GE"
vnf-monitoring-param-ref: "monitor1"
vdu:
- vdu-id-ref: dataVM
count: 1
scaling-config-action:
- trigger: post-scale-out
vnf-config-primitive-name-ref: touch
- trigger: pre-scale-in
vnf-config-primitive-name-ref: touch
vdu:
"$id: dataVM":
monitoring-param:
- id: "dataVM_cpu_util"
nfvi-metric: "cpu_utilization"
monitoring-param:
- id: "monitor1"
name: "monitor1"
aggregation-type: AVERAGE
vdu-monitoring-param:
vdu-ref: "dataVM"
vdu-monitoring-param-ref: "dataVM_cpu_util"
vnf-configuration:
initial-config-primitive:
"$[1]":
parameter:
"$[0]":
value: "<touch_filename>" # default-value: /home/ubuntu/first-touch
config-primitive:
"$[0]":
parameter:
"$[0]":
default-value: "<touch_filename2>"
""",
Loader=yaml.Loader)
}
self.ns_params = {
"additionalParamsForVnf": [
{"member-vnf-index": "1", "additionalParams": {"touch_filename": "/home/ubuntu/first-touch-1",
"touch_filename2": "/home/ubuntu/second-touch-1"}},
{"member-vnf-index": "2", "additionalParams": {"touch_filename": "/home/ubuntu/first-touch-2",
"touch_filename2": "/home/ubuntu/second-touch-2"}},
]
}
def additional_operations(self, engine, test_osm, manual_check):
super().additional_operations(engine, test_osm, manual_check)
if not test_osm:
return
# 2 perform scale out
payload = '{scaleType: SCALE_VNF, scaleVnfData: {scaleVnfType: SCALE_OUT, scaleByStepData: ' \
'{scaling-group-descriptor: scale_dataVM, member-vnf-index: "1"}}}'
engine.test("Execute scale action over NS", "POST",
"/nslcm/v1/ns_instances/{}/scale".format(self.ns_id), headers_yaml, payload,
(201, 202), r_headers_yaml_location_nslcmop, "yaml")
nslcmop2_scale_out = engine.last_id
engine.wait_operation_ready("ns", nslcmop2_scale_out, timeout_deploy)
if manual_check:
input('NS scale out done. Check that file /home/ubuntu/second-touch-1 is present and new VM is created')
if test_osm:
commands = {'1': ['ls -lrt /home/ubuntu/second-touch-1', ]}
self.test_ns(engine, test_osm, commands=commands)
# TODO check automatic connection to scaled VM
# 2 perform scale in
payload = '{scaleType: SCALE_VNF, scaleVnfData: {scaleVnfType: SCALE_IN, scaleByStepData: ' \
'{scaling-group-descriptor: scale_dataVM, member-vnf-index: "1"}}}'
engine.test("Execute scale action over NS", "POST",
"/nslcm/v1/ns_instances/{}/scale".format(self.ns_id), headers_yaml, payload,
(201, 202), r_headers_yaml_location_nslcmop, "yaml")
nslcmop2_scale_in = engine.last_id
engine.wait_operation_ready("ns", nslcmop2_scale_in, timeout_deploy)
if manual_check:
input('NS scale in done. Check that file /home/ubuntu/second-touch-1 is updated and new VM is deleted')
# TODO check automatic
class TestDeploySimpleCharm(TestDeploy):
description = "Deploy hackfest-4 hackfest_simplecharm example"
def __init__(self):
super().__init__()
self.test_name = "HACKFEST-SIMPLE"
self.descriptor_url = "https://osm-download.etsi.org/ftp/osm-4.0-four/4th-hackfest/packages/"
self.vnfd_filenames = ("hackfest_simplecharm_vnf.tar.gz",)
self.nsd_filename = "hackfest_simplecharm_ns.tar.gz"
self.uses_configuration = True
self.commands = {'1': [''], '2': ['ls -lrt /home/ubuntu/first-touch', ]}
self.users = {'1': "ubuntu", '2': "ubuntu"}
self.passwords = {'1': "osm4u", '2': "osm4u"}
class TestDeploySimpleCharm2(TestDeploySimpleCharm):
description = "Deploy hackfest-4 hackfest_simplecharm example changing naming to contain dots on ids and " \
"vnf-member-index"
def __init__(self):
super().__init__()
self.test_name = "HACKFEST-SIMPLE2-"
self.qforce = "?FORCE=True"
self.descriptor_edit = {
"vnfd0": {
"id": "hackfest.simplecharm.vnf"
},
"nsd": {
"id": "hackfest.simplecharm.ns",
"constituent-vnfd": {
"$[0]": {"vnfd-id-ref": "hackfest.simplecharm.vnf", "member-vnf-index": "$1"},
"$[1]": {"vnfd-id-ref": "hackfest.simplecharm.vnf", "member-vnf-index": "$2"},
},
"vld": {
"$[0]": {
"vnfd-connection-point-ref": {"$[0]": {"member-vnf-index-ref": "$1",
"vnfd-id-ref": "hackfest.simplecharm.vnf"},
"$[1]": {"member-vnf-index-ref": "$2",
"vnfd-id-ref": "hackfest.simplecharm.vnf"}},
},
"$[1]": {
"vnfd-connection-point-ref": {"$[0]": {"member-vnf-index-ref": "$1",
"vnfd-id-ref": "hackfest.simplecharm.vnf"},
"$[1]": {"member-vnf-index-ref": "$2",
"vnfd-id-ref": "hackfest.simplecharm.vnf"}},
},
}
}
}
class TestDeploySingleVdu(TestDeployHackfest3Charmed):
description = "Generate a single VDU base on editing Hackfest3Charmed descriptors and deploy"
def __init__(self):
super().__init__()
self.test_name = "SingleVDU"
self.qforce = "?FORCE=True"
self.descriptor_edit = {
# Modify VNFD to remove one VDU
"vnfd0": {
"vdu": {
"$[0]": {
"interface": {"$[0]": {"external-connection-point-ref": "pdu-mgmt"}}
},
"$[1]": None
},
"vnf-configuration": None,
"connection-point": {
"$[0]": {
"id": "pdu-mgmt",
"name": "pdu-mgmt",
"short-name": "pdu-mgmt"
},
"$[1]": None
},
"mgmt-interface": {"cp": "pdu-mgmt"},
"description": "A vnf single vdu to be used as PDU",
"id": "vdu-as-pdu",
"internal-vld": {
"$[0]": {
"id": "pdu_internal",
"name": "pdu_internal",
"internal-connection-point": {"$[1]": None},
"short-name": "pdu_internal",
"type": "ELAN"
}
}
},
# Modify NSD accordingly
"nsd": {
"constituent-vnfd": {
"$[0]": {"vnfd-id-ref": "vdu-as-pdu"},
"$[1]": None,
},
"description": "A nsd to deploy the vnf to act as as PDU",
"id": "nsd-as-pdu",
"name": "nsd-as-pdu",
"short-name": "nsd-as-pdu",
"vld": {
"$[0]": {
"id": "mgmt_pdu",
"name": "mgmt_pdu",
"short-name": "mgmt_pdu",
"vnfd-connection-point-ref": {
"$[0]": {
"vnfd-connection-point-ref": "pdu-mgmt",
"vnfd-id-ref": "vdu-as-pdu",
},
"$[1]": None
},
"type": "ELAN"
},
"$[1]": None,
}
}
}
class TestDeployHnfd(TestDeployHackfest3Charmed):
description = "Generate a HNFD base on editing Hackfest3Charmed descriptors and deploy"
def __init__(self):
super().__init__()
self.test_name = "HNFD"
self.pduDeploy = TestDeploySingleVdu()
self.pdu_interface_0 = {}
self.pdu_interface_1 = {}
self.pdu_id = None
# self.vnf_to_pdu = """
# vdu:
# "$[0]":
# pdu-type: PDU-TYPE-1
# interface:
# "$[0]":
# name: mgmt-iface
# "$[1]":
# name: pdu-iface-internal
# id: hfn1
# description: HFND, one PDU + One VDU
# name: hfn1
# short-name: hfn1
#
# """
self.pdu_descriptor = {
"name": "my-PDU",
"type": "PDU-TYPE-1",
"vim_accounts": "to-override",
"interfaces": [
{
"name": "mgmt-iface",
"mgmt": True,
"type": "overlay",
"ip-address": "to override",
"mac-address": "mac_address",
"vim-network-name": "mgmt",
},
{
"name": "pdu-iface-internal",
"mgmt": False,
"type": "overlay",
"ip-address": "to override",
"mac-address": "mac_address",
"vim-network-name": "pdu_internal", # OSMNBITEST-PDU-pdu_internal
},
]
}
self.vnfd_filenames = ("hackfest_3charmed_vnfd.tar.gz", "hackfest_3charmed_vnfd.tar.gz")
self.descriptor_edit = {
"vnfd0": {
"id": "hfnd1",
"name": "hfn1",
"short-name": "hfn1",
"vdu": {
"$[0]": {
"pdu-type": "PDU-TYPE-1",
"interface": {
"$[0]": {"name": "mgmt-iface"},
"$[1]": {"name": "pdu-iface-internal"},
}
}
}
},
"nsd": {
"constituent-vnfd": {
"$[1]": {"vnfd-id-ref": "hfnd1"}
},
"vld": {
"$[0]": {"vnfd-connection-point-ref": {"$[1]": {"vnfd-id-ref": "hfnd1"}}},
"$[1]": {"vnfd-connection-point-ref": {"$[1]": {"vnfd-id-ref": "hfnd1"}}}
}
}
}
def create_descriptors(self, engine):
super().create_descriptors(engine)
# Create PDU
self.pdu_descriptor["interfaces"][0].update(self.pdu_interface_0)
self.pdu_descriptor["interfaces"][1].update(self.pdu_interface_1)
self.pdu_descriptor["vim_accounts"] = [self.vim_id]
# TODO get vim-network-name from vnfr.vld.name
self.pdu_descriptor["interfaces"][1]["vim-network-name"] = "{}-{}-{}".format(
os.environ.get("OSMNBITEST_NS_NAME", "OSMNBITEST"),
"PDU", self.pdu_descriptor["interfaces"][1]["vim-network-name"])
engine.test("Onboard PDU descriptor", "POST", "/pdu/v1/pdu_descriptors",
{"Location": "/pdu/v1/pdu_descriptors/", "Content-Type": "application/yaml"}, self.pdu_descriptor,
201, r_header_yaml, "yaml")
self.pdu_id = engine.last_id
def run(self, engine, test_osm, manual_check, test_params=None):
engine.get_autorization()
engine.set_test_name(self.test_name)
nsname = os.environ.get("OSMNBITEST_NS_NAME", "OSMNBITEST")
# create real VIM if not exist
self.vim_id = engine.get_create_vim(test_osm)
# instantiate PDU
self.pduDeploy.create_descriptors(engine)
self.pduDeploy.instantiate(engine, {"nsDescription": "to be used as PDU", "nsName": nsname + "-PDU",
"nsdId": self.pduDeploy.nsd_id, "vimAccountId": self.vim_id})
if manual_check:
input('VNF to be used as PDU has been deployed. Perform manual check and press enter to resume')
if test_osm:
self.pduDeploy.test_ns(engine, test_osm)
if test_osm:
r = engine.test("Get VNFR to obtain IP_ADDRESS", "GET",
"/nslcm/v1/vnfrs?nsr-id-ref={}".format(self.pduDeploy.ns_id), headers_json, None,
200, r_header_json, "json")
if not r:
return
vnfr_data = r.json()
# print(vnfr_data)
self.pdu_interface_0["ip-address"] = vnfr_data[0]["vdur"][0]["interfaces"][0].get("ip-address")
self.pdu_interface_1["ip-address"] = vnfr_data[0]["vdur"][0]["interfaces"][1].get("ip-address")
self.pdu_interface_0["mac-address"] = vnfr_data[0]["vdur"][0]["interfaces"][0].get("mac-address")
self.pdu_interface_1["mac-address"] = vnfr_data[0]["vdur"][0]["interfaces"][1].get("mac-address")
if not self.pdu_interface_0["ip-address"]:
raise TestException("Vnfr has not managment ip address")
else:
self.pdu_interface_0["ip-address"] = "192.168.10.10"
self.pdu_interface_1["ip-address"] = "192.168.11.10"
self.pdu_interface_0["mac-address"] = "52:33:44:55:66:13"
self.pdu_interface_1["mac-address"] = "52:33:44:55:66:14"
self.create_descriptors(engine)
ns_data = {"nsDescription": "default description", "nsName": nsname, "nsdId": self.nsd_id,
"vimAccountId": self.vim_id}
if test_params and test_params.get("ns-config"):
if isinstance(test_params["ns-config"], str):
ns_data.update(yaml.load(test_params["ns-config"]), Loader=yaml.Loader)
else:
ns_data.update(test_params["ns-config"])
self.instantiate(engine, ns_data)
if manual_check:
input('NS has been deployed. Perform manual check and press enter to resume')
if test_osm:
self.test_ns(engine, test_osm)
self.additional_operations(engine, test_osm, manual_check)
self.terminate(engine)
self.pduDeploy.terminate(engine)
self.delete_descriptors(engine)
self.pduDeploy.delete_descriptors(engine)
def delete_descriptors(self, engine):
super().delete_descriptors(engine)
# delete pdu
engine.test("Delete PDU SOL005", "DELETE",
"/pdu/v1/pdu_descriptors/{}".format(self.pdu_id),
headers_yaml, None, 204, None, 0)
class TestDescriptors:
description = "Test VNFD, NSD, PDU descriptors CRUD and dependencies"
vnfd_empty = """vnfd:vnfd-catalog:
vnfd:
- name: prova
short-name: prova
id: prova
"""
vnfd_prova = """vnfd:vnfd-catalog:
vnfd:
- connection-point:
- name: cp_0h8m
type: VPORT
id: prova
name: prova
short-name: prova
vdu:
- id: vdu_z4bm
image: ubuntu
interface:
- external-connection-point-ref: cp_0h8m
name: eth0
virtual-interface:
type: VIRTIO
name: vdu_z4bm
version: '1.0'
"""
def __init__(self):
self.vnfd_filename = "hackfest_3charmed_vnfd.tar.gz"
self.nsd_filename = "hackfest_3charmed_nsd.tar.gz"
self.descriptor_url = "https://osm-download.etsi.org/ftp/osm-3.0-three/2nd-hackfest/packages/"
self.vnfd_id = None
self.nsd_id = None
def run(self, engine, test_osm, manual_check, test_params=None):
engine.set_test_name("Descriptors")
engine.get_autorization()
temp_dir = os.path.dirname(os.path.abspath(__file__)) + "/temp/"
if not os.path.exists(temp_dir):
os.makedirs(temp_dir)
# download files
for filename in (self.vnfd_filename, self.nsd_filename):
filename_path = temp_dir + filename
if not os.path.exists(filename_path):
with open(filename_path, "wb") as file:
response = requests.get(self.descriptor_url + filename)
if response.status_code >= 300:
raise TestException("Error downloading descriptor from '{}': {}".format(
self.descriptor_url + filename, response.status_code))
file.write(response.content)
vnfd_filename_path = temp_dir + self.vnfd_filename
nsd_filename_path = temp_dir + self.nsd_filename
engine.test("Onboard empty VNFD in one step", "POST", "/vnfpkgm/v1/vnf_packages_content", headers_yaml,
self.vnfd_empty, 201, r_headers_yaml_location_vnfd, "yaml")
self.vnfd_id = engine.last_id
# test bug 605
engine.test("Upload invalid VNFD ", "PUT", "/vnfpkgm/v1/vnf_packages/{}/package_content".format(self.vnfd_id),
headers_yaml, self.vnfd_prova, 422, r_header_yaml, "yaml")
engine.test("Upload VNFD {}".format(self.vnfd_filename), "PUT",
"/vnfpkgm/v1/vnf_packages/{}/package_content".format(self.vnfd_id), headers_zip_yaml,
"@b" + vnfd_filename_path, 204, None, 0)
queries = ["mgmt-interface.cp=mgmt", "vdu.0.interface.0.external-connection-point-ref=mgmt",
"vdu.0.interface.1.internal-connection-point-ref=internal",
"internal-vld.0.internal-connection-point.0.id-ref=internal",
# Detection of duplicated VLD names in VNF Descriptors
# URL: internal-vld=[
# {id: internal1, name: internal, type:ELAN,
# internal-connection-point: [{id-ref: mgmtVM-internal}, {id-ref: dataVM-internal}]},
# {id: internal2, name: internal, type:ELAN,
# internal-connection-point: [{id-ref: mgmtVM-internal}, {id-ref: dataVM-internal}]}
# ]
"internal-vld=%5B%7Bid%3A%20internal1%2C%20name%3A%20internal%2C%20type%3A%20ELAN%2C%20"
"internal-connection-point%3A%20%5B%7Bid-ref%3A%20mgmtVM-internal%7D%2C%20%7Bid-ref%3A%20"
"dataVM-internal%7D%5D%7D%2C%20%7Bid%3A%20internal2%2C%20name%3A%20internal%2C%20type%3A%20"
"ELAN%2C%20internal-connection-point%3A%20%5B%7Bid-ref%3A%20mgmtVM-internal%7D%2C%20%7B"
"id-ref%3A%20dataVM-internal%7D%5D%7D%5D"
]
for query in queries:
engine.test("Upload invalid VNFD ", "PUT",
"/vnfpkgm/v1/vnf_packages/{}/package_content?{}".format(self.vnfd_id, query),
headers_zip_yaml, "@b" + vnfd_filename_path, 422, r_header_yaml, "yaml")
# test bug 605
engine.test("Upload invalid VNFD ", "PUT", "/vnfpkgm/v1/vnf_packages/{}/package_content".format(self.vnfd_id),
headers_yaml, self.vnfd_prova, 422, r_header_yaml, "yaml")
# get vnfd descriptor
engine.test("Get VNFD descriptor", "GET", "/vnfpkgm/v1/vnf_packages/{}".format(self.vnfd_id),
headers_yaml, None, 200, r_header_yaml, "yaml")
# get vnfd file descriptor
engine.test("Get VNFD file descriptor", "GET", "/vnfpkgm/v1/vnf_packages/{}/vnfd".format(self.vnfd_id),
headers_text, None, 200, r_header_text, "text", temp_dir+"vnfd-yaml")
# TODO compare files: diff vnfd-yaml hackfest_3charmed_vnfd/hackfest_3charmed_vnfd.yaml
# get vnfd zip file package
engine.test("Get VNFD zip package", "GET",
"/vnfpkgm/v1/vnf_packages/{}/package_content".format(self.vnfd_id), headers_zip, None, 200,
r_header_zip, "zip", temp_dir+"vnfd-zip")
# TODO compare files: diff vnfd-zip hackfest_3charmed_vnfd.tar.gz
# get vnfd artifact
engine.test("Get VNFD artifact package", "GET",
"/vnfpkgm/v1/vnf_packages/{}/artifacts/icons/osm.png".format(self.vnfd_id), headers_zip, None, 200,
r_header_octect, "octet-string", temp_dir+"vnfd-icon")
# TODO compare files: diff vnfd-icon hackfest_3charmed_vnfd/icons/osm.png
# nsd CREATE AND UPLOAD in one step:
engine.test("Onboard NSD in one step", "POST", "/nsd/v1/ns_descriptors_content", headers_zip_yaml,
"@b" + nsd_filename_path, 201, r_headers_yaml_location_nsd, "yaml")
self.nsd_id = engine.last_id
queries = ["vld.0.vnfd-connection-point-ref.0.vnfd-id-ref=hf"]
for query in queries:
engine.test("Upload invalid NSD ", "PUT",
"/nsd/v1/ns_descriptors/{}/nsd_content?{}".format(self.nsd_id, query),
headers_zip_yaml, "@b" + nsd_filename_path, 422, r_header_yaml, "yaml")
# get nsd descriptor
engine.test("Get NSD descriptor", "GET", "/nsd/v1/ns_descriptors/{}".format(self.nsd_id), headers_yaml,
None, 200, r_header_yaml, "yaml")
# get nsd file descriptor
engine.test("Get NSD file descriptor", "GET", "/nsd/v1/ns_descriptors/{}/nsd".format(self.nsd_id), headers_text,
None, 200, r_header_text, "text", temp_dir+"nsd-yaml")
# TODO compare files: diff nsd-yaml hackfest_3charmed_nsd/hackfest_3charmed_nsd.yaml
# get nsd zip file package
engine.test("Get NSD zip package", "GET", "/nsd/v1/ns_descriptors/{}/nsd_content".format(self.nsd_id),
headers_zip, None, 200, r_header_zip, "zip", temp_dir+"nsd-zip")
# TODO compare files: diff nsd-zip hackfest_3charmed_nsd.tar.gz
# get nsd artifact
engine.test("Get NSD artifact package", "GET",
"/nsd/v1/ns_descriptors/{}/artifacts/icons/osm.png".format(self.nsd_id), headers_zip, None, 200,
r_header_octect, "octet-string", temp_dir+"nsd-icon")
# TODO compare files: diff nsd-icon hackfest_3charmed_nsd/icons/osm.png
# vnfd DELETE
test_rest.test("Delete VNFD conflict", "DELETE", "/vnfpkgm/v1/vnf_packages/{}".format(self.vnfd_id),
headers_yaml, None, 409, None, None)
test_rest.test("Delete VNFD force", "DELETE", "/vnfpkgm/v1/vnf_packages/{}?FORCE=TRUE".format(self.vnfd_id),
headers_yaml, None, 204, None, 0)
# nsd DELETE
test_rest.test("Delete NSD", "DELETE", "/nsd/v1/ns_descriptors/{}".format(self.nsd_id), headers_yaml, None, 204,
None, 0)
class TestNetSliceTemplates:
description = "Upload a NST to OSM"
def __init__(self):
self.vnfd_filename = ("@./slice_shared/vnfd/slice_shared_vnfd.yaml")
self.vnfd_filename_middle = ("@./slice_shared/vnfd/slice_shared_middle_vnfd.yaml")
self.nsd_filename = ("@./slice_shared/nsd/slice_shared_nsd.yaml")
self.nsd_filename_middle = ("@./slice_shared/nsd/slice_shared_middle_nsd.yaml")
self.nst_filenames = ("@./slice_shared/slice_shared_nstd.yaml")
def run(self, engine, test_osm, manual_check, test_params=None):
# nst CREATE
engine.set_test_name("NST step ")
engine.get_autorization()
temp_dir = os.path.dirname(os.path.abspath(__file__)) + "/temp/"
if not os.path.exists(temp_dir):
os.makedirs(temp_dir)
# Onboard VNFDs
engine.test("Onboard edge VNFD", "POST", "/vnfpkgm/v1/vnf_packages_content", headers_yaml,
self.vnfd_filename, 201, r_headers_yaml_location_vnfd, "yaml")
self.vnfd_edge_id = engine.last_id
engine.test("Onboard middle VNFD", "POST", "/vnfpkgm/v1/vnf_packages_content", headers_yaml,
self.vnfd_filename_middle, 201, r_headers_yaml_location_vnfd, "yaml")
self.vnfd_middle_id = engine.last_id
# Onboard NSDs
engine.test("Onboard NSD edge", "POST", "/nsd/v1/ns_descriptors_content", headers_yaml,
self.nsd_filename, 201, r_headers_yaml_location_nsd, "yaml")
self.nsd_edge_id = engine.last_id
engine.test("Onboard NSD middle", "POST", "/nsd/v1/ns_descriptors_content", headers_yaml,
self.nsd_filename_middle, 201, r_headers_yaml_location_nsd, "yaml")
self.nsd_middle_id = engine.last_id
# Onboard NST
engine.test("Onboard NST", "POST", "/nst/v1/netslice_templates_content", headers_yaml, self.nst_filenames,
201, r_headers_yaml_location_nst, "yaml")
nst_id = engine.last_id
# nstd SHOW OSM format
engine.test("Show NSTD OSM format", "GET", "/nst/v1/netslice_templates/{}".format(nst_id), headers_json, None,
200, r_header_json, "json")
# nstd DELETE
engine.test("Delete NSTD", "DELETE", "/nst/v1/netslice_templates/{}".format(nst_id), headers_json, None,
204, None, 0)
# NSDs DELETE
test_rest.test("Delete NSD middle", "DELETE", "/nsd/v1/ns_descriptors/{}".format(self.nsd_middle_id),
headers_json, None, 204, None, 0)
test_rest.test("Delete NSD edge", "DELETE", "/nsd/v1/ns_descriptors/{}".format(self.nsd_edge_id), headers_json,
None, 204, None, 0)
# VNFDs DELETE
test_rest.test("Delete VNFD edge", "DELETE", "/vnfpkgm/v1/vnf_packages/{}".format(self.vnfd_edge_id),
headers_yaml, None, 204, None, 0)
test_rest.test("Delete VNFD middle", "DELETE", "/vnfpkgm/v1/vnf_packages/{}".format(self.vnfd_middle_id),
headers_yaml, None, 204, None, 0)
class TestNetSliceInstances:
'''
Test procedure:
1. Populate databases with VNFD, NSD, NST with the following scenario
+-----------------management-----------------+
| | |
+--+---+ +----+----+ +---+--+
| | | | | |
| edge +---data1----+ middle +---data2-----+ edge |
| | | | | |
+------+ +---------+ +------+
shared-nss
2. Create NSI-1
3. Instantiate NSI-1
4. Create NSI-2
5. Instantiate NSI-2
Manual check - Are 2 slices instantiated correctly?
NSI-1 3 nss (2 nss-edges + 1 nss-middle)
NSI-2 2 nss (2 nss-edge sharing nss-middle)
6. Terminate NSI-1
7. Delete NSI-1
Manual check - Is slice NSI-1 deleted correctly?
NSI-2 with 2 nss-edge + 1 nss-middle (The one from NSI-1)
8. Create NSI-3
9. Instantiate NSI-3
Manual check - Is slice NSI-3 instantiated correctly?
NSI-3 reuse nss-middle. NSI-3 only create 2 nss-edge
10. Delete NSI-2
11. Terminate NSI-2
12. Delete NSI-3
13. Terminate NSI-3
Manual check - All cleaned correctly?
NSI-2 and NSI-3 were terminated and deleted
14. Cleanup database
'''
description = "Upload a NST to OSM"
def __init__(self):
self.vim_id = None
self.vnfd_filename = ("@./slice_shared/vnfd/slice_shared_vnfd.yaml")
self.vnfd_filename_middle = ("@./slice_shared/vnfd/slice_shared_middle_vnfd.yaml")
self.nsd_filename = ("@./slice_shared/nsd/slice_shared_nsd.yaml")
self.nsd_filename_middle = ("@./slice_shared/nsd/slice_shared_middle_nsd.yaml")
self.nst_filenames = ("@./slice_shared/slice_shared_nstd.yaml")
def create_slice(self, engine, nsi_data, name):
ns_data_text = yaml.safe_dump(nsi_data, default_flow_style=True, width=256)
r = engine.test(name, "POST", "/nsilcm/v1/netslice_instances",
headers_yaml, ns_data_text, (201, 202),
{"Location": "nsilcm/v1/netslice_instances/", "Content-Type": "application/yaml"}, "yaml")
return r
def instantiate_slice(self, engine, nsi_data, nsi_id, name):
ns_data_text = yaml.safe_dump(nsi_data, default_flow_style=True, width=256)
engine.test(name, "POST",
"/nsilcm/v1/netslice_instances/{}/instantiate".format(nsi_id), headers_yaml, ns_data_text,
(201, 202), r_headers_yaml_location_nsilcmop, "yaml")
def terminate_slice(self, engine, nsi_id, name):
engine.test(name, "POST", "/nsilcm/v1/netslice_instances/{}/terminate".format(nsi_id),
headers_yaml, None, (201, 202), r_headers_yaml_location_nsilcmop, "yaml")
def delete_slice(self, engine, nsi_id, name):
engine.test(name, "DELETE", "/nsilcm/v1/netslice_instances/{}".format(nsi_id), headers_yaml, None,
204, None, 0)
def run(self, engine, test_osm, manual_check, test_params=None):
# nst CREATE
engine.set_test_name("NSI")
engine.get_autorization()
# Onboard VNFDs
engine.test("Onboard edge VNFD", "POST", "/vnfpkgm/v1/vnf_packages_content", headers_yaml,
self.vnfd_filename, 201, r_headers_yaml_location_vnfd, "yaml")
self.vnfd_edge_id = engine.last_id
engine.test("Onboard middle VNFD", "POST", "/vnfpkgm/v1/vnf_packages_content", headers_yaml,
self.vnfd_filename_middle, 201, r_headers_yaml_location_vnfd, "yaml")
self.vnfd_middle_id = engine.last_id
# Onboard NSDs
engine.test("Onboard NSD edge", "POST", "/nsd/v1/ns_descriptors_content", headers_yaml,
self.nsd_filename, 201, r_headers_yaml_location_nsd, "yaml")
self.nsd_edge_id = engine.last_id
engine.test("Onboard NSD middle", "POST", "/nsd/v1/ns_descriptors_content", headers_yaml,
self.nsd_filename_middle, 201, r_headers_yaml_location_nsd, "yaml")
self.nsd_middle_id = engine.last_id
# Onboard NST
engine.test("Onboard NST", "POST", "/nst/v1/netslice_templates_content", headers_yaml, self.nst_filenames,
201, r_headers_yaml_location_nst, "yaml")
nst_id = engine.last_id
self.vim_id = engine.get_create_vim(test_osm)
# CREATE NSI-1
ns_data = {'nsiName': 'Deploy-NSI-1', 'vimAccountId': self.vim_id, 'nstId': nst_id, 'nsiDescription': 'default'}
r = self.create_slice(engine, ns_data, "Create NSI-1 step 1")
if not r:
return
self.nsi_id1 = engine.last_id
# INSTANTIATE NSI-1
self.instantiate_slice(engine, ns_data, self.nsi_id1, "Instantiate NSI-1 step 2")
nsilcmop_id1 = engine.last_id
# Waiting for NSI-1
if test_osm:
engine.wait_operation_ready("nsi", nsilcmop_id1, timeout_deploy)
# CREATE NSI-2
ns_data = {'nsiName': 'Deploy-NSI-2', 'vimAccountId': self.vim_id, 'nstId': nst_id, 'nsiDescription': 'default'}
r = self.create_slice(engine, ns_data, "Create NSI-2 step 1")
if not r:
return
self.nsi_id2 = engine.last_id
# INSTANTIATE NSI-2
self.instantiate_slice(engine, ns_data, self.nsi_id2, "Instantiate NSI-2 step 2")
nsilcmop_id2 = engine.last_id
# Waiting for NSI-2
if test_osm:
engine.wait_operation_ready("nsi", nsilcmop_id2, timeout_deploy)
if manual_check:
input('NSI-1 AND NSI-2 has been deployed. Perform manual check and press enter to resume')
# TERMINATE NSI-1
if test_osm:
self.terminate_slice(engine, self.nsi_id1, "Terminate NSI-1")
nsilcmop1_id = engine.last_id
# Wait terminate NSI-1
engine.wait_operation_ready("nsi", nsilcmop1_id, timeout_deploy)
# DELETE NSI-1
self.delete_slice(engine, self.nsi_id1, "Delete NS")
if manual_check:
input('NSI-1 has been deleted. Perform manual check and press enter to resume')
# CREATE NSI-3
ns_data = {'nsiName': 'Deploy-NSI-3', 'vimAccountId': self.vim_id, 'nstId': nst_id, 'nsiDescription': 'default'}
r = self.create_slice(engine, ns_data, "Create NSI-3 step 1")
if not r:
return
self.nsi_id3 = engine.last_id
# INSTANTIATE NSI-3
self.instantiate_slice(engine, ns_data, self.nsi_id3, "Instantiate NSI-3 step 2")
nsilcmop_id3 = engine.last_id
# Wait Instantiate NSI-3
if test_osm:
engine.wait_operation_ready("nsi", nsilcmop_id3, timeout_deploy)
if manual_check:
input('NSI-3 has been deployed. Perform manual check and press enter to resume')
# TERMINATE NSI-2
if test_osm:
self.terminate_slice(engine, self.nsi_id2, "Terminate NSI-2")
nsilcmop2_id = engine.last_id
# Wait terminate NSI-2
engine.wait_operation_ready("nsi", nsilcmop2_id, timeout_deploy)
# DELETE NSI-2
self.delete_slice(engine, self.nsi_id2, "DELETE NSI-2")
# TERMINATE NSI-3
if test_osm:
self. terminate_slice(engine, self.nsi_id3, "Terminate NSI-3")
nsilcmop3_id = engine.last_id
# Wait terminate NSI-3
engine.wait_operation_ready("nsi", nsilcmop3_id, timeout_deploy)
# DELETE NSI-3
self.delete_slice(engine, self.nsi_id3, "DELETE NSI-3")
if manual_check:
input('NSI-2 and NSI-3 has been deleted. Perform manual check and press enter to resume')
# nstd DELETE
engine.test("Delete NSTD", "DELETE", "/nst/v1/netslice_templates/{}".format(nst_id), headers_json, None,
204, None, 0)
# NSDs DELETE
test_rest.test("Delete NSD middle", "DELETE", "/nsd/v1/ns_descriptors/{}".format(self.nsd_middle_id),
headers_json, None, 204, None, 0)
test_rest.test("Delete NSD edge", "DELETE", "/nsd/v1/ns_descriptors/{}".format(self.nsd_edge_id), headers_json,
None, 204, None, 0)
# VNFDs DELETE
test_rest.test("Delete VNFD edge", "DELETE", "/vnfpkgm/v1/vnf_packages/{}".format(self.vnfd_edge_id),
headers_yaml, None, 204, None, 0)
test_rest.test("Delete VNFD middle", "DELETE", "/vnfpkgm/v1/vnf_packages/{}".format(self.vnfd_middle_id),
headers_yaml, None, 204, None, 0)
class TestAuthentication:
description = "Test Authentication"
@staticmethod
def run(engine, test_osm, manual_check, test_params=None):
engine.set_test_name("Authentication")
# backend = test_params.get("backend") if test_params else None # UNUSED
admin_project_id = test_project_id = None
project_admin_role_id = project_user_role_id = None
test_user_id = empty_user_id = None
default_role_id = empty_role_id = token_role_id = None
engine.get_autorization()
# GET
engine.test("Get tokens", "GET", "/admin/v1/tokens", headers_json, {},
(200), {"Content-Type": "application/json"}, "json")
engine.test("Get projects", "GET", "/admin/v1/projects", headers_json, {},
(200), {"Content-Type": "application/json"}, "json")
engine.test("Get users", "GET", "/admin/v1/users", headers_json, {},
(200), {"Content-Type": "application/json"}, "json")
engine.test("Get roles", "GET", "/admin/v1/roles", headers_json, {},
(200), {"Content-Type": "application/json"}, "json")
res = engine.test("Get admin project", "GET", "/admin/v1/projects?name=admin", headers_json, {},
(200), {"Content-Type": "application/json"}, "json")
admin_project_id = res.json()[0]["_id"] if res else None
res = engine.test("Get project admin role", "GET", "/admin/v1/roles?name=project_admin", headers_json, {},
(200), {"Content-Type": "application/json"}, "json")
project_admin_role_id = res.json()[0]["_id"] if res else None
res = engine.test("Get project user role", "GET", "/admin/v1/roles?name=project_user", headers_json, {},
(200), {"Content-Type": "application/json"}, "json")
project_user_role_id = res.json()[0]["_id"] if res else None
# POST
res = engine.test("Create test project", "POST", "/admin/v1/projects", headers_json, {"name": "test"},
(201), {"Location": "/admin/v1/projects/", "Content-Type": "application/json"}, "json")
test_project_id = engine.last_id if res else None
res = engine.test("Create role without permissions", "POST", "/admin/v1/roles", headers_json, {"name": "empty"},
(201), {"Content-Type": "application/json"}, "json")
empty_role_id = engine.last_id if res else None
res = engine.test("Create role with default permissions", "POST", "/admin/v1/roles", headers_json,
{"name": "default", "permissions": {"default": True}},
(201), {"Location": "/admin/v1/roles/", "Content-Type": "application/json"}, "json")
default_role_id = engine.last_id if res else None
res = engine.test("Create role with token permissions", "POST", "/admin/v1/roles", headers_json,
{"name": "tokens", "permissions": {"tokens": True}}, # is default required ?
(201), {"Location": "/admin/v1/roles/", "Content-Type": "application/json"}, "json")
token_role_id = engine.last_id if res else None
pr = "project-role mappings"
res = engine.test("Create user without "+pr, "POST", "/admin/v1/users", headers_json,
{"username": "empty", "password": "empty"},
201, {"Content-Type": "application/json"}, "json")
empty_user_id = engine.last_id if res else None
if admin_project_id and test_project_id and project_admin_role_id and project_user_role_id:
data = {"username": "test", "password": "test"}
data["project_role_mappings"] = [
{"project": test_project_id, "role": project_admin_role_id},
{"project": admin_project_id, "role": project_user_role_id}
]
res = engine.test("Create user with "+pr, "POST", "/admin/v1/users", headers_json, data,
(201), {"Content-Type": "application/json"}, "json")
test_user_id = engine.last_id if res else None
# PUT
if test_user_id:
engine.test("Modify test user's password", "PUT", "/admin/v1/users/"+test_user_id, headers_json,
{"password": "password"},
(204), {}, 0)
if empty_user_id and admin_project_id and test_project_id and project_admin_role_id and project_user_role_id:
data = {"project_role_mappings": [
{"project": test_project_id, "role": project_admin_role_id},
{"project": admin_project_id, "role": project_user_role_id}
]}
engine.test("Modify empty user's "+pr, "PUT", "/admin/v1/users/"+empty_user_id,
headers_json,
data,
(204), {}, 0)
# DELETE
if empty_user_id:
engine.test("Delete empty user", "DELETE", "/admin/v1/users/"+empty_user_id, headers_json, {},
(204), {}, 0)
if test_user_id:
engine.test("Delete test user", "DELETE", "/admin/v1/users/"+test_user_id, headers_json, {},
(204), {}, 0)
if empty_role_id:
engine.test("Delete empty role", "DELETE", "/admin/v1/roles/"+empty_role_id, headers_json, {},
(204), {}, 0)
if default_role_id:
engine.test("Delete default role", "DELETE", "/admin/v1/roles/"+default_role_id, headers_json, {},
(204), {}, 0)
if token_role_id:
engine.test("Delete token role", "DELETE", "/admin/v1/roles/"+token_role_id, headers_json, {},
(204), {}, 0)
if test_project_id:
engine.test("Delete test project", "DELETE", "/admin/v1/projects/"+test_project_id, headers_json, {},
(204), {}, 0)
# END Tests
engine.remove_authorization() # To finish
class TestNbiQuotas():
description = "Test NBI Quotas"
@staticmethod
def run(engine, test_osm, manual_check, test_params=None):
engine.set_test_name("NBI-Quotas_")
# backend = test_params.get("backend") if test_params else None # UNUSED
test_username = "test-nbi-quotas"
test_password = "test-nbi-quotas"
test_project = "test-nbi-quotas"
test_vim = "test-nbi-quotas"
test_wim = "test-nbi-quotas"
test_sdn = "test-nbi-quotas"
test_user_id = None
test_project_id = None
test_vim_ids = []
test_wim_ids = []
test_sdn_ids = []
test_vnfd_ids = []
test_nsd_ids = []
test_nst_ids = []
test_pdu_ids = []
test_nsr_ids = []
test_nsi_ids = []
# Save admin access data
admin_username = engine.user
admin_password = engine.password
admin_project = engine.project
# Get admin access
engine.get_autorization()
admin_token = engine.last_id
# Check that test project,user do not exist
res1 = engine.test("Check that test project doesn't exist", "GET", "/admin/v1/projects/"+test_project,
headers_json, {}, (404), {}, True)
res2 = engine.test("Check that test user doesn't exist", "GET", "/admin/v1/users/"+test_username,
headers_json, {}, (404), {}, True)
if None in [res1, res2]:
engine.remove_authorization()
logger.error("Test project and/or user already exist")
return
# Create test project&user
res = engine.test("Create test project", "POST", "/admin/v1/projects", headers_json,
{"name": test_username,
"quotas": {
"vnfds": 2,
"nsds": 2,
"nsts": 1,
"pdus": 1,
"nsrs": 2,
"nsis": 1,
"vim_accounts": 1,
"wim_accounts": 1,
"sdns": 1,
}
},
(201), r_header_json, "json")
test_project_id = engine.last_id if res else None
res = engine.test("Create test user", "POST", "/admin/v1/users", headers_json,
{"username": test_username, "password": test_password,
"project_role_mappings": [{"project": test_project, "role": "project_admin"}]},
(201), r_header_json, "json")
test_user_id = engine.last_id if res else None
if test_project_id and test_user_id:
# Get user access
engine.token = None
engine.user = test_username
engine.password = test_password
engine.project = test_project
engine.get_autorization()
user_token = engine.last_id
# Create test VIM
res = engine.test("Create test VIM", "POST", "/admin/v1/vim_accounts", headers_json,
{"name": test_vim,
"vim_type": "openvim",
"vim_user": test_username,
"vim_password": test_password,
"vim_tenant_name": test_project,
"vim_url": "https://0.0.0.0:0/v0.0",
},
(202), r_header_json, "json")
test_vim_ids += [engine.last_id if res else None]
res = engine.test("Try to create second test VIM", "POST", "/admin/v1/vim_accounts", headers_json,
{"name": test_vim + "_2",
"vim_type": "openvim",
"vim_user": test_username,
"vim_password": test_password,
"vim_tenant_name": test_project,
"vim_url": "https://0.0.0.0:0/v0.0",
},
(422), r_header_json, "json")
test_vim_ids += [engine.last_id if res is None else None]
res = engine.test("Try to create second test VIM with FORCE",
"POST", "/admin/v1/vim_accounts?FORCE", headers_json,
{"name": test_vim + "_3",
"vim_type": "openvim",
"vim_user": test_username,
"vim_password": test_password,
"vim_tenant_name": test_project,
"vim_url": "https://0.0.0.0:0/v0.0",
},
(202), r_header_json, "json")
test_vim_ids += [engine.last_id if res else None]
if test_vim_ids[0]:
# Download descriptor files (if required)
test_dir = "/tmp/"+test_username+"/"
test_url = "https://osm-download.etsi.org/ftp/osm-6.0-six/7th-hackfest/packages/"
vnfd_filenames = ["slice_hackfest_vnfd.tar.gz", "slice_hackfest_middle_vnfd.tar.gz"]
nsd_filenames = ["slice_hackfest_nsd.tar.gz", "slice_hackfest_middle_nsd.tar.gz"]
nst_filenames = ["slice_hackfest_nstd.yaml"]
pdu_filenames = ["PDU_router.yaml"]
desc_filenames = vnfd_filenames + nsd_filenames + nst_filenames + pdu_filenames
if not os.path.exists(test_dir):
os.makedirs(test_dir)
for filename in desc_filenames:
if not os.path.exists(test_dir+filename):
res = requests.get(test_url+filename)
if res.status_code < 300:
with open(test_dir+filename, "wb") as file:
file.write(res.content)
if all([os.path.exists(test_dir+p) for p in desc_filenames]):
# Test VNFD Quotas
res = engine.test("Create test VNFD #1", "POST", "/vnfpkgm/v1/vnf_packages_content",
headers_zip_json, "@b"+test_dir+vnfd_filenames[0],
(201), r_header_json, "json")
test_vnfd_ids += [engine.last_id if res else None]
res = engine.test("Create test VNFD #2", "POST", "/vnfpkgm/v1/vnf_packages_content",
headers_zip_json, "@b"+test_dir+vnfd_filenames[1],
(201), r_header_json, "json")
test_vnfd_ids += [engine.last_id if res else None]
res = engine.test("Try to create extra test VNFD", "POST",
"/vnfpkgm/v1/vnf_packages_content",
headers_zip_json, "@b"+test_dir+vnfd_filenames[0],
(422), r_header_json, "json")
test_vnfd_ids += [engine.last_id if res is None else None]
res = engine.test("Try to create extra test VNFD with FORCE",
"POST", "/vnfpkgm/v1/vnf_packages_content?FORCE",
headers_zip_json, "@b"+test_dir+vnfd_filenames[0],
(201), r_header_json, "json")
test_vnfd_ids += [engine.last_id if res else None]
# Remove extra VNFDs to prevent further errors
for i in [2, 3]:
if test_vnfd_ids[i]:
res = engine.test("Delete test VNFD #" + str(i), "DELETE",
"/vnfpkgm/v1/vnf_packages_content/"+test_vnfd_ids[i]+"?FORCE",
headers_json, {}, (204), {}, 0)
if res:
test_vnfd_ids[i] = None
if test_vnfd_ids[0] and test_vnfd_ids[1]:
# Test NSD Quotas
res = engine.test("Create test NSD #1", "POST", "/nsd/v1/ns_descriptors_content",
headers_zip_json, "@b"+test_dir+nsd_filenames[0],
(201), r_header_json, "json")
test_nsd_ids += [engine.last_id if res else None]
res = engine.test("Create test NSD #2", "POST", "/nsd/v1/ns_descriptors_content",
headers_zip_json, "@b"+test_dir+nsd_filenames[1],
(201), r_header_json, "json")
test_nsd_ids += [engine.last_id if res else None]
res = engine.test("Try to create extra test NSD", "POST", "/nsd/v1/ns_descriptors_content",
headers_zip_json, "@b"+test_dir+nsd_filenames[0],
(422), r_header_json, "json")
test_nsd_ids += [engine.last_id if res is None else None]
res = engine.test("Try to create extra test NSD with FORCE",
"POST", "/nsd/v1/ns_descriptors_content?FORCE",
headers_zip_json, "@b"+test_dir+nsd_filenames[0],
(201), r_header_json, "json")
test_nsd_ids += [engine.last_id if res else None]
# Remove extra NSDs to prevent further errors
for i in [2, 3]:
if test_nsd_ids[i]:
res = engine.test("Delete test NSD #" + str(i), "DELETE",
"/nsd/v1/ns_descriptors_content/"+test_nsd_ids[i]+"?FORCE",
headers_json, {}, (204), {}, 0)
if res:
test_nsd_ids[i] = None
if test_nsd_ids[0] and test_nsd_ids[1]:
# Test NSR Quotas
res = engine.test("Create test NSR #1", "POST", "/nslcm/v1/ns_instances_content",
headers_json,
{"nsName": test_username+"_1",
"nsdId": test_nsd_ids[0],
"vimAccountId": test_vim_ids[0],
},
(201), r_header_json, "json")
test_nsr_ids += [engine.last_id if res else None]
res = engine.test("Create test NSR #2", "POST", "/nslcm/v1/ns_instances_content",
headers_json,
{"nsName": test_username+"_2",
"nsdId": test_nsd_ids[1],
"vimAccountId": test_vim_ids[0],
},
(201), r_header_json, "json")
test_nsr_ids += [engine.last_id if res else None]
res = engine.test("Try to create extra test NSR", "POST", "/nslcm/v1/ns_instances_content",
headers_json,
{"nsName": test_username+"_3",
"nsdId": test_nsd_ids[0],
"vimAccountId": test_vim_ids[0],
},
(422), r_header_json, "json")
test_nsr_ids += [engine.last_id if res is None else None]
res = engine.test("Try to create test NSR with FORCE", "POST",
"/nslcm/v1/ns_instances_content?FORCE", headers_json,
{"nsName": test_username+"_4",
"nsdId": test_nsd_ids[0],
"vimAccountId": test_vim_ids[0],
},
(201), r_header_json, "json")
test_nsr_ids += [engine.last_id if res else None]
# Test NST Quotas
res = engine.test("Create test NST", "POST", "/nst/v1/netslice_templates_content",
headers_txt_json, "@b"+test_dir+nst_filenames[0],
(201), r_header_json, "json")
test_nst_ids += [engine.last_id if res else None]
res = engine.test("Try to create extra test NST", "POST",
"/nst/v1/netslice_templates_content",
headers_txt_json, "@b"+test_dir+nst_filenames[0],
(422), r_header_json, "json")
test_nst_ids += [engine.last_id if res is None else None]
res = engine.test("Try to create extra test NST with FORCE", "POST",
"/nst/v1/netslice_templates_content?FORCE",
headers_txt_json, "@b"+test_dir+nst_filenames[0],
(201), r_header_json, "json")
test_nst_ids += [engine.last_id if res else None]
if test_nst_ids[0]:
# Remove NSR Quota
engine.set_header({"Authorization": "Bearer {}".format(admin_token)})
res = engine.test("Remove NSR Quota", "PUT", "/admin/v1/projects/"+test_project_id,
headers_json,
{"quotas": {"nsrs": None}},
(204), {}, 0)
engine.set_header({"Authorization": "Bearer {}".format(user_token)})
if res:
# Test NSI Quotas
res = engine.test("Create test NSI", "POST",
"/nsilcm/v1/netslice_instances_content", headers_json,
{"nsiName": test_username,
"nstId": test_nst_ids[0],
"vimAccountId": test_vim_ids[0],
},
(201), r_header_json, "json")
test_nsi_ids += [engine.last_id if res else None]
res = engine.test("Try to create extra test NSI", "POST",
"/nsilcm/v1/netslice_instances_content", headers_json,
{"nsiName": test_username,
"nstId": test_nst_ids[0],
"vimAccountId": test_vim_ids[0],
},
(400), r_header_json, "json")
test_nsi_ids += [engine.last_id if res is None else None]
res = engine.test("Try to create extra test NSI with FORCE", "POST",
"/nsilcm/v1/netslice_instances_content?FORCE", headers_json,
{"nsiName": test_username,
"nstId": test_nst_ids[0],
"vimAccountId": test_vim_ids[0],
},
(201), r_header_json, "json")
test_nsi_ids += [engine.last_id if res else None]
# Test PDU Quotas
with open(test_dir+pdu_filenames[0], "rb") as file:
pdu_text = re.sub(r"ip-address: *\[[^\]]*\]", "ip-address: '0.0.0.0'",
file.read().decode("utf-8"))
with open(test_dir+pdu_filenames[0], "wb") as file:
file.write(pdu_text.encode("utf-8"))
res = engine.test("Create test PDU", "POST", "/pdu/v1/pdu_descriptors",
headers_yaml, "@b"+test_dir+pdu_filenames[0],
(201), r_header_yaml, "yaml")
test_pdu_ids += [engine.last_id if res else None]
res = engine.test("Try to create extra test PDU", "POST", "/pdu/v1/pdu_descriptors",
headers_yaml, "@b"+test_dir+pdu_filenames[0],
(422), r_header_yaml, "yaml")
test_pdu_ids += [engine.last_id if res is None else None]
res = engine.test("Try to create extra test PDU with FORCE", "POST",
"/pdu/v1/pdu_descriptors?FORCE",
headers_yaml, "@b"+test_dir+pdu_filenames[0],
(201), r_header_yaml, "yaml")
test_pdu_ids += [engine.last_id if res else None]
# Cleanup
for i, id in enumerate(test_nsi_ids):
if id:
engine.test("Delete test NSI #"+str(i), "DELETE",
"/nsilcm/v1/netslice_instances_content/"+id+"?FORCE",
headers_json, {}, (204), {}, 0)
for i, id in enumerate(test_nsr_ids):
if id:
engine.test("Delete test NSR #"+str(i), "DELETE",
"/nslcm/v1/ns_instances_content/"+id+"?FORCE",
headers_json, {}, (204), {}, 0)
for i, id in enumerate(test_nst_ids):
if id:
engine.test("Delete test NST #"+str(i), "DELETE",
"/nst/v1/netslice_templates_content/"+id+"?FORCE",
headers_json, {}, (204), {}, 0)
for i, id in enumerate(test_nsd_ids):
if id:
engine.test("Delete test NSD #"+str(i), "DELETE",
"/nsd/v1/ns_descriptors_content/"+id+"?FORCE",
headers_json, {}, (204), {}, 0)
for i, id in enumerate(test_vnfd_ids):
if id:
engine.test("Delete test VNFD #"+str(i), "DELETE",
"/vnfpkgm/v1/vnf_packages_content/"+id+"?FORCE",
headers_json, {}, (204), {}, 0)
for i, id in enumerate(test_pdu_ids):
if id:
engine.test("Delete test PDU #"+str(i), "DELETE",
"/pdu/v1/pdu_descriptors/"+id+"?FORCE",
headers_json, {}, (204), {}, 0)
# END Test NBI Quotas
# Test WIM Quotas
res = engine.test("Create test WIM", "POST", "/admin/v1/wim_accounts", headers_json,
{"name": test_wim,
"wim_type": "onos",
"wim_url": "https://0.0.0.0:0/v0.0",
},
(202), r_header_json, "json")
test_wim_ids += [engine.last_id if res else None]
res = engine.test("Try to create second test WIM", "POST", "/admin/v1/wim_accounts", headers_json,
{"name": test_wim + "_2",
"wim_type": "onos",
"wim_url": "https://0.0.0.0:0/v0.0",
},
(422), r_header_json, "json")
test_wim_ids += [engine.last_id if res is None else None]
res = engine.test("Try to create second test WIM with FORCE", "POST", "/admin/v1/wim_accounts?FORCE",
headers_json,
{"name": test_wim + "_3",
"wim_type": "onos",
"wim_url": "https://0.0.0.0:0/v0.0",
},
(202), r_header_json, "json")
test_wim_ids += [engine.last_id if res else None]
# Test SDN Quotas
res = engine.test("Create test SDN", "POST", "/admin/v1/sdns", headers_json,
{"name": test_sdn,
"type": "onos",
"ip": "0.0.0.0",
"port": 9999,
"dpid": "00:00:00:00:00:00:00:00",
},
(202), r_header_json, "json")
test_sdn_ids += [engine.last_id if res else None]
res = engine.test("Try to create second test SDN", "POST", "/admin/v1/sdns", headers_json,
{"name": test_sdn + "_2",
"type": "onos",
"ip": "0.0.0.0",
"port": 9999,
"dpid": "00:00:00:00:00:00:00:00",
},
(422), r_header_json, "json")
test_sdn_ids += [engine.last_id if res is None else None]
res = engine.test("Try to create second test SDN with FORCE", "POST", "/admin/v1/sdns?FORCE", headers_json,
{"name": test_sdn + "_3",
"type": "onos",
"ip": "0.0.0.0",
"port": 9999,
"dpid": "00:00:00:00:00:00:00:00",
},
(202), r_header_json, "json")
test_sdn_ids += [engine.last_id if res else None]
# Cleanup
for i, id in enumerate(test_vim_ids):
if id:
engine.test("Delete test VIM #"+str(i), "DELETE", "/admin/v1/vim_accounts/"+id+"?FORCE",
headers_json, {}, (202), {}, 0)
for i, id in enumerate(test_wim_ids):
if id:
engine.test("Delete test WIM #"+str(i), "DELETE", "/admin/v1/wim_accounts/"+id+"?FORCE",
headers_json, {}, (202), {}, 0)
for i, id in enumerate(test_sdn_ids):
if id:
engine.test("Delete test SDN #"+str(i), "DELETE", "/admin/v1/sdns/"+id+"?FORCE",
headers_json, {}, (202), {}, 0)
# Release user access
engine.remove_authorization()
# Cleanup
engine.user = admin_username
engine.password = admin_password
engine.project = admin_project
engine.get_autorization()
if test_user_id:
engine.test("Delete test user", "DELETE", "/admin/v1/users/"+test_user_id+"?FORCE",
headers_json, {}, (204), {}, 0)
if test_project_id:
engine.test("Delete test project", "DELETE", "/admin/v1/projects/"+test_project_id+"?FORCE",
headers_json, {}, (204), {}, 0)
engine.remove_authorization()
# END class TestNbiQuotas
if __name__ == "__main__":
global logger
test = ""
# Disable warnings from self-signed certificates.
requests.packages.urllib3.disable_warnings()
try:
logging.basicConfig(format="%(levelname)s %(message)s", level=logging.ERROR)
logger = logging.getLogger('NBI')
# load parameters and configuration
opts, args = getopt.getopt(sys.argv[1:], "hvu:p:",
["url=", "user=", "password=", "help", "version", "verbose", "no-verbose",
"project=", "insecure", "timeout", "timeout-deploy", "timeout-configure",
"test=", "list", "test-osm", "manual-check", "params=", 'fail-fast'])
url = "https://localhost:9999/osm"
user = password = project = "admin"
test_osm = False
manual_check = False
verbose = 0
verify = True
fail_fast = False
test_classes = {
"NonAuthorized": TestNonAuthorized,
"FakeVIM": TestFakeVim,
"Users-Projects": TestUsersProjects,
"Projects-Descriptors": TestProjectsDescriptors,
"VIM-SDN": TestVIMSDN,
"Deploy-Custom": TestDeploy,
"Deploy-Hackfest-Cirros": TestDeployHackfestCirros,
"Deploy-Hackfest-Cirros-Scaling": TestDeployHackfestCirrosScaling,
"Deploy-Hackfest-3Charmed": TestDeployHackfest3Charmed,
"Deploy-Hackfest-3Charmed2": TestDeployHackfest3Charmed2,
"Deploy-Hackfest-3Charmed3": TestDeployHackfest3Charmed3,
"Deploy-Hackfest-4": TestDeployHackfest4,
"Deploy-CirrosMacIp": TestDeployIpMac,
"Descriptors": TestDescriptors,
"Deploy-Hackfest1": TestDeployHackfest1,
# "Deploy-MultiVIM": TestDeployMultiVIM,
"Deploy-SingleVdu": TestDeploySingleVdu,
"Deploy-Hnfd": TestDeployHnfd,
"Upload-Slice-Template": TestNetSliceTemplates,
"Deploy-Slice-Instance": TestNetSliceInstances,
"Deploy-SimpleCharm": TestDeploySimpleCharm,
"Deploy-SimpleCharm2": TestDeploySimpleCharm2,
"Authentication": TestAuthentication,
"NBI-Quotas": TestNbiQuotas,
}
test_to_do = []
test_params = {}
for o, a in opts:
# print("parameter:", o, a)
if o == "--version":
print("test version " + __version__ + ' ' + version_date)
exit()
elif o == "--list":
for test, test_class in sorted(test_classes.items()):
print("{:32} {}".format(test + ":", test_class.description))
exit()
elif o in ("-v", "--verbose"):
verbose += 1
elif o == "no-verbose":
verbose = -1
elif o in ("-h", "--help"):
usage()
sys.exit()
elif o == "--test-osm":
test_osm = True
elif o == "--manual-check":
manual_check = True
elif o == "--url":
url = a
elif o in ("-u", "--user"):
user = a
elif o in ("-p", "--password"):
password = a
elif o == "--project":
project = a
elif o == "--fail-fast":
fail_fast = True
elif o == "--test":
# print("asdfadf", o, a, a.split(","))
for _test in a.split(","):
if _test not in test_classes:
print("Invalid test name '{}'. Use option '--list' to show available tests".format(_test),
file=sys.stderr)
exit(1)
test_to_do.append(_test)
elif o == "--params":
param_key, _, param_value = a.partition("=")
text_index = len(test_to_do)
if text_index not in test_params:
test_params[text_index] = {}
test_params[text_index][param_key] = param_value
elif o == "--insecure":
verify = False
elif o == "--timeout":
timeout = int(a)
elif o == "--timeout-deploy":
timeout_deploy = int(a)
elif o == "--timeout-configure":
timeout_configure = int(a)
else:
assert False, "Unhandled option"
if verbose == 0:
logger.setLevel(logging.WARNING)
elif verbose > 1:
logger.setLevel(logging.DEBUG)
else:
logger.setLevel(logging.ERROR)
test_rest = TestRest(url, user=user, password=password, project=project)
# print("tests to do:", test_to_do)
if test_to_do:
text_index = 0
for test in test_to_do:
if fail_fast and test_rest.failed_tests:
break
text_index += 1
test_class = test_classes[test]
test_class().run(test_rest, test_osm, manual_check, test_params.get(text_index))
else:
for test, test_class in sorted(test_classes.items()):
if fail_fast and test_rest.failed_tests:
break
test_class().run(test_rest, test_osm, manual_check, test_params.get(0))
test_rest.print_results()
exit(1 if test_rest.failed_tests else 0)
except TestException as e:
logger.error(test + "Test {} Exception: {}".format(test, str(e)))
exit(1)
except getopt.GetoptError as e:
logger.error(e)
print(e, file=sys.stderr)
exit(1)
except Exception as e:
logger.critical(test + " Exception: " + str(e), exc_info=True)