X-Git-Url: https://osm.etsi.org/gitweb/?p=osm%2FNBI.git;a=blobdiff_plain;f=osm_nbi%2Ftests%2Ftest.py;h=a08409584bd27d67d60fc9d2d39bf9a2e0af85a1;hp=8adf20e5545408014bcfb6a8b4d9aac223b0c0f6;hb=e36ab859d275c9c1b863bbb6564a1da56cabe2dd;hpb=337ec516ed05bece8ef50694d277e55f34ab278e diff --git a/osm_nbi/tests/test.py b/osm_nbi/tests/test.py index 8adf20e..a084095 100755 --- a/osm_nbi/tests/test.py +++ b/osm_nbi/tests/test.py @@ -1,6 +1,19 @@ #! /usr/bin/python3 # -*- coding: utf-8 -*- +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. + import getopt import sys import requests @@ -116,16 +129,21 @@ class TestRest: # contains ID of tests obtained from Location response header. "" key contains last obtained id self.test_ids = {} self.old_test_description = "" + self.test_name = None + self.step = 0 def set_header(self, header): self.s.headers.update(header) + def set_tet_name(self, test_name): + self.test_name = test_name + def unset_header(self, key): if key in self.s.headers: del self.s.headers[key] def test(self, name, description, method, url, headers, payload, expected_codes, expected_headers, - expected_payload): + expected_payload, store_file=None): """ Performs an http request and check http code response. Exit if different than allowed. It get the returned id that can be used by following test in the URL with {name} where name is the name of the test @@ -137,7 +155,8 @@ class TestRest: :param payload: Can be a dict, transformed to json, a text or a file if starts with '@' :param expected_codes: expected response codes, can be int, int tuple or int range :param expected_headers: expected response headers, dict with key values - :param expected_payload: expected payload, 0 if empty, 'yaml', 'json', 'text', 'zip' + :param expected_payload: expected payload, 0 if empty, 'yaml', 'json', 'text', 'zip', 'octet-stream' + :param store_file: filename to store content :return: requests response """ r = None @@ -178,10 +197,13 @@ class TestRest: self.old_test_description = test_description logger.warning(test_description) stream = False - # if expected_payload == "zip": - # stream = True + if expected_payload in ("zip", "octet-string") or store_file: + stream = True r = getattr(self.s, method.lower())(url, data=payload, headers=headers, verify=self.verify, stream=stream) - logger.debug("RX {}: {}".format(r.status_code, r.text)) + if expected_payload in ("zip", "octet-string") or store_file: + logger.debug("RX {}".format(r.status_code)) + else: + logger.debug("RX {}: {}".format(r.status_code, r.text)) # check response if expected_codes: @@ -212,7 +234,7 @@ class TestRest: yaml.safe_load(r.text) except Exception as e: raise TestException("Expected yaml response payload, but got Exception {}".format(e)) - elif expected_payload == "zip": + elif expected_payload in ("zip", "octet-string"): if len(r.content) == 0: raise TestException("Expected some response payload, but got empty") # try: @@ -226,11 +248,17 @@ class TestRest: if len(r.content) == 0: raise TestException("Expected some response payload, but got empty") # r.text + if store_file: + with open(store_file, 'wb') as fd: + for chunk in r.iter_content(chunk_size=128): + fd.write(chunk) + location = r.headers.get("Location") if location: _id = location[location.rfind("/") + 1:] if _id: self.test_ids[name] = str(_id) + self.test_ids["last_id"] = str(_id) # last id self.test_ids[""] = str(_id) # last id return r except TestException as e: @@ -574,9 +602,11 @@ class TestDeploy: self.ns_test = None self.ns_id = None self.vnfds_test = [] + self.vnfds_id = [] self.descriptor_url = "https://osm-download.etsi.org/ftp/osm-3.0-three/2nd-hackfest/packages/" self.vnfd_filenames = ("cirros_vnf.tar.gz",) self.nsd_filename = "cirros_2vnf_ns.tar.gz" + self.descriptor_edit = None self.uses_configuration = False self.uss = {} self.passwds = {} @@ -585,12 +615,13 @@ class TestDeploy: self.timeout = 120 self.passed_tests = 0 self.total_tests = 0 + self.qforce = "" def create_descriptors(self, engine): temp_dir = os.path.dirname(os.path.abspath(__file__)) + "/temp/" if not os.path.exists(temp_dir): os.makedirs(temp_dir) - for vnfd_filename in self.vnfd_filenames: + for vnfd_index, vnfd_filename in enumerate(self.vnfd_filenames): if "/" in vnfd_filename: vnfd_filename_path = vnfd_filename if not os.path.exists(vnfd_filename_path): @@ -610,25 +641,38 @@ class TestDeploy: headers = headers_zip_yaml if self.step % 2 == 0: # vnfd CREATE AND UPLOAD in one step: - engine.test("DEPLOY{}".format(self.step), "Onboard VNFD in one step", "POST", - "/vnfpkgm/v1/vnf_packages_content", headers, "@b" + vnfd_filename_path, 201, - {"Location": "/vnfpkgm/v1/vnf_packages_content/", "Content-Type": "application/yaml"}, yaml) - self.vnfds_test.append("DEPLOY" + str(self.step)) + test_name = "DEPLOY{}".format(self.step) + engine.test(test_name, "Onboard VNFD in one step", "POST", + "/vnfpkgm/v1/vnf_packages_content" + self.qforce, headers, "@b" + vnfd_filename_path, 201, + {"Location": "/vnfpkgm/v1/vnf_packages_content/", "Content-Type": "application/yaml"}, + "yaml") + self.vnfds_test.append(test_name) + self.vnfds_id.append(engine.test_ids["last_id"]) self.step += 1 else: # vnfd CREATE AND UPLOAD ZIP - engine.test("DEPLOY{}".format(self.step), "Onboard VNFD step 1", "POST", "/vnfpkgm/v1/vnf_packages", + test_name = "DEPLOY{}".format(self.step) + engine.test(test_name, "Onboard VNFD step 1", "POST", "/vnfpkgm/v1/vnf_packages", headers_json, None, 201, {"Location": "/vnfpkgm/v1/vnf_packages/", "Content-Type": "application/json"}, "json") - self.vnfds_test.append("DEPLOY" + str(self.step)) + self.vnfds_test.append(test_name) + self.vnfds_id.append(engine.test_ids["last_id"]) self.step += 1 # location = r.headers["Location"] # vnfd_id = location[location.rfind("/")+1:] engine.test("DEPLOY{}".format(self.step), "Onboard VNFD step 2 as ZIP", "PUT", - "/vnfpkgm/v1/vnf_packages/<>/package_content", + "/vnfpkgm/v1/vnf_packages/<>/package_content" + self.qforce, headers, "@b" + vnfd_filename_path, 204, None, 0) self.step += 2 + if self.descriptor_edit: + if "vnfd{}".format(vnfd_index) in self.descriptor_edit: + # Modify VNFD + engine.test("DEPLOY{}".format(self.step), "Edit VNFD ", "PATCH", + "/vnfpkgm/v1/vnf_packages/{}".format(self.vnfds_id[-1]), + headers_yaml, self.descriptor_edit["vnfd{}".format(vnfd_index)], 204, None, None) + self.step += 1 + if "/" in self.nsd_filename: nsd_filename_path = self.nsd_filename if not os.path.exists(nsd_filename_path): @@ -651,32 +695,40 @@ class TestDeploy: if self.step % 2 == 0: # nsd CREATE AND UPLOAD in one step: engine.test("DEPLOY{}".format(self.step), "Onboard NSD in one step", "POST", - "/nsd/v1/ns_descriptors_content", headers, "@b" + nsd_filename_path, 201, + "/nsd/v1/ns_descriptors_content" + self.qforce, headers, "@b" + nsd_filename_path, 201, {"Location": "/nsd/v1/ns_descriptors_content/", "Content-Type": "application/yaml"}, yaml) self.step += 1 + self.nsd_id = engine.test_ids["last_id"] else: # nsd CREATE AND UPLOAD ZIP engine.test("DEPLOY{}".format(self.step), "Onboard NSD step 1", "POST", "/nsd/v1/ns_descriptors", headers_json, None, 201, {"Location": "/nsd/v1/ns_descriptors/", "Content-Type": "application/json"}, "json") self.step += 1 + self.nsd_id = engine.test_ids["last_id"] # location = r.headers["Location"] # vnfd_id = location[location.rfind("/")+1:] engine.test("DEPLOY{}".format(self.step), "Onboard NSD step 2 as ZIP", "PUT", - "/nsd/v1/ns_descriptors/<>/nsd_content", + "/nsd/v1/ns_descriptors/<>/nsd_content" + self.qforce, headers, "@b" + nsd_filename_path, 204, None, 0) self.step += 2 - self.nsd_id = engine.test_ids[self.nsd_test] + + if self.descriptor_edit and "nsd" in self.descriptor_edit: + # Modify NSD + engine.test("DEPLOY{}".format(self.step), "Edit NSD ", "PATCH", + "/nsd/v1/ns_descriptors/{}".format(self.nsd_id), + headers_yaml, self.descriptor_edit["nsd"], 204, None, None) + self.step += 1 def delete_descriptors(self, engine): # delete descriptors engine.test("DEPLOY{}".format(self.step), "Delete NSSD SOL005", "DELETE", - "/nsd/v1/ns_descriptors/<{}>".format(self.nsd_test), + "/nsd/v1/ns_descriptors/{}".format(self.nsd_id), headers_yaml, None, 204, None, 0) self.step += 1 - for vnfd_test in self.vnfds_test: + for vnfd_id in self.vnfds_id: engine.test("DEPLOY{}".format(self.step), "Delete VNFD SOL005", "DELETE", - "/vnfpkgm/v1/vnf_packages/<{}>".format(vnfd_test), headers_yaml, None, 204, None, 0) + "/vnfpkgm/v1/vnf_packages/{}".format(vnfd_id), headers_yaml, None, 204, None, 0) self.step += 1 def instantiate(self, engine, ns_data): @@ -686,8 +738,7 @@ class TestDeploy: headers_yaml, ns_data_text, 201, {"Location": "nslcm/v1/ns_instances/", "Content-Type": "application/yaml"}, "yaml") self.ns_test = "DEPLOY{}".format(self.step) - self.ns_id = engine.test_ids[self.ns_test] - engine.test_ids[self.ns_test] + self.ns_id = engine.test_ids["last_id"] self.step += 1 r = engine.test("DEPLOY{}".format(self.step), "Instantiate NS step 2", "POST", "/nslcm/v1/ns_instances/<{}>/instantiate".format(self.ns_test), headers_yaml, ns_data_text, @@ -713,7 +764,7 @@ class TestDeploy: raise TestException("NS instantiate is not done after {} seconds".format(timeout_deploy)) self.step += 1 - def _wait_nslcmop_ready(self, engine, nslcmop_test, timeout_deploy): + def _wait_nslcmop_ready(self, engine, nslcmop_test, timeout_deploy, expected_fail=False): wait = timeout while wait >= 0: r = engine.test("DEPLOY{}".format(self.step), "Wait to ns lcm operation complete", "GET", @@ -721,9 +772,14 @@ class TestDeploy: 200, r_header_json, "json") nslcmop = r.json() if "COMPLETED" in nslcmop["operationState"]: + if expected_fail: + raise TestException("NS terminate has success, expecting failing: {}".format( + nslcmop["detailed-status"])) break elif "FAILED" in nslcmop["operationState"]: - raise TestException("NS terminate has failed: {}".format(nslcmop["detailed-status"])) + if not expected_fail: + raise TestException("NS terminate has failed: {}".format(nslcmop["detailed-status"])) + break wait -= 5 sleep(5) else: @@ -816,8 +872,8 @@ class TestDeploy: from pssh.utils import load_private_key from ssh2 import exceptions as ssh2Exception except ImportError as e: - logger.critical("package or/and is not installed. Please add it with 'pip3 install " - "parallel-ssh' and/or 'pip3 install urllib3': {}".format(e)) + logger.critical("Package or/and is not installed. Please add them with 'pip3 install " + "parallel-ssh urllib3': {}".format(e)) urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning) try: p_host = os.environ.get("PROXY_HOST") @@ -873,7 +929,7 @@ class TestDeploy: if manual_check: input('NS has been deployed. Perform manual check and press enter to resume') - else: + elif test_osm: self.test_ns(engine, test_osm, self.cmds, self.uss, self.pss, self.keys, self.timeout) self.aditional_operations(engine, test_osm, manual_check) self.terminate(engine) @@ -898,6 +954,82 @@ class TestDeployHackfestCirros(TestDeploy): self.pss = {'1': "cubswin:)", '2': "cubswin:)"} +class TestDeployHackfest1(TestDeploy): + description = "Load and deploy Hackfest_1_vnfd example" + + def __init__(self): + super().__init__() + self.vnfd_filenames = ("hackfest_1_vnfd.tar.gz",) + self.nsd_filename = "hackfest_1_nsd.tar.gz" + # self.cmds = {'1': ['ls -lrt', ], '2': ['ls -lrt', ]} + # self.uss = {'1': "cirros", '2': "cirros"} + # self.pss = {'1': "cubswin:)", '2': "cubswin:)"} + + +class TestDeployHackfestCirrosScaling(TestDeploy): + description = "Load and deploy Hackfest cirros_2vnf_ns example with scaling modifications" + + def __init__(self): + super().__init__() + self.vnfd_filenames = ("cirros_vnf.tar.gz",) + self.nsd_filename = "cirros_2vnf_ns.tar.gz" + + def create_descriptors(self, engine): + super().create_descriptors(engine) + # Modify VNFD to add scaling and count=2 + payload = """ + vdu: + "$id: 'cirros_vnfd-VM'": + count: 2 + scaling-group-descriptor: + - name: "scale_cirros" + max-instance-count: 2 + vdu: + - vdu-id-ref: cirros_vnfd-VM + count: 2 + """ + engine.test("DEPLOY{}".format(self.step), "Edit VNFD ", "PATCH", + "/vnfpkgm/v1/vnf_packages/{}".format(self.vnfds_id[0]), + headers_yaml, payload, 204, None, None) + self.step += 1 + + def aditional_operations(self, engine, test_osm, manual_check): + if not test_osm: + return + # 2 perform scale out twice + payload = '{scaleType: SCALE_VNF, scaleVnfData: {scaleVnfType: SCALE_OUT, scaleByStepData: ' \ + '{scaling-group-descriptor: scale_cirros, member-vnf-index: "1"}}}' + for i in range(0, 2): + engine.test("DEPLOY{}".format(self.step), "Execute scale action over NS", "POST", + "/nslcm/v1/ns_instances/<{}>/scale".format(self.ns_test), headers_yaml, payload, + 201, {"Location": "nslcm/v1/ns_lcm_op_occs/", "Content-Type": "application/yaml"}, "yaml") + nslcmop2_scale_out = "DEPLOY{}".format(self.step) + self._wait_nslcmop_ready(engine, nslcmop2_scale_out, timeout_deploy) + if manual_check: + input('NS scale out done. Check that two more vdus are there') + # TODO check automatic + + # 2 perform scale in + payload = '{scaleType: SCALE_VNF, scaleVnfData: {scaleVnfType: SCALE_IN, scaleByStepData: ' \ + '{scaling-group-descriptor: scale_cirros, member-vnf-index: "1"}}}' + for i in range(0, 2): + engine.test("DEPLOY{}".format(self.step), "Execute scale IN action over NS", "POST", + "/nslcm/v1/ns_instances/<{}>/scale".format(self.ns_test), headers_yaml, payload, + 201, {"Location": "nslcm/v1/ns_lcm_op_occs/", "Content-Type": "application/yaml"}, "yaml") + nslcmop2_scale_in = "DEPLOY{}".format(self.step) + self._wait_nslcmop_ready(engine, nslcmop2_scale_in, timeout_deploy) + if manual_check: + input('NS scale in done. Check that two less vdus are there') + # TODO check automatic + + # perform scale in that must fail as reached limit + engine.test("DEPLOY{}".format(self.step), "Execute scale IN out of limit action over NS", "POST", + "/nslcm/v1/ns_instances/<{}>/scale".format(self.ns_test), headers_yaml, payload, + 201, {"Location": "nslcm/v1/ns_lcm_op_occs/", "Content-Type": "application/yaml"}, "yaml") + nslcmop2_scale_in = "DEPLOY{}".format(self.step) + self._wait_nslcmop_ready(engine, nslcmop2_scale_in, timeout_deploy, expected_fail=True) + + class TestDeployIpMac(TestDeploy): description = "Load and deploy descriptor examples setting mac, ip address at descriptor and instantiate params" @@ -1094,7 +1226,7 @@ class TestDeployHackfest3Charmed(TestDeploy): if manual_check: input('NS service primitive has been executed. Check that file /home/ubuntu/OSMTESTNBI is present at ' 'TODO_PUT_IP') - else: + elif test_osm: cmds = {'1': [''], '2': ['ls -lrt /home/ubuntu/OSMTESTNBI', ]} uss = {'1': "ubuntu", '2': "ubuntu"} pss = {'1': "osm4u", '2': "osm4u"} @@ -1125,6 +1257,433 @@ class TestDeployHackfest3Charmed(TestDeploy): # # TODO check automatic +class TestDeploySingleVdu(TestDeployHackfest3Charmed): + description = "Generate a single VDU base on editing Hackfest3Charmed descriptors and deploy" + + def __init__(self): + super().__init__() + self.qforce = "?FORCE=True" + self.descriptor_edit = { + # Modify VNFD to remove one VDU + "vnfd0": { + "vdu": { + "$[0]": { + "interface": {"$[0]": {"external-connection-point-ref": "pdu-mgmt"}} + }, + "$[1]": None + }, + "vnf-configuration": None, + "connection-point": { + "$[0]": { + "id": "pdu-mgmt", + "name": "pdu-mgmt", + "short-name": "pdu-mgmt" + }, + "$[1]": None + }, + "mgmt-interface": {"cp": "pdu-mgmt"}, + "description": "A vnf single vdu to be used as PDU", + "id": "vdu-as-pdu", + "internal-vld": { + "$[0]": { + "id": "pdu_internal", + "name": "pdu_internal", + "internal-connection-point": {"$[1]": None}, + "short-name": "pdu_internal", + "type": "ELAN" + } + } + }, + + # Modify NSD accordingly + "nsd": { + "constituent-vnfd": { + "$[0]": {"vnfd-id-ref": "vdu-as-pdu"}, + "$[1]": None, + }, + "description": "A nsd to deploy the vnf to act as as PDU", + "id": "nsd-as-pdu", + "name": "nsd-as-pdu", + "short-name": "nsd-as-pdu", + "vld": { + "$[0]": { + "id": "mgmt_pdu", + "name": "mgmt_pdu", + "short-name": "mgmt_pdu", + "vnfd-connection-point-ref": { + "$[0]": { + "vnfd-connection-point-ref": "pdu-mgmt", + "vnfd-id-ref": "vdu-as-pdu", + }, + "$[1]": None + }, + "type": "ELAN" + }, + "$[1]": None, + } + } + } + + +class TestDeployHnfd(TestDeployHackfest3Charmed): + description = "Generate a HNFD base on editing Hackfest3Charmed descriptors and deploy" + + def __init__(self): + super().__init__() + self.pduDeploy = TestDeploySingleVdu() + self.pdu_interface_0 = {} + self.pdu_interface_1 = {} + + self.pdu_id = None + # self.vnf_to_pdu = """ + # vdu: + # "$[0]": + # pdu-type: PDU-TYPE-1 + # interface: + # "$[0]": + # name: mgmt-iface + # "$[1]": + # name: pdu-iface-internal + # id: hfn1 + # description: HFND, one PDU + One VDU + # name: hfn1 + # short-name: hfn1 + # + # """ + + self.pdu_descriptor = { + "name": "my-PDU", + "type": "PDU-TYPE-1", + "vim_accounts": "to-override", + "interfaces": [ + { + "name": "mgmt-iface", + "mgmt": True, + "type": "overlay", + "ip-address": "to override", + "mac-address": "mac_address", + "vim-network-name": "mgmt", + }, + { + "name": "pdu-iface-internal", + "mgmt": False, + "type": "overlay", + "ip-address": "to override", + "mac-address": "mac_address", + "vim-network-name": "pdu_internal", # OSMNBITEST-PDU-pdu_internal + }, + ] + } + self.vnfd_filenames = ("hackfest_3charmed_vnfd.tar.gz", "hackfest_3charmed_vnfd.tar.gz") + + self.descriptor_edit = { + "vnfd0": { + "id": "hfnd1", + "name": "hfn1", + "short-name": "hfn1", + "vdu": { + "$[0]": { + "pdu-type": "PDU-TYPE-1", + "interface": { + "$[0]": {"name": "mgmt-iface"}, + "$[1]": {"name": "pdu-iface-internal"}, + } + } + } + }, + "nsd": { + "constituent-vnfd": { + "$[1]": {"vnfd-id-ref": "hfnd1"} + } + } + } + + def create_descriptors(self, engine): + super().create_descriptors(engine) + + # Create PDU + self.pdu_descriptor["interfaces"][0].update(self.pdu_interface_0) + self.pdu_descriptor["interfaces"][1].update(self.pdu_interface_1) + self.pdu_descriptor["vim_accounts"] = [self.vim_id] + # TODO get vim-network-name from vnfr.vld.name + self.pdu_descriptor["interfaces"][1]["vim-network-name"] = "{}-{}-{}".format( + os.environ.get("OSMNBITEST_NS_NAME", "OSMNBITEST"), + "PDU", self.pdu_descriptor["interfaces"][1]["vim-network-name"]) + test_name = "DEPLOY{}".format(self.step) + engine.test(test_name, "Onboard PDU descriptor", "POST", "/pdu/v1/pdu_descriptors", + {"Location": "/pdu/v1/pdu_descriptors/", "Content-Type": "application/yaml"}, self.pdu_descriptor, + 201, r_header_yaml, "yaml") + self.pdu_id = engine.test_ids["last_id"] + self.step += 1 + + def run(self, engine, test_osm, manual_check, test_params=None): + engine.get_autorization() + nsname = os.environ.get("OSMNBITEST_NS_NAME", "OSMNBITEST") + + # create real VIM if not exist + self.vim_id = engine.get_create_vim(test_osm) + # instanciate PDU + self.pduDeploy.create_descriptors(engine) + self.pduDeploy.instantiate(engine, {"nsDescription": "to be used as PDU", "nsName": nsname + "-PDU", + "nsdId": self.pduDeploy.nsd_id, "vimAccountId": self.vim_id}) + if manual_check: + input('VNF to be used as PDU has been deployed. Perform manual check and press enter to resume') + elif test_osm: + self.pduDeploy.test_ns(engine, test_osm, self.pduDeploy.cmds, self.pduDeploy.uss, self.pduDeploy.pss, + self.pduDeploy.keys, self.pduDeploy.timeout) + + if test_osm: + r = engine.test("DEPLOY{}".format(self.step), "GET IP_ADDRESS OF VNFR", "GET", + "/nslcm/v1/vnfrs?nsr-id-ref={}".format(self.pduDeploy.ns_id), headers_json, None, + 200, r_header_json, "json") + self.step += 1 + vnfr_data = r.json() + # print(vnfr_data) + + self.pdu_interface_0["ip-address"] = vnfr_data[0]["vdur"][0]["interfaces"][0].get("ip-address") + self.pdu_interface_1["ip-address"] = vnfr_data[0]["vdur"][0]["interfaces"][1].get("ip-address") + self.pdu_interface_0["mac-address"] = vnfr_data[0]["vdur"][0]["interfaces"][0].get("mac-address") + self.pdu_interface_1["mac-address"] = vnfr_data[0]["vdur"][0]["interfaces"][1].get("mac-address") + if not self.pdu_interface_0["ip-address"]: + raise TestException("Vnfr has not managment ip address") + else: + self.pdu_interface_0["ip-address"] = "192.168.10.10" + self.pdu_interface_1["ip-address"] = "192.168.11.10" + self.pdu_interface_0["mac-address"] = "52:33:44:55:66:13" + self.pdu_interface_1["mac-address"] = "52:33:44:55:66:14" + + self.create_descriptors(engine) + + ns_data = {"nsDescription": "default description", "nsName": nsname, "nsdId": self.nsd_id, + "vimAccountId": self.vim_id} + if test_params and test_params.get("ns-config"): + if isinstance(test_params["ns-config"], str): + ns_data.update(yaml.load(test_params["ns-config"])) + else: + ns_data.update(test_params["ns-config"]) + + self.instantiate(engine, ns_data) + if manual_check: + input('NS has been deployed. Perform manual check and press enter to resume') + elif test_osm: + self.test_ns(engine, test_osm, self.cmds, self.uss, self.pss, self.keys, self.timeout) + self.aditional_operations(engine, test_osm, manual_check) + self.terminate(engine) + self.pduDeploy.terminate(engine) + self.delete_descriptors(engine) + self.pduDeploy.delete_descriptors(engine) + + self.step += 1 + + self.print_results() + + def delete_descriptors(self, engine): + super().delete_descriptors(engine) + # delete pdu + engine.test("DEPLOY{}".format(self.step), "Delete PDU SOL005", "DELETE", + "/pdu/v1/pdu_descriptors/{}".format(self.pdu_id), + headers_yaml, None, 204, None, 0) + + +class TestDescriptors: + description = "Test VNFD, NSD, PDU descriptors CRUD and dependencies" + + def __init__(self): + self.step = 0 + self.vnfd_filename = "hackfest_3charmed_vnfd.tar.gz" + self.nsd_filename = "hackfest_3charmed_nsd.tar.gz" + self.descriptor_url = "https://osm-download.etsi.org/ftp/osm-3.0-three/2nd-hackfest/packages/" + self.vnfd_id = None + self.nsd_id = None + + def run(self, engine, test_osm, manual_check, test_params=None): + engine.get_autorization() + temp_dir = os.path.dirname(os.path.abspath(__file__)) + "/temp/" + if not os.path.exists(temp_dir): + os.makedirs(temp_dir) + + # download files + for filename in (self.vnfd_filename, self.nsd_filename): + filename_path = temp_dir + filename + if not os.path.exists(filename_path): + with open(filename_path, "wb") as file: + response = requests.get(self.descriptor_url + filename) + if response.status_code >= 300: + raise TestException("Error downloading descriptor from '{}': {}".format( + self.descriptor_url + filename, response.status_code)) + file.write(response.content) + + vnfd_filename_path = temp_dir + self.vnfd_filename + nsd_filename_path = temp_dir + self.nsd_filename + + # vnfd CREATE AND UPLOAD in one step: + test_name = "DESCRIPTOR{}".format(self.step) + engine.test(test_name, "Onboard VNFD in one step", "POST", + "/vnfpkgm/v1/vnf_packages_content", headers_zip_yaml, "@b" + vnfd_filename_path, 201, + {"Location": "/vnfpkgm/v1/vnf_packages_content/", "Content-Type": "application/yaml"}, "yaml") + self.vnfd_id = engine.test_ids["last_id"] + self.step += 1 + + # get vnfd descriptor + engine.test("DESCRIPTOR" + str(self.step), "Get VNFD descriptor", "GET", + "/vnfpkgm/v1/vnf_packages/{}".format(self.vnfd_id), headers_yaml, None, 200, r_header_yaml, "yaml") + self.step += 1 + + # get vnfd file descriptor + engine.test("DESCRIPTOR" + str(self.step), "Get VNFD file descriptor", "GET", + "/vnfpkgm/v1/vnf_packages/{}/vnfd".format(self.vnfd_id), headers_text, None, 200, + r_header_text, "text", temp_dir+"vnfd-yaml") + self.step += 1 + # TODO compare files: diff vnfd-yaml hackfest_3charmed_vnfd/hackfest_3charmed_vnfd.yaml + + # get vnfd zip file package + engine.test("DESCRIPTOR" + str(self.step), "Get VNFD zip package", "GET", + "/vnfpkgm/v1/vnf_packages/{}/package_content".format(self.vnfd_id), headers_zip, None, 200, + r_header_zip, "zip", temp_dir+"vnfd-zip") + self.step += 1 + # TODO compare files: diff vnfd-zip hackfest_3charmed_vnfd.tar.gz + + # get vnfd artifact + engine.test("DESCRIPTOR" + str(self.step), "Get VNFD artifact package", "GET", + "/vnfpkgm/v1/vnf_packages/{}/artifacts/icons/osm.png".format(self.vnfd_id), headers_zip, None, 200, + r_header_octect, "octet-string", temp_dir+"vnfd-icon") + self.step += 1 + # TODO compare files: diff vnfd-icon hackfest_3charmed_vnfd/icons/osm.png + + # nsd CREATE AND UPLOAD in one step: + test_name = "DESCRIPTOR{}".format(self.step) + engine.test(test_name, "Onboard NSD in one step", "POST", + "/nsd/v1/ns_descriptors_content", headers_zip_yaml, "@b" + nsd_filename_path, 201, + {"Location": "/nsd/v1/ns_descriptors_content/", "Content-Type": "application/yaml"}, "yaml") + self.nsd_id = engine.test_ids["last_id"] + self.step += 1 + + # get nsd descriptor + engine.test("DESCRIPTOR" + str(self.step), "Get NSD descriptor", "GET", + "/nsd/v1/ns_descriptors/{}".format(self.nsd_id), headers_yaml, None, 200, r_header_yaml, "yaml") + self.step += 1 + + # get nsd file descriptor + engine.test("DESCRIPTOR" + str(self.step), "Get NSD file descriptor", "GET", + "/nsd/v1/ns_descriptors/{}/nsd".format(self.nsd_id), headers_text, None, 200, + r_header_text, "text", temp_dir+"nsd-yaml") + self.step += 1 + # TODO compare files: diff nsd-yaml hackfest_3charmed_nsd/hackfest_3charmed_nsd.yaml + + # get nsd zip file package + engine.test("DESCRIPTOR" + str(self.step), "Get NSD zip package", "GET", + "/nsd/v1/ns_descriptors/{}/nsd_content".format(self.nsd_id), headers_zip, None, 200, + r_header_zip, "zip", temp_dir+"nsd-zip") + self.step += 1 + # TODO compare files: diff nsd-zip hackfest_3charmed_nsd.tar.gz + + # get nsd artifact + engine.test("DESCRIPTOR" + str(self.step), "Get NSD artifact package", "GET", + "/nsd/v1/ns_descriptors/{}/artifacts/icons/osm.png".format(self.nsd_id), headers_zip, None, 200, + r_header_octect, "octet-string", temp_dir+"nsd-icon") + self.step += 1 + # TODO compare files: diff nsd-icon hackfest_3charmed_nsd/icons/osm.png + + # vnfd DELETE + test_rest.test("DESCRIPTOR" + str(self.step), "Delete VNFD conflict", "DELETE", + "/vnfpkgm/v1/vnf_packages/{}".format(self.vnfd_id), headers_yaml, None, 409, None, None) + self.step += 1 + + test_rest.test("DESCRIPTOR" + str(self.step), "Delete VNFD force", "DELETE", + "/vnfpkgm/v1/vnf_packages/{}?FORCE=TRUE".format(self.vnfd_id), headers_yaml, None, 204, None, 0) + self.step += 1 + + # nsd DELETE + test_rest.test("DESCRIPTOR" + str(self.step), "Delete NSD", "DELETE", + "/nsd/v1/ns_descriptors/{}".format(self.nsd_id), headers_yaml, None, 204, None, 0) + self.step += 1 + + +class TestNetSliceTemplates: + description = "Upload a NST to OSM" + + def __init__(self): + self.nst_filenames = ("@./cirros_slice/cirros_slice.yaml") + + def run(self, engine, test_osm, manual_check, test_params=None): + # nst CREATE + engine.get_autorization() + r = engine.test("NST1", "Onboard NST", "POST", "/nst/v1/netslice_templates_content", headers_yaml, + self.nst_filenames, + 201, {"Location": "/nst/v1/netslice_templates_content", "Content-Type": "application/yaml"}, + "yaml") + location = r.headers["Location"] + nst_id = location[location.rfind("/")+1:] + + # nstd SHOW OSM format + r = engine.test("NST2", "Show NSTD OSM format", "GET", + "/nst/v1/netslice_templates/{}".format(nst_id), headers_json, None, + 200, r_header_json, "json") + + # nstd DELETE + r = engine.test("NST3", "Delete NSTD", "DELETE", + "/nst/v1/netslice_templates/{}".format(nst_id), headers_json, None, + 204, None, 0) + + +class TestNetSliceInstances: + description = "Upload a NST to OSM" + + def __init__(self): + self.nst_filenames = ("@./cirros_slice/cirros_slice.yaml") + + def run(self, engine, test_osm, manual_check, test_params=None): + # nst CREATE + engine.get_autorization() + r = engine.test("NST1", "Onboard NST", "POST", "/nst/v1/netslice_templates_content", headers_yaml, + self.nst_filenames, 201, + {"Location": "/nst/v1/netslice_templates_content", "Content-Type": "application/yaml"}, "yaml") + location = r.headers["Location"] + nst_id = location[location.rfind("/")+1:] + + # nsi CREATE + if test_osm: + self.vim_id = engine.get_create_vim(test_osm) + else: + r = engine.test("VIM1", "Get available VIM", "GET", "/admin/v1/vim_accounts", + headers_json, None, None, r_header_json, "json") + r_json = json.loads(r.text) + vim = r_json[0] + self.vim_id = vim["_id"] + + ns_data = {"nsiDescription": "default description", "nsiName": "my_slice", "nstdId": "cirros_nst", + "vimAccountId": self.vim_id} + ns_data_text = yaml.safe_dump(ns_data, default_flow_style=True, width=256) + + r = engine.test("NSI1", "Onboard NSI", "POST", "/nsilcm/v1/netslice_instances_content", headers_yaml, + ns_data_text, 201, + {"Location": "/nsilcm/v1/netslice_instances_content", "Content-Type": + "application/yaml"}, "yaml") + location = r.headers["Location"] + nsi_id = location[location.rfind("/")+1:] + + # TODO: Improve the wait with a polling if NSI was deployed + wait = 120 + sleep(wait) + + # Check deployment + r = engine.test("NSI2", "Wait until NSI is deployed", "GET", + "/nsilcm/v1/netslice_instances_content/{}".format(nsi_id), headers_json, None, + 200, r_header_json, "json") + + # nsi DELETE + r = engine.test("NSI3", "Delete NSI", "DELETE", + "/nsilcm/v1/netslice_instances_content/{}".format(nsi_id), headers_json, None, + 202, r_header_json, "json") + + sleep(60) + + # nstd DELETE + r = engine.test("NST2", "Delete NSTD", "DELETE", + "/nst/v1/netslice_templates/{}".format(nst_id), headers_json, None, + 204, None, 0) + + if __name__ == "__main__": global logger test = "" @@ -1152,10 +1711,17 @@ if __name__ == "__main__": "VIM-SDN": TestVIMSDN, "Deploy-Custom": TestDeploy, "Deploy-Hackfest-Cirros": TestDeployHackfestCirros, + "Deploy-Hackfest-Cirros-Scaling": TestDeployHackfestCirrosScaling, "Deploy-Hackfest-3Charmed": TestDeployHackfest3Charmed, "Deploy-Hackfest-4": TestDeployHackfest4, "Deploy-CirrosMacIp": TestDeployIpMac, + "TestDescriptors": TestDescriptors, + "TestDeployHackfest1": TestDeployHackfest1, # "Deploy-MultiVIM": TestDeployMultiVIM, + "DeploySingleVdu": TestDeploySingleVdu, + "DeployHnfd": TestDeployHnfd, + "Upload-Slice-Template": TestNetSliceTemplates, + "Deploy-Slice-Instance": TestNetSliceInstances, } test_to_do = [] test_params = {} @@ -1233,136 +1799,6 @@ if __name__ == "__main__": exit(0) # get token - - # # tests once authorized - # for t in test_authorized_list: - # test_rest.test(*t) - # - # # tests admin - # for t in test_admin_list1: - # test_rest.test(*t) - # - # # vnfd CREATE - # r = test_rest.test("VNFD1", "Onboard VNFD step 1", "POST", "/vnfpkgm/v1/vnf_packages", headers_json, None, - # 201, {"Location": "/vnfpkgm/v1/vnf_packages/", "Content-Type": "application/json"}, "json") - # location = r.headers["Location"] - # vnfd_id = location[location.rfind("/")+1:] - # # print(location, vnfd_id) - # - # # vnfd UPLOAD test - # r = test_rest.test("VNFD2", "Onboard VNFD step 2 as TEXT", "PUT", - # "/vnfpkgm/v1/vnf_packages/{}/package_content".format(vnfd_id), - # r_header_text, "@./cirros_vnf/cirros_vnfd.yaml", 204, None, 0) - # - # # vnfd SHOW OSM format - # r = test_rest.test("VNFD3", "Show VNFD OSM format", "GET", - # "/vnfpkgm/v1/vnf_packages_content/{}".format(vnfd_id), - # headers_json, None, 200, r_header_json, "json") - # - # # vnfd SHOW text - # r = test_rest.test("VNFD4", "Show VNFD SOL005 text", "GET", - # "/vnfpkgm/v1/vnf_packages/{}/package_content".format(vnfd_id), - # headers_text, None, 200, r_header_text, "text") - # - # # vnfd UPLOAD ZIP - # makedirs("temp", exist_ok=True) - # tar = tarfile.open("temp/cirros_vnf.tar.gz", "w:gz") - # tar.add("cirros_vnf") - # tar.close() - # r = test_rest.test("VNFD5", "Onboard VNFD step 3 replace with ZIP", "PUT", - # "/vnfpkgm/v1/vnf_packages/{}/package_content".format(vnfd_id), - # r_header_zip, "@b./temp/cirros_vnf.tar.gz", 204, None, 0) - # - # # vnfd SHOW OSM format - # r = test_rest.test("VNFD6", "Show VNFD OSM format", "GET", - # "/vnfpkgm/v1/vnf_packages_content/{}".format(vnfd_id), - # headers_json, None, 200, r_header_json, "json") - # - # # vnfd SHOW zip - # r = test_rest.test("VNFD7", "Show VNFD SOL005 zip", "GET", - # "/vnfpkgm/v1/vnf_packages/{}/package_content".format(vnfd_id), - # headers_zip, None, 200, r_header_zip, "zip") - # # vnfd SHOW descriptor - # r = test_rest.test("VNFD8", "Show VNFD descriptor", "GET", - # "/vnfpkgm/v1/vnf_packages/{}/vnfd".format(vnfd_id), - # headers_text, None, 200, r_header_text, "text") - # # vnfd SHOW actifact - # r = test_rest.test("VNFD9", "Show VNFD artifact", "GET", - # "/vnfpkgm/v1/vnf_packages/{}/artifacts/icons/cirros-64.png".format(vnfd_id), - # headers_text, None, 200, r_header_octect, "text") - # - # # # vnfd DELETE - # # r = test_rest.test("VNFD10", "Delete VNFD SOL005 text", "DELETE", - # # "/vnfpkgm/v1/vnf_packages/{}".format(vnfd_id), - # # headers_yaml, None, 204, None, 0) - # - # # nsd CREATE - # r = test_rest.test("NSD1", "Onboard NSD step 1", "POST", "/nsd/v1/ns_descriptors", headers_json, None, - # 201, {"Location": "/nsd/v1/ns_descriptors/", "Content-Type": "application/json"}, "json") - # location = r.headers["Location"] - # nsd_id = location[location.rfind("/")+1:] - # # print(location, nsd_id) - # - # # nsd UPLOAD test - # r = test_rest.test("NSD2", "Onboard NSD with missing vnfd", "PUT", - # "/nsd/v1/ns_descriptors/<>/nsd_content?constituent-vnfd.0.vnfd-id-ref" - # "=NONEXISTING-VNFD".format(nsd_id), - # r_header_text, "@./cirros_ns/cirros_nsd.yaml", 409, r_header_yaml, "yaml") - # - # # # VNF_CREATE - # # r = test_rest.test("VNFD5", "Onboard VNFD step 3 replace with ZIP", "PUT", - # # "/vnfpkgm/v1/vnf_packages/{}/package_content".format(vnfd_id), - # # r_header_zip, "@b./temp/cirros_vnf.tar.gz", 204, None, 0) - # - # r = test_rest.test("NSD2", "Onboard NSD step 2 as TEXT", "PUT", - # "/nsd/v1/ns_descriptors/{}/nsd_content".format(nsd_id), - # r_header_text, "@./cirros_ns/cirros_nsd.yaml", 204, None, 0) - # - # # nsd SHOW OSM format - # r = test_rest.test("NSD3", "Show NSD OSM format", "GET", "/nsd/v1/ns_descriptors_content/{}".format(nsd_id), - # headers_json, None, 200, r_header_json, "json") - # - # # nsd SHOW text - # r = test_rest.test("NSD4", "Show NSD SOL005 text", "GET", - # "/nsd/v1/ns_descriptors/{}/nsd_content".format(nsd_id), - # headers_text, None, 200, r_header_text, "text") - # - # # nsd UPLOAD ZIP - # makedirs("temp", exist_ok=True) - # tar = tarfile.open("temp/cirros_ns.tar.gz", "w:gz") - # tar.add("cirros_ns") - # tar.close() - # r = test_rest.test("NSD5", "Onboard NSD step 3 replace with ZIP", "PUT", - # "/nsd/v1/ns_descriptors/{}/nsd_content".format(nsd_id), - # r_header_zip, "@b./temp/cirros_ns.tar.gz", 204, None, 0) - # - # # nsd SHOW OSM format - # r = test_rest.test("NSD6", "Show NSD OSM format", "GET", "/nsd/v1/ns_descriptors_content/{}".format(nsd_id), - # headers_json, None, 200, r_header_json, "json") - # - # # nsd SHOW zip - # r = test_rest.test("NSD7","Show NSD SOL005 zip","GET", "/nsd/v1/ns_descriptors/{}/nsd_content".format(nsd_id), - # headers_zip, None, 200, r_header_zip, "zip") - # - # # nsd SHOW descriptor - # r = test_rest.test("NSD8", "Show NSD descriptor", "GET", "/nsd/v1/ns_descriptors/{}/nsd".format(nsd_id), - # headers_text, None, 200, r_header_text, "text") - # # nsd SHOW actifact - # r = test_rest.test("NSD9", "Show NSD artifact", "GET", - # "/nsd/v1/ns_descriptors/{}/artifacts/icons/osm_2x.png".format(nsd_id), - # headers_text, None, 200, r_header_octect, "text") - # - # # vnfd DELETE - # r = test_rest.test("VNFD10", "Delete VNFD conflict", "DELETE", "/vnfpkgm/v1/vnf_packages/{}".format(vnfd_id), - # headers_yaml, None, 409, r_header_yaml, "yaml") - # - # # nsd DELETE - # r = test_rest.test("NSD10", "Delete NSD SOL005 text", "DELETE", "/nsd/v1/ns_descriptors/{}".format(nsd_id), - # headers_yaml, None, 204, None, 0) - # - # # vnfd DELETE - # r = test_rest.test("VNFD10","Delete VNFD SOL005 text","DELETE", "/vnfpkgm/v1/vnf_packages/{}".format(vnfd_id), - # headers_yaml, None, 204, None, 0) print("PASS") except TestException as e: